媒介:
在分布式消息体系中,变乱消息也是一个热门课题,在项目的现实业务场景中,如果用到变乱消息的场景也不少见,那 Kafka 作为一个高性能的分布式消息中间件,同样也支持变乱消息,本篇我们将对 Kafka 的变乱消息展开讨论。
Kafka 系列文章传送门
Kafka 简介及核心概念解说
Spring Boot 整合 Kafka 详解
Kafka @KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 之消息同步/异步发送
Kafka 之批量消息发送消费
Kafka 之消息广播消费
Kafka 之消息并发消费
Kafka 之顺序消息
变乱消息的使用场景
变乱消息的使用场景众多,这里我简朴枚举几个如下:
- 金融交易处置惩罚:在金融领域,每笔交易都必须具备原子性,确保不发生不同等或重复的交易,变乱性消息可用于金融交易的场景,来保证交易的完整性。
- 订单处置惩罚:用户订单处置惩罚必须是完整的,必要保订单的创建、支付和发货不会出现题目,变乱性消息可以用于订单场景,来保证整个流程的完整性。
- 。。。。等等等场景。
什么是 Kafka 的变乱消息?
Kafka 变乱性消息是一种机制,用于确保消息的可靠性通报和处置惩罚,与非变乱性消息相比,它们在数据处置惩罚中提供了额外的保证,一旦消息被写入Kafka 集群,它们将被以为是已经处置惩罚,无论发生了什么,Kafka 的变乱不同于 RocketMQ,RocketMQ 是保障本地变乱与 RocketMQ 消息发送的变乱同等性,而 Kafka 的变乱消息主要是保障一次发送多条消息的变乱同等性,发送的消息要么同时成功要么同时失败。
Kafka 变乱消息的特性?
- 原子性:Kafka 变乱性消息要么完全成功,要么完全失败,保障了消息不会被部分处置惩罚。
- 可靠性:Kafka 变乱性消息一旦写入Kafka,它们将被视为已经处置惩罚,即使发生了应用程序或体系故障。
- 幂等性:Kafka 生产者可以设置为幂等,确保雷同的消息不会被重复发送,对于 Kafka 变乱消息通常会设置为幂等。
- Exactly Once 语义:仅一次,Kafka 变乱性消息支持仅一次,即消息要么完全到达一次,要么不到达。
Kafka的消息传输保障
Kafka 的消息传输保障分为 3个层级:
at most once:至多一次,消息大概丢失,但绝不会重复。
at least once:最少一次,消息绝不丢失,但大概重复传输。
exactly once:恰恰一次,每条消息肯定会被传输一次且仅传输一次。
Kafka 变乱消息的使用
Kafka 变乱消息的设置
Kafka 的变乱消息必要对生产者和消费者举行一些设置来启用 Kafka 对变乱消息的支持。
生产者设置
- acks:这个参数我们前面有见到过,是有关生产者接收到确认之后才以为消息发送成功的设置,对于变乱性消息,通常将其设置为acks=all(表示必要等到消息写入到所有 ISR 同步副本中,设置为 -1 也是一样的结果),以确保消息在变乱完全提交后才被视为成功发送。
- transactional.id:变乱id,这是用于标识生产者实例的唯一ID,在设置文件中设置 transactional.id 是启用变乱性消息的核心步调。
消费者设置
- isolation.level:Kafka 消费者隔离级别的设置,对于变乱性消息,通常将其设置为isolation.level=read_committed,确保只读取已经提交的变乱消息。
- auto.offset.reset:这是消费者启动时从哪里开始读取消息的设置,通常将其设置为auto.offset.reset=earliest(如果分区下有提交的 offset 从提交的 offset 开始消费,如果没有提交的 offset,重新开始消费),以确保不会错过任何已提交的消息。
具体设置如下:
- #消息应答
- spring.kafka.producer.acks = -1
- #事务id配置
- spring.kafka.producer.transaction-id-prefix=my-transaction-
- #读取已提交的消息
- spring.kafka.consumer.isolation-level=read_committed
复制代码 Kafka 变乱消息案例演示
Producer 代码案例
对于生产者我们两种范例可以选择,使用 KafkaTemplate 举行变乱消息发送和使用原生 API 举行变乱消息发送。
使用 KafkaTemplate 完成变乱消息发送代码如下:
- package com.order.service.kafka.producer;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaOperations;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.annotation.Transactional;
- /**
- * @ClassName: TransactionalProducer
- * @Author: Author
- * @Date: 2024/10/22 19:22
- * @Description: 事务消息生产者
- */
- @Slf4j
- @Component
- public class TransactionalProducer {
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- //发送事务消息(编程式)
- public void sendTransactionalMessage(int number) {
- try {
- kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {
- @Override
- public Object doInOperations(KafkaOperations<String, String> kafkaOperations) {
- kafkaOperations.send("transactional-topic", "我是第一条事务消息");
- if (number == 0) {
- //模拟异常
- int a = 1 / 0;
- }
- kafkaOperations.send("transactional-topic", "我是第二条事务消息");
- return true;
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- //发送事务消息(注解方式)
- @Transactional(rollbackFor = RuntimeException.class)
- public void sendTransactionalMessageByAnnotation(int number) {
- kafkaTemplate.send("transactional-topic", "我是第一条事务消息");
- if (number == 0) {
- //模拟异常
- int a = 1 / 0;
- }
- kafkaTemplate.send("transactional-topic", "我是第二条事务消息");
- }
- }
复制代码 上述消息生产者代码中我们使用了两种方式实现,分别是编程式变乱和注解式变乱消息,各人根据自己的喜欢来选择,两种方式都是可以的。
Consumer 代码案例
变乱消息的 Consumer 代码没有什么特殊之处,我们使用 Manual ACK 即可。
- package com.order.service.kafka.consumer;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.support.Acknowledgment;
- import org.springframework.stereotype.Component;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- /**
- * @ClassName: TransactionalConsumer
- * @Author: Author
- * @Date: 2024/10/22 19:22
- * @Description: 事务消息消费者
- */
- @Slf4j
- @Component
- public class TransactionalConsumer {
-
- @KafkaListener(id = "transactional-consumer",
- groupId = "transactional-consumer-groupId",
- topics = "transactional-topic",
- containerFactory = "myContainerFactory")
- public void listen(String message, Acknowledgment acknowledgment) {
- log.info("事务消息消费成功,消息内容:{}", message);
- //手动提交 ACK
- acknowledgment.acknowledge();
- }
- }
复制代码 变乱消息结果验证
我们触发注解式变乱消息发送,numer 传值 1,得到如下结果:
- 2024-11-05 19:48:32.649 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
- 2024-11-05 19:48:32.655 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息
复制代码 结果符合预期
- 2024-11-05 19:52:27.444 INFO 20652 --- [nio-8086-exec-8] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-my-transaction-0, transactionalId=my-transaction-0] Aborting incomplete transaction
- 2024-11-05 19:52:27.451 ERROR 20652 --- [y-transaction-0] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='我是注解式事务第一条事务消息' to topic transactional-topic:
- org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
- at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) [kafka-clients-2.6.0.jar:na]
- at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) [kafka-clients-2.6.0.jar:na]
- at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) [kafka-clients-2.6.0.jar:na]
- at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]
- 2024-11-05 19:52:27.456 ERROR 20652 --- [nio-8086-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause
- java.lang.ArithmeticException: / by zero
- at com.order.service.kafka.producer.TransactionalProducer.sendTransactionalMessageByAnnotation(TransactionalProducer.java:49) ~[classes/:na]
- at com.order.service.kafka.producer.TransactionalProducer$$FastClassBySpringCGLIB$$dbc0d44d.invoke(<generated>) ~[classes/:na]
- at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.6.jar:5.3.6]
- at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779) ~[spring-aop-5.3.6.jar:5.3.6]
- at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.6.jar:5.3.6]
- at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]
- at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.6.jar:5.3.6]
- at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.6.jar:5.3.6]
- at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.6.jar:5.3.6]
- at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.6.jar:5.3.6]
- at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]
- at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692) ~[spring-aop-5.3.6.jar:5.3.6]
- at com.order.service.kafka.producer.TransactionalProducer$$EnhancerBySpringCGLIB$$e4350198.sendTransactionalMessageByAnnotation(<generated>) ~[classes/:na]
- at com.order.service.controller.KafkaController.sendTransactionalMessageByAnnotation(KafkaController.java:96) ~[classes/:na]
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
- at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
- at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.6.jar:5.3.6]
- at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.6.jar:5.3.6]
- at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.6.jar:5.3.6]
- at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.6.jar:5.3.6]
- at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.6.jar:5.3.6]
- at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.6.jar:5.3.6]
- at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060) ~[spring-webmvc-5.3.6.jar:5.3.6]
- at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962) ~[spring-webmvc-5.3.6.jar:5.3.6]
- at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.6.jar:5.3.6]
- at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.6.jar:5.3.6]
- at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]
- at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]
- at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.6.jar:5.3.6]
- at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.6.jar:5.3.6]
- at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) ~[spring-boot-actuator-2.4.5.jar:2.4.5]
- at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.6.jar:5.3.6]
- at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]
- at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]
- at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.45.jar:9.0.45]
- at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
复制代码 从日志结果我们看到消费者没有消费到任何一条消息(这里我们是在第一条消息发送竣事后模拟的非常),结果符合预期。
编程式变乱消息这里就不在验证了,有爱好的朋侪可以自己去测试一下。
Producer 使用原生 API 发送变乱消息
使用原生 API 发送变乱消息,必要有一个 KafkaProducer 对象,我这里使用注入 Bean 的方式,具体如下:
- @Bean
- public KafkaProducer<String, String> kafkaProducer() {
- return new KafkaProducer<>(getMyKafkaProps());
- }
复制代码 有了 KafkaProducer 只有我们调用其 API 即可完成变乱消息发送,KafkaProducer 有几个重要的 API 如下:
- //初始化事务
- public void initTransactions();
- //开启事务
- public void beginTransaction() throws ProducerFencedException ;
- //在事务内提交已经消费的偏移量
- public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
- String consumerGroupId) throws ProducerFencedException ;
- //提交事务
- public void commitTransaction() throws ProducerFencedException;
- //丢弃事务
- public void abortTransaction() throws ProducerFencedException ;
复制代码 使用原生 API 举行变乱消息发送代码如下:
- package com.order.service.kafka.producer;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.errors.ProducerFencedException;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- /**
- * @ClassName: NativeTransactionalProducer
- * @Author: Author
- * @Date: 2024/10/22 19:22
- * @Description: 事务消息生产者 原生API
- */
- @Slf4j
- @Component
- public class NativeTransactionalProducer {
- @Autowired
- private KafkaProducer<String, String> kafkaProducer;
- //发送事务消息(原生API)
- public void sendTransactionalMessageByNativeApi(int number) {
- try {
- //初始化一个事务
- kafkaProducer.initTransactions();
- //开启事务
- kafkaProducer.beginTransaction();
- //发送消息
- kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第一条事务消息"));
- if (number == 0) {
- //模拟异常
- int a = 1 / 0;
- }
- kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第二条事务消息"));
- //提交事务消息
- kafkaProducer.commitTransaction();
- } catch (ProducerFencedException e) {
- //关闭资源
- kafkaProducer.close();
- } finally {
- //关闭资源
- kafkaProducer.close();
- }
- }
- }
复制代码 原生 API 发送变乱消息结果验证
我们触发原生 API 发送变乱消息发送,numer 传值 1,得到如下结果:
- 2024-11-05 19:48:32.649 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
- 2024-11-05 19:48:32.655 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息
复制代码 结果符合预期,非常的环境我这里就不演示了,有爱好的朋侪可以自己去验证。
- [/code] [b]变乱消息留意事项[/b]
- 只要开启了变乱消息功能,不管是 KafkaProducer 照旧 KafkaTemplate 发送的所有消息,都必须处于 Kafka 变乱之中,否则会抛出如下非常:
- [code]java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
- at org.springframework.util.Assert.state(Assert.java:76)
- at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:640)
- at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:552)
- at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)
- at com.order.service.kafka.producer.ManualKafkaProducer.sendManualMessage(ManualKafkaProducer.java:34)
- at com.order.service.controller.KafkaController.sendManualMessage(KafkaController.java:87)
- at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
- at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
- at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
- at java.lang.reflect.Method.invoke(Method.java:498)
- at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
- at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)
- at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)
- at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)
- at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
- at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
- at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)
- at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)
- at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
- at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
- at javax.servlet.http.HttpServlet.service(HttpServlet.java:626)
- at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
- at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
- at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
- at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
- at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
- at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
- at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
- at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)
- at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
- at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
- at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
- at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
- at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
- at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
- at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
- at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
- at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
- at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
- at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
- at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
- at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
- at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
- at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
- at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)
- at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
- at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
- at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
- at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
- at java.lang.Thread.run(Thread.java:745)
复制代码 如果业务中,存在必要变乱的环境,也存在不必要变乱的环境,那么则必要分别定义两个 KafkaTemplate(Kafka Producer),按需使用。
总结:本篇分享了 Kafka 的变乱消息的使用,可以感受到 Kafka 的变乱消息和 RocketMQ 是完全不一样的,希望可以帮助到必要用到的伙伴们。
如有不正确的地方接待各位指出纠正。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |