ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spark在kafka读数并发问题

2019-02-27 15:52:11  阅读:245  来源: 互联网

标签:读数 val 分区 partition Kafka topic subconcurrency spark kafka


也就是修改了 KafkaRDD 类的 getPartitions 方法:
原实现:
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}
修改后的实现:
override def getPartitions: Array[Partition] = {

val subconcurrency = if (kafkaParams.contains(“topic.partition.subconcurrency”))
kafkaParams.getOrElse(“topic.partition.subconcurrency”,“1”).toInt
else 1
val numPartitions = offsetRanges.length

val subOffsetRanges: Array[OffsetRange] = new Array[OffsetRange](subconcurrency * numPartitions)
for (i <- 0 until numPartitions) {
val offsetRange = offsetRanges(i)
val step = (offsetRange.untilOffset - offsetRange.fromOffset) / subconcurrency

var from = -1L
var until = -1L

for (j <- 0 until subconcurrency) {
  from = offsetRange.fromOffset + j * step
  until = offsetRange.fromOffset + (j + 1) * step -1
  if (j == subconcurrency) {
    until = offsetRange.untilOffset
  }
  subOffsetRanges(i * subconcurrency + j) = OffsetRange.create(offsetRange.topic, offsetRange.partition, from, until)
}

}

subOffsetRanges.zipWithIndex.map{ case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray

}
这个方法的实现思想还是很简单的,就是通过设置 topic.partition.subconcurrency 参数,如果这个参数等于1,整个函数的执行效果和之前一样。但是如果这个参数大于1,则之前一个 Kafka 分区由一个 Spark 分区消费的数据变成由 topic.partition.subconcurrency 个 Spark 分区去消费,每个 Spark 分区消费的数据量相等。这个无疑会加快 Kafka 数据的消费,但是这种方法的问题也很明显:

如果数据的顺序很重要,这种方法会存在乱序的问题。
Spark 设计的 KafkaRDD 目的是让 Kafka Partition 和 Spark RDD Partition 一一对应,这样可以保证同一个分区里面的数据顺序,但是这种方法实现变成了 Kafka Partition 和 Spark RDD Partition 一对多的关系,无疑破坏了官方的原有设计。

到目前为止,上述 PR 被关闭,而且 SPARK-22056 一直处于 IN PROGRESS 状态,我猜这个最后可能也会被关闭掉。

那除了上面实现,我们还有其他实现吗?当然有,我们可以在处理数据之前通过 repartition 或 coalease 对数据进行重分区:

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](streamingContext, kafkaParams, topics).repartition(xxx).mapPartition(xxx)
这种方法的好处是,对同一类型的数据,先后顺序是不会乱的,因为同一类型的数据经过重分区最后还是会分发到同一个分区里面的。

但是这个方法的使用前提是数据重分区+后续处理的时间比没有重分区直接处理数据的时间要短,否则重分区的开销过大导致总的处理时间过长那就没意义了。

当然,我们可以可以通过在 RDD#mapPartitions 里面创建多个线程来处理同一个 RDD 分区里面的数据。

但是上面两种方法无法解决 Kafka 端数据倾斜导致的数据处理过慢的问题(也就是有些分区数据量相比其他分区大很多,光是从这些分区消费数据的时间就比其他分区要长很多)。针对这种情况,我们需要考虑 Kafka 分区设置是否合理?是否能够通过修改 Kafka 分区的实现来解决数据倾斜的问题。

如果不是 Kafka 数据倾斜导致的数据处理过慢的问题,而是所有 Kafka 分区的整体数据量就比较大,那这种情况我们可以考虑是否可以增加 Kafka 分区数?是否需要增加 Spark 的处理资源等。建议最好还是别使用多个线程处理同一个 Kafka 分区里面的数据。

标签:读数,val,分区,partition,Kafka,topic,subconcurrency,spark,kafka
来源: https://blog.csdn.net/weixin_43967444/article/details/87975929

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有