Files
quickbot/bot/handlers/common/__init__.py
Alexander Kalinovsky 3898a333fa refactoring
2025-01-09 13:11:10 +01:00

343 lines
15 KiB
Python

from types import NoneType, UnionType
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from babel.support import LazyProxy
from inspect import signature
from sqlmodel.ext.asyncio.session import AsyncSession
from typing import Any, get_args, get_origin, TYPE_CHECKING
import ujson as json
from ..context import ContextData, CallbackCommand, CommandContext
from ...command_context_filter import CallbackCommandFilter
from ....model.user import UserBase
from ....model.settings import Settings
from ....model.bot_entity import BotEntity
from ....model.bot_enum import BotEnum
from ....model.view_setting import ViewSetting
from ....utils import get_local_text, deserialize
from ....model.descriptors import (EntityFieldDescriptor,
EntityDescriptor,
EntityCaptionCallable,
EntityItemCaptionCallable,
EntityFieldCaptionCallable)
if TYPE_CHECKING:
from ....main import QBotApp
# Module-level aiogram router; the handlers defined in this module register on it via decorators.
router = Router()
def get_send_message(message: Message | CallbackQuery):
    """Return the bound method used to deliver a reply for *message*.

    For a ``Message`` this is ``message.answer``. For anything else
    (expected to be a ``CallbackQuery``) it is ``edit_text`` of the message
    the query is attached to.
    """
    is_plain_message = isinstance(message, Message)
    return message.answer if is_plain_message else message.message.edit_text
# def get_local_text(text: str, lang: str):
# try:
# text_obj = json.loads(text) #@IgnoreException
# return text_obj.get(lang, text_obj[list(text_obj.keys())[0]])
# except:
# return text
def get_value_repr(value: Any, field_descriptor: EntityFieldDescriptor, locale: str | None = None) -> str:
    """Render a field value as display text.

    Args:
        value: Raw field value. ``None`` renders as an empty string.
        field_descriptor: Descriptor of the field; its ``type_base``,
            ``is_list`` and ``localizable`` attributes select the format.
        locale: Locale forwarded to ``get_local_text`` / ``localized``.

    Returns:
        The textual representation. Lists are rendered as
        ``[item, item, ...]``; booleans as a checkbox glyph.
    """
    base_type = field_descriptor.type_base

    if value is None:
        return ""

    # Checked before the list/entity branches, exactly as the type checks
    # below would otherwise be reached first.
    if isinstance(value, bool):
        return "【✔︎】" if value else "【 】"

    if field_descriptor.is_list:
        if issubclass(base_type, BotEntity):
            # The "name" descriptor is only looked up when a locale is given.
            if locale and base_type.bot_entity_descriptor.fields_descriptors["name"].localizable:
                parts = [get_local_text(text=item.name, locale=locale) for item in value]
            else:
                parts = [str(item.name) for item in value]
        elif issubclass(base_type, BotEnum):
            parts = [item.localized(locale) for item in value]
        elif base_type == str:
            # String items are wrapped in double quotes.
            parts = [f'"{item}"' for item in value]
        else:
            parts = [str(item) for item in value]
        return "[" + ", ".join(parts) + "]"

    if issubclass(base_type, BotEntity):
        name_is_localizable = base_type.bot_entity_descriptor.fields_descriptors["name"].localizable
        if name_is_localizable:
            return get_local_text(text=value.name, locale=locale)
        return value.name

    if issubclass(base_type, BotEnum):
        return value.localized(locale)

    if isinstance(value, str):
        needs_translation = field_descriptor and field_descriptor.localizable
        return get_local_text(text=value, locale=locale) if needs_translation else value

    # Numbers and every other type fall back to str().
    return str(value)
def get_callable_str(callable_str: str | LazyProxy | EntityCaptionCallable | EntityItemCaptionCallable | EntityFieldCaptionCallable,
                     descriptor: EntityFieldDescriptor | EntityDescriptor,
                     entity: Any = None,
                     value: Any = None) -> str:
    """Resolve a caption that may be a string, a lazy proxy or a callable.

    Args:
        callable_str: A plain string (returned as is), a ``LazyProxy``
            (its ``.value`` is returned), or a callable taking one, two or
            three parameters.
        descriptor: Passed as the first argument to a callable caption.
        entity: Passed as the second argument when the callable takes >= 2.
        value: Passed as the third argument when the callable takes 3.

    Returns:
        The resolved caption. NOTE: when ``callable_str`` is none of the
        supported kinds, or is a callable whose signature has zero or more
        than three parameters, ``None`` is returned despite the annotation.
    """
    if isinstance(callable_str, str):
        return callable_str

    if isinstance(callable_str, LazyProxy):
        return callable_str.value

    if not callable(callable_str):
        return None

    # Pass exactly as many positional arguments as the callable declares.
    param_count = len(signature(callable_str).parameters)
    if param_count in (1, 2, 3):
        call_args = (descriptor, entity, value)[:param_count]
        return callable_str(*call_args)

    return None
async def authorize_command(user: UserBase,
                            callback_data: ContextData):
    """Check whether *user* may run the command described by *callback_data*.

    Only the parameters menu command and the setting-edit context are
    considered: for those, the user needs at least one role listed in the
    ``SECURITY_PARAMETERS_ROLES`` setting. Every other command/context
    combination yields ``False``.
    """
    targets_parameters = (callback_data.command == CallbackCommand.MENU_ENTRY_PARAMETERS or
                          callback_data.context == CommandContext.SETTING_EDIT)
    if not targets_parameters:
        return False

    permitted_roles = await Settings.get(Settings.SECURITY_PARAMETERS_ROLES)
    for role in permitted_roles:
        if role in user.roles:
            return True
    return False
def get_entity_descriptor(app: "QBotApp", callback_data: ContextData) -> EntityDescriptor | None:
    """Look up the entity descriptor named in *callback_data*.

    Returns ``None`` when ``callback_data.entity_name`` is empty; otherwise
    indexes ``app.entity_metadata.entity_descriptors`` by that name (an
    unknown name raises ``KeyError`` if that container is a plain dict).
    """
    entity_name = callback_data.entity_name
    if not entity_name:
        return None
    return app.entity_metadata.entity_descriptors[entity_name]
def get_field_descriptor(app: "QBotApp", callback_data: ContextData) -> EntityFieldDescriptor | None:
    """Resolve the field descriptor referenced by *callback_data*.

    * In the setting-edit context the descriptor is taken from
      ``Settings.list_params()`` by direct indexing.
    * In the entity create / edit / field-edit contexts it is taken from the
      entity's ``fields_descriptors`` via ``.get`` (so a missing field
      yields ``None``).
    * In any other context, or when no entity descriptor is found,
      ``None`` is returned.
    """
    context = callback_data.context

    if context == CommandContext.SETTING_EDIT:
        return Settings.list_params()[callback_data.field_name]

    entity_contexts = (CommandContext.ENTITY_CREATE,
                       CommandContext.ENTITY_EDIT,
                       CommandContext.ENTITY_FIELD_EDIT)
    if context not in entity_contexts:
        return None

    entity_descriptor = get_entity_descriptor(app, callback_data)
    if not entity_descriptor:
        return None
    return entity_descriptor.fields_descriptors.get(callback_data.field_name)
def add_pagination_controls(keyboard_builder: InlineKeyboardBuilder,
                            callback_data: ContextData,
                            total_pages: int,
                            command: CallbackCommand,
                            page: int):
    """Append a row of page-navigation buttons to *keyboard_builder*.

    Nothing is added when there is only one page. Otherwise the row holds
    "previous" and "next" buttons, and, when there are more than 10 pages,
    also "first", "back 10", "forward 10" and "last" buttons.

    Args:
        keyboard_builder: Builder that receives the navigation row.
        callback_data: Current context; its ``context``, ``entity_name``,
            ``entity_id`` and ``field_name`` are copied into every button.
        total_pages: Total number of pages.
        command: Command stored in each button's callback data.
        page: Current page number (the first page is 1).

    Each button carries the target page number as its ``data``. A button
    that would not change the page carries ``"skip"`` instead
    (presumably handled as a no-op by the receiving handler — confirm).
    """
    if total_pages <= 1:
        return

    skip = "skip"

    def nav_button(text: str, target: str) -> InlineKeyboardButton:
        # Build one navigation button pointing at `target` in the same context.
        return InlineKeyboardButton(
            text=text,
            callback_data=ContextData(
                command=command,
                context=callback_data.context,
                entity_name=callback_data.entity_name,
                entity_id=callback_data.entity_id,
                field_name=callback_data.field_name,
                data=target,
                save_state=True).pack())

    has_previous = page > 1
    has_next = page < total_pages
    show_jump_buttons = total_pages > 10

    navigation_buttons = []

    if show_jump_buttons:
        navigation_buttons.append(
            nav_button("⏮️", "1" if page != 1 else skip))
        navigation_buttons.append(
            nav_button("⏪️", str(max(page - 10, 1)) if has_previous else skip))

    navigation_buttons.append(
        nav_button("◀️", str(max(page - 1, 1)) if has_previous else skip))
    navigation_buttons.append(
        nav_button("▶️", str(min(page + 1, total_pages)) if has_next else skip))

    if show_jump_buttons:
        navigation_buttons.append(
            nav_button("⏩️", str(min(page + 10, total_pages)) if has_next else skip))
        navigation_buttons.append(
            nav_button("⏭️", str(total_pages) if page != total_pages else skip))

    keyboard_builder.row(*navigation_buttons)
def add_filter_controls(keyboard_builder: InlineKeyboardBuilder,
                        entity_descriptor: EntityDescriptor,
                        filter: str | None = None,
                        page: int = 1):
    """Append a "search" button that opens the view-filter editor.

    Args:
        keyboard_builder: Builder that receives the button row.
        entity_descriptor: Entity whose ``"name"`` field supplies the
            button caption (its ``caption`` if set, else its ``name``).
        filter: Currently active filter text; when non-empty it is shown
            in the button label as ``: "<filter>"``.
        page: Page number stored in the button's callback data.
    """
    name_field = entity_descriptor.fields_descriptors["name"]
    if name_field.caption:
        caption = get_callable_str(name_field.caption, name_field)
    else:
        caption = name_field.name

    # Built separately: nesting an f-string that reuses the outer quote
    # character (and contains backslashes) only parses on Python 3.12+.
    filter_suffix = f': "{filter}"' if filter else ""

    keyboard_builder.row(
        InlineKeyboardButton(
            text=f"🔎 {caption}{filter_suffix}",
            callback_data=ContextData(
                command=CallbackCommand.VIEW_FILTER_EDIT,
                entity_name=entity_descriptor.name,
                data=str(page)).pack()))
@router.callback_query(ContextData.filter(F.command == CallbackCommand.VIEW_FILTER_EDIT))
async def view_filter_edit(query: CallbackQuery, **kwargs):
    """Handle the view-filter button and its Cancel / Clear follow-ups.

    ``callback_data.data`` is either ``"<page>"`` (open the filter prompt)
    or ``"<page>&cancel"`` / ``"<page>&clear"``.

    * cancel / clear: "clear" first resets the stored filter for the user
      and entity. Then, if a backed-up context exists in the FSM data, the
      entity picker is re-rendered from it; otherwise control goes to
      ``route_callback``.
    * otherwise: the current context is backed up in the FSM data, this
      callback becomes the active context, and a prompt with Cancel and
      Clear buttons is sent.
    """
    callback_data: ContextData = kwargs["callback_data"]
    state: FSMContext = kwargs["state"]
    state_data = await state.get_data()
    kwargs["state_data"] = state_data

    # First token is the page, optional second token is the sub-command.
    page_token, *extra_tokens = callback_data.data.split("&")
    page = int(page_token)
    cmd = extra_tokens[0] if extra_tokens else None

    db_session: AsyncSession = kwargs["db_session"]
    app: "QBotApp" = kwargs["app"]
    user: UserBase = kwargs["user"]
    entity_descriptor = get_entity_descriptor(app=app, callback_data=callback_data)

    if cmd in ("cancel", "clear"):
        if cmd == "clear":
            await ViewSetting.set_filter(session=db_session,
                                         user_id=user.id,
                                         entity_name=entity_descriptor.class_name,
                                         filter=None)

        previous_context = state_data.pop("context_data_bak")
        if not previous_context:
            return await route_callback(message=query, back=False, **kwargs)

        # Restore the context that was active before the filter prompt.
        state_data["context_data"] = previous_context
        context_data = ContextData.unpack(previous_context)
        field_descriptor = get_field_descriptor(app, context_data)
        edit_prompt = state_data["edit_prompt"]
        current_value = await deserialize(session=db_session,
                                          type_=field_descriptor.type_,
                                          value=state_data["value"])
        page = int(state_data.pop("page"))
        # callback_data is passed explicitly below, so drop it from kwargs.
        kwargs.pop("callback_data")
        return await render_entity_picker(field_descriptor=field_descriptor,
                                          message=query,
                                          callback_data=context_data,
                                          current_value=current_value,
                                          edit_prompt=edit_prompt,
                                          page=page,
                                          **kwargs)

    # Opening the prompt: remember the current context and page.
    old_context_data = state_data.get("context_data")
    await state.update_data({"context_data": callback_data.pack(),
                             "context_data_bak": old_context_data,
                             "page": page})

    send_message = get_send_message(query)
    prompt_text = await Settings.get(Settings.APP_STRINGS_VIEW_FILTER_EDIT_PROMPT)

    cancel_button = InlineKeyboardButton(
        text=await Settings.get(Settings.APP_STRINGS_CANCEL_BTN),
        callback_data=ContextData(
            command=CallbackCommand.VIEW_FILTER_EDIT,
            entity_name=entity_descriptor.name,
            data=f"{page}&cancel").pack())
    clear_button = InlineKeyboardButton(
        text=await Settings.get(Settings.APP_STRINGS_CLEAR_BTN),
        callback_data=ContextData(
            command=CallbackCommand.VIEW_FILTER_EDIT,
            entity_name=entity_descriptor.name,
            data=f"{page}&clear").pack())

    keyboard = InlineKeyboardBuilder().row(cancel_button, clear_button)
    await send_message(text=prompt_text, reply_markup=keyboard.as_markup())
@router.message(CallbackCommandFilter(command=CallbackCommand.VIEW_FILTER_EDIT))
async def view_filter_edit_input(message: Message, **kwargs):
    """Store the text the user typed as the view filter and return to the view.

    The message text is saved as the filter for the current user and the
    entity named in the FSM ``context_data``. If a backed-up context
    (``context_data_bak``) exists, the entity picker is re-rendered from
    it; otherwise control goes to ``route_callback``.
    """
    state: FSMContext = kwargs["state"]
    state_data = await state.get_data()
    kwargs["state_data"] = state_data

    callback_data = ContextData.unpack(state_data["context_data"])
    db_session: AsyncSession = kwargs["db_session"]
    user: UserBase = kwargs["user"]
    app: "QBotApp" = kwargs["app"]
    entity_descriptor = get_entity_descriptor(app=app, callback_data=callback_data)

    filter_text = message.text
    await ViewSetting.set_filter(session=db_session,
                                 user_id=user.id,
                                 entity_name=entity_descriptor.class_name,
                                 filter=filter_text)

    previous_context = state_data.pop("context_data_bak")
    if not previous_context:
        return await route_callback(message=message, back=False, **kwargs)

    # Restore the context that was active before the filter prompt.
    state_data["context_data"] = previous_context
    context_data = ContextData.unpack(previous_context)
    field_descriptor = get_field_descriptor(app, context_data)
    edit_prompt = state_data["edit_prompt"]
    current_value = await deserialize(session=db_session,
                                      type_=field_descriptor.type_,
                                      value=state_data["value"])
    page = int(state_data.pop("page"))
    return await render_entity_picker(field_descriptor=field_descriptor,
                                      message=message,
                                      callback_data=context_data,
                                      current_value=current_value,
                                      edit_prompt=edit_prompt,
                                      page=page,
                                      **kwargs)
from ..navigation import route_callback, save_navigation_context, clear_state, get_navigation_context
from ..editors.entity import render_entity_picker