怎样通过 Logstash 将数据采集到 Elasticsearch
作者:来自 Elastic Andre Luizhttps://i-blog.csdnimg.cn/direct/9f0ca8b6be8c408f826b7bf97623912c.webp
将 Logstash 与 Elasticsearch 集成以实现高效的数据提取、索引和搜索的分步指南。
什么是 Logstash?
Logstash 是一种广泛使用的 Elastic Stack 工具,用于实时处理大量日志数据。它充当高效的数据管道,未来自各种泉源的信息集成到单一布局化流中。其主要功能是可靠地执行数据提取、转换和加载。
Logstash 具有多种优势,尤其是其在支持多种范例的输入、过滤器和输出方面的多功能性,可与各种泉源和目的地集成。它实时处理数据,捕获和转换信息。它与 Elastic Stack(尤其是 Elasticsearch 和 Kibana)的原生集成有助于数据分析和可视化。别的,它还包括高级过滤器,可实现高效的数据规范化、丰富和转换。
Logstash 怎样工作?
Logstash 由输入、过滤器和输出组成,它们构成了数据处理管道。这些组件在定义数据提取流程的 .config 文件中举行配置。
[*]输入(Inputs):从各种泉源捕获数据。
[*]过滤器(Filters):处理和转换捕获的数据。
[*]输出(Outputs):将转换后的数据发送到定义的目的地。
https://i-blog.csdnimg.cn/direct/b6e0f59ec4df4718ad0e3f2a84a266e4.webp
每个组件最常见的范例如下所示:
输入范例:
[*]文件:读取各种格式(文本、JSON、CSV 等)的日志文件。
[*]消息队列:Kafka、RabbitMQ。
[*]API:Webhook 或其他数据收集 API。
[*]数据库:用于关系数据提取的 JDBC 连接。
过滤器范例:
[*]Grok:用于分析和提取文本模式。
[*]Mutate:修改字段(重定名、转换范例、删除数据)。
[*]Date:将日期和时间字符串转换为可读的日期格式。
[*]GeoIP:使用地理数据丰富日志。
[*]JSON:解析或天生 JSON 数据。
输出范例:
[*]Elasticsearch:最常见的目的地,Elasticsearch 是一个搜索和分析引擎,允许对 Logstash 索引的数据举行强大的搜索和可视化。
[*]Files:将处理后的数据存储在本地。
[*]云服务:Logstash 可以将数据发送到各种云服务,例如 AWS S3、Google Cloud Storage、Azure Blob Storage,举行存储或分析。
[*]数据库:Logstash 可以通过特定的连接器将数据发送到各种其他数据库,例如 MySQL、PostgreSQL、MongoDB 等。
Elasticsearch 的数据提取
在此示例中,我们使用 Logstash 将数据提取到 Elasticsearch 中。此示例中配置的步骤将具有以下流程:
[*]Kafka 将用作数据源。
[*]Logstash 将使用数据,应用 grok、geoip 和 mutate 等过滤器来构造数据。
[*]转换后的数据将发送到 Elasticsearch 中的索引。
[*]Kibana 将用于可视化索引数据。
先决条件
我们将使用 Docker Compose 创建一个具有必要服务的环境:Elasticsearch、Kibana、Logstash 和 Kafka。Logstash 配置文件名为 logstash.conf,将直接挂载到 Logstash 容器中。下面我们将详细介绍配置文件的配置。
这是 docker-compose.yml:
version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.16.2
container_name: elasticsearch-8.16.2
environment:
- node.name=elasticsearch
- xpack.security.enabled=false
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
ports:
- 9200:9200
networks:
- shared_network
kibana:
image: docker.elastic.co/kibana/kibana:8.16.2
container_name: kibana-8.16.2
restart: always
environment:
- ELASTICSEARCH_URL=http://elasticsearch:9200
ports:
- 5601:5601
depends_on:
- elasticsearch
networks:
- shared_network
logstash:
image: docker.elastic.co/logstash/logstash:8.16.2
container_name: logstash-8.16.2
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
ports:
- "5044:5044"
depends_on:
- elasticsearch
networks:
- shared_network
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
networks:
- shared_network
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 9092:9092
networks:
- shared_network
networks:
shared_network: 如上所述,将定义 Logstash 管道,在此步骤中,我们将描述输入、过滤器和输出配置。
将在当前目录(docker-compose.yml 所在的位置)中创建 logstash.conf 文件。在 docker-compose.yml 中,本地文件系统上的 logstash.conf 文件将安装在容器内的路径 /usr/share/logstash/pipeline/logstash.conf 中。
Logstash 管道配置
Logstash 管道分为三个部分:输入、过滤器和输出。
[*]输入:定义数据的使用位置(在本例中为 Kafka)。
[*]过滤器:对原始数据举行转换和布局化。
[*]输出:指定处理后的数据发送到的位置(在本例中为 Elasticsearch)。
接下来,我们将详细配置每个步骤。
输入配置
数据源是 Kafka 主题,要使用该主题的数据,需要配置 Kafka 输入插件。以下是 Logstash 中 Kafka 插件的配置,我们定义:
[*]bootstrap_servers:Kafka 服务器的地址。
[*]topics:要使用的主题的名称。
[*]group_id:消费者组标识符。
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["logs"]
group_id => "logstash-consumer"
}
} 这样,我们就可以吸收数据了。
过滤器配置
过滤器负责转换和构造数据。让我们配置以下过滤器:
Grok 过滤器
从非布局化数据中提取布局化信息。在本例中,它提取时间戳、日志级别、客户端 IP、URI、状态和 JSON 负载。
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp},%{WORD:log_level},%{IP:client_ip},%{URIPATH:uri},%{NUMBER:status}"
}
} 示例日志:
2025-01-05 16:30:15,INFO,69.162.81.155,/api/products,200,{"user_id":123,"region":"US"} 提取字段:
[*]timestamp:提取日期和时间(例如:2025-01-05T16:30:15)。
[*]log_level:捕获日志级别(例如:INFO、ERROR)。
[*]client_ip:捕获客户端 IP 地址(例如:69.162.81.155)。
[*]uri:捕获 URI 路径(例如:/api/products)。
[*]status:捕获 HTTP 状态码(例如:200)。
日期过滤器
将时间戳字段转换为 Elasticsearch 可读的格式并将其存储在 @timestamp 中。
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
target => "@timestamp"
}
GeoIP 过滤器
接下来,我们将使用 geoip 过滤器根据 client_ip 字段的值检索地理信息,例如国家、地域、都会和坐标。
geoip {
source => "client_ip"
target => "geoip"
}
Mutate 过滤器
变异过滤器允许对字段举行转换。在本例中,我们将使用它的两个属性:
[*]remove_field:删除时间戳和消息字段,因为它们不再需要。
[*]convert:将状态字段从字符串转换为整数。
输出配置
输出定义转换后的数据将发送到那边。在本例中,我们将使用 Elasticsearch。
output {
elasticsearch {
hosts => ["http://172.21.0.1:9200"]
index => "webapp_logs"
}
} 现在我们已经定义了配置文件。以下是完整文件:
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["logs"]
group_id => "logstash-consumer"
}
}filter { grok { match => { "message" => "%{TIMESTAMP_ISO8601:timestamp},%{WORD:log_level},%{IP:client_ip},%{URIPATH:uri},%{NUMBER:status}" } } date { match => ["timestamp", "yyyy-MM-dd HH:mm:ss"] target => "@timestamp" } geoip { source => "client_ip" target => "geoip" } mutate { remove_field => ["timestamp", "message"] convert => { "status" => "integer" } }}output {
elasticsearch {
hosts => ["http://172.21.0.1:9200"]
index => "webapp_logs"
}
}
发送和提取数据
容器运行时,我们可以开始向主题发送消息并等待数据被索引。首先,如果尚未创建主题(topic),请创建主题。
docker exec -it kafka kafka-topics --create --topic logs --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1 要发送消息,请在终端中执行以下命令:
docker exec -it kafka kafka-topics --create --topic logs --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1 要发送的消息:
2025-01-05 16:30:15,INFO,69.162.81.155,/api/products,200,{"user_id":123,"region":"US"}
2025-01-05 16:31:02,ERROR,104.101.21.255,/api/orders,500,{"user_id":124,"region":"BR"}
2025-01-05 16:32:45,INFO,103.244.145.255,/api/cart,404,{"user_id":125,"region":"DE"} 要检察索引数据,请转到 Kibana:
https://i-blog.csdnimg.cn/direct/39059f615a434bcc98eedeafd51d0f9d.webp
索引乐成完成后,我们可以在 Kibana 中检察和分析数据。映射和索引过程可确保字段根据 Logstash 中定义的配置举行布局化。
结论
通过提供的配置,我们使用 Logstash 创建了一个管道,用于在具有 Elasticsearch 和 Kafka 的容器化环境中索引日志。我们探索了 Logstash 使用 grok、date、geoip 和 mutate 等过滤器处理消息的灵活性,从而构建了数据以供在 Kibana 中举行分析。别的,我们还演示了怎样配置与 Kafka 的集成以使用消息并使用它们来处理和索引数据。
参考
[*]Logstash
[*]https://www.elastic.co/guide/en/logstash/current/index.html
[*]Logstash Docker
[*]https://www.elastic.co/guide/en/logstash/current/docker.html
[*] GeoIp Plugin
[*]https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html
[*]Mutate Plugin
[*]https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
[*]Grok Plugin
[*]https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
[*]Kafka Plugin
[*]https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时举行!
Elasticsearch 包罗许多新功能,可帮助你针对自己的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立刻在本地机器上试用 Elastic。
原文:How to ingest data to Elasticsearch through Logstash - Elasticsearch Labs
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]