159 lines
5.2 KiB
Python
159 lines
5.2 KiB
Python
from aiogram import Router, F
|
|
from aiogram.fsm.context import FSMContext
|
|
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
|
|
from inspect import iscoroutinefunction
|
|
from typing import TYPE_CHECKING
|
|
|
|
from qbot.utils.main import clear_state
|
|
from qbot.utils.navigation import (
|
|
save_navigation_context,
|
|
get_navigation_context,
|
|
pop_navigation_context,
|
|
)
|
|
from qbot.bot.handlers.editors.main import field_editor
|
|
from qbot.bot.handlers.common.routing import route_callback
|
|
from qbot.utils.serialization import deserialize
|
|
from qbot.utils.main import get_send_message
|
|
from qbot.model.descriptors import BotCommand, CommandCallbackContext
|
|
from qbot.model.settings import Settings
|
|
|
|
if TYPE_CHECKING:
|
|
from qbot.main import QBotApp
|
|
|
|
from ..context import ContextData, CallbackCommand, CommandContext
|
|
|
|
|
|
# Router on which the "/"-prefixed text handler and the USER_COMMAND callback
# handler defined in this module are registered.
router = Router()
|
|
|
|
|
|
@router.message(F.text.startswith("/"))
async def command_text(message: Message, **kwargs):
    """Treat a text message that starts with "/" as a user command.

    All leading "/" characters are stripped from the message text and the
    remainder is wrapped in a ``ContextData`` carrying
    ``CallbackCommand.USER_COMMAND``. The current FSM data is read from
    ``kwargs["state"]``, stored under ``kwargs["state_data"]`` and everything
    is forwarded to ``process_command_handler``.

    Args:
        message: Incoming text message; the filter guarantees its text
            starts with "/".
        **kwargs: Handler data; must contain ``"state"`` (an ``FSMContext``).
    """
    command_payload = ContextData(
        command=CallbackCommand.USER_COMMAND,
        user_command=message.text.lstrip("/"),
    )

    fsm: FSMContext = kwargs["state"]
    # Hand the FSM snapshot to the downstream handlers alongside the context.
    kwargs.update(state_data=await fsm.get_data())

    await process_command_handler(
        message=message,
        callback_data=command_payload,
        **kwargs,
    )
|
|
|
|
|
|
@router.callback_query(ContextData.filter(F.command == CallbackCommand.USER_COMMAND))
async def command_callback(message: CallbackQuery, **kwargs):
    """Handle a callback query whose ``ContextData`` command is USER_COMMAND.

    Reads the current FSM data from ``kwargs["state"]``, stores it under
    ``kwargs["state_data"]`` and forwards the query to
    ``process_command_handler``.

    Args:
        message: The incoming callback query (named ``message`` to match the
            signature of the downstream handlers).
        **kwargs: Handler data; must contain ``"state"`` (an ``FSMContext``).
            ``process_command_handler`` additionally reads ``"callback_data"``
            from it - presumably injected by the ``ContextData`` filter;
            confirm against the aiogram filter behaviour.
    """
    fsm: FSMContext = kwargs["state"]
    kwargs.update(state_data=await fsm.get_data())

    await process_command_handler(message=message, **kwargs)
|
|
|
|
|
|
async def process_command_handler(message: Message | CallbackQuery, **kwargs):
    """Resolve the requested bot command and prepare navigation state for it.

    The command is looked up in ``app.bot_commands`` by the part of
    ``callback_data.user_command`` that precedes the first "&". Unknown
    commands are ignored silently. Depending on the command's flags the
    navigation keys are dropped from ``state_data`` and/or the state is
    cleared and the current context is saved as a navigation entry. Finally
    the command is executed through ``cammand_handler``.

    Args:
        message: The message or callback query that triggered the command.
        **kwargs: Handler data; must contain ``"state_data"`` (dict),
            ``"callback_data"`` (``ContextData``) and ``"app"``.
    """
    state_data: dict = kwargs["state_data"]
    callback_data: ContextData = kwargs["callback_data"]
    app: "QBotApp" = kwargs["app"]

    # Only the text before the first "&" identifies the command.
    command_name = callback_data.user_command.partition("&")[0]
    cmd = app.bot_commands.get(command_name)
    if cmd is None:
        return

    if cmd.clear_navigation:
        for nav_key in ("navigation_stack", "navigation_context"):
            state_data.pop(nav_key, None)

    if cmd.register_navigation:
        clear_state(state_data=state_data)
        save_navigation_context(callback_data=callback_data, state_data=state_data)

    await cammand_handler(message=message, cmd=cmd, **kwargs)
|
|
|
|
|
|
async def cammand_handler(message: Message | CallbackQuery, cmd: BotCommand, **kwargs):
    """Execute a bot command: gather form data, pre-check, run, then reply.

    Steps, in order:

    1. If ``state_data["entity_data"]`` is non-empty and the command declares
       a ``param_form``, every stored value is deserialized with the type of
       the matching form field; otherwise ``form_data`` is ``None``.
    2. A ``CommandCallbackContext`` is built from the handler data.
    3. While no form data is available, ``cmd.pre_check`` (sync or async) is
       consulted; a falsy result aborts the command.
    4. If the command has a ``param_form`` but no form data yet, control is
       handed to ``field_editor`` for the first form field and its result is
       returned.
    5. Otherwise ``cmd.handler`` is awaited. When the context still has
       ``register_navigation`` set, the state is persisted, a "back" button is
       appended if a previous navigation entry exists, and the response is
       sent (or only the reply markup is edited when there is no message
       text). When it is not set, the state is cleared and the flow is routed
       back via ``route_callback``.

    Args:
        message: The message or callback query that triggered the command.
        cmd: The command descriptor to execute.
        **kwargs: Handler data; must contain ``"callback_data"``, ``"state"``,
            ``"state_data"``, ``"app"``, ``"db_session"``, ``"user"`` and
            ``"i18n"``.

    Returns:
        The result of ``field_editor`` when the parameter form is opened,
        otherwise ``None``.
    """
    callback_data: ContextData = kwargs["callback_data"]
    state: FSMContext = kwargs["state"]
    state_data: dict = kwargs["state_data"]
    app: "QBotApp" = kwargs["app"]

    stored_values: dict = state_data.get("entity_data")

    # Form data exists only when there are stored values AND a form to map
    # them onto; in every other case it stays None.
    form_data = None
    if stored_values and cmd.param_form:
        form_data = {}
        for field_name, raw_value in stored_values.items():
            form_data[field_name] = await deserialize(
                session=kwargs["db_session"],
                type_=cmd.param_form[field_name].type_,
                value=raw_value,
            )

    callback_context = CommandCallbackContext(
        message=message,
        callback_data=callback_data,
        form_data=form_data,
        db_session=kwargs["db_session"],
        user=kwargs["user"],
        app=app,
        state_data=state_data,
        state=state,
        i18n=kwargs["i18n"],
        register_navigation=cmd.register_navigation,
        kwargs=kwargs,
    )

    # The pre-check runs only before the form has been filled in.
    if cmd.pre_check and (not cmd.param_form or form_data is None):
        if iscoroutinefunction(cmd.pre_check):
            check_passed = await cmd.pre_check(callback_context)
        else:
            check_passed = cmd.pre_check(callback_context)
        if not check_passed:
            return

    if form_data is None and cmd.param_form:
        # Start the parameter form with its first declared field.
        first_field = next(iter(cmd.param_form.values()))
        # Mutate kwargs in place: the context built above holds this same dict.
        kwargs["callback_data"] = ContextData(
            command=CallbackCommand.FIELD_EDITOR,
            context=CommandContext.COMMAND_FORM,
            field_name=first_field.name,
            user_command=callback_data.user_command,
        )
        return await field_editor(message=message, **kwargs)

    await cmd.handler(callback_context)

    # The handler may have changed register_navigation on the context, so the
    # context attribute (not cmd.register_navigation) is consulted here.
    if not callback_context.register_navigation:
        clear_state(state_data=state_data)
        await route_callback(message, back=True, **kwargs)
        return

    await state.set_data(state_data)

    stack, _ = get_navigation_context(state_data=state_data)
    back_callback_data = pop_navigation_context(stack=stack)
    if back_callback_data:
        back_button = InlineKeyboardButton(
            text=(await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
            callback_data=back_callback_data.pack(),
        )
        callback_context.keyboard_builder.row(back_button)

    # Resolve the sender before unwrapping the callback query below.
    send_message = get_send_message(message)
    target_message = message.message if isinstance(message, CallbackQuery) else message
    reply_markup = callback_context.keyboard_builder.as_markup()

    if callback_context.message_text:
        await send_message(
            text=callback_context.message_text,
            reply_markup=reply_markup,
        )
    else:
        # NOTE(review): when the command came from a plain text Message this
        # edits the user's own message, which Telegram presumably rejects -
        # confirm whether this branch is reachable for text commands.
        await target_message.edit_reply_markup(reply_markup=reply_markup)
|