Работа с базами данных в Python: популярные библиотеки и их применение
Библиотеки Python для работы с базами данных
При разработке приложений на Python часто требуется взаимодействие с базами данных (БД). Разные задачи требуют разных подходов - от простого встраиваемого SQLite до промышленных PostgreSQL, MySQL. Рассмотрим основные библиотеки, которые помогают выполнять запросы, управлять транзакциями и моделировать данные.
Основное и наиболее эффективное решение: SQLAlchemy
SQLAlchemy - это мощная библиотека, предоставляющая как ORM (Object Relational Mapping), так и низкоуровневый Core для работы с SQL. Она поддерживает множество СУБД (SQLite, PostgreSQL, MySQL, Oracle, Microsoft SQL Server) через единый интерфейс.
Как создать подключение и выполнить простой запрос через SQLAlchemy Core?
from sqlalchemy import create_engine, text
engine = create_engine('sqlite:///example.db', echo=False)
with engine.connect() as conn:
result = conn.execute(text('SELECT * FROM users'))
for row in result:
print(row)Python библиотеки для баз данных (библиотеки python для баз данных)
Здесь create_engine создаёт движок, connect открывает соединение, text превращает строку в выполняемое выражение. Результат - итерируемый объект с записями.
Типичная проблема: неправильный синтаксис URI (например, забытый слэш для SQLite). Решение - проверять строку по документации: 'sqlite:///path/to/db' (три слэша для абсолютного пути).
Ещё одна ошибка - забыть закрыть соединение. Использование менеджера контекста with решает эту проблему.
Различные варианты решения
Как использовать встроенную БД без установки дополнительных пакетов?
Библиотека sqlite3 входит в стандартную поставку Python. Она идеальна для прототипирования, малых проектов и тестов.
import sqlite3
conn = sqlite3.connect('projects.db')
cursor = conn.cursor()
cursor.execute('CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY, name TEXT)')
cursor.execute('INSERT INTO tasks (name) VALUES (?)', ('Первая задача',))
conn.commit()
cursor.execute('SELECT * FROM tasks')
print(cursor.fetchall())
conn.close()
Проблема: строки соединения не поддерживаются, только имя файла. Также нет автоматического преобразования типов - даты приходят в виде строк. Решение - использовать row_factory для получения словарей.
Ошибка: забыть вызов commit() - данные не сохранятся. Всегда фиксируйте транзакцию после вставки.
Как подключаться к PostgreSQL с максимальной производительностью?
Для работы с PostgreSQL чаще всего используют psycopg2 - зрелый драйвер, поддерживающий пулы соединений, асинхронные режимы (через psycopg2.extras) и широкий набор типов.
import psycopg2
conn = psycopg2.connect(
host='localhost',
database='mydb',
user='user',
password='pass'
)
cur = conn.cursor()
cur.execute('SELECT version()')
print(cur.fetchone())
cur.close()
conn.close()
Проблема: если PostgreSQL не запущен, соединение вызовет исключение OperationalError. Рекомендуется оборачивать подключение в try-except.
Как выполнять асинхронные запросы к БД?
asyncpg - асинхронный драйвер для PostgreSQL, работающий на asyncio. Он даёт высокую скорость и поддержку prepared statements.
import asyncio
import asyncpg
async def run():
conn = await asyncpg.connect(user='user', password='pass', database='mydb', host='localhost')
values = await conn.fetch('''SELECT id, name FROM products WHERE price > $1''', 100)
for record in values:
print(record['name'])
await conn.close()
asyncio.run(run())
Типичная ошибка: попытка использовать asyncpg в синхронном коде без event loop. Решение - всегда работать внутри асинхронной функции и вызывать через asyncio.run().
Как работать с MySQL из Python?
Библиотека PyMySQL - чистый Python драйвер для MySQL. Он прост в установке и совместим с большинством версий MySQL.
import pymysql
connection = pymysql.connect(host='localhost', user='root', password='', database='test')
cursor = connection.cursor()
cursor.execute('SELECT COUNT(*) FROM users')
result = cursor.fetchone()
print('Число пользователей:', result[0])
cursor.close()
connection.close()
Проблема: утечка памяти при большом количестве соединений. Используйте пул или закрывайте соединения в блоке finally.
Как загрузить данные из SQL-запроса в DataFrame?
Библиотека pandas умеет читать результаты SQL напрямую через функцию read_sql_query. Для этого требуется драйвер (sqlite3, psycopg2, и т.п.).
import pandas as pd
import sqlite3
conn = sqlite3.connect('data.db')
df = pd.read_sql_query('SELECT * FROM sales', conn)
print(df.head())
conn.close()
Типичная ошибка: забыть закрыть соединение после чтения. Лучше использовать контекстный менеджер.
Расширенные примеры использования библиотек для баз данных
1. SQLAlchemy - продвинутая работа с ORM
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, Session
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(50))
posts = relationship('Post', back_populates='author')
class Post(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(100))
user_id = Column(Integer, ForeignKey('users.id'))
author = relationship('User', back_populates='posts')
engine = create_engine('sqlite:///blog.db')
Base.metadata.create_all(engine)
with Session(engine) as session:
user = User(name='Анна')
session.add(user)
session.commit()
post = Post(title='Первая запись', author=user)
session.add(post)
session.commit()
# Запрос с join
result = session.query(User).join(Post).filter(Post.title.like('%Первая%')).all()
for u in result:
print(f'Пользователь {u.name}, посты: {len(u.posts)}')
Пользователь Анна, посты: 1
2. psycopg2 - использование пула соединений
from psycopg2 import pool
connection_pool = pool.ThreadedConnectionPool(
minconn=2, maxconn=10,
host='localhost', database='testdb', user='user', password='pass'
)
# Получаем соединение из пула
conn = connection_pool.getconn()
cur = conn.cursor()
cur.execute('SELECT id, username FROM accounts WHERE active = %s', (True,))
rows = cur.fetchall()
print('Активные пользователи:')
for row in rows:
print(f' ID {row[0]}: {row[1]}')
cur.close()
connection_pool.putconn(conn) # возвращаем в пул
Активные пользователи: ID 1: admin ID 3: test_user
3. asyncpg - массовая вставка с транзакцией
import asyncio
import asyncpg
async def bulk_insert():
conn = await asyncpg.connect(user='user', password='pass', database='mydb')
async with conn.transaction():
await conn.execute('''
INSERT INTO logs (event, timestamp) VALUES ($1, $2)
''', 'Событие А', '2025-03-10 12:00:00')
await conn.execute('''
INSERT INTO logs (event, timestamp) VALUES ($1, $2)
''', 'Событие Б', '2025-03-10 12:05:00')
# Получаем все вставленные записи
rows = await conn.fetch('SELECT * FROM logs ORDER BY timestamp')
for row in rows:
print(dict(row))
await conn.close()
asyncio.run(bulk_insert())
{'id': 1, 'event': 'Событие А', 'timestamp': datetime.datetime(2025, 3, 10, 12, 0)}
{'id': 2, 'event': 'Событие Б', 'timestamp': datetime.datetime(2025, 3, 10, 12, 5)}
4. pandas и SQL - загрузка с параметрами
import pandas as pd
import psycopg2
conn = psycopg2.connect('dbname=shop user=user password=pass host=localhost')
query = 'SELECT product, price, quantity FROM inventory WHERE quantity > %(min_qty)s'
params = {'min_qty': 5}
df = pd.read_sql_query(query, conn, params=params)
print('Товары с остатком более 5 штук:')
print(df.to_string(index=False))
conn.close()
Товары с остатком более 5 штук:
product price quantity
стол 12000 12
стул 4500 8