- Event model + ToJSON
- EventSink interface + ChannelEventSink (non-blocking Submit)
- event_repo: batch INSERT ON CONFLICT DO NOTHING dedup
- event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill
- event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up
- metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock)
- partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup)
- 22 unit + integration tests (model/repo/service/sink/worker)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
177 lines
5.1 KiB
Go
177 lines
5.1 KiB
Go
package worker
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
"os"
|
||
"testing"
|
||
"time"
|
||
|
||
_ "github.com/lib/pq"
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
"github.com/topfans/backend/services/statisticService/repository"
|
||
)
|
||
|
||
func TestMain(m *testing.M) {
|
||
// 初始化 logger (worker 内部用 logger.Logger)
|
||
_ = logger.Init(logger.Config{ServiceName: "statistic-test", Environment: "test"})
|
||
os.Exit(m.Run())
|
||
}
|
||
|
||
func setupFlusherDB(t *testing.T) (*sql.DB, string, func()) {
|
||
dsn := os.Getenv("TEST_DATABASE_URL")
|
||
if dsn == "" {
|
||
t.Skip("TEST_DATABASE_URL not set")
|
||
}
|
||
db, err := sql.Open("postgres", dsn)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if err := db.Ping(); err != nil {
|
||
t.Skipf("DB ping failed: %v", err)
|
||
}
|
||
schema := "statistic_test_flusher_" + sanitizeName(t.Name())
|
||
db.Exec("CREATE SCHEMA IF NOT EXISTS " + schema)
|
||
// 普通表(非分区),简化测试
|
||
db.Exec(`CREATE TABLE IF NOT EXISTS ` + schema + `.events (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
event_id UUID NOT NULL,
|
||
user_id BIGINT NOT NULL,
|
||
star_id BIGINT NOT NULL,
|
||
event_type VARCHAR(64) NOT NULL,
|
||
occurred_at TIMESTAMPTZ NOT NULL,
|
||
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||
properties JSONB NOT NULL DEFAULT '{}'
|
||
)`)
|
||
db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS "` + schema + `.idx_event_id" ON ` + schema + `.events (event_id, received_at)`)
|
||
// metric_recent_level_ups 表
|
||
db.Exec(`CREATE TABLE IF NOT EXISTS ` + schema + `.metric_recent_level_ups (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
user_id BIGINT NOT NULL,
|
||
star_id BIGINT NOT NULL,
|
||
asset_id BIGINT NOT NULL,
|
||
from_level VARCHAR(8) NOT NULL,
|
||
to_level VARCHAR(8) NOT NULL,
|
||
upgrade_time TIMESTAMPTZ NOT NULL,
|
||
asset_name VARCHAR(128),
|
||
asset_thumb VARCHAR(512)
|
||
)`)
|
||
cleanup := func() {
|
||
db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE")
|
||
db.Close()
|
||
}
|
||
return db, schema, cleanup
|
||
}
|
||
|
||
// sanitizeName converts a test name such as "TestEventFlusher_FlushBatch" into
// a lowercase string for use inside a PostgreSQL identifier. ASCII uppercase
// letters are lowercased; ASCII lowercase letters, digits and underscores are
// kept; every other byte is dropped.
func sanitizeName(s string) string {
	buf := make([]byte, 0, len(s))
	for _, b := range []byte(s) {
		switch {
		case 'A' <= b && b <= 'Z':
			// Shift into the lowercase range.
			buf = append(buf, b-'A'+'a')
		case 'a' <= b && b <= 'z', '0' <= b && b <= '9', b == '_':
			buf = append(buf, b)
		}
	}
	return string(buf)
}
|
||
|
||
func TestEventFlusher_FlushBatch(t *testing.T) {
|
||
db, schema, cleanup := setupFlusherDB(t)
|
||
defer cleanup()
|
||
|
||
eventRepo := repository.NewEventRepository(db, schema)
|
||
metricRepo := repository.NewMetricRepository(db, schema)
|
||
|
||
ch := make(chan *model.Event, 10)
|
||
flusher := NewEventFlusher(ch, eventRepo, metricRepo, 100, 1*time.Second)
|
||
|
||
go flusher.Start(context.Background())
|
||
|
||
now := time.Now()
|
||
for i := 0; i < 5; i++ {
|
||
ch <- &model.Event{
|
||
EventID: fmt.Sprintf("44444444-4444-4444-4444-%012d", i),
|
||
UserID: 1, StarID: 1, EventType: "asset.like",
|
||
OccurredAt: now, ReceivedAt: now,
|
||
Properties: map[string]string{},
|
||
}
|
||
}
|
||
time.Sleep(2 * time.Second) // 等 ticker 触发 flush
|
||
flusher.Stop()
|
||
|
||
var n int
|
||
db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.events", schema)).Scan(&n)
|
||
if n != 5 {
|
||
t.Fatalf("expected 5 events, got %d", n)
|
||
}
|
||
}
|
||
|
||
func TestEventFlusher_TriggersMetricOnLevelUp(t *testing.T) {
|
||
db, schema, cleanup := setupFlusherDB(t)
|
||
defer cleanup()
|
||
|
||
eventRepo := repository.NewEventRepository(db, schema)
|
||
metricRepo := repository.NewMetricRepository(db, schema)
|
||
|
||
ch := make(chan *model.Event, 10)
|
||
// 小 batch size 触发立即 flush
|
||
flusher := NewEventFlusher(ch, eventRepo, metricRepo, 1, 100*time.Millisecond)
|
||
|
||
go flusher.Start(context.Background())
|
||
|
||
now := time.Now()
|
||
ch <- &model.Event{
|
||
EventID: "55555555-5555-5555-5555-555555555555",
|
||
UserID: 1, StarID: 1, EventType: "asset.level_up",
|
||
OccurredAt: now, ReceivedAt: now,
|
||
Properties: map[string]string{"asset_id": "123", "from": "SR", "to": "SSR"},
|
||
}
|
||
time.Sleep(500 * time.Millisecond) // 等 batchSize 触发
|
||
flusher.Stop()
|
||
|
||
// 验证 events + metric_recent_level_ups
|
||
var eventsN, metricN int
|
||
db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.events", schema)).Scan(&eventsN)
|
||
db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.metric_recent_level_ups", schema)).Scan(&metricN)
|
||
|
||
if eventsN != 1 {
|
||
t.Fatalf("expected 1 event, got %d", eventsN)
|
||
}
|
||
if metricN != 1 {
|
||
t.Fatalf("expected 1 metric_recent_level_ups row, got %d", metricN)
|
||
}
|
||
}
|
||
|
||
func TestEventFlusher_NoMetricOnNonLevelUp(t *testing.T) {
|
||
db, schema, cleanup := setupFlusherDB(t)
|
||
defer cleanup()
|
||
|
||
eventRepo := repository.NewEventRepository(db, schema)
|
||
metricRepo := repository.NewMetricRepository(db, schema)
|
||
|
||
ch := make(chan *model.Event, 10)
|
||
flusher := NewEventFlusher(ch, eventRepo, metricRepo, 1, 100*time.Millisecond)
|
||
|
||
go flusher.Start(context.Background())
|
||
|
||
now := time.Now()
|
||
ch <- &model.Event{
|
||
EventID: "66666666-6666-6666-6666-666666666666",
|
||
UserID: 1, StarID: 1, EventType: "asset.like",
|
||
OccurredAt: now, ReceivedAt: now,
|
||
Properties: map[string]string{},
|
||
}
|
||
time.Sleep(500 * time.Millisecond)
|
||
flusher.Stop()
|
||
|
||
var metricN int
|
||
db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.metric_recent_level_ups", schema)).Scan(&metricN)
|
||
if metricN != 0 {
|
||
t.Fatalf("expected 0 metric rows (asset.like != asset.level_up), got %d", metricN)
|
||
}
|
||
}
|