new JsonDebeziumDeserializationSchema() deserializes the MySQL binlog records that are read into JSON strings, which you can see later in the console output.
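For reference, here is roughly where the deserializer is plugged in when the source is built with the Flink CDC 2.x MySqlSource builder (the com.ververica.cdc packages apply to the 2.x releases; newer releases use org.apache.flink.cdc). Host, port, credentials and the captured database/table below are placeholders, not values from this article:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("localhost")          // placeholder connection settings
        .port(3306)
        .databaseList("cdc_source")     // database to capture (placeholder)
        .tableList("cdc_source.human")  // table to capture (placeholder)
        .username("root")
        .password("123456")
        // every binlog change event is deserialized into a JSON string,
        // which is what later shows up on the console
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();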
(2) server id
Every MySQL database client that reads the binlog should have a unique ID, known as the server id. The MySQL server uses this ID to keep track of the network connection and the binlog position, so if different jobs share the same server id, they may end up reading from the wrong binlog position. It is therefore recommended to assign a different server id to each source reader: for example, with a source parallelism of 4 we can use '5401-5404', giving each of the 4 source readers its own server id.
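In code this is just two coordinated settings: the id range on the builder and the source parallelism when the stream is created. A fragment of the sketch that is continued under (3):

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        // ... connection options and deserializer as in the snippet above ...
        .serverId("5401-5404")   // 4 ids: one per source reader
        .build();

// when the source is consumed (see (3) below), set the parallelism to 4 so there really are 4 readers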
(3) Reading data from the MySQL source
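A minimal sketch of the read itself, assuming the mySqlSource built in the snippets above (checkpoint interval, source name and job name are illustrative):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);   // checkpoints let the source record its binlog position

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
        .setParallelism(4)       // matches the 4 server ids reserved in (2)
        .print();                // the JSON change events are printed to the console

env.execute("mysql-cdc-demo");

Running the Table/SQL variant of the demo (the MysqlCDCSqlDemo class in the stack traces below) can fail before any data is read. The first error looks like this: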
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:534)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:277)
at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:30)
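Judging by the frames, this ValidationException is thrown by TableEnvironment.create when no table planner can be found on the classpath. In a Maven project the usual fix is to add the planner dependency in the same version as the other Flink artifacts, for example org.apache.flink:flink-table-planner_2.12 on Flink 1.14 (Flink 1.15 and later use flink-table-planner-loader plus flink-table-runtime instead), and not to mark it as provided when running from the IDE. With the planner in place, the job can still fail when the SQL sink table is resolved: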
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.human_sink'.
Table options are:
'connector'='jdbc'
'driver'='com.mysql.cj.jdbc.Driver'
'password'='******'
'table-name'='human_sink'
'url'='jdbc:mysql://localhost:80/cdc_sink'
'username'='root'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
at scala.collection.Iterator.foreach(Iterator.scala:929)
at scala.collection.Iterator.foreach$(Iterator.scala:929)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1406)
at scala.collection.IterableLike.foreach(IterableLike.scala:71)
at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:234)
at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:68)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:736)
at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)
... 19 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
mysql-cdc
print
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:732)
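The root cause at the bottom of this trace is explicit: no DynamicTableFactory with the identifier 'jdbc' is on the classpath; only blackhole, datagen, mysql-cdc and print are available. Since the 'human_sink' table is declared with 'connector'='jdbc', the project also needs the Flink JDBC connector, typically org.apache.flink:flink-connector-jdbc with the Scala suffix matching the Flink version (e.g. flink-connector-jdbc_2.12 on Flink 1.14), plus the MySQL driver mysql:mysql-connector-java, which provides the com.mysql.cj.jdbc.Driver class referenced in the table options. Once both are added, the 'jdbc' factory should be discovered and the sink table can be created.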