章节
第一章:https://blog.51cto.com/kimiliucn/7263756
第二章:
作者:西瓜程序猿
主页传送门:https://blog.51cto.com/kimiliucn
开发背景
在开发某一个需求的时候,领导要求使用RocketMQ(阿里云版) 作为消息队列。使用的版本是5.x,目前也已经没有4.x购买的入口了,所以只能买5.x系列。公司项目还是用的比较老的技术.NET Framework 4.8,生产者主要有WebAPI/MVC/JOB(控制台应用程序),然后消费者采用的是Windows服务进行长链接消费信息。这期间因为各种原因踩过很多坑,然后咨询了客服说RocketMQ(阿里云版)5.0不支持.NET Framework,但最终操作下来竟然能使用(只支持集群模式,不支持订阅模式),那今天[西瓜程序猿]来记录一下如何使用RocketMQ(阿里云版),给各位小伙伴作为参考防止踩坑。
环境版本
阿里云RocketMQ版本:5.0系列
.NET版本:.NET Framework 4.8
.NET版本:生产端(WebAPI/MVC/JOB)、消费端(Windows服务)
如果不知道怎么选,或者不知道怎么买云消息队列RocketMQ(阿里云版)?可以联系我[西瓜程序猿],如果需要特价购买可以通过下面地址访问:
活动地址:官网地址
一、RocketMQ基本介绍
官网地址:RocketMQ · 官方网站 | RocketMQ
RocketMQ阿里云-官方文档:如何快速入门RocketMQ_云消息队列 MQ-阿里云帮助中心
1.1-RocketMQ简介
RocketMQ(Apache RocketMQ)是一个开源的分布式消息中间件系统,由阿里巴巴集团旗下的阿里云计算平台团队开发和维护。它最初是为满足阿里巴巴内部大规模分布式消息传递需求而设计的,后来成为 Apache 基金会的顶级开源项目之一。
1.2-RocketMQ优势
在众多应用场景中广泛应用,如电子商务、物流配送、金融支付、大数据处理等。它被许多企业用于构建高性能和可靠的消息队列系统,实现异步通信和解耦应用程序组件。RocketMQ 提供了可靠、可扩展和高性能的消息传递解决方案,具备以下特点:
- 异步通信:RocketMQ 支持发布-订阅和点对点两种消息通信模式,以满足不同场景下的需求。
- 高可靠性:提供多种存储选项,包括本地文件存储和远程共享存储,以确保消息的可靠传输和持久化。
- 高吞吐量:支持水平扩展,可以轻松应对大规模消息传递和高并发的需求。
- 严格有序性:支持消息按照发送顺序和消费顺序进行有序处理,保证消息的顺序性。
- 分布式架构:采用分布式架构,具备良好的横向扩展能力和高可用性。
1.3-RocketMQ基本概念
此段内容根据阿里云官方文档整理:如何快速入门RocketMQ_云消息队列 MQ-阿里云帮助中心
主题(Topic):云消息队列 RocketMQ 版中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。
消息类型(MessageType):云消息队列 RocketMQ 版中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。云消息队列 RocketMQ 版支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。
消息队列(MessageQueue):队列是云消息队列 RocketMQ 版中消息存储和传输的实际容器,也是消息的最小存储单元。云消息队列 RocketMQ 版的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。
消息(Message):消息是云消息队列 RocketMQ 版中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到云消息队列 RocketMQ 版服务端,服务端按照相关语义将消息投递到消费端进行消费。
消息视图(MessageView):消息视图是云消息队列 RocketMQ 版面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。
消息标签(MessageTag):消息标签是云消息队列 RocketMQ 版提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。
消息位点(MessageQueueOffset):消息是按到达云消息队列 RocketMQ 版服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。
消费位点(ConsumerOffset):一条消息被某个消费者消费完成后不会立即从队列中删除,云消息队列 RocketMQ 版会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。
消息索引(MessageKey):消息索引是云消息队列 RocketMQ 版提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。
生产者(Producer):生产者是云消息队列 RocketMQ 版系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成云消息队列 RocketMQ 版的消息并发送至服务端。
事务检查器(TransactionChecker):云消息队列 RocketMQ 版中生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。
事务状态(TransactionResolution):云消息队列 RocketMQ 版中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。
消费者分组(ConsumerGroup):消费者分组是云消息队列 RocketMQ 版系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在云消息队列 RocketMQ 版中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
消费者(Consumer):消费者是云消息队列 RocketMQ 版中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从云消息队列 RocketMQ 版服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
消费结果(ConsumeResult):云消息队列 RocketMQ 版中PushConsumer消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。
订阅关系(Subscription):订阅关系是云消息队列 RocketMQ 版系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。
消息过滤:消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在云消息队列 RocketMQ 版的服务端完成。
重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到云消息队列 RocketMQ 版服务端的消息。
消息轨迹:在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由云消息队列 RocketMQ 版服务端,投递给消费者的完整链路,方便定位排查问题。
消息堆积:生产者已经将消息发送到云消息队列 RocketMQ 版的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在云消息队列 RocketMQ 版的服务端保存着未被消费的消息,该状态即消息堆积。
事务消息:事务消息是云消息队列 RocketMQ 版提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
定时/延时消息:定时/延时消息是云消息队列 RocketMQ 版提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
顺序消息:顺序消息是云消息队列 RocketMQ 版提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
二、RocketMQ前期准备
首先需要下载相关.NET相关的SDK,然后在阿里云后台找到【实例用户名】【实例密码】【接入点链接信息】等信息,最后还需要创建【Group ID】和【Topic】用于给我们调用。
2.1-下载资源包及SDK
[西瓜程序猿]给正在看这篇文章的小伙伴提供了资源包,【ONSClient4CPP】文件夹里面包含使用RocketMQ阿里云版本要依赖的DLL文件,【RocketMQ_SDK】文件夹包含了.NET Framework使用RocketMQ阿里云版本要用到的SDK文件,【vcredistx64】文件夹包含了Visual C++ 2015运行时环境安装包,因为C++ DLL文件需要依赖这个,这个需要进行安装。还包含其他辅助的工具及代码。
可以访问下载(如果失效了,请联系我)。
下载地址(编码:stalua6n):https://yongteng.lanzoub.com/ice5a16p978h
密码:1q81
文件截图:
2.2-查询基本配置信息
(1)首先点击下面链接进入消息队列RocketMQ工作台,如果没有登录首先要进行登录。然后在【资源分布】里面找到要操作的地域列表,点击【地域名称】。
消息队列RocketMQ(阿里云版):阿里云登录 - 欢迎登录阿里云,安全稳定的云计算服务平台
(2)然后可以看到实例列表,找到要操作的实例,点击【详情】。
(3)然后在【运行信息】中找到【实例用户名】和【实例密码】,注意不是实例ID/实例名称。
(5)然后还在当前页面,往下翻到【TCP 协议接入点】中找到接入点和网络信息。如果大家需要在外网访问自行开通公网访问,好像需要另外付费。[西瓜程序猿]这边只能通过【VPC专有网络】访问,也就是只能在内网访问。所以我以VPC专有网络来介绍。
那我们就把必要的信息都集齐全了,分别是【实例用户名】【实例密码】【TCP 协议接入点连接】。
2.3-配置Topic和Group
那什么是Topic呢?云消息队列 RocketMQ 版中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。可以理解为不同的系统、不同的发布环境配置不同的Topic。然后来说一下如何配Topic和GroupID。
(1)在左侧导航栏找到【Topic管理】,然后点击【创建Topic】。名称和描述都是必填的,消息类型根据自己业务场景选择。[西瓜程序猿]这边要求消息按照顺序发送和消费,所以选择【顺序消息】。
(2)然后再来创建GroupID。一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
那我们就把必要的资源都创建好了,分别是【Topic名称】【GroupID】。
Topic名称:
GroupID:
二、RockeetMQ核心部分封装
官方开发文档:RocketMQONS系列.NETSDK的版本信息_云消息队列 MQ-阿里云帮助中心
2.1-创建类库项目
(1)点击【创建新项目】,然后选择【类库(.NET Framework)】。
目录:
(2)然后新建一个【SDK】文件夹,将下载好的资源包里面文件夹【RocketMQ_SDK】的文件,复制到项目中【SDK】文件夹里面。
资源包:
项目:
(3)然后就安装相关的包,分别是【log4net】用来记录日志,【Newtonsoft.Json】用来做JSON序列化和反序列化。(如果自己项目中有日志系统和反序列化工具,也可以不安装,根据自己项目依赖公共辅助层去使用)
<package id="log4net" versinotallow="2.0.15" targetFramework="net48" />
<package id="Newtonsoft.Json" versinotallow="12.0.1" targetFramework="net48" />
(4)创建了一个【Helper】文件夹写一个JSON反序列化的帮助类(根据自己业务需要创建)。
目录:
代码:
public class JsonUtility
{
/// <summary>
/// 将实体类序列化为JSON
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="data"></param>
/// <returns></returns>
static public string SerializeJSON<T>(T data)
{
return Newtonsoft.Json.JsonConvert.SerializeObject(data);
}
/// <summary>
/// 反序列化JSON
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="json"></param>
/// <returns></returns>
static public T DeserializeJSON<T>(string json)
{
return Newtonsoft.Json.JsonConvert.DeserializeObject<T>(json);
}
/// <summary>
/// 将IEnumerable<T,V>序列化为JSON
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
static public string SerializeDictionary(IEnumerable<KeyValuePair<string, string>> value)
{
return Newtonsoft.Json.JsonConvert.SerializeObject(value.Select(I => new { label = I.Key, value = I.Value }));
}
}
(5)然后在创建一个【Attributes】文件夹。在里面新建两个Attribute特性,一个【ConsumerTagAttribute】用来区分Tag标签,另一个【EventTypeAttribute】用来区分事件类型。
目录:
代码:
/// <summary>
/// Tag标签
/// </summary>
public class ConsumerTagAttribute : Attribute
{
public string Tag { get; set; }
public ConsumerTagAttribute(string tag)
{
Tag = tag;
}
}
/// <summary>
/// 事件类型
/// </summary>
public class EventTypeAttribute : Attribute
{
public string EventType { get; set; }
public EventTypeAttribute(string eventType)
{
EventType = eventType;
}
}
2.2-封装传输实体模型
然后我们需要设计生产者和消费者直接需要传输共同的消息时哪些。
目前想到的(如果有好的建议可以在评论区讨论哈):
MessageId:消息id
Tag:对应RocketMQ中Tag
SendTime:发送时间
Source:消息来源
EventType:事件类型
Body:消息体
目录:
(1)创建一个【Models】文件夹,用来存相关的实体。然后创建【IQueueOnsCommonModel】生产者/消费者公共模型接口,然后创建【QueueOnsCommonModel】文件实现IQueueOnsCommonModel接口。
IQueueOnsCommonModel:
/// <summary>
/// 生产者/消费者公共模型接口
/// </summary>
public interface IQueueOnsCommonModel
{
/// <summary>
/// 消息id
/// </summary>
string MessageId { get; set; }
/// <summary>
/// 对应RocketMQ中Tag
/// </summary>
string Tag { get; set; }
/// <summary>
/// 发送时间
/// </summary>
DateTime SendTime { get; set; }
/// <summary>
/// 消息来源
/// </summary>
string Source { get; set; }
/// <summary>
/// 事件类型
/// </summary>
string EventType { get; set; }
/// <summary>
/// 消息体
/// </summary>
string Body { get; set; }
}
QueueOnsCommonModel:
/// <summary>
/// 生产者/消费者公共模型实现
/// </summary>
public class QueueOnsCommonModel : IQueueOnsCommonModel
{
/// <summary>
/// 消息id
/// </summary>
public string MessageId { get; set; }
/// <summary>
/// 对应RocketMQ中Tag
/// </summary>
public string Tag { get; set; }
/// <summary>
/// 发送时间
/// </summary>
public DateTime SendTime { get; set; }
/// <summary>
/// 消息来源
/// </summary>
public string Source { get; set; }
/// <summary>
/// 事件类型
/// </summary>
public string EventType { get; set; }
/// <summary>
/// 消息体
/// </summary>
public string Body { get; set; }
}
(2)创建一个【ONSPropertyConfigModel】文件,用来做配置文件的实体。
/// <summary>
/// RocketMQ配置属性
/// </summary>
public class ONSPropertyConfigModel
{
/// <summary>
/// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例用户名。
/// </summary>
public string AccessKey { get; set; }
/// <summary>
/// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例密码。
/// </summary>
public string SecretKey { get; set; }
/// <summary>
/// 设置为您在云消息队列 RocketMQ 版控制台创建的Group ID。
/// </summary>
public string GroupId { get; set; }
/// <summary>
/// 您在云消息队列 RocketMQ 版控制台创建的Topic。
/// </summary>
public string Topics { get; set; }
/// <summary>
/// 设置为您从云消息队列 RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”
/// </summary>
public string NAMESRV_ADDR { get; set; }
/// <summary>
/// 消费者/生产者目标来源
/// </summary>
public string OnsClientCode { get; set; }
}
(3)然后创建一个【QueueTagConsts】文件,用来订单消息队列Tag常量,和一个【QueueOnsEventType】文件,用来定义事件类型。
目录:
QueueTagConsts:
/// <summary>
/// 消息队列Tag常量定义
/// 命名规范:项目名_自定义业务名_Tag
/// </summary>
public class QueueTagConsts
{
/// <summary>
/// 测试Sample
/// </summary>
public const string XG_Blog_Sample_Tag = "XG_Blog_Sample_Tag";
}
QueueOnsEventType:
/// <summary>
/// 消息队列-事件类型
/// </summary>
public class QueueOnsEventType
{
/// <summary>
/// RocketMQ测试
/// </summary>
public const string RocketMQ_TEST = "RocketMQ_TEST";
}
2.3-封装连接RocketMQ
创建一个【Core】文件夹,然后创建一个【IConsumerMsg】消费接口,和一个【QueueOnsProducer】文件用来封装RocketMQ生产者连接。
目录:
IConsumerMsg:
/// <summary>
/// 消费接口
/// </summary>
public interface IConsumerMsg
{
void Consume(QueueOnsCommonModel model);
}
QueueOnsProducer:
/// <summary>
/// 消息队列-RocketMQ生产者
/// </summary>
public class QueueOnsProducer
{
private static Producer _producer;
private static PushConsumer _consumer;
private readonly static ILog logger = LogManager.GetLogger(typeof(QueueOnsProducer));
private static string Ons_Topic = "";
private static string Ons_AccessKey = "";
private static string Ons_SecretKey = "";
private static string Ons_GroupId = "";
private static string Ons_NameSrv = "";
private static int Ons_ConsumptionPattern = 1;
private static string Ons_Client_Code = "Test_RocketMQ_Producer";
private const string Ons_LogPath = "C://rocket_mq_logs";
public static string getOnsTopic
{
get
{
return Ons_Topic;
}
}
public static string getOnsClientCode
{
get
{
return Ons_Client_Code;
}
}
private static ONSFactoryProperty getFactoryPropertyProducer()
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
return factoryInfo;
}
private static ONSFactoryProperty getFactoryPropertyConsumer()
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, Ons_AccessKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, Ons_SecretKey);
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, Ons_GroupId);
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, Ons_Topic);
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, Ons_NameSrv);
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, Ons_LogPath);
//消费模式(1:集群消费、2:广播消费)
if (Ons_ConsumptionPattern == 1)
{
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
}
else if (Ons_ConsumptionPattern == 2)
{
factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
}
return factoryInfo;
}
public static void CreateProducer(ONSPropertyConfigModel config)
{
if (config == null) { throw new ArgumentNullException("config is null"); }
if (string.IsNullOrEmpty(config.AccessKey)) { throw new ArgumentNullException("AccessKey is null"); }
if (string.IsNullOrEmpty(config.SecretKey)) { throw new ArgumentNullException("SecretKey is null"); }
if (string.IsNullOrEmpty(config.GroupId)) { throw new ArgumentNullException("GroupId is null"); }
if (string.IsNullOrEmpty(config.Topics)) { throw new ArgumentNullException("Topics is null"); }
if (string.IsNullOrEmpty(config.NAMESRV_ADDR)) { throw new ArgumentNullException("NAMESRV_ADDR is null"); }
if (string.IsNullOrEmpty(config.OnsClientCode)) { throw new ArgumentNullException("OnsClientCode is null"); }
Ons_AccessKey = config.AccessKey;
Ons_SecretKey = config.SecretKey;
Ons_GroupId = config.GroupId;
Ons_Topic = config.Topics;
Ons_NameSrv = config.NAMESRV_ADDR;
Ons_Client_Code = config.OnsClientCode;
_producer = ONSFactory.getInstance().createProducer(getFactoryPropertyProducer());
}
public static void StartProducer()
{
if (_producer != null)
{
_producer.start();
string msg = $"【{Ons_Topic}】-【{Ons_Client_Code}】:[{DateTime.Now}]生产者 启动 成功!";
logger.Info(msg);
}
else
{
throw new ArgumentNullException("_producer is null,请先执行[CreateProducer]创建生产者后启动");
}
}
public static void ShutdownProducer()
{
if (_producer != null)
{
_producer.shutdown();
string msg = $"【{Ons_Topic}】-【{Ons_Client_Code}】:[{DateTime.Now}]生产者 已关闭连接!";
logger.Info(msg);
}
}
public static string SendMessage(QueueOnsCommonModel model, string tag = "RegisterLog")
{
if (model == null) { throw new ArgumentNullException("model is null"); }
model.SendTime = DateTime.Now;
model.Source = Ons_Client_Code;
var send_str = JsonUtility.SerializeJSON(model);
byte[] bytes = Encoding.UTF8.GetBytes(send_str);
string str_new_msg = Encoding.Default.GetString(bytes);
logger.Info("【发送队列消息】消息内容:" + str_new_msg);
string msg_key = model.MessageId;
string msg_id = string.Empty;
Message msg = new Message(Ons_Topic, tag, str_new_msg);
msg.setKey(msg_key);
try
{
SendResultONS sendResult = _producer.send(msg);
msg_id = sendResult.getMessageId();
logger.Info("【发送队列消息】消息ID:" + msg_id);
}
catch (Exception ex)
{
logger.Error($"【发送队列消息】发生异常了:{ex.Message}", ex);
throw ex;
}
return msg_id;
}
public static void CreatePushConsumer(ONSPropertyConfigModel config)
{
if (config == null) { throw new ArgumentNullException("config is null"); }
if (string.IsNullOrEmpty(config.AccessKey)) { throw new ArgumentNullException("AccessKey is null"); }
if (string.IsNullOrEmpty(config.SecretKey)) { throw new ArgumentNullException("SecretKey is null"); }
if (string.IsNullOrEmpty(config.GroupId)) { throw new ArgumentNullException("GroupId is null"); }
if (string.IsNullOrEmpty(config.Topics)) { throw new ArgumentNullException("Topics is null"); }
if (string.IsNullOrEmpty(config.NAMESRV_ADDR)) { throw new ArgumentNullException("NAMESRV_ADDR is null"); }
if (string.IsNullOrEmpty(config.OnsClientCode)) { throw new ArgumentNullException("OnsClientCode is null"); }
// 集群消费。
Ons_ConsumptionPattern = 1;
// 广播消费。
//Ons_ConsumptionPattern = 2;
Ons_AccessKey = config.AccessKey;
Ons_SecretKey = config.SecretKey;
Ons_GroupId = config.GroupId;
Ons_Topic = config.Topics;
Ons_NameSrv = config.NAMESRV_ADDR;
Ons_Client_Code = config.OnsClientCode;
_consumer = ONSFactory.getInstance().createPushConsumer(getFactoryPropertyConsumer());
}
public static void SetPushConsumer(MessageListener listener, string subExpression = "*")
{
_consumer.subscribe(Ons_Topic, subExpression, listener);
}
public static void StartPushConsumer()
{
_consumer.start();
string msg = $"【{Ons_Topic}】-【{Ons_Client_Code}】:[{DateTime.Now}]消费者 启动 成功!";
logger.Info(msg);
}
public static void ShutdownPushConsumer()
{
if (_consumer != null)
{
_consumer.shutdown();
string msg = $"【{Ons_Topic}】-【{Ons_Client_Code}】:[{DateTime.Now}]消费者 已关闭连接!";
logger.Info(msg);
}
}
}
三、生产端实现
3.1-创建生产者
3.1.1-创建MVC项目
(1)然后创建一个生产者,可以创建WebAPI/MVC/JOB(控制台应用程序)等等,那[西瓜程序猿]以MVC项目作为例子来介绍一下,创建一个名为【RocketMQ.Producer】项目。
运行测试一下:
3.1.2-项目依赖配置
阿里云提供的.NET版本是基于云消息队列 RocketMQ 版的CPP版本的托管封装,这样能保证.NET完全不依赖于Windows.NET公共库。内部采用C++多线程并发处理,保证.NET版本的高效稳定。
(1)底层的C++ DLL相关文件,以及Visual C++ 2015运行时环境安装包。如果没有安装Visual Studio 2015运行时环境,需要在资源包找到【vc_redist.x64.exe】文件进行安装。
(2)在使用Visual Studio(VS)开发.NET的应用程序和类库时,默认的目标平台为“Any CPU”。但是.NET SDK仅支持Windows 64-bit操作系统,所以需要自行设置。先右击【RocketMQ.Producer】项目,然后点击【属性】。
(3)点击左侧选项的【生成】,然后将目标平台改为【x64】。
(3)将资源包【ONSClient4CPP】文件夹里面所有的文件,复制到【bin】目录下。
资源包:
项目:
3.1.3-使用log4net
(1)使用lo4net输出日志,大家也可以用别的日志框架,记得在用到写入日志的地方自行进行修改。那[西瓜程序猿]使用log4net来介绍。我们在项目的根目录下创建一个文件为【log4net.config】。
(2)【log4net.config】内容如下。
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<configSections>
<section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
</configSections>
<system.web>
<compilation debug="true" targetFramework="4.5.2" />
<httpRuntime targetFramework="4.5.2" />
</system.web>
<log4net>
<!--错误日志:::记录错误日志-->
<!--按日期分割日志文件 一天一个-->
<!-- appender 定义日志输出方式 将日志以回滚文件的形式写到文件中。-->
<appender name="ErrorAppender" type="log4net.Appender.RollingFileAppender">
<!--保存路径:下面路径项目启动的时候自动在C盘中创建log、logError文件-->
<file value="log/error/error_" />
<!-- 如果想在本项目中添加路径,那就直接去掉C:\\ 只设置log\\LogError 项目启动中默认创建文件 -->
<appendToFile value="true"/>
<!--按照何种方式产生多个日志文件(日期[Date],文件大小[Size],混合[Composite])-->
<rollingStyle value="Date"/>
<!--这是按日期产生文件夹-->
<datePattern value="yyyy-MM-dd'.log'"/>
<!--是否只写到一个文件中-->
<staticLogFileName value="false"/>
<!--保留的log文件数量 超过此数量后 自动删除之前的 好像只有在 按Size分割时有效 设定值value="-1"为不限文件数-->
<param name="MaxSizeRollBackups" value="100"/>
<!--每个文件的大小。只在混合方式与文件大小方式下使用。超出大小后在所有文件名后自动增加正整数重新命名,数字最大的最早写入。可用的单位:KB|MB|GB。不要使用小数,否则会一直写入当前日志-->
<maximumFileSize value="50MB" />
<!-- layout 控制Appender的输出格式,也可以是xml 一个Appender只能是一个layout-->
<layout type="log4net.Layout.PatternLayout">
<!--每条日志末尾的文字说明-->
<!--输出格式 模板-->
<!-- <param name="ConversionPattern" value="记录时间:%date 线程ID:[%thread] 日志级别:%-5level 记录类:%logger
操作者ID:%property{Operator} 操作类型:%property{Action}%n 当前机器名:%property%n当前机器名及登录用户:%username %n
记录位置:%location%n 消息描述:%property{Message}%n 异常:%exception%n 消息:%message%newline%n%n" />-->
<!--样例:2008-03-26 13:42:32,111 [10] INFO Log4NetDemo.MainClass [(null)] - info-->
<!--<conversionPattern value="%newline %n记录时间:%date %n线程ID:[%thread] %n日志级别: %-5level %n错误描述:%message%newline %n"/>-->
<conversionPattern value="%n==========
%n【日志级别】%-5level
%n【记录时间】%date
%n【执行时间】[%r]毫秒
%n【出错文件】%F
%n【出错行号】%L
%n【出错的类】%logger 属性[%property{NDC}]
%n【错误描述】%message
%n【错误详情】%newline"/>
</layout>
<filter type="log4net.Filter.LevelRangeFilter,log4net">
<levelMin value="ERROR" />
<levelMax value="FATAL" />
</filter>
</appender>
<!--DEBUG:::记录DEBUG日志-->
<!--按日期分割日志文件 一天一个-->
<!-- appender 定义日志输出方式 将日志以回滚文件的形式写到文件中。-->
<appender name="DebugAppender" type="log4net.Appender.RollingFileAppender">
<!--保存路径:下面路径项目启动的时候自动在C盘中创建log、logError文件-->
<file value="log/debug/debug_" />
<!-- 如果想在本项目中添加路径,那就直接去掉C:\\ 只设置log\\LogError 项目启动中默认创建文件 -->
<appendToFile value="true"/>
<!--按照何种方式产生多个日志文件(日期[Date],文件大小[Size],混合[Composite])-->
<rollingStyle value="Date"/>
<!--这是按日期产生文件夹-->
<datePattern value="yyyy-MM-dd'.log'"/>
<!--是否只写到一个文件中-->
<staticLogFileName value="false"/>
<!--保留的log文件数量 超过此数量后 自动删除之前的 好像只有在 按Size分割时有效 设定值value="-1"为不限文件数-->
<param name="MaxSizeRollBackups" value="100"/>
<!--每个文件的大小。只在混合方式与文件大小方式下使用。超出大小后在所有文件名后自动增加正整数重新命名,数字最大的最早写入。可用的单位:KB|MB|GB。不要使用小数,否则会一直写入当前日志-->
<maximumFileSize value="50MB" />
<!-- layout 控制Appender的输出格式,也可以是xml 一个Appender只能是一个layout-->
<layout type="log4net.Layout.PatternLayout">
<!--每条日志末尾的文字说明-->
<!--输出格式 模板-->
<!-- <param name="ConversionPattern" value="记录时间:%date 线程ID:[%thread] 日志级别:%-5level 记录类:%logger
操作者ID:%property{Operator} 操作类型:%property{Action}%n 当前机器名:%property%n当前机器名及登录用户:%username %n
记录位置:%location%n 消息描述:%property{Message}%n 异常:%exception%n 消息:%message%newline%n%n" />-->
<!--样例:2008-03-26 13:42:32,111 [10] INFO Log4NetDemo.MainClass [(null)] - info-->
<!--<conversionPattern value="%newline %n记录时间:%date %n线程ID:[%thread] %n日志级别: %-5level %n错误描述:%message%newline %n"/>-->
<conversionPattern value="%n==========
%n【日志级别】%-2level
%n【记录时间】%date
%n【执行时间】[%r]毫秒
%n【debug文件】%F
%n【debug行号】%L
%n【debug类】%logger 属性[%property{NDC}]
%n【debug描述】%message"/>
</layout>
<filter type="log4net.Filter.LevelRangeFilter,log4net">
<levelMin value="DEBUG" />
<levelMax value="WARN" />
</filter>
</appender>
<!--INFO:::记录INFO日志-->
<!--按日期分割日志文件 一天一个-->
<!-- appender 定义日志输出方式 将日志以回滚文件的形式写到文件中。-->
<appender name="INFOAppender" type="log4net.Appender.RollingFileAppender">
<!--保存路径:下面路径项目启动的时候自动在C盘中创建log、logError文件-->
<file value="log/info/info_" />
<!-- 如果想在本项目中添加路径,那就直接去掉C:\\ 只设置log\\LogError 项目启动中默认创建文件 -->
<appendToFile value="true"/>
<!--按照何种方式产生多个日志文件(日期[Date],文件大小[Size],混合[Composite])-->
<rollingStyle value="Date"/>
<!--这是按日期产生文件夹-->
<datePattern value="yyyy-MM-dd'.log'"/>
<!--是否只写到一个文件中-->
<staticLogFileName value="false"/>
<!--保留的log文件数量 超过此数量后 自动删除之前的 好像只有在 按Size分割时有效 设定值value="-1"为不限文件数-->
<param name="MaxSizeRollBackups" value="100"/>
<!--每个文件的大小。只在混合方式与文件大小方式下使用。超出大小后在所有文件名后自动增加正整数重新命名,数字最大的最早写入。可用的单位:KB|MB|GB。不要使用小数,否则会一直写入当前日志-->
<maximumFileSize value="50MB" />
<!-- layout 控制Appender的输出格式,也可以是xml 一个Appender只能是一个layout-->
<layout type="log4net.Layout.PatternLayout">
<!--每条日志末尾的文字说明-->
<!--输出格式 模板-->
<!-- <param name="ConversionPattern" value="记录时间:%date 线程ID:[%thread] 日志级别:%-5level 记录类:%logger
操作者ID:%property{Operator} 操作类型:%property{Action}%n 当前机器名:%property%n当前机器名及登录用户:%username %n
记录位置:%location%n 消息描述:%property{Message}%n 异常:%exception%n 消息:%message%newline%n%n" />-->
<!--样例:2008-03-26 13:42:32,111 [10] INFO Log4NetDemo.MainClass [(null)] - info-->
<!--<conversionPattern value="%newline %n记录时间:%date %n线程ID:[%thread] %n日志级别: %-5level %n错误描述:%message%newline %n"/>-->
<conversionPattern value="%n==========
%n【日志级别】%-2level
%n【记录时间】%date
%n【执行时间】[%r]毫秒
%n【info文件】%F
%n【info行号】%L
%n【info类】%logger 属性[%property{NDC}]
%n【info描述】%message"/>
</layout>
<filter type="log4net.Filter.LevelRangeFilter,log4net">
<levelMin value="INFO" />
<levelMax value="WARN" />
</filter>
</appender>
<!--Set root logger level to DEBUG and its only appender to A1-->
<root>
<!--控制级别,由低到高: ALL|DEBUG|INFO|WARN|ERROR|FATAL|OFF-->
<level value="ALL" />
<appender-ref ref="DebugAppender" />
<appender-ref ref="ErrorAppender" />
<appender-ref ref="INFOAppender" />
</root>
</log4net>
</configuration>
(3)并且右击【log4net.config】文件,点击【属性】,然后将[复制到输出目录]设置为【始终复制】。
(4)然后安装log4net。在项目目录中右击【引用】,然后点击【管理NuGet程序包】
(5)然后点击浏览,搜索【log4net】,右侧点击安装即可。
(6)然后在【Global.asax】文件中注册log4net。
protected void Application_Start()
{
XmlConfigurator.Configure(new System.IO.FileInfo(Server.MapPath("~/log4net.config")));
}
3.1.4-封装发送消息
(1)在当前项目新建一个【Services】文件夹,作为服务层。大家也可以将Services创建为单独的类库,然后在这个项目上去引入【RocketMQ.Core】,在用【RocketMQ.Producer】项目区引入【Services】。那[西瓜程序猿]为了方便就直接在当前项目写了。然后再【Services】文件夹里面创建【BaseProducerService】文件,用于封装生产者发送消息服务。
目录:
代码:
/// <summary>
/// 生产者服务
/// </summary>
public class BaseProducerService
{
private readonly ILog logger = log4net.LogManager.GetLogger(typeof(BaseProducerService));
public void SendQueueOnsProducer(string body, string msg_tag, string mgs_eventType)
{
if (string.IsNullOrEmpty(body)) { throw new ArgumentNullException("body is null"); }
if (string.IsNullOrEmpty(msg_tag)) { throw new ArgumentNullException("msg_tag is null"); }
if (string.IsNullOrEmpty(mgs_eventType)) { throw new ArgumentNullException("mgs_eventType is null"); }
string ons_topic = QueueOnsProducer.getOnsTopic;
string ons_client_code = QueueOnsProducer.getOnsClientCode;
//TODO:这里需要生成唯一ID
string businessId = "MQ_1001";
logger.Info($"【发送RocketMQ消息队列消息】准备开始执行了:(消息key:{businessId})(tag:{msg_tag})(event_type:{mgs_eventType})");
logger.Info($"【发送RocketMQ消息队列消息】消息内容:{body}");
// TODO:在这里可以持久化生产者消息
logger.Info($"【发送RocketMQ消息队列消息】消息持久化成功!(消息主键id:{businessId})");
Task.Run(() =>
{
try
{
QueueOnsProducer.SendMessage(new QueueOnsCommonModel()
{
MessageId = businessId,
Tag = msg_tag,
EventType = mgs_eventType,
Body = body
}, msg_tag);
logger.Info($"【发送RocketMQ消息队列消息】消息发送成功!");
}
catch (Exception ex)
{
logger.Error($"【发送RocketMQ消息队列消息】发生异常:{ex.Message}", ex);
}
});
}
}
3.2-配置连接信息
(1)然后右击【RocketMQ.Producer】项目下,点击【引用】,然后将【RocketMQ.Core】项目勾选上确定。
(2)然后将前期准备的基本信息放在配置文件中。在【Web.config】文件进行配置。
代码:
<!--消息队列:RocketMQ-->
<!--设置为云消息队列 RocketMQ 版控制台实例详情页的实例用户名。-->
<add key="ons_access_key" value="xxx" />
<!--设置为云消息队列 RocketMQ 版控制台实例详情页的实例密码。-->
<add key="ons_secret_key" value="xxx" />
<!--您在云消息队列 RocketMQ 版控制台创建的Topic。-->
<add key="ons_topic" value="XG_CXY_Test" />
<!--设置为您在云消息队列 RocketMQ 版控制台创建的Group ID。-->
<add key="ons_groupId" value="XG_CXY_Group_Test" />
<!--设置为您从云消息队列 RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”-->
<add key="ons_name_srv" value="xxx-xxx-xxx-xxx.rmq.aliyuncs.com:8080" />
<!--消费者/生产者目标来源-->
<add key="ons_client_code" value="XG_CXY_Producer_Develop" />
(3)然后创建一个【Config】文件夹,写一个获得【ConfigGeter】配置文件的帮助类。
代码:
/// <summary>
/// 配置文件
/// </summary>
public class ConfigGeter
{
private static T TryGetValueFromConfig<T>(Func<string, T> parseFunc, Func<T> defaultTValueFunc, [CallerMemberName] string key = "", string supressKey = "")
{
try
{
if (!string.IsNullOrWhiteSpace(supressKey))
{
key = supressKey;
}
var node = ConfigurationManager.AppSettings[key];
return !string.IsNullOrEmpty(node) ? parseFunc(node) : defaultTValueFunc();
}
catch (Exception ex)
{
return default(T);
}
}
#region 消息队列:RocketMQ
/// <summary>
/// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例用户名。
/// </summary>
public static string ons_access_key
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
/// <summary>
/// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例密码。
/// </summary>
public static string ons_secret_key
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
/// <summary>
/// 您在云消息队列 RocketMQ 版控制台创建的Topic。
/// </summary>
public static string ons_topic
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
/// <summary>
/// 设置为您在云消息队列 RocketMQ 版控制台创建的Group ID。
/// </summary>
public static string ons_groupId
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
/// <summary>
/// 设置为您从云消息队列 RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
/// </summary>
public static string ons_name_srv
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
/// <summary>
/// 消息来源(生产者/消费端客户端编码)
/// </summary>
public static string ons_client_code
{
get
{
return TryGetValueFromConfig(_ => _, () => string.Empty);
}
}
#endregion
}
3.3-启动生产者
3.3.1-MVC/WebAPI项目
在【Global.asax】文件Application_Start方法中创建生产者,主要就是从配置文件中获得配置信息,然后调用【QueueOnsProducer.CreateProducer】方法创建消息队列生产者,通过调用【QueueOnsProducer.StartProducer】方法来启动生产者。
代码:
protected void Application_Start()
{
//创建生产者[西瓜程序猿]
string ons_access_key = ConfigGeter.ons_access_key;
string ons_secret_key = ConfigGeter.ons_secret_key;
string ons_topic = ConfigGeter.ons_topic;
string ons_groupId = ConfigGeter.ons_groupId;
string ons_name_srv = ConfigGeter.ons_name_srv;
string ons_client_code = ConfigGeter.ons_client_code;
QueueOnsProducer.CreateProducer(new ONSPropertyConfigModel()
{
AccessKey = ons_access_key,
SecretKey = ons_secret_key,
Topics = ons_topic,
GroupId = ons_groupId,
NAMESRV_ADDR = ons_name_srv,
OnsClientCode = ons_client_code,
});
//启动生产者
QueueOnsProducer.StartProducer();
}
3.3.2-JOB(控制台应用程序)项目
在【Program.cs】项目启动文件的Main方法中创建生产者,主要就是从配置文件中获得配置信息,然后调用【QueueOnsProducer.CreateProducer】方法创建消息队列生产者,通过调用【QueueOnsProducer.StartProducer】方法来启动生产者。
3.4-发送消息
(1)先设计好消息传输内容(Body)实体,比如我这边需要根据姓名/账号做一些异步业务处理,那我这笔就新建一个名为【RocketMQSampleModel】类。
目录:
代码:
/// <summary>
/// 发送RocketMQ测试消息实体
/// </summary>
public class RocketMQSampleModel
{
public string user_name { get; set; }
public string user_account { get; set; }
}
(2)然后就开始创建具体的发送RocketMQ消息的服务,可以根据自己的业务去创建,那[西瓜程序猿]这边就创建一个名为【SampleProducerService】的发送RocketMQ消息服务,然后继承【BaseProducerService】类。
目录:
代码:
public class SampleProducerService : BaseProducerService
{
/// <summary>
/// 发送RocketMQ测试消息
/// </summary>
/// <param name="model"></param>
public void SendTestMessageHandle(RocketMQSampleModel model)
{
if (model == null) return;
string msg_body = JsonUtility.SerializeJSON<RocketMQSampleModel>(model);
if (msg_body != null)
{
SendQueueOnsProducer(msg_body, QueueTagConsts.XG_Blog_Sample_Tag, QueueOnsEventType.RocketMQ_TEST);
}
}
}
(3)然后我们在Controller里面去调用一下发送消息。[西瓜程序猿]这里以【Home/Index】里面进行使用。
代码:
//调用消息队列
new SampleProducerService().SendTestMessageHandle(new RocketMQSampleModel()
{
user_name = "西瓜程序猿",
user_account = "admin"
});
(4)然后运行一下,看看能不能成功消息消息(默认就会执行到Home/Index)。[西瓜程序猿]这边需要先发布到服务器上才能调用,因为只能在服务器内网访问,那我这边发布一下。
注意:发布到服务器上后,也需要将资源包中的【ONSClient4CPP】所有文件拷贝到服务器站点的【bin】目录下。
(5)发布好了,然后运行一下,可以看到是成功了。
然后我们在来看看日志,提示发送成功了。
最后在去阿里云后台查询一下是否有这条消息记录。可以根据消息Key和消息ID两种方式进行查询。可以在后台看到是真正发送成功了。
博客对于图文有数量限制要求,那这一节先写到这里,持续更新中,下一章节有消费者的实现、防踩坑指南等等!
我是西瓜程序猿,感谢大家的阅读。编写不易,如果对大家有帮助,用您发财的小手点个赞和关注呗,非常感谢!有问题欢迎联系我一起学习与探讨~
下一章节:
版权声明:本文为原创文章,版权归 [西瓜程序猿] 所有,转载请注明出处,有任何疑问请私信咨询。
原文链接:https://blog.51cto.com/kimiliucn/7263756