225 lines
7.0 KiB
Python
225 lines
7.0 KiB
Python
from babel.support import LazyProxy
|
|
from inspect import iscoroutinefunction, signature
|
|
from aiogram.types import Message, CallbackQuery
|
|
from aiogram.utils.i18n import I18n
|
|
from typing import Any, TYPE_CHECKING
|
|
import ujson as json
|
|
|
|
from ..model.bot_entity import BotEntity
|
|
from ..model.bot_enum import BotEnum
|
|
from ..model.settings import Settings
|
|
|
|
from ..model.descriptors import (
|
|
FieldDescriptor,
|
|
EntityDescriptor,
|
|
EntityItemCaptionCallable,
|
|
EntityFieldCaptionCallable,
|
|
EntityPermission,
|
|
EntityCaptionCallable,
|
|
)
|
|
|
|
from ..bot.handlers.context import ContextData, CommandContext
|
|
|
|
if TYPE_CHECKING:
|
|
from ..model.user import UserBase
|
|
from ..main import QBotApp
|
|
|
|
|
|
def get_user_permissions(
|
|
user: "UserBase", entity_descriptor: EntityDescriptor
|
|
) -> list[EntityPermission]:
|
|
permissions = list[EntityPermission]()
|
|
for permission, roles in entity_descriptor.permissions.items():
|
|
for role in roles:
|
|
if role in user.roles:
|
|
permissions.append(permission)
|
|
break
|
|
return permissions
|
|
|
|
|
|
def get_local_text(text: str, locale: str = None) -> str:
|
|
if not locale:
|
|
i18n = I18n.get_current(no_error=True)
|
|
if i18n:
|
|
locale = i18n.current_locale
|
|
else:
|
|
locale = "en"
|
|
try:
|
|
obj = json.loads(text) # @IgnoreException
|
|
except Exception:
|
|
return text
|
|
else:
|
|
return obj.get(locale, obj[list(obj.keys())[0]])
|
|
|
|
|
|
def check_entity_permission(
|
|
entity: BotEntity, user: "UserBase", permission: EntityPermission
|
|
) -> bool:
|
|
perm_mapping = {
|
|
EntityPermission.LIST: EntityPermission.LIST_ALL,
|
|
EntityPermission.READ: EntityPermission.READ_ALL,
|
|
EntityPermission.UPDATE: EntityPermission.UPDATE_ALL,
|
|
EntityPermission.CREATE: EntityPermission.CREATE_ALL,
|
|
EntityPermission.DELETE: EntityPermission.DELETE_ALL,
|
|
}
|
|
|
|
if permission not in perm_mapping:
|
|
raise ValueError(f"Invalid permission: {permission}")
|
|
|
|
entity_descriptor = entity.__class__.bot_entity_descriptor
|
|
permissions = get_user_permissions(user, entity_descriptor)
|
|
|
|
if perm_mapping[permission] in permissions:
|
|
return True
|
|
|
|
ownership_filds = entity_descriptor.ownership_fields
|
|
|
|
for role in user.roles:
|
|
if role in ownership_filds:
|
|
if getattr(entity, ownership_filds[role]) == user.id:
|
|
return True
|
|
else:
|
|
if permission in permissions:
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def get_send_message(message: Message | CallbackQuery):
|
|
if isinstance(message, Message):
|
|
return message.answer
|
|
else:
|
|
return message.message.edit_text
|
|
|
|
|
|
def clear_state(state_data: dict, clear_nav: bool = False):
|
|
if clear_nav:
|
|
state_data.clear()
|
|
else:
|
|
stack = state_data.get("navigation_stack")
|
|
context = state_data.get("navigation_context")
|
|
state_data.clear()
|
|
if stack:
|
|
state_data["navigation_stack"] = stack
|
|
if context:
|
|
state_data["navigation_context"] = context
|
|
|
|
|
|
async def get_entity_item_repr(
|
|
entity: BotEntity, item_repr: EntityItemCaptionCallable | None = None
|
|
) -> str:
|
|
descr = entity.bot_entity_descriptor
|
|
if item_repr:
|
|
return item_repr(descr, entity)
|
|
return (
|
|
descr.item_repr(descr, entity)
|
|
if descr.item_repr
|
|
else f"{
|
|
await get_callable_str(descr.full_name, descr, entity)
|
|
if descr.full_name
|
|
else descr.name
|
|
}: {str(entity.id)}"
|
|
)
|
|
|
|
|
|
async def get_value_repr(
|
|
value: Any, field_descriptor: FieldDescriptor, locale: str | None = None
|
|
) -> str:
|
|
if value is None:
|
|
return ""
|
|
|
|
type_ = field_descriptor.type_base
|
|
if isinstance(value, bool):
|
|
return "【✔︎】" if value else "【 】"
|
|
elif field_descriptor.is_list:
|
|
if issubclass(type_, BotEntity):
|
|
return (
|
|
f"[{', '.join([await get_entity_item_repr(item) for item in value])}]"
|
|
)
|
|
elif issubclass(type_, BotEnum):
|
|
return f"[{', '.join(item.localized(locale) for item in value)}]"
|
|
elif type_ is str:
|
|
return f"[{', '.join([f'"{item}"' for item in value])}]"
|
|
else:
|
|
return f"[{', '.join([str(item) for item in value])}]"
|
|
elif issubclass(type_, BotEntity):
|
|
return await get_entity_item_repr(value)
|
|
elif issubclass(type_, BotEnum):
|
|
return value.localized(locale)
|
|
elif isinstance(value, str):
|
|
if field_descriptor and field_descriptor.localizable:
|
|
return get_local_text(text=value, locale=locale)
|
|
return value
|
|
elif isinstance(value, int):
|
|
return str(value)
|
|
elif isinstance(value, float):
|
|
return str(value)
|
|
else:
|
|
return str(value)
|
|
|
|
|
|
async def get_callable_str(
|
|
callable_str: (
|
|
str
|
|
| LazyProxy
|
|
| EntityCaptionCallable
|
|
| EntityItemCaptionCallable
|
|
| EntityFieldCaptionCallable
|
|
),
|
|
descriptor: FieldDescriptor | EntityDescriptor,
|
|
entity: Any = None,
|
|
value: Any = None,
|
|
) -> str:
|
|
if isinstance(callable_str, str):
|
|
return callable_str
|
|
elif isinstance(callable_str, LazyProxy):
|
|
return callable_str.value
|
|
elif callable(callable_str):
|
|
args = signature(callable_str).parameters
|
|
if iscoroutinefunction(callable_str):
|
|
if len(args) == 1:
|
|
return await callable_str(descriptor)
|
|
elif len(args) == 2:
|
|
return await callable_str(descriptor, entity)
|
|
elif len(args) == 3:
|
|
return await callable_str(descriptor, entity, value)
|
|
else:
|
|
if len(args) == 1:
|
|
return callable_str(descriptor)
|
|
elif len(args) == 2:
|
|
return callable_str(descriptor, entity)
|
|
elif len(args) == 3:
|
|
return callable_str(descriptor, entity, value)
|
|
|
|
|
|
def get_entity_descriptor(
|
|
app: "QBotApp", callback_data: ContextData
|
|
) -> EntityDescriptor:
|
|
if callback_data.entity_name:
|
|
return app.entity_metadata.entity_descriptors[callback_data.entity_name]
|
|
return None
|
|
|
|
|
|
def get_field_descriptor(
|
|
app: "QBotApp", callback_data: ContextData
|
|
) -> FieldDescriptor | None:
|
|
if callback_data.context == CommandContext.SETTING_EDIT:
|
|
return Settings.list_params()[callback_data.field_name]
|
|
elif callback_data.context == CommandContext.COMMAND_FORM:
|
|
command = app.bot_commands[callback_data.user_command.split("&")[0]]
|
|
if (
|
|
command
|
|
and command.param_form
|
|
and callback_data.field_name in command.param_form
|
|
):
|
|
return command.param_form[callback_data.field_name]
|
|
elif callback_data.context in [
|
|
CommandContext.ENTITY_CREATE,
|
|
CommandContext.ENTITY_EDIT,
|
|
CommandContext.ENTITY_FIELD_EDIT,
|
|
]:
|
|
entity_descriptor = get_entity_descriptor(app, callback_data)
|
|
if entity_descriptor:
|
|
return entity_descriptor.fields_descriptors.get(callback_data.field_name)
|
|
return None
|