Flink Transform (Part 2)

2022-03-03 12:32:37 · Views: 103 · Source: Internet



Rolling aggregation operators

Common rolling aggregation operators: sum, min, max, minBy, maxBy
Purpose: aggregates each keyed sub-stream of a KeyedStream. After each element is processed, the aggregation result is emitted into the output stream, so the result is always a DataStream.
Parameters:

  1. If the stream holds POJOs or Scala case classes, pass the field name.
  2. If the stream holds tuples, pass the field position (0-based).

Returns: KeyedStream -> SingleOutputStreamOperator
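To make the "one output per input" rolling semantics concrete, here is a plain-Java sketch (not Flink; class and method names are illustrative) of a keyed rolling sum:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (not Flink) of rolling-aggregation semantics:
// every incoming element updates the per-key aggregate and emits the
// current result, so the output has one record per input record.
public class RollingSumSketch {

    // keys.get(i) is the key of the i-th element, values.get(i) its value;
    // returns the aggregate emitted for each input element, in order
    public static List<Integer> runningSums(List<String> keys, List<Integer> values) {
        Map<String, Integer> state = new HashMap<>();
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            int sum = state.merge(keys.get(i), values.get(i), Integer::sum);
            emitted.add(sum); // an aggregate is emitted for EVERY element
        }
        return emitted;
    }

    public static void main(String[] args) {
        // two keys interleaved: s1 -> 1, 2 ; s2 -> 10
        System.out.println(runningSums(List.of("s1", "s2", "s1"), List.of(1, 10, 2)));
        // prints [1, 10, 3]
    }
}
```

Each element updates its key's state and immediately emits the new aggregate, which is why the result is a stream rather than a single value.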

Entity class

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Water level sensor, used to receive water level readings
 * id - sensor ID
 * ts - timestamp
 * vc - water level
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {

    public String id;
    public long ts;
    public Integer vc;
}

max

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.wdh01.bean.WaterSensor;

public class Flink02_Transform_Max {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Read socket data & convert to a Java bean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop103", 9998)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        final String[] split = value.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                });
        //3. Key by sensor ID
        KeyedStream<WaterSensor, String> keyByStream = waterSensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        //4. Compute the maximum water level
        SingleOutputStreamOperator<WaterSensor> vc = keyByStream.max("vc");
        //5. Print
        vc.print();
        //6. Execute
        env.execute();
    }
}

maxBy

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.wdh01.bean.WaterSensor;

public class Flink03_Transform_MaxBy {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Read socket data & convert to a Java bean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop103", 9998).
                map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        final String[] split = value.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                });
        //3. Key by sensor ID
        KeyedStream<WaterSensor, String> keyByStream = waterSensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        //4. Compute the maximum water level
        SingleOutputStreamOperator<WaterSensor> vc = keyByStream.maxBy("vc", false); // false: on a tie in the aggregated field, emit the latest element rather than the first
        //5. Print
        vc.print();
        //6. Execute
        env.execute();
    }
}
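The difference between max and maxBy is easy to miss: max only updates the aggregated field, while the other non-key fields keep the values from the first element seen; maxBy emits the entire element containing the maximum. A plain-Java sketch (not Flink; names are illustrative) of one aggregation step:

```java
// Plain-Java (non-Flink) sketch contrasting max("vc") and maxBy("vc").
// max updates only the aggregated field and keeps the other non-key fields
// from the first element; maxBy emits the whole element holding the maximum.
public class MaxVsMaxBySketch {

    public static final class Sensor {
        public final String id;
        public final long ts;
        public final int vc;
        public Sensor(String id, long ts, int vc) { this.id = id; this.ts = ts; this.vc = vc; }
    }

    // rolling max: keep the accumulator's ts, take only the larger vc
    public static Sensor max(Sensor acc, Sensor next) {
        return next.vc > acc.vc ? new Sensor(acc.id, acc.ts, next.vc) : acc;
    }

    // rolling maxBy: keep the entire element with the larger vc
    public static Sensor maxBy(Sensor acc, Sensor next) {
        return next.vc > acc.vc ? next : acc;
    }

    public static void main(String[] args) {
        Sensor first = new Sensor("s1", 1L, 10);
        Sensor second = new Sensor("s1", 2L, 30);
        Sensor m = max(first, second);
        Sensor mb = maxBy(first, second);
        System.out.println(m.ts + " " + m.vc);   // 1 30  (ts still from the first record)
        System.out.println(mb.ts + " " + mb.vc); // 2 30  (whole newer record)
    }
}
```

So with input (ts=1, vc=10) followed by (ts=2, vc=30), max produces (ts=1, vc=30) while maxBy produces (ts=2, vc=30).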

reduce

Purpose: an aggregation over a keyed stream that combines the current element with the result of the previous aggregation to produce a new value. The returned stream contains every intermediate aggregation result, not just the final one. Why keep the intermediate values? Consider the nature of streaming data: a stream has no end, so there is no "final" result; every intermediate aggregate is the valid answer at that point in time.
Parameters: interface ReduceFunction<T>
Returns: KeyedStream -> SingleOutputStreamOperator

Example

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.wdh01.bean.WaterSensor;

import static java.lang.Integer.max;

public class Flink04_Transform_reduce {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Read socket data & convert to a Java bean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop103", 9998).
                map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        final String[] split = value.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    }
                });
        //3. Key by sensor ID
        KeyedStream<WaterSensor, String> keyByStream = waterSensorDS.keyBy(new KeySelector<WaterSensor, String>() {
            @Override
            public String getKey(WaterSensor value) throws Exception {
                return value.getId();
            }
        });
        //4. Compute the maximum water level (keeping the latest timestamp)
        SingleOutputStreamOperator<WaterSensor> reduce = keyByStream.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                return new WaterSensor(value1.getId(), value2.getTs(), max(value1.getVc(), value2.getVc()));
            }
        });
        //5. Print
        reduce.print();
        //6. Execute
        env.execute();
    }
}
Note: the type of the aggregation result must be the same as the type of the elements in the original stream!
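In plain Java (not Flink; names are illustrative), the rolling behavior of the ReduceFunction above (latest timestamp, running max water level, one emitted aggregate per input) looks like this:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java (non-Flink) sketch of keyed reduce semantics: each element is
// combined with the previous aggregate, and EVERY intermediate aggregate is
// emitted (streams have no "final" result). The combiner mirrors the Flink
// example: keep the latest ts and the running max vc.
public class ReduceSketch {

    // ts[i] and vc[i] describe the i-th element of one key's sub-stream;
    // returns "ts,vc" for each emitted aggregate, in order
    public static List<String> rollingReduce(long[] ts, int[] vc) {
        List<String> emitted = new ArrayList<>();
        long accTs = ts[0];
        int accVc = vc[0];
        emitted.add(accTs + "," + accVc); // the first element passes through as-is
        for (int i = 1; i < ts.length; i++) {
            accTs = ts[i];                  // latest timestamp
            accVc = Math.max(accVc, vc[i]); // running max water level
            emitted.add(accTs + "," + accVc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(rollingReduce(new long[]{1, 2, 3}, new int[]{10, 30, 20}));
        // prints [1,10, 2,30, 3,30]
    }
}
```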

process

Purpose: process is a relatively low-level operator in Flink. It can be called on many kinds of streams and gives access to much more than the data itself (runtime context, timers, side outputs, and so on).

Example

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class Flink05_Transform_process {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Read data from the socket
        DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop103", 9998);
        //3. Use process to implement flatMap (flattening)
        SingleOutputStreamOperator<String> wordDS = socketTextStream.process(new ProcessFlatMapFunction());
        //4. Use process to implement map
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordToOne = wordDS.process(new ProcessMapFunction());
        //5. Key by word
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordToOne.keyBy(d -> d.f0);
        //6. Sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedStream.sum(1);
        //7. Print
        sum.print();
        //8. Execute
        env.execute();
    }

    public static class ProcessMapFunction extends ProcessFunction<String, Tuple2<String, Integer>> {

        @Override
        public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            out.collect(new Tuple2<>(value, 1));
        }
    }

    public static class ProcessFlatMapFunction extends ProcessFunction<String, String> {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(word);
            }
            // runtime context
            RuntimeContext runtimeContext = getRuntimeContext();

            // timer service
            TimerService timerService = ctx.timerService();
            timerService.registerProcessingTimeTimer(1245L);
            // current processing time
            timerService.currentProcessingTime();
            // event time (current watermark)
            timerService.currentWatermark();
            // side output: ctx.output(...)

        }

        // lifecycle hook
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
        }


    }
}

Repartitioning operators

  • keyBy: groups by key first, then selects the downstream partition based on a double hash of the key
  • shuffle: distributes the stream's elements to partitions at random
  • rebalance: distributes elements evenly (round-robin) across all partitions; a performance optimization when handling skewed data
  • rescale: distributes data evenly in a round-robin fashion like rebalance, but more efficiently, because rescale avoids the network and stays entirely within the local "pipeline"
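As a rough plain-Java sketch (a simplification of what Flink actually does, for illustration only): keyBy-style routing is deterministic per key, while rebalance-style routing ignores the record's content entirely:

```java
// Plain-Java sketch of how repartition strategies pick a target channel.
// The modulo-hash and round-robin schemes below are simplifications of
// Flink's real partitioners, shown only to contrast the two behaviors.
public class PartitionSketch {

    // keyBy-style: the same key always lands in the same partition
    public static int byKey(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // rebalance-style: round-robin, independent of the record's content
    private static int counter = 0;
    public static int roundRobin(int parallelism) {
        return counter++ % parallelism;
    }

    public static void main(String[] args) {
        int p = 4;
        // keyBy is deterministic per key:
        System.out.println(byKey("sensor_1", p) == byKey("sensor_1", p)); // true
        // rebalance cycles evenly through the partitions:
        System.out.println(roundRobin(p) + " " + roundRobin(p) + " " + roundRobin(p));
    }
}
```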

Repartition example

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// repartitioning demo
public class Flink06_Transform_Repartition {
    public static void main(String[] args) throws Exception {

        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);
        //2. Read data from the socket
        DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop103", 9998);
        //3. Apply different repartition strategies
        socketTextStream.keyBy(d -> d).print("keyBy---");
        socketTextStream.shuffle().print("shuffle---");
        socketTextStream.rebalance().print("rebalance---");
        socketTextStream.rescale().print("rescale---");
        socketTextStream.global().print("global---");
        // socketTextStream.broadcast().print("broadcast---");
        // socketTextStream.forward().print("forward---");

        //4. Execute
        env.execute();
    }
}

Source: https://www.cnblogs.com/wdh01/p/15954542.html
