Files
Losita abe3b4960e Version: 0.9.68.dev.260504
后端:
1. 阶段 3 notification 服务边界落地,新增 `cmd/notification`、`services/notification`、`gateway/notification`、`shared/contracts/notification` 和 notification port,按 userauth 同款最小手搓 zrpc 样板收口
2. notification outbox consumer、relay 和 retry loop 迁入独立服务入口,处理 `notification.feishu.requested`,gateway 改为通过 zrpc client 调用 notification
3. 清退旧单体 notification DAO/model/service/provider/runner 和 `service/events/notification_feishu.go`,旧实现不再作为活跃编译路径
4. 修复 outbox 路由归属、dispatch 启动扫描、Kafka topic 探测/投递超时、sending 租约恢复、毒消息 MarkDead 错误回传和 RPC timeout 边界
5. 同步调整 active-scheduler 触发通知事件、核心 outbox handler、MySQL 迁移边界和 notification 配置

文档:
1. 更新微服务迁移计划,将阶段 3 notification 标记为已完成,并明确下一阶段从 active-scheduler 开始
2026-05-04 18:40:39 +08:00

171 lines
5.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package dao
import (
"context"
"errors"
"time"
notificationmodel "github.com/LoveLosita/smartflow/backend/services/notification/model"
"gorm.io/gorm"
)
// RecordDAO persists the delivery state machine stored in notification_records.
//
// Responsibility boundaries:
//  1. It only handles creating notification records, dedupe lookups, status
//     updates, and retry scans.
//  2. It is not responsible for provider sending, idempotency locks, or marking
//     outbox messages as consumed.
//  3. It does not read or write the active_schedule_* tables, so the notification
//     service never holds active-scheduler internal state in reverse.
type RecordDAO struct {
// db is the gorm handle that every query issued by this DAO goes through.
db *gorm.DB
}
// NewRecordDAO builds a RecordDAO that issues its queries through db.
// The handle is stored as given; no validation happens at construction time.
func NewRecordDAO(db *gorm.DB) *RecordDAO {
	dao := new(RecordDAO)
	dao.db = db
	return dao
}
// WithTx returns a fresh RecordDAO bound to tx.
// The receiver is neither read nor modified, so the original DAO keeps its own handle.
func (d *RecordDAO) WithTx(tx *gorm.DB) *RecordDAO {
	scoped := RecordDAO{db: tx}
	return &scoped
}
// ensureDB reports whether the DAO is usable.
// It returns nil when both the receiver and its database handle are non-nil,
// and an "uninitialized" error otherwise.
func (d *RecordDAO) ensureDB() error {
	if d != nil && d.db != nil {
		return nil
	}
	return errors.New("notification record dao 未初始化")
}
// CreateNotificationRecord inserts record through gorm's Create.
//
// It returns the initialization error when the DAO is unusable, a dedicated
// error when record is nil, and otherwise whatever error the insert reports.
func (d *RecordDAO) CreateNotificationRecord(ctx context.Context, record *notificationmodel.NotificationRecord) error {
	initErr := d.ensureDB()
	switch {
	case initErr != nil:
		return initErr
	case record == nil:
		return errors.New("notification record 不能为空")
	}

	inserted := d.db.WithContext(ctx).Create(record)
	return inserted.Error
}
// UpdateNotificationRecordFields writes the column/value pairs in updates to the
// record whose id equals notificationID.
//
// A non-positive notificationID is rejected with an error. An empty updates map
// is a no-op that returns nil without touching the database. The affected-row
// count is not inspected, so an id that matches no row does not produce an
// error here.
func (d *RecordDAO) UpdateNotificationRecordFields(ctx context.Context, notificationID int64, updates map[string]any) error {
	if initErr := d.ensureDB(); initErr != nil {
		return initErr
	}
	if notificationID < 1 {
		return errors.New("notification record id 不能为空")
	}
	if len(updates) < 1 {
		return nil
	}

	target := d.db.WithContext(ctx).Model(&notificationmodel.NotificationRecord{})
	target = target.Where("id = ?", notificationID)
	return target.Updates(updates).Error
}
// GetNotificationRecordByID loads the record whose id equals notificationID.
//
// A non-positive id short-circuits to gorm.ErrRecordNotFound without querying.
// On any error the returned record is nil.
func (d *RecordDAO) GetNotificationRecordByID(ctx context.Context, notificationID int64) (*notificationmodel.NotificationRecord, error) {
	if initErr := d.ensureDB(); initErr != nil {
		return nil, initErr
	}
	if notificationID < 1 {
		return nil, gorm.ErrRecordNotFound
	}

	found := new(notificationmodel.NotificationRecord)
	lookup := d.db.WithContext(ctx).Where("id = ?", notificationID).First(found)
	if lookup.Error != nil {
		return nil, lookup.Error
	}
	return found, nil
}
// FindNotificationRecordByDedupeKey looks up the dedupe record for a notification.
//
// Notes:
//  1. The first version of notification dedupes on channel + dedupe_key.
//  2. When the result is pending/sending/sent, the caller should avoid
//     delivering again.
//  3. When the result is failed, the caller may reuse the same record for a
//     provider retry.
//
// If several rows match, the one with the latest created_at (then the highest
// id) is returned. An empty channel or dedupeKey yields gorm.ErrRecordNotFound
// without querying.
func (d *RecordDAO) FindNotificationRecordByDedupeKey(ctx context.Context, channel string, dedupeKey string) (*notificationmodel.NotificationRecord, error) {
	if initErr := d.ensureDB(); initErr != nil {
		return nil, initErr
	}
	if len(channel) == 0 || len(dedupeKey) == 0 {
		return nil, gorm.ErrRecordNotFound
	}

	latest := new(notificationmodel.NotificationRecord)
	query := d.db.WithContext(ctx).
		Where("channel = ? AND dedupe_key = ?", channel, dedupeKey).
		Order("created_at DESC, id DESC")
	if findErr := query.First(latest).Error; findErr != nil {
		return nil, findErr
	}
	return latest, nil
}
// ListRetryableNotificationRecords returns records that are due for a retry.
//
//  1. failed records qualify once their next_retry_at is set and not after now.
//  2. sending records qualify only when updated_at is at or before
//     sendingStaleBefore, i.e. their lease has expired, so provider calls that
//     are still running are not duplicated.
//  3. Together this lets the retry scanner cover both explicit failure retries
//     and recovery from a crash in the middle of sending.
//
// A non-positive limit or a zero now yields an empty, non-nil slice without
// querying. A zero sendingStaleBefore defaults to ten minutes before now.
// Results are ordered by next_retry_at, then id, both ascending.
//
// NOTE(review): rows whose next_retry_at is NULL are placed according to the
// database's NULL ordering rule — confirm that matches the intended priority.
func (d *RecordDAO) ListRetryableNotificationRecords(ctx context.Context, now time.Time, sendingStaleBefore time.Time, limit int) ([]notificationmodel.NotificationRecord, error) {
	if initErr := d.ensureDB(); initErr != nil {
		return nil, initErr
	}
	if now.IsZero() || limit <= 0 {
		return []notificationmodel.NotificationRecord{}, nil
	}

	leaseCutoff := sendingStaleBefore
	if leaseCutoff.IsZero() {
		leaseCutoff = now.Add(-10 * time.Minute)
	}

	var due []notificationmodel.NotificationRecord
	scan := d.db.WithContext(ctx).
		Where(
			"(status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?) OR (status = ? AND updated_at <= ?)",
			notificationmodel.RecordStatusFailed,
			now,
			notificationmodel.RecordStatusSending,
			leaseCutoff,
		).
		Order("next_retry_at ASC, id ASC").
		Limit(limit).
		Find(&due)
	if scan.Error != nil {
		return nil, scan.Error
	}
	return due, nil
}
// ClaimRetryableNotificationRecord claims one retryable notification so that
// multiple instances do not call the provider for the same record.
//
// Responsibility boundaries:
//  1. It only performs the cross-process claim; it neither sends the
//     notification nor advances the final delivery status.
//  2. Both due failed records and stale sending records can be reclaimed into
//     sending.
//  3. claimed=false means another instance already took the record or its state
//     changed; the caller should skip this retry.
//
// The claim is a single conditional UPDATE that sets status to sending, clears
// next_retry_at, and stamps updated_at with now; it succeeds only when exactly
// one row was affected. A non-positive notificationID or a zero now returns
// (false, nil) without querying. A zero sendingStaleBefore defaults to ten
// minutes before now.
func (d *RecordDAO) ClaimRetryableNotificationRecord(ctx context.Context, notificationID int64, now time.Time, sendingStaleBefore time.Time) (bool, error) {
	if initErr := d.ensureDB(); initErr != nil {
		return false, initErr
	}
	if now.IsZero() || notificationID <= 0 {
		return false, nil
	}

	leaseCutoff := sendingStaleBefore
	if leaseCutoff.IsZero() {
		leaseCutoff = now.Add(-10 * time.Minute)
	}

	// Refreshing updated_at restarts the lease for the instance that wins the claim.
	claimFields := map[string]any{
		"status":        notificationmodel.RecordStatusSending,
		"next_retry_at": nil,
		"updated_at":    now,
	}

	claim := d.db.WithContext(ctx).
		Model(&notificationmodel.NotificationRecord{}).
		Where(
			"id = ? AND ((status = ? AND next_retry_at IS NOT NULL AND next_retry_at <= ?) OR (status = ? AND updated_at <= ?))",
			notificationID,
			notificationmodel.RecordStatusFailed,
			now,
			notificationmodel.RecordStatusSending,
			leaseCutoff,
		).
		Updates(claimFields)
	if claimErr := claim.Error; claimErr != nil {
		return false, claimErr
	}
	return claim.RowsAffected == 1, nil
}