package worker import ( "context" "database/sql" "fmt" "os" "testing" "time" _ "github.com/lib/pq" ) func setupPartitionerDB(t *testing.T) (*sql.DB, string, func()) { dsn := os.Getenv("TEST_DATABASE_URL") if dsn == "" { t.Skip("TEST_DATABASE_URL not set") } db, err := sql.Open("postgres", dsn) if err != nil { t.Fatal(err) } if err := db.Ping(); err != nil { t.Skipf("DB ping failed: %v", err) } schema := "statistic_test_part_" + sanitizeName(t.Name()) db.Exec("CREATE SCHEMA IF NOT EXISTS " + schema) // 分区 events 表 db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.events ( id BIGSERIAL, event_id UUID NOT NULL, received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (id, received_at) ) PARTITION BY RANGE (received_at)`, schema)) cleanup := func() { db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE") db.Close() } return db, schema, cleanup } func TestPartitioner_EnsureFuture(t *testing.T) { db, schema, cleanup := setupPartitionerDB(t) defer cleanup() p := NewPartitioner(db, schema, 7, 30) if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil { t.Fatal(err) } // 验证 3 个分区存在(含今天) var n int if err := db.QueryRow( "SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename LIKE 'events_%'", schema).Scan(&n); err != nil { t.Fatal(err) } if n != 4 { t.Fatalf("expected 4 partitions (today + 3 future), got %d", n) } } func TestPartitioner_EnsureFuture_Idempotent(t *testing.T) { db, schema, cleanup := setupPartitionerDB(t) defer cleanup() p := NewPartitioner(db, schema, 7, 30) // 跑两次应都成功(IF NOT EXISTS) if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil { t.Fatal(err) } if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil { t.Fatalf("second call failed: %v", err) } } func TestPartitioner_CleanupOld(t *testing.T) { db, schema, cleanup := setupPartitionerDB(t) defer cleanup() // 手动创建 35 天前 + 5 天前的分区 old := time.Now().AddDate(0, 0, -35) recent := time.Now().AddDate(0, 0, -5) oldName := fmt.Sprintf("events_%s", old.Format("2006_01_02")) recentName := fmt.Sprintf("events_%s", recent.Format("2006_01_02")) db.Exec(fmt.Sprintf(`CREATE TABLE %s.%s PARTITION OF %s.events FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')`, schema, oldName, schema, old.Format("2006-01-02"), old.AddDate(0, 0, 1).Format("2006-01-02"))) db.Exec(fmt.Sprintf(`CREATE TABLE %s.%s PARTITION OF %s.events FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')`, schema, recentName, schema, recent.Format("2006-01-02"), recent.AddDate(0, 0, 1).Format("2006-01-02"))) // 30 天保留策略:35 天前应被清理,5 天前应保留 p := NewPartitioner(db, schema, 7, 30) if err := p.CleanupOldPartitions(context.Background()); err != nil { t.Fatal(err) } var oldN, recentN int db.QueryRow("SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename = $2", schema, oldName).Scan(&oldN) db.QueryRow("SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename = $2", schema, recentName).Scan(&recentN) if oldN != 0 { t.Fatalf("35-day-old partition should be dropped, still exists (count=%d)", oldN) } if recentN != 1 { t.Fatalf("5-day-old partition should remain, got count=%d", recentN) } }