- Event model + ToJSON - EventSink interface + ChannelEventSink (non-blocking Submit) - event_repo: batch INSERT ON CONFLICT DO NOTHING dedup - event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill - event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up - metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock) - partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup) - 22 unit + integration tests (model/repo/service/sink/worker) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
107 lines
3.3 KiB
Go
107 lines
3.3 KiB
Go
package worker
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
"os"
|
||
"testing"
|
||
"time"
|
||
|
||
_ "github.com/lib/pq"
|
||
)
|
||
|
||
func setupPartitionerDB(t *testing.T) (*sql.DB, string, func()) {
|
||
dsn := os.Getenv("TEST_DATABASE_URL")
|
||
if dsn == "" {
|
||
t.Skip("TEST_DATABASE_URL not set")
|
||
}
|
||
db, err := sql.Open("postgres", dsn)
|
||
if err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if err := db.Ping(); err != nil {
|
||
t.Skipf("DB ping failed: %v", err)
|
||
}
|
||
schema := "statistic_test_part_" + sanitizeName(t.Name())
|
||
db.Exec("CREATE SCHEMA IF NOT EXISTS " + schema)
|
||
// 分区 events 表
|
||
db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.events (
|
||
id BIGSERIAL, event_id UUID NOT NULL,
|
||
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||
PRIMARY KEY (id, received_at)
|
||
) PARTITION BY RANGE (received_at)`, schema))
|
||
cleanup := func() {
|
||
db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE")
|
||
db.Close()
|
||
}
|
||
return db, schema, cleanup
|
||
}
|
||
|
||
func TestPartitioner_EnsureFuture(t *testing.T) {
|
||
db, schema, cleanup := setupPartitionerDB(t)
|
||
defer cleanup()
|
||
|
||
p := NewPartitioner(db, schema, 7, 30)
|
||
if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
// 验证 3 个分区存在(含今天)
|
||
var n int
|
||
if err := db.QueryRow(
|
||
"SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename LIKE 'events_%'", schema).Scan(&n); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if n != 4 {
|
||
t.Fatalf("expected 4 partitions (today + 3 future), got %d", n)
|
||
}
|
||
}
|
||
|
||
func TestPartitioner_EnsureFuture_Idempotent(t *testing.T) {
|
||
db, schema, cleanup := setupPartitionerDB(t)
|
||
defer cleanup()
|
||
|
||
p := NewPartitioner(db, schema, 7, 30)
|
||
// 跑两次应都成功(IF NOT EXISTS)
|
||
if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil {
|
||
t.Fatalf("second call failed: %v", err)
|
||
}
|
||
}
|
||
|
||
func TestPartitioner_CleanupOld(t *testing.T) {
|
||
db, schema, cleanup := setupPartitionerDB(t)
|
||
defer cleanup()
|
||
|
||
// 手动创建 35 天前 + 5 天前的分区
|
||
old := time.Now().AddDate(0, 0, -35)
|
||
recent := time.Now().AddDate(0, 0, -5)
|
||
oldName := fmt.Sprintf("events_%s", old.Format("2006_01_02"))
|
||
recentName := fmt.Sprintf("events_%s", recent.Format("2006_01_02"))
|
||
|
||
db.Exec(fmt.Sprintf(`CREATE TABLE %s.%s PARTITION OF %s.events
|
||
FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')`,
|
||
schema, oldName, schema, old.Format("2006-01-02"), old.AddDate(0, 0, 1).Format("2006-01-02")))
|
||
db.Exec(fmt.Sprintf(`CREATE TABLE %s.%s PARTITION OF %s.events
|
||
FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')`,
|
||
schema, recentName, schema, recent.Format("2006-01-02"), recent.AddDate(0, 0, 1).Format("2006-01-02")))
|
||
|
||
// 30 天保留策略:35 天前应被清理,5 天前应保留
|
||
p := NewPartitioner(db, schema, 7, 30)
|
||
if err := p.CleanupOldPartitions(context.Background()); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
|
||
var oldN, recentN int
|
||
db.QueryRow("SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename = $2", schema, oldName).Scan(&oldN)
|
||
db.QueryRow("SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename = $2", schema, recentName).Scan(&recentN)
|
||
if oldN != 0 {
|
||
t.Fatalf("35-day-old partition should be dropped, still exists (count=%d)", oldN)
|
||
}
|
||
if recentN != 1 {
|
||
t.Fatalf("5-day-old partition should remain, got count=%d", recentN)
|
||
}
|
||
}
|