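Logs are the system's black box
When something goes wrong in production, the first reaction is to check the logs. But logs are scattered across servers, searching them with grep is slow, and critical errors get buried in a flood of log lines.
This article uses MonkeyCode to build a complete log analysis system: collection → storage → analysis → alerting, automated end to end.
System architecture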
- [Application servers] --Fluentd--> [Kafka] --> [Logstash] --> [Elasticsearch] --> [Kibana]
- |
- --> [Alert service] --> [Slack/WeCom]
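Prompt given to MonkeyCode
- Build a log analysis system with the following requirements:
- 1. Log collection: Fluentd configuration (collect Nginx and application logs)
- 2. Message queue: Kafka (decouples collection from processing)
- 3. Log processing: Logstash (parsing, filtering, transformation)
- 4. Storage: Elasticsearch (indexing, search)
- 5. Visualization: Kibana dashboards
- 6. Alert rules: error rate, response time, exception keywords
- 7. Alert notification: Webhook to Slack
- Log format: JSON (with timestamp, level, service, message, trace_id)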
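The JSON log format requested in the prompt can be produced on the application side with the standard `logging` module. The following is a minimal sketch, not part of the generated system: the class name `JsonFormatter`, the service name `order-service`, and the `"-"` placeholder for a missing `trace_id` are assumptions made for illustration.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def __init__(self, service: str):
        super().__init__()
        self.service = service

    def format(self, record: logging.LogRecord) -> str:
        ts = datetime.fromtimestamp(record.created, tz=timezone.utc)
        payload = {
            # Millisecond-precision UTC timestamp with a literal "Z" suffix
            "timestamp": ts.strftime("%Y-%m-%dT%H:%M:%S.") + f"{ts.microsecond // 1000:03d}Z",
            "level": record.levelname,
            "service": self.service,
            "message": record.getMessage(),
            # trace_id is expected via `extra=`; "-" is a hypothetical placeholder
            "trace_id": getattr(record, "trace_id", "-"),
        }
        return json.dumps(payload, ensure_ascii=False)


def format_example() -> dict:
    """Format one hand-built record and parse the result back into a dict."""
    record = logging.LogRecord("demo", logging.ERROR, "demo.py", 0, "payment failed", None, None)
    record.trace_id = "abc-123-def"
    return json.loads(JsonFormatter("order-service").format(record))
```

In an application the formatter would be attached to a file handler writing under `/var/log/app/`, and the trace ID passed per call, for example `logger.error("payment failed", extra={"trace_id": "abc-123-def"})`.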
1. Fluentd configuration
- # fluentd/fluent.conf - generated by MonkeyCode
- # Nginx access log
- <source>
-   @type tail
-   path /var/log/nginx/access.log
-   pos_file /var/log/td-agent/nginx-access.log.pos
-   tag nginx.access
-   format nginx
- </source>
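- # Nginx error log
- <source>
-   @type tail
-   path /var/log/nginx/error.log
-   pos_file /var/log/td-agent/nginx-error.log.pos
-   tag nginx.error
-   <parse>
-     @type regexp
-     # Named groups become record fields; slashes inside the pattern are escaped
-     expression /^(?<time>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<pid>\d+)#(?<tid>\d+): (?<message>.*)$/
-     time_format %Y/%m/%d %H:%M:%S
-   </parse>
- </source>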
- # Application JSON logs
- <source>
-   @type tail
-   path /var/log/app/*.log
-   pos_file /var/log/td-agent/app.log.pos
-   tag app
-   format json
-   time_key timestamp
-   time_format %Y-%m-%dT%H:%M:%S.%NZ
- </source>
- # Ship everything to Kafka
- <match **>
-   @type kafka2
-
-   brokers kafka1:9092,kafka2:9092,kafka3:9092
-   topic_key topic
-   default_topic logs
-
-   <format>
-     @type json
-   </format>
-
-   <buffer>
-     @type file
-     path /var/log/td-agent/buffer
-     flush_interval 5s
-     chunk_limit_size 16MB
-     queue_limit_length 1024
-   </buffer>
- </match>
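A parsing pattern for the Nginx error log is easy to get wrong, so it can help to check it offline before deploying the collector. The sketch below is a Python translation of such a pattern, not Fluentd's own parser. The group names (`time`, `level`, `pid`, `tid`, `message`) and the sample line are assumptions made for illustration.

```python
import re
from typing import Optional

# Python equivalent of an error-log pattern: date, [level], pid#tid, then the message
NGINX_ERROR_RE = re.compile(
    r"^(?P<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) "
    r"\[(?P<level>\w+)\] (?P<pid>\d+)#(?P<tid>\d+): (?P<message>.*)$"
)


def parse_nginx_error(line: str) -> Optional[dict]:
    """Return the named fields of one error-log line, or None if it does not match."""
    match = NGINX_ERROR_RE.match(line)
    return match.groupdict() if match else None


# Made-up sample line in the usual Nginx error-log layout
SAMPLE = "2024/01/15 10:23:45 [error] 1234#0: *5 connect() failed (111: Connection refused)"
parsed = parse_nginx_error(SAMPLE)
```

Lines that return `None` here would also fail to parse in the collector, which makes the function a cheap check to run over a real error log before rollout.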
2. Kafka configuration
- # kafka/docker-compose.yml
- version: '3.8'
- services:
-   zookeeper:
-     image: confluentinc/cp-zookeeper:7.3.0
-     environment:
-       ZOOKEEPER_CLIENT_PORT: 2181
-       ZOOKEEPER_TICK_TIME: 2000
-     volumes:
-       - zookeeper-data:/var/lib/zookeeper/data
-   kafka:
-     image: confluentinc/cp-kafka:7.3.0
-     depends_on:
-       - zookeeper
-     ports:
-       - "9092:9092"
-     environment:
-       KAFKA_BROKER_ID: 1
-       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
-       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-       KAFKA_LOG_RETENTION_HOURS: 24
-       KAFKA_LOG_SEGMENT_BYTES: 1073741824
-       # Maps to the broker setting log.retention.check.interval.ms
-       KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000
-     volumes:
-       - kafka-data:/var/lib/kafka/data
- volumes:
-   zookeeper-data:
-   kafka-data:
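3. Logstash pipeline
- # logstash/pipeline/logs.conf - generated by MonkeyCode
- input {
-   kafka {
-     bootstrap_servers => "kafka1:9092,kafka2:9092"
-     topics => ["logs"]
-     group_id => "logstash"
-     consumer_threads => 4
-     decorate_events => true
-     # Fluentd writes JSON, so decode each message into fields (the default codec is plain)
-     codec => "json"
-   }
- }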
- filter {
-   # Parse Nginx access logs
-   if "nginx.access" in [tags] {
-     grok {
-       match => { "message" => "%{COMBINEDAPACHELOG}" }
-     }
-     date {
-       match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
-     }
-
-     # Extract the response time
-     grok {
-       match => { "message" => "rt=(?<response_time>\d+\.\d+)" }
-     }
-
-     # Resolve geographic location
-     geoip {
-       source => "clientip"
-       target => "geoip"
-     }
-
-     # Parse the User-Agent
-     useragent {
-       source => "agent"
-       target => "useragent"
-     }
-   }
-
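-   # Parse application JSON logs
-   if [level] {
-     # Normalize the log level
-     mutate {
-       replace => { "log_level" => "%{level}" }
-     }
-
-     # Special handling for error logs
-     if [level] == "ERROR" or [level] == "FATAL" {
-       mutate {
-         add_tag => ["error"]
-       }
-
-       # Extract the exception class from the stack trace, if present.
-       # Any package prefix is skipped and both ...Exception and ...Error names match,
-       # so java.lang.OutOfMemoryError yields exception_class = OutOfMemoryError.
-       if [exception] {
-         grok {
-           match => { "exception" => "(?:[\w$]+\.)*(?<exception_class>\w+(?:Exception|Error)): (?<exception_message>.*)" }
-         }
-       }
-     }
-   }
-
-   # Add service identification
-   mutate {
-     add_field => { "environment" => "production" }
-     add_field => { "hostname" => "%{host}" }
-   }
-
-   # Remove fields that are not needed
-   mutate {
-     remove_field => [ "agent", "auth" ]
-   }
- }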
- output {
-   # Write to Elasticsearch
-   elasticsearch {
-     hosts => ["http://elasticsearch:9200"]
-     index => "logs-%{[service]}-%{+YYYY.MM.dd}"
-     template_name => "logs"
-     template_overwrite => true
-   }
-
-   # Error logs are also sent to the alerts topic
-   if "error" in [tags] {
-     kafka {
-       bootstrap_servers => "kafka1:9092"
-       topic_id => "alerts"
-     }
-   }
- }
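The two extraction steps in the filter (the `rt=` response time and the exception class) can be tried out in Python before touching the pipeline. This is a rough sketch that mirrors the idea of those grok patterns rather than Logstash itself. The function names are hypothetical, and the exception pattern is a variant that skips the package prefix and also accepts class names ending in `Error`.

```python
import re
from typing import Optional

# Response time written by Nginx as e.g. "rt=0.123"
RT_RE = re.compile(r"rt=(?P<response_time>\d+\.\d+)")

# Optional package prefix, then a class name ending in Exception or Error
EXC_RE = re.compile(
    r"(?:[\w$]+\.)*(?P<exception_class>\w+(?:Exception|Error)): (?P<exception_message>.*)"
)


def extract_response_time(message: str) -> Optional[float]:
    """Return the response time in seconds, or None when the line has no rt= field."""
    match = RT_RE.search(message)
    return float(match.group("response_time")) if match else None


def extract_exception(text: str) -> Optional[dict]:
    """Return the exception class and message from the first line that matches."""
    match = EXC_RE.search(text)
    return match.groupdict() if match else None


rt = extract_response_time('GET /api/orders HTTP/1.1" 200 rt=0.123')
exc = extract_exception("java.lang.OutOfMemoryError: Java heap space")
```

Capturing only the simple class name keeps the field value short and makes it straightforward to query for a name such as `OutOfMemoryError`.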
4. Elasticsearch index template
- // elasticsearch/templates/logs.json
- {
- "index_patterns": ["logs-*"],
- "settings": {
- "number_of_shards": 3,
- "number_of_replicas": 1,
- "refresh_interval": "5s"
- },
- "mappings": {
- "properties": {
- "@timestamp": { "type": "date" },
- "level": { "type": "keyword" },
- "service": { "type": "keyword" },
- "trace_id": { "type": "keyword" },
- "message": { "type": "text", "analyzer": "standard" },
- "exception": { "type": "text", "index": false },
- "hostname": { "type": "keyword" },
- "environment": { "type": "keyword" },
- "response_time": { "type": "float" },
- "status": { "type": "integer" },
- "request": {
- "properties": {
- "method": { "type": "keyword" },
- "path": { "type": "keyword" },
- "params": { "type": "object", "enabled": false }
- }
- },
- "user": {
- "properties": {
- "id": { "type": "keyword" },
- "ip": { "type": "ip" }
- }
- },
- "geoip": {
- "properties": {
- "country": { "type": "keyword" },
- "city": { "type": "keyword" },
- "location": { "type": "geo_point" }
- }
- }
- }
- }
- }
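The template applies to any index matching `logs-*`, and the pipeline names indices per service and per day. The sketch below shows that naming scheme in Python so it can be unit-tested. The function name and the `unknown` fallback for documents without a `service` field are assumptions and are not part of the generated configuration.

```python
from datetime import datetime
from fnmatch import fnmatch


def index_name(doc: dict, default_service: str = "unknown") -> str:
    """Build a daily index name of the form logs-<service>-YYYY.MM.dd."""
    # Index names must be lowercase; fall back when the document has no service
    service = str(doc.get("service") or default_service).lower()
    # Accept the trailing "Z" that fromisoformat rejects on older Python versions
    ts = datetime.fromisoformat(doc["@timestamp"].replace("Z", "+00:00"))
    return f"logs-{service}-{ts:%Y.%m.%d}"


def covered_by_template(name: str, pattern: str = "logs-*") -> bool:
    """Check whether an index name matches the template's index pattern."""
    return fnmatch(name, pattern)


name = index_name({"service": "Order-API", "@timestamp": "2024-01-15T23:59:59.000Z"})
```

A fallback like this matters for events such as Nginx access logs, which carry no `service` field of their own.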
5. Alert service
- # alert_service.py - generated by MonkeyCode
- import json
- import requests
- import time
- from kafka import KafkaConsumer
- from elasticsearch import Elasticsearch
- from datetime import datetime, timedelta
- from dataclasses import dataclass
- from typing import List, Dict, Any
- import logging
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- @dataclass
- class AlertRule:
-     name: str
-     condition: dict  # Elasticsearch query clause
-     threshold: int
-     time_window: int  # seconds
-     severity: str  # critical/warning/info
-     channels: List[str]  # ['slack', 'wechat']
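- class AlertService:
-     """Log alerting service."""
-
-     def __init__(self, es_hosts: list, kafka_servers: list):
-         self.es = Elasticsearch(es_hosts)
-         # Consumer for the 'alerts' topic that Logstash writes error events to;
-         # the rule checks below query Elasticsearch and do not read from it
-         self.consumer = KafkaConsumer(
-             'alerts',
-             bootstrap_servers=kafka_servers,
-             value_deserializer=lambda m: json.loads(m.decode('utf-8'))
-         )
-         self.alert_rules = self._load_rules()
-         self.alert_cooldown = {}  # rule name -> time before which it stays silent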
-
-     def _load_rules(self) -> List[AlertRule]:
-         """Load the alert rules."""
-         return [
-             # Rule 1: at least 100 errors within 5 minutes
-             AlertRule(
-                 name='high_error_rate',
-                 condition={'term': {'level': 'ERROR'}},
-                 threshold=100,
-                 time_window=300,
-                 severity='critical',
-                 channels=['slack', 'wechat']
-             ),
-             # Rule 2: at least 50 HTTP 500 responses within 5 minutes
-             AlertRule(
-                 name='high_500_rate',
-                 condition={'term': {'status': 500}},
-                 threshold=50,
-                 time_window=300,
-                 severity='critical',
-                 channels=['slack']
-             ),
-             # Rule 3: at least 10 requests slower than 2 seconds within 1 minute
-             # (a count of slow requests, not a true P99 measurement)
-             AlertRule(
-                 name='slow_response',
-                 condition={'range': {'response_time': {'gt': 2}}},
-                 threshold=10,
-                 time_window=60,
-                 severity='warning',
-                 channels=['slack']
-             ),
-             # Rule 4: a specific exception appears
-             AlertRule(
-                 name='critical_exception',
-                 condition={'match': {'exception_class': 'OutOfMemoryError'}},
-                 threshold=1,
-                 time_window=60,
-                 severity='critical',
-                 channels=['slack', 'wechat']
-             ),
-         ]
-
-     def check_rules(self):
-         """Evaluate every alert rule."""
-         now = datetime.utcnow()
-
-         for rule in self.alert_rules:
-             # Cooldown check (avoids repeated alerts)
-             if rule.name in self.alert_cooldown:
-                 if now < self.alert_cooldown[rule.name]:
-                     continue
-
-             # Build the Elasticsearch query
-             query = {
-                 "query": {
-                     "bool": {
-                         "must": [
-                             rule.condition,
-                             {"range": {"@timestamp": {"gte": f"now-{rule.time_window}s"}}}
-                         ]
-                     }
-                 },
-                 "size": 0
-             }
-
-             # Run the query
-             result = self.es.search(index="logs-*", body=query)
-             count = result['hits']['total']['value']
-
-             # Decide whether the rule fires
-             if count >= rule.threshold:
-                 self._send_alert(rule, count)
-                 # Start the cooldown (no repeat alert for 5 minutes)
-                 self.alert_cooldown[rule.name] = now + timedelta(minutes=5)
-
-     def _send_alert(self, rule: AlertRule, count: int):
-         """Send an alert to every channel configured for the rule."""
-         alert_data = {
-             "rule_name": rule.name,
-             "severity": rule.severity,
-             "count": count,
-             "threshold": rule.threshold,
-             "time_window": f"{rule.time_window}s",
-             "timestamp": datetime.utcnow().isoformat()
-         }
-
-         logger.warning(f"Alert triggered: {rule.name}, count={count}")
-
-         if 'slack' in rule.channels:
-             self._send_to_slack(alert_data)
-
-         if 'wechat' in rule.channels:
-             self._send_to_wechat(alert_data)
-
-     def _send_to_slack(self, alert: dict):
-         """Send the alert to Slack."""
-         webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
-
-         color = {
-             'critical': 'danger',
-             'warning': 'warning',
-             'info': '#439FE0'
-         }.get(alert['severity'], 'warning')
-
-         payload = {
-             "attachments": [{
-                 "color": color,
-                 "title": f"🚨 Alert: {alert['rule_name']}",
-                 "fields": [
-                     {"title": "Severity", "value": alert['severity'], "short": True},
-                     {"title": "Count", "value": str(alert['count']), "short": True},
-                     {"title": "Threshold", "value": str(alert['threshold']), "short": True},
-                     {"title": "Time window", "value": alert['time_window'], "short": True},
-                 ],
-                 "footer": alert['timestamp']
-             }]
-         }
-
-         try:
-             # raise_for_status() turns HTTP error responses into logged failures
-             requests.post(webhook_url, json=payload, timeout=5).raise_for_status()
-         except Exception as e:
-             logger.error(f"Failed to send Slack alert: {e}")
-
-     def _send_to_wechat(self, alert: dict):
-         """Send the alert to WeCom (WeChat Work)."""
-         webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY"
-
-         payload = {
-             "msgtype": "markdown",
-             "markdown": {
-                 "content": f"""
- ## 🚨 Alert notification
- **Rule**: {alert['rule_name']}
- **Severity**: {alert['severity']}
- **Count**: {alert['count']}
- **Threshold**: {alert['threshold']}
- **Time window**: {alert['time_window']}
- **Time**: {alert['timestamp']}
- """
-             }
-         }
-
-         try:
-             requests.post(webhook_url, json=payload, timeout=5)
-         except Exception as e:
-             logger.error(f"Failed to send WeChat alert: {e}")
-
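-     def run(self):
-         """Main loop."""
-         logger.info("Alert service started")
-
-         while True:
-             try:
-                 self.check_rules()
-                 time.sleep(30)  # check every 30 seconds
-             except KeyboardInterrupt:
-                 break
-             except Exception as e:
-                 logger.error(f"Error in alert loop: {e}")
-                 time.sleep(60)
- if __name__ == '__main__':
-     import os
-     # ES_HOSTS and KAFKA_SERVERS are the variables set in docker-compose.yml;
-     # the fallbacks are the original hard-coded addresses
-     service = AlertService(
-         es_hosts=os.environ.get('ES_HOSTS', 'http://elasticsearch:9200').split(','),
-         kafka_servers=os.environ.get('KAFKA_SERVERS', 'kafka1:9092').split(',')
-     )
-     service.run()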
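The two pieces of logic in the alert loop that are most worth testing in isolation are the query built for each rule and the cooldown that suppresses repeats. The following sketch separates them into pure functions that need neither Elasticsearch nor Kafka. The names `build_rule_query` and `Cooldown` are hypothetical, and using a `filter` clause instead of `must` is a substitution made here because a count does not need relevance scoring.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict


def build_rule_query(condition: dict, time_window_s: int) -> dict:
    """Count-only query: the rule condition restricted to the last N seconds."""
    return {
        "query": {
            "bool": {
                "filter": [
                    condition,
                    {"range": {"@timestamp": {"gte": f"now-{time_window_s}s"}}},
                ]
            }
        },
        "size": 0,  # only the hit count is needed
    }


class Cooldown:
    """Remember when each rule last fired and keep it silent for a fixed period."""

    def __init__(self, period: timedelta):
        self.period = period
        self._silent_until: Dict[str, datetime] = {}

    def ready(self, rule_name: str, now: datetime) -> bool:
        until = self._silent_until.get(rule_name)
        return until is None or now >= until

    def mark_fired(self, rule_name: str, now: datetime) -> None:
        self._silent_until[rule_name] = now + self.period


cooldown = Cooldown(timedelta(minutes=5))
t0 = datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc)
query = build_rule_query({"term": {"level": "ERROR"}}, 300)
```

Passing `now` in explicitly, instead of reading the clock inside the class, is what makes the cooldown behaviour checkable without waiting five minutes.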
6. Kibana dashboard configuration
- // kibana/dashboard.json
- {
- "version": "8.0.0",
- "objects": [
- {
- "id": "logs-overview",
- "type": "dashboard",
- "attributes": {
- "title": "Logs Overview",
- "panelsJSON": [
- {
- "id": "error-count",
- "type": "visualization",
- "gridData": {"x": 0, "y": 0, "w": 6, "h": 4},
- "panelConfig": {
- "visType": "metric",
- "aggs": [
- {"id": "1", "type": "count", "schema": "metric"}
- ],
- "filters": [
- {"query": "level:ERROR", "label": "Errors"}
- ]
- }
- },
- {
- "id": "response-time-percentiles",
- "type": "visualization",
- "gridData": {"x": 6, "y": 0, "w": 6, "h": 4},
- "panelConfig": {
- "visType": "line",
- "aggs": [
- {"id": "1", "type": "percentiles", "field": "response_time", "percents": [50, 95, 99]}
- ]
- }
- },
- {
- "id": "top-services",
- "type": "visualization",
- "gridData": {"x": 0, "y": 4, "w": 6, "h": 4},
- "panelConfig": {
- "visType": "pie",
- "aggs": [
- {"id": "1", "type": "terms", "field": "service", "size": 10}
- ]
- }
- },
- {
- "id": "error-trend",
- "type": "visualization",
- "gridData": {"x": 6, "y": 4, "w": 6, "h": 4},
- "panelConfig": {
- "visType": "area",
- "aggs": [
- {"id": "1", "type": "count"},
- {"id": "2", "type": "date_histogram", "field": "@timestamp", "interval": "1h"}
- ],
- "filters": [
- {"query": "level:ERROR"}
- ]
- }
- }
- ]
- }
- }
- ]
- }
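The response-time panel boils down to a percentiles aggregation bucketed over time. As a rough sketch, the equivalent Elasticsearch request body could be built as below and passed to a search call against `logs-*`. The function name, the default 24-hour range, and the aggregation names `over_time` and `response_time` are assumptions for illustration.

```python
def response_time_percentiles_body(interval: str = "1h", time_range: str = "24h") -> dict:
    """Request body for P50/P95/P99 of response_time per time bucket."""
    return {
        "size": 0,  # aggregations only, no individual hits
        "query": {"range": {"@timestamp": {"gte": f"now-{time_range}"}}},
        "aggs": {
            "over_time": {
                "date_histogram": {"field": "@timestamp", "fixed_interval": interval},
                "aggs": {
                    "response_time": {
                        "percentiles": {"field": "response_time", "percents": [50, 95, 99]}
                    }
                },
            }
        },
    }


body = response_time_percentiles_body()
```

Running the same aggregation by hand is a quick way to confirm that `response_time` is actually being indexed as a number before building the dashboard on top of it.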
7. Log query tool
- # log_query.py - generated by MonkeyCode
- from elasticsearch import Elasticsearch
- from datetime import datetime, timedelta
- from typing import List, Dict, Any, Optional
- import json
- class LogQuery:
-     """Log query tool."""
-
-     def __init__(self, es_hosts: list):
-         self.es = Elasticsearch(es_hosts)
-
-     def search(self, query: str, service: Optional[str] = None, level: Optional[str] = None,
-                start_time: Optional[datetime] = None, end_time: Optional[datetime] = None,
-                size: int = 100) -> List[Dict]:
-         """General-purpose search."""
-         must = []
-
-         # Keyword search
-         if query:
-             must.append({"match": {"message": query}})
-
-         # Filter by service
-         if service:
-             must.append({"term": {"service": service}})
-
-         # Filter by log level
-         if level:
-             must.append({"term": {"level": level}})
-
-         # Time range
-         time_range = {}
-         if start_time:
-             time_range["gte"] = start_time.isoformat()
-         if end_time:
-             time_range["lte"] = end_time.isoformat()
-         if time_range:
-             must.append({"range": {"@timestamp": time_range}})
-
-         # Run the query, newest first
-         body = {
-             "query": {"bool": {"must": must}} if must else {"match_all": {}},
-             "size": size,
-             "sort": [{"@timestamp": {"order": "desc"}}]
-         }
-
-         result = self.es.search(index="logs-*", body=body)
-
-         return [hit["_source"] for hit in result["hits"]["hits"]]
-
-     def get_by_trace_id(self, trace_id: str) -> List[Dict]:
-         """Return the full call chain for a trace_id, oldest first."""
-         body = {
-             "query": {"term": {"trace_id": trace_id}},
-             "size": 1000,
-             "sort": [{"@timestamp": {"order": "asc"}}]
-         }
-
-         result = self.es.search(index="logs-*", body=body)
-         return [hit["_source"] for hit in result["hits"]["hits"]]
-
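-     def get_error_context(self, log_id: str, before: int = 10, after: int = 10) -> List[Dict]:
-         """Return the log lines around an error on the same host."""
-         # The get API does not accept wildcard index names, so look the document up with an ids query
-         found = self.es.search(index="logs-*", body={"query": {"ids": {"values": [log_id]}}, "size": 1})
-         hits = found["hits"]["hits"]
-         if not hits:
-             return []
-         target_time = hits[0]["_source"]["@timestamp"]
-         hostname = hits[0]["_source"]["hostname"]
-
-         def neighbours(time_range: dict, order: str, size: int) -> List[Dict]:
-             body = {
-                 "query": {
-                     "bool": {
-                         "must": [
-                             {"term": {"hostname": hostname}},
-                             {"range": {"@timestamp": time_range}}
-                         ]
-                     }
-                 },
-                 "size": size,
-                 "sort": [{"@timestamp": {"order": order}}]
-             }
-             result = self.es.search(index="logs-*", body=body)
-             return [hit["_source"] for hit in result["hits"]["hits"]]
-
-         # Stay within 5 minutes of the target, using date math anchored on its timestamp
-         # Up to `before` lines leading to the target (target included), then `after` lines following it
-         earlier = neighbours({"gte": f"{target_time}||-5m", "lte": target_time}, "desc", before + 1)[::-1]
-         later = neighbours({"gt": target_time, "lte": f"{target_time}||+5m"}, "asc", after)
-         return earlier + later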
-
-     def stats_by_service(self, time_range: str = "1h") -> Dict:
-         """Count logs per service, broken down by level."""
-         body = {
-             "query": {"range": {"@timestamp": {"gte": f"now-{time_range}"}}},
-             "size": 0,
-             "aggs": {
-                 "by_service": {
-                     "terms": {"field": "service", "size": 20},
-                     "aggs": {
-                         "by_level": {
-                             "terms": {"field": "level"}
-                         }
-                     }
-                 }
-             }
-         }
-
-         result = self.es.search(index="logs-*", body=body)
-
-         stats = {}
-         for bucket in result["aggregations"]["by_service"]["buckets"]:
-             service = bucket["key"]
-             stats[service] = {
-                 "total": bucket["doc_count"],
-                 "by_level": {
-                     b["key"]: b["doc_count"]
-                     for b in bucket["by_level"]["buckets"]
-                 }
-             }
-
-         return stats
- # Usage example
- query = LogQuery(['http://localhost:9200'])
- # Search the last hour of error logs
- errors = query.search(
-     query="OutOfMemoryError",
-     level="ERROR",
-     start_time=datetime.utcnow() - timedelta(hours=1)
- )
- # Look up the call chain for a trace_id
- trace_logs = query.get_by_trace_id("abc-123-def")
- # Statistics per service
- stats = query.stats_by_service("1h")
- for service, data in stats.items():
-     print(f"{service}: {data}")
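Once the log lines for a trace have been fetched, they are easier to read as a compact timeline. The helper below is a small sketch that works on plain dictionaries shaped like the documents returned by a trace lookup. The function name `format_trace` and the sample records are made up for illustration.

```python
from typing import Dict, List


def format_trace(logs: List[Dict]) -> List[str]:
    """Order log documents by @timestamp and render one line per entry."""
    # ISO-8601 timestamps in the same format sort correctly as strings
    ordered = sorted(logs, key=lambda entry: entry["@timestamp"])
    return [
        f'{e["@timestamp"]} [{e.get("level", "-")}] {e.get("service", "-")}: {e.get("message", "")}'
        for e in ordered
    ]


# Hypothetical documents for a single trace_id, deliberately out of order
sample_trace = [
    {"@timestamp": "2024-01-15T10:00:00.250Z", "level": "ERROR", "service": "payment", "message": "card declined"},
    {"@timestamp": "2024-01-15T10:00:00.000Z", "level": "INFO", "service": "gateway", "message": "POST /orders"},
]
timeline = format_trace(sample_trace)
```

Printing `"\n".join(timeline)` then gives a request's path across services in chronological order.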
8. One-command deployment with Docker Compose
- # docker-compose.yml - generated by MonkeyCode
- # Note: alert-service depends on kafka, so the zookeeper and kafka services
- # (and their volumes) from kafka/docker-compose.yml must be merged into this file.
- version: '3.8'
- services:
-   elasticsearch:
-     image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
-     environment:
-       - discovery.type=single-node
-       - xpack.security.enabled=false
-       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
-     ports:
-       - "9200:9200"
-     volumes:
-       - es-data:/usr/share/elasticsearch/data
-     healthcheck:
-       test: ["CMD", "curl", "-f", "http://localhost:9200/_cluster/health"]
-       interval: 10s
-       timeout: 5s
-       retries: 5
-   kibana:
-     image: docker.elastic.co/kibana/kibana:8.11.0
-     depends_on:
-       elasticsearch:
-         condition: service_healthy
-     ports:
-       - "5601:5601"
-     environment:
-       - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
-   logstash:
-     image: docker.elastic.co/logstash/logstash:8.11.0
-     depends_on:
-       - elasticsearch
-     volumes:
-       - ./logstash/pipeline:/usr/share/logstash/pipeline
-     ports:
-       - "5044:5044"
-   alert-service:
-     build: ./alert-service
-     depends_on:
-       - elasticsearch
-       - kafka
-     environment:
-       - ES_HOSTS=http://elasticsearch:9200
-       - KAFKA_SERVERS=kafka:9092
- volumes:
-   es-data:
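The key to generating a log analysis system with MonkeyCode is to describe clearly the complete path a log takes, from the moment it is produced to the moment it triggers an alert. The ELK stack is mature but complex to configure. AI-generated configuration files can save a lot of time, but the alert rules still need to be tuned to the characteristics of the business.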