ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

JAVA新特性(6)自定义收集器的实现与Collectors类

2019-05-20 22:47:46  阅读:230  来源: 互联网

标签:return 自定义 downstream 收集器 System println JAVA Collector public


  1. 实现Collector接口,首先在类中实现接口中所有的抽象方法,然后在主程序中用collect传入实现的类的实例
    public class CollectorSetTest<T> implements Collector<T,Set<T>,Set<T>> {
    //简单自定义收集器的实现:将list结果收集到一个Set中
    
        public static void main(String[] args) {
            List<String> list = Arrays.asList("Hello","World","NiHao");
            Set<String> strings = list.stream().collect(new CollectorSetTest<String>());
            System.out.println(strings);
        }
    
        @Override
        public Supplier<Set<T>> supplier() {
            System.out.println("supplier invoked");
            return HashSet::new;
        }
    
        @Override
        public BiConsumer<Set<T>,T> accumulator() {
            System.out.println("accumulator invoked");
            //报错,要使用给定泛型的Set,不能使用具体实现类
    //        return HashSet<T>::add;//不行
    //        return Set<T>::add;//可以
            return (set,item)->set.add(item);//与上等价
        }
    
        @Override
        public BinaryOperator<Set<T>> combiner() {
            System.out.println("combiner invoked");
            return (set1,set2)->{set1.addAll(set2); return set1;};
        }
    
        @Override
        public Function<Set<T>,Set<T>> finisher() {
            System.out.println("finisher invoked");
    //        return set->set;
            return Function.identity();
        }
    
        @Override
        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
            return Collections.unmodifiableSet(EnumSet.of
                    (Characteristics.IDENTITY_FINISH,Characteristics.UNORDERED));
        }
    }



     
  2. 并行流:可以有多个线程,每一个线程操作一个对应的结果容器;如果设置Collector的属性Characteristics.Concurrent,则多个线程会操作仅仅一个结果容器。
    示例代码:
    public class CollectorSetTest2<T> implements Collector<T, Set<T>,Map<T,T>> {
    //    实现:输入:Set<String>   输出:Map<String,String>
    
        public static void main(String[] args) {
            List<String> list = Arrays.asList("Hello","World","NiHao","Beijing","Hello","a","b","c","d","e");
            Set<String> strings = new TreeSet<>();
            strings.addAll(list);
            System.out.println(strings);
    
            System.out.println(Runtime.getRuntime().availableProcessors());//打印线程数
    //      parallelStream一般默认情况下会生成的线程数:与CPU核心数相同的线程;Intel现在进行了超核技术。
            Map<String,String> map = strings.parallelStream().collect(new CollectorSetTest2<>());//1
    //        Map<String,String> map1 = strings.stream().parallel().
    //        collect(new CollectorSetTest2<>());//完全等价于1
            System.out.println(map);
        }
    
        @Override
        public Supplier<Set<T>> supplier() {
    //         * below must be equivalent:
    // * <pre>{@code
    // *     A a1 = supplier.get();
    // *     accumulator.accept(a1, t1);
    // *     accumulator.accept(a1, t2);
    // *     R r1 = finisher.apply(a1);  // result without splitting
    // *
    // *     A a2 = supplier.get();
    // *     accumulator.accept(a2, t1);
    // *     A a3 = supplier.get();
    // *     accumulator.accept(a3, t2);
    // *     R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting
    // * } </pre>
    
            System.out.println("supplier invoked");
    //        return HashSet::new;
            return ()->{
                System.out.println("---------");
                //测试Characteristics.CONCURRENT时,中间结果容器的数量
                return new HashSet<T>();
            };
        }
    
        @Override
        public BiConsumer<Set<T>, T> accumulator() {
            System.out.println("accumulator invoked");
            return (set,item)->{
                System.out.println(set + "," + Thread.currentThread().getName());//测试并行流时的输出
                set.add(item);
    //            当Characteristics.CONCURRENT:可能发生异常:ConcurrentModificationException
    //            it is not generally permissible for one thread to modify a Collection
    //            while another thread is iterating over it
            };
        }
        @Override
        public BinaryOperator<Set<T>> combiner() {
    //        并行流且在Characteristics.CONCURRENT的情况下才会被调用
            System.out.println("combiner invoked");
            return (s,s1)->{s.addAll(s1);return s;};
        }
    
        @Override
        public Function<Set<T>,Map<T,T>> finisher() {
            System.out.println("finisher invoked");
            return set -> {
                Map<T,T> map = new HashMap<>();
                set.stream().forEach(item->map.put(item, item));
                return map;
            };
        }
    
        @Override
        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
    //        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED,Characteristics.CONCURRENT));
            return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
        }
    }
    

     

  3. Collectors静态工厂类的实现共分为两种情况:
    1)通过CollectorImpl来实现
    2)通过reducing方法来实现;reducing方法本身又是通过CollectorImpl来实现的
  4. 数字是一个值类型的,数组是引用类型的
    因为如此,源码中sumingInt的实现:
        /**
         * Returns a {@code Collector} that produces the sum of a integer-valued
         * function applied to the input elements.  If no elements are present,
         * the result is 0.
         *
         * @param <T> the type of the input elements
         * @param mapper a function extracting the property to be summed
         * @return a {@code Collector} that produces the sum of a derived property
         */
        public static <T> Collector<T, ?, Integer>
        summingInt(ToIntFunction<? super T> mapper) {
            return new CollectorImpl<>(
                    () -> new int[1],//使用一个长度为1的数组作为操作结果容器
                    (a, t) -> { a[0] += mapper.applyAsInt(t); },
                    (a, b) -> { a[0] += b[0]; return a; },
                    a -> a[0], CH_NOID);
        }

     

  5. 基础知识补充:
    原始类型包装类(primitive wrappers)(Integer,Long, Short, Double, Float, Character, Byte, Boolean)也都是不可变的
    不可变集合:
    1)保证线程安全
    2)被不可信的类库使用时会很安全
    3)如果一个对象不需要支持修改操作,将会节省空间和时间开销;能跟有效的利用内存。
    4)注:从可变集合转化为不可变集合,需要将所有元素赋给新的一个不可变集合,并禁止所有对原集合的更改。
  6. Collectors中groupingBy 源码解析:
        //classifier分类器:输入一个参数类型T及以上,返回一个结果参数类型K;如输入年龄,返回学生
        //Collector:参数一为输入参数年龄,中间类型?自己来定义,返回一个映射Map类型,对应key为年龄,value为学生列表
        public static <T, K> Collector<T, ?, Map<K, List<T>>>
        groupingBy(Function<? super T, ? extends K> classifier) {
            return groupingBy(classifier, toList());
        }
    
        //构造:接收了一个收集器(Collector:downstream)以及classifier,返回一个收集器;
        //构造思路:将classifier应用到传入的收集器(downstream)上
        public static <T, K, A, D>
        Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                              Collector<? super T, A, D> downstream) {
            return groupingBy(classifier, HashMap::new, downstream);
        }
    
        //接收三个参数:Function,Supplier,Collector(downstream);
        //调用downstream的那几个抽象方法来完成收集:supplier,accumulator,combiner,finisher
        //通过Function和Supplier来设置一些分类条件使用downstream来收集,得到新的CollectorImpl对象
        public static <T, K, D, A, M extends Map<K, D>>
        Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                      Supplier<M> mapFactory,
                                      Collector<? super T, A, D> downstream) {
            Supplier<A> downstreamSupplier = downstream.supplier();
            BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
            BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
                K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
    //K分类器classfier的返回类型
                A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
    //A中间可变容器类型,key对应的value中的如果没有内容就将downstreamSupplier容器对象放到value位置
                downstreamAccumulator.accept(container, t);
    //前两步是为了这一步做准备:downstreamAccumulator中的container(A类型)是有前两步计算出来的
            };  //中间结果都是Map<K, A> 类型了
            BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());//将中间结果进行合并(mapMerger)
            @SuppressWarnings("unchecked")
            Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
    //强制类型转化为Supplier<Map<K, A>>类型,转化为了中间结果类型
            if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
                return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
    //返回得到的新的CollectorImpl对象(收集器)
            }
            else {
                @SuppressWarnings("unchecked")
                Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
                Function<Map<K, A>, M> finisher = intermediate -> {
                    intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                    @SuppressWarnings("unchecked")
                    M castResult = (M) intermediate;
                    return castResult;
                };
                return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
            }
        }

    其中mapping源码解析:
     

        //将输入参数类型为T,经过mapper function 输出参数为U.在多重收集reduction下是最有用的
        //U为下游收集器downstream的输入,R为downstream的输出
        public static <T, U, A, R>
        Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
                                   Collector<? super U, A, R> downstream) {
            BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
            return new CollectorImpl<>(downstream.supplier(),
                                       (r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
                                       downstream.combiner(), downstream.finisher(),
                                       downstream.characteristics());
        }
    For example, given a stream of {@code Person}, to accumulate the set of last names in each city:
       Map<City, Set<String>> lastNamesByCity
       = people.stream().collect(groupingBy(Person::getCity, mapping(Person::getLastName, toSet())));
    将每个city中的Person:使用mapping,mapping的作用:将此city中的Person按set of last name收集(收集为Set)
    mapping 中 Person::getLastName (mapper):person----><String>name, toset :<String>name---->Set<String>
    所以:mapping返回set<String>;  
    
  7.  





  8.  

标签:return,自定义,downstream,收集器,System,println,JAVA,Collector,public
来源: https://blog.csdn.net/liuzewei2015/article/details/90378176

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有