topfans/backend/services/galleryService/service/cleanup_worker.go

331 lines
9.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"log"
"math"
"time"
"github.com/topfans/backend/pkg/database"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/galleryService/client"
"github.com/topfans/backend/services/galleryService/repository"
"go.uber.org/zap"
)
// CleanupWorker is the background worker that settles expired exhibitions and
// resets stale display_status flags.
type CleanupWorker struct {
// repo is the gallery repository used to look up exhibitions, mark them
// processed and reset asset display status.
repo repository.GalleryRepository
// assetClient supplies asset like counts for the revenue calculation; when it
// is nil the like count is taken as 0.
assetClient client.AssetRPCClient
// userClient is stored by the constructor; none of the methods visible in
// this file reference it.
userClient client.UserRPCClient
// taskClient reports completed exhibitions to the task service; when it is
// nil the report step is skipped.
taskClient client.TaskRPCClient
// ctx is the worker's lifetime context; Start returns once it is done.
ctx context.Context
// cancel cancels ctx and is invoked by Stop.
cancel context.CancelFunc
}
// NewCleanupWorker builds a CleanupWorker around the given repository and RPC
// clients. The worker owns a cancellable context derived from
// context.Background; Stop cancels it.
func NewCleanupWorker(repo repository.GalleryRepository, assetClient client.AssetRPCClient, userClient client.UserRPCClient, taskClient client.TaskRPCClient) *CleanupWorker {
	worker := &CleanupWorker{
		repo:        repo,
		assetClient: assetClient,
		userClient:  userClient,
		taskClient:  taskClient,
	}
	// Attach the lifetime context and its cancel function in one step.
	worker.ctx, worker.cancel = context.WithCancel(context.Background())
	return worker
}
// Start runs the cleanup loop and blocks until the worker's context is
// cancelled (see Stop). One cleanup pass runs immediately, then one per hour.
func (w *CleanupWorker) Start() {
	log.Println("清理Worker已启动每小时扫描一次过期展品")

	hourly := time.NewTicker(time.Hour)
	defer hourly.Stop()

	// Do not wait a full hour for the first pass.
	w.cleanup()

	stopped := w.ctx.Done()
	for {
		select {
		case <-stopped:
			log.Println("清理Worker已停止")
			return
		case <-hourly.C:
			w.cleanup()
		}
	}
}
// Stop cancels the worker's context, which makes the loop in Start return the
// next time it selects on that context.
func (w *CleanupWorker) Stop() {
	stop := w.cancel
	stop()
}
// cleanup performs one full cleanup pass, in order:
//  1. settle exhibitions that have expired as of the current time
//     (Unix milliseconds);
//  2. reset invalid display_status values (inconsistencies left behind by
//     manual soft deletes).
func (w *CleanupWorker) cleanup() {
	w.cleanupExpiredExhibitions(time.Now().UnixMilli())
	w.cleanupInvalidDisplayStatus()
}
// cleanupExpiredExhibitions settles the exhibitions whose asset IDs the Redis
// ZSET reports as expired at time now (a millisecond timestamp).
//
// For every expired asset ID it:
//  1. loads the exhibition for that asset; if none is returned the ID is
//     removed from the ZSET and skipped;
//  2. computes the revenue from the asset's like count and the time elapsed
//     between the exhibition's StartTime and now;
//  3. reports the completed exhibition to the task service (skipped when no
//     task client is configured);
//  4. marks the exhibition as processed and removes the asset from the
//     expiry ZSETs.
//
// If the ZSET itself cannot be read, the work is delegated to
// cleanupExpiredExhibitionsFromDB. An exhibition whose task-service report
// fails is counted as failed and is not removed from the ZSET.
func (w *CleanupWorker) cleanupExpiredExhibitions(now int64) {
	ctx := context.Background()

	// The ZSET drives the scan: it yields the asset IDs that are already expired.
	expiredAssetIDs, err := database.GetExpiredAssets(ctx, now)
	if err != nil {
		log.Printf("从 ZSET 获取过期展品失败: %v", err)
		// Fallback: query the database instead (covers the case where Redis
		// has not been initialised).
		w.cleanupExpiredExhibitionsFromDB(now)
		return
	}
	if len(expiredAssetIDs) == 0 {
		log.Println("没有过期的展品需要清理")
		return
	}
	log.Printf("ZSET 发现 %d 个过期展品,开始清理", len(expiredAssetIDs))

	successCount := 0
	failedCount := 0
	for _, assetID := range expiredAssetIDs {
		// Look up the exhibition that belongs to this asset.
		e, err := w.repo.GetExhibitionByAssetID(assetID)
		if err != nil {
			// The lookup error used to be dropped silently, which made a
			// failed query indistinguishable from "no such exhibition".
			// NOTE(review): the entry is still removed below to keep the
			// existing behaviour; if the repository returns (nil, nil) for
			// "not found", a transient DB error should probably keep the
			// entry for a retry instead — confirm against the repository.
			log.Printf("查询展览失败,已从 ZSET 移除 (AssetID: %v): %v", assetID, err)
		}
		if err != nil || e == nil {
			// Exhibition missing or already handled: drop it from the ZSET.
			database.RemoveExpiringAsset(ctx, assetID)
			continue
		}

		// 1. Fetch the like count that feeds the revenue formula.
		likeCount := 0
		if w.assetClient != nil {
			likeCount = w.assetClient.GetAssetLikeCount(e.AssetID)
		}

		// 2. Compute the exhibition revenue; the end time is the scan time.
		revenue := calculateExhibitionRevenue(likeCount, e.StartTime, now)
		logger.Logger.Info("计算展出收益",
			zap.Int64("exhibition_id", e.ID),
			zap.Int64("asset_id", e.AssetID),
			zap.Int("like_count", likeCount),
			zap.Int64("start_time", e.StartTime),
			zap.Int64("end_time", now),
			zap.Int64("revenue", revenue))

		// 3. Report the revenue to the task service.
		if w.taskClient != nil {
			// Prefer the slot owner's user ID from the repository; fall back
			// to the exhibition's HostProfileID when the lookup fails.
			slotOwnerUID := e.HostProfileID
			if ownerUID, err := w.repo.GetSlotOwnerUserID(e.SlotID); err == nil {
				slotOwnerUID = ownerUID
			}
			_, err := w.taskClient.OnExhibitionCompleted(ctx, &client.OnExhibitionCompletedRequest{
				ExhibitionId:   e.ID,
				AssetId:        e.AssetID,
				SlotId:         e.SlotID,
				OccupierUid:    e.OccupierUID,
				OccupierStarId: e.OccupierStarID,
				SlotOwnerUid:   slotOwnerUID,
				StartTime:      e.StartTime,
				ExpireAt:       now,
				CrystalAmount:  revenue,
				LikeCount:      int32(likeCount),
			})
			if err != nil {
				logger.Logger.Error("调用TaskService记录收益失败",
					zap.Int64("exhibition_id", e.ID),
					zap.Error(err))
				failedCount++
				// Leave the ZSET entry in place; it is not marked processed.
				continue
			}
		}
		successCount++

		// 4. Mark the exhibition as processed. A failure here is only logged;
		// the ZSET entries are removed regardless.
		if err := w.repo.SetExhibitionProcessed(e.ID, true); err != nil {
			logger.Logger.Error("标记展品已处理失败",
				zap.Int64("exhibition_id", e.ID),
				zap.Error(err))
		}

		// 5. Remove the asset from the global and the per-star expiry ZSETs.
		database.RemoveExpiringAsset(ctx, assetID)
		database.RemoveExpiringAssetFromStar(ctx, e.OccupierStarID, assetID)
		log.Printf("展品已到期并生成领取记录: ExhibitionID=%d, AssetID=%d, SlotID=%d, OccupierUID=%d, Revenue=%d",
			e.ID, e.AssetID, e.SlotID, e.OccupierUID, revenue)
	}
	log.Printf("过期展品清理完成: 成功 %d 个, 失败 %d 个", successCount, failedCount)
}
// cleanupExpiredExhibitionsFromDB is the fallback path: it asks the repository
// (rather than the Redis ZSET) for the exhibitions expired at time now (a
// millisecond timestamp) and settles each of them.
//
// Per exhibition it computes the revenue from the like count and elapsed time,
// reports it to the task service when a task client is configured, marks the
// exhibition processed and removes the asset from the expiry ZSETs. An
// exhibition whose report fails is counted as failed and left untouched.
func (w *CleanupWorker) cleanupExpiredExhibitionsFromDB(now int64) {
	expired, err := w.repo.GetExpiredExhibitions(now)
	switch {
	case err != nil:
		log.Printf("从数据库获取过期展品失败: %v", err)
		return
	case len(expired) == 0:
		log.Println("没有过期的展品需要清理")
		return
	}
	log.Printf("数据库发现 %d 个过期展品,开始清理", len(expired))

	var settled, failed int
	bg := context.Background()
	for _, e := range expired {
		// Like count defaults to 0 when no asset client is available.
		likes := 0
		if w.assetClient != nil {
			likes = w.assetClient.GetAssetLikeCount(e.AssetID)
		}

		crystals := calculateExhibitionRevenue(likes, e.StartTime, now)
		logger.Logger.Info("计算展出收益",
			zap.Int64("exhibition_id", e.ID),
			zap.Int64("asset_id", e.AssetID),
			zap.Int("like_count", likes),
			zap.Int64("start_time", e.StartTime),
			zap.Int64("end_time", now),
			zap.Int64("revenue", crystals))

		if w.taskClient != nil {
			// Use the slot owner's user ID when the repository can resolve
			// it, otherwise keep the exhibition's HostProfileID.
			slotOwner := e.HostProfileID
			if uid, lookupErr := w.repo.GetSlotOwnerUserID(e.SlotID); lookupErr == nil {
				slotOwner = uid
			}
			req := &client.OnExhibitionCompletedRequest{
				ExhibitionId:   e.ID,
				AssetId:        e.AssetID,
				SlotId:         e.SlotID,
				OccupierUid:    e.OccupierUID,
				OccupierStarId: e.OccupierStarID,
				SlotOwnerUid:   slotOwner,
				StartTime:      e.StartTime,
				ExpireAt:       now,
				CrystalAmount:  crystals,
				LikeCount:      int32(likes),
			}
			if _, rpcErr := w.taskClient.OnExhibitionCompleted(bg, req); rpcErr != nil {
				logger.Logger.Error("调用TaskService记录收益失败",
					zap.Int64("exhibition_id", e.ID),
					zap.Error(rpcErr))
				failed++
				continue
			}
		}

		// A failure to mark the exhibition processed is logged only.
		if markErr := w.repo.SetExhibitionProcessed(e.ID, true); markErr != nil {
			logger.Logger.Error("标记展品已处理失败",
				zap.Int64("exhibition_id", e.ID),
				zap.Error(markErr))
		}

		// The fallback path also has to clear the ZSET entries.
		database.RemoveExpiringAsset(bg, e.AssetID)
		database.RemoveExpiringAssetFromStar(bg, e.OccupierStarID, e.AssetID)
		log.Printf("展品已到期并生成领取记录: ExhibitionID=%d, AssetID=%d, SlotID=%d, OccupierUID=%d, Revenue=%d",
			e.ID, e.AssetID, e.SlotID, e.OccupierUID, crystals)
		settled++
	}
	log.Printf("过期展品清理完成: 成功 %d 个, 失败 %d 个", settled, failed)
}
// calculateExhibitionRevenue returns the crystal revenue of a single listing.
//
// Formula (from the design document):
//
//	R1 = R0 × T × (100% + Buff(n))
//
// R0 is 5 crystals per hour. T is the listing duration in whole hours, i.e.
// (endTime - startTime) in milliseconds divided by 3,600,000 with the
// remainder discarded; anything below one hour (including a non-positive
// duration) is billed as 1 hour. Buff(n) depends on the like count n:
// n < 5 → 0%, 5 ≤ n < 10 → 10%, 10 ≤ n < 30 → 20%, n ≥ 30 → 30%.
//
// The buffed amount is rounded with math.Round, which rounds halves away from
// zero.
// NOTE(review): an earlier comment described this step as banker's rounding,
// which would be math.RoundToEven — confirm which one the design intends.
func calculateExhibitionRevenue(likeCount int, startTime, endTime int64) int64 {
	const (
		crystalsPerHour int64 = 5
		msPerHour       int64 = 3600000
	)

	// Whole hours on display, with a one-hour minimum.
	hours := (endTime - startTime) / msPerHour
	if hours < 1 {
		hours = 1
	}

	// Bonus percentage tier selected by the like count.
	bonusPct := 0
	if likeCount >= 30 {
		bonusPct = 30
	} else if likeCount >= 10 {
		bonusPct = 20
	} else if likeCount >= 5 {
		bonusPct = 10
	}

	base := crystalsPerHour * hours
	scaled := float64(base) * (100 + float64(bonusPct)) / 100
	return int64(math.Round(scaled))
}
// cleanupInvalidDisplayStatus resets display_status to 0 for every asset the
// repository reports as having an invalid display_status.
//
// Per the original note, these are assets whose display_status is still 1
// although no valid exhibition exists — e.g. deleted_at was set on the
// exhibition by hand, or the exhibition expired. The selection itself happens
// inside the repository and is not visible here.
func (w *CleanupWorker) cleanupInvalidDisplayStatus() {
	staleIDs, err := w.repo.GetAssetsWithInvalidDisplayStatus()
	if err != nil {
		log.Printf("获取无效 display_status 资产列表失败: %v", err)
		return
	}

	total := len(staleIDs)
	if total == 0 {
		log.Println("没有无效的 display_status 需要清理")
		return
	}
	log.Printf("发现 %d 个无效 display_status开始清理", total)

	var fixed, failed int
	for _, id := range staleIDs {
		resetErr := w.repo.UpdateAssetRegistryDisplayStatus(id, int32(0))
		if resetErr == nil {
			fixed++
			log.Printf("已重置无效 display_status: AssetID=%d", id)
			continue
		}
		// A failed reset is logged and counted; the loop moves on.
		log.Printf("重置 display_status 失败 (AssetID: %d): %v", id, resetErr)
		failed++
	}
	log.Printf("无效 display_status 清理完成: 成功 %d 个, 失败 %d 个", fixed, failed)
}
// publishEvent 发布事件(预留接口)
// func (w *CleanupWorker) publishEvent(eventType string, exhibition *models.Exhibition) {
// // TODO: 实现事件发布逻辑
// // 可以通过 RPC 调用 Task Service 或发送到消息队列
// }