Version: 0.4.5.dev.260307

feat: 📡 更新 SSE 消息流格式

* 将 SSE 消息流格式更新为 Apifox 可识别的 OpenAI 格式
* 便于后续与前端的对接与协作
This commit is contained in:
LoveLosita
2026-03-07 16:11:11 +08:00
parent 26c350f378
commit 3f95d23376
4 changed files with 257 additions and 86 deletions

View File

@@ -5,72 +5,99 @@ import (
"encoding/json"
"io"
"strings"
"time"
"github.com/cloudwego/eino-ext/components/model/ark"
"github.com/cloudwego/eino/schema"
"github.com/google/uuid"
arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model"
)
// StreamResponse Apifox/前端 识别设计的极简结构
// StreamResponse 为 OpenAI/DeepSeek 兼容的流式 chunk 结构
type StreamResponse struct {
Choices []struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
} `json:"choices"`
ID string `json:"id"`
Object string `json:"object"`
Created int64 `json:"created"`
Model string `json:"model"`
Choices []StreamChoice `json:"choices"`
}
// ToStreamResponseDTO 将 Eino 的内部 Chunk 转换为 StreamResponse DTO
func ToStreamResponseDTO(chunk *schema.Message) StreamResponse {
var dto StreamResponse
dto.Choices = append(dto.Choices, struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
}{})
dto.Choices[0].Delta.Content = chunk.Content
return dto
// StreamChoice is one element of the OpenAI-style "choices" array inside a
// streaming chunk.
type StreamChoice struct {
	Index        int         `json:"index"`         // position of this choice; producers in this file always emit 0
	Delta        StreamDelta `json:"delta"`         // incremental payload carried by this chunk
	FinishReason *string     `json:"finish_reason"` // nil while streaming; set to "stop" on the terminal chunk
}
func ToStreamReasoningResponseDTO(chunk *schema.Message) StreamResponse {
var dto StreamResponse
dto.Choices = append(dto.Choices, struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
}{})
dto.Choices[0].Delta.Content = chunk.ReasoningContent
return dto
// StreamDelta carries the incremental content of one streaming chunk.
// Empty fields are omitted from the JSON, matching the OpenAI/DeepSeek
// chunk format.
type StreamDelta struct {
	Role             string `json:"role,omitempty"`              // "assistant"; sent only on the first chunk of a stream
	Content          string `json:"content,omitempty"`           // visible answer text
	ReasoningContent string `json:"reasoning_content,omitempty"` // model reasoning text (DeepSeek-style field)
}
// ToOpenAIStream 负责将 Eino 的内部 Chunk 转为 OpenAI 兼容的 data: {JSON} 字符串
func ToOpenAIStream(chunk *schema.Message) (string, error) {
var dto StreamResponse
if chunk.ReasoningContent != "" {
dto = ToStreamReasoningResponseDTO(chunk)
} else {
dto = ToStreamResponseDTO(chunk)
// ToOpenAIStream converts a single Eino chunk into an OpenAI-compatible
// chat.completion.chunk JSON string. It returns ("", nil) when the chunk
// carries nothing worth sending, so the caller can skip it. Note that only
// the raw JSON payload is returned — the caller is responsible for any
// SSE "data: " framing.
func ToOpenAIStream(chunk *schema.Message, requestID, modelName string, created int64, includeRole bool) (string, error) {
	var delta StreamDelta
	if includeRole {
		// OpenAI streams announce the role exactly once, on the first chunk.
		delta.Role = "assistant"
	}
	if chunk != nil {
		delta.Content = chunk.Content
		delta.ReasoningContent = chunk.ReasoningContent
	}
	// An all-empty delta would serialize to a useless chunk; tell the
	// caller to drop it instead.
	if delta == (StreamDelta{}) {
		return "", nil
	}
	chunkDTO := StreamResponse{
		ID:      requestID,
		Object:  "chat.completion.chunk",
		Created: created,
		Model:   modelName,
		Choices: []StreamChoice{
			{Index: 0, Delta: delta, FinishReason: nil},
		},
	}
	raw, err := json.Marshal(chunkDTO)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}
func StreamChat(ctx context.Context, llm *ark.ChatModel, userInput string, ifThinking bool, chatHistory []*schema.Message, outChan chan<- string) (string, error) {
// 1. 组装消息
// ToOpenAIFinishStream builds the terminal streaming chunk, i.e. one with
// an empty delta and finish_reason set to "stop", serialized as an
// OpenAI-compatible JSON string.
func ToOpenAIFinishStream(requestID, modelName string, created int64) (string, error) {
	// finish_reason is a *string in the wire format, so take the address
	// of a local.
	reason := "stop"
	finalChunk := StreamResponse{
		ID:      requestID,
		Object:  "chat.completion.chunk",
		Created: created,
		Model:   modelName,
		Choices: []StreamChoice{
			{Index: 0, Delta: StreamDelta{}, FinishReason: &reason},
		},
	}
	payload, marshalErr := json.Marshal(finalChunk)
	if marshalErr != nil {
		return "", marshalErr
	}
	return string(payload), nil
}
func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userInput string, ifThinking bool, chatHistory []*schema.Message, outChan chan<- string) (string, error) {
// 1) 组装提示消息
messages := make([]*schema.Message, 0)
// A. 塞入 System Message (人设)
messages = append(messages, schema.SystemMessage(SystemPrompt))
// B. 塞入历史记录 (上下文)
if len(chatHistory) > 0 {
messages = append(messages, chatHistory...)
}
// C. 塞入用户当前的消息 (当前需求)
messages = append(messages, schema.UserMessage(userInput))
// 2. 调用流式接口
// 2) 发起流式请求
var thinking *ark.Thinking
if ifThinking {
thinking = &arkModel.Thinking{Type: arkModel.ThinkingTypeEnabled}
@@ -81,28 +108,45 @@ func StreamChat(ctx context.Context, llm *ark.ChatModel, userInput string, ifThi
if err != nil {
return "", err
}
defer reader.Close() // 记得关闭 Reader
defer reader.Close()
// 3. 循环读取直到结束
if strings.TrimSpace(modelName) == "" {
modelName = "smartflow-worker"
}
requestID := "chatcmpl-" + uuid.NewString()
created := time.Now().Unix()
firstChunk := true
// 3) 持续转发 chunk
var fullText strings.Builder
for {
chunk, err := reader.Recv()
if err == io.EOF {
break // 读取完成
break
}
if err != nil {
return "", err
}
/*if chunk.Content == "" { // 过滤掉空内容,避免发送无效消息
continue
}*/
fullText.WriteString(chunk.Content)
// 将内容发送到通道中供前端消费
retChuck, err := ToOpenAIStream(chunk)
payload, err := ToOpenAIStream(chunk, requestID, modelName, created, firstChunk)
if err != nil {
return "", err
}
outChan <- retChuck
if payload != "" {
outChan <- payload
firstChunk = false
}
}
// 4) 发送结束 chunk 和 [DONE]
finishChunk, err := ToOpenAIFinishStream(requestID, modelName, created)
if err != nil {
return "", err
}
outChan <- finishChunk
outChan <- "[DONE]"
return fullText.String(), nil
}