topfans/backend/services/statisticService/worker/event_flusher.go
zerosaturation bed8f8e578 feat(statistic): T4-T8 event collection framework (Event + Sink + Repo + Service + Workers)
- Event model + ToJSON
- EventSink interface + ChannelEventSink (non-blocking Submit)
- event_repo: batch INSERT ON CONFLICT DO NOTHING dedup
- event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill
- event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up
- metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock)
- partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup)
- 22 unit + integration tests (model/repo/service/sink/worker)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 17:20:53 +08:00

112 lines
2.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package worker
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/statisticService/metrics"
"github.com/topfans/backend/services/statisticService/model"
"github.com/topfans/backend/services/statisticService/repository"
)
// EventFlusher 攒批落库 worker
// - 从 channel 接收事件
// - 攒 batchSize 条 或 到 interval 时触发落库
// - 落库后同步触发 metric_recent_level_ups 更新(仅 asset.level_up 事件)
type EventFlusher struct {
ch <-chan *model.Event
eventRepo *repository.EventRepository
metricRepo *repository.MetricRepository
batchSize int
interval time.Duration
mu sync.Mutex
running bool
stop chan struct{}
}
// NewEventFlusher 构造 EventFlusher
func NewEventFlusher(
ch <-chan *model.Event,
eventRepo *repository.EventRepository,
metricRepo *repository.MetricRepository,
batchSize int,
interval time.Duration,
) *EventFlusher {
return &EventFlusher{
ch: ch,
eventRepo: eventRepo,
metricRepo: metricRepo,
batchSize: batchSize,
interval: interval,
stop: make(chan struct{}),
}
}
// Start 启动 worker阻塞直到 ctx 取消或 Stop
func (f *EventFlusher) Start(ctx context.Context) {
f.mu.Lock()
f.running = true
f.mu.Unlock()
metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(1)
defer metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(0)
batch := make([]*model.Event, 0, f.batchSize)
ticker := time.NewTicker(f.interval)
defer ticker.Stop()
flush := func() {
if len(batch) == 0 {
return
}
inserted, err := f.eventRepo.InsertBatch(ctx, batch)
if err != nil {
logger.Logger.Error("event_flusher insert failed", zap.Error(err), zap.Int("batch", len(batch)))
metrics.EventDBInsertTotal.WithLabelValues("failed").Inc()
} else {
metrics.EventDBInsertTotal.WithLabelValues("success").Inc()
}
// 同步触发 metric_recent_level_ups不阻塞落库
for _, e := range batch {
if err := f.metricRepo.UpsertRecentLevelUp(ctx, e); err != nil {
logger.Logger.Warn("UpsertRecentLevelUp failed",
zap.String("event_id", e.EventID), zap.Error(err))
}
}
logger.Logger.Debug("event_flusher batch flushed",
zap.Int("inserted", inserted), zap.Int("batch", len(batch)))
// 复制 batch 避免被覆盖
batch = batch[:0]
}
for {
select {
case <-f.stop:
flush()
return
case e := <-f.ch:
batch = append(batch, e)
metrics.EventChannelSize.Set(float64(len(f.ch)))
if len(batch) >= f.batchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}
// Stop 停止 worker会先 flush 残留事件再退出)
func (f *EventFlusher) Stop() {
f.mu.Lock()
defer f.mu.Unlock()
if f.running {
close(f.stop)
f.running = false
}
}