Files
quickbot/utils/main.py
Alexander Kalinovsky f666bcfba3 add command params
2025-01-29 23:40:43 +01:00

215 lines
6.5 KiB
Python

from babel.support import LazyProxy
from inspect import signature
from aiogram.types import Message, CallbackQuery
from aiogram.utils.i18n import I18n
from typing import Any, TYPE_CHECKING
import ujson as json
from ..model.bot_entity import BotEntity
from ..model.bot_enum import BotEnum
from ..model.settings import Settings
from ..model.descriptors import (
FieldDescriptor,
EntityDescriptor,
EntityItemCaptionCallable,
EntityFieldCaptionCallable,
EntityPermission,
EntityCaptionCallable,
)
from ..bot.handlers.context import ContextData, CommandContext
if TYPE_CHECKING:
from ..model.user import UserBase
from ..main import QBotApp
def get_user_permissions(
    user: "UserBase", entity_descriptor: EntityDescriptor
) -> list[EntityPermission]:
    """Collect the permissions granted to *user* for an entity type.

    A permission is included when at least one of the roles listed for it
    in ``entity_descriptor.permissions`` is present in ``user.roles``.

    Args:
        user: Object exposing a ``roles`` collection.
        entity_descriptor: Descriptor whose ``permissions`` maps each
            permission to the roles that hold it.

    Returns:
        Granted permissions, in the iteration order of
        ``entity_descriptor.permissions``.
    """
    return [
        permission
        for permission, allowed_roles in entity_descriptor.permissions.items()
        if any(role in user.roles for role in allowed_roles)
    ]
def get_local_text(text: str, locale: str = None) -> str:
if not locale:
i18n = I18n.get_current(no_error=True)
if i18n:
locale = i18n.current_locale
else:
locale = "en"
try:
obj = json.loads(text) # @IgnoreException
except Exception:
return text
else:
return obj.get(locale, obj[list(obj.keys())[0]])
def check_entity_permission(
    entity: BotEntity, user: "UserBase", permission: EntityPermission
) -> bool:
    """Tell whether *user* may perform *permission* on a specific entity.

    The check passes when the user holds the matching ``*_ALL`` permission,
    or when one of the user's roles qualifies: a role listed in the
    descriptor's ``ownership_fields`` qualifies if the entity attribute it
    names equals ``user.id``; any other role qualifies if the user holds
    the plain permission.

    Args:
        entity: Entity instance being accessed.
        user: Object exposing ``roles`` and ``id``.
        permission: One of LIST, READ, UPDATE, CREATE or DELETE.

    Returns:
        ``True`` when access is allowed, ``False`` otherwise.

    Raises:
        ValueError: If ``permission`` is not one of the five base
            permissions.
    """
    unrestricted_variant = {
        EntityPermission.LIST: EntityPermission.LIST_ALL,
        EntityPermission.READ: EntityPermission.READ_ALL,
        EntityPermission.UPDATE: EntityPermission.UPDATE_ALL,
        EntityPermission.CREATE: EntityPermission.CREATE_ALL,
        EntityPermission.DELETE: EntityPermission.DELETE_ALL,
    }
    if permission not in unrestricted_variant:
        raise ValueError(f"Invalid permission: {permission}")

    descriptor = entity.__class__.bot_entity_descriptor
    granted = get_user_permissions(user, descriptor)

    # The *_ALL variant grants access regardless of ownership.
    if unrestricted_variant[permission] in granted:
        return True

    ownership_fields = descriptor.ownership_fields
    # NOTE(review): branch nesting reconstructed from a source with lost
    # indentation; the non-ownership branch is assumed to pair with the
    # "role in ownership_fields" test. Confirm against the repository.
    for role in user.roles:
        if role not in ownership_fields:
            if permission in granted:
                return True
        elif getattr(entity, ownership_fields[role]) == user.id:
            return True

    return False
def get_send_message(message: Message | CallbackQuery):
    """Pick the bound method used to respond to an update.

    Args:
        message: Either a ``Message`` or a ``CallbackQuery``.

    Returns:
        ``message.answer`` for a ``Message``; otherwise the ``edit_text``
        method of the object's ``message`` attribute.
    """
    if not isinstance(message, Message):
        return message.message.edit_text
    return message.answer
def clear_state(state_data: dict, clear_nav: bool = False):
    """Empty *state_data* in place, optionally keeping navigation entries.

    Args:
        state_data: Mutable state dictionary; modified in place.
        clear_nav: When true, everything is removed. Otherwise truthy
            values stored under ``"navigation_stack"`` and
            ``"navigation_context"`` are kept.
    """
    preserved = {}
    if not clear_nav:
        for key in ("navigation_stack", "navigation_context"):
            kept = state_data.get(key)
            # Falsy values (missing, None, empty) are intentionally dropped.
            if kept:
                preserved[key] = kept

    state_data.clear()
    state_data.update(preserved)
def get_entity_item_repr(
    entity: BotEntity, item_repr: EntityItemCaptionCallable | None = None
) -> str:
    """Build the caption for a single entity item.

    The first available source wins: the explicit ``item_repr`` argument,
    then the descriptor's own ``item_repr``, then a default of
    ``"<entity name>: <id>"``. In the default, the name is the descriptor's
    ``full_name`` (resolved via ``get_callable_str``) when set, otherwise
    its ``name``.

    Args:
        entity: Entity exposing ``bot_entity_descriptor`` and ``id``.
        item_repr: Optional callable ``(descriptor, entity)`` overriding
            the descriptor's formatter.

    Returns:
        The caption text.
    """
    descriptor = entity.bot_entity_descriptor

    formatter = item_repr or descriptor.item_repr
    if formatter:
        return formatter(descriptor, entity)

    if descriptor.full_name:
        label = get_callable_str(descriptor.full_name, descriptor, entity)
    else:
        label = descriptor.name
    return f"{label}: {str(entity.id)}"
def get_value_repr(
    value: Any, field_descriptor: FieldDescriptor, locale: str | None = None
) -> str:
    """Render a field value as display text.

    Args:
        value: The value to render; ``None`` renders as an empty string.
        field_descriptor: Descriptor providing ``type_base``, ``is_list``
            and ``localizable`` for the field.
        locale: Locale passed on to enum and localizable-text rendering.

    Returns:
        A checkbox glyph for booleans, a bracketed comma-separated list for
        list fields, the entity caption for ``BotEntity`` fields, the
        localized name for ``BotEnum`` fields, the resolved text for
        localizable strings, and ``str(value)`` for everything else.
    """
    if value is None:
        return ""

    base_type = field_descriptor.type_base

    # bool is tested first so that it never falls through to the
    # type-based branches below.
    if isinstance(value, bool):
        return "【✔︎】" if value else "【 】"

    if field_descriptor.is_list:
        if issubclass(base_type, BotEntity):
            parts = [get_entity_item_repr(item) for item in value]
        elif issubclass(base_type, BotEnum):
            parts = [item.localized(locale) for item in value]
        elif base_type is str:
            parts = [f'"{item}"' for item in value]
        else:
            parts = [str(item) for item in value]
        return f"[{', '.join(parts)}]"

    if issubclass(base_type, BotEntity):
        return get_entity_item_repr(value)
    if issubclass(base_type, BotEnum):
        return value.localized(locale)

    if not isinstance(value, str):
        return str(value)

    if field_descriptor and field_descriptor.localizable:
        return get_local_text(text=value, locale=locale)
    return value
def get_callable_str(
    callable_str: (
        str
        | LazyProxy
        | EntityCaptionCallable
        | EntityItemCaptionCallable
        | EntityFieldCaptionCallable
    ),
    descriptor: FieldDescriptor | EntityDescriptor,
    entity: Any = None,
    value: Any = None,
) -> str:
    """Resolve a caption given as a string, lazy string or callable.

    Args:
        callable_str: A plain string (returned as-is), a ``LazyProxy``
            (its ``value`` is returned), or a callable taking one to three
            positional parameters.
        descriptor: Passed as the first argument to a callable caption.
        entity: Passed as the second argument when the callable declares
            at least two parameters.
        value: Passed as the third argument when the callable declares
            three parameters.

    Returns:
        The resolved caption. ``None`` is returned when ``callable_str`` is
        not a supported type or is a callable whose parameter count is not
        1, 2 or 3.
    """
    if isinstance(callable_str, str):
        return callable_str
    if isinstance(callable_str, LazyProxy):
        return callable_str.value
    if not callable(callable_str):
        return None

    # Call with as many leading arguments as the callable declares.
    param_count = len(signature(callable_str).parameters)
    if 1 <= param_count <= 3:
        positional = (descriptor, entity, value)[:param_count]
        return callable_str(*positional)
    return None
def get_entity_descriptor(
    app: "QBotApp", callback_data: ContextData
) -> EntityDescriptor | None:
    """Look up the entity descriptor referenced by *callback_data*.

    Args:
        app: Application whose ``entity_metadata.entity_descriptors`` maps
            entity names to descriptors.
        callback_data: Context carrying the ``entity_name`` to resolve.

    Returns:
        The descriptor registered under ``callback_data.entity_name``, or
        ``None`` when no entity name is set.

    Raises:
        KeyError: If an entity name is set but not registered.
    """
    entity_name = callback_data.entity_name
    # An empty or missing entity name means there is nothing to resolve.
    if not entity_name:
        return None
    return app.entity_metadata.entity_descriptors[entity_name]
def get_field_descriptor(
    app: "QBotApp", callback_data: ContextData
) -> FieldDescriptor | None:
    """Resolve the field descriptor targeted by *callback_data*.

    The lookup source depends on ``callback_data.context``:

    * ``SETTING_EDIT`` - the entry in ``Settings.list_params()``.
    * ``COMMAND_FORM`` - the entry in the ``param_form`` of the command
      named by the part of ``user_command`` before the first ``"&"``.
    * ``ENTITY_CREATE`` / ``ENTITY_EDIT`` / ``ENTITY_FIELD_EDIT`` - the
      entry in the entity descriptor's ``fields_descriptors``.

    Args:
        app: Application providing ``bot_commands`` and entity metadata.
        callback_data: Context carrying ``context``, ``field_name`` and,
            depending on the context, ``user_command`` or ``entity_name``.

    Returns:
        The matching field descriptor, or ``None`` when the context is not
        handled or nothing matches.
    """
    context = callback_data.context

    if context == CommandContext.SETTING_EDIT:
        return Settings.list_params()[callback_data.field_name]

    if context == CommandContext.COMMAND_FORM:
        # Only the part before the first "&" names the command.
        command_name = callback_data.user_command.split("&")[0]
        command = app.bot_commands[command_name]
        form = command.param_form if command else None
        if form and callback_data.field_name in form:
            return form[callback_data.field_name]
        return None

    entity_contexts = (
        CommandContext.ENTITY_CREATE,
        CommandContext.ENTITY_EDIT,
        CommandContext.ENTITY_FIELD_EDIT,
    )
    if context in entity_contexts:
        descriptor = get_entity_descriptor(app, callback_data)
        if descriptor:
            return descriptor.fields_descriptors.get(callback_data.field_name)

    return None