Collection.parallelStream(): примеры (JAVA)
Collection.parallelStream(): StreamОписание и сигнатура
Метод parallelStream() определён как default-метод в интерфейсе java.util.Collection (начиная с Java 8). Сигнатура выглядит так:
default StreamparallelStream()
Возвращает параллельный поток (Stream<E>), готовый к выполнению операций в параллельных задачах. У метода нет параметров. Поток использует общий пул задач ForkJoinPool.commonPool() для выполнения параллельных задач, если не переопределён механизм запуска.
Ключевые свойства и поведение:
- Возвращаемый тип:
Stream<E>. Поток имеет параллельную модальност (isParallel() == true). - Аргументы: отсутствуют. Вызывается на экземпляре коллекции; вызов на
nullприведёт кNullPointerException. - Используется для распараллеливания операций на элементах коллекции с целью ускорения CPU-bound вычислений или для эффективного использования нескольких ядер процессора.
- Порядок обработки зависит от источника и операций: встречающийся порядок (encounter order) по умолчанию обычно сохраняется для упорядоченных коллекций (например,
List), но некоторые терминальные операции (например,forEach) не гарантируют порядок при параллельном выполнении; альтернативы:forEachOrdered. - Параллельные стримы подходят, когда операции являются ассоциативными и нечувствительны к состоянию (без побочных эффектов) или используют потокобезопасные структуры.
- Пул потоков: используется
ForkJoinPool.commonPool(); его параллелизм можно настроить через системное свойство-Djava.util.concurrent.ForkJoinPool.common.parallelismили с помощью собственногоForkJoinPool(см. примеры продвинутого использования). - Производительность: накладные расходы на управление задачами делают параллельные стримы неэффективными для очень маленьких коллекций или лёгких операций.
Важно учитывать потокобезопасность. Операции с изменяемым общим состоянием должны быть синхронизированы или заменены на безопасные альтернативы (например, Collectors.toConcurrentMap, LongAdder и т. п.).
Короткие примеры применения
Пример 1: суммирование чисел в параллельном потоке
List nums = Arrays.asList(1, 2, 3, 4, 5);
int sum = nums.parallelStream().mapToInt(Integer::intValue).sum();
System.out.println(sum);
15
Пример 2: forEach против forEachOrdered
List list = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
list.parallelStream().forEach(i -> System.out.print(i + " "));// порядок не гарантирован
System.out.println();
list.parallelStream().forEachOrdered(i -> System.out.print(i + " ")); // порядок сохранён
(пример вывода первой строки может быть:) 7 2 9 5 1 10 3 8 4 6 1 2 3 4 5 6 7 8 9 10
Пример 3: findAny в параллельном потоке
Optional any = nums.parallelStream().filter(i -> i % 2 == 0).findAny();
System.out.println(any.orElse(-1));
2 (или 4 - результат может отличаться)
Пример 4: небезопасный побочный эффект (гонка данных)
List result = new ArrayList<>();
nums.parallelStream().forEach(result::add); // ArrayList не потокобезопасен
System.out.println(result.size());
(возможный результат: число < 5, поведение непредсказуемо, возможен ConcurrentModificationException)
Пример 5: безопасный параллельный сбор в потокобезопасную коллекцию
List safe = nums.parallelStream().collect(Collectors.toList());
System.out.println(safe);
[1, 2, 3, 4, 5]
Похожие механизмы в Java
Collection.stream()- создаёт последовательный поток. Предпочтителен при упрощённой логике и когда порядок важен.stream().parallel()- переводит уже созданный поток в параллельный. Эквивалент вызоваparallelStream(), но удобен при динамическом выборе режима.StreamSupport.stream(spliterator, true)- явное создание параллельного потока изSpliterator, даёт контроль над источником данных.ForkJoinPoolиExecutorService- низкоуровневые средства управления задачами; предпочтительны при необходимости детального управления пулом и его политиками.- Для конкурентных сборщиков:
Collectors.toConcurrentMap,groupingByConcurrent- позволяют эффективно собирать результаты в многопоточной среде.
Выбор зависит от требований: простота и читаемость - parallelStream(), детальный контроль над исполнением - собственный ForkJoinPool или ExecutorService.
Альтернативы в других языках
Краткое сравнение и примеры.
C# (PLINQ)
var res = Enumerable.Range(1, 10).AsParallel().Select(i => i * 2).ToArray();
Console.WriteLine(string.Join(",", res));
2,4,6,8,10,12,14,16,18,20
PLINQ предоставляет удобный способ распараллеливания LINQ-запросов, управление степенью параллелизма через WithDegreeOfParallelism.
Python
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as ex:
res = list(ex.map(lambda x: x*x, range(1,11)))
print(res)
[1,4,9,16,25,36,49,64,81,100]
Глобальная блокировка интерпретатора (GIL) делает multiprocessing или ProcessPoolExecutor предпочтительными для CPU-bound задач.
JavaScript (Node.js / браузер)
// Node.js: worker_threads (упрощённо)
// В браузере: Web Workers
Promise.all(items.map(i => doAsync(i))).then(results => console.log(results));
[ /* результаты асинхронной обработки */ ]
JS не имеет встроенного CPU-параллелизма в одном потоке; для параллельной CPU-работы применяются воркеры.
Go
var wg sync.WaitGroup
results := make(chan int, 10)
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(n int){ defer wg.Done(); results <- n*n }(i)
}
go func(){ wg.Wait(); close(results) }()
for r := range results { fmt.Println(r) }
(вывод квадратов в произвольном порядке)
Go предлагает лёгкий механизм параллелизма без необходимости в пуле потоков.
Kotlin
val res = (1..10).toList().parallelStream().map { it * 2 }.toList()
println(res)
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
Kotlin на JVM может использовать Java-параллельные стримы. Также доступны корутины для асинхронной неблокирующей работы.
В целом, подходы в других языках часто предлагают более явный контроль над пулом задач или используют процессы вместо потоков (Python). Отличия от Java: наличие/отсутствие глобальных блокировок, модель памяти, удобство управления параллелизмом и доступность высокоуровневых операторов в стиле stream/linq.
Типичные ошибки и примеры
1) Побочные эффекты с небезопасными коллекциями
List nums = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
List out = new ArrayList<>();
nums.parallelStream().forEach(out::add);
System.out.println(out.size());
(возможный вывод: число меньше 1000 или ConcurrentModificationException)
Причина: ArrayList не потокобезопасен. Решение: собирать через collect(Collectors.toList()) или использовать потокобезопасные структуры.
2) Несоответствующий редуктор (неассоциативная операция)
int res = Arrays.asList(1,2,3,4).parallelStream().reduce(0, (a,b) -> a - b);
System.out.println(res);
(результат может быть неожиданным, например -10)
Причина: оператор вычитания не ассоциативен; для параллельного reduce требуется ассоциативная и совместимая с combiner операция.
3) Ожидание порядка при использовании forEach
list.parallelStream().forEach(System.out::println);
(вывод в произвольном порядке)
Если важен порядок, следует использовать forEachOrdered или сохранять порядок при сборе.
4) Изменение коллекции во время стрима
List l = new ArrayList<>(Arrays.asList(1,2,3));
l.parallelStream().forEach(i -> l.remove(i));
ConcurrentModificationException (возможен)
5) Неподходящий выбор при малых объёмах данных
Для небольших коллекций накладные расходы на распараллеливание могут привести к худшей производительности по сравнению с последовательными потоками.
Изменения и эволюция
Метод parallelStream() был введён в Java 8 вместе со Streams API как default-метод интерфейса Collection. Сама сигнатура и поведение метода практически не менялись в последующих релизах JVM.
Сопутствующие изменения, влияющие на параллельные потоки:
- Улучшения в
ForkJoinPool(разные релизы Java вносили оптимизации в планировщик и производительность work-stealing). - Добавление новых коллекторов и
CollectorsAPI в более поздних версиях улучшило возможности для конкурентного сбора (toConcurrentMap,groupingByConcurrent). - В будущем архитектурные изменения (например, Project Loom) могут изменить подход к конкурентности на JVM, но непосредственных изменений в API
parallelStream()не было.
Таким образом, API стабилен, но окружение исполнения (пулы задач и сопутствующие утилиты) эволюционирует.
Продвинутые и менее распространённые сценарии
Пример 1: запуск параллельного потока в собственном ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(8);
List data = IntStream.rangeClosed(1, 1_000_000).boxed().collect(Collectors.toList());
try {
List res = customPool.submit(() ->
data.parallelStream().map(i -> i * 2).collect(Collectors.toList())
).get();
System.out.println(res.size());
} catch (Exception e) { e.printStackTrace(); }
customPool.shutdown();
1000000
Пояснение: прямое изменение ForkJoinPool.commonPool() нежелательно в библиотеке. Использование собственного пула позволяет изолировать нагрузку.
Пример 2: groupingByConcurrent для масштабируемой группировки
Map counts = IntStream.range(0, 10000).boxed()
.parallelStream()
.collect(Collectors.groupingByConcurrent(i -> i % 10, Collectors.counting()));
System.out.println(counts.get(3));
1000
Пояснение: groupingByConcurrent позволяет объединять результаты эффективно в многопоточном режиме.
Пример 3: использование LongAdder вместо AtomicInteger для счётчика
LongAdder adder = new LongAdder();
IntStream.range(0, 1000000).parallel().forEach(i -> adder.increment());
System.out.println(adder.sum());
1000000
Пояснение: LongAdder уменьшает конкуренцию при высокой нагрузке по сравнению с AtomicInteger.
Пример 4: корректный редуцирующий комбайнер
// Корректный параллельный reduce для суммы квадратов
long sumSq = LongStream.rangeClosed(1, 1000)
.parallel()
.reduce(0L, (a,b) -> a + b*b, (a,b) -> a + b);
System.out.println(sumSq);
(число: сумма квадратов от 1 до 1000, например 333833500)
Пример 5: измерение и сравнение производительности
List list = IntStream.rangeClosed(1, 5_000_000).boxed().collect(Collectors.toList());
long t1 = System.nanoTime();
long s1 = list.stream().mapToLong(i -> heavy(i)).sum();
long seq = System.nanoTime() - t1;
long t2 = System.nanoTime();
long s2 = list.parallelStream().mapToLong(i -> heavy(i)).sum();
long par = System.nanoTime() - t2;
System.out.println("seq ms=" + (seq/1_000_000) + " par ms=" + (par/1_000_000));
// heavy - CPU-натяжная функция
seq ms=... par ms=... (в зависимости от нагрузки и числа ядер)
Пояснение: для CPU-bound задач и больших объёмов данных параллельный поток обычно быстрее, но нужно измерять в целевой среде.
Пример 6: безопасное форматирование дат в многопоточной среде
DateTimeFormatter fmt = DateTimeFormatter.ISO_LOCAL_DATE;
List out = IntStream.rangeClosed(1, 100)
.parallel()
.mapToObj(i -> LocalDate.now().format(fmt))
.collect(Collectors.toList());
System.out.println(out.size());
100
Пояснение: вместо небезопасного SimpleDateFormat предпочтительны неизменяемые форматтеры из java.time.
Пример 7: использование unordered() для оптимизации
long count = list.parallelStream().unordered().filter(x -> x % 2 == 0).count();
System.out.println(count);
(число чётных элементов)
Пояснение: снятие требований порядка может помочь оптимизации выполнения в некоторых случаях.
джава Collection.parallelStream() function comments
- джава Collection.parallelStream() - аргументы и возвращаемое значение
- Функция java Collection.parallelStream() - описание
- Collection.parallelStream() - примеры
- Collection.parallelStream() - похожие методы на java
- Collection.parallelStream() на javascript, c#, python, php
- Collection.parallelStream() изменения java
- Примеры Collection.parallelStream() на джава