add command params

This commit is contained in:
Alexander Kalinovsky
2025-01-29 23:40:43 +01:00
parent b40e588379
commit f666bcfba3
33 changed files with 547 additions and 340 deletions

View File

@@ -8,6 +8,5 @@ from .model.descriptors import (
EntityForm as EntityForm,
EntityList as EntityList,
EntityPermission as EntityPermission,
Command as Command,
CommandCallbackContext as CommandCallbackContext,
)

View File

View File

@@ -25,6 +25,7 @@ def add_pagination_controls(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data="1" if page != 1 else "skip",
).pack(),
)
@@ -39,6 +40,7 @@ def add_pagination_controls(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=str(max(page - 10, 1)) if page > 1 else "skip",
).pack(),
)
@@ -54,6 +56,7 @@ def add_pagination_controls(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=str(max(page - 1, 1)) if page > 1 else "skip",
).pack(),
)
@@ -68,6 +71,7 @@ def add_pagination_controls(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=(
str(min(page + 1, total_pages))
if page < total_pages
@@ -88,6 +92,7 @@ def add_pagination_controls(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=(
str(min(page + 10, total_pages))
if page < total_pages
@@ -106,6 +111,7 @@ def add_pagination_controls(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=str(total_pages) if page != total_pages else "skip",
).pack(),
)

View File

@@ -1,7 +1,7 @@
from aiogram.types import Message, CallbackQuery
from ..context import CallbackCommand
from ..navigation import (
from ....utils.navigation import (
get_navigation_context,
save_navigation_context,
pop_navigation_context,
@@ -41,7 +41,12 @@ async def route_callback(message: Message | CallbackQuery, back: bool = True, **
await form_item.entity_item(message, **kwargs)
elif context.command == CallbackCommand.FIELD_EDITOR:
await editor.field_editor(message, **kwargs)
elif context.command == CallbackCommand.USER_COMMAND:
import qbot.bot.handlers.user_handlers.main as user_handler
await user_handler.cammand_handler(
message, from_callback_query=True, **kwargs
)
else:
raise ValueError(f"Unknown command {context.command}")
else:

View File

@@ -29,6 +29,7 @@ class CommandContext(StrEnum):
ENTITY_CREATE = "ec"
ENTITY_EDIT = "ee"
ENTITY_FIELD_EDIT = "ef"
COMMAND_FORM = "cf"
class ContextData(BaseCallbackData, prefix="cd"):

View File

@@ -5,7 +5,7 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from babel.support import LazyProxy
from logging import getLogger
from ....model.descriptors import EntityFieldDescriptor
from ....model.descriptors import FieldDescriptor
from ..context import ContextData, CallbackCommand
from ....utils.main import get_send_message
from .wrapper import wrap_editor
@@ -18,7 +18,7 @@ router = Router()
async def bool_editor(
message: Message | CallbackQuery,
edit_prompt: str,
field_descriptor: EntityFieldDescriptor,
field_descriptor: FieldDescriptor,
callback_data: ContextData,
**kwargs,
):
@@ -44,6 +44,7 @@ async def bool_editor(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=str(True),
).pack(),
),
@@ -56,6 +57,7 @@ async def bool_editor(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=str(False),
).pack(),
),

View File

@@ -4,7 +4,7 @@ from datetime import datetime, time
from ....model.bot_entity import BotEntity
from ....model.bot_enum import BotEnum
from ....model.descriptors import EntityFieldDescriptor
from ....model.descriptors import FieldDescriptor
from ....model.settings import Settings
from ....model.user import UserBase
from ....utils.main import get_callable_str, get_value_repr
@@ -16,7 +16,7 @@ from .string import string_editor
async def show_editor(message: Message | CallbackQuery, **kwargs):
field_descriptor: EntityFieldDescriptor = kwargs["field_descriptor"]
field_descriptor: FieldDescriptor = kwargs["field_descriptor"]
current_value = kwargs["current_value"]
user: UserBase = kwargs["user"]
callback_data: ContextData = kwargs.get("callback_data", None)

View File

@@ -6,7 +6,7 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from typing import TYPE_CHECKING
from ....model.descriptors import EntityFieldDescriptor
from ....model.descriptors import FieldDescriptor
from ....model.settings import Settings
from ..context import ContextData, CallbackCommand
from ....utils.main import get_send_message, get_field_descriptor
@@ -45,7 +45,7 @@ async def time_picker_callback(
async def time_picker(
message: Message | CallbackQuery,
field_descriptor: EntityFieldDescriptor,
field_descriptor: FieldDescriptor,
callback_data: ContextData,
current_value: datetime | time,
state: FSMContext,
@@ -70,6 +70,7 @@ async def time_picker(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=current_value.replace(
hour=i if current_value.hour < 12 else i + 12
).strftime(
@@ -92,6 +93,7 @@ async def time_picker(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=current_value.replace(minute=i * 5).strftime(
"%Y-%m-%d %H-%M"
if isinstance(current_value, datetime)
@@ -112,6 +114,7 @@ async def time_picker(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=current_value.replace(
hour=current_value.hour + 12
if current_value.hour < 12
@@ -130,6 +133,7 @@ async def time_picker(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=current_value.strftime(
"%Y-%m-%d %H-%M" if isinstance(current_value, datetime) else "%H-%M"
),
@@ -157,7 +161,7 @@ async def time_picker(
async def date_picker(
message: Message | CallbackQuery,
field_descriptor: EntityFieldDescriptor,
field_descriptor: FieldDescriptor,
callback_data: ContextData,
current_value: datetime,
state: FSMContext,
@@ -185,6 +189,7 @@ async def date_picker(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=previous_month.strftime("%Y-%m-%d %H-%M"),
).pack(),
),
@@ -197,6 +202,7 @@ async def date_picker(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=start_date.strftime("%Y-%m-%d %H-%M"),
).pack(),
),
@@ -209,6 +215,7 @@ async def date_picker(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=next_month.strftime("%Y-%m-%d %H-%M"),
).pack(),
),
@@ -237,6 +244,7 @@ async def date_picker(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=current_day.strftime("%Y-%m-%d %H-%M"),
).pack(),
)
@@ -288,6 +296,7 @@ async def date_picker_year(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=start_date.replace(year=start_date.year - 20).strftime(
"%Y-%m-%d %H-%M"
),
@@ -309,6 +318,7 @@ async def date_picker_year(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=current_date.strftime("%Y-%m-%d %H-%M"),
).pack(),
)
@@ -326,6 +336,7 @@ async def date_picker_year(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=start_date.replace(year=start_date.year + 20).strftime(
"%Y-%m-%d %H-%M"
),

View File

@@ -14,7 +14,7 @@ from ....model.bot_enum import BotEnum
from ....model.settings import Settings
from ....model.user import UserBase
from ....model.view_setting import ViewSetting
from ....model.descriptors import EntityFieldDescriptor, Filter
from ....model.descriptors import FieldDescriptor, Filter
from ....model import EntityPermission
from ....utils.main import (
get_user_permissions,
@@ -37,7 +37,7 @@ router = Router()
async def entity_picker(
message: Message | CallbackQuery,
field_descriptor: EntityFieldDescriptor,
field_descriptor: FieldDescriptor,
edit_prompt: str,
current_value: BotEntity | BotEnum | list[BotEntity] | list[BotEnum],
**kwargs,
@@ -67,7 +67,7 @@ def calc_total_pages(items_count: int, page_size: int) -> int:
async def render_entity_picker(
*,
field_descriptor: EntityFieldDescriptor,
field_descriptor: FieldDescriptor,
message: Message | CallbackQuery,
callback_data: ContextData,
user: UserBase,
@@ -239,6 +239,7 @@ async def render_entity_picker(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data=f"{page}&{item['value']}" if is_list else item["value"],
).pack(),
)
@@ -275,6 +276,7 @@ async def render_entity_picker(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
).pack(),
)
)

View File

@@ -15,7 +15,7 @@ from ....utils.main import (
from ....utils.serialization import deserialize, serialize
from ..context import ContextData, CallbackCommand, CommandContext
from ....auth import authorize_command
from ..navigation import (
from ....utils.navigation import (
get_navigation_context,
save_navigation_context,
)
@@ -37,14 +37,22 @@ router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.FIELD_EDITOR))
async def field_editor_callback(query: CallbackQuery, **kwargs):
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
kwargs["state_data"] = state_data
await field_editor(message=query, **kwargs)
async def field_editor(message: Message | CallbackQuery, **kwargs):
callback_data: ContextData = kwargs.get("callback_data", None)
db_session: AsyncSession = kwargs["db_session"]
user: UserBase = kwargs["user"]
app: "QBotApp" = kwargs["app"]
state: FSMContext = kwargs["state"]
# state: FSMContext = kwargs["state"]
state_data = await state.get_data()
state_data: dict = kwargs["state_data"]
entity_data = state_data.get("entity_data")
for key in ["current_value", "value", "locale_index"]:

View File

@@ -8,10 +8,11 @@ import json
from ..context import ContextData, CallbackCommand, CommandContext
from ...command_context_filter import CallbackCommandFilter
from ..user_handlers.main import cammand_handler
from ....model import EntityPermission
from ....model.user import UserBase
from ....model.settings import Settings
from ....model.descriptors import EntityFieldDescriptor
from ....model.descriptors import FieldDescriptor
from ....model.language import LanguageBase
from ....auth import authorize_command
from ....utils.main import (
@@ -60,7 +61,9 @@ async def field_editor_callback(message: Message | CallbackQuery, **kwargs):
else:
value = {}
value[list(LanguageBase.all_members.keys())[locale_index]] = message.text
value[list(LanguageBase.all_members.values())[locale_index].value] = (
message.text
)
value = json.dumps(value, ensure_ascii=False)
if locale_index < len(LanguageBase.all_members.values()) - 1:
@@ -125,7 +128,7 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
callback_data: ContextData = kwargs.get("callback_data", None)
state_data: dict = kwargs["state_data"]
value = kwargs["value"]
field_descriptor: EntityFieldDescriptor = kwargs["field_descriptor"]
field_descriptor: FieldDescriptor = kwargs["field_descriptor"]
if callback_data.context == CommandContext.SETTING_EDIT:
if callback_data.data != "cancel":
@@ -145,29 +148,37 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
CommandContext.ENTITY_CREATE,
CommandContext.ENTITY_EDIT,
CommandContext.ENTITY_FIELD_EDIT,
CommandContext.COMMAND_FORM,
]:
app: "QBotApp" = kwargs["app"]
entity_descriptor = get_entity_descriptor(app, callback_data)
form_name = (
callback_data.form_params.split("&")[0]
if callback_data.form_params
else "default"
)
form = entity_descriptor.forms.get(form_name, entity_descriptor.default_form)
if callback_data.context == CommandContext.COMMAND_FORM:
field_sequence = list(field_descriptor.command.param_form.keys())
current_index = field_sequence.index(callback_data.field_name)
field_descriptors = field_descriptor.command.param_form
else:
form_name = (
callback_data.form_params.split("&")[0]
if callback_data.form_params
else "default"
)
form = entity_descriptor.forms.get(
form_name, entity_descriptor.default_form
)
field_sequence = form.edit_field_sequence
current_index = (
field_sequence.index(callback_data.field_name)
if callback_data.context
in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]
else 0
)
field_sequence = form.edit_field_sequence
current_index = (
field_sequence.index(callback_data.field_name)
if callback_data.context
in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]
else 0
)
field_descriptors = entity_descriptor.fields_descriptors
entity_data = state_data.get("entity_data", {})
if callback_data.context == CommandContext.ENTITY_CREATE:
if callback_data.context == CommandContext.ENTITY_CREATE and not entity_data:
stack = state_data.get("navigation_stack", [])
prev_callback_data = ContextData.unpack(stack[-1]) if stack else None
if (
@@ -199,16 +210,18 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
if (
callback_data.context
in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]
in [
CommandContext.ENTITY_CREATE,
CommandContext.ENTITY_EDIT,
CommandContext.COMMAND_FORM,
]
and current_index < len(field_sequence) - 1
):
entity_data[field_descriptor.field_name] = value
state_data.update({"entity_data": entity_data})
next_field_name = field_sequence[current_index + 1]
next_field_descriptor = entity_descriptor.fields_descriptors[
next_field_name
]
next_field_descriptor = field_descriptors[next_field_name]
kwargs.update({"field_descriptor": next_field_descriptor})
callback_data.field_name = next_field_name
@@ -232,8 +245,6 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
)
else:
entity_type = entity_descriptor.type_
entity_data[field_descriptor.field_name] = value
# What if user has several roles and each role has its own ownership field? Should we allow creation even
@@ -246,13 +257,14 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
deser_entity_data = {
key: await deserialize(
session=db_session,
type_=entity_descriptor.fields_descriptors[key].type_,
type_=field_descriptors[key].type_,
value=value,
)
for key, value in entity_data.items()
}
if callback_data.context == CommandContext.ENTITY_CREATE:
entity_type = entity_descriptor.type_
user_permissions = get_user_permissions(user, entity_descriptor)
if (
EntityPermission.CREATE not in user_permissions
@@ -290,6 +302,7 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
CommandContext.ENTITY_EDIT,
CommandContext.ENTITY_FIELD_EDIT,
]:
entity_type = entity_descriptor.type_
entity_id = int(callback_data.entity_id)
entity = await entity_type.get(session=db_session, id=entity_id)
if not entity:
@@ -309,6 +322,23 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
await db_session.commit()
elif callback_data.context == CommandContext.COMMAND_FORM:
clear_state(state_data=state_data)
state_data["entity_data"] = entity_data
kwargs.update(
{
"callback_data": ContextData(
command=CallbackCommand.USER_COMMAND,
user_command=callback_data.user_command,
data=callback_data.data,
)
}
)
return await cammand_handler(message=message, **kwargs)
clear_state(state_data=state_data)
# TODO: Try back=False and check if it works to navigate to newly created entity
await route_callback(message=message, back=True, **kwargs)

View File

@@ -5,7 +5,7 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from typing import Any
from ....model.descriptors import EntityFieldDescriptor
from ....model.descriptors import FieldDescriptor
from ....model.language import LanguageBase
from ....model.settings import Settings
from ....utils.main import get_send_message, get_local_text
@@ -20,7 +20,7 @@ router = Router()
async def string_editor(
message: Message | CallbackQuery,
field_descriptor: EntityFieldDescriptor,
field_descriptor: FieldDescriptor,
callback_data: ContextData,
current_value: Any,
edit_prompt: str,
@@ -41,6 +41,7 @@ async def string_editor(
entity_id=callback_data.entity_id,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
)
if field_descriptor.type_base is str and field_descriptor.localizable:

View File

@@ -2,14 +2,14 @@ from aiogram.types import InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from ....model.settings import Settings
from ....model.descriptors import EntityFieldDescriptor
from ....model.descriptors import FieldDescriptor
from ..context import ContextData, CallbackCommand, CommandContext
from ..navigation import get_navigation_context, pop_navigation_context
from ....utils.navigation import get_navigation_context, pop_navigation_context
async def wrap_editor(
keyboard_builder: InlineKeyboardBuilder,
field_descriptor: EntityFieldDescriptor,
field_descriptor: FieldDescriptor,
callback_data: ContextData,
state_data: dict,
):
@@ -17,28 +17,37 @@ async def wrap_editor(
CommandContext.ENTITY_CREATE,
CommandContext.ENTITY_EDIT,
CommandContext.ENTITY_FIELD_EDIT,
CommandContext.COMMAND_FORM,
]:
form_name = (
callback_data.form_params.split("&")[0]
if callback_data.form_params
else "default"
)
form = field_descriptor.entity_descriptor.forms.get(
form_name, field_descriptor.entity_descriptor.default_form
)
btns = []
field_index = (
form.edit_field_sequence.index(field_descriptor.name)
if callback_data.context
in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]
else 0
)
show_back = True
show_cancel = True
if callback_data.context == CommandContext.COMMAND_FORM:
field_sequence = list(field_descriptor.command.param_form.keys())
field_index = field_sequence.index(callback_data.field_name)
show_back = field_descriptor.command.show_back_in_param_form
show_cancel = field_descriptor.command.show_cancel_in_param_form
else:
form_name = (
callback_data.form_params.split("&")[0]
if callback_data.form_params
else "default"
)
form = field_descriptor.entity_descriptor.forms.get(
form_name, field_descriptor.entity_descriptor.default_form
)
field_sequence = form.edit_field_sequence
field_index = (
field_sequence.index(field_descriptor.name)
if callback_data.context
in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]
else 0
)
stack, context = get_navigation_context(state_data=state_data)
context = pop_navigation_context(stack)
if field_index > 0:
if field_index > 0 and show_back:
btns.append(
InlineKeyboardButton(
text=(await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
@@ -48,7 +57,8 @@ async def wrap_editor(
entity_name=callback_data.entity_name,
entity_id=callback_data.entity_id,
form_params=callback_data.form_params,
field_name=form.edit_field_sequence[field_index - 1],
user_command=callback_data.user_command,
field_name=field_sequence[field_index - 1],
).pack(),
)
)
@@ -62,8 +72,9 @@ async def wrap_editor(
context=callback_data.context,
entity_name=callback_data.entity_name,
entity_id=callback_data.entity_id,
form_params=callback_data.form_params,
field_name=callback_data.field_name,
form_params=callback_data.form_params,
user_command=callback_data.user_command,
data="skip",
).pack(),
)
@@ -71,12 +82,13 @@ async def wrap_editor(
keyboard_builder.row(*btns)
keyboard_builder.row(
InlineKeyboardButton(
text=(await Settings.get(Settings.APP_STRINGS_CANCEL_BTN)),
callback_data=context.pack(),
if show_cancel:
keyboard_builder.row(
InlineKeyboardButton(
text=(await Settings.get(Settings.APP_STRINGS_CANCEL_BTN)),
callback_data=context.pack(),
)
)
)
elif callback_data.context == CommandContext.SETTING_EDIT:
keyboard_builder.row(

View File

@@ -19,7 +19,7 @@ from ....utils.main import (
get_entity_descriptor,
)
from ..context import ContextData, CallbackCommand, CommandContext
from ..navigation import (
from ....utils.navigation import (
pop_navigation_context,
save_navigation_context,
)
@@ -84,94 +84,89 @@ async def entity_item(
callback_data.form_params or "default", entity_descriptor.default_form
)
for edit_buttons_row in form.form_buttons:
btn_row = []
for button in edit_buttons_row:
if form.form_buttons:
for edit_buttons_row in form.form_buttons:
btn_row = []
for button in edit_buttons_row:
if button.visibility and not button.visibility(entity_item):
continue
if button.visibility and not button.visibility(entity_item):
continue
if isinstance(button, FieldEditButton) and can_edit:
field_name = button.field_name
btn_caption = button.caption
if field_name in entity_descriptor.fields_descriptors:
field_descriptor = entity_descriptor.fields_descriptors[
field_name
]
field_value = getattr(entity_item, field_descriptor.field_name)
if btn_caption:
btn_text = get_callable_str(
btn_caption, field_descriptor, entity_item, field_value
)
else:
if field_descriptor.type_base is bool:
btn_text = f"{'【✔︎】 ' if field_value else '【 】 '}{
get_callable_str(
field_descriptor.caption,
field_descriptor,
entity_item,
field_value,
)
if field_descriptor.caption
else field_name
}"
if isinstance(button, FieldEditButton) and can_edit:
field_name = button.field_name
btn_caption = button.caption
if field_name in entity_descriptor.fields_descriptors:
field_descriptor = entity_descriptor.fields_descriptors[
field_name
]
field_value = getattr(entity_item, field_descriptor.field_name)
if btn_caption:
btn_text = get_callable_str(
btn_caption, field_descriptor, entity_item, field_value
)
else:
btn_text = (
f"✏️ {
if field_descriptor.type_base is bool:
btn_text = f"{'【✔︎】 ' if field_value else '【 】 '}{
get_callable_str(
field_descriptor.caption,
field_descriptor,
entity_item,
field_value,
)
if field_descriptor.caption
else field_name
}"
if field_descriptor.caption
else f"✏️ {field_name}"
else:
btn_text = (
f"✏️ {
get_callable_str(
field_descriptor.caption,
field_descriptor,
entity_item,
field_value,
)
}"
if field_descriptor.caption
else f"✏️ {field_name}"
)
btn_row.append(
InlineKeyboardButton(
text=btn_text,
callback_data=ContextData(
command=CallbackCommand.FIELD_EDITOR,
context=CommandContext.ENTITY_FIELD_EDIT,
entity_name=entity_descriptor.name,
entity_id=str(entity_item.id),
field_name=field_name,
).pack(),
)
btn_row.append(
InlineKeyboardButton(
text=btn_text,
callback_data=ContextData(
command=CallbackCommand.FIELD_EDITOR,
context=CommandContext.ENTITY_FIELD_EDIT,
entity_name=entity_descriptor.name,
entity_id=str(entity_item.id),
field_name=field_name,
).pack(),
)
)
elif isinstance(button, CommandButton):
elif isinstance(button, CommandButton):
btn_caption = button.caption
btn_caption = button.caption
if btn_caption:
btn_text = get_callable_str(
btn_caption, entity_descriptor, entity_item
)
else:
btn_text = button.command
if isinstance(button.context_data, ContextData):
btn_cdata = button.context_data
elif callable(button.context_data):
btn_cdata = button.context_data(callback_data, entity_item)
else:
btn_cdata = ContextData(
command=CallbackCommand.USER_COMMAND,
user_command=button.command,
data=str(entity_item.id),
if isinstance(button.command, ContextData):
btn_cdata = button.command
elif callable(button.command):
btn_cdata = button.command(callback_data, entity_item)
elif isinstance(button.command, str):
btn_cdata = ContextData(
command=CallbackCommand.USER_COMMAND,
user_command=button.command,
)
btn_row.append(
InlineKeyboardButton(
text=btn_text,
callback_data=btn_cdata.pack(),
)
)
btn_row.append(
InlineKeyboardButton(
text=btn_text,
callback_data=btn_cdata.pack(),
)
)
if btn_row:
keyboard_builder.row(*btn_row)
if btn_row:
keyboard_builder.row(*btn_row)
edit_delete_row = []
if can_edit and form.show_edit_button:

View File

@@ -1,4 +1,5 @@
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -10,7 +11,7 @@ from ....model.settings import Settings
from ....model import EntityPermission
from ....utils.main import (
check_entity_permission,
get_value_repr,
get_entity_item_repr,
get_entity_descriptor,
)
from ..common.routing import route_callback
@@ -28,6 +29,9 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
user: UserBase = kwargs["user"]
db_session: AsyncSession = kwargs["db_session"]
app: "QBotApp" = kwargs["app"]
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
kwargs["state_data"] = state_data
entity_descriptor = get_entity_descriptor(app=app, callback_data=callback_data)
@@ -49,12 +53,7 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
await route_callback(message=query, **kwargs)
elif callback_data.data == "no":
await route_callback(message=query, back=False, **kwargs)
elif not callback_data.data:
field_descriptor = entity_descriptor.fields_descriptors["name"]
entity = await entity_descriptor.type_.get(
session=db_session, id=int(callback_data.entity_id)
)
@@ -62,13 +61,7 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
return await query.message.edit_text(
text=(
await Settings.get(Settings.APP_STRINGS_CONFIRM_DELETE_P_NAME)
).format(
name=get_value_repr(
value=getattr(entity, field_descriptor.name),
field_descriptor=field_descriptor,
locale=user.lang,
)
),
).format(name=get_entity_item_repr(entity=entity)),
reply_markup=InlineKeyboardBuilder()
.row(
InlineKeyboardButton(
@@ -88,7 +81,6 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
entity_name=callback_data.entity_name,
entity_id=callback_data.entity_id,
form_params=callback_data.form_params,
data="no",
).pack(),
),
)

View File

@@ -23,7 +23,7 @@ from ....utils.serialization import deserialize
from ..context import ContextData, CallbackCommand, CommandContext
from ..common.pagination import add_pagination_controls
from ..common.filtering import add_filter_controls
from ..navigation import pop_navigation_context, save_navigation_context
from ....utils.navigation import pop_navigation_context, save_navigation_context
if TYPE_CHECKING:
from ....main import QBotApp

View File

@@ -9,7 +9,7 @@ from ....model.settings import Settings
from ..context import ContextData, CallbackCommand
from ....utils.main import get_send_message
from ....model.descriptors import EntityCaptionCallable
from ..navigation import save_navigation_context, pop_navigation_context
from ....utils.navigation import save_navigation_context, pop_navigation_context
if TYPE_CHECKING:
from ....main import QBotApp
@@ -45,9 +45,7 @@ async def entities_menu(
entity_metadata = app.entity_metadata
for entity in entity_metadata.entity_descriptors.values():
if entity.show_in_entities_menu:
if entity.full_name_plural.__class__ == EntityCaptionCallable:
caption = entity.full_name_plural(entity) or entity.name
elif entity.full_name_plural.__class__ == LazyProxy:

View File

@@ -10,7 +10,7 @@ from aiogram.utils.i18n import I18n
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from ..navigation import pop_navigation_context, save_navigation_context
from ....utils.navigation import pop_navigation_context, save_navigation_context
from ....model.language import LanguageBase
from ....model.settings import Settings
from ....model.user import UserBase

View File

@@ -5,7 +5,7 @@ from logging import getLogger
from ....model.settings import Settings
from ..context import ContextData, CallbackCommand
from ....utils.main import get_send_message
from ..navigation import save_navigation_context, pop_navigation_context
from ....utils.navigation import save_navigation_context, pop_navigation_context
import qbot.bot.handlers.menu.entities as entities
import qbot.bot.handlers.menu.settings as settings
@@ -17,7 +17,7 @@ import qbot.bot.handlers.forms.entity_list as entity_list
import qbot.bot.handlers.forms.entity_form as entity_form
import qbot.bot.handlers.forms.entity_form_callbacks as entity_form_callbacks
import qbot.bot.handlers.common.filtering_callbacks as filtering_callbacks
import qbot.bot.handlers.user_handlers as user_handlers
import qbot.bot.handlers.user_handlers.main as user_handlers_main
logger = getLogger(__name__)
@@ -26,7 +26,7 @@ router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.MENU_ENTRY_MAIN))
async def menu_entry_main(message: CallbackQuery, **kwargs):
stack = await save_navigation_context(
stack = save_navigation_context(
callback_data=kwargs["callback_data"], state=kwargs["state"]
)
@@ -84,5 +84,5 @@ router.include_routers(
entity_form.router,
entity_form_callbacks.router,
filtering_callbacks.router,
user_handlers.router,
user_handlers_main.router,
)

View File

@@ -13,7 +13,7 @@ from ....utils.main import (
get_value_repr,
get_callable_str,
)
from ..navigation import save_navigation_context, pop_navigation_context
from ....utils.navigation import save_navigation_context, pop_navigation_context
from ....auth import authorize_command

View File

@@ -9,7 +9,7 @@ from ....model.user import UserBase
from ....utils.main import get_send_message
from ..context import ContextData, CallbackCommand
from ....auth import authorize_command
from ..navigation import save_navigation_context, pop_navigation_context
from ....utils.navigation import save_navigation_context, pop_navigation_context
logger = getLogger(__name__)
router = Router()

View File

@@ -1,102 +0,0 @@
from typing import TYPE_CHECKING
from aiogram import Router, F
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.fsm.context import FSMContext
from ..context import ContextData, CallbackCommand
from ....model.settings import Settings
from ....utils.main import get_send_message, clear_state
from ....model.descriptors import CommandCallbackContext
from ..navigation import (
save_navigation_context,
get_navigation_context,
pop_navigation_context,
)
if TYPE_CHECKING:
from ....main import QBotApp
router = Router()
@router.message(F.text.startswith("/"))
async def command_text(message: Message, **kwargs):
str_command = message.text.lstrip("/")
callback_data = ContextData(
command=CallbackCommand.USER_COMMAND, user_command=str_command
)
await command_handler(message=message, callback_data=callback_data, **kwargs)
@router.callback_query(ContextData.filter(F.command == CallbackCommand.USER_COMMAND))
async def command_callback(message: CallbackQuery, **kwargs):
await command_handler(message=message, **kwargs)
async def command_handler(message: Message | CallbackQuery, **kwargs):
callback_data: ContextData = kwargs.pop("callback_data")
str_command = callback_data.user_command
app: "QBotApp" = kwargs.pop("app")
command = app.bot_commands.get(str_command)
if not command:
return
state: FSMContext = kwargs.pop("state")
state_data = await state.get_data()
if command.register_navigation:
clear_state(state_data=state_data)
if command.clear_navigation:
state_data.pop("navigation_stack", None)
state_data.pop("navigation_context", None)
if command.register_navigation:
stack = save_navigation_context(
callback_data=callback_data, state_data=state_data
)
callback_context = CommandCallbackContext[app.user_class](
message=message,
callback_data=callback_data,
db_session=kwargs.pop("db_session"),
user=kwargs.pop("user"),
app=app,
state_data=state_data,
state=state,
i18n=kwargs.pop("i18n"),
kwargs=kwargs,
)
await command.handler(callback_context)
await state.set_data(state_data)
if command.register_navigation:
stack, navigation_context = get_navigation_context(state_data=state_data)
back_callback_data = pop_navigation_context(stack=stack)
if back_callback_data:
callback_context.keyboard_builder.row(
InlineKeyboardButton(
text=(await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data=back_callback_data.pack(),
)
)
send_message = get_send_message(message)
if isinstance(message, CallbackCommand):
message = message.message
if callback_context.message_text:
await send_message(
text=callback_context.message_text,
reply_markup=callback_context.keyboard_builder.as_markup(),
)
else:
await message.edit_reply_markup(
reply_markup=callback_context.keyboard_builder.as_markup()
)

View File

@@ -0,0 +1,150 @@
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from inspect import iscoroutinefunction
from typing import TYPE_CHECKING
from qbot.utils.main import clear_state
from qbot.utils.navigation import (
save_navigation_context,
get_navigation_context,
pop_navigation_context,
)
from qbot.bot.handlers.editors.main import field_editor
from qbot.bot.handlers.common.routing import route_callback
from qbot.utils.serialization import deserialize
from qbot.utils.main import get_send_message
from qbot.model.descriptors import CommandCallbackContext
from qbot.model.settings import Settings
if TYPE_CHECKING:
from qbot.main import QBotApp
from ..context import ContextData, CallbackCommand, CommandContext
router = Router()
@router.message(F.text.startswith("/"))
async def command_text(message: Message, **kwargs):
str_command = message.text.lstrip("/")
callback_data = ContextData(
command=CallbackCommand.USER_COMMAND, user_command=str_command
)
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
kwargs["state_data"] = state_data
await cammand_handler(message=message, callback_data=callback_data, **kwargs)
@router.callback_query(ContextData.filter(F.command == CallbackCommand.USER_COMMAND))
async def command_callback(message: CallbackQuery, **kwargs):
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
kwargs["state_data"] = state_data
await cammand_handler(message=message, **kwargs)
async def cammand_handler(message: Message | CallbackQuery, **kwargs):
callback_data: ContextData = kwargs["callback_data"]
state: FSMContext = kwargs["state"]
state_data: dict = kwargs["state_data"]
app: "QBotApp" = kwargs["app"]
cmd = app.bot_commands.get(callback_data.user_command.split("&")[0])
if cmd is None:
return
entity_data_dict: dict = state_data.get("entity_data")
form_data = (
{
key: await deserialize(
session=kwargs["db_session"],
type_=cmd.param_form[key].type_,
value=value,
)
for key, value in entity_data_dict.items()
}
if entity_data_dict and cmd.param_form
else None
)
callback_context = CommandCallbackContext(
message=message,
callback_data=callback_data,
form_data=form_data,
db_session=kwargs["db_session"],
user=kwargs["user"],
app=app,
state_data=state_data,
state=state,
i18n=kwargs["i18n"],
register_navigation=cmd.register_navigation,
kwargs=kwargs,
)
if cmd.pre_check and (not cmd.param_form or (cmd.param_form and form_data is None)):
if iscoroutinefunction(cmd.pre_check):
if not await cmd.pre_check(callback_context):
return
else:
if not cmd.pre_check(callback_context):
return
if form_data is None and cmd.param_form:
field_descriptor = list(cmd.param_form.values())[0]
kwargs["callback_data"] = ContextData(
command=CallbackCommand.FIELD_EDITOR,
context=CommandContext.COMMAND_FORM,
field_name=field_descriptor.name,
user_command=callback_data.user_command,
)
return await field_editor(message=message, **kwargs)
if cmd.clear_navigation:
state_data.pop("navigation_stack", None)
state_data.pop("navigation_context", None)
if cmd.register_navigation:
clear_state(state_data=state_data)
save_navigation_context(callback_data=callback_data, state_data=state_data)
await cmd.handler(callback_context)
if callback_context.register_navigation:
await state.set_data(state_data)
# if command.register_navigation:
stack, navigation_context = get_navigation_context(state_data=state_data)
back_callback_data = pop_navigation_context(stack=stack)
if back_callback_data:
callback_context.keyboard_builder.row(
InlineKeyboardButton(
text=(await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data=back_callback_data.pack(),
)
)
send_message = get_send_message(message)
if isinstance(message, CallbackQuery):
message = message.message
if callback_context.message_text:
await send_message(
text=callback_context.message_text,
reply_markup=callback_context.keyboard_builder.as_markup(),
)
else:
await message.edit_reply_markup(
reply_markup=callback_context.keyboard_builder.as_markup()
)
else:
clear_state(state_data=state_data)
await route_callback(message, back=True, **kwargs)

23
helpers/__init__.py Normal file
View File

@@ -0,0 +1,23 @@
from aiogram.types import InlineKeyboardButton
from ..utils.navigation import pop_navigation_context
from ..model.descriptors import CommandCallbackContext
from ..model.settings import Settings
async def get_back_button(
context: CommandCallbackContext, text: str = None
) -> InlineKeyboardButton | None:
stack = context.state_data.get("navigation_stack")
if not stack:
return None
back_callback_data = pop_navigation_context(stack)
if not text:
text = await Settings.get(Settings.APP_STRINGS_BACK_BTN)
return InlineKeyboardButton(
text=text,
callback_data=back_callback_data.pack(),
)

View File

@@ -11,13 +11,13 @@ from typing import (
TYPE_CHECKING,
)
from pydantic import BaseModel
from sqlmodel import SQLModel, BIGINT, Field, select, func, column
from sqlmodel import SQLModel, BigInteger, Field, select, func, column
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel.sql.expression import SelectOfScalar
from sqlmodel.main import SQLModelMetaclass, RelationshipInfo
from .descriptors import EntityDescriptor, EntityField, EntityFieldDescriptor, Filter
from .descriptors import EntityDescriptor, EntityField, FieldDescriptor, Filter
from .entity_metadata import EntityMetadata
from . import session_dep
@@ -26,7 +26,7 @@ if TYPE_CHECKING:
class BotEntityMetaclass(SQLModelMetaclass):
__future_references__ = {}
_future_references = {}
def __new__(mcs, name, bases, namespace, **kwargs):
bot_fields_descriptors = {}
@@ -35,7 +35,7 @@ class BotEntityMetaclass(SQLModelMetaclass):
bot_entity_descriptor = bases[0].__dict__.get("bot_entity_descriptor")
bot_fields_descriptors = (
{
key: EntityFieldDescriptor(**value.__dict__.copy())
key: FieldDescriptor(**value.__dict__.copy())
for key, value in bot_entity_descriptor.fields_descriptors.items()
}
if bot_entity_descriptor
@@ -71,7 +71,7 @@ class BotEntityMetaclass(SQLModelMetaclass):
type_origin = get_origin(type_)
field_descriptor = EntityFieldDescriptor(
field_descriptor = FieldDescriptor(
name=descriptor_name,
field_name=annotation,
type_=type_,
@@ -80,18 +80,19 @@ class BotEntityMetaclass(SQLModelMetaclass):
)
is_list = False
is_optional = False
if type_origin is list:
field_descriptor.is_list = is_list = True
field_descriptor.type_base = type_ = get_args(type_)[0]
if type_origin == Union and isinstance(get_args(type_)[0], ForwardRef):
field_descriptor.is_optional = True
field_descriptor.is_optional = is_optional = True
field_descriptor.type_base = type_ = get_args(type_)[
0
].__forward_arg__
if type_origin == UnionType and get_args(type_)[1] == NoneType:
field_descriptor.is_optional = True
field_descriptor.is_optional = is_optional = True
field_descriptor.type_base = type_ = get_args(type_)[0]
if isinstance(type_, str):
@@ -100,18 +101,16 @@ class BotEntityMetaclass(SQLModelMetaclass):
entity_descriptor
) in EntityMetadata().entity_descriptors.values():
if type_ == entity_descriptor.class_name:
field_descriptor.type_base = entity_descriptor.type_
field_descriptor.type_ = (
list[entity_descriptor.type_]
if is_list
else (
Optional[entity_descriptor.type_]
if type_origin == Optional
if type_origin == Union and is_optional
else (
entity_descriptor.type_ | None
if (
type_origin == UnionType
and get_args(type_)[1] == NoneType
)
if (type_origin == UnionType and is_optional)
else entity_descriptor.type_
)
)
@@ -119,10 +118,10 @@ class BotEntityMetaclass(SQLModelMetaclass):
type_not_found = False
break
if type_not_found:
if type_ in mcs.__future_references__:
mcs.__future_references__[type_].append(field_descriptor)
if type_ in mcs._future_references:
mcs._future_references[type_].append(field_descriptor)
else:
mcs.__future_references__[type_] = [field_descriptor]
mcs._future_references[type_] = [field_descriptor]
bot_fields_descriptors[descriptor_name] = field_descriptor
@@ -191,14 +190,14 @@ class BotEntityMetaclass(SQLModelMetaclass):
type_ = super().__new__(mcs, name, bases, namespace, **kwargs)
if name in mcs.__future_references__:
for field_descriptor in mcs.__future_references__[name]:
if name in mcs._future_references:
for field_descriptor in mcs._future_references[name]:
type_origin = get_origin(field_descriptor.type_)
field_descriptor.type_base = type_
field_descriptor.type_ = (
list[type_]
if get_origin(field_descriptor.type_) is list
if type_origin is list
else (
Optional[type_]
if type_origin == Union
@@ -220,7 +219,9 @@ class BotEntity[CreateSchemaType: BaseModel, UpdateSchemaType: BaseModel](
bot_entity_descriptor: ClassVar[EntityDescriptor]
entity_metadata: ClassVar[EntityMetadata]
id: int = Field(primary_key=True, sa_type=BIGINT)
id: int = EntityField(
sm_descriptor=Field(primary_key=True, sa_type=BigInteger), is_visible=False
)
@classmethod
@session_dep
@@ -228,7 +229,7 @@ class BotEntity[CreateSchemaType: BaseModel, UpdateSchemaType: BaseModel](
return await session.get(cls, id, populate_existing=True)
@classmethod
def _static_fiter_condition(
def _static_filter_condition(
cls, select_statement: SelectOfScalar[Self], static_filter: list[Filter]
):
for sfilt in static_filter:
@@ -292,7 +293,7 @@ class BotEntity[CreateSchemaType: BaseModel, UpdateSchemaType: BaseModel](
select_statement = select(func.count()).select_from(cls)
if static_filter:
if isinstance(static_filter, list):
select_statement = cls._static_fiter_condition(
select_statement = cls._static_filter_condition(
select_statement, static_filter
)
else:
@@ -327,7 +328,7 @@ class BotEntity[CreateSchemaType: BaseModel, UpdateSchemaType: BaseModel](
select_statement = select_statement.limit(limit)
if static_filter:
if isinstance(static_filter, list):
select_statement = cls._static_fiter_condition(
select_statement = cls._static_filter_condition(
select_statement, static_filter
)
else:

View File

@@ -143,7 +143,13 @@ class BotEnum(EnumMember, metaclass=BotEnumMetaclass):
class EnumType(TypeDecorator):
impl = String(256)
impl = String(64)
cache_ok = True
# class comparator_factory(TypeDecorator.Comparator):
# def __eq__(self, other):
# expr = type_coerce(self.expr, String)
# return expr != other.value
def __init__(self, enum_type: BotEnum):
self._enum_type = enum_type

View File

@@ -2,7 +2,7 @@ from aiogram.types import Message, CallbackQuery
from aiogram.fsm.context import FSMContext
from aiogram.utils.i18n import I18n
from aiogram.utils.keyboard import InlineKeyboardBuilder
from typing import Any, Callable, TYPE_CHECKING, Literal
from typing import Any, Callable, TYPE_CHECKING, Literal, Union
from babel.support import LazyProxy
from dataclasses import dataclass, field
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -18,22 +18,21 @@ if TYPE_CHECKING:
EntityCaptionCallable = Callable[["EntityDescriptor"], str]
EntityItemCaptionCallable = Callable[["EntityDescriptor", Any], str]
EntityFieldCaptionCallable = Callable[["EntityFieldDescriptor", Any, Any], str]
EntityFieldCaptionCallable = Callable[["FieldDescriptor", Any, Any], str]
@dataclass
class FieldEditButton:
field_name: str
visibility: Callable[[Any], bool] | None = None
caption: str | LazyProxy | EntityFieldCaptionCallable | None = None
visibility: Callable[[Any], bool] | None = None
@dataclass
class CommandButton:
command: str
caption: str | LazyProxy | EntityItemCaptionCallable | None = None
command: ContextData | Callable[[ContextData, Any], ContextData] | str
caption: str | LazyProxy | EntityItemCaptionCallable
visibility: Callable[[Any], bool] | None = None
context_data: ContextData | Callable[[ContextData, Any], ContextData] | None = None
@dataclass
@@ -81,7 +80,7 @@ class EntityForm:
@dataclass(kw_only=True)
class _BaseEntityFieldDescriptor:
class _BaseFieldDescriptor:
icon: str = None
caption: str | LazyProxy | EntityFieldCaptionCallable | None = None
description: str | LazyProxy | EntityFieldCaptionCallable | None = None
@@ -99,18 +98,24 @@ class _BaseEntityFieldDescriptor:
@dataclass(kw_only=True)
class EntityField(_BaseEntityFieldDescriptor):
class EntityField(_BaseFieldDescriptor):
name: str | None = None
sm_descriptor: Any = None
@dataclass(kw_only=True)
class Setting(_BaseEntityFieldDescriptor):
class Setting(_BaseFieldDescriptor):
name: str | None = None
@dataclass(kw_only=True)
class EntityFieldDescriptor(_BaseEntityFieldDescriptor):
class FormField(_BaseFieldDescriptor):
name: str | None = None
type_: type
@dataclass(kw_only=True)
class FieldDescriptor(_BaseFieldDescriptor):
name: str
field_name: str
type_: type
@@ -118,6 +123,7 @@ class EntityFieldDescriptor(_BaseEntityFieldDescriptor):
is_list: bool = False
is_optional: bool = False
entity_descriptor: "EntityDescriptor" = None
command: "BotCommand" = None
def __hash__(self):
return self.name.__hash__()
@@ -162,7 +168,7 @@ class EntityDescriptor(_BaseEntityDescriptor):
name: str
class_name: str
type_: type["BotEntity"]
fields_descriptors: dict[str, EntityFieldDescriptor]
fields_descriptors: dict[str, FieldDescriptor]
@dataclass(kw_only=True)
@@ -179,24 +185,21 @@ class CommandCallbackContext[UT: UserBase]:
app: "QBotApp"
state_data: dict[str, Any]
state: FSMContext
form_data: dict[str, Any]
i18n: I18n
kwargs: dict[str, Any] = field(default_factory=dict)
@dataclass(kw_only=True)
class _BotCommand:
class BotCommand:
name: str
caption: str | dict[str, str] | None = None
pre_check: Callable[[Union[Message, CallbackQuery], Any], bool] | None = None
show_in_bot_commands: bool = False
register_navigation: bool = True
clear_navigation: bool = False
clear_state: bool = True
@dataclass(kw_only=True)
class BotCommand(_BotCommand):
param_form: dict[str, FieldDescriptor] | None = None
show_cancel_in_param_form: bool = True
show_back_in_param_form: bool = True
handler: Callable[[CommandCallbackContext], None]
@dataclass(kw_only=True)
class Command(_BotCommand): ...

View File

@@ -6,7 +6,7 @@ from typing import Any, get_args, get_origin
from ..db import async_session
from .role import RoleBase
from .descriptors import EntityFieldDescriptor, Setting
from .descriptors import FieldDescriptor, Setting
from ..utils.serialization import deserialize, serialize
import ujson as json
@@ -39,7 +39,7 @@ class SettingsMetaclass(type):
if isinstance(attr_value, Setting):
descriptor_kwargs = attr_value.__dict__.copy()
name = descriptor_kwargs.pop("name") or annotation
attributes[annotation] = EntityFieldDescriptor(
attributes[annotation] = FieldDescriptor(
name=name,
field_name=annotation,
type_=type_,
@@ -48,7 +48,7 @@ class SettingsMetaclass(type):
)
else:
attributes[annotation] = EntityFieldDescriptor(
attributes[annotation] = FieldDescriptor(
name=annotation,
field_name=annotation,
type_=type_,
@@ -83,7 +83,7 @@ class SettingsMetaclass(type):
class Settings(metaclass=SettingsMetaclass):
_cache: dict[str, Any] = dict[str, Any]()
_settings_descriptors: dict[str, EntityFieldDescriptor] = {}
_settings_descriptors: dict[str, FieldDescriptor] = {}
PAGE_SIZE: int = Setting(
default=10,
@@ -213,7 +213,7 @@ class Settings(metaclass=SettingsMetaclass):
return ret_val
@classmethod
async def load_param(cls, param: EntityFieldDescriptor) -> Any:
async def load_param(cls, param: FieldDescriptor) -> Any:
async with async_session() as session:
db_setting = (
await session.exec(
@@ -244,7 +244,7 @@ class Settings(metaclass=SettingsMetaclass):
db_settings = (await session.exec(select(DbSettings))).all()
for db_setting in db_settings:
if db_setting.name in cls.__dict__:
setting = cls.__dict__[db_setting.name] # type: EntityFieldDescriptor
setting = cls.__dict__[db_setting.name] # type: FieldDescriptor
cls._cache[db_setting.name] = await deserialize(
session=session,
type_=setting.type_,
@@ -255,7 +255,7 @@ class Settings(metaclass=SettingsMetaclass):
cls._loaded = True
@classmethod
async def set_param(cls, param: str | EntityFieldDescriptor, value) -> None:
async def set_param(cls, param: str | FieldDescriptor, value) -> None:
if isinstance(param, str):
param = cls._settings_descriptors[param]
ser_value = serialize(value, param)
@@ -273,11 +273,11 @@ class Settings(metaclass=SettingsMetaclass):
cls._cache[param.field_name] = value
@classmethod
def list_params(cls) -> dict[str, EntityFieldDescriptor]:
def list_params(cls) -> dict[str, FieldDescriptor]:
return cls._settings_descriptors
@classmethod
async def get_params(cls) -> dict[EntityFieldDescriptor, Any]:
async def get_params(cls) -> dict[FieldDescriptor, Any]:
params = cls.list_params()
return {
param: await cls.get(param, all_locales=True) for _, param in params.items()

View File

@@ -1,4 +1,4 @@
from sqlmodel import SQLModel, Field, BIGINT
from sqlmodel import SQLModel, Field, BigInteger
from sqlalchemy.ext.asyncio.session import AsyncSession
from . import session_dep
@@ -7,7 +7,7 @@ from . import session_dep
class ViewSetting(SQLModel, table=True):
__tablename__ = "view_setting"
user_id: int = Field(
sa_type=BIGINT, primary_key=True, foreign_key="user.id", ondelete="CASCADE"
sa_type=BigInteger, primary_key=True, foreign_key="user.id", ondelete="CASCADE"
)
entity_name: str = Field(primary_key=True)
filter: str | None = None

View File

@@ -1,26 +1,75 @@
from aiogram.types import CallbackQuery, Message
from functools import wraps
from typing import Callable, overload
from .model.descriptors import BotCommand, Command, CommandCallbackContext
from types import UnionType
from typing import Callable, Union, get_args, get_origin, Any
from .model.descriptors import (
BotCommand,
CommandCallbackContext,
FieldDescriptor,
FormField,
)
class Router:
def __init__(self):
self._commands = dict[str, BotCommand]()
@overload
def command(self, command: Command): ...
@overload
def command(self, command: str, caption: str | dict[str, str] | None = None): ...
def command(
self, command: str | Command, caption: str | dict[str, str] | None = None
self,
name: str,
caption: str | dict[str, str] | None = None,
pre_check: Callable[[Union[Message, CallbackQuery], Any], bool] | None = None,
# handle_message: bool = False,
show_in_bot_commands: bool = False,
register_navigation: bool = True,
clear_navigation: bool = False,
clear_state: bool = True,
show_cancel_in_param_form: bool = True,
show_back_in_param_form: bool = True,
form_fields: list[FormField] = list[FormField](),
):
def decorator(func: Callable[[CommandCallbackContext], None]):
if isinstance(command, str):
cmd = BotCommand(name=command, handler=func, caption=caption)
else:
cmd = BotCommand(handler=func, **command.__dict__)
form_fields_dict = dict[str, FieldDescriptor]()
for field in form_fields:
is_list = False
is_optional = False
type_origin = get_origin(field.type_)
if type_origin is list:
is_list = True
type_base = get_args(field.type_)[0]
elif type_origin in [Union, UnionType] and type(None) in get_args(
field.type_
):
is_optional = True
type_base = get_args(field.type_)[0]
else:
type_base = field.type_
form_fields_dict[field.name] = FieldDescriptor(
field_name=field.name,
type_base=type_base,
is_list=is_list,
is_optional=is_optional,
**field.__dict__,
)
cmd = BotCommand(
name=name,
caption=caption,
pre_check=pre_check,
show_in_bot_commands=show_in_bot_commands,
register_navigation=register_navigation,
clear_navigation=clear_navigation,
clear_state=clear_state,
param_form=form_fields_dict,
show_cancel_in_param_form=show_cancel_in_param_form,
show_back_in_param_form=show_back_in_param_form,
handler=func,
)
for field in form_fields_dict.values():
field.command = cmd
self._commands[cmd.name] = cmd
@wraps(func)

View File

@@ -1,6 +1,7 @@
from babel.support import LazyProxy
from inspect import signature
from aiogram.types import Message, CallbackQuery
from aiogram.utils.i18n import I18n
from typing import Any, TYPE_CHECKING
import ujson as json
@@ -9,7 +10,7 @@ from ..model.bot_enum import BotEnum
from ..model.settings import Settings
from ..model.descriptors import (
EntityFieldDescriptor,
FieldDescriptor,
EntityDescriptor,
EntityItemCaptionCallable,
EntityFieldCaptionCallable,
@@ -36,7 +37,13 @@ def get_user_permissions(
return permissions
def get_local_text(text: str, locale: str) -> str:
def get_local_text(text: str, locale: str = None) -> str:
if not locale:
i18n = I18n.get_current(no_error=True)
if i18n:
locale = i18n.current_locale
else:
locale = "en"
try:
obj = json.loads(text) # @IgnoreException
except Exception:
@@ -116,7 +123,7 @@ def get_entity_item_repr(
def get_value_repr(
value: Any, field_descriptor: EntityFieldDescriptor, locale: str | None = None
value: Any, field_descriptor: FieldDescriptor, locale: str | None = None
) -> str:
if value is None:
return ""
@@ -157,7 +164,7 @@ def get_callable_str(
| EntityItemCaptionCallable
| EntityFieldCaptionCallable
),
descriptor: EntityFieldDescriptor | EntityDescriptor,
descriptor: FieldDescriptor | EntityDescriptor,
entity: Any = None,
value: Any = None,
) -> str:
@@ -185,9 +192,17 @@ def get_entity_descriptor(
def get_field_descriptor(
app: "QBotApp", callback_data: ContextData
) -> EntityFieldDescriptor | None:
) -> FieldDescriptor | None:
if callback_data.context == CommandContext.SETTING_EDIT:
return Settings.list_params()[callback_data.field_name]
elif callback_data.context == CommandContext.COMMAND_FORM:
command = app.bot_commands[callback_data.user_command.split("&")[0]]
if (
command
and command.param_form
and callback_data.field_name in command.param_form
):
return command.param_form[callback_data.field_name]
elif callback_data.context in [
CommandContext.ENTITY_CREATE,
CommandContext.ENTITY_EDIT,

View File

@@ -1,4 +1,4 @@
from .context import ContextData, CallbackCommand
from ..bot.handlers.context import ContextData, CallbackCommand
def save_navigation_context(

View File

@@ -8,7 +8,7 @@ import ujson as json
from ..model.bot_entity import BotEntity
from ..model.bot_enum import BotEnum
from ..model.descriptors import EntityFieldDescriptor
from ..model.descriptors import FieldDescriptor
async def deserialize[T](session: AsyncSession, type_: type[T], value: str = None) -> T:
@@ -73,7 +73,7 @@ async def deserialize[T](session: AsyncSession, type_: type[T], value: str = Non
return type_(value)
def serialize(value: Any, field_descriptor: EntityFieldDescriptor) -> str:
def serialize(value: Any, field_descriptor: FieldDescriptor) -> str:
if value is None:
return ""
type_ = field_descriptor.type_base