add bot entity's events, external bot command call
Some checks failed
Build Docs / changes (push) Failing after 2s
Build Docs / build-docs (push) Has been skipped
Build Docs / deploy-docs (push) Has been skipped

This commit is contained in:
Alexander Kalinovsky
2025-04-30 19:21:29 +07:00
parent a4999159b9
commit 90652b9f3f
18 changed files with 555 additions and 260 deletions

View File

@@ -19,4 +19,5 @@ from .model.descriptors import (
CommandButton as CommandButton,
FieldEditButton as FieldEditButton,
InlineButton as InlineButton,
FormField as FormField,
)

View File

@@ -13,7 +13,7 @@ def add_pagination_controls(
):
if total_pages > 1:
navigation_buttons = []
ContextData(**callback_data.model_dump()).__setattr__
# ContextData(**callback_data.model_dump()).__setattr__
if total_pages > 10:
navigation_buttons.append(
InlineKeyboardButton(

View File

@@ -12,17 +12,18 @@ from ....utils.navigation import (
pop_navigation_context,
)
import quickbot.bot.handlers.menu.main as menu_main
import quickbot.bot.handlers.menu.settings as menu_settings
import quickbot.bot.handlers.menu.parameters as menu_parameters
import quickbot.bot.handlers.menu.language as menu_language
import quickbot.bot.handlers.menu.entities as menu_entities
import quickbot.bot.handlers.forms.entity_list as form_list
import quickbot.bot.handlers.forms.entity_form as form_item
import quickbot.bot.handlers.editors.main as editor
async def route_callback(message: Message | CallbackQuery, back: bool = True, **kwargs):
import quickbot.bot.handlers.menu.main as menu_main
import quickbot.bot.handlers.menu.language as menu_language
import quickbot.bot.handlers.menu.settings as menu_settings
import quickbot.bot.handlers.menu.parameters as menu_parameters
import quickbot.bot.handlers.menu.entities as menu_entities
import quickbot.bot.handlers.forms.entity_list as form_list
import quickbot.bot.handlers.forms.entity_form as form_item
import quickbot.bot.handlers.editors.main as editor
import quickbot.bot.handlers.user_handlers.main as user_handler
state_data = kwargs["state_data"]
stack, context = get_navigation_context(state_data)
if back:
@@ -47,12 +48,10 @@ async def route_callback(message: Message | CallbackQuery, back: bool = True, **
elif context.command == CallbackCommand.FIELD_EDITOR:
await editor.field_editor(message, **kwargs)
elif context.command == CallbackCommand.USER_COMMAND:
import quickbot.bot.handlers.user_handlers.main as user_handler
app: "QBotApp" = kwargs["app"]
cmd = app.bot_commands.get(context.user_command.split("&")[0])
await user_handler.cammand_handler(message=message, cmd=cmd, **kwargs)
await user_handler.command_handler(message=message, cmd=cmd, **kwargs)
else:
raise ValueError(f"Unknown command {context.command}")
else:

View File

@@ -21,6 +21,7 @@ class CallbackCommand(StrEnum):
ENTITY_PICKER_TOGGLE_ITEM = "et"
VIEW_FILTER_EDIT = "vf"
USER_COMMAND = "uc"
DELETE_MESSAGE = "dl"
class CommandContext(StrEnum):

View File

@@ -1,8 +1,10 @@
from aiogram.types import Message, CallbackQuery
from decimal import Decimal
from datetime import datetime, time
from typing import TYPE_CHECKING
from quickbot.main import QBotApp
if TYPE_CHECKING:
from quickbot.main import QBotApp
from quickbot.utils.serialization import deserialize
from ....model.bot_entity import BotEntity
@@ -25,7 +27,7 @@ async def show_editor(message: Message | CallbackQuery, **kwargs):
callback_data: ContextData = kwargs.get("callback_data", None)
state_data: dict = kwargs["state_data"]
db_session = kwargs["db_session"]
app: QBotApp = kwargs["app"]
app: "QBotApp" = kwargs["app"]
value_type = field_descriptor.type_base

View File

@@ -1,3 +1,4 @@
from inspect import iscoroutinefunction
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
@@ -14,13 +15,14 @@ from ....model.bot_enum import BotEnum
from ....model.settings import Settings
from ....model.user import UserBase
from ....model.view_setting import ViewSetting
from ....model.descriptors import BotContext, EntityList, FieldDescriptor, Filter
from ....model.descriptors import BotContext, EntityList, FieldDescriptor
from ....model import EntityPermission
from ....utils.main import (
get_user_permissions,
get_send_message,
get_field_descriptor,
get_callable_str,
prepare_static_filter,
)
from ....utils.serialization import serialize, deserialize
from ..context import ContextData, CallbackCommand
@@ -108,8 +110,34 @@ async def render_entity_picker(
for item in enum_items
]
elif issubclass(type_, BotEntity):
ep_form = "default"
ep_form_params = []
if field_descriptor.ep_form:
if callable(field_descriptor.ep_form):
context = BotContext(
db_session=db_session,
app=kwargs["app"],
app_state=kwargs["app_state"],
user=user,
message=message,
)
if iscoroutinefunction(field_descriptor.ep_form):
ep_form = await field_descriptor.ep_form(context)
else:
ep_form = field_descriptor.ep_form(context)
else:
ep_form = field_descriptor.ep_form
ep_form_list = ep_form.split("&")
ep_form = ep_form_list[0]
ep_form_params = ep_form_list[1:] if len(ep_form_list) > 1 else []
form_list: EntityList = type_.bot_entity_descriptor.lists.get(
field_descriptor.ep_form, type_.bot_entity_descriptor.default_list
ep_form, type_.bot_entity_descriptor.default_list
)
permissions = get_user_permissions(user, type_.bot_entity_descriptor)
if form_list.filtering:
@@ -125,7 +153,7 @@ async def render_entity_picker(
if list_all or EntityPermission.LIST in permissions:
if (
field_descriptor.ep_parent_field
and field_descriptor.ep_parent_field
and field_descriptor.ep_child_field
and callback_data.entity_id
):
entity = await field_descriptor.entity_descriptor.type_.get(
@@ -140,19 +168,11 @@ async def render_entity_picker(
if form_list.pagination:
items_count = await type_.get_count(
session=db_session,
static_filter=(
[
Filter(
field_name=f.field_name,
operator=f.operator,
value_type="const",
value=f.value,
)
for f in form_list.static_filters
if f.value_type == "const"
]
if isinstance(form_list.static_filters, list)
else form_list.static_filters
static_filter=await prepare_static_filter(
db_session=db_session,
entity_descriptor=type_.bot_entity_descriptor,
static_filters=form_list.static_filters,
params=ep_form_params,
),
ext_filter=ext_filter,
filter=entity_filter,
@@ -169,19 +189,11 @@ async def render_entity_picker(
entity_items = await type_.get_multi(
session=db_session,
order_by=form_list.order_by,
static_filter=(
[
Filter(
field_name=f.field_name,
operator=f.operator,
value_type="const",
value=f.value,
)
for f in form_list.static_filters
if f.value_type == "const"
]
if isinstance(form_list.static_filters, list)
else form_list.static_filters
static_filter=await prepare_static_filter(
db_session=db_session,
entity_descriptor=type_.bot_entity_descriptor,
static_filters=form_list.static_filters,
params=ep_form_params,
),
ext_filter=ext_filter,
filter=entity_filter,
@@ -313,9 +325,14 @@ async def render_entity_picker(
await state.set_data(state_data)
send_message = get_send_message(message)
await send_message(text=edit_prompt, reply_markup=keyboard_builder.as_markup())
if message:
send_message = get_send_message(message)
await send_message(text=edit_prompt, reply_markup=keyboard_builder.as_markup())
else:
app: "QBotApp" = kwargs["app"]
await app.bot.send_message(
chat_id=user.id, text=edit_prompt, reply_markup=keyboard_builder.as_markup()
)
@router.callback_query(

View File

@@ -1,10 +1,11 @@
from inspect import iscoroutinefunction
from typing import TYPE_CHECKING
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery
from logging import getLogger
from sqlalchemy.orm.collections import InstrumentedList
from sqlmodel.ext.asyncio.session import AsyncSession
from quickbot.model.descriptors import BotContext, EntityForm
from ....model import EntityPermission
@@ -14,6 +15,7 @@ from ....utils.main import (
build_field_sequence,
check_entity_permission,
get_field_descriptor,
clear_state,
)
from ....utils.serialization import deserialize, serialize
from ..context import ContextData, CallbackCommand, CommandContext
@@ -92,6 +94,14 @@ async def field_editor(message: Message | CallbackQuery, **kwargs):
current_value = None
context = BotContext(
db_session=db_session,
app=app,
app_state=kwargs["app_state"],
user=user,
message=message,
)
if (
field_descriptor.type_base is bool
and callback_data.context == CommandContext.ENTITY_FIELD_EDIT
@@ -102,19 +112,73 @@ async def field_editor(message: Message | CallbackQuery, **kwargs):
if check_entity_permission(
entity=entity, user=user, permission=EntityPermission.UPDATE
):
old_values = {}
for f in entity.bot_entity_descriptor.fields_descriptors.values():
value = getattr(entity, f.field_name)
if isinstance(value, InstrumentedList):
value = list(value)
old_values[f.field_name] = value
new_values = old_values.copy()
current_value: bool = (
getattr(entity, field_descriptor.field_name) or False
)
setattr(entity, field_descriptor.field_name, not current_value)
new_values[field_descriptor.field_name] = not current_value
await db_session.commit()
stack, context = get_navigation_context(state_data=state_data)
can_update = True
kwargs.update({"callback_data": context})
if entity_descriptor.before_update_save:
if iscoroutinefunction(entity_descriptor.before_update_save):
can_update = await entity_descriptor.before_update_save(
old_values,
new_values,
context,
)
else:
can_update = entity_descriptor.before_update_save(
old_values,
new_values,
context,
)
if isinstance(can_update, str):
await message.answer(text=can_update, **{"show_alert": True})
elif not can_update:
await message.answer(
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN)),
**{"show_alert": True},
)
return await entity_item(
query=message, navigation_stack=stack, **kwargs
)
if isinstance(can_update, bool) and can_update:
for attr in new_values:
if attr != "id":
setattr(entity, attr, new_values[attr])
await db_session.commit()
if entity_descriptor.on_updated:
if iscoroutinefunction(entity_descriptor.on_updated):
await entity_descriptor.on_updated(
old_values,
entity,
context,
)
else:
entity_descriptor.on_updated(
old_values,
entity,
context,
)
stack, context_data = get_navigation_context(state_data=state_data)
kwargs.update({"callback_data": context_data})
return await entity_item(
query=message, navigation_stack=stack, **kwargs
)
return
if not entity_data and callback_data.context in [
CommandContext.ENTITY_EDIT,
@@ -138,13 +202,6 @@ async def field_editor(message: Message | CallbackQuery, **kwargs):
if form.edit_field_sequence:
field_sequence = form.edit_field_sequence
else:
context = BotContext(
db_session=db_session,
app=app,
app_state=kwargs["app_state"],
user=user,
message=message,
)
field_sequence = await build_field_sequence(
entity_descriptor=entity_descriptor,
user=user,
@@ -168,6 +225,24 @@ async def field_editor(message: Message | CallbackQuery, **kwargs):
}
state_data.update({"entity_data": entity_data})
if callback_data.context == CommandContext.ENTITY_CREATE:
if entity_descriptor.before_create:
if iscoroutinefunction(entity_descriptor.before_create):
can_create = await entity_descriptor.before_create(
context,
)
else:
can_create = entity_descriptor.before_create(
context,
)
if isinstance(can_create, str):
return await message.answer(text=can_create, **{"show_alert": True})
elif not can_create:
return await message.answer(
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN)),
**{"show_alert": True},
)
if entity_data:
current_value = await deserialize(
session=db_session,
@@ -181,6 +256,16 @@ async def field_editor(message: Message | CallbackQuery, **kwargs):
await show_editor(message=message, current_value=current_value, **kwargs)
@router.callback_query(ContextData.filter(F.command == CallbackCommand.DELETE_MESSAGE))
async def delete_message_callback(query: CallbackQuery, **kwargs):
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
clear_state(state_data=state_data)
await state.set_data(state_data)
await query.message.delete()
router.include_routers(
string_editor_router,
date_picker_router,

View File

@@ -2,6 +2,7 @@ from inspect import iscoroutinefunction
from aiogram import Router, F
from aiogram.types import Message, CallbackQuery
from aiogram.fsm.context import FSMContext
from sqlalchemy.orm.collections import InstrumentedList
from sqlmodel.ext.asyncio.session import AsyncSession
from typing import TYPE_CHECKING, Any
from decimal import Decimal
@@ -9,7 +10,7 @@ import json
from ..context import ContextData, CallbackCommand, CommandContext
from ...command_context_filter import CallbackCommandFilter
from ..user_handlers.main import cammand_handler
from ..user_handlers.main import command_handler
from ....model import EntityPermission
from ....model.user import UserBase
from ....model.settings import Settings
@@ -354,44 +355,68 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN))
)
new_entity = await entity_type.create(
session=db_session,
obj_in=entity_type(**deser_entity_data),
commit=True,
)
new_entity = entity_type(**deser_entity_data)
if entity_descriptor.on_created:
if iscoroutinefunction(entity_descriptor.on_created):
await entity_descriptor.on_created(
can_create = True
if entity_descriptor.before_update_save:
if iscoroutinefunction(entity_descriptor.before_update_save):
can_create = await entity_descriptor.before_update_save(
new_entity,
context,
)
else:
entity_descriptor.on_created(
can_create = entity_descriptor.before_update_save(
new_entity,
context,
)
if isinstance(can_create, str):
await message.answer(text=can_create, **{"show_alert": True})
elif not can_create:
await message.answer(
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN)),
**{"show_alert": True},
)
form_name = (
callback_data.form_params.split("&")[0]
if callback_data.form_params
else None
)
form_list = entity_descriptor.lists.get(
form_name, entity_descriptor.default_list
)
if isinstance(can_create, bool) and can_create:
new_entity = await entity_type.create(
session=db_session,
obj_in=new_entity,
commit=True,
)
state_data["navigation_context"] = ContextData(
command=CallbackCommand.ENTITY_ITEM,
entity_name=entity_descriptor.name,
form_params=form_list.item_form,
entity_id=str(new_entity.id),
).pack()
if entity_descriptor.on_created:
if iscoroutinefunction(entity_descriptor.on_created):
await entity_descriptor.on_created(
new_entity,
context,
)
else:
entity_descriptor.on_created(
new_entity,
context,
)
state_data.update(state_data)
form_name = (
callback_data.form_params.split("&")[0]
if callback_data.form_params
else None
)
form_list = entity_descriptor.lists.get(
form_name, entity_descriptor.default_list
)
clear_state(state_data=state_data)
return await route_callback(message=message, back=False, **kwargs)
state_data["navigation_context"] = ContextData(
command=CallbackCommand.ENTITY_ITEM,
entity_name=entity_descriptor.name,
form_params=form_list.item_form,
entity_id=str(new_entity.id),
).pack()
state_data.update(state_data)
clear_state(state_data=state_data)
return await route_callback(message=message, back=False, **kwargs)
elif callback_data.context in [
CommandContext.ENTITY_EDIT,
@@ -412,27 +437,67 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN))
)
old_values = {}
for f in entity.bot_entity_descriptor.fields_descriptors.values():
value = getattr(entity, f.field_name)
if isinstance(value, InstrumentedList):
value = list(value)
old_values[f.field_name] = value
new_values = old_values.copy()
for key, value in deser_entity_data.items():
setattr(
entity,
entity.bot_entity_descriptor.fields_descriptors[key].field_name,
value,
)
new_values[
entity.bot_entity_descriptor.fields_descriptors[key].field_name
] = value
await db_session.commit()
await db_session.refresh(entity)
can_update = True
if entity_descriptor.on_updated:
if iscoroutinefunction(entity_descriptor.on_updated):
await entity_descriptor.on_updated(
entity,
if entity_descriptor.before_update_save:
if iscoroutinefunction(entity_descriptor.before_update_save):
can_update = await entity_descriptor.before_update_save(
old_values,
new_values,
context,
)
else:
entity_descriptor.on_updated(
entity,
can_update = entity_descriptor.before_update_save(
old_values,
new_values,
context,
)
if isinstance(can_update, str):
await message.answer(text=can_update, **{"show_alert": True})
elif not can_update:
await message.answer(
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN)),
**{"show_alert": True},
)
if isinstance(can_update, bool) and can_update:
for attr in new_values:
if attr != "id":
setattr(entity, attr, new_values[attr])
await db_session.commit()
await db_session.refresh(entity)
if entity_descriptor.on_updated:
if iscoroutinefunction(entity_descriptor.on_updated):
await entity_descriptor.on_updated(
old_values,
entity,
context,
)
else:
entity_descriptor.on_updated(
old_values,
entity,
context,
)
else:
await db_session.rollback()
elif callback_data.context == CommandContext.COMMAND_FORM:
clear_state(state_data=state_data)
@@ -449,7 +514,7 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
cmd = app.bot_commands.get(callback_data.user_command.split("&")[0])
return await cammand_handler(message=message, cmd=cmd, **kwargs)
return await command_handler(message=message, cmd=cmd, **kwargs)
clear_state(state_data=state_data)

View File

@@ -99,7 +99,9 @@ async def wrap_editor(
keyboard_builder.row(
InlineKeyboardButton(
text=(await Settings.get(Settings.APP_STRINGS_CANCEL_BTN)),
callback_data=context.pack(),
callback_data=context.pack()
if context
else ContextData(command=CallbackCommand.DELETE_MESSAGE).pack(),
)
)

View File

@@ -58,23 +58,47 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
)
if callback_data.data == "yes":
entity = await entity_descriptor.type_.remove(
session=db_session, id=int(callback_data.entity_id), commit=True
)
can_delete = True
if entity_descriptor.on_deleted:
if iscoroutinefunction(entity_descriptor.on_created):
await entity_descriptor.on_deleted(
if entity_descriptor.before_delete:
if iscoroutinefunction(entity_descriptor.before_delete):
can_delete = await entity_descriptor.before_delete(
entity,
context,
)
else:
entity_descriptor.on_deleted(
can_delete = entity_descriptor.before_delete(
entity,
context,
)
if isinstance(can_delete, str):
await query.answer(text=can_delete, show_alert=True)
elif not can_delete:
await query.answer(
text=(await Settings.get(Settings.APP_STRINGS_FORBIDDEN)),
show_alert=True,
)
await route_callback(message=query, **kwargs)
if isinstance(can_delete, bool) and can_delete:
await db_session.delete(entity)
await db_session.commit()
if entity_descriptor.on_updated:
if iscoroutinefunction(entity_descriptor.on_updated):
await entity_descriptor.on_updated(
entity,
context,
)
else:
entity_descriptor.on_updated(
entity,
context,
)
await route_callback(message=query, **kwargs)
else:
await route_callback(message=query, back=False, **kwargs)
elif not callback_data.data:
entity = await entity_descriptor.type_.get(

View File

@@ -12,10 +12,8 @@ from ....model.user import UserBase
from ....model.view_setting import ViewSetting
from ....model.descriptors import (
BotContext,
EntityDescriptor,
EntityForm,
EntityList,
Filter,
)
from ....model import EntityPermission
from ....utils.main import (
@@ -25,8 +23,8 @@ from ....utils.main import (
get_entity_descriptor,
get_callable_str,
build_field_sequence,
prepare_static_filter,
)
from ....utils.serialization import deserialize
from ..context import ContextData, CallbackCommand, CommandContext
from ..common.pagination import add_pagination_controls
from ..common.filtering import add_filter_controls
@@ -61,37 +59,6 @@ def calc_total_pages(items_count: int, page_size: int) -> int:
return max(items_count // page_size + (1 if items_count % page_size else 0), 1)
async def _prepare_static_filter(
db_session: AsyncSession,
entity_descriptor: EntityDescriptor,
static_filters: list[Filter],
params: list[str],
) -> list[Filter]:
return (
[
Filter(
field_name=f.field_name,
operator=f.operator,
value_type="const",
value=(
f.value
if f.value_type == "const"
else await deserialize(
session=db_session,
type_=entity_descriptor.fields_descriptors[
f.field_name
].type_base,
value=params[f.param_index],
)
),
)
for f in static_filters
]
if static_filters
else None
)
async def entity_list(
message: CallbackQuery | Message,
callback_data: ContextData,
@@ -175,7 +142,7 @@ async def entity_list(
page_size = await Settings.get(Settings.PAGE_SIZE)
items_count = await entity_type.get_count(
session=db_session,
static_filter=await _prepare_static_filter(
static_filter=await prepare_static_filter(
db_session=db_session,
entity_descriptor=entity_descriptor,
static_filters=form_list.static_filters,
@@ -196,7 +163,7 @@ async def entity_list(
items = await entity_type.get_multi(
session=db_session,
order_by=form_list.order_by,
static_filter=await _prepare_static_filter(
static_filter=await prepare_static_filter(
db_session=db_session,
entity_descriptor=entity_descriptor,
static_filters=form_list.static_filters,

View File

@@ -15,7 +15,6 @@ from ....model.language import LanguageBase
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand
from ..common.routing import route_callback
from ....utils.main import get_send_message
@@ -94,3 +93,6 @@ async def set_language(message: CallbackQuery, **kwargs):
i18n: I18n = kwargs["i18n"]
with i18n.use_locale(user.lang.value):
await route_callback(message, **kwargs)
from ..common.routing import route_callback # noqa: E402

View File

@@ -0,0 +1,126 @@
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from inspect import iscoroutinefunction
from typing import TYPE_CHECKING
from quickbot.utils.main import clear_state
from quickbot.utils.navigation import (
get_navigation_context,
pop_navigation_context,
)
from quickbot.bot.handlers.editors.main import field_editor
from quickbot.utils.serialization import deserialize
from quickbot.utils.main import get_send_message
from quickbot.model.descriptors import BotCommand, CommandCallbackContext
from quickbot.model.settings import Settings
if TYPE_CHECKING:
from quickbot.main import QBotApp
from quickbot.model.user import UserBase
from ..context import ContextData, CallbackCommand, CommandContext
async def command_handler(message: Message | CallbackQuery, cmd: BotCommand, **kwargs):
callback_data: ContextData = kwargs["callback_data"]
state: FSMContext = kwargs["state"]
state_data: dict = kwargs["state_data"]
app: "QBotApp" = kwargs["app"]
user: "UserBase" = kwargs["user"]
entity_data_dict: dict = state_data.get("entity_data")
form_data = (
{
key: await deserialize(
session=kwargs["db_session"],
type_=cmd.param_form[key].type_,
value=value,
)
for key, value in entity_data_dict.items()
}
if entity_data_dict and cmd.param_form
else None
)
callback_context = CommandCallbackContext(
message=message,
callback_data=callback_data,
form_data=form_data,
db_session=kwargs["db_session"],
user=user,
app=app,
app_state=kwargs["app_state"],
state_data=state_data,
state=state,
i18n=kwargs["i18n"],
register_navigation=cmd.register_navigation,
clear_navigation=cmd.clear_navigation,
kwargs=kwargs,
)
if cmd.pre_check and (not cmd.param_form or (cmd.param_form and form_data is None)):
if iscoroutinefunction(cmd.pre_check):
if not await cmd.pre_check(callback_context):
return
else:
if not cmd.pre_check(callback_context):
return
if form_data is None and cmd.param_form:
field_descriptor = list(cmd.param_form.values())[0]
kwargs["callback_data"] = ContextData(
command=CallbackCommand.FIELD_EDITOR,
context=CommandContext.COMMAND_FORM,
field_name=field_descriptor.name,
user_command=callback_data.user_command,
)
return await field_editor(message=message, **kwargs)
await cmd.handler(callback_context)
if callback_context.register_navigation:
await state.set_data(state_data)
stack, navigation_context = get_navigation_context(state_data=state_data)
back_callback_data = pop_navigation_context(stack=stack)
if back_callback_data:
callback_context.keyboard_builder.row(
InlineKeyboardButton(
text=(await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data=back_callback_data.pack(),
)
)
if message:
send_message = get_send_message(message)
if isinstance(message, CallbackQuery):
message = message.message
if callback_context.message_text:
await send_message(
text=callback_context.message_text,
reply_markup=callback_context.keyboard_builder.as_markup(),
)
else:
await message.edit_reply_markup(
reply_markup=callback_context.keyboard_builder.as_markup()
)
else:
if callback_context.message_text:
await app.bot.send_message(
chat_id=user.id,
text=callback_context.message_text,
reply_markup=callback_context.keyboard_builder.as_markup(),
)
if not callback_context.register_navigation:
if callback_context.clear_navigation:
clear_state(state_data=state_data, clear_nav=True)
await state.set_data(state_data)
else:
clear_state(state_data=state_data)
await route_callback(message, back=True, **kwargs)
from quickbot.bot.handlers.common.routing import route_callback # noqa: E402

View File

@@ -1,27 +1,16 @@
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from inspect import iscoroutinefunction
from aiogram.types import Message, CallbackQuery
from typing import TYPE_CHECKING
from quickbot.utils.main import clear_state
from quickbot.utils.navigation import (
save_navigation_context,
get_navigation_context,
pop_navigation_context,
)
from quickbot.bot.handlers.editors.main import field_editor
from quickbot.bot.handlers.common.routing import route_callback
from quickbot.utils.serialization import deserialize
from quickbot.utils.main import get_send_message
from quickbot.model.descriptors import BotCommand, CommandCallbackContext
from quickbot.model.settings import Settings
from quickbot.utils.navigation import save_navigation_context
if TYPE_CHECKING:
from quickbot.main import QBotApp
from ..context import ContextData, CallbackCommand, CommandContext
from ..context import ContextData, CallbackCommand
from .command_handler import command_handler
router = Router()
@@ -68,93 +57,4 @@ async def process_command_handler(message: Message | CallbackQuery, **kwargs):
clear_state(state_data=state_data)
save_navigation_context(callback_data=callback_data, state_data=state_data)
await cammand_handler(message=message, cmd=cmd, **kwargs)
async def cammand_handler(message: Message | CallbackQuery, cmd: BotCommand, **kwargs):
callback_data: ContextData = kwargs["callback_data"]
state: FSMContext = kwargs["state"]
state_data: dict = kwargs["state_data"]
app: "QBotApp" = kwargs["app"]
entity_data_dict: dict = state_data.get("entity_data")
form_data = (
{
key: await deserialize(
session=kwargs["db_session"],
type_=cmd.param_form[key].type_,
value=value,
)
for key, value in entity_data_dict.items()
}
if entity_data_dict and cmd.param_form
else None
)
callback_context = CommandCallbackContext(
message=message,
callback_data=callback_data,
form_data=form_data,
db_session=kwargs["db_session"],
user=kwargs["user"],
app=app,
app_state=kwargs["app_state"],
state_data=state_data,
state=state,
i18n=kwargs["i18n"],
register_navigation=cmd.register_navigation,
kwargs=kwargs,
)
if cmd.pre_check and (not cmd.param_form or (cmd.param_form and form_data is None)):
if iscoroutinefunction(cmd.pre_check):
if not await cmd.pre_check(callback_context):
return
else:
if not cmd.pre_check(callback_context):
return
if form_data is None and cmd.param_form:
field_descriptor = list(cmd.param_form.values())[0]
kwargs["callback_data"] = ContextData(
command=CallbackCommand.FIELD_EDITOR,
context=CommandContext.COMMAND_FORM,
field_name=field_descriptor.name,
user_command=callback_data.user_command,
)
return await field_editor(message=message, **kwargs)
await cmd.handler(callback_context)
if callback_context.register_navigation:
await state.set_data(state_data)
stack, navigation_context = get_navigation_context(state_data=state_data)
back_callback_data = pop_navigation_context(stack=stack)
if back_callback_data:
callback_context.keyboard_builder.row(
InlineKeyboardButton(
text=(await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data=back_callback_data.pack(),
)
)
send_message = get_send_message(message)
if isinstance(message, CallbackQuery):
message = message.message
if callback_context.message_text:
await send_message(
text=callback_context.message_text,
reply_markup=callback_context.keyboard_builder.as_markup(),
)
else:
await message.edit_reply_markup(
reply_markup=callback_context.keyboard_builder.as_markup()
)
else:
clear_state(state_data=state_data)
await route_callback(message, back=True, **kwargs)
await command_handler(message=message, cmd=cmd, **kwargs)

View File

@@ -11,7 +11,9 @@ from fastapi import FastAPI
from fastapi.applications import Lifespan, AppType
from fastapi.datastructures import State
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from quickbot.bot.handlers.user_handlers.main import command_handler
from quickbot.utils.main import clear_state
from quickbot.utils.navigation import save_navigation_context
@@ -185,9 +187,9 @@ class QBotApp(Generic[UserType, ConfigType], FastAPI):
user_id: int,
entity: type[BotEntity] | str,
entity_id: int,
db_session: AsyncSession = None,
form_name: str = None,
form_params: list[Any] = None,
**kwargs,
):
f_params = []
@@ -195,7 +197,7 @@ class QBotApp(Generic[UserType, ConfigType], FastAPI):
f_params.append(form_name)
if form_params:
f_params.extend(form_params)
f_params.extend([str(p) for p in form_params])
if isinstance(entity, type):
entity = entity.bot_entity_descriptor.name
@@ -214,7 +216,6 @@ class QBotApp(Generic[UserType, ConfigType], FastAPI):
callback_data=callback_data, state_data=state_data
)
db_session = kwargs.get("db_session")
if not db_session:
db_session = await get_db()
@@ -226,7 +227,7 @@ class QBotApp(Generic[UserType, ConfigType], FastAPI):
with self.i18n.context(), self.i18n.use_locale(user.lang.value):
await entity_item(
query=None,
db_session=kwargs.get("db_session"),
db_session=db_session,
callback_data=callback_data,
app=self,
user=user,
@@ -236,3 +237,52 @@ class QBotApp(Generic[UserType, ConfigType], FastAPI):
i18n=self.i18n,
app_state=app_state,
)
async def execute_command(
self,
app_state: State,
command: str,
user_id: int,
db_session: AsyncSession = None,
):
state = self.dp.fsm.get_context(bot=self.bot, chat_id=user_id, user_id=user_id)
state_data = await state.get_data()
callback_data = ContextData(
command=CallbackCommand.USER_COMMAND,
user_command=command,
)
command_name = command.split("&")[0]
cmd = self.bot_commands.get(command_name)
if not db_session:
db_session = await get_db()
user = await self.user_class.get(
session=db_session,
id=user_id,
)
if cmd is None:
return
if cmd.clear_navigation:
state_data.pop("navigation_stack", None)
state_data.pop("navigation_context", None)
if cmd.register_navigation:
clear_state(state_data=state_data)
save_navigation_context(callback_data=callback_data, state_data=state_data)
with self.i18n.context(), self.i18n.use_locale(user.lang.value):
await command_handler(
message=None,
cmd=cmd,
db_session=db_session,
callback_data=callback_data,
app=self,
user=user,
state=state,
state_data=state_data,
i18n=self.i18n,
app_state=app_state,
)

View File

@@ -135,6 +135,13 @@ class EnumMember(object):
def __hash__(self):
return hash(self.value)
def __lt__(self, other: Self | str | Any | None) -> bool:
if isinstance(other, str):
return self.value < other
if isinstance(other, EnumMember):
return self.value < other.value
return False
def localized(self, lang: str = None) -> str:
if self.loc_obj:
if not lang:

View File

@@ -122,7 +122,7 @@ class _BaseFieldDescriptor:
localizable: bool = False
bool_false_value: str | LazyProxy = "no"
bool_true_value: str | LazyProxy = "yes"
ep_form: str | None = None
ep_form: str | Callable[["BotContext"], str] | None = None
ep_parent_field: str | None = None
ep_child_field: str | None = None
dt_type: Literal["date", "datetime"] = "date"
@@ -195,9 +195,20 @@ class _BaseEntityDescriptor:
EntityPermission.DELETE_ALL: [RoleBase.SUPER_USER],
}
)
before_create: Callable[["BotContext"], Union[bool, str]] | None = None
before_create_save: (
Callable[["BotEntity", "BotContext"], Union[bool, str]] | None
) = None
before_update_save: (
Callable[[dict[str, Any], dict[str, Any], "BotContext"], Union[bool, str]]
| None
) = None
before_delete: Callable[["BotEntity", "BotContext"], Union[bool, str]] | None = None
on_created: Callable[["BotEntity", "BotContext"], None] | None = None
on_deleted: Callable[["BotEntity", "BotContext"], None] | None = None
on_updated: Callable[["BotEntity", "BotContext"], None] | None = None
on_updated: Callable[[dict[str, Any], "BotEntity", "BotContext"], None] | None = (
None
)
@dataclass(kw_only=True)
@@ -220,6 +231,7 @@ class CommandCallbackContext[UT: UserBase]:
)
message_text: str | None = None
register_navigation: bool = True
clear_navigation: bool = False
message: Message | CallbackQuery
callback_data: ContextData
db_session: AsyncSession

View File

@@ -2,9 +2,12 @@ from babel.support import LazyProxy
from inspect import iscoroutinefunction, signature
from aiogram.types import Message, CallbackQuery
from aiogram.utils.i18n import I18n
from sqlmodel.ext.asyncio.session import AsyncSession
from typing import Any, TYPE_CHECKING, Callable
import ujson as json
from quickbot.utils.serialization import deserialize
from ..model.bot_entity import BotEntity
from ..model.bot_enum import BotEnum
from ..model.settings import Settings
@@ -16,6 +19,7 @@ from ..model.descriptors import (
EntityDescriptor,
EntityPermission,
_BaseFieldDescriptor,
Filter,
)
from ..bot.handlers.context import CallbackCommand, ContextData, CommandContext
@@ -330,3 +334,34 @@ async def build_field_sequence(
field_sequence.append(fd.field_name)
return field_sequence
async def prepare_static_filter(
db_session: AsyncSession,
entity_descriptor: EntityDescriptor,
static_filters: list[Filter],
params: list[str],
) -> list[Filter]:
return (
[
Filter(
field_name=f.field_name,
operator=f.operator,
value_type="const",
value=(
f.value
if f.value_type == "const"
else await deserialize(
session=db_session,
type_=entity_descriptor.fields_descriptors[
f.field_name
].type_base,
value=params[f.param_index],
)
),
)
for f in static_filters
]
if static_filters
else None
)