水军大提督 发表于 2023-8-30 04:42:29

部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)


[*]部署ZK
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
[*]部署Kafka
        docker run -d --name xdclass_kafka \
        -p 9092:9092 \
        -e KAFKA_BROKER_ID=0 \
        --env KAFKA_HEAP_OPTS=-Xmx256M \
        --env KAFKA_HEAP_OPTS=-Xms128M \
        -e KAFKA_ZOOKEEPER_CONNECT=[内网ip]:2181 \
        -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[外网ip]:9092 \
        -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:2.13-2.7.0
[*]采用Slf4j采集日志(lombok)


[*]需求

[*]控制台输出访问日志,方便测试
[*]业务数据实际输出到kafka
[*]常用的框架 log4j、logback、self4j等

[*]log4j、logback、self4j 之间有啥关系

[*]SLF4J(Simple logging Facade for Java) 门面设计模式 |外观设计模式

[*]把不同的日志系统的实现进行了具体的抽象化,提供统一的日志使用接口
[*]具体的日志系统就有log4j,logback等;
[*]logback也是log4j的作者完成的,有更好的特性,可以取代log4j的一个日志框架, 是slf4j的原生实现
[*]log4j、logback可以单独的使用,也可以绑定slf4j一起使用

[*]编码规范建议不直接用log4j、logback的API,应该用self4j, 日后更换框架所带来的成本就很低



[*]依赖引入
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
[*]kafka配置application.properties
#----------kafka配置--------------
spring.kafka.bootstrap-servers=[外网ip]:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
[*]logback.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="./data/logs/link" />

   
    <appender name="console" >
      <encoder>
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
      </encoder>
    </appender>

   
    <appender name="rollingFile" >
      <file>${LOG_HOME}/link.log</file>
      <rollingPolicy >
            <fileNamePattern>${LOG_HOME}/link-%d{yyyy-MM-dd}.log</fileNamePattern>
      </rollingPolicy>
      <encoder>
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
      </encoder>
    </appender>


   
    <logger name="net.xdclass.service.impl.LogServiceImpl"
            level="INFO" additivity="false">
      <appender-ref ref="rollingFile" />
      <appender-ref ref="console" />
    </logger>

    <root level="info" additivity="false">
      <appender-ref ref="console" />
    </root>

</configuration>
[*]LogServiceImpl
@Service
@Slf4j
public class LogServiceImpl implements LogService {
       
        // Kafka:topic
    private static final String TOPIC_NAME = "ods_link_visit_topic";

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
   * 记录日志
   *
   * @param request
   * @param shortLinkCode
   * @param accountNo
   * @return
   */
    @Override
    public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
      // ip、 浏览器信息
      String ip = CommonUtil.getIpAddr(request);
      // 全部请求头
      Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);

      Map<String,String> availableMap = new HashMap<>();
      availableMap.put("user-agent",headerMap.get("user-agent"));
      availableMap.put("referer",headerMap.get("referer"));
      availableMap.put("accountNo",accountNo.toString());

      LogRecord logRecord = LogRecord.builder()
                //日志类型
                .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                //日志内容
                .data(availableMap)
                //客户端ip
                .ip(ip)
                // 时间
                .ts(CommonUtil.getCurrentTimestamp())
                //业务唯一标识(短链码)
                .bizId(shortLinkCode).build();

      String jsonLog = JsonUtil.obj2Json(logRecord);

      //打印日志 in 控制台
      log.info(jsonLog);

      // 发送kafka
      kafkaTemplate.send(TOPIC_NAME,jsonLog);

    }
}
[*]kafka命令
```
创建topic
./kafka-topics.sh --create --zookeeper 172.17.0.1:2181 --replication-factor 1 --partitions 1 --topic ods_link_visit_topic

查看topic
./kafka-topics.sh --list --zookeeper 172.17.0.1:2181

删除topic
./kafka-topics.sh --zookeeper 172.17.0.1:2181 --delete --topic ods_link_visit_topic

消费者消费消息
./kafka-console-consumer.sh --bootstrap-server 192.168.75.146:9092 --from-beginning --topic ods_link_visit_topic

生产者发送消息
./kafka-console-producer.sh --broker-list 192.168.75.146:9092--topic ods_link_visit_topic
```
[*]测试
@Controller
@Slf4j
public class LinkApiController {
    @Autowired
    private ShortLinkService shortLinkService;

    @Autowired
    private LogService logService;


    /**
   *
   * @param shortLinkCode
   * @param request
   * @param response
   */
    @GetMapping(path = "/test")
    public void dispatch(HttpServletRequest request, HttpServletResponse response) {

      log.info("短链:{}", shortLinkCode);
      logService.recodeShortLinkLog(request, shortLinkCode, shortLinkVO.getAccountNo());
      
    }

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)