花瓣小跑 发表于 2025-3-26 05:44:37

流式ETL设置指南:从MySQL到Elasticsearch的实时数据同步

流式ETL设置指南:从MySQL到Elasticsearch的实时数据同步

场景介绍

假设您运营一个电商平台,必要将MySQL数据库中的订单、用户和产物信息实时同步到Elasticsearch,以支持实时搜索、分析和仪表盘展示。传统的批处理ETL无法满足实时性要求,因此我们将使用Flink CDC构建流式ETL管道。
条件条件


[*]MySQL数据库 (作为数据源)
[*]Elasticsearch (作为目标体系)
[*]Flink环境 (处理引擎)
[*]Java开发环境
步骤一:环境预备

1.1 预备MySQL环境

-- 创建数据库
CREATE DATABASE IF NOT EXISTS shop;
USE shop;

-- 创建用户表
CREATE TABLE users (
id INT PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 创建产品表
CREATE TABLE products (
id INT PRIMARY KEY,
name VARCHAR(200),
price DECIMAL(10,2),
stock INT,
category VARCHAR(100)
);

-- 创建订单表
CREATE TABLE orders (
id INT PRIMARY KEY,
user_id INT,
order_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
status VARCHAR(20),
total_amount DECIMAL(10,2),
FOREIGN KEY (user_id) REFERENCES users(id)
);

-- 创建订单详情表
CREATE TABLE order_items (
id INT PRIMARY KEY,
order_id INT,
product_id INT,
quantity INT,
price DECIMAL(10,2),
FOREIGN KEY (order_id) REFERENCES orders(id),
FOREIGN KEY (product_id) REFERENCES products(id)
);

-- 插入一些测试数据
INSERT INTO users VALUES (1, '张三', 'zhangsan@example.com', '2023-01-01 10:00:00');
INSERT INTO products VALUES (101, 'iPhone 14', 5999.00, 100, '电子产品');
INSERT INTO orders VALUES (1001, 1, '2023-01-05 14:30:00', '已完成', 5999.00);
INSERT INTO order_items VALUES (10001, 1001, 101, 1, 5999.00);
确保MySQL已开启binlog,编辑MySQL设置文件:

server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
1.2 预备Elasticsearch环境

创建索引映射:
PUT /shop_orders
{
"mappings": {
    "properties": {
      "order_id": { "type": "keyword" },
      "user_id": { "type": "keyword" },
      "user_name": { "type": "keyword" },
      "user_email": { "type": "keyword" },
      "order_time": { "type": "date" },
      "status": { "type": "keyword" },
      "total_amount": { "type": "double" },
      "items": {
      "type": "nested",
      "properties": {
          "product_id": { "type": "keyword" },
          "product_name": { "type": "text" },
          "quantity": { "type": "integer" },
          "price": { "type": "double" },
          "category": { "type": "keyword" }
      }
      }
    }
}
}
步骤二:创建Flink流式ETL项目

2.1 创建Maven项目

pom.xml文件设置:
<dependencies>
    <!-- Flink核心依赖 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.17.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.17.0</version>
    </dependency>
   
    <!-- Flink CDC连接器 -->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.3.0</version>
    </dependency>
   
    <!-- Elasticsearch连接器 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch7</artifactId>
      <version>1.17.0</version>
    </dependency>
   
    <!-- JSON处理 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>1.17.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.9.0</version>
    </dependency>
</dependencies>
2.2 实现ETL主程序

创建MySQLToElasticsearchETL.java文件:
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MySQLToElasticsearchETL {

    public static void main(String[] args) throws Exception {
      // 1. 设置Flink执行环境
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);// 开发环境设置为1,生产环境根据需要调整
      env.enableCheckpointing(60000);// 每60秒做一次检查点

      // 2. 配置MySQL CDC源
      MySqlSource<String> userSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("shop")
                .tableList("shop.users")
                .username("root")
                .password("yourpassword")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

      MySqlSource<String> productSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("shop")
                .tableList("shop.products")
                .username("root")
                .password("yourpassword")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

      MySqlSource<String> orderSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("shop")
                .tableList("shop.orders")
                .username("root")
                .password("yourpassword")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

      MySqlSource<String> orderItemSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("shop")
                .tableList("shop.order_items")
                .username("root")
                .password("yourpassword")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();

      // 3. 创建数据流
      DataStream<String> userStream = env.fromSource(
                userSource,
                WatermarkStrategy.noWatermarks(),
                "User CDC Source"
      );

      DataStream<String> productStream = env.fromSource(
                productSource,
                WatermarkStrategy.noWatermarks(),
                "Product CDC Source"
      );

      DataStream<String> orderStream = env.fromSource(
                orderSource,
                WatermarkStrategy.noWatermarks(),
                "Order CDC Source"
      );

      DataStream<String> orderItemStream = env.fromSource(
                orderItemSource,
                WatermarkStrategy.noWatermarks(),
                "OrderItem CDC Source"
      );

      // 4. 数据转换与关联
      // 用户缓存
      Map<Integer, Map<String, Object>> userCache = new HashMap<>();
      userStream.map(json -> {
            JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();
            JsonObject after = jsonObject.getAsJsonObject("after");
            if (after != null) {
                int userId = after.get("id").getAsInt();
                Map<String, Object> userInfo = new HashMap<>();
                userInfo.put("name", after.get("name").getAsString());
                userInfo.put("email", after.get("email").getAsString());
                userCache.put(userId, userInfo);
            }
            return json;
      });

      // 产品缓存
      Map<Integer, Map<String, Object>> productCache = new HashMap<>();
      productStream.map(json -> {
            JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();
            JsonObject after = jsonObject.getAsJsonObject("after");
            if (after != null) {
                int productId = after.get("id").getAsInt();
                Map<String, Object> productInfo = new HashMap<>();
                productInfo.put("name", after.get("name").getAsString());
                productInfo.put("price", after.get("price").getAsDouble());
                productInfo.put("category", after.get("category").getAsString());
                productCache.put(productId, productInfo);
            }
            return json;
      });

      // 订单与订单项关联
      Map<Integer, List<Map<String, Object>>> orderItemsCache = new HashMap<>();
      orderItemStream.map(json -> {
            JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();
            JsonObject after = jsonObject.getAsJsonObject("after");
            if (after != null) {
                int orderId = after.get("order_id").getAsInt();
                int productId = after.get("product_id").getAsInt();
                Map<String, Object> itemInfo = new HashMap<>();
                itemInfo.put("product_id", productId);
                itemInfo.put("quantity", after.get("quantity").getAsInt());
                itemInfo.put("price", after.get("price").getAsDouble());
               
                // 添加产品信息
                if (productCache.containsKey(productId)) {
                  itemInfo.put("product_name", productCache.get(productId).get("name"));
                  itemInfo.put("category", productCache.get(productId).get("category"));
                }
               
                if (!orderItemsCache.containsKey(orderId)) {
                  orderItemsCache.put(orderId, new ArrayList<>());
                }
                orderItemsCache.get(orderId).add(itemInfo);
            }
            return json;
      });

      // 处理订单并关联用户和订单项
      SingleOutputStreamOperator<Map<String, Object>> enrichedOrderStream = orderStream.map(new MapFunction<String, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(String json) throws Exception {
                JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();
                JsonObject after = jsonObject.getAsJsonObject("after");
                String op = jsonObject.get("op").getAsString();
               
                Map<String, Object> orderInfo = new HashMap<>();
               
                // 只处理插入和更新事件
                if ("c".equals(op) || "u".equals(op)) {
                  int orderId = after.get("id").getAsInt();
                  int userId = after.get("user_id").getAsInt();
                  
                  orderInfo.put("order_id", orderId);
                  orderInfo.put("user_id", userId);
                  orderInfo.put("order_time", after.get("order_time").getAsString());
                  orderInfo.put("status", after.get("status").getAsString());
                  orderInfo.put("total_amount", after.get("total_amount").getAsDouble());
                  
                  // 关联用户信息
                  if (userCache.containsKey(userId)) {
                        orderInfo.put("user_name", userCache.get(userId).get("name"));
                        orderInfo.put("user_email", userCache.get(userId).get("email"));
                  }
                  
                  // 关联订单项
                  if (orderItemsCache.containsKey(orderId)) {
                        orderInfo.put("items", orderItemsCache.get(orderId));
                  }
                }
               
                return orderInfo;
            }
      });

      // 5. 配置Elasticsearch接收器
      List<HttpHost> httpHosts = new ArrayList<>();
      httpHosts.add(new HttpHost("localhost", 9200, "http"));

      ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                (request, context, element) -> {
                  if (element.containsKey("order_id")) {
                        request.index("shop_orders")
                               .id(element.get("order_id").toString())
                               .source(element);
                  }
                }
      );

      // 配置批量写入
      esSinkBuilder.setBulkFlushMaxActions(1);// 每条记录立即写入,生产环境可以调大
      esSinkBuilder.setBulkFlushInterval(1000);// 每秒刷新一次

      // 6. 写入Elasticsearch
      enrichedOrderStream.addSink(esSinkBuilder.build());

      // 7. 执行作业
      env.execute("MySQL to Elasticsearch ETL Job");
    }
}
步骤三:部署和运行

3.1 编译打包

使用Maven打包:
mvn clean package
3.2 提交到Flink集群

flink run -c MySQLToElasticsearchETL target/your-jar-file.jar
3.3 验证数据同步

在Elasticsearch中查询数据:
curl -X GET "localhost:9200/shop_orders/_search?pretty"
关键点和注意事项


[*] 数据一致性:

[*]确保开启Flink的检查点机制,实现exactly-once语义
[*]合理设置检查点隔断,均衡一致性和性能

[*] 状态管理:

[*]在上述例子中,我们在内存中维护了用户和产物的缓存,生产环境应使用Flink的状态API
[*]思量状态巨细和清算计谋,制止状态无穷增长

[*] 表关联计谋:

[*]上述示例使用了简化的表关联方式
[*]生产环境可以思量使用Flink SQL或异步I/O举行优化

[*] 性能优化:

[*]调解并行度以匹配业务需求
[*]设置合适的批处理巨细和隔断
[*]监控反压(backpressure)情况

[*] 错误处理:

[*]添加错误处理逻辑,处理数据格式非常
[*]实现重试机制,应对暂时网络故障
[*]思量死信队列(DLQ)来处理无法处理的消息

[*] 监控和告警:

[*]接入Prometheus和Grafana监控Flink作业
[*]设置关键指标告警,如延迟、失败次数等

[*] 扩展性思量:

[*]计划时思量表布局变动的处理方式
[*]为未来增加新数据源或新目标体系预留扩展点

扩展功能

基于这个基础架构,您可以进一步实现:

[*]增量更新优化:只同步变动字段,减少网络传输
[*]汗青数据回溯:支持从特定时间点重新同步数据
[*]数据转换:增加复杂的业务计算逻辑
[*]数据过滤:根据业务规则过滤不必要的数据
[*]多目标写入:同时将数据写入Elasticsearch和其他体系如Kafka
这个完备的方案展示了怎样使用Flink CDC构建一个端到端的流式ETL体系,实现从MySQL到Elasticsearch的实时数据同步,同时处理表之间的关联关系。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 流式ETL设置指南:从MySQL到Elasticsearch的实时数据同步