ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

flink双流JOIN原理

2022-02-02 22:33:23  阅读:183  来源: 互联网

标签:join 双流 INNER flink 事件 维表 JOIN LEFT



JOIN简介 谈flink双流JOIN之前,我们先谈一下大家最熟悉的mysql表join,我们知道表join有如下几种,具体区别就不在介绍了。那么流的join和表的join有什么区别呢?本文我们介绍一下。 CROSS JOIN - 交叉连接,计算笛卡儿积; INNER JOIN - 内连接,返回满足条件的记录; OUTER JOIN LEFT - 返回左表所有行,右表不存在补NULL; RIGHT - 返回右表所有行,左边不存在补NULL; FULL -  返回左表和右表的并集,不存在一边补NULL; SELF JOIN - 自连接,将表查询时候命名不同的别名
CROSS JOIN 交叉连接,计算笛卡儿积;
INNER JOIN 内连接,返回满足条件的记录
LEFT JOIN 返回左表所有行,右表不存在补NULL;
RIGHT JOIN 返回右表所有行,左边不存在补NULL;
FULL JOIN 返回左表和右表的并集,不存在一边补NULL;
SELF JOIN  自连接,将表查询时候命名不同的别名
支持的join类型     Apache Flink目前支持INNER JOIN和LEFT OUTER JOIN(SELF 可以转换为普通的INNER和OUTER)。在语义上面Apache Flink严格遵守标准SQL的语义
Apache Flink CROSS INNER OUTER SELF ON WHERE
N Y Y Y 可选 可选
双流JOIN操作注意事项     想要实现流的join我们要考虑数据的延迟,也就是不同流数据到达算子时间不一致的问题。这时候需要用到flink的水印,窗口,EventTime等概念,另外需要注意 Flink 对多表关联是直接顺序链接的,因此需要注意先进行结果集小的关联。同时 flink提供了两种流join的算子,Join和coGroup。具体区别参考上篇博客:flink实战--双流join之Join和coGroup的区别和应用_阿华田的博客-CSDN博客_flink join和cogroup,这篇博客中详细介绍了Join和coGroup的区别,以实现双流Join的案例。 双流JOIN与传统数据库表JOIN的区别     传统数据库表的JOIN是两张静态表的数据联接,在流上面是动态表,双流JOIN的数据不断流入与传统数据库表的JOIN有如下3个核心区别:     1、左右两边的数据集合无穷 - 传统数据库左右两个表的数据集合是有限的,双流JOIN的数据会源源不断的流入。     2、JOIN的结果不断产生/更新 - 传统数据库表JOIN是一次执行产生最终结果后退出,双流JOIN会持续不断的产生新的结果。     3、查询计算的双边驱动 - 双流JOIN由于左右两边的流的速度不一样,会导致左边数据到来的时候右边数据还没有到来,或者右边数据到来的时候左边数据没有到来,所以在实现中要将左右两边的流数据进行保存,以保证JOIN的语义。 数据Shuffle     分布式流计算所有数据会进行Shuffle,怎么才能保障左右两边流的要JOIN的数据会在相同的节点进行处理呢?在双流JOIN的场景,我们会利用JOIN中ON的联接key进行partition,确保两个流相同的联接key会在同一个节点处理,这个在flink的源码中有说明。

数据的保存     不论是INNER JOIN还是OUTER JOIN 都需要对左右两边的流的数据进行保存,JOIN算子会开辟左右两个State进行数据存储,左右两边的数据到来时候,进行如下操作:     1、LeftEvent到来存储到LState,RightEvent到来的时候存储到RState;         2、LeftEvent会去RightState进行JOIN,并发出所有JOIN之后的Event到下游;     3、RightEvent会去LeftState进行JOIN,并发出所有JOIN之后的Event到下游。

简单场景介绍实现原理 INNER JOIN 实现     JOIN有很多复杂的场景,我们先以最简单的场景进行实现原理的介绍,比如:最直接的两个进行INNER JOIN,比如查询产品库存和订单数量,库存变化事件流和订单事件流进行INNER JOIN,JION条件是产品ID,具体如下:

    双流JOIN两边事件都会存储到State里面,如上,事件流按照标号先后流入到join节点,我们假设右边流比较快,先流入了3个事件,3个事件会存储到state中,但因为左边还没有数据,所有右边前3个事件流入时候,没有join结果流出,当左边第一个事件序号为4的流入时候,先存储左边state,再与右边已经流入的3个事件进行join,join的结果如图 三行结果会流入到下游节点sink。当第5号事件流入时候,也会和左边第4号事件进行join,流出一条jion结果到下游节点。这里关于INNER JOIN的语义和大家强调两点:     1、INNER JOIN只有符合JOIN条件时候才会有JOIN结果流出到下游,比如右边最先来的1,2,3个事件,流入时候没有任何输出,因为左边还没有可以JOIN的事件;     2、INNER JOIN两边的数据不论如何乱序,都能够保证和传统数据库语义一致,因为我们保存了左右两个流的所有事件到state中。 LEFT OUTER JOIN 实现     LEFT OUTER JOIN 可以简写 LEFT JOIN,语义上和INNER JOIN的区别是不论右流是否有JOIN的事件,左流的事件都需要流入下游节点,但右流没有可以JION的事件时候,右边的事件补NULL。同样我们以最简单的场景说明LEFT JOIN的实现,比如查询产品库存和订单数量,库存变化事件流和订单事件流进行LEFT JOIN,JION条件是产品ID,具体如下:

下图也是表达LEFT JOIN的语义,只是展现方式不同:

上图主要关注点是当左边先流入1,2事件时候,右边没有可以join的事件时候会向下游发送左边事件并补NULL向下游发出,当右边第一个相同的Join key到来的时候会将左边先来的事件发出的带有NULL的事件撤回(对应上面command的-记录,+代表正向记录,-代表撤回记录)。这里强调三点:     1、左流的事件当右边没有JOIN的事件时候,将右边事件列补NULL后流向下游;* 当右边事件流入发现左边已经有可以JOIN的key的时候,并且是第一个可以JOIN上的右边事件(比如上面的3事件是第一个可以和左边JOIN key P001进行JOIN的事件)需要撤回左边下发的NULL记录,并下发JOIN完整(带有右边事件列)的事件到下游。后续来的4,5,6,8等待后续P001的事件是不会产生撤回记录的。     2、在Apache Flink系统内部事件类型分为正向事件标记为“+”和撤回事件标记为“-”。 RIGHT OUTER JOIN 和 FULL OUTER  JOINRIGHT JOIN内部实现与LEFT JOIN类似, FULL JOIN和LEFT JOIN的区别是左右两边都会产生补NULL和撤回的操作。对于State的使用都是相似的,这里不再重复说明了。 Flink维表JOIN     维表 JOIN 语法 由于维表是一张不断变化的表(静态表只是动态表的一种特例)。那如何 JOIN 一张不断变化的表呢?如果用传统的 JOIN 语法 SELECT * FROM T JOIN dim_table on T.id = dim_table.id    来表达维表 JOIN,是不完整的(对于双流join 来说,只要其中一起关联的流表发生变化,就会进行最新的关联)。因为维表是一直在更新变化的,如果用这个语法那么关联上的是哪个时刻的维表呢?我们是不知道的,结果是不确定的。所以 Flink SQL 的维表 JOIN 语法引入了 SQL:2011 Temporal Table 的标准语法,用来声明关联的是维表哪个时刻的快照。维表 JOIN 语法/示例如下。     假设我们有一个 Orders 订单数据流,希望根据产品 ID 补全流上的产品维度信息,所以需要跟 Products 维度表进行关联。Orders 和 Products 的 DDL 声明语句如下: CREATE TABLE Orders (   orderId VARCHAR,          -- 订单 id   productId VARCHAR,        -- 产品 id   units INT,                -- 购买数量   orderTime TIMESTAMP       -- 下单时间 ) with (   type = ''tt'',              -- tt 日志流   ... ) CREATE TABLE Products (   productId VARCHAR,        -- 产品 id   name VARCHAR,             -- 产品名称   unitPrice DOUBLE          -- 单价   PERIOD FOR SYSTEM_TIME,   -- 这是一张随系统时间而变化的表,用来声明维表   PRIMARY KEY (productId)   -- 维表必须声明主键 ) with (   type = ''alihbase'',        -- HBase 数据源   ... ) 如上声明了一张来自 TT 的 Orders 订单数据流,和一张存储于 HBase 中的 Products 产品维表。为了补齐订单流的产品信息,需要 JOIN 维表,这里可以分为 JOIN 当前表和 JOIN 历史表。 JOIN 当前维表 SELECT * FROM Orders AS o [LEFT] JOIN Products FOR SYSTEM_TIME AS OF PROCTIME() AS p ON o.productId = p.productId Flink SQL 支持 LEFT JOIN 和 INNER JOIN 的维表关联。如上语法所示的,维表 JOIN 语法与传统的 JOIN 语法并无二异。只是 Products 维表后面需要跟上  FOR SYSTEM_TIME AS OF PROCTIME() 的关键字,其含义是每条到达的数据所关联上的是到达时刻的维表快照,也就是说,当数据到达时,我们会根据数据上的 key 去查询远程数据库,拿到匹配的结果后关联输出。这里的 PROCTIME 即 processing time。使用 JOIN 当前维表功能需要注意的是,如果维表插入了一条数据能匹配上之前左表的数据时,JOIN的结果流,不会发出更新的数据以弥补之前的未匹配。JOIN行为只发生在处理时间(processing time),即使维表中的数据都被删了,之前JOIN流已经发出的关联上的数据也不会被撤回或改变。

JOIN 历史维表 SELECT * FROM Orders AS o [LEFT] JOIN Products FOR SYSTEM_TIME AS OF o.orderTime AS p ON o.productId = p.productId 有时候想关联上的维度数据,并不是当前时刻的值,而是某个历史时刻的值。比如,产品的价格一直在发生变化,订单流希望补全的是下单时的价格,而不是当前的价格,那就是 JOIN 历史维表。语法上只需要将上文的  PROCTIME()改成o.orderTime 即可。含义是关联上的是下单时刻的 Products 维表。

Flink 在获取维度数据时,会根据左流的时间去查对应时刻的快照数据。因此 JOIN 历史维表需要外部存储支持多版本存储,如 HBase,或者存储的数据中带有多版本信息。注:JOIN 历史维表功能目前暂未开放维表优化在实际使用的过程中,会遇到许多性能问题。为了解决这些性能问题,我们做了大量的优化,性能得到大幅提升,在双11的洪峰下表现特别淡定。我们的优化主要是为了解决两方面的问题: 1. 提高吞吐。维表的IO请求严重阻塞了数据流的计算处理。 2. 降低维表数据库读压力。如 HBase 也只能承受单机每秒 20 万的读请求。

 

标签:join,双流,INNER,flink,事件,维表,JOIN,LEFT
来源: https://blog.csdn.net/qq_24505127/article/details/122772573

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有