
Hudi Integration with Flink (Operating Hudi Tables from Flink)




I. Install and Deploy Flink 1.13

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments and to perform computations at in-memory speed and at any scale.

1. Prepare the tarball

flink-1.13.1-bin-scala_2.12.tgz

2. Extract it

 tar -zxvf flink-1.13.1-bin-scala_2.12.tgz

3. Add the Hadoop dependency jar to Flink's lib directory

flink-shaded-hadoop-2-uber-2.7.5-10.0.jar https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.7.5-10.0  

4. Start the HDFS cluster

hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode

5. Start the Flink local cluster

/flink/bin/start-cluster.sh

After it starts, two processes should be visible (e.g. via jps): TaskManagerRunner and StandaloneSessionClusterEntrypoint.

To stop the cluster:

/flink/bin/stop-cluster.sh

6. Flink Web UI

http://localhost:8081/#/overview  

7. Run the official example

Read a text file, run the WordCount word-frequency example, and print the results to the console:

/flink/bin/flink run /flink/examples/batch/WordCount.jar

II. Integrating Flink with Hudi essentially comes down to putting the bundle jar, hudi-flink-bundle_2.12-0.10.1.jar, on the Flink application's CLASSPATH.

For the Flink SQL connector to use Hudi as a source and sink, there are two ways to put the jar on the CLASSPATH:

Option 1: pass the jar with the -j option when launching the Flink SQL Client:

flink/bin/sql-client.sh embedded -j …./hudi-flink-bundle_2.12-0.10.1.jar

Option 2: place the jar directly in the Flink installation's lib directory ($FLINK_HOME/lib).

Also edit conf/flink-conf.yaml to set taskmanager.numberOfTaskSlots: 4, and list four localhost entries in the workers file.

Because Flink needs to access HDFS, set the HADOOP_CLASSPATH environment variable before starting the cluster.

III. Start the Flink SQL CLI

sql-client.sh embedded shell

Set the result display mode for queries:

set execution.result-mode=tableau;
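One more session setting is often needed: the Hudi Flink writer commits data when a Flink checkpoint completes, so if no checkpointing interval is configured (here or in conf/flink-conf.yaml), streaming writes may never become visible. A minimal sketch, assuming Flink 1.13 SQL CLI syntax; the 30-second interval is an arbitrary choice:

-- enable periodic checkpoints so that Hudi write commits are triggered
set execution.checkpointing.interval=30s;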

IV. Usage

1. Create a table test_flink_hudi_mor; the data is stored in a Hudi table backed by HDFS, with table type MOR (MERGE_ON_READ)

CREATE TABLE test_flink_hudi_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
    'connector' = 'hudi',
    'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'table.type' = 'MERGE_ON_READ'
);
connector: the table connector
path: the storage path of the table data
write.tasks: number of tasks used when Flink writes into Hudi
compaction.tasks: number of tasks used for compaction when writing into Hudi
table.type: the Hudi table type
Flink SQL> desc test_flink_hudi_mor;
+-----------+--------------+------+-----+--------+-----------+
|      name |         type | null | key | extras | watermark |
+-----------+--------------+------+-----+--------+-----------+
|      uuid |  VARCHAR(20) | true |     |        |           |
|      name |  VARCHAR(10) | true |     |        |           |
|       age |          INT | true |     |        |           |
|        ts | TIMESTAMP(3) | true |     |        |           |
| partition |  VARCHAR(20) | true |     |        |           |
+-----------+--------------+------+-----+--------+-----------+
5 rows in set

 

 

2. Insert data

INSERT INTO test_flink_hudi_mor VALUES ('id1','FZ',29, TIMESTAMP '1993-04-09 00:00:01', 'par1' );
 
INSERT INTO test_flink_hudi_mor VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
Repeating the insert performs an update: the row for id1 changes from ('id1','FZ',29, TIMESTAMP '1993-04-09 00:00:01', 'par1') to ('id1','Danny',23, TIMESTAMP '1970-01-01 00:00:01', 'par1'). Because this is a MOR table, the update is first written to log files and has not yet been compacted into Parquet files.
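This upsert-by-key behavior relies on Hudi's record key: in this connector version the record key and precombine fields default to columns named uuid and ts, which this schema provides, so rows with the same uuid are merged rather than duplicated. A quick verification query (a minimal sketch, run in the same SQL CLI session):

-- after the second insert, id1 should show Danny / 23 / 1970-01-01 00:00:01
SELECT * FROM test_flink_hudi_mor WHERE uuid = 'id1';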

V. Streaming query

1. Create a table test_flink_hudi_mor_2 that is read in a streaming fashion; it maps onto the same storage path as the earlier table test_flink_hudi_mor

CREATE TABLE test_flink_hudi_mor_2(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
    'connector' = 'hudi',
    'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',   
    'table.type' = 'MERGE_ON_READ',
    'read.tasks' = '1',
    'read.streaming.enabled' = 'true',
    'read.streaming.start-commit' = '20220307211200',
    'read.streaming.check-interval' = '4'
); 
read.streaming.enabled = true means the table data is read in a streaming fashion
read.streaming.start-commit is the commit instant from which the streaming read starts
read.streaming.check-interval sets the interval at which the source checks for new commits to 4 seconds
table.type sets the table type to MERGE_ON_READ
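The streaming read itself is just a regular SELECT on this table; with read.streaming.enabled set, the query keeps running and emits rows for every new commit found after read.streaming.start-commit (a minimal sketch):

-- runs continuously and prints newly committed rows as they appear
SELECT * FROM test_flink_hudi_mor_2;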

2. Open a new terminal and start another Flink SQL CLI session, re-create the table test_flink_hudi_mor there, and insert data in batch mode

CREATE TABLE test_flink_hudi_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
    'connector' = 'hudi',
    'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'table.type' = 'MERGE_ON_READ'
);
 
INSERT INTO test_flink_hudi_mor VALUES ('id9','FZ',29, TIMESTAMP '1993-04-09 00:00:01', 'par5' );
INSERT INTO test_flink_hudi_mor VALUES ('id10','DX',28, TIMESTAMP '1994-06-02 00:00:01', 'par5' );
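Within the check interval (4 seconds here), the streaming session reading test_flink_hudi_mor_2 should pick up the two new rows in partition 'par5'. They can also be verified directly from the batch session (a minimal sketch):

-- both id9 and id10 should be returned
SELECT * FROM test_flink_hudi_mor WHERE `partition` = 'par5';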
       

Source: https://www.cnblogs.com/EnzoDin/p/15982833.html
