ClickHouse(21)ClickHouse集成Kafka表引擎详细解析

打印 上一主题 下一主题

主题 682|帖子 682|积分 2050

目录

Kafka表集成引擎

此引擎与Apache Kafka结合使用。
Kafka 特性:

  • 发布或者订阅数据流。
  • 容错存储机制。
  • 处理流数据。
老版Kafka集成表引擎参数格式:
  1. Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
  2.       [, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
复制代码
新版Kafka集成表引擎参数格式:
  1. Kafka SETTINGS
  2.   kafka_broker_list = 'localhost:9092',
  3.   kafka_topic_list = 'topic1,topic2',
  4.   kafka_group_name = 'group1',
  5.   kafka_format = 'JSONEachRow',
  6.   kafka_row_delimiter = '\n',
  7.   kafka_schema = '',
  8.   kafka_num_consumers = 2
复制代码
必要参数:

  • kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
  • kafka_topic_list – topic 列表 (my_topic)。
  • kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
  • kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。
可选参数:

  • kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
  • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。
  • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。
ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。
以下kafka_format是支持的格式,ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。
格式输入输出[TabSeparated]✔✔[TabSeparatedRaw]✔✔[TabSeparatedWithNames]✔✔[TabSeparatedWithNamesAndTypes]✔✔[Template]✔✔[TemplateIgnoreSpaces]✔✗[CSV]✔✔[CSVWithNames]✔✔[CustomSeparated]✔✔[Values]✔✔[Vertical]✗✔[JSON]✗✔[JSONAsString]✔✗[JSONStrings]✗✔[JSONCompact]✗✔[JSONCompactStrings]✗✔[JSONEachRow]✔✔[JSONEachRowWithProgress]✗✔[JSONStringsEachRow]✔✔[JSONStringsEachRowWithProgress]✗✔[JSONCompactEachRow]✔✔[JSONCompactEachRowWithNamesAndTypes]✔✔[JSONCompactStringsEachRow]✔✔[JSONCompactStringsEachRowWithNamesAndTypes]✔✔[TSKV]✔✔[Pretty]✗✔[PrettyCompact]✗✔[PrettyCompactMonoBlock]✗✔[PrettyNoEscapes]✗✔[PrettySpace]✗✔[Protobuf]✔✔[ProtobufSingle]✔✔[Avro]✔✔[AvroConfluent]✔✗[Parquet]✔✔[Arrow]✔✔[ArrowStream]✔✔[ORC]✔✔[RowBinary]✔✔[RowBinaryWithNamesAndTypes]✔✔[Native]✔✔[Null]✗✔[XML]✗✔[CapnProto]✔✗[LineAsString]✔✗[Regexp]✔✗[RawBLOB]✔✔示例:
  1.   CREATE TABLE queue (
  2.     timestamp UInt64,
  3.     level String,
  4.     message String
  5.   ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
  6.   SELECT * FROM queue LIMIT 5;
  7.   CREATE TABLE queue2 (
  8.     timestamp UInt64,
  9.     level String,
  10.     message String
  11.   ) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
  12.                             kafka_topic_list = 'topic',
  13.                             kafka_group_name = 'group1',
  14.                             kafka_format = 'JSONEachRow',
  15.                             kafka_num_consumers = 4;
  16.   CREATE TABLE queue2 (
  17.     timestamp UInt64,
  18.     level String,
  19.     message String
  20.   ) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
  21.               SETTINGS kafka_format = 'JSONEachRow',
  22.                        kafka_num_consumers = 4;
复制代码
消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。
消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。
SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:

  • 使用引擎创建一个 Kafka 消费者并作为一条数据流。
  • 创建一个结构表。
  • 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。
当 MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。
示例:
  1.   CREATE TABLE queue (
  2.     timestamp UInt64,
  3.     level String,
  4.     message String
  5.   ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
  6.   CREATE TABLE daily (
  7.     day Date,
  8.     level String,
  9.     total UInt64
  10.   ) ENGINE = SummingMergeTree(day, (day, level), 8192);
  11.   CREATE MATERIALIZED VIEW consumer TO daily
  12.     AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
  13.     FROM queue GROUP BY day, level;
  14.   SELECT level, sum(total) FROM daily GROUP BY level;
复制代码
为了提高性能,接受的消息被分组为max_insert_block_size大小的块。如果未在stream_flush_interval_ms毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。
停止接收主题数据或更改转换逻辑,请 detach 物化视图:
  1.   DETACH TABLE consumer;
  2.   ATTACH TABLE consumer;
复制代码
如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。
配置

与 GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。
  1.   
  2.   <kafka>
  3.     <debug>cgrp</debug>
  4.     <auto_offset_reset>smallest</auto_offset_reset>
  5.   </kafka>
  6.   
  7.   <kafka_logs>
  8.     <retry_backoff_ms>250</retry_backoff_ms>
  9.     <fetch_min_bytes>100000</fetch_min_bytes>
  10.   </kafka_logs>
复制代码
在ClickHouse配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 true。
Kerberos 支持

对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。
示例:
  1.   
  2.   <kafka>
  3.     <security_protocol>SASL_PLAINTEXT</security_protocol>
  4.     <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
  5.     <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
  6.   </kafka>
复制代码
虚拟列


  • _topic – Kafka 主题。
  • _key – 信息的键。
  • _offset – 消息的偏移量。
  • _timestamp – 消息的时间戳。
  • _timestamp_ms – 消息的时间戳(毫秒)。
  • _partition – Kafka 主题的分区。
资料分享

ClickHouse经典中文文档分享
参考文章


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王國慶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表