ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink实例(126):状态管理(十五)State 过期时间TTL

2021-03-02 22:03:34  阅读:470  来源: 互联网

标签:状态 清理 过期 Flink State 126 TTL StateTtlConfig


1 State 过期时间TTL

  使用 flink 进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形。

  例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句,以及执行了没有时间窗口限制的双流 JOIN 等等操作。

  对于这些情况,经常导致堆内存出现 OOM,或者堆外内存(RocksDB)用量持续增长导致超出容器的配额上限,造成作业的频繁崩溃。从 Flink 1.6 版本开始引入了State TTL 特性,该特性可以允许对作业中定义的 Keyed 状态进行超时自动清理,对于Table API 和 SQL 模块引入了空闲状态保留时间(Idle State Retention Time)进行状态管理,下面我们具体介绍一下。

1.1 State TTL 功能的用法

在 Flink 的官方文档 中给我们展示了State TTL的基本用法,用法示例如下:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
 
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

可以看到,要使用 State TTL 功能,首先要定义一个 StateTtlConfig 对象。这个 StateTtlConfig 对象可以通过构造器模式(Builder Pattern)来创建,典型地用法是传入一个 Time 对象作为 TTL 时间,然后设置更新类型(Update Type)和状态可见性(State Visibility),这两个功能的含义将在下面的文章中详细描述。当 StateTtlConfig 对象构造完成后,即可在后续声明的状态描述符(State Descriptor)中启用 State TTL 功能了。

从上述的代码也可以看到,State TTL 功能所指定的过期时间并不是全局生效的,而是和某个具体的状态所绑定。换而言之,如果希望对所有状态都生效,那么就需要对所有用到的状态定义都传入 StateTtlConfig 对象。对 Flink 源码感兴趣的同学,可以尝试为 Flink 增加一个默认的 StateTTL 选项,实现起来很简单,这里不再展开说明了。

State TTL 使用的更多案例,可以参见官方的 flink-stream-state-ttl-test 包,它提供了很多测试用例可以参考。

1.2 StateTtlConfig 的参数说明

  • TTL:表示状态的过期时间,是一个 org.apache.flink.api.common.time.Time 对象。一旦设置了 TTL,那么如果上次访问的时间戳 + TTL 超过了当前时间,则表明状态过期了(这是一个简化的说法,严谨的定义请参考org.apache.flink.runtime.state.ttl.TtlUtils 类中关于 expired 的实现) 。

  • UpdateType:表示状态时间戳的更新的时机,是一个 Enum 对象。如果设置为 Disabled,则表明不更新时间戳;如果设置为 OnCreateAndWrite,则表明当状态创建或每次写入时都会更新时间戳;如果设置为 OnReadAndWrite,则除了在状态创建和写入时更新时间戳外,读取也会更新状态的时间戳。

  • StateVisibility:表示对已过期但还未被清理掉的状态如何处理,也是 Enum 对象。如果设置为 ReturnExpiredIfNotCleanedUp,那么即使这个状态的时间戳表明它已经过期了,但是只要还未被真正清理掉,就会被返回给调用方;如果设置为 NeverReturnExpired,那么一旦这个状态过期了,那么永远不会被返回给调用方,只会返回空状态,避免了过期状态带来的干扰。

  • TimeCharacteristic 以及 TtlTimeCharacteristic:表示 State TTL 功能所适用的时间模式,仍然是 Enum 对象。前者已经被标记为 Deprecated(废弃),推荐新代码采用新的 TtlTimeCharacteristic 参数。截止到 Flink 1.8,只支持 ProcessingTime 一种时间模式,对 EventTime 模式的 State TTL 支持还在开发中。

  • CleanupStrategies:表示过期对象的清理策略,目前来说有三种 Enum 值。当设置为 FULL_STATE_SCAN_SNAPSHOT 时,对应的是 EmptyCleanupStrategy 类,表示对过期状态不做主动清理,当执行完整快照(Snapshot / Checkpoint)时,会生成一个较小的状态文件,但本地状态并不会减小。

    唯有当作业重启并从上一个快照点恢复后,本地状态才会实际减小,因此可能仍然不能解决内存压力的问题。为了应对这个问题,Flink 还提供了增量清理的枚举值,分别是针对 Heap StateBackend 的 INCREMENTAL_CLEANUP(对应 IncrementalCleanupStrategy 类),以及对 RocksDB StateBackend 有效的 ROCKSDB_COMPACTION_FILTER(对应 RocksdbCompactFilterCleanupStrategy 类)。

    对于增量清理功能,Flink 可以被配置为每读取若干条记录就执行一次清理操作,而且可以指定每次要清理多少条失效记录;对于 RocksDB 的状态清理,则是通过 JNI 来调用 C++ 语言编写的 FlinkCompactionFilter 来实现,底层是通过 RocksDB 提供的后台 Compaction 操作来实现对失效状态过滤的。

配置中有下面几个配置项可以选择:StateTtlConfig中的newBuilder这个方法是必须的,它是设置生存周期的值。

  • TTL 刷新策略(默认OnCreateAndWrite)
策略类型描述
StateTtlConfig.UpdateType.Disabled 禁用TTL,永不过期
StateTtlConfig.UpdateType.OnCreateAndWrite 每次写操作都会更新State的最后访问时间
StateTtlConfig.UpdateType.OnReadAndWrite 每次读写操作都会跟新State的最后访问时间
  • 状态可见性(默认NeverReturnExpired)
策略类型描述
StateTtlConfig.StateVisibility.NeverReturnExpired 永不返回过期状态
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp 可以返回过期但尚未被清理的状态值

1.3 Notes:

  • 状态后端存储最后一次修改的时间戳和值,这意味着启用该特性会增加状态存储的消耗。堆状态后端在内存中存储一个附加的Java对象,其中包含对用户状态对象的引用和一个原始长值。RocksDB状态后端为每个存储值、列表条目或映射条目添加8个字节;

  • 目前只支持与处理时间相关的TTLs;

  • 如果试图使用启用TTL的描述符或使用启用TTL的描述符恢复先前在没有TTL的情况下配置的状态,将导致兼容性失败和statmigration异常;

  • TTL配置不是check- or savepoints的一部分,而是Flink在当前运行的作业中如何处理它的一种方式

2 State清除策略

2.1 Cleanup in full snapshot

  默认情况下,过期值只有在显式读出时才会被删除,例如通过调用 ValueState.value() 方法。

  此外,您可以在获取完整状态快照时激活清理操作,这将减少其大小。

  在当前实现下,本地状态不会被清除,但在从前一个快照恢复时,它不会包含已删除的过期状态。可以在StateTtlConfig 中配置。(1)下面的配置选项不适用于 RocksDB state backend上的 increamental checkpointing;(2)对于现有作业,此清理策略可以在 StateTtlConfig 中随时激活或停用,例如从保存点重新启动后可以使用。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build

2.2 Incremental cleanup

  另一个选项是增量地触发对某些状态项的清理。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果这个清理策略在某个状态下活跃的,那么存储后端会在其所有条目上为该状态保留一个惰性全局迭代器。

  每次触发增量清理时,迭代器都会被提升。检查遍历的状态项,并清理过期的状态项。这个特性可以在StateTtlConfig中激活:

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlCon fig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally
    .build

上面的策略有两个参数,第一个参数:第是每次清理触发的检查状态的条件。如果启用,则每次状态访问都将触发它。第二个参数:是否为每个记录处理额外触发清理。Notes:

  • 如果对状态没有访问或者没有任何处理的记录,那么状态会一直保留;

  • 增量状态的清理增加了记录处理的延迟;

  • 目前,增量状态的清理策略仅仅在对堆状态后端被实现了,对于设置了RocksDB的将没有效果;

  • 如果使用堆状态后端进行同步快照,全局迭代器在跌倒时会保留所有键的副本,因为它的特性不支持对并发数的修改。使用此功能将增加内存消耗。异步快照进行对状态的保存就没有这种情况发生;

  • 对于现有的作业,可以通过在StateTtlConfig中设置这种清理策略能够随时被激活和停用,例如:从保存点重新启动后。

2.3 Cleanup during RocksDB compaction

  如果使用RocksDB进行状态的管理,另一个清理策略就是激活Flink的压缩过滤这个策略。RocksDB会定期使用异步压缩来合并状态的更新和减少储存。Flink压缩过滤器使用TTL检查状态的过期时间戳,并排除过期值。

  默认情况下是关闭该特性的。对于RocksDB进行状态管理首先要做的就是要激活,通过Flink配置文件配置state.backend.rocksdb.ttl.compaction.filter.enabled,或者对于一个Flink job来说如果一个自定义的RocksDB 状态管理被创建那么它可以调用 RocksDBStateBackend::enableTtlCompactionFilter。

  然后任何带有TTL的状态都可以配置来去使用过滤器。

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter
    .build

  RocksDB compaction filter将会从Flink每次处理完一定数据量的状态之后,从Flink查询用于检查过期的当前时间戳,这个数字默认是1000。你也可以选择更改它,并将自定义值传递给StateTtlConfig.newBuilder(…)。

  cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法。频繁的跟新时间错可以提高清理的数据但是会降低压缩性能,因为它使用了来自本地的JNI的调用。

Notes:

  • 在压缩过程中调用TTL过滤器会减慢它的速度。TTL过滤器必须解析上次访问的时间戳,并检查每个正在压缩的键的每个存储状态条目的过期时间。对于集合状态类型(列表或映射),每个存储的元素也调用该检查;
  • 对于现有作业,此清理策略可以在StateTtlConfig中随时激活或停用,例如从保存点重新启动后。

  目前,管理 operator state 仅仅支持使用 List 类型。当前,支持 List 样式的托管运算符状态,彼此之间相互独立,因此可以在重新缩放时可以重新分配。换句话说,这些对象是可以重新分配 non-keyed state 的最佳粒度。根据状态访问方法,定义一下重新分配方案。

 

标签:状态,清理,过期,Flink,State,126,TTL,StateTtlConfig
来源: https://www.cnblogs.com/qiu-hua/p/14471568.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有