rocketmq基本架构

打印 上一主题 下一主题

主题 986|帖子 986|积分 2958


简介

Name server



  • 负责broker注册、心跳,路由等功能,类似Kafka的ZK
  • name server节点之间不互相通讯,broker需要和所有name server进行通讯。扩容name server需要重启broker,不然broker不会和name server创建毗连
  • producer和consumer也都是和name server创建长毗连,获取路由信息,拿到对应的broker信息,与broker创建长毗连,然后发送/消耗消息
路由发现

Pull的模式。当topic路由信息发生变化时,name server不回主动推送给客户端,而是客户端定时拉取。默认客户端每30秒会拉取一次最新的路由信息
   扩展:
  1)push模型:实时性好,但是需要维护一个长链接,消耗服务端资源。client数目不多,实时性要求高,server数据变化比较频仍的场景适合此种模式
  2)pull模型:实时性差
  3)long
polling模型:长轮询模式。客户端定时发送拉取哀求,服务端会hold住毗连一段时间,在此期间的数据变动通过此毗连推送。超过hold时间后才断开毗连。分身以上两种方式
  Broker



  • broker每30s给name server发送一次心跳,name server每120s检查一次所有的broker心跳时间,超过阈值踢出broker
  • broker节点集群是主从集群,master负责处置惩罚读写哀求,slave负责对master中的数据进行备份。master和slave有类似的broker name,但broker id不同,broker id为0的是master,非0的是slave。每个broker与name server集群中的所有节点创建长毗连,定时注册topic信息到所有name server
源码分析

NameServer

NameServer的启动过程分析

NameServer服务器相关的源码在namesrv模块下,目录结构如下:

NamesrvStartup类就是Name Server服务器启动的启动类,NamesrvStartup类中有一个main启动类,main方法调用main0,main0主要流程代码

main0 方法的主要作用就是创建Name Server服务器的控制器,并且启动Name Server服务器的控制器。NamesrvController类的作用就是为Name Server服务的启动提供详细的逻辑实现,主要包罗配置信息的加载、远程通敬佩务器的创建和加载、默认处置惩罚器的注册以及心跳检测呆板监控Broker的健康状态等。Name Server服务器的控制器的创建方法为createNamesrvController方法,createNamesrvController方法的主要流程代码如下:
  1. //代码位置:org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController
  2. public static NamesrvController createNamesrvController(String[] args){
  3.      //设置rocketMQ的版本信息,REMOTING_VERSION_KEY的值为:rocketmq.remoting.version,CURRENT_VERSION的值为:V4_7_0
  4.     System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
  5.      //构建命令行,添加帮助命令和Name Server的提示命令,将createNamesrvController方法的args参数进行解析
  6.      //代码省略
  7.      //nameServer 服务器配置类和netty 服务器配置类
  8.      final NamesrvConfig namesrvConfig = new NamesrvConfig();
  9.      final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  10.      //设置netty服务器的监听端口
  11.      nettyServerConfig.setListenPort(9876);
  12.      // 判断上述构建的命令行是否有configFile(缩写为C)配置文件,如果有的话,则读取configFile配置文件的配置信息,
  13.      // 并将转为NamesrvConfig和NettyServerConfig的配置信息
  14.      // 代码省略
  15.     // 如果构建的命令行存在字符'p',就打印所有的配置信息病区退出方法
  16.     // 代码省略
  17.     //首先将构建的命令行转换为Properties,然后将通过反射的方式将Properties的属性转换为namesrvConfig的配置项和配置值。
  18.     MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
  19.     //打印nameServer 服务器配置类和 netty 服务器配置类的配置信息
  20.     MixAll.printObjectProperties(log, namesrvConfig);
  21.     MixAll.printObjectProperties(log, nettyServerConfig);
  22.      //将namesrvConfig和nettyServerConfig作为参数创建nameServer 服务器的控制器
  23.       final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
  24.       //将所有的配置保存在内存中(Properties)
  25.       controller.getConfiguration().registerConfig(properties);
  26.       return controller;
  27. }
复制代码
createNamesrvController方法主要做了几件事,读取和解析配置信息,包罗Name Server服务的配置信息、Netty 服务器的配置信息、打印读取或者解析的配置信息、生存配置信息到当地文件中,以及根据namesrvConfig配置和nettyServerConfig配置作为参数创建nameServer 服务器的控制器。创建好Name server控制器以后,就可以启动它了。启动Name Server的方法的主流程如下:
  1. //代码位置:org.apache.rocketmq.namesrv.NamesrvStartup#start
  2. public static NamesrvController start(final NamesrvController controller){
  3.      //初始化nameserver 服务器,如果初始化失败则退出
  4.       boolean initResult = controller.initialize();
  5.        if (!initResult) {
  6.            controller.shutdown();
  7.            System.exit(-3);
  8.        }
  9.      //添加关闭的钩子,进行内存清理、对象销毁等惭怍
  10.      Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
  11.             @Override
  12.             public Void call() throws Exception {
  13.                 controller.shutdown();
  14.                 return null;
  15.             }
  16.         }));
  17.      //启动
  18.      controller.start();
  19. }
复制代码
start方法没什么逻辑,主要作用就是进行初始化工作,然后进行启动Name Server控制器,接下来看看进行了哪些初始化工作以及如何启动Name Server的,初始化initialize方法的主要流程如下:
  1. //代码位置:org.apache.rocketmq.namesrv.NamesrvStartup#initialize
  2. public boolean initialize() {
  3.      // key-value 配置加载
  4.      this.kvConfigManager.load();
  5.      // //创建netty远程服务器,用来进行网络传输以及通信
  6.      this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  7.       //远程服务器线程池
  8.         this.remotingExecutor =
  9.             Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
  10.        //注册处理器
  11.        this.registerProcessor();
  12.       //每10秒扫描不活跃的broker
  13.         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  14.             @Override
  15.             public void run() {
  16.                 NamesrvController.this.routeInfoManager.scanNotActiveBroker();
  17.             }
  18.         }, 5, 10, TimeUnit.SECONDS);
  19.         //每10秒打印配置信息(key-value)
  20.         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  21.             @Override
  22.             public void run() {
  23.                 NamesrvController.this.kvConfigManager.printAllPeriodically();
  24.             }
  25.         }, 1, 10, TimeUnit.MINUTES);
  26.         //省略部分代码
  27.         return true;
  28. }
复制代码
initialize方法的主要逻辑如下:
加载配置文件。读取文件名为"user.home/namesrv/kvConfig.json"(其中user.home为用户的目录),然后将读取的文件内容转为KVConfigSerializeWrapper类,最后将所有的key-value生存在如下map中:
   //用来生存不同定名空间的key-value private final HashMap<String/* Namespace /,
HashMap<String/
Key /, String/ Value */>> configTable = new
HashMap<String, HashMap<String, String>>();
  

  • 创建Netty服务器。Name Server 用netty与生产者、消耗者以及Boker进行通讯。
  • 注册处置惩罚器。这里主要注册的是默认的处置惩罚器DefaultRequestProcessor,注册的逻辑主要是初始化DefaultRequestProcessor并生存着,待需要使用的时候直接使用。处置惩罚器的作用就是处置惩罚生产者、消耗者以及Broker服务器的不同哀求,比如获取生产者和消耗者获取所有的路由信息,Broker服务器注册路由信息等。处置惩罚器DefaultRequestProcessor处置惩罚不同的哀求将会在下面进行讲述。
  • 实行定时任务。主要有两个定时任务,一个是每十秒扫描不活泼的Broker。并且将逾期的Broker清理掉。别的一个是每十秒打印key-valu的配置信息。
上面就是initialize方法的主要逻辑,特别需要注意每10秒扫描不活泼的broker的定时任务:
  1. //NamesrvController.this.routeInfoManager.scanNotActiveBroker();
  2. //代码位置:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker
  3. public void scanNotActiveBroker() {
  4.      //所有存活的Broker
  5.      Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
  6.      //遍历Broker
  7.      while (it.hasNext()) {
  8.             Entry<String, BrokerLiveInfo> next = it.next();
  9.             long last = next.getValue().getLastUpdateTimestamp();
  10.             //最后更新时间加上broker过期时间(120秒)小于当前时间,则关闭与broker的远程连接。并且将缓存在map中的broker信息删除
  11.             if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
  12.                 RemotingUtil.closeChannel(next.getValue().getChannel());
  13.                 it.remove();
  14.                 //将过期的Channel连接清理掉。以及删除缓存的Broker
  15.                 this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
  16.             }
  17.         }
  18. }
复制代码
scanNotActiveBroker方法的逻辑主要是遍历缓存在brokerLiveTable的Broker,将Broker最后更新时间加上120秒的结果是否小于当前时间,假如小于当前时间,阐明Broker已经逾期,大概是已经下线了,以是可以扫除Broker信息,并且关闭Name Server 服务器与Broker服务器毗连,这样被扫除的Broker就不会与Name Server服务器进行远程通讯了。brokerLiveTable的结果如下:
  1. //保存broker地址与broker存活信息的对应关系
  2. private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
复制代码
brokerLiveTable缓存着以brokerAddr为key(Broker 地址),以BrokerLiveInfo为value的结果,BrokerLiveInfo是Broker存活对象,主要有如下几个属性:
  1. class BrokerLiveInfo {
  2.     //最后更新时间
  3.     private long lastUpdateTimestamp;
  4.     //版本信息
  5.     private DataVersion dataVersion;
  6.     //连接
  7.     private Channel channel;
  8.     //高可用服务器地址
  9.     private String haServerAddr;
  10.     //省略代码
  11. }
复制代码
从BrokerLiveInfo中删除了逾期的Broker后,还需要做清理Name Server服务器与Broker服务器的毗连,onChannelDestroy方法主要是清理缓存在如下map的信息:
  1. ////代码位置:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager//保存broker地址与broker存活信息的对应关系
  2. private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
  3. //生存broker地址与过滤服务器的对应关系,Filter Server 与消息过滤有关系private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;//生存broker 名字与 broker元数据的关系private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;//生存集群名字与集群下所有broker名字对应的关系private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;//生存topic与topic下所有队列元数据的对应关系 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
复制代码
在扫描逾期的broker时,首先找到不活泼的broker,然后onChannelDestroy方法清理与该不活泼broker有关的缓存,清理的主要流程如下:


  • 清理不活泼的broker存活信息。首先遍历brokerLiveTable找到不活泼的broker,然后删除brokerLiveTable中的与该不活泼的broker有关的缓存信息。
  • 清理与消息过滤有关的缓存。找到不活泼的broker存活信息,删除filterServerTable中的与该broker地址有关的消息过滤的服务信息。
  • 清理与不活泼broker的元素居。brokerAddrTable生存着broker名字与broker元素居对应的信息,BrokerData类生存着cluster、brokerName、brokerId与broker name。遍历brokerAddrTable找到与该不活泼broker的名字相等的broker元素进行删除。
  • 清理集群下对应的不活泼broker名字。clusterAddrTable生存集群名字与集群下所有broker名字对应的关系,遍历clusterAddrTable的所有key,从clusterAddrTable中找到与不活泼broker名字相等的元素,然后删除。
  • 清理与该不活泼broker的topic对应队列数据。topicQueueTable生存topic与topic下所有队列元数据的对应关系,QueueData生存着brokerName、readQueueNums(可读队列数目)、writeQueueNums(可写队列数目)等。遍历topicQueueTable的key,找到与不活泼broker名字类似的QueueData进行删除。
初始化nameserver 服务器以后,接下来就可以启动nameserver 服务器:
  1. //代码位置:org.apache.rocketmq.namesrv.NamesrvController#start
  2. public void start() throws Exception {
  3.     //启动远程服务器(netty 服务器)
  4.     this.remotingServer.start();
  5.     //启动文件监听线程
  6.     if (this.fileWatchService != null) {
  7.         this.fileWatchService.start();
  8.     }
  9. }
复制代码
start方法做了两件事,第一件就是启动netty服务器,netty服务器主要负责与Broker、生产者与消耗者之间的通讯,处置惩罚Broker、生产者与消耗者的不同哀求。根据nettyConfig配置,设置启动的配置和各种处置惩罚器,然后采用netty服务器启动的模板启动服务器,详细的代码就不分析了,有爱好的可以看看netty启动代码模板是怎么样的。第二件事就是启动文件监听线程,监听tts相关文件是否发生变化。
Name Server 服务器启动流程的源代码分析到此为止了,在这里总结下Name Server 服务器启动流程主要做了什么事:


  • 加载和读取配置。设置Name Server 服务器启动的配置NamesrvConfig和启动Netty服务器启动的配置NettyServerConfig。
  • 初始化相关的组件。netty服务类、远程服务线程池、处置惩罚器以及定时任务的初始化。
  • 启动Netty服务器。Netty服务器用来与broker、生产者、消耗者进行通讯、处置惩罚与它们之间的各种哀求,并且对哀求的响应结果进行处置惩罚。
Broker管理和路由信息的管理

Name Server 服务器的作用主要有两个:Broker管理和路由信息管理。
Broker管理

在上面分析的Name Server 服务器的启动过程中,也有一个与Broker管理相关的分析,那就是启动一个定时线程池每十秒去扫描不活泼的Broker。将不活泼的Broker清理掉。除了在Name Server 服务器启动时启动定时任务去扫描不活泼的Broker外,Name Server 服务器启动以后,通过netty服务器吸收Broker、生产者、消耗者的不同哀求,将吸收到哀求会交给在Name Server服务器启动时注册的处置惩罚器DefaultRequestProcessor类的processRequest方法处置惩罚。processRequest方法根据哀求的不同范例,将哀求交给不同的方法进行处置惩罚。有关Broker管理的哀求主要有注册Broker、注销Broker,processRequest方法处置惩罚注册Broker、注销Broker哀求的代吗如下:
  1. //代码位置:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest
  2. public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)  {
  3.          switch (request.getCode()) {
  4.             //省略无关代码
  5.             //注册Broker
  6.             case RequestCode.REGISTER_BROKER:
  7.                 Version brokerVersion = MQVersion.value2Version(request.getVersion());
  8.                 if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
  9.                     return this.registerBrokerWithFilterServer(ctx, request);
  10.                 } else {
  11.                     return this.registerBroker(ctx, request);
  12.                 }
  13.             //注销Broker
  14.             case RequestCode.UNREGISTER_BROKER:
  15.                 return this.unregisterBroker(ctx, request);
  16.            //省略无关代码
  17.          }
  18. }
复制代码
Broker注册

Broker 服务器启动时,会向Name Server 服务器发送Broker 相关的信息,如集群的名字、Broker地址、Broker名字、topic相关信息等,注册Broker主要的代码比较长,接下来会分成好几部门进行讲解。如下:
  1. //代码位置:org.apache.rocketmq.namesrv.processor.RouteInfoManager#registerBroker
  2. public RegisterBrokerResult registerBroker(
  3.         final String clusterName,
  4.         final String brokerAddr,
  5.         final String brokerName,
  6.         final long brokerId,
  7.         final String haServerAddr,
  8.         final TopicConfigSerializeWrapper topicConfigWrapper,
  9.         final List<String> filterServerList,
  10.         final Channel channel) {
  11.      RegisterBrokerResult result = new RegisterBrokerResult();
  12.      this.lock.writeLock().lockInterruptibly();
  13.      //根据集群的名字获取所有的broker名字
  14.      Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
  15.      if (null == brokerNames) {
  16.            brokerNames = new HashSet<String>();
  17.            this.clusterAddrTable.put(clusterName, brokerNames);
  18.       }
  19.       //名字保存在broker名字中
  20.       brokerNames.add(brokerName);
  21.     //省略代码
  22. }
复制代码
registerBroker方法根据集群的名字获取该集群下所有的Broker名字的Set,假如不存在就创建并添加进clusterAddrTable中,clusterAddrTable生存着集群名字与该集群下所有的Broker名字对应关系,最后将broker名字生存在set中。
  1. public RegisterBrokerResult registerBroker(
  2.         final String clusterName,
  3.         final String brokerAddr,
  4.         final String brokerName,
  5.         final long brokerId,
  6.         final String haServerAddr,
  7.         final TopicConfigSerializeWrapper topicConfigWrapper,
  8.         final List<String> filterServerList,
  9.         final Channel channel) {
  10.      //省略代码
  11.      boolean registerFirst = false;
  12.      //获取broker 元数据
  13.      BrokerData brokerData = this.brokerAddrTable.get(brokerName);
  14.      if (null == brokerData) {
  15.            registerFirst = true;
  16.            brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
  17.            this.brokerAddrTable.put(brokerName, brokerData);
  18.       }
  19.      //获取所有的broker地址
  20.      Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
  21.      Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
  22.      while (it.hasNext()) {
  23.            Entry<Long, String> item = it.next();
  24.            if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
  25.                   it.remove();
  26.             }
  27.      }
  28.      String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
  29.      registerFirst = registerFirst || (null == oldAddr);
  30.    //省略代码
  31. }
复制代码
上述代码主要做了两件事:


  • 缓存broker元数据信息。首先根据broker名字从brokerAddrTable中获取Broker元数据brokerData,假如brokerData不存在,阐明是第一次注册,创建Broker元数据并添加进brokerAddrTable中,brokerAddrTable生存着Broker名字与Broker元数据对应的信息。
  • 从Broker元数据brokerData中获取该元数据中的所有Broker地址信息brokerAddrsMap。brokerAddrsMap生存着brokerId与所有Broker名字对应信息。遍历brokerAddrsMap中的所有broker地址,查找与参数brokerAddr类似但是与参数borkerId不同的进行删除,保证一个broker名字对应着BrokerId,最后将参数brokerId与参数brokerAddr生存到brokerData元数据的brokerAddrsMap中进行缓存。
  1. public RegisterBrokerResult registerBroker(
  2.         final String clusterName,
  3.         final String brokerAddr,
  4.         final String brokerName,
  5.         final long brokerId,
  6.         final String haServerAddr,
  7.         final TopicConfigSerializeWrapper topicConfigWrapper,
  8.         final List<String> filterServerList,
  9.         final Channel channel) {
  10.      //省略代码
  11.      //如果topic的配置不空并且是broker master
  12.      if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {
  13.           //如果topic配置改变或者是第一次注册
  14.           if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {
  15.                   //获取所有的topic配置
  16.                   ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();
  17.                   if (tcTable != null) {
  18.                        //遍历topic配置,创建并更新队列元素
  19.                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
  20.                              this.createAndUpdateQueueData(brokerName, entry.getValue());
  21.                         }
  22.                     }
  23.              }
  24.        }
  25.     //省略代码
  26. }
复制代码
假如参数topicConfigWrapper不等于空,并且brokerId等于0时,判断topic是否改变,假如topic改变或者是第一次注册,获取所有的topic配置,并创建和更新队列元数据。QueueData生存着队列元数据,如Broker名字、写队列数目、读队列数目,假如队列缓存中不存在该队列元数据,则添加,否则遍历缓存map找到该队列元数据进行删除,假如是新添加的则添加进队列缓存中。
  1. public RegisterBrokerResult registerBroker(
  2.         final String clusterName,
  3.         final String brokerAddr,
  4.         final String brokerName,
  5.         final long brokerId,
  6.         final String haServerAddr,
  7.         final TopicConfigSerializeWrapper topicConfigWrapper,
  8.         final List<String> filterServerList,
  9.         final Channel channel) {
  10.    //省略代码
  11.     //创建broker存活对象,并进行保存
  12.     BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(
  13.                         System.currentTimeMillis(),
  14.                         topicConfigWrapper.getDataVersion(),
  15.                         channel,
  16.                         haServerAddr));
  17.     if (null == prevBrokerLiveInfo) {
  18.            log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
  19.     }
  20.     //如果过滤服务地址不为空,则缓存到filterServerTable
  21.     if (filterServerList != null) {
  22.             if (filterServerList.isEmpty()) {
  23.                    this.filterServerTable.remove(brokerAddr);
  24.             } else {
  25.                    this.filterServerTable.put(brokerAddr, filterServerList);
  26.             }
  27.      }
  28.      //如果不是broker master,获取高可用服务器地址以及master地址
  29.      if (MixAll.MASTER_ID != brokerId) {
  30.               String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
  31.               if (masterAddr != null) {
  32.                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
  33.                    if (brokerLiveInfo != null) {
  34.                          result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
  35.                          result.setMasterAddr(masterAddr);
  36.                     }
  37.                }
  38.       }
  39.       return result;
  40. }
复制代码
最子女码片段,主要做了三件事,首先创建了Broker存活对象BrokerLiveInfo,添加到brokerLiveTable中缓存,在Name Server 启动时,供定时线程任务每十秒进行扫描。以确保非正常的Broker被清理掉。然后是判断参数filterServerList是否为空,假如不为空,则添加到filterServerTable缓存,filterServerTable生存着与消息过滤相关的过滤服务。最后,判断该注册的Broker不是Broker master,则设置高可用服务器地址以及master地址。到此为止,Broker注册的代码就分析完成了,总而言之,Broker注册就是Broker将相关的元数据信息,如Broker名字,Broker地址、topic信息发送给Name Server服务器,Name Server吸收到以后将这些元数据缓存起来,以供后续能够快速找到这些元数据,生产者和消耗者也可以通过Name Server服务器获取到Broke相关的信息,这样,生产者和消耗者就可以和Broker服务器进行通讯了,生产者发送消息给Broker服务器,消耗者从Broker服务器消耗消息。
Broker注销

Broker注销的过程刚好跟Broker注册的过程相反,Broker注册是将Broker相关信息和Topic配置信息缓存起来,以供生产者和消耗者使用。而Broker注销则是将Broker注销缓存的Broker信息从缓存中删除,Broker注销unregisterBroker方法主要代码流程如下:
  1. //代码位置:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker
  2. public void unregisterBroker(
  3.         final String clusterName,
  4.         final String brokerAddr,
  5.         final String brokerName,
  6.         final long brokerId) {
  7.     this.lock.writeLock().lockInterruptibly();
  8.     //将缓存的broker存活对象删除
  9.     BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
  10.      //将所有的过滤服务删除
  11.      this.filterServerTable.remove(brokerAddr);
  12.     boolean removeBrokerName = false;
  13.     //删除broker元数据
  14.      if (null != brokerData) {
  15.            String addr = brokerData.getBrokerAddrs().remove(brokerId);
  16.            if (brokerData.getBrokerAddrs().isEmpty()) {
  17.                 this.brokerAddrTable.remove(brokerName);
  18.                 removeBrokerName = true;
  19.             }
  20.       }
  21.       //如果删除broker元数据成功
  22.       if (removeBrokerName) {
  23.            Set<String> nameSet = this.clusterAddrTable.get(clusterName);
  24.            if (nameSet != null) {
  25.                 boolean removed = nameSet.remove(brokerName);
  26.                 if (nameSet.isEmpty()) {
  27.                       this.clusterAddrTable.remove(clusterName);
  28.                  }
  29.             }
  30.             //根据brokerName删除topic配置信息
  31.             this.removeTopicByBrokerName(brokerName);
  32.       }
  33.       this.lock.writeLock().unlock();
  34. }
复制代码
unregisterBroker方法的参数有集群名字、broker地址、broker名字、brokerId,主要逻辑为:


  • 根据broker地址删除broker存活对象。
  • 根据broker地址删除所有消息过滤服务。
  • 删除broker元数据。
  • 假如删除元数据成功,则根据集群名字删除该集群的所有broker名字,以及根据根据- brokerName删除topic配置信息。
路由信息的管理

处置惩罚器DefaultRequestProcessor类的processRequest方法除了处置惩罚Broker注册和Broker注销的哀求外,还处路由信息管理有关的哀求,吸收到生产者和消耗者的路由信息相关的哀求,会交给处置惩罚器DefaultRequestProcessor类的processRequest方法处置惩罚,processRequest方法则会根据不同的哀求范例将哀求交给RouteInfoManager类的不同方法处置惩罚。RouteInfoManager类用map进行缓存路由相关信息,map如下:
  1. //topic与队列数据对应映射关系
  2. private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
  3. //broker 名字与broker 元数据对应映射关系
  4. private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
  5. //保存cluster的所有broker name
  6. private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
  7. //broker 地址 与 BrokerLiveInfo存活对象的对应映射关系
  8. private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
  9. //broker 地址 的所有过滤服务
  10. private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
复制代码
RouteInfoManager类利用上面几个map缓存了Broker信息,topic相关信息、集群信息、消息过滤服务信息等,假如这些缓存的信息有变化,就是网这些map新增或删除缓存。这就是Name Server服务的路由信息管理。processRequest方法是如何处置惩罚路由信息管理的,详细实现可以阅读详细的代码,无非就是将不同的哀求委托给RouteInfoManager的不同方法,RouteInfoManager的不同实现了上面缓存信息的管理。
Broker

Broker 主要负责消息的存储,投递和查询以及保证服务的高可用。Broker负责吸收生产者发送的消息并存储、同时为消耗者消耗消息提供支持。为了实现这些功能,Broker包含几个紧张的子模块:
通讯模块:负责处置惩罚来自客户端(生产者、消耗者)的哀求。
客户端管理模块:负责管理客户端(生产者、消耗者)和维护消耗者的Topic订阅信息。
存储模块:提供存储消息和查询消息的能力,方便Broker将消息存储到硬盘。
高可用服务(HA Service):提供数据冗余的能力,保证数据存储到多个服务器上,将Master Broker的数据同步到Slavew Broker上。
索引服务(Index service):对投递到Broker的消息创建索引,提供快速查询消息的能力。

broker启动过程分析

在Name Server启动以后,Broker就可以开始启动了,启动过程将所有路由信息都注册到Name server服务器上,生产者就可以发送消息到Broker,消耗者也可以从Broker消耗消息。接下来就来看看Broker的详细启动过程。
  1. //源代码位置:org.apache.rocketmq.broker.BrokerStartup#main
  2. public static void main(String[] args) {
  3.       start(createBrokerController(args));
  4. }
复制代码
BrokerStartup类是Broker的启动类,在BrokerStartup类的main方法中,首先创建用createBrokerController方法创建Broker控制器(BrokerController类),Broker控制器主要负责Broker启动过程的详细的相关逻辑实现。创建好Broker 控制器以后,就可以启动Broker 控制器了,以是下面将从两个部门分析Broker的启动过程:


  • 创建Broker控制器
  • 初始化配置信息
  • 创建并初始化Broker控制
  • 注册Broker关闭的钩子
  • 启动Broker控制器
创建Broker控制器

Broker在启动的时候,会初始化一些配置,如Broker配置、netty服务端配置、netty客户端配置、消息存储配置,为Broker启动提供配置准备。
  1. //源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
  2. public static BrokerController createBrokerController(String[] args) {
  3.     /**
  4.     省略代码
  5.     注释:
  6.         1、设置RocketMQ的版本
  7.         2、设置netty接收和发送请求的buffer大小
  8.         3、构建命令行:将命令行进行解析封装
  9.     **/
  10.      //broker配置、netty服务端配置、netty客户端配置
  11.      final BrokerConfig brokerConfig = new BrokerConfig();
  12.      final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  13.      final NettyClientConfig nettyClientConfig = new NettyClientConfig();
  14.      nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
  15.                 String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
  16.      //设置netty监听接口
  17.      nettyServerConfig.setListenPort(10911);
  18.      //消息存储配置
  19.      final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
  20.      //如果broker的角色是slave,设置命中消息在内存的最大比例
  21.      if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
  22.          int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
  23.          messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
  24.      }
  25.      //省略代码
  26. }
复制代码
createBrokerController方法创建BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类,BrokerConfig类是Broker配置类。


  • BrokerConfig:属性主要包罗Broker相关的配置属性,如Broker名字、Broker Id、Broker毗连的Name server地址、集群名字等。
  • NettyServerConfig:Broker netty服务端配置类,Broker netty服务端主要用来吸收客户端的哀求,NettyServerConfig类主要属性包罗监听接口、服务工作线程数、吸收和发送哀求的buffer大小等。
  • NettyClientConfig:netty客户端配置类,用于生产者、消耗者这些客户端与Broker进行通讯相关配置,配置属性主要包罗客户端工作线程数、客户端回调线程数、毗连超时时间、毗连不活泼时间隔断、毗连最大闲置时间等。
  • MessageStoreConfig:消息存储配置类,配置属性包罗存储路径、commitlog文件存储目录、CommitLog文件的大小、CommitLog刷盘的时间隔断等。
初始化配置信息

创建完这些配置类以后,接下来会为这些配置类的一些配置属性设置值,先看看如下代码:
  1. //源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
  2. public static BrokerController createBrokerController(String[] args) {
  3.      //省略代码
  4.      //如果命令中包含字母c,则读取配置文件,将配置文件的内容设置到配置类中
  5.       if (commandLine.hasOption('c')) {
  6.           String file = commandLine.getOptionValue('c');
  7.           if (file != null) {
  8.               configFile = file;
  9.               InputStream in = new BufferedInputStream(new FileInputStream(file));
  10.               properties = new Properties();
  11.               properties.load(in);
  12.               //读取配置文件的中namesrv地址
  13.               properties2SystemEnv(properties);
  14.               //将配置文件中的配置项映射到配置类中去
  15.               MixAll.properties2Object(properties, brokerConfig);
  16.               MixAll.properties2Object(properties, nettyServerConfig);
  17.               MixAll.properties2Object(properties, nettyClientConfig);
  18.               MixAll.properties2Object(properties, messageStoreConfig);
  19.               //设置配置broker配置文件
  20.               BrokerPathConfigHelper.setBrokerConfigPath(file);
  21.               in.close();
  22.           }
  23.       }
  24.        //设置broker配置类
  25.        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
  26.      //省略代码
  27. }
复制代码
上述主要的代码逻辑为假如下令行中存在下令参数为‘c’(c是configFile的缩写),那么就读取configFile文件的内容,将configFile配置文件的配置项映射到BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类中。接下来createBrokerController方法做一些判断必要配置的正当性,如下代码所示:
  1. //源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
  2. public static BrokerController createBrokerController(String[] args) {
  3.      //省略代码
  4.      //如果broker配置文件的rocketmqHome属性值为null,直接结束程序
  5.       if (null == brokerConfig.getRocketmqHome()) {
  6.              System.out.printf("Please set the %s variable in your environment to match the location of
  7.                                   the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
  8.              System.exit(-2);
  9.        }
  10.         //如果name server服务器的地址不为null
  11.        String namesrvAddr = brokerConfig.getNamesrvAddr();
  12.        if (null != namesrvAddr) {
  13.             try {
  14.                  //namesrvAddr是以";"分割的多个地址
  15.                  String[] addrArray = namesrvAddr.split(";");
  16.                  //每个地址是ip:port的形式,检测下是否形如ip:port的形式
  17.                  for (String addr : addrArray) {
  18.                      RemotingUtil.string2SocketAddress(addr);
  19.                  }
  20.               } catch (Exception e) {
  21.                   System.out.printf(
  22.                         "The Name Server Address[%s] illegal, please set it as follows,
  23.                            "127.0.0.1:9876;192.168.0.1:9876"%n",namesrvAddr);
  24.                   System.exit(-3);
  25.               }
  26.          }
  27.          //设置BrokerId,broker master 的BrokerId设置为0,broker slave 设置为大于0的值
  28.           switch (messageStoreConfig.getBrokerRole()) {
  29.               case ASYNC_MASTER:
  30.               case SYNC_MASTER:
  31.                   brokerConfig.setBrokerId(MixAll.MASTER_ID);
  32.                   break;
  33.                case SLAVE:
  34.                   //如果小于等于0,退出程序
  35.                   if (brokerConfig.getBrokerId() <= 0) {
  36.                       System.out.printf("Slave's brokerId must be > 0");
  37.                       System.exit(-3);
  38.                    }
  39.                    break;
  40.                  default:
  41.                     break;
  42.              }
  43.           //省略代码
  44. }
复制代码
首先会判断下RocketmqHome的值是否为空,RocketmqHome是Borker相关配置生存的文件目录,假如为空则直接退出步伐,启动Broker失败;然后判断下Name server 地址是否为空,假如不为空则解析以“;”分割的name server地址,检测下地址的正当性,假如不正当则直接退出步伐;最后判断下Broker的脚色,假如是master,BrokerId设置为0,假如是SLAVE,则BrokerId设置为大于0的数,否则直接退出步伐,Broker启动失败。
createBrokerController方法进行必要配置参数的判断以后,将进行日志的设置、以及打印配置信息,主要代码如下:
  1. //源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
  2. public static BrokerController createBrokerController(String[] args) {
  3.     //省略代码
  4.     //注释:日志设置
  5.     //printConfigItem 打印配置信息
  6.     if (commandLine.hasOption('p')) {
  7.         InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
  8.         MixAll.printObjectProperties(console, brokerConfig);
  9.         MixAll.printObjectProperties(console, nettyServerConfig);
  10.         MixAll.printObjectProperties(console, nettyClientConfig);
  11.         MixAll.printObjectProperties(console, messageStoreConfig);
  12.         System.exit(0);
  13.     } else if (commandLine.hasOption('m')) {
  14.     //printImportantConfig 打印重要配置信息
  15.         InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
  16.         MixAll.printObjectProperties(console, brokerConfig, true);
  17.         MixAll.printObjectProperties(console, nettyServerConfig, true);
  18.         MixAll.printObjectProperties(console, nettyClientConfig, true);
  19.         MixAll.printObjectProperties(console, messageStoreConfig, true);
  20.         System.exit(0);
  21.     }
  22.     //打印配置信息
  23.     log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
  24.     MixAll.printObjectProperties(log, brokerConfig);
  25.     MixAll.printObjectProperties(log, nettyServerConfig);
  26.     MixAll.printObjectProperties(log, nettyClientConfig);
  27.     MixAll.printObjectProperties(log, messageStoreConfig);
  28.     //代码省略
  29. }
复制代码
createBrokerController方法的以上代码逻辑打印配置信息,先判断下令行参数是否包含字母‘p’(printConfigItem的缩写),假如包含字母‘p’,则打印配置信息,否则判断下下令行是否包含字母‘m’,则打印被@ImportantField注解的配置属性,也就是紧张的配置属性。最后,不管下令行中是否存在字母‘p’或者字母‘m’,都打印配置信息。
以上就是初始化配置信息的全部代码,初始化配置信息主要是创建BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类,并为这些配置类设置配置的值,同时根据下令行参数判断打印配置信息。
初始化Broker控制器

  1. //源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
  2. public static BrokerController createBrokerController(String[] args) {
  3.     //省略代码
  4.      //创建BrokerController(broker 控制器)
  5.     final BrokerController controller = new BrokerController(
  6.            brokerConfig,
  7.            nettyServerConfig,
  8.            nettyClientConfig,
  9.            messageStoreConfig);
  10.     // remember all configs to prevent discard
  11.     //将所有的配置信息保存在内存
  12.     controller.getConfiguration().registerConfig(properties);
  13.    //初始化broker控制器
  14.     boolean initResult = controller.initialize();
  15.     //如果初始化失败,则退出
  16.     if (!initResult) {
  17.         controller.shutdown();
  18.          System.exit(-3);
  19.     }
  20.     //省略代码
  21. }
复制代码
创建并初始化Broker控制的代码比较简单,创建以配置类作为参数的BrokerController对象,并将所有的配置信息生存在内容中,方便在其他地方使用;创建完Broker控制器对象以后,对控制器进行初始化,当初始化失败以后,则直接退出步伐。
initialize方法主要是加载一些生存在当地的一些配置数据,总结起来做了如下几方面的事变:


  • 加载topic配置、消耗者位移数据、订阅组数据、消耗者过滤配置
  • 创建消息相关的组件,并加载消息数据
  • 创建netty服务器
  • 创建一系列线程
  • 注册处置惩罚器
  • 启动一系列定时任务
  • 初始化事务组件
  • 初始化acl组件
  • 注册RpcHook
加载topic配置、消耗者位移数据、订阅组数据、消耗者过滤配置

  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
  2. public boolean initialize() throws CloneNotSupportedException {
  3.    //加载topic配置 topics.json
  4.     boolean result = this.topicConfigManager.load();
  5.     //加载消费者位移数据 consumerOffset.json
  6.     result = result && this.consumerOffsetManager.load();
  7.     //加载订阅组数据 subscriptionGroup.json
  8.     result = result && this.subscriptionGroupManager.load();
  9.     //加载消费者过滤 consumerFilter.json
  10.     result = result && this.consumerFilterManager.load();
  11.     //省略代码
  12. }
复制代码
load方法是抽象类ConfigManager的方法,该方法读取文件的内容解码成对应的配置对象,假如文件中的内容为空,就读取备份文件中的内容进行解码。读取的文件都是生存在user.home/store/config/下,user.home是用户目录,不同人的电脑user.home一般不同。topicConfigManager.load()读取topics.json文件,假如该文件的内容为空,那么就读取topics.json.bak文件内容,topics.json生存的是topic数据;同理,consumerOffsetManager.load()方法读取consumerOffset.json和consumerOffset.json.bak文件,生存的是消耗者位移数据;subscriptionGroupManager.load()方法读取subscriptionGroup.json和subscriptionGroup.json.bak文件,生存订阅组数据(消耗者分组数据)、consumerFilterManager.load()方法读取的是consumerFilter.json和consumerFilter.json.bak的内容,生存的是消耗者过滤数据。
创建消息相关的组件,并加载消息数据

  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
  2. public boolean initialize() throws CloneNotSupportedException {
  3.         //省略代码
  4.         //如果上述都加载成功
  5.         if (result) {
  6.             try {
  7.                 //创建消息存储器
  8.                 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,                                this.messageArrivingListener,this.brokerConfig);
  9.                 //如果开启了容灾、主从自动切换,添加DLedger角色改变处理器
  10.                 if (messageStoreConfig.isEnableDLegerCommitLog()) {
  11.                     DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
  12.                     ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
  13.                 }
  14.                 //broker 相关统计
  15.                 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
  16.                 //load plugin
  17.                 //加载消息存储插件
  18.                 MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
  19.                 this.messageStore = MessageStoreFactory.build(context, this.messageStore);
  20.                 this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
  21.             } catch (IOException e) {
  22.                 result = false;
  23.                 log.error("Failed to initialize", e);
  24.             }
  25.         }
  26.         //加载消息文件
  27.         result = result && this.messageStore.load();
  28.         //省略代码
  29. }
复制代码
假如加载topic配置、消耗者位移数据、订阅组数据、消耗者过滤配置成功以后,就创建消息相关的组件,并加载消息数据,这个过程创建了消息存储器、DLedger脚色改变处置惩罚器、Broker统计相关组件以及消息存储插件,然后加载消息文件中的数据。接下来详细看看加载消息文件中的messageStore.load()方法:
  1. //代码位置:org.apache.rocketmq.store.DefaultMessageStore#load
  2. public boolean load() {
  3.         boolean result = true;
  4.         try {
  5.             //判断abort是否存在
  6.             boolean lastExitOK = !this.isTempFileExist();
  7.             log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
  8.             //加载定时消费服务器
  9.             if (null != scheduleMessageService) {
  10.             //读取delayOffset.json文件
  11.                 result = result && this.scheduleMessageService.load();
  12.             }
  13.             // load Commit Log
  14.             //加载 Commit log 文件
  15.             result = result && this.commitLog.load();
  16.             // load Consume Queue
  17.             //加载消费者队列 文件consumequeue
  18.             result = result && this.loadConsumeQueue();
  19.             if (result) {
  20.                 //加载检查点文件checkpoint
  21.                 this.storeCheckpoint =
  22.                     new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
  23.                 //加载索引文件
  24.                 this.indexService.load(lastExitOK);
  25.                 //数据恢复
  26.                 this.recover(lastExitOK);
  27.                 log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
  28.             }
  29.         } catch (Exception e) {
  30.             log.error("load exception", e);
  31.             result = false;
  32.         }
  33.         if (!result) {
  34.             this.allocateMappedFileService.shutdown();
  35.         }
  36.         return result;
  37. }
复制代码
load方法主要逻辑就是加载各种数据文件,主要有以下几方面进行加载数据:


  • isTempFileExist方法判断abort是否存在,假如不存在,阐明Broker是正常关闭的,否则就是非常关闭。
  • scheduleMessageService.load()方法读取user.home/store/config/下的delayOffset.json文件的内容,该文件内容生存延迟消息的位移数据。
  • commitLog.load()加载 CommitLog 文件, CommitLog 文件生存的是消息内容
  • loadConsumeQueue()方法加载consumequeue目录下的内容,ConsumeQueue(消息消耗队列)是消耗消息的索引,消耗者通过ConsumeQueue可以快速找到查找待消耗的消息,consumequeue目录下的文件组织方式是:topic/queueId/fileName,以是就可以快速找待消耗的消息在哪一个Commit log 文件中。
  • indexService.load(lastExitOK)加载索引文件,加载的是user.home/store/index/目录下文件,文件名fileName是以创建时的时间戳定名的,以是可以通过时间区间来快速查询消息,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故底层实现为hash索引。
  • recover(lastExitOK)方法将CommitLog 文件的内容加载到内存中以及topic队列。
创建netty服务器

  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
  2. public boolean initialize() throws CloneNotSupportedException {
  3.     //省略代码
  4.     if (result) {
  5.         //创建netty远程服务器
  6.         this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
  7.          NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
  8.          fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
  9.          this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
  10.         //省略代码
  11.     }
  12.      //省略代码
  13. }
复制代码
创建netty服务器的时候创建了两个,一个是平凡的,一个是快速的,remotingServer用来与生产者、消耗者进行通讯。当isSendMessageWithVIPChannel=true的时候会选择port-2的fastRemotingServer进行的消息的处置惩罚,为了防止某些很紧张的业务阻塞,就再开启了一个remotingServer进行处置惩罚,但是如今默认是不开启的,fastRemotingServer主要是为了兼容老版本的RocketMQ.。
创建一系列线程池

  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
  2. public boolean initialize() throws CloneNotSupportedException {
  3.             //代码省略
  4.             //发送消息线程池
  5.             this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
  6.                 this.brokerConfig.getSendMessageThreadPoolNums(),
  7.                 this.brokerConfig.getSendMessageThreadPoolNums(),
  8.                 1000 * 60,
  9.                 TimeUnit.MILLISECONDS,
  10.                 this.sendThreadPoolQueue,
  11.                 new ThreadFactoryImpl("SendMessageThread_"));
  12.             //拉取消息线程池
  13.             //this.pullMessageExecutor
  14.             //回复消息线程池
  15.             //this.replyMessageExecutor
  16.             //查询消息线程池
  17.             //this.queryMessageExecutor
  18.             //broker 管理线程池
  19.             //this.adminBrokerExecutor
  20.             //客户端管理线程池
  21.             //this.clientManageExecutor
  22.             //心跳线程池
  23.             //this.heartbeatExecutor
  24.             //事务线程池
  25.            // this.endTransactionExecutor
  26.             //消费者管理线程池
  27.             this.consumerManageExecutor =
  28.                 Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
  29.                     "ConsumerManageThread_"));
  30.             //代码省略
  31. }
复制代码
创建的线程池对象有发送消息线程池、拉取消息线程池、回复消息线程池、查询消息线程池、broker 管理线程池、客户端管理线程池、心跳线程池、事务线程池、消耗者管理线程池。
注册哀求处置惩罚器

  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
  2. public boolean initialize() throws CloneNotSupportedException {
  3.     //省略代码
  4.     //注册处理器
  5.      this.registerProcessor();
  6.     //省略代码
  7. }
复制代码
registerProcessor()方法如下:
  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#registerProcessor
  2. public void registerProcessor() {
  3.         //发送消息处理器
  4.         SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
  5.         sendProcessor.registerSendMessageHook(sendMessageHookList);
  6.         sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
  7.         //远程服务注册发送消息处理器
  8.         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
  9.         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
  10.         this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
  11.         this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
  12.         this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
  13.         this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
  14.         this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
  15.         this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
  16.         //注册拉消息处理器
  17.         //注册回复消息处理器
  18.         //注册查询消息处理器
  19.         //注册客户端管理处理器
  20.         //注册消费者管理处理器
  21.         //注册事务处理器
  22.          //注册broker处理器
  23.         AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
  24.         this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
  25.         this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
  26. }
复制代码
registerProcessor方法注册了发送消息处置惩罚器、远程服务注册发送消息处置惩罚器、拉消息处置惩罚器、回复消息处置惩罚器、查询消息处置惩罚器、客户端管理处置惩罚器、消耗者管理处置惩罚器、事务处置惩罚器、broker处置惩罚器。registerProcessor注册方法也很简单,就是以RequestCode作为key,以Pair<处置惩罚器,线程池>作为Value生存在名字为processorTable的HashMap中。每个哀求都是在线程池中处置惩罚的,这样可以提高处置惩罚哀求的性能。对于每个传入的哀求,根据RequestCode就可以在processorTable查找处置惩罚器来处置惩罚哀求。每个处置惩罚器都有有一个processRequest方法进行处置惩罚哀求。
启动一系列定时任务

Broker初始化方法initialize中,会启动一系列的配景定时线程任务,这些配景任务包罗都是由scheduledExecutorService线程池实行的,scheduledExecutorService是单线程线程池( Executors.newSingleThreadScheduledExecutor()),只用单线程线程池实行配景定时任务有一个利益就是减少线程过多,反而导致线程为了抢占CPU加剧了竞争。这一些配景定时线程任务如下:


  • 每24小时打印昨天产生了多少消息,消耗了多少消息
  • 每五秒生存消耗者位移到文件中
  • 每10秒生存消耗者过滤到文件中
  • 每3分钟定时检测消耗的进度
  • 每秒打印队列的大小以及队列头部元素存在的时间
  • 每分钟打印已存储在CommitLog中但尚未分派到消耗队列的字节数
  • 每两分钟定时获取获取name server 地址
  • 每分钟定时打印slave 数据同步掉队多少
  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
  2. public boolean initialize() throws CloneNotSupportedException {
  3.     //省略代码
  4.     final long period = 1000 * 60 * 60 * 24;
  5.     //每24小时打印昨天产生了多少消息,消费了多少消息
  6.      this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  7.             @Override
  8.             public void run() {
  9.               try {
  10.                    BrokerController.this.getBrokerStats().record();
  11.                } catch (Throwable e) {
  12.                     log.error("schedule record error.", e);
  13.                }
  14.              }
  15.       }, initialDelay, period, TimeUnit.MILLISECONDS);
  16.     //省略代码
  17. }
复制代码
每24小时打印昨天产生了多少消息,消耗了多少消息的定时任务比较简单,就是将昨天消息的生产和消耗的数目统计出来,然后把这两个指标打印出来。
  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
  2. public boolean initialize() throws CloneNotSupportedException {
  3.     //省略代码
  4.     //每五秒保存消费者位移
  5.     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  6.        @Override
  7.        public void run() {
  8.            try {
  9.                   BrokerController.this.consumerOffsetManager.persist();
  10.             } catch (Throwable e) {
  11.                   log.error("schedule persist consumerOffset error.", e);
  12.             }
  13.         }
  14.     }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  15.     //每10秒保存消费者过滤
  16.     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  17.          @Override
  18.         public void run() {
  19.           try {
  20.               BrokerController.this.consumerFilterManager.persist();
  21.           } catch (Throwable e) {
  22.                log.error("schedule persist consumer filter error.", e);
  23.           }
  24.         }
  25.     }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
  26.     //省略代码
  27. }
复制代码
每五秒生存消耗者位移和每10秒生存消耗者过滤定时任务都是生存在文件中,每五秒生存消耗者位移定时任务将消耗者位移生存在consumerOffset.json文件中,每10秒生存消耗者过滤定时任务将消耗者过滤生存在consumerFilter.json文件中。
  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
  2. public boolean initialize() throws CloneNotSupportedException {
  3.     //省略代码
  4.     //每3分钟定时检测消费的进度
  5.     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  6.            @Override
  7.            public void run() {
  8.                 try {
  9.                     BrokerController.this.protectBroker();
  10.                 } catch (Throwable e) {
  11.                     log.error("protectBroker error.", e);
  12.                 }
  13.             }
  14.      }, 3, 3, TimeUnit.MINUTES);
  15.     //省略代码
  16. }
复制代码
每3分钟定时检测消耗进度的定时任务的作用是检测消耗者的消耗进度,当消耗者消耗消息的进度掉队大于配置的最大掉队阈值时,就停止消耗者消耗,详细的实现看protectBroker的源码:
//源代码位置:org.apache.rocketmq.broker.BrokerController#protectBroker
public void protectBroker() {
//是否开启慢消耗检测开关,默认未开启
if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
//遍历统计项
final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<String, MomentStatsItem> next = it.next();
final long fallBehindBytes = next.getValue().getValue().get();
//消耗者消耗消息的进度掉队消耗者掉队阈值
if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
final String[] split = next.getValue().getStatsKey().split(“@”);
final String group = split[2];
LOG_PROTECTION.info(“[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it”, group, fallBehindBytes);
//设置消耗者消耗的标记,关闭消耗
this.subscriptionGroupManager.disableConsume(group);
}
}
}
}
protectBroker方法首先判别是否开启慢消耗检测开关,假如开启了,就进行遍历统计项,判断消耗者消耗消息的进度掉队消耗者掉队阈值的时候,就停止该消耗者停止消耗来保护broker,假如消耗者消耗比较慢,那么在Broker的消耗会越来越多,积压在Broker上,以是停止慢消耗者消耗消息,让其他消耗者消耗,减少消息的积压。
  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
  2. public boolean initialize() throws CloneNotSupportedException {
  3.     //代码省略
  4.     //每秒打印队列的大小以及队列头部元素存在的时间
  5.     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  6.          @Override
  7.          public void run() {
  8.              try {
  9.                   BrokerController.this.printWaterMark();
  10.               } catch (Throwable e) {
  11.                    log.error("printWaterMark error.", e);
  12.              }
  13.         }
  14.      }, 10, 1, TimeUnit.SECONDS);
  15.     //代码省略
  16. }
复制代码
每秒打印队列的大小以及队列头部元素存在的时间定时任务,会打印发送消息线程池队列、拉取消息线程池队列、查询消息线程池队列、结束事务线程池队列的大小,以及打印队列头部元素存在的时间,这个时间等于当前时间减去头部元素创建的时间,就是该元素创建到如今已经耗费了多长时间。详细的代码如下:
  1. //源代码位置:org.apache.rocketmq.broker.BrokerController#headSlowTimeMills
  2. public long headSlowTimeMills(BlockingQueue<Runnable> q) {
  3.         long slowTimeMills = 0;
  4.         //队列的头
  5.         final Runnable peek = q.peek();
  6.         if (peek != null) {
  7.             RequestTask rt = BrokerFastFailure.castRunnable(peek);
  8.             //当前时间减去创建时间
  9.             slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
  10.         }
  11.         if (slowTimeMills < 0) {
  12.             slowTimeMills = 0;
  13.         }
  14.         return slowTimeMills;
  15. }
复制代码
初始化事务消息

  1. //源码位置:org.apache.rocketmq.broker.BrokerController#initialTransaction
  2. private void initialTransaction() {
  3.         //加载transactionalMessageService,利用spi
  4.         this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
  5.         if (null == this.transactionalMessageService) {
  6.             this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
  7.             log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
  8.         }
  9.         //创建transactionalMessage检查监听器
  10.         this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
  11.         if (null == this.transactionalMessageCheckListener) {
  12.             this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
  13.             log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
  14.         }
  15.         this.transactionalMessageCheckListener.setBrokerController(this);
  16.         //创建事务消息检查服务
  17.         this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
  18. }
复制代码
initialTransaction方法主要创建与事务消息相关的类,创建transactionalMessageService(事务消息服务)、transactionalMessageCheckListener(事务消息检查监听器)、transactionalMessageCheckService(事务消息检查服务)。transactionalMessageService用于处置惩罚事务消息,transactionalMessageCheckListener主要用来回查消息监听,transactionalMessageCheckService用于检查超时的 Half 消息是否需要回查。RocketMQ发送事务消息是将消耗先写入到事务相关的topic的中,这个消息就称为半消息,当当地事务成功实行,那么半消息会还原为原来的消息,然后再进行生存。initialTransaction在创建transactionalMessageService和transactionalMessageCheckListener都使用了ServiceProvider.loadClass方法,这个方法就是采用SPI原理,SPI原理就是利用反射加载META-INF/service目录下的某个接口的所有实现,只要实现接口,然后META-INF/service目录下添加文件名为全类名的文件,这样SPI就可以加载详细的实现类,具有可拓展性。
初始化acl组件

  1. //源码位置:org.apache.rocketmq.broker.BrokerController#initialAcl
  2. private void initialAcl() {
  3.         if (!this.brokerConfig.isAclEnable()) {
  4.             log.info("The broker dose not enable acl");
  5.             return;
  6.         }
  7.         //利用SPI加载权限相关的校验器
  8.         List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
  9.         if (accessValidators == null || accessValidators.isEmpty()) {
  10.             log.info("The broker dose not load the AccessValidator");
  11.             return;
  12.         }
  13.         //将所有的权限校验器进行缓存以及注册
  14.         for (AccessValidator accessValidator: accessValidators) {
  15.             final AccessValidator validator = accessValidator;
  16.             accessValidatorMap.put(validator.getClass(),validator);
  17.             this.registerServerRPCHook(new RPCHook() {
  18.                 @Override
  19.                 public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
  20.                     //Do not catch the exception
  21.                     validator.validate(validator.parse(request, remoteAddr));
  22.                 }
  23.                 @Override
  24.                 public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
  25.                 }
  26.             });
  27.         }
  28.     }
复制代码
initialAcl方法主要是加载权限相关校验器,RocketMQ的相关的管理的权限验证和安全就交给这里的加载的校验器了。initialAcl方法也利用SPI原理加载接口的详细实现类,将所有加载的校验器缓存在map中,然后再注册RPC钩子,在哀求之前调用校验器的validate的方法。
注册RpcHook

  1. //源码位置:org.apache.rocketmq.broker.BrokerController#initialRpcHooks
  2. private void initialRpcHooks() {
  3.       //利用SPI加载钩子
  4.       List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
  5.       if (rpcHooks == null || rpcHooks.isEmpty()) {
  6.            return;
  7.        }
  8.         //注册钩子
  9.        for (RPCHook rpcHook: rpcHooks) {
  10.            this.registerServerRPCHook(rpcHook);
  11.        }
  12. }
复制代码
initialRpcHooks方法加RPC钩子,利用SPI原理加载详细的钩子实现,然后将所有的钩子进行注册,钩子的注册是将钩子生存在List中。
以上分析就是创建Broker控制器的全过程,这个过程首先辈行一些必要的初始化配置,如Broker配置、网络通讯Neety配置以及存储相关配置等。然后在创建并初始化Broker控制器,创建并初始化Broker控制器的过程中,又进行了多个步调,如加载topic配置、消耗者位移数据、启动一系列配景定时任务、创建事务消息相关组件等。
Broker控制器的启动

  1. //源码位置:org.apache.rocketmq.broker.BrokerController#start
  2. public static BrokerController start(BrokerController controller) {
  3.         try {
  4.             //Broker控制器启动
  5.             controller.start();
  6.             //打印Broker成功的消息
  7.             String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
  8.                 + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
  9.             if (null != controller.getBrokerConfig().getNamesrvAddr()) {
  10.                 tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
  11.             }
  12.             log.info(tip);
  13.             System.out.printf("%s%n", tip);
  14.             return controller;
  15.         } catch (Throwable e) {
  16.             e.printStackTrace();
  17.             System.exit(-1);
  18.         }
  19.         return null;
  20. }
复制代码
controller.start()方法主要是启动各种组件:


  • 启动消息消息存储器
  • netty服务的启动
  • 文件监听器启动
  • broker 对外api启动
  • 长轮询拉取消息服务启动
  • 客户端长毗连服务启动
  • 过滤服务管理启动
  • broker 相关统计启动
  • broker 快速失败启动
  1. //源码位置:org.apache.rocketmq.broker.BrokerController#start
  2. public void start() throws Exception {
  3.         if (this.messageStore != null) {
  4.             //启动消息消息存储
  5.             this.messageStore.start();
  6.         }
  7.         if (this.remotingServer != null) {
  8.             //netty服务的启动
  9.             this.remotingServer.start();
  10.         }
  11.         if (this.fastRemotingServer != null) {
  12.             this.fastRemotingServer.start();
  13.         }
  14.         //文件改变监听启动
  15.         if (this.fileWatchService != null) {
  16.             this.fileWatchService.start();
  17.         }
  18.         //broker 对外api启动
  19.         if (this.brokerOuterAPI != null) {
  20.             this.brokerOuterAPI.start();
  21.         }
  22.         //保持长轮询请求的服务启动
  23.         if (this.pullRequestHoldService != null) {
  24.             this.pullRequestHoldService.start();
  25.         }
  26.         //客户端长连接服务启动
  27.         if (this.clientHousekeepingService != null) {
  28.             this.clientHousekeepingService.start();
  29.         }
  30.         //过滤服务管理启动
  31.         if (this.filterServerManager != null) {
  32.             this.filterServerManager.start();
  33.         }
  34.         //如果没有采用主从切换(多副本)
  35.         if (!messageStoreConfig.isEnableDLegerCommitLog()) {
  36.             startProcessorByHa(messageStoreConfig.getBrokerRole());
  37.             handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
  38.             this.registerBrokerAll(true, false, true);
  39.         }
  40.         //定时注册broker
  41.         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  42.             @Override
  43.             public void run() {
  44.                 try {
  45.                     BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
  46.                 } catch (Throwable e) {
  47.                     log.error("registerBrokerAll Exception", e);
  48.                 }
  49.             }
  50.         }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
  51.         //broker 相关统计启动
  52.         if (this.brokerStatsManager != null) {
  53.             this.brokerStatsManager.start();
  54.         }
  55.         //broker 快速失败启动
  56.         if (this.brokerFastFailure != null) {
  57.             this.brokerFastFailure.start();
  58.         }
  59. }
复制代码
启动过程另有很多细节没有分析,放到下个文章吧吧吧吧吧
文章大量参考:https://www.zhihu.com/column/c_1437729921845690368

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立聪堂德州十三局店

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表