数据同步-Mysql同步到ElasticSearch

打印 上一主题 下一主题

主题 587|帖子 587|积分 1761

数据同步

一样平常情况下,假如做查询搜索功能,使用 ES 来含糊搜索,但是数据是存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致(以 MySQL 为主)。
数据同步包含:全量同步 (首次) + 增量同步(新数据)。
首次安装完 ES,把 MySQL 数据全量同步到 ES 里,写一个单次脚本。
  1. public class FullSyncPostToEs implements CommandLineRunner {
  2.     @Resource
  3.     private PostService postService;
  4.     @Resource
  5.     private PostEsDao postEsDao;
  6.     @Override
  7.     public void run(String... args) {
  8.         List<Post> postList = postService.list();
  9.         if (CollectionUtils.isEmpty(postList)) {
  10.             return;
  11.         }
  12.         List<PostEsDTO> postEsDTOList = postList.stream().map(PostEsDTO::objToDto).collect(Collectors.toList());
  13.         final int pageSize = 500;
  14.         int total = postEsDTOList.size();
  15.         log.info("FullSyncPostToEs start, total {}", total);
  16.         for (int i = 0; i < total; i += pageSize) {
  17.             int end = Math.min(i + pageSize, total);
  18.             log.info("sync from {} to {}", i, end);
  19.             postEsDao.saveAll(postEsDTOList.subList(i, end));
  20.         }
  21.         log.info("FullSyncPostToEs end, total {}", total);
  22.     }
  23. }
复制代码
增量同步有五种方式:
1、定时任务



  • 定时任务:比如1 分钟 1 次,找到 MySQL 中过去几分钟内(至少是定时周期的 2 倍)发生改变的数据,然后更新到 ES。
  1. public class IncSyncPostToEs {
  2.     @Resource
  3.     private PostMapper postMapper;
  4.     @Resource
  5.     private PostEsDao postEsDao;
  6.     /**
  7.      * 每分钟执行一次
  8.      */
  9.     @Scheduled(fixedRate = 60 * 1000)
  10.     public void run() {
  11.         // 查询近 5 分钟内的数据
  12.         Date fiveMinutesAgoDate = new Date(new Date().getTime() - 5 * 60 * 1000L);
  13.         List<Post> postList = postMapper.listPostWithDelete(fiveMinutesAgoDate);
  14.         if (CollectionUtils.isEmpty(postList)) {
  15.             log.info("no inc post");
  16.             return;
  17.         }
  18.         List<PostEsDTO> postEsDTOList = postList.stream()
  19.                 .map(PostEsDTO::objToDto)
  20.                 .collect(Collectors.toList());
  21.         final int pageSize = 500;
  22.         int total = postEsDTOList.size();
  23.         log.info("IncSyncPostToEs start, total {}", total);
  24.         for (int i = 0; i < total; i += pageSize) {
  25.             int end = Math.min(i + pageSize, total);
  26.             log.info("sync from {} to {}", i, end);
  27.             postEsDao.saveAll(postEsDTOList.subList(i, end));
  28.         }
  29.         log.info("IncSyncPostToEs end, total {}", total);
  30.     }
  31. }
复制代码
优点:简单易懂、占用资源少、不消引入第三方中间件;
缺点:有时间差;
应用场景:数据时间内差别步影响不大、或者数据险些不发生修改;
2、双写



  • 双写:写数据的时候,必须去写入到ES,更新、删除都需要操纵ES(加事务:大概存在写入某一方出现失败,形成脏数据)。
3、MQ异步写入



  • MQ异步写入:在写入数据库时,通过MQ异步写入ES,同样大概存在数据写入不一致题目。
4、Logstash



  • ES的Logstash数据同步管道:Logstash 变乱处理管道有三个阶段:输入过滤器输出
下载地点:https://www.elastic.co/guide/en/logstash/7.17/installing-logstash.html
inputs 模块负责收集数据,filters 模块可以对收集到的数据进行格式化、过滤、简单的数据处理,outputs 模块负责将数据同步到目的地,Logstash的处理流程,就像管道一样,数据从管道的一端,流向另外一端。
inputs 和 outputs 支持编解码器,使您可以或许在数据进入或脱离管道时对数据进行编码或解码,而无需使用单独的过滤器。

启动Logstash,添加一个conf设置文件,便可完成同步任务。
  1. C:\Windows\system32> cd C:\logstash-7.17.23\
  2. C:\logstash-7.17.23> .\bin\logstash.bat -f .\config\syslog.conf
复制代码
syslog.conf:数据同步的设置文件。
举个例子:
输入变乱:
  1. input {
  2.   jdbc {
  3.     jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"  //数据库驱动
  4.     jdbc_driver_class => "com.mysql.jdbc.Driver"                 //连接数据库
  5.     jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
  6.     jdbc_user => "mysql"
  7.      jdbc_password => "mysql"
  8.      statement => "SELECT * from songs where artist = :favorite_artist"  //执行sql语句
  9.     parameters => { "favorite_artist" => "Beethoven" }   //预编译  
  10.     schedule => "* * * * *"    //corn表达式,多久进行同步
  11.   }
  12. }
复制代码
:sql_last_value 可以设置每次查询效果中updatetime为最后的时间,作为下次增量同步的开始时间(需要对时间进行排序才能保证最后一条数据为时间最大的)。
  1. input {
  2.   jdbc {
  3.     statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE updatetime > :sql_last_value order  by updatetime desc"
  4.     use_column_value => true
  5.     tracking_column => "updatetime "
  6.     # ... other configuration bits
  7.   }
  8. }
复制代码
输失变乱:
  1. output {
  2. stdout { codec => rubydebug }
  3. elasticsearch {
  4. hosts => "127.0.0,1:9200"  //写入到ES
  5. index => "post_v1"        //ES对应的索引
  6. document_id => "%{id)"            //取数据库查询出的id作为ES中的唯一id
  7. }
  8. }
复制代码
过滤变乱:
  1. filter {
  2. mutate {
  3. rename => {
  4. "updatetime" =>"updateTime"    //给字段重命名
  5. "userid"     => "userId"
  6. "createtime" =>"createTime"
  7. "isdelete"   =>"isDelete"
  8. remove_field =>["thumbnm","favournum"]   //移除不需要同步到ES中的字段
复制代码
更多参数,可参考官方文档进行设置:https://www.elastic.co/guide/en/logstash/7.17/output-plugins.html。
5、Canal



  • Canal
优点:实时同步,实时性非常强;
原理:数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间得到消息并处理;
canal: 帮你监听 binlog,并解析 binlog 为你可以理解的内容,它伪装成了 mysql 的从节点,获取主节点给的 binlog。

参考文档:https://github.com/alibaba/canal/wiki/QuickStart
后记

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曹旭辉

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表