package worker

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/topfans/backend/pkg/logger"
	"github.com/topfans/backend/services/taskService/client"
	"github.com/topfans/backend/services/taskService/config"
	"github.com/topfans/backend/services/taskService/repository"
	"go.uber.org/zap"
)

const (
	// dailyResetZoneName is the time zone in which the configured reset
	// hour/minute are interpreted.
	dailyResetZoneName = "Asia/Shanghai"
	// dailyResetFallbackOffset is the UTC+8 offset (in seconds) used when the
	// tz database cannot be loaded.
	dailyResetFallbackOffset = 8 * 60 * 60
	// dailyResetRetryBackoff is the pause between two attempts to credit the
	// same revenue record.
	dailyResetRetryBackoff = 100 * time.Millisecond
)

// DailyResetWorker runs a background loop that, once per day at the configured
// reset time, resets daily tasks and then auto-claims claimable revenue records.
type DailyResetWorker struct {
	dailyRepo   repository.DailyTaskRepository
	revenueRepo repository.RevenueRepository
	userClient  client.UserServiceClient
	stopCh      chan struct{}
	stopOnce    sync.Once // makes Stop safe to call more than once
	wg          sync.WaitGroup
}

// NewDailyResetWorker builds a worker around the given repositories and user
// service client. The worker is idle until Start is called.
func NewDailyResetWorker(
	dailyRepo repository.DailyTaskRepository,
	revenueRepo repository.RevenueRepository,
	userClient client.UserServiceClient,
) *DailyResetWorker {
	return &DailyResetWorker{
		dailyRepo:   dailyRepo,
		revenueRepo: revenueRepo,
		userClient:  userClient,
		stopCh:      make(chan struct{}),
	}
}

// Start launches the scheduling loop in its own goroutine and returns
// immediately.
func (w *DailyResetWorker) Start() {
	w.wg.Add(1)
	go w.runLoop()
	logger.Logger.Info("DailyResetWorker started")
}

// Stop signals the scheduling loop to exit and blocks until it has returned.
// Calling Stop more than once is safe; the stop channel is closed only once.
func (w *DailyResetWorker) Stop() {
	w.stopOnce.Do(func() { close(w.stopCh) })
	w.wg.Wait()
	logger.Logger.Info("DailyResetWorker stopped")
}

// dailyResetLocation returns the reset time zone. If the tz database is not
// available it falls back to a fixed UTC+8 zone instead of returning a nil
// location, which would make time.Date panic.
func dailyResetLocation() *time.Location {
	loc, err := time.LoadLocation(dailyResetZoneName)
	if err != nil {
		logger.Logger.Error("DailyResetWorker: failed to load time zone, falling back to fixed UTC+8",
			zap.String("zone", dailyResetZoneName), zap.Error(err))
		return time.FixedZone(dailyResetZoneName, dailyResetFallbackOffset)
	}
	return loc
}

// nextDailyReset returns the first instant at hour:minute (wall clock in loc)
// that is not before now. The calendar date is taken from now converted to
// loc, so the result is correct regardless of the host's local time zone.
func nextDailyReset(now time.Time, hour, minute int, loc *time.Location) time.Time {
	local := now.In(loc)
	next := time.Date(local.Year(), local.Month(), local.Day(), hour, minute, 0, 0, loc)
	if next.Before(now) {
		next = next.AddDate(0, 0, 1)
	}
	return next
}

// runLoop sleeps until the next configured reset time, runs the reset, and
// repeats until the stop channel is closed.
func (w *DailyResetWorker) runLoop() {
	defer w.wg.Done()

	loc := dailyResetLocation()
	for {
		// Compute the next occurrence of the configured reset time in the reset zone.
		now := time.Now()
		next := nextDailyReset(now, config.WorkerConfigData.ResetHour, config.WorkerConfigData.ResetMinute, loc)
		waitDuration := next.Sub(now)
		logger.Logger.Info(fmt.Sprintf("DailyResetWorker: next reset at %s (in %v)", next.Format(time.RFC3339), waitDuration))

		timer := time.NewTimer(waitDuration)
		select {
		case <-timer.C:
			w.doResetAndAutoClaim()
		case <-w.stopCh:
			timer.Stop() // release the pending timer on shutdown
			logger.Logger.Info("DailyResetWorker: received stop signal, exiting")
			return
		}
	}
}

// doResetAndAutoClaim performs one daily run: it takes an advisory lock keyed
// by the current date, resets daily tasks, then auto-claims revenue. If the
// lock cannot be acquired the run is skipped.
func (w *DailyResetWorker) doResetAndAutoClaim() {
	// Use an advisory lock so that multiple instances do not run the reset at
	// the same time. The key is the date in the reset zone (yyyymmdd) so every
	// instance derives the same key regardless of its host time zone.
	lockKey := time.Now().In(dailyResetLocation()).Format("20060102")
	lockID, err := strconv.ParseInt(lockKey, 10, 64)
	if err != nil {
		logger.Logger.Error("DailyResetWorker: failed to parse lock key", zap.String("lock_key", lockKey), zap.Error(err))
		return
	}

	if !w.dailyRepo.AcquireAdvisoryLock(lockID) {
		logger.Logger.Info("DailyResetWorker: another instance is running daily reset, skipping")
		return
	}
	defer func() {
		w.dailyRepo.ReleaseAdvisoryLock(lockID)
	}()

	// 1. Reset daily tasks. A failure is logged but does not block step 2.
	resetCount, err := w.dailyRepo.ResetAllDailyTasks()
	if err != nil {
		logger.Logger.Error("DailyResetWorker: failed to reset daily tasks", zap.Error(err))
	} else {
		logger.Logger.Info(fmt.Sprintf("DailyResetWorker: daily tasks reset: %d records updated", resetCount))
	}

	// 2. Automatically pay out exhibition revenue.
	w.autoClaimExhibitionRevenue()
}

// autoClaimExhibitionRevenue processes claimable revenue records in batches.
// For each record it credits the user's crystal balance (with retries) and
// marks the record "claimed" on success or "failed" once all attempts fail.
func (w *DailyResetWorker) autoClaimExhibitionRevenue() {
	batchSize := config.WorkerConfigData.RevenueBatchSize

	// Always make at least one attempt; with zero attempts a record would be
	// neither claimed nor failed and the batch loop could never drain.
	maxRetries := config.WorkerConfigData.RevenueMaxRetries
	if maxRetries < 1 {
		maxRetries = 1
	}

	// Records whose status could not be persisted. Presumably they are still
	// returned by ListClaimableRevenue, so they are skipped for the rest of
	// this run to avoid crediting the same record twice.
	unpersisted := make(map[int64]struct{})

	totalClaimed := 0
	for {
		records, err := w.revenueRepo.ListClaimableRevenue(batchSize)
		if err != nil {
			logger.Logger.Error("DailyResetWorker: failed to list claimable revenue", zap.Error(err))
			break
		}
		if len(records) == 0 {
			break
		}

		persisted := 0
		for _, record := range records {
			if _, seen := unpersisted[record.ID]; seen {
				continue
			}

			// lastErr is nil exactly when one of the attempts succeeded.
			var lastErr error
			for attempt := 0; attempt < maxRetries; attempt++ {
				_, err := w.userClient.UpdateCrystalBalance(context.Background(), record.UserID, record.StarID, record.CrystalAmount)
				if err == nil {
					lastErr = nil
					break
				}
				lastErr = err
				if attempt < maxRetries-1 {
					time.Sleep(dailyResetRetryBackoff)
				}
			}

			if lastErr != nil {
				if err := w.revenueRepo.UpdateRevenueStatus(record.ID, "failed"); err != nil {
					logger.Logger.Error("DailyResetWorker: failed to update status to failed", zap.Int64("record_id", record.ID), zap.Error(err))
					unpersisted[record.ID] = struct{}{}
				} else {
					persisted++
				}
				logger.Logger.Error("DailyResetWorker: failed to auto-claim revenue after retries", zap.Int64("record_id", record.ID), zap.Error(lastErr))
				continue
			}

			totalClaimed++
			if err := w.revenueRepo.UpdateRevenueStatus(record.ID, "claimed"); err != nil {
				logger.Logger.Error("DailyResetWorker: failed to update status to claimed", zap.Int64("record_id", record.ID), zap.Error(err))
				unpersisted[record.ID] = struct{}{}
			} else {
				persisted++
			}
		}

		// No record in this batch changed status, so listing again would most
		// likely return the same records; stop instead of looping forever.
		if persisted == 0 {
			logger.Logger.Error("DailyResetWorker: no revenue status could be persisted in this batch, aborting auto-claim",
				zap.Int("unpersisted_records", len(unpersisted)))
			break
		}
	}

	logger.Logger.Info(fmt.Sprintf("DailyResetWorker: auto-claim completed: %d records claimed", totalClaimed))
}