package worker import ( "context" "database/sql" "fmt" "os" "testing" "time" _ "github.com/lib/pq" "github.com/topfans/backend/pkg/logger" "github.com/topfans/backend/services/statisticService/model" "github.com/topfans/backend/services/statisticService/repository" ) func TestMain(m *testing.M) { // 初始化 logger (worker 内部用 logger.Logger) _ = logger.Init(logger.Config{ServiceName: "statistic-test", Environment: "test"}) os.Exit(m.Run()) } func setupFlusherDB(t *testing.T) (*sql.DB, string, func()) { dsn := os.Getenv("TEST_DATABASE_URL") if dsn == "" { t.Skip("TEST_DATABASE_URL not set") } db, err := sql.Open("postgres", dsn) if err != nil { t.Fatal(err) } if err := db.Ping(); err != nil { t.Skipf("DB ping failed: %v", err) } schema := "statistic_test_flusher_" + sanitizeName(t.Name()) db.Exec("CREATE SCHEMA IF NOT EXISTS " + schema) // 普通表(非分区),简化测试 db.Exec(`CREATE TABLE IF NOT EXISTS ` + schema + `.events ( id BIGSERIAL PRIMARY KEY, event_id UUID NOT NULL, user_id BIGINT NOT NULL, star_id BIGINT NOT NULL, event_type VARCHAR(64) NOT NULL, occurred_at TIMESTAMPTZ NOT NULL, received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), properties JSONB NOT NULL DEFAULT '{}' )`) db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS "` + schema + `.idx_event_id" ON ` + schema + `.events (event_id, received_at)`) // metric_recent_level_ups 表 db.Exec(`CREATE TABLE IF NOT EXISTS ` + schema + `.metric_recent_level_ups ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL, star_id BIGINT NOT NULL, asset_id BIGINT NOT NULL, from_level VARCHAR(8) NOT NULL, to_level VARCHAR(8) NOT NULL, upgrade_time TIMESTAMPTZ NOT NULL, asset_name VARCHAR(128), asset_thumb VARCHAR(512) )`) cleanup := func() { db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE") db.Close() } return db, schema, cleanup } // sanitizeName 把 "TestEventFlusher_FlushBatch" 转成合法 PG identifier(小写) func sanitizeName(s string) string { out := make([]byte, 0, len(s)) for i := 0; i < len(s); i++ { c := s[i] if (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_' { out = append(out, c) } else if c >= 'A' && c <= 'Z' { out = append(out, c+32) } } return string(out) } func TestEventFlusher_FlushBatch(t *testing.T) { db, schema, cleanup := setupFlusherDB(t) defer cleanup() eventRepo := repository.NewEventRepository(db, schema) metricRepo := repository.NewMetricRepository(db, schema) ch := make(chan *model.Event, 10) flusher := NewEventFlusher(ch, eventRepo, metricRepo, 100, 1*time.Second) go flusher.Start(context.Background()) now := time.Now() for i := 0; i < 5; i++ { ch <- &model.Event{ EventID: fmt.Sprintf("44444444-4444-4444-4444-%012d", i), UserID: 1, StarID: 1, EventType: "asset.like", OccurredAt: now, ReceivedAt: now, Properties: map[string]string{}, } } time.Sleep(2 * time.Second) // 等 ticker 触发 flush flusher.Stop() var n int db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.events", schema)).Scan(&n) if n != 5 { t.Fatalf("expected 5 events, got %d", n) } } func TestEventFlusher_TriggersMetricOnLevelUp(t *testing.T) { db, schema, cleanup := setupFlusherDB(t) defer cleanup() eventRepo := repository.NewEventRepository(db, schema) metricRepo := repository.NewMetricRepository(db, schema) ch := make(chan *model.Event, 10) // 小 batch size 触发立即 flush flusher := NewEventFlusher(ch, eventRepo, metricRepo, 1, 100*time.Millisecond) go flusher.Start(context.Background()) now := time.Now() ch <- &model.Event{ EventID: "55555555-5555-5555-5555-555555555555", UserID: 1, StarID: 1, EventType: "asset.level_up", OccurredAt: now, ReceivedAt: now, Properties: map[string]string{"asset_id": "123", "from": "SR", "to": "SSR"}, } time.Sleep(500 * time.Millisecond) // 等 batchSize 触发 flusher.Stop() // 验证 events + metric_recent_level_ups var eventsN, metricN int db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.events", schema)).Scan(&eventsN) db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.metric_recent_level_ups", schema)).Scan(&metricN) if eventsN != 1 { t.Fatalf("expected 1 event, got %d", eventsN) } if metricN != 1 { t.Fatalf("expected 1 metric_recent_level_ups row, got %d", metricN) } } func TestEventFlusher_NoMetricOnNonLevelUp(t *testing.T) { db, schema, cleanup := setupFlusherDB(t) defer cleanup() eventRepo := repository.NewEventRepository(db, schema) metricRepo := repository.NewMetricRepository(db, schema) ch := make(chan *model.Event, 10) flusher := NewEventFlusher(ch, eventRepo, metricRepo, 1, 100*time.Millisecond) go flusher.Start(context.Background()) now := time.Now() ch <- &model.Event{ EventID: "66666666-6666-6666-6666-666666666666", UserID: 1, StarID: 1, EventType: "asset.like", OccurredAt: now, ReceivedAt: now, Properties: map[string]string{}, } time.Sleep(500 * time.Millisecond) flusher.Stop() var metricN int db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.metric_recent_level_ups", schema)).Scan(&metricN) if metricN != 0 { t.Fatalf("expected 0 metric rows (asset.like != asset.level_up), got %d", metricN) } }