topfans/backend/services/statisticService/sink/channel_sink.go
zerosaturation bed8f8e578 feat(statistic): T4-T8 event collection framework (Event + Sink + Repo + Service + Workers)
- Event model + ToJSON
- EventSink interface + ChannelEventSink (non-blocking Submit)
- event_repo: batch INSERT ON CONFLICT DO NOTHING dedup
- event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill
- event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up
- metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock)
- partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup)
- 22 unit + integration tests (model/repo/service/sink/worker)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 17:20:53 +08:00

48 lines
1.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package sink
import (
"context"
"errors"
"github.com/topfans/backend/services/statisticService/model"
)
// ErrChannelFull channel 满时返回
var ErrChannelFull = errors.New("event channel full")
// ChannelEventSink 把事件推到 channel由 event_flusher 异步消费
type ChannelEventSink struct {
ch chan<- *model.Event
}
// NewChannelEventSink 构造 channel sink
func NewChannelEventSink(ch chan<- *model.Event) *ChannelEventSink {
return &ChannelEventSink{ch: ch}
}
// Submit 非阻塞提交channel 满时立即返回 ErrChannelFull
func (s *ChannelEventSink) Submit(ctx context.Context, e *model.Event) error {
if e == nil {
return errors.New("event is nil")
}
select {
case s.ch <- e:
return nil
default:
return ErrChannelFull
}
}
// SubmitBatch 批量提交,任一失败立即停止
func (s *ChannelEventSink) SubmitBatch(ctx context.Context, es []*model.Event) error {
for _, e := range es {
if err := s.Submit(ctx, e); err != nil {
return err
}
}
return nil
}
// Close 关闭channel sink 无状态no-op
func (s *ChannelEventSink) Close() error { return nil }