ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flink写入clickhouse之单表写入

2022-02-15 17:04:24  阅读:330  来源: 互联网

标签:ps runEnv flink 写入 param 单表 sql clickhouse sink


flink写入clickhouse之单表写入

简介

flink有一个标准的jdbc sink,提供批量,定时的提交方法。

参考flink文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/

同时,如果设置了checkpoint,在做checkpoint时候会进行一次提交。

基于这点,我们可以将jdbc sink的提交时间和数量设置的很大(即一次checkpoint间隔内达不到的标准),然后通过checkpoint时候进行的提交,来达到精确一次的效果。

关于写clickhouse,我们采用官方的包,是基于https的,适用于批量提交。

clickhouse的表有单表和分布式表之分,我们先进行单表的写入,即对着一个节点写入。

写入clickhouse单表

引入依赖

    <!--   jdbc sink-->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-jdbc_2.11</artifactId>
		<version>${flink.version}</version>
	</dependency>
    <!-- jdbc  clickhouse -->
	<dependency>
		<groupId>ru.yandex.clickhouse</groupId>
		<artifactId>clickhouse-jdbc</artifactId>
		<version>0.3.1</version>
	</dependency>

jdbc sink一般使用方式

JdbcSink.sink(
                "insert into tableName (id,name) values (?,?)",
                new JdbcStatementBuilder<DwdOrderBean>() {
                    @Override
                    public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                        Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                        try {
                            SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        }
                    }
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUsername(runEnv.getClickHouseUser())
                        .withPassword(runEnv.getClickHousePassword())
                        .build()
        );
    /**
     * 用于设置clickhouse PreparedStatement的通用方法
     *
     * @param ps     PreparedStatement实例
     * @param fields 通过”实例对象.getClass().getDeclaredFields()“获得
     * @param bean   实例对象
     * @throws IllegalAccessException field.get抛出的错误
     * @throws SQLException           ps.set抛出的错误
     */
    public static void setPs(PreparedStatement ps, Field[] fields, Object bean) throws IllegalAccessException, SQLException {
        for (int i = 1; i <= fields.length; i++) {
            Field field = fields[i - 1];
            field.setAccessible(true);
            Object o = field.get(bean);
            if (o == null) {
                ps.setNull(i, 0);
                continue;
            }
            String fieldValue = o.toString();
            if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {
                ps.setObject(i, fieldValue);
            } else {
                ps.setNull(i, 0);
            }
        }
    }

注1:其中第一个参数是sql语句,格式必须严格按照例子中的格式,列举出列名,后面以 ? 填充。因为后面会调用 JdbcStatementBuilder 读取每条数据来对该sql进行补全。
注2:其中JdbcStatementBuilder的实现,DwdOrderBean是对应clickhouse表结构创建的实例对象,后续通过对实例对象的属性循环传递,来设置到PreparedStatement中,如果列数很少,可以手动填写。

包装下常用设置

public class SinkSingleClickHouse<T> {
    private final static String NA = "null";
    private final SinkFunction<T> sink;

    /**
     * 获取clickhouse sinkFunction
     *
     * @param sql                  插入语句,格式必须为  inert into table  a,b values (?,?)
     * @param jdbcStatementBuilder 如何用单条信息填充sql
     * @param runEnv               执行环境
     * @param database             表所在的数据库
     */
    public SinkSingleClickHouse(String sql, JdbcStatementBuilder<T> jdbcStatementBuilder,
                                RunEnv runEnv, String database) {
        sink = JdbcSink.sink(
                sql,
                jdbcStatementBuilder,
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUsername(runEnv.getClickHouseUser())
                        .withPassword(runEnv.getClickHousePassword())
                        .build()
        );
    }

    /**
     * 获取clickhouse sinkFunction
     *
     * @param sql                  插入语句,格式必须为  inert into table  a,b values (?,?)
     * @param jdbcStatementBuilder 如何用单条信息填充sql
     * @param runEnv               执行环境
     * @param database             表所在的数据库
     * @param batchIntervalMs      提交条件之:间隔
     * @param batchSize            提交条件之:数据量
     * @param maxRetries           提交重试次数
     */
    public SinkSingleClickHouse(String sql, JdbcStatementBuilder<T> jdbcStatementBuilder,
                                RunEnv runEnv, String database,
                                int batchIntervalMs, int batchSize, int maxRetries) {
        sink = JdbcSink.sink(
                sql,
                jdbcStatementBuilder,
                JdbcExecutionOptions.builder()
                        .withBatchIntervalMs(batchIntervalMs)
                        .withBatchSize(batchSize)
                        .withMaxRetries(maxRetries)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:clickhouse://" + runEnv.getClickHouseHost() + ":" + runEnv.getClickHousePort() + "/" + database)
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUsername(runEnv.getClickHouseUser())
                        .withPassword(runEnv.getClickHousePassword())
                        .build()
        );
    }

    public SinkFunction<T> getSink() {
        return sink;
    }

    /**
     * 用于设置clickhouse PreparedStatement的通用方法
     *
     * @param ps     PreparedStatement实例
     * @param fields 通过”实例对象.getClass().getDeclaredFields()“获得
     * @param bean   实例对象
     * @throws IllegalAccessException field.get抛出的错误
     * @throws SQLException           ps.set抛出的错误
     */
    public static void setPs(PreparedStatement ps, Field[] fields, Object bean) throws IllegalAccessException, SQLException {
        for (int i = 1; i <= fields.length; i++) {
            Field field = fields[i - 1];
            field.setAccessible(true);
            Object o = field.get(bean);
            if (o == null) {
                ps.setNull(i, 0);
                continue;
            }
            String fieldValue = o.toString();
            if (!NA.equals(fieldValue) && !"".equals(fieldValue)) {
                ps.setObject(i, fieldValue);
            } else {
                ps.setNull(i, 0);
            }
        }
    }
}

注1:其中JdbcExecutionOptions,即使我们设置批量和定时的地方,如果不传,会有默认值。

最终使用

SinkFunction<DwdOrderBean> sinkClickhouse = new SinkSingleClickHouse<>("insert into tableName (id,name) values (?,?)",
                new JdbcStatementBuilder<DwdOrderBean>() {
                    @Override
                    public void accept(PreparedStatement ps, DwdOrderBean dwdOrderBean) throws SQLException {
                        Field[] fields = dwdOrderBean.getClass().getDeclaredFields();
                        try {
                            SinkSingleClickHouse.setPs(ps, fields, dwdOrderBean);
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        }
                    }
                },
                uat,
                "dwd_cdp")
                .getSink();

最终如上,我们即可得到一个对着单表写入的sink

标签:ps,runEnv,flink,写入,param,单表,sql,clickhouse,sink
来源: https://www.cnblogs.com/sqhhh/p/15897275.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有