This commit is contained in:
Alexander Kalinovsky
2025-08-19 17:57:45 +03:00
parent 84866dcfb4
commit 54414e530a
6 changed files with 189 additions and 0 deletions

10
.gitignore vendored Normal file
View File

@@ -0,0 +1,10 @@
__pycache__
.env
.pytest_cache
.DS_Store
.venv
.python-version
uv.lock
dist
*.egg-info
site

36
pyproject.toml Normal file
View File

@@ -0,0 +1,36 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "quickbot-faststream"
version = "0.1.0"
description = "Enables telegram updates handling in faststream workers"
readme = "README.md"
requires-python = ">=3.13"
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
"Development Status :: 3 - Alpha",
]
authors = [
{ name = "Alexander Kalinovsky", email = "ak@botforge.biz" },
]
license = { file = "LICENSE" }
dependencies = [
"quickbot>=0.1.1",
"faststream[rabbit]>=0.5",
]
[project.optional-dependencies]
dev = [
"build>=1.2.1",
"twine>=5.0.0",
"pytest>=8.0.0",
"ruff>=0.5.0",
]
[tool.setuptools.packages.find]
where = ["src"]

View File

@@ -0,0 +1,3 @@
from .main import FaststreamPlugin
__all__ = ["FaststreamPlugin"]

View File

@@ -0,0 +1,12 @@
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Config(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env", env_ignore_empty=True, extra="ignore"
)
RABBITMQ_URL: str
config = Config()

View File

@@ -0,0 +1,86 @@
from aiogram.types import Update
from typing import Literal
from fastapi import Depends, Request, Response
from fastapi.datastructures import State
from faststream.rabbit.fastapi import ContextRepo
from faststream.rabbit import RabbitBroker
from logging import getLogger
from typing_extensions import Annotated
from quickbot import QuickBot
from quickbot.db import get_db
from sqlmodel.ext.asyncio.session import AsyncSession
from faststream.rabbit.fastapi import RabbitRouter
from .config import config
from .utils import override_route
logger = getLogger(__name__)
async def telegram_webhook(
request: Request,
):
logger.debug("Webhook request %s", await request.json())
app: QuickBot = request.app
if app.webhook_handler:
return await app.webhook_handler(app=app, request=request)
request_token = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
if request_token != app.config.TELEGRAM_WEBHOOK_AUTH_KEY:
logger.warning("Unauthorized request %s", request)
return Response(status_code=403)
try:
update = await request.json()
except Exception:
logger.error("Invalid request", exc_info=True)
return Response(status_code=400)
broker: RabbitBroker = request.state.broker
await broker.publish(
message=update,
queue=f"{app.config.STACK_NAME}.telegram_updates",
)
return Response(status_code=200)
class FaststreamPlugin:
def __init__(
self,
) -> None:
self.broker_router = None
def register(self, app: QuickBot) -> None:
self.broker_router = RabbitRouter(
url=config.RABBITMQ_URL
)
@self.broker_router.subscriber(queue=f"{app.config.STACK_NAME}.telegram_updates", no_reply=True)
async def telegram_updates_handler(
message: dict,
db_session: Annotated[AsyncSession, Depends(get_db)],
context: ContextRepo,
):
state = State(
{
"broker": context.context["broker"],
}
)
try:
update = Update(**message)
await app.dp.feed_webhook_update(
bot=app.bot,
update=update,
db_session=db_session,
app=app,
app_state=state,
)
except Exception:
logger.error("Error processing update", exc_info=True)
app.include_router(self.broker_router)
override_route(app, name="telegram_webhook", new_endpoint=telegram_webhook)

View File

@@ -0,0 +1,42 @@
from typing import Callable, Optional
from fastapi import FastAPI, APIRouter
from fastapi.routing import APIRoute
def override_route(
app: FastAPI,
*,
name: str,
new_endpoint: Callable,
):
"""
Replace the handler of a route with a new one, preserving all other route settings.
"""
found: Optional[APIRoute] = None
for r in app.routes:
if isinstance(r, APIRoute) and r.name == name:
found = r
break
if not found:
raise LookupError(f"Method {name} not found")
app.routes.remove(found)
app.add_api_route(
found.path,
new_endpoint,
methods=list(found.methods),
response_model=found.response_model,
status_code=found.status_code,
tags=found.tags,
summary=found.summary,
description=found.description,
response_description=found.response_description,
responses=found.responses,
deprecated=found.deprecated,
name=found.name,
include_in_schema=found.include_in_schema,
response_model_exclude_unset=found.response_model_exclude_unset,
response_model_exclude_defaults=found.response_model_exclude_defaults,
response_model_exclude_none=found.response_model_exclude_none,
openapi_extra=found.openapi_extra,
)