进程间通信组件ZeroMQ详解

打印 上一主题 下一主题

主题 829|帖子 829|积分 2487

在一些复杂的项目中,往往会由不同功能的步伐组成,且在步伐运行期间,各个步伐还需要进行相互通信,实现进程间通信的方式有很多种,最常用的就是通过消息中间件,比如RabbitMQ,Kafka,以及ZeroMQ等,而RabbitMQ和Kafka这两款中间件往往都需要独立安装步骤才能使用,ZeroMQ却不需要独立安装部署,而是作为动态库直接在步伐中引用即可。今天以一个简单的小例子,简述ZeroMQ的常见用法,仅供学习分享使用,如有不敷之处,还请指正。

ZeroMQ (又被称为 ØMQ, 0MQ, or zmq),固然看起来像是可嵌入的网络组件,现实上却是一款并发框架。ZeroMQ作为一款开源通用消息组件,通过Socket可以将原子消息通过不同协议(进程内,进程间,TCP和广播等)进行传输。基于ZeroMQ,可以由多种模式进行选择,如:fan-out,发布-订阅,使命分发,请求-应答等,而且支持1-N,N-N等多端通信。对于C#开发职员,ZeroMQ有两种方式可供选择,1. NetMQ,提供一个端口给C#;2. clrzmq4通过C#绑定到libzmq。而NetMQ正是ZeroMQ推荐的使用方式

ZeroMQ通信模式

 
ZeroMQ提供了多种通信模式,主要有以下几种:

  • 请求应答(Request-Response)模式,此模式是ZeroMQ所有通信方式中最简单的一种模式,当客户端发出请求时,期望得到应答,且必须得到应答,才算一个完备的通信。


  • 发布订阅(Publish-Subscriber)模式,此模式发送者并不直接发送消息给吸收者,而是将要发送的消息进行分类,吸收者根据分类只吸收自己感兴趣的消息,这就是发布订阅模式。这里提及的消息分类,通常被称为主题(topic)或过滤器(filter)。ZeroMQ通过多种信息来表达主题内容,以字节数组或者字符串的形式表示。
  • 生产者-消费者(Push-Pull)模式,此模式是一个消息分发机制,主要用于使命并发和并行处置惩罚场景,正常情况下,一个或者多个生产者发送消息,而一个或多个消费者吸收并处置惩罚这些消息,这种模式特殊适合于需要高效使命分发的场景,如分布式计算,大数据处置惩罚等。
  • 路由经销商(Router-Dealer)模式,此种模式是请求回复模式的一种升级,当Router收到消息的时候,会自动在消息的前面添加一帧,用来识别发送端的地址。当发送一个消息的时候,需要先发送一帧对端的地址,然后再发送消息,如果目标地址指向的对端不存在了,这个消息就会被丢弃。对端的地址默认情况下由ZMQ来产生一个唯一标识UUID。DEALER可以恣意读写,不需要额外的地址帧,当有多个对端的时候,循环给单个对端发送消息。(注意:不是群发消息,与PUB不同)。
  • 多发布订阅(XPub-XSub),通常情况下,发布订阅模式适用于一个发布者,多个订阅者的场景。如果需要多个消息发布者,那XPub-XSub模式将会比力适用。
本文主要讲解请求应答模式和发布订阅模式,其他通信模式,如果感兴趣可以参考官方文档。
请求应答

请求应答(Request-Response),此模式是ZeroMQ所有通信方式中最简单的一种模式,当客户端发出请求时,期望得到应答,且必须得到应答,才算一个完备的通信。请求应答模式是同步壅闭模式,如果发送消息次序错误,会抛出异常。精确的请求应答次序如下:

  • 请求端(RequestSocket)发送一个消息
  • 响应端(ResponseSocket)阅读请求消息
  • 响应端(ResponseSocket)发送响应信息
  • 请求端(RequestSocket)吸收响应端(ResponseSocket)发送的信息。
请求应答模式,主要由RequestSocket和ResponseSocket组成,实现消息的请求和应答。
请求端发送消息之前,需要先进行连接Connect,然后才能发送消息。示例代码如下所示:
  1. public class ZeroMQRequest:IDisposable
  2. {
  3.         public Action<string> Received;
  4.         public Action<string> Sended;
  5.         private string url = string.Empty;
  6.         private RequestSocket request;
  7.         public ZeroMQRequest(string url)
  8.         {
  9.                 this.request = new RequestSocket();
  10.                 this.url = url;
  11.         }
  12.         public void Connect()
  13.         {
  14.                 request.Connect(this.url);
  15.         }
  16.         public void BeginReceive()
  17.         {
  18.                 string msg = this.request.ReceiveFrameString();
  19.                 Received?.Invoke(msg);
  20.         }
  21.         public void SendMsg(string msg)
  22.         {
  23.                 this.request.SendFrame(msg);
  24.                 if (Sended != null)
  25.                 {
  26.                         Sended.Invoke(msg);
  27.                 }
  28.         }
  29.         public void Disconnect()
  30.         {
  31.                 request.Disconnect(this.url);
  32.         }
  33.         public void Dispose()
  34.         {
  35.                 request.Close();
  36.                 request.Dispose();
  37.         }
  38. }
复制代码
响应端吸收消息之前,需要先进行绑定(Bind)到对应的网络端口,然后才能吸收消息。示例代码如下所示:
  1. public class ZeroMQResponse:IDisposable
  2. {
  3.         public Action<string> Received;
  4.         public Action<string> Sended;
  5.         private string url = string.Empty;
  6.         private ResponseSocket response;
  7.         public ZeroMQResponse(string url)
  8.         {
  9.                 this.url = url;
  10.                 this.response = new ResponseSocket();
  11.                 this.response.Bind(this.url);
  12.         }
  13.         public void BeginReceive()
  14.         {
  15.                 Task.Run(() =>
  16.                 {
  17.                         while (true)
  18.                         {
  19.                                 string msg = this.response.ReceiveFrameString();
  20.                                 Received?.Invoke(msg);
  21.                                 //收到回复
  22.                                 Send("Ok");
  23.                         }
  24.                 });
  25.                
  26.         }
  27.         public void Send(string msg)
  28.         {
  29.                 this.response.SendFrame(msg);
  30.                 if (Sended != null)
  31.                 {
  32.                         Sended.Invoke(msg);
  33.                 }
  34.         }
  35.         public void Dispose()
  36.         {
  37.                 this.response.Dispose();
  38.         }
  39. }
复制代码
上述代码是将ReuqestSocket和ResponseSocket进行封装,并通过委托Action公开了吸收和发送后响应接口,在使用时进行调用即可。
请求应答模式示例截图如下所示:

由于请求应答模式是壅闭模式,如果没有发送就调用吸收方法,或一连调用吸收方法,或一连发送(发送没有响应就再次发送),则会抛出异常。
一连两次发送,异常信息如下所示:

一连两次吸收,异常信息如下所示:

发布订阅

发布订阅模式,将要发送的信息按照主题(Topic)进行分类,哪个吸收端订阅了这个主题,就吸收对应的消息,而不是直接发发送给吸收者,如许有助于对消息进行分类处置惩罚。所以发布订阅模式并非壅闭模式,也不是一对一的请求响应,而是按需分类,异步响应模式。此模式主要有PublisherSocket和SubscriberSocket两个类,分别用于处置惩罚消息的发布和订阅。精确的发布订阅次序,如下所示:

  • 发送端:定义PubliserSocket对象,并绑定(Bind)到指定端口。然后发送主题和消息。
  • 吸收端:定义SubscriberSocket对象,连接到指定端口,订阅主题,吸收指定主题的消息。
消息发布类(PublisherSocket),在消息发送之前,起首绑定一个端口,然后才能发送主题和消息,示例代码如下所示:
  1. public class ZeroMQPublisher : IDisposable
  2. {
  3.         private string url=string.Empty;
  4.         private PublisherSocket publisher;
  5.         public Action<string> Sended;
  6.         public ZeroMQPublisher(string url)
  7.         {
  8.                 this.url = url;
  9.                 this.publisher = new PublisherSocket();
  10.                 this.publisher.Bind(url);
  11.         }
  12.         public void Send(string topic,string msg)
  13.         {
  14.                 this.publisher.SendMoreFrame(topic);
  15.                 this.publisher.SendFrame(msg);
  16.                 if(Sended != null)
  17.                 {
  18.                         Sended.Invoke($"send msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
  19.                 }
  20.         }
  21.         public void Dispose()
  22.         {
  23.                 this.publisher.Close();
  24.                 this.publisher.Dispose();
  25.         }
  26. }
复制代码
消息订阅类(SubscriberSocket),在消息吸收之前,起首连接端口,订阅主题(Subscribe方法),然后才能进行消息的吸收,示例代码如下所示:
  1. public class ZeroMQSubscriber : IDisposable
  2. {
  3.         private string url=string.Empty;
  4.         private SubscriberSocket subscriber;
  5.         public Action<string> Received;
  6.         private bool isRunning = false;
  7.         public ZeroMQSubscriber(string url)
  8.         {
  9.                 this.url = url;
  10.                 this.subscriber = new SubscriberSocket();
  11.                 this.subscriber.Connect(url);
  12.                 this.subscriber.Subscribe(string.Empty);
  13.                 this.isRunning = true;
  14.         }
  15.         public void BeginReceive()
  16.         {
  17.                 Task.Run(() =>
  18.                 {
  19.                         while(isRunning)
  20.                         {
  21.                                 var topic = this.subscriber.ReceiveFrameString();
  22.                                 var msg = this.subscriber.ReceiveFrameString();
  23.                                 if(Received != null)
  24.                                 {
  25.                                         Received.Invoke($"received msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");
  26.                                 }
  27.                         }
  28.                 });
  29.         }
  30.         public void DisConnect()
  31.         {
  32.                 isRunning = false;
  33.                 this.subscriber.Disconnect(this.url);
  34.         }
  35.         public void Dispose()
  36.         {
  37.                 this.isRunning=false;
  38.                 this.subscriber.Close();
  39.                 this.subscriber?.Dispose();
  40.         }
  41. }
复制代码
注意,发布订阅模式是单向触发的,即消息发布者,不可以吸收消息;消息吸收者,也不可以发布消息。吸收端在调用Subscribe方法时,如果主题为空,则表示可以订阅任何主题。
发布订阅模式示例截图如下所示:

源码下载

关注老码识途公众号,回复关键字ZeroMQ,即可获取示例源码,如下图所示:

以上就是《进程间通信组件库ZeroMQ详解》的全部内容。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曹旭辉

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表