mongodb 数据块迁移的源码分析

打印 上一主题 下一主题

主题 846|帖子 846|积分 2538

1. 简介

上一篇我们聊到了mongodb数据块的基本概念,和数据块迁移的主要流程,这篇文章我们聊聊源码实现部分。
2. 迁移序列图

数据块迁移的请求是从配置服务器(config server)发给(donor,捐献方),再有捐献方发起迁移请求给目标节点(recipient,接收方),后续迁移由捐献方和接收方配合完成。
数据迁移结束时,捐献方再提交迁移结果给配置服务器,三方交互序列图如下:

 
可以看到,序列图中的5个步骤,是对应前面文章的迁移流程中的5个步骤,其中接收方的流程控制代码在migration_destination_manager.cpp中的_migrateDriver方法中,捐献方的流程控制代码在donor的move_chunk_command.cpp中的_runImpl方法中完成,代码如下:
  1. static void _runImpl(OperationContext* opCtx, const MoveChunkRequest& moveChunkRequest) {
  2.         const auto writeConcernForRangeDeleter =
  3.             uassertStatusOK(ChunkMoveWriteConcernOptions::getEffectiveWriteConcern(
  4.                 opCtx, moveChunkRequest.getSecondaryThrottle()));
  5.         // Resolve the donor and recipient shards and their connection string
  6.         auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
  7.         // 准备donor和recipient的连接
  8.         const auto donorConnStr =
  9.             uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getFromShardId()))
  10.                 ->getConnString();
  11.         const auto recipientHost = uassertStatusOK([&] {
  12.             auto recipientShard =
  13.                 uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getToShardId()));
  14.             return recipientShard->getTargeter()->findHost(
  15.                 opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
  16.         }());
  17.         std::string unusedErrMsg;
  18.         // 用于统计每一步的耗时情况
  19.         MoveTimingHelper moveTimingHelper(opCtx,
  20.                                           "from",
  21.                                           moveChunkRequest.getNss().ns(),
  22.                                           moveChunkRequest.getMinKey(),
  23.                                           moveChunkRequest.getMaxKey(),
  24.                                           6,  // Total number of steps
  25.                                           &unusedErrMsg,
  26.                                           moveChunkRequest.getToShardId(),
  27.                                           moveChunkRequest.getFromShardId());
  28.         moveTimingHelper.done(1);
  29.         moveChunkHangAtStep1.pauseWhileSet();
  30.         if (moveChunkRequest.getFromShardId() == moveChunkRequest.getToShardId()) {
  31.             // TODO: SERVER-46669 handle wait for delete.
  32.             return;
  33.         }
  34.         // 构建迁移任务管理器
  35.         MigrationSourceManager migrationSourceManager(
  36.             opCtx, moveChunkRequest, donorConnStr, recipientHost);
  37.         moveTimingHelper.done(2);
  38.         moveChunkHangAtStep2.pauseWhileSet();
  39.         // 向接收方发送迁移命令
  40.         uassertStatusOKWithWarning(migrationSourceManager.startClone());
  41.         moveTimingHelper.done(3);
  42.         moveChunkHangAtStep3.pauseWhileSet();
  43.         // 等待块数据和变更数据都拷贝完成
  44.         uassertStatusOKWithWarning(migrationSourceManager.awaitToCatchUp());
  45.         moveTimingHelper.done(4);
  46.         moveChunkHangAtStep4.pauseWhileSet();
  47.         // 进入临界区
  48.         uassertStatusOKWithWarning(migrationSourceManager.enterCriticalSection());
  49.         // 通知接收方
  50.         uassertStatusOKWithWarning(migrationSourceManager.commitChunkOnRecipient());
  51.         moveTimingHelper.done(5);
  52.         moveChunkHangAtStep5.pauseWhileSet();
  53.         // 在配置服务器提交分块元数据信息
  54.         uassertStatusOKWithWarning(migrationSourceManager.commitChunkMetadataOnConfig());
  55.         moveTimingHelper.done(6);
  56.         moveChunkHangAtStep6.pauseWhileSet();
  57.     }
复制代码
下面对每一个步骤的代码做分析。
3. 各步骤源码分析

3.1 启动迁移( _recvChunkStart)

 
在启动阶段,捐献方主要做了三件事:
1. 参数检查,在MigrationSourceManager 构造函数中完成,不再赘述。
2. 注册监听器,用于记录在迁移期间该数据块内发生的变更数据,代码如下:
3. 向接收方发送迁移命令_recvChunkStart。
步骤2和3的代码实现在一个方法中,如下:
  1. Status MigrationSourceManager::startClone() {
  2.     ...// 省略了部分代码
  3.     _cloneAndCommitTimer.reset();
  4.     auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
  5.     auto replEnabled = replCoord->isReplEnabled();
  6.     {
  7.         const auto metadata = _getCurrentMetadataAndCheckEpoch();
  8.         // Having the metadata manager registered on the collection sharding state is what indicates
  9.         // that a chunk on that collection is being migrated. With an active migration, write
  10.         // operations require the cloner to be present in order to track changes to the chunk which
  11.         // needs to be transmitted to the recipient.
  12.         // 注册监听器,_cloneDriver除了迁移数据外,还会用于记录在迁移过程中该数据块增量变化的数据(比如新增的数据)
  13.         _cloneDriver = std::make_unique<MigrationChunkClonerSourceLegacy>(
  14.             _args, metadata.getKeyPattern(), _donorConnStr, _recipientHost);
  15.         AutoGetCollection autoColl(_opCtx,
  16.                                    getNss(),
  17.                                    replEnabled ? MODE_IX : MODE_X,
  18.                                    AutoGetCollectionViewMode::kViewsForbidden,
  19.                                    _opCtx->getServiceContext()->getPreciseClockSource()->now() +
  20.                                        Milliseconds(migrationLockAcquisitionMaxWaitMS.load()));
  21.         auto csr = CollectionShardingRuntime::get(_opCtx, getNss());
  22.         auto lockedCsr = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
  23.         invariant(nullptr == std::exchange(msmForCsr(csr), this));
  24.         _coordinator = std::make_unique<migrationutil::MigrationCoordinator>(
  25.             _cloneDriver->getSessionId(),
  26.             _args.getFromShardId(),
  27.             _args.getToShardId(),
  28.             getNss(),
  29.             *_collectionUUID,
  30.             ChunkRange(_args.getMinKey(), _args.getMaxKey()),
  31.             _chunkVersion,
  32.             _args.getWaitForDelete());
  33.         _state = kCloning;
  34.     }
  35.     if (replEnabled) {
  36.         auto const readConcernArgs = repl::ReadConcernArgs(
  37.             replCoord->getMyLastAppliedOpTime(), repl::ReadConcernLevel::kLocalReadConcern);
  38.         // 检查当前节点状态是否满足repl::ReadConcernLevel::kLocalReadConcern
  39.         auto waitForReadConcernStatus =
  40.             waitForReadConcern(_opCtx, readConcernArgs, StringData(), false);
  41.         if (!waitForReadConcernStatus.isOK()) {
  42.             return waitForReadConcernStatus;
  43.         }
  44.         setPrepareConflictBehaviorForReadConcern(
  45.             _opCtx, readConcernArgs, PrepareConflictBehavior::kEnforce);
  46.     }
  47.     _coordinator->startMigration(_opCtx);
  48.     // 向接收方发送开始拷贝数据的命令(_recvChunkStart)
  49.     Status startCloneStatus = _cloneDriver->startClone(_opCtx,
  50.                                                        _coordinator->getMigrationId(),
  51.                                                        _coordinator->getLsid(),
  52.                                                        _coordinator->getTxnNumber());
  53.     if (!startCloneStatus.isOK()) {
  54.         return startCloneStatus;
  55.     }
  56.     scopedGuard.dismiss();
  57.     return Status::OK();
  58. }
复制代码
 
接收方在收到迁移请求后,会先检查本地是否有该表,如果没有的话,会先建表会创建表的索引:
  1. void MigrationDestinationManager::cloneCollectionIndexesAndOptions(
  2.     OperationContext* opCtx,
  3.     const NamespaceString& nss,
  4.     const CollectionOptionsAndIndexes& collectionOptionsAndIndexes) {
  5.     {
  6.         // 1. Create the collection (if it doesn't already exist) and create any indexes we are
  7.         // missing (auto-heal indexes).
  8.         ...// 省略部分代码
  9.         {
  10.             AutoGetCollection collection(opCtx, nss, MODE_IS);
  11.             // 如果存在表,且不缺索引,则退出
  12.             if (collection) {
  13.                 checkUUIDsMatch(collection.getCollection());
  14.                 auto indexSpecs =
  15.                     checkEmptyOrGetMissingIndexesFromDonor(collection.getCollection());
  16.                 if (indexSpecs.empty()) {
  17.                     return;
  18.                 }
  19.             }
  20.         }
  21.         // Take the exclusive database lock if the collection does not exist or indexes are missing
  22.         // (needs auto-heal).
  23.         // 建表时,需要对数据库加锁
  24.         AutoGetDb autoDb(opCtx, nss.db(), MODE_X);
  25.         auto db = autoDb.ensureDbExists();
  26.         auto collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
  27.         if (collection) {
  28.             checkUUIDsMatch(collection);
  29.         } else {
  30.             ...// 省略部分代码// We do not have a collection by this name. Create the collection with the donor's
  31.             // options.
  32.             // 建表
  33.             OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE
  34.                 unsafeCreateCollection(opCtx);
  35.             WriteUnitOfWork wuow(opCtx);
  36.             CollectionOptions collectionOptions = uassertStatusOK(
  37.                 CollectionOptions::parse(collectionOptionsAndIndexes.options,
  38.                                          CollectionOptions::ParseKind::parseForStorage));
  39.             const bool createDefaultIndexes = true;
  40.             uassertStatusOK(db->userCreateNS(opCtx,
  41.                                              nss,
  42.                                              collectionOptions,
  43.                                              createDefaultIndexes,
  44.                                              collectionOptionsAndIndexes.idIndexSpec));
  45.             wuow.commit();
  46.             collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
  47.         }
  48.         // 创建对应的索引
  49.         auto indexSpecs = checkEmptyOrGetMissingIndexesFromDonor(collection);
  50.         if (!indexSpecs.empty()) {
  51.             WriteUnitOfWork wunit(opCtx);
  52.             auto fromMigrate = true;
  53.             CollectionWriter collWriter(opCtx, collection->uuid());
  54.             IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection(
  55.                 opCtx, collWriter, indexSpecs, fromMigrate);
  56.             wunit.commit();
  57.         }
  58.     }
  59. }
复制代码
 
3.2 接收方拉取存量数据( _migrateClone)

接收方的拉取存量数据时,做了六件事情:
1. 定义了一个批量插入记录的方法。
2. 定义了一个批量拉取数据的方法。
3. 定义生产者和消费队列。
4. 启动数据写入线程,该线程会消费队列中的数据,并调用批量插入记录的方法把记录保存到本地。
5. 循环向捐献方发起拉取数据请求(步骤2的方法),并写入步骤3的队列中。
6. 数据拉取结束后(写入空记录到队列中,触发步骤5结束),则同步等待步骤5的线程也结束。
详细代码如下:
  1. // 1. 定义批量写入函数
  2.         auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
  3.             auto it = arr.begin();
  4.             while (it != arr.end()) {
  5.                 int batchNumCloned = 0;
  6.                 int batchClonedBytes = 0;
  7.                 const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
  8.                 assertNotAborted(opCtx);
  9.                 write_ops::InsertCommandRequest insertOp(_nss);
  10.                 insertOp.getWriteCommandRequestBase().setOrdered(true);
  11.                 insertOp.setDocuments([&] {
  12.                     std::vector<BSONObj> toInsert;
  13.                     while (it != arr.end() &&
  14.                            (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
  15.                         const auto& doc = *it;
  16.                         BSONObj docToClone = doc.Obj();
  17.                         toInsert.push_back(docToClone);
  18.                         batchNumCloned++;
  19.                         batchClonedBytes += docToClone.objsize();
  20.                         ++it;
  21.                     }
  22.                     return toInsert;
  23.                 }());
  24.                 const auto reply =
  25.                     write_ops_exec::performInserts(opCtx, insertOp, OperationSource::kFromMigrate);
  26.                 for (unsigned long i = 0; i < reply.results.size(); ++i) {
  27.                     uassertStatusOKWithContext(
  28.                         reply.results[i],
  29.                         str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed.");
  30.                 }
  31.                 {
  32.                     stdx::lock_guard<Latch> statsLock(_mutex);
  33.                     _numCloned += batchNumCloned;
  34.                     ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(
  35.                         batchNumCloned);
  36.                     _clonedBytes += batchClonedBytes;
  37.                 }
  38.                 if (_writeConcern.needToWaitForOtherNodes()) {
  39.                     runWithoutSession(outerOpCtx, [&] {
  40.                         repl::ReplicationCoordinator::StatusAndDuration replStatus =
  41.                             repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
  42.                                 opCtx,
  43.                                 repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
  44.                                 _writeConcern);
  45.                         if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
  46.                             LOGV2_WARNING(
  47.                                 22011,
  48.                                 "secondaryThrottle on, but doc insert timed out; continuing",
  49.                                 "migrationId"_attr = _migrationId->toBSON());
  50.                         } else {
  51.                             uassertStatusOK(replStatus.status);
  52.                         }
  53.                     });
  54.                 }
  55.                 sleepmillis(migrateCloneInsertionBatchDelayMS.load());
  56.             }
  57.         };
  58.         // 2. 定义批量拉取函数
  59.         auto fetchBatchFn = [&](OperationContext* opCtx) {
  60.             auto res = uassertStatusOKWithContext(
  61.                 fromShard->runCommand(opCtx,
  62.                                       ReadPreferenceSetting(ReadPreference::PrimaryOnly),
  63.                                       "admin",
  64.                                       migrateCloneRequest,
  65.                                       Shard::RetryPolicy::kNoRetry),
  66.                 "_migrateClone failed: ");
  67.             uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(res),
  68.                                        "_migrateClone failed: ");
  69.             return res.response;
  70.         };
  71. SingleProducerSingleConsumerQueue<BSONObj>::Options options;
  72.     options.maxQueueDepth = 1;
  73.     // 3. 使用生产者和消费者队列来把同步的数据写入到本地
  74.     SingleProducerSingleConsumerQueue<BSONObj> batches(options);
  75.     repl::OpTime lastOpApplied;
  76.     // 4. 定义写数据线程,该线程会读取队列中的数据并写入本地节点,直到无需要同步的数据时线程退出
  77.     stdx::thread inserterThread{[&] {
  78.         Client::initThread("chunkInserter", opCtx->getServiceContext(), nullptr);
  79.         auto client = Client::getCurrent();
  80.         {
  81.             stdx::lock_guard lk(*client);
  82.             client->setSystemOperationKillableByStepdown(lk);
  83.         }
  84.         auto executor =
  85.             Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
  86.         auto inserterOpCtx = CancelableOperationContext(
  87.             cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
  88.         auto consumerGuard = makeGuard([&] {
  89.             batches.closeConsumerEnd();
  90.             lastOpApplied = repl::ReplClientInfo::forClient(inserterOpCtx->getClient()).getLastOp();
  91.         });
  92.         try {
  93.             while (true) {
  94.                 auto nextBatch = batches.pop(inserterOpCtx.get());
  95.                 auto arr = nextBatch["objects"].Obj();
  96.                 if (arr.isEmpty()) {
  97.                     return;
  98.                 }
  99.                 insertBatchFn(inserterOpCtx.get(), arr);
  100.             }
  101.         } catch (...) {
  102.             stdx::lock_guard<Client> lk(*opCtx->getClient());
  103.             opCtx->getServiceContext()->killOperation(lk, opCtx, ErrorCodes::Error(51008));
  104.             LOGV2(21999,
  105.                   "Batch insertion failed: {error}",
  106.                   "Batch insertion failed",
  107.                   "error"_attr = redact(exceptionToStatus()));
  108.         }
  109.     }};
  110.     {
  111.         //6.  makeGuard的作用是延迟执行inserterThread.join()
  112.         auto inserterThreadJoinGuard = makeGuard([&] {
  113.             batches.closeProducerEnd();
  114.             inserterThread.join();
  115.         });
  116.         // 5. 向捐献方发起拉取请求,并把数据写入队列中
  117.         while (true) {
  118.             auto res = fetchBatchFn(opCtx);
  119.             try {
  120.                 batches.push(res.getOwned(), opCtx);
  121.                 auto arr = res["objects"].Obj();
  122.                 if (arr.isEmpty()) {
  123.                     break;
  124.                 }
  125.             } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>&) {
  126.                 break;
  127.             }
  128.         }
  129.     }  // This scope ensures that the guard is destroyed
复制代码
3.3 接收方拉取变更数据( _recvChunkStart)

在本步骤,接收方会再拉取变更数据,即在前面迁移过程中,捐献方上发生的针对该数据块的写入、更新和删除的记录,代码如下:
  1. // 同步变更数据(_transferMods)
  2.     const BSONObj xferModsRequest = createTransferModsRequest(_nss, *_sessionId);
  3.     {
  4.         // 5. Do bulk of mods
  5.         // 5. 批量拉取变更数据,循环拉取,直至无变更数据
  6.         _setState(CATCHUP);
  7.         while (true) {
  8.             auto res = uassertStatusOKWithContext(
  9.                 fromShard->runCommand(opCtx,
  10.                                       ReadPreferenceSetting(ReadPreference::PrimaryOnly),
  11.                                       "admin",
  12.                                       xferModsRequest,
  13.                                       Shard::RetryPolicy::kNoRetry),
  14.                 "_transferMods failed: ");
  15.             uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(res),
  16.                                        "_transferMods failed: ");
  17.             const auto& mods = res.response;
  18.             if (mods["size"].number() == 0) {
  19.                 // There are no more pending modifications to be applied. End the catchup phase
  20.                 // 无变更数据时,停止循环
  21.                 break;
  22.             }
  23.             // 应用拉取到的变更数据
  24.             if (!_applyMigrateOp(opCtx, mods, &lastOpApplied)) {
  25.                 continue;
  26.             }
  27.             const int maxIterations = 3600 * 50;
  28.             // 等待从节点完成数据同步
  29.             int i;
  30.             for (i = 0; i < maxIterations; i++) {
  31.                 opCtx->checkForInterrupt();
  32.                 outerOpCtx->checkForInterrupt();
  33.                 if (getState() == ABORT) {
  34.                     LOGV2(22002,
  35.                           "Migration aborted while waiting for replication at catch up stage",
  36.                           "migrationId"_attr = _migrationId->toBSON());
  37.                     return;
  38.                 }
  39.                 if (runWithoutSession(outerOpCtx, [&] {
  40.                         return opReplicatedEnough(opCtx, lastOpApplied, _writeConcern);
  41.                     })) {
  42.                     break;
  43.                 }
  44.                 if (i > 100) {
  45.                     LOGV2(22003,
  46.                           "secondaries having hard time keeping up with migrate",
  47.                           "migrationId"_attr = _migrationId->toBSON());
  48.                 }
  49.                 sleepmillis(20);
  50.             }
  51.             if (i == maxIterations) {
  52.                 _setStateFail("secondary can't keep up with migrate");
  53.                 return;
  54.             }
  55.         }
  56.         timing.done(5);
  57.         migrateThreadHangAtStep5.pauseWhileSet();
  58.     }
复制代码
变更数据拉取结束,就进入等待捐献方进入临界区,在临界区内,捐献方会阻塞写入请求,因此在未进入临界区前,仍然需要拉取变更数据:
  1.         // 6. Wait for commit
  2.         // 6. 等待donor进入临界区
  3.         _setState(STEADY);
  4.         bool transferAfterCommit = false;
  5.         while (getState() == STEADY || getState() == COMMIT_START) {
  6.             opCtx->checkForInterrupt();
  7.             outerOpCtx->checkForInterrupt();
  8.             // Make sure we do at least one transfer after recv'ing the commit message. If we
  9.             // aren't sure that at least one transfer happens *after* our state changes to
  10.             // COMMIT_START, there could be mods still on the FROM shard that got logged
  11.             // *after* our _transferMods but *before* the critical section.
  12.             if (getState() == COMMIT_START) {
  13.                 transferAfterCommit = true;
  14.             }
  15.             auto res = uassertStatusOKWithContext(
  16.                 fromShard->runCommand(opCtx,
  17.                                       ReadPreferenceSetting(ReadPreference::PrimaryOnly),
  18.                                       "admin",
  19.                                       xferModsRequest,
  20.                                       Shard::RetryPolicy::kNoRetry),
  21.                 "_transferMods failed in STEADY STATE: ");
  22.             uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(res),
  23.                                        "_transferMods failed in STEADY STATE: ");
  24.             auto mods = res.response;
  25.             // 如果请求到变更数据,则应用到本地,并继续请求变更数据,直到所有变更数据都迁移结束
  26.             if (mods["size"].number() > 0 && _applyMigrateOp(opCtx, mods, &lastOpApplied)) {
  27.                 continue;
  28.             }
  29.             if (getState() == ABORT) {
  30.                 LOGV2(22006,
  31.                       "Migration aborted while transferring mods",
  32.                       "migrationId"_attr = _migrationId->toBSON());
  33.                 return;
  34.             }
  35.             // We know we're finished when:
  36.             // 1) The from side has told us that it has locked writes (COMMIT_START)
  37.             // 2) We've checked at least one more time for un-transmitted mods
  38.             // 检查transferAfterCommit的原因:进入COMMIT_START(临界区)后,需要再拉取一次变更数据
  39.             if (getState() == COMMIT_START && transferAfterCommit == true) {
  40.                 // 检查所有数据同步到从节点后,数据迁移流程结束
  41.                 if (runWithoutSession(outerOpCtx,
  42.                                       [&] { return _flushPendingWrites(opCtx, lastOpApplied); })) {
  43.                     break;
  44.                 }
  45.             }
  46.             // Only sleep if we aren't committing
  47.             if (getState() == STEADY)
  48.                 sleepmillis(10);
  49.         }
复制代码
 
3.4 进入临界区( _recvChunkStatus,_recvChunkCommit)

在该步骤,捐献方主要做了三件事:
1. 等待接收方完成数据同步(_recvChunkStatus)。
2. 标记本节点进入临界区,阻塞写操作。
3. 通知接收方进入临界区(_recvChunkCommit)。
相关代码如下:
  1. Status MigrationSourceManager::awaitToCatchUp() {
  2.     invariant(!_opCtx->lockState()->isLocked());
  3.     invariant(_state == kCloning);
  4.     auto scopedGuard = makeGuard([&] { cleanupOnError(); });
  5.     _stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
  6.     _cloneAndCommitTimer.reset();
  7.     // Block until the cloner deems it appropriate to enter the critical section.
  8.     // 等待数据拷贝完成,这里会向接收方发送_recvChunkStatus,检查接收方的状态是否是STEADY
  9.     Status catchUpStatus = _cloneDriver->awaitUntilCriticalSectionIsAppropriate(
  10.         _opCtx, kMaxWaitToEnterCriticalSectionTimeout);
  11.     if (!catchUpStatus.isOK()) {
  12.         return catchUpStatus;
  13.     }
  14.     _state = kCloneCaughtUp;
  15.     scopedGuard.dismiss();
  16.     return Status::OK();
  17. }
  18. <br>// 进入临界区
  19. Status MigrationSourceManager::enterCriticalSection() {
  20.     ...// 省略部分代码<br>     // 标记进入临界区,后续更新类操作会被阻塞(通过ShardingMigrationCriticalSection::getSignal()检查该标记)
  21.     _critSec.emplace(_opCtx, _args.getNss(), _critSecReason);
  22.     _state = kCriticalSection;
  23.     // Persist a signal to secondaries that we've entered the critical section. This is will cause
  24.     // secondaries to refresh their routing table when next accessed, which will block behind the
  25.     // critical section. This ensures causal consistency by preventing a stale mongos with a cluster
  26.     // time inclusive of the migration config commit update from accessing secondary data.
  27.     // Note: this write must occur after the critSec flag is set, to ensure the secondary refresh
  28.     // will stall behind the flag.
  29.     // 通知从节点此时主节点已进入临界区,如果有数据访问时要刷新路由信息(保证因果一致性)
  30.     Status signalStatus = shardmetadatautil::updateShardCollectionsEntry(
  31.         _opCtx,
  32.         BSON(ShardCollectionType::kNssFieldName << getNss().ns()),
  33.         BSON("$inc" << BSON(ShardCollectionType::kEnterCriticalSectionCounterFieldName << 1)),
  34.         false /*upsert*/);
  35.     if (!signalStatus.isOK()) {
  36.         return {
  37.             ErrorCodes::OperationFailed,
  38.             str::stream() << "Failed to persist critical section signal for secondaries due to: "
  39.                           << signalStatus.toString()};
  40.     }
  41.     LOGV2(22017,
  42.           "Migration successfully entered critical section",
  43.           "migrationId"_attr = _coordinator->getMigrationId());
  44.     scopedGuard.dismiss();
  45.     return Status::OK();
  46. }<br><br>
复制代码
 
4. 小结

至此,mongodb的数据块迁移的源代码基本分析完毕,这里补充一下监听变更数据的代码实现。
前面有提到监听变更数据是由_cloneDriver完成的,下面看下_cloneDriver的接口定义:
  1. [/code][code]
复制代码
[code][/code] 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

忿忿的泥巴坨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表