ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

03-springboot整合elasticsearch-源码初识

2020-06-18 17:58:23  阅读:234  来源: 互联网

标签:03 springboot index indexRequest request 源码 query new response


    前面两个小节已经知道了spring boot怎么整合es,以及es的简单使用,但是springboot中是怎么和es服务器交互的。我们可以简单了解一下。要看一下源码
在看源码的同时,先要对springboot请求ES服务器的原理了解一下,ES官网(https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-update.html)给出了很详细的说明,可以自行进行了解。

1.RestClient es交互的基础服务器

对于单机es,一般使用的是ElasticsearchOperations

1.1 数据存储的具体过程

本质上还是使用ElasticsearchRestTemplate进行操作。
数据存储发起操作

1.存储数据
String documentId = operations.index(indexQuery,indexCoordinates);

2.index操作的过程
@Override
	public String index(IndexQuery query, IndexCoordinates index) {

		maybeCallbackBeforeConvertWithQuery(query, index);//实体类再保存操作之前的回调方法

		IndexRequest request = requestFactory.indexRequest(query, index);//==获取indexRequest的过程==
		String documentId = execute(client -> client.index(request, RequestOptions.DEFAULT).getId());//==与ES服务器交互过程==

		// We should call this because we are not going through a mapper.
		Object queryObject = query.getObject();
		if (queryObject != null) {
			setPersistentEntityId(queryObject, documentId);
		}

		maybeCallbackAfterSaveWithQuery(query, index);//实体类再保存之后的回调方法

		return documentId;
	}

查看IndexRequest的创建过程如下

	public IndexRequest indexRequest(IndexQuery query, IndexCoordinates index) {
		String indexName = index.getIndexName();

		IndexRequest indexRequest;

		if (query.getObject() != null) {
			String id = StringUtils.isEmpty(query.getId()) ? getPersistentEntityId(query.getObject()) : query.getId();
			// If we have a query id and a document id, do not ask ES to generate one.
			if (id != null) {
				indexRequest = new IndexRequest(indexName).id(id);
			} else {
				indexRequest = new IndexRequest(indexName);
			}
                        /**
                         * 1.将传来的object转成Map,再转成json串
                         * 2.将Object的json串转成字节BytesReference,请求的ContentType设置为Request.JSON方式
                         */     
			indexRequest.source(elasticsearchConverter.mapObject(query.getObject()).toJson(), Requests.INDEX_CONTENT_TYPE);
		} 
                // 省略一部分代码。。。。。
		return indexRequest;
	}

请求数据处理过程

public final IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
        return performRequestAndParseEntity(indexRequest, RequestConverters::index, options, IndexResponse::fromXContent, emptySet());
    }

1.RequestConverters::index 这个过程创建Request对象

static Request index(IndexRequest indexRequest) {
        String method = Strings.hasLength(indexRequest.id()) ? HttpPut.METHOD_NAME : HttpPost.METHOD_NAME; //根据有无ID选择传输方式是PUT还是POST
        boolean isCreate = (indexRequest.opType() == DocWriteRequest.OpType.CREATE);
        //拼接请求的uri
        String endpoint = endpoint(indexRequest.index(), indexRequest.type(), indexRequest.id(), isCreate ? "_create" : null);
        Request request = new Request(method, endpoint);
        //增加request的请求参数    
        Params parameters = new Params(request);
        parameters.withRouting(indexRequest.routing());
        parameters.withParent(indexRequest.parent());
        parameters.withTimeout(indexRequest.timeout());
        parameters.withVersion(indexRequest.version());
        parameters.withVersionType(indexRequest.versionType());
        parameters.withIfSeqNo(indexRequest.ifSeqNo());
        parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
        parameters.withPipeline(indexRequest.getPipeline());
        parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
        parameters.withWaitForActiveShards(indexRequest.waitForActiveShards(), ActiveShardCount.DEFAULT);
        //将请求的参数变成byte[]    
        BytesRef source = indexRequest.source().toBytesRef();
        ContentType contentType = createContentType(indexRequest.getContentType());
        request.setEntity(new NByteArrayEntity(source.bytes, source.offset, source.length, contentType));
        return request;
    }

2.创建response

3.创建空的集合 : new EmptySet<>();

4. 给ES服务器发送数据
 
  private <Req, Resp> Resp internalPerformRequest(Req request,
                                            CheckedFunction<Req, Request, IOException> requestConverter,
                                            RequestOptions options,
                                            CheckedFunction<Response, Resp, IOException> responseConverter,
                                            Set<Integer> ignores) throws IOException {
        Request req = requestConverter.apply(request);
        req.setOptions(options);
        Response response;
        try {
            response = client.performRequest(req);
        } catch (ResponseException e) {
            if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) {
                try {
                    return responseConverter.apply(e.getResponse());
                } catch (Exception innerException) {
                    throw parseResponseException(e);
                }
            }
            throw parseResponseException(e);
        }

        try {
            return responseConverter.apply(response);
        } catch(Exception e) {
            throw new IOException("Unable to parse response body for " + response, e);
        }
    }

RestClien请求发送的过程

1.发送请求过程
 public Response performRequest(Request request) throws IOException {
        SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
        performRequestAsyncNoCatch(request, listener);
        return listener.get();
    }
  //创建请求的url,创建request对象
 void performRequestAsyncNoCatch(Request request, ResponseListener listener) throws IOException {
        Map<String, String> requestParams = new HashMap<>(request.getParameters());
        String ignoreString = requestParams.remove("ignore");
        Set<Integer> ignoreErrorCodes;
        if (ignoreString == null) {
            if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
                //404 never causes error if returned for a HEAD request
                ignoreErrorCodes = Collections.singleton(404);
            } else {
                ignoreErrorCodes = Collections.emptySet();
            }
        } else {
            String[] ignoresArray = ignoreString.split(",");
            ignoreErrorCodes = new HashSet<>();
            if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
                //404 never causes error if returned for a HEAD request
                ignoreErrorCodes.add(404);
            }
            for (String ignoreCode : ignoresArray) {
                try {
                    ignoreErrorCodes.add(Integer.valueOf(ignoreCode));
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
                }
            }
        }
        URI uri = buildUri(pathPrefix, request.getEndpoint(), requestParams);//创建url和请求参数拼接
        HttpRequestBase httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());//创建request对象
        setHeaders(httpRequest, request.getOptions().getHeaders());
        FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
        long startTime = System.nanoTime();
        performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
                request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(),
                request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);//发送过程
    }

  //数据发送,完成后将响应信息封装进response对象中
 private void performRequestAsync(final long startTime, final NodeTuple<Iterator<Node>> nodeTuple, final HttpRequestBase request,
                                     final Set<Integer> ignoreErrorCodes,
                                     final WarningsHandler thisWarningsHandler,
                                     final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                     final FailureTrackingResponseListener listener) {
        final Node node = nodeTuple.nodes.next(); //获取注册的节点
        final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(node.getHost(), request);
        final HttpAsyncResponseConsumer<HttpResponse> asyncResponseConsumer =
            httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer();
        final HttpClientContext context = HttpClientContext.create();
        context.setAuthCache(nodeTuple.authCache);
        client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse httpResponse) { //执行完成后的回调方法
                try {
                    RequestLogger.logResponse(logger, request, node.getHost(), httpResponse);
                    int statusCode = httpResponse.getStatusLine().getStatusCode();
                    Response response = new Response(request.getRequestLine(), node.getHost(), httpResponse);
                    if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
                        onResponse(node);
                        if (thisWarningsHandler.warningsShouldFailRequest(response.getWarnings())) {
                            listener.onDefinitiveFailure(new WarningFailureException(response));
                        } else {
                            listener.onSuccess(response);
                        }
                    } else {
                        ResponseException responseException = new ResponseException(response);
                        if (isRetryStatus(statusCode)) {
                            //mark host dead and retry against next one
                            onFailure(node);
                            retryIfPossible(responseException);
                        } else {
                            //mark host alive and don't retry, as the error should be a request problem
                            onResponse(node);
                            listener.onDefinitiveFailure(responseException);
                        }
                    }
                } catch(Exception e) {
                    listener.onDefinitiveFailure(e);
                }
            }
        });
    }



标签:03,springboot,index,indexRequest,request,源码,query,new,response
来源: https://www.cnblogs.com/perferect/p/13157229.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有