ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink原理与调优

2022-03-18 17:58:44  阅读:141  来源: 互联网

标签:Task 聚合 Flink 并行度 调优 内存 原理 数据


Flink提交流程(Yarn-Per-Job)

在这里插入图片描述

1. client运行脚本提交命令。
2. CliFrontend实例化CliFrontendParser进行参数解析。
3. CliFrontend实例化YarnJobClusterExecutor并创建客户端。
4. 在客户端中实例化YarnClusterDescriptor封装YarnClient信息,包含提交参数和命令。
5. 将信息提交给RM。
6. RM向NM的yarnRMClient发送消息,启动APPmaster。
7. NM分配资源生成APPmaster,并启动Dispatcher分发器。
8. Dispatcher启动JobMaster。
9. JobMaster启动executionGraph执行图。
10. JobMaster中的SlotPool线程池向SoltManager注册、请求slot。
11. SoltManager向Resource Manager申请资源requestNewWorker。
12. Resource Manager启动TaskManager。
13. APPmaster实例化工作线程池launcherPool。
14. 工作线程池launcherPool中实例化ExecutorRunnalbe。
15. TaskManager实例化YarnTaskExecutorRunner,并生成TaskExecutor。
16. TaskExecutor向SoltManager注册Solt。
17. SoltManager向TaskExecutor分配Solt。
18. TaskExecutor向SoltPool提供Slot。
19. JobMaster生成对应的执行图并提交给Slot中的Task执行。

Flink组件通讯过程

在这里插入图片描述

1.JobMaster和TaskExecutor的RpcService调用startServer()方法启动RpcServer,创建AkkaRpcActor。
2.RpcServer调用start()方法启动RpcEndPoint。
3.RpcService通过Connect()对方的RpcServer得到一个对方的代理客户端RpcGateWay。
4.通过RpcGateway远程调用对端的方法。
5.TaskExecutor的RpcServer转发给代理InvokeHandler。
6.代理调用incoke()->incokeRpc()
7.1.首先判断发送地址是不是本地,是本地的话不涉及网络传输,就不需要序列化,调用LocalRpcInvocation方法。
7.2.不是本地的话需要进行序列化,调用RemoteRpcInvocation方法。
7.3.判断方法是否有返回值,如果有返回值调用ask方法。
8.1.如果调用方法没有返回值,直接返回void()。
8.2.接着判断返回类型是否是CompletableFuture类型,是的话不阻塞直接返回Future。
8.3.如果返回值不是,则阻塞等待返回值。
最后TaskExecutor按照返回类型进行对应处理。

Flink内存模型(TaskManager)

在这里插入图片描述

Flink内存使用了堆上内存和堆外内存,不计入solt资源
Task执行的内存使用了堆上内存和堆外内存
网络缓冲内存是网络数据交换所使用的堆外内存
框架堆外内存、Task对外内存、网络缓冲内存都在堆外的直接内存中。
管理内存是Flink管理的堆外内存,用于管理排序、哈希表、缓冲中间结果以及RocksDB的本地内存。

JVM特有内存是JVM本身占用的内存、包括源空间和执行开销。
Flink使用内存 = 框架堆内内存、框架堆外内存+Task堆内内存、Task堆外内存+网络缓冲内存+管理内存
进程内存 = Flink使用内存 + JVM内存

Flink资源配置调优

内存设置

bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn队列
-Djobmanager.memory.process.size=2048mb \ 指定JM的总进程大小 JM 2-4GB即可
-Dtaskmanager.memory.process.size=6144mb \ 指定每个TM的总进程大小 单个TM 2-8GB即可
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每个TM的slot数 与容器核数对应,1slot或2slot对应1core
-c com.atguigu.app.dwd.LogBaseApp \
/opt/module/flink-jar/XX.jar

最优并行度计算

  1. 先设置并行度10进行压测。
  2. 公式为:QPS/单并行度处理能力 = 并行度
  3. 最后并行度 = 压测并行度*1.2

Source端并行度配置

如果数据源是Kafka,Source的并行度设置为Kafka对应的Topic分区数。如果消费速度跟不上生产速度,则增大Kafka分区数。

Transform端并行度的配置

  • KeyBy之前的算子:并行度与source保持一致。
  • KeyBy之后的算子:如果并发较大,设置并行度为2的整数次幂。小并发则根据实际设置。

Sink端并行度的配置

Sink端是数据流向下游的地方,可以根据Sink端的数据量以及下游的服务抗压能力进行评估。如果Sink端是Kafka,可以设置为Kafka对应的Topic分区数。

CheckPoint设置

  • 一般CheckPoint时间间隔可以设置为分钟级别。
  • 对于状态很大的任务每次CheckPoint访问HDFS比较耗时,可以设置5-10分钟一次,并且调大两次CheckPoint之间暂停4-8分钟。
  • 如果配置EXACTLY_ONCE,那么在CheckPoint过程中还会存在barrier对其的过程,可以通过Web UI的CheckPoint选项卡来查看各阶段的耗时情况,从而确定是哪个阶段导致CheckPoint时间过长。

Flink反压处理

压测

在Kafka中积压数据,之后开启Flink任务,出现反压。

反压

反压是短时间的负载高峰导致系统接收数据的速率高于它处理数据的速率。

比如垃圾回收停顿可能会导致流入的数据快速堆积,或者临时业务活动导致流量徒增。反压得不到合理的处理会导致资源耗尽甚至崩溃。

反压机制是指系统能够自动检测被阻塞的Operator,然后自适应地降低源头或上游数据发送速率,从而维持整个系统的稳定。

反压现象及定位

监控对正常的任务运行有一定影响,因此只有当web页面切换到Job的BakPressure页面时,JobManager才会对Job触发反压监控。

默认情况下JobManager会触发100次采样,每次间隔50ms来确定反压。Web界面中可以看到的比率是多少个stack trace被卡住,比如0.01就表示100个采样中有一个被卡住。

ok标识没有反压:0<=比例<=0.1

low:0.1<=比例<=0.5

high标识反压:0.5<=比例<=1

利用Metrics定位反压位置

当某个Task吞吐量下降时,基于Credit的反压机制,上游不会给该Task发送数据,所以该Task不会频繁卡在向Buffer Pool去申请buffer。

反压监控实现原理就是监控Task是否在申请Buffer这一步,所以遇到瓶颈的Task必然会显示ok,表示没有受到反压。

如果Task吞吐量下降,造成该Task上游的Task出现反压时,必然会出现Task出现的InputChannel变满,已经申请不到可用的Buffer空间,从这个思路出发,监控Task的InputChannel使用情况进行监控,如果InputChannel使用率达到100%,那么该Task正在发生反压。

Flink数据倾斜

判断是否存在数据倾斜

可以通过WebUI精准地看到每个SubTask处理了多少数据,继而判断Flink任务是否存在数据倾斜。数据倾斜和反压会一起出现。

LocalKeyBy替代KeyBy算子

在KeyBy上游算子数据发送之前,首先在上游算子的本地对数据进行聚合再发送到下游,使下游接收到的数据量大大减少,从而使得KeyBy之后的聚合操作不再是任务的瓶颈。

但是这要求聚合操作必须是多条数据或者一批数据才能聚合,单条数据没有办法通过聚合来减少数据量。

//Checkpoint 时为了保证 Exactly Once,将 buffer 中的数据保存到该 ListState 中
class LocalKeyByFlatMap extends RichFlatMapFunction<String,Tuple2<String, 
 
 private ListState<Tuple2<String, Long>> localPvStatListState;
 
 //本地 buffer,存放 local 端缓存的 app 的 pv 信息
 private HashMap<String, Long> localPvStat;
 
 //缓存的数据量大小,即:缓存多少数据再向下游发送
 private int batchSize;
 
 //计数器,获取当前批次接收的数据量
 private AtomicInteger currentSize;

 //构造器,批次大小传参
 LocalKeyByFlatMap(int batchSize){
 	this.batchSize = batchSize;
 }

 @Override
 public void flatMap(String in, Collector collector) throws Exception {
 	// 将新来的数据添加到 buffer 中
 	Long pv = localPvStat.getOrDefault(in, 0L);
 	localPvStat.put(in, pv + 1);
 	// 如果到达设定的批次,则将 buffer 中的数据发送到下游
 	if(currentSize.incrementAndGet() >= batchSize){
 		// 遍历 Buffer 中数据,发送到下游
 		for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
 			collector.collect(Tuple2.of(appIdPv.getKey(), appIdPv.getValue()
 		}
 		// Buffer 清空,计数器清零
 		localPvStat.clear();
 		currentSize.set(0);
 	}
 }

 @Override
 public void snapshotState(FunctionSnapshotContext functionSnapshotConte
 	// 将 buffer 中的数据保存到状态中,来保证 Exactly Once
 	localPvStatListState.clear();
 	for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
 		localPvStatListState.add(Tuple2.of(appIdPv.getKey(), appIdPv.ge
 	}
 }

 @Override
 public void initializeState(FunctionInitializationContext context) {
 	// 从状态中恢复 buffer 中的数据
 	localPvStatListState = context.getOperatorStateStore().getListState
 	new ListStateDescriptor<>("localPvStat",
 	TypeInformation.of(new TypeHint<Tuple2<String, Long>>})));
 	localPvStat = new HashMap();
 	if(context.isRestored()) {
 		// 从状态中恢复数据到 localPvStat 中
 		for(Tuple2<String, Long> appIdPv: localPvStatListState.get()){
long pv = localPvStat.getOrDefault(appIdPv.f0, 0L);
 			// 如果出现 pv != 0,说明改变了并行度,
 			// ListState 中的数据会被均匀分发到新的 subtask中
 			// 所以单个 subtask 恢复的状态中可能包含两个相同的 app 的数据
 			localPvStat.put(appIdPv.f0, pv + appIdPv.f1);
 		}
 		// 从状态恢复时,默认认为 buffer 中数据量达到了 batchSize,需要向下游发
 		currentSize = new AtomicInteger(batchSize);
 	} else {
 		currentSize = new AtomicInteger(0);
 	}
 }

}

强制Shuffle

如果KeyBy之前就存在数据倾斜,上游算子的某些实例可能处理的数据较多,某些实例可能处理的数据较少,产生该情况是因为数据源本身就不均匀。这种情况下可以让Flink任务强制shuffle。使用shuffle、rebalance或rescale算子既可将数据均匀分配,从而解决数据倾斜的问题。

两阶段聚合

如果使用窗口,变成了有界数据的处理,窗口默认是触发时才会发出一条结果到下游,这时就可以使用两阶段聚合的方式。

第一阶段聚合:key拼接随机数前缀或后缀,进行keyby、开窗、聚合。需要注意的是聚合完不再是windowedStream,要获取windowEnd作为窗口标记作为第二阶段分组,避免不同窗口的结果聚合到一起。

第二阶段聚合:去掉随机数前缀或后缀,按照原来的key以及windowsEnd作keyby聚合。

KafkaSource 调优

动态发现分区

使用FlinkKafkaConsumer初始化时,可以通过Properties指定参数开始动态发现partition。

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, 30 * 1000 + ""); 
第一个参数是配置项,第二个参数是多久检测一次分区。

Kafka数据源生成watermark

kafka单分区内有序,多分区间无序。这种情况下可以使用Flink识别kafka分区的watermark生成机制。使用此特性可以将Kafka消费端内部针对每个Kafka分区生成watermark,并且不同分区watermark的合并方式与在数据流shuffle时合并方式相同。

kafkaSourceFunction.assignTimestampsAndWatermarks(
                WatermarkStrategy
                       .forBoundedOutOfOrderness(Duration.ofMinutes(2))
);

设置空闲等待

如果某一分区或分片在一段时间未发送事件数据,我们称这类数据源为空闲输入或空闲源。在这种情况下会造成个别partition一直没有新的数据。由于下游算子watermark的计算方式是取所有不同的上游并行数据源的watermark最小值,则其watermark不会发生变化,导致窗口、定时器都不会触发。

我们可以将watermark来检测空闲输入并将其标记为空闲状态。

kafkaSourceFunction.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .forBoundedOutOfOrderness(Duration.ofMinutes(2))
						.withIdleness(Duration.ofMinutes(5))
);

Kafka的offset消费策略

  • setStartFromGroupOffsets:默认消费策略

    默认读取上次保存的offset信息,也可以直接指定auto.offset.reset的值来进行消费。

  • setStartFromEarliest:忽略offset策略

    从最早的数据开始消费。

  • setStartFromLatest:忽略offset策略

    从最新的数据开始消费。

  • setStartFromSpecificOffsets:从指定位置消费

  • setStartFromTimestamp:指定时间点消费

    从topic中指定的时间点开始消费,指定时间点之前的数据忽略。

当checkpoint机制开启时,KafkaConsumer会定期把Kafka的offset信息和其他状态保存起来,job失败后,Flink会从最近一次Checkpoint回复数据,从保存的offset重新消费kafka中的数据。

Flink SQL工作机制

Flink SQL 执行流程

Flink sql 基于Apache Calcite。Calcite执行步骤如下:

f492106e135102729c0d2603dbd88e6c.png

  • Parser:SQL解析,通过JavaCC实现,使用JavaCC编写sql语法描述文件,将SQL解析为未经校验的AST语法树。
  • Validate:SQL校验,通过与元数据结合验证SQL中的Schema、Field、Function是否存在,输入输出类型是否匹配等。
  • Optimize:SQL优化,对上个步骤的输出(RelNode,逻辑计划树)进行优化,使用两种优化器。
  • Produce:SQL生成,将物理执行计划生成在特定平台的可执行程序。
  • Execute:SQL执行,通过各个执行平台执行查询,得到输出结果。

Flink SQL 优化器

  • RBO(基于规则的优化器)

    将原有表达式裁剪掉,遍历一系列规则(Rule),只要满足条件就转换,生成最后的执行计划。

    规则包括:分区裁剪、列裁剪、谓词下推、投影下推、聚合下推、limit下推、sort下推、常量折叠、子查询内联转join等。

  • CBO(基于代价的优化器)

    保留原有表达式,基于统计信息和代价模型,尝试探索生成等价关系表达式、最终取代价最小的执行计划。

    模型包括:Volcano模型、Cascades模型。Cascades模型会一边遍历SQL逻辑树,一遍优化,从而进一步裁减掉一些执行计划。

Flink SQL调优

开启MiniBatch(提升吞吐)

MinBatch是微批处理,原理是缓存一定的数据后再触发处理,减少对State的访问,从而提升吞吐并减少数据的输入量。MinBatch依靠在每个Task上注册的Timer线程来触发微批,需要消耗一定的线程调度性能。

  • 使用场景

    微批处理通过增加延迟换取高吞吐,如果有超低延迟的要求,不建议开启微批处理。通常对于聚合的场景,微批处理可以显著的提升系统性能,可以开启。

  • 注意事项

    目前K-V配置项仅被Blink planner支持。

    1.12版本前有bug,不会清理过期状态。

  • 配置

// 获取 tableEnv的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 开启miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条
configuration.setString("table.exec.mini-batch.size", "20000");

开启LocalGlobal(解决常见数据热点问题)

LocalGlobal优化将原来的aggregate分为Local+Global两阶段聚合,第一阶段在上游节点本地攒一批数据进行聚合(localAgg),并输出这次微批的增量值(Accumulator)。第二阶段再将受到Accumulator合并(Merge),得到最终结果(GlovalAgg)。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aQIP4z5c-1647597128293)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220318110219417.png)]

  1. 如果没有开启LocalGlobal优化,由于流中的数据倾斜,key为红色的聚合算子实例需要处理更多的记录,这就导致了数据热点。

  2. 开启LocalGlobal优化,先进行本地聚合再进行全局聚合,可大大减少GlobalAgg的热点,提升性能。

  • 使用场景

    适用于提升如SUM、COUNT、MAX、MIN、AVG等普通聚合的性能以及这些场景下引起的数据热点问题。

  • 注意事项

    需要先开启MiniBatch。

    开启LocalGlobal需要使用UDAF实现Merge方法。

  • 配置

    // 获取 tableEnv的配置对象
    Configuration configuration = tEnv.getConfig().getConfiguration();
    
    // 设置参数:
    // 开启miniBatch
    configuration.setString("table.exec.mini-batch.enabled", "true");
    // 批量输出的间隔时间
    configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
    // 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条
    configuration.setString("table.exec.mini-batch.size", "20000");
    // 开启LocalGlobal
    configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
    

开启Split Distinct(解决COUNT DISTINCT热点问题)

Count Distinct 在Local聚合时,对于Distinct key的去重率不高,导致在Global节点仍然存在热点。

为了解决这个问题,可以手动改写两层聚合来打散,Flink1.9之后,提供了自动打散功能。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XcJd703e-1647597128294)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220318111838509.png)]

举例:
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

手动两阶段聚合:
-- 外层聚合
SELECT day, SUM(cnt)
FROM (
	-- 手动打散 distinct key
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
  • 使用场景

    使用COUNT DISTINCT,但无法满足聚合节点性能要求。

  • 注意事项

    目前不能在包含UDAF的Flink SQL中使用split Distinct优化。

    拆分出来的两个Group聚合还可参与LocalGlobal优化。

  • 配置

    // 获取 tableEnv的配置对象
    Configuration configuration = tEnv.getConfig().getConfiguration();
    
    // 设置参数:
    // 开启Split Distinct
    configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
    // 第一层打散的bucket数目
    configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
    

改写 agg with filter语法(提升大量count distinct场景性能)

在某些场景下,可能需要从不同维度统计,这时可能会使用cash when语法。

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

在这种情况下,可以使用FILTER语法,Flink可以只使用一个共享状态示例,而不是三个状态,可减少状态的大小和对状态的访问。

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day

TopN优化(无排名优化,解决数据膨胀问题)

根据TopN的语法,rownum字段会作为结果表的主键字段之一写入结果表。但是这会导致数据膨胀的问题。比如收到一条原排名9的更新数据,更新后排名上升到1,则从1到9的数据排名都发生变化,需要将这些数据作为更新都写入结果,就会导致数据膨胀。

可以将TopN的输出结果无需要显示rownum值,仅需在最终前端显示时进行一次排序,极大减少结果表的输入。

原SQL:
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
    ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name)
WHERE rownum <= N [AND conditions]

优化:
SELECT col1, col2, col3
FROM (
 SELECT col1, col2, col3
   ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
   ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
 FROM table_name)
WHERE rownum <= N [AND conditions]

对于无rownum的场景,结果表主键的定义需要十分小心,如果定义有误会导致TopN结果不正确。无rownum场景,主键应该为TopN上游GroupBY节点的key列表。

TopN优化(增加TopN的Cache大小)

TopN为了提升性能有一个state Cache层,Cache层能够提升对State的访问效率。TopN的Cache命中率的计算公式为:

cache_hit = cache_size*parallelism/top_n/partition_key_num
命中率 = 配置缓存数*并发/top数/PatitionBy_key数
例如Top100配置缓存10000条,并发50,当PatitionBy的key维度10万级别
5% = 10000*50/100/100000
  • 配置

    // 获取 tableEnv的配置对象
    Configuration configuration = tEnv.getConfig().getConfiguration();
    
    // 设置参数:
    // 默认10000条,调整TopN cahce到20万,那么理论命中率能达200000*50/100/100000 = 100%
    configuration.setString("table.exec.topn.cache-size", "200000");
    

保留首行的去重策略(Deduplicate Keep FirstRow)

保留key下第一条出现的数据,之后出现的key的数据会被丢弃,因为state中只存储了key数据,所以性能较优。

  • 示例

    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
      FROM T
    )
    WHERE rowNum = 1
    

保留末行的去重策略(Deduplicate Keep LastRow)

保留key下最后一条出现的数据,可以按照业务时间保留最后一条数据。

  • 示例

    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
      FROM T
    )
    WHERE rowNum = 1
    

like操作注意事项

  • 如果需要进行StartWith操作,使用LIKE ‘xxx%’。

  • 如果需要进行EndWith操作,使用LIKE ‘%xxx’。

  • 如果需要进行Contains操作,使用LIKE ‘%xxx%’。

  • 如果需要进行Equals操作,使用LIKE ‘xxx’,等价于str = ‘xxx’。

  • 如果需要匹配 _ 字符,请注意要完成转义LIKE ‘%seller/id%’ ESCAPE ‘/’。_在SQL中属于单字符通配符,能匹配任何字符。如果声明为 LIKE ‘%seller_id%’,则不单会匹配seller_id还会匹配seller#id、sellerxid或seller1id 等,导致结果错误。

指定时区

为了避免时区错乱的问题可以指定时区

  • 配置

    // 获取 tableEnv的配置对象
    Configuration configuration = tEnv.getConfig().getConfiguration();
    
    // 设置参数:
    // 指定时区
    configuration.setString("table.local-time-zone", "Asia/Shanghai");
    

Flink并行度设置

可以从四个不同层面设置并行度:

  • 操作算子层面
  • 执行环境层面
  • 客户端层面
  • 系统层面

优先级:算子层面>环境层面>客户端层面>系统层面

并行度设置:一般设置为Kafka分区数,遵循2的N次方。

Flink的KeyBy怎么实现的分区

对指定的key调用自身的hashCode方法,

调用murmruhash算法,进行第二次hash,得到键组ID

通过一个公式,计算当前数据应该去往哪个下游分区:
键组ID * 下游算子并行度 / 最大并行度

算子的一个并行示例可以理解为一个分区,是物理上的资源。

数据按照key进行区分可以理解为一个分组。

一个分区可以有多个分组,同一个分组的数据肯定在同一个分区。

Flink的interval join的实现原理

底层调用的是Keyby+Connect,处理逻辑如下:

  • 判断是否迟到(迟到就不处理)
  • 每条流都存了一个Map类型的状态(key是时间戳,value是List存数据)
  • 任一条流,来了一条数据,遍历对方的map状态,能匹配上就发往join方法
  • 超过有效范围,会删除对应map中的数据。

interval join不会处理join不上的数据,如果需要没join上的数据,可以用coGroup+connect算子实现,或者直接使用left join或right join语法。

Flink状态机制

算子状态:作用范围是算子,算子的多个并行实例各自维护一个状态。

监控状态:每个分组维护一个状态。

状态后端:

本地状态checkpoint
内存TaskManager的内存JobManager内存
文件TaskManager的内存HDFS
RocksDBRocksDBHDFS

Flink水位线和时间语义

Watermark是一条携带时间戳的数据,用来衡量Event Time进展的机制,可以设定延迟触发,从代码指定生成的位置,插入到流中。

Watermark的生成有两种方式,官方默认提供的是周期性水位线,默认是200ms,除周期性水位线外还有间歇性水位线,来一条数据,更新一次水位线。

时间语义有三种:

  • Event Time:事件时间
  • Ingestion Time:摄入时间
  • Processing Time:处理时间

Flink的窗口

  • 窗口分类:Keyed Window和Non-keyed Window

    基于时间:滚动窗口、滑动窗口、会话窗口

    基于数量:滚动窗口、滑动窗口

  • 窗口的4个相关重要组件

    assigner(分配器):如何将元素分配给窗口

    function(计算函数):为窗口定义的计算

    triger(触发器):在什么条件下触发窗口的计算

    evictor(退出器):定义从窗口中移除数据

  • 窗口的划分

    开了一个10s的滚动窗口,第一条数据是867S,那么他属于[860s,870s)

  • 窗口的创建

    当属于某个窗口的第一个元素到达,Flink就会创建一个窗口。

  • 窗口的销毁

    当时间超过其结束时间+用户指定的允许延迟时间,窗口销毁。

一致性语义

Source:可重发

Transformation:Checkpoint机制(Chandy-Lamport算法、barrier对齐)

Sink:幂等性、事务性

Flink Checkpoint

https://blog.csdn.net/qq_41106844/article/details/114372717

标签:Task,聚合,Flink,并行度,调优,内存,原理,数据
来源: https://blog.csdn.net/qq_41106844/article/details/123581092

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有