ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink Table Api 之聚合函数使用

2022-03-20 11:01:45  阅读:324  来源: 互联网

标签:Flink 聚合 org flink Api import apache Table


聚合函数(Aggregate Functions)

  • 用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs) 可以把一个表中的数据,聚合成一个标量值;
  • 用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实现的

  • AggregationFunction要求必须实现的方法:
– createAccumulator()
– accumulate()
– getValue()

  • AggregateFunction 的工作原理如下:
1、首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构; 可以通过调用 createAccumulator() 方法创建空累加器; 2、随后,对每个输入行调用函数的 accumulate() 方法来更新累加器; 3、处理完所有行后,将调用函数的 getValue() 方法来计算并返回最终结果

表聚合函数(Table Aggregate Functions)

  • 用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表;
  • 用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的;

 

  • AggregationFunction 要求必须实现的方法:

– createAccumulator()

– accumulate() – emitValue()

  • TableAggregateFunction 的工作原理如下:

1、首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 createAccumulator() 方法可以创建空累加器。
2、随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。
3、处理完所有行后,将调用函数的 emitValue() 方法来计算并返回最终结果。

下面通过一个实际代码案例来演示看下效果

package com.congge.table.api.udf;

import com.congge.source.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

public class TestAggregateFunction {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String path = "E:\\code-self\\flink_study\\src\\main\\resources\\sensor.txt";

        // 1. 读取数据
        DataStreamSource<String> inputStream = env.readTextFile(path);

        // 2. 转换成POJO
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 3. 将流转换成表
        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");

        // 4. 自定义聚合函数,求当前传感器的平均温度值
        // table API
        AvgTemp avgTemp = new AvgTemp();

        // 需要在环境中注册UDF
        tableEnv.registerFunction("avgTemp", avgTemp);
        Table resultTable = sensorTable
                .groupBy("id")
                .aggregate( "avgTemp(temp) as avgtemp" )
                .select("id, avgtemp");

        // SQL
        tableEnv.createTemporaryView("sensor", sensorTable);
        Table resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) " +
                " from sensor group by id");

        // 打印输出
        tableEnv.toRetractStream(resultTable, Row.class).print("result");
        tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");

        env.execute();
    }

    // 实现自定义的AggregateFunction
    public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>>{
        @Override
        public Double getValue(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }

        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }

        // 必须实现一个accumulate方法,来数据之后更新状态
        public void accumulate( Tuple2<Double, Integer> accumulator, Double temp ){
            accumulator.f0 += temp;
            accumulator.f1 += 1;
        }
    }

}

本例的需求是,通过 Flink Table Api读取原始文件数据,然后通过自定义统计聚合函数,将读取到的数据进行聚合统计输出到控制台,运行上面的代码,观察输出效果

 

标签:Flink,聚合,org,flink,Api,import,apache,Table
来源: https://blog.csdn.net/congge_study/article/details/123608733

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有