Какую базу данных использовать в Python? Сравнение решений
Обзор вариантов баз данных для Python
Какая база данных обеспечивает баланс производительности, надежности и гибкости для большинства Python-проектов?
Наиболее универсальным выбором является PostgreSQL. Эта реляционная СУБД поддерживает сложные запросы, транзакции, JSON-поля, полнотекстовый поиск и расширяемость. Для Python доступны популярные библиотеки: psycopg2 (синхронный драйвер), asyncpg (асинхронный) и SQLAlchemy (ORM).
# Установка драйвера
pip install psycopg2
# Пример подключения и запроса
import psycopg2
conn = psycopg2.connect(
dbname="mydb",
user="user",
password="secret",
host="localhost",
port=5432
)
cur = conn.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS users (id SERIAL PRIMARY KEY, name TEXT);")
cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
conn.commit()
cur.execute("SELECT * FROM users;")
print(cur.fetchall())
cur.close()
conn.close()
Python какую выбрать база данных (какую базу данных выбрать для python)
[(1, 'Alice')]
программирование на python редактор (редактор для программирования на python)
Типичные проблемы: сложность первоначальной настройки, большой расход памяти при неоптимальных запросах, необходимость настройки репликации для высоких нагрузок. Решение: использовать пул соединений (например, psycopg2.pool или asyncpg.pool), индексировать часто запрашиваемые колонки.
Какую базу данных использовать для локального прототипа или встроенного приложения?
SQLite - встроенная СУБД без отдельного сервера, идеальна для небольших проектов, тестирования и мобильных приложений. Библиотека sqlite3 входит в стандартную поставку Python.
import sqlite3
conn = sqlite3.connect("example.db")
cur = conn.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY, title TEXT)")
cur.execute("INSERT INTO tasks (title) VALUES (?)", ("Сделать отчет",))
conn.commit()
for row in cur.execute("SELECT * FROM tasks"):
print(row)
conn.close()
(1, 'Сделать отчет')
Проблемы: отсутствие поддержки конкурентных записей (одна пишущая транзакция за раз), ограниченный набор типов данных. Для одно- или двухпользовательских сценариев это не критично.
Когда необходима работа с неструктурированными документами?
MongoDB - документоориентированная NoSQL СУБД, хранит данные в формате BSON. Подходит для гибких схем, логов, каталогов товаров. Драйвер для Python - pymongo.
pip install pymongo
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client["shop"]
collection = db["products"]
product = {"name": "Ноутбук", "price": 1500, "tags": ["электроника", "компьютеры"]}
collection.insert_one(product)
for doc in collection.find({"price": {"$gt": 1000}}):
print(doc["name"])
Ноутбук
Частые ошибки: забывают про индексы (запросы становятся медленными), неправильно выбирают шардирование. Рекомендуется создавать индексы на поля, по которым выполняется поиск.
Как обеспечить быстрый доступ к кэшу или временным данным?
Redis - in-memory хранилище структур данных, используется для кэширования, сессий, очередей. Библиотека redis-py предоставляет полный функционал.
pip install redis
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.set('username', 'ivan')
print(r.get('username'))
r.lpush('queue', 'task1')
print(r.lpop('queue'))
b'ivan' b'task1'
Проблемы: данные хранятся в оперативной памяти, при перезапуске теряются, если не настроена персистентность. Для кэша это допустимо, но для постоянного хранения - нет.
Как работать с графовыми связями между объектами?
Neo4j - графовая база данных, оптимальная для социальных сетей, рекомендаций, анализа маршрутов. Драйвер neo4j для Python поддерживает синтаксис Cypher.
pip install neo4j
from neo4j import GraphDatabase
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
session.run("CREATE (a:Person {name: $name})", name="Анна")
result = session.run("MATCH (a:Person) RETURN a.name")
for record in result:
print(record["a.name"])
driver.close()
Анна
Типичные ошибки: сложность моделирования графа, неправильное использование индексов (для Cypher индексы создаются только на свойства узлов). Рекомендуется предварительно проектировать схему.
Какая база данных подходит для полнотекстового поиска?
Elasticsearch (связка с Logstash и Kibana - стек ELK) - поисковый движок на основе Lucene. Python-клиент elasticsearch позволяет выполнять сложные запросы.
pip install elasticsearch
from elasticsearch import Elasticsearch
es = Elasticsearch(["http://localhost:9200"])
doc = {"title": "Python для начинающих", "content": "Изучаем Python с нуля"}
es.index(index="books", id=1, body=doc)
result = es.search(index="books", body={"query": {"match": {"content": "Python"}}})
print(result["hits"]["hits"][0]["_source"]["title"])
Python для начинающих
Проблемы: большой расход памяти, сложность администрирования, отсутствие транзакций. Для задач поиска и аналитики это лучший вариант.
Расширенные примеры работы с базами данных в Python
PostgreSQL с асинхронным драйвером asyncpg (транзакции и пул соединений)
pip install asyncpg
import asyncio
import asyncpg
async def main():
pool = await asyncpg.create_pool(
user='user', password='secret',
database='mydb', host='localhost',
min_size=2, max_size=10
)
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute('''
CREATE TABLE IF NOT EXISTS logs (
id SERIAL PRIMARY KEY,
message TEXT,
ts TIMESTAMP DEFAULT NOW()
)
''')
await conn.execute(
"INSERT INTO logs (message) VALUES ($1)",
"Старт приложения"
)
# после завершения транзакции можно выполнить ещё запрос
rows = await conn.fetch("SELECT * FROM logs")
for row in rows:
print(row['message'], row['ts'])
await pool.close()
asyncio.run(main())
Старт приложения 2025-04-01 12:00:00.123456+00
Redis как брокер очередей (списки и pub/sub)
import redis
import threading
import time
r = redis.Redis()
# Производитель
def producer():
for i in range(3):
r.lpush('task_queue', f'task_{i}')
time.sleep(0.5)
r.publish('notifications', 'Все задачи отправлены')
# Потребитель
def consumer():
while True:
task = r.brpop('task_queue', timeout=1)
if task:
print(f"Обработана {task[1].decode()}")
else:
break
# Подписчик на канал
def subscriber():
pubsub = r.pubsub()
pubsub.subscribe('notifications')
for msg in pubsub.listen():
if msg['type'] == 'message':
print(f"Получено уведомление: {msg['data'].decode()}")
pubsub.unsubscribe()
break
threads = [
threading.Thread(target=producer),
threading.Thread(target=consumer),
threading.Thread(target=subscriber)
]
for t in threads:
t.start()
for t in threads:
t.join()
Обработана task_0 Обработана task_1 Обработана task_2 Получено уведомление: Все задачи отправлены
MongoDB: агрегация и индексы
from pymongo import MongoClient, ASCENDING, DESCENDING
client = MongoClient()
db = client['analytics']
orders = db['orders']
# Создание составного индекса
orders.create_index([("user_id", ASCENDING), ("total", DESCENDING)])
# Вставка тестовых данных
orders.insert_many([
{"user_id": 1, "total": 200, "items": ["book"], "date": "2025-01-01"},
{"user_id": 1, "total": 150, "items": ["pen"], "date": "2025-01-02"},
{"user_id": 2, "total": 500, "items": ["laptop"], "date": "2025-01-01"}
])
# Агрегационный конвейер
pipeline = [
{"$group": {"_id": "$user_id", "total_spent": {"$sum": "$total"}}},
{"$sort": {"total_spent": -1}},
{"$limit": 2}
]
result = orders.aggregate(pipeline)
for doc in result:
print(f"User {doc['_id']}: total {doc['total_spent']}")
User 2: total 500 User 1: total 350
Neo4j: сложный графовый запрос с путями
from neo4j import GraphDatabase
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
def create_graph(tx):
tx.run("CREATE (alice:Person {name: 'Alice'})")
tx.run("CREATE (bob:Person {name: 'Bob'})")
tx.run("MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'}) CREATE (a)-[:FRIEND]->(b)")
def find_paths(tx):
result = tx.run(
"MATCH path = (a:Person {name: 'Alice'})-[:FRIEND*1..2]-(b:Person) RETURN path"
)
for record in result:
print(record["path"])
with driver.session() as session:
session.execute_write(create_graph)
session.execute_read(find_paths)
driver.close()
<Neo4j path: (Alice)-[:FRIEND]->(Bob)>
Elasticsearch: многопольный поиск с фильтрацией
from elasticsearch import Elasticsearch
es = Elasticsearch(["http://localhost:9200"])
# Маппинг для русского анализатора (через icu_analyzer)
index_body = {
"settings": {
"analysis": {
"analyzer": {
"default": {
"type": "standard"
}
}
}
},
"mappings": {
"properties": {
"title": {"type": "text"},
"description": {"type": "text"},
"price": {"type": "integer"}
}
}
}
es.indices.create(index="products", ignore=400, body=index_body)
es.index(index="products", id=1, body={
"title": "Смартфон",
"description": "Мощный смартфон с хорошей камерой",
"price": 30000
})
es.index(index="products", id=2, body={
"title": "Ноутбук",
"description": "Легкий ноутбук для работы",
"price": 80000
})
es.indices.refresh(index="products")
# Поиск с фильтром по цене
query = {
"query": {
"bool": {
"must": [
{"multi_match": {
"query": "смартфон",
"fields": ["title", "description"]
}}
],
"filter": [
{"range": {"price": {"gte": 20000}}}
]
}
}
}
result = es.search(index="products", body=query)
for hit in result["hits"]["hits"]:
print(f"Найдено: {hit['_source']['title']}, цена {hit['_source']['price']}")
Найдено: Смартфон, цена 30000