upd project structure

This commit is contained in:
Alexander Kalinovsky
2025-02-18 20:57:52 +01:00
parent baa55d28d6
commit 1d162677a8
62 changed files with 1305 additions and 48 deletions

214
src/qbot/utils/main.py Normal file
View File

@@ -0,0 +1,214 @@
from babel.support import LazyProxy
from inspect import signature
from aiogram.types import Message, CallbackQuery
from aiogram.utils.i18n import I18n
from typing import Any, TYPE_CHECKING
import ujson as json
from ..model.bot_entity import BotEntity
from ..model.bot_enum import BotEnum
from ..model.settings import Settings
from ..model.descriptors import (
FieldDescriptor,
EntityDescriptor,
EntityItemCaptionCallable,
EntityFieldCaptionCallable,
EntityPermission,
EntityCaptionCallable,
)
from ..bot.handlers.context import ContextData, CommandContext
if TYPE_CHECKING:
from ..model.user import UserBase
from ..main import QBotApp
def get_user_permissions(
user: "UserBase", entity_descriptor: EntityDescriptor
) -> list[EntityPermission]:
permissions = list[EntityPermission]()
for permission, roles in entity_descriptor.permissions.items():
for role in roles:
if role in user.roles:
permissions.append(permission)
break
return permissions
def get_local_text(text: str, locale: str = None) -> str:
if not locale:
i18n = I18n.get_current(no_error=True)
if i18n:
locale = i18n.current_locale
else:
locale = "en"
try:
obj = json.loads(text) # @IgnoreException
except Exception:
return text
else:
return obj.get(locale, obj[list(obj.keys())[0]])
def check_entity_permission(
entity: BotEntity, user: "UserBase", permission: EntityPermission
) -> bool:
perm_mapping = {
EntityPermission.LIST: EntityPermission.LIST_ALL,
EntityPermission.READ: EntityPermission.READ_ALL,
EntityPermission.UPDATE: EntityPermission.UPDATE_ALL,
EntityPermission.CREATE: EntityPermission.CREATE_ALL,
EntityPermission.DELETE: EntityPermission.DELETE_ALL,
}
if permission not in perm_mapping:
raise ValueError(f"Invalid permission: {permission}")
entity_descriptor = entity.__class__.bot_entity_descriptor
permissions = get_user_permissions(user, entity_descriptor)
if perm_mapping[permission] in permissions:
return True
ownership_filds = entity_descriptor.ownership_fields
for role in user.roles:
if role in ownership_filds:
if getattr(entity, ownership_filds[role]) == user.id:
return True
else:
if permission in permissions:
return True
return False
def get_send_message(message: Message | CallbackQuery):
if isinstance(message, Message):
return message.answer
else:
return message.message.edit_text
def clear_state(state_data: dict, clear_nav: bool = False):
if clear_nav:
state_data.clear()
else:
stack = state_data.get("navigation_stack")
context = state_data.get("navigation_context")
state_data.clear()
if stack:
state_data["navigation_stack"] = stack
if context:
state_data["navigation_context"] = context
def get_entity_item_repr(
entity: BotEntity, item_repr: EntityItemCaptionCallable | None = None
) -> str:
descr = entity.bot_entity_descriptor
if item_repr:
return item_repr(descr, entity)
return (
descr.item_repr(descr, entity)
if descr.item_repr
else f"{
get_callable_str(descr.full_name, descr, entity)
if descr.full_name
else descr.name
}: {str(entity.id)}"
)
def get_value_repr(
value: Any, field_descriptor: FieldDescriptor, locale: str | None = None
) -> str:
if value is None:
return ""
type_ = field_descriptor.type_base
if isinstance(value, bool):
return "【✔︎】" if value else "【 】"
elif field_descriptor.is_list:
if issubclass(type_, BotEntity):
return f"[{', '.join([get_entity_item_repr(item) for item in value])}]"
elif issubclass(type_, BotEnum):
return f"[{', '.join(item.localized(locale) for item in value)}]"
elif type_ is str:
return f"[{', '.join([f'"{item}"' for item in value])}]"
else:
return f"[{', '.join([str(item) for item in value])}]"
elif issubclass(type_, BotEntity):
return get_entity_item_repr(value)
elif issubclass(type_, BotEnum):
return value.localized(locale)
elif isinstance(value, str):
if field_descriptor and field_descriptor.localizable:
return get_local_text(text=value, locale=locale)
return value
elif isinstance(value, int):
return str(value)
elif isinstance(value, float):
return str(value)
else:
return str(value)
def get_callable_str(
callable_str: (
str
| LazyProxy
| EntityCaptionCallable
| EntityItemCaptionCallable
| EntityFieldCaptionCallable
),
descriptor: FieldDescriptor | EntityDescriptor,
entity: Any = None,
value: Any = None,
) -> str:
if isinstance(callable_str, str):
return callable_str
elif isinstance(callable_str, LazyProxy):
return callable_str.value
elif callable(callable_str):
args = signature(callable_str).parameters
if len(args) == 1:
return callable_str(descriptor)
elif len(args) == 2:
return callable_str(descriptor, entity)
elif len(args) == 3:
return callable_str(descriptor, entity, value)
def get_entity_descriptor(
app: "QBotApp", callback_data: ContextData
) -> EntityDescriptor:
if callback_data.entity_name:
return app.entity_metadata.entity_descriptors[callback_data.entity_name]
return None
def get_field_descriptor(
app: "QBotApp", callback_data: ContextData
) -> FieldDescriptor | None:
if callback_data.context == CommandContext.SETTING_EDIT:
return Settings.list_params()[callback_data.field_name]
elif callback_data.context == CommandContext.COMMAND_FORM:
command = app.bot_commands[callback_data.user_command.split("&")[0]]
if (
command
and command.param_form
and callback_data.field_name in command.param_form
):
return command.param_form[callback_data.field_name]
elif callback_data.context in [
CommandContext.ENTITY_CREATE,
CommandContext.ENTITY_EDIT,
CommandContext.ENTITY_FIELD_EDIT,
]:
entity_descriptor = get_entity_descriptor(app, callback_data)
if entity_descriptor:
return entity_descriptor.fields_descriptors.get(callback_data.field_name)
return None

View File

@@ -0,0 +1,50 @@
from ..bot.handlers.context import ContextData, CallbackCommand
def save_navigation_context(
callback_data: ContextData, state_data: dict
) -> list[ContextData]:
stack = [
ContextData.unpack(item) for item in state_data.get("navigation_stack", [])
]
data_nc = state_data.get("navigation_context")
navigation_context = ContextData.unpack(data_nc) if data_nc else None
if callback_data.back:
callback_data.back = False
if stack:
stack.pop()
else:
if (
stack
and navigation_context
and navigation_context.command == callback_data.command
and navigation_context.entity_name == callback_data.entity_name
and navigation_context.entity_id == callback_data.entity_id
and navigation_context.command != CallbackCommand.USER_COMMAND
):
navigation_context = callback_data
elif navigation_context:
stack.append(navigation_context)
state_data["navigation_stack"] = [item.pack() for item in stack]
state_data["navigation_context"] = callback_data.pack()
return stack
def pop_navigation_context(stack: list[ContextData]) -> ContextData | None:
if stack:
data = stack[-1]
data.back = True
return data
def get_navigation_context(
state_data: dict,
) -> tuple[list[ContextData], ContextData | None]:
data_nc = state_data.get("navigation_context")
context = ContextData.unpack(data_nc) if data_nc else None
return (
[ContextData.unpack(item) for item in state_data.get("navigation_stack", [])],
context,
)

View File

@@ -0,0 +1,90 @@
from datetime import datetime, time
from decimal import Decimal
from sqlmodel import select, column
from sqlmodel.ext.asyncio.session import AsyncSession
from typing import Any, Union, get_origin, get_args
from types import UnionType, NoneType
import ujson as json
from ..model.bot_entity import BotEntity
from ..model.bot_enum import BotEnum
from ..model.descriptors import FieldDescriptor
async def deserialize[T](session: AsyncSession, type_: type[T], value: str = None) -> T:
type_origin = get_origin(type_)
is_optional = False
if type_origin in [UnionType, Union]:
args = get_args(type_)
if args[1] is NoneType:
type_ = args[0]
if value is None:
return None
is_optional = True
if get_origin(type_) is list:
arg_type = None
args = get_args(type_)
if args:
arg_type = args[0]
values = json.loads(value) if value else []
if arg_type:
if issubclass(arg_type, BotEntity):
ret = list[arg_type]()
items = (
await session.exec(select(arg_type).where(column("id").in_(values)))
).all()
for item in items:
ret.append(item)
return ret
elif issubclass(arg_type, BotEnum):
return [arg_type(value) for value in values]
else:
return [arg_type(value) for value in values]
else:
return values
elif issubclass(type_, BotEntity):
if is_optional and not value:
return None
return await session.get(type_, int(value))
elif issubclass(type_, BotEnum):
if is_optional and not value:
return None
return type_(value)
elif type_ is time:
if is_optional and not value:
return None
return time.fromisoformat(value.replace("-", ":"))
elif type_ is datetime:
if is_optional and not value:
return None
if value[-3] == "-":
return datetime.strptime(value, "%Y-%m-%d %H-%M")
else:
return datetime.fromisoformat(value)
elif type_ is bool:
return value == "True"
elif type_ is Decimal:
if is_optional and not value:
return None
return Decimal(value)
if is_optional and not value:
return None
return type_(value)
def serialize(value: Any, field_descriptor: FieldDescriptor) -> str:
if value is None:
return ""
type_ = field_descriptor.type_base
if field_descriptor.is_list:
if issubclass(type_, BotEntity):
return json.dumps([item.id for item in value], ensure_ascii=False)
elif issubclass(type_, BotEnum):
return json.dumps([item.value for item in value], ensure_ascii=False)
else:
return json.dumps(value, ensure_ascii=False)
elif issubclass(type_, BotEntity):
return str(value.id) if value else ""
return str(value)