Spark Real-Time Comprehensive Case Study

Before developing the application, start all of the big-data framework services that this case study depends on:
# Start HDFS
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
# Start YARN
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager
# Start MRHistoryServer
mr-jobhistory-daemon.sh start historyserver
# Start Spark HistoryServer
/export/server/spark/sbin/start-history-server.sh
# Start Zookeeper
zookeeper-daemon.sh start
# Start Kafka
kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
# Start HBase
hbase-daemon.sh start master
hbase-daemon.sh start regionserver
# Start Elasticsearch
elasticsearch-daemon.sh start
# Start Redis
/export/server/redis/bin/redis-server /export/server/redis/conf/redis.conf
The big-data frameworks involved in this real-time case study are essentially the ones used in enterprise real-time systems; by integrating them in a single case, you can further consolidate your grasp of these frameworks and how to apply them.

1.2.2 Application Development Environment

Create a Maven Module under the Maven Project created earlier, and add the required dependencies to its pom.xml:
<!-- Repository locations: aliyun, cloudera, and jboss, in that order -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<spark.version>2.4.5</spark.version>
<hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
<hbase.version>1.2.0-cdh5.16.2</hbase.version>
<kafka.version>2.0.0</kafka.version>
<mysql.version>8.0.19</mysql.version>
</properties>
<dependencies>
<!-- Scala language dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Structured Streaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming integration with Kafka 0.10.0 -->
<!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
-->
<!-- Spark Streaming integration with Kafka 0.8.2.1 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop Client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- HBase Client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop2-compat</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- MySQL Client -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- Configuration management (Typesafe Config) -->
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.2.1</version>
</dependency>
<!-- Resolve IP addresses to province/city (ip2region) -->
<dependency>
<groupId>org.lionsoul</groupId>
<artifactId>ip2region</artifactId>
<version>1.7.2</version>
</dependency>
<!-- JSON parsing library: fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.0.0</version>
</dependency>
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis_2.11</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven compiler plugins -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Following the layered structure of the application, create the corresponding directories and packages under the src source directory. Then add the properties file config.properties under src/main/resources with the following content:
# local mode
app.is.local=true
app.spark.master=local[3]
# kafka config
kafka.bootstrap.servers=node1.itcast.cn:9092
kafka.auto.offset.reset=largest
kafka.source.topics=orderTopic
kafka.etl.topic=orderEtlTopic
kafka.max.offsets.per.trigger=100000
# Kafka Consumer Group ID
streaming.etl.group.id=order-etl-1000
# Zookeeper Server
kafka.zk.url=node1.itcast.cn:2181/kafka200
# streaming checkpoint
streaming.etl.ckpt=datas/order-apps/ckpt/etl-ckpt/
streaming.hbase.ckpt=datas/order-apps/ckpt/hbase-ckpt/
streaming.es.ckpt=datas/order-apps/ckpt/es-ckpt/
streaming.amt.total.ckpt=datas/order-apps/ckpt/amt-total-ckpt/
streaming.amt.province.ckpt=datas/order-apps/ckpt/amt-province-ckpt/
streaming.amt.city.ckpt=datas/order-apps/ckpt/amt-city-ckpt/
##streaming.etl.ckpt=/spark/order-apps/ckpt/etl-ckpt/
##streaming.hbase.ckpt=/spark/order-apps/ckpt/hbase-ckpt/
##streaming.es.ckpt=/spark/order-apps/ckpt/es-ckpt/
##streaming.amt.total.ckpt=/spark/order-apps/ckpt/amt-total-ckpt/
##streaming.amt.province.ckpt=/spark/order-apps/ckpt/amt-province-ckpt/
##streaming.amt.city.ckpt=/spark/order-apps/ckpt/amt-city-ckpt/
# streaming stop file
stop.etl.file=datas/order-apps/stop/etl-stop
stop.hbase.file=datas/order-apps/stop/hbase-stop
stop.es.file=datas/order-apps/stop/es-stop
stop.state.file=datas/order-apps/stop/state-stop
##stop.etl.file=/spark/order-apps/stop/etl-stop
##stop.hbase.file=/spark/order-apps/stop/hbase-stop
##stop.es.file=/spark/order-apps/stop/es-stop
##stop.state.file=/spark/order-apps/stop/state-stop
# HBase Config
hbase.zk.hosts=node1.itcast.cn
hbase.zk.port=2181
hbase.zk.znode=/hbase
hbase.order.table=htb_orders
hbase.table.family=info
hbase.table.columns=orderId,userId,orderTime,ip,orderMoney,orderStatus,province,city
# Elasticsearch Config
es.nodes=node1.itcast.cn
es.port=9200
es.index.auto.create=true
es.write.operation=upsert
es.index.name=orders/index
es.mapping.id=orderId
# Redis Config
redis.host=node1.itcast.cn
redis.port=6379
redis.db=0
# Dictionary data for IP-to-region lookup
ipdata.region.path=dataset/ip2region.db
##ipdata.region.path=hdfs://node1.itcast.cn:8020/spark/dataset/ip2region.db
During development the application runs in local mode and the related data is stored on the local file system; in the test and production environments HDFS is used instead (the commented-out entries above). Next, write the utility object ApplicationConfig, located in the cn.itcast.spark.config package, to load this properties file:
package cn.itcast.spark.config
import com.typesafe.config.{Config, ConfigFactory}
/**
* Load the application properties file config.properties and expose its values
*/
object ApplicationConfig {
// Load the properties file
private val config: Config = ConfigFactory.load("config.properties")
/*
Run mode: local for development and testing; in test/production the master is passed via --master
*/
lazy val APP_LOCAL_MODE: Boolean = config.getBoolean("app.is.local")
lazy val APP_SPARK_MASTER: String = config.getString("app.spark.master")
/*
Kafka configuration
*/
lazy val KAFKA_BOOTSTRAP_SERVERS: String = config.getString("kafka.bootstrap.servers")
lazy val KAFKA_AUTO_OFFSET_RESET: String = config.getString("kafka.auto.offset.reset")
lazy val KAFKA_SOURCE_TOPICS: String = config.getString("kafka.source.topics")
lazy val KAFKA_ETL_TOPIC: String = config.getString("kafka.etl.topic")
lazy val KAFKA_MAX_OFFSETS: String = config.getString("kafka.max.offsets.per.trigger")
lazy val KAFKA_ZK_URL: String = config.getString("kafka.zk.url")
lazy val STREAMING_ETL_GROUP_ID: String = config.getString("streaming.etl.group.id")
/*
Checkpoint directories for the streaming applications
*/
lazy val STREAMING_ETL_CKPT: String = config.getString("streaming.etl.ckpt")
lazy val STREAMING_HBASE_CKPT: String = config.getString("streaming.hbase.ckpt")
lazy val STREAMING_ES_CKPT: String = config.getString("streaming.es.ckpt")
lazy val STREAMING_AMT_TOTAL_CKPT: String = config.getString("streaming.amt.total.ckpt")
lazy val STREAMING_AMT_PROVINCE_CKPT: String = config.getString("streaming.amt.province.ckpt")
lazy val STREAMING_AMT_CITY_CKPT: String = config.getString("streaming.amt.city.ckpt")
/*
Stop-marker files used to gracefully shut down the streaming applications
*/
lazy val STOP_ETL_FILE: String = config.getString("stop.etl.file")
lazy val STOP_HBASE_FILE: String = config.getString("stop.hbase.file")
lazy val STOP_ES_FILE: String = config.getString("stop.es.file")
lazy val STOP_STATE_FILE: String = config.getString("stop.state.file")
/*
HBase connection settings and table information
*/
lazy val HBASE_ZK_HOSTS: String = config.getString("hbase.zk.hosts")
lazy val HBASE_ZK_PORT: String = config.getString("hbase.zk.port")
lazy val HBASE_ZK_ZNODE: String = config.getString("hbase.zk.znode")
lazy val HBASE_ORDER_TABLE: String = config.getString("hbase.order.table")
lazy val HBASE_ORDER_TABLE_FAMILY: String = config.getString("hbase.table.family")
lazy val HBASE_ORDER_TABLE_COLUMNS: Array[String] = config.getString("hbase.table.columns").split(",")
/*
Elasticsearch connection settings
*/
lazy val ES_NODES: String = config.getString("es.nodes")
lazy val ES_PORT: String = config.getString("es.port")
lazy val ES_INDEX_AUTO_CREATE: String = config.getString("es.index.auto.create")
lazy val ES_WRITE_OPERATION: String = config.getString("es.write.operation")
lazy val ES_INDEX_NAME: String = config.getString("es.index.name")
lazy val ES_MAPPING_ID: String = config.getString("es.mapping.id")
/*
Redis database settings
*/
lazy val REDIS_HOST: String = config.getString("redis.host")
lazy val REDIS_PORT: String = config.getString("redis.port")
lazy val REDIS_DB: String = config.getString("redis.db")
// Storage path of the ip2region dictionary file used to resolve IP addresses
lazy val IPS_DATA_REGION_PATH: String = config.getString("ipdata.region.path")
}
Each property is declared with lazy, meaning it is initialized lazily: the value is computed only the first time the variable is accessed.
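As a minimal illustration of this behavior (a standalone sketch, not part of the project code; the object and value names are made up), the body of a lazy val runs only on first access:

object LazyDemo {
  def main(args: Array[String]): Unit = {
    // The right-hand side does not run here; it runs the first time NAME is read
    lazy val NAME: String = { println("initializing NAME"); "order-etl" }
    println("before first access")
    println(NAME) // prints "initializing NAME" and then "order-etl"
    println(NAME) // already initialized, prints only "order-etl"
  }
}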
Next, write the utility object SparkUtils, located in the cn.itcast.spark.utils package, which builds SparkSession and StreamingContext instances:
package cn.itcast.spark.utils
import cn.itcast.spark.config.ApplicationConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Utility object for building SparkSession and StreamingContext instances
*/
object SparkUtils {
/**
* Build a SparkSession instance
* @param clazz Class object of the Spark application
* @return SparkSession instance
*/
def createSparkSession(clazz: Class[_]): SparkSession = {
// 1. Build the SparkConf object
val sparkConf: SparkConf = new SparkConf()
.setAppName(clazz.getSimpleName.stripSuffix("$"))
.set("spark.debug.maxToStringFields", "2000")
.set("spark.sql.debug.maxToStringFields", "2000")
// 2. If the application runs in local mode, apply the local-mode settings
if(ApplicationConfig.APP_LOCAL_MODE){
sparkConf
.setMaster(ApplicationConfig.APP_SPARK_MASTER)
// Number of partitions used for shuffles
.set("spark.sql.shuffle.partitions", "3")
}
// 3. Build the SparkSession instance
val session: SparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
// 4. Return the instance
session
}
/**
* Build a StreamingContext instance
* @param clazz Class object of the Spark application
* @param batchInterval batch interval in seconds
*/
def createStreamingContext(clazz: Class[_], batchInterval: Int): StreamingContext = {
// Get the active StreamingContext or create a new one
val context: StreamingContext = StreamingContext.getActiveOrCreate(
() => {
// 1. Build the SparkConf object
val sparkConf: SparkConf = new SparkConf()
.setAppName(clazz.getSimpleName.stripSuffix("$"))
.set("spark.debug.maxToStringFields", "2000")
.set("spark.sql.debug.maxToStringFields", "2000")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
// 2. If the application runs in local mode, apply the local-mode settings
if(ApplicationConfig.APP_LOCAL_MODE){
sparkConf
.setMaster(ApplicationConfig.APP_SPARK_MASTER)
// Maximum number of records consumed per partition per batch; in production set this on the command line
.set("spark.streaming.kafka.maxRatePerPartition", "10000")
}
// 3. Create the StreamingContext object
new StreamingContext(sparkConf, Seconds(batchInterval))
}
)
context // return the StreamingContext
}
}

 

The properties set above for local-mode development are, in the test and production environments, specified with --conf when the application is submitted via spark-submit.
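For example, an application entry point would typically obtain its SparkSession from SparkUtils and leave cluster-specific settings to spark-submit. The following is only a sketch: the object name OrderDemoApp is made up, and the spark-submit flags in the comment are the usual ones and should be adapted to the actual cluster.

package cn.itcast.spark.demo

import cn.itcast.spark.utils.SparkUtils
import org.apache.spark.sql.SparkSession

object OrderDemoApp {
  def main(args: Array[String]): Unit = {
    // In local mode the master and spark.sql.shuffle.partitions come from config.properties;
    // on a cluster they would instead be passed on submission, e.g.
    //   spark-submit --master yarn --conf spark.sql.shuffle.partitions=9 ... order-apps.jar
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)

    // ... build the batch or streaming logic here ...

    spark.stop()
  }
}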

1.4 Simulating Transaction Order Data

Write a program that simulates transaction order data and sends it to a Kafka topic in real time. For simplicity an order only carries the fields listed below, encapsulated in the case class OrderRecord:
/**
* Order entity (case class)
* @param orderId     order ID
* @param userId      user ID
* @param orderTime   order date and time
* @param ip          IP address the order was placed from
* @param orderMoney  order amount
* @param orderStatus order status
*/
case class OrderRecord(
orderId: String,
userId: String,
orderTime: String,
ip: String,
orderMoney: Double,
orderStatus: Int
)
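Since every Kafka message carries one order as a JSON string, downstream applications need to map it back to OrderRecord. Below is a minimal sketch using json4s (the same library the mock producer in 1.4.2 uses for writing); the sample JSON string is hand-written, and the sketch assumes OrderRecord lives in the cn.itcast.spark.mock package.

package cn.itcast.spark.mock

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods

object OrderRecordJsonDemo {
  def main(args: Array[String]): Unit = {
    implicit val formats: DefaultFormats.type = DefaultFormats
    // A hand-written message in the same shape the mock producer emits
    val json =
      """{"orderId":"20220505120000000000001","userId":"300000123","orderTime":"2022-05-05 12:00:00.000","ip":"106.80.1.1","orderMoney":88.06,"orderStatus":0}"""
    // Parse the JSON string and extract it into the case class
    val order: OrderRecord = JsonMethods.parse(json).extract[OrderRecord]
    println(order.orderId + " -> " + order.orderMoney)
  }
}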

1.4.1 Creating the Topics

Throughout the case study, raw transaction order data is stored in the topic orderTopic, and the order data after ETL is stored in orderEtlTopic. The commands for creating and managing these topics are as follows:
# Start Zookeeper
zookeeper-daemon.sh start
# Start the Kafka broker
kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
# List topics
kafka-topics.sh --list --zookeeper node1.itcast.cn:2181/kafka200
# Create the source topic
kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-factor 1 --partitions 3 --topic orderTopic
# Console producer (for testing)
kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic orderTopic
# Console consumer (for testing)
kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderTopic --from-beginning
# Delete the topic
kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic orderTopic
# Create the ETL topic
kafka-topics.sh --create --zookeeper node1.itcast.cn:2181/kafka200 --replication-factor 1 --partitions 3 --topic orderEtlTopic
# Console consumer (for testing)
kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic orderEtlTopic --from-beginning
# Delete the topic
kafka-topics.sh --delete --zookeeper node1.itcast.cn:2181/kafka200 --topic orderEtlTopic
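The topics can also be created from code rather than from the shell. The sketch below uses Kafka's AdminClient API, assuming the kafka-clients jar (2.0.0) is on the classpath through the spark-sql-kafka-0-10 dependency; the object name is made up, and only the broker address from the configuration above is reused.

import java.util.{Arrays, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateOrderTopics {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092")
    val admin: AdminClient = AdminClient.create(props)
    try {
      // 3 partitions and replication factor 1, matching the kafka-topics.sh commands above
      val topics = Arrays.asList(
        new NewTopic("orderTopic", 3, 1.toShort),
        new NewTopic("orderEtlTopic", 3, 1.toShort)
      )
      admin.createTopics(topics).all().get()
    } finally {
      admin.close()
    }
  }
}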

1.4.2 Simulating the Data

Write a program that continuously generates transaction order data, converts each record to a JSON string with the json4s library, and sends it to the Kafka topic. The code is as follows:
package cn.itcast.spark.mock
import java.util.Properties
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json
import scala.util.Random
/**
* Simulate order data and send it to a Kafka topic.
* Each message in the topic is a String containing one order in JSON format.
* Data conversion: an OrderRecord instance is converted to a JSON string (using the json4s library).
*/
object MockOrderProducer {
def main(args: Array[String]): Unit = {
var producer: KafkaProducer[String, String] = null
try {
// 1. Kafka producer client configuration
val props = new Properties()
props.put("bootstrap.servers", "node1.itcast.cn:9092")
props.put("acks", "1")
props.put("retries", "3")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
// 2. Create the KafkaProducer with the above configuration
producer = new KafkaProducer[String, String](props)
// Random number generator
val random: Random = new Random()
// Order status: 0 = open, 1 = cancelled, 2 = closed, 3 = completed
val allStatus =Array(0, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
while(true){
// Number of orders to generate in this iteration
val batchNumber: Int = random.nextInt(2) + 1
(1 to batchNumber).foreach{ number =>
val currentTime: Long = System.currentTimeMillis()
val orderId: String = s"${getDate(currentTime)}%06d".format(number)
val userId: String = s"${1 + random.nextInt(5)}%08d".format(random.nextInt(1000))
val orderTime: String = getDate(currentTime, format="yyyy-MM-dd HH:mm:ss.SSS")
val orderMoney: String = s"${5 + random.nextInt(500)}.%02d".format(random.nextInt(100))
val orderStatus: Int = allStatus(random.nextInt(allStatus.length))
// 3. Build the order record
val orderRecord: OrderRecord = OrderRecord(
orderId, userId, orderTime, getRandomIp, orderMoney.toDouble, orderStatus
)
// Convert it to a JSON string
val orderJson = new Json(org.json4s.DefaultFormats).write(orderRecord)
println(orderJson)
// 4. Build the ProducerRecord
val record = new ProducerRecord[String, String]("orderTopic", orderJson)
// 5. Send the record to the topic
producer.send(record)
}
Thread.sleep(random.nextInt(10) * 100 + 500)
}
} catch {
case e: Exception => e.printStackTrace()
}finally {
if(null != producer) producer.close()
}
}
/** ================= Format a timestamp as a date string ================= */
def getDate(time: Long, format: String = "yyyyMMddHHmmssSSS"): String = {
val fastFormat: FastDateFormat = FastDateFormat.getInstance(format)
val formatDate: String = fastFormat.format(time) // format the date
formatDate
}
/** ================= Generate a random IP address ================= */
def getRandomIp: String = {
// IP address ranges (stored as signed Int values)
val range: Array[(Int, Int)] = Array(
(607649792,608174079), //36.56.0.0-36.63.255.255
(1038614528,1039007743), //61.232.0.0-61.237.255.255
(1783627776,1784676351), //106.80.0.0-106.95.255.255
(2035023872,2035154943), //121.76.0.0-121.77.255.255
(2078801920,2079064063), //123.232.0.0-123.235.255.255
(-1950089216,-1948778497),//139.196.0.0-139.215.255.255
(-1425539072,-1425014785),//171.8.0.0-171.15.255.255
(-1236271104,-1235419137),//182.80.0.0-182.92.255.255
(-770113536,-768606209),//210.25.0.0-210.47.255.255
(-569376768,-564133889) //222.16.0.0-222.95.255.255
)
// Randomly pick one of the ranges
val random = new Random()
val index = random.nextInt(10)
val ipNumber: Int = range(index)._1 + random.nextInt(range(index)._2 - range(index)._1)
// Convert the Int IP value to dotted IPv4 format
number2IpString(ipNumber)
}
/** ================= Convert an Int IPv4 address to its string form ================= */
def number2IpString(ip: Int): String = {
val buffer: Array[Int] = new Array[Int](4)
buffer(0) = (ip >> 24) & 0xff
buffer(1) = (ip >> 16) & 0xff
buffer(2) = (ip >> 8) & 0xff
buffer(3) = ip & 0xff
// Return the IPv4 address string
buffer.mkString(".")
}
}
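To verify end to end that the mock producer is publishing data, besides kafka-console-consumer.sh you can run a minimal Structured Streaming job that subscribes to the source topic and prints the raw messages to the console. This is only a sketch (the object name is made up; it reuses ApplicationConfig and SparkUtils from the sections above):

package cn.itcast.spark.mock

import cn.itcast.spark.config.ApplicationConfig
import cn.itcast.spark.utils.SparkUtils
import org.apache.spark.sql.{DataFrame, SparkSession}

object MockOrderConsoleCheck {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)

    // Read the raw order messages from the source topic as strings
    val kafkaStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", ApplicationConfig.KAFKA_BOOTSTRAP_SERVERS)
      .option("subscribe", ApplicationConfig.KAFKA_SOURCE_TOPICS)
      .load()
      .selectExpr("CAST(value AS STRING) AS message")

    // Print each micro-batch to the console for a quick sanity check
    val query = kafkaStreamDF.writeStream
      .format("console")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
  }
}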

 

 

 

 

Source: https://www.cnblogs.com/wq-9/p/16224466.html
