一、Windows11本地开启Kafka服务器
1.1、开启Zookeeper
Windows下安装Zookeeper的安装可以参考这篇博文:
Windows下安装Zookeeper(图文记录详细步调,手把手包安装成功)
先启动Zookeeper服务
管理员权限打开命令窗口,输入命令zkServer
,启动Zookeeper服务:
1.2、开启Kafka服务
Windows下安装Kafka的安装可以参考这篇博文:
【Kafka】Windows下安装Kafka(图文记录详细步调)
管理员权限打开命令窗口,进入到Kafka安装目录(D:\bigdata\kafka\2.12-3.5.1)。
输入如下命令启动Kafka服务:
- .\bin\windows\kafka-server-start.bat .\config\server.properties
复制代码
二、Python创建生产者Producer
起首运行生产者Producer
KafkaConsumerDemo.py
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import json
- import time
- from kafka import KafkaProducer
- from kafka.errors import KafkaError
- producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
- # Assign a topic
- topic = 'test'
- def test():
- print('begin')
- n = 1
- # 初始化一个变量用来存储用户输入
- user_input = ""
- try:
- # 使用while循环来持续获取用户输入
- while user_input != "quit": # 条件是用户输入不是'quit'
- user_input = input("Kafka生产者,生产消息,输入内容(输入'quit'退出):")
- if user_input != "quit" and user_input != "":
- producer.send(topic, user_input.encode())
- n += 1
- # time.sleep(1.5)
- except KafkaError as e:
- print(e)
- finally:
- producer.close()
- print('done')
- def test_json():
- msg_dict = {
- "sleep_time": 10,
- "db_config": {
- "database": "test_1",
- "host": "xxxx",
- "user": "root",
- "password": "root"
- },
- "table": "msg",
- "msg": "Hello World"
- }
- msg = json.dumps(msg_dict)
- producer.send(topic, msg, partition=0)
- producer.close()
- if __name__ == '__main__':
- test()
复制代码 可以不绝发送消息:
三、Python创建消费者Consumer
再创建消费者Consumer
KafkaConsumerDemo.py
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- from pykafka import KafkaClient
- from pykafka.common import OffsetType
- bootstrap_servers = '127.0.0.1:9092'
- group_id = 'test'
- class KConsumer(object):
- """kafka 消费者; 动态传参,非配置文件传入;
- kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;
- """
- _encode = "UTF-8"
- def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
- """ 初始化kafka的消费者;
- 1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)
- 2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;
- Args:
- topics: str; kafka 的消费主题;
- bootstrap_server: list; kafka 的消费者地址;
- group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;
- """
- if bootstrap_server is None:
- bootstrap_server = bootstrap_servers
- self.client = KafkaClient(hosts=bootstrap_server)
- # 选择要消费的topic
- vpn_topic = self.client.topics[topics]
- self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
- consumer_timeout_ms=200,
- auto_commit_enable=False, # 自动提交偏移量
- # LATEST 获取当前偏移量最新消息 EARLIEST 从头开始获取信息
- auto_offset_reset=OffsetType.EARLIEST)
- def recv(self):
- """
- 接收消费中的数据
- Returns:
- """
- return self.consumer
- def main():
- """
- kafka消费队列入口
- :param topic:
- :return:
- """
- obj = KConsumer(topics="test")
- while True:
- for message in obj.recv():
- # data = eval(message.value.decode('utf-8'))
- data = message.value.decode('utf-8')
- print("Kafka消费者接受到的消息:" + data)
- if __name__ == '__main__':
- main()
复制代码 可以发现可以或许实时收到生产者发送来的消息:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |