topfans/backend/services/statisticService/worker/materializer.go
zerosaturation bed8f8e578 feat(statistic): T4-T8 event collection framework (Event + Sink + Repo + Service + Workers)
- Event model + ToJSON
- EventSink interface + ChannelEventSink (non-blocking Submit)
- event_repo: batch INSERT ON CONFLICT DO NOTHING dedup
- event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill
- event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up
- metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock)
- partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup)
- 22 unit + integration tests (model/repo/service/sink/worker)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 17:20:53 +08:00

130 lines
3.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package worker
import (
"context"
"database/sql"
"fmt"
"sync"
"time"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/statisticService/metrics"
)
// mvList 4 个物化视图
var mvList = []string{
"mv_daily_user_income",
"mv_daily_exhibition_revenue",
"mv_daily_like_income",
"mv_asset_level_distribution",
}
// Materializer 物化视图刷新 worker
// - 每个 MV 独立 goroutine + ticker错开 30s 启动
// - pg_try_advisory_lock 防多实例重复刷新
// - 每次刷新写 refresh_log
type Materializer struct {
db *sql.DB
schema string
mu sync.Mutex
running bool
stop chan struct{}
}
// NewMaterializer 构造
func NewMaterializer(db *sql.DB, schema string) *Materializer {
return &Materializer{db: db, schema: schema, stop: make(chan struct{})}
}
// RefreshOne 刷新单个 MVpg_try_advisory_lock 防多实例)
// 返回 errornil = 成功或锁被其他实例抢走)
func (m *Materializer) RefreshOne(ctx context.Context, mvName string) error {
// 抢锁234567 区别于 weekly user income 的 123456
var got bool
if err := m.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(234567)").Scan(&got); err != nil {
return err
}
if !got {
return nil // 锁被其他实例抢走,本轮跳过
}
defer m.db.ExecContext(ctx, "SELECT pg_advisory_unlock(234567)")
// 记录开始
t0 := time.Now()
var mvID int
if err := m.db.QueryRowContext(ctx,
fmt.Sprintf(`INSERT INTO %s.refresh_log (mv_name, started_at, status) VALUES ($1, NOW(), 'running') RETURNING id`, m.schema),
mvName).Scan(&mvID); err != nil {
return err
}
// 执行 REFRESH
_, err := m.db.ExecContext(ctx,
fmt.Sprintf("REFRESH MATERIALIZED VIEW CONCURRENTLY %s.%s", m.schema, mvName))
if err != nil {
_, _ = m.db.ExecContext(ctx,
fmt.Sprintf(`UPDATE %s.refresh_log SET status='failed', finished_at=NOW(), error_message=$1 WHERE id=$2`, m.schema),
err.Error(), mvID)
metrics.MVRefreshTotal.WithLabelValues(mvName, "failed").Inc()
return err
}
_, _ = m.db.ExecContext(ctx,
fmt.Sprintf(`UPDATE %s.refresh_log SET status='success', finished_at=NOW() WHERE id=$1`, m.schema), mvID)
metrics.MVRefreshTotal.WithLabelValues(mvName, "success").Inc()
metrics.MVRefreshDuration.WithLabelValues(mvName).Observe(time.Since(t0).Seconds())
return nil
}
// Start 启动 worker每个 MV 一个 goroutine + ticker
func (m *Materializer) Start(ctx context.Context, interval time.Duration) {
m.mu.Lock()
m.running = true
m.mu.Unlock()
metrics.WorkerRunningCount.WithLabelValues("materializer").Set(1)
defer metrics.WorkerRunningCount.WithLabelValues("materializer").Set(0)
var wg sync.WaitGroup
for i, mv := range mvList {
wg.Add(1)
go func(idx int, mvName string) {
defer wg.Done()
// 错开启动30s × index避免同时刷新
select {
case <-m.stop:
return
case <-time.After(time.Duration(idx*30) * time.Second):
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-m.stop:
return
case <-ticker.C:
if err := m.RefreshOne(ctx, mvName); err != nil {
logger.Logger.Error("RefreshOne failed", zap.String("mv", mvName), zap.Error(err))
}
}
}
}(i, mv)
}
<-m.stop
wg.Wait()
}
// Stop 停止 worker
func (m *Materializer) Stop() {
m.mu.Lock()
defer m.mu.Unlock()
if m.running {
close(m.stop)
m.running = false
}
}