ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink流处理-Task之TripDriveTask

2021-11-08 01:03:05  阅读:150  来源: 互联网

标签:flink api Flink TripDriveTask streaming Task org apache import


Task之TripDriveTask

package pers.aishuang.flink.streaming.task;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
import pers.aishuang.flink.streaming.entity.ItcastDataObj;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.function.window.DriveSampleWindowFunction;
import pers.aishuang.flink.streaming.function.window.DriveTripWindowFunction;
import pers.aishuang.flink.streaming.sink.hbase.TripDriveToHBaseSink;
import pers.aishuang.flink.streaming.sink.hbase.TripSampleToHBaseSink;
import pers.aishuang.flink.streaming.utils.JsonParseUtil;

/**

  • 驾驶行程采样分析 驾驶行程分析

  • 开发步骤:

  • 1、创建流执行环境

  • 2、获取Kafka中的数据

  • 3、将json字符串解析成车辆数据对象

  • 4、过滤出正确的数据并且是行程数据 chargeStatus=2 或者 chargeStatus=3

  • 0x01:停车充电 0x02:行驶充电 0x03:未充电状态 0x04:充电完成 0xFE:异常 0xFF:无效

  • 5、分配水印机制,设置最大延迟时间 30s

  • 6、超出3分钟的数据,保存到侧输出流,分析一下数据为什么会延迟

  • 7、对车辆数据进行分组,创建会话窗口

  • 8、数据的采样分析

  • -- 应用窗口,数据的采样分析

  • -- 将分析的采样数据封装成数组,并将其保存到HBase中

  • 9、数据的行程分析

  • -- 应用窗口数据,分析低速、中速、高速车辆的soc、行驶里程、油耗、速度、速度切换的次数等数据封装成对象

  • -- 将这个对象保存到HBase中

  • 10、执行流环境任务
    */
    public class TripDriveTask extends BaseTask{
    public static void main(String[] args) {
    //1、创建流执行环境(已设置好checkpoint、重启策略)
    StreamExecutionEnvironment env = getEnv(TripDriveTask.class.getSimpleName());
    //2、获取Kafka中的数据
    DataStreamSource kafkaStream = getKafkaStream(env, "_tripDrive_consumer", SimpleStringSchema.class);
    //3、将json字符串解析成车辆数据对象
    DataStream tripDriveStream = kafkaStream.map(JsonParseUtil::parseJsonToObject)
    //4、 过滤出正确的数据并且是行程数据 chargeStatus=2或者chargeStatus=3
    .filter(obj -> StringUtils.isEmpty(obj.getErrorData()))
    .filter(obj -> (obj.getChargeStatus()2) || (obj.getChargeStatus()3));
    //5、分配水印机制,设置最大延迟时间30s
    SingleOutputStreamOperator itcastDataObjWatermark = tripDriveStream
    //分配水印机制,并指定事件时间字段
    .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(30)) {
    @Override
    public long extractTimestamp(ItcastDataObj element) {
    return element.getTerminalTimeStamp();
    }
    }
    );
    //6、超过3分钟的数据,保存到侧输出流,分析一下数据为什么会延迟
    OutputTag maxLatestData = new OutputTag<>("maxLatestData", TypeInformation.of(ItcastDataObj.class));
    //7、对车辆数据进行分组,创建会话窗口。
    WindowedStream<ItcastDataObj, String, TimeWindow> itcastDataObjWindowStream = itcastDataObjWatermark
    //指定分组字段
    .keyBy(obj -> obj.getVin())
    //指定窗口类型为会话窗口,时间间隔是15min
    .window(EventTimeSessionWindows.withGap(Time.minutes(15L)))
    //允许延迟时间
    .allowedLateness(Time.minutes(3L))
    //侧边流输出延迟数据
    .sideOutputLateData(maxLatestData);
    //8、数据的采样分析
    //-- 应用窗口,数据的采样分析
    SingleOutputStreamOperator<String[]> sampleTripDriveStream = itcastDataObjWindowStream
    .apply(new DriveSampleWindowFunction());
    //-- 将分析的采样数据封装成数组,并将其保存到HBase中
    sampleTripDriveStream.addSink(new TripSampleToHBaseSink("TRIPDB:trip_sample"));
    //9、数据的行程分析
    //-- 应用窗口数据,分析低速、中速、高速车辆的soc、行驶里程、油耗、速度、速度切换的次数等数据封装成对象
    SingleOutputStreamOperator tripModelStream = itcastDataObjWindowStream
    .apply(new DriveTripWindowFunction());
    //-- 将这个对象保存到Hbase中
    tripModelStream.addSink(new TripDriveToHBaseSink("TRIPDB:trip_division"));
    //10、执行流环境任务
    try {
    env.execute();
    } catch (Exception e) {
    e.printStackTrace();
    }

    }
    }

标签:flink,api,Flink,TripDriveTask,streaming,Task,org,apache,import
来源: https://www.cnblogs.com/zi-shuo/p/15522492.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有