Version: 0.4.8.dev.260308

feat: 🏗️ 实现 Agent 消息可靠异步持久化(Outbox + Kafka)

* 新增 Outbox 数据模型与消息载荷定义,位于 `backend/model/outbox.go`
* 新增 Outbox DAO,支持创建、扫描、发布标记、失败重试与消费落库事务,位于 `backend/dao/outbox.go`
* 新增 Kafka 基础封装,包含配置、生产者、消费者与消息包装,位于 `backend/kafka` 文件夹

  * `config.go`:Kafka 配置文件
  * `producer.go`:Kafka 生产者
  * `consumer.go`:Kafka 消费者
  * `envelope.go`:消息封装处理
* 新增异步管道服务,处理扫描投递与消费落库,位于 `backend/service/agent_async_pipeline.go`
* 接入 Agent 聊天链路的可靠持久化,替换原有 goroutine 直接写库逻辑,位于 `backend/service/agent.go`
* 启动流程接入管道初始化与启动,位于 `backend/cmd/start.go`
* 增加 Kafka 配置项,更新 `backend/config.yaml` 与 `backend/config.example.yaml`
* 引入 Kafka 依赖:`github.com/segmentio/kafka-go`(见 `backend/go.mod`, `backend/go.sum`)

fix: 🐛 修复首启偶发 user 消息重复落库问题

* 解决因 Outbox 状态并发回写竞态,导致 `consumed` 被晚到的 `published` 覆盖的问题
* 在 `MarkPublished` 中增加条件,避免覆盖已标记为 `consumed` 或 `dead` 的消息,修复位置:`backend/dao/outbox.go`

perf:  更新 Docker Compose 配置与 Kafka 相关服务

* 更新 `docker-compose.yml` 文件,新增 Kafka 配置与服务

fix: 🧹 优化缓存删除逻辑

* 在 `cache deleter` 中忽略了 `model.AgentOutboxMessage`、`model.ChatHistory` 与 `model.AgentChat` 这三个结构体
* 防止这些结构体对应的表单删除缓存时,导致控制台消息爆炸
This commit is contained in:
LoveLosita
2026-03-08 12:53:54 +08:00
parent 4906f814fd
commit 1ed558b488
17 changed files with 800 additions and 66 deletions

View File

@@ -3,3 +3,4 @@
## 协作偏好(逐条追加)
1. 默认语言规则:所有注释、接口文案、说明、评审反馈均使用中文。
2. 请勤加注释,尤其是复杂逻辑部分,确保代码易于理解和维护。

View File

@@ -1,12 +1,14 @@
package cmd
import (
"context"
"fmt"
"log"
"github.com/LoveLosita/smartflow/backend/api"
"github.com/LoveLosita/smartflow/backend/dao"
"github.com/LoveLosita/smartflow/backend/inits"
kafkabus "github.com/LoveLosita/smartflow/backend/kafka"
"github.com/LoveLosita/smartflow/backend/middleware"
"github.com/LoveLosita/smartflow/backend/pkg"
"github.com/LoveLosita/smartflow/backend/routers"
@@ -42,16 +44,15 @@ func Start() {
}
rdb := inits.InitRedis()
//工具包
// 工具包
limiter := pkg.NewRateLimiter(rdb)
//初始化eino
// 初始化 eino
aiHub, err := inits.InitEino()
if err != nil {
log.Fatalf("Failed to initialize Eino: %v", err)
}
//中间件
//dao 层
// 中间件
// dao 层
cacheRepo := dao.NewCacheDAO(rdb)
agentCacheRepo := dao.NewAgentCache(rdb)
_ = db.Use(middleware.NewGormCachePlugin(cacheRepo)) // 注册 GORM 插件
@@ -62,14 +63,29 @@ func Start() {
scheduleRepo := dao.NewScheduleDAO(db)
manager := dao.NewManager(db)
agentRepo := dao.NewAgentDAO(db)
//service 层
outboxRepo := dao.NewOutboxDAO(db)
//
kafkaCfg := kafkabus.LoadConfig()
asyncPipeline, err := service.NewAgentAsyncPipeline(outboxRepo, kafkaCfg)
if err != nil {
log.Fatalf("Failed to initialize Kafka async pipeline: %v", err)
}
if asyncPipeline != nil {
asyncPipeline.Start(context.Background())
defer asyncPipeline.Close()
log.Println("Kafka async pipeline started")
} else {
log.Println("Kafka async pipeline is disabled")
}
// service 层
userService := service.NewUserService(userRepo, cacheRepo)
taskSv := service.NewTaskService(taskRepo, cacheRepo)
courseService := service.NewCourseService(courseRepo, scheduleRepo)
taskClassService := service.NewTaskClassService(taskClassRepo, cacheRepo, scheduleRepo, manager)
scheduleService := service.NewScheduleService(scheduleRepo, userRepo, taskClassRepo, manager, cacheRepo)
agentService := service.NewAgentService(aiHub, agentRepo, agentCacheRepo)
//api 层
agentService := service.NewAgentService(aiHub, agentRepo, agentCacheRepo, asyncPipeline)
// api 层
userApi := api.NewUserHandler(userService)
taskApi := api.NewTaskHandler(taskSv)
courseApi := api.NewCourseHandler(courseService)

View File

@@ -33,6 +33,16 @@ redis:
password: ""
db: 0
kafka:
enabled: true
brokers:
- "localhost:9092"
topic: "smartflow.agent.outbox"
groupID: "smartflow-agent-outbox-consumer"
retryScanInterval: 1s
retryBatchSize: 100
maxRetry: 20
time:
zone: "Asia/Shanghai"
semesterStartDate: "2026-03-02" #学期开始日期,一定要设定为周一,以便于计算周数
@@ -41,4 +51,4 @@ time:
agent:
workerModel: "doubao-seed-1-6-lite-251015" # 智能体使用的Worker模型需根据实际情况调整
strategistModel: "deepseek-v3-2-251201" # 策略师使用的Worker模型需根据实际情况调整
baseURL: "https://ark.cn-beijing.volces.com/api/v3" # Worker服务的基础URL需根据实际情况调整
baseURL: "https://ark.cn-beijing.volces.com/api/v3" # Worker服务的基础URL需根据实际情况调整

190
backend/dao/outbox.go Normal file
View File

@@ -0,0 +1,190 @@
package dao
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/LoveLosita/smartflow/backend/model"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type OutboxDAO struct {
db *gorm.DB
}
func NewOutboxDAO(db *gorm.DB) *OutboxDAO {
return &OutboxDAO{db: db}
}
func (d *OutboxDAO) CreateChatHistoryMessage(ctx context.Context, topic, messageKey string, payload model.ChatHistoryPersistPayload, maxRetry int) (int64, error) {
if maxRetry <= 0 {
maxRetry = 20
}
raw, err := json.Marshal(payload)
if err != nil {
return 0, err
}
now := time.Now()
msg := model.AgentOutboxMessage{
BizType: model.OutboxBizTypeChatHistoryPersist,
Topic: topic,
MessageKey: messageKey,
Payload: string(raw),
Status: model.OutboxStatusPending,
RetryCount: 0,
MaxRetry: maxRetry,
NextRetryAt: &now,
}
if err = d.db.WithContext(ctx).Create(&msg).Error; err != nil {
return 0, err
}
return msg.ID, nil
}
func (d *OutboxDAO) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMessage, error) {
var msg model.AgentOutboxMessage
if err := d.db.WithContext(ctx).Where("id = ?", id).First(&msg).Error; err != nil {
return nil, err
}
return &msg, nil
}
func (d *OutboxDAO) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) {
if limit <= 0 {
limit = 100
}
now := time.Now()
var messages []model.AgentOutboxMessage
err := d.db.WithContext(ctx).
Where("status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?", model.OutboxStatusPending, now).
Order("next_retry_at ASC, id ASC").
Limit(limit).
Find(&messages).Error
if err != nil {
return nil, err
}
return messages, nil
}
// MarkPublished 仅在消息未进入最终态时更新为 published避免覆盖 consumed/dead。
func (d *OutboxDAO) MarkPublished(ctx context.Context, id int64) error {
now := time.Now()
updates := map[string]interface{}{
"status": model.OutboxStatusPublished,
"published_at": &now,
"last_error": nil,
"next_retry_at": nil,
}
result := d.db.WithContext(ctx).
Model(&model.AgentOutboxMessage{}).
Where("id = ? AND status NOT IN (?, ?)", id, model.OutboxStatusConsumed, model.OutboxStatusDead).
Updates(updates)
return result.Error
}
func (d *OutboxDAO) MarkDead(ctx context.Context, id int64, reason string) error {
now := time.Now()
lastErr := truncateError(reason)
updates := map[string]interface{}{
"status": model.OutboxStatusDead,
"last_error": &lastErr,
"next_retry_at": nil,
"updated_at": now,
}
return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error
}
func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason string) error {
return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var msg model.AgentOutboxMessage
err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(&msg).Error
if err != nil {
return err
}
if msg.Status == model.OutboxStatusConsumed || msg.Status == model.OutboxStatusDead {
return nil
}
nextRetryCount := msg.RetryCount + 1
now := time.Now()
status := model.OutboxStatusPending
var nextRetryAt *time.Time
if nextRetryCount >= msg.MaxRetry {
status = model.OutboxStatusDead
nextRetryAt = nil
} else {
t := now.Add(calcRetryBackoff(nextRetryCount))
nextRetryAt = &t
}
lastErr := truncateError(reason)
updates := map[string]interface{}{
"status": status,
"retry_count": nextRetryCount,
"last_error": &lastErr,
"next_retry_at": nextRetryAt,
"updated_at": now,
}
return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error
})
}
func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outboxID int64, payload model.ChatHistoryPersistPayload) error {
return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
var outboxMsg model.AgentOutboxMessage
err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", outboxID).First(&outboxMsg).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil
}
return err
}
if outboxMsg.Status == model.OutboxStatusConsumed {
return nil
}
if outboxMsg.Status == model.OutboxStatusDead {
return nil
}
chatMsg := payload.Message
chatRole := payload.Role
history := model.ChatHistory{
UserID: payload.UserID,
ChatID: payload.ConversationID,
MessageContent: &chatMsg,
Role: &chatRole,
}
if err = tx.Create(&history).Error; err != nil {
return err
}
now := time.Now()
updates := map[string]interface{}{
"status": model.OutboxStatusConsumed,
"consumed_at": &now,
"last_error": nil,
"next_retry_at": nil,
"updated_at": now,
}
return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", outboxID).Updates(updates).Error
})
}
func calcRetryBackoff(retryCount int) time.Duration {
if retryCount <= 0 {
return time.Second
}
if retryCount > 6 {
retryCount = 6
}
return time.Second * time.Duration(1<<(retryCount-1))
}
func truncateError(reason string) string {
if len(reason) <= 2000 {
return reason
}
return reason[:2000]
}

View File

@@ -45,6 +45,7 @@ require (
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
@@ -54,10 +55,12 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nikolalohinski/gonja v1.5.3 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.54.0 // indirect
github.com/sagikazarmark/locafero v0.11.0 // indirect
github.com/segmentio/kafka-go v0.4.47 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/slongfield/pyfmt v0.0.0-20220222012616-ea85ff4c361f // indirect
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect

View File

@@ -127,6 +127,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -168,6 +170,8 @@ github.com/onsi/gomega v1.27.3 h1:5VwIwnBY3vbBDOJrNtA4rVdiTZCsq9B5F12pvy1Drmk=
github.com/onsi/gomega v1.27.3/go.mod h1:5vG284IBtfDAmDyrK+eGyZmUgUlmi+Wngqo557cZ6Gw=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -183,6 +187,8 @@ github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/f
github.com/rollbar/rollbar-go v1.0.2/go.mod h1:AcFs5f0I+c71bpHlXNNDbOWJiKwjFDtISeXco0L5PKQ=
github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc=
github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik=
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
@@ -229,8 +235,12 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/yargevad/filepathx v1.0.0 h1:SYcT+N3tYGi+NvazubCNlvgIPbzAk7i7y2dwg3I5FYc=
github.com/yargevad/filepathx v1.0.0/go.mod h1:BprfX/gpYNJHJfc35GjRRpVcwWXS89gGulUIU5tK3tA=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
@@ -239,6 +249,8 @@ golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -247,6 +259,8 @@ golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N0
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.26.0 h1:EGMPT//Ezu+ylkCijjPc+f4Aih7sZvaAr+O3EHBxvZg=
golang.org/x/mod v0.26.0/go.mod h1:/j6NAhSk8iQ723BGAUyoAcn7SlD7s15Dp9Nd/SfeaFQ=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -254,33 +268,64 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg=
golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.35.0 h1:mBffYraMEf7aa0sB+NuKnuCy8qI/9Bughn8dC2Gu5r0=
golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=

58
backend/kafka/config.go Normal file
View File

@@ -0,0 +1,58 @@
package kafka
import (
"strings"
"time"
"github.com/spf13/viper"
)
const (
DefaultTopic = "smartflow.agent.outbox"
DefaultGroup = "smartflow-agent-outbox-consumer"
)
type Config struct {
Enabled bool
Brokers []string
Topic string
GroupID string
RetryScanInterval time.Duration
RetryBatchSize int
MaxRetry int
}
func LoadConfig() Config {
brokers := viper.GetStringSlice("kafka.brokers")
if len(brokers) == 0 {
single := strings.TrimSpace(viper.GetString("kafka.broker"))
if single != "" {
brokers = []string{single}
}
}
cfg := Config{
Enabled: viper.GetBool("kafka.enabled"),
Brokers: brokers,
Topic: strings.TrimSpace(viper.GetString("kafka.topic")),
GroupID: strings.TrimSpace(viper.GetString("kafka.groupID")),
RetryScanInterval: viper.GetDuration("kafka.retryScanInterval"),
RetryBatchSize: viper.GetInt("kafka.retryBatchSize"),
MaxRetry: viper.GetInt("kafka.maxRetry"),
}
if cfg.Topic == "" {
cfg.Topic = DefaultTopic
}
if cfg.GroupID == "" {
cfg.GroupID = DefaultGroup
}
if cfg.RetryScanInterval <= 0 {
cfg.RetryScanInterval = time.Second
}
if cfg.RetryBatchSize <= 0 {
cfg.RetryBatchSize = 100
}
if cfg.MaxRetry <= 0 {
cfg.MaxRetry = 20
}
return cfg
}

50
backend/kafka/consumer.go Normal file
View File

@@ -0,0 +1,50 @@
package kafka
import (
"context"
"errors"
segmentkafka "github.com/segmentio/kafka-go"
)
type Consumer struct {
reader *segmentkafka.Reader
}
func NewConsumer(cfg Config) (*Consumer, error) {
if len(cfg.Brokers) == 0 {
return nil, errors.New("kafka brokers 未配置")
}
reader := segmentkafka.NewReader(segmentkafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topic,
GroupID: cfg.GroupID,
MinBytes: 1,
MaxBytes: 10e6,
CommitInterval: 0,
StartOffset: segmentkafka.FirstOffset,
})
return &Consumer{reader: reader}, nil
}
// Dequeue 从 Kafka 拉取一条消息(手动提交 offset
func (c *Consumer) Dequeue(ctx context.Context) (segmentkafka.Message, error) {
if c == nil || c.reader == nil {
return segmentkafka.Message{}, errors.New("kafka consumer 未初始化")
}
return c.reader.FetchMessage(ctx)
}
func (c *Consumer) Commit(ctx context.Context, msg segmentkafka.Message) error {
if c == nil || c.reader == nil {
return errors.New("kafka consumer 未初始化")
}
return c.reader.CommitMessages(ctx, msg)
}
func (c *Consumer) Close() error {
if c == nil || c.reader == nil {
return nil
}
return c.reader.Close()
}

10
backend/kafka/envelope.go Normal file
View File

@@ -0,0 +1,10 @@
package kafka
import "encoding/json"
// Envelope 是投递到 Kafka 的统一包裹结构。
type Envelope struct {
OutboxID int64 `json:"outbox_id"`
BizType string `json:"biz_type"`
Payload json.RawMessage `json:"payload"`
}

45
backend/kafka/producer.go Normal file
View File

@@ -0,0 +1,45 @@
package kafka
import (
"context"
"errors"
segmentkafka "github.com/segmentio/kafka-go"
)
type Producer struct {
writer *segmentkafka.Writer
}
func NewProducer(cfg Config) (*Producer, error) {
if len(cfg.Brokers) == 0 {
return nil, errors.New("kafka brokers 未配置")
}
writer := &segmentkafka.Writer{
Addr: segmentkafka.TCP(cfg.Brokers...),
Balancer: &segmentkafka.Hash{},
RequiredAcks: segmentkafka.RequireOne,
Async: false,
}
return &Producer{writer: writer}, nil
}
// Enqueue 将消息写入 Kafka。
func (p *Producer) Enqueue(ctx context.Context, topic, key string, value []byte) error {
if p == nil || p.writer == nil {
return errors.New("kafka producer 未初始化")
}
msg := segmentkafka.Message{
Topic: topic,
Key: []byte(key),
Value: value,
}
return p.writer.WriteMessages(ctx, msg)
}
func (p *Producer) Close() error {
if p == nil || p.writer == nil {
return nil
}
return p.writer.Close()
}

View File

@@ -62,6 +62,8 @@ func (p *GormCachePlugin) dispatchCacheLogic(modelObj interface{}, db *gorm.DB)
p.invalidTaskClassCache(*m.UserID)
case model.Task:
p.invalidTaskCache(m.UserID)
case model.AgentOutboxMessage, model.ChatHistory, model.AgentChat:
// 这些模型目前没有定义缓存逻辑,先不处理
default:
// 只有真正没定义的模型才会到这里
log.Printf("[GORM-Cache] No logic defined for model: %T", modelObj)

42
backend/model/outbox.go Normal file
View File

@@ -0,0 +1,42 @@
package model
import "time"
const (
OutboxStatusPending = "pending"
OutboxStatusPublished = "published"
OutboxStatusConsumed = "consumed"
OutboxStatusDead = "dead"
OutboxBizTypeChatHistoryPersist = "chat_history_persist"
)
// AgentOutboxMessage 保存需要异步投递到 Kafka 的消息。
type AgentOutboxMessage struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
BizType string `gorm:"column:biz_type;type:varchar(64);not null;index:idx_outbox_status_next,priority:3;comment:业务类型"`
Topic string `gorm:"column:topic;type:varchar(128);not null;comment:Kafka Topic"`
MessageKey string `gorm:"column:message_key;type:varchar(128);not null;comment:Kafka 消息键"`
Payload string `gorm:"column:payload;type:longtext;not null;comment:业务载荷(JSON)"`
Status string `gorm:"column:status;type:varchar(32);not null;index:idx_outbox_status_next,priority:1;comment:pending/published/consumed/dead"`
RetryCount int `gorm:"column:retry_count;not null;default:0;comment:已重试次数"`
MaxRetry int `gorm:"column:max_retry;not null;default:20;comment:最大重试次数"`
NextRetryAt *time.Time `gorm:"column:next_retry_at;index:idx_outbox_status_next,priority:2;comment:下次重试时间"`
LastError *string `gorm:"column:last_error;type:text;comment:最后一次错误"`
PublishedAt *time.Time `gorm:"column:published_at;comment:投递到 Kafka 时间"`
ConsumedAt *time.Time `gorm:"column:consumed_at;comment:消费完成时间"`
CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime"`
UpdatedAt *time.Time `gorm:"column:updated_at;autoUpdateTime"`
}
func (AgentOutboxMessage) TableName() string {
return "agent_outbox_messages"
}
// ChatHistoryPersistPayload 是“聊天记录持久化”消息体。
type ChatHistoryPersistPayload struct {
UserID int `json:"user_id"`
ConversationID string `json:"conversation_id"`
Role string `json:"role"`
Message string `json:"message"`
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/LoveLosita/smartflow/backend/conv"
"github.com/LoveLosita/smartflow/backend/dao"
"github.com/LoveLosita/smartflow/backend/inits"
"github.com/LoveLosita/smartflow/backend/model"
"github.com/LoveLosita/smartflow/backend/pkg"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino/schema"
@@ -16,16 +17,18 @@ import (
)
type AgentService struct {
AIHub *inits.AIHub
repo *dao.AgentDAO
agentCache *dao.AgentCache
AIHub *inits.AIHub
repo *dao.AgentDAO
agentCache *dao.AgentCache
asyncPipeline *AgentAsyncPipeline
}
func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, agentRedis *dao.AgentCache) *AgentService {
func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, agentRedis *dao.AgentCache, asyncPipeline *AgentAsyncPipeline) *AgentService {
return &AgentService{
AIHub: aiHub,
repo: repo,
agentCache: agentRedis,
AIHub: aiHub,
repo: repo,
agentCache: agentRedis,
asyncPipeline: asyncPipeline,
}
}
@@ -38,19 +41,34 @@ func normalizeConversationID(chatID string) string {
}
func (s *AgentService) pickChatModel(requestModel string) (*ark.ChatModel, string) {
model := strings.TrimSpace(requestModel)
if strings.EqualFold(model, "strategist") {
modelName := strings.TrimSpace(requestModel)
if strings.EqualFold(modelName, "strategist") {
return s.AIHub.Strategist, "strategist"
}
return s.AIHub.Worker, "worker"
}
func (s *AgentService) saveChatHistoryReliable(ctx context.Context, payload model.ChatHistoryPersistPayload) error {
if s.asyncPipeline == nil {
return s.repo.SaveChatHistory(ctx, payload.UserID, payload.ConversationID, payload.Role, payload.Message)
}
return s.asyncPipeline.EnqueueChatHistoryPersist(ctx, payload)
}
func pushErrNonBlocking(errChan chan error, err error) {
select {
case errChan <- err:
default:
log.Printf("error channel is full, drop error: %v", err)
}
}
func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string) (<-chan string, <-chan error) {
// 1) 准备输出通道
outChan := make(chan string, 5)
errChan := make(chan error, 1)
// 2) 规范会话并选择模型
// 2) 规范会话并选择模型
chatID = normalizeConversationID(chatID)
selectedModel, resolvedModelName := s.pickChatModel(modelName)
@@ -63,9 +81,9 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
return outChan, errChan
}
if !result {
innerResult, err := s.repo.IfChatExists(ctx, userID, chatID)
if err != nil {
errChan <- err
innerResult, ifErr := s.repo.IfChatExists(ctx, userID, chatID)
if ifErr != nil {
errChan <- ifErr
close(outChan)
close(errChan)
return outChan, errChan
@@ -95,9 +113,9 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
cacheMiss := false
if chatHistory == nil {
cacheMiss = true
histories, err := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel(resolvedModelName), chatID)
if err != nil {
errChan <- err
histories, hisErr := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel(resolvedModelName), chatID)
if hisErr != nil {
errChan <- hisErr
close(outChan)
close(errChan)
return outChan, errChan
@@ -112,10 +130,10 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
// 6) 根据最新裁剪结果动态调整 Redis 会话窗口
targetWindow := pkg.CalcSessionWindowSize(len(chatHistory))
if err := s.agentCache.SetSessionWindowSize(ctx, chatID, targetWindow); err != nil {
if err = s.agentCache.SetSessionWindowSize(ctx, chatID, targetWindow); err != nil {
log.Printf("failed to set history window for %s: %v", chatID, err)
}
if err := s.agentCache.EnforceHistoryWindow(ctx, chatID); err != nil {
if err = s.agentCache.EnforceHistoryWindow(ctx, chatID); err != nil {
log.Printf("failed to enforce history window for %s: %v", chatID, err)
}
@@ -126,7 +144,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
// 缓存未命中时,把“裁剪后的历史”回填进缓存
if cacheMiss {
if err := s.agentCache.BackfillHistory(ctx, chatID, chatHistory); err != nil {
if err = s.agentCache.BackfillHistory(ctx, chatID, chatHistory); err != nil {
errChan <- err
close(outChan)
close(errChan)
@@ -134,37 +152,44 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin
}
}
// 7) 异步落用户消息(先写缓存再写库)
go func() {
bg := context.Background()
_ = s.agentCache.PushMessage(bg, chatID, &schema.Message{
Role: schema.User,
Content: userMessage,
})
_ = s.repo.SaveChatHistory(bg, userID, chatID, "user", userMessage)
}()
// 7) 先同步写 Redis再把持久化请求交给 outbox + Kafka
if err = s.agentCache.PushMessage(ctx, chatID, &schema.Message{Role: schema.User, Content: userMessage}); err != nil {
log.Printf("failed to push user message into redis history: %v", err)
}
if err = s.saveChatHistoryReliable(ctx, model.ChatHistoryPersistPayload{
UserID: userID,
ConversationID: chatID,
Role: "user",
Message: userMessage,
}); err != nil {
errChan <- err
close(outChan)
close(errChan)
return outChan, errChan
}
// 8) 启动流式聊天
go func() {
defer close(outChan)
fullText, err := agent.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan)
if err != nil {
errChan <- err
fullText, streamErr := agent.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan)
if streamErr != nil {
pushErrNonBlocking(errChan, streamErr)
return
}
// 9) 异步落助手回复
go func() {
bg := context.Background()
_ = s.agentCache.PushMessage(bg, chatID, &schema.Message{
Role: schema.Assistant,
Content: fullText,
})
if saveErr := s.repo.SaveChatHistory(bg, userID, chatID, "assistant", fullText); saveErr != nil {
log.Printf("failed to save chat history to database: %v", saveErr)
}
}()
// 9) 回答完成后,同步写 Redis并把数据库落库交给 outbox + Kafka
if cacheErr := s.agentCache.PushMessage(context.Background(), chatID, &schema.Message{Role: schema.Assistant, Content: fullText}); cacheErr != nil {
log.Printf("failed to push assistant message into redis history: %v", cacheErr)
}
if saveErr := s.saveChatHistoryReliable(context.Background(), model.ChatHistoryPersistPayload{
UserID: userID,
ConversationID: chatID,
Role: "assistant",
Message: fullText,
}); saveErr != nil {
pushErrNonBlocking(errChan, saveErr)
}
}()
return outChan, errChan

View File

@@ -0,0 +1,214 @@
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"time"
"github.com/LoveLosita/smartflow/backend/dao"
kafkabus "github.com/LoveLosita/smartflow/backend/kafka"
"github.com/LoveLosita/smartflow/backend/model"
segmentkafka "github.com/segmentio/kafka-go"
"gorm.io/gorm"
)
// AgentAsyncPipeline 负责 outbox 扫描、Kafka 投递与消费落库。
type AgentAsyncPipeline struct {
outboxRepo *dao.OutboxDAO
producer *kafkabus.Producer
consumer *kafkabus.Consumer
topic string
maxRetry int
scanEvery time.Duration
scanBatch int
}
func NewAgentAsyncPipeline(outboxRepo *dao.OutboxDAO, cfg kafkabus.Config) (*AgentAsyncPipeline, error) {
if !cfg.Enabled {
return nil, nil
}
producer, err := kafkabus.NewProducer(cfg)
if err != nil {
return nil, err
}
consumer, err := kafkabus.NewConsumer(cfg)
if err != nil {
_ = producer.Close()
return nil, err
}
return &AgentAsyncPipeline{
outboxRepo: outboxRepo,
producer: producer,
consumer: consumer,
topic: cfg.Topic,
maxRetry: cfg.MaxRetry,
scanEvery: cfg.RetryScanInterval,
scanBatch: cfg.RetryBatchSize,
}, nil
}
func (p *AgentAsyncPipeline) Start(ctx context.Context) {
if p == nil {
return
}
go p.startDispatchLoop(ctx)
go p.startConsumeLoop(ctx)
}
func (p *AgentAsyncPipeline) Close() {
if p == nil {
return
}
if err := p.producer.Close(); err != nil {
log.Printf("关闭 Kafka producer 失败: %v", err)
}
if err := p.consumer.Close(); err != nil {
log.Printf("关闭 Kafka consumer 失败: %v", err)
}
}
func (p *AgentAsyncPipeline) EnqueueChatHistoryPersist(ctx context.Context, payload model.ChatHistoryPersistPayload) error {
if p == nil {
return errors.New("Kafka 异步链路未初始化")
}
outboxID, err := p.outboxRepo.CreateChatHistoryMessage(ctx, p.topic, payload.ConversationID, payload, p.maxRetry)
if err != nil {
return err
}
if err = p.dispatchOne(context.Background(), outboxID); err != nil {
log.Printf("outbox 消息 %d 首次投递失败,等待扫描重试: %v", outboxID, err)
}
return nil
}
func (p *AgentAsyncPipeline) startDispatchLoop(ctx context.Context) {
ticker := time.NewTicker(p.scanEvery)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
pendingMessages, err := p.outboxRepo.ListDueMessages(ctx, p.scanBatch)
if err != nil {
log.Printf("扫描 outbox 失败: %v", err)
continue
}
for _, msg := range pendingMessages {
if err = p.dispatchOne(ctx, msg.ID); err != nil {
log.Printf("重试投递 outbox 消息失败(id=%d): %v", msg.ID, err)
}
}
}
}
}
func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) error {
outboxMsg, err := p.outboxRepo.GetByID(ctx, outboxID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil
}
return err
}
if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead {
return nil
}
envelope := kafkabus.Envelope{
OutboxID: outboxMsg.ID,
BizType: outboxMsg.BizType,
Payload: json.RawMessage(outboxMsg.Payload),
}
raw, err := json.Marshal(envelope)
if err != nil {
markErr := p.outboxRepo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包裹失败: "+err.Error())
if markErr != nil {
log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr)
}
return err
}
if err = p.producer.Enqueue(ctx, outboxMsg.Topic, outboxMsg.MessageKey, raw); err != nil {
_ = p.outboxRepo.MarkFailedForRetry(ctx, outboxMsg.ID, "投递 Kafka 失败: "+err.Error())
return err
}
if err = p.outboxRepo.MarkPublished(ctx, outboxMsg.ID); err != nil {
_ = p.outboxRepo.MarkFailedForRetry(ctx, outboxMsg.ID, "更新已投递状态失败: "+err.Error())
return err
}
return nil
}
func (p *AgentAsyncPipeline) startConsumeLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
}
msg, err := p.consumer.Dequeue(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Printf("Kafka 消费拉取失败: %v", err)
time.Sleep(300 * time.Millisecond)
continue
}
if err = p.handleMessage(ctx, msg); err != nil {
log.Printf("处理 Kafka 消息失败(topic=%s, partition=%d, offset=%d): %v", msg.Topic, msg.Partition, msg.Offset, err)
}
}
}
func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka.Message) error {
var envelope kafkabus.Envelope
if err := json.Unmarshal(msg.Value, &envelope); err != nil {
_ = p.consumer.Commit(ctx, msg)
return fmt.Errorf("解析 Kafka 包裹失败: %w", err)
}
if envelope.OutboxID <= 0 {
_ = p.consumer.Commit(ctx, msg)
return errors.New("Kafka 包裹缺少 outbox_id")
}
switch envelope.BizType {
case model.OutboxBizTypeChatHistoryPersist:
return p.consumeChatHistory(ctx, msg, envelope)
default:
_ = p.outboxRepo.MarkDead(ctx, envelope.OutboxID, "未知业务类型: "+envelope.BizType)
if err := p.consumer.Commit(ctx, msg); err != nil {
return err
}
return nil
}
}
func (p *AgentAsyncPipeline) consumeChatHistory(ctx context.Context, msg segmentkafka.Message, envelope kafkabus.Envelope) error {
var payload model.ChatHistoryPersistPayload
if err := json.Unmarshal(envelope.Payload, &payload); err != nil {
_ = p.outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+err.Error())
if commitErr := p.consumer.Commit(ctx, msg); commitErr != nil {
return commitErr
}
return nil
}
if err := p.outboxRepo.PersistChatHistoryAndMarkConsumed(ctx, envelope.OutboxID, payload); err != nil {
if markErr := p.outboxRepo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费并落库失败: "+err.Error()); markErr != nil {
return markErr
}
if commitErr := p.consumer.Commit(ctx, msg); commitErr != nil {
return commitErr
}
return err
}
return p.consumer.Commit(ctx, msg)
}

View File

@@ -1,32 +1,55 @@
services:
# MySQL 数据库服务
mysql:
image: mysql:8.0
container_name: SmartFlow-mysql
restart: always
environment:
MYSQL_ROOT_PASSWORD: root_password_123 # Root 用户密码
MYSQL_DATABASE: smartflow # 初始创建的数据库名
MYSQL_USER: smartflow_user # 业务用户
MYSQL_PASSWORD: smartflow_password_456 # 业务用户密码
MYSQL_ROOT_PASSWORD: root_password_123
MYSQL_DATABASE: smartflow
MYSQL_USER: smartflow_user
MYSQL_PASSWORD: smartflow_password_456
ports:
- "3306:3306"
volumes:
- ./docker/mysql/data:/var/lib/mysql # 数据持久化,防止容器删了数据丢失
command: --default-authentication-plugin=mysql_native_password # 确保 GORM 连接兼容性
- ./docker/mysql/data:/var/lib/mysql
command: --default-authentication-plugin=mysql_native_password
# Redis 缓存服务
redis:
image: redis:latest
container_name: redflow-redis
restart: always
command: redis-server --requirepass redis_password_789 # 设置 Redis 访问密码
command: redis-server --requirepass redis_password_789
ports:
- "6379:6379"
volumes:
- ./docker/redis/data:/data
# 定义持久化卷的本地路径
kafka:
image: apache/kafka:3.7.2
container_name: SmartFlow-kafka
restart: always
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
KAFKA_LISTENERS: INTERNAL://:9094,EXTERNAL://:9092,CONTROLLER://:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9094,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_NUM_PARTITIONS: 3
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_DIRS: /var/lib/kafka/data
volumes:
- kafka_data:/var/lib/kafka/data
volumes:
mysql_data:
redis_data:
redis_data:
kafka_data:

View File

@@ -1,4 +1,4 @@
# =========================
# =========================
# MCP server metadata
# =========================
MCP_SERVER_NAME=smartflow-mcp-server
@@ -36,7 +36,7 @@ MYSQL_PARAMS=charset=utf8mb4&parseTime=true&loc=Local
# Example: MYSQL_ALLOWED_DATABASES=smartflow,analytics
MYSQL_ALLOWED_DATABASES=smartflow
# Example: MYSQL_ALLOWED_TABLES=smartflow.users,smartflow.tasks
MYSQL_ALLOWED_TABLES=smartflow.users,smartflow.tasks
MYSQL_ALLOWED_TABLES=
# =========================
# Redis

View File

@@ -54,7 +54,7 @@ cp .env.example .env
- `MYSQL_HOST` / `MYSQL_PORT` / `MYSQL_USER` / `MYSQL_PASSWORD` / `MYSQL_DATABASE`
- `REDIS_ADDR` / `REDIS_PASSWORD` / `REDIS_DB`
- `MYSQL_ALLOWED_DATABASES`:逗号分隔
- `MYSQL_ALLOWED_TABLES`:逗号分隔,支持 `db.table``table`
- `MYSQL_ALLOWED_TABLES`:逗号分隔,支持 `db.table``table`;留空表示允许所有表
- `MCP_ENFORCE_WHITELIST``true` 时无明确表引用会拒绝执行
- `MCP_TOOL_TIMEOUT_MS`:单次工具调用超时
- `MCP_RATE_LIMIT_RPS` + `MCP_RATE_LIMIT_BURST`:基础令牌桶限流
@@ -165,7 +165,7 @@ cp .env.example .env
"MYSQL_PASSWORD": "replace_me",
"MYSQL_DATABASE": "smartflow",
"MYSQL_ALLOWED_DATABASES": "smartflow",
"MYSQL_ALLOWED_TABLES": "smartflow.users,smartflow.tasks",
"MYSQL_ALLOWED_TABLES": "",
"REDIS_ADDR": "127.0.0.1:6379",
"REDIS_DB": "0",
"MCP_TOOL_TIMEOUT_MS": "5000",