Inter-thread communication between Java (Kotlin) and Python via DoubleCloud Kafka


Go to DoubleCloud (https://www.double.cloud) and create a Kafka cluster:

1. Choose a language.
2. Run the provided curl command to create a topic (a programmatic alternative is sketched right after this list).
3. Generate a token for that language.
4. Copy the configuration file from step 3 to your machine and name it client.properties.
5. Copy the client sample code.
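If you prefer not to use the curl command from the console, the topic can also be created programmatically with confluent_kafka's AdminClient, reusing the client.properties from step 4. This is only a sketch under that assumption; the topic name, partition count, and replication factor below are illustrative, not taken from this article:

```python
# Hypothetical alternative to the console curl command: create the topic
# with confluent_kafka's AdminClient, reusing client.properties from step 4.
from confluent_kafka.admin import AdminClient, NewTopic


def read_config(path):
    # Minimal .properties parser: skip blanks and comments, split on the first '='
    config = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if line and not line.startswith("#"):
                key, value = line.split("=", 1)
                config[key.strip()] = value.strip()
    return config


config = read_config("client.properties")
# group.id / auto.offset.reset are consumer settings; drop them so the admin
# client does not log warnings about unused properties.
for consumer_only in ("group.id", "auto.offset.reset"):
    config.pop(consumer_only, None)

admin = AdminClient(config)
# Partition and replication counts are illustrative.
futures = admin.create_topics([NewTopic("test", num_partitions=1, replication_factor=3)])
for topic_name, future in futures.items():
    future.result()  # raises if topic creation failed
    print(f"Created topic {topic_name}")
```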
I rewrote the Python and Java client code; the Java version was converted to Kotlin.
Configuration file for the Python client:
```
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=
sasl.password=
group.id=
auto.offset.reset=earliest
# Best practice for higher availability in librdkafka clients prior to 1.7
session.timeout.ms=45000
```
Python:

```python
import asyncio
import threading

from confluent_kafka import Producer, Consumer


class KafkaClient:
    def __init__(self, config_file):
        self.config = self.read_config(config_file)

    def read_config(self, config_file):
        # Parses the .properties file: skip blank lines and comments,
        # split each remaining line on the first '='.
        config = {}
        with open(config_file) as fh:
            for line in fh:
                line = line.strip()
                if len(line) != 0 and line[0] != "#":
                    parameter, value = line.split('=', 1)
                    config[parameter] = value.strip()
        return config

    def produce(self, topic, key, value):
        # Creates a new producer instance
        producer = Producer(self.config)
        # Produces a sample message
        producer.produce(topic, key=key, value=value)
        print(f"Produced message to topic {topic}: key = {key:12} value = {value:12}")
        # Send any outstanding or buffered messages to the Kafka broker
        producer.flush()

    def consume_async(self, topic, callback=None, group_id="python-group-1", auto_offset_reset="earliest"):
        # Sets the consumer group ID and offset
        self.config["group.id"] = group_id
        self.config["auto.offset.reset"] = auto_offset_reset
        consumer = Consumer(self.config)
        consumer.subscribe([topic])
        # Each consumer thread gets its own event loop to run the async callback
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        if callback is not None:
            loop.run_until_complete(callback(consumer))

    def consume(self, topic, callback=None):
        # Runs the consumer on a background thread so produce and consume can overlap
        thread = threading.Thread(target=self.consume_async, args=(topic, callback))
        thread.start()
        return thread


async def consume_async(consumer):
    try:
        # Poll until the first message arrives, then print it and stop
        while True:
            msg = consumer.poll(1.0)
            if msg is not None:
                break
        if not msg.error():
            key = msg.key().decode("utf-8")
            value = msg.value().decode("utf-8")
            print(f"Consumed message: key = {key:12} value = {value:12}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()


config_file_path = ".\\client.properties"
topic = "test"
key = "key"
value = "value"

kafka_client = KafkaClient(config_file_path)
kafka_client.produce(topic, key, value)
thread = kafka_client.consume(topic, consume_async)
```
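Note that the consume_async callback above exits after the first message it receives and then closes the consumer. If the consumer thread should keep running, a callback along the following lines can be passed to KafkaClient.consume instead; this is a sketch that only assumes the same KafkaClient class and the standard confluent_kafka poll API:

```python
async def consume_forever(consumer):
    # Keeps polling instead of stopping after the first message.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            key = msg.key().decode("utf-8") if msg.key() else None
            value = msg.value().decode("utf-8") if msg.value() else None
            print(f"Consumed message: key = {key} value = {value}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()


# Hypothetical usage: thread = kafka_client.consume(topic, consume_forever)
```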
Configuration file for the Java (Kotlin) client:
```
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='GHFXZDIOMQW3IPKA' password='TimUk7hj/EwTiB031lA95LeKfXN3t2Ddnw+izhKx3+7wFxZKMLGEqTOnneTKrlQQ';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Best practice for higher availability in Apache Kafka clients prior to 3.0
session.timeout.ms=45000
topic=
group.id=
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Best practice for Kafka producer to prevent data loss
acks=all
```
Java (Kotlin):
```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.io.Closeable
import java.io.FileInputStream
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Duration
import java.util.*

class KafkaClient<T, V> : Closeable {
    private var producer: KafkaProducer<T, V>? = null
    private var fileConfig: Properties? = null

    // Name of the custom "topic" property in client.properties
    val TOPIC = "topic"
    private val DURATION = 100L
    private val POOLSIZE = 10

    // Consumer coroutines run on a dedicated thread pool
    private val DISPATCHER = newFixedThreadPoolContext(POOLSIZE, "CoroutinePool")
    private val SCOPE = CoroutineScope(DISPATCHER)

    constructor(configPath: String? = null, config: Properties? = null) {
        if (config == null && configPath == null) throw Exception("don't have any config")
        val mergedConfig = Properties()
        if (configPath != null) {
            fileConfig = readConfig(configPath)
            fileConfig?.let { mergedConfig.putAll(it) }
        }
        if (config != null) {
            mergedConfig.putAll(config)
        }
        producer = KafkaProducer(mergedConfig)
    }

    fun produce(key: T, value: V, topic: String? = null) {
        // Falls back to the "topic" property from client.properties when no topic is given
        producer?.send(ProducerRecord(topic ?: (fileConfig?.getProperty(TOPIC) as String), key, value))
    }

    fun consume(func: suspend (ConsumerRecords<T, V>) -> Unit) {
        val consumer: KafkaConsumer<T, V> = KafkaConsumer(fileConfig)
        consumer.subscribe(Arrays.asList(fileConfig?.getProperty(TOPIC)))
        // Polls on a coroutine so the caller is not blocked
        SCOPE.launch {
            while (true) {
                val records: ConsumerRecords<T, V> = consumer.poll(Duration.ofMillis(DURATION))
                func(records)
                delay(DURATION)
            }
        }
    }

    @Throws(IOException::class)
    fun readConfig(configFile: String): Properties {
        if (!Files.exists(Paths.get(configFile))) {
            throw IOException("$configFile not found.")
        }
        val config = Properties()
        FileInputStream(configFile).use { inputStream -> config.load(inputStream) }
        return config
    }

    override fun close() {
        producer?.close()
    }
}

fun main() {
    val cli =
        KafkaClient<String, String>("D:\\src\\main\\java\\com\\tr\\robot\\io\\kafka\\client.properties")
    cli.consume {
        println("test beg")
        for (record in it) {
            println(
                String.format(
                    "Consumed message from topic %s: key = %s value = %s", cli.TOPIC, record.key(), record.value()
                )
            )
        }
        println("test end")
    }
    // Give some time for the consumer to start
    Thread.sleep(2000)
    cli.produce("key1", "test")
    // Give some time for the consumer to consume the message
    Thread.sleep(5000)
}
```
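To build the Kotlin client you need kafka-clients and kotlinx-coroutines on the classpath. A minimal Gradle (Kotlin DSL) sketch follows; the version numbers are illustrative, not taken from this article:

```kotlin
// build.gradle.kts -- versions are illustrative
plugins {
    kotlin("jvm") version "1.9.24"
}

repositories {
    mavenCentral()
}

dependencies {
    // Kafka Java client used by KafkaProducer/KafkaConsumer above
    implementation("org.apache.kafka:kafka-clients:3.6.1")
    // Coroutines: newFixedThreadPoolContext, CoroutineScope, launch, delay
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1")
}
```

With these dependencies in place, running main starts the coroutine consumer, sleeps briefly, produces one record to the topic named in client.properties, and the consumer prints it.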