topfans/backend/services/statisticService/worker/event_flusher_test.go
zerosaturation bed8f8e578 feat(statistic): T4-T8 event collection framework (Event + Sink + Repo + Service + Workers)
- Event model + ToJSON
- EventSink interface + ChannelEventSink (non-blocking Submit)
- event_repo: batch INSERT ON CONFLICT DO NOTHING dedup
- event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill
- event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up
- metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock)
- partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup)
- 22 unit + integration tests (model/repo/service/sink/worker)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 17:20:53 +08:00

177 lines
5.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package worker
import (
"context"
"database/sql"
"fmt"
"os"
"testing"
"time"
_ "github.com/lib/pq"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/statisticService/model"
"github.com/topfans/backend/services/statisticService/repository"
)
// TestMain prepares the shared logger before the package's tests run; the
// workers under test log through logger.Logger internally.
func TestMain(m *testing.M) {
	// The result of Init is intentionally discarded, as in the original setup.
	_ = logger.Init(logger.Config{
		ServiceName: "statistic-test",
		Environment: "test",
	})

	exitCode := m.Run()
	os.Exit(exitCode)
}
// setupFlusherDB opens the PostgreSQL database named by the TEST_DATABASE_URL
// environment variable and creates a schema dedicated to the calling test,
// holding an events table (with a unique index on event_id, received_at) and
// a metric_recent_level_ups table.
//
// The test is skipped when TEST_DATABASE_URL is unset or the database cannot
// be pinged. It is failed when the connection cannot be opened or when any
// setup statement returns an error, so that a broken fixture is reported at
// its source instead of surfacing later as a wrong row count.
//
// It returns the open handle, the schema name, and a cleanup function that
// drops the schema (CASCADE) and closes the handle.
func setupFlusherDB(t *testing.T) (*sql.DB, string, func()) {
	t.Helper()

	dsn := os.Getenv("TEST_DATABASE_URL")
	if dsn == "" {
		t.Skip("TEST_DATABASE_URL not set")
	}
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		t.Fatal(err)
	}
	if err := db.Ping(); err != nil {
		// Release the handle before skipping; nothing else will close it.
		db.Close()
		t.Skipf("DB ping failed: %v", err)
	}

	schema := "statistic_test_flusher_" + sanitizeName(t.Name())
	cleanup := func() {
		// Best-effort teardown: errors are ignored on purpose here.
		db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE")
		db.Close()
	}

	stmts := []string{
		"CREATE SCHEMA IF NOT EXISTS " + schema,
		// Plain (non-partitioned) table, to keep the test simple.
		`CREATE TABLE IF NOT EXISTS ` + schema + `.events (
id BIGSERIAL PRIMARY KEY,
event_id UUID NOT NULL,
user_id BIGINT NOT NULL,
star_id BIGINT NOT NULL,
event_type VARCHAR(64) NOT NULL,
occurred_at TIMESTAMPTZ NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
properties JSONB NOT NULL DEFAULT '{}'
)`,
		// NOTE(review): the double quotes wrap schema + ".idx_event_id" as a
		// single identifier, so the dot is part of the index name rather than
		// a schema qualifier — confirm this naming is intended.
		`CREATE UNIQUE INDEX IF NOT EXISTS "` + schema + `.idx_event_id" ON ` + schema + `.events (event_id, received_at)`,
		// metric_recent_level_ups table.
		`CREATE TABLE IF NOT EXISTS ` + schema + `.metric_recent_level_ups (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
star_id BIGINT NOT NULL,
asset_id BIGINT NOT NULL,
from_level VARCHAR(8) NOT NULL,
to_level VARCHAR(8) NOT NULL,
upgrade_time TIMESTAMPTZ NOT NULL,
asset_name VARCHAR(128),
asset_thumb VARCHAR(512)
)`,
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			// Drop whatever was created so far and close the handle before
			// failing, since the caller never receives cleanup in this case.
			cleanup()
			t.Fatalf("setup statement failed: %v\n%s", err, stmt)
		}
	}

	return db, schema, cleanup
}
// sanitizeName reduces a test name such as "TestEventFlusher_FlushBatch" to
// lowercase ASCII letters, digits and underscores so it can be embedded in a
// PostgreSQL identifier. Uppercase ASCII letters are lowercased; every other
// byte is dropped.
func sanitizeName(s string) string {
	const caseGap = 'a' - 'A'

	cleaned := make([]byte, 0, len(s))
	for _, b := range []byte(s) {
		switch {
		case b >= 'A' && b <= 'Z':
			cleaned = append(cleaned, b+caseGap)
		case b >= 'a' && b <= 'z', b >= '0' && b <= '9', b == '_':
			cleaned = append(cleaned, b)
		}
	}
	return string(cleaned)
}
// TestEventFlusher_FlushBatch pushes five asset.like events with distinct
// event IDs into the flusher's input channel and expects exactly five rows in
// the events table after the flusher has been stopped.
func TestEventFlusher_FlushBatch(t *testing.T) {
	db, schema, cleanup := setupFlusherDB(t)
	defer cleanup()

	eventRepo := repository.NewEventRepository(db, schema)
	metricRepo := repository.NewMetricRepository(db, schema)
	ch := make(chan *model.Event, 10)
	// Presumably (batch size, flush interval) = (100, 1s) — confirm against
	// NewEventFlusher; five events stay well below 100.
	flusher := NewEventFlusher(ch, eventRepo, metricRepo, 100, 1*time.Second)
	go flusher.Start(context.Background())

	now := time.Now()
	for i := 0; i < 5; i++ {
		ch <- &model.Event{
			EventID: fmt.Sprintf("44444444-4444-4444-4444-%012d", i),
			UserID:  1, StarID: 1, EventType: "asset.like",
			OccurredAt: now, ReceivedAt: now,
			Properties: map[string]string{},
		}
	}
	time.Sleep(2 * time.Second) // wait for the ticker to trigger a flush
	flusher.Stop()

	var n int
	// Check the Scan error so a failing query is not misreported as "got 0".
	if err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.events", schema)).Scan(&n); err != nil {
		t.Fatalf("count events: %v", err)
	}
	if n != 5 {
		t.Fatalf("expected 5 events, got %d", n)
	}
}
// TestEventFlusher_TriggersMetricOnLevelUp sends a single asset.level_up
// event and expects one row in events and one row in
// metric_recent_level_ups once the flusher has been stopped.
func TestEventFlusher_TriggersMetricOnLevelUp(t *testing.T) {
	db, schema, cleanup := setupFlusherDB(t)
	defer cleanup()

	eventRepo := repository.NewEventRepository(db, schema)
	metricRepo := repository.NewMetricRepository(db, schema)
	ch := make(chan *model.Event, 10)
	// A small batch size triggers an immediate flush.
	flusher := NewEventFlusher(ch, eventRepo, metricRepo, 1, 100*time.Millisecond)
	go flusher.Start(context.Background())

	now := time.Now()
	ch <- &model.Event{
		EventID: "55555555-5555-5555-5555-555555555555",
		UserID:  1, StarID: 1, EventType: "asset.level_up",
		OccurredAt: now, ReceivedAt: now,
		Properties: map[string]string{"asset_id": "123", "from": "SR", "to": "SSR"},
	}
	time.Sleep(500 * time.Millisecond) // wait for the batch-size-triggered flush
	flusher.Stop()

	// Verify both events and metric_recent_level_ups. Scan errors are checked
	// so a failing query is reported as such rather than as a zero count.
	var eventsN, metricN int
	if err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.events", schema)).Scan(&eventsN); err != nil {
		t.Fatalf("count events: %v", err)
	}
	if err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.metric_recent_level_ups", schema)).Scan(&metricN); err != nil {
		t.Fatalf("count metric_recent_level_ups: %v", err)
	}
	if eventsN != 1 {
		t.Fatalf("expected 1 event, got %d", eventsN)
	}
	if metricN != 1 {
		t.Fatalf("expected 1 metric_recent_level_ups row, got %d", metricN)
	}
}
// TestEventFlusher_NoMetricOnNonLevelUp sends a single asset.like event and
// expects metric_recent_level_ups to stay empty after the flusher has been
// stopped.
func TestEventFlusher_NoMetricOnNonLevelUp(t *testing.T) {
	db, schema, cleanup := setupFlusherDB(t)
	defer cleanup()

	eventRepo := repository.NewEventRepository(db, schema)
	metricRepo := repository.NewMetricRepository(db, schema)
	ch := make(chan *model.Event, 10)
	flusher := NewEventFlusher(ch, eventRepo, metricRepo, 1, 100*time.Millisecond)
	go flusher.Start(context.Background())

	now := time.Now()
	ch <- &model.Event{
		EventID: "66666666-6666-6666-6666-666666666666",
		UserID:  1, StarID: 1, EventType: "asset.like",
		OccurredAt: now, ReceivedAt: now,
		Properties: map[string]string{},
	}
	time.Sleep(500 * time.Millisecond)
	flusher.Stop()

	var metricN int
	// Without this check a failing query would leave metricN at 0 and the
	// test would pass vacuously.
	if err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.metric_recent_level_ups", schema)).Scan(&metricN); err != nil {
		t.Fatalf("count metric_recent_level_ups: %v", err)
	}
	if metricN != 0 {
		t.Fatalf("expected 0 metric rows (asset.like != asset.level_up), got %d", metricN)
	}
}