Debezium-BinaryLogClient

打印 上一主题 下一主题

主题 819|帖子 819|积分 2457

文章目录

  



    • 概要
    • 核心流程
    • 技能名词解释
    • 技能细节
    • 小结

  
概要

BinaryLogClient类,用于毗连和监听 MySQL 服务器的二进制日志(binlog)
核心流程


技能名词解释

   ### GTID (Global Transaction Identifier) 明白
  #### 定义
GTID(Global Transaction Identifier)是 MySQL 从 5.6 版本开始引入的一种全局事务标识符。每个 GTID 在整个 MySQL 集群中都是唯一的,用于唯一标识一个事务。
  #### 格式
GTID 的格式通常为 `source_id:transaction_id`,此中:
- **source_id**:表示生成事务的 MySQL 实例的唯一标识符,通常是实例的 `server_id`。
- **transaction_id**:表示在该实例上实行的事务的顺序号。
  例如,`3E11FA47-71CA-11E1-9E33-C80AA9429562:23` 表示在 `server_id` 为 `3E11FA47-71CA-11E1-9E33-C80AA9429562` 的 MySQL 实例上实行的第 23 个事务。
  #### 重要用途
1. **事务跟踪**:
   - GTID 可以资助跟踪事务在主从复制中的传播环境。每个事务在主库上生成后,会被分配一个唯一的 GTID,并在从库上应用时保存相同的 GTID。
  2. **简化复制管理**:
   - 利用 GTID 可以简化复制配置和管理。例如,可以通过指定 GTID 范围来同步特定的事务,而不需要手动管理二进制日志文件和位置。
  3. **故障恢复**:
   - 在主从切换或故障恢复时,GTID 可以确保从库不会重复应用同一个事务,从而避免数据不一致的标题。
  4. **并行复制**:
   - GTID 支持并行复制,即多个线程可以同时应用差别的事务,进步复制性能。
  #### 配置
启用 GTID 复制需要在 MySQL 配置文件中设置以下参数:
- `gtid_mode=ON`:启用 GTID 模式。
- `enforce_gtid_consistency=ON`:逼迫 GTID 一致性,确保所有事务都可以被正确地跟踪和应用。
  #### 总结
GTID 是 MySQL 中用于唯一标识事务的全局标识符,有助于简化复制管理和故障恢复。通过 GTID,可以更方便地跟踪和管理事务在主从复制中的传播,确保数据的一致性和可靠性。
  技能细节

  1. /**
  2. * 建立与MySQL服务器的连接并初始化二进制日志复制所需设置。
  3. *
  4. * 该方法首先检查当前客户端是否已连接,如果已连接,则抛出IllegalStateException。
  5. * 如果未连接,它将尝试使用指定的主机名和端口连接到MySQL服务器。
  6. * 连接成功后,接收服务器的欢迎包,并进行身份验证。
  7. * 如果未指定二进制日志文件名,则获取二进制日志文件名和位置。
  8. * 检查并确认支持的校验和类型,请求二进制日志流。
  9. * 最后,通知生命周期监听器连接成功,并启动保持活动线程(如果启用)。
  10. * 监听事件包。
  11. */
  12. public void connect() throws IOException {
  13.     if (this.connected) {
  14.         throw new IllegalStateException("BinaryLogClient 已经连接");
  15.     } else {
  16.         GreetingPacket greetingPacket;
  17.         IOException e;
  18.         try {
  19.             try {
  20.                 // 创建Socket连接
  21.                 Socket socket = this.socketFactory != null ? this.socketFactory.createSocket() : new Socket();
  22.                 socket.connect(new InetSocketAddress(this.hostname, this.port));
  23.                 this.channel = new PacketChannel(socket);
  24.                 if (this.channel.getInputStream().peek() == -1) {
  25.                     throw new EOFException();
  26.                 }
  27.             } catch (IOException var7) {
  28.                 e = var7;
  29.                 throw new IOException("连接到 MySQL " + this.hostname + ":" + this.port + " 失败。请确保其正在运行。", e);
  30.             }
  31.             // 接收欢迎包并进行身份验证
  32.             greetingPacket = this.receiveGreeting();
  33.             this.authenticate(greetingPacket.getScramble(), greetingPacket.getServerCollation());
  34.             // 获取二进制日志文件名和位置
  35.             if (this.binlogFilename == null) {
  36.                 this.fetchBinlogFilenameAndPosition();
  37.             }
  38.             // 调整二进制日志位置
  39.             if (this.binlogPosition < 4L) {
  40.                 if (this.logger.isLoggable(Level.WARNING)) {
  41.                     this.logger.warning("二进制日志位置从 " + this.binlogPosition + " 调整为 " + 4);
  42.                 }
  43.                 this.binlogPosition = 4L;
  44.             }
  45.             // 获取并确认支持的校验和类型
  46.             ChecksumType checksumType = this.fetchBinlogChecksum();
  47.             if (checksumType != ChecksumType.NONE) {
  48.                 this.confirmSupportOfChecksum(checksumType);
  49.             }
  50.             // 请求二进制日志流
  51.             this.requestBinaryLogStream();
  52.         } catch (IOException var10) {
  53.             e = var10;
  54.             if (this.channel != null && this.channel.isOpen()) {
  55.                 this.channel.close();
  56.             }
  57.             throw e;
  58.         }
  59.         // 设置连接状态并记录日志
  60.         this.connected = true;
  61.         if (this.logger.isLoggable(Level.INFO)) {
  62.             this.logger.info("连接到 " + this.hostname + ":" + this.port + " at " + this.binlogFilename + "/" + this.binlogPosition + " (sid:" + this.serverId + ", cid:" + greetingPacket.getThreadId() + ")");
  63.         }
  64.         // 通知生命周期监听器连接成功
  65.         synchronized(this.lifecycleListeners) {
  66.             Iterator i$ = this.lifecycleListeners.iterator();
  67.             while(i$.hasNext()) {
  68.                 LifecycleListener lifecycleListener = (LifecycleListener)i$.next();
  69.                 lifecycleListener.onConnect(this);
  70.             }
  71.         }
  72.         // 启动保持活动线程(如果启用)
  73.         if (this.keepAlive && !this.isKeepAliveThreadRunning()) {
  74.             this.spawnKeepAliveThread();
  75.         }
  76.         // 确保事件数据反序列化器
  77.         this.ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
  78.         synchronized(this.gtidSetAccessLock) {
  79.             if (this.gtidSet != null) {
  80.                 this.ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
  81.             }
  82.         }
  83.         // 监听事件包
  84.         this.listenForEventPackets();
  85.     }
  86. }
复制代码
  1. /**
  2. * 监听事件数据包的方法
  3. * 该方法用于持续监听来自服务器的事件数据包,并进行相应的处理
  4. * 当检测到数据包时,会根据数据包的类型进行处理,包括错误处理和事件处理
  5. * 如果连接断开或者出现异常,将停止监听并进行相应的异常处理
  6. *
  7. * @throws IOException 如果在读取数据流时发生I/O错误
  8. */
  9. private void listenForEventPackets() throws IOException {
  10.     // 获取输入流,用于读取服务器发送的数据
  11.     ByteArrayInputStream inputStream = this.channel.getInputStream();
  12.     // 无限循环,持续监听事件数据包
  13.     label202:
  14.     while(true) {
  15.         try {
  16.             // 检查输入流是否有数据可读
  17.             if (inputStream.peek() != -1) {
  18.                 // 读取数据包长度
  19.                 int packetLength = inputStream.readInteger(3);
  20.                 // 跳过1字节的填充
  21.                 inputStream.skip(1L);
  22.                 // 读取标记字节,用于判断数据包类型
  23.                 int marker = inputStream.read();
  24.                 // 如果标记为255,表示接收到的是错误数据包
  25.                 if (marker == 255) {
  26.                     // 解析错误数据包并抛出异常
  27.                     ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
  28.                     throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), errorPacket.getSqlState());
  29.                 }
  30.                 // 解析事件数据包
  31.                 Event event;
  32.                 try {
  33.                     // 根据数据包长度决定是否需要分块读取
  34.                     event = this.eventDeserializer.nextEvent(packetLength == 16777215 ? new ByteArrayInputStream(this.readPacketSplitInChunks(inputStream, packetLength - 1)) : inputStream);
  35.                 } catch (Exception var20) {
  36.                     // 处理解析异常
  37.                     Exception e = var20;
  38.                     Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
  39.                     // 根据异常类型进行不同处理
  40.                     if (!(cause instanceof EOFException) && !(cause instanceof SocketException)) {
  41.                         if (!this.isConnected()) {
  42.                             continue;
  43.                         }
  44.                         // 通知生命周期监听器解包失败
  45.                         synchronized(this.lifecycleListeners) {
  46.                             Iterator i$ = this.lifecycleListeners.iterator();
  47.                             while(true) {
  48.                                 if (!i$.hasNext()) {
  49.                                     continue label202;
  50.                                 }
  51.                                 LifecycleListener lifecycleListener = (LifecycleListener)i$.next();
  52.                                 lifecycleListener.onEventDeserializationFailure(this, e);
  53.                             }
  54.                         }
  55.                     }
  56.                     throw e;
  57.                 }
  58.                 // 处理解析成功的事件
  59.                 if (this.isConnected()) {
  60.                     this.notifyEventListeners(event);
  61.                     this.updateClientBinlogFilenameAndPosition(event);
  62.                     this.updateGtidSet(event);
  63.                 }
  64.                 continue;
  65.             }
  66.         } catch (Exception var21) {
  67.             // 处理通信异常
  68.             Exception e = var21;
  69.             if (this.isConnected()) {
  70.                 synchronized(this.lifecycleListeners) {
  71.                     Iterator i$ = this.lifecycleListeners.iterator();
  72.                     while(i$.hasNext()) {
  73.                         LifecycleListener lifecycleListener = (LifecycleListener)i$.next();
  74.                         lifecycleListener.onCommunicationFailure(this, e);
  75.                     }
  76.                 }
  77.             }
  78.         } finally {
  79.             // 确保在结束监听时断开连接
  80.             if (this.isConnected()) {
  81.                 this.disconnectChannel();
  82.             }
  83.         }
  84.         // 结束方法
  85.         return;
  86.     }
  87. }
复制代码
  1. /**
  2. * 通知事件监听器
  3. * 当有事件发生时,此方法会被调用以通知所有注册的事件监听器
  4. * 如果事件的数据是EventDataWrapper类型,则会用外部事件数据替换事件数据
  5. *
  6. * @param event 发生的事件,用于通知监听器
  7. */
  8. private void notifyEventListeners(Event event) {
  9.     // 检查事件数据是否为EventDataWrapper类型,如果是,则用外部事件数据替换事件数据
  10.     if (event.getData() instanceof EventDeserializer.EventDataWrapper) {
  11.         event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper)event.getData()).getExternal());
  12.     }
  13.     // 同步eventListeners以确保线程安全
  14.     synchronized(this.eventListeners) {
  15.         // 遍历所有事件监听器
  16.         Iterator i$ = this.eventListeners.iterator();
  17.         while(i$.hasNext()) {
  18.             EventListener eventListener = (EventListener)i$.next();
  19.             try {
  20.                 // 通知事件监听器处理事件
  21.                 eventListener.onEvent(event);
  22.             } catch (Exception var7) {
  23.                 Exception e = var7;
  24.                 // 如果日志级别为WARNING,记录异常信息
  25.                 if (this.logger.isLoggable(Level.WARNING)) {
  26.                     this.logger.log(Level.WARNING, eventListener + " choked on " + event, e);
  27.                 }
  28.             }
  29.         }
  30.     }
  31. }
复制代码
  1. /**
  2. * 将事件添加到队列中以进行后续批量处理。
  3. *
  4. * @param event 从二进制日志中读取的事件
  5. */
  6. protected void enqueue(Event event) {
  7.     // 检查事件是否为空,避免空指针异常
  8.     if (event != null) {
  9.         try {
  10.             // 将事件放入队列中
  11.             events.put(event);
  12.         } catch (InterruptedException e) {
  13.             // 处理中断异常,恢复中断状态并抛出连接异常
  14.             Thread.interrupted();
  15.             throw new ConnectException("在等待将事件添加到队列时被中断", e);
  16.         }
  17.     }
  18. }
复制代码
  1. /**
  2. * 覆盖 poll 方法以从 MySQL 服务器获取并处理事件。
  3. * 该方法会持续轮询事件,处理这些事件,并返回处理后的记录列表。
  4. *
  5. * @return 处理后的 SourceRecord 列表
  6. */
  7. @Override
  8. public List<SourceRecord> poll() throws InterruptedException {
  9.     logger.trace("从 MySQL 服务器 '{}' 轮询事件", serverName);
  10.     while (running.get() && (events.drainTo(batchEvents, maxBatchSize - batchEvents.size()) == 0 || batchEvents.isEmpty())) {
  11.         // 没有事件需要处理,因此暂停一段时间 ...
  12.         metronome.pause();
  13.     }
  14.     logger.trace("准备从 MySQL 服务器 '{}' 处理 {} 个事件", events.size(), serverName);
  15.     // 至少有一些记录需要处理 ...
  16.     List<SourceRecord> records = new ArrayList<>(batchEvents.size());
  17.     while (!batchEvents.isEmpty()) {
  18.         Event event = batchEvents.poll();
  19.         if (event == null) continue;
  20.         // 更新源偏移信息 ...
  21.         EventHeader eventHeader = event.getHeader();
  22.         EventType eventType = eventHeader.getEventType();
  23.         if (eventType == EventType.ROTATE) {
  24.             EventData eventData = event.getData();
  25.             RotateEventData rotateEventData;
  26.             if (eventData instanceof EventDeserializer.EventDataWrapper) {
  27.                 rotateEventData = (RotateEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal();
  28.             } else {
  29.                 rotateEventData = (RotateEventData) eventData;
  30.             }
  31.             source.setBinlogFilename(rotateEventData.getBinlogFilename());
  32.             source.setBinlogPosition(rotateEventData.getBinlogPosition());
  33.             source.setRowInEvent(0);
  34.         } else if (eventHeader instanceof EventHeaderV4) {
  35.             EventHeaderV4 trackableEventHeader = (EventHeaderV4) eventHeader;
  36.             long nextBinlogPosition = trackableEventHeader.getNextPosition();
  37.             if (nextBinlogPosition > 0) {
  38.                 source.setBinlogPosition(nextBinlogPosition);
  39.                 source.setRowInEvent(0);
  40.             }
  41.         }
  42.         
  43.         if (!running.get()) break;
  44.         // 如果有处理此事件的处理器,将事件转发给它 ...
  45.         EventHandler handler = eventHandlers.get(eventType);
  46.         if (handler != null) {
  47.             handler.handle(event, source, records::add);
  48.         }
  49.     }
  50.     logger.trace("完成从 MySQL 服务器 '{}' 处理 {} 个事件", serverName);
  51.     if (!this.running.get()) {
  52.         // 应该停止,因此返回已经处理的记录,以防止在 DB 历史已停止的情况下持久化记录 ...
  53.         return null;
  54.     }
  55.     // 已经处理完所有事件,清空批处理队列并返回记录 ...
  56.     assert batchEvents.isEmpty();
  57.     return records;
  58. }
复制代码
小结

   ### Debezium 监听和处理 Binlog 事故的简要总结
  1. **轮询事故**:
   - `poll` 方法通过 `logger.trace` 记载开始从 MySQL 服务器轮询事故。
   - 利用 `while` 循环不断查抄是否有新的事故需要处理。如果没有事故,调用 `metronome.pause()` 停息一段时间。
  2. **准备事故**:
   - 当有事故可用时,记载准备处理的事故数目。
   - 创建一个 `List<SourceRecord>` 来存储处理后的记载。
  3. **处理事故**:
   - 从 `batchEvents` 队列中取失事故并举行处理。
   - 根据事故范例更新源偏移信息:
     - 对于 `ROTATE` 事故,更新二进制日志文件名和位置。
     - 对于其他范例的事故,更新二进制日志位置。
   - 如果有相应的事故处理器,调用 `handler.handle` 方法处理事故并将结果添加到 `records` 列表中。
  4. **制止处理**:
   - 如果 `running` 标志为 `false`,表示应该制止处理,返回 `null` 以防止在 DB 汗青已制止的环境下长期化记载。
  5. **返回结果**:
   - 清空 `batchEvents` 队列,确保所有事故都已处理完毕。
   - 返回处理后的 `records` 列表。
  ### 关键步骤总结
- **轮询和等待**:通过循环和停息机制等待新事故。
- **事故处理**:根据事故范例更新偏移信息,并调用相应的处理器处理事故。
- **制止机制**:在需要制止时返回 `null`,避免不须要的记载长期化。
- **结果返回**:清空批处理队列并返回处理后的记载列表。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

风雨同行

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表