我们解压后的kafka 进文件夹,如下目次。我们紧张用到的就是bin和config 这两个目次。
bin 目次
bin 文件夹下都是一些执行的命令文件,我们临时会用到图中圈出的这几个命令。具体用法后面再讲,先说说这几个分别干啥。
1是消费者连接topic 消费消息的命令。
2是生产者连接topic 推送消息的命令。
3分别是启动和停止kafka服务的。
4是操纵topic 的指令,比如查察topic 列表或者删除topic
5分别是启动和停止zookeeperd服务,这里的zookeeper 是kafka 自带的。
config 目次
我们再来看看config 里面的文件。我们紧张就用到server.propertie 和zookeeper.properties
server.propertie
server.propertie 是启动kafka 时加载的配置文件。点击去看看,基本要改的就是下面这两个地方。
每一个broker都须要一个标识符,使用broker.id来表示。它的默认值是0,也可以被设置成任意其它整数。这个值在整个kafka集群里必须是唯一的。这个值可以任意选定。我这里设置的broker.id=1
还有kafka 默认启动服务的默认端口是9092.假如我们想要修改的话,就须要在server.propertie 中加上
port = 9093
固然改了这里,还得改其他对应配置文件的连接。这里是网上截图的。
zookeeper.properties
zookeeper.connect 紧张配置 zookeeper的链接。假如我们在其他地方安装的zookeeper ,就须要修改这里的配置了。
zookeeper.properties 文件是启动kafka 自带的zookeeper 时加载的配置。
里面的配置就比较少了,紧张是
libs 文件夹是kafka 运行依赖的jar 包,我们可以不用管,logs是kafka 运行产生的日志,我们排盘问题时用到,临时也不用管。
简单操纵
===================================================================
zookeeper
我们按序次来,由于kafka 启动要依赖zookeeper服务。以是我们来看zookeeper的命令。
启动zookeeper 服务
bin/zookeeper-server-start.sh ./config/zookeeper.properties
停止zookeeper 服务
bin/zookeeper-server-stop.sh
kafka 服务
启动好zookeeper 后,我们来启动kafka 服务。
bin/kafka-server-start.sh ./config/server.properties
关闭kafka 服务命令
bin/kafka-server-stop.sh
topic
启动好kafka 服务后,我们就可以创建topic 啦。创建topic的命令
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
这里可以看到创建topic 的命令算是比较复杂的, --zookeeper localhost:2181是指定zookeeper 服务。-replication-factor是指创建分区。
partitions 是创建备份。test1是topic 名称。
我们在创建一个tpoic test2. 然后查察topic 列表,须要指定zookeeper 连接
bin/kafka-topics.sh --list --zookeeper localhost:2181
删除一个topic,须要指定zookeeper 和删除的topic
bin/kafka-topics.sh --delete --topic quellanan --zookeeper localhost:2181
producer
我们已经创建了topic 。接下来我们可以让生产者推送消息到这个topic 上。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
–broker-list localhost:9092 连接上指定的kafka 服务器。
consumer
生产者生产了消息,接下来就须要消费者消费消息啦。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
–bootstrap-server localhost:9092 是连接特定的kafka 服务
–from-beginning 读取历史未消费的数据。
Springboot整合使用kafka
==================================================================================
上面哪些都是在服务器上操纵的,以是接下来我们须要在我们代码中使用kafka ,紧张是推送消息和消费消息。
这里由于我们kafka 摆设在服务器上,不是我们本地,以是须要kafka 配置文件中设置远程访问。紧张修改config/server.properties
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.1.51:9092
zookeeper.connect=192.168.1.51:2181
做如上修改就可以在远程访问啦。
produecer
准备工作做好了,我们如今创建一个springboot 项目 ,名为kafka-producer,作为kafka 生产者。我这里Springboot 版本是最新的2.2.4。
在pom.xml 文件中引入kafka 依赖。
org.springframework.kafka
spring-kafka
在配置文件中,配置kafka 连接
server.port=9000
spring.kafka.bootstrap-servers=192.168.1.51:9092
然后我们创建一个推动消息的接口。KafkaProducerController
内容如下:
@RestController
public class KafkaProducerController {
@Resource
private KafkaTemplate<String,String> KafkaTemplate;
@RequestMapping(“/send”)
public String sendMsg(@RequestParam(value = “topic”)String topic,@RequestParam(value = “msg”)String msg){
KafkaTemplate.send(topic,msg);
return “success”;
}
}
紧张是通过KafkaTemplate 向topic 推送消息的。这样就可以了,我们启动项目,调接口
http://localhost:9000/send?topic=test3&msg=hello world
控制台可以看到连接kafka 的信息。
并可以看到推送的是时间和commitID
consumer
接下来我们就须要创建一个kafka 消费者来监控topic ,假如有新的消息就吸收。
pom.xml 文件和配置文件连接kafka 服务器都是一样的。
我们创建一个类KafkaConsumer。
内容如下
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(groupId = “test-group”,topics = “test3”)
public void listen(String msg){
log.info(“吸收消息:”+msg);
}
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |