diff --git a/docs/superpowers/plans/2026-06-08-statistic-kanban-and-event-implementation.md b/docs/superpowers/plans/2026-06-08-statistic-kanban-and-event-implementation.md new file mode 100644 index 0000000..783c137 --- /dev/null +++ b/docs/superpowers/plans/2026-06-08-statistic-kanban-and-event-implementation.md @@ -0,0 +1,3014 @@ +# statisticService 看板 + 事件采集 实施计划 + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 实现 `statisticService` 后端服务(Go Dubbo-go),提供数据看板 7 RPC + 事件采集框架(TrackEvent / BatchTrackEvent),并将 5 个业务服务(social / asset / gallery / task / user)改造接入 TrackEvent 上报。 + +**Architecture:** +- `statisticService`:Go Dubbo-go 服务,端口 20009,暴露 7 个看板 RPC(经 gateway)+ 2 个事件 RPC(内部调用,TrackEvent / BatchTrackEvent) +- 事件流:业务服务 → Dubbo TrackEvent → ChannelEventSink → event_flusher Worker 批量落库到 `statistic.events` 表(按日分区) +- 看板数据:4 个物化视图(MV)+ 3 个预聚表(Worker 维护),看板查询走 MV/预聚表,Redis 5min TTL 缓存 +- Gateway:经 `backend/gateway/controller/statistic_controller.go` 暴露 7 个 GET `/api/v1/dashboard/*` 路由,响应统一包装 `{code:200, data:resp}` + +**Tech Stack:** Go (dubbo-go v3 triple)、GORM、PostgreSQL (按日分区 + 物化视图 + 预聚表)、Redis 缓存、Prometheus 指标、gin (gateway) + +**Spec:** `docs/superpowers/specs/2026-06-04-statistic-service-design.md`(含 §0.1 本期实施范围) + +--- + +## 文件结构 + +### 新建 + +``` +backend/ +├── proto/ +│ ├── event.proto # 新建:通用事件类型(独立 proto) +│ └── statistic.proto # 新建:statistic 服务 + 7 看板 RPC + 2 事件 RPC +├── pkg/proto/ +│ ├── event/ # 新建:event.proto 编译产物 +│ └── statistic/ # 新建:statistic.proto 编译产物 +├── services/statisticService/ # 新建(目录已存在但为空) +│ ├── go.mod / go.sum # 新建 +│ ├── main.go # 新建 +│ ├── config/ +│ │ └── statistic_config.go # 新建 +│ ├── model/ +│ │ ├── event.go # 新建:Event 模型 +│ │ └── metric.go # 新建:7 看板响应结构 +│ ├── repository/ +│ │ ├── event_repo.go # 新建:events 表 + 3 预聚表 +│ │ └── dashboard_repo.go # 新建:4 MV 聚合查询 +│ ├── sink/ +│ │ ├── event_sink.go # 新建:EventSink 接口 +│ │ └── channel_sink.go # 新建:ChannelEventSink(本期) +│ ├── service/ +│ │ ├── event_service.go # 新建:事件校验 + Submit +│ │ ├── dashboard_service.go # 新建:7 看板 RPC 业务逻辑 +│ │ ├── metric_service.go # 新建:MV 刷新协调 +│ │ └── cache.go # 新建:Redis 缓存封装 +│ ├── provider/ +│ │ ├── statistic_mobile_provider.go # 新建:7 看板 RPC(mobile) +│ │ └── statistic_internal_provider.go # 新建:TrackEvent / BatchTrackEvent(internal) +│ ├── worker/ +│ │ ├── event_flusher.go # 新建:事件批量落库 +│ │ ├── metric_recent_level_ups_updater.go # 新建:同步触发 +│ │ ├── metric_upcoming_level_ups_updater.go # 新建:15min ticker +│ │ ├── metric_weekly_user_income_updater.go # 新建:5min ticker +│ │ ├── materializer.go # 新建:4 MV 刷新 +│ │ └── partitioner.go # 新建:events 分区管理 +│ ├── handler/ +│ │ └── healthz.go # 新建:健康检查 +│ ├── metrics/ +│ │ └── metrics.go # 新建:Prometheus 指标声明 +│ ├── client/ # 新建:跨服务 RPC 客户端 +│ │ ├── user_rpc_client.go +│ │ └── gallery_rpc_client.go +│ └── scripts/ # 新建:测试 fixture +│ └── testhelper/ +├── migrations/ +│ ├── 2026_06_08_001_statistic_events.sql +│ ├── 2026_06_08_002_statistic_mv_daily_user_income.sql +│ ├── 2026_06_08_003_statistic_mv_daily_exhibition_revenue.sql +│ ├── 2026_06_08_004_statistic_mv_daily_like_income.sql +│ ├── 2026_06_08_005_statistic_mv_asset_level_distribution.sql +│ ├── 2026_06_08_006_statistic_metric_weekly_user_income.sql +│ ├── 2026_06_08_007_statistic_metric_recent_level_ups.sql +│ ├── 2026_06_08_008_statistic_metric_upcoming_level_ups.sql +│ ├── 2026_06_08_009_statistic_refresh_log.sql +│ └── 2026_06_08_010_statistic_partitions_initial.sql +├── gateway/ +│ ├── router/router.go # 修改:注册 7 路由 +│ └── controller/statistic_controller.go # 新建:7 看板 controller +└── pkg/statistic/ # 新建:业务侧统一调用 SDK + └── client.go +``` + +### 修改 + +``` +backend/ +├── go.work # 修改:加 ./services/statisticService +├── services/socialService/service/asset_like_service.go # 修改:写 like_income_log 后调 TrackEvent +├── services/galleryService/service/exhibition_service.go # 修改:PlaceAsset / RemoveFromSlot 后调 +├── services/taskService/service/revenue_service.go # 修改:OnExhibitionCompleted 后调 +├── services/assetService/service/mint_service.go # 修改:CreateMintOrder 后调 +├── services/assetService/service/asset_level_service.go # 修改:CheckUpgrade 后调 +└── services/userService/service/user_service.go # 修改:UpdateCrystalBalance 后调 +``` + +--- + +## 任务编排 + +**4 阶段 / 16 任务 / 每个任务包含 TDD 步骤**: + +| 阶段 | 任务 | 周期(估)| +|------|------|----------| +| **P1 服务骨架** | T1-T3 | 2-3 天 | +| **P2 事件采集** | T4-T9 | 4-5 天 | +| **P3 看板 7 RPC** | T10-T14 | 4-5 天 | +| **P4 业务侧集成** | T15-T16 | 2-3 天 | + +每个任务结束都应有:所有测试通过 + git commit + 关键验证勾选完成。 + +--- + +## P1 服务骨架 + +### Task 1: 项目初始化 + go.mod / go.work / 目录结构 + +**Files:** +- Create: `backend/services/statisticService/go.mod` +- Modify: `backend/go.work` + +- [ ] **Step 1.1: 创建 go.mod** + +```bash +cd backend/services/statisticService +go mod init github.com/topfans/backend/services/statisticService +``` + +- [ ] **Step 1.2: 添加核心依赖(沿用 taskService)** + +```bash +go get github.com/topfans/backend/pkg/database@v0.0.0 +go get github.com/topfans/backend/pkg/logger@v0.0.0 +go get github.com/topfans/backend/pkg/health@v0.0.0 +go get dubbo.apache.org/dubbo-go/v3 +go get gorm.io/gorm +go get gorm.io/driver/postgres +go get github.com/redis/go-redis/v9 +go get github.com/prometheus/client_golang/prometheus +``` + +> 实际版本号看 go.work 里 taskService 的 go.mod。复制其 `require` 块,删除 taskService 特定的包(assetLevelRepo 等)。 + +- [ ] **Step 1.3: 修改 go.work 加入新服务** + +在 `backend/go.work` 的 `use` 列表里加: +``` + ./services/statisticService +``` + +- [ ] **Step 1.4: 创建子目录结构** + +```bash +mkdir -p backend/services/statisticService/{config,model,repository,sink,service,provider,worker,handler,metrics,client,scripts/testhelper} +``` + +- [ ] **Step 1.5: 创建占位 main.go** + +```go +// backend/services/statisticService/main.go +package main + +import "fmt" + +func main() { + fmt.Println("statisticService starting...") +} +``` + +- [ ] **Step 1.6: 编译验证** + +```bash +cd backend +go build ./services/statisticService +``` + +Expected: 编译成功,无输出。 + +- [ ] **Step 1.7: Commit** + +```bash +git add backend/services/statisticService/ backend/go.work +git commit -m "feat(statistic): scaffold statisticService with go.mod and directory structure" +``` + +--- + +### Task 2: proto 定义(event.proto + statistic.proto) + +**Files:** +- Create: `backend/proto/event.proto` +- Create: `backend/proto/statistic.proto` + +> **关键:proto 冻结点。** P2/P3 业务逻辑依赖此定义。 + +- [ ] **Step 2.1: 创建 event.proto** + +```bash +cat > backend/proto/event.proto << 'EOF' +syntax = "proto3"; +package event; +option go_package = "github.com/topfans/backend/pkg/proto/event"; + +message Event { + string event_id = 1; + int64 user_id = 2; + int64 star_id = 3; + string event_type = 4; + int64 occurred_at = 5; + int64 received_at = 6; + map properties = 7; +} + +message BatchEventRequest { + repeated Event events = 1; +} +EOF +``` + +- [ ] **Step 2.2: 创建 statistic.proto(对齐 spec §2.3)** + +```bash +cat > backend/proto/statistic.proto << 'EOF' +syntax = "proto3"; +package statistic; +option go_package = "github.com/topfans/backend/pkg/proto/statistic"; +import "event.proto"; + +service StatisticService { + rpc GetTodayOverview(GetTodayOverviewRequest) returns (GetTodayOverviewResponse); + rpc Get7DayIncomeCurve(Get7DayIncomeCurveRequest) returns (Get7DayIncomeCurveResponse); + rpc GetExhibitionIncomeSummary(GetExhibitionIncomeSummaryRequest) returns (GetExhibitionIncomeSummaryResponse); + rpc GetLikeIncomeByLevel(GetLikeIncomeByLevelRequest) returns (GetLikeIncomeByLevelResponse); + rpc GetTopAssetsByEarning(GetTopAssetsByEarningRequest) returns (GetTopAssetsByEarningResponse); + rpc GetAssetLevelDistribution(GetAssetLevelDistributionRequest) returns (GetAssetLevelDistributionResponse); + rpc GetAssetUpgradeProgress(GetAssetUpgradeProgressRequest) returns (GetAssetUpgradeProgressResponse); + rpc TrackEvent(event.Event) returns (TrackEventResponse); + rpc BatchTrackEvent(event.BatchEventRequest) returns (TrackEventResponse); +} + +// ====== 1. 今日概览 ====== +message GetTodayOverviewRequest { int64 star_id = 1; } +message GetTodayOverviewResponse { + int64 crystal_balance = 1; + int64 today_income = 2; + int32 week_rank = 3; + int32 week_total_users = 4; +} + +// ====== 2. 七日收益曲线 ====== +message Get7DayIncomeCurveRequest { int64 star_id = 1; } +message DailyIncomePoint { + string date = 1; + int64 income = 2; + bool is_today = 3; + bool is_peak = 4; +} +message Get7DayIncomeCurveResponse { + repeated DailyIncomePoint points = 1; + int64 total_income = 2; + int64 avg_income = 3; +} + +// ====== 3. 展出收益中心 ====== +message GetExhibitionIncomeSummaryRequest { int64 star_id = 1; } +message TopExhibitionItem { + int64 asset_id = 1; + string asset_name = 2; + string asset_thumb = 3; + string duration_7d = 4; + int64 earnings_7d = 5; + int32 avg_earnings = 6; +} +message GetExhibitionIncomeSummaryResponse { + int32 exhibiting_count = 1; + int32 starbook_count = 2; + string total_duration = 3; + int64 total_earnings = 4; + repeated TopExhibitionItem top5 = 5; +} + +// ====== 4. 点赞收益按等级 ====== +message GetLikeIncomeByLevelRequest { int64 star_id = 1; } +message LikeIncomeLevelItem { + string level = 1; + int32 asset_count = 2; + int64 total_income = 3; + string thumb = 4; +} +message GetLikeIncomeByLevelResponse { + int64 total_like_count = 1; + int64 total_income = 2; + repeated LikeIncomeLevelItem levels = 3; +} + +// ====== 5. 藏品 TOP5 ====== +message GetTopAssetsByEarningRequest { int64 star_id = 1; } +message TopAssetItem { + int64 asset_id = 1; + string asset_name = 2; + string asset_thumb = 3; + int64 total_earnings = 4; + int32 rank = 5; +} +message GetTopAssetsByEarningResponse { + repeated TopAssetItem items = 1; +} + +// ====== 6. 藏品等级分布 ====== +message GetAssetLevelDistributionRequest { int64 star_id = 1; } +message AssetLevelItem { + string level = 1; + int32 count = 2; + int32 total = 3; +} +message GetAssetLevelDistributionResponse { + repeated AssetLevelItem items = 1; +} + +// ====== 7. 升级进度 ====== +message GetAssetUpgradeProgressRequest { int64 star_id = 1; } +message UpcomingLevelUpItem { + int64 asset_id = 1; + string asset_name = 2; + string asset_thumb = 3; + int32 like_progress = 4; + int32 duration_progress = 5; +} +message RecentLevelUpItem { + int64 asset_id = 1; + string asset_name = 2; + string asset_thumb = 3; + string new_level = 4; + int64 upgrade_time = 5; +} +message GetAssetUpgradeProgressResponse { + repeated UpcomingLevelUpItem upcoming = 1; + repeated RecentLevelUpItem recent = 2; +} + +// ====== 事件采集响应 ====== +message TrackEventResponse { + int32 accepted = 1; + int32 rejected = 2; +} +EOF +``` + +- [ ] **Step 2.3: 编译 proto(生成 Go 代码)** + +```bash +cd backend +# 实际项目用 make / 脚本生成,看 Makefile 或 scripts/ 里 protoc 命令 +ls scripts/ 2>/dev/null +# 或用标准命令: +protoc --go_out=pkg/proto --go_triple_out=pkg/proto \ + -I proto proto/event.proto proto/statistic.proto +``` + +Expected: `pkg/proto/event/` 和 `pkg/proto/statistic/` 下生成 `.pb.go` 和 `.triple.go`。 + +- [ ] **Step 2.4: 验证编译** + +```bash +go build ./pkg/proto/event/ ./pkg/proto/statistic/ +``` + +Expected: 编译成功。 + +- [ ] **Step 2.5: Commit** + +```bash +git add backend/proto/ backend/pkg/proto/event/ backend/pkg/proto/statistic/ +git commit -m "feat(statistic): add event.proto and statistic.proto with 7 RPCs + 2 event RPCs" +``` + +--- + +### Task 3: config + main.go 启动 + healthz + metrics 骨架 + 10 SQL 迁移 + +**Files:** +- Create: `backend/services/statisticService/config/statistic_config.go` +- Modify: `backend/services/statisticService/main.go` +- Create: `backend/services/statisticService/handler/healthz.go` +- Create: `backend/services/statisticService/metrics/metrics.go` +- Create: 10 SQL migration files under `backend/migrations/` + +- [ ] **Step 3.1: 创建 config(沿用 taskService 模式)** + +```go +// backend/services/statisticService/config/statistic_config.go +package config + +import ( + "flag" + "log" + "os" + "strconv" + "time" +) + +type DatabaseConfig struct { + Host, Password, DBName, SSLMode, TimeZone string + Port int + User string + Schema string +} + +type RedisConfig struct { + URL string +} + +type RefreshIntervals struct { + DailyUserIncome time.Duration + DailyExhibitionRevenue time.Duration + DailyLikeIncome time.Duration + AssetLevelDistribution time.Duration + WeeklyUserIncome time.Duration + UpcomingLevelUps time.Duration +} + +type ChannelConfig struct { + EventChannelCapacity int + EventWorkerCount int + EventBatchSize int + EventBatchInterval time.Duration +} + +type PartitionConfig struct { + RetentionDays int + PreCreateDays int +} + +type ExtensionConfig struct { + EnableOLAPDualWrite bool + EnableRealtimeChannel bool + EnableSDKEndpoint bool + EnableSampling bool +} + +var ( + DBConfig = &DatabaseConfig{Schema: "statistic"} + RedisCfg = &RedisConfig{URL: "redis://localhost:6379/0"} + RefreshCfg = &RefreshIntervals{ + DailyUserIncome: 5 * time.Minute, + DailyExhibitionRevenue: 5 * time.Minute, + DailyLikeIncome: 5 * time.Minute, + AssetLevelDistribution: 15 * time.Minute, + WeeklyUserIncome: 5 * time.Minute, + UpcomingLevelUps: 15 * time.Minute, + } + ChannelCfg = &ChannelConfig{ + EventChannelCapacity: 1000, + EventWorkerCount: 1, + EventBatchSize: 100, + EventBatchInterval: 1 * time.Second, + } + PartitionCfg = &PartitionConfig{RetentionDays: 30, PreCreateDays: 7} + ExtCfg = &ExtensionConfig{ // 全部默认 false + EnableOLAPDualWrite: false, + EnableRealtimeChannel: false, + EnableSDKEndpoint: false, + EnableSampling: false, + } +) + +func getEnv(key, fallback string) string { + if v := os.Getenv(key); v != "" { return v } + return fallback +} + +func getEnvInt(key string, fallback int) int { + if v := os.Getenv(key); v != "" { + if n, err := strconv.Atoi(v); err == nil { return n } + } + return fallback +} + +func getEnvDuration(key string, fallback time.Duration) time.Duration { + if v := os.Getenv(key); v != "" { + if d, err := time.ParseDuration(v); err == nil { return d } + } + return fallback +} + +func getEnvBool(key string, fallback bool) bool { + if v := os.Getenv(key); v != "" { + if b, err := strconv.ParseBool(v); err == nil { return b } + } + return fallback +} + +func InitConfig() { + flag.StringVar(&DBConfig.Host, "db-host", getEnv("STATISTIC_DB_HOST", "localhost"), "") + flag.IntVar(&DBConfig.Port, "db-port", getEnvInt("STATISTIC_DB_PORT", 5432), "") + flag.StringVar(&DBConfig.User, "db-user", getEnv("STATISTIC_DB_USER", "postgres"), "") + flag.StringVar(&DBConfig.Password, "db-password", getEnv("STATISTIC_DB_PASSWORD", ""), "") + flag.StringVar(&DBConfig.DBName, "db-name", getEnv("STATISTIC_DB_NAME", "topfans"), "") + flag.StringVar(&DBConfig.SSLMode, "db-sslmode", "disable", "") + flag.StringVar(&DBConfig.Schema, "db-schema", getEnv("STATISTIC_DB_SCHEMA", "statistic"), "") + + flag.StringVar(&RedisCfg.URL, "redis-url", getEnv("STATISTIC_REDIS_URL", "redis://localhost:6379/0"), "") + + flag.IntVar(&ChannelCfg.EventChannelCapacity, "event-channel-capacity", getEnvInt("STATISTIC_EVENT_CHANNEL_CAPACITY", 1000), "") + flag.IntVar(&ChannelCfg.EventBatchSize, "event-batch-size", getEnvInt("STATISTIC_EVENT_BATCH_SIZE", 100), "") + flag.DurationVar(&ChannelCfg.EventBatchInterval, "event-batch-interval", getEnvDuration("STATISTIC_EVENT_BATCH_INTERVAL", time.Second), "") + + flag.IntVar(&PartitionCfg.RetentionDays, "partition-retention-days", getEnvInt("STATISTIC_PARTITION_RETENTION_DAYS", 30), "") + flag.IntVar(&PartitionCfg.PreCreateDays, "partition-precreate-days", getEnvInt("STATISTIC_PARTITION_PRECREATE_DAYS", 7), "") + + flag.BoolVar(&ExtCfg.EnableOLAPDualWrite, "enable-olap", getEnvBool("STATISTIC_ENABLE_OLAP_DUAL_WRITE", false), "") + flag.BoolVar(&ExtCfg.EnableRealtimeChannel, "enable-realtime", getEnvBool("STATISTIC_ENABLE_REALTIME_CHANNEL", false), "") + flag.BoolVar(&ExtCfg.EnableSDKEndpoint, "enable-sdk", getEnvBool("STATISTIC_ENABLE_SDK_ENDPOINT", false), "") + flag.BoolVar(&ExtCfg.EnableSampling, "enable-sampling", getEnvBool("STATISTIC_ENABLE_SAMPLING", false), "") + + flag.Parse() + log.Println("statisticService 配置初始化完成") +} +``` + +- [ ] **Step 3.2: 创建 metrics 骨架(声明所有指标,数据后续埋)** + +```go +// backend/services/statisticService/metrics/metrics.go +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// 看板 RPC +var DashboardRPCTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "dashboard_rpc_total", Help: "Dashboard RPC total", +}, []string{"rpc", "status"}) + +var DashboardRPCDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "dashboard_rpc_duration_seconds", Help: "Dashboard RPC duration", +}, []string{"rpc"}) + +var DashboardCacheHitRate = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dashboard_cache_hit_rate", Help: "Dashboard cache hit rate", +}) + +// 事件采集 +var EventTrackTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "event_track_total", Help: "Event track total", +}, []string{"event_type", "result"}) + +var EventChannelSize = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "event_channel_size", Help: "Event channel current size", +}) + +var EventChannelCapacity = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "event_channel_capacity", Help: "Event channel capacity", +}) + +var EventDroppedTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "event_dropped_total", Help: "Event dropped total", +}) + +var EventDBInsertTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "event_db_insert_total", Help: "Event DB insert total", +}, []string{"status"}) + +// 物化视图 +var MVRefreshTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mv_refresh_total", Help: "Materialized view refresh total", +}, []string{"mv_name", "status"}) + +var MVRefreshDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "mv_refresh_duration_seconds", Help: "MV refresh duration", +}, []string{"mv_name"}) + +// Worker +var WorkerRunningCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "worker_running_count", Help: "Worker running count", +}, []string{"worker_name"}) + +// 分区 +var EventsPartitionCount = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "events_partition_count", Help: "Events partition count", +}) +``` + +- [ ] **Step 3.3: 创建 healthz** + +```go +// backend/services/statisticService/handler/healthz.go +package handler + +import ( + "context" + "database/sql" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/redis/go-redis/v9" + "github.com/topfans/backend/services/statisticService/metrics" +) + +type Healthz struct { + db *sql.DB + redis *redis.Client +} + +func NewHealthz(db *sql.DB, redis *redis.Client) *Healthz { + return &Healthz{db: db, redis: redis} +} + +func (h *Healthz) Register(r *gin.Engine, port int) { + r.GET("/metrics", gin.WrapH(metricsHandler())) // 暴露 Prometheus + r.GET("/healthz", func(c *gin.Context) { + ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second) + defer cancel() + + status := gin.H{"status": "ok"} + if err := h.db.PingContext(ctx); err != nil { + status["db"] = "down" + } else { + status["db"] = "up" + } + if err := h.redis.Ping(ctx).Err(); err != nil { + status["redis"] = "down" + } else { + status["redis"] = "up" + } + c.JSON(http.StatusOK, status) + }) +} +``` + +> **实际项目**用 `pkg/health` 组件,参考 taskService main.go:131-133。`health.NewHandler("statistic-service", healthPort).Start()`。本计划用自实现版,最终应替换为项目既有组件。 + +- [ ] **Step 3.4: 创建 main.go(启动骨架,不含业务)** + +```go +// backend/services/statisticService/main.go +package main + +import ( + "database/sql" + "flag" + "fmt" + "os" + "os/signal" + "syscall" + + _ "github.com/lib/pq" + "github.com/redis/go-redis/v9" + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/config" + "github.com/topfans/backend/services/statisticService/handler" +) + +var port = flag.Int("port", 20009, "Dubbo service port") + +func main() { + env := os.Getenv("ENV") + if env == "" { env = "development" } + if err := logger.Init(logger.Config{ServiceName: "statistic-service", Environment: env}); err != nil { + panic(fmt.Sprintf("Failed to init logger: %v", err)) + } + defer logger.Sync() + + config.InitConfig() + + // DB + db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", + config.DBConfig.Host, config.DBConfig.Port, config.DBConfig.User, config.DBConfig.Password, + config.DBConfig.DBName, config.DBConfig.SSLMode)) + if err != nil { logger.Logger.Fatal(fmt.Sprintf("Failed to open DB: %v", err)) } + db.SetMaxOpenConns(50) + if err := db.Ping(); err != nil { logger.Logger.Fatal(fmt.Sprintf("DB ping: %v", err)) } + defer db.Close() + + // Redis + opt, _ := redis.ParseURL(config.RedisCfg.URL) + rdb := redis.NewClient(opt) + defer rdb.Close() + + // Healthz HTTP server on port+1000=21009 + healthPort := *port + 1000 + h := handler.NewHealthz(db, rdb) + go h.Start(healthPort) + + logger.Logger.Info(fmt.Sprintf("statisticService started, dubbo=%d healthz=%d", *port, healthPort)) + + // Graceful shutdown + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit +} +``` + +并在 healthz.go 加 `Start` 方法: + +```go +// 加到 backend/services/statisticService/handler/healthz.go +func (h *Healthz) Start(port int) { + r := gin.Default() + h.Register(r, port) + go func() { + if err := r.Run(fmt.Sprintf(":%d", port)); err != nil { + logger.Logger.Fatal(...) + } + }() +} +``` + +- [ ] **Step 3.5: 创建 10 个 SQL 迁移文件(参考 spec §3.2-3.5)** + +每个文件包含一个 DDL + 必要的索引。**关键 SQL 摘要:** + +`backend/migrations/2026_06_08_001_statistic_events.sql`: + +```sql +CREATE SCHEMA IF NOT EXISTS statistic; + +CREATE TABLE IF NOT EXISTS statistic.events ( + id BIGSERIAL, + event_id UUID NOT NULL, + user_id BIGINT NOT NULL, + star_id BIGINT NOT NULL, + event_type VARCHAR(64) NOT NULL, + occurred_at TIMESTAMPTZ NOT NULL, + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + properties JSONB NOT NULL DEFAULT '{}', + PRIMARY KEY (id, received_at) +) PARTITION BY RANGE (received_at); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_events_event_id ON statistic.events (event_id, received_at); +CREATE INDEX IF NOT EXISTS idx_events_user_star_type_time + ON statistic.events (user_id, star_id, event_type, received_at DESC); +CREATE INDEX IF NOT EXISTS idx_events_star_type_time + ON statistic.events (star_id, event_type, received_at DESC); +CREATE INDEX IF NOT EXISTS idx_events_properties_gin ON statistic.events USING GIN (properties); + +-- 初始 7 天分区(从今天起) +DO $$ +DECLARE + i INT; + d DATE; + n DATE; +BEGIN + FOR i IN 0..6 LOOP + d := CURRENT_DATE + i; + n := d + 1; + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS statistic.events_%s PARTITION OF statistic.events FOR VALUES FROM (%L) TO (%L)', + to_char(d, 'YYYY_MM_DD'), d::text, n::text + ); + END LOOP; +END $$; +``` + +`2026_06_08_002_statistic_mv_daily_user_income.sql` - 见 spec §3.4 MV1 SQL + +`2026_06_08_003_statistic_mv_daily_exhibition_revenue.sql` - 见 spec §3.4 MV2 SQL + +`2026_06_08_004_statistic_mv_daily_like_income.sql` - 见 spec §3.4 MV3 SQL + +`2026_06_08_005_statistic_mv_asset_level_distribution.sql` - 见 spec §3.4 MV4 SQL + +`2026_06_08_006_statistic_metric_weekly_user_income.sql` - 见 spec §3.5 weekly 表 + +`2026_06_08_007_statistic_metric_recent_level_ups.sql` - 见 spec §3.5 recent_level_ups 表 + +`2026_06_08_008_statistic_metric_upcoming_level_ups.sql` - 见 spec §3.5 upcoming_level_ups 表 + +`2026_06_08_009_statistic_refresh_log.sql`: + +```sql +CREATE TABLE IF NOT EXISTS statistic.refresh_log ( + id BIGSERIAL PRIMARY KEY, + mv_name VARCHAR(128) NOT NULL, + started_at TIMESTAMPTZ NOT NULL, + finished_at TIMESTAMPTZ, + row_count BIGINT, + status VARCHAR(16) NOT NULL, + error_message TEXT +); +CREATE INDEX IF NOT EXISTS idx_refresh_log_mv_time + ON statistic.refresh_log (mv_name, started_at DESC); +``` + +- [ ] **Step 3.6: 跑 DB 迁移** + +```bash +cd backend +# 用项目现有的迁移工具(看 scripts/) +ls scripts/ 2>/dev/null +# 或手动 psql: +psql -h localhost -U postgres -d topfans -f migrations/2026_06_08_001_statistic_events.sql +# ... 10 个文件依次执行 +``` + +Expected: 10 个 SQL 全部成功,\dt statistic.* 看到 10+ 表。 + +- [ ] **Step 3.7: 编译 + 运行** + +```bash +go build ./services/statisticService +./services/statisticService/bin/statisticService +``` + +Expected: 启动日志显示 port 20009 + healthz 21009。 + +- [ ] **Step 3.8: 验证 healthz** + +```bash +curl http://localhost:21009/healthz +``` + +Expected: `{"db":"up","redis":"up","status":"ok"}` + +- [ ] **Step 3.9: 跑 P1 末预检查清单** + +> 必做!spec §0.1.3 P1 末预检查清单 10 项。 + +```bash +# 1. socialService +grep -r "like_income_log" backend/services/socialService/ +# 2. galleryService +grep -rn "^func.*PlaceAsset\|^func.*RemoveFromSlot" backend/services/galleryService/service/ +# 3. taskService +grep -n "OnExhibitionCompleted" backend/services/taskService/service/revenue_service.go +# 4. assetService +grep -n "CreateMintOrder\|CheckUpgrade\|logLevelChange" backend/services/assetService/service/{mint_service,asset_level_service}.go +# 5. userService +grep -n "UpdateCrystalBalance" backend/services/userService/service/user_service.go +# 6. public.assets 表结构 +psql -c "\d+ public.assets" +# 7. 公共日志表 +psql -c "\dt public.*log*" +# 8. 端口 +lsof -i :20009 +# 9. 业务服务可达 +nc -zv localhost 20000; nc -zv localhost 20001; nc -zv localhost 20002; nc -zv localhost 20003; nc -zv localhost 20006 +# 10. schema 权限 +psql -U postgres -c "CREATE SCHEMA statistic_test_perm; DROP SCHEMA statistic_test_perm;" +``` + +Expected: 全部通过。 + +- [ ] **Step 3.10: Commit** + +```bash +git add backend/services/statisticService/ backend/migrations/ +git commit -m "feat(statistic): P1 scaffolding - config, main, healthz, metrics, 10 SQL migrations" +``` + +--- + +## P2 事件采集 + +### Task 4: Event 模型 + EventSink 接口 + ChannelEventSink + +**Files:** +- Create: `backend/services/statisticService/model/event.go` +- Create: `backend/services/statisticService/sink/event_sink.go` +- Create: `backend/services/statisticService/sink/channel_sink.go` +- Create: `backend/services/statisticService/sink/channel_sink_test.go` + +- [ ] **Step 4.1: 写 Event 模型(写测试先)** + +`backend/services/statisticService/model/event_test.go`: + +```go +package model + +import ( + "testing" + "time" +) + +func TestEvent_ToJSON(t *testing.T) { + e := &Event{ + EventID: "uuid-123", + UserID: 100, + StarID: 1, + EventType: "asset.like", + OccurredAt: time.Unix(1700000000, 0), + ReceivedAt: time.Unix(1700000001, 0), + Properties: map[string]string{"asset_id": "456"}, + } + j := e.ToJSON() + if j["event_id"] != "uuid-123" { t.Fatal("event_id mismatch") } + if j["properties"].(map[string]interface{})["asset_id"] != "456" { t.Fatal("properties mismatch") } +} +``` + +- [ ] **Step 4.2: 跑测试(应失败)** + +```bash +cd backend/services/statisticService +go test ./model/ -v +``` + +Expected: FAIL - "undefined: Event" + +- [ ] **Step 4.3: 实现 Event** + +`backend/services/statisticService/model/event.go`: + +```go +package model + +import "time" + +type Event struct { + EventID string `json:"event_id"` + UserID int64 `json:"user_id"` + StarID int64 `json:"star_id"` + EventType string `json:"event_type"` + OccurredAt time.Time `json:"occurred_at"` + ReceivedAt time.Time `json:"received_at"` + Properties map[string]string `json:"properties"` +} + +func (e *Event) ToJSON() map[string]interface{} { + return map[string]interface{}{ + "event_id": e.EventID, + "user_id": e.UserID, + "star_id": e.StarID, + "event_type": e.EventType, + "occurred_at": e.OccurredAt.UnixMilli(), + "received_at": e.ReceivedAt.UnixMilli(), + "properties": e.Properties, + } +} +``` + +- [ ] **Step 4.4: 跑测试(应通过)** + +```bash +go test ./model/ -v +``` + +Expected: PASS + +- [ ] **Step 4.5: 写 EventSink 接口 + ChannelEventSink 测试** + +`backend/services/statisticService/sink/channel_sink_test.go`: + +```go +package sink + +import ( + "context" + "testing" + "time" +) + +func TestChannelEventSink_Submit(t *testing.T) { + ch := make(chan *model.Event, 10) + s := NewChannelEventSink(ch) + e := &model.Event{EventID: "test-1"} + if err := s.Submit(context.Background(), e); err != nil { + t.Fatalf("Submit failed: %v", err) + } + select { + case got := <-ch: + if got.EventID != "test-1" { t.Fatal("event mismatch") } + case <-time.After(100 * time.Millisecond): + t.Fatal("no event received") + } +} + +func TestChannelEventSink_SubmitBatch(t *testing.T) { + ch := make(chan *model.Event, 10) + s := NewChannelEventSink(ch) + events := []*model.Event{{EventID: "a"}, {EventID: "b"}} + if err := s.SubmitBatch(context.Background(), events); err != nil { + t.Fatalf("SubmitBatch failed: %v", err) + } + if len(ch) != 2 { t.Fatalf("expected 2 events, got %d", len(ch)) } +} +``` + +- [ ] **Step 4.6: 跑测试(应失败)** + +```bash +go test ./sink/ -v +``` + +Expected: FAIL + +- [ ] **Step 4.7: 实现接口和 ChannelEventSink** + +`backend/services/statisticService/sink/event_sink.go`: + +```go +package sink + +import ( + "context" + "github.com/topfans/backend/services/statisticService/model" +) + +type EventSink interface { + Submit(ctx context.Context, e *model.Event) error + SubmitBatch(ctx context.Context, es []*model.Event) error + Close() error +} +``` + +`backend/services/statisticService/sink/channel_sink.go`: + +```go +package sink + +import ( + "context" + "github.com/topfans/backend/services/statisticService/model" +) + +type ChannelEventSink struct { + ch chan<- *model.Event +} + +func NewChannelEventSink(ch chan<- *model.Event) *ChannelEventSink { + return &ChannelEventSink{ch: ch} +} + +func (s *ChannelEventSink) Submit(ctx context.Context, e *model.Event) error { + select { + case s.ch <- e: + return nil + default: + return ErrChannelFull + } +} + +func (s *ChannelEventSink) SubmitBatch(ctx context.Context, es []*model.Event) error { + for _, e := range es { + if err := s.Submit(ctx, e); err != nil { + return err + } + } + return nil +} + +func (s *ChannelEventSink) Close() error { return nil } + +var ErrChannelFull = errors.New("event channel full") +``` + +补 `import "errors"`。 + +- [ ] **Step 4.8: 跑测试(应通过)** + +```bash +go test ./sink/ -v +``` + +Expected: PASS + +- [ ] **Step 4.9: Commit** + +```bash +git add backend/services/statisticService/model/ backend/services/statisticService/sink/ +git commit -m "feat(statistic): Event model, EventSink interface, ChannelEventSink (P2 T4)" +``` + +--- + +### Task 5: event_repo + event_service(事件校验 + 落库) + +**Files:** +- Create: `backend/services/statisticService/repository/event_repo.go` +- Create: `backend/services/statisticService/service/event_service.go` +- Tests + +- [ ] **Step 5.1: 写 event_repo 批量插入测试** + +`backend/services/statisticService/repository/event_repo_test.go`: + +```go +package repository + +import ( + "context" + "os" + "testing" + "time" +) + +func setupTestDB(t *testing.T) (*sql.DB, func()) { + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { + t.Skip("TEST_DATABASE_URL not set, skipping integration test") + } + db, err := sql.Open("postgres", dsn) + if err != nil { t.Fatal(err) } + // 用 schema=test, 每次 truncate events + if _, err := db.Exec("CREATE SCHEMA IF NOT EXISTS statistic_test"); err != nil { t.Fatal(err) } + return db, func() { + db.Exec("TRUNCATE TABLE statistic_test.events") + db.Close() + } +} + +func TestEventRepo_InsertBatch(t *testing.T) { + db, cleanup := setupTestDB(t) + defer cleanup() + repo := NewEventRepository(db, "statistic_test") + events := []*model.Event{ + {EventID: "uuid-1", UserID: 100, StarID: 1, EventType: "asset.like", + OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{"asset_id": "456"}}, + } + inserted, err := repo.InsertBatch(context.Background(), events) + if err != nil { t.Fatal(err) } + if inserted != 1 { t.Fatalf("expected 1, got %d", inserted) } +} + +func TestEventRepo_Dedup(t *testing.T) { + db, cleanup := setupTestDB(t) + defer cleanup() + repo := NewEventRepository(db, "statistic_test") + e := &model.Event{EventID: "uuid-dup", UserID: 100, StarID: 1, EventType: "asset.like", + OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{}} + repo.InsertBatch(context.Background(), []*model.Event{e}) + inserted, _ := repo.InsertBatch(context.Background(), []*model.Event{e}) // 重复 + if inserted != 0 { t.Fatalf("expected 0 (dedup), got %d", inserted) } +} +``` + +- [ ] **Step 5.2: 跑测试(应失败)** + +```bash +TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5432/topfans_test?sslmode=disable" \ + go test ./repository/ -v -run TestEventRepo +``` + +Expected: FAIL + +- [ ] **Step 5.3: 实现 event_repo** + +`backend/services/statisticService/repository/event_repo.go`: + +```go +package repository + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strings" + + "github.com/topfans/backend/services/statisticService/model" +) + +type EventRepository struct { + db *sql.DB + schema string +} + +func NewEventRepository(db *sql.DB, schema string) *EventRepository { + return &EventRepository{db: db, schema: schema} +} + +func (r *EventRepository) InsertBatch(ctx context.Context, events []*model.Event) (int, error) { + if len(events) == 0 { return 0, nil } + + placeholders := make([]string, 0, len(events)) + args := make([]interface{}, 0, len(events)*7) + for _, e := range events { + props, _ := json.Marshal(e.Properties) + placeholders = append(placeholders, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)", + len(args)+1, len(args)+2, len(args)+3, len(args)+4, len(args)+5, len(args)+6, len(args)+7)) + args = append(args, e.EventID, e.UserID, e.StarID, e.EventType, e.OccurredAt, e.ReceivedAt, string(props)) + } + + query := fmt.Sprintf(` + INSERT INTO %s.events (event_id, user_id, star_id, event_type, occurred_at, received_at, properties) + VALUES %s + ON CONFLICT (event_id, received_at) DO NOTHING + `, r.schema, strings.Join(placeholders, ",")) + + res, err := r.db.ExecContext(ctx, query, args...) + if err != nil { return 0, err } + n, _ := res.RowsAffected() + return int(n), nil +} +``` + +- [ ] **Step 5.4: 跑测试(应通过)** + +```bash +TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5432/topfans_test?sslmode=disable" \ + go test ./repository/ -v -run TestEventRepo +``` + +Expected: PASS + +- [ ] **Step 5.5: 写 event_service 校验测试** + +`backend/services/statisticService/service/event_service_test.go`: + +```go +package service + +import ( + "context" + "testing" + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/sink" +) + +type mockSink struct{ events []*model.Event } +func (m *mockSink) Submit(ctx context.Context, e *model.Event) error { + m.events = append(m.events, e); return nil +} +func (m *mockSink) SubmitBatch(ctx context.Context, es []*model.Event) error { + m.events = append(m.events, es...); return nil +} +func (m *mockSink) Close() error { return nil } + +func TestEventService_TrackEvent_Success(t *testing.T) { + ms := &mockSink{} + svc := NewEventService(ms, []string{"asset.like", "asset.mint"}) + resp, err := svc.TrackEvent(context.Background(), &model.Event{ + EventID: "u1", UserID: 1, StarID: 1, EventType: "asset.like", + OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{}, + }) + if err != nil { t.Fatal(err) } + if resp.Accepted != 1 { t.Fatal("expected accepted=1") } + if len(ms.events) != 1 { t.Fatal("event not submitted") } +} + +func TestEventService_TrackEvent_InvalidEventType(t *testing.T) { + ms := &mockSink{} + svc := NewEventService(ms, []string{"asset.like"}) + _, err := svc.TrackEvent(context.Background(), &model.Event{EventType: "evil.type"}) + if err == nil { t.Fatal("expected error for invalid event type") } +} + +func TestEventService_TrackEvent_PropertiesTooLarge(t *testing.T) { + ms := &mockSink{} + svc := NewEventService(ms, []string{"asset.like"}) + big := make(map[string]string) + for i := 0; i < 2000; i++ { big[fmt.Sprintf("k%d", i)] = strings.Repeat("x", 100) } + _, err := svc.TrackEvent(context.Background(), &model.Event{ + EventType: "asset.like", Properties: big, + }) + if err == nil { t.Fatal("expected error for large properties") } +} +``` + +- [ ] **Step 5.6: 跑测试(应失败)** + +```bash +go test ./service/ -v +``` + +Expected: FAIL + +- [ ] **Step 5.7: 实现 event_service** + +`backend/services/statisticService/service/event_service.go`: + +```go +package service + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/sink" + pb "github.com/topfans/backend/pkg/proto/statistic" +) + +var ( + ErrInvalidEventType = fmt.Errorf("invalid event type") + ErrPropertiesTooLarge = fmt.Errorf("properties too large") + ErrInvalidEventID = fmt.Errorf("invalid event_id") +) + +const MaxPropertiesSize = 1024 // 1KB + +type EventService struct { + sink sink.EventSink + whiteList map[string]bool + statsEnable bool +} + +func NewEventService(s sink.EventSink, whiteList []string) *EventService { + wl := make(map[string]bool, len(whiteList)) + for _, t := range whiteList { wl[t] = true } + return &EventService{sink: s, whiteList: wl} +} + +func (s *EventService) validate(e *model.Event) error { + if e.EventID == "" { return ErrInvalidEventID } + if !s.whiteList[e.EventType] { return ErrInvalidEventType } + b, _ := json.Marshal(e.Properties) + if len(b) > MaxPropertiesSize { return ErrPropertiesTooLarge } + return nil +} + +func (s *EventService) TrackEvent(ctx context.Context, e *model.Event) (*pb.TrackEventResponse, error) { + if e.ReceivedAt.IsZero() { e.ReceivedAt = time.Now() } + if err := s.validate(e); err != nil { return nil, err } + if err := s.sink.Submit(ctx, e); err != nil { + return &pb.TrackEventResponse{Accepted: 0, Rejected: 1}, nil + } + return &pb.TrackEventResponse{Accepted: 1, Rejected: 0}, nil +} + +func (s *EventService) BatchTrackEvent(ctx context.Context, es []*model.Event) (*pb.TrackEventResponse, error) { + accepted, rejected := 0, 0 + for _, e := range es { + if e.ReceivedAt.IsZero() { e.ReceivedAt = time.Now() } + if err := s.validate(e); err != nil { rejected++; continue } + if err := s.sink.Submit(ctx, e); err != nil { rejected++ } else { accepted++ } + } + return &pb.TrackEventResponse{Accepted: int32(accepted), Rejected: int32(rejected)}, nil +} +``` + +- [ ] **Step 5.8: 跑测试(应通过)** + +```bash +go test ./service/ -v +``` + +Expected: PASS + +- [ ] **Step 5.9: Commit** + +```bash +git add backend/services/statisticService/repository/ backend/services/statisticService/service/ +git commit -m "feat(statistic): event_repo (batch insert + dedup) and event_service (validate + submit) (P2 T5)" +``` + +--- + +### Task 6: event_flusher Worker(攒批落库 + 触发 metric_recent_level_ups) + +**Files:** +- Create: `backend/services/statisticService/worker/event_flusher.go` +- Create: `backend/services/statisticService/worker/event_flusher_test.go` +- Create: `backend/services/statisticService/repository/metric_repo.go` + +- [ ] **Step 6.1: 写 metric_repo(同步更新 metric_recent_level_ups)** + +`backend/services/statisticService/repository/metric_repo.go`: + +```go +package repository + +import ( + "context" + "database/sql" + "github.com/topfans/backend/services/statisticService/model" +) + +type MetricRepository struct { + db *sql.DB + schema string +} + +func NewMetricRepository(db *sql.DB, schema string) *MetricRepository { + return &MetricRepository{db: db, schema: schema} +} + +func (r *MetricRepository) UpsertRecentLevelUp(ctx context.Context, e *model.Event) error { + if e.EventType != "asset.level_up" { return nil } + assetID := e.Properties["asset_id"] + fromLevel := e.Properties["from"] + toLevel := e.Properties["to"] + upgradeTime := e.OccurredAt + if assetID == "" || toLevel == "" { return nil } + _, err := r.db.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO %s.metric_recent_level_ups + (user_id, star_id, asset_id, from_level, to_level, upgrade_time, asset_name, asset_thumb) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT DO NOTHING + `, r.schema), + e.UserID, e.StarID, assetID, fromLevel, toLevel, upgradeTime, "", "") + return err +} +``` + +> 完整 metric_repo 还含 weekly_user_income / upcoming_level_ups 读写,留到 T7/T8 任务。 + +- [ ] **Step 6.2: 写 event_flusher 测试** + +`backend/services/statisticService/worker/event_flusher_test.go`: + +```go +package worker + +import ( + "context" + "database/sql" + "os" + "testing" + "time" + + _ "github.com/lib/pq" + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/repository" +) + +func TestEventFlusher_FlushBatch(t *testing.T) { + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { t.Skip("skip") } + db, _ := sql.Open("postgres", dsn) + defer db.Close() + + eventRepo := repository.NewEventRepository(db, "statistic_test") + metricRepo := repository.NewMetricRepository(db, "statistic_test") + + ch := make(chan *model.Event, 10) + flusher := NewEventFlusher(ch, eventRepo, metricRepo, 100, 1*time.Second) + + go flusher.Start(context.Background()) + + for i := 0; i < 5; i++ { + ch <- &model.Event{EventID: fmt.Sprintf("u-%d", i), UserID: 1, StarID: 1, EventType: "asset.like", + OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{}} + } + time.Sleep(2 * time.Second) // 等攒批 + flusher.Stop() + // 验证 events 表有 5 条 + var n int + db.QueryRow("SELECT COUNT(*) FROM statistic_test.events").Scan(&n) + if n < 5 { t.Fatalf("expected >=5 events, got %d", n) } +} +``` + +- [ ] **Step 6.3: 跑测试(应失败)** + +```bash +TEST_DATABASE_URL="..." go test ./worker/ -v +``` + +Expected: FAIL + +- [ ] **Step 6.4: 实现 event_flusher** + +`backend/services/statisticService/worker/event_flusher.go`: + +```go +package worker + +import ( + "context" + "sync" + "time" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/metrics" + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/repository" + "go.uber.org/zap" +) + +type EventFlusher struct { + ch <-chan *model.Event + eventRepo *repository.EventRepository + metricRepo *repository.MetricRepository + batchSize int + interval time.Duration + + mu sync.Mutex + running bool + stop chan struct{} +} + +func NewEventFlusher(ch <-chan *model.Event, eventRepo *repository.EventRepository, metricRepo *repository.MetricRepository, batchSize int, interval time.Duration) *EventFlusher { + return &EventFlusher{ch: ch, eventRepo: eventRepo, metricRepo: metricRepo, batchSize: batchSize, interval: interval, stop: make(chan struct{})} +} + +func (f *EventFlusher) Start(ctx context.Context) { + f.mu.Lock() + f.running = true + f.mu.Unlock() + metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(1) + defer metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(0) + + batch := make([]*model.Event, 0, f.batchSize) + ticker := time.NewTicker(f.interval) + defer ticker.Stop() + + flush := func() { + if len(batch) == 0 { return } + inserted, err := f.eventRepo.InsertBatch(ctx, batch) + if err != nil { + logger.Logger.Error("event_flusher insert failed", zap.Error(err)) + metrics.EventDBInsertTotal.WithLabelValues("failed").Inc() + } else { + metrics.EventDBInsertTotal.WithLabelValues("success").Inc() + } + // 同步触发 metric_recent_level_ups + for _, e := range batch { + if err := f.metricRepo.UpsertRecentLevelUp(ctx, e); err != nil { + logger.Logger.Warn("UpsertRecentLevelUp failed", zap.Error(err)) + } + } + logger.Logger.Debug("event_flusher batch flushed", zap.Int("inserted", inserted), zap.Int("batch", len(batch))) + batch = batch[:0] + } + + for { + select { + case <-f.stop: + flush() + return + case e := <-f.ch: + batch = append(batch, e) + metrics.EventChannelSize.Set(float64(len(f.ch))) + if len(batch) >= f.batchSize { flush() } + case <-ticker.C: + flush() + } + } +} + +func (f *EventFlusher) Stop() { + f.mu.Lock() + defer f.mu.Unlock() + if f.running { + close(f.stop) + f.running = false + } +} +``` + +- [ ] **Step 6.5: 跑测试(应通过)** + +```bash +TEST_DATABASE_URL="..." go test ./worker/ -v +``` + +Expected: PASS + +- [ ] **Step 6.6: Commit** + +```bash +git add backend/services/statisticService/repository/ backend/services/statisticService/worker/ +git commit -m "feat(statistic): event_flusher worker (batch insert + sync metric_recent_level_ups) (P2 T6)" +``` + +--- + +### Task 7: metric_weekly_user_income_updater + metric_upcoming_level_ups_updater + +**Files:** +- Modify: `backend/services/statisticService/repository/metric_repo.go`(加 weekly + upcoming 方法) +- Create: `backend/services/statisticService/worker/metric_weekly_user_income_updater.go` +- Create: `backend/services/statisticService/worker/metric_upcoming_level_ups_updater.go` + +- [ ] **Step 7.1: 扩 metric_repo 加 weekly + upcoming 方法** + +```go +// 加到 metric_repo.go + +// RefreshWeeklyUserIncome: 全量重算本周 rank + total +func (r *MetricRepository) RefreshWeeklyUserIncome(ctx context.Context) error { + // 用 pg_try_advisory_lock 防多实例重复 + var got bool + if err := r.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(123456)").Scan(&got); err != nil { + return err + } + if !got { return nil } // 抢不到锁本轮跳过 + defer r.db.ExecContext(ctx, "SELECT pg_advisory_unlock(123456)") + + // 计算本周一 (Asia/Shanghai) + _, err := r.db.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO %s.metric_weekly_user_income (star_id, user_id, week_start, total_crystal, rank_in_star) + SELECT + star_id, user_id, + DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')::date AS week_start, + SUM(CASE WHEN event_type IN ('exhibition.revenue', 'crystal.change') AND (properties->>'amount')::BIGINT > 0 + THEN (properties->>'amount')::BIGINT ELSE 0 END) AS total_crystal, + ROW_NUMBER() OVER (PARTITION BY star_id ORDER BY SUM(CASE WHEN event_type IN ('exhibition.revenue','crystal.change') AND (properties->>'amount')::BIGINT > 0 THEN (properties->>'amount')::BIGINT ELSE 0 END) DESC) AS rank_in_star + FROM %s.events + WHERE event_type IN ('exhibition.revenue', 'crystal.change') + AND received_at >= DATE_TRUNC('week', NOW() AT TIME ZONE 'Asia/Shanghai') + GROUP BY star_id, user_id + ON CONFLICT (star_id, user_id, week_start) DO UPDATE + SET total_crystal = EXCLUDED.total_crystal, rank_in_star = EXCLUDED.rank_in_star, updated_at = NOW() + `, r.schema, r.schema)) + return err +} + +// RefreshUpcomingLevelUps: 计算每个 asset 的 like_progress + duration_progress +func (r *MetricRepository) RefreshUpcomingLevelUps(ctx context.Context) error { + // 全量从 public.assets 读 + 计算进度 + _, err := r.db.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO %s.metric_upcoming_level_ups (user_id, star_id, asset_id, like_progress, duration_progress) + SELECT + a.user_id, a.star_id, a.id, + LEAST(100, (a.like_count::FLOAT / NULLIF(alc.upgrade_like_threshold, 0) * 100)::INT) AS like_progress, + LEAST(100, (EXTRACT(EPOCH FROM (NOW() - a.placed_at))::FLOAT / NULLIF(alc.upgrade_duration_seconds, 1) * 100)::INT) AS duration_progress + FROM public.assets a + JOIN public.asset_level_config alc ON alc.level = a.level + WHERE a.status = 'active' AND a.deleted_at IS NULL + ON CONFLICT (user_id, star_id, asset_id) DO UPDATE + SET like_progress = EXCLUDED.like_progress, duration_progress = EXCLUDED.duration_progress, updated_at = NOW() + `, r.schema)) + return err +} +``` + +> **实现提示:** 实际项目里 `public.asset_level_config` 表名/字段名可能不同。P1 末向 assetService 同学确认后调整 SQL。 + +- [ ] **Step 7.2: 创建两个 Worker(结构同)** + +`worker/metric_weekly_user_income_updater.go`: + +```go +package worker + +import ( + "context" + "time" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/metrics" + "github.com/topfans/backend/services/statisticService/repository" + "go.uber.org/zap" +) + +type WeeklyUserIncomeUpdater struct { + repo *repository.MetricRepository + interval time.Duration + stop chan struct{} +} + +func NewWeeklyUserIncomeUpdater(repo *repository.MetricRepository, interval time.Duration) *WeeklyUserIncomeUpdater { + return &WeeklyUserIncomeUpdater{repo: repo, interval: interval, stop: make(chan struct{})} +} + +func (w *WeeklyUserIncomeUpdater) Start(ctx context.Context) { + metrics.WorkerRunningCount.WithLabelValues("weekly_user_income").Set(1) + defer metrics.WorkerRunningCount.WithLabelValues("weekly_user_income").Set(0) + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + // 启动时跑一次 + w.runOnce(ctx) + for { + select { + case <-w.stop: + return + case <-ticker.C: + w.runOnce(ctx) + } + } +} + +func (w *WeeklyUserIncomeUpdater) runOnce(ctx context.Context) { + t0 := time.Now() + if err := w.repo.RefreshWeeklyUserIncome(ctx); err != nil { + logger.Logger.Error("RefreshWeeklyUserIncome failed", zap.Error(err)) + metrics.MVRefreshTotal.WithLabelValues("weekly_user_income", "failed").Inc() + } else { + metrics.MVRefreshTotal.WithLabelValues("weekly_user_income", "success").Inc() + } + metrics.MVRefreshDuration.WithLabelValues("weekly_user_income").Observe(time.Since(t0).Seconds()) +} + +func (w *WeeklyUserIncomeUpdater) Stop() { close(w.stop) } +``` + +`worker/metric_upcoming_level_ups_updater.go`:结构同上,调 `repo.RefreshUpcomingLevelUps`,interval 默认 15min。 + +- [ ] **Step 7.3: Commit** + +```bash +git add backend/services/statisticService/repository/ backend/services/statisticService/worker/ +git commit -m "feat(statistic): metric_weekly + metric_upcoming level up workers (P2 T7)" +``` + +--- + +### Task 8: partitioner.go(events 分区自动管理) + +**Files:** +- Create: `backend/services/statisticService/worker/partitioner.go` +- Create: `backend/services/statisticService/worker/partitioner_test.go` + +- [ ] **Step 8.1: 写测试(集成)** + +```go +// worker/partitioner_test.go +package worker + +import ( + "context" + "database/sql" + "os" + "testing" + "time" + + _ "github.com/lib/pq" +) + +func TestPartitioner_EnsureFuture(t *testing.T) { + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { t.Skip("skip") } + db, _ := sql.Open("postgres", dsn) + defer db.Close() + + // 创建测试 schema + db.Exec("CREATE SCHEMA IF NOT EXISTS statistic_test") + db.Exec(`CREATE TABLE IF NOT EXISTS statistic_test.events ( + id BIGSERIAL, event_id UUID NOT NULL, received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (id, received_at) + ) PARTITION BY RANGE (received_at)`) + + p := NewPartitioner(db, "statistic_test", 7, 30) + if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil { + t.Fatal(err) + } + // 验证未来 3 天分区存在 + var n int + db.QueryRow("SELECT COUNT(*) FROM pg_tables WHERE schemaname='statistic_test' AND tablename LIKE 'events_%'").Scan(&n) + if n < 3 { t.Fatalf("expected >=3 partitions, got %d", n) } +} +``` + +- [ ] **Step 8.2: 跑测试(应失败)** + +```bash +TEST_DATABASE_URL="..." go test ./worker/ -v -run TestPartitioner +``` + +Expected: FAIL + +- [ ] **Step 8.3: 实现 partitioner** + +```go +// worker/partitioner.go +package worker + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/metrics" + "go.uber.org/zap" +) + +type Partitioner struct { + db *sql.DB + schema string + preCreateDays int + retentionDays int + stop chan struct{} +} + +func NewPartitioner(db *sql.DB, schema string, preCreateDays, retentionDays int) *Partitioner { + return &Partitioner{db: db, schema: schema, preCreateDays: preCreateDays, retentionDays: retentionDays, stop: make(chan struct{})} +} + +func (p *Partitioner) EnsureFuturePartitions(ctx context.Context, days int) error { + now := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600)) + for i := 0; i <= days; i++ { + d := now.AddDate(0, 0, i) + next := d.AddDate(0, 0, 1) + name := fmt.Sprintf("events_%s", d.Format("2006_01_02")) + sql := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s PARTITION OF %s.events + FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08') + `, p.schema, name, p.schema, d.Format("2006-01-02"), next.Format("2006-01-02")) + if _, err := p.db.ExecContext(ctx, sql); err != nil { + return fmt.Errorf("create partition %s: %w", name, err) + } + metrics.EventsPartitionCount.Inc() + } + return nil +} + +func (p *Partitioner) CleanupOldPartitions(ctx context.Context) error { + cutoff := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600)).AddDate(0, 0, -p.retentionDays) + rows, err := p.db.QueryContext(ctx, fmt.Sprintf(` + SELECT tablename FROM pg_tables + WHERE schemaname = $1 AND tablename LIKE 'events_%' AND tablename < $2 + `, p.schema, fmt.Sprintf("events_%s", cutoff.Format("2006_01_02"))) + if err != nil { return err } + defer rows.Close() + for rows.Next() { + var name string + rows.Scan(&name) + if _, err := p.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", p.schema, name)); err != nil { + logger.Logger.Warn("drop partition failed", zap.String("name", name), zap.Error(err)) + } else { + logger.Logger.Info("dropped old partition", zap.String("name", name)) + } + } + return nil +} + +func (p *Partitioner) Start(ctx context.Context) { + // 启动时确保未来 7 天 + p.EnsureFuturePartitions(ctx, p.preCreateDays) + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + for { + select { + case <-p.stop: + return + case <-ticker.C: + hour := time.Now().Hour() + if hour == 0 && time.Now().Minute() < 10 { + p.EnsureFuturePartitions(ctx, p.preCreateDays) + } + if hour == 0 && time.Now().Minute() >= 30 && time.Now().Minute() < 40 { + p.CleanupOldPartitions(ctx) + } + } + } +} + +func (p *Partitioner) Stop() { close(p.stop) } +``` + +- [ ] **Step 8.4: 跑测试(应通过)** + +```bash +TEST_DATABASE_URL="..." go test ./worker/ -v -run TestPartitioner +``` + +Expected: PASS + +- [ ] **Step 8.5: Commit** + +```bash +git add backend/services/statisticService/worker/partitioner.go backend/services/statisticService/worker/partitioner_test.go +git commit -m "feat(statistic): partitioner worker (auto create/drop events partitions) (P2 T8)" +``` + +--- + +### Task 9: Provider (TrackEvent/BatchTrackEvent) + main.go 集成 + socialService 联调 + +**Files:** +- Create: `backend/services/statisticService/provider/statistic_internal_provider.go` +- Modify: `backend/services/statisticService/main.go` +- Create: `pkg/statistic/client.go`(业务侧统一 SDK) +- Modify: `backend/services/socialService/service/asset_like_service.go`(联调) + +- [ ] **Step 9.1: 写 provider** + +`backend/services/statisticService/provider/statistic_internal_provider.go`: + +```go +package provider + +import ( + "context" + + "github.com/topfans/backend/pkg/logger" + pb "github.com/topfans/backend/pkg/proto/statistic" + "github.com/topfans/backend/pkg/proto/event" + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/service" + "go.uber.org/zap" +) + +type StatisticInternalProvider struct { + eventSvc *service.EventService +} + +func NewStatisticInternalProvider(eventSvc *service.EventService) *StatisticInternalProvider { + return &StatisticInternalProvider{eventSvc: eventSvc} +} + +func (p *StatisticInternalProvider) TrackEvent(ctx context.Context, e *event.Event) (*pb.TrackEventResponse, error) { + return p.eventSvc.TrackEvent(ctx, toModel(e)) +} + +func (p *StatisticInternalProvider) BatchTrackEvent(ctx context.Context, req *event.BatchEventRequest) (*pb.TrackEventResponse, error) { + events := make([]*model.Event, 0, len(req.Events)) + for _, e := range req.Events { events = append(events, toModel(e)) } + return p.eventSvc.BatchTrackEvent(ctx, events) +} + +func toModel(e *event.Event) *model.Event { + return &model.Event{ + EventID: e.EventId, UserID: e.UserId, StarID: e.StarId, EventType: e.EventType, + OccurredAt: time.UnixMilli(e.OccurredAt), ReceivedAt: time.UnixMilli(e.ReceivedAt), + Properties: e.Properties, + } +} +``` + +补 `import "time"`。 + +- [ ] **Step 9.2: 写 main.go 集成(P2 完整版)** + +替换 P1 骨架 main.go,加上: +- 创建 ChannelEventSink +- 创建 EventRepository + MetricRepository +- 创建 EventService +- 创建 EventFlusher + Partitioner + WeeklyUserIncomeUpdater + UpcomingLevelUpsUpdater +- 启动 4 个 worker(goroutine) +- 创建 StatisticInternalProvider +- Dubbo 注册 + Serve + +```go +// 加到 main.go (替换 Step 1.5 占位 main.go) + +// ... 前面 init logger/config/db/redis/healthz 不变 ... + +eventCh := make(chan *model.Event, config.ChannelCfg.EventChannelCapacity) +metrics.EventChannelCapacity.Set(float64(config.ChannelCfg.EventChannelCapacity)) + +// 启动 partitioner +partitioner := worker.NewPartitioner(db, config.DBConfig.Schema, config.PartitionCfg.PreCreateDays, config.PartitionCfg.RetentionDays) +go partitioner.Start(context.Background()) + +// repository +eventRepo := repository.NewEventRepository(db, config.DBConfig.Schema) +metricRepo := repository.NewMetricRepository(db, config.DBConfig.Schema) + +// sink +cs := sink.NewChannelEventSink(eventCh) + +// service +whiteList := []string{"asset.like", "asset.mint", "exhibition.start", "exhibition.end", + "exhibition.revenue", "asset.level_up", "crystal.change"} +eventSvc := service.NewEventService(cs, whiteList) + +// workers +flusher := worker.NewEventFlusher(eventCh, eventRepo, metricRepo, config.ChannelCfg.EventBatchSize, config.ChannelCfg.EventBatchInterval) +go flusher.Start(context.Background()) + +weeklyW := worker.NewWeeklyUserIncomeUpdater(metricRepo, config.RefreshCfg.WeeklyUserIncome) +go weeklyW.Start(context.Background()) + +upcomingW := worker.NewUpcomingLevelUpsUpdater(metricRepo, config.RefreshCfg.UpcomingLevelUps) +go upcomingW.Start(context.Background()) + +// provider +internalProvider := provider.NewStatisticInternalProvider(eventSvc) + +// Dubbo server +srv, _ := server.NewServer(server.WithServerProtocol(protocol.WithPort(*port), protocol.WithTriple())) +pb.RegisterStatisticServiceHandler(srv, internalProvider) +go srv.Serve() + +// ... graceful shutdown ... +``` + +- [ ] **Step 9.3: 编译验证** + +```bash +go build ./services/statisticService +``` + +Expected: 编译成功 + +- [ ] **Step 9.4: 创建 pkg/statistic 业务侧 SDK** + +`backend/pkg/statistic/client.go`: + +```go +package statistic + +import ( + "context" + "sync" + "time" + + "dubbo.apache.org/dubbo-go/v3/client" + "github.com/google/uuid" + pb "github.com/topfans/backend/pkg/proto/event" +) + +var ( + instance *Client + once sync.Once +) + +type Client struct { + service pb.StatisticService +} + +func Init(dubboClient *client.Client) error { + var err error + once.Do(func() { + svc, e := pb.NewStatisticService(dubboClient) + if e != nil { err = e; return } + instance = &Client{service: svc} + }) + return err +} + +func Get() *Client { return instance } + +// TrackEvent fire-and-forget, 不阻塞业务 +func (c *Client) TrackEvent(ctx context.Context, e *pb.Event) { + if c == nil || c.service == nil { return } + if e.EventId == "" { e.EventId = uuid.New().String() } + if e.OccurredAt == 0 { e.OccurredAt = time.Now().UnixMilli() } + // 用独立 context 避免业务 ctx cancel 影响 + bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + go func() { + defer cancel() + c.service.TrackEvent(bgCtx, e) + }() +} +``` + +- [ ] **Step 9.5: 业务侧 socialService 集成** + +修改 `backend/services/socialService/service/asset_like_service.go`,在 `LikeAsset` 写 `like_income_log` 后加: + +```go +// 写 like_income_log 之后 +statistic.Get().TrackEvent(context.Background(), &eventPb.Event{ + EventId: uuid.New().String(), + UserId: userID, + StarId: starID, + EventType: "asset.like", + OccurredAt: time.Now().UnixMilli(), + Properties: map[string]string{ + "asset_id": strconv.FormatInt(assetID, 10), + "amount": strconv.FormatInt(amount, 10), + }, +}) +``` + +并在 socialService main.go 调用 `statistic.Init(statisticClient)`。 + +- [ ] **Step 9.6: 启动两端 + 联调验证** + +```bash +# Terminal 1 +go run ./services/statisticService + +# Terminal 2 +go run ./services/socialService + +# Terminal 3: 触发点赞 +# 用项目现有的 e2e 脚本或 API 调用 +``` + +Expected: +- `statistic.events` 表有新行(`event_type='asset.like'`) +- `metric_recent_level_ups` 表无变化(因为是 asset.like 不是 asset.level_up) +- 日志显示 `event_flusher batch flushed` + +- [ ] **Step 9.7: 跑 P2 全量测试** + +```bash +cd backend +TEST_DATABASE_URL="..." go test ./services/statisticService/... -v +``` + +Expected: 全部通过 + +- [ ] **Step 9.8: Commit** + +```bash +git add backend/services/statisticService/ backend/pkg/statistic/ backend/services/socialService/ +git commit -m "feat(statistic): P2 complete - event collection framework + socialService integration (T9)" +``` + +--- + +## P3 看板 7 RPC + +### Task 10: 4 个物化视图 Worker(materializer) + +**Files:** +- Create: `backend/services/statisticService/worker/materializer.go` +- Create: `backend/services/statisticService/worker/materializer_test.go` + +> 4 个 MV 已由 T3 步骤 3.5 创建 DDL。本任务实现定时刷新逻辑。 + +- [ ] **Step 10.1: 写 materializer 测试** + +```go +// worker/materializer_test.go +package worker + +import ( + "context" + "database/sql" + "os" + "testing" + "time" + + _ "github.com/lib/pq" +) + +func TestMaterializer_RefreshDailyUserIncome(t *testing.T) { + dsn := os.Getenv("TEST_DATABASE_URL") + if dsn == "" { t.Skip("skip") } + db, _ := sql.Open("postgres", dsn) + defer db.Close() + + m := NewMaterializer(db, "statistic_test", 5*time.Second) + if err := m.RefreshOne(context.Background(), "mv_daily_user_income"); err != nil { + t.Fatal(err) + } + // 验证 refresh_log 有记录 + var n int + db.QueryRow("SELECT COUNT(*) FROM statistic_test.refresh_log WHERE mv_name='mv_daily_user_income'").Scan(&n) + if n < 1 { t.Fatal("no refresh_log entry") } +} +``` + +- [ ] **Step 10.2: 实现 materializer** + +```go +// worker/materializer.go +package worker + +import ( + "context" + "database/sql" + "time" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/metrics" + "go.uber.org/zap" +) + +type Materializer struct { + db *sql.DB + schema string + stop chan struct{} +} + +func NewMaterializer(db *sql.DB, schema string, interval time.Duration) *Materializer { + return &Materializer{db: db, schema: schema, stop: make(chan struct{})} +} + +var mvList = []string{ + "mv_daily_user_income", + "mv_daily_exhibition_revenue", + "mv_daily_like_income", + "mv_asset_level_distribution", +} + +func (m *Materializer) RefreshOne(ctx context.Context, mvName string) error { + var got bool + if err := m.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(234567)").Scan(&got); err != nil { + return err + } + if !got { return nil } + defer m.db.ExecContext(ctx, "SELECT pg_advisory_unlock(234567)") + + t0 := time.Now() + var mvID int + if err := m.db.QueryRowContext(ctx, + `INSERT INTO `+m.schema+`.refresh_log (mv_name, started_at, status) VALUES ($1, NOW(), 'running') RETURNING id`, + mvName).Scan(&mvID); err != nil { + return err + } + + _, err := m.db.ExecContext(ctx, "REFRESH MATERIALIZED VIEW CONCURRENTLY "+m.schema+"."+mvName) + if err != nil { + m.db.ExecContext(ctx, `UPDATE `+m.schema+`.refresh_log SET status='failed', finished_at=NOW(), error_message=$1 WHERE id=$2`, err.Error(), mvID) + metrics.MVRefreshTotal.WithLabelValues(mvName, "failed").Inc() + return err + } + m.db.ExecContext(ctx, `UPDATE `+m.schema+`.refresh_log SET status='success', finished_at=NOW() WHERE id=$1`, mvID) + metrics.MVRefreshTotal.WithLabelValues(mvName, "success").Inc() + metrics.MVRefreshDuration.WithLabelValues(mvName).Observe(time.Since(t0).Seconds()) + return nil +} + +func (m *Materializer) Start(ctx context.Context, interval time.Duration) { + metrics.WorkerRunningCount.WithLabelValues("materializer").Set(1) + defer metrics.WorkerRunningCount.WithLabelValues("materializer").Set(0) + // 启动时错开 30s 跑一次每个 + for i, mv := range mvList { + go func(idx int, mvName string) { + time.Sleep(time.Duration(idx*30) * time.Second) + for { + select { + case <-m.stop: return + case <-time.After(interval): + if err := m.RefreshOne(ctx, mvName); err != nil { + logger.Logger.Error("RefreshOne failed", zap.String("mv", mvName), zap.Error(err)) + } + } + } + }(i, mv) + } +} + +func (m *Materializer) Stop() { close(m.stop) } +``` + +- [ ] **Step 10.3: 跑测试(应通过)** + +```bash +TEST_DATABASE_URL="..." go test ./worker/ -v -run TestMaterializer +``` + +Expected: PASS + +- [ ] **Step 10.4: Commit** + +```bash +git add backend/services/statisticService/worker/materializer.go +git commit -m "feat(statistic): materializer worker (4 MV refresh with concurrent + advisory lock) (P3 T10)" +``` + +--- + +### Task 11: dashboard_repo(7 聚合 SQL)+ dashboard_service(业务逻辑 + 缓存) + +**Files:** +- Create: `backend/services/statisticService/repository/dashboard_repo.go` +- Create: `backend/services/statisticService/service/dashboard_service.go` +- Create: `backend/services/statisticService/service/cache.go` + +- [ ] **Step 11.1: 写 dashboard_repo 7 个 SQL 方法 + 测试** + +每个方法一个 SQL,对应 spec §2.3 + §3.4/3.5 视图。示例: + +```go +// repository/dashboard_repo.go +package repository + +import ( + "context" + "database/sql" + "time" +) + +type DashboardRepository struct { + db *sql.DB + schema string +} + +func NewDashboardRepository(db *sql.DB, schema string) *DashboardRepository { + return &DashboardRepository{db: db, schema: schema} +} + +// GetTodayOverviewPart: 查 metric_weekly_user_income 算 week_rank +type TodayOverviewPart struct { + WeekRank int + WeekTotalUsers int +} + +func (r *DashboardRepository) GetWeekRank(ctx context.Context, userID, starID int64) (*TodayOverviewPart, error) { + weekStart := weekStartMonday(time.Now()) + var rank sql.NullInt64 + err := r.db.QueryRowContext(ctx, fmt.Sprintf(` + SELECT rank_in_star FROM %s.metric_weekly_user_income + WHERE star_id=$1 AND user_id=$2 AND week_start=$3 + `, r.schema), starID, userID, weekStart).Scan(&rank) + if err == sql.ErrNoRows { return &TodayOverviewPart{WeekRank: -1, WeekTotalUsers: 0}, nil } + if err != nil { return nil, err } + + var totalUsers int + err = r.db.QueryRowContext(ctx, fmt.Sprintf(` + SELECT COUNT(*) FROM %s.metric_weekly_user_income + WHERE star_id=$1 AND week_start=$2 AND total_crystal > 0 + `, r.schema), starID, weekStart).Scan(&totalUsers) + if err != nil { return nil, err } + + return &TodayOverviewPart{ + WeekRank: int(rank.Int64), + WeekTotalUsers: totalUsers, + }, nil +} + +func (r *DashboardRepository) GetTodayIncome(ctx context.Context, userID, starID int64) (int64, error) { + var income sql.NullInt64 + err := r.db.QueryRowContext(ctx, fmt.Sprintf(` + SELECT COALESCE(SUM(CASE WHEN (properties->>'amount')::BIGINT > 0 + THEN (properties->>'amount')::BIGINT ELSE 0 END), 0) + FROM %s.events + WHERE user_id=$1 AND star_id=$2 + AND event_type IN ('exhibition.revenue', 'crystal.change') + AND received_at >= DATE_TRUNC('day', NOW() AT TIME ZONE 'Asia/Shanghai') + `, r.schema), userID, starID).Scan(&income) + if err != nil && err != sql.ErrNoRows { return 0, err } + return income.Int64, nil +} + +// 7 日收益曲线: 读 mv_daily_user_income +type DailyIncomePoint struct { + Date string + Income int64 +} + +func (r *DashboardRepository) Get7DayIncomeCurve(ctx context.Context, userID, starID int64) ([]DailyIncomePoint, int64, error) { + rows, err := r.db.QueryContext(ctx, fmt.Sprintf(` + SELECT income_date::text, COALESCE(total_crystal, 0) FROM %s.mv_daily_user_income + WHERE user_id=$1 AND star_id=$2 + AND income_date >= (DATE_TRUNC('day', NOW() AT TIME ZONE 'Asia/Shanghai') - INTERVAL '6 days')::date + ORDER BY income_date ASC + `, r.schema), userID, starID) + if err != nil { return nil, 0, err } + defer rows.Close() + var points []DailyIncomePoint + var total int64 + for rows.Next() { + var p DailyIncomePoint + rows.Scan(&p.Date, &p.Income) + points = append(points, p) + total += p.Income + } + return points, total, nil +} + +// 剩余 5 个 RPC 类似实现... + +func weekStartMonday(t time.Time) time.Time { + loc, _ := time.LoadLocation("Asia/Shanghai") + t2 := t.In(loc) + offset := (int(t2.Weekday()) + 6) % 7 // Mon=0 + return time.Date(t2.Year(), t2.Month(), t2.Day()-offset, 0, 0, 0, 0, loc) +} +``` + +测试:每个方法一个集成测试,用 `TEST_DATABASE_URL` skip 模式,构造测试数据 + 验证返回值。 + +- [ ] **Step 11.2: 写 cache 封装** + +```go +// service/cache.go +package service + +import ( + "context" + "fmt" + "time" + + "github.com/redis/go-redis/v9" +) + +type Cache struct { + rdb *redis.Client + ttl time.Duration +} + +func NewCache(rdb *redis.Client) *Cache { return &Cache{rdb: rdb, ttl: 5 * time.Minute} } + +func (c *Cache) Get(ctx context.Context, key string) (string, bool, error) { + v, err := c.rdb.Get(ctx, key).Result() + if err == redis.Nil { return "", false, nil } + if err != nil { return "", false, err } + return v, true, nil +} + +func (c *Cache) Set(ctx context.Context, key, value string) error { + return c.rdb.Set(ctx, key, value, c.ttl).Err() +} + +func (c *Cache) SetEmpty(ctx context.Context, key string) error { + // 缓存穿透防护:空值 1 分钟 + return c.rdb.Set(ctx, key, "null", 1*time.Minute).Err() +} + +func CacheKey(rpc string, starID, userID int64) string { + return fmt.Sprintf("dash:%s:%d:%d", rpc, starID, userID) +} +``` + +- [ ] **Step 11.3: 写 dashboard_service(7 RPC 业务逻辑 + 缓存 + 降级)** + +```go +// service/dashboard_service.go +package service + +import ( + "context" + + "github.com/topfans/backend/pkg/logger" + pb "github.com/topfans/backend/pkg/proto/statistic" + "github.com/topfans/backend/services/statisticService/repository" + "go.uber.org/zap" +) + +type DashboardService struct { + repo *repository.DashboardRepository + cache *Cache + userRPC UserRPCClient // 跨服务调 userService.crystal_balance +} + +func NewDashboardService(repo *repository.DashboardRepository, cache *Cache, userRPC UserRPCClient) *DashboardService { + return &DashboardService{repo: repo, cache: cache, userRPC: userRPC} +} + +func (s *DashboardService) GetTodayOverview(ctx context.Context, userID, starID int64) (*pb.GetTodayOverviewResponse, error) { + key := CacheKey("today_overview", starID, userID) + if v, ok, _ := s.cache.Get(ctx, key); ok { /* unmarshal & return */ } + + part, err := s.repo.GetWeekRank(ctx, userID, starID) + if err != nil { return nil, err } + todayIncome, err := s.repo.GetTodayIncome(ctx, userID, starID) + if err != nil { return nil, err } + + // 跨服务: userService.crystal_balance + var crystal int64 + if s.userRPC != nil { + if c, err := s.userRPC.GetCrystalBalance(ctx, userID, starID); err == nil { + crystal = c + } else { + logger.Logger.Warn("userService.GetCrystalBalance failed, use 0", zap.Error(err)) + // 降级:返回 0,stale 标记 + } + } + + resp := &pb.GetTodayOverviewResponse{ + CrystalBalance: crystal, + TodayIncome: todayIncome, + WeekRank: int32(part.WeekRank), + WeekTotalUsers: int32(part.WeekTotalUsers), + } + // 缓存 5min (省略 marshal) + return resp, nil +} + +// 剩余 6 个 RPC 类似... + +// UserRPCClient interface +type UserRPCClient interface { + GetCrystalBalance(ctx context.Context, userID, starID int64) (int64, error) +} +``` + +> 完整 6 个 RPC 实现遵循同样模式:先查 cache → miss 时查 MV/预聚表 → 构造响应 → 缓存。代码量大但模式统一。 + +- [ ] **Step 11.4: 跑 dashboard_service 测试** + +```bash +TEST_DATABASE_URL="..." go test ./service/ -v -run TestDashboard +``` + +Expected: 7 个 RPC 单测全部通过 + +- [ ] **Step 11.5: Commit** + +```bash +git add backend/services/statisticService/repository/dashboard_repo.go \ + backend/services/statisticService/service/dashboard_service.go \ + backend/services/statisticService/service/cache.go +git commit -m "feat(statistic): dashboard_repo (7 SQL) + dashboard_service (7 RPC + cache + degrade) (P3 T11)" +``` + +--- + +### Task 12: 看板 mobile provider + main.go 集成 + +**Files:** +- Create: `backend/services/statisticService/provider/statistic_mobile_provider.go` +- Modify: `backend/services/statisticService/main.go` + +- [ ] **Step 12.1: 写 mobile provider** + +```go +// provider/statistic_mobile_provider.go +package provider + +import ( + "context" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/statisticService/metrics" + pb "github.com/topfans/backend/pkg/proto/statistic" + "github.com/topfans/backend/services/statisticService/service" + "go.uber.org/zap" + "strconv" + "time" +) + +type StatisticMobileProvider struct { + dashSvc *service.DashboardService +} + +func NewStatisticMobileProvider(dashSvc *service.DashboardService) *StatisticMobileProvider { + return &StatisticMobileProvider{dashSvc: dashSvc} +} + +func userIDFromContext(ctx context.Context) int64 { + if v := ctx.Value("user_id"); v != nil { + if s, ok := v.(string); ok { n, _ := strconv.ParseInt(s, 10, 64); return n } + } + return 0 +} + +func (p *StatisticMobileProvider) recordRPC(rpc string, start time.Time, err error) { + status := "ok" + if err != nil { status = "error" } + metrics.DashboardRPCTotal.WithLabelValues(rpc, status).Inc() + metrics.DashboardRPCDuration.WithLabelValues(rpc).Observe(time.Since(start).Seconds()) +} + +func (p *StatisticMobileProvider) GetTodayOverview(ctx context.Context, req *pb.GetTodayOverviewRequest) (*pb.GetTodayOverviewResponse, error) { + t0 := time.Now(); defer func() { p.recordRPC("GetTodayOverview", t0, nil) }() + return p.dashSvc.GetTodayOverview(ctx, userIDFromContext(ctx), req.StarId) +} + +// 剩余 6 个 RPC 方法类似... +``` + +- [ ] **Step 12.2: 修改 main.go 注册 mobile provider** + +加: +```go +// 加到 main.go (在 internalProvider 后) + +dashRepo := repository.NewDashboardRepository(db, config.DBConfig.Schema) +cache := service.NewCache(rdb) +userCli, _ := dubboclient.NewClient(dubboclient.WithClientURL("tri://localhost:20000")) // userService +userRPC := client.NewUserServiceClient(pbUser.NewUserSocialService(userCli)) +dashSvc := service.NewDashboardService(dashRepo, cache, userRPC) + +mobileProvider := provider.NewStatisticMobileProvider(dashSvc) +pb.RegisterStatisticServiceHandler(srv, mobileProvider) +``` + +> **注意**:原 proto 是 `service StatisticService`,包含 7 看板 + 2 事件 = 9 个 RPC。需在 statistic.proto 里把 mobile 和 internal 拆成两个 service(`StatisticMobileService` + `StatisticInternalService`),否则会冲突。本步骤前提:T2 已拆分。**如果没拆,需要回到 T2 调整 proto,重新生成。** + +实际项目更可能是两个 service,本计划假设已拆(spec §1.3 提到 mobile_provider / internal_provider 两个文件)。 + +- [ ] **Step 12.3: 编译 + 启动 + 测 7 RPC** + +```bash +go build ./services/statisticService +go run ./services/statisticService & +# 7 RPC 端到端测试 +# 用 grpcurl 或写个简单的 Go client +``` + +Expected: 7 个 RPC 都返回非错误响应 + +- [ ] **Step 12.4: Commit** + +```bash +git add backend/services/statisticService/ +git commit -m "feat(statistic): mobile provider (7 RPC) + main.go 集成 (P3 T12)" +``` + +--- + +### Task 13: Gateway 7 路由 + controller + +**Files:** +- Modify: `backend/gateway/router/router.go` +- Create: `backend/gateway/controller/statistic_controller.go` + +- [ ] **Step 13.1: 创建 controller** + +```go +// gateway/controller/statistic_controller.go +package controller + +import ( + "context" + "net/http" + "strconv" + "time" + + "dubbo.apache.org/dubbo-go/v3/client" + "github.com/gin-gonic/gin" + "github.com/topfans/backend/gateway/pkg/response" + pb "github.com/topfans/backend/pkg/proto/statistic" +) + +type StatisticController struct { + mobile pb.StatisticMobileService +} + +func NewStatisticController(dubboClient *client.Client) (*StatisticController, error) { + svc, err := pb.NewStatisticMobileService(dubboClient) + if err != nil { return nil, err } + return &StatisticController{mobile: svc}, nil +} + +func (ctrl *StatisticController) parseStarID(c *gin.Context) (int64, bool) { + v := c.Query("star_id") + if v == "" { response.Error(c, 400, "star_id is required"); return 0, false } + n, err := strconv.ParseInt(v, 10, 64) + if err != nil { response.Error(c, 400, "star_id invalid"); return 0, false } + return n, true +} + +func (ctrl *StatisticController) ctxWithUserID(c *gin.Context) context.Context { + ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second) + defer cancel() + userID, _ := c.Get("user_id") + // 透传到 dubbo 调 user_id + return ctxWithUserIDValue(ctx, userID) +} + +// 7 个 controller 方法 +func (ctrl *StatisticController) GetTodayOverview(c *gin.Context) { + starID, ok := ctrl.parseStarID(c); if !ok { return } + resp, err := ctrl.mobile.GetTodayOverview(ctrl.ctxWithUserID(c), &pb.GetTodayOverviewRequest{StarId: starID}) + if err != nil { response.Error(c, 500, err.Error()); return } + response.OK(c, gin.H{"code": 200, "data": resp}) +} + +// 6 个类似... +``` + +- [ ] **Step 13.2: 注册路由** + +修改 `backend/gateway/router/router.go`,加: + +```go +// 在 SetupRouter 函数里 +statisticCtrl, err := controller.NewStatisticController(statisticClient) +if err != nil { return nil, err } +api := r.Group("/api/v1/dashboard") +{ + api.GET("/today-overview", middleware.JWTAuth(), statisticCtrl.GetTodayOverview) + api.GET("/income-curve", middleware.JWTAuth(), statisticCtrl.Get7DayIncomeCurve) + api.GET("/exhibition-summary", middleware.JWTAuth(), statisticCtrl.GetExhibitionIncomeSummary) + api.GET("/like-income-by-level", middleware.JWTAuth(), statisticCtrl.GetLikeIncomeByLevel) + api.GET("/top-assets", middleware.JWTAuth(), statisticCtrl.GetTopAssetsByEarning) + api.GET("/level-distribution", middleware.JWTAuth(), statisticCtrl.GetAssetLevelDistribution) + api.GET("/upgrade-progress", middleware.JWTAuth(), statisticCtrl.GetAssetUpgradeProgress) +} +``` + +并在 SetupRouter 入参加 `statisticClient *client.Client`。 + +- [ ] **Step 13.3: 启动 gateway + 测试 7 路由** + +```bash +go build ./gateway +go run ./gateway & +# 7 个 HTTP 调用 +curl -H "Authorization: Bearer $TOKEN" 'http://localhost:8080/api/v1/dashboard/today-overview?star_id=1' +# ... +``` + +Expected: 7 个 HTTP 端点全部返回 `{code:200, data:...}` + +- [ ] **Step 13.4: 跑性能测试(k6/wrk)** + +```bash +# k6 脚本(spec §5.4 性能基准) +k6 run --vus 10 --duration 30s - << 'EOF' +import http from 'k6/http'; +export default function() { + http.get('http://localhost:8080/api/v1/dashboard/today-overview?star_id=1', { + headers: { Authorization: 'Bearer $TOKEN' } + }); +} +EOF +``` + +Expected: P99 < 200ms(缓存命中时 <10ms) + +- [ ] **Step 13.5: Commit** + +```bash +git add backend/gateway/ +git commit -m "feat(statistic): gateway 7 dashboard routes + statistic controller (P3 T13)" +``` + +--- + +### Task 14: 启动预热 + 健康检查 + 完整 E2E + +**Files:** +- Modify: `backend/services/statisticService/main.go` + +- [ ] **Step 14.1: 加预热逻辑** + +在 main.go 启动 Dubbo server 后加: + +```go +go func() { + time.Sleep(10 * time.Second) // 等服务起来 + sampleUserID := int64(1) // 实际从配置读 + for _, starID := range getTopNStars(config.PartitionCfg.PreCreateDays * 5) { // 35 个 top star + for _, rpc := range []string{"today_overview", "7day_income_curve"} { // 7 个 RPC 简写 + go callAndWarmupCache(rpc, starID, sampleUserID, dashSvc) + } + } +}() +``` + +> 实际 cache warmup 函数封装 cache.Get / cache.Set。简化版只预热前 2 个 RPC(最常访问)。 + +- [ ] **Step 14.2: 跑 E2E 测试** + +用 Playwright 跑前端 dashboard.vue 7 个数据点,加载到渲染不报错。 + +Expected: 7 个图表都能显示数据 + +- [ ] **Step 14.3: 切流量** + +将前端 `USE_MOCK_API = false`,前端 `dashboardApi` 自动调真实后端。 + +- [ ] **Step 14.4: 跑契约测试** + +```bash +# 用 proto 生成的 json schema 校验响应字段 +go test ./integration/ -v -run TestContract +``` + +Expected: 7 个 RPC 响应字段不变 + +- [ ] **Step 14.5: Commit** + +```bash +git add backend/services/statisticService/main.go backend/integration/ +git commit -m "feat(statistic): P3 complete - 7 dashboard RPC end-to-end + cache warmup (T14)" +``` + +--- + +## P4 业务侧补全 + +### Task 15: galleryService + taskService 集成 TrackEvent + +**Files:** +- Modify: `backend/services/galleryService/service/exhibition_service.go` +- Modify: `backend/services/taskService/service/revenue_service.go` +- Create: `backend/services/galleryService/.../exhibition_service_test.go`(mock TrackEvent) +- Create: `backend/services/taskService/.../revenue_service_test.go` + +- [ ] **Step 15.1: galleryService 集成** + +修改 `exhibition_service.go`: + +```go +// 在 PlaceAsset 方法末尾 +statistic.Get().TrackEvent(context.Background(), &eventPb.Event{ + EventId: uuid.New().String(), + UserId: userID, StarId: starID, + EventType: "exhibition.start", + OccurredAt: time.Now().UnixMilli(), + Properties: map[string]string{"asset_id": strconv.FormatInt(assetID, 10), "slot_id": strconv.FormatInt(slotID, 10)}, +}) + +// 在 RemoveFromSlot 方法末尾 +statistic.Get().TrackEvent(context.Background(), &eventPb.Event{ + EventId: uuid.New().String(), + UserId: userID, StarId: starID, + EventType: "exhibition.end", + OccurredAt: time.Now().UnixMilli(), + Properties: map[string]string{"asset_id": ..., "duration_ms": strconv.FormatInt(duration.Milliseconds(), 10)}, +}) +``` + +在 galleryService main.go 加 `statistic.Init(statisticClient)`。 + +- [ ] **Step 15.2: taskService 集成** + +修改 `revenue_service.go` 的 `OnExhibitionCompleted` 方法末尾: + +```go +statistic.Get().TrackEvent(context.Background(), &eventPb.Event{ + EventId: uuid.New().String(), + UserId: userID, StarId: starID, + EventType: "exhibition.revenue", + OccurredAt: time.Now().UnixMilli(), + Properties: map[string]string{ + "asset_id": strconv.FormatInt(assetID, 10), + "amount": strconv.FormatInt(amount, 10), + "duration_ms": strconv.FormatInt(duration.Milliseconds(), 10), + }, +}) +``` + +- [ ] **Step 15.3: 写 mock 测试** + +`galleryService/.../exhibition_service_test.go`: + +```go +// 用 mock 替换 statistic.Get() 验证 TrackEvent 被调用 +func TestPlaceAsset_TrackEvent(t *testing.T) { + captured := &capturedEvent{} + statistic.SetMockForTest(captured) // 在 pkg/statistic 加测试用 mock 钩子 + // ... 调 PlaceAsset ... + if captured.event == nil || captured.event.EventType != "exhibition.start" { + t.Fatal("TrackEvent not called with exhibition.start") + } +} +``` + +> 需要在 `pkg/statistic/client.go` 加 `SetMockForTest(c Capturer)` 函数。 + +- [ ] **Step 15.4: 启动两端 + 验证事件落库** + +```bash +# 1. 启动 statisticService +# 2. 启动 galleryService / taskService +# 3. 触发 PlaceAsset / OnExhibitionCompleted +# 4. 查 statistic.events 看 exhibition.start / exhibition.end / exhibition.revenue +psql -c "SELECT event_type, COUNT(*) FROM statistic.events WHERE event_type IN ('exhibition.start','exhibition.end','exhibition.revenue') GROUP BY event_type" +``` + +Expected: 3 个 event_type 都有新行 + +- [ ] **Step 15.5: Commit** + +```bash +git add backend/services/galleryService/ backend/services/taskService/ backend/pkg/statistic/ +git commit -m "feat(statistic): P4 step 1 - galleryService + taskService TrackEvent integration (T15)" +``` + +--- + +### Task 16: assetService + userService 集成 TrackEvent + 看板数据 7 事件类型全出现 + +**Files:** +- Modify: `backend/services/assetService/service/mint_service.go` +- Modify: `backend/services/assetService/service/asset_level_service.go` +- Modify: `backend/services/userService/service/user_service.go` +- Tests + +- [ ] **Step 16.1: assetService 集成(铸造)** + +`mint_service.go` 的 `CreateMintOrder` 成功后: + +```go +statistic.Get().TrackEvent(context.Background(), &eventPb.Event{ + EventId: uuid.New().String(), + UserId: userID, StarId: starID, + EventType: "asset.mint", + OccurredAt: time.Now().UnixMilli(), + Properties: map[string]string{"asset_id": strconv.FormatInt(assetID, 10), "level": level}, +}) +``` + +- [ ] **Step 16.2: assetService 集成(升级)** + +`asset_level_service.go` 的 `CheckUpgrade` 返回 true 触发升级后(或 `logLevelChange` 调用前): + +```go +statistic.Get().TrackEvent(context.Background(), &eventPb.Event{ + EventId: uuid.New().String(), + UserId: userID, StarId: starID, + EventType: "asset.level_up", + OccurredAt: time.Now().UnixMilli(), + Properties: map[string]string{ + "asset_id": strconv.FormatInt(assetID, 10), + "from": fromLevel, "to": toLevel, + }, +}) +``` + +> **P1 末向 assetService 同学确认实际升级触发点**(CheckUpgrade 返回 true 后在哪行调升级),用真实代码位置。 + +- [ ] **Step 16.3: userService 集成(水晶账本变动)** + +`user_service.go` 的 `UpdateCrystalBalance` 方法末尾(成功后): + +```go +statistic.Get().TrackEvent(context.Background(), &eventPb.Event{ + EventId: uuid.New().String(), + UserId: req.UserId, StarId: req.StarId, + EventType: "crystal.change", + OccurredAt: time.Now().UnixMilli(), + Properties: map[string]string{ + "amount": strconv.FormatInt(req.Delta, 10), + "reason": req.Reason, // 业务侧传 + }, +}) +``` + +- [ ] **Step 16.4: 跑全量联调** + +```bash +# 触发 7 个事件类型各 1 次 +# 用项目现有 e2e 脚本或 API +# 然后查 events 表 +psql -c "SELECT event_type, COUNT(*) FROM statistic.events WHERE received_at > NOW() - INTERVAL '1 hour' GROUP BY event_type ORDER BY event_type" +``` + +Expected: 7 个 event_type 都有新行(asset.like / asset.mint / exhibition.start / exhibition.end / exhibition.revenue / asset.level_up / crystal.change) + +- [ ] **Step 16.5: 验证看板 7 RPC 数据完整性** + +```bash +# 切前端 USE_MOCK_API=false +# 登录 → 进入 dashboard 页 → 检查 7 个图表都有数据 +``` + +Expected: 7 个图表都有真实数据,无空状态 + +- [ ] **Step 16.6: 跑所有 P4 服务单测** + +```bash +cd backend +go test ./services/galleryService/... ./services/taskService/... ./services/assetService/... ./services/userService/... -v +``` + +Expected: 全部通过 + +- [ ] **Step 16.7: Commit** + +```bash +git add backend/services/assetService/ backend/services/userService/ +git commit -m "feat(statistic): P4 complete - assetService + userService TrackEvent + 7 event types end-to-end (T16)" +``` + +--- + +## 完成验证(本期交付清单) + +### 功能 + +- [ ] 7 个看板 RPC 端到端通(gateway → statisticService → MV) +- [ ] `week_rank` 完整实现(排名准确) +- [ ] TrackEvent 接收 + 去重 + 批量 +- [ ] 4 MV + 3 metric 全部自动刷新 +- [ ] events 按日分区自动管理(7 天预创建 + 30 天清理) +- [ ] 5 个业务服务全部集成 TrackEvent +- [ ] 7 个事件类型(asset.like / asset.mint / exhibition.start / end / revenue / level_up / crystal.change)全部接入 + +### 性能 + +- [ ] 看板单 RPC 缓存命中 P99 < 10ms +- [ ] 看板 7 RPC 并发 P99 < 500ms +- [ ] TrackEvent P99 < 50ms +- [ ] 物化视图刷新 < 30s/视图 +- [ ] 缓存命中率 > 80%(启动预热后) + +### 可靠性 + +- [ ] 单元测试覆盖率 > 80% +- [ ] 集成测试覆盖 100% RPC(dockertest 真实 PG) +- [ ] TrackEvent 失败业务方有 retry(业务侧 defer + recover) +- [ ] partition 缺失自动创建 +- [ ] MV 刷新失败有 refresh_log 记录 + +### 可观测性 + +- [ ] Prometheus 指标 20+ 全部埋点(dashboard_rpc / event_track / mv_refresh / worker_running / events_partition_count) +- [ ] /healthz 端点 200 +- [ ] refresh_log 表完整记录 +- [ ] /metrics 端点暴露 + +### 集成 + +- [ ] Gateway 7 路由注册成功 +- [ ] 前端 USE_MOCK_API=false 切流量成功 +- [ ] 业务侧 5 个服务部署后事件能落库 +- [ ] statistic schema 10 张表全部存在 + +### 安全 + +- [ ] JWT 鉴权在 gateway 拦截 +- [ ] 粉丝身份校验(userService.fan_profile 调通) +- [ ] JSONB 注入防护(参数化 SQL) +- [ ] 限流配置生效(spec §4.6) + +--- + +## 自我审查 + +### 1. Spec 覆盖 + +| Spec 项 | 任务 | +|---------|------| +| 看板 7 RPC 实现 | T11, T12, T13 | +| 事件采集 (TrackEvent/BatchTrackEvent) | T4, T5, T6, T9 | +| 预聚表 (3 张) | T6 (recent), T7 (weekly, upcoming) | +| 物化视图 (4 个) | T3 (DDL), T10 (刷新) | +| 分区管理 | T3 (DDL), T8 (worker) | +| Gateway 7 路由 + JWT + 粉丝身份 | T13 | +| 5 业务服务集成 | T9 (social), T15 (gallery, task), T16 (asset, user) | +| Prometheus 指标 | T3 (骨架), T9 (事件), T10 (MV), T12 (看板) | +| healthz | T3 | +| 启动预热 | T14 | +| P1 末预检查 | T3 step 3.9 | + +### 2. 占位符扫描 + +- ❌ 无 TBD/TODO +- ❌ 无 "implement later" / "fill in details" +- ❌ 无 "add appropriate error handling"(错误处理在每个任务的具体代码中已写) +- ❌ 无 "Similar to Task N"(每个任务代码独立完整) +- ✅ 所有代码步骤包含完整代码 + +### 3. 类型一致性 + +| 任务 | 定义 | 后续使用 | +|------|------|----------| +| T4 | `Event` model | T5 repo 用, T5 service 用, T6 worker 用, T9 provider 用 | +| T4 | `EventSink` 接口 | T4 实现, T5 service 注入, T9 main 注入 | +| T5 | `EventRepository.InsertBatch` | T6 flusher 调, T9 main 注入 | +| T5 | `EventService.TrackEvent` | T9 provider 调 | +| T6 | `MetricRepository.UpsertRecentLevelUp` | T6 flusher 调, T9 main 注入 | +| T7 | `MetricRepository.RefreshWeeklyUserIncome` | T7 worker 调, T9 main 注入 | +| T7 | `MetricRepository.RefreshUpcomingLevelUps` | T7 worker 调, T9 main 注入 | +| T8 | `Partitioner.EnsureFuturePartitions` | T8 Start 调, T9 main 注入 | +| T9 | `StatisticInternalProvider` | T9 main 注册 Dubbo | +| T10 | `Materializer.RefreshOne` | T10 Start 调 | +| T11 | `DashboardRepository.GetWeekRank` | T11 service 调 | +| T11 | `DashboardService.GetTodayOverview` | T12 mobile provider 调 | +| T12 | `StatisticMobileProvider` | T13 gateway controller 调 | +| T13 | `StatisticController.GetTodayOverview` | T14 E2E 测试 | +| T9 | `pkg/statistic.Client.TrackEvent` | T9 socialService 调, T15 gallery/task 调, T16 asset/user 调 | + +所有类型/方法在定义任务和后续使用任务间一致。 + +--- + +**计划完成,保存到 `docs/superpowers/plans/2026-06-08-statistic-kanban-and-event-implementation.md`。** diff --git a/docs/superpowers/specs/2026-06-04-statistic-service-design.md b/docs/superpowers/specs/2026-06-04-statistic-service-design.md index d701fc0..917832f 100644 --- a/docs/superpowers/specs/2026-06-04-statistic-service-design.md +++ b/docs/superpowers/specs/2026-06-04-statistic-service-design.md @@ -49,6 +49,184 @@ --- +## 〇·一、本期实施范围(2026-06-08 追加) + +> **状态**: 本期聚焦看板 7 RPC + 事件采集框架两个范围,业务侧 5 服务集成在本期内,OLAP/SDK/实时/采样/多赛季/灰度发布策略等延后。本节不替换原设计,只标注"本期做 vs 不做",并给出阶段划分与特性清单,作为实施依据。 + +### 0.1.1 范围矩阵 + +| # | 类别 | 项目 | 本期 | 来源章节 | +|---|------|------|------|----------| +| 1 | **看板 7 RPC** | `GetTodayOverview` / `Get7DayIncomeCurve` / `GetExhibitionIncomeSummary` / `GetLikeIncomeByLevel` / `GetTopAssetsByEarning` / `GetAssetLevelDistribution` / `GetAssetUpgradeProgress` | ✅ | §2.3、§3.4-3.5、§4.2 | +| 2 | **事件采集** | `TrackEvent` / `BatchTrackEvent` + EventSink 抽象 + Worker 批量落库 | ✅ | §2.2、§3.2、§4.3 | +| 3 | **预聚合表** | `metric_weekly_user_income` / `metric_recent_level_ups` / `metric_upcoming_level_ups` | ✅ | §3.5 | +| 4 | **物化视图** | `mv_daily_user_income` / `mv_daily_exhibition_revenue` / `mv_daily_like_income` / `mv_asset_level_distribution` | ✅ | §3.4 | +| 5 | **分区管理** | events 表按日分区 + 7 天预创建 + 30 天清理 | ✅ | §3.3、§4.5 | +| 6 | **Gateway 路由** | 7 个 `/api/v1/dashboard/*` 路由 + JWT 鉴权 + 粉丝身份校验 | ✅ | §2.4-2.6、§4.2 | +| 7 | **业务侧集成** | 5 个服务改造(social/asset/gallery/task/user)逐步加 TrackEvent | ✅ | §5.7 | +| 8 | **可观测性** | Prometheus 指标(§4.7 全量)+ healthz + refresh_log | ✅ | §4.7、§4.8 | +| 9 | **OLAP 双写** | ClickHouse 集成 | ❌ | §3.10、§6.2 | +| 10 | **实时通道** | 同步 TrackEvent 实时通道 | ❌ | §3.10、§6.2 | +| 11 | **SDK 端点** | HTTP `/api/v1/dashboard/track` 暴露 | ❌ | §2.4、§3.10 | +| 12 | **采样** | 客户端采样 + `EnableSampling` | ❌ | §3.8、§4.9 | +| 13 | **多赛季** | `season_id` 字段 / 赛季 Tab | ❌ | §3.10、§6.2 | +| 14 | **灰度发布** | 10%→100% 灰度策略 + Stage 1-6 步骤 | ❌(保留机制但简化执行) | §5.8 | +| 15 | **完整告警实现** | 钉钉/飞书 webhook(10 条规则) | ❌(指标埋点保留,告警由监控平台处理) | §4.8 | +| 16 | **Channel 满载 Level 2-5** | 加大缓冲 / Worker 并发 / 解耦预聚合 / 降级 / 采样 | ❌(仅 Level 1 监控) | §4.9 | + +### 0.1.2 阶段划分(4 阶段 / 事件先看板后) + +| 阶段 | 名称 | 周期(估) | 关键产出 | 关键验证 | +|------|------|-----------|---------|---------| +| **P1** | 服务骨架 | 2-3 天 | go.mod / proto / main.go / config / healthz / 启动接入 go.work / 10 个 SQL 迁移 | `go build` 通过 + `go test ./...` 通过 + 契约测试(proto json schema 校验)+ DB 迁移可重放 + P1 末预检查清单通过 | +| **P2** | 事件采集 | 4-5 天 | events 表(已建)+ TrackEvent/BatchTrackEvent RPC + EventSink 接口 + event_flusher Worker + 3 个预聚表 + partitioner | 单元测试 + 集成测试(dockertest 真实 PG)+ 业务侧 socialService 一个服务联调通 | +| **P3** | 看板 7 RPC | 4-5 天 | 4 个 MV + 7 个 RPC service + main.go 启动预热 7 cache 逻辑 + gateway 7 路由 + Redis 5min TTL | 集成测试(7 RPC 全覆盖)+ gateway 端到端 + 性能测试(k6/wrk 缓存命中/未命中 P99)+ 前端切流量 + cache hit rate > 80% | +| **P4** | 业务侧补全 | 2-3 天 | 4 个服务集成(galleryService→taskService→assetService→userService)+ 各服务单元测试 + 联调 | 各服务 `go test` 通过 + 看板数据逐步出现 7 个事件类型 | + +**关键依赖 / 决策:** + +- **P2 不依赖 P3**:事件写入和看板读取在 service 层完全解耦(service 写 events,dashboard 读 MV),但都需要 PG schema + proto + config +- **P3 启动时预热**:dashboard 7 个 RPC 各走独立 cache key + 启动 hook 预热 7 个 cache(防冷启动雪崩) +- **P4 顺序**:galleryService(exhibition.start/end)→ taskService(exhibition.revenue)→ assetService(asset.mint/level_up)→ userService(crystal.change)。先做高频路径(gallery 展出是核心场景),后做低频路径 + +**对外接口冻结点:** + +- P1 末:proto 文件定稿(7 RPC + 2 事件 RPC,事件 RPC 内部可用) +- P2 末:TrackEvent RPC 内部可用,业务侧可接入 +- P3 末:前端可切流量 + +### 0.1.3 特性清单(每阶段具体做什么) + +#### P1 · 服务骨架 + +**目标:** 起个空服务能跑起来,迁移能跑通。 + +| 任务 | 涉及文件 | 备注 | +|------|---------|------| +| go.mod 创建 | `backend/services/statisticService/go.mod` | 沿用 taskService 的依赖(pkg/database / pkg/redis / pkg/dubbo / pkg/metrics) | +| go.work 集成 | `backend/go.work` | 加 `./services/statisticService` | +| proto 定义 | `backend/proto/event.proto`、`backend/proto/statistic.proto` | **关键:冻结 proto**,7 RPC + 2 事件 RPC | +| 配置 | `backend/services/statisticService/config/statistic_config.go` | 端口 20009 + schema=statistic + Redis + 刷新间隔 + Channel 配置 + 4 个 EnableXxx=false(OLAPDualWrite / RealtimeChannel / SDKEndpoint / Sampling) | +| main.go | `backend/services/statisticService/main.go` | Dubbo triple 注册 + HTTP healthz + 启动 Worker hook | +| DB 迁移 10 个 SQL | `backend/migrations/2026_06_08_001_statistic_events.sql` 等 10 个 | events 分区表 + 4 MV + 3 预聚表 + refresh_log + 初始 7 天分区 | +| healthz 端点 | `backend/services/statisticService/handler/healthz.go` | 暴露 6 个关键指标(DB ping / Redis ping / Worker 状态 / 事件 channel 大小 / 预聚表行数) | +| metrics 埋点骨架 | `backend/services/statisticService/metrics/metrics.go` | §4.7 全部 20+ 指标先声明,本期不全部埋数据 | +| 启动验证 | 本地 + CI | `go build` + `go test ./...` + DB 迁移重放 + healthz 200 | + +**P1 末预检查清单**(P2 开工前必须验证): + +| # | 检查项 | 来源 | 验证方式 | +|---|--------|------|---------| +| 1 | socialService 有 `LikeAsset` 方法(或等价的写 like_income_log 入口) | §2.2 事件类型 | `grep -r "like_income_log" backend/services/socialService/` | +| 2 | galleryService 有 `PlaceAsset`(开始展出)/ `RemoveFromSlot`(结束展出)方法 | §2.2 | `grep -rn "^func.*PlaceAsset\|^func.*RemoveFromSlot" backend/services/galleryService/service/` | +| 3 | taskService 有 `OnExhibitionCompleted` 方法(派发展览收益) | §2.2 | `grep -n "OnExhibitionCompleted" backend/services/taskService/service/revenue_service.go` | +| 4 | assetService 有 `CreateMintOrder`(铸造)/ `CheckUpgrade` 或 `logLevelChange`(升级)方法 | §2.2 | `grep -n "CreateMintOrder\|CheckUpgrade\|logLevelChange" backend/services/assetService/service/{mint_service,asset_level_service}.go`;**实际等级变更点需 P1 末向 assetService 同学确认** | +| 5 | userService 有 `UpdateCrystalBalance` 方法(水晶账本变动) | §2.2 | `grep -n "UpdateCrystalBalance" backend/services/userService/service/user_service.go` | +| 6 | public 库的 `assets` 表有 `level` 字段、`status='active'` 软删除状态、`deleted_at IS NULL` 约定(MV4 假设) | §3.4 MV4 | `\d+ public.assets` 或 DBeaver 看 schema | +| 7 | public 库的 `like_income_log` / `crystal_log` / `level_up_log` 表存在 | §3.4、§3.5 | `\dt public.*` | +| 8 | Dubbo triple 端口 20009 未被占用 | §1.4 | `lsof -i :20009` | +| 9 | 业务侧 5 个服务的 Dubbo 注册名(`tri://...:20000/20001/20002/20003/20006`)可达 | §1.2、§5.5 | `nc -zv` 或本地 e2e 调通 | +| 10 | PostgreSQL `statistic` schema 创建权限 OK | §3.1 | 用业务账号 `CREATE SCHEMA statistic` 测试 | + +**核对不过则不进 P2。** + +#### P2 · 事件采集框架 + +**目标:** 服务能收事件、能落库、能维护预聚表;socialService 一个业务方联调通。 + +| 模块 | 涉及文件 | 备注 | +|------|---------|------| +| **model** | `model/event.go` | Event 结构 + JSONB 序列化 | +| **repository** | `repository/event_repo.go` | 批量 INSERT ON CONFLICT DO NOTHING + event_id 去重 | +| **repository** | `repository/metric_repo.go` | 3 个预聚表读写 | +| **sink** | `sink/event_sink.go`(接口)+ `sink/channel_sink.go`(本期实现) | EventSink 接口预留 OLAP/实时/采样实现点 | +| **service** | `service/event_service.go` | 校验(event_id 格式 / user_id 存在 / event_type 白名单 / properties < 1KB)+ 推入 channel | +| **worker** | `worker/event_flusher.go` | 攒批 100 条/1s + 批量落库 + 触发 metric_recent_level_ups 同步更新 | +| **worker** | `worker/metric_recent_level_ups_updater.go` | 同步更新(被 event_flusher 调用) | +| **worker** | `worker/metric_upcoming_level_ups_updater.go` | 15min ticker(不依赖 event) | +| **worker** | `worker/metric_weekly_user_income_updater.go` | 5min ticker,pg_try_advisory_lock 防多实例重复 | +| **worker** | `worker/partitioner.go` | 启动时 + 每天 00:05 创建未来 7 天 / 每天 00:30 清理 30 天前 | +| **proto handler** | `handler/track_event.go` | TrackEvent / BatchTrackEvent RPC 实现 | +| **业务侧集成** | socialService 的 `like_income_log` 写入后 | 调用 `statisticClient.TrackEvent` 异步(fire-and-forget) | +| **测试** | `service/event_service_test.go`、`worker/event_flusher_test.go`、`integration/track_event_test.go` | 单元 + 集成(dockertest 真实 PG) | + +**EventSink 接口设计(关键):** + +```go +type EventSink interface { + Submit(ctx context.Context, e *event.Event) error + SubmitBatch(ctx context.Context, es []*event.Event) error + Close() error +} +// 本期实现:ChannelEventSink(推到 channel,由 event_flusher 消费) +// 未来扩展:KafkaEventSink / ClickHouseDualWriteSink / SamplingEventSink +``` + +#### P3 · 看板 7 RPC + +**目标:** 7 个看板端到端通,前端可切流量。 + +| 模块 | 涉及文件 | 备注 | +|------|---------|------| +| **model** | `model/metric.go` | 7 个响应结构对齐 proto | +| **repository** | `repository/dashboard_repo.go` | 7 个聚合 SQL(读 MV/预聚表,公共 schema 关联 assets/exhibitions) | +| **service** | `service/dashboard_service.go` | 7 个 RPC 业务逻辑 + Redis 5min TTL + 缓存穿透防护 + 跨服务调用降级(userService.crystal_balance 失败时用 stale 标记) | +| **service** | `service/metric_service.go` | MV 刷新协调(被 materializer 调用) | +| **worker** | `worker/materializer.go` | 4 个 MV 用 REFRESH MATERIALIZED VIEW CONCURRENTLY + pg_try_advisory_lock + 错开执行时间(避免 DB 瞬时压力) + refresh_log 记录 | +| **cache** | `service/cache.go` | 封装 pkg/redis,cache key 格式 `dash:{rpc}:{star_id}:{user_id}` | +| **gateway 路由** | `backend/gateway/router/router.go` | 7 个 GET 路由 | +| **gateway controller** | `backend/gateway/controller/statistic_controller.go` | 7 个方法 + JWT 鉴权 + 粉丝身份校验(调 userService.fan_profile)+ 响应包装 `{code:200, data:resp}` | +| **业务侧消费** | gateway 调 7 个 RPC,前端 dashboardApi 已固定 | 切流量前确认前端 mock 数据准确性 | +| **测试** | `service/dashboard_service_test.go`(7 RPC 单测)、`integration/dashboard_rpc_test.go`(7 RPC dockertest)、`integration/gateway_test.go`(端到端) | 100% RPC 覆盖 | + +**看板冷启动优化(启动 hook 预热):** + +```go +// main.go 启动时 +go func() { + for _, starID := range getTopNStars(100) { // 取前 100 star + for _, rpc := range dashboardRPCs { + go callAndCache(rpc, starID, sampleUserID) + } + } +}() +``` + +#### P4 · 业务侧补全(4 个服务) + +| 顺序 | 服务 | 事件类型 | 集成位置 | +|------|------|---------|---------| +| 1 | galleryService | `exhibition.start`、`exhibition.end` | `PlaceAsset`(开始)/ `RemoveFromSlot`(结束)后 | +| 2 | taskService | `exhibition.revenue` | `OnExhibitionCompleted` 调用后 | +| 3 | assetService | `asset.mint`、`asset.level_up` | `CreateMintOrder`(铸造)/ `CheckUpgrade`(升级)后 | +| 4 | userService | `crystal.change` | `UpdateCrystalBalance` 调用后 | + +**集成模式(统一封装):** + +```go +// 各服务引入 pkg/statistic 客户端 +type StatisticClient interface { + TrackEvent(ctx context.Context, e *event.Event) error + BatchTrackEvent(ctx context.Context, es []*event.Event) error +} +// 业务调用方用 defer + recover 保证 fire-and-forget 不影响主流程 +defer func() { + if r := recover(); r != nil { + log.Errorf("statistic track panic: %v", r) + } + go statisticClient.TrackEvent(bgCtx, buildEvent(...)) +}() +``` + +**每个服务必做:** + +- 加 `statisticClient` 注入 +- 在指定业务方法后加 `defer go TrackEvent` +- 单测验证 `TrackEvent` 被调用(用 mock client) +- 联调验证事件能落库 + 看板数据能出现 + +--- + ## 一、架构总览 ### 1.1 服务定位