参考文献

  • Java8 实战

Stream API

  • 只能对实现了java.util.Collection接口的类做流操作.
  • 流(Stream)是数据渠道,用于操作数据源(集合,数组等)所生成的元素序列.
  • 集合讲的是数据,流讲的是计算
  • Stream自己不会存储元素
  • Stream不会改变源对象.相反,它们会返回一个持有结果的新Stream.
  • Stream操作是延迟执行的,这意味着它们等到需要的时候才执行.
  • Stream支持同步执行,也支持异步执行.
img

惰性流

  • 流是惰性的,在达到终止条件前不会处理元素,达到终止条件后逐个处理每个元素.如果遇到短路操作,那么只要满足所有条件,流处理就会终止.
  • 对于集合而言,必须执行完所有操作才能进行下一步操作.对于流而言,各种中间操作构成一条流水线,但在流达到终止操作前不会处理任何元素,达到终止操作后只处理所需的值.
  • 流处理并非任何情况下都有意义:如果进行任何状态操作(如排序或求和),就不得不处理所有值.但是如果无状态操作后跟一个短路终止操作,流处理的优点还是很明显的.

注意点

  • 请注意,和迭代器类似,流只能遍历一次

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class StreamTest {

    @Test
    public void testStream() {
    final List<Integer> list = Arrays.asList(1, 2, 3);
    final Stream<Integer> stream = list.stream();
    stream.forEach(System.out::println);
    stream.forEach(System.out::println);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    1
    2
    3

    java.lang.IllegalStateException: stream has already been operated upon or closed

    at java.base/java.util.stream.AbstractPipeline.sourceStageSpliterator(AbstractPipeline.java:279)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at com.holelin.sundry.test.common.StreamTest.testStream(StreamTest.java:16)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

Stream的操作三个步骤

创建Stream

  • 一个数据源(如集合,数组)获取一个流.
Java8中Collection接口
  • Java8中Collection接口被扩展,提供了两个获取流的方法:
    • default Stream<E> stream(): 返回一个顺序流;
    • default Stream<E> parallelStream(): 返回一个并行流;
由数组创建流
  • Java8中Arrays的静态方法stream()可以获取数组流
  • static <T> Stream<T> stream(T[] array): 返回一个流
由值创建流
  • 可以使用静态方法Stream.of()通过显示值创建一个流,它可以接收任意数量的参数.
  • static<T> Stream<T> of(T... values)
创建无限流
  • 由函数创建流,使用静态方法Stream.iterate()Stream.generate()创建无限流.

  • static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)

  • T seed: 初始值

  • UnaryOperator<T> f: 依次应用在每个产生的新值上的lambda(``UnaryOperator`类型)

    1
    2
    3
    4
    5
    6
    7
    8
    9
        Stream.iterate(LocalDate.now(), it->it.plusDays(1))
    .limit(5)
    .forEach(System.out::println);

    // 生成斐波拉契数列
    Stream.iterate(new int[]{0, 1},
    t -> new int[]{t[1], t[0]+t[1]})
    .limit(20)
    .forEach(t -> System.out.println("(" + t[0] + "," + t[1] +")"));
  • static<T> Stream<T> generate(Supplier<? extends T> s)

    • generate不是依次 对每个新生成的值应用函数的.它接受一个Supplier类型的Lambda提供新的值
    1
    2
    3
    Stream.generate(Math::random)
    .limit(10)
    .forEach(System.out::println);
IntStream,LongStream()rangerangeClosed方法
  • static IntStream range(int startInclusive, int endExclusive)
  • static IntStream rangeClosed(int startInclusive, int endInclusive)
  • static LongStream range(long startInclusive, final long endExclusive)
  • static LongStream rangeClosed(long startInclusive, final long endInclusive)

中间操作

  • 一个中间操作链,对数据源的数据进行处理.
  • 多个中间操作可以连起来形成一个流水线,除非流水线触发终止操作,否则中间操作不会执行任何的处理,而在终止操作是,一次性全部处理称为"惰性求值"
筛选与切片(filter/distinct/limit/skip)
操作 说明
filter(Predicate p) 接收lambda,从流中排除/过滤某些元素
distinct() 筛选通过流所生成的元素的hashCode()和equals()去除重复元素
limit(long maxSize) 截断流,使其元素不超过给定数量
skip(long n) 跳过元素,返回一个扔掉前n个元素的流.若流中元素不足n个,则返回一个空流,与limit(n)互补.
映射(map/mapToDouble/mapToInt/mapToLong/flatMap)
操作 说明
map(Function f) 接收一个函数作为参数,该函数会被应用到每个元素上并将其映射成一个新元素.
mapToDouble(ToDoubleFunction f) 接收一个函数作为参数,该函数会被应用到每个元素上并将其映射成一个新的DoubleStream.
mapToInt(ToIntFunction f) 接收一个函数作为参数,该函数会被应用到每个元素上并将其映射成一个新的IntStream.
mapToLong(ToLongFunction f) 接收一个函数作为参数,该函数会被应用到每个元素上并将其映射成一个新的LongStream.
flatMap(Function f) 接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流
  • mapflatMap的区别
    • 若需要将每个元素转换为一个值,则使用Stream.map方法
    • 若需要将每个元素转换为多个值,且需要将生成的流"展平",则使用Stream.flatMap方法
      • 方法参数Function产生的一个输出值流
      • 生成的元素被展平为一个新的流.
排序(sorted)
操作 说明
sorted() 产生一个新流,其中按自然顺序排序.
sorted(Comparator c) 产生一个新流,其中按比较器顺序排序.
装箱流(boxed/mapToInt)
  • 使用基本类型流创建集合

  • 解决方法

    • 使用java.util.stream.IntStream接口定义的boxed方法来包装元素.

      1
      2
      3
      IntStream.of(1,2,3,4)
      .boxed()
      .collect(Collectors.toList());
    • 可以使用合适的包装器类来映射值

      1
      2
      3
      IntStream.of(1,2,3,4)
      .mapToInt(Integer::valueof)
      .collect(Collectors.toList());
    • 还可以使用collect方法的三参数形式

      1
      2
      3
      IntStream.of(1,2,3,4)
      .collect(ArrayList<Integer>::new,
      Arrays::add,ArrayList::addAll);
      • Supplier是ArrayList<Integer>的构造函数.累加器为add方法,表示如何添加单个元素.仅在并行操作中使用的组合器(combiner)是addAll方法,它能将两个列表合二为一.
使用peek方法调试流
1
2
3
4
5
6
7
IntStream.rangeClosed(1,10)
.peek(n-> System.out.printf("original:%d%n",n))
.map(n->n*2)
.peek(n-> System.out.printf("doubled: %d%n",n))
.filter(n->n%3==0)
.peek(n-> System.out.printf("filtered: %d%n",n))
.sum();
流的拼接(concat/flatMap)
  • Stream.concat方法适用于合并两个流.如果需要合并多个流,则需要使用Stream.flatMap

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
     		@Test
    public void testStream2() {
    List<Integer> numbers1 = Arrays.asList(1, 2, 3);
    List<Integer> numbers2 = Arrays.asList(3, 4);
    numbers1.stream()
    .flatMap(i -> numbers2.stream()
    .map(j -> new int[]{i, j})
    )
    .collect(toList()).forEach(item-> System.out.println(Arrays.toString(item)));
    }

    //
    [1, 3]
    [1, 4]
    [2, 3]
    [2, 4]
    [3, 3]
    [3, 4]
  • concat方法将创建一个惰性的拼接流,其元素是第一个流的所有元素,后跟第二个流的所有元素.

终止操作(终端操作)

  • 一个终止操作,执行中间操作链,并产生结果.

  • 终止操作会从流的流水线生成结果,其中结果可以是任何不是流的值,如List,Integer,甚至为void

查找与匹配
操作 说明
allMatch(Predicate p) 检查是否匹配所有元素
anyMatch(Predicate p) 检查是否至少匹配一个元素
noneMatch(Predicate p) 检查是否没有匹配所有元素
findFirst() 返回第一个元素
findAny() 返回当前流中的任意元素
count() 返回流中元素总数
max(Comparator c) 返回流中最大值
min(Comparator c) 返回流中最小值
forEach(Consumer c) 内部迭代.使用Collection接口需要用户做迭代被称为外部迭代
归约(reduce)
  • Java的函数式范式经常使用"映射-筛选-归约"(map-filter-reduce)的过程处理数据.
    • 首先map操作将一种类型的流转换为另一种类型接着filter操作产生一个新的流,它仅包含所需的元素,最后通过终止操作从流中生成单个值.
操作 说明
T reduce(T identity, BinaryOperator<T> accumulator) identity为累加器的初始值,可以将流中元素反复结合起来,得到一个值,返回T
Optional<T> reduce(BinaryOperator<T> accumulator) 可以将流中元素反复结合起来,得到一个值,返回Optional<T>
  • 示例

    1
    2
    3
    4
    // 1-10 求和
    IntStream.rangeClosed(1, 10).reduce(Integer::sum).orElse(0)

    final int reduce = IntStream.rangeClosed(1, 10).reduce(0, (x, y) -> x +2 * y);
收集(collect)
操作 说明
<R, A> R collect(Collector<? super T, A, R> collector); 将流转换为其他形式.
  • java.util.stream.Collectors,使用工具类的toList,toSet,toCollection,toMap
  • Function.identity()
分组(Collectors.groupingBy)
  • Collectors.groupingBy方法生成一个由类别构成的Map,其中值为每个类别中的元素.

  • 多级分组:

    • 要实现多级分组,可以使用一个由双参数版本的Collectors.groupingBy工厂方法创建的收集器,它除了普通的分类函数之外,还可以接受collector类型的第二个参数.那么要进行二级分组的话,我们可以把一个内层groupingBy传递给外层groupingBy,并定义一个为流中项目分类的二级标准.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel =
    menu.stream().collect(
    groupingBy(Dish::getType,
    groupingBy(dish -> {
    if (dish.getCalories() <= 400) return CaloricLevel.DIET;
    else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
    else return CaloricLevel.FAT;
    }))
    );

    //
    {MEAT={DIET=[chicken], NORMAL=[beef], FAT=[pork]},
    FISH={DIET=[prawns], NORMAL=[salmon]},
    OTHER={DIET=[rice, seasonal fruit], NORMAL=[french fries, pizza]}}
分区(Collectors.partitioningBy)
  • Collectors.partitioningBy方法将元素拆分为满足Predicate与不满足Predicate的两类

    1
    2
    3
    4
    public static <T>
    Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
    return partitioningBy(predicate, toList());
    }
    1
    2
    3
    4
    5
    6
    Map<Boolean, List<Dish>> partitionedMenu =
    menu.stream().collect(partitioningBy(Dish::isVegetarian));

    //
    {false=[pork, beef, chicken, prawns, salmon],
    true=[french fries, rice, season fruit, pizza]}
  • 多级分区

    • 定义
    1
    2
    3
    public static <T, D, A>
    Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
    Collector<? super T, A, D> downstream)
    • 示例
    1
    2
    menu.stream().collect(partitioningBy(Dish::isVegetarian,
    partitioningBy (d -> d.getCalories() > 500)));

中间操作和终端操作状态

操作 类型 返回值 使用的类型/函数式接口 函数描述符
filter 中间 Stream<T> Predicate<T> T->boolean
distinct 中间(有状态-有界) Stream<T>
skip 中间(有状态-有界) Stream<T> long
limit 中间(有状态-有界) Stream<T> long
map 中间 Stream<R> Function<T> T->R
flatmap 中间 Stream<R> Function<T,Stream<R>> T->Stream<R>
sorted 中间(有状态-无界) Stream<T> Comparator<T>
anyMatch 终端 boolean Predicate<T> T->boolean
noneMatch 终端 boolean Predicate<T> T->boolean
allMatch 终端 boolean Predicate<T> T->boolean
findAny 终端 Optional<T>
findFirst 终端(有状态) Optional<T>
forEach 终端 void Consumer<T> T->void
collect 终端 R Collector<T,A,R>
reduce 终端(有状态-有界) Optional<T> BinaryOperator<T> (T,T)->T
count 终端 long
collect 终端(有状态) R Collector<T, A, R>
min 终端(有状态) Optional<T> Comparator<T> (T, T) -> int
max 终端(有状态) Optional<T> Comparator<T> (T, T) -> int
count 终端(有状态) long
forEach 终端 void Consumer<T> T -> void
forEachOrdered 终端(有状态) void Consumer<T> T -> void
toArray 终端(有状态) Object[]

Collectors

img

img

比较器和收集器

使用比较器实现排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@FunctionalInterface
public interface Comparator<T> {
int compare(T var1, T var2);

boolean equals(Object var1);

default Comparator<T> reversed() {
return Collections.reverseOrder(this);
}

default Comparator<T> thenComparing(Comparator<? super T> other) {
Objects.requireNonNull(other);
return (Comparator)((Serializable)((c1, c2) -> {
int res = this.compare(c1, c2);
return res != 0 ? res : other.compare(c1, c2);
}));
}
// ...略
}
  • 使用传入ComparatorStream.sorted方法,Comparator既可以通过lambda表达式实现,也可以使用Comparator接口定义的某种comparing方法生成.
  • Stream.sorted方法根据类的自然顺序生成一个新的排序流,自然顺序是通过实现java.util.Comparable接口来指定的.
  • 从Java1.2引入集合框架开始,工具类Collections就已存在.Collections类定义的静态方法sort传入List作为参数,但返回是void.这种排序是破坏性的,会修改锁提供的集合.即Collection.sort方法不符合Java8所倡导的不可变性(immutability)置于首要位置的函数式编程原则.
  • Java8采用Stream.sorted方法实现相同的排序,但不对原始集合进行修改,而是生成一个新的流.
  • 若希望以其他方式排序,可以使用sorted方法的重载形式,传入Comparator作为参数.

对Map排序

  • Map接口始终包含一个称为Map.Entry的公共静态内部接口,它表示一个键值对.Map接口定义的entrySet方法返回Map.Entry元素的Set.在Java8之前,getKeygetValueMap.Entry接口两种最常用的方法,二者分别返回与某个条目的对应的键和值.
方法 描述
comparingByKey 返回一个比较器,它根据键的自然顺序比较Map.Entry
comparingByValue 返回一个比较器,它根据值的自然顺序比较Map.Entry
comparingByKey(Comparator<? super K> cmp) 返回一个比较器,它使用给定的Comparator并根据键比较Map.Entry
comparingByValue(Comparator<? super V> cmp) 返回一个比较器,它使用给定的Comparator并根据值比较Map.Entry

实现Collector接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package java.util.stream;

import java.util.Collections;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

// T: 是流中要收集的项目的泛型
// A: 是累计器的类型,累加器是在收集过程中用于累积部分结果的对象
// R: 是收集操作得到的对象(通常但并不一定是集合)的类型
public interface Collector<T, A, R> {
/**
* A function that creates and returns a new mutable result container.
*
* @return a function which returns a new, mutable result container
*/
Supplier<A> supplier();

/**
* A function that folds a value into a mutable result container.
*
* @return a function which folds a value into a mutable result container
*/
BiConsumer<A, T> accumulator();

/**
* A function that accepts two partial results and merges them. The
* combiner function may fold state from one argument into the other and
* return that, or may return a new result container.
*
* @return a function which combines two partial results into a combined
* result
*/
BinaryOperator<A> combiner();

/**
* Perform the final transformation from the intermediate accumulation type
* {@code A} to the final result type {@code R}.
*
* <p>If the characteristic {@code IDENTITY_FINISH} is
* set, this function may be presumed to be an identity transform with an
* unchecked cast from {@code A} to {@code R}.
*
* @return a function which transforms the intermediate result to the final
* result
*/
Function<A, R> finisher();

/**
* Returns a {@code Set} of {@code Collector.Characteristics} indicating
* the characteristics of this Collector. This set should be immutable.
*
* @return an immutable set of collector characteristics
*/
Set<Characteristics> characteristics();
// ...略
}
  • Supplier<A> supplier()

    • 使用Supplier<A>创建累加容器(accumulator container),建立新的结果容器
    • 用于创建累加器临时结果所用的容器
  • BiConsumer<A, T> accumulator()

    • 使用BiConsumer<A, T>将元素添加到结果容器
    • 用于将一个元素添加到累加器
  • BinaryOperator<A> combiner()

    • 使用BinaryOperator<A>合并两个结果容器
    • BinaryOperator表示输入类型和输出类型相同,因此可以将两个累加器合二为一
  • Function<A, R> finisher()

    • 使用Function<A, R>对结果容器应用最终转换
    • Function将累加器转换为所需的结果容器.
  • Set<Characteristics> characteristics()

    • characteristics会返回一个不可变的Characteristics集合,它定义了收集器的行为,尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Characteristics indicating properties of a {@code Collector}, which can
* be used to optimize reduction implementations.
*/
enum Characteristics {
/**
* Indicates that this collector is <em>concurrent</em>, meaning that
* the result container can support the accumulator function being
* called concurrently with the same result container from multiple
* threads.
*
* <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
* then it should only be evaluated concurrently if applied to an
* unordered data source.
*/
CONCURRENT,

/**
* Indicates that the collection operation does not commit to preserving
* the encounter order of input elements. (This might be true if the
* result container has no intrinsic order, such as a {@link Set}.)
*/
UNORDERED,

/**
* Indicates that the finisher function is the identity function and
* can be elided. If set, it must be the case that an unchecked cast
* from A to R will succeed.
*/
IDENTITY_FINISH
}
  • CONCURRENT: 表示accumulator函数可以从多个线程同时调用,且该收集器可以并行归约流.如果收集器没有标为UNORDERED,那它仅在用于无序数据源时才可以并行归约
  • UNORDERED: 表示归约结果不受流中项目的遍历和累积顺序的影响.
  • IDENTITY_FINISH: 这表明完成器方法返回的函数是一个恒等函数,可以跳过.这种情况下,累加器对象将会直接用作归约过程的最终结果.这也意味着,将累加器A不加检查地转换为结果R是安全的.

示例

  • 第一种: 实现Collector接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.holelin.sundry.test.common;

import java.util.*;
import java.util.function.*;
import java.util.stream.Collector;
import static java.util.stream.Collector.Characteristics.*;

public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {

// 创建集合操作的起始点
@Override
public Supplier<List<T>> supplier() {
return ArrayList::new;
}

// 累积遍历过的项目,原位修改累加器
@Override
public BiConsumer<List<T>, T> accumulator() {
return List::add;
}

// 恒等函数
@Override
public Function<List<T>, List<T>> finisher() {
return Function.identity();
}

// 修改第一个累加器,将其与第二个累加器的内容合并
@Override
public BinaryOperator<List<T>> combiner() {
return (list1, list2) -> {
list1.addAll(list2);
// 返回修改后的第一个累加器
return list1;
};
}

// 为收集器添加IDENTITY_FINISH和CONCURRENT标志
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(
IDENTITY_FINISH, CONCURRENT));
}
}
  • 第二种: 对于IDENTITY_FINISH的收集操作,还有一种方法可以得到同样的结果而无需从头实现新的Collectors接口
    • Stream有一个重载的collect方法可以接受另外三个函数——supplier、accumulator和combiner,其语 义和 Collector接口的相应方法返回的函数完全相同.
1
2
3
4
List<Dish> dishes = menuStream.collect( 
ArrayList::new, // 供应源
List::add, // 累加器
List::addAll); // 组合器

闭包复合

  • 使用Function,ConsumerPredicate接口中定义的默认的复合方法.

Function

1
2
3
4
5
6
7
8
9
10

default <V> Function<V, R> compose(Function<? super V, ? extends T> before) {
Objects.requireNonNull(before);
return (V v) -> apply(before.apply(v));
}

default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (T t) -> after.apply(apply(t));
}
  • compose方法在原始函数之前应用参数
  • andThen方法在原始函数之后应用参数
1
2
3
4
5
6
7
8
9
 				Function<Integer, Integer> add = x -> x + 2;
Function<Integer, Integer> mult = x -> x * 3;
final Function<Integer, Integer> multadd = add.compose(mult);
final Function<Integer, Integer> addmult = add.andThen(mult);
System.out.println("multadd(1): " + multadd.apply(1));
System.out.println("addmult(1): " + addmult.apply(1));

// multadd(1): 5
// addmult(1): 9

Consumer

1
2
3
4
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}

Predicate

1
2
3
4
5
6
7
8
9
10
11
12
13
default Predicate<T> and(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) && other.test(t);
}

default Predicate<T> negate() {
return (t) -> !test(t);
}

default Predicate<T> or(Predicate<? super T> other) {
Objects.requireNonNull(other);
return (t) -> test(t) || other.test(t);
}

并行流

流的数据源和可分解性

可分解性
List -
ArrayList 极佳
LinkedList 不适合
Vector 不适合
Stack 不适合
CopyOnWriteArrayList
Set
HashSet 极佳
TreeSet 不适合
LinkedHashSet 极佳
EnumSet 极佳
CopyOnWriteArraySet 极佳
Queue
PriorityQueue 不适合
ArrayDeque 极佳
ConcurrentLinkedQueue 极佳
LinkedBlockingQueue 不适合
Map
HashMap 极佳
LinkedHashMap
TreeMap 不适合
EnumMap
ConcurrentHashMap
HashTable 不适合
Properties 不适合
Stream
IntStream.rang 极佳
Strean.iterate
  • ArrayList:由于元素是连续存储的,适合并行化处理.

  • LinkedList:由于元素不是连续存储的,不适合并行化处理.

  • Vector:由于是线程安全的,但是在并发环境下可能会出现性能问题,不建议使用并行化处理.

  • Stack:由于是线程安全的,但是在并发环境下可能会出现性能问题,不建议使用并行化处理.

  • CopyOnWriteArrayList:由于是线程安全的,且适合并行化处理.

  • HashSet:由于哈希表的元素分散存储在不同的桶中,适合并行化处理.

  • LinkedHashSet:由于哈希表和链表的特点,适合并行化处理.

  • TreeSet:由于红黑树的元素是有序的,需要注意线程安全问题,在并行化处理时需要进行同步.

  • EnumSet:由于是基于位向量实现的,适合并行化处理.

  • CopyOnWriteArraySet:由于是线程安全的,且适合并行化处理.

  • PriorityQueue:由于基于堆实现,需要注意线程安全问题,在并行化处理时需要进行同步.

  • ArrayDeque:由于是基于数组实现的,适合并行化处理.

  • ConcurrentLinkedQueue:由于是线程安全的,且适合并行化处理.

  • LinkedBlockingQueue:由于是线程安全的,但是在并发环境下可能会出现性能问题,不建议使用并行化处理.

  • HashMap:由于哈希表的元素分散存储在不同的桶中,适合并行化处理.

  • LinkedHashMap:由于哈希表和链表的特点,适合并行化处理.

  • TreeMap:由于红黑树的元素是有序的,需要注意线程安全问题,在并行化处理时需要进行同步.

  • EnumMap:由于是基于数组实现的,适合并行化处理.

  • ConcurrentHashMap:由于是线程安全的,且适合并行化处理.

  • Hashtable:由于是线程安全的,但是在并发环境下可能会出现性能问题,不建议使用并行化处理.

  • Properties:由于是基于哈希表实现的,适合并行化处理.

分支/合并框架

使用RecursiveTask

  • java.util.concurrent.RecursiveTask

  • 拆分逻辑

    1
    2
    3
    4
    5
    6
    7
    if (任务足够小或不可分) { 
    顺序计算该任务
    } else {
    将任务分成两个子任务
    递归调用本方法,拆分每个子任务,等待所有子任务完成
    合并每个子任务的结果
    }
  • 示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    package com.holelin.sundry.test.common;

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.stream.LongStream;

    public class ForkJoinSumCalculator
    extends java.util.concurrent.RecursiveTask<Long> {
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 10_000;

    public ForkJoinSumCalculator(long[] numbers) {
    this(numbers, 0, numbers.length);
    }

    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
    this.numbers = numbers;
    this.start = start;
    this.end = end;
    }

    @Override
    protected Long compute() {
    int length = end - start;
    if (length <= THRESHOLD) {
    return computeSequentially();
    }
    // 将任务分成两个子任务
    ForkJoinSumCalculator leftTask =
    new ForkJoinSumCalculator(numbers, start, start + length / 2);
    leftTask.fork();
    ForkJoinSumCalculator rightTask =
    new ForkJoinSumCalculator(numbers, start + length / 2, end);
    // 递归调用本方法,拆分每个子任务,等待所有子任务完成
    Long rightResult = rightTask.compute();
    Long leftResult = leftTask.join();
    // 合并每个子任务的结果
    return leftResult + rightResult;
    }

    private long computeSequentially() {
    long sum = 0;
    for (int i = start; i < end; i++) {
    sum += numbers[i];
    }
    return sum;
    }

    public static void main(String[] args) {
    final long l = forkJoinSum(1000000L);
    System.out.println(l);
    }
    public static long forkJoinSum(long n) {
    long[] numbers = LongStream.rangeClosed(1, n).toArray();
    ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
    return new ForkJoinPool().invoke(task);
    }
    }

使用分支/合并框架的最佳做法(Java8实战)

  • 对一个任务调用join方法会阻塞调用方,直到该任务做出结果.因此,有必要在两个子任务的计算都开始之后再调用它.否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动.
  • 对于分支/合并拆分策略还有最后一点补充:你必须选择一个标准,来决定任务是要进一步拆分还是已小到可以顺序求值.
  • 不应该在RecursiveTask内部使用ForkJoinPoolinvoke方法.相反,你应该始终直接调用computefork方法,只有顺序代码才应该用invoke来启动并行计算.
  • 对子任务调用fork方法可以把它排进ForkJoinPool.同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用compute低.这样做你可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销.
  • 调试使用分支/合并框架的并行计算可能有点棘手.特别是你平常都在你喜欢的IDE里面看栈跟踪(stack trace)来找问题,但放在分支合并计算上就不行了,因为调用compute的线程并不是概念上的调用方,后者是调用fork的那个.
  • 和并行流一样,你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计算快.我们已经说过,一个任务可以分解成多个独立的子任务,才能让性能在并行化时有所提升.所有这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方法是把输入/输出放在一个子任务里,计算放在另一个里,这样计算就可以和输入/输出同时进行.此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑.就像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编译器优化.这就是为什么在测量性能之前跑几遍程序很重要,我们的测试框架就是这么做的.同时还要知道,编译器内置的优化可能会为顺序版本带来一些优势(例如执行死码分析——删去从未被使用的计算)