package service import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "time" "go.uber.org/zap" "github.com/topfans/backend/pkg/logger" ) // DifyClient Dify Workflow API HTTP 客户端 type DifyClient struct { BaseURL string APIKey string HTTPClient *http.Client } // NewDifyClient 创建 Dify 客户端 func NewDifyClient(baseURL, apiKey string) *DifyClient { return &DifyClient{ BaseURL: baseURL, APIKey: apiKey, HTTPClient: &http.Client{ Timeout: 120 * time.Second, // Dify 工作流最长等待 120s }, } } // WorkflowInput Dify 工作流输入 type WorkflowInput struct { SourceImageURL string `json:"source_image_url"` UseCutout bool `json:"use_cutout"` PresetCodes []string `json:"preset_codes"` RenderConfigs []map[string]interface{} `json:"render_configs"` } // WorkflowOutput Dify 工作流输出 type WorkflowOutput struct { WorkflowRunID string `json:"workflow_run_id"` TaskID string `json:"task_id"` Status string `json:"status"` // succeeded / failed / running Outputs map[string]interface{} `json:"data"` Error string `json:"error"` } // RunWorkflow 触发 Dify 工作流(blocking 模式) // Dify API: POST /v1/workflows/run func (c *DifyClient) RunWorkflow(ctx context.Context, inputs map[string]interface{}, userID string) (*WorkflowOutput, error) { url := fmt.Sprintf("%s/workflows/run", c.BaseURL) body := map[string]interface{}{ "inputs": inputs, "response_mode": "blocking", "user": fmt.Sprintf("user-%s", userID), } jsonBody, err := json.Marshal(body) if err != nil { return nil, fmt.Errorf("marshal request: %w", err) } req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonBody)) if err != nil { return nil, fmt.Errorf("create request: %w", err) } req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey)) req.Header.Set("Content-Type", "application/json") resp, err := c.HTTPClient.Do(req) if err != nil { logger.Logger.Error("Dify request failed", zap.Error(err)) return nil, fmt.Errorf("dify request: %w", err) } defer resp.Body.Close() respBody, _ := io.ReadAll(resp.Body) logger.Logger.Info("Dify raw response", zap.String("body", string(respBody))) if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("dify returned HTTP %d: %s", resp.StatusCode, string(respBody)) } var output WorkflowOutput if err := json.Unmarshal(respBody, &output); err != nil { // Dify blocking 模式直接返回 data 字段,尝试灵活解析 var raw map[string]interface{} if err2 := json.Unmarshal(respBody, &raw); err2 != nil { return nil, fmt.Errorf("parse dify response: %w", err) } output.Status = "succeeded" output.Outputs = raw } // Dify blocking 响应: data.status / data.outputs / data.error 是嵌套的 // 现有 struct: Status 映射顶层 status(通常为空),Outputs 映射 data(整个) // 这里从 data 中提取真正的 status 和 outputs if output.Outputs != nil { if s, ok := output.Outputs["status"].(string); ok && s != "" { output.Status = s } if errMsg, ok := output.Outputs["error"]; ok && errMsg != nil { if errStr, ok := errMsg.(string); ok && errStr != "" { output.Error = errStr } } // 提取 data.outputs 作为真正的 Outputs if innerOutputs, ok := output.Outputs["outputs"].(map[string]interface{}); ok { output.Outputs = innerOutputs } } logger.Logger.Info("Dify workflow completed", zap.String("status", output.Status)) return &output, nil } // RunWorkflowAsync 触发 Dify 工作流(streaming 模式,返回 workflow_run_id 用于轮询) func (c *DifyClient) RunWorkflowAsync(ctx context.Context, inputs map[string]interface{}, userID string) (string, error) { url := fmt.Sprintf("%s/workflows/run", c.BaseURL) body := map[string]interface{}{ "inputs": inputs, "response_mode": "streaming", "user": fmt.Sprintf("user-%s", userID), } jsonBody, err := json.Marshal(body) if err != nil { return "", fmt.Errorf("marshal request: %w", err) } req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonBody)) if err != nil { return "", fmt.Errorf("create request: %w", err) } req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey)) req.Header.Set("Content-Type", "application/json") resp, err := c.HTTPClient.Do(req) if err != nil { return "", fmt.Errorf("dify request: %w", err) } defer resp.Body.Close() respBody, _ := io.ReadAll(resp.Body) // 从 streaming 响应的第一条 event 中提取 workflow_run_id var event struct { Event string `json:"event"` WorkflowRunID string `json:"workflow_run_id"` TaskID string `json:"task_id"` } if err := json.Unmarshal(respBody, &event); err != nil { return "", fmt.Errorf("parse streaming response: %w (body: %s)", err, string(respBody)) } if event.WorkflowRunID == "" { return "", fmt.Errorf("no workflow_run_id in response: %s", string(respBody)) } return event.WorkflowRunID, nil } // GetWorkflowResult 查询工作流执行结果 // Dify API: GET /v1/workflows/run/{workflow_run_id} func (c *DifyClient) GetWorkflowResult(ctx context.Context, workflowRunID string) (*WorkflowOutput, error) { url := fmt.Sprintf("%s/workflows/run/%s", c.BaseURL, workflowRunID) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, fmt.Errorf("create request: %w", err) } req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey)) resp, err := c.HTTPClient.Do(req) if err != nil { return nil, fmt.Errorf("dify request: %w", err) } defer resp.Body.Close() respBody, _ := io.ReadAll(resp.Body) if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("dify returned HTTP %d: %s", resp.StatusCode, string(respBody)) } var output WorkflowOutput if err := json.Unmarshal(respBody, &output); err != nil { return nil, fmt.Errorf("parse response: %w", err) } return &output, nil }