Executors.newFixedThreadPool: примеры (JAVA)

Пул фиксированного размера для задач в Java
Раздел: Многопоточность, Исполнители
Executors.newFixedThreadPool(int nThreads): ExecutorService

Общее описание

Метод Executors.newFixedThreadPool() создает пул потоков фиксированного размера и возвращает объект типа ExecutorService. Часто применяется для выполнения большого числа коротких задач с ограничением на одновременное количество потоков: фиксированное число рабочих потоков выполняет задачи, остальные задачи ставятся в очередь.

Сигнатуры:

  • ExecutorService newFixedThreadPool(int nThreads) - создает пул с nThreads рабочих потоков.
  • ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) - то же самое, но используется заданная фабрика потоков для создания потоков.

Аргументы и возврат:

  • nThreads - целое число > 0. При передаче <= 0 будет выброшено IllegalArgumentException.
  • threadFactory - реализация ThreadFactory, позволяющая задать имена потоков, сделать их daemon, установить UncaughtExceptionHandler и т. п. Если не задана, используется Executors.defaultThreadFactory().
  • Возвращаемое значение - ExecutorService. На практике это экземпляр ThreadPoolExecutor с конкретными настройками (см. ниже).

Реализация и поведение (типичные внутренние установки):

  • corePoolSize = maximumPoolSize = nThreads.
  • Очередь задач - LinkedBlockingQueue<Runnable> без установленного лимита (по сути, практически неограниченная очередь).
  • keepAliveTime = 0 (потоки простоя не убиваются).
  • RejectedExecutionHandler - по умолчанию ThreadPoolExecutor.AbortPolicy, но при неограниченной очереди отклонение задач маловероятно, пока не закончится память.
  • Пулы такого типа удобны, когда требуется ограничить количество одновременно работающих потоков, но допускается накапливание задач в очереди.

Последствия выбора этой реализации:

  • Если все потоки заняты, новые задачи помещаются в очередь, а не создаются новые потоки.
  • Из-за неограниченной очереди возможно накопление большого количества задач и исчерпание памяти при неадекватном проектировании.
  • Пул не масштабируется автоматически выше заданного размера.

Краткие рекомендации:

  • Использовать при предсказуемом конкарентном уровне и когда важен контроль числа реальных системных потоков.
  • Избегать для задач, которые могут длительно блокироваться, если отсутствует механизм защиты от накопления задач (например, лимитированная очередь и обработка отклонений).

Короткие примеры использования

Пример 1. Простейший пул и запуск Runnable

import java.util.concurrent.*;

public class Example1 {
    public static void main(String[] args) {
        ExecutorService ex = Executors.newFixedThreadPool(2);
        ex.submit(() -> System.out.println("Task 1 executed by " + Thread.currentThread().getName()));
        ex.submit(() -> System.out.println("Task 2 executed by " + Thread.currentThread().getName()));
        ex.shutdown();
    }
}
Ожидаемый вывод (имена потоков могут отличаться):
Task 1 executed by pool-1-thread-1
Task 2 executed by pool-1-thread-2

Пример 2. Callable и Future

import java.util.concurrent.*;

public class Example2 {
    public static void main(String[] args) throws Exception {
        ExecutorService ex = Executors.newFixedThreadPool(2);
        Future<String> f = ex.submit(() -> {
            Thread.sleep(200);
            return "result";
        });
        System.out.println("Got: " + f.get());
        ex.shutdown();
    }
}
Ожидаемый вывод:
Got: result

Пример 3. Указание ThreadFactory для имен потоков

import java.util.concurrent.*;

public class Example3 {
    public static void main(String[] args) {
        ThreadFactory tf = r -> {
            Thread t = new Thread(r);
            t.setName("worker-" + t.getId());
            return t;
        };
        ExecutorService ex = Executors.newFixedThreadPool(2, tf);
        ex.submit(() -> System.out.println(Thread.currentThread().getName()));
        ex.shutdown();
    }
}
Ожидаемый вывод, пример:
worker-11

Пример 4. shutdownNow возвращает невыполненные задачи

import java.util.concurrent.*;
import java.util.List;

public class Example4 {
    public static void main(String[] args) throws Exception {
        ExecutorService ex = Executors.newFixedThreadPool(1);
        ex.submit(() -> { Thread.sleep(5000); return null; });
        ex.submit(() -> System.out.println("queued task"));
        List<Runnable> left = ex.shutdownNow();
        System.out.println("Pending: " + left.size());
    }
}
Ожидаемый вывод (количество может варьироваться):
Pending: 1

Похожие Java-решения

  • Executors.newCachedThreadPool() - пул с динамическим числом потоков. Создает новые потоки по мере необходимости и убивает неактивные через 60 секунд. Предпочтителен при множестве коротких независимых задач, число одновременно активных сильно варьируется.
  • Executors.newSingleThreadExecutor() - пул из одного потока, полезен для последовательного выполнения задач в однопоточном порядке.
  • newWorkStealingPool() (Java 8) - пул с work-stealing на базе ForkJoinPool, подходит для параллельных вычислений, исполняющихся рекурсивно.
  • ScheduledThreadPoolExecutor - для отложенного и периодического выполнения задач.
  • Ручная конфигурация ThreadPoolExecutor - при необходимости задать собственную очередь (например, ограниченную ArrayBlockingQueue), политику отклонения, временные параметры и т. п.

Выбор кратко:

  • Нужен фиксированный контроль числа системных потоков - выбирать newFixedThreadPool.
  • Нужна автоматическая масштабируемость - newCachedThreadPool, но осторожно с пиковыми нагрузками.
  • Для задач с высокой параллельностью и делением работы - рассмотреть ForkJoinPool или newWorkStealingPool.
  • Для расписаний - ScheduledThreadPoolExecutor.

Аналоги в других языках

  • Python: concurrent.futures.ThreadPoolExecutor(max_workers=n). Очень похожая семантика: задается число рабочих потоков, задачи ставятся в очередь. Пример:
    from concurrent.futures import ThreadPoolExecutor
    
    with ThreadPoolExecutor(max_workers=2) as ex:
        future = ex.submit(lambda: 'ok')
        print(future.result())
    
    ok
  • JavaScript (Node.js): модель событий однопоточная, но есть worker_threads для блочных задач и библиотеки-пулы, например Piscina. Пример с Piscina:
    // main.js (Node.js)
    const Piscina = require('piscina');
    const pool = new Piscina({ filename: './task.js', maxThreads: 2 });
    (async () => {
      const r = await pool.runTask({ value: 1 });
      console.log(r);
    })();
    
    (вывод зависит от task.js)
  • C#: ThreadPool и Task.Run. Для явного пула можно использовать ThreadPool или сторонние библиотеки. Пример с ThreadPool:
    using System;
    using System.Threading;
    
    ThreadPool.QueueUserWorkItem(_ => Console.WriteLine("Hello"));
    
    Hello
  • Go: легковесные горутины и каналы. Нет built-in fixed thread pool, но шаблон worker-pool реализуется просто:
    package main
    import (
      "fmt"
    )
    
    func worker(id int, jobs <-chan int, results chan<- int) {
      for j := range jobs {
        fmt.Println("worker", id, "job", j)
        results <- j * 2
      }
    }
    
    func main() {
      jobs := make(chan int, 5)
      results := make(chan int, 5)
      for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
      }
      for j := 1; j <= 5; j++ { jobs <- j }
      close(jobs)
      for a := 1; a <= 5; a++ { <-results }
    }
    
    (вывод с сообщениями от воркеров)
  • Kotlin: использовать корутины и Dispatchers.IO или получить CoroutineDispatcher из Executors.newFixedThreadPool(n).asCoroutineDispatcher(). Пример:
    // Kotlin
    val ex = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
    // использовать в launch/coroutineScope
    
    (передача задач корутинам)
  • PHP: нет стандартного пула потоков в ядре. Есть расширения, например pthreads (CLI) или parallel, а также очереди задач (RabbitMQ) и форки. Семантика и ограничения существенно отличаются.
  • Lua: через библиотеки типа Lua Lanes или реализация процессов; стандартная среда не предоставляет OS-потоки сразу.
  • SQL: модели параллелизма реализованы на стороне СУБД; явного аналога пула потоков приложений внутри SQL нет.

Ключевые отличия от Java-реализации: управление очередью и политиками отклонения, модель потоков и доступность нативных потоков. В языках с зеленых потоками или корутинами (Go, Kotlin, Python с async) подход отличается и может быть эффективнее для большого числа легковесных задач.

Типичные ошибки и их проявления

  • Отсутствие вызова shutdown(). После отправки задач JVM может не завершиться, поскольку рабочие потоки пула являются пользовательскими и блокируют завершение процесса. Пример:
ExecutorService ex = Executors.newFixedThreadPool(1);
ex.submit(() -> System.out.println("hello"));
// нет ex.shutdown();
Ожидаемого завершения процесса может не произойти - приложение останется запущенным.
  • Передача nThreads <= 0. Вызовет IllegalArgumentException. Пример:
ExecutorService ex = Executors.newFixedThreadPool(0);
java.lang.IllegalArgumentException: nThreads must be > 0
  • Накопление задач в очереди. Если задачи производят новые объекты и быстро накапливаются, можно столкнуться с OutOfMemoryError, поскольку очередь по умолчанию почти неограниченная.
// Псевдокод: бесконтрольно посылать задачи, каждая хранит большой объект
for (int i=0;i<Integer.MAX_VALUE;i++) ex.submit(() -> bigObjectList.add(new byte[10_000_000]));
Возможен java.lang.OutOfMemoryError: Java heap space
  • Блокирование потока вызывающего get() без таймаута. Если ждать результата навсегда, вызывающий поток может зависнуть при зависшей задаче. Лучше использовать таймауты или обработку InterruptedException.
Future<String> f = ex.submit(callable);
String s = f.get(); // может блокировать бесконечно
Зависание до завершения задачи; возможна потеря отклика.
  • Ожидание результата внутри задачи того же пула. Возможна взаимная блокировка, если все потоки пула заняты и каждая задача ждёт результата другой задачи в том же пуле.

Рекомендации: всегда вызывать shutdown/awaitTermination при завершении, ограничивать поступление задач или использовать ограниченные очереди и политику отклонения в ручной конфигурации, применять таймауты при ожидании Future.

Изменения и эволюция

Метод Executors.newFixedThreadPool доступен с ранних версий JDK (с Java 5) и в базовой логике не претерпел значительных изменений. Нет официальной депрецитации.

Важно отметить появление альтернативных моделей в новых версиях Java:

  • Fork/Join и newWorkStealingPool для параллельных вычислений.
  • Virtual threads (Project Loom) в JDK 19+ (preview / production в последующих релизах). Появились фабрики потоков для виртуальных потоков, например Executors.newVirtualThreadPerTaskExecutor(), которые существенно меняют подход к параллелизму для большого числа коротких блокирующих задач.

Поэтому при разработке новых систем стоит рассмотреть virtual threads там, где они доступны, но newFixedThreadPool сохраняет свою актуальность при необходимости строгого контроля числа платформенных потоков.

Расширенные и непривычные сценарии использования

1) Изменение размера пула во время выполнения (через приведение к ThreadPoolExecutor):

Пример java
ExecutorService ex = Executors.newFixedThreadPool(2);
ThreadPoolExecutor tpe = (ThreadPoolExecutor) ex;
System.out.println(tpe.getCorePoolSize()); // 2
tpe.setCorePoolSize(4); // динамическое увеличение
Вывод: 2 (до изменения). После setCorePoolSize пул будет иметь 4 рабочих потока.

2) Включение освобождения простых потоков (allowCoreThreadTimeOut):

Пример java
ThreadPoolExecutor tpe = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
tpe.allowCoreThreadTimeOut(true);
// после этого при ненулевом keepAliveTime простые потоки могут завершаться
По умолчанию keepAliveTime = 0, поэтому имеет смысл сначала изменить keepAliveTime при ручной конфигурации ThreadPoolExecutor.

3) Использование CompletionService для получения результатов по мере завершения:

Пример java
ExecutorService ex = Executors.newFixedThreadPool(4);
CompletionService<String> cs = new ExecutorCompletionService<>(ex);
for (int i=0;i<10;i++) cs.submit(() -> { Thread.sleep(100); return Thread.currentThread().getName(); });
for (int i=0;i<10;i++) System.out.println(cs.take().get());
ex.shutdown();
Вывод: имена потоков по мере завершения задач.

4) Интеграция с CompletableFuture для неблокирующей композиции:

Пример java
ExecutorService ex = Executors.newFixedThreadPool(4);
CompletableFuture.supplyAsync(() -> expensive(), ex)
    .thenApplyAsync(r -> transform(r), ex)
    .whenComplete((r, t) -> { if (t != null) t.printStackTrace(); else System.out.println(r); });
ex.shutdown();
Вывод: результат преобразования или стек ошибки при исключении.

5) Мониторинг пула для метрик и эвристик управления:

Пример java
ThreadPoolExecutor tpe = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
System.out.println("Active: " + tpe.getActiveCount());
System.out.println("Queued: " + tpe.getQueue().size());
Вывод: текущее количество активных потоков и длина очереди.

6) Защита от накопления задач: комбинирование ограниченной очереди и политики отклонения (пример ручной конфигурации вместо Executors):

Пример java
ThreadPoolExecutor custom = new ThreadPoolExecutor(
    4, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100),
    new ThreadPoolExecutor.CallerRunsPolicy());
Политика CallerRunsPolicy при перегрузке отправит выполнение обратно в вызывающий поток, снижая скорость подачи задач.

7) Применение в серверных приложениях: соединение пула с ограниченным числом DB-коннекций. Пример: если пул потоков больше числа доступных соединений к БД, появится блокирование и очереди - синхронизировать размеры.

8) Обработка исключений в задачах: назначение UncaughtExceptionHandler через ThreadFactory:

Пример java
ThreadFactory tf = r -> {
    Thread t = new Thread(r);
    t.setUncaughtExceptionHandler((th, ex) -> System.err.println("Uncaught: " + ex));
    return t;
};
ExecutorService ex = Executors.newFixedThreadPool(2, tf);
ex.submit(() -> { throw new RuntimeException("fail"); });
ex.shutdown();
Вывод: Uncaught: java.lang.RuntimeException: fail

9) Использование newFixedThreadPool для реализации сервера задач с приоритетной очередью: придется создавать ThreadPoolExecutor вручную и подставлять PriorityBlockingQueue, при этом нужно следить за сопоставлением типов задач.

10) Сравнение с виртуальными потоками (Project Loom): если задачи преимущественно блокируют, виртуальные потоки дают более простую модель и масштабируемость. Пример альтернативы (JDK с Loom):

Пример java
ExecutorService vEx = Executors.newVirtualThreadPerTaskExecutor();
vEx.submit(() -> blockingCall());
Виртуальные потоки позволяют запускать огромное количество блокирующих задач без большого числа платформенных потоков.

джава Executors.newFixedThreadPool function comments

En
Executors.newFixedThreadPool Создает пул потоков фиксированного размера