Writing to Elasticsearch from Flink

Introduction

This article covers how to set up Flink to write data to Elasticsearch in practice.

Flink version: 1.13.2

GitHub repository: https://github.com/dahai1996/mdw-flink-quickstart


Writing to Elasticsearch

Adding the dependency

<!-- Elasticsearch connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
        </exclusion>
        <!--
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </exclusion>
        -->
    </exclusions>
</dependency>

Note: the logging artifact is excluded to avoid a dependency conflict that would otherwise prevent logs from being printed.

Basic usage

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost(ip, port, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ESSink()
);
/* The bulk flush parameters must be set */
// maximum number of actions to buffer before flushing
esSinkBuilder.setBulkFlushMaxActions(10);
// maximum size of buffered data before flushing, in MB
esSinkBuilder.setBulkFlushMaxSizeMb(5);
// interval at which to flush regardless of the number or size of buffered actions
esSinkBuilder.setBulkFlushInterval(5000L);

// add the sink to the data stream
dataStream.addSink(esSinkBuilder.build());

Note: ESSink is the class containing the actual logic for writing each record to Elasticsearch; its signature looks roughly like this:

public static class ESSink implements ElasticsearchSinkFunction<String> 
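
The full implementation is in the repository; as a rough sketch, and assuming each incoming String is already a JSON document destined for a placeholder index called my-index, such a sink function could look like the following (written here as a standalone class, whereas the original is a static nested class of the job):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

// Minimal sketch: index name and JSON assumption are placeholders, not the original code.
public class ESSink implements ElasticsearchSinkFunction<String> {

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // Build an index request from the incoming record, assumed to be a JSON string.
        IndexRequest request = Requests.indexRequest()
                .index("my-index")                   // placeholder index name
                .type("_doc")                        // ES6 still requires a document type
                .source(element, XContentType.JSON);
        // Hand the request to the indexer; the sink buffers it and flushes in bulk.
        indexer.add(request);
    }
}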

To make creating sinks easier later on, wrap this in a helper class:

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.protocol.HttpContext;
import org.elasticsearch.client.RestClientBuilder;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class SinkEs<T> {
    public List<HttpHost> httpHosts = new ArrayList<>(1);
    public ElasticsearchSink.Builder<T> esSinkBuilder;

    /**
     * Builds an ES sink with default flush settings.
     *
     * @param runEnv                    enum constant holding the addresses of the target environment
     * @param elasticsearchSinkFunction logic that converts a single record into ES requests
     */
    public SinkEs(RunEnv runEnv, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
        httpHosts.add(new HttpHost(runEnv.getEsHost(), runEnv.getEsPort(), "http"));
        esSinkBuilder = new ElasticsearchSink.Builder<T>(
                httpHosts,
                elasticsearchSinkFunction
        );
        esSinkBuilder.setBulkFlushMaxActions(1);
        esSinkBuilder.setBulkFlushMaxSizeMb(1);
        esSinkBuilder.setBulkFlushInterval(5000L);
        // Limit the keep-alive time of the underlying HTTP connections (see Note 2 below).
        esSinkBuilder.setRestClientFactory(new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                        httpAsyncClientBuilder.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
                            @Override
                            public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) {
                                return Duration.ofMinutes(5).toMillis();
                            }
                        });
                        return httpAsyncClientBuilder;
                    }
                });
            }
        });
    }

    /**
     * Builds an ES sink with explicit flush settings.
     *
     * @param runEnv                    enum constant holding the addresses of the target environment
     * @param elasticsearchSinkFunction logic that converts a single record into ES requests
     * @param bulkFlushMaxActions       maximum number of actions to buffer before flushing
     * @param bulkFlushMaxSizeMb        maximum size of buffered data before flushing, in MB
     * @param bulkFlushInterval         interval at which to flush regardless of the number or size of buffered actions
     */
    public SinkEs(RunEnv runEnv, ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
                  int bulkFlushMaxActions, int bulkFlushMaxSizeMb, Long bulkFlushInterval) {
        httpHosts.add(new HttpHost(runEnv.getEsHost(), runEnv.getEsPort(), "http"));
        esSinkBuilder = new ElasticsearchSink.Builder<T>(
                httpHosts,
                elasticsearchSinkFunction
        );
        esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);
        esSinkBuilder.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);
        esSinkBuilder.setBulkFlushInterval(bulkFlushInterval);
    }

    public ElasticsearchSink<T> getSink() {
        return esSinkBuilder.build();
    }
}

After that, an ES sink can be created quickly:

SinkFunction<String> sinkEs = new SinkEs<>(
        uat,
        new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // build the requests for s here and add them to requestIndexer
            }
        },
        1,
        5,
        5000L)
        .getSink();

Note: the uat variable here is a constant of an enum that holds the addresses for each environment; see the GitHub repository for the full code.
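
For orientation only, a hypothetical RunEnv enum along these lines would satisfy the getEsHost()/getEsPort() calls used by SinkEs; the constant names and addresses below are made up, and the real definition lives in the repository:

// Hypothetical sketch of the environment enum; not the repository's actual definition.
public enum RunEnv {
    uat("es-uat.example.com", 9200),   // made-up address for the UAT environment
    prod("es-prod.example.com", 9200); // made-up address for the production environment

    private final String esHost;
    private final int esPort;

    RunEnv(String esHost, int esPort) {
        this.esHost = esHost;
        this.esPort = esPort;
    }

    public String getEsHost() {
        return esHost;
    }

    public int getEsPort() {
        return esPort;
    }
}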

Note 2: the code passed to setRestClientFactory() above exists for the following reason:

The ES client opens a long-lived connection with an unlimited keep-alive time and reuses it for subsequent requests to the server.
If that connection dies, the client keeps trying to use it, and requests start failing with errors.
That is why the code above sets an explicit keep-alive duration for the connection.
I have forgotten which blog post this solution originally came from.
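
Since RestClientFactory, HttpClientConfigCallback, and ConnectionKeepAliveStrategy each declare a single method, the same configuration can also be written with lambdas; this is just a behaviorally equivalent sketch of the block in the SinkEs constructor above:

// Equivalent keep-alive configuration written with lambdas (5-minute keep-alive).
esSinkBuilder.setRestClientFactory(restClientBuilder ->
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setKeepAliveStrategy(
                        (response, context) -> Duration.ofMinutes(5).toMillis())));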

Source: https://www.cnblogs.com/sqhhh/p/15802884.html
