
Chapter 5 - Real-Time Computation - Sectors and K-Lines + Chapter 6 - Real-Time Alerts - FlinkCEP



Review

  • 1. Index business

    Similar to the individual-stock business; only minor changes are needed.

  • 2. Sector business

    Note:

    A sector is composed of individual stocks, i.e. one sector contains multiple stocks.

    Sector-to-stock mapping table (shown only as a screenshot in the original notes).


The business logic needs to convert StockBean into SectorBean.


  • 3. Sector second-level quotes


Individual-Stock Core Business Development - Key Content

The individual-stock business is split into the following sub-businesses; the package/class structure for them is already laid out in the code.

No.  Business          Sub-business
1    Individual stock  Second-level quotes
2                      Minute-level (time-sharing) quotes
3                      Minute quotes backup
4                      Stock change percentage
5                      K-line quotes


Individual-Stock Quotes - Second-Level Quotes

Requirements

Group the individual-stock data of the Shanghai and Shenzhen markets by stock code, compute over 5-second tumbling windows, wrap the result as a StockBean, and write the results to HBase.

Why HBase? HBase is well suited to storing massive amounts of data (review the relevant HBase topics after class; a unified assignment will be given once the business logic is covered).


 

Code - Core Task Class

Note:

Tumbling event-time windows are aligned to multiples of the window size (epoch-aligned), not derived from the first record (see the alignment sketch after the snippet below).

With the first record at 2021-01-28 18:00:00 and one record every 5 s,

the windows are:

2021-01-28 18:00:00 ~ 2021-01-28 18:00:05,

2021-01-28 18:00:05 ~ 2021-01-28 18:00:10,

and so on,

because we told Flink to use event time.

             
//set event time as the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//not deprecated in the old version; deprecated in newer versions and no longer needs to be set
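
For reference, a minimal sketch of how Flink aligns tumbling event-time windows, using Flink's own helper on TimeWindow (the timestamp below is illustrative):

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// a record with event time 2021-01-28 18:00:03 (Beijing time) falls into a 5s tumbling window (no offset)
long ts = 1611828003000L;
long start = TimeWindow.getWindowStartWithOffset(ts, 0, 5000); // -> 2021-01-28 18:00:00
long end = start + 5000;                                       // -> 2021-01-28 18:00:05 (exclusive)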
 

 

             
package cn.itcast.task;
import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.sink.HBaseSink;
import cn.itcast.function.window.StockPutHBaseWindowFunction;
import cn.itcast.function.window.StockSecondsWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Author itcast
 * Desc Core task class for the individual-stock second-level quote business
 * Requirement: group the Shanghai/Shenzhen stock data by stock code, compute over 5s tumbling windows, wrap the result as StockBean, and write it to HBase
 */
public class StockSecondsTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 1. Group by stock code
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2. Define the window
        //.window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .timeWindow(Time.seconds(5))
        //TODO 3. Window computation
        //.sum -- only supports simple aggregation
        //.reduce() -- only supports simple aggregation
        //.process() // has a Context -- would work
        .apply(new StockSecondsWindowFunction())// has the Window -- works
        //TODO 4. Merge the data
        //After the computation above there are many key groups, but each group holds only one record: the latest within the 5s window.
        //The data will later be sunk to HBase, and for performance it should be inserted in batches,
        //so the data is merged here:
        //all key groups are collected into one all-window that holds the latest record of every stock within the 5s window -- exactly the result we need.
        .timeWindowAll(Time.seconds(5))
        //TODO 5. Wrap the data
        //Wrap all stock records in the window as a List<Put> so they can be batch-inserted into HBase later
        .apply(new StockPutHBaseWindowFunction())
        //TODO 6. Sink to HBase
        .addSink(new HBaseSink(QuotConfig.STOCK_HBASE_TABLE_NAME));//"quot_stock"
    }
}
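
The ProcessDataInterface implemented here is not shown in these notes; judging from the @Override signature above it is a single-method contract, roughly like this (a sketch inferred from usage, not the project's actual source):

package cn.itcast.standard;

import cn.itcast.bean.CleanBean;
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch: every task class takes the watermarked CleanBean stream and wires up its own pipeline.
public interface ProcessDataInterface {
    void process(DataStream<CleanBean> watermarkDS);
}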
 

 

Code - Window Function Class


             
package cn.itcast.function.window;
import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockBean;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * Author itcast
 * Desc Wrap the CleanBean data of the current (5s) window into a StockBean and return it
 * In short: take the latest CleanBean of each stock within the current 5s window and wrap it as a StockBean
 */
public class StockSecondsWindowFunction implements WindowFunction<CleanBean, StockBean, String, TimeWindow> {
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<CleanBean> iterable, Collector<StockBean> collector) throws Exception {
        //Note:
        // there are many CleanBeans in the current 5s window -- which one should provide the StockBean fields?
        // the latest CleanBean in the window, i.e. the one with the largest eventTime
        //1. Track the latest CleanBean
        CleanBean newCleanBean = null;
        for (CleanBean cleanBean : iterable) {//each incoming cleanBean
            //assume the first one is the latest newCleanBean
            if (newCleanBean == null){
                newCleanBean = cleanBean;
            }
            //compare every following record with the provisional newCleanBean;
            //if its eventTime is greater than the provisional newCleanBean's eventTime, it becomes the new newCleanBean
            if(cleanBean.getEventTime() > newCleanBean.getEventTime()){
                newCleanBean  = cleanBean;
            }
        }
        //when the loop finishes, newCleanBean holds the latest CleanBean of the window
        //2. Set the fields
        //Format the time:
        //1612058392042 -- long epoch timestamp
        //20210131095952 -- formatted long value
        Long tradTime = DateUtil.longTimestamp2LongFormat(newCleanBean.getEventTime(), DateFormatConstant.format_YYYYMMDDHHMMSS);
        
        StockBean stockBean = new StockBean();
        stockBean.setEventTime(newCleanBean.getEventTime());
        stockBean.setSecCode(newCleanBean.getSecCode());
        stockBean.setSecName(newCleanBean.getSecName());
        stockBean.setPreClosePrice(newCleanBean.getPreClosePx());
        stockBean.setOpenPrice(newCleanBean.getOpenPrice());
        stockBean.setHighPrice(newCleanBean.getMaxPrice());
        stockBean.setLowPrice(newCleanBean.getMinPrice());
        stockBean.setClosePrice(newCleanBean.getTradePrice());
        stockBean.setTradeVol(0L);//second-level quotes do not need the per-minute trade volume
        stockBean.setTradeAmt(0L);//second-level quotes do not need the per-minute trade amount
        stockBean.setTradeVolDay(newCleanBean.getTradeVolumn());
        stockBean.setTradeAmtDay(newCleanBean.getTradeAmt());
        stockBean.setTradeTime(tradTime);
        stockBean.setSource(newCleanBean.getSource());
        
        //3. Emit the result
        collector.collect(stockBean);
    }
}
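
The DateUtil.longTimestamp2LongFormat helper used above is not listed in these notes; a minimal sketch of what such a helper could look like (class and method names follow the usage above, the body is an assumption):

package cn.itcast.util;

import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch: format an epoch-millisecond timestamp into a numeric value such as yyyyMMddHHmmss.
public class DateUtil {
    public static Long longTimestamp2LongFormat(Long timestamp, String format) {
        // e.g. 1612058392042 -> 20210131095952 when format is "yyyyMMddHHmmss"
        String formatted = new SimpleDateFormat(format).format(new Date(timestamp));
        return Long.parseLong(formatted);
    }
}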
 

 

Code - Merging the Data

             
 //TODO 4. Merge the data
        //After the computation above there are many key groups, but each group holds only one record: the latest within the 5s window.
        //The data will later be sunk to HBase, and for performance it should be inserted in batches,
        //so the data is merged here:
        //all key groups are collected into one all-window that holds the latest record of every stock within the 5s window -- exactly the result we need.
        .timeWindowAll(Time.seconds(5))
 

 

Code - Wrapping the Data

             
 //TODO 5. Wrap the data
        //Wrap all stock records in the window as a List<Put> so they can be batch-inserted into HBase later
        .apply(new StockPutHBaseWindowFunction())
 

 

             
package cn.itcast.function.window;
import cn.itcast.bean.StockBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Put;
import java.util.ArrayList;
import java.util.List;
/**
 * Author itcast
 * Desc Wrap all StockBeans of the current window into a List<Put> and return it
 */
public class StockPutHBaseWindowFunction implements AllWindowFunction<StockBean, List<Put>, TimeWindow> {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<StockBean> iterable, Collector<List<Put>> collector) throws Exception {
        //1. Prepare the List<Put> to return
        List<Put> list = new ArrayList<>();
        //2. Build a Put for each bean and add it to the list
        for (StockBean stockBean : iterable) {
            String rowkey = stockBean.getSecCode() + stockBean.getTradeTime(); //rowkey = secCode + tradeTime, e.g. ...20210128180000
            String stockBeanJsonStr = JSON.toJSONString(stockBean);
            //a Put represents one row to write into HBase
            Put put = new Put(rowkey.getBytes());//create the Put with the rowkey
            //column family "info", column "data", value = the StockBean as a JSON string
            put.addColumn("info".getBytes(),"data".getBytes(),stockBeanJsonStr.getBytes());
            //add the Put to the list
            list.add(put);
        }
        //after the loop, the latest StockBean of every stock in the current 5s window has been wrapped into the List<Put>
        //3. Return the List<Put>
        collector.collect(list);
    }
}
 

 

Code - Sinking the Data to HBase

             
 //TODO 6. Sink to HBase
        .addSink(new HBaseSink(QuotConfig.STOCK_HBASE_TABLE_NAME));//"quot_stock"
 

 

             
package cn.itcast.function.sink;
import cn.itcast.util.HBaseUtil;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.hbase.client.Put;
import java.util.List;
/**
 * Author itcast
 * Desc Sink that writes a batch (List<Put>) into the given HBase table
 */
public class HBaseSink implements SinkFunction<List<Put>> {
    private String tableName;
    public HBaseSink(String tableName) {
        this.tableName = tableName;
    }
    @Override
    public void invoke(List<Put> value, Context context) throws Exception {
         System.out.println("Calling the utility class to batch-insert the data into HBase");
        //use the utility class to store the data into the HBase table
        HBaseUtil.putList(tableName,value);
    }
}
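
HBaseUtil.putList is likewise not shown in these notes; a minimal sketch of such a batch-put helper using the standard HBase client API (the real project utility may differ, e.g. pooling the connection or creating the table if missing):

package cn.itcast.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import java.util.List;

// Sketch: open a connection, get the table, and write the whole batch in one call.
public class HBaseUtil {
    public static void putList(String tableName, List<Put> puts) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // zookeeper quorum etc. come from hbase-site.xml / project config
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(puts); // batch insert
        }
    }
}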
 

 

 

Testing

1. Start HDFS, ZK, and Kafka

 

2. Start HBase

cd /export/servers/hbase-1.1.1/
bin/start-hbase.sh

 

3. Enter the HBase shell

bin/hbase shell

 

4. Drop the old tables

list
disable "quot_index"
disable "quot_sector"
disable "quot_stock"
drop "quot_index"
drop "quot_sector"
drop "quot_stock"                                                                                                                             
                                                                                                        

 


5. Start the program


 

6. Check HBase

list
scan "quot_stock",{LIMIT=>10}


 

Individual-Stock Quotes - Minute-Level (Time-Sharing) Quotes

 

Requirements

Recap -- a typical Druid application scenario:

Kafka topics sse and szse ---> Flink processing ---> Kafka topics stock-sse and stock-szse ---> Druid ingestion ---> real-time queries


 

Following the flow above:

Group by stock code, window the data every 1 min / 60 s, process it, and write the results to the Kafka topics stock-sse and stock-szse, which Druid finally ingests for querying.


 

Code - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockBean;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc Core task class for the individual-stock minute-level (time-sharing) quote business
 * Requirement: group by stock code, window the data every 1 min / 60 s, process it, and write the results to the Kafka topics stock-sse and stock-szse, which Druid finally ingests for querying
 */
public class StockMinutesTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 1. Group by stock code
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2. Define the window
        .timeWindow(Time.minutes(1))//60s
        //TODO 3. Window computation CleanBean-->StockBean
        .apply(new StockMinutesWindowFunction())
        //TODO 4. Split the stream -- main() merged the two markets, so here the data has to be sent back to different topics
        .process(new ProcessFunction<StockBean,StockBean>(){
            @Override
            public void processElement(StockBean stockBean, Context context, Collector<StockBean> collector) throws Exception {
                // left empty in this skeleton; the split logic is filled in below in the "Splitting the Stream + Sink" section
            }
        });
        //TODO 5. Sink the data to Kafka (also completed below)
        
    }
}

 

Code - Window Function Class


package cn.itcast.function.window;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockBean;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc Window function for the minute-level (time-sharing) quote data
 * Within this window we work on the last 1 min / 60 s of data and must compute the stock's per-minute trade volume and trade amount, i.e. how much this stock traded during this one minute.
 * This minute's volume or amount = this window's latest day-total volume or amount - the previous window's latest day-total volume or amount.
 * So we need somewhere to keep the previous window's data: use state!
 *
 * interface WindowFunction<IN, OUT, KEY, W extends Window>
 * abstract class RichWindowFunction<IN, OUT, KEY, W extends Window>  we need the rich function's open() etc., so RichWindowFunction is used here
 */
public class StockMinutesWindowFunction extends RichWindowFunction<CleanBean, StockBean, String, TimeWindow> {
    //1. A MapState that stores the previous window's StockBean
    //MapState<stock code, previous window's StockBean>
    private MapState<String,StockBean> stockState = null;

    //2. Initialize the state
    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, StockBean> stateDescriptor = new MapStateDescriptor<>("stockState", String.class, StockBean.class);
        stockState = getRuntimeContext().getMapState(stateDescriptor);
    }

    //3. Compute this minute's volume/amount = this window's latest day-total volume/amount - the previous window's latest day-total volume/amount
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<CleanBean> iterable, Collector<StockBean> collector) throws Exception {
        //3.1 Track the latest CleanBean of the current window
        CleanBean newCleanBean = null;
        for (CleanBean cleanBean : iterable) {
            if(newCleanBean == null){
                newCleanBean = cleanBean;
            }
            if(cleanBean.getEventTime() > newCleanBean.getEventTime()){
                newCleanBean = cleanBean;
            }
        }

        //3.2 Get the previous window's StockBean from the state
        StockBean lastStockBean = stockState.get(newCleanBean.getSecCode());//look up the value in the state by key
        Long minutesVol = 0L;//per-minute trade volume
        Long minutesAmt = 0L;//per-minute trade amount
        if(lastStockBean!=null){
            //3.3 Get the previous window's day-total volume and amount
            //this minute's volume or amount = this window's latest day total - the previous window's latest day total
            Long tradeVolDay = lastStockBean.getTradeVolDay();//previous window's / previous minute's day-total volume
            Long tradeAmtDay = lastStockBean.getTradeAmtDay();//previous window's / previous minute's day-total amount

            //3.4 Get the current window's latest day-total volume and amount
            Long tradeVolumn = newCleanBean.getTradeVolumn();
            Long tradeAmt = newCleanBean.getTradeAmt();

            //3.5 Compute this minute's volume and amount for the stock
            //this minute's volume or amount = this window's latest day total - the previous window's latest day total
            minutesVol = tradeVolumn - tradeVolDay;
            minutesAmt = tradeAmt - tradeAmtDay;
        }else{
            minutesVol = newCleanBean.getTradeVolumn();
            minutesAmt = newCleanBean.getTradeAmt();
        }

        //3.6 Build the result bean
        StockBean stockBean = new StockBean();
        Long tradeTime = DateUtil.longTimestamp2LongFormat(newCleanBean.getEventTime(), DateFormatConstant.format_YYYYMMDDHHMMSS);
        stockBean.setEventTime(newCleanBean.getEventTime());
        stockBean.setSecCode(newCleanBean.getSecCode());
        stockBean.setSecName(newCleanBean.getSecName());
        stockBean.setPreClosePrice(newCleanBean.getPreClosePx());
        stockBean.setOpenPrice(newCleanBean.getOpenPrice());
        stockBean.setHighPrice(newCleanBean.getMaxPrice());
        stockBean.setLowPrice(newCleanBean.getMinPrice());
        stockBean.setClosePrice(newCleanBean.getTradePrice());

        stockBean.setTradeVol(minutesVol);//the per-minute trade volume computed for this window
        stockBean.setTradeAmt(minutesAmt);//the per-minute trade amount computed for this window

        stockBean.setTradeVolDay(newCleanBean.getTradeVolumn());
        stockBean.setTradeAmtDay(newCleanBean.getTradeAmt());
        stockBean.setTradeTime(tradeTime);
        stockBean.setSource(newCleanBean.getSource());

        //3.7 Emit the result
        collector.collect(stockBean);

        //3.8 Update the state
        stockState.put(stockBean.getSecCode(),stockBean);
    }

}
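
A quick illustrative run-through of the state-based calculation above (made-up numbers): suppose for one stock the previous window's StockBean had tradeVolDay = 10,000 and tradeAmtDay = 150,000, and the latest CleanBean of the current window reports tradeVolumn = 10,600 and tradeAmt = 159,000. Then minutesVol = 10,600 - 10,000 = 600 and minutesAmt = 159,000 - 150,000 = 9,000; the new StockBean carries these as tradeVol/tradeAmt, carries 10,600/159,000 as the day totals, and is then written into the MapState for the next window. For the very first window there is no previous state, so the day totals themselves are used.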

 

Code - Splitting the Stream + Sink

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Properties;

/**
 * Author itcast
 * Desc Core task class for the individual-stock minute-level (time-sharing) quote business
 * Requirement: group by stock code, window the data every 1 min / 60 s, process it, and write the results to the Kafka topics stock-sse and stock-szse, which Druid finally ingests for querying
 */
public class StockMinutesTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0. Prepare side-output tags to hold the split streams
        OutputTag<StockBean> sseOutputTag = new OutputTag<>("sse", TypeInformation.of(StockBean.class));
        OutputTag<StockBean> szseOutputTag = new OutputTag<>("szse", TypeInformation.of(StockBean.class));


        SingleOutputStreamOperator<StockBean> processDS =
                //TODO 1. Group by stock code
                watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2. Define the window
                .timeWindow(Time.minutes(1))//60s
                //TODO 3. Window computation CleanBean-->StockBean
                .apply(new StockMinutesWindowFunction())
                //TODO 4. Split the stream -- main() merged the two markets, so here the data has to be sent back to different topics
                .process(new ProcessFunction<StockBean, StockBean>() {
                    @Override
                    public void processElement(StockBean stockBean, Context context, Collector<StockBean> collector) throws Exception {
                        if (stockBean.getSource().equals("sse")) {//sse: Shanghai
                            context.output(sseOutputTag, stockBean);
                        } else {//szse: Shenzhen
                            context.output(szseOutputTag, stockBean);
                        }
                    }
                });
        //get the split streams
        DataStream<StockBean> sseDS = processDS.getSideOutput(sseOutputTag);
        DataStream<StockBean> szseDS = processDS.getSideOutput(szseOutputTag);

        //TODO 5. Sink the data to Kafka
        //After being sunk to Kafka the data is ingested by Druid, and Druid needs JSON to parse it,
        //so convert the data to JSON first
        SingleOutputStreamOperator<String> sseJsonDS = sseDS.map(new MapFunction<StockBean, String>() {
            @Override
            public String map(StockBean stockBean) throws Exception {
                return JSON.toJSONString(stockBean);
            }
        });
        SingleOutputStreamOperator<String> szseJsonDS = szseDS.map(new MapFunction<StockBean, String>() {
            @Override
            public String map(StockBean stockBean) throws Exception {
                return JSON.toJSONString(stockBean);
            }
        });
        //sink to the different Kafka topics
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);

        FlinkKafkaProducer011<String> ssekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SSE_STOCK_TOPIC, new SimpleStringSchema(), props);//FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
        FlinkKafkaProducer011<String> szsekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SZSE_STOCK_TOPIC, new SimpleStringSchema(), props);
        
        System.out.println("The Shanghai and Shenzhen data is about to be written to the Kafka topics stock-sse and stock-szse");
        sseJsonDS.addSink(ssekafkaSink);
        szseJsonDS.addSink(szsekafkaSink);

    }
}
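
The trailing comment on the first producer above hints at the exactly-once variant of the 0.11 connector. A sketch of how that overload is typically used (not part of the project code; verify the API against the project's Flink version):

// Sketch: the Semantic overload requires a KeyedSerializationSchema; the broker's
// transaction.max.timeout.ms must be at least the producer's transaction.timeout.ms.
props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));
FlinkKafkaProducer011<String> exactlyOnceSink = new FlinkKafkaProducer011<>(
        QuotConfig.SSE_STOCK_TOPIC,
        new org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);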

 

Testing

1. Start ZK / Kafka / Druid

2. Prepare the topics

cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --zookeeper node01:2181 --list
bin/kafka-topics.sh --zookeeper node01:2181 --create --partitions 1 --replication-factor 1 --topic stock-sse
bin/kafka-topics.sh --zookeeper node01:2181 --create --partitions 1 --replication-factor 1 --topic stock-szse

3. Create the Druid datasources so that Druid ingests stock-sse and stock-szse in real time

This can be configured through the Druid web UI, or by using the requests already prepared in Postman.


Check the web UI: Druid should now be ingesting the Kafka data in real time.

http://node01:8090/console.html


 

4. Start the program

 

5. Check whether data arrives in Kafka


If the console output is garbled, adjust the encoding setting (shown only as a screenshot in the original notes).


6. Query Druid to check whether data has arrived (you may need to wait a moment, reset the running ingestion task, or restart Druid).


 

 

Individual-Stock Quotes - Minute Quotes Backup

Requirements

Back up the minute-level quote data (computed the same way as the minute quotes above) to HDFS as plain text, so it can later be analyzed offline with tools such as HiveSQL/SparkSQL.

 

Code - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.map.StockPutHDFSMapFunction;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

/**
 * Author itcast
 * Desc Core task class for backing up the individual-stock minute-level quote data to HDFS; the computation itself is the same as the minute-level quotes above
 */
public class StockMinutesBackupTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0. Prepare the HDFS sink
        //In newer Flink versions StreamingFileSink or FileSink could be used, but they require Hadoop 2.7+, while the project's CDH 5.14 ships Hadoop 2.6,
        //so the newer sinks cannot be used here; we use the older BucketingSink instead
        //path: stock.sec.hdfs.path=hdfs://192.168.52.100:8020/quot_data/dev/stock/
        BucketingSink<String> bucketingSink = new BucketingSink<>(QuotConfig.STOCK_SEC_HDFS_PATH);
        //file size threshold: hdfs.batch=1073741824
        bucketingSink.setBatchSize(Long.parseLong(QuotConfig.HDFS_BATCH));
        //bucketing (sub-directory) strategy: hdfs.bucketer=yyyyMMdd
        bucketingSink.setBucketer(new DateTimeBucketer(QuotConfig.HDFS_BUCKETER));

        //prefixes -- defaults are used if not set
        bucketingSink.setInProgressPrefix("stock_");
        bucketingSink.setPendingPrefix("stock2_");//prefix for pending files
        //suffixes -- defaults are used if not set
        bucketingSink.setInProgressSuffix(".txt");
        bucketingSink.setPendingSuffix(".txt");

           /*
        https://blog.csdn.net/kisimple/article/details/83998238
        The file name format is {state_prefix}{part_prefix}-{parallel_task_index}-{count}{part_suffix}{state_suffix};
        in-progress: currently being written.
        pending: waiting for a checkpoint.
        finished: checkpoint OK, written successfully.
        */

        //TODO 1. Group by stock code
        watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2. Define the window
                .timeWindow(Time.minutes(1))//60s
                //TODO 3. Window computation CleanBean-->StockBean
                .apply(new StockMinutesWindowFunction())
                //TODO 4. Data conversion: turn the StockBean into "|"-separated plain text
                .map(new StockPutHDFSMapFunction())
                //TODO 5. Sink to HDFS so other technologies such as SparkSQL/HiveSQL can analyze it offline later (plain text, fields separated by "|")
                .addSink(bucketingSink);

    }
}
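
With the settings above (base path .../quot_data/dev/stock/, yyyyMMdd bucketer, "stock_" in-progress prefix, ".txt" suffix), a file that is still being written would look roughly like hdfs://192.168.52.100:8020/quot_data/dev/stock/20210131/stock_part-0-0.txt, and it is renamed as it moves through the pending/finished states; the exact part numbering depends on the connector defaults, so treat this as an illustration only.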

 

 

 

Code - Converting and Concatenating the Data

package cn.itcast.function.map;

import cn.itcast.bean.StockBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.functions.MapFunction;

import java.sql.Timestamp;

/**
 * Author itcast
 * Desc Convert a StockBean into "|"-separated plain text
 */
public class StockPutHDFSMapFunction implements MapFunction<StockBean, String> {
    @Override
    public String map(StockBean stockBean) throws Exception {
        //get the separator
        String seperator = QuotConfig.HDFS_SEPERATOR;//it is "|"

        //get the trade date
        String tradeDate = DateUtil.longTimestamp2String(stockBean.getEventTime(), DateFormatConstant.format_yyyy_mm_dd);

        //concatenate the fields
        //order:
        //Timestamp|date|secCode|secName|preClosePrice|openPrice|highPrice|
        //lowPrice|closePrice|tradeVol|tradeAmt|tradeVolDay|tradeAmtDay|source
        StringBuilder sb = new StringBuilder();

        sb.append(new Timestamp(stockBean.getEventTime())).append(seperator)
                .append(tradeDate).append(seperator)
                .append(stockBean.getSecCode()).append(seperator)
                .append(stockBean.getSecName()).append(seperator)
                .append(stockBean.getPreClosePrice()).append(seperator)
                .append(stockBean.getOpenPrice()).append(seperator)
                .append(stockBean.getHighPrice()).append(seperator)
                .append(stockBean.getLowPrice()).append(seperator)
                .append(stockBean.getClosePrice()).append(seperator)
                .append(stockBean.getTradeVol()).append(seperator)
                .append(stockBean.getTradeAmt()).append(seperator)
                .append(stockBean.getTradeVolDay()).append(seperator)
                .append(stockBean.getTradeAmtDay()).append(seperator)
                .append(stockBean.getSource());

        return sb.toString();
    }
}
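
An illustrative example of one output record (made-up values, assuming "|" as the separator and the field order listed above):

2021-01-28 18:01:00.0|2021-01-28|000001|demoStock|17.2|17.25|17.6|17.1|17.5|600|9000|10600|159000|sse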

 

Testing

0. Start ZK and Kafka

1. Clear the old data from HDFS

hadoop fs -rmr /quot_data/dev/*

2. Start the program

3. Check the data on HDFS

http://node01:50070/explorer.html#/quot_data/dev/stock/20210131

 

 

Individual-Stock Quotes - Price Change / Change Percentage / Amplitude

Requirements

Similar to the minute quotes above: group the real-time quote data by code, compute each stock's change percentage every 60 s / 1 min, send the result to Kafka, and finally have Druid ingest it for real-time analysis.


 

Code - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockIncreaseBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.window.StockMinutesIncreaseWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
 * Author itcast
 * Desc Core task class for the individual-stock price change / change percentage business
 * Requirement: similar to the minute quotes above -- group the real-time quote data by code, compute each stock's change percentage every 60 s / 1 min, send the result to Kafka (one topic or two, both work), and finally have Druid ingest it for real-time analysis
 */
public class StockIncreaseTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0. Prepare the Kafka sink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);
        //# per-minute stock change percentage: stock.increase.topic=stock-increase
        FlinkKafkaProducer011<String> kafkaSink = new FlinkKafkaProducer011<>(QuotConfig.STOCK_INCREASE_TOPIC, new SimpleStringSchema(), props);
        
        //TODO 1. Group by stock code
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2. Define the window
        .timeWindow(Time.minutes(1))
        //TODO 3. Window computation -- CleanBean to StockIncreaseBean (which carries the change / change percentage / amplitude)
        .apply(new StockMinutesIncreaseWindowFunction())
        //TODO 4. Convert the data to JSON
        .map(new MapFunction<StockIncreaseBean, String>() {
            @Override
            public String map(StockIncreaseBean stockIncreaseBean) throws Exception {
                return JSON.toJSONString(stockIncreaseBean);
            }
        })
        //TODO 5. Sink to Kafka
        .addSink(kafkaSink);
    }
}


 

Code - Window Function Class - Computing the Change Percentage


package cn.itcast.function.window;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockIncreaseBean;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Author itcast
 * Desc Requirement: compute the change / change percentage / amplitude for the data of the current window (every 1 min) and wrap it as a StockIncreaseBean
 */
public class StockMinutesIncreaseWindowFunction implements WindowFunction<CleanBean, StockIncreaseBean, String, TimeWindow> {
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<CleanBean> iterable, Collector<StockIncreaseBean> collector) throws Exception {
        //1. Track the latest record
        CleanBean newCleanBean = null;
        for (CleanBean cleanBean : iterable) {
            if(newCleanBean == null){
                newCleanBean = cleanBean;
            }
            if(cleanBean.getEventTime() > newCleanBean.getEventTime()){
                newCleanBean = cleanBean;
            }
        }
        //2. Compute the change / change percentage / amplitude -- business rules (follow the spec / product manager / lead)
        //change = current (latest) price - previous close = newCleanBean.getTradePrice() - newCleanBean.getPreClosePx()
        BigDecimal  updown = newCleanBean.getTradePrice().subtract(newCleanBean.getPreClosePx());
        //change percentage = (current price - previous close) / previous close * 100%; note the ratio is kept here (not multiplied by 100) and rounded to 2 decimal places
        BigDecimal  increase = newCleanBean.getTradePrice().subtract( newCleanBean.getPreClosePx()).divide(newCleanBean.getPreClosePx(),2, RoundingMode.HALF_UP);
        //amplitude = (day high - day low) / previous close * 100%; again kept as a ratio here
        BigDecimal amplitude = newCleanBean.getMaxPrice().subtract(newCleanBean.getMinPrice()).divide(newCleanBean.getPreClosePx(),2, RoundingMode.HALF_UP);

        //3. Build the result bean
        Long tradeTime = DateUtil.longTimestamp2LongFormat(newCleanBean.getEventTime(), DateFormatConstant.format_YYYYMMDDHHMMSS);
        StockIncreaseBean stockIncreaseBean = new StockIncreaseBean(
                newCleanBean.getEventTime(),
                newCleanBean.getSecCode(),
                newCleanBean.getSecName(),
                increase,
                newCleanBean.getTradePrice(),
                updown,
                newCleanBean.getTradeVolumn(),
                amplitude,
                newCleanBean.getPreClosePx(),
                newCleanBean.getTradeAmt(),
                tradeTime,
                newCleanBean.getSource()
        );

        //4. Emit the result
        collector.collect(stockIncreaseBean);
    }
}
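
A quick worked example of the formulas above (made-up values): with preClosePx = 10.00, tradePrice = 10.50, maxPrice = 10.80, and minPrice = 9.90, we get updown = 10.50 - 10.00 = 0.50, increase = 0.50 / 10.00 = 0.05 (i.e. 5%), and amplitude = (10.80 - 9.90) / 10.00 = 0.09 (i.e. 9%). Since the code keeps the ratios with a scale of 2, small changes such as 0.4% round to 0.00, so the downstream consumer has to be aware of this precision choice.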


Testing

0. Start ZK / Kafka / Druid

1. Prepare the Kafka topic stock-increase (or skip this and let it be auto-created)

2. Have Druid ingest the Kafka topic stock-increase


3. Check Druid

http://node01:8090/console.html


 

4. Start the program

 

5. Check Kafka


6. Query Druid (you may need to wait a moment, reset the running ingestion task, or restart Druid)


 

Index Core Business Development - Key Content

Program Entry Class

Take the earlier individual-stock entry class and change the stock-specific parts to index ones.

If you are not yet comfortable with this, type it out again after class.

package cn.itcast.app;

import cn.itcast.avro.AvroDeserializeSchema;
import cn.itcast.avro.SseAvro;
import cn.itcast.avro.SzseAvro;
import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.util.QuotUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Properties;

/**
 * Author itcast
 * Desc Entry class for the index real-time quote processing business
 */
public class IndexStreamApplication {
    public static void main(String[] args) throws Exception {
        //TODO 0. env -- streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //environment settings
        //-for learning/testing, parallelism 1 makes the output easier to follow
        env.setParallelism(1);
        //use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//not deprecated in the old version; deprecated and no longer needed in newer versions
        //Note: checkpointing is Flink's fault-tolerance mechanism; enable it in production, it can be commented out for development/testing!
        //-Checkpoint
        /*env.enableCheckpointing(5000);//enable checkpointing
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/checkpoint"));
        }
        //===========Type 2: recommended parameters===========
        //minimum pause between two checkpoints, e.g. at least 500ms (avoids a slow checkpoint overlapping with the next one triggered every 1000ms)
        //analogy: a toll gate releases one car per second, but a minimum distance of 500m between cars is enforced
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//default is 0
        //whether an error during checkpointing should fail the whole job: true = yes, false = no
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//default is true
        //env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//default is 0, i.e. no checkpoint failure is tolerated
        //whether to keep the checkpoint when the job is cancelled; by default checkpoints are deleted on cancel
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete the externalized checkpoint when the job is cancelled (the default)
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: retain the externalized checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //===========Type 3: the defaults are fine===============
        //checkpointing mode EXACTLY_ONCE (the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //checkpoint timeout: if a checkpoint does not finish within 60s it is considered failed and discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);//default is 10 minutes
        //how many checkpoints may run concurrently
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//default is 1
        //fixed-delay restart -- commonly used in development
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // restart at most 3 times
                Time.of(5, TimeUnit.SECONDS) // delay between restarts
        ));
        //the settings above mean: if the job fails, restart it 3 times with a 5s delay each time*/

        //TODO 1. source -- the Kafka topics sse (Shanghai) and szse (Shenzhen)
        //prepare the Kafka parameters
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);//cluster address 192.168.52.100:9092
        props.setProperty("group.id", QuotConfig.GROUP_ID);//consumer group name
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//dynamic partition discovery: a background thread checks Kafka's partitions every 5s
        //props.setProperty("enable.auto.commit", "true");//auto commit (to the default topic; once checkpointing is used, offsets are stored both in the checkpoint and in the default topic)
        //props.setProperty("auto.commit.interval.ms", "2000");//auto-commit interval
        //props.setProperty("auto.offset.reset","latest");//latest: resume from a recorded offset, otherwise start from the newest message / earliest: resume from a recorded offset, otherwise start from the earliest message

        //Note: the data consumed from the two markets is binary data serialized with Avro,
        //so the simple SimpleStringSchema cannot deserialize it; a custom deserialization schema is needed
        //sse: Shanghai
        FlinkKafkaConsumer011<SseAvro> sseKafkaSource = new FlinkKafkaConsumer011<>(QuotConfig.SSE_TOPIC, new AvroDeserializeSchema(QuotConfig.SSE_TOPIC), props);//newer versions simply use new FlinkKafkaConsumer; the old version has to name the Kafka version
        //szse: Shenzhen
        FlinkKafkaConsumer011<SzseAvro> szseKafkaSource = new FlinkKafkaConsumer011<>(QuotConfig.SZSE_TOPIC, new AvroDeserializeSchema(QuotConfig.SZSE_TOPIC), props);

        //sseKafkaSource.setCommitOffsetsOnCheckpoints(true);//true is the default
        //szseKafkaSource.setCommitOffsetsOnCheckpoints(true);//true is the default

        //Problem: if Flink records offsets while consuming from Kafka, the next start resumes from those offsets, so every test run would require sending new data to Kafka -- annoying!
        //What to do? props.setProperty("auto.offset.reset","earliest") does not help, because it only applies when there is no recorded offset
        //So tell Flink to ignore auto.offset.reset and the recorded offsets and always start from the earliest data, which is convenient for development/testing
        sseKafkaSource.setStartFromEarliest();
        szseKafkaSource.setStartFromEarliest();

        DataStreamSource<SseAvro> sseAvroDS = env.addSource(sseKafkaSource);
        DataStreamSource<SzseAvro> szseAvroDS = env.addSource(szseKafkaSource);

        //sseAvroDS.print("Shanghai>>>");
        //szseAvroDS.print("Shenzhen>>>");

        //TODO 2. transformation -- preprocessing: filter / convert / union ...
        //check that the data is valid and the time lies within normal trading hours
        //keep the valid records of both markets / filter out the invalid ones
        SingleOutputStreamOperator<SseAvro> sseAvroFiltedDS = sseAvroDS.filter(new FilterFunction<SseAvro>() {
            @Override
            public boolean filter(SseAvro sseAvro) throws Exception {
                //although it is called like this, the time check actually uses the configured open.time=00:00 and close.time=23:59, i.e. every timestamp passes -- this is on purpose to make testing easier
                if (QuotUtil.checkData(sseAvro) && QuotUtil.checkTime(sseAvro)) {//valid
                    return true;
                } else {//invalid
                    return false;
                }
            }
        });
        SingleOutputStreamOperator<SzseAvro> szseAvroFiltedDS = szseAvroDS.filter(new FilterFunction<SzseAvro>() {
            @Override
            public boolean filter(SzseAvro szseAvro) throws Exception {
                return QuotUtil.checkData(szseAvro) && QuotUtil.checkTime(szseAvro);
            }
        });

        //sseAvroFiltedDS.print("Shanghai after filtering>>");
        //szseAvroFiltedDS.print("Shenzhen after filtering>>");


        //At this point the data is the filtered, valid data, so it can be wrapped into CleanBean
        //convert the Avro beans of both markets: AvroBean ---> CleanBean
        SingleOutputStreamOperator<CleanBean> sseCleanBeanDS = sseAvroFiltedDS.map(new MapFunction<SseAvro, CleanBean>() {
            @Override
            public CleanBean map(SseAvro sseAvro) throws Exception {
                CleanBean cleanBean = new CleanBean();
                cleanBean.setMdStreamId(sseAvro.getMdStreamID().toString());
                cleanBean.setSecCode(sseAvro.getSecurityID().toString());
                cleanBean.setSecName(sseAvro.getSymbol().toString());
                cleanBean.setTradeVolumn(sseAvro.getTradeVolume());
                cleanBean.setTradeAmt(sseAvro.getTotalValueTraded());
                cleanBean.setPreClosePx(new BigDecimal(sseAvro.getPreClosePx()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setOpenPrice(new BigDecimal(sseAvro.getOpenPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setMaxPrice(new BigDecimal(sseAvro.getHighPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setMinPrice(new BigDecimal(sseAvro.getLowPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setTradePrice(new BigDecimal(sseAvro.getTradePrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setEventTime(sseAvro.getTimestamp());
                cleanBean.setSource("sse");
                return cleanBean;
            }
        });

        SingleOutputStreamOperator<CleanBean> szseCleanBeanDS = szseAvroFiltedDS.map(new MapFunction<SzseAvro, CleanBean>() {
            @Override
            public CleanBean map(SzseAvro szseAvro) throws Exception {
                CleanBean cleanBean = new CleanBean(
                        szseAvro.getMdStreamID().toString(),
                        szseAvro.getSecurityID().toString(),
                        szseAvro.getSymbol().toString(),
                        szseAvro.getTradeVolume(),
                        szseAvro.getTotalValueTraded(),
                        BigDecimal.valueOf(szseAvro.getPreClosePx()),
                        BigDecimal.valueOf(szseAvro.getOpenPrice()),
                        BigDecimal.valueOf(szseAvro.getHighPrice()),
                        BigDecimal.valueOf(szseAvro.getLowPrice()),
                        BigDecimal.valueOf(szseAvro.getTradePrice()),
                        szseAvro.getTimestamp(),
                        "szse"
                );
                return cleanBean;
            }
        });


        //Union the CleanBean streams of both markets, then filter out the index records
        DataStream<CleanBean> unionDS = sseCleanBeanDS.union(szseCleanBeanDS);

        //What we have now is the cleaned, valid index data of both markets
        SingleOutputStreamOperator<CleanBean> indexDS = unionDS.filter(new FilterFunction<CleanBean>() {
            @Override
            public boolean filter(CleanBean cleanBean) throws Exception {
                return QuotUtil.isIndex(cleanBean);
            }
        });

        //indexDS.print("merged index CleanBean data of both markets>>");


        //TODO 3. transformation -- Watermark
        //for development/testing the watermark is optional
        //Note: the old watermark API differs from the new one
        //Watermark = current maximum event time - maximum allowed out-of-orderness
        //Purpose of the watermark: triggering the window computation!
        //When it triggers: when Watermark >= window end time (and the window contains data)
        env.getConfig().setAutoWatermarkInterval(200);//200ms is the default: how often periodic watermarks are generated
        SingleOutputStreamOperator<CleanBean> watermarkDS = indexDS.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<CleanBean>(Time.seconds(2)) {//maximum out-of-orderness of 2s
                    @Override
                    public long extractTimestamp(CleanBean cleanBean) {
                        return cleanBean.getEventTime();//use the event-time field
                    }
                });
        watermarkDS.print("index watermarkDS>>");

        //TODO 4. Index core businesses + sinks ...
        //TODO Index business 1: second-level quotes / compute the latest index quote every 5s
        //new IndexSecondsTask().process(watermarkDS);
        //TODO Index business 2: minute-level quotes / compute the latest index quote every 1min
        //new IndexMinutesTask().process(watermarkDS);
        //TODO Index business 3: minute-level quote backup / back up the latest index quote every 1min
        //new IndexMinutesBackupTask().process(watermarkDS);
        //TODO Index business 4: index change percentage
        //new IndexIncreaseTask().process(watermarkDS);//homework
        //TODO Index business 5: K-line quotes / daily, weekly, monthly K -- done separately later
        //new IndexKlineTask().process(watermarkDS);

        //TODO 5.execute
        env.execute();
    }
}
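
A quick illustrative run-through of the watermark settings above (made-up timestamps): with 2 s of allowed out-of-orderness, a record with event time 18:01:01.5 pushes the watermark to 18:01:01.5 - 2 s = 18:00:59.5; only when a later record raises the watermark to at least 18:01:00 does the [18:00:00, 18:01:00) minute window fire. In other words, a keyed 1-minute event-time window typically closes about 2 s "late" in event time.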


Index Quotes - Second-Level Quotes

Requirements

Group the index data of the Shanghai and Shenzhen markets by index code, compute over 5-second tumbling windows, wrap the result as an IndexBean, and write the results to HBase.

Note: simply replace "stock" with "index".

Code - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.sink.HBaseSink;
import cn.itcast.function.window.IndexPutHBaseWindowFunction;
import cn.itcast.function.window.IndexSecondsWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Desc Core task class for the index second-level quote business
 * Requirement: group the index data of the Shanghai and Shenzhen markets by index code, compute over 5s tumbling windows, wrap the result as IndexBean, and write it to HBase
 */
public class IndexSecondsTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 1. Group by index code
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2. Define the window
        .timeWindow(Time.seconds(5))
        //TODO 3. Window computation: CleanBean-->IndexBean
        .apply(new IndexSecondsWindowFunction())
        //TODO 4. Merge the data
        .timeWindowAll(Time.seconds(5))
        //TODO 5. Wrap as List<Put>
        .apply(new IndexPutHBaseWindowFunction())
        //TODO 6. Batch sink to HBase: index.hbase.table.name=quot_index
        .addSink(new HBaseSink(QuotConfig.INDEX_HBASE_TABLE_NAME));
    }
}

 

Code - Window Function Class

package cn.itcast.function.window;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.IndexBean;
import cn.itcast.util.DateUtil;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc Convert the latest CleanBean of the current 5s window into an IndexBean and return it
 */
public class IndexSecondsWindowFunction implements WindowFunction<CleanBean, IndexBean, String, TimeWindow> {
    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<CleanBean> iterable, Collector<IndexBean> collector) throws Exception {
        //1. Track the latest CleanBean
        CleanBean newCleanBean = null;
        for (CleanBean cleanBean : iterable) {
            if (newCleanBean == null){
                newCleanBean = cleanBean;
            }
            if(cleanBean.getEventTime() > newCleanBean.getEventTime()){
                newCleanBean  = cleanBean;
            }
        }
        //2. Build the result bean
        Long tradeTime = DateUtil.longTimestamp2LongFormat(newCleanBean.getEventTime(), "yyyyMMddHHmmss");
        IndexBean indexBean = new IndexBean(
                newCleanBean.getEventTime(),
                newCleanBean.getSecCode(),
                newCleanBean.getSecName(),
                newCleanBean.getPreClosePx(),
                newCleanBean.getOpenPrice(),
                newCleanBean.getMaxPrice(),
                newCleanBean.getMinPrice(),
                newCleanBean.getTradePrice(),
                0l,0l,
                newCleanBean.getTradeVolumn(),
                newCleanBean.getTradeAmt(),
                tradeTime,
                newCleanBean.getSource()
        );

        //3. Emit the result
        collector.collect(indexBean);
    }
}

 

Code - Merging the Data

 //TODO 4. Merge the data
        .timeWindowAll(Time.seconds(5))

 

Code - Wrapping the Data

package cn.itcast.function.window;

import cn.itcast.bean.IndexBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Put;

import java.util.ArrayList;
import java.util.List;

/**
 * Author itcast
 * Desc Wrap the data of the current window into a List<Put> so HBase's batch insert can be used later
 */
public class IndexPutHBaseWindowFunction implements AllWindowFunction<IndexBean, List<Put>, TimeWindow> {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<IndexBean> iterable, Collector<List<Put>> collector) throws Exception {
        List<Put> list = new ArrayList<>();
        for (IndexBean indexBean : iterable) {
            String rowKey = indexBean.getIndexCode() + indexBean.getTradeTime();
            Put put = new Put(rowKey.getBytes());
            String jsonStr = JSON.toJSONString(indexBean);
            put.addColumn("info".getBytes(),"data".getBytes(),jsonStr.getBytes());
            list.add(put);
        }
        collector.collect(list);
    }
}

 

Code - Sink to HBase

 //TODO 6. Batch sink to HBase: index.hbase.table.name=quot_index
        .addSink(new HBaseSink(QuotConfig.INDEX_HBASE_TABLE_NAME));

 

Testing

1. Start ZK / Kafka / HBase

2. Start the IndexStreamApplication program

3. Log into HBase and check the data

bin/hbase shell
list
scan "quot_index",{LIMIT=>10}


 

Index Quotes - Minute-Level (Time-Sharing) Quotes

Requirements

Group by index code, window the data every 1 min / 60 s, process it, and write the results to the Kafka topics index-sse and index-szse, which Druid finally ingests for querying.

Code - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.IndexBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.window.IndexMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Properties;

/**
 * Author itcast
 * Desc Core task class for the index minute-level (time-sharing) quote business
 * Requirement: group by index code, window the data every 1 min / 60 s, process it, and write the results to the Kafka topics index-sse and index-szse,
 * which Druid finally ingests for querying
 */
public class IndexMinutesTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0. Prepare side-output tags to hold the split streams
        OutputTag<IndexBean> sseOutputTag = new OutputTag<>("sse", TypeInformation.of(IndexBean.class));
        OutputTag<IndexBean> szseOutputTag = new OutputTag<>("szse", TypeInformation.of(IndexBean.class));

        //TODO 1. Group by index code
        SingleOutputStreamOperator<IndexBean> processDS = watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2. Define the window
                .timeWindow(Time.minutes(1))
                //TODO 3. Window computation: CleanBean-->IndexBean
                .apply(new IndexMinutesWindowFunction())
                //TODO 4. Split the stream
                .process(new ProcessFunction<IndexBean, IndexBean>() {
                    @Override
                    public void processElement(IndexBean indexBean, Context context, Collector<IndexBean> collector) throws Exception {
                        if (indexBean.getSource().equals("sse")) {
                            context.output(sseOutputTag, indexBean);
                        } else {
                            context.output(szseOutputTag, indexBean);
                        }
                    }
                });
        DataStream<IndexBean> sseIndexDS = processDS.getSideOutput(sseOutputTag);
        DataStream<IndexBean> szseIndexDS = processDS.getSideOutput(szseOutputTag);

        SingleOutputStreamOperator<String> sseJsonDS = sseIndexDS.map(new MapFunction<IndexBean, String>() {
            @Override
            public String map(IndexBean indexBean) throws Exception {
                return JSON.toJSONString(indexBean);
            }
        });
        SingleOutputStreamOperator<String> szseJsonDS = szseIndexDS.map(new MapFunction<IndexBean, String>() {
            @Override
            public String map(IndexBean indexBean) throws Exception {
                return JSON.toJSONString(indexBean);
            }
        });

        //TODO 5. Sink to the different Kafka topics (and finally to Druid)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);
        //sse.index.topic=index-sse
        FlinkKafkaProducer011<String> ssekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SSE_INDEX_TOPIC, new SimpleStringSchema(), props);
        //szse.index.topic=index-szse
        FlinkKafkaProducer011<String> szsekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SZSE_INDEX_TOPIC, new SimpleStringSchema(), props);

        sseJsonDS.addSink(ssekafkaSink);
        szseJsonDS.addSink(szsekafkaSink);

    }
}

 

Code - Window Function Class

package cn.itcast.function.window;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.IndexBean;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Desc Convert the latest CleanBean of the current 60s window into an IndexBean
 * Note that the per-minute trade volume and amount have to be filled in:
 * this minute's volume or amount = the current window's latest day-total volume or amount - the previous window's latest day-total volume or amount
 * The current window's latest values come from newCleanBean,
 * the previous window's latest values are saved in and read from state
 */
public class IndexMinutesWindowFunction extends RichWindowFunction<CleanBean, IndexBean, String, TimeWindow> {
    //1. A MapState that stores the previous window's IndexBean
    //MapState<index code, previous window's IndexBean>
    private MapState<String, IndexBean> indexState = null;

    //2. Initialize the state
    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, IndexBean> stateDescriptor = new MapStateDescriptor<>("indexState", String.class, IndexBean.class);
        indexState = getRuntimeContext().getMapState(stateDescriptor);
    }


    @Override
    public void apply(String s, TimeWindow timeWindow, Iterable<CleanBean> iterable, Collector<IndexBean> collector) throws Exception {
        //3. Get the latest CleanBean (newCleanBean)
        CleanBean newCleanBean = null;
        for (CleanBean cleanBean : iterable) {
            if (newCleanBean == null) {
                newCleanBean = cleanBean;
            }
            if (cleanBean.getEventTime() > newCleanBean.getEventTime()) {
                newCleanBean = cleanBean;
            }
        }

        //4. Get the previous window's IndexBean from the state
        IndexBean lastIndexBean = indexState.get(newCleanBean.getSecCode());
        Long minutesVol = 0L;//the per-minute trade volume to compute
        Long minutesAmt = 0L;//the per-minute trade amount to compute
        //this minute's volume or amount = the current window's latest day total - the previous window's latest day total
        if (lastIndexBean != null) {
            //5. The current window's latest day-total volume and amount
            Long currentTradeVolumn = newCleanBean.getTradeVolumn();
            Long currentTradeAmt = newCleanBean.getTradeAmt();

            //6. Get the previous window's latest day-total volume and amount
            Long tradeVolDay = lastIndexBean.getTradeVolDay();
            Long tradeAmtDay = lastIndexBean.getTradeAmtDay();

            //7. Compute this minute's volume and amount
            minutesVol = currentTradeVolumn - tradeVolDay;
            minutesAmt = currentTradeAmt - tradeAmtDay;

        } else {
            minutesVol = newCleanBean.getTradeVolumn();
            minutesAmt = newCleanBean.getTradeAmt();
        }

        //8. Build the result bean
        Long tradeTime = DateUtil.longTimestamp2LongFormat(newCleanBean.getEventTime(), "yyyyMMddHHmmss");
        IndexBean indexBean = new IndexBean(
                newCleanBean.getEventTime(),
                newCleanBean.getSecCode(),
                newCleanBean.getSecName(),
                newCleanBean.getPreClosePx(),
                newCleanBean.getOpenPrice(),
                newCleanBean.getMaxPrice(),
                newCleanBean.getMinPrice(),
                newCleanBean.getTradePrice(),
                minutesVol, minutesAmt,
                newCleanBean.getTradeVolumn(),
                newCleanBean.getTradeAmt(),
                tradeTime,
                newCleanBean.getSource()
        );
        collector.collect(indexBean);

        //9. Update the state
        indexState.put(indexBean.getIndexCode(),indexBean);

    }
}

 

Code - Splitting the Stream + Sink

//TODO 4. Split the stream
                .process(new ProcessFunction<IndexBean, IndexBean>() {
                    @Override
                    public void processElement(IndexBean indexBean, Context context, Collector<IndexBean> collector) throws Exception {
                        if (indexBean.getSource().equals("sse")) {
                            context.output(sseOutputTag, indexBean);
                        } else {
                            context.output(szseOutputTag, indexBean);
                        }
                    }
                });
        DataStream<IndexBean> sseIndexDS = processDS.getSideOutput(sseOutputTag);
        DataStream<IndexBean> szseIndexDS = processDS.getSideOutput(szseOutputTag);

        SingleOutputStreamOperator<String> sseJsonDS = sseIndexDS.map(new MapFunction<IndexBean, String>() {
            @Override
            public String map(IndexBean indexBean) throws Exception {
                return JSON.toJSONString(indexBean);
            }
        });
        SingleOutputStreamOperator<String> szseJsonDS = szseIndexDS.map(new MapFunction<IndexBean, String>() {
            @Override
            public String map(IndexBean indexBean) throws Exception {
                return JSON.toJSONString(indexBean);
            }
        });

        //TODO 5. Sink to the different Kafka topics (and finally to Druid)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);
        //sse.index.topic=index-sse
        FlinkKafkaProducer011<String> ssekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SSE_INDEX_TOPIC, new SimpleStringSchema(), props);
        //szse.index.topic=index-szse
        FlinkKafkaProducer011<String> szsekafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SZSE_INDEX_TOPIC, new SimpleStringSchema(), props);

        sseJsonDS.addSink(ssekafkaSink);
        szseJsonDS.addSink(szsekafkaSink);

 

Testing

1. Start ZK / Kafka / Druid

broker: http://node01:8888

middleManager / historical: http://node01:8090/console.html

2. Prepare the topics

cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --zookeeper node01:2181 --list
bin/kafka-topics.sh --zookeeper node01:2181 --create --partitions 1 --replication-factor 1 --topic index-sse
bin/kafka-topics.sh --zookeeper node01:2181 --create --partitions 1 --replication-factor 1 --topic index-szse

3. Create the Druid datasources so that Druid ingests index-sse and index-szse in real time

This can be configured through the Druid web UI, or by using the requests already prepared in Postman.


 

Check the web UI: Druid should now be ingesting the Kafka data in real time.

http://node01:8090/console.html


 

4. Start the program

 

5. Check whether data arrives in Kafka


 

If the console output is garbled, adjust the encoding setting (shown only as a screenshot in the original notes).

6. Query Druid to check whether data has arrived (you may need to wait a moment, reset the running ingestion task and restart the Flink program, or restart Druid)

http://node01:8888/unified-console.html#query


 

 

 

Index Quotes - Minute Quotes Backup

Requirements

Back up the index minute-level quote data to HDFS as plain text, analogous to the stock backup above.

Code - Core Task Class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.IndexBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.map.IndexPutHDFSMapFunction;
import cn.itcast.function.window.IndexMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

/**
 * Author itcast
 * Desc Core task class for backing up the index minute-level quote data to HDFS; the computation itself is the same as the minute-level quotes above
 */
public class IndexMinutesBackupTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0. Prepare the HDFS sink
        //In newer Flink versions StreamingFileSink or FileSink could be used, but they require Hadoop 2.7+, while the project's CDH 5.14 ships Hadoop 2.6,
        //so the newer sinks cannot be used here; we use the older BucketingSink instead
        //path: index.sec.hdfs.path=hdfs://192.168.52.100:8020/quot_data/dev/index/
        BucketingSink<String> bucketingSink = new BucketingSink<>(QuotConfig.INDEX_SEC_HDFS_PATH);
        //file size threshold: hdfs.batch=1073741824
        bucketingSink.setBatchSize(Long.parseLong(QuotConfig.HDFS_BATCH));
        //bucketing (sub-directory) strategy: hdfs.bucketer=yyyyMMdd
        bucketingSink.setBucketer(new DateTimeBucketer(QuotConfig.HDFS_BUCKETER));

        //prefixes -- defaults are used if not set
        bucketingSink.setInProgressPrefix("index_");
        bucketingSink.setPendingPrefix("index_2_");//prefix for pending files
        //suffixes -- defaults are used if not set
        bucketingSink.setInProgressSuffix(".txt");
        bucketingSink.setPendingSuffix(".txt");


        //TODO 1. Group by index code
        watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2. Define the window
                .timeWindow(Time.minutes(1))
                //TODO 3. Window computation: CleanBean-->IndexBean
                .apply(new IndexMinutesWindowFunction())
                //TODO 4. Convert and concatenate the data into "|"-separated text
                .map(new IndexPutHDFSMapFunction())
                //TODO 5. Sink to HDFS (the chain ends in a sink, so it is not assigned to a variable)
                .addSink(bucketingSink);
    }
}

 

Code - Concatenating the Data

package cn.itcast.function.map;

import cn.itcast.bean.IndexBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.functions.MapFunction;

/**
 * Author itcast
 * Desc Concatenate the IndexBean fields into a "|"-separated String
 */
public class IndexPutHDFSMapFunction implements MapFunction<IndexBean, String> {
    @Override
    public String map(IndexBean indexBean) throws Exception {
        //get the separator
        String seperator = QuotConfig.HDFS_SEPERATOR;
        //date conversion
        String tradeDate = DateUtil.longTimestamp2String(indexBean.getEventTime(), DateFormatConstant.format_yyyy_mm_dd);
        //字段拼接
        StringBuilder sb = new StringBuilder();
        sb.append(indexBean.getTradeTime()).append(seperator)
                .append(tradeDate).append(seperator)
                .append(indexBean.getIndexCode()).append(seperator)
                .append(indexBean.getIndexName()).append(seperator)
                .append(indexBean.getPreClosePrice()).append(seperator)
                .append(indexBean.getOpenPrice()).append(seperator)
                .append(indexBean.getHighPrice()).append(seperator)
                .append(indexBean.getLowPrice()).append(seperator)
                .append(indexBean.getClosePrice()).append(seperator)
                .append(indexBean.getTradeVol()).append(seperator)
                .append(indexBean.getTradeAmt()).append(seperator)
                .append(indexBean.getTradeVolDay()).append(seperator)
                .append(indexBean.getTradeAmtDay()).append(seperator)
                .append(indexBean.getSource()).append(seperator);
        //返回拼接结果
        return sb.toString();
    }
}

 

Test

0. Start ZooKeeper and Kafka

1. Start the program IndexStreamApplication

2. Check HDFS

http://node01:50070/explorer.html#/quot_data/dev/index/20210201

3. Note: if checkpointing is disabled, the in-progress files may never be flushed to HDFS; setting batchSize to a smaller value makes files roll sooner (see the sketch below).
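A minimal sketch of that tweak, using the same BucketingSink as above (the 16 KB size and the one-minute rollover are illustrative values, not the project's settings):

// Hedged sketch: make part files roll without waiting for a huge batch or a checkpoint.
BucketingSink<String> bucketingSink = new BucketingSink<>(QuotConfig.INDEX_SEC_HDFS_PATH);
bucketingSink.setBatchSize(16 * 1024L);              // roll a part file after ~16 KB instead of 1 GB
bucketingSink.setBatchRolloverInterval(60 * 1000L);  // if your Flink version supports it, also roll at least once a minute
bucketingSink.setBucketer(new DateTimeBucketer<>(QuotConfig.HDFS_BUCKETER));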

 

Index quotes - change/change rate/amplitude - homework

Finish this as homework after class.

You need to define an IndexIncreaseBean yourself,

then implement it the same way as the stock change/change-rate task:

swap the stock-specific code for the index equivalents and you are done -- about 5 minutes of work! (A possible shape of the bean is sketched below.)
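A hedged sketch of what that IndexIncreaseBean could look like; the field list simply mirrors the stock change/change-rate/amplitude fields and is an assumption, not the reference solution:

import java.math.BigDecimal;

// Hypothetical bean for the homework; field names follow the stock version's conventions.
public class IndexIncreaseBean {
    private Long eventTime;
    private String indexCode;
    private String indexName;
    private BigDecimal preClosePrice;
    private BigDecimal closePrice;   // latest price
    private BigDecimal updown;       // change      = closePrice - preClosePrice
    private BigDecimal increase;     // change rate = updown / preClosePrice
    private BigDecimal amplitude;    // amplitude   = (highPrice - lowPrice) / preClosePrice
    private BigDecimal highPrice;
    private BigDecimal lowPrice;
    private Long tradeTime;
    // getters/setters (or Lombok @Data) omitted
}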

 

 

Sector core business development - for understanding

Note:

This business is complex -- understanding it is a bonus, not a requirement. Interviewers rarely ask about it because few of them have built it themselves.

In real development this business took 3 developers about 3 weeks (including requirement discussions, implementation, and dead ends).

We cover it in half a day, so expect it to be dense!

Overview

Stock quotes: the quote of a single stock.

Index quotes: the quote of an index (an index is the aggregate quote of many stocks).

Sector quotes: stocks grouped by some human-defined classification.

For example, industry sectors: oil, banking, media, tourism...

Regional sectors: South China, Pearl River Delta, Western China, Yangtze River Delta...

Concept sectors: blockchain, AI, new energy...

Note:

1. This project uses industry sectors.

2. We use the Shanghai market (SSE) as the example (both markets would work, but we restrict it to SSE to keep the learning curve manageable).

3. A sector is made up of individual stocks.

(diagram omitted)

Program entry class

package cn.itcast.app;

import cn.itcast.avro.AvroDeserializeSchema;
import cn.itcast.avro.SseAvro;
import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.util.QuotUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Properties;

/**
 * Author itcast
 * Desc 板块实时行情数据处理业务入口类
 */
public class SectorStreamApplication {
    public static void main(String[] args) throws Exception {
        //TODO 0.env-流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置环境参数
        //-学习测试为了方便观察可以设置并行度为1
        env.setParallelism(1);
        //设置使用事件时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//老版本该API没过期,新版本过期的不需要设置
        //注意:Checkpoint是Flink容错机制,上线是开启,开发测试可以注掉!
        //-Checkpoint
        /*env.enableCheckpointing(5000);//开启ckp
        if (SystemUtils.IS_OS_WINDOWS) {
            env.setStateBackend(new FsStateBackend("file:///D:/ckp"));
        } else {
            env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink-checkpoint/checkpoint"));
        }
        //===========类型2:建议参数===========
        //设置两个Checkpoint 之间最少等待时间,如设置Checkpoint之间最少是要等 500ms(为了避免每隔1000ms做一次Checkpoint的时候,前一次太慢和后一次重叠到一起去了)
        //如:高速公路上,每隔1s关口放行一辆车,但是规定了两车之前的最小车距为500m
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//默认是0
        //设置如果在做Checkpoint过程中出现错误,是否让整体任务失败:true是  false不是
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);//默认是true
        //env.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);//默认值为0,表示不容忍任何检查点失败
        //设置是否清理检查点,表示 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint会在作业被Cancel时被删除
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:true,当作业被取消时,删除外部的checkpoint(默认值)
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:false,当作业被取消时,保留外部的checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //===========类型3:直接使用默认的即可===============
        //设置checkpoint的执行模式为EXACTLY_ONCE(默认)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃。
        env.getCheckpointConfig().setCheckpointTimeout(60000);//默认10分钟
        //设置同一时间有多少个checkpoint可以同时执行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//默认为1
        //固定延迟重启--开发中常用
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // 最多重启3次数
                Time.of(5, TimeUnit.SECONDS) // 重启时间间隔
        ));
        //上面的设置表示:如果job失败,重启3次, 每次间隔5s*/

        //TODO 1.source-kafka的主题sse沪市
        //准备kafka参数
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);//集群地址192.168.52.100:9092
        props.setProperty("group.id", QuotConfig.GROUP_ID);//消费者组名称
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//动态分区检测,开一个后台线程每隔5s检查Kafka的分区状态
        //sse沪市
        FlinkKafkaConsumer011<SseAvro> sseKafkaSource = new FlinkKafkaConsumer011<>(QuotConfig.SSE_TOPIC, new AvroDeserializeSchema(QuotConfig.SSE_TOPIC), props);//注意新版本直接new FlinkKafkaConsumer,老版本需要指定kafka的版本
        sseKafkaSource.setStartFromEarliest();

        DataStreamSource<SseAvro> sseAvroDS = env.addSource(sseKafkaSource);
        //sseAvroDS.print("沪市>>>");

        //TODO 2.transformation--预处理:过滤/转换/合并....
        //需要检查数据正常和时间是在正常交易时间内
        //过滤出沪深两市的合法数据/过滤掉非法数据
        SingleOutputStreamOperator<SseAvro> sseAvroFiltedDS = sseAvroDS.filter(new FilterFunction<SseAvro>() {
            @Override
            public boolean filter(SseAvro sseAvro) throws Exception {
                //虽然是下面这样调用的,但其实对时间的判断是根据配置文件中的配置open.time=00:00和close.time=23:59来的,也就是说数据时间都合法,也是为了方便测试
                if (QuotUtil.checkData(sseAvro) && QuotUtil.checkTime(sseAvro)) {//合法
                    return true;
                } else {//非法
                    return false;
                }
            }
        });

        //sseAvroFiltedDS.print("沪市过滤后的合法数据>>");


        //代码走到这里说明数据都是过滤后的合法的"冰清玉洁"的数据!就应该将里面的重新板块数据封装为CleanBean
        //将沪深两市里面的AvroBean--->CleanBean
        SingleOutputStreamOperator<CleanBean> sseCleanBeanDS = sseAvroFiltedDS.map(new MapFunction<SseAvro, CleanBean>() {
            @Override
            public CleanBean map(SseAvro sseAvro) throws Exception {
                CleanBean cleanBean = new CleanBean();
                cleanBean.setMdStreamId(sseAvro.getMdStreamID().toString());
                cleanBean.setSecCode(sseAvro.getSecurityID().toString());
                cleanBean.setSecName(sseAvro.getSymbol().toString());
                cleanBean.setTradeVolumn(sseAvro.getTradeVolume());
                cleanBean.setTradeAmt(sseAvro.getTotalValueTraded());
                cleanBean.setPreClosePx(new BigDecimal(sseAvro.getPreClosePx()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setOpenPrice(new BigDecimal(sseAvro.getOpenPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setMaxPrice(new BigDecimal(sseAvro.getHighPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setMinPrice(new BigDecimal(sseAvro.getLowPrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setTradePrice(new BigDecimal(sseAvro.getTradePrice()).setScale(2, RoundingMode.HALF_UP));
                cleanBean.setEventTime(sseAvro.getTimestamp());
                cleanBean.setSource("sse");
                return cleanBean;
            }
        });

        //现在手里拿到的就是沪市过滤清理后的合法的板块数据(由个股组成)
        SingleOutputStreamOperator<CleanBean> sectorDS = sseCleanBeanDS.filter(new FilterFunction<CleanBean>() {
            @Override
            public boolean filter(CleanBean cleanBean) throws Exception {
                return QuotUtil.isStock(cleanBean);
            }
        });

        //sectorDS.print("沪市的板块的CleanBean>>");


        //TODO 3.transformation--Watermark
        //开发测试时Watermark可以要可以不要
        //注意:老版本WatermarkAPI和新版本的不一样
        //Watermark = 当前最大的事件时间 - 最大允许的乱序度
        //Watermark的作用: 触发窗口计算!
        //Watermark如何触发: Watermark >= 窗口结束时间时触发计算 (该窗口里面得有数据)
        env.getConfig().setAutoWatermarkInterval(200);//默认就是200ms,表示每隔多久去给数据添加一次Watermaker,但在新版本中该API已过期
        SingleOutputStreamOperator<CleanBean> watermarkDS = sectorDS.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<CleanBean>(Time.seconds(2)) {//设置最大乱序度为2s
                    @Override
                    public long extractTimestamp(CleanBean cleanBean) {
                        return cleanBean.getEventTime();//指定事件时间列
                    }
                });
        watermarkDS.print("沪市个股watermarkDS>>");

        //TODO 4.板块核心业务+Sink....
        //TODO 板块核心业务1.秒级行情/每5s计算最新的板块价格行情数据
        //new SectorSecondsTask().process(watermarkDS);
        //TODO 板块核心业务2.分级行情/每1min计算最新的板块价格行情数据
        //new SectorMinutesTask().process(watermarkDS);
        //TODO 板块核心业务3.分级行情备份/每1min备份最新的板块价格行情数据
        //new SectorMinutesBackupTask().process(watermarkDS);
        //TODO 板块核心业务4.板块涨跌幅
        //new SectorIncreaseTask().process(watermarkDS);
        //TODO 板块核心业务5.K线行情/日K,周K,月K---后面单独做
        //new SectorKlineTask().process(watermarkDS);

        //TODO 5.execute
        env.execute();
    }
}

(diagram omitted)

Sector quotes - second-level quotes

Code - core task class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.sink.HBaseSink;
import cn.itcast.function.window.SectorPutHBaseWindowFunction;
import cn.itcast.function.window.SectorWindowFunction;
import cn.itcast.function.window.StockSecondsWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Desc 板块秒级行情数据业务处理核心任务类
 * 需求: 对沪市的个股数据按照个股代码进行分组并进行窗口(5s滚动)计算,封装为StockBean,再按照板块进行计算封装为SectorBean,最后将结果写入到HBase中
 */
public class SectorSecondsTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 1.分组
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2.划分窗口
        .timeWindow(Time.seconds(5))
        //TODO 3.窗口计算:CleanBean-->StockBean
        .apply(new StockSecondsWindowFunction())//板块由个股组成,这里复用之前的个股代码即可
        //TODO 4.汇总/合并
        .timeWindowAll(Time.seconds(5))//把所有转换好的StockBean放一起,方便后面进行板块计算
        //TODO 5.窗口计算:StockBean-->SectorBean
        .apply(new SectorWindowFunction())
        //TODO 6.汇总/合并
        .timeWindowAll(Time.seconds(5))//把各个板块的数据汇总到一起,方便转为List<Put>进行HBase批量插入
        //TODO 7.封装为List<Put>
        .apply(new SectorPutHBaseWindowFunction())
        //TODO 8.Sink到HBase #板块HBase表名:sector.hbase.table.name=quot_sector
        .addSink(new HBaseSink(QuotConfig.SECTOR_HBASE_TABLE_NAME));
    }
}

 

Code - per-stock window

Reuse the stock code from before:
StockSecondsWindowFunction

Code - sector window - this part is different

(diagram omitted)

Note:

At first glance it looks like we just copy the fields on the left into the bean on the right,

but in fact many of the fields on the right have to be computed!

That is, we must find the sector each stock belongs to and aggregate the stock into that sector --

in other words, aggregate by the sector the stock belongs to.

So the question is: which sector does a stock belong to, and where do we look that up? -- MySQL holds a sector/stock mapping table synced from Hive (provided by a third party).

(diagrams omitted)

package cn.itcast.function.window;

import cn.itcast.bean.SectorBean;
import cn.itcast.bean.StockBean;
import cn.itcast.util.DBUtil;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 板块业务数据处理窗口函数/类
 * 需求:将StockBean转为-->SectorBean
 * 注意:
 * 1.个股和板块的对应关系表在MySQL中
 * 2.板块中的很多字段需要根据该板块下的个股的数据进行重新计算得到---复杂
 * 当前板块开盘价格 = 板块前收盘价*当前板块以开盘价计算的各个股总流通市值  即 累计各个股开盘流通市值/当前板块前一交易日板块总流通市值
 * 当前板块当前价格 = 板块前收盘价*当前板块以收盘价计算的各个股总流通市值  即 累计各个股收盘流通市值/当前板块前一交易日板块总流通市值
 */
public class SectorWindowFunction extends RichAllWindowFunction<StockBean, SectorBean, TimeWindow> {
    //TODO 0.准备一些变量、集合、状态方便后续使用
    //定义基准价(作为上市首日的前一交易日收盘价)
    BigDecimal basePrice = new BigDecimal(1000);

    //定义集合用来存放MySQL表中的数据
    //Map<板块代码, List<Map<字段名,值>>>
    Map<String, List<Map<String,Object>>> sectorStockMap = null;

    //Map<板块代码,Map<字段名,值>>
    Map<String,Map<String,Object>> sectorKlineMap = null;

    //定义一个状态用来存储上一个窗口的板块数据
    //MapState<板块代码,SectorBean>
    MapState<String,SectorBean> sectorState = null;

    //TODO 0.初始化数据
    @Override
    public void open(Configuration parameters) throws Exception {
        //查MySQL-bdp_sector_stock表中的数据封装到sectorStockMap中
        String sql = "SELECT * FROM bdp_sector_stock WHERE sec_abbr = 'ss'";
        //执行sql可以使用原始JDBC代码,这里直接使用封装好的工具类
        sectorStockMap = DBUtil.queryForGroup("sector_code", sql);

        //查MySQL-bdp_quot_sector_kline_day板块日K表--没有数据没关系,后续会计算存进去 --为了取前收盘价
        //# 查询前一个交易日的板块日K数据
        String sql2 = "SELECT * FROM bdp_quot_sector_kline_day WHERE trade_date = (SELECT last_trade_date FROM tcc_date where trade_date = CURDATE())";
        sectorKlineMap = DBUtil.query("sector_code", sql2);

        MapStateDescriptor<String, SectorBean> stateDescriptor = new MapStateDescriptor<>("sectorState", String.class, SectorBean.class);
        sectorState = getRuntimeContext().getMapState(stateDescriptor);
    }

    @Override
    public void apply(TimeWindow timeWindow, Iterable<StockBean> iterable, Collector<SectorBean> collector) throws Exception {
        //TODO 1.遍历该窗口中的个股并缓存到新建的Map中
        Map<String,StockBean> cacheStockMap = new HashMap<>();
        for (StockBean stockBean : iterable) {
            cacheStockMap.put(stockBean.getSecCode(),stockBean);
        }

        //TODO 2.遍历板块对应关系表中的个股并获取板块下的个股
        //Map<板块代码, List<Map<字段名,值>>>
        //Map<String, List<Map<String,Object>>> sectorStockMap = 已经在open中初始化了;
        for (String sectorCode : sectorStockMap.keySet()) {
            //获取该板块代码对应的个股
            //List表示该板块下的个股组成的集合,里面的一个map就一个个股
            List<Map<String, Object>> listStock = sectorStockMap.get(sectorCode);

            //TODO 3.初始化要返回的SectorBean中需要的数据,后面计算再赋值
            //下面的都是返回的SectorBean中需要用到的
            Long eventTime = 0L;//ok
            String sectorName = null; //ok
            BigDecimal preClosePrice = new BigDecimal(0);//ok
            BigDecimal highPrice = new BigDecimal(0);//ok
            BigDecimal openPrice = new BigDecimal(0);//ok
            BigDecimal lowPrice = new BigDecimal(0);//ok
            BigDecimal closePrice = new BigDecimal(0);//ok
            Long tradeVol = 0L; //分时成交量 //ok
            Long tradeAmt = 0L; //分时成交额 //ok
            Long tradeVolDay = 0L; //日成交总量 //ok
            Long tradeAmtDay = 0L; //日成交总额 //ok
            Long tradeTime = 0L;//ok

            //下面的都是后面计算需要用到的
            //当前板块以开盘价计算的总流通市值 也就是 累计各个股开盘流通市值
            BigDecimal stockOpenTotalNegoCap = new BigDecimal(0);//ok
            //当前板块以收盘价计算的总流通市值 也就是 累计各个股收盘流通市值
            BigDecimal stockCloseTotalNegoCap = new BigDecimal(0);//ok
            //前一日板块总流通市值
            BigDecimal preSectorNegoCap = new BigDecimal(0); //ok

            //TODO 4.遍历该板块下的各个个股,并获取每个个股的所属板块名称,个股代码,个股流通股本,前一交易日板块流通总市值
            for (Map<String, Object> map : listStock) {
                sectorName = map.get("sector_name").toString();//板块名称
                String sec_code = map.get("sec_code").toString();//个股代码
                BigDecimal negoCap = new BigDecimal(map.get("nego_cap").toString());//个股流通股本
                preSectorNegoCap = new BigDecimal(map.get("pre_sector_nego_cap").toString());//前一交易日板块流通总市值

                StockBean stockBean = cacheStockMap.get(sec_code);//根据个股代码从当前窗口数据获取对应的个股信息
                if(stockBean != null){//当前窗口中有该个股,且属于该板块
                    eventTime = stockBean.getEventTime();
                    tradeTime = stockBean.getTradeTime();

                    //TODO 5.计算业务字段:当前板块以开盘价计算的总流通市值和当前板块以收盘价计算的总流通市值
                    //当前板块以开盘价计算的总流通市值 也就是 累计各个股开盘流通市值 = sum(板块下的各个个股的开盘价 * 各个个股流通股本)
                    BigDecimal stockOpenValue = stockBean.getOpenPrice().multiply(negoCap).setScale(2, RoundingMode.HALF_UP);
                    stockOpenTotalNegoCap = stockOpenTotalNegoCap.add(stockOpenValue); // BigDecimal is immutable: the result of add() must be reassigned
                    //当前板块以收盘价计算的总流通市值 也就是 累计各个股收盘流通市值 = sum(板块下的各个个股的收盘价 * 各个个股流通股本)
                    BigDecimal stockCloseValue = stockBean.getClosePrice().multiply(negoCap).setScale(2, RoundingMode.HALF_UP);
                    stockCloseTotalNegoCap = stockCloseTotalNegoCap.add(stockCloseValue); // BigDecimal is immutable: the result of add() must be reassigned

                    //TODO 6.板块下各个个股的累计成交量/成交金额
                    //板块下各个个股的累计成交量
                    tradeVolDay += stockBean.getTradeVolDay();
                    //板块下各个个股的累计成交金额
                    tradeAmtDay += stockBean.getTradeAmtDay();
                }
            }
            //TODO 7.是否是首日上市
            if(sectorKlineMap != null && sectorKlineMap.get(sectorCode) != null){
                //非首日上市
                //板块前收盘价 = 板块日K中的收盘价(注意板块日k查询的就是前一个交易日的k线,所以板块前收盘价就等于该日k中的收盘价)
                Map<String, Object> kmap = sectorKlineMap.get(sectorCode);
                preClosePrice = new BigDecimal(kmap.get("close_price").toString());
            }else{
                //首日上市
                //板块前收盘价 = 基准价
                preClosePrice = basePrice;
            }

            //TODO 8.计算板块开盘价和收盘价/当前价
            //下面的公式不需要背
            //当前板块开盘价格 = 板块前收盘价*当前板块以开盘价计算的各个股总流通市值  即 累计各个股开盘流通市值/当前板块前一交易日板块总流通市值
            openPrice = preClosePrice.multiply(stockOpenTotalNegoCap).divide(preSectorNegoCap,2, RoundingMode.HALF_UP).setScale(2, RoundingMode.HALF_UP);
            //当前板块收盘价或当前价格 = 板块前收盘价*当前板块以收盘价计算的各个股总流通市值  即 累计各个股收盘流通市值/当前板块前一交易日板块总流通市值
            closePrice = preClosePrice.multiply(stockCloseTotalNegoCap).divide(preSectorNegoCap,2, RoundingMode.HALF_UP).setScale(2, RoundingMode.HALF_UP);


            //TODO 9.计算板块最高最低价
            //先让高低都赋值为最新价
            highPrice = closePrice;//如果不是最后一条,closePrice其实就是最新价
            lowPrice = closePrice;//如果不是最后一条,closePrice其实就是最新价

            //从状态中获取之前的高低进行比较,算出最终的高低
            SectorBean lastSectorBean = sectorState.get(sectorCode);
            if(lastSectorBean != null){
                BigDecimal lastHighPrice = lastSectorBean.getHighPrice();
                BigDecimal lastLowPrice = lastSectorBean.getLowPrice();
                Long lastTradeVol = lastSectorBean.getTradeVol();
                Long lastTradeAmt = lastSectorBean.getTradeAmt();
                
                if(lastHighPrice.compareTo(highPrice) == 1) {//lastHighPrice>highPrice
                    highPrice = lastHighPrice;
                }

                if (lastLowPrice.compareTo(lowPrice) == -1) {//lastLowPrice<lowPrice
                    lowPrice = lastLowPrice;
                }

                //TODO 10.分时成交量/成交金额
                tradeVol = tradeVolDay - lastTradeVol;
                tradeAmt = tradeAmtDay - lastTradeAmt; // minute amount = daily cumulative amount minus last window's cumulative amount

                //TODO 11.注意:严谨一点: 高低还和开盘价进行比较,因为之前的高低只是和上一窗口比的
                if(openPrice.compareTo(highPrice) == 1){
                    highPrice = openPrice;
                }
                if(openPrice.compareTo(lowPrice) == -1){
                    lowPrice = openPrice;
                }
            }/*else {
                //使用默认值,第一个窗口数据默认值没事
            }*/

            //结束后上面的字段值就都有了
            //TODO 12.封装并返回结果
            SectorBean sectorBean = new SectorBean();
            sectorBean.setEventTime(eventTime);
            sectorBean.setSectorCode(sectorCode);
            sectorBean.setSectorName(sectorName);

            sectorBean.setPreClosePrice(preClosePrice);
            sectorBean.setHighPrice(highPrice);
            sectorBean.setOpenPrice(openPrice);
            sectorBean.setLowPrice(lowPrice);
            sectorBean.setClosePrice(closePrice);

            sectorBean.setTradeVol(tradeVol);
            sectorBean.setTradeAmt(tradeAmt);
            sectorBean.setTradeVolDay(tradeVolDay);
            sectorBean.setTradeAmtDay(tradeAmtDay);

            sectorBean.setTradeTime(tradeTime);

            collector.collect(sectorBean);

            //TODO 13.更新状态
            sectorState.put(sectorCode,sectorBean);//最后更新的SectorBean里面就封装了最新的高低价
        }
    }
}

 

 

Code - wrap the data into List&lt;Put&gt;

package cn.itcast.function.window;

import cn.itcast.bean.SectorBean;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.hbase.client.Put;

import java.util.ArrayList;
import java.util.List;

/**
 * Author itcast
 * Desc 将SectorBean-->List<Put>
 */
public class SectorPutHBaseWindowFunction implements AllWindowFunction<SectorBean, List<Put>, TimeWindow> {
    @Override
    public void apply(TimeWindow timeWindow, Iterable<SectorBean> iterable, Collector<List<Put>> collector) throws Exception {
        List<Put> list = new ArrayList<>();
        for (SectorBean sectorBean : iterable) {
            String rowKey = sectorBean.getSectorCode() + sectorBean.getTradeTime();
            Put put = new Put(rowKey.getBytes());
            String jsonStr = JSON.toJSONString(sectorBean);
            put.addColumn("info".getBytes(),"data".getBytes(),jsonStr.getBytes());
            list.add(put);
        }
        collector.collect(list);
    }
}


 

Code - sink to HBase (the HBaseSink class itself is sketched after the snippet)

//TODO 8.Sink到HBase #板块HBase表名:sector.hbase.table.name=quot_sector
        .addSink(new HBaseSink(QuotConfig.SECTOR_HBASE_TABLE_NAME));
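The HBaseSink referenced above is not listed in this article. A minimal sketch of what such a batch sink might look like, assuming the HBase 1.x client API and that the ZooKeeper quorum would normally come from the project config (this class body is an illustration, not the project's actual implementation):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import java.util.List;

// Hedged sketch of a batch HBase sink; the project's real HBaseSink may differ.
public class HBaseSink extends RichSinkFunction<List<Put>> {
    private final String tableName;
    private transient Connection conn;
    private transient Table table;

    public HBaseSink(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node01:2181"); // assumption: normally read from the project config
        conn = ConnectionFactory.createConnection(conf);
        table = conn.getTable(TableName.valueOf(tableName));
    }

    @Override
    public void invoke(List<Put> puts, Context context) throws Exception {
        table.put(puts); // one batched write per window, which is why the data was merged into List<Put>
    }

    @Override
    public void close() throws Exception {
        if (table != null) table.close();
        if (conn != null) conn.close();
    }
}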

 

Test

1. Start ZooKeeper, Kafka, and HBase

2. Start the program SectorStreamApplication

3. Open the HBase shell and check the data

bin/hbase shell
list
scan "quot_sector",{LIMIT=>10}

(screenshot omitted)

 

Sector quotes - minute-level quotes

Requirement

Group by stock code, compute over a 1 min / 60 s window to turn CleanBean into StockBean, then aggregate into SectorBean, write the result to a Kafka topic, and finally let Druid ingest it for querying.

(diagram omitted)

Code - core task class

Only this class needs to be written; everything else is reused.

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.SectorBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.window.SectorWindowFunction;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
 * Author itcast
 * Desc 板块行情核心任务类-板块分时行情
 * 按照个股代码进行分组 每隔1min/60s划分一个窗口对数据进行处理转为StockBean,
 * 再转SectorBean,并将结果写入到Kafka的主题,最终由Druid进行摄取并查询
 */
public class SectorMinutesTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0.准备Kafka-Sink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", QuotConfig.BOOTSTRAP_SERVERS);
        //sse.sector.topic=sector-sse
        FlinkKafkaProducer011<String> kafkaSink = new FlinkKafkaProducer011<>(QuotConfig.SSE_SECTOR_TOPIC, new SimpleStringSchema(), props);

        //TODO 1.分组
        watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2.窗口划分
                .timeWindow(Time.minutes(1))
                //TODO 3.窗口计算CleanBean-->StockBean--复用之前的个股分时行情代码
                .apply(new StockMinutesWindowFunction())
                //TODO 4.数据合并
                .timeWindowAll(Time.minutes(1))
                //TODO 5.窗口计算StockBean-->SectorBean--复用之前的板块秒级行情代码
                .apply(new SectorWindowFunction())
                //TODO 6.转为json再Sink到Kafka
                .map(new MapFunction<SectorBean, String>() {
                    @Override
                    public String map(SectorBean sectorBean) throws Exception {
                        return JSON.toJSONString(sectorBean);
                    }
                }).addSink(kafkaSink);

    }
}

 

Test

0. Start ZooKeeper, Kafka, and Druid

1. Prepare the Kafka topic

cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --zookeeper node01:2181 --list
bin/kafka-topics.sh --zookeeper node01:2181 --create --partitions 1 --replication-factor 1 --topic sector-sse

2. Have Druid ingest the Kafka topic

(screenshot omitted)

http://node01:8090/console.html

(screenshot omitted)

3. Start the program SectorStreamApplication

4. Check Kafka

(screenshot omitted)

5. Check Druid

http://node01:8090/console.html

(screenshot omitted)

http://node01:8888/unified-console.html#query

(screenshot omitted)

 

 

Sector quotes - minute-level quotes - backup

Requirement

(diagram omitted)

Code - core task class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.function.map.SectorPutHDFSMapFunction;
import cn.itcast.function.window.SectorWindowFunction;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

/**
 * Author itcast
 * Desc 板块核心业务类-分时行情备份
 */
public class SectorMinutesBackupTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 0.HDFS-Sink
        //sector.sec.hdfs.path=hdfs://192.168.52.100:8020/quot_data/dev/sector/
        BucketingSink<String> bucketSink = new BucketingSink<>(QuotConfig.SECTOR_SEC_HDFS_PATH);//路径
        //bucketSink.setBatchSize(Integer.parseInt(QuotConfig.HDFS_BATCH));//批大小1073741824
        bucketSink.setBatchSize(16384L);//批大小:注意:如果设置太大,那么缓冲块的大小也很大,数据较少时,达不到缓冲块大小则不会flush到HDFS,所以这里把大小设置小一点,那么缓冲块也就变下了
        bucketSink.setBucketer(new DateTimeBucketer<>(QuotConfig.HDFS_BUCKETER));//分桶规则hdfs.bucketer=yyyyMMdd
        //前缀后缀
        bucketSink.setPendingPrefix("sector2-");
        bucketSink.setInProgressPrefix("sector-");
        bucketSink.setPendingSuffix(".txt");
        bucketSink.setInProgressSuffix(".txt");

        //TODO 1.分组
        watermarkDS.keyBy(CleanBean::getSecCode)
        //TODO 2.划分窗口
        .timeWindow(Time.minutes(1))
        //TODO 3.窗口计算CleanBean-->StockBean
        .apply(new StockMinutesWindowFunction())
        //TODO 4.数据合并
        .timeWindowAll(Time.minutes(1))
        //TODO 5.窗口计算StockBean-->SectorBean
        .apply(new SectorWindowFunction())
        //TODO 6.数据拼接为字符串
        .map(new SectorPutHDFSMapFunction())
        //TODO 7.数据Sink到HDFS
        .addSink(bucketSink);
    }
}

 

Code - field concatenation (SectorBean to String)

package cn.itcast.function.map;

import cn.itcast.bean.SectorBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.functions.MapFunction;

import java.sql.Timestamp;
import java.util.Date;

/**
 * Author itcast
 * Desc 将SectorBean-->String
 */
public class SectorPutHDFSMapFunction implements MapFunction<SectorBean, String> {
    String sp = QuotConfig.HDFS_SEPERATOR;
    @Override
    public String map(SectorBean value) throws Exception {
        String tradeDate = DateUtil.longTimestamp2String(value.getEventTime(), DateFormatConstant.format_yyyy_mm_dd);
        StringBuilder sb = new StringBuilder();
        sb.append(new Timestamp(new Date().getTime())).append(sp)
                .append(tradeDate).append(sp)
                .append(value.getSectorCode()).append(sp)
                .append(value.getSectorName()).append(sp)
                .append(value.getPreClosePrice()).append(sp)
                .append(value.getOpenPrice()).append(sp)
                .append(value.getHighPrice()).append(sp)
                .append(value.getLowPrice()).append(sp)
                .append(value.getClosePrice()).append(sp)
                .append(value.getTradeVol()).append(sp)
                .append(value.getTradeAmt()).append(sp)
                .append(value.getTradeVolDay()).append(sp)
                .append(value.getTradeAmtDay());
        System.out.println("数据已经转换拼接为:"+sb.toString());
        return sb.toString();
    }
}


 

Test

0. Start ZooKeeper and Kafka

1. Start the program SectorStreamApplication

2. Check HDFS

http://node01:50070/explorer.html#/quot_data/dev/sector/20210203

 

 

K-line business - to master / key topic

Try to master this part.

Requirement

As shown in the diagram: compute K-lines for stocks, indexes, and sectors, and store the results in MySQL.

Note:

In this project the K-lines are daily K, weekly K, and monthly K (the data volume is small; hourly K also exists but we skip it).

Daily/weekly/monthly K are handled together rather than split into separate tasks -- splitting them would mean implementing the code below three times over, which is too much work.

So daily/weekly/monthly K are computed in one task, but stocks, indexes, and sectors are kept separate, which means three task classes in total.

(diagrams omitted)

Stock K-line

(diagram omitted)

Code - core task class

package cn.itcast.task;

import cn.itcast.bean.CleanBean;
import cn.itcast.bean.StockBean;
import cn.itcast.config.QuotConfig;
import cn.itcast.constant.KlineType;
import cn.itcast.function.map.StockKlineMapFunction;
import cn.itcast.function.sink.MySQLSink;
import cn.itcast.function.window.StockMinutesWindowFunction;
import cn.itcast.standard.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Desc 个股K线行情核心任务类
 * 要完成个股的日K,周K,月K的数据计算并Sink到MySQL
 * 注意:日K最终1天就1条,周K最终1周就1条,月K最终1月就1条
 */
public class StockKlineTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CleanBean> watermarkDS) {
        //TODO 1.分组
        DataStream<StockBean> stockBeanDS = watermarkDS.keyBy(CleanBean::getSecCode)
                //TODO 2.划分窗口(1min更新)
                .timeWindow(Time.minutes(1))
                //TODO 3.窗口计算CleanBean-->StockBean
                .apply(new StockMinutesWindowFunction());

        //TODO 4.StockBean-->K线Row(搞个Bean封装也行,但这里用Row更方便)并Sink到MySQL
        //先准备一条日K,周K,月K的sql(sql应该是replace into 表示有则更新,没有则插入),看sql中需要哪些字段
        String sql = "REPLACE INTO %s values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
        //#个股日k
        //mysql.stock.sql.day.table=bdp_quot_stock_kline_day
        //#个股周k
        //mysql.stock.sql.week.table=bdp_quot_stock_kline_week
        //#个股月k
        //mysql.stock.sql.month.table=bdp_quot_stock_kline_month

        //日K:StockKlineMapFunction将StockBean-->K线Row
        stockBeanDS.map(new StockKlineMapFunction(KlineType.DAY_K.getType(),KlineType.DAY_K.getFirstTxDateType()))
                .keyBy(row->row.getField(2))//按照secCode分组
                .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_STOCK_SQL_DAY_TABLE)));//Sink到MySQL
        //周K
        stockBeanDS.map(new StockKlineMapFunction(KlineType.WEEK_K.getType(),KlineType.WEEK_K.getFirstTxDateType()))
                .keyBy(row->row.getField(2))//按照secCode分组
                .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_STOCK_SQL_WEEK_TABLE)));//Sink到MySQL
        //月K
        stockBeanDS.map(new StockKlineMapFunction(KlineType.MONTH_K.getType(),KlineType.MONTH_K.getFirstTxDateType()))
                .keyBy(row->row.getField(2))//按照secCode分组
                .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_STOCK_SQL_MONTH_TABLE)));//Sink到MySQL

    }
}
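The KlineType enum used above is referenced but not listed in this article. A minimal sketch of what it might look like, assuming the tcc_date calendar table exposes "first trading day of the week/month" columns (the column names week_first_txdate and month_first_txdate are assumptions, not the project's actual schema):

// Hypothetical sketch of the KlineType enum; the "1"/"2"/"3" values match the day/week/month convention used in StockKlineMapFunction.
public enum KlineType {
    DAY_K("1", "trade_date"),            // daily K: the period's first trading day is today
    WEEK_K("2", "week_first_txdate"),    // assumption: column holding the first trading day of the week
    MONTH_K("3", "month_first_txdate");  // assumption: column holding the first trading day of the month

    private final String type;
    private final String firstTxDateType;

    KlineType(String type, String firstTxDateType) {
        this.type = type;
        this.firstTxDateType = firstTxDateType;
    }

    public String getType() { return type; }
    public String getFirstTxDateType() { return firstTxDateType; }
}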

 

代码实现-数据转换类-转为KLine

package cn.itcast.function.map;

import cn.itcast.bean.StockBean;
import cn.itcast.constant.DateFormatConstant;
import cn.itcast.util.DBUtil;
import cn.itcast.util.DateUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Map;

/**
 * Author itcast
 * Desc
 */
public class StockKlineMapFunction extends RichMapFunction<StockBean, Row> {
    //TODO 0.定义变量
    private String type;//K线类型,1是日K,2是周K,3是月K
    private String firstTxDateTypeName;//周期内首个交易日期字段名

    private String firstTxDate;//周期内首个交易日期值
    private String tradeDate;//当天日期
    //Map<股票代码, Map<字段名称,字段值>>
    private Map<String, Map<String, Object>> aggMap;

    //TODO 1.初始化变量
    public StockKlineMapFunction(String type, String firstTxDateTypeName) {
        this.type = type;
        this.firstTxDateTypeName = firstTxDateTypeName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //查询交易日历表
        String sql = "select * from tcc_date where trade_date = CURDATE()";
        Map<String, String> tradeDateMap = DBUtil.queryKv(sql);
        firstTxDate = tradeDateMap.get(firstTxDateTypeName);//根据字段名获取周期内首个交易日
        tradeDate = tradeDateMap.get("trade_date");//获取当天日期
        //初始化aggMap,汇总高,低,成交量,成交金额
/*
select sec_code,
max(high_price) as high_price,
min(low_price) as low_price,
sum(trade_vol) as trade_vol,
sum(trade_amt) as trade_amt
from bdp_quot_stock_kline_day
where trade_date between firstTxDate and tradeDate
group by sec_code
 */
        String aggSQL = "select sec_code,\n" +
                "max(high_price) as high_price,\n" +
                "min(low_price) as low_price,\n" +
                "sum(trade_vol) as trade_vol,\n" +
                "sum(trade_amt) as trade_amt\n" +
                "from bdp_quot_stock_kline_day\n" +
                "where trade_date between '" + firstTxDate + "' and '" + tradeDate + "'\n" + // quote the date literals so MySQL treats them as strings
                "group by sec_code";

        aggMap = DBUtil.query("sec_code", aggSQL);

    }

    //TODO 3.转换StockBean-->ROW
    @Override
    public Row map(StockBean stockBean) throws Exception {
        //获取需要的数据
        Long eventTime = stockBean.getEventTime();
        String secCode = stockBean.getSecCode();
        String secName = stockBean.getSecName();
        BigDecimal preClosePrice = stockBean.getPreClosePrice();
        BigDecimal openPrice = stockBean.getOpenPrice();
        BigDecimal highPrice = stockBean.getHighPrice();
        BigDecimal lowPrice = stockBean.getLowPrice();
        BigDecimal closePrice = stockBean.getClosePrice();
        Long tradeVolDay = stockBean.getTradeVolDay();
        Long tradeAmtDay = stockBean.getTradeAmtDay();
        BigDecimal avgPrice = new BigDecimal(0);

        //均价 = 成交金额/成交量
        if (tradeVolDay != 0) {
            avgPrice = new BigDecimal(tradeAmtDay).divide(new BigDecimal(tradeVolDay), 2, RoundingMode.HALF_UP);
        }

        //将当前日期和周期内首个交易日期转换为long
        Long tradeDateTime = DateUtil.stringToLong(tradeDate, DateFormatConstant.format_yyyy_mm_dd);
        Long firstTxDateTime = DateUtil.stringToLong(firstTxDate, DateFormatConstant.format_yyyy_mm_dd);
        //如果是日K,那么前收,高,开,低,收,成交量,成交金额,就直接存入日K表,因为stockBean中的数据就是当天最新的数据,直接更新MySQL日K表即可
        //如果是周K或月K,那么需要把stockBean中的数据和aggMap中的进行合并!
        if (firstTxDateTime < tradeDateTime && (type.equals("2") || type.equals("3"))) { // type is a String ("1"/"2"/"3"), so compare against string literals
            //表示进来的是周K或月K
            Map<String, Object> map = aggMap.get(secCode);
            if (map != null && map.size() > 0) {
                BigDecimal lastHighPrice = new BigDecimal(map.get("high_price").toString());
                BigDecimal lastLowPrice = new BigDecimal(map.get("low_price").toString());
                Long lastTradeVol = Long.parseLong(map.get("trade_vol").toString());
                Long lastTradeAmt = Long.parseLong(map.get("trade_amt").toString());

                //比较高低
                if (lastHighPrice.compareTo(highPrice) == 1) {
                    highPrice = lastHighPrice;
                }
                if (lastLowPrice.compareTo(lowPrice) == -1) {
                    lowPrice = lastLowPrice;
                }

                //累加成交量成交金额
                tradeVolDay += lastTradeVol;
                tradeAmtDay += lastTradeAmt;

                //更新aggMap--这里不更新也没事,因为后续可以使用JavaWeb计算做定时任务修正周K和月K
                /*
                map.put("high_price",highPrice);
                map.put("low_price",lowPrice);
                map.put("trade_vol",tradeVolDay);
                map.put("trade_amt",tradeAmtDay);
                */

                //计算均价
                //均价 = 成交金额/成交量
                if (tradeVolDay != 0) {
                    avgPrice = new BigDecimal(tradeAmtDay).divide(new BigDecimal(tradeVolDay), 2, RoundingMode.HALF_UP);
                }
            }
        }/*else {
            //是日K,日K直接使用stockBean中的数据
        }*/


        Row row = new Row(13);
        row.setField(0, new Timestamp(new Date().getTime()));
        row.setField(1, tradeDate);
        row.setField(2, secCode);
        row.setField(3, secName);
        row.setField(4, type);
        row.setField(5, preClosePrice);
        row.setField(6, openPrice);
        row.setField(7, highPrice);
        row.setField(8, lowPrice);
        row.setField(9, closePrice);
        row.setField(10, avgPrice);
        row.setField(11, tradeVolDay);
        row.setField(12, tradeAmtDay);

        return row;
    }
}

 

Code - MySQLSink

package cn.itcast.function.sink;

import cn.itcast.util.DBUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

import java.sql.Connection;
import java.sql.PreparedStatement;

/**
 * Author itcast
 * Desc
 */
public class MySQLSink extends RichSinkFunction<Row> {
    //1.定义变量并初始化
    private String sqlWithName;
    private Connection conn;
    private PreparedStatement ps;

    public MySQLSink(String sqlWithName) {
        //sqlWithName = "REPLACE INTO 表名 values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
        //拼接好了表名的sql,但是参数需要赋值
        this.sqlWithName = sqlWithName;
    }

    //2.开启资源
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DBUtil.getConnByJdbc();
        ps = conn.prepareStatement(sqlWithName);
    }

    //3.设置?占位符参数并执行
    @Override
    public void invoke(Row row, Context context) throws Exception {
        int length = row.getArity();//获取字段长度,长度就是13
        //设置参数
        for (int i = 0; i < length; i++) {//i的范围:[0,13)===>[0,12]===>正好就是索引的范围
            Object value = row.getField(i);
            ps.setObject(i+1,value);//注意:row的索引从0开始,jdbc的?占位符从1开始
        }
        System.out.println("K线SQL已执行,类型为" + row.getField(4));
        //执行sql
        ps.executeUpdate();

    }

    //4.关闭资源
    @Override
    public void close() throws Exception {
        if(ps!=null) ps.close();     // close the statement before the connection
        if(conn!=null) conn.close();
    }
}

 

Test

1. Start the program StockStreamApplication

2. Watch the console output

(screenshot omitted)

3. Check the K-line tables in MySQL

(screenshots omitted)

 

 

Index K-line

Code

Copy the stock K-line code and rename (a hedged sketch of the resulting day-K branch follows the list):

个股-->指数

Stock-->Index

stock-->index

STOCK-->INDEX

sec_code-->index_code

getSecCode-->getIndexCode

getSecName-->getIndexName
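For illustration, a hedged sketch of the day-K branch after those renames, assuming an IndexKlineMapFunction and an index day-K table config key exist (both names are assumptions based on the rename list above):

// Hypothetical day-K branch of an IndexKlineTask, mirroring StockKlineTask after the renames.
DataStream<IndexBean> indexBeanDS = watermarkDS.keyBy(CleanBean::getSecCode)
        .timeWindow(Time.minutes(1))
        .apply(new IndexMinutesWindowFunction());    // CleanBean -> IndexBean

String sql = "REPLACE INTO %s values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
indexBeanDS.map(new IndexKlineMapFunction(KlineType.DAY_K.getType(), KlineType.DAY_K.getFirstTxDateType())) // assumed class
        .keyBy(row -> row.getField(2))               // group by index_code
        .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_INDEX_SQL_DAY_TABLE)));                   // assumed config key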

 

Test

Run IndexStreamApplication

Observe:

(screenshot omitted)

 

Sector K-line

Code

Modify the code in the same way:

个股 --> 板块 (stock --> sector)

Stock --> Sector

....

Note: for sectors, the chain must first turn CleanBean into StockBean and then into SectorBean before the K-line mapping, as in the snippet below (a fuller hedged sketch follows it):

 //TODO 3.窗口计算CleanBean-->StockBean
                .apply(new StockMinutesWindowFunction())
                .timeWindowAll(Time.minutes(1))
                .apply(new SectorWindowFunction());
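Putting that together, a hedged sketch of what the SectorKlineTask chain might look like, assuming a SectorKlineMapFunction and a sector day-K table config key exist (both names are assumptions):

// Hypothetical SectorKlineTask: CleanBean -> StockBean -> SectorBean -> K-line Row -> MySQL.
DataStream<SectorBean> sectorBeanDS = watermarkDS.keyBy(CleanBean::getSecCode)
        .timeWindow(Time.minutes(1))
        .apply(new StockMinutesWindowFunction())     // CleanBean -> StockBean
        .timeWindowAll(Time.minutes(1))
        .apply(new SectorWindowFunction());          // StockBean -> SectorBean

String sql = "REPLACE INTO %s values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
sectorBeanDS.map(new SectorKlineMapFunction(KlineType.DAY_K.getType(), KlineType.DAY_K.getFirstTxDateType())) // assumed class
        .keyBy(row -> row.getField(2))               // group by sector_code
        .addSink(new MySQLSink(String.format(sql, QuotConfig.MYSQL_SECTOR_SQL_DAY_TABLE)));                    // assumed config key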

 

 

 

Test

Start SectorStreamApplication

Observe:

(screenshot omitted)

 

 

Monitoring and alerting business

Requirement

(diagram omitted)

We have already built the sub-businesses of the three core businesses (stock/index/sector): second-level quotes, minute quotes, minute-quote backup, change/change-rate/amplitude, and daily/weekly/monthly K-lines. (After class, finish all the stock sub-businesses first; the rest follow the same pattern, and the sector second-level quotes are an extension exercise.)

Next we implement the green part of the diagram: stock monitoring and alerting, both real-time and offline, with the focus on real-time alerting.

Real-time alerting is implemented with FlinkCEP.

Technical preparation - FlinkCEP introduction

FlinkCEP overview

Complex Event Processing (CEP) is one of Flink's highlight features: a library for detecting event patterns in unbounded event streams, so we can pick out the events that matter. You specify the patterns to detect in the stream; Flink detects the matching event sequences and lets you act on them.

In plain words:

==FlinkCEP = a real-time (unbounded) data stream + user-defined rules + a way to handle/output the matches==

That is, FlinkCEP matches user-defined rules against a real-time stream, and the matched data can then be processed or emitted.

As the figure shows: the stream contains data of different shapes, the FlinkCEP pattern matches squares and circles, and the result stream contains only the matched data.

It is a bit like a filter -- but much more powerful! (A bare-bones skeleton of the three parts is sketched right below.)

(diagram omitted)
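A self-contained skeleton of those three parts, using plain Strings as events (illustrative only; the real cases below use proper beans):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

public class CepSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // the (here: bounded demo) event stream
        DataStreamSource<String> eventDS = env.fromElements("ok", "error", "error", "error", "ok");

        // 1) the user-defined rule/pattern: three "error" events within 10 seconds
        Pattern<String, String> pattern = Pattern.<String>begin("errors")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String s) throws Exception {
                        return s.equals("error");
                    }
                })
                .times(3)
                .within(Time.seconds(10));

        // 2) apply the pattern to the stream
        PatternStream<String> patternStream = CEP.pattern(eventDS, pattern);

        // 3) handle/output the matched sequences
        patternStream.select(new PatternSelectFunction<String, List<String>>() {
            @Override
            public List<String> select(Map<String, List<String>> match) throws Exception {
                return match.get("errors");
            }
        }).print();

        env.execute();
    }
}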

 

FlinkCEP application scenarios

1. Real-time stock curve prediction

2. Detecting malicious login attacks on websites

3. Real-time e-commerce marketing and user-behavior analysis

4. Abnormal-behavior detection in ride-hailing (e.g. Didi)

5. Real-time logistics order tracking

6. Network fraud

7. Fault detection

8. Risk avoidance

9. Intelligent marketing, etc.

==In short, FlinkCEP is mainly used for real-time risk control!==

We will demonstrate it through the cases below and through the project code.

 

FlinkCEP pros and cons

• Strengths:

Inherits Flink's high throughput.

Queries are static while the data is dynamic, which fits real-time and continuous-query needs.

Good at matching across events.

Friendly API.

• Weaknesses:

Rules cannot be updated dynamically out of the box (a pain point); other technology is needed to inject or update rules at runtime.

In this project we later use Redis for dynamic rule updates (a hedged sketch of the idea follows).
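A minimal sketch of that idea, assuming the rule value (here an alert threshold) is kept in Redis under a key such as quot:warn:threshold; the host, port, key name, and class name are assumptions, not the project's actual implementation:

import cn.itcast.bean.StockBean;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import redis.clients.jedis.Jedis;

import java.math.BigDecimal;

// Hedged sketch: read the current threshold from Redis on every check,
// so the "rule" can be changed at runtime without redeploying the Flink job.
public class DynamicThresholdFilter extends RichFilterFunction<StockBean> {
    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        jedis = new Jedis("node01", 6379);             // assumption: Redis host/port
    }

    @Override
    public boolean filter(StockBean stock) throws Exception {
        String t = jedis.get("quot:warn:threshold");    // assumption: key holding the rule value
        return t != null && stock.getClosePrice().compareTo(new BigDecimal(t)) > 0;
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
    }
}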

 

FlinkCEP's place in a streaming job

FlinkCEP embeds seamlessly into a Flink streaming program.

(diagram omitted)

Technical preparation - cases

Coding steps

==FlinkCEP = a real-time (unbounded) data stream + user-defined rules + a way to handle/output the matches==

(diagram omitted)

Create a new module

(screenshot omitted)

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>quot_47</artifactId>
        <groupId>cn.itcast</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>test_flinkcep</artifactId>
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <properties>
        <jedis.version>3.0.1</jedis.version>
        <mysql.version>5.1.44</mysql.version>
        <avatica.version>1.10.0</avatica.version>
        <druid.version>1.1.9</druid.version>
        <scala.version>2.11</scala.version>
        <log4j.version>2.11.0</log4j.version>
        <cdh.version>cdh5.14.0</cdh.version>
        <hadoop.version>2.6.0</hadoop.version>
        <hbase.version>1.2.0</hbase.version>
        <kafka.version>0.11.0.2</kafka.version>
        <fastjson.version>1.2.44</fastjson.version>
        <flink.version>1.7.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-cep_2.11</artifactId>
             <version>1.10.0</version>
         </dependency>-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Prepare the beans and utils

Copy the following code from the course material:

今日指数-课程资料\4.资料\骨架代码\quot_bak_new\test_flinkcep\src\main\java\cn\itcast

(screenshot omitted)

Case 1 - quantifiers

Requirement:

Identify malicious users.

If a user types "TMD" 5 times within 10 seconds, treat the user as a malicious attacker and identify them.

Without FlinkCEP this is fiddly to implement: you would have to track in state how many times each user has typed it.

With Flink CEP quantifier patterns it is trivial.

package cn.itcast.cep;

import cn.itcast.bean.Message;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 需求:
 * 识别恶意用户
 * 用户如果在10s内,输入了TMD 5次,就认为用户为恶意攻击,识别出该用户
 * 如果不使用FlinkCEP,那么实现起来较为麻烦,得记录用户输出TMD多少次,得用状态
 * 使用 Flink CEP量词模式很简单就可以搞定
 */
public class Demo01_MaliceUser {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2.source
        SingleOutputStreamOperator<Message> sourceDS = env.fromCollection(Arrays.asList(
                new Message("1", "TMD", 1558430842000L),//2019-05-21 17:27:22
                new Message("1", "TMD", 1558430843000L),//2019-05-21 17:27:23
                new Message("1", "TMD", 1558430845000L),//2019-05-21 17:27:25
                new Message("1", "TMD", 1558430850000L),//2019-05-21 17:27:30
                new Message("1", "TMD", 1558430851000L),//2019-05-21 17:27:31
                new Message("2", "TMD", 1558430851000L),//2019-05-21 17:27:31
                new Message("1", "TMD", 1558430852000L)//2019-05-21 17:27:32
        )).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Message>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(Message element) {
                return element.getEventTime();
            }
        });

        //3.transformation
        //识别恶意用户:10s内,输入了TMD 5次
        //TODO 定义规则/模式
        //起了一个名字叫start
        Pattern<Message, Message> pattern = Pattern.<Message>begin("start").where(new SimpleCondition<Message>() {
            @Override
            public boolean filter(Message message) throws Exception {
                if (message.getMsg().equals("TMD")) {//输入TMD
                    return true;
                } else {
                    return false;
                }
            }
        }).times(5)//5次
         .within(Time.seconds(10));//10s内

        //TODO 将规则/模式应用到流上
        //注意:要按照用户id进行分组,因为要的是针对某个用户的10s内,输入了TMD 5次
        PatternStream<Message> patternDS = CEP.pattern(sourceDS.keyBy(Message::getId), pattern);

        //TODO 处理/获取/输出符合规则/模式的数据
        SingleOutputStreamOperator<Object> resultDS = patternDS.select(new PatternSelectFunction<Message, Object>() {
            @Override
            public Object select(Map<String, List<Message>> map) throws Exception {
                List<Message> list = map.get("start");//获取符合start模式的所有消息
                return list;
            }
        });
        //4.sink
        resultDS.print("被FlinkCEP规则/模式匹配到的恶意用户的详细消息信息");

        //5.execute
        env.execute();

    }
}

 

 

 

Case 2-3 - combining patterns

Requirement:

Identify users with consecutive login failures within 2 seconds.

There is a business system that users must log in to before using it;

filter out the users who fail to log in twice in a row within 2 seconds.

next: strict contiguity -- the next matching event must come immediately after.

followedBy: relaxed contiguity -- other events may occur in between.

package cn.itcast.cep;

import cn.itcast.bean.LoginUser;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 需求:
 * 识别2秒内连续登录失败用户
 * 有一个业务系统,用户要使用该业务系统必须要先登陆
 * 过滤出来在2秒内连续登陆失败的用户
 */
public class Demo02_03_LoginFail {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //2.source
        SingleOutputStreamOperator<LoginUser> sourceDS = env.fromCollection(Arrays.asList(
                new LoginUser(1, "192.168.0.1", "fail", 1558430842000L),    //2019-05-21 17:27:22
                new LoginUser(1, "192.168.0.2", "success", 1558430843000L),    //2019-05-21 17:27:23
                new LoginUser(1, "192.168.0.3", "fail", 1558430843000L),    //2019-05-21 17:27:24
                new LoginUser(2, "192.168.10.10", "success", 1558430845000L)//2019-05-21 17:27:25
        )).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginUser>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(LoginUser element) {
                return element.getEventTime();
            }
        });

        //3.transformation
        //识别出在2秒内连续登陆失败的用户
        //TODO 定义规则/模式
        Pattern<LoginUser, LoginUser> pattern = Pattern.<LoginUser>begin("xx").where(new SimpleCondition<LoginUser>() {
            @Override
            public boolean filter(LoginUser loginUser) throws Exception {
                return loginUser.getStatus().equals("fail");
            }
        })
        //注意:下面这样写没有连续的意思
        /*.times(2)
        .within(Time.seconds(2))*/
        .next("next")//紧接着下一次还是失败,也就是说 fail fail fail 是匹配的, 而fail success fail 是不匹配的
        //.followedBy("next")//后面还有一次失败即可,也就是说 fail fail fail 是匹配的, 而fail success fail 是匹配的
        .where(new SimpleCondition<LoginUser>() {
            @Override
            public boolean filter(LoginUser loginUser) throws Exception {
                return loginUser.getStatus().equals("fail");
            }
        }).within(Time.seconds(2));


        //TODO 将规则/模式应用到流上
        //注意:要按照用户id进行分组,因为要的是针对某个用户的10s内,输入了TMD 5次
        PatternStream<LoginUser> patternDS = CEP.pattern(sourceDS.keyBy(LoginUser::getUserId), pattern);

        //TODO 处理/获取/输出符合规则/模式的数据
        SingleOutputStreamOperator<Object> resultDS = patternDS.select(new PatternSelectFunction<LoginUser, Object>() {
            @Override
            public Object select(Map<String, List<LoginUser>> map) throws Exception {
                List<LoginUser> list = map.get("next");
                return list;
            }
        });

        //4.sink
        resultDS.print("被FlinkCEP规则/模式匹配到的2s内连续登陆失败俩次的用户的信息:");

        //5.execute
        env.execute();

    }
}

 

Case 4 - contiguity and combinations -- for awareness

Extract the elements "c","a","b" from the source, in that order.

.oneOrMore() means one or more matches, without combinations, and they need not be contiguous.

.oneOrMore() + .consecutive() means the one-or-more matches must be contiguous.

.oneOrMore() + .allowCombinations() means one or more matches with combinations allowed.

package cn.itcast.cep;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 需求:
 * 从数据源中依次提取"c","a","b"元素
 */
public class Demo04_Consecutive {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        //2.source
        DataStreamSource<String> sourceDS = env.fromElements("c", "d", "a", "a", "a", "d", "a", "b");


        //3.transformation
        //从数据源中依次提取"c","a","b"元素
        Pattern<String, String> pattern = Pattern.<String>begin("begin").where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.equals("c");
            }
        })
        //.next("next")//严格的连续模式,该案例中匹配不上
        .followedBy("middle")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.equals("a");
            }
        })
        //"c", "d", "a", "a", "a", "d", "a", "b"
        //要匹配"c","a","b"
        .oneOrMore()//允许一个或多个
        //([c],[a, a, a, a],[b])
        //([c],[a, a, a],[b])
        //([c],[a, a],[b])
        //([c],[a],[b])
        //.consecutive()//表示上面的a要求连续
        //([c],[a, a, a],[b])
        //([c],[a, a],[b])
        //([c],[a],[b])
        //.allowCombinations()//允许组合.比较宽松的条件
        //([c],[a, a, a, a],[b])
        //([c],[a, a, a],[b])
        //([c],[a, a, a],[b])
        //([c],[a, a],[b])
        //([c],[a, a, a],[b])
        //([c],[a, a],[b])
        //([c],[a, a],[b])
        //([c],[a],[b])
        .followedBy("end")
        .where(new SimpleCondition<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.equals("b");
            }
        });


        //TODO 将规则/模式应用到流上
        //注意:要按照用户id进行分组,因为要的是针对某个用户的10s内,输入了TMD 5次
        PatternStream<String> patternDS = CEP.pattern(sourceDS, pattern);

        //TODO 处理/获取/输出符合规则/模式的数据
        SingleOutputStreamOperator<Object> resultDS = patternDS.select(new PatternSelectFunction<String, Object>() {
            @Override
            public Object select(Map<String, List<String>> map) throws Exception {
                List<String> begin = map.get("begin");
                List<String> middle = map.get("middle");
                List<String> end = map.get("end");
                return Tuple3.of(begin, middle, end);
            }
        });

        //4.sink
        resultDS.print("被FlinkCEP规则/模式匹配到的数据:");

        //5.execute
        env.execute();

    }
}

 

Case 5 - identifying high-frequency-trading risk users

Requirement

High-frequency trading: find active accounts / actively trading users.

In this scenario we simulate account transactions and look for high-frequency transfer/payment activity, hoping to surface risky or highly active users:

we need to find accounts with at least 5 valid transactions within 24 hours.

package cn.itcast.cep;

import cn.itcast.bean.TransactionEvent;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.List;
import java.util.Map;

/**
 * Author itcast
 * Desc 需求:
 * 高频交易风险用户识别
 * 高频交易,找出活跃账户/交易活跃用户
 * 在这个场景中,我们模拟账户交易信息中,那些高频的转账支付信息,希望能发现其中的风险或者活跃的用户:
 * 需要找出那些 24 小时内至少 5 次有效交易的账户
 */
public class Demo05_HighFrequencyTrading {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        //2.source
        DataStream<TransactionEvent> sourceDS = env.fromElements(
                new TransactionEvent("100XX", 10.0D, 1597905234000L),//2020-08-20 14:33:54
                new TransactionEvent("100XX", 100.0D, 1597905235000L),//2020-08-20 14:33:55
                new TransactionEvent("100XX", 200.0D, 1597905236000L),//2020-08-20 14:33:56
                new TransactionEvent("100XX", 300.0D, 1597905237000L),//2020-08-20 14:33:57
                new TransactionEvent("100XX", 400.0D, 1597905238000L),//2020-08-20 14:33:58
                new TransactionEvent("100XX", 500.0D, 1597905239000L),//2020-08-20 14:33:59
                new TransactionEvent("101XX", 0.0D, 1597905240000L),//2020-08-20 14:34:00
                new TransactionEvent("101XX", 100.0D, 1597905241000L)//2020-08-20 14:34:01
        ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TransactionEvent>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(TransactionEvent element) {
                return element.getTimeStamp();
            }
        });


        //3.transformation
        //24 小时内至少 5 次 有效 交易的账户
        Pattern<TransactionEvent, TransactionEvent> pattern = Pattern.<TransactionEvent>begin("begin").where(new SimpleCondition<TransactionEvent>() {
            @Override
            public boolean filter(TransactionEvent transactionEvent) throws Exception {
                return transactionEvent.getAmount() > 0;//有效交易
            }
        })
        //.times(5)//5次
        .timesOrMore(5)//至少5次
        .within(Time.hours(24));


        //TODO 将规则/模式应用到流上
        //注意:要按照用户id进行分组,因为要的是针对某个用户的10s内,输入了TMD 5次
        PatternStream<TransactionEvent> patternDS = CEP.pattern(sourceDS.keyBy(TransactionEvent::getAccout), pattern);

        //TODO 处理/获取/输出符合规则/模式的数据
        SingleOutputStreamOperator<Object> resultDS = patternDS.select(new PatternSelectFunction<TransactionEvent, Object>() {
            @Override
            public Object select(Map<String, List<TransactionEvent>> map) throws Exception {
                List<TransactionEvent> list = map.get("begin");
                return list;
            }
        });

        //4.sink
        resultDS.print("被FlinkCEP规则/模式匹配到的数据:");

        //5.execute
        env.execute();

    }
}

 

 

Case 6

Slightly more complex; you can preview it in advance -- covered tomorrow.

Case 7

Slightly more complex; you can preview it in advance -- covered tomorrow.

Real-time monitoring business implementation

Covered tomorrow.

 

 

 

Thinking exercise - solution

https://leetcode-cn.com/problems/reverse-linked-list/

(screenshots omitted)

//Reverse a singly linked list.
//
// Example:
//
// Input:  1->2->3->4->5->NULL
// Output: 5->4->3->2->1->NULL
//
// Follow-up:
// You can reverse the list iteratively or recursively. Can you do it both ways?
// Related Topics: Linked List
//
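The article stops at the problem statement; a standard iterative solution in Java (the ListNode class is the usual LeetCode definition, included here for completeness):

// Standard iterative reversal: walk the list once, re-pointing each node's next to the previous node.
class ListNode {
    int val;
    ListNode next;
    ListNode(int val) { this.val = val; }
}

class Solution {
    public ListNode reverseList(ListNode head) {
        ListNode prev = null;
        ListNode curr = head;
        while (curr != null) {
            ListNode next = curr.next; // remember the rest of the list
            curr.next = prev;          // reverse the pointer
            prev = curr;               // advance prev
            curr = next;               // advance curr
        }
        return prev;                   // prev is the new head
    }
}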
