- Event model + ToJSON - EventSink interface + ChannelEventSink (non-blocking Submit) - event_repo: batch INSERT ON CONFLICT DO NOTHING dedup - event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill - event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up - metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock) - partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup) - 22 unit + integration tests (model/repo/service/sink/worker) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
130 lines
3.4 KiB
Go
130 lines
3.4 KiB
Go
package worker
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
|
||
"go.uber.org/zap"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"github.com/topfans/backend/services/statisticService/metrics"
|
||
)
|
||
|
||
// mvList 4 个物化视图
|
||
var mvList = []string{
|
||
"mv_daily_user_income",
|
||
"mv_daily_exhibition_revenue",
|
||
"mv_daily_like_income",
|
||
"mv_asset_level_distribution",
|
||
}
|
||
|
||
// Materializer 物化视图刷新 worker
|
||
// - 每个 MV 独立 goroutine + ticker,错开 30s 启动
|
||
// - pg_try_advisory_lock 防多实例重复刷新
|
||
// - 每次刷新写 refresh_log
|
||
type Materializer struct {
|
||
db *sql.DB
|
||
schema string
|
||
|
||
mu sync.Mutex
|
||
running bool
|
||
stop chan struct{}
|
||
}
|
||
|
||
// NewMaterializer 构造
|
||
func NewMaterializer(db *sql.DB, schema string) *Materializer {
|
||
return &Materializer{db: db, schema: schema, stop: make(chan struct{})}
|
||
}
|
||
|
||
// RefreshOne 刷新单个 MV(pg_try_advisory_lock 防多实例)
|
||
// 返回 error(nil = 成功或锁被其他实例抢走)
|
||
func (m *Materializer) RefreshOne(ctx context.Context, mvName string) error {
|
||
// 抢锁(234567 区别于 weekly user income 的 123456)
|
||
var got bool
|
||
if err := m.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(234567)").Scan(&got); err != nil {
|
||
return err
|
||
}
|
||
if !got {
|
||
return nil // 锁被其他实例抢走,本轮跳过
|
||
}
|
||
defer m.db.ExecContext(ctx, "SELECT pg_advisory_unlock(234567)")
|
||
|
||
// 记录开始
|
||
t0 := time.Now()
|
||
var mvID int
|
||
if err := m.db.QueryRowContext(ctx,
|
||
fmt.Sprintf(`INSERT INTO %s.refresh_log (mv_name, started_at, status) VALUES ($1, NOW(), 'running') RETURNING id`, m.schema),
|
||
mvName).Scan(&mvID); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 执行 REFRESH
|
||
_, err := m.db.ExecContext(ctx,
|
||
fmt.Sprintf("REFRESH MATERIALIZED VIEW CONCURRENTLY %s.%s", m.schema, mvName))
|
||
if err != nil {
|
||
_, _ = m.db.ExecContext(ctx,
|
||
fmt.Sprintf(`UPDATE %s.refresh_log SET status='failed', finished_at=NOW(), error_message=$1 WHERE id=$2`, m.schema),
|
||
err.Error(), mvID)
|
||
metrics.MVRefreshTotal.WithLabelValues(mvName, "failed").Inc()
|
||
return err
|
||
}
|
||
|
||
_, _ = m.db.ExecContext(ctx,
|
||
fmt.Sprintf(`UPDATE %s.refresh_log SET status='success', finished_at=NOW() WHERE id=$1`, m.schema), mvID)
|
||
metrics.MVRefreshTotal.WithLabelValues(mvName, "success").Inc()
|
||
metrics.MVRefreshDuration.WithLabelValues(mvName).Observe(time.Since(t0).Seconds())
|
||
return nil
|
||
}
|
||
|
||
// Start 启动 worker(每个 MV 一个 goroutine + ticker)
|
||
func (m *Materializer) Start(ctx context.Context, interval time.Duration) {
|
||
m.mu.Lock()
|
||
m.running = true
|
||
m.mu.Unlock()
|
||
metrics.WorkerRunningCount.WithLabelValues("materializer").Set(1)
|
||
defer metrics.WorkerRunningCount.WithLabelValues("materializer").Set(0)
|
||
|
||
var wg sync.WaitGroup
|
||
for i, mv := range mvList {
|
||
wg.Add(1)
|
||
go func(idx int, mvName string) {
|
||
defer wg.Done()
|
||
// 错开启动(30s × index)避免同时刷新
|
||
select {
|
||
case <-m.stop:
|
||
return
|
||
case <-time.After(time.Duration(idx*30) * time.Second):
|
||
}
|
||
|
||
ticker := time.NewTicker(interval)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-m.stop:
|
||
return
|
||
case <-ticker.C:
|
||
if err := m.RefreshOne(ctx, mvName); err != nil {
|
||
logger.Logger.Error("RefreshOne failed", zap.String("mv", mvName), zap.Error(err))
|
||
}
|
||
}
|
||
}
|
||
}(i, mv)
|
||
}
|
||
|
||
<-m.stop
|
||
wg.Wait()
|
||
}
|
||
|
||
// Stop 停止 worker
|
||
func (m *Materializer) Stop() {
|
||
m.mu.Lock()
|
||
defer m.mu.Unlock()
|
||
if m.running {
|
||
close(m.stop)
|
||
m.running = false
|
||
}
|
||
}
|