type hinting enhancements, bugfix in user commands handler
Some checks failed
Build Docs / changes (push) Failing after 2s
Build Docs / build-docs (push) Has been skipped
Build Docs / deploy-docs (push) Has been skipped

This commit is contained in:
Alexander Kalinovsky
2025-05-12 18:21:30 +07:00
parent dcacd31bbc
commit eb57a4ff78
9 changed files with 201 additions and 174 deletions

View File

@@ -1,5 +1,5 @@
from typing import Annotated from typing import Annotated
from fastapi import APIRouter, Depends, Request, Response from fastapi import APIRouter, Depends, Request, Response, BackgroundTasks
from sqlmodel.ext.asyncio.session import AsyncSession from sqlmodel.ext.asyncio.session import AsyncSession
from ..main import QBotApp from ..main import QBotApp
@@ -15,7 +15,9 @@ router = APIRouter()
@router.post("/webhook") @router.post("/webhook")
async def telegram_webhook( async def telegram_webhook(
db_session: Annotated[AsyncSession, Depends(get_db)], request: Request db_session: Annotated[AsyncSession, Depends(get_db)],
request: Request,
background_tasks: BackgroundTasks,
): ):
logger.debug("Webhook request %s", await request.json()) logger.debug("Webhook request %s", await request.json())
app: QBotApp = request.app app: QBotApp = request.app
@@ -29,14 +31,12 @@ async def telegram_webhook(
except Exception: except Exception:
logger.error("Invalid request", exc_info=True) logger.error("Invalid request", exc_info=True)
return Response(status_code=400) return Response(status_code=400)
try: background_tasks.add_task(
await app.dp.feed_webhook_update( app.dp.feed_webhook_update,
app.bot, bot=app.bot,
update, update=update,
db_session=db_session, db_session=db_session,
app=app, app=app,
app_state=request.state, app_state=request.state,
) )
except Exception:
logger.error("Error processing update", exc_info=True)
return Response(status_code=200) return Response(status_code=200)

View File

@@ -95,13 +95,39 @@ async def render_entity_picker(
page_size = await Settings.get(Settings.PAGE_SIZE) page_size = await Settings.get(Settings.PAGE_SIZE)
form_list = None form_list = None
context = BotContext(
db_session=db_session,
app=kwargs["app"],
app_state=kwargs["app_state"],
user=user,
message=message,
)
if issubclass(type_, BotEnum): if issubclass(type_, BotEnum):
items_count = len(type_.all_members) items_count = len(type_.all_members)
total_pages = calc_total_pages(items_count, page_size) total_pages = calc_total_pages(items_count, page_size)
page = min(page, total_pages) page = min(page, total_pages)
enum_items = list(type_.all_members.values())[ if isinstance(field_descriptor.options, list):
page_size * (page - 1) : page_size * page enum_items = field_descriptor.options[
] page_size * (page - 1) : page_size * page
]
elif callable(field_descriptor.options):
entity = None
if callback_data.entity_id:
entity = await field_descriptor.entity_descriptor.type_.get(
session=db_session, id=callback_data.entity_id
)
if iscoroutinefunction(field_descriptor.options):
enum_items = (await field_descriptor.options(entity, context)) or []
else:
enum_items = field_descriptor.options(entity, context) or []
enum_items = enum_items[
page_size * (page - 1) : page_size * page
]
else:
enum_items = list(type_.all_members.values())[
page_size * (page - 1) : page_size * page
]
items = [ items = [
{ {
"text": f"{'' if not is_list else '【✔︎】 ' if item in (current_value or []) else '【 】 '}{item.localized(user.lang)}", "text": f"{'' if not is_list else '【✔︎】 ' if item in (current_value or []) else '【 】 '}{item.localized(user.lang)}",
@@ -115,14 +141,6 @@ async def render_entity_picker(
if field_descriptor.ep_form: if field_descriptor.ep_form:
if callable(field_descriptor.ep_form): if callable(field_descriptor.ep_form):
context = BotContext(
db_session=db_session,
app=kwargs["app"],
app_state=kwargs["app_state"],
user=user,
message=message,
)
if iscoroutinefunction(field_descriptor.ep_form): if iscoroutinefunction(field_descriptor.ep_form):
ep_form = await field_descriptor.ep_form(context) ep_form = await field_descriptor.ep_form(context)
else: else:
@@ -207,14 +225,6 @@ async def render_entity_picker(
page = 1 page = 1
entity_items = list[BotEntity]() entity_items = list[BotEntity]()
context = BotContext(
db_session=db_session,
app=kwargs["app"],
app_state=kwargs["app_state"],
user=user,
message=message,
)
items = [ items = [
{ {
"text": f"{ "text": f"{
@@ -306,14 +316,6 @@ async def render_entity_picker(
state_data = kwargs["state_data"] state_data = kwargs["state_data"]
context = BotContext(
db_session=db_session,
app=kwargs["app"],
app_state=kwargs["app_state"],
user=user,
message=message,
)
await wrap_editor( await wrap_editor(
keyboard_builder=keyboard_builder, keyboard_builder=keyboard_builder,
field_descriptor=field_descriptor, field_descriptor=field_descriptor,

View File

@@ -76,7 +76,7 @@ async def wrap_editor(
) )
) )
if field_descriptor.is_optional: if field_descriptor.is_optional and field_descriptor.show_skip_in_editor == "Auto":
btns.append( btns.append(
InlineKeyboardButton( InlineKeyboardButton(
text=(await Settings.get(Settings.APP_STRINGS_SKIP_BTN)), text=(await Settings.get(Settings.APP_STRINGS_SKIP_BTN)),

View File

@@ -14,6 +14,7 @@ from ....model.descriptors import (
InlineButton, InlineButton,
BotContext, BotContext,
) )
from ....model.bot_entity import BotEntity
from ....model.settings import Settings from ....model.settings import Settings
from ....model.user import UserBase from ....model.user import UserBase
from ....model import EntityPermission from ....model import EntityPermission
@@ -103,8 +104,15 @@ async def entity_item(
app_state=kwargs["app_state"], app_state=kwargs["app_state"],
user=user, user=user,
message=query, message=query,
default_handler=item_repr
) )
if form.before_open:
if iscoroutinefunction(form.before_open):
await form.before_open(entity_item, context)
else:
form.before_open(entity_item, context)
if form.form_buttons: if form.form_buttons:
for edit_buttons_row in form.form_buttons: for edit_buttons_row in form.form_buttons:
btn_row = [] btn_row = []
@@ -113,7 +121,14 @@ async def entity_item(
continue continue
if isinstance(button, FieldEditButton) and can_edit: if isinstance(button, FieldEditButton) and can_edit:
field_name = button.field_name if isinstance(button.field, str):
field_name = button.field
else:
field_name = button.field(entity_descriptor.type_).key
for fd in entity_descriptor.fields_descriptors.values():
if fd.field_name == field_name:
field_name = fd.name
break
btn_caption = button.caption btn_caption = button.caption
if field_name in entity_descriptor.fields_descriptors: if field_name in entity_descriptor.fields_descriptors:
field_descriptor = entity_descriptor.fields_descriptors[ field_descriptor = entity_descriptor.fields_descriptors[
@@ -261,90 +276,7 @@ async def entity_item(
entity=entity_item, entity=entity_item,
) )
else: else:
entity_caption = ( item_text = await item_repr(entity_item=entity_item, context=context)
await get_callable_str(
callable_str=entity_descriptor.full_name,
context=context,
descriptor=entity_descriptor,
)
if entity_descriptor.full_name
else entity_descriptor.name
)
entity_item_repr = (
await get_callable_str(
callable_str=entity_descriptor.item_repr,
context=context,
entity=entity_item,
)
if entity_descriptor.item_repr
else str(entity_item.id)
)
item_text = f"<b><u><i>{entity_caption or entity_descriptor.name}:</i></u></b> <b>{entity_item_repr}</b>"
user_permissions = get_user_permissions(user, entity_descriptor)
for field_descriptor in entity_descriptor.fields_descriptors.values():
if (
isinstance(field_descriptor.is_visible, bool)
and not field_descriptor.is_visible
):
continue
if callable(field_descriptor.is_visible):
if iscoroutinefunction(field_descriptor.is_visible):
field_visible = await field_descriptor.is_visible(
field_descriptor, entity_item, context
)
else:
field_visible = field_descriptor.is_visible(
field_descriptor, entity_item, context
)
if not field_visible:
continue
skip = False
for own_field in entity_descriptor.ownership_fields.items():
if (
own_field[1].rstrip("_id")
== field_descriptor.field_name.rstrip("_id")
and own_field[0] in user.roles
and EntityPermission.READ_ALL not in user_permissions
):
skip = True
break
if skip:
continue
if field_descriptor.caption_value:
item_text += f"\n{
await get_callable_str(
callable_str=field_descriptor.caption_value,
context=context,
descriptor=field_descriptor,
entity=entity_item,
)
}"
else:
field_caption = (
await get_callable_str(
callable_str=field_descriptor.caption,
context=context,
descriptor=field_descriptor,
)
if field_descriptor.caption
else field_descriptor.field_name
)
value = await get_value_repr(
value=getattr(entity_item, field_descriptor.field_name),
field_descriptor=field_descriptor,
context=context,
locale=user.lang,
)
item_text += f"\n{field_caption or field_descriptor.name}:{f' <b>{value}</b>' if value else ''}"
context = pop_navigation_context(navigation_stack) context = pop_navigation_context(navigation_stack)
if context: if context:
@@ -368,3 +300,93 @@ async def entity_item(
text=item_text, text=item_text,
reply_markup=keyboard_builder.as_markup(), reply_markup=keyboard_builder.as_markup(),
) )
async def item_repr(entity_item: BotEntity, context: BotContext[UserBase]):
entity_descriptor = entity_item.bot_entity_descriptor
user = context.user
entity_caption = (
await get_callable_str(
callable_str=entity_descriptor.full_name,
context=context,
descriptor=entity_descriptor,
)
if entity_descriptor.full_name
else entity_descriptor.name
)
entity_item_repr = (
await get_callable_str(
callable_str=entity_descriptor.item_repr,
context=context,
entity=entity_item,
)
if entity_descriptor.item_repr
else str(entity_item.id)
)
item_text = f"<b><u><i>{entity_caption or entity_descriptor.name}:</i></u></b> <b>{entity_item_repr}</b>"
user_permissions = get_user_permissions(user, entity_descriptor)
for field_descriptor in entity_descriptor.fields_descriptors.values():
if (
isinstance(field_descriptor.is_visible, bool)
and not field_descriptor.is_visible
):
continue
if callable(field_descriptor.is_visible):
if iscoroutinefunction(field_descriptor.is_visible):
field_visible = await field_descriptor.is_visible(
field_descriptor, entity_item, context
)
else:
field_visible = field_descriptor.is_visible(
field_descriptor, entity_item, context
)
if not field_visible:
continue
skip = False
for own_field in entity_descriptor.ownership_fields.items():
if (
own_field[1].rstrip("_id")
== field_descriptor.field_name.rstrip("_id")
and own_field[0] in user.roles
and EntityPermission.READ_ALL not in user_permissions
):
skip = True
break
if skip:
continue
if field_descriptor.caption_value:
item_text += f"\n{
await get_callable_str(
callable_str=field_descriptor.caption_value,
context=context,
descriptor=field_descriptor,
entity=entity_item,
)
}"
else:
field_caption = (
await get_callable_str(
callable_str=field_descriptor.caption,
context=context,
descriptor=field_descriptor,
)
if field_descriptor.caption
else field_descriptor.field_name
)
value = await get_value_repr(
value=getattr(entity_item, field_descriptor.field_name),
field_descriptor=field_descriptor,
context=context,
locale=user.lang,
)
item_text += f"\n{field_caption or field_descriptor.name}:{f' <b>{value}</b>' if value else ''}"
return item_text

View File

@@ -19,12 +19,18 @@ router = Router()
@router.message(CommandStart()) @router.message(CommandStart())
async def start(message: Message, **kwargs): async def start(message: Message, **kwargs):
app: QBotApp = kwargs["app"] app: QBotApp = kwargs["app"]
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
clear_state(state_data=state_data, clear_nav=True)
if app.start_handler: if app.start_handler:
await app.start_handler(default_start_handler, message, **kwargs) await app.start_handler(default_start_handler, message, **kwargs)
else: else:
await default_start_handler(message, **kwargs) await default_start_handler(message, **kwargs)
await state.set_data(state_data)
async def default_start_handler[UserType: UserBase]( async def default_start_handler[UserType: UserBase](
message: Message, message: Message,
@@ -33,8 +39,6 @@ async def default_start_handler[UserType: UserBase](
state: FSMContext, state: FSMContext,
**kwargs, **kwargs,
) -> tuple[UserType, bool]: ) -> tuple[UserType, bool]:
state_data = await state.get_data()
clear_state(state_data=state_data, clear_nav=True)
User = app.user_class User = app.user_class

View File

@@ -94,16 +94,13 @@ async def command_handler(message: Message | CallbackQuery, cmd: BotCommand, **k
if message: if message:
send_message = get_send_message(message) send_message = get_send_message(message)
if isinstance(message, CallbackQuery):
message = message.message
if callback_context.message_text: if callback_context.message_text:
await send_message( await send_message(
text=callback_context.message_text, text=callback_context.message_text,
reply_markup=callback_context.keyboard_builder.as_markup(), reply_markup=callback_context.keyboard_builder.as_markup(),
) )
else: elif isinstance(message, CallbackQuery):
await message.edit_reply_markup( await message.message.edit_reply_markup(
reply_markup=callback_context.keyboard_builder.as_markup() reply_markup=callback_context.keyboard_builder.as_markup()
) )
else: else:

View File

@@ -93,6 +93,8 @@ class BotEntityMetaclass(SQLModelMetaclass):
attribute_value.default_factory attribute_value.default_factory
) )
descriptor_kwargs.pop("__orig_class__", None)
if sm_descriptor: if sm_descriptor:
namespace[annotation] = sm_descriptor namespace[annotation] = sm_descriptor
else: else:
@@ -167,6 +169,7 @@ class BotEntityMetaclass(SQLModelMetaclass):
entity_descriptor = namespace.pop("bot_entity_descriptor") entity_descriptor = namespace.pop("bot_entity_descriptor")
descriptor_kwargs: dict = entity_descriptor.__dict__.copy() descriptor_kwargs: dict = entity_descriptor.__dict__.copy()
descriptor_name = descriptor_kwargs.pop("name", None) descriptor_name = descriptor_kwargs.pop("name", None)
descriptor_kwargs.pop("__orig_class__", None)
descriptor_name = descriptor_name or name.lower() descriptor_name = descriptor_name or name.lower()
namespace["bot_entity_descriptor"] = EntityDescriptor( namespace["bot_entity_descriptor"] = EntityDescriptor(
name=descriptor_name, name=descriptor_name,

View File

@@ -93,9 +93,10 @@ class EnumMember(object):
if args.__len__() == 0: if args.__len__() == 0:
return list(cls.all_members.values())[0] return list(cls.all_members.values())[0]
if args.__len__() == 1 and isinstance(args[0], str): if args.__len__() == 1 and isinstance(args[0], str):
return {member.value: member for key, member in cls.all_members.items()}[ for key, member in cls.all_members.items():
args[0] if member.value == args[0]:
] return member
return None
elif args.__len__() == 1: elif args.__len__() == 1:
return {member.value: member for key, member in cls.all_members.items()}[ return {member.value: member for key, member in cls.all_members.items()}[
args[0].value args[0].value

View File

@@ -7,6 +7,7 @@ from babel.support import LazyProxy
from dataclasses import dataclass, field from dataclasses import dataclass, field
from fastapi.datastructures import State from fastapi.datastructures import State
from sqlmodel.ext.asyncio.session import AsyncSession from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.orm import InstrumentedAttribute
from .role import RoleBase from .role import RoleBase
from . import EntityPermission from . import EntityPermission
@@ -23,26 +24,25 @@ if TYPE_CHECKING:
@dataclass @dataclass
class FieldEditButton: class FieldEditButton[T: "BotEntity"]:
field_name: str field: str | Callable[[type[T]], InstrumentedAttribute]
caption: str | LazyProxy | Callable[["BotEntity", "BotContext"], str] | None = None caption: str | LazyProxy | Callable[[T, "BotContext"], str] | None = None
visibility: Callable[["BotEntity", "BotContext"], bool] | None = None visibility: Callable[[T, "BotContext"], bool] | None = None
@dataclass @dataclass
class CommandButton: class CommandButton[T: "BotEntity"]:
command: ContextData | Callable[["BotEntity", "BotContext"], ContextData] | str command: ContextData | Callable[[T, "BotContext"], ContextData] | str
caption: str | LazyProxy | Callable[["BotEntity", "BotContext"], str] | None = None caption: str | LazyProxy | Callable[[T, "BotContext"], str] | None = None
visibility: Callable[["BotEntity", "BotContext"], bool] | None = None visibility: Callable[[T, "BotContext"], bool] | None = None
@dataclass @dataclass
class InlineButton: class InlineButton[T: "BotEntity"]:
inline_button: ( inline_button: (
InlineKeyboardButton InlineKeyboardButton | Callable[[T, "BotContext"], InlineKeyboardButton]
| Callable[["BotEntity", "BotContext"], InlineKeyboardButton]
) )
visibility: Callable[["BotEntity", "BotContext"], bool] | None = None visibility: Callable[[T, "BotContext"], bool] | None = None
@dataclass @dataclass
@@ -69,11 +69,11 @@ class Filter:
@dataclass @dataclass
class EntityList: class EntityList[T: "BotEntity"]:
caption: ( caption: (
str | LazyProxy | Callable[["EntityDescriptor", "BotContext"], str] | None str | LazyProxy | Callable[["EntityDescriptor", "BotContext"], str] | None
) = None ) = None
item_repr: Callable[["BotEntity", "BotContext"], str] | None = None item_repr: Callable[[T, "BotContext"], str] | None = None
show_add_new_button: bool = True show_add_new_button: bool = True
item_form: str | None = None item_form: str | None = None
pagination: bool = True pagination: bool = True
@@ -84,16 +84,17 @@ class EntityList:
@dataclass @dataclass
class EntityForm: class EntityForm[T: "BotEntity"]:
item_repr: Callable[["BotEntity", "BotContext"], str] | None = None item_repr: Callable[[T, "BotContext"], str] | None = None
edit_field_sequence: list[str] = None edit_field_sequence: list[str] = None
form_buttons: list[list[FieldEditButton | CommandButton | InlineButton]] = None form_buttons: list[list[FieldEditButton | CommandButton | InlineButton]] = None
show_edit_button: bool = True show_edit_button: bool = True
show_delete_button: bool = True show_delete_button: bool = True
before_open: Callable[[T, "BotContext"], None] | None = None
@dataclass(kw_only=True) @dataclass(kw_only=True)
class _BaseFieldDescriptor: class _BaseFieldDescriptor[T: "BotEntity"]:
icon: str = None icon: str = None
caption: ( caption: (
str | LazyProxy | Callable[["FieldDescriptor", "BotContext"], str] | None str | LazyProxy | Callable[["FieldDescriptor", "BotContext"], str] | None
@@ -104,19 +105,17 @@ class _BaseFieldDescriptor:
edit_prompt: ( edit_prompt: (
str str
| LazyProxy | LazyProxy
| Callable[["FieldDescriptor", Union["BotEntity", Any], "BotContext"], str] | Callable[["FieldDescriptor", Union[T, Any], "BotContext"], str]
| None | None
) = None ) = None
caption_value: ( caption_value: (
Callable[["FieldDescriptor", Union["BotEntity", Any], "BotContext"], str] | None Callable[["FieldDescriptor", Union[T, Any], "BotContext"], str] | None
) = None
is_visible: (
bool | Callable[["FieldDescriptor", "BotEntity", "BotContext"], bool] | None
) = None ) = None
is_visible: bool | Callable[["FieldDescriptor", T, "BotContext"], bool] | None = (
None
)
is_visible_in_edit_form: ( is_visible_in_edit_form: (
bool bool | Callable[["FieldDescriptor", Union[T, Any], "BotContext"], bool] | None
| Callable[["FieldDescriptor", Union["BotEntity", Any], "BotContext"], bool]
| None
) = None ) = None
validator: Callable[[Any, "BotContext"], Union[bool, str]] | None = None validator: Callable[[Any, "BotContext"], Union[bool, str]] | None = None
localizable: bool = False localizable: bool = False
@@ -126,12 +125,14 @@ class _BaseFieldDescriptor:
ep_parent_field: str | None = None ep_parent_field: str | None = None
ep_child_field: str | None = None ep_child_field: str | None = None
dt_type: Literal["date", "datetime"] = "date" dt_type: Literal["date", "datetime"] = "date"
options: list[Any] | Callable[[T, "BotContext"], list[Any]] | None = None
show_skip_in_editor: Literal[False, "Auto"] = "Auto"
default: Any = None default: Any = None
default_factory: Callable[[], Any] | None = None default_factory: Callable[[], Any] | None = None
@dataclass(kw_only=True) @dataclass(kw_only=True)
class EntityField(_BaseFieldDescriptor): class EntityField[T: "BotEntity"](_BaseFieldDescriptor[T]):
name: str | None = None name: str | None = None
sm_descriptor: Any = None sm_descriptor: Any = None
@@ -142,7 +143,7 @@ class Setting(_BaseFieldDescriptor):
@dataclass(kw_only=True) @dataclass(kw_only=True)
class FormField(_BaseFieldDescriptor): class FormField[T: "BotEntity"](_BaseFieldDescriptor[T]):
name: str | None = None name: str | None = None
type_: type type_: type
@@ -163,7 +164,7 @@ class FieldDescriptor(_BaseFieldDescriptor):
@dataclass(kw_only=True) @dataclass(kw_only=True)
class _BaseEntityDescriptor: class _BaseEntityDescriptor[T: "BotEntity"]:
icon: str = "📘" icon: str = "📘"
full_name: ( full_name: (
str | LazyProxy | Callable[["EntityDescriptor", "BotContext"], str] | None str | LazyProxy | Callable[["EntityDescriptor", "BotContext"], str] | None
@@ -174,7 +175,7 @@ class _BaseEntityDescriptor:
description: ( description: (
str | LazyProxy | Callable[["EntityDescriptor", "BotContext"], str] | None str | LazyProxy | Callable[["EntityDescriptor", "BotContext"], str] | None
) = None ) = None
item_repr: Callable[["BotEntity", "BotContext"], str] | None = None item_repr: Callable[[T, "BotContext"], str] | None = None
default_list: EntityList = field(default_factory=EntityList) default_list: EntityList = field(default_factory=EntityList)
default_form: EntityForm = field(default_factory=EntityForm) default_form: EntityForm = field(default_factory=EntityForm)
lists: dict[str, EntityList] = field(default_factory=dict[str, EntityList]) lists: dict[str, EntityList] = field(default_factory=dict[str, EntityList])
@@ -196,23 +197,19 @@ class _BaseEntityDescriptor:
} }
) )
before_create: Callable[["BotContext"], Union[bool, str]] | None = None before_create: Callable[["BotContext"], Union[bool, str]] | None = None
before_create_save: ( before_create_save: Callable[[T, "BotContext"], Union[bool, str]] | None = None
Callable[["BotEntity", "BotContext"], Union[bool, str]] | None
) = None
before_update_save: ( before_update_save: (
Callable[[dict[str, Any], dict[str, Any], "BotContext"], Union[bool, str]] Callable[[dict[str, Any], dict[str, Any], "BotContext"], Union[bool, str]]
| None | None
) = None ) = None
before_delete: Callable[["BotEntity", "BotContext"], Union[bool, str]] | None = None before_delete: Callable[[T, "BotContext"], Union[bool, str]] | None = None
on_created: Callable[["BotEntity", "BotContext"], None] | None = None on_created: Callable[[T, "BotContext"], None] | None = None
on_deleted: Callable[["BotEntity", "BotContext"], None] | None = None on_deleted: Callable[[T, "BotContext"], None] | None = None
on_updated: Callable[[dict[str, Any], "BotEntity", "BotContext"], None] | None = ( on_updated: Callable[[dict[str, Any], T, "BotContext"], None] | None = None
None
)
@dataclass(kw_only=True) @dataclass(kw_only=True)
class Entity(_BaseEntityDescriptor): class Entity[T: "BotEntity"](_BaseEntityDescriptor[T]):
name: str | None = None name: str | None = None
@@ -252,6 +249,7 @@ class BotContext[UT: UserBase]:
app_state: State app_state: State
user: UT user: UT
message: Message | CallbackQuery | None = None message: Message | CallbackQuery | None = None
default_handler: Callable[["BotEntity", "BotContext"], None] | None = None
@dataclass(kw_only=True) @dataclass(kw_only=True)