Работа с PostgreSQL через Python: выполнение, обработка и оптимизация SQL запросов
Основные методы выполнения SQL запросов в Python
Для работы с PostgreSQL из Python чаще всего используется библиотека psycopg2. Она обеспечивает полный доступ к возможностям сервера, поддерживает пулы соединений, асинхронные режимы и работу с расширенными типами данных. Однако существуют и альтернативы: SQLAlchemy (позволяет писать SQL или использовать ORM), asyncpg (для асинхронных приложений) и встроенные средства Django. Рассмотрим каждый подход с примерами.
Как безопасно выполнить SQL запрос с помощью psycopg2?
Наиболее эффективный способ - использовать контекстный менеджер для подключения и курсора, а также параметризованные запросы для защиты от SQL-инъекций.
import psycopg2
conn = psycopg2.connect(
host="localhost",
port=5432,
dbname="mydb",
user="user",
password="secret"
)
try:
with conn:
with conn.cursor() as cur:
cur.execute("SELECT id, name FROM users WHERE age > %s;", (18,))
rows = cur.fetchall()
for row in rows:
print(row)
finally:
conn.close()Python postgresql sql (sql запросы postgresql в python)
Пояснение: блок with conn автоматически фиксирует транзакцию при успешном выходе или откатывает при исключении. Параметры передаются отдельно от строки запроса - это предотвращает инъекции.
Типичные ошибки: забыть закрыть соединение (используйте finally или контекстный менеджер), передавать параметры через f-строки (риск инъекции), не обрабатывать исключения (например, нарушение уникальности).
Решение: всегда использовать %s плейсхолдеры и кортеж параметров; оборачивать код в try/except и логировать ошибки.
Как выполнять SQL запросы через SQLAlchemy без ORM?
SQLAlchemy предоставляет text() для написания сырых SQL-запросов с возможностью подстановки параметров.
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://user:secret@localhost/mydb")
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM products WHERE price > :min_price"), {"min_price": 100})
for row in result:
print(row)Подстановка через именованные параметры :param также безопасна. SQLAlchemy автоматически управляет пулом соединений.
Проблемы: если не указать text() и передать строку, может возникнуть SQL-инъекция. Также стоит помнить, что соединение возвращается в пул при закрытии, а не сразу закрывается.
Как выполнять запросы асинхронно с помощью asyncpg?
Библиотека asyncpg специально разработана для асинхронного программирования на asyncio. Пример подключения и запроса:
import asyncio
import asyncpg
async def main():
conn = await asyncpg.connect(user='user', password='secret', database='mydb', host='localhost')
values = await conn.fetch('''SELECT * FROM users WHERE age > $1''', 18)
for record in values:
print(record)
await conn.close()
asyncio.run(main())Параметры задаются через $1, $2 и т.д. Библиотека работает только с asyncio.
Типичные ошибки: забыть закрыть соединение (использовать контекстный менеджер async with), пытаться использовать asyncpg в синхронном коде без asyncio.run.
Как выполнить произвольный SQL через Django ORM (django.db.connection)?
Если проект использует Django, можно получить соединение через django.db.connection и выполнить прямой SQL.
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("UPDATE posts SET views = views + 1 WHERE id = %s", [post_id])
cursor.execute("SELECT title, views FROM posts WHERE id = %s", [post_id])
row = cursor.fetchone()
print(row)Django автоматически использует правильную базу данных, указанную в настройках.
Проблемы: не рекомендуется смешивать ORM и сырой SQL в одном проекте без необходимости, может возникнуть путаница с транзакциями. Также нужно явно указывать %s даже для PostgreSQL, так как Django адаптирует синтаксис.
Расширенный пример 1: массовая вставка с обработкой конфликтов (UPSERT)
Используем оператор ON CONFLICT DO UPDATE для вставки или обновления записей. Параметры передаются как список кортежей.
import psycopg2
from psycopg2.extras import execute_values
data = [
(1, 'Alice', 30),
(2, 'Bob', 25),
(2, 'Robert', 26),
(3, 'Charlie', 35)
]
conn = psycopg2.connect("dbname=test user=postgres password=postgres")
try:
with conn:
with conn.cursor() as cur:
query = """
INSERT INTO users (id, name, age) VALUES %s
ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, age = EXCLUDED.age
"""
execute_values(cur, query, data, template=None, page_size=100)
print(f"Затронуто строк: {cur.rowcount}")
finally:
conn.close()Затронуто строк: 4
Пояснение: execute_values из модуля psycopg2.extras позволяет вставить сразу много строк через VALUES, что значительно быстрее, чем executemany. При конфликте по первичному ключу id обновляются поля name и age.
Пример 2: работа с JSONB – создание, вставка, запросы
PostgreSQL поддерживает тип JSONB. В Python данные передаются как словари (dict).
import psycopg2
conn = psycopg2.connect("dbname=test user=postgres password=postgres")
try:
with conn:
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS events (
id SERIAL PRIMARY KEY,
data JSONB NOT NULL
)
""")
cur.execute(
"INSERT INTO events (data) VALUES (%s)",
({"event": "click", "user": 42, "metadata": {"browser": "Chrome"}},)
)
cur.execute("SELECT data ->> 'event' AS event_type FROM events WHERE data @> %s", ('{"user": 42}',))
rows = cur.fetchall()
for row in rows:
print(row)
finally:
conn.close()('click',)Оператор @> проверяет, содержит ли JSONB указанный фрагмент. Для доступа к полям используются ->> (возвращает текст) и -> (возвращает JSON).
Пример 3: использование COPY для быстрой загрузки данных из файла или списка
COPY позволяет загрузить тысячи строк за секунды. В Python можно использовать copy_from с файлом или copy_expert со строковым буфером.
import psycopg2
import io
conn = psycopg2.connect("dbname=test user=postgres password=postgres")
try:
with conn:
with conn.cursor() as cur:
cur.execute("CREATE TEMP TABLE tmp_load (id INT, name TEXT) ON COMMIT DROP")
data = "1\tAlice\n2\tBob\n3\tCharlie\n"
f = io.StringIO(data)
cur.copy_from(f, 'tmp_load', sep='\t')
cur.execute("SELECT * FROM tmp_load")
for row in cur.fetchall():
print(row)
finally:
conn.close()(1, 'Alice') (2, 'Bob') (3, 'Charlie')
Важно: данные должны быть в текстовом формате, каждая строка соответствует одной записи. copy_from принимает файлоподобный объект. Этот метод значительно быстрее executemany.
Пример 4: пул соединений с psycopg2.pool
Для многопоточных приложений удобно использовать пул соединений, чтобы не создавать новое подключение на каждый запрос.
from psycopg2 import pool
connection_pool = pool.SimpleConnectionPool(1, 5, dbname="test", user="postgres", password="postgres", host="localhost")
conn = connection_pool.getconn()
cur = conn.cursor()
cur.execute("SELECT count(*) FROM users")
print(cur.fetchone())
connection_pool.putconn(conn)
connection_pool.closeall()(1000,)
Пул управляет временем жизни соединений и автоматически восстанавливает разорванные.
Пример 5: использование именованных курсоров (NamedTupleCursor) для удобного доступа к полям
from psycopg2 import connect
from psycopg2.extras import NamedTupleCursor
conn = connect("dbname=test user=postgres password=postgres")
with conn.cursor(cursor_factory=NamedTupleCursor) as cur:
cur.execute("SELECT id, name, age FROM users LIMIT 3")
for row in cur:
print(row.id, row.name, row.age)1 Alice 30 2 Bob 25 3 Charlie 35
Курсор возвращает строки как объекты, где к полям можно обращаться по имени атрибута.
Пример 6: асинхронный запрос с транзакцией и пулом в asyncpg
import asyncio
import asyncpg
async def main():
pool = await asyncpg.create_pool(dsn="postgresql://user:secret@localhost/mydb", min_size=2, max_size=10)
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute("INSERT INTO logs (msg) VALUES ($1)", "async log")
result = await conn.fetchval("SELECT count(*) FROM logs")
print(f"Всего записей: {result}")
await pool.close()
asyncio.run(main())Всего записей: 42
Используется контекстный менеджер транзакции для автоматического commit/rollback. Пул увеличивает производительность в асинхронных приложениях.
Пример 7: вызов хранимой процедуры или функции с возвратом таблицы
import psycopg2
conn = psycopg2.connect("dbname=test user=postgres password=postgres")
with conn:
with conn.cursor() as cur:
cur.callproc("get_users_by_age", [25, 40])
for row in cur:
print(row)(2, 'Bob', 25) (4, 'Diana', 30)
Функция должна быть определена на сервере как RETURNS TABLE. callproc автоматически формирует SELECT * FROM function.