Version: 0.9.68.dev.260504
Backend:
1. Landed the phase 3 notification service boundary: added `cmd/notification`, `services/notification`, `gateway/notification`, `shared/contracts/notification`, and the notification port, closed out with the same minimal hand-rolled zrpc boilerplate as userauth.
2. Moved the notification outbox consumer, relay, and retry loop into the standalone service entry point, which now handles `notification.feishu.requested`; the gateway calls notification through a zrpc client instead.
3. Retired the old monolithic notification DAO/model/service/provider/runner and `service/events/notification_feishu.go`; the old implementation is no longer an active compile path.
4. Fixed outbox route ownership, the dispatch startup scan, Kafka topic probe/delivery timeouts, sending-lease recovery, MarkDead error propagation for poison messages, and RPC timeout boundaries.
5. Adjusted the active-scheduler notification trigger events, the core outbox handler, the MySQL migration boundary, and the notification configuration accordingly.
Docs:
1. Updated the microservice migration plan, marking phase 3 notification as complete and stating that the next phase starts with active-scheduler.
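As context for backend items 1–2 and the RPC timeout boundary in item 4, here is a minimal sketch of what a gateway-side zrpc call into the notification service could look like. The import path, the NotificationClient interface, and the SendFeishuNotification RPC are illustrative assumptions; the real contract lives under `shared/contracts/notification` and is not reproduced in this diff.

// Hypothetical gateway-side wiring for the notification zrpc client.
// notificationpb, NotificationClient, and SendFeishuNotification are assumed names.
package notificationgw

import (
	"context"
	"time"

	"github.com/zeromicro/go-zero/zrpc"

	notificationpb "example.com/project/shared/contracts/notification" // hypothetical import path
)

type Client struct {
	rpc notificationpb.NotificationClient
}

func NewClient(endpoints []string) *Client {
	// zrpc.MustNewClient wraps a gRPC connection with go-zero's resolver and interceptors.
	cli := zrpc.MustNewClient(zrpc.RpcClientConf{Endpoints: endpoints, NonBlock: true})
	return &Client{rpc: notificationpb.NewNotificationClient(cli.Conn())}
}

func (c *Client) SendFeishu(ctx context.Context, receiver, content string) error {
	// Keep an explicit RPC timeout boundary at the gateway edge (backend item 4).
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()
	_, err := c.rpc.SendFeishuNotification(ctx, &notificationpb.SendFeishuNotificationRequest{
		Receiver: receiver,
		Content:  content,
	})
	return err
}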
@@ -58,6 +58,11 @@ func probeTopic(ctx context.Context, brokers []string, topic string) error {
			continue
		}

		// 1. segmentio/kafka-go's ReadPartitions does not take a context directly.
		// 2. An I/O deadline must be set on the underlying connection here; otherwise, when the broker has accepted
		//    the connection but the metadata response hangs, the caller's WaitTopicReady blocks forever and the outbox dispatch / consume loops never get to start.
		// 3. Once the deadline fires, this probe round fails, and the outer ticker keeps retrying until the overall timeout expires.
		_ = conn.SetDeadline(time.Now().Add(2 * time.Second))
		partitions, readErr := conn.ReadPartitions(topic)
		_ = conn.Close()
		if readErr != nil {
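Pulled out of the diff context, the probe-with-deadline pattern described in the comments above looks roughly like this. waitTopicReady is an illustrative stand-in for kafkabus.WaitTopicReady, whose real body is not shown here; the 2-second per-probe deadline matches the diff, the rest is assumption.

// A self-contained sketch of probing a Kafka topic with a per-probe deadline and an overall timeout.
package kafkaprobe

import (
	"context"
	"errors"
	"time"

	"github.com/segmentio/kafka-go"
)

// waitTopicReady polls broker metadata until the topic has at least one partition or the
// overall timeout expires. Each probe carries its own short I/O deadline so a stalled
// metadata response cannot block the caller forever.
func waitTopicReady(ctx context.Context, brokers []string, topic string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		for _, broker := range brokers {
			conn, err := kafka.Dial("tcp", broker)
			if err != nil {
				continue // broker unreachable, try the next one
			}
			// Per-probe deadline: ReadPartitions has no context parameter.
			_ = conn.SetDeadline(time.Now().Add(2 * time.Second))
			partitions, readErr := conn.ReadPartitions(topic)
			_ = conn.Close()
			if readErr == nil && len(partitions) > 0 {
				return nil
			}
		}
		if time.Now().After(deadline) {
			return errors.New("topic not ready before timeout: " + topic)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}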
@@ -17,6 +17,8 @@ import (
	"gorm.io/gorm"
)

const defaultDispatchTimeout = 10 * time.Second

// MessageHandler is the event consumption handler.
//
// Semantic constraints:
@@ -153,13 +155,16 @@ func (e *Engine) Start(ctx context.Context) {
		e.scanEvery,
		e.scanBatch,
	)
	// 1. Start dispatch first, so already-due outbox messages are not left stuck in pending by the topic probe.
	// 2. Consume still waits for the topic probe, to reduce consumer idle spinning and metadata churn during startup.
	// 3. If the probe fails, start the consumer anyway; real errors are recorded by the consume loop and surfaced through ops logs.
	e.StartDispatch(ctx)

	if err := kafkabus.WaitTopicReady(ctx, e.brokers, e.route.Topic, 30*time.Second); err != nil {
		log.Printf("Kafka topic not ready before consume loop start: %v", err)
	} else {
		log.Printf("Kafka topic is ready: %s", e.route.Topic)
	}

	e.StartDispatch(ctx)
	e.StartConsume(ctx)
}
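The ordering above relies on StartDispatch returning immediately rather than blocking on its loop. The method bodies are not part of this hunk, so the following is only an assumed shape (startConsumeLoop and the package name are hypothetical; startDispatchLoop appears in the next hunk) showing why calling StartDispatch before the topic probe does not stall Start:

package outbox

import "context"

// Engine fields are elided here; only the launch shape matters for this sketch.
type Engine struct{}

func (e *Engine) startDispatchLoop(ctx context.Context) { /* see the next hunk */ }
func (e *Engine) startConsumeLoop(ctx context.Context)  { /* hypothetical name */ }

// Each Start* call only spawns its loop goroutine and returns, so Engine.Start can
// kick off dispatch before the blocking WaitTopicReady call without delaying it.
func (e *Engine) StartDispatch(ctx context.Context) { go e.startDispatchLoop(ctx) }
func (e *Engine) StartConsume(ctx context.Context)  { go e.startConsumeLoop(ctx) }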
@@ -246,25 +251,35 @@ func (e *Engine) startDispatchLoop(ctx context.Context) {
	ticker := time.NewTicker(e.scanEvery)
	defer ticker.Stop()

	log.Printf("outbox dispatch loop started: service=%s scan=%s batch=%d", e.route.ServiceName, e.scanEvery, e.scanBatch)
	e.scanAndDispatchDueMessages(ctx)

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			pendingMessages, err := e.repo.ListDueMessages(ctx, e.route.ServiceName, e.scanBatch)
			if err != nil {
				log.Printf("failed to scan outbox: %v", err)
				continue
			}
			if len(pendingMessages) > 0 {
				log.Printf("outbox due messages=%d, service=%s start dispatch", len(pendingMessages), e.route.ServiceName)
			}
			e.scanAndDispatchDueMessages(ctx)
		}
	}
}

	for _, msg := range pendingMessages {
		if err = e.dispatchOne(ctx, msg.ID); err != nil {
			log.Printf("failed to dispatch outbox message on retry (id=%d): %v", msg.ID, err)
		}
	}
func (e *Engine) scanAndDispatchDueMessages(ctx context.Context) {
	// 1. Each round fetches only the messages due for the current service, so a standalone microservice never scans other services' outbox tables by mistake.
	// 2. A single failed dispatch is only logged and moved into retry; it does not block the remaining messages in this round.
	// 3. This function also runs once at startup, so after a restart historical pending messages do not have to wait for the next ticker tick to make progress.
	pendingMessages, err := e.repo.ListDueMessages(ctx, e.route.ServiceName, e.scanBatch)
	if err != nil {
		log.Printf("failed to scan outbox: %v", err)
		return
	}
	if len(pendingMessages) > 0 {
		log.Printf("outbox due messages=%d, service=%s start dispatch", len(pendingMessages), e.route.ServiceName)
	}

	for _, msg := range pendingMessages {
		if err = e.dispatchOne(ctx, msg.ID); err != nil {
			log.Printf("failed to dispatch outbox message on retry (id=%d): %v", msg.ID, err)
		}
	}
}
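ListDueMessages itself is not part of this diff. Under an assumed outbox schema (the column names, status values, and next_retry_at field below are guesses, not the actual migration), a per-service due scan along the lines of the comment in scanAndDispatchDueMessages might look like:

package outbox

import (
	"context"
	"time"

	"gorm.io/gorm"
)

// OutboxMessage is an assumed row shape; the real model lives elsewhere in the repository.
type OutboxMessage struct {
	ID          int64
	ServiceName string
	Topic       string
	MessageKey  string
	Payload     []byte
	Status      string
	NextRetryAt time.Time
}

type Repository struct {
	db *gorm.DB
}

// ListDueMessages fetches only this service's due rows: pending or retryable,
// retry time reached, bounded by the scan batch size.
func (r *Repository) ListDueMessages(ctx context.Context, serviceName string, batch int) ([]OutboxMessage, error) {
	var rows []OutboxMessage
	err := r.db.WithContext(ctx).
		Where("service_name = ?", serviceName).
		Where("status IN ?", []string{"pending", "retry"}).
		Where("next_retry_at <= ?", time.Now()).
		Order("id ASC").
		Limit(batch).
		Find(&rows).Error
	return rows, err
}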
@@ -315,7 +330,11 @@ func (e *Engine) dispatchOne(ctx context.Context, outboxID int64) error {
		return err
	}

	if err = e.producer.Enqueue(ctx, outboxMsg.Topic, outboxMsg.MessageKey, raw); err != nil {
	// 1. Kafka writes use a per-message timeout, so a stuck broker/metadata exchange cannot leave the message parked in pending for long.
	// 2. A timeout failure still goes through the unified retry state machine and is compensated by the next scan round.
	dispatchCtx, cancel := context.WithTimeout(ctx, defaultDispatchTimeout)
	defer cancel()
	if err = e.producer.Enqueue(dispatchCtx, outboxMsg.Topic, outboxMsg.MessageKey, raw); err != nil {
		_ = e.repo.MarkFailedForRetry(ctx, outboxMsg.ID, "failed to deliver to Kafka: "+err.Error())
		return err
	}
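The producer side is also outside this hunk; the per-message timeout only helps if Enqueue honours the context it is given. A sketch of a context-aware enqueue on top of segmentio/kafka-go (the Producer wrapper and its field names are assumptions, not the repository's real type):

package kafkabus

import (
	"context"

	"github.com/segmentio/kafka-go"
)

// Producer is an assumed wrapper; the real producer type is not shown in the diff.
type Producer struct {
	writer *kafka.Writer
}

func NewProducer(brokers []string) *Producer {
	return &Producer{writer: &kafka.Writer{
		Addr:     kafka.TCP(brokers...),
		Balancer: &kafka.Hash{}, // same key always lands on the same partition
	}}
}

// Enqueue writes one message and returns as soon as ctx is done, so the per-message
// dispatch timeout in dispatchOne can actually cut off a stuck broker/metadata exchange.
func (p *Producer) Enqueue(ctx context.Context, topic, key string, value []byte) error {
	return p.writer.WriteMessages(ctx, kafka.Message{
		Topic: topic,
		Key:   []byte(key),
		Value: value,
	})
}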
@@ -426,18 +445,18 @@ func resolveEngineRoute(repo *Repository, cfg kafkabus.Config) ServiceRoute {
		GroupID: strings.TrimSpace(cfg.GroupID),
	}
	if repo != nil {
		repoRoute := normalizeServiceRoute(repo.route)
		repoRoute := repo.route
		if route.ServiceName == "" {
			route.ServiceName = repoRoute.ServiceName
			route.ServiceName = strings.TrimSpace(repoRoute.ServiceName)
		}
		if route.TableName == "" {
			route.TableName = repoRoute.TableName
		if route.TableName == "" && strings.TrimSpace(repoRoute.TableName) != "" {
			route.TableName = strings.TrimSpace(repoRoute.TableName)
		}
		if route.Topic == "" {
			route.Topic = repoRoute.Topic
		if route.Topic == "" && strings.TrimSpace(repoRoute.Topic) != "" {
			route.Topic = strings.TrimSpace(repoRoute.Topic)
		}
		if route.GroupID == "" {
			route.GroupID = repoRoute.GroupID
		if route.GroupID == "" && strings.TrimSpace(repoRoute.GroupID) != "" {
			route.GroupID = strings.TrimSpace(repoRoute.GroupID)
		}
	}
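The repeated trim-and-fallback blocks in resolveEngineRoute could also be phrased as one small helper. The helper below does not exist in the repository; it only spells out the precedence rule (the config value wins, and the repo route fills blanks, never with whitespace-only values):

package outbox

import "strings"

// fallback returns primary if it is non-blank after trimming; otherwise it returns the
// trimmed secondary value, so whitespace-only repo fields never overwrite config values.
func fallback(primary, secondary string) string {
	if v := strings.TrimSpace(primary); v != "" {
		return v
	}
	return strings.TrimSpace(secondary)
}

// For example: route.Topic = fallback(cfg.Topic, repoRoute.Topic)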