Обзор библиотек Python для работы с данными в SQLite и PostgreSQL
Выбор библиотеки для работы с SQL в Python
Для взаимодействия с реляционными базами данных SQLite и PostgreSQL в Python существует несколько подходов. Встроенный модуль sqlite3 подходит только для SQLite, а для PostgreSQL чаще всего используют psycopg2. Универсальным решением является SQLAlchemy, который предоставляет как низкоуровневый Core, так и высокоуровневый ORM. В данном обзоре рассматриваются основные варианты с примерами кода.
Какая библиотека позволяет работать и с SQLite, и с PostgreSQL с единым интерфейсом?
Наиболее эффективным решением считается SQLAlchemy. Она абстрагирует диалекты баз данных, позволяет переключаться между SQLite и PostgreSQL, меняя только строку подключения. Пример базовой настройки:
from sqlalchemy import create_engine, text
# подключение к SQLite (файл БД)
engine_sqlite = create_engine('sqlite:///example.db')
# подключение к PostgreSQL (требует psycopg2)
engine_pg = create_engine('postgresql://user:pass@localhost/mydb')
# выполнение запроса
with engine_sqlite.connect() as conn:
result = conn.execute(text('SELECT sqlite_version()'))
print(result.scalar()) # версия SQLiteSql server python (sql server и python)
3.45.1
Sql через python (работа с sql через python)
В этом примере используется text() для написания сырого SQL. SQLAlchemy автоматически управляет пулом соединений и транзакциями.
Типичные ошибки:
- Забыли установить драйвер для PostgreSQL (
pip install psycopg2-binary). - Использование
create_engineбезconnect()приводит к ошибке о том, что движок не является соединением. - В SQLite некоторые типы данных (например, UUID) не поддерживаются напрямую, требуется преобразование.
Как работать с SQLite без установки дополнительных пакетов?
Встроенный модуль sqlite3 идеально подходит для локальных баз данных, когда не требуется PostgreSQL. Он входит в стандартную библиотеку Python. Пример создания таблицы и вставки данных:
import sqlite3
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS users
(id INTEGER PRIMARY KEY, name TEXT, age INTEGER)''')
cursor.execute('INSERT INTO users (name, age) VALUES (?, ?)', ('Alice', 30))
conn.commit()
cursor.execute('SELECT * FROM users')
print(cursor.fetchall())
conn.close()Python postgresql (подключение к postgresql из python)
[(1, 'Alice', 30)]
Python база данных таблицу (создание таблицы в базе данных python)
Внимание: после INSERT требуется conn.commit(), иначе изменения не сохранятся. Использование ? как placeholder защищает от SQL-инъекций.
Проблемы:
- Забытый
commit()- данные потеряются при закрытии соединения. - Одновременный доступ из нескольких потоков может вызвать блокировку (SQLite не поддерживает конкурентную запись).
Как подключиться к PostgreSQL и выполнить классические запросы?
Для PostgreSQL стандартом является psycopg2. Он предоставляет совместимый с DB-API 2.0 интерфейс. Пример:
import psycopg2
conn = psycopg2.connect(
dbname='mydb',
user='myuser',
password='mypass',
host='localhost',
port=5432
)
cur = conn.cursor()
cur.execute('CREATE TABLE IF NOT EXISTS items (id SERIAL PRIMARY KEY, title VARCHAR(100))')
cur.execute('INSERT INTO items (title) VALUES (%s)', ('First item',))
conn.commit()
cur.execute('SELECT * FROM items')
print(cur.fetchall())
cur.close()
conn.close()библиотека sql python (библиотека для работы с sql в python)
[(1, 'First item')]
создание sql python (работа с sql в python)
Здесь используется %s для параметров. В отличие от sqlite3, символ ? в psycopg2 не работает.
Частые ошибки:
- Не установлен libpq (на Windows драйвер устанавливается вместе с
psycopg2-binary). - Параметры подключения неверны - исключение
OperationalError. - Использование
%в запросе без экранирования приводит к ошибке.
Как выполнять асинхронные запросы к SQLite или PostgreSQL?
Для асинхронного доступа существуют библиотеки aiosqlite (для SQLite) и asyncpg (для PostgreSQL). Пример с aiosqlite:
import asyncio
import aiosqlite
async def main():
async with aiosqlite.connect('async_test.db') as db:
await db.execute('CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY, description TEXT)')
await db.execute('INSERT INTO tasks (description) VALUES (?)', ('Async task',))
await db.commit()
cursor = await db.execute('SELECT * FROM tasks')
rows = await cursor.fetchall()
print(rows)
asyncio.run(main())
[(1, 'Async task')]
Для PostgreSQL с asyncpg:
import asyncio
import asyncpg
async def main():
conn = await asyncpg.connect(user='myuser', password='mypass', database='mydb', host='localhost')
await conn.execute('CREATE TABLE IF NOT EXISTS logs (id serial PRIMARY KEY, msg text)')
await conn.execute('INSERT INTO logs (msg) VALUES ($1)', 'Async log')
rows = await conn.fetch('SELECT * FROM logs')
print(rows)
await conn.close()
asyncio.run(main())
[]
Здесь используется $1 для параметров. Асинхронные драйверы не поддерживаются в стандартной библиотеке.
Проблемы:
- Смешивание синхронного и асинхронного кода приводит к блокировке цикла событий.
- Для asyncpg требуется установка
pip install asyncpg.
Какая ORM проще для небольших проектов, если SQLAlchemy кажется избыточной?
Для небольших проектов подходит Peewee. Она легковесна, имеет понятный API и поддерживает SQLite, PostgreSQL и MySQL. Пример:
from peewee import SqliteDatabase, Model, CharField, IntegerField
db = SqliteDatabase('people.db')
class Person(Model):
name = CharField()
age = IntegerField()
class Meta:
database = db
db.create_tables([Person])
alice = Person.create(name='Alice', age=30)
query = Person.select()
for person in query:
print(person.name, person.age)
Alice 30
Peewee автоматически создает таблицы и обрабатывает транзакции через контекстные менеджеры, что упрощает код.
Ошибки:
- Нельзя использовать Peewee с уже существующей схемой, если она не совпадает с моделями (требуется migrate).
- При работе с PostgreSQL нужно установить
psycopg2.
Каждый подход имеет свою область применения. Для больших проектов с миграциями и переключением между БД предпочтительнее SQLAlchemy. Для простых скриптов на SQLite достаточно sqlite3 или Peewee. Асинхронные решения уместны в веб-фреймворках (FastAPI, aiohttp).
Расширенные примеры и нераспространенные сценарии
Транзакции с явным управлением в SQLAlchemy Core
from sqlalchemy import create_engine, text
engine = create_engine('sqlite:///transactions.db')
with engine.begin() as conn: # автоматический commit или rollback при ошибке
conn.execute(text('CREATE TABLE IF NOT EXISTS accounts (id INTEGER PRIMARY KEY, balance REAL)'))
conn.execute(text('INSERT INTO accounts (balance) VALUES (:bal)'), {'bal': 1000.0})
# если ниже возникнет исключение, транзакция откатится
conn.execute(text('UPDATE accounts SET balance = balance + 500 WHERE id = 1'))
# после выхода из блока conn, если не было ошибок, происходит commit
Результат: таблица accounts создана, баланс 1500.0.
Метод begin() гарантирует, что соединение вернется в пул после завершения транзакции.
Bulk insert с помощью executemany
import sqlite3
conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute('CREATE TABLE numbers (val INTEGER)')
data = [(i,) for i in range(100000)]
c.executemany('INSERT INTO numbers VALUES (?)', data)
conn.commit()
print('Вставлено', c.rowcount, 'строк')
conn.close()
Вставлено 100000 строк
Использование executemany значительно быстрее, чем множественные вызовы execute. Для больших объемов стоит увеличить параметр batch_size в ORM.
Кастомные типы в SQLAlchemy ORM
from sqlalchemy import create_engine, Column, Integer, String, TypeDecorator
from sqlalchemy.orm import declarative_base, Session
import json
class JSONType(TypeDecorator):
impl = String
def process_bind_param(self, value, dialect):
return json.dumps(value) if value else None
def process_result_value(self, value, dialect):
return json.loads(value) if value else None
Base = declarative_base()
class Article(Base):
__tablename__ = 'articles'
id = Column(Integer, primary_key=True)
tags = Column(JSONType)
engine = create_engine('sqlite:///custom_type.db')
Base.metadata.create_all(engine)
with Session(engine) as session:
article = Article(tags=['python', 'sqlalchemy'])
session.add(article)
session.commit()
print(article.tags)
['python', 'sqlalchemy']
Кастомный тип JSONType автоматически сериализует список в JSON при сохранении и десериализует при загрузке.
Пагинация с использованием LIMIT и OFFSET
from sqlalchemy import create_engine, text
engine = create_engine('sqlite:///pagination.db')
with engine.begin() as conn:
conn.execute(text('CREATE TABLE IF NOT EXISTS posts (id INTEGER PRIMARY KEY, title TEXT)'))
for i in range(1, 101):
conn.execute(text('INSERT INTO posts (title) VALUES (:t)'), {'t': f'Post {i}'})
# пагинация: страница 3, по 10 записей
page = 3
per_page = 10
with engine.connect() as conn:
result = conn.execute(
text('SELECT * FROM posts ORDER BY id LIMIT :limit OFFSET :offset'),
{'limit': per_page, 'offset': (page-1)*per_page}
)
for row in result:
print(row.id, row.title)
21 Post 21 22 Post 22 ... 30 Post 30
Важно: в SQLite OFFSET без LIMIT не работает, поэтому всегда указывайте LIMIT.
Использование подзапросов в SQLAlchemy Core
from sqlalchemy import create_engine, text, select, func, table, column
engine = create_engine('sqlite:///subquery_example.db')
with engine.begin() as conn:
conn.execute(text('''CREATE TABLE orders(
id INTEGER PRIMARY KEY,
customer_id INTEGER,
amount REAL
)'''))
conn.execute(text('''INSERT INTO orders VALUES
(1,1,100), (2,1,200), (3,2,150), (4,2,300)'''))
orders = table('orders', column('id'), column('customer_id'), column('amount'))
subq = select(func.avg(orders.c.amount)).where(orders.c.customer_id == 1).scalar_subquery()
stmt = select(orders).where(orders.c.amount > subq)
with engine.connect() as conn:
result = conn.execute(stmt)
for row in result:
print(row)
(1, 1, 100.0) (2, 1, 200.0)
Подзапрос scalar_subquery() вычисляет среднюю сумму для клиента 1 (150), затем выбираются все заказы этого клиента с суммой больше 150 (только 200).
Асинхронное соединение с пулом в asyncpg
import asyncio
import asyncpg
async def main():
pool = await asyncpg.create_pool(
user='myuser', password='mypass', database='mydb',
host='localhost', min_size=2, max_size=10
)
async with pool.acquire() as conn:
await conn.execute('''CREATE TABLE IF NOT EXISTS metrics (
id serial PRIMARY KEY, value real, ts timestamptz DEFAULT now()
)''')
await conn.execute('INSERT INTO metrics (value) VALUES ($1)', 3.14)
row = await conn.fetchrow('SELECT * FROM metrics')
print(dict(row))
await pool.close()
asyncio.run(main())
{'id': 1, 'value': 3.14, 'ts': datetime.datetime(2025, 4, 2, 12, 0, 0, tzinfo=datetime.timezone.utc)}
Пул соединений позволяет эффективно работать с большим количеством запросов, переиспользуя открытые соединения.
Миграции схемы с использованием Alembic (SQLAlchemy)
# Установка: pip install alembic
# Инициализация: alembic init alembic
# Настройка env.py для подключения к БД
# Создание миграции: alembic revision --autogenerate -m "add email field"
# Применение: alembic upgrade head
# Пример файла миграции (автоматически генерируется):
"""
Revision ID: abc123
Revises:
Create Date: 2025-04-02
"""
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column('users', sa.Column('email', sa.String(120), nullable=True))
def downgrade():
op.drop_column('users', 'email')
Миграции позволяют версионировать изменения схемы БД и выполнять их последовательно.