feat(statistic): T4-T8 event collection framework (Event + Sink + Repo + Service + Workers)

- Event model + ToJSON
- EventSink interface + ChannelEventSink (non-blocking Submit)
- event_repo: batch INSERT ON CONFLICT DO NOTHING dedup
- event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill
- event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up
- metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock)
- partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup)
- 22 unit + integration tests (model/repo/service/sink/worker)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
zerosaturation 2026-06-08 17:16:09 +08:00
parent f5ece5e1d2
commit bed8f8e578
18 changed files with 1568 additions and 0 deletions

View File

@ -0,0 +1,33 @@
package model
import "time"
// Event is the generic event record used by the collection pipeline.
// Reference: spec §2.2 + plan Task 4.
type Event struct {
	EventID    string            `json:"event_id"`
	UserID     int64             `json:"user_id"`
	StarID     int64             `json:"star_id"`
	EventType  string            `json:"event_type"`
	OccurredAt time.Time         `json:"occurred_at"`
	ReceivedAt time.Time         `json:"received_at"`
	Properties map[string]string `json:"properties"`
}

// ToJSON flattens the event into a generic map (intended for logging,
// debugging and Redis caching).
//
// OccurredAt and ReceivedAt are emitted as Unix millisecond timestamps; per the
// original note this mirrors the field types in event.proto. A nil Properties
// map is replaced by an empty map so the "properties" entry is never nil.
func (e *Event) ToJSON() map[string]interface{} {
	out := make(map[string]interface{}, 7)
	out["event_id"] = e.EventID
	out["user_id"] = e.UserID
	out["star_id"] = e.StarID
	out["event_type"] = e.EventType
	out["occurred_at"] = e.OccurredAt.UnixMilli()
	out["received_at"] = e.ReceivedAt.UnixMilli()
	if e.Properties != nil {
		out["properties"] = e.Properties
	} else {
		out["properties"] = map[string]string{}
	}
	return out
}

View File

@ -0,0 +1,42 @@
package model
import (
"testing"
"time"
)
// TestEvent_ToJSON checks that ToJSON copies the scalar fields, exposes
// Properties as a map[string]string and renders OccurredAt in milliseconds.
func TestEvent_ToJSON(t *testing.T) {
	ev := &Event{
		EventID:    "uuid-123",
		UserID:     100,
		StarID:     1,
		EventType:  "asset.like",
		OccurredAt: time.Unix(1700000000, 0),
		ReceivedAt: time.Unix(1700000001, 0),
		Properties: map[string]string{"asset_id": "456"},
	}

	got := ev.ToJSON()

	if id := got["event_id"]; id != "uuid-123" {
		t.Fatal("event_id mismatch")
	}
	if uid := got["user_id"].(int64); uid != 100 {
		t.Fatal("user_id mismatch")
	}
	if typ := got["event_type"]; typ != "asset.like" {
		t.Fatal("event_type mismatch")
	}
	props := got["properties"].(map[string]string)
	if props["asset_id"] != "456" {
		t.Fatal("properties mismatch")
	}
	if ms := got["occurred_at"].(int64); ms != 1700000000*1000 {
		t.Fatalf("occurred_at millis = %v, want %d", got["occurred_at"], 1700000000*1000)
	}
}
// TestEvent_ToJSON_EmptyProperties checks that an event without Properties
// still yields a non-nil "properties" entry.
func TestEvent_ToJSON_EmptyProperties(t *testing.T) {
	out := (&Event{EventID: "e1"}).ToJSON()
	if props := out["properties"]; props == nil {
		t.Fatal("properties should not be nil")
	}
}

View File

@ -0,0 +1,60 @@
package repository
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"github.com/topfans/backend/services/statisticService/model"
)
// EventRepository performs operations on the events table.
type EventRepository struct {
	db     *sql.DB // connection pool used for every statement
	schema string  // schema name interpolated in front of table names
}

// NewEventRepository builds an EventRepository bound to db and schema.
func NewEventRepository(db *sql.DB, schema string) *EventRepository {
	repo := new(EventRepository)
	repo.db = db
	repo.schema = schema
	return repo
}
// InsertBatch writes events with a single multi-row INSERT. Rows whose
// (event_id, received_at) pair already exists are skipped through
// ON CONFLICT DO NOTHING.
//
// Nil entries in events are ignored. A nil Properties map is stored as "{}".
// The returned count is the number of rows actually inserted as reported by
// RowsAffected, so it can be smaller than len(events) when duplicates are
// skipped.
//
// NOTE: each event consumes 7 bind parameters and the whole batch goes into
// one statement; PostgreSQL limits a statement to 65535 parameters, so callers
// must keep batches well below that bound. r.schema is interpolated into the
// SQL text and must therefore come from trusted configuration.
func (r *EventRepository) InsertBatch(ctx context.Context, events []*model.Event) (int, error) {
	const colsPerRow = 7
	if len(events) == 0 {
		return 0, nil
	}

	placeholders := make([]string, 0, len(events))
	args := make([]interface{}, 0, len(events)*colsPerRow)
	for _, e := range events {
		if e == nil {
			// Dereferencing a nil event would panic; skip it instead.
			continue
		}
		props := e.Properties
		if props == nil {
			props = map[string]string{}
		}
		propsJSON, err := json.Marshal(props)
		if err != nil {
			return 0, fmt.Errorf("marshal properties of event %q: %w", e.EventID, err)
		}
		base := len(args)
		placeholders = append(placeholders, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)",
			base+1, base+2, base+3, base+4, base+5, base+6, base+7))
		args = append(args, e.EventID, e.UserID, e.StarID, e.EventType, e.OccurredAt, e.ReceivedAt, string(propsJSON))
	}
	if len(placeholders) == 0 {
		// Only nil entries were supplied.
		return 0, nil
	}

	query := fmt.Sprintf(`
		INSERT INTO %s.events (event_id, user_id, star_id, event_type, occurred_at, received_at, properties)
		VALUES %s
		ON CONFLICT (event_id, received_at) DO NOTHING
	`, r.schema, strings.Join(placeholders, ","))

	res, err := r.db.ExecContext(ctx, query, args...)
	if err != nil {
		return 0, fmt.Errorf("insert events: %w", err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return 0, fmt.Errorf("rows affected: %w", err)
	}
	return int(n), nil
}

View File

@ -0,0 +1,124 @@
package repository
import (
"context"
"database/sql"
"os"
"testing"
"time"
_ "github.com/lib/pq"
"github.com/topfans/backend/services/statisticService/model"
)
// setupTestDB opens the database named by TEST_DATABASE_URL and creates a
// per-test schema holding an events table and its unique index.
//
// The test is skipped when TEST_DATABASE_URL is unset or the database cannot
// be pinged. The schema name carries a sanitized t.Name() suffix so that
// concurrent or sequential tests do not pollute each other.
//
// It returns the pool, the schema name and a cleanup function that drops the
// schema and closes the pool. If setup fails half-way, cleanup is run before
// the test is failed so that neither the schema nor the pool leaks.
func setupTestDB(t *testing.T) (*sql.DB, string, func()) {
	t.Helper()

	dsn := os.Getenv("TEST_DATABASE_URL")
	if dsn == "" {
		t.Skip("TEST_DATABASE_URL not set, skipping integration test")
	}
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		t.Fatal(err)
	}
	if err := db.Ping(); err != nil {
		db.Close()
		t.Skipf("DB ping failed: %v", err)
	}

	schema := "statistic_test_" + sanitizeName(t.Name())
	cleanup := func() {
		db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE")
		db.Close()
	}
	mustExec := func(stmt string) {
		if _, err := db.Exec(stmt); err != nil {
			cleanup()
			t.Fatal(err)
		}
	}

	mustExec("CREATE SCHEMA IF NOT EXISTS " + schema)
	// A plain (non-partitioned) table is used here; partitioning is tested
	// separately with the partitioner worker (T8). This avoids subtle timing
	// issues with unique indexes on partitioned tables.
	mustExec(`CREATE TABLE IF NOT EXISTS ` + schema + `.events (
		id BIGSERIAL PRIMARY KEY,
		event_id UUID NOT NULL,
		user_id BIGINT NOT NULL,
		star_id BIGINT NOT NULL,
		event_type VARCHAR(64) NOT NULL,
		occurred_at TIMESTAMPTZ NOT NULL,
		received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
		properties JSONB NOT NULL DEFAULT '{}'
	)`)
	mustExec(`CREATE UNIQUE INDEX IF NOT EXISTS "` + schema + `.idx_event_id" ON ` + schema + `.events (event_id, received_at)`)

	return db, schema, cleanup
}
// sanitizeName turns a test name such as "TestEventRepo_Dedup" into
// "testeventrepo_dedup": ASCII upper-case letters are lower-cased, lower-case
// letters, digits and '_' are kept, and every other byte is dropped, so the
// result can be appended to a PostgreSQL identifier.
func sanitizeName(s string) string {
	buf := make([]byte, 0, len(s))
	for _, b := range []byte(s) {
		switch {
		case b >= 'A' && b <= 'Z':
			buf = append(buf, b+('a'-'A'))
		case b >= 'a' && b <= 'z', b >= '0' && b <= '9', b == '_':
			buf = append(buf, b)
		}
	}
	return string(buf)
}
// TestEventRepo_InsertBatch inserts two distinct events and expects both rows
// to be reported as inserted.
func TestEventRepo_InsertBatch(t *testing.T) {
	db, schema, cleanup := setupTestDB(t)
	defer cleanup()

	ts := time.Now()
	first := &model.Event{
		EventID: "11111111-1111-1111-1111-111111111111", UserID: 100, StarID: 1, EventType: "asset.like",
		OccurredAt: ts, ReceivedAt: ts, Properties: map[string]string{"asset_id": "456"},
	}
	second := &model.Event{
		EventID: "22222222-2222-2222-2222-222222222222", UserID: 101, StarID: 1, EventType: "asset.like",
		OccurredAt: ts, ReceivedAt: ts, Properties: map[string]string{"asset_id": "457"},
	}

	repo := NewEventRepository(db, schema)
	inserted, err := repo.InsertBatch(context.Background(), []*model.Event{first, second})
	if err != nil {
		t.Fatal(err)
	}
	if inserted != 2 {
		t.Fatalf("expected 2, got %d", inserted)
	}
}
// TestEventRepo_Dedup inserts the same event twice and expects the second
// insert to be skipped by ON CONFLICT (same event_id + received_at).
//
// InsertBatch errors are checked explicitly so that a database failure is
// reported as such instead of surfacing as a confusing row-count mismatch.
func TestEventRepo_Dedup(t *testing.T) {
	db, schema, cleanup := setupTestDB(t)
	defer cleanup()
	repo := NewEventRepository(db, schema)
	ctx := context.Background()

	now := time.Now()
	e := &model.Event{EventID: "33333333-3333-3333-3333-333333333333", UserID: 100, StarID: 1, EventType: "asset.like",
		OccurredAt: now, ReceivedAt: now, Properties: map[string]string{}}

	inserted1, err := repo.InsertBatch(ctx, []*model.Event{e})
	if err != nil {
		t.Fatalf("first insert: %v", err)
	}
	if inserted1 != 1 {
		t.Fatalf("first insert: expected 1, got %d", inserted1)
	}

	// A repeated event_id + received_at must be skipped by ON CONFLICT.
	inserted2, err := repo.InsertBatch(ctx, []*model.Event{e})
	if err != nil {
		t.Fatalf("dedup insert: %v", err)
	}
	if inserted2 != 0 {
		t.Fatalf("dedup: expected 0, got %d", inserted2)
	}
}
// TestEventRepo_EmptyBatch checks that a nil batch is a no-op returning 0.
func TestEventRepo_EmptyBatch(t *testing.T) {
	db, schema, cleanup := setupTestDB(t)
	defer cleanup()

	count, err := NewEventRepository(db, schema).InsertBatch(context.Background(), nil)
	if err != nil {
		t.Fatal(err)
	}
	if count != 0 {
		t.Fatalf("expected 0, got %d", count)
	}
}

View File

@ -0,0 +1,88 @@
package repository
import (
"context"
"database/sql"
"fmt"
"github.com/topfans/backend/services/statisticService/model"
)
// MetricRepository maintains the pre-aggregated metric tables / materialized
// views.
type MetricRepository struct {
	db     *sql.DB // connection pool used for every statement
	schema string  // schema name interpolated in front of metric table names
}

// NewMetricRepository builds a MetricRepository bound to db and schema.
func NewMetricRepository(db *sql.DB, schema string) *MetricRepository {
	repo := new(MetricRepository)
	repo.db = db
	repo.schema = schema
	return repo
}
// UpsertRecentLevelUp records a level-up in metric_recent_level_ups.
//
// It is a no-op (nil error) when e is nil, when e.EventType is not
// "asset.level_up", or when the "asset_id" or "to" property is empty. The
// "from" property is written as-is and asset_name / asset_thumb are written
// as empty strings.
//
// NOTE(review): despite the name, the statement is a plain INSERT without an
// ON CONFLICT clause, so calling it twice for the same event writes two rows
// unless the table itself prevents that — confirm against the table DDL.
func (r *MetricRepository) UpsertRecentLevelUp(ctx context.Context, e *model.Event) error {
	if e == nil {
		return nil
	}
	if e.EventType != "asset.level_up" {
		return nil
	}

	props := e.Properties
	assetID, toLevel := props["asset_id"], props["to"]
	if assetID == "" || toLevel == "" {
		return nil
	}

	stmt := fmt.Sprintf(`
		INSERT INTO %s.metric_recent_level_ups
		(user_id, star_id, asset_id, from_level, to_level, upgrade_time, asset_name, asset_thumb)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
	`, r.schema)
	_, err := r.db.ExecContext(ctx, stmt,
		e.UserID, e.StarID, assetID, props["from"], toLevel, e.OccurredAt, "", "")
	return err
}
// RefreshWeeklyUserIncome recomputes, for the current week in Asia/Shanghai
// time, each user's total positive crystal income per star and the user's rank
// inside that star, and upserts the result into metric_weekly_user_income.
//
// Multi-instance safety: the work runs inside one transaction that first
// requests the transaction-scoped advisory lock 123456. When another session
// holds the lock this round is skipped and nil is returned. Because lock and
// upsert share the transaction (and therefore the connection), the lock is
// released automatically on commit or rollback. Session-level advisory locks
// taken through the *sql.DB pool are unsafe here: lock and unlock may run on
// different pooled connections, leaving the lock held indefinitely.
//
// SQL notes:
//   - week_start is part of GROUP BY; selecting an expression over the
//     ungrouped received_at column is rejected by PostgreSQL.
//   - the week boundary is converted back to timestamptz in Asia/Shanghai so
//     the comparison with received_at does not depend on the session time zone.
func (r *MetricRepository) RefreshWeeklyUserIncome(ctx context.Context) error {
	tx, err := r.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin weekly income refresh: %w", err)
	}
	// After a successful Commit this Rollback is a harmless no-op.
	defer tx.Rollback()

	var got bool
	if err := tx.QueryRowContext(ctx, "SELECT pg_try_advisory_xact_lock(123456)").Scan(&got); err != nil {
		return fmt.Errorf("acquire weekly income lock: %w", err)
	}
	if !got {
		return nil // lock held elsewhere; skip this round
	}

	query := fmt.Sprintf(`
		INSERT INTO %s.metric_weekly_user_income (star_id, user_id, week_start, total_crystal, rank_in_star)
		SELECT
			star_id, user_id, week_start, total_crystal,
			ROW_NUMBER() OVER (PARTITION BY star_id, week_start ORDER BY total_crystal DESC) AS rank_in_star
		FROM (
			SELECT
				star_id, user_id,
				DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')::date AS week_start,
				SUM(CASE WHEN (properties->>'amount')::BIGINT > 0
					THEN (properties->>'amount')::BIGINT ELSE 0 END) AS total_crystal
			FROM %s.events
			WHERE event_type IN ('exhibition.revenue', 'crystal.change')
				AND received_at >= (DATE_TRUNC('week', NOW() AT TIME ZONE 'Asia/Shanghai') AT TIME ZONE 'Asia/Shanghai')
			GROUP BY star_id, user_id, DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')::date
		) weekly
		ON CONFLICT (star_id, user_id, week_start) DO UPDATE
		SET total_crystal = EXCLUDED.total_crystal, rank_in_star = EXCLUDED.rank_in_star, updated_at = NOW()
	`, r.schema, r.schema)
	if _, err := tx.ExecContext(ctx, query); err != nil {
		return fmt.Errorf("refresh weekly user income: %w", err)
	}
	return tx.Commit()
}
// RefreshUpcomingLevelUps recomputes like_progress and duration_progress
// (both capped at 100) for every active, non-deleted asset and upserts them
// into metric_upcoming_level_ups.
//
// NOTE: the public.asset_level_config table and column names still need to be
// confirmed with the assetService owners (end of P1).
//
// SQL notes:
//   - both divisors are guarded with NULLIF(..., 0). The duration divisor was
//     previously NULLIF(..., 1), which left a zero duration free to raise a
//     division-by-zero error and abort the whole refresh.
//   - the cap is applied before the ::INT cast so a very large ratio cannot
//     fail with "integer out of range"; for in-range values the result is
//     unchanged.
//   - when a divisor is NULL the ratio is NULL and LEAST ignores NULL
//     arguments, so the progress becomes 100.
func (r *MetricRepository) RefreshUpcomingLevelUps(ctx context.Context) error {
	_, err := r.db.ExecContext(ctx, fmt.Sprintf(`
		INSERT INTO %s.metric_upcoming_level_ups (user_id, star_id, asset_id, like_progress, duration_progress)
		SELECT
			a.user_id, a.star_id, a.id,
			LEAST(100, a.like_count::FLOAT / NULLIF(alc.upgrade_like_threshold, 0) * 100)::INT AS like_progress,
			LEAST(100, EXTRACT(EPOCH FROM (NOW() - a.placed_at))::FLOAT / NULLIF(alc.upgrade_duration_seconds, 0) * 100)::INT AS duration_progress
		FROM public.assets a
		JOIN public.asset_level_config alc ON alc.level = a.level
		WHERE a.status = 'active' AND a.deleted_at IS NULL
		ON CONFLICT (user_id, star_id, asset_id) DO UPDATE
		SET like_progress = EXCLUDED.like_progress, duration_progress = EXCLUDED.duration_progress, updated_at = NOW()
	`, r.schema))
	return err
}

View File

@ -0,0 +1,101 @@
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/topfans/backend/services/statisticService/model"
"github.com/topfans/backend/services/statisticService/sink"
pb "github.com/topfans/backend/pkg/proto/statistic"
)
// Business-level sentinel errors returned by event validation.
var (
	// ErrInvalidEventID reports a missing event or an empty event_id.
	ErrInvalidEventID = errors.New("invalid event_id")
	// ErrInvalidEventType reports an event_type outside the whitelist.
	ErrInvalidEventType = errors.New("invalid event_type")
	// ErrPropertiesTooLarge reports properties exceeding MaxPropertiesSize.
	ErrPropertiesTooLarge = errors.New("properties too large")
)

// MaxPropertiesSize is the maximum size of the JSON-encoded properties
// (1 KB), intended to prevent abuse.
const MaxPropertiesSize = 1024

// DefaultEventTypeWhitelist is the default set of accepted event types.
var DefaultEventTypeWhitelist = []string{
	"asset.like",
	"asset.mint",
	"exhibition.start",
	"exhibition.end",
	"exhibition.revenue",
	"asset.level_up",
	"crystal.change",
}
// EventService is the business-logic layer for event intake.
type EventService struct {
	sink      sink.EventSink  // destination of accepted events
	whiteList map[string]bool // set of accepted event types
}

// NewEventService builds an EventService that submits to s and accepts only
// the event types listed in whiteList.
func NewEventService(s sink.EventSink, whiteList []string) *EventService {
	allowed := make(map[string]bool, len(whiteList))
	for i := range whiteList {
		allowed[whiteList[i]] = true
	}
	return &EventService{sink: s, whiteList: allowed}
}
// validate checks a single event.
//
// It returns ErrInvalidEventID for a nil event or an empty EventID, an error
// wrapping ErrInvalidEventType when EventType is not whitelisted, and an error
// wrapping ErrPropertiesTooLarge when the JSON-encoded Properties exceed
// MaxPropertiesSize bytes. It returns nil when the event is acceptable.
func (s *EventService) validate(e *model.Event) error {
	if e == nil || e.EventID == "" {
		return ErrInvalidEventID
	}
	if allowed := s.whiteList[e.EventType]; !allowed {
		return fmt.Errorf("%w: %s", ErrInvalidEventType, e.EventType)
	}
	// The marshal error is discarded, as in the original implementation.
	encoded, _ := json.Marshal(e.Properties)
	if size := len(encoded); size > MaxPropertiesSize {
		return fmt.Errorf("%w: %d > %d", ErrPropertiesTooLarge, size, MaxPropertiesSize)
	}
	return nil
}
// TrackEvent validates a single event and pushes it into the sink.
//
// A zero ReceivedAt is filled with the current time before validation. A nil
// event is rejected by validate with ErrInvalidEventID; the nil guard below
// exists because reading e.ReceivedAt on a nil event would panic before
// validate could run.
//
// Validation failures are returned as errors. A sink failure is deliberately
// degraded to a response with Rejected=1 and a nil error.
func (s *EventService) TrackEvent(ctx context.Context, e *model.Event) (*pb.TrackEventResponse, error) {
	if e != nil && e.ReceivedAt.IsZero() {
		e.ReceivedAt = time.Now()
	}
	if err := s.validate(e); err != nil {
		return nil, err
	}
	if err := s.sink.Submit(ctx, e); err != nil {
		return &pb.TrackEventResponse{Accepted: 0, Rejected: 1}, nil
	}
	return &pb.TrackEventResponse{Accepted: 1, Rejected: 0}, nil
}
// BatchTrackEvent validates and submits each event independently and reports
// how many were accepted and rejected. The returned error is always nil.
//
// A zero ReceivedAt is filled with the current time. Events failing validation
// (including nil entries, which would otherwise panic on the ReceivedAt read)
// and events the sink refuses are counted as rejected.
func (s *EventService) BatchTrackEvent(ctx context.Context, es []*model.Event) (*pb.TrackEventResponse, error) {
	var accepted, rejected int32
	for _, e := range es {
		if e != nil && e.ReceivedAt.IsZero() {
			e.ReceivedAt = time.Now()
		}
		if err := s.validate(e); err != nil {
			rejected++
			continue
		}
		if err := s.sink.Submit(ctx, e); err != nil {
			rejected++
			continue
		}
		accepted++
	}
	return &pb.TrackEventResponse{Accepted: accepted, Rejected: rejected}, nil
}

View File

@ -0,0 +1,124 @@
package service
import (
"context"
"errors"
"testing"
"time"
"github.com/topfans/backend/services/statisticService/model"
"github.com/topfans/backend/services/statisticService/sink"
)
// mockSink records every submitted event; when err is set, every submission
// fails with it and nothing is recorded.
type mockSink struct {
	events []*model.Event
	err    error
}

// Submit records e unless the mock is configured to fail.
func (m *mockSink) Submit(ctx context.Context, e *model.Event) error {
	if failure := m.err; failure != nil {
		return failure
	}
	m.events = append(m.events, e)
	return nil
}

// SubmitBatch records all of es unless the mock is configured to fail.
func (m *mockSink) SubmitBatch(ctx context.Context, es []*model.Event) error {
	if failure := m.err; failure != nil {
		return failure
	}
	m.events = append(m.events, es...)
	return nil
}

// Close is a no-op.
func (m *mockSink) Close() error {
	return nil
}
// TestEventService_TrackEvent_Success checks the happy path: the event is
// accepted, reaches the sink and gets ReceivedAt filled in by the service.
func TestEventService_TrackEvent_Success(t *testing.T) {
	recorder := &mockSink{}
	svc := NewEventService(recorder, []string{"asset.like", "asset.mint"})

	in := &model.Event{
		EventID: "u1", UserID: 1, StarID: 1, EventType: "asset.like",
		OccurredAt: time.Now(), Properties: map[string]string{},
	}
	resp, err := svc.TrackEvent(context.Background(), in)
	if err != nil {
		t.Fatal(err)
	}
	if !(resp.Accepted == 1 && resp.Rejected == 0) {
		t.Fatalf("got accepted=%d rejected=%d, want 1/0", resp.Accepted, resp.Rejected)
	}
	if len(recorder.events) != 1 {
		t.Fatal("event not submitted")
	}
	if submitted := recorder.events[0]; submitted.ReceivedAt.IsZero() {
		t.Fatal("ReceivedAt should be set by service")
	}
}
func TestEventService_TrackEvent_InvalidEventType(t *testing.T) {
ms := &mockSink{}
svc := NewEventService(ms, []string{"asset.like"})
_, err := svc.TrackEvent(context.Background(), &model.Event{EventID: "u1", EventType: "evil.type"})
if err == nil {
t.Fatal("expected error for invalid event type")
}
if !errors.Is(err, ErrInvalidEventType) {
t.Fatalf("expected ErrInvalidEventType, got %v", err)
}
}
func TestEventService_TrackEvent_EmptyEventID(t *testing.T) {
ms := &mockSink{}
svc := NewEventService(ms, []string{"asset.like"})
_, err := svc.TrackEvent(context.Background(), &model.Event{EventType: "asset.like"})
if !errors.Is(err, ErrInvalidEventID) {
t.Fatalf("expected ErrInvalidEventID, got %v", err)
}
}
func TestEventService_TrackEvent_PropertiesTooLarge(t *testing.T) {
ms := &mockSink{}
svc := NewEventService(ms, []string{"asset.like"})
big := make(map[string]string)
for i := 0; i < 200; i++ {
big["k"+string(rune(i))] = string(make([]byte, 100))
}
_, err := svc.TrackEvent(context.Background(), &model.Event{
EventID: "u1", EventType: "asset.like", Properties: big,
})
if !errors.Is(err, ErrPropertiesTooLarge) {
t.Fatalf("expected ErrPropertiesTooLarge, got %v", err)
}
}
// TestEventService_TrackEvent_SinkError checks that a sink failure is degraded
// to a rejected count instead of being returned as an error.
func TestEventService_TrackEvent_SinkError(t *testing.T) {
	failing := &mockSink{err: sink.ErrChannelFull}
	svc := NewEventService(failing, []string{"asset.like"})

	ev := &model.Event{
		EventID: "u1", EventType: "asset.like",
	}
	resp, err := svc.TrackEvent(context.Background(), ev)
	if err != nil {
		t.Fatal("expected no error (degraded to rejected), got", err)
	}
	if !(resp.Accepted == 0 && resp.Rejected == 1) {
		t.Fatalf("expected 0/1, got %d/%d", resp.Accepted, resp.Rejected)
	}
}
func TestEventService_BatchTrackEvent_Mixed(t *testing.T) {
ms := &mockSink{}
svc := NewEventService(ms, []string{"asset.like", "asset.mint"})
events := []*model.Event{
{EventID: "1", EventType: "asset.like", Properties: map[string]string{}},
{EventID: "2", EventType: "asset.mint", Properties: map[string]string{}},
{EventID: "3", EventType: "evil.type"}, // 拒绝
{EventID: "", EventType: "asset.like"}, // 拒绝
}
resp, err := svc.BatchTrackEvent(context.Background(), events)
if err != nil {
t.Fatal(err)
}
if resp.Accepted != 2 || resp.Rejected != 2 {
t.Fatalf("got %d/%d, want 2/2", resp.Accepted, resp.Rejected)
}
}

View File

@ -0,0 +1,47 @@
package sink
import (
"context"
"errors"
"github.com/topfans/backend/services/statisticService/model"
)
// ErrChannelFull is returned when the event channel is full.
var ErrChannelFull = errors.New("event channel full")

// ChannelEventSink pushes events onto a channel that is consumed
// asynchronously by the event_flusher worker.
type ChannelEventSink struct {
	ch chan<- *model.Event // send-only side of the event channel
}

// NewChannelEventSink builds a sink that writes to ch.
func NewChannelEventSink(ch chan<- *model.Event) *ChannelEventSink {
	s := new(ChannelEventSink)
	s.ch = ch
	return s
}
// Submit enqueues e without blocking. It returns ErrChannelFull immediately
// when the channel cannot accept the event, and an error for a nil event.
// ctx is not consulted.
func (s *ChannelEventSink) Submit(ctx context.Context, e *model.Event) error {
	if e != nil {
		select {
		case s.ch <- e:
			return nil
		default:
			return ErrChannelFull
		}
	}
	return errors.New("event is nil")
}
// SubmitBatch submits the events in order and stops at the first failure,
// returning that error. Events submitted before the failure stay enqueued.
func (s *ChannelEventSink) SubmitBatch(ctx context.Context, es []*model.Event) error {
	for i := 0; i < len(es); i++ {
		err := s.Submit(ctx, es[i])
		if err != nil {
			return err
		}
	}
	return nil
}
// Close is a no-op: the channel sink holds no state of its own to release.
func (s *ChannelEventSink) Close() error {
	return nil
}

View File

@ -0,0 +1,59 @@
package sink
import (
"context"
"testing"
"time"
"github.com/topfans/backend/services/statisticService/model"
)
// TestChannelEventSink_Submit checks that a submitted event arrives on the
// underlying channel.
func TestChannelEventSink_Submit(t *testing.T) {
	events := make(chan *model.Event, 10)
	snk := NewChannelEventSink(events)

	if err := snk.Submit(context.Background(), &model.Event{EventID: "test-1"}); err != nil {
		t.Fatalf("Submit failed: %v", err)
	}

	timeout := time.After(100 * time.Millisecond)
	select {
	case received := <-events:
		if received.EventID != "test-1" {
			t.Fatal("event mismatch")
		}
	case <-timeout:
		t.Fatal("no event received")
	}
}
// TestChannelEventSink_SubmitBatch checks that every event of a batch is
// enqueued.
func TestChannelEventSink_SubmitBatch(t *testing.T) {
	events := make(chan *model.Event, 10)
	snk := NewChannelEventSink(events)

	batch := []*model.Event{{EventID: "a"}, {EventID: "b"}}
	if err := snk.SubmitBatch(context.Background(), batch); err != nil {
		t.Fatalf("SubmitBatch failed: %v", err)
	}
	if queued := len(events); queued != 2 {
		t.Fatalf("expected 2 events, got %d", queued)
	}
}
func TestChannelEventSink_ChannelFull(t *testing.T) {
ch := make(chan *model.Event, 1)
s := NewChannelEventSink(ch)
// 第一个事件占用 channel
if err := s.Submit(context.Background(), &model.Event{EventID: "first"}); err != nil {
t.Fatalf("first submit failed: %v", err)
}
// 第二个应失败channel 满)
if err := s.Submit(context.Background(), &model.Event{EventID: "second"}); err != ErrChannelFull {
t.Fatalf("expected ErrChannelFull, got %v", err)
}
}
func TestChannelEventSink_NilEvent(t *testing.T) {
ch := make(chan *model.Event, 10)
s := NewChannelEventSink(ch)
if err := s.Submit(context.Background(), nil); err == nil {
t.Fatal("expected error for nil event")
}
}

View File

@ -0,0 +1,19 @@
package sink
import (
"context"
"github.com/topfans/backend/services/statisticService/model"
)
// EventSink is the destination interface for tracked events.
// Current implementation: ChannelEventSink (pushes to a channel consumed by
// event_flusher).
// Planned extensions: KafkaEventSink / ClickHouseDualWriteSink / SamplingEventSink.
type EventSink interface {
// Submit submits a single event. It is meant to be non-blocking; the channel
// implementation returns ErrChannelFull when its channel is full.
Submit(ctx context.Context, e *model.Event) error
// SubmitBatch submits several events and stops at the first failure.
SubmitBatch(ctx context.Context, es []*model.Event) error
// Close closes the sink.
Close() error
}

View File

@ -0,0 +1,111 @@
package worker
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/statisticService/metrics"
"github.com/topfans/backend/services/statisticService/model"
"github.com/topfans/backend/services/statisticService/repository"
)
// EventFlusher is the worker that batches events and persists them.
//   - it receives events from a channel
//   - it writes to the database once batchSize events are collected or the
//     interval elapses
//   - after each write it updates metric_recent_level_ups (only
//     asset.level_up events have an effect there)
type EventFlusher struct {
	ch         <-chan *model.Event
	eventRepo  *repository.EventRepository
	metricRepo *repository.MetricRepository
	batchSize  int
	interval   time.Duration

	mu      sync.Mutex // guards running
	running bool
	stop    chan struct{} // closed by Stop
}

// NewEventFlusher builds an EventFlusher reading from ch and flushing through
// eventRepo / metricRepo every batchSize events or every interval.
func NewEventFlusher(
	ch <-chan *model.Event,
	eventRepo *repository.EventRepository,
	metricRepo *repository.MetricRepository,
	batchSize int,
	interval time.Duration,
) *EventFlusher {
	f := new(EventFlusher)
	f.ch = ch
	f.eventRepo = eventRepo
	f.metricRepo = metricRepo
	f.batchSize = batchSize
	f.interval = interval
	f.stop = make(chan struct{})
	return f
}
// Start runs the worker and blocks until ctx is cancelled, Stop is called or
// the event channel is closed.
//
// Events are appended to an in-memory batch that is flushed when it reaches
// batchSize or when the ticker fires. A flush inserts the batch through
// eventRepo and then calls metricRepo.UpsertRecentLevelUp for every event of
// the batch; failures are logged and counted, and the batch is dropped either
// way.
//
// On shutdown, events already buffered in the channel are drained and flushed
// before returning. When shutdown is caused by ctx cancellation the final
// flush uses its own short-lived context, because the cancelled ctx would make
// every database call fail.
func (f *EventFlusher) Start(ctx context.Context) {
	// Upper bound for the final flush after ctx has been cancelled.
	const finalFlushTimeout = 5 * time.Second

	f.mu.Lock()
	f.running = true
	f.mu.Unlock()
	metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(1)
	defer metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(0)

	batch := make([]*model.Event, 0, f.batchSize)
	ticker := time.NewTicker(f.interval)
	defer ticker.Stop()

	// flush persists the current batch and resets it.
	flush := func(flushCtx context.Context) {
		if len(batch) == 0 {
			return
		}
		inserted, err := f.eventRepo.InsertBatch(flushCtx, batch)
		if err != nil {
			logger.Logger.Error("event_flusher insert failed", zap.Error(err), zap.Int("batch", len(batch)))
			metrics.EventDBInsertTotal.WithLabelValues("failed").Inc()
		} else {
			metrics.EventDBInsertTotal.WithLabelValues("success").Inc()
		}
		// Sync metric_recent_level_ups; a failure here only produces a warning.
		for _, e := range batch {
			if err := f.metricRepo.UpsertRecentLevelUp(flushCtx, e); err != nil {
				logger.Logger.Warn("UpsertRecentLevelUp failed",
					zap.String("event_id", e.EventID), zap.Error(err))
			}
		}
		logger.Logger.Debug("event_flusher batch flushed",
			zap.Int("inserted", inserted), zap.Int("batch", len(batch)))
		// Reuse the backing array for the next batch.
		batch = batch[:0]
	}

	// drainAndFlush moves the events currently buffered in the channel into
	// the batch without blocking, flushing in batchSize chunks so one INSERT
	// never grows with the channel capacity. The loop is bounded by the number
	// of events buffered at entry so busy producers cannot delay shutdown.
	drainAndFlush := func(flushCtx context.Context) {
	drain:
		for pending := len(f.ch); pending > 0; pending-- {
			select {
			case e, ok := <-f.ch:
				if !ok {
					break drain
				}
				if e != nil {
					batch = append(batch, e)
				}
				if len(batch) >= f.batchSize {
					flush(flushCtx)
				}
			default:
				break drain
			}
		}
		flush(flushCtx)
	}

	for {
		select {
		case <-f.stop:
			drainAndFlush(ctx)
			return
		case <-ctx.Done():
			shutdownCtx, cancel := context.WithTimeout(context.Background(), finalFlushTimeout)
			drainAndFlush(shutdownCtx)
			cancel()
			return
		case e, ok := <-f.ch:
			if !ok {
				// Closed channel: without this check the receive would yield
				// nil forever and spin the loop.
				flush(ctx)
				return
			}
			if e == nil {
				continue
			}
			batch = append(batch, e)
			metrics.EventChannelSize.Set(float64(len(f.ch)))
			if len(batch) >= f.batchSize {
				flush(ctx)
			}
		case <-ticker.C:
			flush(ctx)
		}
	}
}
// Stop signals the worker to exit by closing the stop channel; Start flushes
// the remaining batch before returning. Stop is a no-op when the worker is not
// marked as running, which also makes repeated calls safe.
func (f *EventFlusher) Stop() {
	f.mu.Lock()
	defer f.mu.Unlock()

	if !f.running {
		return
	}
	close(f.stop)
	f.running = false
}

View File

@ -0,0 +1,176 @@
package worker
import (
"context"
"database/sql"
"fmt"
"os"
"testing"
"time"
_ "github.com/lib/pq"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/statisticService/model"
"github.com/topfans/backend/services/statisticService/repository"
)
// TestMain initialises the logger (the workers log through logger.Logger)
// before running the package's tests.
func TestMain(m *testing.M) {
	cfg := logger.Config{ServiceName: "statistic-test", Environment: "test"}
	_ = logger.Init(cfg)

	code := m.Run()
	os.Exit(code)
}
// setupFlusherDB opens the database named by TEST_DATABASE_URL and creates a
// per-test schema with the events table, its unique index and the
// metric_recent_level_ups table.
//
// The test is skipped when TEST_DATABASE_URL is unset or the database cannot
// be pinged. Every setup statement is checked: a failing statement tears the
// schema down and fails the test immediately, instead of surfacing later as a
// confusing assertion failure.
//
// It returns the pool, the schema name and a cleanup function that drops the
// schema and closes the pool.
func setupFlusherDB(t *testing.T) (*sql.DB, string, func()) {
	t.Helper()

	dsn := os.Getenv("TEST_DATABASE_URL")
	if dsn == "" {
		t.Skip("TEST_DATABASE_URL not set")
	}
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		t.Fatal(err)
	}
	if err := db.Ping(); err != nil {
		db.Close()
		t.Skipf("DB ping failed: %v", err)
	}

	schema := "statistic_test_flusher_" + sanitizeName(t.Name())
	cleanup := func() {
		db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE")
		db.Close()
	}
	mustExec := func(stmt string) {
		if _, err := db.Exec(stmt); err != nil {
			cleanup()
			t.Fatalf("flusher test DB setup: %v", err)
		}
	}

	mustExec("CREATE SCHEMA IF NOT EXISTS " + schema)
	// Plain (non-partitioned) table to keep the test simple.
	mustExec(`CREATE TABLE IF NOT EXISTS ` + schema + `.events (
		id BIGSERIAL PRIMARY KEY,
		event_id UUID NOT NULL,
		user_id BIGINT NOT NULL,
		star_id BIGINT NOT NULL,
		event_type VARCHAR(64) NOT NULL,
		occurred_at TIMESTAMPTZ NOT NULL,
		received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
		properties JSONB NOT NULL DEFAULT '{}'
	)`)
	mustExec(`CREATE UNIQUE INDEX IF NOT EXISTS "` + schema + `.idx_event_id" ON ` + schema + `.events (event_id, received_at)`)
	// metric_recent_level_ups table.
	mustExec(`CREATE TABLE IF NOT EXISTS ` + schema + `.metric_recent_level_ups (
		id BIGSERIAL PRIMARY KEY,
		user_id BIGINT NOT NULL,
		star_id BIGINT NOT NULL,
		asset_id BIGINT NOT NULL,
		from_level VARCHAR(8) NOT NULL,
		to_level VARCHAR(8) NOT NULL,
		upgrade_time TIMESTAMPTZ NOT NULL,
		asset_name VARCHAR(128),
		asset_thumb VARCHAR(512)
	)`)

	return db, schema, cleanup
}
// sanitizeName turns a test name such as "TestEventFlusher_FlushBatch" into a
// lower-case string usable inside a PostgreSQL identifier: ASCII upper-case
// letters are lower-cased, lower-case letters, digits and '_' are kept, and
// every other byte is dropped.
func sanitizeName(s string) string {
	buf := make([]byte, 0, len(s))
	for _, b := range []byte(s) {
		switch {
		case b >= 'A' && b <= 'Z':
			buf = append(buf, b+('a'-'A'))
		case b >= 'a' && b <= 'z', b >= '0' && b <= '9', b == '_':
			buf = append(buf, b)
		}
	}
	return string(buf)
}
// TestEventFlusher_FlushBatch sends five events with a batch size larger than
// five and expects the ticker-driven flush to persist all of them.
//
// The COUNT query error is checked so that a database failure is reported
// directly rather than as "got 0".
func TestEventFlusher_FlushBatch(t *testing.T) {
	db, schema, cleanup := setupFlusherDB(t)
	defer cleanup()
	eventRepo := repository.NewEventRepository(db, schema)
	metricRepo := repository.NewMetricRepository(db, schema)
	ch := make(chan *model.Event, 10)
	flusher := NewEventFlusher(ch, eventRepo, metricRepo, 100, 1*time.Second)
	go flusher.Start(context.Background())

	now := time.Now()
	for i := 0; i < 5; i++ {
		ch <- &model.Event{
			EventID: fmt.Sprintf("44444444-4444-4444-4444-%012d", i),
			UserID:  1, StarID: 1, EventType: "asset.like",
			OccurredAt: now, ReceivedAt: now,
			Properties: map[string]string{},
		}
	}
	time.Sleep(2 * time.Second) // wait for the ticker to trigger a flush
	flusher.Stop()

	var n int
	if err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.events", schema)).Scan(&n); err != nil {
		t.Fatalf("count events: %v", err)
	}
	if n != 5 {
		t.Fatalf("expected 5 events, got %d", n)
	}
}
// TestEventFlusher_TriggersMetricOnLevelUp sends one asset.level_up event and
// expects one row in events and one row in metric_recent_level_ups.
//
// Both COUNT query errors are checked so that a database failure is reported
// directly rather than as a count mismatch.
func TestEventFlusher_TriggersMetricOnLevelUp(t *testing.T) {
	db, schema, cleanup := setupFlusherDB(t)
	defer cleanup()
	eventRepo := repository.NewEventRepository(db, schema)
	metricRepo := repository.NewMetricRepository(db, schema)
	ch := make(chan *model.Event, 10)
	// A batch size of 1 triggers an immediate flush.
	flusher := NewEventFlusher(ch, eventRepo, metricRepo, 1, 100*time.Millisecond)
	go flusher.Start(context.Background())

	now := time.Now()
	ch <- &model.Event{
		EventID: "55555555-5555-5555-5555-555555555555",
		UserID:  1, StarID: 1, EventType: "asset.level_up",
		OccurredAt: now, ReceivedAt: now,
		Properties: map[string]string{"asset_id": "123", "from": "SR", "to": "SSR"},
	}
	time.Sleep(500 * time.Millisecond) // wait for the batch-size flush
	flusher.Stop()

	// Verify events + metric_recent_level_ups.
	var eventsN, metricN int
	if err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.events", schema)).Scan(&eventsN); err != nil {
		t.Fatalf("count events: %v", err)
	}
	if err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.metric_recent_level_ups", schema)).Scan(&metricN); err != nil {
		t.Fatalf("count metric_recent_level_ups: %v", err)
	}
	if eventsN != 1 {
		t.Fatalf("expected 1 event, got %d", eventsN)
	}
	if metricN != 1 {
		t.Fatalf("expected 1 metric_recent_level_ups row, got %d", metricN)
	}
}
// TestEventFlusher_NoMetricOnNonLevelUp sends an asset.like event and expects
// no row in metric_recent_level_ups.
//
// The COUNT query error must be checked here: the expected count is 0, so an
// ignored query failure would leave the variable at 0 and let the test pass
// without verifying anything.
func TestEventFlusher_NoMetricOnNonLevelUp(t *testing.T) {
	db, schema, cleanup := setupFlusherDB(t)
	defer cleanup()
	eventRepo := repository.NewEventRepository(db, schema)
	metricRepo := repository.NewMetricRepository(db, schema)
	ch := make(chan *model.Event, 10)
	flusher := NewEventFlusher(ch, eventRepo, metricRepo, 1, 100*time.Millisecond)
	go flusher.Start(context.Background())

	now := time.Now()
	ch <- &model.Event{
		EventID: "66666666-6666-6666-6666-666666666666",
		UserID:  1, StarID: 1, EventType: "asset.like",
		OccurredAt: now, ReceivedAt: now,
		Properties: map[string]string{},
	}
	time.Sleep(500 * time.Millisecond)
	flusher.Stop()

	var metricN int
	if err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM %s.metric_recent_level_ups", schema)).Scan(&metricN); err != nil {
		t.Fatalf("count metric_recent_level_ups: %v", err)
	}
	if metricN != 0 {
		t.Fatalf("expected 0 metric rows (asset.like != asset.level_up), got %d", metricN)
	}
}

View File

@ -0,0 +1,129 @@
package worker
import (
"context"
"database/sql"
"fmt"
"sync"
"time"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/statisticService/metrics"
)
// mvList names the 4 materialized views refreshed by Materializer.Start; each
// name is interpolated after the schema in the REFRESH statement.
var mvList = []string{
"mv_daily_user_income",
"mv_daily_exhibition_revenue",
"mv_daily_like_income",
"mv_asset_level_distribution",
}
// Materializer is the worker that refreshes the materialized views.
//   - every MV gets its own goroutine + ticker, with starts staggered by 30s
//   - pg_try_advisory_lock prevents duplicate refreshes across instances
//   - every refresh is recorded in refresh_log
type Materializer struct {
	db     *sql.DB
	schema string

	mu      sync.Mutex // guards running
	running bool
	stop    chan struct{} // closed by Stop
}

// NewMaterializer builds a Materializer bound to db and schema.
func NewMaterializer(db *sql.DB, schema string) *Materializer {
	m := new(Materializer)
	m.db = db
	m.schema = schema
	m.stop = make(chan struct{})
	return m
}
// RefreshOne refreshes a single materialized view under advisory lock 234567
// (distinct from the 123456 used for weekly user income).
//
// It returns nil when the refresh succeeded or when the lock is held by
// another session (the round is skipped), and an error otherwise. A row is
// written to refresh_log with status 'running' and later updated to 'success'
// or 'failed' (with the error message).
//
// All statements run on one dedicated connection. Session-level advisory locks
// belong to the connection that took them, so issuing the lock and the unlock
// through the *sql.DB pool could hit different connections and leave the lock
// held. A transaction-scoped lock is not used because the 'failed' log update
// must still be written after a failed REFRESH.
//
// mvName is interpolated into the SQL text and must be a trusted identifier.
func (m *Materializer) RefreshOne(ctx context.Context, mvName string) error {
	conn, err := m.db.Conn(ctx)
	if err != nil {
		return err
	}
	defer conn.Close()

	var got bool
	if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(234567)").Scan(&got); err != nil {
		return err
	}
	if !got {
		return nil // lock held by another session; skip this round
	}
	// Runs before conn.Close (defers are LIFO). A fresh context is used so the
	// unlock still happens when ctx has been cancelled in the meantime.
	defer func() {
		unlockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if _, err := conn.ExecContext(unlockCtx, "SELECT pg_advisory_unlock(234567)"); err != nil {
			logger.Logger.Warn("materializer advisory unlock failed",
				zap.String("mv", mvName), zap.Error(err))
		}
	}()

	// Record the start of the refresh.
	t0 := time.Now()
	var mvID int
	if err := conn.QueryRowContext(ctx,
		fmt.Sprintf(`INSERT INTO %s.refresh_log (mv_name, started_at, status) VALUES ($1, NOW(), 'running') RETURNING id`, m.schema),
		mvName).Scan(&mvID); err != nil {
		return err
	}

	// Run the REFRESH.
	_, err = conn.ExecContext(ctx,
		fmt.Sprintf("REFRESH MATERIALIZED VIEW CONCURRENTLY %s.%s", m.schema, mvName))
	if err != nil {
		// Log bookkeeping is best-effort; the refresh error is what matters.
		_, _ = conn.ExecContext(ctx,
			fmt.Sprintf(`UPDATE %s.refresh_log SET status='failed', finished_at=NOW(), error_message=$1 WHERE id=$2`, m.schema),
			err.Error(), mvID)
		metrics.MVRefreshTotal.WithLabelValues(mvName, "failed").Inc()
		return err
	}

	_, _ = conn.ExecContext(ctx,
		fmt.Sprintf(`UPDATE %s.refresh_log SET status='success', finished_at=NOW() WHERE id=$1`, m.schema), mvID)
	metrics.MVRefreshTotal.WithLabelValues(mvName, "success").Inc()
	metrics.MVRefreshDuration.WithLabelValues(mvName).Observe(time.Since(t0).Seconds())
	return nil
}
// Start runs the worker and blocks until Stop is called or ctx is cancelled.
//
// Every view in mvList gets its own goroutine: it waits 30s × its index (so
// the views do not refresh at the same moment) and then calls RefreshOne on
// every tick of interval. Refresh errors are logged, not returned.
//
// ctx cancellation ends the goroutines as well as Stop does; otherwise a
// cancelled ctx would keep them alive, failing every refresh until Stop.
func (m *Materializer) Start(ctx context.Context, interval time.Duration) {
	m.mu.Lock()
	m.running = true
	m.mu.Unlock()
	metrics.WorkerRunningCount.WithLabelValues("materializer").Set(1)
	defer metrics.WorkerRunningCount.WithLabelValues("materializer").Set(0)

	var wg sync.WaitGroup
	for i, mv := range mvList {
		wg.Add(1)
		go func(idx int, mvName string) {
			defer wg.Done()

			// Staggered start: 30s × index.
			stagger := time.NewTimer(time.Duration(idx*30) * time.Second)
			defer stagger.Stop()
			select {
			case <-m.stop:
				return
			case <-ctx.Done():
				return
			case <-stagger.C:
			}

			ticker := time.NewTicker(interval)
			defer ticker.Stop()
			for {
				select {
				case <-m.stop:
					return
				case <-ctx.Done():
					return
				case <-ticker.C:
					if err := m.RefreshOne(ctx, mvName); err != nil {
						logger.Logger.Error("RefreshOne failed", zap.String("mv", mvName), zap.Error(err))
					}
				}
			}
		}(i, mv)
	}

	select {
	case <-m.stop:
	case <-ctx.Done():
	}
	wg.Wait()
}
// Stop signals the worker to exit by closing the stop channel. It is a no-op
// when the worker is not marked as running, which also makes repeated calls
// safe.
func (m *Materializer) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()

	if !m.running {
		return
	}
	close(m.stop)
	m.running = false
}

View File

@ -0,0 +1,65 @@
package worker
import (
"context"
"database/sql"
"fmt"
"os"
"testing"
_ "github.com/lib/pq"
)
// setupMaterializerDB opens the database named by TEST_DATABASE_URL and
// creates a per-test schema containing only the refresh_log table (no MV DDL
// is needed because the test only checks what RefreshOne writes to
// refresh_log).
//
// The test is skipped when TEST_DATABASE_URL is unset or the database cannot
// be pinged. Setup statements are checked: a failure tears the schema down and
// fails the test immediately.
//
// It returns the pool, the schema name and a cleanup function that drops the
// schema and closes the pool.
func setupMaterializerDB(t *testing.T) (*sql.DB, string, func()) {
	t.Helper()

	dsn := os.Getenv("TEST_DATABASE_URL")
	if dsn == "" {
		t.Skip("TEST_DATABASE_URL not set")
	}
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		t.Fatal(err)
	}
	if err := db.Ping(); err != nil {
		db.Close()
		t.Skipf("DB ping failed: %v", err)
	}

	schema := "statistic_test_mat_" + sanitizeName(t.Name())
	cleanup := func() {
		db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE")
		db.Close()
	}
	mustExec := func(stmt string) {
		if _, err := db.Exec(stmt); err != nil {
			cleanup()
			t.Fatalf("materializer test DB setup: %v", err)
		}
	}

	mustExec("CREATE SCHEMA IF NOT EXISTS " + schema)
	mustExec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.refresh_log (
		id BIGSERIAL PRIMARY KEY,
		mv_name VARCHAR(128) NOT NULL,
		started_at TIMESTAMPTZ NOT NULL,
		finished_at TIMESTAMPTZ,
		row_count BIGINT,
		status VARCHAR(16) NOT NULL,
		error_message TEXT
	)`, schema))

	return db, schema, cleanup
}
// TestMaterializer_RefreshOne_LogsToRefreshLog refreshes a view that does not
// exist: RefreshOne must fail, and refresh_log must still hold a 'failed' row
// for that view.
func TestMaterializer_RefreshOne_LogsToRefreshLog(t *testing.T) {
	db, schema, cleanup := setupMaterializerDB(t)
	defer cleanup()

	worker := NewMaterializer(db, schema)
	if err := worker.RefreshOne(context.Background(), "mv_does_not_exist"); err == nil {
		t.Fatal("expected error for non-existent MV")
	}

	// The latest refresh_log row for the view must be marked failed.
	query := "SELECT status FROM " + schema + `.refresh_log WHERE mv_name='mv_does_not_exist' ORDER BY id DESC LIMIT 1`
	var status string
	if err := db.QueryRow(query).Scan(&status); err != nil {
		t.Fatal(err)
	}
	if status != "failed" {
		t.Fatalf("expected status=failed, got %s", status)
	}
}

View File

@ -0,0 +1,72 @@
package worker
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/statisticService/metrics"
"github.com/topfans/backend/services/statisticService/repository"
)
// UpcomingLevelUpsUpdater is the background worker that periodically refreshes
// the "upcoming level-ups" progress data. The original note says every 15
// minutes; the actual period is whatever interval the constructor receives.
type UpcomingLevelUpsUpdater struct {
// repo performs the refresh (runOnce calls RefreshUpcomingLevelUps on it).
repo *repository.MetricRepository
// interval is the ticker period used by Start.
interval time.Duration
// mu guards running.
mu sync.Mutex
// running is set by Start and cleared by Stop.
running bool
// stop is closed by Stop to make Start return.
stop chan struct{}
}
// NewUpcomingLevelUpsUpdater builds an updater that refreshes through repo once
// per interval. The stop channel is allocated here; the worker does not run
// until Start is called.
func NewUpcomingLevelUpsUpdater(repo *repository.MetricRepository, interval time.Duration) *UpcomingLevelUpsUpdater {
	updater := &UpcomingLevelUpsUpdater{
		repo:     repo,
		interval: interval,
		stop:     make(chan struct{}),
	}
	return updater
}
// Start runs the worker and blocks until Stop is called or ctx is cancelled.
// It refreshes once immediately and then once per interval. While it runs,
// the WorkerRunningCount gauge for "upcoming_level_ups" is 1; it is reset to
// 0 on return.
func (u *UpcomingLevelUpsUpdater) Start(ctx context.Context) {
	u.mu.Lock()
	u.running = true
	u.mu.Unlock()
	metrics.WorkerRunningCount.WithLabelValues("upcoming_level_ups").Set(1)
	defer metrics.WorkerRunningCount.WithLabelValues("upcoming_level_ups").Set(0)

	ticker := time.NewTicker(u.interval)
	defer ticker.Stop()

	u.runOnce(ctx)
	for {
		select {
		case <-u.stop:
			return
		case <-ctx.Done():
			// A cancelled ctx would make every later refresh fail, so exit
			// instead of ticking forever.
			return
		case <-ticker.C:
			u.runOnce(ctx)
		}
	}
}
// runOnce performs a single refresh through the repository and records the
// outcome in MVRefreshTotal. The duration histogram is only observed for
// successful refreshes; failures are logged and counted as "failed".
func (u *UpcomingLevelUpsUpdater) runOnce(ctx context.Context) {
	const label = "upcoming_level_ups"

	startedAt := time.Now()
	err := u.repo.RefreshUpcomingLevelUps(ctx)
	if err == nil {
		metrics.MVRefreshTotal.WithLabelValues(label, "success").Inc()
		metrics.MVRefreshDuration.WithLabelValues(label).Observe(time.Since(startedAt).Seconds())
		return
	}

	logger.Logger.Error("RefreshUpcomingLevelUps failed", zap.Error(err))
	metrics.MVRefreshTotal.WithLabelValues(label, "failed").Inc()
}
// Stop signals Start to return by closing the stop channel.
// It does nothing unless the running flag is set, so a second call does not
// close the channel twice.
func (u *UpcomingLevelUpsUpdater) Stop() {
	u.mu.Lock()
	defer u.mu.Unlock()

	if !u.running {
		return
	}
	u.running = false
	close(u.stop)
}

View File

@ -0,0 +1,75 @@
package worker
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/statisticService/metrics"
"github.com/topfans/backend/services/statisticService/repository"
)
// WeeklyUserIncomeUpdater is the background worker for weekly per-user income
// and ranking.
//   - Per the original note it aggregates this week's income from events and
//     computes ranks every 5 minutes; the actual period is the interval passed
//     to the constructor, and the aggregation itself lives in the repository.
//   - Per the original note pg_try_advisory_lock prevents duplicate work across
//     instances. NOTE(review): no locking is visible in this type; presumably
//     it happens inside RefreshWeeklyUserIncome — confirm.
type WeeklyUserIncomeUpdater struct {
// repo performs the refresh (runOnce calls RefreshWeeklyUserIncome on it).
repo *repository.MetricRepository
// interval is the ticker period used by Start.
interval time.Duration
// mu guards running.
mu sync.Mutex
// running is set by Start and cleared by Stop.
running bool
// stop is closed by Stop to make Start return.
stop chan struct{}
}
// NewWeeklyUserIncomeUpdater builds an updater that refreshes through repo once
// per interval. The stop channel is allocated here; the worker does not run
// until Start is called.
func NewWeeklyUserIncomeUpdater(repo *repository.MetricRepository, interval time.Duration) *WeeklyUserIncomeUpdater {
	updater := &WeeklyUserIncomeUpdater{
		repo:     repo,
		interval: interval,
		stop:     make(chan struct{}),
	}
	return updater
}
// Start runs the worker and blocks until Stop is called or ctx is cancelled.
// It refreshes once immediately and then once per interval. While it runs,
// the WorkerRunningCount gauge for "weekly_user_income" is 1; it is reset to
// 0 on return.
func (w *WeeklyUserIncomeUpdater) Start(ctx context.Context) {
	w.mu.Lock()
	w.running = true
	w.mu.Unlock()
	metrics.WorkerRunningCount.WithLabelValues("weekly_user_income").Set(1)
	defer metrics.WorkerRunningCount.WithLabelValues("weekly_user_income").Set(0)

	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()

	// Run once right away instead of waiting a full interval.
	w.runOnce(ctx)
	for {
		select {
		case <-w.stop:
			return
		case <-ctx.Done():
			// A cancelled ctx would make every later refresh fail, so exit
			// instead of ticking forever.
			return
		case <-ticker.C:
			w.runOnce(ctx)
		}
	}
}
// runOnce performs a single refresh through the repository and records the
// outcome in MVRefreshTotal. The duration histogram is only observed for
// successful refreshes; failures are logged and counted as "failed".
func (w *WeeklyUserIncomeUpdater) runOnce(ctx context.Context) {
	const label = "weekly_user_income"

	startedAt := time.Now()
	err := w.repo.RefreshWeeklyUserIncome(ctx)
	if err == nil {
		metrics.MVRefreshTotal.WithLabelValues(label, "success").Inc()
		metrics.MVRefreshDuration.WithLabelValues(label).Observe(time.Since(startedAt).Seconds())
		return
	}

	logger.Logger.Error("RefreshWeeklyUserIncome failed", zap.Error(err))
	metrics.MVRefreshTotal.WithLabelValues(label, "failed").Inc()
}
// Stop signals Start to return by closing the stop channel.
// It does nothing unless the running flag is set, so a second call does not
// close the channel twice.
func (w *WeeklyUserIncomeUpdater) Stop() {
	w.mu.Lock()
	defer w.mu.Unlock()

	if !w.running {
		return
	}
	w.running = false
	close(w.stop)
}

View File

@ -0,0 +1,137 @@
package worker
import (
"context"
"database/sql"
"fmt"
"sync"
"time"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/statisticService/metrics"
)
// Partitioner manages the daily partitions of the events table.
//   - On startup it creates partitions for the coming days.
//   - It is intended to roll the pre-created window forward daily around 00:05.
//   - It is intended to drop partitions older than the retention period daily
//     around 00:30.
type Partitioner struct {
// db is the handle used for all partition DDL and catalog queries.
db *sql.DB
// schema is the schema that contains the events table and its partitions.
schema string
// preCreateDays is the number of days passed to EnsureFuturePartitions by Start.
preCreateDays int
// retentionDays is how many days back the cleanup cutoff is placed.
retentionDays int
// mu guards running.
mu sync.Mutex
// running is set by Start and cleared by Stop.
running bool
// stop is closed by Stop to make Start return.
stop chan struct{}
}
// NewPartitioner builds a Partitioner for the events table in schema.
// preCreateDays is how many days ahead Start pre-creates partitions and
// retentionDays is how far back the cleanup cutoff lies. Nothing touches the
// database until a method is called.
func NewPartitioner(db *sql.DB, schema string, preCreateDays, retentionDays int) *Partitioner {
	p := new(Partitioner)
	p.db = db
	p.schema = schema
	p.preCreateDays = preCreateDays
	p.retentionDays = retentionDays
	p.stop = make(chan struct{})
	return p
}
// EnsureFuturePartitions creates one daily partition of <schema>.events for
// today and for each of the next `days` days (days+1 statements in total).
// Each partition is named events_YYYY_MM_DD and spans midnight to midnight at
// UTC+8. The DDL uses IF NOT EXISTS, so repeated calls succeed.
//
// It stops at the first failing statement and returns that error wrapped with
// the partition name. EventsPartitionCount is incremented after every
// successful statement, including ones where the partition already existed.
func (p *Partitioner) EnsureFuturePartitions(ctx context.Context, days int) error {
	shanghai := time.FixedZone("Asia/Shanghai", 8*3600)
	today := time.Now().In(shanghai)

	for offset := 0; offset <= days; offset++ {
		from := today.AddDate(0, 0, offset)
		to := from.AddDate(0, 0, 1)
		partition := fmt.Sprintf("events_%s", from.Format("2006_01_02"))

		stmt := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.%s PARTITION OF %s.events
FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')
`, p.schema, partition, p.schema, from.Format("2006-01-02"), to.Format("2006-01-02"))

		_, err := p.db.ExecContext(ctx, stmt)
		if err != nil {
			return fmt.Errorf("create partition %s: %w", partition, err)
		}
		metrics.EventsPartitionCount.Inc()
	}
	return nil
}
// CleanupOldPartitions drops every daily events partition whose date is
// before (today at UTC+8 minus retentionDays).
//
// Candidate names are first read completely from pg_tables and the result set
// is closed before any DROP runs, so the cleanup never needs two pooled
// connections at once. Only tables named exactly events_YYYY_MM_DD are
// dropped; anything else the LIKE pattern happens to match is skipped.
//
// Individual scan or drop failures are logged and skipped (best effort). An
// error is returned when the catalog query fails or when iterating its result
// ends with an error; partitions collected before such an iteration error are
// still dropped.
func (p *Partitioner) CleanupOldPartitions(ctx context.Context) error {
	cutoff := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600)).AddDate(0, 0, -p.retentionDays)
	cutoffName := fmt.Sprintf("events_%s", cutoff.Format("2006_01_02"))

	// In LIKE 'events_%' the % matches any suffix; the query is a plain
	// string (not fmt.Sprintf) so the % needs no escaping. Note that _ is
	// itself a single-character wildcard, hence the name check below.
	rows, err := p.db.QueryContext(ctx,
		"SELECT tablename FROM pg_tables WHERE schemaname = $1 AND tablename LIKE 'events_%' AND tablename < $2",
		p.schema, cutoffName)
	if err != nil {
		return fmt.Errorf("list old partitions: %w", err)
	}

	var names []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			logger.Logger.Warn("scan partition name failed", zap.Error(err))
			continue
		}
		names = append(names, name)
	}
	iterErr := rows.Err()
	rows.Close()

	const prefix = "events_"
	const dateLayout = "2006_01_02"
	for _, name := range names {
		// Guard against dropping tables that merely sort before the cutoff
		// (e.g. events_1 or eventsX...) but are not dated partitions.
		if len(name) != len(prefix)+len(dateLayout) || name[:len(prefix)] != prefix {
			logger.Logger.Warn("skip table that is not a dated partition", zap.String("name", name))
			continue
		}
		if _, err := time.Parse(dateLayout, name[len(prefix):]); err != nil {
			logger.Logger.Warn("skip table that is not a dated partition", zap.String("name", name))
			continue
		}

		if _, err := p.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s CASCADE", p.schema, name)); err != nil {
			logger.Logger.Warn("drop partition failed", zap.String("name", name), zap.Error(err))
			continue
		}
		logger.Logger.Info("dropped old partition", zap.String("name", name))
	}

	if iterErr != nil {
		return fmt.Errorf("iterate old partitions: %w", iterErr)
	}
	return nil
}
// Start runs the partition maintenance loop and blocks until Stop is called
// or ctx is cancelled.
//   - On startup it ensures partitions exist for the next preCreateDays days.
//   - Once per day, at or after 00:05 (UTC+8), it ensures them again so the
//     pre-created window keeps rolling forward.
//   - Once per day, at or after 00:30 (UTC+8), it drops partitions older than
//     the retention period.
//
// The clock is polled every minute. An hourly poll only ever fires at the
// minute the process happened to start, so it would usually miss both windows
// and could never hit both in the same process. A job that fails is retried
// on the following polls until 01:00.
func (p *Partitioner) Start(ctx context.Context) {
	p.mu.Lock()
	p.running = true
	p.mu.Unlock()
	metrics.WorkerRunningCount.WithLabelValues("partitioner").Set(1)
	defer metrics.WorkerRunningCount.WithLabelValues("partitioner").Set(0)

	// Run once at startup.
	if err := p.EnsureFuturePartitions(ctx, p.preCreateDays); err != nil {
		logger.Logger.Error("EnsureFuturePartitions failed at startup", zap.Error(err))
	}

	// Partition boundaries are UTC+8 midnights, so the schedule is evaluated
	// in the same zone rather than in the server's local zone.
	loc := time.FixedZone("Asia/Shanghai", 8*3600)

	// Calendar day (UTC+8) on which each job last succeeded; prevents running
	// a job more than once per day.
	var createdDay, cleanedDay string

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-p.stop:
			return
		case <-ctx.Done():
			return
		case <-ticker.C:
			now := time.Now().In(loc)
			if now.Hour() != 0 {
				continue
			}
			day := now.Format("2006-01-02")
			if now.Minute() >= 5 && createdDay != day {
				if err := p.EnsureFuturePartitions(ctx, p.preCreateDays); err != nil {
					logger.Logger.Error("EnsureFuturePartitions failed", zap.Error(err))
				} else {
					createdDay = day
				}
			}
			if now.Minute() >= 30 && cleanedDay != day {
				if err := p.CleanupOldPartitions(ctx); err != nil {
					logger.Logger.Error("CleanupOldPartitions failed", zap.Error(err))
				} else {
					cleanedDay = day
				}
			}
		}
	}
}
// Stop signals Start to return by closing the stop channel.
// It does nothing unless the running flag is set, so a second call does not
// close the channel twice.
func (p *Partitioner) Stop() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if !p.running {
		return
	}
	p.running = false
	close(p.stop)
}

View File

@ -0,0 +1,106 @@
package worker
import (
"context"
"database/sql"
"fmt"
"os"
"testing"
"time"
_ "github.com/lib/pq"
)
// setupPartitionerDB connects to the database named by TEST_DATABASE_URL and
// creates a per-test schema holding an events table partitioned by range on
// received_at.
//
// The test is skipped when TEST_DATABASE_URL is unset or the database cannot
// be pinged, and fails if the schema or table cannot be created.
//
// Returns the open handle, the schema name, and a cleanup function that drops
// the schema and closes the handle.
func setupPartitionerDB(t *testing.T) (*sql.DB, string, func()) {
	t.Helper()

	dsn := os.Getenv("TEST_DATABASE_URL")
	if dsn == "" {
		t.Skip("TEST_DATABASE_URL not set")
	}
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		t.Fatal(err)
	}
	if err := db.Ping(); err != nil {
		// Release the handle before skipping; nothing else will close it.
		db.Close()
		t.Skipf("DB ping failed: %v", err)
	}

	schema := "statistic_test_part_" + sanitizeName(t.Name())
	cleanup := func() {
		// Best effort: a failed drop must not mask the test's own result.
		db.Exec("DROP SCHEMA IF EXISTS " + schema + " CASCADE")
		db.Close()
	}

	if _, err := db.Exec("CREATE SCHEMA IF NOT EXISTS " + schema); err != nil {
		cleanup()
		t.Fatalf("create schema %s: %v", schema, err)
	}
	// Partitioned events parent table.
	if _, err := db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.events (
id BIGSERIAL, event_id UUID NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id, received_at)
) PARTITION BY RANGE (received_at)`, schema)); err != nil {
		cleanup()
		t.Fatalf("create %s.events: %v", schema, err)
	}
	return db, schema, cleanup
}
// TestPartitioner_EnsureFuture checks that EnsureFuturePartitions(ctx, 3)
// leaves four partitions in the schema: today plus the next three days.
func TestPartitioner_EnsureFuture(t *testing.T) {
	db, schema, cleanup := setupPartitionerDB(t)
	defer cleanup()

	part := NewPartitioner(db, schema, 7, 30)
	ensureErr := part.EnsureFuturePartitions(context.Background(), 3)
	if ensureErr != nil {
		t.Fatal(ensureErr)
	}

	// Count the partitions that now exist (4 = today + 3 future days).
	var count int
	row := db.QueryRow(
		"SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename LIKE 'events_%'", schema)
	if scanErr := row.Scan(&count); scanErr != nil {
		t.Fatal(scanErr)
	}
	if count != 4 {
		t.Fatalf("expected 4 partitions (today + 3 future), got %d", count)
	}
}
// TestPartitioner_EnsureFuture_Idempotent checks that calling
// EnsureFuturePartitions twice with the same arguments succeeds both times
// (the DDL uses IF NOT EXISTS).
func TestPartitioner_EnsureFuture_Idempotent(t *testing.T) {
	db, schema, cleanup := setupPartitionerDB(t)
	defer cleanup()

	part := NewPartitioner(db, schema, 7, 30)
	ctx := context.Background()

	firstErr := part.EnsureFuturePartitions(ctx, 3)
	if firstErr != nil {
		t.Fatal(firstErr)
	}
	secondErr := part.EnsureFuturePartitions(ctx, 3)
	if secondErr != nil {
		t.Fatalf("second call failed: %v", secondErr)
	}
}
// TestPartitioner_CleanupOld checks the 30-day retention policy: a partition
// dated 35 days ago must be dropped and one dated 5 days ago must remain.
//
// Every fixture statement and count query is checked, so a failed setup
// cannot make the "old partition is gone" assertion pass vacuously.
func TestPartitioner_CleanupOld(t *testing.T) {
	db, schema, cleanup := setupPartitionerDB(t)
	defer cleanup()

	// Use the same UTC+8 calendar the Partitioner uses for partition names.
	now := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600))
	old := now.AddDate(0, 0, -35)
	recent := now.AddDate(0, 0, -5)
	oldName := fmt.Sprintf("events_%s", old.Format("2006_01_02"))
	recentName := fmt.Sprintf("events_%s", recent.Format("2006_01_02"))

	// createPartition attaches a one-day partition for the given date.
	createPartition := func(name string, day time.Time) {
		t.Helper()
		stmt := fmt.Sprintf(`CREATE TABLE %s.%s PARTITION OF %s.events
FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')`,
			schema, name, schema, day.Format("2006-01-02"), day.AddDate(0, 0, 1).Format("2006-01-02"))
		if _, err := db.Exec(stmt); err != nil {
			t.Fatalf("create partition %s: %v", name, err)
		}
	}
	createPartition(oldName, old)
	createPartition(recentName, recent)

	// countTable reports how many tables with that name exist in the schema.
	countTable := func(name string) int {
		t.Helper()
		var n int
		if err := db.QueryRow(
			"SELECT COUNT(*) FROM pg_tables WHERE schemaname = $1 AND tablename = $2", schema, name).Scan(&n); err != nil {
			t.Fatalf("count table %s: %v", name, err)
		}
		return n
	}

	// With 30-day retention the 35-day-old partition goes, the 5-day-old stays.
	p := NewPartitioner(db, schema, 7, 30)
	if err := p.CleanupOldPartitions(context.Background()); err != nil {
		t.Fatal(err)
	}

	if oldN := countTable(oldName); oldN != 0 {
		t.Fatalf("35-day-old partition should be dropped, still exists (count=%d)", oldN)
	}
	if recentN := countTable(recentName); recentN != 1 {
		t.Fatalf("5-day-old partition should remain, got count=%d", recentN)
	}
}