Конкурентное выполнение: какой модуль выбрать для потоков

Раздел: Продвинутые темы -> Конкурентность

Модули для многопоточности в Python: обзор решений

Как создать и запустить поток с помощью стандартного модуля threading?

Основной и наиболее распространённый модуль для многопоточности в Python - threading. Он предоставляет высокоуровневый интерфейс, основанный на классах. Создание потока выполняется через класс Thread.

import threading
import time

def worker(name, delay):
    print(f"{name} начал работу")
    time.sleep(delay)
    print(f"{name} закончил работу")

# Создание потока с аргументами
thread1 = threading.Thread(target=worker, args=("Поток-1", 2))
thread2 = threading.Thread(target=worker, args=("Поток-2", 1))

thread1.start()  # старт потока
thread2.start()

thread1.join()   # ожидание завершения
thread2.join()

print("Главный поток завершён")

какой модуль в python используется для многопоточности (какой модуль используется для многопоточности в python)

Поток-1 начал работу
Поток-2 начал работу
Поток-2 закончил работу
Поток-1 закончил работу
Главный поток завершён

Метод start() запускает поток, join() блокирует выполнение до его завершения. Для передачи данных используется args или kwargs.

Типичные ошибки:

  • Попытка вызвать run() вместо start() - поток выполнится синхронно в текущем потоке.
  • Отсутствие join() - программа может завершиться раньше, чем потоки успеют выполнить свою работу (особенно для демон-потоков).
  • Изменение общих данных без синхронизации - приводит к состояниям гонки (race condition). Решение: использовать Lock или RLock.

Как управлять пулом потоков с помощью concurrent.futures.ThreadPoolExecutor?

Модуль concurrent.futures предлагает более удобный интерфейс для работы с пулом потоков через класс ThreadPoolExecutor. Это избавляет от ручного управления жизненным циклом потоков.

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def compute_square(n):
    time.sleep(0.5)
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    # Отправка задач
    futures = [executor.submit(compute_square, i) for i in range(5)]
    # Обработка результатов по мере завершения
    for future in as_completed(futures):
        print(f"Результат: {future.result()}")
Результат: 0
Результат: 1
Результат: 4
Результат: 9
Результат: 16

Метод submit() возвращает объект Future, результат доступен через result(). Функция map() позволяет применить функцию ко всем элементам итератора.

Типичные ошибки:

  • Необработанные исключения внутри потока - они пробрасываются только при вызове result(). Если не вызвать result(), исключение останется незамеченным.
  • Слишком большое значение max_workers может привести к избыточному потреблению ресурсов. Для задач, ограниченных вводом-выводом, оптимально количество ядер * 5.

Как создать поток минимальными средствами с помощью модуля _thread?

Низкоуровневый модуль _thread (ранее thread) предоставляет только базовую функцию start_new_thread(). Он существует для совместимости и не рекомендуется для нового кода, но может быть полезен в легаси.

import _thread
import time

def long_task(name):
    print(f"Поток {name} запущен")
    time.sleep(1)
    print(f"Поток {name} завершён")

# Запуск потока
_thread.start_new_thread(long_task, ("A",))
_thread.start_new_thread(long_task, ("B",))

time.sleep(1.5)  # задержка, чтобы дать потокам завершиться
print("Главный поток")
# Внимание: нет join(), программа завершается принудительно
Поток A запущен
Поток B запущен
Поток A завершён
Поток B завершён
Главный поток

Типичные ошибки:

  • Отсутствие синхронизации - модуль не предоставляет Lock, Condition и т.д. (они есть в threading).
  • При завершении главного потока все дочерние потоки убиваются принудительно, даже если они не закончили работу.
  • Трудно обрабатывать исключения - нет обратной связи об ошибках.

Как использовать интерфейс multiprocessing для потоков через multiprocessing.dummy?

Модуль multiprocessing.dummy предоставляет ту же абстракцию, что и multiprocessing, но использует потоки вместо процессов. Это удобно, если нужно переключиться между процессами и потоками без изменения кода.

from multiprocessing.dummy import Pool
import requests

def fetch_url(url):
    response = requests.get(url)
    return len(response.content)

urls = ["http://example.com", "http://httpbin.org/get"]

with Pool(2) as pool:
    results = pool.map(fetch_url, urls)
    print("Размеры ответов:", results)
Размеры ответов: [1256, 357]

Типичные ошибки:

  • Модуль multiprocessing.dummy плохо документирован и может быть удалён в будущих версиях. Рекомендуется использовать concurrent.futures.ThreadPoolExecutor.
  • Ложное ощущение, что код работает параллельно для CPU-задач - GIL ограничивает производительность.

Расширенные примеры работы с многопоточностью

Синхронизация потоков с помощью Event

Пример
import threading
import time

def waiter(event):
    print("Ожидание события...")
    event.wait()
    print("Событие получено, продолжаем")

def setter(event):
    time.sleep(1)
    print("Устанавливаем событие")
    event.set()

event = threading.Event()
thread1 = threading.Thread(target=waiter, args=(event,))
thread2 = threading.Thread(target=setter, args=(event,))

thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("Готово")
Ожидание события...
Устанавливаем событие
Событие получено, продолжаем
Готово

Безопасная передача данных через Queue

Пример
from queue import Queue
import threading
import time

def producer(queue):
    for i in range(5):
        item = f"Сообщение {i}"
        queue.put(item)
        print(f"Отправлено: {item}")
        time.sleep(0.2)

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:  # сигнал остановки
            break
        print(f"Получено: {item}")
        queue.task_done()

q = Queue()
prod = threading.Thread(target=producer, args=(q,))
cons = threading.Thread(target=consumer, args=(q,))

prod.start()
cons.start()

prod.join()
q.put(None)  # сигнал завершения потребителя
cons.join()
print("Все задачи завершены")
Отправлено: Сообщение 0
Получено: Сообщение 0
Отправлено: Сообщение 1
Получено: Сообщение 1
Отправлено: Сообщение 2
Получено: Сообщение 2
Отправлено: Сообщение 3
Получено: Сообщение 3
Отправлено: Сообщение 4
Получено: Сообщение 4
Все задачи завершены

Пул потоков с обработкой исключений и таймаутом

Пример
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def risky_task(seconds):
    time.sleep(seconds)
    if seconds > 2:
        raise ValueError("Слишком долго!")
    return f"Успех за {seconds} сек"

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(risky_task, t) for t in [1, 3, 0.5]]
    for future in futures:
        try:
            result = future.result(timeout=2.5)
            print(result)
        except TimeoutError:
            print("Таймаут")
        except ValueError as e:
            print(f"Ошибка: {e}")
Успех за 1 сек
Ошибка: Слишком долго!
Успех за 0.5 сек

Подкласс Thread с переопределением run

Пример
import threading
import random

class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        delay = random.uniform(0.5, 1.5)
        print(f"{self.name} засыпает на {delay:.2f} сек")
        threading.Event().wait(delay)
        print(f"{self.name} проснулся")

threads = [MyThread(f"Поток-{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Все потоки завершены")
Поток-0 засыпает на 0.87 сек
Поток-1 засыпает на 1.23 сек
Поток-2 засыпает на 0.95 сек
Поток-0 проснулся
Поток-2 проснулся
Поток-1 проснулся
Все потоки завершены

Использование Condition для синхронизации производителей и потребителей

Пример
import threading
import time

class Buffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = []
        self.cond = threading.Condition()

    def produce(self, item):
        with self.cond:
            while len(self.items) >= self.capacity:
                self.cond.wait()
            self.items.append(item)
            self.cond.notify()

    def consume(self):
        with self.cond:
            while not self.items:
                self.cond.wait()
            item = self.items.pop(0)
            self.cond.notify()
            return item

def producer(buf):
    for i in range(5):
        buf.produce(i)
        print(f"Произведено {i}")
        time.sleep(0.1)

def consumer(buf):
    for _ in range(5):
        item = buf.consume()
        print(f"Потреблено {item}")
        time.sleep(0.15)

buf = Buffer(3)
t1 = threading.Thread(target=producer, args=(buf,))
t2 = threading.Thread(target=consumer, args=(buf,))
t1.start()
t2.start()
t1.join()
t2.join()
print("Работа завершена")
Произведено 0
Потреблено 0
Произведено 1
Произведено 2
Потреблено 1
Произведено 3
Потреблено 2
Произведено 4
Потреблено 3
Потреблено 4
Работа завершена

Какой модуль используется для многопоточности в Python - comments

En
какой модуль в python используется для многопоточности (python)