topfans/docs/superpowers/plans/2026-06-08-statistic-kanban-and-event-implementation.md
2026-06-08 17:06:53 +08:00

94 KiB
Raw Blame History

statisticService 看板 + 事件采集 实施计划

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: 实现 statisticService 后端服务Go Dubbo-go提供数据看板 7 RPC + 事件采集框架TrackEvent / BatchTrackEvent并将 5 个业务服务social / asset / gallery / task / user改造接入 TrackEvent 上报。

Architecture:

  • statisticServiceGo Dubbo-go 服务,端口 20009暴露 7 个看板 RPC经 gateway+ 2 个事件 RPC内部调用TrackEvent / BatchTrackEvent
  • 事件流:业务服务 → Dubbo TrackEvent → ChannelEventSink → event_flusher Worker 批量落库到 statistic.events 表(按日分区)
  • 看板数据4 个物化视图MV+ 3 个预聚表Worker 维护),看板查询走 MV/预聚表Redis 5min TTL 缓存
  • Gatewaybackend/gateway/controller/statistic_controller.go 暴露 7 个 GET /api/v1/dashboard/* 路由,响应统一包装 {code:200, data:resp}

Tech Stack: Go (dubbo-go v3 triple)、GORM、PostgreSQL (按日分区 + 物化视图 + 预聚表)、Redis 缓存、Prometheus 指标、gin (gateway)

Spec: docs/superpowers/specs/2026-06-04-statistic-service-design.md(含 §0.1 本期实施范围)


文件结构

新建

backend/
├── proto/
│   ├── event.proto                         # 新建:通用事件类型(独立 proto
│   └── statistic.proto                     # 新建statistic 服务 + 7 看板 RPC + 2 事件 RPC
├── pkg/proto/
│   ├── event/                              # 新建event.proto 编译产物
│   └── statistic/                          # 新建statistic.proto 编译产物
├── services/statisticService/              # 新建(目录已存在但为空)
│   ├── go.mod / go.sum                     # 新建
│   ├── main.go                             # 新建
│   ├── config/
│   │   └── statistic_config.go             # 新建
│   ├── model/
│   │   ├── event.go                        # 新建Event 模型
│   │   └── metric.go                       # 新建7 看板响应结构
│   ├── repository/
│   │   ├── event_repo.go                   # 新建events 表 + 3 预聚表
│   │   └── dashboard_repo.go               # 新建4 MV 聚合查询
│   ├── sink/
│   │   ├── event_sink.go                   # 新建EventSink 接口
│   │   └── channel_sink.go                 # 新建ChannelEventSink本期
│   ├── service/
│   │   ├── event_service.go                # 新建:事件校验 + Submit
│   │   ├── dashboard_service.go            # 新建7 看板 RPC 业务逻辑
│   │   ├── metric_service.go               # 新建MV 刷新协调
│   │   └── cache.go                        # 新建Redis 缓存封装
│   ├── provider/
│   │   ├── statistic_mobile_provider.go    # 新建7 看板 RPCmobile
│   │   └── statistic_internal_provider.go  # 新建TrackEvent / BatchTrackEventinternal
│   ├── worker/
│   │   ├── event_flusher.go                # 新建:事件批量落库
│   │   ├── metric_recent_level_ups_updater.go  # 新建:同步触发
│   │   ├── metric_upcoming_level_ups_updater.go  # 新建15min ticker
│   │   ├── metric_weekly_user_income_updater.go  # 新建5min ticker
│   │   ├── materializer.go                 # 新建4 MV 刷新
│   │   └── partitioner.go                  # 新建events 分区管理
│   ├── handler/
│   │   └── healthz.go                      # 新建:健康检查
│   ├── metrics/
│   │   └── metrics.go                      # 新建Prometheus 指标声明
│   ├── client/                             # 新建:跨服务 RPC 客户端
│   │   ├── user_rpc_client.go
│   │   └── gallery_rpc_client.go
│   └── scripts/                            # 新建:测试 fixture
│       └── testhelper/
├── migrations/
│   ├── 2026_06_08_001_statistic_events.sql
│   ├── 2026_06_08_002_statistic_mv_daily_user_income.sql
│   ├── 2026_06_08_003_statistic_mv_daily_exhibition_revenue.sql
│   ├── 2026_06_08_004_statistic_mv_daily_like_income.sql
│   ├── 2026_06_08_005_statistic_mv_asset_level_distribution.sql
│   ├── 2026_06_08_006_statistic_metric_weekly_user_income.sql
│   ├── 2026_06_08_007_statistic_metric_recent_level_ups.sql
│   ├── 2026_06_08_008_statistic_metric_upcoming_level_ups.sql
│   ├── 2026_06_08_009_statistic_refresh_log.sql
│   └── 2026_06_08_010_statistic_partitions_initial.sql
├── gateway/
│   ├── router/router.go                    # 修改:注册 7 路由
│   └── controller/statistic_controller.go  # 新建7 看板 controller
└── pkg/statistic/                          # 新建:业务侧统一调用 SDK
    └── client.go

修改

backend/
├── go.work                                 # 修改:加 ./services/statisticService
├── services/socialService/service/asset_like_service.go  # 修改:写 like_income_log 后调 TrackEvent
├── services/galleryService/service/exhibition_service.go  # 修改PlaceAsset / RemoveFromSlot 后调
├── services/taskService/service/revenue_service.go  # 修改OnExhibitionCompleted 后调
├── services/assetService/service/mint_service.go  # 修改CreateMintOrder 后调
├── services/assetService/service/asset_level_service.go  # 修改CheckUpgrade 后调
└── services/userService/service/user_service.go  # 修改UpdateCrystalBalance 后调

任务编排

4 阶段 / 16 任务 / 每个任务包含 TDD 步骤

阶段 任务 周期(估)
P1 服务骨架 T1-T3 2-3 天
P2 事件采集 T4-T9 4-5 天
P3 看板 7 RPC T10-T14 4-5 天
P4 业务侧集成 T15-T16 2-3 天

每个任务结束都应有:所有测试通过 + git commit + 关键验证勾选完成。


P1 服务骨架

Task 1: 项目初始化 + go.mod / go.work / 目录结构

Files:

  • Create: backend/services/statisticService/go.mod

  • Modify: backend/go.work

  • Step 1.1: 创建 go.mod

cd backend/services/statisticService
go mod init github.com/topfans/backend/services/statisticService
  • Step 1.2: 添加核心依赖(沿用 taskService
go get github.com/topfans/backend/pkg/database@v0.0.0
go get github.com/topfans/backend/pkg/logger@v0.0.0
go get github.com/topfans/backend/pkg/health@v0.0.0
go get dubbo.apache.org/dubbo-go/v3
go get gorm.io/gorm
go get gorm.io/driver/postgres
go get github.com/redis/go-redis/v9
go get github.com/prometheus/client_golang/prometheus

实际版本号看 go.work 里 taskService 的 go.mod。复制其 require 块,删除 taskService 特定的包assetLevelRepo 等)。

  • Step 1.3: 修改 go.work 加入新服务

backend/go.workuse 列表里加:

	./services/statisticService
  • Step 1.4: 创建子目录结构
mkdir -p backend/services/statisticService/{config,model,repository,sink,service,provider,worker,handler,metrics,client,scripts/testhelper}
  • Step 1.5: 创建占位 main.go
// backend/services/statisticService/main.go
package main

import "fmt"

func main() {
    fmt.Println("statisticService starting...")
}
  • Step 1.6: 编译验证
cd backend
go build ./services/statisticService

Expected: 编译成功,无输出。

  • Step 1.7: Commit
git add backend/services/statisticService/ backend/go.work
git commit -m "feat(statistic): scaffold statisticService with go.mod and directory structure"

Task 2: proto 定义event.proto + statistic.proto

Files:

  • Create: backend/proto/event.proto
  • Create: backend/proto/statistic.proto

关键proto 冻结点。 P2/P3 业务逻辑依赖此定义。

  • Step 2.1: 创建 event.proto
cat > backend/proto/event.proto << 'EOF'
syntax = "proto3";
package event;
option go_package = "github.com/topfans/backend/pkg/proto/event";

message Event {
  string event_id = 1;
  int64 user_id = 2;
  int64 star_id = 3;
  string event_type = 4;
  int64 occurred_at = 5;
  int64 received_at = 6;
  map<string, string> properties = 7;
}

message BatchEventRequest {
  repeated Event events = 1;
}
EOF
  • Step 2.2: 创建 statistic.proto对齐 spec §2.3
cat > backend/proto/statistic.proto << 'EOF'
syntax = "proto3";
package statistic;
option go_package = "github.com/topfans/backend/pkg/proto/statistic";
import "event.proto";

service StatisticService {
  rpc GetTodayOverview(GetTodayOverviewRequest) returns (GetTodayOverviewResponse);
  rpc Get7DayIncomeCurve(Get7DayIncomeCurveRequest) returns (Get7DayIncomeCurveResponse);
  rpc GetExhibitionIncomeSummary(GetExhibitionIncomeSummaryRequest) returns (GetExhibitionIncomeSummaryResponse);
  rpc GetLikeIncomeByLevel(GetLikeIncomeByLevelRequest) returns (GetLikeIncomeByLevelResponse);
  rpc GetTopAssetsByEarning(GetTopAssetsByEarningRequest) returns (GetTopAssetsByEarningResponse);
  rpc GetAssetLevelDistribution(GetAssetLevelDistributionRequest) returns (GetAssetLevelDistributionResponse);
  rpc GetAssetUpgradeProgress(GetAssetUpgradeProgressRequest) returns (GetAssetUpgradeProgressResponse);
  rpc TrackEvent(event.Event) returns (TrackEventResponse);
  rpc BatchTrackEvent(event.BatchEventRequest) returns (TrackEventResponse);
}

// ====== 1. 今日概览 ======
message GetTodayOverviewRequest { int64 star_id = 1; }
message GetTodayOverviewResponse {
  int64 crystal_balance = 1;
  int64 today_income = 2;
  int32 week_rank = 3;
  int32 week_total_users = 4;
}

// ====== 2. 七日收益曲线 ======
message Get7DayIncomeCurveRequest { int64 star_id = 1; }
message DailyIncomePoint {
  string date = 1;
  int64 income = 2;
  bool is_today = 3;
  bool is_peak = 4;
}
message Get7DayIncomeCurveResponse {
  repeated DailyIncomePoint points = 1;
  int64 total_income = 2;
  int64 avg_income = 3;
}

// ====== 3. 展出收益中心 ======
message GetExhibitionIncomeSummaryRequest { int64 star_id = 1; }
message TopExhibitionItem {
  int64 asset_id = 1;
  string asset_name = 2;
  string asset_thumb = 3;
  string duration_7d = 4;
  int64 earnings_7d = 5;
  int32 avg_earnings = 6;
}
message GetExhibitionIncomeSummaryResponse {
  int32 exhibiting_count = 1;
  int32 starbook_count = 2;
  string total_duration = 3;
  int64 total_earnings = 4;
  repeated TopExhibitionItem top5 = 5;
}

// ====== 4. 点赞收益按等级 ======
message GetLikeIncomeByLevelRequest { int64 star_id = 1; }
message LikeIncomeLevelItem {
  string level = 1;
  int32 asset_count = 2;
  int64 total_income = 3;
  string thumb = 4;
}
message GetLikeIncomeByLevelResponse {
  int64 total_like_count = 1;
  int64 total_income = 2;
  repeated LikeIncomeLevelItem levels = 3;
}

// ====== 5. 藏品 TOP5 ======
message GetTopAssetsByEarningRequest { int64 star_id = 1; }
message TopAssetItem {
  int64 asset_id = 1;
  string asset_name = 2;
  string asset_thumb = 3;
  int64 total_earnings = 4;
  int32 rank = 5;
}
message GetTopAssetsByEarningResponse {
  repeated TopAssetItem items = 1;
}

// ====== 6. 藏品等级分布 ======
message GetAssetLevelDistributionRequest { int64 star_id = 1; }
message AssetLevelItem {
  string level = 1;
  int32 count = 2;
  int32 total = 3;
}
message GetAssetLevelDistributionResponse {
  repeated AssetLevelItem items = 1;
}

// ====== 7. 升级进度 ======
message GetAssetUpgradeProgressRequest { int64 star_id = 1; }
message UpcomingLevelUpItem {
  int64 asset_id = 1;
  string asset_name = 2;
  string asset_thumb = 3;
  int32 like_progress = 4;
  int32 duration_progress = 5;
}
message RecentLevelUpItem {
  int64 asset_id = 1;
  string asset_name = 2;
  string asset_thumb = 3;
  string new_level = 4;
  int64 upgrade_time = 5;
}
message GetAssetUpgradeProgressResponse {
  repeated UpcomingLevelUpItem upcoming = 1;
  repeated RecentLevelUpItem recent = 2;
}

// ====== 事件采集响应 ======
message TrackEventResponse {
  int32 accepted = 1;
  int32 rejected = 2;
}
EOF
  • Step 2.3: 编译 proto生成 Go 代码)
cd backend
# 实际项目用 make / 脚本生成,看 Makefile 或 scripts/ 里 protoc 命令
ls scripts/ 2>/dev/null
# 或用标准命令:
protoc --go_out=pkg/proto --go_triple_out=pkg/proto \
  -I proto proto/event.proto proto/statistic.proto

Expected: pkg/proto/event/pkg/proto/statistic/ 下生成 .pb.go.triple.go

  • Step 2.4: 验证编译
go build ./pkg/proto/event/ ./pkg/proto/statistic/

Expected: 编译成功。

  • Step 2.5: Commit
git add backend/proto/ backend/pkg/proto/event/ backend/pkg/proto/statistic/
git commit -m "feat(statistic): add event.proto and statistic.proto with 7 RPCs + 2 event RPCs"

Task 3: config + main.go 启动 + healthz + metrics 骨架 + 10 SQL 迁移

Files:

  • Create: backend/services/statisticService/config/statistic_config.go

  • Modify: backend/services/statisticService/main.go

  • Create: backend/services/statisticService/handler/healthz.go

  • Create: backend/services/statisticService/metrics/metrics.go

  • Create: 10 SQL migration files under backend/migrations/

  • Step 3.1: 创建 config沿用 taskService 模式)

// backend/services/statisticService/config/statistic_config.go
package config

import (
    "flag"
    "log"
    "os"
    "strconv"
    "time"
)

type DatabaseConfig struct {
    Host, Password, DBName, SSLMode, TimeZone string
    Port                                         int
    User                                         string
    Schema                                       string
}

type RedisConfig struct {
    URL string
}

type RefreshIntervals struct {
    DailyUserIncome        time.Duration
    DailyExhibitionRevenue time.Duration
    DailyLikeIncome        time.Duration
    AssetLevelDistribution time.Duration
    WeeklyUserIncome       time.Duration
    UpcomingLevelUps       time.Duration
}

type ChannelConfig struct {
    EventChannelCapacity int
    EventWorkerCount     int
    EventBatchSize       int
    EventBatchInterval   time.Duration
}

type PartitionConfig struct {
    RetentionDays int
    PreCreateDays int
}

type ExtensionConfig struct {
    EnableOLAPDualWrite   bool
    EnableRealtimeChannel bool
    EnableSDKEndpoint     bool
    EnableSampling        bool
}

var (
    DBConfig          = &DatabaseConfig{Schema: "statistic"}
    RedisCfg          = &RedisConfig{URL: "redis://localhost:6379/0"}
    RefreshCfg        = &RefreshIntervals{
        DailyUserIncome: 5 * time.Minute,
        DailyExhibitionRevenue: 5 * time.Minute,
        DailyLikeIncome: 5 * time.Minute,
        AssetLevelDistribution: 15 * time.Minute,
        WeeklyUserIncome: 5 * time.Minute,
        UpcomingLevelUps: 15 * time.Minute,
    }
    ChannelCfg = &ChannelConfig{
        EventChannelCapacity: 1000,
        EventWorkerCount:     1,
        EventBatchSize:       100,
        EventBatchInterval:   1 * time.Second,
    }
    PartitionCfg = &PartitionConfig{RetentionDays: 30, PreCreateDays: 7}
    ExtCfg       = &ExtensionConfig{ // 全部默认 false
        EnableOLAPDualWrite:   false,
        EnableRealtimeChannel: false,
        EnableSDKEndpoint:     false,
        EnableSampling:        false,
    }
)

func getEnv(key, fallback string) string {
    if v := os.Getenv(key); v != "" { return v }
    return fallback
}

func getEnvInt(key string, fallback int) int {
    if v := os.Getenv(key); v != "" {
        if n, err := strconv.Atoi(v); err == nil { return n }
    }
    return fallback
}

func getEnvDuration(key string, fallback time.Duration) time.Duration {
    if v := os.Getenv(key); v != "" {
        if d, err := time.ParseDuration(v); err == nil { return d }
    }
    return fallback
}

func getEnvBool(key string, fallback bool) bool {
    if v := os.Getenv(key); v != "" {
        if b, err := strconv.ParseBool(v); err == nil { return b }
    }
    return fallback
}

func InitConfig() {
    flag.StringVar(&DBConfig.Host, "db-host", getEnv("STATISTIC_DB_HOST", "localhost"), "")
    flag.IntVar(&DBConfig.Port, "db-port", getEnvInt("STATISTIC_DB_PORT", 5432), "")
    flag.StringVar(&DBConfig.User, "db-user", getEnv("STATISTIC_DB_USER", "postgres"), "")
    flag.StringVar(&DBConfig.Password, "db-password", getEnv("STATISTIC_DB_PASSWORD", ""), "")
    flag.StringVar(&DBConfig.DBName, "db-name", getEnv("STATISTIC_DB_NAME", "topfans"), "")
    flag.StringVar(&DBConfig.SSLMode, "db-sslmode", "disable", "")
    flag.StringVar(&DBConfig.Schema, "db-schema", getEnv("STATISTIC_DB_SCHEMA", "statistic"), "")

    flag.StringVar(&RedisCfg.URL, "redis-url", getEnv("STATISTIC_REDIS_URL", "redis://localhost:6379/0"), "")

    flag.IntVar(&ChannelCfg.EventChannelCapacity, "event-channel-capacity", getEnvInt("STATISTIC_EVENT_CHANNEL_CAPACITY", 1000), "")
    flag.IntVar(&ChannelCfg.EventBatchSize, "event-batch-size", getEnvInt("STATISTIC_EVENT_BATCH_SIZE", 100), "")
    flag.DurationVar(&ChannelCfg.EventBatchInterval, "event-batch-interval", getEnvDuration("STATISTIC_EVENT_BATCH_INTERVAL", time.Second), "")

    flag.IntVar(&PartitionCfg.RetentionDays, "partition-retention-days", getEnvInt("STATISTIC_PARTITION_RETENTION_DAYS", 30), "")
    flag.IntVar(&PartitionCfg.PreCreateDays, "partition-precreate-days", getEnvInt("STATISTIC_PARTITION_PRECREATE_DAYS", 7), "")

    flag.BoolVar(&ExtCfg.EnableOLAPDualWrite, "enable-olap", getEnvBool("STATISTIC_ENABLE_OLAP_DUAL_WRITE", false), "")
    flag.BoolVar(&ExtCfg.EnableRealtimeChannel, "enable-realtime", getEnvBool("STATISTIC_ENABLE_REALTIME_CHANNEL", false), "")
    flag.BoolVar(&ExtCfg.EnableSDKEndpoint, "enable-sdk", getEnvBool("STATISTIC_ENABLE_SDK_ENDPOINT", false), "")
    flag.BoolVar(&ExtCfg.EnableSampling, "enable-sampling", getEnvBool("STATISTIC_ENABLE_SAMPLING", false), "")

    flag.Parse()
    log.Println("statisticService 配置初始化完成")
}
  • Step 3.2: 创建 metrics 骨架(声明所有指标,数据后续埋)
// backend/services/statisticService/metrics/metrics.go
package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// 看板 RPC
var DashboardRPCTotal = promauto.NewCounterVec(prometheus.CounterOpts{
    Name: "dashboard_rpc_total", Help: "Dashboard RPC total",
}, []string{"rpc", "status"})

var DashboardRPCDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
    Name: "dashboard_rpc_duration_seconds", Help: "Dashboard RPC duration",
}, []string{"rpc"})

var DashboardCacheHitRate = promauto.NewGauge(prometheus.GaugeOpts{
    Name: "dashboard_cache_hit_rate", Help: "Dashboard cache hit rate",
})

// 事件采集
var EventTrackTotal = promauto.NewCounterVec(prometheus.CounterOpts{
    Name: "event_track_total", Help: "Event track total",
}, []string{"event_type", "result"})

var EventChannelSize = promauto.NewGauge(prometheus.GaugeOpts{
    Name: "event_channel_size", Help: "Event channel current size",
})

var EventChannelCapacity = promauto.NewGauge(prometheus.GaugeOpts{
    Name: "event_channel_capacity", Help: "Event channel capacity",
})

var EventDroppedTotal = promauto.NewCounter(prometheus.CounterOpts{
    Name: "event_dropped_total", Help: "Event dropped total",
})

var EventDBInsertTotal = promauto.NewCounterVec(prometheus.CounterOpts{
    Name: "event_db_insert_total", Help: "Event DB insert total",
}, []string{"status"})

// 物化视图
var MVRefreshTotal = promauto.NewCounterVec(prometheus.CounterOpts{
    Name: "mv_refresh_total", Help: "Materialized view refresh total",
}, []string{"mv_name", "status"})

var MVRefreshDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
    Name: "mv_refresh_duration_seconds", Help: "MV refresh duration",
}, []string{"mv_name"})

// Worker
var WorkerRunningCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
    Name: "worker_running_count", Help: "Worker running count",
}, []string{"worker_name"})

// 分区
var EventsPartitionCount = promauto.NewGauge(prometheus.GaugeOpts{
    Name: "events_partition_count", Help: "Events partition count",
})
  • Step 3.3: 创建 healthz
// backend/services/statisticService/handler/healthz.go
package handler

import (
    "context"
    "database/sql"
    "net/http"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/redis/go-redis/v9"
    "github.com/topfans/backend/services/statisticService/metrics"
)

type Healthz struct {
    db    *sql.DB
    redis *redis.Client
}

func NewHealthz(db *sql.DB, redis *redis.Client) *Healthz {
    return &Healthz{db: db, redis: redis}
}

func (h *Healthz) Register(r *gin.Engine, port int) {
    r.GET("/metrics", gin.WrapH(metricsHandler())) // 暴露 Prometheus
    r.GET("/healthz", func(c *gin.Context) {
        ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second)
        defer cancel()

        status := gin.H{"status": "ok"}
        if err := h.db.PingContext(ctx); err != nil {
            status["db"] = "down"
        } else {
            status["db"] = "up"
        }
        if err := h.redis.Ping(ctx).Err(); err != nil {
            status["redis"] = "down"
        } else {
            status["redis"] = "up"
        }
        c.JSON(http.StatusOK, status)
    })
}

实际项目pkg/health 组件,参考 taskService main.go:131-133。health.NewHandler("statistic-service", healthPort).Start()。本计划用自实现版,最终应替换为项目既有组件。

  • Step 3.4: 创建 main.go启动骨架不含业务
// backend/services/statisticService/main.go
package main

import (
    "database/sql"
    "flag"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    _ "github.com/lib/pq"
    "github.com/redis/go-redis/v9"
    "github.com/topfans/backend/pkg/logger"
    "github.com/topfans/backend/services/statisticService/config"
    "github.com/topfans/backend/services/statisticService/handler"
)

var port = flag.Int("port", 20009, "Dubbo service port")

func main() {
    env := os.Getenv("ENV")
    if env == "" { env = "development" }
    if err := logger.Init(logger.Config{ServiceName: "statistic-service", Environment: env}); err != nil {
        panic(fmt.Sprintf("Failed to init logger: %v", err))
    }
    defer logger.Sync()

    config.InitConfig()

    // DB
    db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
        config.DBConfig.Host, config.DBConfig.Port, config.DBConfig.User, config.DBConfig.Password,
        config.DBConfig.DBName, config.DBConfig.SSLMode))
    if err != nil { logger.Logger.Fatal(fmt.Sprintf("Failed to open DB: %v", err)) }
    db.SetMaxOpenConns(50)
    if err := db.Ping(); err != nil { logger.Logger.Fatal(fmt.Sprintf("DB ping: %v", err)) }
    defer db.Close()

    // Redis
    opt, _ := redis.ParseURL(config.RedisCfg.URL)
    rdb := redis.NewClient(opt)
    defer rdb.Close()

    // Healthz HTTP server on port+1000=21009
    healthPort := *port + 1000
    h := handler.NewHealthz(db, rdb)
    go h.Start(healthPort)

    logger.Logger.Info(fmt.Sprintf("statisticService started, dubbo=%d healthz=%d", *port, healthPort))

    // Graceful shutdown
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
}

并在 healthz.go 加 Start 方法:

// 加到 backend/services/statisticService/handler/healthz.go
func (h *Healthz) Start(port int) {
    r := gin.Default()
    h.Register(r, port)
    go func() {
        if err := r.Run(fmt.Sprintf(":%d", port)); err != nil {
            logger.Logger.Fatal(...)
        }
    }()
}
  • Step 3.5: 创建 10 个 SQL 迁移文件(参考 spec §3.2-3.5

每个文件包含一个 DDL + 必要的索引。关键 SQL 摘要:

backend/migrations/2026_06_08_001_statistic_events.sql

CREATE SCHEMA IF NOT EXISTS statistic;

CREATE TABLE IF NOT EXISTS statistic.events (
    id              BIGSERIAL,
    event_id        UUID         NOT NULL,
    user_id         BIGINT       NOT NULL,
    star_id         BIGINT       NOT NULL,
    event_type      VARCHAR(64)  NOT NULL,
    occurred_at     TIMESTAMPTZ  NOT NULL,
    received_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    properties      JSONB        NOT NULL DEFAULT '{}',
    PRIMARY KEY (id, received_at)
) PARTITION BY RANGE (received_at);

CREATE UNIQUE INDEX IF NOT EXISTS idx_events_event_id ON statistic.events (event_id, received_at);
CREATE INDEX IF NOT EXISTS idx_events_user_star_type_time
    ON statistic.events (user_id, star_id, event_type, received_at DESC);
CREATE INDEX IF NOT EXISTS idx_events_star_type_time
    ON statistic.events (star_id, event_type, received_at DESC);
CREATE INDEX IF NOT EXISTS idx_events_properties_gin ON statistic.events USING GIN (properties);

-- 初始 7 天分区(从今天起)
DO $$
DECLARE
    i INT;
    d DATE;
    n DATE;
BEGIN
    FOR i IN 0..6 LOOP
        d := CURRENT_DATE + i;
        n := d + 1;
        EXECUTE format(
            'CREATE TABLE IF NOT EXISTS statistic.events_%s PARTITION OF statistic.events FOR VALUES FROM (%L) TO (%L)',
            to_char(d, 'YYYY_MM_DD'), d::text, n::text
        );
    END LOOP;
END $$;

2026_06_08_002_statistic_mv_daily_user_income.sql - 见 spec §3.4 MV1 SQL

2026_06_08_003_statistic_mv_daily_exhibition_revenue.sql - 见 spec §3.4 MV2 SQL

2026_06_08_004_statistic_mv_daily_like_income.sql - 见 spec §3.4 MV3 SQL

2026_06_08_005_statistic_mv_asset_level_distribution.sql - 见 spec §3.4 MV4 SQL

2026_06_08_006_statistic_metric_weekly_user_income.sql - 见 spec §3.5 weekly 表

2026_06_08_007_statistic_metric_recent_level_ups.sql - 见 spec §3.5 recent_level_ups 表

2026_06_08_008_statistic_metric_upcoming_level_ups.sql - 见 spec §3.5 upcoming_level_ups 表

2026_06_08_009_statistic_refresh_log.sql

CREATE TABLE IF NOT EXISTS statistic.refresh_log (
    id              BIGSERIAL PRIMARY KEY,
    mv_name         VARCHAR(128)  NOT NULL,
    started_at      TIMESTAMPTZ   NOT NULL,
    finished_at     TIMESTAMPTZ,
    row_count       BIGINT,
    status          VARCHAR(16)   NOT NULL,
    error_message   TEXT
);
CREATE INDEX IF NOT EXISTS idx_refresh_log_mv_time
    ON statistic.refresh_log (mv_name, started_at DESC);
  • Step 3.6: 跑 DB 迁移
cd backend
# 用项目现有的迁移工具(看 scripts/
ls scripts/ 2>/dev/null
# 或手动 psql
psql -h localhost -U postgres -d topfans -f migrations/2026_06_08_001_statistic_events.sql
# ... 10 个文件依次执行

Expected: 10 个 SQL 全部成功,\dt statistic.* 看到 10+ 表。

  • Step 3.7: 编译 + 运行
go build ./services/statisticService
./services/statisticService/bin/statisticService

Expected: 启动日志显示 port 20009 + healthz 21009。

  • Step 3.8: 验证 healthz
curl http://localhost:21009/healthz

Expected: {"db":"up","redis":"up","status":"ok"}

  • Step 3.9: 跑 P1 末预检查清单

必做spec §0.1.3 P1 末预检查清单 10 项。

# 1. socialService
grep -r "like_income_log" backend/services/socialService/
# 2. galleryService
grep -rn "^func.*PlaceAsset\|^func.*RemoveFromSlot" backend/services/galleryService/service/
# 3. taskService
grep -n "OnExhibitionCompleted" backend/services/taskService/service/revenue_service.go
# 4. assetService
grep -n "CreateMintOrder\|CheckUpgrade\|logLevelChange" backend/services/assetService/service/{mint_service,asset_level_service}.go
# 5. userService
grep -n "UpdateCrystalBalance" backend/services/userService/service/user_service.go
# 6. public.assets 表结构
psql -c "\d+ public.assets"
# 7. 公共日志表
psql -c "\dt public.*log*"
# 8. 端口
lsof -i :20009
# 9. 业务服务可达
nc -zv localhost 20000; nc -zv localhost 20001; nc -zv localhost 20002; nc -zv localhost 20003; nc -zv localhost 20006
# 10. schema 权限
psql -U postgres -c "CREATE SCHEMA statistic_test_perm; DROP SCHEMA statistic_test_perm;"

Expected: 全部通过。

  • Step 3.10: Commit
git add backend/services/statisticService/ backend/migrations/
git commit -m "feat(statistic): P1 scaffolding - config, main, healthz, metrics, 10 SQL migrations"

P2 事件采集

Task 4: Event 模型 + EventSink 接口 + ChannelEventSink

Files:

  • Create: backend/services/statisticService/model/event.go

  • Create: backend/services/statisticService/sink/event_sink.go

  • Create: backend/services/statisticService/sink/channel_sink.go

  • Create: backend/services/statisticService/sink/channel_sink_test.go

  • Step 4.1: 写 Event 模型(写测试先)

backend/services/statisticService/model/event_test.go

package model

import (
    "testing"
    "time"
)

func TestEvent_ToJSON(t *testing.T) {
    e := &Event{
        EventID:    "uuid-123",
        UserID:     100,
        StarID:     1,
        EventType:  "asset.like",
        OccurredAt: time.Unix(1700000000, 0),
        ReceivedAt: time.Unix(1700000001, 0),
        Properties: map[string]string{"asset_id": "456"},
    }
    j := e.ToJSON()
    if j["event_id"] != "uuid-123" { t.Fatal("event_id mismatch") }
    if j["properties"].(map[string]interface{})["asset_id"] != "456" { t.Fatal("properties mismatch") }
}
  • Step 4.2: 跑测试(应失败)
cd backend/services/statisticService
go test ./model/ -v

Expected: FAIL - "undefined: Event"

  • Step 4.3: 实现 Event

backend/services/statisticService/model/event.go

package model

import "time"

type Event struct {
    EventID    string            `json:"event_id"`
    UserID     int64             `json:"user_id"`
    StarID     int64             `json:"star_id"`
    EventType  string            `json:"event_type"`
    OccurredAt time.Time         `json:"occurred_at"`
    ReceivedAt time.Time         `json:"received_at"`
    Properties map[string]string `json:"properties"`
}

func (e *Event) ToJSON() map[string]interface{} {
    return map[string]interface{}{
        "event_id":    e.EventID,
        "user_id":     e.UserID,
        "star_id":     e.StarID,
        "event_type":  e.EventType,
        "occurred_at": e.OccurredAt.UnixMilli(),
        "received_at": e.ReceivedAt.UnixMilli(),
        "properties":  e.Properties,
    }
}
  • Step 4.4: 跑测试(应通过)
go test ./model/ -v

Expected: PASS

  • Step 4.5: 写 EventSink 接口 + ChannelEventSink 测试

backend/services/statisticService/sink/channel_sink_test.go

package sink

import (
    "context"
    "testing"
    "time"
)

func TestChannelEventSink_Submit(t *testing.T) {
    ch := make(chan *model.Event, 10)
    s := NewChannelEventSink(ch)
    e := &model.Event{EventID: "test-1"}
    if err := s.Submit(context.Background(), e); err != nil {
        t.Fatalf("Submit failed: %v", err)
    }
    select {
    case got := <-ch:
        if got.EventID != "test-1" { t.Fatal("event mismatch") }
    case <-time.After(100 * time.Millisecond):
        t.Fatal("no event received")
    }
}

func TestChannelEventSink_SubmitBatch(t *testing.T) {
    ch := make(chan *model.Event, 10)
    s := NewChannelEventSink(ch)
    events := []*model.Event{{EventID: "a"}, {EventID: "b"}}
    if err := s.SubmitBatch(context.Background(), events); err != nil {
        t.Fatalf("SubmitBatch failed: %v", err)
    }
    if len(ch) != 2 { t.Fatalf("expected 2 events, got %d", len(ch)) }
}
  • Step 4.6: 跑测试(应失败)
go test ./sink/ -v

Expected: FAIL

  • Step 4.7: 实现接口和 ChannelEventSink

backend/services/statisticService/sink/event_sink.go

package sink

import (
    "context"
    "github.com/topfans/backend/services/statisticService/model"
)

type EventSink interface {
    Submit(ctx context.Context, e *model.Event) error
    SubmitBatch(ctx context.Context, es []*model.Event) error
    Close() error
}

backend/services/statisticService/sink/channel_sink.go

package sink

import (
    "context"
    "github.com/topfans/backend/services/statisticService/model"
)

type ChannelEventSink struct {
    ch chan<- *model.Event
}

func NewChannelEventSink(ch chan<- *model.Event) *ChannelEventSink {
    return &ChannelEventSink{ch: ch}
}

func (s *ChannelEventSink) Submit(ctx context.Context, e *model.Event) error {
    select {
    case s.ch <- e:
        return nil
    default:
        return ErrChannelFull
    }
}

func (s *ChannelEventSink) SubmitBatch(ctx context.Context, es []*model.Event) error {
    for _, e := range es {
        if err := s.Submit(ctx, e); err != nil {
            return err
        }
    }
    return nil
}

func (s *ChannelEventSink) Close() error { return nil }

var ErrChannelFull = errors.New("event channel full")

import "errors"

  • Step 4.8: 跑测试(应通过)
go test ./sink/ -v

Expected: PASS

  • Step 4.9: Commit
git add backend/services/statisticService/model/ backend/services/statisticService/sink/
git commit -m "feat(statistic): Event model, EventSink interface, ChannelEventSink (P2 T4)"

Task 5: event_repo + event_service事件校验 + 落库)

Files:

  • Create: backend/services/statisticService/repository/event_repo.go

  • Create: backend/services/statisticService/service/event_service.go

  • Tests

  • Step 5.1: 写 event_repo 批量插入测试

backend/services/statisticService/repository/event_repo_test.go

package repository

import (
    "context"
    "os"
    "testing"
    "time"
)

func setupTestDB(t *testing.T) (*sql.DB, func()) {
    dsn := os.Getenv("TEST_DATABASE_URL")
    if dsn == "" {
        t.Skip("TEST_DATABASE_URL not set, skipping integration test")
    }
    db, err := sql.Open("postgres", dsn)
    if err != nil { t.Fatal(err) }
    // 用 schema=test, 每次 truncate events
    if _, err := db.Exec("CREATE SCHEMA IF NOT EXISTS statistic_test"); err != nil { t.Fatal(err) }
    return db, func() {
        db.Exec("TRUNCATE TABLE statistic_test.events")
        db.Close()
    }
}

func TestEventRepo_InsertBatch(t *testing.T) {
    db, cleanup := setupTestDB(t)
    defer cleanup()
    repo := NewEventRepository(db, "statistic_test")
    events := []*model.Event{
        {EventID: "uuid-1", UserID: 100, StarID: 1, EventType: "asset.like",
         OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{"asset_id": "456"}},
    }
    inserted, err := repo.InsertBatch(context.Background(), events)
    if err != nil { t.Fatal(err) }
    if inserted != 1 { t.Fatalf("expected 1, got %d", inserted) }
}

func TestEventRepo_Dedup(t *testing.T) {
    db, cleanup := setupTestDB(t)
    defer cleanup()
    repo := NewEventRepository(db, "statistic_test")
    e := &model.Event{EventID: "uuid-dup", UserID: 100, StarID: 1, EventType: "asset.like",
        OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{}}
    repo.InsertBatch(context.Background(), []*model.Event{e})
    inserted, _ := repo.InsertBatch(context.Background(), []*model.Event{e}) // 重复
    if inserted != 0 { t.Fatalf("expected 0 (dedup), got %d", inserted) }
}
  • Step 5.2: 跑测试(应失败)
TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5432/topfans_test?sslmode=disable" \
  go test ./repository/ -v -run TestEventRepo

Expected: FAIL

  • Step 5.3: 实现 event_repo

backend/services/statisticService/repository/event_repo.go

package repository

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "strings"

    "github.com/topfans/backend/services/statisticService/model"
)

type EventRepository struct {
    db     *sql.DB
    schema string
}

func NewEventRepository(db *sql.DB, schema string) *EventRepository {
    return &EventRepository{db: db, schema: schema}
}

func (r *EventRepository) InsertBatch(ctx context.Context, events []*model.Event) (int, error) {
    if len(events) == 0 { return 0, nil }

    placeholders := make([]string, 0, len(events))
    args := make([]interface{}, 0, len(events)*7)
    for _, e := range events {
        props, _ := json.Marshal(e.Properties)
        placeholders = append(placeholders, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)",
            len(args)+1, len(args)+2, len(args)+3, len(args)+4, len(args)+5, len(args)+6, len(args)+7))
        args = append(args, e.EventID, e.UserID, e.StarID, e.EventType, e.OccurredAt, e.ReceivedAt, string(props))
    }

    query := fmt.Sprintf(`
        INSERT INTO %s.events (event_id, user_id, star_id, event_type, occurred_at, received_at, properties)
        VALUES %s
        ON CONFLICT (event_id, received_at) DO NOTHING
    `, r.schema, strings.Join(placeholders, ","))

    res, err := r.db.ExecContext(ctx, query, args...)
    if err != nil { return 0, err }
    n, _ := res.RowsAffected()
    return int(n), nil
}
  • Step 5.4: 跑测试(应通过)
TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5432/topfans_test?sslmode=disable" \
  go test ./repository/ -v -run TestEventRepo

Expected: PASS

  • Step 5.5: 写 event_service 校验测试

backend/services/statisticService/service/event_service_test.go

package service

import (
    "context"
    "testing"
    "github.com/topfans/backend/services/statisticService/model"
    "github.com/topfans/backend/services/statisticService/sink"
)

type mockSink struct{ events []*model.Event }
func (m *mockSink) Submit(ctx context.Context, e *model.Event) error {
    m.events = append(m.events, e); return nil
}
func (m *mockSink) SubmitBatch(ctx context.Context, es []*model.Event) error {
    m.events = append(m.events, es...); return nil
}
func (m *mockSink) Close() error { return nil }

func TestEventService_TrackEvent_Success(t *testing.T) {
    ms := &mockSink{}
    svc := NewEventService(ms, []string{"asset.like", "asset.mint"})
    resp, err := svc.TrackEvent(context.Background(), &model.Event{
        EventID: "u1", UserID: 1, StarID: 1, EventType: "asset.like",
        OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{},
    })
    if err != nil { t.Fatal(err) }
    if resp.Accepted != 1 { t.Fatal("expected accepted=1") }
    if len(ms.events) != 1 { t.Fatal("event not submitted") }
}

func TestEventService_TrackEvent_InvalidEventType(t *testing.T) {
    ms := &mockSink{}
    svc := NewEventService(ms, []string{"asset.like"})
    _, err := svc.TrackEvent(context.Background(), &model.Event{EventType: "evil.type"})
    if err == nil { t.Fatal("expected error for invalid event type") }
}

func TestEventService_TrackEvent_PropertiesTooLarge(t *testing.T) {
    ms := &mockSink{}
    svc := NewEventService(ms, []string{"asset.like"})
    big := make(map[string]string)
    for i := 0; i < 2000; i++ { big[fmt.Sprintf("k%d", i)] = strings.Repeat("x", 100) }
    _, err := svc.TrackEvent(context.Background(), &model.Event{
        EventType: "asset.like", Properties: big,
    })
    if err == nil { t.Fatal("expected error for large properties") }
}
  • Step 5.6: 跑测试(应失败)
go test ./service/ -v

Expected: FAIL

  • Step 5.7: 实现 event_service

backend/services/statisticService/service/event_service.go

package service

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/topfans/backend/services/statisticService/model"
    "github.com/topfans/backend/services/statisticService/sink"
    pb "github.com/topfans/backend/pkg/proto/statistic"
)

var (
    ErrInvalidEventType  = fmt.Errorf("invalid event type")
    ErrPropertiesTooLarge = fmt.Errorf("properties too large")
    ErrInvalidEventID     = fmt.Errorf("invalid event_id")
)

const MaxPropertiesSize = 1024 // 1KB

type EventService struct {
    sink        sink.EventSink
    whiteList   map[string]bool
    statsEnable bool
}

func NewEventService(s sink.EventSink, whiteList []string) *EventService {
    wl := make(map[string]bool, len(whiteList))
    for _, t := range whiteList { wl[t] = true }
    return &EventService{sink: s, whiteList: wl}
}

func (s *EventService) validate(e *model.Event) error {
    if e.EventID == "" { return ErrInvalidEventID }
    if !s.whiteList[e.EventType] { return ErrInvalidEventType }
    b, _ := json.Marshal(e.Properties)
    if len(b) > MaxPropertiesSize { return ErrPropertiesTooLarge }
    return nil
}

func (s *EventService) TrackEvent(ctx context.Context, e *model.Event) (*pb.TrackEventResponse, error) {
    if e.ReceivedAt.IsZero() { e.ReceivedAt = time.Now() }
    if err := s.validate(e); err != nil { return nil, err }
    if err := s.sink.Submit(ctx, e); err != nil {
        return &pb.TrackEventResponse{Accepted: 0, Rejected: 1}, nil
    }
    return &pb.TrackEventResponse{Accepted: 1, Rejected: 0}, nil
}

func (s *EventService) BatchTrackEvent(ctx context.Context, es []*model.Event) (*pb.TrackEventResponse, error) {
    accepted, rejected := 0, 0
    for _, e := range es {
        if e.ReceivedAt.IsZero() { e.ReceivedAt = time.Now() }
        if err := s.validate(e); err != nil { rejected++; continue }
        if err := s.sink.Submit(ctx, e); err != nil { rejected++ } else { accepted++ }
    }
    return &pb.TrackEventResponse{Accepted: int32(accepted), Rejected: int32(rejected)}, nil
}
  • Step 5.8: 跑测试(应通过)
go test ./service/ -v

Expected: PASS

  • Step 5.9: Commit
git add backend/services/statisticService/repository/ backend/services/statisticService/service/
git commit -m "feat(statistic): event_repo (batch insert + dedup) and event_service (validate + submit) (P2 T5)"

Task 6: event_flusher Worker攒批落库 + 触发 metric_recent_level_ups

Files:

  • Create: backend/services/statisticService/worker/event_flusher.go

  • Create: backend/services/statisticService/worker/event_flusher_test.go

  • Create: backend/services/statisticService/repository/metric_repo.go

  • Step 6.1: 写 metric_repo同步更新 metric_recent_level_ups

backend/services/statisticService/repository/metric_repo.go

package repository

import (
    "context"
    "database/sql"
    "github.com/topfans/backend/services/statisticService/model"
)

type MetricRepository struct {
    db     *sql.DB
    schema string
}

func NewMetricRepository(db *sql.DB, schema string) *MetricRepository {
    return &MetricRepository{db: db, schema: schema}
}

func (r *MetricRepository) UpsertRecentLevelUp(ctx context.Context, e *model.Event) error {
    if e.EventType != "asset.level_up" { return nil }
    assetID := e.Properties["asset_id"]
    fromLevel := e.Properties["from"]
    toLevel := e.Properties["to"]
    upgradeTime := e.OccurredAt
    if assetID == "" || toLevel == "" { return nil }
    _, err := r.db.ExecContext(ctx, fmt.Sprintf(`
        INSERT INTO %s.metric_recent_level_ups
            (user_id, star_id, asset_id, from_level, to_level, upgrade_time, asset_name, asset_thumb)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        ON CONFLICT DO NOTHING
    `, r.schema),
        e.UserID, e.StarID, assetID, fromLevel, toLevel, upgradeTime, "", "")
    return err
}

完整 metric_repo 还含 weekly_user_income / upcoming_level_ups 读写,留到 T7/T8 任务。

  • Step 6.2: 写 event_flusher 测试

backend/services/statisticService/worker/event_flusher_test.go

package worker

import (
    "context"
    "database/sql"
    "os"
    "testing"
    "time"

    _ "github.com/lib/pq"
    "github.com/topfans/backend/services/statisticService/model"
    "github.com/topfans/backend/services/statisticService/repository"
)

func TestEventFlusher_FlushBatch(t *testing.T) {
    dsn := os.Getenv("TEST_DATABASE_URL")
    if dsn == "" { t.Skip("skip") }
    db, _ := sql.Open("postgres", dsn)
    defer db.Close()

    eventRepo := repository.NewEventRepository(db, "statistic_test")
    metricRepo := repository.NewMetricRepository(db, "statistic_test")

    ch := make(chan *model.Event, 10)
    flusher := NewEventFlusher(ch, eventRepo, metricRepo, 100, 1*time.Second)

    go flusher.Start(context.Background())

    for i := 0; i < 5; i++ {
        ch <- &model.Event{EventID: fmt.Sprintf("u-%d", i), UserID: 1, StarID: 1, EventType: "asset.like",
            OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{}}
    }
    time.Sleep(2 * time.Second) // 等攒批
    flusher.Stop()
    // 验证 events 表有 5 条
    var n int
    db.QueryRow("SELECT COUNT(*) FROM statistic_test.events").Scan(&n)
    if n < 5 { t.Fatalf("expected >=5 events, got %d", n) }
}
  • Step 6.3: 跑测试(应失败)
TEST_DATABASE_URL="..." go test ./worker/ -v

Expected: FAIL

  • Step 6.4: 实现 event_flusher

backend/services/statisticService/worker/event_flusher.go

package worker

import (
    "context"
    "sync"
    "time"

    "github.com/topfans/backend/pkg/logger"
    "github.com/topfans/backend/services/statisticService/metrics"
    "github.com/topfans/backend/services/statisticService/model"
    "github.com/topfans/backend/services/statisticService/repository"
    "go.uber.org/zap"
)

type EventFlusher struct {
    ch         <-chan *model.Event
    eventRepo  *repository.EventRepository
    metricRepo *repository.MetricRepository
    batchSize  int
    interval   time.Duration

    mu      sync.Mutex
    running bool
    stop    chan struct{}
}

func NewEventFlusher(ch <-chan *model.Event, eventRepo *repository.EventRepository, metricRepo *repository.MetricRepository, batchSize int, interval time.Duration) *EventFlusher {
    return &EventFlusher{ch: ch, eventRepo: eventRepo, metricRepo: metricRepo, batchSize: batchSize, interval: interval, stop: make(chan struct{})}
}

func (f *EventFlusher) Start(ctx context.Context) {
    f.mu.Lock()
    f.running = true
    f.mu.Unlock()
    metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(1)
    defer metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(0)

    batch := make([]*model.Event, 0, f.batchSize)
    ticker := time.NewTicker(f.interval)
    defer ticker.Stop()

    flush := func() {
        if len(batch) == 0 { return }
        inserted, err := f.eventRepo.InsertBatch(ctx, batch)
        if err != nil {
            logger.Logger.Error("event_flusher insert failed", zap.Error(err))
            metrics.EventDBInsertTotal.WithLabelValues("failed").Inc()
        } else {
            metrics.EventDBInsertTotal.WithLabelValues("success").Inc()
        }
        // 同步触发 metric_recent_level_ups
        for _, e := range batch {
            if err := f.metricRepo.UpsertRecentLevelUp(ctx, e); err != nil {
                logger.Logger.Warn("UpsertRecentLevelUp failed", zap.Error(err))
            }
        }
        logger.Logger.Debug("event_flusher batch flushed", zap.Int("inserted", inserted), zap.Int("batch", len(batch)))
        batch = batch[:0]
    }

    for {
        select {
        case <-f.stop:
            flush()
            return
        case e := <-f.ch:
            batch = append(batch, e)
            metrics.EventChannelSize.Set(float64(len(f.ch)))
            if len(batch) >= f.batchSize { flush() }
        case <-ticker.C:
            flush()
        }
    }
}

func (f *EventFlusher) Stop() {
    f.mu.Lock()
    defer f.mu.Unlock()
    if f.running {
        close(f.stop)
        f.running = false
    }
}
  • Step 6.5: 跑测试(应通过)
TEST_DATABASE_URL="..." go test ./worker/ -v

Expected: PASS

  • Step 6.6: Commit
git add backend/services/statisticService/repository/ backend/services/statisticService/worker/
git commit -m "feat(statistic): event_flusher worker (batch insert + sync metric_recent_level_ups) (P2 T6)"

Task 7: metric_weekly_user_income_updater + metric_upcoming_level_ups_updater

Files:

  • Modify: backend/services/statisticService/repository/metric_repo.go(加 weekly + upcoming 方法)

  • Create: backend/services/statisticService/worker/metric_weekly_user_income_updater.go

  • Create: backend/services/statisticService/worker/metric_upcoming_level_ups_updater.go

  • Step 7.1: 扩 metric_repo 加 weekly + upcoming 方法

// 加到 metric_repo.go

// RefreshWeeklyUserIncome: 全量重算本周 rank + total
func (r *MetricRepository) RefreshWeeklyUserIncome(ctx context.Context) error {
    // 用 pg_try_advisory_lock 防多实例重复
    var got bool
    if err := r.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(123456)").Scan(&got); err != nil {
        return err
    }
    if !got { return nil } // 抢不到锁本轮跳过
    defer r.db.ExecContext(ctx, "SELECT pg_advisory_unlock(123456)")

    // 计算本周一 (Asia/Shanghai)
    _, err := r.db.ExecContext(ctx, fmt.Sprintf(`
        INSERT INTO %s.metric_weekly_user_income (star_id, user_id, week_start, total_crystal, rank_in_star)
        SELECT
            star_id, user_id,
            DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')::date AS week_start,
            SUM(CASE WHEN event_type IN ('exhibition.revenue', 'crystal.change') AND (properties->>'amount')::BIGINT > 0
                     THEN (properties->>'amount')::BIGINT ELSE 0 END) AS total_crystal,
            ROW_NUMBER() OVER (PARTITION BY star_id ORDER BY SUM(CASE WHEN event_type IN ('exhibition.revenue','crystal.change') AND (properties->>'amount')::BIGINT > 0 THEN (properties->>'amount')::BIGINT ELSE 0 END) DESC) AS rank_in_star
        FROM %s.events
        WHERE event_type IN ('exhibition.revenue', 'crystal.change')
          AND received_at >= DATE_TRUNC('week', NOW() AT TIME ZONE 'Asia/Shanghai')
        GROUP BY star_id, user_id
        ON CONFLICT (star_id, user_id, week_start) DO UPDATE
        SET total_crystal = EXCLUDED.total_crystal, rank_in_star = EXCLUDED.rank_in_star, updated_at = NOW()
    `, r.schema, r.schema))
    return err
}

// RefreshUpcomingLevelUps: 计算每个 asset 的 like_progress + duration_progress
func (r *MetricRepository) RefreshUpcomingLevelUps(ctx context.Context) error {
    // 全量从 public.assets 读 + 计算进度
    _, err := r.db.ExecContext(ctx, fmt.Sprintf(`
        INSERT INTO %s.metric_upcoming_level_ups (user_id, star_id, asset_id, like_progress, duration_progress)
        SELECT
            a.user_id, a.star_id, a.id,
            LEAST(100, (a.like_count::FLOAT / NULLIF(alc.upgrade_like_threshold, 0) * 100)::INT) AS like_progress,
            LEAST(100, (EXTRACT(EPOCH FROM (NOW() - a.placed_at))::FLOAT / NULLIF(alc.upgrade_duration_seconds, 1) * 100)::INT) AS duration_progress
        FROM public.assets a
        JOIN public.asset_level_config alc ON alc.level = a.level
        WHERE a.status = 'active' AND a.deleted_at IS NULL
        ON CONFLICT (user_id, star_id, asset_id) DO UPDATE
        SET like_progress = EXCLUDED.like_progress, duration_progress = EXCLUDED.duration_progress, updated_at = NOW()
    `, r.schema))
    return err
}

实现提示: 实际项目里 public.asset_level_config 表名/字段名可能不同。P1 末向 assetService 同学确认后调整 SQL。

  • Step 7.2: 创建两个 Worker结构同

worker/metric_weekly_user_income_updater.go

package worker

import (
    "context"
    "time"

    "github.com/topfans/backend/pkg/logger"
    "github.com/topfans/backend/services/statisticService/metrics"
    "github.com/topfans/backend/services/statisticService/repository"
    "go.uber.org/zap"
)

type WeeklyUserIncomeUpdater struct {
    repo     *repository.MetricRepository
    interval time.Duration
    stop     chan struct{}
}

func NewWeeklyUserIncomeUpdater(repo *repository.MetricRepository, interval time.Duration) *WeeklyUserIncomeUpdater {
    return &WeeklyUserIncomeUpdater{repo: repo, interval: interval, stop: make(chan struct{})}
}

func (w *WeeklyUserIncomeUpdater) Start(ctx context.Context) {
    metrics.WorkerRunningCount.WithLabelValues("weekly_user_income").Set(1)
    defer metrics.WorkerRunningCount.WithLabelValues("weekly_user_income").Set(0)
    ticker := time.NewTicker(w.interval)
    defer ticker.Stop()
    // 启动时跑一次
    w.runOnce(ctx)
    for {
        select {
        case <-w.stop:
            return
        case <-ticker.C:
            w.runOnce(ctx)
        }
    }
}

func (w *WeeklyUserIncomeUpdater) runOnce(ctx context.Context) {
    t0 := time.Now()
    if err := w.repo.RefreshWeeklyUserIncome(ctx); err != nil {
        logger.Logger.Error("RefreshWeeklyUserIncome failed", zap.Error(err))
        metrics.MVRefreshTotal.WithLabelValues("weekly_user_income", "failed").Inc()
    } else {
        metrics.MVRefreshTotal.WithLabelValues("weekly_user_income", "success").Inc()
    }
    metrics.MVRefreshDuration.WithLabelValues("weekly_user_income").Observe(time.Since(t0).Seconds())
}

func (w *WeeklyUserIncomeUpdater) Stop() { close(w.stop) }

worker/metric_upcoming_level_ups_updater.go:结构同上,调 repo.RefreshUpcomingLevelUpsinterval 默认 15min。

  • Step 7.3: Commit
git add backend/services/statisticService/repository/ backend/services/statisticService/worker/
git commit -m "feat(statistic): metric_weekly + metric_upcoming level up workers (P2 T7)"

Task 8: partitioner.goevents 分区自动管理)

Files:

  • Create: backend/services/statisticService/worker/partitioner.go

  • Create: backend/services/statisticService/worker/partitioner_test.go

  • Step 8.1: 写测试(集成)

// worker/partitioner_test.go
package worker

import (
    "context"
    "database/sql"
    "os"
    "testing"
    "time"

    _ "github.com/lib/pq"
)

func TestPartitioner_EnsureFuture(t *testing.T) {
    dsn := os.Getenv("TEST_DATABASE_URL")
    if dsn == "" { t.Skip("skip") }
    db, _ := sql.Open("postgres", dsn)
    defer db.Close()

    // 创建测试 schema
    db.Exec("CREATE SCHEMA IF NOT EXISTS statistic_test")
    db.Exec(`CREATE TABLE IF NOT EXISTS statistic_test.events (
        id BIGSERIAL, event_id UUID NOT NULL, received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        PRIMARY KEY (id, received_at)
    ) PARTITION BY RANGE (received_at)`)

    p := NewPartitioner(db, "statistic_test", 7, 30)
    if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil {
        t.Fatal(err)
    }
    // 验证未来 3 天分区存在
    var n int
    db.QueryRow("SELECT COUNT(*) FROM pg_tables WHERE schemaname='statistic_test' AND tablename LIKE 'events_%'").Scan(&n)
    if n < 3 { t.Fatalf("expected >=3 partitions, got %d", n) }
}
  • Step 8.2: 跑测试(应失败)
TEST_DATABASE_URL="..." go test ./worker/ -v -run TestPartitioner

Expected: FAIL

  • Step 8.3: 实现 partitioner
// worker/partitioner.go
package worker

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    "github.com/topfans/backend/pkg/logger"
    "github.com/topfans/backend/services/statisticService/metrics"
    "go.uber.org/zap"
)

type Partitioner struct {
    db             *sql.DB
    schema         string
    preCreateDays  int
    retentionDays  int
    stop           chan struct{}
}

func NewPartitioner(db *sql.DB, schema string, preCreateDays, retentionDays int) *Partitioner {
    return &Partitioner{db: db, schema: schema, preCreateDays: preCreateDays, retentionDays: retentionDays, stop: make(chan struct{})}
}

func (p *Partitioner) EnsureFuturePartitions(ctx context.Context, days int) error {
    now := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600))
    for i := 0; i <= days; i++ {
        d := now.AddDate(0, 0, i)
        next := d.AddDate(0, 0, 1)
        name := fmt.Sprintf("events_%s", d.Format("2006_01_02"))
        sql := fmt.Sprintf(`
            CREATE TABLE IF NOT EXISTS %s.%s PARTITION OF %s.events
            FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')
        `, p.schema, name, p.schema, d.Format("2006-01-02"), next.Format("2006-01-02"))
        if _, err := p.db.ExecContext(ctx, sql); err != nil {
            return fmt.Errorf("create partition %s: %w", name, err)
        }
        metrics.EventsPartitionCount.Inc()
    }
    return nil
}

func (p *Partitioner) CleanupOldPartitions(ctx context.Context) error {
    cutoff := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600)).AddDate(0, 0, -p.retentionDays)
    rows, err := p.db.QueryContext(ctx, fmt.Sprintf(`
        SELECT tablename FROM pg_tables
        WHERE schemaname = $1 AND tablename LIKE 'events_%' AND tablename < $2
    `, p.schema, fmt.Sprintf("events_%s", cutoff.Format("2006_01_02")))
    if err != nil { return err }
    defer rows.Close()
    for rows.Next() {
        var name string
        rows.Scan(&name)
        if _, err := p.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", p.schema, name)); err != nil {
            logger.Logger.Warn("drop partition failed", zap.String("name", name), zap.Error(err))
        } else {
            logger.Logger.Info("dropped old partition", zap.String("name", name))
        }
    }
    return nil
}

func (p *Partitioner) Start(ctx context.Context) {
    // 启动时确保未来 7 天
    p.EnsureFuturePartitions(ctx, p.preCreateDays)
    ticker := time.NewTicker(1 * time.Hour)
    defer ticker.Stop()
    for {
        select {
        case <-p.stop:
            return
        case <-ticker.C:
            hour := time.Now().Hour()
            if hour == 0 && time.Now().Minute() < 10 {
                p.EnsureFuturePartitions(ctx, p.preCreateDays)
            }
            if hour == 0 && time.Now().Minute() >= 30 && time.Now().Minute() < 40 {
                p.CleanupOldPartitions(ctx)
            }
        }
    }
}

func (p *Partitioner) Stop() { close(p.stop) }
  • Step 8.4: 跑测试(应通过)
TEST_DATABASE_URL="..." go test ./worker/ -v -run TestPartitioner

Expected: PASS

  • Step 8.5: Commit
git add backend/services/statisticService/worker/partitioner.go backend/services/statisticService/worker/partitioner_test.go
git commit -m "feat(statistic): partitioner worker (auto create/drop events partitions) (P2 T8)"

Task 9: Provider (TrackEvent/BatchTrackEvent) + main.go 集成 + socialService 联调

Files:

  • Create: backend/services/statisticService/provider/statistic_internal_provider.go

  • Modify: backend/services/statisticService/main.go

  • Create: pkg/statistic/client.go(业务侧统一 SDK

  • Modify: backend/services/socialService/service/asset_like_service.go(联调)

  • Step 9.1: 写 provider

backend/services/statisticService/provider/statistic_internal_provider.go

package provider

import (
    "context"

    "github.com/topfans/backend/pkg/logger"
    pb "github.com/topfans/backend/pkg/proto/statistic"
    "github.com/topfans/backend/pkg/proto/event"
    "github.com/topfans/backend/services/statisticService/model"
    "github.com/topfans/backend/services/statisticService/service"
    "go.uber.org/zap"
)

type StatisticInternalProvider struct {
    eventSvc *service.EventService
}

func NewStatisticInternalProvider(eventSvc *service.EventService) *StatisticInternalProvider {
    return &StatisticInternalProvider{eventSvc: eventSvc}
}

func (p *StatisticInternalProvider) TrackEvent(ctx context.Context, e *event.Event) (*pb.TrackEventResponse, error) {
    return p.eventSvc.TrackEvent(ctx, toModel(e))
}

func (p *StatisticInternalProvider) BatchTrackEvent(ctx context.Context, req *event.BatchEventRequest) (*pb.TrackEventResponse, error) {
    events := make([]*model.Event, 0, len(req.Events))
    for _, e := range req.Events { events = append(events, toModel(e)) }
    return p.eventSvc.BatchTrackEvent(ctx, events)
}

func toModel(e *event.Event) *model.Event {
    return &model.Event{
        EventID: e.EventId, UserID: e.UserId, StarID: e.StarId, EventType: e.EventType,
        OccurredAt: time.UnixMilli(e.OccurredAt), ReceivedAt: time.UnixMilli(e.ReceivedAt),
        Properties: e.Properties,
    }
}

import "time"

  • Step 9.2: 写 main.go 集成P2 完整版)

替换 P1 骨架 main.go加上

  • 创建 ChannelEventSink
  • 创建 EventRepository + MetricRepository
  • 创建 EventService
  • 创建 EventFlusher + Partitioner + WeeklyUserIncomeUpdater + UpcomingLevelUpsUpdater
  • 启动 4 个 workergoroutine
  • 创建 StatisticInternalProvider
  • Dubbo 注册 + Serve
// 加到 main.go (替换 Step 1.5 占位 main.go)

// ... 前面 init logger/config/db/redis/healthz 不变 ...

eventCh := make(chan *model.Event, config.ChannelCfg.EventChannelCapacity)
metrics.EventChannelCapacity.Set(float64(config.ChannelCfg.EventChannelCapacity))

// 启动 partitioner
partitioner := worker.NewPartitioner(db, config.DBConfig.Schema, config.PartitionCfg.PreCreateDays, config.PartitionCfg.RetentionDays)
go partitioner.Start(context.Background())

// repository
eventRepo := repository.NewEventRepository(db, config.DBConfig.Schema)
metricRepo := repository.NewMetricRepository(db, config.DBConfig.Schema)

// sink
cs := sink.NewChannelEventSink(eventCh)

// service
whiteList := []string{"asset.like", "asset.mint", "exhibition.start", "exhibition.end",
    "exhibition.revenue", "asset.level_up", "crystal.change"}
eventSvc := service.NewEventService(cs, whiteList)

// workers
flusher := worker.NewEventFlusher(eventCh, eventRepo, metricRepo, config.ChannelCfg.EventBatchSize, config.ChannelCfg.EventBatchInterval)
go flusher.Start(context.Background())

weeklyW := worker.NewWeeklyUserIncomeUpdater(metricRepo, config.RefreshCfg.WeeklyUserIncome)
go weeklyW.Start(context.Background())

upcomingW := worker.NewUpcomingLevelUpsUpdater(metricRepo, config.RefreshCfg.UpcomingLevelUps)
go upcomingW.Start(context.Background())

// provider
internalProvider := provider.NewStatisticInternalProvider(eventSvc)

// Dubbo server
srv, _ := server.NewServer(server.WithServerProtocol(protocol.WithPort(*port), protocol.WithTriple()))
pb.RegisterStatisticServiceHandler(srv, internalProvider)
go srv.Serve()

// ... graceful shutdown ...
  • Step 9.3: 编译验证
go build ./services/statisticService

Expected: 编译成功

  • Step 9.4: 创建 pkg/statistic 业务侧 SDK

backend/pkg/statistic/client.go

package statistic

import (
    "context"
    "sync"
    "time"

    "dubbo.apache.org/dubbo-go/v3/client"
    "github.com/google/uuid"
    pb "github.com/topfans/backend/pkg/proto/event"
)

var (
    instance *Client
    once     sync.Once
)

type Client struct {
    service pb.StatisticService
}

func Init(dubboClient *client.Client) error {
    var err error
    once.Do(func() {
        svc, e := pb.NewStatisticService(dubboClient)
        if e != nil { err = e; return }
        instance = &Client{service: svc}
    })
    return err
}

func Get() *Client { return instance }

// TrackEvent fire-and-forget, 不阻塞业务
func (c *Client) TrackEvent(ctx context.Context, e *pb.Event) {
    if c == nil || c.service == nil { return }
    if e.EventId == "" { e.EventId = uuid.New().String() }
    if e.OccurredAt == 0 { e.OccurredAt = time.Now().UnixMilli() }
    // 用独立 context 避免业务 ctx cancel 影响
    bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    go func() {
        defer cancel()
        c.service.TrackEvent(bgCtx, e)
    }()
}
  • Step 9.5: 业务侧 socialService 集成

修改 backend/services/socialService/service/asset_like_service.go,在 LikeAssetlike_income_log 后加:

// 写 like_income_log 之后
statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
    EventId: uuid.New().String(),
    UserId: userID,
    StarId: starID,
    EventType: "asset.like",
    OccurredAt: time.Now().UnixMilli(),
    Properties: map[string]string{
        "asset_id": strconv.FormatInt(assetID, 10),
        "amount":   strconv.FormatInt(amount, 10),
    },
})

并在 socialService main.go 调用 statistic.Init(statisticClient)

  • Step 9.6: 启动两端 + 联调验证
# Terminal 1
go run ./services/statisticService

# Terminal 2
go run ./services/socialService

# Terminal 3: 触发点赞
# 用项目现有的 e2e 脚本或 API 调用

Expected:

  • statistic.events 表有新行(event_type='asset.like'

  • metric_recent_level_ups 表无变化(因为是 asset.like 不是 asset.level_up

  • 日志显示 event_flusher batch flushed

  • Step 9.7: 跑 P2 全量测试

cd backend
TEST_DATABASE_URL="..." go test ./services/statisticService/... -v

Expected: 全部通过

  • Step 9.8: Commit
git add backend/services/statisticService/ backend/pkg/statistic/ backend/services/socialService/
git commit -m "feat(statistic): P2 complete - event collection framework + socialService integration (T9)"

P3 看板 7 RPC

Task 10: 4 个物化视图 Workermaterializer

Files:

  • Create: backend/services/statisticService/worker/materializer.go
  • Create: backend/services/statisticService/worker/materializer_test.go

4 个 MV 已由 T3 步骤 3.5 创建 DDL。本任务实现定时刷新逻辑。

  • Step 10.1: 写 materializer 测试
// worker/materializer_test.go
package worker

import (
    "context"
    "database/sql"
    "os"
    "testing"
    "time"

    _ "github.com/lib/pq"
)

func TestMaterializer_RefreshDailyUserIncome(t *testing.T) {
    dsn := os.Getenv("TEST_DATABASE_URL")
    if dsn == "" { t.Skip("skip") }
    db, _ := sql.Open("postgres", dsn)
    defer db.Close()

    m := NewMaterializer(db, "statistic_test", 5*time.Second)
    if err := m.RefreshOne(context.Background(), "mv_daily_user_income"); err != nil {
        t.Fatal(err)
    }
    // 验证 refresh_log 有记录
    var n int
    db.QueryRow("SELECT COUNT(*) FROM statistic_test.refresh_log WHERE mv_name='mv_daily_user_income'").Scan(&n)
    if n < 1 { t.Fatal("no refresh_log entry") }
}
  • Step 10.2: 实现 materializer
// worker/materializer.go
package worker

import (
    "context"
    "database/sql"
    "time"

    "github.com/topfans/backend/pkg/logger"
    "github.com/topfans/backend/services/statisticService/metrics"
    "go.uber.org/zap"
)

type Materializer struct {
    db     *sql.DB
    schema string
    stop   chan struct{}
}

func NewMaterializer(db *sql.DB, schema string, interval time.Duration) *Materializer {
    return &Materializer{db: db, schema: schema, stop: make(chan struct{})}
}

var mvList = []string{
    "mv_daily_user_income",
    "mv_daily_exhibition_revenue",
    "mv_daily_like_income",
    "mv_asset_level_distribution",
}

func (m *Materializer) RefreshOne(ctx context.Context, mvName string) error {
    var got bool
    if err := m.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(234567)").Scan(&got); err != nil {
        return err
    }
    if !got { return nil }
    defer m.db.ExecContext(ctx, "SELECT pg_advisory_unlock(234567)")

    t0 := time.Now()
    var mvID int
    if err := m.db.QueryRowContext(ctx,
        `INSERT INTO `+m.schema+`.refresh_log (mv_name, started_at, status) VALUES ($1, NOW(), 'running') RETURNING id`,
        mvName).Scan(&mvID); err != nil {
        return err
    }

    _, err := m.db.ExecContext(ctx, "REFRESH MATERIALIZED VIEW CONCURRENTLY "+m.schema+"."+mvName)
    if err != nil {
        m.db.ExecContext(ctx, `UPDATE `+m.schema+`.refresh_log SET status='failed', finished_at=NOW(), error_message=$1 WHERE id=$2`, err.Error(), mvID)
        metrics.MVRefreshTotal.WithLabelValues(mvName, "failed").Inc()
        return err
    }
    m.db.ExecContext(ctx, `UPDATE `+m.schema+`.refresh_log SET status='success', finished_at=NOW() WHERE id=$1`, mvID)
    metrics.MVRefreshTotal.WithLabelValues(mvName, "success").Inc()
    metrics.MVRefreshDuration.WithLabelValues(mvName).Observe(time.Since(t0).Seconds())
    return nil
}

func (m *Materializer) Start(ctx context.Context, interval time.Duration) {
    metrics.WorkerRunningCount.WithLabelValues("materializer").Set(1)
    defer metrics.WorkerRunningCount.WithLabelValues("materializer").Set(0)
    // 启动时错开 30s 跑一次每个
    for i, mv := range mvList {
        go func(idx int, mvName string) {
            time.Sleep(time.Duration(idx*30) * time.Second)
            for {
                select {
                case <-m.stop: return
                case <-time.After(interval):
                    if err := m.RefreshOne(ctx, mvName); err != nil {
                        logger.Logger.Error("RefreshOne failed", zap.String("mv", mvName), zap.Error(err))
                    }
                }
            }
        }(i, mv)
    }
}

func (m *Materializer) Stop() { close(m.stop) }
  • Step 10.3: 跑测试(应通过)
TEST_DATABASE_URL="..." go test ./worker/ -v -run TestMaterializer

Expected: PASS

  • Step 10.4: Commit
git add backend/services/statisticService/worker/materializer.go
git commit -m "feat(statistic): materializer worker (4 MV refresh with concurrent + advisory lock) (P3 T10)"

Task 11: dashboard_repo7 聚合 SQL+ dashboard_service业务逻辑 + 缓存)

Files:

  • Create: backend/services/statisticService/repository/dashboard_repo.go

  • Create: backend/services/statisticService/service/dashboard_service.go

  • Create: backend/services/statisticService/service/cache.go

  • Step 11.1: 写 dashboard_repo 7 个 SQL 方法 + 测试

每个方法一个 SQL对应 spec §2.3 + §3.4/3.5 视图。示例:

// repository/dashboard_repo.go
package repository

import (
    "context"
    "database/sql"
    "time"
)

type DashboardRepository struct {
    db     *sql.DB
    schema string
}

func NewDashboardRepository(db *sql.DB, schema string) *DashboardRepository {
    return &DashboardRepository{db: db, schema: schema}
}

// GetTodayOverviewPart: 查 metric_weekly_user_income 算 week_rank
type TodayOverviewPart struct {
    WeekRank       int
    WeekTotalUsers int
}

func (r *DashboardRepository) GetWeekRank(ctx context.Context, userID, starID int64) (*TodayOverviewPart, error) {
    weekStart := weekStartMonday(time.Now())
    var rank sql.NullInt64
    err := r.db.QueryRowContext(ctx, fmt.Sprintf(`
        SELECT rank_in_star FROM %s.metric_weekly_user_income
        WHERE star_id=$1 AND user_id=$2 AND week_start=$3
    `, r.schema), starID, userID, weekStart).Scan(&rank)
    if err == sql.ErrNoRows { return &TodayOverviewPart{WeekRank: -1, WeekTotalUsers: 0}, nil }
    if err != nil { return nil, err }

    var totalUsers int
    err = r.db.QueryRowContext(ctx, fmt.Sprintf(`
        SELECT COUNT(*) FROM %s.metric_weekly_user_income
        WHERE star_id=$1 AND week_start=$2 AND total_crystal > 0
    `, r.schema), starID, weekStart).Scan(&totalUsers)
    if err != nil { return nil, err }

    return &TodayOverviewPart{
        WeekRank:       int(rank.Int64),
        WeekTotalUsers: totalUsers,
    }, nil
}

func (r *DashboardRepository) GetTodayIncome(ctx context.Context, userID, starID int64) (int64, error) {
    var income sql.NullInt64
    err := r.db.QueryRowContext(ctx, fmt.Sprintf(`
        SELECT COALESCE(SUM(CASE WHEN (properties->>'amount')::BIGINT > 0
                              THEN (properties->>'amount')::BIGINT ELSE 0 END), 0)
        FROM %s.events
        WHERE user_id=$1 AND star_id=$2
          AND event_type IN ('exhibition.revenue', 'crystal.change')
          AND received_at >= DATE_TRUNC('day', NOW() AT TIME ZONE 'Asia/Shanghai')
    `, r.schema), userID, starID).Scan(&income)
    if err != nil && err != sql.ErrNoRows { return 0, err }
    return income.Int64, nil
}

// 7 日收益曲线: 读 mv_daily_user_income
type DailyIncomePoint struct {
    Date   string
    Income int64
}

func (r *DashboardRepository) Get7DayIncomeCurve(ctx context.Context, userID, starID int64) ([]DailyIncomePoint, int64, error) {
    rows, err := r.db.QueryContext(ctx, fmt.Sprintf(`
        SELECT income_date::text, COALESCE(total_crystal, 0) FROM %s.mv_daily_user_income
        WHERE user_id=$1 AND star_id=$2
          AND income_date >= (DATE_TRUNC('day', NOW() AT TIME ZONE 'Asia/Shanghai') - INTERVAL '6 days')::date
        ORDER BY income_date ASC
    `, r.schema), userID, starID)
    if err != nil { return nil, 0, err }
    defer rows.Close()
    var points []DailyIncomePoint
    var total int64
    for rows.Next() {
        var p DailyIncomePoint
        rows.Scan(&p.Date, &p.Income)
        points = append(points, p)
        total += p.Income
    }
    return points, total, nil
}

// 剩余 5 个 RPC 类似实现...

func weekStartMonday(t time.Time) time.Time {
    loc, _ := time.LoadLocation("Asia/Shanghai")
    t2 := t.In(loc)
    offset := (int(t2.Weekday()) + 6) % 7 // Mon=0
    return time.Date(t2.Year(), t2.Month(), t2.Day()-offset, 0, 0, 0, 0, loc)
}

测试:每个方法一个集成测试,用 TEST_DATABASE_URL skip 模式,构造测试数据 + 验证返回值。

  • Step 11.2: 写 cache 封装
// service/cache.go
package service

import (
    "context"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

type Cache struct {
    rdb *redis.Client
    ttl time.Duration
}

func NewCache(rdb *redis.Client) *Cache { return &Cache{rdb: rdb, ttl: 5 * time.Minute} }

func (c *Cache) Get(ctx context.Context, key string) (string, bool, error) {
    v, err := c.rdb.Get(ctx, key).Result()
    if err == redis.Nil { return "", false, nil }
    if err != nil { return "", false, err }
    return v, true, nil
}

func (c *Cache) Set(ctx context.Context, key, value string) error {
    return c.rdb.Set(ctx, key, value, c.ttl).Err()
}

func (c *Cache) SetEmpty(ctx context.Context, key string) error {
    // 缓存穿透防护:空值 1 分钟
    return c.rdb.Set(ctx, key, "null", 1*time.Minute).Err()
}

func CacheKey(rpc string, starID, userID int64) string {
    return fmt.Sprintf("dash:%s:%d:%d", rpc, starID, userID)
}
  • Step 11.3: 写 dashboard_service7 RPC 业务逻辑 + 缓存 + 降级)
// service/dashboard_service.go
package service

import (
    "context"

    "github.com/topfans/backend/pkg/logger"
    pb "github.com/topfans/backend/pkg/proto/statistic"
    "github.com/topfans/backend/services/statisticService/repository"
    "go.uber.org/zap"
)

type DashboardService struct {
    repo     *repository.DashboardRepository
    cache    *Cache
    userRPC  UserRPCClient // 跨服务调 userService.crystal_balance
}

func NewDashboardService(repo *repository.DashboardRepository, cache *Cache, userRPC UserRPCClient) *DashboardService {
    return &DashboardService{repo: repo, cache: cache, userRPC: userRPC}
}

func (s *DashboardService) GetTodayOverview(ctx context.Context, userID, starID int64) (*pb.GetTodayOverviewResponse, error) {
    key := CacheKey("today_overview", starID, userID)
    if v, ok, _ := s.cache.Get(ctx, key); ok { /* unmarshal & return */ }

    part, err := s.repo.GetWeekRank(ctx, userID, starID)
    if err != nil { return nil, err }
    todayIncome, err := s.repo.GetTodayIncome(ctx, userID, starID)
    if err != nil { return nil, err }

    // 跨服务: userService.crystal_balance
    var crystal int64
    if s.userRPC != nil {
        if c, err := s.userRPC.GetCrystalBalance(ctx, userID, starID); err == nil {
            crystal = c
        } else {
            logger.Logger.Warn("userService.GetCrystalBalance failed, use 0", zap.Error(err))
            // 降级:返回 0stale 标记
        }
    }

    resp := &pb.GetTodayOverviewResponse{
        CrystalBalance:  crystal,
        TodayIncome:     todayIncome,
        WeekRank:        int32(part.WeekRank),
        WeekTotalUsers:  int32(part.WeekTotalUsers),
    }
    // 缓存 5min (省略 marshal)
    return resp, nil
}

// 剩余 6 个 RPC 类似...

// UserRPCClient interface
type UserRPCClient interface {
    GetCrystalBalance(ctx context.Context, userID, starID int64) (int64, error)
}

完整 6 个 RPC 实现遵循同样模式:先查 cache → miss 时查 MV/预聚表 → 构造响应 → 缓存。代码量大但模式统一。

  • Step 11.4: 跑 dashboard_service 测试
TEST_DATABASE_URL="..." go test ./service/ -v -run TestDashboard

Expected: 7 个 RPC 单测全部通过

  • Step 11.5: Commit
git add backend/services/statisticService/repository/dashboard_repo.go \
        backend/services/statisticService/service/dashboard_service.go \
        backend/services/statisticService/service/cache.go
git commit -m "feat(statistic): dashboard_repo (7 SQL) + dashboard_service (7 RPC + cache + degrade) (P3 T11)"

Task 12: 看板 mobile provider + main.go 集成

Files:

  • Create: backend/services/statisticService/provider/statistic_mobile_provider.go

  • Modify: backend/services/statisticService/main.go

  • Step 12.1: 写 mobile provider

// provider/statistic_mobile_provider.go
package provider

import (
    "context"

    "github.com/topfans/backend/pkg/logger"
    "github.com/topfans/backend/services/statisticService/metrics"
    pb "github.com/topfans/backend/pkg/proto/statistic"
    "github.com/topfans/backend/services/statisticService/service"
    "go.uber.org/zap"
    "strconv"
    "time"
)

type StatisticMobileProvider struct {
    dashSvc *service.DashboardService
}

func NewStatisticMobileProvider(dashSvc *service.DashboardService) *StatisticMobileProvider {
    return &StatisticMobileProvider{dashSvc: dashSvc}
}

func userIDFromContext(ctx context.Context) int64 {
    if v := ctx.Value("user_id"); v != nil {
        if s, ok := v.(string); ok { n, _ := strconv.ParseInt(s, 10, 64); return n }
    }
    return 0
}

func (p *StatisticMobileProvider) recordRPC(rpc string, start time.Time, err error) {
    status := "ok"
    if err != nil { status = "error" }
    metrics.DashboardRPCTotal.WithLabelValues(rpc, status).Inc()
    metrics.DashboardRPCDuration.WithLabelValues(rpc).Observe(time.Since(start).Seconds())
}

func (p *StatisticMobileProvider) GetTodayOverview(ctx context.Context, req *pb.GetTodayOverviewRequest) (*pb.GetTodayOverviewResponse, error) {
    t0 := time.Now(); defer func() { p.recordRPC("GetTodayOverview", t0, nil) }()
    return p.dashSvc.GetTodayOverview(ctx, userIDFromContext(ctx), req.StarId)
}

// 剩余 6 个 RPC 方法类似...
  • Step 12.2: 修改 main.go 注册 mobile provider

加:

// 加到 main.go (在 internalProvider 后)

dashRepo := repository.NewDashboardRepository(db, config.DBConfig.Schema)
cache := service.NewCache(rdb)
userCli, _ := dubboclient.NewClient(dubboclient.WithClientURL("tri://localhost:20000")) // userService
userRPC := client.NewUserServiceClient(pbUser.NewUserSocialService(userCli))
dashSvc := service.NewDashboardService(dashRepo, cache, userRPC)

mobileProvider := provider.NewStatisticMobileProvider(dashSvc)
pb.RegisterStatisticServiceHandler(srv, mobileProvider)

注意:原 proto 是 service StatisticService,包含 7 看板 + 2 事件 = 9 个 RPC。需在 statistic.proto 里把 mobile 和 internal 拆成两个 serviceStatisticMobileService + StatisticInternalService否则会冲突。本步骤前提T2 已拆分。如果没拆,需要回到 T2 调整 proto重新生成。

实际项目更可能是两个 service本计划假设已拆spec §1.3 提到 mobile_provider / internal_provider 两个文件)。

  • Step 12.3: 编译 + 启动 + 测 7 RPC
go build ./services/statisticService
go run ./services/statisticService &
# 7 RPC 端到端测试
# 用 grpcurl 或写个简单的 Go client

Expected: 7 个 RPC 都返回非错误响应

  • Step 12.4: Commit
git add backend/services/statisticService/
git commit -m "feat(statistic): mobile provider (7 RPC) + main.go 集成 (P3 T12)"

Task 13: Gateway 7 路由 + controller

Files:

  • Modify: backend/gateway/router/router.go

  • Create: backend/gateway/controller/statistic_controller.go

  • Step 13.1: 创建 controller

// gateway/controller/statistic_controller.go
package controller

import (
    "context"
    "net/http"
    "strconv"
    "time"

    "dubbo.apache.org/dubbo-go/v3/client"
    "github.com/gin-gonic/gin"
    "github.com/topfans/backend/gateway/pkg/response"
    pb "github.com/topfans/backend/pkg/proto/statistic"
)

type StatisticController struct {
    mobile pb.StatisticMobileService
}

func NewStatisticController(dubboClient *client.Client) (*StatisticController, error) {
    svc, err := pb.NewStatisticMobileService(dubboClient)
    if err != nil { return nil, err }
    return &StatisticController{mobile: svc}, nil
}

func (ctrl *StatisticController) parseStarID(c *gin.Context) (int64, bool) {
    v := c.Query("star_id")
    if v == "" { response.Error(c, 400, "star_id is required"); return 0, false }
    n, err := strconv.ParseInt(v, 10, 64)
    if err != nil { response.Error(c, 400, "star_id invalid"); return 0, false }
    return n, true
}

func (ctrl *StatisticController) ctxWithUserID(c *gin.Context) context.Context {
    ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second)
    defer cancel()
    userID, _ := c.Get("user_id")
    // 透传到 dubbo 调 user_id
    return ctxWithUserIDValue(ctx, userID)
}

// 7 个 controller 方法
func (ctrl *StatisticController) GetTodayOverview(c *gin.Context) {
    starID, ok := ctrl.parseStarID(c); if !ok { return }
    resp, err := ctrl.mobile.GetTodayOverview(ctrl.ctxWithUserID(c), &pb.GetTodayOverviewRequest{StarId: starID})
    if err != nil { response.Error(c, 500, err.Error()); return }
    response.OK(c, gin.H{"code": 200, "data": resp})
}

// 6 个类似...
  • Step 13.2: 注册路由

修改 backend/gateway/router/router.go,加:

// 在 SetupRouter 函数里
statisticCtrl, err := controller.NewStatisticController(statisticClient)
if err != nil { return nil, err }
api := r.Group("/api/v1/dashboard")
{
    api.GET("/today-overview", middleware.JWTAuth(), statisticCtrl.GetTodayOverview)
    api.GET("/income-curve", middleware.JWTAuth(), statisticCtrl.Get7DayIncomeCurve)
    api.GET("/exhibition-summary", middleware.JWTAuth(), statisticCtrl.GetExhibitionIncomeSummary)
    api.GET("/like-income-by-level", middleware.JWTAuth(), statisticCtrl.GetLikeIncomeByLevel)
    api.GET("/top-assets", middleware.JWTAuth(), statisticCtrl.GetTopAssetsByEarning)
    api.GET("/level-distribution", middleware.JWTAuth(), statisticCtrl.GetAssetLevelDistribution)
    api.GET("/upgrade-progress", middleware.JWTAuth(), statisticCtrl.GetAssetUpgradeProgress)
}

并在 SetupRouter 入参加 statisticClient *client.Client

  • Step 13.3: 启动 gateway + 测试 7 路由
go build ./gateway
go run ./gateway &
# 7 个 HTTP 调用
curl -H "Authorization: Bearer $TOKEN" 'http://localhost:8080/api/v1/dashboard/today-overview?star_id=1'
# ...

Expected: 7 个 HTTP 端点全部返回 {code:200, data:...}

  • Step 13.4: 跑性能测试k6/wrk
# k6 脚本spec §5.4 性能基准)
k6 run --vus 10 --duration 30s - << 'EOF'
import http from 'k6/http';
export default function() {
    http.get('http://localhost:8080/api/v1/dashboard/today-overview?star_id=1', {
        headers: { Authorization: 'Bearer $TOKEN' }
    });
}
EOF

Expected: P99 < 200ms缓存命中时 <10ms

  • Step 13.5: Commit
git add backend/gateway/
git commit -m "feat(statistic): gateway 7 dashboard routes + statistic controller (P3 T13)"

Task 14: 启动预热 + 健康检查 + 完整 E2E

Files:

  • Modify: backend/services/statisticService/main.go

  • Step 14.1: 加预热逻辑

在 main.go 启动 Dubbo server 后加:

go func() {
    time.Sleep(10 * time.Second) // 等服务起来
    sampleUserID := int64(1) // 实际从配置读
    for _, starID := range getTopNStars(config.PartitionCfg.PreCreateDays * 5) { // 35 个 top star
        for _, rpc := range []string{"today_overview", "7day_income_curve"} { // 7 个 RPC 简写
            go callAndWarmupCache(rpc, starID, sampleUserID, dashSvc)
        }
    }
}()

实际 cache warmup 函数封装 cache.Get / cache.Set。简化版只预热前 2 个 RPC最常访问

  • Step 14.2: 跑 E2E 测试

用 Playwright 跑前端 dashboard.vue 7 个数据点,加载到渲染不报错。

Expected: 7 个图表都能显示数据

  • Step 14.3: 切流量

将前端 USE_MOCK_API = false,前端 dashboardApi 自动调真实后端。

  • Step 14.4: 跑契约测试
# 用 proto 生成的 json schema 校验响应字段
go test ./integration/ -v -run TestContract

Expected: 7 个 RPC 响应字段不变

  • Step 14.5: Commit
git add backend/services/statisticService/main.go backend/integration/
git commit -m "feat(statistic): P3 complete - 7 dashboard RPC end-to-end + cache warmup (T14)"

P4 业务侧补全

Task 15: galleryService + taskService 集成 TrackEvent

Files:

  • Modify: backend/services/galleryService/service/exhibition_service.go

  • Modify: backend/services/taskService/service/revenue_service.go

  • Create: backend/services/galleryService/.../exhibition_service_test.gomock TrackEvent

  • Create: backend/services/taskService/.../revenue_service_test.go

  • Step 15.1: galleryService 集成

修改 exhibition_service.go

// 在 PlaceAsset 方法末尾
statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
    EventId: uuid.New().String(),
    UserId: userID, StarId: starID,
    EventType: "exhibition.start",
    OccurredAt: time.Now().UnixMilli(),
    Properties: map[string]string{"asset_id": strconv.FormatInt(assetID, 10), "slot_id": strconv.FormatInt(slotID, 10)},
})

// 在 RemoveFromSlot 方法末尾
statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
    EventId: uuid.New().String(),
    UserId: userID, StarId: starID,
    EventType: "exhibition.end",
    OccurredAt: time.Now().UnixMilli(),
    Properties: map[string]string{"asset_id": ..., "duration_ms": strconv.FormatInt(duration.Milliseconds(), 10)},
})

在 galleryService main.go 加 statistic.Init(statisticClient)

  • Step 15.2: taskService 集成

修改 revenue_service.goOnExhibitionCompleted 方法末尾:

statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
    EventId: uuid.New().String(),
    UserId: userID, StarId: starID,
    EventType: "exhibition.revenue",
    OccurredAt: time.Now().UnixMilli(),
    Properties: map[string]string{
        "asset_id": strconv.FormatInt(assetID, 10),
        "amount": strconv.FormatInt(amount, 10),
        "duration_ms": strconv.FormatInt(duration.Milliseconds(), 10),
    },
})
  • Step 15.3: 写 mock 测试

galleryService/.../exhibition_service_test.go

// 用 mock 替换 statistic.Get() 验证 TrackEvent 被调用
func TestPlaceAsset_TrackEvent(t *testing.T) {
    captured := &capturedEvent{}
    statistic.SetMockForTest(captured) // 在 pkg/statistic 加测试用 mock 钩子
    // ... 调 PlaceAsset ...
    if captured.event == nil || captured.event.EventType != "exhibition.start" {
        t.Fatal("TrackEvent not called with exhibition.start")
    }
}

需要在 pkg/statistic/client.goSetMockForTest(c Capturer) 函数。

  • Step 15.4: 启动两端 + 验证事件落库
# 1. 启动 statisticService
# 2. 启动 galleryService / taskService
# 3. 触发 PlaceAsset / OnExhibitionCompleted
# 4. 查 statistic.events 看 exhibition.start / exhibition.end / exhibition.revenue
psql -c "SELECT event_type, COUNT(*) FROM statistic.events WHERE event_type IN ('exhibition.start','exhibition.end','exhibition.revenue') GROUP BY event_type"

Expected: 3 个 event_type 都有新行

  • Step 15.5: Commit
git add backend/services/galleryService/ backend/services/taskService/ backend/pkg/statistic/
git commit -m "feat(statistic): P4 step 1 - galleryService + taskService TrackEvent integration (T15)"

Task 16: assetService + userService 集成 TrackEvent + 看板数据 7 事件类型全出现

Files:

  • Modify: backend/services/assetService/service/mint_service.go

  • Modify: backend/services/assetService/service/asset_level_service.go

  • Modify: backend/services/userService/service/user_service.go

  • Tests

  • Step 16.1: assetService 集成(铸造)

mint_service.goCreateMintOrder 成功后:

statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
    EventId: uuid.New().String(),
    UserId: userID, StarId: starID,
    EventType: "asset.mint",
    OccurredAt: time.Now().UnixMilli(),
    Properties: map[string]string{"asset_id": strconv.FormatInt(assetID, 10), "level": level},
})
  • Step 16.2: assetService 集成(升级)

asset_level_service.goCheckUpgrade 返回 true 触发升级后(或 logLevelChange 调用前):

statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
    EventId: uuid.New().String(),
    UserId: userID, StarId: starID,
    EventType: "asset.level_up",
    OccurredAt: time.Now().UnixMilli(),
    Properties: map[string]string{
        "asset_id": strconv.FormatInt(assetID, 10),
        "from": fromLevel, "to": toLevel,
    },
})

P1 末向 assetService 同学确认实际升级触发点CheckUpgrade 返回 true 后在哪行调升级),用真实代码位置。

  • Step 16.3: userService 集成(水晶账本变动)

user_service.goUpdateCrystalBalance 方法末尾(成功后):

statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
    EventId: uuid.New().String(),
    UserId: req.UserId, StarId: req.StarId,
    EventType: "crystal.change",
    OccurredAt: time.Now().UnixMilli(),
    Properties: map[string]string{
        "amount": strconv.FormatInt(req.Delta, 10),
        "reason": req.Reason, // 业务侧传
    },
})
  • Step 16.4: 跑全量联调
# 触发 7 个事件类型各 1 次
# 用项目现有 e2e 脚本或 API
# 然后查 events 表
psql -c "SELECT event_type, COUNT(*) FROM statistic.events WHERE received_at > NOW() - INTERVAL '1 hour' GROUP BY event_type ORDER BY event_type"

Expected: 7 个 event_type 都有新行asset.like / asset.mint / exhibition.start / exhibition.end / exhibition.revenue / asset.level_up / crystal.change

  • Step 16.5: 验证看板 7 RPC 数据完整性
# 切前端 USE_MOCK_API=false
# 登录 → 进入 dashboard 页 → 检查 7 个图表都有数据

Expected: 7 个图表都有真实数据,无空状态

  • Step 16.6: 跑所有 P4 服务单测
cd backend
go test ./services/galleryService/... ./services/taskService/... ./services/assetService/... ./services/userService/... -v

Expected: 全部通过

  • Step 16.7: Commit
git add backend/services/assetService/ backend/services/userService/
git commit -m "feat(statistic): P4 complete - assetService + userService TrackEvent + 7 event types end-to-end (T16)"

完成验证(本期交付清单)

功能

  • 7 个看板 RPC 端到端通gateway → statisticService → MV
  • week_rank 完整实现(排名准确)
  • TrackEvent 接收 + 去重 + 批量
  • 4 MV + 3 metric 全部自动刷新
  • events 按日分区自动管理7 天预创建 + 30 天清理)
  • 5 个业务服务全部集成 TrackEvent
  • 7 个事件类型asset.like / asset.mint / exhibition.start / end / revenue / level_up / crystal.change全部接入

性能

  • 看板单 RPC 缓存命中 P99 < 10ms
  • 看板 7 RPC 并发 P99 < 500ms
  • TrackEvent P99 < 50ms
  • 物化视图刷新 < 30s/视图
  • 缓存命中率 > 80%(启动预热后)

可靠性

  • 单元测试覆盖率 > 80%
  • 集成测试覆盖 100% RPCdockertest 真实 PG
  • TrackEvent 失败业务方有 retry业务侧 defer + recover
  • partition 缺失自动创建
  • MV 刷新失败有 refresh_log 记录

可观测性

  • Prometheus 指标 20+ 全部埋点dashboard_rpc / event_track / mv_refresh / worker_running / events_partition_count
  • /healthz 端点 200
  • refresh_log 表完整记录
  • /metrics 端点暴露

集成

  • Gateway 7 路由注册成功
  • 前端 USE_MOCK_API=false 切流量成功
  • 业务侧 5 个服务部署后事件能落库
  • statistic schema 10 张表全部存在

安全

  • JWT 鉴权在 gateway 拦截
  • 粉丝身份校验userService.fan_profile 调通)
  • JSONB 注入防护(参数化 SQL
  • 限流配置生效spec §4.6

自我审查

1. Spec 覆盖

Spec 项 任务
看板 7 RPC 实现 T11, T12, T13
事件采集 (TrackEvent/BatchTrackEvent) T4, T5, T6, T9
预聚表 (3 张) T6 (recent), T7 (weekly, upcoming)
物化视图 (4 个) T3 (DDL), T10 (刷新)
分区管理 T3 (DDL), T8 (worker)
Gateway 7 路由 + JWT + 粉丝身份 T13
5 业务服务集成 T9 (social), T15 (gallery, task), T16 (asset, user)
Prometheus 指标 T3 (骨架), T9 (事件), T10 (MV), T12 (看板)
healthz T3
启动预热 T14
P1 末预检查 T3 step 3.9

2. 占位符扫描

  • 无 TBD/TODO
  • 无 "implement later" / "fill in details"
  • 无 "add appropriate error handling"(错误处理在每个任务的具体代码中已写)
  • 无 "Similar to Task N"(每个任务代码独立完整)
  • 所有代码步骤包含完整代码

3. 类型一致性

任务 定义 后续使用
T4 Event model T5 repo 用, T5 service 用, T6 worker 用, T9 provider 用
T4 EventSink 接口 T4 实现, T5 service 注入, T9 main 注入
T5 EventRepository.InsertBatch T6 flusher 调, T9 main 注入
T5 EventService.TrackEvent T9 provider 调
T6 MetricRepository.UpsertRecentLevelUp T6 flusher 调, T9 main 注入
T7 MetricRepository.RefreshWeeklyUserIncome T7 worker 调, T9 main 注入
T7 MetricRepository.RefreshUpcomingLevelUps T7 worker 调, T9 main 注入
T8 Partitioner.EnsureFuturePartitions T8 Start 调, T9 main 注入
T9 StatisticInternalProvider T9 main 注册 Dubbo
T10 Materializer.RefreshOne T10 Start 调
T11 DashboardRepository.GetWeekRank T11 service 调
T11 DashboardService.GetTodayOverview T12 mobile provider 调
T12 StatisticMobileProvider T13 gateway controller 调
T13 StatisticController.GetTodayOverview T14 E2E 测试
T9 pkg/statistic.Client.TrackEvent T9 socialService 调, T15 gallery/task 调, T16 asset/user 调

所有类型/方法在定义任务和后续使用任务间一致。


计划完成,保存到 docs/superpowers/plans/2026-06-08-statistic-kanban-and-event-implementation.md