package service import ( "context" "log" "math" "time" "github.com/topfans/backend/pkg/database" "github.com/topfans/backend/pkg/logger" "github.com/topfans/backend/services/galleryService/client" "github.com/topfans/backend/services/galleryService/repository" "go.uber.org/zap" ) // CleanupWorker 清理过期展品的Worker type CleanupWorker struct { repo repository.GalleryRepository assetClient client.AssetRPCClient userClient client.UserRPCClient taskClient client.TaskRPCClient ctx context.Context cancel context.CancelFunc } // NewCleanupWorker 创建清理Worker实例 func NewCleanupWorker(repo repository.GalleryRepository, assetClient client.AssetRPCClient, userClient client.UserRPCClient, taskClient client.TaskRPCClient) *CleanupWorker { ctx, cancel := context.WithCancel(context.Background()) return &CleanupWorker{ repo: repo, assetClient: assetClient, userClient: userClient, taskClient: taskClient, ctx: ctx, cancel: cancel, } } // Start 启动清理Worker func (w *CleanupWorker) Start() { log.Println("清理Worker已启动,每小时扫描一次过期展品") ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() // 立即执行一次清理 w.cleanup() for { select { case <-ticker.C: w.cleanup() case <-w.ctx.Done(): log.Println("清理Worker已停止") return } } } // Stop 停止清理Worker func (w *CleanupWorker) Stop() { w.cancel() } // cleanup 执行清理逻辑 func (w *CleanupWorker) cleanup() { now := time.Now().UnixMilli() // 1. 清理过期的展品展示记录 w.cleanupExpiredExhibitions(now) // 2. 清理无效的 display_status(处理手动软删除导致的不一致) w.cleanupInvalidDisplayStatus() } // cleanupExpiredExhibitions 清理过期的展品展示记录(使用 ZSET 驱动 + 数据库兜底) func (w *CleanupWorker) cleanupExpiredExhibitions(now int64) { ctx := context.Background() // 1. 先尝试从 ZSET 获取过期展品 expiredAssetIDs, err := database.GetExpiredAssets(ctx, now) if err != nil { log.Printf("从 ZSET 获取过期展品失败: %v,降级到数据库查询", err) w.cleanupExpiredExhibitionsFromDB(now) return } // 2. ZSET 有数据时处理 ZSET 中的过期展品 if len(expiredAssetIDs) > 0 { log.Printf("ZSET 发现 %d 个过期展品,开始清理", len(expiredAssetIDs)) w.cleanupAssetsFromZSET(ctx, expiredAssetIDs, now) } // 3. 兜底:检查数据库中可能遗漏的过期展品(ZSET 可能在 Redis 重启或数据丢失时漏掉) w.cleanupExpiredExhibitionsFromDB(now) } // cleanupAssetsFromZSET 从 ZSET 处理过期展品 func (w *CleanupWorker) cleanupAssetsFromZSET(ctx context.Context, expiredAssetIDs []int64, now int64) { // 批量删除过期记录 successCount := 0 failedCount := 0 for _, assetID := range expiredAssetIDs { // 从数据库查询该 asset_id 对应的有效展览 e, err := w.repo.GetExhibitionByAssetID(assetID) if err != nil || e == nil { // 展览不存在或已处理,从 ZSET 移除 database.RemoveExpiringAsset(ctx, assetID) continue } // 1. 获取点赞数用于计算收益 likeCount := 0 if w.assetClient != nil { likeCount = w.assetClient.GetAssetLikeCount(e.AssetID) } // 2. 计算展示收益 revenue := calculateExhibitionRevenue(likeCount, e.StartTime, now) logger.Logger.Info("计算展出收益", zap.Int64("exhibition_id", e.ID), zap.Int64("asset_id", e.AssetID), zap.Int("like_count", likeCount), zap.Int64("start_time", e.StartTime), zap.Int64("end_time", now), zap.Int64("revenue", revenue)) // 3. 调用 TaskService 记录收益 if w.taskClient != nil { slotOwnerUID := e.HostProfileID if ownerUID, err := w.repo.GetSlotOwnerUserID(e.SlotID); err == nil { slotOwnerUID = ownerUID } _, err := w.taskClient.OnExhibitionCompleted(context.Background(), &client.OnExhibitionCompletedRequest{ ExhibitionId: e.ID, AssetId: e.AssetID, SlotId: e.SlotID, OccupierUid: e.OccupierUID, OccupierStarId: e.OccupierStarID, SlotOwnerUid: slotOwnerUID, StartTime: e.StartTime, ExpireAt: now, CrystalAmount: revenue, LikeCount: int32(likeCount), }) if err != nil { logger.Logger.Error("调用TaskService记录收益失败", zap.Int64("exhibition_id", e.ID), zap.Error(err)) failedCount++ continue } } successCount++ // 4. 标记展品已处理 if err := w.repo.SetExhibitionProcessed(e.ID, true); err != nil { logger.Logger.Error("标记展品已处理失败", zap.Int64("exhibition_id", e.ID), zap.Error(err)) } // 5. 从 ZSET 移除 database.RemoveExpiringAsset(ctx, assetID) database.RemoveExpiringAssetFromStar(ctx, e.OccupierStarID, assetID) log.Printf("展品已到期并生成领取记录: ExhibitionID=%d, AssetID=%d, SlotID=%d, OccupierUID=%d, Revenue=%d", e.ID, e.AssetID, e.SlotID, e.OccupierUID, revenue) } log.Printf("ZSET 过期展品清理完成: 成功 %d 个, 失败 %d 个", successCount, failedCount) } // cleanupExpiredExhibitionsFromDB 兜底方案:从数据库查询过期展览 func (w *CleanupWorker) cleanupExpiredExhibitionsFromDB(now int64) { expired, err := w.repo.GetExpiredExhibitions(now) if err != nil { log.Printf("从数据库获取过期展品失败: %v", err) return } if len(expired) == 0 { log.Println("没有过期的展品需要清理") return } log.Printf("数据库发现 %d 个过期展品,开始清理", len(expired)) successCount := 0 failedCount := 0 ctx := context.Background() for _, e := range expired { likeCount := 0 if w.assetClient != nil { likeCount = w.assetClient.GetAssetLikeCount(e.AssetID) } revenue := calculateExhibitionRevenue(likeCount, e.StartTime, now) logger.Logger.Info("计算展出收益", zap.Int64("exhibition_id", e.ID), zap.Int64("asset_id", e.AssetID), zap.Int("like_count", likeCount), zap.Int64("start_time", e.StartTime), zap.Int64("end_time", now), zap.Int64("revenue", revenue)) if w.taskClient != nil { slotOwnerUID := e.HostProfileID if ownerUID, err := w.repo.GetSlotOwnerUserID(e.SlotID); err == nil { slotOwnerUID = ownerUID } _, err := w.taskClient.OnExhibitionCompleted(context.Background(), &client.OnExhibitionCompletedRequest{ ExhibitionId: e.ID, AssetId: e.AssetID, SlotId: e.SlotID, OccupierUid: e.OccupierUID, OccupierStarId: e.OccupierStarID, SlotOwnerUid: slotOwnerUID, StartTime: e.StartTime, ExpireAt: now, CrystalAmount: revenue, LikeCount: int32(likeCount), }) if err != nil { logger.Logger.Error("调用TaskService记录收益失败", zap.Int64("exhibition_id", e.ID), zap.Error(err)) failedCount++ continue } } successCount++ if err := w.repo.SetExhibitionProcessed(e.ID, true); err != nil { logger.Logger.Error("标记展品已处理失败", zap.Int64("exhibition_id", e.ID), zap.Error(err)) } // 降级方案也需要清理 ZSET database.RemoveExpiringAsset(ctx, e.AssetID) database.RemoveExpiringAssetFromStar(ctx, e.OccupierStarID, e.AssetID) log.Printf("展品已到期并生成领取记录: ExhibitionID=%d, AssetID=%d, SlotID=%d, OccupierUID=%d, Revenue=%d", e.ID, e.AssetID, e.SlotID, e.OccupierUID, revenue) } log.Printf("过期展品清理完成: 成功 %d 个, 失败 %d 个", successCount, failedCount) } // calculateExhibitionRevenue 计算单次上架收益 // 设计文档公式: // R1 = R0 × T × [100% + Buff(n)] // R0 = 5 水晶/小时 // T = 上架时长(小时) // Buff(n) 根据点赞数计算:n<5→0%, 5≤n<10→10%, 10≤n<30→20%, n≥30→30% func calculateExhibitionRevenue(likeCount int, startTime, endTime int64) int64 { R0 := int64(5) // 水晶/小时 // 计算上架时长(毫秒转小时) T := (endTime - startTime) / 3600000 if T <= 0 { T = 1 // 最少1小时 } // 计算Buff var buff int switch { case likeCount >= 30: buff = 30 case likeCount >= 10: buff = 20 case likeCount >= 5: buff = 10 default: buff = 0 } // 基础收益 baseRevenue := R0 * T // 应用Buff加成(银行家四舍五入) // R1 = R0 × T × (100% + Buff) buffedRevenue := int64(math.Round(float64(baseRevenue) * (100 + float64(buff)) / 100)) return buffedRevenue } // cleanupInvalidDisplayStatus 清理无效的 display_status // 处理手动给 exhibition 添加 deleted_at 或 exhibition 已过期但 display_status 仍为1的情况 func (w *CleanupWorker) cleanupInvalidDisplayStatus() { // 获取 display_status=1 但没有有效 exhibition 的资产ID列表 invalidAssetIDs, err := w.repo.GetAssetsWithInvalidDisplayStatus() if err != nil { log.Printf("获取无效 display_status 资产列表失败: %v", err) return } if len(invalidAssetIDs) == 0 { log.Println("没有无效的 display_status 需要清理") return } log.Printf("发现 %d 个无效 display_status,开始清理", len(invalidAssetIDs)) successCount := 0 failedCount := 0 for _, assetID := range invalidAssetIDs { if err := w.repo.UpdateAssetRegistryDisplayStatus(assetID, int32(0)); err != nil { log.Printf("重置 display_status 失败 (AssetID: %d): %v", assetID, err) failedCount++ continue } successCount++ log.Printf("已重置无效 display_status: AssetID=%d", assetID) } log.Printf("无效 display_status 清理完成: 成功 %d 个, 失败 %d 个", successCount, failedCount) } // publishEvent 发布事件(预留接口) // func (w *CleanupWorker) publishEvent(eventType string, exhibition *models.Exhibition) { // // TODO: 实现事件发布逻辑 // // 可以通过 RPC 调用 Task Service 或发送到消息队列 // }