- Event model + ToJSON - EventSink interface + ChannelEventSink (non-blocking Submit) - event_repo: batch INSERT ON CONFLICT DO NOTHING dedup - event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill - event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up - metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock) - partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup) - 22 unit + integration tests (model/repo/service/sink/worker) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
102 lines
2.5 KiB
Go
102 lines
2.5 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
"github.com/topfans/backend/services/statisticService/sink"
|
||
pb "github.com/topfans/backend/pkg/proto/statistic"
|
||
)
|
||
|
||
// 业务侧错误
|
||
var (
|
||
ErrInvalidEventID = errors.New("invalid event_id")
|
||
ErrInvalidEventType = errors.New("invalid event_type")
|
||
ErrPropertiesTooLarge = errors.New("properties too large")
|
||
)
|
||
|
||
// MaxPropertiesSize properties JSON 最大 1KB(防止 abuse)
|
||
const MaxPropertiesSize = 1024
|
||
|
||
// DefaultEventTypeWhitelist 默认事件类型白名单
|
||
var DefaultEventTypeWhitelist = []string{
|
||
"asset.like",
|
||
"asset.mint",
|
||
"exhibition.start",
|
||
"exhibition.end",
|
||
"exhibition.revenue",
|
||
"asset.level_up",
|
||
"crystal.change",
|
||
}
|
||
|
||
// EventService 事件业务逻辑层
|
||
type EventService struct {
|
||
sink sink.EventSink
|
||
whiteList map[string]bool
|
||
}
|
||
|
||
// NewEventService 构造 EventService
|
||
func NewEventService(s sink.EventSink, whiteList []string) *EventService {
|
||
wl := make(map[string]bool, len(whiteList))
|
||
for _, t := range whiteList {
|
||
wl[t] = true
|
||
}
|
||
return &EventService{sink: s, whiteList: wl}
|
||
}
|
||
|
||
// validate 校验事件
|
||
func (s *EventService) validate(e *model.Event) error {
|
||
if e == nil {
|
||
return ErrInvalidEventID
|
||
}
|
||
if e.EventID == "" {
|
||
return ErrInvalidEventID
|
||
}
|
||
if !s.whiteList[e.EventType] {
|
||
return fmt.Errorf("%w: %s", ErrInvalidEventType, e.EventType)
|
||
}
|
||
b, _ := json.Marshal(e.Properties)
|
||
if len(b) > MaxPropertiesSize {
|
||
return fmt.Errorf("%w: %d > %d", ErrPropertiesTooLarge, len(b), MaxPropertiesSize)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// TrackEvent 提交单个事件(非阻塞推入 sink)
|
||
func (s *EventService) TrackEvent(ctx context.Context, e *model.Event) (*pb.TrackEventResponse, error) {
|
||
if e.ReceivedAt.IsZero() {
|
||
e.ReceivedAt = time.Now()
|
||
}
|
||
if err := s.validate(e); err != nil {
|
||
return nil, err
|
||
}
|
||
if err := s.sink.Submit(ctx, e); err != nil {
|
||
return &pb.TrackEventResponse{Accepted: 0, Rejected: 1}, nil
|
||
}
|
||
return &pb.TrackEventResponse{Accepted: 1, Rejected: 0}, nil
|
||
}
|
||
|
||
// BatchTrackEvent 批量提交
|
||
func (s *EventService) BatchTrackEvent(ctx context.Context, es []*model.Event) (*pb.TrackEventResponse, error) {
|
||
accepted, rejected := int32(0), int32(0)
|
||
for _, e := range es {
|
||
if e.ReceivedAt.IsZero() {
|
||
e.ReceivedAt = time.Now()
|
||
}
|
||
if err := s.validate(e); err != nil {
|
||
rejected++
|
||
continue
|
||
}
|
||
if err := s.sink.Submit(ctx, e); err != nil {
|
||
rejected++
|
||
} else {
|
||
accepted++
|
||
}
|
||
}
|
||
return &pb.TrackEventResponse{Accepted: accepted, Rejected: rejected}, nil
|
||
}
|