马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
步调员的一样平常:看日志 →猜题目→改代码→看日志 。有了ELK,整个过程从"猜"酿成"查"。
ELK是什么?
- ELK Stack
- ├── Elasticsearch → 存储 + 搜索(核心)
- ├── Logstash → 收集 + 过滤 + 转发(处理器)
- ├── Kibana → 查询 + 可视化(界面)
- └── Beats → 日志
采集Agent(搬运工)
复制代码 新版本叫 ELK Stack = Elasticsearch + Logstash + Kibana + Beats
团体架构
- 你的应用(Python/Go/Java)
- ↓ 输出JSON日志(stdout或文件)
- Filebeat(轻量Agent,部署在每台服务器)
- ↓ 采集日志,转发
- Logstash(过滤、解析、丰富)
- ↓ 写入
- Elasticsearch(存储、索引、搜索)
- ↓ 查询展示
- Kibana(可视化、告警、Dashboard)
复制代码 第一步:Docker Compose一键启动
让MonkeyCode天生:- # docker-compose.elk.yml
- version: '3.8'
- services:
- elasticsearch:
- image: elasticsearch:8.12.0
- environment:
- - discovery.type=single-node
- - xpack.security.enabled=false
- - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
- ports: ["9200:9200", "9300:9300"]
- volumes: ["es_data:/usr/share/elasticsearch/data"]
- healthcheck:
- test: ["CMD-SHELL", "curl -f http://localhost:9200 || exit 1"]
- interval: 30s
- timeout: 10s
- retries: 5
-
- logstash:
- image: logstash:8.12.0
- ports: ["5044:5044", "5000:5000/udp"]
- volumes:
- - ./logstash/pipeline:/usr/share/logstash/pipeline:ro
- - ./logstash/config:/usr/share/logstash/config:ro
- depends_on: [elasticsearch]
-
- kibana:
- image: kibana:8.12.0
- ports: ["5601:5601"]
- environment:
- - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- depends_on: [elasticsearch]
-
- filebeat:
- image: elastic/filebeat:8.12.0
- user: root
- volumes:
- - /var/lib/docker/containers:/var/lib/docker/containers:ro
- - /var/run/docker.sock:/var/run/docker.sock:ro
- - ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
- - ./logs:/var/log/app:ro
- depends_on: [logstash]
- network_mode: "host"
- volumes:
- es_data:
复制代码 启动:- docker-compose -f docker-compose.elk.yml up -d
复制代码 访问Kibana:http://localhost:5601
第二步:应用输出布局化日志
关键原则:永久输出JSON格式,不要输出文本日志。- # app/logging_config.py
- import structlog
- import json
- import sys
- def setup_logging():
- structlog.configure(
- processors=[
- # 添加时间戳
- structlog.processors.TimeStamper(fmt="iso"),
- # 添加日志级别
- structlog.processors.add_log_level,
- # 添加进程/线程信息
- structlog.processors.add_logger_name,
- # 异常信息格式化
- structlog.processors.format_exc_info,
- # 输出为JSON
- structlog.dev.JSONRenderer()
- ],
- logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
- wrapper_class=structlog.stdlib.BoundLogger,
- )
- logger = structlog.get_logger()
- # 使用
- async def create_order(req: CreateOrderRequest, user_id: int):
- log = logger.bind(user_id=user_id, endpoint="/orders")
- log.info("order.create.attempt", amount=req.amount, items=len(req.items))
- try:
- order = await db.create_order(req, user_id)
- log.info("order.create.success", order_id=order.id, amount=order.total_amount)
- return order
- except Exception as e:
- log.error("order.create.failed", error=str(e), exc_info=True)
- raise
复制代码 输出到stdout的JSON日志:- {"event": "order.create.success", "level": "info", "timestamp": "2024-06-01T10:30:00Z", "user_id": 42, "order_id": 10086, "amount": 299.0}
复制代码 第三步:Filebeat收罗日志
- # filebeat.yml
- filebeat.inputs:
- - type: log
- enabled: true
- paths:
- - /var/log/app/*.log
- - /var/log/app/*.json
- json.keys_under_root: true # 解析JSON,字段提到顶层
- json.overwrite_keys: true
- tags: ["app", "production"]
- fields:
- env: production
- app: my-service
- fields_under_root: true
- - type: container # Docker容器日志
- enabled: true
- paths:
- - /var/lib/docker/containers/*/*.log
- json.keys_under_root: true
- processors:
- - add_docker_metadata: ~
- - add_host_metadata: ~
- - dissect:
- tokenizer: "%{timestamp} %{level} %{message}"
- field: "message"
- target_prefix: ""
- # 输出到Logstash(推荐)或直连Elasticsearch
- output.logstash:
- hosts: ["logstash:5044"]
- # 或者直接输出到Elasticsearch(跳过Logstash,简单场景)
- # output.elasticsearch:
- # hosts: ["elasticsearch:9200"]
- # index: "my-app-%{+yyyy.MM.dd}"
复制代码 第四步:Logstash过滤与分析
- # logstash/pipeline/logstash.conf
- input {
- beats {
- port => 5044
- }
- }
- filter {
- # 解析时间戳
- if [timestamp] {
- date {
- match => [ "timestamp", "ISO8601" ]
- }
- }
-
- # 解析JSON字段(如果Filebeat没解析)
- if [message] =~ /^\{.*\}$/ {
- json {
- source => "message"
- skip_on_invalid_json => true
- }
- }
-
- # 添加地理位置(IP地址)
- if [remote_ip] {
- geoip {
- source => "remote_ip"
- target => "geoip"
- }
- }
-
- # 添加User-Agent解析
- if [user_agent] {
- useragent {
- source => "user_agent"
- target => "ua"
- }
- }
-
- # 过滤掉健康检查日志
- if [endpoint] == "/health" {
- drop {}
- }
- }
- output {
- elasticsearch {
- hosts => ["elasticsearch:9200"]
- index => "logs-%{+YYYY.MM.dd}"
- manage_template => false
- }
-
- # 调试用:同时输出到控制台
- # stdout { codec => rubydebug }
- }
复制代码 第五步:在Kibana中查询
启动后,在Kibana中:
1. 创建Index Pattern
进入 Management → Stack Management → Index Patterns,创建 logs-*
2. 常用查询语法(KQL)
[code]# 正确匹配level: error# 暗昧搜索message: "order"# 多条件level: error AND service: payment# 范围查询@timestamp >= "2024-06-01T00:00:00Z" AND @timestamp |