From 959049db425deac5ca790315ceef42fcc8832d99 Mon Sep 17 00:00:00 2001 From: Losita <2810873701@qq.com> Date: Mon, 9 Mar 2026 23:25:25 +0800 Subject: [PATCH] =?UTF-8?q?Version:=200.4.9.dev.260309=20feat:=20?= =?UTF-8?q?=F0=9F=97=84=EF=B8=8F=20=E6=96=B0=E5=A2=9E=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E5=BB=BA=E8=A1=A8=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 新增项目启动时自动建表能力,减少手动初始化数据库步骤 * 解决 `agent_chat` 与 `chat_history` 结构体互相持有对方结构体用于 `preload` 导致的循环依赖问题 * 修复因结构体互相依赖引发的建表失败问题,保证数据库初始化流程稳定 feat: 🐳 Docker Compose 引入 Kafka 分区自动初始化 * 更新 `docker-compose` 配置,引入 Kafka partition 自动初始化脚本 * 保证服务启动后 Topic 即具备可用 partition,实现开箱即用 * 修复转移环境后 MySQL 等容器数据无法持久化的问题,统一改为使用命名卷进行数据持久化 docs: 📚 补充 Outbox + Kafka 持久化链路注释 * 为 Outbox + Kafka 消息持久化链路补充详细代码注释 * 提升异步消息链路的可读性与维护性 * 当前代码 Review 进度约 50% undo: ⚠️ Kafka 初始化阶段出现消息短暂堆积 * 初次初始化项目时观察到消息在 Kafka 中短暂堆积现象 * 后续被消费者一次性消费且未再次复现 * 已在生产者启动、消费者启动以及消息消费流程中增加控制台日志输出,降低系统黑箱程度 * 后续若条件允许将进一步排查该现象的触发原因 --- .gitignore | 3 +- backend/cmd/start.go | 31 +++++----- backend/dao/outbox.go | 28 ++++++++- backend/go.mod | 21 +++---- backend/go.sum | 32 +++++----- backend/inits/mysql.go | 40 ++++++++++--- backend/kafka/admin.go | 78 +++++++++++++++++++++++++ backend/kafka/config.go | 13 +++-- backend/kafka/consumer.go | 16 +++-- backend/kafka/envelope.go | 6 +- backend/kafka/producer.go | 9 ++- backend/middleware/cache_deleter.go | 2 +- backend/model/agent.go | 46 +++++++-------- backend/model/outbox.go | 37 ++++++++---- backend/service/agent.go | 23 ++++---- backend/service/agent_async_pipeline.go | 52 +++++++++++++++-- docker-compose.yml | 59 ++++++++++++++----- 17 files changed, 363 insertions(+), 133 deletions(-) create mode 100644 backend/kafka/admin.go diff --git a/.gitignore b/.gitignore index fee03ce..72b6cbe 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,5 @@ backend/config.yaml # 5. IDE 与系统文件 .idea/ .vscode/ -.DS_Store # Mac 用户必加 \ No newline at end of file +.DS_Store # Mac 用户必加 +.gocache/ \ No newline at end of file diff --git a/backend/cmd/start.go b/backend/cmd/start.go index 79d5581..534c893 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -16,14 +16,11 @@ import ( "github.com/spf13/viper" ) -// loadConfig 加载配置 -// 从配置文件中读取配置信息 +// loadConfig 加载应用配置。 func loadConfig() error { - // 设置配置文件路径 viper.SetConfigName("config") viper.SetConfigType("yaml") viper.AddConfigPath(".") - // 读取配置文件 if err := viper.ReadInConfig(); err != nil { return fmt.Errorf("failed to read config file: %w", err) } @@ -31,31 +28,29 @@ func loadConfig() error { return nil } -// Start 启动函数 +// Start 是应用启动入口。 func Start() { - // 加载配置 if err := loadConfig(); err != nil { log.Fatalf("Failed to load config: %v", err) } - // 初始化数据库 + db, err := inits.ConnectDB() if err != nil { log.Fatalf("Failed to connect to database: %v", err) } rdb := inits.InitRedis() - // 工具包 limiter := pkg.NewRateLimiter(rdb) - // 初始化 eino + aiHub, err := inits.InitEino() if err != nil { log.Fatalf("Failed to initialize Eino: %v", err) } - // 中间件 - // dao 层 + + // DAO 层初始化。 cacheRepo := dao.NewCacheDAO(rdb) agentCacheRepo := dao.NewAgentCache(rdb) - _ = db.Use(middleware.NewGormCachePlugin(cacheRepo)) // 注册 GORM 插件 + _ = db.Use(middleware.NewGormCachePlugin(cacheRepo)) userRepo := dao.NewUserDAO(db) taskRepo := dao.NewTaskDAO(db) courseRepo := dao.NewCourseDAO(db) @@ -64,7 +59,11 @@ func Start() { manager := dao.NewManager(db) agentRepo := dao.NewAgentDAO(db) outboxRepo := dao.NewOutboxDAO(db) - // + + // outbox 异步链路接线: + // - 读取 Kafka 配置 + // - 初始化 producer/consumer + // - 启动 dispatch/consume 两个后台循环 kafkaCfg := kafkabus.LoadConfig() asyncPipeline, err := service.NewAgentAsyncPipeline(outboxRepo, kafkaCfg) if err != nil { @@ -78,14 +77,15 @@ func Start() { log.Println("Kafka async pipeline is disabled") } - // service 层 + // Service 层初始化。 userService := service.NewUserService(userRepo, cacheRepo) taskSv := service.NewTaskService(taskRepo, cacheRepo) courseService := service.NewCourseService(courseRepo, scheduleRepo) taskClassService := service.NewTaskClassService(taskClassRepo, cacheRepo, scheduleRepo, manager) scheduleService := service.NewScheduleService(scheduleRepo, userRepo, taskClassRepo, manager, cacheRepo) agentService := service.NewAgentService(aiHub, agentRepo, agentCacheRepo, asyncPipeline) - // api 层 + + // API 层初始化。 userApi := api.NewUserHandler(userService) taskApi := api.NewTaskHandler(taskSv) courseApi := api.NewCourseHandler(courseService) @@ -100,6 +100,7 @@ func Start() { ScheduleHandler: scheduleApi, AgentHandler: agentApi, } + r := routers.RegisterRouters(handlers, cacheRepo, limiter) routers.StartEngine(r) } diff --git a/backend/dao/outbox.go b/backend/dao/outbox.go index 433c781..7553192 100644 --- a/backend/dao/outbox.go +++ b/backend/dao/outbox.go @@ -11,6 +11,11 @@ import ( "gorm.io/gorm/clause" ) +// OutboxDAO 封装 outbox 表读写逻辑。 +// outbox 状态机约定: +// pending -> published -> consumed(成功终态) +// pending/published -> pending(失败重试) +// pending/published -> dead(不可恢复或达到最大重试) type OutboxDAO struct { db *gorm.DB } @@ -19,6 +24,11 @@ func NewOutboxDAO(db *gorm.DB) *OutboxDAO { return &OutboxDAO{db: db} } +// CreateChatHistoryMessage 创建“聊天记录持久化”的 outbox 消息。 +// 关键点: +// 1) 初始状态为 pending; +// 2) NextRetryAt=now,允许被“首次同步投递”或“扫描器”立即处理; +// 3) payload 以 JSON 形式落表,保证消费端可重放。 func (d *OutboxDAO) CreateChatHistoryMessage(ctx context.Context, topic, messageKey string, payload model.ChatHistoryPersistPayload, maxRetry int) (int64, error) { if maxRetry <= 0 { maxRetry = 20 @@ -52,6 +62,8 @@ func (d *OutboxDAO) GetByID(ctx context.Context, id int64) (*model.AgentOutboxMe return &msg, nil } +// ListDueMessages 查询“到期可重试”的 pending 消息。 +// 查询条件:status=pending 且 next_retry_at<=当前时间。 func (d *OutboxDAO) ListDueMessages(ctx context.Context, limit int) ([]model.AgentOutboxMessage, error) { if limit <= 0 { limit = 100 @@ -69,7 +81,10 @@ func (d *OutboxDAO) ListDueMessages(ctx context.Context, limit int) ([]model.Age return messages, nil } -// MarkPublished 仅在消息未进入最终态时更新为 published,避免覆盖 consumed/dead。 +// MarkPublished 将消息标记为“已写入 Kafka”。 +// 注意: +// 1) 仅在非终态(非 consumed/dead)下更新,避免覆盖最终状态; +// 2) 清理 next_retry_at,避免已投递消息继续被扫描器重复拉取。 func (d *OutboxDAO) MarkPublished(ctx context.Context, id int64) error { now := time.Now() updates := map[string]interface{}{ @@ -85,6 +100,7 @@ func (d *OutboxDAO) MarkPublished(ctx context.Context, id int64) error { return result.Error } +// MarkDead 将消息置为死信终态。 func (d *OutboxDAO) MarkDead(ctx context.Context, id int64, reason string) error { now := time.Now() lastErr := truncateError(reason) @@ -97,6 +113,11 @@ func (d *OutboxDAO) MarkDead(ctx context.Context, id int64, reason string) error return d.db.WithContext(ctx).Model(&model.AgentOutboxMessage{}).Where("id = ?", id).Updates(updates).Error } +// MarkFailedForRetry 在失败时推进重试状态。 +// 关键点: +// 1) 事务 + FOR UPDATE 防并发覆盖(尤其是 dispatch/consume 并发场景); +// 2) retry_count 自增; +// 3) 达到 max_retry 后转 dead,否则按指数退避设置 next_retry_at。 func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason string) error { return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var msg model.AgentOutboxMessage @@ -104,6 +125,7 @@ func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason str if err != nil { return err } + // 终态直接跳过,保持幂等。 if msg.Status == model.OutboxStatusConsumed || msg.Status == model.OutboxStatusDead { return nil } @@ -131,6 +153,8 @@ func (d *OutboxDAO) MarkFailedForRetry(ctx context.Context, id int64, reason str }) } +// PersistChatHistoryAndMarkConsumed 执行“消费业务”并回写 consumed。 +// 这里把“写 chat_histories”与“更新 outbox 状态”放进同一事务,保证原子性。 func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outboxID int64, payload model.ChatHistoryPersistPayload) error { return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var outboxMsg model.AgentOutboxMessage @@ -141,6 +165,7 @@ func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outbo } return err } + // 幂等保护:重复消费不重复落库。 if outboxMsg.Status == model.OutboxStatusConsumed { return nil } @@ -172,6 +197,7 @@ func (d *OutboxDAO) PersistChatHistoryAndMarkConsumed(ctx context.Context, outbo }) } +// calcRetryBackoff 指数退避(上限 2^5=32 秒)。 func calcRetryBackoff(retryCount int) time.Duration { if retryCount <= 0 { return time.Second diff --git a/backend/go.mod b/backend/go.mod index ac768f2..8191581 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -1,15 +1,17 @@ module github.com/LoveLosita/smartflow/backend -go 1.23.4 +go 1.24.0 require ( + github.com/cloudwego/eino v0.7.13 github.com/cloudwego/eino-ext/components/model/ark v0.1.64 - github.com/cloudwego/eino-ext/components/model/openai v0.1.8 github.com/gin-gonic/gin v1.11.0 github.com/go-redis/redis/v8 v8.11.5 github.com/golang-jwt/jwt/v4 v4.5.2 github.com/google/uuid v1.6.0 + github.com/segmentio/kafka-go v0.4.47 github.com/spf13/viper v1.21.0 + github.com/volcengine/volcengine-go-sdk v1.2.9 golang.org/x/crypto v0.40.0 gorm.io/driver/mysql v1.6.0 gorm.io/gorm v1.31.1 @@ -20,16 +22,14 @@ require ( github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/bytedance/gopkg v0.1.3 // indirect - github.com/bytedance/sonic v1.14.1 // indirect - github.com/bytedance/sonic/loader v0.3.0 // indirect + github.com/bytedance/mockey v1.3.0 // indirect + github.com/bytedance/sonic v1.15.0 // indirect + github.com/bytedance/sonic/loader v0.5.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cloudwego/base64x v0.1.6 // indirect - github.com/cloudwego/eino v0.7.13 // indirect - github.com/cloudwego/eino-ext/libs/acl/openai v0.1.13 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/eino-contrib/jsonschema v1.0.3 // indirect - github.com/evanphx/json-patch v0.5.2 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/gin-contrib/sse v1.1.0 // indirect @@ -50,7 +50,6 @@ require ( github.com/leodido/go-urn v1.4.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/meguminnnnnnnnn/go-openai v0.1.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nikolalohinski/gonja v1.5.3 // indirect @@ -60,7 +59,6 @@ require ( github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/quic-go v0.54.0 // indirect github.com/sagikazarmark/locafero v0.11.0 // indirect - github.com/segmentio/kafka-go v0.4.47 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/slongfield/pyfmt v0.0.0-20220222012616-ea85ff4c361f // indirect github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect @@ -71,17 +69,16 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.3.0 // indirect github.com/volcengine/volc-sdk-golang v1.0.23 // indirect - github.com/volcengine/volcengine-go-sdk v1.2.9 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/yargevad/filepathx v1.0.0 // indirect go.uber.org/mock v0.5.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - golang.org/x/arch v0.20.0 // indirect + golang.org/x/arch v0.24.0 // indirect golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect golang.org/x/mod v0.26.0 // indirect golang.org/x/net v0.42.0 // indirect golang.org/x/sync v0.16.0 // indirect - golang.org/x/sys v0.35.0 // indirect + golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.28.0 // indirect golang.org/x/tools v0.35.0 // indirect google.golang.org/protobuf v1.36.9 // indirect diff --git a/backend/go.sum b/backend/go.sum index 823db2b..c7c7f7f 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -16,10 +16,10 @@ github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= github.com/bytedance/mockey v1.3.0 h1:ONLRdvhqmCfr9rTasUB8ZKCfvbdD2tohOg4u+4Q/ed0= github.com/bytedance/mockey v1.3.0/go.mod h1:1BPHF9sol5R1ud/+0VEHGQq/+i2lN+GTsr3O2Q9IENY= -github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w= -github.com/bytedance/sonic v1.14.1/go.mod h1:gi6uhQLMbTdeP0muCnrjHLeCUPyb70ujhnNlhOylAFc= -github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA= -github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE= +github.com/bytedance/sonic v1.15.0/go.mod h1:tFkWrPz0/CUCLEF4ri4UkHekCIcdnkqXw9VduqpJh0k= +github.com/bytedance/sonic/loader v0.5.0 h1:gXH3KVnatgY7loH5/TkeVyXPfESoqSBSBEiDd5VjlgE= +github.com/bytedance/sonic/loader v0.5.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/certifi/gocertifi v0.0.0-20190105021004-abcd57078448/go.mod h1:GJKEexRPVJrBSOjoqN5VNOIKJ5Q3RViH6eu3puDRwx4= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -31,10 +31,6 @@ github.com/cloudwego/eino v0.7.13 h1:Ku7hY+83gGJJjf4On3UgqjC57UcA+DXe0tqAZiNDDew github.com/cloudwego/eino v0.7.13/go.mod h1:nA8Vacmuqv3pqKBQbTWENBLQ8MmGmPt/WqiyLeB8ohQ= github.com/cloudwego/eino-ext/components/model/ark v0.1.64 h1:ecsP4xWhOGi6NYxl2NOemEoTNpNuLT7ING8gOZ7CArI= github.com/cloudwego/eino-ext/components/model/ark v0.1.64/go.mod h1:aabMR15RTXBSi9Eu13CWavzE+no5BQO4FJUEEdqImbg= -github.com/cloudwego/eino-ext/components/model/openai v0.1.8 h1:uVCE8nNvbhD37xGFgdKESWjvChDSkCAMA+DodhFRBaM= -github.com/cloudwego/eino-ext/components/model/openai v0.1.8/go.mod h1:K6g2VgULehhJC5dgFdPW3u7gZNZ1p6DhnfA5UhkRpNY= -github.com/cloudwego/eino-ext/libs/acl/openai v0.1.13 h1:z0bI5TH3nE+uDQiRhxBQMvk2HswlDUM3xP38+VSgpSQ= -github.com/cloudwego/eino-ext/libs/acl/openai v0.1.13/go.mod h1:1xMQZ8eE11pkEoTAEy8UlaAY817qGVMvjpDPGSIO3Ns= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -46,8 +42,6 @@ github.com/eino-contrib/jsonschema v1.0.3 h1:2Kfsm1xlMV0ssY2nuxshS4AwbLFuqmPmzIj github.com/eino-contrib/jsonschema v1.0.3/go.mod h1:cpnX4SyKjWjGC7iN2EbhxaTdLqGjCi0e9DxpLYxddD4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k= -github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -112,7 +106,6 @@ github.com/goph/emperror v0.17.2/go.mod h1:+ZbQ+fUNO/6FNiUo0ujtMjhgad9Xa6fQL9KhH github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= @@ -148,8 +141,6 @@ github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/meguminnnnnnnnn/go-openai v0.1.1 h1:u/IMMgrj/d617Dh/8BKAwlcstD74ynOJzCtVl+y8xAs= -github.com/meguminnnnnnnnn/go-openai v0.1.1/go.mod h1:qs96ysDmxhE4BZoU45I43zcyfnaYxU3X+aRzLko/htY= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -212,13 +203,15 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= @@ -235,8 +228,11 @@ github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/ github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yargevad/filepathx v1.0.0 h1:SYcT+N3tYGi+NvazubCNlvgIPbzAk7i7y2dwg3I5FYc= github.com/yargevad/filepathx v1.0.0/go.mod h1:BprfX/gpYNJHJfc35GjRRpVcwWXS89gGulUIU5tK3tA= @@ -245,8 +241,8 @@ go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= go.uber.org/mock v0.5.0/go.mod h1:ge71pBPLYDk7QIi1LupWxdAykm7KIEFchiOqd6z7qMM= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= -golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c= -golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk= +golang.org/x/arch v0.24.0 h1:qlJ3M9upxvFfwRM51tTg3Yl+8CP9vCC1E7vlFpgv99Y= +golang.org/x/arch v0.24.0/go.mod h1:dNHoOeKiyja7GTvF9NJS1l3Z2yntpQNzgrjh1cU103A= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -297,8 +293,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= -golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/backend/inits/mysql.go b/backend/inits/mysql.go index f8d3fb4..71ca055 100644 --- a/backend/inits/mysql.go +++ b/backend/inits/mysql.go @@ -4,33 +4,55 @@ import ( "fmt" "log" + "github.com/LoveLosita/smartflow/backend/model" "github.com/spf13/viper" "gorm.io/driver/mysql" "gorm.io/gorm" ) -// ConnectDB 连接数据库 -// 从config.yaml中读取数据库配置 -// 返回错误信息 +func autoMigrateModels(db *gorm.DB) error { + models := []any{ + &model.User{}, + &model.AgentChat{}, + &model.ChatHistory{}, + &model.Task{}, + &model.TaskClass{}, + &model.TaskClassItem{}, + &model.ScheduleEvent{}, + &model.Schedule{}, + &model.AgentOutboxMessage{}, + } + + for _, m := range models { + if err := db.AutoMigrate(m); err != nil { + return fmt.Errorf("auto migrate failed for %T: %w", m, err) + } + } + return nil +} + func ConnectDB() (*gorm.DB, error) { - // 从配置中读取数据库信息 host := viper.GetString("database.host") port := viper.GetString("database.port") user := viper.GetString("database.user") password := viper.GetString("database.password") dbname := viper.GetString("database.dbname") - // 构建DSN连接字符串 - dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", - user, password, host, port, dbname) + dsn := fmt.Sprintf( + "%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", + user, password, host, port, dbname, + ) - // 连接数据库 - var err error db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) if err != nil { return nil, err } + if err = autoMigrateModels(db); err != nil { + return nil, err + } + log.Println("Database connected successfully") + log.Println("Database auto migration completed") return db, nil } diff --git a/backend/kafka/admin.go b/backend/kafka/admin.go new file mode 100644 index 0000000..f995454 --- /dev/null +++ b/backend/kafka/admin.go @@ -0,0 +1,78 @@ +package kafka + +import ( + "context" + "errors" + "fmt" + "time" + + segmentkafka "github.com/segmentio/kafka-go" +) + +// WaitTopicReady 在指定超时时间内等待 Kafka topic 可用。 +// 背景:初次部署时 broker 可能已启动,但 topic/partition 还没就绪。 +// 这里启动前先探测,可减少“应用已启动但实际无法消费”的静默窗口。 +func WaitTopicReady(parent context.Context, brokers []string, topic string, timeout time.Duration) error { + if len(brokers) == 0 { + return errors.New("kafka brokers is empty") + } + if topic == "" { + return errors.New("kafka topic is empty") + } + if timeout <= 0 { + timeout = 30 * time.Second + } + + ctx, cancel := context.WithTimeout(parent, timeout) + defer cancel() + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + var lastErr error + for { + if err := probeTopic(ctx, brokers, topic); err == nil { + return nil + } else { + lastErr = err + } + + select { + case <-ctx.Done(): + if lastErr != nil { + return fmt.Errorf("wait topic ready timeout, topic=%s: %w", topic, lastErr) + } + return fmt.Errorf("wait topic ready timeout, topic=%s", topic) + case <-ticker.C: + } + } +} + +// probeTopic 轮询所有 broker,只要任一 broker 能读到 topic 分区信息即视为就绪。 +func probeTopic(ctx context.Context, brokers []string, topic string) error { + var lastErr error + for _, broker := range brokers { + conn, err := segmentkafka.DialContext(ctx, "tcp", broker) + if err != nil { + lastErr = err + continue + } + + partitions, readErr := conn.ReadPartitions(topic) + _ = conn.Close() + if readErr != nil { + lastErr = readErr + continue + } + if len(partitions) == 0 { + lastErr = fmt.Errorf("topic %s has no partitions yet", topic) + continue + } + return nil + } + + if lastErr != nil { + return lastErr + } + return errors.New("unable to probe topic readiness") +} diff --git a/backend/kafka/config.go b/backend/kafka/config.go index 5bc1b77..c62a6b3 100644 --- a/backend/kafka/config.go +++ b/backend/kafka/config.go @@ -12,16 +12,21 @@ const ( DefaultGroup = "smartflow-agent-outbox-consumer" ) +// Config 描述 outbox 异步链路所需的 Kafka 配置。 +// 说明:这些参数同时影响“发送端(producer)”与“消费端(consumer)”。 type Config struct { - Enabled bool - Brokers []string - Topic string - GroupID string + Enabled bool + Brokers []string + Topic string + GroupID string + // RetryScanInterval/RetryBatchSize/MaxRetry 作用于 outbox 扫描与失败重试。 RetryScanInterval time.Duration RetryBatchSize int MaxRetry int } +// LoadConfig 从配置中心读取 Kafka 配置,并做兜底默认值。 +// 兼容性:优先读取 kafka.brokers(数组),为空时降级读取 kafka.broker(单值)。 func LoadConfig() Config { brokers := viper.GetStringSlice("kafka.brokers") if len(brokers) == 0 { diff --git a/backend/kafka/consumer.go b/backend/kafka/consumer.go index 1211398..ff77f11 100644 --- a/backend/kafka/consumer.go +++ b/backend/kafka/consumer.go @@ -7,6 +7,8 @@ import ( segmentkafka "github.com/segmentio/kafka-go" ) +// Consumer 是 Kafka 读取端封装。 +// 采用“手动提交 offset”,确保业务落库与 offset 提交的顺序可控。 type Consumer struct { reader *segmentkafka.Reader } @@ -16,18 +18,19 @@ func NewConsumer(cfg Config) (*Consumer, error) { return nil, errors.New("kafka brokers 未配置") } reader := segmentkafka.NewReader(segmentkafka.ReaderConfig{ - Brokers: cfg.Brokers, - Topic: cfg.Topic, - GroupID: cfg.GroupID, - MinBytes: 1, - MaxBytes: 10e6, + Brokers: cfg.Brokers, + Topic: cfg.Topic, + GroupID: cfg.GroupID, + MinBytes: 1, + MaxBytes: 10e6, + // 关闭自动提交,业务处理成功后显式 Commit。 CommitInterval: 0, StartOffset: segmentkafka.FirstOffset, }) return &Consumer{reader: reader}, nil } -// Dequeue 从 Kafka 拉取一条消息(手动提交 offset)。 +// Dequeue 从 Kafka 拉取一条消息(不自动提交 offset)。 func (c *Consumer) Dequeue(ctx context.Context) (segmentkafka.Message, error) { if c == nil || c.reader == nil { return segmentkafka.Message{}, errors.New("kafka consumer 未初始化") @@ -35,6 +38,7 @@ func (c *Consumer) Dequeue(ctx context.Context) (segmentkafka.Message, error) { return c.reader.FetchMessage(ctx) } +// Commit 显式提交 offset。 func (c *Consumer) Commit(ctx context.Context, msg segmentkafka.Message) error { if c == nil || c.reader == nil { return errors.New("kafka consumer 未初始化") diff --git a/backend/kafka/envelope.go b/backend/kafka/envelope.go index b7ef17c..dbb6fff 100644 --- a/backend/kafka/envelope.go +++ b/backend/kafka/envelope.go @@ -2,7 +2,11 @@ package kafka import "encoding/json" -// Envelope 是投递到 Kafka 的统一包裹结构。 +// Envelope 是 outbox 投递到 Kafka 的统一包裹结构。 +// 设计目的: +// 1) 消费端先拿到 outbox_id,可直接回写状态; +// 2) biz_type 做分发,支持后续扩展更多异步业务; +// 3) payload 保持原始 JSON,按业务类型再反序列化。 type Envelope struct { OutboxID int64 `json:"outbox_id"` BizType string `json:"biz_type"` diff --git a/backend/kafka/producer.go b/backend/kafka/producer.go index c587dc8..05034e1 100644 --- a/backend/kafka/producer.go +++ b/backend/kafka/producer.go @@ -7,6 +7,8 @@ import ( segmentkafka "github.com/segmentio/kafka-go" ) +// Producer 是 Kafka 写入端封装。 +// 这里保持同步写(Async=false),方便把写入结果直接反馈给 outbox 状态机。 type Producer struct { writer *segmentkafka.Writer } @@ -16,15 +18,18 @@ func NewProducer(cfg Config) (*Producer, error) { return nil, errors.New("kafka brokers 未配置") } writer := &segmentkafka.Writer{ - Addr: segmentkafka.TCP(cfg.Brokers...), + Addr: segmentkafka.TCP(cfg.Brokers...), + // Hash 分区器保证相同 key 落同一分区,利于同会话消息顺序。 Balancer: &segmentkafka.Hash{}, RequiredAcks: segmentkafka.RequireOne, - Async: false, + // 关闭异步,确保写失败时可立即触发 outbox 重试逻辑。 + Async: false, } return &Producer{writer: writer}, nil } // Enqueue 将消息写入 Kafka。 +// 成功仅代表“已被 Kafka 接收”,不代表业务已完成(业务完成由 consumer + 落库决定)。 func (p *Producer) Enqueue(ctx context.Context, topic, key string, value []byte) error { if p == nil || p.writer == nil { return errors.New("kafka producer 未初始化") diff --git a/backend/middleware/cache_deleter.go b/backend/middleware/cache_deleter.go index 5296a4f..151ad6f 100644 --- a/backend/middleware/cache_deleter.go +++ b/backend/middleware/cache_deleter.go @@ -62,7 +62,7 @@ func (p *GormCachePlugin) dispatchCacheLogic(modelObj interface{}, db *gorm.DB) p.invalidTaskClassCache(*m.UserID) case model.Task: p.invalidTaskCache(m.UserID) - case model.AgentOutboxMessage, model.ChatHistory, model.AgentChat: + case model.AgentOutboxMessage, model.ChatHistory, model.AgentChat, model.User: // 这些模型目前没有定义缓存逻辑,先不处理 default: // 只有真正没定义的模型才会到这里 diff --git a/backend/model/agent.go b/backend/model/agent.go index da4ff5b..403d759 100644 --- a/backend/model/agent.go +++ b/backend/model/agent.go @@ -3,17 +3,17 @@ package model import "time" type UserSendMessageRequest struct { - ConversationID string `json:"conversation_id,omitempty"` // 可选,指定对话 ID + ConversationID string `json:"conversation_id,omitempty"` Message string `json:"message" binding:"required"` - Model string `json:"model,omitempty"` // 可选,指定使用的模型 - Thinking bool `json:"thinking,omitempty"` // 可选,是否开启思考模式 + Model string `json:"model,omitempty"` + Thinking bool `json:"thinking,omitempty"` } type SSEResponse struct { - Event string `json:"event"` // 事件类型,如 "message"、"error" 等 - ID int `json:"id,omitempty"` // SSE 的 id 字段 - Retry int64 `json:"retry,omitempty"` // SSE 的 retry 字段(毫秒) - Data SSEMessageData `json:"data"` // 事件数据 + Event string `json:"event"` + ID int `json:"id,omitempty"` + Retry int64 `json:"retry,omitempty"` + Data SSEMessageData `json:"data"` } type SSEMessageData struct { @@ -23,35 +23,33 @@ type SSEMessageData struct { type AgentChat struct { ID int64 `gorm:"column:id;primaryKey;autoIncrement;comment:自增ID"` - ChatID string `gorm:"column:chat_id;type:varchar(36);not null;uniqueIndex:uk_chat_id;comment:会话ID,UUID格式"` - UserID int `gorm:"column:user_id;not null;index:idx_user_last,priority:1;index:idx_user_status,priority:1;comment:所属用户"` + ChatID string `gorm:"column:chat_id;type:varchar(36);not null;uniqueIndex:uk_chat_id;comment:会话UUID"` + UserID int `gorm:"column:user_id;not null;index:idx_user_last,priority:1;index:idx_user_status,priority:1;comment:所属用户ID"` Title *string `gorm:"column:title;type:varchar(255);comment:会话标题"` - SystemPrompt *string `gorm:"column:system_prompt;type:text;comment:可选:系统提示词/会话级上下文"` - Model *string `gorm:"column:model;type:varchar(100);comment:可选:使用的模型标识"` - MessageCount int `gorm:"column:message_count;not null;default:0;comment:消息数(可冗余)"` - TokensTotal int `gorm:"column:tokens_total;not null;default:0;comment:累计消耗(可冗余)"` - LastMessageAt *time.Time `gorm:"column:last_message_at;comment:最后一条消息时间"` - Status string `gorm:"column:status;type:varchar(32);not null;default:active;index:idx_user_status,priority:2;comment:active/archived"` + SystemPrompt *string `gorm:"column:system_prompt;type:text;comment:系统提示词"` + Model *string `gorm:"column:model;type:varchar(100);comment:模型标识"` + MessageCount int `gorm:"column:message_count;not null;default:0;comment:消息总数"` + TokensTotal int `gorm:"column:tokens_total;not null;default:0;comment:累计Token"` + LastMessageAt *time.Time `gorm:"column:last_message_at;comment:最后消息时间"` + Status string `gorm:"column:status;type:varchar(32);not null;default:active;index:idx_user_status,priority:2;comment:会话状态"` CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime"` UpdatedAt *time.Time `gorm:"column:updated_at;autoUpdateTime"` - DeletedAt *time.Time `gorm:"column:deleted_at;comment:软删除"` - // 关联:一个会话有多条消息 - Chats []ChatHistory `gorm:"foreignKey:ChatID;references:ChatID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"` + DeletedAt *time.Time `gorm:"column:deleted_at;comment:软删除时间"` } func (AgentChat) TableName() string { return "agent_chats" } type ChatHistory struct { ID int `gorm:"column:id;primaryKey;autoIncrement"` - ChatID string `gorm:"column:chat_id;type:varchar(36);not null;index:idx_user_chat,priority:2;index:idx_chat_id;comment:对话UUID"` + ChatID string `gorm:"column:chat_id;type:varchar(36);not null;index:idx_user_chat,priority:2;index:idx_chat_id;comment:会话UUID"` UserID int `gorm:"column:user_id;not null;index:idx_user_chat,priority:1"` - MessageContent *string `gorm:"column:message_content;type:text;comment:用户或AI的话"` - Role *string `gorm:"column:role;type:varchar(32);comment:user / assistant"` - TokensConsumed int `gorm:"column:tokens_consumed;not null;default:0;comment:单次消耗,用于累加到 users 表"` + MessageContent *string `gorm:"column:message_content;type:text;comment:消息内容"` + Role *string `gorm:"column:role;type:varchar(32);comment:消息角色"` + TokensConsumed int `gorm:"column:tokens_consumed;not null;default:0;comment:本轮消耗Token"` CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime"` - // 可选:回挂会话(按 chat_id -> chat_histories.chat_id) - ChatHistory AgentChat `gorm:"foreignKey:ChatID;references:ChatID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"` + // 只保留从聊天记录到会话的单向关联,避免迁移时出现循环依赖。 + Chat AgentChat `gorm:"foreignKey:ChatID;references:ChatID;constraint:OnUpdate:CASCADE,OnDelete:CASCADE"` } func (ChatHistory) TableName() string { return "chat_histories" } diff --git a/backend/model/outbox.go b/backend/model/outbox.go index 9796598..c473dce 100644 --- a/backend/model/outbox.go +++ b/backend/model/outbox.go @@ -3,26 +3,40 @@ package model import "time" const ( - OutboxStatusPending = "pending" + // OutboxStatusPending 表示消息已落 outbox,等待投递或等待下次重试窗口到达。 + OutboxStatusPending = "pending" + // OutboxStatusPublished 表示消息已成功写入 Kafka,但尚未完成业务消费。 OutboxStatusPublished = "published" - OutboxStatusConsumed = "consumed" - OutboxStatusDead = "dead" + // OutboxStatusConsumed 表示消息对应的业务逻辑已成功执行(本项目中即聊天记录已落库)。 + OutboxStatusConsumed = "consumed" + // OutboxStatusDead 表示达到最大重试次数或出现不可恢复错误,进入死信终态。 + OutboxStatusDead = "dead" + // OutboxBizTypeChatHistoryPersist 当前唯一业务类型:聊天记录异步持久化。 OutboxBizTypeChatHistoryPersist = "chat_history_persist" ) -// AgentOutboxMessage 保存需要异步投递到 Kafka 的消息。 +// AgentOutboxMessage 是 outbox 模式的核心表结构: +// 1. 先写本地数据库(保证事务内可见); +// 2. 再由后台扫描并投递 Kafka; +// 3. 由消费者完成最终业务落库并回写状态。 type AgentOutboxMessage struct { - ID int64 `gorm:"column:id;primaryKey;autoIncrement"` - BizType string `gorm:"column:biz_type;type:varchar(64);not null;index:idx_outbox_status_next,priority:3;comment:业务类型"` - Topic string `gorm:"column:topic;type:varchar(128);not null;comment:Kafka Topic"` - MessageKey string `gorm:"column:message_key;type:varchar(128);not null;comment:Kafka 消息键"` - Payload string `gorm:"column:payload;type:longtext;not null;comment:业务载荷(JSON)"` + ID int64 `gorm:"column:id;primaryKey;autoIncrement"` + // BizType 决定消费者侧如何解释 Payload。 + BizType string `gorm:"column:biz_type;type:varchar(64);not null;index:idx_outbox_status_next,priority:3;comment:业务类型"` + // Topic/MessageKey 用于 Kafka 路由与分区稳定性。 + Topic string `gorm:"column:topic;type:varchar(128);not null;comment:Kafka Topic"` + MessageKey string `gorm:"column:message_key;type:varchar(128);not null;comment:Kafka 消息键"` + // Payload 存储业务 JSON,消费时再反序列化为具体 payload 结构。 + Payload string `gorm:"column:payload;type:longtext;not null;comment:业务载荷(JSON)"` + // Status + NextRetryAt + RetryCount 共同描述“是否可被调度重试”。 Status string `gorm:"column:status;type:varchar(32);not null;index:idx_outbox_status_next,priority:1;comment:pending/published/consumed/dead"` RetryCount int `gorm:"column:retry_count;not null;default:0;comment:已重试次数"` MaxRetry int `gorm:"column:max_retry;not null;default:20;comment:最大重试次数"` NextRetryAt *time.Time `gorm:"column:next_retry_at;index:idx_outbox_status_next,priority:2;comment:下次重试时间"` - LastError *string `gorm:"column:last_error;type:text;comment:最后一次错误"` + // LastError 记录最近一次失败原因,便于排障和可观测。 + LastError *string `gorm:"column:last_error;type:text;comment:最后一次错误"` + // PublishedAt/ConsumedAt 便于统计“投递延迟”和“消费完成耗时”。 PublishedAt *time.Time `gorm:"column:published_at;comment:投递到 Kafka 时间"` ConsumedAt *time.Time `gorm:"column:consumed_at;comment:消费完成时间"` CreatedAt *time.Time `gorm:"column:created_at;autoCreateTime"` @@ -33,7 +47,8 @@ func (AgentOutboxMessage) TableName() string { return "agent_outbox_messages" } -// ChatHistoryPersistPayload 是“聊天记录持久化”消息体。 +// ChatHistoryPersistPayload 是“聊天记录持久化”消息的业务载荷。 +// 注意:该载荷既会被写入 outbox,也会被封装到 Kafka Envelope 中传输。 type ChatHistoryPersistPayload struct { UserID int `json:"user_id"` ConversationID string `json:"conversation_id"` diff --git a/backend/service/agent.go b/backend/service/agent.go index 6cd4dfb..9982a1f 100644 --- a/backend/service/agent.go +++ b/backend/service/agent.go @@ -48,6 +48,9 @@ func (s *AgentService) pickChatModel(requestModel string) (*ark.ChatModel, strin return s.AIHub.Worker, "worker" } +// saveChatHistoryReliable 是聊天记录持久化的统一入口: +// 1) 启用 outbox + Kafka 时,走异步可靠链路; +// 2) 未启用时,退化为同步写数据库。 func (s *AgentService) saveChatHistoryReliable(ctx context.Context, payload model.ChatHistoryPersistPayload) error { if s.asyncPipeline == nil { return s.repo.SaveChatHistory(ctx, payload.UserID, payload.ConversationID, payload.Role, payload.Message) @@ -64,15 +67,15 @@ func pushErrNonBlocking(errChan chan error, err error) { } func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string) (<-chan string, <-chan error) { - // 1) 准备输出通道 + // 1) 准备输出通道。 outChan := make(chan string, 5) errChan := make(chan error, 1) - // 2) 规范会话并选择模型 + // 2) 规范会话 ID 并选择模型。 chatID = normalizeConversationID(chatID) selectedModel, resolvedModelName := s.pickChatModel(modelName) - // 3) 确保会话存在 + // 3) 确保会话存在:先查缓存,再回源数据库,必要时创建新会话。 result, err := s.agentCache.GetConversationStatus(ctx, chatID) if err != nil { errChan <- err @@ -101,7 +104,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin } } - // 4) 组装历史上下文(先读缓存,缓存未命中再读数据库) + // 4) 组装历史上下文:先读缓存,缓存未命中再读数据库。 chatHistory, err := s.agentCache.GetHistory(ctx, chatID) if err != nil { errChan <- err @@ -123,12 +126,12 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin chatHistory = conv.ToEinoMessages(histories) } - // 5) 按 token 预算裁剪历史:从最旧消息开始持续弹出,直到满足预算 + // 5) 基于 token 预算裁剪历史,避免请求超长。 historyBudget := pkg.HistoryTokenBudgetByModel(resolvedModelName, agent.SystemPrompt, userMessage) trimmedHistory, totalHistoryTokens, keptHistoryTokens, droppedCount := pkg.TrimHistoryByTokenBudget(chatHistory, historyBudget) chatHistory = trimmedHistory - // 6) 根据最新裁剪结果动态调整 Redis 会话窗口 + // 6) 根据裁剪结果调整 Redis 会话窗口,控制缓存体积。 targetWindow := pkg.CalcSessionWindowSize(len(chatHistory)) if err = s.agentCache.SetSessionWindowSize(ctx, chatID, targetWindow); err != nil { log.Printf("failed to set history window for %s: %v", chatID, err) @@ -142,7 +145,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin chatID, totalHistoryTokens, keptHistoryTokens, droppedCount, historyBudget, targetWindow) } - // 缓存未命中时,把“裁剪后的历史”回填进缓存 + // 缓存未命中时,把“裁剪后的历史”回填 Redis。 if cacheMiss { if err = s.agentCache.BackfillHistory(ctx, chatID, chatHistory); err != nil { errChan <- err @@ -152,7 +155,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin } } - // 7) 先同步写 Redis,再把持久化请求交给 outbox + Kafka + // 7) 先同步写 Redis,再把数据库持久化交给 outbox 可靠链路。 if err = s.agentCache.PushMessage(ctx, chatID, &schema.Message{Role: schema.User, Content: userMessage}); err != nil { log.Printf("failed to push user message into redis history: %v", err) } @@ -168,7 +171,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin return outChan, errChan } - // 8) 启动流式聊天 + // 8) 启动流式对话。 go func() { defer close(outChan) @@ -178,7 +181,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin return } - // 9) 回答完成后,同步写 Redis,并把数据库落库交给 outbox + Kafka + // 9) 助手回答完成后,重复同样流程:先写 Redis,再异步持久化。 if cacheErr := s.agentCache.PushMessage(context.Background(), chatID, &schema.Message{Role: schema.Assistant, Content: fullText}); cacheErr != nil { log.Printf("failed to push assistant message into redis history: %v", cacheErr) } diff --git a/backend/service/agent_async_pipeline.go b/backend/service/agent_async_pipeline.go index 8da6e5a..f1d8f93 100644 --- a/backend/service/agent_async_pipeline.go +++ b/backend/service/agent_async_pipeline.go @@ -15,11 +15,16 @@ import ( "gorm.io/gorm" ) -// AgentAsyncPipeline 负责 outbox 扫描、Kafka 投递与消费落库。 +// AgentAsyncPipeline 负责 outbox 的“异步可靠链路”: +// 1) 业务侧写 outbox(pending); +// 2) dispatch loop 扫描并投递 Kafka(published); +// 3) consume loop 消费并落库(consumed); +// 4) 任一步失败按重试策略回到 pending 或 dead。 type AgentAsyncPipeline struct { outboxRepo *dao.OutboxDAO producer *kafkabus.Producer consumer *kafkabus.Consumer + brokers []string topic string maxRetry int scanEvery time.Duration @@ -43,6 +48,7 @@ func NewAgentAsyncPipeline(outboxRepo *dao.OutboxDAO, cfg kafkabus.Config) (*Age outboxRepo: outboxRepo, producer: producer, consumer: consumer, + brokers: cfg.Brokers, topic: cfg.Topic, maxRetry: cfg.MaxRetry, scanEvery: cfg.RetryScanInterval, @@ -50,10 +56,23 @@ func NewAgentAsyncPipeline(outboxRepo *dao.OutboxDAO, cfg kafkabus.Config) (*Age }, nil } +// Start 启动两个后台协程: +// - startDispatchLoop:扫描 pending 并投递 Kafka +// - startConsumeLoop:消费 Kafka 并执行业务落库 func (p *AgentAsyncPipeline) Start(ctx context.Context) { if p == nil { return } + + log.Printf("Kafka async pipeline starting: topic=%s brokers=%v retry_scan=%s batch=%d", p.topic, p.brokers, p.scanEvery, p.scanBatch) + if err := kafkabus.WaitTopicReady(ctx, p.brokers, p.topic, 30*time.Second); err != nil { + // 首次部署常见情况:broker 已起来但 topic/partition 尚未可用。 + // 这里明确打印,避免“消息堆积但控制台无提示”的观感。 + log.Printf("Kafka topic not ready before consume loop start: %v", err) + } else { + log.Printf("Kafka topic is ready: %s", p.topic) + } + go p.startDispatchLoop(ctx) go p.startConsumeLoop(ctx) } @@ -70,6 +89,10 @@ func (p *AgentAsyncPipeline) Close() { } } +// EnqueueChatHistoryPersist 是业务侧入口: +// 1) 先写 outbox; +// 2) 立刻尝试“首发投递”一次(非阻塞主流程); +// 3) 失败后由扫描器按 next_retry_at 继续重试。 func (p *AgentAsyncPipeline) EnqueueChatHistoryPersist(ctx context.Context, payload model.ChatHistoryPersistPayload) error { if p == nil { return errors.New("Kafka 异步链路未初始化") @@ -84,6 +107,7 @@ func (p *AgentAsyncPipeline) EnqueueChatHistoryPersist(ctx context.Context, payl return nil } +// startDispatchLoop 定时扫描 pending 且到期的消息,逐条尝试投递。 func (p *AgentAsyncPipeline) startDispatchLoop(ctx context.Context) { ticker := time.NewTicker(p.scanEvery) defer ticker.Stop() @@ -98,6 +122,9 @@ func (p *AgentAsyncPipeline) startDispatchLoop(ctx context.Context) { log.Printf("扫描 outbox 失败: %v", err) continue } + if len(pendingMessages) > 0 { + log.Printf("outbox due messages=%d, start dispatch", len(pendingMessages)) + } for _, msg := range pendingMessages { if err = p.dispatchOne(ctx, msg.ID); err != nil { log.Printf("重试投递 outbox 消息失败(id=%d): %v", msg.ID, err) @@ -107,6 +134,11 @@ func (p *AgentAsyncPipeline) startDispatchLoop(ctx context.Context) { } } +// dispatchOne 执行单条 outbox 投递: +// 1) 读取 outbox 行; +// 2) 组装 Envelope; +// 3) 写 Kafka; +// 4) 回写 published。 func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) error { outboxMsg, err := p.outboxRepo.GetByID(ctx, outboxID) if err != nil { @@ -115,6 +147,7 @@ func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) er } return err } + // 终态不再重复投递。 if outboxMsg.Status == model.OutboxStatusConsumed || outboxMsg.Status == model.OutboxStatusDead { return nil } @@ -126,7 +159,8 @@ func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) er } raw, err := json.Marshal(envelope) if err != nil { - markErr := p.outboxRepo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包裹失败: "+err.Error()) + // 序列化都失败通常是坏数据,直接死信。 + markErr := p.outboxRepo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 包装失败: "+err.Error()) if markErr != nil { log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr) } @@ -144,6 +178,7 @@ func (p *AgentAsyncPipeline) dispatchOne(ctx context.Context, outboxID int64) er return nil } +// startConsumeLoop 持续从 Kafka 拉取消息并处理。 func (p *AgentAsyncPipeline) startConsumeLoop(ctx context.Context) { for { select { @@ -157,7 +192,7 @@ func (p *AgentAsyncPipeline) startConsumeLoop(ctx context.Context) { if errors.Is(err, context.Canceled) { return } - log.Printf("Kafka 消费拉取失败: %v", err) + log.Printf("Kafka 消费拉取失败(topic=%s): %v", p.topic, err) time.Sleep(300 * time.Millisecond) continue } @@ -167,21 +202,24 @@ func (p *AgentAsyncPipeline) startConsumeLoop(ctx context.Context) { } } +// handleMessage 先解析 Envelope,再按 biz_type 分发到具体处理器。 func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka.Message) error { var envelope kafkabus.Envelope if err := json.Unmarshal(msg.Value, &envelope); err != nil { + // 包装体坏数据,提交 offset 跳过,避免阻塞分区。 _ = p.consumer.Commit(ctx, msg) - return fmt.Errorf("解析 Kafka 包裹失败: %w", err) + return fmt.Errorf("解析 Kafka 包装失败: %w", err) } if envelope.OutboxID <= 0 { _ = p.consumer.Commit(ctx, msg) - return errors.New("Kafka 包裹缺少 outbox_id") + return errors.New("Kafka 包装缺少 outbox_id") } switch envelope.BizType { case model.OutboxBizTypeChatHistoryPersist: return p.consumeChatHistory(ctx, msg, envelope) default: + // 未知业务类型直接死信并提交,避免反复重试无意义数据。 _ = p.outboxRepo.MarkDead(ctx, envelope.OutboxID, "未知业务类型: "+envelope.BizType) if err := p.consumer.Commit(ctx, msg); err != nil { return err @@ -190,6 +228,10 @@ func (p *AgentAsyncPipeline) handleMessage(ctx context.Context, msg segmentkafka } } +// consumeChatHistory 执行“聊天记录持久化”消费逻辑。 +// 提交策略说明: +// 1) 成功落库后提交; +// 2) 失败时先回写 outbox(便于重试/排障),再提交,避免分区阻塞。 func (p *AgentAsyncPipeline) consumeChatHistory(ctx context.Context, msg segmentkafka.Message, envelope kafkabus.Envelope) error { var payload model.ChatHistoryPersistPayload if err := json.Unmarshal(envelope.Payload, &payload); err != nil { diff --git a/docker-compose.yml b/docker-compose.yml index 38446b5..d404a41 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,8 +1,8 @@ services: mysql: image: mysql:8.0 - container_name: SmartFlow-mysql - restart: always + container_name: smartflow-mysql + restart: unless-stopped environment: MYSQL_ROOT_PASSWORD: root_password_123 MYSQL_DATABASE: smartflow @@ -10,24 +10,34 @@ services: MYSQL_PASSWORD: smartflow_password_456 ports: - "3306:3306" - volumes: - - ./docker/mysql/data:/var/lib/mysql command: --default-authentication-plugin=mysql_native_password + volumes: + - mysql_data:/var/lib/mysql + healthcheck: + test: ["CMD-SHELL", "mysqladmin ping -h localhost -uroot -proot_password_123"] + interval: 10s + timeout: 5s + retries: 10 redis: - image: redis:latest - container_name: redflow-redis - restart: always - command: redis-server --requirepass redis_password_789 + image: redis:7 + container_name: smartflow-redis + restart: unless-stopped + command: redis-server --appendonly yes --requirepass redis_password_789 ports: - "6379:6379" volumes: - - ./docker/redis/data:/data + - redis_data:/data + healthcheck: + test: ["CMD", "redis-cli", "-a", "redis_password_789", "ping"] + interval: 10s + timeout: 5s + retries: 10 kafka: image: apache/kafka:3.7.2 - container_name: SmartFlow-kafka - restart: always + container_name: smartflow-kafka + restart: unless-stopped ports: - "9092:9092" environment: @@ -40,16 +50,39 @@ services: KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_NUM_PARTITIONS: 3 + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_DIRS: /var/lib/kafka/data volumes: - kafka_data:/var/lib/kafka/data + healthcheck: + test: ["CMD-SHELL", "/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list >/dev/null 2>&1"] + interval: 10s + timeout: 5s + retries: 15 + + kafka-init: + image: apache/kafka:3.7.2 + container_name: smartflow-kafka-init + depends_on: + kafka: + condition: service_healthy + entrypoint: ["/bin/bash", "-c"] + command: > + /opt/kafka/bin/kafka-topics.sh + --bootstrap-server kafka:9094 + --create + --if-not-exists + --topic smartflow.agent.outbox + --partitions 3 + --replication-factor 1 + restart: "no" volumes: mysql_data: redis_data: - kafka_data: \ No newline at end of file + kafka_data: