王國慶 发表于 2024-4-1 04:01:39

ClickHouse(21)ClickHouse集成Kafka表引擎详细解析

目录

[*]Kafka表集成引擎

[*]配置

[*]Kerberos 支持

[*]虚拟列

[*]资料分享
[*]参考文章

Kafka表集成引擎

此引擎与Apache Kafka结合使用。
Kafka 特性:

[*]发布或者订阅数据流。
[*]容错存储机制。
[*]处理流数据。
老版Kafka集成表引擎参数格式:
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
      [, kafka_row_delimiter, kafka_schema, kafka_num_consumers])新版Kafka集成表引擎参数格式:
Kafka SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
kafka_schema = '',
kafka_num_consumers = 2必要参数:

[*]kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
[*]kafka_topic_list – topic 列表 (my_topic)。
[*]kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
[*]kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。
可选参数:

[*]kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
[*]kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。
[*]kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。
ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。
以下kafka_format是支持的格式,ClickHouse可以接受和返回各种格式的数据。受支持的输入格式可用于提交给INSERT语句、从文件表(File,URL,HDFS或者外部目录)执行SELECT语句,受支持的输出格式可用于格式化SELECT语句的返回结果,或者通过INSERT写入到文件表。
格式输入输出✔✔✔✔✔✔✔✔✔✔✔✗✔✔✔✔✔✔✔✔✗✔✗✔✔✗✗✔✗✔✗✔✔✔✗✔✔✔✗✔✔✔✔✔✔✔✔✔✔✔✗✔✗✔✗✔✗✔✗✔✔✔✔✔✔✔✔✗✔✔✔✔✔✔✔✔✔✔✔✔✔✔✗✔✗✔✔✗✔✗✔✗✔✔示例:
CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

SELECT * FROM queue LIMIT 5;

CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
                            kafka_topic_list = 'topic',
                            kafka_group_name = 'group1',
                            kafka_format = 'JSONEachRow',
                            kafka_num_consumers = 4;

CREATE TABLE queue2 (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
            SETTINGS kafka_format = 'JSONEachRow',
                     kafka_num_consumers = 4;消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。
消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。
SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:

[*]使用引擎创建一个 Kafka 消费者并作为一条数据流。
[*]创建一个结构表。
[*]创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。
当 MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。
示例:
CREATE TABLE queue (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

CREATE TABLE daily (
    day Date,
    level String,
    total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);

CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

SELECT level, sum(total) FROM daily GROUP BY level;为了提高性能,接受的消息被分组为max_insert_block_size大小的块。如果未在stream_flush_interval_ms毫秒内形成块,则不关心块的完整性,都会将数据刷新到表中。
停止接收主题数据或更改转换逻辑,请 detach 物化视图:
DETACH TABLE consumer;
ATTACH TABLE consumer;如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。
配置

与 GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。

<kafka>
    <debug>cgrp</debug>
    <auto_offset_reset>smallest</auto_offset_reset>
</kafka>


<kafka_logs>
    <retry_backoff_ms>250</retry_backoff_ms>
    <fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>在ClickHouse配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 true。
Kerberos 支持

对于使用了kerberos的kafka, 将security_protocol 设置为sasl_plaintext就够了,如果kerberos的ticket是由操作系统获取和缓存的。
clickhouse也支持自己使用keyfile的方式来维护kerbros的凭证。配置sasl_kerberos_service_name、sasl_kerberos_keytab、sasl_kerberos_principal三个子元素就可以。
示例:

<kafka>
    <security_protocol>SASL_PLAINTEXT</security_protocol>
    <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
    <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>虚拟列


[*]_topic – Kafka 主题。
[*]_key – 信息的键。
[*]_offset – 消息的偏移量。
[*]_timestamp – 消息的时间戳。
[*]_timestamp_ms – 消息的时间戳(毫秒)。
[*]_partition – Kafka 主题的分区。
资料分享

ClickHouse经典中文文档分享
参考文章


[*]ClickHouse(01)什么是ClickHouse,ClickHouse适用于什么场景
[*]ClickHouse(02)ClickHouse架构设计介绍概述与ClickHouse数据分片设计
[*]ClickHouse(03)ClickHouse怎么安装和部署
[*]ClickHouse(04)如何搭建ClickHouse集群
[*]ClickHouse(05)ClickHouse数据类型详解
[*]ClickHouse(06)ClickHouse建表语句DDL详细解析
[*]ClickHouse(07)ClickHouse数据库引擎解析
[*]ClickHouse(08)ClickHouse表引擎概况
[*]ClickHouse(09)ClickHouse合并树MergeTree家族表引擎之MergeTree详细解析
[*]ClickHouse(10)ClickHouse合并树MergeTree家族表引擎之ReplacingMergeTree详细解析
[*]ClickHouse(11)ClickHouse合并树MergeTree家族表引擎之SummingMergeTree详细解析
[*]ClickHouse(12)ClickHouse合并树MergeTree家族表引擎之AggregatingMergeTree详细解析
[*]ClickHouse(13)ClickHouse合并树MergeTree家族表引擎之CollapsingMergeTree详细解析
[*]ClickHouse(14)ClickHouse合并树MergeTree家族表引擎之VersionedCollapsingMergeTree详细解析
[*]ClickHouse(15)ClickHouse合并树MergeTree家族表引擎之GraphiteMergeTree详细解析
[*]ClickHouse(16)ClickHouse日志引擎Log详细解析
[*]ClickHouse(17)ClickHouse集成JDBC表引擎详细解析
[*]ClickHouse(18)ClickHouse集成ODBC表引擎详细解析
[*]ClickHouse(19)ClickHouse集成Hive表引擎详细解析
[*]ClickHouse(20)ClickHouse集成PostgreSQL表引擎详细解析

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: ClickHouse(21)ClickHouse集成Kafka表引擎详细解析