写在前面
使用spring-boot-starter-data-elasticsearch集成Elasticsearch8?
What? 官方写的不支持啊?让我们来看下官方给出的版本建议。
官方地址:
https://docs.spring.io/spring-data/elasticsearch/reference/elasticsearch/versions.html
实际需求
大部分稳定的生产系统仍然使用的是SpringBoot2.x,即使使用最新的SpringBoot2.7.x,
也会发现使用spring-boot-starter-data-elasticsearch去连接Elasticsearch8也是连不上的。具体报错就不贴了,但是你说我们升级SpringBoot3?固然可以,但是jdk也得升级,
生产系统那种运行的稳定的一批的客户能同意,领导能同意么?显然是,很难。
深入源码
废话不多说,先说改造方案,就是我们本身使用自定义的类覆盖官方的默认实现。
案比方下:
- /*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
- package org.elasticsearch.action;
- import cn.hutool.core.util.StrUtil;
- import org.elasticsearch.Version;
- import org.elasticsearch.action.support.WriteRequest;
- import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
- import org.elasticsearch.action.support.WriteResponse;
- import org.elasticsearch.action.support.replication.ReplicationResponse;
- import org.elasticsearch.cluster.metadata.IndexMetadata;
- import org.elasticsearch.common.io.stream.StreamInput;
- import org.elasticsearch.common.io.stream.StreamOutput;
- import org.elasticsearch.common.io.stream.Writeable;
- import org.elasticsearch.common.xcontent.StatusToXContentObject;
- import org.elasticsearch.core.Nullable;
- import org.elasticsearch.index.Index;
- import org.elasticsearch.index.IndexSettings;
- import org.elasticsearch.index.seqno.SequenceNumbers;
- import org.elasticsearch.index.shard.ShardId;
- import org.elasticsearch.rest.RestStatus;
- import org.elasticsearch.xcontent.XContentBuilder;
- import org.elasticsearch.xcontent.XContentParser;
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.net.URLEncoder;
- import java.util.Locale;
- import java.util.Objects;
- import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
- import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
- import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
- /**
- * A base class for the response of a write operation that involves a single doc
- */
- public abstract class DocWriteResponse extends ReplicationResponse implements WriteResponse, StatusToXContentObject {
- private static final String _SHARDS = "_shards";
- private static final String _INDEX = "_index";
- private static final String _TYPE = "_type";
- private static final String _ID = "_id";
- private static final String _VERSION = "_version";
- private static final String _SEQ_NO = "_seq_no";
- private static final String _PRIMARY_TERM = "_primary_term";
- private static final String RESULT = "result";
- private static final String FORCED_REFRESH = "forced_refresh";
- /**
- * An enum that represents the results of CRUD operations, primarily used to communicate the type of
- * operation that occurred.
- */
- public enum Result implements Writeable {
- CREATED(0),
- UPDATED(1),
- DELETED(2),
- NOT_FOUND(3),
- NOOP(4);
- private final byte op;
- private final String lowercase;
- Result(int op) {
- this.op = (byte) op;
- this.lowercase = this.name().toLowerCase(Locale.ROOT);
- }
- public byte getOp() {
- return op;
- }
- public String getLowercase() {
- return lowercase;
- }
- public static Result readFrom(StreamInput in) throws IOException {
- Byte opcode = in.readByte();
- switch (opcode) {
- case 0:
- return CREATED;
- case 1:
- return UPDATED;
- case 2:
- return DELETED;
- case 3:
- return NOT_FOUND;
- case 4:
- return NOOP;
- default:
- throw new IllegalArgumentException("Unknown result code: " + opcode);
- }
- }
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeByte(op);
- }
- }
- private final ShardId shardId;
- private final String id;
- private final String type;
- private final long version;
- private final long seqNo;
- private final long primaryTerm;
- private boolean forcedRefresh;
- protected final Result result;
- public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
- this.shardId = Objects.requireNonNull(shardId);
- this.type = StrUtil.isEmpty(type)?"_doc":type;
- this.id = Objects.requireNonNull(id);
- this.seqNo = seqNo;
- this.primaryTerm = primaryTerm;
- this.version = version;
- this.result = Objects.requireNonNull(result);
- }
- // needed for deserialization
- protected DocWriteResponse(ShardId shardId, StreamInput in) throws IOException {
- this.shardId = shardId;
- String typeTmp = in.readString();
- type = StrUtil.isEmpty(typeTmp)?"_doc":typeTmp;;
- id = in.readString();
- version = in.readZLong();
- seqNo = in.readZLong();
- primaryTerm = in.readVLong();
- forcedRefresh = in.readBoolean();
- result = Result.readFrom(in);
- }
- /**
- * Needed for deserialization of single item requests in {@link org.elasticsearch.action.index.IndexAction} and BwC
- * deserialization path
- */
- protected DocWriteResponse(StreamInput in) throws IOException {
- super(in);
- shardId = new ShardId(in);
- String typeTmp = in.readString();
- type = StrUtil.isEmpty(typeTmp)?"_doc":typeTmp;;
- id = in.readString();
- version = in.readZLong();
- if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
- seqNo = in.readZLong();
- primaryTerm = in.readVLong();
- } else {
- seqNo = UNASSIGNED_SEQ_NO;
- primaryTerm = UNASSIGNED_PRIMARY_TERM;
- }
- forcedRefresh = in.readBoolean();
- result = Result.readFrom(in);
- }
- /**
- * The change that occurred to the document.
- */
- public Result getResult() {
- return result;
- }
- /**
- * The index the document was changed in.
- */
- public String getIndex() {
- return this.shardId.getIndexName();
- }
- /**
- * The exact shard the document was changed in.
- */
- public ShardId getShardId() {
- return this.shardId;
- }
- /**
- * The type of the document changed.
- *
- * @deprecated Types are in the process of being removed.
- */
- @Deprecated
- public String getType() {
- return this.type;
- }
- /**
- * The id of the document changed.
- */
- public String getId() {
- return this.id;
- }
- /**
- * Returns the current version of the doc.
- */
- public long getVersion() {
- return this.version;
- }
- /**
- * Returns the sequence number assigned for this change. Returns {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if the operation
- * wasn't performed (i.e., an update operation that resulted in a NOOP).
- */
- public long getSeqNo() {
- return seqNo;
- }
- /**
- * The primary term for this change.
- *
- * @return the primary term
- */
- public long getPrimaryTerm() {
- return primaryTerm;
- }
- /**
- * Did this request force a refresh? Requests that set {@link WriteRequest#setRefreshPolicy(RefreshPolicy)} to
- * {@link RefreshPolicy#IMMEDIATE} will always return true for this. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will
- * only return true here if they run out of refresh listener slots (see {@link IndexSettings#MAX_REFRESH_LISTENERS_PER_SHARD}).
- */
- public boolean forcedRefresh() {
- return forcedRefresh;
- }
- @Override
- public void setForcedRefresh(boolean forcedRefresh) {
- this.forcedRefresh = forcedRefresh;
- }
- /** returns the rest status for this response (based on {@link ShardInfo#status()} */
- @Override
- public RestStatus status() {
- return getShardInfo().status();
- }
- /**
- * Return the relative URI for the location of the document suitable for use in the {@code Location} header. The use of relative URIs is
- * permitted as of HTTP/1.1 (cf. https://tools.ietf.org/html/rfc7231#section-7.1.2).
- *
- * @param routing custom routing or {@code null} if custom routing is not used
- * @return the relative URI for the location of the document
- */
- public String getLocation(@Nullable String routing) {
- final String encodedIndex;
- final String encodedType;
- final String encodedId;
- final String encodedRouting;
- try {
- // encode the path components separately otherwise the path separators will be encoded
- encodedIndex = URLEncoder.encode(getIndex(), "UTF-8");
- encodedType = URLEncoder.encode(getType(), "UTF-8");
- encodedId = URLEncoder.encode(getId(), "UTF-8");
- encodedRouting = routing == null ? null : URLEncoder.encode(routing, "UTF-8");
- } catch (final UnsupportedEncodingException e) {
- throw new AssertionError(e);
- }
- final String routingStart = "?routing=";
- final int bufferSizeExcludingRouting = 3 + encodedIndex.length() + encodedType.length() + encodedId.length();
- final int bufferSize;
- if (encodedRouting == null) {
- bufferSize = bufferSizeExcludingRouting;
- } else {
- bufferSize = bufferSizeExcludingRouting + routingStart.length() + encodedRouting.length();
- }
- final StringBuilder location = new StringBuilder(bufferSize);
- location.append('/').append(encodedIndex);
- location.append('/').append(encodedType);
- location.append('/').append(encodedId);
- if (encodedRouting != null) {
- location.append(routingStart).append(encodedRouting);
- }
- return location.toString();
- }
- public void writeThin(StreamOutput out) throws IOException {
- super.writeTo(out);
- writeWithoutShardId(out);
- }
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- super.writeTo(out);
- shardId.writeTo(out);
- writeWithoutShardId(out);
- }
- private void writeWithoutShardId(StreamOutput out) throws IOException {
- out.writeString(type);
- out.writeString(id);
- out.writeZLong(version);
- if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
- out.writeZLong(seqNo);
- out.writeVLong(primaryTerm);
- }
- out.writeBoolean(forcedRefresh);
- result.writeTo(out);
- }
- @Override
- public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
- builder.startObject();
- innerToXContent(builder, params);
- builder.endObject();
- return builder;
- }
- public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
- ReplicationResponse.ShardInfo shardInfo = getShardInfo();
- builder.field(_INDEX, shardId.getIndexName());
- builder.field(_TYPE, type);
- builder.field(_ID, id).field(_VERSION, version).field(RESULT, getResult().getLowercase());
- if (forcedRefresh) {
- builder.field(FORCED_REFRESH, true);
- }
- builder.field(_SHARDS, shardInfo);
- if (getSeqNo() >= 0) {
- builder.field(_SEQ_NO, getSeqNo());
- builder.field(_PRIMARY_TERM, getPrimaryTerm());
- }
- return builder;
- }
- /**
- * Parse the output of the {@link #innerToXContent(XContentBuilder, Params)} method.
- *
- * This method is intended to be called by subclasses and must be called multiple times to parse all the information concerning
- * {@link DocWriteResponse} objects. It always parses the current token, updates the given parsing context accordingly
- * if needed and then immediately returns.
- */
- protected static void parseInnerToXContent(XContentParser parser, Builder context) throws IOException {
- XContentParser.Token token = parser.currentToken();
- ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, parser);
- String currentFieldName = parser.currentName();
- token = parser.nextToken();
- if (token.isValue()) {
- if (_INDEX.equals(currentFieldName)) {
- // index uuid and shard id are unknown and can't be parsed back for now.
- context.setShardId(new ShardId(new Index(parser.text(), IndexMetadata.INDEX_UUID_NA_VALUE), -1));
- } else if (_TYPE.equals(currentFieldName)) {
- context.setType(parser.text());
- } else if (_ID.equals(currentFieldName)) {
- context.setId(parser.text());
- } else if (_VERSION.equals(currentFieldName)) {
- context.setVersion(parser.longValue());
- } else if (RESULT.equals(currentFieldName)) {
- String result = parser.text();
- for (Result r : Result.values()) {
- if (r.getLowercase().equals(result)) {
- context.setResult(r);
- break;
- }
- }
- } else if (FORCED_REFRESH.equals(currentFieldName)) {
- context.setForcedRefresh(parser.booleanValue());
- } else if (_SEQ_NO.equals(currentFieldName)) {
- context.setSeqNo(parser.longValue());
- } else if (_PRIMARY_TERM.equals(currentFieldName)) {
- context.setPrimaryTerm(parser.longValue());
- }
- } else if (token == XContentParser.Token.START_OBJECT) {
- if (_SHARDS.equals(currentFieldName)) {
- context.setShardInfo(ShardInfo.fromXContent(parser));
- } else {
- parser.skipChildren(); // skip potential inner objects for forward compatibility
- }
- } else if (token == XContentParser.Token.START_ARRAY) {
- parser.skipChildren(); // skip potential inner arrays for forward compatibility
- }
- }
- /**
- * Base class of all {@link DocWriteResponse} builders. These {@link DocWriteResponse.Builder} are used during
- * xcontent parsing to temporarily store the parsed values, then the {@link Builder#build()} method is called to
- * instantiate the appropriate {@link DocWriteResponse} with the parsed values.
- */
- public abstract static class Builder {
- protected ShardId shardId = null;
- protected String type = null;
- protected String id = null;
- protected Long version = null;
- protected Result result = null;
- protected boolean forcedRefresh;
- protected ShardInfo shardInfo = null;
- protected long seqNo = UNASSIGNED_SEQ_NO;
- protected long primaryTerm = UNASSIGNED_PRIMARY_TERM;
- public ShardId getShardId() {
- return shardId;
- }
- public void setShardId(ShardId shardId) {
- this.shardId = shardId;
- }
- public String getType() {
- return type;
- }
- public void setType(String type) {
- type = StrUtil.isEmpty(type)?"_doc":type;;
- }
- public String getId() {
- return id;
- }
- public void setId(String id) {
- this.id = id;
- }
- public void setVersion(Long version) {
- this.version = version;
- }
- public void setResult(Result result) {
- this.result = result;
- }
- public void setForcedRefresh(boolean forcedRefresh) {
- this.forcedRefresh = forcedRefresh;
- }
- public void setShardInfo(ShardInfo shardInfo) {
- this.shardInfo = shardInfo;
- }
- public void setSeqNo(long seqNo) {
- this.seqNo = seqNo;
- }
- public void setPrimaryTerm(long primaryTerm) {
- this.primaryTerm = primaryTerm;
- }
- public abstract DocWriteResponse build();
- }
- }
复制代码 写在最后
假如想要相识改的内容,可以找到官方源码对比下实现,然后本身DEBUG一下,
然后就可以知道Elasticsearch7和Elasticsearch8在连接时候的区别了。
假如要问为什么,答案就是可以本身动态创建索引,还可以指定字段类型,就是
这么香而已。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |