ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

大数据期末总结复习

2020-12-22 12:57:27  阅读:312  来源: 互联网

标签:总结 __ 复习 collect SparkContext rdd 期末 print Spark


信息来源于某位帅男 : 20道选择题,一题2分,2~3道大题:mapreduce求解,spark RDD,hdfs(选择题),hbase(数据表的选择设计问题,操作问题)

一、一些基本概念

1.python基础

# 1).单行注释用“#”,多行注释用一对‘’‘,或者"""包裹内容。
# 2).python的输入输出:
	a = int(input())
    b = int(input())
	print("%d + %d = %d" % (a, b, a + b))
# 3).嵌套与循环:
--------------------------------
	#判断语句
	if x <= 0 :
		x = -1
--------------------------------
	#按索引遍历
	for x in range(list):
		print(list(x))
--------------------------------
	#直接遍历值
	for x in list:
		print(x)
--------------------------------
	#while循环
	n = 5
	while n > 0 :
		print(" I love xr ! ")
		n=n-1
--------------------------------
# 4).函数:
	def move(x, y, step, angle=0):
    nx = x + step * math.cos(angle)
    ny = y - step * math.sin(angle)
    return nx, ny
    
    命令行调用运行结果如下:
    >>> x, y = move(100, 100, 60, math.pi / 6)
	>>> print(x, y)
	151.96152422706632 70.0
# 5).列表list:
	保留8位小数:list.append('{:.8}.format(4*pi)')

在这里插入图片描述

# 6).元组tuple:

在这里插入图片描述

# 7).pymysql链接数据库
	# 导入pymysql模块
	import pymysql
	# 连接database
	conn = pymysql.connect(host='localhost', port=3306,
                       user='root', passwd='root',
                       charset='utf8', db = 'mydb')
	# 得到一个可以执行SQL语句的游标对象
	cursor = conn.cursor()
	# 定义要执行的SQL语句
	sql = """
	CREATE TABLE USER1 (
		id INT auto_increment PRIMARY KEY ,
		name CHAR(10) NOT NULL UNIQUE,
		age TINYINT NOT NULL
	)ENGINE=innodb DEFAULT CHARSET=utf8;  #注意:charset='utf8' 不能写成utf-8
		"""
	# 执行SQL语句
	cursor.execute(sql)
	# 关闭游标对象
	cursor.close()
	# 关闭数据库连接
	conn.close()

2.分布式文件系统HDFS

当数据集的大小超过一台独立的物理计算机的存储能力时,就有必要对它进行分区并存储到若干台单独的计算机上,管理网络中跨多台计算机存储的文件系统称为分布式文件系统(Distributed FileSystem)。

Hadoop自带一个称为HDFS的分布式文件系统,即HDFS(Hadoop Distributed FileSystem)。有时也称之为DFS,他们是一回事儿。

NameNode与DataNode
HDFS有两类节点用来管理集群的数据,即一个namenode(管理节点)和多个datanode(工作节点)。namenode管理文件系统的命名空间,它维护着系统数及整棵树内所有的文件和目录,这些信息以两个形式永久保存在本地磁盘上:命名空间镜像文件和编辑日志文件,namenode也记录着每个文件中各个块所在的数据节点信息,但它并不永久保存块的位置信息,因为这些信息会在系统启动时根据节点信息重建。

客户端(client)代表用户通过与namenode和datanode交互来访问整个系统。客户端提供一个类似POSIX(可移植操作系统界面)的文件系统结构,因此用户编程时无需知道namenode和datanode也可以实现功能。
datanode是文件系统的工作节点,他们根据需要存储并检索数据块(blocks),并且定期向namenode发送他们所存储的数据块的列表。

在这里插入图片描述

3.大数据计算框架MapReduce

什么是MapReduce?
个任务,任务是:挖掘分析我国气象中心近年来的数据日志,该数据日志大小有3T,让你分析计算出每一年的最高气温,如果你现在只有一台计算机,如何处理呢?我想你应该会读取这些数据,并且将读取到的数据与目前的最大气温值进行比较。比较完所有的数据之后就可以得出最高气温了。不过以我们的经验都知道要处理这么多数据肯定是非常耗时的。

如果我现在给你三台机器,你会如何处理呢?看到下图你应该想到了:最好的处理方式是将这些数据切分成三块,然后分别计算处理这些数据(Map),处理完毕之后发送到一台机器上进行合并(merge),再计算合并之后的数据,归纳(reduce)并输出。

这就是一个比较完整的MapReduce的过程了。

4.分布式数据库(Hbase)

# 启动hbase
start-hbase.sh
# 进入hbase shell 窗口
hbase shell
# 创建一个表
create 'test','data'
# 查看表是否创建成功
list
# 添加数据
put 'test','row1','data:1','value1'
put 'test','row2','data:2','value2'
# 获取数据
get 'test','row1'
# 查看所有的数据
scan 'test'

# 删除数据
delete 'test','row1'
# 删除表
disable 'test' #先将表设置为禁用
drop 'test'
1)Hbase分布式环境的整体架构

在这里插入图片描述

Zookeeper能为HBase提供协同服务,是HBase的一个重要组件,Zookeeper能实时的监控HBase的健康状态,并作出相应处理。

HMaster是HBase的主服务,他负责监控集群中所有的HRegionServer,并对表和Region进行管理操作,比如创建表,修改表,移除表等等。

HRegionServer是RegionServer的实例,它负责服务和管理多个HRegion 实例,并直接响应用户的读写请求。

HRegion是对表进行划分的基本单元,一个表在刚刚创建时只有一个Region,但是随着记录的增加,表会变得越来越大,HRegionServer会实时跟踪Region的大小,当Region增大到某个值时,就会进行切割(split)操作,由一个Region切分成两个Region。

总的来说,要部署一个分布式的HBase数据库,需要各个组件的协作,HBase通过Zookeeper进行分布式应用管理,Zookeeper相当于管理员,HBase将数据存储在HDFS(分布式文件系统)中,通过HDFS存储数据,所以我们搭建分布式的HBase数据库的整体思路也在这里,即将各个服务进行整合。

2). 搭建happybase环境,为编写python程序访问HBase中数据做好准备

happybase主要是用来操作hbase的,首先我们需要安装好happybse环境,然后启动hdfs和hbase,最后测试python-happybase即可。

使用happpybase连接HBase数据库:
import happybase
happybase.Connection(host=’localhost’, port=9090, timeout=None, autoconnect=True, table_prefix=None, table_prefix_separator=b’_’, compat=’0.98’, transport=’buffered’, protocol=’binary’)
获取连接实例
host:主机名
port:端口
timeout:超时时间
autoconnect:连接是否直接打开
table_prefix:用于构造表名的前缀
table_prefix_separator:用于table_prefix的分隔符
compat:兼容模式
transport:运输模式
protocol:协议

# 创建一个表
connection.create_table(
    ‘my_table’,
    {
        ‘cf1’: dict(max_versions=10),
        ‘cf2’:dict(max_versions=1,block_cache_enabled=False),
        ‘cf3’: dict(),  # use defaults
    }
)
此时,我们再通过connection.tables()查看可以使用的table,结果为[‘my_table’]
创建的table即my_table包含3个列族:cf1、cf2、cf3

5.大数据计算框架Spark

1). 安装与配置Scala开发环境

为什么我们要安装配置scala呢?
因为Spark框架底层是使用Scala开发的,使用Scala写出的代码远比java简洁,因此安装与配置scala环境是我们在学习Spark之前要完成的准备工作。

2). Spark架构

同样如上,我们首先要知道Spark是什么?
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎。我们知道mapReduce,Spark就是类Hadoop MapReduce的通用并行框架,Spark具有hadoop MapReduce的所有优点,甚至比它能够更好地使用与数据挖掘和机器学习等需要迭代的MapReduce的算法。

在这里插入图片描述

基本概念:

Application:用户编写的Spark应用程序,包含一个Driver和多个Executor。

Driver:Spark中的Driver即运行上述Application的main函数并创建SparkContext,创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与 ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭。

Executor:是运行在工作节点WorkerNode的一个进程,负责运行 Task。

RDD:弹性分布式数据集,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。

DAG:有向无环图,反映RDD之间的依赖关系。

Task:运行在Executor上的工作单元。

Job:一个Job包含多个RDD及作用于相应RDD上的各种操作。

Stage:是Job的基本调度单位,一个Job会分为多组Task,每组Task被称为Stage,或者也被称为TaskSet,代表一组关联的,相互之间没有Shuffle依赖关系的任务组成的任务集。

Cluster Manager:指的是在集群上获取资源的外部服务。目前有三种类型:

Standalon:Spark原生的资源管理,由Master负责资源的分配;
Apache Mesos:与Hadoop MR兼容性良好的一种资源调度框架;
Hadoop Yarn:主要是指Yarn中的ResourceManager。

3). Spark运行流程

在这里插入图片描述

  1. 构建Spark Application的运行环境,启动SparkContext;
  2. SparkContext向资源管理器(可以是Standalone,Mesos,Yarn)申请运行Executor资源;
  3. Executor向SparkContext申请Task;
  4. SparkContext构建DAG图,将DAG图分解成Stage 、并将 Stage封装成Taskset发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行;
  5. Task在Executor上运行,运行完释放所有资源。

4). RDD理解检测

在Spark中,对数据的所有操作不外乎创建RDD、转化已有RDD以及调用 RDD操作进行求值。简单的来说RDD就是一个集合,一个将集合中数据存储在不同机器上的集合。这么说清楚了吧?
不清楚就再来个例子,

# 如何使用集合并行化创建一个Spark RDD ?
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个1到8的列表List
    data = [1,2,3,4,5,6,7,8]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
    sum = rdd.collect()

    # 5.打印 rdd 的内容
    print(sum)

    # 6.停止 SparkContext
    sc.stop()
    #********** End **********#

现在我们知道什么是RDD了,再来讲一讲它的五大特性。

  1. 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
  1. 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个 RDD都会实现 compute 函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
  1. RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
  1. 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
  1. 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

5). SparkSQL

我们开始编写 Spark SQL,从何开始呢?答案就是SparkSession。
SparkSession 是 Spark SQL 的入口。要创建基本的 SparkSession,只需使用SparkSession.builder()。

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

有了 SparkSession,下一步就是创建 DataFrame
使用 SparkSession 可以从现有 RDD,Hive 表或 Spark 数据源(json,parquet,jdbc,orc,libsvm,csv,text)等格式文件创建DataFrame.
以下示例为读取 Json 文件创建 DataFrame。

df =spark.read.json("/people.json")

people.json 数据如下:
{“name”:“Michael”}
{“name”:“Andy”, “age”:30}
{“name”:“Justin”, “age”:19}

使用DataFrame

#打印Schema信息
df.printSchema()
#选择姓名列
df.select("name").show()

通过SQL语句的方式
#首先注册df为一个视图
df.createOrReplaceTempView("p")
#通过spark.sql("SQL语句")执行SQL语句
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()

# 写入并保存到指定路径
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
# 覆盖原有数据并保存到 F:\\test 路径下
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test")

二、educoder原题

1.python基础

--------------------------------------------------------------
# 摄氏度的转换:华氏转摄氏度
def Table_For(min,max):
    #请在此处用for循环打印列表
    #   请在此添加实现代码   #
    # ********** Begin *********#
    print("华氏度\t\t摄氏度\t\t近似摄氏度")
    print("****************************************")
    for i in range(min, max + 10, 10):
        b = (i - 32)/1.8
        c = (i - 30) / 2
        print("%d\t\t%.1f\t\t%.1f" % (i,b, c))
    return 0
--------------------------------------------------------------
# 读取文件和将数据存储为文件
def solve(file):
    sum = []
    ans ={}
    with open(file, encoding='utf-8') as file_obj:
        lines = file_obj.readlines()
    for line in lines[2:]:
        k = line.rstrip().split('\t')
#        # print(k)
        sum.append(k)
    print(sum)
 solve('src/step1/constants.txt')
#********** End **********#

2.hdfs(选择题)

选择题?我觉得那可能就会考一些命令语句吧。

1). Linux下命令行

# 创建文件夹
mkdir /app
# 创建文件
touch hello.txt
# 切换到 /opt 目录下
cd /opt
# 解压压缩文件
tar -zxvf jdk-8u171-linux-x64.tar.gz
# 移动文件
mv jdk1.8.0_171/ /app
# 编辑配置文件
vim /etc/profile
# 使刚刚的配置生效
source /etc/profile

# 格式化HDFS文件
hadoop namenode -format
# 启动hadoop
start-dfs.sh
# 验证Hadoop

在这里插入图片描述

2). Hdfs下命令行H

# 在HDFS中创建文件夹
hadoop fs -mkdir /test
# 查看是否创建成功
hadoop fs -ls /
# 将文件上传至HDFS
hadoop fs -put hello.txt /test
# 查看文件
hadoop fs -cat /test/hello.txt

在这里插入图片描述

3.hbase(数据表的选择设计问题、操作问题)

1) 使用python代码向HBase表中并添加、删除数据,并查看数据
一、添加数据
table = connection.table(‘my_table’)       #首先获得表对象

cloth_data = {'cf1:content': 'jeans', 'cf1:price': '299', 'cf1:rating': '98%'}
hat_data = {'cf1:content': 'cap', 'cf1:price': '88', 'cf1:rating': '99%'}
# 使用put一次只能存储一行数据,如果row key已经存在,则变成了修改数据
table.put(row='www.test1.com', data=cloth_data)
table.put(row='www.test2.com', data=hat_data)
# 使用batch一次插入多行数据
bat = table.batch()
bat.put('www.test5.com', {'cf1:price': 999, 'cf2:title': 'Hello Python', 'cf2:length': 34, 'cf3:code': 'A43'})
bat.put('www.test6.com', {'cf1:content': 'razor', 'cf1:price': 168, 'cf1:rating': '97%'})
bat.send()
# 使用上下文管理器来管理batch,这样就不用手动发送数据了,即不再需要bat.send()
with table.batch() as bat:
    bat.put('www.test5.com', {'cf1:price': '999', 'cf2:title': 'Hello Python', 'cf2:length': '34', 'cf3:code': 'A43'})
    bat.put('www.test6.com', {'cf1:content': u'剃须刀', 'cf1:price': '168', 'cf1:rating': '97%'})
二、删除数据
with table.batch() as bat:
bat.delete(‘www.test1.com')
三、检索数据
 # 全局扫描一个table
 for key, value in table.scan():
    print key, value
 # 检索一行数据
 row = table.row(‘www.test4.com') print row
 # 检索多行数据
rows = table.rows([‘www.test1.com', ‘www.test4.com'])print rows

4.mapReduce求解

1) 统计两个文本文件中,每个单词出现的次数。
# mapper.py
#! /usr/bin/python3
import sys
def main():
    # 从标准输入流中接受数据行,对每一行调用mapper函数来处理
    for line in sys.stdin:
        line = line.strip()
        mapper(line)
# 每行分割为一个个单词,用word表示
# hadoop streaming要求用"键\t值"形式输出键值对
def mapper(line):
    words = line.split(' ')
    for word in words:
        if len(word.strip()) == 0:
            continue
        print("%s\t%s" % (word, 1))
if __name__ == '__main__':
    main()
-------------------------------------------------------------
#Reduce.py
#! /usr/bin/python3
import sys
from operator import itemgetter
# 对values求和,并按"单词\t词频"的形式输出。
def reducer(k, values):
    print("%s\t%s" % (k, sum(values)))
def main():
    current_key = None
    values = []
    _key, _value = '', 0
    for line in sys.stdin:
        line = line.strip()
        _key, _value = line.split('\t', 1)
        _value = eval(_value)
        if current_key == _key:
            values.append(_value)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
            values.append(_value)
            current_key = _key
    # 不要忘记最后一个键值对
    if current_key == _key:
        reducer(current_key, values)
if __name__ == '__main__':
    main()
2) 对两个文件进行合并,并剔除其中重复的内容
# mapper.py
#! /usr/bin/python3
import sys

def main():
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

def mapper(line):
    ########## Begin  ###############
    num,st = line.split(' ')
    print("%s\t%s" %(num,st))
   ###########  End    #############
if __name__ == '__main__':
    main()
--------------------------------------------------------------
# Reduce.py
#! /usr/bin/python3
import sys

def reducer(k, values):
    ############  Begin   ################
    for value in sorted(list(set(values))):
        print("%s\t%s" %(k,value))
    ############   End    ################

def main():
    current_key = None
    values = []
    akey, avalue = None, None
    for line in sys.stdin:
        line = line.strip()
        akey, avalue = line.split('\t')
        
        if current_key == akey:
            values.append(avalue)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
  
            values.append(avalue)
            current_key = akey
    
    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()
        
3) 求挖掘其中的父子辈关系,给出祖孙辈关系的表格
# mapper.py
#! /usr/bin/python3
import sys
def main():
    for line in sys.stdin:
        line = line.strip()
        if line.startswith('child'):
            pass
        else:
            mapper(line)
           
def mapper(line):
    ###############  Begin   ############
    child, parent = line.split(' ')
    print("%s\t-%s" % (child, parent))
    print("%s\t+%s" % (parent, child))
    
    ###############   End    #############

if __name__ == '__main__':
    main()

# Reduce.py
#! /usr/bin/python3
import sys

def reducer(k, values):
    ##############    Begin    ################
    grandparents = []
    grandson = []
    for v in values:
        if v.startswith('-'):
            grandparents.append(v[1:])
        else:
            grandson.append(v[1:])
    for i in grandson:
        for j in grandparents:
            print("%s\t%s" % (i, j))
    ##############   End      #################

def main():
    current_key = None
    values = []
    akey, avalue = None, None
    print("grand_child\tgrand_parent")
    for line in sys.stdin:
        line = line.strip()
        try:
            akey, avalue = line.split('\t')
        except:
            continue
        if current_key == akey:
            values.append(avalue)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
            values.append(avalue)
            current_key = akey
    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()

4) 数据清洗
#! /usr/bin/python3
# mapper.py
import sys
from dbhelper import DBHelper
import codecs
import time

# 获取“省市代码:省市名称”项并保存在字典regions中;
# 获取“电话号码:姓名”项并保存在字典userphones中。
regions = DBHelper.get_region()
userphones = DBHelper.get_userphones()

def main():
    # 正确输出utf-8编码的汉字
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

def mapper(line):
    # 输出形如“邓二,张倩,13666666666,15151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,黑龙江省,上海市”的字符串
    # 本题不需要reduce阶段,输出题目要求的内容即可,不需要使用“键\t值”的形式。
    ##########  begin      ##############
    items = line.split(',')
    caller = userphones.get(items[0])
    receiver = userphones.get(items[1])
    start_time = int(items[2])
    end_time = int(items[3])
    region_caller = regions.get(items[4])
    region_receiver = regions.get(items[5])
    print(caller,receiver,sep = ',',end = ',')
    print(','.join(items[:2]), end = ',')
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)),end=',')
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)),end=',')
    print(str(end_time - start_time), end = ',')
    print(region_caller, region_receiver, sep = ',')    
    

    ###########  End  #################

if __name__ == '__main__':
    main()
---------------------------------------------------------------
# dbhelper.py
import pymysql
import sys
import codecs

class DBHelper:
    def get_connection():
        # 根据题目提供的凭据建立到mysql服务器的连接"conn",注意字符集指定为"utf8mb4"
        ########  Begin   ############
        conn = pymysql.connect(
            host = 'localhost',
            user = 'root',
            password = '123123',
            db = 'mydb',
            port = 3306,
            charset = 'utf8mb4'
        )
        ########  End    ############    
        return conn

    @classmethod
    def get_region(cls):
        conn = cls.get_connection()
        regions = dict()
        with conn.cursor() as cur:
            #从数据库中查询所有的省市代码和省市名称,并保存到字典regions中。
            ############  Begin ###################
            sqltxt = 'select CodeNum, Address from allregion;'
            cur.execute(sqltxt)
            for row in cur.fetchall():
                regions[row[0]] = row[1].strip()
     
            ############  End    #################
        conn.close()
        return regions

    @classmethod
    def get_userphones(cls):
        conn = cls.get_connection()
        userphones = dict()
        with conn.cursor() as cur:
        #从数据库中查询所有的电话号码和对应的姓名,并保存到字典userphones中。
        ############  Begin ###################
            sqltxt = 'select phone, trueName from userphone;'
            cur.execute(sqltxt)
            for row in cur.fetchall():
                userphones[row[0]] = row[1]    
            ############  End    #################
        conn.close()
        return userphones

def main():
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    region = DBHelper.get_region()
    users = DBHelper.get_userphones() 
    '''
    for k, v in region.items():
        print(k, ':', v)
    print('-------------')

    for k, v in users.items():
        print(k, ':', v)
    '''
if __name__ == '__main__':
    main()

5.SparkRDD

在这里插入图片描述

Spark-submit计算圆周率在这里插入图片描述

在这里插入图片描述

1).读取外部数据集创建RDD

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == '__main__':
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    
    # 文本文件 RDD 可以使用创建 SparkContext 的textFile 方法。此方法需要一个 URI的 文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合
    # 2.读取本地文件,URI为:/root/wordcount.txt
    distFile = sc.textFile("/root/wordcount.txt")
    
    # 3.使用 rdd.collect() 收集 rdd 的内容。 rdd.collect() 是 Spark Action 算子,在后续内容中将会详细说明,主要作用是:收集 rdd 的数据内容
    s = distFile.collect()
    
    # 4.打印 rdd 的内容
    print(s)

    # 5.停止 SparkContext
    sc.stop()

    #********** End **********#

2). 使用 map 算子,将偶数转换成该数的平方;奇数转换成该数的立方。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")

    # 2.创建一个1到5的列表List
    list1 = [1,2,3,4,5] 

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(list1)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
    需求:
        偶数转换成该数的平方
        奇数转换成该数的立方
    """
    # 5.使用 map 算子完成以上需求
    rdd_map = rdd.map(lambda x : x*x if x%2 == 0 else x*x*x)
    
    # 6.使用rdd.collect() 收集完成 map 转换的元素
    print(rdd_map.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

3).使用 mapPartitions 算子,将字符串与该字符串的长度组合成一个元组。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

#********** Begin **********#
def f(iterator):
    list1 = []
    for x in iterator:
        length = len(x)
        list1.append((x,length))
    return list1

#********** End **********#

if __name__ == "__main__":
    #********** Begin **********#
    
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
    data = ["dog", "salmon", "salmon", "rat", "elephant"]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
    需求:
        将字符串与该字符串的长度组合成一个元组,例如:
        dog  -->  (dog,3)
        salmon   -->  (salmon,6)
    """

    # 5.使用 mapPartitions 算子完成以上需求
    partitions = rdd.mapPartitions(f)

    # 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
    print(partitions.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

4).使用 filter 算子,过滤掉rdd(1, 2, 3, 4, 5, 6, 7, 8) 中的所有奇数。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个1到8的列表List
    data = [1,2,3,4,5,6,7,8]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:
    需求:
        过滤掉rdd中的奇数
    """
    # 5.使用 filter 算子完成以上需求
    rdd_filter = rdd.filter(lambda x:x%2==0)

    # 6.使用rdd.collect() 收集完成 filter 转换的元素
    print(rdd_filter.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

5) . 使用 flatMap 算子,合并RDD的元素:([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
       #********** Begin **********#
       
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")
    # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List
    data = [[1,2,3],[4,5,6],[7,8,9]]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())
    """
        使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
        需求:
            合并RDD的元素,例如:
                            ([1,2,3],[4,5,6])  -->  (1,2,3,4,5,6)
                            ([2,3],[4,5],[6])  -->  (1,2,3,4,5,6)
        """
    # 5.使用 filter 算子完成以上需求
    flat_map = rdd.flatMap(lambda x:x)
    # 6.使用rdd.collect() 收集完成 filter 转换的元素
    print(flat_map.collect())
    # 7.停止 SparkContext
    sc.stop()
    #********** End **********#

6). 使用 distinct 算子,将 rdd 中的数据进行去重。


# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List
    data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
       使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素去重,例如:
                        1,2,3,3,2,1  --> 1,2,3
                        1,1,1,1,     --> 1
       """
    # 5.使用 distinct 算子完成以上需求
    distinct = rdd.distinct()

    # 6.使用rdd.collect() 收集完成 distinct 转换的元素
    print(distinct.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

7). 使用 sortBy 算子,将 rdd 中的数据进行排序(升序)

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())


    """
       使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            5,4,3,1,2  --> 1,2,3,4,5
       """
    # 5.使用 sortBy 算子完成以上需求
    by = rdd.sortBy(lambda x: x)

    # 6.使用rdd.collect() 收集完成 sortBy 转换的元素
    print(by.collect())

    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

8). 使用 sortByKey 算子,将 rdd 中的数据进行排序(升序)。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List
    data=[('B',1),('A',2),('C',3)]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
       使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            [(3,3),(2,2),(1,1)]  -->  [(1,1),(2,2),(3,3)]
       """
    # 5.使用 sortByKey 算子完成以上需求
    key = rdd.sortByKey()

    # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
    print(key.collect())

    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

9). 使用mapValues算子,将偶数转换成该数的平方,奇数转换成该数的立方

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List
    data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
           使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
           需求:
               元素(key,value)的value进行以下操作:
                                                偶数转换成该数的平方
                                                奇数转换成该数的立方
    """
    # 5.使用 mapValues 算子完成以上需求
    values = rdd.mapValues(lambda x: x*x if x%2==0 else x*x*x)

    # 6.使用rdd.collect() 收集完成 mapValues 转换的元素
    print(values.collect())

    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

10).使用 reduceByKey 算子,将 rdd(key-value类型) 中的数据进行值累加。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
    data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
          使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
          需求:
              元素(key-value)的value累加操作,例如:
                                                (1,1),(1,1),(1,2)  --> (1,4)
                                                (1,1),(1,1),(2,2),(2,2)  --> (1,2),(2,4)
    """
    # 5.使用 reduceByKey 算子完成以上需求
    # ruduceBy = rdd.reduceByKey(lambda x,y:x+y).collect()

    # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
    print(rdd.reduceByKey(lambda x,y:x+y).collect())

    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

11).Actions - 常用算子

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local", "Simple App")

    # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2];

    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)

    # 4.收集rdd的所有元素并print输出
    print(rdd.collect())

    # 5.统计rdd的元素个数并print输出
    print(rdd.count())

    # 6.获取rdd的第一个元素并print输出
    print(rdd.first())

    # 7.获取rdd的前3个元素并print输出
    print(rdd.take(3))

    # 8.聚合rdd的所有元素并print输出
    print(rdd.reduce(lambda x,y:x+y))

    # 9.停止 SparkContext
    sc.stop()

    # ********** End **********#

6. SparkSQL

在这里插入图片描述

1). 使用Spark SQL统计战斗机飞行性能


# coding=utf-8


from pyspark.sql import SparkSession

#**********Begin**********#

#创建SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .master("local") \
    .getOrCreate()
#读取/root/jun.json中数据
df =spark.read.json("/root/jun.json")
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球飞行速度排名前三的战斗机
sqlDF = spark.sql("select cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),',','') as float) as SPEED, `名称` from table1 order by SPEED desc LIMIT 3")
#保存结果
sqlDF.write.format("csv").save("/root/airspark")

#**********End**********#
spark.stop()

2). 使用Spark SQL统计各个研发单位研制战斗机占比


# coding=utf-8


from pyspark.sql import SparkSession

#**********Begin**********#

#创建SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .master("local") \
    .getOrCreate()
    
#读取/root/jun.json中数据
df =spark.read.json("/root/jun.json").coalesce(1)
#创建视图
df.createOrReplaceTempView("table1")

#统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比
sqlDF = spark.sql("select concat(cast(round(count(`研发单位`)*100/(select count(`研发单位`) from table1 where `研发单位` is not null and `名称` is not null ),2) as float),'%'),`研发单位` from table1 where `研发单位` is not null and `名称` is not null group by `研发单位`")

#保存结果
sqlDF.write.format("csv").save("/root/airspark")
#**********End**********#

spark.stop()

3).出租车轨迹数据清洗

# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
    
    #**********begin**********#
    df = spark.read.option("header", True).option("delimiter", "\t").csv("/root/data.csv")
    df.createOrReplaceTempView("table1")
    DataFrame = spark.sql(
    """
    select\
    regexp_replace(TRIP_ID,'[$@]','')  TRIP_ID,regexp_replace(CALL_TYPE,'[$@]','') CALL_TYPE,regexp_replace(ORIGIN_CALL,'[$@]','') ORIGIN_CALL,regexp_replace(TAXI_ID,'[$@]','') TAXI_ID,regexp_replace(ORIGIN_STAND,'[$@]','') ORIGIN_STAND,regexp_replace(TIMESTAMP,'[$@]','') TIMESTAMP, regexp_replace(POLYLINE,'[$@,-.\\\\[\\\\]]','') POLYLINE\
    from table1
    """)
    DataFrame.show()
    #**********end**********#
    spark.stop()

标签:总结,__,复习,collect,SparkContext,rdd,期末,print,Spark
来源: https://blog.csdn.net/Zheng_lan/article/details/111413480

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有