Version: 0.9.75.dev.260505
后端: 1.收口阶段 6 agent 结构迁移,将 newAgent 内核与 agentsvc 编排层迁入 services/agent - 切换 Agent 启动装配与 HTTP handler 直连 agentsvc,移除旧 service agent bridge - 补齐 Agent 对 memory、task、task-class、schedule 的 RPC 适配与契约字段 - 扩展 schedule、task、task-class RPC/contract 支撑 Agent 查询、写入与 provider 切流 - 更新迁移文档、README 与相关注释,明确 agent 当前切流点和剩余 memory 迁移面
This commit is contained in:
870
backend/services/agent/stream/emitter.go
Normal file
870
backend/services/agent/stream/emitter.go
Normal file
@@ -0,0 +1,870 @@
|
||||
package agentstream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
|
||||
)
|
||||
|
||||
// PayloadEmitter is the minimal interface that actually writes a chunk to the
// outer SSE pipe.
//
// Notes:
// 1. Deliberately not bound to a chan/string implementation;
// 2. Callers may pass a "write to channel" func or a "write to gin stream" func;
// 3. Anything matching `func(string) error` plugs in.
type PayloadEmitter func(payload string) error
|
||||
|
||||
// StageEmitter is the compatibility hook graph/node code uses to push the
// current stage.
//
// Design notes:
// 1. Legacy callers only care about the stage/detail text pair, so it stays;
// 2. Richer structured events go through ChunkEmitter instead of widening
//    this signature;
// 3. That balances today's compatibility with future protocol upgrades.
type StageEmitter func(stage, detail string)
|
||||
|
||||
// PseudoStreamOptions configures chunking and pacing when pseudo-streaming a
// complete piece of text.
//
// Field semantics:
// 1. MinChunkRunes: once a chunk reaches this length, a punctuation/newline
//    boundary may flush it early;
// 2. MaxChunkRunes: hard cap (in runes) per chunk, flushed unconditionally;
// 3. ChunkInterval: delay between chunks; 0 means chunk only, no delay.
type PseudoStreamOptions struct {
	MinChunkRunes int
	MaxChunkRunes int
	ChunkInterval time.Duration
}
|
||||
|
||||
// Default pseudo-stream chunk bounds (in runes), tuned for short CJK phrases.
const (
	defaultPseudoStreamMinChunkRunes = 8
	defaultPseudoStreamMaxChunkRunes = 24
)
|
||||
|
||||
// DefaultPseudoStreamOptions 返回一份适合中文短句展示的默认伪流式配置。
|
||||
func DefaultPseudoStreamOptions() PseudoStreamOptions {
|
||||
return PseudoStreamOptions{
|
||||
MinChunkRunes: defaultPseudoStreamMinChunkRunes,
|
||||
MaxChunkRunes: defaultPseudoStreamMaxChunkRunes,
|
||||
}
|
||||
}
|
||||
|
||||
// ChunkEmitter is the agent's unified SSE chunk emitter.
//
// Responsibility boundary:
// 1. Converts body text / reasoning / tool events / confirm requests /
//    interrupt notices into OpenAI-compatible payloads;
// 2. Attaches structured events as extra while keeping readable fallback
//    text for the current frontend;
// 3. Decides neither when to emit what, nor how state is persisted.
type ChunkEmitter struct {
	emit PayloadEmitter
	RequestID string
	ModelName string
	Created int64

	// thinkingGateMu is a lightweight guard for the "body-text gate".
	// 1. It only protects whether thinking_summary may still be sent; it
	//    does not serialize the whole SSE stream;
	// 2. Once body text starts, that block's gate closes and later
	//    summaries for the same block are dropped;
	// 3. This stops summary goroutines from trailing old thoughts after
	//    the answer, without killing later blocks' fresh thinking rounds.
	thinkingGateMu sync.Mutex
	thinkingClosedBlocks map[string]bool
	// reasoningSummaryFunc condenses raw reasoning into a user-visible
	// summary.
	// 1. Injected by the service layer; this package only schedules it;
	// 2. When nil, model reasoning is silently dropped rather than falling
	//    back to raw reasoning_content;
	// 3. Once body text starts, both ReasoningDigestor and ChunkEmitter
	//    close their gates; late results never reach the frontend.
	reasoningSummaryFunc ReasoningSummaryFunc
	// extraEventHook mirrors key structured events to the caller for
	// persistence.
	// 1. Hook failures must not affect the SSE main path;
	// 2. It receives only the extra struct, so the emitter never depends
	//    back on the business layer;
	// 3. Nil means no-op, keeping legacy call paths working.
	extraEventHook func(extra *OpenAIChunkExtra)
}
|
||||
|
||||
// NoopPayloadEmitter 返回一个空实现,便于骨架期安全占位。
|
||||
func NoopPayloadEmitter() PayloadEmitter {
|
||||
return func(string) error { return nil }
|
||||
}
|
||||
|
||||
// NoopStageEmitter 返回一个空实现,避免 graph 在没有接前端时处处判空。
|
||||
func NoopStageEmitter() StageEmitter {
|
||||
return func(stage, detail string) {}
|
||||
}
|
||||
|
||||
// WrapStageEmitter 把可空函数包装成稳定的 StageEmitter。
|
||||
func WrapStageEmitter(fn func(stage, detail string)) StageEmitter {
|
||||
if fn == nil {
|
||||
return NoopStageEmitter()
|
||||
}
|
||||
return fn
|
||||
}
|
||||
|
||||
// NewChunkEmitter 创建统一 chunk 发射器。
|
||||
//
|
||||
// 兜底策略:
|
||||
// 1. emit 为空时回退到 Noop,避免骨架期到处判空;
|
||||
// 2. modelName 为空时回填 worker,保持 OpenAI 兼容字段稳定;
|
||||
// 3. created <= 0 时用当前时间兜底,避免上层还没决定时间戳就无法复用。
|
||||
func NewChunkEmitter(emit PayloadEmitter, requestID, modelName string, created int64) *ChunkEmitter {
|
||||
if emit == nil {
|
||||
emit = NoopPayloadEmitter()
|
||||
}
|
||||
|
||||
modelName = strings.TrimSpace(modelName)
|
||||
if modelName == "" {
|
||||
modelName = "worker"
|
||||
}
|
||||
if created <= 0 {
|
||||
created = time.Now().Unix()
|
||||
}
|
||||
|
||||
return &ChunkEmitter{
|
||||
emit: emit,
|
||||
RequestID: strings.TrimSpace(requestID),
|
||||
ModelName: modelName,
|
||||
Created: created,
|
||||
}
|
||||
}
|
||||
|
||||
// SetExtraEventHook registers the structured-event persistence callback.
// Safe on a nil receiver.
func (e *ChunkEmitter) SetExtraEventHook(hook func(extra *OpenAIChunkExtra)) {
	if e == nil {
		return
	}
	e.extraEventHook = hook
}
|
||||
|
||||
// SetReasoningSummaryFunc installs the reasoning-summary model call.
//
// Boundary:
// 1. Only stores the function reference; no model call happens here;
// 2. Trigger frequency, single-flight, and the body-text gate are owned by
//    ReasoningDigestor;
// 3. Passing nil disables summarization; later reasoning chunks are
//    silently dropped.
func (e *ChunkEmitter) SetReasoningSummaryFunc(fn ReasoningSummaryFunc) {
	if e == nil {
		return
	}
	e.reasoningSummaryFunc = fn
}
|
||||
|
||||
// NewReasoningDigestor creates a reasoning digestor for the current block.
//
// Steps:
// 1. Without an injected summary func it returns (nil, nil); callers just
//    skip raw reasoning pushes;
// 2. Summary output passes the emitter's body-text gate before going
//    through the shared extra/hook pipeline;
// 3. The digestor itself owns single-flight, watermarks, and discarding of
//    in-flight results once body text starts.
func (e *ChunkEmitter) NewReasoningDigestor(ctx context.Context, blockID, stage string) (*ReasoningDigestor, error) {
	if e == nil || e.reasoningSummaryFunc == nil {
		return nil, nil
	}
	e.openThinkingSummaryGate(blockID, stage)
	return NewReasoningDigestor(ReasoningDigestorOptions{
		SummaryFunc: e.reasoningSummaryFunc,
		SummarySink: func(summary StreamThinkingSummaryExtra) {
			_ = e.EmitThinkingSummary(blockID, stage, summary)
		},
		BaseContext:    ctx,
		SummaryTimeout: 8 * time.Second,
	})
}
|
||||
|
||||
// EmitReasoningText pushes one piece of reasoning text together with a
// reasoning_text extra. Text that is blank after trimming is skipped.
func (e *ChunkEmitter) EmitReasoningText(blockID, stage, text string, includeRole bool) error {
	if e == nil || e.emit == nil {
		return nil
	}

	text = strings.TrimSpace(text)
	if text == "" {
		return nil
	}

	payload, err := ToOpenAIReasoningChunkWithExtra(
		e.RequestID,
		e.ModelName,
		e.Created,
		text,
		includeRole,
		NewReasoningTextExtra(blockID, stage),
	)
	if err != nil {
		return err
	}
	if payload == "" {
		return nil
	}
	return e.emit(payload)
}
|
||||
|
||||
// EmitAssistantText pushes one piece of assistant body text with an
// assistant_text extra, closing the block's thinking-summary gate so late
// reasoning summaries cannot trail the visible answer.
func (e *ChunkEmitter) EmitAssistantText(blockID, stage, text string, includeRole bool) error {
	if e == nil || e.emit == nil {
		return nil
	}
	// Deliberately NOT trimming here: trimming swallowed newlines and glued
	// adjacent chunks together on the client.
	/* text = strings.TrimSpace(text)*/
	if text == "" {
		return nil
	}
	e.closeThinkingSummaryGate(blockID, stage)

	payload, err := ToOpenAIAssistantChunkWithExtra(
		e.RequestID,
		e.ModelName,
		e.Created,
		text,
		includeRole,
		NewAssistantTextExtra(blockID, stage),
	)
	if err != nil {
		return err
	}
	if payload == "" {
		return nil
	}
	return e.emit(payload)
}
|
||||
|
||||
// EmitThinkingSummary pushes one streamed thinking-summary event.
//
// Protocol constraints:
// 1. Travels only via extra.thinking_summary, never delta.content or
//    delta.reasoning_content;
// 2. Reuses the shared extra hook so the caller can persist without knowing
//    emitter internals;
// 3. No includeRole: thinking_summary is a purely structured event.
// Events for a block whose gate is already closed are dropped.
func (e *ChunkEmitter) EmitThinkingSummary(blockID, stage string, summary StreamThinkingSummaryExtra) error {
	if e == nil || e.emit == nil {
		return nil
	}
	if e.isThinkingSummaryGateClosed(blockID, stage) {
		return nil
	}
	return e.emitExtraOnly(NewThinkingSummaryExtra(blockID, stage, summary))
}
|
||||
|
||||
func (e *ChunkEmitter) openThinkingSummaryGate(blockID, stage string) {
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
e.thinkingGateMu.Lock()
|
||||
if e.thinkingClosedBlocks != nil {
|
||||
delete(e.thinkingClosedBlocks, thinkingSummaryGateKey(blockID, stage))
|
||||
}
|
||||
e.thinkingGateMu.Unlock()
|
||||
}
|
||||
|
||||
func (e *ChunkEmitter) closeThinkingSummaryGate(blockID, stage string) {
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
e.thinkingGateMu.Lock()
|
||||
if e.thinkingClosedBlocks == nil {
|
||||
e.thinkingClosedBlocks = make(map[string]bool)
|
||||
}
|
||||
e.thinkingClosedBlocks[thinkingSummaryGateKey(blockID, stage)] = true
|
||||
e.thinkingGateMu.Unlock()
|
||||
}
|
||||
|
||||
func (e *ChunkEmitter) isThinkingSummaryGateClosed(blockID, stage string) bool {
|
||||
if e == nil {
|
||||
return true
|
||||
}
|
||||
e.thinkingGateMu.Lock()
|
||||
defer e.thinkingGateMu.Unlock()
|
||||
return e.thinkingClosedBlocks[thinkingSummaryGateKey(blockID, stage)]
|
||||
}
|
||||
|
||||
// thinkingSummaryGateKey derives the gate map key: the trimmed blockID when
// present, else the trimmed stage, else a fixed sentinel.
func thinkingSummaryGateKey(blockID, stage string) string {
	for _, candidate := range []string{blockID, stage} {
		if trimmed := strings.TrimSpace(candidate); trimmed != "" {
			return trimmed
		}
	}
	return "__default__"
}
|
||||
|
||||
// EmitPseudoReasoningText 把整段 reasoning 文本按伪流式方式逐块推出。
|
||||
func (e *ChunkEmitter) EmitPseudoReasoningText(ctx context.Context, blockID, stage, text string, options PseudoStreamOptions) error {
|
||||
return e.emitPseudoText(
|
||||
ctx,
|
||||
text,
|
||||
options,
|
||||
func(chunk string, includeRole bool) error {
|
||||
return e.EmitReasoningText(blockID, stage, chunk, includeRole)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// EmitPseudoAssistantText 把整段 assistant 文本按伪流式方式逐块推出。
|
||||
func (e *ChunkEmitter) EmitPseudoAssistantText(ctx context.Context, blockID, stage, text string, options PseudoStreamOptions) error {
|
||||
return e.emitPseudoText(
|
||||
ctx,
|
||||
text,
|
||||
options,
|
||||
func(chunk string, includeRole bool) error {
|
||||
return e.EmitAssistantText(blockID, stage, chunk, includeRole)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// EmitStatus pushes one stage-status event.
//
// Protocol: status travels only via extra, never reasoning_content.
// includeRole survives for signature compatibility and is ignored.
func (e *ChunkEmitter) EmitStatus(blockID, stage, code, summary string, includeRole bool) error {
	if e == nil || e.emit == nil {
		return nil
	}
	_ = includeRole
	return e.emitExtraOnly(NewStatusExtra(blockID, stage, code, summary))
}
|
||||
|
||||
// EmitToolCallStart pushes one tool-call-start event.
//
// Protocol: travels only via extra.tool, never reasoning_content.
// includeRole survives for signature compatibility and is ignored.
func (e *ChunkEmitter) EmitToolCallStart(blockID, stage, toolName, summary, argumentsPreview string, includeRole bool) error {
	if e == nil || e.emit == nil {
		return nil
	}
	_ = includeRole
	return e.emitExtraOnly(NewToolCallExtra(blockID, stage, toolName, "start", summary, argumentsPreview))
}
|
||||
|
||||
// EmitToolCallResult 输出一次工具调用结果事件。
|
||||
//
|
||||
// 协议约束:
|
||||
// 1. status 由调用方明确传入(如 done/blocked/failed);
|
||||
// 2. 结果事件只走 extra.tool,不回写 reasoning_content。
|
||||
func (e *ChunkEmitter) EmitToolCallResult(
|
||||
blockID string,
|
||||
stage string,
|
||||
toolName string,
|
||||
status string,
|
||||
summary string,
|
||||
argumentsPreview string,
|
||||
argumentView map[string]any,
|
||||
resultView map[string]any,
|
||||
includeRole bool,
|
||||
) error {
|
||||
if e == nil || e.emit == nil {
|
||||
return nil
|
||||
}
|
||||
_ = includeRole
|
||||
return e.emitExtraOnly(NewToolResultExtra(
|
||||
blockID,
|
||||
stage,
|
||||
toolName,
|
||||
status,
|
||||
summary,
|
||||
argumentsPreview,
|
||||
argumentView,
|
||||
resultView,
|
||||
))
|
||||
}
|
||||
|
||||
// emitExtraOnly sends a structured extra-only chunk (no content/reasoning),
// mirroring it to the persistence hook first.
func (e *ChunkEmitter) emitExtraOnly(extra *OpenAIChunkExtra) error {
	if e == nil || e.emit == nil {
		return nil
	}
	e.emitExtraEventHook(extra)
	payload, err := ToOpenAIStreamWithExtra(
		nil,
		e.RequestID,
		e.ModelName,
		e.Created,
		false,
		extra,
	)
	if err != nil {
		return err
	}
	if payload == "" {
		return nil
	}
	return e.emit(payload)
}
|
||||
|
||||
func (e *ChunkEmitter) emitExtraEventHook(extra *OpenAIChunkExtra) {
|
||||
if e == nil || e.extraEventHook == nil || extra == nil {
|
||||
return
|
||||
}
|
||||
e.extraEventHook(extra)
|
||||
}
|
||||
|
||||
// EmitConfirmRequest pushes one pending-confirmation event.
//
// Display strategy:
// 1. Legacy frontends read the confirm text from assistant content;
// 2. New frontends can drive a confirm card/button from extra.confirm;
// 3. The text is pseudo-streamed so it does not land as one slab.
//
// NOTE(review): with blank title+summary the hook still fires but no SSE
// payload is emitted (emitPseudoText skips empty text) — confirm intended.
func (e *ChunkEmitter) EmitConfirmRequest(ctx context.Context, blockID, stage, interactionID, title, summary string, options PseudoStreamOptions) error {
	if e == nil || e.emit == nil {
		return nil
	}

	text := buildConfirmAssistantText(title, summary)
	extra := NewConfirmRequestExtra(blockID, stage, interactionID, title, summary)
	e.emitExtraEventHook(extra)
	// Confirm text counts as body text: close the summary gate first.
	if strings.TrimSpace(text) != "" {
		e.closeThinkingSummaryGate(blockID, stage)
	}
	return e.emitPseudoText(
		ctx,
		text,
		options,
		func(chunk string, includeRole bool) error {
			payload, err := ToOpenAIAssistantChunkWithExtra(
				e.RequestID,
				e.ModelName,
				e.Created,
				chunk,
				includeRole,
				extra,
			)
			if err != nil {
				return err
			}
			if payload == "" {
				return nil
			}
			return e.emit(payload)
		},
	)
}
|
||||
|
||||
// EmitInterruptMessage pushes one interruption notice.
//
// Use cases:
// 1. ask_user follow-up questions;
// 2. Telling the user the conversation has entered a waiting state;
// 3. Reusable later if connection_lost recovery needs an explanation line.
//
// NOTE(review): unlike EmitConfirmRequest, the extra is NOT mirrored to
// extraEventHook here — verify interrupts really need no persistence.
func (e *ChunkEmitter) EmitInterruptMessage(ctx context.Context, blockID, stage, interactionID, interactionType, summary string, options PseudoStreamOptions) error {
	if e == nil || e.emit == nil {
		return nil
	}

	text := buildInterruptAssistantText(interactionType, summary)
	extra := NewInterruptExtra(blockID, stage, interactionID, interactionType, summary)
	// Interrupt text counts as body text: close the summary gate first.
	if strings.TrimSpace(text) != "" {
		e.closeThinkingSummaryGate(blockID, stage)
	}
	return e.emitPseudoText(
		ctx,
		text,
		options,
		func(chunk string, includeRole bool) error {
			payload, err := ToOpenAIAssistantChunkWithExtra(
				e.RequestID,
				e.ModelName,
				e.Created,
				chunk,
				includeRole,
				extra,
			)
			if err != nil {
				return err
			}
			if payload == "" {
				return nil
			}
			return e.emit(payload)
		},
	)
}
|
||||
|
||||
// EmitScheduleCompleted pushes one "scheduling completed" card event.
//
// Protocol:
// 1. Extra only; no content/reasoning attached;
// 2. On kind=schedule_completed the frontend fetches schedule data itself.
func (e *ChunkEmitter) EmitScheduleCompleted(blockID, stage string) error {
	if e == nil || e.emit == nil {
		return nil
	}
	return e.emitExtraOnly(NewScheduleCompletedExtra(blockID, stage))
}
|
||||
|
||||
// EmitBusinessCard pushes one business-result card event.
//
// Protocol:
// 1. Extra only; no content/reasoning attached;
// 2. A nil card is skipped so no card lacking key fields goes out.
func (e *ChunkEmitter) EmitBusinessCard(blockID, stage string, card *StreamBusinessCardExtra) error {
	if e == nil || e.emit == nil || card == nil {
		return nil
	}
	return e.emitExtraOnly(NewBusinessCardExtra(blockID, stage, card))
}
|
||||
|
||||
// EmitFinish emits the unified stop chunk carrying a finish extra.
func (e *ChunkEmitter) EmitFinish(blockID, stage string) error {
	if e == nil || e.emit == nil {
		return nil
	}

	payload, err := ToOpenAIFinishStreamWithExtra(
		e.RequestID,
		e.ModelName,
		e.Created,
		NewFinishExtra(blockID, stage),
	)
	if err != nil {
		return err
	}
	if payload == "" {
		return nil
	}
	return e.emit(payload)
}
|
||||
|
||||
// EmitDone emits the OpenAI-compatible stream terminator marker.
func (e *ChunkEmitter) EmitDone() error {
	if e == nil || e.emit == nil {
		return nil
	}
	return e.emit("[DONE]")
}
|
||||
|
||||
// EmitStreamAssistantText reads chunk-by-chunk from the StreamReader and
// pushes assistant body text in real time.
//
// Responsibility boundary:
// 1. Converts each chunk to an SSE payload immediately;
// 2. Accumulates and returns the full text so the caller can write history;
// 3. Does not open/close the StreamReader — lifecycle belongs to the caller.
func (e *ChunkEmitter) EmitStreamAssistantText(
	ctx context.Context,
	reader llmservice.StreamReader,
	blockID, stage string,
) (string, error) {
	if e == nil || reader == nil {
		return "", nil
	}

	var fullText strings.Builder
	firstChunk := true
	digestor, digestorErr := e.NewReasoningDigestor(ctx, blockID, stage)
	if digestorErr != nil {
		return "", digestorErr
	}
	defer func() {
		if digestor != nil {
			_ = digestor.Close(ctx)
		}
	}()

	for {
		chunk, err := reader.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}
			return fullText.String(), err
		}

		// 1. Reasoning content feeds only the digestor, never the frontend.
		// 2. Without summarization it is dropped so raw reasoning_content
		//    cannot leak into the SSE stream.
		if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
			if digestor != nil {
				digestor.Append(chunk.ReasoningContent)
			}
		}

		// Push assistant body text; the first chunk carries the role field.
		if chunk != nil && chunk.Content != "" {
			if digestor != nil {
				digestor.MarkContentStarted()
			}
			if emitErr := e.EmitAssistantText(blockID, stage, chunk.Content, firstChunk); emitErr != nil {
				return fullText.String(), emitErr
			}
			fullText.WriteString(chunk.Content)
			firstChunk = false
		}
	}

	return fullText.String(), nil
}
|
||||
|
||||
// EmitStreamReasoningText reads reasoning chunk-by-chunk from the
// StreamReader and turns it into low-frequency thinking_summary events.
//
// Same shape as EmitStreamAssistantText but never emits raw
// ReasoningContent; for flows that show only thinking, no body text.
// Note: whitespace-only reasoning chunks are excluded from the returned
// accumulated text as well.
func (e *ChunkEmitter) EmitStreamReasoningText(
	ctx context.Context,
	reader llmservice.StreamReader,
	blockID, stage string,
) (string, error) {
	if e == nil || reader == nil {
		return "", nil
	}

	var fullText strings.Builder
	digestor, digestorErr := e.NewReasoningDigestor(ctx, blockID, stage)
	if digestorErr != nil {
		return "", digestorErr
	}
	defer func() {
		if digestor != nil {
			_ = digestor.Close(ctx)
		}
	}()

	for {
		chunk, err := reader.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}
			return fullText.String(), err
		}

		if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
			if digestor != nil {
				digestor.Append(chunk.ReasoningContent)
			}
			fullText.WriteString(chunk.ReasoningContent)
		}
	}

	return fullText.String(), nil
}
|
||||
|
||||
// EmitStageAsReasoning is the legacy entry that used to disguise a stage
// hint as a reasoning chunk.
//
// Compatibility:
// 1. Signature kept so old call sites keep working;
// 2. Implementation now routes through ChunkEmitter + status extra; stage
//    doubles as blockID/stage/code on purpose;
// 3. New flows should skip this shim and use the structured methods.
func EmitStageAsReasoning(emit PayloadEmitter, requestID, modelName string, created int64, stage, detail string, includeRole bool) error {
	return NewChunkEmitter(emit, requestID, modelName, created).EmitStatus(stage, stage, stage, detail, includeRole)
}
|
||||
|
||||
// EmitAssistantReply pushes one complete body text as a single assistant
// chunk.
//
// Notes:
// 1. Sends the whole text at once, no chunking;
// 2. Flows wanting a natural reading rhythm should call
//    EmitPseudoAssistantText instead;
// 3. blockID and stage stay empty for legacy callers.
func EmitAssistantReply(emit PayloadEmitter, requestID, modelName string, created int64, content string, includeRole bool) error {
	return NewChunkEmitter(emit, requestID, modelName, created).EmitAssistantText("", "", content, includeRole)
}
|
||||
|
||||
// EmitFinish emits the unified stop chunk (free-function legacy wrapper).
func EmitFinish(emit PayloadEmitter, requestID, modelName string, created int64) error {
	return NewChunkEmitter(emit, requestID, modelName, created).EmitFinish("", "")
}
|
||||
|
||||
// EmitDone emits the OpenAI-compatible terminator (free-function wrapper).
func EmitDone(emit PayloadEmitter) error {
	return NewChunkEmitter(emit, "", "", 0).EmitDone()
}
|
||||
|
||||
// buildStageReasoningText renders a stage/detail pair as user-facing text.
// NOTE(review): looks unused now that status events ship via extra only —
// kept for the legacy reasoning-content path; confirm before removing.
func buildStageReasoningText(stage, detail string) string {
	s := strings.TrimSpace(stage)
	d := strings.TrimSpace(detail)
	if s == "" {
		return d
	}
	if d == "" {
		return fmt.Sprintf("阶段:%s", s)
	}
	return fmt.Sprintf("阶段:%s\n%s", s, d)
}
|
||||
|
||||
// buildToolCallReasoningText renders a tool-call announcement as readable
// lines, omitting blank parts.
// NOTE(review): looks unused now that tool events ship via extra only —
// confirm before removing.
func buildToolCallReasoningText(toolName, summary, argumentsPreview string) string {
	name := strings.TrimSpace(toolName)
	sum := strings.TrimSpace(summary)
	args := strings.TrimSpace(argumentsPreview)

	var parts []string
	if name != "" {
		parts = append(parts, fmt.Sprintf("正在调用工具:%s", name))
	}
	if sum != "" {
		parts = append(parts, sum)
	}
	if args != "" {
		parts = append(parts, fmt.Sprintf("参数摘要:%s", args))
	}
	return strings.TrimSpace(strings.Join(parts, "\n"))
}
|
||||
|
||||
// buildToolResultReasoningText renders a tool result as readable text.
// NOTE(review): looks unused now that tool events ship via extra only —
// confirm before removing.
func buildToolResultReasoningText(toolName, summary string) string {
	name := strings.TrimSpace(toolName)
	sum := strings.TrimSpace(summary)
	if name == "" {
		return sum
	}
	if sum == "" {
		return fmt.Sprintf("工具结果:%s", name)
	}
	return fmt.Sprintf("工具结果:%s\n%s", name, sum)
}
|
||||
|
||||
// buildConfirmAssistantText joins a confirm title and summary with a newline,
// skipping whichever side is blank.
func buildConfirmAssistantText(title, summary string) string {
	t := strings.TrimSpace(title)
	s := strings.TrimSpace(summary)
	if t == "" {
		return s
	}
	if s == "" {
		return t
	}
	return t + "\n" + s
}
|
||||
|
||||
// buildInterruptAssistantText renders an interrupt notice; with both parts
// present it announces the interaction type, otherwise returns whichever
// side is non-blank (summary preferred).
func buildInterruptAssistantText(interactionType, summary string) string {
	kind := strings.TrimSpace(interactionType)
	body := strings.TrimSpace(summary)
	if kind != "" && body != "" {
		return fmt.Sprintf("当前进入 %s 阶段。\n%s", kind, body)
	}
	if body != "" {
		return body
	}
	return kind
}
|
||||
|
||||
// emitPseudoText chunks text and feeds it to emitChunk with optional pacing
// between chunks; the first chunk is flagged so the role field goes out once.
func (e *ChunkEmitter) emitPseudoText(ctx context.Context, text string, options PseudoStreamOptions, emitChunk func(chunk string, includeRole bool) error) error {
	if emitChunk == nil {
		return nil
	}
	// Strip leading whitespace and trailing spaces/tabs/CRs only — a trailing
	// \n is kept so paragraph separators added upstream reach the client.
	text = strings.TrimRight(strings.TrimLeft(text, " \t\r\n"), " \t\r")
	if text == "" {
		return nil
	}

	chunks := SplitPseudoStreamText(text, options)
	for i, chunk := range chunks {
		if err := emitChunk(chunk, i == 0); err != nil {
			return err
		}
		// No delay after the last chunk.
		if i < len(chunks)-1 {
			if err := waitPseudoStreamInterval(ctx, options.ChunkInterval); err != nil {
				return err
			}
		}
	}
	return nil
}
|
||||
|
||||
// SplitPseudoStreamText splits a whole text on "punctuation first, length as
// fallback".
//
// Steps:
// 1. Prefer natural boundaries (sentence punctuation, newline) for smooth
//    reading;
// 2. Force a flush at MaxChunkRunes when no boundary shows up for too long;
// 3. Works in runes so multi-byte (CJK) characters are never cut in half.
func SplitPseudoStreamText(text string, options PseudoStreamOptions) []string {
	// Remember whether the input ends in \n before trimming, so the final
	// chunk can restore the paragraph separator.
	hasTrailingNewline := strings.HasSuffix(strings.TrimRight(text, " \t"), "\n")
	text = strings.TrimRight(strings.TrimLeft(text, " \t\r\n"), " \t\r")
	if text == "" {
		return nil
	}

	options = normalizePseudoStreamOptions(options)
	runes := []rune(text)
	if len(runes) <= options.MaxChunkRunes {
		// Already trimmed with TrimRight(" \t\r"): any trailing \n survives,
		// so return as-is without appending another.
		return []string{text}
	}

	chunks := make([]string, 0, len(runes)/options.MinChunkRunes+1)
	start := 0
	size := 0
	for i, r := range runes {
		size++

		shouldFlush := false
		if size >= options.MaxChunkRunes {
			shouldFlush = true
		}
		if size >= options.MinChunkRunes && isPseudoStreamBoundary(r) {
			shouldFlush = true
		}
		if !shouldFlush {
			continue
		}

		// Trim(" \t\r") instead of TrimSpace: keep \n inside chunks
		// (paragraph separators). TrimSpace would delete the \n at a flush
		// boundary plus the next chunk's leading \n, gluing text together.
		chunk := strings.Trim(string(runes[start:i+1]), " \t\r")
		if chunk != "" {
			chunks = append(chunks, chunk)
		}
		start = i + 1
		size = 0
	}

	if start < len(runes) {
		chunk := strings.Trim(string(runes[start:]), " \t\r")
		if chunk != "" {
			chunks = append(chunks, chunk)
		}
	}

	if len(chunks) == 0 {
		return []string{text}
	}
	// Append \n only when the last chunk does not already end with one, to
	// avoid a double newline after the Trim fix above.
	if hasTrailingNewline && !strings.HasSuffix(chunks[len(chunks)-1], "\n") {
		chunks[len(chunks)-1] += "\n"
	}
	return chunks
}
|
||||
|
||||
func normalizePseudoStreamOptions(options PseudoStreamOptions) PseudoStreamOptions {
|
||||
if options.MinChunkRunes <= 0 {
|
||||
options.MinChunkRunes = defaultPseudoStreamMinChunkRunes
|
||||
}
|
||||
if options.MaxChunkRunes <= 0 {
|
||||
options.MaxChunkRunes = defaultPseudoStreamMaxChunkRunes
|
||||
}
|
||||
if options.MaxChunkRunes < options.MinChunkRunes {
|
||||
options.MaxChunkRunes = options.MinChunkRunes
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
// isPseudoStreamBoundary reports whether r is a natural break point for
// pseudo-stream chunking: CJK or ASCII sentence punctuation, or a newline.
func isPseudoStreamBoundary(r rune) bool {
	const boundaries = "。!?;:,.!?;:,\n"
	return strings.ContainsRune(boundaries, r)
}
|
||||
|
||||
func waitPseudoStreamInterval(ctx context.Context, interval time.Duration) error {
|
||||
if interval <= 0 {
|
||||
return nil
|
||||
}
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
timer := time.NewTimer(interval)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-timer.C:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
404
backend/services/agent/stream/openai.go
Normal file
404
backend/services/agent/stream/openai.go
Normal file
@@ -0,0 +1,404 @@
|
||||
package agentstream
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/cloudwego/eino/schema"
|
||||
)
|
||||
|
||||
// OpenAIChunkResponse is the OpenAI-compatible streaming chunk DTO.
//
// Design notes:
// 1. The outer shell stays OpenAI-compatible so frontends and debug tools
//    need no big-bang rewrite;
// 2. The top-level Extra carries structured events (tool calls / confirm
//    requests / interrupt-resume);
// 3. Old frontends keep reading delta.content / delta.reasoning_content;
//    new ones progressively consume extra.
type OpenAIChunkResponse struct {
	ID      string              `json:"id"`
	Object  string              `json:"object"`
	Created int64               `json:"created"`
	Model   string              `json:"model"`
	Choices []OpenAIChunkChoice `json:"choices,omitempty"`
	Extra   *OpenAIChunkExtra   `json:"extra,omitempty"`
}
|
||||
|
||||
// OpenAIChunkChoice mirrors OpenAI's choices[0] entry.
type OpenAIChunkChoice struct {
	Index        int              `json:"index"`
	Delta        OpenAIChunkDelta `json:"delta"`
	FinishReason *string          `json:"finish_reason"`
}
|
||||
|
||||
// OpenAIChunkDelta is where role/content/reasoning actually live.
type OpenAIChunkDelta struct {
	Role             string `json:"role,omitempty"`
	Content          string `json:"content,omitempty"`
	ReasoningContent string `json:"reasoning_content,omitempty"`
}
|
||||
|
||||
// StreamExtraKind labels the business meaning of a chunk's structured event.
type StreamExtraKind string

const (
	StreamExtraKindReasoningText    StreamExtraKind = "reasoning_text"
	StreamExtraKindThinkingSummary  StreamExtraKind = "thinking_summary"
	StreamExtraKindAssistantText    StreamExtraKind = "assistant_text"
	StreamExtraKindStatus           StreamExtraKind = "status"
	StreamExtraKindToolCall         StreamExtraKind = "tool_call"
	StreamExtraKindToolResult       StreamExtraKind = "tool_result"
	StreamExtraKindConfirm          StreamExtraKind = "confirm_request"
	StreamExtraKindInterrupt        StreamExtraKind = "interrupt"
	StreamExtraKindBusinessCard     StreamExtraKind = "business_card"
	StreamExtraKindFinish           StreamExtraKind = "finish"
	StreamExtraKindScheduleCompleted StreamExtraKind = "schedule_completed"
)
|
||||
|
||||
// StreamDisplayMode hints how the frontend should render the event.
type StreamDisplayMode string

const (
	StreamDisplayModeAppend  StreamDisplayMode = "append"
	StreamDisplayModeReplace StreamDisplayMode = "replace"
	StreamDisplayModeCard    StreamDisplayMode = "card"
)
|
||||
|
||||
// OpenAIChunkExtra is the structured extension hung off the OpenAI shell.
//
// Responsibility boundary:
// 1. Kind / Stage / BlockID give the frontend the minimal metadata for
//    layout and grouping;
// 2. Status / Tool / Confirm / Interrupt / BusinessCard hold display-layer
//    summaries only, never full backend state objects;
// 3. Meta is reserved for gradual extension so small fields don't force a
//    DTO change each time.
type OpenAIChunkExtra struct {
	Kind            StreamExtraKind             `json:"kind,omitempty"`
	BlockID         string                      `json:"block_id,omitempty"`
	Stage           string                      `json:"stage,omitempty"`
	DisplayMode     StreamDisplayMode           `json:"display_mode,omitempty"`
	ThinkingSummary *StreamThinkingSummaryExtra `json:"thinking_summary,omitempty"`
	Status          *StreamStatusExtra          `json:"status,omitempty"`
	Tool            *StreamToolExtra            `json:"tool,omitempty"`
	Confirm         *StreamConfirmExtra         `json:"confirm,omitempty"`
	Interrupt       *StreamInterruptExtra       `json:"interrupt,omitempty"`
	BusinessCard    *StreamBusinessCardExtra    `json:"business_card,omitempty"`
	Meta            map[string]any              `json:"meta,omitempty"`
}
|
||||
|
||||
// StreamThinkingSummaryExtra is one streamed thinking-summary event.
//
// Boundary:
// 1. ShortSummary is for quick SSE display and need not match persisted
//    content exactly;
// 2. DetailSummary is the fuller summary text the persistence layer reuses;
// 3. SummarySeq / Final / DurationSeconds come from the summary scheduler;
//    the frontend uses them for dedupe and ordering.
type StreamThinkingSummaryExtra struct {
	SummarySeq      int     `json:"summary_seq,omitempty"`
	ShortSummary    string  `json:"short_summary,omitempty"`
	DetailSummary   string  `json:"detail_summary,omitempty"`
	Final           bool    `json:"final,omitempty"`
	DurationSeconds float64 `json:"duration_seconds,omitempty"`
}
|
||||
|
||||
// StreamStatusExtra is an ordinary stage-status or hint event.
type StreamStatusExtra struct {
	Code    string `json:"code,omitempty"`
	Summary string `json:"summary,omitempty"`
}
|
||||
|
||||
// StreamToolExtra is one tool-call-related event.
type StreamToolExtra struct {
	Name             string         `json:"name,omitempty"`
	Status           string         `json:"status,omitempty"`
	Summary          string         `json:"summary,omitempty"`
	ArgumentsPreview string         `json:"arguments_preview,omitempty"`
	ArgumentView     map[string]any `json:"argument_view,omitempty"`
	ResultView       map[string]any `json:"result_view,omitempty"`
}
|
||||
|
||||
// StreamConfirmExtra is the display summary of one pending-confirmation event.
type StreamConfirmExtra struct {
	InteractionID string `json:"interaction_id,omitempty"`
	Title         string `json:"title,omitempty"`
	Summary       string `json:"summary,omitempty"`
}
|
||||
|
||||
// StreamInterruptExtra is the display summary of one interrupt event.
type StreamInterruptExtra struct {
	InteractionID string `json:"interaction_id,omitempty"`
	Type          string `json:"type,omitempty"`
	Summary       string `json:"summary,omitempty"`
}
|
||||
|
||||
// StreamBusinessCardExtra represents one business result card.
//
// Responsibilities:
//  1. CardType only allows card types already agreed with the frontend
//     (task_query / task_record);
//  2. Source is meaningful only for task_record; other card types may leave it
//     empty;
//  3. Data carries the minimal renderable snapshot so the frontend does not
//     need a follow-up fetch to show the result.
type StreamBusinessCardExtra struct {
	CardType string         `json:"card_type,omitempty"`
	Title    string         `json:"title,omitempty"`
	Summary  string         `json:"summary,omitempty"`
	Source   string         `json:"source,omitempty"`
	Data     map[string]any `json:"data,omitempty"`
}
|
||||
|
||||
// ToOpenAIStream converts an Eino message into an OpenAI-compatible chunk
// payload. It is a convenience wrapper around ToOpenAIStreamWithExtra with no
// extra attached.
func ToOpenAIStream(chunk *schema.Message, requestID, modelName string, created int64, includeRole bool) (string, error) {
	return ToOpenAIStreamWithExtra(chunk, requestID, modelName, created, includeRole, nil)
}
|
||||
|
||||
// ToOpenAIStreamWithExtra 把 Eino message 转成带 extra 的 OpenAI 兼容 chunk。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1. 负责把 chunk.Content / chunk.ReasoningContent 映射到协议字段;
|
||||
// 2. 负责挂载可选 extra,供前端识别工具调用、确认请求等结构化事件;
|
||||
// 3. 不负责发送,也不负责决定“这个 chunk 该不该推”。
|
||||
func ToOpenAIStreamWithExtra(chunk *schema.Message, requestID, modelName string, created int64, includeRole bool, extra *OpenAIChunkExtra) (string, error) {
|
||||
delta := OpenAIChunkDelta{}
|
||||
if includeRole {
|
||||
delta.Role = "assistant"
|
||||
}
|
||||
if chunk != nil {
|
||||
delta.Content = chunk.Content
|
||||
delta.ReasoningContent = chunk.ReasoningContent
|
||||
}
|
||||
return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil, extra)
|
||||
}
|
||||
|
||||
// ToOpenAIReasoningChunk directly constructs a reasoning chunk payload with no
// extra attached.
func ToOpenAIReasoningChunk(requestID, modelName string, created int64, reasoning string, includeRole bool) (string, error) {
	return ToOpenAIReasoningChunkWithExtra(requestID, modelName, created, reasoning, includeRole, nil)
}
|
||||
|
||||
// ToOpenAIReasoningChunkWithExtra 直接构造一个带 extra 的 reasoning chunk。
|
||||
func ToOpenAIReasoningChunkWithExtra(requestID, modelName string, created int64, reasoning string, includeRole bool, extra *OpenAIChunkExtra) (string, error) {
|
||||
delta := OpenAIChunkDelta{ReasoningContent: reasoning}
|
||||
if includeRole {
|
||||
delta.Role = "assistant"
|
||||
}
|
||||
return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil, extra)
|
||||
}
|
||||
|
||||
// ToOpenAIAssistantChunk directly constructs an assistant-content chunk payload
// with no extra attached.
func ToOpenAIAssistantChunk(requestID, modelName string, created int64, content string, includeRole bool) (string, error) {
	return ToOpenAIAssistantChunkWithExtra(requestID, modelName, created, content, includeRole, nil)
}
|
||||
|
||||
// ToOpenAIAssistantChunkWithExtra 直接构造一个带 extra 的正文 chunk。
|
||||
func ToOpenAIAssistantChunkWithExtra(requestID, modelName string, created int64, content string, includeRole bool, extra *OpenAIChunkExtra) (string, error) {
|
||||
delta := OpenAIChunkDelta{Content: content}
|
||||
if includeRole {
|
||||
delta.Role = "assistant"
|
||||
}
|
||||
return buildOpenAIChunkPayload(requestID, modelName, created, delta, nil, extra)
|
||||
}
|
||||
|
||||
// ToOpenAIFinishStream produces the stream-terminating chunk
// (finish_reason=stop) with no extra attached.
func ToOpenAIFinishStream(requestID, modelName string, created int64) (string, error) {
	return ToOpenAIFinishStreamWithExtra(requestID, modelName, created, nil)
}
|
||||
|
||||
// ToOpenAIFinishStreamWithExtra 生成带 extra 的流式结束 chunk。
|
||||
func ToOpenAIFinishStreamWithExtra(requestID, modelName string, created int64, extra *OpenAIChunkExtra) (string, error) {
|
||||
stop := "stop"
|
||||
return buildOpenAIChunkPayload(requestID, modelName, created, OpenAIChunkDelta{}, &stop, extra)
|
||||
}
|
||||
|
||||
// NewReasoningTextExtra 创建“思考文字”事件的 extra。
|
||||
func NewReasoningTextExtra(blockID, stage string) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindReasoningText,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeAppend,
|
||||
}
|
||||
}
|
||||
|
||||
// NewThinkingSummaryExtra 创建“流式思考摘要”事件的 extra。
|
||||
func NewThinkingSummaryExtra(blockID, stage string, summary StreamThinkingSummaryExtra) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindThinkingSummary,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeAppend,
|
||||
ThinkingSummary: &summary,
|
||||
}
|
||||
}
|
||||
|
||||
// NewAssistantTextExtra 创建“正文文字”事件的 extra。
|
||||
func NewAssistantTextExtra(blockID, stage string) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindAssistantText,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeAppend,
|
||||
}
|
||||
}
|
||||
|
||||
// NewStatusExtra 创建普通状态事件的 extra。
|
||||
func NewStatusExtra(blockID, stage, code, summary string) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindStatus,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeCard,
|
||||
Status: &StreamStatusExtra{
|
||||
Code: code,
|
||||
Summary: summary,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NewToolCallExtra 创建“工具调用开始/中间态”事件的 extra。
|
||||
func NewToolCallExtra(blockID, stage, toolName, status, summary, argumentsPreview string) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindToolCall,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeCard,
|
||||
Tool: &StreamToolExtra{
|
||||
Name: toolName,
|
||||
Status: status,
|
||||
Summary: summary,
|
||||
ArgumentsPreview: argumentsPreview,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NewToolResultExtra 创建“工具结果”事件的 extra。
|
||||
func NewToolResultExtra(
|
||||
blockID string,
|
||||
stage string,
|
||||
toolName string,
|
||||
status string,
|
||||
summary string,
|
||||
argumentsPreview string,
|
||||
argumentView map[string]any,
|
||||
resultView map[string]any,
|
||||
) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindToolResult,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeCard,
|
||||
Tool: &StreamToolExtra{
|
||||
Name: toolName,
|
||||
Status: status,
|
||||
Summary: summary,
|
||||
ArgumentsPreview: argumentsPreview,
|
||||
ArgumentView: argumentView,
|
||||
ResultView: resultView,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NewConfirmRequestExtra 创建“待确认”事件的 extra。
|
||||
func NewConfirmRequestExtra(blockID, stage, interactionID, title, summary string) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindConfirm,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeCard,
|
||||
Confirm: &StreamConfirmExtra{
|
||||
InteractionID: interactionID,
|
||||
Title: title,
|
||||
Summary: summary,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NewInterruptExtra 创建“中断”事件的 extra。
|
||||
func NewInterruptExtra(blockID, stage, interactionID, interactionType, summary string) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindInterrupt,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeCard,
|
||||
Interrupt: &StreamInterruptExtra{
|
||||
InteractionID: interactionID,
|
||||
Type: interactionType,
|
||||
Summary: summary,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// NewBusinessCardExtra 创建“业务结果卡片”事件的 extra。
|
||||
func NewBusinessCardExtra(blockID, stage string, businessCard *StreamBusinessCardExtra) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindBusinessCard,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeCard,
|
||||
BusinessCard: businessCard,
|
||||
}
|
||||
}
|
||||
|
||||
// NewScheduleCompletedExtra 创建”排程完毕”卡片事件的 extra。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1. 仅作为前端渲染”排程完毕小卡片”的信号,不携带排程数据;
|
||||
// 2. 前端收到此事件后,自行通过对话 ID 调用现有接口拉取排程详情;
|
||||
// 3. 触发条件:CommonState.HasScheduleChanges == true 且 IsCompleted()。
|
||||
func NewScheduleCompletedExtra(blockID, stage string) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindScheduleCompleted,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeCard,
|
||||
}
|
||||
}
|
||||
|
||||
// NewFinishExtra 创建”收尾完成”事件的 extra。
|
||||
func NewFinishExtra(blockID, stage string) *OpenAIChunkExtra {
|
||||
return &OpenAIChunkExtra{
|
||||
Kind: StreamExtraKindFinish,
|
||||
BlockID: blockID,
|
||||
Stage: stage,
|
||||
DisplayMode: StreamDisplayModeReplace,
|
||||
}
|
||||
}
|
||||
|
||||
func buildOpenAIChunkPayload(requestID, modelName string, created int64, delta OpenAIChunkDelta, finishReason *string, extra *OpenAIChunkExtra) (string, error) {
|
||||
// 1. 若既没有 role,也没有正文/思考,也没有 finish_reason,且也没有 extra,则视为“空块”,直接跳过。
|
||||
// 2. 这样后续 emitter 即使拆成“结构化事件 + 文本事件”双轨,也能复用统一的空块兜底。
|
||||
if delta.Role == "" && delta.Content == "" && delta.ReasoningContent == "" && finishReason == nil && !hasStreamExtra(extra) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
choices := make([]OpenAIChunkChoice, 0, 1)
|
||||
if delta.Role != "" || delta.Content != "" || delta.ReasoningContent != "" || finishReason != nil {
|
||||
choices = append(choices, OpenAIChunkChoice{
|
||||
Index: 0,
|
||||
Delta: delta,
|
||||
FinishReason: finishReason,
|
||||
})
|
||||
}
|
||||
|
||||
dto := OpenAIChunkResponse{
|
||||
ID: requestID,
|
||||
Object: "chat.completion.chunk",
|
||||
Created: created,
|
||||
Model: modelName,
|
||||
Choices: choices,
|
||||
Extra: extra,
|
||||
}
|
||||
data, err := json.Marshal(dto)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(data), nil
|
||||
}
|
||||
|
||||
func hasStreamExtra(extra *OpenAIChunkExtra) bool {
|
||||
if extra == nil {
|
||||
return false
|
||||
}
|
||||
return extra.Kind != "" ||
|
||||
extra.BlockID != "" ||
|
||||
extra.Stage != "" ||
|
||||
extra.DisplayMode != "" ||
|
||||
extra.ThinkingSummary != nil ||
|
||||
extra.Status != nil ||
|
||||
extra.Tool != nil ||
|
||||
extra.Confirm != nil ||
|
||||
extra.Interrupt != nil ||
|
||||
extra.BusinessCard != nil ||
|
||||
len(extra.Meta) > 0
|
||||
}
|
||||
599
backend/services/agent/stream/reasoning_digestor.go
Normal file
599
backend/services/agent/stream/reasoning_digestor.go
Normal file
@@ -0,0 +1,599 @@
|
||||
package agentstream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
const (
	// defaultReasoningDigestMinNewRunes is the default minimum number of newly
	// appended runes before an automatic summary is considered.
	defaultReasoningDigestMinNewRunes = 120
	// defaultReasoningDigestMinNewTokens is the default minimum estimated token
	// delta before an automatic summary is considered.
	defaultReasoningDigestMinNewTokens = 80
	// defaultReasoningDigestMinInterval is the default minimum spacing between
	// two summary requests.
	defaultReasoningDigestMinInterval = 3 * time.Second
)
|
||||
|
||||
// ReasoningSummaryFunc performs the actual summary-model invocation.
//
// Responsibilities:
//  1. It only turns the input into one summary result — no scheduling,
//     throttling, content gating, or result dropping;
//  2. short/detail in the return value are filled by the model or its adapter;
//  3. summary_seq / final / duration_seconds are filled uniformly by
//     ReasoningDigestor so callers never maintain runtime fields themselves.
type ReasoningSummaryFunc func(ctx context.Context, input ReasoningSummaryInput) (StreamThinkingSummaryExtra, error)
|
||||
|
||||
// ReasoningSummarySink consumes one summary result that already passed the
// gate checks.
//
// Responsibilities:
//  1. Typical use hands the result to ChunkEmitter.EmitThinkingSummary;
//  2. The callback takes no part in single-flight, retries, or watermark logic;
//  3. With a nil callback, the digestor still maintains LatestSummary so
//     callers can pull the result on demand.
type ReasoningSummarySink func(summary StreamThinkingSummaryExtra)
|
||||
|
||||
// ReasoningSummaryInput is the unified input injected into the summary-model
// caller.
//
// Responsibilities:
//  1. FullReasoning provides the complete reasoning buffer for full
//     re-summarization;
//  2. DeltaReasoning + PreviousSummary provide incremental context for
//     continuing an earlier summary;
//  3. CandidateSeq / Final / DurationSeconds only express the scheduler's
//     intent; the model is not required to echo them back.
type ReasoningSummaryInput struct {
	FullReasoning   string                      `json:"full_reasoning,omitempty"`
	DeltaReasoning  string                      `json:"delta_reasoning,omitempty"`
	PreviousSummary *StreamThinkingSummaryExtra `json:"previous_summary,omitempty"`
	CandidateSeq    int                         `json:"candidate_seq,omitempty"`
	Final           bool                        `json:"final,omitempty"`
	DurationSeconds float64                     `json:"duration_seconds,omitempty"`
}
|
||||
|
||||
// ReasoningDigestorOptions describes the scheduling parameters of the
// reasoning summarizer. SummaryFunc is required; all other fields default in
// NewReasoningDigestor.
type ReasoningDigestorOptions struct {
	// SummaryFunc performs the model call (required).
	SummaryFunc ReasoningSummaryFunc
	// SummarySink receives published summaries (optional).
	SummarySink ReasoningSummarySink
	// BaseContext parents automatically scheduled summary calls.
	BaseContext context.Context
	// MinNewRunes / MinNewTokens are the delta watermarks for auto triggers.
	MinNewRunes  int
	MinNewTokens int
	// MinInterval is the minimum spacing between two summary requests.
	MinInterval time.Duration
	// SummaryTimeout bounds a single summary call; <= 0 means no timeout.
	SummaryTimeout time.Duration
	// Now overrides the clock (for tests).
	Now func() time.Time
}
|
||||
|
||||
// ReasoningDigestor condenses streamed reasoning text into low-frequency
// summary events.
//
// Responsibilities:
//  1. Only buffering, single-flight, watermarks, the content gate, and
//     Flush/Close — no direct dependency on AgentService;
//  2. Model invocation and result consumption are injected solely via
//     SummaryFunc / SummarySink; no model selection happens here;
//  3. Once body output starts or the gate is explicitly closed, any later
//     summary result is dropped even on success, so the frontend and the
//     persistence layer never see out-of-bounds data.
type ReasoningDigestor struct {
	summaryFunc    ReasoningSummaryFunc
	summarySink    ReasoningSummarySink
	baseContext    context.Context
	minNewRunes    int
	minNewTokens   int
	minInterval    time.Duration
	summaryTimeout time.Duration
	now            func() time.Time

	// mu guards everything below; cond signals when inFlight drops.
	mu   sync.Mutex
	cond *sync.Cond
	// buffer holds the full reasoning text; deltaBuffer only what arrived since
	// the last summary request.
	buffer      strings.Builder
	deltaBuffer strings.Builder
	// startedAt marks the first Append; lastRequestAt the last summary request.
	startedAt     time.Time
	lastRequestAt time.Time
	// pendingRunes/pendingTokens track the delta against the watermarks.
	pendingRunes  int
	pendingTokens int
	summarySeq    int
	latestSummary *StreamThinkingSummaryExtra
	finalEmitted  bool
	// inFlight enforces single-flight; gateClosed/contentStarted/closed are the
	// gate states that drop late results.
	inFlight       bool
	gateClosed     bool
	contentStarted bool
	closed         bool
	// timer is the fallback wake-up used when the min interval has not elapsed.
	timer         *time.Timer
	timerArmed    bool
	currentCancel context.CancelFunc
}
|
||||
|
||||
// reasoningDigestCall carries everything one summary invocation needs: its
// context, the cancel func releasing that context, the immutable input
// snapshot, and whether this call was requested as the final summary.
type reasoningDigestCall struct {
	ctx   context.Context
	stop  context.CancelFunc
	input ReasoningSummaryInput
	final bool
}
|
||||
|
||||
// NewReasoningDigestor creates an object focused solely on scheduling streamed
// thinking summaries.
//
// Steps:
//  1. Validate SummaryFunc first; it is the only required dependency, because
//     the digestor never selects a model here;
//  2. Fill in default watermarks and the minimum interval so callers can start
//     with only the core dependency supplied;
//  3. Initialize just the concurrency primitives; no resident main loop is
//     started at construction time, avoiding extra goroutine lifecycle burden.
func NewReasoningDigestor(options ReasoningDigestorOptions) (*ReasoningDigestor, error) {
	if options.SummaryFunc == nil {
		return nil, errors.New("reasoning digestor: SummaryFunc 不能为空")
	}

	// Negative watermarks are normalized to zero (treated as disabled).
	if options.MinNewRunes < 0 {
		options.MinNewRunes = 0
	}
	if options.MinNewTokens < 0 {
		options.MinNewTokens = 0
	}
	// If both watermarks are disabled, fall back to the package defaults so the
	// digestor never fires on every single chunk.
	if options.MinNewRunes == 0 && options.MinNewTokens == 0 {
		options.MinNewRunes = defaultReasoningDigestMinNewRunes
		options.MinNewTokens = defaultReasoningDigestMinNewTokens
	}
	if options.MinInterval <= 0 {
		options.MinInterval = defaultReasoningDigestMinInterval
	}
	if options.BaseContext == nil {
		options.BaseContext = context.Background()
	}
	if options.Now == nil {
		options.Now = time.Now
	}

	digestor := &ReasoningDigestor{
		summaryFunc:    options.SummaryFunc,
		summarySink:    options.SummarySink,
		baseContext:    options.BaseContext,
		minNewRunes:    options.MinNewRunes,
		minNewTokens:   options.MinNewTokens,
		minInterval:    options.MinInterval,
		summaryTimeout: options.SummaryTimeout,
		now:            options.Now,
	}
	digestor.cond = sync.NewCond(&digestor.mu)
	return digestor, nil
}
|
||||
|
||||
// Append appends one reasoning chunk and, based on the watermark, may trigger
// a background summary.
//
// Steps:
//  1. Write the raw reasoning text into the full buffer so Flush/Close can see
//     the complete context;
//  2. Record the newly added text in deltaBuffer plus the rune/token
//     watermarks used for the "minimum delta" check;
//  3. If the content gate is already closed, only keep the buffered snapshot
//     and stop scheduling summaries;
//  4. If a summary request is already in flight, only the dirty/latest state
//     is updated — no second request is queued; the single-flight caller
//     decides whether to follow up after it returns.
func (d *ReasoningDigestor) Append(reasoning string) {
	if d == nil || reasoning == "" {
		return
	}

	var call reasoningDigestCall
	var shouldStart bool

	d.mu.Lock()
	if d.closed {
		d.mu.Unlock()
		return
	}

	// The first chunk marks the start of the thinking phase for duration
	// accounting.
	if d.startedAt.IsZero() {
		d.startedAt = d.now()
	}
	d.buffer.WriteString(reasoning)

	if d.gateClosed || d.contentStarted {
		d.mu.Unlock()
		return
	}

	d.deltaBuffer.WriteString(reasoning)
	d.pendingRunes += utf8.RuneCountInString(reasoning)
	d.pendingTokens += estimateReasoningTokens(reasoning)
	// New reasoning invalidates a previously emitted final summary.
	d.finalEmitted = false

	call, shouldStart = d.prepareSummaryLocked(d.baseContext, false, false)
	d.mu.Unlock()

	if shouldStart {
		go d.runSummary(call)
	}
}
|
||||
|
||||
// MarkContentStarted records that body output has begun.
//
// Responsibilities:
//  1. It closes the summary gate immediately;
//  2. It does not revoke already-published summaries, but any still-unfinished
//     summary call will have its result dropped;
//  3. After this call, Append still buffers reasoning but never triggers a new
//     summary.
func (d *ReasoningDigestor) MarkContentStarted() {
	d.closeGate(true)
}
|
||||
|
||||
// CloseGate explicitly closes the summary gate without declaring that body
// output has started.
func (d *ReasoningDigestor) CloseGate() {
	d.closeGate(false)
}
|
||||
|
||||
// Flush attempts to publish one last summary while body output has not yet
// started.
//
// Steps:
//  1. Wait for the current single-flight request to finish so Flush never
//     races a background summary into a duplicate run;
//  2. Return immediately if body output already started or the gate is closed;
//  3. Skip if a final summary was already emitted and no new reasoning
//     arrived, avoiding duplicate final events;
//  4. Otherwise force one summary even if the delta has not reached the
//     automatic trigger watermark.
func (d *ReasoningDigestor) Flush(ctx context.Context) error {
	if d == nil {
		return nil
	}

	call, shouldStart := d.prepareFlushCall(ctx)
	if !shouldStart {
		return nil
	}
	// Run synchronously: Flush callers expect the final summary to be
	// published (or to fail) before this method returns.
	return d.runSummary(call)
}
|
||||
|
||||
// Close ends the digestor's lifecycle.
//
// Steps:
//  1. If body output has not started yet, try to Flush one final summary
//     first;
//  2. Close the gate, stop a pending fallback timer, and cancel an in-flight
//     summary call;
//  3. Wait until the single-flight call fully exits so no background goroutine
//     keeps writing results afterwards.
func (d *ReasoningDigestor) Close(ctx context.Context) error {
	if d == nil {
		return nil
	}

	if err := d.Flush(ctx); err != nil {
		return err
	}

	d.mu.Lock()
	if d.closed {
		d.mu.Unlock()
		return nil
	}

	d.closed = true
	d.gateClosed = true
	d.stopTimerLocked()
	if d.currentCancel != nil {
		d.currentCancel()
	}
	// finishSummary broadcasts on cond when inFlight drops.
	for d.inFlight {
		d.cond.Wait()
	}
	d.mu.Unlock()
	return nil
}
|
||||
|
||||
// LatestSummary returns the most recent summary that passed the gate checks
// and was successfully published. The boolean reports whether any summary
// exists; the value returned is always a copy, so callers cannot mutate
// internal state.
func (d *ReasoningDigestor) LatestSummary() (StreamThinkingSummaryExtra, bool) {
	if d == nil {
		return StreamThinkingSummaryExtra{}, false
	}

	d.mu.Lock()
	defer d.mu.Unlock()

	if d.latestSummary == nil {
		return StreamThinkingSummaryExtra{}, false
	}
	return *cloneThinkingSummaryExtra(d.latestSummary), true
}
|
||||
|
||||
// closeGate closes the summary gate and discards all pending delta state.
// When markContentStarted is true it also records that body output has begun.
// Any in-flight summary call is cancelled; its late result will be dropped by
// finishSummary's gate checks.
func (d *ReasoningDigestor) closeGate(markContentStarted bool) {
	if d == nil {
		return
	}

	d.mu.Lock()
	if markContentStarted {
		d.contentStarted = true
	}
	d.gateClosed = true
	// Reset the delta accounting: nothing buffered after this point can ever
	// trigger a summary.
	d.pendingRunes = 0
	d.pendingTokens = 0
	d.deltaBuffer.Reset()
	d.stopTimerLocked()
	if d.currentCancel != nil {
		d.currentCancel()
	}
	d.mu.Unlock()
}
|
||||
|
||||
// prepareFlushCall waits out any in-flight request and, if the gate is still
// open and there is something to summarize, builds a forced final summary
// call. Gate conditions are re-checked after each cond.Wait wake-up because
// the gate can close while this goroutine was blocked.
func (d *ReasoningDigestor) prepareFlushCall(ctx context.Context) (reasoningDigestCall, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.closed || d.gateClosed || d.contentStarted {
		return reasoningDigestCall{}, false
	}

	d.stopTimerLocked()
	for d.inFlight {
		d.cond.Wait()
		if d.closed || d.gateClosed || d.contentStarted {
			return reasoningDigestCall{}, false
		}
	}

	// Nothing but whitespace was ever buffered: nothing to summarize.
	if strings.TrimSpace(d.buffer.String()) == "" {
		return reasoningDigestCall{}, false
	}
	// A final summary already went out and no new reasoning has arrived since.
	if d.finalEmitted && d.pendingRunes == 0 && d.pendingTokens == 0 {
		return reasoningDigestCall{}, false
	}
	return d.prepareSummaryLocked(ctx, true, true)
}
|
||||
|
||||
// prepareSummaryLocked builds the next summary call if the scheduling rules
// allow one. The caller must hold d.mu. force bypasses the watermark/interval
// checks (used by Flush); final marks the resulting call as the closing
// summary.
func (d *ReasoningDigestor) prepareSummaryLocked(parent context.Context, force bool, final bool) (reasoningDigestCall, bool) {
	if d.closed || d.gateClosed || d.contentStarted || d.inFlight {
		return reasoningDigestCall{}, false
	}

	fullReasoning := d.buffer.String()
	if strings.TrimSpace(fullReasoning) == "" {
		return reasoningDigestCall{}, false
	}

	// 1. An automatic summary must satisfy both the delta watermark AND the
	//    minimum interval.
	// 2. If the delta is too small, simply wait for further Append calls — no
	//    idle requests.
	// 3. If the interval has not elapsed, arm a single fallback timer instead
	//    of queueing multiple requests.
	if !force {
		if !d.reachedWatermarkLocked() {
			return reasoningDigestCall{}, false
		}
		wait := d.nextAllowedIntervalLocked()
		if wait > 0 {
			d.armTimerLocked(wait)
			return reasoningDigestCall{}, false
		}
	}

	callCtx, stop := d.newCallContext(parent)
	call := reasoningDigestCall{
		ctx:  callCtx,
		stop: stop,
		input: ReasoningSummaryInput{
			FullReasoning:   strings.Clone(fullReasoning),
			DeltaReasoning:  strings.Clone(d.deltaBuffer.String()),
			PreviousSummary: cloneThinkingSummaryExtra(d.latestSummary),
			CandidateSeq:    d.summarySeq + 1,
			Final:           final,
			DurationSeconds: d.durationSecondsLocked(),
		},
		final: final,
	}

	// Claim the single-flight slot and reset the delta accounting before
	// releasing the lock; the snapshot above already captured the pending text.
	d.stopTimerLocked()
	d.inFlight = true
	d.lastRequestAt = d.now()
	d.pendingRunes = 0
	d.pendingTokens = 0
	d.deltaBuffer.Reset()
	d.currentCancel = stop

	return call, true
}
|
||||
|
||||
// runSummary executes one summary call end-to-end: invoke the model, pass the
// result through the gate checks, and chain at most one follow-up
// single-flight call.
func (d *ReasoningDigestor) runSummary(call reasoningDigestCall) error {
	if call.stop == nil {
		return nil
	}
	defer call.stop()

	summary, err := d.summaryFunc(call.ctx, call.input)
	if err != nil {
		// 1. A failed summary must not leak into the main streaming path;
		//    reasoning display is never dragged down by the summary capability.
		// 2. If new reasoning arrived meanwhile, still attempt the next
		//    single-flight call; otherwise later Append/Flush acts as fallback.
		_, _, nextCall, shouldStart := d.finishSummary(call.final, nil)
		if shouldStart {
			go d.runSummary(nextCall)
		}
		return err
	}

	normalized := normalizeThinkingSummary(summary, call.input.Final, call.input.DurationSeconds)
	emittedSummary, sink, nextCall, shouldStart := d.finishSummary(call.final, &normalized)
	// Deliver outside the lock; finishSummary returned copies, so the sink
	// cannot race internal state.
	if emittedSummary != nil && sink != nil {
		sink(*emittedSummary)
	}
	if shouldStart {
		go d.runSummary(nextCall)
	}
	return nil
}
|
||||
|
||||
// finishSummary releases the single-flight slot, applies the gate checks to
// the call result, and decides whether a follow-up summary should start.
// It returns the summary to deliver (nil when dropped), the sink to deliver
// it to, and an optional next call prepared under the same lock.
func (d *ReasoningDigestor) finishSummary(final bool, summary *StreamThinkingSummaryExtra) (*StreamThinkingSummaryExtra, ReasoningSummarySink, reasoningDigestCall, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.inFlight = false
	d.currentCancel = nil
	// Wake anyone blocked in prepareFlushCall / Close waiting for the slot.
	d.cond.Broadcast()

	var emittedSummary *StreamThinkingSummaryExtra
	var sink ReasoningSummarySink

	// 1. Check the content gate first: once body output starts, every late
	//    result must be dropped.
	// 2. Then fill in summary_seq/final/duration and cache LatestSummary for
	//    readers.
	// 3. If new reasoning piled up during this request, start exactly one
	//    follow-up single-flight summary — never a queue.
	if summary != nil && !d.closed && !d.gateClosed && !d.contentStarted {
		normalized := *summary
		d.summarySeq++
		normalized.SummarySeq = d.summarySeq
		normalized.Final = final
		if normalized.DurationSeconds <= 0 {
			normalized.DurationSeconds = d.durationSecondsLocked()
		}
		d.latestSummary = cloneThinkingSummaryExtra(&normalized)
		d.finalEmitted = final
		emittedSummary = cloneThinkingSummaryExtra(&normalized)
		sink = d.summarySink
	}

	if d.closed || d.gateClosed || d.contentStarted || final {
		return emittedSummary, sink, reasoningDigestCall{}, false
	}

	nextCall, shouldStart := d.prepareSummaryLocked(d.baseContext, false, false)
	return emittedSummary, sink, nextCall, shouldStart
}
|
||||
|
||||
// reachedWatermarkLocked reports whether the pending delta hits either
// configured watermark. The caller must hold d.mu.
func (d *ReasoningDigestor) reachedWatermarkLocked() bool {
	return reachedReasoningWatermark(d.pendingRunes, d.pendingTokens, d.minNewRunes, d.minNewTokens)
}
|
||||
|
||||
func (d *ReasoningDigestor) nextAllowedIntervalLocked() time.Duration {
|
||||
if d.lastRequestAt.IsZero() {
|
||||
return 0
|
||||
}
|
||||
wait := d.minInterval - d.now().Sub(d.lastRequestAt)
|
||||
if wait < 0 {
|
||||
return 0
|
||||
}
|
||||
return wait
|
||||
}
|
||||
|
||||
func (d *ReasoningDigestor) armTimerLocked(wait time.Duration) {
|
||||
if wait <= 0 || d.closed || d.gateClosed || d.contentStarted {
|
||||
return
|
||||
}
|
||||
if d.timer == nil {
|
||||
d.timer = time.AfterFunc(wait, d.onTimer)
|
||||
d.timerArmed = true
|
||||
return
|
||||
}
|
||||
if d.timerArmed {
|
||||
d.timer.Reset(wait)
|
||||
return
|
||||
}
|
||||
d.timer.Reset(wait)
|
||||
d.timerArmed = true
|
||||
}
|
||||
|
||||
func (d *ReasoningDigestor) stopTimerLocked() {
|
||||
if d.timer == nil {
|
||||
return
|
||||
}
|
||||
if d.timer.Stop() {
|
||||
d.timerArmed = false
|
||||
return
|
||||
}
|
||||
d.timerArmed = false
|
||||
}
|
||||
|
||||
// onTimer is the fallback wake-up fired by the interval timer; it re-evaluates
// the scheduling rules under the lock and starts a summary if one is now
// allowed.
func (d *ReasoningDigestor) onTimer() {
	if d == nil {
		return
	}

	var call reasoningDigestCall
	var shouldStart bool

	d.mu.Lock()
	d.timerArmed = false
	call, shouldStart = d.prepareSummaryLocked(d.baseContext, false, false)
	d.mu.Unlock()

	if shouldStart {
		go d.runSummary(call)
	}
}
|
||||
|
||||
func (d *ReasoningDigestor) newCallContext(parent context.Context) (context.Context, context.CancelFunc) {
|
||||
if parent == nil {
|
||||
parent = d.baseContext
|
||||
}
|
||||
if parent == nil {
|
||||
parent = context.Background()
|
||||
}
|
||||
|
||||
baseCtx, baseCancel := context.WithCancel(parent)
|
||||
if d.summaryTimeout <= 0 {
|
||||
return baseCtx, baseCancel
|
||||
}
|
||||
|
||||
timeoutCtx, timeoutCancel := context.WithTimeout(baseCtx, d.summaryTimeout)
|
||||
return timeoutCtx, func() {
|
||||
timeoutCancel()
|
||||
baseCancel()
|
||||
}
|
||||
}
|
||||
|
||||
// durationSecondsLocked returns how long reasoning has been streaming, in
// seconds truncated to millisecond precision. Zero when streaming has not
// started (or the clock went backwards). The caller must hold d.mu.
func (d *ReasoningDigestor) durationSecondsLocked() float64 {
	if d.startedAt.IsZero() {
		return 0
	}
	duration := d.now().Sub(d.startedAt)
	if duration <= 0 {
		return 0
	}
	return float64(duration.Milliseconds()) / 1000
}
|
||||
|
||||
// reachedReasoningWatermark reports whether the pending reasoning delta has
// hit either the rune or the token watermark. A watermark of zero or below
// disables that particular check.
func reachedReasoningWatermark(pendingRunes, pendingTokens, minRunes, minTokens int) bool {
	runesHit := minRunes > 0 && pendingRunes >= minRunes
	tokensHit := minTokens > 0 && pendingTokens >= minTokens
	return runesHit || tokensHit
}
|
||||
|
||||
// normalizeThinkingSummary trims the summary texts and fills in the runtime
// fields supplied by the scheduler.
func normalizeThinkingSummary(summary StreamThinkingSummaryExtra, final bool, durationSeconds float64) StreamThinkingSummaryExtra {
	summary.ShortSummary = strings.TrimSpace(summary.ShortSummary)
	summary.DetailSummary = strings.TrimSpace(summary.DetailSummary)

	// 1. The short summary is only a realtime-display fallback, so it may be
	//    defaulted from the detail summary.
	// 2. The reverse is not allowed: promoting the short summary into
	//    detail_summary would bypass the "short summaries are never persisted"
	//    product rule.
	// 3. If the model supplied no detail_summary, the timeline layer skips
	//    persistence and only this SSE display remains.
	if summary.ShortSummary == "" {
		summary.ShortSummary = summary.DetailSummary
	}
	summary.Final = final
	if summary.DurationSeconds <= 0 {
		summary.DurationSeconds = durationSeconds
	}
	return summary
}
|
||||
|
||||
// cloneThinkingSummaryExtra returns a shallow copy of src (nil in, nil out).
// The struct holds only value fields (ints, strings, bool, float64), so the
// shallow copy is a full copy.
func cloneThinkingSummaryExtra(src *StreamThinkingSummaryExtra) *StreamThinkingSummaryExtra {
	if src == nil {
		return nil
	}
	clone := *src
	return &clone
}
|
||||
|
||||
func estimateReasoningTokens(text string) int {
|
||||
text = strings.TrimSpace(text)
|
||||
if text == "" {
|
||||
return 0
|
||||
}
|
||||
|
||||
asciiRunes := 0
|
||||
totalTokens := 0
|
||||
for _, r := range text {
|
||||
switch {
|
||||
case unicode.IsSpace(r):
|
||||
if asciiRunes > 0 {
|
||||
totalTokens += compactASCIITokens(asciiRunes)
|
||||
asciiRunes = 0
|
||||
}
|
||||
case r <= unicode.MaxASCII && (unicode.IsLetter(r) || unicode.IsDigit(r)):
|
||||
asciiRunes++
|
||||
default:
|
||||
if asciiRunes > 0 {
|
||||
totalTokens += compactASCIITokens(asciiRunes)
|
||||
asciiRunes = 0
|
||||
}
|
||||
totalTokens++
|
||||
}
|
||||
}
|
||||
if asciiRunes > 0 {
|
||||
totalTokens += compactASCIITokens(asciiRunes)
|
||||
}
|
||||
return totalTokens
|
||||
}
|
||||
|
||||
// compactASCIITokens converts a run of ASCII letters/digits into an estimated
// token count, at roughly four runes per token with a minimum of one token
// for any non-empty run.
func compactASCIITokens(asciiRunes int) int {
	if asciiRunes <= 0 {
		return 0
	}
	tokens := (asciiRunes + 3) / 4
	if tokens < 1 {
		tokens = 1
	}
	return tokens
}
|
||||
37
backend/services/agent/stream/sse_adapter.go
Normal file
37
backend/services/agent/stream/sse_adapter.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package agentstream
|
||||
|
||||
import "log"
|
||||
|
||||
// NewSSEPayloadEmitter 创建将 chunk 事件写入 outChan 的 emitter。
|
||||
//
|
||||
// 职责边界:
|
||||
// 1. 接收 outChan(SSE 输出通道),返回 PayloadEmitter 函数;
|
||||
// 2. 只把原始 JSON payload 写入通道,不添加 "data: " 前缀和 "\n\n" 后缀;
|
||||
// 3. SSE 格式化("data: " + payload + "\n\n")由 API 层的 writeSSEData 统一处理;
|
||||
// 4. 通道满时静默丢弃并返回 nil,让图继续完成状态持久化,避免因客户端超时而丢失快照。
|
||||
//
|
||||
// 使用示例:
|
||||
//
|
||||
// emitter := NewSSEPayloadEmitter(outChan)
|
||||
// chunkEmitter := NewChunkEmitter(emitter, requestID, modelName, created)
|
||||
// chunkEmitter.EmitAssistantText("", "", "hello", true)
|
||||
func NewSSEPayloadEmitter(outChan chan<- string) PayloadEmitter {
|
||||
return func(payload string) error {
|
||||
if outChan == nil {
|
||||
return nil
|
||||
}
|
||||
if payload == "" {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case outChan <- payload:
|
||||
return nil
|
||||
default:
|
||||
// 通道已满:客户端可能已断开或消费过慢。
|
||||
// 静默丢弃此 chunk,让图继续执行并完成状态持久化。
|
||||
// 客户端重连后可从 Redis 快照恢复,不需要这条消息。
|
||||
log.Printf("[WARN] SSE outChan full, dropping payload (len=%d)", len(payload))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
41
backend/services/agent/stream/usage.go
Normal file
41
backend/services/agent/stream/usage.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package agentstream
|
||||
|
||||
import "github.com/cloudwego/eino/schema"
|
||||
|
||||
// CloneUsage returns a copy of usage, or nil when usage is nil. TokenUsage is
// copied by value; its nested detail structs are value fields (see MergeUsage,
// which accesses them without nil checks), so the copy is independent of the
// original.
func CloneUsage(usage *schema.TokenUsage) *schema.TokenUsage {
	if usage == nil {
		return nil
	}
	copied := *usage
	return &copied
}
|
||||
|
||||
// MergeUsage 合并两段 usage,取更大值。
|
||||
// 适用于同一次调用不同流分片的 usage 收敛。
|
||||
func MergeUsage(base *schema.TokenUsage, incoming *schema.TokenUsage) *schema.TokenUsage {
|
||||
if incoming == nil {
|
||||
return CloneUsage(base)
|
||||
}
|
||||
if base == nil {
|
||||
return CloneUsage(incoming)
|
||||
}
|
||||
|
||||
merged := *base
|
||||
if incoming.PromptTokens > merged.PromptTokens {
|
||||
merged.PromptTokens = incoming.PromptTokens
|
||||
}
|
||||
if incoming.CompletionTokens > merged.CompletionTokens {
|
||||
merged.CompletionTokens = incoming.CompletionTokens
|
||||
}
|
||||
if incoming.TotalTokens > merged.TotalTokens {
|
||||
merged.TotalTokens = incoming.TotalTokens
|
||||
}
|
||||
if incoming.PromptTokenDetails.CachedTokens > merged.PromptTokenDetails.CachedTokens {
|
||||
merged.PromptTokenDetails.CachedTokens = incoming.PromptTokenDetails.CachedTokens
|
||||
}
|
||||
if incoming.CompletionTokensDetails.ReasoningTokens > merged.CompletionTokensDetails.ReasoningTokens {
|
||||
merged.CompletionTokensDetails.ReasoningTokens = incoming.CompletionTokensDetails.ReasoningTokens
|
||||
}
|
||||
return &merged
|
||||
}
|
||||
Reference in New Issue
Block a user