Flink Project 4: Matching Two Streams with connect

2021-10-24 23:33:54

Tags: receipt, val, two streams, Flink, connect, new, ReceiptEvent, output, OrderEvent


1. One stream carries orders, the other carries reconciliation (receipt) records

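Timers are registered per key, but event-time timers fire off the operator's watermark, which is shared by all keys in the job rather than tracked per key.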

package flinkProject

import java.text.SimpleDateFormat

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

case class ReceiptEvent(txid:String,payChannel:String,timestamp:Long)
case class OrderEvent(txid:String,payChannel:String,timestamp:Long)

object TxConnectedMatch {
  def main(args: Array[String]): Unit = {
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    executionEnvironment.setParallelism(1)
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // watermarks are generated periodically; the default interval is 200 ms

    val stream1: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 1111)

    val receiptDataStream: DataStream[ReceiptEvent] = stream1.map(data => {
      val tmpList = data.split(" ")
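      // Note: `mm` means minutes, so the "05" field is not read as a month and the parsed
      // date falls in January; a month-aware pattern would be "dd/MM/yyyy:HH:mm:ss".
      // The timestamps in the sample output were produced with the pattern as written here.
      val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")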
      val ts = simpleDateFormat.parse(tmpList(2)).getTime
      ReceiptEvent(tmpList(0), tmpList(1), ts)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(0)) {
      override def extractTimestamp(t: ReceiptEvent) = t.timestamp
    })


    val stream2: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 2222)

    val orderStream: DataStream[OrderEvent] = stream2.map(data => {
      val tmpList = data.split(" ")
      val simpleDateFormat = new SimpleDateFormat("dd/mm/yy:HH:mm:ss")
      val ts = simpleDateFormat.parse(tmpList(2)).getTime
      OrderEvent(tmpList(0), tmpList(1), ts)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(0)) {
      override def extractTimestamp(t: OrderEvent) = t.timestamp
    })

    // Connect the two streams and key both sides by txid so matching events share keyed state
    val result: DataStream[(ReceiptEvent, OrderEvent)] = receiptDataStream.connect(orderStream)
      .keyBy((receipt => receipt.txid), (order => order.txid))
      .process(new ConnectedCoProcessFunction())

    result.print("result")
    result.getSideOutput(new OutputTag[OrderEvent]("order_output_tag")).print("order_output_tag")
    result.getSideOutput(new OutputTag[ReceiptEvent]("receipt_output_tag")).print("receipt_output_tag  ")


    executionEnvironment.execute("connected Stream")
  }

}

class ConnectedCoProcessFunction extends CoProcessFunction[ReceiptEvent,OrderEvent,(ReceiptEvent,OrderEvent)] {
  var receiptValueState:ValueState[ReceiptEvent]=_
  var orderValueState:ValueState[OrderEvent]=_

  override def open(parameters: Configuration): Unit = {
    receiptValueState=getRuntimeContext.getState[ReceiptEvent](new ValueStateDescriptor[ReceiptEvent]("receipt",classOf[ReceiptEvent]))
    orderValueState=getRuntimeContext.getState[OrderEvent](new ValueStateDescriptor[OrderEvent]("order",classOf[OrderEvent]))
  }

  override def processElement1(in1: ReceiptEvent, context: CoProcessFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)]#Context, collector: Collector[(ReceiptEvent, OrderEvent)]): Unit = {
    val order = orderValueState.value()
    // The order arrived first: emit the matched pair and clear the stored order
    if(order!=null){
      collector.collect((in1,order))
      orderValueState.clear()
    }else{
      receiptValueState.update(in1)
      // No order yet: keep the receipt and wait up to 3 s of event time for its order
      context.timerService().registerEventTimeTimer(in1.timestamp + 3000L)
    }
  }

  override def processElement2(in2: OrderEvent, context: CoProcessFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)]#Context, collector: Collector[(ReceiptEvent, OrderEvent)]): Unit = {
    val receipt = receiptValueState.value()
    // The receipt arrived first: emit the matched pair and clear the stored receipt
    if(receipt!=null){
      collector.collect((receipt, in2))
      receiptValueState.clear()
    }else{
      orderValueState.update(in2)
      // No receipt yet: keep the order and wait up to 3 s of event time for its receipt
      context.timerService().registerEventTimeTimer(in2.timestamp + 3000L)
    }
  }

  override def onTimer(timestamp: Long, ctx: CoProcessFunction[ReceiptEvent, OrderEvent, (ReceiptEvent, OrderEvent)]#OnTimerContext, out: Collector[(ReceiptEvent, OrderEvent)]): Unit = {
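    // The timer fired: anything still in state for this key never found its partner, so send it to a side output
    if(receiptValueState.value()!=null){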
      ctx.output(new OutputTag[ReceiptEvent]("receipt_output_tag"),receiptValueState.value())
    }
    if(orderValueState.value()!=null){
      ctx.output(new OutputTag[OrderEvent]("order_output_tag"),orderValueState.value() )
    }
    receiptValueState.clear()
    orderValueState.clear()
  }
}
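
The parsing step in both map functions uses the pattern "dd/mm/yy:HH:mm:ss". As a minimal sketch of what that pattern does, the snippet below parses one sample line with the pattern from the job and with a month-aware alternative. The object name DatePatternDemo, the helper parseMillis, and the fixed GMT+8 zone are assumptions made for illustration (the job itself uses the JVM default zone); GMT+8 is chosen because it reproduces the timestamp shown in the sample output.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object DatePatternDemo {
  // Hypothetical helper: parse `text` with `pattern` in a fixed zone (assumed GMT+8)
  // so the result does not depend on the machine running it.
  def parseMillis(pattern: String, text: String): Long = {
    val format = new SimpleDateFormat(pattern)
    format.setTimeZone(TimeZone.getTimeZone("GMT+8"))
    format.parse(text).getTime
  }

  def main(args: Array[String]): Unit = {
    val input = "17/05/2015:10:26:45"
    // Pattern from the job: both `mm` fields are minutes, so the month stays at January.
    println(parseMillis("dd/mm/yy:HH:mm:ss", input))   // 1421461605000 (17 Jan 2015)
    // Month-aware pattern: `MM` is the month, `yyyy` the four-digit year.
    println(parseMillis("dd/MM/yyyy:HH:mm:ss", input)) // 1431829605000 (17 May 2015)
  }
}
```

Because the job only compares timestamps with each other (event time plus 3 s), the shifted month does not change which timers fire; it only changes the absolute values printed.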

2. Input data

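In the normal case a pair is emitted as soon as both streams have delivered an event with the same txid, no matter how much wall-clock time separates the two, provided the 3 s event-time timer of the first event has not fired yet.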

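When a txid shows up in only one stream, its timer is set 3 s after the event's timestamp. Each stream generates its own watermark (the connected operator advances on the smaller of the two), as the inputs and outputs below show: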

Stream 1 input: 4 404 17/05/2015:10:26:45   (no output)

Stream 1 input: 5 404 17/05/2015:10:26:47   (no output)

Stream 1 input: 7 404 17/05/2015:10:26:49

Output: receipt_output_tag  > ReceiptEvent(4,404,1421461605000)

Stream 1 input: 9 404 17/05/2015:10:26:59   (the watermark is now 10:26:59)

Output:

receipt_output_tag  > ReceiptEvent(5,404,1421461607000)
receipt_output_tag  > ReceiptEvent(7,404,1421461609000)

Stream 2 input: 6 505 17/05/2015:10:26:55   (no output)

Stream 2 input: 8 505 17/05/2015:10:26:56   (no output)

Stream 2 input: 1 505 17/05/2015:10:27:01

Output:

order_output_tag> OrderEvent(6,505,1421461615000)
order_output_tag> OrderEvent(8,505,1421461616000)
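
The stream 1 part of the session can be replayed without Flink to check the timer arithmetic. The sketch below is a simplified model, not the job itself: the names TimerReplay, Receipt and replay are hypothetical, time is reduced to the seconds within minute 10:26, the watermark is assumed to equal the largest timestamp seen so far (zero out-of-orderness), and the other input's watermark is assumed to be far enough ahead that it does not hold the operator back.

```scala
import scala.collection.mutable

object TimerReplay {
  // Simplified receipt: txid plus event time in seconds (hypothetical model type).
  final case class Receipt(txid: String, eventTimeSec: Long)

  // Feed receipts in arrival order; for each input, return the txids whose
  // timers fire once the watermark has advanced to that input's timestamp.
  def replay(inputs: Seq[Receipt], timeoutSec: Long = 3L): Seq[Seq[String]] = {
    val pending = mutable.Map.empty[String, Long] // txid -> timer timestamp
    var watermark = Long.MinValue
    inputs.map { r =>
      pending(r.txid) = r.eventTimeSec + timeoutSec
      watermark = math.max(watermark, r.eventTimeSec)
      // A timer fires when the watermark reaches or passes its timestamp.
      val fired = pending.filter { case (_, timer) => timer <= watermark }
        .toSeq.sortBy(_._2).map(_._1)
      fired.foreach(txid => pending.remove(txid))
      fired
    }
  }

  def main(args: Array[String]): Unit = {
    // Seconds taken from the stream 1 inputs above: 10:26:45, :47, :49, :59.
    val inputs = Seq(Receipt("4", 45), Receipt("5", 47), Receipt("7", 49), Receipt("9", 59))
    replay(inputs).zip(inputs).foreach { case (fired, in) =>
      println(s"input txid=${in.txid} -> unmatched: ${fired.mkString(",")}")
    }
  }
}
```

Under these assumptions the replay reports nothing for txids 4 and 5, then 4 when txid 7 arrives (timer 48 <= watermark 49), then 5 and 7 when txid 9 arrives (timers 50 and 52 <= watermark 59), which is the order of the receipt_output_tag lines above.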

Source: https://blog.csdn.net/xuehuagongzi000/article/details/120943248
