- Event model + ToJSON - EventSink interface + ChannelEventSink (non-blocking Submit) - event_repo: batch INSERT ON CONFLICT DO NOTHING dedup - event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill - event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up - metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock) - partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup) - 22 unit + integration tests (model/repo/service/sink/worker) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
138 lines
3.9 KiB
Go
138 lines
3.9 KiB
Go
package worker
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
|
||
"go.uber.org/zap"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"github.com/topfans/backend/services/statisticService/metrics"
|
||
)
|
||
|
||
// Partitioner manages the daily partitions of the events table.
//
// Its responsibilities, as carried out by the methods of this type:
//   - create the partitions for the coming days when the worker starts;
//   - re-run that pre-creation once a day, around 00:05;
//   - drop partitions older than the retention period once a day, around 00:30.
type Partitioner struct {
	// db is the handle every partition statement is executed on.
	db *sql.DB
	// schema is the schema name interpolated into the partition statements.
	schema string
	// preCreateDays is how many days ahead Start asks EnsureFuturePartitions to cover.
	preCreateDays int
	// retentionDays is the age, in days, beyond which CleanupOldPartitions drops a partition.
	retentionDays int

	// mu guards running.
	mu sync.Mutex
	// running is set by Start and cleared by Stop.
	running bool
	// stop is closed by Stop to make the Start loop return.
	stop chan struct{}
}
|
||
|
||
// NewPartitioner 构造
|
||
func NewPartitioner(db *sql.DB, schema string, preCreateDays, retentionDays int) *Partitioner {
|
||
return &Partitioner{
|
||
db: db,
|
||
schema: schema,
|
||
preCreateDays: preCreateDays,
|
||
retentionDays: retentionDays,
|
||
stop: make(chan struct{}),
|
||
}
|
||
}
|
||
|
||
// EnsureFuturePartitions 创建未来 days 天的分区(含今天)
|
||
func (p *Partitioner) EnsureFuturePartitions(ctx context.Context, days int) error {
|
||
now := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600))
|
||
for i := 0; i <= days; i++ {
|
||
d := now.AddDate(0, 0, i)
|
||
next := d.AddDate(0, 0, 1)
|
||
name := fmt.Sprintf("events_%s", d.Format("2006_01_02"))
|
||
sqlStr := fmt.Sprintf(`
|
||
CREATE TABLE IF NOT EXISTS %s.%s PARTITION OF %s.events
|
||
FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')
|
||
`, p.schema, name, p.schema, d.Format("2006-01-02"), next.Format("2006-01-02"))
|
||
if _, err := p.db.ExecContext(ctx, sqlStr); err != nil {
|
||
return fmt.Errorf("create partition %s: %w", name, err)
|
||
}
|
||
metrics.EventsPartitionCount.Inc()
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// CleanupOldPartitions 删除超过 retentionDays 的旧分区
|
||
func (p *Partitioner) CleanupOldPartitions(ctx context.Context) error {
|
||
cutoff := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600)).AddDate(0, 0, -p.retentionDays)
|
||
cutoffName := fmt.Sprintf("events_%s", cutoff.Format("2006_01_02"))
|
||
|
||
// LIKE 'events_%'(% 作为 LIKE 通配符匹配任何后缀)
|
||
// 用字符串拼接避免与 fmt.Sprintf 的 % 转义冲突
|
||
rows, err := p.db.QueryContext(ctx,
|
||
"SELECT tablename FROM pg_tables WHERE schemaname = $1 AND tablename LIKE 'events_%' AND tablename < $2",
|
||
p.schema, cutoffName)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer rows.Close()
|
||
for rows.Next() {
|
||
var name string
|
||
if err := rows.Scan(&name); err != nil {
|
||
logger.Logger.Warn("scan partition name failed", zap.Error(err))
|
||
continue
|
||
}
|
||
if _, err := p.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s CASCADE", p.schema, name)); err != nil {
|
||
logger.Logger.Warn("drop partition failed", zap.String("name", name), zap.Error(err))
|
||
} else {
|
||
logger.Logger.Info("dropped old partition", zap.String("name", name))
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Start 启动 worker(阻塞)
|
||
// - 启动时确保未来 N 天分区
|
||
// - 每小时检查是否到了 00:05(创建)或 00:30(清理)窗口
|
||
func (p *Partitioner) Start(ctx context.Context) {
|
||
p.mu.Lock()
|
||
p.running = true
|
||
p.mu.Unlock()
|
||
metrics.WorkerRunningCount.WithLabelValues("partitioner").Set(1)
|
||
defer metrics.WorkerRunningCount.WithLabelValues("partitioner").Set(0)
|
||
|
||
// 启动时跑一次
|
||
if err := p.EnsureFuturePartitions(ctx, p.preCreateDays); err != nil {
|
||
logger.Logger.Error("EnsureFuturePartitions failed at startup", zap.Error(err))
|
||
}
|
||
|
||
ticker := time.NewTicker(1 * time.Hour)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-p.stop:
|
||
return
|
||
case <-ticker.C:
|
||
hour := time.Now().Hour()
|
||
min := time.Now().Minute()
|
||
if hour == 0 && min < 10 {
|
||
if err := p.EnsureFuturePartitions(ctx, p.preCreateDays); err != nil {
|
||
logger.Logger.Error("EnsureFuturePartitions failed", zap.Error(err))
|
||
}
|
||
}
|
||
if hour == 0 && min >= 30 && min < 40 {
|
||
if err := p.CleanupOldPartitions(ctx); err != nil {
|
||
logger.Logger.Error("CleanupOldPartitions failed", zap.Error(err))
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Stop 停止 worker
|
||
func (p *Partitioner) Stop() {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
if p.running {
|
||
close(p.stop)
|
||
p.running = false
|
||
}
|
||
}
|