增加了event_handler修改内容的方法
This commit is contained in:
@@ -71,7 +71,7 @@ class EventsManager:
|
||||
llm_response: Optional["LLMGenerationDataModel"] = None,
|
||||
stream_id: Optional[str] = None,
|
||||
action_usage: Optional[List[str]] = None,
|
||||
) -> bool:
|
||||
) -> Tuple[bool, Optional[MaiMessages]]:
|
||||
"""
|
||||
处理所有事件,根据事件类型分发给订阅的处理器。
|
||||
"""
|
||||
@@ -89,10 +89,10 @@ class EventsManager:
|
||||
# 2. 获取并遍历处理器
|
||||
handlers = self._events_subscribers.get(event_type, [])
|
||||
if not handlers:
|
||||
return True
|
||||
return True, None
|
||||
|
||||
current_stream_id = transformed_message.stream_id if transformed_message else None
|
||||
|
||||
modified_message: Optional[MaiMessages] = None
|
||||
for handler in handlers:
|
||||
# 3. 前置检查和配置加载
|
||||
if (
|
||||
@@ -107,15 +107,19 @@ class EventsManager:
|
||||
handler.set_plugin_config(plugin_config)
|
||||
|
||||
# 4. 根据类型分发任务
|
||||
if handler.intercept_message or event_type == EventType.ON_STOP: # 让ON_STOP的所有事件处理器都发挥作用,防止还没执行即被取消
|
||||
if (
|
||||
handler.intercept_message or event_type == EventType.ON_STOP
|
||||
): # 让ON_STOP的所有事件处理器都发挥作用,防止还没执行即被取消
|
||||
# 阻塞执行,并更新 continue_flag
|
||||
should_continue = await self._dispatch_intercepting_handler(handler, event_type, transformed_message)
|
||||
should_continue, modified_message = await self._dispatch_intercepting_handler_task(
|
||||
handler, event_type, modified_message or transformed_message
|
||||
)
|
||||
continue_flag = continue_flag and should_continue
|
||||
else:
|
||||
# 异步执行,不阻塞
|
||||
self._dispatch_handler_task(handler, event_type, transformed_message)
|
||||
|
||||
return continue_flag
|
||||
return continue_flag, modified_message
|
||||
|
||||
async def cancel_handler_tasks(self, handler_name: str) -> None:
|
||||
tasks_to_be_cancelled = self._handler_tasks.get(handler_name, [])
|
||||
@@ -327,16 +331,18 @@ class EventsManager:
|
||||
except Exception as e:
|
||||
logger.error(f"创建事件处理器任务 {handler.handler_name} 时发生异常: {e}", exc_info=True)
|
||||
|
||||
async def _dispatch_intercepting_handler(
|
||||
async def _dispatch_intercepting_handler_task(
|
||||
self, handler: BaseEventHandler, event_type: EventType | str, message: Optional[MaiMessages] = None
|
||||
) -> bool:
|
||||
) -> Tuple[bool, Optional[MaiMessages]]:
|
||||
"""分发并等待一个阻塞(同步)的事件处理器,返回是否应继续处理。"""
|
||||
if event_type == EventType.UNKNOWN:
|
||||
raise ValueError("未知事件类型")
|
||||
if event_type not in self._history_enable_map:
|
||||
raise ValueError(f"事件类型 {event_type} 未注册")
|
||||
try:
|
||||
success, continue_processing, return_message, custom_result = await handler.execute(message)
|
||||
success, continue_processing, return_message, custom_result, modified_message = await handler.execute(
|
||||
message
|
||||
)
|
||||
|
||||
if not success:
|
||||
logger.error(f"EventHandler {handler.handler_name} 执行失败: {return_message}")
|
||||
@@ -345,17 +351,17 @@ class EventsManager:
|
||||
|
||||
if self._history_enable_map[event_type] and custom_result:
|
||||
self._events_result_history[event_type].append(custom_result)
|
||||
return continue_processing
|
||||
return continue_processing, modified_message
|
||||
except KeyError:
|
||||
logger.error(f"事件 {event_type} 注册的历史记录启用情况与实际不符合")
|
||||
return True
|
||||
return True, None
|
||||
except Exception as e:
|
||||
logger.error(f"EventHandler {handler.handler_name} 发生异常: {e}", exc_info=True)
|
||||
return True # 发生异常时默认不中断其他处理
|
||||
return True, None # 发生异常时默认不中断其他处理
|
||||
|
||||
def _task_done_callback(
|
||||
self,
|
||||
task: asyncio.Task[Tuple[bool, bool, str | None, CustomEventHandlerResult | None]],
|
||||
task: asyncio.Task[Tuple[bool, bool, str | None, CustomEventHandlerResult | None, MaiMessages | None]],
|
||||
event_type: EventType | str,
|
||||
):
|
||||
"""任务完成回调"""
|
||||
@@ -365,7 +371,7 @@ class EventsManager:
|
||||
if event_type not in self._history_enable_map:
|
||||
raise ValueError(f"事件类型 {event_type} 未注册")
|
||||
try:
|
||||
success, _, result, custom_result = task.result() # 忽略是否继续的标志,因为消息本身未被拦截
|
||||
success, _, result, custom_result, _ = task.result() # 忽略是否继续的标志和消息的修改,因为消息本身未被拦截
|
||||
if success:
|
||||
logger.debug(f"事件处理任务 {task_name} 已成功完成: {result}")
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user