Kafka 单机和集群环境摆设教程

打印 上一主题 下一主题

主题 1641|帖子 1641|积分 4923

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
下面是 Apache Kafka 单机和集群环境摆设的具体教程,包罗摆设过程中的注意事项以及一个使用案例。Apache Kafka 是一个分布式流处置惩罚平台,广泛用于及时数据处置惩罚、日志收集、消息队列等场景。

一、Kafka 单机环境摆设

1. 环境准备



  • 操作体系:Linux(保举 Ubuntu 20.04 或 CentOS 7)
  • Java:Kafka 需要 Java 环境,保举使用 OpenJDK 8 或 11。
  • ZooKeeper:Kafka 依赖 ZooKeeper 进行分布式和谐。
2. 安装 Java

在 Ubuntu 中:
  1. sudo apt update
  2. sudo apt install openjdk-11-jdk
复制代码
在 CentOS 中:
  1. sudo yum install java-11-openjdk
复制代码
验证 Java 安装:
  1. java -version
复制代码
3. 安装 ZooKeeper

Kafka 使用 ZooKeeper 进行节点管理和和谐,需要先安装并启动 ZooKeeper。
3.1 下载并解压 ZooKeeper

  1. wget https://downloads.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz
  2. tar -xzvf apache-zookeeper-3.8.2-bin.tar.gz
  3. mv apache-zookeeper-3.8.2-bin /usr/local/zookeeper
复制代码
3.2 设置 ZooKeeper


  • 创建数据目录:
    1. mkdir -p /var/lib/zookeeper
    复制代码
  • 复制设置文件:
    1. cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
    复制代码
  • 编辑设置文件 /usr/local/zookeeper/conf/zoo.cfg:
    1. dataDir=/var/lib/zookeeper
    2. clientPort=2181
    复制代码
3.3 启动 ZooKeeper

  1. /usr/local/zookeeper/bin/zkServer.sh start
复制代码
3.4 验证 ZooKeeper 是否正常运行

  1. /usr/local/zookeeper/bin/zkCli.sh -server localhost:2181
复制代码
在连接成功后输入 ls /,若返回空列表([]),则分析连接成功。
4. 安装 Kafka

4.1 下载并解压 Kafka

访问 Kafka 官网 下载最新版本的 Kafka。
  1. wget https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
  2. tar -xzvf kafka_2.12-3.5.0.tgz
  3. mv kafka_2.12-3.5.0 /usr/local/kafka
复制代码
4.2 设置 Kafka

编辑 Kafka 的设置文件 /usr/local/kafka/config/server.properties:
  1. # Kafka Broker ID,唯一标识符
  2. broker.id=0
  3. # 监听的接口和端口
  4. listeners=PLAINTEXT://:9092
  5. # 日志文件存储路径
  6. log.dirs=/var/lib/kafka-logs
  7. # Zookeeper 连接地址
  8. zookeeper.connect=localhost:2181
复制代码
4.3 创建日志目录

  1. mkdir -p /var/lib/kafka-logs
复制代码
4.4 启动 Kafka Broker

  1. /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
复制代码
4.5 验证 Kafka 是否正常运行

创建一个测试 Topic:
  1. /usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
复制代码
列出 Topic:
  1. /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
复制代码
你应该看到 test-topic 在列出的 Topic 中。
5. Kafka 单机摆设的注意事项



  • ZooKeeper:确保 ZooKeeper 正常运行,而且 zookeeper.connect 地点设置准确。
  • 内存和存储:为 Kafka 分配富足的内存和存储空间,尤其是在高负载场景下。
  • 日志文件:定期查抄和清理 Kafka 日志文件,以防止磁盘占满。
  • 监听地点:如果需要远程访问,确保 listeners 设置了准确的监听地点。
  • 防火墙设置:确保防火墙开放了 Kafka 和 ZooKeeper 使用的端口(默认 9092 和 2181)。

二、Kafka 集群环境摆设

Kafka 集群由多个 Kafka Broker 组成,可以或许提供高可用性和程度扩展。
1. 环境准备



  • 多台服务器:至少 3 台(3 个 Kafka Broker 和 3 个 ZooKeeper 实例)
  • 操作体系:Linux(保举 Ubuntu 20.04 或 CentOS 7)
  • Java:在所有节点上安装 Java
2. 安装 ZooKeeper 集群

在每台服务器上按照单机摆设的步骤安装 ZooKeeper,并进行以下设置:
2.1 设置 ZooKeeper 节点 ID

编辑每个节点的 zoo.cfg 文件,添加如下设置:
  1. server.1=zookeeper1:2888:3888
  2. server.2=zookeeper2:2888:3888
  3. server.3=zookeeper3:2888:3888
复制代码
在每台服务器上创建 myid 文件,用于标识节点:
  1. echo "1" > /var/lib/zookeeper/myid  # 在 zookeeper1 上
  2. echo "2" > /var/lib/zookeeper/myid  # 在 zookeeper2 上
  3. echo "3" > /var/lib/zookeeper/myid  # 在 zookeeper3 上
复制代码
2.2 启动 ZooKeeper 集群

在每台服务器上启动 ZooKeeper:
  1. /usr/local/zookeeper/bin/zkServer.sh start
复制代码
3. 安装 Kafka 集群

在每台服务器上按照单机摆设的步骤安装 Kafka,并进行以下设置:
3.1 设置 Kafka Broker

编辑每个节点的 server.properties 文件,添加如下设置:
  1. broker.id=0  # 每个 Broker 唯一 ID
  2. listeners=PLAINTEXT://:9092
  3. log.dirs=/var/lib/kafka-logs
  4. zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
复制代码
3.2 启动 Kafka Broker

在每台服务器上启动 Kafka Broker:
  1. /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
复制代码
4. 验证 Kafka 集群状态

4.1 创建 Topic

在任一 Kafka Broker 上执行以下下令:
  1. /usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --partitions 3 --replication-factor 3
复制代码
4.2 验证 Topic

列出集群中的 Topic:
  1. /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka1:9092
复制代码
查察 Topic 具体信息:
  1. /usr/local/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server kafka1:9092
复制代码
5. Kafka 集群摆设的注意事项



  • ZooKeeper 集群:确保每个节点设置了准确的 myid,而且所有节点可以相互通信。
  • Kafka Broker 设置:每个 Broker 必须有唯一的 broker.id。
  • 分区和副本:根据实际需求设置符合的分区数和副本数,以进步数据可靠性和吞吐量。
  • 监控和报警:使用 Kafka Manager 或其他监控工具监控集群状态,及时处置惩罚故障。
  • 网络设置:确保各节点之间的网络连接正常,而且防火墙开放了须要端口。
  • 资源规划:为 Kafka 和 ZooKeeper 分配富足的 CPU、内存和磁盘资源。

三、Kafka 使用案例:生产者和消费者

1. 使用 Java 实现 Kafka 生产者和消费者

1.1 添加依赖

在 Maven 项目中添加 Kafka 的依赖:
  1. <dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     <artifactId>kafka-clients</artifactId>
  4.     <version>3.5.0</version>
  5. </dependency>
复制代码
1.2 编写 Kafka 生产者

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.Producer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.util.Properties;
  6. public class SimpleProducer {
  7.     public static void main(String[] args) {
  8.         // Kafka 生产者配置
  9.         Properties props = new Properties();
  10.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  11.         props.put(ProducerConfig
  12. .KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  13.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  14.         // 创建生产者
  15.         Producer<String, String> producer = new KafkaProducer<>(props);
  16.         // 发送消息
  17.         for (int i = 0; i < 10; i++) {
  18.             ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", Integer.toString(i), "Message " + i);
  19.             producer.send(record);
  20.         }
  21.         // 关闭生产者
  22.         producer.close();
  23.     }
  24. }
复制代码
1.3 编写 Kafka 消费者

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecords;
  3. import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. import org.apache.kafka.clients.consumer.ConsumerRecord;
  5. import java.util.Collections;
  6. import java.util.Properties;
  7. public class SimpleConsumer {
  8.     public static void main(String[] args) {
  9.         // Kafka 消费者配置
  10.         Properties props = new Properties();
  11.         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  12.         props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
  13.         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  14.         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  15.         // 创建消费者
  16.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  17.         // 订阅主题
  18.         consumer.subscribe(Collections.singletonList("test-topic"));
  19.         // 轮询消息
  20.         while (true) {
  21.             ConsumerRecords<String, String> records = consumer.poll(100);
  22.             for (ConsumerRecord<String, String> record : records) {
  23.                 System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
  24.             }
  25.         }
  26.     }
  27. }
复制代码
1.4 运行 Java 程序

编译并运行生产者:
  1. mvn compile
  2. mvn exec:java -Dexec.mainClass="SimpleProducer"
复制代码
编译并运行消费者:
  1. mvn exec:java -Dexec.mainClass="SimpleConsumer"
复制代码
2. 使用 Python 实现 Kafka 生产者和消费者

2.1 安装 Kafka 库

  1. pip install kafka-python
复制代码
2.2 编写 Kafka 生产者

  1. from kafka import KafkaProducer
  2. # 创建 Kafka 生产者
  3. producer = KafkaProducer(bootstrap_servers='localhost:9092')
  4. # 发送消息
  5. for i in range(10):
  6.     producer.send('test-topic', key=bytes(str(i), encoding='utf-8'), value=bytes(f'Message {i}', encoding='utf-8'))
  7. # 关闭生产者
  8. producer.close()
复制代码
2.3 编写 Kafka 消费者

  1. from kafka import KafkaConsumer
  2. # 创建 Kafka 消费者
  3. consumer = KafkaConsumer(
  4.     'test-topic',
  5.     bootstrap_servers='localhost:9092',
  6.     group_id='test-group',
  7.     auto_offset_reset='earliest'
  8. )
  9. # 轮询消息
  10. for message in consumer:
  11.     print(f'Offset = {message.offset}, Key = {message.key.decode()}, Value = {message.value.decode()}')
复制代码
2.4 运行 Python 程序

运行生产者:
  1. python kafka_producer.py
复制代码
运行消费者:
  1. python kafka_consumer.py
复制代码
3. 注意事项



  • 生产者和消费者设置:合理设置 bootstrap.servers、key.serializer、value.serializer、group.id 等参数。
  • 分区策略:在生产者中使用自界说分区策略,可以进步吞吐量和负载均衡。
  • 消费组:多个消费者实例可以组成一个消费组,以进步处置惩罚本领。
  • 容错机制:在实际应用中,需要考虑重试、错误处置惩罚和幂等性等问题。

总结

通过以上步骤,我们成功摆设了 Kafka 单机和集群环境,并实现了一个简朴的生产者和消费者应用。Kafka 提供了高吞吐量、低延迟的消息通报本领,适合用于及时流处置惩罚和数据管道。
摆设过程中的注意事项



  • Java 版本:确保安装了准确版本的 Java。
  • ZooKeeper 集群:确保 ZooKeeper 集群稳定运行,并设置准确。
  • 网络设置:各节点之间的网络连接需要稳定,端口要开放。
  • 资源设置:根据业务需求设置符合的内存、CPU 和磁盘资源。
  • 数据安全:启用 Kafka 的 SSL/TLS 和 SASL 认证机制,确保数据安全传输。
  • 监控和管理:使用 Kafka Manager、Prometheus 等工具监控集群状态,及时处置惩罚异常。
  • 日志管理:定期查抄和清理 Kafka 的日志,以防止磁盘空间不足。
   通过合理的设置和优化,Kafka 可以为应用程序提供可靠的消息通报和流处置惩罚服务,是构建及时数据管道和事件驱动架构的重要组件。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

反转基因福娃

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表