ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

[DB] Spark Streaming

2020-06-14 23:55:45  阅读:286  来源: 互联网

标签:val DB Streaming import apache org Spark spark ssc


概述

  • 流式计算框架,类似Storm
  • 严格来说不是真正的流式计算(实时计算),而是把连续的数据当做不连续的RDD处理,本质是离散计算
  • Flink:和 Spark Streaming 相反,把离散数据当成流式数据处理

基础

  • 易用,已经集成在Spark中
  • 容错性,底层也是RDD
  • 支持Java、Scala、Python

WordCount

  • nc -l -p 1234
  • bin/run-example streaming.NetworkWordCount localhost 1234
  • cpu核心数必须>1,不记录之前的状态
 1 import org.apache.spark.SparkConf
 2 import org.apache.spark.storage.StorageLevel
 3 import org.apache.spark.streaming.{Seconds, StreamingContext}
 4 
 5 // 创建一个StreamingContext,创建一个DSteam(离散流)
 6 // DStream表现形式:RDD
 7 // 使用DStream把连续的数据流变成不连续的RDD
 8 object MyNetworkWordCount {
 9   def main(args: Array[String]): Unit = {
10     // 创建一个StreamingContext对象,以local模式为例
11     // 保证CPU核心>=2,setMaster("[2]"),开启两个线程
12     val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
13 
14     // 两个参数:1.conf 和 2.采样时间间隔:每隔3s
15     val ssc = new StreamingContext(conf,Seconds(3))
16 
17     // 创建DStream,从netcat服务器接收数据
18     val lines = ssc.socketTextStream("192.168.174.111",1234,StorageLevel .MEMORY_ONLY)
19 
20     // 进行单词计数
21     val words = lines.flatMap(_.split(" "))
22 
23     // 计数
24     val wordCount = words.map((_,1)).reduceByKey(_+_)
25 
26     // 打印结果
27     wordCount.print()
28 
29     // 启动StreamingContext,进行计算
30     ssc.start()
31 
32     // 等待任务结束
33     ssc.awaitTermination()
34   }
35 }
View Code

高级特性

  • 什么是DStream:离散流,把连续的数据流变成不连续的RDD

  • transform
  • updateStateByKey(func):累加之前的结果,设置检查点,把之前的结果保存到检查点目录下
    • hdfs dfs -mkdir -p /day0614/ckpt
    • hdfs dfs -ls /day0614/ckpt
 1 import org.apache.spark.SparkConf
 2 import org.apache.spark.storage.StorageLevel
 3 import org.apache.spark.streaming.{Seconds, StreamingContext}
 4 
 5 // 创建一个StreamingContext,创建一个DSteam(离散流)
 6 // DStream表现形式:RDD
 7 // 使用DStream把连续的数据流变成不连续的RDD
 8 object MyTotalNetworkWordCount {
 9   def main(args: Array[String]): Unit = {
10     // 创建一个StreamingContext对象,以local模式为例
11     // 保证CPU核心>=2,setMaster("[2]"),开启两个线程
12     val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
13 
14     // 两个参数:1.conf 和 2.采样时间间隔:每隔3s
15     val ssc = new StreamingContext(conf,Seconds(3))
16 
17     // 设置检查点目录,保存之前状态
18     ssc.checkpoint("hdfs://192.168.174.111:9000/day0614/ckpt")
19 
20     // 创建DStream,从netcat服务器接收数据
21     val lines = ssc.socketTextStream("192.168.174.111",1234,StorageLevel .MEMORY_ONLY)
22 
23     // 进行单词计数
24     val words = lines.flatMap(_.split(" "))
25 
26     // 计数
27     val wordPair = words.map(w => (w,1))
28 
29     // 定义值函数
30     // 两个参数:1.当前的值 2.之前的结果
31     val addFunc = (curreValues:Seq[Int],previousValues:Option[Int])=>{
32       // 把当前序列进行累加
33       val currentTotal = curreValues.sum
34 
35       // 在之前的值上再累加
36       // 如果之前没有值,返回0
37       Some(currentTotal + previousValues.getOrElse(0))
38     }
39 
40     // 累加计算
41     val total = wordPair.updateStateByKey(addFunc)
42 
43     total.print()
44 
45     ssc.start()
46 
47     ssc.awaitTermination()
48 
49   }
50 }
View Code

  • 窗口操作
    • 只统计在窗口中的数据
    • Exception in thread "main" java.lang.Exception: The slide duration of windowed DStream (10000 ms) must be a multiple of the slide duration of parent DStream (3000 ms)
    • 滑动距离必须是采样频率的整数倍
 1 package day0614
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.SparkConf
 5 import org.apache.spark.storage.StorageLevel
 6 import org.apache.spark.streaming.{Seconds, StreamingContext}
 7 
 8 
 9 object MyNetworkWordCountByWindow {
10   def main(args: Array[String]): Unit = {
11     // 不打印日志
12     Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
13     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
14     // 创建一个StreamingContext对象,以local模式为例
15     // 保证CPU核心>=2,setMaster("[2]"),开启两个线程
16     val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
17 
18     // 两个参数:1.conf 和 2.采样时间间隔:每隔3s
19     val ssc = new StreamingContext(conf,Seconds(3))
20 
21     // 创建DStream,从netcat服务器接收数据
22     val lines = ssc.socketTextStream("192.168.174.111",1234,StorageLevel .MEMORY_ONLY)
23 
24     // 进行单词计数
25     val words = lines.flatMap(_.split(" ")).map((_,1))
26 
27     // 每9s,把过去30s的数据进行WordCount
28     // 参数:1.操作 2.窗口大小 3.窗口滑动距离
29     val result = words.reduceByKeyAndWindow((x:Int,y:Int)=>(x+y),Seconds(30),Seconds(9))
30 
31     result.print()
32     ssc.start()
33     ssc.awaitTermination()
34   }
35 }
View Code

  • 集成Spark SQL
    • 使用SQL语句分析流式数据
 1 package day0614
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.SparkConf
 5 import org.apache.spark.sql.SparkSession
 6 import org.apache.spark.storage.StorageLevel
 7 import org.apache.spark.streaming.{Seconds, StreamingContext}
 8 
 9 object MyNetworkWordCountWithSQL {
10   def main(args: Array[String]): Unit = {
11     Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
12     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
13     // 创建一个StreamingContext对象,以local模式为例
14     // 保证CPU核心>=2,setMaster("[2]"),开启两个线程
15     val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
16 
17     // 两个参数:1.conf 和 2.采样时间间隔:每隔3s
18     val ssc = new StreamingContext(conf,Seconds(3))
19 
20     // 创建DStream,从netcat服务器接收数据
21     val lines = ssc.socketTextStream("192.168.174.111",1234,StorageLevel .MEMORY_ONLY)
22 
23     // 进行单词计数
24     val words = lines.flatMap(_.split(" "))
25 
26     // 集成Spark SQL,使用SQL语句进行WordCount
27     words.foreachRDD(rdd=> {
28       // 创建SparkSession对象
29       val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
30 
31       // 把rdd转成DataFrame
32       import spark.implicits._
33       val df1 = rdd.toDF("word") // 表df1:只有一个列"word"
34 
35       // 创建视图
36       df1.createOrReplaceTempView("words")
37 
38       // 执行SQL,通过SQL执行WordCount
39       spark.sql("select word,count(*) from words group by word").show
40     })
41 
42     ssc.start()
43     ssc.awaitTermination()
44   }
45 }
View Code

数据源

  • 基本数据源
    • 文件流
 1 import org.apache.log4j.{Level, Logger}
 2 import org.apache.spark.SparkConf
 3 import org.apache.spark.storage.StorageLevel
 4 import org.apache.spark.streaming.{Seconds, StreamingContext}
 5 
 6 object FileStreaming {
 7   def main(args: Array[String]): Unit = {
 8     Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
 9     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
10     // 创建一个StreamingContext对象,以local模式为例
11     // 保证CPU核心>=2,setMaster("[2]"),开启两个线程
12     val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
13 
14     // 两个参数:1.conf 和 2.采样时间间隔:每隔3s
15     val ssc = new StreamingContext(conf,Seconds(3))
16 
17     // 直接监控本地的某个目录,如果有新的文件产生,就读取进来
18     val lines = ssc.textFileStream("F:\\idea-workspace\\temp")
19 
20     lines.print()
21     ssc.start()
22     ssc.awaitTermination()
23   }
24 }
View Code
    • RDD队列流
 1 import org.apache.log4j.{Level, Logger}
 2 import org.apache.spark.SparkConf
 3 import org.apache.spark.rdd.RDD
 4 import org.apache.spark.storage.StorageLevel
 5 import org.apache.spark.streaming.{Seconds, StreamingContext}
 6 import scala.collection.mutable.Queue
 7 
 8 object RDDQueueStream {
 9   def main(args: Array[String]): Unit = {
10     Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
11     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
12     // 创建一个StreamingContext对象,以local模式为例
13     // 保证CPU核心>=2,setMaster("[2]"),开启两个线程
14     val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
15 
16     // 两个参数:1.conf 和 2.采样时间间隔:每隔1s
17     val ssc = new StreamingContext(conf,Seconds(1))
18 
19     // 创建队列,作为数据源
20     val rddQueue = new Queue[RDD[Int]]()
21     for(i<-1 to 3){
22       rddQueue += ssc.sparkContext.makeRDD(1 to 10)
23       // 睡1s
24       Thread.sleep(1000)
25     }
26 
27     // 从队列中接收数据,创建DStream
28     val inputDStream = ssc.queueStream(rddQueue)
29 
30     // 处理数据
31     val result = inputDStream.map(x=>(x,x*2))
32     result.print()
33 
34     ssc.start()
35     ssc.awaitTermination()
36   }
37 }
View Code
    • 套接字流(socketTextStream)
  • 高级数据源
    • Flume
    • Kafka

Kafka

  • 概述
    • 一种高吞吐量的分布式发布订阅消息系统
    • 消息类型:主体Topic(广播)、队列Queue(一对一)
    • 消息系统类型:同步消息系统、异步消息系统
    • 常见消息产品:Redis、Kafka、JMS
  • 安装
    • config/server.properties
    • bin/kafka-server-start.sh config/server.properties &
    • bin/kafka-topics.sh --create --zookeeper bigdata111:2181 -replication-factor 1 --partitions 3 --topic mydemo1
    • bin/kafka-console-producer.sh --broker-list bigdata111:9092 --topic mydemo1
    • bin/kafka-console-consumer.sh --zookeeper bigdata111:2181 --topic mydemo1

性能优化参数

 

标签:val,DB,Streaming,import,apache,org,Spark,spark,ssc
来源: https://www.cnblogs.com/cxc1357/p/13118581.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有