标签:lane 后往 案列 Kafka apache json org mapPath import
案例一:
package com.lg.bigdata.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.Concat
import org.apache.spark.sql.Column
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.Milliseconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import scala.collection.mutable
import java.lang.Double
import java.util.UUID
import org.apache.spark.sql.Dataset
import org.apache.spark.rdd.RDD
import com.google.gson.JsonObject
import scala.util.parsing.json.JSONArray
import org.apache.hadoop.mapred.KeyValueTextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.TaskContext
import com.lg.bigdata.utils.JZWUtil

/**
 * Deprecated variant of the trajectory-projection job (kept for reference).
 *
 * Consumes vehicle detections from the Kafka topic `car`, and for every detection
 * reported by camera `V158` (left main line) or `V005` (right main line) loads the
 * JSON track template of the detected lane, stamps it with the vehicle type, a fresh
 * vehicle id and per-point timestamps derived from the measured speed, and publishes
 * the resulting JSON array to the Kafka topic `GJTS_topic`.
 */
object KafkaAndJsonGJTS_back {

  /** Topic that receives the projected trajectories. */
  private val OutputTopic = "GJTS_topic"

  /**
   * Numerator used to turn a speed into "milliseconds per 12 m track segment".
   * 43200 = 12 * 3.6 * 1000, which suggests the speed is in km/h -- confirm with the
   * producer of the `car` topic.
   */
  private val SegmentMillisFactor: Int = 43200

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.functions.{col, concat, lit}

    val groupId = "jwz_GJ"
    //val groupId = "jwz_test"

    // 1. Create the SparkConf and initialise the streaming context (500 ms batches).
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaAndJsonGJTS_back")
    val ssc = new StreamingContext(sparkConf, Milliseconds(500))
    ssc.sparkContext.setLogLevel("WARN")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = spark.sparkContext

    /* 2. Kafka consumer parameters.
     * auto.offset.reset:
     *   earliest - resume from the committed offset; without one, start from the beginning
     *   latest   - resume from the committed offset; without one, consume only new records
     *   none     - resume from the committed offsets; throw if any partition has none
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop104:9092",               // Kafka broker address
      "key.deserializer" -> classOf[StringDeserializer],     // key deserializer
      "value.deserializer" -> classOf[StringDeserializer],   // value deserializer
      "group.id" -> groupId,                                 // consumer group
      "auto.offset.reset" -> "earliest",                     // earliest | latest
      "enable.auto.commit" -> (true: java.lang.Boolean)      // let the consumer commit offsets itself
    )
    val topics = Array("car")

    // 3. Direct (receiver-less) Kafka stream.
    val kafkaDSteam = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // Schema of the rows produced by JZWUtil.handlerMessage2Row.
    val schema = StructType(List(
      StructField("cameraId", StringType),
      StructField("time", StringType),
      StructField("lane_position", StringType),
      StructField("carType", StringType),
      StructField("speed", StringType),
      StructField("space", StringType)))

    // Track template per lane position (L/M/R) for the left and right main line.
    val leftLaneTracks = Map(
      "L" -> "in/left_lane/ZL.json",
      "M" -> "in/left_lane/ZM.json",
      "R" -> "in/left_lane/ZR.json")
    val rightLaneTracks = Map(
      "L" -> "in/reght_lane/YL.json",
      "M" -> "in/reght_lane/YM.json",
      "R" -> "in/reght_lane/YR.json")

    // Broadcast a lazily-initialised producer wrapper so that every JVM using it
    // creates its own KafkaProducer on first send.
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", "hadoop104:9092")                  // Kafka broker address
        p.setProperty("key.serializer", classOf[StringSerializer].getName)    // key serializer
        p.setProperty("value.serializer", classOf[StringSerializer].getName)  // value serializer
        p
      }
      sc.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    // Driver-side memo of the track templates: each JSON file is read (and its schema
    // inferred) once instead of once per detected vehicle. Only touched inside
    // foreachRDD, which runs on the driver.
    val templates = mutable.Map.empty[String, DataFrame]
    def template(path: String): DataFrame =
      templates.getOrElseUpdate(path, spark.read.json(path).cache())

    /**
     * Projects one detection onto the template of its lane and publishes the result.
     * Row layout: (lane_position, carType, cameraId, speed).
     * Detections with an unknown lane or an unusable speed are reported and skipped
     * instead of aborting the whole batch.
     */
    def publishTrack(car: Row, tracksByLane: Map[String, String]): Unit = {
      val path = Option(car.get(0)).map(_.toString).flatMap(tracksByLane.get)
      val step = segmentMillis(Option(car.get(3)).map(_.toString))
      (path, step) match {
        case (Some(trackPath), Some(stepMs)) =>
          // (1) vehicle type suffix: car -> 1, anything else (bus) -> 2
          val typeSuffix = if (Option(car.get(1)).exists(_.toString == "car")) "1" else "2"
          val track = template(trackPath)
            .withColumn("type", concat(col("type"), lit(typeSuffix)))
            // (2) vehicle id: one random id per detection, shared by all its points
            .withColumn("fz_car_id", concat(col("fz_car_id"), lit(uuid())))
            // (3) point timestamp: (id - 1) * milliseconds needed per 12 m segment
            .withColumn("time", (col("id") - 1) * stepMs)
            .withColumn("is_show", concat(col("is_show"), lit(stepMs)))
          // java.util.List#toString renders "[{...}, {...}]", i.e. a JSON array.
          val points = track.toJSON.collectAsList()
          if (!points.isEmpty) {
            kafkaProducer.value.send(OutputTopic, points.toString)
          }
        case _ =>
          System.err.println(s"Skipping detection with unknown lane or unusable speed: $car")
      }
    }

    kafkaDSteam
      .map(record => JZWUtil.handlerMessage2Row(record.value()))
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          // One collect per batch; the per-camera split happens on the driver.
          val cars = spark.createDataFrame(rdd, schema)
            .select("lane_position", "carType", "cameraId", "speed")
            .collect()
          def seenBy(cameraId: String): Array[Row] =
            cars.filter(c => Option(c.get(2)).exists(_.toString == cameraId))

          seenBy("V158").foreach(publishTrack(_, leftLaneTracks))   // left main line
          seenBy("V005").foreach(publishTrack(_, rightLaneTracks))  // right main line
        }
      }

    // Start the stream and block the driver until it terminates.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Milliseconds a vehicle needs for one track segment at the given speed.
   *
   * @param rawSpeed speed as received from Kafka, if present
   * @return `None` when the speed is missing, not a number, zero, NaN or infinite;
   *         otherwise `|round(43200 / speed)|`
   */
  private def segmentMillis(rawSpeed: Option[String]): Option[Long] =
    rawSpeed
      .flatMap(s => scala.util.Try(s.trim.toDouble).toOption)
      .filter(v => v != 0.0 && !v.isNaN && !v.isInfinity)
      .map(v => Math.abs(scala.math.round(SegmentMillisFactor / v)))

  /** Generates a vehicle id: a random UUID without dashes. */
  def uuid(): String = {
    UUID.randomUUID().toString().replaceAll("-", "").toString()
  }

  /**
   * Serializable wrapper around a KafkaProducer. Only the factory function is
   * serialized; the producer itself is created lazily on first use, which avoids
   * NotSerializableException when the wrapper is broadcast.
   */
  class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
    lazy val producer = createProducer()

    /** Sends a keyed record to `topic`. */
    def send(topic: String, key: K, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, key, value))

    /** Sends a record without key to `topic`. */
    def send(topic: String, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, value))
  }

  object KafkaSink {
    import scala.collection.JavaConverters._

    /** Builds a sink from a Scala configuration map. */
    def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
      val createProducerFunc = () => {
        val producer = new KafkaProducer[K, V](config.asJava)
        sys.addShutdownHook {
          // Ensure that, on JVM shutdown, the Kafka producer sends
          // any buffered messages to Kafka before shutting down.
          producer.close()
        }
        producer
      }
      new KafkaSink(createProducerFunc)
    }

    /** Builds a sink from java.util.Properties. */
    def apply[K, V](config: java.util.Properties): KafkaSink[K, V] =
      apply[K, V](config.asScala.toMap[String, Object])
  }
}
案例二:
package com.lg.bigdata.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.Concat
import org.apache.spark.sql.Column
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.Milliseconds
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import scala.collection.mutable
import java.lang.Double
import java.util.UUID
import org.apache.spark.sql.Dataset
import org.apache.spark.rdd.RDD
import com.google.gson.JsonObject
import scala.util.parsing.json.JSONArray
import org.apache.hadoop.mapred.KeyValueTextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.spark.broadcast.Broadcast
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.TaskContext
import com.lg.bigdata.utils.JZWUtil
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.LongType
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka010.HasOffsetRanges
import org.apache.spark.streaming.kafka010.CanCommitOffsets

/**
 * Trajectory projection.
 *
 * Consumes vehicle detections from the Kafka topic `car`. For every detection the
 * JSON track template registered for its camera and lane is loaded, narrowed to the
 * points of that camera/lane, stamped with the vehicle type, a fresh vehicle id and
 * per-point timestamps derived from the measured speed, and published as a JSON
 * array to the Kafka topic `GJTS_topic`. Every camera is projected independently.
 */
object KafkaAndJsonGJTS {

  val groupId = "jwz_test"
  //val groupId = "jwz_GJ"

  /** Topic that receives the projected trajectories. */
  private val OutputTopic = "GJTS_topic"

  /**
   * Numerator used to turn a speed into "milliseconds per 12 m track segment".
   * 43200 = 12 * 3.6 * 1000, which suggests the speed is in km/h -- confirm with the
   * producer of the `car` topic.
   */
  private val SegmentMillisFactor: Int = 43200

  /** Lane positions reported by the cameras. */
  private val Lanes = Seq("L", "M", "R")

  /** Cameras whose tracks live under in/left_lane (templates ZL/ZM/ZR). */
  private val LeftCameras = Seq(
    "V140", "V153", "V108", "V158", "V122", "V098", "V150", "V134", "V085", "V114",
    "V146", "V125", "V143", "V131", "V102", "V137", "V089", "V128", "V093", "V118")

  /** Cameras whose tracks live under in/reght_lane (templates YL/YM/YR). */
  private val RightCameras = Seq(
    "V032", "V072", "V029", "V005", "V051", "V009", "V027", "V062", "V039", "V067",
    "V035", "V058", "V018", "V045", "V042", "V048", "V014", "V024", "V076", "V054")

  /** "<cameraId><lane>" -> track template path, built once at object initialisation. */
  private val TrackPaths: Map[String, String] = {
    def entries(cameras: Seq[String], dir: String, prefix: String): Seq[(String, String)] =
      for (camera <- cameras; lane <- Lanes) yield s"$camera$lane" -> s"$dir/$prefix$lane.json"
    (entries(LeftCameras, "in/left_lane", "Z") ++ entries(RightCameras, "in/reght_lane", "Y")).toMap
  }

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.functions.{col, concat, lit, row_number}

    // 1. Create the SparkConf and initialise the streaming context (500 ms batches).
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaAndJsonGJTS")
    // Optionally switch to Kryo:
    //sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Milliseconds(500))
    ssc.sparkContext.setLogLevel("WARN")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = spark.sparkContext

    /* 2. Kafka consumer parameters.
     * auto.offset.reset:
     *   earliest - resume from the committed offset; without one, start from the beginning
     *   latest   - resume from the committed offset; without one, consume only new records
     *   none     - resume from the committed offsets; throw if any partition has none
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop104:9092",               // Kafka broker address
      "key.deserializer" -> classOf[StringDeserializer],     // key deserializer
      "value.deserializer" -> classOf[StringDeserializer],   // value deserializer
      "group.id" -> groupId,                                 // consumer group
      "auto.offset.reset" -> "latest",                       // earliest | latest
      "enable.auto.commit" -> (true: java.lang.Boolean),     // let the consumer commit offsets itself
      "auto.commit.interval.ms" -> "500"                     // auto-commit interval
    )
    val topics = Array("car")

    // 3. Direct (receiver-less) Kafka stream.
    val kafkaDSteam = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // Schema of the rows produced by JZWUtil.handlerMessage2Row.
    val schema = StructType(List(
      StructField("cameraId", StringType),
      StructField("time", StringType),
      StructField("lane_position", StringType),
      StructField("carType", StringType),
      StructField("speed", StringType),
      StructField("space", StringType)))

    // Broadcast a lazily-initialised producer wrapper so that every executor JVM
    // creates its own KafkaProducer on first send.
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", "hadoop104:9092")                  // Kafka broker address
        p.setProperty("key.serializer", classOf[StringSerializer].getName)    // key serializer
        p.setProperty("value.serializer", classOf[StringSerializer].getName)  // value serializer
        p
      }
      sc.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    // Local copy so the executor-side closure below does not reach into this object.
    val outputTopic = OutputTopic

    // Driver-side memo of the template DataFrames: the schema of each JSON file is
    // inferred once instead of once per detected vehicle. The data itself is not
    // cached, so it is still read from the file whenever a track is materialised.
    // Only touched inside foreachRDD, which runs on the driver.
    val templates = mutable.Map.empty[String, DataFrame]
    def template(path: String): DataFrame =
      templates.getOrElseUpdate(path, spark.read.json(path))

    /**
     * Projects one detection onto its camera/lane track and publishes the result.
     * Row layout: (lane_position, carType, cameraId, speed).
     * Detections with an unknown camera/lane or an unusable speed are reported and
     * skipped instead of aborting the whole batch.
     */
    def publishTrack(car: Row): Unit = {
      val target = for {
        lane <- Option(car.get(0)).map(_.toString)
        cameraId <- Option(car.get(2)).map(_.toString)
        path <- TrackPaths.get(cameraId + lane)
      } yield (cameraId, lane, path)

      (target, segmentMillis(Option(car.get(3)).map(_.toString))) match {
        case (Some((cameraId, lane, path)), Some(stepMs)) =>
          // (1) vehicle type suffix: car -> 1, anything else (bus) -> 2
          val typeSuffix = if (Option(car.get(1)).exists(_.toString == "car")) "1" else "2"
          val track = template(path)
            // Points of this camera and lane only. Column comparison keeps the Kafka
            // payload out of the expression text (no injection into the filter).
            .filter(col("cam_ID") === cameraId && col("lane") === lane)
            .withColumn("type", concat(col("type"), lit(typeSuffix)))
            // (2) vehicle id: one random id per detection, shared by all its points
            .withColumn("fz_car_id", concat(col("fz_car_id"), lit(uuid())))
            // (3) consecutive ids 1..n for the selected points
            .withColumn("newid",
              row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1).cast(LongType))))
            // (4) point timestamp: (newid - 1) * milliseconds needed per 12 m segment
            .withColumn("time", (col("newid") - 1) * stepMs)

          // java.util.List#toString renders "[{...}, {...}]", i.e. a JSON array.
          val points = track.toJSON.collectAsList()
          if (!points.isEmpty) {
            // Sent from an executor task through the broadcast sink.
            sc.makeRDD(Seq(points.toString)).foreach { record =>
              kafkaProducer.value.send(outputTopic, record)
            }
          }
        case _ =>
          System.err.println(s"Skipping detection with unknown camera/lane or unusable speed: $car")
      }
    }

    kafkaDSteam
      .map(record => JZWUtil.handlerMessage2Row(record.value()))
      .foreachRDD { rdd =>
        if (!rdd.isEmpty()) {
          spark.createDataFrame(rdd, schema)
            .select("lane_position", "carType", "cameraId", "speed")
            .collect()
            .foreach(publishTrack)
        }
      }

    // Start the stream and block the driver until it terminates.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Track template path per "<cameraId><lane>" key, e.g. "V158L" -> "in/left_lane/ZL.json".
   *
   * @return a fresh mutable copy on every call, so callers may modify it freely
   */
  def getPath(): mutable.Map[String, String] = mutable.Map(TrackPaths.toSeq: _*)

  /**
   * Milliseconds a vehicle needs for one track segment at the given speed.
   *
   * @param rawSpeed speed as received from Kafka, if present
   * @return `None` when the speed is missing, not a number, zero, NaN or infinite;
   *         otherwise `|round(43200 / speed)|`
   */
  private def segmentMillis(rawSpeed: Option[String]): Option[Long] =
    rawSpeed
      .flatMap(s => scala.util.Try(s.trim.toDouble).toOption)
      .filter(v => v != 0.0 && !v.isNaN && !v.isInfinity)
      .map(v => Math.abs(scala.math.round(SegmentMillisFactor / v)))

  /** Generates a vehicle id: a random UUID without dashes. */
  def uuid(): String = {
    UUID.randomUUID().toString().replaceAll("-", "").toString()
  }

  /**
   * Serializable wrapper around a KafkaProducer. Only the factory function is
   * serialized; the producer itself is created lazily on first use, which avoids
   * NotSerializableException when the wrapper is broadcast.
   */
  class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
    lazy val producer = createProducer()

    /** Sends a keyed record to `topic`. */
    def send(topic: String, key: K, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, key, value))

    /** Sends a record without key to `topic`. */
    def send(topic: String, value: V): Future[RecordMetadata] =
      producer.send(new ProducerRecord[K, V](topic, value))
  }

  object KafkaSink {
    import scala.collection.JavaConverters._

    /** Builds a sink from a Scala configuration map. */
    def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
      val createProducerFunc = () => {
        val producer = new KafkaProducer[K, V](config.asJava)
        sys.addShutdownHook {
          // Make sure buffered messages are flushed to Kafka
          // before the executor JVM shuts down.
          producer.close()
        }
        producer
      }
      new KafkaSink(createProducerFunc)
    }

    /** Builds a sink from java.util.Properties. */
    def apply[K, V](config: java.util.Properties): KafkaSink[K, V] =
      apply[K, V](config.asScala.toMap[String, Object])
  }
}
标签:lane,后往,案列,Kafka,apache,json,org,mapPath,import 来源: https://www.cnblogs.com/KdeS/p/14307041.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。