ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spark stream消费kafka Exactly-once

2021-04-03 20:33:57  阅读:243  来源: 互联网

标签:String val Exactly 偏移量 kafka topic offset stream 数据


  • 精确一次消费(Exactly-once)
    是指消息一定会被处理且只会被处理一次。不多不少就一次处理
  • 至少一次消费(at least once)
    主要是保证数据不会丢失,但有可能存在数据重复问题
  • 最多一次消费 (at most once)
    主要是保证数据不会重复,但有可能存在数据丢失问题

数据丢失

  • 实时计算任务进行计算,到数据结果存盘之前,进程崩溃,假设在进程崩溃前 kafka
    调整了偏移量,那么 kafka 就会认为数据已经被处理过,即使进程重启,kafka 也会从新的
    偏移量开始,所以之前没有保存的数据就被丢失掉了
    数据丢失

重复消费

  • 如果数据计算结果已经存盘了,在 kafka 调整偏移量之前,进程崩溃,那么 kafka 会
    认为数据没有被消费,进程重启,会重新从旧的偏移量开始,那么数据就会被 2 次消费,
    又会被存盘,数据就被存了 2 遍,造成数据重复。
    kafka重复消费
目前 Kafka 默认每 5 秒钟做一次自动提交偏移量,这样并不能保证精准一次消费
enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。
auto.commit.interval.ms 的默认值是 5000,单位是毫秒。

利用关系型数据库的事务进行处理

  • 偏移量的提交与数据的保存,不是原子性的。如果能做成要么数据保存和偏移量都成功,要么两个失败,那么就不会出现丢失或者重复了。
  • 数据必须都要放在某一个关系型数据库中,无法使用其他功能强大的 nosql 数据库
  • 事务本身性能不好
  • 如果保存的数据量较大一个数据库节点不够,多个节点的话,还要考虑分布式事务
    的问题。分布式事务会带来管理的复杂性,一般企业不选择使用,有的企业会把分
    布式事务变成本地事务,例如把 Executor 上的数据通过 rdd.collect 算子提取到
    Driver 端,由 Driver 端统一写入数据库,这样会将分布式事务变成本地事务的单
    线程操作,降低了写入的吞吐量
    关系数据库事务处理

手动提交偏移量+幂等性处理

  • 数据丢失问题,办法就是要等数据保存成功后再提交偏移量,所以就必须手工
    来控制偏移量的提交时机
  • 把数据的保存做成幂等性保存。即同一批数据反复保存多次,数据不会翻倍,保存一次和保
    存一百次的效果是一样的。利用数据存储的唯一约束、索引、主键保证插入、更新操作的幂等性
    手动offset
xxDstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
因为 offset 的存储于 HasOffsetRanges,只有 kafkaRDD 继承了他,所以假如我们对
KafkaRDD 进行了转化之后,其它 RDD 没有继承 HasOffsetRanges,所以就无法再获取
offset 了。
利用 ZooKeeper,Redis,Mysql 等工具手动对偏移量进行保存

kafka offset

  • 从Redis中获取偏移量
// type:hash   key: offset:topic:groupId   field:partition   value: 偏移量
  def getOffset(topic: String, groupId: String): Map[TopicPartition, Long] = {
    //获取客户端连接
    val jedis: Jedis = MyRedisUtil.getJedisClient()
    //拼接操作redis的key     offset:topic:groupId
    var offsetKey = "offset:" + topic + ":" + groupId
    //获取当前消费者组消费的主题  对应的分区以及偏移量
    val offsetMap: util.Map[String, String] = jedis.hgetAll(offsetKey)
    //关闭客户端
    jedis.close()

    //将java的map转换为scala的map
    import scala.collection.JavaConverters._
    val oMap: Map[TopicPartition, Long] = offsetMap.asScala.map {
      case (partition, offset) => {
        println("读取分区偏移量:" + partition + ":" + offset)
        //Map[TopicPartition,Long]
        (new TopicPartition(topic, partition.toInt), offset.toLong)
      }
    }.toMap
    
    oMap
  }
  • 将偏移量信息保存到Redis中
 def saveOffset(topic: String, groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
    //拼接redis中操作偏移量的key
    var offsetKey = "offset:" + topic + ":" + groupId
    //定义java的map集合,用于存放每个分区对应的偏移量
    val offsetMap: util.HashMap[String, String] = new util.HashMap[String, String]()

    //对offsetRanges进行遍历,将数据封装offsetMap
    for (offsetRange <- offsetRanges) {
      val partitionId: Int = offsetRange.partition
      val fromOffset: Long = offsetRange.fromOffset
      val untilOffset: Long = offsetRange.untilOffset
      offsetMap.put(partitionId.toString, untilOffset.toString)
    }

    val jedis: Jedis = MyRedisUtil.getJedisClient()
    jedis.hmset(offsetKey, offsetMap)
    jedis.close()
  }
  • 将数据批量的保存到ES中
 filteredDStream.foreachRDD {
      rdd => {
        //以分区为单位对数据进行处理
        rdd.foreachPartition {
          jsonObjItr => {
            val dauInfoList: List[(String, DauInfo)] = jsonObjItr.map {
              jsonObj => {
                .............
                val moreInfo = MoreInfo()
                (moreInfo.mac, moreInfo)
              }
            }.toList

            //将数据批量的保存到ES中
            val dt: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
            MyESUtil.bulkInsert(list, "index_more_info")
          }
        }
        //提交偏移量到Redis中
        OffsetManagerUtil.saveOffset(topic, groupId, offsetRanges)
      }

标签:String,val,Exactly,偏移量,kafka,topic,offset,stream,数据
来源: https://blog.csdn.net/wolfjson/article/details/115419601

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有