
Flink: reading checkpoint files with the State Processor API

Published 2022-01-10 19:01:04 · Views: 332 · Source: Internet

Tags: flink, Flink, checkpoint, state, api, org, apache, import


What is the State Processor API

Official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/libs/state_processor_api.html

Purpose

The State Processor API lets you read, write, and modify savepoints and checkpoints. It can also be combined with SQL-style queries to analyze and process state data, for example to locate problems in a job.

Usage

Add the Maven dependency

		<!-- for reading checkpoints -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-state-processor-api_2.11</artifactId>
            <version>1.12.3</version>
        </dependency>

To read keyed state, call readKeyedState with the operator's uid and a KeyedStateReaderFunction&lt;KeyType, OutputType&gt; that extracts the state. (To read a given operator's state, use the uid assigned to that operator in the job.)

package com.d4t.dataplatform.runner;

import java.io.Serializable;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

import com.d4t.dataplatform.runner.functions.rulerunner.RuleCalculateProcessFunction;

/**
 * @author sanhongbo
 * @date 2022/1/10
 * @description Reads keyed state from a checkpoint
 **/
public class ReadCheckpoint {
    public static void main(String[] args) throws Exception {
        final String uid = RuleCalculateProcessFunction.class.getSimpleName();

        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final String checkpointPath = parameterTool.get("checkpoint.path");

        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<KeyedMapState> keyedMapStateDataSet = Savepoint
                .load(env, checkpointPath, new MemoryStateBackend())
                .readKeyedState(uid, new ReaderFunction());

        keyedMapStateDataSet
                .writeAsText("hdfs:///flink/state/test");

        // execute program
        env.execute("read the map state");
    }

    static class KeyedMapState implements Serializable {
        String key;
        String mapKey;
        Object value;

        @Override
        public String toString() {
            return "KeyedMapState{" +
                    key + ',' +
                    mapKey + ',' +
                    value +
                    '}';
        }
    }

    static class ReaderFunction extends KeyedStateReaderFunction<String, KeyedMapState> {
        private transient MapState<String, Object> mapState;

        @Override
        public void open(Configuration parameters) {
            // State descriptor: the name and types must match those used by the original job
            final MapStateDescriptor<String, Object> DESCRIPTOR_MAP_STATE =
                    new MapStateDescriptor<>("XXXXXXX", String.class, Object.class);

            mapState = getRuntimeContext().getMapState(DESCRIPTOR_MAP_STATE);
        }

        @Override
        public void readKey(
                String key,
                Context ctx,
                Collector<KeyedMapState> out) throws Exception {

            Iterable<String> keys = mapState.keys();
            for (String s : keys) {
                if (s.contains("XXXXXX")) {
                    KeyedMapState km = new KeyedMapState();
                    km.key = key;
                    km.mapKey = s;
                    km.value = mapState.get(s);
                    out.collect(km);
                }
            }
        }
    }

}
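The article only demonstrates reading, but as noted above the API can also write and modify state. Below is a minimal, hedged sketch (against the same Flink 1.12 DataSet-based state-processor API) of bootstrapping a brand-new savepoint from a DataSet. The class names, the uid "my-operator-uid", the state name, and the output path are all hypothetical placeholders, not taken from the original job:

```java
package com.d4t.dataplatform.runner;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

/**
 * Hypothetical sketch: seed keyed MapState from a DataSet and write it
 * out as a brand-new savepoint that a streaming job can restore from.
 */
public class WriteSavepointSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Any DataSet can seed the state; here a few hard-coded keys.
        DataSet<String> seed = env.fromElements("a", "b", "c");

        BootstrapTransformation<String> transformation = OperatorTransformation
                .bootstrapWith(seed)
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                .transform(new StateWriter());

        Savepoint
                .create(new MemoryStateBackend(), 128) // 128 = max parallelism
                .withOperator("my-operator-uid", transformation)
                .write("hdfs:///flink/savepoint/bootstrap");

        env.execute("write savepoint");
    }

    static class StateWriter extends KeyedStateBootstrapFunction<String, String> {
        private transient MapState<String, Object> mapState;

        @Override
        public void open(Configuration parameters) {
            // Descriptor name/types must match what the restoring job declares
            mapState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("my-map-state", String.class, Object.class));
        }

        @Override
        public void processElement(String value, Context ctx) throws Exception {
            mapState.put(value, 1L); // seed one entry per key
        }
    }
}
```

A job that restores from this savepoint must set the matching uid ("my-operator-uid") on the corresponding operator, or the state will not be mapped.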

Package and run

# Use the same parallelism as the job that produced the state, otherwise you may run out of memory
flink run -d -p 6 -t yarn-per-job -Dtaskmanager.memory.process.size=3072mb -Dtaskmanager.memory.managed.size=0 -ynm map-state -c com.d4t.dataplatform.runner.ReadCheckpoint runner-0.1-jar-with-dependencies.jar --job.name map-state --checkpoint.path hdfs:///flink/checkpoint/rule_runner_20220108/820008c1f70ed755109219f40fc4efb9/chk-558

After the job finishes, pull the files down to the local machine and merge them:

hdfs dfs -ls /flink/state/test/ | awk '{print $NF}' | xargs -I{} hdfs dfs -copyToLocal {}
cat * >> total
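As a side note, and assuming the same output directory as above, HDFS's built-in getmerge can do the copy and the concatenation in a single step:

```shell
# download and concatenate all part files into one local file named "total"
hdfs dfs -getmerge /flink/state/test/ total
```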

Source: https://blog.csdn.net/sanhongbo/article/details/122416831
