ToB企服应用市场:ToB评测及商务社交产业平台

标题: Kafka-Connect [打印本页]

作者: 罪恶克星    时间: 2024-12-3 11:03
标题: Kafka-Connect
一、概述

Kafka Connect是一个在Apache Kafka和其他体系之间可扩展且可靠地流式传输数据的工具。仔细的你会发现,我们编写的producer、consumer都有很多重复的代码,KafkaConnect就是将这些通用的api进行了封装。让我们可以只关心业务部门(数据的处理逻辑)。它可以以设置界说的方式实现数据的生产和消耗。Kafka Connect可以将整个数据库或全部应用服务器的指标收集到Kafka主题中,使数据可以大概以低延迟进行流处理。导出作业可以将数据从Kafka主题转达到辅助存储和查询体系,或转达到批处理体系进行离线分析。
我们写大数据程序的时候,只写数据的处理逻辑就能实现分布式的并行计算,是因为MapReduce、Spark这些计算框架的良好筹划。把输入格式化、节点间Shuffle性能优化、任务拆解等都抽离出来,我们只需要继续Mapper、Reducer接口或者利用算子就可以了。Kafka Connect也是这样的筹划思想,你就把我看成一个消息队列工具,我给你封装好,你按照我们的规范处理数据就可以了。
Kafka Connect功能如下:
1、通用框架
        Kafka Connect标准化了其他数据体系与Kafka的集成,简化了连接器的开发、部署和管理
2、Distributed 和 standalone 模式
        扩展到支持整个构造的大型集中管理服务,或扩展到开发、测试和小型生产部署
3、REST 接口
        通过易于利用的REST API向Kafka Connect集群提交和管理连接器
4、offset 自动管理
        只需来自连接器的一点信息,Kafka Connect就可以自动管理偏移提交过程,因此连接器开发人员无需担心连接器开发中这个轻易出错的部门
5、可扩展
        Kafka Connect创建在现有的组管理协议之上。可以添加更多工作人员来扩展Kafka Connect集群。
6、流/批处理集成
        利用Kafka的现有功能,Kafka Connect是桥接流和批处理数据体系的理想办理方案
二、利用方法

1、运行Kafka Connect

Kafka Connect目前支持两种执行模式:standalone (单历程)和distributed
standalone 模式

在独立模式下,全部工作都在单个历程中执行。该模式轻易设置和上手,在有些场景下也很得当(例如收集日记文件),但它不能受益于Kafka Connect的某些功能,例如容错。您可以利用以下下令启动独立历程:
  1. bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.json …]
复制代码
参数设置:

第一个参数(也就是connector1.properties)是worker的设置。这包罗诸如Kafka连接参数、序列化格式以及提交偏移的频率等设置。config/server.properties提供了默认设置运行的当地集群。它需要调整以用于差别的设置或生产部署。全部worker(独立和分布式)都需要一些设置:

以下是standalone 模式特有的重要设置:

此处设置的参数旨在供Kafka Connect用于访问设置、偏移量和状态主题的生产者和消耗者利用。对于Kafka源任务利用的生产者和Kafka接收器任务利用的消耗者的设置,可以利用相同的参数,但需要分别以producer.和consumer.为前缀。从工作设置中继续的唯一没有前缀的Kafka客户端参数是bootstrap.servers
从2.3.0开始,客户端设置覆盖可以通过利用前缀producer.override.和consumer.override.分别为Kafka源或Kafka接收器单独设置。这些覆盖包含在连接器的别的设置属性中。
distributed 模式

分布式模式处理work的自动平衡,允许动态扩展(或缩减),并在活动任务以及设置和偏移提交数据中提供容错。执行与独立模式非常相似:
  1. bin/connect-distributed.sh config/connect-distributed.properties
复制代码
区别在于启动的类和设置参数,这些参数改变了Kafka Connect历程决定在哪里存储设置、怎样分配工作以及在哪里存储偏移量和任务状态的方式。在分布式模式下,Kafka Connect将偏移量、设置和任务状态存储在Kafka主题中。发起手动创建偏移量、设置和状态的主题,以实现所需的分区数和复制因子。如果启动Kafka Connect时尚未创建主题,则将利用默认的分区数和复制因子自动创建主题(一般不怎么做)。
参数设置:

在启动集群之前需要设置的重要参数:

请注意,在分布式模式下,连接器设置不会在下令行上转达。相反,利用下面形貌的REST API来创建、修改和销毁连接器
2、设置Connectors

连接器设置是简单的键值映射。在独立模式和分布式模式下,它们都包含在创建(或修改)连接器的REST请求的JSON有用负载中。在独立模式下,这些也可以在属性文件中界说并通过下令行转达给Connect历程。
以下是一些常见选项:

该connector.class设置支持多种格式:此连接器的类的全名或别名。如果连接器是org. apache.kafka.connect.file.FileStreamSinkConnector,则可以指定此全名或利用FileStreamSink或FileStreamSinkConnector使设置更短。
接收器连接器另有一些额外的选项来控制它们的输入。每个接收器连接器必须设置以下选项之一:

3、转换

连接器可以设置转换以进行轻量级的Message-at-time修改。
可以在连接器设置中指定转换链:

4、REST API

由于Kafka Connect旨在作为服务运行,因此它还提供了用于管理连接器的REST API。此REST API在独立模式和分布式模式下都可用。可以利用listeners设置选项来设置REST API服务器。此字段应包含以下格式的侦听器列表:protocol://host:port,protocol2://host2:port2。目前支持的协议是http和https。例如:
   listeners=http://localhost:8080,https://localhost:8443
  默认情况下,如果未指定listeners,则REST服务器利用HTTP协议在端口8083上运行。
REST API不光被用户用于监控/管理Kafka Connect。在分布式模式下,它也用于Kafka Connect跨集群通信。在follower 节点REST API上接收到的一些请求将被转发到leader 节点REST API。如果给定主机可达的URI与其侦听的URI差别,设置选项rest.advertised.host.name、rest.advertised.port和rest.advertised.listener可用于更改follower 节点将用于与leader 连接的URI。当同时利用HTTP和HTTPS侦听器时,rest.advertised.listenerssl.* or listeners.https 选项将用于设置HTTPS客户端。
以下是当前支持的REST API端点:

Kafka Connect还提供了一个REST API来获取有关连接器插件的信息:

以下是顶级(根)端点支持的REST请求:

可以利用admin.listeners设置在Kafka Connect的REST API服务器上设置管理员REST API
   admin.listeners=http://localhost:8080,https://localhost:8443
  默认情况下,如果未设置admin.listeners,则管理员REST API将在通例侦听器上可用。
以下是当前支持的管理员REST API端点:

5、错误陈诉

Kafka Connect提供错误陈诉来处理在处理的各个阶段遇到的错误。默认情况下,在转换期间或转换中遇到的任何错误都将导致连接器失败。每个连接器设置还可以通过跳过它们来允许容忍此类错误,可选地将每个错误以及失败操作的具体信息和有题目的记载(具有差别的具体级别)写入Connect应用程序日记。当接收器连接器处理从其Kafka主题消耗的消息时,这些机制还会捕获错误,而且全部错误都可以写入可设置的“死信队列”(DLQ)Kafka主题。
要向日记陈诉连接器转换器、转换或接收器连接器自己中的错误,请在连接器设置中设置errors.log.enable=true以记载每个错误和题目记载的主题、分区和偏移量的具体信息。出于其他调试目的,请设置errors.log.include.messages=true以将题目记载键、值和标头记载到日记中(请注意,这可能会记载敏感信息)。
要在连接器的转换器内陈诉错误,请将接收器连接器自己转换为死信队列主题,设置errors.deadletterqueue.topic.name,并可选地errors.deadletterqueue.context.headers.enable=true。
默认情况下,连接器在出现错误或异常时立即表现出“快速故障”行为。这相称于将以下设置属性及其默认值添加到连接器设置中:
   # 禁用失败重试
errors.retry.timeout=0
  # 不记载错误及其上下文
errors.log.enable=false
  # 不要在死信队列主题中记载错误
errors.deadletterqueue.topic.name=
  # 第一次出错失败
errors.tolerance=none
  可以更改这些和其他相关的连接器设置属性以提供差别的行为。例如,可以将以下设置属性添加到连接器设置中,以通过多次重试来设置错误处理、记载到应用程序日记和my-connector-errorsKafka主题,并通过陈诉错误而不是连接器任务失败来容忍全部错误:
   # 重试最多10分钟,连续失败之间最多等待30秒
errors.retry.timeout=600000
errors.retry.delay.max.ms=30000
  # 将错误上下文与应用程序日记一起记载,但不包罗设置和消息
errors.log.enable=true
errors.log.include.messages=false
  # 在Kafka主题中天生错误上下文
errors.deadletterqueue.topic.name=my-connector-errors
  # 容忍全部错误。
errors.tolerance=all
  6、支持正确一次语义

Kafka Connect可以大概为接收器连接器(从版本0.11.0开始)和源连接器(从版本3.3.0开始)提供正确一次语义学。请注意,对正确一次语义学的支持高度依赖于您运行的连接器类型。纵然您在设置中为集群中的每个节点设置了全部正确的工作属性,如果连接器不是为Kafka Connect框架筹划的,或者不能利用Kafka Connect框架的功能,正确一次可能是不可能的。
Sink connectors

如果接收器连接器支持正确一次语义学,要在Connect工作级别启用正确一次,您必须确保其消耗者组设置为忽略中止事务中的记载。您可以通过将工作属性consumer.isolation.level设置为read_committed来做到这一点,或者,如果运行支持它的Kafka Connect版本,则利用连接器客户端设置覆盖计谋,该计谋允许在单个连接器设置中将consumer.override.isolation.level属性设置为read_committed。没有额外的ACL要求。
Source connectors

如果源连接器支持正确一次语义学,则必须设置Connect聚集以启用对正确一次源连接器的框架级支持。如果针对安全的Kafka聚集运行,可能需要额外的ACL。请注意,对Source connectors的正确一次支持目前仅在分布式模式下可用;独立的Connect工作人员不能提供正确一次语义学。
Worker 设置

对于新的Connect聚集,在聚集中每个节点的工作设置中将exactly.once.source.support属性设置为enabled。对于现有聚集,需要两次滚动升级。在第一次升级期间,exactly.once.source.support属性应设置为preparing,在第二次升级期间,应设置为enabled。
ACL要求

启用了正确一次源支持,或者exactly.once.source.support设置为preparing,每个Connect工作人员的主体将需要以下ACL:
操作资源类型资源名称注意
WriteTransactionalIdconnect-cluster-${groupId},其中${groupId}是集群的group.id
DescribeTransactionalIdconnect-cluster-${groupId},其中${groupId}是集群的group.id
IdempotentWriteCluster托管工作人员设置主题的Kafka集群的ID仅适用于在Kafka2.8之前
启用了正确一次源(但如果exactly.once.source.support设置为preparing),每个单独连接器的主体将需要以下ACL:Write、Describe、Write、Read、Describe、Create、IdempotentWrite。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4