Подключение PostgreSQL к FastAPI: методы и примеры кода
Обзор вариантов работы с PostgreSQL в FastAPI
Как настроить асинхронное подключение к PostgreSQL в FastAPI с помощью SQLAlchemy и asyncpg?
Для высокопроизводительных приложений рекомендуется использовать асинхронный драйвер asyncpg вместе с SQLAlchemy. Это позволяет не блокировать event loop при запросах к базе данных. Рассмотрим полную настройку.
Установка зависимостей:
pip install fastapi uvicorn sqlalchemy asyncpg psycopg2-binary alembicсоздание базы данных на python (создание базы данных с помощью python)
Создадим файл database.py для настройки подключения:
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
pass
# Функция для получения сессии
async def get_session() -> AsyncSession:
async with async_session() as session:
yield session
база данных на python (база данных на python)
Модель данных (models.py):
# models.py
from sqlalchemy import Column, Integer, String
from database import Base
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
description = Column(String)
Python интерфейс базы данных (интерфейс базы данных в python)
CRUD операции в FastAPI:
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_session
from models import Item
app = FastAPI()
@app.post("/items/")
async def create_item(name: str, description: str, session: AsyncSession = Depends(get_session)):
item = Item(name=name, description=description)
session.add(item)
await session.commit()
await session.refresh(item)
return {"id": item.id, "name": item.name, "description": item.description}
@app.get("/items/{item_id}")
async def read_item(item_id: int, session: AsyncSession = Depends(get_session)):
result = await session.execute(select(Item).where(Item.id == item_id))
item = result.scalar_one_or_none()
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
return item
@app.get("/items/")
async def list_items(session: AsyncSession = Depends(get_session)):
result = await session.execute(select(Item))
items = result.scalars().all()
return items
Fastapi python postgresql (работа с postgresql в fastapi)
Пояснения:
Мы используем create_async_engine с URL, начинающимся с postgresql+asyncpg://. Сессия создается через async_sessionmaker. Функция get_session используется как зависимость FastAPI, что гарантирует закрытие сессии после завершения запроса.
Возможные проблемы и их решение:
- SSL/TLS ошибки: Если PostgreSQL требует SSL, добавьте параметры в URL:
?ssl=trueили настройте сертификаты. - Ошибка пула соединений: При большом количестве одновременных запросов может возникнуть нехватка соединений. Увеличьте размер пула:
create_async_engine(..., pool_size=20, max_overflow=10). - Миграции баз данных: Используйте Alembic с асинхронным движком. В
alembic.iniукажитеsqlalchemy.url = postgresql+asyncpg://user:pass@localhost/dbname, а вenv.pyимпортируйте асинхронный engine.
Цель: Данный подход подходит для продакшен-систем с высокой нагрузкой, где важна производительность и асинхронность.
Вариант 1: Как использовать синхронный SQLAlchemy с psycopg2 в FastAPI?
Если асинхронность не критична, можно применить классический синхронный SQLAlchemy с драйвером psycopg2. Однако следует помнить, что синхронные вызовы блокируют event loop FastAPI, поэтому такой подход рекомендуется только для фоновых задач или при использовании пула потоков.
# database_sync.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
DATABASE_URL = "postgresql+psycopg2://user:password@localhost/dbname"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db() -> Session:
db = SessionLocal()
try:
yield db
finally:
db.close()
таблица sql python (создание таблиц в sql через python)
В FastAPI используйте Depends(get_db) для получения сессии. Для асинхронных эндпоинтов можно обернуть синхронные вызовы в run_in_executor, но это усложняет код.
Проблема:
При высоком трафике синхронные вызовы могут значительно снизить пропускную способность приложения, так как каждый запрос занимает поток из пула.Цель: Быстрое прототипирование или приложения с низкой нагрузкой.
Вариант 2: Как упростить работу с PostgreSQL используя библиотеку databases?
Библиотека databases предоставляет легковесный асинхронный интерфейс для работы с SQL-запросами без полного ORM.
# database_databases.py
from databases import Database
from fastapi import FastAPI
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
database = Database(DATABASE_URL)
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.post("/items/")
async def create_item(name: str, description: str):
query = "INSERT INTO items(name, description) VALUES (:name, :description) RETURNING id"
values = {"name": name, "description": description}
result = await database.execute(query, values)
return {"id": result}
@app.get("/items/{item_id}")
async def read_item(item_id: int):
query = "SELECT * FROM items WHERE id = :id"
row = await database.fetch_one(query, {"id": item_id})
if row is None:
raise HTTPException(status_code=404)
return dict(row)
подключение к бд python (подключение к базе данных в python)
Проблема:
Нет автоматической генерации схемы, отсутствует ORM для сложных отношений, требуется писать SQL вручную. Для миграций нужна отдельная библиотека.Цель: Минималистичные проекты, где не нужна сложная бизнес-логика.
Вариант 3: Как напрямую использовать asyncpg в FastAPI для полного контроля над запросами?
Библиотека asyncpg предоставляет низкоуровневый доступ к PostgreSQL. Она быстрая и гибкая, но требует ручного управления соединениями и обработки результатов.
# database_asyncpg.py
import asyncpg
from fastapi import FastAPI
DATABASE_URL = "postgresql://user:password@localhost/dbname"
app = FastAPI()
async def get_pool():
return await asyncpg.create_pool(DATABASE_URL, min_size=5, max_size=20)
@app.on_event("startup")
async def startup():
app.state.db_pool = await get_pool()
@app.on_event("shutdown")
async def shutdown():
await app.state.db_pool.close()
@app.post("/items/")
async def create_item(name: str, description: str):
async with app.state.db_pool.acquire() as conn:
result = await conn.fetchrow(
"INSERT INTO items(name, description) VALUES ($1, $2) RETURNING id",
name, description
)
return {"id": result["id"]}
Проблема:
Нет валидации данных, сложно поддерживать миграции, код становится громоздким при большом количестве запросов.Цель: Высокая производительность с полным контролем, например, для написания специфических запросов.
Вариант 4: Как объединить Pydantic и SQLAlchemy с помощью SQLModel?
SQLModel - это библиотека, которая объединяет Pydantic и SQLAlchemy, позволяя определять модели данных, которые автоматически становятся и Pydantic-схемами, и SQLAlchemy-моделями. Она поддерживает асинхронность.
# models_sqlmodel.py
from sqlmodel import SQLModel, Field, Session, create_engine, select
from fastapi import FastAPI, Depends
class Item(SQLModel, table=True):
id: int = Field(default=None, primary_key=True)
name: str
description: str
DATABASE_URL = "postgresql://user:password@localhost/dbname?async_fallback=True"
engine = create_engine(DATABASE_URL)
SQLModel.metadata.create_all(engine) # Not recommended for prod, use Alembic
app = FastAPI()
def get_session():
with Session(engine) as session:
yield session
@app.post("/items/")
def create_item(item: Item, session: Session = Depends(get_session)):
session.add(item)
session.commit()
session.refresh(item)
return item
@app.get("/items/{item_id}")
def read_item(item_id: int, session: Session = Depends(get_session)):
item = session.get(Item, item_id)
if not item:
raise HTTPException(404)
return item
Проблема:
На момент написания SQLModel не полностью поддерживает асинхронный движок (требуетсяasync_fallback), а также менее гибок для сложных запросов. Миграции нужно делать через Alembic, но интеграция может быть неочевидной.Цель: Быстрая разработка, когда хочется минимизировать количество кода и использовать одну модель для валидации и ORM.
Общие типичные ошибки при работе с PostgreSQL в FastAPI:
- Незакрытые сессии: При использовании SQLAlchemy всегда используйте контекстный менеджер или зависимость FastAPI для закрытия сессии. Иначе соединения будут исчерпаны.
- Конфликт версий драйверов: Убедитесь, что версии asyncpg и psycopg2 совместимы с вашей версией PostgreSQL. Например, asyncpg требует PostgreSQL 9.2+.
- Миграции без Alembic: Ручное создание таблиц через
Base.metadata.create_allне подходит для продакшена, так как не отслеживает изменения. Используйте Alembic для управления миграциями. - Неверное использование переменных окружения: Не храните строку подключения в коде. Используйте
os.getenvили файлы.env.
Дополнительные примеры и расширенное использование
Пример 1: Сложный запрос с JOIN и пагинацией (асинхронный SQLAlchemy)
Предположим, есть таблицы User и Post. Нужно получить посты пользователя с постраничным выводом.
# models.py
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"))
title = Column(String)
content = Column(String)
user = relationship("User", back_populates="posts")
User.posts = relationship("Post", back_populates="user")
# endpoint
@app.get("/users/{user_id}/posts/")
async def get_user_posts(user_id: int, page: int = 1, per_page: int = 10, session: AsyncSession = Depends(get_session)):
offset = (page - 1) * per_page
stmt = (
select(Post)
.where(Post.user_id == user_id)
.offset(offset)
.limit(per_page)
)
result = await session.execute(stmt)
posts = result.scalars().all()
return {"page": page, "per_page": per_page, "posts": posts}
Результат для страницы 1:
{
"page": 1,
"per_page": 10,
"posts": [
{"id": 1, "user_id": 1, "title": "Post 1", "content": "Content 1"},
...
]
}
Такой запрос эффективно использует LIMIT и OFFSET для пагинации.
Пример 2: Миграции с Alembic для асинхронного движка
Установка: pip install alembic. Инициализация: alembic init alembic. Редактируем env.py для асинхронной работы.
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
from database import Base # импорт вашего Base
import asyncio
config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline():
url = config.get_main_option("sqlalchemy.url")
context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations():
connectable = create_async_engine(
config.get_main_option("sqlalchemy.url"),
poolclass=pool.NullPool
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online():
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
В alembic.ini укажите строку подключения: sqlalchemy.url = postgresql+asyncpg://user:pass@localhost/dbname.
Создайте миграцию: alembic revision --autogenerate -m "initial". Примените: alembic upgrade head.
Результат:
INFO [alembic.runtime.migration] Context impl PostgresqlImpl. INFO [alembic.runtime.migration] Will assume transactional DDL. INFO [alembic.runtime.migration] Running upgrade -> 1234abcd, initial
Пример 3: Транзакции с асинхронным SQLAlchemy
Использование контекстного менеджера сессии гарантирует атомарность операций.
@app.post("/transfer/")
async def transfer_funds(from_id: int, to_id: int, amount: float, session: AsyncSession = Depends(get_session)):
async with session.begin():
from_account = await session.get(Account, from_id)
to_account = await session.get(Account, to_id)
if not from_account or not to_account:
raise HTTPException(404, "Account not found")
if from_account.balance < amount:
raise HTTPException(400, "Insufficient funds")
from_account.balance -= amount
to_account.balance += amount
return {"status": "ok"}
Результат при успешном переводе:
{"status": "ok"}
Пример 4: Тестирование с использованием mock и in-memory базы
Для unit-тестирования можно заменить реальную базу на SQLite в памяти.
# test_main.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from database import Base, get_session
from main import app
from httpx import AsyncClient
@pytest.fixture
async def test_db():
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def override_get_session():
async with async_session() as session:
yield session
app.dependency_overrides[get_session] = override_get_session
yield
@pytest.mark.asyncio
async def test_create_item(test_db):
async with AsyncClient(app=app, base_url="http://test") as ac:
response = await ac.post("/items/", params={"name": "test", "description": "desc"})
assert response.status_code == 200
assert response.json()["name"] == "test"
Результат выполнения теста:
1 passed in 0.45s
Пример 5: Использование asyncpg с пулом напрямую для массовой вставки
Массовая вставка с помощью executemany значительно ускоряет вставку большого количества строк.
async def bulk_insert_items(items: list[dict]):
async with app.state.db_pool.acquire() as conn:
await conn.executemany(
"INSERT INTO items(name, description) VALUES ($1, $2)",
[(item["name"], item["description"]) for item in items]
)
Результат:
Вставлено 1000 записей за 0.2 секунды.
Пример 6: Отношения многие-ко-многим с SQLAlchemy
Таблицы Student и Course с ассоциативной таблицей.
# models.py
association_table = Table("student_courses", Base.metadata,
Column("student_id", Integer, ForeignKey("students.id")),
Column("course_id", Integer, ForeignKey("courses.id"))
)
class Student(Base):
__tablename__ = "students"
id = Column(Integer, primary_key=True)
name = Column(String)
courses = relationship("Course", secondary=association_table, back_populates="students")
class Course(Base):
__tablename__ = "courses"
id = Column(Integer, primary_key=True)
title = Column(String)
students = relationship("Student", secondary=association_table, back_populates="courses")
Запрос студентов с курсами:
from sqlalchemy.orm import selectinload
@app.get("/students/{student_id}/")
async def get_student_with_courses(student_id: int, session: AsyncSession = Depends(get_session)):
stmt = select(Student).options(selectinload(Student.courses)).where(Student.id == student_id)
result = await session.execute(stmt)
student = result.scalar_one_or_none()
if not student:
raise HTTPException(404)
return {"id": student.id, "name": student.name, "courses": [{"id": c.id, "title": c.title} for c in student.courses]}
Результат:
{
"id": 1,
"name": "Alice",
"courses": [
{"id": 1, "title": "Mathematics"},
{"id": 2, "title": "Physics"}
]
}