enhance: RabbitMQ config

This commit is contained in:
Alexander Kalinovsky
2025-08-22 20:11:59 +03:00
parent 54414e530a
commit f2855173a8
4 changed files with 19 additions and 12 deletions

View File

@@ -1,3 +1,3 @@
from .main import FaststreamPlugin from .main import FaststreamPlugin
__all__ = ["FaststreamPlugin"] __all__ = ["FaststreamPlugin"]

View File

@@ -1,4 +1,3 @@
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -7,6 +6,14 @@ class Config(BaseSettings):
env_file=".env", env_ignore_empty=True, extra="ignore" env_file=".env", env_ignore_empty=True, extra="ignore"
) )
RABBITMQ_URL: str RABBITMQ_HOST: str
RABBITMQ_PORT: int
RABBITMQ_USER: str
RABBITMQ_PASSWORD: str
config = Config() @property
def RABBITMQ_URI(self) -> str:
return f"amqp://{self.RABBITMQ_USER}:{self.RABBITMQ_PASSWORD}@{self.RABBITMQ_HOST}:{self.RABBITMQ_PORT}/"
config = Config()

View File

@@ -1,5 +1,4 @@
from aiogram.types import Update from aiogram.types import Update
from typing import Literal
from fastapi import Depends, Request, Response from fastapi import Depends, Request, Response
from fastapi.datastructures import State from fastapi.datastructures import State
from faststream.rabbit.fastapi import ContextRepo from faststream.rabbit.fastapi import ContextRepo
@@ -16,6 +15,7 @@ from .utils import override_route
logger = getLogger(__name__) logger = getLogger(__name__)
async def telegram_webhook( async def telegram_webhook(
request: Request, request: Request,
): ):
@@ -52,11 +52,11 @@ class FaststreamPlugin:
self.broker_router = None self.broker_router = None
def register(self, app: QuickBot) -> None: def register(self, app: QuickBot) -> None:
self.broker_router = RabbitRouter( self.broker_router = RabbitRouter(url=config.RABBITMQ_URI)
url=config.RABBITMQ_URL
@self.broker_router.subscriber(
queue=f"{app.config.STACK_NAME}.telegram_updates", no_reply=True
) )
@self.broker_router.subscriber(queue=f"{app.config.STACK_NAME}.telegram_updates", no_reply=True)
async def telegram_updates_handler( async def telegram_updates_handler(
message: dict, message: dict,
db_session: Annotated[AsyncSession, Depends(get_db)], db_session: Annotated[AsyncSession, Depends(get_db)],
@@ -83,4 +83,3 @@ class FaststreamPlugin:
app.include_router(self.broker_router) app.include_router(self.broker_router)
override_route(app, name="telegram_webhook", new_endpoint=telegram_webhook) override_route(app, name="telegram_webhook", new_endpoint=telegram_webhook)

View File

@@ -1,7 +1,8 @@
from typing import Callable, Optional from typing import Callable, Optional
from fastapi import FastAPI, APIRouter from fastapi import FastAPI
from fastapi.routing import APIRoute from fastapi.routing import APIRoute
def override_route( def override_route(
app: FastAPI, app: FastAPI,
*, *,
@@ -39,4 +40,4 @@ def override_route(
response_model_exclude_defaults=found.response_model_exclude_defaults, response_model_exclude_defaults=found.response_model_exclude_defaults,
response_model_exclude_none=found.response_model_exclude_none, response_model_exclude_none=found.response_model_exclude_none,
openapi_extra=found.openapi_extra, openapi_extra=found.openapi_extra,
) )