- Event model + ToJSON
- EventSink interface + ChannelEventSink (non-blocking Submit)
- event_repo: batch INSERT ON CONFLICT DO NOTHING dedup
- event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill
- event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up
- metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock)
- partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup)
- 22 unit + integration tests (model/repo/service/sink/worker)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
125 lines
3.7 KiB
Go
125 lines
3.7 KiB
Go
package repository
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"os"
|
||
"testing"
|
||
"time"
|
||
|
||
_ "github.com/lib/pq"
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
)
|
||
|
||
// setupTestDB 创建测试 DB 连接和 schema(仅在 TEST_DATABASE_URL 设置时运行)
|
||
// schema 名带 t.Name() 后缀避免并发或顺序测试间污染
|
||
func setupTestDB(t *testing.T) (*sql.DB, string, func()) {
|
||
dsn := os.Getenv("TEST_DATABASE_URL")
|
||
if dsn == "" {
|
||
t.Skip("TEST_DATABASE_URL not set, skipping integration test")
|
||
}
|
||
db, err := sql.Open("postgres", dsn)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if err := db.Ping(); err != nil {
|
||
t.Skipf("DB ping failed: %v", err)
|
||
}
|
||
|
||
schema := "statistic_test_" + sanitizeName(t.Name())
|
||
if _, err := db.Exec("CREATE SCHEMA IF NOT EXISTS " + schema); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
// 测试用普通表(不分区)— 分区逻辑由 partitioner worker 单独测(T8)
|
||
// 这样避免 unique index on partitioned table 的微妙时序问题
|
||
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS ` + schema + `.events (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
event_id UUID NOT NULL,
|
||
user_id BIGINT NOT NULL,
|
||
star_id BIGINT NOT NULL,
|
||
event_type VARCHAR(64) NOT NULL,
|
||
occurred_at TIMESTAMPTZ NOT NULL,
|
||
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||
properties JSONB NOT NULL DEFAULT '{}'
|
||
)`); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if _, err := db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS "` + schema + `.idx_event_id" ON ` + schema + `.events (event_id, received_at)`); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
cleanup := func() {
|
||
db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE")
|
||
db.Close()
|
||
}
|
||
return db, schema, cleanup
|
||
}
|
||
|
||
// sanitizeName reduces a test name to lower-case ASCII letters, digits and
// underscores so it can be embedded in a PostgreSQL identifier, e.g.
// "TestEventRepo_Dedup" becomes "testeventrepo_dedup". ASCII upper-case
// letters are lower-cased and every other byte is dropped.
func sanitizeName(s string) string {
	buf := make([]byte, 0, len(s))
	for _, b := range []byte(s) {
		switch {
		case 'A' <= b && b <= 'Z':
			// Shift into the lower-case range.
			buf = append(buf, b+('a'-'A'))
		case 'a' <= b && b <= 'z', '0' <= b && b <= '9', b == '_':
			buf = append(buf, b)
		}
	}
	return string(buf)
}
|
||
|
||
func TestEventRepo_InsertBatch(t *testing.T) {
|
||
db, schema, cleanup := setupTestDB(t)
|
||
defer cleanup()
|
||
|
||
repo := NewEventRepository(db, schema)
|
||
now := time.Now()
|
||
events := []*model.Event{
|
||
{EventID: "11111111-1111-1111-1111-111111111111", UserID: 100, StarID: 1, EventType: "asset.like",
|
||
OccurredAt: now, ReceivedAt: now, Properties: map[string]string{"asset_id": "456"}},
|
||
{EventID: "22222222-2222-2222-2222-222222222222", UserID: 101, StarID: 1, EventType: "asset.like",
|
||
OccurredAt: now, ReceivedAt: now, Properties: map[string]string{"asset_id": "457"}},
|
||
}
|
||
inserted, err := repo.InsertBatch(context.Background(), events)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if inserted != 2 {
|
||
t.Fatalf("expected 2, got %d", inserted)
|
||
}
|
||
}
|
||
|
||
func TestEventRepo_Dedup(t *testing.T) {
|
||
db, schema, cleanup := setupTestDB(t)
|
||
defer cleanup()
|
||
|
||
repo := NewEventRepository(db, schema)
|
||
now := time.Now()
|
||
e := &model.Event{EventID: "33333333-3333-3333-3333-333333333333", UserID: 100, StarID: 1, EventType: "asset.like",
|
||
OccurredAt: now, ReceivedAt: now, Properties: map[string]string{}}
|
||
inserted1, _ := repo.InsertBatch(context.Background(), []*model.Event{e})
|
||
if inserted1 != 1 {
|
||
t.Fatalf("first insert: expected 1, got %d", inserted1)
|
||
}
|
||
// 重复 event_id + received_at 应被 ON CONFLICT 跳过
|
||
inserted2, _ := repo.InsertBatch(context.Background(), []*model.Event{e})
|
||
if inserted2 != 0 {
|
||
t.Fatalf("dedup: expected 0, got %d", inserted2)
|
||
}
|
||
}
|
||
|
||
func TestEventRepo_EmptyBatch(t *testing.T) {
|
||
db, schema, cleanup := setupTestDB(t)
|
||
defer cleanup()
|
||
|
||
repo := NewEventRepository(db, schema)
|
||
inserted, err := repo.InsertBatch(context.Background(), nil)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if inserted != 0 {
|
||
t.Fatalf("expected 0, got %d", inserted)
|
||
}
|
||
}
|