from babel.support import LazyProxy from inspect import iscoroutinefunction, signature from aiogram.types import Message, CallbackQuery from aiogram.utils.i18n import I18n from typing import Any, TYPE_CHECKING import ujson as json from ..model.bot_entity import BotEntity from ..model.bot_enum import BotEnum from ..model.settings import Settings from ..model.descriptors import ( FieldDescriptor, EntityDescriptor, EntityItemCaptionCallable, EntityFieldCaptionCallable, EntityPermission, EntityCaptionCallable, ) from ..bot.handlers.context import ContextData, CommandContext if TYPE_CHECKING: from ..model.user import UserBase from ..main import QBotApp def get_user_permissions( user: "UserBase", entity_descriptor: EntityDescriptor ) -> list[EntityPermission]: permissions = list[EntityPermission]() for permission, roles in entity_descriptor.permissions.items(): for role in roles: if role in user.roles: permissions.append(permission) break return permissions def get_local_text(text: str, locale: str = None) -> str: if not locale: i18n = I18n.get_current(no_error=True) if i18n: locale = i18n.current_locale else: locale = "en" try: obj = json.loads(text) # @IgnoreException except Exception: return text else: return obj.get(locale, obj[list(obj.keys())[0]]) def check_entity_permission( entity: BotEntity, user: "UserBase", permission: EntityPermission ) -> bool: perm_mapping = { EntityPermission.LIST: EntityPermission.LIST_ALL, EntityPermission.READ: EntityPermission.READ_ALL, EntityPermission.UPDATE: EntityPermission.UPDATE_ALL, EntityPermission.CREATE: EntityPermission.CREATE_ALL, EntityPermission.DELETE: EntityPermission.DELETE_ALL, } if permission not in perm_mapping: raise ValueError(f"Invalid permission: {permission}") entity_descriptor = entity.__class__.bot_entity_descriptor permissions = get_user_permissions(user, entity_descriptor) if perm_mapping[permission] in permissions: return True ownership_filds = entity_descriptor.ownership_fields for role in user.roles: if role in ownership_filds: if getattr(entity, ownership_filds[role]) == user.id: return True else: if permission in permissions: return True return False def get_send_message(message: Message | CallbackQuery): if isinstance(message, Message): return message.answer else: return message.message.edit_text def clear_state(state_data: dict, clear_nav: bool = False): if clear_nav: state_data.clear() else: stack = state_data.get("navigation_stack") context = state_data.get("navigation_context") state_data.clear() if stack: state_data["navigation_stack"] = stack if context: state_data["navigation_context"] = context async def get_entity_item_repr( entity: BotEntity, item_repr: EntityItemCaptionCallable | None = None ) -> str: descr = entity.bot_entity_descriptor if item_repr: return item_repr(descr, entity) return ( descr.item_repr(descr, entity) if descr.item_repr else f"{ await get_callable_str(descr.full_name, descr, entity) if descr.full_name else descr.name }: {str(entity.id)}" ) async def get_value_repr( value: Any, field_descriptor: FieldDescriptor, locale: str | None = None ) -> str: if value is None: return "" type_ = field_descriptor.type_base if isinstance(value, bool): return "【✔︎】" if value else "【 】" elif field_descriptor.is_list: if issubclass(type_, BotEntity): return ( f"[{', '.join([await get_entity_item_repr(item) for item in value])}]" ) elif issubclass(type_, BotEnum): return f"[{', '.join(item.localized(locale) for item in value)}]" elif type_ is str: return f"[{', '.join([f'"{item}"' for item in value])}]" else: return f"[{', '.join([str(item) for item in value])}]" elif issubclass(type_, BotEntity): return await get_entity_item_repr(value) elif issubclass(type_, BotEnum): return value.localized(locale) elif isinstance(value, str): if field_descriptor and field_descriptor.localizable: return get_local_text(text=value, locale=locale) return value elif isinstance(value, int): return str(value) elif isinstance(value, float): return str(value) else: return str(value) async def get_callable_str( callable_str: ( str | LazyProxy | EntityCaptionCallable | EntityItemCaptionCallable | EntityFieldCaptionCallable ), descriptor: FieldDescriptor | EntityDescriptor, entity: Any = None, value: Any = None, ) -> str: if isinstance(callable_str, str): return callable_str elif isinstance(callable_str, LazyProxy): return callable_str.value elif callable(callable_str): args = signature(callable_str).parameters if iscoroutinefunction(callable_str): if len(args) == 1: return await callable_str(descriptor) elif len(args) == 2: return await callable_str(descriptor, entity) elif len(args) == 3: return await callable_str(descriptor, entity, value) else: if len(args) == 1: return callable_str(descriptor) elif len(args) == 2: return callable_str(descriptor, entity) elif len(args) == 3: return callable_str(descriptor, entity, value) def get_entity_descriptor( app: "QBotApp", callback_data: ContextData ) -> EntityDescriptor: if callback_data.entity_name: return app.entity_metadata.entity_descriptors[callback_data.entity_name] return None def get_field_descriptor( app: "QBotApp", callback_data: ContextData ) -> FieldDescriptor | None: if callback_data.context == CommandContext.SETTING_EDIT: return Settings.list_params()[callback_data.field_name] elif callback_data.context == CommandContext.COMMAND_FORM: command = app.bot_commands[callback_data.user_command.split("&")[0]] if ( command and command.param_form and callback_data.field_name in command.param_form ): return command.param_form[callback_data.field_name] elif callback_data.context in [ CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT, CommandContext.ENTITY_FIELD_EDIT, ]: entity_descriptor = get_entity_descriptor(app, callback_data) if entity_descriptor: return entity_descriptor.fields_descriptors.get(callback_data.field_name) return None