Kafka-Connect
一、概述Kafka Connect是一个在Apache Kafka和其他体系之间可扩展且可靠地流式传输数据的工具。仔细的你会发现,我们编写的producer、consumer都有很多重复的代码,KafkaConnect就是将这些通用的api进行了封装。让我们可以只关心业务部门(数据的处理逻辑)。它可以以设置界说的方式实现数据的生产和消耗。Kafka Connect可以将整个数据库或全部应用服务器的指标收集到Kafka主题中,使数据可以大概以低延迟进行流处理。导出作业可以将数据从Kafka主题转达到辅助存储和查询体系,或转达到批处理体系进行离线分析。
我们写大数据程序的时候,只写数据的处理逻辑就能实现分布式的并行计算,是因为MapReduce、Spark这些计算框架的良好筹划。把输入格式化、节点间Shuffle性能优化、任务拆解等都抽离出来,我们只需要继续Mapper、Reducer接口或者利用算子就可以了。Kafka Connect也是这样的筹划思想,你就把我看成一个消息队列工具,我给你封装好,你按照我们的规范处理数据就可以了。
Kafka Connect功能如下:
1、通用框架
Kafka Connect标准化了其他数据体系与Kafka的集成,简化了连接器的开发、部署和管理
2、Distributed 和 standalone 模式
扩展到支持整个构造的大型集中管理服务,或扩展到开发、测试和小型生产部署
3、REST 接口
通过易于利用的REST API向Kafka Connect集群提交和管理连接器
4、offset 自动管理
只需来自连接器的一点信息,Kafka Connect就可以自动管理偏移提交过程,因此连接器开发人员无需担心连接器开发中这个轻易出错的部门
5、可扩展
Kafka Connect创建在现有的组管理协议之上。可以添加更多工作人员来扩展Kafka Connect集群。
6、流/批处理集成
利用Kafka的现有功能,Kafka Connect是桥接流和批处理数据体系的理想办理方案
二、利用方法
1、运行Kafka Connect
Kafka Connect目前支持两种执行模式:standalone (单历程)和distributed
standalone 模式
在独立模式下,全部工作都在单个历程中执行。该模式轻易设置和上手,在有些场景下也很得当(例如收集日记文件),但它不能受益于Kafka Connect的某些功能,例如容错。您可以利用以下下令启动独立历程:
bin/connect-standalone.sh config/connect-standalone.properties 参数设置:
第一个参数(也就是connector1.properties)是worker的设置。这包罗诸如Kafka连接参数、序列化格式以及提交偏移的频率等设置。config/server.properties提供了默认设置运行的当地集群。它需要调整以用于差别的设置或生产部署。全部worker(独立和分布式)都需要一些设置:
[*]bootstrap.servers-用于引导连接Kafka的服务器列表
[*]key.converter-序列化key的转换器类。常见格式的示例包罗JSON和Avro。
[*]value.converter-序列化value的转换器类。常见格式的示例包罗JSON和Avro。
[*]plugin.path(默以为empty)-包含Connect插件(连接器、转换器、转换)的路径列表。在运行快速启动之前,用户必须添加包含示例FileStreamSourceConnector的绝对路径,
以下是standalone 模式特有的重要设置:
[*]offset.storage.file.filename-存储源连接器偏移量的文件
此处设置的参数旨在供Kafka Connect用于访问设置、偏移量和状态主题的生产者和消耗者利用。对于Kafka源任务利用的生产者和Kafka接收器任务利用的消耗者的设置,可以利用相同的参数,但需要分别以producer.和consumer.为前缀。从工作设置中继续的唯一没有前缀的Kafka客户端参数是bootstrap.servers
从2.3.0开始,客户端设置覆盖可以通过利用前缀producer.override.和consumer.override.分别为Kafka源或Kafka接收器单独设置。这些覆盖包含在连接器的别的设置属性中。
distributed 模式
分布式模式处理work的自动平衡,允许动态扩展(或缩减),并在活动任务以及设置和偏移提交数据中提供容错。执行与独立模式非常相似:
bin/connect-distributed.sh config/connect-distributed.properties 区别在于启动的类和设置参数,这些参数改变了Kafka Connect历程决定在哪里存储设置、怎样分配工作以及在哪里存储偏移量和任务状态的方式。在分布式模式下,Kafka Connect将偏移量、设置和任务状态存储在Kafka主题中。发起手动创建偏移量、设置和状态的主题,以实现所需的分区数和复制因子。如果启动Kafka Connect时尚未创建主题,则将利用默认的分区数和复制因子自动创建主题(一般不怎么做)。
参数设置:
在启动集群之前需要设置的重要参数:
[*]group.id(默认connect-cluster)-集群的唯一名称,用于形成Connect集群组;请注意,这不得与消耗者组ID相辩论
[*]config.storage.topic(默认connect-configs)-用于存储连接器和任务设置的topic;请注意,这应该是单个分区、高度复制、压缩的topic。可能需要手动创建topic以确保正确的设置,因为自动创建的topic可能有多个分区或自动设置为删除而不是压缩
[*]offset.storage.topic(默认connect-offsets)-用于存储偏移量的topic;此topic应该有许多分区,可以复制,并设置为压缩
[*]status.storage.topic(默认connect-status)-用于存储状态的topic;这个topic可以有多个分区,应该复制和设置以进行压缩
请注意,在分布式模式下,连接器设置不会在下令行上转达。相反,利用下面形貌的REST API来创建、修改和销毁连接器
2、设置Connectors
连接器设置是简单的键值映射。在独立模式和分布式模式下,它们都包含在创建(或修改)连接器的REST请求的JSON有用负载中。在独立模式下,这些也可以在属性文件中界说并通过下令行转达给Connect历程。
以下是一些常见选项:
[*]name-连接器的唯一名称。尝试利用相同的名称再次注册将失败。
[*]connector.class-连接器的Java类
[*]tasks.max-应为此连接器创建的最大任务数。如果连接器无法实现此并行级别,它可能会创建更少的任务。
[*]key.converter-(可选)覆盖worker设置的默认key转换器。
[*]value.converter-(可选)覆盖worker设置的默认value转换器。
该connector.class设置支持多种格式:此连接器的类的全名或别名。如果连接器是org. apache.kafka.connect.file.FileStreamSinkConnector,则可以指定此全名或利用FileStreamSink或FileStreamSinkConnector使设置更短。
接收器连接器另有一些额外的选项来控制它们的输入。每个接收器连接器必须设置以下选项之一:
[*]topics-用逗号分隔的主题列表,用作此连接器的输入
[*]topics.regex-主题的Java正则表达式,用作此连接器的输入
3、转换
连接器可以设置转换以进行轻量级的Message-at-time修改。
可以在连接器设置中指定转换链:
[*]transforms-转换的别名列表,指定应用转换的顺序。
[*]transforms.$alias.type-转换的完全限定类名。
[*]transforms.$alias.$transformationSpecificConfig转换的设置属性
4、REST API
由于Kafka Connect旨在作为服务运行,因此它还提供了用于管理连接器的REST API。此REST API在独立模式和分布式模式下都可用。可以利用listeners设置选项来设置REST API服务器。此字段应包含以下格式的侦听器列表:protocol://host:port,protocol2://host2:port2。目前支持的协议是http和https。例如:
listeners=http://localhost:8080,https://localhost:8443
默认情况下,如果未指定listeners,则REST服务器利用HTTP协议在端口8083上运行。
REST API不光被用户用于监控/管理Kafka Connect。在分布式模式下,它也用于Kafka Connect跨集群通信。在follower 节点REST API上接收到的一些请求将被转发到leader 节点REST API。如果给定主机可达的URI与其侦听的URI差别,设置选项rest.advertised.host.name、rest.advertised.port和rest.advertised.listener可用于更改follower 节点将用于与leader 连接的URI。当同时利用HTTP和HTTPS侦听器时,rest.advertised.listenerssl.* or listeners.https 选项将用于设置HTTPS客户端。
以下是当前支持的REST API端点:
[*]GET /connectors-返回活动连接器列表
[*]POST /connectors-创建一个新的连接器;请求正文应该是一个JSON对象,其中包含一个字符串name字段和一个带有连接器设置参数的对象config字段。JSON对象还可以选择包含一个字符串initial_state字段,该字段可以采用以下值-STOPPED、PAUSED或RUNNING(默认值)
[*]GET /connectors/{name}-获取有关特定连接器的信息
[*]GET /connectors/{name}/config-获取特定连接器的设置参数
[*]PUT /connectors/{name}/config-更新特定连接器的设置参数
[*]PATCH /connectors/{name}/config-修补特定连接器的设置参数,其中JSON正文中的null表现从最终设置中删除键
[*]GET /connectors/{name}/status-获取连接器的当前状态,包罗是否正在运行、失败、停息等,分配给哪个工作人员,失败时的错误信息,以及全部任务的状态
[*]GET /connectors/{name}/tasks-获取当前为连接器运行的任务列表及其设置
[*]GET /connectors/{name}/tasks-config-获取特定连接器的全部任务的设置。此端点已弃用,将在下一个重要版本中删除。请改用GET /connectors/{name}/tasks端点。注意两个端点的响应结构略有差别,
[*]GET /connectors/{name}/tasks/{taskid}/status-获取任务的当前状态,包罗它是否正在运行、失败、停息等,它被分配给哪个工作人员,以及失败时的错误信息
[*]PUT /connectors/{name}/pause-停息连接器及其任务,这将停止消息处理,直到连接器恢复。其任务声明的任何资源都被分配,这允许连接器在恢复后快速开始处理数据。
[*]PUT /connectors/{name}/stop-停止连接器并关闭其任务,开释其任务声明的任何资源。从资源利用的角度来看,这比停息连接器更有用,但可能会导致连接器在恢复后需要更长时间才气开始处理数据。请注意,如果连接器处于停止状态,则只能通过偏移管理端点修改连接器的偏移量
[*]PUT /connectors/{name}/resume-恢复停息或停止的连接器(如果连接器没有停息或停止,则不执行任何操作)
[*]POST /connectors/{name}/tasks/{taskId}/restart-重新启动单个任务(通常是因为它失败了)
[*]DELETE /connectors/{name}-删除连接器,停止全部任务并删除其设置
[*]GET /connectors/{name}/topics-获取特定连接器自创建连接器或发出重置其活动主题集的请求以来正在利用的主题集
[*]PUT /connectors/{name}/topics/reset-发送清空连接器活动主题集的请求
Kafka Connect还提供了一个REST API来获取有关连接器插件的信息:
[*]GET /connector-plugins-返回安装在Kafka Connect集群中的连接器插件列表。请注意,API仅检查处理请求的工作人员上的连接器,这意味着您可能会看到不一致的效果,尤其是在滚动升级期间,如果您添加新的连接器jar
[*]GET /connector-plugins/{plugin-type}/config-获取指定插件的设置界说。
[*]PUT /connector-plugins/{connector-type}/config/validate-根据设置界说验证提供的设置值。此API执行每个设置验证,在验证期间返回发起值和错误消息。
以下是顶级(根)端点支持的REST请求:
[*]GET /-返回有关Kafka Connect集群的根本信息,例如服务于REST请求的Connect工作程序版本(包罗源代码的git提交ID)和连接到的Kafka集群ID。
可以利用admin.listeners设置在Kafka Connect的REST API服务器上设置管理员REST API
admin.listeners=http://localhost:8080,https://localhost:8443
默认情况下,如果未设置admin.listeners,则管理员REST API将在通例侦听器上可用。
以下是当前支持的管理员REST API端点:
[*]GET /admin/loggers-列出明确设置了级别的当前记载器及其日记级别
[*]GET /admin/loggers/{name}-获取指定记载器的日记级别
[*]PUT /admin/loggers/{name}-设置指定记载器的日记级别
5、错误陈诉
Kafka Connect提供错误陈诉来处理在处理的各个阶段遇到的错误。默认情况下,在转换期间或转换中遇到的任何错误都将导致连接器失败。每个连接器设置还可以通过跳过它们来允许容忍此类错误,可选地将每个错误以及失败操作的具体信息和有题目的记载(具有差别的具体级别)写入Connect应用程序日记。当接收器连接器处理从其Kafka主题消耗的消息时,这些机制还会捕获错误,而且全部错误都可以写入可设置的“死信队列”(DLQ)Kafka主题。
要向日记陈诉连接器转换器、转换或接收器连接器自己中的错误,请在连接器设置中设置errors.log.enable=true以记载每个错误和题目记载的主题、分区和偏移量的具体信息。出于其他调试目的,请设置errors.log.include.messages=true以将题目记载键、值和标头记载到日记中(请注意,这可能会记载敏感信息)。
要在连接器的转换器内陈诉错误,请将接收器连接器自己转换为死信队列主题,设置errors.deadletterqueue.topic.name,并可选地errors.deadletterqueue.context.headers.enable=true。
默认情况下,连接器在出现错误或异常时立即表现出“快速故障”行为。这相称于将以下设置属性及其默认值添加到连接器设置中:
# 禁用失败重试
errors.retry.timeout=0
# 不记载错误及其上下文
errors.log.enable=false
# 不要在死信队列主题中记载错误
errors.deadletterqueue.topic.name=
# 第一次出错失败
errors.tolerance=none
可以更改这些和其他相关的连接器设置属性以提供差别的行为。例如,可以将以下设置属性添加到连接器设置中,以通过多次重试来设置错误处理、记载到应用程序日记和my-connector-errorsKafka主题,并通过陈诉错误而不是连接器任务失败来容忍全部错误:
# 重试最多10分钟,连续失败之间最多等待30秒
errors.retry.timeout=600000
errors.retry.delay.max.ms=30000
# 将错误上下文与应用程序日记一起记载,但不包罗设置和消息
errors.log.enable=true
errors.log.include.messages=false
# 在Kafka主题中天生错误上下文
errors.deadletterqueue.topic.name=my-connector-errors
# 容忍全部错误。
errors.tolerance=all
6、支持正确一次语义
Kafka Connect可以大概为接收器连接器(从版本0.11.0开始)和源连接器(从版本3.3.0开始)提供正确一次语义学。请注意,对正确一次语义学的支持高度依赖于您运行的连接器类型。纵然您在设置中为集群中的每个节点设置了全部正确的工作属性,如果连接器不是为Kafka Connect框架筹划的,或者不能利用Kafka Connect框架的功能,正确一次可能是不可能的。
Sink connectors
如果接收器连接器支持正确一次语义学,要在Connect工作级别启用正确一次,您必须确保其消耗者组设置为忽略中止事务中的记载。您可以通过将工作属性consumer.isolation.level设置为read_committed来做到这一点,或者,如果运行支持它的Kafka Connect版本,则利用连接器客户端设置覆盖计谋,该计谋允许在单个连接器设置中将consumer.override.isolation.level属性设置为read_committed。没有额外的ACL要求。
Source connectors
如果源连接器支持正确一次语义学,则必须设置Connect聚集以启用对正确一次源连接器的框架级支持。如果针对安全的Kafka聚集运行,可能需要额外的ACL。请注意,对Source connectors的正确一次支持目前仅在分布式模式下可用;独立的Connect工作人员不能提供正确一次语义学。
Worker 设置
对于新的Connect聚集,在聚集中每个节点的工作设置中将exactly.once.source.support属性设置为enabled。对于现有聚集,需要两次滚动升级。在第一次升级期间,exactly.once.source.support属性应设置为preparing,在第二次升级期间,应设置为enabled。
ACL要求
启用了正确一次源支持,或者exactly.once.source.support设置为preparing,每个Connect工作人员的主体将需要以下ACL:
操作资源类型资源名称注意WriteTransactionalIdconnect-cluster-${groupId},其中${groupId}是集群的group.idDescribeTransactionalIdconnect-cluster-${groupId},其中${groupId}是集群的group.idIdempotentWriteCluster托管工作人员设置主题的Kafka集群的ID仅适用于在Kafka2.8之前 启用了正确一次源(但如果exactly.once.source.support设置为preparing),每个单独连接器的主体将需要以下ACL:Write、Describe、Write、Read、Describe、Create、IdempotentWrite。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]