fix:收敛A_Memorix最小回归修复

最小修复聊天摘要写回游标恢复、摘要元数据透传、webui反馈参数解析、embedding批次缓存索引、图存储清理与配置默认值回归,并补齐针对性回归测试,确保问题解决且不影响现有逻辑。
This commit is contained in:
A-Dawn
2026-04-16 20:28:54 +08:00
parent 322309bef9
commit 6bfccf90a3
17 changed files with 361 additions and 60 deletions

View File

@@ -385,19 +385,19 @@ class EmbeddingAPIAdapter:
semaphore = asyncio.Semaphore(self.max_concurrent)
async def encode_with_semaphore(text: str, index: int):
async def encode_with_semaphore(text: str, batch_index: int, absolute_index: int):
async with semaphore:
embedding = await self._get_embedding_direct(text, dimensions=dimensions)
if embedding is None:
raise RuntimeError(f"文本 {index} 编码失败embedding 返回为空")
raise RuntimeError(f"文本 {absolute_index} 编码失败embedding 返回为空")
vector = self._validate_embedding_vector(
embedding,
source=f"文本 {index}",
source=f"文本 {absolute_index}",
)
return index, vector
return batch_index, vector
tasks = [
encode_with_semaphore(text, offset + index)
encode_with_semaphore(text, index, offset + index)
for index, text in uncached_items
]
results = await asyncio.gather(*tasks)

View File

@@ -803,6 +803,7 @@ class SDKMemoryKernel:
context_length: Optional[int] = None,
include_personality: Optional[bool] = None,
time_end: Optional[float] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
await self.initialize()
assert self.summary_importer
@@ -811,6 +812,7 @@ class SDKMemoryKernel:
context_length=context_length,
include_personality=include_personality,
time_end=time_end,
metadata=metadata,
)
if success:
await self.rebuild_episodes_for_sources([self._build_source("chat_summary", chat_id, [])])
@@ -854,6 +856,12 @@ class SDKMemoryKernel:
context_length=self._optional_int(summary_meta.get("context_length")),
include_personality=summary_meta.get("include_personality"),
time_end=time_end,
metadata={
**summary_meta,
"external_id": external_token,
"chat_id": str(chat_id or "").strip(),
"source_type": "chat_summary",
},
)
result.setdefault("external_id", external_id)
result.setdefault("chat_id", chat_id)

View File

@@ -1294,6 +1294,7 @@ class GraphStore:
if current_n == 0:
logger.warning("检测到空图元数据但邻接矩阵仍然存在,已重置为空图。")
self._adjacency = None
self._edge_hash_map = defaultdict(set)
elif current_n > adj_n:
logger.warning(f"检测到图存储维度不匹配: 节点数={current_n}, 矩阵大小={adj_n}. 正在自动修复...")
self._expand_adjacency_matrix(current_n - adj_n)
@@ -1305,6 +1306,14 @@ class GraphStore:
self._adjacency = csc_matrix((current_n, current_n), dtype=np.float32)
else:
self._adjacency = csr_matrix((current_n, current_n), dtype=np.float32)
self._edge_hash_map = defaultdict(
set,
{
(src_idx, dst_idx): set(hashes)
for (src_idx, dst_idx), hashes in self._edge_hash_map.items()
if src_idx < current_n and dst_idx < current_n
},
)
self._adjacency_dirty = True
logger.info(

View File

@@ -178,6 +178,27 @@ class MetadataStore:
int(knowledge_type_result.get("normalized", 0) or 0),
)
def _ensure_memory_feedback_task_columns(self, cursor: sqlite3.Cursor) -> None:
"""补齐 memory_feedback_tasks 历史库缺失的 rollback_* 列。"""
cursor.execute("PRAGMA table_info(memory_feedback_tasks)")
feedback_task_columns = {row[1] for row in cursor.fetchall()}
feedback_task_migrations = {
"rollback_status": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_status TEXT DEFAULT 'none'",
"rollback_plan_json": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_plan_json TEXT",
"rollback_result_json": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_result_json TEXT",
"rollback_error": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_error TEXT",
"rollback_requested_by": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_requested_by TEXT",
"rollback_reason": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_reason TEXT",
"rollback_requested_at": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_requested_at REAL",
"rolled_back_at": "ALTER TABLE memory_feedback_tasks ADD COLUMN rolled_back_at REAL",
}
for col, sql in feedback_task_migrations.items():
if col not in feedback_task_columns:
try:
cursor.execute(sql)
except sqlite3.OperationalError as e:
logger.warning(f"Schema迁移失败 (memory_feedback_tasks.{col}): {e}")
def close(self) -> None:
"""关闭数据库连接"""
if self._conn:
@@ -641,24 +662,7 @@ class MetadataStore:
CREATE INDEX IF NOT EXISTS idx_person_profile_refresh_queue_requested
ON person_profile_refresh_queue(requested_at DESC)
""")
cursor.execute("PRAGMA table_info(memory_feedback_tasks)")
feedback_task_columns = {row[1] for row in cursor.fetchall()}
feedback_task_migrations = {
"rollback_status": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_status TEXT DEFAULT 'none'",
"rollback_plan_json": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_plan_json TEXT",
"rollback_result_json": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_result_json TEXT",
"rollback_error": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_error TEXT",
"rollback_requested_by": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_requested_by TEXT",
"rollback_reason": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_reason TEXT",
"rollback_requested_at": "ALTER TABLE memory_feedback_tasks ADD COLUMN rollback_requested_at REAL",
"rolled_back_at": "ALTER TABLE memory_feedback_tasks ADD COLUMN rolled_back_at REAL",
}
for col, sql in feedback_task_migrations.items():
if col not in feedback_task_columns:
try:
cursor.execute(sql)
except sqlite3.OperationalError as e:
logger.warning(f"Schema迁移失败 (memory_feedback_tasks.{col}): {e}")
self._ensure_memory_feedback_task_columns(cursor)
cursor.execute("""
CREATE TABLE IF NOT EXISTS external_memory_refs (
external_id TEXT PRIMARY KEY,
@@ -953,6 +957,7 @@ class MetadataStore:
CREATE INDEX IF NOT EXISTS idx_person_profile_refresh_queue_requested
ON person_profile_refresh_queue(requested_at DESC)
""")
self._ensure_memory_feedback_task_columns(cursor)
cursor.execute("""
CREATE TABLE IF NOT EXISTS external_memory_refs (
external_id TEXT PRIMARY KEY,
@@ -3945,6 +3950,8 @@ class MetadataStore:
"episodes", "episode_paragraphs",
"episode_rebuild_sources", "episode_pending_paragraphs",
"paragraph_vector_backfill",
"memory_feedback_tasks", "memory_feedback_action_logs",
"paragraph_stale_relation_marks", "person_profile_refresh_queue",
]
for table in tables:
cursor.execute(f"DELETE FROM {table}")

View File

@@ -225,6 +225,7 @@ class SummaryImporter:
context_length: Optional[int] = None,
include_personality: Optional[bool] = None,
time_end: Optional[float] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[bool, str]:
"""
从指定的聊天流中提取记录并执行总结导入
@@ -327,7 +328,14 @@ class SummaryImporter:
}
# 6. 执行导入
await self._execute_import(summary_text, entities, relations, stream_id, time_meta=time_meta)
await self._execute_import(
summary_text,
entities,
relations,
stream_id,
time_meta=time_meta,
metadata=metadata,
)
# 7. 持久化
self.vector_store.save()
@@ -393,6 +401,7 @@ class SummaryImporter:
relations: List[Dict[str, str]],
stream_id: str,
time_meta: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
):
"""将数据写入存储"""
# 获取默认知识类型
@@ -407,6 +416,7 @@ class SummaryImporter:
hash_value = self.metadata_store.add_paragraph(
content=summary,
source=f"chat_summary:{stream_id}",
metadata=metadata,
knowledge_type=knowledge_type.value,
time_meta=time_meta,
)

View File

@@ -375,30 +375,6 @@ def _preflight_impl(config_path: Path, data_dir: Path) -> Dict[str, Any]:
"memory_feedback_tasks rollback columns missing under current schema version",
)
)
elif not has_stale_marks:
checks.append(
CheckItem(
"CP-15",
"error",
"paragraph_stale_relation_marks table missing under current schema version",
)
)
elif not has_profile_refresh_queue:
checks.append(
CheckItem(
"CP-16",
"error",
"person_profile_refresh_queue table missing under current schema version",
)
)
elif not has_feedback_rollback_status or not has_feedback_rollback_plan:
checks.append(
CheckItem(
"CP-17",
"error",
"memory_feedback_tasks rollback columns missing under current schema version",
)
)
if _sqlite_table_exists(conn, "relations"):
row = conn.execute("SELECT COUNT(*) FROM relations").fetchone()

View File

@@ -146,7 +146,7 @@ class VisualConfig(ConfigBase):
__ui_icon__ = "image"
planner_mode: Literal["text", "multimodal", "auto"] = Field(
default="text",
default="auto",
json_schema_extra={
"x-widget": "select",
"x-icon": "image",
@@ -155,7 +155,7 @@ class VisualConfig(ConfigBase):
"""Planner 视觉模式text 仅文本multimodal 强制多模态auto 按模型能力自动选择"""
replyer_mode: Literal["text", "multimodal", "auto"] = Field(
default="text",
default="auto",
json_schema_extra={
"x-widget": "select",
"x-icon": "git-branch",

View File

@@ -1134,7 +1134,14 @@ class MaisakaReasoningEngine:
tool_name=invocation.tool_name,
tool_reasoning=invocation.reasoning,
)
if invocation.tool_name == "query_memory" and isinstance(saved_record, dict):
except Exception:
logger.exception(
f"{self._runtime.log_prefix} 写入工具记录失败: 工具={invocation.tool_name} 调用编号={invocation.call_id}"
)
return
if invocation.tool_name == "query_memory" and isinstance(saved_record, dict):
try:
enqueue_payload = await memory_service.enqueue_feedback_task(
query_tool_id=str(saved_record.get("tool_id") or invocation.call_id or "").strip(),
session_id=str(saved_record.get("session_id") or self._runtime.chat_stream.session_id or "").strip(),
@@ -1143,15 +1150,16 @@ class MaisakaReasoningEngine:
if isinstance(tool_record_payload.get("structured_content"), dict)
else {},
)
except Exception:
logger.exception(
f"{self._runtime.log_prefix} 反馈纠错任务入队失败: tool_call_id={invocation.call_id}"
)
else:
if not bool(enqueue_payload.get("success")):
logger.debug(
f"{self._runtime.log_prefix} 反馈纠错任务未入队: "
f"tool_call_id={invocation.call_id} reason={enqueue_payload.get('reason', '')}"
)
except Exception:
logger.exception(
f"{self._runtime.log_prefix} 写入工具记录失败: 工具={invocation.tool_name} 调用编号={invocation.call_id}"
)
def _append_tool_execution_result(self, tool_call: ToolCall, result: ToolExecutionResult) -> None:
"""将统一工具执行结果写回 Maisaka 历史。

View File

@@ -6,10 +6,12 @@ from typing import Any, List, Optional
import asyncio
import json
import pickle
import time
from json_repair import repair_json
from src.services import memory_service as memory_service_module
from src.chat.utils.utils import is_bot_self
from src.common.logger import get_logger
from src.common.message_repository import count_messages, find_messages
@@ -281,7 +283,17 @@ class ChatSummaryWritebackService:
return
threshold = self._message_threshold()
state = self._states.setdefault(session_id, ChatSummaryWritebackState())
state = self._states.get(session_id)
if state is None:
restored_count = await self._load_last_trigger_message_count(
session_id=session_id,
total_message_count=total_message_count,
)
state = ChatSummaryWritebackState(
last_trigger_message_count=restored_count,
last_trigger_time=time.time() if restored_count > 0 else 0.0,
)
self._states[session_id] = state
pending_message_count = max(0, total_message_count - state.last_trigger_message_count)
if pending_message_count < threshold:
return
@@ -324,6 +336,64 @@ class ChatSummaryWritebackService:
getattr(result, "detail", ""),
)
async def _load_last_trigger_message_count(self, *, session_id: str, total_message_count: int) -> int:
    """Restore the summary write-back trigger cursor from persisted chat summaries.

    Looks up the latest stored summary paragraph for this session and returns
    its recorded trigger message count (clamped to *total_message_count*), so
    a service restart does not re-summarize already-covered messages.
    Returns 0 on any failure, which callers treat as "no cursor to restore".
    """
    try:
        # NOTE(review): reaches into the host service's private _ensure_kernel —
        # presumably the only way to obtain the kernel lazily here; verify.
        runtime_manager = getattr(memory_service_module, "a_memorix_host_service", None)
        ensure_kernel = getattr(runtime_manager, "_ensure_kernel", None)
        if not callable(ensure_kernel):
            return 0
        kernel = await ensure_kernel()
        metadata_store = getattr(kernel, "metadata_store", None)
        if metadata_store is None:
            return 0
        paragraphs = metadata_store.get_paragraphs_by_source(f"chat_summary:{session_id}")
        if not paragraphs:
            return 0
        # Pick the most recently created summary and read its trigger metadata.
        latest_paragraph = max(paragraphs, key=self._paragraph_created_at)
        metadata = self._paragraph_metadata(latest_paragraph)
        trigger_message_count = self._coerce_positive_int(metadata.get("trigger_message_count"))
        if trigger_message_count > 0:
            return min(total_message_count, trigger_message_count)
        # Legacy summary rows carry no trigger count; fall back to aligning with
        # the current count so a restart does not immediately write a
        # near-duplicate summary.
        return total_message_count
    except Exception as exc:
        # Best-effort restore: any failure just means "start from zero".
        logger.debug("恢复聊天摘要写回游标失败: session_id=%s error=%s", session_id, exc)
        return 0
@staticmethod
def _paragraph_created_at(paragraph: dict[str, Any]) -> float:
try:
return float(paragraph.get("created_at") or 0.0)
except Exception:
return 0.0
@staticmethod
def _paragraph_metadata(paragraph: dict[str, Any]) -> dict[str, Any]:
metadata = paragraph.get("metadata")
if isinstance(metadata, dict):
return metadata
if isinstance(metadata, (bytes, bytearray)):
try:
parsed = pickle.loads(metadata)
except Exception:
return {}
return parsed if isinstance(parsed, dict) else {}
return {}
@staticmethod
def _coerce_positive_int(value: Any) -> int:
try:
number = int(value or 0)
except Exception:
return 0
return max(0, number)
@staticmethod
def _resolve_session_id(message: Any) -> str:
return str(

View File

@@ -208,7 +208,7 @@ def _resolve_static_path() -> Path | None:
# 开发环境优先允许复用仓库里的现成 dist
base_dir = _get_project_root()
static_path = base_dir / "dashboard" / "dist"
if static_path.exists():
if static_path.is_dir() and (static_path / "index.html").exists():
return static_path
try:

View File

@@ -365,11 +365,13 @@ async def _profile_delete_override(person_id: str) -> dict:
async def _feedback_list(limit: int, status: str, rollback_status: str, query: str) -> dict:
statuses = [item.strip() for item in str(status or "").split(",") if item.strip()]
rollback_statuses = [item.strip() for item in str(rollback_status or "").split(",") if item.strip()]
return await memory_service.feedback_admin(
action="list",
limit=limit,
status=status,
rollback_status=rollback_status,
statuses=statuses,
rollback_statuses=rollback_statuses,
query=query,
)