mysql实时同步到es

打印 上一主题 下一主题

主题 1020|帖子 1020|积分 3060

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
测试了多个方案同步,最终选择oceanu产品,底层基于Flink cdc
1、实时性能够保证,binlog量很大时也不产生延迟
2、设置SQL即可完成,操作上简单
下面示例mysql的100张分表实时同步到es,优化备注等文本字段的like查询
创建SQL作业
  1. CREATE TABLE from_mysql (
  2.   id int,
  3.   cid int NOT NULL,
  4.   gid bigint NOT NULL,
  5.   content varchar,
  6.   create_time TIMESTAMP(3)  ,
  7.   PRIMARY KEY (id) NOT ENFORCED
  8. ) WITH (
  9.   'connector' = 'mysql-cdc',
  10.   'hostname' = 'mysql-ip',
  11.   'port' = '3306',
  12.   'username' = 'mysqluser',
  13.   'password' = 'mysqlpwd',
  14.   'database-name' = 'mysqldb',
  15.   'debezium.snapshot.locking.mode' = 'none',
  16.   'table-name' = 'tb_test[0-9]?[0-9]',
  17.   'server-id' = '100-110',
  18.   'server-time-zone' = 'Asia/Shanghai',
  19.   'debezium.skipped.operations' = 'd',
  20.   'debezium.snapshot.mode' = 'schema_only',
  21.   'debezium.min.row.count.to.stream.results' = '50000'
  22. );
  23. CREATE TABLE to_es (
  24.   id string,
  25.   tableid int,
  26.   tablename string,
  27.   cid int NOT NULL,
  28.   gid string NOT NULL,
  29.   content string,
  30.   create_time string,
  31.   PRIMARY KEY (id,companyId) NOT ENFORCED
  32. ) WITH (
  33.    'connector.type' = 'elasticsearch',
  34.    'connector.version' = '7',
  35.    'connector.hosts' = 'http://ip:9200',
  36.    'connector.index' = 'myindex',
  37.    'connector.document-type' = '_doc',
  38.    'connector.username' = 'elastic',
  39.    'connector.password' = 'password123',
  40.    'update-mode' = 'upsert',
  41.    'connector.key-delimiter' = '$',
  42.    'connector.key-null-literal' = 'n/a',
  43.    'connector.failure-handler' = 'retry-rejected',
  44.    'connector.flush-on-checkpoint' = 'true',
  45.    'connector.bulk-flush.max-actions' = '10000',
  46.    'connector.bulk-flush.max-size' = '2 mb',
  47.    'connector.bulk-flush.interval' = '2000',
  48.    'connector.connection-max-retry-timeout' = '300',
  49.    'format.type' = 'json'
  50. );
  51. INSERT INTO to_es
  52. SELECT
  53. concat(CAST(id as string),'-',CAST(mod(cid,100) AS VARCHAR)) as id,
  54. id tableid,
  55. tablename,
  56. cid,
  57. gid,
  58. content,
  59. DATE_FORMAT(create_time, 'yyyy-MM-dd HH:mm:ss') as create_time
  60. from from_mysql
复制代码
这里主要留意字段类型
scan.startup.mode:"initial"(默认,同步汗青数据),"latest-offset" 同步增量数据
最后insert可以加where,只同步需要的行数据
es设置

设置好mapping、setting和自己的分词器

利用自字义分词是由于字段中全部涉及的标点符号、空格等都可以来检索
  1. PUT myindex-20230314/
  2. {
  3.   "mappings": {
  4.     "properties": {
  5.       "id":{
  6.         "type": "text"
  7.       },
  8.       "tableid":{
  9.         "type": "long"
  10.       },
  11.       "cid":{
  12.         "type": "long"
  13.       },
  14.       "gid":{
  15.         "type": "text",
  16.     "analyzer": "my_analyzer"
  17.       },
  18.       "content":{
  19.         "type": "text",
  20.     "analyzer": "my_analyzer"
  21.       },
  22.                         "create_time" : {
  23.           "type" : "keyword"
  24.         }
  25.     }
  26.   },
  27.   "settings": {
  28.     "index":{
  29.       "number_of_shards": "10",
  30.       "number_of_replicas": "1",
  31.       "refresh_interval" : "1s",
  32.       "translog": {
  33.         "sync_interval": "30s",
  34.         "durability": "async"
  35.       },
  36.       "codec": "best_compression",   
  37.     "analysis": {
  38.       "analyzer": {
  39.         "my_analyzer": {
  40.           "tokenizer": "my_tokenizer",
  41.           "filter": [
  42.             "lowercase"
  43.           ]
  44.         }
  45.       },
  46.       "tokenizer": {
  47.         "my_tokenizer": {
  48.           "type": "ngram",
  49.           "min_gram": 1,
  50.           "max_gram": 2,
  51.           "token_chars": [
  52.             "letter",
  53.             "digit","whitespace","punctuation","symbol"
  54.           ]
  55.         }
  56.       }
  57.     }
  58.     }
  59.   }
  60. }
复制代码
利用别名,方便后续的维护
  1. POST /_aliases
  2. {
  3.     "actions": [
  4.         { "add":    { "index": "myindex-20230314", "alias": "myindex" }}
  5.     ]
  6. }
复制代码
之前测试的

  • canal单进程延迟越来越大,单独设置汗青数据同步
  • go-mysql-elasticsearch经常报错重新同步
  • logstash同步100张分表不知道怎么设置

oceanus是收费的对于运维人员不敷的情况,可以参考,有精力的可以思量flink。
欢迎关注我的公众号:老王76。一起进步吧!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立聪堂德州十三局店

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表