ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

关于流处理框架Flink的入门使用

2022-01-26 21:59:37  阅读:86  来源: 互联网

标签:分组 flink 聚合 入门 框架 Flink aggregateResult KafkaEntity kafkaEntity


1、什么是flink

flink是一种流处理框架,通常使用场景是消费kafka数据进行分组聚合后发送到其他系统,分组与聚合是flink的核心,在本文中仅阐述单个使用场景。流数据相当于是连续不断的数据,生产上的kafka中的日志数据就可以理解为流数据,流数据还分为有界流和无界流,有界即文本数据作为datastream这种有固定大小的数据,无界即源源不断的数据。

2、flink的界面

下图为flink的界面,在界面中可以提交代码jar包,即可实时运行处理
在这里插入图片描述
在这里插入图片描述

3、flink结合代码案例讲解使用场景

在main入口函数中定义以下方法

//获取流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        //获取数据流
        DataStream<String> stringDataStreamSource = env.socketTextStream("127.0.0.1", 6666);


        //转pojo
        SingleOutputStreamOperator<KafkaEntity> map = stringDataStreamSource.map(new MapFunction<String, KafkaEntity>() {
            @Override
            public KafkaEntity map(String value) throws Exception {


                KafkaEntity kafkaEntity = new KafkaEntity();
                if (!"".equals(value)){
                    String[] splitResult = value.split("1");
                    kafkaEntity.setCityId(splitResult[0]);
                    kafkaEntity.setAppId(splitResult[1]);
                    kafkaEntity.setProcessCode(splitResult[2]);
                    kafkaEntity.setStartTime(splitResult[3].substring(0,12));
                    kafkaEntity.setErrCode(splitResult[4]);
                }
                return kafkaEntity;
            }
        });

        //分组,聚合
        SingleOutputStreamOperator<Object> applyResult = map.keyBy("processCode", "appId", "cityId", "startTime")
                .timeWindow(Time.seconds(15))//每隔15秒聚合一次
                .apply(new WindowFunction<KafkaEntity, Object, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<KafkaEntity> input, Collector<Object> out) throws Exception {
                        //调用总次数
                        KafkaEntity aggregateResult = input.iterator().next();
                        int reqAmount = IteratorUtils.toList(input.iterator()).size();


                        //成功次数
                        int successAmount = 0;
                        //总时长
                        long timeAll = 0;
                        //限流次数
                        int failAmount = 0;
                        List<KafkaEntity> list = IteratorUtils.toList(input.iterator());
                        for (int i = 0; i < list.size(); i++) {
                            KafkaEntity kafkaEntity = list.get(i);
                            timeAll += Long.parseLong(kafkaEntity.getDuration());
                            if ("0".equals(kafkaEntity.getErrCode())) {
                                successAmount += 1;
                            } else {
                                failAmount += 1;
                            }
                        }

                        //平均调用时长
                        long averageDuration = (timeAll / reqAmount);


                        //聚合结果
                        aggregateResult.setReqAmount(String.valueOf(reqAmount));
                        aggregateResult.setSuccessAmount(String.valueOf(successAmount));
                        aggregateResult.setAverageDuration(String.valueOf(averageDuration));
                        aggregateResult.setFailAmount(String.valueOf(failAmount));
                        aggregateResult.setInsertTime(new Date());
                        out.collect(aggregateResult);
                    }
                });

        applyResult.addSink(new RichSinkOperation());

        env.execute();
        

4、代码解释

4.1

首先需要获取流环境

4.2

以socket文本流代替kafka消费者,在linux中使用nc -lk 6666 启动,然后写文本发送即可模拟kafka消费者读取数据,这里也是通过第一步的流环境来获取数据流

4.3

获取到数据流后,将datastream通过map方法(这也可以当作一种算子)转为pojo类,到此,数据准备完成

4.4

SingleOutputStreamOperator也是datastream的子类,我们将获取到的pojo流通过keyby分组,分组的维度是四个,即"processCode", “appId”, “cityId”, “startTime”,只要收到的数据中有一个元素与上一个不同,即为新的一个组

4.5

分组以后通过timewindow设置窗口大小为15秒,即15秒进行一次聚合,聚合方法为下面的apply

4.6

apply方法是对15秒内收到的数据根据用户自定义来做数据处理
KafkaEntity aggregateResult = input.iterator().next();代表按那四个维度来分组得到的pojo对象,同一组中那四个属性都是一样的,在本例中由此来计算同一组的总次数即按当前维度分组后,每组的数据个数,即list的大小,重新计算后放入pojo的一个属性中,最终通过out.collect方法将计算得到的结果汇总在一个对象的几个属性中输出

4.7

applyResult为聚合后的结果,最后一步为将聚合结果输出到外部系统,这里举例为入数据库(redis或hbase都一样)

4.8

public class RichSinkOperation extends RichSinkFunction {


    @Override
    public void invoke(Object value) throws Exception {



        InputStream inputStream = Resources.getResourceAsStream("mybatis-config.xml");
        //获取工厂
        SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(inputStream);

        SqlSession sqlSession = factory.openSession();


        FlinkDao flinkDao  = sqlSession.getMapper(FlinkDao .class);

        KafkaEntity kafkaEntity = (KafkaEntity) value;



        flinkDao.insertRecord(kafkaEntity);
        

        sqlSession.commit();
    }

    @Override
    public void open(Configuration parameters) throws Exception {

        
    }
    
    
    
}

此处集成了mybatis,该自定义类继承RichSinkFunction,主要实现invoke方法,将聚合结果的每一条进行入库处理

本例代码仅为很局限的场景使用,仅为打通整体流程,需要根据业务不同定义不同的apply处理办法,此处的sink操作中也不合理,生产中数据库连接应该放在open中并使用数据池,另外还需要考虑生产每分钟都是上亿的数据,如果开一分钟的窗口,聚合结果都在内存中内存会不会炸,聚合后一次性sink数据库操作会不会阻塞,需要压测来得到实际效果验证。

标签:分组,flink,聚合,入门,框架,Flink,aggregateResult,KafkaEntity,kafkaEntity
来源: https://blog.csdn.net/qq_40719095/article/details/122708873

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有