Flink Physical Partitioning

  Partitioning redistributes data, sending records to different stream partitions (parallel subtasks). The keyBy operation is in fact a repartitioning by the key's hashCode; it is a logical partitioning, because records are routed deterministically by the hash of their key rather than by an explicitly chosen strategy. Simply put, a partition corresponds to a parallel subtask running in a slot, and repartitioning decides which downstream subtask (slot) each record is sent to.
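
For comparison, here is a minimal keyBy sketch (not from the original post; the class name and data are illustrative) showing that every record with the same key lands in the same print subtask:

package cn.qz.partition;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // keyBy is a logical (hash) partitioning: the target subtask is derived from the
        // hash of the key, so identical keys are always printed by the same subtask.
        env.fromElements("a", "b", "a", "c", "b", "a")
                .keyBy(word -> word)
                .print("keyBy")
                .setParallelism(4);

        env.execute();
    }
}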

  The system also applies a default partitioning. For example, if upstream and downstream tasks in a program are given different parallelism, the data can no longer be passed on within the current partition in a forward (one-to-one) manner. In that case the system automatically distributes the data evenly across all downstream parallel subtasks, keeping the load of the partitions balanced.
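
For illustration, below is a minimal sketch of calling forward() explicitly (not from the original post; the class name and data are made up). Forward is a one-to-one exchange, so the upstream and downstream parallelism must match, otherwise Flink refuses the job:

package cn.qz.partition;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForwardDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 4)
                .map(i -> i * 10).setParallelism(2)
                // forward(): map subtask n sends its records directly to print subtask n,
                // with no redistribution in between
                .forward()
                .print("forward").setParallelism(2); // must match the upstream parallelism (2)

        env.execute();
    }
}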

  Physical partitioning means controlling the distribution strategy manually. Common strategies are: random (shuffle), round-robin (rebalance), rescale, broadcast, global, and custom partitioning.
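
All of the examples below use a User class from the cn.qz.source package that the original post does not show. A minimal sketch that is consistent with the fields seen in the printed output (username, fullname, age) could look like the class below; the real class in the source project may differ (for instance, it could be generated with Lombok @Data and @AllArgsConstructor):

package cn.qz.source;

// Assumed minimal POJO: only the members the partitioning examples actually need
// (constructors, getters/setters, getAge() for the custom partitioner, and a
// toString matching the printed output).
public class User {

    private String username;
    private String fullname;
    private int age;

    public User() {
    }

    public User(String username, String fullname, int age) {
        this.username = username;
        this.fullname = fullname;
        this.age = age;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getFullname() {
        return fullname;
    }

    public void setFullname(String fullname) {
        this.fullname = fullname;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User(username=" + username + ", fullname=" + fullname + ", age=" + age + ")";
    }
}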

1. Random partitioning (shuffle) and round-robin (rebalance) test

package cn.qz.partition;

import cn.qz.source.User;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Partition {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        DataStreamSource<User> dataStreamSource = executionEnvironment.fromElements(
                new User("zs1", "张三", 22),
                new User("ls2", "李四", 23),
                new User("ww3", "王五1", 21),
                new User("ww4", "王五", 22),
                new User("ww5", "王五", 22),
                new User("ww6", "王五", 22),
                new User("ww7", "王五", 22),
                new User("zl8", "赵六", 27)
        );

        // 1. Shuffle: redistributes records randomly. With a large amount of data the split is close to uniform, so each subtask receives roughly the same number of records.
//        dataStreamSource.shuffle().print("shuffle").setParallelism(4);
        /**
         * shuffle:4> User(username=ls2, fullname=李四, age=23)
         * shuffle:4> User(username=ww4, fullname=王五, age=22)
         * shuffle:3> User(username=ww3, fullname=王五1, age=21)
         * shuffle:1> User(username=zs1, fullname=张三, age=22)
         * shuffle:3> User(username=ww7, fullname=王五, age=22)
         * shuffle:4> User(username=ww5, fullname=王五, age=22)
         * shuffle:1> User(username=ww6, fullname=王五, age=22)
         * shuffle:4> User(username=zl8, fullname=赵六, age=27)
         */

        // 2. Round-robin (rebalance). When the parallelism changes between two operators, the default strategy is also round-robin, so the two lines below behave the same.
        dataStreamSource.rebalance().print("rebalance").setParallelism(4);
//        dataStreamSource.print("default").setParallelism(4);
        // The output shows round-robin visiting the print subtasks in the order 3, 4, 1, 2
        /**
         * rebalance:1> User(username=ww3, fullname=王五1, age=21)
         * rebalance:2> User(username=ww4, fullname=王五, age=22)
         * rebalance:3> User(username=zs1, fullname=张三, age=22)
         * rebalance:4> User(username=ls2, fullname=李四, age=23)
         * rebalance:3> User(username=ww5, fullname=王五, age=22)
         * rebalance:2> User(username=zl8, fullname=赵六, age=27)
         * rebalance:1> User(username=ww7, fullname=王五, age=22)
         * rebalance:4> User(username=ww6, fullname=王五, age=22)
         */

        executionEnvironment.execute();
    }
}

2. Rescale test

  Rescale is very similar to round-robin (rebalance) partitioning. When rescale is called, the underlying distribution is also round-robin, but the downstream subtasks are split into small groups, and each upstream subtask only round-robins within its own group. As an example, six parallel downstream tasks can be split into two groups of three, with round-robin inside each group. Because each upstream subtask only talks to a subset of the downstream subtasks, rescale can keep the data exchange local, whereas rebalance sends data to every downstream subtask.

For example: when producing the data, the source writes each number to one of 2 subtasks according to whether it is odd or even, and 6 subtasks print afterwards, with rescale used to redistribute the data.

package cn.qz.partition;

import cn.qz.source.User;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class Partition2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        DataStreamSource<User> dataStreamSource = executionEnvironment.addSource(new RichParallelSourceFunction<User>() {
            @Override
            public void run(SourceContext<User> ctx) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                for (int i = 1; i <= 12; i++) {
                    if (i % 2 == indexOfThisSubtask) {
                        User user = new User("user" + i, "用户" + i, i);
                        ctx.collect(user);
                    }
                }
            }

            @Override
            public void cancel() {
                // ignore
            }
        }).setParallelism(2);

        // Rescale: each source subtask round-robins only within its own group of downstream subtasks
        dataStreamSource.rescale().print().setParallelism(6);

        executionEnvironment.execute();
    }
}

Result: print subtasks 1-3 handle the even numbers (data sent by source subtask 0) and subtasks 4-6 handle the odd numbers (data sent by source subtask 1). With rebalance, each source subtask would instead round-robin over all six print subtasks, so every print subtask could receive both odd and even numbers.

4> User(username=user1, fullname=用户1, age=1)
4> User(username=user7, fullname=用户7, age=7)
6> User(username=user5, fullname=用户5, age=5)
6> User(username=user11, fullname=用户11, age=11)
2> User(username=user4, fullname=用户4, age=4)
2> User(username=user10, fullname=用户10, age=10)
1> User(username=user2, fullname=用户2, age=2)
3> User(username=user6, fullname=用户6, age=6)
3> User(username=user12, fullname=用户12, age=12)
5> User(username=user3, fullname=用户3, age=3)
1> User(username=user8, fullname=用户8, age=8)
5> User(username=user9, fullname=用户9, age=9)

3. Broadcast

After broadcasting, every record is kept in each downstream partition, so the same record may be processed multiple times.

package cn.qz.partition;

import cn.qz.source.User;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Partition3 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        DataStreamSource<User> dataStreamSource = executionEnvironment.fromElements(
                new User("zs1", "张三", 22),
                new User("ls2", "李四", 23)
        );

        dataStreamSource.broadcast().print("broadcast").setParallelism(2);

        executionEnvironment.execute();
    }
}

Result:

broadcast:1> User(username=zs1, fullname=张三, age=22)
broadcast:2> User(username=zs1, fullname=张三, age=22)
broadcast:1> User(username=ls2, fullname=李四, age=23)
broadcast:2> User(username=ls2, fullname=李四, age=23)

4. Global partitioning

Global partitioning sends all records to the first (index 0) subtask of the downstream operator, effectively merging everything into a single task.

package cn.qz.partition;

import cn.qz.source.User;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class Partition4 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        DataStreamSource<User> dataStreamSource = executionEnvironment.addSource(new RichParallelSourceFunction<User>() {
            @Override
            public void run(SourceContext<User> ctx) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                for (int i = 1; i <= 4; i++) {
                    if (i % 2 == indexOfThisSubtask) {
                        User user = new User("user" + i, "用户" + i, i);
                        ctx.collect(user);
                    }
                }
            }

            @Override
            public void cancel() {
                // ignore
            }
        }).setParallelism(2);

        // After global(), every record goes to the first downstream subtask, so even though the print parallelism is set to 2, only one subtask does the work
        dataStreamSource.global().print().setParallelism(2);

        executionEnvironment.execute();
    }
}

Result (everything is printed by subtask 1 even though the print parallelism is 2):

1> User(username=user1, fullname=用户1, age=1)
1> User(username=user3, fullname=用户3, age=3)
1> User(username=user2, fullname=用户2, age=2)
1> User(username=user4, fullname=用户4, age=4)

5. Custom partitioning

For example, implement your own partitioner that splits the records by odd and even age.

package cn.qz.partition;

import cn.qz.source.User;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class Partition5 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        DataStreamSource<User> dataStreamSource = executionEnvironment.addSource(new RichParallelSourceFunction<User>() {
            @Override
            public void run(SourceContext<User> ctx) throws Exception {
                for (int i = 1; i <= 12; i++) {
                    User user = new User("user" + i, "用户" + i, i);
                    ctx.collect(user);
                }
            }

            @Override
            public void cancel() {
                // ignore
            }
        });

        dataStreamSource.partitionCustom(new Partitioner<Integer>() {
            /**
             * The return value is the index of the target partition.
             *
             * @param key           the key returned by the KeySelector (here the user's age)
             * @param numPartitions the total number of downstream partitions (2 in this example)
             * @return the partition index for this record
             */
            @Override
            public int partition(Integer key, int numPartitions) {
                return key % numPartitions;
            }
        }, new KeySelector<User, Integer>() {
            @Override
            public Integer getKey(User value) throws Exception {
                return value.getAge();
            }
        }).print().setParallelism(2);

        executionEnvironment.execute();
    }
}

Result (partition index 0 is printed with the prefix 1>, so even ages go to subtask 1 and odd ages to subtask 2):

1> User(username=user2, fullname=用户2, age=2)
2> User(username=user1, fullname=用户1, age=1)
1> User(username=user4, fullname=用户4, age=4)
2> User(username=user3, fullname=用户3, age=3)
1> User(username=user6, fullname=用户6, age=6)
2> User(username=user5, fullname=用户5, age=5)
1> User(username=user8, fullname=用户8, age=8)
2> User(username=user7, fullname=用户7, age=7)
2> User(username=user9, fullname=用户9, age=9)
2> User(username=user11, fullname=用户11, age=11)
1> User(username=user10, fullname=用户10, age=10)
1> User(username=user12, fullname=用户12, age=12)

 

Source: https://www.cnblogs.com/qlqwjy/p/16424886.html
