ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Iceberg(三)对接Flink

2022-01-20 23:32:03  阅读:356  来源: 互联网

标签:Flink Iceberg flink 对接 hadoop TableLoader env iceberg tableLoader


1、Flink基本操作

1.1、配置参数和jar包

        Flink1.11开始就不在提供flink-shaded-hadoop-2-uber的支持,所以如果需要flink支持hadoop得配置环境变量HADOOP_CLASSPATH

[root@hadoop1 flink-1.11.0]# vim bin/config.sh 
export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_HDFS_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_YARN_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_CLASSPATH=`hadoop classpath`
export PATH=$PATH:$HADOOP_CLASSPATH

        目前Iceberg只支持flink1.11.x的版本,所以我这使用flink1.11.0,将构建好的Iceberg的jar包复制到flink下

[root@hadoop1 libs]# cd  /opt/module/iceberg-apache-iceberg-0.11.1/flink-runtime/build/libs/
[root@hadoop1 libs]# cp *.jar /opt/module/flink-1.11.0/lib/

1.2、Flink SQL Client

        1、启动flink集群,并启动flink sql client

bin/sql-client.sh embedded shell

         2、使用 Catalogs 创建目录

CREATE CATALOG hadoop_catalog WITH ( 
    'type'='iceberg',
    'catalog-type'='hadoop', 
    'warehouse'='hdfs://mycluster/flink/warehouse/', 
    'property-version'='1'
);

         或者修改 sql-client-defaults.yaml,添加以下内容

[root@hadoop103 conf]# vim sql-client-defaults.yaml catalogs:
- name: hadoop_catalog
  type: iceberg 
  catalog-type: hadoop
  warehouse: hdfs://mycluster/flink/warehouse/

        3、使用当前 catalog

use catalog hadoop_catalog;

        4、建库建表

        建库可以直接使用create database;建表需要指定分区,使用flink对接iceberg不能使用iceberg的隐藏分区。

        5、写入与修改数据

        flink默认使用流的方式插入数据,这个时候流的插入是不支持overwrite操作的。需要将插入模式进行修改SET execution.type = batch;,改成批的插入方式,再次使用overwrite插入数据。如需要改回流式操作参数设置为 SET execution.type = streaming;会根据分区进行覆盖操作。

2、Flink API操作

        1、需要引入相关依赖包

<dependency>
	<groupId>org.apache.iceberg</groupId>
	<artifactId>iceberg-flink-runtime</artifactId>
	<version>0.11.1</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>${hadoop.version}</version>
</dependency>

2.1、读操作

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testA");
    batchRead(env, tableLoader);
    streamingRead(env, tableLoader);
    env.execute();
}

// 通过batch的方式去读取数据
public static void batchRead(StreamExecutionEnvironment env, TableLoader tableLoader) {
    DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
    batch.map(item -> item.getInt(0) + "\t" + item.getString(1) + "\t" + item.getInt(2) + "\t" + item.getString(3)).print();
}

// 通过streaming的方式去读取数据,启动之后程序不会立马停止
public static void streamingRead(StreamExecutionEnvironment env, TableLoader tableLoader)
{
    DataStream<RowData>	stream	= FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(true).build();
    stream.print();
}

2.2、 写操作

// 采用的是batch批处理
public static void appendingData(StreamExecutionEnvironment env, TableLoader tableLoader) {
    DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
    TableLoader tableB = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB");
    FlinkSink.forRowData(batch).tableLoader(tableB).build();
}

// 根据分区将数据进行覆盖操作
public static void overwriteData(StreamExecutionEnvironment env, TableLoader tableLoader) {
    DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
    TableLoader tableB = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB");
    FlinkSink.forRowData(batch).tableLoader(tableB).overwrite(true).build();
}

3、读写Flink存在的问题

  1. Flink 不支持 Iceberg 隐藏分区
  2. 不支持通过计算列根据case class创建表
  3. 不支持创建带水位线的表
  4. 不支持添加列、删除列、重命名列
  5. Flink写iceberg需要使用checkpoint

   

标签:Flink,Iceberg,flink,对接,hadoop,TableLoader,env,iceberg,tableLoader
来源: https://blog.csdn.net/Yuan_CSDF/article/details/122611248

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有