diff --git a/bot/handlers/common/__init__.py b/bot/handlers/common/__init__.py
index f91624b..9de333f 100644
--- a/bot/handlers/common/__init__.py
+++ b/bot/handlers/common/__init__.py
@@ -1,24 +1,35 @@
from types import NoneType, UnionType
+from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from babel.support import LazyProxy
+from inspect import signature
from sqlmodel.ext.asyncio.session import AsyncSession
-from typing import Any, get_args, get_origin
+from typing import Any, get_args, get_origin, TYPE_CHECKING
import ujson as json
+
from ..context import ContextData, CallbackCommand, CommandContext
+from ...command_context_filter import CallbackCommandFilter
from ....model.user import UserBase
from ....model.settings import Settings
-from ....main import QBotApp
from ....model.bot_entity import BotEntity
from ....model.bot_enum import BotEnum
+from ....model.view_setting import ViewSetting
+from ....utils import get_local_text, deserialize
from ....model.descriptors import (EntityFieldDescriptor,
EntityDescriptor,
EntityCaptionCallable,
EntityItemCaptionCallable,
EntityFieldCaptionCallable)
+if TYPE_CHECKING:
+ from ....main import QBotApp
+
+
+router = Router()
+
def get_send_message(message: Message | CallbackQuery):
if isinstance(message, Message):
@@ -27,52 +38,43 @@ def get_send_message(message: Message | CallbackQuery):
return message.message.edit_text
-def get_local_text(text: str, lang: str):
- try:
- text_obj = json.loads(text) #@IgnoreException
- return text_obj.get(lang, text_obj[list(text_obj.keys())[0]])
- except:
- return text
+# def get_local_text(text: str, lang: str):
+# try:
+# text_obj = json.loads(text) #@IgnoreException
+# return text_obj.get(lang, text_obj[list(text_obj.keys())[0]])
+# except:
+# return text
def get_value_repr(value: Any, field_descriptor: EntityFieldDescriptor, locale: str | None = None) -> str:
- type_ = field_descriptor.type_
- origin = get_origin(type_)
+
+ type_ = field_descriptor.type_base
if value is None:
return ""
- if origin == UnionType:
- args = get_args(type_)
- if args[1] == NoneType:
- type_ = args[0]
-
if isinstance(value, bool):
return "【✔︎】" if value else "【 】"
- elif origin == list:
- arg_type = None
- args = get_args(type_)
- if args:
- arg_type = args[0]
- if arg_type and issubclass(arg_type, BotEntity):
- if locale and arg_type.bot_entity_descriptor.fields_descriptors["name"].localizable:
- return "[" + ", ".join([get_local_text(value = item.name, locale = locale) for item in value]) + "]"
+ elif field_descriptor.is_list:
+ if issubclass(type_, BotEntity):
+ if locale and type_.bot_entity_descriptor.fields_descriptors["name"].localizable:
+ return "[" + ", ".join([get_local_text(text = item.name, locale = locale) for item in value]) + "]"
else:
return "[" + ", ".join([str(item.name) for item in value]) + "]"
- elif arg_type and issubclass(arg_type, BotEnum):
+ elif issubclass(type_, BotEnum):
return "[" + ", ".join(item.localized(locale) for item in value) + "]"
- elif arg_type == str:
+ elif type_ == str:
return "[" + ", ".join([f"\"{item}\"" for item in value]) + "]"
else:
return "[" + ", ".join([str(item) for item in value]) + "]"
elif issubclass(type_, BotEntity):
if type_.bot_entity_descriptor.fields_descriptors["name"].localizable:
- return get_local_text(value = value.name, locale = locale)
+ return get_local_text(text = value.name, locale = locale)
return value.name
elif issubclass(type_, BotEnum):
return value.localized(locale)
elif isinstance(value, str):
if field_descriptor and field_descriptor.localizable:
- return get_local_text(value, locale)
+ return get_local_text(text = value, locale = locale)
return value
elif isinstance(value, int):
return str(value)
@@ -92,7 +94,13 @@ def get_callable_str(callable_str: str | LazyProxy | EntityCaptionCallable | Ent
elif isinstance(callable_str, LazyProxy):
return callable_str.value
elif callable(callable_str):
- return callable_str(*(descriptor, entity, value))
+ args = signature(callable_str).parameters
+ if len(args) == 1:
+ return callable_str(descriptor)
+ elif len(args) == 2:
+ return callable_str(descriptor, entity)
+ elif len(args) == 3:
+ return callable_str(descriptor, entity, value)
async def authorize_command(user: UserBase,
@@ -100,24 +108,24 @@ async def authorize_command(user: UserBase,
if (callback_data.command == CallbackCommand.MENU_ENTRY_PARAMETERS or
callback_data.context == CommandContext.SETTING_EDIT):
- allowed_roles = (await Settings.get(Settings.SECURITY_SETTINGS_ROLES))
+ allowed_roles = (await Settings.get(Settings.SECURITY_PARAMETERS_ROLES))
return any(role in user.roles for role in allowed_roles)
return False
-def get_entity_descriptor(app: QBotApp, callback_data: ContextData) -> EntityDescriptor | None:
+def get_entity_descriptor(app: "QBotApp", callback_data: ContextData) -> EntityDescriptor | None:
if callback_data.entity_name:
return app.entity_metadata.entity_descriptors[callback_data.entity_name]
return None
-def get_field_descriptor(app: QBotApp, callback_data: ContextData) -> EntityFieldDescriptor | None:
+def get_field_descriptor(app: "QBotApp", callback_data: ContextData) -> EntityFieldDescriptor | None:
if callback_data.context == CommandContext.SETTING_EDIT:
return Settings.list_params()[callback_data.field_name]
- elif callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]:
+ elif callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT, CommandContext.ENTITY_FIELD_EDIT]:
entity_descriptor = get_entity_descriptor(app, callback_data)
if entity_descriptor:
return entity_descriptor.fields_descriptors.get(callback_data.field_name)
@@ -192,4 +200,144 @@ def add_pagination_controls(keyboard_builder: InlineKeyboardBuilder,
data = str(total_pages) if page != total_pages else "skip",
save_state = True).pack()))
- keyboard_builder.row(*navigation_buttons)
\ No newline at end of file
+ keyboard_builder.row(*navigation_buttons)
+
+
+def add_filter_controls(keyboard_builder: InlineKeyboardBuilder,
+ entity_descriptor: EntityDescriptor,
+ filter: str = None,
+ page: int = 1):
+
+ field_name_descriptor = entity_descriptor.fields_descriptors["name"]
+ if field_name_descriptor.caption:
+ caption = get_callable_str(field_name_descriptor.caption, field_name_descriptor)
+ else:
+ caption = field_name_descriptor.name
+
+ keyboard_builder.row(
+ InlineKeyboardButton(
+ text = f"🔎 {caption}{f": \"{filter}\"" if filter else ""}",
+ callback_data = ContextData(
+ command = CallbackCommand.VIEW_FILTER_EDIT,
+ entity_name = entity_descriptor.name,
+ data = str(page)).pack()))
+
+
+@router.callback_query(ContextData.filter(F.command == CallbackCommand.VIEW_FILTER_EDIT))
+async def view_filter_edit(query: CallbackQuery, **kwargs):
+
+ callback_data: ContextData = kwargs["callback_data"]
+ state: FSMContext = kwargs["state"]
+ state_data = await state.get_data()
+ kwargs["state_data"] = state_data
+
+ args = callback_data.data.split("&")
+ page = int(args[0])
+ cmd = None
+ if len(args) > 1:
+ cmd = args[1]
+
+ db_session: AsyncSession = kwargs["db_session"]
+ app: "QBotApp" = kwargs["app"]
+ user: UserBase = kwargs["user"]
+ entity_descriptor = get_entity_descriptor(app = app, callback_data = callback_data)
+
+ if cmd in ["cancel", "clear"]:
+
+ if cmd == "clear":
+ await ViewSetting.set_filter(session = db_session, user_id = user.id, entity_name = entity_descriptor.class_name, filter = None)
+
+ context_data_bak = state_data.pop("context_data_bak")
+ if context_data_bak:
+
+ state_data["context_data"] = context_data_bak
+ context_data = ContextData.unpack(context_data_bak)
+
+ field_descriptor = get_field_descriptor(app, context_data)
+ edit_prompt = state_data["edit_prompt"]
+ current_value = await deserialize(session = db_session,
+ type_ = field_descriptor.type_,
+ value = state_data["value"])
+ page = int(state_data.pop("page"))
+ kwargs.pop("callback_data")
+
+ return await render_entity_picker(field_descriptor = field_descriptor,
+ message = query,
+ callback_data = context_data,
+ current_value = current_value,
+ edit_prompt = edit_prompt,
+ page = page,
+ **kwargs)
+
+ else:
+
+ return await route_callback(message = query, back = False, **kwargs)
+
+
+ #await save_navigation_context(callback_data = callback_data, state = state)
+ old_context_data = state_data.get("context_data")
+ await state.update_data({"context_data": callback_data.pack(),
+ "context_data_bak": old_context_data,
+ "page": page})
+
+ send_message = get_send_message(query)
+
+ await send_message(text = await Settings.get(Settings.APP_STRINGS_VIEW_FILTER_EDIT_PROMPT),
+ reply_markup = InlineKeyboardBuilder().row(
+ InlineKeyboardButton(
+ text = await Settings.get(Settings.APP_STRINGS_CANCEL_BTN),
+ callback_data = ContextData(
+ command = CallbackCommand.VIEW_FILTER_EDIT,
+ entity_name = entity_descriptor.name,
+ data = f"{page}&cancel").pack()),
+ InlineKeyboardButton(
+ text = await Settings.get(Settings.APP_STRINGS_CLEAR_BTN),
+ callback_data = ContextData(
+ command = CallbackCommand.VIEW_FILTER_EDIT,
+ entity_name = entity_descriptor.name,
+ data = f"{page}&clear").pack())).as_markup())
+
+
+@router.message(CallbackCommandFilter(command = CallbackCommand.VIEW_FILTER_EDIT))
+async def view_filter_edit_input(message: Message, **kwargs):
+
+ state: FSMContext = kwargs["state"]
+ state_data = await state.get_data()
+ kwargs["state_data"] = state_data
+ callback_data = ContextData.unpack(state_data["context_data"])
+ db_session: AsyncSession = kwargs["db_session"]
+ user: UserBase = kwargs["user"]
+ app: "QBotApp" = kwargs["app"]
+ entity_descriptor = get_entity_descriptor(app = app, callback_data = callback_data)
+ filter = message.text
+ await ViewSetting.set_filter(session = db_session, user_id = user.id, entity_name = entity_descriptor.class_name, filter = filter)
+
+ #state_data.pop("context_data")
+ #return await route_callback(message = message, back = False, **kwargs)
+
+ context_data_bak = state_data.pop("context_data_bak")
+ if context_data_bak:
+ state_data["context_data"] = context_data_bak
+ context_data = ContextData.unpack(context_data_bak)
+ field_descriptor = get_field_descriptor(app, context_data)
+ edit_prompt = state_data["edit_prompt"]
+ current_value = await deserialize(session = db_session,
+ type_ = field_descriptor.type_,
+ value = state_data["value"])
+ page = int(state_data.pop("page"))
+
+ return await render_entity_picker(field_descriptor = field_descriptor,
+ message = message,
+ callback_data = context_data,
+ current_value = current_value,
+ edit_prompt = edit_prompt,
+ page = page,
+ **kwargs)
+
+ else:
+
+ return await route_callback(message = message, back = False, **kwargs)
+
+
+from ..navigation import route_callback, save_navigation_context, clear_state, get_navigation_context
+from ..editors.entity import render_entity_picker
\ No newline at end of file
diff --git a/bot/handlers/context.py b/bot/handlers/context.py
index 2ece561..1a20275 100644
--- a/bot/handlers/context.py
+++ b/bot/handlers/context.py
@@ -16,15 +16,18 @@ class CallbackCommand(StrEnum):
SET_LANGUAGE = "ls"
DATE_PICKER_MONTH = "dm"
DATE_PICKER_YEAR = "dy"
- STRING_EDITOR_LOCALE = "sl"
+ #STRING_EDITOR_LOCALE = "sl"
ENTITY_PICKER_PAGE = "ep"
ENTITY_PICKER_TOGGLE_ITEM = "et"
+ VIEW_FILTER_EDIT = "vf"
+ USER_COMMAND = "uc"
class CommandContext(StrEnum):
SETTING_EDIT = "se"
ENTITY_CREATE = "ec"
ENTITY_EDIT = "ee"
+ ENTITY_FIELD_EDIT = "ef"
class ContextData(BaseCallbackData, prefix = "cd"):
command: CallbackCommand
@@ -32,5 +35,6 @@ class ContextData(BaseCallbackData, prefix = "cd"):
entity_name: str | None = None
entity_id: int | None = None
field_name: str | None = None
+ user_command: str | None = None
data: str | None = None
back: bool = False
diff --git a/bot/handlers/editors/__init__.py b/bot/handlers/editors/__init__.py
index eaa8877..5ee2768 100644
--- a/bot/handlers/editors/__init__.py
+++ b/bot/handlers/editors/__init__.py
@@ -1,7 +1,7 @@
from datetime import datetime
from decimal import Decimal
from types import NoneType, UnionType
-from typing import get_args, get_origin
+from typing import Union, get_args, get_origin, TYPE_CHECKING
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery
@@ -9,7 +9,6 @@ from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
import ujson as json
-from ....main import QBotApp
from ....model import EntityPermission
from ....model.bot_entity import BotEntity
from ....model.owned_bot_entity import OwnedBotEntity
@@ -21,14 +20,16 @@ from ....model.descriptors import EntityFieldDescriptor
from ....utils import deserialize, get_user_permissions, serialize
from ...command_context_filter import CallbackCommandFilter
from ..context import ContextData, CallbackCommand, CommandContext
-from ..common import (get_value_repr, authorize_command, get_callable_str,
- get_entity_descriptor, get_field_descriptor)
+
from ..menu.parameters import parameters_menu
from .string import string_editor, router as string_editor_router
from .date import date_picker, router as date_picker_router
from .boolean import bool_editor, router as bool_editor_router
from .entity import entity_picker, router as entity_picker_router
+if TYPE_CHECKING:
+ from ....main import QBotApp
+
logger = getLogger(__name__)
router = Router()
@@ -42,21 +43,22 @@ router.include_routers(
@router.callback_query(ContextData.filter(F.command == CallbackCommand.FIELD_EDITOR))
-async def settings_field_editor(message: Message | CallbackQuery, **kwargs):
+async def field_editor(message: Message | CallbackQuery, **kwargs):
callback_data: ContextData = kwargs.get("callback_data", None)
db_session: AsyncSession = kwargs["db_session"]
user: UserBase = kwargs["user"]
- app: QBotApp = kwargs["app"]
+ app: "QBotApp" = kwargs["app"]
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
entity_data = state_data.get("entity_data")
+
for key in ["current_value", "value", "locale_index"]:
if key in state_data:
state_data.pop(key)
- await state.clear()
- await state.update_data(state_data)
+
+ kwargs["state_data"] = state_data
entity_descriptor = None
@@ -69,41 +71,69 @@ async def settings_field_editor(message: Message | CallbackQuery, **kwargs):
else:
return await message.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
- stack, context = await get_navigation_context(state = state)
+ stack, context = get_navigation_context(state_data = state_data)
return await parameters_menu(message = message,
navigation_stack = stack,
**kwargs)
- current_value = await Settings.get(field_descriptor)
+ current_value = await Settings.get(field_descriptor, all_locales = True)
else:
- entity_descriptor = get_entity_descriptor(app, callback_data)
field_descriptor = get_field_descriptor(app, callback_data)
+ entity_descriptor = field_descriptor.entity_descriptor
current_value = None
+ user_permissions = get_user_permissions(user, entity_descriptor)
- if not entity_data and callback_data.context == CommandContext.ENTITY_EDIT:
- if (EntityPermission.READ_ALL in get_user_permissions(user, entity_descriptor) or
- (EntityPermission.READ in get_user_permissions(user, entity_descriptor) and
- not issubclass(entity_descriptor.type_, OwnedBotEntity)) or
- (EntityPermission.READ in get_user_permissions(user, entity_descriptor) and
- issubclass(entity_descriptor.type_, OwnedBotEntity) and
- entity_data.user_id == user.id)):
+ if field_descriptor.type_base == bool and callback_data.context == CommandContext.ENTITY_FIELD_EDIT:
+
+ entity = await entity_descriptor.type_.get(session = db_session, id = int(callback_data.entity_id))
+
+ if (EntityPermission.UPDATE_ALL in user_permissions or
+ (EntityPermission.UPDATE in user_permissions and
+ not isinstance(entity, OwnedBotEntity)) or
+ (EntityPermission.UPDATE in user_permissions and
+ isinstance(entity, OwnedBotEntity) and
+ entity.user_id == user.id)):
+
+ current_value: bool = getattr(entity, field_descriptor.field_name) or False
+ setattr(entity, field_descriptor.field_name, not current_value)
+
+ await db_session.commit()
+
+ stack, context = get_navigation_context(state_data = state_data)
+
+ return await entity_item(query = message, navigation_stack = stack, **kwargs)
+
+
+ if not entity_data and callback_data.context in [CommandContext.ENTITY_EDIT, CommandContext.ENTITY_FIELD_EDIT]:
+
+ entity = await entity_descriptor.type_.get(session = kwargs["db_session"], id = int(callback_data.entity_id))
+
+ if (EntityPermission.READ_ALL in user_permissions or
+ (EntityPermission.READ in user_permissions and
+ not isinstance(entity, OwnedBotEntity)) or
+ (EntityPermission.READ in user_permissions and
+ isinstance(entity, OwnedBotEntity) and
+ entity.user_id == user.id)):
+
- entity = await entity_descriptor.type_.get(session = kwargs["db_session"], id = int(callback_data.entity_id))
if entity:
- entity_data = {key: serialize(getattr(entity, key), entity_descriptor.fields_descriptors[key]) for key in entity_descriptor.field_sequence}
- await state.update_data({"entity_data": entity_data})
+ entity_data = {key: serialize(getattr(entity, key), entity_descriptor.fields_descriptors[key])
+ for key in (entity_descriptor.field_sequence if callback_data.context == CommandContext.ENTITY_EDIT
+ else [callback_data.field_name])}
+ state_data.update({"entity_data": entity_data})
if entity_data:
current_value = await deserialize(session = db_session,
type_= field_descriptor.type_,
value = entity_data.get(callback_data.field_name))
-
+ kwargs.update({"field_descriptor": field_descriptor})
+
+ save_navigation_context(state_data = state_data, callback_data = callback_data)
+
await show_editor(message = message,
- field_descriptor = field_descriptor,
- entity_descriptor = entity_descriptor,
current_value = current_value,
**kwargs)
@@ -116,14 +146,15 @@ async def show_editor(message: Message | CallbackQuery,
user: UserBase = kwargs["user"]
callback_data: ContextData = kwargs.get("callback_data", None)
state: FSMContext = kwargs["state"]
+ state_data: dict = kwargs["state_data"]
- value_type = field_descriptor.type_
+ value_type = field_descriptor.type_base
if field_descriptor.edit_prompt:
edit_prompt = get_callable_str(field_descriptor.edit_prompt, field_descriptor, None, current_value)
else:
- if field_descriptor.caption_str:
- caption_str = get_callable_str(field_descriptor.caption_str, field_descriptor, None, current_value)
+ if field_descriptor.caption:
+ caption_str = get_callable_str(field_descriptor.caption, field_descriptor, None, current_value)
else:
caption_str = field_descriptor.name
if callback_data.context == CommandContext.ENTITY_EDIT:
@@ -135,15 +166,15 @@ async def show_editor(message: Message | CallbackQuery,
kwargs["edit_prompt"] = edit_prompt
- type_origin = get_origin(value_type)
+ # type_origin = get_origin(value_type)
- if type_origin == UnionType:
- args = get_args(value_type)
- if args[1] == NoneType:
- value_type = args[0]
+ # if type_origin in [UnionType, Union]:
+ # args = get_args(value_type)
+ # if args[1] == NoneType:
+ # value_type = args[0]
if value_type not in [int, float, Decimal, str]:
- await state.update_data({"context_data": callback_data.pack()})
+ state_data.update({"context_data": callback_data.pack()})
if value_type == str:
await string_editor(message = message, **kwargs)
@@ -157,12 +188,12 @@ async def show_editor(message: Message | CallbackQuery,
elif value_type == datetime:
await date_picker(message = message, **kwargs)
- elif type_origin == list:
- type_args = get_args(value_type)
- if type_args and issubclass(type_args[0], BotEntity) or issubclass(type_args[0], BotEnum):
- await entity_picker(message = message, **kwargs)
- else:
- await string_editor(message = message, **kwargs)
+ # elif type_origin == list:
+ # type_args = get_args(value_type)
+ # if type_args and issubclass(type_args[0], BotEntity) or issubclass(type_args[0], BotEnum):
+ # await entity_picker(message = message, **kwargs)
+ # else:
+ # await string_editor(message = message, **kwargs)
elif issubclass(value_type, BotEntity) or issubclass(value_type, BotEnum):
await entity_picker(message = message, **kwargs)
@@ -175,26 +206,22 @@ async def show_editor(message: Message | CallbackQuery,
@router.callback_query(ContextData.filter(F.command == CallbackCommand.FIELD_EDITOR_CALLBACK))
async def field_editor_callback(message: Message | CallbackQuery, **kwargs):
- callback_data: ContextData = kwargs.get("callback_data", None)
- app: QBotApp = kwargs["app"]
+ app: "QBotApp" = kwargs["app"]
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
+ kwargs["state_data"] = state_data
if isinstance(message, Message):
+ callback_data: ContextData = kwargs.get("callback_data", None)
context_data = state_data.get("context_data")
if context_data:
- context_data = ContextData.unpack(context_data)
- callback_data = context_data
+ callback_data = ContextData.unpack(context_data)
value = message.text
field_descriptor = get_field_descriptor(app, callback_data)
- base_type = field_descriptor.type_
- if get_origin(base_type) == UnionType:
- args = get_args(base_type)
- if args[1] == NoneType:
- base_type = args[0]
+ type_base = field_descriptor.type_base
- if base_type == str and field_descriptor.localizable:
+ if type_base == str and field_descriptor.localizable:
locale_index = int(state_data.get("locale_index"))
if locale_index < len(LanguageBase.all_members.values()) - 1:
@@ -209,9 +236,9 @@ async def field_editor_callback(message: Message | CallbackQuery, **kwargs):
value = {}
value[list(LanguageBase.all_members.values())[locale_index]] = message.text
- value = json.dumps(value)
+ value = json.dumps(value, ensure_ascii = False)
- await state.update_data({"value": value})
+ state_data.update({"value": value})
entity_descriptor = get_entity_descriptor(app, callback_data)
@@ -231,16 +258,20 @@ async def field_editor_callback(message: Message | CallbackQuery, **kwargs):
else:
value = {}
value[list(LanguageBase.all_members.values())[locale_index]] = message.text
- value = json.dumps(value)
+ value = json.dumps(value, ensure_ascii = False)
- elif (base_type in [int, float, Decimal]):
+ elif (type_base in [int, float, Decimal]):
try:
- _ = base_type(value) #@IgnoreException
+ _ = type_base(value) #@IgnoreException
except:
return await message.answer(text = (await Settings.get(Settings.APP_STRINGS_INVALID_INPUT)))
else:
+ callback_data: ContextData = kwargs["callback_data"]
if callback_data.data:
- value = callback_data.data
+ if callback_data.data == "skip":
+ value = None
+ else:
+ value = callback_data.data
else:
value = state_data.get("value")
field_descriptor = get_field_descriptor(app, callback_data)
@@ -258,13 +289,14 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
user: UserBase = kwargs["user"]
db_session: AsyncSession = kwargs["db_session"]
callback_data: ContextData = kwargs.get("callback_data", None)
- state: FSMContext = kwargs["state"]
+ # state: FSMContext = kwargs["state"]
+ state_data: dict = kwargs["state_data"]
value = kwargs["value"]
field_descriptor: EntityFieldDescriptor = kwargs["field_descriptor"]
if callback_data.context == CommandContext.SETTING_EDIT:
- await clear_state(state = state)
+ # clear_state(state_data = state_data)
if callback_data.data != "cancel":
if await authorize_command(user = user, callback_data = callback_data):
@@ -273,28 +305,31 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
else:
return await message.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
- stack, context = await get_navigation_context(state = state)
+ # stack, context = get_navigation_context(state_data = state_data)
- return await parameters_menu(message = message,
- navigation_stack = stack,
- **kwargs)
+ return await route_callback(message = message, back = True, **kwargs)
+
+ # return await parameters_menu(message = message,
+ # navigation_stack = stack,
+ # **kwargs)
- elif callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]:
+ elif callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT, CommandContext.ENTITY_FIELD_EDIT]:
- app: QBotApp = kwargs["app"]
+ app: "QBotApp" = kwargs["app"]
entity_descriptor = get_entity_descriptor(app, callback_data)
field_sequence = entity_descriptor.field_sequence
- current_index = field_sequence.index(callback_data.field_name)
+ current_index = (field_sequence.index(callback_data.field_name)
+ if callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT] else 0)
- state_data = await state.get_data()
entity_data = state_data.get("entity_data", {})
- if current_index < len(field_sequence) - 1:
+ if (callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT] and
+ current_index < len(field_sequence) - 1):
entity_data[field_descriptor.field_name] = value
- await state.update_data({"entity_data": entity_data})
+ state_data.update({"entity_data": entity_data})
next_field_name = field_sequence[current_index + 1]
next_field_descriptor = entity_descriptor.fields_descriptors[next_field_name]
@@ -319,7 +354,7 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
if ((callback_data.context == CommandContext.ENTITY_CREATE and
EntityPermission.CREATE not in user_permissions and
EntityPermission.CREATE_ALL not in user_permissions) or
- (callback_data.context == CommandContext.ENTITY_EDIT and
+ (callback_data.context in [CommandContext.ENTITY_EDIT, CommandContext.ENTITY_FIELD_EDIT] and
EntityPermission.UPDATE not in user_permissions and
EntityPermission.UPDATE_ALL not in user_permissions)):
return await message.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
@@ -341,13 +376,20 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
obj_in = entity_type(**deser_entity_data),
commit = True)
- await save_navigation_context(state = state, callback_data = ContextData(
+ state_data["navigation_context"] = ContextData(
command = CallbackCommand.ENTITY_ITEM,
entity_name = entity_descriptor.name,
- entity_id = str(new_entity.id)
- ))
+ entity_id = str(new_entity.id)).pack()
+
+ state_data.update(state_data)
- elif callback_data.context == CommandContext.ENTITY_EDIT:
+ # await save_navigation_context(state = state, callback_data = ContextData(
+ # command = CallbackCommand.ENTITY_ITEM,
+ # entity_name = entity_descriptor.name,
+ # entity_id = str(new_entity.id)
+ # ))
+
+ elif callback_data.context in [CommandContext.ENTITY_EDIT, CommandContext.ENTITY_FIELD_EDIT]:
entity_id = int(callback_data.entity_id)
entity = await entity_type.get(session = db_session, id = entity_id)
@@ -363,9 +405,12 @@ async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs
await db_session.commit()
- await clear_state(state = state)
+ clear_state(state_data = state_data)
- await route_callback(message = message, back = False, **kwargs)
+ await route_callback(message = message, back = True, **kwargs)
-from ..navigation import get_navigation_context, route_callback, clear_state, save_navigation_context
\ No newline at end of file
+from ..common import (get_value_repr, authorize_command, get_callable_str,
+ get_entity_descriptor, get_field_descriptor)
+from ..navigation import get_navigation_context, route_callback, clear_state, save_navigation_context, pop_navigation_context
+from ..forms.entity_form import entity_item
\ No newline at end of file
diff --git a/bot/handlers/editors/boolean.py b/bot/handlers/editors/boolean.py
index 1099465..9dd70c8 100644
--- a/bot/handlers/editors/boolean.py
+++ b/bot/handlers/editors/boolean.py
@@ -17,22 +17,21 @@ router = Router()
async def bool_editor(message: Message | CallbackQuery,
edit_prompt: str,
- entity_descriptor: EntityDescriptor,
field_descriptor: EntityFieldDescriptor,
callback_data: ContextData,
**kwargs):
keyboard_builder = InlineKeyboardBuilder()
- if isinstance(field_descriptor.bool_true_value_btn, LazyProxy):
- true_caption = field_descriptor.bool_true_value_btn.value
+ if isinstance(field_descriptor.bool_true_value, LazyProxy):
+ true_caption = field_descriptor.bool_true_value.value
else:
- true_caption = field_descriptor.bool_true_value_btn
+ true_caption = field_descriptor.bool_true_value
- if isinstance(field_descriptor.bool_false_value_btn, LazyProxy):
- false_caption = field_descriptor.bool_false_value_btn.value
+ if isinstance(field_descriptor.bool_false_value, LazyProxy):
+ false_caption = field_descriptor.bool_false_value.value
else:
- false_caption = field_descriptor.bool_false_value_btn
+ false_caption = field_descriptor.bool_false_value
keyboard_builder.row(
InlineKeyboardButton(text = true_caption,
@@ -55,11 +54,15 @@ async def bool_editor(message: Message | CallbackQuery,
save_state = True).pack())
)
+ state_data = kwargs["state_data"]
+
await wrap_editor(keyboard_builder = keyboard_builder,
field_descriptor = field_descriptor,
- entity_descriptor = entity_descriptor,
callback_data = callback_data,
- state = kwargs["state"])
+ state_data = state_data)
+
+ state: FSMContext = kwargs["state"]
+ await state.set_data(state_data)
send_message = get_send_message(message)
diff --git a/bot/handlers/editors/common.py b/bot/handlers/editors/common.py
index 48f8a11..3f405a2 100644
--- a/bot/handlers/editors/common.py
+++ b/bot/handlers/editors/common.py
@@ -7,21 +7,24 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ....model.settings import Settings
from ..context import ContextData, CallbackCommand, CommandContext
-from ..navigation import get_navigation_context
+from ..navigation import get_navigation_context, pop_navigation_context
async def wrap_editor(keyboard_builder: InlineKeyboardBuilder,
field_descriptor: EntityFieldDescriptor,
- entity_descriptor: EntityDescriptor,
callback_data: ContextData,
- state: FSMContext):
+ state_data: dict):
- if callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]:
+ if callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT, CommandContext.ENTITY_FIELD_EDIT]:
btns = []
- field_index = entity_descriptor.field_sequence.index(field_descriptor.name)
+ entity_descriptor = field_descriptor.entity_descriptor
+ field_index = (entity_descriptor.field_sequence.index(field_descriptor.name)
+ if callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]
+ else 0)
- stack, context = await get_navigation_context(state)
+ stack, context = get_navigation_context(state_data = state_data)
+ context = pop_navigation_context(stack)
if field_index > 0:
btns.append(InlineKeyboardButton(
@@ -31,21 +34,18 @@ async def wrap_editor(keyboard_builder: InlineKeyboardBuilder,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
- field_name = entity_descriptor.field_sequence[field_index - 1],
- save_state = True).pack()))
+ field_name = entity_descriptor.field_sequence[field_index - 1]).pack()))
- if get_origin(field_descriptor.type_) == UnionType:
- args = get_args(field_descriptor.type_)
- if args[1] == NoneType:
- btns.append(InlineKeyboardButton(
- text = (await Settings.get(Settings.APP_STRINGS_SKIP_BTN)),
- callback_data = ContextData(
- command = CallbackCommand.FIELD_EDITOR_CALLBACK,
- context = callback_data.context,
- entity_name = callback_data.entity_name,
- entity_id = callback_data.entity_id,
- field_name = callback_data.field_name,
- save_state = True).pack()))
+ if field_descriptor.is_optional:
+ btns.append(InlineKeyboardButton(
+ text = (await Settings.get(Settings.APP_STRINGS_SKIP_BTN)),
+ callback_data = ContextData(
+ command = CallbackCommand.FIELD_EDITOR_CALLBACK,
+ context = callback_data.context,
+ entity_name = callback_data.entity_name,
+ entity_id = callback_data.entity_id,
+ field_name = callback_data.field_name,
+ data = "skip").pack()))
keyboard_builder.row(*btns)
diff --git a/bot/handlers/editors/date.py b/bot/handlers/editors/date.py
index 48fd671..e12f892 100644
--- a/bot/handlers/editors/date.py
+++ b/bot/handlers/editors/date.py
@@ -4,13 +4,16 @@ from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
+from typing import TYPE_CHECKING
-from ....main import QBotApp
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ..context import ContextData, CallbackCommand
from ..common import get_send_message, get_field_descriptor, get_entity_descriptor
from .common import wrap_editor
+if TYPE_CHECKING:
+ from ....main import QBotApp
+
logger = getLogger(__name__)
router = Router()
@@ -18,7 +21,6 @@ router = Router()
async def date_picker(message: Message | CallbackQuery,
field_descriptor: EntityFieldDescriptor,
- entity_descriptor: EntityDescriptor,
callback_data: ContextData,
current_value: datetime,
state: FSMContext,
@@ -82,11 +84,14 @@ async def date_picker(message: Message | CallbackQuery,
keyboard_builder.row(*buttons)
+ state_data = kwargs["state_data"]
+
await wrap_editor(keyboard_builder = keyboard_builder,
field_descriptor = field_descriptor,
- entity_descriptor = entity_descriptor,
callback_data = callback_data,
- state = state)
+ state_data = state_data)
+
+ await state.set_data(state_data)
if edit_prompt:
send_message = get_send_message(message)
@@ -98,7 +103,7 @@ async def date_picker(message: Message | CallbackQuery,
@router.callback_query(ContextData.filter(F.command == CallbackCommand.DATE_PICKER_YEAR))
async def date_picker_year(query: CallbackQuery,
callback_data: ContextData,
- app: QBotApp,
+ app: "QBotApp",
state: FSMContext,
**kwargs):
@@ -142,11 +147,9 @@ async def date_picker_year(query: CallbackQuery,
save_state = True).pack()))
field_descriptor = get_field_descriptor(app, callback_data)
- entity_descriptor = get_entity_descriptor(app, callback_data)
await wrap_editor(keyboard_builder = keyboard_builder,
field_descriptor = field_descriptor,
- entity_descriptor = entity_descriptor,
callback_data = callback_data,
state = state)
@@ -154,14 +157,12 @@ async def date_picker_year(query: CallbackQuery,
@router.callback_query(ContextData.filter(F.command == CallbackCommand.DATE_PICKER_MONTH))
-async def date_picker_month(query: CallbackQuery, callback_data: ContextData, app: QBotApp, **kwargs):
-
- entity_descriptor = get_entity_descriptor(app, callback_data)
+async def date_picker_month(query: CallbackQuery, callback_data: ContextData, app: "QBotApp", **kwargs):
+
field_descriptor = get_field_descriptor(app, callback_data)
await date_picker(query.message,
field_descriptor = field_descriptor,
- entity_descriptor = entity_descriptor,
callback_data = callback_data,
current_value = datetime.strptime(callback_data.data, "%Y-%m-%d"),
**kwargs)
\ No newline at end of file
diff --git a/bot/handlers/editors/entity.py b/bot/handlers/editors/entity.py
index a1f6e20..1e348fc 100644
--- a/bot/handlers/editors/entity.py
+++ b/bot/handlers/editors/entity.py
@@ -5,20 +5,24 @@ from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
-from typing import get_args, get_origin
+from typing import get_args, get_origin, TYPE_CHECKING
-from ....main import QBotApp
from ....model.bot_entity import BotEntity
+from ....model.owned_bot_entity import OwnedBotEntity
from ....model.bot_enum import BotEnum
from ....model.settings import Settings
from ....model.user import UserBase
+from ....model.view_setting import ViewSetting
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
-from ....utils import serialize, deserialize
+from ....model import EntityPermission
+from ....utils import serialize, deserialize, get_user_permissions
from ..context import ContextData, CallbackCommand
from ..common import (get_send_message, get_local_text, get_field_descriptor,
- get_entity_descriptor, add_pagination_controls)
+ get_entity_descriptor, add_pagination_controls, add_filter_controls)
from .common import wrap_editor
+if TYPE_CHECKING:
+ from ....main import QBotApp
logger = getLogger(__name__)
router = Router()
@@ -28,24 +32,28 @@ async def entity_picker(message: Message | CallbackQuery,
field_descriptor: EntityFieldDescriptor,
edit_prompt: str,
current_value: BotEntity | BotEnum | list[BotEntity] | list[BotEnum],
- state: FSMContext,
**kwargs):
- await state.update_data({"current_value": serialize(current_value, field_descriptor),
+ state_data: dict = kwargs["state_data"]
+
+ state_data.update({"current_value": serialize(current_value, field_descriptor),
"value": serialize(current_value, field_descriptor),
"edit_prompt": edit_prompt})
await render_entity_picker(field_descriptor = field_descriptor,
message = message,
- state = state,
current_value = current_value,
edit_prompt = edit_prompt,
**kwargs)
+
+
+def calc_total_pages(items_count: int, page_size: int) -> int:
+ return max(items_count // page_size + (1 if items_count % page_size else 0), 1)
+
async def render_entity_picker(*,
field_descriptor: EntityFieldDescriptor,
- entity_descriptor: EntityDescriptor,
message: Message | CallbackQuery,
callback_data: ContextData,
user: UserBase,
@@ -60,18 +68,21 @@ async def render_entity_picker(*,
if callback_data.command in [CallbackCommand.ENTITY_PICKER_PAGE, CallbackCommand.ENTITY_PICKER_TOGGLE_ITEM]:
page = int(callback_data.data.split("&")[0])
- is_list = False
+ # is_list = False
- type_origin = get_origin(field_descriptor.type_)
- if type_origin == UnionType:
- type_ = get_args(field_descriptor.type_)[0]
+ # type_origin = get_origin(field_descriptor.type_)
+ # if type_origin == UnionType:
+ # type_ = get_args(field_descriptor.type_)[0]
- elif type_origin == list:
- type_ = get_args(field_descriptor.type_)[0]
- is_list = True
+ # elif type_origin == list:
+ # type_ = get_args(field_descriptor.type_)[0]
+ # is_list = True
- else:
- type_ = field_descriptor.type_
+ # else:
+ # type_ = field_descriptor.type_
+
+ type_ = field_descriptor.type_base
+ is_list = field_descriptor.is_list
if not issubclass(type_, BotEntity) and not issubclass(type_, BotEnum):
raise ValueError("Unsupported type")
@@ -80,18 +91,43 @@ async def render_entity_picker(*,
if issubclass(type_, BotEnum):
items_count = len(type_.all_members)
+ total_pages = calc_total_pages(items_count, page_size)
+ page = min(page, total_pages)
enum_items = list(type_.all_members.values())[page_size * (page - 1):page_size * page]
items = [{"text": f"{"" if not is_list else "【✔︎】 " if item in (current_value or []) else "【 】 "}{item.localized(user.lang)}",
"value": item.value} for item in enum_items]
else:
- items_count = await type_.get_count(session = db_session)
- entity_items = await type_.get_multi(session = db_session, order_by = type_.name, skip = page_size * (page - 1), limit = page_size)
+ permissions = get_user_permissions(user, type_.bot_entity_descriptor)
+ entity_filter = await ViewSetting.get_filter(session = db_session, user_id = user.id, entity_name = type_.bot_entity_descriptor.class_name)
+ if (EntityPermission.LIST_ALL in permissions or
+ (EntityPermission.LIST in permissions and
+ not issubclass(type_, OwnedBotEntity))):
+ items_count = await type_.get_count(session = db_session, filter = entity_filter)
+ total_pages = calc_total_pages(items_count, page_size)
+ page = min(page, total_pages)
+ entity_items = await type_.get_multi(
+ session = db_session, order_by = type_.name, filter = entity_filter,
+ skip = page_size * (page - 1), limit = page_size)
+ elif (EntityPermission.LIST in permissions and
+ issubclass(type_, OwnedBotEntity)):
+ items_count = await type_.get_count_by_user(session = db_session, user_id = user.id, filter = entity_filter)
+ total_pages = calc_total_pages(items_count, page_size)
+ page = min(page, total_pages)
+ entity_items = await type_.get_multi_by_user(
+ session = db_session, user_id = user.id, order_by = type_.name, filter = entity_filter,
+ skip = page_size * (page - 1), limit = page_size)
+ else:
+ items_count = 0
+ total_pages = 1
+ page = 1
+ entity_items = list[BotEntity]()
+
items = [{"text": f"{"" if not is_list else "【✔︎】 " if item in (current_value or []) else "【 】 "}{
- type_.bot_entity_descriptor.item_caption_btn(type_.bot_entity_descriptor, item) if type_.bot_entity_descriptor.item_caption_btn
- else get_local_text(item.name, user.lang) if field_descriptor.localizable else item.name}",
+ type_.bot_entity_descriptor.item_caption(type_.bot_entity_descriptor, item) if type_.bot_entity_descriptor.item_caption
+ else get_local_text(item.name, user.lang) if type_.bot_entity_descriptor.fields_descriptors["name"].localizable else item.name}",
"value": str(item.id)} for item in entity_items]
- total_pages = items_count // page_size + (1 if items_count % page_size else 0)
+ # total_pages = items_count // page_size + (1 if items_count % page_size else 0)
keyboard_builder = InlineKeyboardBuilder()
@@ -112,6 +148,11 @@ async def render_entity_picker(*,
total_pages = total_pages,
command = CallbackCommand.ENTITY_PICKER_PAGE,
page = page)
+
+ if issubclass(type_, BotEntity):
+ add_filter_controls(keyboard_builder = keyboard_builder,
+ entity_descriptor = type_.bot_entity_descriptor,
+ filter = entity_filter)
if is_list:
keyboard_builder.row(
@@ -124,11 +165,14 @@ async def render_entity_picker(*,
field_name = callback_data.field_name,
save_state = True).pack()))
- await wrap_editor(keyboard_builder = keyboard_builder,
- field_descriptor = field_descriptor,
- entity_descriptor = entity_descriptor,
- callback_data = callback_data,
- state = state)
+ state_data = kwargs["state_data"]
+
+ await wrap_editor(keyboard_builder = keyboard_builder,
+ field_descriptor = field_descriptor,
+ callback_data = callback_data,
+ state_data = state_data)
+
+ await state.set_data(state_data)
send_message = get_send_message(message)
@@ -140,14 +184,14 @@ async def render_entity_picker(*,
async def entity_picker_callback(query: CallbackQuery,
callback_data: ContextData,
db_session: AsyncSession,
- app: QBotApp,
+ app: "QBotApp",
state: FSMContext,
**kwargs):
state_data = await state.get_data()
+ kwargs["state_data"] = state_data
field_descriptor = get_field_descriptor(app = app, callback_data = callback_data)
- entity_descriptor = get_entity_descriptor(app = app, callback_data = callback_data)
current_value = await deserialize(session = db_session, type_ = field_descriptor.type_, value = state_data["current_value"])
edit_prompt = state_data["edit_prompt"]
@@ -156,7 +200,7 @@ async def entity_picker_callback(query: CallbackQuery,
if callback_data.command == CallbackCommand.ENTITY_PICKER_TOGGLE_ITEM:
page, id_value = callback_data.data.split("&")
page = int(page)
- type_ = get_args(field_descriptor.type_)[0]
+ type_ = field_descriptor.type_base
if issubclass(type_, BotEnum):
item = type_(id_value)
if item in value:
@@ -170,7 +214,7 @@ async def entity_picker_callback(query: CallbackQuery,
else:
value.append(item)
- await state.update_data({"value": serialize(value, field_descriptor)})
+ state_data.update({"value": serialize(value, field_descriptor)})
elif callback_data.command == CallbackCommand.ENTITY_PICKER_PAGE:
if callback_data.data == "skip":
return
@@ -179,7 +223,6 @@ async def entity_picker_callback(query: CallbackQuery,
raise ValueError("Unsupported command")
await render_entity_picker(field_descriptor = field_descriptor,
- entity_descriptor = entity_descriptor,
message = query,
callback_data = callback_data,
current_value = value,
diff --git a/bot/handlers/editors/string.py b/bot/handlers/editors/string.py
index 31241fa..5b98dcb 100644
--- a/bot/handlers/editors/string.py
+++ b/bot/handlers/editors/string.py
@@ -21,7 +21,6 @@ router = Router()
async def string_editor(message: Message | CallbackQuery,
field_descriptor: EntityFieldDescriptor,
- entity_descriptor: EntityDescriptor,
callback_data: ContextData,
current_value: Any,
edit_prompt: str,
@@ -31,14 +30,16 @@ async def string_editor(message: Message | CallbackQuery,
keyboard_builder = InlineKeyboardBuilder()
+ state_data: dict = kwargs["state_data"]
+
_edit_prompt = edit_prompt
- type_ = field_descriptor.type_
- type_origin = get_origin(type_)
- if type_origin == UnionType:
- type_ = get_args(type_)[0]
+ # type_ = field_descriptor.type_
+ # type_origin = get_origin(type_)
+ # if type_origin == UnionType:
+ # type_ = get_args(type_)[0]
- if type_ == str and field_descriptor.localizable:
+ if field_descriptor.type_base == str and field_descriptor.localizable:
current_locale = list(LanguageBase.all_members.values())[locale_index]
context_data = ContextData(
@@ -51,9 +52,9 @@ async def string_editor(message: Message | CallbackQuery,
_edit_prompt = f"{edit_prompt}\n{(await Settings.get(
Settings.APP_STRINGS_STRING_EDITOR_LOCALE_TEMPLATE_P_NAME)).format(name = current_locale)}"
- _current_value = get_local_text(current_value, current_locale)
+ _current_value = get_local_text(current_value, current_locale) if current_value else None
- await state.update_data({
+ state_data.update({
"context_data": context_data.pack(),
"edit_prompt": edit_prompt,
"locale_index": str(locale_index),
@@ -70,7 +71,7 @@ async def string_editor(message: Message | CallbackQuery,
_current_value = serialize(current_value, field_descriptor)
- await state.update_data({
+ state_data.update({
"context_data": context_data.pack()})
if _current_value:
@@ -79,16 +80,19 @@ async def string_editor(message: Message | CallbackQuery,
keyboard_builder.row(InlineKeyboardButton(text = _current_value_caption,
copy_text = CopyTextButton(text = _current_value)))
+ state_data = kwargs["state_data"]
+
await wrap_editor(keyboard_builder = keyboard_builder,
field_descriptor = field_descriptor,
- entity_descriptor = entity_descriptor,
callback_data = callback_data,
- state = state)
+ state_data = state_data)
+
+ await state.set_data(state_data)
send_message = get_send_message(message)
await send_message(text = _edit_prompt, reply_markup = keyboard_builder.as_markup())
-async def context_command_fiter(*args, **kwargs):
- print(args, kwargs)
- return True
+# async def context_command_fiter(*args, **kwargs):
+# print(args, kwargs)
+# return True
diff --git a/bot/handlers/forms/entity_form.py b/bot/handlers/forms/entity_form.py
index bab7b3c..8e04148 100644
--- a/bot/handlers/forms/entity_form.py
+++ b/bot/handlers/forms/entity_form.py
@@ -1,4 +1,4 @@
-from typing import get_args, get_origin
+from typing import get_args, get_origin, TYPE_CHECKING
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
@@ -9,7 +9,6 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
-from ....main import QBotApp
from ....model.bot_entity import BotEntity
from ....model.bot_enum import BotEnum
from ....model.owned_bot_entity import OwnedBotEntity
@@ -21,25 +20,33 @@ from ....utils import serialize, deserialize, get_user_permissions
from ..context import ContextData, CallbackCommand, CommandContext
from ..common import get_send_message, get_local_text, get_entity_descriptor, get_callable_str, get_value_repr
+if TYPE_CHECKING:
+ from ....main import QBotApp
+
logger = getLogger(__name__)
router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.ENTITY_ITEM))
-async def entity_item_callback(query: CallbackQuery, callback_data: ContextData, **kwargs):
+async def entity_item_callback(query: CallbackQuery, **kwargs):
- await clear_state(state = kwargs["state"])
- stack = await save_navigation_context(callback_data = callback_data, state = kwargs["state"])
+ callback_data: ContextData = kwargs["callback_data"]
+ state: FSMContext = kwargs["state"]
+ state_data = await state.get_data()
+ kwargs["state_data"] = state_data
+
+ clear_state(state_data = state_data)
+ stack = save_navigation_context(callback_data = callback_data, state_data = state_data)
- await entity_item(query = query, callback_data = callback_data, navigation_stack = stack, **kwargs)
+ await entity_item(query = query, navigation_stack = stack, **kwargs)
async def entity_item(query: CallbackQuery,
callback_data: ContextData,
db_session: AsyncSession,
user: UserBase,
- app: QBotApp,
+ app: "QBotApp",
navigation_stack: list[ContextData],
**kwargs):
@@ -77,8 +84,40 @@ async def entity_item(query: CallbackQuery,
(EntityPermission.DELETE in user_permissions and is_owned and
entity_item.user_id == user.id))
- edit_delete_row = []
if can_edit:
+ for edit_buttons_row in entity_descriptor.edit_buttons:
+ btn_row = []
+ for field_name in edit_buttons_row:
+ field_name, btn_caption = field_name if isinstance(field_name, tuple) else (field_name, None)
+ if field_name in entity_descriptor.fields_descriptors:
+ field_descriptor = entity_descriptor.fields_descriptors[field_name]
+ # if field_descriptor.is_list and issubclass(field_descriptor.type_base, BotEntity):
+ # await field_descriptor.type_base.
+ field_value = getattr(entity_item, field_descriptor.field_name)
+ if btn_caption:
+ btn_text = get_callable_str(btn_caption, field_descriptor, entity_item, field_value)
+ else:
+ if field_descriptor.type_base == bool:
+ btn_text = f"{"【✔︎】 " if field_value else "【 】 "}{
+ get_callable_str(field_descriptor.caption, field_descriptor, entity_item, field_value) if field_descriptor.caption
+ else field_name}"
+ else:
+ btn_text = (f"✏️ {get_callable_str(field_descriptor.caption, field_descriptor, entity_item, field_value)}"
+ if field_descriptor.caption else f"✏️ {field_name}")
+ btn_row.append(
+ InlineKeyboardButton(
+ text = btn_text,
+ callback_data = ContextData(
+ command = CallbackCommand.FIELD_EDITOR,
+ context = CommandContext.ENTITY_FIELD_EDIT,
+ entity_name = entity_descriptor.name,
+ entity_id = str(entity_item.id),
+ field_name = field_name).pack()))
+ if btn_row:
+ keyboard_builder.row(*btn_row)
+
+ edit_delete_row = []
+ if can_edit and entity_descriptor.edit_button_visible:
edit_delete_row.append(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_EDIT_BTN)),
@@ -87,8 +126,7 @@ async def entity_item(query: CallbackQuery,
context = CommandContext.ENTITY_EDIT,
entity_name = entity_descriptor.name,
entity_id = str(entity_item.id),
- field_name = entity_descriptor.field_sequence[0],
- save_state = True).pack()))
+ field_name = entity_descriptor.field_sequence[0]).pack()))
if can_delete:
edit_delete_row.append(
@@ -102,15 +140,15 @@ async def entity_item(query: CallbackQuery,
if edit_delete_row:
keyboard_builder.row(*edit_delete_row)
- entity_caption = get_callable_str(entity_descriptor.caption_msg, entity_descriptor, entity_item)
+ entity_caption = get_callable_str(entity_descriptor.caption, entity_descriptor, entity_item)
entity_item_name = get_local_text(entity_item.name, user.lang) if entity_descriptor.fields_descriptors["name"].localizable else entity_item.name
item_text = f"{entity_caption or entity_descriptor.name}: {entity_item_name}"
for field_descriptor in entity_descriptor.fields_descriptors.values():
- if field_descriptor.name in ["name", "id"] or not field_descriptor.is_visible:
+ if field_descriptor.name == "name" or not field_descriptor.is_visible:
continue
- field_caption = get_callable_str(field_descriptor.caption_str, field_descriptor, entity_item)
+ field_caption = get_callable_str(field_descriptor.caption, field_descriptor, entity_item)
value = get_value_repr(value = getattr(entity_item, field_descriptor.name),
field_descriptor = field_descriptor,
locale = user.lang)
@@ -123,6 +161,10 @@ async def entity_item(query: CallbackQuery,
text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
+ state: FSMContext = kwargs["state"]
+ state_data = kwargs["state_data"]
+ await state.set_data(state_data)
+
send_message = get_send_message(query)
await send_message(text = item_text, reply_markup = keyboard_builder.as_markup())
@@ -134,7 +176,7 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
callback_data: ContextData = kwargs["callback_data"]
user: UserBase = kwargs["user"]
db_session: AsyncSession = kwargs["db_session"]
- app: QBotApp = kwargs["app"]
+ app: "QBotApp" = kwargs["app"]
entity_descriptor = get_entity_descriptor(app = app, callback_data = callback_data)
user_permissions = get_user_permissions(user, entity_descriptor)
@@ -147,8 +189,18 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
entity.user_id == user.id)):
return await query.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
+
+ if callback_data.data == "yes":
- if not callback_data.data:
+ await entity_descriptor.type_.remove(
+ session = db_session, id = int(callback_data.entity_id), commit = True)
+
+ await route_callback(message = query, **kwargs)
+
+ elif callback_data.data == "no":
+ await route_callback(message = query, back = False, **kwargs)
+
+ elif not callback_data.data:
field_descriptor = entity_descriptor.fields_descriptors["name"]
@@ -175,16 +227,6 @@ async def entity_delete_callback(query: CallbackQuery, **kwargs):
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
data = "no").pack())).as_markup())
-
- if callback_data.data == "yes":
-
- await entity_descriptor.type_.remove(
- session = db_session, id = int(callback_data.entity_id), commit = True)
-
- await route_callback(message = query, **kwargs)
-
- if callback_data.data == "no":
- await route_callback(message = query, back = False, **kwargs)
diff --git a/bot/handlers/forms/entity_list.py b/bot/handlers/forms/entity_list.py
index 9ee818d..0393eee 100644
--- a/bot/handlers/forms/entity_list.py
+++ b/bot/handlers/forms/entity_list.py
@@ -1,4 +1,4 @@
-from typing import get_args, get_origin
+from typing import get_args, get_origin, TYPE_CHECKING
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
@@ -9,18 +9,21 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
-from ....main import QBotApp
from ....model.bot_entity import BotEntity
from ....model.bot_enum import BotEnum
from ....model.owned_bot_entity import OwnedBotEntity
from ....model.settings import Settings
from ....model.user import UserBase
+from ....model.view_setting import ViewSetting
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ....model import EntityPermission
from ....utils import serialize, deserialize, get_user_permissions
from ..context import ContextData, CallbackCommand, CommandContext
from ..common import (add_pagination_controls, get_local_text, get_entity_descriptor,
- get_callable_str, get_send_message)
+ get_callable_str, get_send_message, add_filter_controls)
+
+if TYPE_CHECKING:
+ from ....main import QBotApp
logger = getLogger(__name__)
@@ -28,22 +31,28 @@ router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.ENTITY_LIST))
-async def entity_list_callback(query: CallbackQuery, callback_data: ContextData, **kwargs):
+async def entity_list_callback(query: CallbackQuery, **kwargs):
+
+ callback_data: ContextData = kwargs["callback_data"]
if callback_data.data == "skip":
return
-
- await clear_state(state = kwargs["state"])
- stack = await save_navigation_context(callback_data = callback_data, state = kwargs["state"])
- await entity_list(message = query, callback_data = callback_data, navigation_stack = stack, **kwargs)
+ state: FSMContext = kwargs["state"]
+ state_data = await state.get_data()
+ kwargs["state_data"] = state_data
+
+ clear_state(state_data = state_data)
+ stack = save_navigation_context(callback_data = callback_data, state_data = state_data)
+
+ await entity_list(message = query, navigation_stack = stack, **kwargs)
async def entity_list(message: CallbackQuery | Message,
callback_data: ContextData,
db_session: AsyncSession,
user: UserBase,
- app: QBotApp,
+ app: "QBotApp",
navigation_stack: list[ContextData],
**kwargs):
@@ -68,38 +77,54 @@ async def entity_list(message: CallbackQuery | Message,
page_size = await Settings.get(Settings.PAGE_SIZE)
+ entity_filter = await ViewSetting.get_filter(session = db_session, user_id = user.id, entity_name = entity_descriptor.class_name)
+
+ def calc_total_pages(items_count: int, page_size: int) -> int:
+ return max(items_count // page_size + (1 if items_count % page_size else 0), 1)
+
if issubclass(entity_type, OwnedBotEntity):
if EntityPermission.READ_ALL in user_permissions or EntityPermission.LIST_ALL in user_permissions:
+ items_count = await entity_type.get_count(session = db_session, filter = entity_filter)
+ total_pages = calc_total_pages(items_count, page_size)
+ page = min(page, total_pages)
items = await entity_type.get_multi(
- session = db_session, order_by = entity_type.name,
- skip = page_size * (page - 1), limit = page_size)
- items_count = await entity_type.get_count(session = db_session)
+ session = db_session, order_by = entity_type.name, filter = entity_filter,
+ skip = page_size * (page - 1), limit = page_size)
elif EntityPermission.READ in user_permissions or EntityPermission.LIST in user_permissions:
+ items_count = await entity_type.get_count_by_user(session = db_session, user_id = user.id, filter = entity_filter)
+ total_pages = calc_total_pages(items_count, page_size)
+ page = min(page, total_pages)
items = await entity_type.get_multi_by_user(
- session = db_session, user_id = user.id, order_by = entity_type.name,
+ session = db_session, user_id = user.id, order_by = entity_type.name, filter = entity_filter,
skip = page_size * (page - 1), limit = page_size)
- items_count = await entity_type.get_count_by_user(session = db_session, user_id = user.id)
else:
items = list[OwnedBotEntity]()
items_count = 0
+ total_pages = 1
+ page = 1
elif issubclass(entity_type, BotEntity):
if (EntityPermission.READ in user_permissions or EntityPermission.LIST in user_permissions or
EntityPermission.READ_ALL in user_permissions or EntityPermission.LIST_ALL in user_permissions):
+ items_count = await entity_type.get_count(session = db_session, filter = entity_filter)
+ total_pages = calc_total_pages(items_count, page_size)
+ page = min(page, total_pages)
items = await entity_type.get_multi(
- session = db_session, order_by = entity_type.name,
+ session = db_session, order_by = entity_type.name, filter = entity_filter,
skip = page_size * (page - 1), limit = page_size)
- items_count = await entity_type.get_count(session = db_session)
+
else:
items = list[BotEntity]()
+ total_pages = 1
+ page = 1
items_count = 0
else:
raise ValueError(f"Unsupported entity type: {entity_type}")
- total_pages = items_count // page_size + (1 if items_count % page_size else 0)
+
for item in items:
- if entity_descriptor.item_caption_btn:
- caption = entity_descriptor.item_caption_btn(entity_descriptor, item)
+ if entity_descriptor.item_caption:
+ caption = entity_descriptor.item_caption(entity_descriptor, item)
elif entity_descriptor.fields_descriptors["name"].localizable:
caption = get_local_text(item.name, user.lang)
else:
@@ -118,6 +143,10 @@ async def entity_list(message: CallbackQuery | Message,
command = CallbackCommand.ENTITY_LIST,
page = page)
+ add_filter_controls(keyboard_builder = keyboard_builder,
+ entity_descriptor = entity_descriptor,
+ filter = entity_filter)
+
context = pop_navigation_context(navigation_stack)
if context:
keyboard_builder.row(
@@ -125,8 +154,8 @@ async def entity_list(message: CallbackQuery | Message,
text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
- if entity_descriptor.caption_msg:
- entity_text = get_callable_str(entity_descriptor.caption_msg, entity_descriptor)
+ if entity_descriptor.caption:
+ entity_text = get_callable_str(entity_descriptor.caption_plural, entity_descriptor)
else:
entity_text = entity_descriptor.name
if entity_descriptor.description:
@@ -134,6 +163,10 @@ async def entity_list(message: CallbackQuery | Message,
else:
entity_desciption = None
+ state: FSMContext = kwargs["state"]
+ state_data = kwargs["state_data"]
+ await state.set_data(state_data)
+
send_message = get_send_message(message)
await send_message(text = f"{entity_text}{f"\n{entity_desciption}" if entity_desciption else ""}",
diff --git a/bot/handlers/menu/entities.py b/bot/handlers/menu/entities.py
index c5a105a..728d77b 100644
--- a/bot/handlers/menu/entities.py
+++ b/bot/handlers/menu/entities.py
@@ -5,14 +5,16 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from babel.support import LazyProxy
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
-
-from ....main import QBotApp
+from typing import TYPE_CHECKING
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand
from ..common import get_send_message
from ....model.descriptors import EntityCaptionCallable
+if TYPE_CHECKING:
+ from ....main import QBotApp
+
logger = getLogger(__name__)
router = Router()
@@ -21,16 +23,18 @@ router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.MENU_ENTRY_ENTITIES))
async def menu_entry_entities(message: CallbackQuery, **kwargs):
- stack = await save_navigation_context(
- callback_data = kwargs["callback_data"],
- state = kwargs["state"])
+ callback_data: ContextData = kwargs["callback_data"]
+ state: FSMContext = kwargs["state"]
+ state_data = await state.get_data()
+ kwargs["state_data"] = state_data
+
+ stack = save_navigation_context(callback_data = callback_data, state_data = state_data)
await entities_menu(message = message, navigation_stack = stack, **kwargs)
async def entities_menu(message: Message | CallbackQuery,
- callback_data: ContextData,
- app: QBotApp,
+ app: "QBotApp",
state: FSMContext,
navigation_stack: list[ContextData],
**kwargs):
@@ -40,12 +44,12 @@ async def entities_menu(message: Message | CallbackQuery,
entity_metadata = app.entity_metadata
for entity in entity_metadata.entity_descriptors.values():
- if entity.caption_btn.__class__ == EntityCaptionCallable:
- caption = entity.caption_btn(entity) or entity.name
- elif entity.caption_btn.__class__ == LazyProxy:
- caption = f"{f"{entity.icon} " if entity.icon else ""}{entity.caption_btn.value or entity.name}"
+ if entity.caption_plural.__class__ == EntityCaptionCallable:
+ caption = entity.caption_plural(entity) or entity.name
+ elif entity.caption_plural.__class__ == LazyProxy:
+ caption = f"{f"{entity.icon} " if entity.icon else ""}{entity.caption_plural.value or entity.name}"
else:
- caption = f"{f"{entity.icon} " if entity.icon else ""}{entity.caption_btn or entity.name}"
+ caption = f"{f"{entity.icon} " if entity.icon else ""}{entity.caption_plural or entity.name}"
keyboard_builder.row(
InlineKeyboardButton(
@@ -58,6 +62,9 @@ async def entities_menu(message: Message | CallbackQuery,
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
+
+ state_data = kwargs["state_data"]
+ await state.set_data(state_data)
send_message = get_send_message(message)
diff --git a/bot/handlers/menu/language.py b/bot/handlers/menu/language.py
index 2d2cd93..6aacb1c 100644
--- a/bot/handlers/menu/language.py
+++ b/bot/handlers/menu/language.py
@@ -1,17 +1,14 @@
from aiogram import Router, F
-from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup
-from aiogram.utils.keyboard import InlineKeyboardBuilder
+from aiogram.fsm.context import FSMContext
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
-from ....main import QBotApp
from ....model.language import LanguageBase
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand
from ..navigation import route_callback
-from .settings import settings_menu
from ..common import get_send_message
@@ -22,8 +19,12 @@ router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.MENU_ENTRY_LANGUAGE))
async def menu_entry_language(message: CallbackQuery, **kwargs):
- stack = await save_navigation_context(callback_data = kwargs["callback_data"],
- state = kwargs["state"])
+ callback_data: ContextData = kwargs["callback_data"]
+ state: FSMContext = kwargs["state"]
+ state_data = await state.get_data()
+ kwargs["state_data"] = state_data
+
+ stack = save_navigation_context(callback_data = callback_data, state_data = state_data)
await language_menu(message, navigation_stack = stack, **kwargs)
@@ -45,18 +46,30 @@ async def language_menu(message: Message | CallbackQuery,
if context:
inline_keyboard.append([InlineKeyboardButton(text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack())])
+
+ state: FSMContext = kwargs["state"]
+ state_data = kwargs["state_data"]
+ await state.set_data(state_data)
await send_message(text = (await Settings.get(Settings.APP_STRINGS_LANGUAGE)),
reply_markup = InlineKeyboardMarkup(inline_keyboard = inline_keyboard))
@router.callback_query(ContextData.filter(F.command == CallbackCommand.SET_LANGUAGE))
-async def set_language(message: CallbackQuery, user: UserBase, callback_data: ContextData, db_session: AsyncSession, **kwargs):
+async def set_language(message: CallbackQuery, **kwargs):
- user.lang = callback_data.data
+ user: UserBase = kwargs["user"]
+ callback_data: ContextData = kwargs["callback_data"]
+ db_session: AsyncSession = kwargs["db_session"]
+ state: FSMContext = kwargs["state"]
+
+ state_data = await state.get_data()
+ kwargs["state_data"] = state_data
+
+ user.lang = LanguageBase(callback_data.data)
await db_session.commit()
- await route_callback(message, callback_data = callback_data, user = user, db_session = db_session, **kwargs)
+ await route_callback(message, **kwargs)
from ..navigation import pop_navigation_context, save_navigation_context
\ No newline at end of file
diff --git a/bot/handlers/menu/main.py b/bot/handlers/menu/main.py
index 8d3302f..1f3e973 100644
--- a/bot/handlers/menu/main.py
+++ b/bot/handlers/menu/main.py
@@ -5,8 +5,6 @@ from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
-
-from ....main import QBotApp
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand
@@ -17,15 +15,15 @@ logger = getLogger(__name__)
router = Router()
-@router.message(Command("menu"))
-async def command_menu(message: Message, **kwargs):
+# @router.message(Command("menu"))
+# async def command_menu(message: Message, **kwargs):
- await clear_state(state = kwargs["state"], clear_nav = True)
- callback_data = ContextData(command = CallbackCommand.MENU_ENTRY_MAIN)
- stack = await save_navigation_context(callback_data = callback_data, state = kwargs["state"])
- kwargs.update({"navigation_stack": stack, "callback_data": callback_data})
+# await clear_state(state = kwargs["state"], clear_nav = True)
+# callback_data = ContextData(command = CallbackCommand.MENU_ENTRY_MAIN)
+# stack = await save_navigation_context(callback_data = callback_data, state = kwargs["state"])
+# kwargs.update({"navigation_stack": stack, "callback_data": callback_data})
- await main_menu(message, **kwargs)
+# await main_menu(message, **kwargs)
# @router.callback_query(CallbackData.filter(F.command == CallbackCommand.MENU_ENTRY))
@@ -77,6 +75,8 @@ from .language import router as language_router
from ..editors import router as editors_router
from ..forms.entity_list import router as entity_list_router
from ..forms.entity_form import router as entity_form_router
+from ..common import router as common_router
+from ..user_handlers import router as user_handlers_router
router.include_routers(
entities_router,
@@ -85,7 +85,9 @@ router.include_routers(
language_router,
editors_router,
entity_list_router,
- entity_form_router
+ entity_form_router,
+ common_router,
+ user_handlers_router
)
from ..navigation import save_navigation_context, pop_navigation_context, clear_state
\ No newline at end of file
diff --git a/bot/handlers/menu/parameters.py b/bot/handlers/menu/parameters.py
index 805bc36..c5d77de 100644
--- a/bot/handlers/menu/parameters.py
+++ b/bot/handlers/menu/parameters.py
@@ -8,11 +8,10 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
-from ....main import QBotApp
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand, CommandContext
-from ..common import get_send_message, get_value_repr, get_callable_str, authorize_command
+
from ..navigation import save_navigation_context, pop_navigation_context
@@ -23,9 +22,13 @@ router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.MENU_ENTRY_PARAMETERS))
async def menu_entry_parameters(message: CallbackQuery, **kwargs):
- await clear_state(state = kwargs["state"])
+ callback_data: ContextData = kwargs["callback_data"]
+ state: FSMContext = kwargs["state"]
+ state_data = await state.get_data()
+ kwargs["state_data"] = state_data
- stack = await save_navigation_context(callback_data = kwargs["callback_data"], state = kwargs["state"])
+ clear_state(state_data = state_data)
+ stack = save_navigation_context(callback_data = callback_data, state_data = state_data)
await parameters_menu(message = message, navigation_stack = stack, **kwargs)
@@ -47,11 +50,11 @@ async def parameters_menu(message: Message | CallbackQuery,
if not key.is_visible:
continue
- if key.caption_value_btn:
- caption = get_callable_str(callable_str = key.caption_value_btn, descriptor = key, entity = None, value = value)
+ if key.caption_value:
+ caption = get_callable_str(callable_str = key.caption_value, descriptor = key, entity = None, value = value)
else:
- if key.caption_btn:
- caption = get_callable_str(callable_str = key.caption_btn, descriptor = key, entity = None, value = value)
+ if key.caption:
+ caption = get_callable_str(callable_str = key.caption, descriptor = key, entity = None, value = value)
else:
caption = key.name
@@ -72,9 +75,14 @@ async def parameters_menu(message: Message | CallbackQuery,
keyboard_builder.row(InlineKeyboardButton(text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
+ state: FSMContext = kwargs["state"]
+ state_data = kwargs["state_data"]
+ await state.set_data(state_data)
+
send_message = get_send_message(message)
await send_message(text = (await Settings.get(Settings.APP_STRINGS_PARAMETERS)), reply_markup = keyboard_builder.as_markup())
-from ..navigation import pop_navigation_context, get_navigation_context, clear_state
\ No newline at end of file
+from ..navigation import pop_navigation_context, get_navigation_context, clear_state
+from ..common import get_send_message, get_value_repr, get_callable_str, authorize_command
\ No newline at end of file
diff --git a/bot/handlers/menu/settings.py b/bot/handlers/menu/settings.py
index 0909513..3971698 100644
--- a/bot/handlers/menu/settings.py
+++ b/bot/handlers/menu/settings.py
@@ -6,7 +6,6 @@ from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
-from ....main import QBotApp
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand
@@ -20,7 +19,12 @@ router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.MENU_ENTRY_SETTINGS))
async def menu_entry_settings(message: CallbackQuery, **kwargs):
- stack = await save_navigation_context(callback_data = kwargs["callback_data"], state = kwargs["state"])
+ callback_data: ContextData = kwargs["callback_data"]
+ state: FSMContext = kwargs["state"]
+ state_data = await state.get_data()
+ kwargs["state_data"] = state_data
+
+ stack = save_navigation_context(callback_data = callback_data, state_data = state_data)
await settings_menu(message, navigation_stack = stack, **kwargs)
@@ -49,9 +53,15 @@ async def settings_menu(message: Message | CallbackQuery,
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
+
+ state: FSMContext = kwargs["state"]
+ state_data = kwargs["state_data"]
+ await state.set_data(state_data)
send_message = get_send_message(message)
+
+
await send_message(text = (await Settings.get(Settings.APP_STRINGS_SETTINGS)), reply_markup = keyboard_builder.as_markup())
diff --git a/bot/handlers/navigation.py b/bot/handlers/navigation.py
index 8821f9c..36193f9 100644
--- a/bot/handlers/navigation.py
+++ b/bot/handlers/navigation.py
@@ -4,23 +4,25 @@ from aiogram.types import Message, CallbackQuery
from .context import ContextData, CallbackCommand
-async def save_navigation_context(callback_data: ContextData, state: FSMContext) -> list[ContextData]:
- data = await state.get_data()
- stack = [ContextData.unpack(item) for item in data.get("navigation_stack", [])]
- data_nc = data.get("navigation_context")
+def save_navigation_context(callback_data: ContextData, state_data: dict) -> list[ContextData]:
+ stack = [ContextData.unpack(item) for item in state_data.get("navigation_stack", [])]
+ data_nc = state_data.get("navigation_context")
navigation_context = ContextData.unpack(data_nc) if data_nc else None
if callback_data.back:
callback_data.back = False
if stack:
stack.pop()
else:
- if stack and navigation_context and navigation_context.command == callback_data.command:
+ if (stack and navigation_context and
+ navigation_context.command == callback_data.command and
+ navigation_context.command != CallbackCommand.USER_COMMAND):
navigation_context = callback_data
elif navigation_context:
stack.append(navigation_context)
- await state.update_data({"navigation_stack": [item.pack() for item in stack],
- "navigation_context": callback_data.pack()})
+ state_data["navigation_stack"] = [item.pack() for item in stack]
+ state_data["navigation_context"] = callback_data.pack()
+
return stack
@@ -31,36 +33,33 @@ def pop_navigation_context(stack: list[ContextData]) -> ContextData | None:
return data
-async def get_navigation_context(state: FSMContext) -> tuple[list[ContextData], ContextData | None]:
- data = await state.get_data()
- data_nc = data.get("navigation_context")
+def get_navigation_context(state_data: dict) -> tuple[list[ContextData], ContextData | None]:
+ data_nc = state_data.get("navigation_context")
context = ContextData.unpack(data_nc) if data_nc else None
- return ([ContextData.unpack(item) for item in data.get("navigation_stack", [])],
+ return ([ContextData.unpack(item) for item in state_data.get("navigation_stack", [])],
context)
-async def clear_state(state: FSMContext, clear_nav: bool = False):
+def clear_state(state_data: dict, clear_nav: bool = False):
if clear_nav:
- await state.clear()
+ state_data.clear()
else:
- state_data = await state.get_data()
stack = state_data.get("navigation_stack")
context = state_data.get("navigation_context")
- update_data = {}
+ state_data.clear()
if stack:
- update_data["navigation_stack"] = stack
+ state_data["navigation_stack"] = stack
if context:
- update_data["navigation_context"] = context
- await state.clear()
- await state.update_data(update_data)
+ state_data["navigation_context"] = context
async def route_callback(message: Message | CallbackQuery, back: bool = True, **kwargs):
- stack, context = await get_navigation_context(kwargs["state"])
+ state_data = kwargs["state_data"]
+ stack, context = get_navigation_context(state_data)
if back:
context = pop_navigation_context(stack)
- stack = await save_navigation_context(callback_data = context, state = kwargs["state"])
+ stack = save_navigation_context(callback_data = context, state_data = state_data)
kwargs.update({"callback_data": context, "navigation_stack": stack})
if context:
if context.command == CallbackCommand.MENU_ENTRY_MAIN:
@@ -77,6 +76,8 @@ async def route_callback(message: Message | CallbackQuery, back: bool = True, **
await entity_list(message, **kwargs)
elif context.command == CallbackCommand.ENTITY_ITEM:
await entity_item(message, **kwargs)
+ elif context.command == CallbackCommand.FIELD_EDITOR:
+ await field_editor(message, **kwargs)
else:
raise ValueError(f"Unknown command {context.command}")
@@ -90,4 +91,5 @@ from .menu.parameters import parameters_menu
from .menu.language import language_menu
from .menu.entities import entities_menu
from .forms.entity_list import entity_list
-from .forms.entity_form import entity_item
\ No newline at end of file
+from .forms.entity_form import entity_item
+from .editors import field_editor
\ No newline at end of file
diff --git a/bot/handlers/start.py b/bot/handlers/start.py
index 7306d92..8d8464c 100644
--- a/bot/handlers/start.py
+++ b/bot/handlers/start.py
@@ -18,7 +18,8 @@ router = Router()
@router.message(CommandStart())
async def start(message: Message, db_session: AsyncSession, app: QBotApp, state: FSMContext):
- await clear_state(state = state, clear_nav = True)
+ state_data = await state.get_data()
+ clear_state(state_data = state_data, clear_nav = True)
User = app.user_class
diff --git a/bot/handlers/user_handlers/__init__.py b/bot/handlers/user_handlers/__init__.py
new file mode 100644
index 0000000..38afa98
--- /dev/null
+++ b/bot/handlers/user_handlers/__init__.py
@@ -0,0 +1,125 @@
+from dataclasses import dataclass, field
+from typing import Any, Callable, TYPE_CHECKING
+from aiogram import Router, F
+from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
+from aiogram.fsm.context import FSMContext
+from aiogram.utils.i18n import I18n
+from aiogram.utils.keyboard import InlineKeyboardBuilder
+from sqlmodel.ext.asyncio.session import AsyncSession
+from ..context import ContextData, CallbackCommand
+from ....model.user import UserBase
+from ....model.settings import Settings
+
+
+if TYPE_CHECKING:
+ from ....main import QBotApp
+
+
+router = Router()
+
+
+@dataclass(kw_only = True)
+class CommandCallbackContext[UT: UserBase]:
+
+ keyboard_builder: InlineKeyboardBuilder = field(default_factory = InlineKeyboardBuilder)
+ message_text: str | None = None
+ register_navigation: bool = True
+ message: Message | CallbackQuery
+ callback_data: ContextData
+ db_session: AsyncSession
+ user: UT
+ app: "QBotApp"
+ state_data: dict[str, Any]
+ state: FSMContext
+ i18n: I18n
+ kwargs: dict[str, Any] = field(default_factory = dict)
+
+
+@dataclass(kw_only = True)
+class Command:
+
+ name: str
+ handler: Callable[[CommandCallbackContext], None]
+ caption: str | dict[str, str] | None = None
+ register_navigation: bool = True
+ clear_navigation: bool = False
+ clear_state: bool = True
+
+@router.message(F.text.startswith("/"))
+async def command_text(message: Message, **kwargs):
+
+ str_command = message.text.lstrip("/")
+ callback_data = ContextData(command = CallbackCommand.USER_COMMAND,
+ user_command = str_command)
+
+ await command_handler(message = message, callback_data = callback_data, **kwargs)
+
+
+@router.callback_query(ContextData.filter(F.command == CallbackCommand.USER_COMMAND))
+async def command_callback(message: CallbackQuery, **kwargs):
+
+ await command_handler(message = message, **kwargs)
+
+
+async def command_handler(message: Message | CallbackQuery, **kwargs):
+
+ callback_data: ContextData = kwargs.pop("callback_data")
+ str_command = callback_data.user_command
+ app: "QBotApp" = kwargs.pop("app")
+ command = app.bot_commands.get(str_command)
+
+ if not command:
+ return
+
+ state: FSMContext = kwargs.pop("state")
+ state_data = await state.get_data()
+
+ if command.register_navigation:
+ clear_state(state_data = state_data)
+
+ if command.clear_navigation:
+ state_data.pop("navigation_stack", None)
+ state_data.pop("navigation_context", None)
+
+ if command.register_navigation:
+ stack = save_navigation_context(callback_data = callback_data, state_data = state_data)
+
+ callback_context = CommandCallbackContext[app.user_class](
+ message = message,
+ callback_data = callback_data,
+ db_session = kwargs.pop("db_session"),
+ user = kwargs.pop("user"),
+ app = app,
+ state_data = state_data,
+ state = state,
+ i18n = kwargs.pop("i18n"),
+ kwargs = kwargs)
+
+ await command.handler(callback_context)
+
+ await state.set_data(state_data)
+
+ if command.register_navigation:
+
+ stack, navigation_context = get_navigation_context(state_data = state_data)
+ back_callback_data = pop_navigation_context(stack = stack)
+ if back_callback_data:
+
+ callback_context.keyboard_builder.row(
+ InlineKeyboardButton(
+ text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
+ callback_data = back_callback_data.pack()))
+
+ send_message = get_send_message(message)
+
+ if isinstance(message, CallbackCommand):
+ message = message.message
+
+ if callback_context.message_text:
+ await send_message(text = callback_context.message_text,
+ reply_markup = callback_context.keyboard_builder.as_markup())
+ else:
+ await message.edit_reply_markup(reply_markup = callback_context.keyboard_builder.as_markup())
+
+from ..common import get_send_message
+from ..navigation import save_navigation_context, get_navigation_context, clear_state, pop_navigation_context
\ No newline at end of file
diff --git a/fsm/db_storage.py b/fsm/db_storage.py
index 7481986..b614949 100644
--- a/fsm/db_storage.py
+++ b/fsm/db_storage.py
@@ -60,9 +60,9 @@ class DbStorage(BaseStorage):
if not data:
await session.delete(db_data)
else:
- db_data.value = json.dumps(data)
+ db_data.value = json.dumps(data, ensure_ascii = False)
elif data:
- db_data = FSMStorage(key = db_key, value = json.dumps(data,))
+ db_data = FSMStorage(key = db_key, value = json.dumps(data, ensure_ascii = False))
session.add(db_data)
else:
return
diff --git a/lifespan.py b/lifespan.py
index ca72ec7..46950fb 100644
--- a/lifespan.py
+++ b/lifespan.py
@@ -1,10 +1,14 @@
+from aiogram.types import BotCommand
from contextlib import asynccontextmanager
from .main import QBotApp
+from logging import getLogger
+
+logger = getLogger(__name__)
@asynccontextmanager
async def default_lifespan(app: QBotApp):
- app.logger.debug("starting qbot app")
+ logger.debug("starting qbot app")
if app.config.USE_NGROK:
try:
@@ -12,18 +16,35 @@ async def default_lifespan(app: QBotApp):
from pyngrok.conf import PyngrokConfig
except ImportError:
- app.logger.error("pyngrok is not installed")
+ logger.error("pyngrok is not installed")
raise
tunnel = ngrok.connect(app.config.API_PORT, pyngrok_config = PyngrokConfig(auth_token = app.config.NGROK_AUTH_TOKEN))
app.config.NGROK_URL = tunnel.public_url
+
+ commands_captions = dict[str, list[tuple[str, str]]]()
+
+ for command_name, command in app.bot_commands.items():
+ if isinstance(command.caption, str):
+ if "default" not in commands_captions:
+ commands_captions["default"] = []
+ commands_captions["default"].append((command_name, command.caption))
+ for locale, description in command.caption.items():
+ if locale not in commands_captions:
+ commands_captions[locale] = []
+ commands_captions[locale].append((command_name, description))
+
+ for locale, commands in commands_captions.items():
+ await app.bot.set_my_commands([BotCommand(command = command[0], description=command[1]) for command in commands],
+ language_code = None if locale == "default" else locale)
+
await app.bot.set_webhook(url = f"{app.config.API_URL}/api/telegram/webhook",
drop_pending_updates = True,
allowed_updates = ['message', 'callback_query', 'pre_checkout_query'],
secret_token = app.bot_auth_token)
- app.logger.info("qbot app started")
+ logger.info("qbot app started")
if app.lifespan:
async with app.lifespan(app):
@@ -31,10 +52,10 @@ async def default_lifespan(app: QBotApp):
else:
yield
- app.logger.info("stopping qbot app")
+ logger.info("stopping qbot app")
await app.bot.delete_webhook()
if app.config.USE_NGROK:
ngrok.disconnect(app.config.NGROK_URL)
ngrok.kill()
- app.logger.info("qbot app stopped")
\ No newline at end of file
+ logger.info("qbot app stopped")
\ No newline at end of file
diff --git a/main.py b/main.py
index dd34f67..18e05c7 100644
--- a/main.py
+++ b/main.py
@@ -1,5 +1,11 @@
+from functools import wraps
+from typing import Annotated, Callable, Any, Union, override
+from typing_extensions import Doc
from aiogram import Bot, Dispatcher
+from aiogram.filters import CommandStart
from aiogram.client.default import DefaultBotProperties
+from aiogram.types import Message, BotCommand
+from aiogram.utils.keyboard import InlineKeyboardBuilder
from aiogram.utils.callback_answer import CallbackAnswerMiddleware
from aiogram.utils.i18n import I18n
from fastapi import FastAPI
@@ -12,19 +18,25 @@ from .fsm.db_storage import DbStorage
from .middleware.telegram import AuthMiddleware, I18nMiddleware, ResetStateMiddleware
from .model.user import UserBase
from .model.entity_metadata import EntityMetadata
+from .bot.handlers.user_handlers import Command, CommandCallbackContext
class QBotApp(FastAPI):
-
- bot: Bot
- dp: Dispatcher
- config: Config
-
- logger = getLogger(__name__)
+ """
+ Main class for the QBot application
+ """
def __init__[UserType: UserBase](self,
- user_class: type[UserType] | None = None,
+ user_class: Annotated[type[UserType], Doc(
+ "User class that will be used in the application"
+ )] | None = None,
config: Config | None = None,
+ bot_start: Annotated[Callable[[Annotated[Callable[[Message, Any], None], Doc(
+ "Default handler for the start command"
+ )], Message, Any], None], Doc(
+ "Handler for the start command"
+ )] | None = None,
+ bot_commands: list[Command] | None = None,
lifespan: Lifespan[AppType] | None = None,
*args,
**kwargs):
@@ -63,6 +75,9 @@ class QBotApp(FastAPI):
self.bot_auth_token = token_hex(128)
+ self.start_handler = bot_start
+ self.bot_commands = {c.name: c for c in bot_commands or []}
+
from .lifespan import default_lifespan
super().__init__(lifespan = default_lifespan, *args, **kwargs)
@@ -70,3 +85,26 @@ class QBotApp(FastAPI):
from .api_route.telegram import router as telegram_router
self.include_router(telegram_router, prefix = "/api/telegram", tags = ["telegram"])
+ @override
+ def bot_command(self, command: Command): ...
+
+ @override
+ def bot_command(self, command: str, caption: str | dict[str, str] | None = None): ...
+
+ def bot_command(self, command: str | Command, caption: str | dict[str, str] | None = None):
+ """
+ Decorator for registering bot commands
+ """
+
+ def decorator(func: Callable[[CommandCallbackContext], None]):
+
+ if isinstance(command, str):
+ command = Command(name = command, handler = func, caption = caption)
+ self.bot_commands[command.name] = command
+
+ wraps(func)
+ def wrapper(*args, **kwargs):
+ return func(*args, **kwargs)
+ return wrapper
+
+ return decorator
\ No newline at end of file
diff --git a/model/bot_entity.py b/model/bot_entity.py
index 33fb6b6..16ccc4d 100644
--- a/model/bot_entity.py
+++ b/model/bot_entity.py
@@ -1,7 +1,8 @@
from functools import wraps
-from typing import ClassVar, cast, get_args, get_origin
+from types import NoneType, UnionType
+from typing import ClassVar, ForwardRef, Optional, Union, cast, get_args, get_origin
from pydantic import BaseModel
-from sqlmodel import SQLModel, BIGINT, Field, select, func
+from sqlmodel import SQLModel, BIGINT, Field, select, func, column
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel.main import SQLModelMetaclass, RelationshipInfo
@@ -53,24 +54,36 @@ class BotEntityMetaclass(SQLModelMetaclass):
type_ = namespace['__annotations__'][annotation]
+ type_origin = get_origin(type_)
+
field_descriptor = EntityFieldDescriptor(
name = descriptor_name,
field_name = annotation,
type_ = type_,
+ type_base = type_,
**descriptor_kwargs)
- type_origin = get_origin(type_)
-
is_list = False
if type_origin == list:
- is_list = True
- type_ = get_args(type_)[0]
+ field_descriptor.is_list = is_list = True
+ field_descriptor.type_base = type_ = get_args(type_)[0]
+
+ if type_origin == Union and isinstance(get_args(type_)[0], ForwardRef):
+ field_descriptor.is_optional = True
+ field_descriptor.type_base = type_ = get_args(type_)[0].__forward_arg__
+
+ if type_origin == UnionType and get_args(type_)[1] == NoneType:
+ field_descriptor.is_optional = True
+ field_descriptor.type_base = type_ = get_args(type_)[0]
if isinstance(type_, str):
type_not_found = True
for entity_descriptor in EntityMetadata().entity_descriptors.values():
if type_ == entity_descriptor.class_name:
- field_descriptor.type_ = list[entity_descriptor.type_] if is_list else entity_descriptor.type_
+ field_descriptor.type_ = (list[entity_descriptor.type_] if is_list
+ else Optional[entity_descriptor.type_] if type_origin == Optional
+ else entity_descriptor.type_ | None if (type_origin == UnionType and get_args(type_)[1] == NoneType)
+ else entity_descriptor.type_)
type_not_found = False
break
if type_not_found:
@@ -131,8 +144,12 @@ class BotEntityMetaclass(SQLModelMetaclass):
if name in mcs.__future_references__:
for field_descriptor in mcs.__future_references__[name]:
- field_descriptor.type_ = list[type_] if get_origin(field_descriptor.type_) == list else type_
- a = field_descriptor
+ type_origin = get_origin(field_descriptor.type_)
+ field_descriptor.type_base = type_
+ field_descriptor.type_ = (list[type_] if get_origin(field_descriptor.type_) == list else
+ Optional[type_] if type_origin == Union and isinstance(get_args(field_descriptor.type_)[0], ForwardRef) else
+ type_ | None if type_origin == UnionType else
+ type_)
setattr(namespace["bot_entity_descriptor"], "type_", type_)
@@ -160,15 +177,19 @@ class BotEntity[CreateSchemaType: BaseModel,
session: AsyncSession | None = None,
id: int):
- return await session.get(cls, id)
+ return await session.get(cls, id, populate_existing = True)
@classmethod
@session_dep
async def get_count(cls, *,
- session: AsyncSession | None = None) -> int:
+ session: AsyncSession | None = None,
+ filter: str = None) -> int:
- return await session.scalar(select(func.count()).select_from(cls))
+ select_statement = select(func.count()).select_from(cls)
+ if filter:
+ select_statement = select_statement.where(column("name").ilike(f"%{filter}%"))
+ return await session.scalar(select_statement)
@classmethod
@@ -176,12 +197,15 @@ class BotEntity[CreateSchemaType: BaseModel,
async def get_multi(cls, *,
session: AsyncSession | None = None,
order_by = None,
+ filter:str = None,
skip: int = 0,
limit: int = None):
select_statement = select(cls).offset(skip)
if limit:
select_statement = select_statement.limit(limit)
+ if filter:
+ select_statement = select_statement.where(column("name").ilike(f"%{filter}%"))
if order_by:
select_statement = select_statement.order_by(order_by)
return (await session.exec(select_statement)).all()
@@ -238,4 +262,5 @@ class BotEntity[CreateSchemaType: BaseModel,
if commit:
await session.commit()
return obj
- return None
\ No newline at end of file
+ return None
+
diff --git a/model/descriptors.py b/model/descriptors.py
index be7cb77..c9bc222 100644
--- a/model/descriptors.py
+++ b/model/descriptors.py
@@ -1,10 +1,13 @@
-from typing import Any, Callable
+from typing import Any, Callable, TYPE_CHECKING
from babel.support import LazyProxy
from dataclasses import dataclass, field
from .role import RoleBase
from . import EntityPermission
+if TYPE_CHECKING:
+ from .bot_entity import BotEntity
+
EntityCaptionCallable = Callable[["EntityDescriptor"], str]
EntityItemCaptionCallable = Callable[["EntityDescriptor", Any], str]
EntityFieldCaptionCallable = Callable[["EntityFieldDescriptor", Any, Any], str]
@@ -13,18 +16,14 @@ EntityFieldCaptionCallable = Callable[["EntityFieldDescriptor", Any, Any], str]
@dataclass(kw_only = True)
class _BaseEntityFieldDescriptor():
icon: str = None
- caption_str: str | LazyProxy | EntityFieldCaptionCallable | None = None
- caption_btn: str | LazyProxy | EntityFieldCaptionCallable | None = None
+ caption: str | LazyProxy | EntityFieldCaptionCallable | None = None
description: str | LazyProxy | EntityFieldCaptionCallable | None = None
edit_prompt: str | LazyProxy | EntityFieldCaptionCallable | None = None
- caption_value_str: str | LazyProxy | EntityFieldCaptionCallable | None = None
- caption_value_btn: str | LazyProxy | EntityFieldCaptionCallable | None = None
+ caption_value: EntityFieldCaptionCallable | None = None
is_visible: bool = True
localizable: bool = False
bool_false_value: str | LazyProxy = "no"
- bool_false_value_btn: str | LazyProxy = "no"
bool_true_value: str | LazyProxy = "yes"
- bool_true_value_btn: str | LazyProxy = "yes"
default: Any = None
@@ -44,6 +43,9 @@ class EntityFieldDescriptor(_BaseEntityFieldDescriptor):
name: str
field_name: str
type_: type
+ type_base: type = None
+ is_list: bool = False
+ is_optional: bool = False
entity_descriptor: "EntityDescriptor" = None
def __hash__(self):
@@ -54,14 +56,14 @@ class EntityFieldDescriptor(_BaseEntityFieldDescriptor):
class _BaseEntityDescriptor:
icon: str = "📘"
- caption_msg: str | LazyProxy | EntityCaptionCallable | None = None
- caption_btn: str | LazyProxy | EntityCaptionCallable | None = None
+ caption: str | LazyProxy | EntityCaptionCallable | None = None
+ caption_plural: str | LazyProxy | EntityCaptionCallable | None = None
description: str | LazyProxy | EntityCaptionCallable | None = None
- item_caption_msg: EntityItemCaptionCallable | None = None
- item_caption_btn: EntityItemCaptionCallable | None = None
+ item_caption: EntityItemCaptionCallable | None = None
show_in_entities_menu: bool = True
field_sequence: list[str] = None
- edit_buttons: list[list[str]] = None
+ edit_button_visible: bool = True
+ edit_buttons: list[list[str | tuple[str, str | LazyProxy | EntityFieldCaptionCallable]]] = None
permissions: dict[EntityPermission, list[RoleBase]] = field(default_factory = lambda: {
EntityPermission.LIST: [RoleBase.DEFAULT_USER, RoleBase.SUPER_USER],
EntityPermission.READ: [RoleBase.DEFAULT_USER, RoleBase.SUPER_USER],
@@ -87,5 +89,5 @@ class EntityDescriptor(_BaseEntityDescriptor):
name: str
class_name: str
- type_: type
- fields_descriptors: dict[str, EntityFieldDescriptor]
\ No newline at end of file
+ type_: type["BotEntity"]
+ fields_descriptors: dict[str, EntityFieldDescriptor]
diff --git a/model/menu.py b/model/menu.py
new file mode 100644
index 0000000..e37b46b
--- /dev/null
+++ b/model/menu.py
@@ -0,0 +1,26 @@
+# from aiogram.types import Message, CallbackQuery
+# from aiogram.utils.keyboard import InlineKeyboardBuilder
+# from typing import Any, Callable, Self, Union, overload
+# from babel.support import LazyProxy
+# from dataclasses import dataclass
+
+# from ..bot.handlers.context import ContextData
+
+
+# class Menu:
+
+# @overload
+# def __init__(self, description: str | LazyProxy): ...
+
+
+# @overload
+# def __init__(self, menu_factory: Callable[[InlineKeyboardBuilder, Union[Message, CallbackQuery], Any], str]): ...
+
+
+# def __init__(self, description: str | LazyProxy = None,
+# menu_factory: Callable[[InlineKeyboardBuilder, Union[Message, CallbackQuery], Any], str] = None) -> None:
+
+# self.menu_factory = menu_factory
+# self.description = description
+# self.parent: Menu = None
+# self.items: list[list[Menu]] = []
\ No newline at end of file
diff --git a/model/owned_bot_entity.py b/model/owned_bot_entity.py
index 1156617..dfbc546 100644
--- a/model/owned_bot_entity.py
+++ b/model/owned_bot_entity.py
@@ -1,4 +1,4 @@
-from sqlmodel import BIGINT, Field, select, func
+from sqlmodel import BIGINT, Field, select, func, column
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -20,11 +20,14 @@ class OwnedBotEntity(BotEntity, table = False):
async def get_multi_by_user(cls, *,
session: AsyncSession | None = None,
user_id: int,
+ filter: str = None,
order_by = None,
skip: int = 0,
limit: int = None):
select_statement = select(cls).where(cls.user_id == user_id).offset(skip)
+ if filter:
+ select_statement = select_statement.where(column("name").ilike(f"%{filter}%"))
if limit:
select_statement = select_statement.limit(limit)
if order_by:
@@ -36,9 +39,11 @@ class OwnedBotEntity(BotEntity, table = False):
@session_dep
async def get_count_by_user(cls, *,
session: AsyncSession | None = None,
- user_id: int):
+ user_id: int,
+ filter: str = None) -> int:
- return await session.scalar(
- select(func.count()).
- select_from(cls).
- where(cls.user_id == user_id))
\ No newline at end of file
+ select_statement = select(func.count()).select_from(cls).where(cls.user_id == user_id)
+ if filter:
+ select_statement = select_statement.where(column("name").ilike(f"%{filter}%"))
+
+ return await session.scalar(select_statement)
\ No newline at end of file
diff --git a/model/settings.py b/model/settings.py
index 6f42477..215a5d8 100644
--- a/model/settings.py
+++ b/model/settings.py
@@ -1,12 +1,16 @@
+from types import NoneType, UnionType
+from aiogram.utils.i18n.context import get_i18n
from datetime import datetime
from sqlmodel import SQLModel, Field, select
-from typing import Any, get_origin
+from typing import Any, get_args, get_origin
from ..db import async_session
from .role import RoleBase
from .descriptors import EntityFieldDescriptor, Setting
from ..utils import deserialize, serialize
+import ujson as json
+
class DbSettings(SQLModel, table = True):
__tablename__ = "settings"
@@ -31,23 +35,40 @@ class SettingsMetaclass(type):
attr_value = attributes.get(annotation)
name = annotation
+ type_ = attributes['__annotations__'][annotation]
+
if isinstance(attr_value, Setting):
descriptor_kwargs = attr_value.__dict__.copy()
name = descriptor_kwargs.pop("name") or annotation
attributes[annotation] = EntityFieldDescriptor(
name = name,
field_name = annotation,
- type_ = attributes['__annotations__'][annotation],
+ type_ = type_,
+ type_base = type_,
**descriptor_kwargs)
else:
attributes[annotation] = EntityFieldDescriptor(
name = annotation,
field_name = annotation,
- type_ = attributes['__annotations__'][annotation],
+ type_ = type_,
+ type_base = type_,
default = attr_value)
+ type_origin = get_origin(type_)
+
+ if type_origin == list:
+ attributes[annotation].is_list = True
+ attributes[annotation].type_base = type_ = get_args(type_)[0]
+
+ elif type_origin == UnionType and get_args(type_)[1] == NoneType:
+ attributes[annotation].is_optional = True
+ attributes[annotation].type_base = type_ = get_args(type_)[0]
+
settings_descriptors[name] = attributes[annotation]
+
+ if base_classes and base_classes[0].__name__ == "Settings" and hasattr(base_classes[0], annotation):
+ setattr(base_classes[0], annotation, attributes[annotation])
attributes["__annotations__"] = {}
attributes["_settings_descriptors"] = settings_descriptors
@@ -61,8 +82,7 @@ class Settings(metaclass = SettingsMetaclass):
_settings_descriptors: dict[str, EntityFieldDescriptor] = {}
PAGE_SIZE: int = Setting(default = 10, )
-
- SECURITY_SETTINGS_ROLES: list[RoleBase] = [RoleBase.SUPER_USER]
+ SECURITY_PARAMETERS_ROLES: list[RoleBase] = Setting(name = "SECPARAMS_ROLES", default = [RoleBase.SUPER_USER], is_visible = False)
APP_STRINGS_WELCOME_P_NAME: str = Setting(name = "AS_WELCOME", default = "Welcome, {name}", is_visible = False)
APP_STRINGS_GREETING_P_NAME: str = Setting(name = "AS_GREETING", default = "Hello, {name}", is_visible = False)
@@ -90,6 +110,7 @@ class Settings(metaclass = SettingsMetaclass):
APP_STRINGS_YES_BTN: str = Setting(name = "AS_YES_BTN", default = "✅ Yes", is_visible = False)
APP_STRINGS_NO_BTN: str = Setting(name = "AS_NO_BTN", default = "❌ No", is_visible = False)
APP_STRINGS_CANCEL_BTN: str = Setting(name = "AS_CANCEL_BTN", default = "❌ Cancel", is_visible = False)
+ APP_STRINGS_CLEAR_BTN: str = Setting(name = "AS_CLEAR_BTN", default = "⌫ Clear", is_visible = False)
APP_STRINGS_DONE_BTN: str = Setting(name = "AS_DONE_BTN", default = "✅ Done", is_visible = False)
APP_STRINGS_SKIP_BTN: str = Setting(name = "AS_SKIP_BTN", default = "⏩️ Skip", is_visible = False)
APP_STRINGS_FIELD_EDIT_PROMPT_TEMPLATE_P_NAME_VALUE: str = Setting(
@@ -104,18 +125,30 @@ class Settings(metaclass = SettingsMetaclass):
name = "AS_STREDIT_LOC_TEMPLATE",
default = "string for \"{name}\"",
is_visible = False)
+ APP_STRINGS_VIEW_FILTER_EDIT_PROMPT: str = Setting(name = "AS_FILTEREDIT_PROMPT", default = "Enter filter value", is_visible = False)
APP_STRINGS_INVALID_INPUT: str = Setting(name = "AS_INVALID_INPUT", default = "Invalid input", is_visible = False)
@classmethod
- async def get[T](cls, param: T) -> T:
+ async def get[T](cls, param: T, all_locales = False, locale: str = None) -> T:
name = param.field_name
- if param.name not in cls._cache.keys():
+ if name not in cls._cache.keys():
cls._cache[name] = await cls.load_param(param)
- return cls._cache[name]
+ ret_val = cls._cache[name]
+
+ if param.localizable and not all_locales:
+ if not locale:
+ locale = get_i18n().current_locale
+ try:
+ obj = json.loads(ret_val)
+ except:
+ return ret_val
+ return obj.get(locale, obj[list(obj.keys())[0]])
+
+ return ret_val
@classmethod
@@ -180,4 +213,4 @@ class Settings(metaclass = SettingsMetaclass):
async def get_params(cls) -> dict[EntityFieldDescriptor, Any]:
params = cls.list_params()
- return {param: await cls.get(param) for _, param in params.items()}
\ No newline at end of file
+ return {param: await cls.get(param, all_locales = True) for _, param in params.items()}
\ No newline at end of file
diff --git a/model/user.py b/model/user.py
index 03edf03..889e802 100644
--- a/model/user.py
+++ b/model/user.py
@@ -7,6 +7,7 @@ from .role import RoleBase
from .settings import DbSettings as DbSettings
from .fsm_storage import FSMStorage as FSMStorage
+from .view_setting import ViewSetting as ViewSetting
class UserBase(BotEntity, table = False):
diff --git a/model/view_setting.py b/model/view_setting.py
new file mode 100644
index 0000000..8ecaec7
--- /dev/null
+++ b/model/view_setting.py
@@ -0,0 +1,39 @@
+from sqlmodel import SQLModel, Field, BIGINT
+from sqlalchemy.ext.asyncio.session import AsyncSession
+
+from . import session_dep
+
+
+class ViewSetting(SQLModel, table = True):
+
+ __tablename__ = "view_setting"
+ user_id: int = Field(sa_type = BIGINT, primary_key = True, foreign_key="user.id", ondelete="CASCADE")
+ entity_name: str = Field(primary_key = True)
+ filter: str | None = None
+
+
+ @classmethod
+ @session_dep
+ async def get_filter(cls, *,
+ session: AsyncSession | None = None,
+ user_id: int,
+ entity_name: str):
+
+ setting = await session.get(cls, (user_id, entity_name))
+ return setting.filter if setting else None
+
+ @classmethod
+ @session_dep
+ async def set_filter(cls, *,
+ session: AsyncSession | None = None,
+ user_id: int,
+ entity_name: str,
+ filter: str):
+
+ setting = await session.get(cls, (user_id, entity_name))
+ if setting:
+ setting.filter = filter
+ else:
+ setting = cls(user_id = user_id, entity_name = entity_name, filter = filter)
+ session.add(setting)
+ await session.commit()
\ No newline at end of file
diff --git a/utils/__init__.py b/utils/__init__.py
index fbe79eb..e23e0b5 100644
--- a/utils/__init__.py
+++ b/utils/__init__.py
@@ -3,7 +3,7 @@ from decimal import Decimal
from types import NoneType, UnionType
from sqlmodel import select, column
from sqlmodel.ext.asyncio.session import AsyncSession
-from typing import Any, get_origin, get_args, TYPE_CHECKING
+from typing import Any, Union, get_origin, get_args, TYPE_CHECKING
import ujson as json
from ..model.bot_entity import BotEntity
@@ -18,7 +18,7 @@ if TYPE_CHECKING:
async def deserialize[T](session: AsyncSession, type_: type[T], value: str = None) -> T:
type_origin = get_origin(type_)
is_optional = False
- if type_origin == UnionType:
+ if type_origin in [UnionType, Union]:
args = get_args(type_)
if args[1] == NoneType:
type_ = args[0]
@@ -45,6 +45,8 @@ async def deserialize[T](session: AsyncSession, type_: type[T], value: str = Non
else:
return values
elif issubclass(type_, BotEntity):
+ if is_optional and not value:
+ return None
return await session.get(type_, int(value))
elif issubclass(type_, BotEnum):
if is_optional and not value:
@@ -70,23 +72,15 @@ def serialize(value: Any, field_descriptor: EntityFieldDescriptor) -> str:
if value is None:
return ""
- type_ = field_descriptor.type_
- type_origin = get_origin(type_)
- if type_origin == UnionType:
- args = get_args(type_)
- if args[1] == NoneType:
- type_ = get_args(type_)[0]
- if type_origin == list:
- arg_type = None
- args = get_args(type_)
- if args:
- arg_type = args[0]
- if arg_type and issubclass(arg_type, BotEntity):
- return json.dumps([item.id for item in value])
- elif arg_type and issubclass(arg_type, BotEnum):
- return json.dumps([item.value for item in value])
+ type_ = field_descriptor.type_base
+
+ if field_descriptor.is_list:
+ if issubclass(type_, BotEntity):
+ return json.dumps([item.id for item in value], ensure_ascii = False)
+ elif issubclass(type_, BotEnum):
+ return json.dumps([item.value for item in value], ensure_ascii = False)
else:
- return json.dumps(value)
+ return json.dumps(value, ensure_ascii = False)
elif issubclass(type_, BotEntity):
return str(value.id) if value else ""
return str(value)
@@ -100,4 +94,13 @@ def get_user_permissions(user: "UserBase", entity_descriptor: EntityDescriptor)
if role in user.roles:
permissions.append(permission)
break
- return permissions
\ No newline at end of file
+ return permissions
+
+
+def get_local_text(text: str, locale: str) -> str:
+ try:
+ obj = json.loads(text) #@IgnoreException
+ except:
+ return text
+ else:
+ return obj.get(locale, obj[list(obj.keys())[0]])
\ No newline at end of file