diff --git a/AGENTS.md b/AGENTS.md index c1c1abf..2516b78 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -3,3 +3,4 @@ ## 协作偏好(逐条追加) 1. 默认语言规则:所有注释、接口文案、说明、评审反馈均使用中文。 +2. 请勤加注释,尤其是复杂逻辑部分,确保代码易于理解和维护。 diff --git a/backend/cmd/start.go b/backend/cmd/start.go index 8830479..79d5581 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -1,12 +1,14 @@ package cmd import ( + "context" "fmt" "log" "github.com/LoveLosita/smartflow/backend/api" "github.com/LoveLosita/smartflow/backend/dao" "github.com/LoveLosita/smartflow/backend/inits" + kafkabus "github.com/LoveLosita/smartflow/backend/kafka" "github.com/LoveLosita/smartflow/backend/middleware" "github.com/LoveLosita/smartflow/backend/pkg" "github.com/LoveLosita/smartflow/backend/routers" @@ -42,16 +44,15 @@ func Start() { } rdb := inits.InitRedis() - //工具包 + // 工具包 limiter := pkg.NewRateLimiter(rdb) - //初始化eino + // 初始化 eino aiHub, err := inits.InitEino() if err != nil { log.Fatalf("Failed to initialize Eino: %v", err) } - //中间件 - - //dao 层 + // 中间件 + // dao 层 cacheRepo := dao.NewCacheDAO(rdb) agentCacheRepo := dao.NewAgentCache(rdb) _ = db.Use(middleware.NewGormCachePlugin(cacheRepo)) // 注册 GORM 插件 @@ -62,14 +63,29 @@ func Start() { scheduleRepo := dao.NewScheduleDAO(db) manager := dao.NewManager(db) agentRepo := dao.NewAgentDAO(db) - //service 层 + outboxRepo := dao.NewOutboxDAO(db) + // + kafkaCfg := kafkabus.LoadConfig() + asyncPipeline, err := service.NewAgentAsyncPipeline(outboxRepo, kafkaCfg) + if err != nil { + log.Fatalf("Failed to initialize Kafka async pipeline: %v", err) + } + if asyncPipeline != nil { + asyncPipeline.Start(context.Background()) + defer asyncPipeline.Close() + log.Println("Kafka async pipeline started") + } else { + log.Println("Kafka async pipeline is disabled") + } + + // service 层 userService := service.NewUserService(userRepo, cacheRepo) taskSv := service.NewTaskService(taskRepo, cacheRepo) courseService := service.NewCourseService(courseRepo, scheduleRepo) taskClassService := service.NewTaskClassService(taskClassRepo, cacheRepo, scheduleRepo, manager) scheduleService := service.NewScheduleService(scheduleRepo, userRepo, taskClassRepo, manager, cacheRepo) - agentService := service.NewAgentService(aiHub, agentRepo, agentCacheRepo) - //api 层 + agentService := service.NewAgentService(aiHub, agentRepo, agentCacheRepo, asyncPipeline) + // api 层 userApi := api.NewUserHandler(userService) taskApi := api.NewTaskHandler(taskSv) courseApi := api.NewCourseHandler(courseService) diff --git a/backend/config.example.yaml b/backend/config.example.yaml index a9fb485..3b99e15 100644 --- a/backend/config.example.yaml +++ b/backend/config.example.yaml @@ -33,6 +33,16 @@ redis: password: "" db: 0 +kafka: + enabled: true + brokers: + - "localhost:9092" + topic: "smartflow.agent.outbox" + groupID: "smartflow-agent-outbox-consumer" + retryScanInterval: 1s + retryBatchSize: 100 + maxRetry: 20 + time: zone: "Asia/Shanghai" semesterStartDate: "2026-03-02" #学期开始日期,一定要设定为周一,以便于计算周数 @@ -41,4 +51,4 @@ time: agent: workerModel: "doubao-seed-1-6-lite-251015" # 智能体使用的Worker模型,需根据实际情况调整 strategistModel: "deepseek-v3-2-251201" # 策略师使用的Worker模型,需根据实际情况调整 - baseURL: "https://ark.cn-beijing.volces.com/api/v3" # Worker服务的基础URL,需根据实际情况调整 \ No newline at end of file + baseURL: "https://ark.cn-beijing.volces.com/api/v3" # Worker服务的基础URL,需根据实际情况调整 diff --git a/backend/dao/outbox.go b/backend/dao/outbox.go new file mode 100644 index 0000000..433c781 --- /dev/null +++ b/backend/dao/outbox.go @@ -0,0 +1,190 @@ +package dao + +import ( + "context" + "encoding/json" + "errors" + "time" + + "github.com/LoveLosita/smartflow/backend/model" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type OutboxDAO struct { + db *gorm.DB +} + +func NewOutboxDAO(db *gorm.DB) *OutboxDAO { + return &OutboxDAO{db: db} +} + +func (d *OutboxDAO) CreateChatHistoryMessage(ctx context.Context, topic, messageKey string, payload model.ChatHistoryPersistPayload, maxRetry int) (int64, error) { + if maxRetry <= 0 { + maxRetry = 20 + } + raw, err := json.Marshal(payload) + if err != nil { + return 0, err + } + now := time.Now() + msg := model.AgentOutboxMessage{ + BizType: model.OutboxBizTypeChatHistoryPersist, + Topic: topic, + MessageKey: messageKey, + Payload: string(raw), + Status: model.OutboxStatusPending, + RetryCount: 0, + MaxRetry: maxRetry, + NextRetryAt: &now, + } + if err = d.db.WithContext(ctx).Create(&msg).Error; err != nil { + return 0, err + } + return msg.ID, nil +} + +func (d *OutboxDAO) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMessage, error) { + var msg model.AgentOutboxMessage + if err := d.db.WithContext(ctx).Where("id = ?", id).First(&msg).Error; err != nil { + return nil, err + } + return &msg, nil +} + +func (d *OutboxDAO) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) { + if limit <= 0 { + limit = 100 + } + now := time.Now() + var messages []model.AgentOutboxMessage + err := d.db.WithContext(ctx). + Where("status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?", model.OutboxStatusPending, now). + Order("next_retry_at ASC, id ASC"). + Limit(limit). + Find(&messages).Error + if err != nil { + return nil, err + } + return messages, nil +} + +// MarkPublished 仅在消息未进入最终态时更新为 published,避免覆盖 consumed/dead。 +func (d *OutboxDAO) MarkPublished(ctx context.Context, id int64) error { + now := time.Now() + updates := map[string]interface{}{ + "status": model.OutboxStatusPublished, + "published_at": &now, + "last_error": nil, + "next_retry_at": nil, + } + result := d.db.WithContext(ctx). + Model(&model.AgentOutboxMessage{}). + Where("id = ? AND status NOT IN (?, ?)", id, model.OutboxStatusConsumed, model.OutboxStatusDead). + Updates(updates) + return result.Error +} + +func (d *OutboxDAO) MarkDead(ctx context.Context, id int64, reason string) error { + now := time.Now() + lastErr := truncateError(reason) + updates := map[string]interface{}{ + "status": model.OutboxStatusDead, + "last_error": &lastErr, + "next_retry_at": nil, + "updated_at": now, + } + return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error +} + +func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason string) error { + return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var msg model.AgentOutboxMessage + err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", id).First(&msg).Error + if err != nil { + return err + } + if msg.Status == model.OutboxStatusConsumed || msg.Status == model.OutboxStatusDead { + return nil + } + + nextRetryCount := msg.RetryCount + 1 + now := time.Now() + status := model.OutboxStatusPending + var nextRetryAt *time.Time + if nextRetryCount >= msg.MaxRetry { + status = model.OutboxStatusDead + nextRetryAt = nil + } else { + t := now.Add(calcRetryBackoff(nextRetryCount)) + nextRetryAt = &t + } + lastErr := truncateError(reason) + updates := map[string]interface{}{ + "status": status, + "retry_count": nextRetryCount, + "last_error": &lastErr, + "next_retry_at": nextRetryAt, + "updated_at": now, + } + return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error + }) +} + +func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outboxID int64, payload model.ChatHistoryPersistPayload) error { + return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + var outboxMsg model.AgentOutboxMessage + err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("id = ?", outboxID).First(&outboxMsg).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + return err + } + if outboxMsg.Status == model.OutboxStatusConsumed { + return nil + } + if outboxMsg.Status == model.OutboxStatusDead { + return nil + } + + chatMsg := payload.Message + chatRole := payload.Role + history := model.ChatHistory{ + UserID: payload.UserID, + ChatID: payload.ConversationID, + MessageContent: &chatMsg, + Role: &chatRole, + } + if err = tx.Create(&history).Error; err != nil { + return err + } + + now := time.Now() + updates := map[string]interface{}{ + "status": model.OutboxStatusConsumed, + "consumed_at": &now, + "last_error": nil, + "next_retry_at": nil, + "updated_at": now, + } + return tx.Model(&model.AgentOutboxMessage{}).Where("id = ?", outboxID).Updates(updates).Error + }) +} + +func calcRetryBackoff(retryCount int) time.Duration { + if retryCount <= 0 { + return time.Second + } + if retryCount > 6 { + retryCount = 6 + } + return time.Second * time.Duration(1<<(retryCount-1)) +} + +func truncateError(reason string) string { + if len(reason) <= 2000 { + return reason + } + return reason[:2000] +} diff --git a/backend/go.mod b/backend/go.mod index 4c32e52..ac768f2 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -45,6 +45,7 @@ require ( github.com/jinzhu/now v1.1.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.15.9 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -54,10 +55,12 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nikolalohinski/gonja v1.5.3 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/quic-go v0.54.0 // indirect github.com/sagikazarmark/locafero v0.11.0 // indirect + github.com/segmentio/kafka-go v0.4.47 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/slongfield/pyfmt v0.0.0-20220222012616-ea85ff4c361f // indirect github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect diff --git a/backend/go.sum b/backend/go.sum index c81a6a9..823db2b 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -127,6 +127,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -168,6 +170,8 @@ github.com/onsi/gomega v1.27.3 h1:5VwIwnBY3vbBDOJrNtA4rVdiTZCsq9B5F12pvy1Drmk= github.com/onsi/gomega v1.27.3/go.mod h1:5vG284IBtfDAmDyrK+eGyZmUgUlmi+Wngqo557cZ6Gw= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -183,6 +187,8 @@ github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/f github.com/rollbar/rollbar-go v1.0.2/go.mod h1:AcFs5f0I+c71bpHlXNNDbOWJiKwjFDtISeXco0L5PKQ= github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc= github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= @@ -229,8 +235,12 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yargevad/filepathx v1.0.0 h1:SYcT+N3tYGi+NvazubCNlvgIPbzAk7i7y2dwg3I5FYc= github.com/yargevad/filepathx v1.0.0/go.mod h1:BprfX/gpYNJHJfc35GjRRpVcwWXS89gGulUIU5tK3tA= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= @@ -239,6 +249,8 @@ golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c= golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -247,6 +259,8 @@ golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N0 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.26.0 h1:EGMPT//Ezu+ylkCijjPc+f4Aih7sZvaAr+O3EHBxvZg= golang.org/x/mod v0.26.0/go.mod h1:/j6NAhSk8iQ723BGAUyoAcn7SlD7s15Dp9Nd/SfeaFQ= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -254,33 +268,64 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg= golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.35.0 h1:mBffYraMEf7aa0sB+NuKnuCy8qI/9Bughn8dC2Gu5r0= golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/backend/kafka/config.go b/backend/kafka/config.go new file mode 100644 index 0000000..5bc1b77 --- /dev/null +++ b/backend/kafka/config.go @@ -0,0 +1,58 @@ +package kafka + +import ( + "strings" + "time" + + "github.com/spf13/viper" +) + +const ( + DefaultTopic = "smartflow.agent.outbox" + DefaultGroup = "smartflow-agent-outbox-consumer" +) + +type Config struct { + Enabled bool + Brokers []string + Topic string + GroupID string + RetryScanInterval time.Duration + RetryBatchSize int + MaxRetry int +} + +func LoadConfig() Config { + brokers := viper.GetStringSlice("kafka.brokers") + if len(brokers) == 0 { + single := strings.TrimSpace(viper.GetString("kafka.broker")) + if single != "" { + brokers = []string{single} + } + } + cfg := Config{ + Enabled: viper.GetBool("kafka.enabled"), + Brokers: brokers, + Topic: strings.TrimSpace(viper.GetString("kafka.topic")), + GroupID: strings.TrimSpace(viper.GetString("kafka.groupID")), + RetryScanInterval: viper.GetDuration("kafka.retryScanInterval"), + RetryBatchSize: viper.GetInt("kafka.retryBatchSize"), + MaxRetry: viper.GetInt("kafka.maxRetry"), + } + if cfg.Topic == "" { + cfg.Topic = DefaultTopic + } + if cfg.GroupID == "" { + cfg.GroupID = DefaultGroup + } + if cfg.RetryScanInterval <= 0 { + cfg.RetryScanInterval = time.Second + } + if cfg.RetryBatchSize <= 0 { + cfg.RetryBatchSize = 100 + } + if cfg.MaxRetry <= 0 { + cfg.MaxRetry = 20 + } + return cfg +} diff --git a/backend/kafka/consumer.go b/backend/kafka/consumer.go new file mode 100644 index 0000000..1211398 --- /dev/null +++ b/backend/kafka/consumer.go @@ -0,0 +1,50 @@ +package kafka + +import ( + "context" + "errors" + + segmentkafka "github.com/segmentio/kafka-go" +) + +type Consumer struct { + reader *segmentkafka.Reader +} + +func NewConsumer(cfg Config) (*Consumer, error) { + if len(cfg.Brokers) == 0 { + return nil, errors.New("kafka brokers 未配置") + } + reader := segmentkafka.NewReader(segmentkafka.ReaderConfig{ + Brokers: cfg.Brokers, + Topic: cfg.Topic, + GroupID: cfg.GroupID, + MinBytes: 1, + MaxBytes: 10e6, + CommitInterval: 0, + StartOffset: segmentkafka.FirstOffset, + }) + return &Consumer{reader: reader}, nil +} + +// Dequeue 从 Kafka 拉取一条消息(手动提交 offset)。 +func (c *Consumer) Dequeue(ctx context.Context) (segmentkafka.Message, error) { + if c == nil || c.reader == nil { + return segmentkafka.Message{}, errors.New("kafka consumer 未初始化") + } + return c.reader.FetchMessage(ctx) +} + +func (c *Consumer) Commit(ctx context.Context, msg segmentkafka.Message) error { + if c == nil || c.reader == nil { + return errors.New("kafka consumer 未初始化") + } + return c.reader.CommitMessages(ctx, msg) +} + +func (c *Consumer) Close() error { + if c == nil || c.reader == nil { + return nil + } + return c.reader.Close() +} diff --git a/backend/kafka/envelope.go b/backend/kafka/envelope.go new file mode 100644 index 0000000..b7ef17c --- /dev/null +++ b/backend/kafka/envelope.go @@ -0,0 +1,10 @@ +package kafka + +import "encoding/json" + +// Envelope 是投递到 Kafka 的统一包裹结构。 +type Envelope struct { + OutboxID int64 `json:"outbox_id"` + BizType string `json:"biz_type"` + Payload json.RawMessage `json:"payload"` +} diff --git a/backend/kafka/producer.go b/backend/kafka/producer.go new file mode 100644 index 0000000..c587dc8 --- /dev/null +++ b/backend/kafka/producer.go @@ -0,0 +1,45 @@ +package kafka + +import ( + "context" + "errors" + + segmentkafka "github.com/segmentio/kafka-go" +) + +type Producer struct { + writer *segmentkafka.Writer +} + +func NewProducer(cfg Config) (*Producer, error) { + if len(cfg.Brokers) == 0 { + return nil, errors.New("kafka brokers 未配置") + } + writer := &segmentkafka.Writer{ + Addr: segmentkafka.TCP(cfg.Brokers...), + Balancer: &segmentkafka.Hash{}, + RequiredAcks: segmentkafka.RequireOne, + Async: false, + } + return &Producer{writer: writer}, nil +} + +// Enqueue 将消息写入 Kafka。 +func (p *Producer) Enqueue(ctx context.Context, topic, key string, value []byte) error { + if p == nil || p.writer == nil { + return errors.New("kafka producer 未初始化") + } + msg := segmentkafka.Message{ + Topic: topic, + Key: []byte(key), + Value: value, + } + return p.writer.WriteMessages(ctx, msg) +} + +func (p *Producer) Close() error { + if p == nil || p.writer == nil { + return nil + } + return p.writer.Close() +} diff --git a/backend/middleware/cache_deleter.go b/backend/middleware/cache_deleter.go index 1dd67dc..5296a4f 100644 --- a/backend/middleware/cache_deleter.go +++ b/backend/middleware/cache_deleter.go @@ -62,6 +62,8 @@ func (p *GormCachePlugin) dispatchCacheLogic(modelObj interface{}, db *gorm.DB) p.invalidTaskClassCache(*m.UserID) case model.Task: p.invalidTaskCache(m.UserID) + case model.AgentOutboxMessage, model.ChatHistory, model.AgentChat: + // 这些模型目前没有定义缓存逻辑,先不处理 default: // 只有真正没定义的模型才会到这里 log.Printf("[GORM-Cache] No logic defined for model: %T", modelObj) diff --git a/backend/model/outbox.go b/backend/model/outbox.go new file mode 100644 index 0000000..9796598 --- /dev/null +++ b/backend/model/outbox.go @@ -0,0 +1,42 @@ +package model + +import "time" + +const ( + OutboxStatusPending = "pending" + OutboxStatusPublished = "published" + OutboxStatusConsumed = "consumed" + OutboxStatusDead = "dead" + + OutboxBizTypeChatHistoryPersist = "chat_history_persist" +) + +// AgentOutboxMessage 保存需要异步投递到 Kafka 的消息。 +type AgentOutboxMessage struct { + ID int64 `gorm:"column:id;primaryKey;autoIncrement"` + BizType string `gorm:"column:biz_type;type:varchar(64);not null;index:idx_outbox_status_next,priority:3;comment:业务类型"` + Topic string `gorm:"column:topic;type:varchar(128);not null;comment:Kafka Topic"` + MessageKey string `gorm:"column:message_key;type:varchar(128);not null;comment:Kafka 消息键"` + Payload string `gorm:"column:payload;type:longtext;not null;comment:业务载荷(JSON)"` + Status string `gorm:"column:status;type:varchar(32);not null;index:idx_outbox_status_next,priority:1;comment:pending/published/consumed/dead"` + RetryCount int `gorm:"column:retry_count;not null;default:0;comment:已重试次数"` + MaxRetry int `gorm:"column:max_retry;not null;default:20;comment:最大重试次数"` + NextRetryAt *time.Time `gorm:"column:next_retry_at;index:idx_outbox_status_next,priority:2;comment:下次重试时间"` + LastError *string `gorm:"column:last_error;type:text;comment:最后一次错误"` + PublishedAt *time.Time `gorm:"column:published_at;comment:投递到 Kafka 时间"` + ConsumedAt *time.Time `gorm:"column:consumed_at;comment:消费完成时间"` + CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime"` + UpdatedAt *time.Time `gorm:"column:updated_at;autoUpdateTime"` +} + +func (AgentOutboxMessage) TableName() string { + return "agent_outbox_messages" +} + +// ChatHistoryPersistPayload 是“聊天记录持久化”消息体。 +type ChatHistoryPersistPayload struct { + UserID int `json:"user_id"` + ConversationID string `json:"conversation_id"` + Role string `json:"role"` + Message string `json:"message"` +} diff --git a/backend/service/agent.go b/backend/service/agent.go index 78b6317..6cd4dfb 100644 --- a/backend/service/agent.go +++ b/backend/service/agent.go @@ -9,6 +9,7 @@ import ( "github.com/LoveLosita/smartflow/backend/conv" "github.com/LoveLosita/smartflow/backend/dao" "github.com/LoveLosita/smartflow/backend/inits" + "github.com/LoveLosita/smartflow/backend/model" "github.com/LoveLosita/smartflow/backend/pkg" "github.com/cloudwego/eino-ext/components/model/ark" "github.com/cloudwego/eino/schema" @@ -16,16 +17,18 @@ import ( ) type AgentService struct { - AIHub *inits.AIHub - repo *dao.AgentDAO - agentCache *dao.AgentCache + AIHub *inits.AIHub + repo *dao.AgentDAO + agentCache *dao.AgentCache + asyncPipeline *AgentAsyncPipeline } -func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, agentRedis *dao.AgentCache) *AgentService { +func NewAgentService(aiHub *inits.AIHub, repo *dao.AgentDAO, agentRedis *dao.AgentCache, asyncPipeline *AgentAsyncPipeline) *AgentService { return &AgentService{ - AIHub: aiHub, - repo: repo, - agentCache: agentRedis, + AIHub: aiHub, + repo: repo, + agentCache: agentRedis, + asyncPipeline: asyncPipeline, } } @@ -38,19 +41,34 @@ func normalizeConversationID(chatID string) string { } func (s *AgentService) pickChatModel(requestModel string) (*ark.ChatModel, string) { - model := strings.TrimSpace(requestModel) - if strings.EqualFold(model, "strategist") { + modelName := strings.TrimSpace(requestModel) + if strings.EqualFold(modelName, "strategist") { return s.AIHub.Strategist, "strategist" } return s.AIHub.Worker, "worker" } +func (s *AgentService) saveChatHistoryReliable(ctx context.Context, payload model.ChatHistoryPersistPayload) error { + if s.asyncPipeline == nil { + return s.repo.SaveChatHistory(ctx, payload.UserID, payload.ConversationID, payload.Role, payload.Message) + } + return s.asyncPipeline.EnqueueChatHistoryPersist(ctx, payload) +} + +func pushErrNonBlocking(errChan chan error, err error) { + select { + case errChan <- err: + default: + log.Printf("error channel is full, drop error: %v", err) + } +} + func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string) (<-chan string, <-chan error) { // 1) 准备输出通道 outChan := make(chan string, 5) errChan := make(chan error, 1) - // 2) 规范化会话并选择模型 + // 2) 规范会话并选择模型 chatID = normalizeConversationID(chatID) selectedModel, resolvedModelName := s.pickChatModel(modelName) @@ -63,9 +81,9 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin return outChan, errChan } if !result { - innerResult, err := s.repo.IfChatExists(ctx, userID, chatID) - if err != nil { - errChan <- err + innerResult, ifErr := s.repo.IfChatExists(ctx, userID, chatID) + if ifErr != nil { + errChan <- ifErr close(outChan) close(errChan) return outChan, errChan @@ -95,9 +113,9 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin cacheMiss := false if chatHistory == nil { cacheMiss = true - histories, err := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel(resolvedModelName), chatID) - if err != nil { - errChan <- err + histories, hisErr := s.repo.GetUserChatHistories(ctx, userID, pkg.HistoryFetchLimitByModel(resolvedModelName), chatID) + if hisErr != nil { + errChan <- hisErr close(outChan) close(errChan) return outChan, errChan @@ -112,10 +130,10 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin // 6) 根据最新裁剪结果动态调整 Redis 会话窗口 targetWindow := pkg.CalcSessionWindowSize(len(chatHistory)) - if err := s.agentCache.SetSessionWindowSize(ctx, chatID, targetWindow); err != nil { + if err = s.agentCache.SetSessionWindowSize(ctx, chatID, targetWindow); err != nil { log.Printf("failed to set history window for %s: %v", chatID, err) } - if err := s.agentCache.EnforceHistoryWindow(ctx, chatID); err != nil { + if err = s.agentCache.EnforceHistoryWindow(ctx, chatID); err != nil { log.Printf("failed to enforce history window for %s: %v", chatID, err) } @@ -126,7 +144,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin // 缓存未命中时,把“裁剪后的历史”回填进缓存 if cacheMiss { - if err := s.agentCache.BackfillHistory(ctx, chatID, chatHistory); err != nil { + if err = s.agentCache.BackfillHistory(ctx, chatID, chatHistory); err != nil { errChan <- err close(outChan) close(errChan) @@ -134,37 +152,44 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin } } - // 7) 异步落用户消息(先写缓存再写库) - go func() { - bg := context.Background() - _ = s.agentCache.PushMessage(bg, chatID, &schema.Message{ - Role: schema.User, - Content: userMessage, - }) - _ = s.repo.SaveChatHistory(bg, userID, chatID, "user", userMessage) - }() + // 7) 先同步写 Redis,再把持久化请求交给 outbox + Kafka + if err = s.agentCache.PushMessage(ctx, chatID, &schema.Message{Role: schema.User, Content: userMessage}); err != nil { + log.Printf("failed to push user message into redis history: %v", err) + } + if err = s.saveChatHistoryReliable(ctx, model.ChatHistoryPersistPayload{ + UserID: userID, + ConversationID: chatID, + Role: "user", + Message: userMessage, + }); err != nil { + errChan <- err + close(outChan) + close(errChan) + return outChan, errChan + } // 8) 启动流式聊天 go func() { defer close(outChan) - fullText, err := agent.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan) - if err != nil { - errChan <- err + fullText, streamErr := agent.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan) + if streamErr != nil { + pushErrNonBlocking(errChan, streamErr) return } - // 9) 异步落助手回复 - go func() { - bg := context.Background() - _ = s.agentCache.PushMessage(bg, chatID, &schema.Message{ - Role: schema.Assistant, - Content: fullText, - }) - if saveErr := s.repo.SaveChatHistory(bg, userID, chatID, "assistant", fullText); saveErr != nil { - log.Printf("failed to save chat history to database: %v", saveErr) - } - }() + // 9) 回答完成后,同步写 Redis,并把数据库落库交给 outbox + Kafka + if cacheErr := s.agentCache.PushMessage(context.Background(), chatID, &schema.Message{Role: schema.Assistant, Content: fullText}); cacheErr != nil { + log.Printf("failed to push assistant message into redis history: %v", cacheErr) + } + if saveErr := s.saveChatHistoryReliable(context.Background(), model.ChatHistoryPersistPayload{ + UserID: userID, + ConversationID: chatID, + Role: "assistant", + Message: fullText, + }); saveErr != nil { + pushErrNonBlocking(errChan, saveErr) + } }() return outChan, errChan diff --git a/backend/service/agent_async_pipeline.go b/backend/service/agent_async_pipeline.go new file mode 100644 index 0000000..8da6e5a --- /dev/null +++ b/backend/service/agent_async_pipeline.go @@ -0,0 +1,214 @@ +package service + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "time" + + "github.com/LoveLosita/smartflow/backend/dao" + kafkabus "github.com/LoveLosita/smartflow/backend/kafka" + "github.com/LoveLosita/smartflow/backend/model" + segmentkafka "github.com/segmentio/kafka-go" + "gorm.io/gorm" +) + +// AgentAsyncPipeline 负责 outbox 扫描、Kafka 投递与消费落库。 +type AgentAsyncPipeline struct { + outboxRepo *dao.OutboxDAO + producer *kafkabus.Producer + consumer *kafkabus.Consumer + topic string + maxRetry int + scanEvery time.Duration + scanBatch int +} + +func NewAgentAsyncPipeline(outboxRepo *dao.OutboxDAO, cfg kafkabus.Config) (*AgentAsyncPipeline, error) { + if !cfg.Enabled { + return nil, nil + } + producer, err := kafkabus.NewProducer(cfg) + if err != nil { + return nil, err + } + consumer, err := kafkabus.NewConsumer(cfg) + if err != nil { + _ = producer.Close() + return nil, err + } + return &AgentAsyncPipeline{ + outboxRepo: outboxRepo, + producer: producer, + consumer: consumer, + topic: cfg.Topic, + maxRetry: cfg.MaxRetry, + scanEvery: cfg.RetryScanInterval, + scanBatch: cfg.RetryBatchSize, + }, nil +} + +func (p *AgentAsyncPipeline) Start(ctx context.Context) { + if p == nil { + return + } + go p.startDispatchLoop(ctx) + go p.startConsumeLoop(ctx) +} + +func (p *AgentAsyncPipeline) Close() { + if p == nil { + return + } + if err := p.producer.Close(); err != nil { + log.Printf("关闭 Kafka producer 失败: %v", err) + } + if err := p.consumer.Close(); err != nil { + log.Printf("关闭 Kafka consumer 失败: %v", err) + } +} + +func (p *AgentAsyncPipeline) EnqueueChatHistoryPersist(ctx context.Context, payload model.ChatHistoryPersistPayload) error { + if p == nil { + return errors.New("Kafka 异步链路未初始化") + } + outboxID, err := p.outboxRepo.CreateChatHistoryMessage(ctx, p.topic, payload.ConversationID, payload, p.maxRetry) + if err != nil { + return err + } + if err = p.dispatchOne(context.Background(), outboxID); err != nil { + log.Printf("outbox 消息 %d 首次投递失败,等待扫描重试: %v", outboxID, err) + } + return nil +} + +func (p *AgentAsyncPipeline) startDispatchLoop(ctx context.Context) { + ticker := time.NewTicker(p.scanEvery) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + pendingMessages, err := p.outboxRepo.ListDueMessages(ctx, p.scanBatch) + if err != nil { + log.Printf("扫描 outbox 失败: %v", err) + continue + } + for _, msg := range pendingMessages { + if err = p.dispatchOne(ctx, msg.ID); err != nil { + log.Printf("重试投递 outbox 消息失败(id=%d): %v", msg.ID, err) + } + } + } + } +} + +func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) error { + outboxMsg, err := p.outboxRepo.GetByID(ctx, outboxID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + return err + } + if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead { + return nil + } + + envelope := kafkabus.Envelope{ + OutboxID: outboxMsg.ID, + BizType: outboxMsg.BizType, + Payload: json.RawMessage(outboxMsg.Payload), + } + raw, err := json.Marshal(envelope) + if err != nil { + markErr := p.outboxRepo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包裹失败: "+err.Error()) + if markErr != nil { + log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr) + } + return err + } + + if err = p.producer.Enqueue(ctx, outboxMsg.Topic, outboxMsg.MessageKey, raw); err != nil { + _ = p.outboxRepo.MarkFailedForRetry(ctx, outboxMsg.ID, "投递 Kafka 失败: "+err.Error()) + return err + } + if err = p.outboxRepo.MarkPublished(ctx, outboxMsg.ID); err != nil { + _ = p.outboxRepo.MarkFailedForRetry(ctx, outboxMsg.ID, "更新已投递状态失败: "+err.Error()) + return err + } + return nil +} + +func (p *AgentAsyncPipeline) startConsumeLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + } + + msg, err := p.consumer.Dequeue(ctx) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + log.Printf("Kafka 消费拉取失败: %v", err) + time.Sleep(300 * time.Millisecond) + continue + } + if err = p.handleMessage(ctx, msg); err != nil { + log.Printf("处理 Kafka 消息失败(topic=%s, partition=%d, offset=%d): %v", msg.Topic, msg.Partition, msg.Offset, err) + } + } +} + +func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka.Message) error { + var envelope kafkabus.Envelope + if err := json.Unmarshal(msg.Value, &envelope); err != nil { + _ = p.consumer.Commit(ctx, msg) + return fmt.Errorf("解析 Kafka 包裹失败: %w", err) + } + if envelope.OutboxID <= 0 { + _ = p.consumer.Commit(ctx, msg) + return errors.New("Kafka 包裹缺少 outbox_id") + } + + switch envelope.BizType { + case model.OutboxBizTypeChatHistoryPersist: + return p.consumeChatHistory(ctx, msg, envelope) + default: + _ = p.outboxRepo.MarkDead(ctx, envelope.OutboxID, "未知业务类型: "+envelope.BizType) + if err := p.consumer.Commit(ctx, msg); err != nil { + return err + } + return nil + } +} + +func (p *AgentAsyncPipeline) consumeChatHistory(ctx context.Context, msg segmentkafka.Message, envelope kafkabus.Envelope) error { + var payload model.ChatHistoryPersistPayload + if err := json.Unmarshal(envelope.Payload, &payload); err != nil { + _ = p.outboxRepo.MarkDead(ctx, envelope.OutboxID, "解析聊天持久化载荷失败: "+err.Error()) + if commitErr := p.consumer.Commit(ctx, msg); commitErr != nil { + return commitErr + } + return nil + } + + if err := p.outboxRepo.PersistChatHistoryAndMarkConsumed(ctx, envelope.OutboxID, payload); err != nil { + if markErr := p.outboxRepo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费并落库失败: "+err.Error()); markErr != nil { + return markErr + } + if commitErr := p.consumer.Commit(ctx, msg); commitErr != nil { + return commitErr + } + return err + } + + return p.consumer.Commit(ctx, msg) +} diff --git a/docker-compose.yml b/docker-compose.yml index 308ab71..38446b5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,32 +1,55 @@ services: - # MySQL 数据库服务 mysql: image: mysql:8.0 container_name: SmartFlow-mysql restart: always environment: - MYSQL_ROOT_PASSWORD: root_password_123 # Root 用户密码 - MYSQL_DATABASE: smartflow # 初始创建的数据库名 - MYSQL_USER: smartflow_user # 业务用户 - MYSQL_PASSWORD: smartflow_password_456 # 业务用户密码 + MYSQL_ROOT_PASSWORD: root_password_123 + MYSQL_DATABASE: smartflow + MYSQL_USER: smartflow_user + MYSQL_PASSWORD: smartflow_password_456 ports: - "3306:3306" volumes: - - ./docker/mysql/data:/var/lib/mysql # 数据持久化,防止容器删了数据丢失 - command: --default-authentication-plugin=mysql_native_password # 确保 GORM 连接兼容性 + - ./docker/mysql/data:/var/lib/mysql + command: --default-authentication-plugin=mysql_native_password - # Redis 缓存服务 redis: image: redis:latest container_name: redflow-redis restart: always - command: redis-server --requirepass redis_password_789 # 设置 Redis 访问密码 + command: redis-server --requirepass redis_password_789 ports: - "6379:6379" volumes: - ./docker/redis/data:/data -# 定义持久化卷的本地路径 + kafka: + image: apache/kafka:3.7.2 + container_name: SmartFlow-kafka + restart: always + ports: + - "9092:9092" + environment: + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk + KAFKA_LISTENERS: INTERNAL://:9094,EXTERNAL://:9092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9094,EXTERNAL://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_NUM_PARTITIONS: 3 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_LOG_DIRS: /var/lib/kafka/data + volumes: + - kafka_data:/var/lib/kafka/data + volumes: mysql_data: - redis_data: \ No newline at end of file + redis_data: + kafka_data: \ No newline at end of file diff --git a/infra/smartflow-mcp-server/.env.example b/infra/smartflow-mcp-server/.env.example index 7cbc8d9..79dc0da 100644 --- a/infra/smartflow-mcp-server/.env.example +++ b/infra/smartflow-mcp-server/.env.example @@ -1,4 +1,4 @@ -# ========================= +# ========================= # MCP server metadata # ========================= MCP_SERVER_NAME=smartflow-mcp-server @@ -36,7 +36,7 @@ MYSQL_PARAMS=charset=utf8mb4&parseTime=true&loc=Local # Example: MYSQL_ALLOWED_DATABASES=smartflow,analytics MYSQL_ALLOWED_DATABASES=smartflow # Example: MYSQL_ALLOWED_TABLES=smartflow.users,smartflow.tasks -MYSQL_ALLOWED_TABLES=smartflow.users,smartflow.tasks +MYSQL_ALLOWED_TABLES= # ========================= # Redis diff --git a/infra/smartflow-mcp-server/README.md b/infra/smartflow-mcp-server/README.md index 3fb6a64..79fce37 100644 --- a/infra/smartflow-mcp-server/README.md +++ b/infra/smartflow-mcp-server/README.md @@ -54,7 +54,7 @@ cp .env.example .env - `MYSQL_HOST` / `MYSQL_PORT` / `MYSQL_USER` / `MYSQL_PASSWORD` / `MYSQL_DATABASE` - `REDIS_ADDR` / `REDIS_PASSWORD` / `REDIS_DB` - `MYSQL_ALLOWED_DATABASES`:逗号分隔 -- `MYSQL_ALLOWED_TABLES`:逗号分隔,支持 `db.table` 或 `table` +- `MYSQL_ALLOWED_TABLES`:逗号分隔,支持 `db.table` 或 `table`;留空表示允许所有表 - `MCP_ENFORCE_WHITELIST`:`true` 时无明确表引用会拒绝执行 - `MCP_TOOL_TIMEOUT_MS`:单次工具调用超时 - `MCP_RATE_LIMIT_RPS` + `MCP_RATE_LIMIT_BURST`:基础令牌桶限流 @@ -165,7 +165,7 @@ cp .env.example .env "MYSQL_PASSWORD": "replace_me", "MYSQL_DATABASE": "smartflow", "MYSQL_ALLOWED_DATABASES": "smartflow", - "MYSQL_ALLOWED_TABLES": "smartflow.users,smartflow.tasks", + "MYSQL_ALLOWED_TABLES": "", "REDIS_ADDR": "127.0.0.1:6379", "REDIS_DB": "0", "MCP_TOOL_TIMEOUT_MS": "5000",