【Python-Kafka】Python毗连Windows下的kafka服务器,创建生产者Producer和
一、Windows11本地开启Kafka服务器1.1、开启Zookeeper
Windows下安装Zookeeper的安装可以参考这篇博文:
Windows下安装Zookeeper(图文记录详细步调,手把手包安装成功)
先启动Zookeeper服务
管理员权限打开命令窗口,输入命令zkServer
,启动Zookeeper服务:
zkServer
https://i-blog.csdnimg.cn/blog_migrate/6080fcf171e57144778a4d9c773ea4ed.png
1.2、开启Kafka服务
Windows下安装Kafka的安装可以参考这篇博文:
【Kafka】Windows下安装Kafka(图文记录详细步调)
管理员权限打开命令窗口,进入到Kafka安装目录(D:\bigdata\kafka\2.12-3.5.1)。
输入如下命令启动Kafka服务:
.\bin\windows\kafka-server-start.bat .\config\server.properties
https://i-blog.csdnimg.cn/blog_migrate/df7bf0d8a244a7c0d2ec2520bc859965.png
二、Python创建生产者Producer
起首运行生产者Producer
KafkaConsumerDemo.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
# Assign a topic
topic = 'test'
def test():
print('begin')
n = 1
# 初始化一个变量用来存储用户输入
user_input = ""
try:
# 使用while循环来持续获取用户输入
while user_input != "quit":# 条件是用户输入不是'quit'
user_input = input("Kafka生产者,生产消息,输入内容(输入'quit'退出):")
if user_input != "quit" and user_input != "":
producer.send(topic, user_input.encode())
n += 1
# time.sleep(1.5)
except KafkaError as e:
print(e)
finally:
producer.close()
print('done')
def test_json():
msg_dict = {
"sleep_time": 10,
"db_config": {
"database": "test_1",
"host": "xxxx",
"user": "root",
"password": "root"
},
"table": "msg",
"msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send(topic, msg, partition=0)
producer.close()
if __name__ == '__main__':
test()
可以不绝发送消息:
https://i-blog.csdnimg.cn/blog_migrate/b7df4414c9174f21ab763b3bcd98c31a.png
三、Python创建消费者Consumer
再创建消费者Consumer
KafkaConsumerDemo.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from pykafka.common import OffsetType
bootstrap_servers = '127.0.0.1:9092'
group_id = 'test'
class KConsumer(object):
"""kafka 消费者; 动态传参,非配置文件传入;
kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;
"""
_encode = "UTF-8"
def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
""" 初始化kafka的消费者;
1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)
2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;
Args:
topics: str; kafka 的消费主题;
bootstrap_server: list; kafka 的消费者地址;
group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;
"""
if bootstrap_server is None:
bootstrap_server = bootstrap_servers
self.client = KafkaClient(hosts=bootstrap_server)
# 选择要消费的topic
vpn_topic = self.client.topics
self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
consumer_timeout_ms=200,
auto_commit_enable=False,# 自动提交偏移量
# LATEST 获取当前偏移量最新消息EARLIEST 从头开始获取信息
auto_offset_reset=OffsetType.EARLIEST)
def recv(self):
"""
接收消费中的数据
Returns:
"""
return self.consumer
def main():
"""
kafka消费队列入口
:param topic:
:return:
"""
obj = KConsumer(topics="test")
while True:
for message in obj.recv():
# data = eval(message.value.decode('utf-8'))
data = message.value.decode('utf-8')
print("Kafka消费者接受到的消息:" + data)
if __name__ == '__main__':
main()
可以发现可以或许实时收到生产者发送来的消息:
https://i-blog.csdnimg.cn/blog_migrate/5d77b8ceeb270661869b59dc293d826c.png
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]