章节
第一章:https://www.cnblogs.com/kimiliucn/p/17662052.html
第二章:https://www.cnblogs.com/kimiliucn/p/17667200.html

作者:西瓜程序猿
主页传送门:https://www.cnblogs.com/kimiliucn/
上一章节主要介绍了RocketMQ基本介绍和前期准备,以及如何创建生产者。那这一章节主要介绍一下消费端的实现、如何发布消费端,以及遇到的坑怎么去解决。
四、消费端实现
4.1-创建消费者
4.1.1-创建Windows服务项目
(1)右击解决方案,然后依次点击【添加】——>【新建项目】,然后选择【 Windows 服务(.NET Framework) 】,点击下一步。
注意:Windows服务只有在.NET Framework版本中才有了,在跨平台中使用Worker Service。

(2)修改项目名称,项目名称[西瓜程序猿]写的是【RocketMQ.Consumer】,然后框架选择的是【.NET Farmework 4.8】,这个可以根据自己的需要填写和选择,然后点击【创建】。

创建好的目录如下:【Program.cs】是主程序的入口,【Service1.cs】是服务的入口,可以创建多个,然后在Prodrams.cs中配置就好了。

(3)【Service1】服务名称可以重命名修改,此处我重命名为【RocketMQConsumerService】, Program.cs文件中也相对应的也要进行修改。


(4)然后我们就可以在【RocketMQConsumerService】中写业务逻辑代码了,有很多种方式可以定位到要写的具体代码文件,先列举两种常用的。
方法一:在【program.cs】文件中,找到这个类,按键盘上的F12可以直接进入查看文件。

方法二:直接右击,然后点击【查看代码】。
业务代码写到这里面:

到这一步消费者服务就创建好了,然后就写具体的业务代码就行了。注意:服务必须至少重写 OnStart 和 OnStop 才有用。
4.1.2-项目依赖配置
(1)在使用Visual Studio(VS)开发.NET的应用程序和类库时,默认的目标平台为“Any CPU”。但是.NET SDK仅支持Windows 64-bit操作系统,所以需要自行设置。先右击【RocketMQ.Consumer】项目,然后点击【属性】。

(2)点击左侧选项的【生成】,然后将目标平台改为【x64】。

(3)将资源包【ONSClient4CPP】文件夹里面所有的文件,复制到【bin/Debug】目录下。
资源包:

项目:

4.1.3-配置日志(log4net)
(1)为了方便测试,先介绍一下如何使用log4net做日志记录,当日志启动时和停止时我们记录一下。我们在项目目录下新建一个文件夹【LogConfig】,然后再创建一个文件为【log4net.config】。

(2)【log4net.config】内容如下。 (3)并且右击【log4net.config】文件,点击【属性】,然后将[复制到输出目录]设置为【始终复制】。

(4)然后安装log4net。在项目目录中右击【引用】,然后点击【管理NuGet程序包】

(5)然后点击浏览,搜索【log4net】,右侧点击安装。

(6)重要:然后配置【AssemblyInfo.cs 】文件,如果不配置,是输出不了日志的。

添加到底部即可:(如果你的【log4net.config】文件路径和我的不一样,记得修改成跟自己配置路径一样的)。

代码:- [assembly: log4net.Config.XmlConfigurator(ConfigFileExtension = "config", ConfigFile = "LogConfig/log4net.config", Watch = true)]
复制代码 (7)在服务启动方法【OnStart】中,配置启动log4net。

代码:- XmlConfigurator.Configure(new System.IO.FileInfo("LogConfig/log4net.config"));
复制代码 (8)然后就可以使用log4net了,首先在Windows服务中获得log4net的实例。

代码:- private static readonly log4net.ILog logger = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
复制代码 4.2-配置连接信息
(1)然后右击【RocketMQ.Consumer】项目下,点击【引用】,然后将【RocketMQ.Core】项目勾选上确定。

(2)然后将前期准备的基本信息放在配置文件中。在【App.config】文件进行配置。

代码:- [/code](3)然后创建一个【Config】文件夹,写一个获得【ConfigSetting】配置文件的帮助类。
- [img]https://img2023.cnblogs.com/blog/3240243/202308/3240243-20230830144439104-1644968756.png[/img]
- 代码:
- [code] /// <summary>
- /// 配置文件
- /// </summary>
- public class ConfigGeter
- {
- private static T TryGetValueFromConfig<T>(Func<string, T> parseFunc, Func<T> defaultTValueFunc, [CallerMemberName] string key = "", string supressKey = "")
- {
- try
- {
- if (!string.IsNullOrWhiteSpace(supressKey))
- {
- key = supressKey;
- }
- var node = ConfigurationManager.AppSettings[key];
- return !string.IsNullOrEmpty(node) ? parseFunc(node) : defaultTValueFunc();
- }
- catch (Exception ex)
- {
- return default(T);
- }
- }
- #region 消息队列:RocketMQ
- /// <summary>
- /// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例用户名。
- /// </summary>
- public static string ons_access_key
- {
- get
- {
- return TryGetValueFromConfig(_ => _, () => string.Empty);
- }
- }
- /// <summary>
- /// 设置为云消息队列 RocketMQ 版控制台实例详情页的实例密码。
- /// </summary>
- public static string ons_secret_key
- {
- get
- {
- return TryGetValueFromConfig(_ => _, () => string.Empty);
- }
- }
- /// <summary>
- /// 您在云消息队列 RocketMQ 版控制台创建的Topic。
- /// </summary>
- public static string ons_topic
- {
- get
- {
- return TryGetValueFromConfig(_ => _, () => string.Empty);
- }
- }
- /// <summary>
- /// 设置为您在云消息队列 RocketMQ 版控制台创建的Group ID。
- /// </summary>
- public static string ons_groupId
- {
- get
- {
- return TryGetValueFromConfig(_ => _, () => string.Empty);
- }
- }
- /// <summary>
- /// 设置为您从云消息队列 RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
- /// </summary>
- public static string ons_name_srv
- {
- get
- {
- return TryGetValueFromConfig(_ => _, () => string.Empty);
- }
- }
- /// <summary>
- /// 消息来源(生产者/消费端客户端编码)
- /// </summary>
- public static string ons_client_code
- {
- get
- {
- return TryGetValueFromConfig(_ => _, () => string.Empty);
- }
- }
- #endregion
- }
复制代码 4.3-封装核心代码
(1)新建一个【ConsumerStartup】文件,这个类继承自【MessageListener】类,然后实现consume方法,这个方法主要是消费者具体要执行的任务。

代码:- /// <summary>
- /// 消费端启动
- /// </summary>
- public class ConsumerStartup : MessageListener
- {
- private readonly static ILog logger = LogManager.GetLogger(typeof(ConsumerStartup));
- private readonly static ConsumerManager manager = new ConsumerManager();
- private readonly string _consumerClientCode;
- private readonly string _ons_groupId;
- /// <summary>
- /// 构造函数
- /// </summary>
- /// <param name="consumerClientCode">消费者客户端Code</param>
- /// <param name="ons_groupId">消费者消费的分组</param>
- public ConsumerStartup(string consumerClientCode, string ons_groupId)
- {
- _consumerClientCode = consumerClientCode;
- _ons_groupId = ons_groupId;
- }
- ~ConsumerStartup()
- {
- }
- /// <summary>
- /// 消费者任务
- /// </summary>
- /// <param name="value"></param>
- /// <param name="context"></param>
- /// <returns></returns>
- public override ons.Action consume(Message value, ConsumeContext context)
- {
- Console.WriteLine("【消费者任务】:消费者消息进来了...");
- logger.Info($"【消费者任务】:消费者消息进来了...");
- string topic = value.getTopic();
- string business_id = value.getKey();
- string message_id = value.getMsgID();
- string msg_tag = value.getTag();
- byte[] bytes = Encoding.Default.GetBytes(value.getBody());
- string msg_body = Encoding.Default.GetString(bytes);
- if (string.IsNullOrEmpty(msg_body))
- {
- return ons.Action.CommitMessage;
- };
- string log_body = $"本次消费的消息:【消费序列:{value.getQueueOffset()}】【消息key:{business_id}】【消息ID:{message_id}】【Tag:{msg_tag}】";
- Console.WriteLine(log_body);
- logger.Info(log_body);
- logger.Info($"【消费内容】:{msg_body}");
- int status = 1;
- string error_msg = "";
- long sys_msg_id = 0;
- QueueOnsCommonModel consumerModel = null;
- try
- {
- //调度到具体的消费者
- consumerModel = JsonUtility.DeserializeJSON<QueueOnsCommonModel>(msg_body);
- if (consumerModel != null)
- {
- logger.Info($"【消费者任务】:真正开始执行了(消息key:{consumerModel.MessageId})");
- if (!long.TryParse(consumerModel.MessageId, out sys_msg_id))
- {
- logger.Info("sys_msg_id 转换失败!");
- }
- manager.ExecuteConsumer(consumerModel.Tag, consumerModel.EventType, consumerModel);
- logger.Info($"【消费者任务】:执行完成了(消息key:{consumerModel.MessageId})");
- }
- else
- {
- status = 2;
- error_msg = "【调度到具体的消费者】解析消息body内容为空,无法进行消费";
- logger.Error($"【调度到具体的消费者】解析消息body内容为空,无法进行消费");
- }
- }
- catch (Exception ex)
- {
- logger.Error($"【消费者任务】:发生异常了:{ex.Message}", ex);
- status = 2;
- error_msg = ex.Message;
- }
- return ons.Action.CommitMessage;
- }
- }
复制代码 4.4-启动消费者
在【RocketMQConsumerService.cs】文件OnStart方法中创建生产者,主要就是从配置文件中获得配置信息,然后调用【QueueOnsProducer.CreatePushConsumer】方法创建消息队列生产者,通过调用【QueueOnsProducer.SetPushConsumer】方法来设置生产者,最后通过调用【QueueOnsProducer.StartPushConsumer】方法来启动生产者。

代码:- //创建消费者
- string ons_access_key = ConfigSetting.ons_access_key;
- string ons_secret_key = ConfigSetting.ons_secret_key;
- string ons_topic = ConfigSetting.ons_topic;
- string ons_groupId = ConfigSetting.ons_groupId;
- string ons_name_srv = ConfigSetting.ons_name_srv;
- string ons_client_code = ConfigSetting.ons_client_code;
- QueueOnsProducer.CreatePushConsumer(new ONSPropertyConfigModel()
- {
- AccessKey = ons_access_key,
- SecretKey = ons_secret_key,
- Topics = ons_topic,
- GroupId = ons_groupId,
- NAMESRV_ADDR = ons_name_srv,
- OnsClientCode = ons_client_code,
- });
- //设置消费者
- QueueOnsProducer.SetPushConsumer(new ConsumerStartup(ons_client_code, ons_groupId), "*");
- //启动消费者
- QueueOnsProducer.StartPushConsumer();
复制代码 4.5-接收消费消息
我们如果要创建一个具体消费者去消费某一条消息,需要先创建一个类,然后实现【IConsumerMsg】接口中的【Consume】方法。需要在这个方法上面标注两个特性,也可以是一个(意味着满足一个条件即可),一个是【ConsumerTag】Tag标签,表示要消费哪个生产的Tag标签,一个是【EventType】,表示要消费哪个生产的事件类型。如果有多个不同的消费者,就按照上面的方式创建多个即可。[西瓜程序猿]这边创建一个名为【SampleConsumer】的类作为例子。

代码:- /// <summary>
- /// 消费者Sample
- /// </summary>
- [ConsumerTag(QueueTagConsts.XG_Blog_Sample_Tag)]
- [EventType(QueueOnsEventType.RocketMQ_TEST)]
- public class SampleConsumer : IConsumerMsg
- {
- private readonly static ILog logger = LogManager.GetLogger(typeof(SampleConsumer));
- public void Consume(QueueOnsCommonModel model)
- {
- logger.Info($"【西瓜程序猿-消费者Sample】:测试消费者进来了");
- if (model != null)
- {
- Console.WriteLine("tag:" + model.Tag);
- Console.WriteLine("body" + model.Body);
- }
- Console.WriteLine("【西瓜程序猿-消费者Sample】消费成功了!");
- }
- }
复制代码 五、发布消费端
然后来介绍一下如何部署消费端。之前看评论区说使用NSSM部署安装Window服务更方便,后面我也试了一下确实还挺好用,但是针对目前这个程序始终运行不起来(各位大佬如果有更好的方法和建议可以在评论区提出来哈),所以这次还是用之前的方法来介绍如何部署Windows服务了。
5.1-服务基本配置
(1)点击我们的服务【RocketMQConsumerService.cs】,然后右击点击【添加安装程序】。

(2)然后可以看到下面多出来了一个文件,就是安装程序。


(3)然后可以修改基本信息,服务组件中的【服务名称】【服务描述】等等。我们右击【serviceInstall1】点击属性,然后进行修改。


(4)然后点击【serviceProcessInstall1】右击属性,进行修改。


5.2-服务运行与发布
当我们直接按F5或者其他方式直接运行项目时,会提示:"无法从命令行或调试程序启动服务。必须首先安装 Windows服务(使用installutil.exe),然后用ServerExplorer、Windows服务管理工具或 NET START命令启动它。"。不是这样运行的,跟着下面步骤来操作运行与发布Windows服务吧。

前提注意:如果你设置的目标平台是x64,打开的目录会不一样,不然导致服务运行不起来。可以右击项目名,点击【属性】——>【生成】——>【目标平台】查看。

如果不是x64版本,复制这个地址:
C:\Windows\Microsoft.NET\Framework\v4.0.30319
如果是x64版本,复制这个地址:
C:\Windows\Microsoft.NET\Framework64\v4.0.30319
不然会报类似这种错误:在初始化安装时发生异常: System.BadImageFormatException: 未能加载文件或程序集...
(1)然后我们把上面的地址(根据自己的环境选择)添加到环境变量中。点击【控制面板】——>【系统和安全】

(2)然后点击【系统】

(3)点击【高级系统设置】

(4)点击【环境变量】

(5)在【系统变量】中找到Path,然后点击【编辑】。

(6)然后点击【新建】,然后把我们拷贝的目录复制到这里。然后点击确认即可。

(7)测试是否配置成功,输入这个命令查看一下【InstallUtil】,如果是下面这样的内容说明成功了。

(8)然后编辑解决方案和项目。

(9)以管理员身份运行cmd命令,然后安装服务。
InstallUtil 项目启动执行文件全路径
西瓜程序猿的例子:
InstallUtil D:\项目演示临时保存\MyDemoService\MyDemoService\bin\Debug\MyDemoService.exe

(10)出现这个说明安装成功了。

(11)打开服务管理器,找到要启动的服务,然后右击启动服务。

(12)如果要卸载服务,可以运行这个命令:
InstallUtil /u 项目启动执行文件全路径
西瓜程序猿的例子:
InstallUtil /u D:\项目演示临时保存\MyDemoService\MyDemoService\bin\Debug\MyDemoService.exe

5.3-常见命令
1、安装服务:InstallUtil 项目启动执行文件全路径
2、启动服务:net start 服务名
3、停止服务:net stop 服务名
4、卸载服务:InstallUtil /u 项目启动执行文件全路径
5.4-测试消费消息
(1)首先可以先看一下日志,看一下这个消费者服务是否启动成功了。

(2)然后再日志里面记录下消费的消费,在根据消息Key或者消息ID在阿里云后台查询一下这一条消息的【消息轨迹】,如果提示消费成功就说明确实已经进行消费了。

最后,还有可能会出现消息生产失败、消息消费失败等场景,大佬们可以根据实际情况进行设计和跳转哈。
六、防踩坑指南
5.1:ons.ONSClient4CPPPINVOKE的类型初始值设定项引发异常
异常详情:
“ons.ONSClient4CPPPINVOKE”的类型初始值设定项引发异常。
解决方案:
第一步:在使用Visual Studio(VS)开发.NET的应用程序和类库时,默认的目标平台为“Any CPU”。但是.NET SDK仅支持Windows 64-bit操作系统,所以需要自行设置。先右击【RocketMQ.Producer】项目,然后点击【属性】,点击左侧选项的【生成】,然后将目标平台改为【x64】。

第二步:将资源包【ONSClient4CPP】文件夹里面所有的文件,复制到【bin】目录下。

5.2:Topic Route does not exist
异常详情:
Topic Route does not exist, Topic:XG_CXY_Test exception:msg: No route info of this topic, ,error:-1,in file line:581
See https://github.com/alibaba/ons/issues/7 for further details.”
异常截图:

解决方案:
这个问题一般是没有链接上RocketMQ,检查一下配置文件中信息是否与RocketMQ信息一致。尤其是[ons_name_srv] RocketMQ 版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”切记不要加"http://或者https
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |