from aiogram.types import BotCommand
from contextlib import asynccontextmanager
from .main import QBotApp
from logging import getLogger

logger = getLogger(__name__)


def _group_command_captions(bot_commands) -> dict[str, list[tuple[str, str]]]:
    """Group visible bot commands into ``(name, description)`` pairs per locale.

    Commands whose ``show_in_bot_commands`` flag is falsy are skipped.
    A caption that is a plain string or ``None`` is filed under the
    ``"default"`` key, falling back to the command name when the caption is
    empty or ``None``. Any other caption is iterated with ``.items()`` as a
    ``locale -> description`` mapping.
    """
    grouped: dict[str, list[tuple[str, str]]] = {}
    for command_name, command in bot_commands.items():
        if not command.show_in_bot_commands:
            continue
        caption = command.caption
        if caption is None or isinstance(caption, str):
            grouped.setdefault("default", []).append(
                (command_name, caption or command_name)
            )
            continue
        for locale, description in caption.items():
            grouped.setdefault(locale, []).append((command_name, description))
    return grouped


@asynccontextmanager
async def default_lifespan(app: QBotApp):
    """Run the startup and shutdown sequence of a qbot application.

    Startup: optionally opens an ngrok tunnel to ``app.config.API_PORT`` and
    stores its public URL in ``app.config.NGROK_URL``, registers the bot
    commands once per locale, and sets the Telegram webhook. Control is then
    yielded, wrapped in ``app.lifespan(app)`` when the application defines
    one.

    Shutdown: deletes the webhook and closes the ngrok tunnel. Both steps run
    in ``finally`` blocks, so they also execute when the managed body or a
    later startup step raises; the exception still propagates afterwards.

    Raises:
        ImportError: if ``app.config.USE_NGROK`` is set but ``pyngrok`` is
            not installed.
    """
    logger.debug("starting qbot app")

    # Stays None unless a tunnel is really opened; teardown keys off this
    # instead of re-reading the config flag, which could have changed.
    ngrok = None
    if app.config.USE_NGROK:
        try:
            from pyngrok import ngrok
            from pyngrok.conf import PyngrokConfig
        except ImportError:
            logger.error("pyngrok is not installed")
            raise

        tunnel = ngrok.connect(
            app.config.API_PORT,
            pyngrok_config=PyngrokConfig(auth_token=app.config.NGROK_AUTH_TOKEN),
        )
        app.config.NGROK_URL = tunnel.public_url

    try:
        for locale, commands in _group_command_captions(app.bot_commands).items():
            await app.bot.set_my_commands(
                [
                    BotCommand(command=name, description=description)
                    for name, description in commands
                ],
                # The "default" bucket is registered without a language code.
                language_code=None if locale == "default" else locale,
            )

        # NOTE(review): the webhook URL is built from API_URL, not NGROK_URL;
        # presumably API_URL resolves to the tunnel address when ngrok is in
        # use - confirm against the config class.
        await app.bot.set_webhook(
            url=f"{app.config.API_URL}/api/telegram/webhook",
            drop_pending_updates=True,
            allowed_updates=["message", "callback_query", "pre_checkout_query"],
            secret_token=app.bot_auth_token,
        )

        logger.info("qbot app started")

        try:
            if app.lifespan:
                async with app.lifespan(app):
                    yield
            else:
                yield
        finally:
            # Runs on clean exit and on errors raised by the managed body.
            logger.info("stopping qbot app")
            await app.bot.delete_webhook()
    finally:
        # Close the tunnel even when startup or webhook removal failed.
        if ngrok is not None:
            ngrok.disconnect(app.config.NGROK_URL)
            ngrok.kill()

    logger.info("qbot app stopped")