topfans/backend/services/galleryService/service/cleanup_worker.go
2026-05-16 02:42:32 +08:00

251 lines
7.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"log"
"math"
"time"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/galleryService/client"
"github.com/topfans/backend/services/galleryService/repository"
"go.uber.org/zap"
)
// CleanupWorker 清理过期展品的Worker
type CleanupWorker struct {
repo repository.GalleryRepository
assetClient client.AssetRPCClient
userClient client.UserRPCClient
taskClient client.TaskRPCClient
ctx context.Context
cancel context.CancelFunc
}
// NewCleanupWorker 创建清理Worker实例
func NewCleanupWorker(repo repository.GalleryRepository, assetClient client.AssetRPCClient, userClient client.UserRPCClient, taskClient client.TaskRPCClient) *CleanupWorker {
ctx, cancel := context.WithCancel(context.Background())
return &CleanupWorker{
repo: repo,
assetClient: assetClient,
userClient: userClient,
taskClient: taskClient,
ctx: ctx,
cancel: cancel,
}
}
// Start 启动清理Worker
func (w *CleanupWorker) Start() {
log.Println("清理Worker已启动每分钟扫描一次过期展品")
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
// 立即执行一次清理
w.cleanup()
for {
select {
case <-ticker.C:
w.cleanup()
case <-w.ctx.Done():
log.Println("清理Worker已停止")
return
}
}
}
// Stop 停止清理Worker
func (w *CleanupWorker) Stop() {
w.cancel()
}
// cleanup 执行清理逻辑
func (w *CleanupWorker) cleanup() {
now := time.Now().UnixMilli()
// 1. 清理过期的展品展示记录
w.cleanupExpiredExhibitions(now)
// 2. 清理无效的 display_status处理手动软删除导致的不一致
w.cleanupInvalidDisplayStatus()
}
// cleanupExpiredExhibitions 清理过期的展品展示记录
func (w *CleanupWorker) cleanupExpiredExhibitions(now int64) {
// 获取过期的展品展示记录
expired, err := w.repo.GetExpiredExhibitions(now)
if err != nil {
log.Printf("获取过期展品失败: %v", err)
return
}
if len(expired) == 0 {
log.Println("没有过期的展品需要清理")
return
}
log.Printf("发现 %d 个过期展品,开始清理", len(expired))
// 批量删除过期记录
successCount := 0
failedCount := 0
for _, e := range expired {
// 1. 获取点赞数用于计算收益
likeCount := 0
if w.assetClient != nil {
likeCount = w.assetClient.GetAssetLikeCount(e.AssetID)
}
// 2. 计算展示收益(使用 Buff 公式)
// R1 = R0 × T × [100% + Buff(n)]
// R0 = 5 水晶/小时
// Buff(n): n<5→0%, 5≤n<10→10%, 10≤n<30→20%, n≥30→30%
revenue := calculateExhibitionRevenue(likeCount, e.StartTime, now)
logger.Logger.Info("计算展出收益",
zap.Int64("exhibition_id", e.ID),
zap.Int64("asset_id", e.AssetID),
zap.Int("like_count", likeCount),
zap.Int64("start_time", e.StartTime),
zap.Int64("end_time", now),
zap.Int64("revenue", revenue))
// 3. 调用 TaskService 记录收益
// 注意:仅当展位所有者和占位者不同时才创建收益记录
if w.taskClient != nil {
// 获取展位所有者的实际用户ID而非profile_id
slotOwnerUID := e.HostProfileID // 默认使用HostProfileID
if ownerUID, err := w.repo.GetSlotOwnerUserID(e.SlotID); err == nil {
slotOwnerUID = ownerUID
}
_, err := w.taskClient.OnExhibitionCompleted(context.Background(), &client.OnExhibitionCompletedRequest{
ExhibitionId: e.ID,
AssetId: e.AssetID,
SlotId: e.SlotID,
OccupierUid: e.OccupierUID,
OccupierStarId: e.OccupierStarID,
SlotOwnerUid: slotOwnerUID,
StartTime: e.StartTime,
ExpireAt: now,
CrystalAmount: revenue,
})
if err != nil {
logger.Logger.Error("调用TaskService记录收益失败",
zap.Int64("exhibition_id", e.ID),
zap.Error(err))
// 不阻断主流程
}
}
// 4. 不再删除展品 - 用户领取收益时才会下架
// 此处只更新展品状态为过期(可选),展品记录保留供用户领取
successCount++
// 4.5 更新展览过期时间为历史值,防止重复处理(领取收益时才会真正下架)
if err := w.repo.UpdateExhibitionExpireAt(e.ID, now-3600000); err != nil {
logger.Logger.Error("更新展览过期时间失败",
zap.Int64("exhibition_id", e.ID),
zap.Error(err))
}
log.Printf("展品已到期并生成领取记录: ExhibitionID=%d, AssetID=%d, SlotID=%d, OccupierUID=%d, Revenue=%d",
e.ID, e.AssetID, e.SlotID, e.OccupierUID, revenue)
// 5. 清除该资产的点赞记录(不阻断主流程),允许用户在下次展出时再次点赞
assetID := e.AssetID
go func() {
if w.assetClient != nil {
if err := w.assetClient.ClearAssetLikeRecords(assetID); err != nil {
logger.Logger.Error("清除过期展品点赞记录失败",
zap.Int64("asset_id", assetID),
zap.Error(err))
}
}
}()
}
log.Printf("过期展品清理完成: 成功 %d 个, 失败 %d 个", successCount, failedCount)
}
// calculateExhibitionRevenue 计算单次上架收益
// 设计文档公式:
// R1 = R0 × T × [100% + Buff(n)]
// R0 = 5 水晶/小时
// T = 上架时长(小时)
// Buff(n) 根据点赞数计算n<5→0%, 5≤n<10→10%, 10≤n<30→20%, n≥30→30%
func calculateExhibitionRevenue(likeCount int, startTime, endTime int64) int64 {
R0 := int64(5) // 水晶/小时
// 计算上架时长(毫秒转小时)
T := (endTime - startTime) / 3600000
if T <= 0 {
T = 1 // 最少1小时
}
// 计算Buff
var buff int
switch {
case likeCount >= 30:
buff = 30
case likeCount >= 10:
buff = 20
case likeCount >= 5:
buff = 10
default:
buff = 0
}
// 基础收益
baseRevenue := R0 * T
// 应用Buff加成银行家四舍五入
// R1 = R0 × T × (100% + Buff)
buffedRevenue := int64(math.Round(float64(baseRevenue) * (100 + float64(buff)) / 100))
return buffedRevenue
}
// cleanupInvalidDisplayStatus 清理无效的 display_status
// 处理手动给 exhibition 添加 deleted_at 或 exhibition 已过期但 display_status 仍为1的情况
func (w *CleanupWorker) cleanupInvalidDisplayStatus() {
// 获取 display_status=1 但没有有效 exhibition 的资产ID列表
invalidAssetIDs, err := w.repo.GetAssetsWithInvalidDisplayStatus()
if err != nil {
log.Printf("获取无效 display_status 资产列表失败: %v", err)
return
}
if len(invalidAssetIDs) == 0 {
log.Println("没有无效的 display_status 需要清理")
return
}
log.Printf("发现 %d 个无效 display_status开始清理", len(invalidAssetIDs))
successCount := 0
failedCount := 0
for _, assetID := range invalidAssetIDs {
if err := w.repo.UpdateAssetRegistryDisplayStatus(assetID, int32(0)); err != nil {
log.Printf("重置 display_status 失败 (AssetID: %d): %v", assetID, err)
failedCount++
continue
}
successCount++
log.Printf("已重置无效 display_status: AssetID=%d", assetID)
}
log.Printf("无效 display_status 清理完成: 成功 %d 个, 失败 %d 个", successCount, failedCount)
}
// publishEvent 发布事件(预留接口)
// func (w *CleanupWorker) publishEvent(eventType string, exhibition *models.Exhibition) {
// // TODO: 实现事件发布逻辑
// // 可以通过 RPC 调用 Task Service 或发送到消息队列
// }