1 选择语言
2 运行curl 的url下令启动一个topic
3 生成对应语言的token
4 复制3中的配置文件到当地,命名为client.properties
5 复制客户端代码
import time
- from confluent_kafka import Producer, Consumer
- import asyncio
- import threading
- class KafkaClient:
- def __init__(self, config_file):
- self.config = self.read_config(config_file)
- def read_config(self, config_file):
- config = {}
- with open(config_file) as fh:
- for line in fh:
- line = line.strip()
- if len(line) != 0 and line[0] != "#":
- parameter, value = line.strip().split('=', 1)
- config[parameter] = value.strip()
- return config
- def produce(self, topic, key, value):
- # Creates a new producer instance
- producer = Producer(self.config)
- # Produces a sample message
- producer.produce(topic, key=key, value=value)
- print(f"Produced message to topic {topic}: key = {key:12} value = {value:12}")
- # Send any outstanding or buffered messages to the Kafka broker
- producer.flush()
- def consume_async(self, topic, callback=None, group_id="python-group-1", auto_offset_reset="earliest"):
- # Sets the consumer group ID and offset
- self.config["group.id"] = group_id
- self.config["auto.offset.reset"] = auto_offset_reset
- consumer = Consumer(self.config)
- consumer.subscribe([topic])
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- if callback is not None:
- loop.run_until_complete(callback(consumer))
- def consume(self, topic, callback=None):
- thread = threading.Thread(target=self.consume_async, args=(topic, callback,))
- thread.start()
- return thread
- async def consume_async(consumer):
- try:
- while True:
- msg = consumer.poll(1.0)
- if msg is not None:
- break
- if not msg.error():
- key = msg.key().decode("utf-8")
- value = msg.value().decode("utf-8")
- print(f"Consumed message: key = {key:12} value = {value:12}")
- except KeyboardInterrupt:
- pass
- finally:
- consumer.close()
- config_file_path = ".\\client.properties"
- topic = "test"
- key = "key"
- value = "value"
- kafka_client = KafkaClient(config_file_path)
- kafka_client.produce(topic, key, value)
- thread = kafka_client.consume(topic, consume_async)
复制代码 配置文件
- # Required connection configs for Kafka producer, consumer, and admin
- bootstrap.servers=
- security.protocol=SASL_SSL
- sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='GHFXZDIOMQW3IPKA' password='TimUk7hj/EwTiB031lA95LeKfXN3t2Ddnw+izhKx3+7wFxZKMLGEqTOnneTKrlQQ';
- sasl.mechanism=PLAIN
- # Required for correctness in Apache Kafka clients prior to 2.6
- client.dns.lookup=use_all_dns_ips
- key.serializer=org.apache.kafka.common.serialization.StringSerializer
- value.serializer=org.apache.kafka.common.serialization.StringSerializer
- # Best practice for higher availability in Apache Kafka clients prior to 3.0
- session.timeout.ms=45000
- topic=
- group.id=
- auto.offset.reset=earliest
- key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- # Best practice for Kafka producer to prevent data loss
- acks=all
复制代码 java(kotiln)
- import kotlinx.coroutines.CoroutineScope
- import kotlinx.coroutines.delay
- import kotlinx.coroutines.launch
- import kotlinx.coroutines.newFixedThreadPoolContext
- import org.apache.kafka.clients.consumer.ConsumerRecords
- import org.apache.kafka.clients.consumer.KafkaConsumer
- import org.apache.kafka.clients.producer.KafkaProducer
- import org.apache.kafka.clients.producer.ProducerRecord
- import java.io.Closeable
- import java.io.FileInputStream
- import java.io.IOException
- import java.nio.file.Files
- import java.nio.file.Paths
- import java.time.Duration
- import java.util.*
- class KafkaClient<T, V> : Closeable {
- private var producer: KafkaProducer<T, V>? = null
- private var fileConfig: Properties? = null
- val TOPIC = "topic"
- private val DURATION = 100L
- private val POOLSIZE = 10
- private val DISPATCHER = newFixedThreadPoolContext(POOLSIZE, "CoroutinePool")
- private val SCOPE = CoroutineScope(DISPATCHER)
- constructor(configPath: String? = null, config: Properties? = null) {
- if (config == null && configPath == null) throw Exception("don't have any config")
- var config1 = Properties()
- if (configPath != null) {
- fileConfig = readConfig(configPath)
- fileConfig?.let { config1.putAll(it) }
- }
- if (config != null) {
- config1.putAll(config)
- }
- producer = KafkaProducer(config1)
- }
- fun produce(key: T, value: V, topic: String? = null) {
- producer?.send(ProducerRecord(topic ?: (fileConfig?.getProperty(TOPIC) as String), key, value))
- }
- fun consume(func: suspend (ConsumerRecords<T, V>) -> Unit) {
- val consumer: KafkaConsumer<T, V> = KafkaConsumer(fileConfig)
- consumer.subscribe(Arrays.asList(fileConfig?.getProperty(TOPIC)))
- SCOPE.launch {
- while (true) {
- val records: ConsumerRecords<T, V> = consumer.poll(Duration.ofMillis(DURATION))
- func(records)
- delay(DURATION)
- }
- }
- }
- @Throws(IOException::class)
- fun readConfig(configFile: String): Properties {
- if (!Files.exists(Paths.get(configFile))) {
- throw IOException("$configFile not found.")
- }
- val config = Properties()
- FileInputStream(configFile).use { inputStream -> config.load(inputStream) }
- return config
- }
- override fun close() {
- producer?.close()
- }
- }
- fun main() {
- val cli =
- KafkaClient<String, String>("D:\\src\\main\\java\\com\\tr\\robot\\io\\kafka\\client.properties")
- cli.consume {
- println("test beg")
- for (record in it) {
- println(
- String.format(
- "Consumed message from topic %s: key = %s value = %s", cli.TOPIC, record.key(), record.value()
- )
- )
- }
- println("test end")
- }
- // Give some time for the consumer to start
- Thread.sleep(2000)
- cli.produce("key1", "test")
- // Give some time for the consumer to consume the message
- Thread.sleep(5000)
- }
