Feat:添加对Action插件的支持,现在可以编写插件

This commit is contained in:
SengokuCola
2025-05-16 00:43:46 +08:00
parent ac6f96f805
commit cda9879bb2
28 changed files with 934 additions and 662 deletions

View File

@@ -0,0 +1,5 @@
# 导入所有动作模块以确保装饰器被执行
from . import reply_action # noqa
from . import no_reply_action # noqa
# 在此处添加更多动作模块导入

View File

@@ -12,7 +12,7 @@ _DEFAULT_ACTIONS: Dict[str, str] = {}
def register_action(cls):
"""
动作注册装饰器
用法:
@register_action
class MyAction(BaseAction):
@@ -24,22 +24,22 @@ def register_action(cls):
if not hasattr(cls, "action_name") or not hasattr(cls, "action_description"):
logger.error(f"动作类 {cls.__name__} 缺少必要的属性: action_name 或 action_description")
return cls
action_name = getattr(cls, "action_name")
action_description = getattr(cls, "action_description")
action_name = cls.action_name
action_description = cls.action_description
is_default = getattr(cls, "default", False)
if not action_name or not action_description:
logger.error(f"动作类 {cls.__name__} 的 action_name 或 action_description 为空")
return cls
# 将动作类注册到全局注册表
_ACTION_REGISTRY[action_name] = cls
# 如果是默认动作,添加到默认动作集
if is_default:
_DEFAULT_ACTIONS[action_name] = action_description
logger.info(f"已注册动作: {action_name} -> {cls.__name__},默认: {is_default}")
return cls
@@ -60,15 +60,14 @@ class BaseAction(ABC):
cycle_timers: 计时器字典
thinking_id: 思考ID
"""
#每个动作必须实现
self.action_name:str = "base_action"
self.action_description:str = "基础动作"
self.action_parameters:dict = {}
self.action_require:list[str] = []
self.default:bool = False
# 每个动作必须实现
self.action_name: str = "base_action"
self.action_description: str = "基础动作"
self.action_parameters: dict = {}
self.action_require: list[str] = []
self.default: bool = False
self.action_data = action_data
self.reasoning = reasoning
self.cycle_timers = cycle_timers

View File

@@ -29,7 +29,7 @@ class NoReplyAction(BaseAction):
action_require = [
"话题无关/无聊/不感兴趣/不懂",
"最后一条消息是你自己发的且无人回应你",
"你发送了太多消息,且无人回复"
"你发送了太多消息,且无人回复",
]
default = True
@@ -43,10 +43,10 @@ class NoReplyAction(BaseAction):
on_consecutive_no_reply_callback: Callable[[], Coroutine[None, None, None]],
current_cycle: CycleDetail,
log_prefix: str,
total_no_reply_count: int = 0,
total_waiting_time: float = 0.0,
# total_no_reply_count: int = 0,
# total_waiting_time: float = 0.0,
shutting_down: bool = False,
**kwargs
**kwargs,
):
"""初始化不回复动作处理器
@@ -69,8 +69,8 @@ class NoReplyAction(BaseAction):
self.on_consecutive_no_reply_callback = on_consecutive_no_reply_callback
self._current_cycle = current_cycle
self.log_prefix = log_prefix
self.total_no_reply_count = total_no_reply_count
self.total_waiting_time = total_waiting_time
# self.total_no_reply_count = total_no_reply_count
# self.total_waiting_time = total_waiting_time
self._shutting_down = shutting_down
async def handle_action(self) -> Tuple[bool, str]:
@@ -96,34 +96,6 @@ class NoReplyAction(BaseAction):
# 从计时器获取实际等待时间
current_waiting = self.cycle_timers.get("等待新消息", 0.0)
if not self._shutting_down:
self.total_no_reply_count += 1
self.total_waiting_time += current_waiting # 累加等待时间
logger.debug(
f"{self.log_prefix} 连续不回复计数增加: {self.total_no_reply_count}/{CONSECUTIVE_NO_REPLY_THRESHOLD}, "
f"本次等待: {current_waiting:.2f}秒, 累计等待: {self.total_waiting_time:.2f}"
)
# 检查是否同时达到次数和时间阈值
time_threshold = 0.66 * WAITING_TIME_THRESHOLD * CONSECUTIVE_NO_REPLY_THRESHOLD
if (
self.total_no_reply_count >= CONSECUTIVE_NO_REPLY_THRESHOLD
and self.total_waiting_time >= time_threshold
):
logger.info(
f"{self.log_prefix} 连续不回复达到阈值 ({self.total_no_reply_count}次) "
f"且累计等待时间达到 {self.total_waiting_time:.2f}秒 (阈值 {time_threshold}秒)"
f"调用回调请求状态转换"
)
# 调用回调。注意:这里不重置计数器和时间,依赖回调函数成功改变状态来隐式重置上下文。
await self.on_consecutive_no_reply_callback()
elif self.total_no_reply_count >= CONSECUTIVE_NO_REPLY_THRESHOLD:
# 仅次数达到阈值,但时间未达到
logger.debug(
f"{self.log_prefix} 连续不回复次数达到阈值 ({self.total_no_reply_count}次) "
f"但累计等待时间 {self.total_waiting_time:.2f}秒 未达到时间阈值 ({time_threshold}秒),暂不调用回调"
)
# else: 次数和时间都未达到阈值,不做处理
return True, "" # 不回复动作没有回复文本

View File

@@ -0,0 +1,215 @@
import traceback
from typing import Tuple, Dict, List, Any, Optional
from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.focus_chat.hfc_utils import create_empty_anchor_message
from src.common.logger_manager import get_logger
from src.chat.person_info.person_info import person_info_manager
from abc import abstractmethod
logger = get_logger("plugin_action")
class PluginAction(BaseAction):
"""插件动作基类
封装了主程序内部依赖提供简化的API接口给插件开发者
"""
def __init__(self, action_data: dict, reasoning: str, cycle_timers: dict, thinking_id: str, **kwargs):
"""初始化插件动作基类"""
super().__init__(action_data, reasoning, cycle_timers, thinking_id)
# 存储内部服务和对象引用
self._services = {}
# 从kwargs提取必要的内部服务
if "observations" in kwargs:
self._services["observations"] = kwargs["observations"]
if "expressor" in kwargs:
self._services["expressor"] = kwargs["expressor"]
if "chat_stream" in kwargs:
self._services["chat_stream"] = kwargs["chat_stream"]
if "current_cycle" in kwargs:
self._services["current_cycle"] = kwargs["current_cycle"]
self.log_prefix = kwargs.get("log_prefix", "")
async def get_user_id_by_person_name(self, person_name: str) -> Tuple[str, str]:
"""根据用户名获取用户ID"""
person_id = person_info_manager.get_person_id_by_person_name(person_name)
user_id = await person_info_manager.get_value(person_id, "user_id")
platform = await person_info_manager.get_value(person_id, "platform")
return platform, user_id
# 提供简化的API方法
async def send_message(self, text: str, target: Optional[str] = None) -> bool:
"""发送消息的简化方法
Args:
text: 要发送的消息文本
target: 目标消息(可选)
Returns:
bool: 是否发送成功
"""
try:
expressor = self._services.get("expressor")
chat_stream = self._services.get("chat_stream")
if not expressor or not chat_stream:
logger.error(f"{self.log_prefix} 无法发送消息:缺少必要的内部服务")
return False
# 构造简化的动作数据
reply_data = {
"text": text,
"target": target or "",
"emojis": []
}
# 获取锚定消息(如果有)
observations = self._services.get("observations", [])
chatting_observation: ChattingObservation = next(
obs for obs in observations
if isinstance(obs, ChattingObservation)
)
anchor_message = chatting_observation.search_message_by_text(reply_data["target"])
# 如果没有找到锚点消息,创建一个占位符
if not anchor_message:
logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message.update_chat_stream(chat_stream)
response_set = [
("text", text),
]
# 调用内部方法发送消息
success = await expressor.send_response_messages(
anchor_message=anchor_message,
response_set=response_set,
)
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送消息时出错: {e}")
traceback.print_exc()
return False
async def send_message_by_expressor(self, text: str, target: Optional[str] = None) -> bool:
"""发送消息的简化方法
Args:
text: 要发送的消息文本
target: 目标消息(可选)
Returns:
bool: 是否发送成功
"""
try:
expressor = self._services.get("expressor")
chat_stream = self._services.get("chat_stream")
if not expressor or not chat_stream:
logger.error(f"{self.log_prefix} 无法发送消息:缺少必要的内部服务")
return False
# 构造简化的动作数据
reply_data = {
"text": text,
"target": target or "",
"emojis": []
}
# 获取锚定消息(如果有)
observations = self._services.get("observations", [])
chatting_observation: ChattingObservation = next(
obs for obs in observations
if isinstance(obs, ChattingObservation)
)
anchor_message = chatting_observation.search_message_by_text(reply_data["target"])
# 如果没有找到锚点消息,创建一个占位符
if not anchor_message:
logger.info(f"{self.log_prefix} 未找到锚点消息,创建占位符")
anchor_message = await create_empty_anchor_message(
chat_stream.platform, chat_stream.group_info, chat_stream
)
else:
anchor_message.update_chat_stream(chat_stream)
# 调用内部方法发送消息
success, _ = await expressor.deal_reply(
cycle_timers=self.cycle_timers,
action_data=reply_data,
anchor_message=anchor_message,
reasoning=self.reasoning,
thinking_id=self.thinking_id
)
return success
except Exception as e:
logger.error(f"{self.log_prefix} 发送消息时出错: {e}")
return False
def get_chat_type(self) -> str:
"""获取当前聊天类型
Returns:
str: 聊天类型 ("group""private")
"""
chat_stream = self._services.get("chat_stream")
if chat_stream and hasattr(chat_stream, "group_info"):
return "group" if chat_stream.group_info else "private"
return "unknown"
def get_recent_messages(self, count: int = 5) -> List[Dict[str, Any]]:
"""获取最近的消息
Args:
count: 要获取的消息数量
Returns:
List[Dict]: 消息列表,每个消息包含发送者、内容等信息
"""
messages = []
observations = self._services.get("observations", [])
if observations and len(observations) > 0:
obs = observations[0]
if hasattr(obs, "get_talking_message"):
raw_messages = obs.get_talking_message()
# 转换为简化格式
for msg in raw_messages[-count:]:
simple_msg = {
"sender": msg.get("sender", "未知"),
"content": msg.get("content", ""),
"timestamp": msg.get("timestamp", 0)
}
messages.append(simple_msg)
return messages
@abstractmethod
async def process(self) -> Tuple[bool, str]:
"""插件处理逻辑,子类必须实现此方法
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
pass
async def handle_action(self) -> Tuple[bool, str]:
"""实现BaseAction的抽象方法调用子类的process方法
Returns:
Tuple[bool, str]: (是否执行成功, 回复文本)
"""
return await self.process()

View File

@@ -1,10 +1,8 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from src.common.logger_manager import get_logger
from src.chat.utils.timer_calculator import Timer
from src.chat.focus_chat.planners.actions.base_action import BaseAction, register_action
from typing import Tuple, List, Optional
from typing import Tuple, List
from src.chat.heart_flow.observation.observation import Observation
from src.chat.focus_chat.expressors.default_expressor import DefaultExpressor
from src.chat.message_receive.chat_stream import ChatStream
@@ -22,23 +20,22 @@ class ReplyAction(BaseAction):
处理构建和发送消息回复的动作。
"""
action_name:str = "reply"
action_description:str = "表达想法,可以只包含文本、表情或两者都有"
action_parameters:dict[str:str] = {
action_name: str = "reply"
action_description: str = "表达想法,可以只包含文本、表情或两者都有"
action_parameters: dict[str:str] = {
"text": "你想要表达的内容(可选)",
"emojis": "描述当前使用表情包的场景(可选)",
"target": "你想要回复的原始文本内容(非必须,仅文本,不包含发送者)(可选)",
}
action_require:list[str] = [
action_require: list[str] = [
"有实质性内容需要表达",
"有人提到你,但你还没有回应他",
"在合适的时候添加表情(不要总是添加)",
"如果你要回复特定某人的某句话或者你想回复较早的消息请在target中指定那句话的原始文本",
"除非有明确的回复目标如果选择了target不用特别提到某个人的人名",
"如果你有明确的,要回复特定某人的某句话或者你想回复较早的消息请在target中指定那句话的原始文本",
"一次只回复一个人,一次只回复一个话题,突出重点",
"如果是自己发的消息想继续,需自然衔接",
"避免重复或评价自己的发言,不要和自己聊天",
"注意:回复尽量简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。"
"注意:回复尽量简短一些。可以参考贴吧,知乎和微博的回复风格,回复不要浮夸,不要用夸张修辞,平淡一些。不要有额外的符号,尽量简单简短",
]
default = True
@@ -54,7 +51,7 @@ class ReplyAction(BaseAction):
chat_stream: ChatStream,
current_cycle: CycleDetail,
log_prefix: str,
**kwargs
**kwargs,
):
"""初始化回复动作处理器
@@ -89,9 +86,9 @@ class ReplyAction(BaseAction):
reasoning=self.reasoning,
reply_data=self.action_data,
cycle_timers=self.cycle_timers,
thinking_id=self.thinking_id
thinking_id=self.thinking_id,
)
async def _handle_reply(
self, reasoning: str, reply_data: dict, cycle_timers: dict, thinking_id: str
) -> tuple[bool, str]:
@@ -105,13 +102,16 @@ class ReplyAction(BaseAction):
"emojis": "微笑" # 表情关键词列表(可选)
}
"""
# 重置连续不回复计数器
self.total_no_reply_count = 0
self.total_waiting_time = 0.0
# 从聊天观察获取锚定消息
observations: ChattingObservation = self.observations[0]
anchor_message = observations.serch_message_by_text(reply_data["target"])
chatting_observation: ChattingObservation = next(
obs for obs in self.observations
if isinstance(obs, ChattingObservation)
)
if reply_data.get("target"):
anchor_message = chatting_observation.search_message_by_text(reply_data["target"])
else:
anchor_message = None
# 如果没有找到锚点消息,创建一个占位符
if not anchor_message: