package agentstream import ( "context" "fmt" "io" "strings" "sync" "time" llmservice "github.com/LoveLosita/smartflow/backend/services/llm" ) // PayloadEmitter 是真正向外层 SSE 管道写 chunk 的最小接口。 // // 说明: // 1. 这里刻意不用 chan/string 绑死实现; // 2. 上层既可以传"写 channel"的函数,也可以传"写 gin stream"的函数; // 3. 只要签名是 `func(string) error`,都能接进来。 type PayloadEmitter func(payload string) error // StageEmitter 是 graph/node 对"当前阶段"进行推送的兼容接口。 // // 设计说明: // 1. 旧调用侧仍然只关心 stage/detail 两段文本,因此这里先保留; // 2. 新的结构化事件能力会通过 ChunkEmitter 补齐,而不是继续扩展这个函数签名; // 3. 这样能兼顾当前兼容性和后续协议升级空间。 type StageEmitter func(stage, detail string) // PseudoStreamOptions 描述"整段文字伪流式输出"的切块与节奏配置。 // // 字段语义: // 1. MinChunkRunes:达到该最小长度后,若命中标点/换行等边界,可提前切块; // 2. MaxChunkRunes:单块最大 rune 数,超过后强制切块,避免一次性发太长; // 3. ChunkInterval:块与块之间的等待时间;为 0 时表示只做切块,不做人为延迟。 type PseudoStreamOptions struct { MinChunkRunes int MaxChunkRunes int ChunkInterval time.Duration } const ( defaultPseudoStreamMinChunkRunes = 8 defaultPseudoStreamMaxChunkRunes = 24 ) // DefaultPseudoStreamOptions 返回一份适合中文短句展示的默认伪流式配置。 func DefaultPseudoStreamOptions() PseudoStreamOptions { return PseudoStreamOptions{ MinChunkRunes: defaultPseudoStreamMinChunkRunes, MaxChunkRunes: defaultPseudoStreamMaxChunkRunes, } } // ChunkEmitter 是 agent 统一的 SSE chunk 发射器。 // // 职责边界: // 1. 负责把"正文 / 思考 / 工具事件 / 确认请求 / 中断提示"统一转换成 OpenAI 兼容 payload; // 2. 负责在必要时把结构化事件附带成 extra,同时给当前前端提供可读的降级文本; // 3. 不负责决定什么时候发什么,也不负责持久化状态。 type ChunkEmitter struct { emit PayloadEmitter RequestID string ModelName string Created int64 // thinkingGateMu 是“正文门卫”的轻量保护。 // 1. 它只保护 thinking_summary 是否还能发,不串行化全部 SSE; // 2. 正文一旦开始,对应 block 的门会被关闭,后续同 block 摘要直接丢弃; // 3. 这样既避免摘要 goroutine 在正文之后补发旧思考,又不误杀后续节点的新一轮思考。 thinkingGateMu sync.Mutex thinkingClosedBlocks map[string]bool // reasoningSummaryFunc 用于把原始 reasoning 压成用户可见摘要。 // 1. 该函数由 service 层注入,stream 包只负责调度,不负责选择模型; // 2. 未注入时模型 reasoning 只会被静默丢弃,不再回退成 raw reasoning_content; // 3. 正文一旦开始,ReasoningDigestor 和 ChunkEmitter 会同时关门,迟到结果不会再发给前端。 reasoningSummaryFunc ReasoningSummaryFunc // extraEventHook 用于把关键结构化事件同步给上层做持久化。 // 1. hook 失败不能影响 SSE 主链路; // 2. hook 只接收 extra 结构,避免 emitter 反向依赖业务层; // 3. 不注入时保持空实现,兼容旧调用路径。 extraEventHook func(extra *OpenAIChunkExtra) } // NoopPayloadEmitter 返回一个空实现,便于骨架期安全占位。 func NoopPayloadEmitter() PayloadEmitter { return func(string) error { return nil } } // NoopStageEmitter 返回一个空实现,避免 graph 在没有接前端时处处判空。 func NoopStageEmitter() StageEmitter { return func(stage, detail string) {} } // WrapStageEmitter 把可空函数包装成稳定的 StageEmitter。 func WrapStageEmitter(fn func(stage, detail string)) StageEmitter { if fn == nil { return NoopStageEmitter() } return fn } // NewChunkEmitter 创建统一 chunk 发射器。 // // 兜底策略: // 1. emit 为空时回退到 Noop,避免骨架期到处判空; // 2. modelName 为空时回填 worker,保持 OpenAI 兼容字段稳定; // 3. created <= 0 时用当前时间兜底,避免上层还没决定时间戳就无法复用。 func NewChunkEmitter(emit PayloadEmitter, requestID, modelName string, created int64) *ChunkEmitter { if emit == nil { emit = NoopPayloadEmitter() } modelName = strings.TrimSpace(modelName) if modelName == "" { modelName = "worker" } if created <= 0 { created = time.Now().Unix() } return &ChunkEmitter{ emit: emit, RequestID: strings.TrimSpace(requestID), ModelName: modelName, Created: created, } } // SetExtraEventHook 设置结构化事件回调。 func (e *ChunkEmitter) SetExtraEventHook(hook func(extra *OpenAIChunkExtra)) { if e == nil { return } e.extraEventHook = hook } // SetReasoningSummaryFunc 设置 reasoning 摘要模型调用函数。 // // 职责边界: // 1. 这里只保存函数引用,不立即调用模型; // 2. 摘要触发频率、单飞、正文闸门由 ReasoningDigestor 负责; // 3. 传 nil 表示关闭摘要能力,后续 reasoning chunk 会被静默丢弃。 func (e *ChunkEmitter) SetReasoningSummaryFunc(fn ReasoningSummaryFunc) { if e == nil { return } e.reasoningSummaryFunc = fn } // NewReasoningDigestor 为当前 block 创建一个 reasoning 摘要器。 // // 步骤说明: // 1. 若未注入摘要函数,返回 nil,调用方只需跳过 raw reasoning 推送; // 2. 摘要结果先经过 ChunkEmitter 的正文门卫,再走统一 extra/hook 链路; // 3. Digestor 自身仍负责单飞、水位线和正文开始后的 in-flight 结果丢弃。 func (e *ChunkEmitter) NewReasoningDigestor(ctx context.Context, blockID, stage string) (*ReasoningDigestor, error) { if e == nil || e.reasoningSummaryFunc == nil { return nil, nil } e.openThinkingSummaryGate(blockID, stage) return NewReasoningDigestor(ReasoningDigestorOptions{ SummaryFunc: e.reasoningSummaryFunc, SummarySink: func(summary StreamThinkingSummaryExtra) { _ = e.EmitThinkingSummary(blockID, stage, summary) }, BaseContext: ctx, SummaryTimeout: 8 * time.Second, }) } // EmitReasoningText 输出一段 reasoning 文字,并附带 reasoning_text extra。 func (e *ChunkEmitter) EmitReasoningText(blockID, stage, text string, includeRole bool) error { if e == nil || e.emit == nil { return nil } text = strings.TrimSpace(text) if text == "" { return nil } payload, err := ToOpenAIReasoningChunkWithExtra( e.RequestID, e.ModelName, e.Created, text, includeRole, NewReasoningTextExtra(blockID, stage), ) if err != nil { return err } if payload == "" { return nil } return e.emit(payload) } // EmitAssistantText 输出一段 assistant 正文,并附带 assistant_text extra。 func (e *ChunkEmitter) EmitAssistantText(blockID, stage, text string, includeRole bool) error { if e == nil || e.emit == nil { return nil } //这里如果不删掉,换行符会被吞了,导致文字黏连 /* text = strings.TrimSpace(text)*/ if text == "" { return nil } e.closeThinkingSummaryGate(blockID, stage) payload, err := ToOpenAIAssistantChunkWithExtra( e.RequestID, e.ModelName, e.Created, text, includeRole, NewAssistantTextExtra(blockID, stage), ) if err != nil { return err } if payload == "" { return nil } return e.emit(payload) } // EmitThinkingSummary 输出一次“流式思考摘要”事件。 // // 协议约束: // 1. 该事件只走 extra.thinking_summary,不回写 delta.content / delta.reasoning_content; // 2. 仍复用现有 extra hook,让上层在不依赖 emitter 细节的前提下同步持久化; // 3. includeRole 不再需要,因为 thinking_summary 本身就是纯结构化事件。 func (e *ChunkEmitter) EmitThinkingSummary(blockID, stage string, summary StreamThinkingSummaryExtra) error { if e == nil || e.emit == nil { return nil } if e.isThinkingSummaryGateClosed(blockID, stage) { return nil } return e.emitExtraOnly(NewThinkingSummaryExtra(blockID, stage, summary)) } func (e *ChunkEmitter) openThinkingSummaryGate(blockID, stage string) { if e == nil { return } e.thinkingGateMu.Lock() if e.thinkingClosedBlocks != nil { delete(e.thinkingClosedBlocks, thinkingSummaryGateKey(blockID, stage)) } e.thinkingGateMu.Unlock() } func (e *ChunkEmitter) closeThinkingSummaryGate(blockID, stage string) { if e == nil { return } e.thinkingGateMu.Lock() if e.thinkingClosedBlocks == nil { e.thinkingClosedBlocks = make(map[string]bool) } e.thinkingClosedBlocks[thinkingSummaryGateKey(blockID, stage)] = true e.thinkingGateMu.Unlock() } func (e *ChunkEmitter) isThinkingSummaryGateClosed(blockID, stage string) bool { if e == nil { return true } e.thinkingGateMu.Lock() defer e.thinkingGateMu.Unlock() return e.thinkingClosedBlocks[thinkingSummaryGateKey(blockID, stage)] } func thinkingSummaryGateKey(blockID, stage string) string { blockID = strings.TrimSpace(blockID) stage = strings.TrimSpace(stage) if blockID != "" { return blockID } if stage != "" { return stage } return "__default__" } // EmitPseudoReasoningText 把整段 reasoning 文本按伪流式方式逐块推出。 func (e *ChunkEmitter) EmitPseudoReasoningText(ctx context.Context, blockID, stage, text string, options PseudoStreamOptions) error { return e.emitPseudoText( ctx, text, options, func(chunk string, includeRole bool) error { return e.EmitReasoningText(blockID, stage, chunk, includeRole) }, ) } // EmitPseudoAssistantText 把整段 assistant 文本按伪流式方式逐块推出。 func (e *ChunkEmitter) EmitPseudoAssistantText(ctx context.Context, blockID, stage, text string, options PseudoStreamOptions) error { return e.emitPseudoText( ctx, text, options, func(chunk string, includeRole bool) error { return e.EmitAssistantText(blockID, stage, chunk, includeRole) }, ) } // EmitStatus 输出一条阶段状态事件。 // // 协议约束: // 1. 状态事件只通过 extra 传递,不再写入 reasoning_content; // 2. includeRole 保留是为了兼容旧签名,当前结构化事件路径不依赖 role。 func (e *ChunkEmitter) EmitStatus(blockID, stage, code, summary string, includeRole bool) error { if e == nil || e.emit == nil { return nil } _ = includeRole return e.emitExtraOnly(NewStatusExtra(blockID, stage, code, summary)) } // EmitToolCallStart 输出一次工具调用开始事件。 // // 协议约束: // 1. 工具调用开始事件只走 extra.tool,不回写 reasoning_content; // 2. includeRole 保留是为了兼容旧签名,当前结构化事件路径不依赖 role。 func (e *ChunkEmitter) EmitToolCallStart(blockID, stage, toolName, summary, argumentsPreview string, includeRole bool) error { if e == nil || e.emit == nil { return nil } _ = includeRole return e.emitExtraOnly(NewToolCallExtra(blockID, stage, toolName, "start", summary, argumentsPreview)) } // EmitToolCallResult 输出一次工具调用结果事件。 // // 协议约束: // 1. status 由调用方明确传入(如 done/blocked/failed); // 2. 结果事件只走 extra.tool,不回写 reasoning_content。 func (e *ChunkEmitter) EmitToolCallResult( blockID string, stage string, toolName string, status string, summary string, argumentsPreview string, argumentView map[string]any, resultView map[string]any, includeRole bool, ) error { if e == nil || e.emit == nil { return nil } _ = includeRole return e.emitExtraOnly(NewToolResultExtra( blockID, stage, toolName, status, summary, argumentsPreview, argumentView, resultView, )) } // emitExtraOnly 仅输出结构化 extra 事件,不附带 content/reasoning。 func (e *ChunkEmitter) emitExtraOnly(extra *OpenAIChunkExtra) error { if e == nil || e.emit == nil { return nil } e.emitExtraEventHook(extra) payload, err := ToOpenAIStreamWithExtra( nil, e.RequestID, e.ModelName, e.Created, false, extra, ) if err != nil { return err } if payload == "" { return nil } return e.emit(payload) } func (e *ChunkEmitter) emitExtraEventHook(extra *OpenAIChunkExtra) { if e == nil || e.extraEventHook == nil || extra == nil { return } e.extraEventHook(extra) } // EmitConfirmRequest 输出一次待确认事件。 // // 当前展示策略: // 1. 对旧前端,confirm 文案通过 assistant content 直接可见; // 2. 对新前端,extra.confirm 可直接驱动确认卡片或按钮; // 3. 默认使用伪流式,避免确认文案整块砸下来太生硬。 func (e *ChunkEmitter) EmitConfirmRequest(ctx context.Context, blockID, stage, interactionID, title, summary string, options PseudoStreamOptions) error { if e == nil || e.emit == nil { return nil } text := buildConfirmAssistantText(title, summary) extra := NewConfirmRequestExtra(blockID, stage, interactionID, title, summary) e.emitExtraEventHook(extra) if strings.TrimSpace(text) != "" { e.closeThinkingSummaryGate(blockID, stage) } return e.emitPseudoText( ctx, text, options, func(chunk string, includeRole bool) error { payload, err := ToOpenAIAssistantChunkWithExtra( e.RequestID, e.ModelName, e.Created, chunk, includeRole, extra, ) if err != nil { return err } if payload == "" { return nil } return e.emit(payload) }, ) } // EmitInterruptMessage 输出一次中断提示。 // // 适用场景: // 1. ask_user 追问; // 2. 告知用户当前会话已进入等待状态; // 3. 后续 connection_lost 恢复若需要对用户补一句解释,也可复用这一入口。 func (e *ChunkEmitter) EmitInterruptMessage(ctx context.Context, blockID, stage, interactionID, interactionType, summary string, options PseudoStreamOptions) error { if e == nil || e.emit == nil { return nil } text := buildInterruptAssistantText(interactionType, summary) extra := NewInterruptExtra(blockID, stage, interactionID, interactionType, summary) if strings.TrimSpace(text) != "" { e.closeThinkingSummaryGate(blockID, stage) } return e.emitPseudoText( ctx, text, options, func(chunk string, includeRole bool) error { payload, err := ToOpenAIAssistantChunkWithExtra( e.RequestID, e.ModelName, e.Created, chunk, includeRole, extra, ) if err != nil { return err } if payload == "" { return nil } return e.emit(payload) }, ) } // EmitScheduleCompleted 输出一次"排程完毕"卡片事件。 // // 协议约束: // 1. 只走 extra,不附带 content/reasoning; // 2. 前端拿到 kind=schedule_completed 后自行拉取排程数据渲染卡片。 func (e *ChunkEmitter) EmitScheduleCompleted(blockID, stage string) error { if e == nil || e.emit == nil { return nil } return e.emitExtraOnly(NewScheduleCompletedExtra(blockID, stage)) } // EmitBusinessCard 输出一次业务结果卡片事件。 // // 协议约束: // 1. 只走 extra,不附带 content/reasoning; // 2. card 为空时直接跳过,避免发出缺少关键字段的空卡片。 func (e *ChunkEmitter) EmitBusinessCard(blockID, stage string, card *StreamBusinessCardExtra) error { if e == nil || e.emit == nil || card == nil { return nil } return e.emitExtraOnly(NewBusinessCardExtra(blockID, stage, card)) } // EmitFinish 统一输出 stop 结束块,并带上 finish extra。 func (e *ChunkEmitter) EmitFinish(blockID, stage string) error { if e == nil || e.emit == nil { return nil } payload, err := ToOpenAIFinishStreamWithExtra( e.RequestID, e.ModelName, e.Created, NewFinishExtra(blockID, stage), ) if err != nil { return err } if payload == "" { return nil } return e.emit(payload) } // EmitDone 统一输出 OpenAI 兼容流式结束标记。 func (e *ChunkEmitter) EmitDone() error { if e == nil || e.emit == nil { return nil } return e.emit("[DONE]") } // EmitStreamAssistantText 从 StreamReader 逐 chunk 读取并实时推送 assistant 正文。 // // 职责边界: // 1. 负责把 StreamReader 的每个 chunk 实时转换为 SSE payload 推送; // 2. 负责累计完整文本并返回,供调用方写入 history; // 3. 不负责打开/关闭 StreamReader,调用方负责生命周期管理。 func (e *ChunkEmitter) EmitStreamAssistantText( ctx context.Context, reader llmservice.StreamReader, blockID, stage string, ) (string, error) { if e == nil || reader == nil { return "", nil } var fullText strings.Builder firstChunk := true digestor, digestorErr := e.NewReasoningDigestor(ctx, blockID, stage) if digestorErr != nil { return "", digestorErr } defer func() { if digestor != nil { _ = digestor.Close(ctx) } }() for { chunk, err := reader.Recv() if err != nil { if err == io.EOF { break } return fullText.String(), err } // 1. reasoning content 只喂给摘要器,不再透传给前端。 // 2. 未注入摘要能力时直接丢弃,避免 raw reasoning_content 泄漏到 SSE。 if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { if digestor != nil { digestor.Append(chunk.ReasoningContent) } } // 推送 assistant 正文。 if chunk != nil && chunk.Content != "" { if digestor != nil { digestor.MarkContentStarted() } if emitErr := e.EmitAssistantText(blockID, stage, chunk.Content, firstChunk); emitErr != nil { return fullText.String(), emitErr } fullText.WriteString(chunk.Content) firstChunk = false } } return fullText.String(), nil } // EmitStreamReasoningText 从 StreamReader 逐 chunk 读取 reasoning,并转成低频 thinking_summary。 // // 与 EmitStreamAssistantText 结构相同,但不再输出 raw ReasoningContent。 // 用于只需展示思考过程而无需展示正文的场景。 func (e *ChunkEmitter) EmitStreamReasoningText( ctx context.Context, reader llmservice.StreamReader, blockID, stage string, ) (string, error) { if e == nil || reader == nil { return "", nil } var fullText strings.Builder digestor, digestorErr := e.NewReasoningDigestor(ctx, blockID, stage) if digestorErr != nil { return "", digestorErr } defer func() { if digestor != nil { _ = digestor.Close(ctx) } }() for { chunk, err := reader.Recv() if err != nil { if err == io.EOF { break } return fullText.String(), err } if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { if digestor != nil { digestor.Append(chunk.ReasoningContent) } fullText.WriteString(chunk.ReasoningContent) } } return fullText.String(), nil } // EmitStageAsReasoning 把"阶段提示"伪装成 reasoning chunk 推给前端。 // // 兼容说明: // 1. 保留旧函数签名,方便当前旧链路直接复用; // 2. 实际实现已升级为统一的 ChunkEmitter + status extra; // 3. 这样后续新链路可以直接跳过这个兼容函数,转用结构化方法。 func EmitStageAsReasoning(emit PayloadEmitter, requestID, modelName string, created int64, stage, detail string, includeRole bool) error { return NewChunkEmitter(emit, requestID, modelName, created).EmitStatus(stage, stage, stage, detail, includeRole) } // EmitAssistantReply 把一段完整正文作为 assistant chunk 推出。 // // 注意: // 1. 这里保持"整段发",不主动切块; // 2. 若后续某条链路需要更自然的阅读节奏,应直接调用 EmitPseudoAssistantText; // 3. 为兼容老调用侧,这里 blockID 和 stage 都留空。 func EmitAssistantReply(emit PayloadEmitter, requestID, modelName string, created int64, content string, includeRole bool) error { return NewChunkEmitter(emit, requestID, modelName, created).EmitAssistantText("", "", content, includeRole) } // EmitFinish 统一输出 stop 结束块。 func EmitFinish(emit PayloadEmitter, requestID, modelName string, created int64) error { return NewChunkEmitter(emit, requestID, modelName, created).EmitFinish("", "") } // EmitDone 统一输出 OpenAI 兼容流式结束标记。 func EmitDone(emit PayloadEmitter) error { return NewChunkEmitter(emit, "", "", 0).EmitDone() } func buildStageReasoningText(stage, detail string) string { stage = strings.TrimSpace(stage) detail = strings.TrimSpace(detail) switch { case stage != "" && detail != "": return fmt.Sprintf("阶段:%s\n%s", stage, detail) case stage != "": return fmt.Sprintf("阶段:%s", stage) default: return detail } } func buildToolCallReasoningText(toolName, summary, argumentsPreview string) string { toolName = strings.TrimSpace(toolName) summary = strings.TrimSpace(summary) argumentsPreview = strings.TrimSpace(argumentsPreview) lines := make([]string, 0, 3) if toolName != "" { lines = append(lines, fmt.Sprintf("正在调用工具:%s", toolName)) } if summary != "" { lines = append(lines, summary) } if argumentsPreview != "" { lines = append(lines, fmt.Sprintf("参数摘要:%s", argumentsPreview)) } return strings.TrimSpace(strings.Join(lines, "\n")) } func buildToolResultReasoningText(toolName, summary string) string { toolName = strings.TrimSpace(toolName) summary = strings.TrimSpace(summary) switch { case toolName != "" && summary != "": return fmt.Sprintf("工具结果:%s\n%s", toolName, summary) case toolName != "": return fmt.Sprintf("工具结果:%s", toolName) default: return summary } } func buildConfirmAssistantText(title, summary string) string { title = strings.TrimSpace(title) summary = strings.TrimSpace(summary) switch { case title != "" && summary != "": return fmt.Sprintf("%s\n%s", title, summary) case title != "": return title default: return summary } } func buildInterruptAssistantText(interactionType, summary string) string { interactionType = strings.TrimSpace(interactionType) summary = strings.TrimSpace(summary) switch { case interactionType != "" && summary != "": return fmt.Sprintf("当前进入 %s 阶段。\n%s", interactionType, summary) case summary != "": return summary default: return interactionType } } func (e *ChunkEmitter) emitPseudoText(ctx context.Context, text string, options PseudoStreamOptions, emitChunk func(chunk string, includeRole bool) error) error { if emitChunk == nil { return nil } // 只剥首尾空格和制表符,保留结尾 \n,让上层加的段落分隔符能作为内容的一部分推出。 text = strings.TrimRight(strings.TrimLeft(text, " \t\r\n"), " \t\r") if text == "" { return nil } chunks := SplitPseudoStreamText(text, options) for i, chunk := range chunks { if err := emitChunk(chunk, i == 0); err != nil { return err } if i < len(chunks)-1 { if err := waitPseudoStreamInterval(ctx, options.ChunkInterval); err != nil { return err } } } return nil } // SplitPseudoStreamText 按"标点优先、长度兜底"的策略切分整段文本。 // // 步骤说明: // 1. 优先在句号、问号、感叹号、分号、换行等自然边界切块,保证阅读顺畅; // 2. 若长时间遇不到合适边界,则在 MaxChunkRunes 处强制切块,避免整段卡太久; // 3. 对中文文本优先按 rune 长度处理,避免多字节字符被截断。 func SplitPseudoStreamText(text string, options PseudoStreamOptions) []string { hasTrailingNewline := strings.HasSuffix(strings.TrimRight(text, " \t"), "\n") text = strings.TrimRight(strings.TrimLeft(text, " \t\r\n"), " \t\r") if text == "" { return nil } options = normalizePseudoStreamOptions(options) runes := []rune(text) if len(runes) <= options.MaxChunkRunes { // text 经 TrimRight(" \t\r") 已保留结尾 \n,直接返回,不再追加。 return []string{text} } chunks := make([]string, 0, len(runes)/options.MinChunkRunes+1) start := 0 size := 0 for i, r := range runes { size++ shouldFlush := false if size >= options.MaxChunkRunes { shouldFlush = true } if size >= options.MinChunkRunes && isPseudoStreamBoundary(r) { shouldFlush = true } if !shouldFlush { continue } // 用 Trim(" \t\r") 代替 TrimSpace:保留 chunk 内的 \n(段落分隔符)。 // TrimSpace 会把 flush 在 \n 边界时结尾的 \n、以及下一段开头的 \n 全部删掉,导致黏连。 chunk := strings.Trim(string(runes[start:i+1]), " \t\r") if chunk != "" { chunks = append(chunks, chunk) } start = i + 1 size = 0 } if start < len(runes) { chunk := strings.Trim(string(runes[start:]), " \t\r") if chunk != "" { chunks = append(chunks, chunk) } } if len(chunks) == 0 { return []string{text} } // 仅当最后一个 chunk 尚未以 \n 结尾时才追加,避免 Trim 修复后出现双换行。 if hasTrailingNewline && !strings.HasSuffix(chunks[len(chunks)-1], "\n") { chunks[len(chunks)-1] += "\n" } return chunks } func normalizePseudoStreamOptions(options PseudoStreamOptions) PseudoStreamOptions { if options.MinChunkRunes <= 0 { options.MinChunkRunes = defaultPseudoStreamMinChunkRunes } if options.MaxChunkRunes <= 0 { options.MaxChunkRunes = defaultPseudoStreamMaxChunkRunes } if options.MaxChunkRunes < options.MinChunkRunes { options.MaxChunkRunes = options.MinChunkRunes } return options } func isPseudoStreamBoundary(r rune) bool { switch r { case '。', '!', '?', ';', ':', ',', '.', '!', '?', ';', ':', ',', '\n': return true default: return false } } func waitPseudoStreamInterval(ctx context.Context, interval time.Duration) error { if interval <= 0 { return nil } if ctx == nil { ctx = context.Background() } timer := time.NewTimer(interval) defer timer.Stop() select { case <-ctx.Done(): return ctx.Err() case <-timer.C: return nil } }