ToB企服应用市场:ToB评测及商务社交产业平台
标题:
发送kafka生产者数据,效果消耗者吸收不到数据 producer.flush()
[打印本页]
作者:
张国伟
时间:
2024-9-29 00:53
标题:
发送kafka生产者数据,效果消耗者吸收不到数据 producer.flush()
头一次碰到过 生产者发送一个数据,我在另一个下令窗口去消耗相同的topic 数据,效果无非获取到数据 这是怎么回事。
查阅了干系资料说是:
消息可能会在生产者的缓冲区中滞留,而没有被发送到Kafka服务器。
如果程序在发送完消息后立即竣事,可能会导致一些消息丢失
办理办法:
producer.flush()
下面是干系伪代码 如下:
def delivery_callback(err, msg):
if err:
logging.error('Message failed delivery: %s' % err)
re['name'] = "test"
re['age'] = 18
producer.produce('topic_name', json.dumps(re),callback=delivery_callback)
producer.flush() #若没有该句 可能推送不过去kafka
复制代码
调用producer.flush()是一个常见的做法,特别是在利用Kafka生产者发送消息时。这个方法用于确保所有缓冲的消息都被发送到Kafka服务器。在调用flush()之后,程序会阻塞直到所有的记载都被发送成功或者发送失败。
在生产者发送消息后,调用flush()可以确保消息被实时发送,从而降低消息丢失的风险。
生产者的缓冲区是指生产者在发送消息到Kafka集群时临时存储消息的内存区域。当生产者发送消息时,消息通常首先被写入这个缓冲区,然后在适当的时机被批量发送到Kafka服务器。
缓冲多久才能被消耗者吸收到信息取决于多种因素,包括但不限于:
生产者的发送频率
: 生产者可能会将消息缓冲一段时间,以便批量发送,而不是每个消息都立即发送到Kafka。这个批量发送的频率可能会影响消息的到达时间。
Kafka的配置
: Kafka本身也有配置参数,比方batch.size(批量发送的消息数量)、linger.ms(消息发送延迟)、compression.type(消息压缩范例)等,这些配置参数会影响消息在缓冲区中停留的时间以及发送的时间间隔。
网络延迟
: 消息从生产者发送到Kafka服务器,以及从Kafka服务器发送到消耗者,都会受到网络延迟的影响。较高的网络延迟可能会导致消息到达的时间延迟。
消耗者组的消耗速度
: 如果消耗者组的消耗速度较慢,即消耗者处理惩罚消息的速度低于生产者发送消息的速度,那么消息会在Kafka中的主题分区中积存,直到消耗者能够处理惩罚它们。
总的来说,Kafka旨在提供低延迟和高吞吐量的消息传输服务。但实际消息到达消耗者的时间受多种因素影响,包括生产者和消耗者的配置以及网络情况等。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4