ToB企服应用市场:ToB评测及商务社交产业平台

标题: springboot 集成kafka 详细教程,看这一篇就够了 [打印本页]

作者: 张春    时间: 2024-8-11 03:29
标题: springboot 集成kafka 详细教程,看这一篇就够了
废话不多说,直接上代码
为啥这样说,现在大家都想先看大妈效果,再去看逻辑
先看团体架构

先贴yml吧,这个究竟是项目一创建就需要的
  1. spring:
  2.   application:
  3.     admin: apache-kafka
  4.   kafka:
  5.     bootstrap-servers: 这里是你自己的kafka地址 # kafka 服务器集群地址,默认为 localhost:9092
  6.     template:
  7.       default-topic: demo  #将消息发送到的默认主题,KafkaTemplate.sendDefault
  8.     listener:
  9.       type: batch #监听器类型,可选值有:SINGLE(单条消费,默认)、BATCH(批量消息)
  10.     # kafka 生产者配置
  11.     producer:
  12.       key-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 key 序列化方式
  13.       value-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 value 序列化方式
  14.       batch-size: 16KB #默认批处理大小,如果值太小,则可能降低吞吐量,为零将完全禁用批处理,当 linger.ms=0 时,此值无效
  15.       buffer-memory: 32MB #生产者可以用来缓冲等待发送到服务器的记录的总内存大小
  16.       retries: 3 #发送失败时的重试次数,当大于零时,允许重试失败的发送。
  17. #      在考虑请求完成之前,生产者要求领导者已收到的确认数,可选值有:-1、0、1(默认为1)
  18. #      使用事务时,必须配置为 -1,表示领导者必须收到所有副本的确认消息。
  19.       acks: -1
  20.       properties:
  21.         #消息提交延时时间(单位毫秒),当生产者接收到消息 linger.ms 秒钟后,就会将消息提交给 kafka。
  22.         #当生产端积累的消息达到 batch-size 大小后,也会将消息提交给 kafka。
  23.         #linger.ms 默认为 0 ,表示每接收到一条消息就会立即提交给 kafka,此时 batch-size 无效。如果对实时性要求高,则建议设置为 0
  24.         linger.ms: 0
  25.         partitioner:
  26.           class: com.wmx.apachekafka.beans.MyKafkaPartitioner #kafka 自定义分区规则
  27.       transaction-id-prefix: tx_kafka.
  28.     # kafka 消费者配置
  29.     consumer:
  30.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 key 反序列化方式
  31.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 value 反序列化方式
  32.       group-id: test-consumer-group #标识此消费者所属的消费者组的唯一字符串,这里只要你是默认安装,那就是这个,不用修改
  33.       #消费者客户端 id,在消费者组需要唯一指定。发出请求时会传递给服务器,用于服务器端日志记录
  34.       #不写时,会自动命名,比如:consumer-1、consumer-2...,原子性递增。通常不建议自定义,使用默认值即可,因为容易冲突
  35.       #client-id: wangmx1
  36.       enable-auto-commit: true #消费者的偏移量是否在后台自动提交,默认为 true
  37.       auto-commit-interval: 5000 #如果enable.auto.commit=true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为 5000
  38.       # 当 Kafka 中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,可选的值有 latest、earliest、exception、none,默认值为 latest
  39.       # latest:重置为分区中最新的 offset(消费分区中新产生的数据)、earliest:重置为分区中最小的 offset
  40.       auto-offset-reset: latest
  41.       properties:
  42.         session.timeout.ms: 180000 #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发 rebalance(重新平衡) 操作)
  43.         request.timeout.ms: 120000 #消费请求超时时间
  44.       max-poll-records: 5 #一次调用poll()时返回的最大记录数,即批量消费每次最多消费多少条消息,注意是最多,并不是必须满足数量后才消费.
复制代码
自界说分区MyKafkaPartitioner:
  1. package com.zy.apachekafka.beans;
  2. import org.apache.kafka.clients.producer.Partitioner;
  3. import org.apache.kafka.common.Cluster;
  4. import java.util.Map;
  5. /**
  6. * kafka 自定义分区规则,一旦自定义了分区规则,就不会再走 kafka 默认的分区规则
  7. *
  8. * @author zy
  9. */
  10. public class MyKafkaPartitioner implements Partitioner {
  11.     /**
  12.      * 计算给定记录的分区,发送消息到 kafka 服务器之前,都会先走这里进行计算目标分区,即将消息发送到具体的哪个分区
  13.      *
  14.      * @param topic      :主题名称
  15.      * @param key        :要分区的键(如果没有键,则为null)
  16.      * @param keyBytes   :要分区的序列化键(如果没有键,则为null)
  17.      * @param value      :要分区的值或null,健可以有可无,值才是真正的消息内容
  18.      * @param valueBytes :要分区的序列化值或null
  19.      * @param cluster    :当前集群信息
  20.      */
  21.     @Override
  22.     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  23.         // 返回的整数值就是表示生产者将消息发送到的分区
  24.         // 具体的规则可以根据自身需要设置
  25.         System.out.println("发送消息:" + value);
  26.         System.out.println("指定分区为:" + 0);
  27.         return 0;
  28.     }
  29.     /**
  30.      * 在分区程序关闭时调用
  31.      */
  32.     @Override
  33.     public void close() {
  34.     }
  35.     /**
  36.      * 使用给定的键值对配置此类
  37.      *
  38.      * @param configs
  39.      */
  40.     @Override
  41.     public void configure(Map<String, ?> configs) {
  42.     }
  43. }
复制代码
消费者定时器ConsumerTimer:
  1. package com.zy.apachekafka.component;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
  5. import org.springframework.kafka.listener.MessageListenerContainer;
  6. import org.springframework.scheduling.annotation.EnableAsync;
  7. import org.springframework.scheduling.annotation.EnableScheduling;
  8. import org.springframework.scheduling.annotation.Scheduled;
  9. import org.springframework.stereotype.Component;
  10. import java.time.LocalDateTime;
  11. import java.util.Set;
  12. /**
  13. * 消费者定时器——定时开关消费者消费功能
  14. * 1、本类使用 @EnableScheduling 定时任务的方式开关消费者监听器,同理可以自己提供控制层接口,通过 http 的方式来开关。
  15. *
  16. * @author zy
  17. */
  18. @Component
  19. @EnableScheduling
  20. @EnableAsync
  21. public class ConsumerTimer {
  22.     /**
  23.      * 1、{@link KafkaListener} 注解标注的方法会被注册在 KafkaListenerEndpointRegistry 中。
  24.      * 2、{@link KafkaListenerEndpointRegistry} 在 Spring IOC 容器中已经存在,可以直接取。
  25.      */
  26.     @Autowired
  27.     private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
  28.     /**
  29.      * 定时启动消费者监听器
  30.      * <p>
  31.      * MessageListenerContainer getListenerContainer(String id)
  32.      * * 1、返回具有指定id的{@link MessageListenerContainer},如果不存在此类容器,则返回 null。
  33.      * * 2、这个 id 就是 @KafkaListener 注解的 id 属性值
  34.      * Set<String> getListenerContainerIds():获取所有的 KafkaListener 监听器 id
  35.      * Collection<MessageListenerContainer> getListenerContainers():获取所有的 KafkaListener 监听器容器
  36.      */
  37.     @Scheduled(cron = "0 52 20 * * ? ")
  38.     public void startListener() {
  39.         Set<String> containerIds = kafkaListenerEndpointRegistry.getListenerContainerIds();
  40.         containerIds.stream().forEach(item -> System.out.println("KafkaListener 消费者监听器:" + item));
  41.         //boolean isRunning():检查此组件当前是否正在运行
  42.         //void start():启动此组件,如果组件已在运行,则不应引发异常,配合 stop 方法使用,
  43.         //void resume():如果暂停,在下一次轮询后恢复此容器,配合 pause 方法使用。
  44.         kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").resume();
  45.         //kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").start();
  46.         System.out.println(LocalDateTime.now() + " 启动 kafka 消费者监听器:basicConsumer");
  47.     }
  48.     /**
  49.      * 定时关闭/暂停消费者监听器
  50.      * void pause():在下次轮询之前暂停此容器,配合 resume
  51.      * void stop():以同步方式停止此组件/容器,如果组件未运行(尚未启动),则不应引发异常。配合 start 方法重新启动
  52.      */
  53.     @Scheduled(cron = "0 50 20 * * ? ")
  54.     public void shutDownListener() {
  55.         kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").pause();
  56.         //kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").stop();
  57.         System.out.println(LocalDateTime.now() + " 暂停 kafka 消费者监听器:basicConsumer");
  58.     }
  59. }
复制代码
下面该有消费和生产消息:
消费者 · 接收消息.KafkaConsumer:
  1. package com.zy.apachekafka.controller;
  2. import cn.hutool.core.exceptions.ExceptionUtil;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.common.header.Headers;
  5. import org.apache.kafka.common.record.TimestampType;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  9. import org.springframework.kafka.annotation.KafkaListener;
  10. import org.springframework.messaging.handler.annotation.SendTo;
  11. import org.springframework.stereotype.Component;
  12. import java.util.List;
  13. /**
  14. * Kafka 消费者 · 接收消息.
  15. * 1、topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics = {"${my.kafka.topic-name}"})
  16. * 2、系统中定义了消费者(@KafkaListener)时,启动服务后,如果连不上kafka服务器则会输出大量的警告日志,但是不会报错。
  17. * 不是每个环境都启动了kafka服务,所以当没有配置消费者组id的时候,本类不交由Spring容器初始化,不再监听消息。
  18. *
  19. * @author zy
  20. */
  21. @Component
  22. @ConditionalOnProperty(prefix = "spring.kafka.consumer", name = "group-id")
  23. public class KafkaConsumer {
  24.     private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
  25.     /**
  26.      * 监听指定主题上的消息,topics 属性是一个字符串数组,可以监听多个主题。
  27.      * * id :用于唯一标识此消费者监听器,不同方法上此注解的id必须唯一,不设置时,会自动生成
  28.      * * topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics = {"${my.kafka.topic-name}"})
  29.      *
  30.      * @param record :消息记录对象,包含消息正文、主题名、分区号、偏移量、时间戳等等
  31.      */
  32.     @KafkaListener(id = "basicConsumer", topics = {"car-infos", "basic-info", "helloWorld", "bgt.basic.agency.frame.topic"})
  33.     public void messageListener1(ConsumerRecord<?, ?> record) {
  34.         /**
  35.          * headers:消息头信息
  36.          * offset:此记录在相应的 Kafka 分区中的位置。
  37.          * partition:记录所在的分区
  38.          * serializedKeySize:序列化的未压缩密钥的大小(以字节为单位),如果 key为 null,则返回的大小为 -1
  39.          * serializedValueSize:序列化的未压缩值(消息正文)的大小(以字节为单位,record.value().getBytes().length)。如果值为 null,则返回的大小为 -1
  40.          * timestamp:记录的时间戳
  41.          * TimestampType:记录的时间戳类型
  42.          * topic:接收此记录的主题
  43.          * value:消息内容
  44.          */
  45.         Headers headers = record.headers();
  46.         long offset = record.offset();
  47.         int partition = record.partition();
  48.         int serializedKeySize = record.serializedKeySize();
  49.         int serializedValueSize = record.serializedValueSize();
  50.         long timestamp = record.timestamp();
  51.         TimestampType timestampType = record.timestampType();
  52.         String topic = record.topic();
  53.         Object value = record.value();
  54.         System.out.println("收到消息:");
  55.         System.out.println("\theaders=" + headers);
  56.         System.out.println("\toffset=" + offset);
  57.         System.out.println("\tpartition=" + partition);
  58.         System.out.println("\tserializedKeySize=" + serializedKeySize);
  59.         System.out.println("\tserializedValueSize=" + serializedValueSize);
  60.         System.out.println("\ttimestamp=" + timestamp);
  61.         System.out.println("\ttimestampType=" + timestampType);
  62.         System.out.println("\ttopic=" + topic);
  63.         System.out.println("\tvalue=" + value);
  64.     }
  65.     /**
  66.      * 批量消费时,必须使用 List 接收,否则会抛异常。
  67.      * 即如果配置文件配置的是批量消费(spring.kafka.listener.type=batch),则监听时必须使用 list 接收
  68.      * 反之如果配置是单条消息消费,则不能使用 list 接收,否则也会异常.
  69.      *
  70.      * @param records
  71.      */
  72.     @KafkaListener(topics = "batch-msg")
  73.     public void messageListener2(List<ConsumerRecord<?, ?>> records) {
  74.         System.out.println(">>>批量消费返回条数,records.size()=" + records.size());
  75.         int count = 0;
  76.         for (ConsumerRecord<?, ?> record : records) {
  77.             System.out.println("\t消息" + (++count) + ":" + record.value());
  78.         }
  79.     }
  80.     /**
  81.      * 消费消息并转换。SendTo 可以标注在类上,此时对类中的所有方法有效,方法的返回值表示转发的消息内容。
  82.      *
  83.      * @param record
  84.      * @return
  85.      */
  86.     @KafkaListener(topics = {"sendTo"})
  87.     @SendTo("car-infos")
  88.     public String messageListener3(ConsumerRecord<?, ?> record) {
  89.         System.out.println("消费单条消费并转发:" + record.value() + "," + record.timestamp());
  90.         return record.value().toString();
  91.     }
  92.     /**
  93.      * 单位一体化编码与名称更正消息监听
  94.      * 约定更正接口返回结果监听的主题为:basic.kafka.syncAgencyStatInfo.reply
  95.      *
  96.      * @param recordList
  97.      */
  98.     @KafkaListener(topics = {"${app.kafka.topics.agency:topic3}"})
  99.     public void syncAgencyStatInfoMsgListener(List<ConsumerRecord<String, String>> recordList) {
  100.         for (ConsumerRecord<String, String> record : recordList) {
  101.             log.info("监听单位一体化编码与名称更正消息:{}", record);
  102.             try {
  103.                 System.out.println("消息处理.....");
  104.             } catch (Exception e) {
  105.                 log.error("单位一体化编码与名称更正消息消费失败:{}", ExceptionUtil.getMessage(e));
  106.             }
  107.         }
  108.     }
  109. }
复制代码
生产者KafkaProducer:
  1. package com.zy.apachekafka.controller;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
  7. import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
  8. import org.springframework.kafka.core.KafkaTemplate;
  9. import org.springframework.kafka.support.SendResult;
  10. import org.springframework.transaction.annotation.Transactional;
  11. import org.springframework.util.concurrent.FailureCallback;
  12. import org.springframework.util.concurrent.ListenableFuture;
  13. import org.springframework.util.concurrent.ListenableFutureCallback;
  14. import org.springframework.util.concurrent.SuccessCallback;
  15. import org.springframework.web.bind.annotation.PostMapping;
  16. import org.springframework.web.bind.annotation.RequestBody;
  17. import org.springframework.web.bind.annotation.RequestParam;
  18. import org.springframework.web.bind.annotation.RestController;
  19. import javax.annotation.Resource;
  20. import java.util.Map;
  21. import java.util.concurrent.TimeUnit;
  22. /**
  23. * kafka 生产者 · 发送消息
  24. *
  25. * @author zy
  26. */
  27. @RestController
  28. public class KafkaProducer {
  29.     private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
  30.     /**
  31.      * {@link KafkaAutoConfiguration} 中会自动根据 {@link KafkaProperties} 配置属性读取配置,
  32.      * 然后将 {@link KafkaTemplate} 模板添加到 Spring 容器中,所以这里直接获取使用即可。
  33.      */
  34.     @Resource
  35.     private KafkaTemplate<String, Object> kafkaTemplate;
  36.     /**
  37.      * 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsg?topic=car-infos
  38.      * <p>
  39.      * 1、send(String topic, @Nullable V data):向指定主题发送消息,如果 topic 不存在,则自动创建,
  40.      * 但是创建的主题默认只有一个分区 - PartitionCount: 1、分区也没有副本 - ReplicationFactor: 1,1表示自身。
  41.      * 2、send 方法默认是异步的,主线程会直接继续向后运行,想要获取发送结果是否成功,请添加回调方法 addCallback。
  42.      * [WARN ][org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:241)]:[Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
  43.      * [ERROR][org.springframework.kafka.support.LoggingProducerListener.onError(LoggingProducerListener.java:76)]:Exception thrown when sending a message with key='xxx' and payload='xxx' to topic bgt.basic.agency.frame.topic:
  44.      * 3、send().get() 可以同步阻塞主线程直到获取执行结果,或者执行超时抛出异常.
  45.      * java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException:
  46.      * Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
  47.      *
  48.      * @param topic   :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
  49.      * @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
  50.      * @return
  51.      */
  52.     @PostMapping("kafka/sendMsg")
  53.     @Transactional(rollbackFor = Exception.class)
  54.     public Map<String, Object> sendMessage(@RequestParam String topic, @RequestBody Map<String, Object> message) {
  55.         logger.info("向指定主题发送信息,topic={},message={}", topic, message);
  56.         try {
  57.             String valueAsString = new ObjectMapper().writeValueAsString(message);
  58.             // 异步
  59.             // kafkaTemplate.send(topic, valueAsString);
  60.             // 同步:get() 获取执行结果,此时线程将阻塞,等待执行结果
  61.             SendResult<String, Object> sendResult = kafkaTemplate.send(topic, valueAsString).get();
  62.             sendResult.toString();
  63.             message.put("sendResult", sendResult.toString());
  64.             // org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
  65.         } catch (Exception e) {
  66.             // 异步发送时子线程中的异常是不会进入这里的,只有同步发送时,主线程阻塞,发送是吧,抛出异常时,才会进入这里。
  67.             e.printStackTrace();
  68.         }
  69.         return message;
  70.     }
  71.     /**
  72.      * 向默认主题(default-topic)发送消息:http://localhost:8080/kafka/sendMsgDefault
  73.      * 默认主题由 spring.kafka.template.default-topic 选项进行配置
  74.      *
  75.      * @param message :待发送的消息,如:{"version":2,"text":"后日凌晨三点执行任务,不得有误"}
  76.      * @return
  77.      */
  78.     @PostMapping("kafka/sendMsgDefault")
  79.     @Transactional(rollbackFor = Exception.class)
  80.     public Map<String, Object> sendMsgDefault(@RequestBody Map<String, Object> message) {
  81.         logger.info("向默认主题发送信息,topic={},topic={}", kafkaTemplate.getDefaultTopic(), message);
  82.         try {
  83.             String valueAsString = new ObjectMapper().writeValueAsString(message);
  84.             kafkaTemplate.sendDefault(valueAsString);
  85.         } catch (JsonProcessingException e) {
  86.             e.printStackTrace();
  87.         }
  88.         return message;
  89.     }
  90.     /**
  91.      * 异步回调写法 1
  92.      * 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
  93.      * http://localhost:8080/kafka/sendMsgCallback?topic=car-infos
  94.      * 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程。
  95.      *
  96.      * @param topic   :car-infos
  97.      * @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
  98.      * @return
  99.      */
  100.     @PostMapping("kafka/sendMsgCallback")
  101.     @Transactional(rollbackFor = Exception.class)
  102.     public Map<String, Object> sendMessageCallback(@RequestParam String topic,
  103.         @RequestBody Map<String, Object> message) {
  104.         try {
  105.             String valueAsString = new ObjectMapper().writeValueAsString(message);
  106.             /**
  107.              * addCallback:添加成功或者失败的异步回调
  108.              * {@link SuccessCallback}:是发送成功回调,函数式接口,其中的方法参数为 {@link SendResult},表示发送结果
  109.              * {@link FailureCallback}:是发送失败回调,函数式接口,其中的方法参数为 Throwable,表示异常对象
  110.              */
  111.             kafkaTemplate.send(topic, valueAsString).addCallback(success -> {
  112.                 String topic2 = success.getRecordMetadata().topic();
  113.                 int partition = success.getRecordMetadata().partition();
  114.                 long offset = success.getRecordMetadata().offset();
  115.                 logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);
  116.             }, failure -> {
  117.                 logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
  118.                 logger.warn("保存到数据库中,后期再做处理.");
  119.             });
  120.         } catch (JsonProcessingException e) {
  121.             e.printStackTrace();
  122.         }
  123.         logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);
  124.         return message;
  125.     }
  126.     /**
  127.      * 异步回调写法 2
  128.      * 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
  129.      * http://localhost:8080/kafka/sendMsgCallback2?topic=helloWorld
  130.      * 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程,主线程会直接运行过去。
  131.      *
  132.      * @param topic   :helloWorld
  133.      * @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
  134.      * @return
  135.      */
  136.     @PostMapping("kafka/sendMsgCallback2")
  137.     @Transactional(rollbackFor = Exception.class)
  138.     public Map<String, Object> sendMessageCallback2(@RequestParam String topic,
  139.         @RequestBody Map<String, Object> message) {
  140.         try {
  141.             String valueAsString = new ObjectMapper().writeValueAsString(message);
  142.             /**
  143.              * ListenableFutureCallback 接口继承了 {@link SuccessCallback}、 {@link FailureCallback} 函数式接口
  144.              * 重写方法即可
  145.              */
  146.             kafkaTemplate.send(topic, valueAsString).addCallback(
  147.                 new ListenableFutureCallback<SendResult<String, Object>>() {
  148.                     @Override
  149.                     public void onSuccess(SendResult<String, Object> success) {
  150.                         int partition = success.getRecordMetadata().partition();
  151.                         long offset = success.getRecordMetadata().offset();
  152.                         String topic2 = success.getRecordMetadata().topic();
  153.                         logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);
  154.                     }
  155.                     @Override
  156.                     public void onFailure(Throwable failure) {
  157.                         logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
  158.                         logger.warn("保存到数据库中,后期再做处理.");
  159.                     }
  160.                 });
  161.         } catch (JsonProcessingException e) {
  162.             e.printStackTrace();
  163.         }
  164.         logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);
  165.         return message;
  166.     }
  167.     /**
  168.      * 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsgTransactional1?topic=car-infos
  169.      * 与 springframework 框架的事务整合到一起,此时异常处理完全和平时一样.
  170.      *
  171.      * @param topic   :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
  172.      * @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
  173.      * @return
  174.      */
  175.     @PostMapping("kafka/sendMsgTransactional1")
  176.     @Transactional(rollbackFor = Exception.class)
  177.     public Map<String, Object> sendMessageTransactional1(@RequestParam String topic,
  178.         @RequestBody Map<String, Object> message) {
  179.         try {
  180.             logger.info("向指定主题发送信息,带事务管理,topic={},message={}", topic, message);
  181.             String msg = new ObjectMapper().writeValueAsString(message);
  182.             ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topic, msg);
  183.             if ("110".equals(message.get("version").toString())) {
  184.                 TimeUnit.SECONDS.sleep(3);
  185.                 System.out.println(1 / 0);
  186.             }
  187.         } catch (JsonProcessingException e) {
  188.             e.printStackTrace();
  189.         } catch (InterruptedException e) {
  190.             e.printStackTrace();
  191.         }
  192.         return message;
  193.     }
  194.     /**
  195.      * http://localhost:8080/kafka/sendMsgTransactional2?topic=car-infos
  196.      * 生成者发送消息事务管理方式2:使用 executeInTransaction(OperationsCallback<K, V, T> callback)
  197.      * executeInTransaction:表示执行本地事务,不参与全局事务(如果存在),即方法内部和外部是分离的,只要内部不
  198.      * 发生异常,消息就会发送,与外部无关,即使外部有 @Transactional 注解也不影响消息发送,此时外围有没有 @Transactional 都一样。
  199.      *
  200.      * @param topic
  201.      * @param message
  202.      * @return
  203.      */
  204.     @PostMapping("kafka/sendMsgTransactional2")
  205.     public Map<String, Object> sendMessageTransactional2(@RequestParam String topic,
  206.         @RequestBody Map<String, Object> message) {
  207.         try {
  208.             logger.info("向指定主题发送信息,带事务管理:topic={},message={}", topic, message);
  209.             String msg = new ObjectMapper().writeValueAsString(message);
  210.             /**
  211.              * executeInTransaction 表示这些操作在本地事务中调用,不参与全局事务(如果存在)
  212.              * 所以回调方法内部发生异常时,消息不会发生出去,但是方法外部发生异常不会回滚,即便是外围方法加了 @Transactional 也没用。
  213.              */
  214.             kafkaTemplate.executeInTransaction(operations -> {
  215.                 operations.send(topic, msg);
  216.                 if ("120".equals(message.get("version").toString())) {
  217.                     System.out.println(1 / 0);
  218.                 }
  219.                 return null;
  220.             });
  221.             //如果在这里发生异常,则只要 executeInTransaction 里面不发生异常,它仍旧会发生消息成功
  222.         } catch (JsonProcessingException e) {
  223.             e.printStackTrace();
  224.         }
  225.         return message;
  226.     }
  227. }
复制代码
贴一份pom文件吧,现在随着依赖的增加,许多时候会出现依赖之间出现题目,而且还很难排错,,有一个idea插件可以安排(maven helper)
pom.xml
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <modelVersion>4.0.0</modelVersion>
  5.     <parent>
  6.         <groupId>org.springframework.boot</groupId>
  7.         <artifactId>spring-boot-starter-parent</artifactId>
  8.         <version>2.1.3.RELEASE</version>
  9.         <relativePath/> <!-- lookup parent from repository -->
  10.     </parent>
  11.     <groupId>com.wmx</groupId>
  12.     <artifactId>apache-kafka</artifactId>
  13.     <version>0.0.1-SNAPSHOT</version>
  14.     <name>apache-kafka</name>
  15.     <description>Demo project for Spring Boot</description>
  16.     <properties>
  17.         <java.version>1.8</java.version>
  18.     </properties>
  19.     <dependencies>
  20.         <dependency>
  21.             <groupId>org.springframework.boot</groupId>
  22.             <artifactId>spring-boot-starter-web</artifactId>
  23.         </dependency>
  24.         <!-- spring 整合的 apache kafka 消息队列依赖-->
  25.         <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
  26.         <dependency>
  27.             <groupId>org.springframework.kafka</groupId>
  28.             <artifactId>spring-kafka</artifactId>
  29.         </dependency>
  30.         <dependency>
  31.             <groupId>org.springframework.boot</groupId>
  32.             <artifactId>spring-boot-starter-test</artifactId>
  33.             <scope>test</scope>
  34.         </dependency>
  35.         <dependency>
  36.             <groupId>org.springframework.kafka</groupId>
  37.             <artifactId>spring-kafka-test</artifactId>
  38.             <scope>test</scope>
  39.         </dependency>
  40.         <dependency>
  41.             <groupId>cn.hutool</groupId>
  42.             <artifactId>hutool-all</artifactId>
  43.             <version>5.5.7</version>
  44.         </dependency>
  45.     </dependencies>
  46.     <build>
  47.         <plugins>
  48.             <plugin>
  49.                 <groupId>org.springframework.boot</groupId>
  50.                 <artifactId>spring-boot-maven-plugin</artifactId>
  51.             </plugin>
  52.         </plugins>
  53.     </build>
  54. </project>
复制代码
如果有不明确的接洽作者,一起学习

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4