Ошибка превышения времени ожидания в коде на Python
Ошибка time out в Python: причины и варианты устранения
Как остановить долго выполняющуюся функцию с помощью сигнала?
Наиболее эффективный и простой способ для синхронного кода на Unix-подобных системах - использование модуля signal. Он позволяет прервать выполнение через заданное время с помощью сигнала SIGALRM. Важно: этот метод не работает в Windows, где сигнал SIGALRM отсутствует.
import signal
import time
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError("Функция превысила лимит времени")
signal.signal(signal.SIGALRM, handler)
signal.alarm(5) # 5 секунд
try:
# Долгая операция
time.sleep(10)
signal.alarm(0) # сброс таймера
except TimeoutError:
print("Функция прервана по тайм-ауту")
finally:
signal.alarm(0)Python timed out (ошибка time out в python)
Функция прервана по тайм-ауту
После срабатывания таймера выполнение переходит в обработчик, где можно корректно завершить работу или выбросить собственное исключение. Следует отключать будильник (signal.alarm(0)) после успешного завершения, чтобы не прервать другие операции.
Проблема: В Windows метод не работает, возникает ошибка AttributeError: module 'signal' has no attribute 'SIGALRM'. Альтернатива - использовать threading.Timer или multiprocessing.
Типичная ошибка: Если внутри защищаемой функции вызывается блокирующая системная операция (например, чтение из сокета), сигнал может быть проигнорирован. В таком случае требуется установка тайм-аута непосредственно на сокет.
Как задать тайм-аут для HTTP-запросов через requests?
import requests
try:
response = requests.get('https://httpbin.org/delay/10', timeout=3)
except requests.exceptions.Timeout:
print("Запрос превысил время ожидания")
except requests.exceptions.ConnectionError:
print("Ошибка соединения")
Запрос превысил время ожидания
Параметр timeout принимает число секунд или кортеж (connect_timeout, read_timeout). Если сервер не отвечает в течение указанного времени, выбрасывается исключение requests.exceptions.Timeout.
Проблема: При очень медленном ответе сервера тайм-аут может сработать некорректно, если не указать отдельно тайм-аут на чтение. Рекомендуется использовать кортеж: timeout=(3, 5).
Как ограничить время выполнения произвольного кода с помощью threading.Timer?
import threading
import time
def long_function():
time.sleep(10)
print("Функция завершена")
timer = threading.Timer(5, lambda: print("Тайм-аут! Функция не завершена"))
timer.start()
thread = threading.Thread(target=long_function)
thread.start()
thread.join(timeout=5) # ждем не более 5 секунд
if thread.is_alive():
print("Поток все еще выполняется, тайм-аут")
# Принудительное завершение не поддерживается, можно лишь установить флаг
timer.cancel()
Тайм-аут! Функция не завершена Поток все еще выполняется, тайм-аут
Этот подход позволяет отследить превышение времени, но не прерывает поток принудительно (в Python нет безопасного способа убить поток). Для реального прерывания используют процессы или асинхронный код.
Проблема: Поток продолжает выполняться в фоне, что может привести к утечке ресурсов. Решение - использовать multiprocessing с возможностью принудительного завершения процесса.
Как использовать asyncio.wait_for для тайм-аута в асинхронном коде?
import asyncio
async def slow_task():
await asyncio.sleep(10)
return "Готово"
async def main():
try:
result = await asyncio.wait_for(slow_task(), timeout=5)
print(result)
except asyncio.TimeoutError:
print("Задача превысила время ожидания")
asyncio.run(main())
Задача превысила время ожидания
asyncio.wait_for оборачивает корутину и генерирует исключение asyncio.TimeoutError при превышении лимита. Задача при этом отменяется автоматически (выбрасывается CancelledError внутри корутины). Это основной способ для асинхронных приложений.
Проблема: Если корутина не поддерживает отмену (например, блокируется на системном вызове), тайм-аут не сработает. В таких случаях необходимо использовать специальные библиотеки (например, aiohttp с собственными тайм-аутами).
Как установить тайм-аут на низкоуровневые сокеты?
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10.0) # секунды
try:
sock.connect(('example.com', 80))
sock.sendall(b'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n')
data = sock.recv(1024)
print(data)
except socket.timeout:
print("Сокет превысил время ожидания")
finally:
sock.close()
(вывод данных или сообщение о тайм-ауте)
Метод settimeout устанавливает максимальное время ожидания на все операции с сокетом (connect, recv, send). Если время истекло, выбрасывается socket.timeout.
Проблема: При использовании settimeout после соединения тайм-аут применяется к каждому вызову recv/send отдельно, а не ко всему чтению целиком. Для суммарного тайм-аута требуется собственная логика.
Как прервать выполнение внешнего процесса через subprocess с тайм-аутом?
import subprocess
try:
result = subprocess.run(['sleep', '10'], timeout=5, capture_output=True)
except subprocess.TimeoutExpired:
print("Процесс превысил время ожидания")
except subprocess.CalledProcessError as e:
print(f"Ошибка выполнения: {e}")
Процесс превысил время ожидания
Встроенный параметр timeout в subprocess.run при истечении времени убивает процесс (через SIGKILL на Unix). Исключение TimeoutExpired позволяет обработать ситуацию.
Проблема: Если внутри процесса есть дочерние процессы, они могут не завершиться. Решение - использовать preexec_fn или группу процессов.
Расширенные примеры работы с тайм-аутами в Python
Тайм-аут с использованием модуля functools и частичного применения
Можно создать декоратор, реализующий тайм-аут для любой функции с помощью signal (только Unix). Пример с поддержкой целочисленного и дробного времени:
import signal
from functools import wraps
class TimeoutError(Exception):
pass
def timeout(seconds):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
def handler(signum, frame):
raise TimeoutError(f"Функция {func.__name__} превысила лимит {seconds} сек")
old_handler = signal.signal(signal.SIGALRM, handler)
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
return result
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
return wrapper
return decorator
@timeout(3)
def slow_computation():
import time
time.sleep(5)
return 42
try:
print(slow_computation())
except TimeoutError as e:
print(e)
Функция slow_computation превысила лимит 3 сек
Асинхронный тайм-аут с отменой корутины и побочными эффектами
Пример демонстрирует, что задача после тайм-аута отменяется, и можно выполнить очистку:
import asyncio
async def resource_intensive_task():
try:
print("Задача запущена")
await asyncio.sleep(10)
print("Задача завершена (не должно быть)")
except asyncio.CancelledError:
print("Задача отменена, освобождаем ресурсы")
# Здесь можно закрыть соединения, файлы и т.д.
raise
async def main():
task = asyncio.create_task(resource_intensive_task())
try:
await asyncio.wait_for(task, timeout=5)
except asyncio.TimeoutError:
print("Основная программа: тайм-аут")
# Ждем, чтобы убедиться, что задача действительно отменена
await asyncio.sleep(1)
print("Конец main")
asyncio.run(main())
Задача запущена Задача отменена, освобождаем ресурсы Основная программа: тайм-аут Конец main
Тайм-аут для HTTP с использованием aiohttp
Библиотека aiohttp позволяет устанавливать тайм-аут как на соединение, так и на чтение:
import aiohttp
import asyncio
async def fetch_with_timeout():
timeout = aiohttp.ClientTimeout(total=5, connect=2, sock_read=3)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
async with session.get('https://httpbin.org/delay/10') as response:
return await response.text()
except asyncio.TimeoutError:
return "Тайм-аут aiohttp"
except aiohttp.ClientError as e:
return f"Ошибка клиента: {e}"
result = asyncio.run(fetch_with_timeout())
print(result)
Тайм-аут aiohttp
Тайм-аут при работе с контекстными менеджерами (например, открытие файла)
Хотя стандартный open не поддерживает тайм-аут, можно обернуть его в поток или процесс. Пример с использованием multiprocessing и Queue:
import multiprocessing
def read_file(filename, queue):
try:
with open(filename, 'r') as f:
data = f.read()
queue.put(data)
except Exception as e:
queue.put(e)
def timed_read(filename, timeout):
queue = multiprocessing.Queue()
process = multiprocessing.Process(target=read_file, args=(filename, queue))
process.start()
process.join(timeout)
if process.is_alive():
process.terminate()
process.join()
raise TimeoutError(f"Чтение файла {filename} превысило {timeout} сек")
result = queue.get()
if isinstance(result, Exception):
raise result
return result
try:
content = timed_read('/dev/random', 2) # /dev/random может блокироваться
print(content[:100])
except TimeoutError as e:
print(e)
Чтение файла /dev/random превысило 2 сек
Комбинирование тайм-аутов: обертка для внешних библиотек
Иногда требуется установить тайм-аут на вызов C-расширения, которое не поддерживает отмену. В таких случаях помогает запуск в отдельном процессе:
import multiprocessing
import time
def call_c_extension(result_dict):
# Имитация вызова C-функции, которая может зависнуть
time.sleep(10)
result_dict['result'] = 42
manager = multiprocessing.Manager()
result = manager.dict()
p = multiprocessing.Process(target=call_c_extension, args=(result,))
p.start()
p.join(5)
if p.is_alive():
p.terminate()
p.join()
raise TimeoutError("C-расширение не вернулось вовремя")
print(result.get('result', 'Нет результата'))
... (процесс убит) TimeoutError: C-расширение не вернулось вовремя
Тайм-аут в Jupyter Notebook (ячейки)
Для интерактивных сред можно использовать %%timeout магию (требуется пакет ipython-autotime или собственный обработчик). Пример собственного декоратора для ячеек:
from IPython.core.magic import register_cell_magic
import signal
@register_cell_magic
def cell_timeout(line, cell):
try:
seconds = int(line)
except ValueError:
seconds = 5
def handler(signum, frame):
raise TimeoutError("Ячейка превысила время выполнения")
old_handler = signal.signal(signal.SIGALRM, handler)
signal.alarm(seconds)
try:
exec(cell)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
# Использование в ячейке:
# %%cell_timeout 2
# import time
# time.sleep(10)
(в случае превышения: TimeoutError: Ячейка превысила время выполнения)
Тайм-аут при использовании библиотеки socket (подробный пример с таймером на чтение)
Пример суммарного тайм-аута на чтение данных заданного размера:
import socket
import time
def recv_timeout(sock, n, timeout_seconds):
sock.settimeout(timeout_seconds) # тайм-аут на каждый recv
data = b''
start = time.time()
while len(data) < n:
remaining = n - len(data)
if time.time() - start > timeout_seconds:
raise socket.timeout("Суммарное время чтения превышено")
try:
chunk = sock.recv(remaining)
except socket.timeout:
continue # просто повторяем попытку
if not chunk:
break
data += chunk
return data
sock = socket.socket()
sock.connect(('httpbin.org', 80))
sock.sendall(b'GET /bytes/1000 HTTP/1.1\r\nHost: httpbin.org\r\n\r\n')
# Пропускаем заголовки, читаем тело
sock.recv(4096) # заголовки (можно отдельно)
try:
body = recv_timeout(sock, 1000, 5)
print(f"Прочитано {len(body)} байт")
except socket.timeout as e:
print(e)
finally:
sock.close()
Прочитано 1000 байт (или сообщение о тайм-ауте)