ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java8 Stream源码精讲(二):Stream创建原理深度解析

2022-07-13 23:33:38  阅读:153  来源: 互联网

标签:index fence Stream int 精讲 元素 Spliterator 源码


简介

Java8 Stream源码精讲(一):从一个简单的例子入手

上一篇文章,通过分析一个使用Stream操作数据的例子,讲解了构建Stream,经过中间操作map()和filter()方法调用返回一个ReferencePipeline链表,调用终止操作forEach()将声明的函数构造成为一个sink链表,最终每一个元素都会被传入Sink#accept()方法处理。本章将通过重点分析创建Stream的源码,了解Stream的构建过程。

Spliterator是什么

在分析Stream构建之前,需要填一下上一章的坑,还记得吗,在上一章分析Stream流程的时候,构建Stream传入了一个Spliterator对象,当时只是说它是一个类似迭代器一样的东西。

public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) { 
    return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false); 
}
复制代码

现在我们来仔细看看这个接口是做什么的,先来看接口定义,省略了接口中的常量和内部接口,只留下了方法定义:

public interface Spliterator<T> {
//用于遍历单个元素,action是Stream调用终止操作之后包装的sink链
boolean tryAdvance(Consumer&lt;? super T&gt; action);

//用于批量遍历元素,action是Stream调用终止操作之后包装的sink链
default void forEachRemaining(Consumer&lt;? super T&gt; action) {
    do { } while (tryAdvance(action));
}

//并行计算的时候拆分Spliterator
Spliterator&lt;T&gt; trySplit();

//预估元素的大小
long estimateSize();

//精确获取元素的大小
default long getExactSizeIfKnown() {
    return (characteristics() &amp; SIZED) == 0 ? -1L : estimateSize();
}

int characteristics();

default boolean hasCharacteristics(int characteristics) {
    return (characteristics() &amp; characteristics) == characteristics;
}

//获取元素比较器
default Comparator&lt;? super T&gt; getComparator() {
    throw new IllegalStateException();
}

}
复制代码

Spliterator翻译成中文是分离器或者拆分器,接口注释大概的意思是:Spliterator是一个用于遍历和划分源元素的对象,源元素可以来自一个数组、Collection集合、IO Channel或者一个生成器函数。可以通过tryAdvance()方法来遍历源中的单个元素,也可以通过forEachRemaining()方法批量遍历源中的元素。在并行计算中,可以通过trySplit()方法将源中的一些元素拆分为另外的Spliterator。

从注释我们知道Spliterator的主要作用:

  • 封装源元素
  • 遍历源元素
  • 拆分源元素(并行计算,不用关心)

现在我们主要来看看方法:

  • tryAdvance():用于遍历源中的单个元素,如果源中还有元素存在,就调用参数action的accept()方法消费元素,并且返回true,否则返回false。
  • forEachRemaining():批量遍历源中的元素,调用参数action的accept()方法消费剩下的所有元素。
  • trySplit():用于拆分元素的,并行计算时会使用到,我们只关注串行流,所以不用关心。
  • estimateSize():预估元素的大小,如果元素大小是已知的,返回元素大小,如果流是无界的、大小未知的或者计算太耗时,则返回Long.MAX_VALUE。
  • getExactSizeIfKnown():如果元素大小是已知的,则返回精确大小,否则返回-1。
  • getComparator():如果源中的元素是被比较器排序的,则返回这个比较器;如果是按照自然顺序排序的,则返回null;否则抛出IllegalStateException异常。

如何实现一个Spliterator

对Spliterator有一个详细的认识之后,我们来看一看如何实现一个Spliterator。

T8M{NF(4Z@3R0@`MB%C395F.png

哇,好多实现类,第一时间是不是比较懵逼,这么多子类不知道从何下手?不过从图中可以看出,Spliterator的子类基本上都是某一个集合的内部类,所以我打算选择两个常用子类详细讲解,大家也可以根据这种思路分析其它的。

ArraySpliterator

ArraySpliterator是Spliterators的一个内部类,每次通过一个数组构建Stream时,都会创建相应的ArraySpliterator对象。Arrays#stream()方法的调用流程:

public static <T> Stream<T> stream(T[] array) {
    return stream(array, 0, array.length);
}

public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}

public static <T> Spliterator<T> spliterator(T[] array, int startInclusive, int endExclusive) {
return Spliterators.spliterator(array, startInclusive, endExclusive,
Spliterator.ORDERED | Spliterator.IMMUTABLE);
}
复制代码

Spliterators#spliterator()工厂方法用于创建ArraySpliterator对象:

public static <T> Spliterator<T> spliterator(Object[] array, int fromIndex, int toIndex,
                                             int additionalCharacteristics) {
    checkFromToBounds(Objects.requireNonNull(array).length, fromIndex, toIndex);
    //返回ArraySpliterator对象
    return new ArraySpliterator<>(array, fromIndex, toIndex, additionalCharacteristics);
}
复制代码

ArraySpliterator字段分析:

//源数组
private final Object[] array;
//当前索引
private int index;        // current index, modified on advance/split
//终止索引
private final int fence;  // one past last index
private final int characteristics;
复制代码
  • array:源对象数组,在介绍Spliterator接口时说过,源元素可以来自一个数组。
  • index:当前数组的下标,当调用tryAdvance()、forEachRemaining()或者trySplit()方法时,会更改下标。
  • fence:数组的截止下标,一般也就是数组长度。

ArraySpliterator构造函数

public ArraySpliterator(Object[] array, int additionalCharacteristics) {
    this(array, 0, array.length, additionalCharacteristics);
}

public ArraySpliterator(Object[] array, int origin, int fence, int additionalCharacteristics) {
this.array = array;
this.index = origin;
this.fence = fence;
//这里要重点关注一下ArraySpliterator源元素大小是确定的
this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
}
复制代码

通过构造函数可以看出,默认情况下没有指明origin和fence时,就是从0开始,到数组尾部结束,数组中的元素都算作流元素。

ArraySpliterator方法分析

源码比较简单,直接在代码上用注释说明了,不再单独讲解。

  • forEachRemaining()
public void forEachRemaining(Consumer<? super T> action) {
    Object[] a; int i, hi; // hoist accesses and checks from loop
    //判空
    if (action == null)
        throw new NullPointerException();
    //数组长度大于等于fence变量
    //index变量大于等于0
    //修改index变量且当前下标小于hi
    if ((a = array).length >= (hi = fence) &&
        (i = index) >= 0 && i < (index = hi)) {
        //循环消费数组元素
        do { action.accept((T)a[i]); } while (++i < hi);
    }
}
复制代码
  • tryAdvance()
public boolean tryAdvance(Consumer<? super T> action) {
    //判空
    if (action == null)
        throw new NullPointerException();
    //当前下标大于等于0且小于fence
    //数组中才有剩余可访问的元素
    if (index >= 0 && index < fence) {
        //取元素,index自增
        @SuppressWarnings("unchecked") T e = (T) array[index++];
        //消费元素
        action.accept(e);
        return true;
    }
    //返回false代表没有源元素了
    return false;
}
复制代码
  • estimateSize()
//就是fence减去index,代表数组中剩余的元素大小
public long estimateSize() { return (long)(fence - index); }
复制代码

ArrayListSpliterator

ArrayListSpliterator是ArrayList的内部类,调用ArrayList#stream()方法时会创建这样一个对象。

ArrayList继承自Collection的stream()方法:

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}
复制代码

重写了spliterator()方法,这个方法会返回一个Spliterator对象,可以看到创建的就是ArrayListSpliterator,同时传入了ArrayList自身:

public Spliterator<E> spliterator() {
    return new ArrayListSpliterator<>(this, 0, -1, 0);
}
复制代码

那我们来着重研究下ArrayListSpliterator的源码。

ArrayListSpliterator字段分析

//源元素集合
private final ArrayList<E> list;
//当前下标
private int index; // current index, modified on advance/split
//结束下标
private int fence; // -1 until used; then one past last index
private int expectedModCount; // initialized when fence set
复制代码
  • list:源元素集合,也就是创建ArrayListSpliterator时传入的ArrayList。
  • index:当前下标,当调用tryAdvance()、forEachRemaining()或者trySplit()方法时,会更改下标。
  • fence:结束下标,初始值为-1,当getFence()被调用时,会将ArrayList的size赋值给它,后面会讲到。
  • expectedModCount:看到这个变量是不是很眼熟,表示ArrayList在流处理中同样不能被并发修改。

ArrayListSpliterator方法分析

  • forEachRemaining()
public void forEachRemaining(Consumer<? super E> action) {
    int i, hi, mc; // hoist accesses and checks from loop
    ArrayList<E> lst; Object[] a;
    //判空
    if (action == null)
        throw new NullPointerException();
    //ArrayList不为空且其中存放元素的数组不能为空,很容易理解
    if ((lst = list) != null && (a = lst.elementData) != null) {
        //一般来说进入这个分支,因为创建ArrayListSpliterator时,fence是-1
        if ((hi = fence) < 0) {
            mc = lst.modCount;
            //hi就是ArrayList的元素大小
            hi = lst.size;
        }
        else
            mc = expectedModCount;
        //修改index变量
        if ((i = index) >= 0 && (index = hi) <= a.length) {
            for (; i < hi; ++i) {
                //遍历list中的元素数组,取依次取上面的元素,调用action消费
                @SuppressWarnings("unchecked") E e = (E) a[i];
                action.accept(e);
            }
            //校验modCount,不允许方法执行时内部结构发生改变
            if (lst.modCount == mc)
                return;
        }
    }
    throw new ConcurrentModificationException();
}
复制代码
  • tryAdvance()
public boolean tryAdvance(Consumer<? super E> action) {
    //判空
    if (action == null)
        throw new NullPointerException();
    //这里hi其实就是ArrayList元素大小
    int hi = getFence(), i = index;
    if (i < hi) {
        index = i + 1;
        //取数组中index下标的元素
        @SuppressWarnings("unchecked") E e = (E)list.elementData[i];
        //调用action消费元素
        action.accept(e);
        //并发修改校验
        if (list.modCount != expectedModCount)
            throw new ConcurrentModificationException();
        return true;
    }
    //没有剩余的元素,返回false
    return false;
}
复制代码

tryAdvance()中是通过将getFence()的返回值赋值给hi的,进入这个方法看下:

private int getFence() { // initialize fence to size on first use
    int hi; // (a specialized variant appears in method forEach)
    ArrayList<E> lst;
    //首次被调用时会进入,将ArrayList.size赋值给fence,ArrayList.modCount赋值给expectedModCount,
    //其它时候直接返回fence的值
    if ((hi = fence) < 0) {
        if ((lst = list) == null)
            hi = fence = 0;
        else {
            expectedModCount = lst.modCount;
            hi = fence = lst.size;
        }
    }
    return hi;
}
复制代码
  • estimateSize()
//估计值大小,ArrayList也是元素大小确定的,计算逻辑是ArrayList.size减去当前下标,表示还有多少源元素
public long estimateSize() {
    return (long) (getFence() - index);
}
复制代码

根据Spliterator创建Stream

通过前面分析我们知道Stream都是通过StreamSupport这个工具类创建的,传入的Spliterator参数就是上面讲解的Spliterator实现类实例:

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}
复制代码

返回的实际上是一个ReferencePipeline.Head对象,我在上一个章节中有详细的讲解,现在我们再来分析一下加深印象。先看一下它的类继承关系:

Head.png Head是ReferencePipeline的内部类,同时又继承了ReferencePipeline,ReferencePipeline是引用类型Stream的抽象,它实现了Stream接口,拥有中间操作和终止操作的能力,ReferencePipeline也继承了AbstractPipeline,这个抽象类上一章节有详细讲解。

Head的构造函数调用ReferencePipeline构造函数:

Head(Spliterator<?> source,
     int sourceFlags, boolean parallel) {
    super(source, sourceFlags, parallel);
}

ReferencePipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
复制代码

最终调用AbstractPipeline构造函数,上一章也有详细讲解,这里再回顾一下:

AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    //头结点没有前一个节点了
    this.previousStage = null;
    //代表源元素的Spliterator
    this.sourceSpliterator = source;
    //sourceStage变量指向自己
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    //深度为0
    this.depth = 0;
    //标识是不是并行流,默认为false
    this.parallel = parallel;
}
复制代码

总结一下,通过StreamSupport#stream()传入Spliterator创建Stream,实际上返回的都是ReferencePipeline.Head对象,它代表源阶段的Stream,也是Pipeline链表的头结点。

Spliterator调用时机

前面分析了Spliterator和创建Stream的过程之后,还有一个疑问:Spliterator是在什么地方被使用到的呢?

先说结论:Spliterator是在Stream调用终止操作的时候触发它的方法调用。其实这很容易理解,因为Stream是惰性流,创建和中间操作的时候什么都不会做,只有终止操作时才调用声明的处理数据的lambda表达式。

看过上一章节的小伙伴应该还记得终止操作都会调用AbstractPipeline#evaluate()方法:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;
return isParallel()
       ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
       : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));

}
复制代码

经过一系列调用,进入AbstractPipeline#copyInto()方法:

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);
//非短路操作
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {

    //调用spliterato#getExactSizeIfKnown()获取源元素精确大小,然后传递给sink链表的begin()方法
  wrappedSink.begin(spliterator.getExactSizeIfKnown());
    //调用spliterator#forEachRemaining()批量遍历源元素
    spliterator.forEachRemaining(wrappedSink);
    wrappedSink.end();
}
//短路操作
else {
    copyIntoWithCancel(wrappedSink, spliterator);
}

}
复制代码

终止操作是非短路操作的,在copyInto()中调用Spliterato#getExactSizeIfKnown()方法,会间接调用Spliterato#estimateSize(),然后调用Spliterato#forEachRemaining()批量遍历源元素。

终止操作是短路操作的,会再调用AbstractPipeline#copyIntoWithCancel()方法:

final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    @SuppressWarnings({"rawtypes","unchecked"})
    AbstractPipeline p = AbstractPipeline.this;
    //取Pipeline链表头节点
    while (p.depth > 0) {
        p = p.previousStage;
    }
    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    p.forEachWithCancel(spliterator, wrappedSink);
    wrappedSink.end();
}
复制代码

ReferencePipeline#forEachWithCancel()方法:

final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}
复制代码

短路操作在do-while循环中调用tryAdvance()方法遍历Spliterator的单个源元素。

总结

本文首先介绍了Spliterator的含义,详细讲解了每一个方法的作用,然后通过ArraySpliterator和ArrayListSpliterator分析如何实现一个Spliterator接口,再次回顾了代表源Stream的Head类,最后分析了Spliterator方法的调用时机。

写在最后

  • 阅读源码有什么用?

Stream的数据处理逻辑都是通过lambda表达式定义的,它是一种声名式编程,与命令式编程不同。命令式编程比如传统的集合遍历迭代很好调试,而Stream很难调试,出了问题很多时候无从下手。通过阅读源码,了解原理,可以帮助我们更容易调试代码,定位问题。

  • 阅读建议

本系列文章是以专栏的形式发布的,上下文多有关联,如果跳着阅读,可能会产生不连贯的感觉,所以建议按照顺序阅读。另外由于是源码分析,跟其他类型文章不同,所以建议在阅读的时候跟着思路亲自动手调试源码。

最后,原创不易,如果觉得本系列文章对您有帮助,能够加深您对Stream原理和源码的理解的话,请不要吝啬您手中的赞(✪ω✪)!

来源:https://juejin.cn/post/7101217470542774308

标签:index,fence,Stream,int,精讲,元素,Spliterator,源码
来源: https://www.cnblogs.com/konglxblog/p/16476003.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有