Подключение PostgreSQL к FastAPI: методы и примеры кода

Раздел: Базы данных -> Работа с базами данных

Обзор вариантов работы с PostgreSQL в FastAPI

Как настроить асинхронное подключение к PostgreSQL в FastAPI с помощью SQLAlchemy и asyncpg?

Для высокопроизводительных приложений рекомендуется использовать асинхронный драйвер asyncpg вместе с SQLAlchemy. Это позволяет не блокировать event loop при запросах к базе данных. Рассмотрим полную настройку.

Установка зависимостей:

pip install fastapi uvicorn sqlalchemy asyncpg psycopg2-binary alembic

создание базы данных на python (создание базы данных с помощью python)

Создадим файл database.py для настройки подключения:


# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

engine = create_async_engine(DATABASE_URL, echo=True)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

# Функция для получения сессии
async def get_session() -> AsyncSession:
    async with async_session() as session:
        yield session

база данных на python (база данных на python)

Модель данных (models.py):


# models.py
from sqlalchemy import Column, Integer, String
from database import Base

class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    description = Column(String)

Python интерфейс базы данных (интерфейс базы данных в python)

CRUD операции в FastAPI:


# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_session
from models import Item

app = FastAPI()

@app.post("/items/")
async def create_item(name: str, description: str, session: AsyncSession = Depends(get_session)):
    item = Item(name=name, description=description)
    session.add(item)
    await session.commit()
    await session.refresh(item)
    return {"id": item.id, "name": item.name, "description": item.description}

@app.get("/items/{item_id}")
async def read_item(item_id: int, session: AsyncSession = Depends(get_session)):
    result = await session.execute(select(Item).where(Item.id == item_id))
    item = result.scalar_one_or_none()
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return item

@app.get("/items/")
async def list_items(session: AsyncSession = Depends(get_session)):
    result = await session.execute(select(Item))
    items = result.scalars().all()
    return items

Fastapi python postgresql (работа с postgresql в fastapi)

Пояснения:

Мы используем create_async_engine с URL, начинающимся с postgresql+asyncpg://. Сессия создается через async_sessionmaker. Функция get_session используется как зависимость FastAPI, что гарантирует закрытие сессии после завершения запроса.

Возможные проблемы и их решение:

  • SSL/TLS ошибки: Если PostgreSQL требует SSL, добавьте параметры в URL: ?ssl=true или настройте сертификаты.
  • Ошибка пула соединений: При большом количестве одновременных запросов может возникнуть нехватка соединений. Увеличьте размер пула: create_async_engine(..., pool_size=20, max_overflow=10).
  • Миграции баз данных: Используйте Alembic с асинхронным движком. В alembic.ini укажите sqlalchemy.url = postgresql+asyncpg://user:pass@localhost/dbname, а в env.py импортируйте асинхронный engine.

Цель: Данный подход подходит для продакшен-систем с высокой нагрузкой, где важна производительность и асинхронность.

Вариант 1: Как использовать синхронный SQLAlchemy с psycopg2 в FastAPI?

Если асинхронность не критична, можно применить классический синхронный SQLAlchemy с драйвером psycopg2. Однако следует помнить, что синхронные вызовы блокируют event loop FastAPI, поэтому такой подход рекомендуется только для фоновых задач или при использовании пула потоков.


# database_sync.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session

DATABASE_URL = "postgresql+psycopg2://user:password@localhost/dbname"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_db() -> Session:
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

таблица sql python (создание таблиц в sql через python)

В FastAPI используйте Depends(get_db) для получения сессии. Для асинхронных эндпоинтов можно обернуть синхронные вызовы в run_in_executor, но это усложняет код.

Проблема:

При высоком трафике синхронные вызовы могут значительно снизить пропускную способность приложения, так как каждый запрос занимает поток из пула.

Цель: Быстрое прототипирование или приложения с низкой нагрузкой.

Вариант 2: Как упростить работу с PostgreSQL используя библиотеку databases?

Библиотека databases предоставляет легковесный асинхронный интерфейс для работы с SQL-запросами без полного ORM.


# database_databases.py
from databases import Database
from fastapi import FastAPI

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
database = Database(DATABASE_URL)

app = FastAPI()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

@app.post("/items/")
async def create_item(name: str, description: str):
    query = "INSERT INTO items(name, description) VALUES (:name, :description) RETURNING id"
    values = {"name": name, "description": description}
    result = await database.execute(query, values)
    return {"id": result}

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    query = "SELECT * FROM items WHERE id = :id"
    row = await database.fetch_one(query, {"id": item_id})
    if row is None:
        raise HTTPException(status_code=404)
    return dict(row)

подключение к бд python (подключение к базе данных в python)

Проблема:

Нет автоматической генерации схемы, отсутствует ORM для сложных отношений, требуется писать SQL вручную. Для миграций нужна отдельная библиотека.

Цель: Минималистичные проекты, где не нужна сложная бизнес-логика.

Вариант 3: Как напрямую использовать asyncpg в FastAPI для полного контроля над запросами?

Библиотека asyncpg предоставляет низкоуровневый доступ к PostgreSQL. Она быстрая и гибкая, но требует ручного управления соединениями и обработки результатов.


# database_asyncpg.py
import asyncpg
from fastapi import FastAPI

DATABASE_URL = "postgresql://user:password@localhost/dbname"

app = FastAPI()

async def get_pool():
    return await asyncpg.create_pool(DATABASE_URL, min_size=5, max_size=20)

@app.on_event("startup")
async def startup():
    app.state.db_pool = await get_pool()

@app.on_event("shutdown")
async def shutdown():
    await app.state.db_pool.close()

@app.post("/items/")
async def create_item(name: str, description: str):
    async with app.state.db_pool.acquire() as conn:
        result = await conn.fetchrow(
            "INSERT INTO items(name, description) VALUES ($1, $2) RETURNING id",
            name, description
        )
        return {"id": result["id"]}

Проблема:

Нет валидации данных, сложно поддерживать миграции, код становится громоздким при большом количестве запросов.

Цель: Высокая производительность с полным контролем, например, для написания специфических запросов.

Вариант 4: Как объединить Pydantic и SQLAlchemy с помощью SQLModel?

SQLModel - это библиотека, которая объединяет Pydantic и SQLAlchemy, позволяя определять модели данных, которые автоматически становятся и Pydantic-схемами, и SQLAlchemy-моделями. Она поддерживает асинхронность.


# models_sqlmodel.py
from sqlmodel import SQLModel, Field, Session, create_engine, select
from fastapi import FastAPI, Depends

class Item(SQLModel, table=True):
    id: int = Field(default=None, primary_key=True)
    name: str
    description: str

DATABASE_URL = "postgresql://user:password@localhost/dbname?async_fallback=True"
engine = create_engine(DATABASE_URL)
SQLModel.metadata.create_all(engine)  # Not recommended for prod, use Alembic

app = FastAPI()

def get_session():
    with Session(engine) as session:
        yield session

@app.post("/items/")
def create_item(item: Item, session: Session = Depends(get_session)):
    session.add(item)
    session.commit()
    session.refresh(item)
    return item

@app.get("/items/{item_id}")
def read_item(item_id: int, session: Session = Depends(get_session)):
    item = session.get(Item, item_id)
    if not item:
        raise HTTPException(404)
    return item

Проблема:

На момент написания SQLModel не полностью поддерживает асинхронный движок (требуется async_fallback), а также менее гибок для сложных запросов. Миграции нужно делать через Alembic, но интеграция может быть неочевидной.

Цель: Быстрая разработка, когда хочется минимизировать количество кода и использовать одну модель для валидации и ORM.

Общие типичные ошибки при работе с PostgreSQL в FastAPI:

  • Незакрытые сессии: При использовании SQLAlchemy всегда используйте контекстный менеджер или зависимость FastAPI для закрытия сессии. Иначе соединения будут исчерпаны.
  • Конфликт версий драйверов: Убедитесь, что версии asyncpg и psycopg2 совместимы с вашей версией PostgreSQL. Например, asyncpg требует PostgreSQL 9.2+.
  • Миграции без Alembic: Ручное создание таблиц через Base.metadata.create_all не подходит для продакшена, так как не отслеживает изменения. Используйте Alembic для управления миграциями.
  • Неверное использование переменных окружения: Не храните строку подключения в коде. Используйте os.getenv или файлы .env.

Дополнительные примеры и расширенное использование

Пример 1: Сложный запрос с JOIN и пагинацией (асинхронный SQLAlchemy)

Предположим, есть таблицы User и Post. Нужно получить посты пользователя с постраничным выводом.

Пример

# models.py
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)

class Post(Base):
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    title = Column(String)
    content = Column(String)
    user = relationship("User", back_populates="posts")

User.posts = relationship("Post", back_populates="user")

# endpoint
@app.get("/users/{user_id}/posts/")
async def get_user_posts(user_id: int, page: int = 1, per_page: int = 10, session: AsyncSession = Depends(get_session)):
    offset = (page - 1) * per_page
    stmt = (
        select(Post)
        .where(Post.user_id == user_id)
        .offset(offset)
        .limit(per_page)
    )
    result = await session.execute(stmt)
    posts = result.scalars().all()
    return {"page": page, "per_page": per_page, "posts": posts}

Результат для страницы 1:

{
  "page": 1,
  "per_page": 10,
  "posts": [
    {"id": 1, "user_id": 1, "title": "Post 1", "content": "Content 1"},
    ...
  ]
}

Такой запрос эффективно использует LIMIT и OFFSET для пагинации.

Пример 2: Миграции с Alembic для асинхронного движка

Установка: pip install alembic. Инициализация: alembic init alembic. Редактируем env.py для асинхронной работы.

Пример

# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
from database import Base  # импорт вашего Base
import asyncio

config = context.config
fileConfig(config.config_file_name)

target_metadata = Base.metadata

def run_migrations_offline():
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_async_migrations():
    connectable = create_async_engine(
        config.get_main_option("sqlalchemy.url"),
        poolclass=pool.NullPool
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

def run_migrations_online():
    asyncio.run(run_async_migrations())

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

В alembic.ini укажите строку подключения: sqlalchemy.url = postgresql+asyncpg://user:pass@localhost/dbname.

Создайте миграцию: alembic revision --autogenerate -m "initial". Примените: alembic upgrade head.

Результат:

INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.runtime.migration] Will assume transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 1234abcd, initial

Пример 3: Транзакции с асинхронным SQLAlchemy

Использование контекстного менеджера сессии гарантирует атомарность операций.

Пример

@app.post("/transfer/")
async def transfer_funds(from_id: int, to_id: int, amount: float, session: AsyncSession = Depends(get_session)):
    async with session.begin():
        from_account = await session.get(Account, from_id)
        to_account = await session.get(Account, to_id)
        if not from_account or not to_account:
            raise HTTPException(404, "Account not found")
        if from_account.balance < amount:
            raise HTTPException(400, "Insufficient funds")
        from_account.balance -= amount
        to_account.balance += amount
    return {"status": "ok"}

Результат при успешном переводе:

{"status": "ok"}

Пример 4: Тестирование с использованием mock и in-memory базы

Для unit-тестирования можно заменить реальную базу на SQLite в памяти.

Пример

# test_main.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from database import Base, get_session
from main import app
from httpx import AsyncClient

@pytest.fixture
async def test_db():
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async def override_get_session():
        async with async_session() as session:
            yield session

    app.dependency_overrides[get_session] = override_get_session
    yield

@pytest.mark.asyncio
async def test_create_item(test_db):
    async with AsyncClient(app=app, base_url="http://test") as ac:
        response = await ac.post("/items/", params={"name": "test", "description": "desc"})
    assert response.status_code == 200
    assert response.json()["name"] == "test"

Результат выполнения теста:

1 passed in 0.45s

Пример 5: Использование asyncpg с пулом напрямую для массовой вставки

Массовая вставка с помощью executemany значительно ускоряет вставку большого количества строк.

Пример

async def bulk_insert_items(items: list[dict]):
    async with app.state.db_pool.acquire() as conn:
        await conn.executemany(
            "INSERT INTO items(name, description) VALUES ($1, $2)",
            [(item["name"], item["description"]) for item in items]
        )

Результат:

Вставлено 1000 записей за 0.2 секунды.

Пример 6: Отношения многие-ко-многим с SQLAlchemy

Таблицы Student и Course с ассоциативной таблицей.

Пример

# models.py
association_table = Table("student_courses", Base.metadata,
    Column("student_id", Integer, ForeignKey("students.id")),
    Column("course_id", Integer, ForeignKey("courses.id"))
)

class Student(Base):
    __tablename__ = "students"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    courses = relationship("Course", secondary=association_table, back_populates="students")

class Course(Base):
    __tablename__ = "courses"
    id = Column(Integer, primary_key=True)
    title = Column(String)
    students = relationship("Student", secondary=association_table, back_populates="courses")

Запрос студентов с курсами:

Пример

from sqlalchemy.orm import selectinload

@app.get("/students/{student_id}/")
async def get_student_with_courses(student_id: int, session: AsyncSession = Depends(get_session)):
    stmt = select(Student).options(selectinload(Student.courses)).where(Student.id == student_id)
    result = await session.execute(stmt)
    student = result.scalar_one_or_none()
    if not student:
        raise HTTPException(404)
    return {"id": student.id, "name": student.name, "courses": [{"id": c.id, "title": c.title} for c in student.courses]}

Результат:

{
  "id": 1,
  "name": "Alice",
  "courses": [
    {"id": 1, "title": "Mathematics"},
    {"id": 2, "title": "Physics"}
  ]
}

Работа с PostgreSQL в FastAPI - comments

En
Fastapi python postgresql (python)