1381 lines
40 KiB
Go
1381 lines
40 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"strings"
|
||
"time"
|
||
"unicode/utf8"
|
||
|
||
appErrors "github.com/topfans/backend/pkg/errors"
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"github.com/topfans/backend/pkg/models"
|
||
pb "github.com/topfans/backend/pkg/proto/activity"
|
||
pbCommon "github.com/topfans/backend/pkg/proto/common"
|
||
"github.com/topfans/backend/services/activityService/client"
|
||
"github.com/topfans/backend/services/activityService/config"
|
||
"github.com/topfans/backend/services/activityService/repository"
|
||
"github.com/redis/go-redis/v9"
|
||
"go.uber.org/zap"
|
||
"google.golang.org/grpc/codes"
|
||
)
|
||
|
||
// rateLimitScript 原子 INCR + 首次 EXPIRE 的 Lua 脚本
|
||
// 消除 INCR 与 EXPIRE 之间的非原子竞态
|
||
var rateLimitScript = func() *redis.Script {
|
||
return redis.NewScript(`
|
||
local count = redis.call('INCR', KEYS[1])
|
||
if count == 1 then
|
||
redis.call('EXPIRE', KEYS[1], ARGV[1])
|
||
end
|
||
return count
|
||
`)
|
||
}()
|
||
|
||
// ActivityService 活动Service接口
|
||
type ActivityService interface {
|
||
// GetActivityList 获取活动列表
|
||
GetActivityList(ctx context.Context, req *pb.GetActivityListRequest) (*pb.GetActivityListResponse, error)
|
||
|
||
// GetActivity 获取活动详情
|
||
GetActivity(ctx context.Context, req *pb.GetProgressRequest) (*pb.Activity, error)
|
||
|
||
// GetActivityItems 获取活动道具列表
|
||
GetActivityItems(ctx context.Context, req *pb.GetProgressRequest) ([]*pb.ActivityItem, error)
|
||
|
||
// GetProgress 获取活动进度
|
||
GetProgress(ctx context.Context, req *pb.GetProgressRequest) (*pb.GetProgressResponse, error)
|
||
|
||
// PurchaseItem 购买道具
|
||
PurchaseItem(ctx context.Context, req *pb.PurchaseItemRequest) (*pb.PurchaseItemResponse, error)
|
||
|
||
// BatchPurchaseItem 批量购买道具
|
||
BatchPurchaseItem(ctx context.Context, req *pb.BatchPurchaseItemRequest) (*pb.BatchPurchaseItemResponse, error)
|
||
|
||
// GetContributionRanking 获取贡献点排名
|
||
GetContributionRanking(ctx context.Context, req *pb.ContributionRankingRequest) (*pb.ContributionRankingResponse, error)
|
||
|
||
// GetMintingActivities 获取铸造活动列表(用于运营banner)
|
||
GetMintingActivities(ctx context.Context, req *pb.GetMintingActivitiesRequest) (*pb.GetMintingActivitiesResponse, error)
|
||
|
||
// GetLatestContributions 获取最新贡献记录(用于实时显示)
|
||
GetLatestContributions(ctx context.Context, req *pb.GetLatestContributionsRequest) (*pb.GetLatestContributionsResponse, error)
|
||
|
||
// CreateActivityMessage 创建一条活动留言
|
||
CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error)
|
||
|
||
// ListActivityMessages 列出活动留言
|
||
ListActivityMessages(ctx context.Context, req *pb.ListActivityMessagesRequest) (*pb.ListActivityMessagesResponse, error)
|
||
}
|
||
|
||
// activityService 活动Service实现
|
||
type activityService struct {
|
||
activityRepo repository.ActivityRepository
|
||
mintingActivityRepo repository.MintingActivityRepository
|
||
messagesRepo repository.ActivityMessagesRepository
|
||
userRPCClient client.UserRPCClient
|
||
redisClient *redis.Client
|
||
messageCfg *config.ActivityMessageConfig
|
||
}
|
||
|
||
// NewActivityService 创建活动Service实例
|
||
func NewActivityService(activityRepo repository.ActivityRepository, mintingActivityRepo repository.MintingActivityRepository, userRPCClient client.UserRPCClient, redisClient *redis.Client) ActivityService {
|
||
return &activityService{
|
||
activityRepo: activityRepo,
|
||
mintingActivityRepo: mintingActivityRepo,
|
||
messagesRepo: repository.NewActivityMessagesRepository(),
|
||
userRPCClient: userRPCClient,
|
||
redisClient: redisClient,
|
||
messageCfg: config.LoadMessageConfig(),
|
||
}
|
||
}
|
||
|
||
// comboKey 生成连击计数器 Redis Key
|
||
func (s *activityService) comboKey(userID int64, itemType string) string {
|
||
return fmt.Sprintf("combo:%d:%s", userID, itemType)
|
||
}
|
||
|
||
// incrementComboCount 增加连击计数
|
||
// 用户每次购买道具时调用,3秒内同用户同道具的多次购买累加显示
|
||
func (s *activityService) incrementComboCount(ctx context.Context, userID int64, itemType string) error {
|
||
if s.redisClient == nil {
|
||
return nil
|
||
}
|
||
key := s.comboKey(userID, itemType)
|
||
if err := s.redisClient.Incr(ctx, key).Err(); err != nil {
|
||
logger.Logger.Warn("Incr combo count failed", zap.Error(err))
|
||
return err
|
||
}
|
||
if err := s.redisClient.Expire(ctx, key, 3*time.Second).Err(); err != nil {
|
||
logger.Logger.Warn("Expire combo count failed", zap.Error(err))
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// getComboCount 获取连击计数
|
||
func (s *activityService) getComboCount(ctx context.Context, userID int64, itemType string) int64 {
|
||
if s.redisClient == nil {
|
||
return 1
|
||
}
|
||
key := s.comboKey(userID, itemType)
|
||
count, err := s.redisClient.Get(ctx, key).Int64()
|
||
if err != nil || count == 0 {
|
||
return 1
|
||
}
|
||
return count
|
||
}
|
||
|
||
// GetActivityList 获取活动列表
|
||
func (s *activityService) GetActivityList(ctx context.Context, req *pb.GetActivityListRequest) (*pb.GetActivityListResponse, error) {
|
||
logger.Logger.Info("GetActivityList request",
|
||
zap.Int64("star_id", req.StarId),
|
||
zap.String("status", req.Status),
|
||
zap.Int32("page", req.Page),
|
||
zap.Int32("page_size", req.PageSize),
|
||
)
|
||
|
||
if req.Page <= 0 {
|
||
req.Page = 1
|
||
}
|
||
if req.PageSize <= 0 {
|
||
req.PageSize = 10
|
||
}
|
||
|
||
activities, total, err := s.activityRepo.GetActivitiesByStar(req.StarId, req.Status, int(req.Page), int(req.PageSize))
|
||
if err != nil {
|
||
logger.Logger.Error("GetActivityList failed", zap.Error(err))
|
||
return &pb.GetActivityListResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "获取活动列表失败: " + err.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 转换结果
|
||
pbActivities := make([]*pb.Activity, len(activities))
|
||
for i, activity := range activities {
|
||
pbActivities[i] = s.convertActivity(activity)
|
||
}
|
||
|
||
return &pb.GetActivityListResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: "ok",
|
||
},
|
||
Activities: pbActivities,
|
||
Page: req.Page,
|
||
PageSize: req.PageSize,
|
||
Total: int32(total),
|
||
}, nil
|
||
}
|
||
|
||
// GetActivity 获取活动详情
|
||
func (s *activityService) GetActivity(ctx context.Context, req *pb.GetProgressRequest) (*pb.Activity, error) {
|
||
logger.Logger.Info("GetActivity request", zap.Int64("activity_id", req.ActivityId))
|
||
|
||
if req.ActivityId <= 0 {
|
||
return nil, fmt.Errorf("activity_id is required")
|
||
}
|
||
|
||
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
|
||
if err != nil {
|
||
logger.Logger.Error("GetActivity failed", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
|
||
if activity == nil {
|
||
return nil, appErrors.ErrActivityNotFound
|
||
}
|
||
|
||
return s.convertActivity(activity), nil
|
||
}
|
||
|
||
// GetActivityItems 获取活动道具列表
|
||
func (s *activityService) GetActivityItems(ctx context.Context, req *pb.GetProgressRequest) ([]*pb.ActivityItem, error) {
|
||
logger.Logger.Info("GetActivityItems request", zap.Int64("activity_id", req.ActivityId))
|
||
|
||
if req.ActivityId <= 0 {
|
||
return nil, fmt.Errorf("activity_id is required")
|
||
}
|
||
|
||
items, err := s.activityRepo.GetActivityItems(req.ActivityId)
|
||
if err != nil {
|
||
logger.Logger.Error("GetActivityItems failed", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
|
||
// 转换结果
|
||
pbItems := make([]*pb.ActivityItem, len(items))
|
||
for i, item := range items {
|
||
pbItems[i] = &pb.ActivityItem{
|
||
Id: item.ID,
|
||
ItemType: item.ItemType,
|
||
ItemName: item.ItemName,
|
||
IconUrl: item.IconURL,
|
||
CrystalCost: int32(item.CrystalCost),
|
||
ContributionPoints: int32(item.ContributionPoints),
|
||
}
|
||
}
|
||
|
||
return pbItems, nil
|
||
}
|
||
|
||
// GetProgress 获取活动进度
|
||
func (s *activityService) GetProgress(ctx context.Context, req *pb.GetProgressRequest) (*pb.GetProgressResponse, error) {
|
||
logger.Logger.Info("GetProgress request", zap.Int64("activity_id", req.ActivityId))
|
||
|
||
if req.ActivityId <= 0 {
|
||
return nil, fmt.Errorf("activity_id is required")
|
||
}
|
||
|
||
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
|
||
if err != nil {
|
||
logger.Logger.Error("GetProgress failed", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
|
||
if activity == nil {
|
||
return nil, appErrors.ErrActivityNotFound
|
||
}
|
||
|
||
currentStage := activity.GetStage()
|
||
currentStatus := activity.GetCurrentStatus()
|
||
|
||
return &pb.GetProgressResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: "ok",
|
||
},
|
||
ActivityId: activity.ID,
|
||
CurrentProgress: activity.CurrentProgress,
|
||
TargetProgress: activity.TargetProgress,
|
||
CurrentStage: currentStage,
|
||
EndTime: activity.EndTime,
|
||
Status: currentStatus,
|
||
}, nil
|
||
}
|
||
|
||
// PurchaseItem 购买道具
|
||
func (s *activityService) PurchaseItem(ctx context.Context, req *pb.PurchaseItemRequest) (*pb.PurchaseItemResponse, error) {
|
||
logger.Logger.Info("PurchaseItem request",
|
||
zap.Int64("activity_id", req.ActivityId),
|
||
zap.String("item_type", req.ItemType),
|
||
zap.Int32("quantity", req.Quantity),
|
||
zap.Int64("star_id", req.StarId),
|
||
)
|
||
|
||
// 参数校验
|
||
if req.ActivityId <= 0 {
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.InvalidArgument),
|
||
Message: "activity_id is required",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
if req.ItemType == "" {
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.InvalidArgument),
|
||
Message: "item_type is required",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
if req.Quantity <= 0 {
|
||
req.Quantity = 1
|
||
}
|
||
|
||
// 获取用户ID(从context中获取,这里简化处理)
|
||
userID := req.UserId
|
||
|
||
// 获取活动
|
||
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
|
||
if err != nil {
|
||
logger.Logger.Error("GetActivity failed", zap.Error(err))
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "获取活动失败: " + err.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
if activity == nil {
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.NotFound),
|
||
Message: "活动不存在",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 检查活动状态
|
||
currentStatus := activity.GetCurrentStatus()
|
||
if currentStatus != "active" {
|
||
var message string
|
||
switch currentStatus {
|
||
case "expired":
|
||
message = "activity:expired"
|
||
case "pending":
|
||
message = "activity:pending"
|
||
case "completed":
|
||
message = "activity:completed"
|
||
default:
|
||
message = "activity:expired"
|
||
}
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: message,
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 获取道具
|
||
item, err := s.activityRepo.GetActivityItemByType(req.ActivityId, req.ItemType)
|
||
if err != nil {
|
||
logger.Logger.Error("GetActivityItemByType failed", zap.Error(err))
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "获取道具失败: " + err.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
if item == nil {
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.NotFound),
|
||
Message: "道具不存在",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 计算总消费
|
||
totalCost := int64(item.CrystalCost) * int64(req.Quantity)
|
||
totalContribution := int64(item.ContributionPoints) * int64(req.Quantity)
|
||
|
||
// 通过RPC获取用户当前水晶余额
|
||
profile, err := s.userRPCClient.GetFanProfile(userID, req.StarId)
|
||
if err != nil {
|
||
logger.Logger.Error("GetFanProfile failed", zap.Error(err))
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "获取粉丝档案失败: " + err.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
if profile == nil {
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.NotFound),
|
||
Message: "粉丝档案不存在",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 检查水晶余额是否足够
|
||
if profile.CrystalBalance < totalCost {
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.InvalidArgument),
|
||
Message: "水晶余额不足",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 通过RPC扣减水晶
|
||
newBalance, err := s.userRPCClient.UpdateCrystalBalance(userID, req.StarId, -totalCost)
|
||
if err != nil {
|
||
logger.Logger.Error("UpdateCrystalBalance failed", zap.Error(err))
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "扣减水晶失败: " + err.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 更新活动进度
|
||
newProgress := activity.CurrentProgress + totalContribution
|
||
if newProgress > activity.TargetProgress {
|
||
newProgress = activity.TargetProgress
|
||
}
|
||
|
||
err = s.activityRepo.UpdateActivityProgress(req.ActivityId, newProgress)
|
||
if err != nil {
|
||
logger.Logger.Error("UpdateActivityProgress failed", zap.Error(err))
|
||
}
|
||
|
||
// 创建贡献记录
|
||
now := time.Now().UnixMilli()
|
||
contribution := &models.ActivityContribution{
|
||
ActivityID: req.ActivityId,
|
||
UserID: userID,
|
||
StarID: req.StarId,
|
||
ItemID: item.ID,
|
||
ItemType: item.ItemType,
|
||
Quantity: int(req.Quantity),
|
||
CrystalSpent: totalCost,
|
||
ContributionPoints: totalContribution,
|
||
CreatedAt: now,
|
||
}
|
||
|
||
err = s.activityRepo.CreateContribution(contribution)
|
||
if err != nil {
|
||
logger.Logger.Error("CreateContribution failed", zap.Error(err))
|
||
}
|
||
|
||
// 更新 Redis 连击计数器(3秒TTL)
|
||
s.incrementComboCount(ctx, userID, req.ItemType)
|
||
comboCount := s.getComboCount(ctx, userID, req.ItemType)
|
||
|
||
// Redis Publish contributions_response(实时推送给订阅者)
|
||
if s.redisClient != nil {
|
||
nickname, avatarURL := "", ""
|
||
if profile, _ := s.userRPCClient.GetFanProfile(userID, req.StarId); profile != nil {
|
||
nickname = profile.Nickname
|
||
avatarURL = profile.AvatarUrl
|
||
}
|
||
itemName, itemIcon := "", ""
|
||
if item != nil {
|
||
itemName = item.ItemName
|
||
itemIcon = item.IconURL
|
||
}
|
||
contribMsg := &pb.ContributionRecord{
|
||
Id: contribution.ID,
|
||
UserId: userID,
|
||
Nickname: nickname,
|
||
AvatarUrl: avatarURL,
|
||
StarId: req.StarId,
|
||
ItemId: item.ID,
|
||
ItemType: req.ItemType,
|
||
ItemName: itemName,
|
||
ItemIcon: itemIcon,
|
||
Quantity: int32(req.Quantity),
|
||
ComboCount: int32(comboCount),
|
||
CreatedAt: contribution.CreatedAt,
|
||
}
|
||
payload, _ := json.Marshal(map[string]interface{}{
|
||
"activity_id": req.ActivityId,
|
||
"type": "contributions_response",
|
||
"record": contribMsg,
|
||
})
|
||
s.redisClient.Publish(ctx, fmt.Sprintf("act:%d:contributions", req.ActivityId), payload)
|
||
}
|
||
|
||
// 更新用户统计
|
||
stats, _ := s.activityRepo.GetUserStats(req.ActivityId, userID, req.StarId)
|
||
if stats == nil {
|
||
stats = &models.ActivityUserStats{
|
||
ActivityID: req.ActivityId,
|
||
UserID: userID,
|
||
StarID: req.StarId,
|
||
TotalContribution: 0,
|
||
TotalCrystalSpent: 0,
|
||
TotalItems: 0,
|
||
LastContributeAt: now,
|
||
CreatedAt: now,
|
||
UpdatedAt: now,
|
||
}
|
||
}
|
||
|
||
stats.TotalContribution += totalContribution
|
||
stats.TotalCrystalSpent += totalCost
|
||
stats.TotalItems += int(req.Quantity)
|
||
stats.LastContributeAt = now
|
||
stats.UpdatedAt = now
|
||
|
||
err = s.activityRepo.UpdateUserStats(stats)
|
||
if err != nil {
|
||
logger.Logger.Error("UpdateUserStats failed", zap.Error(err))
|
||
}
|
||
|
||
logger.Logger.Info("PurchaseItem success",
|
||
zap.Int64("user_id", userID),
|
||
zap.Int64("total_cost", totalCost),
|
||
zap.Int64("total_contribution", totalContribution),
|
||
zap.Int64("new_progress", newProgress),
|
||
)
|
||
|
||
return &pb.PurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: "ok",
|
||
},
|
||
TotalCrystalSpent: totalCost,
|
||
TotalContribution: totalContribution,
|
||
CurrentProgress: newProgress,
|
||
RemainingBalance: newBalance,
|
||
}, nil
|
||
}
|
||
|
||
// BatchPurchaseItem 批量购买道具
|
||
func (s *activityService) BatchPurchaseItem(ctx context.Context, req *pb.BatchPurchaseItemRequest) (*pb.BatchPurchaseItemResponse, error) {
|
||
logger.Logger.Info("BatchPurchaseItem request",
|
||
zap.Int64("activity_id", req.ActivityId),
|
||
zap.Int("items_count", len(req.Items)),
|
||
zap.Int64("star_id", req.StarId),
|
||
)
|
||
|
||
// 参数校验
|
||
if req.ActivityId <= 0 {
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.InvalidArgument),
|
||
Message: "activity_id is required",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
if len(req.Items) == 0 {
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.InvalidArgument),
|
||
Message: "items 是必填参数",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
userID := req.UserId
|
||
|
||
// 获取活动
|
||
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
|
||
if err != nil {
|
||
logger.Logger.Error("GetActivity failed", zap.Error(err))
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "获取活动失败: " + err.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
if activity == nil {
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.NotFound),
|
||
Message: "活动不存在",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 检查活动状态
|
||
currentStatus := activity.GetCurrentStatus()
|
||
if currentStatus != "active" {
|
||
var message string
|
||
switch currentStatus {
|
||
case "expired":
|
||
message = "activity:expired"
|
||
case "pending":
|
||
message = "activity:pending"
|
||
case "completed":
|
||
message = "activity:completed"
|
||
default:
|
||
message = "activity:expired"
|
||
}
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: message,
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 通过RPC获取用户当前水晶余额(只调用一次)
|
||
profile, err := s.userRPCClient.GetFanProfile(userID, req.StarId)
|
||
if err != nil {
|
||
logger.Logger.Error("GetFanProfile failed", zap.Error(err))
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "获取粉丝档案失败: " + err.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
if profile == nil {
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.NotFound),
|
||
Message: "粉丝档案不存在",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 计算总消费水晶和贡献点,并验证所有items(预校验)
|
||
var totalCost int64
|
||
var totalContribution int64
|
||
itemInfoMap := make(map[string]*models.ActivityItem)
|
||
validItems := make([]*pb.PurchaseItem, 0, len(req.Items))
|
||
var fails []*pb.PurchaseFailItem
|
||
|
||
for _, item := range req.Items {
|
||
if item.Quantity <= 0 {
|
||
fails = append(fails, &pb.PurchaseFailItem{
|
||
ItemType: item.ItemType,
|
||
Reason: "quantity must be > 0",
|
||
})
|
||
continue
|
||
}
|
||
|
||
activityItem, err := s.activityRepo.GetActivityItemByType(req.ActivityId, item.ItemType)
|
||
if err != nil || activityItem == nil {
|
||
logger.Logger.Warn("GetActivityItemByType failed or item not found",
|
||
zap.String("item_type", item.ItemType),
|
||
zap.Error(err),
|
||
)
|
||
fails = append(fails, &pb.PurchaseFailItem{
|
||
ItemType: item.ItemType,
|
||
Reason: "item not found",
|
||
})
|
||
continue
|
||
}
|
||
|
||
itemInfoMap[item.ItemType] = activityItem
|
||
totalCost += int64(activityItem.CrystalCost) * int64(item.Quantity)
|
||
totalContribution += int64(activityItem.ContributionPoints) * int64(item.Quantity)
|
||
validItems = append(validItems, item)
|
||
}
|
||
|
||
// 如果所有items都失败,直接返回
|
||
if len(validItems) == 0 {
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.InvalidArgument),
|
||
Message: "no valid items",
|
||
},
|
||
SuccessCount: 0,
|
||
FailCount: int32(len(fails)),
|
||
Fails: fails,
|
||
}, nil
|
||
}
|
||
|
||
// 检查水晶余额是否足够
|
||
if profile.CrystalBalance < totalCost {
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.InvalidArgument),
|
||
Message: "水晶余额不足",
|
||
},
|
||
SuccessCount: 0,
|
||
FailCount: int32(len(req.Items)),
|
||
Fails: fails,
|
||
}, nil
|
||
}
|
||
|
||
// 通过RPC扣减水晶
|
||
newBalance, err := s.userRPCClient.UpdateCrystalBalance(userID, req.StarId, -totalCost)
|
||
if err != nil {
|
||
logger.Logger.Error("UpdateCrystalBalance failed", zap.Error(err))
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "扣减水晶失败: " + err.Error(),
|
||
},
|
||
SuccessCount: 0,
|
||
FailCount: int32(len(validItems)),
|
||
Fails: fails,
|
||
}, nil
|
||
}
|
||
|
||
// 更新活动进度
|
||
newProgress := activity.CurrentProgress + totalContribution
|
||
if newProgress > activity.TargetProgress {
|
||
newProgress = activity.TargetProgress
|
||
}
|
||
|
||
err = s.activityRepo.UpdateActivityProgress(req.ActivityId, newProgress)
|
||
if err != nil {
|
||
logger.Logger.Error("UpdateActivityProgress failed", zap.Error(err))
|
||
}
|
||
|
||
// 为每个成功的item创建贡献记录并更新连击计数
|
||
now := time.Now().UnixMilli()
|
||
for _, item := range validItems {
|
||
activityItem := itemInfoMap[item.ItemType]
|
||
itemCost := int64(activityItem.CrystalCost) * int64(item.Quantity)
|
||
itemContribution := int64(activityItem.ContributionPoints) * int64(item.Quantity)
|
||
|
||
contribution := &models.ActivityContribution{
|
||
ActivityID: req.ActivityId,
|
||
UserID: userID,
|
||
StarID: req.StarId,
|
||
ItemID: activityItem.ID,
|
||
ItemType: item.ItemType,
|
||
Quantity: int(item.Quantity),
|
||
CrystalSpent: itemCost,
|
||
ContributionPoints: itemContribution,
|
||
CreatedAt: now,
|
||
}
|
||
|
||
err = s.activityRepo.CreateContribution(contribution)
|
||
if err != nil {
|
||
logger.Logger.Error("CreateContribution failed", zap.Error(err))
|
||
}
|
||
|
||
// 更新 Redis 连击计数器(3秒TTL)
|
||
s.incrementComboCount(ctx, userID, item.ItemType)
|
||
comboCount := s.getComboCount(ctx, userID, item.ItemType)
|
||
|
||
// Redis Publish contributions_response(实时推送给订阅者)
|
||
if s.redisClient != nil {
|
||
nickname, avatarURL := "", ""
|
||
if profile, _ := s.userRPCClient.GetFanProfile(userID, req.StarId); profile != nil {
|
||
nickname = profile.Nickname
|
||
avatarURL = profile.AvatarUrl
|
||
}
|
||
contribMsg := &pb.ContributionRecord{
|
||
Id: contribution.ID,
|
||
UserId: userID,
|
||
Nickname: nickname,
|
||
AvatarUrl: avatarURL,
|
||
StarId: req.StarId,
|
||
ItemId: activityItem.ID,
|
||
ItemType: item.ItemType,
|
||
ItemName: activityItem.ItemName,
|
||
ItemIcon: activityItem.IconURL,
|
||
Quantity: int32(item.Quantity),
|
||
ComboCount: int32(comboCount),
|
||
CreatedAt: contribution.CreatedAt,
|
||
}
|
||
payload, _ := json.Marshal(map[string]interface{}{
|
||
"activity_id": req.ActivityId,
|
||
"type": "contributions_response",
|
||
"record": contribMsg,
|
||
})
|
||
s.redisClient.Publish(ctx, fmt.Sprintf("act:%d:contributions", req.ActivityId), payload)
|
||
}
|
||
}
|
||
|
||
// 更新用户统计
|
||
stats, _ := s.activityRepo.GetUserStats(req.ActivityId, userID, req.StarId)
|
||
if stats == nil {
|
||
stats = &models.ActivityUserStats{
|
||
ActivityID: req.ActivityId,
|
||
UserID: userID,
|
||
StarID: req.StarId,
|
||
TotalContribution: 0,
|
||
TotalCrystalSpent: 0,
|
||
TotalItems: 0,
|
||
LastContributeAt: now,
|
||
CreatedAt: now,
|
||
UpdatedAt: now,
|
||
}
|
||
}
|
||
|
||
stats.TotalContribution += totalContribution
|
||
stats.TotalCrystalSpent += totalCost
|
||
stats.TotalItems += len(validItems)
|
||
stats.LastContributeAt = now
|
||
stats.UpdatedAt = now
|
||
|
||
err = s.activityRepo.UpdateUserStats(stats)
|
||
if err != nil {
|
||
logger.Logger.Error("UpdateUserStats failed", zap.Error(err))
|
||
}
|
||
|
||
logger.Logger.Info("BatchPurchaseItem success",
|
||
zap.Int64("user_id", userID),
|
||
zap.Int64("total_cost", totalCost),
|
||
zap.Int64("total_contribution", totalContribution),
|
||
zap.Int64("new_progress", newProgress),
|
||
zap.Int("success_count", len(validItems)),
|
||
zap.Int("fail_count", len(fails)),
|
||
)
|
||
|
||
return &pb.BatchPurchaseItemResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: "ok",
|
||
},
|
||
TotalCrystalSpent: totalCost,
|
||
TotalContribution: totalContribution,
|
||
CurrentProgress: newProgress,
|
||
RemainingBalance: newBalance,
|
||
SuccessCount: int32(len(validItems)),
|
||
FailCount: int32(len(fails)),
|
||
Fails: fails,
|
||
}, nil
|
||
}
|
||
|
||
// GetContributionRanking 获取贡献点排名
|
||
func (s *activityService) GetContributionRanking(ctx context.Context, req *pb.ContributionRankingRequest) (*pb.ContributionRankingResponse, error) {
|
||
logger.Logger.Info("GetContributionRanking request",
|
||
zap.Int64("activity_id", req.ActivityId),
|
||
zap.Int64("star_id", req.StarId),
|
||
zap.Int32("page", req.Page),
|
||
zap.Int32("page_size", req.PageSize),
|
||
)
|
||
|
||
if req.ActivityId <= 0 {
|
||
return &pb.ContributionRankingResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.InvalidArgument),
|
||
Message: "activity_id is required",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
if req.Page <= 0 {
|
||
req.Page = 1
|
||
}
|
||
if req.PageSize <= 0 {
|
||
req.PageSize = 10
|
||
}
|
||
|
||
// 获取排名列表
|
||
stats, total, err := s.activityRepo.GetRanking(req.ActivityId, req.StarId, int(req.Page), int(req.PageSize))
|
||
if err != nil {
|
||
logger.Logger.Error("GetRanking failed", zap.Error(err))
|
||
return &pb.ContributionRankingResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "获取排名失败: " + err.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 转换结果
|
||
items := make([]*pb.ContributionRankingItem, len(stats))
|
||
for i, stat := range stats {
|
||
// 从 fan_profiles 获取用户昵称和头像
|
||
var nickname, avatarUrl string
|
||
if req.StarId > 0 {
|
||
fanProfile, err := s.userRPCClient.GetFanProfile(stat.UserID, req.StarId)
|
||
if err == nil && fanProfile != nil {
|
||
nickname = fanProfile.Nickname
|
||
avatarUrl = fanProfile.AvatarUrl
|
||
}
|
||
}
|
||
|
||
items[i] = &pb.ContributionRankingItem{
|
||
Rank: int32(i + 1 + (int(req.Page)-1)*int(req.PageSize)),
|
||
UserId: stat.UserID,
|
||
Nickname: nickname,
|
||
AvatarUrl: avatarUrl,
|
||
TotalContribution: stat.TotalContribution,
|
||
TotalCrystalSpent: stat.TotalCrystalSpent,
|
||
}
|
||
}
|
||
|
||
// 查询当前用户的贡献排名
|
||
var myContribution *pb.MyContribution
|
||
if req.UserId > 0 && req.StarId > 0 {
|
||
// 获取当前用户的头像和昵称
|
||
var nickname, avatarUrl string
|
||
fanProfile, err := s.userRPCClient.GetFanProfile(req.UserId, req.StarId)
|
||
if err == nil && fanProfile != nil {
|
||
nickname = fanProfile.Nickname
|
||
avatarUrl = fanProfile.AvatarUrl
|
||
}
|
||
|
||
myStat, err := s.activityRepo.GetUserStats(req.ActivityId, req.UserId, req.StarId)
|
||
if err == nil && myStat != nil {
|
||
// 计算当前用户的排名
|
||
rank, err := s.activityRepo.GetUserRank(req.UserId, req.ActivityId, req.StarId)
|
||
if err == nil {
|
||
myContribution = &pb.MyContribution{
|
||
Rank: int32(rank),
|
||
TotalContribution: myStat.TotalContribution,
|
||
TotalCrystalSpent: myStat.TotalCrystalSpent,
|
||
Status: "ranked",
|
||
Nickname: nickname,
|
||
AvatarUrl: avatarUrl,
|
||
}
|
||
}
|
||
} else {
|
||
// 如果没有查到用户的贡献值,返回贡献值为0的对象
|
||
myContribution = &pb.MyContribution{
|
||
Rank: 0,
|
||
TotalContribution: 0,
|
||
TotalCrystalSpent: 0,
|
||
Status: "unranked",
|
||
Nickname: nickname,
|
||
AvatarUrl: avatarUrl,
|
||
}
|
||
}
|
||
}
|
||
|
||
return &pb.ContributionRankingResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: "ok",
|
||
},
|
||
Items: items,
|
||
MyContribution: myContribution,
|
||
Page: req.Page,
|
||
PageSize: req.PageSize,
|
||
Total: int32(total),
|
||
}, nil
|
||
}
|
||
|
||
// convertActivity 转换Activity模型到proto
|
||
func (s *activityService) convertActivity(activity *models.Activity) *pb.Activity {
|
||
items := make([]*pb.ActivityItem, len(activity.Items))
|
||
for i, item := range activity.Items {
|
||
items[i] = &pb.ActivityItem{
|
||
Id: item.ID,
|
||
ItemType: item.ItemType,
|
||
ItemName: item.ItemName,
|
||
IconUrl: item.IconURL,
|
||
CrystalCost: int32(item.CrystalCost),
|
||
ContributionPoints: int32(item.ContributionPoints),
|
||
}
|
||
}
|
||
|
||
// 解析 stage_configs 获取图片信息
|
||
coverImage, bannerImage, stageBg, stageTitle := activity.GetStageImages()
|
||
|
||
return &pb.Activity{
|
||
Id: activity.ID,
|
||
ActivityType: activity.ActivityType,
|
||
Title: activity.Title,
|
||
Theme: activity.Theme,
|
||
Description: activity.Description,
|
||
StarId: activity.StarID,
|
||
StartTime: activity.StartTime,
|
||
EndTime: activity.EndTime,
|
||
OverallEndTime: activity.OverallEndTime,
|
||
TargetProgress: activity.TargetProgress,
|
||
CurrentProgress: activity.CurrentProgress,
|
||
Status: activity.GetCurrentStatus(),
|
||
CurrentStage: activity.GetStage(),
|
||
Items: items,
|
||
CoverImage: coverImage,
|
||
BannerImage: bannerImage,
|
||
CurrentStageBackground: stageBg,
|
||
CurrentStageTitle: stageTitle,
|
||
Icon: activity.Icon,
|
||
}
|
||
}
|
||
|
||
// GetMintingActivities 获取铸造活动列表(用于运营banner)
|
||
func (s *activityService) GetMintingActivities(ctx context.Context, req *pb.GetMintingActivitiesRequest) (*pb.GetMintingActivitiesResponse, error) {
|
||
logger.Logger.Info("GetMintingActivities request",
|
||
zap.Int64("star_id", req.StarId),
|
||
zap.Int32("page", req.Page),
|
||
zap.Int32("page_size", req.PageSize),
|
||
)
|
||
|
||
if req.Page <= 0 {
|
||
req.Page = 1
|
||
}
|
||
if req.PageSize <= 0 {
|
||
req.PageSize = 10
|
||
}
|
||
|
||
activities, total, err := s.mintingActivityRepo.GetActiveMintingActivities(req.StarId, int(req.Page), int(req.PageSize))
|
||
if err != nil {
|
||
logger.Logger.Error("GetMintingActivities failed", zap.Error(err))
|
||
return &pb.GetMintingActivitiesResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "获取铸造活动列表失败: " + err.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 转换结果
|
||
pbActivities := make([]*pb.MintingActivity, len(activities))
|
||
for i, activity := range activities {
|
||
pbActivities[i] = &pb.MintingActivity{
|
||
Id: activity.ID,
|
||
Title: activity.Title,
|
||
Description: activity.Description,
|
||
CoverImage: activity.CoverImage,
|
||
StarId: activity.StarID,
|
||
Route: activity.Route,
|
||
Params: activity.Params,
|
||
IsActive: activity.IsActive,
|
||
CreatedAt: activity.CreatedAt,
|
||
UpdatedAt: activity.UpdatedAt,
|
||
}
|
||
}
|
||
|
||
return &pb.GetMintingActivitiesResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: "ok",
|
||
},
|
||
Activities: pbActivities,
|
||
Page: req.Page,
|
||
PageSize: req.PageSize,
|
||
Total: int32(total),
|
||
}, nil
|
||
}
|
||
|
||
// GetLatestContributions 获取最新贡献记录(用于实时显示)
|
||
func (s *activityService) GetLatestContributions(ctx context.Context, req *pb.GetLatestContributionsRequest) (*pb.GetLatestContributionsResponse, error) {
|
||
logger.Logger.Info("GetLatestContributions request",
|
||
zap.Int64("activity_id", req.ActivityId),
|
||
zap.Int64("since_timestamp", req.SinceTimestamp),
|
||
zap.Int64("since_id", req.SinceId),
|
||
zap.Int32("limit", req.Limit),
|
||
)
|
||
|
||
if req.ActivityId <= 0 {
|
||
return &pb.GetLatestContributionsResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.InvalidArgument),
|
||
Message: "activity_id is required",
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
limit := int(req.Limit)
|
||
if limit <= 0 {
|
||
limit = 5
|
||
}
|
||
if limit > 20 {
|
||
limit = 20
|
||
}
|
||
|
||
// 获取贡献记录
|
||
contributions, err := s.activityRepo.GetLatestContributions(req.ActivityId, req.SinceTimestamp, req.SinceId, limit)
|
||
if err != nil {
|
||
logger.Logger.Error("GetLatestContributions failed", zap.Error(err))
|
||
return &pb.GetLatestContributionsResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.Internal),
|
||
Message: "获取贡献记录失败: " + err.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 转换结果
|
||
records := make([]*pb.ContributionRecord, len(contributions))
|
||
for i, c := range contributions {
|
||
// 获取用户昵称和头像
|
||
nickname, avatarUrl := "", ""
|
||
fanProfile, err := s.userRPCClient.GetFanProfile(c.UserID, c.StarID)
|
||
if err == nil && fanProfile != nil {
|
||
nickname = fanProfile.Nickname
|
||
avatarUrl = fanProfile.AvatarUrl
|
||
}
|
||
|
||
// 获取道具信息
|
||
itemName, itemIcon := "", ""
|
||
items, err := s.activityRepo.GetActivityItems(req.ActivityId)
|
||
if err == nil {
|
||
for _, item := range items {
|
||
if item.ID == c.ItemID {
|
||
itemName = item.ItemName
|
||
itemIcon = item.IconURL
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
// 获取连击计数
|
||
comboCount := int32(s.getComboCount(ctx, c.UserID, c.ItemType))
|
||
|
||
records[i] = &pb.ContributionRecord{
|
||
Id: c.ID,
|
||
UserId: c.UserID,
|
||
Nickname: nickname,
|
||
AvatarUrl: avatarUrl,
|
||
StarId: c.StarID,
|
||
ItemId: c.ItemID,
|
||
ItemType: c.ItemType,
|
||
ItemName: itemName,
|
||
ItemIcon: itemIcon,
|
||
Quantity: int32(c.Quantity),
|
||
ComboCount: comboCount,
|
||
CreatedAt: c.CreatedAt,
|
||
}
|
||
}
|
||
|
||
return &pb.GetLatestContributionsResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: "ok",
|
||
},
|
||
Records: records,
|
||
}, nil
|
||
}
|
||
|
||
// CreateActivityMessage 创建一条活动留言(含频控/累计上限/敏感词校验 + Redis Publish)
|
||
func (s *activityService) CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error) {
|
||
logger.Logger.Info("CreateActivityMessage request",
|
||
zap.Int64("activity_id", req.ActivityId),
|
||
zap.Int64("user_id", req.UserId),
|
||
zap.Int64("star_id", req.StarId),
|
||
)
|
||
|
||
// 1. 入参校验
|
||
// 业务校验错误统一返回 (resp, nil): 把 gRPC code 放进 BaseResponse,
|
||
// 让 controller 通过 ErrorWithCode 走 HTTP 200 + body.code 路径;
|
||
// 仅基础设施错误(DB/Redis/RPC)用 (nil, err) 让 controller 走 HTTP 500。
|
||
content := strings.TrimSpace(req.Content)
|
||
if content == "" {
|
||
return &pb.CreateActivityMessageResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageContentEmpty)),
|
||
Message: appErrors.ErrActivityMessageContentEmpty.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
if utf8.RuneCountInString(content) > 500 {
|
||
return &pb.CreateActivityMessageResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageContentTooLong)),
|
||
Message: appErrors.ErrActivityMessageContentTooLong.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 2. 活动存在性 + 状态
|
||
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
|
||
if err != nil {
|
||
logger.Logger.Error("GetActivityByID failed", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
if activity == nil {
|
||
return &pb.CreateActivityMessageResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityNotFound)),
|
||
Message: appErrors.ErrActivityNotFound.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
// 使用 GetCurrentStatus() 计算动态状态(基于 StartTime/EndTime/CurrentProgress),
|
||
// 而不是 activity.Status(DB 列,创建后默认值 "pending" 不会被自动更新为 "active")
|
||
if activity.GetCurrentStatus() != "active" {
|
||
return &pb.CreateActivityMessageResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageActivityInactive)),
|
||
Message: appErrors.ErrActivityMessageActivityInactive.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 3. 频控(Redis Lua 原子化:INCR + 首次设置 EXPIRE 60s)
|
||
if s.redisClient != nil {
|
||
rateKey := fmt.Sprintf("msg:rate:%d:%d", req.ActivityId, req.UserId)
|
||
var count int64
|
||
if rateLimitScript != nil {
|
||
raw, err := rateLimitScript.Run(ctx, s.redisClient, []string{rateKey}, 60).Result()
|
||
if err == nil {
|
||
if v, ok := raw.(int64); ok {
|
||
count = v
|
||
}
|
||
}
|
||
} else {
|
||
// 降级:脚本未初始化时退回 INCR + EXPIRE(非原子,但能跑)
|
||
count, err = s.redisClient.Incr(ctx, rateKey).Result()
|
||
if err == nil && count == 1 {
|
||
s.redisClient.Expire(ctx, rateKey, 60*time.Second)
|
||
}
|
||
}
|
||
if count > s.messageCfg.MessageRateLimitPerMin {
|
||
return &pb.CreateActivityMessageResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageTooFrequent)),
|
||
Message: appErrors.ErrActivityMessageTooFrequent.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
}
|
||
|
||
// 4. 累计上限
|
||
total, err := s.messagesRepo.CountByUserActivity(req.ActivityId, req.UserId)
|
||
if err != nil {
|
||
logger.Logger.Error("CountByUserActivity failed", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
if total >= s.messageCfg.MessageLimitPerActivity {
|
||
return &pb.CreateActivityMessageResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageLimitReached)),
|
||
Message: appErrors.ErrActivityMessageLimitReached.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 5. 敏感词校验(首版本地词表)
|
||
if containsBannedWord(content, s.messageCfg.BannedWords) {
|
||
return &pb.CreateActivityMessageResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageContentInvalid)),
|
||
Message: appErrors.ErrActivityMessageContentInvalid.Error(),
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// 6. 写入
|
||
now := time.Now().UnixMilli()
|
||
msg := &models.ActivityMessage{
|
||
ActivityID: req.ActivityId,
|
||
UserID: req.UserId,
|
||
StarID: req.StarId,
|
||
Nickname: "",
|
||
AvatarURL: "",
|
||
Content: content,
|
||
Status: 0,
|
||
CreatedAt: now,
|
||
UpdatedAt: now,
|
||
}
|
||
msgID, err := s.messagesRepo.Insert(msg)
|
||
if err != nil {
|
||
logger.Logger.Error("Insert message failed", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
|
||
// 7. 回查昵称头像
|
||
nickname, avatarURL := "", ""
|
||
if profile, _ := s.userRPCClient.GetFanProfile(req.UserId, req.StarId); profile != nil {
|
||
nickname = profile.Nickname
|
||
avatarURL = profile.AvatarUrl
|
||
}
|
||
|
||
// 写回 nickname/avatar(如果首次 RPC 拿到)
|
||
if (nickname != "" || avatarURL != "") && msgID > 0 {
|
||
if err := s.messagesRepo.UpdateProfile(msgID, nickname, avatarURL); err != nil {
|
||
logger.Logger.Warn("UpdateProfile failed", zap.Error(err), zap.Int64("msg_id", msgID))
|
||
}
|
||
}
|
||
|
||
pbMsg := &pb.ActivityMessage{
|
||
Id: msgID,
|
||
ActivityId: req.ActivityId,
|
||
UserId: req.UserId,
|
||
StarId: req.StarId,
|
||
Nickname: nickname,
|
||
AvatarUrl: avatarURL,
|
||
Content: content,
|
||
CreatedAt: now,
|
||
}
|
||
|
||
// 8. Redis Publish(不论是否有人订阅都发)
|
||
if s.redisClient != nil {
|
||
channel := fmt.Sprintf("act:%d:messages", req.ActivityId)
|
||
payload, _ := json.Marshal(map[string]interface{}{
|
||
"activity_id": req.ActivityId,
|
||
"type": "messages_response",
|
||
"message": pbMsg,
|
||
})
|
||
s.redisClient.Publish(ctx, channel, payload)
|
||
}
|
||
|
||
logger.Logger.Info("CreateActivityMessage success",
|
||
zap.Int64("msg_id", msgID),
|
||
zap.Int64("user_id", req.UserId),
|
||
)
|
||
|
||
return &pb.CreateActivityMessageResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: "ok",
|
||
},
|
||
Message: pbMsg,
|
||
}, nil
|
||
}
|
||
|
||
// ListActivityMessages 列出活动留言
|
||
func (s *activityService) ListActivityMessages(ctx context.Context, req *pb.ListActivityMessagesRequest) (*pb.ListActivityMessagesResponse, error) {
|
||
logger.Logger.Info("ListActivityMessages request",
|
||
zap.Int64("activity_id", req.ActivityId),
|
||
zap.Int32("page", req.Page),
|
||
zap.Int32("page_size", req.PageSize),
|
||
)
|
||
|
||
page := int(req.Page)
|
||
pageSize := int(req.PageSize)
|
||
if page <= 0 {
|
||
page = 1
|
||
}
|
||
if pageSize <= 0 {
|
||
pageSize = 20
|
||
}
|
||
if pageSize > 50 {
|
||
pageSize = 50
|
||
}
|
||
|
||
// 校验活动存在
|
||
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
|
||
if err != nil {
|
||
logger.Logger.Error("GetActivityByID failed", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
if activity == nil {
|
||
return &pb.ListActivityMessagesResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityNotFound)),
|
||
Message: appErrors.ErrActivityNotFound.Error(),
|
||
},
|
||
}, appErrors.ErrActivityNotFound
|
||
}
|
||
|
||
rows, total, err := s.messagesRepo.ListByActivity(req.ActivityId, page, pageSize)
|
||
if err != nil {
|
||
logger.Logger.Error("ListByActivity failed", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
|
||
messages := make([]*pb.ActivityMessage, 0, len(rows))
|
||
for _, m := range rows {
|
||
nickname := m.Nickname
|
||
avatarURL := m.AvatarURL
|
||
// 若 DB 中 nickname 为空(早期数据),回查 user profile
|
||
if nickname == "" && avatarURL == "" {
|
||
nickname, avatarURL = s.fetchUserProfile(m.UserID, m.StarID)
|
||
}
|
||
messages = append(messages, &pb.ActivityMessage{
|
||
Id: m.ID,
|
||
ActivityId: m.ActivityID,
|
||
UserId: m.UserID,
|
||
StarId: m.StarID,
|
||
Nickname: nickname,
|
||
AvatarUrl: avatarURL,
|
||
Content: m.Content,
|
||
CreatedAt: m.CreatedAt,
|
||
})
|
||
}
|
||
|
||
logger.Logger.Debug("ListActivityMessages success",
|
||
zap.Int64("activity_id", req.ActivityId),
|
||
zap.Int("count", len(messages)),
|
||
zap.Int64("total", total),
|
||
)
|
||
|
||
return &pb.ListActivityMessagesResponse{
|
||
Base: &pbCommon.BaseResponse{
|
||
Code: uint32(codes.OK),
|
||
Message: "ok",
|
||
},
|
||
Messages: messages,
|
||
Page: int32(page),
|
||
PageSize: int32(pageSize),
|
||
Total: int32(total),
|
||
}, nil
|
||
}
|
||
|
||
// fetchUserProfile 拉取用户昵称头像(失败时返回空串)
|
||
func (s *activityService) fetchUserProfile(userID, starID int64) (string, string) {
|
||
profile, err := s.userRPCClient.GetFanProfile(userID, starID)
|
||
if err != nil || profile == nil {
|
||
return "", ""
|
||
}
|
||
return profile.Nickname, profile.AvatarUrl
|
||
}
|
||
|
||
// containsBannedWord reports whether content contains any entry of banned as
// a substring, compared case-insensitively.
//
// Blank entries (empty or whitespace-only) are skipped: every string contains
// the empty string, so a single blank entry in the word list would otherwise
// reject every message.
func containsBannedWord(content string, banned []string) bool {
	if len(banned) == 0 {
		return false
	}
	lower := strings.ToLower(content)
	for _, w := range banned {
		if strings.TrimSpace(w) == "" {
			continue
		}
		if strings.Contains(lower, strings.ToLower(w)) {
			return true
		}
	}
	return false
}
|