I tested several sync solutions and finally settled on the Oceanus product, whose underlying engine is Flink CDC:
1. Real-time performance is guaranteed; there is no lag even when the binlog volume is large.
2. The whole job is set up with SQL, so it is simple to operate.
The example below syncs 100 MySQL shard tables to Elasticsearch in real time, to speed up LIKE-style queries on text fields such as remarks.
Create the SQL job
```sql
CREATE TABLE from_mysql (
    id int,
    cid int NOT NULL,
    gid bigint NOT NULL,
    content varchar,
    create_time TIMESTAMP(3),
    -- source shard table name, exposed as a metadata column by mysql-cdc
    tablename STRING METADATA FROM 'table_name' VIRTUAL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql-ip',
    'port' = '3306',
    'username' = 'mysqluser',
    'password' = 'mysqlpwd',
    'database-name' = 'mysqldb',
    'debezium.snapshot.locking.mode' = 'none',
    'table-name' = 'tb_test[0-9]?[0-9]',
    'server-id' = '100-110',
    'server-time-zone' = 'Asia/Shanghai',
    'debezium.skipped.operations' = 'd',
    'debezium.snapshot.mode' = 'schema_only',
    'debezium.min.row.count.to.stream.results' = '50000'
);
```
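The `table-name` option is a fully anchored regular expression, so `tb_test[0-9]?[0-9]` covers exactly the 100 shard tables `tb_test0` through `tb_test99`. A minimal Python sketch to sanity-check the pattern offline:

```python
import re

# mysql-cdc matches 'table-name' as an anchored regex.
# 'tb_test[0-9]?[0-9]' allows one or two trailing digits: tb_test0 .. tb_test99.
pattern = re.compile(r"tb_test[0-9]?[0-9]")

matched = [f"tb_test{i}" for i in range(150) if pattern.fullmatch(f"tb_test{i}")]
print(len(matched))  # -> 100 (tb_test100 and above need three digits and do not match)
```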
```sql
CREATE TABLE to_es (
    id string,
    tableid int,
    tablename string,
    cid int NOT NULL,
    gid string NOT NULL,
    content string,
    create_time string,
    PRIMARY KEY (id, cid) NOT ENFORCED
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '7',
    'connector.hosts' = 'http://ip:9200',
    'connector.index' = 'myindex',
    'connector.document-type' = '_doc',
    'connector.username' = 'elastic',
    'connector.password' = 'password123',
    'update-mode' = 'upsert',
    'connector.key-delimiter' = '$',
    'connector.key-null-literal' = 'n/a',
    'connector.failure-handler' = 'retry-rejected',
    'connector.flush-on-checkpoint' = 'true',
    'connector.bulk-flush.max-actions' = '10000',
    'connector.bulk-flush.max-size' = '2 mb',
    'connector.bulk-flush.interval' = '2000',
    'connector.connection-max-retry-timeout' = '300',
    'format.type' = 'json'
);
```
```sql
INSERT INTO to_es
SELECT
    concat(CAST(id AS STRING), '-', CAST(mod(cid, 100) AS STRING)) AS id,
    id AS tableid,
    tablename,
    cid,
    CAST(gid AS STRING) AS gid,  -- bigint in the source, string in the sink
    content,
    DATE_FORMAT(create_time, 'yyyy-MM-dd HH:mm:ss') AS create_time
FROM from_mysql;
```
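The composite document id deserves a note: each shard table has its own auto-increment `id`, so `id` alone would collide across the 100 tables. Assuming the tables are sharded by `cid % 100` (an assumption, not stated in the job), appending `mod(cid, 100)` makes the Elasticsearch `_id` unique. The same expression as a small Python sketch:

```python
def es_doc_id(row_id: int, cid: int) -> str:
    """Mirrors concat(CAST(id AS STRING), '-', CAST(mod(cid, 100) AS STRING)).

    Assumes (hypothetically) the 100 tables are sharded by cid % 100,
    so the suffix identifies the source shard and disambiguates row ids.
    """
    return f"{row_id}-{cid % 100}"

print(es_doc_id(42, 1017))  # -> 42-17
```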
The main thing to watch here is the field types.
scan.startup.mode: "initial" (the default, which also syncs historical data) or "latest-offset" (incremental data only).
The final INSERT can also take a WHERE clause to sync only the rows you need.
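For example, a filter like the following (the `cid > 0` condition is purely illustrative) limits which rows get indexed:

```sql
INSERT INTO to_es
SELECT
    concat(CAST(id AS STRING), '-', CAST(mod(cid, 100) AS STRING)) AS id,
    id AS tableid, tablename, cid, CAST(gid AS STRING), content,
    DATE_FORMAT(create_time, 'yyyy-MM-dd HH:mm:ss')
FROM from_mysql
WHERE cid > 0;  -- illustrative filter only
```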
Elasticsearch setup
Set up the mapping, settings, and your own analyzer.
A custom analyzer is used so that punctuation, whitespace, and similar characters inside the fields all remain searchable.
```json
PUT myindex-20230314/
{
  "mappings": {
    "properties": {
      "id":         { "type": "text" },
      "tableid":    { "type": "long" },
      "cid":        { "type": "long" },
      "gid":        { "type": "text", "analyzer": "my_analyzer" },
      "content":    { "type": "text", "analyzer": "my_analyzer" },
      "create_time": { "type": "keyword" }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "10",
      "number_of_replicas": "1",
      "refresh_interval": "1s",
      "translog": {
        "sync_interval": "30s",
        "durability": "async"
      },
      "codec": "best_compression",
      "analysis": {
        "analyzer": {
          "my_analyzer": {
            "tokenizer": "my_tokenizer",
            "filter": ["lowercase"]
          }
        },
        "tokenizer": {
          "my_tokenizer": {
            "type": "ngram",
            "min_gram": 1,
            "max_gram": 2,
            "token_chars": ["letter", "digit", "whitespace", "punctuation", "symbol"]
          }
        }
      }
    }
  }
}
```
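To see why this index serves `LIKE '%...%'`-style lookups, it helps to emulate the 1–2 character ngram tokenizer. A rough Python sketch (it applies the lowercase filter but ignores the `token_chars` classes for brevity):

```python
def ngrams(text: str, min_gram: int = 1, max_gram: int = 2) -> list[str]:
    # Rough stand-in for the ES ngram tokenizer plus lowercase filter above.
    text = text.lower()
    grams = []
    for n in range(min_gram, max_gram + 1):
        grams.extend(text[i:i + n] for i in range(len(text) - n + 1))
    return grams

# A substring search like LIKE '%nk%' becomes an exact match on the indexed bigram 'nk':
print("nk" in ngrams("Flink CDC"))  # -> True
```

Because every 1- and 2-character substring is indexed, an arbitrary substring query can be answered from the inverted index instead of scanning, which is what replaces the slow MySQL LIKE.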
Use an alias to make later maintenance easier:
```json
POST /_aliases
{
  "actions": [
    { "add": { "index": "myindex-20230314", "alias": "myindex" }}
  ]
}
```
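If the mapping needs to change later, a new dated index (the name `myindex-20230401` below is hypothetical) can be swapped in atomically, so readers of the `myindex` alias never see a gap:

```json
POST /_aliases
{
  "actions": [
    { "remove": { "index": "myindex-20230314", "alias": "myindex" }},
    { "add":    { "index": "myindex-20230401", "alias": "myindex" }}
  ]
}
```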
What I had tested before:
- canal: single-process lag kept growing, and historical data had to be synced separately
- go-mysql-elasticsearch: frequently errored out and had to be re-synced from scratch
- logstash: could not figure out how to configure it for 100 shard tables
Oceanus is a paid product; it is a good reference when you are short on ops staff, while teams with spare capacity can consider self-managed Flink.
Welcome to follow my WeChat official account: 老王76. Let's improve together!