Collection.parallelStream(): примеры (JAVA)

Метод parallelStream() для коллекций Java
Раздел: Параллельные стримы
Collection.parallelStream(): Stream

Описание и сигнатура

Метод parallelStream() определён как default-метод в интерфейсе java.util.Collection (начиная с Java 8). Сигнатура выглядит так:

default Stream parallelStream()

Возвращает параллельный поток (Stream<E>), готовый к выполнению операций в параллельных задачах. У метода нет параметров. Поток использует общий пул задач ForkJoinPool.commonPool() для выполнения параллельных задач, если не переопределён механизм запуска.

Ключевые свойства и поведение:

  • Возвращаемый тип: Stream<E>. Поток имеет параллельную модальност (isParallel() == true).
  • Аргументы: отсутствуют. Вызывается на экземпляре коллекции; вызов на null приведёт к NullPointerException.
  • Используется для распараллеливания операций на элементах коллекции с целью ускорения CPU-bound вычислений или для эффективного использования нескольких ядер процессора.
  • Порядок обработки зависит от источника и операций: встречающийся порядок (encounter order) по умолчанию обычно сохраняется для упорядоченных коллекций (например, List), но некоторые терминальные операции (например, forEach) не гарантируют порядок при параллельном выполнении; альтернативы: forEachOrdered.
  • Параллельные стримы подходят, когда операции являются ассоциативными и нечувствительны к состоянию (без побочных эффектов) или используют потокобезопасные структуры.
  • Пул потоков: используется ForkJoinPool.commonPool(); его параллелизм можно настроить через системное свойство -Djava.util.concurrent.ForkJoinPool.common.parallelism или с помощью собственного ForkJoinPool (см. примеры продвинутого использования).
  • Производительность: накладные расходы на управление задачами делают параллельные стримы неэффективными для очень маленьких коллекций или лёгких операций.

Важно учитывать потокобезопасность. Операции с изменяемым общим состоянием должны быть синхронизированы или заменены на безопасные альтернативы (например, Collectors.toConcurrentMap, LongAdder и т. п.).

Короткие примеры применения

Пример 1: суммирование чисел в параллельном потоке

List nums = Arrays.asList(1, 2, 3, 4, 5);
int sum = nums.parallelStream().mapToInt(Integer::intValue).sum();
System.out.println(sum);
15

Пример 2: forEach против forEachOrdered

List list = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
list.parallelStream().forEach(i -> System.out.print(i + " "));// порядок не гарантирован
System.out.println();
list.parallelStream().forEachOrdered(i -> System.out.print(i + " ")); // порядок сохранён
(пример вывода первой строки может быть:)
7 2 9 5 1 10 3 8 4 6 
1 2 3 4 5 6 7 8 9 10 

Пример 3: findAny в параллельном потоке

Optional any = nums.parallelStream().filter(i -> i % 2 == 0).findAny();
System.out.println(any.orElse(-1));
2 (или 4 - результат может отличаться)

Пример 4: небезопасный побочный эффект (гонка данных)

List result = new ArrayList<>();
nums.parallelStream().forEach(result::add); // ArrayList не потокобезопасен
System.out.println(result.size());
(возможный результат: число < 5, поведение непредсказуемо, возможен ConcurrentModificationException)

Пример 5: безопасный параллельный сбор в потокобезопасную коллекцию

List safe = nums.parallelStream().collect(Collectors.toList());
System.out.println(safe);
[1, 2, 3, 4, 5]

Похожие механизмы в Java

  • Collection.stream() - создаёт последовательный поток. Предпочтителен при упрощённой логике и когда порядок важен.
  • stream().parallel() - переводит уже созданный поток в параллельный. Эквивалент вызова parallelStream(), но удобен при динамическом выборе режима.
  • StreamSupport.stream(spliterator, true) - явное создание параллельного потока из Spliterator, даёт контроль над источником данных.
  • ForkJoinPool и ExecutorService - низкоуровневые средства управления задачами; предпочтительны при необходимости детального управления пулом и его политиками.
  • Для конкурентных сборщиков: Collectors.toConcurrentMap, groupingByConcurrent - позволяют эффективно собирать результаты в многопоточной среде.

Выбор зависит от требований: простота и читаемость - parallelStream(), детальный контроль над исполнением - собственный ForkJoinPool или ExecutorService.

Альтернативы в других языках

Краткое сравнение и примеры.

C# (PLINQ)

var res = Enumerable.Range(1, 10).AsParallel().Select(i => i * 2).ToArray();
Console.WriteLine(string.Join(",", res));
2,4,6,8,10,12,14,16,18,20

PLINQ предоставляет удобный способ распараллеливания LINQ-запросов, управление степенью параллелизма через WithDegreeOfParallelism.

Python

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as ex:
    res = list(ex.map(lambda x: x*x, range(1,11)))
print(res)
[1,4,9,16,25,36,49,64,81,100]

Глобальная блокировка интерпретатора (GIL) делает multiprocessing или ProcessPoolExecutor предпочтительными для CPU-bound задач.

JavaScript (Node.js / браузер)

// Node.js: worker_threads (упрощённо)
// В браузере: Web Workers
Promise.all(items.map(i => doAsync(i))).then(results => console.log(results));
[ /* результаты асинхронной обработки */ ]

JS не имеет встроенного CPU-параллелизма в одном потоке; для параллельной CPU-работы применяются воркеры.

Go

var wg sync.WaitGroup
results := make(chan int, 10)
for i := 1; i <= 10; i++ {
  wg.Add(1)
  go func(n int){ defer wg.Done(); results <- n*n }(i)
}
go func(){ wg.Wait(); close(results) }()
for r := range results { fmt.Println(r) }
(вывод квадратов в произвольном порядке)

Go предлагает лёгкий механизм параллелизма без необходимости в пуле потоков.

Kotlin

val res = (1..10).toList().parallelStream().map { it * 2 }.toList()
println(res)
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

Kotlin на JVM может использовать Java-параллельные стримы. Также доступны корутины для асинхронной неблокирующей работы.

В целом, подходы в других языках часто предлагают более явный контроль над пулом задач или используют процессы вместо потоков (Python). Отличия от Java: наличие/отсутствие глобальных блокировок, модель памяти, удобство управления параллелизмом и доступность высокоуровневых операторов в стиле stream/linq.

Типичные ошибки и примеры

1) Побочные эффекты с небезопасными коллекциями

List nums = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
List out = new ArrayList<>();
nums.parallelStream().forEach(out::add);
System.out.println(out.size());
(возможный вывод: число меньше 1000 или ConcurrentModificationException)

Причина: ArrayList не потокобезопасен. Решение: собирать через collect(Collectors.toList()) или использовать потокобезопасные структуры.

2) Несоответствующий редуктор (неассоциативная операция)

int res = Arrays.asList(1,2,3,4).parallelStream().reduce(0, (a,b) -> a - b);
System.out.println(res);
(результат может быть неожиданным, например -10)

Причина: оператор вычитания не ассоциативен; для параллельного reduce требуется ассоциативная и совместимая с combiner операция.

3) Ожидание порядка при использовании forEach

list.parallelStream().forEach(System.out::println);
(вывод в произвольном порядке)

Если важен порядок, следует использовать forEachOrdered или сохранять порядок при сборе.

4) Изменение коллекции во время стрима

List l = new ArrayList<>(Arrays.asList(1,2,3));
l.parallelStream().forEach(i -> l.remove(i));
ConcurrentModificationException (возможен)

5) Неподходящий выбор при малых объёмах данных

Для небольших коллекций накладные расходы на распараллеливание могут привести к худшей производительности по сравнению с последовательными потоками.

Изменения и эволюция

Метод parallelStream() был введён в Java 8 вместе со Streams API как default-метод интерфейса Collection. Сама сигнатура и поведение метода практически не менялись в последующих релизах JVM.

Сопутствующие изменения, влияющие на параллельные потоки:

  • Улучшения в ForkJoinPool (разные релизы Java вносили оптимизации в планировщик и производительность work-stealing).
  • Добавление новых коллекторов и Collectors API в более поздних версиях улучшило возможности для конкурентного сбора (toConcurrentMap, groupingByConcurrent).
  • В будущем архитектурные изменения (например, Project Loom) могут изменить подход к конкурентности на JVM, но непосредственных изменений в API parallelStream() не было.

Таким образом, API стабилен, но окружение исполнения (пулы задач и сопутствующие утилиты) эволюционирует.

Продвинутые и менее распространённые сценарии

Пример 1: запуск параллельного потока в собственном ForkJoinPool

Пример java
ForkJoinPool customPool = new ForkJoinPool(8);
List data = IntStream.rangeClosed(1, 1_000_000).boxed().collect(Collectors.toList());
try {
  List res = customPool.submit(() ->
    data.parallelStream().map(i -> i * 2).collect(Collectors.toList())
  ).get();
  System.out.println(res.size());
} catch (Exception e) { e.printStackTrace(); }
customPool.shutdown();
1000000

Пояснение: прямое изменение ForkJoinPool.commonPool() нежелательно в библиотеке. Использование собственного пула позволяет изолировать нагрузку.

Пример 2: groupingByConcurrent для масштабируемой группировки

Пример java
Map counts = IntStream.range(0, 10000).boxed()
  .parallelStream()
  .collect(Collectors.groupingByConcurrent(i -> i % 10, Collectors.counting()));
System.out.println(counts.get(3));
1000

Пояснение: groupingByConcurrent позволяет объединять результаты эффективно в многопоточном режиме.

Пример 3: использование LongAdder вместо AtomicInteger для счётчика

Пример java
LongAdder adder = new LongAdder();
IntStream.range(0, 1000000).parallel().forEach(i -> adder.increment());
System.out.println(adder.sum());
1000000

Пояснение: LongAdder уменьшает конкуренцию при высокой нагрузке по сравнению с AtomicInteger.

Пример 4: корректный редуцирующий комбайнер

Пример java
// Корректный параллельный reduce для суммы квадратов
long sumSq = LongStream.rangeClosed(1, 1000)
  .parallel()
  .reduce(0L, (a,b) -> a + b*b, (a,b) -> a + b);
System.out.println(sumSq);
(число: сумма квадратов от 1 до 1000, например 333833500)

Пример 5: измерение и сравнение производительности

Пример java
List list = IntStream.rangeClosed(1, 5_000_000).boxed().collect(Collectors.toList());
long t1 = System.nanoTime();
long s1 = list.stream().mapToLong(i -> heavy(i)).sum();
long seq = System.nanoTime() - t1;
long t2 = System.nanoTime();
long s2 = list.parallelStream().mapToLong(i -> heavy(i)).sum();
long par = System.nanoTime() - t2;
System.out.println("seq ms=" + (seq/1_000_000) + " par ms=" + (par/1_000_000));
// heavy - CPU-натяжная функция
seq ms=... par ms=... (в зависимости от нагрузки и числа ядер)

Пояснение: для CPU-bound задач и больших объёмов данных параллельный поток обычно быстрее, но нужно измерять в целевой среде.

Пример 6: безопасное форматирование дат в многопоточной среде

Пример java
DateTimeFormatter fmt = DateTimeFormatter.ISO_LOCAL_DATE;
List out = IntStream.rangeClosed(1, 100)
  .parallel()
  .mapToObj(i -> LocalDate.now().format(fmt))
  .collect(Collectors.toList());
System.out.println(out.size());
100

Пояснение: вместо небезопасного SimpleDateFormat предпочтительны неизменяемые форматтеры из java.time.

Пример 7: использование unordered() для оптимизации

Пример java
long count = list.parallelStream().unordered().filter(x -> x % 2 == 0).count();
System.out.println(count);
(число чётных элементов)

Пояснение: снятие требований порядка может помочь оптимизации выполнения в некоторых случаях.

джава Collection.parallelStream() function comments

En
Collection.parallelStream() Возвращает параллельный поток из коллекции