马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
下面是 Apache Kafka 单机和集群环境摆设的具体教程,包罗摆设过程中的注意事项以及一个使用案例。Apache Kafka 是一个分布式流处置惩罚平台,广泛用于及时数据处置惩罚、日志收集、消息队列等场景。
一、Kafka 单机环境摆设
1. 环境准备
- 操作体系:Linux(保举 Ubuntu 20.04 或 CentOS 7)
- Java:Kafka 需要 Java 环境,保举使用 OpenJDK 8 或 11。
- ZooKeeper:Kafka 依赖 ZooKeeper 进行分布式和谐。
2. 安装 Java
在 Ubuntu 中:
- sudo apt update
- sudo apt install openjdk-11-jdk
复制代码 在 CentOS 中:
- sudo yum install java-11-openjdk
复制代码 验证 Java 安装:
3. 安装 ZooKeeper
Kafka 使用 ZooKeeper 进行节点管理和和谐,需要先安装并启动 ZooKeeper。
3.1 下载并解压 ZooKeeper
- wget https://downloads.apache.org/zookeeper/zookeeper-3.8.2/apache-zookeeper-3.8.2-bin.tar.gz
- tar -xzvf apache-zookeeper-3.8.2-bin.tar.gz
- mv apache-zookeeper-3.8.2-bin /usr/local/zookeeper
复制代码 3.2 设置 ZooKeeper
- 创建数据目录:
- mkdir -p /var/lib/zookeeper
复制代码 - 复制设置文件:
- cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
复制代码 - 编辑设置文件 /usr/local/zookeeper/conf/zoo.cfg:
- dataDir=/var/lib/zookeeper
- clientPort=2181
复制代码 3.3 启动 ZooKeeper
- /usr/local/zookeeper/bin/zkServer.sh start
复制代码 3.4 验证 ZooKeeper 是否正常运行
- /usr/local/zookeeper/bin/zkCli.sh -server localhost:2181
复制代码 在连接成功后输入 ls /,若返回空列表([]),则分析连接成功。
4. 安装 Kafka
4.1 下载并解压 Kafka
访问 Kafka 官网 下载最新版本的 Kafka。
- wget https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
- tar -xzvf kafka_2.12-3.5.0.tgz
- mv kafka_2.12-3.5.0 /usr/local/kafka
复制代码 4.2 设置 Kafka
编辑 Kafka 的设置文件 /usr/local/kafka/config/server.properties:
- # Kafka Broker ID,唯一标识符
- broker.id=0
- # 监听的接口和端口
- listeners=PLAINTEXT://:9092
- # 日志文件存储路径
- log.dirs=/var/lib/kafka-logs
- # Zookeeper 连接地址
- zookeeper.connect=localhost:2181
复制代码 4.3 创建日志目录
- mkdir -p /var/lib/kafka-logs
复制代码 4.4 启动 Kafka Broker
- /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
复制代码 4.5 验证 Kafka 是否正常运行
创建一个测试 Topic:
- /usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
复制代码 列出 Topic:
- /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
复制代码 你应该看到 test-topic 在列出的 Topic 中。
5. Kafka 单机摆设的注意事项
- ZooKeeper:确保 ZooKeeper 正常运行,而且 zookeeper.connect 地点设置准确。
- 内存和存储:为 Kafka 分配富足的内存和存储空间,尤其是在高负载场景下。
- 日志文件:定期查抄和清理 Kafka 日志文件,以防止磁盘占满。
- 监听地点:如果需要远程访问,确保 listeners 设置了准确的监听地点。
- 防火墙设置:确保防火墙开放了 Kafka 和 ZooKeeper 使用的端口(默认 9092 和 2181)。
二、Kafka 集群环境摆设
Kafka 集群由多个 Kafka Broker 组成,可以或许提供高可用性和程度扩展。
1. 环境准备
- 多台服务器:至少 3 台(3 个 Kafka Broker 和 3 个 ZooKeeper 实例)
- 操作体系:Linux(保举 Ubuntu 20.04 或 CentOS 7)
- Java:在所有节点上安装 Java
2. 安装 ZooKeeper 集群
在每台服务器上按照单机摆设的步骤安装 ZooKeeper,并进行以下设置:
2.1 设置 ZooKeeper 节点 ID
编辑每个节点的 zoo.cfg 文件,添加如下设置:
- server.1=zookeeper1:2888:3888
- server.2=zookeeper2:2888:3888
- server.3=zookeeper3:2888:3888
复制代码 在每台服务器上创建 myid 文件,用于标识节点:
- echo "1" > /var/lib/zookeeper/myid # 在 zookeeper1 上
- echo "2" > /var/lib/zookeeper/myid # 在 zookeeper2 上
- echo "3" > /var/lib/zookeeper/myid # 在 zookeeper3 上
复制代码 2.2 启动 ZooKeeper 集群
在每台服务器上启动 ZooKeeper:
- /usr/local/zookeeper/bin/zkServer.sh start
复制代码 3. 安装 Kafka 集群
在每台服务器上按照单机摆设的步骤安装 Kafka,并进行以下设置:
3.1 设置 Kafka Broker
编辑每个节点的 server.properties 文件,添加如下设置:
- broker.id=0 # 每个 Broker 唯一 ID
- listeners=PLAINTEXT://:9092
- log.dirs=/var/lib/kafka-logs
- zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
复制代码 3.2 启动 Kafka Broker
在每台服务器上启动 Kafka Broker:
- /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
复制代码 4. 验证 Kafka 集群状态
4.1 创建 Topic
在任一 Kafka Broker 上执行以下下令:
- /usr/local/kafka/bin/kafka-topics.sh --create --topic test-topic --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --partitions 3 --replication-factor 3
复制代码 4.2 验证 Topic
列出集群中的 Topic:
- /usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka1:9092
复制代码 查察 Topic 具体信息:
- /usr/local/kafka/bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server kafka1:9092
复制代码 5. Kafka 集群摆设的注意事项
- ZooKeeper 集群:确保每个节点设置了准确的 myid,而且所有节点可以相互通信。
- Kafka Broker 设置:每个 Broker 必须有唯一的 broker.id。
- 分区和副本:根据实际需求设置符合的分区数和副本数,以进步数据可靠性和吞吐量。
- 监控和报警:使用 Kafka Manager 或其他监控工具监控集群状态,及时处置惩罚故障。
- 网络设置:确保各节点之间的网络连接正常,而且防火墙开放了须要端口。
- 资源规划:为 Kafka 和 ZooKeeper 分配富足的 CPU、内存和磁盘资源。
三、Kafka 使用案例:生产者和消费者
1. 使用 Java 实现 Kafka 生产者和消费者
1.1 添加依赖
在 Maven 项目中添加 Kafka 的依赖:
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>3.5.0</version>
- </dependency>
复制代码 1.2 编写 Kafka 生产者
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import java.util.Properties;
- public class SimpleProducer {
- public static void main(String[] args) {
- // Kafka 生产者配置
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(ProducerConfig
- .KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- // 创建生产者
- Producer<String, String> producer = new KafkaProducer<>(props);
- // 发送消息
- for (int i = 0; i < 10; i++) {
- ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", Integer.toString(i), "Message " + i);
- producer.send(record);
- }
- // 关闭生产者
- producer.close();
- }
- }
复制代码 1.3 编写 Kafka 消费者
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import java.util.Collections;
- import java.util.Properties;
- public class SimpleConsumer {
- public static void main(String[] args) {
- // Kafka 消费者配置
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- // 创建消费者
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- // 订阅主题
- consumer.subscribe(Collections.singletonList("test-topic"));
- // 轮询消息
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
- }
- }
- }
- }
复制代码 1.4 运行 Java 程序
编译并运行生产者:
- mvn compile
- mvn exec:java -Dexec.mainClass="SimpleProducer"
复制代码 编译并运行消费者:
- mvn exec:java -Dexec.mainClass="SimpleConsumer"
复制代码 2. 使用 Python 实现 Kafka 生产者和消费者
2.1 安装 Kafka 库
2.2 编写 Kafka 生产者
- from kafka import KafkaProducer
- # 创建 Kafka 生产者
- producer = KafkaProducer(bootstrap_servers='localhost:9092')
- # 发送消息
- for i in range(10):
- producer.send('test-topic', key=bytes(str(i), encoding='utf-8'), value=bytes(f'Message {i}', encoding='utf-8'))
- # 关闭生产者
- producer.close()
复制代码 2.3 编写 Kafka 消费者
- from kafka import KafkaConsumer
- # 创建 Kafka 消费者
- consumer = KafkaConsumer(
- 'test-topic',
- bootstrap_servers='localhost:9092',
- group_id='test-group',
- auto_offset_reset='earliest'
- )
- # 轮询消息
- for message in consumer:
- print(f'Offset = {message.offset}, Key = {message.key.decode()}, Value = {message.value.decode()}')
复制代码 2.4 运行 Python 程序
运行生产者:
运行消费者:
3. 注意事项
- 生产者和消费者设置:合理设置 bootstrap.servers、key.serializer、value.serializer、group.id 等参数。
- 分区策略:在生产者中使用自界说分区策略,可以进步吞吐量和负载均衡。
- 消费组:多个消费者实例可以组成一个消费组,以进步处置惩罚本领。
- 容错机制:在实际应用中,需要考虑重试、错误处置惩罚和幂等性等问题。
总结
通过以上步骤,我们成功摆设了 Kafka 单机和集群环境,并实现了一个简朴的生产者和消费者应用。Kafka 提供了高吞吐量、低延迟的消息通报本领,适合用于及时流处置惩罚和数据管道。
摆设过程中的注意事项
- Java 版本:确保安装了准确版本的 Java。
- ZooKeeper 集群:确保 ZooKeeper 集群稳定运行,并设置准确。
- 网络设置:各节点之间的网络连接需要稳定,端口要开放。
- 资源设置:根据业务需求设置符合的内存、CPU 和磁盘资源。
- 数据安全:启用 Kafka 的 SSL/TLS 和 SASL 认证机制,确保数据安全传输。
- 监控和管理:使用 Kafka Manager、Prometheus 等工具监控集群状态,及时处置惩罚异常。
- 日志管理:定期查抄和清理 Kafka 的日志,以防止磁盘空间不足。
通过合理的设置和优化,Kafka 可以为应用程序提供可靠的消息通报和流处置惩罚服务,是构建及时数据管道和事件驱动架构的重要组件。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |