package sink

import (
	"context"
	"errors"

	"github.com/topfans/backend/services/statisticService/model"
)

// Sentinel errors returned by ChannelEventSink. Match them with errors.Is.
var (
	// ErrChannelFull is returned when the event channel cannot accept another
	// event without blocking.
	ErrChannelFull = errors.New("event channel full")

	// ErrNilEvent is returned when a nil event is submitted.
	ErrNilEvent = errors.New("event is nil")
)

// ChannelEventSink pushes events onto a channel so that a separate consumer
// (the event flusher, per the original design note) can drain them
// asynchronously.
type ChannelEventSink struct {
	// ch is send-only: the sink never receives from or closes the channel.
	ch chan<- *model.Event
}

// NewChannelEventSink returns a sink that forwards submitted events to ch.
//
// A nil channel is never ready for a send, so a sink built with a nil ch
// reports ErrChannelFull for every non-nil event.
func NewChannelEventSink(ch chan<- *model.Event) *ChannelEventSink {
	return &ChannelEventSink{ch: ch}
}

// Submit enqueues e without blocking.
//
// It returns ErrNilEvent when e is nil and ErrChannelFull when the channel
// cannot take the event immediately. ctx is not consulted, because the send
// never waits.
func (s *ChannelEventSink) Submit(ctx context.Context, e *model.Event) error {
	if e == nil {
		return ErrNilEvent
	}

	select {
	case s.ch <- e:
		return nil
	default:
		// Channel is full (or nil): give up right away rather than block the caller.
		return ErrChannelFull
	}
}

// SubmitBatch submits the events in order and stops at the first failure,
// returning that error unchanged.
//
// Events sent before the failing one stay in the channel; there is no
// rollback. A nil or empty slice is a no-op that returns nil.
func (s *ChannelEventSink) SubmitBatch(ctx context.Context, es []*model.Event) error {
	for _, e := range es {
		if err := s.Submit(ctx, e); err != nil {
			return err
		}
	}
	return nil
}

// Close is a no-op that always returns nil. The sink holds no state of its
// own and does not close the underlying channel.
func (s *ChannelEventSink) Close() error {
	return nil
}