Flink CDC in Practice: Configuration and Problems Encountered


Background

Recently our company has been making some architectural adjustments, part of which is ingesting data from MySQL into StarRocks. The previous approach went from Debezium to Pulsar to StarRocks. That pipeline required a lot of configuration, and whenever a problem occurred many settings had to be changed; it also had quite a few ongoing issues, and since it captured all of our production databases it was costly to maintain.
We therefore restructured the ingestion data flow and adopted Flink CDC. It is end to end and configuration driven, supports schema changes, and removes the need for an intermediate storage layer.
Final Configuration

For background on configuring Flink CDC, see "Flink CDC 源码分析–整体流程" (Flink CDC source-code analysis: overall flow). Here I only paste the configuration we ended up using:
  source:
    type: mysql
    name: Database mysql to Data warehouse
    hostname: xxxx
    port: 3306
    username: xxx
    password: xxx
    tables: db1.table1
    server-id: 556401-556500
    scan.startup.mode: initial
    scan.snapshot.fetch.size: 8096
    scan.incremental.snapshot.chunk.size: 16192
    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    scan.only.deserialize.captured.tables.changelog.enabled: true
    scan.parallel-deserialize-changelog.enabled: true
    heartbeat.interval: 5s
    scan.newly-added-table.enabled: true
  sink:
    type: starrocks
    name: StarRocks Sink
    jdbc-url: xxx
    load-url: xxx
    username: xxx
    password: xxx
    sink.buffer-flush.interval-ms: 5000
    table.create.properties.replication_num: 3
    table.create.num-buckets: 3
  route:
    - source-table: db1.\.*
      sink-table: db1.<>
      replace-symbol: <>
      description: route all tables to starrocks
  pipeline:
    name: Sync mysql Database to StarRocks
    parallelism: 1
    schema.change.behavior: EVOLVE
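In the route rule above, replace-symbol tells Flink CDC to substitute the source table's name for `<>` in sink-table, so every table under db1 lands in a StarRocks table of the same name. A minimal Python sketch of that substitution (an illustration only, not Flink CDC's actual routing code; I'm assuming the pattern behaves as an ordinary regex):

```python
import re

def route(source_table, source_pattern, sink_template, symbol):
    """Map a fully qualified source table to a sink table the way the
    route rule does: match against the pattern, then substitute the
    table name for the replace-symbol in the sink template."""
    if not re.fullmatch(source_pattern, source_table):
        return None  # rule does not apply to this table
    table = source_table.split(".", 1)[1]
    return sink_template.replace(symbol, table)

print(route("db1.orders", r"db1\..*", "db1.<>", "<>"))  # -> db1.orders
print(route("db2.orders", r"db1\..*", "db1.<>", "<>"))  # -> None
```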
Problems Encountered


  • EventHeaderV4 deserialization failure
    The error is as follows:
    Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1732257303000, eventType=WRITE_ROWS, serverId=28555270, headerLength=19, dataLength=320, nextPosition=383299196, flags=0}
            at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1718)
            ... 5 more
    Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1732257303000, eventType=WRITE_ROWS, serverId=28555270, headerLength=19, dataLength=320, nextPosition=383299196, flags=0}
            at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:358)
            at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:252)
            at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:388)
            at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1187)
            ... 3 more
    Caused by: java.io.EOFException: Failed to read remaining 28 of 36 bytes from position 258280448. Block length: 183. Initial block length: 316.
            at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.fill(ByteArrayInputStream.java:115)
            at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:105)
            at io.debezium.connector.mysql.RowDeserializers.deserializeVarString(RowDeserializers.java:264)
            at io.debezium.connector.mysql.RowDeserializers$WriteRowsDeserializer.deserializeVarString(RowDeserializers.java:192)
            at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataDeserializer.java:189)
            at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:143)
            at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:75)
            at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:65)
            at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:38)
            at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:352)
            ... 6 more
    Recovers by itself after a while
    This symptom is rather odd: the job simply recovers after some time. Our current suspects are:

    • MySQL connection count and bandwidth limits
    • MySQL server-side configuration; see the Flink CDC FAQ:
      mysql> set global slave_net_timeout = 120;
      mysql> set global thread_pool_idle_timeout = 120;
    • Backpressure in the job; see the Alibaba Cloud Flink documentation:
      execution.checkpointing.interval=10min
      execution.checkpointing.tolerable-failed-checkpoints=100
      debezium.connect.keep.alive.interval.ms = 40000

  • StarRocks BE memory limit exceeded
    java.lang.RuntimeException: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: shoufuyou_fund, table: fund_common_repay_push, label: flink-4c6c8cfb-5116-4c38-a60e-a1b87cd6f2f2,
    responseBody: {
        "Status": "MEM_LIMIT_EXCEEDED",
        "Message": "Memory of process exceed limit. QUERY Backend: 172.17.172.251, fragment: 9048ed6e-6ffb-04db-081b-a4966b179387 Used: 26469550752, Limit: 26316804096. Mem usage has exceed the limit of BE"
    }
    errorLog: null
            at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.AssertNotException(StreamLoadManagerV2.java:427)
            at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.write(StreamLoadManagerV2.java:252)
            at com.starrocks.connector.flink.table.sink.v2.StarRocksWriter.write(StarRocksWriter.java:143)
            at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:182)
            at org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:178)
            at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
            at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
            at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
            at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
            at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:245)
            at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:217)
            at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:169)
            at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:616)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1071)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1020)
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
            at java.lang.Thread.run(Thread.java:879)
    Caused by: com.starrocks.data.load.stream.exception.StreamLoadFailException: Transaction prepare failed, db: shoufuyou_fund, table: fund_common_repay_push, label: flink-4c6c8cfb-5116-4c38-a60e-a1b87cd6f2f2,
    responseBody: {
        "Status": "MEM_LIMIT_EXCEEDED",
        "Message": "Memory of process exceed limit. QUERY Backend: 172.17.172.251, fragment: 9048ed6e-6ffb-04db-081b-a4966b179387 Used: 26469550752, Limit: 26316804096. Mem usage has exceed the limit of BE"
    }
    errorLog: null
            at com.starrocks.data.load.stream.TransactionStreamLoader.prepare(TransactionStreamLoader.java:221)
            at com.starrocks.data.load.stream.v2.TransactionTableRegion.commit(TransactionTableRegion.java:247)
            at com.starrocks.data.load.stream.v2.StreamLoadManagerV2.lambda$init$0(StreamLoadManagerV2.java:210)
            ... 1 more
    Our StarRocks BEs have about 32 GB of memory each. When several Flink CDC jobs run at once, the initial snapshot phase writes so much data into the BEs that they run out of memory.
    Solution: lower the write parallelism into StarRocks and avoid running too many CDC jobs concurrently.
    See also Troubleshooting StarRocks memory hog issues.
  • JobManager Direct buffer memory exhausted
    java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:708) ~[?:1.8.0_372]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[?:1.8.0_372]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) ~[?:1.8.0_372]
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:247) ~[?:1.8.0_372]
        at sun.nio.ch.IOUtil.write(IOUtil.java:60) ~[?:1.8.0_372]
        at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:234) ~[?:1.8.0_372]
        at java.nio.channels.Channels.writeFullyImpl(Channels.java:78) ~[?:1.8.0_372]
        at java.nio.channels.Channels$1.write(Channels.java:174) ~[?:1.8.0_372]
        at org.apache.flink.core.fs.OffsetAwareOutputStream.write(OffsetAwareOutputStream.java:48) ~[ververica-connector-vvp-1.17-vvr-8.0.9-2-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
        at org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:54) ~[ververica-connector-vvp-1.17-vvr-8.0.9-2-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
        at org.apache.flink.core.fs.RefCountedBufferingFileStream.write(RefCountedBufferingFileStream.java:88) ~[ververica-connector-vvp-1.17-vvr-8.0.9-2-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
        at org.apache.flink.fs.osshadoop.writer.OSSRecoverableFsDataOutputStream.write(OSSRecoverableFsDataOutputStream.java:130) ~[?:?]
        at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:78) ~[flink-dist-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
        at java.io.DataOutputStream.write(DataOutputStream.java:107) ~[?:1.8.0_372]
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97) ~[?:1.8.0_372]
        at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:703) ~[flink-dist-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeStreamStateHandle(MetadataV3Serializer.java:264) ~[flink-dist-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:109) ~[flink-dist-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:153) ~[flink-dist-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83) ~[flink-dist-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56) ~[flink-dist-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
        at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:102) ~[flink-dist-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
    Solution:
    Add the following configuration:
      jobmanager.memory.off-heap.size: 512mb
  • TaskManager JVM memory insufficient
    java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id job-da2375f5-405b-4398-a568-eaba9711576d-taskmanager-1-34 timed out.
            at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1714)
            at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.run(DefaultHeartbeatMonitor.java:158)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
            at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
            at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
            at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
            at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
            at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
            at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
            at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
            at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
            at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
            at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
            at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
            at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
            at akka.actor.Actor.aroundReceive(Actor.scala:537)
            at akka.actor.Actor.aroundReceive$(Actor.scala:535)
            at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
            at akka.actor.ActorCell.invoke(ActorCell.scala:547)
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
            at akka.dispatch.Mailbox.run(Mailbox.scala:231)
            at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
            at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
            at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
            at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
            at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
    Solution:
    While the job ran we noticed that usage of the TaskManager's taskmanager.memory.managed.size stayed at 0. That is because this pipeline stores no state; it is pure ETL. See the Flink TaskManager Memory Model.

    So we added the following configuration:
      taskmanager.memory.managed.size: 256mb
      taskmanager.memory.process.size: 4096m
      table.exec.state.ttl: 1 m
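    To see why pinning managed memory helps, here is a worked breakdown of taskmanager.memory.process.size: 4096m under Flink's documented TaskManager memory model, assuming the defaults for the components we did not set (256mb JVM metaspace, 10% JVM overhead clamped to [192mb, 1gb], 10% network fraction, 128mb framework heap and off-heap, 0 task off-heap). A rough Python sketch, not Flink's actual accounting code:

```python
def tm_memory_breakdown(process_mb, managed_mb,
                        metaspace_mb=256, overhead_fraction=0.1,
                        overhead_min_mb=192, overhead_max_mb=1024,
                        network_fraction=0.1,
                        framework_heap_mb=128, framework_offheap_mb=128,
                        task_offheap_mb=0):
    # JVM overhead is a fraction of total process size, clamped to [min, max]
    overhead = min(max(process_mb * overhead_fraction, overhead_min_mb), overhead_max_mb)
    # Total Flink memory = process size minus metaspace and JVM overhead
    total_flink = process_mb - overhead - metaspace_mb
    # Network buffers are a fraction of total Flink memory
    network = total_flink * network_fraction
    # Task heap receives whatever is left over
    task_heap = (total_flink - network - managed_mb
                 - framework_heap_mb - framework_offheap_mb - task_offheap_mb)
    return {"jvm_overhead": overhead, "total_flink": total_flink,
            "network": network, "managed": managed_mb, "task_heap": task_heap}

breakdown = tm_memory_breakdown(process_mb=4096, managed_mb=256)
print(round(breakdown["task_heap"], 2))  # -> 2575.36
```

    With the default managed fraction of 0.4, managed memory would claim roughly 0.4 × 3430 ≈ 1372 MB that a stateless ETL job never touches; pinning it to 256mb hands about 1.1 GB back to the task heap.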
  • Reading from MySQL too slowly
    java.lang.RuntimeException: One or more fetchers have encountered exception
            at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261) ~[flink-connector-files-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
            at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185) ~[flink-connector-files-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
            at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144) ~[flink-connector-files-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
            at org.apache.flink.streaming.api.operators.SourceOperator.pollNext(SourceOperator.java:779) ~[flink-dist-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
            at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:457) ~[flink-dist-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
      ...
    Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
            at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-files-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
            at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) ~[flink-connector-files-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_372]
            at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
            ... 1 more
    Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1732052840471,db=,server_id=0,file=mysql-bin.051880,pos=347695811,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
            at org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:212) ~[?:?]
            at org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:133) ~[?:?]
            at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:105) ~[?:?]
            at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:71) ~[?:?]
            at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:119) ~[?:?]
            at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:90) ~[?:?]
            at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
            at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ~[flink-connector-files-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
            at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) ~[flink-connector-files-1.17-vvr-8.0.9-2-SNAPSHOT.jar:1.17-vvr-8.0.9-2-SNAPSHOT]
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_372]
            at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
            ... 1 more
    Solution:
    Following the Debezium connector documentation and Alibaba Cloud, add the following parameters:
      debezium.max.queue.size: 162580
      debezium.max.batch.size: 40960
      debezium.poll.interval.ms: 50
      scan.only.deserialize.captured.tables.changelog.enabled: true
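    A quick sanity check on the numbers above (plain Python; the constraint that max.queue.size must be larger than max.batch.size comes from the Debezium connector documentation):

```python
def check_debezium_tuning(max_queue_size, max_batch_size, poll_interval_ms):
    """Debezium requires the in-memory event queue to be larger than one
    batch; the ratio tells you how many batches can be buffered, and the
    poll interval bounds how often the queue is drained."""
    if max_queue_size <= max_batch_size:
        raise ValueError("debezium.max.queue.size must exceed debezium.max.batch.size")
    return {
        "buffered_batches": max_queue_size / max_batch_size,
        "polls_per_second": 1000 / poll_interval_ms,
    }

stats = check_debezium_tuning(162580, 40960, 50)
print(stats["polls_per_second"])  # -> 20.0
```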
  • Incremental reads too slow, so the binlog was purged before we caught up
    Following Alibaba Cloud, add the following parameters:
      scan.parallel-deserialize-changelog.enabled: true
      scan.parallel-deserialize-changelog.handler.size: 4
      heartbeat.interval: 5s
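    One related knob worth checking when raising source parallelism: the MySQL source hands each parallel reader its own server id out of the server-id range in the configuration, so the range must be at least as large as the source parallelism. A small sketch (an illustrative helper, not part of the Flink CDC API):

```python
def server_id_capacity(server_id_range):
    """Count the distinct MySQL server ids in a 'start-end' range string;
    the Flink CDC MySQL source needs one id per parallel reader."""
    start, end = (int(part) for part in server_id_range.split("-"))
    return end - start + 1

# the range from the configuration above supports up to 100 parallel readers
print(server_id_capacity("556401-556500"))  # -> 100
```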
