This commit is contained in:
Alexander Kalinovsky
2025-01-04 12:00:12 +01:00
commit 6dbe0536ca
94 changed files with 3467 additions and 0 deletions

1
__init__.py Normal file
View File

@@ -0,0 +1 @@
from .main import QBotApp as QBotApp, Config as Config

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

0
api_route/model.py Normal file
View File

37
api_route/telegram.py Normal file
View File

@@ -0,0 +1,37 @@
from typing import Annotated
from fastapi import APIRouter, Depends, Request, Response
from sqlmodel.ext.asyncio.session import AsyncSession
from ..main import QBotApp
from ..db import get_db
from aiogram.types import Update
from logging import getLogger
logger = getLogger(__name__)
logger.setLevel("DEBUG")
router = APIRouter()
@router.post("/webhook")
async def telegram_webhook(db_session: Annotated[AsyncSession, Depends(get_db)],
request: Request):
logger.debug("Webhook request %s", await request.json())
app = request.app #type: QBotApp
request_token = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
if request_token != app.bot_auth_token:
logger.warning("Unauthorized request %s", request)
return Response(status_code = 403)
try:
update = Update(**await request.json())
except:
logger.error("Invalid request", exc_info = True)
return Response(status_code = 400)
try:
await app.dp.feed_webhook_update(app.bot, update, db_session = db_session, app = app)
except:
logger.error("Error processing update", exc_info = True)
return Response(status_code = 200)

Binary file not shown.

View File

@@ -0,0 +1,29 @@
from aiogram.filters import Filter
from aiogram.fsm.context import FSMContext
from .handlers.context import CallbackCommand, ContextData
from logging import getLogger
logger = getLogger(__name__)
class CallbackCommandFilter(Filter):
def __init__(self, command: CallbackCommand):
self.command = command
async def __call__(self, *args, **kwargs):
state: FSMContext = kwargs.get("state")
state_data = await state.get_data()
context_data = state_data.get("context_data")
if context_data:
try:
context_data = ContextData.unpack(context_data)
except Exception as e:
logger.error(f"Error unpacking context data", exc_info = True)
return False
else:
return context_data.command == self.command
return False

0
bot/handlers/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,195 @@
from types import NoneType, UnionType
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from babel.support import LazyProxy
from sqlmodel.ext.asyncio.session import AsyncSession
from typing import Any, get_args, get_origin
import ujson as json
from ..context import ContextData, CallbackCommand, CommandContext
from ....model.user import UserBase
from ....model.settings import Settings
from ....main import QBotApp
from ....model.bot_entity import BotEntity
from ....model.bot_enum import BotEnum
from ....model.descriptors import (EntityFieldDescriptor,
EntityDescriptor,
EntityCaptionCallable,
EntityItemCaptionCallable,
EntityFieldCaptionCallable)
def get_send_message(message: Message | CallbackQuery):
if isinstance(message, Message):
return message.answer
else:
return message.message.edit_text
def get_local_text(text: str, lang: str):
try:
text_obj = json.loads(text) #@IgnoreException
return text_obj.get(lang, text_obj[list(text_obj.keys())[0]])
except:
return text
def get_value_repr(value: Any, field_descriptor: EntityFieldDescriptor, locale: str | None = None) -> str:
type_ = field_descriptor.type_
origin = get_origin(type_)
if value is None:
return ""
if origin == UnionType:
args = get_args(type_)
if args[1] == NoneType:
type_ = args[0]
if isinstance(value, bool):
return "【✔︎】" if value else "【 】"
elif origin == list:
arg_type = None
args = get_args(type_)
if args:
arg_type = args[0]
if arg_type and issubclass(arg_type, BotEntity):
if locale and arg_type.bot_entity_descriptor.fields_descriptors["name"].localizable:
return "[" + ", ".join([get_local_text(value = item.name, locale = locale) for item in value]) + "]"
else:
return "[" + ", ".join([str(item.name) for item in value]) + "]"
elif arg_type and issubclass(arg_type, BotEnum):
return "[" + ", ".join(item.localized(locale) for item in value) + "]"
elif arg_type == str:
return "[" + ", ".join([f"\"{item}\"" for item in value]) + "]"
else:
return "[" + ", ".join([str(item) for item in value]) + "]"
elif issubclass(type_, BotEntity):
if type_.bot_entity_descriptor.fields_descriptors["name"].localizable:
return get_local_text(value = value.name, locale = locale)
return value.name
elif issubclass(type_, BotEnum):
return value.localized(locale)
elif isinstance(value, str):
if field_descriptor and field_descriptor.localizable:
return get_local_text(value, locale)
return value
elif isinstance(value, int):
return str(value)
elif isinstance(value, float):
return str(value)
else:
return str(value)
def get_callable_str(callable_str: str | LazyProxy | EntityCaptionCallable | EntityItemCaptionCallable | EntityFieldCaptionCallable,
descriptor: EntityFieldDescriptor | EntityDescriptor,
entity: Any = None,
value: Any = None) -> str:
if isinstance(callable_str, str):
return callable_str
elif isinstance(callable_str, LazyProxy):
return callable_str.value
elif callable(callable_str):
return callable_str(*(descriptor, entity, value))
async def authorize_command(user: UserBase,
callback_data: ContextData):
if (callback_data.command == CallbackCommand.MENU_ENTRY_PARAMETERS or
callback_data.context == CommandContext.SETTING_EDIT):
allowed_roles = (await Settings.get(Settings.SECURITY_SETTINGS_ROLES))
return any(role in user.roles for role in allowed_roles)
return False
def get_entity_descriptor(app: QBotApp, callback_data: ContextData) -> EntityDescriptor | None:
if callback_data.entity_name:
return app.entity_metadata.entity_descriptors[callback_data.entity_name]
return None
def get_field_descriptor(app: QBotApp, callback_data: ContextData) -> EntityFieldDescriptor | None:
if callback_data.context == CommandContext.SETTING_EDIT:
return Settings.list_params()[callback_data.field_name]
elif callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]:
entity_descriptor = get_entity_descriptor(app, callback_data)
if entity_descriptor:
return entity_descriptor.fields_descriptors.get(callback_data.field_name)
return None
def add_pagination_controls(keyboard_builder: InlineKeyboardBuilder,
callback_data: ContextData,
total_pages: int,
command: CallbackCommand,
page: int):
if total_pages > 1:
navigation_buttons = []
ContextData(**callback_data.model_dump()).__setattr__
if total_pages > 10:
navigation_buttons.append(InlineKeyboardButton(text = "⏮️",
callback_data = ContextData(
command = command,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = "1" if page != 1 else "skip",
save_state = True).pack()))
navigation_buttons.append(InlineKeyboardButton(text = "⏪️",
callback_data = ContextData(
command = command,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = str(max(page - 10, 1)) if page > 1 else "skip",
save_state = True).pack()))
navigation_buttons.append(InlineKeyboardButton(text = f"◀️",
callback_data = ContextData(
command = command,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = str(max(page - 1, 1)) if page > 1 else "skip",
save_state = True).pack()))
navigation_buttons.append(InlineKeyboardButton(text = f"▶️",
callback_data = ContextData(
command = command,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = str(min(page + 1, total_pages)) if page < total_pages else "skip",
save_state = True).pack()))
if total_pages > 10:
navigation_buttons.append(InlineKeyboardButton(text = "⏩️",
callback_data = ContextData(
command = command,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = str(min(page + 10, total_pages)) if page < total_pages else "skip",
save_state = True).pack()))
navigation_buttons.append(InlineKeyboardButton(text = "⏭️",
callback_data = ContextData(
command = command,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = str(total_pages) if page != total_pages else "skip",
save_state = True).pack()))
keyboard_builder.row(*navigation_buttons)

36
bot/handlers/context.py Normal file
View File

@@ -0,0 +1,36 @@
from aiogram.filters.callback_data import CallbackData as BaseCallbackData
from enum import StrEnum
class CallbackCommand(StrEnum):
FIELD_EDITOR = "fe"
FIELD_EDITOR_CALLBACK = "fc"
ENTITY_LIST = "el"
ENTITY_ITEM = "ei"
ENTITY_DELETE = "ed"
MENU_ENTRY_MAIN = "mm"
MENU_ENTRY_SETTINGS = "ms"
MENU_ENTRY_ENTITIES = "me"
MENU_ENTRY_PARAMETERS = "mp"
MENU_ENTRY_LANGUAGE = "ml"
SET_LANGUAGE = "ls"
DATE_PICKER_MONTH = "dm"
DATE_PICKER_YEAR = "dy"
STRING_EDITOR_LOCALE = "sl"
ENTITY_PICKER_PAGE = "ep"
ENTITY_PICKER_TOGGLE_ITEM = "et"
class CommandContext(StrEnum):
SETTING_EDIT = "se"
ENTITY_CREATE = "ec"
ENTITY_EDIT = "ee"
class ContextData(BaseCallbackData, prefix = "cd"):
command: CallbackCommand
context: CommandContext | None = None
entity_name: str | None = None
entity_id: int | None = None
field_name: str | None = None
data: str | None = None
back: bool = False

View File

@@ -0,0 +1,371 @@
from datetime import datetime
from decimal import Decimal
from types import NoneType, UnionType
from typing import get_args, get_origin
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
import ujson as json
from ....main import QBotApp
from ....model import EntityPermission
from ....model.bot_entity import BotEntity
from ....model.owned_bot_entity import OwnedBotEntity
from ....model.bot_enum import BotEnum
from ....model.language import LanguageBase
from ....model.settings import Settings
from ....model.user import UserBase
from ....model.descriptors import EntityFieldDescriptor
from ....utils import deserialize, get_user_permissions, serialize
from ...command_context_filter import CallbackCommandFilter
from ..context import ContextData, CallbackCommand, CommandContext
from ..common import (get_value_repr, authorize_command, get_callable_str,
get_entity_descriptor, get_field_descriptor)
from ..menu.parameters import parameters_menu
from .string import string_editor, router as string_editor_router
from .date import date_picker, router as date_picker_router
from .boolean import bool_editor, router as bool_editor_router
from .entity import entity_picker, router as entity_picker_router
logger = getLogger(__name__)
router = Router()
router.include_routers(
string_editor_router,
date_picker_router,
bool_editor_router,
entity_picker_router,
)
@router.callback_query(ContextData.filter(F.command == CallbackCommand.FIELD_EDITOR))
async def settings_field_editor(message: Message | CallbackQuery, **kwargs):
callback_data: ContextData = kwargs.get("callback_data", None)
db_session: AsyncSession = kwargs["db_session"]
user: UserBase = kwargs["user"]
app: QBotApp = kwargs["app"]
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
entity_data = state_data.get("entity_data")
for key in ["current_value", "value", "locale_index"]:
if key in state_data:
state_data.pop(key)
await state.clear()
await state.update_data(state_data)
entity_descriptor = None
if callback_data.context == CommandContext.SETTING_EDIT:
field_descriptor = get_field_descriptor(app, callback_data)
if field_descriptor.type_ == bool:
if await authorize_command(user = user, callback_data = callback_data):
await Settings.set_param(field_descriptor, not await Settings.get(field_descriptor))
else:
return await message.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
stack, context = await get_navigation_context(state = state)
return await parameters_menu(message = message,
navigation_stack = stack,
**kwargs)
current_value = await Settings.get(field_descriptor)
else:
entity_descriptor = get_entity_descriptor(app, callback_data)
field_descriptor = get_field_descriptor(app, callback_data)
current_value = None
if not entity_data and callback_data.context == CommandContext.ENTITY_EDIT:
if (EntityPermission.READ_ALL in get_user_permissions(user, entity_descriptor) or
(EntityPermission.READ in get_user_permissions(user, entity_descriptor) and
not issubclass(entity_descriptor.type_, OwnedBotEntity)) or
(EntityPermission.READ in get_user_permissions(user, entity_descriptor) and
issubclass(entity_descriptor.type_, OwnedBotEntity) and
entity_data.user_id == user.id)):
entity = await entity_descriptor.type_.get(session = kwargs["db_session"], id = int(callback_data.entity_id))
if entity:
entity_data = {key: serialize(getattr(entity, key), entity_descriptor.fields_descriptors[key]) for key in entity_descriptor.field_sequence}
await state.update_data({"entity_data": entity_data})
if entity_data:
current_value = await deserialize(session = db_session,
type_= field_descriptor.type_,
value = entity_data.get(callback_data.field_name))
await show_editor(message = message,
field_descriptor = field_descriptor,
entity_descriptor = entity_descriptor,
current_value = current_value,
**kwargs)
async def show_editor(message: Message | CallbackQuery,
**kwargs):
field_descriptor: EntityFieldDescriptor = kwargs["field_descriptor"]
current_value = kwargs["current_value"]
user: UserBase = kwargs["user"]
callback_data: ContextData = kwargs.get("callback_data", None)
state: FSMContext = kwargs["state"]
value_type = field_descriptor.type_
if field_descriptor.edit_prompt:
edit_prompt = get_callable_str(field_descriptor.edit_prompt, field_descriptor, None, current_value)
else:
if field_descriptor.caption_str:
caption_str = get_callable_str(field_descriptor.caption_str, field_descriptor, None, current_value)
else:
caption_str = field_descriptor.name
if callback_data.context == CommandContext.ENTITY_EDIT:
edit_prompt = (await Settings.get(Settings.APP_STRINGS_FIELD_EDIT_PROMPT_TEMPLATE_P_NAME_VALUE)).format(
name = caption_str, value = get_value_repr(current_value, field_descriptor, user.lang))
else:
edit_prompt = (await Settings.get(Settings.APP_STRINGS_FIELD_CREATE_PROMPT_TEMPLATE_P_NAME)).format(
name = caption_str)
kwargs["edit_prompt"] = edit_prompt
type_origin = get_origin(value_type)
if type_origin == UnionType:
args = get_args(value_type)
if args[1] == NoneType:
value_type = args[0]
if value_type not in [int, float, Decimal, str]:
await state.update_data({"context_data": callback_data.pack()})
if value_type == str:
await string_editor(message = message, **kwargs)
elif value_type == bool:
await bool_editor(message = message, **kwargs)
elif value_type in [int, float, Decimal, str]:
await string_editor(message = message, **kwargs)
elif value_type == datetime:
await date_picker(message = message, **kwargs)
elif type_origin == list:
type_args = get_args(value_type)
if type_args and issubclass(type_args[0], BotEntity) or issubclass(type_args[0], BotEnum):
await entity_picker(message = message, **kwargs)
else:
await string_editor(message = message, **kwargs)
elif issubclass(value_type, BotEntity) or issubclass(value_type, BotEnum):
await entity_picker(message = message, **kwargs)
else:
raise ValueError(f"Unsupported field type: {value_type}")
@router.message(CallbackCommandFilter(CallbackCommand.FIELD_EDITOR_CALLBACK))
@router.callback_query(ContextData.filter(F.command == CallbackCommand.FIELD_EDITOR_CALLBACK))
async def field_editor_callback(message: Message | CallbackQuery, **kwargs):
callback_data: ContextData = kwargs.get("callback_data", None)
app: QBotApp = kwargs["app"]
state: FSMContext = kwargs["state"]
state_data = await state.get_data()
if isinstance(message, Message):
context_data = state_data.get("context_data")
if context_data:
context_data = ContextData.unpack(context_data)
callback_data = context_data
value = message.text
field_descriptor = get_field_descriptor(app, callback_data)
base_type = field_descriptor.type_
if get_origin(base_type) == UnionType:
args = get_args(base_type)
if args[1] == NoneType:
base_type = args[0]
if base_type == str and field_descriptor.localizable:
locale_index = int(state_data.get("locale_index"))
if locale_index < len(LanguageBase.all_members.values()) - 1:
#entity_data = state_data.get("entity_data", {})
#current_value = entity_data.get(field_descriptor.field_name)
current_value = state_data.get("current_value")
value = state_data.get("value")
if value:
value = json.loads(value)
else:
value = {}
value[list(LanguageBase.all_members.values())[locale_index]] = message.text
value = json.dumps(value)
await state.update_data({"value": value})
entity_descriptor = get_entity_descriptor(app, callback_data)
kwargs.update({"callback_data": callback_data})
return await show_editor(message = message,
locale_index = locale_index + 1,
field_descriptor = field_descriptor,
entity_descriptor = entity_descriptor,
current_value = current_value,
value = value,
**kwargs)
else:
value = state_data.get("value")
if value:
value = json.loads(value)
else:
value = {}
value[list(LanguageBase.all_members.values())[locale_index]] = message.text
value = json.dumps(value)
elif (base_type in [int, float, Decimal]):
try:
_ = base_type(value) #@IgnoreException
except:
return await message.answer(text = (await Settings.get(Settings.APP_STRINGS_INVALID_INPUT)))
else:
if callback_data.data:
value = callback_data.data
else:
value = state_data.get("value")
field_descriptor = get_field_descriptor(app, callback_data)
kwargs.update({"callback_data": callback_data,})
await process_field_edit_callback(message = message,
value = value,
field_descriptor = field_descriptor,
**kwargs)
async def process_field_edit_callback(message: Message | CallbackQuery, **kwargs):
user: UserBase = kwargs["user"]
db_session: AsyncSession = kwargs["db_session"]
callback_data: ContextData = kwargs.get("callback_data", None)
state: FSMContext = kwargs["state"]
value = kwargs["value"]
field_descriptor: EntityFieldDescriptor = kwargs["field_descriptor"]
if callback_data.context == CommandContext.SETTING_EDIT:
await clear_state(state = state)
if callback_data.data != "cancel":
if await authorize_command(user = user, callback_data = callback_data):
value = await deserialize(session = db_session, type_ = field_descriptor.type_, value = value)
await Settings.set_param(field_descriptor, value)
else:
return await message.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
stack, context = await get_navigation_context(state = state)
return await parameters_menu(message = message,
navigation_stack = stack,
**kwargs)
elif callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]:
app: QBotApp = kwargs["app"]
entity_descriptor = get_entity_descriptor(app, callback_data)
field_sequence = entity_descriptor.field_sequence
current_index = field_sequence.index(callback_data.field_name)
state_data = await state.get_data()
entity_data = state_data.get("entity_data", {})
if current_index < len(field_sequence) - 1:
entity_data[field_descriptor.field_name] = value
await state.update_data({"entity_data": entity_data})
next_field_name = field_sequence[current_index + 1]
next_field_descriptor = entity_descriptor.fields_descriptors[next_field_name]
kwargs.update({"field_descriptor": next_field_descriptor})
callback_data.field_name = next_field_name
state_entity_val = entity_data.get(next_field_descriptor.field_name)
current_value = await deserialize(session = db_session, type_ = next_field_descriptor.type_,
value = state_entity_val) if state_entity_val else None
await show_editor(message = message,
entity_descriptor = entity_descriptor,
current_value = current_value,
**kwargs)
else:
entity_type: BotEntity = entity_descriptor.type_
user_permissions = get_user_permissions(user, entity_descriptor)
if ((callback_data.context == CommandContext.ENTITY_CREATE and
EntityPermission.CREATE not in user_permissions and
EntityPermission.CREATE_ALL not in user_permissions) or
(callback_data.context == CommandContext.ENTITY_EDIT and
EntityPermission.UPDATE not in user_permissions and
EntityPermission.UPDATE_ALL not in user_permissions)):
return await message.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
is_owned = issubclass(entity_type, OwnedBotEntity)
entity_data[field_descriptor.field_name] = value
if is_owned and EntityPermission.CREATE_ALL not in user_permissions:
entity_data["user_id"] = user.id
deser_entity_data = {key: await deserialize(
session = db_session,
type_ = entity_descriptor.fields_descriptors[key].type_,
value = value) for key, value in entity_data.items()}
if callback_data.context == CommandContext.ENTITY_CREATE:
new_entity = await entity_type.create(session = db_session,
obj_in = entity_type(**deser_entity_data),
commit = True)
await save_navigation_context(state = state, callback_data = ContextData(
command = CallbackCommand.ENTITY_ITEM,
entity_name = entity_descriptor.name,
entity_id = str(new_entity.id)
))
elif callback_data.context == CommandContext.ENTITY_EDIT:
entity_id = int(callback_data.entity_id)
entity = await entity_type.get(session = db_session, id = entity_id)
if not entity:
return await message.answer(text = (await Settings.get(Settings.APP_STRINGS_NOT_FOUND)))
if (is_owned and entity.user_id != user.id and
EntityPermission.UPDATE_ALL not in user_permissions):
return await message.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
for key, value in deser_entity_data.items():
setattr(entity, key, value)
await db_session.commit()
await clear_state(state = state)
await route_callback(message = message, back = False, **kwargs)
from ..navigation import get_navigation_context, route_callback, clear_state, save_navigation_context

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,71 @@
from aiogram import Router
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from babel.support import LazyProxy
from logging import getLogger
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ..context import ContextData, CallbackCommand
from ..common import get_send_message
from .common import wrap_editor
logger = getLogger(__name__)
router = Router()
async def bool_editor(message: Message | CallbackQuery,
edit_prompt: str,
entity_descriptor: EntityDescriptor,
field_descriptor: EntityFieldDescriptor,
callback_data: ContextData,
**kwargs):
keyboard_builder = InlineKeyboardBuilder()
if isinstance(field_descriptor.bool_true_value_btn, LazyProxy):
true_caption = field_descriptor.bool_true_value_btn.value
else:
true_caption = field_descriptor.bool_true_value_btn
if isinstance(field_descriptor.bool_false_value_btn, LazyProxy):
false_caption = field_descriptor.bool_false_value_btn.value
else:
false_caption = field_descriptor.bool_false_value_btn
keyboard_builder.row(
InlineKeyboardButton(text = true_caption,
callback_data = ContextData(
command = CallbackCommand.FIELD_EDITOR_CALLBACK,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = str(True),
save_state = True).pack()),
InlineKeyboardButton(text = false_caption,
callback_data = ContextData(
command = CallbackCommand.FIELD_EDITOR_CALLBACK,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = str(False),
save_state = True).pack())
)
await wrap_editor(keyboard_builder = keyboard_builder,
field_descriptor = field_descriptor,
entity_descriptor = entity_descriptor,
callback_data = callback_data,
state = kwargs["state"])
send_message = get_send_message(message)
await send_message(text = edit_prompt, reply_markup = keyboard_builder.as_markup())

View File

@@ -0,0 +1,67 @@
from types import NoneType, UnionType
from typing import get_args, get_origin
from aiogram.fsm.context import FSMContext
from aiogram.types import InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ....model.settings import Settings
from ..context import ContextData, CallbackCommand, CommandContext
from ..navigation import get_navigation_context
async def wrap_editor(keyboard_builder: InlineKeyboardBuilder,
field_descriptor: EntityFieldDescriptor,
entity_descriptor: EntityDescriptor,
callback_data: ContextData,
state: FSMContext):
if callback_data.context in [CommandContext.ENTITY_CREATE, CommandContext.ENTITY_EDIT]:
btns = []
field_index = entity_descriptor.field_sequence.index(field_descriptor.name)
stack, context = await get_navigation_context(state)
if field_index > 0:
btns.append(InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = ContextData(
command = CallbackCommand.FIELD_EDITOR,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = entity_descriptor.field_sequence[field_index - 1],
save_state = True).pack()))
if get_origin(field_descriptor.type_) == UnionType:
args = get_args(field_descriptor.type_)
if args[1] == NoneType:
btns.append(InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_SKIP_BTN)),
callback_data = ContextData(
command = CallbackCommand.FIELD_EDITOR_CALLBACK,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
save_state = True).pack()))
keyboard_builder.row(*btns)
keyboard_builder.row(InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_CANCEL_BTN)),
callback_data = context.pack()))
elif callback_data.context == CommandContext.SETTING_EDIT:
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_CANCEL_BTN)),
callback_data = ContextData(
command = CallbackCommand.FIELD_EDITOR_CALLBACK,
context = callback_data.context,
field_name = callback_data.field_name,
data = "cancel").pack()))

View File

@@ -0,0 +1,167 @@
from datetime import datetime, timedelta
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from ....main import QBotApp
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ..context import ContextData, CallbackCommand
from ..common import get_send_message, get_field_descriptor, get_entity_descriptor
from .common import wrap_editor
logger = getLogger(__name__)
router = Router()
async def date_picker(message: Message | CallbackQuery,
field_descriptor: EntityFieldDescriptor,
entity_descriptor: EntityDescriptor,
callback_data: ContextData,
current_value: datetime,
state: FSMContext,
edit_prompt: str | None = None,
**kwargs):
if not current_value:
start_date = datetime.now()
else:
start_date = current_value
start_date = start_date.replace(day = 1)
previous_month = start_date - timedelta(days = 1)
next_month = start_date.replace(day = 28) + timedelta(days = 4)
keyboard_builder = InlineKeyboardBuilder()
keyboard_builder.row(InlineKeyboardButton(text = "◀️",
callback_data = ContextData(
command = CallbackCommand.DATE_PICKER_MONTH,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = previous_month.strftime("%Y-%m-%d"),
save_state = True).pack()),
InlineKeyboardButton(text = start_date.strftime("%b %Y"),
callback_data = ContextData(
command = CallbackCommand.DATE_PICKER_YEAR,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = start_date.strftime("%Y-%m-%d"),
save_state = True).pack()),
InlineKeyboardButton(text = "▶️",
callback_data = ContextData(
command = CallbackCommand.DATE_PICKER_MONTH,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = next_month.strftime("%Y-%m-%d"),
save_state = True).pack()))
first_day = start_date - timedelta(days = start_date.weekday())
weeks = (((start_date.replace(day = 28) + timedelta(days = 4)).replace(day = 1) - first_day).days - 1) // 7 + 1
for week in range(weeks):
buttons = []
for day in range(7):
current_day = first_day + timedelta(days = week * 7 + day)
buttons.append(InlineKeyboardButton(text = current_day.strftime("%d"),
callback_data = ContextData(
command = CallbackCommand.FIELD_EDITOR_CALLBACK,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = current_day.strftime("%Y-%m-%d"),
save_state = True).pack()))
keyboard_builder.row(*buttons)
await wrap_editor(keyboard_builder = keyboard_builder,
field_descriptor = field_descriptor,
entity_descriptor = entity_descriptor,
callback_data = callback_data,
state = state)
if edit_prompt:
send_message = get_send_message(message)
await send_message(text = edit_prompt, reply_markup = keyboard_builder.as_markup())
else:
await message.edit_reply_markup(reply_markup = keyboard_builder.as_markup())
@router.callback_query(ContextData.filter(F.command == CallbackCommand.DATE_PICKER_YEAR))
async def date_picker_year(query: CallbackQuery,
callback_data: ContextData,
app: QBotApp,
state: FSMContext,
**kwargs):
start_date = datetime.strptime(callback_data.data, "%Y-%m-%d")
keyboard_builder = InlineKeyboardBuilder()
keyboard_builder.row(InlineKeyboardButton(text = "🔼",
callback_data = ContextData(
command = CallbackCommand.DATE_PICKER_YEAR,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = start_date.replace(year = start_date.year - 20).strftime("%Y-%m-%d"),
save_state = True).pack()))
for r in range(4):
buttons = []
for c in range(5):
current_date = start_date.replace(year = start_date.year + r * 5 + c - 10)
buttons.append(InlineKeyboardButton(text = current_date.strftime("%Y"),
callback_data = ContextData(
command = CallbackCommand.DATE_PICKER_MONTH,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = current_date.strftime("%Y-%m-%d"),
save_state = True).pack()))
keyboard_builder.row(*buttons)
keyboard_builder.row(InlineKeyboardButton(text = "🔽",
callback_data = ContextData(
command = CallbackCommand.DATE_PICKER_YEAR,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = start_date.replace(year = start_date.year + 20).strftime("%Y-%m-%d"),
save_state = True).pack()))
field_descriptor = get_field_descriptor(app, callback_data)
entity_descriptor = get_entity_descriptor(app, callback_data)
await wrap_editor(keyboard_builder = keyboard_builder,
field_descriptor = field_descriptor,
entity_descriptor = entity_descriptor,
callback_data = callback_data,
state = state)
await query.message.edit_reply_markup(reply_markup = keyboard_builder.as_markup())
@router.callback_query(ContextData.filter(F.command == CallbackCommand.DATE_PICKER_MONTH))
async def date_picker_month(query: CallbackQuery, callback_data: ContextData, app: QBotApp, **kwargs):
entity_descriptor = get_entity_descriptor(app, callback_data)
field_descriptor = get_field_descriptor(app, callback_data)
await date_picker(query.message,
field_descriptor = field_descriptor,
entity_descriptor = entity_descriptor,
callback_data = callback_data,
current_value = datetime.strptime(callback_data.data, "%Y-%m-%d"),
**kwargs)

View File

@@ -0,0 +1,193 @@
from types import UnionType
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from typing import get_args, get_origin
from ....main import QBotApp
from ....model.bot_entity import BotEntity
from ....model.bot_enum import BotEnum
from ....model.settings import Settings
from ....model.user import UserBase
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ....utils import serialize, deserialize
from ..context import ContextData, CallbackCommand
from ..common import (get_send_message, get_local_text, get_field_descriptor,
get_entity_descriptor, add_pagination_controls)
from .common import wrap_editor
logger = getLogger(__name__)
router = Router()
async def entity_picker(message: Message | CallbackQuery,
field_descriptor: EntityFieldDescriptor,
edit_prompt: str,
current_value: BotEntity | BotEnum | list[BotEntity] | list[BotEnum],
state: FSMContext,
**kwargs):
await state.update_data({"current_value": serialize(current_value, field_descriptor),
"value": serialize(current_value, field_descriptor),
"edit_prompt": edit_prompt})
await render_entity_picker(field_descriptor = field_descriptor,
message = message,
state = state,
current_value = current_value,
edit_prompt = edit_prompt,
**kwargs)
async def render_entity_picker(*,
field_descriptor: EntityFieldDescriptor,
entity_descriptor: EntityDescriptor,
message: Message | CallbackQuery,
callback_data: ContextData,
user: UserBase,
db_session: AsyncSession,
state: FSMContext,
current_value: BotEntity | BotEnum | list[BotEntity] | list[BotEnum],
edit_prompt: str,
page: int = 1,
**kwargs):
if callback_data.command in [CallbackCommand.ENTITY_PICKER_PAGE, CallbackCommand.ENTITY_PICKER_TOGGLE_ITEM]:
page = int(callback_data.data.split("&")[0])
is_list = False
type_origin = get_origin(field_descriptor.type_)
if type_origin == UnionType:
type_ = get_args(field_descriptor.type_)[0]
elif type_origin == list:
type_ = get_args(field_descriptor.type_)[0]
is_list = True
else:
type_ = field_descriptor.type_
if not issubclass(type_, BotEntity) and not issubclass(type_, BotEnum):
raise ValueError("Unsupported type")
page_size = await Settings.get(Settings.PAGE_SIZE)
if issubclass(type_, BotEnum):
items_count = len(type_.all_members)
enum_items = list(type_.all_members.values())[page_size * (page - 1):page_size * page]
items = [{"text": f"{"" if not is_list else "【✔︎】 " if item in (current_value or []) else "【 】 "}{item.localized(user.lang)}",
"value": item.value} for item in enum_items]
else:
items_count = await type_.get_count(session = db_session)
entity_items = await type_.get_multi(session = db_session, order_by = type_.name, skip = page_size * (page - 1), limit = page_size)
items = [{"text": f"{"" if not is_list else "【✔︎】 " if item in (current_value or []) else "【 】 "}{
type_.bot_entity_descriptor.item_caption_btn(type_.bot_entity_descriptor, item) if type_.bot_entity_descriptor.item_caption_btn
else get_local_text(item.name, user.lang) if field_descriptor.localizable else item.name}",
"value": str(item.id)} for item in entity_items]
total_pages = items_count // page_size + (1 if items_count % page_size else 0)
keyboard_builder = InlineKeyboardBuilder()
for item in items:
keyboard_builder.row(
InlineKeyboardButton(text = item["text"],
callback_data = ContextData(
command = CallbackCommand.ENTITY_PICKER_TOGGLE_ITEM if is_list else CallbackCommand.FIELD_EDITOR_CALLBACK,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
data = f"{page}&{item['value']}" if is_list else item["value"],
save_state = True).pack()))
add_pagination_controls(keyboard_builder = keyboard_builder,
callback_data = callback_data,
total_pages = total_pages,
command = CallbackCommand.ENTITY_PICKER_PAGE,
page = page)
if is_list:
keyboard_builder.row(
InlineKeyboardButton(text = await Settings.get(Settings.APP_STRINGS_DONE_BTN),
callback_data = ContextData(
command = CallbackCommand.FIELD_EDITOR_CALLBACK,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
save_state = True).pack()))
await wrap_editor(keyboard_builder = keyboard_builder,
field_descriptor = field_descriptor,
entity_descriptor = entity_descriptor,
callback_data = callback_data,
state = state)
send_message = get_send_message(message)
await send_message(text = edit_prompt, reply_markup = keyboard_builder.as_markup())
@router.callback_query(ContextData.filter(F.command == CallbackCommand.ENTITY_PICKER_PAGE))
@router.callback_query(ContextData.filter(F.command == CallbackCommand.ENTITY_PICKER_TOGGLE_ITEM))
async def entity_picker_callback(query: CallbackQuery,
callback_data: ContextData,
db_session: AsyncSession,
app: QBotApp,
state: FSMContext,
**kwargs):
state_data = await state.get_data()
field_descriptor = get_field_descriptor(app = app, callback_data = callback_data)
entity_descriptor = get_entity_descriptor(app = app, callback_data = callback_data)
current_value = await deserialize(session = db_session, type_ = field_descriptor.type_, value = state_data["current_value"])
edit_prompt = state_data["edit_prompt"]
value = await deserialize(session = db_session, type_ = field_descriptor.type_, value = state_data["value"])
if callback_data.command == CallbackCommand.ENTITY_PICKER_TOGGLE_ITEM:
page, id_value = callback_data.data.split("&")
page = int(page)
type_ = get_args(field_descriptor.type_)[0]
if issubclass(type_, BotEnum):
item = type_(id_value)
if item in value:
value.remove(item)
else:
value.append(item)
else:
item = await type_.get(session = db_session, id = int(id_value))
if item in value:
value.remove(item)
else:
value.append(item)
await state.update_data({"value": serialize(value, field_descriptor)})
elif callback_data.command == CallbackCommand.ENTITY_PICKER_PAGE:
if callback_data.data == "skip":
return
page = int(callback_data.data)
else:
raise ValueError("Unsupported command")
await render_entity_picker(field_descriptor = field_descriptor,
entity_descriptor = entity_descriptor,
message = query,
callback_data = callback_data,
current_value = value,
edit_prompt = edit_prompt,
db_session = db_session,
app = app,
state = state,
page = page,
**kwargs)

View File

@@ -0,0 +1,94 @@
from types import UnionType
from aiogram import Router
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton, CopyTextButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from typing import Any, get_args, get_origin
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ....model.language import LanguageBase
from ....model.settings import Settings
from ....utils import serialize
from ..context import ContextData, CallbackCommand
from ..common import get_send_message, get_local_text
from .common import wrap_editor
logger = getLogger(__name__)
router = Router()
async def string_editor(message: Message | CallbackQuery,
field_descriptor: EntityFieldDescriptor,
entity_descriptor: EntityDescriptor,
callback_data: ContextData,
current_value: Any,
edit_prompt: str,
state: FSMContext,
locale_index: int = 0,
**kwargs):
keyboard_builder = InlineKeyboardBuilder()
_edit_prompt = edit_prompt
type_ = field_descriptor.type_
type_origin = get_origin(type_)
if type_origin == UnionType:
type_ = get_args(type_)[0]
if type_ == str and field_descriptor.localizable:
current_locale = list(LanguageBase.all_members.values())[locale_index]
context_data = ContextData(
command = CallbackCommand.FIELD_EDITOR_CALLBACK,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
save_state = True)
_edit_prompt = f"{edit_prompt}\n{(await Settings.get(
Settings.APP_STRINGS_STRING_EDITOR_LOCALE_TEMPLATE_P_NAME)).format(name = current_locale)}"
_current_value = get_local_text(current_value, current_locale)
await state.update_data({
"context_data": context_data.pack(),
"edit_prompt": edit_prompt,
"locale_index": str(locale_index),
"current_value": current_value})
else:
context_data = ContextData(
command = CallbackCommand.FIELD_EDITOR_CALLBACK,
context = callback_data.context,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
field_name = callback_data.field_name,
save_state = True)
_current_value = serialize(current_value, field_descriptor)
await state.update_data({
"context_data": context_data.pack()})
if _current_value:
_current_value_caption = f"{_current_value[:30]}..." if len(_current_value) > 30 else _current_value
keyboard_builder.row(InlineKeyboardButton(text = _current_value_caption,
copy_text = CopyTextButton(text = _current_value)))
await wrap_editor(keyboard_builder = keyboard_builder,
field_descriptor = field_descriptor,
entity_descriptor = entity_descriptor,
callback_data = callback_data,
state = state)
send_message = get_send_message(message)
await send_message(text = _edit_prompt, reply_markup = keyboard_builder.as_markup())
async def context_command_fiter(*args, **kwargs):
print(args, kwargs)
return True

View File

@@ -0,0 +1,191 @@
from typing import get_args, get_origin
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import StatesGroup, State
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.i18n import I18n
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from ....main import QBotApp
from ....model.bot_entity import BotEntity
from ....model.bot_enum import BotEnum
from ....model.owned_bot_entity import OwnedBotEntity
from ....model.settings import Settings
from ....model.user import UserBase
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ....model import EntityPermission
from ....utils import serialize, deserialize, get_user_permissions
from ..context import ContextData, CallbackCommand, CommandContext
from ..common import get_send_message, get_local_text, get_entity_descriptor, get_callable_str, get_value_repr
logger = getLogger(__name__)
router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.ENTITY_ITEM))
async def entity_item_callback(query: CallbackQuery, callback_data: ContextData, **kwargs):
await clear_state(state = kwargs["state"])
stack = await save_navigation_context(callback_data = callback_data, state = kwargs["state"])
await entity_item(query = query, callback_data = callback_data, navigation_stack = stack, **kwargs)
async def entity_item(query: CallbackQuery,
callback_data: ContextData,
db_session: AsyncSession,
user: UserBase,
app: QBotApp,
navigation_stack: list[ContextData],
**kwargs):
entity_descriptor = get_entity_descriptor(app, callback_data)
user_permissions = get_user_permissions(user, entity_descriptor)
entity_type: BotEntity = entity_descriptor.type_
keyboard_builder = InlineKeyboardBuilder()
entity_item = await entity_type.get(session = db_session, id = callback_data.entity_id)
if not entity_item:
return await query.answer(text = (await Settings.get(Settings.APP_STRINGS_NOT_FOUND)))
is_owned = issubclass(entity_type, OwnedBotEntity)
if (EntityPermission.READ not in user_permissions and
EntityPermission.READ_ALL not in user_permissions):
return await query.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
if (is_owned and
EntityPermission.READ_ALL not in user_permissions and
entity_item.user_id != user.id):
return await query.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
can_edit = (EntityPermission.UPDATE_ALL in user_permissions or
(EntityPermission.UPDATE in user_permissions and not is_owned) or
(EntityPermission.UPDATE in user_permissions and is_owned and
entity_item.user_id == user.id))
can_delete = (EntityPermission.DELETE_ALL in user_permissions or
(EntityPermission.DELETE in user_permissions and not is_owned) or
(EntityPermission.DELETE in user_permissions and is_owned and
entity_item.user_id == user.id))
edit_delete_row = []
if can_edit:
edit_delete_row.append(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_EDIT_BTN)),
callback_data = ContextData(
command = CallbackCommand.FIELD_EDITOR,
context = CommandContext.ENTITY_EDIT,
entity_name = entity_descriptor.name,
entity_id = str(entity_item.id),
field_name = entity_descriptor.field_sequence[0],
save_state = True).pack()))
if can_delete:
edit_delete_row.append(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_DELETE_BTN)),
callback_data = ContextData(
command = CallbackCommand.ENTITY_DELETE,
entity_name = entity_descriptor.name,
entity_id = str(entity_item.id)).pack()))
if edit_delete_row:
keyboard_builder.row(*edit_delete_row)
entity_caption = get_callable_str(entity_descriptor.caption_msg, entity_descriptor, entity_item)
entity_item_name = get_local_text(entity_item.name, user.lang) if entity_descriptor.fields_descriptors["name"].localizable else entity_item.name
item_text = f"<b><u><i>{entity_caption or entity_descriptor.name}:</i></u></b> <b>{entity_item_name}</b>"
for field_descriptor in entity_descriptor.fields_descriptors.values():
if field_descriptor.name in ["name", "id"] or not field_descriptor.is_visible:
continue
field_caption = get_callable_str(field_descriptor.caption_str, field_descriptor, entity_item)
value = get_value_repr(value = getattr(entity_item, field_descriptor.name),
field_descriptor = field_descriptor,
locale = user.lang)
item_text += f"\n{field_caption or field_descriptor.name}:{f" <b>{value}</b>" if value else ""}"
context = pop_navigation_context(navigation_stack)
if context:
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
send_message = get_send_message(query)
await send_message(text = item_text, reply_markup = keyboard_builder.as_markup())
@router.callback_query(ContextData.filter(F.command == CallbackCommand.ENTITY_DELETE))
async def entity_delete_callback(query: CallbackQuery, **kwargs):
callback_data: ContextData = kwargs["callback_data"]
user: UserBase = kwargs["user"]
db_session: AsyncSession = kwargs["db_session"]
app: QBotApp = kwargs["app"]
entity_descriptor = get_entity_descriptor(app = app, callback_data = callback_data)
user_permissions = get_user_permissions(user, entity_descriptor)
entity = await entity_descriptor.type_.get(session = db_session, id = int(callback_data.entity_id))
if not (EntityPermission.DELETE_ALL in user_permissions or
(EntityPermission.DELETE in user_permissions and not issubclass(entity_descriptor.type_, OwnedBotEntity)) or
(EntityPermission.DELETE in user_permissions and issubclass(entity_descriptor.type_, OwnedBotEntity) and
entity.user_id == user.id)):
return await query.answer(text = (await Settings.get(Settings.APP_STRINGS_FORBIDDEN)))
if not callback_data.data:
field_descriptor = entity_descriptor.fields_descriptors["name"]
entity = await entity_descriptor.type_.get(session = db_session, id = int(callback_data.entity_id))
return await query.message.edit_text(
text = (await Settings.get(Settings.APP_STRINGS_CONFIRM_DELETE_P_NAME)).format(
name = get_value_repr(
value = getattr(entity, field_descriptor.name),
field_descriptor = field_descriptor,
locale = user.lang)),
reply_markup = InlineKeyboardBuilder().row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_YES_BTN)),
callback_data = ContextData(
command = CallbackCommand.ENTITY_DELETE,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
data = "yes").pack()),
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_NO_BTN)),
callback_data = ContextData(
command = CallbackCommand.ENTITY_ITEM,
entity_name = callback_data.entity_name,
entity_id = callback_data.entity_id,
data = "no").pack())).as_markup())
if callback_data.data == "yes":
await entity_descriptor.type_.remove(
session = db_session, id = int(callback_data.entity_id), commit = True)
await route_callback(message = query, **kwargs)
if callback_data.data == "no":
await route_callback(message = query, back = False, **kwargs)
from ..navigation import pop_navigation_context, save_navigation_context, clear_state, route_callback

View File

@@ -0,0 +1,143 @@
from typing import get_args, get_origin
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import StatesGroup, State
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.i18n import I18n
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from ....main import QBotApp
from ....model.bot_entity import BotEntity
from ....model.bot_enum import BotEnum
from ....model.owned_bot_entity import OwnedBotEntity
from ....model.settings import Settings
from ....model.user import UserBase
from ....model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ....model import EntityPermission
from ....utils import serialize, deserialize, get_user_permissions
from ..context import ContextData, CallbackCommand, CommandContext
from ..common import (add_pagination_controls, get_local_text, get_entity_descriptor,
get_callable_str, get_send_message)
logger = getLogger(__name__)
router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.ENTITY_LIST))
async def entity_list_callback(query: CallbackQuery, callback_data: ContextData, **kwargs):
if callback_data.data == "skip":
return
await clear_state(state = kwargs["state"])
stack = await save_navigation_context(callback_data = callback_data, state = kwargs["state"])
await entity_list(message = query, callback_data = callback_data, navigation_stack = stack, **kwargs)
async def entity_list(message: CallbackQuery | Message,
callback_data: ContextData,
db_session: AsyncSession,
user: UserBase,
app: QBotApp,
navigation_stack: list[ContextData],
**kwargs):
page = int(callback_data.data or "1")
entity_descriptor = get_entity_descriptor(app, callback_data)
user_permissions = get_user_permissions(user, entity_descriptor)
entity_type = entity_descriptor.type_
keyboard_builder = InlineKeyboardBuilder()
if EntityPermission.CREATE in user_permissions or EntityPermission.CREATE_ALL in user_permissions:
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_ADD_BTN)),
callback_data = ContextData(
command = CallbackCommand.FIELD_EDITOR,
context = CommandContext.ENTITY_CREATE,
entity_name = entity_descriptor.name,
field_name = entity_descriptor.field_sequence[0],
save_state = True).pack()))
page_size = await Settings.get(Settings.PAGE_SIZE)
if issubclass(entity_type, OwnedBotEntity):
if EntityPermission.READ_ALL in user_permissions or EntityPermission.LIST_ALL in user_permissions:
items = await entity_type.get_multi(
session = db_session, order_by = entity_type.name,
skip = page_size * (page - 1), limit = page_size)
items_count = await entity_type.get_count(session = db_session)
elif EntityPermission.READ in user_permissions or EntityPermission.LIST in user_permissions:
items = await entity_type.get_multi_by_user(
session = db_session, user_id = user.id, order_by = entity_type.name,
skip = page_size * (page - 1), limit = page_size)
items_count = await entity_type.get_count_by_user(session = db_session, user_id = user.id)
else:
items = list[OwnedBotEntity]()
items_count = 0
elif issubclass(entity_type, BotEntity):
if (EntityPermission.READ in user_permissions or EntityPermission.LIST in user_permissions or
EntityPermission.READ_ALL in user_permissions or EntityPermission.LIST_ALL in user_permissions):
items = await entity_type.get_multi(
session = db_session, order_by = entity_type.name,
skip = page_size * (page - 1), limit = page_size)
items_count = await entity_type.get_count(session = db_session)
else:
items = list[BotEntity]()
items_count = 0
else:
raise ValueError(f"Unsupported entity type: {entity_type}")
total_pages = items_count // page_size + (1 if items_count % page_size else 0)
for item in items:
if entity_descriptor.item_caption_btn:
caption = entity_descriptor.item_caption_btn(entity_descriptor, item)
elif entity_descriptor.fields_descriptors["name"].localizable:
caption = get_local_text(item.name, user.lang)
else:
caption = item.name
keyboard_builder.row(
InlineKeyboardButton(
text = caption,
callback_data = ContextData(
command = CallbackCommand.ENTITY_ITEM,
entity_name = entity_descriptor.name,
entity_id = str(item.id)).pack()))
add_pagination_controls(keyboard_builder = keyboard_builder,
callback_data = callback_data,
total_pages = total_pages,
command = CallbackCommand.ENTITY_LIST,
page = page)
context = pop_navigation_context(navigation_stack)
if context:
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
if entity_descriptor.caption_msg:
entity_text = get_callable_str(entity_descriptor.caption_msg, entity_descriptor)
else:
entity_text = entity_descriptor.name
if entity_descriptor.description:
entity_desciption = get_callable_str(entity_descriptor.description, entity_descriptor)
else:
entity_desciption = None
send_message = get_send_message(message)
await send_message(text = f"{entity_text}{f"\n{entity_desciption}" if entity_desciption else ""}",
reply_markup = keyboard_builder.as_markup())
from ..navigation import pop_navigation_context, save_navigation_context, clear_state

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,67 @@
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from babel.support import LazyProxy
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from ....main import QBotApp
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand
from ..common import get_send_message
from ....model.descriptors import EntityCaptionCallable
logger = getLogger(__name__)
router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.MENU_ENTRY_ENTITIES))
async def menu_entry_entities(message: CallbackQuery, **kwargs):
stack = await save_navigation_context(
callback_data = kwargs["callback_data"],
state = kwargs["state"])
await entities_menu(message = message, navigation_stack = stack, **kwargs)
async def entities_menu(message: Message | CallbackQuery,
callback_data: ContextData,
app: QBotApp,
state: FSMContext,
navigation_stack: list[ContextData],
**kwargs):
keyboard_builder = InlineKeyboardBuilder()
entity_metadata = app.entity_metadata
for entity in entity_metadata.entity_descriptors.values():
if entity.caption_btn.__class__ == EntityCaptionCallable:
caption = entity.caption_btn(entity) or entity.name
elif entity.caption_btn.__class__ == LazyProxy:
caption = f"{f"{entity.icon} " if entity.icon else ""}{entity.caption_btn.value or entity.name}"
else:
caption = f"{f"{entity.icon} " if entity.icon else ""}{entity.caption_btn or entity.name}"
keyboard_builder.row(
InlineKeyboardButton(
text = caption,
callback_data = ContextData(command = CallbackCommand.ENTITY_LIST, entity_name = entity.name).pack()))
context = pop_navigation_context(navigation_stack)
if context:
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
send_message = get_send_message(message)
await send_message(text = (await Settings.get(Settings.APP_STRINGS_REFERENCES)), reply_markup = keyboard_builder.as_markup())
from ..navigation import save_navigation_context, pop_navigation_context

View File

@@ -0,0 +1,62 @@
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from ....main import QBotApp
from ....model.language import LanguageBase
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand
from ..navigation import route_callback
from .settings import settings_menu
from ..common import get_send_message
logger = getLogger(__name__)
router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.MENU_ENTRY_LANGUAGE))
async def menu_entry_language(message: CallbackQuery, **kwargs):
stack = await save_navigation_context(callback_data = kwargs["callback_data"],
state = kwargs["state"])
await language_menu(message, navigation_stack = stack, **kwargs)
async def language_menu(message: Message | CallbackQuery,
navigation_stack: list[ContextData],
user: UserBase,
**kwargs):
send_message = get_send_message(message)
inline_keyboard = [
[InlineKeyboardButton(text = locale.localized(user.lang),
callback_data = ContextData(command = CallbackCommand.SET_LANGUAGE,
data = str(locale)).pack())]
for locale in LanguageBase.all_members.values()]
context = pop_navigation_context(navigation_stack)
if context:
inline_keyboard.append([InlineKeyboardButton(text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack())])
await send_message(text = (await Settings.get(Settings.APP_STRINGS_LANGUAGE)),
reply_markup = InlineKeyboardMarkup(inline_keyboard = inline_keyboard))
@router.callback_query(ContextData.filter(F.command == CallbackCommand.SET_LANGUAGE))
async def set_language(message: CallbackQuery, user: UserBase, callback_data: ContextData, db_session: AsyncSession, **kwargs):
user.lang = callback_data.data
await db_session.commit()
await route_callback(message, callback_data = callback_data, user = user, db_session = db_session, **kwargs)
from ..navigation import pop_navigation_context, save_navigation_context

91
bot/handlers/menu/main.py Normal file
View File

@@ -0,0 +1,91 @@
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from ....main import QBotApp
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand
from ..common import get_send_message
logger = getLogger(__name__)
router = Router()
@router.message(Command("menu"))
async def command_menu(message: Message, **kwargs):
await clear_state(state = kwargs["state"], clear_nav = True)
callback_data = ContextData(command = CallbackCommand.MENU_ENTRY_MAIN)
stack = await save_navigation_context(callback_data = callback_data, state = kwargs["state"])
kwargs.update({"navigation_stack": stack, "callback_data": callback_data})
await main_menu(message, **kwargs)
# @router.callback_query(CallbackData.filter(F.command == CallbackCommand.MENU_ENTRY))
# async def menu_entry(query: CallbackQuery, callback_data: CallbackData, user: UserBase, db_session: AsyncSession, app: QBotApp):
# pass
@router.callback_query(ContextData.filter(F.command == CallbackCommand.MENU_ENTRY_MAIN))
async def menu_entry_main(message: CallbackQuery, **kwargs):
stack = await save_navigation_context(callback_data = kwargs["callback_data"], state = kwargs["state"])
await main_menu(message, navigation_stack = stack, **kwargs)
async def main_menu(message: Message | CallbackQuery, navigation_stack: list[ContextData], **kwargs):
keyboard_builder = InlineKeyboardBuilder()
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_REFERENCES_BTN)),
callback_data = ContextData(command = CallbackCommand.MENU_ENTRY_ENTITIES).pack()))
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_SETTINGS_BTN)),
callback_data = ContextData(command = CallbackCommand.MENU_ENTRY_SETTINGS).pack()))
context = pop_navigation_context(navigation_stack)
if context:
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
send_message = get_send_message(message)
await send_message(text = (await Settings.get(Settings.APP_STRINGS_MAIN_NENU)),
reply_markup = keyboard_builder.as_markup())
from .entities import router as entities_router
from .settings import router as settings_router
from .parameters import router as parameters_router
from .language import router as language_router
from ..editors import router as editors_router
from ..forms.entity_list import router as entity_list_router
from ..forms.entity_form import router as entity_form_router
router.include_routers(
entities_router,
settings_router,
parameters_router,
language_router,
editors_router,
entity_list_router,
entity_form_router
)
from ..navigation import save_navigation_context, pop_navigation_context, clear_state

View File

@@ -0,0 +1,80 @@
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import StatesGroup, State
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.i18n import I18n
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from ....main import QBotApp
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand, CommandContext
from ..common import get_send_message, get_value_repr, get_callable_str, authorize_command
from ..navigation import save_navigation_context, pop_navigation_context
logger = getLogger(__name__)
router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.MENU_ENTRY_PARAMETERS))
async def menu_entry_parameters(message: CallbackQuery, **kwargs):
await clear_state(state = kwargs["state"])
stack = await save_navigation_context(callback_data = kwargs["callback_data"], state = kwargs["state"])
await parameters_menu(message = message, navigation_stack = stack, **kwargs)
async def parameters_menu(message: Message | CallbackQuery,
user: UserBase,
callback_data: ContextData,
navigation_stack: list[ContextData],
**kwargs):
if not await authorize_command(user = user, callback_data = callback_data):
await message.answer(await Settings.get(Settings.APP_STRINGS_FORBIDDEN))
settings = await Settings.get_params()
keyboard_builder = InlineKeyboardBuilder()
for key, value in settings.items():
if not key.is_visible:
continue
if key.caption_value_btn:
caption = get_callable_str(callable_str = key.caption_value_btn, descriptor = key, entity = None, value = value)
else:
if key.caption_btn:
caption = get_callable_str(callable_str = key.caption_btn, descriptor = key, entity = None, value = value)
else:
caption = key.name
if key.type_ == bool:
caption = f"{"【✔︎】" if value else "【 】"} {caption}"
else:
caption = f"{caption}: {get_value_repr(value = value, field_descriptor = key, locale = user.lang)}"
keyboard_builder.row(InlineKeyboardButton(text = caption,
callback_data = ContextData(
command = CallbackCommand.FIELD_EDITOR,
context = CommandContext.SETTING_EDIT,
field_name = key.name).pack()))
context = pop_navigation_context(navigation_stack)
if context:
keyboard_builder.row(InlineKeyboardButton(text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
send_message = get_send_message(message)
await send_message(text = (await Settings.get(Settings.APP_STRINGS_PARAMETERS)), reply_markup = keyboard_builder.as_markup())
from ..navigation import pop_navigation_context, get_navigation_context, clear_state

View File

@@ -0,0 +1,58 @@
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from ....main import QBotApp
from ....model.settings import Settings
from ....model.user import UserBase
from ..context import ContextData, CallbackCommand
from ..common import get_send_message, authorize_command
from ..navigation import save_navigation_context, pop_navigation_context
logger = getLogger(__name__)
router = Router()
@router.callback_query(ContextData.filter(F.command == CallbackCommand.MENU_ENTRY_SETTINGS))
async def menu_entry_settings(message: CallbackQuery, **kwargs):
stack = await save_navigation_context(callback_data = kwargs["callback_data"], state = kwargs["state"])
await settings_menu(message, navigation_stack = stack, **kwargs)
async def settings_menu(message: Message | CallbackQuery,
user: UserBase,
navigation_stack: list[ContextData],
**kwargs):
keyboard_builder = InlineKeyboardBuilder()
if await authorize_command(user = user, callback_data = ContextData(command = CallbackCommand.MENU_ENTRY_PARAMETERS)):
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_PARAMETERS_BTN)),
callback_data = ContextData(command = CallbackCommand.MENU_ENTRY_PARAMETERS).pack()))
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_LANGUAGE_BTN)),
callback_data = ContextData(command = CallbackCommand.MENU_ENTRY_LANGUAGE).pack()))
context = pop_navigation_context(navigation_stack)
if context:
keyboard_builder.row(
InlineKeyboardButton(
text = (await Settings.get(Settings.APP_STRINGS_BACK_BTN)),
callback_data = context.pack()))
send_message = get_send_message(message)
await send_message(text = (await Settings.get(Settings.APP_STRINGS_SETTINGS)), reply_markup = keyboard_builder.as_markup())
from ..navigation import pop_navigation_context, get_navigation_context

View File

@@ -0,0 +1,93 @@
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery
from .context import ContextData, CallbackCommand
async def save_navigation_context(callback_data: ContextData, state: FSMContext) -> list[ContextData]:
data = await state.get_data()
stack = [ContextData.unpack(item) for item in data.get("navigation_stack", [])]
data_nc = data.get("navigation_context")
navigation_context = ContextData.unpack(data_nc) if data_nc else None
if callback_data.back:
callback_data.back = False
if stack:
stack.pop()
else:
if stack and navigation_context and navigation_context.command == callback_data.command:
navigation_context = callback_data
elif navigation_context:
stack.append(navigation_context)
await state.update_data({"navigation_stack": [item.pack() for item in stack],
"navigation_context": callback_data.pack()})
return stack
def pop_navigation_context(stack: list[ContextData]) -> ContextData | None:
if stack:
data = stack[-1]
data.back = True
return data
async def get_navigation_context(state: FSMContext) -> tuple[list[ContextData], ContextData | None]:
data = await state.get_data()
data_nc = data.get("navigation_context")
context = ContextData.unpack(data_nc) if data_nc else None
return ([ContextData.unpack(item) for item in data.get("navigation_stack", [])],
context)
async def clear_state(state: FSMContext, clear_nav: bool = False):
if clear_nav:
await state.clear()
else:
state_data = await state.get_data()
stack = state_data.get("navigation_stack")
context = state_data.get("navigation_context")
update_data = {}
if stack:
update_data["navigation_stack"] = stack
if context:
update_data["navigation_context"] = context
await state.clear()
await state.update_data(update_data)
async def route_callback(message: Message | CallbackQuery, back: bool = True, **kwargs):
stack, context = await get_navigation_context(kwargs["state"])
if back:
context = pop_navigation_context(stack)
stack = await save_navigation_context(callback_data = context, state = kwargs["state"])
kwargs.update({"callback_data": context, "navigation_stack": stack})
if context:
if context.command == CallbackCommand.MENU_ENTRY_MAIN:
await main_menu(message, **kwargs)
elif context.command == CallbackCommand.MENU_ENTRY_SETTINGS:
await settings_menu(message, **kwargs)
elif context.command == CallbackCommand.MENU_ENTRY_PARAMETERS:
await parameters_menu(message, **kwargs)
elif context.command == CallbackCommand.MENU_ENTRY_LANGUAGE:
await language_menu(message, **kwargs)
elif context.command == CallbackCommand.MENU_ENTRY_ENTITIES:
await entities_menu(message, **kwargs)
elif context.command == CallbackCommand.ENTITY_LIST:
await entity_list(message, **kwargs)
elif context.command == CallbackCommand.ENTITY_ITEM:
await entity_item(message, **kwargs)
else:
raise ValueError(f"Unknown command {context.command}")
else:
raise ValueError("No navigation context")
from .menu.main import main_menu
from .menu.settings import settings_menu
from .menu.parameters import parameters_menu
from .menu.language import language_menu
from .menu.entities import entities_menu
from .forms.entity_list import entity_list
from .forms.entity_form import entity_item

54
bot/handlers/start.py Normal file
View File

@@ -0,0 +1,54 @@
from aiogram import Router
from aiogram.filters import CommandStart
from aiogram.fsm.context import FSMContext
from aiogram.types import Message
from logging import getLogger
from sqlmodel.ext.asyncio.session import AsyncSession
from ...main import QBotApp
from ...model.settings import Settings
from ...model.language import LanguageBase
from .navigation import clear_state
logger = getLogger(__name__)
router = Router()
@router.message(CommandStart())
async def start(message: Message, db_session: AsyncSession, app: QBotApp, state: FSMContext):
await clear_state(state = state, clear_nav = True)
User = app.user_class
user = await User.get(session = db_session, id = message.from_user.id)
if not user:
msg_text = (await Settings.get(Settings.APP_STRINGS_WELCOME_P_NAME)).format(name = message.from_user.full_name)
try:
if message.from_user.language_code in [item.value for item in LanguageBase.all_members.values()]:
lang = LanguageBase(message.from_user.language_code)
user = await User.create(session = db_session,
obj_in = User(
id = message.from_user.id,
name = message.from_user.full_name,
lang = lang,
is_active = True),
commit = True)
except Exception as e:
logger.error("Error creating user", exc_info = True)
message.answer((await Settings.get(Settings.APP_STRINGS_INTERNAL_ERROR_P_ERROR)).format(error = str(e)))
return
else:
if user.is_active:
msg_text = (await Settings.get(Settings.APP_STRINGS_GREETING_P_NAME)).format(name = user.name)
else:
msg_text = (await Settings.get(Settings.APP_STRINGS_USER_BLOCKED_P_NAME)).format(name = user.name)
await message.answer(msg_text)

80
config/__init__.py Normal file
View File

@@ -0,0 +1,80 @@
from babel.support import LazyProxy
from pydantic import computed_field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Literal, Self
import warnings
class Config(BaseSettings):
model_config = SettingsConfigDict(
env_file = ".env",
env_ignore_empty = True,
extra = "ignore"
)
SECRET_KEY: str = "changethis"
ENVIRONMENT: Literal["local", "staging", "production"] = "local"
DB_NAME: str = "app"
DB_HOST: str = "db"
DB_PORT: int = 5432
DB_USER: str = "app"
DB_PASSWORD: str = "changethis"
@computed_field
@property
def DATABASE_URI(self) -> str:
return f"postgresql+asyncpg://{self.DB_USER}:{self.DB_PASSWORD}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}"
DOMAIN: str
@computed_field
@property
def API_DOMAIN(self) -> str:
if self.ENVIRONMENT == "local":
return self.DOMAIN
return f'api.{self.DOMAIN}'
@computed_field
@property
def API_URL(self) -> str:
if self.USE_NGROK:
return self.NGROK_URL
return f"{"http" if self.ENVIRONMENT == "local" else "https"}://{self.API_DOMAIN}"
API_PORT: int = 8000
TELEGRAM_BOT_TOKEN: str = "changethis"
ADMIN_TELEGRAM_ID: int
USE_NGROK : bool = False
NGROK_AUTH_TOKEN: str = "changethis"
NGROK_URL: str = ""
LOG_LEVEL: str = "DEBUG"
def _check_default_secret(self, var_name: str, value: str | None) -> None:
if value == "changethis":
message = (
f"The value of {var_name} is \"changethis\", "
"for security, please change it, at least for deployments."
)
if self.ENVIRONMENT == "local":
warnings.warn(message, stacklevel=1)
else:
raise ValueError(message)
@model_validator(mode="after")
def _enforce_non_default_secrets(self) -> Self:
self._check_default_secret("SECRET_KEY", self.SECRET_KEY)
self._check_default_secret("DB_PASSWORD", self.DB_PASSWORD)
self._check_default_secret("TELEGRAM_BOT_TOKEN", self.TELEGRAM_BOT_TOKEN)
if self.USE_NGROK:
self._check_default_secret("NGROK_AUTH_TOKEN", self.NGROK_AUTH_TOKEN)
return self
config = Config()

Binary file not shown.

33
db/__init__.py Normal file
View File

@@ -0,0 +1,33 @@
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from ..config import config
import logging
logger = logging.getLogger('sqlalchemy.engine')
logger.setLevel(logging.DEBUG)
async_engine = create_async_engine(config.DATABASE_URI)
async_session = sessionmaker[AsyncSession](
async_engine, class_=AsyncSession, expire_on_commit=False
)
async def init_db(session: AsyncSession) -> None:
from app.models import User, Language,Role
user = (await session.exec(
select(User).where(User.id == config.ADMIN_TELEGRAM_ID)
)).first()
if not user:
await User.create(session, User(
id = config.ADMIN_TELEGRAM_ID,
lang = Language.DEFAULT,
roles = [Role.SUPER_USER],
name = "Admin"), commit = True)
async def get_db() -> AsyncSession: # type: ignore
async with async_session() as session:
yield session

Binary file not shown.

Binary file not shown.

84
fsm/db_storage.py Normal file
View File

@@ -0,0 +1,84 @@
from aiogram.fsm.state import State
from aiogram.fsm.storage.base import BaseStorage, StorageKey, StateType, DefaultKeyBuilder, KeyBuilder
from sqlmodel import select
from typing import Any, Dict
import ujson as json
from ..db import async_session
from ..model.fsm_storage import FSMStorage
class DbStorage(BaseStorage):
def __init__(
self,
key_builder: KeyBuilder | None = None) -> None:
if key_builder is None:
key_builder = DefaultKeyBuilder()
self.key_builder = key_builder
async def set_state(self, key: StorageKey, state: StateType = None) -> None:
db_key = self.key_builder.build(key, "state")
async with async_session() as session:
db_state = (await session.exec(
select(FSMStorage).where(FSMStorage.key == db_key))).first()
if db_state:
if state is None:
await session.delete(db_state)
else:
db_state.value = state.state if isinstance(state, State) else state
elif state is not None:
db_state = FSMStorage(key = db_key, value = state.state if isinstance(state, State) else state)
session.add(db_state)
else:
return
await session.commit()
async def get_state(self, key: StorageKey) -> str | None:
db_key = self.key_builder.build(key, "state")
async with async_session() as session:
db_state = (await session.exec(
select(FSMStorage).where(FSMStorage.key == db_key))).first()
return db_state.value if db_state else None
async def set_data(self, key: StorageKey, data: Dict[str, Any]) -> None:
db_key = self.key_builder.build(key, "data")
async with async_session() as session:
db_data = (await session.exec(
select(FSMStorage).where(FSMStorage.key == db_key))).first()
if db_data:
if not data:
await session.delete(db_data)
else:
db_data.value = json.dumps(data)
elif data:
db_data = FSMStorage(key = db_key, value = json.dumps(data,))
session.add(db_data)
else:
return
await session.commit()
async def get_data(self, key: StorageKey) -> Dict[str, Any]:
db_key = self.key_builder.build(key, "data")
async with async_session() as session:
db_data = (await session.exec(
select(FSMStorage).where(FSMStorage.key == db_key))).first()
return json.loads(db_data.value) if db_data else {}
async def close(self):
return await super().close()

40
lifespan.py Normal file
View File

@@ -0,0 +1,40 @@
from contextlib import asynccontextmanager
from .main import QBotApp
@asynccontextmanager
async def default_lifespan(app: QBotApp):
app.logger.debug("starting qbot app")
if app.config.USE_NGROK:
try:
from pyngrok import ngrok
from pyngrok.conf import PyngrokConfig
except ImportError:
app.logger.error("pyngrok is not installed")
raise
tunnel = ngrok.connect(app.config.API_PORT, pyngrok_config = PyngrokConfig(auth_token = app.config.NGROK_AUTH_TOKEN))
app.config.NGROK_URL = tunnel.public_url
await app.bot.set_webhook(url = f"{app.config.API_URL}/api/telegram/webhook",
drop_pending_updates = True,
allowed_updates = ['message', 'callback_query', 'pre_checkout_query'],
secret_token = app.bot_auth_token)
app.logger.info("qbot app started")
if app.lifespan:
async with app.lifespan(app):
yield
else:
yield
app.logger.info("stopping qbot app")
await app.bot.delete_webhook()
if app.config.USE_NGROK:
ngrok.disconnect(app.config.NGROK_URL)
ngrok.kill()
app.logger.info("qbot app stopped")

72
main.py Normal file
View File

@@ -0,0 +1,72 @@
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from aiogram.utils.callback_answer import CallbackAnswerMiddleware
from aiogram.utils.i18n import I18n
from fastapi import FastAPI
from fastapi.applications import Lifespan, AppType
from logging import getLogger
from secrets import token_hex
from .config import Config
from .fsm.db_storage import DbStorage
from .middleware.telegram import AuthMiddleware, I18nMiddleware, ResetStateMiddleware
from .model.user import UserBase
from .model.entity_metadata import EntityMetadata
class QBotApp(FastAPI):
bot: Bot
dp: Dispatcher
config: Config
logger = getLogger(__name__)
def __init__[UserType: UserBase](self,
user_class: type[UserType] | None = None,
config: Config | None = None,
lifespan: Lifespan[AppType] | None = None,
*args,
**kwargs):
if config is None:
config = Config()
if user_class is None:
from .model.default_user import DefaultUser
user_class = DefaultUser
self.user_class = user_class
self.entity_metadata: EntityMetadata = user_class.entity_metadata
self.config = config
self.lifespan = lifespan
self.bot = Bot(token = self.config.TELEGRAM_BOT_TOKEN, default = DefaultBotProperties(parse_mode = "HTML"))
dp = Dispatcher(storage = DbStorage())
i18n = I18n(path = "locales", default_locale = "en", domain = "messages")
i18n_middleware = I18nMiddleware(user_class = user_class, i18n = i18n)
i18n_middleware.setup(dp)
# dp.callback_query.middleware(ResetStateMiddleware())
dp.callback_query.middleware(CallbackAnswerMiddleware())
from .bot.handlers.start import router as start_router
dp.include_router(start_router)
from .bot.handlers.menu.main import router as main_menu_router
auth = AuthMiddleware(user_class = user_class)
main_menu_router.message.middleware.register(auth)
main_menu_router.callback_query.middleware.register(auth)
dp.include_router(main_menu_router)
self.dp = dp
self.bot_auth_token = token_hex(128)
from .lifespan import default_lifespan
super().__init__(lifespan = default_lifespan, *args, **kwargs)
from .api_route.telegram import router as telegram_router
self.include_router(telegram_router, prefix = "/api/telegram", tags = ["telegram"])

View File

@@ -0,0 +1,3 @@
from .auth import AuthMiddleware
from .i18n import I18nMiddleware
from .reset_state import ResetStateMiddleware

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,40 @@
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject, Message, CallbackQuery
from aiogram.utils.i18n import gettext as _
from babel.support import LazyProxy
from typing import Any, Awaitable, Callable, Dict
from ...model.user import UserBase
class AuthMiddleware(BaseMiddleware):
def __init__[UserType: UserBase](self,
user_class: type[UserType],
not_authenticated_msg: LazyProxy | str = "not authenticated",
not_active_msg: LazyProxy | str = "not active"):
self.user_class = user_class
self.not_authenticated_msg = not_authenticated_msg
self.not_active_msg = not_active_msg
async def __call__(self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]) -> Any:
user = await self.user_class.get(id = event.from_user.id, session = data["db_session"])
if user and user.is_active:
data["user"] = user
return await handler(event, data)
if type(event) in [Message, CallbackQuery]:
if user and not user.is_active:
return await event.answer(self.not_active_msg)
return await event.answer(self.not_authenticated_msg)

View File

@@ -0,0 +1,28 @@
from typing import Optional, Dict, Any
from aiogram.utils.i18n import I18n, SimpleI18nMiddleware
from aiogram.types import TelegramObject
from ...model.user import UserBase
class I18nMiddleware(SimpleI18nMiddleware):
"""
This middleware stores locale in the FSM storage
"""
def __init__[UserType: UserBase](
self,
user_class: type[UserType],
i18n: I18n,
i18n_key: Optional[str] = "i18n",
middleware_key: str = "i18n_middleware",
) -> None:
self.user_class = user_class
super().__init__(i18n=i18n, i18n_key=i18n_key, middleware_key=middleware_key)
async def get_locale(self, event: TelegramObject, data: Dict[str, Any]) -> str:
db_session = data.get("db_session")
if db_session and event.model_fields.get("from_user"):
user = await self.user_class.get(id = event.from_user.id, session = db_session)
if user and user.lang:
return user.lang
return await super().get_locale(event=event, data=data)

View File

@@ -0,0 +1,30 @@
from typing import Any, Awaitable, Callable, Dict
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject
from aiogram.fsm.context import FSMContext
from aiogram.utils.i18n import gettext as _
from ...bot.handlers.context import ContextData
class ResetStateMiddleware(BaseMiddleware):
async def __call__(self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]) -> Any:
save_state = False
callback_data = data.get("callback_data")
if isinstance(callback_data, ContextData):
save_state = callback_data.save_state
if not save_state:
state = data.get("state")
if isinstance(state, FSMContext):
await state.clear()
return await handler(event, data)

46
model/__init__.py Normal file
View File

@@ -0,0 +1,46 @@
from functools import wraps
from sqlalchemy.inspection import inspect
from sqlalchemy.orm.state import InstanceState
from typing import cast
from .bot_enum import BotEnum, EnumMember
from ..db import async_session
class EntityPermission(BotEnum):
LIST = EnumMember("list")
READ = EnumMember("read")
CREATE = EnumMember("create")
UPDATE = EnumMember("update")
DELETE = EnumMember("delete")
LIST_ALL = EnumMember("list_all")
READ_ALL = EnumMember("read_all")
CREATE_ALL = EnumMember("create_all")
UPDATE_ALL = EnumMember("update_all")
DELETE_ALL = EnumMember("delete_all")
def session_dep(func):
@wraps(func)
async def wrapper(cls, *args, **kwargs):
if "session" in kwargs and kwargs["session"]:
return await func(cls, *args, **kwargs)
_session = None
state = cast(InstanceState, inspect(cls))
if hasattr(state, "async_session"):
_session = state.async_session
if not _session:
async with async_session() as session:
kwargs["session"] = session
return await func(cls, *args, **kwargs)
else:
kwargs["session"] = _session
return await func(cls, *args, **kwargs)
return wrapper

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

6
model/_singleton.py Normal file
View File

@@ -0,0 +1,6 @@
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]

241
model/bot_entity.py Normal file
View File

@@ -0,0 +1,241 @@
from functools import wraps
from typing import ClassVar, cast, get_args, get_origin
from pydantic import BaseModel
from sqlmodel import SQLModel, BIGINT, Field, select, func
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel.main import SQLModelMetaclass, RelationshipInfo
from .descriptors import EntityDescriptor, EntityField, EntityFieldDescriptor
from .entity_metadata import EntityMetadata
from . import session_dep
class BotEntityMetaclass(SQLModelMetaclass):
__future_references__ = {}
def __new__(mcs, name, bases, namespace, **kwargs):
bot_fields_descriptors = {}
if bases:
bot_entity_descriptor = bases[0].__dict__.get('bot_entity_descriptor')
bot_fields_descriptors = {key: EntityFieldDescriptor(**value.__dict__.copy())
for key, value in bot_entity_descriptor.fields_descriptors.items()} if bot_entity_descriptor else {}
if '__annotations__' in namespace:
for annotation in namespace['__annotations__']:
if annotation in ["bot_entity_descriptor", "entity_metadata"]:
continue
attribute_value = namespace.get(annotation)
if isinstance(attribute_value, RelationshipInfo):
continue
descriptor_kwargs = {}
descriptor_name = annotation
if attribute_value:
if isinstance(attribute_value, EntityField):
descriptor_kwargs = attribute_value.__dict__.copy()
sm_descriptor = descriptor_kwargs.pop("sm_descriptor", None)
if sm_descriptor:
namespace[annotation] = sm_descriptor
else:
namespace.pop(annotation)
descriptor_name = descriptor_kwargs.pop("name") or annotation
type_ = namespace['__annotations__'][annotation]
field_descriptor = EntityFieldDescriptor(
name = descriptor_name,
field_name = annotation,
type_ = type_,
**descriptor_kwargs)
type_origin = get_origin(type_)
is_list = False
if type_origin == list:
is_list = True
type_ = get_args(type_)[0]
if isinstance(type_, str):
type_not_found = True
for entity_descriptor in EntityMetadata().entity_descriptors.values():
if type_ == entity_descriptor.class_name:
field_descriptor.type_ = list[entity_descriptor.type_] if is_list else entity_descriptor.type_
type_not_found = False
break
if type_not_found:
if type_ in mcs.__future_references__:
mcs.__future_references__[type_].append(field_descriptor)
else:
mcs.__future_references__[type_] = [field_descriptor]
bot_fields_descriptors[descriptor_name] = field_descriptor
descriptor_name = name
if "bot_entity_descriptor" in namespace:
entity_descriptor = namespace.pop("bot_entity_descriptor")
descriptor_kwargs: dict = entity_descriptor.__dict__.copy()
descriptor_name = descriptor_kwargs.pop("name", None)
descriptor_name = descriptor_name or name.lower()
descriptor_fields_sequence = descriptor_kwargs.pop("field_sequence", None)
if not descriptor_fields_sequence:
descriptor_fields_sequence = list(bot_fields_descriptors.keys())
descriptor_fields_sequence.remove("id")
namespace["bot_entity_descriptor"] = EntityDescriptor(
name = descriptor_name,
class_name = name,
type_ = name,
fields_descriptors = bot_fields_descriptors,
field_sequence = descriptor_fields_sequence,
**descriptor_kwargs)
else:
descriptor_fields_sequence = list(bot_fields_descriptors.keys())
descriptor_fields_sequence.remove("id")
descriptor_name = name.lower()
namespace["bot_entity_descriptor"] = EntityDescriptor(
name = descriptor_name,
class_name = name,
type_ = name,
fields_descriptors = bot_fields_descriptors,
field_sequence = descriptor_fields_sequence)
for field_descriptor in bot_fields_descriptors.values():
field_descriptor.entity_descriptor = namespace["bot_entity_descriptor"]
if "table" not in kwargs:
kwargs["table"] = True
if kwargs["table"] == True:
entity_metadata = EntityMetadata()
entity_metadata.entity_descriptors[descriptor_name] = namespace["bot_entity_descriptor"]
if "__annotations__" in namespace:
namespace["__annotations__"]["entity_metadata"] = ClassVar[EntityMetadata]
else:
namespace["__annotations__"] = {"entity_metadata": ClassVar[EntityMetadata]}
namespace["entity_metadata"] = entity_metadata
type_ = super().__new__(mcs, name, bases, namespace, **kwargs)
if name in mcs.__future_references__:
for field_descriptor in mcs.__future_references__[name]:
field_descriptor.type_ = list[type_] if get_origin(field_descriptor.type_) == list else type_
a = field_descriptor
setattr(namespace["bot_entity_descriptor"], "type_", type_)
return type_
class BotEntity[CreateSchemaType: BaseModel,
UpdateSchemaType: BaseModel](SQLModel,
metaclass = BotEntityMetaclass,
table = False):
bot_entity_descriptor: ClassVar[EntityDescriptor]
entity_metadata: ClassVar[EntityMetadata]
id: int = Field(
primary_key = True,
sa_type = BIGINT)
name: str
@classmethod
@session_dep
async def get(cls, *,
session: AsyncSession | None = None,
id: int):
return await session.get(cls, id)
@classmethod
@session_dep
async def get_count(cls, *,
session: AsyncSession | None = None) -> int:
return await session.scalar(select(func.count()).select_from(cls))
@classmethod
@session_dep
async def get_multi(cls, *,
session: AsyncSession | None = None,
order_by = None,
skip: int = 0,
limit: int = None):
select_statement = select(cls).offset(skip)
if limit:
select_statement = select_statement.limit(limit)
if order_by:
select_statement = select_statement.order_by(order_by)
return (await session.exec(select_statement)).all()
@classmethod
@session_dep
async def create(cls, *,
session: AsyncSession | None = None,
obj_in: CreateSchemaType,
commit: bool = False):
if isinstance(obj_in, cls):
obj = obj_in
else:
obj = cls(**obj_in.model_dump())
session.add(obj)
if commit:
await session.commit()
return obj
@classmethod
@session_dep
async def update(cls, *,
session: AsyncSession | None = None,
id: int,
obj_in: UpdateSchemaType,
commit: bool = False):
obj = await session.get(cls, id)
if obj:
obj_data = obj.model_dump()
update_data = obj_in.model_dump(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(obj, field, update_data[field])
session.add(obj)
if commit:
await session.commit()
return obj
return None
@classmethod
@session_dep
async def remove(cls, *,
session: AsyncSession | None = None,
id: int,
commit: bool = False):
obj = await session.get(cls, id)
if obj:
await session.delete(obj)
if commit:
await session.commit()
return obj
return None

147
model/bot_enum.py Normal file
View File

@@ -0,0 +1,147 @@
from pydantic_core.core_schema import str_schema
from sqlalchemy.types import TypeDecorator, String
from typing import Any, Self, overload
class BotEnumMetaclass(type):
def __new__(cls, name: str, bases: tuple[type], namespace: dict[str, Any]):
all_members = {}
if bases and bases[0].__name__ != "BotEnum" and "all_members" in bases[0].__dict__:
all_members = bases[0].__dict__["all_members"]
annotations = {}
for key, value in namespace.items():
if (key.isupper() and
not key.startswith("__") and
not key.endswith("__")):
if not isinstance(value, EnumMember):
value = EnumMember(value, None)
if key in all_members.keys() and all_members[key].value != value.value:
raise ValueError(f"Enum member {key} already exists with different value. Use same value to extend it.")
if (value.value in [member.value for member in all_members.values()] and
key not in all_members.keys()):
raise ValueError(f"Duplicate enum value {value[0]}")
member = EnumMember(value = value.value, loc_obj = value.loc_obj, parent = None, name = key, casting = False)
namespace[key] = member
all_members[key] = member
annotations[key] = type(member)
namespace["__annotations__"] = annotations
namespace["all_members"] = all_members
type_ = super().__new__(cls, name, bases, namespace)
for key, value in all_members.items():
if not value._parent:
value._parent = type_
return type_
class EnumMember(object):
@overload
def __init__(self, value: str) -> "EnumMember":...
@overload
def __init__(self, value: "EnumMember") -> "EnumMember":...
@overload
def __init__(self, value: str, loc_obj: dict[str, str]) -> "EnumMember":...
def __init__(self,
value: str = None,
loc_obj: dict[str, str] = None,
parent: type = None,
name: str = None,
casting: bool = True) -> "EnumMember":
if not casting:
self._parent = parent
self._name = name
self.value = value
self.loc_obj = loc_obj
@overload
def __new__(cls: Self, *args, **kwargs) -> "EnumMember":...
def __new__(cls, *args, casting: bool = True, **kwargs) -> "EnumMember":
if (cls.__name__ == "EnumMember") or not casting:
obj = super().__new__(cls)
kwargs["casting"] = False
obj.__init__(*args, **kwargs)
return obj
if args.__len__() == 0:
return list(cls.all_members.values())[0]
if args.__len__() == 1 and isinstance(args[0], str):
return {member.value: member for key, member in cls.all_members.items()}[args[0]]
elif args.__len__() == 1:
return {member.value: member for key, member in cls.all_members.items()}[args[0].value]
else:
return args[0]
def __get_pydantic_core_schema__(cls, *args, **kwargs):
return str_schema()
def __get__(self, instance, owner) -> Self:
# return {member.value: member for key, member in owner.all_members.items()}[self.value]
return {member.value: member for key, member in self._parent.all_members.items()}[self.value]
def __set__(self, instance, value):
instance.__dict__[self] = value
def __repr__(self):
return f"<{self._parent.__name__ if self._parent else "EnumMember"}.{self._name}: '{self.value}'>"
def __str__(self):
return self.value
def __eq__(self, other : Self | str) -> bool:
if other is None:
return False
if isinstance(other, str):
return self.value == other
return self.value == other.value
def __hash__(self):
return hash(self.value)
def localized(self, lang: str = None) -> str:
if self.loc_obj and len(self.loc_obj) > 0:
if lang and lang in self.loc_obj.keys():
return self.loc_obj[lang]
else:
return self.loc_obj[list(self.loc_obj.keys())[0]]
return self.value
class BotEnum(EnumMember, metaclass = BotEnumMetaclass):
all_members: dict[str, EnumMember]
class EnumType(TypeDecorator):
impl = String(256)
def __init__(self, enum_type: BotEnum):
self._enum_type = enum_type
super().__init__()
def process_bind_param(self, value, dialect):
if value and isinstance(value, EnumMember):
return value.value
return None
def process_result_value(self, value, dialect):
if value:
return self._enum_type(value)
return None

4
model/default_user.py Normal file
View File

@@ -0,0 +1,4 @@
from .user import UserBase
class DefaultUser(UserBase): ...

90
model/descriptors.py Normal file
View File

@@ -0,0 +1,90 @@
from typing import Any, Callable
from babel.support import LazyProxy
from dataclasses import dataclass, field
from .role import RoleBase
from . import EntityPermission
EntityCaptionCallable = Callable[["EntityDescriptor"], str]
EntityItemCaptionCallable = Callable[["EntityDescriptor", Any], str]
EntityFieldCaptionCallable = Callable[["EntityFieldDescriptor", Any, Any], str]
@dataclass(kw_only = True)
class _BaseEntityFieldDescriptor():
icon: str = None
caption_str: str | LazyProxy | EntityFieldCaptionCallable | None = None
caption_btn: str | LazyProxy | EntityFieldCaptionCallable | None = None
description: str | LazyProxy | EntityFieldCaptionCallable | None = None
edit_prompt: str | LazyProxy | EntityFieldCaptionCallable | None = None
caption_value_str: str | LazyProxy | EntityFieldCaptionCallable | None = None
caption_value_btn: str | LazyProxy | EntityFieldCaptionCallable | None = None
is_visible: bool = True
localizable: bool = False
bool_false_value: str | LazyProxy = "no"
bool_false_value_btn: str | LazyProxy = "no"
bool_true_value: str | LazyProxy = "yes"
bool_true_value_btn: str | LazyProxy = "yes"
default: Any = None
@dataclass(kw_only = True)
class EntityField(_BaseEntityFieldDescriptor):
name: str | None = None
sm_descriptor: Any = None
@dataclass(kw_only = True)
class Setting(_BaseEntityFieldDescriptor):
name: str | None = None
@dataclass(kw_only = True)
class EntityFieldDescriptor(_BaseEntityFieldDescriptor):
name: str
field_name: str
type_: type
entity_descriptor: "EntityDescriptor" = None
def __hash__(self):
return self.name.__hash__()
@dataclass(kw_only = True)
class _BaseEntityDescriptor:
icon: str = "📘"
caption_msg: str | LazyProxy | EntityCaptionCallable | None = None
caption_btn: str | LazyProxy | EntityCaptionCallable | None = None
description: str | LazyProxy | EntityCaptionCallable | None = None
item_caption_msg: EntityItemCaptionCallable | None = None
item_caption_btn: EntityItemCaptionCallable | None = None
show_in_entities_menu: bool = True
field_sequence: list[str] = None
permissions: dict[EntityPermission, list[RoleBase]] = field(default_factory = lambda: {
EntityPermission.LIST: [RoleBase.DEFAULT_USER, RoleBase.SUPER_USER],
EntityPermission.READ: [RoleBase.DEFAULT_USER, RoleBase.SUPER_USER],
EntityPermission.CREATE: [RoleBase.DEFAULT_USER, RoleBase.SUPER_USER],
EntityPermission.UPDATE: [RoleBase.DEFAULT_USER, RoleBase.SUPER_USER],
EntityPermission.DELETE: [RoleBase.DEFAULT_USER, RoleBase.SUPER_USER],
EntityPermission.LIST_ALL: [RoleBase.SUPER_USER],
EntityPermission.READ_ALL: [RoleBase.SUPER_USER],
EntityPermission.CREATE_ALL: [RoleBase.SUPER_USER],
EntityPermission.UPDATE_ALL: [RoleBase.SUPER_USER],
EntityPermission.DELETE_ALL: [RoleBase.SUPER_USER]
})
@dataclass(kw_only = True)
class Entity(_BaseEntityDescriptor):
name: str | None = None
@dataclass
class EntityDescriptor(_BaseEntityDescriptor):
name: str
class_name: str
type_: type
fields_descriptors: dict[str, EntityFieldDescriptor]

8
model/entity_metadata.py Normal file
View File

@@ -0,0 +1,8 @@
from .descriptors import EntityDescriptor
from ._singleton import Singleton
class EntityMetadata(metaclass = Singleton):
def __init__(self):
self.entity_descriptors: dict[str, EntityDescriptor] = {}

45
model/field_types.py Normal file
View File

@@ -0,0 +1,45 @@
from dataclasses import dataclass
from babel.support import LazyProxy
from typing import TypeVar
from .bot_entity import BotEntity
@dataclass
class FieldType: ...
class LocStr(str): ...
class String(FieldType):
localizable: bool = False
class Integer(FieldType):
pass
@dataclass
class Decimal:
precision: int = 0
@dataclass
class Boolean:
true_value: str | LazyProxy = "true"
false_value: str | LazyProxy = "false"
@dataclass
class DateTime:
pass
EntityType = TypeVar('EntityType', bound = BotEntity)
@dataclass
class EntityReference:
entity_type: type[EntityType]

8
model/fsm_storage.py Normal file
View File

@@ -0,0 +1,8 @@
from sqlmodel import SQLModel, Field
class FSMStorage(SQLModel, table = True):
__tablename__ = "fsm_storage"
key: str = Field(primary_key = True)
value: str | None = None

6
model/language.py Normal file
View File

@@ -0,0 +1,6 @@
from .bot_enum import BotEnum, EnumMember
class LanguageBase(BotEnum):
DEFAULT = EnumMember("en", {"en": "🇬🇧 english"})

44
model/owned_bot_entity.py Normal file
View File

@@ -0,0 +1,44 @@
from sqlmodel import BIGINT, Field, select, func
from sqlmodel.ext.asyncio.session import AsyncSession
from .bot_entity import BotEntity
from .descriptors import EntityField
from .user import UserBase
from . import session_dep
class OwnedBotEntity(BotEntity, table = False):
user_id: int | None = EntityField(
sm_descriptor = Field(sa_type = BIGINT, foreign_key = "user.id", ondelete="SET NULL"),
is_visible = False)
@classmethod
@session_dep
async def get_multi_by_user(cls, *,
session: AsyncSession | None = None,
user_id: int,
order_by = None,
skip: int = 0,
limit: int = None):
select_statement = select(cls).where(cls.user_id == user_id).offset(skip)
if limit:
select_statement = select_statement.limit(limit)
if order_by:
select_statement = select_statement.order_by(order_by)
return (await session.exec(select_statement)).all()
@classmethod
@session_dep
async def get_count_by_user(cls, *,
session: AsyncSession | None = None,
user_id: int):
return await session.scalar(
select(func.count()).
select_from(cls).
where(cls.user_id == user_id))

7
model/role.py Normal file
View File

@@ -0,0 +1,7 @@
from .bot_enum import BotEnum, EnumMember
class RoleBase(BotEnum):
SUPER_USER = EnumMember("super_user")
DEFAULT_USER = EnumMember("default_user")

183
model/settings.py Normal file
View File

@@ -0,0 +1,183 @@
from datetime import datetime
from sqlmodel import SQLModel, Field, select
from typing import Any, get_origin
from ..db import async_session
from .role import RoleBase
from .descriptors import EntityFieldDescriptor, Setting
from ..utils import deserialize, serialize
class DbSettings(SQLModel, table = True):
__tablename__ = "settings"
name: str = Field(primary_key = True)
value: str
class SettingsMetaclass(type):
def __new__(cls, class_name, base_classes, attributes):
settings_descriptors = {}
if base_classes:
settings_descriptors = base_classes[0].__dict__.get("_settings_descriptors", {})
for annotation in attributes['__annotations__']:
if annotation in ["_settings_descriptors", "_cache", "_cached_settings"]:
continue
attr_value = attributes.get(annotation)
name = annotation
if isinstance(attr_value, Setting):
descriptor_kwargs = attr_value.__dict__.copy()
name = descriptor_kwargs.pop("name") or annotation
attributes[annotation] = EntityFieldDescriptor(
name = name,
field_name = annotation,
type_ = attributes['__annotations__'][annotation],
**descriptor_kwargs)
else:
attributes[annotation] = EntityFieldDescriptor(
name = annotation,
field_name = annotation,
type_ = attributes['__annotations__'][annotation],
default = attr_value)
settings_descriptors[name] = attributes[annotation]
attributes["__annotations__"] = {}
attributes["_settings_descriptors"] = settings_descriptors
return super().__new__(cls, class_name, base_classes, attributes)
class Settings(metaclass = SettingsMetaclass):
_cache: dict[str, Any] = dict[str, Any]()
_settings_descriptors: dict[str, EntityFieldDescriptor] = {}
PAGE_SIZE: int = Setting(default = 10, )
SECURITY_SETTINGS_ROLES: list[RoleBase] = [RoleBase.SUPER_USER]
APP_STRINGS_WELCOME_P_NAME: str = Setting(name = "AS_WELCOME", default = "Welcome, {name}", is_visible = False)
APP_STRINGS_GREETING_P_NAME: str = Setting(name = "AS_GREETING", default = "Hello, {name}", is_visible = False)
APP_STRINGS_INTERNAL_ERROR_P_ERROR: str = Setting(name = "AS_INTERNAL_ERROR", default = "Internal error\n{error}", is_visible = False)
APP_STRINGS_USER_BLOCKED_P_NAME: str = Setting(name = "AS_USER_BLOCKED", default = "User {name} is blocked", is_visible = False)
APP_STRINGS_FORBIDDEN: str = Setting(name = "AS_FORBIDDEN", default = "Forbidden", is_visible = False)
APP_STRINGS_NOT_FOUND: str = Setting(name = "AS_NOT_FOUND", default = "Object not found", is_visible = False)
APP_STRINGS_MAIN_NENU: str = Setting(name = "AS_MAIN_MENU", default = "Main menu", is_visible = False)
APP_STRINGS_REFERENCES: str = Setting(name = "AS_REFERENCES", default = "References", is_visible = False)
APP_STRINGS_REFERENCES_BTN: str = Setting(name = "AS_REFERENCES_BTN", default = "📚 References", is_visible = False)
APP_STRINGS_SETTINGS: str = Setting(name = "AS_SETTINGS", default = "Settings", is_visible = False)
APP_STRINGS_SETTINGS_BTN: str = Setting(name = "AS_SETTINGS_BTN", default = "⚙️ Settings", is_visible = False)
APP_STRINGS_PARAMETERS: str = Setting(name = "AS_PARAMETERS", default = "Parameters", is_visible = False)
APP_STRINGS_PARAMETERS_BTN: str = Setting(name = "AS_PARAMETERS_BTN", default = "🎛️ Parameters", is_visible = False)
APP_STRINGS_LANGUAGE: str = Setting(name = "AS_LANGUAGE", default = "Language", is_visible = False)
APP_STRINGS_LANGUAGE_BTN: str = Setting(name = "AS_LANGUAGE_BTN", default = "🗣️ Language", is_visible = False)
APP_STRINGS_BACK_BTN: str = Setting(name = "AS_BACK_BTN", default = "⬅️ Back", is_visible = False)
APP_STRINGS_DELETE_BTN: str = Setting(name = "AS_DELETE_BTN", default = "🗑️ Delete", is_visible = False)
APP_STRINGS_CONFIRM_DELETE_P_NAME: str = Setting(
name = "AS_CONFIRM_DEL",
default = "Are you sure you want to delete \"{name}\"?",
is_visible = False)
APP_STRINGS_EDIT_BTN: str = Setting(name = "AS_EDIT_BTN", default = "✏️ Edit", is_visible = False)
APP_STRINGS_ADD_BTN: str = Setting(name = "AS_ADD_BTN", default = " Add", is_visible = False)
APP_STRINGS_YES_BTN: str = Setting(name = "AS_YES_BTN", default = "✅ Yes", is_visible = False)
APP_STRINGS_NO_BTN: str = Setting(name = "AS_NO_BTN", default = "❌ No", is_visible = False)
APP_STRINGS_CANCEL_BTN: str = Setting(name = "AS_CANCEL_BTN", default = "❌ Cancel", is_visible = False)
APP_STRINGS_DONE_BTN: str = Setting(name = "AS_DONE_BTN", default = "✅ Done", is_visible = False)
APP_STRINGS_SKIP_BTN: str = Setting(name = "AS_SKIP_BTN", default = "⏩️ Skip", is_visible = False)
APP_STRINGS_FIELD_EDIT_PROMPT_TEMPLATE_P_NAME_VALUE: str = Setting(
name = "AS_FIELDEDIT_PROMPT",
default = "Enter new value for \"{name}\" (current value: {value})",
is_visible = False)
APP_STRINGS_FIELD_CREATE_PROMPT_TEMPLATE_P_NAME: str = Setting(
name = "AS_FIELDCREATE_PROMPT",
default = "Enter new value for \"{name}\"",
is_visible = False)
APP_STRINGS_STRING_EDITOR_LOCALE_TEMPLATE_P_NAME: str = Setting(
name = "AS_STREDIT_LOC_TEMPLATE",
default = "string for \"{name}\"",
is_visible = False)
APP_STRINGS_INVALID_INPUT: str = Setting(name = "AS_INVALID_INPUT", default = "Invalid input", is_visible = False)
@classmethod
async def get[T](cls, param: T) -> T:
name = param.field_name
if param.name not in cls._cache.keys():
cls._cache[name] = await cls.load_param(param)
return cls._cache[name]
@classmethod
async def load_param(cls, param: EntityFieldDescriptor) -> Any:
async with async_session() as session:
db_setting = (await session.exec(
select(DbSettings)
.where(DbSettings.name == param.field_name))).first()
if db_setting:
return await deserialize(session = session,
type_ = param.type_,
value = db_setting.value)
return (param.default if param.default else
[] if (get_origin(param.type_) is list or param.type_ == list) else
datetime(2000, 1, 1) if param.type_ == datetime else
param.type_())
@classmethod
async def load_params(cls):
async with async_session() as session:
db_settings = (await session.exec(select(DbSettings))).all()
for db_setting in db_settings:
if db_setting.name in cls.__dict__:
setting = cls.__dict__[db_setting.name]
cls._cache[db_setting.name] = await deserialize(session = session,
type_ = setting.type_,
value = db_setting.value,
default = setting.default)
cls._loaded = True
@classmethod
async def set_param(cls, param: str | EntityFieldDescriptor, value) -> None:
if isinstance(param, str):
param = cls._settings_descriptors[param]
ser_value = serialize(value, param)
async with async_session() as session:
db_setting = (await session.exec(
select(DbSettings)
.where(DbSettings.name == param.field_name))).first()
if db_setting is None:
db_setting = DbSettings(name = param.field_name)
db_setting.value = str(ser_value)
session.add(db_setting)
await session.commit()
cls._cache[param.field_name] = value
@classmethod
def list_params(cls) -> dict[str, EntityFieldDescriptor]:
return cls._settings_descriptors
@classmethod
async def get_params(cls) -> dict[EntityFieldDescriptor, Any]:
params = cls.list_params()
return {param: await cls.get(param) for _, param in params.items()}

19
model/user.py Normal file
View File

@@ -0,0 +1,19 @@
from sqlmodel import Field, ARRAY
from .bot_entity import BotEntity
from .bot_enum import EnumType
from .language import LanguageBase
from .role import RoleBase
from .settings import DbSettings as DbSettings
from .fsm_storage import FSMStorage as FSMStorage
class UserBase(BotEntity, table = False):
__tablename__ = "user"
lang: LanguageBase = Field(sa_type = EnumType(LanguageBase), default = LanguageBase.DEFAULT)
is_active: bool = True
roles: list[RoleBase] = Field(sa_type=ARRAY(EnumType(RoleBase)), default = [RoleBase.DEFAULT_USER])

103
utils/__init__.py Normal file
View File

@@ -0,0 +1,103 @@
from datetime import datetime
from decimal import Decimal
from types import NoneType, UnionType
from sqlmodel import select, column
from sqlmodel.ext.asyncio.session import AsyncSession
from typing import Any, get_origin, get_args, TYPE_CHECKING
import ujson as json
from ..model.bot_entity import BotEntity
from ..model.bot_enum import BotEnum
from ..model.descriptors import EntityFieldDescriptor, EntityDescriptor
from ..model import EntityPermission
if TYPE_CHECKING:
from ..model.user import UserBase
async def deserialize[T](session: AsyncSession, type_: type[T], value: str = None) -> T:
type_origin = get_origin(type_)
is_optional = False
if type_origin == UnionType:
args = get_args(type_)
if args[1] == NoneType:
type_ = args[0]
if value is None:
return None
is_optional = True
if get_origin(type_) == list:
arg_type = None
args = get_args(type_)
if args:
arg_type = args[0]
values = json.loads(value) if value else []
if arg_type:
if issubclass(arg_type, BotEntity):
ret = list[arg_type]()
items = (await session.exec(select(arg_type).where(column("id").in_(values)))).all()
for item in items:
ret.append(item)
return ret
elif issubclass(arg_type, BotEnum):
return [arg_type(value) for value in values]
else:
return [arg_type(value) for value in values]
else:
return values
elif issubclass(type_, BotEntity):
return await session.get(type_, int(value))
elif issubclass(type_, BotEnum):
if is_optional and not value:
return None
return type_(value)
elif type_ == datetime:
if is_optional and not value:
return None
return datetime.fromisoformat(value)
elif type_ == bool:
return value == "True"
elif type_ == Decimal:
if is_optional and not value:
return None
return Decimal(value)
if is_optional and not value:
return None
return type_(value)
def serialize(value: Any, field_descriptor: EntityFieldDescriptor) -> str:
if value is None:
return ""
type_ = field_descriptor.type_
type_origin = get_origin(type_)
if type_origin == UnionType:
args = get_args(type_)
if args[1] == NoneType:
type_ = get_args(type_)[0]
if type_origin == list:
arg_type = None
args = get_args(type_)
if args:
arg_type = args[0]
if arg_type and issubclass(arg_type, BotEntity):
return json.dumps([item.id for item in value])
elif arg_type and issubclass(arg_type, BotEnum):
return json.dumps([item.value for item in value])
else:
return json.dumps(value)
elif issubclass(type_, BotEntity):
return str(value.id) if value else ""
return str(value)
def get_user_permissions(user: "UserBase", entity_descriptor: EntityDescriptor) -> list[EntityPermission]:
permissions = list[EntityPermission]()
for permission, roles in entity_descriptor.permissions.items():
for role in roles:
if role in user.roles:
permissions.append(permission)
break
return permissions

Binary file not shown.