From 3f95d23376c0fcdae9214869b60caf6c90ebdd2b Mon Sep 17 00:00:00 2001 From: LoveLosita <2810873701@qq.com> Date: Sat, 7 Mar 2026 16:11:11 +0800 Subject: [PATCH] =?UTF-8?q?Version:=200.4.5.dev.260307=20feat:=20?= =?UTF-8?q?=F0=9F=93=A1=20=E6=9B=B4=E6=96=B0=20SSE=20=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=B5=81=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 将 SSE 消息流格式更新为 Apifox 可识别的 OpenAI 格式 * 便于后续与前端的对接与协作 --- backend/agent/graph.go | 140 ++++++++++++++++++---------- backend/api/agent.go | 40 +++++--- backend/service/agent.go | 53 +++++------ docs/apifox/agent-chat.openapi.yaml | 110 ++++++++++++++++++++++ 4 files changed, 257 insertions(+), 86 deletions(-) create mode 100644 docs/apifox/agent-chat.openapi.yaml diff --git a/backend/agent/graph.go b/backend/agent/graph.go index 49073d1..4181e7b 100644 --- a/backend/agent/graph.go +++ b/backend/agent/graph.go @@ -5,72 +5,99 @@ import ( "encoding/json" "io" "strings" + "time" "github.com/cloudwego/eino-ext/components/model/ark" "github.com/cloudwego/eino/schema" + "github.com/google/uuid" arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model" ) -// StreamResponse 专为 Apifox/前端 识别设计的极简结构 +// StreamResponse 为 OpenAI/DeepSeek 兼容的流式 chunk 结构 type StreamResponse struct { - Choices []struct { - Delta struct { - Content string `json:"content"` - } `json:"delta"` - } `json:"choices"` + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []StreamChoice `json:"choices"` } -// ToStreamResponseDTO 将 Eino 的内部 Chunk 转换为 StreamResponse DTO -func ToStreamResponseDTO(chunk *schema.Message) StreamResponse { - var dto StreamResponse - dto.Choices = append(dto.Choices, struct { - Delta struct { - Content string `json:"content"` - } `json:"delta"` - }{}) - dto.Choices[0].Delta.Content = chunk.Content - return dto +type StreamChoice struct { + Index int `json:"index"` + Delta StreamDelta `json:"delta"` + FinishReason *string `json:"finish_reason"` } -func ToStreamReasoningResponseDTO(chunk *schema.Message) StreamResponse { - var dto StreamResponse - dto.Choices = append(dto.Choices, struct { - Delta struct { - Content string `json:"content"` - } `json:"delta"` - }{}) - dto.Choices[0].Delta.Content = chunk.ReasoningContent - return dto +type StreamDelta struct { + Role string `json:"role,omitempty"` + Content string `json:"content,omitempty"` + ReasoningContent string `json:"reasoning_content,omitempty"` } -// ToOpenAIStream 负责将 Eino 的内部 Chunk 转换为 OpenAI 兼容的 data: {JSON} 字符串 -func ToOpenAIStream(chunk *schema.Message) (string, error) { - var dto StreamResponse - if chunk.ReasoningContent != "" { - dto = ToStreamReasoningResponseDTO(chunk) - } else { - dto = ToStreamResponseDTO(chunk) +// ToOpenAIStream 将单个 Eino chunk 转为 OpenAI 兼容 JSON +func ToOpenAIStream(chunk *schema.Message, requestID, modelName string, created int64, includeRole bool) (string, error) { + delta := StreamDelta{} + if includeRole { + delta.Role = "assistant" + } + if chunk != nil { + delta.Content = chunk.Content + delta.ReasoningContent = chunk.ReasoningContent + } + + if delta.Role == "" && delta.Content == "" && delta.ReasoningContent == "" { + return "", nil + } + + dto := StreamResponse{ + ID: requestID, + Object: "chat.completion.chunk", + Created: created, + Model: modelName, + Choices: []StreamChoice{{ + Index: 0, + Delta: delta, + FinishReason: nil, + }}, } jsonBytes, err := json.Marshal(dto) if err != nil { return "", err } - // 严格遵循 SSE 协议格式 return string(jsonBytes), nil } -func StreamChat(ctx context.Context, llm *ark.ChatModel, userInput string, ifThinking bool, chatHistory []*schema.Message, outChan chan<- string) (string, error) { - // 1. 组装消息 +// ToOpenAIFinishStream 生成结束 chunk(finish_reason=stop) +func ToOpenAIFinishStream(requestID, modelName string, created int64) (string, error) { + stop := "stop" + dto := StreamResponse{ + ID: requestID, + Object: "chat.completion.chunk", + Created: created, + Model: modelName, + Choices: []StreamChoice{{ + Index: 0, + Delta: StreamDelta{}, + FinishReason: &stop, + }}, + } + jsonBytes, err := json.Marshal(dto) + if err != nil { + return "", err + } + return string(jsonBytes), nil +} + +func StreamChat(ctx context.Context, llm *ark.ChatModel, modelName string, userInput string, ifThinking bool, chatHistory []*schema.Message, outChan chan<- string) (string, error) { + // 1) 组装提示消息 messages := make([]*schema.Message, 0) - // A. 塞入 System Message (人设) messages = append(messages, schema.SystemMessage(SystemPrompt)) - // B. 塞入历史记录 (上下文) if len(chatHistory) > 0 { messages = append(messages, chatHistory...) } - // C. 塞入用户当前的消息 (当前需求) messages = append(messages, schema.UserMessage(userInput)) - // 2. 调用流式接口 + + // 2) 发起流式请求 var thinking *ark.Thinking if ifThinking { thinking = &arkModel.Thinking{Type: arkModel.ThinkingTypeEnabled} @@ -81,28 +108,45 @@ func StreamChat(ctx context.Context, llm *ark.ChatModel, userInput string, ifThi if err != nil { return "", err } - defer reader.Close() // 记得关闭 Reader + defer reader.Close() - // 3. 循环读取直到结束 + if strings.TrimSpace(modelName) == "" { + modelName = "smartflow-worker" + } + requestID := "chatcmpl-" + uuid.NewString() + created := time.Now().Unix() + firstChunk := true + + // 3) 持续转发 chunk var fullText strings.Builder for { chunk, err := reader.Recv() if err == io.EOF { - break // 读取完成 + break } if err != nil { return "", err } - /*if chunk.Content == "" { // 过滤掉空内容,避免发送无效消息 - continue - }*/ + fullText.WriteString(chunk.Content) - // 将内容发送到通道中供前端消费 - retChuck, err := ToOpenAIStream(chunk) + + payload, err := ToOpenAIStream(chunk, requestID, modelName, created, firstChunk) if err != nil { return "", err } - outChan <- retChuck + if payload != "" { + outChan <- payload + firstChunk = false + } } + + // 4) 发送结束 chunk 和 [DONE] + finishChunk, err := ToOpenAIFinishStream(requestID, modelName, created) + if err != nil { + return "", err + } + outChan <- finishChunk + outChan <- "[DONE]" + return fullText.String(), nil } diff --git a/backend/api/agent.go b/backend/api/agent.go index b3de0f0..a6e8dab 100644 --- a/backend/api/agent.go +++ b/backend/api/agent.go @@ -1,6 +1,7 @@ package api import ( + "encoding/json" "io" "net/http" "strings" @@ -16,50 +17,65 @@ type AgentHandler struct { svc *service.AgentService } -// NewAgentHandler 组装 Handler 的“工厂” +// NewAgentHandler 组装 AgentHandler func NewAgentHandler(svc *service.AgentService) *AgentHandler { return &AgentHandler{ - svc: svc, // 把传进来的 Service 揣进口袋里 + svc: svc, } } +func writeSSEData(w io.Writer, payload string) error { + _, err := io.WriteString(w, "data: "+payload+"\n\n") + return err +} + func (api *AgentHandler) ChatAgent(c *gin.Context) { - // 1. 设置请求头 + // 1) 设置 SSE 响应头 c.Writer.Header().Set("Content-Type", "text/event-stream") c.Writer.Header().Set("Cache-Control", "no-cache") c.Writer.Header().Set("Connection", "keep-alive") c.Writer.Header().Set("Transfer-Encoding", "chunked") - // 2. 从请求中获取用户输入 + c.Writer.Header().Set("X-Accel-Buffering", "no") + + // 2) 解析请求体 var req model.UserSendMessageRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, respond.WrongParamType) return } - // 兼容:如果前端没传会话 ID,后端兜底创建一个 + // 3) 规范化会话 ID conversationID := strings.TrimSpace(req.ConversationID) if conversationID == "" { conversationID = uuid.NewString() } - // 把最终生效的会话 ID 回传给前端,方便后续继续同一会话 c.Writer.Header().Set("X-Conversation-ID", conversationID) - userID := c.GetInt("user_id") // 从上下文中获取用户 ID - // 3. 调用 Service 层的聊天方法,获取输出通道和错误通道 - outChan, errChan := api.svc.AgentChat(c.Request.Context(), req.Message, req.Thinking, userID, conversationID) - // 4. 循环转发消息/错误 + userID := c.GetInt("user_id") + outChan, errChan := api.svc.AgentChat(c.Request.Context(), req.Message, req.Thinking, req.Model, userID, conversationID) + + // 4) 转发 SSE 流 c.Stream(func(w io.Writer) bool { select { case err, ok := <-errChan: if ok && err != nil { - respond.DealWithError(c, err) + errPayload, _ := json.Marshal(map[string]any{ + "error": map[string]any{ + "message": err.Error(), + "type": "server_error", + }, + }) + _ = writeSSEData(w, string(errPayload)) + _ = writeSSEData(w, "[DONE]") } return false case msg, ok := <-outChan: if !ok { return false } - c.SSEvent("message", msg) // 发送 SSE 格式消息 + if err := writeSSEData(w, msg); err != nil { + return false + } return true case <-c.Request.Context().Done(): return false diff --git a/backend/service/agent.go b/backend/service/agent.go index d8ca440..34a9673 100644 --- a/backend/service/agent.go +++ b/backend/service/agent.go @@ -9,6 +9,7 @@ import ( "github.com/LoveLosita/smartflow/backend/conv" "github.com/LoveLosita/smartflow/backend/dao" "github.com/LoveLosita/smartflow/backend/inits" + "github.com/cloudwego/eino-ext/components/model/ark" "github.com/cloudwego/eino/schema" "github.com/google/uuid" ) @@ -35,15 +36,24 @@ func normalizeConversationID(chatID string) string { return trimmed } -func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThinking bool, userID int, chatID string) (<-chan string, <-chan error) { - //1. 创建一个输出通道 +func (s *AgentService) pickChatModel(requestModel string) (*ark.ChatModel, string) { + model := strings.TrimSpace(requestModel) + if strings.EqualFold(model, "strategist") { + return s.AIHub.Strategist, "strategist" + } + return s.AIHub.Worker, "worker" +} + +func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThinking bool, modelName string, userID int, chatID string) (<-chan string, <-chan error) { + // 1) 准备输出通道 outChan := make(chan string, 5) errChan := make(chan error, 1) - //补充:会话 ID 兜底,避免上层漏传 - chatID = normalizeConversationID(chatID) - //2. 先确保这个会话存在(如果不存在就创建一个新的) - //先看看缓存里面有没有这个会话 + // 2) 规范化会话并选择模型 + chatID = normalizeConversationID(chatID) + selectedModel, resolvedModelName := s.pickChatModel(modelName) + + // 3) 确保会话存在 result, err := s.agentCache.GetConversationStatus(ctx, chatID) if err != nil { errChan <- err @@ -51,7 +61,6 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin close(errChan) return outChan, errChan } - //如果缓存里面没有,就去查库 if !result { innerResult, err := s.repo.IfChatExists(ctx, userID, chatID) if err != nil { @@ -61,7 +70,6 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin return outChan, errChan } if !innerResult { - //如果会话不存在,先创建一个新的会话 if _, err = s.repo.CreateNewChat(userID, chatID); err != nil { errChan <- err close(outChan) @@ -69,15 +77,12 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin return outChan, errChan } } - //补充:把“会话存在”状态回写缓存,后续请求可直接命中 if err = s.agentCache.SetConversationStatus(ctx, chatID); err != nil { - //缓存回写失败不影响主流程 log.Printf("failed to set conversation status cache for %s: %v", chatID, err) } } - //能走到这里,要么缓存里有这个会话,要么数据库里有这个会话了 - //4. 提取出历史消息,构建上下文 - //先尝试从缓存里拿历史消息 + + // 4) 构建历史上下文 chatHistory, err := s.agentCache.GetHistory(ctx, chatID) if err != nil { errChan <- err @@ -85,9 +90,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin close(errChan) return outChan, errChan } - //如果缓存里没有历史消息,就从数据库里拿 if chatHistory == nil { - //先从数据库拿到历史消息 histories, err := s.repo.GetUserChatHistories(ctx, userID, 20, chatID) if err != nil { errChan <- err @@ -95,9 +98,7 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin close(errChan) return outChan, errChan } - //再转换成 Eino 的消息格式 chatHistory = conv.ToEinoMessages(histories) - //把历史消息放到缓存里,方便下次直接拿 if err = s.agentCache.BackfillHistory(ctx, chatID, chatHistory); err != nil { errChan <- err close(outChan) @@ -105,9 +106,9 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin return outChan, errChan } } - //3. 将用户消息异步落缓存和库 + + // 5) 异步落用户消息 go func() { - //这里先不管落库成功与否了,毕竟不想因为落库失败而影响用户的聊天体验 bg := context.Background() _ = s.agentCache.PushMessage(bg, chatID, &schema.Message{ Role: schema.User, @@ -116,17 +117,17 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin _ = s.repo.SaveChatHistory(bg, userID, chatID, "user", userMessage) }() - //5. 启动一个 goroutine 来处理聊天逻辑 + // 6) 流式输出模型回复 go func() { - defer close(outChan) // 确保在函数结束时关闭通道 - defer close(errChan) - //3. 调用 StreamChat 函数进行流式聊天 - fullText, err := agent.StreamChat(ctx, s.AIHub.Worker, userMessage, ifThinking, chatHistory, outChan) + defer close(outChan) + + fullText, err := agent.StreamChat(ctx, selectedModel, resolvedModelName, userMessage, ifThinking, chatHistory, outChan) if err != nil { errChan <- err return } - //4. 将 AI 的回复异步落缓存和库 + + // 7) 异步落助手消息 go func() { bg := context.Background() _ = s.agentCache.PushMessage(bg, chatID, &schema.Message{ @@ -135,9 +136,9 @@ func (s *AgentService) AgentChat(ctx context.Context, userMessage string, ifThin }) if saveErr := s.repo.SaveChatHistory(bg, userID, chatID, "assistant", fullText); saveErr != nil { log.Printf("failed to save chat history to database: %v", saveErr) - return } }() }() + return outChan, errChan } diff --git a/docs/apifox/agent-chat.openapi.yaml b/docs/apifox/agent-chat.openapi.yaml new file mode 100644 index 0000000..77efea1 --- /dev/null +++ b/docs/apifox/agent-chat.openapi.yaml @@ -0,0 +1,110 @@ +openapi: 3.0.1 +info: + title: '' + version: 1.0.0 +paths: + /agent/chat: + post: + summary: AI Agent&聊天 + deprecated: false + description: >- + 本接口既支持带着消息新建对话,也支持通过旧对话继续聊天。 + 在 JSON 中传入 conversation_id,后端查库:存在则延续,不存在则创建新对话后再聊天。 + + 流式响应采用 OpenAI/DeepSeek 兼容格式: + - 思考流:choices[0].delta.reasoning_content + - 正文流:choices[0].delta.content + - 结束标记:data: [DONE] + tags: + - Agent模块 + parameters: + - name: Authorization + in: header + description: token + required: false + example: '' + schema: + type: string + - name: Content-Type + in: header + description: '' + required: false + example: + - application/json + schema: + type: array + items: + type: string + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + conversation_id: + type: string + description: 可选。不传时后端自动生成,并通过 X-Conversation-ID 响应头返回。 + x-apifox-mock: '{{$string.uuid}}' + message: + type: string + description: 用户输入内容 + model: + type: string + description: 可选,worker 或 strategist(默认 worker) + thinking: + type: boolean + description: 是否开启深度思考 + required: + - message + - thinking + x-apifox-orders: + - conversation_id + - message + - model + - thinking + example: + conversation_id: 0b6eac35-ccaa-46d1-aa58-d33bc2cd48aa + message: 提醒我有空的时候给自己挑一件新衣服 + model: worker + thinking: true + responses: + '200': + description: '' + content: + text/event-stream: + schema: + type: string + description: >- + 每条 SSE 事件都是 `data: {JSON}`,最终以 `data: [DONE]` 结束。 + Apifox 可按 OpenAI 兼容格式自动合并,并区分 reasoning_content 与 content。 + example: |- + data: {"id":"chatcmpl-3f3ee5d6-8c4f-4b5b-a2a8-7f5b9bde8b9d","object":"chat.completion.chunk","created":1740637581,"model":"worker","choices":[{"index":0,"delta":{"role":"assistant","reasoning_content":"先分析一下你的需求。"},"finish_reason":null}]} + + data: {"id":"chatcmpl-3f3ee5d6-8c4f-4b5b-a2a8-7f5b9bde8b9d","object":"chat.completion.chunk","created":1740637581,"model":"worker","choices":[{"index":0,"delta":{"reasoning_content":"你提到的是空闲时提醒。"},"finish_reason":null}]} + + data: {"id":"chatcmpl-3f3ee5d6-8c4f-4b5b-a2a8-7f5b9bde8b9d","object":"chat.completion.chunk","created":1740637581,"model":"worker","choices":[{"index":0,"delta":{"content":"可以,我会在你有空时提醒你。"},"finish_reason":null}]} + + data: {"id":"chatcmpl-3f3ee5d6-8c4f-4b5b-a2a8-7f5b9bde8b9d","object":"chat.completion.chunk","created":1740637581,"model":"worker","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]} + + data: [DONE] + headers: + X-Conversation-ID: + example: 0b6eac35-ccaa-46d1-aa58-d33bc2cd48aa + required: false + description: 生效的会话 ID,用于后续续聊 + schema: + type: string + x-apifox-name: 成功 + x-apifox-ordering: 0 + security: [] + x-apifox-folder: Agent模块 + x-apifox-status: developing +components: + schemas: {} + responses: {} + securitySchemes: {} +servers: + - url: http://127.0.0.1:8080/api/v1 + description: 开发环境 +security: []