ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Es7.x使用RestHighLevelClient进行增删改和批量操作

2022-01-24 09:04:28  阅读:1068  来源: 互联网

标签:Es7 builder void indexRequest static 增删 new RestHighLevelClient id


  1. 引入依赖
  2. 初始化RestHighLevelClient和BulkProcessor对象
  3. 增删改操作
    3.1 数据准备
    3.2 单条数据异步插入
    3.3 单条数据同步插入
    3.4 批量插入
    3.5 更新操作
    3.6 带条件的更新语句
    3.7 批量更新
    3.8 删除操作
    3.9 条件删除

Java层面操作elasticSearch7.x,为了便于操作,不集成Spring,使用main方法进行调用。

1. 引入依赖

        <!--解决:java.lang.NoClassDefFoundError: org/elasticsearch/common/xcontent/DeprecationHandler-->
        <!-- elasticsearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.5.1</version>
        </dependency>

        <!-- elasticsearch-rest-client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.5.1</version>
        </dependency>

        <!-- elasticsearch-rest-high-level-client -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.5.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

2. 初始化RestHighLevelClient和BulkProcessor对象

RestHighLevelClientRestHighLevelClient是官方指定的连接API。另外一个是TransportClient,但是TransportClient这个是已经废弃不用的,所以会在ES8.0之后完全移除,也就是说8.0之后就无法使用了。

@Slf4j
public class EsTest {

    //es操作客户端
    private static RestHighLevelClient restHighLevelClient;
    //批量操作的对象
    private static BulkProcessor bulkProcessor;

    static {
        List<HttpHost> httpHosts = new ArrayList<>();
        //填充数据
        httpHosts.add(new HttpHost("172.26.17.11", 9200));
        httpHosts.add(new HttpHost("172.26.17.11", 9201));
        httpHosts.add(new HttpHost("172.26.17.11", 9202));
        //填充host节点
        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));

        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(1000);
            requestConfigBuilder.setSocketTimeout(1000);
            requestConfigBuilder.setConnectionRequestTimeout(1000);
            return requestConfigBuilder;
        });

        //填充用户名密码
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("userName", "password"));

        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(30);
            httpClientBuilder.setMaxConnPerRoute(30);
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            return httpClientBuilder;
        });

        restHighLevelClient = new RestHighLevelClient(builder);
    }

    static {
        bulkProcessor=createBulkProcessor();
    }

    private static BulkProcessor createBulkProcessor() {

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  BulkResponse response) {
                if (!response.hasFailures()) {
                    log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
                } else {
                    BulkItemResponse[] items = response.getItems();
                    for (BulkItemResponse item : items) {
                        if (item.isFailed()) {
                            log.info("2. 【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
                            break;
                        }
                    }
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  Throwable failure) {

                List<DocWriteRequest<?>> requests = request.requests();
                List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
                log.error("3. 【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
            }
        };

        BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
            restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
        }), listener);
        //到达10000条时刷新
        builder.setBulkActions(10000);
        //内存到达8M时刷新
        builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
        //设置的刷新间隔10s
        builder.setFlushInterval(TimeValue.timeValueSeconds(10));
        //设置允许执行的并发请求数。
        builder.setConcurrentRequests(8);
        //设置重试策略
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
        return builder.build();
    }
}

整个项目可以共用一个BulkProcessor,可以配置多种刷新策略,将数据由内存刷新到es中。

3. 增删改操作

3.1 数据准备

PUT test_demo

PUT test_demo/_mapping
{
  "properties":{
    "title":{
      "type":"text"
    },
    "tag":{
      "type":"keyword"
    },
    "publishTime":{
      "type":"date",
      "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
    }
  }
}

GET test_demo/_search
{
  "query": {
    "match_all": {}
  }
}

3.2 单条数据异步插入

    public static void testAsyncSingle() {
        IndexRequest indexRequest = new IndexRequest("test_demo");
        DemoDto demoDto = new DemoDto(2001L, "印度新冠疫情失控", "世界", new Date());
        indexRequest.source(JSON.toJSONString(demoDto), XContentType.JSON);
        indexRequest.timeout(TimeValue.timeValueSeconds(1));
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        //数据为存储而不是更新
        indexRequest.create(false);
        indexRequest.id(demoDto.getId() + "");
        restHighLevelClient.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        log.error("将id为:{}的数据存入ES时存在失败的分片,原因为:{}", indexRequest.id(), failure.getCause());
                    }
                }
            }

            @Override
            public void onFailure(Exception e) {
                log.error("{}:存储es时异常,数据信息为", indexRequest.id(), e);
            }
        });
    }

3.3 单条数据同步插入

    public static void testSingleAdd() throws IOException {
        IndexRequest indexRequest = new IndexRequest("test_demo");
        DemoDto demoDto = new DemoDto(3001L, "es单数据同步插入2", "IT", new Date());
        indexRequest.source(JSON.toJSONString(demoDto), XContentType.JSON);
        indexRequest.id("3001");
        indexRequest.timeout(TimeValue.timeValueSeconds(1));
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        indexRequest.create(true);
        indexRequest.id(demoDto.getId() + "");
        restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    }
  1. indexRequest.id(demoDto.getId() + ""); —— 填充"_id"字段。

  2. indexRequest.create(true);——设置操作类型
    public IndexRequest create(boolean create) {
        if (create) {
            return opType(OpType.CREATE);
        } else {
            return opType(OpType.INDEX);
        }
    }
  • OpType.CREATE:当存在相同的_id时,插入会出现异常;
  • OpType.INDEX:当存在相同_id时,插入会进行覆盖;

当设置OpType.CREATE时相同id插入异常看出,es进行了乐观锁控制并发写冲突。

Elasticsearch exception [type=version_conflict_engine_exception, reason=[3001]: version conflict, document already exists (current version [3])]

3.4 批量插入

由于设置了BulkProcessor对象,可以将数据设置到BulkProcessor对象中,根据策略批量的刷新到Es中。

    /**
     * 批量插入
     */
    public static void testBatch() {
        List<IndexRequest> indexRequests = new ArrayList<>();

        ArrayList<DemoDto> demoDtos = new ArrayList<>();


        demoDtos.add(new DemoDto(1001L, "中国是中国人的中国", "中国", new Date()));
        demoDtos.add(new DemoDto(1002L, "2008年奥运会", "体育", new Date()));


        demoDtos.forEach(e -> {
            IndexRequest request = new IndexRequest("test_demo");
            //填充id
            request.id(e.getId() + "");
            //先不修改id
            request.source(JSON.toJSONString(e), XContentType.JSON);
            request.opType(DocWriteRequest.OpType.CREATE);
            indexRequests.add(request);
        });
        indexRequests.forEach(bulkProcessor::add);
    }

3.5 更新操作

更新操作传入的doc为map对象,而不是json字符串,否则会抛出异常。

    public static void testSingleUpdate() throws IOException {

        UpdateRequest updateRequest = new UpdateRequest("test_demo", "3001");

        Map<String, Object> kvs = new HashMap<>();
        kvs.put("title", "es单数据更新啦!");
        updateRequest.doc(kvs);
        updateRequest.timeout(TimeValue.timeValueSeconds(1));
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        //数据为存储而不是更新
        restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
    }

3.6 带条件的更新语句

    public static void testSingleUpdateQuery() throws IOException {


        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
        updateByQueryRequest.indices("test_demo");

        updateByQueryRequest.setQuery(new TermQueryBuilder("id", 3001));

        updateByQueryRequest.setScript(new Script(ScriptType.INLINE,
                "painless",
                "ctx._source.tag='电脑'", Collections.emptyMap()));
        //数据为存储而不是更新
        restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
    }

3.7 批量更新

    /**
     * 批量更新
     */
    private static void testBatchUpdate() {

        List<UpdateRequest> updateRequests=new ArrayList<>();

        //更新的数据
        List<DemoDto> params=new ArrayList<>();
        params.add(new DemoDto(2001L));
        params.add(new DemoDto(3001L));

        params.forEach(e->{
            //获取id
            UpdateRequest updateRequest = new UpdateRequest();
            updateRequest.index("test_demo");
            //更新的id
            updateRequest.id(e.getId()+"");
            //更新的数据
            Map<String,Object> map=new HashMap<>();
            map.put("title","美国社会动荡");

            updateRequest.doc(map);
            updateRequests.add(updateRequest);
        });
        updateRequests.forEach(bulkProcessor::add);
    }

3.8 删除操作

    /**
     * 单个删除
     */
    private static void testSingleDel() throws IOException {
        DeleteRequest deleteRequest=new DeleteRequest();
        deleteRequest.index("test_demo");
        deleteRequest.id("3001");
        restHighLevelClient.delete(deleteRequest,RequestOptions.DEFAULT);
    }

3.9 条件删除

    /**
     * 单个条件删除
     */
    private static void testSingleDelQuery() throws IOException {
        DeleteByQueryRequest deleteByQueryRequest=new DeleteByQueryRequest();
        deleteByQueryRequest.indices("test_demo");
        deleteByQueryRequest.setQuery(new MatchQueryBuilder("title","国年"));
        //分词式删除
        restHighLevelClient.deleteByQuery(deleteByQueryRequest,RequestOptions.DEFAULT);
    }

标签:Es7,builder,void,indexRequest,static,增删,new,RestHighLevelClient,id
来源: https://blog.csdn.net/z69183787/article/details/122660788

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有