ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java自定义stream

2022-08-27 12:02:10  阅读:149  来源: 互联网

标签:java 自定义 stream 管道 sink new return SelfSink public


一、流程

1

        // 自定义集合,继承ArrayList,与ArrayList没啥区别
        SelfList<Apple> appleList = new SelfList<>();
import java.util.ArrayList;
import java.util.Iterator;

/**
 * 自定义集合,继承ArrayList,与ArrayList没啥区别
 */
public class SelfList<T> extends ArrayList<T> {
    /**
     * 数据源放进头部管道,头部管段很窄,深度为0,没有sink操作
     *
     * @return 头部管道(作用是持有数据源)
     */
    public SelfStream<T> selfStream() {
        Iterator<T> listIterator = super.iterator();
        return new SelfPipeline.SelfHead<>(listIterator);
    }
}
    /**
     * 头部管道:继承管道,属于管道的一种
     */
    static class SelfHead<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        SelfHead(Iterator<?> source) {
            // 初始化头部管道
            super(source);
    /**
     * 初始化头部管道
     *
     * @param source 数据源
     */
    SelfPipeline(Iterator<?> source) {
        // 数据源
        this.sourceIterator = source;
        // 数据源Op,即是头部管道自身,后面每一段管道通过持有“头部管道”获取数据源
        this.sourceStage = this;

        // 上段管道,头部管道没有上段管道
        this.previousStage = null;

        // 头部管道的深度=0,下面的每段管道的深度依次+1
        this.depth = 0;
    }
        }

        // 头部管道无需sink
        @Override
        final SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink) {
            throw new UnsupportedOperationException();
        }
    }
        appleList.add(new Apple(1, "青色"));
        appleList.add(new Apple(2, "橙色"));
        appleList.add(new Apple(3, "红色"));
        appleList.add(new Apple(4, "绿色"));
        appleList.add(new Apple(5, "绿色"));
        appleList.add(new Apple(6, "紫色"));

2

        // 把数据源(appleList)放进头部管道(pipelineHead),接下来每段管道都持有头部管道以获取数据源
        SelfStream<Apple> pipelineHead = appleList.selfStream();
见SelfList

3

        // 线性拼接下一段管道(FilterOp),并定义这段管道的职责(计划由FilterSink执行)
        SelfStream<Apple> statelessOpFilter = pipelineHead.filter(item -> "绿色".equals(item.getColor()));
    /**
     * 生成Op管道(FilterOp),定义这段管道的职责(并由FilterSink执行该职责)
     *
     * @param predicate 断言型函数式接口
     * @return Op管道(FilterOp)
     */
    @Override
    public final SelfStream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        return new SelfStatelessOp<P_OUT, P_OUT>(this) {
    /**
     * Op管道:继承管道,属于管道的一种
     */
    abstract static class SelfStatelessOp<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        /**
         * @param upstream 上段管道
         */
        SelfStatelessOp(SelfPipeline<?, P_IN> upstream) {
            // 初始化Op管道
            super(upstream);
    /**
     * 初始化Op管道
     *
     * @param upstream 上段管道
     */
    SelfPipeline(SelfPipeline<?, P_IN> upstream) {
        // 自己的上段管道
        this.previousStage = upstream;
        // 上段管道的下段是自己
        upstream.nextStage = this;

        // 每一段管道都持有“头部管道”
        this.sourceStage = upstream.sourceStage;

        // 深度+1
        this.depth = upstream.depth + 1;
    }
        }
    }
       // 定义这段管道的职责(并由FilterSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<P_OUT> sink) {
                // FilterSink
                return new SelfSink.Chain<P_OUT, P_OUT>(sink) {
    /**
     * sink链
     */
    abstract class Chain<T, E_OUT> implements SelfSink<T> {
        // 下个sink
        protected final SelfSink<? super E_OUT> downstream;

        public Chain(SelfSink<? super E_OUT> downstream) {
            // 向下的单向链条
            this.downstream = downstream;
        }

        // 本例:单向链条传递到最后一个ReducingSink,初始化结果集合
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }
    }

                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u)) {
                            // 触发下一段管道的动作,下沉
                            downstream.accept(u);
                        }
                    }
                };
            }
        };
    }

4

        // 线性拼接下一段管道(MapOp),并定义这段管道的职责(计划由MapSink执行)
        SelfStream<Integer> statelessOpMap = statelessOpFilter.map(Apple::getWeight);
    /**
     * 生成Op管道(MapOp),定义这段管道的职责(并由MapSink执行该职责)
     *
     * @param mapper 函数型函数式接口:有入参和返回值
     * @return Op管道(MapOp)
     */
    @Override
    public final <R> SelfStream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        return new SelfStatelessOp<P_OUT, R>(this) {
如上FilterOp
       // 定义这段管道的职责(并由MapSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<R> sink) {
                // MapSink
                return new SelfSink.Chain<P_OUT, R>(sink) {
如上FilterOp
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

5

        /*
         * 执行终结操作:
         * 1,生成ReducingSink(设计为收集汇聚上游的流);
         * 2,生成MapSink,让MapSink向下链接ReducingSink;
         * 3,通过MapOp来到FilterOp,生成FilterSink,让FilterSink向下链接MapSink;
         * 4,遍历数据源的每一个元素,让元素依次流经FilterSink->MapSink->ReducingSink,汇聚成最终形态
         */
        SelfList<Apple> terminalOpCollect = statelessOpMap.collect(SelfCollectors.toList());
    /**
     * 终结操作
     *
     * @param collector 此例子是Collectors.toList(),定义了结果集和操作结果集的方法
     * @return 最终结果:即汇聚最终结果都是在此方法中进行的
     */
    @Override
    public final <R, A> R collect(SelfCollector<? super P_OUT, A> collector) {
        // 在终结操作里尾部管道(TerminalOp)
        SelfTerminalOp<P_OUT, A> terminalOp = SelfReduceOpsFactory.makeRef(collector);
    // 生成尾部管道
    public static <T, I> SelfTerminalOp<T, I> makeRef(SelfCollector<? super T, I> collector) {
        // 此例子是(Supplier<List<T>>) ArrayList::new
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        // 此例子是List::add
        BiConsumer<I, ? super T> accumulator = collector.accumulator();

        // 定义了ReducingSink的职责
        class ReducingSink extends SelfBox<I> implements SelfTerminalSink<T, I> {
            @Override
            public void begin(long size) {
                // state = ArrayList::new = 初始化的空集合
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                // 向集合state中添加元素t
                accumulator.accept(state, t);
            }
        }

        // 生成ReduceOp(尾部管道接口的实现类),定义尾部管道的职责(并由ReducingSink执行该职责)
        return new SelfReduceOp<T, I, ReducingSink>() {
            // ReducingSink
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

     // 拉网
        A container = evaluate(terminalOp);
    /**
     * 使用terminalOp拉网
     *
     * @param terminalOp 使用statelessOp.collect方法的参数构造出来的
     * @param <R>
     * @return 拉网结果,ReducingSink的get方法得到最终集合
     */
    final <R> R evaluate(SelfTerminalOp<P_OUT, R> terminalOp) {
        // 当前对象为map的statelessOp
        Iterator<?> iterator = sourceStage.sourceIterator;
        // 按顺序拉网:terminalOp是使用statelessOp.collect方法的参数构造出来的
        return terminalOp.evaluateSequential(this, iterator);
        /**
         * 顺序评估管道
         */
        @Override
        public <P_IN> R evaluateSequential(SelfPipelineHelper<T> helper, Iterator<P_IN> iterator) {
            // 生成尾部管道的sink
            S reducingSink = makeSink();
如上makeRef
       // 管道包装sink:从ReducingSink(4)开始,按照3,4;2,3;1,2的顺序构建sink链,按照1,2,3,4的顺序执行流转,由4收集最终结果
            S wrappedReducingSink = helper.wrapAndCopyInto(reducingSink, iterator);
    /**
     * 管道包装sink
     *
     * @return 评估管道结束后,返回终结操作管道的sink
     */
    @Override
    final <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来;返回sink链的第一个对象
        SelfSink<P_IN> firstSinkLink = wrapSink(Objects.requireNonNull(sink));
    /**
     * 管道包装sink
     *
     * @param sink 此例子是:传入的是ReducingSink
     * @return sink链的第一个对象
     */
    @Override
    final <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来
        for (SelfPipeline p = SelfPipeline.this; p.depth > 0; p = p.previousStage) {
            sink = p.opWrapSink(sink);
见FilterOp、MapOp的职责定义部分
        }
        return (SelfSink<P_IN>) sink;
    }

     // 数据源的每一个元素都从sink链的第一个对象流转到最后一个对象,并被最后一个对象收集
        copyInto(firstSinkLink, iterator);
    /**
     * 遍历数据源元素,顺序执行sink的accept方法(consumer)
     *
     * @param wrappedSink sink链的第一个对象
     * @param iterator    数据源
     */
    @Override
    final <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator) {
        // 准备好尾部管道的收集装置
        wrappedSink.begin(-1);

        // 遍历流转和收集
        iterator.forEachRemaining(wrappedSink);
// 收尾工作(无)
        wrappedSink.end();
    }

     // 返回ReducingSink
        return sink;
    }

       // 返回ReducingSink持有和可以supplier的集合
            return wrappedReducingSink.get();
        }
    }
return (R) container;
    }

6

        System.out.println(terminalOpCollect);

7

        // 简写
        SelfList<Apple> apples = appleList.selfStream()
                .filter(item -> "绿色".equals(item.getColor()))
                .map(Apple::getWeight)
                .collect(SelfCollectors.toList());
        System.out.println(apples);

二、完整代码

package com.simple.boot.java_skill.selfλ;

import com.simple.boot.java_skill.functionprograming.Apple;

/**
 * main函数
 */
public class SelfLambdaTest {
    public static void main(String[] args) {
        // 自定义集合,继承ArrayList,与ArrayList没啥区别
        SelfList<Apple> appleList = new SelfList<>();

        appleList.add(new Apple(1, "青色"));
        appleList.add(new Apple(2, "橙色"));
        appleList.add(new Apple(3, "红色"));
        appleList.add(new Apple(4, "绿色"));
        appleList.add(new Apple(5, "绿色"));
        appleList.add(new Apple(6, "紫色"));

        // 把数据源(appleList)放进头部管道(pipelineHead),接下来每段管道都持有头部管道以获取数据源
        SelfStream<Apple> pipelineHead = appleList.selfStream();
        // 线性拼接下一段管道(FilterOp),并定义这段管道的职责(计划由FilterSink执行)
        SelfStream<Apple> statelessOpFilter = pipelineHead.filter(item -> "绿色".equals(item.getColor()));
        // 线性拼接下一段管道(MapOp),并定义这段管道的职责(计划由MapSink执行)
        SelfStream<Integer> statelessOpMap = statelessOpFilter.map(Apple::getWeight);
        /*
         * 执行终结操作:
         * 1,生成ReducingSink(设计为收集汇聚上游的流);
         * 2,生成MapSink,让MapSink向下链接ReducingSink;
         * 3,通过MapOp来到FilterOp,生成FilterSink,让FilterSink向下链接MapSink;
         * 4,遍历数据源的每一个元素,让元素依次流经FilterSink->MapSink->ReducingSink,汇聚成最终形态
         */
        SelfList<Apple> terminalOpCollect = statelessOpMap.collect(SelfCollectors.toList());
        System.out.println(terminalOpCollect);

        // 简写
        SelfList<Apple> apples = appleList.selfStream()
                .filter(item -> "绿色".equals(item.getColor()))
                .map(Apple::getWeight)
                .collect(SelfCollectors.toList());
        System.out.println(apples);
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.ArrayList;
import java.util.Iterator;

/**
 * 自定义集合,继承ArrayList,与ArrayList没啥区别
 */
public class SelfList<T> extends ArrayList<T> {
    /**
     * 数据源放进头部管道,头部管段很窄,深度为0,没有sink操作
     *
     * @return 头部管道(作用是持有数据源)
     */
    public SelfStream<T> selfStream() {
        Iterator<T> listIterator = super.iterator();
        return new SelfPipeline.SelfHead<>(listIterator);
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * 管道:流的一种具化实现
 */
public abstract class SelfPipeline<P_IN, P_OUT>
        extends SelfPipelineHelper<P_OUT> implements SelfStream<P_OUT> {
    // 数据源
    private Iterator<?> sourceIterator;
    // 头部管道(持有数据源)
    private final SelfPipeline sourceStage;

    // 上段管道
    private final SelfPipeline previousStage;

    // 下段管道
    private SelfPipeline nextStage;

    // 本段管道深度
    private int depth;

    /**
     * 初始化Op管道
     *
     * @param upstream 上段管道
     */
    SelfPipeline(SelfPipeline<?, P_IN> upstream) {
        // 自己的上段管道
        this.previousStage = upstream;
        // 上段管道的下段是自己
        upstream.nextStage = this;

        // 每一段管道都持有“头部管道”
        this.sourceStage = upstream.sourceStage;

        // 深度+1
        this.depth = upstream.depth + 1;
    }

    /**
     * 初始化头部管道
     *
     * @param source 数据源
     */
    SelfPipeline(Iterator<?> source) {
        // 数据源
        this.sourceIterator = source;
        // 数据源Op,即是头部管道自身,后面每一段管道通过持有“头部管道”获取数据源
        this.sourceStage = this;

        // 上段管道,头部管道没有上段管道
        this.previousStage = null;

        // 头部管道的深度=0,下面的每段管道的深度依次+1
        this.depth = 0;
    }

    /**
     * 头部管道:继承管道,属于管道的一种
     */
    static class SelfHead<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        SelfHead(Iterator<?> source) {
            // 初始化头部管道
            super(source);
        }

        // 头部管道无需sink
        @Override
        final SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Op管道:继承管道,属于管道的一种
     */
    abstract static class SelfStatelessOp<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        /**
         * @param upstream 上段管道
         */
        SelfStatelessOp(SelfPipeline<?, P_IN> upstream) {
            // 初始化Op管道
            super(upstream);
        }
    }

    abstract SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink);

    /**
     * 生成Op管道(FilterOp),定义这段管道的职责(并由FilterSink执行该职责)
     *
     * @param predicate 断言型函数式接口
     * @return Op管道(FilterOp)
     */
    @Override
    public final SelfStream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        return new SelfStatelessOp<P_OUT, P_OUT>(this) {
            // 定义这段管道的职责(并由FilterSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<P_OUT> sink) {
                // FilterSink
                return new SelfSink.Chain<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u)) {
                            // 触发下一段管道的动作,下沉
                            downstream.accept(u);
                        }
                    }
                };
            }
        };
    }

    /**
     * 生成Op管道(MapOp),定义这段管道的职责(并由MapSink执行该职责)
     *
     * @param mapper 函数型函数式接口:有入参和返回值
     * @return Op管道(MapOp)
     */
    @Override
    public final <R> SelfStream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        return new SelfStatelessOp<P_OUT, R>(this) {
            // 定义这段管道的职责(并由MapSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<R> sink) {
                // MapSink
                return new SelfSink.Chain<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

    /**
     * 终结操作
     *
     * @param collector 此例子是Collectors.toList(),定义了结果集和操作结果集的方法
     * @return 最终结果:即汇聚最终结果都是在此方法中进行的
     */
    @Override
    public final <R, A> R collect(SelfCollector<? super P_OUT, A> collector) {
        // 在终结操作里尾部管道(TerminalOp)
        SelfTerminalOp<P_OUT, A> terminalOp = SelfReduceOpsFactory.makeRef(collector);

        // 拉网
        A container = evaluate(terminalOp);

        return (R) container;
    }


    /**
     * 使用terminalOp拉网
     *
     * @param terminalOp 使用statelessOp.collect方法的参数构造出来的
     * @param <R>
     * @return 拉网结果,ReducingSink的get方法得到最终集合
     */
    final <R> R evaluate(SelfTerminalOp<P_OUT, R> terminalOp) {
        // 当前对象为map的statelessOp
        Iterator<?> iterator = sourceStage.sourceIterator;
        // 按顺序拉网:terminalOp是使用statelessOp.collect方法的参数构造出来的
        return terminalOp.evaluateSequential(this, iterator);
    }

    /**
     * 管道包装sink
     *
     * @return 评估管道结束后,返回终结操作管道的sink
     */
    @Override
    final <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来;返回sink链的第一个对象
        SelfSink<P_IN> firstSinkLink = wrapSink(Objects.requireNonNull(sink));

        // 数据源的每一个元素都从sink链的第一个对象流转到最后一个对象,并被最后一个对象收集
        copyInto(firstSinkLink, iterator);

        // 返回ReducingSink
        return sink;
    }

    /**
     * 遍历数据源元素,顺序执行sink的accept方法(consumer)
     *
     * @param wrappedSink sink链的第一个对象
     * @param iterator    数据源
     */
    @Override
    final <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator) {
        // 准备好尾部管道的收集装置
        wrappedSink.begin(-1);

        // 遍历流转和收集
        iterator.forEachRemaining(wrappedSink);

        // 收尾工作(无)
        wrappedSink.end();
    }

    /**
     * 管道包装sink
     *
     * @param sink 此例子是:传入的是ReducingSink
     * @return sink链的第一个对象
     */
    @Override
    final <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来
        for (SelfPipeline p = SelfPipeline.this; p.depth > 0; p = p.previousStage) {
            sink = p.opWrapSink(sink);
        }
        return (SelfSink<P_IN>) sink;
    }

}
package com.simple.boot.java_skill.selfλ;

import java.util.function.BiConsumer;
import java.util.function.Supplier;

public interface SelfCollector<T, A> {

    Supplier<A> supplier();

    BiConsumer<A, T> accumulator();
}
package com.simple.boot.java_skill.selfλ;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public final class SelfCollectors {

    public static <T> SelfCollector<T, ?> toList() {
        return new SelfCollectorImpl<>((Supplier<List<T>>) SelfList::new, List::add);
    }

    static class SelfCollectorImpl<T, A> implements SelfCollector<T, A> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;

        SelfCollectorImpl(Supplier<A> supplier, BiConsumer<A, T> accumulator) {
            this.supplier = supplier;
            this.accumulator = accumulator;
        }

        @Override
        public BiConsumer<A, T> accumulator() {
            return accumulator;
        }

        @Override
        public Supplier<A> supplier() {
            return supplier;
        }
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;

/**
 * 管道工具类
 */
public abstract class SelfPipelineHelper<P_OUT> {

    abstract <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator);

    abstract <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator);

    abstract <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink);
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/**
 * 尾部管道工厂模式
 */
public class SelfReduceOpsFactory {
    // 生成尾部管道
    public static <T, I> SelfTerminalOp<T, I> makeRef(SelfCollector<? super T, I> collector) {
        // 此例子是(Supplier<List<T>>) ArrayList::new
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        // 此例子是List::add
        BiConsumer<I, ? super T> accumulator = collector.accumulator();

        // 定义了ReducingSink的职责
        class ReducingSink extends SelfBox<I> implements SelfTerminalSink<T, I> {
            @Override
            public void begin(long size) {
                // state = ArrayList::new = 初始化的空集合
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                // 向集合state中添加元素t
                accumulator.accept(state, t);
            }
        }

        // 生成ReduceOp(尾部管道接口的实现类),定义尾部管道的职责(并由ReducingSink执行该职责)
        return new SelfReduceOp<T, I, ReducingSink>() {
            // ReducingSink
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * 结果集
     */
    private static abstract class SelfBox<U> {
        U state;

        SelfBox() {
        }

        public U get() {
            return state;
        }
    }

    /**
     * 尾部管道
     */
    private static abstract class SelfReduceOp<T, R, S extends SelfTerminalSink<T, R>>
            implements SelfTerminalOp<T, R> {
        SelfReduceOp() {
        }

        // 生成尾部管道的sink,定义为抽象方法,终结操作终结结果有多种
        public abstract S makeSink();

        /**
         * 顺序评估管道
         */
        @Override
        public <P_IN> R evaluateSequential(SelfPipelineHelper<T> helper, Iterator<P_IN> iterator) {
            // 生成尾部管道的sink
            S reducingSink = makeSink();

            // 管道包装sink:从ReducingSink(4)开始,按照3,4;2,3;1,2的顺序构建sink链,按照1,2,3,4的顺序执行流转,由4收集最终结果
            S wrappedReducingSink = helper.wrapAndCopyInto(reducingSink, iterator);

            // 返回ReducingSink持有和可以supplier的集合
            return wrappedReducingSink.get();
        }
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.function.Consumer;

/**
 * 下沉:一种Consumer函数式接口,执行指定动作
 */
public interface SelfSink<T> extends Consumer<T> {
    default void begin(long size) {
    }

    default void end() {
    }

    /**
     * sink链
     */
    abstract class Chain<T, E_OUT> implements SelfSink<T> {
        // 下个sink
        protected final SelfSink<? super E_OUT> downstream;

        public Chain(SelfSink<? super E_OUT> downstream) {
            // 向下的单向链条
            this.downstream = downstream;
        }

        // 本例:单向链条传递到最后一个ReducingSink,初始化结果集合
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.function.Function;
import java.util.function.Predicate;

public interface SelfStream<T> {
    SelfStream<T> filter(Predicate<? super T> predicate);

    <R> SelfStream<R> map(Function<? super T, ? extends R> mapper);

    <R, A> R collect(SelfCollector<? super T, A> collector);
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;

public interface SelfTerminalOp<E_IN, R> {
    <P_IN> R evaluateSequential(SelfPipelineHelper<E_IN> helper, Iterator<P_IN> iterator);
}
package com.simple.boot.java_skill.selfλ;

import java.util.function.Supplier;

/**
 * 尾部管道的sink
 */
public interface SelfTerminalSink<T, R> extends SelfSink<T>, Supplier<R> {
}

 

标签:java,自定义,stream,管道,sink,new,return,SelfSink,public
来源: https://www.cnblogs.com/seeall/p/16630280.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有