Markdown Converter
Agent skill for markdown-converter
---
Sign in to like and favorite skills
Файл генерируется автоматически из файлов в specs/rules/*
Шаблон составлен с учетом опыта разработки сервисов в GEN AI.
Разработчики прошли через боль и слезы плохие решения, ошибки, оверинжиниринг и неудобный дизайн.
Чтоб не делать одни и те же ошибки, мы собрали оптимальные решения в этом проекте.
✨ Преимущества шаблона:
В директории
specs/rules/ находятся спецификации с подробными правилами и рекомендациями по разработке:
✨ Создание нового функционала:
🏛️ Работа с архитектурой:
⚙️ Инфраструктура и конфигурация:
Мы используем пакетный менеджер UV, как установить.
# Установка всех зависимостей uv sync --all-groups
Зависимости организованы по группам для более гибкой установки:
- зависимости для разработкиdev
uv sync --locked --group dev # или вместе с основными uv sync --locked --all-extras
- обработка аудио (голос)voice
uv sync --locked --group voice
- Telegram бот + uvloop + Flasktelegram
uv sync --locked --group telegram
- FastAPI + uvicornrestapi
uv sync --locked --group restapi
- Alembic + PostgreSQL + SQLAlchemydatabase
uv sync --locked --group database
Либо через pip
pip install -r requirements.txt -r requirements.dev.txt
Нужно создать файл с переменными окружения
.env в директории,
пример файла с обязательными переменнами в env.example.
Ознакомьтесь с переменными окружения в settings.py.
Linux
cp .env.example .env
Windows
Copy-Item .env.example .env
Crossplatform
python3 -c "import shutil; shutil.copy('.env.example', '.env2')"
Запускайте приложение с переменной окружения
ENV=LOCAL, оно выключает проверку аутентификации и другие вещи, которые
не нужны в локальной раработке.
⚡ Fastapi
uv run uvicorn project.infrastructure.apps.api:app
🤖 Telegram Bot
uv run python -m project.infrastructure.apps.bot
Для продакшена написать запуск в модуле main.py и запустить таким образом:
python -m project.infrastructure.apps.main
TODO: как запустить через докер
Для запуска тестов используйте:
pytest --cov=project tests/
Уровень логирование в тестах настраивается в pytest.ini
Следуйте соглашению именования комитов.
Форматирование кода и линтеры выполняется автоматически при коммите через pre-commit хуки. Чтобы активировать pre-commit, выполните:
pre-commit install
Настройки линтеров находятся в pyproject.toml. Настройки линтеров для тестов находятся в ruff-tests.toml.
При коммите автоматически выполняются:
Модули объединяются по компонентам, так рекомендуются в чистой архитектуре, в DDD, это наиболее оптимально. Благодаря этому взглянув на структуру проекта, можно сразу понять, про что он. И не надо прыгать по директориям, искать модули относящиеся к одному компоненту.
Границы модулей:
project/infrastructure/apps/bot.py - Telegram botproject/infrastructure/apps/flask.py - Запускатеся на проде для параллельного запуска с Telegram bot для
создания эндпоинтов health-check и prometheusproject/infrastructure/apps/api.py - FastAPI appproject/infrastructure/apps/main.py - Запуск приложения на проде, там их может быть несколько запущено параллельно,
поэтому отдельный модульproject/infrastructure/adapters/* - интеграции к внешним системам (адаптеры, клиенты)project/infrastructure/utils - универсальный переиспользуемые код, не связанный с бизнес-логикой, относящиеся кproject/components/{component}/cli.py - обработчики CLI интерфейсаproject/components/{component}/endpoints.py - эндпоинты APIproject/components/{component}/handlers.py - обработчики ботаproject/components/{component}/models.py - модели данных ORMproject/components/{component}/repositories.py - Любое обращение к даннымproject/components/{component}/enums.py - Наборы значенийproject/components/{component}/usecases.py - точка входа в бизнес-логику (сценарии использования приложения)project/components/{component}/service.py - детали реализации бизнес-логики (когда бизнес-логика не влезает вproject/components/{component}/exceptions - исключения бизнес-логики относящиеся к компонентуproject/components/{component}/schemas.py - схемы данных и/или валидация pydanticproject/components/{component}/ai/{agent_name}/exceptions.pyproject/components/{component}/ai/{agent_name}/schemas.py - схемы данных агента (pydantic модели)project/components/{component}/ai/{agent_name}/prompts.py - llm промпты, еще второй вариант ниже, где каждый промпт
в отдельнос файле, в общей директории promptsproject/components/{component}/ai/{agent_name}/prompts/*.py - llm промптыproject/components/{component}/ai/{agent_name}/tools/*.py - инструменты ai агентаproject/components/{component}/ai/{agent_name}/agent.py - логика ai агентаproject/libs/* - универсальный переиспользуемые код, не связанный с бизнес-логикой и инфраструктуройproject/exceptions.py - базовые исключенияproject/logger.py - настройки логированияproject/settings.py - Переменные окруженияproject/container.py - Контейнер для внедрения зависимостей
фреймворками. Почему не в project/libs? Потому что там запрещен импорт из project.infrastructure.tests/test_* - Автотестыtests/conftest.py - Фикстурыtests/factories.py - Фабрики данныхalembic/versions/* - Миграции бдscripts/* - Скриптыspecs/rules/* - Документация и спецификации по разработке используя этот шаблон и заложенный в нем стильspecs/features/* - Спецификация по разработке фичи при вайбкодингеДля того, чтобы работать с внешними моделями, можно сгенерировать промпт с содержимом модулей проекта, через команду:
uv run python ./scripts/project_prompt.py # по умолчанию конфиг ./project-prompt.toml uv run python ./scripts/project_prompt.py -c my-custom-config.toml
Управлять тем, какие файлы войдут в промпт, можно через конфиг project-prompt.toml
В директории
specs/rules/ находятся документация по использованию шаблона:
Адаптеры — это компоненты инфраструктурного слоя, которые инкапсулируют взаимодействие с внешними системами и сервисами.
Расположение:
project/infrastructure/adapters/
Основные принципы:
Адаптер для работы с сервисом аутентификации пользователей. Запросы отслеживаются через prometheus.
Основной функционал:
check_telegram_user(user_telegram_id: int) -> bool — проверка существования пользователя Telegramget_users_data() -> dict — получение данных всех пользователейАдаптер для интеграции с Keycloak (система управления идентификацией и доступом).
⭐ Качественное решение, проверенное на практике
Адаптер для работы с голосовыми данными: преобразование речи в текст (STT) и текста в речь (TTS).
Реализация:
VoiceAdapter — использует OpenAI API
Основной функционал:
voice_to_text(voice: bytes | bytearray) -> str — преобразование голоса в текст
ru)text_to_voice(text: str, instructions: str, voice: str = "alloy") -> io.BytesIO — синтез речи из текстаОсобенности:
@action_tracking_decoratordownload_as_bytearray())Пример использования:
# Для Telegram ogg_data = await voice_file.download_as_bytearray() text = await voice_adapter.voice_to_text(ogg_data)
Основной функционал:
llm_chat_client() -> ChatOpenAI — создание клиента LangChain для чатаllm_aclient() -> AsyncClient — создание асинхронного OpenAI клиентаОсобенности:
Адаптеры для работы с базой данных (синхронный и асинхронный варианты).
Основной функционал:
TODO: примеры использования сессий и транзакций
Адаптеры для работы с системой кеширования (синхронный и асинхронный варианты).
Основной функционал:
Видео про эту проблему https://www.youtube.com/watch?v=3Z_3yCgVKkM
Плохо: Создание зависимостей внутри классов или функций.
class AskUseCase: def __init__(self): # Жесткая связка с реализацией, внутри этого класса, могут быть еще много зависимостей. self.chat = ChatService(...) def ask(self, user_id: int, question: str) -> str: # Тестирование потребует патча. repo = DatabaseRepository() # прямое создание зависимости ...
При тестировании придется патчить все вложенные зависимости ChatService и DatabaseRepository:
from unittes import mock def test_ask(): with mock.patch('path.to.ChatService'), \ mock.patch('path.to.ChatService.create_answer'), \ mock.patch('path.to.DatabaseRepository'): use_case = AskUseCase() ...
Такие тесты делают рефакторинг болезненным. При изменении путей, имен объектов, потребуется вносить изменения во все тесты с патчами. Лучше такие зависимости проносить через аргументы методов, ниже будет пример.
Хорошо: Внедрение зависимостей через контейнер зависимостей (DI Container):
from typing import TYPE_CHECKING if TYPE_CHECKING: from ... import Repository, ChatService class AskUseCase: def __init__( self, repo: "Repository", chat: "ChatService", ): self.repo = repo self.chat = chat def ask(self, user_id: int, question: str) -> str: ... class Repository: # Реализация def get(self): pass class ChatService: # Реализация def create_answer(self, user_id, question): return "Answer" class DIContainer: def __init__(self, repo=None, chat_service=None): self.repo = repo or Repository() self._chat = chat_service or ChatService() self.ask_use_case = AskUseCase(self.repo, self._chat) container = DIContainer() assert container.ask_use_case.ask(user_id=1, question="My question") == "Answer" class OtherChatService: # Другая реализация def create_answer(self, user_id, question): return "Other Answer" other_container = DIContainer(chat_service=OtherChatService) assert other_container.ask_use_case.ask(user_id=1, question="My question") == "Other Answer"
Такой класс можно тестировать с разными реализациями зависимостей без патчей:
def test_ask_use_case(): user_id = 1 question_text = "Test question" expected_answer = "Test answer" class TestRepo: def get(self): return user_id class TestChatService: def create_answer(self, user_id, question): return expected_answer use_case = AskUseCase( repository=TestRepo(), chat=TestChatService(), ) result = use_case.ask(user_id, question_text) assert result == expected_answer
Преимущества подхода:
Пример теста эндпоинта API test_endpoints.py.
В проекте есть фабрики объектов ORM моделей, они позволяют подготавливать окружение для тестирования. Находятся в factories.py. Пример использования в test_use_case.py.
В тестах используется одна сессия SqlAlchemy с БД без завершения транзакции, поэтому объекты на самом деле не создаются фабрикой в БД, но благодаря внутреннему хранилищу ORM, программа видит эти объекты.
Где обычно создают данные для теста? В фикстурах. Потом их использует и в других тестах. Чтоб создать немного другие данные, создает еще фикстуру и так проект ими зарастает.
Фабрики позволяют создавать объекты с разными параметрами, что делает их более гибкими. Одна фабрика для создания объектов с разными настройками. Полезно, когда вам нужно создавать объекты с разными состояниями в тестах.
Фабрики позволяют изолировать данные тестов друг от друга, так как каждый тест может создавать свои собственные объекты.
Фабрики упрощают поддержку кода, так как логика создания объектов сосредоточена в одном месте. Если вам нужно изменить способ создания объекта, вы делаете это только в фабрике, а не в каждом тесте.
Фикстуры в
pytest могут быть "магическими" — они автоматически подставляются в тесты,
что может затруднить понимание того, что именно происходит.
Фабрики, явно вызываются в коде, улучшая понимание теста.
Фабрики лучше подходят для сложных сценариев, где нужно создавать объекты с множеством зависимостей или выполнять дополнительные действия при создании. Фикстуры могут стать громоздкими в таких случаях.
(scope="session", autouse=True)setup
(scope="session")init_database
session
Пример использования:
def test_create_user(session): # После теста данные автоматически откатятся ...
asession
sessionПример использования:
async def test_create_user_async(asession): # После теста данные автоматически откатятся ...
(scope="session")init_redis
redis
Пример использования:
def test_cache(redis): # После теста Redis очищается ...
(scope="session")async_init_redis
async_redis
Пример использования:
async def test_cache_async(async_redis): # После теста Redis очищается ...
api_client
Пример использования:
def test_endpoint(api_client): response = api_client.get("/api/users") assert response.status_code == 200 assert response.json() == []
httpx_responses
Пример использования:
def test_external_api(httpx_responses): httpx_responses.add( "GET", "https://api.example.com/data", json={"result": "success"}, status=200 ) # Теперь запросы на этот URL вернут mock-ответ
aiohttp_responses
Пример использования:
async def test_external_api_async(aiohttp_responses): aiohttp_responses.add( "https://api.example.com/data", method="GET", payload={"result": "success"}, status=200 ) # Асинхронные запросы на этот URL вернут mock-ответ
keycloak_client
mock_keycloak
Пример использования:
def test_keycloak_auth(keycloak_client, mock_keycloak): # mock_keycloak автоматически мокирует запросы token = keycloak_client.get_token() assert token == "test_token"
keycloak_aclient
mock_async_keycloak
(scope="session")project_dir
Пример использования:
def test_config_file(project_dir): config_path = project_dir / "config.yaml" assert config_path.exists()
Для создания адаптеров, работающих с внешними HTTP API, используйте классы из base_client.py.
Как замокать http запросы в тестах можно найти в спеке auto-tests.md.
Основные компоненты:
AsyncApi — базовый класс для асинхронных HTTP клиентов
SyncApi — базовый класс для синхронных HTTP клиентов
IClient — Protocol, определяющий интерфейс клиента
Создайте новый файл в
project/infrastructure/adapters/, например my_service.py.
from project.exceptions import ExternalApiError, ServerError, ClientError class MyServiceApiError(ExternalApiError): pass class MyServiceServerError(ServerError): pass class MyServiceClientError(ClientError): pass
Вариант А: Асинхронный адаптер
from project.infrastructure.utils.base_client import AsyncApi, IClient class MyServiceClient(IClient): class Api(AsyncApi): ApiError = MyServiceApiError ServerError = MyServiceServerError ClientError = MyServiceClientError # Опционально: кастомная сессия для мониторинга # ClientSession = MyCustomHttpClient def __init__(self, api_key: str): self.api_root = "https://api.myservice.com" self.api = self.Api( self.api_root, name_for_monitoring="my_service_api", headers={"Authorization": f"Bearer {api_key}"}, request_settings={"timeout": 30}, ) async def get_items(self, resource_id: str) -> dict: """Получить данные ресурса.""" return await self.api.call_endpoint( f"path/to/resource/{resource_id}", method="GET", request_settings={"timeout": 3} # example timeout for resource ) async def create_items(self, data: dict) -> dict: """Создать новый ресурс.""" return await self.api.call_endpoint( "path/to/resource/", method="POST", json=data, )
Вариант Б: Синхронный адаптер
from project.infrastructure.utils.base_client import SyncApi class MyServiceSyncClient: class Api(SyncApi): ApiError = MyServiceApiError ServerError = MyServiceServerError ClientError = MyServiceClientError def __init__(self, api_key: str): self.api_root = "https://api.myservice.com" self.api = self.Api( self.api_root, name_for_monitoring="my_service_api", headers={"Authorization": f"Bearer {api_key}"}, ) def get_items(self, resource_id: str) -> dict: return self.api.call_endpoint( f"resources/{resource_id}", method="GET", )
from functools import cache from project.settings import Settings @cache def my_service_client(): return MyServiceClient( api_key=Settings().MY_SERVICE_API_KEY.get_secret_value() )
from project.infrastructure.adapters.my_service import my_service_client client = my_service_client() data = await client.get_items("123") # С переиспользованием сессии (для множественных запросов) async with client.api.Session(): data1 = await client.get_items("123") data2 = await client.get_items("456") # Сессия будет переиспользована
Если нужна специфическая обработка ответов, переопределите методы в Api классе:
class LimitError(MyServiceClientError): pass class MyServiceClient(IClient): class Api(AsyncApi): async def response_to_native(self, response): # Кастомная десериализация return await super().response_to_native(response) async def error_handling(self, response, response_data): # Кастомная обработка ошибок if response.status == 429: raise LimitError("LimitError: Too many requests.") return await super().error_handling(response, response_data)
Для критичных операций добавьте retry:
from project.libs.retry import retry_on_exception class MyServiceClient(IClient): # ... (определение Api класса) @retry_on_exception( (LimitError,), max_attempts=3, backoff=2, ) async def create_item(self, data: dict): ...
Mapped и mapped_column)int, str) для идентификаторов и ключевых полей сущностейimport datetime as dt from sqlalchemy import func, MetaData from sqlalchemy.orm import ( Mapped, mapped_column, declarative_base, ) public_schema = MetaData() Base = declarative_base(metadata=public_schema) class TimeMixin: created_at: Mapped[dt.datetime] = mapped_column(nullable=False, server_default=func.now()) updated_at: Mapped[dt.datetime] = mapped_column(nullable=False, server_default=func.now(), onupdate=func.now())
Для устранения двусмысленности и повышения читаемости кода, вместо примитивных типов (
int, str) необходимо создавать специальные типы для каждой сущности.
typing.NewType, описание поля задается через typing.Annotated.project/datatypes.py.Пример (
project/datatypes.py):
import typing as t UserIdT = t.NewType("UserIdT", t.Annotated[int, "User ID"]) OrderIdT = t.NewType("OrderIdT", t.Annotated[int, "Order ID"]) ProductNameT = t.NewType("ProductNameT", t.Annotated[str, "Product Name"])
В PostgreSQL следует использовать нативные ENUM типы.
str и enum.Enum.sqlalchemy.Enum(..., name="...").name обязателен для создания типа в БД.class UserRole(str, enum.Enum): ADMIN = "admin" USER = "user" # В модели: role: Mapped[UserRole] = mapped_column(Enum(UserRole, name="user_role_enum"))
Mapped[...]) используйте доменный тип (например, UserIdT), а не int.mapped_column(...)) всегда указывайте BigInteger (аналог BIGSERIAL).# Правильно: id: Mapped[UserIdT] = mapped_column(BigInteger, primary_key=True)
Если ключ составной, указывайте
primary_key=True для каждого поля.
Не создавайте промежуточные таблицы. Используйте нативный тип
ARRAY для хранения списка идентификаторов.
mapped_column(ARRAY(BigInteger)).Mapped[list[DomainIdT]].# Пример: Статья хранит список ID тегов tag_ids: Mapped[list[TagIdT]] = mapped_column(ARRAY(BigInteger), default=list)
index=True внутри mapped_column.Index("name", "col1", "col2") в __table_args__.Генерируй код, строго следуя этому шаблону. Обрати внимание на импорт типов из
project.datatypes.
import enum import typing as t from datetime import datetime from sqlalchemy import ( BigInteger, String, ForeignKey, func, Index, Enum ) from sqlalchemy.dialects.postgresql import ARRAY from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column # Предполагается, что этот код находится в project/datatypes.py # Но для генерации моделей импортируй их: # from project.datatypes import ProductIdT, ProductNameT, OrderIdT, OrderStatusT # --- MOCK DATATYPES (для примера) --- ProductIdT = t.NewType("ProductIdT", t.Annotated[int, "Product ID"]) ProductNameT = t.NewType("ProductNameT", t.Annotated[str, "Product Name"]) OrderIdT = t.NewType("OrderIdT", t.Annotated[int, "Order ID"]) # ------------------------------------ # 1. Base class Base(DeclarativeBase): pass # 2. Enums class OrderStatus(str, enum.Enum): CREATED = "created" PROCESSING = "processing" COMPLETED = "completed" # 3. Models class Product(Base): __tablename__ = "products" # Использование доменного типа + BigInteger id: Mapped[ProductIdT] = mapped_column(BigInteger, primary_key=True) # Доменный тип для строки name: Mapped[ProductNameT] = mapped_column(String(150), index=True) price: Mapped[int] = mapped_column(BigInteger) # Для простых значений можно int # Many-to-Many via Array of IDs related_product_ids: Mapped[list[ProductIdT]] = mapped_column(ARRAY(BigInteger), default=list) class Order(Base): __tablename__ = "orders" id: Mapped[OrderIdT] = mapped_column(BigInteger, primary_key=True) # Native Postgres Enum status: Mapped[OrderStatus] = mapped_column( Enum(OrderStatus, name="order_status_enum"), default=OrderStatus.CREATED, index=True ) created_at: Mapped[datetime] = mapped_column(server_default=func.now()) class OrderLog(Base): __tablename__ = "order_logs" # Composite Primary Key Example with Domain Types order_id: Mapped[OrderIdT] = mapped_column( ForeignKey("orders.id", ondelete="CASCADE"), primary_key=True ) log_index: Mapped[int] = mapped_column(BigInteger, primary_key=True) message: Mapped[str] = mapped_column(String)
Проект использует Alembic для управления миграциями базы данных.
# Автоматически создать миграцию на основе изменений в моделях alembic revision --autogenerate -m "Описание изменений" # Или создать пустую миграцию alembic revision -m "Описание изменений"
# Применить все ожидающие миграции alembic upgrade head # Откатить последнюю миграцию alembic downgrade -1
При переключение веток не забудьте откатывать миграции, если в ваших ветках они ушли дальше, чем на ветке, в которую вы переключились.
CacheRepository — это базовый класс для работы с Redis в качестве кеша. Он предоставляет унифицированный интерфейс для кеширования данных с поддержкой TTL.
Расположение:
project/components/{component}/repositories.py
Тип:
t.ClassVar[str]
Шаблон для формирования ключей кеша. Должен содержать плейсхолдер
{} для подстановки идентификатора:
# Правильно key_template = "user:{}" # Неправильно key_template = "user" # Нет плейсхолдера
Тип:
t.ClassVar[timedelta]
Время жизни записи в кеше:
# Примеры TTL ttl = timedelta(days=7) # 7 дней ttl = timedelta(seconds=60) # 60 секунд
Тип:
t.ClassVar[redis_client]
Клиент для работы с Redis. Уже определен в базовом классе:
from project.infrastructure.adapters.acache import redis_client class CacheRepository: client = redis_client
Используйте метод
cls.client() для получения экземпляра клиента внутри методов.
Для данных кеша рекомендуется использовать Pydantic схемы:
from pydantic import BaseModel class UserCacheSchema(BaseModel): id: int name: str email: str created_at: datetime class Config: from_attributes = True
Имя класса должно заканчиваться на
CacheRepository и наследоваться от CacheRepository:
class UserCacheRepository(CacheRepository): ... class ProductCacheRepository(CacheRepository): ...
Используйте доменные типы из
project/datatypes.py для аннотации ключей:
from project.datatypes import UserIdT, ProductIdT async def save(cls, user_id: UserIdT, data: "BaseModel"): ... async def get(cls, product_id: ProductIdT): ...
orjson для быстрой сериализацииdata.model_dump(exclude_unset=True) для получения словаряorjson.loads()Полный актуальный пример в repositories.py:
class UserCacheRepository(CacheRepository): key_template = "user:{}" ttl = timedelta(days=7) @classmethod async def save(cls, user_id: UserIdT, data: "BaseModel"): async with redis_atransaction() as tr: content = orjson.dumps(data.model_dump(exclude_unset=True)) tr.set(cls.key_template.format(user_id), content, ex=cls.ttl) @classmethod async def get(cls, user_id: UserIdT): content = await cls.client().get(cls.key_template.format(user_id)) if content: data = orjson.loads(content) return UserCacheSchema(**data) return content @classmethod async def delete(cls, user_id: UserIdT): async with redis_atransaction() as tr: tr.delete(cls.key_template.format(user_id))
При необходимости можно добавить дополнительные методы для работы с кешем:
class UserCacheRepository(CacheRepository): ... @classmethod async def exists(cls, user_id: UserIdT) -> bool: """Проверка существования ключа в кеше.""" return await cls.client().exists(cls.key_template.format(user_id)) > 0
Расположение:
project/infrastructure/adapters/adatabase.py
Для чтения данных. Переиспользует существующую сессию или создает новую.
# Простое чтение async with asession() as session: result = await session.execute(select(User).where(User.id == user_id)) user = resulscalar_one_or_none() # Вложенные вызовы используют ту же сессию async with asession() as session1: async with asession() as session2: # session1 === session2
⚠️ Не создает транзакцию. Для изменений используйте
atransaction().
Для изменения данных. Создает транзакцию с автоматическим commit/rollback.
# Простая транзакция async with atransaction() as session: user = User(name="John") session.add(user) # Автоматический commit # Вложенные транзакции создают SavePoint async with atransaction() as session: user = User(name="John") session.add(user) try: async with atransaction() as s: # Создается SavePoint post = Post(title="Test", user=user) s.add(post) raise ValueError() except ValueError: pass # SavePoint откатился, но user сохранится
Поведение при вложенности:
begin_nested() (SavePoint)begin()Для переиспользуемых функций. Возвращает активную транзакцию или создает новую.
async def reusable_operation(): async with current_atransaction() as session: # Работает и внутри, и вне существующей транзакции user = User(name="John") session.add(user) # Вариант 1: создаст транзакцию await reusable_operation() # Вариант 2: использует существующую async with atransaction(): await reusable_operation()
Отличие от atransaction():
atransaction() — всегда создает новый уровень (SavePoint)current_atransaction() — переиспользует текущую транзакцию без SavePointРасположение:
project/components/base/repositories.py
Дает доступ к открытие сессии и транзакции. Область применения, внутри методов классов, во вне лучше использовать обертки через project.container.AllRepositories
Вне классов, их использовать не надо!
class ORMRepository(Generic[T]): @classmethod @contextmanager def get_session(cls): with Session() as session: yield session @classmethod @contextmanager def get_transaction(cls): with transaction() as session: yield session @classmethod @contextmanager def get_current_transaction(cls): with current_transaction() as session: yield session class ORMModelRepository(ORMRepository[T]): # Наследует методы от ORMRepository. ...
Расположение:
project/container.py
Класс
AllRepositories предоставляет централизованный доступ к транзакциям.
Его надо использовать, когда на уровне бизнес-логики
нужно обернуть вызов методов из нескольких репозиториев.
class AllRepositories: def __init__(self): self.user = UserRepository() ... @classmethod @contextmanager def transaction(cls) -> Generator["ORMSession", Any, None]: with transaction() as session: yield session @classmethod @contextmanager def current_transaction(cls) -> Generator["ORMSession", Any, None]: with current_transaction() as session: yield session
Использование:
from project.container import Repositories # Через экземпляр репозитория user = Repositories().user.get(user_id) # Через контейнер транзакций with Repositories.transaction() as session: Repositories.user.save(user_data) Repositories.employee.save(employee_data) # Переиспользование текущей транзакции with Repositories.current_transaction() as session: ...
В
tests/conftespy определены фикстуры с автоматическим rollback:
@pytesfixture def session(init_database): with database.Session() as session: with session.begin() as t: with session.begin_nested(): yield session # Данные откатываются после теста rollback() database.engine_factory.cache_clear() database.scoped_session_factory.cache_clear()
@pytest_asyncio.fixture async def asession(init_database): async with adatabase.asession() as asession: async with asession.begin() as t: async with asession.begin_nested(): yield asession # Данные откатываются после теста await rollback() adatabase.aengine_factory.cache_clear() adatabase.async_sessionmaker_factory.cache_clear()
Как это работает:
adatabase.asession()begin()begin_nested()rollback()Это обеспечивает чистую БД для каждого теста без необходимости пересоздания схемы.
Свои собственные исключения наследуйте от
project.exceptions.AppError
uvicorn запускается с циклом uvloop
uvicorn project.presentation.api:app -host 0.0.0.0 --loop uvloop
Эндпоинты должны располагаться в
project.components.{name}.endpoints.
Лучше возвращать в виде словаря. Тогда при необходимости добавление новых данных в ответе ручки, нужно будет добавить только новое поле. Используйте готовую схему для этого
project.components.base.schemas.ApiResponse.
Если указать в аргументе response_model, в swagger появится документация по выводу.
Но в случае тяжелых данных это может быть затратно по времени,
потому что данные буду валидироваться через pydantic, это замедляет 2.5 раза по сравнению с обычным dict/dataclass.
from project.components.base.schemas import ApiResponseSchema @app.get("/my", response_model=ApiResponseSchema[list[int]]) async def my_resource(): return {"data": [1, 2]}
Проверка уже зашита в эндпоинты.
def auth_by_token(auth_token: str = Header(alias="Api-Token")): if auth_token != Settings().API_TOKEN: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Token") return auth_token app = FastAPI(..., dependencies=[Depends(auth_by_token)])
ORJSONResponse использует пакет orjson написанный на RUST, очень быстрый.
Можно указывать в самой ручке через аргумент response_class.
from fastapi.responses import ORJSONResponse @app.get("/health", response_class=ORJSONResponse) async def health_check(): return {}
/user/v1/listdeprecated.@app.get("/user/v1/list", deprecated=True)
Поток обработки ошибок: Бизнес-логика → raise Exception → FastAPI Exception Handler → HTTP Response
В бизнес-логике поднимать обычные исключения (наследники
AppError):
# В use cases, services, repositories def process_user(user_id: int): user = user_repo.get(user_id) if not user: raise UserNotFoundError(f"User {user_id} not found") return user
В FastAPI эндпоинтах использовать обработчики исключений FastAPI для преобразования бизнес-исключений в HTTP ответы:
app = FastAPI(...) @app.exception_handler(Exception) async def custom_exception_handler(request, exc: Exception): message = f"Unexpected Error: {exc}" logger.exception(message) return ORJSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"detail": "Internal Server Error"}, ) @app.exception_handler(HTTPException) async def custom_http_exception_handler(request, exc: HTTPException): message = f"{request.method} {request.url} {exc.status_code}" if exc.detail: message = f"{message} ({exc.detail})" if Settings().is_local(): message = f"{message} headers={request.headers}" logger.error(message) return await http_exception_handler(request, exc) @app.exception_handler(RequestValidationError) async def custom_validation_exception_handler(request: Request, exc: RequestValidationError): message = f"{request.method} {request.url} {status.HTTP_422_UNPROCESSABLE_ENTITY} ({exc.errors()})" logger.error(message) return await request_validation_exception_handler(request, exc) @app.exception_handler(exceptions.NotFoundError) async def not_found_error_handler(request: Request, exc: exceptions.NotFoundError): message = f"{request.method} {request.url} {status.HTTP_404_NOT_FOUND} ({exc})" logger.error(message) return ORJSONResponse( status_code=status.HTTP_404_NOT_FOUND, content={"detail": str(exc)}, ) @app.exception_handler(exceptions.AuthError) async def auth_error_handler(request: Request, exc: exceptions.NotFoundError): message = f"{request.method} {request.url} {status.HTTP_401_UNAUTHORIZED} ({exc})" logger.error(message) return ORJSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"detail": str(exc)}, ) @app.exception_handler(exceptions.ExternalApiError) async def integration_error_handler(request: Request, exc: exceptions.NotFoundError): message = f"{request.method} {request.url} {status.HTTP_500_INTERNAL_SERVER_ERROR} ({exc})" logger.error(message) return ORJSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"detail": str(exc)}, )
❌ Не использовать HTTPException в бизнес-логике:
# ПЛОХО: бизнес-логика зависит от FastAPI def process_user(user_id: int): user = user_repo.get(user_id) if not user: raise HTTPException(status_code=404, detail="User not found") # ❌ return user
❌ Не подавлять исключения в эндпоинтах:
# ПЛОХО: скрывает ошибки @app.get("/users/{user_id}") async def get_user(user_id: int): try: user = await user_service.get_user(user_id) return user except Exception as e: return {"error": "Something went wrong"} # ❌
TODO: переработать
Основная цель слоистых архитектур - отделить бизнес-логику от фреймворков, инфраструктуры и интерфейсов ввода/вывода.
Инфраструктура - это подключение к базе данных, ORM, обращение к внешнему сервису, шина сообщений, отправка уведомлений, сбор метрик и т.д.
Главное правило - бизнес-логика не должна зависеть от деталей реализации и быть подальше от побочных эффектов. Для этого нужно соблюдать правило инверсии зависимостей, нижележащие слои не могут зависеть от вышестоящих.
Конкретно в Python это означает, что в слой бизнес-логики не должно быть импортов из слоя инфраструктуры и ввода/вывода. Чтобы обозначить ожидаемый интерфейс объектов из слоя инфраструктуры в аннотациях типов, не импортируя их, можно создавать заглушки объектов наследуясь от класса
Protocol.
Пример в interfaces.py
Из-за того, что бизнес-логика не зависит от деталей реализации, упрощается тестирование. Пример теста бизнес-логики посмотрите тут test_ask.py
Пример, как надо писать модули смотрите на примере домена chat
Считаю, что абстракции UseCase, Service, Repository могут быть достаточными для скрытия сложности на ранней стадии проекта. Вводить новые сущности можно по мере увеличения сложности проекта. Поэтому ограничимся описанием этих паттернов.
UseCase - точка входа в бизнес-сценарии. Пример use_cases.py. Слой, через который интерфейсы ввода/вывода запускают бизнес-логику. Здесь содержится валидация данных, авторизация, проверка квоты, лимитов и т.д. Поэтому другие домены и поддомены бизнес-логики не должны использовать
UseCase.
Реализация бизнес-процессов должна находится в Service-ах.
В UseCase не должно быть того, что потребуется в Service-ах в других доменах и поддоменах.
Имя сценария должно отражать бизнес-функцию.
Ожидается, что UseCase должен быть очень простым (мало строк) и понятным для чтения.
Сам код, нейминг классов и методов должен описывать, что происходит в терминах бизнеса -
провалидировать данные, проверить авторизацию, квоту, запустить бизнес-процесс.
Валидацию данных лучше использовать на этом слое,
но детали реализации (функции валидаций) выносить в модуль validation.py
UseCase - это объект без состояния.
Service - скрывает детали реализации бизнес-процесса. Пример service.py
Может объединять в себе работу одного или нескольких доменов.
Объект без состояния.
Repository - нужны, чтобы отделить доступ к данным от ORM. Пример [repositories.py]
(project/components/chat/repositories.py)
Объект без состояния.
# Когда мы смотрим на бизнес-логику, лучше увидеть такое UserRepo.get_users(user_ids=[1, 2, 3]) # чем такое query = select(User).where(User.id.in_([1, 2, 3])) async with Session() as session: result = session.execute(query) data = await result.scalars().all()
Interface - это объект, показывающий ожидаемый интерфейс, используется только в аннотациях типов. Пример interfaces.py. Избавляет от необходимости импорта реального объекта, чтобы не нарушать правило инверсии зависимостей.
Adapter - реализация интерфейса. Пример llm.py. Чтобы не зависеть от конкретных фреймворков и других зависимостей, мы взаимодействуем с ними через фасад. Благодаря этому можно заменить технологию, находящуюся за фасадом.
DIContainer - контейнер, в котором разрешаются зависимости, создается один раз. Пример container.py. Знает кому, какие зависимости нужны и откуда их взять. Избавляет нас от необходимости думать, как создать объект. Если нужно в одной транзакции изменить несколько моделей, т.е. вне границ репозитория домена, лучше создайте еще один репозиторий для этого.
Есть линтер, который проверяет направление зависимостей, настроенных в конфиге layers.toml. Запускается через
layers-linter project
глобальные объекты это топ антипаттерн, далующий программу крайне плохой (TODO: написать подробнее почему).
Вместо них создавайте объекты с ленивой инициализацией (в момент реального использования объекта)
Самый простой и частый вариант это создать фнукцию с кешом. Там где используете клиент, вы вызываете эту функцию и получаете объект, это позволяет отложить инициализацию до момента реально использования.
from functools import cache from langchain_openai import ChatOpenAI from project.settings import Settings @cache def client(): return ChatOpenAI( api_key=Settings().LLM_API_KEY.get_secret_value(), ) async def llm_logic(): result = await client().ainvoke()
Если в тестах нужно инициализировать объект с другими параметрами, тогда можете использовать
LazyInit из structures.py, он предоставляют механизм ленивой инициализации и
контекстный менеджер для инициализации объекта с другими параметрами.
Пример такого использования для класса Settings, который под капотом получает переменные окружения, а в тестах мы хотим заменять переменные окружения.
# ✅ ПРАВИЛЬНО - объявление класса class MyServiceClass: def __init__(self, param): self.param = param def do_something(self): return self.param MyService = LazyInit(MyServiceClass, kwargs_func=lambda: {"param": Settings().PARAM}) # ❌ НЕПРАВИЛЬНО - инициализация в глобальной области result = MyService().do_something() # ❌ НЕПРАВИЛЬНО - сохранение экземпляра в глобальной области my_service = MyService() # ❌ ПЛОХО my_service.do_something() # ✅ ПРАВИЛЬНО - сохранение в атрибутах экземпляра допускается, потому что экземпляр создается лениво. class MyClass: def __init__(self): self.adapter = GitLabAdapter() # ✅ ПРАВИЛЬНО - Вызов внутри функции будет инициализироваться лениво def myfunc(): x = MyService().do_something()
Проект использует layers-linter для автоматической проверки соблюдения архитектурных границ между слоями.
Конфигурация находится в файле layers.toml, где определены:
Запуск проверки:
layers-linter project
Линтер анализирует импорты в коде и выявляет нарушения архитектурных границ:
Это помогает поддерживать чистоту архитектуры и предотвращает появление нежелательных зависимостей между слоями.
Есть готовые настроенные дашборды для метрик собираемых в этом проекте через Prometheus. Можно узнать у коллег разработчиков и девопсов. Также на эти метрики в дашбордах можно настроить алерты в телеграм
Для критичных секций кода. Для Telegram обработчиков и callback если это telegram бот.
Для этого есть контекстный менеджер и декоратор.
Чтобы эти примитивы смогли отследить возниклования exception, внутри этих контекстного менеджера и декоратора не должны подавляться exception.
from llm_common.prometheus import action_tracking, action_tracking_decorator # Использование контекст-менеджера with action_tracking("data_processing") as tracker: # Ваш код processed_data = process_data() # Опционально: трекинг размера данных tracker.size(len(processed_data)) # Опционально: зафиксировать, как ошибку tracker.to_fail() # Использование декоратора @action_tracking_decorator("myfeature_llm_call") async def make_llm_request(): # Ваш код return result
Хорошей практикой является отслеживания всех хендлеров.
Применяйется на хендлеры и обработки callback кнопок декоратор или контекстный менеджеры action_tracking и action_tracking_decorator В качестве имени указывайте суффикс "_handler" action_tracking(name="menu_handler"), это позволит офильтровать на графике только метрики для хэндлеров
Для обработчиков Telegram, суффикс "_handler" Для регулярных задач, суффикс "_task" Для вызовов llm, суффикс "_llm_call" Для запуска агента llm, суффикс "_agent"
Разделитель для имен: "_"
Контекст-менеджер для отслеживания действий:
Декоратор для функций и корутин, поддерживает все возможности
action_tracking.
Все метрики имеют префикс
genapp_:
genapp_http_requests_total - Общее количество HTTP запросовgenapp_http_request_duration_sec - Гистограмма времени выполненияgenapp_http_request_size_bytes - Размер запросов/ответовgenapp_action_count_total - Количество выполненных действийgenapp_action_duration_sec - Время выполнения действийgenapp_action_size_total - Размер обработанных данныхМетрики содержат labels:
Про утилиты
TODO: описать другие утилиты
Асинхронные приожения запускаются через uvloop. Он гораздо быстрее.
про них написано в linters.md
Используется для создания ботов
Используется для создания API
TODO: добавить про другие библиотеки
TODO: рассказать про SecretStr
Объект
Settings в файле project/settings.py реализован через LazyInit
и должен использоваться только через вызов класса Settings().param_name.
Это обеспечивает правильную работу ленивой инициализации, потокобезопасность и возможность динамического переопределения настроек для тестирования.
LazyInit создает экземпляр настроек только при первом обращении
и переиспользует его в рамках одного контекста выполнения. Это обеспечивает:
Settings.local(**kwargs)# ❌ НЕПРАВИЛЬНО settings = Settings() model_name = settings.LLM_MODEL_NAME
Почему плохо:
# ❌ НЕПРАВИЛЬНО def process_data(settings=Settings()): return settings.MAX_TOKENS # ❌ НЕПРАВИЛЬНО def create_agent(config: SettingsValidator = Settings()): pass
Почему плохо:
# ❌ НЕПРАВИЛЬНО class MyClass: def __init__(self): self.settings = Settings() # ❌ ПЛОХО def process(self): return self.settings.MAX_TOKENS
Почему плохо:
# ✅ ПРАВИЛЬНО для тестов def some_function(): print(Settings().MAX_TOKENS) with Settings.local(MAX_TOKENS=1000, TEMPERATURE=0.5): result = some_function() # В этом контексте some_function и Settings().MAX_TOKENS вернет 1000
import typing as t from enum import Enum from pathlib import Path from pydantic import PostgresDsn, AfterValidator, SecretStr from pydantic_settings import BaseSettings, SettingsConfigDict from project.libs.structures import LazyInit __all__ = ["Settings"] def not_empty_validator(value): if not value: error_msg = "Field cannot be empty" raise ValueError(error_msg) return value NotEmptyStrT = t.Annotated[str, AfterValidator(not_empty_validator)] NotEmptySecretStrT = t.Annotated[SecretStr, AfterValidator(not_empty_validator)] MONITORING_APP_NAME = "" API_ROOT_PATH = "/api" class Envs(Enum): PROD = "PROD" # to work at a prod stand LAMBDA = "LAMBDA" # to work at a stable stand SANDBOX = "SANDBOX" # to work on a test stand TEST = "AUTOTEST" # for run testing LOCAL = "LOCAL" # for local development class SettingsValidator(BaseSettings): # Application ENV: Envs = Envs.PROD API_TOKEN: NotEmptySecretStrT HISTORY_WINDOW: int = 20 # Keycloak KEYCLOAK_URL: str = "" KEYCLOAK_CLIENT_ID: str = "" KEYCLOAK_USERNAME: str = "" KEYCLOAK_PASSWORD: SecretStr | None = None # Auth service BOT_AUTH_SERVICE_URL: str = "" # Database SQLALCHEMY_DATABASE_DSN: PostgresDsn # Example: postgresql+psycopg2://user:password@localhost:5432/database DATABASE_PRE_PING: t.Annotated[bool, "Checks and creates connection if closed before requesting"] = False # Telegram TELEGRAM_BOT_TOKEN: NotEmptySecretStrT TELEGRAM_BASE_URL: str = "" TELEGRAM_FILE_BASE_URL: str = "" # Redis REDIS_HOST: str = "" REDIS_PORT: str = "" REDIS_DB: str = "" # LLM LLM_MODEL: NotEmptyStrT LLM_API_KEY: NotEmptySecretStrT LLM_MIDDLE_PROXY_URL: str = "" LLM_TEMPERATURE: float = 0.3 LLM_MAX_TOKENS: int = 8192 LLM_TIMEOUT: float | None = None # Langfuse LANGFUSE_TRACING_ENABLED: bool = "false" LANGFUSE_PUBLIC_KEY: str | None = None LANGFUSE_SECRET_KEY: str | None = None LANGFUSE_HOST: str | None = None # Logging WRITE_LOGS_TO_FILE: bool = True LOG_LEVEL: str = "INFO" FASTAPI_LOG_LEVEL: str = "INFO" TELEGRAM_LOG_LEVEL: str = "INFO" HTTP_REQUESTS_LOG_LEVEL: str = "ERROR" SQLALCHEMY_LOG_LEVEL: str = "ERROR" REDIS_LOG_LEVEL: str = "ERROR" FLASK_LOG_LEVEL: str = "ERROR" # Loading local settings for development environment. model_config = SettingsConfigDict(env_file=Path(__file__).parent.parent / ".env", extra="allow") def is_local(self): return self.ENV == Envs.LOCAL def is_production(self): return self.ENV == Envs.PROD def is_testable_stand(self): return self.ENV in (Envs.LAMBDA, Envs.SANDBOX) def is_any_stand(self): return self.ENV in (Envs.PROD, Envs.LAMBDA, Envs.SANDBOX) Settings = LazyInit(SettingsValidator)
Создавайте обработчики телеграма в таком виде.
Не нужно подавлять ошибки внутри обработчика, потому что декоратор processing_errors должен его перехватить и он является предпочтительным местом обработчик ошибок, являясь централизованным местом обработки ошибок, чтоб избежать дублирования кода по обработке ошибок в каждом обработчике.
{Объяснить каждый декоратор для телеграмма из project/infrastructure/utils/telegram.py}
Порядок декораторов важен!
@check_auth @timeout_with_retry @processing_errors @action_tracking_decorator("start_handler") async def start_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): user_id = update.effective_user.id message = f"Привет {user_id}! Это пример обработчика Телеграм!" await update.message.reply_text(message)
Не используйте в аннотациях примитивные типы str, int и т.п. Вместо этого создавайте типы для каждой сущности с именем, которое будет отражать сущность этого объекта. Код с такими типами лучше читается, устраняет двусмысленность и устраняет риск перепутать объекты.
Пример:
import typing as t UserIdT = t.NewType("UserIdT", t.Annotated[int, "User ID"])
Такие типы размещаются в datatypes.py
Список всех таких типов в проекте:
import typing as t UserIdT = t.NewType("UserIdT", t.Annotated[int, "User ID"]) ChatIdT = t.NewType("ChatIdT", t.Annotated[int, "Chat ID"]) MessageIdT = t.NewType("MessageIdT", t.Annotated[int, "Message ID"]) AnswerT = t.NewType("AnswerT", t.Annotated[str, "Ответ пользователю"]) QuestionT = t.NewType("QuestionT", t.Annotated[str, "Вопрос пользвоателя"])