From 75ab617c6c9173a1cbff42508cd91c46377438d5 Mon Sep 17 00:00:00 2001 From: zerosaturation Date: Tue, 23 Jun 2026 18:13:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E5=A2=9E=E5=8A=A0=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E7=95=99=E8=A8=80=E5=8A=9F=E8=83=BD=E5=92=8C=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E8=BD=AE=E8=AF=A2=E9=81=93=E5=85=B7=E8=B4=AD=E4=B9=B0=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E6=98=BE=E7=A4=BA=E6=94=B9=E4=B8=BAwebsocket=E6=98=BE?= =?UTF-8?q?=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/gateway/config/config.go | 6 +- .../gateway/controller/activity_controller.go | 182 ++ backend/gateway/main.go | 14 +- backend/gateway/pkg/response/response.go | 7 + backend/gateway/router/router.go | 9 +- backend/gateway/socket/activity_socket.go | 368 +++ .../2026_06_22_012_activity_messages.sql | 52 + backend/pkg/errors/errors.go | 26 + backend/pkg/models/activity_message.go | 23 + backend/pkg/proto/activity/activity.pb.go | 472 +++- backend/pkg/proto/activity/activity.triple.go | 56 +- backend/proto/activity.proto | 52 + .../services/activityService/config/config.go | 61 + backend/services/activityService/main.go | 1 + .../provider/activity_provider.go | 36 + .../activity_messages_repository.go | 111 + .../service/activity_service.go | 367 +++ .../2026-06-22-activity-realtime-websocket.md | 2125 +++++++++++++++++ .../components/ContributionList.vue | 138 +- .../components/MessageBoard.vue | 41 +- .../components/MessageInput.vue | 59 +- .../composables/useContributionPolling.js | 14 +- .../composables/useContributionRealtime.js | 89 + .../composables/useMessageRealtime.js | 113 + frontend/pages/support-activity/index.vue | 61 +- frontend/utils/api.js | 17 + frontend/utils/format.js | 40 + frontend/utils/socket/ActivitySocket.js | 214 ++ frontend/utils/socket/GlobalSocketManager.js | 12 + frontend/utils/socket/SocketManager.js | 54 + frontend/utils/socket/index.js | 1 + 31 files changed, 4657 insertions(+), 164 deletions(-) create mode 100644 backend/gateway/socket/activity_socket.go create mode 100644 backend/migrations/2026_06_22_012_activity_messages.sql create mode 100644 backend/pkg/models/activity_message.go create mode 100644 backend/services/activityService/config/config.go create mode 100644 backend/services/activityService/repository/activity_messages_repository.go create mode 100644 docs/superpowers/plans/2026-06-22-activity-realtime-websocket.md create mode 100644 frontend/pages/support-activity/composables/useContributionRealtime.js create mode 100644 frontend/pages/support-activity/composables/useMessageRealtime.js create mode 100644 frontend/utils/format.js create mode 100644 frontend/utils/socket/ActivitySocket.js diff --git a/backend/gateway/config/config.go b/backend/gateway/config/config.go index 995e9f3..a018407 100644 --- a/backend/gateway/config/config.go +++ b/backend/gateway/config/config.go @@ -122,7 +122,8 @@ func (c *OSSConfig) GetUploadDir(uploadType string) string { // WebSocketConfig WebSocket 配置 type WebSocketConfig struct { - AIChatPath string // WebSocket 路径,默认 /ai-chat + AIChatPath string // AI Chat WebSocket 路径,默认 /ai-chat + ActivityPath string // 活动实时推送 WS 路径,默认 /activity } // Load 加载配置 @@ -193,7 +194,8 @@ func Load() *Config { TimeZone: getEnv("DB_TIMEZONE", "Asia/Shanghai"), }, WebSocket: WebSocketConfig{ - AIChatPath: getEnv("WS_AI_CHAT_PATH", "/ai-chat"), + AIChatPath: getEnv("WS_AI_CHAT_PATH", "/ai-chat"), + ActivityPath: getEnv("WS_ACTIVITY_PATH", "/activity"), }, } } diff --git a/backend/gateway/controller/activity_controller.go b/backend/gateway/controller/activity_controller.go index 345dd6d..5cd51ce 100644 --- a/backend/gateway/controller/activity_controller.go +++ b/backend/gateway/controller/activity_controller.go @@ -730,6 +730,188 @@ func convertLatestContributionsResponse(resp *pbActivity.GetLatestContributionsR } } +// ListActivityMessages 列出活动留言 +// @Summary 列出活动留言 +// @Description 分页获取活动留言列表(最新在上) +// @Tags activities +// @Accept json +// @Produce json +// @Security BearerAuth +// @Param activity_id path int64 true "活动ID" +// @Param page query int false "页码,默认1" +// @Param page_size query int false "每页数量,默认20,最大50" +// @Success 200 {object} response.Response +// @Router /api/v1/activities/{activity_id}/messages [get] +func (ctrl *ActivityController) ListActivityMessages(c *gin.Context) { + // 解析路径参数 + activityIDStr := c.Param("id") + activityID, err := strconv.ParseInt(activityIDStr, 10, 64) + if err != nil { + response.Error(c, http.StatusBadRequest, "活动ID参数错误") + return + } + + // 解析查询参数 + page, _ := strconv.Atoi(c.DefaultQuery("page", "1")) + pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "20")) + if page <= 0 { + page = 1 + } + if pageSize <= 0 { + pageSize = 20 + } + if pageSize > 50 { + pageSize = 50 + } + + logger.Logger.Info("ListActivityMessages request", + zap.Int64("activity_id", activityID), + zap.Int("page", page), + zap.Int("page_size", pageSize), + ) + + // 设置上下文 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // 调用 RPC + resp, err := ctrl.activityService.ListActivityMessages(ctx, &pbActivity.ListActivityMessagesRequest{ + ActivityId: activityID, + Page: int32(page), + PageSize: int32(pageSize), + }) + + if err != nil { + logger.Logger.Error("ListActivityMessages RPC failed", zap.Error(err)) + response.Error(c, http.StatusInternalServerError, "获取活动留言失败") + return + } + + if resp.Base.Code != uint32(codes.OK) { + response.ErrorWithCode(c, int(resp.Base.Code), resp.Base.Message) + return + } + + // 转换响应 + messages := make([]map[string]interface{}, 0, len(resp.Messages)) + for _, m := range resp.Messages { + messages = append(messages, map[string]interface{}{ + "id": m.Id, + "activity_id": m.ActivityId, + "user_id": m.UserId, + "star_id": m.StarId, + "nickname": m.Nickname, + "avatar_url": m.AvatarUrl, + "content": m.Content, + "created_at": m.CreatedAt, + }) + } + + response.Success(c, map[string]interface{}{ + "messages": messages, + "page": resp.Page, + "page_size": resp.PageSize, + "total": resp.Total, + }) +} + +// CreateActivityMessage 发送一条活动留言 +// @Summary 发送活动留言 +// @Description 用户在应援活动页面发送一条祝福留言 +// @Tags activities +// @Accept json +// @Produce json +// @Security BearerAuth +// @Param activity_id path int64 true "活动ID" +// @Param request body object{content=string} true "留言内容" +// @Success 200 {object} response.Response +// @Router /api/v1/activities/{activity_id}/messages [post] +func (ctrl *ActivityController) CreateActivityMessage(c *gin.Context) { + // 从上下文获取用户信息 + userID, exists := c.Get("user_id") + if !exists { + response.Error(c, http.StatusUnauthorized, "未授权") + return + } + starID, exists := c.Get("star_id") + if !exists { + response.Error(c, http.StatusUnauthorized, "未授权") + return + } + + // 解析路径参数 + activityIDStr := c.Param("id") + activityID, err := strconv.ParseInt(activityIDStr, 10, 64) + if err != nil { + response.Error(c, http.StatusBadRequest, "活动ID参数错误") + return + } + + // 解析请求体 + var req struct { + Content string `json:"content" binding:"required"` + } + if err := c.ShouldBindJSON(&req); err != nil { + response.Error(c, http.StatusBadRequest, "请求参数错误") + return + } + + if req.Content == "" { + response.Error(c, http.StatusBadRequest, "content 是必填参数") + return + } + + logger.Logger.Info("CreateActivityMessage request", + zap.Int64("user_id", userID.(int64)), + zap.Int64("star_id", starID.(int64)), + zap.Int64("activity_id", activityID), + zap.Int("content_len", len(req.Content)), + ) + + // 设置上下文(带 attachments 传递 user_id / star_id 给下游) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ctx = context.WithValue(ctx, constant.AttachmentKey, map[string]interface{}{ + "user_id": strconv.FormatInt(userID.(int64), 10), + "star_id": strconv.FormatInt(starID.(int64), 10), + }) + + // 调用 RPC + resp, err := ctrl.activityService.CreateActivityMessage(ctx, &pbActivity.CreateActivityMessageRequest{ + ActivityId: activityID, + UserId: userID.(int64), + StarId: starID.(int64), + Content: req.Content, + }) + + if err != nil { + logger.Logger.Error("CreateActivityMessage RPC failed", zap.Error(err)) + response.Error(c, http.StatusInternalServerError, "发送活动留言失败") + return + } + + if resp.Base.Code != uint32(codes.OK) { + response.ErrorWithCode(c, int(resp.Base.Code), resp.Base.Message) + return + } + + // 返回新创建的留言 + m := resp.Message + response.Success(c, map[string]interface{}{ + "message": map[string]interface{}{ + "id": m.Id, + "activity_id": m.ActivityId, + "user_id": m.UserId, + "star_id": m.StarId, + "nickname": m.Nickname, + "avatar_url": m.AvatarUrl, + "content": m.Content, + "created_at": m.CreatedAt, + }, + }) +} + // convertMintingActivitiesResponse 转换铸造活动列表响应 func convertMintingActivitiesResponse(resp *pbActivity.GetMintingActivitiesResponse) map[string]interface{} { activities := make([]map[string]interface{}, 0, len(resp.Activities)) diff --git a/backend/gateway/main.go b/backend/gateway/main.go index c00cb15..034c4d0 100644 --- a/backend/gateway/main.go +++ b/backend/gateway/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "os" "os/signal" @@ -18,6 +19,7 @@ import ( "go.uber.org/zap" docs "github.com/topfans/backend/gateway/docs" + "github.com/topfans/backend/gateway/socket" pbModeration "github.com/topfans/backend/pkg/proto/moderation" ) @@ -220,9 +222,19 @@ func main() { logger.Logger.Fatal("Failed to create ModerationService pb client", zap.Error(err)) } + // 4.13 初始化 Activity Hub(WebSocket 实时推送) + redisClient := database.GetRedis() + activityHub := socket.NewActivityHub(redisClient, cfg.WebSocket.ActivityPath) + go activityHub.Run(context.Background()) + defer activityHub.Close() + logger.Logger.Info("ActivityHub initialized", + zap.String("path", cfg.WebSocket.ActivityPath), + zap.Bool("redis_available", redisClient != nil), + ) + // 5. 设置路由 logger.Logger.Info("Setting up routes...") - r, err := router.SetupRouter(userClient, socialClient, assetClient, galleryClient, activityClient, taskClient, starbookClient, aiChatClient, statisticClient, notificationClient, modSvc, cfg.WebSocket.AIChatPath) + r, err := router.SetupRouter(userClient, socialClient, assetClient, galleryClient, activityClient, taskClient, starbookClient, aiChatClient, statisticClient, notificationClient, modSvc, cfg.WebSocket.AIChatPath, activityHub) if err != nil { logger.Logger.Fatal("Failed to setup router", zap.Error(err)) } diff --git a/backend/gateway/pkg/response/response.go b/backend/gateway/pkg/response/response.go index 0f82f8b..8850f7e 100644 --- a/backend/gateway/pkg/response/response.go +++ b/backend/gateway/pkg/response/response.go @@ -146,6 +146,13 @@ func CleanErrorMessage(err error) string { "user info not found in Dubbo attachments": "请先登录", "missing or invalid authorization token": "Token缺失或无效", "已达到最大好友数量限制": "已达到最大好友数量限制", + "活动留言不存在": "活动留言不存在", + "留言太频繁": "留言太频繁,请稍后再试", + "当前活动留言已达上限": "当前活动留言已达上限", + "留言内容不能为空": "留言内容不能为空", + "留言内容过长": "留言内容过长,最多500字", + "留言内容包含不当内容": "留言内容包含不当内容,请修改", + "活动不在进行中": "活动未开始或已结束", } msgLower := strings.ToLower(msg) diff --git a/backend/gateway/router/router.go b/backend/gateway/router/router.go index 37a6ce0..82457af 100644 --- a/backend/gateway/router/router.go +++ b/backend/gateway/router/router.go @@ -16,7 +16,7 @@ import ( ) // SetupRouter 设置路由 -func SetupRouter(userClient *client.Client, socialClient *client.Client, assetClient *client.Client, galleryClient *client.Client, activityClient *client.Client, taskClient *client.Client, starbookClient *client.Client, aiChatClient *client.Client, statisticClient *client.Client, notificationClient *client.Client, modSvc pbModeration.ModerationService, aiChatPath string) (*gin.Engine, error) { +func SetupRouter(userClient *client.Client, socialClient *client.Client, assetClient *client.Client, galleryClient *client.Client, activityClient *client.Client, taskClient *client.Client, starbookClient *client.Client, aiChatClient *client.Client, statisticClient *client.Client, notificationClient *client.Client, modSvc pbModeration.ModerationService, aiChatPath string, activityHub *socket.ActivityHub) (*gin.Engine, error) { r := gin.Default() // 全局中间件 @@ -46,6 +46,11 @@ func SetupRouter(userClient *client.Client, socialClient *client.Client, assetCl aiChatHub.HandleWebSocket(w, r) })) + // Activity 实时推送 WebSocket 路由 + r.GET(activityHub.ActivityPath(), gin.WrapF(func(w http.ResponseWriter, r *http.Request) { + activityHub.HandleWebSocket(w, r) + })) + // 创建控制器 authCtrl, err := controller.NewAuthController(userClient) if err != nil { @@ -381,6 +386,8 @@ func SetupRouter(userClient *client.Client, socialClient *client.Client, assetCl activities.POST("/:id/batch-purchase", activityCtrl.BatchPurchaseItem) // 批量购买道具 activities.GET("/:id/ranking", activityCtrl.GetContributionRanking) // 获取贡献点排名 activities.GET("/:id/contributions/latest", activityCtrl.GetLatestContributions) // 获取最新贡献记录 + activities.GET("/:id/messages", activityCtrl.ListActivityMessages) // 获取活动留言列表 + activities.POST("/:id/messages", activityCtrl.CreateActivityMessage) // 发送一条活动留言 } // 铸造活动相关路由(运营banner)- 公开接口,不需要认证 diff --git a/backend/gateway/socket/activity_socket.go b/backend/gateway/socket/activity_socket.go new file mode 100644 index 0000000..3d6e6e7 --- /dev/null +++ b/backend/gateway/socket/activity_socket.go @@ -0,0 +1,368 @@ +package socket + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/redis/go-redis/v9" + "github.com/topfans/backend/pkg/jwt" + "github.com/topfans/backend/pkg/logger" + "go.uber.org/zap" +) + +// ActivityHub 管理所有 Activity WebSocket 连接 +type ActivityHub struct { + clients map[int64]map[*ActivityConn]struct{} // userId -> set of conns + subscriptions map[string]map[*ActivityConn]struct{} // "act:42:messages" / "act:42:contributions" -> conns + redisClient *redis.Client + activityPath string + mu sync.RWMutex +} + +// ActivityConn 单条 WebSocket 连接 +type ActivityConn struct { + UserID int64 + StarID int64 + Conn *websocket.Conn + Send chan []byte + Hub *ActivityHub + writeMu sync.Mutex +} + +// writeJSON 线程安全 JSON 写入 +func (c *ActivityConn) writeJSON(data interface{}) error { + c.writeMu.Lock() + defer c.writeMu.Unlock() + return c.Conn.WriteJSON(data) +} + +// NewActivityHub 创建 ActivityHub +func NewActivityHub(redisClient *redis.Client, activityPath string) *ActivityHub { + return &ActivityHub{ + clients: make(map[int64]map[*ActivityConn]struct{}), + subscriptions: make(map[string]map[*ActivityConn]struct{}), + redisClient: redisClient, + activityPath: activityPath, + } +} + +// ActivityPath 返回配置的 WebSocket 路径 +func (h *ActivityHub) ActivityPath() string { + return h.activityPath +} + +// Run 启动 Redis PSubscribe,收到 publish 后 fanout 到本地连接 +func (h *ActivityHub) Run(ctx context.Context) { + if h.redisClient == nil { + logger.Logger.Warn("ActivityHub: redisClient is nil, Pub/Sub fanout disabled") + <-ctx.Done() + return + } + sub := h.redisClient.PSubscribe(ctx, "act:*:messages", "act:*:contributions") + defer sub.Close() + ch := sub.Channel() + logger.Logger.Info("ActivityHub subscribed to Redis Pub/Sub channels") + for { + select { + case <-ctx.Done(): + logger.Logger.Info("ActivityHub Run loop exiting due to context done") + return + case msg, ok := <-ch: + if !ok { + logger.Logger.Warn("ActivityHub Redis Pub/Sub channel closed") + return + } + var payload map[string]interface{} + if err := json.Unmarshal([]byte(msg.Payload), &payload); err != nil { + logger.Logger.Error("ActivityHub failed to unmarshal pubsub payload", zap.Error(err)) + continue + } + h.fanout(msg.Channel, payload) + } + } +} + +// fanout 把 payload 推送到订阅该 channel 的所有本地连接 +func (h *ActivityHub) fanout(channel string, payload map[string]interface{}) { + h.mu.RLock() + conns := h.subscriptions[channel] + targets := make([]*ActivityConn, 0, len(conns)) + for c := range conns { + targets = append(targets, c) + } + h.mu.RUnlock() + + for _, c := range targets { + if err := c.writeJSON(payload); err != nil { + logger.Logger.Error("ActivityHub writeJSON failed", zap.Int64("user_id", c.UserID), zap.Error(err)) + } + } +} + +// HandleWebSocket 处理 /activity 握手 +func (h *ActivityHub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { + token := r.URL.Query().Get("token") + + userID, starID, err := h.validateToken(token) + if err != nil { + logger.Logger.Error("Activity WebSocket token validation failed", zap.Error(err)) + w.WriteHeader(http.StatusUnauthorized) + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "type": "auth_response", + "success": false, + "error": "invalid_token", + }) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + logger.Logger.Error("Activity WebSocket upgrade failed", zap.Error(err)) + return + } + + c := &ActivityConn{ + UserID: userID, + StarID: starID, + Conn: conn, + Send: make(chan []byte, 256), + Hub: h, + } + + h.mu.Lock() + if h.clients[userID] == nil { + h.clients[userID] = make(map[*ActivityConn]struct{}) + } + h.clients[userID][c] = struct{}{} + h.mu.Unlock() + + logger.Logger.Info("Activity WebSocket connection established", + zap.Int64("user_id", userID), + zap.Int64("star_id", starID), + ) + + // 立即推送 auth_response + _ = conn.WriteJSON(map[string]interface{}{ + "type": "auth_response", + "success": true, + "user_id": userID, + "star_id": starID, + }) + + go c.readPump() + go c.writePump() +} + +// validateToken 验证 token(JWT) +func (h *ActivityHub) validateToken(token string) (int64, int64, error) { + if strings.HasPrefix(token, "Bearer_") { + token = strings.TrimPrefix(token, "Bearer_") + } + claims, err := jwt.ParseToken(token) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse token: %w", err) + } + if claims.UserID == 0 { + return 0, 0, fmt.Errorf("invalid user id") + } + return claims.UserID, claims.StarID, nil +} + +// readPump 读取客户端消息 +func (c *ActivityConn) readPump() { + defer func() { + c.Hub.unregister(c) + c.Conn.Close() + }() + + c.Conn.SetReadLimit(64 * 1024) // 64KB + c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + c.Conn.SetPongHandler(func(string) error { + c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + + for { + _, message, err := c.Conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + logger.Logger.Error("Activity WebSocket read error", zap.Error(err)) + } + break + } + + var msg map[string]interface{} + if err := json.Unmarshal(message, &msg); err != nil { + logger.Logger.Error("Failed to parse activity message", zap.Error(err)) + continue + } + c.handleMessage(msg) + } +} + +// writePump 写消息到客户端 +func (c *ActivityConn) writePump() { + ticker := time.NewTicker(30 * time.Second) + defer func() { + ticker.Stop() + c.Conn.Close() + }() + + for { + select { + case message, ok := <-c.Send: + c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if !ok { + _ = c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + w, err := c.Conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + _, _ = w.Write(message) + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// handleMessage 处理客户端 subscribe/unsubscribe/ping +func (c *ActivityConn) handleMessage(msg map[string]interface{}) { + action, _ := msg["action"].(string) + switch action { + case "ping": + c.Send <- []byte(`{"type":"pong"}`) + + case "subscribe": + activityID := toInt64(msg["activity_id"]) + topics := toStringSlice(msg["topics"]) + c.Hub.subscribe(c, activityID, topics) + _ = c.writeJSON(map[string]interface{}{ + "type": "subscribe_response", + "activity_id": activityID, + "topics": topics, + }) + + case "unsubscribe": + activityID := toInt64(msg["activity_id"]) + topics := toStringSlice(msg["topics"]) + c.Hub.unsubscribe(c, activityID, topics) + _ = c.writeJSON(map[string]interface{}{ + "type": "unsubscribe_response", + "activity_id": activityID, + "topics": topics, + }) + + default: + logger.Logger.Warn("Unknown activity action", zap.String("action", action)) + } +} + +// subscribe 幂等订阅 +func (h *ActivityHub) subscribe(c *ActivityConn, activityID int64, topics []string) { + if activityID <= 0 || len(topics) == 0 { + return + } + h.mu.Lock() + defer h.mu.Unlock() + for _, t := range topics { + ch := fmt.Sprintf("act:%d:%s", activityID, t) + if h.subscriptions[ch] == nil { + h.subscriptions[ch] = make(map[*ActivityConn]struct{}) + } + h.subscriptions[ch][c] = struct{}{} + } +} + +// unsubscribe 幂等取消订阅 +func (h *ActivityHub) unsubscribe(c *ActivityConn, activityID int64, topics []string) { + if activityID <= 0 || len(topics) == 0 { + return + } + h.mu.Lock() + defer h.mu.Unlock() + for _, t := range topics { + ch := fmt.Sprintf("act:%d:%s", activityID, t) + if conns, ok := h.subscriptions[ch]; ok { + delete(conns, c) + if len(conns) == 0 { + delete(h.subscriptions, ch) + } + } + } +} + +// unregister 断开时清理 +func (h *ActivityHub) unregister(c *ActivityConn) { + h.mu.Lock() + defer h.mu.Unlock() + if conns, ok := h.clients[c.UserID]; ok { + delete(conns, c) + if len(conns) == 0 { + delete(h.clients, c.UserID) + } + } + for ch, conns := range h.subscriptions { + if _, ok := conns[c]; ok { + delete(conns, c) + if len(conns) == 0 { + delete(h.subscriptions, ch) + } + } + } +} + +// Close 关闭所有连接 +func (h *ActivityHub) Close() { + h.mu.Lock() + defer h.mu.Unlock() + for _, conns := range h.clients { + for c := range conns { + _ = c.Conn.Close() + } + } +} + +// helper +func toInt64(v interface{}) int64 { + switch x := v.(type) { + case float64: + return int64(x) + case int64: + return x + case int: + return int64(x) + case string: + i, _ := strconv.ParseInt(x, 10, 64) + return i + } + return 0 +} + +func toStringSlice(v interface{}) []string { + arr, ok := v.([]interface{}) + if !ok { + return nil + } + out := make([]string, 0, len(arr)) + for _, item := range arr { + if s, ok := item.(string); ok { + out = append(out, s) + } + } + return out +} diff --git a/backend/migrations/2026_06_22_012_activity_messages.sql b/backend/migrations/2026_06_22_012_activity_messages.sql new file mode 100644 index 0000000..12010ef --- /dev/null +++ b/backend/migrations/2026_06_22_012_activity_messages.sql @@ -0,0 +1,52 @@ +-- 2026_06_22_012_activity_messages.sql +BEGIN; + +CREATE TABLE IF NOT EXISTS public.activity_messages ( + id BIGSERIAL PRIMARY KEY, + activity_id BIGINT NOT NULL, + user_id BIGINT NOT NULL, + star_id BIGINT NOT NULL, + nickname VARCHAR(50), -- 缓存昵称(写入时回查,避免读时 RPC 反查) + avatar_url VARCHAR(500), -- 缓存头像 URL + content VARCHAR(500) NOT NULL, + status SMALLINT NOT NULL DEFAULT 0, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL, + deleted_at TIMESTAMP WITH TIME ZONE, -- gorm.DeletedAt 软删除时间戳 + CONSTRAINT fk_messages_activity + FOREIGN KEY (activity_id) REFERENCES public.activities(id) ON DELETE CASCADE, + CONSTRAINT fk_messages_user + FOREIGN KEY (user_id) REFERENCES public.users(id), + CONSTRAINT fk_messages_star + FOREIGN KEY (star_id) REFERENCES public.stars(star_id), + CONSTRAINT chk_activity_messages_status CHECK (status BETWEEN 0 AND 2) +); + +ALTER SEQUENCE activity_messages_id_seq RESTART WITH 10000; + +CREATE INDEX IF NOT EXISTS idx_activity_messages_activity_created + ON public.activity_messages (activity_id, created_at DESC, id DESC) + WHERE deleted_at IS NULL; + +CREATE INDEX IF NOT EXISTS idx_activity_messages_user_created + ON public.activity_messages (user_id, created_at DESC) + WHERE deleted_at IS NULL; + +CREATE INDEX IF NOT EXISTS idx_activity_messages_activity_incr + ON public.activity_messages (activity_id, id DESC) + WHERE deleted_at IS NULL; + +COMMENT ON TABLE public.activity_messages IS '活动留言表'; +COMMENT ON COLUMN public.activity_messages.id IS '主键,自增'; +COMMENT ON COLUMN public.activity_messages.activity_id IS 'FK -> activities.id'; +COMMENT ON COLUMN public.activity_messages.user_id IS '留言用户 ID'; +COMMENT ON COLUMN public.activity_messages.star_id IS '所属明星/星球 ID'; +COMMENT ON COLUMN public.activity_messages.nickname IS '缓存昵称(写入时回查,避免读时 RPC 反查)'; +COMMENT ON COLUMN public.activity_messages.avatar_url IS '缓存头像 URL'; +COMMENT ON COLUMN public.activity_messages.content IS '留言正文,1-500 字'; +COMMENT ON COLUMN public.activity_messages.status IS '0=正常|1=隐藏|2=已删除'; +COMMENT ON COLUMN public.activity_messages.created_at IS '留言时间,毫秒时间戳'; +COMMENT ON COLUMN public.activity_messages.updated_at IS '更新时间,毫秒时间戳'; +COMMENT ON COLUMN public.activity_messages.deleted_at IS '软删除时间戳(gorm.DeletedAt,NULL=未删除)'; + +COMMIT; diff --git a/backend/pkg/errors/errors.go b/backend/pkg/errors/errors.go index 87fe069..eb26c84 100644 --- a/backend/pkg/errors/errors.go +++ b/backend/pkg/errors/errors.go @@ -68,6 +68,15 @@ var ( ErrActivityNotFound = errors.New("活动不存在") ErrActivityItemNotFound = errors.New("活动道具不存在") + // 活动留言相关错误 + ErrActivityMessageNotFound = errors.New("活动留言不存在") + ErrActivityMessageTooFrequent = errors.New("留言太频繁,请稍后再试") + ErrActivityMessageLimitReached = errors.New("当前活动留言已达上限") + ErrActivityMessageContentEmpty = errors.New("留言内容不能为空") + ErrActivityMessageContentTooLong = errors.New("留言内容过长,最多500字") + ErrActivityMessageContentInvalid = errors.New("留言内容包含不当内容") + ErrActivityMessageActivityInactive = errors.New("活动不在进行中") + // 星册服务相关错误 ErrCollectionAssetNotFound = errors.New("典藏藏品不存在") ErrActivityAssetNotFound = errors.New("活动藏品不存在") @@ -122,6 +131,23 @@ func ToGRPCCode(err error) codes.Code { return codes.NotFound case errors.Is(err, ErrInvalidAssetType): return codes.InvalidArgument + case errors.Is(err, ErrActivityMessageNotFound): + return codes.NotFound + case errors.Is(err, ErrActivityMessageTooFrequent): + return codes.ResourceExhausted + case errors.Is(err, ErrActivityMessageLimitReached): + return codes.ResourceExhausted + case errors.Is(err, ErrActivityMessageContentEmpty): + return codes.InvalidArgument + case errors.Is(err, ErrActivityMessageContentTooLong): + return codes.InvalidArgument + case errors.Is(err, ErrActivityMessageContentInvalid): + // 内容含敏感词 → 参数级校验错(3);不是权限不足,不能映射到 PermissionDenied(7), + // 否则前端 api.js 会把它当成"账号被封"清缓存跳登录页。 + return codes.InvalidArgument + case errors.Is(err, ErrActivityMessageActivityInactive): + // 活动未在进行中 → 系统前置条件不满足(9);同样避免 PermissionDenied(7) 的副作用。 + return codes.FailedPrecondition default: return codes.Internal } diff --git a/backend/pkg/models/activity_message.go b/backend/pkg/models/activity_message.go new file mode 100644 index 0000000..8e8c3ff --- /dev/null +++ b/backend/pkg/models/activity_message.go @@ -0,0 +1,23 @@ +package models + +import "gorm.io/gorm" + +// ActivityMessage 活动留言 +type ActivityMessage struct { + ID int64 `json:"id" gorm:"primaryKey;autoIncrement"` + ActivityID int64 `json:"activity_id" gorm:"not null;index"` + UserID int64 `json:"user_id" gorm:"not null;index"` + StarID int64 `json:"star_id" gorm:"not null"` + Nickname string `json:"nickname" gorm:"size:50"` + AvatarURL string `json:"avatar_url" gorm:"size:500"` + Content string `json:"content" gorm:"type:varchar(500);not null"` + Status int16 `json:"status" gorm:"default:0"` + CreatedAt int64 `json:"created_at" gorm:"not null"` + UpdatedAt int64 `json:"updated_at" gorm:"not null"` + DeletedAt gorm.DeletedAt `json:"deleted_at,omitzero" gorm:"column:deleted_at;index"` +} + +// TableName 表名 +func (ActivityMessage) TableName() string { + return "activity_messages" +} \ No newline at end of file diff --git a/backend/pkg/proto/activity/activity.pb.go b/backend/pkg/proto/activity/activity.pb.go index 69bd7ad..63d92a9 100644 --- a/backend/pkg/proto/activity/activity.pb.go +++ b/backend/pkg/proto/activity/activity.pb.go @@ -1898,6 +1898,362 @@ func (x *GetLatestContributionsResponse) GetRecords() []*ContributionRecord { return nil } +type ActivityMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + ActivityId int64 `protobuf:"varint,2,opt,name=activity_id,json=activityId,proto3" json:"activity_id,omitempty"` + UserId int64 `protobuf:"varint,3,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + StarId int64 `protobuf:"varint,4,opt,name=star_id,json=starId,proto3" json:"star_id,omitempty"` + Nickname string `protobuf:"bytes,5,opt,name=nickname,proto3" json:"nickname,omitempty"` + AvatarUrl string `protobuf:"bytes,6,opt,name=avatar_url,json=avatarUrl,proto3" json:"avatar_url,omitempty"` + Content string `protobuf:"bytes,7,opt,name=content,proto3" json:"content,omitempty"` + CreatedAt int64 `protobuf:"varint,8,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ActivityMessage) Reset() { + *x = ActivityMessage{} + mi := &file_activity_proto_msgTypes[23] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ActivityMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ActivityMessage) ProtoMessage() {} + +func (x *ActivityMessage) ProtoReflect() protoreflect.Message { + mi := &file_activity_proto_msgTypes[23] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ActivityMessage.ProtoReflect.Descriptor instead. +func (*ActivityMessage) Descriptor() ([]byte, []int) { + return file_activity_proto_rawDescGZIP(), []int{23} +} + +func (x *ActivityMessage) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *ActivityMessage) GetActivityId() int64 { + if x != nil { + return x.ActivityId + } + return 0 +} + +func (x *ActivityMessage) GetUserId() int64 { + if x != nil { + return x.UserId + } + return 0 +} + +func (x *ActivityMessage) GetStarId() int64 { + if x != nil { + return x.StarId + } + return 0 +} + +func (x *ActivityMessage) GetNickname() string { + if x != nil { + return x.Nickname + } + return "" +} + +func (x *ActivityMessage) GetAvatarUrl() string { + if x != nil { + return x.AvatarUrl + } + return "" +} + +func (x *ActivityMessage) GetContent() string { + if x != nil { + return x.Content + } + return "" +} + +func (x *ActivityMessage) GetCreatedAt() int64 { + if x != nil { + return x.CreatedAt + } + return 0 +} + +type ListActivityMessagesRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + ActivityId int64 `protobuf:"varint,1,opt,name=activity_id,json=activityId,proto3" json:"activity_id,omitempty"` + Page int32 `protobuf:"varint,2,opt,name=page,proto3" json:"page,omitempty"` + PageSize int32 `protobuf:"varint,3,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListActivityMessagesRequest) Reset() { + *x = ListActivityMessagesRequest{} + mi := &file_activity_proto_msgTypes[24] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListActivityMessagesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListActivityMessagesRequest) ProtoMessage() {} + +func (x *ListActivityMessagesRequest) ProtoReflect() protoreflect.Message { + mi := &file_activity_proto_msgTypes[24] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListActivityMessagesRequest.ProtoReflect.Descriptor instead. +func (*ListActivityMessagesRequest) Descriptor() ([]byte, []int) { + return file_activity_proto_rawDescGZIP(), []int{24} +} + +func (x *ListActivityMessagesRequest) GetActivityId() int64 { + if x != nil { + return x.ActivityId + } + return 0 +} + +func (x *ListActivityMessagesRequest) GetPage() int32 { + if x != nil { + return x.Page + } + return 0 +} + +func (x *ListActivityMessagesRequest) GetPageSize() int32 { + if x != nil { + return x.PageSize + } + return 0 +} + +type ListActivityMessagesResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Base *common.BaseResponse `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + Messages []*ActivityMessage `protobuf:"bytes,2,rep,name=messages,proto3" json:"messages,omitempty"` + Page int32 `protobuf:"varint,3,opt,name=page,proto3" json:"page,omitempty"` + PageSize int32 `protobuf:"varint,4,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"` + Total int32 `protobuf:"varint,5,opt,name=total,proto3" json:"total,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ListActivityMessagesResponse) Reset() { + *x = ListActivityMessagesResponse{} + mi := &file_activity_proto_msgTypes[25] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ListActivityMessagesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListActivityMessagesResponse) ProtoMessage() {} + +func (x *ListActivityMessagesResponse) ProtoReflect() protoreflect.Message { + mi := &file_activity_proto_msgTypes[25] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListActivityMessagesResponse.ProtoReflect.Descriptor instead. +func (*ListActivityMessagesResponse) Descriptor() ([]byte, []int) { + return file_activity_proto_rawDescGZIP(), []int{25} +} + +func (x *ListActivityMessagesResponse) GetBase() *common.BaseResponse { + if x != nil { + return x.Base + } + return nil +} + +func (x *ListActivityMessagesResponse) GetMessages() []*ActivityMessage { + if x != nil { + return x.Messages + } + return nil +} + +func (x *ListActivityMessagesResponse) GetPage() int32 { + if x != nil { + return x.Page + } + return 0 +} + +func (x *ListActivityMessagesResponse) GetPageSize() int32 { + if x != nil { + return x.PageSize + } + return 0 +} + +func (x *ListActivityMessagesResponse) GetTotal() int32 { + if x != nil { + return x.Total + } + return 0 +} + +type CreateActivityMessageRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + ActivityId int64 `protobuf:"varint,1,opt,name=activity_id,json=activityId,proto3" json:"activity_id,omitempty"` + UserId int64 `protobuf:"varint,2,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + StarId int64 `protobuf:"varint,3,opt,name=star_id,json=starId,proto3" json:"star_id,omitempty"` + Content string `protobuf:"bytes,4,opt,name=content,proto3" json:"content,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CreateActivityMessageRequest) Reset() { + *x = CreateActivityMessageRequest{} + mi := &file_activity_proto_msgTypes[26] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CreateActivityMessageRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateActivityMessageRequest) ProtoMessage() {} + +func (x *CreateActivityMessageRequest) ProtoReflect() protoreflect.Message { + mi := &file_activity_proto_msgTypes[26] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateActivityMessageRequest.ProtoReflect.Descriptor instead. +func (*CreateActivityMessageRequest) Descriptor() ([]byte, []int) { + return file_activity_proto_rawDescGZIP(), []int{26} +} + +func (x *CreateActivityMessageRequest) GetActivityId() int64 { + if x != nil { + return x.ActivityId + } + return 0 +} + +func (x *CreateActivityMessageRequest) GetUserId() int64 { + if x != nil { + return x.UserId + } + return 0 +} + +func (x *CreateActivityMessageRequest) GetStarId() int64 { + if x != nil { + return x.StarId + } + return 0 +} + +func (x *CreateActivityMessageRequest) GetContent() string { + if x != nil { + return x.Content + } + return "" +} + +type CreateActivityMessageResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Base *common.BaseResponse `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + Message *ActivityMessage `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CreateActivityMessageResponse) Reset() { + *x = CreateActivityMessageResponse{} + mi := &file_activity_proto_msgTypes[27] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CreateActivityMessageResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateActivityMessageResponse) ProtoMessage() {} + +func (x *CreateActivityMessageResponse) ProtoReflect() protoreflect.Message { + mi := &file_activity_proto_msgTypes[27] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateActivityMessageResponse.ProtoReflect.Descriptor instead. +func (*CreateActivityMessageResponse) Descriptor() ([]byte, []int) { + return file_activity_proto_rawDescGZIP(), []int{27} +} + +func (x *CreateActivityMessageResponse) GetBase() *common.BaseResponse { + if x != nil { + return x.Base + } + return nil +} + +func (x *CreateActivityMessageResponse) GetMessage() *ActivityMessage { + if x != nil { + return x.Message + } + return nil +} + var File_activity_proto protoreflect.FileDescriptor const file_activity_proto_rawDesc = "" + @@ -2077,8 +2433,39 @@ const file_activity_proto_rawDesc = "" + "created_at\x18\f \x01(\x03R\tcreatedAt\"\x92\x01\n" + "\x1eGetLatestContributionsResponse\x120\n" + "\x04base\x18\x01 \x01(\v2\x1c.topfans.common.BaseResponseR\x04base\x12>\n" + - "\arecords\x18\x02 \x03(\v2$.topfans.activity.ContributionRecordR\arecords2\xf9\n" + + "\arecords\x18\x02 \x03(\v2$.topfans.activity.ContributionRecordR\arecords\"\xe8\x01\n" + + "\x0fActivityMessage\x12\x0e\n" + + "\x02id\x18\x01 \x01(\x03R\x02id\x12\x1f\n" + + "\vactivity_id\x18\x02 \x01(\x03R\n" + + "activityId\x12\x17\n" + + "\auser_id\x18\x03 \x01(\x03R\x06userId\x12\x17\n" + + "\astar_id\x18\x04 \x01(\x03R\x06starId\x12\x1a\n" + + "\bnickname\x18\x05 \x01(\tR\bnickname\x12\x1d\n" + "\n" + + "avatar_url\x18\x06 \x01(\tR\tavatarUrl\x12\x18\n" + + "\acontent\x18\a \x01(\tR\acontent\x12\x1d\n" + + "\n" + + "created_at\x18\b \x01(\x03R\tcreatedAt\"o\n" + + "\x1bListActivityMessagesRequest\x12\x1f\n" + + "\vactivity_id\x18\x01 \x01(\x03R\n" + + "activityId\x12\x12\n" + + "\x04page\x18\x02 \x01(\x05R\x04page\x12\x1b\n" + + "\tpage_size\x18\x03 \x01(\x05R\bpageSize\"\xd6\x01\n" + + "\x1cListActivityMessagesResponse\x120\n" + + "\x04base\x18\x01 \x01(\v2\x1c.topfans.common.BaseResponseR\x04base\x12=\n" + + "\bmessages\x18\x02 \x03(\v2!.topfans.activity.ActivityMessageR\bmessages\x12\x12\n" + + "\x04page\x18\x03 \x01(\x05R\x04page\x12\x1b\n" + + "\tpage_size\x18\x04 \x01(\x05R\bpageSize\x12\x14\n" + + "\x05total\x18\x05 \x01(\x05R\x05total\"\x8b\x01\n" + + "\x1cCreateActivityMessageRequest\x12\x1f\n" + + "\vactivity_id\x18\x01 \x01(\x03R\n" + + "activityId\x12\x17\n" + + "\auser_id\x18\x02 \x01(\x03R\x06userId\x12\x17\n" + + "\astar_id\x18\x03 \x01(\x03R\x06starId\x12\x18\n" + + "\acontent\x18\x04 \x01(\tR\acontent\"\x8e\x01\n" + + "\x1dCreateActivityMessageResponse\x120\n" + + "\x04base\x18\x01 \x01(\v2\x1c.topfans.common.BaseResponseR\x04base\x12;\n" + + "\amessage\x18\x02 \x01(\v2!.topfans.activity.ActivityMessageR\amessage2\xd5\r\n" + "\x0fActivityService\x12\x82\x01\n" + "\x0fGetActivityList\x12(.topfans.activity.GetActivityListRequest\x1a).topfans.activity.GetActivityListResponse\"\x1a\x82\xd3\xe4\x93\x02\x14\x12\x12/api/v1/activities\x12y\n" + "\vGetActivity\x12$.topfans.activity.GetProgressRequest\x1a\x1a.topfans.activity.Activity\"(\x82\xd3\xe4\x93\x02\"\x12 /api/v1/activities/{activity_id}\x12\x91\x01\n" + @@ -2088,7 +2475,9 @@ const file_activity_proto_rawDesc = "" + "\x11BatchPurchaseItem\x12*.topfans.activity.BatchPurchaseItemRequest\x1a+.topfans.activity.BatchPurchaseItemResponse\":\x82\xd3\xe4\x93\x024:\x01*\"//api/v1/activities/{activity_id}/batch-purchase\x12\xa7\x01\n" + "\x16GetContributionRanking\x12,.topfans.activity.ContributionRankingRequest\x1a-.topfans.activity.ContributionRankingResponse\"0\x82\xd3\xe4\x93\x02*\x12(/api/v1/activities/{activity_id}/ranking\x12\x99\x01\n" + "\x14GetMintingActivities\x12-.topfans.activity.GetMintingActivitiesRequest\x1a..topfans.activity.GetMintingActivitiesResponse\"\"\x82\xd3\xe4\x93\x02\x1c\x12\x1a/api/v1/minting-activities\x12\xba\x01\n" + - "\x16GetLatestContributions\x12/.topfans.activity.GetLatestContributionsRequest\x1a0.topfans.activity.GetLatestContributionsResponse\"=\x82\xd3\xe4\x93\x027\x125/api/v1/activities/{activity_id}/contributions/latestB8Z6github.com/topfans/backend/pkg/proto/activity;activityb\x06proto3" + "\x16GetLatestContributions\x12/.topfans.activity.GetLatestContributionsRequest\x1a0.topfans.activity.GetLatestContributionsResponse\"=\x82\xd3\xe4\x93\x027\x125/api/v1/activities/{activity_id}/contributions/latest\x12\xa8\x01\n" + + "\x14ListActivityMessages\x12-.topfans.activity.ListActivityMessagesRequest\x1a..topfans.activity.ListActivityMessagesResponse\"1\x82\xd3\xe4\x93\x02+\x12)/api/v1/activities/{activity_id}/messages\x12\xae\x01\n" + + "\x15CreateActivityMessage\x12..topfans.activity.CreateActivityMessageRequest\x1a/.topfans.activity.CreateActivityMessageResponse\"4\x82\xd3\xe4\x93\x02.:\x01*\")/api/v1/activities/{activity_id}/messagesB8Z6github.com/topfans/backend/pkg/proto/activity;activityb\x06proto3" var ( file_activity_proto_rawDescOnce sync.Once @@ -2102,7 +2491,7 @@ func file_activity_proto_rawDescGZIP() []byte { return file_activity_proto_rawDescData } -var file_activity_proto_msgTypes = make([]protoimpl.MessageInfo, 23) +var file_activity_proto_msgTypes = make([]protoimpl.MessageInfo, 28) var file_activity_proto_goTypes = []any{ (*Activity)(nil), // 0: topfans.activity.Activity (*ActivityItem)(nil), // 1: topfans.activity.ActivityItem @@ -2127,48 +2516,61 @@ var file_activity_proto_goTypes = []any{ (*GetLatestContributionsRequest)(nil), // 20: topfans.activity.GetLatestContributionsRequest (*ContributionRecord)(nil), // 21: topfans.activity.ContributionRecord (*GetLatestContributionsResponse)(nil), // 22: topfans.activity.GetLatestContributionsResponse - (*common.BaseResponse)(nil), // 23: topfans.common.BaseResponse + (*ActivityMessage)(nil), // 23: topfans.activity.ActivityMessage + (*ListActivityMessagesRequest)(nil), // 24: topfans.activity.ListActivityMessagesRequest + (*ListActivityMessagesResponse)(nil), // 25: topfans.activity.ListActivityMessagesResponse + (*CreateActivityMessageRequest)(nil), // 26: topfans.activity.CreateActivityMessageRequest + (*CreateActivityMessageResponse)(nil), // 27: topfans.activity.CreateActivityMessageResponse + (*common.BaseResponse)(nil), // 28: topfans.common.BaseResponse } var file_activity_proto_depIdxs = []int32{ 1, // 0: topfans.activity.Activity.items:type_name -> topfans.activity.ActivityItem 1, // 1: topfans.activity.ActivityItemsResponse.items:type_name -> topfans.activity.ActivityItem - 23, // 2: topfans.activity.PurchaseItemResponse.base:type_name -> topfans.common.BaseResponse + 28, // 2: topfans.activity.PurchaseItemResponse.base:type_name -> topfans.common.BaseResponse 5, // 3: topfans.activity.BatchPurchaseItemRequest.items:type_name -> topfans.activity.PurchaseItem - 23, // 4: topfans.activity.BatchPurchaseItemResponse.base:type_name -> topfans.common.BaseResponse + 28, // 4: topfans.activity.BatchPurchaseItemResponse.base:type_name -> topfans.common.BaseResponse 8, // 5: topfans.activity.BatchPurchaseItemResponse.fails:type_name -> topfans.activity.PurchaseFailItem - 23, // 6: topfans.activity.ContributionRankingResponse.base:type_name -> topfans.common.BaseResponse + 28, // 6: topfans.activity.ContributionRankingResponse.base:type_name -> topfans.common.BaseResponse 10, // 7: topfans.activity.ContributionRankingResponse.items:type_name -> topfans.activity.ContributionRankingItem 12, // 8: topfans.activity.ContributionRankingResponse.my_contribution:type_name -> topfans.activity.MyContribution - 23, // 9: topfans.activity.GetActivityListResponse.base:type_name -> topfans.common.BaseResponse + 28, // 9: topfans.activity.GetActivityListResponse.base:type_name -> topfans.common.BaseResponse 0, // 10: topfans.activity.GetActivityListResponse.activities:type_name -> topfans.activity.Activity - 23, // 11: topfans.activity.GetProgressResponse.base:type_name -> topfans.common.BaseResponse - 23, // 12: topfans.activity.GetMintingActivitiesResponse.base:type_name -> topfans.common.BaseResponse + 28, // 11: topfans.activity.GetProgressResponse.base:type_name -> topfans.common.BaseResponse + 28, // 12: topfans.activity.GetMintingActivitiesResponse.base:type_name -> topfans.common.BaseResponse 17, // 13: topfans.activity.GetMintingActivitiesResponse.activities:type_name -> topfans.activity.MintingActivity - 23, // 14: topfans.activity.GetLatestContributionsResponse.base:type_name -> topfans.common.BaseResponse + 28, // 14: topfans.activity.GetLatestContributionsResponse.base:type_name -> topfans.common.BaseResponse 21, // 15: topfans.activity.GetLatestContributionsResponse.records:type_name -> topfans.activity.ContributionRecord - 13, // 16: topfans.activity.ActivityService.GetActivityList:input_type -> topfans.activity.GetActivityListRequest - 15, // 17: topfans.activity.ActivityService.GetActivity:input_type -> topfans.activity.GetProgressRequest - 15, // 18: topfans.activity.ActivityService.GetActivityItems:input_type -> topfans.activity.GetProgressRequest - 15, // 19: topfans.activity.ActivityService.GetProgress:input_type -> topfans.activity.GetProgressRequest - 3, // 20: topfans.activity.ActivityService.PurchaseItem:input_type -> topfans.activity.PurchaseItemRequest - 6, // 21: topfans.activity.ActivityService.BatchPurchaseItem:input_type -> topfans.activity.BatchPurchaseItemRequest - 9, // 22: topfans.activity.ActivityService.GetContributionRanking:input_type -> topfans.activity.ContributionRankingRequest - 18, // 23: topfans.activity.ActivityService.GetMintingActivities:input_type -> topfans.activity.GetMintingActivitiesRequest - 20, // 24: topfans.activity.ActivityService.GetLatestContributions:input_type -> topfans.activity.GetLatestContributionsRequest - 14, // 25: topfans.activity.ActivityService.GetActivityList:output_type -> topfans.activity.GetActivityListResponse - 0, // 26: topfans.activity.ActivityService.GetActivity:output_type -> topfans.activity.Activity - 2, // 27: topfans.activity.ActivityService.GetActivityItems:output_type -> topfans.activity.ActivityItemsResponse - 16, // 28: topfans.activity.ActivityService.GetProgress:output_type -> topfans.activity.GetProgressResponse - 4, // 29: topfans.activity.ActivityService.PurchaseItem:output_type -> topfans.activity.PurchaseItemResponse - 7, // 30: topfans.activity.ActivityService.BatchPurchaseItem:output_type -> topfans.activity.BatchPurchaseItemResponse - 11, // 31: topfans.activity.ActivityService.GetContributionRanking:output_type -> topfans.activity.ContributionRankingResponse - 19, // 32: topfans.activity.ActivityService.GetMintingActivities:output_type -> topfans.activity.GetMintingActivitiesResponse - 22, // 33: topfans.activity.ActivityService.GetLatestContributions:output_type -> topfans.activity.GetLatestContributionsResponse - 25, // [25:34] is the sub-list for method output_type - 16, // [16:25] is the sub-list for method input_type - 16, // [16:16] is the sub-list for extension type_name - 16, // [16:16] is the sub-list for extension extendee - 0, // [0:16] is the sub-list for field type_name + 28, // 16: topfans.activity.ListActivityMessagesResponse.base:type_name -> topfans.common.BaseResponse + 23, // 17: topfans.activity.ListActivityMessagesResponse.messages:type_name -> topfans.activity.ActivityMessage + 28, // 18: topfans.activity.CreateActivityMessageResponse.base:type_name -> topfans.common.BaseResponse + 23, // 19: topfans.activity.CreateActivityMessageResponse.message:type_name -> topfans.activity.ActivityMessage + 13, // 20: topfans.activity.ActivityService.GetActivityList:input_type -> topfans.activity.GetActivityListRequest + 15, // 21: topfans.activity.ActivityService.GetActivity:input_type -> topfans.activity.GetProgressRequest + 15, // 22: topfans.activity.ActivityService.GetActivityItems:input_type -> topfans.activity.GetProgressRequest + 15, // 23: topfans.activity.ActivityService.GetProgress:input_type -> topfans.activity.GetProgressRequest + 3, // 24: topfans.activity.ActivityService.PurchaseItem:input_type -> topfans.activity.PurchaseItemRequest + 6, // 25: topfans.activity.ActivityService.BatchPurchaseItem:input_type -> topfans.activity.BatchPurchaseItemRequest + 9, // 26: topfans.activity.ActivityService.GetContributionRanking:input_type -> topfans.activity.ContributionRankingRequest + 18, // 27: topfans.activity.ActivityService.GetMintingActivities:input_type -> topfans.activity.GetMintingActivitiesRequest + 20, // 28: topfans.activity.ActivityService.GetLatestContributions:input_type -> topfans.activity.GetLatestContributionsRequest + 24, // 29: topfans.activity.ActivityService.ListActivityMessages:input_type -> topfans.activity.ListActivityMessagesRequest + 26, // 30: topfans.activity.ActivityService.CreateActivityMessage:input_type -> topfans.activity.CreateActivityMessageRequest + 14, // 31: topfans.activity.ActivityService.GetActivityList:output_type -> topfans.activity.GetActivityListResponse + 0, // 32: topfans.activity.ActivityService.GetActivity:output_type -> topfans.activity.Activity + 2, // 33: topfans.activity.ActivityService.GetActivityItems:output_type -> topfans.activity.ActivityItemsResponse + 16, // 34: topfans.activity.ActivityService.GetProgress:output_type -> topfans.activity.GetProgressResponse + 4, // 35: topfans.activity.ActivityService.PurchaseItem:output_type -> topfans.activity.PurchaseItemResponse + 7, // 36: topfans.activity.ActivityService.BatchPurchaseItem:output_type -> topfans.activity.BatchPurchaseItemResponse + 11, // 37: topfans.activity.ActivityService.GetContributionRanking:output_type -> topfans.activity.ContributionRankingResponse + 19, // 38: topfans.activity.ActivityService.GetMintingActivities:output_type -> topfans.activity.GetMintingActivitiesResponse + 22, // 39: topfans.activity.ActivityService.GetLatestContributions:output_type -> topfans.activity.GetLatestContributionsResponse + 25, // 40: topfans.activity.ActivityService.ListActivityMessages:output_type -> topfans.activity.ListActivityMessagesResponse + 27, // 41: topfans.activity.ActivityService.CreateActivityMessage:output_type -> topfans.activity.CreateActivityMessageResponse + 31, // [31:42] is the sub-list for method output_type + 20, // [20:31] is the sub-list for method input_type + 20, // [20:20] is the sub-list for extension type_name + 20, // [20:20] is the sub-list for extension extendee + 0, // [0:20] is the sub-list for field type_name } func init() { file_activity_proto_init() } @@ -2182,7 +2584,7 @@ func file_activity_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_activity_proto_rawDesc), len(file_activity_proto_rawDesc)), NumEnums: 0, - NumMessages: 23, + NumMessages: 28, NumExtensions: 0, NumServices: 1, }, diff --git a/backend/pkg/proto/activity/activity.triple.go b/backend/pkg/proto/activity/activity.triple.go index 54c4030..bf9feda 100644 --- a/backend/pkg/proto/activity/activity.triple.go +++ b/backend/pkg/proto/activity/activity.triple.go @@ -54,6 +54,10 @@ const ( ActivityServiceGetMintingActivitiesProcedure = "/topfans.activity.ActivityService/GetMintingActivities" // ActivityServiceGetLatestContributionsProcedure is the fully-qualified name of the ActivityService's GetLatestContributions RPC. ActivityServiceGetLatestContributionsProcedure = "/topfans.activity.ActivityService/GetLatestContributions" + // ActivityServiceListActivityMessagesProcedure is the fully-qualified name of the ActivityService's ListActivityMessages RPC. + ActivityServiceListActivityMessagesProcedure = "/topfans.activity.ActivityService/ListActivityMessages" + // ActivityServiceCreateActivityMessageProcedure is the fully-qualified name of the ActivityService's CreateActivityMessage RPC. + ActivityServiceCreateActivityMessageProcedure = "/topfans.activity.ActivityService/CreateActivityMessage" ) var ( @@ -71,6 +75,8 @@ type ActivityService interface { GetContributionRanking(ctx context.Context, req *ContributionRankingRequest, opts ...client.CallOption) (*ContributionRankingResponse, error) GetMintingActivities(ctx context.Context, req *GetMintingActivitiesRequest, opts ...client.CallOption) (*GetMintingActivitiesResponse, error) GetLatestContributions(ctx context.Context, req *GetLatestContributionsRequest, opts ...client.CallOption) (*GetLatestContributionsResponse, error) + ListActivityMessages(ctx context.Context, req *ListActivityMessagesRequest, opts ...client.CallOption) (*ListActivityMessagesResponse, error) + CreateActivityMessage(ctx context.Context, req *CreateActivityMessageRequest, opts ...client.CallOption) (*CreateActivityMessageResponse, error) } // NewActivityService constructs a client for the activity.ActivityService service. @@ -165,9 +171,25 @@ func (c *ActivityServiceImpl) GetLatestContributions(ctx context.Context, req *G return resp, nil } +func (c *ActivityServiceImpl) ListActivityMessages(ctx context.Context, req *ListActivityMessagesRequest, opts ...client.CallOption) (*ListActivityMessagesResponse, error) { + resp := new(ListActivityMessagesResponse) + if err := c.conn.CallUnary(ctx, []interface{}{req}, resp, "ListActivityMessages", opts...); err != nil { + return nil, err + } + return resp, nil +} + +func (c *ActivityServiceImpl) CreateActivityMessage(ctx context.Context, req *CreateActivityMessageRequest, opts ...client.CallOption) (*CreateActivityMessageResponse, error) { + resp := new(CreateActivityMessageResponse) + if err := c.conn.CallUnary(ctx, []interface{}{req}, resp, "CreateActivityMessage", opts...); err != nil { + return nil, err + } + return resp, nil +} + var ActivityService_ClientInfo = client.ClientInfo{ InterfaceName: "topfans.activity.ActivityService", - MethodNames: []string{"GetActivityList", "GetActivity", "GetActivityItems", "GetProgress", "PurchaseItem", "BatchPurchaseItem", "GetContributionRanking", "GetMintingActivities", "GetLatestContributions"}, + MethodNames: []string{"GetActivityList", "GetActivity", "GetActivityItems", "GetProgress", "PurchaseItem", "BatchPurchaseItem", "GetContributionRanking", "GetMintingActivities", "GetLatestContributions", "ListActivityMessages", "CreateActivityMessage"}, ConnectionInjectFunc: func(dubboCliRaw interface{}, conn *client.Connection) { dubboCli := dubboCliRaw.(*ActivityServiceImpl) dubboCli.conn = conn @@ -185,6 +207,8 @@ type ActivityServiceHandler interface { GetContributionRanking(context.Context, *ContributionRankingRequest) (*ContributionRankingResponse, error) GetMintingActivities(context.Context, *GetMintingActivitiesRequest) (*GetMintingActivitiesResponse, error) GetLatestContributions(context.Context, *GetLatestContributionsRequest) (*GetLatestContributionsResponse, error) + ListActivityMessages(context.Context, *ListActivityMessagesRequest) (*ListActivityMessagesResponse, error) + CreateActivityMessage(context.Context, *CreateActivityMessageRequest) (*CreateActivityMessageResponse, error) } func RegisterActivityServiceHandler(srv *server.Server, hdlr ActivityServiceHandler, opts ...server.ServiceOption) error { @@ -334,5 +358,35 @@ var ActivityService_ServiceInfo = server.ServiceInfo{ return triple_protocol.NewResponse(res), nil }, }, + { + Name: "ListActivityMessages", + Type: constant.CallUnary, + ReqInitFunc: func() interface{} { + return new(ListActivityMessagesRequest) + }, + MethodFunc: func(ctx context.Context, args []interface{}, handler interface{}) (interface{}, error) { + req := args[0].(*ListActivityMessagesRequest) + res, err := handler.(ActivityServiceHandler).ListActivityMessages(ctx, req) + if err != nil { + return nil, err + } + return triple_protocol.NewResponse(res), nil + }, + }, + { + Name: "CreateActivityMessage", + Type: constant.CallUnary, + ReqInitFunc: func() interface{} { + return new(CreateActivityMessageRequest) + }, + MethodFunc: func(ctx context.Context, args []interface{}, handler interface{}) (interface{}, error) { + req := args[0].(*CreateActivityMessageRequest) + res, err := handler.(ActivityServiceHandler).CreateActivityMessage(ctx, req) + if err != nil { + return nil, err + } + return triple_protocol.NewResponse(res), nil + }, + }, }, } diff --git a/backend/proto/activity.proto b/backend/proto/activity.proto index 21f89d2..ea3e3c4 100644 --- a/backend/proto/activity.proto +++ b/backend/proto/activity.proto @@ -231,6 +231,43 @@ message GetLatestContributionsResponse { repeated ContributionRecord records = 2; } +// ============== 留言相关消息 ============== + +message ActivityMessage { + int64 id = 1; + int64 activity_id = 2; + int64 user_id = 3; + int64 star_id = 4; + string nickname = 5; + string avatar_url = 6; + string content = 7; + int64 created_at = 8; +} + +message ListActivityMessagesRequest { + int64 activity_id = 1; + int32 page = 2; + int32 page_size = 3; +} +message ListActivityMessagesResponse { + topfans.common.BaseResponse base = 1; + repeated ActivityMessage messages = 2; + int32 page = 3; + int32 page_size = 4; + int32 total = 5; +} + +message CreateActivityMessageRequest { + int64 activity_id = 1; + int64 user_id = 2; + int64 star_id = 3; + string content = 4; +} +message CreateActivityMessageResponse { + topfans.common.BaseResponse base = 1; + ActivityMessage message = 2; +} + // ==================== 活动服务 ==================== service ActivityService { @@ -298,4 +335,19 @@ service ActivityService { get: "/api/v1/activities/{activity_id}/contributions/latest" }; } + + // 列出活动留言 + rpc ListActivityMessages(ListActivityMessagesRequest) returns (ListActivityMessagesResponse) { + option (google.api.http) = { + get: "/api/v1/activities/{activity_id}/messages" + }; + } + + // 发送一条留言 + rpc CreateActivityMessage(CreateActivityMessageRequest) returns (CreateActivityMessageResponse) { + option (google.api.http) = { + post: "/api/v1/activities/{activity_id}/messages" + body: "*" + }; + } } diff --git a/backend/services/activityService/config/config.go b/backend/services/activityService/config/config.go new file mode 100644 index 0000000..634dc76 --- /dev/null +++ b/backend/services/activityService/config/config.go @@ -0,0 +1,61 @@ +package config + +import ( + "os" + "strconv" + "strings" +) + +// 默认敏感词列表(首版本地词表,后续接 dify) +var defaultBannedWords = []string{ + "傻逼", "操你", "草泥马", "fuck", "shit", +} + +// ActivityMessageConfig 活动留言配置 +type ActivityMessageConfig struct { + MessageRateLimitPerMin int64 // 单用户单活动每分钟最多留言数 + MessageLimitPerActivity int64 // 单用户单活动累计留言上限 + BannedWords []string // 敏感词首版本地词表(可通过 env 覆盖) +} + +// LoadMessageConfig 从环境变量加载配置(缺省值参考通知服务) +func LoadMessageConfig() *ActivityMessageConfig { + return &ActivityMessageConfig{ + MessageRateLimitPerMin: getEnvInt64("ACTIVITY_MESSAGE_RATE_LIMIT_PER_MIN", 5), + MessageLimitPerActivity: getEnvInt64("ACTIVITY_MESSAGE_LIMIT_PER_ACTIVITY", 100), + BannedWords: loadBannedWords(), + } +} + +// loadBannedWords 从 ACTIVITY_MESSAGE_BANNED_WORDS(逗号分隔)加载敏感词; +// 未设置或为空时回退到默认词表 +func loadBannedWords() []string { + raw := os.Getenv("ACTIVITY_MESSAGE_BANNED_WORDS") + if strings.TrimSpace(raw) == "" { + return append([]string{}, defaultBannedWords...) + } + parts := strings.Split(raw, ",") + out := make([]string, 0, len(parts)) + for _, p := range parts { + t := strings.TrimSpace(p) + if t != "" { + out = append(out, t) + } + } + if len(out) == 0 { + return append([]string{}, defaultBannedWords...) + } + return out +} + +func getEnvInt64(key string, defaultVal int64) int64 { + v := os.Getenv(key) + if v == "" { + return defaultVal + } + parsed, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return defaultVal + } + return parsed +} diff --git a/backend/services/activityService/main.go b/backend/services/activityService/main.go index 6206c30..12fae6a 100644 --- a/backend/services/activityService/main.go +++ b/backend/services/activityService/main.go @@ -150,6 +150,7 @@ func autoMigrate() error { &models.ActivityItem{}, &models.ActivityContribution{}, &models.ActivityUserStats{}, + &models.ActivityMessage{}, &models.MintingActivity{}, } diff --git a/backend/services/activityService/provider/activity_provider.go b/backend/services/activityService/provider/activity_provider.go index 40e0bae..d725b67 100644 --- a/backend/services/activityService/provider/activity_provider.go +++ b/backend/services/activityService/provider/activity_provider.go @@ -279,3 +279,39 @@ func (p *ActivityProvider) GetLatestContributions(ctx context.Context, req *pb.G return resp, nil } + +// ListActivityMessages 列出活动留言 +func (p *ActivityProvider) ListActivityMessages(ctx context.Context, req *pb.ListActivityMessagesRequest) (*pb.ListActivityMessagesResponse, error) { + logger.Logger.Info("Received ListActivityMessages request", zap.Int64("activity_id", req.ActivityId)) + resp, err := p.activityService.ListActivityMessages(ctx, req) + if err != nil { + logger.Logger.Error("ListActivityMessages failed", zap.Error(err)) + return &pb.ListActivityMessagesResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(err)), + Message: err.Error(), + Timestamp: time.Now().UnixMilli(), + }, + }, err + } + logger.Logger.Info("ListActivityMessages successful", zap.Int64("activity_id", req.ActivityId), zap.Int("count", len(resp.Messages))) + return resp, nil +} + +// CreateActivityMessage 发送一条留言 +func (p *ActivityProvider) CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error) { + logger.Logger.Info("Received CreateActivityMessage request", zap.Int64("activity_id", req.ActivityId), zap.Int64("user_id", req.UserId)) + resp, err := p.activityService.CreateActivityMessage(ctx, req) + if err != nil { + logger.Logger.Error("CreateActivityMessage failed", zap.Error(err)) + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(err)), + Message: err.Error(), + Timestamp: time.Now().UnixMilli(), + }, + }, err + } + logger.Logger.Info("CreateActivityMessage successful", zap.Int64("message_id", resp.Message.Id)) + return resp, nil +} diff --git a/backend/services/activityService/repository/activity_messages_repository.go b/backend/services/activityService/repository/activity_messages_repository.go new file mode 100644 index 0000000..d62d05c --- /dev/null +++ b/backend/services/activityService/repository/activity_messages_repository.go @@ -0,0 +1,111 @@ +package repository + +import ( + "errors" + "time" + + "github.com/topfans/backend/pkg/database" + "github.com/topfans/backend/pkg/models" + "gorm.io/gorm" +) + +// ActivityMessagesRepository 活动留言仓库接口 +type ActivityMessagesRepository interface { + // Insert 插入一条留言,返回新 ID + Insert(msg *models.ActivityMessage) (int64, error) + + // ListByActivity 列出活动的留言(分页,按 created_at DESC, id DESC) + ListByActivity(activityID int64, page, pageSize int) ([]*models.ActivityMessage, int64, error) + + // CountByUserActivity 统计某用户在某活动的留言数(用于累计上限校验) + CountByUserActivity(activityID, userID int64) (int64, error) + + // UpdateProfile 更新留言的昵称头像(写后异步补字段,避免读时RPC反查) + UpdateProfile(msgID int64, nickname, avatarURL string) error +} + +// activityMessagesRepository 实现 +type activityMessagesRepository struct { + db *gorm.DB +} + +// NewActivityMessagesRepository 创建仓库实例 +func NewActivityMessagesRepository() ActivityMessagesRepository { + return &activityMessagesRepository{ + db: database.GetDB(), + } +} + +// Insert 插入一条留言,返回新 ID +func (r *activityMessagesRepository) Insert(msg *models.ActivityMessage) (int64, error) { + if msg == nil { + return 0, errors.New("message cannot be nil") + } + if err := r.db.Create(msg).Error; err != nil { + return 0, err + } + return msg.ID, nil +} + +// ListByActivity 列出活动的留言 +func (r *activityMessagesRepository) ListByActivity(activityID int64, page, pageSize int) ([]*models.ActivityMessage, int64, error) { + if activityID <= 0 { + return nil, 0, errors.New("activity_id must be greater than 0") + } + if page <= 0 { + page = 1 + } + if pageSize <= 0 { + pageSize = 20 + } + if pageSize > 50 { + pageSize = 50 + } + + query := r.db.Model(&models.ActivityMessage{}). + Where("activity_id = ? AND deleted_at IS NULL AND status = 0", activityID) + + var total int64 + if err := query.Count(&total).Error; err != nil { + return nil, 0, err + } + + var messages []*models.ActivityMessage + offset := (page - 1) * pageSize + if err := query.Order("created_at ASC, id ASC"). + Offset(offset). + Limit(pageSize). + Find(&messages).Error; err != nil { + return nil, 0, err + } + + return messages, total, nil +} + +// CountByUserActivity 统计某用户在某活动的留言数 +func (r *activityMessagesRepository) CountByUserActivity(activityID, userID int64) (int64, error) { + if activityID <= 0 || userID <= 0 { + return 0, errors.New("activity_id and user_id must be greater than 0") + } + var count int64 + if err := r.db.Model(&models.ActivityMessage{}). + Where("activity_id = ? AND user_id = ? AND deleted_at IS NULL", activityID, userID). + Count(&count).Error; err != nil { + return 0, err + } + return count, nil +} + +// UpdateProfile 更新留言的昵称头像 +func (r *activityMessagesRepository) UpdateProfile(msgID int64, nickname, avatarURL string) error { + if msgID <= 0 { + return errors.New("msg_id must be greater than 0") + } + return r.db.Model(&models.ActivityMessage{}). + Where("id = ?", msgID). + Updates(map[string]interface{}{ + "nickname": nickname, + "avatar_url": avatarURL, + "updated_at": time.Now().UnixMilli(), + }).Error +} diff --git a/backend/services/activityService/service/activity_service.go b/backend/services/activityService/service/activity_service.go index ae416b4..43dd10e 100644 --- a/backend/services/activityService/service/activity_service.go +++ b/backend/services/activityService/service/activity_service.go @@ -2,8 +2,11 @@ package service import ( "context" + "encoding/json" "fmt" + "strings" "time" + "unicode/utf8" appErrors "github.com/topfans/backend/pkg/errors" "github.com/topfans/backend/pkg/logger" @@ -11,12 +14,25 @@ import ( pb "github.com/topfans/backend/pkg/proto/activity" pbCommon "github.com/topfans/backend/pkg/proto/common" "github.com/topfans/backend/services/activityService/client" + "github.com/topfans/backend/services/activityService/config" "github.com/topfans/backend/services/activityService/repository" "github.com/redis/go-redis/v9" "go.uber.org/zap" "google.golang.org/grpc/codes" ) +// rateLimitScript 原子 INCR + 首次 EXPIRE 的 Lua 脚本 +// 消除 INCR 与 EXPIRE 之间的非原子竞态 +var rateLimitScript = func() *redis.Script { + return redis.NewScript(` +local count = redis.call('INCR', KEYS[1]) +if count == 1 then + redis.call('EXPIRE', KEYS[1], ARGV[1]) +end +return count +`) +}() + // ActivityService 活动Service接口 type ActivityService interface { // GetActivityList 获取活动列表 @@ -45,14 +61,22 @@ type ActivityService interface { // GetLatestContributions 获取最新贡献记录(用于实时显示) GetLatestContributions(ctx context.Context, req *pb.GetLatestContributionsRequest) (*pb.GetLatestContributionsResponse, error) + + // CreateActivityMessage 创建一条活动留言 + CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error) + + // ListActivityMessages 列出活动留言 + ListActivityMessages(ctx context.Context, req *pb.ListActivityMessagesRequest) (*pb.ListActivityMessagesResponse, error) } // activityService 活动Service实现 type activityService struct { activityRepo repository.ActivityRepository mintingActivityRepo repository.MintingActivityRepository + messagesRepo repository.ActivityMessagesRepository userRPCClient client.UserRPCClient redisClient *redis.Client + messageCfg *config.ActivityMessageConfig } // NewActivityService 创建活动Service实例 @@ -60,8 +84,10 @@ func NewActivityService(activityRepo repository.ActivityRepository, mintingActiv return &activityService{ activityRepo: activityRepo, mintingActivityRepo: mintingActivityRepo, + messagesRepo: repository.NewActivityMessagesRepository(), userRPCClient: userRPCClient, redisClient: redisClient, + messageCfg: config.LoadMessageConfig(), } } @@ -410,6 +436,41 @@ func (s *activityService) PurchaseItem(ctx context.Context, req *pb.PurchaseItem // 更新 Redis 连击计数器(3秒TTL) s.incrementComboCount(ctx, userID, req.ItemType) + comboCount := s.getComboCount(ctx, userID, req.ItemType) + + // Redis Publish contributions_response(实时推送给订阅者) + if s.redisClient != nil { + nickname, avatarURL := "", "" + if profile, _ := s.userRPCClient.GetFanProfile(userID, req.StarId); profile != nil { + nickname = profile.Nickname + avatarURL = profile.AvatarUrl + } + itemName, itemIcon := "", "" + if item != nil { + itemName = item.ItemName + itemIcon = item.IconURL + } + contribMsg := &pb.ContributionRecord{ + Id: contribution.ID, + UserId: userID, + Nickname: nickname, + AvatarUrl: avatarURL, + StarId: req.StarId, + ItemId: item.ID, + ItemType: req.ItemType, + ItemName: itemName, + ItemIcon: itemIcon, + Quantity: int32(req.Quantity), + ComboCount: int32(comboCount), + CreatedAt: contribution.CreatedAt, + } + payload, _ := json.Marshal(map[string]interface{}{ + "activity_id": req.ActivityId, + "type": "contributions_response", + "record": contribMsg, + }) + s.redisClient.Publish(ctx, fmt.Sprintf("act:%d:contributions", req.ActivityId), payload) + } // 更新用户统计 stats, _ := s.activityRepo.GetUserStats(req.ActivityId, userID, req.StarId) @@ -663,6 +724,36 @@ func (s *activityService) BatchPurchaseItem(ctx context.Context, req *pb.BatchPu // 更新 Redis 连击计数器(3秒TTL) s.incrementComboCount(ctx, userID, item.ItemType) + comboCount := s.getComboCount(ctx, userID, item.ItemType) + + // Redis Publish contributions_response(实时推送给订阅者) + if s.redisClient != nil { + nickname, avatarURL := "", "" + if profile, _ := s.userRPCClient.GetFanProfile(userID, req.StarId); profile != nil { + nickname = profile.Nickname + avatarURL = profile.AvatarUrl + } + contribMsg := &pb.ContributionRecord{ + Id: contribution.ID, + UserId: userID, + Nickname: nickname, + AvatarUrl: avatarURL, + StarId: req.StarId, + ItemId: activityItem.ID, + ItemType: item.ItemType, + ItemName: activityItem.ItemName, + ItemIcon: activityItem.IconURL, + Quantity: int32(item.Quantity), + ComboCount: int32(comboCount), + CreatedAt: contribution.CreatedAt, + } + payload, _ := json.Marshal(map[string]interface{}{ + "activity_id": req.ActivityId, + "type": "contributions_response", + "record": contribMsg, + }) + s.redisClient.Publish(ctx, fmt.Sprintf("act:%d:contributions", req.ActivityId), payload) + } } // 更新用户统计 @@ -1011,3 +1102,279 @@ func (s *activityService) GetLatestContributions(ctx context.Context, req *pb.Ge Records: records, }, nil } + +// CreateActivityMessage 创建一条活动留言(含频控/累计上限/敏感词校验 + Redis Publish) +func (s *activityService) CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error) { + logger.Logger.Info("CreateActivityMessage request", + zap.Int64("activity_id", req.ActivityId), + zap.Int64("user_id", req.UserId), + zap.Int64("star_id", req.StarId), + ) + + // 1. 入参校验 + // 业务校验错误统一返回 (resp, nil): 把 gRPC code 放进 BaseResponse, + // 让 controller 通过 ErrorWithCode 走 HTTP 200 + body.code 路径; + // 仅基础设施错误(DB/Redis/RPC)用 (nil, err) 让 controller 走 HTTP 500。 + content := strings.TrimSpace(req.Content) + if content == "" { + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageContentEmpty)), + Message: appErrors.ErrActivityMessageContentEmpty.Error(), + }, + }, nil + } + if utf8.RuneCountInString(content) > 500 { + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageContentTooLong)), + Message: appErrors.ErrActivityMessageContentTooLong.Error(), + }, + }, nil + } + + // 2. 活动存在性 + 状态 + activity, err := s.activityRepo.GetActivityByID(req.ActivityId) + if err != nil { + logger.Logger.Error("GetActivityByID failed", zap.Error(err)) + return nil, err + } + if activity == nil { + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityNotFound)), + Message: appErrors.ErrActivityNotFound.Error(), + }, + }, nil + } + // 使用 GetCurrentStatus() 计算动态状态(基于 StartTime/EndTime/CurrentProgress), + // 而不是 activity.Status(DB 列,创建后默认值 "pending" 不会被自动更新为 "active") + if activity.GetCurrentStatus() != "active" { + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageActivityInactive)), + Message: appErrors.ErrActivityMessageActivityInactive.Error(), + }, + }, nil + } + + // 3. 频控(Redis Lua 原子化:INCR + 首次设置 EXPIRE 60s) + if s.redisClient != nil { + rateKey := fmt.Sprintf("msg:rate:%d:%d", req.ActivityId, req.UserId) + var count int64 + if rateLimitScript != nil { + raw, err := rateLimitScript.Run(ctx, s.redisClient, []string{rateKey}, 60).Result() + if err == nil { + if v, ok := raw.(int64); ok { + count = v + } + } + } else { + // 降级:脚本未初始化时退回 INCR + EXPIRE(非原子,但能跑) + count, err = s.redisClient.Incr(ctx, rateKey).Result() + if err == nil && count == 1 { + s.redisClient.Expire(ctx, rateKey, 60*time.Second) + } + } + if count > s.messageCfg.MessageRateLimitPerMin { + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageTooFrequent)), + Message: appErrors.ErrActivityMessageTooFrequent.Error(), + }, + }, nil + } + } + + // 4. 累计上限 + total, err := s.messagesRepo.CountByUserActivity(req.ActivityId, req.UserId) + if err != nil { + logger.Logger.Error("CountByUserActivity failed", zap.Error(err)) + return nil, err + } + if total >= s.messageCfg.MessageLimitPerActivity { + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageLimitReached)), + Message: appErrors.ErrActivityMessageLimitReached.Error(), + }, + }, nil + } + + // 5. 敏感词校验(首版本地词表) + if containsBannedWord(content, s.messageCfg.BannedWords) { + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageContentInvalid)), + Message: appErrors.ErrActivityMessageContentInvalid.Error(), + }, + }, nil + } + + // 6. 写入 + now := time.Now().UnixMilli() + msg := &models.ActivityMessage{ + ActivityID: req.ActivityId, + UserID: req.UserId, + StarID: req.StarId, + Nickname: "", + AvatarURL: "", + Content: content, + Status: 0, + CreatedAt: now, + UpdatedAt: now, + } + msgID, err := s.messagesRepo.Insert(msg) + if err != nil { + logger.Logger.Error("Insert message failed", zap.Error(err)) + return nil, err + } + + // 7. 回查昵称头像 + nickname, avatarURL := "", "" + if profile, _ := s.userRPCClient.GetFanProfile(req.UserId, req.StarId); profile != nil { + nickname = profile.Nickname + avatarURL = profile.AvatarUrl + } + + // 写回 nickname/avatar(如果首次 RPC 拿到) + if (nickname != "" || avatarURL != "") && msgID > 0 { + if err := s.messagesRepo.UpdateProfile(msgID, nickname, avatarURL); err != nil { + logger.Logger.Warn("UpdateProfile failed", zap.Error(err), zap.Int64("msg_id", msgID)) + } + } + + pbMsg := &pb.ActivityMessage{ + Id: msgID, + ActivityId: req.ActivityId, + UserId: req.UserId, + StarId: req.StarId, + Nickname: nickname, + AvatarUrl: avatarURL, + Content: content, + CreatedAt: now, + } + + // 8. Redis Publish(不论是否有人订阅都发) + if s.redisClient != nil { + channel := fmt.Sprintf("act:%d:messages", req.ActivityId) + payload, _ := json.Marshal(map[string]interface{}{ + "activity_id": req.ActivityId, + "type": "messages_response", + "message": pbMsg, + }) + s.redisClient.Publish(ctx, channel, payload) + } + + logger.Logger.Info("CreateActivityMessage success", + zap.Int64("msg_id", msgID), + zap.Int64("user_id", req.UserId), + ) + + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(codes.OK), + Message: "ok", + }, + Message: pbMsg, + }, nil +} + +// ListActivityMessages 列出活动留言 +func (s *activityService) ListActivityMessages(ctx context.Context, req *pb.ListActivityMessagesRequest) (*pb.ListActivityMessagesResponse, error) { + logger.Logger.Info("ListActivityMessages request", + zap.Int64("activity_id", req.ActivityId), + zap.Int32("page", req.Page), + zap.Int32("page_size", req.PageSize), + ) + + page := int(req.Page) + pageSize := int(req.PageSize) + if page <= 0 { + page = 1 + } + if pageSize <= 0 { + pageSize = 20 + } + if pageSize > 50 { + pageSize = 50 + } + + // 校验活动存在 + activity, err := s.activityRepo.GetActivityByID(req.ActivityId) + if err != nil { + logger.Logger.Error("GetActivityByID failed", zap.Error(err)) + return nil, err + } + if activity == nil { + return &pb.ListActivityMessagesResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityNotFound)), + Message: appErrors.ErrActivityNotFound.Error(), + }, + }, appErrors.ErrActivityNotFound + } + + rows, total, err := s.messagesRepo.ListByActivity(req.ActivityId, page, pageSize) + if err != nil { + logger.Logger.Error("ListByActivity failed", zap.Error(err)) + return nil, err + } + + messages := make([]*pb.ActivityMessage, 0, len(rows)) + for _, m := range rows { + nickname := m.Nickname + avatarURL := m.AvatarURL + // 若 DB 中 nickname 为空(早期数据),回查 user profile + if nickname == "" && avatarURL == "" { + nickname, avatarURL = s.fetchUserProfile(m.UserID, m.StarID) + } + messages = append(messages, &pb.ActivityMessage{ + Id: m.ID, + ActivityId: m.ActivityID, + UserId: m.UserID, + StarId: m.StarID, + Nickname: nickname, + AvatarUrl: avatarURL, + Content: m.Content, + CreatedAt: m.CreatedAt, + }) + } + + logger.Logger.Debug("ListActivityMessages success", + zap.Int64("activity_id", req.ActivityId), + zap.Int("count", len(messages)), + zap.Int64("total", total), + ) + + return &pb.ListActivityMessagesResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(codes.OK), + Message: "ok", + }, + Messages: messages, + Page: int32(page), + PageSize: int32(pageSize), + Total: int32(total), + }, nil +} + +// fetchUserProfile 拉取用户昵称头像(失败时返回空串) +func (s *activityService) fetchUserProfile(userID, starID int64) (string, string) { + profile, err := s.userRPCClient.GetFanProfile(userID, starID) + if err != nil || profile == nil { + return "", "" + } + return profile.Nickname, profile.AvatarUrl +} + +// containsBannedWord 检查是否含敏感词 +func containsBannedWord(content string, banned []string) bool { + lower := strings.ToLower(content) + for _, w := range banned { + if strings.Contains(lower, strings.ToLower(w)) { + return true + } + } + return false +} diff --git a/docs/superpowers/plans/2026-06-22-activity-realtime-websocket.md b/docs/superpowers/plans/2026-06-22-activity-realtime-websocket.md new file mode 100644 index 0000000..822a7d9 --- /dev/null +++ b/docs/superpowers/plans/2026-06-22-activity-realtime-websocket.md @@ -0,0 +1,2125 @@ +# 活动实时推送(留言 + 贡献)WebSocket 实施计划 + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 单一活动页(`pages/support-activity/index.vue`)的留言板与贡献列表统一走 WebSocket 推送;HTTP 仅保留留言历史/发送 + 增量贡献(断线降级)。 + +**Architecture:** 后端 `activityService` 在写 `activity_messages`/`activity_contributions` 后 Redis Publish;Gateway 新增 `ActivityHub` PSubscribe 全部活动频道并按 (activity_id, topic) fanout 到本地连接。前端新增 `ActivitySocket` 单一连接 + topic 订阅,留言发送走 HTTP(成功后由 WS 推回)。 + +**Tech Stack:** Go (Gin + Dubbo-go + GORM + gorilla/websocket + go-redis), Vue 3 + uni-app + uniapp WebSocket API, PostgreSQL, Redis Pub/Sub + +--- + +## File Structure + +### Backend (新增) + +``` +backend/ +├── migrations/ +│ └── 2026_06_22_012_activity_messages.sql [新增] +├── services/activityService/ +│ ├── repository/activity_messages_repository.go [新增] +│ ├── config/config.go [新增] +│ └── service/activity_service.go [修改 - 追加 CreateActivityMessage/ListActivityMessages + Redis Publish] +├── proto/activity.proto [修改 - 追加 3 个 message + 2 个 RPC] +├── pkg/ +│ ├── models/activity_message.go [新增] +│ └── errors/errors.go [修改 - 追加 7 个错误变量 + ToGRPCCode 映射] +└── gateway/ + ├── socket/activity_socket.go [新增] + ├── router/router.go [修改 - 注册 /activity WS 路由 + ListActivityMessages/CreateActivityMessage HTTP 路由] + ├── controller/activity_controller.go [修改 - 追加 ListActivityMessages/CreateActivityMessage handler + 转换函数] + ├── main.go [修改 - 创建 ActivityHub 并注入] + ├── config/config.go [修改 - 加 WebSocket.ActivityPath] + └── pkg/response/response.go [修改 - 加 6 条中文错误映射] +``` + +### Frontend (新增/修改) + +``` +frontend/ +├── utils/ +│ ├── api.js [修改 - 加 listActivityMessagesApi/createActivityMessageApi] +│ └── socket/ +│ ├── ActivitySocket.js [新增] +│ ├── GlobalSocketManager.js [修改 - 加 _initActivity()] +│ └── index.js [修改 - 导出 ActivitySocket] +├── pages/support-activity/ +│ ├── composables/ +│ │ ├── useContributionPolling.js [修改 - 暴露 highestIdRef] +│ │ ├── useContributionRealtime.js [新增] +│ │ └── useMessageRealtime.js [新增] +│ ├── components/MessageBoard.vue [无修改 - props 契约不变] +│ └── index.vue [修改 - 用 useMessageRealtime 替换 mock + handleSendMessage 调 API] +``` + +--- + +## Task 1: 创建数据库迁移脚本 + +**Files:** +- Create: `backend/migrations/2026_06_22_012_activity_messages.sql` + +- [ ] **Step 1: 创建迁移文件** + +```sql +-- 2026_06_22_012_activity_messages.sql +BEGIN; + +CREATE TABLE IF NOT EXISTS public.activity_messages ( + id BIGSERIAL PRIMARY KEY, + activity_id BIGINT NOT NULL, + user_id BIGINT NOT NULL, + star_id BIGINT NOT NULL, + content VARCHAR(500) NOT NULL, + status SMALLINT NOT NULL DEFAULT 0, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL, + deleted_at BIGINT, + CONSTRAINT fk_messages_activity + FOREIGN KEY (activity_id) REFERENCES public.activities(id) ON DELETE CASCADE, + CONSTRAINT fk_messages_user + FOREIGN KEY (user_id) REFERENCES public.users(id), + CONSTRAINT fk_messages_star + FOREIGN KEY (star_id) REFERENCES public.stars(star_id) +); + +CREATE SEQUENCE IF NOT EXISTS activity_messages_id_seq START WITH 10000; + +CREATE INDEX IF NOT EXISTS idx_activity_messages_activity_created + ON public.activity_messages (activity_id, created_at DESC, id DESC) + WHERE deleted_at IS NULL; + +CREATE INDEX IF NOT EXISTS idx_activity_messages_user_created + ON public.activity_messages (user_id, created_at DESC) + WHERE deleted_at IS NULL; + +CREATE INDEX IF NOT EXISTS idx_activity_messages_activity_incr + ON public.activity_messages (activity_id, id DESC) + WHERE deleted_at IS NULL; + +COMMENT ON TABLE public.activity_messages IS '活动留言表'; +COMMENT ON COLUMN public.activity_messages.id IS '主键,自增'; +COMMENT ON COLUMN public.activity_messages.activity_id IS 'FK -> activities.id'; +COMMENT ON COLUMN public.activity_messages.user_id IS '留言用户 ID'; +COMMENT ON COLUMN public.activity_messages.star_id IS '所属明星/星球 ID'; +COMMENT ON COLUMN public.activity_messages.content IS '留言正文,1-500 字'; +COMMENT ON COLUMN public.activity_messages.status IS '0=正常|1=隐藏|2=已删除'; +COMMENT ON COLUMN public.activity_messages.created_at IS '留言时间,毫秒时间戳'; +COMMENT ON COLUMN public.activity_messages.updated_at IS '更新时间,毫秒时间戳'; +COMMENT ON COLUMN public.activity_messages.deleted_at IS '软删除时间'; + +COMMIT; +``` + +- [ ] **Step 2: 暂存迁移文件** + +```bash +git add backend/migrations/2026_06_22_012_activity_messages.sql +``` + +> 注:用户需自行决定 commit 时机,本计划不在 AI 自主 commit。 + +--- + +## Task 2: Proto 文件追加 messages 相关定义 + +**Files:** +- Modify: `backend/proto/activity.proto` (在末尾追加) + +- [ ] **Step 1: 在 proto 文件末尾追加 3 个 message** + +在 `backend/proto/activity.proto` 文件末尾(`service ActivityService` 块之前)追加: + +```protobuf +// ============== 留言相关消息 ============== + +message ActivityMessage { + int64 id = 1; + int64 activity_id = 2; + int64 user_id = 3; + int64 star_id = 4; + string nickname = 5; + string avatar_url = 6; + string content = 7; + int64 created_at = 8; +} + +message ListActivityMessagesRequest { + int64 activity_id = 1; + int32 page = 2; + int32 page_size = 3; +} +message ListActivityMessagesResponse { + topfans.common.BaseResponse base = 1; + repeated ActivityMessage messages = 2; + int32 page = 3; + int32 page_size = 4; + int32 total = 5; +} + +message CreateActivityMessageRequest { + int64 activity_id = 1; + int64 user_id = 2; + int64 star_id = 3; + string content = 4; +} +message CreateActivityMessageResponse { + topfans.common.BaseResponse base = 1; + ActivityMessage message = 2; +} +``` + +- [ ] **Step 2: 在 service ActivityService 块末尾追加 2 个 RPC** + +在 `service ActivityService { ... }` 块的最后(`GetLatestContributions` 之后、`}` 之前)追加: + +```protobuf + // 列出活动留言 + rpc ListActivityMessages(ListActivityMessagesRequest) returns (ListActivityMessagesResponse) { + option (google.api.http) = { + get: "/api/v1/activities/{activity_id}/messages" + }; + } + + // 发送一条留言 + rpc CreateActivityMessage(CreateActivityMessageRequest) returns (CreateActivityMessageResponse) { + option (google.api.http) = { + post: "/api/v1/activities/{activity_id}/messages" + body: "*" + }; + } +``` + +- [ ] **Step 3: 运行 proto 编译** + +```bash +cd backend && make proto +``` + +Expected: 生成/更新 `backend/pkg/proto/activity/activity.pb.go`,包含新的 RPC 接口。 + +- [ ] **Step 4: 暂存修改** + +```bash +git add backend/proto/activity.proto backend/pkg/proto/activity/ +``` + +--- + +## Task 3: 错误码扩展 + +**Files:** +- Modify: `backend/pkg/errors/errors.go` + +- [ ] **Step 1: 在 "活动服务相关错误" 块下方追加 7 个错误变量** + +在现有 `ErrActivityNotFound`/`ErrActivityItemNotFound` 之后追加: + +```go + // 活动留言相关错误 + ErrActivityMessageNotFound = errors.New("活动留言不存在") + ErrActivityMessageTooFrequent = errors.New("留言太频繁,请稍后再试") + ErrActivityMessageLimitReached = errors.New("当前活动留言已达上限") + ErrActivityMessageContentEmpty = errors.New("留言内容不能为空") + ErrActivityMessageContentTooLong = errors.New("留言内容过长,最多500字") + ErrActivityMessageContentInvalid = errors.New("留言内容包含不当内容") + ErrActivityMessageActivityInactive = errors.New("活动不在进行中") +``` + +- [ ] **Step 2: 在 ToGRPCCode 函数中追加映射** + +在 `ToGRPCCode` 的 switch 末尾追加: + +```go + case errors.Is(err, ErrActivityMessageNotFound): + return codes.NotFound + case errors.Is(err, ErrActivityMessageTooFrequent): + return codes.ResourceExhausted + case errors.Is(err, ErrActivityMessageLimitReached): + return codes.ResourceExhausted + case errors.Is(err, ErrActivityMessageContentEmpty): + return codes.InvalidArgument + case errors.Is(err, ErrActivityMessageContentTooLong): + return codes.InvalidArgument + case errors.Is(err, ErrActivityMessageContentInvalid): + return codes.PermissionDenied + case errors.Is(err, ErrActivityMessageActivityInactive): + return codes.PermissionDenied +``` + +- [ ] **Step 3: 编译验证** + +```bash +cd backend/pkg && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 4: Response 中文错误映射 + +**Files:** +- Modify: `backend/gateway/pkg/response/response.go` + +- [ ] **Step 1: 在 errorMap 中追加留言相关中文映射** + +在 `errorMap` map 字面量末尾追加(保持与 errors.go 一致): + +```go + "活动留言不存在": "活动留言不存在", + "留言太频繁": "留言太频繁,请稍后再试", + "当前活动留言已达上限": "当前活动留言已达上限", + "留言内容不能为空": "留言内容不能为空", + "留言内容过长": "留言内容过长,最多500字", + "留言内容包含不当内容": "留言内容包含不当内容,请修改", + "活动不在进行中": "活动未开始或已结束", +``` + +- [ ] **Step 2: 编译验证** + +```bash +cd backend/gateway && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 5: ActivityMessage Model + +**Files:** +- Create: `backend/pkg/models/activity_message.go` + +- [ ] **Step 1: 创建 Model** + +```go +package models + +import "time" + +// ActivityMessage 活动留言 +type ActivityMessage struct { + ID int64 `gorm:"primaryKey;column:id" json:"id"` + ActivityID int64 `gorm:"column:activity_id;not null;index" json:"activity_id"` + UserID int64 `gorm:"column:user_id;not null;index" json:"user_id"` + StarID int64 `gorm:"column:star_id;not null" json:"star_id"` + Content string `gorm:"column:content;type:varchar(500);not null" json:"content"` + Status int16 `gorm:"column:status;not null;default:0" json:"status"` + CreatedAt int64 `gorm:"column:created_at;not null" json:"created_at"` + UpdatedAt int64 `gorm:"column:updated_at;not null" json:"updated_at"` + DeletedAt *time.Time `gorm:"column:deleted_at" json:"deleted_at,omitempty"` +} + +// TableName 表名 +func (ActivityMessage) TableName() string { + return "activity_messages" +} +``` + +- [ ] **Step 2: 编译验证** + +```bash +cd backend/pkg && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 6: Repository 层 + +**Files:** +- Create: `backend/services/activityService/repository/activity_messages_repository.go` + +- [ ] **Step 1: 创建 Repository** + +```go +package repository + +import ( + "errors" + + "github.com/topfans/backend/pkg/database" + "github.com/topfans/backend/pkg/models" + "gorm.io/gorm" +) + +// ActivityMessagesRepository 活动留言仓库接口 +type ActivityMessagesRepository interface { + // Insert 插入一条留言 + Insert(msg *models.ActivityMessage) (int64, error) + + // ListByActivity 列出活动的留言(分页,按 created_at DESC, id DESC) + ListByActivity(activityID int64, page, pageSize int) ([]*models.ActivityMessage, int64, error) + + // CountByUserActivity 统计某用户在某活动的留言数(用于累计上限校验) + CountByUserActivity(ctx interface{}, activityID, userID int64) (int64, error) +} + +// activityMessagesRepository 实现 +type activityMessagesRepository struct { + db *gorm.DB +} + +// NewActivityMessagesRepository 创建仓库实例 +func NewActivityMessagesRepository() ActivityMessagesRepository { + return &activityMessagesRepository{ + db: database.GetDB(), + } +} + +// Insert 插入一条留言,返回新 ID +func (r *activityMessagesRepository) Insert(msg *models.ActivityMessage) (int64, error) { + if msg == nil { + return 0, errors.New("message cannot be nil") + } + if err := r.db.Create(msg).Error; err != nil { + return 0, err + } + return msg.ID, nil +} + +// ListByActivity 列出活动的留言 +func (r *activityMessagesRepository) ListByActivity(activityID int64, page, pageSize int) ([]*models.ActivityMessage, int64, error) { + if activityID <= 0 { + return nil, 0, errors.New("activity_id must be greater than 0") + } + if page <= 0 { + page = 1 + } + if pageSize <= 0 { + pageSize = 20 + } + if pageSize > 50 { + pageSize = 50 + } + + query := r.db.Model(&models.ActivityMessage{}). + Where("activity_id = ? AND deleted_at IS NULL", activityID) + + var total int64 + if err := query.Count(&total).Error; err != nil { + return nil, 0, err + } + + var messages []*models.ActivityMessage + offset := (page - 1) * pageSize + if err := query.Order("created_at DESC, id DESC"). + Offset(offset). + Limit(pageSize). + Find(&messages).Error; err != nil { + return nil, 0, err + } + + return messages, total, nil +} + +// CountByUserActivity 统计某用户在某活动的留言数 +func (r *activityMessagesRepository) CountByUserActivity(_ interface{}, activityID, userID int64) (int64, error) { + if activityID <= 0 || userID <= 0 { + return 0, errors.New("activity_id and user_id must be greater than 0") + } + var count int64 + if err := r.db.Model(&models.ActivityMessage{}). + Where("activity_id = ? AND user_id = ? AND deleted_at IS NULL", activityID, userID). + Count(&count).Error; err != nil { + return 0, err + } + return count, nil +} +``` + +> 注意:`CountByUserActivity` 第一参数 `ctx interface{}` 占位(实际不需要 ctx,保留扩展点)。调用方传 `nil`。 + +- [ ] **Step 2: 编译验证** + +```bash +cd backend/services/activityService && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 7: activityService config 文件 + +**Files:** +- Create: `backend/services/activityService/config/config.go` + +- [ ] **Step 1: 创建 config 包** + +```go +package config + +import ( + "os" + "strconv" +) + +// ActivityMessageConfig 活动留言配置 +type ActivityMessageConfig struct { + MessageRateLimitPerMin int64 // 单用户单活动每分钟最多留言数 + MessageLimitPerActivity int64 // 单用户单活动累计留言上限 + BannedWords []string // 敏感词首版本地词表 +} + +// LoadMessageConfig 从环境变量加载配置(缺省值参考通知服务) +func LoadMessageConfig() *ActivityMessageConfig { + cfg := &ActivityMessageConfig{ + MessageRateLimitPerMin: getEnvInt64("ACTIVITY_MESSAGE_RATE_LIMIT_PER_MIN", 5), + MessageLimitPerActivity: getEnvInt64("ACTIVITY_MESSAGE_LIMIT_PER_ACTIVITY", 100), + BannedWords: []string{ + "傻逼", "操你", "草泥马", "fuck", "shit", + }, + } + return cfg +} + +func getEnvInt64(key string, defaultVal int64) int64 { + v := os.Getenv(key) + if v == "" { + return defaultVal + } + parsed, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return defaultVal + } + return parsed +} +``` + +- [ ] **Step 2: 编译验证** + +```bash +cd backend/services/activityService && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 8: Service 层添加 CreateActivityMessage 和 ListActivityMessages 业务方法 + +**Files:** +- Modify: `backend/services/activityService/service/activity_service.go` + +- [ ] **Step 1: 在 ActivityService 接口追加 2 个方法** + +在 `ActivityService` interface 末尾追加: + +```go + // CreateActivityMessage 创建一条活动留言 + CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error) + + // ListActivityMessages 列出活动留言 + ListActivityMessages(ctx context.Context, req *pb.ListActivityMessagesRequest) (*pb.ListActivityMessagesResponse, error) +``` + +- [ ] **Step 2: 在 activityService struct 追加 messagesRepo 字段和 cfg 字段** + +修改 struct 定义: + +```go +type activityService struct { + activityRepo repository.ActivityRepository + mintingActivityRepo repository.MintingActivityRepository + messagesRepo repository.ActivityMessagesRepository + userRPCClient *client.UserRPCClient + redisClient *redis.Client + messageCfg *config.ActivityMessageConfig +} +``` + +- [ ] **Step 3: 修改 NewActivityService 构造函数注入新依赖** + +在现有 `NewActivityService` 函数末尾追加: + +```go + messagesRepo: repository.NewActivityMessagesRepository(), + messageCfg: config.LoadMessageConfig(), +``` + +(保留原有 `activityRepo`/`mintingActivityRepo`/`userRPCClient`/`redisClient` 不变) + +- [ ] **Step 4: 在 service 文件末尾追加 CreateActivityMessage 实现** + +```go +// CreateActivityMessage 创建一条活动留言(含频控/累计上限/敏感词校验 + Redis Publish) +func (s *activityService) CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error) { + // 1. 入参校验 + content := strings.TrimSpace(req.Content) + if content == "" { + return nil, appErrors.ErrActivityMessageContentEmpty + } + if utf8.RuneCountInString(content) > 500 { + return nil, appErrors.ErrActivityMessageContentTooLong + } + + // 2. 活动存在性 + 状态 + activity, err := s.activityRepo.GetActivityByID(req.ActivityId) + if err != nil { + return nil, err + } + if activity == nil { + return nil, appErrors.ErrActivityNotFound + } + if activity.Status != "active" { + return nil, appErrors.ErrActivityMessageActivityInactive + } + + // 3. 频控(Redis INCR + EXPIRE 60s) + rateKey := fmt.Sprintf("msg:rate:%d:%d", req.ActivityId, req.UserId) + count, err := s.redisClient.Incr(ctx, rateKey).Result() + if err == nil && count == 1 { + s.redisClient.Expire(ctx, rateKey, 60*time.Second) + } + if err == nil && count > s.messageCfg.MessageRateLimitPerMin { + return nil, appErrors.ErrActivityMessageTooFrequent + } + + // 4. 累计上限 + total, err := s.messagesRepo.CountByUserActivity(ctx, req.ActivityId, req.UserId) + if err != nil { + return nil, err + } + if total >= s.messageCfg.MessageLimitPerActivity { + return nil, appErrors.ErrActivityMessageLimitReached + } + + // 5. 敏感词校验(首版本地词表) + if containsBannedWord(content, s.messageCfg.BannedWords) { + return nil, appErrors.ErrActivityMessageContentInvalid + } + + // 6. 写入 + now := time.Now().UnixMilli() + msg := &models.ActivityMessage{ + ActivityID: req.ActivityId, + UserID: req.UserId, + StarID: req.StarId, + Content: content, + Status: 0, + CreatedAt: now, + UpdatedAt: now, + } + msgID, err := s.messagesRepo.Insert(msg) + if err != nil { + return nil, err + } + + // 7. 回查昵称头像 + profile, _ := s.userRPCClient.GetFanProfile(ctx, req.UserId, req.StarId) + nickname := "" + avatarURL := "" + if profile != nil { + nickname = profile.Nickname + avatarURL = profile.AvatarUrl + } + + pbMsg := &pb.ActivityMessage{ + Id: msgID, + ActivityId: req.ActivityId, + UserId: req.UserId, + StarId: req.StarId, + Nickname: nickname, + AvatarUrl: avatarURL, + Content: content, + CreatedAt: now, + } + + // 8. Redis Publish(不论是否有人订阅都发) + channel := fmt.Sprintf("act:%d:messages", req.ActivityId) + payload, _ := json.Marshal(map[string]interface{}{ + "activity_id": req.ActivityId, + "type": "messages_response", + "message": pbMsg, + }) + s.redisClient.Publish(ctx, channel, payload) + + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{Code: uint32(codes.OK), Message: "ok"}, + Message: pbMsg, + }, nil +} + +// ListActivityMessages 列出活动留言 +func (s *activityService) ListActivityMessages(ctx context.Context, req *pb.ListActivityMessagesRequest) (*pb.ListActivityMessagesResponse, error) { + page := int(req.Page) + pageSize := int(req.PageSize) + if page <= 0 { + page = 1 + } + if pageSize <= 0 { + pageSize = 20 + } + if pageSize > 50 { + pageSize = 50 + } + + // 校验活动存在 + activity, err := s.activityRepo.GetActivityByID(req.ActivityId) + if err != nil { + return nil, err + } + if activity == nil { + return nil, appErrors.ErrActivityNotFound + } + + rows, total, err := s.messagesRepo.ListByActivity(req.ActivityId, page, pageSize) + if err != nil { + return nil, err + } + + // 回查昵称头像 + uidSet := make(map[int64]struct{}) + for _, m := range rows { + uidSet[m.UserID] = struct{}{} + } + + messages := make([]*pb.ActivityMessage, 0, len(rows)) + for _, m := range rows { + nickname, avatarURL := s.fetchUserProfile(ctx, m.UserID, m.StarID) + messages = append(messages, &pb.ActivityMessage{ + Id: m.ID, + ActivityId: m.ActivityID, + UserId: m.UserID, + StarId: m.StarID, + Nickname: nickname, + AvatarUrl: avatarURL, + Content: m.Content, + CreatedAt: m.CreatedAt, + }) + } + + return &pb.ListActivityMessagesResponse{ + Base: &pbCommon.BaseResponse{Code: uint32(codes.OK), Message: "ok"}, + Messages: messages, + Page: int32(page), + PageSize: int32(pageSize), + Total: int32(total), + }, nil +} + +// fetchUserProfile 拉取用户昵称头像(失败时返回空串,不影响列表返回) +func (s *activityService) fetchUserProfile(ctx context.Context, userID, starID int64) (string, string) { + profile, err := s.userRPCClient.GetFanProfile(ctx, userID, starID) + if err != nil || profile == nil { + return "", "" + } + return profile.Nickname, profile.AvatarUrl +} + +// containsBannedWord 检查是否含敏感词 +func containsBannedWord(content string, banned []string) bool { + lower := strings.ToLower(content) + for _, w := range banned { + if strings.Contains(lower, strings.ToLower(w)) { + return true + } + } + return false +} +``` + +- [ ] **Step 5: 在文件顶部追加必要 import** + +检查文件顶部 import 块,确保包含: + +```go + "encoding/json" + "fmt" + "strings" + "time" + "unicode/utf8" +``` + +- [ ] **Step 6: 编译验证** + +```bash +cd backend/services/activityService && go build ./... +``` + +Expected: 编译通过。 + +> 注意:`profile.Nickname` / `profile.AvatarUrl` 字段名以 `UserRPCClient.GetFanProfile` 实际返回类型为准;若不同需要调整。 + +--- + +## Task 9: 在 PurchaseItem/BatchPurchaseItem 末尾追加 Redis Publish contributions + +**Files:** +- Modify: `backend/services/activityService/service/activity_service.go` + +- [ ] **Step 1: 在 PurchaseItem 写库成功后追加 Publish** + +在 `PurchaseItem` 函数中 `CreateContribution` 成功之后、`return` 之前追加: + +```go + // 推送 contributions_response 到 Redis Pub/Sub + contribMsg := &pb.ContributionRecord{ + Id: contribution.ID, + UserId: userID, + StarId: req.StarId, + ItemType: req.ItemType, + Quantity: int32(req.Quantity), + ComboCount: int32(comboCount), + CreatedAt: now, + } + // 拉取昵称头像 + if profile, _ := s.userRPCClient.GetFanProfile(ctx, userID, req.StarId); profile != nil { + contribMsg.Nickname = profile.Nickname + contribMsg.AvatarUrl = profile.AvatarUrl + } + if item != nil { + contribMsg.ItemId = item.ID + contribMsg.ItemName = item.ItemName + contribMsg.ItemIcon = item.IconUrl + } + + payload, _ := json.Marshal(map[string]interface{}{ + "activity_id": req.ActivityId, + "type": "contributions_response", + "record": contribMsg, + }) + s.redisClient.Publish(ctx, fmt.Sprintf("act:%d:contributions", req.ActivityId), payload) +``` + +> 实际变量名以 PurchaseItem 函数体内已有定义为准(如 `item`、`comboCount`、`now` 等)。 + +- [ ] **Step 2: 在 BatchPurchaseItem 末尾对每个成功 item Publish** + +类似 Step 1,在 BatchPurchaseItem 中每次 `CreateContribution` 成功后追加 Publish。 + +- [ ] **Step 3: 编译验证** + +```bash +cd backend/services/activityService && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 10: Provider 层添加 RPC 入口 + +**Files:** +- Modify: `backend/services/activityService/provider/activity_provider.go` + +- [ ] **Step 1: 在 ActivityProvider 上追加 2 个 RPC 方法** + +在文件末尾追加(参照 `PurchaseItem` 的 pattern): + +```go +// ListActivityMessages 列出活动留言 +func (p *ActivityProvider) ListActivityMessages(ctx context.Context, req *pb.ListActivityMessagesRequest) (*pb.ListActivityMessagesResponse, error) { + logger.Logger.Info("Received ListActivityMessages request", zap.Int64("activity_id", req.ActivityId)) + resp, err := p.activityService.ListActivityMessages(ctx, req) + if err != nil { + logger.Logger.Error("ListActivityMessages failed", zap.Error(err)) + return &pb.ListActivityMessagesResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(err)), + Message: err.Error(), + Timestamp: time.Now().UnixMilli(), + }, + }, err + } + logger.Logger.Info("ListActivityMessages successful", zap.Int64("activity_id", req.ActivityId), zap.Int("count", len(resp.Messages))) + return resp, nil +} + +// CreateActivityMessage 发送一条留言 +func (p *ActivityProvider) CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error) { + logger.Logger.Info("Received CreateActivityMessage request", zap.Int64("activity_id", req.ActivityId), zap.Int64("user_id", req.UserId)) + resp, err := p.activityService.CreateActivityMessage(ctx, req) + if err != nil { + logger.Logger.Error("CreateActivityMessage failed", zap.Error(err)) + return &pb.CreateActivityMessageResponse{ + Base: &pbCommon.BaseResponse{ + Code: uint32(appErrors.ToGRPCCode(err)), + Message: err.Error(), + Timestamp: time.Now().UnixMilli(), + }, + }, err + } + logger.Logger.Info("CreateActivityMessage successful", zap.Int64("message_id", resp.Message.Id)) + return resp, nil +} +``` + +- [ ] **Step 2: 编译验证** + +```bash +cd backend/services/activityService && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 11: ActivityHub WebSocket 实现 + +**Files:** +- Create: `backend/gateway/socket/activity_socket.go` + +- [ ] **Step 1: 创建 ActivityHub** + +```go +package socket + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/redis/go-redis/v9" + "github.com/topfans/backend/pkg/jwt" + "github.com/topfans/backend/pkg/logger" + "go.uber.org/zap" +) + +// ActivityHub 管理所有 Activity WebSocket 连接 +type ActivityHub struct { + clients map[int64]map[*ActivityConn]struct{} // userId -> set of conns(一个用户可多端) + subscriptions map[string]map[*ActivityConn]struct{} // "act:42:messages" / "act:42:contributions" -> conns + redisClient *redis.Client + activityPath string + mu sync.RWMutex +} + +// ActivityConn 单条 WebSocket 连接 +type ActivityConn struct { + UserID int64 + StarID int64 + Conn *websocket.Conn + Send chan []byte + Hub *ActivityHub + writeMu sync.Mutex +} + +// writeJSON 线程安全 JSON 写入 +func (c *ActivityConn) writeJSON(data interface{}) error { + c.writeMu.Lock() + defer c.writeMu.Unlock() + return c.Conn.WriteJSON(data) +} + +// NewActivityHub 创建 ActivityHub +func NewActivityHub(redisClient *redis.Client, activityPath string) *ActivityHub { + return &ActivityHub{ + clients: make(map[int64]map[*ActivityConn]struct{}), + subscriptions: make(map[string]map[*ActivityConn]struct{}), + redisClient: redisClient, + activityPath: activityPath, + } +} + +// Run 启动 Redis PSubscribe,收到 publish 后 fanout 到本地连接 +func (h *ActivityHub) Run(ctx context.Context) { + sub := h.redisClient.PSubscribe(ctx, "act:*:messages", "act:*:contributions") + defer sub.Close() + ch := sub.Channel() + logger.Logger.Info("ActivityHub subscribed to Redis Pub/Sub channels") + for { + select { + case <-ctx.Done(): + logger.Logger.Info("ActivityHub Run loop exiting due to context done") + return + case msg, ok := <-ch: + if !ok { + logger.Logger.Warn("ActivityHub Redis Pub/Sub channel closed") + return + } + var payload map[string]interface{} + if err := json.Unmarshal([]byte(msg.Payload), &payload); err != nil { + logger.Logger.Error("ActivityHub failed to unmarshal pubsub payload", zap.Error(err)) + continue + } + h.fanout(msg.Channel, payload) + } + } +} + +// fanout 把 payload 推送到订阅该 channel 的所有本地连接 +func (h *ActivityHub) fanout(channel string, payload map[string]interface{}) { + h.mu.RLock() + conns := h.subscriptions[channel] + targets := make([]*ActivityConn, 0, len(conns)) + for c := range conns { + targets = append(targets, c) + } + h.mu.RUnlock() + + for _, c := range targets { + if err := c.writeJSON(payload); err != nil { + logger.Logger.Error("ActivityHub writeJSON failed", zap.Int64("user_id", c.UserID), zap.Error(err)) + } + } +} + +// HandleWebSocket 处理 /activity 握手 +func (h *ActivityHub) HandleWebSocket(w http.ResponseWriter, r *http.Request) { + token := r.URL.Query().Get("token") + + userID, starID, err := h.validateToken(token) + if err != nil { + logger.Logger.Error("Activity WebSocket token validation failed", zap.Error(err)) + w.WriteHeader(http.StatusUnauthorized) + json.NewEncoder(w).Encode(map[string]interface{}{ + "type": "auth_response", + "success": false, + "error": "invalid_token", + }) + return + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + logger.Logger.Error("Activity WebSocket upgrade failed", zap.Error(err)) + return + } + + c := &ActivityConn{ + UserID: userID, + StarID: starID, + Conn: conn, + Send: make(chan []byte, 256), + Hub: h, + } + + h.mu.Lock() + if h.clients[userID] == nil { + h.clients[userID] = make(map[*ActivityConn]struct{}) + } + h.clients[userID][c] = struct{}{} + h.mu.Unlock() + + logger.Logger.Info("Activity WebSocket connection established", + zap.Int64("user_id", userID), + zap.Int64("star_id", starID), + ) + + // 立即推送 auth_response + conn.WriteJSON(map[string]interface{}{ + "type": "auth_response", + "success": true, + "user_id": userID, + "star_id": starID, + }) + + go c.readPump() + go c.writePump() +} + +// validateToken 验证 token(JWT) +func (h *ActivityHub) validateToken(token string) (int64, int64, error) { + if strings.HasPrefix(token, "Bearer_") { + token = strings.TrimPrefix(token, "Bearer_") + } + claims, err := jwt.ParseToken(token) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse token: %w", err) + } + if claims.UserID == 0 { + return 0, 0, fmt.Errorf("invalid user id") + } + return claims.UserID, claims.StarID, nil +} + +// readPump 读取客户端消息 +func (c *ActivityConn) readPump() { + defer func() { + c.Hub.unregister(c) + c.Conn.Close() + }() + + c.Conn.SetReadLimit(64 * 1024) // 64KB(订阅消息很小) + c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + c.Conn.SetPongHandler(func(string) error { + c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + + for { + _, message, err := c.Conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + logger.Logger.Error("Activity WebSocket read error", zap.Error(err)) + } + break + } + + var msg map[string]interface{} + if err := json.Unmarshal(message, &msg); err != nil { + logger.Logger.Error("Failed to parse activity message", zap.Error(err)) + continue + } + c.handleMessage(msg) + } +} + +// writePump 写消息到客户端 +func (c *ActivityConn) writePump() { + ticker := time.NewTicker(30 * time.Second) + defer func() { + ticker.Stop() + c.Conn.Close() + }() + + for { + select { + case message, ok := <-c.Send: + c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if !ok { + c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + w, err := c.Conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +// handleMessage 处理客户端 subscribe/unsubscribe/ping +func (c *ActivityConn) handleMessage(msg map[string]interface{}) { + action, _ := msg["action"].(string) + switch action { + case "ping": + c.Send <- []byte(`{"type":"pong"}`) + + case "subscribe": + activityID := toInt64(msg["activity_id"]) + topics := toStringSlice(msg["topics"]) + c.Hub.subscribe(c, activityID, topics) + c.writeJSON(map[string]interface{}{ + "type": "subscribe_response", + "activity_id": activityID, + "topics": topics, + }) + + case "unsubscribe": + activityID := toInt64(msg["activity_id"]) + topics := toStringSlice(msg["topics"]) + c.Hub.unsubscribe(c, activityID, topics) + c.writeJSON(map[string]interface{}{ + "type": "unsubscribe_response", + "activity_id": activityID, + "topics": topics, + }) + + default: + logger.Logger.Warn("Unknown activity action", zap.String("action", action)) + } +} + +// subscribe 幂等订阅 +func (h *ActivityHub) subscribe(c *ActivityConn, activityID int64, topics []string) { + if activityID <= 0 || len(topics) == 0 { + return + } + h.mu.Lock() + defer h.mu.Unlock() + for _, t := range topics { + ch := fmt.Sprintf("act:%d:%s", activityID, t) + if h.subscriptions[ch] == nil { + h.subscriptions[ch] = make(map[*ActivityConn]struct{}) + } + h.subscriptions[ch][c] = struct{}{} + } +} + +// unsubscribe 幂等取消订阅 +func (h *ActivityHub) unsubscribe(c *ActivityConn, activityID int64, topics []string) { + if activityID <= 0 || len(topics) == 0 { + return + } + h.mu.Lock() + defer h.mu.Unlock() + for _, t := range topics { + ch := fmt.Sprintf("act:%d:%s", activityID, t) + if conns, ok := h.subscriptions[ch]; ok { + delete(conns, c) + if len(conns) == 0 { + delete(h.subscriptions, ch) + } + } + } +} + +// unregister 断开时清理 +func (h *ActivityHub) unregister(c *ActivityConn) { + h.mu.Lock() + defer h.mu.Unlock() + if conns, ok := h.clients[c.UserID]; ok { + delete(conns, c) + if len(conns) == 0 { + delete(h.clients, c.UserID) + } + } + for ch, conns := range h.subscriptions { + if _, ok := conns[c]; ok { + delete(conns, c) + if len(conns) == 0 { + delete(h.subscriptions, ch) + } + } + } +} + +// Close 关闭所有连接 +func (h *ActivityHub) Close() { + h.mu.Lock() + defer h.mu.Unlock() + for _, conns := range h.clients { + for c := range conns { + c.Conn.Close() + } + } +} + +// helper +func toInt64(v interface{}) int64 { + switch x := v.(type) { + case float64: + return int64(x) + case int64: + return x + case int: + return int64(x) + case string: + i, _ := strconv.ParseInt(x, 10, 64) + return i + } + return 0 +} + +func toStringSlice(v interface{}) []string { + arr, ok := v.([]interface{}) + if !ok { + return nil + } + out := make([]string, 0, len(arr)) + for _, item := range arr { + if s, ok := item.(string); ok { + out = append(out, s) + } + } + return out +} +``` + +- [ ] **Step 2: 编译验证** + +```bash +cd backend/gateway && go build ./... +``` + +Expected: 编译通过。 + +> 复用 `ai_chat_socket.go` 中已有的 `upgrader` 变量(同一个 package,无需重新声明)。 + +--- + +## Task 12: Gateway Config 添加 ActivityPath + +**Files:** +- Modify: `backend/gateway/config/config.go` + +- [ ] **Step 1: 在 WebSocketConfig struct 中追加 ActivityPath 字段** + +```go +type WebSocketConfig struct { + AIChatPath string // WebSocket 路径,默认 /ai-chat + ActivityPath string // 活动实时推送 WS 路径,默认 /activity +} +``` + +- [ ] **Step 2: 在 Load() 中追加 ActivityPath 默认值** + +```go +WebSocket: WebSocketConfig{ + AIChatPath: getEnv("WS_AI_CHAT_PATH", "/ai-chat"), + ActivityPath: getEnv("WS_ACTIVITY_PATH", "/activity"), +}, +``` + +- [ ] **Step 3: 编译验证** + +```bash +cd backend/gateway && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 13: Router 注册 ActivityHub 路由和 HTTP 路由 + +**Files:** +- Modify: `backend/gateway/router/router.go` + +- [ ] **Step 1: 修改 SetupRouter 签名追加 activityHub 与 activityPath 参数** + +在 `SetupRouter` 函数签名末尾追加: + +```go +activityHub *socket.ActivityHub, +activityPath string, +``` + +- [ ] **Step 2: 在 AI Chat WS 路由注册代码块后追加 Activity WS 路由** + +```go +// Activity 实时推送 WebSocket 路由 +r.GET(activityPath, gin.WrapF(func(w http.ResponseWriter, r *http.Request) { + activityHub.HandleWebSocket(w, r) +})) +``` + +- [ ] **Step 3: 在 activities 路由组内追加 messages 路由** + +在 `activities.GET("/:id/contributions/latest", ...)` 之后追加: + +```go +activities.GET("/:id/messages", activityCtrl.ListActivityMessages) +activities.POST("/:id/messages", activityCtrl.CreateActivityMessage) +``` + +- [ ] **Step 4: 编译验证** + +```bash +cd backend/gateway && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 14: Controller 追加 ListActivityMessages / CreateActivityMessage handler + +**Files:** +- Modify: `backend/gateway/controller/activity_controller.go` + +- [ ] **Step 1: 在 ActivityController 上追加 2 个 handler** + +参照现有 `GetLatestContributions` handler 的 pattern(Dubbo 注入、参数转换、错误码映射): + +```go +// ListActivityMessages 列出活动留言 +// @Summary 列出活动留言 +// @Description 分页获取活动留言列表(最新在上) +// @Tags activities +// @Accept json +// @Produce json +// @Security BearerAuth +// @Param activity_id path int64 true "活动ID" +// @Param page query int false "页码,默认1" +// @Param page_size query int false "每页数量,默认20,最大50" +// @Success 200 {object} response.Response +// @Router /api/v1/activities/{activity_id}/messages [get] +func (ctrl *ActivityController) ListActivityMessages(c *gin.Context) { + activityID, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + response.ErrorWithCode(c, http.StatusBadRequest, "INVALID_ACTIVITY_ID", "无效的活动ID") + return + } + page, _ := strconv.Atoi(c.DefaultQuery("page", "1")) + pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "20")) + + req := &pb.ListActivityMessagesRequest{ + ActivityId: activityID, + Page: int32(page), + PageSize: int32(pageSize), + } + + ctx := ctrl.buildActivityCtx(c) + resp, err := ctrl.activityProvider.ListActivityMessages(ctx, req) + if err != nil { + response.Error(c, http.StatusInternalServerError, err.Error()) + return + } + if resp.Base.Code != uint32(codes.OK) { + status := grpcCodeToHTTP(resp.Base.Code) + response.ErrorWithCode(c, status, strconv.Itoa(int(resp.Base.Code)), resp.Base.Message) + return + } + + messages := make([]gin.H, 0, len(resp.Messages)) + for _, m := range resp.Messages { + messages = append(messages, gin.H{ + "id": m.Id, + "activity_id": m.ActivityId, + "user_id": m.UserId, + "star_id": m.StarId, + "nickname": m.Nickname, + "avatar_url": m.AvatarUrl, + "content": m.Content, + "created_at": m.CreatedAt, + }) + } + + response.Success(c, gin.H{ + "messages": messages, + "page": resp.Page, + "page_size": resp.PageSize, + "total": resp.Total, + }) +} + +// CreateActivityMessage 发送一条活动留言 +// @Summary 发送活动留言 +// @Description 用户在应援活动页面发送一条祝福留言 +// @Tags activities +// @Accept json +// @Produce json +// @Security BearerAuth +// @Param activity_id path int64 true "活动ID" +// @Param request body object{content=string} true "留言内容" +// @Success 200 {object} response.Response +// @Router /api/v1/activities/{activity_id}/messages [post] +func (ctrl *ActivityController) CreateActivityMessage(c *gin.Context) { + activityID, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + response.ErrorWithCode(c, http.StatusBadRequest, "INVALID_ACTIVITY_ID", "无效的活动ID") + return + } + + var body struct { + Content string `json:"content" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + response.ErrorWithCode(c, http.StatusBadRequest, "INVALID_BODY", "请求体格式错误") + return + } + + userID, _ := c.Get("user_id") + starID, _ := c.Get("star_id") + + req := &pb.CreateActivityMessageRequest{ + ActivityId: activityID, + UserId: toInt64(userID), + StarId: toInt64(starID), + Content: body.Content, + } + + ctx := ctrl.buildActivityCtx(c) + resp, err := ctrl.activityProvider.CreateActivityMessage(ctx, req) + if err != nil { + response.Error(c, http.StatusInternalServerError, err.Error()) + return + } + if resp.Base.Code != uint32(codes.OK) { + status := grpcCodeToHTTP(resp.Base.Code) + response.ErrorWithCode(c, status, strconv.Itoa(int(resp.Base.Code)), resp.Base.Message) + return + } + + m := resp.Message + response.Success(c, gin.H{ + "message": gin.H{ + "id": m.Id, + "activity_id": m.ActivityId, + "user_id": m.UserId, + "star_id": m.StarId, + "nickname": m.Nickname, + "avatar_url": m.AvatarUrl, + "content": m.Content, + "created_at": m.CreatedAt, + }, + }) +} +``` + +> 具体 helper(`buildActivityCtx`、`grpcCodeToHTTP`、`toInt64`)按 controller 文件中已有定义复用;如不存在则需要按同 package 已有 pattern 添加。 + +- [ ] **Step 2: 编译验证** + +```bash +cd backend/gateway && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 15: Gateway main.go 初始化 ActivityHub + +**Files:** +- Modify: `backend/gateway/main.go` + +- [ ] **Step 1: 创建 ActivityHub 实例并启动 Run goroutine** + +在 router.SetupRouter 调用之前: + +```go +activityHub := socket.NewActivityHub(redisClient, cfg.WebSocket.ActivityPath) +go activityHub.Run(context.Background()) +defer activityHub.Close() +``` + +- [ ] **Step 2: 把 activityHub 传给 SetupRouter** + +修改 `router.SetupRouter(...)` 调用,传入 `activityHub` 和 `cfg.WebSocket.ActivityPath`。 + +- [ ] **Step 3: 编译验证** + +```bash +cd backend/gateway && go build ./... +``` + +Expected: 编译通过。 + +--- + +## Task 16: 前端添加 API 工具函数 + +**Files:** +- Modify: `frontend/utils/api.js` + +- [ ] **Step 1: 在文件中追加 2 个导出函数** + +在文件末尾(其他 activity API 函数附近)追加: + +```js +/** + * 列出活动留言(首次/下拉加载) + * @param {string|number} activityId + * @param {number} [page=1] + * @param {number} [pageSize=20] + */ +export function listActivityMessagesApi(activityId, page = 1, pageSize = 20) { + return request({ + url: `/api/v1/activities/${activityId}/messages?page=${page}&page_size=${pageSize}`, + method: 'GET', + }) +} + +/** + * 发送一条活动留言 + * @param {string|number} activityId + * @param {string} content + */ +export function createActivityMessageApi(activityId, content) { + return request({ + url: `/api/v1/activities/${activityId}/messages`, + method: 'POST', + data: { content }, + }) +} +``` + +- [ ] **Step 2: 验证** + +```bash +cd frontend && npm run lint:js 2>&1 | head -20 || echo "lint not configured" +``` + +Expected: 无语法错误。 + +--- + +## Task 17: 创建 ActivitySocket.js + +**Files:** +- Create: `frontend/utils/socket/ActivitySocket.js` + +- [ ] **Step 1: 实现 ActivitySocket 类** + +```js +import SocketManager from './SocketManager.js' + +const DEFAULT_PATH = '/activity' + +/** + * 活动实时推送 WebSocket 客户端 + * - 继承 SocketManager,复用连接/心跳/重连 + * - 额外暴露 topic 订阅 API:subscribe / unsubscribe + */ +class ActivitySocket extends SocketManager { + constructor(options = {}) { + super({ + serviceName: 'Activity', + path: options.path || DEFAULT_PATH, + reconnectInterval: options.reconnectInterval ?? [1000, 2000, 4000, 8000, 16000, 30000], + heartbeatInterval: options.heartbeatInterval ?? 30000, + maxReconnectAttempts: options.maxReconnectAttempts ?? Infinity, + }) + + this._topics = new Set() // 记录已订阅 topic,便于重连后自动重订阅 + } + + /** + * 订阅活动主题 + * @param {string|number} activityId + * @param {string[]} topics - ['messages', 'contributions'] + */ + subscribe(activityId, topics = []) { + if (!activityId || !Array.isArray(topics) || topics.length === 0) return + topics.forEach(t => this._topics.add(`${activityId}:${t}`)) + if (this.isConnected) { + this.send({ action: 'subscribe', activity_id: Number(activityId), topics }) + } + } + + /** + * 取消订阅活动主题 + */ + unsubscribe(activityId, topics = []) { + if (!activityId || !Array.isArray(topics) || topics.length === 0) return + topics.forEach(t => this._topics.delete(`${activityId}:${t}`)) + if (this.isConnected) { + this.send({ action: 'unsubscribe', activity_id: Number(activityId), topics }) + } + } + + /** + * 注册 messages 响应回调 + */ + onMessagesResponse(cb) { + this.registerHandler('messages_response', cb) + } + + offMessagesResponse(cb) { + this.off('messages_response', cb) + } + + /** + * 注册 contributions 响应回调 + */ + onContributionsResponse(cb) { + this.registerHandler('contributions_response', cb) + } + + offContributionsResponse(cb) { + this.off('contributions_response', cb) + } + + /** + * 重连成功后,自动重新订阅所有 topic + */ + resubscribeAll() { + if (this._topics.size === 0) return + const byActivity = new Map() + this._topics.forEach(key => { + const [activityId, topic] = key.split(':') + if (!byActivity.has(Number(activityId))) byActivity.set(Number(activityId), []) + byActivity.get(Number(activityId)).push(topic) + }) + byActivity.forEach((topics, activityId) => { + this.send({ action: 'subscribe', activity_id: activityId, topics }) + }) + } +} + +// ==================== 单例 ==================== + +let instance = null + +export function getActivitySocket() { + if (!instance) { + instance = new ActivitySocket() + // 重连成功后自动重订阅 + instance.on('connect', () => instance.resubscribeAll()) + } + return instance +} + +export function closeActivitySocket() { + if (instance) { + instance.close() + } +} + +export default ActivitySocket +``` + +- [ ] **Step 2: 验证** + +```bash +cd frontend && node -e "import('./utils/socket/ActivitySocket.js').then(m => console.log('OK', typeof m.getActivitySocket))" 2>&1 | head -5 || echo "ESM import test" +``` + +Expected: 输出 `OK function`(如支持 ESM 直接验证)。 + +--- + +## Task 18: 更新 GlobalSocketManager.js 接入 ActivitySocket + +**Files:** +- Modify: `frontend/utils/socket/GlobalSocketManager.js` + +- [ ] **Step 1: 在 init 中追加 _initActivity** + +修改 `init(token)` 方法,在 `_initAiChat(token)` 调用前后追加: + +```js +import { getActivitySocket } from './ActivitySocket.js' + +class GlobalSocketManager { + // ... 保留原有字段 ... + + init(token) { + if (this.initialized) return + this.initialized = true + this._initAiChat(token) + this._initActivity(token) + } + + _initActivity(token) { + const socket = getActivitySocket() + // 不立即 connect,延后到 useContributionRealtime 实际使用时 connect + // 但如果 token 已存在,可以提前 connect 复用 + if (token && !socket.isConnected) { + socket.connect(token).catch(err => console.warn('[GlobalSocket] activity connect error:', err)) + } + } +} +``` + +- [ ] **Step 2: 验证** + +```bash +cd frontend && npm run lint:js 2>&1 | head -20 || echo "lint not configured" +``` + +Expected: 无语法错误。 + +--- + +## Task 19: 重构 useContributionPolling 暴露 highestIdRef + +**Files:** +- Modify: `frontend/pages/support-activity/composables/useContributionPolling.js` + +- [ ] **Step 1: 在 return 前导出 latestTimestamp 和 latestId** + +修改 `return { ... }`: + +```js + return { + records, + visible, + loading, + error, + start, + stop, + reset, + // 暴露给 useContributionRealtime 用 + highestTimestampRef: () => latestTimestamp, + highestIdRef: () => latestId, + } +``` + +> 用函数返回避免解构时丢失响应性。或改为 `ref`/readonly。 + +- [ ] **Step 2: 验证** + +```bash +cd frontend && npm run lint:js 2>&1 | head -20 || echo "lint not configured" +``` + +Expected: 无语法错误。 + +> 替代方案:直接在 useContributionRealtime 中维护独立的 `highestIdRef`。但复用更 DRY。 + +--- + +## Task 20: 创建 useContributionRealtime + +**Files:** +- Create: `frontend/pages/support-activity/composables/useContributionRealtime.js` + +- [ ] **Step 1: 实现 composable** + +```js +import { onMounted, onUnmounted } from 'vue' +import { useContributionPolling } from './useContributionPolling.js' +import { getActivitySocket } from '@/utils/socket/ActivitySocket.js' + +/** + * 贡献实时推送 composable(WS 优先,断线降级为轮询) + * @param {Ref} activityId + * @param {Ref} isPageActive + */ +export function useContributionRealtime(activityId, isPageActive) { + const MAX_RECORDS = 5 + + const { + records, + visible, + loading, + error, + start: startPolling, + stop: stopPolling, + reset: resetPolling, + highestIdRef, + highestTimestampRef, + } = useContributionPolling(activityId, isPageActive) + + const socket = getActivitySocket() + let usingWS = false + + function onWsMessage(payload) { + if (!payload || Number(payload.activity_id) !== Number(activityId.value)) return + if (!payload.record) return + const record = payload.record + if (record.id > highestIdRef()) { + records.value = [record, ...records.value].slice(0, MAX_RECORDS) + // 同步轮询侧游标(保持降级时不重拉) + // 注意:useContributionPolling 内部仍持有 latestId/最新时间戳变量 + } + } + + function onWsConnect() { + if (usingWS) return + usingWS = true + stopPolling() // 停掉可能的轮询 + socket.subscribe(activityId.value, ['contributions']) + } + + function onWsDisconnect() { + if (!usingWS) return + usingWS = false + startPolling() // 降级为轮询 + } + + socket.onContributionsResponse(onWsMessage) + socket.on('connect', onWsConnect) + socket.on('disconnect', onWsDisconnect) + + onMounted(() => { + if (socket.isConnected) { + onWsConnect() + } else { + // 尝试启动 socket + const token = uni.getStorageSync('access_token') + if (token) { + socket.connect(token).catch(err => console.warn('[useContributionRealtime] connect error:', err)) + } + // 如果连接失败,先用轮询兜底 + if (!socket.isConnected) startPolling() + } + }) + + onUnmounted(() => { + if (usingWS) socket.unsubscribe(activityId.value, ['contributions']) + socket.off('connect', onWsConnect) + socket.off('disconnect', onWsDisconnect) + socket.offContributionsResponse(onWsMessage) + stopPolling() + resetPolling() + }) + + return { + records, + visible, + loading, + error, + usingWS: () => usingWS, + } +} +``` + +- [ ] **Step 2: 验证** + +```bash +cd frontend && npm run lint:js 2>&1 | head -20 || echo "lint not configured" +``` + +Expected: 无语法错误。 + +--- + +## Task 21: 创建 useMessageRealtime + +**Files:** +- Create: `frontend/pages/support-activity/composables/useMessageRealtime.js` + +- [ ] **Step 1: 实现 composable** + +```js +import { ref, computed, onMounted, onUnmounted } from 'vue' +import { useStore } from 'vuex' +import { listActivityMessagesApi, createActivityMessageApi } from '@/utils/api.js' +import { getActivitySocket } from '@/utils/socket/ActivitySocket.js' + +const MAX_MESSAGES = 50 + +/** + * 活动留言实时推送 composable + */ +export function useMessageRealtime(activityId) { + const store = useStore() + const messages = ref([]) + const loading = ref(false) + const error = ref(null) + const currentUserId = computed(() => store.state.user?.userInfo?.uid) + const socket = getActivitySocket() + + // 字段映射:后端 → MessageBoard props + function toComponentShape(m) { + return { + id: m.id, + user: m.nickname || '', + avatar: m.avatar_url || '', + content: m.content, + time: m.created_at, + isSelf: m.user_id === currentUserId.value, + } + } + + async function loadHistory() { + if (!activityId.value) return + loading.value = true + error.value = null + try { + const res = await listActivityMessagesApi(activityId.value, 1, 20) + if (res.code === 0) { + messages.value = (res.data.messages || []).map(toComponentShape) + } else { + error.value = res.message || '加载留言失败' + } + } catch (e) { + error.value = e.message || '网络错误' + console.error('[useMessageRealtime] loadHistory error:', e) + } finally { + loading.value = false + } + } + + function onWsMessage(payload) { + if (!payload || Number(payload.activity_id) !== Number(activityId.value)) return + if (!payload.message) return + messages.value.push(toComponentShape(payload.message)) + if (messages.value.length > MAX_MESSAGES) { + messages.value.splice(0, messages.value.length - MAX_MESSAGES) + } + } + + async function sendMessage(content) { + if (!content || !content.trim()) return + const trimmed = content.trim() + try { + const res = await createActivityMessageApi(activityId.value, trimmed) + if (res.code === 0) { + // 成功时不本地 push,等 WS 推回避免重复 + // WS 断线场景:服务端不会推,由前端 fallback 插入 + if (!socket.isConnected && res.data?.message) { + messages.value.push(toComponentShape(res.data.message)) + if (messages.value.length > MAX_MESSAGES) { + messages.value.splice(0, messages.value.length - MAX_MESSAGES) + } + } + } else { + uni.showToast({ title: res.message || '留言失败', icon: 'none' }) + } + } catch (e) { + console.error('[useMessageRealtime] sendMessage error:', e) + uni.showToast({ title: '网络错误', icon: 'none' }) + } + } + + onMounted(() => { + loadHistory() + socket.subscribe(activityId.value, ['messages']) + socket.onMessagesResponse(onWsMessage) + }) + + onUnmounted(() => { + socket.unsubscribe(activityId.value, ['messages']) + socket.offMessagesResponse(onWsMessage) + }) + + return { + messages, + loading, + error, + sendMessage, + refresh: loadHistory, + } +} +``` + +- [ ] **Step 2: 验证** + +```bash +cd frontend && npm run lint:js 2>&1 | head -20 || echo "lint not configured" +``` + +Expected: 无语法错误。 + +--- + +## Task 22: index.vue 集成 useMessageRealtime 和 useContributionRealtime + +**Files:** +- Modify: `frontend/pages/support-activity/index.vue` + +- [ ] **Step 1: 在 `