ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

6-spark_streaming

2022-01-06 12:02:33  阅读:165  来源: 互联网

标签:PYSPARK PYTHON streaming Streaming HOME spark os Spark


学习目标

  • 说出Spark Streaming的特点
  • 说出DStreaming的常见操作api
  • 能够应用Spark Streaming实现实时数据处理
  • 能够应用Spark Streaming的状态操作解决实际问题
  • 独立实现foreachRDD向mysql数据库的数据写入
  • 独立实现Spark Streaming对接kafka实现实时数据处理

1、sparkStreaming概述

1.1 SparkStreaming是什么

  • 它是一个可扩展,高吞吐具有容错性的流式计算框架

    吞吐量:单位时间内成功传输数据的数量

之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务,数据一般都是在固定位置上,通常我们写好一个脚本,每天定时去处理数据,计算,保存数据结果。这类任务通常是T+1(一天一个任务),对实时性要求不高。

但在企业中存在很多实时性处理的需求,例如:双十一的京东阿里,通常会做一个实时的数据大屏,显示实时订单。这种情况下,对数据实时性要求较高,仅仅能够容忍到延迟1分钟或几秒钟。

实时计算框架对比

Storm

  • 流式计算框架

  • 以record为单位处理数据

  • 也支持micro-batch方式(Trident)

Spark

  • 批处理计算框架

  • 以RDD为单位处理数据

  • 支持micro-batch流式处理数据(Spark Streaming)

对比:

  • 吞吐量:Spark Streaming优于Storm

  • 延迟:Spark Streaming差于Storm

1.2 SparkStreaming的组件

  • Streaming Context

    • 一旦一个Context已经启动(调用了Streaming Context的start()),就不能有新的流算子(Dstream)建立或者是添加到context中

    • 一旦一个context已经停止,不能重新启动(Streaming Context调用了stop方法之后 就不能再次调 start())

    • 在JVM(java虚拟机)中, 同一时间只能有一个Streaming Context处于活跃状态, 一个SparkContext创建一个Streaming Context

    • 在Streaming Context上调用Stop方法, 也会关闭SparkContext对象, 如果只想仅关闭Streaming Context对象,设置stop()的可选参数为false

    • 一个SparkContext对象可以重复利用去创建多个Streaming Context对象(不关闭SparkContext前提下), 但是需要关一个再开下一个

  • DStream (离散流)

    • 代表一个连续的数据流

    • 在内部, DStream由一系列连续的RDD组成

    • DStreams中的每个RDD都包含确定时间间隔内的数据

    • 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作

    • 数据源

      • 基本源

        • TCP/IP Socket

        • FileSystem

      • 高级源

        • Kafka

        • Flume

2、Spark Streaming编码实践

Spark Streaming编码步骤:

  • 1,创建一个StreamingContext

  • 2,从StreamingContext中创建一个数据对象

  • 3,对数据对象进行Transformations操作

  • 4,输出结果

  • 5,开始和停止

利用Spark Streaming实现WordCount

需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。

1,需要安装一个nc工具:sudo yum install -y nc

2,执行指令:nc -lk 9999 -v

import os
JAVA_HOME = '/usr/local/java/jdk1.8.0_131'
PYSPARK_PYTHON = "/usr/local/python3/python"
SPARK_HOME = "/bigdata/spark-2.1.2-bin-hadoop2.3"
os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
# os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
# os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext("local[2]", appName="NetworkWordCount")
    # 参数2:指定执行计算的时间间隔
    ssc = StreamingContext(sc, 1)
    # 监听ip,端口上的上的数据
    lines = ssc.socketTextStream('localhost', 9999)
    # 将数据按空格进行拆分为多个单词
    words = lines.flatMap(lambda line: line.split(" "))
    # 将单词转换为(单词,1)的形式
    pairs = words.map(lambda word: (word, 1))
    # 统计单词个数
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)
    # 打印结果信息,会使得前面的transformation操作执行
    wordCounts.pprint()
    # 启动StreamingContext
    ssc.start()
    # 等待计算结束
    ssc.awaitTermination()

3、Spark Streaming的状态操作

在Spark Streaming中存在两种状态操作

  • UpdateStateByKey

  • Windows操作

使用有状态的transformation,需要开启Checkpoint

  • spark streaming 的容错机制

  • 它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复

3.1 updateStateByKey

Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。

无状态:指的是每个时间片段的数据之间是没有关联的。

需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作

一般超过一天都是用RDD或Spark SQL来进行离线批处理

如果没有UpdateStateByKey,我们需要将每一秒的数据计算好放入mysql中取,再用mysql来进行统计计算

Spark Streaming中提供这种状态保护机制,即updateStateByKey

步骤:

  • 首先,要定义一个state,可以是任意的数据类型

  • 其次,要定义state更新函数--指定一个函数如何使用之前的state和新值来更新state

  • 对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除

  • 对于每个新出现的key,也会执行state更新函数

举例:词统计。

案例:updateStateByKey

需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来

代码

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
​
# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
​
ssc = StreamingContext(sc, 3)
#开启检查点
ssc.checkpoint("checkpoint")
​
#定义state更新函数
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)
​
lines = ssc.socketTextStream("localhost", 9999)
# 对数据以空格进行拆分,分为多个单词
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数
    
counts.pprint()
​
ssc.start()
ssc.awaitTermination()

 

3.2 Windows

  • 窗口长度L:运算的数据量

  • 滑动间隔G:控制每隔多长时间做一次运算

每隔G秒,统计最近L秒的数据

 

 

操作细节

  • Window操作是基于窗口长度和滑动间隔来工作的

  • 窗口的长度控制考虑前几批次数据量

  • 默认为批处理的滑动间隔来确定计算结果的频率

相关函数

  • Smart computation

  • invAddFunc

reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[num,Tasks])

func:正向操作,类似于updateStateByKey

invFunc:反向操作

 

例如在热词时,在上一个窗口中可能是热词,这个一个窗口中可能不是热词,就需要在这个窗口中把该次剔除掉

典型案例:热点搜索词滑动统计,每隔10秒,统计最近60秒钟的搜索词的搜索频次,并打印出最靠前的3个搜索词出现次数。

 

 

案例

监听网络端口的数据,每隔3秒统计前6秒出现的单词数量

import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
​
def get_countryname(line):
    country_name = line.strip()
​
    if country_name == 'usa':
        output = 'USA'
    elif country_name == 'ind':
        output = 'India'
    elif country_name == 'aus':
        output = 'Australia'
    else:
        output = 'Unknown'
​
    return (output, 1)
​
if __name__ == "__main__":
    #定义处理的时间间隔
    batch_interval = 1 # base time unit (in seconds)
    #定义窗口长度
    window_length = 6 * batch_interval
    #定义滑动时间间隔
    frequency = 3 * batch_interval
​
    #获取StreamingContext
    spark = SparkSession.builder.master("local[2]").getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc, batch_interval)
    
    #需要设置检查点
    ssc.checkpoint("checkpoint")
​
    lines = ssc.socketTextStream('localhost', 9999)
    addFunc = lambda x, y: x + y
    invAddFunc = lambda x, y: x - y
    #调用reduceByKeyAndWindow,来进行窗口函数的调用
    window_counts = lines.map(get_countryname) \
        .reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
    #输出处理结果信息
    window_counts.pprint()
​
    ssc.start()
    ssc.awaitTermination()

 



 

标签:PYSPARK,PYTHON,streaming,Streaming,HOME,spark,os,Spark
来源: https://www.cnblogs.com/Live-up-to-your-youth/p/15770494.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有