topfans/backend/gateway/service/dify_client.go
2026-06-03 22:19:22 +08:00

203 lines
6.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
)
// DifyClient is an HTTP client for the Dify Workflow API.
type DifyClient struct {
// BaseURL is the prefix every endpoint path is appended to
// (for example BaseURL + "/workflows/run").
BaseURL string
// APIKey is sent as the Bearer token in the Authorization header.
APIKey string
// HTTPClient executes the requests; NewDifyClient creates it with a
// 120-second timeout.
HTTPClient *http.Client
}
// NewDifyClient builds a DifyClient that talks to baseURL and authenticates
// with apiKey. The HTTP client it creates gives up after 120 seconds, which
// is the longest this service waits for a Dify workflow.
func NewDifyClient(baseURL, apiKey string) *DifyClient {
	const workflowTimeout = 120 * time.Second

	client := new(DifyClient)
	client.BaseURL = baseURL
	client.APIKey = apiKey
	client.HTTPClient = &http.Client{Timeout: workflowTimeout}
	return client
}
// WorkflowInput describes the inputs of a Dify workflow run, with JSON keys
// given by the field tags. The client methods in this file accept a raw
// map[string]interface{} for inputs and do not reference this type directly.
type WorkflowInput struct {
// presumably the URL of the image the workflow processes — confirm against the workflow definition
SourceImageURL string `json:"source_image_url"`
// presumably toggles a background cut-out step — confirm against the workflow definition
UseCutout bool `json:"use_cutout"`
// presumably identifiers of the presets to render — confirm against the workflow definition
PresetCodes []string `json:"preset_codes"`
// free-form configuration objects; their schema is not visible in this file
RenderConfigs []map[string]interface{} `json:"render_configs"`
}
// WorkflowOutput holds the decoded result of a Dify workflow run.
type WorkflowOutput struct {
WorkflowRunID string `json:"workflow_run_id"`
TaskID string `json:"task_id"`
Status string `json:"status"` // succeeded / failed / running
// Outputs is decoded from the top-level "data" key of the response.
// RunWorkflow afterwards replaces it with the nested data.outputs object
// when that object is present.
Outputs map[string]interface{} `json:"data"`
Error string `json:"error"`
}
// RunWorkflow triggers a Dify workflow in blocking mode and returns its result.
// Dify API: POST /v1/workflows/run
//
// inputs is sent as the request's "inputs" object and userID is sent as
// "user-<userID>". The call fails on transport errors, on a body that cannot
// be read, on any non-200 status, and on a body that is not JSON at all.
// A workflow that ran but failed is NOT reported as an error: inspect
// Status and Error on the returned WorkflowOutput.
func (c *DifyClient) RunWorkflow(ctx context.Context, inputs map[string]interface{}, userID string) (*WorkflowOutput, error) {
	url := fmt.Sprintf("%s/workflows/run", c.BaseURL)
	body := map[string]interface{}{
		"inputs":        inputs,
		"response_mode": "blocking",
		"user":          fmt.Sprintf("user-%s", userID),
	}
	jsonBody, err := json.Marshal(body)
	if err != nil {
		return nil, fmt.Errorf("marshal request: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonBody))
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		logger.Logger.Error("Dify request failed", zap.Error(err))
		return nil, fmt.Errorf("dify request: %w", err)
	}
	defer resp.Body.Close()

	// A failed read would leave a truncated body; surface it instead of
	// logging and parsing a partial payload as if it were complete.
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("read dify response: %w", err)
	}
	logger.Logger.Info("Dify raw response", zap.String("body", string(respBody)))

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("dify returned HTTP %d: %s", resp.StatusCode, string(respBody))
	}

	var output WorkflowOutput
	if err := json.Unmarshal(respBody, &output); err != nil {
		// The body did not fit WorkflowOutput. Fall back to a generic map and
		// treat the whole object as the outputs of a successful run.
		var raw map[string]interface{}
		if err2 := json.Unmarshal(respBody, &raw); err2 != nil {
			return nil, fmt.Errorf("parse dify response: %w", err)
		}
		output.Status = "succeeded"
		output.Outputs = raw
	}

	// In a blocking response status, outputs and error are nested under
	// "data", which the struct decodes into Outputs as a whole. Lift the real
	// values out of it; the top-level status is usually empty.
	if output.Outputs != nil {
		if status, ok := output.Outputs["status"].(string); ok && status != "" {
			output.Status = status
		}
		if errStr, ok := output.Outputs["error"].(string); ok && errStr != "" {
			output.Error = errStr
		}
		// Narrow Outputs to data.outputs, the workflow's actual output values.
		if inner, ok := output.Outputs["outputs"].(map[string]interface{}); ok {
			output.Outputs = inner
		}
	}

	logger.Logger.Info("Dify workflow completed", zap.String("status", output.Status))
	return &output, nil
}
// RunWorkflowAsync triggers a Dify workflow in streaming mode and returns the
// workflow_run_id of the started run so the caller can poll for its result.
//
// A streaming response is a Server-Sent Events stream in which each event is
// a line of the form "data: {json}". The stream is scanned incrementally and
// the function returns as soon as an event carrying a non-empty
// workflow_run_id is seen, rather than waiting for the stream to end. If no
// such event appears, the complete body is parsed as one plain JSON object
// as a fallback.
//
// NOTE(review): returning early closes the connection while Dify may still be
// streaming; confirm that the run keeps executing after the client disconnects.
//
// It fails on transport errors, on any non-200 status, on a stream read
// error, and when no workflow_run_id can be found.
func (c *DifyClient) RunWorkflowAsync(ctx context.Context, inputs map[string]interface{}, userID string) (string, error) {
	url := fmt.Sprintf("%s/workflows/run", c.BaseURL)
	body := map[string]interface{}{
		"inputs":        inputs,
		"response_mode": "streaming",
		"user":          fmt.Sprintf("user-%s", userID),
	}
	jsonBody, err := json.Marshal(body)
	if err != nil {
		return "", fmt.Errorf("marshal request: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonBody))
	if err != nil {
		return "", fmt.Errorf("create request: %w", err)
	}
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("dify request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body only enriches the error message, so a read
		// failure here is deliberately ignored.
		errBody, _ := io.ReadAll(resp.Body)
		return "", fmt.Errorf("dify returned HTTP %d: %s", resp.StatusCode, string(errBody))
	}

	var (
		seen     bytes.Buffer             // every byte read so far, kept for the fallback and for error messages
		consumed int                      // offset in seen of the first byte not yet examined as a complete line
		chunk    = make([]byte, 4096)
	)
	for {
		n, readErr := resp.Body.Read(chunk)
		seen.Write(chunk[:n])

		// Examine every complete line that is now available.
		for {
			pending := seen.Bytes()[consumed:]
			end := bytes.IndexByte(pending, '\n')
			if end < 0 {
				break
			}
			if id := difyRunIDFromEventLine(pending[:end]); id != "" {
				return id, nil
			}
			consumed += end + 1
		}

		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			return "", fmt.Errorf("read streaming response: %w", readErr)
		}
	}

	// The stream may end without a trailing newline.
	if id := difyRunIDFromEventLine(seen.Bytes()[consumed:]); id != "" {
		return id, nil
	}

	// Fallback: the body was not an event stream; try it as a single JSON object.
	respBody := seen.Bytes()
	var event struct {
		WorkflowRunID string `json:"workflow_run_id"`
	}
	if err := json.Unmarshal(respBody, &event); err != nil {
		return "", fmt.Errorf("parse streaming response: %w (body: %s)", err, string(respBody))
	}
	if event.WorkflowRunID == "" {
		return "", fmt.Errorf("no workflow_run_id in response: %s", string(respBody))
	}
	return event.WorkflowRunID, nil
}

// difyRunIDFromEventLine returns the workflow_run_id carried by one
// "data: {json}" event line, or "" when the line is not a data line, is not
// valid JSON, or has no workflow_run_id.
func difyRunIDFromEventLine(line []byte) string {
	const dataPrefix = "data:"

	line = bytes.TrimSpace(line)
	if !bytes.HasPrefix(line, []byte(dataPrefix)) {
		return ""
	}
	var event struct {
		WorkflowRunID string `json:"workflow_run_id"`
	}
	if err := json.Unmarshal(bytes.TrimSpace(line[len(dataPrefix):]), &event); err != nil {
		return ""
	}
	return event.WorkflowRunID
}
// GetWorkflowResult fetches the execution result of a workflow run.
// Dify API: GET /v1/workflows/run/{workflow_run_id}
//
// Status and Error are decoded from the top-level "status" and "error" keys.
// The run-detail response is flat: it carries the run ID under "id" and the
// outputs under "outputs", not under the "workflow_run_id" / "data" keys
// that WorkflowOutput is tagged with. When the tagged keys are absent, the
// ID and outputs are therefore filled in from the flat keys. A response that
// does carry "data" is returned exactly as decoded.
//
// It fails on transport errors, on a body that cannot be read, on any
// non-200 status, and on a body that does not decode into WorkflowOutput.
//
// NOTE(review): workflowRunID is inserted into the URL path unescaped; callers
// must pass an ID obtained from Dify, not arbitrary user input.
func (c *DifyClient) GetWorkflowResult(ctx context.Context, workflowRunID string) (*WorkflowOutput, error) {
	url := fmt.Sprintf("%s/workflows/run/%s", c.BaseURL, workflowRunID)
	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("dify request: %w", err)
	}
	defer resp.Body.Close()

	// A failed read would leave a truncated body; report it rather than
	// parsing a partial payload.
	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("read dify response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("dify returned HTTP %d: %s", resp.StatusCode, string(respBody))
	}

	var output WorkflowOutput
	if err := json.Unmarshal(respBody, &output); err != nil {
		return nil, fmt.Errorf("parse response: %w", err)
	}

	if output.Outputs == nil {
		var detail struct {
			ID      string          `json:"id"`
			Outputs json.RawMessage `json:"outputs"`
		}
		// Best effort: the body already decoded above, so a mismatch here only
		// means the flat fields are unavailable and the result stays as it was.
		if err := json.Unmarshal(respBody, &detail); err == nil {
			if output.WorkflowRunID == "" {
				output.WorkflowRunID = detail.ID
			}
			output.Outputs = decodeDifyOutputs(detail.Outputs)
		}
	}
	return &output, nil
}

// decodeDifyOutputs converts a raw "outputs" value into a map. It accepts a
// JSON object or a string containing an encoded JSON object, and returns nil
// for null, missing, or any other shape.
func decodeDifyOutputs(raw json.RawMessage) map[string]interface{} {
	var outputs map[string]interface{}
	if err := json.Unmarshal(raw, &outputs); err == nil {
		return outputs
	}
	var encoded string
	if err := json.Unmarshal(raw, &encoded); err != nil {
		return nil
	}
	if err := json.Unmarshal([]byte(encoded), &outputs); err != nil {
		return nil
	}
	return outputs
}