init
This commit is contained in:
1
.python-version
Normal file
1
.python-version
Normal file
@@ -0,0 +1 @@
|
||||
3.13
|
||||
20
LICENSE
20
LICENSE
@@ -1,9 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 BotForge
|
||||
Copyright (c) 2025 Alexander Kalinovsky
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
10
README.md
10
README.md
@@ -1,3 +1,13 @@
|
||||
# quickbot-agent
|
||||
|
||||
GPT-based function calling and dialogue orchestration plugin for the QuickBot framework.
|
||||
|
||||
quickbot-tools is a plugin for the QuickBot framework that enables natural language understanding and structured function execution using OpenAI GPT models.
|
||||
It provides a universal mechanism for:
|
||||
• Interpreting user requests in plain language
|
||||
• Resolving entities from business data
|
||||
• Managing multi-step dialogues with function chaining
|
||||
• Executing registered business methods via structured tool calls
|
||||
• Returning AI-generated natural language summaries of results
|
||||
|
||||
Ideal for AI assistants that act as business logic mediators, schedulers, or interactive client-facing agents.
|
||||
9
pyproject.toml
Normal file
9
pyproject.toml
Normal file
@@ -0,0 +1,9 @@
|
||||
[project]
|
||||
name = "quickbot-agent"
|
||||
version = "0.1.0"
|
||||
description = "Add your description here"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
"openai>=1.97.1",
|
||||
]
|
||||
2
src/quickbot_agent/__init__.py
Normal file
2
src/quickbot_agent/__init__.py
Normal file
@@ -0,0 +1,2 @@
|
||||
from .model.message_log import MessageLog as MessageLog
|
||||
from .main import AgentPlugin as AgentPlugin
|
||||
14
src/quickbot_agent/config.py
Normal file
14
src/quickbot_agent/config.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
|
||||
class Config(BaseSettings):
|
||||
model_config = SettingsConfigDict(
|
||||
env_file=".env", env_ignore_empty=True, extra="ignore"
|
||||
)
|
||||
|
||||
OPENAI_API_KEY: str
|
||||
OPENAI_MODEL: str = "gpt-4o-2024-11-20"
|
||||
MESSAGE_HISTORY_DEPTH: int = 10
|
||||
|
||||
|
||||
config = Config()
|
||||
323
src/quickbot_agent/handlers/messages.py
Normal file
323
src/quickbot_agent/handlers/messages.py
Normal file
@@ -0,0 +1,323 @@
|
||||
from inspect import iscoroutinefunction
|
||||
from aiogram import Router
|
||||
from aiogram.types import Message
|
||||
from aiogram.utils.i18n.lazy_proxy import LazyProxy
|
||||
from fastapi.datastructures import State
|
||||
from openai import AsyncOpenAI
|
||||
from openai.types.chat import ChatCompletionMessage
|
||||
from pydantic import BaseModel
|
||||
from quickbot.model.user import UserBase
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
from quickbot import QBotApp, BotContext
|
||||
from quickbot.model.crud_service import ForbiddenError, NotFoundError
|
||||
from quickbot.model.crud_command import CrudCommand
|
||||
from ..utils import get_message_log, add_message_log, generate_gpt_tools_schemas
|
||||
from typing import TYPE_CHECKING
|
||||
import ujson as json
|
||||
from ..config import config
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..main import AgentPlugin
|
||||
|
||||
router = Router()
|
||||
|
||||
|
||||
async def handle_openai_text(context: BotContext, output: ChatCompletionMessage):
|
||||
if output.content:
|
||||
await context.message.answer(output.content)
|
||||
await add_message_log(
|
||||
db_session=context.db_session,
|
||||
user=context.user,
|
||||
client_id=context.message.chat.id
|
||||
if context.message.business_connection_id
|
||||
else None,
|
||||
content=json.dumps(
|
||||
{"role": "assistant", "content": output.content}, ensure_ascii=False
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def handle_openai_cycle(
|
||||
context: BotContext,
|
||||
client: AsyncOpenAI,
|
||||
plugin: "AgentPlugin",
|
||||
messages: list[dict],
|
||||
):
|
||||
max_iterations = 10 # Prevent infinite loops
|
||||
iteration = 0
|
||||
|
||||
while iteration < max_iterations:
|
||||
iteration += 1
|
||||
|
||||
# response = await client.responses.create(
|
||||
# background=False,
|
||||
# instructions=system_prompt,
|
||||
# input=messages,
|
||||
# tools=plugin.gpt_tools_metadata,
|
||||
# tool_choice="auto",
|
||||
# model=config.OPENAI_MODEL,
|
||||
# )
|
||||
|
||||
if callable(plugin.tools_context):
|
||||
if iscoroutinefunction(plugin.tools_context):
|
||||
tools_context = await plugin.tools_context(context)
|
||||
else:
|
||||
tools_context = plugin.tools_context(context)
|
||||
gpt_tools_metadata = generate_gpt_tools_schemas(
|
||||
context.app.bot_metadata, tools_context
|
||||
)
|
||||
else:
|
||||
gpt_tools_metadata = plugin._gpt_tools_metadata
|
||||
|
||||
response = await client.chat.completions.create(
|
||||
model=config.OPENAI_MODEL,
|
||||
messages=messages,
|
||||
tools=gpt_tools_metadata,
|
||||
tool_choice="auto",
|
||||
)
|
||||
|
||||
has_function_calls = False
|
||||
for output in response.choices:
|
||||
if output.message.content:
|
||||
await handle_openai_text(context=context, output=output.message)
|
||||
messages.append(output.message)
|
||||
elif output.message.tool_calls:
|
||||
tool_call = output.message.tool_calls[0]
|
||||
has_function_calls = True
|
||||
tool_call_message = {
|
||||
"role": "assistant",
|
||||
"tool_calls": [
|
||||
{
|
||||
"type": "function",
|
||||
"id": tool_call.id,
|
||||
"function": tool_call.function.model_dump(mode="json"),
|
||||
}
|
||||
],
|
||||
}
|
||||
await add_message_log(
|
||||
db_session=context.db_session,
|
||||
user=context.user,
|
||||
client_id=context.message.chat.id
|
||||
if context.message.business_connection_id
|
||||
else None,
|
||||
content=json.dumps(tool_call_message, ensure_ascii=False),
|
||||
)
|
||||
messages.append(tool_call_message)
|
||||
# Execute the function call and append result to messages
|
||||
function_name = tool_call.function.name
|
||||
function_args: dict = json.loads(tool_call.function.arguments)
|
||||
function_name_parts = function_name.split("__")
|
||||
result = None
|
||||
if function_name_parts[0] == "model":
|
||||
entity_name = function_name_parts[1]
|
||||
crud_command = function_name_parts[2]
|
||||
entity_descriptor = context.app.bot_metadata.entity_descriptors[
|
||||
entity_name
|
||||
]
|
||||
try:
|
||||
if crud_command == CrudCommand.LIST:
|
||||
result = await entity_descriptor.crud.list_all(
|
||||
db_session=context.db_session, user=context.user
|
||||
)
|
||||
elif crud_command == CrudCommand.GET_BY_ID:
|
||||
entity_id = function_args.pop("id")
|
||||
result = await entity_descriptor.crud.get_by_id(
|
||||
db_session=context.db_session,
|
||||
user=context.user,
|
||||
id=entity_id,
|
||||
)
|
||||
elif crud_command == CrudCommand.CREATE:
|
||||
result = await entity_descriptor.crud.create(
|
||||
db_session=context.db_session,
|
||||
user=context.user,
|
||||
model=entity_descriptor.crud.create_schema(
|
||||
**function_args
|
||||
),
|
||||
)
|
||||
elif crud_command == CrudCommand.UPDATE:
|
||||
entity_id = function_args.pop("id")
|
||||
result = await entity_descriptor.crud.update(
|
||||
db_session=context.db_session,
|
||||
user=context.user,
|
||||
id=entity_id,
|
||||
model=entity_descriptor.crud.update_schema(
|
||||
**function_args
|
||||
),
|
||||
)
|
||||
elif crud_command == CrudCommand.DELETE:
|
||||
entity_id = function_args.pop("id")
|
||||
result = await entity_descriptor.crud.delete(
|
||||
db_session=context.db_session,
|
||||
user=context.user,
|
||||
id=entity_id,
|
||||
)
|
||||
except ForbiddenError as e:
|
||||
result = {"error": f"Forbidden: {e}"}
|
||||
except NotFoundError as e:
|
||||
result = {"error": f"Not found: {e}"}
|
||||
except Exception as e:
|
||||
result = {"error": f"Error: {e}"}
|
||||
elif function_name_parts[0] == "process":
|
||||
process_name = function_name_parts[1]
|
||||
process_descriptor = context.app.bot_metadata.process_descriptors[
|
||||
process_name
|
||||
]
|
||||
try:
|
||||
if iscoroutinefunction(process_descriptor.process_class.run):
|
||||
result = await process_descriptor.process_class.run(
|
||||
context, parameters=process_descriptor.input_schema(**function_args)
|
||||
)
|
||||
else:
|
||||
result = process_descriptor.process_class.run(
|
||||
context, parameters=process_descriptor.input_schema(**function_args)
|
||||
)
|
||||
except Exception as e:
|
||||
result = {"error": f"Error: {e}"}
|
||||
# Append the function call result as assistant message
|
||||
if isinstance(result, BaseModel):
|
||||
result = result.model_dump(mode="json")
|
||||
elif isinstance(result, list):
|
||||
result = [item.model_dump(mode="json") for item in result]
|
||||
|
||||
message = {
|
||||
"role": "tool",
|
||||
"tool_call_id": tool_call.id,
|
||||
"content": str(result),
|
||||
}
|
||||
await add_message_log(
|
||||
db_session=context.db_session,
|
||||
user=context.user,
|
||||
client_id=context.message.chat.id
|
||||
if context.message.business_connection_id
|
||||
else None,
|
||||
content=json.dumps(message, ensure_ascii=False),
|
||||
)
|
||||
messages.append(message)
|
||||
|
||||
# If no function calls were made, break the loop
|
||||
if not has_function_calls:
|
||||
break
|
||||
|
||||
if iteration >= max_iterations:
|
||||
await context.message.answer(
|
||||
"Maximum conversation iterations reached. Please try again."
|
||||
)
|
||||
|
||||
|
||||
@router.message()
|
||||
async def handle_message(
|
||||
message: Message,
|
||||
db_session: AsyncSession,
|
||||
user: UserBase,
|
||||
app: QBotApp,
|
||||
app_state: State,
|
||||
) -> None:
|
||||
plugin: "AgentPlugin" = app.plugins["AgentPlugin"]
|
||||
|
||||
context = BotContext(
|
||||
db_session=db_session,
|
||||
app=app,
|
||||
app_state=app_state,
|
||||
user=user,
|
||||
message=message,
|
||||
)
|
||||
|
||||
await add_message_log(
|
||||
db_session=db_session,
|
||||
user=user,
|
||||
content=json.dumps(
|
||||
{"role": "user", "content": message.text}, ensure_ascii=False
|
||||
),
|
||||
)
|
||||
|
||||
system_prompt = await get_system_prompt(plugin, context)
|
||||
messages = await get_messages(plugin, context, system_prompt)
|
||||
|
||||
async with AsyncOpenAI(
|
||||
api_key=config.OPENAI_API_KEY,
|
||||
) as client:
|
||||
await handle_openai_cycle(
|
||||
context=context,
|
||||
client=client,
|
||||
plugin=plugin,
|
||||
messages=messages,
|
||||
)
|
||||
|
||||
|
||||
@router.business_message()
|
||||
async def handle_business_message(
|
||||
message: Message,
|
||||
db_session: AsyncSession,
|
||||
user: UserBase,
|
||||
app: QBotApp,
|
||||
app_state: State,
|
||||
) -> None:
|
||||
plugin: "AgentPlugin" = app.plugins["AgentPlugin"]
|
||||
|
||||
context = BotContext(
|
||||
db_session=db_session,
|
||||
app=app,
|
||||
app_state=app_state,
|
||||
user=user,
|
||||
message=message,
|
||||
)
|
||||
|
||||
await add_message_log(
|
||||
db_session=db_session,
|
||||
user=user,
|
||||
client_id=message.chat.id if message.business_connection_id else None,
|
||||
content=json.dumps(
|
||||
{"role": "user", "content": message.text}, ensure_ascii=False
|
||||
),
|
||||
)
|
||||
|
||||
system_prompt = await get_system_prompt(plugin, context)
|
||||
messages = await get_messages(plugin, context, system_prompt)
|
||||
|
||||
async with AsyncOpenAI(
|
||||
api_key=config.OPENAI_API_KEY,
|
||||
) as client:
|
||||
await handle_openai_cycle(
|
||||
context=context,
|
||||
client=client,
|
||||
plugin=plugin,
|
||||
messages=messages,
|
||||
)
|
||||
|
||||
|
||||
async def get_system_prompt(plugin: "AgentPlugin", context: BotContext) -> str:
|
||||
system_prompt = plugin.system_prompt
|
||||
if isinstance(system_prompt, LazyProxy):
|
||||
system_prompt = system_prompt.value
|
||||
elif callable(system_prompt):
|
||||
if iscoroutinefunction(system_prompt):
|
||||
system_prompt = await system_prompt(context)
|
||||
else:
|
||||
system_prompt = system_prompt(context)
|
||||
else:
|
||||
system_prompt = system_prompt
|
||||
|
||||
return system_prompt
|
||||
|
||||
|
||||
async def get_messages(
|
||||
plugin: "AgentPlugin", context: BotContext, system_prompt: str
|
||||
) -> list[dict]:
|
||||
message_log = await get_message_log(
|
||||
db_session=context.db_session,
|
||||
user=context.user,
|
||||
client_id=context.message.chat.id
|
||||
if context.message.business_connection_id
|
||||
else None,
|
||||
)
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": system_prompt,
|
||||
}
|
||||
]
|
||||
for message_log_item in message_log:
|
||||
messages.append(json.loads(message_log_item.content))
|
||||
|
||||
return messages
|
||||
36
src/quickbot_agent/main.py
Normal file
36
src/quickbot_agent/main.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from aiogram.utils.i18n.lazy_proxy import LazyProxy
|
||||
from quickbot import QBotApp, BotEntity, BotProcess
|
||||
from quickbot.model.crud_command import CrudCommand
|
||||
from quickbot.model.descriptors import BotContext
|
||||
from typing import Callable
|
||||
from .utils import generate_gpt_tools_schemas
|
||||
from .handlers.messages import router as messages_router
|
||||
|
||||
|
||||
class AgentPlugin:
|
||||
def __init__(
|
||||
self,
|
||||
system_prompt: str | LazyProxy | Callable[[BotContext], str],
|
||||
tools_context: list[type[BotEntity] | tuple[type[BotEntity], list[CrudCommand]]]
|
||||
| Callable[
|
||||
[BotContext],
|
||||
list[
|
||||
type[BotProcess]
|
||||
| type[BotEntity]
|
||||
| tuple[type[BotEntity], list[CrudCommand]]
|
||||
],
|
||||
]
|
||||
| None = None,
|
||||
) -> None:
|
||||
self.system_prompt = system_prompt
|
||||
self.tools_context = tools_context
|
||||
self._gpt_tools_metadata = []
|
||||
|
||||
def register(self, app: QBotApp) -> None:
|
||||
if not callable(self.tools_context):
|
||||
self._gpt_tools_metadata = generate_gpt_tools_schemas(
|
||||
app.bot_metadata, self.tools_context
|
||||
)
|
||||
messages_router.message.middleware.register(app.auth)
|
||||
messages_router.business_message.middleware.register(app.auth)
|
||||
app.dp.include_router(messages_router)
|
||||
1
src/quickbot_agent/model/__init__.py
Normal file
1
src/quickbot_agent/model/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .message_log import MessageLog as MessageLog
|
||||
33
src/quickbot_agent/model/message_log.py
Normal file
33
src/quickbot_agent/model/message_log.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from datetime import datetime
|
||||
from sqlmodel import Field, SQLModel, BigInteger
|
||||
|
||||
|
||||
class MessageLog(SQLModel, table=True):
|
||||
__tablename__ = "quickbot_agent_message_log"
|
||||
|
||||
id: int = Field(
|
||||
primary_key=True,
|
||||
index=True,
|
||||
sa_type=BigInteger,
|
||||
)
|
||||
|
||||
user_id: int = Field(
|
||||
index=True,
|
||||
sa_type=BigInteger,
|
||||
)
|
||||
|
||||
is_business_chat: bool = Field(
|
||||
default=False,
|
||||
)
|
||||
|
||||
client_id: int | None = Field(
|
||||
index=True,
|
||||
sa_type=BigInteger,
|
||||
)
|
||||
|
||||
dt: datetime = Field(
|
||||
index=True,
|
||||
default_factory=datetime.now,
|
||||
)
|
||||
|
||||
content: str
|
||||
343
src/quickbot_agent/utils.py
Normal file
343
src/quickbot_agent/utils.py
Normal file
@@ -0,0 +1,343 @@
|
||||
# Полный код: генерация схемы GPT-инструмента на основе Pydantic-модели и Python-функции
|
||||
|
||||
from datetime import datetime, date, time
|
||||
from decimal import Decimal
|
||||
from typing import (
|
||||
get_args,
|
||||
get_origin,
|
||||
Union,
|
||||
List,
|
||||
Dict,
|
||||
Tuple,
|
||||
)
|
||||
from sqlmodel import select, col
|
||||
from sqlmodel.ext.asyncio.session import AsyncSession
|
||||
from pydantic import BaseModel
|
||||
import sys
|
||||
|
||||
from quickbot.model.bot_entity import BotEntity
|
||||
from quickbot.model.crud_command import CrudCommand
|
||||
from quickbot.model.bot_metadata import BotMetadata
|
||||
from quickbot.model.descriptors import EntityDescriptor
|
||||
from quickbot.model.bot_process import BotProcess
|
||||
from quickbot.model.user import UserBase
|
||||
|
||||
from .model import MessageLog
|
||||
from .config import config
|
||||
|
||||
|
||||
# Рекурсивное определение JSON Schema по типу
|
||||
def resolve_json_schema_type(py_type: type) -> dict:
|
||||
origin = get_origin(py_type)
|
||||
args = get_args(py_type)
|
||||
|
||||
if py_type in (str, datetime, date, time, int, float, Decimal, bool):
|
||||
return {"type": type_name(py_type)}
|
||||
elif py_type is None or py_type is type(None):
|
||||
return {"type": "null"}
|
||||
|
||||
if origin is Union and type(None) in args:
|
||||
non_null = [arg for arg in args if arg is not type(None)]
|
||||
if len(non_null) == 1:
|
||||
schema = resolve_json_schema_type(non_null[0])
|
||||
schema["type"] = [schema["type"], "null"]
|
||||
return schema
|
||||
else:
|
||||
return {
|
||||
"anyOf": [resolve_json_schema_type(arg) for arg in non_null]
|
||||
+ [{"type": "null"}]
|
||||
}
|
||||
|
||||
if origin in (list, List):
|
||||
item_schema = resolve_json_schema_type(args[0]) if args else {}
|
||||
return {"type": "array", "items": item_schema}
|
||||
|
||||
if origin in (dict, Dict):
|
||||
key_type, value_type = args if args else (str, object)
|
||||
if key_type not in (str,):
|
||||
raise ValueError("JSON object keys must be strings")
|
||||
return {
|
||||
"type": "object",
|
||||
"additionalProperties": resolve_json_schema_type(value_type),
|
||||
}
|
||||
|
||||
if origin in (tuple, Tuple) and args:
|
||||
if len(args) == 2 and args[1] is Ellipsis:
|
||||
return {"type": "array", "items": resolve_json_schema_type(args[0])}
|
||||
return {
|
||||
"type": "array",
|
||||
"prefixItems": [resolve_json_schema_type(arg) for arg in args],
|
||||
}
|
||||
|
||||
if sys.version_info >= (3, 8):
|
||||
from typing import Literal
|
||||
|
||||
if origin is Literal:
|
||||
literals = list(args)
|
||||
literal_type = type(literals[0]) if literals else "string"
|
||||
return {"type": type_name(literal_type), "enum": literals}
|
||||
|
||||
if isinstance(py_type, type) and issubclass(py_type, BaseModel):
|
||||
return generate_parameters_from_pydantic(py_type)
|
||||
|
||||
return {"type": "string"}
|
||||
|
||||
|
||||
def type_name(t: type) -> str:
|
||||
if t in (str, datetime, date, time):
|
||||
return "string"
|
||||
elif t in (int, float, Decimal):
|
||||
return "number"
|
||||
elif t in (bool,):
|
||||
return "boolean"
|
||||
elif t in (list, List):
|
||||
return "array"
|
||||
elif t in (dict, Dict):
|
||||
return "object"
|
||||
else:
|
||||
return "string"
|
||||
|
||||
|
||||
def add_additional_properties_false(schema: dict):
|
||||
"""
|
||||
Рекурсивно добавляет additionalProperties: False для всех объектов в схеме.
|
||||
"""
|
||||
if schema.get("type") == "object":
|
||||
schema["additionalProperties"] = False
|
||||
for prop in schema.get("properties", {}).values():
|
||||
add_additional_properties_false(prop)
|
||||
elif schema.get("type") == "array" and "items" in schema:
|
||||
if isinstance(schema["items"], dict):
|
||||
add_additional_properties_false(schema["items"])
|
||||
elif isinstance(schema["items"], list):
|
||||
for item in schema["items"]:
|
||||
add_additional_properties_false(item)
|
||||
if "anyOf" in schema:
|
||||
for sub in schema["anyOf"]:
|
||||
add_additional_properties_false(sub)
|
||||
if "prefixItems" in schema:
|
||||
for sub in schema["prefixItems"]:
|
||||
add_additional_properties_false(sub)
|
||||
|
||||
|
||||
# Генерация параметров по Pydantic-модели
|
||||
# (добавляем additionalProperties: False для объектов)
|
||||
def generate_parameters_from_pydantic(
|
||||
model: type[BaseModel], add_id: bool = False
|
||||
) -> dict:
|
||||
schema = {
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
}
|
||||
required = []
|
||||
for name, field in model.model_fields.items():
|
||||
field_schema = resolve_json_schema_type(field.annotation)
|
||||
if field.description:
|
||||
field_schema["description"] = field.description
|
||||
schema["properties"][name] = field_schema
|
||||
if field.is_required():
|
||||
required.append(name)
|
||||
if add_id:
|
||||
schema["properties"]["id"] = {
|
||||
"type": "number",
|
||||
"description": "ID of the entity",
|
||||
}
|
||||
if "id" not in required:
|
||||
required.append("id")
|
||||
if required:
|
||||
schema["required"] = required
|
||||
add_additional_properties_false(schema)
|
||||
return schema
|
||||
|
||||
|
||||
# Основная функция генерации GPT-инструмента (строго по META_SCHEMA)
|
||||
def generate_gpt_tool_schema(
|
||||
name: str,
|
||||
description: str,
|
||||
param_model: type[BaseModel] | None = None,
|
||||
add_id: bool = False,
|
||||
) -> dict:
|
||||
parameters_schema = (
|
||||
generate_parameters_from_pydantic(param_model, add_id) if param_model else None
|
||||
)
|
||||
tool_schema = {
|
||||
"name": name,
|
||||
"description": description,
|
||||
}
|
||||
if parameters_schema:
|
||||
tool_schema["parameters"] = parameters_schema
|
||||
return {
|
||||
"type": "function",
|
||||
"function": tool_schema,
|
||||
}
|
||||
|
||||
|
||||
def generate_crud_tool_schemas(
|
||||
entity_descriptor: EntityDescriptor,
|
||||
commands: list[CrudCommand] = [
|
||||
CrudCommand.LIST,
|
||||
CrudCommand.GET_BY_ID,
|
||||
CrudCommand.CREATE,
|
||||
CrudCommand.UPDATE,
|
||||
CrudCommand.DELETE,
|
||||
],
|
||||
) -> list[dict]:
|
||||
crud_tools = []
|
||||
|
||||
if (
|
||||
CrudCommand.LIST in entity_descriptor.crud.commands
|
||||
and CrudCommand.LIST in commands
|
||||
):
|
||||
tool = generate_gpt_tool_schema(
|
||||
name=f"model__{entity_descriptor.name}__{CrudCommand.LIST}",
|
||||
description=f"List all {entity_descriptor.name}s",
|
||||
param_model=None,
|
||||
)
|
||||
crud_tools.append(tool)
|
||||
|
||||
# tool = FunctionToolParam(
|
||||
# type="function",
|
||||
# name=f"model_{entity_descriptor.name}_get_by_id",
|
||||
# description=f"Get a {entity_descriptor.name} by ID",
|
||||
# parameters=generate_parameters_from_pydantic(entity_descriptor.get_by_id_schema_class),
|
||||
# )
|
||||
|
||||
if (
|
||||
CrudCommand.GET_BY_ID in entity_descriptor.crud.commands
|
||||
and CrudCommand.GET_BY_ID in commands
|
||||
):
|
||||
tool = {
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": f"model__{entity_descriptor.name}__{CrudCommand.GET_BY_ID}",
|
||||
"description": f"Get a {entity_descriptor.name} by ID",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {
|
||||
"type": "number",
|
||||
"description": "ID of the entity",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
crud_tools.append(tool)
|
||||
|
||||
if (
|
||||
CrudCommand.CREATE in entity_descriptor.crud.commands
|
||||
and CrudCommand.CREATE in commands
|
||||
):
|
||||
tool = generate_gpt_tool_schema(
|
||||
name=f"model__{entity_descriptor.name}__{CrudCommand.CREATE}",
|
||||
description=f"Create a new {entity_descriptor.name}",
|
||||
param_model=entity_descriptor.crud.create_schema,
|
||||
)
|
||||
crud_tools.append(tool)
|
||||
|
||||
if (
|
||||
CrudCommand.UPDATE in entity_descriptor.crud.commands
|
||||
and CrudCommand.UPDATE in commands
|
||||
):
|
||||
tool = generate_gpt_tool_schema(
|
||||
name=f"model__{entity_descriptor.name}__{CrudCommand.UPDATE}",
|
||||
description=f"Update a {entity_descriptor.name} by ID",
|
||||
param_model=entity_descriptor.crud.update_schema,
|
||||
add_id=True,
|
||||
)
|
||||
crud_tools.append(tool)
|
||||
|
||||
if (
|
||||
CrudCommand.DELETE in entity_descriptor.crud.commands
|
||||
and CrudCommand.DELETE in commands
|
||||
):
|
||||
tool = {
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": f"model__{entity_descriptor.name}__{CrudCommand.DELETE}",
|
||||
"description": f"Delete a {entity_descriptor.name} by ID",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {
|
||||
"type": "number",
|
||||
"description": "ID of the entity",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
crud_tools.append(tool)
|
||||
|
||||
return crud_tools
|
||||
|
||||
|
||||
def generate_gpt_tools_schemas(
|
||||
metadata: BotMetadata,
|
||||
tools_context: list[
|
||||
type[BotEntity] | type[BotProcess] | tuple[BotEntity, list[CrudCommand]]
|
||||
]
|
||||
| None = None,
|
||||
) -> list[dict]:
|
||||
gpt_tools_schemas = []
|
||||
if tools_context is None:
|
||||
for entity_descriptor in metadata.entity_descriptors.values():
|
||||
gpt_tools_schemas.extend(generate_crud_tool_schemas(entity_descriptor))
|
||||
else:
|
||||
for tool_item in tools_context:
|
||||
if isinstance(tool_item, type) and issubclass(tool_item, BotProcess):
|
||||
gpt_tools_schemas.append(
|
||||
generate_gpt_tool_schema(
|
||||
name=f"process__{tool_item.bot_process_descriptor.name}",
|
||||
description=tool_item.bot_process_descriptor.description,
|
||||
param_model=tool_item.bot_process_descriptor.input_schema,
|
||||
)
|
||||
)
|
||||
if isinstance(tool_item, type) and hasattr(
|
||||
tool_item, "bot_entity_descriptor"
|
||||
):
|
||||
gpt_tools_schemas.extend(
|
||||
generate_crud_tool_schemas(tool_item.bot_entity_descriptor)
|
||||
)
|
||||
elif isinstance(tool_item, tuple):
|
||||
entity, commands = tool_item
|
||||
if commands:
|
||||
gpt_tools_schemas.extend(
|
||||
generate_crud_tool_schemas(entity.bot_entity_descriptor, commands=commands)
|
||||
)
|
||||
else:
|
||||
gpt_tools_schemas.extend(
|
||||
generate_crud_tool_schemas(entity.bot_entity_descriptor)
|
||||
)
|
||||
return gpt_tools_schemas
|
||||
|
||||
|
||||
async def get_message_log(
|
||||
db_session: AsyncSession, user: UserBase, client_id: int | None = None
|
||||
):
|
||||
query = select(MessageLog).where(
|
||||
MessageLog.user_id == user.id,
|
||||
)
|
||||
if client_id:
|
||||
query = query.where(MessageLog.client_id == client_id)
|
||||
query = query.order_by(col(MessageLog.dt).desc()).limit(
|
||||
config.MESSAGE_HISTORY_DEPTH
|
||||
)
|
||||
result = await db_session.exec(query)
|
||||
return reversed(result.all())
|
||||
|
||||
|
||||
async def add_message_log(
|
||||
db_session: AsyncSession,
|
||||
user: UserBase,
|
||||
client_id: int | None = None,
|
||||
content: str | None = None,
|
||||
):
|
||||
message_log = MessageLog(
|
||||
user_id=user.id,
|
||||
is_business_chat=client_id is not None,
|
||||
client_id=client_id,
|
||||
content=content,
|
||||
)
|
||||
db_session.add(message_log)
|
||||
await db_session.commit()
|
||||
98
tests/test_utils_schema.py
Normal file
98
tests/test_utils_schema.py
Normal file
@@ -0,0 +1,98 @@
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import List, Tuple
|
||||
from ..src.quickbot_agent.utils import generate_gpt_tool_schema
|
||||
|
||||
META_SCHEMA_REQUIRED_KEYS = {"name", "description", "parameters"}
|
||||
|
||||
|
||||
def test_simple_model():
|
||||
class SimpleModel(BaseModel):
|
||||
a: int = Field(..., description="A")
|
||||
b: str = Field("x", description="B")
|
||||
|
||||
schema = generate_gpt_tool_schema(
|
||||
func=None, name="simple", description="desc", param_model=SimpleModel
|
||||
)
|
||||
assert set(schema.keys()) == META_SCHEMA_REQUIRED_KEYS
|
||||
assert "type" not in schema
|
||||
assert schema["parameters"]["type"] == "object"
|
||||
assert "a" in schema["parameters"]["properties"]
|
||||
assert "b" in schema["parameters"]["properties"]
|
||||
assert "a" in schema["parameters"]["required"]
|
||||
assert "b" not in schema["parameters"].get("required", [])
|
||||
assert schema["parameters"]["properties"]["a"]["type"] == "number"
|
||||
assert schema["parameters"]["properties"]["b"]["type"] == "string"
|
||||
assert schema["parameters"]["properties"]["a"]["description"] == "A"
|
||||
|
||||
|
||||
def test_nested_model():
|
||||
class Inner(BaseModel):
|
||||
x: int = Field(...)
|
||||
|
||||
class Outer(BaseModel):
|
||||
inner: Inner = Field(...)
|
||||
|
||||
schema = generate_gpt_tool_schema(
|
||||
func=None, name="nested", description="desc", param_model=Outer
|
||||
)
|
||||
assert schema["parameters"]["properties"]["inner"]["type"] == "object"
|
||||
assert "x" in schema["parameters"]["properties"]["inner"]["properties"]
|
||||
assert schema["parameters"]["properties"]["inner"]["additionalProperties"] is False
|
||||
assert schema["parameters"]["additionalProperties"] is False
|
||||
|
||||
|
||||
def test_list_3d():
|
||||
class List3D(BaseModel):
|
||||
matrix: List[List[List[int]]] = Field(...)
|
||||
|
||||
schema = generate_gpt_tool_schema(
|
||||
func=None, name="list3d", description="desc", param_model=List3D
|
||||
)
|
||||
m = schema["parameters"]["properties"]["matrix"]
|
||||
assert m["type"] == "array"
|
||||
assert m["items"]["type"] == "array"
|
||||
assert m["items"]["items"]["type"] == "array"
|
||||
assert m["items"]["items"]["items"]["type"] == "number"
|
||||
|
||||
|
||||
def test_tuple_prefix_items():
|
||||
class TupleModel(BaseModel):
|
||||
t: Tuple[str, int, bool] = Field(...)
|
||||
|
||||
schema = generate_gpt_tool_schema(
|
||||
func=None, name="tuple", description="desc", param_model=TupleModel
|
||||
)
|
||||
t = schema["parameters"]["properties"]["t"]
|
||||
assert t["type"] == "array"
|
||||
assert "prefixItems" in t
|
||||
assert t["prefixItems"][0]["type"] == "string"
|
||||
assert t["prefixItems"][1]["type"] == "number"
|
||||
assert t["prefixItems"][2]["type"] == "boolean"
|
||||
|
||||
|
||||
def test_required_with_defaults():
|
||||
class Model(BaseModel):
|
||||
a: int = Field(...)
|
||||
b: int = Field(5)
|
||||
c: int = Field(None)
|
||||
d: int = Field()
|
||||
|
||||
schema = generate_gpt_tool_schema(
|
||||
func=None, name="req", description="desc", param_model=Model
|
||||
)
|
||||
req = schema["parameters"]["required"]
|
||||
assert "a" in req
|
||||
assert "d" in req
|
||||
assert "b" not in req
|
||||
assert "c" not in req
|
||||
|
||||
|
||||
def test_no_type_function():
|
||||
class M(BaseModel):
|
||||
x: int
|
||||
|
||||
schema = generate_gpt_tool_schema(
|
||||
func=None, name="m", description="desc", param_model=M
|
||||
)
|
||||
assert "type" not in schema
|
||||
assert set(schema.keys()) == META_SCHEMA_REQUIRED_KEYS
|
||||
Reference in New Issue
Block a user