enhanced strings and elements visibility delegates
All checks were successful
Build Docs / changes (push) Successful in 27s
Build Docs / build-docs (push) Has been skipped
Build Docs / deploy-docs (push) Has been skipped

This commit is contained in:
Alexander Kalinovsky
2025-04-01 00:19:51 +07:00
parent 6a7355996c
commit e10d7ff7bf
15 changed files with 458 additions and 206 deletions

View File

@@ -15,7 +15,7 @@ from .model.descriptors import (
Filter as Filter,
EntityPermission as EntityPermission,
CommandCallbackContext as CommandCallbackContext,
EntityEventContext as EntityEventContext,
BotContext as BotContext,
CommandButton as CommandButton,
FieldEditButton as FieldEditButton,
InlineButton as InlineButton,

View File

@@ -30,14 +30,12 @@ async def telegram_webhook(
logger.error("Invalid request", exc_info=True)
return Response(status_code=400)
try:
state_kw = request.state._state # TODO: avoid accessing private attribute
await app.dp.feed_webhook_update(
app.bot,
update,
db_session=db_session,
app=app,
**(state_kw or {}),
app_state=request.state,
)
except Exception:
logger.error("Error processing update", exc_info=True)

View File

@@ -1,7 +1,7 @@
from aiogram.types import InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from ....model.descriptors import EntityDescriptor
from ....model.descriptors import BotContext, EntityDescriptor
from ....utils.main import get_callable_str
from ..context import ContextData, CallbackCommand
@@ -9,6 +9,7 @@ from ..context import ContextData, CallbackCommand
async def add_filter_controls(
keyboard_builder: InlineKeyboardBuilder,
entity_descriptor: EntityDescriptor,
context: BotContext,
filter: str = None,
filtering_fields: list[str] = None,
page: int = 1,
@@ -16,8 +17,9 @@ async def add_filter_controls(
caption = ", ".join(
[
await get_callable_str(
entity_descriptor.fields_descriptors[field_name].caption,
entity_descriptor,
callable_str=entity_descriptor.fields_descriptors[field_name].caption,
context=context,
descriptor=entity_descriptor.fields_descriptors[field_name],
)
if entity_descriptor.fields_descriptors[field_name].caption
else field_name

View File

@@ -2,9 +2,12 @@ from aiogram.types import Message, CallbackQuery
from decimal import Decimal
from datetime import datetime, time
from quickbot.main import QBotApp
from quickbot.utils.serialization import deserialize
from ....model.bot_entity import BotEntity
from ....model.bot_enum import BotEnum
from ....model.descriptors import FieldDescriptor
from ....model.descriptors import BotContext, FieldDescriptor
from ....model.settings import Settings
from ....model.user import UserBase
from ....utils.main import get_callable_str, get_value_repr
@@ -21,38 +24,87 @@ async def show_editor(message: Message | CallbackQuery, **kwargs):
user: UserBase = kwargs["user"]
callback_data: ContextData = kwargs.get("callback_data", None)
state_data: dict = kwargs["state_data"]
db_session = kwargs["db_session"]
app: QBotApp = kwargs["app"]
value_type = field_descriptor.type_base
entity_data_dict: dict = state_data.get("entity_data")
entity_data = None
if callback_data.context == CommandContext.COMMAND_FORM:
cmd = app.bot_commands.get(callback_data.user_command.split("&")[0])
entity_data = (
{
key: await deserialize(
session=kwargs["db_session"],
type_=cmd.param_form[key].type_,
value=value,
)
for key, value in entity_data_dict.items()
}
if entity_data_dict and cmd.param_form
else None
)
elif callback_data.context == CommandContext.ENTITY_CREATE:
entity_data = (
{
key: await deserialize(
session=kwargs["db_session"],
type_=field_descriptor.entity_descriptor.fields_descriptors[
key
].type_,
value=value,
)
for key, value in entity_data_dict.items()
}
if entity_data_dict
else None
)
else:
entity_id = callback_data.entity_id
if entity_id:
entity_data = await field_descriptor.entity_descriptor.type_.get(
session=db_session, id=entity_id
)
context = BotContext(
db_session=db_session, app=app, app_state=kwargs["app_state"], message=message
)
if field_descriptor.edit_prompt:
edit_prompt = await get_callable_str(
field_descriptor.edit_prompt,
field_descriptor,
callback_data
if callback_data.context == CommandContext.COMMAND_FORM
else None,
current_value,
callable_str=field_descriptor.edit_prompt,
context=context,
descriptor=field_descriptor,
entity=entity_data,
)
else:
if field_descriptor.caption:
caption_str = await get_callable_str(
field_descriptor.caption,
field_descriptor,
callback_data
if callback_data.context == CommandContext.COMMAND_FORM
else None,
current_value,
context=context,
descriptor=field_descriptor,
)
else:
caption_str = field_descriptor.name
if callback_data.context == CommandContext.ENTITY_EDIT:
db_session = kwargs["db_session"]
app = kwargs["app"]
edit_prompt = (
await Settings.get(
Settings.APP_STRINGS_FIELD_EDIT_PROMPT_TEMPLATE_P_NAME_VALUE
)
).format(
name=caption_str,
value=await get_value_repr(current_value, field_descriptor, user.lang),
value=await get_value_repr(
value=current_value,
field_descriptor=field_descriptor,
context=context,
locale=user.lang,
),
)
else:
edit_prompt = (

View File

@@ -14,7 +14,7 @@ from ....model.bot_enum import BotEnum
from ....model.settings import Settings
from ....model.user import UserBase
from ....model.view_setting import ViewSetting
from ....model.descriptors import FieldDescriptor, Filter
from ....model.descriptors import BotContext, EntityList, FieldDescriptor, Filter
from ....model import EntityPermission
from ....utils.main import (
get_user_permissions,
@@ -108,9 +108,8 @@ async def render_entity_picker(
for item in enum_items
]
elif issubclass(type_, BotEntity):
form_name = field_descriptor.ep_form or "default"
form_list = type_.bot_entity_descriptor.lists.get(
form_name, type_.bot_entity_descriptor.default_list
form_list: EntityList = type_.bot_entity_descriptor.lists.get(
field_descriptor.ep_form, type_.bot_entity_descriptor.default_list
)
permissions = get_user_permissions(user, type_.bot_entity_descriptor)
if form_list.filtering:
@@ -196,6 +195,13 @@ async def render_entity_picker(
page = 1
entity_items = list[BotEntity]()
context = BotContext(
db_session=db_session,
app=kwargs["app"],
app_state=kwargs["app_state"],
message=message,
)
items = [
{
"text": f"{
@@ -205,14 +211,16 @@ async def render_entity_picker(
if item in (current_value or [])
else '【 】 '
}{
type_.bot_entity_descriptor.item_repr(
type_.bot_entity_descriptor, item
await get_callable_str(
callable_str=type_.bot_entity_descriptor.item_repr,
context=context,
entity=item,
)
if type_.bot_entity_descriptor.item_repr
else await get_callable_str(
type_.bot_entity_descriptor.full_name,
type_.bot_entity_descriptor,
item,
callable_str=type_.bot_entity_descriptor.full_name,
context=context,
descriptor=type_.bot_entity_descriptor,
)
if type_.bot_entity_descriptor.full_name
else f'{type_.bot_entity_descriptor.name}: {str(item.id)}'
@@ -262,6 +270,7 @@ async def render_entity_picker(
await add_filter_controls(
keyboard_builder=keyboard_builder,
entity_descriptor=type_.bot_entity_descriptor,
context=context,
filter=entity_filter,
filtering_fields=form_list.filtering_fields,
)

View File

@@ -5,6 +5,8 @@ from aiogram.types import Message, CallbackQuery
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from quickbot.model.descriptors import EntityForm
from ....model import EntityPermission
from ....model.settings import Settings
from ....model.user import UserBase
@@ -127,14 +129,17 @@ async def field_editor(message: Message | CallbackQuery, **kwargs):
form_name = (
callback_data.form_params.split("&")[0]
if callback_data.form_params
else "default"
else None
)
form = entity_descriptor.forms.get(
form: EntityForm = entity_descriptor.forms.get(
form_name, entity_descriptor.default_form
)
entity_data = {
key: serialize(
getattr(entity, key),
getattr(
entity,
entity_descriptor.fields_descriptors[key].field_name,
),
entity_descriptor.fields_descriptors[key],
)
for key in (

View File

@@ -13,7 +13,12 @@ from ..user_handlers.main import cammand_handler
from ....model import EntityPermission
from ....model.user import UserBase
from ....model.settings import Settings
from ....model.descriptors import EntityEventContext, FieldDescriptor
from ....model.descriptors import (
BotContext,
EntityForm,
EntityList,
FieldDescriptor,
)
from ....model.language import LanguageBase
from ....auth import authorize_command
from ....utils.main import (
@@ -84,16 +89,6 @@ async def field_editor_callback(message: Message | CallbackQuery, **kwargs):
value=value,
**kwargs,
)
# else:
# value = state_data.get("value")
# if value:
# value = json.loads(value)
# else:
# value = {}
# value[list(LanguageBase.all_members.keys())[locale_index]] = (
# message.text
# )
# value = json.dumps(value, ensure_ascii=False)
elif type_base in [int, float, Decimal]:
try:
@@ -163,9 +158,9 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
form_name = (
callback_data.form_params.split("&")[0]
if callback_data.form_params
else "default"
else None
)
form = entity_descriptor.forms.get(
form: EntityForm = entity_descriptor.forms.get(
form_name, entity_descriptor.default_form
)
@@ -176,6 +171,7 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
entity_descriptor=entity_descriptor,
user=user,
callback_data=callback_data,
state_data=state_data,
)
current_index = (
@@ -188,7 +184,7 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
entity_data = state_data.get("entity_data", {})
if callback_data.context == CommandContext.ENTITY_CREATE and not entity_data:
if callback_data.context == CommandContext.ENTITY_CREATE:
stack = state_data.get("navigation_stack", [])
prev_callback_data = ContextData.unpack(stack[-1]) if stack else None
if (
@@ -199,15 +195,15 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
prev_form_name = (
prev_callback_data.form_params.split("&")[0]
if prev_callback_data.form_params
else "default"
else None
)
prev_form_params = (
prev_callback_data.form_params.split("&")[1:]
if prev_callback_data.form_params
else []
)
prev_form_list = entity_descriptor.lists.get(
prev_form_name or "default", entity_descriptor.default_list
prev_form_list: EntityList = entity_descriptor.lists.get(
prev_form_name, entity_descriptor.default_list
)
if prev_form_list.static_filters:
@@ -256,7 +252,7 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
)
else:
entity_data[field_descriptor.field_name] = value
entity_data[field_descriptor.name] = value
# What if user has several roles and each role has its own ownership field? Should we allow creation even
# if user has no CREATE_ALL permission
@@ -283,6 +279,13 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
for key, value in entity_data.items()
}
context = BotContext(
db_session=db_session,
app=app,
app_state=kwargs["app_state"],
message=message,
)
if callback_data.context == CommandContext.ENTITY_CREATE:
entity_type = entity_descriptor.type_
user_permissions = get_user_permissions(user, entity_descriptor)
@@ -304,25 +307,21 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
if iscoroutinefunction(entity_descriptor.on_created):
await entity_descriptor.on_created(
new_entity,
EntityEventContext(
db_session=db_session, app=app, message=message
),
context,
)
else:
entity_descriptor.on_created(
new_entity,
EntityEventContext(
db_session=db_session, app=app, message=message
),
context,
)
form_name = (
callback_data.form_params.split("&")[0]
if callback_data.form_params
else "default"
else None
)
form_list = entity_descriptor.lists.get(
form_name or "default", entity_descriptor.default_list
form_name, entity_descriptor.default_list
)
state_data["navigation_context"] = ContextData(
@@ -357,7 +356,11 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
)
for key, value in deser_entity_data.items():
setattr(entity, key, value)
setattr(
entity,
entity.bot_entity_descriptor.fields_descriptors[key].field_name,
value,
)
await db_session.commit()
await db_session.refresh(entity)
@@ -366,16 +369,12 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
if iscoroutinefunction(entity_descriptor.on_updated):
await entity_descriptor.on_updated(
entity,
EntityEventContext(
db_session=db_session, app=app, message=message
),
context,
)
else:
entity_descriptor.on_updated(
entity,
EntityEventContext(
db_session=db_session, app=app, message=message
),
context,
)
elif callback_data.context == CommandContext.COMMAND_FORM:
@@ -397,5 +396,4 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
clear_state(state_data=state_data)
# TODO: Try back=False and check if it works to navigate to newly created entity
await route_callback(message=message, back=True, **kwargs)

View File

@@ -2,7 +2,7 @@ from aiogram.types import InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from ....model.settings import Settings
from ....model.descriptors import FieldDescriptor
from ....model.descriptors import EntityForm, FieldDescriptor
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand, CommandContext
from ....utils.navigation import get_navigation_context, pop_navigation_context
@@ -34,9 +34,9 @@ async def wrap_editor(
form_name = (
callback_data.form_params.split("&")[0]
if callback_data.form_params
else "default"
else None
)
form = field_descriptor.entity_descriptor.forms.get(
form: EntityForm = field_descriptor.entity_descriptor.forms.get(
form_name, field_descriptor.entity_descriptor.default_form
)
if form.edit_field_sequence:
@@ -46,6 +46,7 @@ async def wrap_editor(
entity_descriptor=field_descriptor.entity_descriptor,
user=user,
callback_data=callback_data,
state_data=state_data,
)
field_index = (
field_sequence.index(field_descriptor.name)

View File

@@ -8,10 +8,11 @@ from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from ....model.descriptors import (
EntityForm,
FieldEditButton,
CommandButton,
InlineButton,
EntityEventContext,
BotContext,
)
from ....model.settings import Settings
from ....model.user import UserBase
@@ -92,12 +93,15 @@ async def entity_item(
entity=entity_item, user=user, permission=EntityPermission.UPDATE
)
form = entity_descriptor.forms.get(
callback_data.form_params or "default", entity_descriptor.default_form
form: EntityForm = entity_descriptor.forms.get(
callback_data.form_params, entity_descriptor.default_form
)
context = BotContext(
db_session=db_session, app=app, app_state=kwargs["app_state"], message=query
)
if form.form_buttons:
context = EntityEventContext(db_session=db_session, app=app, message=query)
for edit_buttons_row in form.form_buttons:
btn_row = []
for button in edit_buttons_row:
@@ -114,16 +118,17 @@ async def entity_item(
field_value = getattr(entity_item, field_descriptor.field_name)
if btn_caption:
btn_text = await get_callable_str(
btn_caption, field_descriptor, entity_item, field_value
callable_str=btn_caption,
context=context,
entity=entity_item,
)
else:
if field_descriptor.type_base is bool:
btn_text = f"{'【✔︎】 ' if field_value else '【 】 '}{
await get_callable_str(
field_descriptor.caption,
field_descriptor,
entity_item,
field_value,
callable_str=field_descriptor.caption,
context=context,
descriptor=field_descriptor,
)
if field_descriptor.caption
else field_name
@@ -135,10 +140,9 @@ async def entity_item(
else '✏️'
} {
await get_callable_str(
field_descriptor.caption,
field_descriptor,
entity_item,
field_value,
callable_str=field_descriptor.caption,
context=context,
descriptor=field_descriptor,
)
if field_descriptor.caption
else field_name
@@ -160,7 +164,9 @@ async def entity_item(
btn_caption = button.caption
btn_text = await get_callable_str(
btn_caption, entity_descriptor, entity_item
callable_str=btn_caption,
context=context,
entity=entity_item,
)
if isinstance(button.command, ContextData):
@@ -206,6 +212,7 @@ async def entity_item(
entity_descriptor=entity_descriptor,
user=user,
callback_data=callback_data,
state_data=state_data,
)
edit_delete_row.append(
InlineKeyboardButton(
@@ -243,11 +250,17 @@ async def entity_item(
keyboard_builder.row(*edit_delete_row)
if form.item_repr:
item_text = form.item_repr(entity_descriptor, entity_item)
item_text = await get_callable_str(
callable_str=form.item_repr,
context=context,
entity=entity_item,
)
else:
entity_caption = (
await get_callable_str(
entity_descriptor.full_name, entity_descriptor, entity_item
callable_str=entity_descriptor.full_name,
context=context,
descriptor=entity_descriptor,
)
if entity_descriptor.full_name
else entity_descriptor.name
@@ -255,7 +268,9 @@ async def entity_item(
entity_item_repr = (
await get_callable_str(
entity_descriptor.item_repr, entity_descriptor, entity_item
callable_str=entity_descriptor.item_repr,
context=context,
entity=entity_item,
)
if entity_descriptor.item_repr
else str(entity_item.id)
@@ -267,11 +282,23 @@ async def entity_item(
for field_descriptor in entity_descriptor.fields_descriptors.values():
if (
field_descriptor.is_visible is not None
isinstance(field_descriptor.is_visible, bool)
and not field_descriptor.is_visible
):
continue
if callable(field_descriptor.is_visible):
if iscoroutinefunction(field_descriptor.is_visible):
field_visible = await field_descriptor.is_visible(
field_descriptor, entity_item, context
)
else:
field_visible = field_descriptor.is_visible(
field_descriptor, entity_item, context
)
if not field_visible:
continue
skip = False
for own_field in entity_descriptor.ownership_fields.items():
@@ -287,20 +314,27 @@ async def entity_item(
if skip:
continue
field_caption = await get_callable_str(
field_descriptor.caption, field_descriptor, entity_item
field_caption = (
await get_callable_str(
callable_str=field_descriptor.caption,
context=context,
descriptor=field_descriptor,
)
if field_descriptor.caption
else field_descriptor.field_name
)
if field_descriptor.caption_value:
value = await get_callable_str(
field_descriptor.caption_value,
field_descriptor,
entity_item,
getattr(entity_item, field_descriptor.field_name),
callable_str=field_descriptor.caption_value,
context=context,
descriptor=field_descriptor,
entity=entity_item,
)
else:
value = await get_value_repr(
value=getattr(entity_item, field_descriptor.field_name),
field_descriptor=field_descriptor,
context=context,
locale=user.lang,
)
item_text += f"\n{field_caption or field_descriptor.name}:{f' <b>{value}</b>' if value else ''}"

View File

@@ -6,7 +6,7 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from sqlmodel.ext.asyncio.session import AsyncSession
from typing import TYPE_CHECKING
from quickbot.model.descriptors import EntityEventContext
from quickbot.model.descriptors import BotContext
from ..context import ContextData, CallbackCommand
from ....model.user import UserBase
@@ -49,6 +49,10 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN))
)
context = BotContext(
db_session=db_session, app=app, app_state=kwargs["app_state"], message=query
)
if callback_data.data == "yes":
entity = await entity_descriptor.type_.remove(
session=db_session, id=int(callback_data.entity_id), commit=True
@@ -58,12 +62,12 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
if iscoroutinefunction(entity_descriptor.on_created):
await entity_descriptor.on_deleted(
entity,
EntityEventContext(db_session=db_session, app=app, message=query),
context,
)
else:
entity_descriptor.on_deleted(
entity,
EntityEventContext(db_session=db_session, app=app, message=query),
context,
)
await route_callback(message=query, **kwargs)
@@ -76,7 +80,12 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
return await query.message.edit_text(
text=(
await Settings.get(Settings.APP_STRINGS_CONFIRM_DELETE_P_NAME)
).format(name=await get_entity_item_repr(entity=entity)),
).format(
name=await get_entity_item_repr(
entity=entity,
context=context,
)
),
reply_markup=InlineKeyboardBuilder()
.row(
InlineKeyboardButton(

View File

@@ -10,7 +10,13 @@ from ....model.bot_entity import BotEntity
from ....model.settings import Settings
from ....model.user import UserBase
from ....model.view_setting import ViewSetting
from ....model.descriptors import EntityDescriptor, Filter
from ....model.descriptors import (
BotContext,
EntityDescriptor,
EntityForm,
EntityList,
Filter,
)
from ....model import EntityPermission
from ....utils.main import (
get_user_permissions,
@@ -103,12 +109,12 @@ async def entity_list(
form_params = (
callback_data.form_params.split("&") if callback_data.form_params else []
)
form_name = form_params.pop(0) if form_params else "default"
form_list = entity_descriptor.lists.get(
form_name or "default", entity_descriptor.default_list
form_name = form_params.pop(0) if form_params else None
form_list: EntityList = entity_descriptor.lists.get(
form_name, entity_descriptor.default_list
)
form_item = entity_descriptor.forms.get(
form_list.item_form or "default", entity_descriptor.default_form
form_item: EntityForm = entity_descriptor.forms.get(
form_list.item_form, entity_descriptor.default_form
)
keyboard_builder = InlineKeyboardBuilder()
@@ -124,6 +130,7 @@ async def entity_list(
entity_descriptor=entity_descriptor,
user=user,
callback_data=callback_data,
state_data=kwargs["state_data"],
)
keyboard_builder.row(
InlineKeyboardButton(
@@ -199,20 +206,39 @@ async def entity_list(
total_pages = 1
page = 1
context = BotContext(
db_session=db_session,
app=app,
app_state=kwargs["app_state"],
message=message,
)
for item in items:
caption = None
if form_list.item_repr:
caption = form_list.item_repr(entity_descriptor, item)
caption = await get_callable_str(
callable_str=form_list.item_repr,
context=context,
entity=item,
)
elif entity_descriptor.item_repr:
caption = entity_descriptor.item_repr(entity_descriptor, item)
caption = await get_callable_str(
callable_str=entity_descriptor.item_repr,
context=context,
entity=item,
)
elif entity_descriptor.full_name:
caption = f"{
await get_callable_str(
callable_str=entity_descriptor.full_name,
context=context,
descriptor=entity_descriptor,
entity=item,
)
}: {item.id}"
else:
if not caption:
caption = f"{entity_descriptor.name}: {item.id}"
keyboard_builder.row(
@@ -240,6 +266,7 @@ async def entity_list(
await add_filter_controls(
keyboard_builder=keyboard_builder,
entity_descriptor=entity_descriptor,
context=context,
filter=entity_filter,
filtering_fields=form_list.filtering_fields,
)
@@ -254,17 +281,29 @@ async def entity_list(
)
if form_list.caption:
entity_text = await get_callable_str(form_list.caption, entity_descriptor)
entity_text = await get_callable_str(
callable_str=form_list.caption,
context=context,
descriptor=entity_descriptor,
)
else:
if entity_descriptor.full_name_plural:
entity_text = await get_callable_str(
entity_descriptor.full_name_plural, entity_descriptor
callable_str=entity_descriptor.full_name_plural,
context=context,
descriptor=entity_descriptor,
)
else:
entity_text = entity_descriptor.name
if entity_descriptor.description:
entity_text = f"{entity_text} {await get_callable_str(entity_descriptor.description, entity_descriptor)}"
entity_text = f"{entity_text} {
await get_callable_str(
callable_str=entity_descriptor.description,
context=context,
descriptor=entity_descriptor,
)
}"
state: FSMContext = kwargs["state"]
state_data = kwargs["state_data"]

View File

@@ -2,13 +2,13 @@ from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from babel.support import LazyProxy
from logging import getLogger
from typing import TYPE_CHECKING
from quickbot.model.descriptors import BotContext
from ....model.settings import Settings
from ..context import ContextData, CallbackCommand
from ....utils.main import get_send_message
from ....model.descriptors import EntityCaptionCallable
from ....utils.main import get_send_message, get_callable_str
from ....utils.navigation import save_navigation_context, pop_navigation_context
if TYPE_CHECKING:
@@ -46,12 +46,21 @@ async def entities_menu(
for entity in entity_metadata.entity_descriptors.values():
if entity.show_in_entities_menu:
if entity.full_name_plural.__class__ == EntityCaptionCallable:
caption = entity.full_name_plural(entity) or entity.name
elif entity.full_name_plural.__class__ == LazyProxy:
caption = f"{f'{entity.icon} ' if entity.icon else ''}{entity.full_name_plural.value or entity.name}"
if entity.full_name_plural:
caption = await get_callable_str(
callable_str=entity.full_name_plural,
context=BotContext(
db_session=kwargs["db_session"],
app=app,
app_state=kwargs["app_state"],
message=message,
),
descriptor=entity,
)
else:
caption = f"{f'{entity.icon} ' if entity.icon else ''}{entity.full_name_plural or entity.name}"
caption = entity.name
caption = f"{f'{entity.icon} ' if entity.icon else ''}{caption}"
keyboard_builder.row(
InlineKeyboardButton(

View File

@@ -4,6 +4,8 @@ from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from quickbot.model.descriptors import BotContext
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand, CommandContext
@@ -53,14 +55,26 @@ async def parameters_menu(
if not key.is_visible:
continue
context = BotContext(
db_session=kwargs["db_session"],
app=kwargs["app"],
app_state=kwargs["app_state"],
message=message,
)
if key.caption_value:
caption = await get_callable_str(
callable_str=key.caption_value, descriptor=key, entity=None, value=value
callable_str=key.caption_value,
context=context,
descriptor=key,
entity={key.field_name: value},
)
else:
if key.caption:
caption = await get_callable_str(
callable_str=key.caption, descriptor=key, entity=None, value=value
callable_str=key.caption,
context=context,
descriptor=key,
)
else:
caption = key.name
@@ -68,7 +82,7 @@ async def parameters_menu(
if key.type_ is bool:
caption = f"{'【✔︎】' if value else '【 】'} {caption}"
else:
caption = f"{caption}: {await get_value_repr(value=value, field_descriptor=key, locale=user.lang)}"
caption = f"{caption}: {await get_value_repr(value=value, field_descriptor=key, context=context, locale=user.lang)}"
keyboard_builder.row(
InlineKeyboardButton(

View File

@@ -5,6 +5,7 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from typing import Any, Callable, TYPE_CHECKING, Literal, Union
from babel.support import LazyProxy
from dataclasses import dataclass, field
from fastapi.datastructures import State
from sqlmodel.ext.asyncio.session import AsyncSession
from .role import RoleBase
@@ -16,29 +17,32 @@ if TYPE_CHECKING:
from ..main import QBotApp
from .user import UserBase
EntityCaptionCallable = Callable[["EntityDescriptor"], str]
EntityItemCaptionCallable = Callable[["EntityDescriptor", Any], str]
EntityFieldCaptionCallable = Callable[["FieldDescriptor", Any, Any], str]
# EntityCaptionCallable = Callable[["EntityDescriptor"], str]
# EntityItemCaptionCallable = Callable[["EntityDescriptor", Any], str]
# EntityFieldCaptionCallable = Callable[["FieldDescriptor", Any, Any], str]
@dataclass
class FieldEditButton:
field_name: str
caption: str | LazyProxy | EntityFieldCaptionCallable | None = None
visibility: Callable[[Any], bool] | None = None
caption: str | LazyProxy | Callable[["BotEntity", "BotContext"], str] | None = None
visibility: Callable[["BotEntity", "BotContext"], bool] | None = None
@dataclass
class CommandButton:
command: ContextData | Callable[[ContextData, Any], ContextData] | str
caption: str | LazyProxy | EntityItemCaptionCallable
visibility: Callable[[Any], bool] | None = None
caption: str | LazyProxy | Callable[["BotEntity", "BotContext"], str] | None = None
visibility: Callable[["BotEntity", "BotContext"], bool] | None = None
@dataclass
class InlineButton:
inline_button: InlineKeyboardButton | Callable[[Any], InlineKeyboardButton]
visibility: Callable[[Any], bool] | None = None
inline_button: (
InlineKeyboardButton
| Callable[["BotEntity", "BotContext"], InlineKeyboardButton]
)
visibility: Callable[["BotEntity", "BotContext"], bool] | None = None
@dataclass
@@ -66,8 +70,10 @@ class Filter:
@dataclass
class EntityList:
caption: str | LazyProxy | EntityCaptionCallable | None = None
item_repr: EntityItemCaptionCallable | None = None
caption: (
str | LazyProxy | Callable[["EntityDescriptor", "BotContext"], str] | None
) = None
item_repr: Callable[["BotEntity", "BotContext"], str] | None = None
show_add_new_button: bool = True
item_form: str | None = None
pagination: bool = True
@@ -79,7 +85,7 @@ class EntityList:
@dataclass
class EntityForm:
item_repr: EntityItemCaptionCallable | None = None
item_repr: Callable[["BotEntity", "BotContext"], str] | None = None
edit_field_sequence: list[str] = None
form_buttons: list[list[FieldEditButton | CommandButton | InlineButton]] = None
show_edit_button: bool = True
@@ -89,11 +95,24 @@ class EntityForm:
@dataclass(kw_only=True)
class _BaseFieldDescriptor:
icon: str = None
caption: str | LazyProxy | EntityFieldCaptionCallable | None = None
description: str | LazyProxy | EntityFieldCaptionCallable | None = None
edit_prompt: str | LazyProxy | EntityFieldCaptionCallable | None = None
caption_value: EntityFieldCaptionCallable | None = None
is_visible: bool | None = None
caption: (
str | LazyProxy | Callable[["FieldDescriptor", "BotContext"], str] | None
) = None
description: (
str | LazyProxy | Callable[["FieldDescriptor", "BotContext"], str] | None
) = None
edit_prompt: (
str
| LazyProxy
| Callable[["FieldDescriptor", Union["BotEntity", Any], "BotContext"], str]
| None
) = None
caption_value: (
Callable[["FieldDescriptor", Union["BotEntity", Any], "BotContext"], str] | None
) = None
is_visible: (
bool | Callable[["FieldDescriptor", "BotEntity", "BotContext"], bool] | None
) = None
localizable: bool = False
bool_false_value: str | LazyProxy = "no"
bool_true_value: str | LazyProxy = "yes"
@@ -140,10 +159,16 @@ class FieldDescriptor(_BaseFieldDescriptor):
@dataclass(kw_only=True)
class _BaseEntityDescriptor:
icon: str = "📘"
full_name: str | LazyProxy | EntityCaptionCallable | None = None
full_name_plural: str | LazyProxy | EntityCaptionCallable | None = None
description: str | LazyProxy | EntityCaptionCallable | None = None
item_repr: EntityItemCaptionCallable | None = None
full_name: (
str | LazyProxy | Callable[["EntityDescriptor", "BotContext"], str] | None
) = None
full_name_plural: (
str | LazyProxy | Callable[["EntityDescriptor", "BotContext"], str] | None
) = None
description: (
str | LazyProxy | Callable[["EntityDescriptor", "BotContext"], str] | None
) = None
item_repr: Callable[["BotEntity", "BotContext"], str] | None = None
default_list: EntityList = field(default_factory=EntityList)
default_form: EntityForm = field(default_factory=EntityForm)
lists: dict[str, EntityList] = field(default_factory=dict[str, EntityList])
@@ -164,9 +189,9 @@ class _BaseEntityDescriptor:
EntityPermission.DELETE_ALL: [RoleBase.SUPER_USER],
}
)
on_created: Callable[["BotEntity", "EntityEventContext"], None] | None = None
on_deleted: Callable[["BotEntity", "EntityEventContext"], None] | None = None
on_updated: Callable[["BotEntity", "EntityEventContext"], None] | None = None
on_created: Callable[["BotEntity", "BotContext"], None] | None = None
on_deleted: Callable[["BotEntity", "BotContext"], None] | None = None
on_updated: Callable[["BotEntity", "BotContext"], None] | None = None
@dataclass(kw_only=True)
@@ -202,9 +227,10 @@ class CommandCallbackContext[UT: UserBase]:
@dataclass(kw_only=True)
class EntityEventContext:
class BotContext:
db_session: AsyncSession
app: "QBotApp"
app_state: State
message: Message | CallbackQuery | None = None

View File

@@ -2,7 +2,7 @@ from babel.support import LazyProxy
from inspect import iscoroutinefunction, signature
from aiogram.types import Message, CallbackQuery
from aiogram.utils.i18n import I18n
from typing import Any, TYPE_CHECKING
from typing import Any, TYPE_CHECKING, Callable
import ujson as json
from ..model.bot_entity import BotEntity
@@ -10,15 +10,14 @@ from ..model.bot_enum import BotEnum
from ..model.settings import Settings
from ..model.descriptors import (
BotContext,
EntityList,
FieldDescriptor,
EntityDescriptor,
EntityItemCaptionCallable,
EntityFieldCaptionCallable,
EntityPermission,
EntityCaptionCallable,
)
from ..bot.handlers.context import ContextData, CommandContext
from ..bot.handlers.context import CallbackCommand, ContextData, CommandContext
if TYPE_CHECKING:
from ..model.user import UserBase
@@ -106,24 +105,38 @@ def clear_state(state_data: dict, clear_nav: bool = False):
async def get_entity_item_repr(
entity: BotEntity, item_repr: EntityItemCaptionCallable | None = None
entity: BotEntity,
context: BotContext,
item_repr: Callable[[BotEntity, BotContext], str] | None = None,
) -> str:
descr = entity.bot_entity_descriptor
if not item_repr:
item_repr = descr.item_repr
if item_repr:
return item_repr(descr, entity)
return (
descr.item_repr(descr, entity)
if descr.item_repr
else f"{
await get_callable_str(descr.full_name, descr, entity)
if descr.full_name
else descr.name
}: {str(entity.id)}"
)
if iscoroutinefunction(item_repr):
return await item_repr(entity, context)
else:
return item_repr(entity, context)
return f"{
await get_callable_str(
callable_str=descr.full_name,
context=context,
descriptor=descr,
entity=entity,
)
if descr.full_name
else descr.name
}: {str(entity.id)}"
async def get_value_repr(
value: Any, field_descriptor: FieldDescriptor, locale: str | None = None
value: Any,
field_descriptor: FieldDescriptor,
context: BotContext,
locale: str | None = None,
) -> str:
if value is None:
return ""
@@ -133,9 +146,14 @@ async def get_value_repr(
return "【✔︎】" if value else "【 】"
elif field_descriptor.is_list:
if issubclass(type_, BotEntity):
return (
f"[{', '.join([await get_entity_item_repr(item) for item in value])}]"
)
return f"[{
', '.join(
[
await get_entity_item_repr(entity=item, context=context)
for item in value
]
)
}]"
elif issubclass(type_, BotEnum):
return f"[{', '.join(item.localized(locale) for item in value)}]"
elif type_ is str:
@@ -143,7 +161,7 @@ async def get_value_repr(
else:
return f"[{', '.join([str(item) for item in value])}]"
elif issubclass(type_, BotEntity):
return await get_entity_item_repr(value)
return await get_entity_item_repr(entity=value, context=context)
elif issubclass(type_, BotEnum):
return value.localized(locale)
elif isinstance(value, str):
@@ -162,13 +180,13 @@ async def get_callable_str(
callable_str: (
str
| LazyProxy
| EntityCaptionCallable
| EntityItemCaptionCallable
| EntityFieldCaptionCallable
| Callable[[EntityDescriptor, BotContext], str]
| Callable[[BotEntity, BotContext], str]
| Callable[[FieldDescriptor, BotEntity, BotContext], str]
),
descriptor: FieldDescriptor | EntityDescriptor,
entity: Any = None,
value: Any = None,
context: BotContext,
descriptor: FieldDescriptor | EntityDescriptor | None = None,
entity: BotEntity | Any = None,
) -> str:
if isinstance(callable_str, str):
return callable_str
@@ -177,19 +195,22 @@ async def get_callable_str(
elif callable(callable_str):
args = signature(callable_str).parameters
if iscoroutinefunction(callable_str):
if len(args) == 1:
return await callable_str(descriptor)
elif len(args) == 2:
return await callable_str(descriptor, entity)
elif len(args) == 3:
return await callable_str(descriptor, entity, value)
if len(args) == 3:
return await callable_str(descriptor, entity, context)
else:
if issubclass(args[0].annotation, BotEntity):
return await callable_str(entity, context)
else:
return await callable_str(descriptor, context)
else:
if len(args) == 1:
return callable_str(descriptor)
elif len(args) == 2:
return callable_str(descriptor, entity)
elif len(args) == 3:
return callable_str(descriptor, entity, value)
if len(args) == 3:
return callable_str(descriptor, entity, context)
else:
return callable_str(entity or descriptor, context)
else:
raise ValueError(
f"Invalid callable type: {type(callable_str)}. Expected str, LazyProxy or callable."
)
def get_entity_descriptor(
@@ -225,37 +246,72 @@ def get_field_descriptor(
def build_field_sequence(
entity_descriptor: EntityDescriptor, user: "UserBase", callback_data: ContextData
entity_descriptor: EntityDescriptor,
user: "UserBase",
callback_data: ContextData,
state_data: dict,
):
prev_form_list = None
stack = state_data.get("navigation_stack", [])
prev_callback_data = ContextData.unpack(stack[-1]) if stack else None
if (
prev_callback_data
and prev_callback_data.command == CallbackCommand.ENTITY_LIST
and prev_callback_data.entity_name == entity_descriptor.name
):
prev_form_name = (
prev_callback_data.form_params.split("&")[0]
if prev_callback_data.form_params
else None
)
prev_form_list: EntityList = entity_descriptor.lists.get(
prev_form_name, entity_descriptor.default_list
)
field_sequence = list[str]()
# exclude ownership fields from edit if user has no CREATE_ALL/UPDATE_ALL permission
user_permissions = get_user_permissions(user, entity_descriptor)
for fd in entity_descriptor.fields_descriptors.values():
if not (
skip = False
if (
fd.is_optional
or fd.field_name == "id"
or fd.field_name[:-3] == "_id"
or (
fd.field_name[-3:] == "_id"
and fd.field_name[:-3] in entity_descriptor.fields_descriptors
)
or fd.default is not None
or fd.default_factory is not None
):
skip = False
for own_field in entity_descriptor.ownership_fields.items():
if (
own_field[1].rstrip("_id") == fd.field_name.rstrip("_id")
and own_field[0] in user.roles
and (
(
EntityPermission.CREATE_ALL not in user_permissions
and callback_data.context == CommandContext.ENTITY_CREATE
)
or (
EntityPermission.UPDATE_ALL not in user_permissions
and callback_data.context == CommandContext.ENTITY_EDIT
)
skip = True
for own_field in entity_descriptor.ownership_fields.items():
if (
own_field[1].rstrip("_id") == fd.field_name.rstrip("_id")
and own_field[0] in user.roles
and (
(
EntityPermission.CREATE_ALL not in user_permissions
and callback_data.context == CommandContext.ENTITY_CREATE
)
):
skip = True
break
if not skip:
field_sequence.append(fd.field_name)
or (
EntityPermission.UPDATE_ALL not in user_permissions
and callback_data.context == CommandContext.ENTITY_EDIT
)
)
):
skip = True
break
if (
prev_form_list
and prev_form_list.static_filters
and fd.field_name.rstrip("_id")
in [f.field_name.rstrip("_id") for f in prev_form_list.static_filters]
):
skip = True
if not skip:
field_sequence.append(fd.field_name)
return field_sequence