349 lines
13 KiB
Python
349 lines
13 KiB
Python
from aiogram import Router, F
|
|
from aiogram.types import Message, CallbackQuery
|
|
from aiogram.fsm.context import FSMContext
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
from typing import TYPE_CHECKING
|
|
from decimal import Decimal
|
|
import json
|
|
|
|
from ..context import ContextData, CallbackCommand, CommandContext
|
|
from ...command_context_filter import CallbackCommandFilter
|
|
from ..user_handlers.main import cammand_handler
|
|
from ....model import EntityPermission
|
|
from ....model.user import UserBase
|
|
from ....model.settings import Settings
|
|
from ....model.descriptors import FieldDescriptor
|
|
from ....model.language import LanguageBase
|
|
from ....auth import authorize_command
|
|
from ....utils.main import (
|
|
get_user_permissions,
|
|
check_entity_permission,
|
|
clear_state,
|
|
get_entity_descriptor,
|
|
get_field_descriptor,
|
|
)
|
|
from ....utils.serialization import deserialize
|
|
from ..common.routing import route_callback
|
|
from .common import show_editor
|
|
|
|
if TYPE_CHECKING:
|
|
from ....main import QBotApp
|
|
|
|
router = Router()
|
|
|
|
|
|
@router.message(CallbackCommandFilter(CallbackCommand.FIELD_EDITOR_CALLBACK))
|
|
@router.callback_query(
|
|
ContextData.filter(F.command == CallbackCommand.FIELD_EDITOR_CALLBACK)
|
|
)
|
|
async def field_editor_callback(message: Message | CallbackQuery, **kwargs):
|
|
app: "QBotApp" = kwargs["app"]
|
|
state: FSMContext = kwargs["state"]
|
|
|
|
state_data = await state.get_data()
|
|
kwargs["state_data"] = state_data
|
|
|
|
if isinstance(message, Message):
|
|
callback_data: ContextData = kwargs.get("callback_data", None)
|
|
context_data = state_data.get("context_data")
|
|
if context_data:
|
|
callback_data = ContextData.unpack(context_data)
|
|
value = message.text
|
|
field_descriptor = get_field_descriptor(app, callback_data)
|
|
type_base = field_descriptor.type_base
|
|
|
|
if type_base is str and field_descriptor.localizable:
|
|
locale_index = int(state_data.get("locale_index"))
|
|
|
|
value = state_data.get("value")
|
|
if value:
|
|
value = json.loads(value)
|
|
else:
|
|
value = {}
|
|
|
|
value[list(LanguageBase.all_members.values())[locale_index].value] = (
|
|
message.text
|
|
)
|
|
value = json.dumps(value, ensure_ascii=False)
|
|
|
|
if locale_index < len(LanguageBase.all_members.values()) - 1:
|
|
current_value = state_data.get("current_value")
|
|
|
|
state_data.update({"value": value})
|
|
entity_descriptor = get_entity_descriptor(app, callback_data)
|
|
kwargs.update({"callback_data": callback_data})
|
|
|
|
return await show_editor(
|
|
message=message,
|
|
locale_index=locale_index + 1,
|
|
field_descriptor=field_descriptor,
|
|
entity_descriptor=entity_descriptor,
|
|
current_value=current_value,
|
|
value=value,
|
|
**kwargs,
|
|
)
|
|
# else:
|
|
# value = state_data.get("value")
|
|
# if value:
|
|
# value = json.loads(value)
|
|
# else:
|
|
# value = {}
|
|
# value[list(LanguageBase.all_members.keys())[locale_index]] = (
|
|
# message.text
|
|
# )
|
|
# value = json.dumps(value, ensure_ascii=False)
|
|
|
|
elif type_base in [int, float, Decimal]:
|
|
try:
|
|
_ = type_base(value) # @IgnoreException
|
|
except Exception:
|
|
return await message.answer(
|
|
text=(await Settings.get(Settings.APP_STRINGS_INVALID_INPUT))
|
|
)
|
|
else:
|
|
callback_data: ContextData = kwargs["callback_data"]
|
|
if callback_data.data:
|
|
if callback_data.data == "skip":
|
|
value = None
|
|
else:
|
|
value = callback_data.data
|
|
else:
|
|
value = state_data.get("value")
|
|
field_descriptor = get_field_descriptor(app, callback_data)
|
|
|
|
kwargs.update(
|
|
{
|
|
"callback_data": callback_data,
|
|
}
|
|
)
|
|
|
|
await process_field_edit_callback(
|
|
message=message, value=value, field_descriptor=field_descriptor, **kwargs
|
|
)
|
|
|
|
|
|
async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs):
|
|
user: UserBase = kwargs["user"]
|
|
db_session: AsyncSession = kwargs["db_session"]
|
|
callback_data: ContextData = kwargs.get("callback_data", None)
|
|
state_data: dict = kwargs["state_data"]
|
|
value = kwargs["value"]
|
|
field_descriptor: FieldDescriptor = kwargs["field_descriptor"]
|
|
|
|
if callback_data.context == CommandContext.SETTING_EDIT:
|
|
if callback_data.data != "cancel":
|
|
if await authorize_command(user=user, callback_data=callback_data):
|
|
value = await deserialize(
|
|
session=db_session, type_=field_descriptor.type_, value=value
|
|
)
|
|
await Settings.set_param(field_descriptor, value)
|
|
else:
|
|
return await message.answer(
|
|
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN))
|
|
)
|
|
|
|
return await route_callback(message=message, back=True, **kwargs)
|
|
|
|
elif callback_data.context in [
|
|
CommandContext.ENTITY_CREATE,
|
|
CommandContext.ENTITY_EDIT,
|
|
CommandContext.ENTITY_FIELD_EDIT,
|
|
CommandContext.COMMAND_FORM,
|
|
]:
|
|
app: "QBotApp" = kwargs["app"]
|
|
entity_descriptor = get_entity_descriptor(app, callback_data)
|
|
|
|
if callback_data.context == CommandContext.COMMAND_FORM:
|
|
field_sequence = list(field_descriptor.command.param_form.keys())
|
|
current_index = field_sequence.index(callback_data.field_name)
|
|
field_descriptors = field_descriptor.command.param_form
|
|
else:
|
|
form_name = (
|
|
callback_data.form_params.split("&")[0]
|
|
if callback_data.form_params
|
|
else "default"
|
|
)
|
|
form = entity_descriptor.forms.get(
|
|
form_name, entity_descriptor.default_form
|
|
)
|
|
|
|
field_sequence = form.edit_field_sequence
|
|
current_index = (
|
|
field_sequence.index(callback_data.field_name)
|
|
if callback_data.context
|
|
in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]
|
|
else 0
|
|
)
|
|
field_descriptors = entity_descriptor.fields_descriptors
|
|
|
|
entity_data = state_data.get("entity_data", {})
|
|
|
|
if callback_data.context == CommandContext.ENTITY_CREATE and not entity_data:
|
|
stack = state_data.get("navigation_stack", [])
|
|
prev_callback_data = ContextData.unpack(stack[-1]) if stack else None
|
|
if (
|
|
prev_callback_data
|
|
and prev_callback_data.command == CallbackCommand.ENTITY_LIST
|
|
and prev_callback_data.entity_name == entity_descriptor.name
|
|
):
|
|
prev_form_name = (
|
|
prev_callback_data.form_params.split("&")[0]
|
|
if prev_callback_data.form_params
|
|
else "default"
|
|
)
|
|
prev_form_params = (
|
|
prev_callback_data.form_params.split("&")[1:]
|
|
if prev_callback_data.form_params
|
|
else []
|
|
)
|
|
prev_form_list = entity_descriptor.lists.get(
|
|
prev_form_name or "default", entity_descriptor.default_list
|
|
)
|
|
|
|
if prev_form_list.static_filters:
|
|
for filt in prev_form_list.static_filters:
|
|
if filt.value_type == "const":
|
|
entity_data[filt.field_name] = filt.value
|
|
elif len(prev_form_params) > filt.param_index:
|
|
entity_data[filt.field_name] = prev_form_params[
|
|
filt.param_index
|
|
]
|
|
|
|
if (
|
|
callback_data.context
|
|
in [
|
|
CommandContext.ENTITY_CREATE,
|
|
CommandContext.ENTITY_EDIT,
|
|
CommandContext.COMMAND_FORM,
|
|
]
|
|
and current_index < len(field_sequence) - 1
|
|
):
|
|
entity_data[field_descriptor.field_name] = value
|
|
state_data.update({"entity_data": entity_data})
|
|
|
|
next_field_name = field_sequence[current_index + 1]
|
|
next_field_descriptor = field_descriptors[next_field_name]
|
|
kwargs.update({"field_descriptor": next_field_descriptor})
|
|
callback_data.field_name = next_field_name
|
|
|
|
state_entity_val = entity_data.get(next_field_descriptor.field_name)
|
|
|
|
current_value = (
|
|
await deserialize(
|
|
session=db_session,
|
|
type_=next_field_descriptor.type_,
|
|
value=state_entity_val,
|
|
)
|
|
if state_entity_val
|
|
else None
|
|
)
|
|
|
|
await show_editor(
|
|
message=message,
|
|
entity_descriptor=entity_descriptor,
|
|
current_value=current_value,
|
|
**kwargs,
|
|
)
|
|
|
|
else:
|
|
entity_data[field_descriptor.field_name] = value
|
|
|
|
# What if user has several roles and each role has its own ownership field? Should we allow creation even
|
|
# if user has no CREATE_ALL permission
|
|
|
|
# for role in user.roles:
|
|
# if role in entity_descriptor.ownership_fields and not EntityPermission.CREATE_ALL in user_permissions:
|
|
# entity_data[entity_descriptor.ownership_fields[role]] = user.id
|
|
|
|
deser_entity_data = {
|
|
key: await deserialize(
|
|
session=db_session,
|
|
type_=field_descriptors[key].type_,
|
|
value=value,
|
|
)
|
|
for key, value in entity_data.items()
|
|
}
|
|
|
|
if callback_data.context == CommandContext.ENTITY_CREATE:
|
|
entity_type = entity_descriptor.type_
|
|
user_permissions = get_user_permissions(user, entity_descriptor)
|
|
if (
|
|
EntityPermission.CREATE not in user_permissions
|
|
and EntityPermission.CREATE_ALL not in user_permissions
|
|
):
|
|
return await message.answer(
|
|
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN))
|
|
)
|
|
|
|
new_entity = await entity_type.create(
|
|
session=db_session,
|
|
obj_in=entity_type(**deser_entity_data),
|
|
commit=True,
|
|
)
|
|
|
|
form_name = (
|
|
callback_data.form_params.split("&")[0]
|
|
if callback_data.form_params
|
|
else "default"
|
|
)
|
|
form_list = entity_descriptor.lists.get(
|
|
form_name or "default", entity_descriptor.default_list
|
|
)
|
|
|
|
state_data["navigation_context"] = ContextData(
|
|
command=CallbackCommand.ENTITY_ITEM,
|
|
entity_name=entity_descriptor.name,
|
|
form_params=form_list.item_form,
|
|
entity_id=str(new_entity.id),
|
|
).pack()
|
|
|
|
state_data.update(state_data)
|
|
|
|
clear_state(state_data=state_data)
|
|
return await route_callback(message=message, back=False, **kwargs)
|
|
|
|
elif callback_data.context in [
|
|
CommandContext.ENTITY_EDIT,
|
|
CommandContext.ENTITY_FIELD_EDIT,
|
|
]:
|
|
entity_type = entity_descriptor.type_
|
|
entity_id = int(callback_data.entity_id)
|
|
entity = await entity_type.get(session=db_session, id=entity_id)
|
|
if not entity:
|
|
return await message.answer(
|
|
text=(await Settings.get(Settings.APP_STRINGS_NOT_FOUND))
|
|
)
|
|
|
|
if not check_entity_permission(
|
|
entity=entity, user=user, permission=EntityPermission.UPDATE
|
|
):
|
|
return await message.answer(
|
|
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN))
|
|
)
|
|
|
|
for key, value in deser_entity_data.items():
|
|
setattr(entity, key, value)
|
|
|
|
await db_session.commit()
|
|
|
|
elif callback_data.context == CommandContext.COMMAND_FORM:
|
|
clear_state(state_data=state_data)
|
|
state_data["entity_data"] = entity_data
|
|
|
|
kwargs.update(
|
|
{
|
|
"callback_data": ContextData(
|
|
command=CallbackCommand.USER_COMMAND,
|
|
user_command=callback_data.user_command,
|
|
data=callback_data.data,
|
|
)
|
|
}
|
|
)
|
|
|
|
return await cammand_handler(message=message, **kwargs)
|
|
|
|
clear_state(state_data=state_data)
|
|
|
|
# TODO: Try back=False and check if it works to navigate to newly created entity
|
|
await route_callback(message=message, back=True, **kwargs)
|