topfans/docs/superpowers/specs/2026-06-04-statistic-service-design.md
2026-06-09 17:08:23 +08:00

76 KiB
Raw Blame History

statisticService 设计文档

创建日期: 2026-06-04 状态: 规格自审通过 前置文档: docs/figma-analysis-data-dashboard.md(前端设计分析) 范围: 为数据看板新建后端微服务,支持未来扩展(埋点、未知需求)


〇、文档信息

内容
服务名 statisticService
端口 tri://127.0.0.1:20009(Dubbo triple)
HTTP 路径 /api/v1/dashboard/*(经 gateway,前端已固定)
数据库 PostgreSQL(复用现有实例,新建 statistic schema)
缓存 Redis(复用现有实例)
范围 看板 7 RPC + 事件采集框架 + 预留扩展(MQ 通道、SDK 端点、OLAP 通道、实时通道)
目标读者 后端开发、前端联调、运维、架构师

核心设计原则

  1. 单一服务,内部模块化 - 看板 + 事件采集 + 预聚合,职责清晰,易重构
  2. EventSink 抽象 - 预留 MQ / 双写 / 采样等扩展点
  3. 物化视图 + 预聚合表混用 - 历史数据走 MV,实时性要求高的走预聚合表
  4. 按日分区 - events 表按 received_at 按日分区,30 天滚动清理
  5. 准实时 + 预留实时 - 5 分钟刷新,可切换到同步通道
  6. JWT + 粉丝身份双校验 - 经 gateway 鉴权,业务侧校验 fan_profile

核心特点(4 大设计亮点)

# 特点 价值
1 多层缓存 + 读写分离 看板读 90% 命中 Redis 5min TTL,不进 DB;事件写 channel 非阻塞,异步批量落库
2 MV + 预聚合混用 历史数据走 MV(全量重算简单);实时性窗口数据(week_rank / recent_level_ups)走预聚合表 Worker 增量维护
3 EventSink 抽象 + 开关 后续 OLAP/实时/MQ 都是 cfg.EnableXxx 开关,无需重构;本期只实现最简版本
4 按日分区 + 自动管理 7 天预创建 + 30 天滚动清理,运维零干预;单分区 50M 行,操作快

与 figma 文档 4.2 节关键差异

figma 文档 statisticService(本期) 理由
HTTP 路径前缀 /api/v1/dashboard/* 保持 前端已固定,改前端成本巨大
响应包装 figma 没明确 {code: 200, data: <resp>} 对齐前端 .data 访问
week_rank figma 写"可选" 完整实现 用户确认
week_total_users 新增 用于"击败 X%"展示
鉴权字段 请求带 user_id 从 JWT 取 安全

·一、本期实施范围2026-06-08 追加)

状态: 本期聚焦看板 7 RPC + 事件采集框架两个范围,业务侧 5 服务集成在本期内,OLAP/SDK/实时/采样/多赛季/灰度发布策略等延后。本节不替换原设计,只标注"本期做 vs 不做",并给出阶段划分与特性清单,作为实施依据。

0.1.1 范围矩阵

# 类别 项目 本期 来源章节
1 看板 7 RPC GetTodayOverview / Get7DayIncomeCurve / GetExhibitionIncomeSummary / GetLikeIncomeByLevel / GetTopAssetsByEarning / GetAssetLevelDistribution / GetAssetUpgradeProgress §2.3、§3.4-3.5、§4.2
2 事件采集 TrackEvent / BatchTrackEvent + EventSink 抽象 + Worker 批量落库 §2.2、§3.2、§4.3
3 预聚合表 metric_weekly_user_income / metric_recent_level_ups / metric_upcoming_level_ups §3.5
4 物化视图 mv_daily_user_income / mv_daily_exhibition_revenue / mv_daily_like_income / mv_asset_level_distribution §3.4
5 分区管理 events 表按日分区 + 7 天预创建 + 30 天清理 §3.3、§4.5
6 Gateway 路由 7 个 /api/v1/dashboard/* 路由 + JWT 鉴权 + 粉丝身份校验 §2.4-2.6、§4.2
7 业务侧集成 5 个服务改造social/asset/gallery/task/user逐步加 TrackEvent §5.7
8 可观测性 Prometheus 指标§4.7 全量)+ healthz + refresh_log §4.7、§4.8
9 OLAP 双写 ClickHouse 集成 §3.10、§6.2
10 实时通道 同步 TrackEvent 实时通道 §3.10、§6.2
11 SDK 端点 HTTP /api/v1/dashboard/track 暴露 §2.4、§3.10
12 采样 客户端采样 + EnableSampling §3.8、§4.9
13 多赛季 season_id 字段 / 赛季 Tab §3.10、§6.2
14 灰度发布 10%→100% 灰度策略 + Stage 1-6 步骤 (保留机制但简化执行) §5.8
15 完整告警实现 钉钉/飞书 webhook10 条规则) (指标埋点保留,告警由监控平台处理) §4.8
16 Channel 满载 Level 2-5 加大缓冲 / Worker 并发 / 解耦预聚合 / 降级 / 采样 (仅 Level 1 监控) §4.9

0.1.2 阶段划分4 阶段 / 事件先看板后)

阶段 名称 周期(估) 关键产出 关键验证
P1 服务骨架 2-3 天 go.mod / proto / main.go / config / healthz / 启动接入 go.work / 10 个 SQL 迁移 go build 通过 + go test ./... 通过 + 契约测试proto json schema 校验)+ DB 迁移可重放 + P1 末预检查清单通过
P2 事件采集 4-5 天 events 表(已建)+ TrackEvent/BatchTrackEvent RPC + EventSink 接口 + event_flusher Worker + 3 个预聚表 + partitioner 单元测试 + 集成测试dockertest 真实 PG+ 业务侧 socialService 一个服务联调通
P3 看板 7 RPC 4-5 天 4 个 MV + 7 个 RPC service + main.go 启动预热 7 cache 逻辑 + gateway 7 路由 + Redis 5min TTL 集成测试7 RPC 全覆盖)+ gateway 端到端 + 性能测试k6/wrk 缓存命中/未命中 P99+ 前端切流量 + cache hit rate > 80%
P4 业务侧补全 2-3 天 4 个服务集成galleryService→taskService→assetService→userService+ 各服务单元测试 + 联调 各服务 go test 通过 + 看板数据逐步出现 7 个事件类型

关键依赖 / 决策:

  • P2 不依赖 P3:事件写入和看板读取在 service 层完全解耦service 写 eventsdashboard 读 MV但都需要 PG schema + proto + config
  • P3 启动时预热dashboard 7 个 RPC 各走独立 cache key + 启动 hook 预热 7 个 cache防冷启动雪崩
  • P4 顺序galleryServiceexhibition.start/end→ taskServiceexhibition.revenue→ assetServiceasset.mint/level_up→ userServicecrystal.change。先做高频路径gallery 展出是核心场景),后做低频路径

对外接口冻结点:

  • P1 末proto 文件定稿7 RPC + 2 事件 RPC事件 RPC 内部可用)
  • P2 末TrackEvent RPC 内部可用,业务侧可接入
  • P3 末:前端可切流量

0.1.3 特性清单(每阶段具体做什么)

P1 · 服务骨架

目标: 起个空服务能跑起来,迁移能跑通。

任务 涉及文件 备注
go.mod 创建 backend/services/statisticService/go.mod 沿用 taskService 的依赖pkg/database / pkg/redis / pkg/dubbo / pkg/metrics
go.work 集成 backend/go.work ./services/statisticService
proto 定义 backend/proto/event.protobackend/proto/statistic.proto 关键:冻结 proto7 RPC + 2 事件 RPC
配置 backend/services/statisticService/config/statistic_config.go 端口 20009 + schema=statistic + Redis + 刷新间隔 + Channel 配置 + 4 个 EnableXxx=falseOLAPDualWrite / RealtimeChannel / SDKEndpoint / Sampling
main.go backend/services/statisticService/main.go Dubbo triple 注册 + HTTP healthz + 启动 Worker hook
DB 迁移 10 个 SQL backend/migrations/2026_06_08_001_statistic_events.sql 等 10 个 events 分区表 + 4 MV + 3 预聚表 + refresh_log + 初始 7 天分区
healthz 端点 backend/services/statisticService/handler/healthz.go 暴露 6 个关键指标DB ping / Redis ping / Worker 状态 / 事件 channel 大小 / 预聚表行数)
metrics 埋点骨架 backend/services/statisticService/metrics/metrics.go §4.7 全部 20+ 指标先声明,本期不全部埋数据
启动验证 本地 + CI go build + go test ./... + DB 迁移重放 + healthz 200

P1 末预检查清单P2 开工前必须验证):

# 检查项 来源 验证方式
1 socialService 有 LikeAsset 方法(或等价的写 asset_likes 入口) §2.2 事件类型 grep -rn "asset_likes|LikeAsset" backend/services/socialService/
2 galleryService 有 PlaceAsset(开始展出)/ RemoveFromSlot(结束展出)方法 §2.2 grep -rn "^func.*PlaceAsset|^func.*RemoveFromSlot" backend/services/galleryService/service/
3 taskService 有 OnExhibitionCompleted 方法(派发展览收益) §2.2 grep -n "OnExhibitionCompleted" backend/services/taskService/service/revenue_service.go
4 assetService 有 CreateMintOrder(铸造)/ CheckUpgradelogLevelChange(升级)方法 §2.2 grep -n "CreateMintOrder|CheckUpgrade|logLevelChange" backend/services/assetService/service/{mint_service,asset_level_service}.go实际等级变更点需 P1 末向 assetService 同学确认
5 userService 有 UpdateCrystalBalance 方法(水晶账本变动) §2.2 grep -n "UpdateCrystalBalance" backend/services/userService/service/user_service.go
6 public 库的 assets 表有 level 字段、status='active' 软删除状态、deleted_at IS NULL 约定MV4 假设) §3.4 MV4 \d+ public.assets 或 DBeaver 看 schema
7 public 库的 asset_likes / crystal_log / level_up_log 表存在 §3.4、§3.5 \dt public.*
8 Dubbo triple 端口 20009 未被占用 §1.4 lsof -i :20009
9 业务侧 5 个服务的 Dubbo 注册名(tri://...:20000/20001/20002/20003/20006)可达 §1.2、§5.5 nc -zv 或本地 e2e 调通
10 PostgreSQL statistic schema 创建权限 OK §3.1 用业务账号 CREATE SCHEMA statistic 测试

核对不过则不进 P2。

P2 · 事件采集框架

目标: 服务能收事件、能落库、能维护预聚表socialService 一个业务方联调通。

模块 涉及文件 备注
model model/event.go Event 结构 + JSONB 序列化
repository repository/event_repo.go 批量 INSERT ON CONFLICT DO NOTHING + event_id 去重
repository repository/metric_repo.go 3 个预聚表读写
sink sink/event_sink.go(接口)+ sink/channel_sink.go(本期实现) EventSink 接口预留 OLAP/实时/采样实现点
service service/event_service.go 校验event_id 格式 / user_id 存在 / event_type 白名单 / properties < 1KB+ 推入 channel
worker worker/event_flusher.go 攒批 100 条/1s + 批量落库 + 触发 metric_recent_level_ups 同步更新
worker worker/metric_recent_level_ups_updater.go 同步更新(被 event_flusher 调用)
worker worker/metric_upcoming_level_ups_updater.go 15min ticker不依赖 event
worker worker/metric_weekly_user_income_updater.go 5min tickerpg_try_advisory_lock 防多实例重复
worker worker/partitioner.go 启动时 + 每天 00:05 创建未来 7 天 / 每天 00:30 清理 30 天前
proto handler handler/track_event.go TrackEvent / BatchTrackEvent RPC 实现
业务侧集成 socialService 的 like_income_log 写入后 调用 statisticClient.TrackEvent 异步fire-and-forget
测试 service/event_service_test.goworker/event_flusher_test.gointegration/track_event_test.go 单元 + 集成dockertest 真实 PG

EventSink 接口设计(关键):

type EventSink interface {
    Submit(ctx context.Context, e *event.Event) error
    SubmitBatch(ctx context.Context, es []*event.Event) error
    Close() error
}
// 本期实现ChannelEventSink推到 channel由 event_flusher 消费)
// 未来扩展KafkaEventSink / ClickHouseDualWriteSink / SamplingEventSink

P3 · 看板 7 RPC

目标: 7 个看板端到端通,前端可切流量。

模块 涉及文件 备注
model model/metric.go 7 个响应结构对齐 proto
repository repository/dashboard_repo.go 7 个聚合 SQL读 MV/预聚表,公共 schema 关联 assets/exhibitions
service service/dashboard_service.go 7 个 RPC 业务逻辑 + Redis 5min TTL + 缓存穿透防护 + 跨服务调用降级userService.crystal_balance 失败时用 stale 标记)
service service/metric_service.go MV 刷新协调(被 materializer 调用)
worker worker/materializer.go 4 个 MV 用 REFRESH MATERIALIZED VIEW CONCURRENTLY + pg_try_advisory_lock + 错开执行时间(避免 DB 瞬时压力) + refresh_log 记录
cache service/cache.go 封装 pkg/rediscache key 格式 dash:{rpc}:{star_id}:{user_id}
gateway 路由 backend/gateway/router/router.go 7 个 GET 路由
gateway controller backend/gateway/controller/statistic_controller.go 7 个方法 + JWT 鉴权 + 粉丝身份校验(调 userService.fan_profile+ 响应包装 {code:200, data:resp}
业务侧消费 gateway 调 7 个 RPC前端 dashboardApi 已固定 切流量前确认前端 mock 数据准确性
测试 service/dashboard_service_test.go7 RPC 单测)、integration/dashboard_rpc_test.go7 RPC dockertestintegration/gateway_test.go(端到端) 100% RPC 覆盖

看板冷启动优化(启动 hook 预热):

// main.go 启动时
go func() {
    for _, starID := range getTopNStars(100) {  // 取前 100 star
        for _, rpc := range dashboardRPCs {
            go callAndCache(rpc, starID, sampleUserID)
        }
    }
}()

P4 · 业务侧补全4 个服务)

顺序 服务 事件类型 集成位置
1 galleryService exhibition.startexhibition.end PlaceAsset(开始)/ RemoveFromSlot(结束)后
2 taskService exhibition.revenue OnExhibitionCompleted 调用后
3 assetService asset.mintasset.level_up CreateMintOrder(铸造)/ CheckUpgrade(升级)后
4 userService crystal.change UpdateCrystalBalance 调用后

集成模式(统一封装):

// 各服务引入 pkg/statistic 客户端
type StatisticClient interface {
    TrackEvent(ctx context.Context, e *event.Event) error
    BatchTrackEvent(ctx context.Context, es []*event.Event) error
}
// 业务调用方用 defer + recover 保证 fire-and-forget 不影响主流程
defer func() {
    if r := recover(); r != nil {
        log.Errorf("statistic track panic: %v", r)
    }
    go statisticClient.TrackEvent(bgCtx, buildEvent(...))
}()

每个服务必做:

  • statisticClient 注入
  • 在指定业务方法后加 defer go TrackEvent
  • 单测验证 TrackEvent 被调用(用 mock client
  • 联调验证事件能落库 + 看板数据能出现

一、架构总览

1.1 服务定位

statisticService 是 TopFans 平台的数据统计与看板服务,提供两类核心能力:

能力 描述 读者
数据看板 为 figma 数据看板页面提供 7 个聚合查询 RPC 前端 dashboard.vue
事件采集 提供业务事件接入通道(EventSink 抽象),用于埋点 内部各服务 + 未来前端 SDK

1.2 服务位置

backend/services/statisticService/      # 新建
backend/proto/statistic.proto            # 新建
backend/proto/event.proto                # 新建(通用事件类型)
backend/gateway/router/router.go         # 修改:注册 /api/v1/dashboard/* 路由
backend/gateway/controller/statistic_controller.go  # 新建
backend/go.work                          # 修改:加入 statisticService
backend/migrations/                      # 新建:statistic 表迁移(10 个 SQL)

1.3 目录结构(参照 taskService)

backend/services/statisticService/
├── main.go
├── go.mod / go.sum
├── config/
│   └── statistic_config.go              # 配置: 端口、DB、Redis、刷新间隔、扩展开关
├── provider/
│   ├── statistic_internal_provider.go   # 内部服务调用方(其他服务 TrackEvent)
│   └── statistic_mobile_provider.go     # 移动端/前端调用方(看板 7 RPC)
├── service/                              # 业务逻辑层
│   ├── dashboard_service.go            # 看板 7 RPC 业务逻辑
│   ├── event_service.go                # 事件采集业务逻辑(批量落库)
│   └── metric_service.go               # 预聚合/物化视图管理
├── repository/                           # 数据访问层
│   ├── event_repo.go                   # events 表操作
│   ├── metric_repo.go                  # 预聚合表查询
│   └── dashboard_repo.go               # 看板聚合 SQL
├── model/                                # 数据模型
│   ├── event.go                        # 通用事件模型
│   └── metric.go                       # 预聚合模型
├── worker/                               # 后台任务
│   ├── materializer.go                 # 物化视图定时刷新
│   ├── event_flusher.go                # 事件批量落库 worker
│   └── partitioner.go                  # events 按日分区自动管理
├── client/                               # 调用其他服务
│   ├── gallery_client.go
│   ├── asset_client.go
│   ├── user_client.go
│   └── task_client.go
└── docs/                                 # 服务级文档
    ├── EVENT_TYPES.md                   # 事件类型注册表
    ├── RUNBOOK.md                       # 运维手册
    └── FAQ.md                           # 常见问题

1.4 端口

  • Dubbo triple 端口: tri://127.0.0.1:20009
  • HTTP 经 gateway: http://gateway:8080/api/v1/dashboard/*
  • 端口冲突检查: 20000-20006, 20008 已被其他服务占用,20007 / 20009 空闲,选定 20009

1.5 与其他服务的关系

                        [Frontend dashboard.vue]
                                ↓ HTTP (USE_MOCK_API=false)
                          [Gateway :8080]
                                ↓ gRPC
                          [statisticService :20009]
                                ↓ gRPC 调用
                ┌───────────────┼───────────────┐
                ↓               ↓               ↓
         [galleryService] [assetService]  [userService]   [taskService]   [socialService]
            :20001            :20003         :20000         :20006          :20002
                ↓               ↓               ↓               ↓               ↓
              [PostgreSQL public schema: assets / exhibitions / users / level_up_logs / crystal_logs / like_income_logs]
                                ↓
                       [statisticService 物化视图 + 预聚合]
                                ↓
                          [PostgreSQL: statistic schema]

职责分工:

  • statisticService 主要是只读聚合 + 事件写入,不持有核心业务数据
  • 通过 gRPC 调用其他服务获取实时数据(如 crystal_balance 走 userService)
  • 通过 PostgreSQL 物化视图做预聚合,看板查询走 MV
  • 通过 Redis 缓存看板结果(5 分钟 TTL)

二、Proto + HTTP 设计

2.1 路径与服务命名约定

名称 说明
Dubbo 服务名 statistic.StatisticService gRPC 服务注册名
服务端口 tri://127.0.0.1:20009 Dubbo triple 协议
HTTP 路径前缀(看板) /api/v1/dashboard/* 经 gateway 暴露给前端,前端已固定
HTTP 路径前缀(预留 SDK) /api/v1/dashboard/track POST,经 gateway,本期不实现

关键决策: gRPC 服务名 = statistic,HTTP 路径 = /dashboard(因前端已固定,改名成本巨大)

2.2 event.proto(通用事件,独立文件)

syntax = "proto3";
package event;
option go_package = "github.com/topfans/backend/pkg/proto/event";

message Event {
  string event_id = 1;          // 事件 ID(UUID,客户端生成,用于去重)
  int64 user_id = 2;            // 用户 ID
  int64 star_id = 3;            // 顶粉星城 ID
  string event_type = 4;        // 事件类型(如 "asset.like", "exhibition.start")
  int64 occurred_at = 5;        // 事件发生时间(ms timestamp)
  int64 received_at = 6;        // 服务端接收时间(ms,服务端填充)
  map<string, string> properties = 7;  // 自定义属性(扁平 key-value)
}

message BatchEventRequest {
  repeated Event events = 1;
}

为什么独立 event.proto?

  • 跨服务复用(其他服务可只引用 event.proto,不必引用 statistic.proto)
  • 避免循环依赖
  • 未来独立升级(event/v2.proto 可并存)
  • 与项目 common.proto 风格一致(共享类型单独)

事件类型字符串规范(枚举化,但用字符串便于扩展):

事件类型 来源服务 properties 示例
asset.like socialService {"asset_id": "123", "level": "SSR", "amount": "10"}
asset.mint assetService {"asset_id": "123", "level": "UR"}
exhibition.start galleryService {"asset_id": "123", "slot_id": "1"}
exhibition.end galleryService {"asset_id": "123", "duration_ms": "86400000"}
exhibition.revenue taskService {"asset_id": "123", "amount": "100", "duration_ms": "60000"}
asset.level_up assetService {"asset_id": "123", "from": "SR", "to": "SSR"}
crystal.change userService {"amount": "+100", "reason": "exhibition"}

事件类型注册表: backend/services/statisticService/docs/EVENT_TYPES.md(不在代码里硬编码)

2.3 statistic.proto(主服务协议,字段对齐前端)

syntax = "proto3";
package statistic;
option go_package = "github.com/topfans/backend/pkg/proto/statistic";
import "event.proto";

service StatisticService {
  // ============ 看板 7 RPC(经 gateway 暴露) ============
  rpc GetTodayOverview(GetTodayOverviewRequest) returns (GetTodayOverviewResponse);
  rpc Get7DayIncomeCurve(Get7DayIncomeCurveRequest) returns (Get7DayIncomeCurveResponse);
  rpc GetExhibitionIncomeSummary(GetExhibitionIncomeSummaryRequest) returns (GetExhibitionIncomeSummaryResponse);
  rpc GetLikeIncomeByLevel(GetLikeIncomeByLevelRequest) returns (GetLikeIncomeByLevelResponse);
  rpc GetTopAssetsByEarning(GetTopAssetsByEarningRequest) returns (GetTopAssetsByEarningResponse);
  rpc GetAssetLevelDistribution(GetAssetLevelDistributionRequest) returns (GetAssetLevelDistributionResponse);
  rpc GetAssetUpgradeProgress(GetAssetUpgradeProgressRequest) returns (GetAssetUpgradeProgressResponse);

  // ============ 事件采集(内部 RPC) ============
  rpc TrackEvent(event.Event) returns (TrackEventResponse);
  rpc BatchTrackEvent(event.BatchEventRequest) returns (TrackEventResponse);
}

// ====== 1. 今日概览 ======
message GetTodayOverviewRequest { int64 star_id = 1; }
message GetTodayOverviewResponse {
  int64 crystal_balance = 1;
  int64 today_income = 2;
  int32 week_rank = 3;          // 本期完整实现
  int32 week_total_users = 4;   // 用于"击败 X%"
}

// ====== 2. 七日收益曲线 ======
message Get7DayIncomeCurveRequest { int64 star_id = 1; }
message DailyIncomePoint {
  string date = 1;
  int64 income = 2;
  bool is_today = 3;
  bool is_peak = 4;
}
message Get7DayIncomeCurveResponse {
  repeated DailyIncomePoint points = 1;
  int64 total_income = 2;
  int64 avg_income = 3;
}

// ====== 3. 展出收益中心 ======
message GetExhibitionIncomeSummaryRequest { int64 star_id = 1; }
message TopExhibitionItem {
  int64 asset_id = 1;
  string asset_name = 2;
  string asset_thumb = 3;
  string duration_7d = 4;
  int64 earnings_7d = 5;
  int32 avg_earnings = 6;
}
message GetExhibitionIncomeSummaryResponse {
  int32 exhibiting_count = 1;
  int32 starbook_count = 2;
  string total_duration = 3;  // 格式: "D:HH:MM:SS"(< 24h 时省略 D) / Mock 样例: "712:13:56"
  int64 total_earnings = 4;
  repeated TopExhibitionItem top5 = 5;
}

// ====== 4. 点赞收益按等级 ======
message GetLikeIncomeByLevelRequest { int64 star_id = 1; }
message LikeIncomeLevelItem {
  string level = 1;
  int32 asset_count = 2;
  int64 total_income = 3;
  string thumb = 4;
}
message GetLikeIncomeByLevelResponse {
  int64 total_like_count = 1;
  int64 total_income = 2;
  repeated LikeIncomeLevelItem levels = 3;
}

// ====== 5. 藏品 TOP5 ======
message GetTopAssetsByEarningRequest { int64 star_id = 1; }
message TopAssetItem {
  int64 asset_id = 1;
  string asset_name = 2;
  string asset_thumb = 3;
  int64 total_earnings = 4;
  int32 rank = 5;
}
message GetTopAssetsByEarningResponse {
  repeated TopAssetItem items = 1;
}

// ====== 6. 藏品等级分布 ======
message GetAssetLevelDistributionRequest { int64 star_id = 1; }
message AssetLevelItem {
  string level = 1;
  int32 count = 2;
  int32 total = 3;
}
message GetAssetLevelDistributionResponse {
  repeated AssetLevelItem items = 1;
}

// ====== 7. 升级进度 ======
message GetAssetUpgradeProgressRequest { int64 star_id = 1; }
message UpcomingLevelUpItem {
  int64 asset_id = 1;
  string asset_name = 2;
  string asset_thumb = 3;
  int32 like_progress = 4;
  int32 duration_progress = 5;
}
message RecentLevelUpItem {
  int64 asset_id = 1;
  string asset_name = 2;
  string asset_thumb = 3;
  string new_level = 4;
  int64 upgrade_time = 5;
}
message GetAssetUpgradeProgressResponse {
  repeated UpcomingLevelUpItem upcoming = 1;
  repeated RecentLevelUpItem recent = 2;
}

// ====== 事件采集响应 ======
message TrackEventResponse {
  int32 accepted = 1;
  int32 rejected = 2;
}

2.4 HTTP 路径与 gRPC 方法映射

HTTP 路径 HTTP 方法 Gateway 调用的 gRPC 前端 dashboardApi 方法
/api/v1/dashboard/today-overview GET StatisticService.GetTodayOverview dashboardApi.getTodayOverview(starId)
/api/v1/dashboard/income-curve GET StatisticService.Get7DayIncomeCurve dashboardApi.get7DayIncomeCurve
/api/v1/dashboard/exhibition-summary GET StatisticService.GetExhibitionIncomeSummary dashboardApi.getExhibitionSummary
/api/v1/dashboard/like-income-by-level GET StatisticService.GetLikeIncomeByLevel dashboardApi.getLikeIncomeByLevel
/api/v1/dashboard/top-assets GET StatisticService.GetTopAssetsByEarning dashboardApi.getTopAssets
/api/v1/dashboard/level-distribution GET StatisticService.GetAssetLevelDistribution dashboardApi.getLevelDistribution
/api/v1/dashboard/upgrade-progress GET StatisticService.GetAssetUpgradeProgress dashboardApi.getUpgradeProgress
/api/v1/dashboard/track (预留) POST StatisticService.TrackEvent (未来 SDK)

2.5 Gateway 响应包装

Gateway 在调用 gRPC 后,统一包 {code: 200, data: <gRPC 响应>}:

// 伪代码 - gateway controller
func (c *DashboardController) GetTodayOverview(ctx *gin.Context) {
    starID := parseInt64(ctx.Query("star_id"))
    userID := getUserIDFromJWT(ctx)  // 鉴权后注入
    if userID == 0 { 
        respondUnauthorized(ctx); return 
    }
    
    resp, err := c.statisticClient.GetTodayOverview(ctx, &pb.GetTodayOverviewRequest{
        StarId: starID,
    })
    if err != nil {
        respondError(ctx, 500, err); return
    }
    respondOK(ctx, gin.H{
        "code": 200,
        "data": resp,  // 直接把 gRPC 响应作为 data
    })
}

2.6 鉴权

  • HTTP /api/v1/dashboard/*(看板 7 RPC): 经 gateway 的 JWT 中间件鉴权,从 token 取 user_id(注入到 ctx),star_id 从 query 取
  • HTTP /api/v1/dashboard/track(预留 SDK): 经 gateway JWT 鉴权,从 token 取 user_id
  • gRPC TrackEvent(服务间内部): Dubbo 内网白名单 + 调用方服务名校验

2.7 关键设计决策汇总

决策点 选择 理由
请求字段 只传 star_id,不传 user_id user_id 从 JWT 取,前端不可信
批量上报 提供 BatchTrackEvent SDK/服务可批量,减少 RPC 次数
事件 ID 客户端生成(UUID) 服务端去重,防止重试导致重复
时间字段 双时间戳(occurred_at + received_at) 区分业务时间和入仓时间
属性存储 map<string, string> 扁平 key-value,适合 JSONB 存储
错误处理 走 rejected 计数 + log 埋点不应阻塞业务
响应包装 {code: 200, data: <resp>} 对齐前端 .data 访问

三、数据表设计

3.1 数据库与 Schema

  • 复用现有 PostgreSQL 实例,新建 schema statistic
  • 数据库连接复用 pkg/database,只新增一个 schema 配置项
[PostgreSQL]
├── public (现有 9 个服务的业务表,只读访问)
└── statistic (本服务专用,可读写)
    ├── events               # 事件原始表(按日分区)
    ├── materialized_views    # 物化视图
    ├── metric_*              # 预聚合表
    └── refresh_log           # 物化视图刷新日志

3.2 核心表:statistic.events(按日分区)

CREATE TABLE statistic.events (
    id              BIGSERIAL,
    event_id        UUID         NOT NULL,
    user_id         BIGINT       NOT NULL,
    star_id         BIGINT       NOT NULL,
    event_type      VARCHAR(64)  NOT NULL,
    occurred_at     TIMESTAMPTZ  NOT NULL,
    received_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    properties      JSONB        NOT NULL DEFAULT '{}',
    
    PRIMARY KEY (id, received_at)
) PARTITION BY RANGE (received_at);

-- 按日分区示例(2026-06-04 这一天)
CREATE TABLE statistic.events_2026_06_04 PARTITION OF events
  FOR VALUES FROM ('2026-06-04 00:00:00+08') TO ('2026-06-05 00:00:00+08');
-- 注意: 分区键是 TIMESTAMPTZ,要带时区

-- 唯一约束(去重):同一 event_id 不能重复
CREATE UNIQUE INDEX idx_events_event_id ON statistic.events (event_id, received_at);

-- 看板查询主索引(覆盖 90% 查询)
CREATE INDEX idx_events_user_star_type_time
  ON statistic.events (user_id, star_id, event_type, received_at DESC);

-- 趋势分析索引
CREATE INDEX idx_events_star_type_time
  ON statistic.events (star_id, event_type, received_at DESC);

-- JSONB 属性 GIN 索引
CREATE INDEX idx_events_properties_gin ON statistic.events USING GIN (properties);

按日分区优劣:

优势 代价
看板 7 日查询只走 7 个 partition,极快 partition 数多(30 天保留 ≈ 30 个)
旧数据可按天快速 DROP(DETACH+DROP) 需要自动管理 partition 创建/删除
单 partition 容量小(50M 行),操作快 时间函数计算稍多
适合按日归档/分析 -

3.3 分区自动管理

// worker/partitioner.go
func EnsureFuturePartitions(ctx context.Context, db *sql.DB, days int) error {
    now := time.Now().In(AsiaShanghai)
    for i := 0; i <= days; i++ {
        day := now.AddDate(0, 0, i)
        next := day.AddDate(0, 0, 1)
        partitionName := fmt.Sprintf("events_%s", day.Format("2006_01_02"))
        sql := fmt.Sprintf(`
            CREATE TABLE IF NOT EXISTS statistic.%s 
            PARTITION OF statistic.events
            FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08');
        `, partitionName, day.Format("2006-01-02"), next.Format("2006-01-02"))
        if _, err := db.ExecContext(ctx, sql); err != nil {
            return err
        }
    }
    return nil
}

调度:

  • Worker 启动时调用 1 次(确保未来 7 天 partition 存在)
  • 每天 00:05 调用 1 次(滚动创建)

分区保留策略:

  • 默认保留 30 天(配置项 PartitionRetentionDays)
  • 超过保留期的旧分区用 DETACH PARTITION ... DROP 删除
  • 避免磁盘无限增长

3.4 物化视图(全量重算,准实时)

MV1: statistic.mv_daily_user_income(每日用户水晶收益)

CREATE MATERIALIZED VIEW statistic.mv_daily_user_income AS
SELECT
  user_id,
  star_id,
  DATE(received_at AT TIME ZONE 'Asia/Shanghai') AS income_date,
  -- 只统计"产生水晶收入"的事件: 展出收益(任务派发) + 水晶账本变动
  -- asset.level_up 不直接携带 amount,等级变更由 crystal.change 同步上报
  SUM(
    CASE
      WHEN event_type IN ('exhibition.revenue', 'crystal.change')
        AND COALESCE((properties->>'amount')::BIGINT, 0) > 0
      THEN COALESCE((properties->>'amount')::BIGINT, 0)
      ELSE 0
    END
  ) AS total_crystal
FROM statistic.events
WHERE event_type IN ('exhibition.revenue', 'crystal.change')
GROUP BY user_id, star_id, income_date;

CREATE UNIQUE INDEX idx_mv_daily_user_income_pk
  ON statistic.mv_daily_user_income (user_id, star_id, income_date);

服务于: 七日收益曲线、今日收益

MV2: statistic.mv_daily_exhibition_revenue(每日展出收益,按藏品)

CREATE MATERIALIZED VIEW statistic.mv_daily_exhibition_revenue AS
SELECT
  user_id,
  star_id,
  (properties->>'asset_id')::BIGINT AS asset_id,
  DATE(received_at AT TIME ZONE 'Asia/Shanghai') AS revenue_date,
  SUM(COALESCE((properties->>'duration_ms')::BIGINT, 0)) AS total_duration_ms,
  SUM(COALESCE((properties->>'amount')::BIGINT, 0)) AS total_earnings
FROM statistic.events
WHERE event_type = 'exhibition.revenue'
GROUP BY user_id, star_id, asset_id, revenue_date;

CREATE UNIQUE INDEX idx_mv_exhibition_revenue_pk
  ON statistic.mv_daily_exhibition_revenue (user_id, star_id, asset_id, revenue_date);

服务于: 展出收益中心(top5)、藏品矩阵 TOP5

MV3: statistic.mv_daily_like_income(每日点赞按等级)

CREATE MATERIALIZED VIEW statistic.mv_daily_like_income AS
SELECT
  e.user_id,
  e.star_id,
  a.level AS asset_level,
  DATE(e.received_at AT TIME ZONE 'Asia/Shanghai') AS like_date,
  COUNT(*) AS like_count,
  SUM(COALESCE((e.properties->>'amount')::BIGINT, 0)) AS total_crystal
FROM statistic.events e
JOIN public.assets a 
  ON a.id = (e.properties->>'asset_id')::BIGINT
WHERE e.event_type = 'asset.like'
GROUP BY e.user_id, e.star_id, a.level, like_date;

CREATE UNIQUE INDEX idx_mv_like_income_pk
  ON statistic.mv_daily_like_income (user_id, star_id, asset_level, like_date);

服务于: 点赞收益按等级(累计)

MV4: statistic.mv_asset_level_distribution(藏品等级分布)

假设: public.assets 表存在 status='active' 软删除状态字段和 deleted_at IS NULL 软删除约定(项目既有规范);若不成立,需调整 WHERE 条件。

CREATE MATERIALIZED VIEW statistic.mv_asset_level_distribution AS
SELECT
  user_id,
  star_id,
  level AS asset_level,
  COUNT(*) AS asset_count
FROM public.assets
WHERE status = 'active' AND deleted_at IS NULL
GROUP BY user_id, star_id, level;

CREATE UNIQUE INDEX idx_mv_level_dist_pk
  ON statistic.mv_asset_level_distribution (user_id, star_id, asset_level);

服务于: 藏品等级分布环形图

3.5 预聚合表(Worker 维护,实时性更高)

为什么部分用预聚合表?

  • 物化视图全量重算,适合"全量数据 + 定期刷新"场景
  • 预聚合表 Worker 增量维护,适合"实时性要求高"或"含时间窗口"场景
  • week_rank / recent_level_ups 这类需要按时间窗口,物化视图做不到

预聚合表:statistic.metric_weekly_user_income(本周收入 + 排名)

CREATE TABLE statistic.metric_weekly_user_income (
    star_id          BIGINT       NOT NULL,
    user_id          BIGINT       NOT NULL,
    week_start       DATE         NOT NULL,  -- 周一日期(Asia/Shanghai)
    total_crystal    BIGINT       NOT NULL DEFAULT 0,
    rank_in_star     INT          NOT NULL DEFAULT 0,
    updated_at       TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    
    PRIMARY KEY (star_id, user_id, week_start)
);
CREATE INDEX idx_weekly_rank 
  ON statistic.metric_weekly_user_income (star_id, week_start, rank_in_star);

服务于: GetTodayOverview 的 week_rank + week_total_users

week_rank 实现细节:

-- 本周收入用户排行(实际查询预聚合表,毫秒级)
SELECT
  rank_in_star    AS rank,
  total_crystal   AS income
FROM statistic.metric_weekly_user_income
WHERE star_id = $1
  AND user_id = $3
  AND week_start = $2;  -- 本周开始时间(周一 00:00,Asia/Shanghai)

-- week_total_users: 本周 star 下有收益的用户总数
SELECT COUNT(*)
FROM statistic.metric_weekly_user_income
WHERE star_id = $1
  AND week_start = $2
  AND total_crystal > 0;

-- 注: 上方 SQL 是概念示意;实际由 Worker 每 5 分钟从 public.crystal_log
--     全量重算写入 metric_weekly_user_income,并计算 rank_in_star。
--     看板查询不直接读 crystal_log。
  • 预聚合表: statistic.metric_weekly_user_income(star_id, user_id, week_start, total_crystal, rank_in_star)
  • Worker 每 5 分钟刷新一次
  • 看板查询走预聚合表,毫秒级返回
  • Redis 缓存 5 分钟
  • 边界: 用户本周无收益 → week_rank = -1,前端显示"暂无排名"
  • 跨周: worker 在周一 00:05 触发,清除上周数据

预聚合表:statistic.metric_recent_level_ups(最近升级记录,只保留近 30 天)

CREATE TABLE statistic.metric_recent_level_ups (
    id               BIGSERIAL PRIMARY KEY,
    user_id          BIGINT       NOT NULL,
    star_id          BIGINT       NOT NULL,
    asset_id         BIGINT       NOT NULL,
    from_level       VARCHAR(8)   NOT NULL,
    to_level         VARCHAR(8)   NOT NULL,
    upgrade_time     TIMESTAMPTZ  NOT NULL,
    asset_name       VARCHAR(128),
    asset_thumb      VARCHAR(512)
);
CREATE INDEX idx_recent_level_ups_user
  ON statistic.metric_recent_level_ups (user_id, star_id, upgrade_time DESC);

-- 自动清理: 30 天前的记录定时 DELETE

服务于: GetAssetUpgradeProgress 的 recent[]

预聚合表:statistic.metric_upcoming_level_ups(即将升级进度)

CREATE TABLE statistic.metric_upcoming_level_ups (
    user_id            BIGINT       NOT NULL,
    star_id            BIGINT       NOT NULL,
    asset_id           BIGINT       NOT NULL,
    like_progress      INT          NOT NULL,  -- 0-100
    duration_progress  INT          NOT NULL,  -- 0-100
    updated_at         TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    
    PRIMARY KEY (user_id, star_id, asset_id)
);

服务于: GetAssetUpgradeProgress 的 upcoming[]

3.6 物化视图刷新日志

CREATE TABLE statistic.refresh_log (
    id              BIGSERIAL PRIMARY KEY,
    mv_name         VARCHAR(128)  NOT NULL,
    started_at      TIMESTAMPTZ   NOT NULL,
    finished_at     TIMESTAMPTZ,
    row_count       BIGINT,
    status          VARCHAR(16)   NOT NULL,  -- 'running' / 'success' / 'failed'
    error_message   TEXT
);
CREATE INDEX idx_refresh_log_mv_time
  ON statistic.refresh_log (mv_name, started_at DESC);

用于: 监控物化视图刷新状态 + 失败告警

3.7 刷新策略(Worker,可自定义)

物化视图 / 预聚合表 默认刷新频率 配置项 优先级
mv_daily_user_income 5 分钟 RefreshIntervals.DailyUserIncome
mv_daily_exhibition_revenue 5 分钟 RefreshIntervals.DailyExhibitionRevenue
mv_daily_like_income 5 分钟 RefreshIntervals.DailyLikeIncome
mv_asset_level_distribution 15 分钟 RefreshIntervals.AssetLevelDistribution
metric_weekly_user_income 5 分钟 RefreshIntervals.WeeklyUserIncome
metric_recent_level_ups 实时 TrackEvent 同步触发
metric_upcoming_level_ups 15 分钟 RefreshIntervals.UpcomingLevelUps
metric_recent_level_ups 清理 每天 00:35 固定 后台
创建未来 N 天 partition 每天 00:05 固定 后台
清理过期 partition 每天 00:30 固定 后台

实现技术:

  • Go Worker 协程 + time.Ticker
  • 刷新用 REFRESH MATERIALIZED VIEW CONCURRENTLY(不阻塞读)
  • 并发控制: PostgreSQL Advisory Lock 防止多实例重复刷

3.8 配置项(预留扩展开关)

// config/statistic_config.go
type StatisticConfig struct {
    Port           int
    DBUrl          string
    DBSchema       string  // 默认 "statistic"
    RedisUrl       string
    WorkerEnabled  bool
    RefreshIntervals struct {  // 可自定义刷新频率
        DailyUserIncome          time.Duration
        DailyExhibitionRevenue   time.Duration
        DailyLikeIncome          time.Duration
        AssetLevelDistribution   time.Duration
        WeeklyUserIncome         time.Duration
        UpcomingLevelUps         time.Duration
    }
    
    // 分区管理
    PartitionRetentionDays int
    PartitionPreCreateDays int
    
    // 事件 channel
    EventChannelCapacity  int
    EventWorkerCount      int
    EventBatchSize        int
    EventBatchInterval    time.Duration
    
    // 告警阈值
    AlertChannelSizeRatio       float64  // 默认 0.8
    AlertDropRateThreshold      float64  // 默认 0.01
    AlertMVRefreshFailureCount  int      // 默认 3
    
    // 预留扩展(本期默认 false)
    EnableOLAPDualWrite    bool   // 双写 ClickHouse
    EnableRealtimeChannel  bool   // 同步 TrackEvent 实时通道
    EnableSDKEndpoint      bool   // 暴露 HTTP /track 端点
    EnableSampling         bool   // 客户端采样
    SampleRate             float64
}

3.9 性能预估

数据量假设:

  • 假设百万 DAU,平均每用户每日产生 50 个事件
  • 每日事件量 ≈ 5000 万条
  • 7 日事件量 ≈ 3.5 亿条
  • 30 日事件量 ≈ 15 亿条

索引评估:

  • idx_events_user_star_type_time 单 user 查询: 命中 7 日分区,毫秒级
  • idx_events_star_type_time 跨用户统计: 扫描本月分区(50M 行),秒级
  • mv_* 物化视图查询: 毫秒级(预聚合)

瓶颈与应对:

  • 看板冷启动 7 个并发请求,各走不同 MV,互不阻塞
  • 物化视图刷新时,REFRESH CONCURRENTLY 允许读不阻塞
  • week_rank 实时性: 5 分钟刷新,用户无感

3.10 未来扩展预留(表层)

预留 实现方式 启用时机
OLAP 双写 events 表加 cfg.EnableOLAPDualWrite,Worker 加 ClickHouse 写入分支 数据量 > 1亿/天
实时通道 cfg.EnableRealtimeChannel,同步 TrackEvent 路径 实时性要求提高
SDK 端点 cfg.EnableSDKEndpoint,暴露 HTTP /track 前端 SDK 启动
多赛季 预聚合表加 season_id 字段,MV 加分区 赛季结束归档时

四、关键流程

4.1 流程总览

┌─────────────────────────────────────────────────────────────┐
│                  statisticService 核心流程                     │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  流程 A: 看板查询(读)                                          │
│  Gateway → gRPC StatisticService.GetXxx → 查物化视图/MV     │
│                                                              │
│  流程 B: 事件采集(写)                                          │
│  其他服务 → gRPC StatisticService.TrackEvent → 写events表   │
│                                                              │
│  流程 C: 物化视图刷新(后台)                                     │
│  Worker 协程 → 定时器触发 → REFRESH MATERIALIZED VIEW        │
│                                                              │
│  流程 D: 分区管理(后台)                                         │
│  Worker 协程 → 每天 00:05 创建未来 7 天 / 00:30 清理过期分区   │
│                                                              │
└─────────────────────────────────────────────────────────────┘

4.2 流程 A:看板查询(读路径)

时序图(以 GetTodayOverview 为例):

Frontend                Gateway             statisticService           PostgreSQL / Redis
   │                       │                       │                            │
   │ GET /api/v1/dashboard/│                       │                            │
   │   today-overview      │                       │                            │
   │ ?star_id=123          │                       │                            │
   ├──────────────────────>│                       │                            │
   │                       │ 1. JWT 鉴权            │                            │
   │                       │    从 token 提 user_id │                            │
   │                       │ 2. 解析 query          │                            │
   │                       │    star_id = 123       │                            │
   │                       │ 3. 校验:               │                            │
   │                       │    user.fan_profile    │                            │
   │                       │    .star_id == 123     │                            │
   │                       │                       │                            │
   │                       │ gRPC GetTodayOverview  │                            │
   │                       │ (tri://20009)         │                            │
   │                       ├──────────────────────>│                            │
   │                       │                       │ 1. 查 Redis 缓存            │
   │                       │                       │    Key: dash:overview      │
   │                       │                       │    :123:user_456          │
   │                       │                       ├──────────────────────────>│
   │                       │                       │<────── HIT ────────────────│
   │                       │                       │                            │
   │                       │                       │ (缓存 miss 则:            │
   │                       │                       │  2. 查 metric_weekly      │
   │                       │                       │     user_income           │
   │                       │                       │  3. 查 userService        │
   │                       │                       │     .crystal_balance)     │
   │                       │                       │                            │
   │                       │ GetTodayOverviewResp  │                            │
   │                       │ {                     │                            │
   │                       │   crystal_balance:2713│                            │
   │                       │   today_income:213    │                            │
   │                       │   week_rank:12        │                            │
   │                       │   week_total_users:341│                            │
   │                       │ }                     │                            │
   │                       │<──────────────────────┤                            │
   │                       │                       │                            │
   │                       │ HTTP 包装:            │                            │
   │                       │ { code: 200,          │                            │
   │                       │   data: { ... } }     │                            │
   │                       │                       │                            │
   │   HTTP 200 OK         │                       │                            │
   │<──────────────────────┤                       │                            │
   │                       │                       │                            │

关键设计点:

  1. JWT 鉴权 - Gateway 在路由层完成,service 收到的请求已带 user_id(注入到 ctx)
  2. 粉丝身份校验 - 必须验证 user 真的加入了该 star(查 userService.fan_profile),防止越权
  3. Redis 缓存 - 5 分钟 TTL,缓存 Key 包含 star_id + user_id
  4. 缓存穿透防护 - 缓存 miss 时查 DB,DB 也无数据则缓存空值 1 分钟
  5. 多 RPC 并发 - 前端 Promise.allSettled 一次发 7 个,各走独立 cache key,互不阻塞
  6. 降级兜底 - 跨服务调用(如 userService.GetCrystalBalance)失败时,该字段用上次缓存值并打 stale 标记,其他字段继续用 MV 返回,不阻断整个看板;连续 3 次失败再返回 5xx

4.3 流程 B:事件采集(写路径)

时序图(以 socialService 上报点赞事件为例):

socialService               statisticService                 PostgreSQL
   │                             │                              │
   │ 用户点赞:                    │                              │
   │ 1. 写 like_income_log       │                              │
   │ 2. 更新 asset.like_count    │                              │
   │                             │                              │
   │ (异步,失败不影响主流程)      │                              │
   │ gRPC TrackEvent             │                              │
   │ (tri://20009)               │                              │
   │ { event_id: uuid,           │                              │
   │   user_id: 10000001,        │                              │
   │   star_id: 123,             │                              │
   │   event_type: "asset.like", │                              │
   │   occurred_at: ts,          │                              │
   │   properties: {             │                              │
   │     "asset_id": "456",      │                              │
   │     "amount": "10"          │                              │
   │   }                         │                              │
   │ }                           │                              │
   ├────────────────────────────>│                              │
   │                             │ 1. 校验:                     │
   │                             │    - event_id 格式           │
   │                             │    - user_id 存在            │
   │                             │    - event_type 在白名单    │
   │                             │    - properties 大小 < 1KB   │
   │                             │                              │
   │                             │ 2. 推入 channel(非阻塞)      │
   │                             │    缓冲 1000 条,满则降级     │
   │                             │                              │
   │ TrackEventResponse          │                              │
   │ { accepted: 1, rejected: 0 }│                              │
   │<────────────────────────────┤                              │
   │                             │                              │
   │                             │ ── Worker 协程(独立) ──     │
   │                             │ 3. 批量读 channel            │
   │                             │    攒批 100 条 / 1秒        │
   │                             │ 4. 去重:                     │
   │                             │    SELECT event_id FROM      │
   │                             │    events WHERE event_id IN  │
   │                             │    (?, ?, ...)              │
   │                             ├─────────────────────────────>│
   │                             │<───── 已有 0 条 ─────────────│
   │                             │                              │
   │                             │ 5. 批量 INSERT              │
   │                             │    INSERT INTO events ...   │
   │                             │    ON CONFLICT DO NOTHING    │
   │                             ├─────────────────────────────>│
   │                             │<───── 100 行已写入 ──────────│
   │                             │                              │
   │                             │ 6. 触发预聚合同步更新        │
   │                             │    metric_recent_level_ups   │
   │                             │    (同步,不阻塞事件写入)     │
   │                             │    注: metric_upcoming      │
   │                             │    _level_ups 走 15 分钟     │
   │                             │    Worker,不在此路径        │

关键设计点:

  1. 不阻塞业务 - socialService 调用 TrackEvent 是 fire-and-forget,gRPC 调用快速返回(< 50ms)
  2. 批量落库 - Worker 协程攒批 100 条/1秒,降低 DB 压力
  3. 去重 - 用 event_id 唯一约束 + ON CONFLICT DO NOTHING,客户端重试安全
  4. 降级策略 - channel 满时拒绝新事件(返回 rejected: 1),但不影响主流程
  5. 预聚合同步 - TrackEvent 同步触发 metric_recent_level_ups 更新(MV 刷新走 5 分钟定时器)

批量上报优化(预留):

// 业务侧可一次性上报 100 条事件,减少 RPC 次数
resp, err := client.BatchTrackEvent(ctx, &event.BatchEventRequest{
    Events: events,  // 100 个 Event
})
// 返回: accepted=100, rejected=0

4.4 流程 C:物化视图刷新(后台)

时序图(以 5 分钟定时刷新为例):

statisticService Worker                  PostgreSQL
        │                                    │
        │ (5 分钟 ticker 触发)               │
        │                                    │
        │ 1. 抢 Advisory Lock                │
        │    pg_try_advisory_lock(1234)      │
        ├───────────────────────────────────>│
        │<─────── true(抢到) ────────────────│
        │                                    │
        │ 2. REFRESH MATERIALIZED VIEW       │
        │    CONCURRENTLY                    │
        │    mv_daily_user_income;           │
        ├───────────────────────────────────>│
        │    (5-10 秒)                        │
        │<─────── 完成,新数据可见 ───────────│
        │                                    │
        │ 3. 记录 refresh_log                │
        │    INSERT INTO refresh_log ...     │
        ├───────────────────────────────────>│
        │                                    │
        │ 4. 释放 Advisory Lock              │
        │    pg_advisory_unlock(1234)        │
        ├───────────────────────────────────>│
        │                                    │
        │ 5. 等待下一个 tick                  │

关键设计点:

  1. 多实例防重复 - 用 pg_try_advisory_lock 保证多实例下只有一个跑刷新
  2. 不阻塞读 - REFRESH CONCURRENTLY 允许查询继续用旧数据
  3. 失败重试 - 3 次重试,失败写 refresh_log 状态 'failed'
  4. 告警 - refresh_log 连续 N 次失败触发告警(可对接钉钉/飞书)
  5. 优先级 - 不同 MV 错开执行,避免 DB 瞬时压力大

4.5 流程 D:分区管理(后台)

  • Worker 启动时: 调用 EnsureFuturePartitions(7),创建未来 7 天分区
  • 每天 00:05: 调用 EnsureFuturePartitions(7),滚动创建
  • 每天 00:30: 调用 CleanupOldPartitions(30),清理 30 天前分区

4.6 异常场景处理

场景 处理
DB 不可用 看板查询返回 500 + 日志告警;事件采集降级为"丢弃但记录指标"
Redis 不可用 缓存全部走 DB(降级),不阻塞
物化视图刷新失败 重试 3 次 → 写 refresh_log failed → 告警 → 看板返回 5 分钟前的数据
Advisory Lock 抢不到 本实例跳过本轮,下个 tick 再试
事件 channel 满 返回 rejected: N,记录 metric_event_dropped 指标
JWT 鉴权失败 Gateway 返回 401,前端跳登录
粉丝身份不匹配 Gateway 返回 403 "未加入该顶粉星城"
限流 看板 RPC 100 QPS / user_id,超限返回 429;在 gateway 中间件用 Redis 滑动窗口实现(项目现有 pkg/ratelimit 组件)

4.7 监控指标(Prometheus)

// 看板 RPC
metrics.NewCounter("dashboard_rpc_total", []string{"rpc", "status"})
metrics.NewHistogram("dashboard_rpc_duration_seconds", []string{"rpc"})
metrics.NewGauge("dashboard_cache_hit_rate")

// 事件采集
metrics.NewCounter("event_track_total", []string{"event_type", "result"})  // result: accepted/rejected
metrics.NewGauge("event_channel_size")
metrics.NewGauge("event_channel_capacity")
metrics.NewCounter("event_dropped_total")
metrics.NewHistogram("event_batch_size")
metrics.NewCounter("event_db_insert_total", []string{"status"})

// 物化视图
metrics.NewCounter("mv_refresh_total", []string{"mv_name", "status"})
metrics.NewHistogram("mv_refresh_duration_seconds", []string{"mv_name"})

// Worker
metrics.NewGauge("worker_running_count", []string{"worker_name"})
metrics.NewHistogram("worker_loop_duration", []string{"worker_name"})

// 分区管理
metrics.NewGauge("events_partition_count")
metrics.NewCounter("events_partition_created_total")
metrics.NewCounter("events_partition_dropped_total")

4.8 告警规则(本期实现,Level 1)

告警项 触发条件 严重度 通知方式
MV 刷新失败 refresh_log.status='failed' 连续 3 次 飞书/钉钉 + 邮件
事件 channel 满 event_channel_size > 80% 容量 持续 1 分钟 飞书/钉钉
事件丢失率过高 event_dropped_total / event_track_total > 1% 持续 5 分钟 飞书/钉钉 + 邮件
DB 写入慢 event_db_insert_duration P99 > 500ms 持续 5 分钟 飞书/钉钉
看板 RPC 失败率高 dashboard_rpc_total{status=error} / dashboard_rpc_total > 5% 持续 5 分钟 飞书/钉钉
缓存命中率低 dashboard_cache_hit_rate < 50% 持续 15 分钟 飞书/钉钉
Advisory Lock 抢不到 累计 10 次未抢到 飞书/钉钉
Worker 不健康 worker_running_count=0 持续 1 分钟 飞书/钉钉 + 邮件
分区缺失 当前日期 partition 不存在 飞书/钉钉 + 邮件
跨时区异常 (本期不监控) - -

告警实现:

  • 本期不引入 AlertManager,只暴露 Prometheus 指标
  • 告警规则由 Grafana / 运维侧的告警系统订阅执行(项目已有 Prometheus)
  • statisticService 内部加一个"健康度自检"端点 GET /healthz,暴露关键指标

4.9 Channel 满载分析(本期)

为什么 channel 会满?(7 类原因)

  1. 生产速率 > 消费速率(最常见)
    • 消费端慢:DB 慢查询、批量插入性能瓶颈、Worker 被阻塞
    • 生产端快:业务高峰期、某服务异常、并发上报
  2. DB 突发压力:业务高峰、DB 维护、慢 SQL 阻塞
  3. 预聚合同步更新慢:metric_recent_level_ups 同步写慢
  4. 资源限制:channel 容量偏小、Worker 数量少、连接池耗尽
  5. 某个调用方异常:死循环、retry storm、单事件过大
  6. GC 压力:大量 Event 对象、频繁 GC 暂停 worker
  7. 时区/分区问题:occurred_at 路由错误、写入不存在分区

本期承诺(Level 1):

  • 默认 EventChannelCapacity: 1000 + EventWorkerCount: 1 + EventBatchSize: 100 + EventBatchInterval: 1s
  • 加监控:event_channel_sizeevent_dropped_totalworker_loop_duration
  • 不预设 Level 2-5 的实现(加大缓冲 / Worker 并发 / 解耦预聚合 / 降级 / 采样),留好配置项和代码 hook
  • 后续根据监控数据决定是否升级

4.10 性能与 SLA

4.10.1 核心性能指标

指标 目标 实测方式
看板单 RPC P99 延迟 < 200ms Prometheus + Grafana
看板 7 RPC 并发 P99 < 500ms e2e 测试
TrackEvent P99 延迟 < 50ms 调用方埋点
物化视图刷新耗时 < 30s/视图 refresh_log 统计
缓存命中率 > 80% Redis 监控

SLA:

  • 可用性: 99.5%(中等,看板不是核心交易)
  • 数据延迟: 准实时(5 分钟内,可自定义)
  • 事件丢失率: < 0.1%(进程崩溃可能丢失 channel 内未落库事件)

4.10.2 读性能分析(7 RPC 详细)

RPC 数据源 缓存命中 缓存未命中
GetTodayOverview metric_weekly + userService RPC <10ms ~50ms(1 SQL + 1 gRPC)
Get7DayIncomeCurve mv_daily_user_income(扫 7 日分区) <10ms ~30ms(1 SQL)
GetExhibitionIncomeSummary mv_daily_exhibition_revenue + exhibitions <10ms ~50ms(2 SQL)
GetLikeIncomeByLevel mv_daily_like_income(JOIN assets) <10ms ~40ms(1 SQL + JOIN)
GetTopAssetsByEarning mv_daily_exhibition_revenue ORDER BY <10ms ~30ms
GetAssetLevelDistribution mv_asset_level_distribution <10ms ~20ms
GetAssetUpgradeProgress metric_recent + metric_upcoming <10ms ~40ms(2 SQL)

7 RPC 并发 P99 预估: 200~400ms(各自独立 cache key,无相互阻塞)。

4.10.3 写性能分析(TrackEvent + Worker + 刷新)

操作 路径 延迟 备注
TrackEvent 单条 gRPC → channel(非阻塞)→ 返回 <5ms 不等落库,fire-and-forget
TrackEvent 批量 100 条 gRPC → channel <20ms 减少 RPC 次数
Worker 批量落库 攒批 100 条/1s → INSERT ... ON CONFLICT DO NOTHING ~50ms/batch 持续 50 万/分钟吞吐
MV 刷新(单视图) REFRESH MATERIALIZED VIEW CONCURRENTLY 5~15s SHARE UPDATE EXCLUSIVE 锁
预聚合表刷新 Worker 增量写 <5s 单 SQL,取决于 star 数
分区创建/清理 每天 1 次(7 天预创建 + 30 天清理) <1s 几乎无感

4.10.4 瓶颈与风险点

# 瓶颈 影响 缓解
1 MV 刷新阻塞 INSERT 刷新期间 events 表写入排队 1-2 万/5min(≈1% 事件丢失) 错开不同 MV 刷新时间;REFRESH CONCURRENTLY 已有
2 冷启动 7 RPC 并发 启动瞬间 14 个请求(含 userService)冲击 DB 启动 hook 预热 7 个 cache;首屏请求合并
3 JSONB GIN 索引膨胀 15 亿行 × 500B/properties × GIN 膨胀 ≈ 30-50GB 索引 30 天清理自然控制;高频字段可降级 btree
4 跨服务调用失败 crystal_balance 取不到,看板数据不全 已有 stale 缓存兜底(§4.2 第 6 条);连续 3 次失败才 5xx
5 week_rank 全量重算 100 star × 1 万 user = 100 万行,5min 一次 PostgreSQL 几秒搞定;预聚合表 + 索引已就绪
6 磁盘 IO 30 天 × 5000 万/天 × 500B ≈ 750GB 按日分区,旧分区可单独放慢盘;运维侧监控

相关风险见 §6.1 风险登记表(16 个风险中已覆盖大部分瓶颈)。

4.10.5 一句话总结

看板读 P99 控制在 200~400ms(全 Redis 命中场景下 <100ms);TrackEvent 完全不阻塞业务(<5ms 返回);MV 刷新阶段有 1% 事件丢失风险(可接受,看板非核心交易)。最大扩展点是 EventSink 抽象让后续 OLAP/实时/SDK 接入零重构。


五、测试与部署

5.1 测试策略

层级 范围 工具 覆盖率
单元测试 service / repository / worker Go testing + testify > 80%
集成测试 7 个 RPC + TrackEvent 端到端 Go testing + dockertest(postgres) 100% RPC
Worker 测试 物化视图刷新 + 预聚合维护 Go testing + sqlmock > 70%
性能测试 7 RPC 并发 P99、TrackEvent 吞吐 k6 / wrk 关键路径
E2E 前端 dashboard.vue → 网关 → 后端 Playwright 1 条主路径
契约测试 7 个 RPC 响应 schema 不变 json schema 校验 100%

测试夹具(fixture):

  • testdata/events_sample.json: 500 条样例事件,覆盖所有 event_type
  • testdata/expected/: 每个 RPC 的预期响应(对齐前端 mock)
  • 测试用 PostgreSQL: 每次 test 启动前 truncate,test 结束 teardown

5.2 单元测试重点

// service/dashboard_service_test.go
func TestGetTodayOverview_WeekRank(t *testing.T) {
    // 1. 准备测试数据(metric_weekly_user_income 表)
    // 2. 构造请求
    // 3. 调用 service
    // 4. 验证响应字段
}

func TestGet7DayIncomeCurve_EmptyWeek(t *testing.T) {
    // 边界: 本周无收益,返回空数组
}

func TestTrackEvent_Deduplication(t *testing.T) {
    // 同一 event_id 两次上报,只入库一次
}

func TestTrackEvent_ChannelFull(t *testing.T) {
    // channel 满时降级,返回 rejected > 0
}

5.3 集成测试(7 RPC 全覆盖)

// integration/dashboard_rpc_test.go
func TestDashboardService_GetTodayOverview_Integration(t *testing.T) {
    // 启动真实 PostgreSQL + statisticService
    // 调用 gRPC GetTodayOverview
    // 验证响应字段对齐前端 mock
}

func TestDashboardService_GetExhibitionIncomeSummary_Integration(t *testing.T) {
    // 验证 top5 排序
    // 验证 total_duration 格式
}

5.4 性能基准(关键路径)

测试项 目标
GetTodayOverview 缓存命中 < 10ms (P99)
GetTodayOverview 缓存未命中 < 200ms (P99)
7 RPC 并发 < 500ms (P99)
TrackEvent 单条 < 50ms (P99)
TrackEvent 批量 100 条 < 100ms (P99)
物化视图刷新(单视图) < 30s
Channel 满载处理 不阻塞业务调用方

5.5 部署清单

新增文件:

backend/services/statisticService/                # 整个目录
backend/proto/statistic.proto
backend/proto/event.proto
backend/gateway/controller/statistic_controller.go
backend/gateway/router/router.go                  # 修改:注册路由
backend/go.work                                  # 修改:加入 statisticService
backend/migrations/                               # 新建:statistic 表迁移
  2026_06_04_001_statistic_events.sql
  2026_06_04_002_statistic_mv_daily_user_income.sql
  2026_06_04_003_statistic_mv_daily_exhibition_revenue.sql
  2026_06_04_004_statistic_mv_daily_like_income.sql
  2026_06_04_005_statistic_mv_asset_level_distribution.sql
  2026_06_04_006_statistic_metric_weekly_user_income.sql
  2026_06_04_007_statistic_metric_recent_level_ups.sql
  2026_06_04_008_statistic_metric_upcoming_level_ups.sql
  2026_06_04_009_statistic_refresh_log.sql
  2026_06_04_010_statistic_partitions_initial.sql
docker/                                          # 修改:加 statisticService 容器

配置项(统计服务 .env):

# 服务端口
STATISTIC_SERVICE_PORT=20009

# 数据库
STATISTIC_DB_URL=postgres://user:pass@postgres:5432/topfans?sslmode=disable
STATISTIC_DB_SCHEMA=statistic

# Redis
STATISTIC_REDIS_URL=redis://redis:6379/0

# 业务服务 URL(用于跨服务调用,获取实时数据)
DUBBO_USER_SERVICE_URL=tri://userservice:20000
DUBBO_GALLERY_SERVICE_URL=tri://galleryservice:20001
DUBBO_ASSET_SERVICE_URL=tri://assetservice:20003
DUBBO_TASK_SERVICE_URL=tri://taskservice:20006
DUBBO_SOCIAL_SERVICE_URL=tri://socialservice:20002

# 事件 channel
STATISTIC_EVENT_CHANNEL_CAPACITY=1000
STATISTIC_EVENT_WORKER_COUNT=1
STATISTIC_EVENT_BATCH_SIZE=100
STATISTIC_EVENT_BATCH_INTERVAL=1s

# 物化视图刷新(可自定义)
STATISTIC_REFRESH_DAILY_USER_INCOME=5m
STATISTIC_REFRESH_DAILY_EXHIBITION_REVENUE=5m
STATISTIC_REFRESH_DAILY_LIKE_INCOME=5m
STATISTIC_REFRESH_ASSET_LEVEL_DISTRIBUTION=15m
STATISTIC_REFRESH_WEEKLY_USER_INCOME=5m
STATISTIC_REFRESH_UPCOMING_LEVEL_UPS=15m

# 分区管理
STATISTIC_PARTITION_RETENTION_DAYS=30
STATISTIC_PARTITION_PRECREATE_DAYS=7

# 预留扩展(本期默认 false)
STATISTIC_ENABLE_OLAP_DUAL_WRITE=false
STATISTIC_ENABLE_REALTIME_CHANNEL=false
STATISTIC_ENABLE_SDK_ENDPOINT=false

5.6 部署顺序

  1. 数据库迁移 - 先执行 10 个 SQL 迁移文件
  2. proto 编译 - protoc 生成 Go 代码
  3. 服务构建 - go build statisticService
  4. gateway 路由注册 - 修改 router.go,加 7 个 dashboard 路由
  5. 前端开关 - frontend/utils/api.js 顶部 USE_MOCK_API = false
  6. 联调测试 - 7 个端点 e2e 通过
  7. 切流量 - 灰度切换(先 10% 流量,再 100%)

5.7 业务侧集成清单(其他服务需要做的事)

需要调用 TrackEvent 的服务:

服务 上报事件 集成位置
socialService asset.like LikeAsset() 写 like_income_log 后
galleryService exhibition.start, exhibition.end StartExhibition() / EndExhibition() 后
taskService exhibition.revenue 定时任务派发收益后
assetService asset.mint, asset.level_up MintAsset() / LevelUp() 后
userService crystal.change 任何水晶变动后

集成方式: 各服务加 statisticClient.TrackEvent(),fire-and-forget 异步调用,失败 log 但不影响主业务。

5.8 灰度发布策略

阶段 范围 验证
Stage 1 statisticService 内部测试 单元测试 + 集成测试
Stage 2 网关联调(后端) 7 个 RPC 端到端通
Stage 3 前端切 10% 流量 监控看板 7 个数据准确性
Stage 4 前端 100% 流量 监控稳定性
Stage 5 业务侧集成(其他服务) TrackEvent 上报,验证物化视图刷新
Stage 6 移除 mock 依赖 关闭前端 USE_MOCK_API

六、风险、后续与验收

6.1 已知风险(16 个)

# 风险 影响 概率 缓解措施
1 粉丝身份越权 看到别人数据 Gateway 校验 fan_profile
2 JSONB 注入 DB 注入 / XSS jsonb_build_object + 前端不直渲
3 MV 刷新阻塞 看板延迟 REFRESH CONCURRENTLY
4 事件表爆炸 磁盘满 按日分区 + 30 天清理
5 week_rank 数据延迟 用户困惑 UI 提示"5 分钟前更新"
6 分区缺失 INSERT 失败 Worker 自动创建
7 多服务同时上报 Channel 满 Level 1 监控 + 告警
8 gateway 路由冲突 其他路径异常 先查现有路由
9 前端 30 分钟缓存 数据新鲜度 强制刷新 + 下拉刷新
10 跨时区错位 看板日期/排名错误 中-高 received_at 统一存 UTC,查询时 AT TIME ZONE 'Asia/Shanghai'
11 冷启动缓存穿透 启动瞬间 DB 压力大 启动时预热 7 个 RPC 缓存;首屏请求合并
12 慢 SQL 锁表 写入阻塞 REFRESH CONCURRENTLY 用 SHARE UPDATE EXCLUSIVE 锁;避开业务高峰
13 TrackEvent 失败永久丢失 数据缺失 中-高 业务方本地 retry 队列;statisticService 落库前不返回成功
14 proto 字段升级兼容 业务方调用报错 字段一律 optional / 带默认值;废弃字段保留 N 版本
15 单用户大量事件(脚本/爬虫) Channel 倾斜 按 user_id 限流(每 user 100 QPS)
16 服务启动依赖 启动期 TrackEvent 失败 业务方重试;statisticService 启动不依赖其他服务

6.2 后续可扩展(不在本期范围)

扩展项 触发条件 工作量 优先级
多赛季切换 赛季结束归档 1 周 P2
赛季总览 Tab 上面完成 + UI 1 周 P2
前端埋点 SDK 前端有团队接 2-3 周 P2
OLAP 双写 ClickHouse 事件量 > 1亿/天 2 周 P3
实时通道(同步 TrackEvent) 业务要求实时 1 周 P3
Channel 满载优化(Level 2-5) channel 满载告警频繁 1 周 P3
数据看板分享海报 运营要求 1 周 P4
7 日曲线交互(点击柱子弹明细) UI 要求 3 天 P4
升级提醒推送 业务要求 1 周 P4
本周 vs 上周对比 运营要求 1 周 P4
业务方自定义事件查询 业务方有强需求 2 周 P4
数据准确性核对工具 数据出现问题时 3 天 工具

6.3 验收清单(本期)

功能性:

  • 7 个看板 RPC 全部联通,数据准确(对照前端 mock)
  • week_rank 完整实现,排名准确
  • ExhibitionIncomeSummary.top5 排序正确
  • Get7DayIncomeCurve.points[].is_peak 标识正确
  • TrackEvent 接收 + 去重 + 批量
  • 物化视图刷新成功(4 个 MV + 3 个 metric 表: 2 个由 Worker 定时刷新 + 1 个由 TrackEvent 同步触发)
  • 预聚合表更新正确(recent_level_ups / upcoming_level_ups)
  • JWT 鉴权 + 粉丝身份校验
  • 跨时区处理(Asia/Shanghai 转换)

性能:

  • 看板单 RPC P99 < 200ms
  • 7 RPC 并发 P99 < 500ms
  • TrackEvent P99 < 50ms
  • 物化视图刷新 < 30s/视图
  • 缓存命中率 > 80%
  • 冷启动不雪崩(预热 + 缓存穿透防护)

可靠性:

  • 单元测试覆盖率 > 80%
  • 集成测试覆盖 100% RPC
  • Channel 满载不阻塞业务
  • MV 刷新失败有告警
  • partition 自动创建/清理
  • 服务异常时不丢关键看板数据
  • TrackEvent 失败业务方有 retry

可观测性:

  • Prometheus 指标完整(20+ 指标)
  • 健康检查端点
  • refresh_log 完整记录
  • 关键告警规则配置(10 条)

集成:

  • Gateway 路由注册正确(7 个 dashboard 路由)
  • 前端 USE_MOCK_API = false 切换
  • 业务侧 5 个服务集成(TrackEvent 上报)
  • Dubbo 服务注册成功(20009)
  • 数据库迁移脚本可重放(10 个 SQL)

安全:

  • 粉丝身份越权防护
  • JSONB 注入防护
  • 限流配置生效
  • CORS 配置(预留 SDK 时)

6.4 文档维护

文档 位置 维护人
本设计文档 docs/superpowers/specs/2026-06-04-statistic-service-design.md 后端主程
API 文档(Swagger) http://gateway:8080/swagger/index.html 自动生成
事件类型注册表 backend/services/statisticService/docs/EVENT_TYPES.md 后端 + 业务方
运维手册 backend/services/statisticService/docs/RUNBOOK.md 运维
常见问题 FAQ backend/services/statisticService/docs/FAQ.md 后端
更新日志 CHANGELOG backend/services/statisticService/CHANGELOG.md 后端

事件类型新增流程:

  1. 业务方提 PR 修改 EVENT_TYPES.md
  2. 后端 review
  3. 合并后业务方按规范上报

6.5 不在本期范围(明确排除)

  • 多赛季历史数据
  • 数据看板分享海报
  • 7 日曲线点击弹窗
  • 升级提醒推送
  • 本周 vs 上周对比
  • 前端埋点 SDK 实现(只预留后端 HTTP 端点)
  • 业务方自定义事件查询 API
  • ClickHouse 集成(只预留开关和 events 表设计)
  • BI 报表工具集成
  • 实时数据流(Flink/Kafka)

文档结束。