ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Flink sink 到 kafka,并行度与分区的关系

2022-06-01 14:33:51  阅读:174  来源: 互联网

标签:10 partition Flink ts value 并行度 sink id user


Flink 版本: 1.15.0

问题

在社区看到以下问题:


请教个问题哈,sink 到 kafka,采用默认的分区器,是不是每个并行度都会与kafka的partition维护一个连接

比如 10 个并行度,3个 partition,那么维护的连接数总共为 10*3 个

?  是的

还是一个taskManager建立一个生产者 一个生产者对应多个分区

一个taskManager里面多个slot共享一个生产者? no

刚好想起,之前有个分析程序,用 FlinkKafkaProducer 写数据到 kafka,sink 只有一个并行度,sink 的 topic 有多个分区,数据永远只往 分区 0 发送数据

测试程序

来一个简单的测试程序:

  1. 读取 kafka 数据
  2. 来个 map 算子处理一下,在数据上加入当前的 subtask 的 index,标明数据是在哪个并行度处理的
  3. sink 数据到 kafka
  4. 写个简单的 java 消费者程序,讲数据的分区和数据内容输出

通过调整 flink 任务的并行度和 sink 的 topic 的分区数,测试以上问题

读取kafka,map算子给数据加上当前 subtask 数,标明数据是那个并行度的,最后 sink 到 kafka

object KafkaSinkTest {

  val LOG = LoggerFactory.getLogger("KafkaSinkTest")

  def main(args: Array[String]): Unit = {

    val topic = "user_log"
    val sinkTopic = "user_log_sink"

    // env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // global parllelism
    val parallelism = 1
    env.setParallelism(parallelism)

    // kafka source
    val kafkaSource = KafkaSource.builder[String]()
      .setBootstrapServers(Common.BROKER_LIST)
      .setTopics(topic)
      .setGroupId("KafkaSinkTest")
      .setStartingOffsets(OffsetsInitializer.latest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build();

    // kafka sink
    val kafkaSink = KafkaSink
      .builder[String]()
      .setBootstrapServers(bootstrapServer)
      .setKafkaProducerConfig(Common.getProp)
      .setRecordSerializer(KafkaRecordSerializationSchema.builder[String]()
        .setTopic(sinkTopic)
        // 不指定 key 的序列号器,key 会为 空
//        .setKeySerializationSchema(new SimpleStringSchema())
        .setValueSerializationSchema(new SimpleStringSchema())
        .build()
      )
      .build()

    // add source,读取数据
    val sourceStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource")

    // map, add current subtask index
    val mapStream = sourceStream
     // rebalance data to all parallelisn
      .rebalance
      .flatMap(new RichFlatMapFunction[String, String] {
        override def flatMap(element: String, out: Collector[String]): Unit = {
          val parallelism = getRuntimeContext.getIndexOfThisSubtask
          out.collect(parallelism + "," + element)

        }
      })
      .name("flatMap")
      .uid("flatMap")

    // sink to kafka, new api
    mapStream.sinkTo(kafkaSink)

    // sink to kafka, old api
    //    val kafkaProducer = new FlinkKafkaProducer[String](bootstrapServer,sinkTopic, new SimpleStringSchema())
    //    mapStream.addSink(kafkaProducer)
    //      .setParallelism(parallelism)

    env.execute("KafkaSinkTest")
  }

任务流图如下:

消费者

打印从 kafka 消费出来的数据,并打印 数据在 topic 的分区编号


private static final String topic = "user_log_sink";

    public static void main(String[] args) throws InterruptedException {

        Properties prop = KafkaUtils.getProp();

        KafkaConsumer kafkaConsumer = new KafkaConsumer(prop);
        kafkaConsumer.subscribe(Arrays.asList(topic));
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        while (true) {

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            String key;
            String time;
            String value = null;

            for (ConsumerRecord<String, String> record : consumerRecords) {
                value = record.value();
                // 读取数据所在的分区
                int partition = record.partition();

                System.out.println( "partition: " +partition+ ", value : " + value);
            }

            Thread.sleep(5 * 1000);
   }
}

场景测试

场景1: 1 个并行度 对 3 个分区

kakfa 消费结果:

partition: 2, value : 0,{"category_id":10,"user_id":"349","item_id":"10507","behavior":"buy","ts":"2022-05-28 15:34:51.774"}
partition: 0, value : 0,{"category_id":10,"user_id":"351","item_id":"10941","behavior":"buy","ts":"2022-05-28 15:34:53.774"}
partition: 1, value : 0,{"category_id":10,"user_id":"348","item_id":"10354","behavior":"buy","ts":"2022-05-28 15:34:50.774"}

结论: 数据一对一,一个并行度的数据往 3 个分区发

场景2: 3 个并行度 对 1 个分区

kakfa 消费结果:

partition: 0, value : 1,{"category_id":10,"user_id":"72","item_id":"10433","behavior":"buy","ts":"2022-05-27 14:19:02.437"}
partition: 0, value : 2,{"category_id":10,"user_id":"73","item_id":"1093","behavior":"buy","ts":"2022-05-27 14:19:03.437"}
partition: 0, value : 0,{"category_id":10,"user_id":"74","item_id":"10217","behavior":"buy","ts":"2022-05-27 14:19:04.437"}

结论: 数据多对一,多个并行度的数据往 1 个分区发

场景3: 3 个并行度 对 3 个分区

kakfa 消费结果:

partition: 0, value : 1,{"category_id":10,"user_id":"567","item_id":"10900","behavior":"buy","ts":"2022-05-27 14:27:17.437"}
partition: 2, value : 1,{"category_id":10,"user_id":"573","item_id":"10241","behavior":"buy","ts":"2022-05-27 14:27:23.437"}
partition: 1, value : 1,{"category_id":10,"user_id":"570","item_id":"10943","behavior":"buy","ts":"2022-05-27 14:27:20.437"}

partition: 0, value : 2,{"category_id":10,"user_id":"568","item_id":"1050","behavior":"buy","ts":"2022-05-27 14:27:18.437"}
partition: 1, value : 2,{"category_id":10,"user_id":"574","item_id":"1021","behavior":"buy","ts":"2022-05-27 14:27:24.437"}

partition: 1, value : 0,{"category_id":10,"user_id":"566","item_id":"1028","behavior":"buy","ts":"2022-05-27 14:27:16.437"}
partition: 2, value : 0,{"category_id":10,"user_id":"569","item_id":"10801","behavior":"buy","ts":"2022-05-27 14:27:19.437"}
partition: 0, value : 0,{"category_id":10,"user_id":"578","item_id":"10759","behavior":"buy","ts":"2022-05-27 14:27:28.437"}


结论: 数据多对多,多个并行度的数据往 3 个分区发

场景4: 2 个并行度 对 4 个分区

kakfa 消费结果:


partition: 0, value : 0,{"category_id":10,"user_id":"397","item_id":"10669","behavior":"buy","ts":"2022-05-28 15:35:39.774"}
partition: 2, value : 0,{"category_id":10,"user_id":"399","item_id":"10940","behavior":"buy","ts":"2022-05-28 15:35:41.774"}
partition: 1, value : 0,{"category_id":10,"user_id":"427","item_id":"1032","behavior":"buy","ts":"2022-05-28 15:36:09.774"}

partition: 2, value : 1,{"category_id":10,"user_id":"401","item_id":"10798","behavior":"buy","ts":"2022-05-28 15:35:43.774"}
partition: 0, value : 1,{"category_id":10,"user_id":"400","item_id":"10654","behavior":"buy","ts":"2022-05-28 15:35:42.774"}
partition: 1, value : 1,{"category_id":10,"user_id":"398","item_id":"10627","behavior":"buy","ts":"2022-05-28 15:35:40.774"}

结论: 数据多对多,多个并行度的数据往 3 个分区发

场景5: 2 个并行度 对 3 个分区

kakfa 消费结果:


partition: 2, value : 1,{"category_id":10,"user_id":"708","item_id":"10929","behavior":"buy","ts":"2022-05-27 14:29:38.437"}
partition: 0, value : 1,{"category_id":10,"user_id":"706","item_id":"10161","behavior":"buy","ts":"2022-05-27 14:29:36.437"}

partition: 2, value : 0,{"category_id":10,"user_id":"709","item_id":"10438","behavior":"buy","ts":"2022-05-27 14:29:39.437"}
partition: 0, value : 0,{"category_id":10,"user_id":"707","item_id":"10870","behavior":"buy","ts":"2022-05-27 14:29:37.437"}
partition: 1, value : 0,{"category_id":10,"user_id":"711","item_id":"10789","behavior":"buy","ts":"2022-05-27 14:29:41.437"}

结论: 数据一对多,一个并行度的数据往 3 个分区发

场景6: 4 个并行度 对 2 个分区

kakfa 消费结果:

partition: 0, value : 0,{"category_id":10,"user_id":"264","item_id":"10261","behavior":"buy","ts":"2022-05-27 14:22:14.437"}
partition: 1, value : 0,{"category_id":10,"user_id":"271","item_id":"10784","behavior":"buy","ts":"2022-05-27 14:22:21.437"}

partition: 1, value : 1,{"category_id":10,"user_id":"261","item_id":"10206","behavior":"buy","ts":"2022-05-27 14:22:11.437"}
partition: 0, value : 1,{"category_id":10,"user_id":"268","item_id":"10781","behavior":"buy","ts":"2022-05-27 14:22:18.437"}

partition: 1, value : 2,{"category_id":10,"user_id":"262","item_id":"10429","behavior":"buy","ts":"2022-05-27 14:22:12.437"}
partition: 0, value : 2,{"category_id":10,"user_id":"269","item_id":"10893","behavior":"buy","ts":"2022-05-27 14:22:19.437"}

partition: 1, value : 3,{"category_id":10,"user_id":"263","item_id":"10578","behavior":"buy","ts":"2022-05-27 14:22:13.437"}
partition: 0, value : 3,{"category_id":10,"user_id":"267","item_id":"10790","behavior":"buy","ts":"2022-05-27 14:22:17.437"}

结论: 数据多对多,多个并行度的数据往 2 个分区发

场景7: 3 个并行度 对 2 个分区

kakfa 消费结果:

partition: 0, value : 2,{"category_id":10,"user_id":"475","item_id":"10947","behavior":"buy","ts":"2022-05-27 14:25:45.437"}
partition: 1, value : 2,{"category_id":10,"user_id":"478","item_id":"10605","behavior":"buy","ts":"2022-05-27 14:25:48.437"}

partition: 1, value : 0,{"category_id":10,"user_id":"479","item_id":"10749","behavior":"buy","ts":"2022-05-27 14:25:49.437"}
partition: 0, value : 0,{"category_id":10,"user_id":"476","item_id":"10856","behavior":"buy","ts":"2022-05-27 14:25:46.437"}

partition: 0, value : 1,{"category_id":10,"user_id":"477","item_id":"10418","behavior":"buy","ts":"2022-05-27 14:25:47.437"}
partition: 1, value : 1,{"category_id":10,"user_id":"474","item_id":"1078","behavior":"buy","ts":"2022-05-27 14:25:44.437"}

结论: 数据多对多,多个并行度的数据往 2 个分区发

场景8: 老的 kafka api FlinkKafkaProducer 1 个并行度 对 3 个分区

kakfa 消费结果:

partition: 0, value : 0,{"category_id":10,"user_id":"520","item_id":"10207","behavior":"buy","ts":"2022-05-28 15:37:42.774"}

结论: 数据一对多,一个并行度的数据往 1 个分区发

场景9: 老的 kafka api FlinkKafkaProducer 2 个并行度 对 4 个分区

kakfa 消费结果:

partition: 0, value : 0,{"category_id":10,"user_id":"580","item_id":"10577","behavior":"buy","ts":"2022-05-28 15:38:42.774"}
partition: 1, value : 1,{"category_id":10,"user_id":"577","item_id":"10481","behavior":"buy","ts":"2022-05-28 15:38:39.774"}

结论: 数据一对多,一个并行度的数据只会往 1 个分区发,另外两个分区无数据

场景9: 老的 kafka api FlinkKafkaProducer 4 个并行度 对 2 个分区

kakfa 消费结果:

partition: 0, value : 2,{"category_id":10,"user_id":"785","item_id":"10883","behavior":"buy","ts":"2022-05-28 15:42:07.774"}
partition: 0, value : 0,{"category_id":10,"user_id":"787","item_id":"102","behavior":"buy","ts":"2022-05-28 15:42:09.774"}
partition: 1, value : 1,{"category_id":10,"user_id":"788","item_id":"10618","behavior":"buy","ts":"2022-05-28 15:42:10.774"}
partition: 1, value : 3,{"category_id":10,"user_id":"790","item_id":"10627","behavior":"buy","ts":"2022-05-28 15:42:12.774"}

结论: 数据多对一,一个并行度的数据只会往 1 个分区发

结论

KafkaSink api

随机策略,数据随机发往所有分区

Flink 的 kafka connector 并没有设置分区策略,直接使用的 kafka 客户端

Flink 中调用 kafka 生产者发送数据:

Flink 1.15.0 使用的 kafka-client 版本是 2.8.1 的默认设置: 随机选择一个和上一次不一样的分区

分区源码: StickyPartitionCache.nextPartition

public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // Check that the current sticky partition for the topic is either not set or that the partition that 
        // triggered the new batch matches the sticky partition that needs to be changed.
        if (oldPart == null || oldPart == prevPartition) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }

注: 如果指定 key 的 序列化器,会将 数据用 指定的 序列化器序列化后生成 kafka 的 key

源码:


@Override
public ProducerRecord<byte[], byte[]> serialize(
        IN element, KafkaSinkContext context, Long timestamp) {
    final String targetTopic = topicSelector.apply(element);
    final byte[] value = valueSerializationSchema.serialize(element);
    byte[] key = null;
    // key 的 序列化器,讲数据序列化为 key,实际就是 key 和 value 一样
    if (keySerializationSchema != null) {
        key = keySerializationSchema.serialize(element);
    }
    final OptionalInt partition =
            partitioner != null
                    ? OptionalInt.of(
                            partitioner.partition(
                                    element,
                                    key,
                                    value,
                                    targetTopic,
                                    context.getPartitionsForTopic(targetTopic)))
                    : OptionalInt.empty();

    return new ProducerRecord<>(
            targetTopic,
            partition.isPresent() ? partition.getAsInt() : null,
            timestamp == null || timestamp < 0L ? null : timestamp,
            key,
            value);
}


注: 这样有个小问题,

FlinkKafkaProducer api

默认采用 sink 算子并行度 % kafka 分区数,所以每个并行度的数据永远都只会往一个 分区发

@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    Preconditions.checkArgument(
            partitions != null && partitions.length > 0,
            "Partitions of the target topic is empty.");

    return partitions[parallelInstanceId % partitions.length];
}

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

标签:10,partition,Flink,ts,value,并行度,sink,id,user
来源: https://www.cnblogs.com/Springmoon-venn/p/16321022.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有