kafka-消息中间键(springboot集成)

[复制链接]
发表于 2023-2-9 11:13:03 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

×
特性

追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务
kafka入门

1.导入依赖
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.springframework.boot</groupId>
  4.         <artifactId>spring-boot-starter-web</artifactId>
  5.     </dependency>
  6.    
  7.     <dependency>
  8.         <groupId>org.springframework.kafka</groupId>
  9.         <artifactId>spring-kafka</artifactId>
  10.         <exclusions>
  11.             <exclusion>
  12.                 <groupId>org.apache.kafka</groupId>
  13.                 <artifactId>kafka-clients</artifactId>
  14.             </exclusion>
  15.         </exclusions>
  16.     </dependency>
  17.     <dependency>
  18.         <groupId>org.apache.kafka</groupId>
  19.         <artifactId>kafka-clients</artifactId>
  20.     </dependency>
  21.     <dependency>
  22.         <groupId>com.alibaba</groupId>
  23.         <artifactId>fastjson</artifactId>
  24.     </dependency>
  25. </dependencies>
复制代码
2.配置文件

生产者:
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: 192.168.200.130:9092        #主机IP:端口号
  4.     producer:
  5.       retries: 10
  6.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  7.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
消费者
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: 192.168.200.130:9092        #主机IP:端口号
  4.     consumer:
  5.       group-id: ${spring.application.name}
  6.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  7.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
复制代码
3.生产者发送消息
  1. @RestController
  2. public class HelloController {
  3.     @Autowired
  4.     private KafkaTemplate<String,String> kafkaTemplate;
  5.     //发送String
  6.     @GetMapping("/hello")
  7.     public String hello(){
  8.         String message = "Hello,SpringBoot Kafka!";
  9.         kafkaTemplate.send("topic",message);
  10.         return "success";
  11.     }
  12.    
  13.     //发送pojo
  14.         @GetMapping("/sendPojo")
  15.     public String sendPojo(){
  16.         User user = User.builder()
  17.                 .id(1001)
  18.                 .name("张三")
  19.                 .birthday(new Date())
  20.                 .build();
  21.         kafkaTemplate.send("pojo-topic", JSON.toJSONString(user));
  22.         return "success";
  23.     }
  24. }
复制代码
3.消费者接收消息
  1. @Component
  2. public class HelloListener {
  3.     //接收Stirng
  4.     @KafkaListener(topics = "topic")
  5.     public void onMessage(String message){
  6.         if(!StringUtils.isEmpty(message)){
  7.             System.out.println(message);
  8.         }
  9.     }
  10.     //接收pojo
  11.         @KafkaListener(topics = "pojo-topic")
  12.     public void onMessage2(String message) {
  13.         if (!StringUtils.isEmpty(message)){
  14.             User user = JSON.parseObject(message, User.class);
  15.             System.out.println("user = " + user);
  16.         }
  17.     }
  18. }
复制代码
4.总结


  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)(设置不同得组)
#kafka安装(docker)

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

  • docker安装zookeeper
    1. docker pull zookeeper:3.4.14
    复制代码
    创建容器
    1. docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
    复制代码
  • Docker安装kafka
    下载镜像:
    1. docker pull wurstmeister/kafka:2.12-2.3.1
    复制代码
    创建容器
    1. docker run -d --name kafka \
    2. --env KAFKA_ADVERTISED_HOST_NAME=主机IP \
    3. --env KAFKA_ZOOKEEPER_CONNECT=主机IP:2181 \
    4. --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://主机IP:9092 \
    5. --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    6. --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
    7. --restart always=true
    8. --net=host wurstmeister/kafka:2.12-2.3.1
    复制代码

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
继续阅读请点击广告
回复

使用道具 举报

×
登录参与点评抽奖,加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表