topfans/backend/services/activityService/service/activity_service.go

1381 lines
40 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"unicode/utf8"
appErrors "github.com/topfans/backend/pkg/errors"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/pkg/models"
pb "github.com/topfans/backend/pkg/proto/activity"
pbCommon "github.com/topfans/backend/pkg/proto/common"
"github.com/topfans/backend/services/activityService/client"
"github.com/topfans/backend/services/activityService/config"
"github.com/topfans/backend/services/activityService/repository"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
)
// rateLimitScript 原子 INCR + 首次 EXPIRE 的 Lua 脚本
// 消除 INCR 与 EXPIRE 之间的非原子竞态
var rateLimitScript = func() *redis.Script {
return redis.NewScript(`
local count = redis.call('INCR', KEYS[1])
if count == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return count
`)
}()
// ActivityService 活动Service接口
type ActivityService interface {
// GetActivityList 获取活动列表
GetActivityList(ctx context.Context, req *pb.GetActivityListRequest) (*pb.GetActivityListResponse, error)
// GetActivity 获取活动详情
GetActivity(ctx context.Context, req *pb.GetProgressRequest) (*pb.Activity, error)
// GetActivityItems 获取活动道具列表
GetActivityItems(ctx context.Context, req *pb.GetProgressRequest) ([]*pb.ActivityItem, error)
// GetProgress 获取活动进度
GetProgress(ctx context.Context, req *pb.GetProgressRequest) (*pb.GetProgressResponse, error)
// PurchaseItem 购买道具
PurchaseItem(ctx context.Context, req *pb.PurchaseItemRequest) (*pb.PurchaseItemResponse, error)
// BatchPurchaseItem 批量购买道具
BatchPurchaseItem(ctx context.Context, req *pb.BatchPurchaseItemRequest) (*pb.BatchPurchaseItemResponse, error)
// GetContributionRanking 获取贡献点排名
GetContributionRanking(ctx context.Context, req *pb.ContributionRankingRequest) (*pb.ContributionRankingResponse, error)
// GetMintingActivities 获取铸造活动列表用于运营banner
GetMintingActivities(ctx context.Context, req *pb.GetMintingActivitiesRequest) (*pb.GetMintingActivitiesResponse, error)
// GetLatestContributions 获取最新贡献记录(用于实时显示)
GetLatestContributions(ctx context.Context, req *pb.GetLatestContributionsRequest) (*pb.GetLatestContributionsResponse, error)
// CreateActivityMessage 创建一条活动留言
CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error)
// ListActivityMessages 列出活动留言
ListActivityMessages(ctx context.Context, req *pb.ListActivityMessagesRequest) (*pb.ListActivityMessagesResponse, error)
}
// activityService 活动Service实现
type activityService struct {
activityRepo repository.ActivityRepository
mintingActivityRepo repository.MintingActivityRepository
messagesRepo repository.ActivityMessagesRepository
userRPCClient client.UserRPCClient
redisClient *redis.Client
messageCfg *config.ActivityMessageConfig
}
// NewActivityService 创建活动Service实例
func NewActivityService(activityRepo repository.ActivityRepository, mintingActivityRepo repository.MintingActivityRepository, userRPCClient client.UserRPCClient, redisClient *redis.Client) ActivityService {
return &activityService{
activityRepo: activityRepo,
mintingActivityRepo: mintingActivityRepo,
messagesRepo: repository.NewActivityMessagesRepository(),
userRPCClient: userRPCClient,
redisClient: redisClient,
messageCfg: config.LoadMessageConfig(),
}
}
// comboKey 生成连击计数器 Redis Key
func (s *activityService) comboKey(userID int64, itemType string) string {
return fmt.Sprintf("combo:%d:%s", userID, itemType)
}
// incrementComboCount 增加连击计数
// 用户每次购买道具时调用3秒内同用户同道具的多次购买累加显示
func (s *activityService) incrementComboCount(ctx context.Context, userID int64, itemType string) error {
if s.redisClient == nil {
return nil
}
key := s.comboKey(userID, itemType)
if err := s.redisClient.Incr(ctx, key).Err(); err != nil {
logger.Logger.Warn("Incr combo count failed", zap.Error(err))
return err
}
if err := s.redisClient.Expire(ctx, key, 3*time.Second).Err(); err != nil {
logger.Logger.Warn("Expire combo count failed", zap.Error(err))
return err
}
return nil
}
// getComboCount 获取连击计数
func (s *activityService) getComboCount(ctx context.Context, userID int64, itemType string) int64 {
if s.redisClient == nil {
return 1
}
key := s.comboKey(userID, itemType)
count, err := s.redisClient.Get(ctx, key).Int64()
if err != nil || count == 0 {
return 1
}
return count
}
// GetActivityList 获取活动列表
func (s *activityService) GetActivityList(ctx context.Context, req *pb.GetActivityListRequest) (*pb.GetActivityListResponse, error) {
logger.Logger.Info("GetActivityList request",
zap.Int64("star_id", req.StarId),
zap.String("status", req.Status),
zap.Int32("page", req.Page),
zap.Int32("page_size", req.PageSize),
)
if req.Page <= 0 {
req.Page = 1
}
if req.PageSize <= 0 {
req.PageSize = 10
}
activities, total, err := s.activityRepo.GetActivitiesByStar(req.StarId, req.Status, int(req.Page), int(req.PageSize))
if err != nil {
logger.Logger.Error("GetActivityList failed", zap.Error(err))
return &pb.GetActivityListResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "获取活动列表失败: " + err.Error(),
},
}, nil
}
// 转换结果
pbActivities := make([]*pb.Activity, len(activities))
for i, activity := range activities {
pbActivities[i] = s.convertActivity(activity)
}
return &pb.GetActivityListResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: "ok",
},
Activities: pbActivities,
Page: req.Page,
PageSize: req.PageSize,
Total: int32(total),
}, nil
}
// GetActivity 获取活动详情
func (s *activityService) GetActivity(ctx context.Context, req *pb.GetProgressRequest) (*pb.Activity, error) {
logger.Logger.Info("GetActivity request", zap.Int64("activity_id", req.ActivityId))
if req.ActivityId <= 0 {
return nil, fmt.Errorf("activity_id is required")
}
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
if err != nil {
logger.Logger.Error("GetActivity failed", zap.Error(err))
return nil, err
}
if activity == nil {
return nil, appErrors.ErrActivityNotFound
}
return s.convertActivity(activity), nil
}
// GetActivityItems 获取活动道具列表
func (s *activityService) GetActivityItems(ctx context.Context, req *pb.GetProgressRequest) ([]*pb.ActivityItem, error) {
logger.Logger.Info("GetActivityItems request", zap.Int64("activity_id", req.ActivityId))
if req.ActivityId <= 0 {
return nil, fmt.Errorf("activity_id is required")
}
items, err := s.activityRepo.GetActivityItems(req.ActivityId)
if err != nil {
logger.Logger.Error("GetActivityItems failed", zap.Error(err))
return nil, err
}
// 转换结果
pbItems := make([]*pb.ActivityItem, len(items))
for i, item := range items {
pbItems[i] = &pb.ActivityItem{
Id: item.ID,
ItemType: item.ItemType,
ItemName: item.ItemName,
IconUrl: item.IconURL,
CrystalCost: int32(item.CrystalCost),
ContributionPoints: int32(item.ContributionPoints),
}
}
return pbItems, nil
}
// GetProgress 获取活动进度
func (s *activityService) GetProgress(ctx context.Context, req *pb.GetProgressRequest) (*pb.GetProgressResponse, error) {
logger.Logger.Info("GetProgress request", zap.Int64("activity_id", req.ActivityId))
if req.ActivityId <= 0 {
return nil, fmt.Errorf("activity_id is required")
}
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
if err != nil {
logger.Logger.Error("GetProgress failed", zap.Error(err))
return nil, err
}
if activity == nil {
return nil, appErrors.ErrActivityNotFound
}
currentStage := activity.GetStage()
currentStatus := activity.GetCurrentStatus()
return &pb.GetProgressResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: "ok",
},
ActivityId: activity.ID,
CurrentProgress: activity.CurrentProgress,
TargetProgress: activity.TargetProgress,
CurrentStage: currentStage,
EndTime: activity.EndTime,
Status: currentStatus,
}, nil
}
// PurchaseItem 购买道具
func (s *activityService) PurchaseItem(ctx context.Context, req *pb.PurchaseItemRequest) (*pb.PurchaseItemResponse, error) {
logger.Logger.Info("PurchaseItem request",
zap.Int64("activity_id", req.ActivityId),
zap.String("item_type", req.ItemType),
zap.Int32("quantity", req.Quantity),
zap.Int64("star_id", req.StarId),
)
// 参数校验
if req.ActivityId <= 0 {
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.InvalidArgument),
Message: "activity_id is required",
},
}, nil
}
if req.ItemType == "" {
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.InvalidArgument),
Message: "item_type is required",
},
}, nil
}
if req.Quantity <= 0 {
req.Quantity = 1
}
// 获取用户ID从context中获取这里简化处理
userID := req.UserId
// 获取活动
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
if err != nil {
logger.Logger.Error("GetActivity failed", zap.Error(err))
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "获取活动失败: " + err.Error(),
},
}, nil
}
if activity == nil {
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.NotFound),
Message: "活动不存在",
},
}, nil
}
// 检查活动状态
currentStatus := activity.GetCurrentStatus()
if currentStatus != "active" {
var message string
switch currentStatus {
case "expired":
message = "activity:expired"
case "pending":
message = "activity:pending"
case "completed":
message = "activity:completed"
default:
message = "activity:expired"
}
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: message,
},
}, nil
}
// 获取道具
item, err := s.activityRepo.GetActivityItemByType(req.ActivityId, req.ItemType)
if err != nil {
logger.Logger.Error("GetActivityItemByType failed", zap.Error(err))
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "获取道具失败: " + err.Error(),
},
}, nil
}
if item == nil {
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.NotFound),
Message: "道具不存在",
},
}, nil
}
// 计算总消费
totalCost := int64(item.CrystalCost) * int64(req.Quantity)
totalContribution := int64(item.ContributionPoints) * int64(req.Quantity)
// 通过RPC获取用户当前水晶余额
profile, err := s.userRPCClient.GetFanProfile(userID, req.StarId)
if err != nil {
logger.Logger.Error("GetFanProfile failed", zap.Error(err))
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "获取粉丝档案失败: " + err.Error(),
},
}, nil
}
if profile == nil {
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.NotFound),
Message: "粉丝档案不存在",
},
}, nil
}
// 检查水晶余额是否足够
if profile.CrystalBalance < totalCost {
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.InvalidArgument),
Message: "水晶余额不足",
},
}, nil
}
// 通过RPC扣减水晶
newBalance, err := s.userRPCClient.UpdateCrystalBalance(userID, req.StarId, -totalCost)
if err != nil {
logger.Logger.Error("UpdateCrystalBalance failed", zap.Error(err))
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "扣减水晶失败: " + err.Error(),
},
}, nil
}
// 更新活动进度
newProgress := activity.CurrentProgress + totalContribution
if newProgress > activity.TargetProgress {
newProgress = activity.TargetProgress
}
err = s.activityRepo.UpdateActivityProgress(req.ActivityId, newProgress)
if err != nil {
logger.Logger.Error("UpdateActivityProgress failed", zap.Error(err))
}
// 创建贡献记录
now := time.Now().UnixMilli()
contribution := &models.ActivityContribution{
ActivityID: req.ActivityId,
UserID: userID,
StarID: req.StarId,
ItemID: item.ID,
ItemType: item.ItemType,
Quantity: int(req.Quantity),
CrystalSpent: totalCost,
ContributionPoints: totalContribution,
CreatedAt: now,
}
err = s.activityRepo.CreateContribution(contribution)
if err != nil {
logger.Logger.Error("CreateContribution failed", zap.Error(err))
}
// 更新 Redis 连击计数器3秒TTL
s.incrementComboCount(ctx, userID, req.ItemType)
comboCount := s.getComboCount(ctx, userID, req.ItemType)
// Redis Publish contributions_response实时推送给订阅者
if s.redisClient != nil {
nickname, avatarURL := "", ""
if profile, _ := s.userRPCClient.GetFanProfile(userID, req.StarId); profile != nil {
nickname = profile.Nickname
avatarURL = profile.AvatarUrl
}
itemName, itemIcon := "", ""
if item != nil {
itemName = item.ItemName
itemIcon = item.IconURL
}
contribMsg := &pb.ContributionRecord{
Id: contribution.ID,
UserId: userID,
Nickname: nickname,
AvatarUrl: avatarURL,
StarId: req.StarId,
ItemId: item.ID,
ItemType: req.ItemType,
ItemName: itemName,
ItemIcon: itemIcon,
Quantity: int32(req.Quantity),
ComboCount: int32(comboCount),
CreatedAt: contribution.CreatedAt,
}
payload, _ := json.Marshal(map[string]interface{}{
"activity_id": req.ActivityId,
"type": "contributions_response",
"record": contribMsg,
})
s.redisClient.Publish(ctx, fmt.Sprintf("act:%d:contributions", req.ActivityId), payload)
}
// 更新用户统计
stats, _ := s.activityRepo.GetUserStats(req.ActivityId, userID, req.StarId)
if stats == nil {
stats = &models.ActivityUserStats{
ActivityID: req.ActivityId,
UserID: userID,
StarID: req.StarId,
TotalContribution: 0,
TotalCrystalSpent: 0,
TotalItems: 0,
LastContributeAt: now,
CreatedAt: now,
UpdatedAt: now,
}
}
stats.TotalContribution += totalContribution
stats.TotalCrystalSpent += totalCost
stats.TotalItems += int(req.Quantity)
stats.LastContributeAt = now
stats.UpdatedAt = now
err = s.activityRepo.UpdateUserStats(stats)
if err != nil {
logger.Logger.Error("UpdateUserStats failed", zap.Error(err))
}
logger.Logger.Info("PurchaseItem success",
zap.Int64("user_id", userID),
zap.Int64("total_cost", totalCost),
zap.Int64("total_contribution", totalContribution),
zap.Int64("new_progress", newProgress),
)
return &pb.PurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: "ok",
},
TotalCrystalSpent: totalCost,
TotalContribution: totalContribution,
CurrentProgress: newProgress,
RemainingBalance: newBalance,
}, nil
}
// BatchPurchaseItem 批量购买道具
func (s *activityService) BatchPurchaseItem(ctx context.Context, req *pb.BatchPurchaseItemRequest) (*pb.BatchPurchaseItemResponse, error) {
logger.Logger.Info("BatchPurchaseItem request",
zap.Int64("activity_id", req.ActivityId),
zap.Int("items_count", len(req.Items)),
zap.Int64("star_id", req.StarId),
)
// 参数校验
if req.ActivityId <= 0 {
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.InvalidArgument),
Message: "activity_id is required",
},
}, nil
}
if len(req.Items) == 0 {
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.InvalidArgument),
Message: "items 是必填参数",
},
}, nil
}
userID := req.UserId
// 获取活动
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
if err != nil {
logger.Logger.Error("GetActivity failed", zap.Error(err))
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "获取活动失败: " + err.Error(),
},
}, nil
}
if activity == nil {
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.NotFound),
Message: "活动不存在",
},
}, nil
}
// 检查活动状态
currentStatus := activity.GetCurrentStatus()
if currentStatus != "active" {
var message string
switch currentStatus {
case "expired":
message = "activity:expired"
case "pending":
message = "activity:pending"
case "completed":
message = "activity:completed"
default:
message = "activity:expired"
}
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: message,
},
}, nil
}
// 通过RPC获取用户当前水晶余额只调用一次
profile, err := s.userRPCClient.GetFanProfile(userID, req.StarId)
if err != nil {
logger.Logger.Error("GetFanProfile failed", zap.Error(err))
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "获取粉丝档案失败: " + err.Error(),
},
}, nil
}
if profile == nil {
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.NotFound),
Message: "粉丝档案不存在",
},
}, nil
}
// 计算总消费水晶和贡献点并验证所有items预校验
var totalCost int64
var totalContribution int64
itemInfoMap := make(map[string]*models.ActivityItem)
validItems := make([]*pb.PurchaseItem, 0, len(req.Items))
var fails []*pb.PurchaseFailItem
for _, item := range req.Items {
if item.Quantity <= 0 {
fails = append(fails, &pb.PurchaseFailItem{
ItemType: item.ItemType,
Reason: "quantity must be > 0",
})
continue
}
activityItem, err := s.activityRepo.GetActivityItemByType(req.ActivityId, item.ItemType)
if err != nil || activityItem == nil {
logger.Logger.Warn("GetActivityItemByType failed or item not found",
zap.String("item_type", item.ItemType),
zap.Error(err),
)
fails = append(fails, &pb.PurchaseFailItem{
ItemType: item.ItemType,
Reason: "item not found",
})
continue
}
itemInfoMap[item.ItemType] = activityItem
totalCost += int64(activityItem.CrystalCost) * int64(item.Quantity)
totalContribution += int64(activityItem.ContributionPoints) * int64(item.Quantity)
validItems = append(validItems, item)
}
// 如果所有items都失败直接返回
if len(validItems) == 0 {
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.InvalidArgument),
Message: "no valid items",
},
SuccessCount: 0,
FailCount: int32(len(fails)),
Fails: fails,
}, nil
}
// 检查水晶余额是否足够
if profile.CrystalBalance < totalCost {
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.InvalidArgument),
Message: "水晶余额不足",
},
SuccessCount: 0,
FailCount: int32(len(req.Items)),
Fails: fails,
}, nil
}
// 通过RPC扣减水晶
newBalance, err := s.userRPCClient.UpdateCrystalBalance(userID, req.StarId, -totalCost)
if err != nil {
logger.Logger.Error("UpdateCrystalBalance failed", zap.Error(err))
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "扣减水晶失败: " + err.Error(),
},
SuccessCount: 0,
FailCount: int32(len(validItems)),
Fails: fails,
}, nil
}
// 更新活动进度
newProgress := activity.CurrentProgress + totalContribution
if newProgress > activity.TargetProgress {
newProgress = activity.TargetProgress
}
err = s.activityRepo.UpdateActivityProgress(req.ActivityId, newProgress)
if err != nil {
logger.Logger.Error("UpdateActivityProgress failed", zap.Error(err))
}
// 为每个成功的item创建贡献记录并更新连击计数
now := time.Now().UnixMilli()
for _, item := range validItems {
activityItem := itemInfoMap[item.ItemType]
itemCost := int64(activityItem.CrystalCost) * int64(item.Quantity)
itemContribution := int64(activityItem.ContributionPoints) * int64(item.Quantity)
contribution := &models.ActivityContribution{
ActivityID: req.ActivityId,
UserID: userID,
StarID: req.StarId,
ItemID: activityItem.ID,
ItemType: item.ItemType,
Quantity: int(item.Quantity),
CrystalSpent: itemCost,
ContributionPoints: itemContribution,
CreatedAt: now,
}
err = s.activityRepo.CreateContribution(contribution)
if err != nil {
logger.Logger.Error("CreateContribution failed", zap.Error(err))
}
// 更新 Redis 连击计数器3秒TTL
s.incrementComboCount(ctx, userID, item.ItemType)
comboCount := s.getComboCount(ctx, userID, item.ItemType)
// Redis Publish contributions_response实时推送给订阅者
if s.redisClient != nil {
nickname, avatarURL := "", ""
if profile, _ := s.userRPCClient.GetFanProfile(userID, req.StarId); profile != nil {
nickname = profile.Nickname
avatarURL = profile.AvatarUrl
}
contribMsg := &pb.ContributionRecord{
Id: contribution.ID,
UserId: userID,
Nickname: nickname,
AvatarUrl: avatarURL,
StarId: req.StarId,
ItemId: activityItem.ID,
ItemType: item.ItemType,
ItemName: activityItem.ItemName,
ItemIcon: activityItem.IconURL,
Quantity: int32(item.Quantity),
ComboCount: int32(comboCount),
CreatedAt: contribution.CreatedAt,
}
payload, _ := json.Marshal(map[string]interface{}{
"activity_id": req.ActivityId,
"type": "contributions_response",
"record": contribMsg,
})
s.redisClient.Publish(ctx, fmt.Sprintf("act:%d:contributions", req.ActivityId), payload)
}
}
// 更新用户统计
stats, _ := s.activityRepo.GetUserStats(req.ActivityId, userID, req.StarId)
if stats == nil {
stats = &models.ActivityUserStats{
ActivityID: req.ActivityId,
UserID: userID,
StarID: req.StarId,
TotalContribution: 0,
TotalCrystalSpent: 0,
TotalItems: 0,
LastContributeAt: now,
CreatedAt: now,
UpdatedAt: now,
}
}
stats.TotalContribution += totalContribution
stats.TotalCrystalSpent += totalCost
stats.TotalItems += len(validItems)
stats.LastContributeAt = now
stats.UpdatedAt = now
err = s.activityRepo.UpdateUserStats(stats)
if err != nil {
logger.Logger.Error("UpdateUserStats failed", zap.Error(err))
}
logger.Logger.Info("BatchPurchaseItem success",
zap.Int64("user_id", userID),
zap.Int64("total_cost", totalCost),
zap.Int64("total_contribution", totalContribution),
zap.Int64("new_progress", newProgress),
zap.Int("success_count", len(validItems)),
zap.Int("fail_count", len(fails)),
)
return &pb.BatchPurchaseItemResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: "ok",
},
TotalCrystalSpent: totalCost,
TotalContribution: totalContribution,
CurrentProgress: newProgress,
RemainingBalance: newBalance,
SuccessCount: int32(len(validItems)),
FailCount: int32(len(fails)),
Fails: fails,
}, nil
}
// GetContributionRanking 获取贡献点排名
func (s *activityService) GetContributionRanking(ctx context.Context, req *pb.ContributionRankingRequest) (*pb.ContributionRankingResponse, error) {
logger.Logger.Info("GetContributionRanking request",
zap.Int64("activity_id", req.ActivityId),
zap.Int64("star_id", req.StarId),
zap.Int32("page", req.Page),
zap.Int32("page_size", req.PageSize),
)
if req.ActivityId <= 0 {
return &pb.ContributionRankingResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.InvalidArgument),
Message: "activity_id is required",
},
}, nil
}
if req.Page <= 0 {
req.Page = 1
}
if req.PageSize <= 0 {
req.PageSize = 10
}
// 获取排名列表
stats, total, err := s.activityRepo.GetRanking(req.ActivityId, req.StarId, int(req.Page), int(req.PageSize))
if err != nil {
logger.Logger.Error("GetRanking failed", zap.Error(err))
return &pb.ContributionRankingResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "获取排名失败: " + err.Error(),
},
}, nil
}
// 转换结果
items := make([]*pb.ContributionRankingItem, len(stats))
for i, stat := range stats {
// 从 fan_profiles 获取用户昵称和头像
var nickname, avatarUrl string
if req.StarId > 0 {
fanProfile, err := s.userRPCClient.GetFanProfile(stat.UserID, req.StarId)
if err == nil && fanProfile != nil {
nickname = fanProfile.Nickname
avatarUrl = fanProfile.AvatarUrl
}
}
items[i] = &pb.ContributionRankingItem{
Rank: int32(i + 1 + (int(req.Page)-1)*int(req.PageSize)),
UserId: stat.UserID,
Nickname: nickname,
AvatarUrl: avatarUrl,
TotalContribution: stat.TotalContribution,
TotalCrystalSpent: stat.TotalCrystalSpent,
}
}
// 查询当前用户的贡献排名
var myContribution *pb.MyContribution
if req.UserId > 0 && req.StarId > 0 {
// 获取当前用户的头像和昵称
var nickname, avatarUrl string
fanProfile, err := s.userRPCClient.GetFanProfile(req.UserId, req.StarId)
if err == nil && fanProfile != nil {
nickname = fanProfile.Nickname
avatarUrl = fanProfile.AvatarUrl
}
myStat, err := s.activityRepo.GetUserStats(req.ActivityId, req.UserId, req.StarId)
if err == nil && myStat != nil {
// 计算当前用户的排名
rank, err := s.activityRepo.GetUserRank(req.UserId, req.ActivityId, req.StarId)
if err == nil {
myContribution = &pb.MyContribution{
Rank: int32(rank),
TotalContribution: myStat.TotalContribution,
TotalCrystalSpent: myStat.TotalCrystalSpent,
Status: "ranked",
Nickname: nickname,
AvatarUrl: avatarUrl,
}
}
} else {
// 如果没有查到用户的贡献值返回贡献值为0的对象
myContribution = &pb.MyContribution{
Rank: 0,
TotalContribution: 0,
TotalCrystalSpent: 0,
Status: "unranked",
Nickname: nickname,
AvatarUrl: avatarUrl,
}
}
}
return &pb.ContributionRankingResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: "ok",
},
Items: items,
MyContribution: myContribution,
Page: req.Page,
PageSize: req.PageSize,
Total: int32(total),
}, nil
}
// convertActivity 转换Activity模型到proto
func (s *activityService) convertActivity(activity *models.Activity) *pb.Activity {
items := make([]*pb.ActivityItem, len(activity.Items))
for i, item := range activity.Items {
items[i] = &pb.ActivityItem{
Id: item.ID,
ItemType: item.ItemType,
ItemName: item.ItemName,
IconUrl: item.IconURL,
CrystalCost: int32(item.CrystalCost),
ContributionPoints: int32(item.ContributionPoints),
}
}
// 解析 stage_configs 获取图片信息
coverImage, bannerImage, stageBg, stageTitle := activity.GetStageImages()
return &pb.Activity{
Id: activity.ID,
ActivityType: activity.ActivityType,
Title: activity.Title,
Theme: activity.Theme,
Description: activity.Description,
StarId: activity.StarID,
StartTime: activity.StartTime,
EndTime: activity.EndTime,
OverallEndTime: activity.OverallEndTime,
TargetProgress: activity.TargetProgress,
CurrentProgress: activity.CurrentProgress,
Status: activity.GetCurrentStatus(),
CurrentStage: activity.GetStage(),
Items: items,
CoverImage: coverImage,
BannerImage: bannerImage,
CurrentStageBackground: stageBg,
CurrentStageTitle: stageTitle,
Icon: activity.Icon,
}
}
// GetMintingActivities 获取铸造活动列表用于运营banner
func (s *activityService) GetMintingActivities(ctx context.Context, req *pb.GetMintingActivitiesRequest) (*pb.GetMintingActivitiesResponse, error) {
logger.Logger.Info("GetMintingActivities request",
zap.Int64("star_id", req.StarId),
zap.Int32("page", req.Page),
zap.Int32("page_size", req.PageSize),
)
if req.Page <= 0 {
req.Page = 1
}
if req.PageSize <= 0 {
req.PageSize = 10
}
activities, total, err := s.mintingActivityRepo.GetActiveMintingActivities(req.StarId, int(req.Page), int(req.PageSize))
if err != nil {
logger.Logger.Error("GetMintingActivities failed", zap.Error(err))
return &pb.GetMintingActivitiesResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "获取铸造活动列表失败: " + err.Error(),
},
}, nil
}
// 转换结果
pbActivities := make([]*pb.MintingActivity, len(activities))
for i, activity := range activities {
pbActivities[i] = &pb.MintingActivity{
Id: activity.ID,
Title: activity.Title,
Description: activity.Description,
CoverImage: activity.CoverImage,
StarId: activity.StarID,
Route: activity.Route,
Params: activity.Params,
IsActive: activity.IsActive,
CreatedAt: activity.CreatedAt,
UpdatedAt: activity.UpdatedAt,
}
}
return &pb.GetMintingActivitiesResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: "ok",
},
Activities: pbActivities,
Page: req.Page,
PageSize: req.PageSize,
Total: int32(total),
}, nil
}
// GetLatestContributions 获取最新贡献记录(用于实时显示)
func (s *activityService) GetLatestContributions(ctx context.Context, req *pb.GetLatestContributionsRequest) (*pb.GetLatestContributionsResponse, error) {
logger.Logger.Info("GetLatestContributions request",
zap.Int64("activity_id", req.ActivityId),
zap.Int64("since_timestamp", req.SinceTimestamp),
zap.Int64("since_id", req.SinceId),
zap.Int32("limit", req.Limit),
)
if req.ActivityId <= 0 {
return &pb.GetLatestContributionsResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.InvalidArgument),
Message: "activity_id is required",
},
}, nil
}
limit := int(req.Limit)
if limit <= 0 {
limit = 5
}
if limit > 20 {
limit = 20
}
// 获取贡献记录
contributions, err := s.activityRepo.GetLatestContributions(req.ActivityId, req.SinceTimestamp, req.SinceId, limit)
if err != nil {
logger.Logger.Error("GetLatestContributions failed", zap.Error(err))
return &pb.GetLatestContributionsResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.Internal),
Message: "获取贡献记录失败: " + err.Error(),
},
}, nil
}
// 转换结果
records := make([]*pb.ContributionRecord, len(contributions))
for i, c := range contributions {
// 获取用户昵称和头像
nickname, avatarUrl := "", ""
fanProfile, err := s.userRPCClient.GetFanProfile(c.UserID, c.StarID)
if err == nil && fanProfile != nil {
nickname = fanProfile.Nickname
avatarUrl = fanProfile.AvatarUrl
}
// 获取道具信息
itemName, itemIcon := "", ""
items, err := s.activityRepo.GetActivityItems(req.ActivityId)
if err == nil {
for _, item := range items {
if item.ID == c.ItemID {
itemName = item.ItemName
itemIcon = item.IconURL
break
}
}
}
// 获取连击计数
comboCount := int32(s.getComboCount(ctx, c.UserID, c.ItemType))
records[i] = &pb.ContributionRecord{
Id: c.ID,
UserId: c.UserID,
Nickname: nickname,
AvatarUrl: avatarUrl,
StarId: c.StarID,
ItemId: c.ItemID,
ItemType: c.ItemType,
ItemName: itemName,
ItemIcon: itemIcon,
Quantity: int32(c.Quantity),
ComboCount: comboCount,
CreatedAt: c.CreatedAt,
}
}
return &pb.GetLatestContributionsResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: "ok",
},
Records: records,
}, nil
}
// CreateActivityMessage 创建一条活动留言(含频控/累计上限/敏感词校验 + Redis Publish
func (s *activityService) CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error) {
logger.Logger.Info("CreateActivityMessage request",
zap.Int64("activity_id", req.ActivityId),
zap.Int64("user_id", req.UserId),
zap.Int64("star_id", req.StarId),
)
// 1. 入参校验
// 业务校验错误统一返回 (resp, nil): 把 gRPC code 放进 BaseResponse,
// 让 controller 通过 ErrorWithCode 走 HTTP 200 + body.code 路径;
// 仅基础设施错误(DB/Redis/RPC)用 (nil, err) 让 controller 走 HTTP 500。
content := strings.TrimSpace(req.Content)
if content == "" {
return &pb.CreateActivityMessageResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageContentEmpty)),
Message: appErrors.ErrActivityMessageContentEmpty.Error(),
},
}, nil
}
if utf8.RuneCountInString(content) > 500 {
return &pb.CreateActivityMessageResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageContentTooLong)),
Message: appErrors.ErrActivityMessageContentTooLong.Error(),
},
}, nil
}
// 2. 活动存在性 + 状态
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
if err != nil {
logger.Logger.Error("GetActivityByID failed", zap.Error(err))
return nil, err
}
if activity == nil {
return &pb.CreateActivityMessageResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityNotFound)),
Message: appErrors.ErrActivityNotFound.Error(),
},
}, nil
}
// 使用 GetCurrentStatus() 计算动态状态(基于 StartTime/EndTime/CurrentProgress),
// 而不是 activity.Status(DB 列,创建后默认值 "pending" 不会被自动更新为 "active")
if activity.GetCurrentStatus() != "active" {
return &pb.CreateActivityMessageResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageActivityInactive)),
Message: appErrors.ErrActivityMessageActivityInactive.Error(),
},
}, nil
}
// 3. 频控Redis Lua 原子化INCR + 首次设置 EXPIRE 60s
if s.redisClient != nil {
rateKey := fmt.Sprintf("msg:rate:%d:%d", req.ActivityId, req.UserId)
var count int64
if rateLimitScript != nil {
raw, err := rateLimitScript.Run(ctx, s.redisClient, []string{rateKey}, 60).Result()
if err == nil {
if v, ok := raw.(int64); ok {
count = v
}
}
} else {
// 降级:脚本未初始化时退回 INCR + EXPIRE非原子但能跑
count, err = s.redisClient.Incr(ctx, rateKey).Result()
if err == nil && count == 1 {
s.redisClient.Expire(ctx, rateKey, 60*time.Second)
}
}
if count > s.messageCfg.MessageRateLimitPerMin {
return &pb.CreateActivityMessageResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageTooFrequent)),
Message: appErrors.ErrActivityMessageTooFrequent.Error(),
},
}, nil
}
}
// 4. 累计上限
total, err := s.messagesRepo.CountByUserActivity(req.ActivityId, req.UserId)
if err != nil {
logger.Logger.Error("CountByUserActivity failed", zap.Error(err))
return nil, err
}
if total >= s.messageCfg.MessageLimitPerActivity {
return &pb.CreateActivityMessageResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageLimitReached)),
Message: appErrors.ErrActivityMessageLimitReached.Error(),
},
}, nil
}
// 5. 敏感词校验(首版本地词表)
if containsBannedWord(content, s.messageCfg.BannedWords) {
return &pb.CreateActivityMessageResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityMessageContentInvalid)),
Message: appErrors.ErrActivityMessageContentInvalid.Error(),
},
}, nil
}
// 6. 写入
now := time.Now().UnixMilli()
msg := &models.ActivityMessage{
ActivityID: req.ActivityId,
UserID: req.UserId,
StarID: req.StarId,
Nickname: "",
AvatarURL: "",
Content: content,
Status: 0,
CreatedAt: now,
UpdatedAt: now,
}
msgID, err := s.messagesRepo.Insert(msg)
if err != nil {
logger.Logger.Error("Insert message failed", zap.Error(err))
return nil, err
}
// 7. 回查昵称头像
nickname, avatarURL := "", ""
if profile, _ := s.userRPCClient.GetFanProfile(req.UserId, req.StarId); profile != nil {
nickname = profile.Nickname
avatarURL = profile.AvatarUrl
}
// 写回 nickname/avatar如果首次 RPC 拿到)
if (nickname != "" || avatarURL != "") && msgID > 0 {
if err := s.messagesRepo.UpdateProfile(msgID, nickname, avatarURL); err != nil {
logger.Logger.Warn("UpdateProfile failed", zap.Error(err), zap.Int64("msg_id", msgID))
}
}
pbMsg := &pb.ActivityMessage{
Id: msgID,
ActivityId: req.ActivityId,
UserId: req.UserId,
StarId: req.StarId,
Nickname: nickname,
AvatarUrl: avatarURL,
Content: content,
CreatedAt: now,
}
// 8. Redis Publish不论是否有人订阅都发
if s.redisClient != nil {
channel := fmt.Sprintf("act:%d:messages", req.ActivityId)
payload, _ := json.Marshal(map[string]interface{}{
"activity_id": req.ActivityId,
"type": "messages_response",
"message": pbMsg,
})
s.redisClient.Publish(ctx, channel, payload)
}
logger.Logger.Info("CreateActivityMessage success",
zap.Int64("msg_id", msgID),
zap.Int64("user_id", req.UserId),
)
return &pb.CreateActivityMessageResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: "ok",
},
Message: pbMsg,
}, nil
}
// ListActivityMessages 列出活动留言
func (s *activityService) ListActivityMessages(ctx context.Context, req *pb.ListActivityMessagesRequest) (*pb.ListActivityMessagesResponse, error) {
logger.Logger.Info("ListActivityMessages request",
zap.Int64("activity_id", req.ActivityId),
zap.Int32("page", req.Page),
zap.Int32("page_size", req.PageSize),
)
page := int(req.Page)
pageSize := int(req.PageSize)
if page <= 0 {
page = 1
}
if pageSize <= 0 {
pageSize = 20
}
if pageSize > 50 {
pageSize = 50
}
// 校验活动存在
activity, err := s.activityRepo.GetActivityByID(req.ActivityId)
if err != nil {
logger.Logger.Error("GetActivityByID failed", zap.Error(err))
return nil, err
}
if activity == nil {
return &pb.ListActivityMessagesResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(appErrors.ToGRPCCode(appErrors.ErrActivityNotFound)),
Message: appErrors.ErrActivityNotFound.Error(),
},
}, appErrors.ErrActivityNotFound
}
rows, total, err := s.messagesRepo.ListByActivity(req.ActivityId, page, pageSize)
if err != nil {
logger.Logger.Error("ListByActivity failed", zap.Error(err))
return nil, err
}
messages := make([]*pb.ActivityMessage, 0, len(rows))
for _, m := range rows {
nickname := m.Nickname
avatarURL := m.AvatarURL
// 若 DB 中 nickname 为空(早期数据),回查 user profile
if nickname == "" && avatarURL == "" {
nickname, avatarURL = s.fetchUserProfile(m.UserID, m.StarID)
}
messages = append(messages, &pb.ActivityMessage{
Id: m.ID,
ActivityId: m.ActivityID,
UserId: m.UserID,
StarId: m.StarID,
Nickname: nickname,
AvatarUrl: avatarURL,
Content: m.Content,
CreatedAt: m.CreatedAt,
})
}
logger.Logger.Debug("ListActivityMessages success",
zap.Int64("activity_id", req.ActivityId),
zap.Int("count", len(messages)),
zap.Int64("total", total),
)
return &pb.ListActivityMessagesResponse{
Base: &pbCommon.BaseResponse{
Code: uint32(codes.OK),
Message: "ok",
},
Messages: messages,
Page: int32(page),
PageSize: int32(pageSize),
Total: int32(total),
}, nil
}
// fetchUserProfile 拉取用户昵称头像(失败时返回空串)
func (s *activityService) fetchUserProfile(userID, starID int64) (string, string) {
profile, err := s.userRPCClient.GetFanProfile(userID, starID)
if err != nil || profile == nil {
return "", ""
}
return profile.Nickname, profile.AvatarUrl
}
// containsBannedWord 检查是否含敏感词
func containsBannedWord(content string, banned []string) bool {
lower := strings.ToLower(content)
for _, w := range banned {
if strings.Contains(lower, strings.ToLower(w)) {
return true
}
}
return false
}