如题,最近遇到一个归约问题引发的源码思考

首先 Stream#reduce 有3个重载方法如下:

    T reduce(T identity, BinaryOperator<T> accumulator);
    
    Optional<T> reduce(BinaryOperator<T> accumulator);

    <U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);

第一个方法有两个入参:

  1. T identity : 意思为累加器 accumulator 的起始点
  2. BinaryOperator accumulator : 累加器

此方法比较好理解,归约的过程即对于传入的 identity,将 Stream 流中的每个元素执行传入的累加器 accumulator 中的操作后赋予 identity。

第二个方法只有一个入参,意思与上述类似,区别在于内置的 identity 初始为null,累加器 accumulator 处理完以后的值还是可能为 null,所以返回值用 Optional 包装了一下。

本文的重点在于第三个方法的三个参数:

  1. U identity:意思为累加器 accumulator 的起始点
  2. BiFunction<U, ? super T, U> accumulator:累加器
  3. BinaryOperator< U > combiner:合并器

先来看下官方文档的解释:
accumulator - an associative, non-interfering, stateless function for incorporating an additional element into a result.
累加器 - 一种关联的、无干扰的、无状态的函数,用于将附加元素合并到结果中.
combiner - an associative, non-interfering, stateless function for combining two values, ==which must be compatible with the accumulator function==
组合器 - 一个关联的、无干扰的、无状态的函数,用于组合两个值,==它必须与累加器函数兼容.==

其中重点在于以上标黄字体,其中官方还给了一个具体的解释:
Stream#reduce源码注释
其中框住的描述大致意思为:
标识值必须是组合器功能的标识。 这意味着对于所有 u,combiner(identity, u) 之后的值类型等于 u。 此外,组合器功能必须与累加器功能兼容; 对于所有 u 和 t,以下必须成立:
==combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)==

咋一看还是比较疑惑的,既然以上等式需要成立,为什么还要单独设计累加器和组合器呢?
让我们来看一组代码揭开迷惑:

    private static void testForReduce() {
        ArrayList<Integer> list = new ArrayList<>();
        list.add(2);
        list.add(3);
        list.add(4);
        list.add(5);
        Integer reduce = list.stream().reduce(
                1,
                (x, y) -> {
                    System.out.println("------accumulator--------"
                            + "\nvalue:" + (x + y + 2)
                    );
                    return x + y + 2;
                },
                (x, y) -> {
                    System.out.println("------combiner--------"
                            + "\nvalue:" + (x * y)
                    );
                    return x * y;
                }
        );
        System.out.println("result:"+reduce);
    }

你可以大胆猜测一下结果,如果结果是23,那么建议直接跳到文末看结论即可。
以下为上述代码执行流程:
Stream#reduce串行的流程

以上流程中细心的人可能发现了,为什么没有 combiner 的执行流程呢?
注:以上代码中,combiner 确实不参与 reduce 的整个过程中的计算.

那么设计 combiner 的意义何在呢?
下面我们来改造一下代码:

    private static void testForReduce() {
        ArrayList<Integer> list = new ArrayList<>();
        list.add(2);
        list.add(3);
        list.add(4);
        list.add(5);
        Integer reduce = list.parallelStream().reduce(
                1,
                (x, y) -> {
                    System.out.println("------accumulator--------\n"
                            + "ThreadName: " + Thread.currentThread().getName()
                            + "\nvalue:" + (x + y + 2) + "\n"
                    );
                    return x + y + 2;
                },
                (x, y) -> {
                    System.out.println("------combiner--------\n"
                            + "ThreadName: " + Thread.currentThread().getName()
                            + "\nvalue:" + (x * y)
                    );
                    return x * y;
                }
        );
        System.out.println("result:"+reduce);
    }

以上代码相比于第一次执行的代码:一是将 stream() 改成了 parallelStream(),二是每次打印了当前Thread 名字,下面来看看日志来分析:

------accumulator--------
ThreadName: ForkJoinPool.commonPool-worker-2
value:8

------accumulator--------
ThreadName: ForkJoinPool.commonPool-worker-11
value:5

------accumulator--------
ThreadName: ForkJoinPool.commonPool-worker-9
value:6

------accumulator--------
ThreadName: main
value:7

------combiner--------
ThreadName: ForkJoinPool.commonPool-worker-9
value:30
------combiner--------
ThreadName: main
value:56
------combiner--------
ThreadName: main
value:1680

result:1680

结合以上日志,代码的执行流程为:
stream#reduce并行的执行流程
结合上面的图片,可以清晰的看到:combiner 的执行过程为多个线程之前的合并数据流,所以在之前的单线程处理时,combiner 并不生效。

结论:Stream#reduce 方法的 combiner 在单线程工作时并不生效,combiner 的生效时间在多线程合并时,所以为了保证串行流与并行流的结果一致性,设计了上文中提及的原则:
combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
如果不满足以上原则,则会造成本文中的两次结果不一致的情形。

引申思考:

  1. 在多线程之间的数据同步问题需要提前考虑并设计预处理方案。
  2. 根据本文日志中的 ThreadName 侧面印证了 parallelStream 的底层实际上为 ForkJoinPool,将多个 task 切分给了多个线程工作后再次 join。

标签: none

已有 2 条评论

  1. bro jie cow pussy

    1. 李哥他爹

      伙计 劫 母牛 猫咪

评论已关闭