148 lines
4.2 KiB
Go
148 lines
4.2 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/topfans/backend/pkg/logger"
|
|
"github.com/topfans/backend/services/taskService/client"
|
|
"github.com/topfans/backend/services/taskService/config"
|
|
"github.com/topfans/backend/services/taskService/repository"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type DailyResetWorker struct {
|
|
dailyRepo repository.DailyTaskRepository
|
|
revenueRepo repository.RevenueRepository
|
|
userClient client.UserServiceClient
|
|
stopCh chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewDailyResetWorker(
|
|
dailyRepo repository.DailyTaskRepository,
|
|
revenueRepo repository.RevenueRepository,
|
|
userClient client.UserServiceClient,
|
|
) *DailyResetWorker {
|
|
return &DailyResetWorker{
|
|
dailyRepo: dailyRepo,
|
|
revenueRepo: revenueRepo,
|
|
userClient: userClient,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (w *DailyResetWorker) Start() {
|
|
w.wg.Add(1)
|
|
go w.runLoop()
|
|
logger.Logger.Info("DailyResetWorker started")
|
|
}
|
|
|
|
func (w *DailyResetWorker) Stop() {
|
|
close(w.stopCh)
|
|
w.wg.Wait()
|
|
logger.Logger.Info("DailyResetWorker stopped")
|
|
}
|
|
|
|
func (w *DailyResetWorker) runLoop() {
|
|
defer w.wg.Done()
|
|
|
|
for {
|
|
// 计算下一个 05:00 Asia/Shanghai
|
|
now := time.Now()
|
|
loc, _ := time.LoadLocation("Asia/Shanghai")
|
|
next := time.Date(now.Year(), now.Month(), now.Day(), config.WorkerConfigData.ResetHour, config.WorkerConfigData.ResetMinute, 0, 0, loc)
|
|
if next.Before(now) {
|
|
next = next.Add(24 * time.Hour)
|
|
}
|
|
waitDuration := next.Sub(now)
|
|
|
|
logger.Logger.Info(fmt.Sprintf("DailyResetWorker: next reset at %s (in %v)", next.Format(time.RFC3339), waitDuration))
|
|
|
|
select {
|
|
case <-time.After(waitDuration):
|
|
w.doResetAndAutoClaim()
|
|
case <-w.stopCh:
|
|
logger.Logger.Info("DailyResetWorker: received stop signal, exiting")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *DailyResetWorker) doResetAndAutoClaim() {
|
|
// 使用 Advisory Lock 防止多实例重复执行
|
|
lockKey := time.Now().Format("20060102")
|
|
lockID, err := strconv.ParseInt(lockKey, 10, 64)
|
|
if err != nil {
|
|
logger.Logger.Error("DailyResetWorker: failed to parse lock key", zap.String("lock_key", lockKey), zap.Error(err))
|
|
return
|
|
}
|
|
|
|
acquired := w.dailyRepo.AcquireAdvisoryLock(lockID)
|
|
if !acquired {
|
|
logger.Logger.Info("DailyResetWorker: another instance is running daily reset, skipping")
|
|
return
|
|
}
|
|
defer func() {
|
|
w.dailyRepo.ReleaseAdvisoryLock(lockID)
|
|
}()
|
|
|
|
// 1. 重置每日任务
|
|
resetCount, err := w.dailyRepo.ResetAllDailyTasks()
|
|
if err != nil {
|
|
logger.Logger.Error("DailyResetWorker: failed to reset daily tasks", zap.Error(err))
|
|
} else {
|
|
logger.Logger.Info(fmt.Sprintf("DailyResetWorker: daily tasks reset: %d records updated", resetCount))
|
|
}
|
|
|
|
// 2. 自动发放展示收益
|
|
w.autoClaimExhibitionRevenue()
|
|
}
|
|
|
|
func (w *DailyResetWorker) autoClaimExhibitionRevenue() {
|
|
batchSize := config.WorkerConfigData.RevenueBatchSize
|
|
maxRetries := config.WorkerConfigData.RevenueMaxRetries
|
|
totalClaimed := 0
|
|
|
|
for {
|
|
records, err := w.revenueRepo.ListClaimableRevenue(batchSize)
|
|
if err != nil {
|
|
logger.Logger.Error("DailyResetWorker: failed to list claimable revenue", zap.Error(err))
|
|
break
|
|
}
|
|
if len(records) == 0 {
|
|
break
|
|
}
|
|
|
|
for _, record := range records {
|
|
var lastErr error
|
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
|
_, err := w.userClient.UpdateCrystalBalance(context.Background(), record.UserID, record.StarID, record.CrystalAmount)
|
|
if err == nil {
|
|
if err := w.revenueRepo.UpdateRevenueStatus(record.ID, "claimed"); err != nil {
|
|
logger.Logger.Error("DailyResetWorker: failed to update status to claimed",
|
|
zap.Int64("record_id", record.ID), zap.Error(err))
|
|
}
|
|
totalClaimed++
|
|
break
|
|
}
|
|
lastErr = err
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
if lastErr != nil {
|
|
if err := w.revenueRepo.UpdateRevenueStatus(record.ID, "failed"); err != nil {
|
|
logger.Logger.Error("DailyResetWorker: failed to update status to failed",
|
|
zap.Int64("record_id", record.ID), zap.Error(err))
|
|
}
|
|
logger.Logger.Error("DailyResetWorker: failed to auto-claim revenue after retries",
|
|
zap.Int64("record_id", record.ID), zap.Error(lastErr))
|
|
}
|
|
}
|
|
}
|
|
|
|
logger.Logger.Info(fmt.Sprintf("DailyResetWorker: auto-claim completed: %d records claimed", totalClaimed))
|
|
}
|