Stream.parallel(): примеры (JAVA)

Stream.parallel() в Java: разбор применения и примеры
Раздел: Параллельные стримы
Stream.parallel(): Stream

Общее описание и поведение Stream.parallel()

Метод parallel() в Java является дефолтным методом в интерфейсе java.util.stream.BaseStream (и реализован для Stream, IntStream, LongStream, DoubleStream). Он не принимает аргументов и возвращает поток того же типа, но с заявленной характеристикой выполнения в параллельном режиме. Подпись в коде:

Stream parallel()

Назначение: переключить существующий последовательный или уже параллельный поток в параллельный режим. Параллельный режим позволяет распараллелить обработку элементов через механизм разделения источника данных (Spliterator) и выполнения задач на пуле потоков (по умолчанию ForkJoinPool.commonPool).

Возвращаемое значение: новый или тот же объект потока (Stream) с флагом параллельности. Метод не меняет исходную коллекцию; операции по-прежнему ленивы и выполняются при терминальном вызове.

Важные особенности поведения:

  • Исполнение параллельного потока использует ForkJoinPool.commonPool по умолчанию; уровень параллелизма зависит от окружения и может быть настроен.
  • Порядок элементов (encounter order) для источников с упорядоченностью сохраняется, если не применять unordered(). Для сохранения порядка в побочном выводе есть forEachOrdered, но он снижает параллелизм.
  • Некоторые коллекторы и операции имеют особенности при параллельности: существует специальный набор характеристик коллектора (CONCURRENT, UNORDERED), которые влияют на объединение частичных результатов без дополнительной синхронизации.
  • Для корректности в параллельном режиме рекоммендуется использовать ассоциативные, без побочных эффектов операции при reduce и безопасные для многопоточности структуры при мутабельных операциях.

Когда применяется: при обработке больших наборов данных и при наличии CPU-bound задач, где выгодна распараллеливаемость. Не всегда приводит к ускорению: малые коллекции, I/O-bound операции или источники с плохой поддержкой разделения (например, Stream.iterate) могут работать медленнее.

Примеры базового использования

Несколько кратких примеров с кодом и возможными результатами.

1) Базовый параллельный вывод (порядок не гарантируется):

List list = IntStream.rangeClosed(1, 9).boxed().collect(Collectors.toList());
list.stream().parallel().forEach(System.out::println);
Возможный вывод (порядок произвольный):
5
2
7
1
9
4
3
6
8

2) Сохранение порядка при выводе:

IntStream.rangeClosed(1, 9).parallel().forEachOrdered(System.out::print);
123456789

3) Сумма с использованием параллельного IntStream (рекомендовано через готовые методы):

int sum = IntStream.rangeClosed(1, 1000).parallel().sum();
System.out.println(sum);
500500

4) Неправильное использование с разделяемым листом (гонка):

List unsafe = new ArrayList<>();
IntStream.range(0, 1000).parallel().forEach(unsafe::add);
System.out.println(unsafe.size());
Ожидалось 1000, реальный результат часто меньше 1000 из-за потерянных вставок

5) Корректный сбор без явной синхронизации:

List safe = IntStream.range(0, 1000).parallel().boxed().collect(Collectors.toList());
System.out.println(safe.size());
1000

6) Запуск параллельного потока в пользовательском пуле ForkJoinPool:

ForkJoinPool pool = new ForkJoinPool(4);
pool.submit(() -> IntStream.range(1, 10000).parallel().forEach(i -> {/*работа*/})).get();
pool.shutdown();
Задачи выполнены в пуле с параллелизмом 4

Аналоги и смежные механизмы в Java

  • Collection.parallelStream(): удобный способ получить параллельный поток от коллекции напрямую. Эквивалент применения stream().parallel(), но короче.
  • sequential(): переключает поток обратно в последовательный режим. Применяется для отключения параллелизма в цепочке.
  • ForkJoinPool и ExecutorService: прямое управление пулом потоков и задачами. Предпочтительно при необходимости тонкой настройки пула или при избежании общего пула.
  • Arrays.parallelSort: специализированный параллельный алгоритм для сортировки массивов, часто быстрее общего варианта при больших массивах.
  • CompletableFuture / parallel tasks: асинхронные цепочки и комбинирование задач, дающие гибкость управления параллелизмом вне Stream API.

Выбор: для простого распараллеливания коллекций удобнее parallelStream/parallel(). Для контроля пулов или при сложной координации лучше использовать ForkJoinPool или CompletableFuture.

Альтернативы в других языках и их отличия

Краткие сравнения и примеры в популярных языках.

Python (concurrent.futures): GIL ограничивает CPU-параллелизм в потоках, для CPU-bound используется ProcessPoolExecutor.

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as ex:
    res = list(ex.map(lambda x: x*x, range(1, 11)))
print(res)
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

C# (PLINQ): параллельные LINQ-операции через AsParallel.

var sum = Enumerable.Range(1, 1000000).AsParallel().Sum();
Console.WriteLine(sum);
500000500000

JavaScript (Node.js): нет встроенного потокового параллелизма для CPU; используются worker_threads или промисы для асинхронных операций. Пример с Promise.all (параллелизм I/O):

const tasks = [1,2,3].map(i => Promise.resolve(i*i));
Promise.all(tasks).then(console.log);
[1, 4, 9]

Golang: легковесный параллелизм через горутины и WaitGroup; распараллеливание контролируется вручную.

var wg sync.WaitGroup
res := make([]int, 3)
for i := 1; i <= 3; i++ {
  wg.Add(1)
  go func(i int){ defer wg.Done(); res[i-1]=i*i }(i)
}
wg.Wait()
fmt.Println(res)
[1 4 9]

Kotlin: доступ к Java Streams (parallelStream) и корутины с Dispatchers.Default для параллельных задач. Пример корутин даёт удобство отмены и структурной конкуренции.

runBlocking {
  val results = (1..3).map { n ->
    async(Dispatchers.Default) { n * n }
  }.awaitAll()
  println(results)
}
[1, 4, 9]

SQL: параллелизм выполняется внутри СУБД на уровне планов выполнения, не управляется через Stream API, но обеспечивает быстрые set-based операции на сервере.

Отличия от Java Stream.parallel(): в большинстве языков параллелизм задаётся явным управлением задачами и пулом потоков, тогда как Stream.parallel() предоставляет декларативный стиль обработки коллекций и опирается на Spliterator и объединяющие коллекторы.

Типичные ошибки и примеры

Перечень распространённых ошибок при использовании параллельных потоков.

1) Мутирование общей коллекции без синхронизации:

List result = new ArrayList<>();
IntStream.range(0, 1000).parallel().forEach(result::add);
System.out.println(result.size());
Часто возвращает значение меньше 1000 из-за гонок

Правильные альтернативы: использовать collect(Collectors.toList()), Collections.synchronizedList(...) или потокобезопасную коллекцию.

2) Ожидание упорядоченного вывода при forEach (потеря производительности):

stream.parallel().forEachOrdered(System.out::println);
Вывод упорядочен, но параллельность существенно снижена

3) Использование непоследовательно объединяющегося reduce: операторы reduce должны быть ассоциативны и без сторонних эффектов.

int bad = Stream.of(1,2,3,4).parallel().reduce(0, (a,b) -> a - b);
System.out.println(bad);
Результат непредсказуем или некорректен для параллельного выполнения

4) Неподходящие источники данных: Stream.iterate плохо распараллеливается, что ведёт к низкой производительности.

Stream.iterate(1, i -> i+1).limit(1000000).parallel().map(...)
Может работать медленно из-за невозможности эффективного разбиения

5) Блокирующие операции внутри параллельного потока: при I/O-bound задачах параллельный поток может не дать выигрыша и привести к увеличению времени из-за ожидания.

Изменения и эволюция в JVM

Ключевые моменты развития параллельных потоков:

  • Введение Stream API и метода parallel() в Java 8 вместе с ForkJoinPool.commonPool.
  • Дальнейшие версии JVM включали оптимизации ForkJoinPool и улучшения производительности встроенных коллекций и коллекторов, которые косвенно повлияли на эффективность параллельных потоков.
  • Добавление новых коллекторов и улучшение характеристик collectors в более поздних релизах позволило эффективнее собирать результаты при параллельной работе (например, groupingByConcurrent, улучшения в toArray и т.д.).
  • Практика рекомендует контролировать общие ресурсы и при необходимости использовать собственные ForkJoinPool, так как общий пул может быть занят и повлиять на другие параллельные операции.

Конкретных изменений в сигнатуре parallel() не было; эволюция касалась реализации, оптимизаций и сопутствующих API.

Расширенные и редкие сценарии применения

Несколько продвинутых примеров с пояснениями.

1) Использование groupingByConcurrent для конкурентного группирования:

Пример java
List words = Arrays.asList("one","two","three","four","five","six");
Map> byLen = words.parallelStream()
    .collect(Collectors.groupingByConcurrent(String::length));
System.out.println(byLen);
{3=[one, two, six], 4=[four, five], 5=[three]}

Пояснение: groupingByConcurrent собирает элементы конкурентно без глобальной блокировки, что даёт прирост для больших наборов.

2) Запуск параллельного потока в отдельном пуле, чтобы не блокировать commonPool:

Пример java
ForkJoinPool custom = new ForkJoinPool(8);
try {
  custom.submit(() ->
    IntStream.range(1, 1000000).parallel().forEach(i -> doCpuWork(i))
  ).get();
} finally { custom.shutdown(); }
Выполнение использует custom пул с 8 потоками вместо общего

Пояснение: полезно при интеграции с кодом, который тоже использует commonPool, чтобы избежать взаимной блокировки.

3) Правильный вариант мутабельного редьюса с Collector.of и характеристикой CONCURRENT:

Пример java
Collector>> c =
  Collector.of(
    ConcurrentHashMap>::new,
    (map, v) -> map.computeIfAbsent(v%10, k -> Collections.synchronizedList(new ArrayList<>())).add(v),
    (m1, m2) -> { m2.forEach((k,v) -> m1.merge(k, v, (a,b)->{ a.addAll(b); return a;})); return m1; },
    Collector.Characteristics.CONCURRENT
  );
ConcurrentMap> grouped = IntStream.range(0, 10000).parallel().boxed().collect(c);
System.out.println(grouped.size());
10

Пояснение: создание собственного коллектора даёт контроль над процессом и позволяет максимально использовать параллелизм.

4) Проблемы с источниками, плохо разбиваемыми для параллели: замена Stream.iterate на LongStream.range для лучшего разделения:

Пример java
// Плохо
Stream.iterate(1L, i -> i + 1).limit(10_000_000).parallel().map(x -> x*x).count();
// Лучше
LongStream.rangeClosed(1, 10_000_000).parallel().map(x -> x*x).count();
LongStream.rangeClosed быстрее на многопроцессорных системах

5) Nested parallelism и ограничение commonPool: запуск parallel внутри parallel может привести к чрезмерному созданию задач и деградации. Рекомендация: контролировать уровни параллелизма или использовать собственные пулы для верхнего уровня.

6) Профилирование: пример замера времени выполнения последовательного и параллельного потоков для CPU-bound задачи:

Пример java
long t1 = System.nanoTime();
long s1 = LongStream.rangeClosed(1, 5_000_000).map(x -> x*x).sum();
long dur1 = System.nanoTime() - t1;
long t2 = System.nanoTime();
long s2 = LongStream.rangeClosed(1, 5_000_000).parallel().map(x -> x*x).sum();
long dur2 = System.nanoTime() - t2;
System.out.println("seq=" + dur1/1_000_000 + "ms par=" + dur2/1_000_000 + "ms");
seq=XXXms par=YYYms (XXX и YYY зависят от машины; для CPU-bound часто par < seq)

Пояснение: реальные числа зависят от числа ядер, кэширования и накладных расходов на разбиение/объединение.

джава Stream.parallel() function comments

En
Stream.parallel() Возвращает параллельный поток