Files
Alexander Kalinovsky 329c1361a2 add sql filter "contains"
2025-02-05 23:35:02 +01:00

160 lines
5.2 KiB
Python

from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from inspect import iscoroutinefunction
from typing import TYPE_CHECKING
from qbot.utils.main import clear_state
from qbot.utils.navigation import (
save_navigation_context,
get_navigation_context,
pop_navigation_context,
)
from qbot.bot.handlers.editors.main import field_editor
from qbot.bot.handlers.common.routing import route_callback
from qbot.utils.serialization import deserialize
from qbot.utils.main import get_send_message
from qbot.model.descriptors import BotCommand, CommandCallbackContext
from qbot.model.settings import Settings
if TYPE_CHECKING:
from qbot.main import QBotApp
from ..context import ContextData, CallbackCommand, CommandContext
router = Router()
@router.message(F.text.startswith("/"))
async def command_text(message: Message, **kwargs):
str_command = message.text.lstrip("/")
callback_data = ContextData(
command=CallbackCommand.USER_COMMAND, user_command=str_command
)
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
kwargs["state_data"] = state_data
await process_command_handler(
message=message, callback_data=callback_data, **kwargs
)
@router.callback_query(ContextData.filter(F.command == CallbackCommand.USER_COMMAND))
async def command_callback(message: CallbackQuery, **kwargs):
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
kwargs["state_data"] = state_data
await process_command_handler(message=message, **kwargs)
async def process_command_handler(message: Message | CallbackQuery, **kwargs):
state_data: dict = kwargs["state_data"]
callback_data: ContextData = kwargs["callback_data"]
app: "QBotApp" = kwargs["app"]
cmd = app.bot_commands.get(callback_data.user_command.split("&")[0])
if cmd is None:
return
if cmd.clear_navigation:
state_data.pop("navigation_stack", None)
state_data.pop("navigation_context", None)
if cmd.register_navigation:
clear_state(state_data=state_data)
save_navigation_context(callback_data=callback_data, state_data=state_data)
await cammand_handler(message=message, cmd=cmd, **kwargs)
async def cammand_handler(message: Message | CallbackQuery, cmd: BotCommand, **kwargs):
callback_data: ContextData = kwargs["callback_data"]
state: FSMContext = kwargs["state"]
state_data: dict = kwargs["state_data"]
app: "QBotApp" = kwargs["app"]
entity_data_dict: dict = state_data.get("entity_data")
form_data = (
{
key: await deserialize(
session=kwargs["db_session"],
type_=cmd.param_form[key].type_,
value=value,
)
for key, value in entity_data_dict.items()
}
if entity_data_dict and cmd.param_form
else None
)
callback_context = CommandCallbackContext(
message=message,
callback_data=callback_data,
form_data=form_data,
db_session=kwargs["db_session"],
user=kwargs["user"],
app=app,
state_data=state_data,
state=state,
i18n=kwargs["i18n"],
register_navigation=cmd.register_navigation,
kwargs=kwargs,
)
if cmd.pre_check and (not cmd.param_form or (cmd.param_form and form_data is None)):
if iscoroutinefunction(cmd.pre_check):
if not await cmd.pre_check(callback_context):
return
else:
if not cmd.pre_check(callback_context):
return
if form_data is None and cmd.param_form:
field_descriptor = list(cmd.param_form.values())[0]
kwargs["callback_data"] = ContextData(
command=CallbackCommand.FIELD_EDITOR,
context=CommandContext.COMMAND_FORM,
field_name=field_descriptor.name,
user_command=callback_data.user_command,
)
return await field_editor(message=message, **kwargs)
await cmd.handler(callback_context)
if callback_context.register_navigation:
await state.set_data(state_data)
stack, navigation_context = get_navigation_context(state_data=state_data)
back_callback_data = pop_navigation_context(stack=stack)
if back_callback_data:
callback_context.keyboard_builder.row(
InlineKeyboardButton(
text=(await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data=back_callback_data.pack(),
)
)
send_message = get_send_message(message)
if isinstance(message, CallbackQuery):
message = message.message
if callback_context.message_text:
await send_message(
text=callback_context.message_text,
reply_markup=callback_context.keyboard_builder.as_markup(),
)
else:
await message.edit_reply_markup(
reply_markup=callback_context.keyboard_builder.as_markup()
)
else:
clear_state(state_data=state_data)
await route_callback(message, back=True, **kwargs)