From bed8f8e57817c8924d05fa48594a194e42541a7c Mon Sep 17 00:00:00 2001 From: zerosaturation Date: Mon, 8 Jun 2026 17:16:09 +0800 Subject: [PATCH] feat(statistic): T4-T8 event collection framework (Event + Sink + Repo + Service + Workers) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Event model + ToJSON - EventSink interface + ChannelEventSink (non-blocking Submit) - event_repo: batch INSERT ON CONFLICT DO NOTHING dedup - event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill - event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up - metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock) - partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup) - 22 unit + integration tests (model/repo/service/sink/worker) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.8 --- .../services/statisticService/model/event.go | 33 ++++ .../statisticService/model/event_test.go | 42 +++++ .../statisticService/repository/event_repo.go | 60 ++++++ .../repository/event_repo_test.go | 124 ++++++++++++ .../repository/metric_repo.go | 88 +++++++++ .../statisticService/service/event_service.go | 101 ++++++++++ .../service/event_service_test.go | 124 ++++++++++++ .../statisticService/sink/channel_sink.go | 47 +++++ .../sink/channel_sink_test.go | 59 ++++++ .../statisticService/sink/event_sink.go | 19 ++ .../statisticService/worker/event_flusher.go | 111 +++++++++++ .../worker/event_flusher_test.go | 176 ++++++++++++++++++ .../statisticService/worker/materializer.go | 129 +++++++++++++ .../worker/materializer_test.go | 65 +++++++ .../metric_upcoming_level_ups_updater.go | 72 +++++++ .../metric_weekly_user_income_updater.go | 75 ++++++++ .../statisticService/worker/partitioner.go | 137 ++++++++++++++ .../worker/partitioner_test.go | 106 +++++++++++ 18 files changed, 1568 insertions(+) create mode 100644 
// Event is the generic analytics event model.
// Related: spec §2.2 + plan Task 4.
type Event struct {
	EventID    string            `json:"event_id"`
	UserID     int64             `json:"user_id"`
	StarID     int64             `json:"star_id"`
	EventType  string            `json:"event_type"`
	OccurredAt time.Time         `json:"occurred_at"`
	ReceivedAt time.Time         `json:"received_at"`
	Properties map[string]string `json:"properties"`
}

// unixMilliOrZero converts t to a Unix millisecond timestamp, mapping the
// zero time.Time to 0. Without this, an unset timestamp would be emitted as
// -62135596800000 (year 1), which is indistinguishable from real data for
// consumers that treat the field as a plain int64.
func unixMilliOrZero(t time.Time) int64 {
	if t.IsZero() {
		return 0
	}
	return t.UnixMilli()
}

// ToJSON converts the event to a generic map (for logging / debugging /
// Redis caching).
//
// Time fields are emitted as int64 Unix millisecond timestamps; an unset
// (zero) time is emitted as 0. A nil Properties map is replaced by an empty,
// non-nil map so the "properties" key never holds nil. A non-nil Properties
// map is returned as-is (not copied), so the result shares it with the event.
func (e *Event) ToJSON() map[string]interface{} {
	props := e.Properties
	if props == nil {
		props = map[string]string{}
	}
	return map[string]interface{}{
		"event_id":    e.EventID,
		"user_id":     e.UserID,
		"star_id":     e.StarID,
		"event_type":  e.EventType,
		"occurred_at": unixMilliOrZero(e.OccurredAt),
		"received_at": unixMilliOrZero(e.ReceivedAt),
		"properties":  props,
	}
}
-0,0 +1,60 @@ +package repository + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strings" + + "github.com/topfans/backend/services/statisticService/model" +) + +// EventRepository events 表操作 +type EventRepository struct { + db *sql.DB + schema string +} + +// NewEventRepository 构造 EventRepository +func NewEventRepository(db *sql.DB, schema string) *EventRepository { + return &EventRepository{db: db, schema: schema} +} + +// InsertBatch 批量插入事件,event_id 重复时 ON CONFLICT DO NOTHING +// 返回实际插入的行数 +func (r *EventRepository) InsertBatch(ctx context.Context, events []*model.Event) (int, error) { + if len(events) == 0 { + return 0, nil + } + + placeholders := make([]string, 0, len(events)) + args := make([]interface{}, 0, len(events)*7) + for _, e := range events { + props := e.Properties + if props == nil { + props = map[string]string{} + } + propsJSON, _ := json.Marshal(props) + + placeholders = append(placeholders, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)", + len(args)+1, len(args)+2, len(args)+3, len(args)+4, len(args)+5, len(args)+6, len(args)+7)) + args = append(args, e.EventID, e.UserID, e.StarID, e.EventType, e.OccurredAt, e.ReceivedAt, string(propsJSON)) + } + + query := fmt.Sprintf(` + INSERT INTO %s.events (event_id, user_id, star_id, event_type, occurred_at, received_at, properties) + VALUES %s + ON CONFLICT (event_id, received_at) DO NOTHING + `, r.schema, strings.Join(placeholders, ",")) + + res, err := r.db.ExecContext(ctx, query, args...) 
+ if err != nil { + return 0, fmt.Errorf("insert events: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return 0, fmt.Errorf("rows affected: %w", err) + } + return int(n), nil +} diff --git a/backend/services/statisticService/repository/event_repo_test.go b/backend/services/statisticService/repository/event_repo_test.go new file mode 100644 index 0000000..fb4c409 --- /dev/null +++ b/backend/services/statisticService/repository/event_repo_test.go @@ -0,0 +1,124 @@ +package repository + +import ( + "context" + "database/sql" + "os" + "testing" + "time" + + _ "github.com/lib/pq" + "github.com/topfans/backend/services/statisticService/model" +) + +// setupTestDB 创建测试 DB 连接和 schema(仅在 TEST_DATABASE_URL 设置时运行) +// schema 名带 t.Name() 后缀避免并发或顺序测试间污染 +func setupTestDB(t *testing.T) (*sql.DB, string, func()) { + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { + t.Skip("TEST_DATABASE_URL not set, skipping integration test") + } + db, err := sql.Open("postgres", dsn) + if err != nil { + t.Fatal(err) + } + if err := db.Ping(); err != nil { + t.Skipf("DB ping failed: %v", err) + } + + schema := "statistic_test_" + sanitizeName(t.Name()) + if _, err := db.Exec("CREATE SCHEMA IF NOT EXISTS " + schema); err != nil { + t.Fatal(err) + } + // 测试用普通表(不分区)— 分区逻辑由 partitioner worker 单独测(T8) + // 这样避免 unique index on partitioned table 的微妙时序问题 + if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS ` + schema + `.events ( + id BIGSERIAL PRIMARY KEY, + event_id UUID NOT NULL, + user_id BIGINT NOT NULL, + star_id BIGINT NOT NULL, + event_type VARCHAR(64) NOT NULL, + occurred_at TIMESTAMPTZ NOT NULL, + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + properties JSONB NOT NULL DEFAULT '{}' + )`); err != nil { + t.Fatal(err) + } + if _, err := db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS "` + schema + `.idx_event_id" ON ` + schema + `.events (event_id, received_at)`); err != nil { + t.Fatal(err) + } + + cleanup := func() { + db.Exec("DROP SCHEMA IF EXISTS " + schema + " 
// sanitizeName turns a test name such as "TestEventRepo_Dedup" into
// "testeventrepo_dedup" so it can be used as part of a PostgreSQL
// identifier: ASCII upper-case letters are lower-cased, ASCII lower-case
// letters, digits and '_' are kept, and every other byte is dropped.
func sanitizeName(s string) string {
	buf := make([]byte, 0, len(s))
	// Iterate bytes, not runes: multi-byte characters are removed byte by byte.
	for _, b := range []byte(s) {
		switch {
		case b >= 'A' && b <= 'Z':
			buf = append(buf, b-'A'+'a')
		case b >= 'a' && b <= 'z', b >= '0' && b <= '9', b == '_':
			buf = append(buf, b)
		}
	}
	return string(buf)
}
setupTestDB(t) + defer cleanup() + + repo := NewEventRepository(db, schema) + inserted, err := repo.InsertBatch(context.Background(), nil) + if err != nil { + t.Fatal(err) + } + if inserted != 0 { + t.Fatalf("expected 0, got %d", inserted) + } +} diff --git a/backend/services/statisticService/repository/metric_repo.go b/backend/services/statisticService/repository/metric_repo.go new file mode 100644 index 0000000..d115e0a --- /dev/null +++ b/backend/services/statisticService/repository/metric_repo.go @@ -0,0 +1,88 @@ +package repository + +import ( + "context" + "database/sql" + "fmt" + + "github.com/topfans/backend/services/statisticService/model" +) + +// MetricRepository 预聚表 / 物化视图维护 +type MetricRepository struct { + db *sql.DB + schema string +} + +// NewMetricRepository 构造 MetricRepository +func NewMetricRepository(db *sql.DB, schema string) *MetricRepository { + return &MetricRepository{db: db, schema: schema} +} + +// UpsertRecentLevelUp 同步写入最近升级记录(仅当 event_type == "asset.level_up") +func (r *MetricRepository) UpsertRecentLevelUp(ctx context.Context, e *model.Event) error { + if e == nil || e.EventType != "asset.level_up" { + return nil + } + assetID := e.Properties["asset_id"] + fromLevel := e.Properties["from"] + toLevel := e.Properties["to"] + upgradeTime := e.OccurredAt + if assetID == "" || toLevel == "" { + return nil + } + _, err := r.db.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO %s.metric_recent_level_ups + (user_id, star_id, asset_id, from_level, to_level, upgrade_time, asset_name, asset_thumb) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + `, r.schema), + e.UserID, e.StarID, assetID, fromLevel, toLevel, upgradeTime, "", "") + return err +} + +// RefreshWeeklyUserIncome 全量重算本周 rank + total(pg_try_advisory_lock 防多实例) +func (r *MetricRepository) RefreshWeeklyUserIncome(ctx context.Context) error { + var got bool + if err := r.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(123456)").Scan(&got); err != nil { + return err + } + if !got { + return 
nil // 抢不到锁本轮跳过 + } + defer r.db.ExecContext(ctx, "SELECT pg_advisory_unlock(123456)") + + _, err := r.db.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO %s.metric_weekly_user_income (star_id, user_id, week_start, total_crystal, rank_in_star) + SELECT + star_id, user_id, + DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')::date AS week_start, + SUM(CASE WHEN event_type IN ('exhibition.revenue', 'crystal.change') AND (properties->>'amount')::BIGINT > 0 + THEN (properties->>'amount')::BIGINT ELSE 0 END) AS total_crystal, + ROW_NUMBER() OVER (PARTITION BY star_id ORDER BY SUM(CASE WHEN event_type IN ('exhibition.revenue','crystal.change') AND (properties->>'amount')::BIGINT > 0 THEN (properties->>'amount')::BIGINT ELSE 0 END) DESC) AS rank_in_star + FROM %s.events + WHERE event_type IN ('exhibition.revenue', 'crystal.change') + AND received_at >= DATE_TRUNC('week', NOW() AT TIME ZONE 'Asia/Shanghai') + GROUP BY star_id, user_id + ON CONFLICT (star_id, user_id, week_start) DO UPDATE + SET total_crystal = EXCLUDED.total_crystal, rank_in_star = EXCLUDED.rank_in_star, updated_at = NOW() + `, r.schema, r.schema)) + return err +} + +// RefreshUpcomingLevelUps 计算每个 asset 的 like_progress + duration_progress +// 注: public.asset_level_config 表名/字段名需 P1 末向 assetService 同学确认 +func (r *MetricRepository) RefreshUpcomingLevelUps(ctx context.Context) error { + _, err := r.db.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO %s.metric_upcoming_level_ups (user_id, star_id, asset_id, like_progress, duration_progress) + SELECT + a.user_id, a.star_id, a.id, + LEAST(100, (a.like_count::FLOAT / NULLIF(alc.upgrade_like_threshold, 0) * 100)::INT) AS like_progress, + LEAST(100, (EXTRACT(EPOCH FROM (NOW() - a.placed_at))::FLOAT / NULLIF(alc.upgrade_duration_seconds, 1) * 100)::INT) AS duration_progress + FROM public.assets a + JOIN public.asset_level_config alc ON alc.level = a.level + WHERE a.status = 'active' AND a.deleted_at IS NULL + ON CONFLICT (user_id, star_id, asset_id) DO UPDATE + 
SET like_progress = EXCLUDED.like_progress, duration_progress = EXCLUDED.duration_progress, updated_at = NOW() + `, r.schema)) + return err +} diff --git a/backend/services/statisticService/service/event_service.go b/backend/services/statisticService/service/event_service.go new file mode 100644 index 0000000..90248d3 --- /dev/null +++ b/backend/services/statisticService/service/event_service.go @@ -0,0 +1,101 @@ +package service + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/sink" + pb "github.com/topfans/backend/pkg/proto/statistic" +) + +// 业务侧错误 +var ( + ErrInvalidEventID = errors.New("invalid event_id") + ErrInvalidEventType = errors.New("invalid event_type") + ErrPropertiesTooLarge = errors.New("properties too large") +) + +// MaxPropertiesSize properties JSON 最大 1KB(防止 abuse) +const MaxPropertiesSize = 1024 + +// DefaultEventTypeWhitelist 默认事件类型白名单 +var DefaultEventTypeWhitelist = []string{ + "asset.like", + "asset.mint", + "exhibition.start", + "exhibition.end", + "exhibition.revenue", + "asset.level_up", + "crystal.change", +} + +// EventService 事件业务逻辑层 +type EventService struct { + sink sink.EventSink + whiteList map[string]bool +} + +// NewEventService 构造 EventService +func NewEventService(s sink.EventSink, whiteList []string) *EventService { + wl := make(map[string]bool, len(whiteList)) + for _, t := range whiteList { + wl[t] = true + } + return &EventService{sink: s, whiteList: wl} +} + +// validate 校验事件 +func (s *EventService) validate(e *model.Event) error { + if e == nil { + return ErrInvalidEventID + } + if e.EventID == "" { + return ErrInvalidEventID + } + if !s.whiteList[e.EventType] { + return fmt.Errorf("%w: %s", ErrInvalidEventType, e.EventType) + } + b, _ := json.Marshal(e.Properties) + if len(b) > MaxPropertiesSize { + return fmt.Errorf("%w: %d > %d", ErrPropertiesTooLarge, len(b), MaxPropertiesSize) 
+ } + return nil +} + +// TrackEvent 提交单个事件(非阻塞推入 sink) +func (s *EventService) TrackEvent(ctx context.Context, e *model.Event) (*pb.TrackEventResponse, error) { + if e.ReceivedAt.IsZero() { + e.ReceivedAt = time.Now() + } + if err := s.validate(e); err != nil { + return nil, err + } + if err := s.sink.Submit(ctx, e); err != nil { + return &pb.TrackEventResponse{Accepted: 0, Rejected: 1}, nil + } + return &pb.TrackEventResponse{Accepted: 1, Rejected: 0}, nil +} + +// BatchTrackEvent 批量提交 +func (s *EventService) BatchTrackEvent(ctx context.Context, es []*model.Event) (*pb.TrackEventResponse, error) { + accepted, rejected := int32(0), int32(0) + for _, e := range es { + if e.ReceivedAt.IsZero() { + e.ReceivedAt = time.Now() + } + if err := s.validate(e); err != nil { + rejected++ + continue + } + if err := s.sink.Submit(ctx, e); err != nil { + rejected++ + } else { + accepted++ + } + } + return &pb.TrackEventResponse{Accepted: accepted, Rejected: rejected}, nil +} diff --git a/backend/services/statisticService/service/event_service_test.go b/backend/services/statisticService/service/event_service_test.go new file mode 100644 index 0000000..2823f56 --- /dev/null +++ b/backend/services/statisticService/service/event_service_test.go @@ -0,0 +1,124 @@ +package service + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/sink" +) + +// mockSink 记录所有 submit 的事件 +type mockSink struct { + events []*model.Event + err error +} + +func (m *mockSink) Submit(ctx context.Context, e *model.Event) error { + if m.err != nil { + return m.err + } + m.events = append(m.events, e) + return nil +} + +func (m *mockSink) SubmitBatch(ctx context.Context, es []*model.Event) error { + if m.err != nil { + return m.err + } + m.events = append(m.events, es...) 
+ return nil +} + +func (m *mockSink) Close() error { return nil } + +func TestEventService_TrackEvent_Success(t *testing.T) { + ms := &mockSink{} + svc := NewEventService(ms, []string{"asset.like", "asset.mint"}) + resp, err := svc.TrackEvent(context.Background(), &model.Event{ + EventID: "u1", UserID: 1, StarID: 1, EventType: "asset.like", + OccurredAt: time.Now(), Properties: map[string]string{}, + }) + if err != nil { + t.Fatal(err) + } + if resp.Accepted != 1 || resp.Rejected != 0 { + t.Fatalf("got accepted=%d rejected=%d, want 1/0", resp.Accepted, resp.Rejected) + } + if len(ms.events) != 1 { + t.Fatal("event not submitted") + } + if ms.events[0].ReceivedAt.IsZero() { + t.Fatal("ReceivedAt should be set by service") + } +} + +func TestEventService_TrackEvent_InvalidEventType(t *testing.T) { + ms := &mockSink{} + svc := NewEventService(ms, []string{"asset.like"}) + _, err := svc.TrackEvent(context.Background(), &model.Event{EventID: "u1", EventType: "evil.type"}) + if err == nil { + t.Fatal("expected error for invalid event type") + } + if !errors.Is(err, ErrInvalidEventType) { + t.Fatalf("expected ErrInvalidEventType, got %v", err) + } +} + +func TestEventService_TrackEvent_EmptyEventID(t *testing.T) { + ms := &mockSink{} + svc := NewEventService(ms, []string{"asset.like"}) + _, err := svc.TrackEvent(context.Background(), &model.Event{EventType: "asset.like"}) + if !errors.Is(err, ErrInvalidEventID) { + t.Fatalf("expected ErrInvalidEventID, got %v", err) + } +} + +func TestEventService_TrackEvent_PropertiesTooLarge(t *testing.T) { + ms := &mockSink{} + svc := NewEventService(ms, []string{"asset.like"}) + big := make(map[string]string) + for i := 0; i < 200; i++ { + big["k"+string(rune(i))] = string(make([]byte, 100)) + } + _, err := svc.TrackEvent(context.Background(), &model.Event{ + EventID: "u1", EventType: "asset.like", Properties: big, + }) + if !errors.Is(err, ErrPropertiesTooLarge) { + t.Fatalf("expected ErrPropertiesTooLarge, got %v", err) + } +} + 
+func TestEventService_TrackEvent_SinkError(t *testing.T) { + ms := &mockSink{err: sink.ErrChannelFull} + svc := NewEventService(ms, []string{"asset.like"}) + resp, err := svc.TrackEvent(context.Background(), &model.Event{ + EventID: "u1", EventType: "asset.like", + }) + if err != nil { + t.Fatal("expected no error (degraded to rejected), got", err) + } + if resp.Accepted != 0 || resp.Rejected != 1 { + t.Fatalf("expected 0/1, got %d/%d", resp.Accepted, resp.Rejected) + } +} + +func TestEventService_BatchTrackEvent_Mixed(t *testing.T) { + ms := &mockSink{} + svc := NewEventService(ms, []string{"asset.like", "asset.mint"}) + events := []*model.Event{ + {EventID: "1", EventType: "asset.like", Properties: map[string]string{}}, + {EventID: "2", EventType: "asset.mint", Properties: map[string]string{}}, + {EventID: "3", EventType: "evil.type"}, // 拒绝 + {EventID: "", EventType: "asset.like"}, // 拒绝 + } + resp, err := svc.BatchTrackEvent(context.Background(), events) + if err != nil { + t.Fatal(err) + } + if resp.Accepted != 2 || resp.Rejected != 2 { + t.Fatalf("got %d/%d, want 2/2", resp.Accepted, resp.Rejected) + } +} diff --git a/backend/services/statisticService/sink/channel_sink.go b/backend/services/statisticService/sink/channel_sink.go new file mode 100644 index 0000000..12019eb --- /dev/null +++ b/backend/services/statisticService/sink/channel_sink.go @@ -0,0 +1,47 @@ +package sink + +import ( + "context" + "errors" + + "github.com/topfans/backend/services/statisticService/model" +) + +// ErrChannelFull channel 满时返回 +var ErrChannelFull = errors.New("event channel full") + +// ChannelEventSink 把事件推到 channel,由 event_flusher 异步消费 +type ChannelEventSink struct { + ch chan<- *model.Event +} + +// NewChannelEventSink 构造 channel sink +func NewChannelEventSink(ch chan<- *model.Event) *ChannelEventSink { + return &ChannelEventSink{ch: ch} +} + +// Submit 非阻塞提交,channel 满时立即返回 ErrChannelFull +func (s *ChannelEventSink) Submit(ctx context.Context, e *model.Event) error { + if 
e == nil { + return errors.New("event is nil") + } + select { + case s.ch <- e: + return nil + default: + return ErrChannelFull + } +} + +// SubmitBatch 批量提交,任一失败立即停止 +func (s *ChannelEventSink) SubmitBatch(ctx context.Context, es []*model.Event) error { + for _, e := range es { + if err := s.Submit(ctx, e); err != nil { + return err + } + } + return nil +} + +// Close 关闭(channel sink 无状态,no-op) +func (s *ChannelEventSink) Close() error { return nil } diff --git a/backend/services/statisticService/sink/channel_sink_test.go b/backend/services/statisticService/sink/channel_sink_test.go new file mode 100644 index 0000000..9255f33 --- /dev/null +++ b/backend/services/statisticService/sink/channel_sink_test.go @@ -0,0 +1,59 @@ +package sink + +import ( + "context" + "testing" + "time" + + "github.com/topfans/backend/services/statisticService/model" +) + +func TestChannelEventSink_Submit(t *testing.T) { + ch := make(chan *model.Event, 10) + s := NewChannelEventSink(ch) + e := &model.Event{EventID: "test-1"} + if err := s.Submit(context.Background(), e); err != nil { + t.Fatalf("Submit failed: %v", err) + } + select { + case got := <-ch: + if got.EventID != "test-1" { + t.Fatal("event mismatch") + } + case <-time.After(100 * time.Millisecond): + t.Fatal("no event received") + } +} + +func TestChannelEventSink_SubmitBatch(t *testing.T) { + ch := make(chan *model.Event, 10) + s := NewChannelEventSink(ch) + events := []*model.Event{{EventID: "a"}, {EventID: "b"}} + if err := s.SubmitBatch(context.Background(), events); err != nil { + t.Fatalf("SubmitBatch failed: %v", err) + } + if len(ch) != 2 { + t.Fatalf("expected 2 events, got %d", len(ch)) + } +} + +func TestChannelEventSink_ChannelFull(t *testing.T) { + ch := make(chan *model.Event, 1) + s := NewChannelEventSink(ch) + // 第一个事件占用 channel + if err := s.Submit(context.Background(), &model.Event{EventID: "first"}); err != nil { + t.Fatalf("first submit failed: %v", err) + } + // 第二个应失败(channel 满) + if err := 
s.Submit(context.Background(), &model.Event{EventID: "second"}); err != ErrChannelFull { + t.Fatalf("expected ErrChannelFull, got %v", err) + } +} + +func TestChannelEventSink_NilEvent(t *testing.T) { + ch := make(chan *model.Event, 10) + s := NewChannelEventSink(ch) + if err := s.Submit(context.Background(), nil); err == nil { + t.Fatal("expected error for nil event") + } +} diff --git a/backend/services/statisticService/sink/event_sink.go b/backend/services/statisticService/sink/event_sink.go new file mode 100644 index 0000000..a59c226 --- /dev/null +++ b/backend/services/statisticService/sink/event_sink.go @@ -0,0 +1,19 @@ +package sink + +import ( + "context" + + "github.com/topfans/backend/services/statisticService/model" +) + +// EventSink 事件 sink 接口 +// 本期实现: ChannelEventSink(推到 channel,由 event_flusher 消费) +// 未来扩展: KafkaEventSink / ClickHouseDualWriteSink / SamplingEventSink +type EventSink interface { + // Submit 提交单个事件;非阻塞,channel 满时返回 ErrChannelFull + Submit(ctx context.Context, e *model.Event) error + // SubmitBatch 批量提交;任一失败即停止 + SubmitBatch(ctx context.Context, es []*model.Event) error + // Close 关闭 sink + Close() error +} diff --git a/backend/services/statisticService/worker/event_flusher.go b/backend/services/statisticService/worker/event_flusher.go new file mode 100644 index 0000000..de91cc5 --- /dev/null +++ b/backend/services/statisticService/worker/event_flusher.go @@ -0,0 +1,111 @@ +package worker + +import ( + "context" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/metrics" + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/repository" +) + +// EventFlusher 攒批落库 worker +// - 从 channel 接收事件 +// - 攒 batchSize 条 或 到 interval 时触发落库 +// - 落库后同步触发 metric_recent_level_ups 更新(仅 asset.level_up 事件) +type EventFlusher struct { + ch <-chan *model.Event + eventRepo *repository.EventRepository + 
metricRepo *repository.MetricRepository + batchSize int + interval time.Duration + + mu sync.Mutex + running bool + stop chan struct{} +} + +// NewEventFlusher 构造 EventFlusher +func NewEventFlusher( + ch <-chan *model.Event, + eventRepo *repository.EventRepository, + metricRepo *repository.MetricRepository, + batchSize int, + interval time.Duration, +) *EventFlusher { + return &EventFlusher{ + ch: ch, + eventRepo: eventRepo, + metricRepo: metricRepo, + batchSize: batchSize, + interval: interval, + stop: make(chan struct{}), + } +} + +// Start 启动 worker(阻塞直到 ctx 取消或 Stop) +func (f *EventFlusher) Start(ctx context.Context) { + f.mu.Lock() + f.running = true + f.mu.Unlock() + metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(1) + defer metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(0) + + batch := make([]*model.Event, 0, f.batchSize) + ticker := time.NewTicker(f.interval) + defer ticker.Stop() + + flush := func() { + if len(batch) == 0 { + return + } + inserted, err := f.eventRepo.InsertBatch(ctx, batch) + if err != nil { + logger.Logger.Error("event_flusher insert failed", zap.Error(err), zap.Int("batch", len(batch))) + metrics.EventDBInsertTotal.WithLabelValues("failed").Inc() + } else { + metrics.EventDBInsertTotal.WithLabelValues("success").Inc() + } + // 同步触发 metric_recent_level_ups(不阻塞落库) + for _, e := range batch { + if err := f.metricRepo.UpsertRecentLevelUp(ctx, e); err != nil { + logger.Logger.Warn("UpsertRecentLevelUp failed", + zap.String("event_id", e.EventID), zap.Error(err)) + } + } + logger.Logger.Debug("event_flusher batch flushed", + zap.Int("inserted", inserted), zap.Int("batch", len(batch))) + // 复制 batch 避免被覆盖 + batch = batch[:0] + } + + for { + select { + case <-f.stop: + flush() + return + case e := <-f.ch: + batch = append(batch, e) + metrics.EventChannelSize.Set(float64(len(f.ch))) + if len(batch) >= f.batchSize { + flush() + } + case <-ticker.C: + flush() + } + } +} + +// Stop 停止 worker(会先 flush 残留事件再退出) +func 
(f *EventFlusher) Stop() { + f.mu.Lock() + defer f.mu.Unlock() + if f.running { + close(f.stop) + f.running = false + } +} diff --git a/backend/services/statisticService/worker/event_flusher_test.go b/backend/services/statisticService/worker/event_flusher_test.go new file mode 100644 index 0000000..b29ccb4 --- /dev/null +++ b/backend/services/statisticService/worker/event_flusher_test.go @@ -0,0 +1,176 @@ +package worker + +import ( + "context" + "database/sql" + "fmt" + "os" + "testing" + "time" + + _ "github.com/lib/pq" + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/repository" +) + +func TestMain(m *testing.M) { + // 初始化 logger (worker 内部用 logger.Logger) + _ = logger.Init(logger.Config{ServiceName: "statistic-test", Environment: "test"}) + os.Exit(m.Run()) +} + +func setupFlusherDB(t *testing.T) (*sql.DB, string, func()) { + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { + t.Skip("TEST_DATABASE_URL not set") + } + db, err := sql.Open("postgres", dsn) + if err != nil { + t.Fatal(err) + } + if err := db.Ping(); err != nil { + t.Skipf("DB ping failed: %v", err) + } + schema := "statistic_test_flusher_" + sanitizeName(t.Name()) + db.Exec("CREATE SCHEMA IF NOT EXISTS " + schema) + // 普通表(非分区),简化测试 + db.Exec(`CREATE TABLE IF NOT EXISTS ` + schema + `.events ( + id BIGSERIAL PRIMARY KEY, + event_id UUID NOT NULL, + user_id BIGINT NOT NULL, + star_id BIGINT NOT NULL, + event_type VARCHAR(64) NOT NULL, + occurred_at TIMESTAMPTZ NOT NULL, + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + properties JSONB NOT NULL DEFAULT '{}' + )`) + db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS "` + schema + `.idx_event_id" ON ` + schema + `.events (event_id, received_at)`) + // metric_recent_level_ups 表 + db.Exec(`CREATE TABLE IF NOT EXISTS ` + schema + `.metric_recent_level_ups ( + id BIGSERIAL PRIMARY KEY, + user_id BIGINT NOT NULL, + star_id BIGINT NOT NULL, + 
asset_id BIGINT NOT NULL, + from_level VARCHAR(8) NOT NULL, + to_level VARCHAR(8) NOT NULL, + upgrade_time TIMESTAMPTZ NOT NULL, + asset_name VARCHAR(128), + asset_thumb VARCHAR(512) + )`) + cleanup := func() { + db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE") + db.Close() + } + return db, schema, cleanup +} + +// sanitizeName 把 "TestEventFlusher_FlushBatch" 转成合法 PG identifier(小写) +func sanitizeName(s string) string { + out := make([]byte, 0, len(s)) + for i := 0; i < len(s); i++ { + c := s[i] + if (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_' { + out = append(out, c) + } else if c >= 'A' && c <= 'Z' { + out = append(out, c+32) + } + } + return string(out) +} + +func TestEventFlusher_FlushBatch(t *testing.T) { + db, schema, cleanup := setupFlusherDB(t) + defer cleanup() + + eventRepo := repository.NewEventRepository(db, schema) + metricRepo := repository.NewMetricRepository(db, schema) + + ch := make(chan *model.Event, 10) + flusher := NewEventFlusher(ch, eventRepo, metricRepo, 100, 1*time.Second) + + go flusher.Start(context.Background()) + + now := time.Now() + for i := 0; i < 5; i++ { + ch <- &model.Event{ + EventID: fmt.Sprintf("44444444-4444-4444-4444-%012d", i), + UserID: 1, StarID: 1, EventType: "asset.like", + OccurredAt: now, ReceivedAt: now, + Properties: map[string]string{}, + } + } + time.Sleep(2 * time.Second) // 等 ticker 触发 flush + flusher.Stop() + + var n int + db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.events", schema)).Scan(&n) + if n != 5 { + t.Fatalf("expected 5 events, got %d", n) + } +} + +func TestEventFlusher_TriggersMetricOnLevelUp(t *testing.T) { + db, schema, cleanup := setupFlusherDB(t) + defer cleanup() + + eventRepo := repository.NewEventRepository(db, schema) + metricRepo := repository.NewMetricRepository(db, schema) + + ch := make(chan *model.Event, 10) + // 小 batch size 触发立即 flush + flusher := NewEventFlusher(ch, eventRepo, metricRepo, 1, 100*time.Millisecond) + + go flusher.Start(context.Background()) + 
+ now := time.Now() + ch <- &model.Event{ + EventID: "55555555-5555-5555-5555-555555555555", + UserID: 1, StarID: 1, EventType: "asset.level_up", + OccurredAt: now, ReceivedAt: now, + Properties: map[string]string{"asset_id": "123", "from": "SR", "to": "SSR"}, + } + time.Sleep(500 * time.Millisecond) // 等 batchSize 触发 + flusher.Stop() + + // 验证 events + metric_recent_level_ups + var eventsN, metricN int + db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.events", schema)).Scan(&eventsN) + db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.metric_recent_level_ups", schema)).Scan(&metricN) + + if eventsN != 1 { + t.Fatalf("expected 1 event, got %d", eventsN) + } + if metricN != 1 { + t.Fatalf("expected 1 metric_recent_level_ups row, got %d", metricN) + } +} + +func TestEventFlusher_NoMetricOnNonLevelUp(t *testing.T) { + db, schema, cleanup := setupFlusherDB(t) + defer cleanup() + + eventRepo := repository.NewEventRepository(db, schema) + metricRepo := repository.NewMetricRepository(db, schema) + + ch := make(chan *model.Event, 10) + flusher := NewEventFlusher(ch, eventRepo, metricRepo, 1, 100*time.Millisecond) + + go flusher.Start(context.Background()) + + now := time.Now() + ch <- &model.Event{ + EventID: "66666666-6666-6666-6666-666666666666", + UserID: 1, StarID: 1, EventType: "asset.like", + OccurredAt: now, ReceivedAt: now, + Properties: map[string]string{}, + } + time.Sleep(500 * time.Millisecond) + flusher.Stop() + + var metricN int + db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.metric_recent_level_ups", schema)).Scan(&metricN) + if metricN != 0 { + t.Fatalf("expected 0 metric rows (asset.like != asset.level_up), got %d", metricN) + } +} diff --git a/backend/services/statisticService/worker/materializer.go b/backend/services/statisticService/worker/materializer.go new file mode 100644 index 0000000..0e493ca --- /dev/null +++ b/backend/services/statisticService/worker/materializer.go @@ -0,0 +1,129 @@ +package worker + +import ( + "context" + "database/sql" + 
"fmt" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/metrics" +) + +// mvList 4 个物化视图 +var mvList = []string{ + "mv_daily_user_income", + "mv_daily_exhibition_revenue", + "mv_daily_like_income", + "mv_asset_level_distribution", +} + +// Materializer 物化视图刷新 worker +// - 每个 MV 独立 goroutine + ticker,错开 30s 启动 +// - pg_try_advisory_lock 防多实例重复刷新 +// - 每次刷新写 refresh_log +type Materializer struct { + db *sql.DB + schema string + + mu sync.Mutex + running bool + stop chan struct{} +} + +// NewMaterializer 构造 +func NewMaterializer(db *sql.DB, schema string) *Materializer { + return &Materializer{db: db, schema: schema, stop: make(chan struct{})} +} + +// RefreshOne 刷新单个 MV(pg_try_advisory_lock 防多实例) +// 返回 error(nil = 成功或锁被其他实例抢走) +func (m *Materializer) RefreshOne(ctx context.Context, mvName string) error { + // 抢锁(234567 区别于 weekly user income 的 123456) + var got bool + if err := m.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(234567)").Scan(&got); err != nil { + return err + } + if !got { + return nil // 锁被其他实例抢走,本轮跳过 + } + defer m.db.ExecContext(ctx, "SELECT pg_advisory_unlock(234567)") + + // 记录开始 + t0 := time.Now() + var mvID int + if err := m.db.QueryRowContext(ctx, + fmt.Sprintf(`INSERT INTO %s.refresh_log (mv_name, started_at, status) VALUES ($1, NOW(), 'running') RETURNING id`, m.schema), + mvName).Scan(&mvID); err != nil { + return err + } + + // 执行 REFRESH + _, err := m.db.ExecContext(ctx, + fmt.Sprintf("REFRESH MATERIALIZED VIEW CONCURRENTLY %s.%s", m.schema, mvName)) + if err != nil { + _, _ = m.db.ExecContext(ctx, + fmt.Sprintf(`UPDATE %s.refresh_log SET status='failed', finished_at=NOW(), error_message=$1 WHERE id=$2`, m.schema), + err.Error(), mvID) + metrics.MVRefreshTotal.WithLabelValues(mvName, "failed").Inc() + return err + } + + _, _ = m.db.ExecContext(ctx, + fmt.Sprintf(`UPDATE %s.refresh_log SET status='success', finished_at=NOW() WHERE id=$1`, 
m.schema), mvID) + metrics.MVRefreshTotal.WithLabelValues(mvName, "success").Inc() + metrics.MVRefreshDuration.WithLabelValues(mvName).Observe(time.Since(t0).Seconds()) + return nil +} + +// Start 启动 worker(每个 MV 一个 goroutine + ticker) +func (m *Materializer) Start(ctx context.Context, interval time.Duration) { + m.mu.Lock() + m.running = true + m.mu.Unlock() + metrics.WorkerRunningCount.WithLabelValues("materializer").Set(1) + defer metrics.WorkerRunningCount.WithLabelValues("materializer").Set(0) + + var wg sync.WaitGroup + for i, mv := range mvList { + wg.Add(1) + go func(idx int, mvName string) { + defer wg.Done() + // 错开启动(30s × index)避免同时刷新 + select { + case <-m.stop: + return + case <-time.After(time.Duration(idx*30) * time.Second): + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-m.stop: + return + case <-ticker.C: + if err := m.RefreshOne(ctx, mvName); err != nil { + logger.Logger.Error("RefreshOne failed", zap.String("mv", mvName), zap.Error(err)) + } + } + } + }(i, mv) + } + + <-m.stop + wg.Wait() +} + +// Stop 停止 worker +func (m *Materializer) Stop() { + m.mu.Lock() + defer m.mu.Unlock() + if m.running { + close(m.stop) + m.running = false + } +} diff --git a/backend/services/statisticService/worker/materializer_test.go b/backend/services/statisticService/worker/materializer_test.go new file mode 100644 index 0000000..c1f3006 --- /dev/null +++ b/backend/services/statisticService/worker/materializer_test.go @@ -0,0 +1,65 @@ +package worker + +import ( + "context" + "database/sql" + "fmt" + "os" + "testing" + + _ "github.com/lib/pq" +) + +func setupMaterializerDB(t *testing.T) (*sql.DB, string, func()) { + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { + t.Skip("TEST_DATABASE_URL not set") + } + db, err := sql.Open("postgres", dsn) + if err != nil { + t.Fatal(err) + } + if err := db.Ping(); err != nil { + t.Skipf("DB ping failed: %v", err) + } + schema := "statistic_test_mat_" + sanitizeName(t.Name()) + 
db.Exec("CREATE SCHEMA IF NOT EXISTS " + schema) + // refresh_log 表(MV DDL 不需要建,因为只测 RefreshOne 对 refresh_log 的写) + db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.refresh_log ( + id BIGSERIAL PRIMARY KEY, + mv_name VARCHAR(128) NOT NULL, + started_at TIMESTAMPTZ NOT NULL, + finished_at TIMESTAMPTZ, + row_count BIGINT, + status VARCHAR(16) NOT NULL, + error_message TEXT + )`, schema)) + + cleanup := func() { + db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE") + db.Close() + } + return db, schema, cleanup +} + +func TestMaterializer_RefreshOne_LogsToRefreshLog(t *testing.T) { + db, schema, cleanup := setupMaterializerDB(t) + defer cleanup() + + m := NewMaterializer(db, schema) + // 用一个不存在的 MV 名(期望失败但 refresh_log 仍写入) + err := m.RefreshOne(context.Background(), "mv_does_not_exist") + if err == nil { + t.Fatal("expected error for non-existent MV") + } + // 验证 refresh_log 有 failed 记录 + var status string + if err := db.QueryRow( + "SELECT status FROM "+schema+`.refresh_log WHERE mv_name='mv_does_not_exist' ORDER BY id DESC LIMIT 1`, + ).Scan(&status); err != nil { + t.Fatal(err) + } + if status != "failed" { + t.Fatalf("expected status=failed, got %s", status) + } +} diff --git a/backend/services/statisticService/worker/metric_upcoming_level_ups_updater.go b/backend/services/statisticService/worker/metric_upcoming_level_ups_updater.go new file mode 100644 index 0000000..78038d7 --- /dev/null +++ b/backend/services/statisticService/worker/metric_upcoming_level_ups_updater.go @@ -0,0 +1,72 @@ +package worker + +import ( + "context" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/metrics" + "github.com/topfans/backend/services/statisticService/repository" +) + +// UpcomingLevelUpsUpdater 即将升级进度 worker(每 15 分钟) +type UpcomingLevelUpsUpdater struct { + repo *repository.MetricRepository + interval time.Duration + + mu sync.Mutex + running bool + stop chan struct{} +} + +// 
NewUpcomingLevelUpsUpdater 构造 +func NewUpcomingLevelUpsUpdater(repo *repository.MetricRepository, interval time.Duration) *UpcomingLevelUpsUpdater { + return &UpcomingLevelUpsUpdater{repo: repo, interval: interval, stop: make(chan struct{})} +} + +// Start 启动 worker(阻塞) +func (u *UpcomingLevelUpsUpdater) Start(ctx context.Context) { + u.mu.Lock() + u.running = true + u.mu.Unlock() + metrics.WorkerRunningCount.WithLabelValues("upcoming_level_ups").Set(1) + defer metrics.WorkerRunningCount.WithLabelValues("upcoming_level_ups").Set(0) + + ticker := time.NewTicker(u.interval) + defer ticker.Stop() + + u.runOnce(ctx) + + for { + select { + case <-u.stop: + return + case <-ticker.C: + u.runOnce(ctx) + } + } +} + +func (u *UpcomingLevelUpsUpdater) runOnce(ctx context.Context) { + t0 := time.Now() + if err := u.repo.RefreshUpcomingLevelUps(ctx); err != nil { + logger.Logger.Error("RefreshUpcomingLevelUps failed", zap.Error(err)) + metrics.MVRefreshTotal.WithLabelValues("upcoming_level_ups", "failed").Inc() + return + } + metrics.MVRefreshTotal.WithLabelValues("upcoming_level_ups", "success").Inc() + metrics.MVRefreshDuration.WithLabelValues("upcoming_level_ups").Observe(time.Since(t0).Seconds()) +} + +// Stop 停止 worker +func (u *UpcomingLevelUpsUpdater) Stop() { + u.mu.Lock() + defer u.mu.Unlock() + if u.running { + close(u.stop) + u.running = false + } +} diff --git a/backend/services/statisticService/worker/metric_weekly_user_income_updater.go b/backend/services/statisticService/worker/metric_weekly_user_income_updater.go new file mode 100644 index 0000000..8fce29e --- /dev/null +++ b/backend/services/statisticService/worker/metric_weekly_user_income_updater.go @@ -0,0 +1,75 @@ +package worker + +import ( + "context" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/metrics" + "github.com/topfans/backend/services/statisticService/repository" +) + +// WeeklyUserIncomeUpdater 每周用户收入 + 
排名 worker +// - 每 5 分钟从 events 聚合本周收入,计算 rank +// - 用 pg_try_advisory_lock 防多实例重复 +type WeeklyUserIncomeUpdater struct { + repo *repository.MetricRepository + interval time.Duration + + mu sync.Mutex + running bool + stop chan struct{} +} + +// NewWeeklyUserIncomeUpdater 构造 +func NewWeeklyUserIncomeUpdater(repo *repository.MetricRepository, interval time.Duration) *WeeklyUserIncomeUpdater { + return &WeeklyUserIncomeUpdater{repo: repo, interval: interval, stop: make(chan struct{})} +} + +// Start 启动 worker(阻塞) +func (w *WeeklyUserIncomeUpdater) Start(ctx context.Context) { + w.mu.Lock() + w.running = true + w.mu.Unlock() + metrics.WorkerRunningCount.WithLabelValues("weekly_user_income").Set(1) + defer metrics.WorkerRunningCount.WithLabelValues("weekly_user_income").Set(0) + + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + + // 启动时立即跑一次 + w.runOnce(ctx) + + for { + select { + case <-w.stop: + return + case <-ticker.C: + w.runOnce(ctx) + } + } +} + +func (w *WeeklyUserIncomeUpdater) runOnce(ctx context.Context) { + t0 := time.Now() + if err := w.repo.RefreshWeeklyUserIncome(ctx); err != nil { + logger.Logger.Error("RefreshWeeklyUserIncome failed", zap.Error(err)) + metrics.MVRefreshTotal.WithLabelValues("weekly_user_income", "failed").Inc() + return + } + metrics.MVRefreshTotal.WithLabelValues("weekly_user_income", "success").Inc() + metrics.MVRefreshDuration.WithLabelValues("weekly_user_income").Observe(time.Since(t0).Seconds()) +} + +// Stop 停止 worker +func (w *WeeklyUserIncomeUpdater) Stop() { + w.mu.Lock() + defer w.mu.Unlock() + if w.running { + close(w.stop) + w.running = false + } +} diff --git a/backend/services/statisticService/worker/partitioner.go b/backend/services/statisticService/worker/partitioner.go new file mode 100644 index 0000000..3ca0aa7 --- /dev/null +++ b/backend/services/statisticService/worker/partitioner.go @@ -0,0 +1,137 @@ +package worker + +import ( + "context" + "database/sql" + "fmt" + "sync" + "time" + + "go.uber.org/zap" 
+ + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/metrics" +) + +// Partitioner events 分区自动管理 +// - 启动时创建未来 N 天分区 +// - 每天 00:05 滚动预创建 +// - 每天 00:30 清理超过保留期的旧分区 +type Partitioner struct { + db *sql.DB + schema string + preCreateDays int + retentionDays int + + mu sync.Mutex + running bool + stop chan struct{} +} + +// NewPartitioner 构造 +func NewPartitioner(db *sql.DB, schema string, preCreateDays, retentionDays int) *Partitioner { + return &Partitioner{ + db: db, + schema: schema, + preCreateDays: preCreateDays, + retentionDays: retentionDays, + stop: make(chan struct{}), + } +} + +// EnsureFuturePartitions 创建未来 days 天的分区(含今天) +func (p *Partitioner) EnsureFuturePartitions(ctx context.Context, days int) error { + now := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600)) + for i := 0; i <= days; i++ { + d := now.AddDate(0, 0, i) + next := d.AddDate(0, 0, 1) + name := fmt.Sprintf("events_%s", d.Format("2006_01_02")) + sqlStr := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s PARTITION OF %s.events + FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08') + `, p.schema, name, p.schema, d.Format("2006-01-02"), next.Format("2006-01-02")) + if _, err := p.db.ExecContext(ctx, sqlStr); err != nil { + return fmt.Errorf("create partition %s: %w", name, err) + } + metrics.EventsPartitionCount.Inc() + } + return nil +} + +// CleanupOldPartitions 删除超过 retentionDays 的旧分区 +func (p *Partitioner) CleanupOldPartitions(ctx context.Context) error { + cutoff := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600)).AddDate(0, 0, -p.retentionDays) + cutoffName := fmt.Sprintf("events_%s", cutoff.Format("2006_01_02")) + + // LIKE 'events_%'(% 作为 LIKE 通配符匹配任何后缀) + // 用字符串拼接避免与 fmt.Sprintf 的 % 转义冲突 + rows, err := p.db.QueryContext(ctx, + "SELECT tablename FROM pg_tables WHERE schemaname = $1 AND tablename LIKE 'events_%' AND tablename < $2", + p.schema, cutoffName) + if err != nil { + return err + } + defer rows.Close() + for 
rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + logger.Logger.Warn("scan partition name failed", zap.Error(err)) + continue + } + if _, err := p.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s CASCADE", p.schema, name)); err != nil { + logger.Logger.Warn("drop partition failed", zap.String("name", name), zap.Error(err)) + } else { + logger.Logger.Info("dropped old partition", zap.String("name", name)) + } + } + return nil +} + +// Start 启动 worker(阻塞) +// - 启动时确保未来 N 天分区 +// - 每小时检查是否到了 00:05(创建)或 00:30(清理)窗口 +func (p *Partitioner) Start(ctx context.Context) { + p.mu.Lock() + p.running = true + p.mu.Unlock() + metrics.WorkerRunningCount.WithLabelValues("partitioner").Set(1) + defer metrics.WorkerRunningCount.WithLabelValues("partitioner").Set(0) + + // 启动时跑一次 + if err := p.EnsureFuturePartitions(ctx, p.preCreateDays); err != nil { + logger.Logger.Error("EnsureFuturePartitions failed at startup", zap.Error(err)) + } + + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-p.stop: + return + case <-ticker.C: + hour := time.Now().Hour() + min := time.Now().Minute() + if hour == 0 && min < 10 { + if err := p.EnsureFuturePartitions(ctx, p.preCreateDays); err != nil { + logger.Logger.Error("EnsureFuturePartitions failed", zap.Error(err)) + } + } + if hour == 0 && min >= 30 && min < 40 { + if err := p.CleanupOldPartitions(ctx); err != nil { + logger.Logger.Error("CleanupOldPartitions failed", zap.Error(err)) + } + } + } + } +} + +// Stop 停止 worker +func (p *Partitioner) Stop() { + p.mu.Lock() + defer p.mu.Unlock() + if p.running { + close(p.stop) + p.running = false + } +} diff --git a/backend/services/statisticService/worker/partitioner_test.go b/backend/services/statisticService/worker/partitioner_test.go new file mode 100644 index 0000000..6970629 --- /dev/null +++ b/backend/services/statisticService/worker/partitioner_test.go @@ -0,0 +1,106 @@ +package worker + +import ( + "context" + 
"database/sql" + "fmt" + "os" + "testing" + "time" + + _ "github.com/lib/pq" +) + +func setupPartitionerDB(t *testing.T) (*sql.DB, string, func()) { + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { + t.Skip("TEST_DATABASE_URL not set") + } + db, err := sql.Open("postgres", dsn) + if err != nil { + t.Fatal(err) + } + if err := db.Ping(); err != nil { + t.Skipf("DB ping failed: %v", err) + } + schema := "statistic_test_part_" + sanitizeName(t.Name()) + db.Exec("CREATE SCHEMA IF NOT EXISTS " + schema) + // 分区 events 表 + db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.events ( + id BIGSERIAL, event_id UUID NOT NULL, + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (id, received_at) + ) PARTITION BY RANGE (received_at)`, schema)) + cleanup := func() { + db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE") + db.Close() + } + return db, schema, cleanup +} + +func TestPartitioner_EnsureFuture(t *testing.T) { + db, schema, cleanup := setupPartitionerDB(t) + defer cleanup() + + p := NewPartitioner(db, schema, 7, 30) + if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil { + t.Fatal(err) + } + // 验证 3 个分区存在(含今天) + var n int + if err := db.QueryRow( + "SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename LIKE 'events_%'", schema).Scan(&n); err != nil { + t.Fatal(err) + } + if n != 4 { + t.Fatalf("expected 4 partitions (today + 3 future), got %d", n) + } +} + +func TestPartitioner_EnsureFuture_Idempotent(t *testing.T) { + db, schema, cleanup := setupPartitionerDB(t) + defer cleanup() + + p := NewPartitioner(db, schema, 7, 30) + // 跑两次应都成功(IF NOT EXISTS) + if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil { + t.Fatal(err) + } + if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil { + t.Fatalf("second call failed: %v", err) + } +} + +func TestPartitioner_CleanupOld(t *testing.T) { + db, schema, cleanup := setupPartitionerDB(t) + defer cleanup() + + // 手动创建 35 天前 + 5 天前的分区 + old 
:= time.Now().AddDate(0, 0, -35) + recent := time.Now().AddDate(0, 0, -5) + oldName := fmt.Sprintf("events_%s", old.Format("2006_01_02")) + recentName := fmt.Sprintf("events_%s", recent.Format("2006_01_02")) + + db.Exec(fmt.Sprintf(`CREATE TABLE %s.%s PARTITION OF %s.events + FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')`, + schema, oldName, schema, old.Format("2006-01-02"), old.AddDate(0, 0, 1).Format("2006-01-02"))) + db.Exec(fmt.Sprintf(`CREATE TABLE %s.%s PARTITION OF %s.events + FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')`, + schema, recentName, schema, recent.Format("2006-01-02"), recent.AddDate(0, 0, 1).Format("2006-01-02"))) + + // 30 天保留策略:35 天前应被清理,5 天前应保留 + p := NewPartitioner(db, schema, 7, 30) + if err := p.CleanupOldPartitions(context.Background()); err != nil { + t.Fatal(err) + } + + var oldN, recentN int + db.QueryRow("SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename = $2", schema, oldName).Scan(&oldN) + db.QueryRow("SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename = $2", schema, recentName).Scan(&recentN) + if oldN != 0 { + t.Fatalf("35-day-old partition should be dropped, still exists (count=%d)", oldN) + } + if recentN != 1 { + t.Fatalf("5-day-old partition should remain, got count=%d", recentN) + } +}