From 712bcd3605fc1446bacd7f7e140a41f21e0bd15a Mon Sep 17 00:00:00 2001 From: Losita <2810873701@qq.com> Date: Sun, 15 Mar 2026 23:38:46 +0800 Subject: [PATCH] =?UTF-8?q?Version:=200.6.0.dev.260315=20=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E5=AF=B9=E8=AF=9D=E8=90=BD=E5=BA=93=E5=AF=B9=E8=AF=9D?= =?UTF-8?q?=E8=AE=A1=E6=95=B0=E5=8E=9F=E5=AD=90=E8=87=AA=E5=A2=9E=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=EF=BC=8C=E7=A1=AE=E4=BF=9D=E8=81=8A=E5=A4=A9=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E5=92=8C=E6=B6=88=E6=81=AF=E8=AE=A1=E6=95=B0=E4=BA=8C?= =?UTF-8?q?=E8=80=85=E5=90=8C=E6=97=B6=E8=90=BD=E5=BA=93=EF=BC=8C=E4=BF=9D?= =?UTF-8?q?=E8=AF=81=E4=B8=80=E8=87=B4=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/dao/agent.go | 47 +++++++++++++++++++++++------- backend/infra/outbox/repository.go | 24 +++++++++++++-- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/backend/dao/agent.go b/backend/dao/agent.go index c1c9b65..b572e0b 100644 --- a/backend/dao/agent.go +++ b/backend/dao/agent.go @@ -3,7 +3,9 @@ package dao import ( "context" "errors" + "fmt" "strings" + "time" "github.com/LoveLosita/smartflow/backend/model" "gorm.io/gorm" @@ -18,16 +20,41 @@ func NewAgentDAO(db *gorm.DB) *AgentDAO { } func (a *AgentDAO) SaveChatHistory(ctx context.Context, userID int, conversationID string, role, message string) error { - userChat := model.ChatHistory{ - UserID: userID, - MessageContent: &message, - Role: &role, - ChatID: conversationID, - } - if err := a.db.WithContext(ctx).Create(&userChat).Error; err != nil { - return err - } - return nil + // 1. 同步落库路径也要保证“消息写入”和“会话计数更新”原子一致。 + // 因此这里使用事务,避免出现“有消息但 message_count 没加”或反过来的不一致状态。 + return a.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + // 1.1 先写 chat_histories。 + userChat := model.ChatHistory{ + UserID: userID, + MessageContent: &message, + Role: &role, + ChatID: conversationID, + } + if err := tx.Create(&userChat).Error; err != nil { + return err + } + + // 1.2 再原子更新 agent_chats 的统计字段: + // - message_count: +1 + // - last_message_at: 当前时间 + // 这样 message_count 语义就稳定等于“已成功落库的消息条数”。 + now := time.Now() + updates := map[string]interface{}{ + "message_count": gorm.Expr("message_count + ?", 1), + "last_message_at": &now, + } + result := tx.Model(&model.AgentChat{}). + Where("user_id = ? AND chat_id = ?", userID, conversationID). + Updates(updates) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + // 会话不存在视为数据不一致,回滚事务,防止产生“孤儿历史记录”。 + return fmt.Errorf("conversation not found when updating stats: user_id=%d chat_id=%s", userID, conversationID) + } + return nil + }) } func (a *AgentDAO) CreateNewChat(userID int, chatID string) (int64, error) { diff --git a/backend/infra/outbox/repository.go b/backend/infra/outbox/repository.go index 0a5849b..10bf319 100644 --- a/backend/infra/outbox/repository.go +++ b/backend/infra/outbox/repository.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "time" "github.com/LoveLosita/smartflow/backend/model" @@ -224,9 +225,28 @@ func (d *Repository) PersistChatHistoryAndMarkConsumed(ctx context.Context, outb return err } - // 3. 业务写入成功后,把 outbox 推进到 consumed 最终态。 - // 并清理错误与重试字段,表示该消息生命周期结束。 + // 3. 同一事务内原子更新会话统计信息: + // - message_count + 1 + // - last_message_at = now + // 这样可以保证 message_count 与 chat_histories 的真实落库条数一致。 now := time.Now() + chatUpdates := map[string]interface{}{ + "message_count": gorm.Expr("message_count + ?", 1), + "last_message_at": &now, + } + chatResult := tx.Model(&model.AgentChat{}). + Where("user_id = ? AND chat_id = ?", payload.UserID, payload.ConversationID). + Updates(chatUpdates) + if chatResult.Error != nil { + return chatResult.Error + } + if chatResult.RowsAffected == 0 { + // 会话不存在时回滚,让 outbox 继续重试/告警,而不是吞掉不一致。 + return fmt.Errorf("conversation not found when updating stats: user_id=%d chat_id=%s", payload.UserID, payload.ConversationID) + } + + // 4. 业务写入成功后,把 outbox 推进到 consumed 最终态。 + // 并清理错误与重试字段,表示该消息生命周期结束。 updates := map[string]interface{}{ "status": model.OutboxStatusConsumed, "consumed_at": &now,