Three Ways to Join Streams in Flink

Join

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

/**
 * Join two data streams with a window join.
 *
 * For two streams to be joined, two conditions must be met:
 * 1. The data is spread across multiple machines, so records with the same join key must be sent over the network to the same partition on the same machine (keyBy on the join field).
 * 2. The data in each stream has to wait for the other stream (both streams are assigned windows of the same type and length).
 */
public class EventTumblingWindowJoin {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1000,o001,c001
        DataStreamSource<String> lines1 = env.socketTextStream("linux01", 7777);
        //1200,c001,图书
        DataStreamSource<String> lines2 = env.socketTextStream("linux01", 8888);

        // Join on event time with a 5-second tumbling window, using the new WatermarkStrategy API to extract event timestamps and generate watermarks.

        // Assign timestamps and watermarks to both streams
        SingleOutputStreamOperator<String> lines1WithWatermark
                = lines1.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        SingleOutputStreamOperator<String> lines2WithWatermark
                = lines2.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        // Parse both streams into Tuple3 records

        SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream1
                = lines1WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() {

            @Override
            public Tuple3<Long, String, String> map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream2
                = lines2WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() {

            @Override
            public Tuple3<Long, String, String> map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        // Join the two streams
        DataStream<Tuple5<Long, String, String, Long, String>> result = tpStream1.join(tpStream2)
                .where(tp1 -> tp1.f2)   // key selector for the first stream
                .equalTo(tp2 -> tp2.f1) // key selector for the second stream
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))   // assign 5-second tumbling event-time windows
                // join logic applied when the window fires
                .apply(new JoinFunction<Tuple3<Long, String, String>, Tuple3<Long, String, String>, Tuple5<Long, String, String, Long, String>>() {
                    // When the window fires, each pair of records that share a key and fall into the same window is passed to join()
                    @Override
                    public Tuple5<Long, String, String, Long, String> join(Tuple3<Long, String, String> first, Tuple3<Long, String, String> second) throws Exception {
                        return Tuple5.of(first.f0,first.f1,first.f2,second.f0,second.f2);
                    }
                });

        result.print();

        env.execute();
    }
}
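
With the sample records shown in the source comments, a run might look like the sketch below. This is only illustrative: the exact moment the window fires depends on the watermarks of both streams, and the printed form depends on Flink's Tuple toString.

// port 7777 (stream 1): 1000,o001,c001
// port 8888 (stream 2): 1200,c001,图书
// once both streams have seen an element with event time >= 5000, the [0, 5000) window fires and prints:
// (1000,o001,c001,1200,图书)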

LeftOuterJoin

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Implement a left outer join of two data streams with coGroup.
 *
 * For two streams to be joined, two conditions must be met:
 * 1. The data is spread across multiple machines, so records with the same join key must be sent over the network to the same partition on the same machine (keyBy on the join field).
 * 2. The data in each stream has to wait for the other stream (both streams are assigned windows of the same type and length).
 */
public class EventTumblingWindowLeftOuterJoin {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1000,o001,c001
        DataStreamSource<String> lines1 = env.socketTextStream("linux01", 7777);
        //1200,c001,图书
        DataStreamSource<String> lines2 = env.socketTextStream("linux01", 8888);

        // Join on event time with a 5-second tumbling window, using the new WatermarkStrategy API to extract event timestamps and generate watermarks.

        // Assign timestamps and watermarks to both streams
        SingleOutputStreamOperator<String> lines1WithWatermark
                = lines1.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        SingleOutputStreamOperator<String> lines2WithWatermark
                = lines2.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        // Parse both streams into Tuple3 records

        SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream1
                = lines1WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() {

            @Override
            public Tuple3<Long, String, String> map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream2
                = lines2WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() {

            @Override
            public Tuple3<Long, String, String> map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        // Left outer join the two streams with coGroup
        DataStream<Tuple5<Long, String, String, Long, String>> result = tpStream1.coGroup(tpStream2)
                .where(tp1 -> tp1.f2) // key selector for the first stream
                .equalTo(tp2 -> tp2.f1) // key selector for the second stream
                .window(TumblingEventTimeWindows.of(Time.seconds(5))) // assign 5-second tumbling event-time windows
                .apply(new CoGroupFunction<Tuple3<Long, String, String>, Tuple3<Long, String, String>, Tuple5<Long, String, String, Long, String>>() {
                    /**
                     * When the window fires, coGroup() is called once per key.
                     * Three cases can occur:
                     * 1. Both streams have records with the same key in the same window: neither Iterable is empty.
                     * 2. Only the first stream has records for the key: the first Iterable is non-empty, the second is empty.
                     * 3. Only the second stream has records for the key: the second Iterable is non-empty, the first is empty.
                     * @param first
                     * @param second
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void coGroup(Iterable<Tuple3<Long, String, String>> first, Iterable<Tuple3<Long, String, String>> second, Collector<Tuple5<Long, String, String, Long, String>> out) throws Exception {
                        // Implement the left outer join: iterate over the left (first) stream's records
                        for (Tuple3<Long, String, String> left : first) {
                            boolean matched = false;
                            for (Tuple3<Long, String, String> right : second) {
                                matched = true;
                                out.collect(Tuple5.of(left.f0, left.f1, left.f2, right.f0, right.f2));
                            }
                            if (!matched) {
                                // no matching record on the right side in this window: pad with nulls
                                out.collect(Tuple5.of(left.f0, left.f1, left.f2, null, null));
                            }
                        }
                    }
                });

        result.print();

        env.execute();
    }
}
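
As an illustrative sketch (again assuming the window fires once both streams' watermarks pass the window end), a left record with no matching category in the second stream is emitted padded with nulls:

// port 7777 (stream 1): 1000,o001,c001 and 2000,o002,c002
// port 8888 (stream 2): 1200,c001,图书 only
// output after the [0, 5000) window fires:
// (1000,o001,c001,1200,图书)
// (2000,o002,c002,null,null)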

intervalJoin

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Join two data streams without windows, by time range: an interval join.
 *
 * The time comparison is made relative to the timestamps of the first (left) stream.
 *
 * Implementation steps:
 * 1. Key both streams by the same condition (so that records with equal keys end up in the same partition on the same machine).
 * 2. Buffer the records of both streams in keyed state and connect the streams (so they can share state).
 */
public class EventTumblingWindowIntervalJoin {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1000,o001,c001
        DataStreamSource<String> lines1 = env.socketTextStream("linux01", 7777);
        //1200,c001,图书
        DataStreamSource<String> lines2 = env.socketTextStream("linux01", 8888);

        // Join on event time using the new WatermarkStrategy API to extract event timestamps and generate watermarks (no window is used for an interval join)

        // Assign timestamps and watermarks to both streams
        SingleOutputStreamOperator<String> lines1WithWatermark
                = lines1.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        SingleOutputStreamOperator<String> lines2WithWatermark
                = lines2.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<String>() {

            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        // Parse both streams into Tuple3 records

        SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream1
                = lines1WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() {

            @Override
            public Tuple3<Long, String, String> map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        SingleOutputStreamOperator<Tuple3<Long, String, String>> tpStream2
                = lines2WithWatermark.map(new MapFunction<String, Tuple3<Long, String, String>>() {

            @Override
            public Tuple3<Long, String, String> map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
            }
        });

        // Key both streams on the join field, then interval-join them
        KeyedStream<Tuple3<Long, String, String>, String> keyedStream1 = tpStream1.keyBy(tp -> tp.f2);
        KeyedStream<Tuple3<Long, String, String>, String> keyedStream2 = tpStream2.keyBy(tp -> tp.f1);

        SingleOutputStreamOperator<Tuple5<Long, String, String, Long, String>> result = keyedStream1.intervalJoin(keyedStream2)
                .between(Time.seconds(-1), Time.seconds(1))  // time range relative to the left element's timestamp
                .upperBoundExclusive() // exclude the upper bound
                .process(new ProcessJoinFunction<Tuple3<Long, String, String>, Tuple3<Long, String, String>, Tuple5<Long, String, String, Long, String>>() {
                    @Override
                    public void processElement(Tuple3<Long, String, String> left, Tuple3<Long, String, String> right, Context ctx, Collector<Tuple5<Long, String, String, Long, String>> out) throws Exception {

                        out.collect(Tuple5.of(left.f0,left.f1,left.f2,right.f0,right.f2));

                    }
                });


        result.print();

        env.execute();
    }
}
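
For reference, the bounds above mean that a left element with event time t joins right elements whose event time lies in [t - 1s, t + 1s), since upperBoundExclusive() removes the upper endpoint. An illustrative pairing, assuming the same sample record format as above:

// port 7777 (stream 1): 1000,o001,c001
// port 8888 (stream 2): 1200,c001,图书   -> 1200 lies in [0, 2000), so the pair is emitted:
// (1000,o001,c001,1200,图书)
// a right record with timestamp 2000 would not join, because the upper bound is exclusive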

Source: https://blog.csdn.net/JinVijay/article/details/123142271
