ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spark任务中空间数据的序列化

2021-10-01 13:01:01  阅读:207  来源: 互联网

标签:kryo 对象 Kryo 空间数据 空间 Spark 序列化


一、引言

 

Spark是目前主流的分布式计算框架,通过利用内存存储中间计算结果的方式,优化了MapReduce框架并不擅长的迭代式计算。同时,Spark使用有向无环图(Directed Acyclic Graph,DAG)统筹和优化整个计算流程。另外,Spark基于弹性分布式数据集RDD(Resilient Distributed Datasets)提供了丰富的数据分析算子,大大简化了分布式计算应用的开发难度。序列化和反序列是Spark的一项基本操作。Spark在执行计算任务的过程中,需要在不同的机器之间交换数据,这一过程称为洗牌(Shuffle)。机器之间交换内存对象需要通过序列化和反序列化来实现,Spark使用Kryo序列化框架将内存中的Java对象序列化成字节数组,然后通过网络传输到另一台机器并反序列化成内存中的对象。序列化的效率直接影响着Spark任务的计算性能,应该尽量确保序列化过程快速高效且序列化后的字节数组尽可能小,以减少网络开销,加速计算效率。空间对象是空间分析的主体,在基于Spark的空间分析中,空间对象的序列化是一个重要的优化环节,本文将介绍如何通过向Spark注册空间对象序列化器的方式优化基于Spark的空间分析性能。二、优化方案Kryo序列化机制支持用户注册自定义的序列化器,以供用户在开发过程中通过自定义序列化器的方法优化计算性能。如果直接使用Kryo序列化一个空间对象,Kryo则会在记录该对象字段值的基础上记录字段类型等一系列元信息,所以会降低序列化性能,并增加生成的字节数组。JTS(Java Topology Suite)空间拓扑工具包提供了空间对象与WKB(Well-known Binary)格式之间的转换,WKB是OGC标准定义的QQ买卖平台地图空间对象的二进制表示格式,该格式用最少的字节数表示一个空间对象的完整信息。可以将空间对象与WKB之间的转换过程对应到空间对象的序列化和反序列化过程,并将通过Kryo序列化器的形式注册到Spark的序列化过程中,以优化基于Spark的空间分析性能。三、代码实现WKB的转换利用JTS提供的WKBReader和WKBWriter两个类来实现,这两个类是非线程安全的,为了在Spark的并发场景中重复利用,一般使用ThreadLocal对其进行缓存,Scala代码实现如下。

objectWKBUtils {
  private val readerPool = newThreadLocal[WKBReader]{
    override def initialValue:WKBReader =new WKBReader
  }
  private val writerPool = newThreadLocal[WKBWriter]{
    override def initialValue:WKBWriter =new WKBWriter
  }
  def read(bytes: Array[Byte]): Geometry= readerPool.get.read(bytes)
  def write(geom: Geometry): Array[Byte]= writerPool.get.write(geom)
}

为空间对象创建Kryo序列化器。序列化的实现通过调用WKBUtils的read和write方法来完成,Scala代码实现如下。

classGeometrySerializer extends Serializer[Geometry] {
	override def write(kryo: Kryo, output:Output, geom: Geometry): Unit= {
		val spatialBytes: Array[Byte] =WKBUtils.write(geom)
		output.writeInt(spatialBytes.length)
		output.writeBytes(spatialBytes)
	}
	override def read(kryo: Kryo, input:Input, clazz: Class[Geometry]):Geometry= {
		val length = input.readInt()
		WKBUtils.read(input.readBytes(length))
	}
}

向Spark注册创建好的Kryo序列化器。该过程通过继承Spark的KryoRegistrator类实现,Scala代码实现如下。因为Point、LineString、Polygon、MultiPoint、MultiLineString、MultiPolygon这六类空间数据类型都是Geometry的子类,都可以通过WKB的格式表示,因此共用同一个GeometrySerializer对象。

classGeometryKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo:Kryo): Unit = {
    val geometrySerializer = new GeometrySerializer

    kryo.register(classOf[Point],geometrySerializer)
    kryo.register(classOf[LineString],geometrySerializer)
    kryo.register(classOf[Polygon],geometrySerializer)
    kryo.register(classOf[MultiPoint],geometrySerializer)
    kryo.register(classOf[MultiLineString],geometrySerializer)
    kryo.register(classOf[MultiPolygon],geometrySerializer)
  }
}

将创建好的序列化注册器作为SparkConf的参数,在创建SparkContext的时候传入,便可在Spark的计算中对空间对象使用WKB的格式进行序列化和反序列化。

val conf= new SparkConf().set("spark.serializer",classOf[KryoSerializer].getName)
 .set("spark.kryo.registrator",classOf[GeometryKryoRegistrator].getName)
val spark = new SparkContext(conf)


四、实验与结论为了验证向Spark注册空间对象序列化器的有效性,本文以10320条表示轨迹的LineString作为测试数据集,在PC机上用多线程并行计算的模式做了对比实验。对比原始的Kryo序列化器和自定义的Kryo序列化器,以序列化时长和序列化字节数为指标观察其性能差异,实验结果如下表所示:由实验结果可知,原始的Kryo序列化机制耗时是自定义序列化器的3倍左右,序列化所得的字节数是自定义序列化器的1.7倍左右。总之,在使用Spark进行空间数据分析时,通过Kryo给Spark指定空间对象的WKB序列化机制可以提高序列化效率,并减少机器之间的数据传输量。

标签:kryo,对象,Kryo,空间数据,空间,Spark,序列化
来源: https://www.cnblogs.com/qiucunxin/p/15359213.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有