ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java ParallelStream

2021-05-06 21:34:12  阅读:222  来源: 互联网

标签:Java sum System currentTimeMillis start ms accumulate ParallelStream


ParallelStream 处理数据

Stream 接口提供了parallelStream方法来将集合转换为并行流。即将一个集合分为多个数据块,并用不同的线程分别处理每个数据块的流。
并且使用parallelStream 时无需担心内部变量控制,线程数量等问题。
如使用并行流计算1至100000累加之和:

  • 最后一次parallel或sequential调用会影响整个流水线,即如下例子中会并行执行。
  • parallelStream使用得默认核心数为Runtime.getRuntime().availableProcessors() - 1。
    可通过配置java.util.concurrent.ForkJoinPool.common.parallelism改变默认的核心数。
        Stream.iterate(1L, param1 -> Math.addExact(param1, 1))
                .limit(100000)
                .parallel()
                .sequential()
                .parallel()
                .reduce(0L, Math::addExact)
                .longValue();

parallelStream 性能分析

通常我们认为在数据量到达一定程度时,使用多线程计算会获得更好的性能。但实际效果应该在测量比较之后才直到。
使用并行流和顺序流别计算1至100000 的累加之和,在我的四核英特尔机器上运行结果如下:

        long start = System.currentTimeMillis();
        Stream.iterate(1L, param1 -> Math.addExact(param1, 1))
                .limit(100000)
                .parallel()
                .reduce(0L, Math::addExact)
                .longValue();
        System.out.println(String.format("Parallel accumulate sum, used %d ms.", System.currentTimeMillis() - start));

        start = System.currentTimeMillis();
        LongStream.rangeClosed(1, 100000)
                    .reduce(0L, Math::addExact);
        System.out.println(String.format("Sequential accumulate sum, used %d ms.", System.currentTimeMillis() - start));
      
      Parallel accumulate sum, used 64 ms.
      Sequential accumulate sum, used 8 ms.

通过以上结果可以看到,并行流计算的耗时竟然是顺序流的好几倍,这与我们的预期结果差距十分的大。
要想明白这差距的原因,首先得明白影响上面并行流的速度的因素有那些:

  • 元素是否容易拆分为多个数据块, 很明显Iterate 很难拆分为多个独立数据块,因为每次应用这个函数都要依赖于前一个元素。
  • 元素是否频繁拆装箱, 流中Long -> long 频繁拆装箱也影响了效率。而LongStream 中并没有这个消耗。

修复上面两个影响并行流的速度的问题后,重新运行结果如下:


        long start = System.currentTimeMillis();
        LongStream.rangeClosed(1, 100000)
                    .parallel()
                    .reduce(0L, Math::addExact);
        System.out.println(String.format("Parallel accumulate sum, used %d ms.", System.currentTimeMillis() - start));

        start = System.currentTimeMillis();
        LongStream.rangeClosed(1, 100000)
                    .reduce(0L, Math::addExact);
        System.out.println(String.format("Sequential accumulate sum, used %d ms.", System.currentTimeMillis() - start));

        Parallel accumulate sum, used 7 ms.
        Sequential accumulate sum, used 3 ms.

并行流的速度得到了很大提升,这表明并行化时需要使用正确的数据结构
但是顺序流的速度却仍然更快,这说明并行化也是有代价的,如下:

  • 内核之间交换数据的花销较大。
  • 要保证在内核中的处理时间大于内核间的数据交换时间,即数据到达一定的量级。

而并行过程需要对流要递归划分,再把每个子流的归纳操作分配到不同的线程,最后把这些操作的结果合并成一个值。
在子流归纳操作时间过短时,并行化并没有带来性能提升,反而是更加慢了。

再将数据提升至上亿级别进行运算,并行流终于取得了一些领先。

        long start = System.currentTimeMillis();
        LongStream.rangeClosed(1, 100000000)
                    .parallel()
                    .reduce(0L, Math::addExact);
        System.out.println(String.format("Parallel accumulate sum, used %d ms.", System.currentTimeMillis() - start));

        start = System.currentTimeMillis();
        LongStream.rangeClosed(1, 100000000)
                    .reduce(0L, Math::addExact);
        System.out.println(String.format("Sequential accumulate sum, used %d ms.", System.currentTimeMillis() - start));
        Parallel accumulate sum, used 79 ms.
        Sequential accumulate sum, used 264 ms.

高效使用ParallelStream

关于在什么地方使用parallelStream 没有绝对的建议,而是只能做定性分析。下列是一些可能影响性能的地方:

  • 测量比较,并行流并不都比顺序流快。
  • 避免拆装箱,这对性能有较大影响。可使用原始类型IntStream, LongStream等。
  • 依赖元素顺序的操作,并行性能较差。如findAny()性能会优于findFirst(),因为它不依赖于顺序。
  • 数据量大小,估算一个元素通过流水线的大概处理时间,得到处理完整个集合的处理时间。
  • 流是否易于拆分,如ArrayList 比LinkedList 更易于拆分,前者无需遍历,后者需要遍历之后才能拆分。
  • 终端操作时,合并操作的代价大小(例如Collector中的combiner方法)。

Fork/Join

ParallelStream流背后使用的基础架构是Java 7中引入的Fork/Join分支合并框架。
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。
这其实就是分治算法的并行版本。

标签:Java,sum,System,currentTimeMillis,start,ms,accumulate,ParallelStream
来源: https://www.cnblogs.com/cd-along/p/14736898.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有