Конкурентное выполнение: какой модуль выбрать для потоков
Модули для многопоточности в Python: обзор решений
Как создать и запустить поток с помощью стандартного модуля threading?
Основной и наиболее распространённый модуль для многопоточности в Python - threading. Он предоставляет высокоуровневый интерфейс, основанный на классах. Создание потока выполняется через класс Thread.
import threading
import time
def worker(name, delay):
print(f"{name} начал работу")
time.sleep(delay)
print(f"{name} закончил работу")
# Создание потока с аргументами
thread1 = threading.Thread(target=worker, args=("Поток-1", 2))
thread2 = threading.Thread(target=worker, args=("Поток-2", 1))
thread1.start() # старт потока
thread2.start()
thread1.join() # ожидание завершения
thread2.join()
print("Главный поток завершён")какой модуль в python используется для многопоточности (какой модуль используется для многопоточности в python)
Поток-1 начал работу Поток-2 начал работу Поток-2 закончил работу Поток-1 закончил работу Главный поток завершён
Метод start() запускает поток, join() блокирует выполнение до его завершения. Для передачи данных используется args или kwargs.
Типичные ошибки:
- Попытка вызвать run() вместо start() - поток выполнится синхронно в текущем потоке.
- Отсутствие join() - программа может завершиться раньше, чем потоки успеют выполнить свою работу (особенно для демон-потоков).
- Изменение общих данных без синхронизации - приводит к состояниям гонки (race condition). Решение: использовать Lock или RLock.
Как управлять пулом потоков с помощью concurrent.futures.ThreadPoolExecutor?
Модуль concurrent.futures предлагает более удобный интерфейс для работы с пулом потоков через класс ThreadPoolExecutor. Это избавляет от ручного управления жизненным циклом потоков.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def compute_square(n):
time.sleep(0.5)
return n * n
with ThreadPoolExecutor(max_workers=3) as executor:
# Отправка задач
futures = [executor.submit(compute_square, i) for i in range(5)]
# Обработка результатов по мере завершения
for future in as_completed(futures):
print(f"Результат: {future.result()}")
Результат: 0 Результат: 1 Результат: 4 Результат: 9 Результат: 16
Метод submit() возвращает объект Future, результат доступен через result(). Функция map() позволяет применить функцию ко всем элементам итератора.
Типичные ошибки:
- Необработанные исключения внутри потока - они пробрасываются только при вызове result(). Если не вызвать result(), исключение останется незамеченным.
- Слишком большое значение max_workers может привести к избыточному потреблению ресурсов. Для задач, ограниченных вводом-выводом, оптимально количество ядер * 5.
Как создать поток минимальными средствами с помощью модуля _thread?
Низкоуровневый модуль _thread (ранее thread) предоставляет только базовую функцию start_new_thread(). Он существует для совместимости и не рекомендуется для нового кода, но может быть полезен в легаси.
import _thread
import time
def long_task(name):
print(f"Поток {name} запущен")
time.sleep(1)
print(f"Поток {name} завершён")
# Запуск потока
_thread.start_new_thread(long_task, ("A",))
_thread.start_new_thread(long_task, ("B",))
time.sleep(1.5) # задержка, чтобы дать потокам завершиться
print("Главный поток")
# Внимание: нет join(), программа завершается принудительно
Поток A запущен Поток B запущен Поток A завершён Поток B завершён Главный поток
Типичные ошибки:
- Отсутствие синхронизации - модуль не предоставляет Lock, Condition и т.д. (они есть в threading).
- При завершении главного потока все дочерние потоки убиваются принудительно, даже если они не закончили работу.
- Трудно обрабатывать исключения - нет обратной связи об ошибках.
Как использовать интерфейс multiprocessing для потоков через multiprocessing.dummy?
Модуль multiprocessing.dummy предоставляет ту же абстракцию, что и multiprocessing, но использует потоки вместо процессов. Это удобно, если нужно переключиться между процессами и потоками без изменения кода.
from multiprocessing.dummy import Pool
import requests
def fetch_url(url):
response = requests.get(url)
return len(response.content)
urls = ["http://example.com", "http://httpbin.org/get"]
with Pool(2) as pool:
results = pool.map(fetch_url, urls)
print("Размеры ответов:", results)
Размеры ответов: [1256, 357]
Типичные ошибки:
- Модуль multiprocessing.dummy плохо документирован и может быть удалён в будущих версиях. Рекомендуется использовать concurrent.futures.ThreadPoolExecutor.
- Ложное ощущение, что код работает параллельно для CPU-задач - GIL ограничивает производительность.
Расширенные примеры работы с многопоточностью
Синхронизация потоков с помощью Event
import threading
import time
def waiter(event):
print("Ожидание события...")
event.wait()
print("Событие получено, продолжаем")
def setter(event):
time.sleep(1)
print("Устанавливаем событие")
event.set()
event = threading.Event()
thread1 = threading.Thread(target=waiter, args=(event,))
thread2 = threading.Thread(target=setter, args=(event,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("Готово")
Ожидание события... Устанавливаем событие Событие получено, продолжаем Готово
Безопасная передача данных через Queue
from queue import Queue
import threading
import time
def producer(queue):
for i in range(5):
item = f"Сообщение {i}"
queue.put(item)
print(f"Отправлено: {item}")
time.sleep(0.2)
def consumer(queue):
while True:
item = queue.get()
if item is None: # сигнал остановки
break
print(f"Получено: {item}")
queue.task_done()
q = Queue()
prod = threading.Thread(target=producer, args=(q,))
cons = threading.Thread(target=consumer, args=(q,))
prod.start()
cons.start()
prod.join()
q.put(None) # сигнал завершения потребителя
cons.join()
print("Все задачи завершены")
Отправлено: Сообщение 0 Получено: Сообщение 0 Отправлено: Сообщение 1 Получено: Сообщение 1 Отправлено: Сообщение 2 Получено: Сообщение 2 Отправлено: Сообщение 3 Получено: Сообщение 3 Отправлено: Сообщение 4 Получено: Сообщение 4 Все задачи завершены
Пул потоков с обработкой исключений и таймаутом
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
def risky_task(seconds):
time.sleep(seconds)
if seconds > 2:
raise ValueError("Слишком долго!")
return f"Успех за {seconds} сек"
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(risky_task, t) for t in [1, 3, 0.5]]
for future in futures:
try:
result = future.result(timeout=2.5)
print(result)
except TimeoutError:
print("Таймаут")
except ValueError as e:
print(f"Ошибка: {e}")
Успех за 1 сек Ошибка: Слишком долго! Успех за 0.5 сек
Подкласс Thread с переопределением run
import threading
import random
class MyThread(threading.Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
delay = random.uniform(0.5, 1.5)
print(f"{self.name} засыпает на {delay:.2f} сек")
threading.Event().wait(delay)
print(f"{self.name} проснулся")
threads = [MyThread(f"Поток-{i}") for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
print("Все потоки завершены")
Поток-0 засыпает на 0.87 сек Поток-1 засыпает на 1.23 сек Поток-2 засыпает на 0.95 сек Поток-0 проснулся Поток-2 проснулся Поток-1 проснулся Все потоки завершены
Использование Condition для синхронизации производителей и потребителей
import threading
import time
class Buffer:
def __init__(self, capacity):
self.capacity = capacity
self.items = []
self.cond = threading.Condition()
def produce(self, item):
with self.cond:
while len(self.items) >= self.capacity:
self.cond.wait()
self.items.append(item)
self.cond.notify()
def consume(self):
with self.cond:
while not self.items:
self.cond.wait()
item = self.items.pop(0)
self.cond.notify()
return item
def producer(buf):
for i in range(5):
buf.produce(i)
print(f"Произведено {i}")
time.sleep(0.1)
def consumer(buf):
for _ in range(5):
item = buf.consume()
print(f"Потреблено {item}")
time.sleep(0.15)
buf = Buffer(3)
t1 = threading.Thread(target=producer, args=(buf,))
t2 = threading.Thread(target=consumer, args=(buf,))
t1.start()
t2.start()
t1.join()
t2.join()
print("Работа завершена")
Произведено 0 Потреблено 0 Произведено 1 Произведено 2 Потреблено 1 Произведено 3 Потреблено 2 Произведено 4 Потреблено 3 Потреблено 4 Работа завершена