ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink 自定义水位线

2022-06-28 06:00:06  阅读:230  来源: 互联网

标签:自定义 Flink 生成 水位 env Override new public


一般来说,Flink内置的水位线生成器就可以满足应用需求了。不过有时由于业务逻辑可能非常复杂,这时对水位线生成的逻辑也有更高的要求,开发人员就必须自定义实现水位线策略WatermarkStrategy了。在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于WatermarkGenerator的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。就是WatermarkGenerator接口中的两个方法——onEvent()和onPeriodicEmit(),前者是在每个事件到来时调用,而后者由框架周期性调用。周期性调用的方法中发出水位线,自然就是周期性生成水位线;而在事件触发的方法中发出水位线,自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上;

1、周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //生成周期性水位线
        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new MyWatermarkStrategy())
                .print();

        env.execute();
    }

    //自定义周期性生成水位线
    public static class MyWatermarkStrategy implements WatermarkStrategy<Event> {


        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp * 1000L;
                }
            };
        }

        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            //周期生成水位线
            return new MyPeriodicGenerator();
            
        }
    }

    //周期性生成水位线
    public static class MyPeriodicGenerator implements WatermarkGenerator<Event> {
        //延迟时间
        private long delayTime = 5000L;
        //观察到最大时间cuo
        private long maxTs = Long.MIN_VALUE + delayTime + 1L;

        //每来一条数据调研一次
        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            //更新最大时间cuo
            maxTs = Math.max(event.timestamp, maxTs);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            //发射水位线,默认200 ms 调用一次
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }

在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。所以水位线的时间戳是依赖当前已有数据的最大时间戳的(这里的实现与内置生成器类似,也是减去延迟时间再减1),但具体什么时候生成与数据无关。

2、断点式水位线生成器(PunctuatedGenerator)

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //生成周期性水位线
        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new MyWatermarkStrategy())
                .print();

        env.execute();
    }

    //自定义周期性生成水位线
    public static class MyWatermarkStrategy implements WatermarkStrategy<Event> {


        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp * 1000L;
                }
            };
        }

        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            //周期生成水位线
            // return new MyPeriodicGenerator(); 
            //断点生成水位线
            return new MyPunctuatedGenerator();
        }
    }

    //断点生成水位线
    public static class MyPunctuatedGenerator implements WatermarkGenerator<Event> {

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            //只有遇到特定数据时,才发送水位线
            if (event.user.equals("依琳")) {
                output.emitWatermark(new Watermark(event.timestamp - 1L));
            }
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            //onEvent 已经发送了水位线,onPeriodicEmit不做处理即可
        }
    }

在onEvent()中判断当前事件的user字段,只有遇到“依琳”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。

3、在自定义数据源中发送水位线

也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法 来 生 成 水 位 线 了 。 在 自 定 义 数 据 源 中 生 成 水 位 线 和 在 程 序 中 使 用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSourceWithWatermark()).print();
        env.execute();

    }
ClickSourceWithWatermark 逻辑
public class ClickSourceWithWatermark implements ParallelSourceFunction<Event> {
    // 声明标志位
    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        //随机生成数据
        Random random = new Random();
        //随机范围
        String[] users = {"令狐冲", "依琳", "任盈盈", "莫大", "风清扬"};

        String[] urls = {"./home", "./cat", "./pay", "./info"};
        //循环生成数据
        while (running) {
            //生成数据
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            Event event = new Event(user, url, Calendar.getInstance().getTimeInMillis());

            //发送数据
            ctx.collect(event);

            //发送水位线
            ctx.emitWatermark(new Watermark(event.timestamp - 1L));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

在自定义水位线中生成水位线相比assignTimestampsAndWatermarks方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写Flink的测试程序,测试Flink的各种各样的特性。

标签:自定义,Flink,生成,水位,env,Override,new,public
来源: https://www.cnblogs.com/wdh01/p/16412928.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有