【Python-Kafka】Python毗连Windows下的kafka服务器,创建生产者Producer和 ...

打印 上一主题 下一主题

主题 835|帖子 835|积分 2505


一、Windows11本地开启Kafka服务器

1.1、开启Zookeeper

Windows下安装Zookeeper的安装可以参考这篇博文:
Windows下安装Zookeeper(图文记录详细步调,手把手包安装成功)
先启动Zookeeper服务
管理员权限打开命令窗口,输入命令zkServer
,启动Zookeeper服务:
  1. zkServer
复制代码

1.2、开启Kafka服务

Windows下安装Kafka的安装可以参考这篇博文:
【Kafka】Windows下安装Kafka(图文记录详细步调)
管理员权限打开命令窗口,进入到Kafka安装目录(D:\bigdata\kafka\2.12-3.5.1)。
输入如下命令启动Kafka服务:
  1. .\bin\windows\kafka-server-start.bat .\config\server.properties
复制代码

二、Python创建生产者Producer

起首运行生产者Producer
KafkaConsumerDemo.py
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import json
  4. import time
  5. from kafka import KafkaProducer
  6. from kafka.errors import KafkaError
  7. producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  8. # Assign a topic
  9. topic = 'test'
  10. def test():
  11.     print('begin')
  12.     n = 1
  13.     # 初始化一个变量用来存储用户输入
  14.     user_input = ""
  15.     try:
  16.         # 使用while循环来持续获取用户输入
  17.         while user_input != "quit":  # 条件是用户输入不是'quit'
  18.             user_input = input("Kafka生产者,生产消息,输入内容(输入'quit'退出):")
  19.             if user_input != "quit" and user_input != "":
  20.                 producer.send(topic, user_input.encode())
  21.                 n += 1
  22.                 # time.sleep(1.5)
  23.     except KafkaError as e:
  24.         print(e)
  25.     finally:
  26.         producer.close()
  27.         print('done')
  28. def test_json():
  29.     msg_dict = {
  30.         "sleep_time": 10,
  31.         "db_config": {
  32.             "database": "test_1",
  33.             "host": "xxxx",
  34.             "user": "root",
  35.             "password": "root"
  36.         },
  37.         "table": "msg",
  38.         "msg": "Hello World"
  39.     }
  40.     msg = json.dumps(msg_dict)
  41.     producer.send(topic, msg, partition=0)
  42.     producer.close()
  43. if __name__ == '__main__':
  44.     test()
复制代码
可以不绝发送消息:

三、Python创建消费者Consumer

再创建消费者Consumer
KafkaConsumerDemo.py
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. from pykafka import KafkaClient
  4. from pykafka.common import OffsetType
  5. bootstrap_servers = '127.0.0.1:9092'
  6. group_id = 'test'
  7. class KConsumer(object):
  8.     """kafka 消费者; 动态传参,非配置文件传入;
  9.       kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;
  10.      """
  11.     _encode = "UTF-8"
  12.     def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
  13.         """ 初始化kafka的消费者;
  14.            1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)
  15.            2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;
  16.          Args:
  17.            topics: str; kafka 的消费主题;
  18.            bootstrap_server: list; kafka 的消费者地址;
  19.            group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;
  20.          """
  21.         if bootstrap_server is None:
  22.             bootstrap_server = bootstrap_servers
  23.         self.client = KafkaClient(hosts=bootstrap_server)
  24.         # 选择要消费的topic
  25.         vpn_topic = self.client.topics[topics]
  26.         self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
  27.                                                       consumer_timeout_ms=200,
  28.                                                       auto_commit_enable=False,  # 自动提交偏移量
  29.                                                       # LATEST 获取当前偏移量最新消息  EARLIEST 从头开始获取信息
  30.                                                       auto_offset_reset=OffsetType.EARLIEST)
  31.     def recv(self):
  32.         """
  33.          接收消费中的数据
  34.          Returns:
  35.          """
  36.         return self.consumer
  37. def main():
  38.     """
  39.     kafka消费队列入口
  40.     :param topic:
  41.     :return:
  42.     """
  43.     obj = KConsumer(topics="test")
  44.     while True:
  45.         for message in obj.recv():
  46.             # data = eval(message.value.decode('utf-8'))
  47.             data = message.value.decode('utf-8')
  48.             print("Kafka消费者接受到的消息:" + data)
  49. if __name__ == '__main__':
  50.     main()
复制代码
可以发现可以或许实时收到生产者发送来的消息:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

tsx81429

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表