Files
Losita d7184b776b Version: 0.9.75.dev.260505
后端:
1.收口阶段 6 agent 结构迁移,将 newAgent 内核与 agentsvc 编排层迁入 services/agent
- 切换 Agent 启动装配与 HTTP handler 直连 agent sv,移除旧 service agent bridge
- 补齐 Agent 对 memory、task、task-class、schedule 的 RPC 适配与契约字段
- 扩展 schedule、task、task-class RPC/contract 支撑 Agent 查询、写入与 provider 切流
- 更新迁移文档、README 与相关注释,明确 agent 当前切流点和剩余 memory 迁移面
2026-05-05 16:00:57 +08:00

871 lines
25 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package agentstream
import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"
llmservice "github.com/LoveLosita/smartflow/backend/services/llm"
)
// PayloadEmitter 是真正向外层 SSE 管道写 chunk 的最小接口。
//
// 说明:
// 1. 这里刻意不用 chan/string 绑死实现;
// 2. 上层既可以传"写 channel"的函数,也可以传"写 gin stream"的函数;
// 3. 只要签名是 `func(string) error`,都能接进来。
type PayloadEmitter func(payload string) error
// StageEmitter 是 graph/node 对"当前阶段"进行推送的兼容接口。
//
// 设计说明:
// 1. 旧调用侧仍然只关心 stage/detail 两段文本,因此这里先保留;
// 2. 新的结构化事件能力会通过 ChunkEmitter 补齐,而不是继续扩展这个函数签名;
// 3. 这样能兼顾当前兼容性和后续协议升级空间。
type StageEmitter func(stage, detail string)
// PseudoStreamOptions 描述"整段文字伪流式输出"的切块与节奏配置。
//
// 字段语义:
// 1. MinChunkRunes达到该最小长度后若命中标点/换行等边界,可提前切块;
// 2. MaxChunkRunes单块最大 rune 数,超过后强制切块,避免一次性发太长;
// 3. ChunkInterval块与块之间的等待时间为 0 时表示只做切块,不做人为延迟。
type PseudoStreamOptions struct {
MinChunkRunes int
MaxChunkRunes int
ChunkInterval time.Duration
}
const (
defaultPseudoStreamMinChunkRunes = 8
defaultPseudoStreamMaxChunkRunes = 24
)
// DefaultPseudoStreamOptions 返回一份适合中文短句展示的默认伪流式配置。
func DefaultPseudoStreamOptions() PseudoStreamOptions {
return PseudoStreamOptions{
MinChunkRunes: defaultPseudoStreamMinChunkRunes,
MaxChunkRunes: defaultPseudoStreamMaxChunkRunes,
}
}
// ChunkEmitter 是 agent 统一的 SSE chunk 发射器。
//
// 职责边界:
// 1. 负责把"正文 / 思考 / 工具事件 / 确认请求 / 中断提示"统一转换成 OpenAI 兼容 payload
// 2. 负责在必要时把结构化事件附带成 extra同时给当前前端提供可读的降级文本
// 3. 不负责决定什么时候发什么,也不负责持久化状态。
type ChunkEmitter struct {
emit PayloadEmitter
RequestID string
ModelName string
Created int64
// thinkingGateMu 是“正文门卫”的轻量保护。
// 1. 它只保护 thinking_summary 是否还能发,不串行化全部 SSE
// 2. 正文一旦开始,对应 block 的门会被关闭,后续同 block 摘要直接丢弃;
// 3. 这样既避免摘要 goroutine 在正文之后补发旧思考,又不误杀后续节点的新一轮思考。
thinkingGateMu sync.Mutex
thinkingClosedBlocks map[string]bool
// reasoningSummaryFunc 用于把原始 reasoning 压成用户可见摘要。
// 1. 该函数由 service 层注入stream 包只负责调度,不负责选择模型;
// 2. 未注入时模型 reasoning 只会被静默丢弃,不再回退成 raw reasoning_content
// 3. 正文一旦开始ReasoningDigestor 和 ChunkEmitter 会同时关门,迟到结果不会再发给前端。
reasoningSummaryFunc ReasoningSummaryFunc
// extraEventHook 用于把关键结构化事件同步给上层做持久化。
// 1. hook 失败不能影响 SSE 主链路;
// 2. hook 只接收 extra 结构,避免 emitter 反向依赖业务层;
// 3. 不注入时保持空实现,兼容旧调用路径。
extraEventHook func(extra *OpenAIChunkExtra)
}
// NoopPayloadEmitter 返回一个空实现,便于骨架期安全占位。
func NoopPayloadEmitter() PayloadEmitter {
return func(string) error { return nil }
}
// NoopStageEmitter 返回一个空实现,避免 graph 在没有接前端时处处判空。
func NoopStageEmitter() StageEmitter {
return func(stage, detail string) {}
}
// WrapStageEmitter 把可空函数包装成稳定的 StageEmitter。
func WrapStageEmitter(fn func(stage, detail string)) StageEmitter {
if fn == nil {
return NoopStageEmitter()
}
return fn
}
// NewChunkEmitter 创建统一 chunk 发射器。
//
// 兜底策略:
// 1. emit 为空时回退到 Noop避免骨架期到处判空
// 2. modelName 为空时回填 worker保持 OpenAI 兼容字段稳定;
// 3. created <= 0 时用当前时间兜底,避免上层还没决定时间戳就无法复用。
func NewChunkEmitter(emit PayloadEmitter, requestID, modelName string, created int64) *ChunkEmitter {
if emit == nil {
emit = NoopPayloadEmitter()
}
modelName = strings.TrimSpace(modelName)
if modelName == "" {
modelName = "worker"
}
if created <= 0 {
created = time.Now().Unix()
}
return &ChunkEmitter{
emit: emit,
RequestID: strings.TrimSpace(requestID),
ModelName: modelName,
Created: created,
}
}
// SetExtraEventHook 设置结构化事件回调。
func (e *ChunkEmitter) SetExtraEventHook(hook func(extra *OpenAIChunkExtra)) {
if e == nil {
return
}
e.extraEventHook = hook
}
// SetReasoningSummaryFunc 设置 reasoning 摘要模型调用函数。
//
// 职责边界:
// 1. 这里只保存函数引用,不立即调用模型;
// 2. 摘要触发频率、单飞、正文闸门由 ReasoningDigestor 负责;
// 3. 传 nil 表示关闭摘要能力,后续 reasoning chunk 会被静默丢弃。
func (e *ChunkEmitter) SetReasoningSummaryFunc(fn ReasoningSummaryFunc) {
if e == nil {
return
}
e.reasoningSummaryFunc = fn
}
// NewReasoningDigestor 为当前 block 创建一个 reasoning 摘要器。
//
// 步骤说明:
// 1. 若未注入摘要函数,返回 nil调用方只需跳过 raw reasoning 推送;
// 2. 摘要结果先经过 ChunkEmitter 的正文门卫,再走统一 extra/hook 链路;
// 3. Digestor 自身仍负责单飞、水位线和正文开始后的 in-flight 结果丢弃。
func (e *ChunkEmitter) NewReasoningDigestor(ctx context.Context, blockID, stage string) (*ReasoningDigestor, error) {
if e == nil || e.reasoningSummaryFunc == nil {
return nil, nil
}
e.openThinkingSummaryGate(blockID, stage)
return NewReasoningDigestor(ReasoningDigestorOptions{
SummaryFunc: e.reasoningSummaryFunc,
SummarySink: func(summary StreamThinkingSummaryExtra) {
_ = e.EmitThinkingSummary(blockID, stage, summary)
},
BaseContext: ctx,
SummaryTimeout: 8 * time.Second,
})
}
// EmitReasoningText 输出一段 reasoning 文字,并附带 reasoning_text extra。
func (e *ChunkEmitter) EmitReasoningText(blockID, stage, text string, includeRole bool) error {
if e == nil || e.emit == nil {
return nil
}
text = strings.TrimSpace(text)
if text == "" {
return nil
}
payload, err := ToOpenAIReasoningChunkWithExtra(
e.RequestID,
e.ModelName,
e.Created,
text,
includeRole,
NewReasoningTextExtra(blockID, stage),
)
if err != nil {
return err
}
if payload == "" {
return nil
}
return e.emit(payload)
}
// EmitAssistantText 输出一段 assistant 正文,并附带 assistant_text extra。
func (e *ChunkEmitter) EmitAssistantText(blockID, stage, text string, includeRole bool) error {
if e == nil || e.emit == nil {
return nil
}
//这里如果不删掉,换行符会被吞了,导致文字黏连
/* text = strings.TrimSpace(text)*/
if text == "" {
return nil
}
e.closeThinkingSummaryGate(blockID, stage)
payload, err := ToOpenAIAssistantChunkWithExtra(
e.RequestID,
e.ModelName,
e.Created,
text,
includeRole,
NewAssistantTextExtra(blockID, stage),
)
if err != nil {
return err
}
if payload == "" {
return nil
}
return e.emit(payload)
}
// EmitThinkingSummary 输出一次“流式思考摘要”事件。
//
// 协议约束:
// 1. 该事件只走 extra.thinking_summary不回写 delta.content / delta.reasoning_content
// 2. 仍复用现有 extra hook让上层在不依赖 emitter 细节的前提下同步持久化;
// 3. includeRole 不再需要,因为 thinking_summary 本身就是纯结构化事件。
func (e *ChunkEmitter) EmitThinkingSummary(blockID, stage string, summary StreamThinkingSummaryExtra) error {
if e == nil || e.emit == nil {
return nil
}
if e.isThinkingSummaryGateClosed(blockID, stage) {
return nil
}
return e.emitExtraOnly(NewThinkingSummaryExtra(blockID, stage, summary))
}
func (e *ChunkEmitter) openThinkingSummaryGate(blockID, stage string) {
if e == nil {
return
}
e.thinkingGateMu.Lock()
if e.thinkingClosedBlocks != nil {
delete(e.thinkingClosedBlocks, thinkingSummaryGateKey(blockID, stage))
}
e.thinkingGateMu.Unlock()
}
func (e *ChunkEmitter) closeThinkingSummaryGate(blockID, stage string) {
if e == nil {
return
}
e.thinkingGateMu.Lock()
if e.thinkingClosedBlocks == nil {
e.thinkingClosedBlocks = make(map[string]bool)
}
e.thinkingClosedBlocks[thinkingSummaryGateKey(blockID, stage)] = true
e.thinkingGateMu.Unlock()
}
func (e *ChunkEmitter) isThinkingSummaryGateClosed(blockID, stage string) bool {
if e == nil {
return true
}
e.thinkingGateMu.Lock()
defer e.thinkingGateMu.Unlock()
return e.thinkingClosedBlocks[thinkingSummaryGateKey(blockID, stage)]
}
func thinkingSummaryGateKey(blockID, stage string) string {
blockID = strings.TrimSpace(blockID)
stage = strings.TrimSpace(stage)
if blockID != "" {
return blockID
}
if stage != "" {
return stage
}
return "__default__"
}
// EmitPseudoReasoningText 把整段 reasoning 文本按伪流式方式逐块推出。
func (e *ChunkEmitter) EmitPseudoReasoningText(ctx context.Context, blockID, stage, text string, options PseudoStreamOptions) error {
return e.emitPseudoText(
ctx,
text,
options,
func(chunk string, includeRole bool) error {
return e.EmitReasoningText(blockID, stage, chunk, includeRole)
},
)
}
// EmitPseudoAssistantText 把整段 assistant 文本按伪流式方式逐块推出。
func (e *ChunkEmitter) EmitPseudoAssistantText(ctx context.Context, blockID, stage, text string, options PseudoStreamOptions) error {
return e.emitPseudoText(
ctx,
text,
options,
func(chunk string, includeRole bool) error {
return e.EmitAssistantText(blockID, stage, chunk, includeRole)
},
)
}
// EmitStatus 输出一条阶段状态事件。
//
// 协议约束:
// 1. 状态事件只通过 extra 传递,不再写入 reasoning_content
// 2. includeRole 保留是为了兼容旧签名,当前结构化事件路径不依赖 role。
func (e *ChunkEmitter) EmitStatus(blockID, stage, code, summary string, includeRole bool) error {
if e == nil || e.emit == nil {
return nil
}
_ = includeRole
return e.emitExtraOnly(NewStatusExtra(blockID, stage, code, summary))
}
// EmitToolCallStart 输出一次工具调用开始事件。
//
// 协议约束:
// 1. 工具调用开始事件只走 extra.tool不回写 reasoning_content
// 2. includeRole 保留是为了兼容旧签名,当前结构化事件路径不依赖 role。
func (e *ChunkEmitter) EmitToolCallStart(blockID, stage, toolName, summary, argumentsPreview string, includeRole bool) error {
if e == nil || e.emit == nil {
return nil
}
_ = includeRole
return e.emitExtraOnly(NewToolCallExtra(blockID, stage, toolName, "start", summary, argumentsPreview))
}
// EmitToolCallResult 输出一次工具调用结果事件。
//
// 协议约束:
// 1. status 由调用方明确传入(如 done/blocked/failed
// 2. 结果事件只走 extra.tool不回写 reasoning_content。
func (e *ChunkEmitter) EmitToolCallResult(
blockID string,
stage string,
toolName string,
status string,
summary string,
argumentsPreview string,
argumentView map[string]any,
resultView map[string]any,
includeRole bool,
) error {
if e == nil || e.emit == nil {
return nil
}
_ = includeRole
return e.emitExtraOnly(NewToolResultExtra(
blockID,
stage,
toolName,
status,
summary,
argumentsPreview,
argumentView,
resultView,
))
}
// emitExtraOnly 仅输出结构化 extra 事件,不附带 content/reasoning。
func (e *ChunkEmitter) emitExtraOnly(extra *OpenAIChunkExtra) error {
if e == nil || e.emit == nil {
return nil
}
e.emitExtraEventHook(extra)
payload, err := ToOpenAIStreamWithExtra(
nil,
e.RequestID,
e.ModelName,
e.Created,
false,
extra,
)
if err != nil {
return err
}
if payload == "" {
return nil
}
return e.emit(payload)
}
func (e *ChunkEmitter) emitExtraEventHook(extra *OpenAIChunkExtra) {
if e == nil || e.extraEventHook == nil || extra == nil {
return
}
e.extraEventHook(extra)
}
// EmitConfirmRequest 输出一次待确认事件。
//
// 当前展示策略:
// 1. 对旧前端confirm 文案通过 assistant content 直接可见;
// 2. 对新前端extra.confirm 可直接驱动确认卡片或按钮;
// 3. 默认使用伪流式,避免确认文案整块砸下来太生硬。
func (e *ChunkEmitter) EmitConfirmRequest(ctx context.Context, blockID, stage, interactionID, title, summary string, options PseudoStreamOptions) error {
if e == nil || e.emit == nil {
return nil
}
text := buildConfirmAssistantText(title, summary)
extra := NewConfirmRequestExtra(blockID, stage, interactionID, title, summary)
e.emitExtraEventHook(extra)
if strings.TrimSpace(text) != "" {
e.closeThinkingSummaryGate(blockID, stage)
}
return e.emitPseudoText(
ctx,
text,
options,
func(chunk string, includeRole bool) error {
payload, err := ToOpenAIAssistantChunkWithExtra(
e.RequestID,
e.ModelName,
e.Created,
chunk,
includeRole,
extra,
)
if err != nil {
return err
}
if payload == "" {
return nil
}
return e.emit(payload)
},
)
}
// EmitInterruptMessage 输出一次中断提示。
//
// 适用场景:
// 1. ask_user 追问;
// 2. 告知用户当前会话已进入等待状态;
// 3. 后续 connection_lost 恢复若需要对用户补一句解释,也可复用这一入口。
func (e *ChunkEmitter) EmitInterruptMessage(ctx context.Context, blockID, stage, interactionID, interactionType, summary string, options PseudoStreamOptions) error {
if e == nil || e.emit == nil {
return nil
}
text := buildInterruptAssistantText(interactionType, summary)
extra := NewInterruptExtra(blockID, stage, interactionID, interactionType, summary)
if strings.TrimSpace(text) != "" {
e.closeThinkingSummaryGate(blockID, stage)
}
return e.emitPseudoText(
ctx,
text,
options,
func(chunk string, includeRole bool) error {
payload, err := ToOpenAIAssistantChunkWithExtra(
e.RequestID,
e.ModelName,
e.Created,
chunk,
includeRole,
extra,
)
if err != nil {
return err
}
if payload == "" {
return nil
}
return e.emit(payload)
},
)
}
// EmitScheduleCompleted 输出一次"排程完毕"卡片事件。
//
// 协议约束:
// 1. 只走 extra不附带 content/reasoning
// 2. 前端拿到 kind=schedule_completed 后自行拉取排程数据渲染卡片。
func (e *ChunkEmitter) EmitScheduleCompleted(blockID, stage string) error {
if e == nil || e.emit == nil {
return nil
}
return e.emitExtraOnly(NewScheduleCompletedExtra(blockID, stage))
}
// EmitBusinessCard 输出一次业务结果卡片事件。
//
// 协议约束:
// 1. 只走 extra不附带 content/reasoning
// 2. card 为空时直接跳过,避免发出缺少关键字段的空卡片。
func (e *ChunkEmitter) EmitBusinessCard(blockID, stage string, card *StreamBusinessCardExtra) error {
if e == nil || e.emit == nil || card == nil {
return nil
}
return e.emitExtraOnly(NewBusinessCardExtra(blockID, stage, card))
}
// EmitFinish 统一输出 stop 结束块,并带上 finish extra。
func (e *ChunkEmitter) EmitFinish(blockID, stage string) error {
if e == nil || e.emit == nil {
return nil
}
payload, err := ToOpenAIFinishStreamWithExtra(
e.RequestID,
e.ModelName,
e.Created,
NewFinishExtra(blockID, stage),
)
if err != nil {
return err
}
if payload == "" {
return nil
}
return e.emit(payload)
}
// EmitDone 统一输出 OpenAI 兼容流式结束标记。
func (e *ChunkEmitter) EmitDone() error {
if e == nil || e.emit == nil {
return nil
}
return e.emit("[DONE]")
}
// EmitStreamAssistantText 从 StreamReader 逐 chunk 读取并实时推送 assistant 正文。
//
// 职责边界:
// 1. 负责把 StreamReader 的每个 chunk 实时转换为 SSE payload 推送;
// 2. 负责累计完整文本并返回,供调用方写入 history
// 3. 不负责打开/关闭 StreamReader调用方负责生命周期管理。
func (e *ChunkEmitter) EmitStreamAssistantText(
ctx context.Context,
reader llmservice.StreamReader,
blockID, stage string,
) (string, error) {
if e == nil || reader == nil {
return "", nil
}
var fullText strings.Builder
firstChunk := true
digestor, digestorErr := e.NewReasoningDigestor(ctx, blockID, stage)
if digestorErr != nil {
return "", digestorErr
}
defer func() {
if digestor != nil {
_ = digestor.Close(ctx)
}
}()
for {
chunk, err := reader.Recv()
if err != nil {
if err == io.EOF {
break
}
return fullText.String(), err
}
// 1. reasoning content 只喂给摘要器,不再透传给前端。
// 2. 未注入摘要能力时直接丢弃,避免 raw reasoning_content 泄漏到 SSE。
if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
if digestor != nil {
digestor.Append(chunk.ReasoningContent)
}
}
// 推送 assistant 正文。
if chunk != nil && chunk.Content != "" {
if digestor != nil {
digestor.MarkContentStarted()
}
if emitErr := e.EmitAssistantText(blockID, stage, chunk.Content, firstChunk); emitErr != nil {
return fullText.String(), emitErr
}
fullText.WriteString(chunk.Content)
firstChunk = false
}
}
return fullText.String(), nil
}
// EmitStreamReasoningText 从 StreamReader 逐 chunk 读取 reasoning并转成低频 thinking_summary。
//
// 与 EmitStreamAssistantText 结构相同,但不再输出 raw ReasoningContent。
// 用于只需展示思考过程而无需展示正文的场景。
func (e *ChunkEmitter) EmitStreamReasoningText(
ctx context.Context,
reader llmservice.StreamReader,
blockID, stage string,
) (string, error) {
if e == nil || reader == nil {
return "", nil
}
var fullText strings.Builder
digestor, digestorErr := e.NewReasoningDigestor(ctx, blockID, stage)
if digestorErr != nil {
return "", digestorErr
}
defer func() {
if digestor != nil {
_ = digestor.Close(ctx)
}
}()
for {
chunk, err := reader.Recv()
if err != nil {
if err == io.EOF {
break
}
return fullText.String(), err
}
if chunk != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
if digestor != nil {
digestor.Append(chunk.ReasoningContent)
}
fullText.WriteString(chunk.ReasoningContent)
}
}
return fullText.String(), nil
}
// EmitStageAsReasoning 把"阶段提示"伪装成 reasoning chunk 推给前端。
//
// 兼容说明:
// 1. 保留旧函数签名,方便当前旧链路直接复用;
// 2. 实际实现已升级为统一的 ChunkEmitter + status extra
// 3. 这样后续新链路可以直接跳过这个兼容函数,转用结构化方法。
func EmitStageAsReasoning(emit PayloadEmitter, requestID, modelName string, created int64, stage, detail string, includeRole bool) error {
return NewChunkEmitter(emit, requestID, modelName, created).EmitStatus(stage, stage, stage, detail, includeRole)
}
// EmitAssistantReply 把一段完整正文作为 assistant chunk 推出。
//
// 注意:
// 1. 这里保持"整段发",不主动切块;
// 2. 若后续某条链路需要更自然的阅读节奏,应直接调用 EmitPseudoAssistantText
// 3. 为兼容老调用侧,这里 blockID 和 stage 都留空。
func EmitAssistantReply(emit PayloadEmitter, requestID, modelName string, created int64, content string, includeRole bool) error {
return NewChunkEmitter(emit, requestID, modelName, created).EmitAssistantText("", "", content, includeRole)
}
// EmitFinish 统一输出 stop 结束块。
func EmitFinish(emit PayloadEmitter, requestID, modelName string, created int64) error {
return NewChunkEmitter(emit, requestID, modelName, created).EmitFinish("", "")
}
// EmitDone 统一输出 OpenAI 兼容流式结束标记。
func EmitDone(emit PayloadEmitter) error {
return NewChunkEmitter(emit, "", "", 0).EmitDone()
}
func buildStageReasoningText(stage, detail string) string {
stage = strings.TrimSpace(stage)
detail = strings.TrimSpace(detail)
switch {
case stage != "" && detail != "":
return fmt.Sprintf("阶段:%s\n%s", stage, detail)
case stage != "":
return fmt.Sprintf("阶段:%s", stage)
default:
return detail
}
}
func buildToolCallReasoningText(toolName, summary, argumentsPreview string) string {
toolName = strings.TrimSpace(toolName)
summary = strings.TrimSpace(summary)
argumentsPreview = strings.TrimSpace(argumentsPreview)
lines := make([]string, 0, 3)
if toolName != "" {
lines = append(lines, fmt.Sprintf("正在调用工具:%s", toolName))
}
if summary != "" {
lines = append(lines, summary)
}
if argumentsPreview != "" {
lines = append(lines, fmt.Sprintf("参数摘要:%s", argumentsPreview))
}
return strings.TrimSpace(strings.Join(lines, "\n"))
}
func buildToolResultReasoningText(toolName, summary string) string {
toolName = strings.TrimSpace(toolName)
summary = strings.TrimSpace(summary)
switch {
case toolName != "" && summary != "":
return fmt.Sprintf("工具结果:%s\n%s", toolName, summary)
case toolName != "":
return fmt.Sprintf("工具结果:%s", toolName)
default:
return summary
}
}
func buildConfirmAssistantText(title, summary string) string {
title = strings.TrimSpace(title)
summary = strings.TrimSpace(summary)
switch {
case title != "" && summary != "":
return fmt.Sprintf("%s\n%s", title, summary)
case title != "":
return title
default:
return summary
}
}
func buildInterruptAssistantText(interactionType, summary string) string {
interactionType = strings.TrimSpace(interactionType)
summary = strings.TrimSpace(summary)
switch {
case interactionType != "" && summary != "":
return fmt.Sprintf("当前进入 %s 阶段。\n%s", interactionType, summary)
case summary != "":
return summary
default:
return interactionType
}
}
func (e *ChunkEmitter) emitPseudoText(ctx context.Context, text string, options PseudoStreamOptions, emitChunk func(chunk string, includeRole bool) error) error {
if emitChunk == nil {
return nil
}
// 只剥首尾空格和制表符,保留结尾 \n让上层加的段落分隔符能作为内容的一部分推出。
text = strings.TrimRight(strings.TrimLeft(text, " \t\r\n"), " \t\r")
if text == "" {
return nil
}
chunks := SplitPseudoStreamText(text, options)
for i, chunk := range chunks {
if err := emitChunk(chunk, i == 0); err != nil {
return err
}
if i < len(chunks)-1 {
if err := waitPseudoStreamInterval(ctx, options.ChunkInterval); err != nil {
return err
}
}
}
return nil
}
// SplitPseudoStreamText 按"标点优先、长度兜底"的策略切分整段文本。
//
// 步骤说明:
// 1. 优先在句号、问号、感叹号、分号、换行等自然边界切块,保证阅读顺畅;
// 2. 若长时间遇不到合适边界,则在 MaxChunkRunes 处强制切块,避免整段卡太久;
// 3. 对中文文本优先按 rune 长度处理,避免多字节字符被截断。
func SplitPseudoStreamText(text string, options PseudoStreamOptions) []string {
hasTrailingNewline := strings.HasSuffix(strings.TrimRight(text, " \t"), "\n")
text = strings.TrimRight(strings.TrimLeft(text, " \t\r\n"), " \t\r")
if text == "" {
return nil
}
options = normalizePseudoStreamOptions(options)
runes := []rune(text)
if len(runes) <= options.MaxChunkRunes {
// text 经 TrimRight(" \t\r") 已保留结尾 \n直接返回不再追加。
return []string{text}
}
chunks := make([]string, 0, len(runes)/options.MinChunkRunes+1)
start := 0
size := 0
for i, r := range runes {
size++
shouldFlush := false
if size >= options.MaxChunkRunes {
shouldFlush = true
}
if size >= options.MinChunkRunes && isPseudoStreamBoundary(r) {
shouldFlush = true
}
if !shouldFlush {
continue
}
// 用 Trim(" \t\r") 代替 TrimSpace保留 chunk 内的 \n段落分隔符
// TrimSpace 会把 flush 在 \n 边界时结尾的 \n、以及下一段开头的 \n 全部删掉,导致黏连。
chunk := strings.Trim(string(runes[start:i+1]), " \t\r")
if chunk != "" {
chunks = append(chunks, chunk)
}
start = i + 1
size = 0
}
if start < len(runes) {
chunk := strings.Trim(string(runes[start:]), " \t\r")
if chunk != "" {
chunks = append(chunks, chunk)
}
}
if len(chunks) == 0 {
return []string{text}
}
// 仅当最后一个 chunk 尚未以 \n 结尾时才追加,避免 Trim 修复后出现双换行。
if hasTrailingNewline && !strings.HasSuffix(chunks[len(chunks)-1], "\n") {
chunks[len(chunks)-1] += "\n"
}
return chunks
}
func normalizePseudoStreamOptions(options PseudoStreamOptions) PseudoStreamOptions {
if options.MinChunkRunes <= 0 {
options.MinChunkRunes = defaultPseudoStreamMinChunkRunes
}
if options.MaxChunkRunes <= 0 {
options.MaxChunkRunes = defaultPseudoStreamMaxChunkRunes
}
if options.MaxChunkRunes < options.MinChunkRunes {
options.MaxChunkRunes = options.MinChunkRunes
}
return options
}
func isPseudoStreamBoundary(r rune) bool {
switch r {
case '。', '', '', '', '', '', '.', '!', '?', ';', ':', ',', '\n':
return true
default:
return false
}
}
func waitPseudoStreamInterval(ctx context.Context, interval time.Duration) error {
if interval <= 0 {
return nil
}
if ctx == nil {
ctx = context.Background()
}
timer := time.NewTimer(interval)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}