scanNotActiveBroker方法的逻辑主要是遍历缓存在brokerLiveTable的Broker,将Broker最后更新时间加上120秒的结果是否小于当前时间,假如小于当前时间,阐明Broker已经逾期,大概是已经下线了,以是可以扫除Broker信息,并且关闭Name Server 服务器与Broker服务器毗连,这样被扫除的Broker就不会与Name Server服务器进行远程通讯了。brokerLiveTable的结果如下:
//保存broker地址与broker存活信息的对应关系
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
start方法做了两件事,第一件就是启动netty服务器,netty服务器主要负责与Broker、生产者与消耗者之间的通讯,处置惩罚Broker、生产者与消耗者的不同哀求。根据nettyConfig配置,设置启动的配置和各种处置惩罚器,然后采用netty服务器启动的模板启动服务器,详细的代码就不分析了,有爱好的可以看看netty启动代码模板是怎么样的。第二件事就是启动文件监听线程,监听tts相关文件是否发生变化。
Name Server 服务器启动流程的源代码分析到此为止了,在这里总结下Name Server 服务器启动流程主要做了什么事:
加载和读取配置。设置Name Server 服务器启动的配置NamesrvConfig和启动Netty服务器启动的配置NettyServerConfig。
在上面分析的Name Server 服务器的启动过程中,也有一个与Broker管理相关的分析,那就是启动一个定时线程池每十秒去扫描不活泼的Broker。将不活泼的Broker清理掉。除了在Name Server 服务器启动时启动定时任务去扫描不活泼的Broker外,Name Server 服务器启动以后,通过netty服务器吸收Broker、生产者、消耗者的不同哀求,将吸收到哀求会交给在Name Server服务器启动时注册的处置惩罚器DefaultRequestProcessor类的processRequest方法处置惩罚。processRequest方法根据哀求的不同范例,将哀求交给不同的方法进行处置惩罚。有关Broker管理的哀求主要有注册Broker、注销Broker,processRequest方法处置惩罚注册Broker、注销Broker哀求的代吗如下:
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}
break;
default:
break;
}
//省略代码
}
复制代码
首先会判断下RocketmqHome的值是否为空,RocketmqHome是Borker相关配置生存的文件目录,假如为空则直接退出步伐,启动Broker失败;然后判断下Name server 地址是否为空,假如不为空则解析以“;”分割的name server地址,检测下地址的正当性,假如不正当则直接退出步伐;最后判断下Broker的脚色,假如是master,BrokerId设置为0,假如是SLAVE,则BrokerId设置为大于0的数,否则直接退出步伐,Broker启动失败。
createBrokerController方法进行必要配置参数的判断以后,将进行日志的设置、以及打印配置信息,主要代码如下:
每3分钟定时检测消耗进度的定时任务的作用是检测消耗者的消耗进度,当消耗者消耗消息的进度掉队大于配置的最大掉队阈值时,就停止消耗者消耗,详细的实现看protectBroker的源码:
//源代码位置:org.apache.rocketmq.broker.BrokerController#protectBroker
public void protectBroker() {
//是否开启慢消耗检测开关,默认未开启
if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
//遍历统计项
final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<String, MomentStatsItem> next = it.next();
final long fallBehindBytes = next.getValue().getValue().get();
//消耗者消耗消息的进度掉队消耗者掉队阈值
if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
final String[] split = next.getValue().getStatsKey().split(“@”);
final String group = split[2];
LOG_PROTECTION.info(“[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it”, group, fallBehindBytes);
//设置消耗者消耗的标记,关闭消耗
this.subscriptionGroupManager.disableConsume(group);
}
}
}
}
protectBroker方法首先判别是否开启慢消耗检测开关,假如开启了,就进行遍历统计项,判断消耗者消耗消息的进度掉队消耗者掉队阈值的时候,就停止该消耗者停止消耗来保护broker,假如消耗者消耗比较慢,那么在Broker的消耗会越来越多,积压在Broker上,以是停止慢消耗者消耗消息,让其他消耗者消耗,减少消息的积压。