topfans/backend/services/taskService/worker/daily_reset_worker.go
2026-04-15 14:11:07 +08:00

148 lines
4.2 KiB
Go

package worker
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/taskService/client"
"github.com/topfans/backend/services/taskService/config"
"github.com/topfans/backend/services/taskService/repository"
"go.uber.org/zap"
)
type DailyResetWorker struct {
dailyRepo repository.DailyTaskRepository
revenueRepo repository.RevenueRepository
userClient client.UserServiceClient
stopCh chan struct{}
wg sync.WaitGroup
}
func NewDailyResetWorker(
dailyRepo repository.DailyTaskRepository,
revenueRepo repository.RevenueRepository,
userClient client.UserServiceClient,
) *DailyResetWorker {
return &DailyResetWorker{
dailyRepo: dailyRepo,
revenueRepo: revenueRepo,
userClient: userClient,
stopCh: make(chan struct{}),
}
}
func (w *DailyResetWorker) Start() {
w.wg.Add(1)
go w.runLoop()
logger.Logger.Info("DailyResetWorker started")
}
func (w *DailyResetWorker) Stop() {
close(w.stopCh)
w.wg.Wait()
logger.Logger.Info("DailyResetWorker stopped")
}
func (w *DailyResetWorker) runLoop() {
defer w.wg.Done()
for {
// 计算下一个 05:00 Asia/Shanghai
now := time.Now()
loc, _ := time.LoadLocation("Asia/Shanghai")
next := time.Date(now.Year(), now.Month(), now.Day(), config.WorkerConfigData.ResetHour, config.WorkerConfigData.ResetMinute, 0, 0, loc)
if next.Before(now) {
next = next.Add(24 * time.Hour)
}
waitDuration := next.Sub(now)
logger.Logger.Info(fmt.Sprintf("DailyResetWorker: next reset at %s (in %v)", next.Format(time.RFC3339), waitDuration))
select {
case <-time.After(waitDuration):
w.doResetAndAutoClaim()
case <-w.stopCh:
logger.Logger.Info("DailyResetWorker: received stop signal, exiting")
return
}
}
}
func (w *DailyResetWorker) doResetAndAutoClaim() {
// 使用 Advisory Lock 防止多实例重复执行
lockKey := time.Now().Format("20060102")
lockID, err := strconv.ParseInt(lockKey, 10, 64)
if err != nil {
logger.Logger.Error("DailyResetWorker: failed to parse lock key", zap.String("lock_key", lockKey), zap.Error(err))
return
}
acquired := w.dailyRepo.AcquireAdvisoryLock(lockID)
if !acquired {
logger.Logger.Info("DailyResetWorker: another instance is running daily reset, skipping")
return
}
defer func() {
w.dailyRepo.ReleaseAdvisoryLock(lockID)
}()
// 1. 重置每日任务
resetCount, err := w.dailyRepo.ResetAllDailyTasks()
if err != nil {
logger.Logger.Error("DailyResetWorker: failed to reset daily tasks", zap.Error(err))
} else {
logger.Logger.Info(fmt.Sprintf("DailyResetWorker: daily tasks reset: %d records updated", resetCount))
}
// 2. 自动发放展示收益
w.autoClaimExhibitionRevenue()
}
func (w *DailyResetWorker) autoClaimExhibitionRevenue() {
batchSize := config.WorkerConfigData.RevenueBatchSize
maxRetries := config.WorkerConfigData.RevenueMaxRetries
totalClaimed := 0
for {
records, err := w.revenueRepo.ListClaimableRevenue(batchSize)
if err != nil {
logger.Logger.Error("DailyResetWorker: failed to list claimable revenue", zap.Error(err))
break
}
if len(records) == 0 {
break
}
for _, record := range records {
var lastErr error
for attempt := 0; attempt < maxRetries; attempt++ {
_, err := w.userClient.UpdateCrystalBalance(context.Background(), record.UserID, record.StarID, record.CrystalAmount)
if err == nil {
if err := w.revenueRepo.UpdateRevenueStatus(record.ID, "claimed"); err != nil {
logger.Logger.Error("DailyResetWorker: failed to update status to claimed",
zap.Int64("record_id", record.ID), zap.Error(err))
}
totalClaimed++
break
}
lastErr = err
time.Sleep(100 * time.Millisecond)
}
if lastErr != nil {
if err := w.revenueRepo.UpdateRevenueStatus(record.ID, "failed"); err != nil {
logger.Logger.Error("DailyResetWorker: failed to update status to failed",
zap.Int64("record_id", record.ID), zap.Error(err))
}
logger.Logger.Error("DailyResetWorker: failed to auto-claim revenue after retries",
zap.Int64("record_id", record.ID), zap.Error(lastErr))
}
}
}
logger.Logger.Info(fmt.Sprintf("DailyResetWorker: auto-claim completed: %d records claimed", totalClaimed))
}