package worker

import (
	"context"
	"database/sql"
	"fmt"
	"sync"
	"time"

	"go.uber.org/zap"

	"github.com/topfans/backend/pkg/logger"
	"github.com/topfans/backend/services/statisticService/metrics"
)

// mvList names the four materialized views refreshed by the worker.
var mvList = []string{
	"mv_daily_user_income",
	"mv_daily_exhibition_revenue",
	"mv_daily_like_income",
	"mv_asset_level_distribution",
}

// Materializer is the materialized-view refresh worker.
//   - Each MV gets its own goroutine and ticker; start-up is staggered by
//     30s per view.
//   - pg_try_advisory_lock keeps several instances from refreshing at once.
//   - Every refresh attempt is recorded in refresh_log.
type Materializer struct {
	db      *sql.DB
	schema  string
	mu      sync.Mutex // guards running and the closing of stop
	running bool
	stop    chan struct{}
}

// NewMaterializer builds a Materializer that refreshes the views living in
// the given schema through db.
func NewMaterializer(db *sql.DB, schema string) *Materializer {
	return &Materializer{db: db, schema: schema, stop: make(chan struct{})}
}

// RefreshOne refreshes a single materialized view while holding a PostgreSQL
// advisory lock.
//
// It returns nil when the refresh succeeded or when the lock is currently
// held elsewhere (the round is skipped). Any other outcome is returned as a
// wrapped error.
//
// m.schema and mvName are interpolated into the SQL text as identifiers, so
// both must come from trusted configuration, never from user input.
func (m *Materializer) RefreshOne(ctx context.Context, mvName string) error {
	const (
		// Key 234567 is distinct from the 123456 key that, per the original
		// note, the weekly user income job uses.
		lockSQL   = "SELECT pg_try_advisory_lock(234567)"
		unlockSQL = "SELECT pg_advisory_unlock(234567)"

		// Upper bound for the unlock and the final refresh_log update, which
		// must still run after ctx has been cancelled.
		cleanupTimeout = 10 * time.Second
	)

	// Advisory locks belong to the database session. Pin one connection so
	// that the lock and the unlock are guaranteed to hit the same session;
	// going through the pool could unlock on a different connection and leave
	// the lock held indefinitely.
	conn, err := m.db.Conn(ctx)
	if err != nil {
		return fmt.Errorf("acquire connection to refresh %s: %w", mvName, err)
	}
	defer conn.Close()

	var got bool
	if err := conn.QueryRowContext(ctx, lockSQL).Scan(&got); err != nil {
		return fmt.Errorf("try advisory lock for %s: %w", mvName, err)
	}
	if !got {
		return nil // lock is held elsewhere; skip this round
	}
	defer func() {
		// Use a fresh context: the lock has to be released even when ctx is
		// already cancelled.
		unlockCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
		defer cancel()

		var released bool
		if err := conn.QueryRowContext(unlockCtx, unlockSQL).Scan(&released); err != nil {
			logger.Logger.Error("advisory unlock failed", zap.String("mv", mvName), zap.Error(err))
			return
		}
		if !released {
			logger.Logger.Error("advisory lock was not held at unlock", zap.String("mv", mvName))
		}
	}()

	// Record the start of the attempt.
	startedAt := time.Now()
	var logID int
	insertSQL := fmt.Sprintf(`INSERT INTO %s.refresh_log (mv_name, started_at, status) VALUES ($1, NOW(), 'running') RETURNING id`, m.schema)
	if err := conn.QueryRowContext(ctx, insertSQL, mvName).Scan(&logID); err != nil {
		return fmt.Errorf("insert refresh_log row for %s: %w", mvName, err)
	}

	// Run the refresh itself.
	refreshSQL := fmt.Sprintf("REFRESH MATERIALIZED VIEW CONCURRENTLY %s.%s", m.schema, mvName)
	_, refreshErr := conn.ExecContext(ctx, refreshSQL)

	// The final status is written with its own context so that a cancelled
	// ctx does not leave the row stuck in 'running'.
	finishCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
	defer cancel()

	if refreshErr != nil {
		failSQL := fmt.Sprintf(`UPDATE %s.refresh_log SET status='failed', finished_at=NOW(), error_message=$1 WHERE id=$2`, m.schema)
		if _, err := conn.ExecContext(finishCtx, failSQL, refreshErr.Error(), logID); err != nil {
			// Best effort: the refresh error is the one worth returning.
			logger.Logger.Error("refresh_log failure update failed", zap.String("mv", mvName), zap.Error(err))
		}
		metrics.MVRefreshTotal.WithLabelValues(mvName, "failed").Inc()
		return fmt.Errorf("refresh %s: %w", mvName, refreshErr)
	}

	successSQL := fmt.Sprintf(`UPDATE %s.refresh_log SET status='success', finished_at=NOW() WHERE id=$1`, m.schema)
	if _, err := conn.ExecContext(finishCtx, successSQL, logID); err != nil {
		// Best effort: the view was refreshed, only the bookkeeping failed.
		logger.Logger.Error("refresh_log success update failed", zap.String("mv", mvName), zap.Error(err))
	}
	metrics.MVRefreshTotal.WithLabelValues(mvName, "success").Inc()
	metrics.MVRefreshDuration.WithLabelValues(mvName).Observe(time.Since(startedAt).Seconds())
	return nil
}

// Start runs the worker: one goroutine with its own ticker per materialized
// view. It blocks until Stop is called or ctx is cancelled and all goroutines
// have exited.
//
// interval must be positive; time.NewTicker panics otherwise.
func (m *Materializer) Start(ctx context.Context, interval time.Duration) {
	m.mu.Lock()
	m.running = true
	m.mu.Unlock()

	metrics.WorkerRunningCount.WithLabelValues("materializer").Set(1)
	defer metrics.WorkerRunningCount.WithLabelValues("materializer").Set(0)

	var wg sync.WaitGroup
	for i, mv := range mvList {
		wg.Add(1)
		go func(idx int, mvName string) {
			defer wg.Done()
			m.refreshLoop(ctx, idx, mvName, interval)
		}(i, mv)
	}
	wg.Wait()
}

// refreshLoop waits for the staggered start delay (30s x idx) and then
// refreshes mvName on every tick until the worker is stopped or ctx is
// cancelled.
func (m *Materializer) refreshLoop(ctx context.Context, idx int, mvName string, interval time.Duration) {
	// Stagger start-up so the views are not all refreshed at the same moment.
	delay := time.NewTimer(time.Duration(idx*30) * time.Second)
	select {
	case <-m.stop:
		delay.Stop()
		return
	case <-ctx.Done():
		delay.Stop()
		return
	case <-delay.C:
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-m.stop:
			return
		case <-ctx.Done():
			// Without this case every later tick would fail with a context
			// error and be logged forever.
			return
		case <-ticker.C:
			if err := m.RefreshOne(ctx, mvName); err != nil {
				logger.Logger.Error("RefreshOne failed", zap.String("mv", mvName), zap.Error(err))
			}
		}
	}
}

// Stop signals the worker goroutines to exit. It is a no-op when the worker
// is not marked as running, and it is safe to call more than once.
func (m *Materializer) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()

	if !m.running {
		return
	}
	m.running = false

	// The channel may already be closed if Start was called again after an
	// earlier Stop; closing it twice would panic.
	select {
	case <-m.stop:
	default:
		close(m.stop)
	}
}