Version: 0.8.3.dev.260328

Backend:
1. Deleted the original agent folder entirely and renamed the agent2 folder to agent across the board (including every affected file, plus docs and comments); the migration is complete.
2. Fixed logic issues in message retry handling.

前端:
1.改善了一些交互体验,修复了一些bug,现在只剩少的功能了,现存的bug基本都修复完毕

Repo-wide:
1. Updated the decision records and the README.
Losita
2026-03-28 18:00:31 +08:00
parent 5fc9548420
commit 468367d617
108 changed files with 1910 additions and 17173 deletions


@@ -1,93 +1,19 @@
-package chat
+package agentchat

 import (
 	"context"
-	"encoding/json"
 	"io"
 	"strings"
 	"time"

+	agentllm "github.com/LoveLosita/smartflow/backend/agent/llm"
+	agentstream "github.com/LoveLosita/smartflow/backend/agent/stream"
+
 	"github.com/cloudwego/eino-ext/components/model/ark"
 	"github.com/cloudwego/eino/schema"
 	"github.com/google/uuid"
 	arkModel "github.com/volcengine/volcengine-go-sdk/service/arkruntime/model"
 )
-// StreamResponse is the OpenAI/DeepSeek-compatible streaming chunk structure.
-type StreamResponse struct {
-	ID      string         `json:"id"`
-	Object  string         `json:"object"`
-	Created int64          `json:"created"`
-	Model   string         `json:"model"`
-	Choices []StreamChoice `json:"choices"`
-}
-
-type StreamChoice struct {
-	Index        int         `json:"index"`
-	Delta        StreamDelta `json:"delta"`
-	FinishReason *string     `json:"finish_reason"`
-}
-
-type StreamDelta struct {
-	Role             string `json:"role,omitempty"`
-	Content          string `json:"content,omitempty"`
-	ReasoningContent string `json:"reasoning_content,omitempty"`
-}
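
Note: given those struct tags, a single content chunk serializes to a JSON line along these lines (the id and model values here are invented for illustration; role is only set on the first chunk):

	{"id":"req-0001","object":"chat.completion.chunk","created":1711600000,"model":"example-model","choices":[{"index":0,"delta":{"role":"assistant","content":"Hello"},"finish_reason":null}]}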
-// ToOpenAIStream converts a single Eino chunk into OpenAI-compatible JSON.
-func ToOpenAIStream(chunk *schema.Message, requestID, modelName string, created int64, includeRole bool) (string, error) {
-	delta := StreamDelta{}
-	if includeRole {
-		delta.Role = "assistant"
-	}
-	if chunk != nil {
-		delta.Content = chunk.Content
-		delta.ReasoningContent = chunk.ReasoningContent
-	}
-	if delta.Role == "" && delta.Content == "" && delta.ReasoningContent == "" {
-		return "", nil
-	}
-	dto := StreamResponse{
-		ID:      requestID,
-		Object:  "chat.completion.chunk",
-		Created: created,
-		Model:   modelName,
-		Choices: []StreamChoice{{
-			Index:        0,
-			Delta:        delta,
-			FinishReason: nil,
-		}},
-	}
-	jsonBytes, err := json.Marshal(dto)
-	if err != nil {
-		return "", err
-	}
-	return string(jsonBytes), nil
-}
-// ToOpenAIFinishStream produces the terminating chunk (finish_reason=stop).
-func ToOpenAIFinishStream(requestID, modelName string, created int64) (string, error) {
-	stop := "stop"
-	dto := StreamResponse{
-		ID:      requestID,
-		Object:  "chat.completion.chunk",
-		Created: created,
-		Model:   modelName,
-		Choices: []StreamChoice{{
-			Index:        0,
-			Delta:        StreamDelta{},
-			FinishReason: &stop,
-		}},
-	}
-	jsonBytes, err := json.Marshal(dto)
-	if err != nil {
-		return "", err
-	}
-	return string(jsonBytes), nil
-}
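
Note: the finish chunk produced above has an empty delta and finish_reason set (same invented id/model as before); later in the stream it is followed by the literal [DONE] sentinel:

	{"id":"req-0001","object":"chat.completion.chunk","created":1711600000,"model":"example-model","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}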
 // StreamChat drives the model's streaming output and records timing at key points:
 // 1) stream connection established (llm.Stream returns)
 // 2) first chunk arrival (first-token latency)
@@ -103,7 +29,8 @@ func StreamChat(
 	traceID string,
 	chatID string,
 	requestStart time.Time,
-) (string, *schema.TokenUsage, error) {
+	reasoningStartAt *time.Time,
+) (string, string, int, *schema.TokenUsage, error) {
 	/*callStart := time.Now()*/
 	messages := make([]*schema.Message, 0)
@@ -123,7 +50,7 @@ func StreamChat(
 	/*connectStart := time.Now()*/
 	reader, err := llm.Stream(ctx, messages, ark.WithThinking(thinking))
 	if err != nil {
-		return "", nil, err
+		return "", "", 0, nil, err
 	}
 	defer reader.Close()
@@ -135,6 +62,12 @@ func StreamChat(
 	firstChunk := true
 	chunkCount := 0
 	var tokenUsage *schema.TokenUsage
+	var localReasoningStartAt *time.Time
+	if reasoningStartAt != nil && !reasoningStartAt.IsZero() {
+		startCopy := reasoningStartAt.In(time.Local)
+		localReasoningStartAt = &startCopy
+	}
+	var reasoningEndAt *time.Time
 	/*streamRecvStart := time.Now()
 	log.Printf("打点|流连接建立|trace_id=%s|chat_id=%s|request_id=%s|本步耗时_ms=%d|请求累计_ms=%d|history_len=%d",
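
Note: the In(time.Local) copy above normalizes an externally supplied start time to the local zone and stores a pointer to a fresh value instead of aliasing the caller's pointer. A minimal illustration (timestamp invented):

	external := time.Date(2026, 3, 28, 10, 0, 0, 0, time.UTC) // e.g. handed in by the caller
	startCopy := external.In(time.Local)                      // In returns a new time.Time value in the local zone
	localStart := &startCopy                                  // does not alias &external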
@@ -147,29 +80,42 @@ func StreamChat(
 	)*/

 	var fullText strings.Builder
+	var reasoningText strings.Builder
 	for {
 		chunk, err := reader.Recv()
 		if err == io.EOF {
 			break
 		}
 		if err != nil {
-			return "", nil, err
+			return "", "", 0, nil, err
 		}
 		// Record the model's real usage first (usually returned in the tail chunk; some models may also return it mid-stream).
 		if chunk != nil && chunk.ResponseMeta != nil && chunk.ResponseMeta.Usage != nil {
-			tokenUsage = mergeTokenUsage(tokenUsage, chunk.ResponseMeta.Usage)
+			tokenUsage = agentllm.MergeUsage(tokenUsage, chunk.ResponseMeta.Usage)
 		}
-		fullText.WriteString(chunk.Content)
+		if chunk != nil {
+			if strings.TrimSpace(chunk.ReasoningContent) != "" && localReasoningStartAt == nil {
+				now := time.Now()
+				localReasoningStartAt = &now
+			}
+			if strings.TrimSpace(chunk.Content) != "" && localReasoningStartAt != nil && reasoningEndAt == nil {
+				now := time.Now()
+				reasoningEndAt = &now
+			}
+			fullText.WriteString(chunk.Content)
+			reasoningText.WriteString(chunk.ReasoningContent)
+		}
-		payload, err := ToOpenAIStream(chunk, requestID, modelName, created, firstChunk)
+		payload, err := agentstream.ToOpenAIStream(chunk, requestID, modelName, created, firstChunk)
 		if err != nil {
-			return "", nil, err
+			return "", "", 0, nil, err
 		}
 		if payload != "" {
 			outChan <- payload
 			chunkCount++
 			firstChunk = false
 		/*if firstChunk {
 			log.Printf("打点|首包到达|trace_id=%s|chat_id=%s|request_id=%s|本步耗时_ms=%d|请求累计_ms=%d",
 				traceID,
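
Note: the bookkeeping added above infers the reasoning window purely from chunk contents: the first non-blank reasoning_content marks the start (unless a start time was passed in), and the first non-blank content after that marks the end. A standalone sketch of the same technique, assuming a simplified chunk type that is not the project's API:

	package example

	import (
		"strings"
		"time"
	)

	type chunkLike struct{ Content, ReasoningContent string }

	// reasoningWindow scans chunks in arrival order and captures the
	// timestamps bounding the reasoning phase, if one is observed.
	func reasoningWindow(next func() (chunkLike, bool)) (start, end *time.Time) {
		for {
			c, ok := next()
			if !ok {
				return start, end
			}
			if strings.TrimSpace(c.ReasoningContent) != "" && start == nil {
				t := time.Now()
				start = &t // first reasoning token: the phase begins
			}
			if strings.TrimSpace(c.Content) != "" && start != nil && end == nil {
				t := time.Now()
				end = &t // first answer token afterwards: the phase ends
			}
		}
	}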
@@ -183,9 +129,9 @@ func StreamChat(
 		}
 	}

-	finishChunk, err := ToOpenAIFinishStream(requestID, modelName, created)
+	finishChunk, err := agentstream.ToOpenAIFinishStream(requestID, modelName, created)
 	if err != nil {
-		return "", nil, err
+		return "", "", 0, nil, err
 	}
 	outChan <- finishChunk
 	outChan <- "[DONE]"
@@ -200,39 +146,16 @@ func StreamChat(
 		time.Since(requestStart).Milliseconds(),
 	)*/

-	return fullText.String(), tokenUsage, nil
-}
-
-// mergeTokenUsage merges usage reported across streaming chunks.
-//
-// Design notes:
-// 1. Models report usage at different points in the stream (middle chunks vs. the tail chunk);
-// 2. Fields are merged by keeping the larger value, so the final statistics are complete;
-// 3. The result is used for accounting only and does not affect the streamed output.
-func mergeTokenUsage(base *schema.TokenUsage, incoming *schema.TokenUsage) *schema.TokenUsage {
-	if incoming == nil {
-		return base
-	}
-	if base == nil {
-		copied := *incoming
-		return &copied
-	}
-	merged := *base
-	if incoming.PromptTokens > merged.PromptTokens {
-		merged.PromptTokens = incoming.PromptTokens
-	}
-	if incoming.CompletionTokens > merged.CompletionTokens {
-		merged.CompletionTokens = incoming.CompletionTokens
-	}
-	if incoming.TotalTokens > merged.TotalTokens {
-		merged.TotalTokens = incoming.TotalTokens
-	}
-	if incoming.PromptTokenDetails.CachedTokens > merged.PromptTokenDetails.CachedTokens {
-		merged.PromptTokenDetails.CachedTokens = incoming.PromptTokenDetails.CachedTokens
-	}
-	if incoming.CompletionTokensDetails.ReasoningTokens > merged.CompletionTokensDetails.ReasoningTokens {
-		merged.CompletionTokensDetails.ReasoningTokens = incoming.CompletionTokensDetails.ReasoningTokens
-	}
-	return &merged
-}
+	reasoningDurationSeconds := 0
+	if localReasoningStartAt != nil {
+		if reasoningEndAt == nil {
+			now := time.Now()
+			reasoningEndAt = &now
+		}
+		if reasoningEndAt.After(*localReasoningStartAt) {
+			reasoningDurationSeconds = int(reasoningEndAt.Sub(*localReasoningStartAt) / time.Second)
+		}
+	}
+
+	return fullText.String(), reasoningText.String(), reasoningDurationSeconds, tokenUsage, nil
 }
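
Note: assuming agentllm.MergeUsage keeps the semantics of the removed mergeTokenUsage (per-field maximum across reports), a mid-stream usage report and a tail report merge like this (token counts invented):

	mid := &schema.TokenUsage{PromptTokens: 120, CompletionTokens: 0, TotalTokens: 120}
	tail := &schema.TokenUsage{PromptTokens: 120, CompletionTokens: 56, TotalTokens: 176}
	merged := agentllm.MergeUsage(mid, tail)
	// merged: PromptTokens=120, CompletionTokens=56, TotalTokens=176

Also note that reasoningDurationSeconds truncates toward zero (integer division by time.Second), so a 4.9 s reasoning window is reported as 4.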