from aiogram import Router, F from aiogram.types import Message, CallbackQuery from aiogram.fsm.context import FSMContext from sqlmodel.ext.asyncio.session import AsyncSession from typing import TYPE_CHECKING from decimal import Decimal import json from ..context import ContextData, CallbackCommand, CommandContext from ...command_context_filter import CallbackCommandFilter from ..user_handlers.main import cammand_handler from ....model import EntityPermission from ....model.user import UserBase from ....model.settings import Settings from ....model.descriptors import FieldDescriptor from ....model.language import LanguageBase from ....auth import authorize_command from ....utils.main import ( get_user_permissions, check_entity_permission, clear_state, get_entity_descriptor, get_field_descriptor, ) from ....utils.serialization import deserialize from ..common.routing import route_callback from .common import show_editor if TYPE_CHECKING: from ....main import QBotApp router = Router() @router.message(CallbackCommandFilter(CallbackCommand.FIELD_EDITOR_CALLBACK)) @router.callback_query( ContextData.filter(F.command == CallbackCommand.FIELD_EDITOR_CALLBACK) ) async def field_editor_callback(message: Message | CallbackQuery, **kwargs): app: "QBotApp" = kwargs["app"] state: FSMContext = kwargs["state"] state_data = await state.get_data() kwargs["state_data"] = state_data if isinstance(message, Message): callback_data: ContextData = kwargs.get("callback_data", None) context_data = state_data.get("context_data") if context_data: callback_data = ContextData.unpack(context_data) value = message.text field_descriptor = get_field_descriptor(app, callback_data) type_base = field_descriptor.type_base if type_base is str and field_descriptor.localizable: locale_index = int(state_data.get("locale_index")) value = state_data.get("value") if value: value = json.loads(value) else: value = {} value[list(LanguageBase.all_members.values())[locale_index].value] = ( message.text ) value = json.dumps(value, ensure_ascii=False) if locale_index < len(LanguageBase.all_members.values()) - 1: current_value = state_data.get("current_value") state_data.update({"value": value}) entity_descriptor = get_entity_descriptor(app, callback_data) kwargs.update({"callback_data": callback_data}) return await show_editor( message=message, locale_index=locale_index + 1, field_descriptor=field_descriptor, entity_descriptor=entity_descriptor, current_value=current_value, value=value, **kwargs, ) # else: # value = state_data.get("value") # if value: # value = json.loads(value) # else: # value = {} # value[list(LanguageBase.all_members.keys())[locale_index]] = ( # message.text # ) # value = json.dumps(value, ensure_ascii=False) elif type_base in [int, float, Decimal]: try: _ = type_base(value) # @IgnoreException except Exception: return await message.answer( text=(await Settings.get(Settings.APP_STRINGS_INVALID_INPUT)) ) else: callback_data: ContextData = kwargs["callback_data"] if callback_data.data: if callback_data.data == "skip": value = None else: value = callback_data.data else: value = state_data.get("value") field_descriptor = get_field_descriptor(app, callback_data) kwargs.update( { "callback_data": callback_data, } ) await process_field_edit_callback( message=message, value=value, field_descriptor=field_descriptor, **kwargs ) async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs): user: UserBase = kwargs["user"] db_session: AsyncSession = kwargs["db_session"] callback_data: ContextData = kwargs.get("callback_data", None) state_data: dict = kwargs["state_data"] value = kwargs["value"] field_descriptor: FieldDescriptor = kwargs["field_descriptor"] if callback_data.context == CommandContext.SETTING_EDIT: if callback_data.data != "cancel": if await authorize_command(user=user, callback_data=callback_data): value = await deserialize( session=db_session, type_=field_descriptor.type_, value=value ) await Settings.set_param(field_descriptor, value) else: return await message.answer( text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN)) ) return await route_callback(message=message, back=True, **kwargs) elif callback_data.context in [ CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT, CommandContext.ENTITY_FIELD_EDIT, CommandContext.COMMAND_FORM, ]: app: "QBotApp" = kwargs["app"] entity_descriptor = get_entity_descriptor(app, callback_data) if callback_data.context == CommandContext.COMMAND_FORM: field_sequence = list(field_descriptor.command.param_form.keys()) current_index = field_sequence.index(callback_data.field_name) field_descriptors = field_descriptor.command.param_form else: form_name = ( callback_data.form_params.split("&")[0] if callback_data.form_params else "default" ) form = entity_descriptor.forms.get( form_name, entity_descriptor.default_form ) field_sequence = form.edit_field_sequence current_index = ( field_sequence.index(callback_data.field_name) if callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT] else 0 ) field_descriptors = entity_descriptor.fields_descriptors entity_data = state_data.get("entity_data", {}) if callback_data.context == CommandContext.ENTITY_CREATE and not entity_data: stack = state_data.get("navigation_stack", []) prev_callback_data = ContextData.unpack(stack[-1]) if stack else None if ( prev_callback_data and prev_callback_data.command == CallbackCommand.ENTITY_LIST and prev_callback_data.entity_name == entity_descriptor.name ): prev_form_name = ( prev_callback_data.form_params.split("&")[0] if prev_callback_data.form_params else "default" ) prev_form_params = ( prev_callback_data.form_params.split("&")[1:] if prev_callback_data.form_params else [] ) prev_form_list = entity_descriptor.lists.get( prev_form_name or "default", entity_descriptor.default_list ) if prev_form_list.static_filters: for filt in prev_form_list.static_filters: if filt.value_type == "const": entity_data[filt.field_name] = filt.value elif len(prev_form_params) > filt.param_index: entity_data[filt.field_name] = prev_form_params[ filt.param_index ] if ( callback_data.context in [ CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT, CommandContext.COMMAND_FORM, ] and current_index < len(field_sequence) - 1 ): entity_data[field_descriptor.field_name] = value state_data.update({"entity_data": entity_data}) next_field_name = field_sequence[current_index + 1] next_field_descriptor = field_descriptors[next_field_name] kwargs.update({"field_descriptor": next_field_descriptor}) callback_data.field_name = next_field_name state_entity_val = entity_data.get(next_field_descriptor.field_name) current_value = ( await deserialize( session=db_session, type_=next_field_descriptor.type_, value=state_entity_val, ) if state_entity_val else None ) await show_editor( message=message, entity_descriptor=entity_descriptor, current_value=current_value, **kwargs, ) else: entity_data[field_descriptor.field_name] = value # What if user has several roles and each role has its own ownership field? Should we allow creation even # if user has no CREATE_ALL permission # for role in user.roles: # if role in entity_descriptor.ownership_fields and not EntityPermission.CREATE_ALL in user_permissions: # entity_data[entity_descriptor.ownership_fields[role]] = user.id deser_entity_data = { key: await deserialize( session=db_session, type_=field_descriptors[key].type_, value=value, ) for key, value in entity_data.items() } if callback_data.context == CommandContext.ENTITY_CREATE: entity_type = entity_descriptor.type_ user_permissions = get_user_permissions(user, entity_descriptor) if ( EntityPermission.CREATE not in user_permissions and EntityPermission.CREATE_ALL not in user_permissions ): return await message.answer( text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN)) ) new_entity = await entity_type.create( session=db_session, obj_in=entity_type(**deser_entity_data), commit=True, ) form_name = ( callback_data.form_params.split("&")[0] if callback_data.form_params else "default" ) form_list = entity_descriptor.lists.get( form_name or "default", entity_descriptor.default_list ) state_data["navigation_context"] = ContextData( command=CallbackCommand.ENTITY_ITEM, entity_name=entity_descriptor.name, form_params=form_list.item_form, entity_id=str(new_entity.id), ).pack() state_data.update(state_data) clear_state(state_data=state_data) return await route_callback(message=message, back=False, **kwargs) elif callback_data.context in [ CommandContext.ENTITY_EDIT, CommandContext.ENTITY_FIELD_EDIT, ]: entity_type = entity_descriptor.type_ entity_id = int(callback_data.entity_id) entity = await entity_type.get(session=db_session, id=entity_id) if not entity: return await message.answer( text=(await Settings.get(Settings.APP_STRINGS_NOT_FOUND)) ) if not check_entity_permission( entity=entity, user=user, permission=EntityPermission.UPDATE ): return await message.answer( text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN)) ) for key, value in deser_entity_data.items(): setattr(entity, key, value) await db_session.commit() elif callback_data.context == CommandContext.COMMAND_FORM: clear_state(state_data=state_data) state_data["entity_data"] = entity_data kwargs.update( { "callback_data": ContextData( command=CallbackCommand.USER_COMMAND, user_command=callback_data.user_command, ) } ) cmd = app.bot_commands.get(callback_data.user_command.split("&")[0]) return await cammand_handler(message=message, cmd=cmd, **kwargs) clear_state(state_data=state_data) # TODO: Try back=False and check if it works to navigate to newly created entity await route_callback(message=message, back=True, **kwargs)