废话不多说,直接上代码
为啥这样说,现在大家都想先看大妈效果,再去看逻辑
先看团体架构

先贴yml吧,这个究竟是项目一创建就需要的
- spring:
- application:
- admin: apache-kafka
- kafka:
- bootstrap-servers: 这里是你自己的kafka地址 # kafka 服务器集群地址,默认为 localhost:9092
- template:
- default-topic: demo #将消息发送到的默认主题,KafkaTemplate.sendDefault
- listener:
- type: batch #监听器类型,可选值有:SINGLE(单条消费,默认)、BATCH(批量消息)
- # kafka 生产者配置
- producer:
- key-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 key 序列化方式
- value-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 value 序列化方式
- batch-size: 16KB #默认批处理大小,如果值太小,则可能降低吞吐量,为零将完全禁用批处理,当 linger.ms=0 时,此值无效
- buffer-memory: 32MB #生产者可以用来缓冲等待发送到服务器的记录的总内存大小
- retries: 3 #发送失败时的重试次数,当大于零时,允许重试失败的发送。
- # 在考虑请求完成之前,生产者要求领导者已收到的确认数,可选值有:-1、0、1(默认为1)
- # 使用事务时,必须配置为 -1,表示领导者必须收到所有副本的确认消息。
- acks: -1
- properties:
- #消息提交延时时间(单位毫秒),当生产者接收到消息 linger.ms 秒钟后,就会将消息提交给 kafka。
- #当生产端积累的消息达到 batch-size 大小后,也会将消息提交给 kafka。
- #linger.ms 默认为 0 ,表示每接收到一条消息就会立即提交给 kafka,此时 batch-size 无效。如果对实时性要求高,则建议设置为 0
- linger.ms: 0
- partitioner:
- class: com.wmx.apachekafka.beans.MyKafkaPartitioner #kafka 自定义分区规则
- transaction-id-prefix: tx_kafka.
- # kafka 消费者配置
- consumer:
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 key 反序列化方式
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 value 反序列化方式
- group-id: test-consumer-group #标识此消费者所属的消费者组的唯一字符串,这里只要你是默认安装,那就是这个,不用修改
- #消费者客户端 id,在消费者组需要唯一指定。发出请求时会传递给服务器,用于服务器端日志记录
- #不写时,会自动命名,比如:consumer-1、consumer-2...,原子性递增。通常不建议自定义,使用默认值即可,因为容易冲突
- #client-id: wangmx1
- enable-auto-commit: true #消费者的偏移量是否在后台自动提交,默认为 true
- auto-commit-interval: 5000 #如果enable.auto.commit=true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为 5000
- # 当 Kafka 中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,可选的值有 latest、earliest、exception、none,默认值为 latest
- # latest:重置为分区中最新的 offset(消费分区中新产生的数据)、earliest:重置为分区中最小的 offset
- auto-offset-reset: latest
- properties:
- session.timeout.ms: 180000 #消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发 rebalance(重新平衡) 操作)
- request.timeout.ms: 120000 #消费请求超时时间
- max-poll-records: 5 #一次调用poll()时返回的最大记录数,即批量消费每次最多消费多少条消息,注意是最多,并不是必须满足数量后才消费.
复制代码 自界说分区MyKafkaPartitioner:
- package com.zy.apachekafka.beans;
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import java.util.Map;
- /**
- * kafka 自定义分区规则,一旦自定义了分区规则,就不会再走 kafka 默认的分区规则
- *
- * @author zy
- */
- public class MyKafkaPartitioner implements Partitioner {
- /**
- * 计算给定记录的分区,发送消息到 kafka 服务器之前,都会先走这里进行计算目标分区,即将消息发送到具体的哪个分区
- *
- * @param topic :主题名称
- * @param key :要分区的键(如果没有键,则为null)
- * @param keyBytes :要分区的序列化键(如果没有键,则为null)
- * @param value :要分区的值或null,健可以有可无,值才是真正的消息内容
- * @param valueBytes :要分区的序列化值或null
- * @param cluster :当前集群信息
- */
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- // 返回的整数值就是表示生产者将消息发送到的分区
- // 具体的规则可以根据自身需要设置
- System.out.println("发送消息:" + value);
- System.out.println("指定分区为:" + 0);
- return 0;
- }
- /**
- * 在分区程序关闭时调用
- */
- @Override
- public void close() {
- }
- /**
- * 使用给定的键值对配置此类
- *
- * @param configs
- */
- @Override
- public void configure(Map<String, ?> configs) {
- }
- }
复制代码 消费者定时器ConsumerTimer:
- package com.zy.apachekafka.component;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
- import org.springframework.kafka.listener.MessageListenerContainer;
- import org.springframework.scheduling.annotation.EnableAsync;
- import org.springframework.scheduling.annotation.EnableScheduling;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import java.time.LocalDateTime;
- import java.util.Set;
- /**
- * 消费者定时器——定时开关消费者消费功能
- * 1、本类使用 @EnableScheduling 定时任务的方式开关消费者监听器,同理可以自己提供控制层接口,通过 http 的方式来开关。
- *
- * @author zy
- */
- @Component
- @EnableScheduling
- @EnableAsync
- public class ConsumerTimer {
- /**
- * 1、{@link KafkaListener} 注解标注的方法会被注册在 KafkaListenerEndpointRegistry 中。
- * 2、{@link KafkaListenerEndpointRegistry} 在 Spring IOC 容器中已经存在,可以直接取。
- */
- @Autowired
- private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
- /**
- * 定时启动消费者监听器
- * <p>
- * MessageListenerContainer getListenerContainer(String id)
- * * 1、返回具有指定id的{@link MessageListenerContainer},如果不存在此类容器,则返回 null。
- * * 2、这个 id 就是 @KafkaListener 注解的 id 属性值
- * Set<String> getListenerContainerIds():获取所有的 KafkaListener 监听器 id
- * Collection<MessageListenerContainer> getListenerContainers():获取所有的 KafkaListener 监听器容器
- */
- @Scheduled(cron = "0 52 20 * * ? ")
- public void startListener() {
- Set<String> containerIds = kafkaListenerEndpointRegistry.getListenerContainerIds();
- containerIds.stream().forEach(item -> System.out.println("KafkaListener 消费者监听器:" + item));
- //boolean isRunning():检查此组件当前是否正在运行
- //void start():启动此组件,如果组件已在运行,则不应引发异常,配合 stop 方法使用,
- //void resume():如果暂停,在下一次轮询后恢复此容器,配合 pause 方法使用。
- kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").resume();
- //kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").start();
- System.out.println(LocalDateTime.now() + " 启动 kafka 消费者监听器:basicConsumer");
- }
- /**
- * 定时关闭/暂停消费者监听器
- * void pause():在下次轮询之前暂停此容器,配合 resume
- * void stop():以同步方式停止此组件/容器,如果组件未运行(尚未启动),则不应引发异常。配合 start 方法重新启动
- */
- @Scheduled(cron = "0 50 20 * * ? ")
- public void shutDownListener() {
- kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").pause();
- //kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").stop();
- System.out.println(LocalDateTime.now() + " 暂停 kafka 消费者监听器:basicConsumer");
- }
- }
复制代码 下面该有消费和生产消息:
消费者 · 接收消息.KafkaConsumer:
- package com.zy.apachekafka.controller;
- import cn.hutool.core.exceptions.ExceptionUtil;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.common.header.Headers;
- import org.apache.kafka.common.record.TimestampType;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.messaging.handler.annotation.SendTo;
- import org.springframework.stereotype.Component;
- import java.util.List;
- /**
- * Kafka 消费者 · 接收消息.
- * 1、topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics = {"${my.kafka.topic-name}"})
- * 2、系统中定义了消费者(@KafkaListener)时,启动服务后,如果连不上kafka服务器则会输出大量的警告日志,但是不会报错。
- * 不是每个环境都启动了kafka服务,所以当没有配置消费者组id的时候,本类不交由Spring容器初始化,不再监听消息。
- *
- * @author zy
- */
- @Component
- @ConditionalOnProperty(prefix = "spring.kafka.consumer", name = "group-id")
- public class KafkaConsumer {
- private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
- /**
- * 监听指定主题上的消息,topics 属性是一个字符串数组,可以监听多个主题。
- * * id :用于唯一标识此消费者监听器,不同方法上此注解的id必须唯一,不设置时,会自动生成
- * * topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics = {"${my.kafka.topic-name}"})
- *
- * @param record :消息记录对象,包含消息正文、主题名、分区号、偏移量、时间戳等等
- */
- @KafkaListener(id = "basicConsumer", topics = {"car-infos", "basic-info", "helloWorld", "bgt.basic.agency.frame.topic"})
- public void messageListener1(ConsumerRecord<?, ?> record) {
- /**
- * headers:消息头信息
- * offset:此记录在相应的 Kafka 分区中的位置。
- * partition:记录所在的分区
- * serializedKeySize:序列化的未压缩密钥的大小(以字节为单位),如果 key为 null,则返回的大小为 -1
- * serializedValueSize:序列化的未压缩值(消息正文)的大小(以字节为单位,record.value().getBytes().length)。如果值为 null,则返回的大小为 -1
- * timestamp:记录的时间戳
- * TimestampType:记录的时间戳类型
- * topic:接收此记录的主题
- * value:消息内容
- */
- Headers headers = record.headers();
- long offset = record.offset();
- int partition = record.partition();
- int serializedKeySize = record.serializedKeySize();
- int serializedValueSize = record.serializedValueSize();
- long timestamp = record.timestamp();
- TimestampType timestampType = record.timestampType();
- String topic = record.topic();
- Object value = record.value();
- System.out.println("收到消息:");
- System.out.println("\theaders=" + headers);
- System.out.println("\toffset=" + offset);
- System.out.println("\tpartition=" + partition);
- System.out.println("\tserializedKeySize=" + serializedKeySize);
- System.out.println("\tserializedValueSize=" + serializedValueSize);
- System.out.println("\ttimestamp=" + timestamp);
- System.out.println("\ttimestampType=" + timestampType);
- System.out.println("\ttopic=" + topic);
- System.out.println("\tvalue=" + value);
- }
- /**
- * 批量消费时,必须使用 List 接收,否则会抛异常。
- * 即如果配置文件配置的是批量消费(spring.kafka.listener.type=batch),则监听时必须使用 list 接收
- * 反之如果配置是单条消息消费,则不能使用 list 接收,否则也会异常.
- *
- * @param records
- */
- @KafkaListener(topics = "batch-msg")
- public void messageListener2(List<ConsumerRecord<?, ?>> records) {
- System.out.println(">>>批量消费返回条数,records.size()=" + records.size());
- int count = 0;
- for (ConsumerRecord<?, ?> record : records) {
- System.out.println("\t消息" + (++count) + ":" + record.value());
- }
- }
- /**
- * 消费消息并转换。SendTo 可以标注在类上,此时对类中的所有方法有效,方法的返回值表示转发的消息内容。
- *
- * @param record
- * @return
- */
- @KafkaListener(topics = {"sendTo"})
- @SendTo("car-infos")
- public String messageListener3(ConsumerRecord<?, ?> record) {
- System.out.println("消费单条消费并转发:" + record.value() + "," + record.timestamp());
- return record.value().toString();
- }
- /**
- * 单位一体化编码与名称更正消息监听
- * 约定更正接口返回结果监听的主题为:basic.kafka.syncAgencyStatInfo.reply
- *
- * @param recordList
- */
- @KafkaListener(topics = {"${app.kafka.topics.agency:topic3}"})
- public void syncAgencyStatInfoMsgListener(List<ConsumerRecord<String, String>> recordList) {
- for (ConsumerRecord<String, String> record : recordList) {
- log.info("监听单位一体化编码与名称更正消息:{}", record);
- try {
- System.out.println("消息处理.....");
- } catch (Exception e) {
- log.error("单位一体化编码与名称更正消息消费失败:{}", ExceptionUtil.getMessage(e));
- }
- }
- }
- }
复制代码 生产者KafkaProducer:
- package com.zy.apachekafka.controller;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
- import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.support.SendResult;
- import org.springframework.transaction.annotation.Transactional;
- import org.springframework.util.concurrent.FailureCallback;
- import org.springframework.util.concurrent.ListenableFuture;
- import org.springframework.util.concurrent.ListenableFutureCallback;
- import org.springframework.util.concurrent.SuccessCallback;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestBody;
- import org.springframework.web.bind.annotation.RequestParam;
- import org.springframework.web.bind.annotation.RestController;
- import javax.annotation.Resource;
- import java.util.Map;
- import java.util.concurrent.TimeUnit;
- /**
- * kafka 生产者 · 发送消息
- *
- * @author zy
- */
- @RestController
- public class KafkaProducer {
- private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
- /**
- * {@link KafkaAutoConfiguration} 中会自动根据 {@link KafkaProperties} 配置属性读取配置,
- * 然后将 {@link KafkaTemplate} 模板添加到 Spring 容器中,所以这里直接获取使用即可。
- */
- @Resource
- private KafkaTemplate<String, Object> kafkaTemplate;
- /**
- * 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsg?topic=car-infos
- * <p>
- * 1、send(String topic, @Nullable V data):向指定主题发送消息,如果 topic 不存在,则自动创建,
- * 但是创建的主题默认只有一个分区 - PartitionCount: 1、分区也没有副本 - ReplicationFactor: 1,1表示自身。
- * 2、send 方法默认是异步的,主线程会直接继续向后运行,想要获取发送结果是否成功,请添加回调方法 addCallback。
- * [WARN ][org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:241)]:[Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
- * [ERROR][org.springframework.kafka.support.LoggingProducerListener.onError(LoggingProducerListener.java:76)]:Exception thrown when sending a message with key='xxx' and payload='xxx' to topic bgt.basic.agency.frame.topic:
- * 3、send().get() 可以同步阻塞主线程直到获取执行结果,或者执行超时抛出异常.
- * java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException:
- * Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
- *
- * @param topic :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
- * @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
- * @return
- */
- @PostMapping("kafka/sendMsg")
- @Transactional(rollbackFor = Exception.class)
- public Map<String, Object> sendMessage(@RequestParam String topic, @RequestBody Map<String, Object> message) {
- logger.info("向指定主题发送信息,topic={},message={}", topic, message);
- try {
- String valueAsString = new ObjectMapper().writeValueAsString(message);
- // 异步
- // kafkaTemplate.send(topic, valueAsString);
- // 同步:get() 获取执行结果,此时线程将阻塞,等待执行结果
- SendResult<String, Object> sendResult = kafkaTemplate.send(topic, valueAsString).get();
- sendResult.toString();
- message.put("sendResult", sendResult.toString());
- // org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
- } catch (Exception e) {
- // 异步发送时子线程中的异常是不会进入这里的,只有同步发送时,主线程阻塞,发送是吧,抛出异常时,才会进入这里。
- e.printStackTrace();
- }
- return message;
- }
- /**
- * 向默认主题(default-topic)发送消息:http://localhost:8080/kafka/sendMsgDefault
- * 默认主题由 spring.kafka.template.default-topic 选项进行配置
- *
- * @param message :待发送的消息,如:{"version":2,"text":"后日凌晨三点执行任务,不得有误"}
- * @return
- */
- @PostMapping("kafka/sendMsgDefault")
- @Transactional(rollbackFor = Exception.class)
- public Map<String, Object> sendMsgDefault(@RequestBody Map<String, Object> message) {
- logger.info("向默认主题发送信息,topic={},topic={}", kafkaTemplate.getDefaultTopic(), message);
- try {
- String valueAsString = new ObjectMapper().writeValueAsString(message);
- kafkaTemplate.sendDefault(valueAsString);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- return message;
- }
- /**
- * 异步回调写法 1
- * 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
- * http://localhost:8080/kafka/sendMsgCallback?topic=car-infos
- * 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程。
- *
- * @param topic :car-infos
- * @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
- * @return
- */
- @PostMapping("kafka/sendMsgCallback")
- @Transactional(rollbackFor = Exception.class)
- public Map<String, Object> sendMessageCallback(@RequestParam String topic,
- @RequestBody Map<String, Object> message) {
- try {
- String valueAsString = new ObjectMapper().writeValueAsString(message);
- /**
- * addCallback:添加成功或者失败的异步回调
- * {@link SuccessCallback}:是发送成功回调,函数式接口,其中的方法参数为 {@link SendResult},表示发送结果
- * {@link FailureCallback}:是发送失败回调,函数式接口,其中的方法参数为 Throwable,表示异常对象
- */
- kafkaTemplate.send(topic, valueAsString).addCallback(success -> {
- String topic2 = success.getRecordMetadata().topic();
- int partition = success.getRecordMetadata().partition();
- long offset = success.getRecordMetadata().offset();
- logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);
- }, failure -> {
- logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
- logger.warn("保存到数据库中,后期再做处理.");
- });
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);
- return message;
- }
- /**
- * 异步回调写法 2
- * 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
- * http://localhost:8080/kafka/sendMsgCallback2?topic=helloWorld
- * 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程,主线程会直接运行过去。
- *
- * @param topic :helloWorld
- * @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
- * @return
- */
- @PostMapping("kafka/sendMsgCallback2")
- @Transactional(rollbackFor = Exception.class)
- public Map<String, Object> sendMessageCallback2(@RequestParam String topic,
- @RequestBody Map<String, Object> message) {
- try {
- String valueAsString = new ObjectMapper().writeValueAsString(message);
- /**
- * ListenableFutureCallback 接口继承了 {@link SuccessCallback}、 {@link FailureCallback} 函数式接口
- * 重写方法即可
- */
- kafkaTemplate.send(topic, valueAsString).addCallback(
- new ListenableFutureCallback<SendResult<String, Object>>() {
- @Override
- public void onSuccess(SendResult<String, Object> success) {
- int partition = success.getRecordMetadata().partition();
- long offset = success.getRecordMetadata().offset();
- String topic2 = success.getRecordMetadata().topic();
- logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);
- }
- @Override
- public void onFailure(Throwable failure) {
- logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
- logger.warn("保存到数据库中,后期再做处理.");
- }
- });
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);
- return message;
- }
- /**
- * 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsgTransactional1?topic=car-infos
- * 与 springframework 框架的事务整合到一起,此时异常处理完全和平时一样.
- *
- * @param topic :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
- * @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
- * @return
- */
- @PostMapping("kafka/sendMsgTransactional1")
- @Transactional(rollbackFor = Exception.class)
- public Map<String, Object> sendMessageTransactional1(@RequestParam String topic,
- @RequestBody Map<String, Object> message) {
- try {
- logger.info("向指定主题发送信息,带事务管理,topic={},message={}", topic, message);
- String msg = new ObjectMapper().writeValueAsString(message);
- ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topic, msg);
- if ("110".equals(message.get("version").toString())) {
- TimeUnit.SECONDS.sleep(3);
- System.out.println(1 / 0);
- }
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return message;
- }
- /**
- * http://localhost:8080/kafka/sendMsgTransactional2?topic=car-infos
- * 生成者发送消息事务管理方式2:使用 executeInTransaction(OperationsCallback<K, V, T> callback)
- * executeInTransaction:表示执行本地事务,不参与全局事务(如果存在),即方法内部和外部是分离的,只要内部不
- * 发生异常,消息就会发送,与外部无关,即使外部有 @Transactional 注解也不影响消息发送,此时外围有没有 @Transactional 都一样。
- *
- * @param topic
- * @param message
- * @return
- */
- @PostMapping("kafka/sendMsgTransactional2")
- public Map<String, Object> sendMessageTransactional2(@RequestParam String topic,
- @RequestBody Map<String, Object> message) {
- try {
- logger.info("向指定主题发送信息,带事务管理:topic={},message={}", topic, message);
- String msg = new ObjectMapper().writeValueAsString(message);
- /**
- * executeInTransaction 表示这些操作在本地事务中调用,不参与全局事务(如果存在)
- * 所以回调方法内部发生异常时,消息不会发生出去,但是方法外部发生异常不会回滚,即便是外围方法加了 @Transactional 也没用。
- */
- kafkaTemplate.executeInTransaction(operations -> {
- operations.send(topic, msg);
- if ("120".equals(message.get("version").toString())) {
- System.out.println(1 / 0);
- }
- return null;
- });
- //如果在这里发生异常,则只要 executeInTransaction 里面不发生异常,它仍旧会发生消息成功
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- return message;
- }
- }
复制代码 贴一份pom文件吧,现在随着依赖的增加,许多时候会出现依赖之间出现题目,而且还很难排错,,有一个idea插件可以安排(maven helper)
pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.1.3.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.wmx</groupId>
- <artifactId>apache-kafka</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>apache-kafka</name>
- <description>Demo project for Spring Boot</description>
- <properties>
- <java.version>1.8</java.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <!-- spring 整合的 apache kafka 消息队列依赖-->
- <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>cn.hutool</groupId>
- <artifactId>hutool-all</artifactId>
- <version>5.5.7</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
复制代码 如果有不明确的接洽作者,一起学习
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |