parallelStream和Stream

2022/8/23 23:26:46

本文主要是介绍parallelStream和Stream,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1. 什么是流?

Stream是java8中新增加的一个特性,被java猿统称为流.

Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。

Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。

Stream 的另外一大特点是,数据源本身可以是无限的。

2. parallelStream是什么

parallelStream其实就是一个并行执行的流.它通过默认的ForkJoinPool,可能提高你的多线程任务的速度.

Stream具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作,因此像以下的程式片段:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(out::println);

你得到的展示顺序不一定会是1、2、3、4、5、6、7、8、9,而可能是任意的顺序,就forEach()这个操作來讲,如果平行处理时,希望最后顺序是按照原来Stream的数据顺序,那可以调用forEachOrdered()。例如:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEachOrdered(out::println); 
注意:如果forEachOrdered()中间有其他如filter()的中介操作,会试着平行化处理,然后最终forEachOrdered()会以原数据顺序处理,因此,使用forEachOrdered()这类的有序处理,可能会(或完全失去)失去平行化的一些优势,实际上中介操作亦有可能如此,例如sorted()方法。

3. 线程安全问题

3.1 原因分析

ArrayIndexOutOfBoundsException异常说明是存放数据的list有问题,即list的索引超过了list的长度导致越界。而在代码中采用的list是ArrayList,具有自动扩容机制,但是,ArrayList是线程不安全的!

ArrayList的add方法如下:

public boolean add(E e) {
    ensureCapacityInternal(size + 1); 
    elementData[size++] = e;
    return true;
}

线程不安全问题的关键在于这行代码:elementData[size++] = e

其原子操作如下:
1. elementData[size] = e
2. 读取 size
3. size += 1

在多线程环境下,当两个线程同时执行ensureCapacityInternal(size + 1)得到了相同的size(假设此时size恰好为数组最后一位),没有触发扩容,此时线程A先一步执行完size+1,而后线程B读取到这个新的size,而后再次size+1,此时就会出现数组越界异常。

解决办法

使用线程安全的ArrayList:CopyOnWriteArrayList



这篇关于parallelStream和Stream的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程