本文主要记录redis cluster相关流程 2024/11/11 10:19
redis cluster命令文档:https://redis.io/docs/latest/commands/cluster-failover/
redis集群模式和哨兵模式是差别的模式,别搞混了 --chatgpt
https://blog.csdn.net/zzti_erlie/article/details/122900856
https://xie.infoq.cn/article/e1555e88470e48f301626f35c
https://xie.infoq.cn/article/8835096d9845529fc9845bc99
本文应包罗:集群启动、处理命令、集群处理比如扩容缩容迁徙导入负载平衡
clusterCronclusterProcessPacket
!!!redis-cluster中投票只会统计master节点的投票,从节点的一律会忽略,比如收到一个来自x的ping包,陈诉y节点故障了,假如x是master节点则y_failure_count+=1,假如x是slave节点,则直接忽略这条故障陈诉信息,y_failure_count不变
1:client对应的fd上的read变乱处理流程
getNodeByQuery函数中关于migrate、importing状态的slot的处理流程临时略了
cluster meet:
meet包是要求吸收者强制接受的纵然发送者未知,而ping/pong包则是假如来源未知则会丢弃包
CLUSTER MEET 用于将启用了集群支持的差别 Redis 节点连接到工作集群中。。其根本思想是节点默认情况下不相互信任,并且被认为是未知的。因此,为了让一个给定的节点接受另一个节点进入构成 Redis 集群的节点列表,只有两种方法:第一种是系统管理员发送 CLUSTER MEET 命令来强制节点a与未知节点b碰面,对于a来说b是unknow的,但是由于是cluster meet命令,以是a会接受b节点,但是假如不是cluster meet命令且b未知,那么a就会忽略这条消息。第二中是已知的节点c会发给a的相关ping/pong信息的节点列表中含有未知节点d的信息,假如吸收节点a信任发送节点c作为已知节点,那么a就会接受d并实验与d建立连接。发送meet后,a会生存b节点的信息,然后通过ping/pong消息返回b节点的信息,由于对于集群中的其他节点来说,a是受信任的节点,以是都会接受a返回的b的信息。
manulfailover:这个命令只能发送给slave
今天周五,leader调休,好像是去珠海看航展了,今天不想弄了,弄了三四天cluster流程了,大抵流程是明白了,只是具体的细节没有理清而已,以及没有去画流程图了,大头已经搞完了,已经没有精神再搞了,得换个项目了,由于每次都是集中精神三四天就感觉身材被掏空,大脑就抗拒了,非常不想看了,效率就很低了,以是集中攻坚的时间晚上往往能加班到11点,而低落期到了下午三四点就有点磨洋工了。。。以是此时就先放下来,后续再去弄,除了工作外,此时适宜休息耍手机大概换一个项目,由于有新鲜感。 20241115 11:03
manulfailover流程:
- 1:slave告诉他的master停止处理来自客户端的查询。
- 2:master用当前的复制偏移量回复副本。
- 3:slave等待复制偏移在其一侧匹配,以确保slave数据和master同步
- 4:slave启动故障转移,就是开启投票即发送auth请求,如果过半master都同意则可以完成故障转移
- 5:旧master收到配置更新:因为master已经变了,所以这里解除对客户端的阻止后需要回复重定向消息,以便client可以重新连接到新master。
- 通过这种方式,客户端会原子地从旧主服务器移至新主服务器,并且仅当正在转变为新主服务器的副本已处理来自旧主服务器的所有复制流时。
复制代码- fe->rfileProc(fe->clientData)
- ......
- networking.processInputBuffer
- ......
- server.processCommand
- if server.cluster_enabled: #如果是集群,则丢给对应的节点去处理
- if (server.cluster_enabled &&!mustObeyClient(c) && #如果开启了集群模式
- !(!(c->cmd->flags&CMD_MOVABLE_KEYS) && #并且!(当前命令不可迁移(即必须本机处理)&&命令中的keyNum==0,)
- c->cmd->key_specs_num == 0
- &&c->cmd->proc != execCommand))
- cluster.getNodeByQuery #获取对应的处理节点
- #!!!检查命令对应的key是否全都位于同一个slot,如果不是,则报错
- #!!!如果正处于事务状态,那么要求事务的所有命令的所有key都必须位于同一个slot中
- #!!!如果slot处于migrateing或者importing状态,则需要进行特定处理,暂略
- if cmd==exec: #如果当前命令是exec命令表示要执行事务了,此时需检查事务命令队列中的所有命令
- ms = &c->mstate; #c->mstate表示事务命令队列
- else:
- ms = [当前单条命令]
- for cmd in ms: #遍历ms中的所有命令
- for key in cmd: #遍历该命令涉及的所有key
- thisslot = cluster.keyHashSlot #获取key所在的slot
- n = getNodeBySlot(slot) #根据slot获取节点
- if thisslot!=slot: #表示不等于在此之前的key所在的slot
- err_code=CLUSTER_REDIR_CROSS_SLOT #那么我们就立即返回错误
- return null
- if node==当前节点:
- return 当前节点
- else
- return n
- if (n == NULL || !clusterNodeIsMyself(n)) #如果不是当前节点进行处理,那么进行重定向
- cluster.clusterRedirectClient #直接通知客户端重定向到其他节
- networking.addReplyError #重定向是通知客户端重定向,而不是当前节点负责路由请求
- return
- else: #如果是本机处理,则继续往下执行
- ......
- server.call
复制代码 2:cluster启动
- server.main
- if server.cluster_enabled:
- cluster_legacy.clusterInit
- if server.cluster_enabled:
- cluster_legacy.clusterInitLast
- ...init server.clistener,略...
- server.createSocketAcceptHandler(&server.clistener, clusterAcceptHandler)
- if !server.sentinel_mode:
- aof.aofLoadManifestFromDisk
- server.loadDataFromDisk
- aof.aofOpenIfNeededOnServerStart
- aof.aofDelHistoryFiles
- aof.applyAppendOnlyConfig
- if (server.cluster_enabled) {
- cluster_legacy.verifyClusterConfigWithData
复制代码 2.1:clusterCron定时任务
pfail:Possibly FAILed,即检测到长时间没有通信了,大概挂掉了,只是大概,以是叫pfail;
fail:已经确定该节点失败了,以是标记为failed
redis-cluster中任意两个节点之间都会建立连接
手动故障转移:管理员发现某个主节点崩溃了,以是在他的某个从节点上主动执行故障转移命令,如许该从节点就会启动手动故障处理流程。由于有些master节点是孤立的主节点,没有从节点,以是需要思量这种状态。
可以通过slave migration来把master a节点的从节点slave a变成master b的从节点,在master b是孤立主节点时利用。
https://www.cnblogs.com/shanml/p/16391649.html
故障转移需要其他master节点授权
!!!当 Redis 集群中的主节点故障时,大概会导致从节点被提升为新的主节点,由于从节点往往会落后master节点,以是在这个过程中确实大概导致数据丢失,而客户端往往无法察觉到这种数据丢失,以是有时就会出现莫名其妙的bug,我明显写入了,怎么找不到,以是redis就只能作为缓存等对一致性要求不高的
从节点故障不用管,标记一下就行,但是假如master故障了,就要进行故障转移了
手动故障转移做的事变:
- 从节点角度:
- 1:用户发送 CLUSTER FAILOVER 命令。故障转移状态被初始化,并将 mf_end 设置为 Unix 毫秒时间,当该时间到达时我们将中止故障转移尝试。
- 2:从节点向主节点发送 MFSTART 消息,请求暂停客户端操作,暂停时间为手动故障转移超时 CLUSTER_MF_TIMEOUT 的两倍。当主节点被暂停进行手动故障转移时,它还会开始将数据包标记为 CLUSTERMSG_FLAG0_PAUSED。
- 3:从节点等待主节点发送其复制偏移量,且该偏移量会被标记为 PAUSED。
- 4:如果从节点接收到主节点的偏移量,并且其偏移量匹配,mf_can_start 被设置为 1,然后 clusterHandleSlaveFailover() 将按常规方式执行故障转移,不同之处在于投票请求会被修改,强制主节点投票给一个有正常主节点的从节点。
- 主节点的角度:
- 当主节点接收到 PAUSE_CLIENTS 数据包时,主节点也会设置 mf_end 并将发送者设置为 mf_slave。在手动故障转移的时间限制内,主节点会更频繁地向该从节点发送 PING,并标记为 PAUSED,以便从节点在接收到带有该标记的数据包时设置 mf_master_offset。
- 手动故障转移的目标是手动触发后cluster集群执行一个快速的故障转移,避免由于异步主从复制导致的数据丢失。
复制代码 clusterCron源码流程
2.2 beforesleep
- server.main
- server.el=ae.aeCreateEventLoop #创建事件循环
- ae.aeMain(server.el)
- while(!el.stop): #就一个死循环,不断处理事件,如果没有事件就阻塞
- ae.aeProcessEvents
- ae.aeEventLoop.beforesleep
- server.beforeSleep
- if server.cluster_enable
- cluster_legacy.clusterBeforeSleep
- if flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER: #1:clusterCron里面会执行一遍这样的代码,而beforeSleep又执行一遍
- #因为clusterCron是100ms才执行一次
- if nodeIsSlave(myself):
- cluster_legacy.clusterHandleManualFailover #设置标记,下次还会继续执行故障处理
- #这个函数就是检测是否可以开始manual failover
- #如果可以开始,就打上HANDLER_FAILOVER标记
- #反之则打上MANUALFAILOVER即表示还不能开始下次继续检测
- #beforesleep中会检测,如果打上了MANUALFAILOVER标记
- #则beforesleep知道之前检测时还没准备好就再次调用
- #clusterHandleManualFailover来检测和打标机
- #以便在下一轮beforesleep中处理
- if !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER):
- clsuter_legacy.clusterHandleSlaveFailover #处理故障转移,即manual failure就是尝试把slave提升为master
- #处理完后会清除manual failure标记,就不会再次处理了
- #如果条件不满足则会直接返回而不执行故障转移
- #这一块的逻辑就是:我调用clusterHandleManualFailover检测一下打标记
- #然后不管条件满不满足我都执行一下clusterHandlerSlaveFailover
- #如果满足,就执行,如果不满足,函数会自动跳过,
- #所以这里才会不管条件满不满足都执行clusterHandlerSlaveFailover
- else if flags & CLUSTER_TODO_HANDLE_FAILOVER: #如果是自动故障转移就直接处理了
- cluster_legacy.clusterHandleSlaveFailover
- if flags & CLUSTER_TODO_UPDATE_STATE: #更新集群状态
- cluster_legacy.clusterUpdateState
- if flags & CLUSTER_TODO_SAVE_CONFIG: #持久化当前集群状态到磁盘
- cluster_legacy.clusterSaveConfigOrDie(fsync);
-
复制代码 2.3处理ping/pong消息:clusterReadHandler
两个节点一主一从,含有相同的shardid则可以肯定从节点一定是该节点的从节点
redis收到其他节点发来的消息的时间会判定是否是已知节点,假如否,则会丢弃掉该消息
- -------------------------------------------------------------------------------------------------------------------------------------------------
- cluster_legacy.clusterReadHandler
- connection.connRead #读取数据
- cluster_legacy.clusterProcessPacket
- sender = getNodeFromLinkAndMsg(link, hdr) #获取消息发送者即sender,会根据sender来判断是否是已知节点
- #!!!强调一下:sender是旧信息可能过时
- #!!!hdr中信息是最新的(仅当hdr->epoch>server.epoch)
- #!!!如果sender和hdr对不上,那么就可以判断出许多信息
- #!!!比如原来是slave现在是master,而且还在同一个分片
- #!!!那么就可以判断他发生了failure
- #对于未知节点发来的消息会直接丢弃
- if link->node && !nodeInHandshake(link->node): #link->node是连接对象中保存的节点信息
- #handshake表示该连接还处于握手阶段,不算有效连接
- sender = link->node;
- else:
- sender = clusterLookupNode(hdr->sender, CLUSTER_NAMELEN) #hdr是收到的包中的sender信息,去集群查找该节点是否存在
- #也就是说sender信息保存在link或者hdr中
- #如果link和hdr中的对不上说明集群发生了变化或者消息来源未知
- if sender && !link->node:
- setClusterNodeToInboundClusterLink(sender, link) #设置该连接为inbound类型,即对方发起的连接而不是他主动连接的
- 1:根据收到的信息来更新本节点上保存的该节点的信息 #1:根据收到的信息来更新本节点上保存的该节点的信息
- if sender: #如果sender不为空则更新该sender对应的data_received时间
- sender->data_received = now
- if sender && !nodeInHandshake(sender): #如果连接有效则根据收到的hdr中的信息更新一些信息
- if senderCurrentEpoch > server.cluster->currentEpoch: #如果hdr中的epoch>server保存的epoch,
- server.cluster->currentEpoch = senderCurrentEpoch #则更新server的epoch
- if senderConfigEpoch > sender->configEpoch: #同理,更新configEpoch
- sender->configEpoch = senderConfigEpoch
- clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG) #标记beforesleep中要持久化集群状态
- sender->repl_offset = ntohu64(hdr->offset); #更新server保存的sender的repl_offset
- if server.cluster->mf_end && #这个if是从节点处理收到master发来的repl_offset的逻辑
- nodeIsSlave(myself) && #mf_end!=0表示正处于manualfailover,并且当前是从节点
- myself->slaveof == sender && #sender正好是当前节点的master节点
- hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
- server.cluster->mf_master_offset == -1) #并且还没有收到来到master的repl_offset
- {
- server.cluster->mf_master_offset = sender->repl_offset #所以设置mf过程中用到的mf_master_offset字段
- #一旦设置就标志manualfailure(即mf)过程可以开始了
- clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER) #beforesleep中检测mf是否可以开始
- }
- 2:开始处理ping/meet,并返回pong消息作为响应 #2:开始处理ping/meet,并返回pong消息作为响应
- if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
- 2.1:根据收到的meet消息来更新本节点ip #2.1:根据收到的meet消息来更新本节点ip
- if (type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') && #如果是meet消息且本节点对外ip为空,则设置本节点ip
- server.cluster_announce_ip == NULL: #meet消息就是集群发给准备加入集群的新节点的
- { #也就是说当前节点就是要加入集群的新节点
- connAddrSockName(link->conn,ip,sizeof(ip) #获取ip
- memcpy(myself->ip,ip,NET_IP_STR_LEN); #设置本机ip
- clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); #持久化集群状态
- }
- 2.2:如果是新节点消息,则添加该节点到节点列表并持久化并通过gossip广播 #2.2:如果是新节点消息,则添加该节点到节点列表并通过gossip广播
- if !sender && type == CLUSTERMSG_TYPE_MEET: #未知节点发来的meet消息,则保存该节点的信息
- node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE) #创建节点信息
- clusterAddNode(node); #添加节点到节点列表
- clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
- clusterProcessGossipSection #处理发来的信息里的gossip部分的内容
- clusterSendPing(CLUSTERMSG_TYPE_PONG) #发送pong消息
- 3:处理 ping/pong/meet消息 #3:处理 ping/pong/meet消息
- if type == CLUSTERMSG_TYPE_PING / CLUSTERMSG_TYPE_PONG / CLUSTERMSG_TYPE_MEET:
- if !link->inbound: #如果是本节点主动建立的连接
- if nodeInHandshake(link->node): #如果连接处于handshake状态
- if sender: #但是我们收到了来自该节点的有效信息
- ##则直接创建对应node结构体并结束handshake状态
- nodeUpdateAddressIfNeeded(sender,link,hdr)
- clusterDelNode(link->node);
- return 0;
- clusterRenameNode
- link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; #清除handshake标记
- link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
- else if link->node->name!=hdr->sender: #如果不出handshake阶段并且link中保存的
- #与收到的hdr消息包中的sender对不上说明集群可能发生了变化
- freeClusterLink(link); #则直接断开该连接,因为不与不知道的节点通信
- if sender && type == CLUSTERMSG_TYPE_PING &&!nodeInHandshake(sender): #如果是ping消息则先更新对应的ip地址信息
- nodeUpdateAddressIfNeeded(sender,link,hdr) #更新对应的ip地址信息
- if !link->inbound && type == CLUSTERMSG_TYPE_PONG) #如果是收到pong消息,则表明此时节点正常
- if nodeTimedOut(link->node): 什么情况下会pfail #所以如果node因为超时被打伤pfail了
- link->node->flags &= ~CLUSTER_NODE_PFAIL; #那么这里就要清掉pfail,因为收到他的pong消息了
- else if nodeFailed(link->node): #如果该节点已经被标记为fail的
- #那么就不能直接清除要根据情况来判断是否可以清除fail标记
- clearNodeFailureIfNeeded(link->node); #也就是说一个被标记为failed的节点,只要再次收到他的消息
- #那么就可以认为他可能是正常的,某些情况下可以清除fail标记
- if nodeIsSlave(node) || node->numslots == 0: #如果是slave节点或者虽然是master节点但是分配的slots数为0
- #即这个master节点不对外服务,那么就可以清除这个slave或master的fail标记
- serverLog("Clear FAIL state for node xxx is reachable again."); #再次说明:只要能访问就可以认为这个节点是可以正常通信的
- #只是可以正常通信,但是节点不一定正常
- #所以只能在某些情况下认为通新正常就是节点正常
- node->flags &= ~CLUSTER_NODE_FAIL; #这里清除fail标记
-
- if (clusterNodeIsMaster(node) && node->numslots > 0 && #如果是master节点,并且slots数不为0,
- (now - node->fail_time) > (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT)) #并且被标记为fail的时间超过了阈值
- #那么我们就可以认为该master是重新可达的可以清除fail标记
- {
- serverLog("Clear FAIL state for node xx: is reachable again and nobody is serving its slots after some time.")
- node->flags &= ~CLUSTER_NODE_FAIL #清除fail标记
- }
-
- #检查是否有slave->master或者master->slave的转变
- if sender {
- if hdr->slaveof==null_name: #如果salveof字段为空则表明该节点现在是master角色
- #所以这里就是检查是否有slave->master的转变
- clusterSetNodeAsMaster(sender) #设置这个节点是master
- if clusterNodeIsMaster(n): #如果已经是master了则返回
- return
- if (n->slaveof) { #如果是slave,则从原master的slave列表摘除这个节点
- clusterNodeRemoveSlave(n->slaveof,n)
- if n != myself:
- n->flags |= CLUSTER_NODE_MIGRATE_TO #暂不清楚,这里就是标记这个节点正处于迁移状态
- n->flags |= CLUSTER_NODE_MASTER #设置node为master,就是打上一个标记
- n->slaveof = NULL
- else #节点当前是slave,这里就是检查是否有master->slave的转变
- clusterNode *master = clusterLookupNode(hdr->slaveof, CLUSTER_NAMELEN) #在hdr信息中查找该节点的原master
- if clusterNodeIsMaster(sender): #如果sender原来是个master,现在是slave,那么就要更新吸纳骨干信息
- if master && master->shard_id==sender->shard_id: #shardid相同表示slave是从原来的master变成了slave,还是在同一个分片
- if sender->configEpoch > senderConfigEpoch:
- serverLog
- else
- clusterMoveNodeSlots(sender, master) #所以就把sender上的slots更新到master中
- clusterSetNodeAsMaster(master) #设置hdr信息包中的master节点的信息为master
- master->configEpoch = senderConfigEpoch #同时更新epoch
- #!!!笔记(半猜测):redis cluster各节点之间ping/pong不是直接同步最终数据集
- #!!!即同步的时候一个节点不会把他看到的整个集群状态在ping/pong中同步给其他人
- #!!!而是同步他自身的状态和一部分信息,这样另一个节点收到该节点信息的时候
- #!!!就必须根据信息去对自身看到的集群状态数据及做不同的处理
- #!!!也就是说每个节点都维护着自己看到的集群状态,并且定期接收其他节点的状态更新
- #!!!但是这些更新不会包含完整的集群状态,而是通过故障、角色变化等事件来逐步
- #!!!更新某个节点所看到的集群状态,并最终达到一致。举个例子:
- #!!!这里从sender发来的信息知道原来是master现在变成了slave
- #!!!而原来他的某个slave变成了他的master,
- #!!!所以当前节点就知道原先分配给这个slave的槽现在应该分配给新master了
- #!!!所以当前节点就据此去处理自己看到的集群状态数据集
- #!!!即这里就是把sender的slots信息复制到master对象对应的slots信息字段中
- #!!!也就是说sender发来的信息只是告诉当前节点发生了什么以及一部分数据
- #!!!并不是直接告诉当前节点整个集群是什么状态
-
- else:
- clusterDelNodeSlots(sender) #sender和master不是同一个分片了,所以直接删除sender的slots就行了
- #因为此时sender是slave
- sender->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO) #清除MASTER和MIGRATE_TO标记,暂时不懂
- sender->flags |= CLUSTER_NODE_SLAVE; #标记为slave
- if master && sender->slaveof != master: #原先sender记录的不是这个master,但现在是这个master
- if sender->slaveof: #那么就从sender原master的slave表中移除这个节点
- clusterNodeRemoveSlave(sender->slaveof,sender)
- clusterNodeAddSlave(master,sender); #新master的slave列表添加当前节点
- sender->slaveof = master
- updateShardId(sender, master->shard_id)
-
- ...更新slots信息并广播,略... #!!!上面是更新节点状态,这里是更新slots信息并广播,略
- clusterSendUpdate(sender->link,server.cluster->slots[j]) #广播槽更新信息
- break;
- ...处理epoch冲突问题,略... #处理epoch冲突问题,略...
- clusterHandleConfigEpochCollision(sender)
- if sender:
- clusterProcessGossipSection(hdr,link) #处理gossip部分的信息
- clusterProcessPingExtensions(hdr,link)
- else if type == CLUSTERMSG_TYPE_FAIL: #处理fail消息,就是处理已经确定fail的节点
- if (sender) {
- if (failing &&!(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)): #就是清除pfail标记并打上fail标记
- failing->flags |= CLUSTER_NODE_FAIL;
- failing->flags &= ~CLUSTER_NODE_PFAIL;
- else if type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD: #处理pub消息,直接pub
- pubsub.pubsubPublishMessage
- else if type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: #处理其他节点发来的要求故障转移投票的消息
- clusterSendFailoverAuthIfNeeded #就是先看能不能投票,如果可以就投票
- #要投一票则要求当前节点是master且slots数不为0
- #还要求投票请求的epoch>server.epoch
- #还要求投票开始时刻距今不能超过2倍节点超时时间
- #还要求申请者必须是slave,master必须fail
- #或者必须设置了manual failure标记
- #满足这些要求才会投出一票赞成票
- else if type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK #处理其他节点发来的对故障转移投票消息的投票结果
- if clusterNodeIsMaster(sender) && sender->numslots > 0 &&
- senderCurrentEpoch >= server.cluster->failover_auth_epoch
- {
- server.cluster->failover_auth_count++; #如果有master投赞成票则票数+1
- }
- else if type == CLUSTERMSG_TYPE_MFSTART: #处理MFSTART消息
- #即从节点上开始执行manual failure
- if !sender || sender->slaveof != myself:
- return 1;
- resetManualFailover();
- server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT; #开始manual failure就是初始化mf_end
- server.cluster->mf_slave = sender; #在clusterCron里面就会检测到mf_end不为0,不为0就会开始mf过程
- pauseActions(PAUSE_DURING_FAILOVER);
- clusterSendPing(link, CLUSTERMSG_TYPE_PING); #send ping消息回去
- } else if (type == CLUSTERMSG_TYPE_UPDATE) {
- ...更新一些信息,略...
- } else if (type == CLUSTERMSG_TYPE_MODULE) {
- ...略...
- } else {
- serverLog(LL_WARNING,"Received unknown packet type: %d", type);
- }
复制代码 cluster failover命令:
failover命令会发给slave,然后slave会发送一个mfstart消息给指定节点
- slave流程:
- cluster.clusterCommand:
- cluster_legacy.clusterCommandSpecial
- if "failover":
- server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; #一旦mf_end不为0则表示本机开启了manual failover(简记为mf)
- cluster_legacy.clusterSendMFStart(myself->slaveof) #发给本节点对应的master,因为只有slave能接受客户端的mf命令
- clsuter_legacy.createClusterMsgSendBlock(CLUSTERMSG_TYPE_MFSTART) #创建mfstart消息
- cluster_legacy.clusterSendMessage(node->link,msgblock) #发给master节点
- -- -- - - - - >流程来到事件循环的clusterCron
- clsuter.clusterCron:
- if nodeIsSlave(myself):
- cluster_legacy.clusterHandleManualFailover
- if !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)
- cluster_legacy.clusterHandleSlaveFailover #处理failover事件
- manual_failover = server.cluster->mf_end != 0 &&server.cluster->mf_can_start #标记是否可以开始mf流程
- if (clusterNodeIsMaster(myself) ||
- myself->slaveof == NULL ||
- (!nodeFailed(myself->slaveof) && !manual_failover) || #如果mf为真则可跳过后两个if即可以继续往下执行
- (server.cluster_slave_no_failover && !manual_failover) || #如果mf为假则还需要判断是否要可以执行自动failover
- myself->slaveof->numslots == 0) #如果都是否的,那么这里就直接返回,就不执行failover
- {
-
- return
- }
- ...执行故障转移,略...
-
- master流程:
- cluster_legacy.clusterReadHandler: #收到其他节点发来的消息
- cluster_legacy.clusterProcessPacket
- if type==MFSTART: #如果是MFSTART,就设置mf_end表示mf开始,
- #后续在clsuterCron中检测到mf_end不为0,就会更频繁的ping slave
- server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
- pauseActions(PAUSE_DURING_FAILOVER); #先暂停对外服务
- clusterSendPing(link, CLUSTERMSG_TYPE_PING); #发送ping给slave
- -- - - - - >流程来到cluster_leagcy.clusterCron
- cluster_leagcy.clusterCron #clsuterCron中对mf处理比较简单,就是更频繁的ping一下slave
- if (server.cluster->mf_end &&
- clusterNodeIsMaster(myself) &&
- server.cluster->mf_slave == node &&
- node->link)
- {
- clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
- continue;
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |