Java Stream简介和使用参考

1. 什么是Java Stream

Java Stream是Java 8引入的一个新的集合操作功能类,提供数据的流式处理,能够对Java集合数据实现类似大数据领域中map-reduce的数据处理方式,为集合对象进行各种高效的聚合和批量处理操作提供便利。Java Stream是Java语言在集合类Array/Collection/Map之后又一项集合功能扩展。

Java Stream提供串行和并行的数据处理模式,会在流式操作前后步骤中进行优化处理,可以在并发模式中充分利用多核处理器,实现高效的数据处理过程。

Java Stream不同于其它Java集合类(例如Array/Collection/Map等等),有其独自的特点,

  • 没有数据存储,Java Stream关注的是算法和计算,提供的是数据处理管道(pipeline)。
  • 数据流,数据源转换成Stream数据对象之后,Stream数据处理产生新的Stream数据对象,如此循环直到产生数据结果,整个处理的过程中不改变数据源。
  • 延后处理(Laziness-seeking),很多Stream操作,比如filter/map/duplicate removal,能够将多次操作优化合并后执行,延后处理提供了这种优化执行的机会,详情见下文。
  • 数据源可以是无限的,只要处理过程存在终止条件(例如limit/findFirst)。
  • 一次性消费,在一个Stream操作流水线中,数据源中每一个数据元素只被消费处理一次。如果想再消费,需重新构建一个新的Stream对象。

2. 简单的使用

下面是一个使用Java Stream的样例,使用了Stream的过滤-映射-聚合(filter-map-reduce)操作,

Integer sum = IntStream.range(0,10)
                       .filter(n -> n%2 == 0)
                       .map(n -> n*2)
                       .reduce(0, (x, y) -> x+y);

在这个样例中,首先拿到 [0,9] 十个数字的整形数据集合,然后过滤出偶数,然后对每个偶数乘2,最后算出总和为40。

3. Java Steam的使用演示

Java Stream各种操作方法演示代码请见这里,供使用时参考。请先通过如下git clone命令下载代码仓库,

git clone git@gitee.com:pphh/blog.git

演示代码放在文件夹171124_java_stream中,请打开演示代码项目,运行主程序。

里面包括如下几个部分的演示,

  1. 演示简单的stream集合处理(SimpleStream)
    • 在SimpleStream的演示方法中,通过filter-map-reduce操作数组,获取偶数数列,相加出总和。
  2. 演示Stream的构建操作(StreamBuilder.demo)
    • 通过Java数组生成Stream对象
    • 通过Java Collection集合类生成Stream对象
    • 通过Stream静态工厂类生成Stream对象
  3. 演示Stream的各种中间操作方法:StreamOpertions
    • Stream.filter:在数列中找到偶数,挑出字符串长度大于5的用户名
    • Stream.map:对用户名进行字符串大写处理,对多个集合合并处理
    • Stream.forEach:对集合元素一一执行指定动作,forEach是一个结束操作,只能在最后执行,并只被执行一次。
    • Stream.peek:对集合元素一一执行指定动作,和forEach不一样之处,peek是一个中间操作,可以执行多次。
    • Stream.reduce:聚合操作,获取偶数并取总和
    • Stream.skip:获取数列,跳过数组的前三个数据元素
    • Stream.limit:获取数列,指定获取数组的前面三个数据元素
    • Stream.sort:对集合数据元素进行排序
    • Stream.sort and Stream.limit:演示limit和sort一起工作后,执行流水线的变化,注意有了limit之后,sort只对限定的数据集合进行排序。该演示可以看到Java Stream对中间操作的优化。
    • Stream.min and Stream.max:获取数列中的最大值和最小值
    • Stream.distinct:获取新数列,去除重复值
    • Stream.allMatch, Stream.anyMatch, Stream.noneMatch:判断数列中是否所有数字大于0,有数字大于0,没有任何数字大于0
    • Stream.iterate:获取等差数列
    • Stream.Collectors:对数列进行分组
  4. 演示Stream的串并行处理:StreamParallel
    • 当没有指定并行处理时,数列中的数据在主线程中一一执行peek操作。
    • 若指定了并行处理时,数列中的数据在不同线程中执行peek操作,执行顺序不定。

    如下为Stream串并行处理的演示输出,可以看到前者一一执行,后者有不同线程执行,顺序也不定,
    [20171124 22:16:09-970][main] 演示:串行执行数据流式处理
    [20171124 22:16:09-971][main] 0
    [20171124 22:16:09-971][main] 1
    [20171124 22:16:09-971][main] 2
    [20171124 22:16:09-971][main] 3
    [20171124 22:16:09-971][main] 4
    [20171124 22:16:09-971][main] 5
    [20171124 22:16:09-972][main] 6
    [20171124 22:16:09-972][main] 7
    [20171124 22:16:09-972][main] 8
    [20171124 22:16:09-972][main] 9
    [20171124 22:16:09-972][main] 演示:并行执行数据流式处理
    [20171124 22:16:09-975][main] 9
    [20171124 22:16:09-975][main] 8
    [20171124 22:16:09-976][main] 7
    [20171124 22:16:09-976][ForkJoinPool.commonPool-worker-2] 4
    [20171124 22:16:09-976][ForkJoinPool.commonPool-worker-2] 3
    [20171124 22:16:09-976][ForkJoinPool.commonPool-worker-2] 2
    [20171124 22:16:09-976][ForkJoinPool.commonPool-worker-2] 1
    [20171124 22:16:09-976][ForkJoinPool.commonPool-worker-2] 0
    [20171124 22:16:09-977][ForkJoinPool.commonPool-worker-2] 5
    [20171124 22:16:09-977][main] 6

4. Stream的生成

Stream对象主要有如下几种获取方式

  • Java Collection类
    • java.util.Collection.stream()
    • java.util.Collection.parallelStream()
  • Java Arrays类
    • Java.util.Arrays.stream()
  • 调用Java Stream中的静态工厂方法
    • java.util.stream.Stream.of(Object[])
    • java.util.stream.Stream.iterate()
    • java.util.stream.IntStream.range()
  • 自定义生成
    • java.util.StreamSupport
  • 其它
    • Random.ints()
    • BitSet.stream()

5. Stream操作流水线和操作方法分类

有了Stream集合数据,我们接下来就可以调用Stream提供的操作方法,对数据进行流式处理,这种流式处理也被称为操作流水线(Stream Pipeline)。

在了解操作流式线定义之前首先需要了解下操作方法,Java Stream类中提供每一种操作方法都可划分为如下两大类型,

  1. 中间操作(Intermediate Operation):会产生另外一个Stream对象
  2. 结束操作(Terminal Operation):产生结果

一个Stream流水线通常由一个数据源,N个中间操作,一个结束操作组成,

Stream Pipeline = Source + N * Intermediate operations + Terminal Operation

其中N>=0。

需要注意的是,所有的中间操作都是延后执行(Lazy Operation),换句话说,当Stream对象调用一个中间操作方法时,其操作不会立即执行,而是只有当Stream对象调用了结束操作方法时,才开始执行整个操作流水线,按顺序依次执行各种intermediate和terminal操作。

Stream Lazy Intermediate Operation这种特性能够让数据的处理效率得到提升,Java Stream可以分析中间操作的算法,进行相应的操作合并,或者根据结束条件,一旦有满足结束条件,则提前结束流水线执行。例如,在上述的filter-map-reduce样例中,可以让filtering\mapping\summing三次操作放入同一次循环中完成。再比如在一组数据集合中找出一个小于零的数字,那么只要找到第一小于零的数字则可以提前结束整个流水线操作。

中间操作的延后执行对于无限的数据集合是非常必要的。Stream在面对无限的数据集合时,必须要定义一个结束操作条件,一旦该条件满足了,则流水线执行结束。

中间操作可以分为无状态和有状态的操作,

  1. Stateless operation是指当前中间操作不会给后续操作带来状态,每个数据元素可以独立地执行后续操作并获取结果,各个数据元素在执行后续操作时互不影响,没有相互关联关系,filter/map就是一种stateless operations。
  2. Stateful operations是指只有所有的数据元素都执行完当前中间操作,才能获取到执行结果。Sort/Distinct就是一种有状态的操作,前者对数据集合进行排序,后者对数据进行重复值去除。

操作的状态有无对Stream流水线的并发执行有影响,如果是有状态的,在并发执行过程中会产生大量的数据需要缓存,而如果是无状态的,则数据的缓存量会大大减少。

还有一种是短路操作(short-circuiting),是指对一个无限的对象集合可能在有限的时间内产生一个有限的集合,短路操作能够提前结束对无限对象集合的流水线执行,比如limit/anyMatch操作,前者获取指定个数的对象集合,后者查询是否有匹配的数据元素。中间操作和结束操作都可能是short-circuiting。

下表列出java.util.stream.Stream中各操作方法所具有的属性分类,标识Y表明该方法具有当前属性,

Stream Operations Stateless Stateful Intermediate Terminal Short-Circuit
filter Y Y
map Y Y
mapToInt Y Y
mapToLong Y Y
mapToDouble Y Y
flatMap Y Y
flatMapToLong Y Y
flatMapToDouble Y Y
peek Y Y
distinct Y Y
sorted Y Y
skip Y Y
forEach Y
forEachOrdered Y
toArray Y
reduce Y
collect Y
min Y
max Y
count Y
anyMatch Y Y
allMatch Y Y
noneMatch Y Y
findFirst Y Y
findAny Y Y
limit Y Y Y

6. 参考资料

发表评论