老婆出轨 发表于 2024-8-29 09:29:07

Kafka——Kafka Connect详解

Kafka Connect

1、概要介绍

Kafka Connect是一个高伸缩性、高可靠性的数据集成工具,用于在Apache Kafka与其他系统间进行数据搬运以及实行ETL操作,好比Kafka Connect可以或许将文件系统中某些文件的内容全部灌入Kafka topic中大概是把Kafka topic中的消息导出到外部的数据库系统,如图所示。
https://i-blog.csdnimg.cn/blog_migrate/cadbdcd91447588be95a500821b4ac38.png
如图所示,Kafka Connect重要由source connector和sink connector组成。事实上,几乎大部分的ETL框架都是由这两大类逻辑组件组成的,如Apache Flume、Kettle等。source connector负责把输入数据从外部系统中导入到Kafka中,而sink connector则负责把输出数据
导出到其他外部系统。
根据Kafka Connect官网的介绍,目前其重要的计划特点如下。


[*]通用性:依托底层的Kafka焦点系统封装了connector接口,方便开发、部署和管理。
[*]兼具分布式(distributed)和单体式(standalone)两种模式:既可以以standalone单进程的方式运行,也可以扩展到多台机器成为分布式ETL系统。
[*]REST接口:提供常见的REST API方便管理和操作,只适用于分布式模式。
[*]自动位移管理:connector自动管理位移,无须开发人员干预,低落开发本钱。
[*]集成性:方便与流/批处置惩罚系统对接。
显然,一个ETL框架或connector系统是否好用的重要标记之一就是,看source connector和sink connector的种类是否丰富。默认提供的connector越多,我们就能集成越多的外部系统,免去了用户自行开发的本钱。
2、standalone Connect

在standalone模式下全部的操作都是在一个进程中完成的。这种模式非常得当运行在测试或功能验证环境,抑或是必须是单线程才能完成的场景(好比网络日志文件)。由于是单进程,standalone模式无法充实利用Kafka天然提供的负载均衡和高容错等特性。
2.1、数据抽取与加载示例

下面我们在一个单节点的Kafka集群上运行standalone模式的Kafka Connect,把输入文件foo.txt中的数据通过Kafka传输到输出文件bar.txt中。首先我们制作设置文件。Kafka Connectstandalone模式下通常有3类设置文件:connect设置文件,多少source connector设置文件和多少sink connector设置文件。由于本例分别启动一个source connector读取foo.txt和一个sink connector写入bar.txt,故source和sink设置文件都只有一个,以是统共有如下3个设置文件。


[*]connect-standalone.properties:connect standalone模式下的设置文件。
[*]connect--file-source.properties:file source connector设置文件。
[*]connect-file-sink.properties:file sink connector设置文件。
   首先来编辑connect-standalone.properties文件。实际上,Kafka已经在config目次下为我们提供了一个该文件的模板。我们直接使用该模板并修改对应的字段即可,如下:
# connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets


[*]bootstrap.servers:指定Connect要毗连的Kafka集群主机名和端口号。本例使用localhost::9092。
[*]key/value.converter:设置Kafka消息key/value的格式转化类,本例使用JsonConverter,即把每条Kafka消息转化成一个JSON格式。
[*]key/value.converter.schemas.enable:设置是否需要把数据看成纯JSON字符串大概JSON格式的对象。本例设置为tue,即把数据转换成JSON对象。
[*]offset.storage.file.filename:connector会定期地将状态写入底层存储中。该参数设定了状态要被写入的底层存储文件的路径。本例使用/tmp/connect.offsets保存connector的
状态。
   下面编辑connect-file-source.properties,它在Kafka的config目次下也有一份模板,本例直接在该模板的基础上进行修改:
# connect-file-source.properties
name=test-file-source
connector.class=FileStreamSource
tasks.max=1
file=foo.txt
topic=connect-file-test


[*]name:设置该file source connector的名称。
[*]connector.class:设置source connector类的全限定名。有时间设置为类名也是可以的,Kafka Connect可以在classpath中自动搜寻该类并加载。
[*]tasks.max:每个connector下会创建多少个使命(task)实行connector逻辑以期望增加并行度,但对于从单个文件读/写数据如许的操作,恣意时间只能有一个ask访问文件,故这里设置最大使命数为1。
[*]file:输入文件全路径名。本例为foo.txt,即表现该文件位于Kafka目次下。实际使用时最好使用绝对路径。
[*]topic:设置source connector把数据导入到Kafka的哪个topic,若该topic之前不存在,则source connector会自动创建。最好提前手工创建出该topic。.本例使用connect-file-test.
   末了,我们编辑connect-file-sink.properties。同理,直接修改位于config目次下的connect-file-sink.properties模板文件:
# connect-file-sink.properties
name=test-file-sink
connector.class=FileStreamSink
tasks.max=1
file=bar.txt
topics=connect-file-test


[*]name:设置该sink connector名称。
[*]connector.class:设置sink connector类的全限定名。有时间设置为类名也是可以的,Kafka Connect可以在classpath中自动搜寻该类并加载。
[*]tasks.max:依然设置为l,原理与source connector中设置设置雷同。
[*]file:输出文件全路径名。本例为bar.txt,即表现该文件位于Kafka目次下。实际使用时最好使用绝对路径。
[*]topic:设置sink connector导出Kafka中的哪个topic的数据。
   启动Kafka Connect的standalone模式:
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties

启动之后,应该可以看到控制台不断地打印“Couldn’t find file foo.txt for FileStreamSourceTask,sleeping to wait for it to be created”之类的日志。这是正常的,由于我们尚未创建输入文件foo.txt。
https://i-blog.csdnimg.cn/blog_migrate/c95598f416ebdcee9d382e9ada7c2c2b.png
下面我们在Kafka的目次下创建foo.txt并写入一些文本行:
echo 'hello' >> ./foo.txt
echo 'kafka connect test exaple' >> ./foo.txt
echo 'this is a file connector test.' >> ./foo.txt
假如统统正常,可以看到在当前目次下天生一个名为bar.txt的文件:
hello
kafka connect test exaple
this is a file connector test.
https://i-blog.csdnimg.cn/blog_migrate/5cb05436245235ab14620f9045c75731.png
可见,foo.txt文件的内容已经乐成地被file connector通过Kafka搬运到bar.txt文件中了。
为了验证数据的确是通过Kafka topic进行转移的,我们读取一下topic(connect-file-test)的数据,如:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-file-test --from-beginning


{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"kafka connect test exaple"}
{"schema":{"type":"string","optional":false},"payload":"this is a file connector test."}
https://i-blog.csdnimg.cn/blog_migrate/bcf19f5c88379fa66100b6197e32cfef.png
2.2、数据抽取、转换与加载示例

上面的例子只涉及ETL中的E和L,即数据抽取(extract.)与加载(load)。作为一个ETL框架,Kafka Connect也支持相称程度的数据转换操作。下面演示在将文件数据导出到目
标文件之前为每条消息增加一个IP字段。假如要插入P静态字段,我们必须修改source connector的设置文件,增加以下这些行:
transforms=WrapMap,InsertHost
transforms.WrapMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.WrapMap.field=line
transforms.InsertHost.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertHost.static.field=ip
transforms.InsertHost.static.value=com.connector.machinel
然后重启kafka Connect,然后写入foo.txt文件:
./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties

echo 'this is a transformation test' >> ./foo.txt
然后查看bar.txt:
hello
kafka connect test exaple
this is a file connector test.
Struct{line=this is a transformation test,ip=com.connector.machinel} https://i-blog.csdnimg.cn/blog_migrate/6341f7be55ac249a3db6e34dcac22808.png
显然,新增的数据被封装成一个结构体(Struct),并增加了ip字段。这就是上面WrapMap和InsertHost的作用。
3、distributed Connect

和standalone模式不同,distributed Connect天然地联合了Kafka提供的负载均衡和故障转移功能,可以或许自动地在多节点机器上平衡负载。用户可以增减机器来实现团体系统的高伸缩性。用户需要实行下列下令来启动distributed模式的Connect,假设我们依然使用Kafka config目次下的设置文件模板:
bin/connect-distributed.sh config/connect-distributed.properties
和standalone模式不同的是,在distributed模式中我们不需要指定source和sink的设置文件。distributed模式中的connector只能通过REST API来创建和管理。
3.1、示例

依然以FileStreamSourceConnector/FileStreamSinkConnector为例来演示怎样在distributed模式下运行Kafka Connect。上述下令启动乐成后,我们可以实行以下下令来获取当
前全部connector:
curl http://localhost:8083/connectors
[]
https://i-blog.csdnimg.cn/blog_migrate/a31ed42369482d3d427544c4cfd137e8.png
值得注意的是,distributed模式下默认的REST端口是8083,用户可以修改connect-distributed.properties文件中的rest.port属性来变更这一端口。如上可见,当前集群中没有创建任何的connector。
下面分别创建file source connector和file sink connector,下令如下:
curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" -d '{"name":"test-file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","topic":"connect-file-test","file":"foo.txt"}}' http://localhost:8083/connectors
https://i-blog.csdnimg.cn/blog_migrate/5df4aa1bad672e1790edc72216cedbfe.png
curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" -d '{"name":"test-file-sink","config":{"connector.class":"FileStreamSink","tasks.max":"1","topics":"connect-file-test","file":"bar.txt"}}' http://localhost:8083/connectors
https://i-blog.csdnimg.cn/blog_migrate/2b31d9027b481af964a25883c90f4016.png
本例中使用curl工具给Kafka Connect发送POST哀求。当前REST API只支持application/json作为哀求(request)和相应(response)的内容范例(content type),因此在发送POST哀求时必须显式指定HTTP的Accept头部为application/json,以设置response的content type。别的,我们还需要设置Content-Type头部信息为application/json,以指定request
的content type。在上面下令中我们只是把standalone模式下设置文件中的全部属性封装成JSON字符串传递给curl工具。注意,connector的name字段和其他字段是分开的,即其他字段首先要被封装到config下,然后和name一起做成JSON串。
下面再次获取当前全部connector以查抄之前的两个connector是否已被创建出来:
https://i-blog.csdnimg.cn/blog_migrate/d7aac1fe5468c1c3b3999dfcad75c216.png
这次我们可以看到两个connector都已经被创建出来了。REST API还提供了/connectors//{name}/config,答应用户查询某个connector的具体设置信息,我们使用这个endpoint来查询file sink connector的信息:
https://i-blog.csdnimg.cn/blog_migrate/809690535d5a917d7b0d2a077a564ebd.png
同时使用GET/connectors//{name}/status查询两connector的运行状态:
https://i-blog.csdnimg.cn/blog_migrate/fc04a3518daee48dd754c46fc774ec6f.png
https://i-blog.csdnimg.cn/blog_migrate/9c16537e368bc240595a3d2e1aa2e527.png
目前两个connector都正常工作。下面开始写入输入文件foo.txt:
echo 'one' >> ./foo.txt
echo 'two' >> ./foo.txt
echo 'three' >> ./foo.txt
查看bar.txt:
https://i-blog.csdnimg.cn/blog_migrate/1a67a3f37ee4bbc6a69f64992e1b0e4d.png
做完这些之后,我们删除这两个connector把系统还原回初始状态。若要删除connector,可以使用REST API–DELETE/connectors/.{name},如下:
curl -i -X DELETE http://localhost:8083/connectors/test-file-source

curl -i -X DELETE http://localhost:8083/connectors/test-file-sink
3.2、REST API

我们可以通过Kafka Connect提供的基于REST风格的API接口来管理毗连器,默认端口号为8083,可以通过Worker进程的设置文件中的rest,port参数来修改端口号。Kafka ConnectREST API接口如表所示。
REST API描述GET /查看Kafka集群版本信息GET /connectors查看当前生动的毗连器列表,表现毗连器的名字POST /connectors根据指定设置,创建一个新的毗连器GET /connectors/{name}查看指定毗连器的信息GET /connectors/{name}/config查看指定毗连器的设置信息PUT /connectors/{name}/config修改指定毗连器的设置信息GET /connectors/{name}/status查看指定毗连器的状态POST /connectors/{name}/restart重启指定的毗连器PUT /connectors/{name}/pause停息指定的毗连器GET /connectors/{name}/tasks查看指定毗连器正在运行的TaskPOST /connectors/{name}/tasks修改Task的设置GET /connectors/{name}/tasks/{taskId}/status查看指定毗连器中指定Task的状态POST /connectors/{name}/tasks/{tasked}/restart重启指定毗连器中指定的TaskDELETE /connectors/{name}/删除指定的毗连器 3.3、其它毗连器类

connector.class用来设置毗连器类的全限定名称,有时间设置为类名也是可以的,Kafka Connect会在classpath中自动搜索这个类并加载。Kafka中默认只提供了与文件相干的毗连器,假如要实现与其他数据存储系统相毗连,那么可以参考文件毗连器的具体实现来自界说一套毗连器,大概搜寻开源的实现,好比Confluent公司提供的
一些产物:


[*]kafka-connect-elasticsearch(https://github.com/confluentinc/kafka-connect-elasticsearch);
[*]kafka-connect-jdbc (https://github.com/confluentinc/kafka-connect-jdbc);
[*]kafka-connect-hdfs (https://github.com/confluentinc/kafka-connect-hdfs);
[*]kafka-connect-storage-cloud (https://github.com/confluentinc/kafka-connect-storage-cloud).
4、示例MySQL数据同步到Redis

4.1、准备毗连器

   下载毗连器
MySQL毗连器:https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
Redis毗连器:https://www.confluent.io/hub/jcustenborder/kafka-connect-redis
   安装插件
在kafka目次下新建connect文件夹:
cd /usr/local/kafka_2.12-3.1.0
mkdir connect
将下载的插件移动到connect文件夹中:
cp confluentinc-kafka-connect-jdbc-10.7.4 /usr/local/kafka_2.12-3.1.0/confluentinc-kafka-connect-jdbc-10.7.4

cp jcustenborder-kafka-connect-redis-0.0.4 /usr/local/kafka_2.12-3.1.0/jcustenborder-kafka-connect-redis-0.0.4
下载mysql对应的驱动,放到confluentinc-kafka-connect-jdbc-10.7.4/lib目次下
mv mysql-connector-java-8.0.20.jar /usr/local/kafka_2.12-3.1.0/confluentinc-kafka-connect-jdbc-10.7.4/lib/mysql-connector-java-8.0.20.jar
修改distributed设置:
vim /usr/local/kafka_2.12-3.1.0/config/connect-distributed.properties
指定插件位置:
plugin.path=../connect
启动Connect,查看插件是否加载乐成:
./connect-distributed.sh ../config/connect-distributed.properties
https://i-blog.csdnimg.cn/blog_migrate/a03d68b2c334479e70d456c9eab65a9a.png
4.2、准备MySQL

   创建表及数据
CREATE TABLE `login` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,
        `username` varchar(30) DEFAULT NULL,
        `login_time` datetime
);

INSERT INTO `login` VALUES(1, 'aaa', NOW());
INSERT INTO `login` VALUES(2, 'bbb', NOW());
INSERT INTO `login` VALUES(3, 'ccc', NOW());
INSERT INTO `login` VALUES(4, 'ddd', NOW());
INSERT INTO `login` VALUES(5, 'eee', NOW());
INSERT INTO `login` VALUES(6, 'fff', NOW());
INSERT INTO `login` VALUES(7, 'ggg', NOW());
INSERT INTO `login` VALUES(8, 'hhh', NOW());
INSERT INTO `login` VALUES(9, 'iii', NOW());
INSERT INTO `login` VALUES(10, 'jjj', NOW());
   创建毗连器,新建source.json
{
        "name": "example-source",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "tasks.max": "1",
                "connection.url": "jdbc:mysql://127.0.0.1:3306/kafka_db",
                "connection.user": "root",
                "connection.password": "123456",
                "table.whitelist": "login",
      "mode":"incrementing",
      "incrementing.column.name":"id"
        }
}
   向Worker发送哀求,创建毗连器:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @source.json
   确认数据是否写入kafka
./kafka-topics.sh --bootstrap-server localhost:9092 --list
https://i-blog.csdnimg.cn/blog_migrate/a7002fb6fe7e1b1080b7df5365698921.png
__consumer_offsets: 记录所有Kafka Consumer Group的Offset
connect-configs: 存储连接器的配置,对应Connect 配置文件中config.storage.topic
connect-offsets: 存储Source 的Offset,对应Connect 配置文件中offset.storage.topic
connect-status: 连接器与Task的状态,对应Connect 配置文件中status.storage.topic
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic login --from-beginning
https://i-blog.csdnimg.cn/blog_migrate/a542432f0af863d2825803652a45bb00.png
4.3、准备redis

   创建sink.json
{
        "name": "example-sink",
        "config": {
                "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
      "topics": "login",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "tasks.max": "1",
          "redis.client.mode": "Standalone",
          "redis.database": "1",
          "redis.hosts": "localhost:6379",
          "redis.password": "123456"
        }
}
启动:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @sink.json

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Kafka——Kafka Connect详解