ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink Uid设计剖析

2020-10-22 18:01:55  阅读:181  来源: 互联网

标签:Node Flink java Uid flink 剖析 apache org CliFrontend


一个flink job通常由一个或多个source operators、一些处理计算的operators、和一个或者多个sink oper组成。每个operators在一个或者多个task中并行运行,并且使用不同的类型的state。

如果operators应用于key steam,它可以有零个、一个或者多个“key state”,它的作用是从每一条记录中提取出它的key值,可以将它理解为处理了一个分布式的map。

下图显示了应用程序“ MyApp”,该应用程序由称为“ Src”,“ Proc”和“ Snk”的三个运算符组成。 Src具有一个操作员状态(os1),Proc具有一个操作员状态(os2)和两个键控状态(ks1,ks2),而Snk是无状态的。

Application: MyApp

MyApp的checkpoint 或者 savepoint都是由所有的状态数据组成,这些数据的结构可以让每个任务从checkpoint 或者savepoint恢复。在使用批处理作业保存恢复点的时候,其实际就是把每个任务的state映射到一个数据集或者一个表上,可以认为这个保存点其实就是一个数据库。每个operators(由其UID唯一标识)都代表了一个名称空间。operators的每个操作state都是有一个列映射到名称空间的专有表,改列包含所有任务的state数据。operators的所有的key states都会映射到单个表,该表的由一个k-v列组成。改列每一个key值对应一个列。如下图:

Database: MyApp

该图显示了Src的operators state值如何映射到一个表,该表具有一列五行,跨Src的所有并行任务的每个列表条目为一行。 运算符“ Proc”的运算符状态os2类似地映射到单个表。 键状态ks1和ks2被组合到具有三列的单个表中,一列用于键,一列用于ks1,一列用于ks2。 键控表为两个键控state的每个不同键保持一行。 由于运算符“ Snk”没有任何状态,因此其名称空间为空。

所以我们可以总结savepoint和database的关系如下:

  • 一个savepoint是一个数据库
  • operators是其uid命名的namespace
  • 每一个operator state是一个单独的表
  • operator state中每一个元素代表一行
  • 每个keyed state表都有一个键列,该键列映射operator的键值
  • 每个注册状态代表表中的单个列
  • 表中的每一行都映射到一个键

为所有的operators去设置UID

如上所述,Flink将operators state映射到operators时,使用的是uid,这对于savepoint至关重要。默认情况下,uid是通过遍历JobGraph并hash特定operators属性来生成运算符uid。尽管对于使用者来说很方便,但是他也非常的脆弱,因为对于JobGraph的更改会导致新的UUID,为了建立稳定的映射时,我们必须setUid提供稳定的uid。

问题:

如果使用在FLink上层建立一层解析器,通过类似于SQL的简单的语法提供给未学习过的FLink的用户,这个时候导致解析器去映射成实际的FLink的uid并不能很稳定的去设置,会导致上个savepoint的operators即使部分相同,也会导致大部分的丢失,所以我们需要去了解UID怎么去设置的,并去做修改。


让我们看看flink是如何去设置UID的

可以通过运行这一段demo去debug发现:

StreamGraph streamGraph = env.getStreamGraph(jobName);
JobGraph jobGraph = streamGraph.getJobGraph();

如果用户有设置uid则设置为用户的uid

DEBUG main org.apache.flink.streaming.api.graph.StreamGraphHasherV2 StreamGraphHasherV2.java:188: User defined hash 'LogRiverEvent,d1494b6528fcd7578379234466c9feef' for node 'Source: LogRiverEvent-1' {id: 1, parallelism: 4, user function: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011}
  java.lang.Thread.getStackTrace(Thread.java:1559)
  org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:185)
  org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:112)
  org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
  org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
  org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
  org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
  cn.yottabyte.pipe.flink.source.LogRiverSourceTest.main(LogRiverSourceTest.java:136)
  
  
  sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  java.lang.reflect.Method.invoke(Method.java:498)
  org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
  org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
  org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
  org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
  org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
  org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
  org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
  org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
  org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
  org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)

没有uid则会自动生成一个uid

2020-04-15 15:37:07,248 DEBUG main org.apache.flink.streaming.api.graph.StreamGraphHasherV2 StreamGraphHasherV2.java:264: Generated hash '10c893aa189fa29399b8c379dbfeca05' for node 'ignore_null_event-3' {id: 3, parallelism: 4, user function: cn.yottabyte.pipe.flink.source.LogRiverSourceTest$1}
  java.lang.Thread.getStackTrace(Thread.java:1559)
  org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateDeterministicHash(StreamGraphHasherV2.java:260)
  org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:166)
  org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:112)
  org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
  org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
  org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
  org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
  
  
  cn.yottabyte.pipe.flink.source.LogRiverSourceTest.main(LogRiverSourceTest.java:136)
  sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  java.lang.reflect.Method.invoke(Method.java:498)
  org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
  org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
  org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
  org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
  org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
  org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
  org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
  org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
  org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
  org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)

实际生成uid代码为:flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java

再去了解代码层面:

Hasher hasher = hashFunction.newHasher();
			byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled, streamGraph);
private byte[] generateDeterministicHash(
			StreamNode node,
			Hasher hasher,
			Map<Integer, byte[]> hashes,
			boolean isChainingEnabled,
			StreamGraph streamGraph) {
		……
		for (StreamEdge outEdge : node.getOutEdges()) {
				generateNodeLocalHash(hasher, hashes.size());
		}
		……
		byte[] hash = hasher.hash().asBytes();
		……
		for (StreamEdge inEdge : node.getInEdges()) {
			byte[] otherHash = hashes.get(inEdge.getSourceId());
			for (int j = 0; j < hash.length; j++) {
				hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
			}
		}
		……
}

最终发现最后生成的hashcode与当前的id,out个数,int的hash有关。

实际输出一个uid为例:

如图的flink DAG:

img

Node:Filter-4, #3. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.

解释:

  • 第一个"#"后面的id是算hash用的id,也就是遍历时该Node的id。
  • in edges表示前向依赖的Node,后面的#接这个node的hash值,再#后面是该node的name
  • out edges表示后续的Node,后面的#接这个node的hash值,再#后面是该node的name

整个链路如下:

  • Node:Source: LogRiverEvent-1, #0.
  • Node:extractTimestamp-2, #1. in edges#bc764cd8ddf7a0cff126f51c16239658#Source: LogRiverEvent-1.
  • Node:ignore_null_event-3, #2. in edges#0a448493b4782967b150582570326227#extractTimestamp-2.

#并不是算完一条再算另一条

  • Node:Filter-4, #3. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.

  • Node:Filter-11, #4. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.

  • Node:Map-5, #5. in edges#6d2677a0ecc3fd8df0b72ec675edf8f4#Filter-4.

  • Node:Map-12, #6. in edges#5af0c26a4e78bae94addfaa227a406c1#Filter-11.

  • Node:Flat Map-6, #7. in edges#f66b9c09d172b1c19fff9288e5d53f49#Map-5.

  • Node:Flat Map-13, #8. in edges#227ab9be2ca77a534fd0e93a93e67e8b#Map-12.

  • Node:Window(TumblingEventTimeWindows(600000), EventTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)-8, #9. in edges#b15a823f9f5f129a46d82a43a93f3613#Flat Map-6.

  • Node:Window(TumblingEventTimeWindows(600000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-15, #10. in edges#d6b993c9e4e18029649e604955b50858#Flat Map-13.

  • Node:Window(TumblingEventTimeWindows(1000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-10, #11. in edges#ce95ba768fcf87e260ab71a244cbfdb9#Window(TumblingEventTimeWindows(600000), EventTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)-8.

  • Node:Map-16, #12. in edges#0ca94585fe097875e38fc49c1eb9af32#Window(TumblingEventTimeWindows(600000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-15.

  • Node:Window(TumblingEventTimeWindows(1000), EventTimeTrigger, ProcessWindowFunction$1)-18, #13. in edges#0929bdfbc119b2e3d9f65c52d507beb4#Map-16.

  • Node:Sink: Unnamed-20, #14. in edges#9dc9fccc5cacedb2297bb9e1d19bbe5d#Window(TumblingEventTimeWindows(1000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-10. in edges#8b25afa3fd1186f5175e9605d9e9cf94#Window(TumblingEventTimeWindows(1000), EventTimeTrigger, ProcessWindowFunction$1)-18.

了解到上面UID的设计情况以后,如果我们有需求进行更改,可以根据自己需求去在用户层面去设计UID,避免重复计算。

标签:Node,Flink,java,Uid,flink,剖析,apache,org,CliFrontend
来源: https://www.cnblogs.com/yankang/p/13859820.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有