Arrays.stream().parallel: примеры (JAVA)

Параллельные потоки: Arrays.stream().parallel() в деталях
Раздел: Параллельные стримы
Arrays.stream().parallel(T[] array): Stream

Общее описание Arrays.stream().parallel()

В Java комбинация Arrays.stream(...) и последующий вызов .parallel() создаёт поток (Stream, IntStream, LongStream или DoubleStream) с включённым параллельным режимом выполнения. Метод Arrays.stream имеет несколько перегрузок: для массивов объектов и для примитивных массивов int[], long[], double[], а также перегрузку с указанием границ Arrays.stream(T[] array, int startInclusive, int endExclusive). Метод parallel() объявлен в интерфейсе Stream и не принимает аргументов.

Возвращаемое значение:

  • Для объекта T[] возвращается Stream<T>.
  • Для int[] возвращается IntStream, для long[] - LongStream, для double[] - DoubleStream.
  • Вызов parallel() возвращает тот же тип потока, но с флагом параллельности; последовательный вызов sequential() снимает этот флаг.

Ключевые характеристики поведения:

  • Параллельный поток распараллеливает промежуточные и терминальные операции с использованием общего пула задач ForkJoinPool.commonPool(), если не указан собственный пул.
  • Порядок элементов (encounter order) сохраняется для упорядоченных источников, если не применены операции, снимающие упорядоченность (unordered()) или если используются нерегламентированные терминальные операции (forEach без forEachOrdered).
  • Некоторые операции при параллельной обработке требуют ассоциативных и неблокирующих функций (например, для reduce нужен ассоциативный аккумулятор и корректный компайнер).
  • При использовании параллельных потоков важно учитывать потокобезопасность используемых в лямбда-выражениях структур данных и побочных эффектов.
  • Производительность зависит от размера и характера задач; для небольших наборов данных накладные расходы на распараллеливание могут перевесить выигрыш.

Типичные сценарии применения: CPU-bound вычисления над большими наборами элементов, параллельные агрегации, трансформации коллекций и агрегации с помощью конкурентных коллекций.

Примеры базового использования

Несколько коротких примеров для разных типов массивов и операций.

Пример 1: параллельный перебор объектов (порядок не гарантируется при forEach)

import java.util.Arrays;

public class Ex1 {
    public static void main(String[] args) {
        String[] arr = {"a", "b", "c", "d", "e"};
        Arrays.stream(arr).parallel().forEach(s -> System.out.println(s + " thread:" + Thread.currentThread().getName()));
    }
}
Возможный вывод (порядок может быть другим):
a thread:ForkJoinPool.commonPool-worker-1
c thread:ForkJoinPool.commonPool-worker-3
b thread:ForkJoinPool.commonPool-worker-2
d thread:ForkJoinPool.commonPool-worker-5
e thread:main

Пример 2: суммирование целых чисел через IntStream

import java.util.Arrays;

public class Ex2 {
    public static void main(String[] args) {
        int[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        int sum = Arrays.stream(nums).parallel().sum();
        System.out.println(sum);
    }
}
55

Пример 3: сбор в список с сохранением порядка с помощью forEachOrdered

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class Ex3 {
    public static void main(String[] args) {
        String[] arr = {"one", "two", "three", "four"};
        List list = Arrays.stream(arr).parallel()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        System.out.println(list);

        // forEachOrdered обеспечивает порядок при выводе
        Arrays.stream(arr).parallel().forEachOrdered(System.out::println);
    }
}
[ONE, TWO, THREE, FOUR]
one
two
three
four

Пример 4: переключение обратно в последовательный режим

Arrays.stream(new int[]{1,2,3,4}).parallel().map(i -> i*2).sequential().forEach(System.out::println);
2
4
6
8

Похожие Java-подходы и их отличия

  • Collection.parallelStream(): удобен для коллекций (List, Set). Работает по принципу того же параллельного флага; предпочтителен при наличии коллекции, а не массива.
  • StreamSupport.stream(spliterator, true): создание параллельного потока из произвольного Spliterator; полезен при необходимости точного контроля над источником данных и его сплиттером.
  • Arrays.parallelSort(): специализированный метод для параллельной сортировки массивов с использованием Fork/Join, эффективнее общего подхода через поток для сортировки.
  • ForkJoinPool / RecursiveTask: означает более низкоуровневый контроль над распараллеливанием задач; применяется, когда требуется тонкая настройка алгоритма разделения работы.
  • CompletableFuture: удобен для параллельных независимых задач с управлением зависимостями; лучше подходит для асинхронных операций и сети/IO.

Выбор зависит от задачи: для простых трансформаций больших коллекций удобны параллельные потоки; для алгоритмов с собственными стратегиями разделения нагрузки - ForkJoin; для асинхронных зависимых задач - CompletableFuture.

Аналоги в других языках и их особенности

Краткие сопоставления с примерами.

  • C# (PLINQ): метод AsParallel() на Enumerable. Похож по поведению: использует параллельную обработку и предоставляет средства сохранения порядка (AsOrdered()).
    // C#
    using System.Linq;
    var arr = new[]{1,2,3,4,5};
    var sum = arr.AsParallel().Sum();
    System.Console.WriteLine(sum);
    15
  • Python: нет встроенного точного аналога. Используются concurrent.futures (ThreadPoolExecutor/ProcessPoolExecutor) или multiprocessing.Pool. Для численных данных чаще применяется numpy, который векторизует операции и использует C-реализацию.
    # Python
    from concurrent.futures import ProcessPoolExecutor
    arr = list(range(1,11))
    with ProcessPoolExecutor() as ex:
        res = sum(ex.map(lambda x: x, arr))
    print(res)
    55
  • JavaScript: в браузере и Node.js параллелизм достигается через Web Workers или worker_threads; нет прозрачного потокового API, похожего на Java Stream.
    // Node.js (worker_threads) -- схематично
    // Главный поток делит работу на воркеры и собирает результаты
    Результат зависит от реализации; прямого аналога нет
  • Go: примитивы параллелизма - горутины и каналы. Параллельная обработка массивов делается вручную с разделением диапазонов.
    // Go
    package main
    import (
        "fmt"
    )
    func main(){
        arr := []int{1,2,3,4,5}
        // простая последовательная агрегация; для параллельной работы делят диапазон на сегменты
        sum := 0
        for _, v := range arr { sum += v }
        fmt.Println(sum)
    }
    
    15
  • PHP: отсутствует встроенный потоковый параллелизм для коллекций; доступны расширения (pthreads, parallel) либо внешние процессы / очереди.
  • Kotlin: использует те же Java Streams при необходимости, но более идиоматичны корутины (kotlinx.coroutines) для асинхронности; нет прямого аналога параллельных stream, предпочтение отдаётся корутинам для IO-bound задач.
  • Lua: сторонние библиотеки (Lanes) для параллелизма.
  • SQL: параллелизм операций выполняется на стороне СУБД автоматически; пользовательские аналоги нет.

В целом отличия: Java Streams предоставляют декларативный и высокоуровневый API, многие другие языки требуют явного управления потоками/воркерами или предлагают иные парадигмы (корутины, асинхронность, векторизацию).

Типичные ошибки при использовании параллельных потоков

Ниже перечислены распространённые ошибки с примерами и результатом.

Ошибка 1: модификация небезопасной коллекции из параллельного потока

import java.util.*;
import java.util.stream.*;

public class Race {
    public static void main(String[] args) {
        List result = new ArrayList<>();
        IntStream.rangeClosed(1, 1000).parallel().forEach(i -> result.add(i));
        System.out.println(result.size());
    }
}
Вывод часто меньше 1000 или может возникнуть ConcurrentModificationException; поведение непредсказуемо.

Комментарий: проблема в неконкурентной структуре ArrayList.

Ошибка 2: неверное использование reduce без ассоциативности

import java.util.Arrays;

public class BadReduce {
    public static void main(String[] args) {
        Integer[] arr = {1,2,3,4};
        Integer r = Arrays.stream(arr).parallel().reduce(0, (a,b) -> a - b);
        System.out.println(r);
    }
}
Результат может отличаться от ожиданий из-за неассоциативной операции вычитания. Например: -10

Ошибка 3: ожидание порядка при использовании forEach

Arrays.stream(new String[]{"a","b","c"}).parallel().forEach(System.out::println);
Порядок вывода не гарантируется; если порядок важен, следует использовать forEachOrdered.

Ошибка 4: чрезмерное распараллеливание мелких задач - деградация производительности. Микрозадачи создают накладные расходы на организацию потоков.

Изменения и эволюция

API потоков и метод parallel() появились в Java 8. С течением времени спецификация Stream не претерпевала значительных изменений в поведении параллельных потоков: параллельность по-прежнему реализована поверх ForkJoinPool.commonPool(). В более новых релизах JDK вносились улучшения производительности и исправления ошибок в реализации Spliterator и оптимизациях отдельных операций.

Некоторые сопутствующие возможности появились или стали более гибкими:

  • Возможность изменения уровня параллелизма общего пула через системное свойство java.util.concurrent.ForkJoinPool.common.parallelism.
  • Появление альтернативных инструментов параллелизма (CompletableFuture, Project Loom - виртуальные потоки) даёт новые опции построения конкурентных решений, но непосредственное поведение parallel() без изменений.

Продвинутые и редко встречающиеся варианты использования

Несколько расширенных примеров с пояснениями.

Пример A: запуск параллельного потока в собственном ForkJoinPool для контроля числа потоков

Пример java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;

public class CustomPool {
    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2); // ограничение параллелизма
        int[] arr = new int[1000];
        for (int i = 0; i < arr.length; i++) arr[i] = i + 1;

        int sum = pool.submit(() -> Arrays.stream(arr).parallel().sum()).get();
        System.out.println(sum);
    }
}
500500

Пояснение: submit запускает задачу в указанном пуле; без такой обёртки поток использовал бы commonPool.

Пример B: использование concurrent-коллектора для безопасного сбора результатов

Пример java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class ConcurrentCollect {
    public static void main(String[] args) {
        String[] words = {"apple","banana","apricot","blueberry","avocado"};
        Map counts = Arrays.stream(words).parallel()
                .collect(Collectors.groupingByConcurrent(s -> s.charAt(0), Collectors.counting()));
        System.out.println(counts);
    }
}
{a=3, b=2}

Пояснение: groupingByConcurrent даёт конкурентный мап для параллельной агрегации.

Пример C: параллельный reduce с корректным компайнером

Пример java
import java.util.Arrays;

public class ProperReduce {
    public static void main(String[] args) {
        Integer[] arr = {1,2,3,4,5,6};
        int prod = Arrays.stream(arr).parallel().reduce(1, (a,b) -> a * b, (a,b) -> a * b);
        System.out.println(prod);
    }
}
720

Пояснение: при параллельном reduce используется три-аргументная версия с combiner. Функции должны быть ассоциативными и корректно комбинируемыми.

Пример D: использование unordered() для повышения производительности при агрегации

Пример java
import java.util.Arrays;
import java.util.stream.Collectors;

public class UnorderedExample {
    public static void main(String[] args) {
        String[] data = new String[1000000];
        Arrays.fill(data, "x");
        long count = Arrays.stream(data).parallel().unordered().filter(s -> s.equals("x")).count();
        System.out.println(count);
    }
}
1000000

Пояснение: вызов unordered() позволяет оптимизировать перераспределение работы, когда порядок не важен.

Пример E: обработка потенциально бесконечного потока с параллельными операциями - предупреждение

Пример java
// Не рекомендуется: параллелизация бесконечного потока без явных лимитов
java.util.stream.Stream.generate(Math::random).parallel().limit(1000).forEach(System.out::println);
Выведет 1000 случайных чисел, но производительность и поведение зависят от того, как источник сплиттится.

Пояснение: при работе с бесконечными или трудно сплиттящимися источниками параллелизация может быть неэффективна.

джава Arrays.stream().parallel function comments

En
Arrays.stream().parallel Создает параллельный поток из массива