| 
特性
×
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册  
 追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务
 kafka入门
 
 1.导入依赖
 
 2.配置文件复制代码<dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency>        <dependency>        <groupId>org.springframework.kafka</groupId>        <artifactId>spring-kafka</artifactId>        <exclusions>            <exclusion>                <groupId>org.apache.kafka</groupId>                <artifactId>kafka-clients</artifactId>            </exclusion>        </exclusions>    </dependency>    <dependency>        <groupId>org.apache.kafka</groupId>        <artifactId>kafka-clients</artifactId>    </dependency>    <dependency>        <groupId>com.alibaba</groupId>        <artifactId>fastjson</artifactId>    </dependency></dependencies>
 生产者:
 
 消费者复制代码spring:  kafka:    bootstrap-servers: 192.168.200.130:9092        #主机IP:端口号    producer:      retries: 10      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer
 3.生产者发送消息复制代码spring:  kafka:    bootstrap-servers: 192.168.200.130:9092        #主机IP:端口号    consumer:      group-id: ${spring.application.name}      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
 3.消费者接收消息复制代码@RestControllerpublic class HelloController {    @Autowired    private KafkaTemplate<String,String> kafkaTemplate;    //发送String    @GetMapping("/hello")    public String hello(){        String message = "Hello,SpringBoot Kafka!";        kafkaTemplate.send("topic",message);        return "success";    }        //发送pojo        @GetMapping("/sendPojo")    public String sendPojo(){        User user = User.builder()                .id(1001)                .name("张三")                .birthday(new Date())                .build();        kafkaTemplate.send("pojo-topic", JSON.toJSONString(user));        return "success";    }}
 4.总结复制代码@Componentpublic class HelloListener {    //接收Stirng    @KafkaListener(topics = "topic")    public void onMessage(String message){        if(!StringUtils.isEmpty(message)){            System.out.println(message);        }    }    //接收pojo        @KafkaListener(topics = "pojo-topic")    public void onMessage2(String message) {        if (!StringUtils.isEmpty(message)){            User user = JSON.parseObject(message, User.class);            System.out.println("user = " + user);        }    }}
 
 #kafka安装(docker)
 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)(设置不同得组)
 
 Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
 
 
 docker安装zookeeper创建容器复制代码docker pull zookeeper:3.4.14
复制代码docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
Docker安装kafka下载镜像:
 创建容器复制代码docker pull wurstmeister/kafka:2.12-2.3.1
复制代码docker run -d --name kafka \--env KAFKA_ADVERTISED_HOST_NAME=主机IP \--env KAFKA_ZOOKEEPER_CONNECT=主机IP:2181 \--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://主机IP:9092 \--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \--restart always=true--net=host wurstmeister/kafka:2.12-2.3.1
 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
 |