tsx81429 发表于 2024-11-11 04:27:55

【Python-Kafka】Python毗连Windows下的kafka服务器,创建生产者Producer和

一、Windows11本地开启Kafka服务器

1.1、开启Zookeeper

Windows下安装Zookeeper的安装可以参考这篇博文:
Windows下安装Zookeeper(图文记录详细步调,手把手包安装成功)
先启动Zookeeper服务
管理员权限打开命令窗口,输入命令zkServer
,启动Zookeeper服务:
zkServer
https://i-blog.csdnimg.cn/blog_migrate/6080fcf171e57144778a4d9c773ea4ed.png
1.2、开启Kafka服务

Windows下安装Kafka的安装可以参考这篇博文:
【Kafka】Windows下安装Kafka(图文记录详细步调)
管理员权限打开命令窗口,进入到Kafka安装目录(D:\bigdata\kafka\2.12-3.5.1)。
输入如下命令启动Kafka服务:
.\bin\windows\kafka-server-start.bat .\config\server.properties
https://i-blog.csdnimg.cn/blog_migrate/df7bf0d8a244a7c0d2ec2520bc859965.png
二、Python创建生产者Producer

起首运行生产者Producer
KafkaConsumerDemo.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
# Assign a topic
topic = 'test'


def test():
    print('begin')
    n = 1
    # 初始化一个变量用来存储用户输入
    user_input = ""
    try:
      # 使用while循环来持续获取用户输入
      while user_input != "quit":# 条件是用户输入不是'quit'
            user_input = input("Kafka生产者,生产消息,输入内容(输入'quit'退出):")
            if user_input != "quit" and user_input != "":
                producer.send(topic, user_input.encode())
                n += 1
                # time.sleep(1.5)
    except KafkaError as e:
      print(e)
    finally:
      producer.close()
      print('done')


def test_json():
    msg_dict = {
      "sleep_time": 10,
      "db_config": {
            "database": "test_1",
            "host": "xxxx",
            "user": "root",
            "password": "root"
      },
      "table": "msg",
      "msg": "Hello World"
    }
    msg = json.dumps(msg_dict)
    producer.send(topic, msg, partition=0)
    producer.close()


if __name__ == '__main__':
    test()
可以不绝发送消息:
https://i-blog.csdnimg.cn/blog_migrate/b7df4414c9174f21ab763b3bcd98c31a.png
三、Python创建消费者Consumer

再创建消费者Consumer
KafkaConsumerDemo.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pykafka import KafkaClient
from pykafka.common import OffsetType

bootstrap_servers = '127.0.0.1:9092'
group_id = 'test'


class KConsumer(object):
    """kafka 消费者; 动态传参,非配置文件传入;
      kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;
   """
    _encode = "UTF-8"

    def __init__(self, topics, bootstrap_server=None, group_id=group_id, partitions=None):
      """ 初始化kafka的消费者;
         1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)
         2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;
         Args:
         topics: str; kafka 的消费主题;
         bootstrap_server: list; kafka 的消费者地址;
         group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;
         """
      if bootstrap_server is None:
            bootstrap_server = bootstrap_servers
      self.client = KafkaClient(hosts=bootstrap_server)
      # 选择要消费的topic
      vpn_topic = self.client.topics
      self.consumer = vpn_topic.get_simple_consumer(consumer_group=group_id,
                                                      consumer_timeout_ms=200,
                                                      auto_commit_enable=False,# 自动提交偏移量
                                                      # LATEST 获取当前偏移量最新消息EARLIEST 从头开始获取信息
                                                      auto_offset_reset=OffsetType.EARLIEST)

    def recv(self):
      """
         接收消费中的数据
         Returns:
         """
      return self.consumer


def main():
    """
    kafka消费队列入口
    :param topic:
    :return:
    """
    obj = KConsumer(topics="test")
    while True:
      for message in obj.recv():
            # data = eval(message.value.decode('utf-8'))
            data = message.value.decode('utf-8')
            print("Kafka消费者接受到的消息:" + data)


if __name__ == '__main__':
    main()
可以发现可以或许实时收到生产者发送来的消息:
https://i-blog.csdnimg.cn/blog_migrate/5d77b8ceeb270661869b59dc293d826c.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【Python-Kafka】Python毗连Windows下的kafka服务器,创建生产者Producer和