feat:移除旧的工具系统,并使emoji成为maisaka内置动作

This commit is contained in:
SengokuCola
2026-03-29 15:25:36 +08:00
parent 614d2f43d6
commit 868438e3c1
7 changed files with 322 additions and 426 deletions

View File

@@ -0,0 +1,126 @@
{
"1": [],
"2": [
{
"id": "know_2_1774768612.298128",
"content": "性格自信,常以“真理在我这边”自居",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:16:52.298128"
},
{
"id": "know_2_1774768645.029561",
"content": "性格自信且带有自嘲精神,喜欢用轻松调侃的方式应对他人评价",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:17:25.029561"
}
],
"3": [],
"4": [],
"5": [],
"6": [
{
"id": "know_6_1774768486.451792",
"content": "正在搭建 RAG 测试集",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:14:46.451792"
},
{
"id": "know_6_1774768517.122405",
"content": "熟悉 NapCat、RAG 等技术工具及互联网梗文化",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:15:17.122405"
}
],
"7": [
{
"id": "know_7_1774768517.120403",
"content": "从事 RAG 测试集搭建或相关技术工作",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:15:17.120403"
},
{
"id": "know_7_1774768573.741823",
"content": "从事 RAG检索增强生成测试集搭建相关工作",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:16:13.741823"
}
],
"8": [],
"9": [],
"10": [
{
"id": "know_10_1774768486.452792",
"content": "沟通风格带有调侃和自信,习惯用反问句表达观点",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:14:46.452792"
},
{
"id": "know_10_1774768517.121403",
"content": "沟通风格带有较强的好胜心和防御性,习惯用反问和调侃回应质疑",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:15:17.121403"
},
{
"id": "know_10_1774768573.742824",
"content": "沟通风格幽默,擅长使用逻辑闭环和反问句式进行辩论或调侃",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:16:13.742824"
},
{
"id": "know_10_1774768612.299126",
"content": "沟通风格幽默风趣,擅长使用网络梗和表情包互动",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:16:52.299126"
},
{
"id": "know_10_1774768612.299845",
"content": "偶尔会文绉绉地表达(自称“文青病犯了”),但能迅速切换回口语化",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:16:52.299845"
},
{
"id": "know_10_1774768645.028561",
"content": "沟通风格幽默风趣,偶尔会文青病发作使用古风表达",
"metadata": {
"session_id": "628336b082552269377e9d0648e26c60",
"source": "maisaka_learning"
},
"created_at": "2026-03-29T15:17:25.028561"
}
],
"11": [],
"12": []
}

View File

@@ -62,10 +62,6 @@ class DefaultReplyer:
self.chat_stream = chat_stream
self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.chat_stream.session_id)
from src.chat.tool_executor import ToolExecutor
self.tool_executor = ToolExecutor(chat_id=self.chat_stream.session_id, enable_cache=True, cache_ttl=3)
async def generate_reply_with_context(
self,
extra_info: str = "",
@@ -399,6 +395,11 @@ class DefaultReplyer:
return f"{expression_habits_title}\n{expression_habits_block}", selected_ids
async def build_tool_info(self, chat_history: str, sender: str, target: str, enable_tool: bool = True) -> str:
del chat_history
del sender
del target
del enable_tool
return ""
"""构建工具信息块
Args:
@@ -415,9 +416,7 @@ class DefaultReplyer:
try:
# 使用工具执行器获取信息
tool_results, _, _ = await self.tool_executor.execute_from_chat_message(
sender=sender, target_message=target, chat_history=chat_history, return_details=False
)
tool_results = []
if tool_results:
tool_info_str = "以下是你通过工具获取到的实时信息:\n"
@@ -1173,6 +1172,10 @@ class DefaultReplyer:
return content, reasoning_content, model_name, tool_calls
async def get_prompt_info(self, message: str, sender: str, target: str):
del message
del sender
del target
return ""
related_info = ""
start_time = time.time()
try:
@@ -1218,7 +1221,7 @@ class DefaultReplyer:
# logger.info(f"工具调用: {tool_calls}")
if tool_calls:
result = await self.tool_executor.execute_tool_call(tool_calls[0])
result = None
end_time = time.time()
if not result or not result.get("content"):
logger.debug("从LPMM知识库获取知识失败返回空知识...")

View File

@@ -59,10 +59,6 @@ class PrivateReplyer:
self.is_group_chat, self.chat_target_info = get_chat_type_and_target_info(self.chat_stream.session_id)
# self.memory_activator = MemoryActivator()
from src.chat.tool_executor import ToolExecutor
self.tool_executor = ToolExecutor(chat_id=self.chat_stream.session_id, enable_cache=True, cache_ttl=3)
async def generate_reply_with_context(
self,
extra_info: str = "",
@@ -292,6 +288,11 @@ class PrivateReplyer:
return f"{expression_habits_title}\n{expression_habits_block}", selected_ids
async def build_tool_info(self, chat_history: str, sender: str, target: str, enable_tool: bool = True) -> str:
del chat_history
del sender
del target
del enable_tool
return ""
"""构建工具信息块
Args:
@@ -308,9 +309,7 @@ class PrivateReplyer:
try:
# 使用工具执行器获取信息
tool_results, _, _ = await self.tool_executor.execute_from_chat_message(
sender=sender, target_message=target, chat_history=chat_history, return_details=False
)
tool_results = []
if tool_results:
tool_info_str = "以下是你通过工具获取到的实时信息:\n"

View File

@@ -1,258 +0,0 @@
"""工具执行器。
独立的工具执行组件,可以直接输入聊天消息内容,
自动判断并执行相应的工具,返回结构化的工具执行结果。
"""
from typing import Any, Dict, List, Optional, Tuple, cast
import hashlib
import time
from src.common.logger import get_logger
from src.config.config import global_config
from src.core.announcement_manager import global_announcement_manager
from src.llm_models.payload_content import ToolCall
from src.llm_models.payload_content.tool_option import ToolDefinitionInput
from src.common.data_models.llm_service_data_models import LLMGenerationOptions
from src.services.llm_service import LLMServiceClient
from src.plugin_runtime.component_query import component_query_service
from src.prompt.prompt_manager import prompt_manager
logger = get_logger("tool_use")
class ToolExecutor:
"""独立的工具执行器组件
可以直接输入聊天消息内容,自动判断并执行相应的工具,返回结构化的工具执行结果。
"""
def __init__(self, chat_id: str, enable_cache: bool = True, cache_ttl: int = 3):
from src.chat.message_receive.chat_manager import chat_manager as _chat_manager
self.chat_id = chat_id
self.chat_stream = _chat_manager.get_session_by_session_id(self.chat_id)
self.log_prefix = f"[{_chat_manager.get_session_name(self.chat_id) or self.chat_id}]"
self.llm_model = LLMServiceClient(
task_name="tool_use", request_type="tool_executor"
)
self.enable_cache = enable_cache
self.cache_ttl = cache_ttl
self.tool_cache: Dict[str, dict] = {}
logger.info(f"{self.log_prefix}工具执行器初始化完成,缓存{'启用' if enable_cache else '禁用'}TTL={cache_ttl}")
async def execute_from_chat_message(
self, target_message: str, chat_history: str, sender: str, return_details: bool = False
) -> Tuple[List[Dict[str, Any]], List[str], str]:
"""从聊天消息执行工具"""
cache_key = self._generate_cache_key(target_message, chat_history, sender)
if cached_result := self._get_from_cache(cache_key):
logger.info(f"{self.log_prefix}使用缓存结果,跳过工具执行")
if not return_details:
return cached_result, [], ""
used_tools = [result.get("tool_name", "unknown") for result in cached_result]
return cached_result, used_tools, ""
tools = self._get_tool_definitions()
if not tools:
logger.debug(f"{self.log_prefix}没有可用工具,直接返回空内容")
return [], [], ""
prompt_template = prompt_manager.get_prompt("tool_executor")
prompt_template.add_context("target_message", target_message)
prompt_template.add_context("chat_history", chat_history)
prompt_template.add_context("sender", sender)
prompt_template.add_context("bot_name", global_config.bot.nickname)
prompt_template.add_context("time_now", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
prompt = await prompt_manager.render_prompt(prompt_template)
logger.debug(f"{self.log_prefix}开始LLM工具调用分析")
generation_result = await self.llm_model.generate_response(
prompt=prompt,
options=LLMGenerationOptions(tool_options=tools, raise_when_empty=False),
)
tool_calls = generation_result.tool_calls
tool_results, used_tools = await self.execute_tool_calls(tool_calls)
if tool_results:
self._set_cache(cache_key, tool_results)
if used_tools:
logger.info(f"{self.log_prefix}工具执行完成,共执行{len(used_tools)}个工具: {used_tools}")
if return_details:
return tool_results, used_tools, prompt
return tool_results, [], ""
def _get_tool_definitions(self) -> List[ToolDefinitionInput]:
"""获取 LLM 可用的工具定义列表"""
all_tools = component_query_service.get_llm_available_tools()
user_disabled_tools = global_announcement_manager.get_disabled_chat_tools(self.chat_id)
return [
cast(ToolDefinitionInput, info.get_llm_definition())
for name, info in all_tools.items()
if name not in user_disabled_tools
]
async def execute_tool_calls(self, tool_calls: Optional[List[ToolCall]]) -> Tuple[List[Dict[str, Any]], List[str]]:
"""执行工具调用列表"""
tool_results: List[Dict[str, Any]] = []
used_tools: List[str] = []
if not tool_calls:
logger.debug(f"{self.log_prefix}无需执行工具")
return [], []
func_names = [call.func_name for call in tool_calls if call.func_name]
logger.info(f"{self.log_prefix}开始执行工具调用: {func_names}")
for tool_call in tool_calls:
tool_name = tool_call.func_name
try:
logger.debug(f"{self.log_prefix}执行工具: {tool_name}")
result = await self.execute_tool_call(tool_call)
if result:
tool_info = {
"type": result.get("type", "unknown_type"),
"id": result.get("id", f"tool_exec_{time.time()}"),
"content": result.get("content", ""),
"tool_name": tool_name,
"timestamp": time.time(),
}
content = tool_info["content"]
if not isinstance(content, (str, list, tuple)):
tool_info["content"] = str(content)
content_check = tool_info["content"]
if (isinstance(content_check, str) and not content_check.strip()) or (
isinstance(content_check, (list, tuple)) and len(content_check) == 0
):
logger.debug(f"{self.log_prefix}工具{tool_name}无有效内容,跳过展示")
continue
tool_results.append(tool_info)
used_tools.append(tool_name)
preview = str(content)[:200]
logger.debug(f"{self.log_prefix}工具{tool_name}结果内容: {preview}...")
except Exception as e:
logger.error(f"{self.log_prefix}工具{tool_name}执行失败: {e}")
error_info = {
"type": "tool_error",
"id": f"tool_error_{time.time()}",
"content": f"工具{tool_name}执行失败: {str(e)}",
"tool_name": tool_name,
"timestamp": time.time(),
}
tool_results.append(error_info)
return tool_results, used_tools
async def execute_tool_call(self, tool_call: ToolCall) -> Optional[Dict[str, Any]]:
"""执行单个工具调用"""
function_name = tool_call.func_name
function_args = tool_call.args or {}
function_args["llm_called"] = True
executor = component_query_service.get_tool_executor(function_name)
if not executor:
logger.warning(f"未知工具名称: {function_name}")
return None
result = await executor(function_args)
if result:
return {
"tool_call_id": tool_call.call_id,
"role": "tool",
"name": function_name,
"type": "function",
"content": result["content"],
}
return None
async def execute_specific_tool_simple(self, tool_name: str, tool_args: Dict) -> Optional[Dict]:
"""直接执行指定工具"""
try:
tool_call = ToolCall(
call_id=f"direct_tool_{time.time()}",
func_name=tool_name,
args=tool_args,
)
logger.info(f"{self.log_prefix}直接执行工具: {tool_name}")
result = await self.execute_tool_call(tool_call)
if result:
tool_info = {
"type": result.get("type", "unknown_type"),
"id": result.get("id", f"direct_tool_{time.time()}"),
"content": result.get("content", ""),
"tool_name": tool_name,
"timestamp": time.time(),
}
logger.info(f"{self.log_prefix}直接工具执行成功: {tool_name}")
return tool_info
except Exception as e:
logger.error(f"{self.log_prefix}直接工具执行失败 {tool_name}: {e}")
return None
# === 缓存方法 ===
def _generate_cache_key(self, target_message: str, chat_history: str, sender: str) -> str:
content = f"{target_message}_{chat_history}_{sender}"
return hashlib.md5(content.encode()).hexdigest()
def _get_from_cache(self, cache_key: str) -> Optional[List[Dict]]:
if not self.enable_cache or cache_key not in self.tool_cache:
return None
cache_item = self.tool_cache[cache_key]
if cache_item["ttl"] <= 0:
del self.tool_cache[cache_key]
return None
cache_item["ttl"] -= 1
return cache_item["result"]
def _set_cache(self, cache_key: str, result: List[Dict]):
if not self.enable_cache:
return
self.tool_cache[cache_key] = {"result": result, "ttl": self.cache_ttl, "timestamp": time.time()}
def _cleanup_expired_cache(self):
if not self.enable_cache:
return
expired = [k for k, v in self.tool_cache.items() if v["ttl"] <= 0]
for key in expired:
del self.tool_cache[key]
def clear_cache(self):
if self.enable_cache:
self.tool_cache.clear()
def get_cache_status(self) -> Dict:
if not self.enable_cache:
return {"enabled": False, "cache_count": 0}
self._cleanup_expired_cache()
ttl_distribution: Dict[int, int] = {}
for item in self.tool_cache.values():
ttl = item["ttl"]
ttl_distribution[ttl] = ttl_distribution.get(ttl, 0) + 1
return {
"enabled": True,
"cache_count": len(self.tool_cache),
"cache_ttl": self.cache_ttl,
"ttl_distribution": ttl_distribution,
}
def set_cache_config(self, enable_cache: Optional[bool] = None, cache_ttl: int = -1):
if enable_cache is not None:
self.enable_cache = enable_cache
if cache_ttl > 0:
self.cache_ttl = cache_ttl

View File

@@ -80,6 +80,21 @@ def create_builtin_tools() -> List[ToolOption]:
stop_builder.set_description("Stop the current inner loop and return control to the outer chat flow.")
tools.append(stop_builder.build())
send_emoji_builder = ToolOptionBuilder()
send_emoji_builder.set_name("send_emoji")
send_emoji_builder.set_description(
"Send an emoji sticker to help express emotions. "
"You should specify the emotion type to select an appropriate emoji."
)
send_emoji_builder.add_param(
name="emotion",
param_type=ToolParamType.STRING,
description="The emotion type for selecting an appropriate emoji (e.g., 'happy', 'sad', 'angry', 'surprised', etc.).",
required=False,
enum_values=None,
)
tools.append(send_emoji_builder.build())
return tools

View File

@@ -1,5 +1,6 @@
"""Maisaka 推理引擎。"""
import difflib
import json
import asyncio
import re
@@ -48,6 +49,7 @@ class MaisakaReasoningEngine:
def __init__(self, runtime: "MaisakaHeartFlowChatting") -> None:
self._runtime = runtime
self._reply_context_builder = MaisakaReplyContextBuilder(runtime.session_id)
self._last_reasoning_content: str = ""
async def run_loop(self) -> None:
"""独立消费消息批次,并执行对应的内部思考轮次。"""
@@ -71,6 +73,13 @@ class MaisakaReasoningEngine:
response = await self._runtime._chat_loop_service.chat_loop_step(self._runtime._chat_history)
cycle_detail.time_records["planner"] = time.time() - planner_started_at
reasoning_content = response.content or ""
if self._should_replace_reasoning(reasoning_content):
response.content = "让我根据新情况重新思考:"
response.raw_message.content = "让我根据新情况重新思考:"
logger.info(f"{self._runtime.log_prefix} reasoning content replaced due to high similarity")
self._last_reasoning_content = reasoning_content
response.raw_message.platform = anchor_message.platform
response.raw_message.session_id = self._runtime.session_id
response.raw_message.message_info.group_info = self._runtime._build_group_info(anchor_message)
@@ -330,6 +339,37 @@ class MaisakaReasoningEngine:
self._runtime._chat_history = trimmed_history
self._runtime._log_history_trimmed(removed_count, conversation_message_count)
@staticmethod
def _calculate_similarity(text1: str, text2: str) -> float:
"""计算两个文本之间的相似度。
Args:
text1: 第一个文本
text2: 第二个文本
Returns:
float: 相似度值,范围 0-11 表示完全相同
"""
return difflib.SequenceMatcher(None, text1, text2).ratio()
def _should_replace_reasoning(self, current_content: str) -> bool:
"""判断是否需要替换推理内容。
当当前推理内容与上一次相似度大于90%返回True。
Args:
current_content: 当前的推理内容
Returns:
bool: 是否需要替换
"""
if not self._last_reasoning_content or not current_content:
return False
similarity = self._calculate_similarity(current_content, self._last_reasoning_content)
logger.info(f"{self._runtime.log_prefix} reasoning similarity: {similarity:.2f}")
return similarity > 0.9
async def _handle_tool_calls(
self,
tool_calls: list[ToolCall],
@@ -382,6 +422,10 @@ class MaisakaReasoningEngine:
self._runtime._enter_stop_state()
return True
if tool_call.func_name == "send_emoji":
await self._handle_send_emoji(tool_call, anchor_message)
continue
if self._runtime._mcp_manager and self._runtime._mcp_manager.is_mcp_tool(tool_call.func_name):
await handle_mcp_tool(tool_call, self._runtime._chat_history, self._runtime._mcp_manager)
continue
@@ -615,6 +659,104 @@ class MaisakaReasoningEngine:
)
return True
async def _handle_send_emoji(self, tool_call: ToolCall, anchor_message: SessionMessage) -> None:
"""处理发送表情包的工具调用。
Args:
tool_call: 工具调用对象
anchor_message: 锚点消息
"""
from src.chat.emoji_system.emoji_manager import emoji_manager
from src.common.utils.utils_image import ImageUtils
import random
tool_args = tool_call.args or {}
emotion = str(tool_args.get("emotion") or "").strip()
logger.info(f"{self._runtime.log_prefix} send_emoji tool triggered: emotion={emotion!r}")
# 获取表情包列表
if not emoji_manager.emojis:
self._runtime._chat_history.append(
self._build_tool_message(tool_call, "No emojis available in the emoji library.")
)
return
# 根据情感选择表情包
selected_emoji = None
if emotion:
# 尝试找到匹配情感的表情包
matching_emojis = [
emoji for emoji in emoji_manager.emojis
if emotion.lower() in (e.lower() for e in emoji.emotion)
]
if matching_emojis:
selected_emoji = random.choice(matching_emojis)
logger.info(
f"{self._runtime.log_prefix} found {len(matching_emojis)} emojis matching emotion '{emotion}', "
f"selected: {selected_emoji.description}"
)
# 如果没有找到匹配的情感表情包,随机选择一个
if selected_emoji is None:
selected_emoji = random.choice(emoji_manager.emojis)
logger.info(
f"{self._runtime.log_prefix} no emoji matched emotion '{emotion}', "
f"randomly selected: {selected_emoji.description}"
)
# 更新表情包使用次数
emoji_manager.update_emoji_usage(selected_emoji)
# 获取表情包的 base64 数据
try:
emoji_base64 = ImageUtils.image_path_to_base64(str(selected_emoji.full_path))
if not emoji_base64:
raise ValueError("Failed to convert emoji image to base64")
except Exception as exc:
logger.error(
f"{self._runtime.log_prefix} failed to convert emoji to base64: {exc}"
)
self._runtime._chat_history.append(
self._build_tool_message(tool_call, f"Failed to send emoji: {exc}")
)
return
# 发送表情包
try:
sent = await send_service.emoji_to_stream(
emoji_base64=emoji_base64,
stream_id=self._runtime.session_id,
storage_message=True,
set_reply=False,
reply_message=None,
)
except Exception as exc:
logger.exception(
f"{self._runtime.log_prefix} send_service.emoji_to_stream crashed: {exc}"
)
self._runtime._chat_history.append(
self._build_tool_message(tool_call, f"Emoji send crashed: {exc}")
)
return
if sent:
logger.info(
f"{self._runtime.log_prefix} emoji sent successfully: "
f"description={selected_emoji.description!r} emotion={selected_emoji.emotion}"
)
self._runtime._chat_history.append(
self._build_tool_message(
tool_call,
f"Sent emoji: {selected_emoji.description} (emotion: {', '.join(selected_emoji.emotion)})"
)
)
else:
logger.warning(f"{self._runtime.log_prefix} emoji send failed")
self._runtime._chat_history.append(
self._build_tool_message(tool_call, "Failed to send emoji.")
)
def _build_tool_message(self, tool_call: ToolCall, content: str) -> SessionMessage:
return build_message(
role="tool",

View File

@@ -1,13 +1,11 @@
"""
MaiSaka tool handlers.
MaiSaka 工具处理器。
"""
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Optional
import json as _json
import os
from rich.panel import Panel
@@ -22,11 +20,8 @@ if TYPE_CHECKING:
from src.mcp_module import MCPManager
MAI_FILES_DIR = Path(os.path.join(os.path.dirname(os.path.abspath(__file__)), "mai_files"))
class ToolHandlerContext:
"""Shared context for tool handlers."""
"""工具处理器共享上下文。"""
def __init__(
self,
@@ -39,18 +34,18 @@ class ToolHandlerContext:
async def handle_stop(tc: ToolCall, chat_history: list[SessionMessage]) -> None:
"""Handle the stop tool."""
console.print("[accent]Calling tool: stop()[/accent]")
"""处理 stop 工具。"""
console.print("[accent]调用工具: stop()[/accent]")
chat_history.append(
build_message(role="tool", content="Conversation loop will stop after this round.", tool_call_id=tc.call_id)
build_message(role="tool", content="当前轮次结束后将停止对话循环。", tool_call_id=tc.call_id)
)
async def handle_wait(tc: ToolCall, chat_history: list[SessionMessage], ctx: ToolHandlerContext) -> str:
"""Handle the wait tool."""
"""处理 wait 工具。"""
seconds = (tc.args or {}).get("seconds", 30)
seconds = max(5, min(seconds, 300))
console.print(f"[accent]Calling tool: wait({seconds})[/accent]")
console.print(f"[accent]调用工具: wait({seconds})[/accent]")
tool_result = await _do_wait(seconds, ctx)
chat_history.append(build_message(role="tool", content=tool_result, tool_call_id=tc.call_id))
@@ -58,41 +53,41 @@ async def handle_wait(tc: ToolCall, chat_history: list[SessionMessage], ctx: Too
async def _do_wait(seconds: int, ctx: ToolHandlerContext) -> str:
"""Wait for user input with a timeout."""
console.print(f"[muted]Waiting for user input (timeout: {seconds}s)...[/muted]")
"""等待用户输入,支持超时。"""
console.print(f"[muted]等待用户输入中(超时: {seconds} 秒)...[/muted]")
console.print("[bold magenta]> [/bold magenta]", end="")
user_input = await ctx.reader.get_line(timeout=seconds)
if user_input is None:
console.print()
console.print("[muted]Wait timeout[/muted]")
return "Wait timed out; no user input received."
console.print("[muted]等待超时[/muted]")
return "等待超时,未收到用户输入。"
user_input = user_input.strip()
if not user_input:
return "User submitted an empty input."
return "用户提交了空输入。"
now = datetime.now()
ctx.last_user_input_time = now
ctx.user_input_times.append(now)
if user_input.lower() in ("/quit", "/exit", "/q"):
return "[[QUIT]] User requested to exit."
return "[[QUIT]] 用户请求退出。"
return f"User input received: {user_input}"
return f"已收到用户输入: {user_input}"
async def handle_mcp_tool(tc: ToolCall, chat_history: list[SessionMessage], mcp_manager: "MCPManager") -> None:
"""Handle an MCP tool call."""
"""处理 MCP 工具调用。"""
args_str = _json.dumps(tc.args or {}, ensure_ascii=False)
args_preview = args_str if len(args_str) <= 120 else args_str[:120] + "..."
console.print(f"[accent]Calling MCP tool: {tc.func_name}({args_preview})[/accent]")
console.print(f"[accent]调用 MCP 工具: {tc.func_name}({args_preview})[/accent]")
with console.status(f"[info]Running MCP tool {tc.func_name}...[/info]", spinner="dots"):
with console.status(f"[info]正在执行 MCP 工具 {tc.func_name}...[/info]", spinner="dots"):
result = await mcp_manager.call_tool(tc.func_name, tc.args or {})
display_text = result if len(result) <= 800 else result[:800] + "\n... (truncated)"
display_text = result if len(result) <= 800 else result[:800] + "\n...(已截断)"
console.print(
Panel(
display_text,
@@ -105,132 +100,6 @@ async def handle_mcp_tool(tc: ToolCall, chat_history: list[SessionMessage], mcp_
async def handle_unknown_tool(tc: ToolCall, chat_history: list[SessionMessage]) -> None:
"""Handle an unknown tool call."""
console.print(f"[accent]Calling unknown tool: {tc.func_name}({tc.args})[/accent]")
chat_history.append(build_message(role="tool", content=f"Unknown tool: {tc.func_name}", tool_call_id=tc.call_id))
async def handle_write_file(tc: ToolCall, chat_history: list[SessionMessage]) -> None:
"""Write a file under the local mai_files workspace."""
filename = (tc.args or {}).get("filename", "")
content = (tc.args or {}).get("content", "")
console.print(f'[accent]Calling tool: write_file("{filename}")[/accent]')
MAI_FILES_DIR.mkdir(parents=True, exist_ok=True)
file_path = MAI_FILES_DIR / filename
try:
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as file:
file.write(content)
file_size = file_path.stat().st_size
console.print(
Panel(
f"Path: {filename}\nSize: {file_size} bytes",
title="File Written",
border_style="green",
padding=(0, 1),
)
)
chat_history.append(
build_message(
role="tool",
content=f"File written successfully: {filename} ({file_size} bytes)",
tool_call_id=tc.call_id,
)
)
except Exception as exc:
error_msg = f"Failed to write file: {exc}"
console.print(f"[error]{error_msg}[/error]")
chat_history.append(build_message(role="tool", content=error_msg, tool_call_id=tc.call_id))
async def handle_read_file(tc: ToolCall, chat_history: list[SessionMessage]) -> None:
"""Read a file from the local mai_files workspace."""
filename = (tc.args or {}).get("filename", "")
console.print(f'[accent]Calling tool: read_file("{filename}")[/accent]')
file_path = MAI_FILES_DIR / filename
try:
if not file_path.exists():
error_msg = f"File does not exist: {filename}"
console.print(f"[warning]{error_msg}[/warning]")
chat_history.append(build_message(role="tool", content=error_msg, tool_call_id=tc.call_id))
return
if not file_path.is_file():
error_msg = f"Path is not a file: {filename}"
console.print(f"[warning]{error_msg}[/warning]")
chat_history.append(build_message(role="tool", content=error_msg, tool_call_id=tc.call_id))
return
with open(file_path, "r", encoding="utf-8") as file:
file_content = file.read()
display_content = file_content if len(file_content) <= 1000 else file_content[:1000] + "\n... (truncated)"
console.print(
Panel(
display_content,
title=f"Read File: {filename}",
border_style="blue",
padding=(0, 1),
)
)
chat_history.append(
build_message(role="tool", content=f"File content of {filename}:\n{file_content}", tool_call_id=tc.call_id)
)
except Exception as exc:
error_msg = f"Failed to read file: {exc}"
console.print(f"[error]{error_msg}[/error]")
chat_history.append(build_message(role="tool", content=error_msg, tool_call_id=tc.call_id))
async def handle_list_files(tc: ToolCall, chat_history: list[SessionMessage]) -> None:
"""List files under the local mai_files workspace."""
console.print("[accent]Calling tool: list_files()[/accent]")
try:
MAI_FILES_DIR.mkdir(parents=True, exist_ok=True)
files_info: list[dict[str, Any]] = []
for item in MAI_FILES_DIR.rglob("*"):
if item.is_file():
stat = item.stat()
files_info.append(
{
"name": str(item.relative_to(MAI_FILES_DIR)),
"size": stat.st_size,
"modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"),
}
)
if not files_info:
result_text = "No files found under mai_files."
else:
files_info.sort(key=lambda item: item["name"])
lines = [f"Found {len(files_info)} file(s):\n"]
for item in files_info:
lines.append(f"- {item['name']} ({item['size']} bytes, modified {item['modified']})")
result_text = "\n".join(lines)
console.print(
Panel(
result_text,
title="File List",
border_style="cyan",
padding=(0, 1),
)
)
chat_history.append(build_message(role="tool", content=result_text, tool_call_id=tc.call_id))
except Exception as exc:
error_msg = f"Failed to list files: {exc}"
console.print(f"[error]{error_msg}[/error]")
chat_history.append(build_message(role="tool", content=error_msg, tool_call_id=tc.call_id))
try:
MAI_FILES_DIR.mkdir(parents=True, exist_ok=True)
except Exception as exc:
console.print(f"[warning]Failed to initialize mai_files directory: {exc}[/warning]")
"""处理未知工具调用。"""
console.print(f"[accent]调用未知工具: {tc.func_name}({tc.args})[/accent]")
chat_history.append(build_message(role="tool", content=f"未知工具: {tc.func_name}", tool_call_id=tc.call_id))