package worker

import (
	"context"
	"sync"
	"time"

	"go.uber.org/zap"

	"github.com/topfans/backend/pkg/logger"
	"github.com/topfans/backend/services/statisticService/metrics"
	"github.com/topfans/backend/services/statisticService/model"
	"github.com/topfans/backend/services/statisticService/repository"
)

const (
	// eventFlusherDefaultInterval replaces a non-positive flush interval,
	// which would otherwise make time.NewTicker panic in Start.
	eventFlusherDefaultInterval = time.Second

	// eventFlusherShutdownTimeout bounds the final flush when the context
	// handed to Start has already been cancelled.
	eventFlusherShutdownTimeout = 5 * time.Second
)

// EventFlusher is a worker that batches events and writes them to storage.
//   - It receives events from a channel.
//   - It flushes once batchSize events are buffered or every interval,
//     whichever comes first.
//   - After each flush it calls MetricRepository.UpsertRecentLevelUp for every
//     event in the batch.
//
// NOTE(review): the original comment says only asset.level_up events update
// metric_recent_level_ups; no filtering happens in this file, so presumably
// UpsertRecentLevelUp filters by event type — confirm.
type EventFlusher struct {
	ch         <-chan *model.Event
	eventRepo  *repository.EventRepository
	metricRepo *repository.MetricRepository
	batchSize  int
	interval   time.Duration

	mu      sync.Mutex // guards running
	running bool       // set by Start, cleared by Stop
	stop    chan struct{}
}

// NewEventFlusher builds an EventFlusher reading from ch.
//
// A batchSize below 1 is treated as 1 (flush on every event) and a
// non-positive interval falls back to eventFlusherDefaultInterval, so that
// Start cannot panic on a bad configuration value.
func NewEventFlusher(
	ch <-chan *model.Event,
	eventRepo *repository.EventRepository,
	metricRepo *repository.MetricRepository,
	batchSize int,
	interval time.Duration,
) *EventFlusher {
	if batchSize < 1 {
		batchSize = 1
	}
	if interval <= 0 {
		interval = eventFlusherDefaultInterval
	}
	return &EventFlusher{
		ch:         ch,
		eventRepo:  eventRepo,
		metricRepo: metricRepo,
		batchSize:  batchSize,
		interval:   interval,
		stop:       make(chan struct{}),
	}
}

// Start runs the worker loop. It blocks until ctx is cancelled, Stop is
// called, or the event channel is closed; in every case the events still
// buffered in the current batch are flushed before returning.
func (f *EventFlusher) Start(ctx context.Context) {
	f.mu.Lock()
	f.running = true
	f.mu.Unlock()

	metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(1)
	defer metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(0)

	batch := make([]*model.Event, 0, f.batchSize)
	ticker := time.NewTicker(f.interval)
	defer ticker.Stop()

	// flushAndReset writes the current batch and empties it for reuse.
	flushAndReset := func() {
		f.flush(ctx, batch)
		// The backing array is reused (not copied); drop the stale pointers
		// so flushed events can be garbage-collected.
		for i := range batch {
			batch[i] = nil
		}
		batch = batch[:0]
	}

	for {
		select {
		case <-ctx.Done():
			f.flushOnExit(ctx, batch)
			return
		case <-f.stop:
			f.flushOnExit(ctx, batch)
			return
		case e, ok := <-f.ch:
			if !ok {
				// The channel was closed: no more events will arrive.
				// Without this check the receive would yield nil forever.
				f.flushOnExit(ctx, batch)
				return
			}
			metrics.EventChannelSize.Set(float64(len(f.ch)))
			if e == nil {
				// A nil event cannot be persisted and would panic on
				// e.EventID in flush.
				continue
			}
			batch = append(batch, e)
			if len(batch) >= f.batchSize {
				flushAndReset()
			}
		case <-ticker.C:
			flushAndReset()
		}
	}
}

// flushOnExit performs the last flush before Start returns. If ctx is already
// cancelled, a short-lived background context is used instead so the final
// batch is not rejected by context-aware repository calls.
func (f *EventFlusher) flushOnExit(ctx context.Context, batch []*model.Event) {
	if len(batch) == 0 {
		return
	}
	if ctx.Err() != nil {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(context.Background(), eventFlusherShutdownTimeout)
		defer cancel()
	}
	f.flush(ctx, batch)
}

// flush inserts batch through the event repository, records the outcome in
// EventDBInsertTotal, and then calls UpsertRecentLevelUp for each event.
// It does nothing for an empty batch.
func (f *EventFlusher) flush(ctx context.Context, batch []*model.Event) {
	if len(batch) == 0 {
		return
	}

	inserted, err := f.eventRepo.InsertBatch(ctx, batch)
	if err != nil {
		logger.Logger.Error("event_flusher insert failed", zap.Error(err), zap.Int("batch", len(batch)))
		metrics.EventDBInsertTotal.WithLabelValues("failed").Inc()
	} else {
		metrics.EventDBInsertTotal.WithLabelValues("success").Inc()
	}

	// Update metric_recent_level_ups synchronously after the insert attempt.
	// A failure here is only logged and does not abort the rest of the batch.
	// NOTE(review): this also runs when InsertBatch failed — confirm that is
	// intended.
	for _, e := range batch {
		if err := f.metricRepo.UpsertRecentLevelUp(ctx, e); err != nil {
			logger.Logger.Warn("UpsertRecentLevelUp failed", zap.String("event_id", e.EventID), zap.Error(err))
		}
	}

	logger.Logger.Debug("event_flusher batch flushed", zap.Int("inserted", inserted), zap.Int("batch", len(batch)))
}

// Stop signals the worker to exit; Start flushes the buffered batch before
// returning. Stop is safe to call more than once. It has no effect if Start
// has not yet marked the worker as running.
func (f *EventFlusher) Stop() {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.running {
		close(f.stop)
		f.running = false
	}
}