【DevOps】Elasticsearch 数据跨集群同步方案

打印 上一主题 下一主题

主题 823|帖子 823|积分 2469

目录
1、Elasticsearch Cross-Cluster Replication (CCR)
1.1、长处
1.2、缺点
1.3、步调
1.4、示例
2. Logstash 或其他 ETL 工具
2.1、长处
2.2、缺点
2.3、步调
3. Apache Kafka 或 RabbitMQ
3.1、长处
3.2、缺点
3.3、步调
4、利用 Reindex API 进行跨集群同步
4.1、长处
4.2、缺点
4.3、步调
4.4、示例
总结


跨集群同步 ES 数据,意味着您希望将一个 Elasticsearch 集群的数据及时或接近及时地复制到另一个集群。 这对于劫难规复、地理位置分布、数据隔离等场景非常有用。我会详细解说Elasticsearch 数据跨集群同步的几种方案,并结合示例代码和配置,帮助您更好地明白。

1、Elasticsearch Cross-Cluster Replication (CCR)

CCR 是 Elasticsearch 官方提供的跨集群复制解决方案,适用于需要及时或接近及时数据同步的场景。它允许您将一个集群中的索引复制到另一个集群,并保持数据同步。
1.1、长处



  • 及时同步: 数据更改会自动从领导者索引复制到跟随者索引。
  • 易于配置: CCR 的配置相对简单,只需要在源集群和目标集群上进行少量配置即可。
  • 高性能: CCR 利用 Elasticsearch 的内部机制进行数据复制,性能优异。
1.2、缺点



  • 版本要求: 仅支持 Elasticsearch 6.7 及更高版本。
  • 网络耽误: 对网络耽误敏感,如果两个集群之间的网络耽误较高,大概会影响同步性能。
1.3、步调



  • 启用 CCR:

    • 在源集群和目标集群的 elasticsearch.yml 文件中添加以下配置:
      1. xpack.security.enabled: true
      2. xpack.license.self_generated.type: trial
      3. xpack.ccr.enabled: true
      复制代码
    • 重启两个集群。

  • 创建 Follower Index:

    • 在目标集群上实行以下哀求,创建 follower index 并指定要同步的 leader index:
      1. PUT <follower_index_name>
      2. {
      3.   "index": {
      4.     "remote": {
      5.       "name": "<remote_cluster_name>",
      6.       "connection": {
      7.         "hosts": ["<leader_cluster_host_1>:<port>", "<leader_cluster_host_2>:<port>"]
      8.       }
      9.     },
      10.     "leader_index": "<leader_index_name>"
      11.   }
      12. }
      复制代码
    • 将 <follower_index_name> 更换为您要创建的 follower index 名称。
    • 将 <remote_cluster_name> 更换为源集群的名称 (在 elasticsearch.yml 中配置)。
    • 将 <leader_cluster_host_1>:<port> 等更换为源集群节点的主机名和端口。
    • 将 <leader_index_name> 更换为要同步的 leader index 名称。

  • 启动同步:

    • CCR 会自动启动同步过程。您可以利用以下 API 监控同步状态:
      1. GET /_ccr/stats
      复制代码

1.4、示例

假设您有两个集群:cluster_A (源集群) 和 cluster_B (目标集群)。您希望将 cluster_A 上的索引 logs 同步到 cluster_B。
启用 CCR:


  • 在 cluster_A 和 cluster_B 的 elasticsearch.yml 文件中添加 CCR 配置 (如上所示)。
创建 Follower Index (在 cluster_B 上实行):
  1. PUT logs_replica
  2. {
  3.   "index": {
  4.     "remote": {
  5.       "name": "cluster_A",
  6.       "connection": {
  7.         "hosts": ["cluster_A_host_1:9200", "cluster_A_host_2:9200"]
  8.       }
  9.     },
  10.     "leader_index": "logs"
  11.   }
  12. }
复制代码
 监控同步状态:
  1. GET /_ccr/stats
复制代码
2. Logstash 或其他 ETL 工具

Logstash 是一款开源的数据处置惩罚管道工具,可以用于收集、解析、转换和传输数据。您可以利用 Logstash 将数据从源 Elasticsearch 集群同步到目标 Elasticsearch 集群。

2.1、长处



  • 灵活性: Logstash 支持各种数据源和目标,并提供了丰富的插件,可以进行数据转换和过滤。
  • 增量同步: 可以配置 Logstash 进行增量数据同步,只同步自上次同步以来更改的数据。
2.2、缺点



  • 复杂性: Logstash 的配置和维护比 CCR 更复杂。
  • 性能: Logstash 会对源集群造成肯定的性能影响。
2.3、步调



  • 安装 Logstash: 下载并安装 Logstash。
  • 配置 Logstash: 创建一个 Logstash 配置文件,用于从源集群读取数据,并将其写入目标集群。例如:
    1. input {
    2.   elasticsearch {
    3.     hosts => ["<source_cluster_host_1>:<port>", "<source_cluster_host_2>:<port>"]
    4.     index => "<source_index_name>"
    5.     query => '{ "match_all": {} }'
    6.   }
    7. }
    8. output {
    9.   elasticsearch {
    10.     hosts => ["<target_cluster_host_1>:<port>", "<target_cluster_host_2>:<port>"]
    11.     index => "<target_index_name>"
    12.   }
    13. }
    复制代码
  • 运行 Logstash: 利用创建的配置文件运行 Logstash。
3. Apache Kafka 或 RabbitMQ

Apache Kafka 和 RabbitMQ 是盛行的消息队列体系,可以用于构建高吞吐量、低耽误的数据管道。您可以利用它们将数据从源 Elasticsearch 集群异步复制到目标 Elasticsearch 集群。

3.1、长处



  • 解耦: 消息队列可以解耦数据生产者和消费者,提高体系可伸缩性和可靠性。
  • 可靠性: 消息队列支持数据持久化,可以包管数据不丢失。
3.2、缺点



  • 复杂性: 利用消息队列进行数据同步需要额外的组件和配置,架构比较复杂。
3.3、步调



  • 配置消息队列: 安装并配置 Apache Kafka 或 RabbitMQ。
  • 创建生产者: 在源集群上创建数据生产者,将数据写入消息队列。
  • 创建消费者: 在目标集群上创建数据消费者,从消息队列读取数据并将其写入 Elasticsearch。
3.4、示例 (利用 Kafka):


  • 配置 Kafka: 安装并配置 Kafka 集群。
  • 创建生产者 (Python):
    1. from kafka import KafkaProducer
    2. producer = KafkaProducer(bootstrap_servers=['<kafka_broker_1>:<port>', '<kafka_broker_2>:<port>'])
    3. # 从 Elasticsearch 读取数据
    4. # ...
    5. # 将数据发送到 Kafka topic
    6. producer.send('<topic_name>', data)
    复制代码
  • 创建消费者 (Python):
    1. from kafka import KafkaConsumer
    2. from elasticsearch import Elasticsearch
    3. consumer = KafkaConsumer('<topic_name>', bootstrap_servers=['<kafka_broker_1>:<port>', '<kafka_broker_2>:<port>'])
    4. es = Elasticsearch(['<target_cluster_host_1>:<port>', '<target_cluster_host_2>:<port>'])
    5. for message in consumer:
    6.     data = message.value
    7.     # 将数据写入 Elasticsearch
    8.     es.index(index='<target_index_name>', document=data)
    复制代码
4、利用 Reindex API 进行跨集群同步

Reindex API 重要用于重建索引,但它也可以用于跨集群复制数据。
4.1、长处



  • 简单易用: Reindex API 利用方便,只需要指定源集群、目标集群和索引名称即可。
  • 支持版本间迁移: 可以利用 Reindex API 将数据从较低版本的 Elasticsearch 集群迁移到较高版本的集群。
  • 灵活的数据转换: 可以在 reindex 过程中利用脚本对数据进行转换。
4.2、缺点



  • 非及时同步: Reindex API 是一次性操作,不会及时同步数据。
  • 性能: 对于大型索引,reindex 操作大概需要很长时间,而且会对源集群和目标集群造成肯定的性能影响。
4.3、步调



  • 准备目标集群: 确保目标集群已经创建,而且具有充足的磁盘空间来存储数据。
  • 实行 Reindex API 哀求: 在目标集群上实行以下哀求,将数据从源集群复制到目标集群:
    1. POST _reindex
    2. {
    3.   "source": {
    4.     "remote": {
    5.       "host": "<source_cluster_host>:<port>",
    6.       "username": "<username>",
    7.       "password": "<password>"
    8.     },
    9.     "index": "<source_index_name>"
    10.   },
    11.   "dest": {
    12.     "index": "<target_index_name>"
    13.   }
    14. }
    复制代码

    • 将 <source_cluster_host>:<port> 更换为源集群节点的主机名和端口。
    • 将 <username> 和 <password> 更换为具有充足权限访问源集群的用户的根据。
    • 将 <source_index_name> 更换为要复制的索引名称。
    • 将 <target_index_name> 更换为目标索引名称。

  • 监控 reindex 进度: 可以利用以下 API 监控 reindex 操作的进度:
    1. GET _tasks/<task_id>
    复制代码

    • 将 <task_id> 更换为 reindex 操作返回的使命 ID。

4.4、示例

假设您要将名为 "source_index" 的索引从运行在 192.168.1.10:9200 的源集群复制到名为 "target_index" 的目标集群,可以利用以下命令:
  1. curl -X POST "localhost:9200/_reindex" -H 'Content-Type: application/json' -d'
  2. {
  3.   "source": {
  4.     "remote": {
  5.       "host": "https://192.168.1.10:9200",
  6.       "username": "user",
  7.       "password": "password"
  8.     },
  9.     "index": "source_index"
  10.   },
  11.   "dest": {
  12.     "index": "target_index"
  13.   }
  14. }
  15. '
复制代码
注意事项:


  • 确保目标集群中不存在与源集群索引同名的索引,否则数据大概会被覆盖。
  • 为了提高 reindex 性能,可以调解 reindex API 的参数,例如 slices(用于并行处置惩罚)和 batch_size(用于控制每次批量处置惩罚的文档数量)。
总结

选择哪种 ES 数据跨集群同步方案取决于您的具体需求,例如数据及时性要求、数据量、集群版本、网络情况等。 CCR 是官方推荐的解决方案,配置简单,性能优异,但需要 Elasticsearch 6.7 以上版本。 Logstash 和消息队列提供了更高的灵活性和可定制性,但配置和维护更复杂。利用 Reindex API 进行跨集群同步是一种简单直接的方法,但它不适用于需要及时同步数据的场景。 对于需要定期同步数据或进行一次性数据迁移的情况,Reindex API 是一个不错的选择。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

愛在花開的季節

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表