diff --git a/backend/cmd/start.go b/backend/cmd/start.go index 4a7e9ee..cf29b92 100644 --- a/backend/cmd/start.go +++ b/backend/cmd/start.go @@ -919,6 +919,7 @@ func (r *appRuntime) registerEventHandlers() error { r.memoryModule, r.activeTriggerWorkflow, r.notificationService, + r.tokenStoreClient, r.userAuthClient, ); err != nil { return err diff --git a/backend/cmd/taskclassforum/main.go b/backend/cmd/taskclassforum/main.go index 6230532..8f98b31 100644 --- a/backend/cmd/taskclassforum/main.go +++ b/backend/cmd/taskclassforum/main.go @@ -5,10 +5,12 @@ import ( "github.com/LoveLosita/smartflow/backend/bootstrap" legacydao "github.com/LoveLosita/smartflow/backend/dao" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" "github.com/LoveLosita/smartflow/backend/services/taskclassforum/adapter" forumdao "github.com/LoveLosita/smartflow/backend/services/taskclassforum/dao" forumrpc "github.com/LoveLosita/smartflow/backend/services/taskclassforum/rpc" forumsv "github.com/LoveLosita/smartflow/backend/services/taskclassforum/sv" + sharedevents "github.com/LoveLosita/smartflow/backend/shared/events" "github.com/spf13/viper" ) @@ -21,14 +23,19 @@ func main() { if err != nil { log.Fatalf("failed to connect taskclassforum database: %v", err) } + if err := registerForumRewardOutboxRoutes(); err != nil { + log.Fatalf("failed to register taskclassforum outbox routes: %v", err) + } // 1. 复用同一个 DB 句柄装配 legacy TaskClass DAO,避免本轮抢改 task-class 模块。 // 2. 计划广场只通过快照端口读取和创建 TaskClass,不直接写 schedule。 // 3. 后续 task-class 独立成服务后,只替换这里的 adapter 注入点。 taskClassPort := adapter.NewLegacyTaskClassAdapter(legacydao.NewTaskClassDAO(db)) + eventPublisher := outboxinfra.NewRepositoryPublisher(outboxinfra.NewRepository(db), viper.GetInt("kafka.maxRetry")) svc := forumsv.New(forumsv.Options{ - DB: db, - TaskClassPort: taskClassPort, + DB: db, + TaskClassPort: taskClassPort, + EventPublisher: eventPublisher, }) forumrpc.Start(forumrpc.ServerOptions{ ListenOn: viper.GetString("taskclassforum.rpc.listenOn"), @@ -36,3 +43,16 @@ func main() { Service: svc, }) } + +// registerForumRewardOutboxRoutes 负责让独立 taskclassforum RPC 进程认识奖励事件的落表归属。 +// +// 步骤说明: +// 1. 点赞、导入事件都由 token-store 消费并写 token_grants,所以事件路由归属 token-store; +// 2. taskclassforum 进程只负责发布事件,不启动 consumer,也不直接写奖励账本; +// 3. 若注册失败直接阻止启动,避免后续点赞/导入看似成功但 outbox 永远无法入队。 +func registerForumRewardOutboxRoutes() error { + if err := outboxinfra.RegisterEventService(sharedevents.ForumPostLikedEventType, outboxinfra.ServiceTokenStore); err != nil { + return err + } + return outboxinfra.RegisterEventService(sharedevents.ForumPostImportedEventType, outboxinfra.ServiceTokenStore) +} diff --git a/backend/config.example.yaml b/backend/config.example.yaml index 6819397..91ba79a 100644 --- a/backend/config.example.yaml +++ b/backend/config.example.yaml @@ -47,6 +47,9 @@ taskclassforum: # Token 商店 zrpc 独立服务与网关客户端配置。 tokenstore: + reward: + forumLikeAmount: 1 + forumImportAmount: 5 rpc: listenOn: "0.0.0.0:9083" endpoints: @@ -64,6 +67,18 @@ kafka: retryBatchSize: 100 maxRetry: 20 +# 服务级 outbox 目录配置;未显式覆盖时会使用代码内置默认值。 +outbox: + services: + taskclass-forum: + topic: "smartflow.taskclass-forum.outbox" + groupID: "smartflow-taskclass-forum-outbox-consumer" + table: "taskclass_forum_outbox_messages" + token-store: + topic: "smartflow.token-store.outbox" + groupID: "smartflow-token-store-outbox-consumer" + table: "token_store_outbox_messages" + # 通知投递配置。 notification: frontendBaseURL: "http://localhost:5173" diff --git a/backend/gateway/forumapi/handler.go b/backend/gateway/forumapi/handler.go index 7564ec4..ff70e8d 100644 --- a/backend/gateway/forumapi/handler.go +++ b/backend/gateway/forumapi/handler.go @@ -14,7 +14,12 @@ import ( "github.com/gin-gonic/gin" ) -const requestTimeout = 2 * time.Second +const ( + requestTimeout = 2 * time.Second + forumLikeRewardAmount = int64(1) + forumImportRewardAmount = int64(5) + rewardHintStatusActive = "rule_active" +) type ForumClient interface { ListPosts(ctx context.Context, actorUserID uint64, page int, pageSize int, sort string, keyword string, tag string) ([]contracts.ForumPostBrief, contracts.PageResult, error) @@ -220,8 +225,8 @@ func (h *Handler) LikePost(c *gin.Context) { LikeCount: counters.LikeCount, RewardHint: &rewardHint{ Receiver: "author", - Status: "recorded", - Amount: 1, + Status: rewardHintStatusActive, + Amount: forumLikeRewardAmount, }, })) } @@ -369,8 +374,8 @@ func (h *Handler) ImportPost(c *gin.Context) { ImportCount: result.ImportCount, RewardHint: rewardHint{ Receiver: "author", - Status: "recorded", - Amount: 2, + Status: rewardHintStatusActive, + Amount: forumImportRewardAmount, }, NextAction: nextAction{ Type: "open_task_class", diff --git a/backend/gateway/tokenstore/client.go b/backend/gateway/tokenstore/client.go index 6706d59..1ee20f1 100644 --- a/backend/gateway/tokenstore/client.go +++ b/backend/gateway/tokenstore/client.go @@ -217,6 +217,25 @@ func (c *Client) ListGrants(ctx context.Context, req tokencontracts.ListTokenGra return tokenGrantsFromPB(resp.Items), pageFromPB(resp.Page), nil } +func (c *Client) RecordForumRewardGrant(ctx context.Context, req tokencontracts.RecordForumRewardGrantRequest) (*tokencontracts.TokenGrantView, error) { + if err := c.ensureReady(); err != nil { + return nil, err + } + resp, err := c.rpc.RecordForumRewardGrant(ctx, &pb.RecordForumRewardGrantRequest{ + EventId: req.EventID, + ReceiverUserId: req.ReceiverUserID, + Source: req.Source, + SourceRefId: req.SourceRefID, + }) + if err != nil { + return nil, responseFromRPCError(err) + } + if resp == nil { + return nil, errors.New("tokenstore zrpc service returned empty record forum reward grant response") + } + return tokenGrantFromPB(resp.Grant), nil +} + func (c *Client) ensureReady() error { if c == nil || c.rpc == nil { return errors.New("tokenstore zrpc client is not initialized") diff --git a/backend/go.sum b/backend/go.sum index c24c2ed..824fae7 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -1,10 +1,20 @@ +cel.dev/expr v0.25.1/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= +github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0= +github.com/IBM/sarama v1.43.1/go.mod h1:GG5q1RURtDNPz8xxJs3mgX6Ytak8Z9eLhAkJPObe2xE= +github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/airbrake/gobrake v3.6.1+incompatible/go.mod h1:wM4gu3Cn0W0K7GUuVWnlXZU11AGBXMILnrdOU8Kn00o= +github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE= +github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/alicebob/miniredis/v2 v2.37.0 h1:RheObYW32G1aiJIj81XVt78ZHJpHonHLHW7OLIshq68= github.com/alicebob/miniredis/v2 v2.37.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= @@ -45,6 +55,7 @@ github.com/cloudwego/eino-ext/components/model/ark v0.1.64 h1:ecsP4xWhOGi6NYxl2N github.com/cloudwego/eino-ext/components/model/ark v0.1.64/go.mod h1:aabMR15RTXBSi9Eu13CWavzE+no5BQO4FJUEEdqImbg= github.com/cloudwego/eino-ext/libs/acl/openai v0.1.13 h1:z0bI5TH3nE+uDQiRhxBQMvk2HswlDUM3xP38+VSgpSQ= github.com/cloudwego/eino-ext/libs/acl/openai v0.1.13/go.mod h1:1xMQZ8eE11pkEoTAEy8UlaAY817qGVMvjpDPGSIO3Ns= +github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5/go.mod h1:KdCmV+x/BuvyMxRnYBlmVaq4OLiKW6iRQfvC62cvdkI= github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= @@ -57,21 +68,30 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eino-contrib/jsonschema v1.0.3 h1:2Kfsm1xlMV0ssY2nuxshS4AwbLFuqmPmzIjLVJ1Fsp0= github.com/eino-contrib/jsonschema v1.0.3/go.mod h1:cpnX4SyKjWjGC7iN2EbhxaTdLqGjCi0e9DxpLYxddD4= github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf9B/a0/xU= github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.14.0/go.mod h1:NcS5X47pLl/hfqxU70yPwL9ZMkUlwlKxtAohpi2wBEU= +github.com/envoyproxy/go-control-plane/envoy v1.36.0/go.mod h1:ty89S1YCCVruQAm9OtKeEkQLTb+Lkz0k8v9W0Oxsv98= +github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k= github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/fullstorydev/grpcurl v1.9.3/go.mod h1:/b4Wxe8bG6ndAjlfSUjwseQReUDUvBJiFEB7UllOlUE= github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= @@ -83,6 +103,7 @@ github.com/gin-gonic/gin v1.11.0 h1:OW/6PLjyusp2PPXtyxKHU0RbX6I/l28FTdDlae5ueWk= github.com/gin-gonic/gin v1.11.0/go.mod h1:+iq/FyxlGzII0KHiBGjuNn4UNENUlKbGlNmc+W50Dls= github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= +github.com/go-jose/go-jose/v4 v4.1.3/go.mod h1:x4oUasVrzR7071A4TnHLGSPpNOm2a21K9Kf04k1rs08= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -109,6 +130,7 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo= github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= @@ -124,6 +146,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.2.5/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -137,6 +160,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo= github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -148,6 +173,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/jsonschema-go v0.4.2/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE= github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db h1:097atOisP2aRj7vFgYQBbFN4U4JNXUNYpxael3UzMyo= github.com/google/pprof v0.0.0-20241029153458-d1b30febd7db/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -157,16 +183,34 @@ github.com/goph/emperror v0.17.2 h1:yLapQcmEsO0ipe9p5TaN22djm3OFV/TfM/fcYP0/J18= github.com/goph/emperror v0.17.2/go.mod h1:+ZbQ+fUNO/6FNiUo0ujtMjhgad9Xa6fQL9KhH4LNHic= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= github.com/grafana/pyroscope-go v1.2.8 h1:UvCwIhlx9DeV7F6TW/z8q1Mi4PIm3vuUJ2ZlCEvmA4M= github.com/grafana/pyroscope-go v1.2.8/go.mod h1:SSi59eQ1/zmKoY/BKwa5rSFsJaq+242Bcrr4wPix1g8= github.com/grafana/pyroscope-go/godeltaprof v0.1.9 h1:c1Us8i6eSmkW+Ez05d3co8kasnuOY813tbMN8i/a3Og= github.com/grafana/pyroscope-go/godeltaprof v0.1.9/go.mod h1:2+l7K7twW49Ct4wFluZD3tZ6e0SjanjcUUBPVD/UuGU= +github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 h1:X+2YciYSxvMQK0UZ7sg45ZVabVZBeBuvMkmuI2V3Fak= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7/go.mod h1:lW34nIZuQ8UDPdkon5fmfp2l3+ZkQ2me/+oecHYLOII= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jhump/protoreflect v1.18.0/go.mod h1:ezWcltJIVF4zYdIFM+D/sHV4Oh5LNU08ORzCGfwvTz8= +github.com/jhump/protoreflect/v2 v2.0.0-beta.1/go.mod h1:D9LBEowZyv8/iSu97FU2zmXG3JxVTmNw21mu63niFzU= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= @@ -177,10 +221,12 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -210,10 +256,15 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/meguminnnnnnnnn/go-openai v0.1.1 h1:u/IMMgrj/d617Dh/8BKAwlcstD74ynOJzCtVl+y8xAs= github.com/meguminnnnnnnnn/go-openai v0.1.1/go.mod h1:qs96ysDmxhE4BZoU45I43zcyfnaYxU3X+aRzLko/htY= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= +github.com/moby/spdystream v0.5.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI= +github.com/modelcontextprotocol/go-sdk v1.4.0/go.mod h1:Nxc2n+n/GdCebUaqCOhTetptS17SXXNu9IfNTaLDi1E= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -222,6 +273,8 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFd github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/nikolalohinski/gonja v1.5.3 h1:GsA+EEaZDZPGJ8JtpeGN78jidhOlxeJROpqMT9fTj9c= github.com/nikolalohinski/gonja v1.5.3/go.mod h1:RmjwxNiXAEqcq1HeK5SSMmqFJvKOfTfXhkJv6YBtPa4= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= @@ -239,12 +292,15 @@ github.com/openzipkin/zipkin-go v0.4.3 h1:9EGwpqkgnwdEIJ+Od7QVSEIH+ocmm5nPat0G7s github.com/openzipkin/zipkin-go v0.4.3/go.mod h1:M9wCJZFWCo2RiY+o1eBCEMe0Dp2S5LDHcMZmk3RmK7c= github.com/pelletier/go-toml/v2 v2.3.0 h1:k59bC/lIZREW0/iVaQR8nDHxVq8OVlIzYCOJf421CaM= github.com/pelletier/go-toml/v2 v2.3.0/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/petermattis/goid v0.0.0-20260113132338-7c7de50cc741/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= @@ -262,15 +318,21 @@ github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg= github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs= github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/robertkrimen/otto v0.2.1 h1:FVP0PJ0AHIjC+N4pKCG9yCDz6LHNPCwi/GKID5pGGF0= github.com/robertkrimen/otto v0.2.1/go.mod h1:UPwtJ1Xu7JrLcZjNWN8orJaM5n5YEtqL//farB5FlRY= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rollbar/rollbar-go v1.0.2/go.mod h1:AcFs5f0I+c71bpHlXNNDbOWJiKwjFDtISeXco0L5PKQ= github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc= github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik= +github.com/segmentio/asm v1.1.3/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg= +github.com/segmentio/encoding v0.5.3/go.mod h1:HS1ZKa3kSN32ZHVZ7ZLPLXWvOVIiZtyJnO1gPH1sKt0= github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -294,6 +356,7 @@ github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= +github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -336,8 +399,11 @@ github.com/xdg-go/scram v1.2.0 h1:bYKF2AEwG5rqd1BumT4gAnvwU/M9nBp2pTSxeZw7Wvs= github.com/xdg-go/scram v1.2.0/go.mod h1:3dlrS0iBaWKYVt2ZfA4cj48umJZ+cAEbR6/SjLA88I8= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/yargevad/filepathx v1.0.0 h1:SYcT+N3tYGi+NvazubCNlvgIPbzAk7i7y2dwg3I5FYc= github.com/yargevad/filepathx v1.0.0/go.mod h1:BprfX/gpYNJHJfc35GjRRpVcwWXS89gGulUIU5tK3tA= +github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= @@ -353,8 +419,10 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.21 h1:lPBu71Y7osQmzlflM9OfeIV2JlmpBjqBNlLtcoB go.etcd.io/etcd/client/pkg/v3 v3.5.21/go.mod h1:BgqT/IXPjK9NkeSDjbzwsHySX3yIle2+ndz28nVsjUs= go.etcd.io/etcd/client/v3 v3.5.21 h1:T6b1Ow6fNjOLOtM0xSoKNQt1ASPCLWrF9XMHcH9pEyY= go.etcd.io/etcd/client/v3 v3.5.21/go.mod h1:mFYy67IOqmbRf/kRUvsHixzo3iG+1OF2W2+jVIQRAnU= +go.mongodb.org/mongo-driver/v2 v2.5.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/detectors/gcp v1.39.0/go.mod h1:t/OGqzHBa5v6RHZwrDBJ2OirWc+4q/w2fTbLZwAKjTk= go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs= @@ -461,6 +529,7 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/telemetry v0.0.0-20260109210033-bd525da824e2/go.mod h1:b7fPSJ0pKZ3ccUh8gnTONJxhn3c/PS6tyzQvyqw4iA8= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -502,6 +571,7 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4= google.golang.org/genproto/googleapis/api v0.0.0-20260427160629-7cedc36a6bc4 h1:yOzSCGPx+cp5VO7IxvZ9SBFF7j1tZVcNtlHR2iYKtVo= google.golang.org/genproto/googleapis/api v0.0.0-20260427160629-7cedc36a6bc4/go.mod h1:Q9HWtNeE7tM9npdIsEvqXj1QJIvVoeAV3rtXtS715Cw= google.golang.org/genproto/googleapis/rpc v0.0.0-20260427160629-7cedc36a6bc4 h1:tEkOQcXgF6dH1G+MVKZrfpYvozGrzb91k6ha7jireSM= @@ -528,6 +598,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/cheggaaa/pb.v1 v1.0.28/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4= gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= @@ -549,6 +620,7 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/mysql v1.6.0 h1:eNbLmNTpPpTOVZi8MMxCi2aaIm0ZpInbORNXDwyLGvg= gorm.io/driver/mysql v1.6.0/go.mod h1:D/oCC2GWK3M/dqoLxnOlaNKmXz8WNTfcS9y5ovaSqKo= +gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8= gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg= gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= @@ -559,12 +631,14 @@ k8s.io/apimachinery v0.34.3 h1:/TB+SFEiQvN9HPldtlWOTp0hWbJ+fjU+wkxysf/aQnE= k8s.io/apimachinery v0.34.3/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= k8s.io/client-go v0.34.3 h1:wtYtpzy/OPNYf7WyNBTj3iUA0XaBHVqhv4Iv3tbrF5A= k8s.io/client-go v0.34.3/go.mod h1:OxxeYagaP9Kdf78UrKLa3YZixMCfP6bgPwPwNBQBzpM= +k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA= k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b/go.mod h1:UZ2yyWbFTpuhSbFhv24aGNOdoRdJZgsIObGBUaYVsts= k8s.io/utils v0.0.0-20260319190234-28399d86e0b5 h1:kBawHLSnx/mYHmRnNUf9d4CpjREbeZuxoSGOX/J+aYM= k8s.io/utils v0.0.0-20260319190234-28399d86e0b5/go.mod h1:xDxuJ0whA3d0I4mf/C4ppKHxXynQ+fxnkmQH0vTHnuk= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE= sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= diff --git a/backend/infra/outbox/engine.go b/backend/infra/outbox/engine.go index cf13c74..dd0f229 100644 --- a/backend/infra/outbox/engine.go +++ b/backend/infra/outbox/engine.go @@ -25,6 +25,8 @@ import ( // 3. 返回 error 表示可重试失败,框架回写 retry 后提交 offset。 type MessageHandler func(ctx context.Context, envelope kafkabus.Envelope) error +const minPublishedRescueAfter = 10 * time.Second + // PublishRequest 是通用事件发布入参。 // // 设计目标: @@ -56,6 +58,10 @@ type Engine struct { maxRetry int scanEvery time.Duration scanBatch int + // publishedRescueAfter 是 published 消息本地兜底消费窗口,避免 Kafka 已投递但 consumer 长时间未完成时永久卡住。 + publishedRescueAfter time.Duration + // publishedRescueEnabled 控制是否启用本地兜底消费;默认只给幂等账务类服务打开。 + publishedRescueEnabled bool handlersMu sync.RWMutex handlers map[string]MessageHandler @@ -91,15 +97,17 @@ func NewEngine(repo *Repository, cfg kafkabus.Config) (*Engine, error) { } return &Engine{ - repo: serviceRepo, - producer: producer, - consumer: consumer, - brokers: cfg.Brokers, - route: route, - maxRetry: cfg.MaxRetry, - scanEvery: cfg.RetryScanInterval, - scanBatch: cfg.RetryBatchSize, - handlers: make(map[string]MessageHandler), + repo: serviceRepo, + producer: producer, + consumer: consumer, + brokers: cfg.Brokers, + route: route, + maxRetry: cfg.MaxRetry, + scanEvery: cfg.RetryScanInterval, + scanBatch: cfg.RetryBatchSize, + publishedRescueAfter: normalizePublishedRescueAfter(cfg.RetryScanInterval), + publishedRescueEnabled: route.ServiceName == ServiceNameTokenStore, + handlers: make(map[string]MessageHandler), }, nil } @@ -265,6 +273,9 @@ func (e *Engine) startDispatchLoop(ctx context.Context) { log.Printf("重试投递 outbox 消息失败(id=%d): %v", msg.ID, err) } } + if err = e.rescueStalePublishedMessages(ctx); err != nil { + log.Printf("兜底消费已投递 outbox 消息失败(service=%s): %v", e.route.ServiceName, err) + } } } } @@ -281,7 +292,7 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error { return nil } - eventPayload, payloadErr := parseOutboxEventPayload(outboxMsg.Payload) + envelope, payloadErr := e.envelopeFromOutboxMessage(outboxMsg) if payloadErr != nil { markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "解析 outbox 事件包失败: "+payloadErr.Error()) if markErr != nil { @@ -289,23 +300,6 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error { } return payloadErr } - if eventPayload.EventID == "" { - eventPayload.EventID = strconv.FormatInt(outboxMsg.ID, 10) - } - serviceName := strings.TrimSpace(outboxMsg.ServiceName) - if serviceName == "" { - serviceName = e.route.ServiceName - } - - envelope := kafkabus.Envelope{ - OutboxID: outboxMsg.ID, - EventID: eventPayload.EventID, - EventType: eventPayload.EventType, - EventVersion: eventPayload.EventVersion, - ServiceName: serviceName, - AggregateID: eventPayload.AggregateID, - Payload: eventPayload.PayloadJSON, - } raw, err := json.Marshal(envelope) if err != nil { markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "序列化 outbox 封装失败: "+err.Error()) @@ -326,6 +320,93 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error { return nil } +// rescueStalePublishedMessages 对 published 后长时间未 consumed 的消息做本地兜底消费。 +// +// 职责边界: +// 1. 只处理当前 service 表内的 stale published 消息,不扫描其它服务; +// 2. 不重新投递 Kafka,直接复用 handler 的幂等消费逻辑,避免同一坏分区长期卡死; +// 3. 单条失败只写日志并继续下一条,避免一条坏消息阻断整批兜底。 +func (e *Engine) rescueStalePublishedMessages(ctx context.Context) error { + if e == nil || !e.publishedRescueEnabled { + return nil + } + before := time.Now().Add(-e.publishedRescueAfter) + messages, err := e.repo.ListStalePublishedMessages(ctx, e.route.ServiceName, before, e.scanBatch) + if err != nil { + return err + } + if len(messages) > 0 { + log.Printf("outbox stale published messages=%d, service=%s start local consume", len(messages), e.route.ServiceName) + } + for _, msg := range messages { + if err := e.consumePublishedOne(ctx, msg.ID); err != nil { + log.Printf("兜底消费 outbox 消息失败(id=%d, service=%s): %v", msg.ID, e.route.ServiceName, err) + } + } + return nil +} + +// consumePublishedOne 兜底消费单条已投递但未完成的 outbox 消息。 +// +// 职责边界: +// 1. 只在当前状态仍为 published 时处理,避免覆盖正常 consumer 的最终态; +// 2. 解析失败标记 dead,业务失败交给 handleEnvelope 推进重试; +// 3. 不提交 Kafka offset,因为这里没有从 Kafka 读取消息。 +func (e *Engine) consumePublishedOne(ctx context.Context, outboxID int64) error { + outboxMsg, err := e.repo.GetByID(ctx, outboxID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + return err + } + if outboxMsg.Status != model.OutboxStatusPublished { + return nil + } + + envelope, payloadErr := e.envelopeFromOutboxMessage(outboxMsg) + if payloadErr != nil { + markErr := e.repo.MarkDead(ctx, outboxMsg.ID, "解析已投递 outbox 事件包失败: "+payloadErr.Error()) + if markErr != nil { + log.Printf("标记 outbox 死信失败(id=%d): %v", outboxMsg.ID, markErr) + } + return payloadErr + } + return e.handleEnvelope(ctx, envelope, false) +} + +// envelopeFromOutboxMessage 把 outbox 表记录还原成统一事件信封。 +// +// 职责边界: +// 1. 只做 payload 外壳解析和缺省字段补齐; +// 2. 不判断业务事件是否合法,具体校验仍交给 handler; +// 3. event_id 缺失时使用 outbox id 兜底,保持历史消息可消费。 +func (e *Engine) envelopeFromOutboxMessage(outboxMsg *model.AgentOutboxMessage) (kafkabus.Envelope, error) { + if outboxMsg == nil { + return kafkabus.Envelope{}, errors.New("outbox message is nil") + } + eventPayload, err := parseOutboxEventPayload(outboxMsg.Payload) + if err != nil { + return kafkabus.Envelope{}, err + } + if eventPayload.EventID == "" { + eventPayload.EventID = strconv.FormatInt(outboxMsg.ID, 10) + } + serviceName := strings.TrimSpace(outboxMsg.ServiceName) + if serviceName == "" { + serviceName = e.route.ServiceName + } + return kafkabus.Envelope{ + OutboxID: outboxMsg.ID, + EventID: eventPayload.EventID, + EventType: eventPayload.EventType, + EventVersion: eventPayload.EventVersion, + ServiceName: serviceName, + AggregateID: eventPayload.AggregateID, + Payload: eventPayload.PayloadJSON, + }, nil +} + func (e *Engine) startConsumeLoop(ctx context.Context) { for { select { @@ -361,12 +442,33 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er return errors.New("Kafka 封装缺少 outbox_id") } + if err := e.handleEnvelope(ctx, envelope, true); err != nil { + if commitErr := e.consumer.Commit(ctx, msg); commitErr != nil { + return commitErr + } + return err + } + return e.consumer.Commit(ctx, msg) +} + +// handleEnvelope 执行统一事件信封的本地 handler 路由和状态推进。 +// +// 职责边界: +// 1. 负责事件类型、服务归属和 handler 存在性校验; +// 2. handler 成功后由业务 handler 自己标记 consumed; +// 3. retryOnFailure=true 时才把失败消息退回 pending,避免本地兜底把已投递消息重复投到 Kafka。 +func (e *Engine) handleEnvelope(ctx context.Context, envelope kafkabus.Envelope, retryOnFailure bool) error { + status, err := e.currentMessageStatus(ctx, envelope.OutboxID) + if err != nil { + return err + } + if status != model.OutboxStatusPublished { + return nil + } + eventType := strings.TrimSpace(envelope.EventType) if eventType == "" { _ = e.repo.MarkDead(ctx, envelope.OutboxID, "消息缺少事件类型") - if err := e.consumer.Commit(ctx, msg); err != nil { - return err - } return nil } @@ -386,9 +488,6 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er eventType, envelope.OutboxID, ) - if err := e.consumer.Commit(ctx, msg); err != nil { - return err - } return nil } } @@ -400,23 +499,50 @@ func (e *Engine) handleMessage(ctx context.Context, msg segmentkafka.Message) er } else { _ = e.repo.MarkDead(ctx, envelope.OutboxID, "本服务未注册 handler: "+eventType) } - if err := e.consumer.Commit(ctx, msg); err != nil { - return err - } return nil } if err := handler(ctx, envelope); err != nil { - if markErr := e.repo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费处理失败: "+err.Error()); markErr != nil { - return markErr - } - if commitErr := e.consumer.Commit(ctx, msg); commitErr != nil { - return commitErr + if retryOnFailure { + if markErr := e.repo.MarkFailedForRetry(ctx, envelope.OutboxID, "消费处理失败: "+err.Error()); markErr != nil { + return markErr + } } return err } - return e.consumer.Commit(ctx, msg) + return nil +} + +// currentMessageStatus 读取 outbox 当前状态,作为重复 Kafka 消息的第一道闸门。 +// +// 职责边界: +// 1. 只返回当前状态,不推进状态机; +// 2. 记录已消失时按最终态处理,避免历史 Kafka 消息造成消费循环报错; +// 3. handler 只允许在 published 状态执行,pending/consumed/dead 都直接跳过。 +func (e *Engine) currentMessageStatus(ctx context.Context, outboxID int64) (string, error) { + msg, err := e.repo.GetByID(ctx, outboxID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return model.OutboxStatusConsumed, nil + } + return "", err + } + return strings.TrimSpace(msg.Status), nil +} + +// normalizePublishedRescueAfter 根据扫描间隔计算 published 兜底窗口。 +// +// 职责边界: +// 1. 只做最小窗口保护,避免刚投递的消息被立即本地重复消费; +// 2. 不读取配置中心,保持 outbox engine 构造参数单一; +// 3. 返回值越小恢复越快,越大重复消费概率越低。 +func normalizePublishedRescueAfter(scanEvery time.Duration) time.Duration { + rescueAfter := scanEvery * 3 + if rescueAfter < minPublishedRescueAfter { + return minPublishedRescueAfter + } + return rescueAfter } func resolveEngineRoute(repo *Repository, cfg kafkabus.Config) ServiceRoute { diff --git a/backend/infra/outbox/migration.go b/backend/infra/outbox/migration.go new file mode 100644 index 0000000..c3d67e7 --- /dev/null +++ b/backend/infra/outbox/migration.go @@ -0,0 +1,28 @@ +package outbox + +import ( + "fmt" + + "github.com/LoveLosita/smartflow/backend/model" + "gorm.io/gorm" +) + +// AutoMigrateServiceTable 按服务目录迁移单个服务拥有的 outbox 表。 +// +// 职责边界: +// 1. 只负责创建或补齐服务级 outbox 物理表,不迁移任何业务表; +// 2. table 名统一从 service catalog 解析,避免独立服务和 core 进程各写一份默认值; +// 3. 失败时返回带 service/table 的错误,方便启动期直接定位配置漂移。 +func AutoMigrateServiceTable(db *gorm.DB, serviceName string) error { + if db == nil { + return fmt.Errorf("auto migrate outbox table failed for %s: db is nil", serviceName) + } + cfg, ok := ResolveServiceConfig(serviceName) + if !ok { + return fmt.Errorf("resolve outbox config failed for service %s", serviceName) + } + if err := db.Table(cfg.TableName).AutoMigrate(&model.AgentOutboxMessage{}); err != nil { + return fmt.Errorf("auto migrate outbox table failed for %s (%s): %w", cfg.Name, cfg.TableName, err) + } + return nil +} diff --git a/backend/infra/outbox/repository.go b/backend/infra/outbox/repository.go index f935859..8b829aa 100644 --- a/backend/infra/outbox/repository.go +++ b/backend/infra/outbox/repository.go @@ -127,6 +127,35 @@ func (d *Repository) ListDueMessages(ctx context.Context, serviceName string, li return messages, nil } +// ListStalePublishedMessages 拉取已经投递到 Kafka 但长时间没有完成业务消费的消息。 +// +// 职责边界: +// 1. 只扫描 published 状态,不修改消息,避免和正常 Kafka consumer 抢状态机; +// 2. before 由调用方决定,仓储层不关心具体兜底窗口; +// 3. 返回结果交给上层幂等 handler 处理,重复消费风险由业务 event_id 兜底。 +func (d *Repository) ListStalePublishedMessages(ctx context.Context, serviceName string, before time.Time, limit int) ([]model.AgentOutboxMessage, error) { + if limit <= 0 { + limit = 100 + } + if before.IsZero() { + before = time.Now() + } + + var messages []model.AgentOutboxMessage + query := d.scopedDB(ctx). + Where("status = ? AND published_at IS NOT NULL AND published_at <= ?", model.OutboxStatusPublished, before). + Order("published_at ASC, id ASC"). + Limit(limit) + serviceName = strings.TrimSpace(serviceName) + if serviceName != "" { + query = query.Where("service_name = ?", serviceName) + } + if err := query.Find(&messages).Error; err != nil { + return nil, err + } + return messages, nil +} + // MarkPublished 标记消息已经成功投递到 Kafka。 func (d *Repository) MarkPublished(ctx context.Context, id int64) error { now := time.Now() diff --git a/backend/infra/outbox/repository_publisher.go b/backend/infra/outbox/repository_publisher.go new file mode 100644 index 0000000..9d365d1 --- /dev/null +++ b/backend/infra/outbox/repository_publisher.go @@ -0,0 +1,103 @@ +package outbox + +import ( + "context" + "encoding/json" + "errors" + "strings" + + "gorm.io/gorm" +) + +// RepositoryPublisher 只负责把事件写入服务级 outbox 表。 +// +// 职责边界: +// 1. 负责复用 Repository 的 eventType -> service -> table 路由能力写入 outbox; +// 2. 不启动 Kafka relay / consumer,也不注册任何 handler; +// 3. 适合独立 RPC 服务进程只发布事件、统一由 worker 进程消费的迁移期场景。 +type RepositoryPublisher struct { + repo *Repository + maxRetry int +} + +// NewRepositoryPublisher 基于 outbox 仓储创建轻量发布器。 +func NewRepositoryPublisher(repo *Repository, maxRetry int) *RepositoryPublisher { + return &RepositoryPublisher{ + repo: repo, + maxRetry: maxRetry, + } +} + +// Publish 写入统一事件外壳,保持与 Engine.Publish 相同的 outbox payload 格式。 +// +// 步骤说明: +// 1. 先校验事件类型和业务 payload,明显坏入参直接返回错误,避免写入不可消费消息; +// 2. 再把业务 payload 序列化成 RawMessage,并包进统一事件外壳,保证 worker 解析口径一致; +// 3. 最后交给 Repository 按事件路由落表;路由缺失时返回错误,由业务侧决定是否降级。 +func (p *RepositoryPublisher) Publish(ctx context.Context, req PublishRequest) error { + if p == nil || p.repo == nil { + return errors.New("outbox repository publisher is nil") + } + + eventType := strings.TrimSpace(req.EventType) + if eventType == "" { + return errors.New("eventType is empty") + } + if req.Payload == nil { + return errors.New("payload is nil") + } + + payloadJSON, err := json.Marshal(req.Payload) + if err != nil { + return err + } + + eventVersion := strings.TrimSpace(req.EventVersion) + if eventVersion == "" { + eventVersion = DefaultEventVersion + } + + eventID := strings.TrimSpace(req.EventID) + messageKey := strings.TrimSpace(req.MessageKey) + if messageKey == "" { + messageKey = eventID + } + if messageKey == "" { + messageKey = eventType + } + + aggregateID := strings.TrimSpace(req.AggregateID) + if aggregateID == "" { + aggregateID = messageKey + } + + _, err = p.repo.CreateMessage(ctx, eventType, messageKey, OutboxEventPayload{ + EventID: eventID, + EventType: eventType, + EventVersion: eventVersion, + AggregateID: aggregateID, + Payload: payloadJSON, + }, p.maxRetry) + return err +} + +// PublishWithTx 使用外部事务写入 outbox 消息。 +// +// 职责边界: +// 1. 只把底层 Repository 切到调用方传入的事务句柄,事件外壳和路由逻辑仍复用 Publish; +// 2. 不提交或回滚事务,事务生命周期由业务用例控制; +// 3. 适合“业务表更新 + outbox 入队”必须原子提交的场景。 +func (p *RepositoryPublisher) PublishWithTx(ctx context.Context, tx *gorm.DB, req PublishRequest) error { + if p == nil || p.repo == nil { + return errors.New("outbox repository publisher 未初始化") + } + if tx == nil { + return errors.New("gorm 事务句柄为空") + } + + txPublisher := &RepositoryPublisher{ + repo: p.repo.WithTx(tx), + maxRetry: p.maxRetry, + } + return txPublisher.Publish(ctx, req) +} diff --git a/backend/infra/outbox/service_catalog.go b/backend/infra/outbox/service_catalog.go index 9fc487a..bf715e9 100644 --- a/backend/infra/outbox/service_catalog.go +++ b/backend/infra/outbox/service_catalog.go @@ -15,6 +15,8 @@ const ( ServiceMemory = "memory" ServiceActiveScheduler = "active-scheduler" ServiceNotification = "notification" + ServiceTaskClassForum = "taskclass-forum" + ServiceTokenStore = "token-store" ) // ServiceConfig 描述一个服务级 outbox 的固定归属。 @@ -83,6 +85,18 @@ func LoadServiceConfigs() map[string]ServiceConfig { GroupID: "smartflow-notification-outbox-consumer", TableName: "notification_outbox_messages", }, + ServiceTaskClassForum: { + Name: ServiceTaskClassForum, + Topic: "smartflow.taskclass-forum.outbox", + GroupID: "smartflow-taskclass-forum-outbox-consumer", + TableName: "taskclass_forum_outbox_messages", + }, + ServiceTokenStore: { + Name: ServiceTokenStore, + Topic: "smartflow.token-store.outbox", + GroupID: "smartflow-token-store-outbox-consumer", + TableName: "token_store_outbox_messages", + }, } for name, entry := range entries { diff --git a/backend/infra/outbox/service_route.go b/backend/infra/outbox/service_route.go index 88e7ef7..66ed937 100644 --- a/backend/infra/outbox/service_route.go +++ b/backend/infra/outbox/service_route.go @@ -10,6 +10,8 @@ const ( ServiceNameMemory = "memory" ServiceNameActiveScheduler = "active-scheduler" ServiceNameNotification = "notification" + ServiceNameTaskClassForum = "taskclass-forum" + ServiceNameTokenStore = "token-store" ) // ServiceRoute 描述一个 outbox 服务的终态路由信息。 @@ -56,6 +58,18 @@ var builtinServiceRoutes = map[string]ServiceRoute{ Topic: "smartflow.notification.outbox", GroupID: "smartflow-notification-outbox-consumer", }, + ServiceNameTaskClassForum: { + ServiceName: ServiceNameTaskClassForum, + TableName: "taskclass_forum_outbox_messages", + Topic: "smartflow.taskclass-forum.outbox", + GroupID: "smartflow-taskclass-forum-outbox-consumer", + }, + ServiceNameTokenStore: { + ServiceName: ServiceNameTokenStore, + TableName: "token_store_outbox_messages", + Topic: "smartflow.token-store.outbox", + GroupID: "smartflow-token-store-outbox-consumer", + }, } // DefaultServiceRoutes 返回当前已知服务的默认路由清单。 @@ -71,6 +85,8 @@ func DefaultServiceRoutes() []ServiceRoute { builtinServiceRoutes[ServiceNameMemory], builtinServiceRoutes[ServiceNameActiveScheduler], builtinServiceRoutes[ServiceNameNotification], + builtinServiceRoutes[ServiceNameTaskClassForum], + builtinServiceRoutes[ServiceNameTokenStore], } } diff --git a/backend/inits/mysql.go b/backend/inits/mysql.go index b3cd5e2..a4e1447 100644 --- a/backend/inits/mysql.go +++ b/backend/inits/mysql.go @@ -65,12 +65,8 @@ func autoMigrateOutboxTables(db *gorm.DB) error { // 1. 这里必须按服务目录读取最终生效的 table 名,而不能只看默认内置映射。 // 2. 这样即使后续通过配置覆盖 outbox.services.*.table,启动建表也会和运行时写入保持一致。 for _, serviceName := range outboxinfra.ServiceNames() { - cfg, ok := outboxinfra.ResolveServiceConfig(serviceName) - if !ok { - return fmt.Errorf("resolve outbox config failed for service %s", serviceName) - } - if err := db.Table(cfg.TableName).AutoMigrate(&model.AgentOutboxMessage{}); err != nil { - return fmt.Errorf("auto migrate outbox table failed for %s (%s): %w", cfg.Name, cfg.TableName, err) + if err := outboxinfra.AutoMigrateServiceTable(db, serviceName); err != nil { + return err } } return nil diff --git a/backend/service/events/core_outbox_handlers.go b/backend/service/events/core_outbox_handlers.go index 90647dd..dabf6c7 100644 --- a/backend/service/events/core_outbox_handlers.go +++ b/backend/service/events/core_outbox_handlers.go @@ -49,9 +49,10 @@ func RegisterAllOutboxHandlers( memoryModule *memory.Module, activeTriggerWorkflow ActiveScheduleTriggeredProcessor, notificationService *notification.NotificationService, + forumRewardRecorder ForumRewardGrantRecorder, adjuster ports.TokenUsageAdjuster, ) error { - if err := validateAllOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule, activeTriggerWorkflow, notificationService); err != nil { + if err := validateAllOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule, activeTriggerWorkflow, notificationService, forumRewardRecorder); err != nil { return err } @@ -64,6 +65,7 @@ func RegisterAllOutboxHandlers( memoryModule, activeTriggerWorkflow, notificationService, + forumRewardRecorder, adjuster, )) } @@ -112,6 +114,7 @@ func validateAllOutboxHandlerDeps( memoryModule *memory.Module, activeTriggerWorkflow ActiveScheduleTriggeredProcessor, notificationService *notification.NotificationService, + forumRewardRecorder ForumRewardGrantRecorder, ) error { if err := validateCoreOutboxHandlerDeps(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule); err != nil { return err @@ -122,6 +125,9 @@ func validateAllOutboxHandlerDeps( if notificationService == nil { return errors.New("notification service is nil") } + if forumRewardRecorder == nil { + return errors.New("forum reward grant recorder is nil") + } return nil } @@ -191,10 +197,25 @@ func allOutboxHandlerRoutes( memoryModule *memory.Module, activeTriggerWorkflow ActiveScheduleTriggeredProcessor, notificationService *notification.NotificationService, + forumRewardRecorder ForumRewardGrantRecorder, adjuster ports.TokenUsageAdjuster, ) []outboxHandlerRoute { routes := coreOutboxHandlerRoutes(eventBus, outboxRepo, repoManager, agentRepo, cacheRepo, memoryModule, adjuster) routes = append(routes, + outboxHandlerRoute{ + EventType: sharedevents.ForumPostLikedEventType, + Service: outboxHandlerServiceTokenStore, + Register: func() error { + return RegisterForumPostLikedRewardHandler(eventBus, outboxRepo, forumRewardRecorder) + }, + }, + outboxHandlerRoute{ + EventType: sharedevents.ForumPostImportedEventType, + Service: outboxHandlerServiceTokenStore, + Register: func() error { + return RegisterForumPostImportedRewardHandler(eventBus, outboxRepo, forumRewardRecorder) + }, + }, outboxHandlerRoute{ EventType: sharedevents.ActiveScheduleTriggeredEventType, Service: outboxHandlerServiceActiveScheduler, diff --git a/backend/service/events/forum_reward.go b/backend/service/events/forum_reward.go new file mode 100644 index 0000000..421e093 --- /dev/null +++ b/backend/service/events/forum_reward.go @@ -0,0 +1,135 @@ +package events + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "strings" + + kafkabus "github.com/LoveLosita/smartflow/backend/infra/kafka" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" + tokencontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/tokenstore" + sharedevents "github.com/LoveLosita/smartflow/backend/shared/events" +) + +// ForumRewardGrantRecorder 描述论坛奖励事件消费后写入 token-store 账本所需的最小能力。 +// +// 职责边界: +// 1. 只暴露论坛奖励入账能力,不暴露商品、订单和用户可见查询接口; +// 2. 由 token-store 自己解析奖励额度和幂等规则,handler 不计算 Token 数量; +// 3. 接口用于隔离 service/events 与 token-store 具体 RPC client 实现。 +type ForumRewardGrantRecorder interface { + RecordForumRewardGrant(ctx context.Context, req tokencontracts.RecordForumRewardGrantRequest) (*tokencontracts.TokenGrantView, error) +} + +// RegisterForumPostLikedRewardHandler 注册计划被点赞奖励消费者。 +func RegisterForumPostLikedRewardHandler( + bus OutboxBus, + outboxRepo *outboxinfra.Repository, + recorder ForumRewardGrantRecorder, +) error { + return registerForumRewardHandler(bus, outboxRepo, recorder, sharedevents.ForumPostLikedEventType, sharedevents.ForumRewardSourceLike) +} + +// RegisterForumPostImportedRewardHandler 注册计划被导入奖励消费者。 +func RegisterForumPostImportedRewardHandler( + bus OutboxBus, + outboxRepo *outboxinfra.Repository, + recorder ForumRewardGrantRecorder, +) error { + return registerForumRewardHandler(bus, outboxRepo, recorder, sharedevents.ForumPostImportedEventType, sharedevents.ForumRewardSourceImport) +} + +// registerForumRewardHandler 收敛论坛奖励事件的通用解析、校验和入账流程。 +// +// 步骤说明: +// 1. 先校验 outbox 与 token-store 依赖,避免启动期注册半截 handler; +// 2. 消费时先检查版本和 payload,明显不可修复的坏消息直接标记 dead; +// 3. 再调用 token-store 内部 RPC 幂等写 token_grants,RPC 临时失败返回 error 交给 outbox 重试; +// 4. 入账成功后标记 consumed,确保重复消费不会重复发放。 +func registerForumRewardHandler( + bus OutboxBus, + outboxRepo *outboxinfra.Repository, + recorder ForumRewardGrantRecorder, + eventType string, + source string, +) error { + if bus == nil { + return errors.New("event bus is nil") + } + if outboxRepo == nil { + return errors.New("outbox repository is nil") + } + if recorder == nil { + return errors.New("forum reward grant recorder is nil") + } + eventOutboxRepo, err := scopedOutboxRepoForEvent(outboxRepo, eventType) + if err != nil { + return err + } + + handler := func(ctx context.Context, envelope kafkabus.Envelope) error { + if !isAllowedForumRewardEventVersion(envelope.EventVersion) { + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, fmt.Sprintf("论坛奖励事件版本不受支持: %s", envelope.EventVersion)) + return nil + } + + var payload sharedevents.ForumPostRewardPayload + if unmarshalErr := json.Unmarshal(envelope.Payload, &payload); unmarshalErr != nil { + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "解析论坛奖励载荷失败: "+unmarshalErr.Error()) + return nil + } + if validateErr := payload.Validate(); validateErr != nil { + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "论坛奖励载荷非法: "+validateErr.Error()) + return nil + } + if payload.EventType() != eventType { + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, fmt.Sprintf("论坛奖励事件类型不匹配: envelope=%s payload=%s", eventType, payload.EventType())) + return nil + } + + eventID := strings.TrimSpace(envelope.EventID) + if eventID == "" { + eventID = strings.TrimSpace(payload.EventID) + } + if eventID == "" { + _ = eventOutboxRepo.MarkDead(ctx, envelope.OutboxID, "论坛奖励 event_id 为空") + return nil + } + + _, err := recorder.RecordForumRewardGrant(ctx, tokencontracts.RecordForumRewardGrantRequest{ + EventID: eventID, + ReceiverUserID: payload.RewardReceiverUserID, + Source: forumRewardSource(payload, source), + SourceRefID: forumRewardSourceRefID(payload, source), + }) + if err != nil { + return err + } + return eventOutboxRepo.MarkConsumed(ctx, envelope.OutboxID) + } + + return bus.RegisterEventHandler(eventType, handler) +} + +func isAllowedForumRewardEventVersion(version string) bool { + version = strings.TrimSpace(version) + return version == "" || version == sharedevents.ForumRewardEventVersion +} + +func forumRewardSource(payload sharedevents.ForumPostRewardPayload, fallback string) string { + source := strings.TrimSpace(payload.Source) + if source != "" { + return source + } + return fallback +} + +func forumRewardSourceRefID(payload sharedevents.ForumPostRewardPayload, source string) string { + if source == sharedevents.ForumRewardSourceImport && payload.ImportID > 0 { + return strconv.FormatUint(payload.ImportID, 10) + } + return strconv.FormatUint(payload.PostID, 10) +} diff --git a/backend/service/events/outbox_bus.go b/backend/service/events/outbox_bus.go index 2de4cfe..a17bbc6 100644 --- a/backend/service/events/outbox_bus.go +++ b/backend/service/events/outbox_bus.go @@ -171,6 +171,8 @@ func OutboxServiceNames() []string { string(outboxHandlerServiceMemory), string(outboxHandlerServiceActiveScheduler), string(outboxHandlerServiceNotification), + string(outboxHandlerServiceTaskClassForum), + string(outboxHandlerServiceTokenStore), } } diff --git a/backend/service/events/outbox_handler_routes.go b/backend/service/events/outbox_handler_routes.go index 1748ca6..c7a3955 100644 --- a/backend/service/events/outbox_handler_routes.go +++ b/backend/service/events/outbox_handler_routes.go @@ -18,6 +18,8 @@ const ( outboxHandlerServiceMemory outboxHandlerService = "memory" outboxHandlerServiceActiveScheduler outboxHandlerService = "active-scheduler" outboxHandlerServiceNotification outboxHandlerService = "notification" + outboxHandlerServiceTaskClassForum outboxHandlerService = "taskclass-forum" + outboxHandlerServiceTokenStore outboxHandlerService = "token-store" ) // outboxHandlerRoute 显式描述“事件类型 -> 服务归属 -> handler 注册动作”。 diff --git a/backend/services/taskclassforum/dao/connect.go b/backend/services/taskclassforum/dao/connect.go index 37d9c4d..982059c 100644 --- a/backend/services/taskclassforum/dao/connect.go +++ b/backend/services/taskclassforum/dao/connect.go @@ -3,6 +3,7 @@ package dao import ( "fmt" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" forummodel "github.com/LoveLosita/smartflow/backend/services/taskclassforum/model" "github.com/spf13/viper" "gorm.io/driver/mysql" @@ -12,7 +13,7 @@ import ( // OpenDBFromConfig 创建计划广场服务自己的数据库句柄,并迁移本服务私有表。 // // 职责边界: -// 1. 只迁移 forum_* 表,不迁移 task_classes / task_items,避免抢占 task-class 拆分线; +// 1. 只迁移 forum_* 表和本服务 outbox 表,不迁移 task_classes / task_items,避免抢占 task-class 拆分线; // 2. 不负责装配 legacy TaskClass adapter,adapter 在服务实现阶段单独注入; // 3. 返回 *gorm.DB 供本服务 DAO 复用,调用方负责进程生命周期。 func OpenDBFromConfig() (*gorm.DB, error) { @@ -41,8 +42,10 @@ func OpenDBFromConfig() (*gorm.DB, error) { // // 步骤说明: // 1. 先创建帖子、模板、条目、点赞、评论、导入记录表; -// 2. 唯一约束交给 GORM tag 生成,保证点赞和导入幂等有数据库兜底; -// 3. 失败时直接返回错误,避免服务在 schema 不完整时继续启动。 +// 2. 再按 service catalog 创建 taskclass-forum outbox 表,为后续论坛自身异步事件预留稳定目录; +// 3. 迁移期论坛奖励事件直接写 token-store outbox 表,发布端也兜底创建目标表,避免独立启动顺序导致奖励漏表; +// 4. 唯一约束交给 GORM tag 生成,保证点赞和导入幂等有数据库兜底; +// 5. 失败时直接返回错误,避免服务在 schema 不完整时继续启动。 func AutoMigrate(db *gorm.DB) error { if db == nil { return fmt.Errorf("taskclassforum auto migrate failed: db is nil") @@ -57,5 +60,11 @@ func AutoMigrate(db *gorm.DB) error { ); err != nil { return fmt.Errorf("auto migrate taskclassforum tables failed: %w", err) } + if err := outboxinfra.AutoMigrateServiceTable(db, outboxinfra.ServiceTaskClassForum); err != nil { + return err + } + if err := outboxinfra.AutoMigrateServiceTable(db, outboxinfra.ServiceTokenStore); err != nil { + return err + } return nil } diff --git a/backend/services/taskclassforum/dao/forum.go b/backend/services/taskclassforum/dao/forum.go index faeabed..57706a8 100644 --- a/backend/services/taskclassforum/dao/forum.go +++ b/backend/services/taskclassforum/dao/forum.go @@ -29,6 +29,19 @@ func (dao *ForumDAO) WithTx(tx *gorm.DB) *ForumDAO { return &ForumDAO{db: tx} } +// GormDB 返回当前 DAO 绑定的 GORM 句柄。 +// +// 职责边界: +// 1. 只提供给需要和 forum 业务事务同提交的基础设施使用,例如 outbox 入队; +// 2. 不鼓励业务层绕过 DAO 任意读写 forum_* 表; +// 3. 若当前 DAO 来自 WithTx,返回值就是同一个事务句柄。 +func (dao *ForumDAO) GormDB() *gorm.DB { + if dao == nil { + return nil + } + return dao.db +} + // Transaction 在一个数据库事务内执行计划广场写操作。 func (dao *ForumDAO) Transaction(ctx context.Context, fn func(txDAO *ForumDAO) error) error { return dao.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { diff --git a/backend/services/taskclassforum/sv/import.go b/backend/services/taskclassforum/sv/import.go index 6172a9e..82b15d9 100644 --- a/backend/services/taskclassforum/sv/import.go +++ b/backend/services/taskclassforum/sv/import.go @@ -10,6 +10,7 @@ import ( forumdao "github.com/LoveLosita/smartflow/backend/services/taskclassforum/dao" forummodel "github.com/LoveLosita/smartflow/backend/services/taskclassforum/model" forumcontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/taskclassforum" + sharedevents "github.com/LoveLosita/smartflow/backend/shared/events" ) // ImportPost 从论坛模板导入当前用户自己的 TaskClass 副本。 @@ -41,7 +42,7 @@ func (s *Service) ImportPost(ctx context.Context, req forumcontracts.ImportForum return nil, err } if existing != nil && existing.Status == forummodel.ForumImportStatusImported { - return importResultFromModel(*existing), nil + return s.importResultWithCurrentImportCount(ctx, *existing), nil } } existing, err := s.forumDAO.FindImport(ctx, req.PostID, req.ActorUserID) @@ -49,7 +50,7 @@ func (s *Service) ImportPost(ctx context.Context, req forumcontracts.ImportForum return nil, err } if existing != nil && existing.Status == forummodel.ForumImportStatusImported { - return importResultFromModel(*existing), nil + return s.importResultWithCurrentImportCount(ctx, *existing), nil } if existing != nil && existing.Status == forummodel.ForumImportStatusFailed && existing.NewTaskClassID != nil { return s.recoverCreatedImport(ctx, req, *existing) @@ -73,9 +74,7 @@ func (s *Service) ImportPost(ctx context.Context, req forumcontracts.ImportForum return nil, err } if pending.Status == forummodel.ForumImportStatusImported { - result := importResultFromModel(*pending) - result.ImportCount = post.ImportCount - return result, nil + return s.importResultWithCurrentImportCount(ctx, *pending), nil } created, err := s.taskClassPort.CreateTaskClassFromSnapshot(ctx, req.ActorUserID, snapshot, targetTitle) @@ -90,6 +89,7 @@ func (s *Service) ImportPost(ctx context.Context, req forumcontracts.ImportForum } var imported forummodel.ForumImport + var rewardPayload *sharedevents.ForumPostRewardPayload if err := s.forumDAO.Transaction(ctx, func(txDAO *forumdao.ForumDAO) error { if _, err := txDAO.LockPublishedPost(ctx, req.PostID); err != nil { return normalizeRecordNotFound(err, respond.UserTaskClassNotFound) @@ -105,7 +105,8 @@ func (s *Service) ImportPost(ctx context.Context, req forumcontracts.ImportForum imported = *again return nil } - if err := txDAO.FinalizeImport(ctx, pending.ID, created.TaskClassID, created.Title, time.Now()); err != nil { + finalizedAt := time.Now() + if err := txDAO.FinalizeImport(ctx, pending.ID, created.TaskClassID, created.Title, finalizedAt); err != nil { return err } imported = *again @@ -113,6 +114,18 @@ func (s *Service) ImportPost(ctx context.Context, req forumcontracts.ImportForum imported.TargetTitle = created.Title imported.Status = forummodel.ForumImportStatusImported if again.Status != forummodel.ForumImportStatusImported { + payload := sharedevents.NewForumPostImportedPayload(req.PostID, again.ID, again.AuthorUserID, req.ActorUserID, finalizedAt) + if again.EventID != "" { + payload.EventID = again.EventID + } + // 调用目的:导入成功和作者奖励事件必须同事务提交,避免只创建副本却永久漏发奖励。 + handled, publishErr := s.publishForumRewardEventInTx(ctx, txDAO.GormDB(), payload) + if publishErr != nil { + return publishErr + } + if !handled { + rewardPayload = &payload + } return txDAO.AddPostCounter(ctx, req.PostID, "import_count", 1) } return nil @@ -120,6 +133,9 @@ func (s *Service) ImportPost(ctx context.Context, req forumcontracts.ImportForum _ = s.forumDAO.MarkImportFailedAfterTaskClassCreated(ctx, pending.ID, created.TaskClassID, created.Title, err.Error(), time.Now()) return nil, err } + if rewardPayload != nil { + s.publishForumRewardEventBestEffort(*rewardPayload) + } result := importResultFromModel(imported) if postAfter, err := s.forumDAO.FindPublishedPost(ctx, req.PostID); err == nil { result.ImportCount = postAfter.ImportCount @@ -183,6 +199,7 @@ func (s *Service) recoverCreatedImport(ctx context.Context, req forumcontracts.I return nil, respond.RequestIsProcessing } imported := existing + var rewardPayload *sharedevents.ForumPostRewardPayload if err := s.forumDAO.Transaction(ctx, func(txDAO *forumdao.ForumDAO) error { if _, err := txDAO.LockPublishedPost(ctx, req.PostID); err != nil { return normalizeRecordNotFound(err, respond.UserTaskClassNotFound) @@ -201,15 +218,31 @@ func (s *Service) recoverCreatedImport(ctx context.Context, req forumcontracts.I if again.Status != forummodel.ForumImportStatusFailed || again.NewTaskClassID == nil { return respond.RequestIsProcessing } - if err := txDAO.FinalizeImport(ctx, again.ID, *again.NewTaskClassID, again.TargetTitle, time.Now()); err != nil { + finalizedAt := time.Now() + if err := txDAO.FinalizeImport(ctx, again.ID, *again.NewTaskClassID, again.TargetTitle, finalizedAt); err != nil { return err } imported = *again imported.Status = forummodel.ForumImportStatusImported + payload := sharedevents.NewForumPostImportedPayload(req.PostID, again.ID, again.AuthorUserID, req.ActorUserID, finalizedAt) + if again.EventID != "" { + payload.EventID = again.EventID + } + // 调用目的:恢复已创建副本的导入记录时,同步补齐奖励 outbox,保证恢复路径和首次成功路径一致。 + handled, publishErr := s.publishForumRewardEventInTx(ctx, txDAO.GormDB(), payload) + if publishErr != nil { + return publishErr + } + if !handled { + rewardPayload = &payload + } return txDAO.AddPostCounter(ctx, req.PostID, "import_count", 1) }); err != nil { return nil, err } + if rewardPayload != nil { + s.publishForumRewardEventBestEffort(*rewardPayload) + } result := importResultFromModel(imported) if postAfter, err := s.forumDAO.FindPublishedPost(ctx, req.PostID); err == nil { result.ImportCount = postAfter.ImportCount @@ -231,6 +264,20 @@ func importResultFromModel(item forummodel.ForumImport) *forumcontracts.ImportFo } } -func forumImportEventID(postID uint64, userID uint64) string { - return fmt.Sprintf("forum.post.imported:%d:%d", postID, userID) +// importResultWithCurrentImportCount 复用已有导入记录时补齐帖子当前导入计数。 +// +// 职责边界: +// 1. 只补齐响应展示用的 import_count,不改变 forum_imports 状态; +// 2. 查询帖子失败时保留基础导入回执,避免幂等重放因为展示字段失败而误报导入失败; +// 3. 新导入路径仍以事务内 AddPostCounter 为准,这里只处理已导入短路路径。 +func (s *Service) importResultWithCurrentImportCount(ctx context.Context, item forummodel.ForumImport) *forumcontracts.ImportForumPostResult { + result := importResultFromModel(item) + if post, err := s.forumDAO.FindPublishedPost(ctx, item.PostID); err == nil { + result.ImportCount = post.ImportCount + } + return result +} + +func forumImportEventID(postID uint64, userID uint64) string { + return sharedevents.ForumRewardEventID(sharedevents.ForumPostImportedEventType, postID, userID) } diff --git a/backend/services/taskclassforum/sv/like.go b/backend/services/taskclassforum/sv/like.go index 26b1212..fc2d81d 100644 --- a/backend/services/taskclassforum/sv/like.go +++ b/backend/services/taskclassforum/sv/like.go @@ -2,21 +2,21 @@ package sv import ( "context" - "fmt" "time" "github.com/LoveLosita/smartflow/backend/respond" forumdao "github.com/LoveLosita/smartflow/backend/services/taskclassforum/dao" forummodel "github.com/LoveLosita/smartflow/backend/services/taskclassforum/model" forumcontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/taskclassforum" + sharedevents "github.com/LoveLosita/smartflow/backend/shared/events" ) // LikePost 点赞计划帖子。 // // 职责边界: -// 1. 负责保证同一用户同一帖子只有一个 active 点赞状态; -// 2. 负责维护帖子 like_count 计数字段; -// 3. 不直接发放 Token,只写稳定 event_id,后续奖励链路可基于该 ID 幂等消费。 +// 1. 保证同一用户同一帖子只有一个 active 点赞状态; +// 2. 维护帖子 like_count 计数字段; +// 3. 只在首次创建 like 记录时补发 outbox 事件,取消后重新激活旧记录不重复发奖励。 func (s *Service) LikePost(ctx context.Context, actorUserID uint64, postID uint64) (forumcontracts.ForumPostCounters, forumcontracts.ForumPostViewerState, error) { if err := s.Ready(); err != nil { return forumcontracts.ForumPostCounters{}, forumcontracts.ForumPostViewerState{}, err @@ -25,6 +25,7 @@ func (s *Service) LikePost(ctx context.Context, actorUserID uint64, postID uint6 return forumcontracts.ForumPostCounters{}, forumcontracts.ForumPostViewerState{}, respond.MissingParam } + var rewardPayload *sharedevents.ForumPostRewardPayload if err := s.forumDAO.Transaction(ctx, func(txDAO *forumdao.ForumDAO) error { post, err := txDAO.LockPublishedPost(ctx, postID) if err != nil { @@ -35,7 +36,19 @@ func (s *Service) LikePost(ctx context.Context, actorUserID uint64, postID uint6 return err } if like == nil { - return createActiveLike(ctx, txDAO, post, actorUserID) + payload, createErr := createActiveLike(ctx, txDAO, post, actorUserID) + if createErr != nil { + return createErr + } + // 调用目的:优先把首次点赞奖励事件写入当前事务,保证点赞记录和 outbox 入队原子提交。 + handled, publishErr := s.publishForumRewardEventInTx(ctx, txDAO.GormDB(), payload) + if publishErr != nil { + return publishErr + } + if !handled { + rewardPayload = &payload + } + return nil } if like.Status == forummodel.ForumLikeStatusActive { return nil @@ -47,6 +60,10 @@ func (s *Service) LikePost(ctx context.Context, actorUserID uint64, postID uint6 }); err != nil { return forumcontracts.ForumPostCounters{}, forumcontracts.ForumPostViewerState{}, err } + + if rewardPayload != nil { + s.publishForumRewardEventBestEffort(*rewardPayload) + } return s.postInteractionState(ctx, actorUserID, postID) } @@ -80,7 +97,7 @@ func (s *Service) UnlikePost(ctx context.Context, actorUserID uint64, postID uin return s.postInteractionState(ctx, actorUserID, postID) } -func createActiveLike(ctx context.Context, txDAO *forumdao.ForumDAO, post *forummodel.ForumPost, actorUserID uint64) error { +func createActiveLike(ctx context.Context, txDAO *forumdao.ForumDAO, post *forummodel.ForumPost, actorUserID uint64) (sharedevents.ForumPostRewardPayload, error) { like := &forummodel.ForumLike{ PostID: post.ID, UserID: actorUserID, @@ -89,9 +106,21 @@ func createActiveLike(ctx context.Context, txDAO *forumdao.ForumDAO, post *forum EventID: forumLikeEventID(post.ID, actorUserID), } if err := txDAO.CreateLike(ctx, like); err != nil { - return err + return sharedevents.ForumPostRewardPayload{}, err } - return txDAO.AddPostCounter(ctx, post.ID, "like_count", 1) + if err := txDAO.AddPostCounter(ctx, post.ID, "like_count", 1); err != nil { + return sharedevents.ForumPostRewardPayload{}, err + } + + likedAt := like.LikedAt + if likedAt.IsZero() { + likedAt = time.Now() + } + payload := sharedevents.NewForumPostLikedPayload(post.ID, post.AuthorUserID, actorUserID, likedAt) + if like.EventID != "" { + payload.EventID = like.EventID + } + return payload, nil } func (s *Service) postInteractionState(ctx context.Context, actorUserID uint64, postID uint64) (forumcontracts.ForumPostCounters, forumcontracts.ForumPostViewerState, error) { @@ -107,5 +136,5 @@ func (s *Service) postInteractionState(ctx context.Context, actorUserID uint64, } func forumLikeEventID(postID uint64, userID uint64) string { - return fmt.Sprintf("forum.post.liked:%d:%d", postID, userID) + return sharedevents.ForumRewardEventID(sharedevents.ForumPostLikedEventType, postID, userID) } diff --git a/backend/services/taskclassforum/sv/service.go b/backend/services/taskclassforum/sv/service.go index bceaffe..b1daa55 100644 --- a/backend/services/taskclassforum/sv/service.go +++ b/backend/services/taskclassforum/sv/service.go @@ -3,16 +3,27 @@ package sv import ( "context" "errors" + "log" + "strings" + "time" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" forumdao "github.com/LoveLosita/smartflow/backend/services/taskclassforum/dao" + sharedevents "github.com/LoveLosita/smartflow/backend/shared/events" "gorm.io/gorm" ) -// TaskClassSnapshotPort 是计划广场读取和写入 TaskClass 的端口。 +const forumRewardPublishTimeout = 800 * time.Millisecond + +type transactionalEventPublisher interface { + PublishWithTx(ctx context.Context, tx *gorm.DB, req outboxinfra.PublishRequest) error +} + +// TaskClassSnapshotPort 是计划广场读取和写入 TaskClass 快照的端口。 // // 职责边界: -// 1. P0 由 legacy adapter 适配旧 TaskClass DAO / Service; -// 2. 业务层只依赖快照语义,不关心底层是旧表、旧服务还是后续 RPC; +// 1. P0 先由 legacy adapter 适配旧 TaskClass DAO / Service; +// 2. 业务层只依赖快照语义,不关心底层来自旧表、旧服务还是后续 RPC; // 3. 不负责写 schedule,一键导入只创建当前用户自己的 TaskClass 副本。 type TaskClassSnapshotPort interface { GetOwnedTaskClassSnapshot(ctx context.Context, userID uint64, taskClassID uint64) (*TaskClassSnapshot, error) @@ -56,8 +67,9 @@ type CreatedTaskClass struct { // Options 是计划广场服务的依赖注入参数。 type Options struct { - DB *gorm.DB - TaskClassPort TaskClassSnapshotPort + DB *gorm.DB + TaskClassPort TaskClassSnapshotPort + EventPublisher outboxinfra.EventPublisher } // Service 承载计划广场服务内部业务编排。 @@ -65,24 +77,26 @@ type Options struct { // 职责边界: // 1. 负责帖子、模板快照、点赞、评论、导入记录的事务编排; // 2. 不负责 HTTP 参数绑定,也不直接返回 respond.Response; -// 3. 不拥有 TaskClass 原表,只通过 TaskClassSnapshotPort 读取和创建副本。 +// 3. 不持有 TaskClass 原表,只通过 TaskClassSnapshotPort 读取和创建副本。 type Service struct { - db *gorm.DB - forumDAO *forumdao.ForumDAO - taskClassPort TaskClassSnapshotPort + db *gorm.DB + forumDAO *forumdao.ForumDAO + taskClassPort TaskClassSnapshotPort + eventPublisher outboxinfra.EventPublisher } func New(opts Options) *Service { return &Service{ - db: opts.DB, - forumDAO: forumdao.NewForumDAO(opts.DB), - taskClassPort: opts.TaskClassPort, + db: opts.DB, + forumDAO: forumdao.NewForumDAO(opts.DB), + taskClassPort: opts.TaskClassPort, + eventPublisher: opts.EventPublisher, } } // Ready 用于第二步骨架阶段的依赖检查。 // -// 后续实现真实用例时,具体方法会做更细的参数校验;这里先帮助 cmd / 测试快速发现依赖未注入。 +// 后续实现真实用例时,具体方法会做更细的参数校验;这里只先帮助 cmd / 测试快速发现依赖未注入。 func (s *Service) Ready() error { if s == nil { return errors.New("taskclassforum service is nil") @@ -92,3 +106,93 @@ func (s *Service) Ready() error { } return nil } + +// publishForumRewardEventBestEffort 在主事务成功后补发论坛奖励 outbox 事件。 +// +// 职责边界: +// 1. 这里只处理“事务已经成功提交后的补发”,不再回头影响点赞/导入接口的成功结果; +// 2. 改用独立短超时 context,避免客户端断开直接打断补发,也避免 outbox 写入长时间拖慢接口尾部; +// 3. 发布失败时只记日志不返回 error,这是 P0 的明确取舍:先保住主链路,再靠日志和稳定 event_id 排障/补偿。 +func (s *Service) publishForumRewardEventBestEffort(payload sharedevents.ForumPostRewardPayload) { + if s == nil || s.eventPublisher == nil { + return + } + if err := payload.Validate(); err != nil { + log.Printf( + "forum reward outbox payload 非法,跳过发布: event_id=%s post_id=%d import_id=%d source=%s err=%v", + payload.EventID, + payload.PostID, + payload.ImportID, + payload.Source, + err, + ) + return + } + + eventType := strings.TrimSpace(payload.EventType()) + if eventType == "" { + log.Printf( + "forum reward outbox 事件类型为空,跳过发布: event_id=%s post_id=%d import_id=%d source=%s", + payload.EventID, + payload.PostID, + payload.ImportID, + payload.Source, + ) + return + } + + publishCtx, cancel := context.WithTimeout(context.Background(), forumRewardPublishTimeout) + defer cancel() + + if err := s.eventPublisher.Publish(publishCtx, outboxinfra.PublishRequest{ + EventType: eventType, + EventVersion: sharedevents.ForumRewardEventVersion, + MessageKey: payload.MessageKey(), + AggregateID: payload.AggregateID(), + EventID: payload.EventID, + Payload: payload, + }); err != nil { + log.Printf( + "forum reward outbox 发布失败,按 P0 约定忽略主链路错误: event_type=%s event_id=%s post_id=%d import_id=%d actor_user_id=%d err=%v", + eventType, + payload.EventID, + payload.PostID, + payload.ImportID, + payload.ActorUserID, + err, + ) + } +} + +// publishForumRewardEventInTx 尝试把论坛奖励事件写进当前业务事务。 +// +// 返回值说明: +// 1. handled=true 表示发布器支持事务写入,调用方不需要再做事务后 best-effort 补发; +// 2. handled=false 表示当前发布器不支持事务写入,调用方可退回旧的事务后补发路径; +// 3. error 非空表示 outbox 入队失败,业务事务应一起回滚,避免成功互动永久漏奖。 +func (s *Service) publishForumRewardEventInTx(ctx context.Context, tx *gorm.DB, payload sharedevents.ForumPostRewardPayload) (bool, error) { + if s == nil || s.eventPublisher == nil { + return false, nil + } + publisher, ok := s.eventPublisher.(transactionalEventPublisher) + if !ok { + return false, nil + } + if err := payload.Validate(); err != nil { + return true, err + } + + eventType := strings.TrimSpace(payload.EventType()) + if eventType == "" { + return true, errors.New("论坛奖励事件类型为空") + } + + return true, publisher.PublishWithTx(ctx, tx, outboxinfra.PublishRequest{ + EventType: eventType, + EventVersion: sharedevents.ForumRewardEventVersion, + MessageKey: payload.MessageKey(), + AggregateID: payload.AggregateID(), + EventID: payload.EventID, + Payload: payload, + }) +} diff --git a/backend/services/tokenstore/dao/connect.go b/backend/services/tokenstore/dao/connect.go index e5af2ba..fca2d8f 100644 --- a/backend/services/tokenstore/dao/connect.go +++ b/backend/services/tokenstore/dao/connect.go @@ -3,6 +3,7 @@ package dao import ( "fmt" + outboxinfra "github.com/LoveLosita/smartflow/backend/infra/outbox" tokenmodel "github.com/LoveLosita/smartflow/backend/services/tokenstore/model" "github.com/spf13/viper" "gorm.io/driver/mysql" @@ -13,7 +14,7 @@ import ( // OpenDBFromConfig 创建 token-store 服务自己的数据库句柄,并迁移本服务私有表。 // // 职责边界: -// 1. 只迁移 token_* 表,不迁移 users,避免和 user/auth 服务边界冲突; +// 1. 只迁移 token_* 表和 token-store outbox 表,不迁移 users,避免和 user/auth 服务边界冲突; // 2. 自动迁移后执行 P0 seed,确保前端商品页有可展示商品; // 3. 返回 *gorm.DB 供本服务 DAO 复用,调用方负责进程生命周期。 func OpenDBFromConfig() (*gorm.DB, error) { @@ -45,8 +46,9 @@ func OpenDBFromConfig() (*gorm.DB, error) { // // 步骤说明: // 1. 先创建商品、订单、获取账本和奖励规则表; -// 2. 通过唯一约束保证 order_no、event_id 和幂等键不会重复写入; -// 3. 失败时直接返回错误,避免服务在 schema 不完整时继续启动。 +// 2. 再按 service catalog 创建 token-store outbox 表,保证论坛奖励事件有稳定落表目录; +// 3. 通过唯一约束保证 order_no、event_id 和幂等键不会重复写入; +// 4. 失败时直接返回错误,避免服务在 schema 不完整时继续启动。 func AutoMigrate(db *gorm.DB) error { if db == nil { return fmt.Errorf("tokenstore auto migrate failed: db is nil") @@ -59,6 +61,9 @@ func AutoMigrate(db *gorm.DB) error { ); err != nil { return fmt.Errorf("auto migrate tokenstore tables failed: %w", err) } + if err := outboxinfra.AutoMigrateServiceTable(db, outboxinfra.ServiceTokenStore); err != nil { + return err + } return nil } @@ -172,7 +177,7 @@ func defaultTokenRewardRules() []tokenmodel.TokenRewardRule { { Source: tokenmodel.TokenGrantSourceForumImport, Name: "计划被导入奖励", - Amount: 2, + Amount: 5, Status: tokenmodel.TokenRewardRuleStatusActive, }, } diff --git a/backend/services/tokenstore/dao/tokenstore.go b/backend/services/tokenstore/dao/tokenstore.go index 0323645..e8436ae 100644 --- a/backend/services/tokenstore/dao/tokenstore.go +++ b/backend/services/tokenstore/dao/tokenstore.go @@ -64,6 +64,31 @@ func (dao *TokenStoreDAO) ListActiveProducts(ctx context.Context) ([]tokenmodel. return products, err } +// FindRewardRuleBySource 按来源读取社区奖励规则。 +// +// 职责边界: +// 1. 只读取 token_reward_rules,不计算最终发放金额,也不判断停用语义; +// 2. 未找到规则时返回 nil,由服务层决定配置或默认值兜底; +// 3. source 在 DAO 层做一次规范化,避免大小写和空格造成规则漏命中。 +func (dao *TokenStoreDAO) FindRewardRuleBySource(ctx context.Context, source string) (*tokenmodel.TokenRewardRule, error) { + source = strings.ToLower(strings.TrimSpace(source)) + if source == "" { + return nil, nil + } + + var rule tokenmodel.TokenRewardRule + err := dao.db.WithContext(ctx). + Where("source = ?", source). + First(&rule).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + if err != nil { + return nil, err + } + return &rule, nil +} + func (dao *TokenStoreDAO) FindActiveProductByID(ctx context.Context, productID uint64) (*tokenmodel.TokenProduct, error) { var product tokenmodel.TokenProduct err := dao.db.WithContext(ctx). diff --git a/backend/services/tokenstore/rpc/handler.go b/backend/services/tokenstore/rpc/handler.go index 92835d5..335d77b 100644 --- a/backend/services/tokenstore/rpc/handler.go +++ b/backend/services/tokenstore/rpc/handler.go @@ -171,6 +171,28 @@ func (h *Handler) ListGrants(ctx context.Context, req *pb.ListTokenGrantsRequest }, nil } +// RecordForumRewardGrant 负责把论坛 outbox 奖励事件转成 token-store 内部账本写入调用。 +func (h *Handler) RecordForumRewardGrant(ctx context.Context, req *pb.RecordForumRewardGrantRequest) (*pb.RecordForumRewardGrantResponse, error) { + svc, err := h.service() + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + if req == nil { + return nil, grpcErrorFromServiceError(respond.MissingParam) + } + + grant, err := svc.RecordForumRewardGrant(ctx, tokencontracts.RecordForumRewardGrantRequest{ + EventID: req.EventId, + ReceiverUserID: req.ReceiverUserId, + Source: req.Source, + SourceRefID: req.SourceRefId, + }) + if err != nil { + return nil, grpcErrorFromServiceError(err) + } + return &pb.RecordForumRewardGrantResponse{Grant: tokenGrantToPB(grant)}, nil +} + func tokenPageToPB(page tokencontracts.PageResult) *pb.PageResponse { return &pb.PageResponse{ Page: int32(page.Page), diff --git a/backend/services/tokenstore/rpc/pb/tokenstore.pb.go b/backend/services/tokenstore/rpc/pb/tokenstore.pb.go index 659032d..bda3321 100644 --- a/backend/services/tokenstore/rpc/pb/tokenstore.pb.go +++ b/backend/services/tokenstore/rpc/pb/tokenstore.pb.go @@ -210,3 +210,22 @@ type ListTokenGrantsResponse struct { func (m *ListTokenGrantsResponse) Reset() { *m = ListTokenGrantsResponse{} } func (m *ListTokenGrantsResponse) String() string { return proto.CompactTextString(m) } func (*ListTokenGrantsResponse) ProtoMessage() {} + +type RecordForumRewardGrantRequest struct { + EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` + ReceiverUserId uint64 `protobuf:"varint,2,opt,name=receiver_user_id,json=receiverUserId,proto3" json:"receiver_user_id,omitempty"` + Source string `protobuf:"bytes,3,opt,name=source,proto3" json:"source,omitempty"` + SourceRefId string `protobuf:"bytes,4,opt,name=source_ref_id,json=sourceRefId,proto3" json:"source_ref_id,omitempty"` +} + +func (m *RecordForumRewardGrantRequest) Reset() { *m = RecordForumRewardGrantRequest{} } +func (m *RecordForumRewardGrantRequest) String() string { return proto.CompactTextString(m) } +func (*RecordForumRewardGrantRequest) ProtoMessage() {} + +type RecordForumRewardGrantResponse struct { + Grant *TokenGrantView `protobuf:"bytes,1,opt,name=grant,proto3" json:"grant,omitempty"` +} + +func (m *RecordForumRewardGrantResponse) Reset() { *m = RecordForumRewardGrantResponse{} } +func (m *RecordForumRewardGrantResponse) String() string { return proto.CompactTextString(m) } +func (*RecordForumRewardGrantResponse) ProtoMessage() {} diff --git a/backend/services/tokenstore/rpc/pb/tokenstore_grpc.pb.go b/backend/services/tokenstore/rpc/pb/tokenstore_grpc.pb.go index 1c5fd54..d9b02b0 100644 --- a/backend/services/tokenstore/rpc/pb/tokenstore_grpc.pb.go +++ b/backend/services/tokenstore/rpc/pb/tokenstore_grpc.pb.go @@ -9,13 +9,14 @@ import ( ) const ( - TokenStoreService_GetSummary_FullMethodName = "/smartflow.tokenstore.TokenStoreService/GetSummary" - TokenStoreService_ListProducts_FullMethodName = "/smartflow.tokenstore.TokenStoreService/ListProducts" - TokenStoreService_CreateOrder_FullMethodName = "/smartflow.tokenstore.TokenStoreService/CreateOrder" - TokenStoreService_ListOrders_FullMethodName = "/smartflow.tokenstore.TokenStoreService/ListOrders" - TokenStoreService_GetOrder_FullMethodName = "/smartflow.tokenstore.TokenStoreService/GetOrder" - TokenStoreService_MockPaidOrder_FullMethodName = "/smartflow.tokenstore.TokenStoreService/MockPaidOrder" - TokenStoreService_ListGrants_FullMethodName = "/smartflow.tokenstore.TokenStoreService/ListGrants" + TokenStoreService_GetSummary_FullMethodName = "/smartflow.tokenstore.TokenStoreService/GetSummary" + TokenStoreService_ListProducts_FullMethodName = "/smartflow.tokenstore.TokenStoreService/ListProducts" + TokenStoreService_CreateOrder_FullMethodName = "/smartflow.tokenstore.TokenStoreService/CreateOrder" + TokenStoreService_ListOrders_FullMethodName = "/smartflow.tokenstore.TokenStoreService/ListOrders" + TokenStoreService_GetOrder_FullMethodName = "/smartflow.tokenstore.TokenStoreService/GetOrder" + TokenStoreService_MockPaidOrder_FullMethodName = "/smartflow.tokenstore.TokenStoreService/MockPaidOrder" + TokenStoreService_ListGrants_FullMethodName = "/smartflow.tokenstore.TokenStoreService/ListGrants" + TokenStoreService_RecordForumRewardGrant_FullMethodName = "/smartflow.tokenstore.TokenStoreService/RecordForumRewardGrant" ) type TokenStoreServiceClient interface { @@ -26,6 +27,7 @@ type TokenStoreServiceClient interface { GetOrder(ctx context.Context, in *GetTokenOrderRequest, opts ...grpc.CallOption) (*GetTokenOrderResponse, error) MockPaidOrder(ctx context.Context, in *MockPaidOrderRequest, opts ...grpc.CallOption) (*MockPaidOrderResponse, error) ListGrants(ctx context.Context, in *ListTokenGrantsRequest, opts ...grpc.CallOption) (*ListTokenGrantsResponse, error) + RecordForumRewardGrant(ctx context.Context, in *RecordForumRewardGrantRequest, opts ...grpc.CallOption) (*RecordForumRewardGrantResponse, error) } type tokenStoreServiceClient struct { @@ -64,6 +66,10 @@ func (c *tokenStoreServiceClient) ListGrants(ctx context.Context, in *ListTokenG return invokeTokenStore[ListTokenGrantsResponse](ctx, c.cc, TokenStoreService_ListGrants_FullMethodName, in, opts...) } +func (c *tokenStoreServiceClient) RecordForumRewardGrant(ctx context.Context, in *RecordForumRewardGrantRequest, opts ...grpc.CallOption) (*RecordForumRewardGrantResponse, error) { + return invokeTokenStore[RecordForumRewardGrantResponse](ctx, c.cc, TokenStoreService_RecordForumRewardGrant_FullMethodName, in, opts...) +} + func invokeTokenStore[Resp any](ctx context.Context, cc grpc.ClientConnInterface, fullMethod string, in interface{}, opts ...grpc.CallOption) (*Resp, error) { out := new(Resp) err := cc.Invoke(ctx, fullMethod, in, out, opts...) @@ -81,6 +87,7 @@ type TokenStoreServiceServer interface { GetOrder(context.Context, *GetTokenOrderRequest) (*GetTokenOrderResponse, error) MockPaidOrder(context.Context, *MockPaidOrderRequest) (*MockPaidOrderResponse, error) ListGrants(context.Context, *ListTokenGrantsRequest) (*ListTokenGrantsResponse, error) + RecordForumRewardGrant(context.Context, *RecordForumRewardGrantRequest) (*RecordForumRewardGrantResponse, error) } type UnimplementedTokenStoreServiceServer struct{} @@ -113,6 +120,10 @@ func (UnimplementedTokenStoreServiceServer) ListGrants(context.Context, *ListTok return nil, status.Errorf(codes.Unimplemented, "method ListGrants not implemented") } +func (UnimplementedTokenStoreServiceServer) RecordForumRewardGrant(context.Context, *RecordForumRewardGrantRequest) (*RecordForumRewardGrantResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RecordForumRewardGrant not implemented") +} + func RegisterTokenStoreServiceServer(s grpc.ServiceRegistrar, srv TokenStoreServiceServer) { s.RegisterService(&TokenStoreService_ServiceDesc, srv) } @@ -165,6 +176,9 @@ var TokenStoreService_ServiceDesc = grpc.ServiceDesc{ tokenStoreUnaryHandler[ListTokenGrantsRequest]("ListGrants", TokenStoreService_ListGrants_FullMethodName, func(s TokenStoreServiceServer, ctx context.Context, req *ListTokenGrantsRequest) (interface{}, error) { return s.ListGrants(ctx, req) }), + tokenStoreUnaryHandler[RecordForumRewardGrantRequest]("RecordForumRewardGrant", TokenStoreService_RecordForumRewardGrant_FullMethodName, func(s TokenStoreServiceServer, ctx context.Context, req *RecordForumRewardGrantRequest) (interface{}, error) { + return s.RecordForumRewardGrant(ctx, req) + }), }, Streams: []grpc.StreamDesc{}, Metadata: "tokenstore.proto", diff --git a/backend/services/tokenstore/rpc/tokenstore.proto b/backend/services/tokenstore/rpc/tokenstore.proto index 30c3738..9669777 100644 --- a/backend/services/tokenstore/rpc/tokenstore.proto +++ b/backend/services/tokenstore/rpc/tokenstore.proto @@ -12,6 +12,7 @@ service TokenStoreService { rpc GetOrder(GetTokenOrderRequest) returns (GetTokenOrderResponse); rpc MockPaidOrder(MockPaidOrderRequest) returns (MockPaidOrderResponse); rpc ListGrants(ListTokenGrantsRequest) returns (ListTokenGrantsResponse); + rpc RecordForumRewardGrant(RecordForumRewardGrantRequest) returns (RecordForumRewardGrantResponse); } message PageResponse { @@ -142,3 +143,14 @@ message ListTokenGrantsResponse { repeated TokenGrantView items = 1; PageResponse page = 2; } + +message RecordForumRewardGrantRequest { + string event_id = 1; + uint64 receiver_user_id = 2; + string source = 3; + string source_ref_id = 4; +} + +message RecordForumRewardGrantResponse { + TokenGrantView grant = 1; +} diff --git a/backend/services/tokenstore/sv/reward.go b/backend/services/tokenstore/sv/reward.go new file mode 100644 index 0000000..51654d6 --- /dev/null +++ b/backend/services/tokenstore/sv/reward.go @@ -0,0 +1,234 @@ +package sv + +import ( + "context" + "errors" + "strconv" + "strings" + + tokenmodel "github.com/LoveLosita/smartflow/backend/services/tokenstore/model" + tokencontracts "github.com/LoveLosita/smartflow/backend/shared/contracts/tokenstore" + "github.com/spf13/viper" +) + +const ( + forumLikeRewardConfigKey = "tokenstore.reward.forumLikeAmount" + forumImportRewardConfigKey = "tokenstore.reward.forumImportAmount" + + defaultForumLikeRewardAmount int64 = 1 + defaultForumImportRewardAmount int64 = 5 +) + +type forumRewardGrantRequest struct { + EventID string + ReceiverUserID uint64 + Source string + SourceRefID uint64 +} + +type forumRewardDecision struct { + Amount int64 + Status string + Description string +} + +// RecordForumRewardGrant 负责把论坛点赞/导入奖励写入 token_grants。 +// +// 职责边界: +// 1. 只处理 forum_like / forum_import 两类奖励账本写入,不修改 users,也不调用 user/auth; +// 2. 以 event_id 作为最终幂等边界,重复请求校验一致后返回既有 grant; +// 3. 奖励金额优先读取 token_reward_rules,配置和代码默认值只作为兜底。 +func (s *Service) RecordForumRewardGrant(ctx context.Context, req tokencontracts.RecordForumRewardGrantRequest) (*tokencontracts.TokenGrantView, error) { + if err := s.Ready(); err != nil { + return nil, err + } + + normalized, err := normalizeForumRewardGrantRequest(req) + if err != nil { + return nil, err + } + + // 1. 先按 event_id 回查,命中时直接视为成功,避免 outbox 重试重复写账本。 + // 2. 命中后必须校验用户、来源和来源业务 ID,避免错误复用 event_id 时静默吞掉错账。 + // 3. 校验通过才返回既有 grant,兼容“首次已成功、调用方超时后重试”的常见场景。 + existing, err := s.tokenDAO.FindGrantByEventID(ctx, normalized.EventID) + if err != nil { + return nil, err + } + if existing != nil { + if err := validateExistingForumRewardGrant(*existing, normalized); err != nil { + return nil, err + } + view := grantViewFromModel(*existing) + return &view, nil + } + + sourceRefID := normalized.SourceRefID + decision, err := s.forumRewardDecision(ctx, normalized.Source) + if err != nil { + return nil, err + } + grant := tokenmodel.TokenGrant{ + EventID: normalized.EventID, + UserID: normalized.ReceiverUserID, + Source: normalized.Source, + SourceLabel: grantSourceLabel(normalized.Source, ""), + SourceRefID: &sourceRefID, + Amount: decision.Amount, + Status: decision.Status, + QuotaApplied: false, + Description: decision.Description, + } + + // 1. 账本写入只依赖 token_grants.event_id 唯一约束兜底并发幂等。 + // 2. 若并发下插入触发唯一键冲突,立刻回查 event_id,把已有 grant 当作成功结果返回。 + // 3. 只有“冲突后仍查不到旧记录”这种异常态才上抛内部错误,避免吞掉真实一致性问题。 + if err := s.tokenDAO.CreateGrant(ctx, &grant); err != nil { + if !isDuplicateKeyError(err) { + return nil, err + } + + existing, err := s.tokenDAO.FindGrantByEventID(ctx, normalized.EventID) + if err != nil { + return nil, err + } + if existing == nil { + return nil, errors.New("forum reward grant duplicated but not found by event_id") + } + if err := validateExistingForumRewardGrant(*existing, normalized); err != nil { + return nil, err + } + view := grantViewFromModel(*existing) + return &view, nil + } + + view := grantViewFromModel(grant) + return &view, nil +} + +func normalizeForumRewardGrantRequest(req tokencontracts.RecordForumRewardGrantRequest) (forumRewardGrantRequest, error) { + normalized := forumRewardGrantRequest{ + EventID: strings.TrimSpace(req.EventID), + ReceiverUserID: req.ReceiverUserID, + Source: strings.ToLower(strings.TrimSpace(req.Source)), + } + + switch { + case normalized.EventID == "": + return forumRewardGrantRequest{}, tokenStoreBadRequest("event_id 不能为空") + case normalized.ReceiverUserID == 0: + return forumRewardGrantRequest{}, tokenStoreBadRequest("receiver_user_id 不能为空") + } + + sourceRefID, err := parseForumRewardSourceRefID(req.SourceRefID) + if err != nil { + return forumRewardGrantRequest{}, err + } + normalized.SourceRefID = sourceRefID + + switch normalized.Source { + case tokenmodel.TokenGrantSourceForumLike, tokenmodel.TokenGrantSourceForumImport: + return normalized, nil + default: + return forumRewardGrantRequest{}, tokenStoreBadRequest("source 仅支持 forum_like 或 forum_import") + } +} + +func parseForumRewardSourceRefID(raw string) (uint64, error) { + trimmed := strings.TrimSpace(raw) + if trimmed == "" { + return 0, tokenStoreBadRequest("source_ref_id 不能为空") + } + + parsed, err := strconv.ParseUint(trimmed, 10, 64) + if err != nil || parsed == 0 { + return 0, tokenStoreBadRequest("source_ref_id 必须是正整数") + } + return parsed, nil +} + +// validateExistingForumRewardGrant 校验重复 event_id 是否真的是同一条论坛奖励。 +// +// 职责边界: +// 1. 只比较幂等所需的最小字段:接收人、来源和来源业务 ID; +// 2. 不比较金额和状态,避免规则调整后重放旧事件被误判; +// 3. 不一致时返回业务校验错误,让上游暴露这类错账风险。 +func validateExistingForumRewardGrant(existing tokenmodel.TokenGrant, req forumRewardGrantRequest) error { + sourceRefID := uint64(0) + if existing.SourceRefID != nil { + sourceRefID = *existing.SourceRefID + } + if existing.UserID != req.ReceiverUserID || existing.Source != req.Source || sourceRefID != req.SourceRefID { + return tokenStoreBadRequest("event_id 幂等冲突:已有奖励记录与本次论坛奖励请求不一致") + } + return nil +} + +// forumRewardDecision 解析论坛奖励发放决策。 +// +// 职责边界: +// 1. 优先读取 token_reward_rules,保持“从表里读”的 P0 口径; +// 2. 规则停用或金额非正时写 skipped 账本,消费 outbox 但不增加 Token; +// 3. 表规则缺失时再读取配置和代码默认值,兼容旧环境尚未 seed 的情况。 +func (s *Service) forumRewardDecision(ctx context.Context, source string) (forumRewardDecision, error) { + rule, err := s.tokenDAO.FindRewardRuleBySource(ctx, source) + if err != nil { + return forumRewardDecision{}, err + } + if rule != nil { + if strings.TrimSpace(rule.Status) != tokenmodel.TokenRewardRuleStatusActive { + return skippedForumRewardDecision(source, "奖励规则已停用,未发放 Token"), nil + } + if rule.Amount <= 0 { + return skippedForumRewardDecision(source, "奖励规则金额非正,未发放 Token"), nil + } + return recordedForumRewardDecision(source, rule.Amount), nil + } + + switch strings.TrimSpace(source) { + case tokenmodel.TokenGrantSourceForumLike: + return recordedForumRewardDecision(source, positiveConfigAmountOrDefault(forumLikeRewardConfigKey, defaultForumLikeRewardAmount)), nil + case tokenmodel.TokenGrantSourceForumImport: + return recordedForumRewardDecision(source, positiveConfigAmountOrDefault(forumImportRewardConfigKey, defaultForumImportRewardAmount)), nil + default: + return skippedForumRewardDecision(source, "未知论坛奖励来源,未发放 Token"), nil + } +} + +func recordedForumRewardDecision(source string, amount int64) forumRewardDecision { + if amount <= 0 { + return skippedForumRewardDecision(source, "奖励金额非正,未发放 Token") + } + return forumRewardDecision{ + Amount: amount, + Status: tokenmodel.TokenGrantStatusRecorded, + Description: forumRewardDescription(source), + } +} + +func skippedForumRewardDecision(source string, description string) forumRewardDecision { + return forumRewardDecision{ + Amount: 0, + Status: tokenmodel.TokenGrantStatusSkipped, + Description: strings.TrimSpace(description), + } +} + +func positiveConfigAmountOrDefault(configKey string, fallback int64) int64 { + amount := viper.GetInt64(configKey) + if amount <= 0 { + return fallback + } + return amount +} + +func forumRewardDescription(source string) string { + switch strings.TrimSpace(source) { + case tokenmodel.TokenGrantSourceForumLike: + return "计划被点赞奖励" + case tokenmodel.TokenGrantSourceForumImport: + return "计划被导入奖励" + default: + return "论坛奖励入账" + } +} diff --git a/backend/shared/contracts/tokenstore/types.go b/backend/shared/contracts/tokenstore/types.go index 72e90e8..576ce81 100644 --- a/backend/shared/contracts/tokenstore/types.go +++ b/backend/shared/contracts/tokenstore/types.go @@ -100,6 +100,19 @@ type ListTokenGrantsRequest struct { Source string `json:"source"` } +// RecordForumRewardGrantRequest 是论坛奖励入账的内部 RPC 契约。 +// +// 职责边界: +// 1. 只描述一条待记录到 token_grants 的论坛奖励事实; +// 2. 不携带最终奖励金额,金额由 token-store 按 source 和配置解析; +// 3. source_ref_id 使用字符串承接 post_id / import_id,服务层再按当前库表结构落成整数。 +type RecordForumRewardGrantRequest struct { + EventID string `json:"event_id"` + ReceiverUserID uint64 `json:"receiver_user_id"` + Source string `json:"source"` + SourceRefID string `json:"source_ref_id"` +} + // TokenGrantRecord 是 token-store 内部发放出口使用的获取事实。 type TokenGrantRecord struct { EventID string `json:"event_id"` diff --git a/backend/shared/events/forum.go b/backend/shared/events/forum.go new file mode 100644 index 0000000..3dc673f --- /dev/null +++ b/backend/shared/events/forum.go @@ -0,0 +1,128 @@ +package events + +import ( + "errors" + "fmt" + "strings" + "time" +) + +const ( + ForumPostLikedEventType = "forum.post.liked" + ForumPostImportedEventType = "forum.post.imported" + ForumRewardEventVersion = "v1" + + ForumRewardSourceLike = "forum_like" + ForumRewardSourceImport = "forum_import" +) + +// ForumPostRewardPayload 是计划广场作者奖励事件的统一载荷。 +// +// 职责边界: +// 1. 只描述“哪个帖子因什么互动触发了作者奖励”,不直接携带最终 Token 数额; +// 2. source 负责表达奖励来源,真正的奖励规则仍由 token-store 自己解析; +// 3. event_id 必须稳定,供 outbox 重试和下游记账幂等共同使用。 +type ForumPostRewardPayload struct { + EventID string `json:"event_id"` + PostID uint64 `json:"post_id"` + ImportID uint64 `json:"import_id"` + AuthorUserID uint64 `json:"author_user_id"` + ActorUserID uint64 `json:"actor_user_id"` + RewardReceiverUserID uint64 `json:"reward_receiver_user_id"` + Source string `json:"source"` + OccurredAt time.Time `json:"occurred_at"` +} + +func NewForumPostLikedPayload(postID uint64, authorUserID uint64, actorUserID uint64, occurredAt time.Time) ForumPostRewardPayload { + return newForumPostRewardPayload( + ForumPostLikedEventType, + ForumRewardSourceLike, + postID, + 0, + authorUserID, + actorUserID, + occurredAt, + ) +} + +func NewForumPostImportedPayload(postID uint64, importID uint64, authorUserID uint64, actorUserID uint64, occurredAt time.Time) ForumPostRewardPayload { + return newForumPostRewardPayload( + ForumPostImportedEventType, + ForumRewardSourceImport, + postID, + importID, + authorUserID, + actorUserID, + occurredAt, + ) +} + +func newForumPostRewardPayload( + eventType string, + source string, + postID uint64, + importID uint64, + authorUserID uint64, + actorUserID uint64, + occurredAt time.Time, +) ForumPostRewardPayload { + if occurredAt.IsZero() { + occurredAt = time.Now() + } + return ForumPostRewardPayload{ + EventID: ForumRewardEventID(eventType, postID, actorUserID), + PostID: postID, + ImportID: importID, + AuthorUserID: authorUserID, + ActorUserID: actorUserID, + RewardReceiverUserID: authorUserID, + Source: strings.TrimSpace(source), + OccurredAt: occurredAt, + } +} + +func ForumRewardEventID(eventType string, postID uint64, actorUserID uint64) string { + return fmt.Sprintf("%s:%d:%d", strings.TrimSpace(eventType), postID, actorUserID) +} + +// EventType 根据 source 反推出当前奖励事件类型。 +func (p ForumPostRewardPayload) EventType() string { + switch strings.TrimSpace(p.Source) { + case ForumRewardSourceLike: + return ForumPostLikedEventType + case ForumRewardSourceImport: + return ForumPostImportedEventType + default: + return "" + } +} + +func (p ForumPostRewardPayload) MessageKey() string { + return strings.TrimSpace(p.EventID) +} + +func (p ForumPostRewardPayload) AggregateID() string { + return fmt.Sprintf("post:%d", p.PostID) +} + +func (p ForumPostRewardPayload) Validate() error { + if strings.TrimSpace(p.EventID) == "" { + return errors.New("forum reward event_id 不能为空") + } + if strings.TrimSpace(p.EventType()) == "" { + return errors.New("forum reward source 非法") + } + if p.PostID == 0 { + return errors.New("forum reward post_id 不能为空") + } + if p.AuthorUserID == 0 || p.ActorUserID == 0 || p.RewardReceiverUserID == 0 { + return errors.New("forum reward user_id 不能为空") + } + if strings.TrimSpace(p.Source) == ForumRewardSourceImport && p.ImportID == 0 { + return errors.New("forum import reward import_id 不能为空") + } + if p.OccurredAt.IsZero() { + return errors.New("forum reward occurred_at 不能为空") + } + return nil +}