宝塔山 发表于 2024-8-19 02:15:17

java Kafka生产者推送数据与消耗者吸收数据(参数配置以及案例)_java实现k

max.block.ms:生产者在发送消息之前等待Broker元数据信息的最长时间。如果在该时间内无法获取到Broker元数据信息,则会抛出TimeoutException非常。默认值为60000毫秒,即60秒。
compression.type:消息压缩范例。可选值为none、gzip、snappy、lz4。默认值为none,表示不进行压缩。压缩可以淘汰消息的传输巨细,提高网络带宽的使用率,但会增长CPU的消耗。
interceptor.classes:消息拦截器列表。可以指定多个消息拦截器对消息进行加工处理。例如,可以在消息中添加时间戳、添加消息来源等信息。
以上参数只是一部分,Kafka生产者另有更多参数可以进行配置。需要根据实际情况选择合适的参数进行配置。
例子

下面是一个单例模式配置 kafka生产者的例子(避免多次创建实例,淘汰资源的消耗)
public class SingletonKafkaProducerExample {
private static SingletonKafkaProducerExample instance;
private static Producer<String, String> producer;
private SingletonKafkaProducerExample() {
//参数设置
Properties props = new Properties();
props.put(“bootstrap.servers”, “ip:端口”);
props.put(“acks”, “all”);
props.put(“max.block.ms”,120000);//默认60s
props.put(“retries”, 3)//默认0;
props.put(“batch.size”, 16384);
props.put(“linger.ms”, 1);
props.put(“buffer.memory”, 33554432);
props.put(“request.timeout.ms”,60*1000);
props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
//sasl认证 (根据实际情况看是否配置)
props.put(“security.protocol”, “SASL_PLAINTEXT”);
props.put(“sasl.mechanism”, “PLAIN”);
props.put(“sasl.jaas.config”, “org.apache.kafka.common.security.plain.PlainLoginModule required username=‘username’ password=‘password’;”);
producer = new KafkaProducer<>(props);
logger.info(“kafka毗连成功”);
}
public static SingletonKafkaProducerExample getInstance() {
if (instance == null) {
synchronized (SingletonKafkaProducerExample.class) {
if (instance == null) {
instance = new SingletonKafkaProducerExample();
}
}
}
return instance;
}
public void sendMessage(String topic, String key, String value) {
try {
//这里也可以不消设置key和partition,例如不设置分区 系统会使用轮询算法自动匹配partition
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
Future future = producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println(“发送消息到” + metadata.topic() + “失败:” + exception.getMessage());
} else {
System.out.println(“发送消息到” + metadata.topic() + “成功:partition=” + metadata.partition() + “, offset=” + metadata.offset());
}
});
future.get(); // 等待返回数据
} catch (InterruptedException | ExecutionException e) {
System.err.println(“发送消息失败:” + e.getMessage());
}
}
public void closeProducer() {
producer.close();
}
}
以上参数配置只是案例,实际参数配置需要根据业务情况自己设置
下面是生产的方法介绍:
close(): 关闭生产者,释放相干资源。
close(Duration timeout): 在指定的超时时间内关闭生产者,释放相干资源。
initTransactions(): 初始化事件,启用事件支持。
beginTransaction(): 开始事件。
send(ProducerRecord<K, V> record): 发送一条消息记载到指定的主题。
send(ProducerRecord<K, V> record, Callback callback): 发送一条消息记载,并附带一个回调函数用于异步处剃头送结果。
send(ProducerRecord<K, V> record, ProducerCallback callback): 发送一条消息记载,并使用自定义的回调函数处剃头送结果。
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId): 将消耗者组的偏移量提交给事件。
partitionsFor(String topic): 获取指定主题的分区信息。
metrics(): 获取生产者的度量指标信息。
flush(): 将全部已挂起的消息立即发送到Kafka服务器,等待服务器确认后再返回。
commitTransaction(): 提交当前事件。
abortTransaction(): 中止当前事件。
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, ConsumerGroupMetadata groupMetadata): 将消耗者组的偏移量和消耗者组元数据提交给事件。
大概遇见的题目

1.多个topic发送消息的时候总有1.2发送失败 报Failed to update metadata after 60000ms
这种情况出现的原因大概是Kafka集群中Broker的元数据信息还没有被更新到Kafka客户端中,导致Kafka客户端无法毗连到指定的Broker。
办理

增长等待时间:可以通过设置max.block.ms属性来增长等待时间
提高重试次数:可以通过设置retries属性来提高重试次数
检查Broker配置
检查网络毗连
检查Kafka版本
如果下面3个都没题目,就增长等待时间和重试次数。本人遇到这样的题目办理了
消耗者 推送数据

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置消耗者参数
Properties props = new Properties();
/*
bootstrap.servers
Kafka集群中Broker的地点列表,格式为"hostname:port",例如:“localhost:9092”。可以配置多个Broker,用逗号分隔。
*/
props.put(“bootstrap.servers”, “ip:port”);
/*
group.id
消耗者组的名称,同一个消耗者组中的消耗者会共享消耗消息的责任。例如:“test”。
*/
props.put(“group.id”, “test”);
/*
enable.auto.commit
是否自动提交偏移量,默认为true。如果为false,则需要手动提交偏移量。
*/
props.put(“enable.auto.commit”, “true”);
/*
session.timeout.ms
消耗者会话超时时间(毫秒),如果消耗者在该时间内没有向Kafka Broker发送心跳,则会被认为已经失效。默认10000毫秒。
*/
props.put(“session.timeout.ms”, “30000”);
/*
auto.offset.reset
如果消耗者在初始化时没有指定偏移量或指定的偏移量不存在,则从哪个位置开始消耗,默认latest,即从最新的消息开始消耗。其他可选值为earliest和none。
*/
props.put(“auto.offset.reset”, “earliest”);
/*
key.deserializer
key的反序列化方式,例如:“org.apache.kafka.common.serialization.StringDeserializer”。
*/
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
/*
value.deserializer
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数大数据工程师,想要提升技能,每每是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学结果低效又漫长,而且极易碰到天花板技能故步自封!
因此网络整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
https://i-blog.csdnimg.cn/blog_migrate/707e3499655f39087564c47f8bcb68c1.png
https://i-blog.csdnimg.cn/blog_migrate/f9e307e713dc04f8847f06cbbe677ccb.png
https://i-blog.csdnimg.cn/blog_migrate/498854162a60ad1733be96da424197f3.png
https://i-blog.csdnimg.cn/blog_migrate/c8b245f42d7cd60335bf2acdc119cd36.png
https://i-blog.csdnimg.cn/blog_migrate/2ef221ca0ba49e65caf94530dc7e8239.png
既有适合小白学习的零底子资料,也有适合3年以上履历的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!
由于文件比力大,这里只是将部分目录大纲截图出来,每个节点内里都包含大厂面经、学习条记、源码课本、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
https://i-blog.csdnimg.cn/blog_migrate/8a6beb708e8fd34291ca48ac04606ceb.png
以上大数据开发知识点,真正体系化!**
由于文件比力大,这里只是将部分目录大纲截图出来,每个节点内里都包含大厂面经、学习条记、源码课本、实战项目、讲解视频,并且后续会持续更新
如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)
[外链图片转存中…(img-U8g7yQDO-1712521305501)]

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: java Kafka生产者推送数据与消耗者吸收数据(参数配置以及案例)_java实现k