部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)
[*]部署ZK
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
[*]部署Kafka
docker run -d --name xdclass_kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
--env KAFKA_HEAP_OPTS=-Xmx256M \
--env KAFKA_HEAP_OPTS=-Xms128M \
-e KAFKA_ZOOKEEPER_CONNECT=[内网ip]:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[外网ip]:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:2.13-2.7.0
[*]采用Slf4j采集日志(lombok)
[*]需求
[*]控制台输出访问日志,方便测试
[*]业务数据实际输出到kafka
[*]常用的框架 log4j、logback、self4j等
[*]log4j、logback、self4j 之间有啥关系
[*]SLF4J(Simple logging Facade for Java) 门面设计模式 |外观设计模式
[*]把不同的日志系统的实现进行了具体的抽象化,提供统一的日志使用接口
[*]具体的日志系统就有log4j,logback等;
[*]logback也是log4j的作者完成的,有更好的特性,可以取代log4j的一个日志框架, 是slf4j的原生实现
[*]log4j、logback可以单独的使用,也可以绑定slf4j一起使用
[*]编码规范建议不直接用log4j、logback的API,应该用self4j, 日后更换框架所带来的成本就很低
[*]依赖引入
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
[*]kafka配置application.properties
#----------kafka配置--------------
spring.kafka.bootstrap-servers=[外网ip]:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
[*]logback.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_HOME" value="./data/logs/link" />
<appender name="console" >
<encoder>
<pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
</encoder>
</appender>
<appender name="rollingFile" >
<file>${LOG_HOME}/link.log</file>
<rollingPolicy >
<fileNamePattern>${LOG_HOME}/link-%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
</encoder>
</appender>
<logger name="net.xdclass.service.impl.LogServiceImpl"
level="INFO" additivity="false">
<appender-ref ref="rollingFile" />
<appender-ref ref="console" />
</logger>
<root level="info" additivity="false">
<appender-ref ref="console" />
</root>
</configuration>
[*]LogServiceImpl
@Service
@Slf4j
public class LogServiceImpl implements LogService {
// Kafka:topic
private static final String TOPIC_NAME = "ods_link_visit_topic";
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 记录日志
*
* @param request
* @param shortLinkCode
* @param accountNo
* @return
*/
@Override
public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
// ip、 浏览器信息
String ip = CommonUtil.getIpAddr(request);
// 全部请求头
Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);
Map<String,String> availableMap = new HashMap<>();
availableMap.put("user-agent",headerMap.get("user-agent"));
availableMap.put("referer",headerMap.get("referer"));
availableMap.put("accountNo",accountNo.toString());
LogRecord logRecord = LogRecord.builder()
//日志类型
.event(LogTypeEnum.SHORT_LINK_TYPE.name())
//日志内容
.data(availableMap)
//客户端ip
.ip(ip)
// 时间
.ts(CommonUtil.getCurrentTimestamp())
//业务唯一标识(短链码)
.bizId(shortLinkCode).build();
String jsonLog = JsonUtil.obj2Json(logRecord);
//打印日志 in 控制台
log.info(jsonLog);
// 发送kafka
kafkaTemplate.send(TOPIC_NAME,jsonLog);
}
}
[*]kafka命令
```
创建topic
./kafka-topics.sh --create --zookeeper 172.17.0.1:2181 --replication-factor 1 --partitions 1 --topic ods_link_visit_topic
查看topic
./kafka-topics.sh --list --zookeeper 172.17.0.1:2181
删除topic
./kafka-topics.sh --zookeeper 172.17.0.1:2181 --delete --topic ods_link_visit_topic
消费者消费消息
./kafka-console-consumer.sh --bootstrap-server 192.168.75.146:9092 --from-beginning --topic ods_link_visit_topic
生产者发送消息
./kafka-console-producer.sh --broker-list 192.168.75.146:9092--topic ods_link_visit_topic
```
[*]测试
@Controller
@Slf4j
public class LinkApiController {
@Autowired
private ShortLinkService shortLinkService;
@Autowired
private LogService logService;
/**
*
* @param shortLinkCode
* @param request
* @param response
*/
@GetMapping(path = "/test")
public void dispatch(HttpServletRequest request, HttpServletResponse response) {
log.info("短链:{}", shortLinkCode);
logService.recodeShortLinkLog(request, shortLinkCode, shortLinkVO.getAccountNo());
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]