Stream.parallel(): примеры (JAVA)
Stream.parallel(): StreamОбщее описание и поведение Stream.parallel()
Метод parallel() в Java является дефолтным методом в интерфейсе java.util.stream.BaseStream (и реализован для Stream, IntStream, LongStream, DoubleStream). Он не принимает аргументов и возвращает поток того же типа, но с заявленной характеристикой выполнения в параллельном режиме. Подпись в коде:
Streamparallel()
Назначение: переключить существующий последовательный или уже параллельный поток в параллельный режим. Параллельный режим позволяет распараллелить обработку элементов через механизм разделения источника данных (Spliterator) и выполнения задач на пуле потоков (по умолчанию ForkJoinPool.commonPool).
Возвращаемое значение: новый или тот же объект потока (Stream
Важные особенности поведения:
- Исполнение параллельного потока использует ForkJoinPool.commonPool по умолчанию; уровень параллелизма зависит от окружения и может быть настроен.
- Порядок элементов (encounter order) для источников с упорядоченностью сохраняется, если не применять unordered(). Для сохранения порядка в побочном выводе есть forEachOrdered, но он снижает параллелизм.
- Некоторые коллекторы и операции имеют особенности при параллельности: существует специальный набор характеристик коллектора (CONCURRENT, UNORDERED), которые влияют на объединение частичных результатов без дополнительной синхронизации.
- Для корректности в параллельном режиме рекоммендуется использовать ассоциативные, без побочных эффектов операции при reduce и безопасные для многопоточности структуры при мутабельных операциях.
Когда применяется: при обработке больших наборов данных и при наличии CPU-bound задач, где выгодна распараллеливаемость. Не всегда приводит к ускорению: малые коллекции, I/O-bound операции или источники с плохой поддержкой разделения (например, Stream.iterate) могут работать медленнее.
Примеры базового использования
Несколько кратких примеров с кодом и возможными результатами.
1) Базовый параллельный вывод (порядок не гарантируется):
List list = IntStream.rangeClosed(1, 9).boxed().collect(Collectors.toList());
list.stream().parallel().forEach(System.out::println);
Возможный вывод (порядок произвольный): 5 2 7 1 9 4 3 6 8
2) Сохранение порядка при выводе:
IntStream.rangeClosed(1, 9).parallel().forEachOrdered(System.out::print);
123456789
3) Сумма с использованием параллельного IntStream (рекомендовано через готовые методы):
int sum = IntStream.rangeClosed(1, 1000).parallel().sum();
System.out.println(sum);
500500
4) Неправильное использование с разделяемым листом (гонка):
List unsafe = new ArrayList<>();
IntStream.range(0, 1000).parallel().forEach(unsafe::add);
System.out.println(unsafe.size());
Ожидалось 1000, реальный результат часто меньше 1000 из-за потерянных вставок
5) Корректный сбор без явной синхронизации:
List safe = IntStream.range(0, 1000).parallel().boxed().collect(Collectors.toList());
System.out.println(safe.size());
1000
6) Запуск параллельного потока в пользовательском пуле ForkJoinPool:
ForkJoinPool pool = new ForkJoinPool(4);
pool.submit(() -> IntStream.range(1, 10000).parallel().forEach(i -> {/*работа*/})).get();
pool.shutdown();
Задачи выполнены в пуле с параллелизмом 4
Аналоги и смежные механизмы в Java
- Collection.parallelStream(): удобный способ получить параллельный поток от коллекции напрямую. Эквивалент применения stream().parallel(), но короче.
- sequential(): переключает поток обратно в последовательный режим. Применяется для отключения параллелизма в цепочке.
- ForkJoinPool и ExecutorService: прямое управление пулом потоков и задачами. Предпочтительно при необходимости тонкой настройки пула или при избежании общего пула.
- Arrays.parallelSort: специализированный параллельный алгоритм для сортировки массивов, часто быстрее общего варианта при больших массивах.
- CompletableFuture / parallel tasks: асинхронные цепочки и комбинирование задач, дающие гибкость управления параллелизмом вне Stream API.
Выбор: для простого распараллеливания коллекций удобнее parallelStream/parallel(). Для контроля пулов или при сложной координации лучше использовать ForkJoinPool или CompletableFuture.
Альтернативы в других языках и их отличия
Краткие сравнения и примеры в популярных языках.
Python (concurrent.futures): GIL ограничивает CPU-параллелизм в потоках, для CPU-bound используется ProcessPoolExecutor.
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as ex:
res = list(ex.map(lambda x: x*x, range(1, 11)))
print(res)
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
C# (PLINQ): параллельные LINQ-операции через AsParallel.
var sum = Enumerable.Range(1, 1000000).AsParallel().Sum();
Console.WriteLine(sum);
500000500000
JavaScript (Node.js): нет встроенного потокового параллелизма для CPU; используются worker_threads или промисы для асинхронных операций. Пример с Promise.all (параллелизм I/O):
const tasks = [1,2,3].map(i => Promise.resolve(i*i));
Promise.all(tasks).then(console.log);
[1, 4, 9]
Golang: легковесный параллелизм через горутины и WaitGroup; распараллеливание контролируется вручную.
var wg sync.WaitGroup
res := make([]int, 3)
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(i int){ defer wg.Done(); res[i-1]=i*i }(i)
}
wg.Wait()
fmt.Println(res)
[1 4 9]
Kotlin: доступ к Java Streams (parallelStream) и корутины с Dispatchers.Default для параллельных задач. Пример корутин даёт удобство отмены и структурной конкуренции.
runBlocking {
val results = (1..3).map { n ->
async(Dispatchers.Default) { n * n }
}.awaitAll()
println(results)
}
[1, 4, 9]
SQL: параллелизм выполняется внутри СУБД на уровне планов выполнения, не управляется через Stream API, но обеспечивает быстрые set-based операции на сервере.
Отличия от Java Stream.parallel(): в большинстве языков параллелизм задаётся явным управлением задачами и пулом потоков, тогда как Stream.parallel() предоставляет декларативный стиль обработки коллекций и опирается на Spliterator и объединяющие коллекторы.
Типичные ошибки и примеры
Перечень распространённых ошибок при использовании параллельных потоков.
1) Мутирование общей коллекции без синхронизации:
List result = new ArrayList<>();
IntStream.range(0, 1000).parallel().forEach(result::add);
System.out.println(result.size());
Часто возвращает значение меньше 1000 из-за гонок
Правильные альтернативы: использовать collect(Collectors.toList()), Collections.synchronizedList(...) или потокобезопасную коллекцию.
2) Ожидание упорядоченного вывода при forEach (потеря производительности):
stream.parallel().forEachOrdered(System.out::println);
Вывод упорядочен, но параллельность существенно снижена
3) Использование непоследовательно объединяющегося reduce: операторы reduce должны быть ассоциативны и без сторонних эффектов.
int bad = Stream.of(1,2,3,4).parallel().reduce(0, (a,b) -> a - b);
System.out.println(bad);
Результат непредсказуем или некорректен для параллельного выполнения
4) Неподходящие источники данных: Stream.iterate плохо распараллеливается, что ведёт к низкой производительности.
Stream.iterate(1, i -> i+1).limit(1000000).parallel().map(...)
Может работать медленно из-за невозможности эффективного разбиения
5) Блокирующие операции внутри параллельного потока: при I/O-bound задачах параллельный поток может не дать выигрыша и привести к увеличению времени из-за ожидания.
Изменения и эволюция в JVM
Ключевые моменты развития параллельных потоков:
- Введение Stream API и метода parallel() в Java 8 вместе с ForkJoinPool.commonPool.
- Дальнейшие версии JVM включали оптимизации ForkJoinPool и улучшения производительности встроенных коллекций и коллекторов, которые косвенно повлияли на эффективность параллельных потоков.
- Добавление новых коллекторов и улучшение характеристик collectors в более поздних релизах позволило эффективнее собирать результаты при параллельной работе (например, groupingByConcurrent, улучшения в toArray и т.д.).
- Практика рекомендует контролировать общие ресурсы и при необходимости использовать собственные ForkJoinPool, так как общий пул может быть занят и повлиять на другие параллельные операции.
Конкретных изменений в сигнатуре parallel() не было; эволюция касалась реализации, оптимизаций и сопутствующих API.
Расширенные и редкие сценарии применения
Несколько продвинутых примеров с пояснениями.
1) Использование groupingByConcurrent для конкурентного группирования:
List words = Arrays.asList("one","two","three","four","five","six");
Map> byLen = words.parallelStream()
.collect(Collectors.groupingByConcurrent(String::length));
System.out.println(byLen);
{3=[one, two, six], 4=[four, five], 5=[three]}
Пояснение: groupingByConcurrent собирает элементы конкурентно без глобальной блокировки, что даёт прирост для больших наборов.
2) Запуск параллельного потока в отдельном пуле, чтобы не блокировать commonPool:
ForkJoinPool custom = new ForkJoinPool(8);
try {
custom.submit(() ->
IntStream.range(1, 1000000).parallel().forEach(i -> doCpuWork(i))
).get();
} finally { custom.shutdown(); }
Выполнение использует custom пул с 8 потоками вместо общего
Пояснение: полезно при интеграции с кодом, который тоже использует commonPool, чтобы избежать взаимной блокировки.
3) Правильный вариант мутабельного редьюса с Collector.of и характеристикой CONCURRENT:
Collector>> c =
Collector.of(
ConcurrentHashMap>::new,
(map, v) -> map.computeIfAbsent(v%10, k -> Collections.synchronizedList(new ArrayList<>())).add(v),
(m1, m2) -> { m2.forEach((k,v) -> m1.merge(k, v, (a,b)->{ a.addAll(b); return a;})); return m1; },
Collector.Characteristics.CONCURRENT
);
ConcurrentMap> grouped = IntStream.range(0, 10000).parallel().boxed().collect(c);
System.out.println(grouped.size());
10
Пояснение: создание собственного коллектора даёт контроль над процессом и позволяет максимально использовать параллелизм.
4) Проблемы с источниками, плохо разбиваемыми для параллели: замена Stream.iterate на LongStream.range для лучшего разделения:
// Плохо
Stream.iterate(1L, i -> i + 1).limit(10_000_000).parallel().map(x -> x*x).count();
// Лучше
LongStream.rangeClosed(1, 10_000_000).parallel().map(x -> x*x).count();
LongStream.rangeClosed быстрее на многопроцессорных системах
5) Nested parallelism и ограничение commonPool: запуск parallel внутри parallel может привести к чрезмерному созданию задач и деградации. Рекомендация: контролировать уровни параллелизма или использовать собственные пулы для верхнего уровня.
6) Профилирование: пример замера времени выполнения последовательного и параллельного потоков для CPU-bound задачи:
long t1 = System.nanoTime();
long s1 = LongStream.rangeClosed(1, 5_000_000).map(x -> x*x).sum();
long dur1 = System.nanoTime() - t1;
long t2 = System.nanoTime();
long s2 = LongStream.rangeClosed(1, 5_000_000).parallel().map(x -> x*x).sum();
long dur2 = System.nanoTime() - t2;
System.out.println("seq=" + dur1/1_000_000 + "ms par=" + dur2/1_000_000 + "ms");
seq=XXXms par=YYYms (XXX и YYY зависят от машины; для CPU-bound часто par < seq)
Пояснение: реальные числа зависят от числа ядер, кэширования и накладных расходов на разбиение/объединение.