Kafka 之变乱消息

打印 上一主题 下一主题

主题 845|帖子 845|积分 2537

媒介:
在分布式消息体系中,变乱消息也是一个热门课题,在项目的现实业务场景中,如果用到变乱消息的场景也不少见,那 Kafka 作为一个高性能的分布式消息中间件,同样也支持变乱消息,本篇我们将对 Kafka 的变乱消息展开讨论。
Kafka 系列文章传送门
Kafka 简介及核心概念解说
Spring Boot 整合 Kafka 详解
Kafka @KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 之消息同步/异步发送
Kafka 之批量消息发送消费
Kafka 之消息广播消费
Kafka 之消息并发消费
Kafka 之顺序消息
变乱消息的使用场景
变乱消息的使用场景众多,这里我简朴枚举几个如下:


  • 金融交易处置惩罚:在金融领域,每笔交易都必须具备原子性,确保不发生不同等或重复的交易,变乱性消息可用于金融交易的场景,来保证交易的完整性。
  • 订单处置惩罚:用户订单处置惩罚必须是完整的,必要保订单的创建、支付和发货不会出现题目,变乱性消息可以用于订单场景,来保证整个流程的完整性。
  • 。。。。等等等场景。
什么是 Kafka 的变乱消息?
Kafka 变乱性消息是一种机制,用于确保消息的可靠性通报和处置惩罚,与非变乱性消息相比,它们在数据处置惩罚中提供了额外的保证,一旦消息被写入Kafka 集群,它们将被以为是已经处置惩罚,无论发生了什么,Kafka 的变乱不同于 RocketMQ,RocketMQ 是保障本地变乱与 RocketMQ 消息发送的变乱同等性,而 Kafka 的变乱消息主要是保障一次发送多条消息的变乱同等性,发送的消息要么同时成功要么同时失败。
Kafka 变乱消息的特性?


  • 原子性:Kafka 变乱性消息要么完全成功,要么完全失败,保障了消息不会被部分处置惩罚。
  • 可靠性:Kafka 变乱性消息一旦写入Kafka,它们将被视为已经处置惩罚,即使发生了应用程序或体系故障。
  • 幂等性:Kafka 生产者可以设置为幂等,确保雷同的消息不会被重复发送,对于 Kafka 变乱消息通常会设置为幂等。
  • Exactly Once 语义:仅一次,Kafka 变乱性消息支持仅一次,即消息要么完全到达一次,要么不到达。
   Kafka的消息传输保障
Kafka 的消息传输保障分为 3个层级:
at most once:至多一次,消息大概丢失,但绝不会重复。
at least once:最少一次,消息绝不丢失,但大概重复传输。
exactly once:恰恰一次,每条消息肯定会被传输一次且仅传输一次。
  Kafka 变乱消息的使用

Kafka 变乱消息的设置
Kafka 的变乱消息必要对生产者和消费者举行一些设置来启用 Kafka 对变乱消息的支持。
生产者设置


  • acks:这个参数我们前面有见到过,是有关生产者接收到确认之后才以为消息发送成功的设置,对于变乱性消息,通常将其设置为acks=all(表示必要等到消息写入到所有 ISR 同步副本中,设置为 -1 也是一样的结果),以确保消息在变乱完全提交后才被视为成功发送。
  • transactional.id:变乱id,这是用于标识生产者实例的唯一ID,在设置文件中设置 transactional.id 是启用变乱性消息的核心步调。
消费者设置


  • isolation.level:Kafka 消费者隔离级别的设置,对于变乱性消息,通常将其设置为isolation.level=read_committed,确保只读取已经提交的变乱消息。
  • auto.offset.reset:这是消费者启动时从哪里开始读取消息的设置,通常将其设置为auto.offset.reset=earliest(如果分区下有提交的 offset 从提交的 offset 开始消费,如果没有提交的 offset,重新开始消费),以确保不会错过任何已提交的消息。
具体设置如下:
  1. #消息应答
  2. spring.kafka.producer.acks = -1
  3. #事务id配置
  4. spring.kafka.producer.transaction-id-prefix=my-transaction-
  5. #读取已提交的消息
  6. spring.kafka.consumer.isolation-level=read_committed
复制代码
Kafka 变乱消息案例演示

Producer 代码案例
对于生产者我们两种范例可以选择,使用 KafkaTemplate 举行变乱消息发送和使用原生 API 举行变乱消息发送。
使用 KafkaTemplate 完成变乱消息发送代码如下:
  1. package com.order.service.kafka.producer;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.kafka.core.KafkaOperations;
  5. import org.springframework.kafka.core.KafkaTemplate;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.transaction.annotation.Transactional;
  8. /**
  9. * @ClassName: TransactionalProducer
  10. * @Author: Author
  11. * @Date: 2024/10/22 19:22
  12. * @Description: 事务消息生产者
  13. */
  14. @Slf4j
  15. @Component
  16. public class TransactionalProducer {
  17.     @Autowired
  18.     private KafkaTemplate<String, String> kafkaTemplate;
  19.     //发送事务消息(编程式)
  20.     public void sendTransactionalMessage(int number) {
  21.         try {
  22.             kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {
  23.                 @Override
  24.                 public Object doInOperations(KafkaOperations<String, String> kafkaOperations) {
  25.                     kafkaOperations.send("transactional-topic", "我是第一条事务消息");
  26.                     if (number == 0) {
  27.                         //模拟异常
  28.                         int a = 1 / 0;
  29.                     }
  30.                     kafkaOperations.send("transactional-topic", "我是第二条事务消息");
  31.                     return true;
  32.                 }
  33.             });
  34.         } catch (Exception e) {
  35.             e.printStackTrace();
  36.         }
  37.     }
  38.     //发送事务消息(注解方式)
  39.     @Transactional(rollbackFor = RuntimeException.class)
  40.     public void sendTransactionalMessageByAnnotation(int number) {
  41.         kafkaTemplate.send("transactional-topic", "我是第一条事务消息");
  42.         if (number == 0) {
  43.             //模拟异常
  44.             int a = 1 / 0;
  45.         }
  46.         kafkaTemplate.send("transactional-topic", "我是第二条事务消息");
  47.     }
  48. }
复制代码
上述消息生产者代码中我们使用了两种方式实现,分别是编程式变乱和注解式变乱消息,各人根据自己的喜欢来选择,两种方式都是可以的。
Consumer 代码案例
变乱消息的 Consumer 代码没有什么特殊之处,我们使用 Manual ACK 即可。
  1. package com.order.service.kafka.consumer;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.kafka.support.Acknowledgment;
  5. import org.springframework.stereotype.Component;
  6. import java.text.SimpleDateFormat;
  7. import java.util.Date;
  8. /**
  9. * @ClassName: TransactionalConsumer
  10. * @Author: Author
  11. * @Date: 2024/10/22 19:22
  12. * @Description: 事务消息消费者
  13. */
  14. @Slf4j
  15. @Component
  16. public class TransactionalConsumer {
  17.    
  18.     @KafkaListener(id = "transactional-consumer",
  19.             groupId = "transactional-consumer-groupId",
  20.             topics = "transactional-topic",
  21.             containerFactory = "myContainerFactory")
  22.     public void listen(String message, Acknowledgment acknowledgment) {
  23.         log.info("事务消息消费成功,消息内容:{}", message);
  24.         //手动提交 ACK
  25.         acknowledgment.acknowledge();
  26.     }
  27. }
复制代码
变乱消息结果验证
我们触发注解式变乱消息发送,numer 传值 1,得到如下结果:
  1. 2024-11-05 19:48:32.649  INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer   : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
  2. 2024-11-05 19:48:32.655  INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer   : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息
复制代码
结果符合预期
  1. 2024-11-05 19:52:27.444  INFO 20652 --- [nio-8086-exec-8] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-my-transaction-0, transactionalId=my-transaction-0] Aborting incomplete transaction
  2. 2024-11-05 19:52:27.451 ERROR 20652 --- [y-transaction-0] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='我是注解式事务第一条事务消息' to topic transactional-topic:
  3. org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
  4.         at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) [kafka-clients-2.6.0.jar:na]
  5.         at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) [kafka-clients-2.6.0.jar:na]
  6.         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) [kafka-clients-2.6.0.jar:na]
  7.         at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]
  8. 2024-11-05 19:52:27.456 ERROR 20652 --- [nio-8086-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause
  9. java.lang.ArithmeticException: / by zero
  10.         at com.order.service.kafka.producer.TransactionalProducer.sendTransactionalMessageByAnnotation(TransactionalProducer.java:49) ~[classes/:na]
  11.         at com.order.service.kafka.producer.TransactionalProducer$$FastClassBySpringCGLIB$$dbc0d44d.invoke(<generated>) ~[classes/:na]
  12.         at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.6.jar:5.3.6]
  13.         at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779) ~[spring-aop-5.3.6.jar:5.3.6]
  14.         at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.6.jar:5.3.6]
  15.         at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]
  16.         at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.6.jar:5.3.6]
  17.         at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.6.jar:5.3.6]
  18.         at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.6.jar:5.3.6]
  19.         at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.6.jar:5.3.6]
  20.         at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]
  21.         at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692) ~[spring-aop-5.3.6.jar:5.3.6]
  22.         at com.order.service.kafka.producer.TransactionalProducer$$EnhancerBySpringCGLIB$$e4350198.sendTransactionalMessageByAnnotation(<generated>) ~[classes/:na]
  23.         at com.order.service.controller.KafkaController.sendTransactionalMessageByAnnotation(KafkaController.java:96) ~[classes/:na]
  24.         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
  25.         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
  26.         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
  27.         at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
  28.         at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.6.jar:5.3.6]
  29.         at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.6.jar:5.3.6]
  30.         at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.6.jar:5.3.6]
  31.         at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.6.jar:5.3.6]
  32.         at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.6.jar:5.3.6]
  33.         at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.6.jar:5.3.6]
  34.         at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060) ~[spring-webmvc-5.3.6.jar:5.3.6]
  35.         at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962) ~[spring-webmvc-5.3.6.jar:5.3.6]
  36.         at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.6.jar:5.3.6]
  37.         at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.6.jar:5.3.6]
  38.         at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]
  39.         at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]
  40.         at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]
  41.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  42.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  43.         at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.45.jar:9.0.45]
  44.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  45.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  46.         at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.6.jar:5.3.6]
  47.         at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
  48.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  49.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  50.         at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.6.jar:5.3.6]
  51.         at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
  52.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  53.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  54.         at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) ~[spring-boot-actuator-2.4.5.jar:2.4.5]
  55.         at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
  56.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  57.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  58.         at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.6.jar:5.3.6]
  59.         at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
  60.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  61.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  62.         at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
  63.         at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.45.jar:9.0.45]
  64.         at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.45.jar:9.0.45]
  65.         at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.45.jar:9.0.45]
  66.         at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.45.jar:9.0.45]
  67.         at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.45.jar:9.0.45]
  68.         at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.45.jar:9.0.45]
  69.         at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.45.jar:9.0.45]
  70.         at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.45.jar:9.0.45]
  71.         at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.45.jar:9.0.45]
  72.         at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.45.jar:9.0.45]
  73.         at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.45.jar:9.0.45]
  74.         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]
  75.         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]
  76.         at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.45.jar:9.0.45]
  77.         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
复制代码
从日志结果我们看到消费者没有消费到任何一条消息(这里我们是在第一条消息发送竣事后模拟的非常),结果符合预期。
编程式变乱消息这里就不在验证了,有爱好的朋侪可以自己去测试一下。
Producer 使用原生 API 发送变乱消息
使用原生 API 发送变乱消息,必要有一个 KafkaProducer 对象,我这里使用注入 Bean 的方式,具体如下:
  1. @Bean
  2. public KafkaProducer<String, String> kafkaProducer() {
  3.         return new KafkaProducer<>(getMyKafkaProps());
  4. }
复制代码
有了 KafkaProducer 只有我们调用其 API 即可完成变乱消息发送,KafkaProducer 有几个重要的 API 如下:
  1. //初始化事务
  2. public void initTransactions();
  3. //开启事务
  4. public void beginTransaction() throws ProducerFencedException ;
  5. //在事务内提交已经消费的偏移量
  6. public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
  7.                                                                          String consumerGroupId) throws ProducerFencedException ;
  8. //提交事务
  9. public void commitTransaction() throws ProducerFencedException;
  10. //丢弃事务
  11. public void abortTransaction() throws ProducerFencedException ;
复制代码
使用原生 API 举行变乱消息发送代码如下:
  1. package com.order.service.kafka.producer;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.kafka.clients.producer.KafkaProducer;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.common.errors.ProducerFencedException;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. /**
  9. * @ClassName: NativeTransactionalProducer
  10. * @Author: Author
  11. * @Date: 2024/10/22 19:22
  12. * @Description: 事务消息生产者 原生API
  13. */
  14. @Slf4j
  15. @Component
  16. public class NativeTransactionalProducer {
  17.     @Autowired
  18.     private KafkaProducer<String, String> kafkaProducer;
  19.     //发送事务消息(原生API)
  20.     public void sendTransactionalMessageByNativeApi(int number) {
  21.         try {
  22.             //初始化一个事务
  23.             kafkaProducer.initTransactions();
  24.             //开启事务
  25.             kafkaProducer.beginTransaction();
  26.             //发送消息
  27.             kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第一条事务消息"));
  28.             if (number == 0) {
  29.                 //模拟异常
  30.                 int a = 1 / 0;
  31.             }
  32.             kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第二条事务消息"));
  33.             //提交事务消息
  34.             kafkaProducer.commitTransaction();
  35.         } catch (ProducerFencedException e) {
  36.             //关闭资源
  37.             kafkaProducer.close();
  38.         } finally {
  39.             //关闭资源
  40.             kafkaProducer.close();
  41.         }
  42.     }
  43. }
复制代码
原生 API 发送变乱消息结果验证
我们触发原生 API 发送变乱消息发送,numer 传值 1,得到如下结果:
  1. 2024-11-05 19:48:32.649  INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer   : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
  2. 2024-11-05 19:48:32.655  INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer   : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息
复制代码
结果符合预期,非常的环境我这里就不演示了,有爱好的朋侪可以自己去验证。
  1. [/code] [b]变乱消息留意事项[/b]
  2. 只要开启了变乱消息功能,不管是 KafkaProducer 照旧 KafkaTemplate 发送的所有消息,都必须处于 Kafka 变乱之中,否则会抛出如下非常:
  3. [code]java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
  4.         at org.springframework.util.Assert.state(Assert.java:76)
  5.         at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:640)
  6.         at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:552)
  7.         at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)
  8.         at com.order.service.kafka.producer.ManualKafkaProducer.sendManualMessage(ManualKafkaProducer.java:34)
  9.         at com.order.service.controller.KafkaController.sendManualMessage(KafkaController.java:87)
  10.         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  11.         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  12.         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  13.         at java.lang.reflect.Method.invoke(Method.java:498)
  14.         at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
  15.         at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)
  16.         at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)
  17.         at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)
  18.         at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
  19.         at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
  20.         at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)
  21.         at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)
  22.         at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
  23.         at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
  24.         at javax.servlet.http.HttpServlet.service(HttpServlet.java:626)
  25.         at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
  26.         at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
  27.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
  28.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  29.         at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
  30.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
  31.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  32.         at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
  33.         at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
  34.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
  35.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  36.         at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
  37.         at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
  38.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
  39.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  40.         at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)
  41.         at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
  42.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
  43.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  44.         at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
  45.         at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
  46.         at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
  47.         at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  48.         at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
  49.         at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
  50.         at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
  51.         at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
  52.         at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
  53.         at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
  54.         at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
  55.         at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
  56.         at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
  57.         at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
  58.         at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)
  59.         at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
  60.         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  61.         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  62.         at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
  63.         at java.lang.Thread.run(Thread.java:745)
复制代码
如果业务中,存在必要变乱的环境,也存在不必要变乱的环境,那么则必要分别定义两个 KafkaTemplate(Kafka Producer),按需使用。
总结:本篇分享了 Kafka 的变乱消息的使用,可以感受到 Kafka 的变乱消息和 RocketMQ 是完全不一样的,希望可以帮助到必要用到的伙伴们。
如有不正确的地方接待各位指出纠正。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

拉不拉稀肚拉稀

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表