Elasticsearch不停机切换(上云)方案

  论坛元老 | 2024-9-23 09:35:30 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1737|帖子 1737|积分 5211

怎样给飞行中的飞机换引擎?
   配景



  • 业务配景



  • 技术配景

    • 线下集群40个索引左右,总数据量不大,不到100G
    • 因为ES承担的业务鉴权业务,以是不能接受停机割接

      • 尚有就是ES中数据来自各个业务方,推送的时机不定,也没有完备的重推机制,以是不能停机割接

    • 索引中根本都没有创建或者更新时间字段,纵然部分有,也没有用起来

      • 也就无法使用logstash的增量同步功能。

    • 盼望不进行业务改造,直接替换。
    • 虽然服务分为了读写服务,但通过读服务还是可以调用写入的API,通过写服务也可以调用读的API。

架构方案



  • 全量数据同步logstash
  • 脚步比对出来的差别数据,脚步补数

注意:


  • CLB及署理层的配置一定有冗余
  • 如果个CLB支撑不了,可以思量

    • 方式一:直接申请多个CLB,并将这多个CLB的地点配置到应用中
    • 方式二:先申请一个EIP,在EIP的后面配置多个CLB,这样应用只配置一个EIP的地点就可以了
    • 方式三:CLB直接升配到NLB

  • CLB文档
  • NLB文档
  • 准备两套CLB及署理层的缘故原由是:署理层是个Nginx集群,手动一台一台更新配置然后reload很慢,这时候数据写入的主ES是不确定的。
比对核心逻辑



  • 获取线下集群所有索引(跳过系统以是及不需要迁徙的索引)
  • 遍历第一步获取到的索引聚集

    • 获取线上、线下索引的文档总数,如果总数不一样,终止比对;
    • 如果总数一样,则通过search after(需要)分页分别从线上、线下获取数据比对。

注意:search_after的排序字段聚集有几个要求

  • 如果_id就是业务ID,则直接使用该字段;
  • 如果_id是ES自动生成的ID,则需要使用业务ID字段来排序(需要保证该业务ID索引内部不重复;如果不能保证,则需要添加其他字段来保证唯一;保证唯一的目的就是比对的两个索引在相同位置的文档就应该是一样的,不一样就是有问题);
  • 如果无法找到能构建复合主键的字段,则需要将索引数据完备的拉到内存中,然后根据mapping将所有字段拼接构建组合ID,然后去重,再依次比对。(索引条数不一样的,也可以通过类似的方式来查找非常的缘故原由;采取这种简朴粗暴方式的缘故原由是:1、我们这种范例索引的数据量不大 2、这个比对程序其实就是个临时的工具,不会长期使用)
   模板、mapping、index setting这些都需要比对。
  比对核心代码

MapFlatUtil.java
  1. import java.util.*;
  2. /**
  3. * @Author jiankunking
  4. * @Date 2024/9/4 17:13
  5. * @Description:
  6. */
  7. public class MapFlatUtil {
  8.     static String PREFIX = ".";
  9.     public static Map<String, Object> flat(Map<String, Object> map) {
  10.         Map<String, Object> configMap = new LinkedHashMap<>();
  11.         map.entrySet().forEach(entry -> {
  12.             if (entry.getValue() instanceof Map) {
  13.                 Map<String, Object> subMap = flat(entry.getKey(), (Map<String, Object>) entry.getValue());
  14.                 if (!subMap.isEmpty()) {
  15.                     configMap.putAll(subMap);
  16.                 }
  17.             } else if (entry.getValue() instanceof List) {
  18.                 configMap.put(entry.getKey(), entry.getValue());
  19.             } else {
  20.                 configMap.put(entry.getKey(), entry.getValue() == null ? "" : String.valueOf(entry.getValue()));
  21.             }
  22.         });
  23.         return configMap;
  24.     }
  25.     private static Map<String, Object> flat(String parentNode, Map<String, Object> source) {
  26.         Map<String, Object> flatMap = new LinkedHashMap<>();
  27.         Set<Map.Entry<String, Object>> set = source.entrySet();
  28.         set.forEach(entity -> {
  29.             Object value = entity.getValue();
  30.             String key = entity.getKey();
  31.             String newKey = parentNode + PREFIX + key;
  32.             if (value instanceof Map) {
  33.                 flatMap.putAll(flat(newKey, (Map<String, Object>) value));
  34.             } else if (value instanceof List) {
  35.                 flatMap.put(newKey, value);
  36.             } else {
  37.                 flatMap.put(newKey, value == null ? "" : String.valueOf(value));
  38.             }
  39.         });
  40.         return flatMap;
  41.     }
  42. }
复制代码
MapCompareUtil.java
  1. import com.fasterxml.jackson.core.JsonProcessingException;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import lombok.extern.slf4j.Slf4j;
  4. import java.util.ArrayList;
  5. import java.util.Comparator;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.stream.Collectors;
  9. import static com.jiankunking.branchcompare.es.SortUtil.mapComparator;
  10. /**
  11. * @Author jiankunking
  12. * @Date 2024/9/14 9:48
  13. * @Description:
  14. */
  15. @Slf4j
  16. public class MapCompareUtil {
  17.     public static boolean isMapEquals(Map<String, Object> offlineMap, Map<String, Object> onlineMap) throws JsonProcessingException {
  18.         offlineMap = MapFlatUtil.flat(offlineMap);
  19.         onlineMap = MapFlatUtil.flat(onlineMap);
  20.         if (offlineMap.size() != onlineMap.size()) {
  21.             return false;
  22.         }
  23.         for (Map.Entry<String, Object> offlineEntry : offlineMap.entrySet()) {
  24.             String offlineEntryKey = offlineEntry.getKey();
  25.             if (!onlineMap.containsKey(offlineEntryKey)) {
  26.                 return false;
  27.             }
  28.             Object offlineEntryValue = offlineEntry.getValue();
  29.             Object onlineEntryValue = onlineMap.get(offlineEntryKey);
  30.             Class offlineEntryValueClass = offlineEntryValue.getClass();
  31.             Class onlineEntryValueClass = onlineEntryValue.getClass();
  32.             if (offlineEntryValueClass != onlineEntryValueClass) {
  33.                 log.warn("value type not equals,offlineEntryValue:" + offlineEntryValueClass.getName() + ",onlineEntryValue:" + onlineEntryValueClass.getName());
  34.                 return false;
  35.             }
  36.             if (offlineEntryValue instanceof Map) {
  37.                 Map<String, Object> offlineMapValue = (Map<String, Object>) offlineEntryValue;
  38.                 Map<String, Object> onlineMapValue = (Map<String, Object>) onlineEntryValue;
  39.                 if (!isMapEquals(offlineMapValue, onlineMapValue)) {
  40.                     return false;
  41.                 }
  42.                 continue;
  43.             } else if (offlineEntryValue instanceof List) {
  44.                 List<Object> offlineList = (List<Object>) offlineEntryValue;
  45.                 List<Object> onlineList = (List<Object>) onlineEntryValue;
  46.                 if (offlineList.size() != onlineList.size()) {
  47.                     log.warn("list size not equals,offlineList:" + offlineList.size() + ",onlineList:" + onlineList.size());
  48.                     return false;
  49.                 }
  50.                 // List<Map>
  51.                 if (!offlineList.isEmpty() && offlineList.get(0) instanceof Map) {
  52.                     List<Map<String, Object>> offlineEntryValueTmp = (List<Map<String, Object>>) offlineEntryValue;
  53.                     List<Map<String, Object>> onlineEntryValueTmp = (List<Map<String, Object>>) onlineEntryValue;
  54.                     List<SortUtil.Sort> sorts = new ArrayList<>();
  55.                     // 按照map 的key 排序
  56.                     for (Map.Entry<String, Object> entry : offlineEntryValueTmp.get(0).entrySet()) {
  57.                         sorts.add(new SortUtil.Sort(entry.getKey(), SortUtil.Order.ASC));
  58.                     }
  59.                     List<Map<String, Object>> offlineEntryValueSorted = offlineEntryValueTmp.stream()
  60.                                                                                             .sorted(mapComparator(sorts))
  61.                                                                                             .collect(Collectors.toList());
  62.                     List<Map<String, Object>> onlineEntryValueSorted = onlineEntryValueTmp.stream()
  63.                                                                                           .sorted(mapComparator(sorts))
  64.                                                                                           .collect(Collectors.toList());
  65.                     for (int i = 0; i < offlineEntryValueSorted.size(); i++) {
  66.                         Object offlineListItem = offlineEntryValueSorted.get(i);
  67.                         Object onlineListItem = onlineEntryValueSorted.get(i);
  68.                         if (!isMapEquals((Map<String, Object>) offlineListItem, (Map<String, Object>) onlineListItem)) {
  69.                             return false;
  70.                         }
  71.                     }
  72.                 } else {
  73.                     // List<简单类型>
  74.                     offlineList.sort(Comparator.comparing(o -> o.toString()));
  75.                     onlineList.sort(Comparator.comparing(o -> o.toString()));
  76.                     for (int i = 0; i < offlineList.size(); i++) {
  77.                         Object offlineListItem = offlineList.get(i);
  78.                         Object onlineListItem = onlineList.get(i);
  79.                         if (!simpleObjectEquals(offlineListItem, onlineListItem)) {
  80.                             log.warn("list item not equals,offlineListItem:" + offlineListItem + ",onlineListItem:" + onlineListItem);
  81.                             return false;
  82.                         }
  83.                     }
  84.                 }
  85.                 continue;
  86.             }
  87.             if (!simpleObjectEquals(offlineEntryValue, onlineEntryValue)) {
  88.                 log.warn("map value not equals,offlineEntryValue:" + offlineEntryValue + ",onlineEntryValue:" + onlineEntryValue);
  89.                 return false;
  90.             }
  91.         }
  92.         return true;
  93.     }
  94.     // 只能处理简单对象 不能处理Map List等复杂类型
  95.     private static boolean simpleObjectEquals(Object o1, Object o2) throws JsonProcessingException {
  96.         String offlineJson = new ObjectMapper().writeValueAsString(o1);
  97.         String onlineJson = new ObjectMapper().writeValueAsString(o2);
  98.         if (offlineJson.equals(onlineJson)) {
  99.             return true;
  100.         }
  101.         return false;
  102.     }
  103. }
复制代码
SortUtil.java
  1. import java.math.BigDecimal;
  2. import java.util.*;
  3. import java.util.stream.Collectors;
  4. /**
  5. * @Author jiankunking
  6. * @Date 2024/9/5 14:00
  7. * @Description: https://gist.github.com/IOsetting/25ca8d70c12c11390113d343f666cd6e
  8. */
  9. public class SortUtil {
  10.     public enum Order {ASC, DESC}
  11.     /**
  12.      * @param sorts keys and sort direction
  13.      * @return sorted list
  14.      */
  15.     public static Comparator<Map<String, Object>> mapComparator(List<Sort> sorts) {
  16.         return (o1, o2) -> {
  17.             int ret = 0;
  18.             for (Sort sort : sorts) {
  19.                 Object v1 = o1.get(sort.field);
  20.                 Object v2 = o2.get(sort.field);
  21.                 ret = singleCompare(v1, v2, sort.order == Order.ASC);
  22.                 if (ret != 0) {
  23.                     break;
  24.                 }
  25.             }
  26.             return ret;
  27.         };
  28.     }
  29.     public static class Sort {
  30.         public String field;
  31.         public Order order;
  32.         public Sort(String field, Order order) {
  33.             this.field = field;
  34.             this.order = order;
  35.         }
  36.     }
  37.     private static int singleCompare(Object ao, Object bo, boolean asc) {
  38.         int ret;
  39.         if (ao == null && bo == null) {
  40.             ret = 0;
  41.         } else if (ao == null) {
  42.             ret = -1;
  43.         } else if (bo == null) {
  44.             ret = 1;
  45.         } else if (ao instanceof BigDecimal) {
  46.             ret = ((BigDecimal) ao).compareTo((BigDecimal) bo);
  47.         } else if (ao instanceof Number) {
  48.             if (((Number) ao).doubleValue() != ((Number) bo).doubleValue()) {
  49.                 ret = ((Number) ao).doubleValue() > ((Number) bo).doubleValue() ? 1 : -1;
  50.             } else {
  51.                 ret = 0;
  52.             }
  53.         } else if (ao instanceof Date) {
  54.             ret = ((Date) ao).compareTo((Date) bo);
  55.         } else {
  56.             ret = String.valueOf(ao).compareTo(String.valueOf(bo));
  57.         }
  58.         if (!asc) {
  59.             return -ret;
  60.         }
  61.         return ret;
  62.     }
  63.     public static void main(String[] args) {
  64.         List<Map<String, Object>> list = new ArrayList<>();
  65.         List<Sort> sorts = new ArrayList<>();
  66.         List<Map<String, Object>> sorted = list.stream()
  67.                                                .sorted(mapComparator(sorts))
  68.                                                .collect(Collectors.toList());
  69.         for (Map<String, Object> map : sorted) {
  70.             System.out.println(map.get("somekey"));
  71.         }
  72.     }
  73. }
复制代码
EsQueryUtil.java
  1. public static SearchResponse searchAfterByMultiFields(RestHighLevelClient restHighLevelClient, String indexName, List<String> searchAfterSortFields, List<Object> searchAfterValues, int size) throws IOException {
  2.         SearchSourceBuilder builder = new SearchSourceBuilder();
  3.         builder.size(size);
  4.         builder.trackTotalHits(true);
  5.         builder.query(QueryBuilders.matchAllQuery());
  6.         // USING SEARCH AFTER
  7.         if (searchAfterValues != null && !searchAfterValues.isEmpty()) {
  8.             builder.searchAfter(searchAfterValues.toArray());
  9.         }
  10.         for (String sortField : searchAfterSortFields) {
  11.             builder.sort(sortField, SortOrder.ASC);
  12.         }
  13.         SearchRequest searchRequest = new SearchRequest();
  14.         searchRequest.indices(indexName);
  15.         searchRequest.source(builder);
  16.         // log.info(searchRequest.toString());
  17.         log.info(searchRequest.source().toString());
  18.         SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  19.         return response;
  20.     }
  21.     static List<Object> getSearchAfterValues(List<String> searchAfterSortFields, SearchHit hit) {
  22.         List<Object> searchAfterValues = new ArrayList<>(searchAfterSortFields.size());
  23.         Map<String, Object> map = hit.getSourceAsMap();
  24.         for (String field : searchAfterSortFields) {
  25.             if (field.equals("_id")) {
  26.                 searchAfterValues.add(hit.getId());
  27.             } else {
  28.                 searchAfterValues.add(map.get(field));
  29.             }
  30.         }
  31.         return searchAfterValues;
  32.     }
复制代码
反思



  • 要拉通全流程及相关人员,查对每个可能出现的问题及应对方案
  • 有些东西不能因为是临时的就放松鉴戒性

    • 好比本次署理层申请的机器是有两块的盘:1、一个50G的系统盘 2、一个500G的数据盘;但最终落地的时候云厂商同砚还是把nginx的访问日记落到了系统盘,导致系统盘满了,系统受到的影响。

      • 这个500G的盘其时还讨论过,要用来存储访问日记,防止机器磁盘写满。

    • 任务列表也梳理了署理层遇到问题要发送告警,但没有一一核实,导致系统盘满的时候,没有第一时间收到告警。
    • 只要是在核心链路上的,不管是不是临时的,必须一一测试、验证。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表