IT评测·应用市场-qidao123.com技术社区

标题: SpringBoot2集成Elasticsearch8(使用spring-boot-starter-data-elasticsearch) [打印本页]

作者: 耶耶耶耶耶    时间: 2025-3-25 22:09
标题: SpringBoot2集成Elasticsearch8(使用spring-boot-starter-data-elasticsearch)
写在前面

使用spring-boot-starter-data-elasticsearch集成Elasticsearch8?
What? 官方写的不支持啊?让我们来看下官方给出的版本建议。

官方地址:
https://docs.spring.io/spring-data/elasticsearch/reference/elasticsearch/versions.html
实际需求

大部分稳定的生产系统仍然使用的是SpringBoot2.x,即使使用最新的SpringBoot2.7.x,
也会发现使用spring-boot-starter-data-elasticsearch去连接Elasticsearch8也是连不上的。具体报错就不贴了,但是你说我们升级SpringBoot3?固然可以,但是jdk也得升级,
生产系统那种运行的稳定的一批的客户能同意,领导能同意么?显然是,很难。
深入源码

废话不多说,先说改造方案,就是我们本身使用自定义的类覆盖官方的默认实现。
案比方下:
  1. /*
  2. * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
  3. * or more contributor license agreements. Licensed under the Elastic License
  4. * 2.0 and the Server Side Public License, v 1; you may not use this file except
  5. * in compliance with, at your election, the Elastic License 2.0 or the Server
  6. * Side Public License, v 1.
  7. */
  8. package org.elasticsearch.action;
  9. import cn.hutool.core.util.StrUtil;
  10. import org.elasticsearch.Version;
  11. import org.elasticsearch.action.support.WriteRequest;
  12. import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
  13. import org.elasticsearch.action.support.WriteResponse;
  14. import org.elasticsearch.action.support.replication.ReplicationResponse;
  15. import org.elasticsearch.cluster.metadata.IndexMetadata;
  16. import org.elasticsearch.common.io.stream.StreamInput;
  17. import org.elasticsearch.common.io.stream.StreamOutput;
  18. import org.elasticsearch.common.io.stream.Writeable;
  19. import org.elasticsearch.common.xcontent.StatusToXContentObject;
  20. import org.elasticsearch.core.Nullable;
  21. import org.elasticsearch.index.Index;
  22. import org.elasticsearch.index.IndexSettings;
  23. import org.elasticsearch.index.seqno.SequenceNumbers;
  24. import org.elasticsearch.index.shard.ShardId;
  25. import org.elasticsearch.rest.RestStatus;
  26. import org.elasticsearch.xcontent.XContentBuilder;
  27. import org.elasticsearch.xcontent.XContentParser;
  28. import java.io.IOException;
  29. import java.io.UnsupportedEncodingException;
  30. import java.net.URLEncoder;
  31. import java.util.Locale;
  32. import java.util.Objects;
  33. import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
  34. import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
  35. import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
  36. /**
  37. * A base class for the response of a write operation that involves a single doc
  38. */
  39. public abstract class DocWriteResponse extends ReplicationResponse implements WriteResponse, StatusToXContentObject {
  40.     private static final String _SHARDS = "_shards";
  41.     private static final String _INDEX = "_index";
  42.     private static final String _TYPE = "_type";
  43.     private static final String _ID = "_id";
  44.     private static final String _VERSION = "_version";
  45.     private static final String _SEQ_NO = "_seq_no";
  46.     private static final String _PRIMARY_TERM = "_primary_term";
  47.     private static final String RESULT = "result";
  48.     private static final String FORCED_REFRESH = "forced_refresh";
  49.     /**
  50.      * An enum that represents the results of CRUD operations, primarily used to communicate the type of
  51.      * operation that occurred.
  52.      */
  53.     public enum Result implements Writeable {
  54.         CREATED(0),
  55.         UPDATED(1),
  56.         DELETED(2),
  57.         NOT_FOUND(3),
  58.         NOOP(4);
  59.         private final byte op;
  60.         private final String lowercase;
  61.         Result(int op) {
  62.             this.op = (byte) op;
  63.             this.lowercase = this.name().toLowerCase(Locale.ROOT);
  64.         }
  65.         public byte getOp() {
  66.             return op;
  67.         }
  68.         public String getLowercase() {
  69.             return lowercase;
  70.         }
  71.         public static Result readFrom(StreamInput in) throws IOException {
  72.             Byte opcode = in.readByte();
  73.             switch (opcode) {
  74.                 case 0:
  75.                     return CREATED;
  76.                 case 1:
  77.                     return UPDATED;
  78.                 case 2:
  79.                     return DELETED;
  80.                 case 3:
  81.                     return NOT_FOUND;
  82.                 case 4:
  83.                     return NOOP;
  84.                 default:
  85.                     throw new IllegalArgumentException("Unknown result code: " + opcode);
  86.             }
  87.         }
  88.         @Override
  89.         public void writeTo(StreamOutput out) throws IOException {
  90.             out.writeByte(op);
  91.         }
  92.     }
  93.     private final ShardId shardId;
  94.     private final String id;
  95.     private final String type;
  96.     private final long version;
  97.     private final long seqNo;
  98.     private final long primaryTerm;
  99.     private boolean forcedRefresh;
  100.     protected final Result result;
  101.     public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
  102.         this.shardId = Objects.requireNonNull(shardId);
  103.         this.type = StrUtil.isEmpty(type)?"_doc":type;
  104.         this.id = Objects.requireNonNull(id);
  105.         this.seqNo = seqNo;
  106.         this.primaryTerm = primaryTerm;
  107.         this.version = version;
  108.         this.result = Objects.requireNonNull(result);
  109.     }
  110.     // needed for deserialization
  111.     protected DocWriteResponse(ShardId shardId, StreamInput in) throws IOException {
  112.         this.shardId = shardId;
  113.         String typeTmp = in.readString();
  114.         type =  StrUtil.isEmpty(typeTmp)?"_doc":typeTmp;;
  115.         id = in.readString();
  116.         version = in.readZLong();
  117.         seqNo = in.readZLong();
  118.         primaryTerm = in.readVLong();
  119.         forcedRefresh = in.readBoolean();
  120.         result = Result.readFrom(in);
  121.     }
  122.     /**
  123.      * Needed for deserialization of single item requests in {@link org.elasticsearch.action.index.IndexAction} and BwC
  124.      * deserialization path
  125.      */
  126.     protected DocWriteResponse(StreamInput in) throws IOException {
  127.         super(in);
  128.         shardId = new ShardId(in);
  129.         String typeTmp = in.readString();
  130.         type =  StrUtil.isEmpty(typeTmp)?"_doc":typeTmp;;
  131.         id = in.readString();
  132.         version = in.readZLong();
  133.         if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
  134.             seqNo = in.readZLong();
  135.             primaryTerm = in.readVLong();
  136.         } else {
  137.             seqNo = UNASSIGNED_SEQ_NO;
  138.             primaryTerm = UNASSIGNED_PRIMARY_TERM;
  139.         }
  140.         forcedRefresh = in.readBoolean();
  141.         result = Result.readFrom(in);
  142.     }
  143.     /**
  144.      * The change that occurred to the document.
  145.      */
  146.     public Result getResult() {
  147.         return result;
  148.     }
  149.     /**
  150.      * The index the document was changed in.
  151.      */
  152.     public String getIndex() {
  153.         return this.shardId.getIndexName();
  154.     }
  155.     /**
  156.      * The exact shard the document was changed in.
  157.      */
  158.     public ShardId getShardId() {
  159.         return this.shardId;
  160.     }
  161.     /**
  162.      * The type of the document changed.
  163.      *
  164.      * @deprecated Types are in the process of being removed.
  165.      */
  166.     @Deprecated
  167.     public String getType() {
  168.         return this.type;
  169.     }
  170.     /**
  171.      * The id of the document changed.
  172.      */
  173.     public String getId() {
  174.         return this.id;
  175.     }
  176.     /**
  177.      * Returns the current version of the doc.
  178.      */
  179.     public long getVersion() {
  180.         return this.version;
  181.     }
  182.     /**
  183.      * Returns the sequence number assigned for this change. Returns {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if the operation
  184.      * wasn't performed (i.e., an update operation that resulted in a NOOP).
  185.      */
  186.     public long getSeqNo() {
  187.         return seqNo;
  188.     }
  189.     /**
  190.      * The primary term for this change.
  191.      *
  192.      * @return the primary term
  193.      */
  194.     public long getPrimaryTerm() {
  195.         return primaryTerm;
  196.     }
  197.     /**
  198.      * Did this request force a refresh? Requests that set {@link WriteRequest#setRefreshPolicy(RefreshPolicy)} to
  199.      * {@link RefreshPolicy#IMMEDIATE} will always return true for this. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will
  200.      * only return true here if they run out of refresh listener slots (see {@link IndexSettings#MAX_REFRESH_LISTENERS_PER_SHARD}).
  201.      */
  202.     public boolean forcedRefresh() {
  203.         return forcedRefresh;
  204.     }
  205.     @Override
  206.     public void setForcedRefresh(boolean forcedRefresh) {
  207.         this.forcedRefresh = forcedRefresh;
  208.     }
  209.     /** returns the rest status for this response (based on {@link ShardInfo#status()} */
  210.     @Override
  211.     public RestStatus status() {
  212.         return getShardInfo().status();
  213.     }
  214.     /**
  215.      * Return the relative URI for the location of the document suitable for use in the {@code Location} header. The use of relative URIs is
  216.      * permitted as of HTTP/1.1 (cf. https://tools.ietf.org/html/rfc7231#section-7.1.2).
  217.      *
  218.      * @param routing custom routing or {@code null} if custom routing is not used
  219.      * @return the relative URI for the location of the document
  220.      */
  221.     public String getLocation(@Nullable String routing) {
  222.         final String encodedIndex;
  223.         final String encodedType;
  224.         final String encodedId;
  225.         final String encodedRouting;
  226.         try {
  227.             // encode the path components separately otherwise the path separators will be encoded
  228.             encodedIndex = URLEncoder.encode(getIndex(), "UTF-8");
  229.             encodedType = URLEncoder.encode(getType(), "UTF-8");
  230.             encodedId = URLEncoder.encode(getId(), "UTF-8");
  231.             encodedRouting = routing == null ? null : URLEncoder.encode(routing, "UTF-8");
  232.         } catch (final UnsupportedEncodingException e) {
  233.             throw new AssertionError(e);
  234.         }
  235.         final String routingStart = "?routing=";
  236.         final int bufferSizeExcludingRouting = 3 + encodedIndex.length() + encodedType.length() + encodedId.length();
  237.         final int bufferSize;
  238.         if (encodedRouting == null) {
  239.             bufferSize = bufferSizeExcludingRouting;
  240.         } else {
  241.             bufferSize = bufferSizeExcludingRouting + routingStart.length() + encodedRouting.length();
  242.         }
  243.         final StringBuilder location = new StringBuilder(bufferSize);
  244.         location.append('/').append(encodedIndex);
  245.         location.append('/').append(encodedType);
  246.         location.append('/').append(encodedId);
  247.         if (encodedRouting != null) {
  248.             location.append(routingStart).append(encodedRouting);
  249.         }
  250.         return location.toString();
  251.     }
  252.     public void writeThin(StreamOutput out) throws IOException {
  253.         super.writeTo(out);
  254.         writeWithoutShardId(out);
  255.     }
  256.     @Override
  257.     public void writeTo(StreamOutput out) throws IOException {
  258.         super.writeTo(out);
  259.         shardId.writeTo(out);
  260.         writeWithoutShardId(out);
  261.     }
  262.     private void writeWithoutShardId(StreamOutput out) throws IOException {
  263.         out.writeString(type);
  264.         out.writeString(id);
  265.         out.writeZLong(version);
  266.         if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
  267.             out.writeZLong(seqNo);
  268.             out.writeVLong(primaryTerm);
  269.         }
  270.         out.writeBoolean(forcedRefresh);
  271.         result.writeTo(out);
  272.     }
  273.     @Override
  274.     public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
  275.         builder.startObject();
  276.         innerToXContent(builder, params);
  277.         builder.endObject();
  278.         return builder;
  279.     }
  280.     public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
  281.         ReplicationResponse.ShardInfo shardInfo = getShardInfo();
  282.         builder.field(_INDEX, shardId.getIndexName());
  283.         builder.field(_TYPE, type);
  284.         builder.field(_ID, id).field(_VERSION, version).field(RESULT, getResult().getLowercase());
  285.         if (forcedRefresh) {
  286.             builder.field(FORCED_REFRESH, true);
  287.         }
  288.         builder.field(_SHARDS, shardInfo);
  289.         if (getSeqNo() >= 0) {
  290.             builder.field(_SEQ_NO, getSeqNo());
  291.             builder.field(_PRIMARY_TERM, getPrimaryTerm());
  292.         }
  293.         return builder;
  294.     }
  295.     /**
  296.      * Parse the output of the {@link #innerToXContent(XContentBuilder, Params)} method.
  297.      *
  298.      * This method is intended to be called by subclasses and must be called multiple times to parse all the information concerning
  299.      * {@link DocWriteResponse} objects. It always parses the current token, updates the given parsing context accordingly
  300.      * if needed and then immediately returns.
  301.      */
  302.     protected static void parseInnerToXContent(XContentParser parser, Builder context) throws IOException {
  303.         XContentParser.Token token = parser.currentToken();
  304.         ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser);
  305.         String currentFieldName = parser.currentName();
  306.         token = parser.nextToken();
  307.         if (token.isValue()) {
  308.             if (_INDEX.equals(currentFieldName)) {
  309.                 // index uuid and shard id are unknown and can't be parsed back for now.
  310.                 context.setShardId(new ShardId(new Index(parser.text(), IndexMetadata.INDEX_UUID_NA_VALUE), -1));
  311.             } else if (_TYPE.equals(currentFieldName)) {
  312.                 context.setType(parser.text());
  313.             } else if (_ID.equals(currentFieldName)) {
  314.                 context.setId(parser.text());
  315.             } else if (_VERSION.equals(currentFieldName)) {
  316.                 context.setVersion(parser.longValue());
  317.             } else if (RESULT.equals(currentFieldName)) {
  318.                 String result = parser.text();
  319.                 for (Result r : Result.values()) {
  320.                     if (r.getLowercase().equals(result)) {
  321.                         context.setResult(r);
  322.                         break;
  323.                     }
  324.                 }
  325.             } else if (FORCED_REFRESH.equals(currentFieldName)) {
  326.                 context.setForcedRefresh(parser.booleanValue());
  327.             } else if (_SEQ_NO.equals(currentFieldName)) {
  328.                 context.setSeqNo(parser.longValue());
  329.             } else if (_PRIMARY_TERM.equals(currentFieldName)) {
  330.                 context.setPrimaryTerm(parser.longValue());
  331.             }
  332.         } else if (token == XContentParser.Token.START_OBJECT) {
  333.             if (_SHARDS.equals(currentFieldName)) {
  334.                 context.setShardInfo(ShardInfo.fromXContent(parser));
  335.             } else {
  336.                 parser.skipChildren(); // skip potential inner objects for forward compatibility
  337.             }
  338.         } else if (token == XContentParser.Token.START_ARRAY) {
  339.             parser.skipChildren(); // skip potential inner arrays for forward compatibility
  340.         }
  341.     }
  342.     /**
  343.      * Base class of all {@link DocWriteResponse} builders. These {@link DocWriteResponse.Builder} are used during
  344.      * xcontent parsing to temporarily store the parsed values, then the {@link Builder#build()} method is called to
  345.      * instantiate the appropriate {@link DocWriteResponse} with the parsed values.
  346.      */
  347.     public abstract static class Builder {
  348.         protected ShardId shardId = null;
  349.         protected String type = null;
  350.         protected String id = null;
  351.         protected Long version = null;
  352.         protected Result result = null;
  353.         protected boolean forcedRefresh;
  354.         protected ShardInfo shardInfo = null;
  355.         protected long seqNo = UNASSIGNED_SEQ_NO;
  356.         protected long primaryTerm = UNASSIGNED_PRIMARY_TERM;
  357.         public ShardId getShardId() {
  358.             return shardId;
  359.         }
  360.         public void setShardId(ShardId shardId) {
  361.             this.shardId = shardId;
  362.         }
  363.         public String getType() {
  364.             return type;
  365.         }
  366.         public void setType(String type) {
  367.             type =  StrUtil.isEmpty(type)?"_doc":type;;
  368.         }
  369.         public String getId() {
  370.             return id;
  371.         }
  372.         public void setId(String id) {
  373.             this.id = id;
  374.         }
  375.         public void setVersion(Long version) {
  376.             this.version = version;
  377.         }
  378.         public void setResult(Result result) {
  379.             this.result = result;
  380.         }
  381.         public void setForcedRefresh(boolean forcedRefresh) {
  382.             this.forcedRefresh = forcedRefresh;
  383.         }
  384.         public void setShardInfo(ShardInfo shardInfo) {
  385.             this.shardInfo = shardInfo;
  386.         }
  387.         public void setSeqNo(long seqNo) {
  388.             this.seqNo = seqNo;
  389.         }
  390.         public void setPrimaryTerm(long primaryTerm) {
  391.             this.primaryTerm = primaryTerm;
  392.         }
  393.         public abstract DocWriteResponse build();
  394.     }
  395. }
复制代码
写在最后

假如想要相识改的内容,可以找到官方源码对比下实现,然后本身DEBUG一下,
然后就可以知道Elasticsearch7和Elasticsearch8在连接时候的区别了。
假如要问为什么,答案就是可以本身动态创建索引,还可以指定字段类型,就是
这么香而已。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4