3015 lines
94 KiB
Markdown
3015 lines
94 KiB
Markdown
# statisticService 看板 + 事件采集 实施计划
|
||
|
||
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
||
|
||
**Goal:** 实现 `statisticService` 后端服务(Go Dubbo-go),提供数据看板 7 RPC + 事件采集框架(TrackEvent / BatchTrackEvent),并将 5 个业务服务(social / asset / gallery / task / user)改造接入 TrackEvent 上报。
|
||
|
||
**Architecture:**
|
||
- `statisticService`:Go Dubbo-go 服务,端口 20009,暴露 7 个看板 RPC(经 gateway)+ 2 个事件 RPC(内部调用,TrackEvent / BatchTrackEvent)
|
||
- 事件流:业务服务 → Dubbo TrackEvent → ChannelEventSink → event_flusher Worker 批量落库到 `statistic.events` 表(按日分区)
|
||
- 看板数据:4 个物化视图(MV)+ 3 个预聚表(Worker 维护),看板查询走 MV/预聚表,Redis 5min TTL 缓存
|
||
- Gateway:经 `backend/gateway/controller/statistic_controller.go` 暴露 7 个 GET `/api/v1/dashboard/*` 路由,响应统一包装 `{code:200, data:resp}`
|
||
|
||
**Tech Stack:** Go (dubbo-go v3 triple)、GORM、PostgreSQL (按日分区 + 物化视图 + 预聚表)、Redis 缓存、Prometheus 指标、gin (gateway)
|
||
|
||
**Spec:** `docs/superpowers/specs/2026-06-04-statistic-service-design.md`(含 §0.1 本期实施范围)
|
||
|
||
---
|
||
|
||
## 文件结构
|
||
|
||
### 新建
|
||
|
||
```
|
||
backend/
|
||
├── proto/
|
||
│ ├── event.proto # 新建:通用事件类型(独立 proto)
|
||
│ └── statistic.proto # 新建:statistic 服务 + 7 看板 RPC + 2 事件 RPC
|
||
├── pkg/proto/
|
||
│ ├── event/ # 新建:event.proto 编译产物
|
||
│ └── statistic/ # 新建:statistic.proto 编译产物
|
||
├── services/statisticService/ # 新建(目录已存在但为空)
|
||
│ ├── go.mod / go.sum # 新建
|
||
│ ├── main.go # 新建
|
||
│ ├── config/
|
||
│ │ └── statistic_config.go # 新建
|
||
│ ├── model/
|
||
│ │ ├── event.go # 新建:Event 模型
|
||
│ │ └── metric.go # 新建:7 看板响应结构
|
||
│ ├── repository/
|
||
│ │ ├── event_repo.go # 新建:events 表 + 3 预聚表
|
||
│ │ └── dashboard_repo.go # 新建:4 MV 聚合查询
|
||
│ ├── sink/
|
||
│ │ ├── event_sink.go # 新建:EventSink 接口
|
||
│ │ └── channel_sink.go # 新建:ChannelEventSink(本期)
|
||
│ ├── service/
|
||
│ │ ├── event_service.go # 新建:事件校验 + Submit
|
||
│ │ ├── dashboard_service.go # 新建:7 看板 RPC 业务逻辑
|
||
│ │ ├── metric_service.go # 新建:MV 刷新协调
|
||
│ │ └── cache.go # 新建:Redis 缓存封装
|
||
│ ├── provider/
|
||
│ │ ├── statistic_mobile_provider.go # 新建:7 看板 RPC(mobile)
|
||
│ │ └── statistic_internal_provider.go # 新建:TrackEvent / BatchTrackEvent(internal)
|
||
│ ├── worker/
|
||
│ │ ├── event_flusher.go # 新建:事件批量落库
|
||
│ │ ├── metric_recent_level_ups_updater.go # 新建:同步触发
|
||
│ │ ├── metric_upcoming_level_ups_updater.go # 新建:15min ticker
|
||
│ │ ├── metric_weekly_user_income_updater.go # 新建:5min ticker
|
||
│ │ ├── materializer.go # 新建:4 MV 刷新
|
||
│ │ └── partitioner.go # 新建:events 分区管理
|
||
│ ├── handler/
|
||
│ │ └── healthz.go # 新建:健康检查
|
||
│ ├── metrics/
|
||
│ │ └── metrics.go # 新建:Prometheus 指标声明
|
||
│ ├── client/ # 新建:跨服务 RPC 客户端
|
||
│ │ ├── user_rpc_client.go
|
||
│ │ └── gallery_rpc_client.go
|
||
│ └── scripts/ # 新建:测试 fixture
|
||
│ └── testhelper/
|
||
├── migrations/
|
||
│ ├── 2026_06_08_001_statistic_events.sql
|
||
│ ├── 2026_06_08_002_statistic_mv_daily_user_income.sql
|
||
│ ├── 2026_06_08_003_statistic_mv_daily_exhibition_revenue.sql
|
||
│ ├── 2026_06_08_004_statistic_mv_daily_like_income.sql
|
||
│ ├── 2026_06_08_005_statistic_mv_asset_level_distribution.sql
|
||
│ ├── 2026_06_08_006_statistic_metric_weekly_user_income.sql
|
||
│ ├── 2026_06_08_007_statistic_metric_recent_level_ups.sql
|
||
│ ├── 2026_06_08_008_statistic_metric_upcoming_level_ups.sql
|
||
│ ├── 2026_06_08_009_statistic_refresh_log.sql
|
||
│ └── 2026_06_08_010_statistic_partitions_initial.sql
|
||
├── gateway/
|
||
│ ├── router/router.go # 修改:注册 7 路由
|
||
│ └── controller/statistic_controller.go # 新建:7 看板 controller
|
||
└── pkg/statistic/ # 新建:业务侧统一调用 SDK
|
||
└── client.go
|
||
```
|
||
|
||
### 修改
|
||
|
||
```
|
||
backend/
|
||
├── go.work # 修改:加 ./services/statisticService
|
||
├── services/socialService/service/asset_like_service.go # 修改:写 like_income_log 后调 TrackEvent
|
||
├── services/galleryService/service/exhibition_service.go # 修改:PlaceAsset / RemoveFromSlot 后调
|
||
├── services/taskService/service/revenue_service.go # 修改:OnExhibitionCompleted 后调
|
||
├── services/assetService/service/mint_service.go # 修改:CreateMintOrder 后调
|
||
├── services/assetService/service/asset_level_service.go # 修改:CheckUpgrade 后调
|
||
└── services/userService/service/user_service.go # 修改:UpdateCrystalBalance 后调
|
||
```
|
||
|
||
---
|
||
|
||
## 任务编排
|
||
|
||
**4 阶段 / 16 任务 / 每个任务包含 TDD 步骤**:
|
||
|
||
| 阶段 | 任务 | 周期(估)|
|
||
|------|------|----------|
|
||
| **P1 服务骨架** | T1-T3 | 2-3 天 |
|
||
| **P2 事件采集** | T4-T9 | 4-5 天 |
|
||
| **P3 看板 7 RPC** | T10-T14 | 4-5 天 |
|
||
| **P4 业务侧集成** | T15-T16 | 2-3 天 |
|
||
|
||
每个任务结束都应有:所有测试通过 + git commit + 关键验证勾选完成。
|
||
|
||
---
|
||
|
||
## P1 服务骨架
|
||
|
||
### Task 1: 项目初始化 + go.mod / go.work / 目录结构
|
||
|
||
**Files:**
|
||
- Create: `backend/services/statisticService/go.mod`
|
||
- Modify: `backend/go.work`
|
||
|
||
- [ ] **Step 1.1: 创建 go.mod**
|
||
|
||
```bash
|
||
cd backend/services/statisticService
|
||
go mod init github.com/topfans/backend/services/statisticService
|
||
```
|
||
|
||
- [ ] **Step 1.2: 添加核心依赖(沿用 taskService)**
|
||
|
||
```bash
|
||
go get github.com/topfans/backend/pkg/database@v0.0.0
|
||
go get github.com/topfans/backend/pkg/logger@v0.0.0
|
||
go get github.com/topfans/backend/pkg/health@v0.0.0
|
||
go get dubbo.apache.org/dubbo-go/v3
|
||
go get gorm.io/gorm
|
||
go get gorm.io/driver/postgres
|
||
go get github.com/redis/go-redis/v9
|
||
go get github.com/prometheus/client_golang/prometheus
|
||
```
|
||
|
||
> 实际版本号看 go.work 里 taskService 的 go.mod。复制其 `require` 块,删除 taskService 特定的包(assetLevelRepo 等)。
|
||
|
||
- [ ] **Step 1.3: 修改 go.work 加入新服务**
|
||
|
||
在 `backend/go.work` 的 `use` 列表里加:
|
||
```
|
||
./services/statisticService
|
||
```
|
||
|
||
- [ ] **Step 1.4: 创建子目录结构**
|
||
|
||
```bash
|
||
mkdir -p backend/services/statisticService/{config,model,repository,sink,service,provider,worker,handler,metrics,client,scripts/testhelper}
|
||
```
|
||
|
||
- [ ] **Step 1.5: 创建占位 main.go**
|
||
|
||
```go
|
||
// backend/services/statisticService/main.go
|
||
package main
|
||
|
||
import "fmt"
|
||
|
||
func main() {
|
||
fmt.Println("statisticService starting...")
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 1.6: 编译验证**
|
||
|
||
```bash
|
||
cd backend
|
||
go build ./services/statisticService
|
||
```
|
||
|
||
Expected: 编译成功,无输出。
|
||
|
||
- [ ] **Step 1.7: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/ backend/go.work
|
||
git commit -m "feat(statistic): scaffold statisticService with go.mod and directory structure"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 2: proto 定义(event.proto + statistic.proto)
|
||
|
||
**Files:**
|
||
- Create: `backend/proto/event.proto`
|
||
- Create: `backend/proto/statistic.proto`
|
||
|
||
> **关键:proto 冻结点。** P2/P3 业务逻辑依赖此定义。
|
||
|
||
- [ ] **Step 2.1: 创建 event.proto**
|
||
|
||
```bash
|
||
cat > backend/proto/event.proto << 'EOF'
|
||
syntax = "proto3";
|
||
package event;
|
||
option go_package = "github.com/topfans/backend/pkg/proto/event";
|
||
|
||
message Event {
|
||
string event_id = 1;
|
||
int64 user_id = 2;
|
||
int64 star_id = 3;
|
||
string event_type = 4;
|
||
int64 occurred_at = 5;
|
||
int64 received_at = 6;
|
||
map<string, string> properties = 7;
|
||
}
|
||
|
||
message BatchEventRequest {
|
||
repeated Event events = 1;
|
||
}
|
||
EOF
|
||
```
|
||
|
||
- [ ] **Step 2.2: 创建 statistic.proto(对齐 spec §2.3)**
|
||
|
||
```bash
|
||
cat > backend/proto/statistic.proto << 'EOF'
|
||
syntax = "proto3";
|
||
package statistic;
|
||
option go_package = "github.com/topfans/backend/pkg/proto/statistic";
|
||
import "event.proto";
|
||
|
||
service StatisticService {
|
||
rpc GetTodayOverview(GetTodayOverviewRequest) returns (GetTodayOverviewResponse);
|
||
rpc Get7DayIncomeCurve(Get7DayIncomeCurveRequest) returns (Get7DayIncomeCurveResponse);
|
||
rpc GetExhibitionIncomeSummary(GetExhibitionIncomeSummaryRequest) returns (GetExhibitionIncomeSummaryResponse);
|
||
rpc GetLikeIncomeByLevel(GetLikeIncomeByLevelRequest) returns (GetLikeIncomeByLevelResponse);
|
||
rpc GetTopAssetsByEarning(GetTopAssetsByEarningRequest) returns (GetTopAssetsByEarningResponse);
|
||
rpc GetAssetLevelDistribution(GetAssetLevelDistributionRequest) returns (GetAssetLevelDistributionResponse);
|
||
rpc GetAssetUpgradeProgress(GetAssetUpgradeProgressRequest) returns (GetAssetUpgradeProgressResponse);
|
||
rpc TrackEvent(event.Event) returns (TrackEventResponse);
|
||
rpc BatchTrackEvent(event.BatchEventRequest) returns (TrackEventResponse);
|
||
}
|
||
|
||
// ====== 1. 今日概览 ======
|
||
message GetTodayOverviewRequest { int64 star_id = 1; }
|
||
message GetTodayOverviewResponse {
|
||
int64 crystal_balance = 1;
|
||
int64 today_income = 2;
|
||
int32 week_rank = 3;
|
||
int32 week_total_users = 4;
|
||
}
|
||
|
||
// ====== 2. 七日收益曲线 ======
|
||
message Get7DayIncomeCurveRequest { int64 star_id = 1; }
|
||
message DailyIncomePoint {
|
||
string date = 1;
|
||
int64 income = 2;
|
||
bool is_today = 3;
|
||
bool is_peak = 4;
|
||
}
|
||
message Get7DayIncomeCurveResponse {
|
||
repeated DailyIncomePoint points = 1;
|
||
int64 total_income = 2;
|
||
int64 avg_income = 3;
|
||
}
|
||
|
||
// ====== 3. 展出收益中心 ======
|
||
message GetExhibitionIncomeSummaryRequest { int64 star_id = 1; }
|
||
message TopExhibitionItem {
|
||
int64 asset_id = 1;
|
||
string asset_name = 2;
|
||
string asset_thumb = 3;
|
||
string duration_7d = 4;
|
||
int64 earnings_7d = 5;
|
||
int32 avg_earnings = 6;
|
||
}
|
||
message GetExhibitionIncomeSummaryResponse {
|
||
int32 exhibiting_count = 1;
|
||
int32 starbook_count = 2;
|
||
string total_duration = 3;
|
||
int64 total_earnings = 4;
|
||
repeated TopExhibitionItem top5 = 5;
|
||
}
|
||
|
||
// ====== 4. 点赞收益按等级 ======
|
||
message GetLikeIncomeByLevelRequest { int64 star_id = 1; }
|
||
message LikeIncomeLevelItem {
|
||
string level = 1;
|
||
int32 asset_count = 2;
|
||
int64 total_income = 3;
|
||
string thumb = 4;
|
||
}
|
||
message GetLikeIncomeByLevelResponse {
|
||
int64 total_like_count = 1;
|
||
int64 total_income = 2;
|
||
repeated LikeIncomeLevelItem levels = 3;
|
||
}
|
||
|
||
// ====== 5. 藏品 TOP5 ======
|
||
message GetTopAssetsByEarningRequest { int64 star_id = 1; }
|
||
message TopAssetItem {
|
||
int64 asset_id = 1;
|
||
string asset_name = 2;
|
||
string asset_thumb = 3;
|
||
int64 total_earnings = 4;
|
||
int32 rank = 5;
|
||
}
|
||
message GetTopAssetsByEarningResponse {
|
||
repeated TopAssetItem items = 1;
|
||
}
|
||
|
||
// ====== 6. 藏品等级分布 ======
|
||
message GetAssetLevelDistributionRequest { int64 star_id = 1; }
|
||
message AssetLevelItem {
|
||
string level = 1;
|
||
int32 count = 2;
|
||
int32 total = 3;
|
||
}
|
||
message GetAssetLevelDistributionResponse {
|
||
repeated AssetLevelItem items = 1;
|
||
}
|
||
|
||
// ====== 7. 升级进度 ======
|
||
message GetAssetUpgradeProgressRequest { int64 star_id = 1; }
|
||
message UpcomingLevelUpItem {
|
||
int64 asset_id = 1;
|
||
string asset_name = 2;
|
||
string asset_thumb = 3;
|
||
int32 like_progress = 4;
|
||
int32 duration_progress = 5;
|
||
}
|
||
message RecentLevelUpItem {
|
||
int64 asset_id = 1;
|
||
string asset_name = 2;
|
||
string asset_thumb = 3;
|
||
string new_level = 4;
|
||
int64 upgrade_time = 5;
|
||
}
|
||
message GetAssetUpgradeProgressResponse {
|
||
repeated UpcomingLevelUpItem upcoming = 1;
|
||
repeated RecentLevelUpItem recent = 2;
|
||
}
|
||
|
||
// ====== 事件采集响应 ======
|
||
message TrackEventResponse {
|
||
int32 accepted = 1;
|
||
int32 rejected = 2;
|
||
}
|
||
EOF
|
||
```
|
||
|
||
- [ ] **Step 2.3: 编译 proto(生成 Go 代码)**
|
||
|
||
```bash
|
||
cd backend
|
||
# 实际项目用 make / 脚本生成,看 Makefile 或 scripts/ 里 protoc 命令
|
||
ls scripts/ 2>/dev/null
|
||
# 或用标准命令:
|
||
protoc --go_out=pkg/proto --go_triple_out=pkg/proto \
|
||
-I proto proto/event.proto proto/statistic.proto
|
||
```
|
||
|
||
Expected: `pkg/proto/event/` 和 `pkg/proto/statistic/` 下生成 `.pb.go` 和 `.triple.go`。
|
||
|
||
- [ ] **Step 2.4: 验证编译**
|
||
|
||
```bash
|
||
go build ./pkg/proto/event/ ./pkg/proto/statistic/
|
||
```
|
||
|
||
Expected: 编译成功。
|
||
|
||
- [ ] **Step 2.5: Commit**
|
||
|
||
```bash
|
||
git add backend/proto/ backend/pkg/proto/event/ backend/pkg/proto/statistic/
|
||
git commit -m "feat(statistic): add event.proto and statistic.proto with 7 RPCs + 2 event RPCs"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 3: config + main.go 启动 + healthz + metrics 骨架 + 10 SQL 迁移
|
||
|
||
**Files:**
|
||
- Create: `backend/services/statisticService/config/statistic_config.go`
|
||
- Modify: `backend/services/statisticService/main.go`
|
||
- Create: `backend/services/statisticService/handler/healthz.go`
|
||
- Create: `backend/services/statisticService/metrics/metrics.go`
|
||
- Create: 10 SQL migration files under `backend/migrations/`
|
||
|
||
- [ ] **Step 3.1: 创建 config(沿用 taskService 模式)**
|
||
|
||
```go
|
||
// backend/services/statisticService/config/statistic_config.go
|
||
package config
|
||
|
||
import (
|
||
"flag"
|
||
"log"
|
||
"os"
|
||
"strconv"
|
||
"time"
|
||
)
|
||
|
||
type DatabaseConfig struct {
|
||
Host, Password, DBName, SSLMode, TimeZone string
|
||
Port int
|
||
User string
|
||
Schema string
|
||
}
|
||
|
||
type RedisConfig struct {
|
||
URL string
|
||
}
|
||
|
||
type RefreshIntervals struct {
|
||
DailyUserIncome time.Duration
|
||
DailyExhibitionRevenue time.Duration
|
||
DailyLikeIncome time.Duration
|
||
AssetLevelDistribution time.Duration
|
||
WeeklyUserIncome time.Duration
|
||
UpcomingLevelUps time.Duration
|
||
}
|
||
|
||
type ChannelConfig struct {
|
||
EventChannelCapacity int
|
||
EventWorkerCount int
|
||
EventBatchSize int
|
||
EventBatchInterval time.Duration
|
||
}
|
||
|
||
type PartitionConfig struct {
|
||
RetentionDays int
|
||
PreCreateDays int
|
||
}
|
||
|
||
type ExtensionConfig struct {
|
||
EnableOLAPDualWrite bool
|
||
EnableRealtimeChannel bool
|
||
EnableSDKEndpoint bool
|
||
EnableSampling bool
|
||
}
|
||
|
||
var (
|
||
DBConfig = &DatabaseConfig{Schema: "statistic"}
|
||
RedisCfg = &RedisConfig{URL: "redis://localhost:6379/0"}
|
||
RefreshCfg = &RefreshIntervals{
|
||
DailyUserIncome: 5 * time.Minute,
|
||
DailyExhibitionRevenue: 5 * time.Minute,
|
||
DailyLikeIncome: 5 * time.Minute,
|
||
AssetLevelDistribution: 15 * time.Minute,
|
||
WeeklyUserIncome: 5 * time.Minute,
|
||
UpcomingLevelUps: 15 * time.Minute,
|
||
}
|
||
ChannelCfg = &ChannelConfig{
|
||
EventChannelCapacity: 1000,
|
||
EventWorkerCount: 1,
|
||
EventBatchSize: 100,
|
||
EventBatchInterval: 1 * time.Second,
|
||
}
|
||
PartitionCfg = &PartitionConfig{RetentionDays: 30, PreCreateDays: 7}
|
||
ExtCfg = &ExtensionConfig{ // 全部默认 false
|
||
EnableOLAPDualWrite: false,
|
||
EnableRealtimeChannel: false,
|
||
EnableSDKEndpoint: false,
|
||
EnableSampling: false,
|
||
}
|
||
)
|
||
|
||
func getEnv(key, fallback string) string {
|
||
if v := os.Getenv(key); v != "" { return v }
|
||
return fallback
|
||
}
|
||
|
||
func getEnvInt(key string, fallback int) int {
|
||
if v := os.Getenv(key); v != "" {
|
||
if n, err := strconv.Atoi(v); err == nil { return n }
|
||
}
|
||
return fallback
|
||
}
|
||
|
||
func getEnvDuration(key string, fallback time.Duration) time.Duration {
|
||
if v := os.Getenv(key); v != "" {
|
||
if d, err := time.ParseDuration(v); err == nil { return d }
|
||
}
|
||
return fallback
|
||
}
|
||
|
||
func getEnvBool(key string, fallback bool) bool {
|
||
if v := os.Getenv(key); v != "" {
|
||
if b, err := strconv.ParseBool(v); err == nil { return b }
|
||
}
|
||
return fallback
|
||
}
|
||
|
||
func InitConfig() {
|
||
flag.StringVar(&DBConfig.Host, "db-host", getEnv("STATISTIC_DB_HOST", "localhost"), "")
|
||
flag.IntVar(&DBConfig.Port, "db-port", getEnvInt("STATISTIC_DB_PORT", 5432), "")
|
||
flag.StringVar(&DBConfig.User, "db-user", getEnv("STATISTIC_DB_USER", "postgres"), "")
|
||
flag.StringVar(&DBConfig.Password, "db-password", getEnv("STATISTIC_DB_PASSWORD", ""), "")
|
||
flag.StringVar(&DBConfig.DBName, "db-name", getEnv("STATISTIC_DB_NAME", "topfans"), "")
|
||
flag.StringVar(&DBConfig.SSLMode, "db-sslmode", "disable", "")
|
||
flag.StringVar(&DBConfig.Schema, "db-schema", getEnv("STATISTIC_DB_SCHEMA", "statistic"), "")
|
||
|
||
flag.StringVar(&RedisCfg.URL, "redis-url", getEnv("STATISTIC_REDIS_URL", "redis://localhost:6379/0"), "")
|
||
|
||
flag.IntVar(&ChannelCfg.EventChannelCapacity, "event-channel-capacity", getEnvInt("STATISTIC_EVENT_CHANNEL_CAPACITY", 1000), "")
|
||
flag.IntVar(&ChannelCfg.EventBatchSize, "event-batch-size", getEnvInt("STATISTIC_EVENT_BATCH_SIZE", 100), "")
|
||
flag.DurationVar(&ChannelCfg.EventBatchInterval, "event-batch-interval", getEnvDuration("STATISTIC_EVENT_BATCH_INTERVAL", time.Second), "")
|
||
|
||
flag.IntVar(&PartitionCfg.RetentionDays, "partition-retention-days", getEnvInt("STATISTIC_PARTITION_RETENTION_DAYS", 30), "")
|
||
flag.IntVar(&PartitionCfg.PreCreateDays, "partition-precreate-days", getEnvInt("STATISTIC_PARTITION_PRECREATE_DAYS", 7), "")
|
||
|
||
flag.BoolVar(&ExtCfg.EnableOLAPDualWrite, "enable-olap", getEnvBool("STATISTIC_ENABLE_OLAP_DUAL_WRITE", false), "")
|
||
flag.BoolVar(&ExtCfg.EnableRealtimeChannel, "enable-realtime", getEnvBool("STATISTIC_ENABLE_REALTIME_CHANNEL", false), "")
|
||
flag.BoolVar(&ExtCfg.EnableSDKEndpoint, "enable-sdk", getEnvBool("STATISTIC_ENABLE_SDK_ENDPOINT", false), "")
|
||
flag.BoolVar(&ExtCfg.EnableSampling, "enable-sampling", getEnvBool("STATISTIC_ENABLE_SAMPLING", false), "")
|
||
|
||
flag.Parse()
|
||
log.Println("statisticService 配置初始化完成")
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 3.2: 创建 metrics 骨架(声明所有指标,数据后续埋)**
|
||
|
||
```go
|
||
// backend/services/statisticService/metrics/metrics.go
|
||
package metrics
|
||
|
||
import (
|
||
"github.com/prometheus/client_golang/prometheus"
|
||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||
)
|
||
|
||
// 看板 RPC
|
||
var DashboardRPCTotal = promauto.NewCounterVec(prometheus.CounterOpts{
|
||
Name: "dashboard_rpc_total", Help: "Dashboard RPC total",
|
||
}, []string{"rpc", "status"})
|
||
|
||
var DashboardRPCDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
|
||
Name: "dashboard_rpc_duration_seconds", Help: "Dashboard RPC duration",
|
||
}, []string{"rpc"})
|
||
|
||
var DashboardCacheHitRate = promauto.NewGauge(prometheus.GaugeOpts{
|
||
Name: "dashboard_cache_hit_rate", Help: "Dashboard cache hit rate",
|
||
})
|
||
|
||
// 事件采集
|
||
var EventTrackTotal = promauto.NewCounterVec(prometheus.CounterOpts{
|
||
Name: "event_track_total", Help: "Event track total",
|
||
}, []string{"event_type", "result"})
|
||
|
||
var EventChannelSize = promauto.NewGauge(prometheus.GaugeOpts{
|
||
Name: "event_channel_size", Help: "Event channel current size",
|
||
})
|
||
|
||
var EventChannelCapacity = promauto.NewGauge(prometheus.GaugeOpts{
|
||
Name: "event_channel_capacity", Help: "Event channel capacity",
|
||
})
|
||
|
||
var EventDroppedTotal = promauto.NewCounter(prometheus.CounterOpts{
|
||
Name: "event_dropped_total", Help: "Event dropped total",
|
||
})
|
||
|
||
var EventDBInsertTotal = promauto.NewCounterVec(prometheus.CounterOpts{
|
||
Name: "event_db_insert_total", Help: "Event DB insert total",
|
||
}, []string{"status"})
|
||
|
||
// 物化视图
|
||
var MVRefreshTotal = promauto.NewCounterVec(prometheus.CounterOpts{
|
||
Name: "mv_refresh_total", Help: "Materialized view refresh total",
|
||
}, []string{"mv_name", "status"})
|
||
|
||
var MVRefreshDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
|
||
Name: "mv_refresh_duration_seconds", Help: "MV refresh duration",
|
||
}, []string{"mv_name"})
|
||
|
||
// Worker
|
||
var WorkerRunningCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||
Name: "worker_running_count", Help: "Worker running count",
|
||
}, []string{"worker_name"})
|
||
|
||
// 分区
|
||
var EventsPartitionCount = promauto.NewGauge(prometheus.GaugeOpts{
|
||
Name: "events_partition_count", Help: "Events partition count",
|
||
})
|
||
```
|
||
|
||
- [ ] **Step 3.3: 创建 healthz**
|
||
|
||
```go
|
||
// backend/services/statisticService/handler/healthz.go
|
||
package handler
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"net/http"
|
||
"time"
|
||
|
||
"github.com/gin-gonic/gin"
|
||
"github.com/redis/go-redis/v9"
|
||
"github.com/topfans/backend/services/statisticService/metrics"
|
||
)
|
||
|
||
type Healthz struct {
|
||
db *sql.DB
|
||
redis *redis.Client
|
||
}
|
||
|
||
func NewHealthz(db *sql.DB, redis *redis.Client) *Healthz {
|
||
return &Healthz{db: db, redis: redis}
|
||
}
|
||
|
||
func (h *Healthz) Register(r *gin.Engine, port int) {
|
||
r.GET("/metrics", gin.WrapH(metricsHandler())) // 暴露 Prometheus
|
||
r.GET("/healthz", func(c *gin.Context) {
|
||
ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second)
|
||
defer cancel()
|
||
|
||
status := gin.H{"status": "ok"}
|
||
if err := h.db.PingContext(ctx); err != nil {
|
||
status["db"] = "down"
|
||
} else {
|
||
status["db"] = "up"
|
||
}
|
||
if err := h.redis.Ping(ctx).Err(); err != nil {
|
||
status["redis"] = "down"
|
||
} else {
|
||
status["redis"] = "up"
|
||
}
|
||
c.JSON(http.StatusOK, status)
|
||
})
|
||
}
|
||
```
|
||
|
||
> **实际项目**用 `pkg/health` 组件,参考 taskService main.go:131-133。`health.NewHandler("statistic-service", healthPort).Start()`。本计划用自实现版,最终应替换为项目既有组件。
|
||
|
||
- [ ] **Step 3.4: 创建 main.go(启动骨架,不含业务)**
|
||
|
||
```go
|
||
// backend/services/statisticService/main.go
|
||
package main
|
||
|
||
import (
|
||
"database/sql"
|
||
"flag"
|
||
"fmt"
|
||
"os"
|
||
"os/signal"
|
||
"syscall"
|
||
|
||
_ "github.com/lib/pq"
|
||
"github.com/redis/go-redis/v9"
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"github.com/topfans/backend/services/statisticService/config"
|
||
"github.com/topfans/backend/services/statisticService/handler"
|
||
)
|
||
|
||
var port = flag.Int("port", 20009, "Dubbo service port")
|
||
|
||
func main() {
|
||
env := os.Getenv("ENV")
|
||
if env == "" { env = "development" }
|
||
if err := logger.Init(logger.Config{ServiceName: "statistic-service", Environment: env}); err != nil {
|
||
panic(fmt.Sprintf("Failed to init logger: %v", err))
|
||
}
|
||
defer logger.Sync()
|
||
|
||
config.InitConfig()
|
||
|
||
// DB
|
||
db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
|
||
config.DBConfig.Host, config.DBConfig.Port, config.DBConfig.User, config.DBConfig.Password,
|
||
config.DBConfig.DBName, config.DBConfig.SSLMode))
|
||
if err != nil { logger.Logger.Fatal(fmt.Sprintf("Failed to open DB: %v", err)) }
|
||
db.SetMaxOpenConns(50)
|
||
if err := db.Ping(); err != nil { logger.Logger.Fatal(fmt.Sprintf("DB ping: %v", err)) }
|
||
defer db.Close()
|
||
|
||
// Redis
|
||
opt, _ := redis.ParseURL(config.RedisCfg.URL)
|
||
rdb := redis.NewClient(opt)
|
||
defer rdb.Close()
|
||
|
||
// Healthz HTTP server on port+1000=21009
|
||
healthPort := *port + 1000
|
||
h := handler.NewHealthz(db, rdb)
|
||
go h.Start(healthPort)
|
||
|
||
logger.Logger.Info(fmt.Sprintf("statisticService started, dubbo=%d healthz=%d", *port, healthPort))
|
||
|
||
// Graceful shutdown
|
||
quit := make(chan os.Signal, 1)
|
||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||
<-quit
|
||
}
|
||
```
|
||
|
||
并在 healthz.go 加 `Start` 方法:
|
||
|
||
```go
|
||
// 加到 backend/services/statisticService/handler/healthz.go
|
||
func (h *Healthz) Start(port int) {
|
||
r := gin.Default()
|
||
h.Register(r, port)
|
||
go func() {
|
||
if err := r.Run(fmt.Sprintf(":%d", port)); err != nil {
|
||
logger.Logger.Fatal(...)
|
||
}
|
||
}()
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 3.5: 创建 10 个 SQL 迁移文件(参考 spec §3.2-3.5)**
|
||
|
||
每个文件包含一个 DDL + 必要的索引。**关键 SQL 摘要:**
|
||
|
||
`backend/migrations/2026_06_08_001_statistic_events.sql`:
|
||
|
||
```sql
|
||
CREATE SCHEMA IF NOT EXISTS statistic;
|
||
|
||
CREATE TABLE IF NOT EXISTS statistic.events (
|
||
id BIGSERIAL,
|
||
event_id UUID NOT NULL,
|
||
user_id BIGINT NOT NULL,
|
||
star_id BIGINT NOT NULL,
|
||
event_type VARCHAR(64) NOT NULL,
|
||
occurred_at TIMESTAMPTZ NOT NULL,
|
||
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||
properties JSONB NOT NULL DEFAULT '{}',
|
||
PRIMARY KEY (id, received_at)
|
||
) PARTITION BY RANGE (received_at);
|
||
|
||
CREATE UNIQUE INDEX IF NOT EXISTS idx_events_event_id ON statistic.events (event_id, received_at);
|
||
CREATE INDEX IF NOT EXISTS idx_events_user_star_type_time
|
||
ON statistic.events (user_id, star_id, event_type, received_at DESC);
|
||
CREATE INDEX IF NOT EXISTS idx_events_star_type_time
|
||
ON statistic.events (star_id, event_type, received_at DESC);
|
||
CREATE INDEX IF NOT EXISTS idx_events_properties_gin ON statistic.events USING GIN (properties);
|
||
|
||
-- 初始 7 天分区(从今天起)
|
||
DO $$
|
||
DECLARE
|
||
i INT;
|
||
d DATE;
|
||
n DATE;
|
||
BEGIN
|
||
FOR i IN 0..6 LOOP
|
||
d := CURRENT_DATE + i;
|
||
n := d + 1;
|
||
EXECUTE format(
|
||
'CREATE TABLE IF NOT EXISTS statistic.events_%s PARTITION OF statistic.events FOR VALUES FROM (%L) TO (%L)',
|
||
to_char(d, 'YYYY_MM_DD'), d::text, n::text
|
||
);
|
||
END LOOP;
|
||
END $$;
|
||
```
|
||
|
||
`2026_06_08_002_statistic_mv_daily_user_income.sql` - 见 spec §3.4 MV1 SQL
|
||
|
||
`2026_06_08_003_statistic_mv_daily_exhibition_revenue.sql` - 见 spec §3.4 MV2 SQL
|
||
|
||
`2026_06_08_004_statistic_mv_daily_like_income.sql` - 见 spec §3.4 MV3 SQL
|
||
|
||
`2026_06_08_005_statistic_mv_asset_level_distribution.sql` - 见 spec §3.4 MV4 SQL
|
||
|
||
`2026_06_08_006_statistic_metric_weekly_user_income.sql` - 见 spec §3.5 weekly 表
|
||
|
||
`2026_06_08_007_statistic_metric_recent_level_ups.sql` - 见 spec §3.5 recent_level_ups 表
|
||
|
||
`2026_06_08_008_statistic_metric_upcoming_level_ups.sql` - 见 spec §3.5 upcoming_level_ups 表
|
||
|
||
`2026_06_08_009_statistic_refresh_log.sql`:
|
||
|
||
```sql
|
||
CREATE TABLE IF NOT EXISTS statistic.refresh_log (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
mv_name VARCHAR(128) NOT NULL,
|
||
started_at TIMESTAMPTZ NOT NULL,
|
||
finished_at TIMESTAMPTZ,
|
||
row_count BIGINT,
|
||
status VARCHAR(16) NOT NULL,
|
||
error_message TEXT
|
||
);
|
||
CREATE INDEX IF NOT EXISTS idx_refresh_log_mv_time
|
||
ON statistic.refresh_log (mv_name, started_at DESC);
|
||
```
|
||
|
||
- [ ] **Step 3.6: 跑 DB 迁移**
|
||
|
||
```bash
|
||
cd backend
|
||
# 用项目现有的迁移工具(看 scripts/)
|
||
ls scripts/ 2>/dev/null
|
||
# 或手动 psql:
|
||
psql -h localhost -U postgres -d topfans -f migrations/2026_06_08_001_statistic_events.sql
|
||
# ... 10 个文件依次执行
|
||
```
|
||
|
||
Expected: 10 个 SQL 全部成功,\dt statistic.* 看到 10+ 表。
|
||
|
||
- [ ] **Step 3.7: 编译 + 运行**
|
||
|
||
```bash
|
||
go build ./services/statisticService
|
||
./services/statisticService/bin/statisticService
|
||
```
|
||
|
||
Expected: 启动日志显示 port 20009 + healthz 21009。
|
||
|
||
- [ ] **Step 3.8: 验证 healthz**
|
||
|
||
```bash
|
||
curl http://localhost:21009/healthz
|
||
```
|
||
|
||
Expected: `{"db":"up","redis":"up","status":"ok"}`
|
||
|
||
- [ ] **Step 3.9: 跑 P1 末预检查清单**
|
||
|
||
> 必做!spec §0.1.3 P1 末预检查清单 10 项。
|
||
|
||
```bash
|
||
# 1. socialService
|
||
grep -r "like_income_log" backend/services/socialService/
|
||
# 2. galleryService
|
||
grep -rn "^func.*PlaceAsset\|^func.*RemoveFromSlot" backend/services/galleryService/service/
|
||
# 3. taskService
|
||
grep -n "OnExhibitionCompleted" backend/services/taskService/service/revenue_service.go
|
||
# 4. assetService
|
||
grep -n "CreateMintOrder\|CheckUpgrade\|logLevelChange" backend/services/assetService/service/{mint_service,asset_level_service}.go
|
||
# 5. userService
|
||
grep -n "UpdateCrystalBalance" backend/services/userService/service/user_service.go
|
||
# 6. public.assets 表结构
|
||
psql -c "\d+ public.assets"
|
||
# 7. 公共日志表
|
||
psql -c "\dt public.*log*"
|
||
# 8. 端口
|
||
lsof -i :20009
|
||
# 9. 业务服务可达
|
||
nc -zv localhost 20000; nc -zv localhost 20001; nc -zv localhost 20002; nc -zv localhost 20003; nc -zv localhost 20006
|
||
# 10. schema 权限
|
||
psql -U postgres -c "CREATE SCHEMA statistic_test_perm; DROP SCHEMA statistic_test_perm;"
|
||
```
|
||
|
||
Expected: 全部通过。
|
||
|
||
- [ ] **Step 3.10: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/ backend/migrations/
|
||
git commit -m "feat(statistic): P1 scaffolding - config, main, healthz, metrics, 10 SQL migrations"
|
||
```
|
||
|
||
---
|
||
|
||
## P2 事件采集
|
||
|
||
### Task 4: Event 模型 + EventSink 接口 + ChannelEventSink
|
||
|
||
**Files:**
|
||
- Create: `backend/services/statisticService/model/event.go`
|
||
- Create: `backend/services/statisticService/sink/event_sink.go`
|
||
- Create: `backend/services/statisticService/sink/channel_sink.go`
|
||
- Create: `backend/services/statisticService/sink/channel_sink_test.go`
|
||
|
||
- [ ] **Step 4.1: 写 Event 模型(写测试先)**
|
||
|
||
`backend/services/statisticService/model/event_test.go`:
|
||
|
||
```go
|
||
package model
|
||
|
||
import (
|
||
"testing"
|
||
"time"
|
||
)
|
||
|
||
func TestEvent_ToJSON(t *testing.T) {
|
||
e := &Event{
|
||
EventID: "uuid-123",
|
||
UserID: 100,
|
||
StarID: 1,
|
||
EventType: "asset.like",
|
||
OccurredAt: time.Unix(1700000000, 0),
|
||
ReceivedAt: time.Unix(1700000001, 0),
|
||
Properties: map[string]string{"asset_id": "456"},
|
||
}
|
||
j := e.ToJSON()
|
||
if j["event_id"] != "uuid-123" { t.Fatal("event_id mismatch") }
|
||
if j["properties"].(map[string]interface{})["asset_id"] != "456" { t.Fatal("properties mismatch") }
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 4.2: 跑测试(应失败)**
|
||
|
||
```bash
|
||
cd backend/services/statisticService
|
||
go test ./model/ -v
|
||
```
|
||
|
||
Expected: FAIL - "undefined: Event"
|
||
|
||
- [ ] **Step 4.3: 实现 Event**
|
||
|
||
`backend/services/statisticService/model/event.go`:
|
||
|
||
```go
|
||
package model
|
||
|
||
import "time"
|
||
|
||
type Event struct {
|
||
EventID string `json:"event_id"`
|
||
UserID int64 `json:"user_id"`
|
||
StarID int64 `json:"star_id"`
|
||
EventType string `json:"event_type"`
|
||
OccurredAt time.Time `json:"occurred_at"`
|
||
ReceivedAt time.Time `json:"received_at"`
|
||
Properties map[string]string `json:"properties"`
|
||
}
|
||
|
||
func (e *Event) ToJSON() map[string]interface{} {
|
||
return map[string]interface{}{
|
||
"event_id": e.EventID,
|
||
"user_id": e.UserID,
|
||
"star_id": e.StarID,
|
||
"event_type": e.EventType,
|
||
"occurred_at": e.OccurredAt.UnixMilli(),
|
||
"received_at": e.ReceivedAt.UnixMilli(),
|
||
"properties": e.Properties,
|
||
}
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 4.4: 跑测试(应通过)**
|
||
|
||
```bash
|
||
go test ./model/ -v
|
||
```
|
||
|
||
Expected: PASS
|
||
|
||
- [ ] **Step 4.5: 写 EventSink 接口 + ChannelEventSink 测试**
|
||
|
||
`backend/services/statisticService/sink/channel_sink_test.go`:
|
||
|
||
```go
|
||
package sink
|
||
|
||
import (
|
||
"context"
|
||
"testing"
|
||
"time"
|
||
)
|
||
|
||
func TestChannelEventSink_Submit(t *testing.T) {
|
||
ch := make(chan *model.Event, 10)
|
||
s := NewChannelEventSink(ch)
|
||
e := &model.Event{EventID: "test-1"}
|
||
if err := s.Submit(context.Background(), e); err != nil {
|
||
t.Fatalf("Submit failed: %v", err)
|
||
}
|
||
select {
|
||
case got := <-ch:
|
||
if got.EventID != "test-1" { t.Fatal("event mismatch") }
|
||
case <-time.After(100 * time.Millisecond):
|
||
t.Fatal("no event received")
|
||
}
|
||
}
|
||
|
||
func TestChannelEventSink_SubmitBatch(t *testing.T) {
|
||
ch := make(chan *model.Event, 10)
|
||
s := NewChannelEventSink(ch)
|
||
events := []*model.Event{{EventID: "a"}, {EventID: "b"}}
|
||
if err := s.SubmitBatch(context.Background(), events); err != nil {
|
||
t.Fatalf("SubmitBatch failed: %v", err)
|
||
}
|
||
if len(ch) != 2 { t.Fatalf("expected 2 events, got %d", len(ch)) }
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 4.6: 跑测试(应失败)**
|
||
|
||
```bash
|
||
go test ./sink/ -v
|
||
```
|
||
|
||
Expected: FAIL
|
||
|
||
- [ ] **Step 4.7: 实现接口和 ChannelEventSink**
|
||
|
||
`backend/services/statisticService/sink/event_sink.go`:
|
||
|
||
```go
|
||
package sink
|
||
|
||
import (
|
||
"context"
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
)
|
||
|
||
type EventSink interface {
|
||
Submit(ctx context.Context, e *model.Event) error
|
||
SubmitBatch(ctx context.Context, es []*model.Event) error
|
||
Close() error
|
||
}
|
||
```
|
||
|
||
`backend/services/statisticService/sink/channel_sink.go`:
|
||
|
||
```go
|
||
package sink
|
||
|
||
import (
|
||
"context"
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
)
|
||
|
||
type ChannelEventSink struct {
|
||
ch chan<- *model.Event
|
||
}
|
||
|
||
func NewChannelEventSink(ch chan<- *model.Event) *ChannelEventSink {
|
||
return &ChannelEventSink{ch: ch}
|
||
}
|
||
|
||
func (s *ChannelEventSink) Submit(ctx context.Context, e *model.Event) error {
|
||
select {
|
||
case s.ch <- e:
|
||
return nil
|
||
default:
|
||
return ErrChannelFull
|
||
}
|
||
}
|
||
|
||
func (s *ChannelEventSink) SubmitBatch(ctx context.Context, es []*model.Event) error {
|
||
for _, e := range es {
|
||
if err := s.Submit(ctx, e); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *ChannelEventSink) Close() error { return nil }
|
||
|
||
var ErrChannelFull = errors.New("event channel full")
|
||
```
|
||
|
||
补 `import "errors"`。
|
||
|
||
- [ ] **Step 4.8: 跑测试(应通过)**
|
||
|
||
```bash
|
||
go test ./sink/ -v
|
||
```
|
||
|
||
Expected: PASS
|
||
|
||
- [ ] **Step 4.9: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/model/ backend/services/statisticService/sink/
|
||
git commit -m "feat(statistic): Event model, EventSink interface, ChannelEventSink (P2 T4)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 5: event_repo + event_service(事件校验 + 落库)
|
||
|
||
**Files:**
|
||
- Create: `backend/services/statisticService/repository/event_repo.go`
|
||
- Create: `backend/services/statisticService/service/event_service.go`
|
||
- Tests
|
||
|
||
- [ ] **Step 5.1: 写 event_repo 批量插入测试**
|
||
|
||
`backend/services/statisticService/repository/event_repo_test.go`:
|
||
|
||
```go
|
||
package repository
|
||
|
||
import (
|
||
"context"
|
||
"os"
|
||
"testing"
|
||
"time"
|
||
)
|
||
|
||
func setupTestDB(t *testing.T) (*sql.DB, func()) {
|
||
dsn := os.Getenv("TEST_DATABASE_URL")
|
||
if dsn == "" {
|
||
t.Skip("TEST_DATABASE_URL not set, skipping integration test")
|
||
}
|
||
db, err := sql.Open("postgres", dsn)
|
||
if err != nil { t.Fatal(err) }
|
||
// 用 schema=test, 每次 truncate events
|
||
if _, err := db.Exec("CREATE SCHEMA IF NOT EXISTS statistic_test"); err != nil { t.Fatal(err) }
|
||
return db, func() {
|
||
db.Exec("TRUNCATE TABLE statistic_test.events")
|
||
db.Close()
|
||
}
|
||
}
|
||
|
||
func TestEventRepo_InsertBatch(t *testing.T) {
|
||
db, cleanup := setupTestDB(t)
|
||
defer cleanup()
|
||
repo := NewEventRepository(db, "statistic_test")
|
||
events := []*model.Event{
|
||
{EventID: "uuid-1", UserID: 100, StarID: 1, EventType: "asset.like",
|
||
OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{"asset_id": "456"}},
|
||
}
|
||
inserted, err := repo.InsertBatch(context.Background(), events)
|
||
if err != nil { t.Fatal(err) }
|
||
if inserted != 1 { t.Fatalf("expected 1, got %d", inserted) }
|
||
}
|
||
|
||
func TestEventRepo_Dedup(t *testing.T) {
|
||
db, cleanup := setupTestDB(t)
|
||
defer cleanup()
|
||
repo := NewEventRepository(db, "statistic_test")
|
||
e := &model.Event{EventID: "uuid-dup", UserID: 100, StarID: 1, EventType: "asset.like",
|
||
OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{}}
|
||
repo.InsertBatch(context.Background(), []*model.Event{e})
|
||
inserted, _ := repo.InsertBatch(context.Background(), []*model.Event{e}) // 重复
|
||
if inserted != 0 { t.Fatalf("expected 0 (dedup), got %d", inserted) }
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 5.2: 跑测试(应失败)**
|
||
|
||
```bash
|
||
TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5432/topfans_test?sslmode=disable" \
|
||
go test ./repository/ -v -run TestEventRepo
|
||
```
|
||
|
||
Expected: FAIL
|
||
|
||
- [ ] **Step 5.3: 实现 event_repo**
|
||
|
||
`backend/services/statisticService/repository/event_repo.go`:
|
||
|
||
```go
|
||
package repository
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"encoding/json"
|
||
"fmt"
|
||
"strings"
|
||
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
)
|
||
|
||
type EventRepository struct {
|
||
db *sql.DB
|
||
schema string
|
||
}
|
||
|
||
func NewEventRepository(db *sql.DB, schema string) *EventRepository {
|
||
return &EventRepository{db: db, schema: schema}
|
||
}
|
||
|
||
func (r *EventRepository) InsertBatch(ctx context.Context, events []*model.Event) (int, error) {
|
||
if len(events) == 0 { return 0, nil }
|
||
|
||
placeholders := make([]string, 0, len(events))
|
||
args := make([]interface{}, 0, len(events)*7)
|
||
for _, e := range events {
|
||
props, _ := json.Marshal(e.Properties)
|
||
placeholders = append(placeholders, fmt.Sprintf("($%d, $%d, $%d, $%d, $%d, $%d, $%d)",
|
||
len(args)+1, len(args)+2, len(args)+3, len(args)+4, len(args)+5, len(args)+6, len(args)+7))
|
||
args = append(args, e.EventID, e.UserID, e.StarID, e.EventType, e.OccurredAt, e.ReceivedAt, string(props))
|
||
}
|
||
|
||
query := fmt.Sprintf(`
|
||
INSERT INTO %s.events (event_id, user_id, star_id, event_type, occurred_at, received_at, properties)
|
||
VALUES %s
|
||
ON CONFLICT (event_id, received_at) DO NOTHING
|
||
`, r.schema, strings.Join(placeholders, ","))
|
||
|
||
res, err := r.db.ExecContext(ctx, query, args...)
|
||
if err != nil { return 0, err }
|
||
n, _ := res.RowsAffected()
|
||
return int(n), nil
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 5.4: 跑测试(应通过)**
|
||
|
||
```bash
|
||
TEST_DATABASE_URL="postgres://postgres:postgres@localhost:5432/topfans_test?sslmode=disable" \
|
||
go test ./repository/ -v -run TestEventRepo
|
||
```
|
||
|
||
Expected: PASS
|
||
|
||
- [ ] **Step 5.5: 写 event_service 校验测试**
|
||
|
||
`backend/services/statisticService/service/event_service_test.go`:
|
||
|
||
```go
|
||
package service
|
||
|
||
import (
|
||
"context"
|
||
"testing"
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
"github.com/topfans/backend/services/statisticService/sink"
|
||
)
|
||
|
||
type mockSink struct{ events []*model.Event }
|
||
func (m *mockSink) Submit(ctx context.Context, e *model.Event) error {
|
||
m.events = append(m.events, e); return nil
|
||
}
|
||
func (m *mockSink) SubmitBatch(ctx context.Context, es []*model.Event) error {
|
||
m.events = append(m.events, es...); return nil
|
||
}
|
||
func (m *mockSink) Close() error { return nil }
|
||
|
||
func TestEventService_TrackEvent_Success(t *testing.T) {
|
||
ms := &mockSink{}
|
||
svc := NewEventService(ms, []string{"asset.like", "asset.mint"})
|
||
resp, err := svc.TrackEvent(context.Background(), &model.Event{
|
||
EventID: "u1", UserID: 1, StarID: 1, EventType: "asset.like",
|
||
OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{},
|
||
})
|
||
if err != nil { t.Fatal(err) }
|
||
if resp.Accepted != 1 { t.Fatal("expected accepted=1") }
|
||
if len(ms.events) != 1 { t.Fatal("event not submitted") }
|
||
}
|
||
|
||
func TestEventService_TrackEvent_InvalidEventType(t *testing.T) {
|
||
ms := &mockSink{}
|
||
svc := NewEventService(ms, []string{"asset.like"})
|
||
_, err := svc.TrackEvent(context.Background(), &model.Event{EventType: "evil.type"})
|
||
if err == nil { t.Fatal("expected error for invalid event type") }
|
||
}
|
||
|
||
func TestEventService_TrackEvent_PropertiesTooLarge(t *testing.T) {
|
||
ms := &mockSink{}
|
||
svc := NewEventService(ms, []string{"asset.like"})
|
||
big := make(map[string]string)
|
||
for i := 0; i < 2000; i++ { big[fmt.Sprintf("k%d", i)] = strings.Repeat("x", 100) }
|
||
_, err := svc.TrackEvent(context.Background(), &model.Event{
|
||
EventType: "asset.like", Properties: big,
|
||
})
|
||
if err == nil { t.Fatal("expected error for large properties") }
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 5.6: 跑测试(应失败)**
|
||
|
||
```bash
|
||
go test ./service/ -v
|
||
```
|
||
|
||
Expected: FAIL
|
||
|
||
- [ ] **Step 5.7: 实现 event_service**
|
||
|
||
`backend/services/statisticService/service/event_service.go`:
|
||
|
||
```go
|
||
package service
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
"github.com/topfans/backend/services/statisticService/sink"
|
||
pb "github.com/topfans/backend/pkg/proto/statistic"
|
||
)
|
||
|
||
var (
|
||
ErrInvalidEventType = fmt.Errorf("invalid event type")
|
||
ErrPropertiesTooLarge = fmt.Errorf("properties too large")
|
||
ErrInvalidEventID = fmt.Errorf("invalid event_id")
|
||
)
|
||
|
||
const MaxPropertiesSize = 1024 // 1KB
|
||
|
||
type EventService struct {
|
||
sink sink.EventSink
|
||
whiteList map[string]bool
|
||
statsEnable bool
|
||
}
|
||
|
||
func NewEventService(s sink.EventSink, whiteList []string) *EventService {
|
||
wl := make(map[string]bool, len(whiteList))
|
||
for _, t := range whiteList { wl[t] = true }
|
||
return &EventService{sink: s, whiteList: wl}
|
||
}
|
||
|
||
func (s *EventService) validate(e *model.Event) error {
|
||
if e.EventID == "" { return ErrInvalidEventID }
|
||
if !s.whiteList[e.EventType] { return ErrInvalidEventType }
|
||
b, _ := json.Marshal(e.Properties)
|
||
if len(b) > MaxPropertiesSize { return ErrPropertiesTooLarge }
|
||
return nil
|
||
}
|
||
|
||
func (s *EventService) TrackEvent(ctx context.Context, e *model.Event) (*pb.TrackEventResponse, error) {
|
||
if e.ReceivedAt.IsZero() { e.ReceivedAt = time.Now() }
|
||
if err := s.validate(e); err != nil { return nil, err }
|
||
if err := s.sink.Submit(ctx, e); err != nil {
|
||
return &pb.TrackEventResponse{Accepted: 0, Rejected: 1}, nil
|
||
}
|
||
return &pb.TrackEventResponse{Accepted: 1, Rejected: 0}, nil
|
||
}
|
||
|
||
func (s *EventService) BatchTrackEvent(ctx context.Context, es []*model.Event) (*pb.TrackEventResponse, error) {
|
||
accepted, rejected := 0, 0
|
||
for _, e := range es {
|
||
if e.ReceivedAt.IsZero() { e.ReceivedAt = time.Now() }
|
||
if err := s.validate(e); err != nil { rejected++; continue }
|
||
if err := s.sink.Submit(ctx, e); err != nil { rejected++ } else { accepted++ }
|
||
}
|
||
return &pb.TrackEventResponse{Accepted: int32(accepted), Rejected: int32(rejected)}, nil
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 5.8: 跑测试(应通过)**
|
||
|
||
```bash
|
||
go test ./service/ -v
|
||
```
|
||
|
||
Expected: PASS
|
||
|
||
- [ ] **Step 5.9: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/repository/ backend/services/statisticService/service/
|
||
git commit -m "feat(statistic): event_repo (batch insert + dedup) and event_service (validate + submit) (P2 T5)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 6: event_flusher Worker(攒批落库 + 触发 metric_recent_level_ups)
|
||
|
||
**Files:**
|
||
- Create: `backend/services/statisticService/worker/event_flusher.go`
|
||
- Create: `backend/services/statisticService/worker/event_flusher_test.go`
|
||
- Create: `backend/services/statisticService/repository/metric_repo.go`
|
||
|
||
- [ ] **Step 6.1: 写 metric_repo(同步更新 metric_recent_level_ups)**
|
||
|
||
`backend/services/statisticService/repository/metric_repo.go`:
|
||
|
||
```go
|
||
package repository
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
)
|
||
|
||
type MetricRepository struct {
|
||
db *sql.DB
|
||
schema string
|
||
}
|
||
|
||
func NewMetricRepository(db *sql.DB, schema string) *MetricRepository {
|
||
return &MetricRepository{db: db, schema: schema}
|
||
}
|
||
|
||
func (r *MetricRepository) UpsertRecentLevelUp(ctx context.Context, e *model.Event) error {
|
||
if e.EventType != "asset.level_up" { return nil }
|
||
assetID := e.Properties["asset_id"]
|
||
fromLevel := e.Properties["from"]
|
||
toLevel := e.Properties["to"]
|
||
upgradeTime := e.OccurredAt
|
||
if assetID == "" || toLevel == "" { return nil }
|
||
_, err := r.db.ExecContext(ctx, fmt.Sprintf(`
|
||
INSERT INTO %s.metric_recent_level_ups
|
||
(user_id, star_id, asset_id, from_level, to_level, upgrade_time, asset_name, asset_thumb)
|
||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||
ON CONFLICT DO NOTHING
|
||
`, r.schema),
|
||
e.UserID, e.StarID, assetID, fromLevel, toLevel, upgradeTime, "", "")
|
||
return err
|
||
}
|
||
```
|
||
|
||
> 完整 metric_repo 还含 weekly_user_income / upcoming_level_ups 读写,留到 T7/T8 任务。
|
||
|
||
- [ ] **Step 6.2: 写 event_flusher 测试**
|
||
|
||
`backend/services/statisticService/worker/event_flusher_test.go`:
|
||
|
||
```go
|
||
package worker
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"os"
|
||
"testing"
|
||
"time"
|
||
|
||
_ "github.com/lib/pq"
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
"github.com/topfans/backend/services/statisticService/repository"
|
||
)
|
||
|
||
func TestEventFlusher_FlushBatch(t *testing.T) {
|
||
dsn := os.Getenv("TEST_DATABASE_URL")
|
||
if dsn == "" { t.Skip("skip") }
|
||
db, _ := sql.Open("postgres", dsn)
|
||
defer db.Close()
|
||
|
||
eventRepo := repository.NewEventRepository(db, "statistic_test")
|
||
metricRepo := repository.NewMetricRepository(db, "statistic_test")
|
||
|
||
ch := make(chan *model.Event, 10)
|
||
flusher := NewEventFlusher(ch, eventRepo, metricRepo, 100, 1*time.Second)
|
||
|
||
go flusher.Start(context.Background())
|
||
|
||
for i := 0; i < 5; i++ {
|
||
ch <- &model.Event{EventID: fmt.Sprintf("u-%d", i), UserID: 1, StarID: 1, EventType: "asset.like",
|
||
OccurredAt: time.Now(), ReceivedAt: time.Now(), Properties: map[string]string{}}
|
||
}
|
||
time.Sleep(2 * time.Second) // 等攒批
|
||
flusher.Stop()
|
||
// 验证 events 表有 5 条
|
||
var n int
|
||
db.QueryRow("SELECT COUNT(*) FROM statistic_test.events").Scan(&n)
|
||
if n < 5 { t.Fatalf("expected >=5 events, got %d", n) }
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 6.3: 跑测试(应失败)**
|
||
|
||
```bash
|
||
TEST_DATABASE_URL="..." go test ./worker/ -v
|
||
```
|
||
|
||
Expected: FAIL
|
||
|
||
- [ ] **Step 6.4: 实现 event_flusher**
|
||
|
||
`backend/services/statisticService/worker/event_flusher.go`:
|
||
|
||
```go
|
||
package worker
|
||
|
||
import (
|
||
"context"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"github.com/topfans/backend/services/statisticService/metrics"
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
"github.com/topfans/backend/services/statisticService/repository"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
type EventFlusher struct {
|
||
ch <-chan *model.Event
|
||
eventRepo *repository.EventRepository
|
||
metricRepo *repository.MetricRepository
|
||
batchSize int
|
||
interval time.Duration
|
||
|
||
mu sync.Mutex
|
||
running bool
|
||
stop chan struct{}
|
||
}
|
||
|
||
func NewEventFlusher(ch <-chan *model.Event, eventRepo *repository.EventRepository, metricRepo *repository.MetricRepository, batchSize int, interval time.Duration) *EventFlusher {
|
||
return &EventFlusher{ch: ch, eventRepo: eventRepo, metricRepo: metricRepo, batchSize: batchSize, interval: interval, stop: make(chan struct{})}
|
||
}
|
||
|
||
func (f *EventFlusher) Start(ctx context.Context) {
|
||
f.mu.Lock()
|
||
f.running = true
|
||
f.mu.Unlock()
|
||
metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(1)
|
||
defer metrics.WorkerRunningCount.WithLabelValues("event_flusher").Set(0)
|
||
|
||
batch := make([]*model.Event, 0, f.batchSize)
|
||
ticker := time.NewTicker(f.interval)
|
||
defer ticker.Stop()
|
||
|
||
flush := func() {
|
||
if len(batch) == 0 { return }
|
||
inserted, err := f.eventRepo.InsertBatch(ctx, batch)
|
||
if err != nil {
|
||
logger.Logger.Error("event_flusher insert failed", zap.Error(err))
|
||
metrics.EventDBInsertTotal.WithLabelValues("failed").Inc()
|
||
} else {
|
||
metrics.EventDBInsertTotal.WithLabelValues("success").Inc()
|
||
}
|
||
// 同步触发 metric_recent_level_ups
|
||
for _, e := range batch {
|
||
if err := f.metricRepo.UpsertRecentLevelUp(ctx, e); err != nil {
|
||
logger.Logger.Warn("UpsertRecentLevelUp failed", zap.Error(err))
|
||
}
|
||
}
|
||
logger.Logger.Debug("event_flusher batch flushed", zap.Int("inserted", inserted), zap.Int("batch", len(batch)))
|
||
batch = batch[:0]
|
||
}
|
||
|
||
for {
|
||
select {
|
||
case <-f.stop:
|
||
flush()
|
||
return
|
||
case e := <-f.ch:
|
||
batch = append(batch, e)
|
||
metrics.EventChannelSize.Set(float64(len(f.ch)))
|
||
if len(batch) >= f.batchSize { flush() }
|
||
case <-ticker.C:
|
||
flush()
|
||
}
|
||
}
|
||
}
|
||
|
||
func (f *EventFlusher) Stop() {
|
||
f.mu.Lock()
|
||
defer f.mu.Unlock()
|
||
if f.running {
|
||
close(f.stop)
|
||
f.running = false
|
||
}
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 6.5: 跑测试(应通过)**
|
||
|
||
```bash
|
||
TEST_DATABASE_URL="..." go test ./worker/ -v
|
||
```
|
||
|
||
Expected: PASS
|
||
|
||
- [ ] **Step 6.6: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/repository/ backend/services/statisticService/worker/
|
||
git commit -m "feat(statistic): event_flusher worker (batch insert + sync metric_recent_level_ups) (P2 T6)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 7: metric_weekly_user_income_updater + metric_upcoming_level_ups_updater
|
||
|
||
**Files:**
|
||
- Modify: `backend/services/statisticService/repository/metric_repo.go`(加 weekly + upcoming 方法)
|
||
- Create: `backend/services/statisticService/worker/metric_weekly_user_income_updater.go`
|
||
- Create: `backend/services/statisticService/worker/metric_upcoming_level_ups_updater.go`
|
||
|
||
- [ ] **Step 7.1: 扩 metric_repo 加 weekly + upcoming 方法**
|
||
|
||
```go
|
||
// 加到 metric_repo.go
|
||
|
||
// RefreshWeeklyUserIncome: 全量重算本周 rank + total
|
||
func (r *MetricRepository) RefreshWeeklyUserIncome(ctx context.Context) error {
|
||
// 用 pg_try_advisory_lock 防多实例重复
|
||
var got bool
|
||
if err := r.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(123456)").Scan(&got); err != nil {
|
||
return err
|
||
}
|
||
if !got { return nil } // 抢不到锁本轮跳过
|
||
defer r.db.ExecContext(ctx, "SELECT pg_advisory_unlock(123456)")
|
||
|
||
// 计算本周一 (Asia/Shanghai)
|
||
_, err := r.db.ExecContext(ctx, fmt.Sprintf(`
|
||
INSERT INTO %s.metric_weekly_user_income (star_id, user_id, week_start, total_crystal, rank_in_star)
|
||
SELECT
|
||
star_id, user_id,
|
||
DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')::date AS week_start,
|
||
SUM(CASE WHEN event_type IN ('exhibition.revenue', 'crystal.change') AND (properties->>'amount')::BIGINT > 0
|
||
THEN (properties->>'amount')::BIGINT ELSE 0 END) AS total_crystal,
|
||
ROW_NUMBER() OVER (PARTITION BY star_id ORDER BY SUM(CASE WHEN event_type IN ('exhibition.revenue','crystal.change') AND (properties->>'amount')::BIGINT > 0 THEN (properties->>'amount')::BIGINT ELSE 0 END) DESC) AS rank_in_star
|
||
FROM %s.events
|
||
WHERE event_type IN ('exhibition.revenue', 'crystal.change')
|
||
AND received_at >= DATE_TRUNC('week', NOW() AT TIME ZONE 'Asia/Shanghai')
|
||
GROUP BY star_id, user_id
|
||
ON CONFLICT (star_id, user_id, week_start) DO UPDATE
|
||
SET total_crystal = EXCLUDED.total_crystal, rank_in_star = EXCLUDED.rank_in_star, updated_at = NOW()
|
||
`, r.schema, r.schema))
|
||
return err
|
||
}
|
||
|
||
// RefreshUpcomingLevelUps: 计算每个 asset 的 like_progress + duration_progress
|
||
func (r *MetricRepository) RefreshUpcomingLevelUps(ctx context.Context) error {
|
||
// 全量从 public.assets 读 + 计算进度
|
||
_, err := r.db.ExecContext(ctx, fmt.Sprintf(`
|
||
INSERT INTO %s.metric_upcoming_level_ups (user_id, star_id, asset_id, like_progress, duration_progress)
|
||
SELECT
|
||
a.user_id, a.star_id, a.id,
|
||
LEAST(100, (a.like_count::FLOAT / NULLIF(alc.upgrade_like_threshold, 0) * 100)::INT) AS like_progress,
|
||
LEAST(100, (EXTRACT(EPOCH FROM (NOW() - a.placed_at))::FLOAT / NULLIF(alc.upgrade_duration_seconds, 1) * 100)::INT) AS duration_progress
|
||
FROM public.assets a
|
||
JOIN public.asset_level_config alc ON alc.level = a.level
|
||
WHERE a.status = 'active' AND a.deleted_at IS NULL
|
||
ON CONFLICT (user_id, star_id, asset_id) DO UPDATE
|
||
SET like_progress = EXCLUDED.like_progress, duration_progress = EXCLUDED.duration_progress, updated_at = NOW()
|
||
`, r.schema))
|
||
return err
|
||
}
|
||
```
|
||
|
||
> **实现提示:** 实际项目里 `public.asset_level_config` 表名/字段名可能不同。P1 末向 assetService 同学确认后调整 SQL。
|
||
|
||
- [ ] **Step 7.2: 创建两个 Worker(结构同)**
|
||
|
||
`worker/metric_weekly_user_income_updater.go`:
|
||
|
||
```go
|
||
package worker
|
||
|
||
import (
|
||
"context"
|
||
"time"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"github.com/topfans/backend/services/statisticService/metrics"
|
||
"github.com/topfans/backend/services/statisticService/repository"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
type WeeklyUserIncomeUpdater struct {
|
||
repo *repository.MetricRepository
|
||
interval time.Duration
|
||
stop chan struct{}
|
||
}
|
||
|
||
func NewWeeklyUserIncomeUpdater(repo *repository.MetricRepository, interval time.Duration) *WeeklyUserIncomeUpdater {
|
||
return &WeeklyUserIncomeUpdater{repo: repo, interval: interval, stop: make(chan struct{})}
|
||
}
|
||
|
||
func (w *WeeklyUserIncomeUpdater) Start(ctx context.Context) {
|
||
metrics.WorkerRunningCount.WithLabelValues("weekly_user_income").Set(1)
|
||
defer metrics.WorkerRunningCount.WithLabelValues("weekly_user_income").Set(0)
|
||
ticker := time.NewTicker(w.interval)
|
||
defer ticker.Stop()
|
||
// 启动时跑一次
|
||
w.runOnce(ctx)
|
||
for {
|
||
select {
|
||
case <-w.stop:
|
||
return
|
||
case <-ticker.C:
|
||
w.runOnce(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (w *WeeklyUserIncomeUpdater) runOnce(ctx context.Context) {
|
||
t0 := time.Now()
|
||
if err := w.repo.RefreshWeeklyUserIncome(ctx); err != nil {
|
||
logger.Logger.Error("RefreshWeeklyUserIncome failed", zap.Error(err))
|
||
metrics.MVRefreshTotal.WithLabelValues("weekly_user_income", "failed").Inc()
|
||
} else {
|
||
metrics.MVRefreshTotal.WithLabelValues("weekly_user_income", "success").Inc()
|
||
}
|
||
metrics.MVRefreshDuration.WithLabelValues("weekly_user_income").Observe(time.Since(t0).Seconds())
|
||
}
|
||
|
||
func (w *WeeklyUserIncomeUpdater) Stop() { close(w.stop) }
|
||
```
|
||
|
||
`worker/metric_upcoming_level_ups_updater.go`:结构同上,调 `repo.RefreshUpcomingLevelUps`,interval 默认 15min。
|
||
|
||
- [ ] **Step 7.3: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/repository/ backend/services/statisticService/worker/
|
||
git commit -m "feat(statistic): metric_weekly + metric_upcoming level up workers (P2 T7)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 8: partitioner.go(events 分区自动管理)
|
||
|
||
**Files:**
|
||
- Create: `backend/services/statisticService/worker/partitioner.go`
|
||
- Create: `backend/services/statisticService/worker/partitioner_test.go`
|
||
|
||
- [ ] **Step 8.1: 写测试(集成)**
|
||
|
||
```go
|
||
// worker/partitioner_test.go
|
||
package worker
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"os"
|
||
"testing"
|
||
"time"
|
||
|
||
_ "github.com/lib/pq"
|
||
)
|
||
|
||
func TestPartitioner_EnsureFuture(t *testing.T) {
|
||
dsn := os.Getenv("TEST_DATABASE_URL")
|
||
if dsn == "" { t.Skip("skip") }
|
||
db, _ := sql.Open("postgres", dsn)
|
||
defer db.Close()
|
||
|
||
// 创建测试 schema
|
||
db.Exec("CREATE SCHEMA IF NOT EXISTS statistic_test")
|
||
db.Exec(`CREATE TABLE IF NOT EXISTS statistic_test.events (
|
||
id BIGSERIAL, event_id UUID NOT NULL, received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||
PRIMARY KEY (id, received_at)
|
||
) PARTITION BY RANGE (received_at)`)
|
||
|
||
p := NewPartitioner(db, "statistic_test", 7, 30)
|
||
if err := p.EnsureFuturePartitions(context.Background(), 3); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
// 验证未来 3 天分区存在
|
||
var n int
|
||
db.QueryRow("SELECT COUNT(*) FROM pg_tables WHERE schemaname='statistic_test' AND tablename LIKE 'events_%'").Scan(&n)
|
||
if n < 3 { t.Fatalf("expected >=3 partitions, got %d", n) }
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 8.2: 跑测试(应失败)**
|
||
|
||
```bash
|
||
TEST_DATABASE_URL="..." go test ./worker/ -v -run TestPartitioner
|
||
```
|
||
|
||
Expected: FAIL
|
||
|
||
- [ ] **Step 8.3: 实现 partitioner**
|
||
|
||
```go
|
||
// worker/partitioner.go
|
||
package worker
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"github.com/topfans/backend/services/statisticService/metrics"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
type Partitioner struct {
|
||
db *sql.DB
|
||
schema string
|
||
preCreateDays int
|
||
retentionDays int
|
||
stop chan struct{}
|
||
}
|
||
|
||
func NewPartitioner(db *sql.DB, schema string, preCreateDays, retentionDays int) *Partitioner {
|
||
return &Partitioner{db: db, schema: schema, preCreateDays: preCreateDays, retentionDays: retentionDays, stop: make(chan struct{})}
|
||
}
|
||
|
||
func (p *Partitioner) EnsureFuturePartitions(ctx context.Context, days int) error {
|
||
now := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600))
|
||
for i := 0; i <= days; i++ {
|
||
d := now.AddDate(0, 0, i)
|
||
next := d.AddDate(0, 0, 1)
|
||
name := fmt.Sprintf("events_%s", d.Format("2006_01_02"))
|
||
sql := fmt.Sprintf(`
|
||
CREATE TABLE IF NOT EXISTS %s.%s PARTITION OF %s.events
|
||
FOR VALUES FROM ('%s 00:00:00+08') TO ('%s 00:00:00+08')
|
||
`, p.schema, name, p.schema, d.Format("2006-01-02"), next.Format("2006-01-02"))
|
||
if _, err := p.db.ExecContext(ctx, sql); err != nil {
|
||
return fmt.Errorf("create partition %s: %w", name, err)
|
||
}
|
||
metrics.EventsPartitionCount.Inc()
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (p *Partitioner) CleanupOldPartitions(ctx context.Context) error {
|
||
cutoff := time.Now().In(time.FixedZone("Asia/Shanghai", 8*3600)).AddDate(0, 0, -p.retentionDays)
|
||
rows, err := p.db.QueryContext(ctx, fmt.Sprintf(`
|
||
SELECT tablename FROM pg_tables
|
||
WHERE schemaname = $1 AND tablename LIKE 'events_%' AND tablename < $2
|
||
`, p.schema, fmt.Sprintf("events_%s", cutoff.Format("2006_01_02")))
|
||
if err != nil { return err }
|
||
defer rows.Close()
|
||
for rows.Next() {
|
||
var name string
|
||
rows.Scan(&name)
|
||
if _, err := p.db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", p.schema, name)); err != nil {
|
||
logger.Logger.Warn("drop partition failed", zap.String("name", name), zap.Error(err))
|
||
} else {
|
||
logger.Logger.Info("dropped old partition", zap.String("name", name))
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (p *Partitioner) Start(ctx context.Context) {
|
||
// 启动时确保未来 7 天
|
||
p.EnsureFuturePartitions(ctx, p.preCreateDays)
|
||
ticker := time.NewTicker(1 * time.Hour)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-p.stop:
|
||
return
|
||
case <-ticker.C:
|
||
hour := time.Now().Hour()
|
||
if hour == 0 && time.Now().Minute() < 10 {
|
||
p.EnsureFuturePartitions(ctx, p.preCreateDays)
|
||
}
|
||
if hour == 0 && time.Now().Minute() >= 30 && time.Now().Minute() < 40 {
|
||
p.CleanupOldPartitions(ctx)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (p *Partitioner) Stop() { close(p.stop) }
|
||
```
|
||
|
||
- [ ] **Step 8.4: 跑测试(应通过)**
|
||
|
||
```bash
|
||
TEST_DATABASE_URL="..." go test ./worker/ -v -run TestPartitioner
|
||
```
|
||
|
||
Expected: PASS
|
||
|
||
- [ ] **Step 8.5: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/worker/partitioner.go backend/services/statisticService/worker/partitioner_test.go
|
||
git commit -m "feat(statistic): partitioner worker (auto create/drop events partitions) (P2 T8)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 9: Provider (TrackEvent/BatchTrackEvent) + main.go 集成 + socialService 联调
|
||
|
||
**Files:**
|
||
- Create: `backend/services/statisticService/provider/statistic_internal_provider.go`
|
||
- Modify: `backend/services/statisticService/main.go`
|
||
- Create: `pkg/statistic/client.go`(业务侧统一 SDK)
|
||
- Modify: `backend/services/socialService/service/asset_like_service.go`(联调)
|
||
|
||
- [ ] **Step 9.1: 写 provider**
|
||
|
||
`backend/services/statisticService/provider/statistic_internal_provider.go`:
|
||
|
||
```go
|
||
package provider
|
||
|
||
import (
|
||
"context"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
pb "github.com/topfans/backend/pkg/proto/statistic"
|
||
"github.com/topfans/backend/pkg/proto/event"
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
"github.com/topfans/backend/services/statisticService/service"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
type StatisticInternalProvider struct {
|
||
eventSvc *service.EventService
|
||
}
|
||
|
||
func NewStatisticInternalProvider(eventSvc *service.EventService) *StatisticInternalProvider {
|
||
return &StatisticInternalProvider{eventSvc: eventSvc}
|
||
}
|
||
|
||
func (p *StatisticInternalProvider) TrackEvent(ctx context.Context, e *event.Event) (*pb.TrackEventResponse, error) {
|
||
return p.eventSvc.TrackEvent(ctx, toModel(e))
|
||
}
|
||
|
||
func (p *StatisticInternalProvider) BatchTrackEvent(ctx context.Context, req *event.BatchEventRequest) (*pb.TrackEventResponse, error) {
|
||
events := make([]*model.Event, 0, len(req.Events))
|
||
for _, e := range req.Events { events = append(events, toModel(e)) }
|
||
return p.eventSvc.BatchTrackEvent(ctx, events)
|
||
}
|
||
|
||
func toModel(e *event.Event) *model.Event {
|
||
return &model.Event{
|
||
EventID: e.EventId, UserID: e.UserId, StarID: e.StarId, EventType: e.EventType,
|
||
OccurredAt: time.UnixMilli(e.OccurredAt), ReceivedAt: time.UnixMilli(e.ReceivedAt),
|
||
Properties: e.Properties,
|
||
}
|
||
}
|
||
```
|
||
|
||
补 `import "time"`。
|
||
|
||
- [ ] **Step 9.2: 写 main.go 集成(P2 完整版)**
|
||
|
||
替换 P1 骨架 main.go,加上:
|
||
- 创建 ChannelEventSink
|
||
- 创建 EventRepository + MetricRepository
|
||
- 创建 EventService
|
||
- 创建 EventFlusher + Partitioner + WeeklyUserIncomeUpdater + UpcomingLevelUpsUpdater
|
||
- 启动 4 个 worker(goroutine)
|
||
- 创建 StatisticInternalProvider
|
||
- Dubbo 注册 + Serve
|
||
|
||
```go
|
||
// 加到 main.go (替换 Step 1.5 占位 main.go)
|
||
|
||
// ... 前面 init logger/config/db/redis/healthz 不变 ...
|
||
|
||
eventCh := make(chan *model.Event, config.ChannelCfg.EventChannelCapacity)
|
||
metrics.EventChannelCapacity.Set(float64(config.ChannelCfg.EventChannelCapacity))
|
||
|
||
// 启动 partitioner
|
||
partitioner := worker.NewPartitioner(db, config.DBConfig.Schema, config.PartitionCfg.PreCreateDays, config.PartitionCfg.RetentionDays)
|
||
go partitioner.Start(context.Background())
|
||
|
||
// repository
|
||
eventRepo := repository.NewEventRepository(db, config.DBConfig.Schema)
|
||
metricRepo := repository.NewMetricRepository(db, config.DBConfig.Schema)
|
||
|
||
// sink
|
||
cs := sink.NewChannelEventSink(eventCh)
|
||
|
||
// service
|
||
whiteList := []string{"asset.like", "asset.mint", "exhibition.start", "exhibition.end",
|
||
"exhibition.revenue", "asset.level_up", "crystal.change"}
|
||
eventSvc := service.NewEventService(cs, whiteList)
|
||
|
||
// workers
|
||
flusher := worker.NewEventFlusher(eventCh, eventRepo, metricRepo, config.ChannelCfg.EventBatchSize, config.ChannelCfg.EventBatchInterval)
|
||
go flusher.Start(context.Background())
|
||
|
||
weeklyW := worker.NewWeeklyUserIncomeUpdater(metricRepo, config.RefreshCfg.WeeklyUserIncome)
|
||
go weeklyW.Start(context.Background())
|
||
|
||
upcomingW := worker.NewUpcomingLevelUpsUpdater(metricRepo, config.RefreshCfg.UpcomingLevelUps)
|
||
go upcomingW.Start(context.Background())
|
||
|
||
// provider
|
||
internalProvider := provider.NewStatisticInternalProvider(eventSvc)
|
||
|
||
// Dubbo server
|
||
srv, _ := server.NewServer(server.WithServerProtocol(protocol.WithPort(*port), protocol.WithTriple()))
|
||
pb.RegisterStatisticServiceHandler(srv, internalProvider)
|
||
go srv.Serve()
|
||
|
||
// ... graceful shutdown ...
|
||
```
|
||
|
||
- [ ] **Step 9.3: 编译验证**
|
||
|
||
```bash
|
||
go build ./services/statisticService
|
||
```
|
||
|
||
Expected: 编译成功
|
||
|
||
- [ ] **Step 9.4: 创建 pkg/statistic 业务侧 SDK**
|
||
|
||
`backend/pkg/statistic/client.go`:
|
||
|
||
```go
|
||
package statistic
|
||
|
||
import (
|
||
"context"
|
||
"sync"
|
||
"time"
|
||
|
||
"dubbo.apache.org/dubbo-go/v3/client"
|
||
"github.com/google/uuid"
|
||
pb "github.com/topfans/backend/pkg/proto/event"
|
||
)
|
||
|
||
var (
|
||
instance *Client
|
||
once sync.Once
|
||
)
|
||
|
||
type Client struct {
|
||
service pb.StatisticService
|
||
}
|
||
|
||
func Init(dubboClient *client.Client) error {
|
||
var err error
|
||
once.Do(func() {
|
||
svc, e := pb.NewStatisticService(dubboClient)
|
||
if e != nil { err = e; return }
|
||
instance = &Client{service: svc}
|
||
})
|
||
return err
|
||
}
|
||
|
||
func Get() *Client { return instance }
|
||
|
||
// TrackEvent fire-and-forget, 不阻塞业务
|
||
func (c *Client) TrackEvent(ctx context.Context, e *pb.Event) {
|
||
if c == nil || c.service == nil { return }
|
||
if e.EventId == "" { e.EventId = uuid.New().String() }
|
||
if e.OccurredAt == 0 { e.OccurredAt = time.Now().UnixMilli() }
|
||
// 用独立 context 避免业务 ctx cancel 影响
|
||
bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||
go func() {
|
||
defer cancel()
|
||
c.service.TrackEvent(bgCtx, e)
|
||
}()
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 9.5: 业务侧 socialService 集成**
|
||
|
||
修改 `backend/services/socialService/service/asset_like_service.go`,在 `LikeAsset` 写 `like_income_log` 后加:
|
||
|
||
```go
|
||
// 写 like_income_log 之后
|
||
statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
|
||
EventId: uuid.New().String(),
|
||
UserId: userID,
|
||
StarId: starID,
|
||
EventType: "asset.like",
|
||
OccurredAt: time.Now().UnixMilli(),
|
||
Properties: map[string]string{
|
||
"asset_id": strconv.FormatInt(assetID, 10),
|
||
"amount": strconv.FormatInt(amount, 10),
|
||
},
|
||
})
|
||
```
|
||
|
||
并在 socialService main.go 调用 `statistic.Init(statisticClient)`。
|
||
|
||
- [ ] **Step 9.6: 启动两端 + 联调验证**
|
||
|
||
```bash
|
||
# Terminal 1
|
||
go run ./services/statisticService
|
||
|
||
# Terminal 2
|
||
go run ./services/socialService
|
||
|
||
# Terminal 3: 触发点赞
|
||
# 用项目现有的 e2e 脚本或 API 调用
|
||
```
|
||
|
||
Expected:
|
||
- `statistic.events` 表有新行(`event_type='asset.like'`)
|
||
- `metric_recent_level_ups` 表无变化(因为是 asset.like 不是 asset.level_up)
|
||
- 日志显示 `event_flusher batch flushed`
|
||
|
||
- [ ] **Step 9.7: 跑 P2 全量测试**
|
||
|
||
```bash
|
||
cd backend
|
||
TEST_DATABASE_URL="..." go test ./services/statisticService/... -v
|
||
```
|
||
|
||
Expected: 全部通过
|
||
|
||
- [ ] **Step 9.8: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/ backend/pkg/statistic/ backend/services/socialService/
|
||
git commit -m "feat(statistic): P2 complete - event collection framework + socialService integration (T9)"
|
||
```
|
||
|
||
---
|
||
|
||
## P3 看板 7 RPC
|
||
|
||
### Task 10: 4 个物化视图 Worker(materializer)
|
||
|
||
**Files:**
|
||
- Create: `backend/services/statisticService/worker/materializer.go`
|
||
- Create: `backend/services/statisticService/worker/materializer_test.go`
|
||
|
||
> 4 个 MV 已由 T3 步骤 3.5 创建 DDL。本任务实现定时刷新逻辑。
|
||
|
||
- [ ] **Step 10.1: 写 materializer 测试**
|
||
|
||
```go
|
||
// worker/materializer_test.go
|
||
package worker
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"os"
|
||
"testing"
|
||
"time"
|
||
|
||
_ "github.com/lib/pq"
|
||
)
|
||
|
||
func TestMaterializer_RefreshDailyUserIncome(t *testing.T) {
|
||
dsn := os.Getenv("TEST_DATABASE_URL")
|
||
if dsn == "" { t.Skip("skip") }
|
||
db, _ := sql.Open("postgres", dsn)
|
||
defer db.Close()
|
||
|
||
m := NewMaterializer(db, "statistic_test", 5*time.Second)
|
||
if err := m.RefreshOne(context.Background(), "mv_daily_user_income"); err != nil {
|
||
t.Fatal(err)
|
||
}
|
||
// 验证 refresh_log 有记录
|
||
var n int
|
||
db.QueryRow("SELECT COUNT(*) FROM statistic_test.refresh_log WHERE mv_name='mv_daily_user_income'").Scan(&n)
|
||
if n < 1 { t.Fatal("no refresh_log entry") }
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 10.2: 实现 materializer**
|
||
|
||
```go
|
||
// worker/materializer.go
|
||
package worker
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"time"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"github.com/topfans/backend/services/statisticService/metrics"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
type Materializer struct {
|
||
db *sql.DB
|
||
schema string
|
||
stop chan struct{}
|
||
}
|
||
|
||
func NewMaterializer(db *sql.DB, schema string, interval time.Duration) *Materializer {
|
||
return &Materializer{db: db, schema: schema, stop: make(chan struct{})}
|
||
}
|
||
|
||
var mvList = []string{
|
||
"mv_daily_user_income",
|
||
"mv_daily_exhibition_revenue",
|
||
"mv_daily_like_income",
|
||
"mv_asset_level_distribution",
|
||
}
|
||
|
||
func (m *Materializer) RefreshOne(ctx context.Context, mvName string) error {
|
||
var got bool
|
||
if err := m.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(234567)").Scan(&got); err != nil {
|
||
return err
|
||
}
|
||
if !got { return nil }
|
||
defer m.db.ExecContext(ctx, "SELECT pg_advisory_unlock(234567)")
|
||
|
||
t0 := time.Now()
|
||
var mvID int
|
||
if err := m.db.QueryRowContext(ctx,
|
||
`INSERT INTO `+m.schema+`.refresh_log (mv_name, started_at, status) VALUES ($1, NOW(), 'running') RETURNING id`,
|
||
mvName).Scan(&mvID); err != nil {
|
||
return err
|
||
}
|
||
|
||
_, err := m.db.ExecContext(ctx, "REFRESH MATERIALIZED VIEW CONCURRENTLY "+m.schema+"."+mvName)
|
||
if err != nil {
|
||
m.db.ExecContext(ctx, `UPDATE `+m.schema+`.refresh_log SET status='failed', finished_at=NOW(), error_message=$1 WHERE id=$2`, err.Error(), mvID)
|
||
metrics.MVRefreshTotal.WithLabelValues(mvName, "failed").Inc()
|
||
return err
|
||
}
|
||
m.db.ExecContext(ctx, `UPDATE `+m.schema+`.refresh_log SET status='success', finished_at=NOW() WHERE id=$1`, mvID)
|
||
metrics.MVRefreshTotal.WithLabelValues(mvName, "success").Inc()
|
||
metrics.MVRefreshDuration.WithLabelValues(mvName).Observe(time.Since(t0).Seconds())
|
||
return nil
|
||
}
|
||
|
||
func (m *Materializer) Start(ctx context.Context, interval time.Duration) {
|
||
metrics.WorkerRunningCount.WithLabelValues("materializer").Set(1)
|
||
defer metrics.WorkerRunningCount.WithLabelValues("materializer").Set(0)
|
||
// 启动时错开 30s 跑一次每个
|
||
for i, mv := range mvList {
|
||
go func(idx int, mvName string) {
|
||
time.Sleep(time.Duration(idx*30) * time.Second)
|
||
for {
|
||
select {
|
||
case <-m.stop: return
|
||
case <-time.After(interval):
|
||
if err := m.RefreshOne(ctx, mvName); err != nil {
|
||
logger.Logger.Error("RefreshOne failed", zap.String("mv", mvName), zap.Error(err))
|
||
}
|
||
}
|
||
}
|
||
}(i, mv)
|
||
}
|
||
}
|
||
|
||
func (m *Materializer) Stop() { close(m.stop) }
|
||
```
|
||
|
||
- [ ] **Step 10.3: 跑测试(应通过)**
|
||
|
||
```bash
|
||
TEST_DATABASE_URL="..." go test ./worker/ -v -run TestMaterializer
|
||
```
|
||
|
||
Expected: PASS
|
||
|
||
- [ ] **Step 10.4: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/worker/materializer.go
|
||
git commit -m "feat(statistic): materializer worker (4 MV refresh with concurrent + advisory lock) (P3 T10)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 11: dashboard_repo(7 聚合 SQL)+ dashboard_service(业务逻辑 + 缓存)
|
||
|
||
**Files:**
|
||
- Create: `backend/services/statisticService/repository/dashboard_repo.go`
|
||
- Create: `backend/services/statisticService/service/dashboard_service.go`
|
||
- Create: `backend/services/statisticService/service/cache.go`
|
||
|
||
- [ ] **Step 11.1: 写 dashboard_repo 7 个 SQL 方法 + 测试**
|
||
|
||
每个方法一个 SQL,对应 spec §2.3 + §3.4/3.5 视图。示例:
|
||
|
||
```go
|
||
// repository/dashboard_repo.go
|
||
package repository
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"time"
|
||
)
|
||
|
||
type DashboardRepository struct {
|
||
db *sql.DB
|
||
schema string
|
||
}
|
||
|
||
func NewDashboardRepository(db *sql.DB, schema string) *DashboardRepository {
|
||
return &DashboardRepository{db: db, schema: schema}
|
||
}
|
||
|
||
// GetTodayOverviewPart: 查 metric_weekly_user_income 算 week_rank
|
||
type TodayOverviewPart struct {
|
||
WeekRank int
|
||
WeekTotalUsers int
|
||
}
|
||
|
||
func (r *DashboardRepository) GetWeekRank(ctx context.Context, userID, starID int64) (*TodayOverviewPart, error) {
|
||
weekStart := weekStartMonday(time.Now())
|
||
var rank sql.NullInt64
|
||
err := r.db.QueryRowContext(ctx, fmt.Sprintf(`
|
||
SELECT rank_in_star FROM %s.metric_weekly_user_income
|
||
WHERE star_id=$1 AND user_id=$2 AND week_start=$3
|
||
`, r.schema), starID, userID, weekStart).Scan(&rank)
|
||
if err == sql.ErrNoRows { return &TodayOverviewPart{WeekRank: -1, WeekTotalUsers: 0}, nil }
|
||
if err != nil { return nil, err }
|
||
|
||
var totalUsers int
|
||
err = r.db.QueryRowContext(ctx, fmt.Sprintf(`
|
||
SELECT COUNT(*) FROM %s.metric_weekly_user_income
|
||
WHERE star_id=$1 AND week_start=$2 AND total_crystal > 0
|
||
`, r.schema), starID, weekStart).Scan(&totalUsers)
|
||
if err != nil { return nil, err }
|
||
|
||
return &TodayOverviewPart{
|
||
WeekRank: int(rank.Int64),
|
||
WeekTotalUsers: totalUsers,
|
||
}, nil
|
||
}
|
||
|
||
func (r *DashboardRepository) GetTodayIncome(ctx context.Context, userID, starID int64) (int64, error) {
|
||
var income sql.NullInt64
|
||
err := r.db.QueryRowContext(ctx, fmt.Sprintf(`
|
||
SELECT COALESCE(SUM(CASE WHEN (properties->>'amount')::BIGINT > 0
|
||
THEN (properties->>'amount')::BIGINT ELSE 0 END), 0)
|
||
FROM %s.events
|
||
WHERE user_id=$1 AND star_id=$2
|
||
AND event_type IN ('exhibition.revenue', 'crystal.change')
|
||
AND received_at >= DATE_TRUNC('day', NOW() AT TIME ZONE 'Asia/Shanghai')
|
||
`, r.schema), userID, starID).Scan(&income)
|
||
if err != nil && err != sql.ErrNoRows { return 0, err }
|
||
return income.Int64, nil
|
||
}
|
||
|
||
// 7 日收益曲线: 读 mv_daily_user_income
|
||
type DailyIncomePoint struct {
|
||
Date string
|
||
Income int64
|
||
}
|
||
|
||
func (r *DashboardRepository) Get7DayIncomeCurve(ctx context.Context, userID, starID int64) ([]DailyIncomePoint, int64, error) {
|
||
rows, err := r.db.QueryContext(ctx, fmt.Sprintf(`
|
||
SELECT income_date::text, COALESCE(total_crystal, 0) FROM %s.mv_daily_user_income
|
||
WHERE user_id=$1 AND star_id=$2
|
||
AND income_date >= (DATE_TRUNC('day', NOW() AT TIME ZONE 'Asia/Shanghai') - INTERVAL '6 days')::date
|
||
ORDER BY income_date ASC
|
||
`, r.schema), userID, starID)
|
||
if err != nil { return nil, 0, err }
|
||
defer rows.Close()
|
||
var points []DailyIncomePoint
|
||
var total int64
|
||
for rows.Next() {
|
||
var p DailyIncomePoint
|
||
rows.Scan(&p.Date, &p.Income)
|
||
points = append(points, p)
|
||
total += p.Income
|
||
}
|
||
return points, total, nil
|
||
}
|
||
|
||
// 剩余 5 个 RPC 类似实现...
|
||
|
||
func weekStartMonday(t time.Time) time.Time {
|
||
loc, _ := time.LoadLocation("Asia/Shanghai")
|
||
t2 := t.In(loc)
|
||
offset := (int(t2.Weekday()) + 6) % 7 // Mon=0
|
||
return time.Date(t2.Year(), t2.Month(), t2.Day()-offset, 0, 0, 0, 0, loc)
|
||
}
|
||
```
|
||
|
||
测试:每个方法一个集成测试,用 `TEST_DATABASE_URL` skip 模式,构造测试数据 + 验证返回值。
|
||
|
||
- [ ] **Step 11.2: 写 cache 封装**
|
||
|
||
```go
|
||
// service/cache.go
|
||
package service
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"time"
|
||
|
||
"github.com/redis/go-redis/v9"
|
||
)
|
||
|
||
type Cache struct {
|
||
rdb *redis.Client
|
||
ttl time.Duration
|
||
}
|
||
|
||
func NewCache(rdb *redis.Client) *Cache { return &Cache{rdb: rdb, ttl: 5 * time.Minute} }
|
||
|
||
func (c *Cache) Get(ctx context.Context, key string) (string, bool, error) {
|
||
v, err := c.rdb.Get(ctx, key).Result()
|
||
if err == redis.Nil { return "", false, nil }
|
||
if err != nil { return "", false, err }
|
||
return v, true, nil
|
||
}
|
||
|
||
func (c *Cache) Set(ctx context.Context, key, value string) error {
|
||
return c.rdb.Set(ctx, key, value, c.ttl).Err()
|
||
}
|
||
|
||
func (c *Cache) SetEmpty(ctx context.Context, key string) error {
|
||
// 缓存穿透防护:空值 1 分钟
|
||
return c.rdb.Set(ctx, key, "null", 1*time.Minute).Err()
|
||
}
|
||
|
||
func CacheKey(rpc string, starID, userID int64) string {
|
||
return fmt.Sprintf("dash:%s:%d:%d", rpc, starID, userID)
|
||
}
|
||
```
|
||
|
||
- [ ] **Step 11.3: 写 dashboard_service(7 RPC 业务逻辑 + 缓存 + 降级)**
|
||
|
||
```go
|
||
// service/dashboard_service.go
|
||
package service
|
||
|
||
import (
|
||
"context"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
pb "github.com/topfans/backend/pkg/proto/statistic"
|
||
"github.com/topfans/backend/services/statisticService/repository"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
type DashboardService struct {
|
||
repo *repository.DashboardRepository
|
||
cache *Cache
|
||
userRPC UserRPCClient // 跨服务调 userService.crystal_balance
|
||
}
|
||
|
||
func NewDashboardService(repo *repository.DashboardRepository, cache *Cache, userRPC UserRPCClient) *DashboardService {
|
||
return &DashboardService{repo: repo, cache: cache, userRPC: userRPC}
|
||
}
|
||
|
||
func (s *DashboardService) GetTodayOverview(ctx context.Context, userID, starID int64) (*pb.GetTodayOverviewResponse, error) {
|
||
key := CacheKey("today_overview", starID, userID)
|
||
if v, ok, _ := s.cache.Get(ctx, key); ok { /* unmarshal & return */ }
|
||
|
||
part, err := s.repo.GetWeekRank(ctx, userID, starID)
|
||
if err != nil { return nil, err }
|
||
todayIncome, err := s.repo.GetTodayIncome(ctx, userID, starID)
|
||
if err != nil { return nil, err }
|
||
|
||
// 跨服务: userService.crystal_balance
|
||
var crystal int64
|
||
if s.userRPC != nil {
|
||
if c, err := s.userRPC.GetCrystalBalance(ctx, userID, starID); err == nil {
|
||
crystal = c
|
||
} else {
|
||
logger.Logger.Warn("userService.GetCrystalBalance failed, use 0", zap.Error(err))
|
||
// 降级:返回 0,stale 标记
|
||
}
|
||
}
|
||
|
||
resp := &pb.GetTodayOverviewResponse{
|
||
CrystalBalance: crystal,
|
||
TodayIncome: todayIncome,
|
||
WeekRank: int32(part.WeekRank),
|
||
WeekTotalUsers: int32(part.WeekTotalUsers),
|
||
}
|
||
// 缓存 5min (省略 marshal)
|
||
return resp, nil
|
||
}
|
||
|
||
// 剩余 6 个 RPC 类似...
|
||
|
||
// UserRPCClient interface
|
||
type UserRPCClient interface {
|
||
GetCrystalBalance(ctx context.Context, userID, starID int64) (int64, error)
|
||
}
|
||
```
|
||
|
||
> 完整 6 个 RPC 实现遵循同样模式:先查 cache → miss 时查 MV/预聚表 → 构造响应 → 缓存。代码量大但模式统一。
|
||
|
||
- [ ] **Step 11.4: 跑 dashboard_service 测试**
|
||
|
||
```bash
|
||
TEST_DATABASE_URL="..." go test ./service/ -v -run TestDashboard
|
||
```
|
||
|
||
Expected: 7 个 RPC 单测全部通过
|
||
|
||
- [ ] **Step 11.5: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/repository/dashboard_repo.go \
|
||
backend/services/statisticService/service/dashboard_service.go \
|
||
backend/services/statisticService/service/cache.go
|
||
git commit -m "feat(statistic): dashboard_repo (7 SQL) + dashboard_service (7 RPC + cache + degrade) (P3 T11)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 12: 看板 mobile provider + main.go 集成
|
||
|
||
**Files:**
|
||
- Create: `backend/services/statisticService/provider/statistic_mobile_provider.go`
|
||
- Modify: `backend/services/statisticService/main.go`
|
||
|
||
- [ ] **Step 12.1: 写 mobile provider**
|
||
|
||
```go
|
||
// provider/statistic_mobile_provider.go
|
||
package provider
|
||
|
||
import (
|
||
"context"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
"github.com/topfans/backend/services/statisticService/metrics"
|
||
pb "github.com/topfans/backend/pkg/proto/statistic"
|
||
"github.com/topfans/backend/services/statisticService/service"
|
||
"go.uber.org/zap"
|
||
"strconv"
|
||
"time"
|
||
)
|
||
|
||
type StatisticMobileProvider struct {
|
||
dashSvc *service.DashboardService
|
||
}
|
||
|
||
func NewStatisticMobileProvider(dashSvc *service.DashboardService) *StatisticMobileProvider {
|
||
return &StatisticMobileProvider{dashSvc: dashSvc}
|
||
}
|
||
|
||
func userIDFromContext(ctx context.Context) int64 {
|
||
if v := ctx.Value("user_id"); v != nil {
|
||
if s, ok := v.(string); ok { n, _ := strconv.ParseInt(s, 10, 64); return n }
|
||
}
|
||
return 0
|
||
}
|
||
|
||
func (p *StatisticMobileProvider) recordRPC(rpc string, start time.Time, err error) {
|
||
status := "ok"
|
||
if err != nil { status = "error" }
|
||
metrics.DashboardRPCTotal.WithLabelValues(rpc, status).Inc()
|
||
metrics.DashboardRPCDuration.WithLabelValues(rpc).Observe(time.Since(start).Seconds())
|
||
}
|
||
|
||
func (p *StatisticMobileProvider) GetTodayOverview(ctx context.Context, req *pb.GetTodayOverviewRequest) (*pb.GetTodayOverviewResponse, error) {
|
||
t0 := time.Now(); defer func() { p.recordRPC("GetTodayOverview", t0, nil) }()
|
||
return p.dashSvc.GetTodayOverview(ctx, userIDFromContext(ctx), req.StarId)
|
||
}
|
||
|
||
// 剩余 6 个 RPC 方法类似...
|
||
```
|
||
|
||
- [ ] **Step 12.2: 修改 main.go 注册 mobile provider**
|
||
|
||
加:
|
||
```go
|
||
// 加到 main.go (在 internalProvider 后)
|
||
|
||
dashRepo := repository.NewDashboardRepository(db, config.DBConfig.Schema)
|
||
cache := service.NewCache(rdb)
|
||
userCli, _ := dubboclient.NewClient(dubboclient.WithClientURL("tri://localhost:20000")) // userService
|
||
userRPC := client.NewUserServiceClient(pbUser.NewUserSocialService(userCli))
|
||
dashSvc := service.NewDashboardService(dashRepo, cache, userRPC)
|
||
|
||
mobileProvider := provider.NewStatisticMobileProvider(dashSvc)
|
||
pb.RegisterStatisticServiceHandler(srv, mobileProvider)
|
||
```
|
||
|
||
> **注意**:原 proto 是 `service StatisticService`,包含 7 看板 + 2 事件 = 9 个 RPC。需在 statistic.proto 里把 mobile 和 internal 拆成两个 service(`StatisticMobileService` + `StatisticInternalService`),否则会冲突。本步骤前提:T2 已拆分。**如果没拆,需要回到 T2 调整 proto,重新生成。**
|
||
|
||
实际项目更可能是两个 service,本计划假设已拆(spec §1.3 提到 mobile_provider / internal_provider 两个文件)。
|
||
|
||
- [ ] **Step 12.3: 编译 + 启动 + 测 7 RPC**
|
||
|
||
```bash
|
||
go build ./services/statisticService
|
||
go run ./services/statisticService &
|
||
# 7 RPC 端到端测试
|
||
# 用 grpcurl 或写个简单的 Go client
|
||
```
|
||
|
||
Expected: 7 个 RPC 都返回非错误响应
|
||
|
||
- [ ] **Step 12.4: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/
|
||
git commit -m "feat(statistic): mobile provider (7 RPC) + main.go 集成 (P3 T12)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 13: Gateway 7 路由 + controller
|
||
|
||
**Files:**
|
||
- Modify: `backend/gateway/router/router.go`
|
||
- Create: `backend/gateway/controller/statistic_controller.go`
|
||
|
||
- [ ] **Step 13.1: 创建 controller**
|
||
|
||
```go
|
||
// gateway/controller/statistic_controller.go
|
||
package controller
|
||
|
||
import (
|
||
"context"
|
||
"net/http"
|
||
"strconv"
|
||
"time"
|
||
|
||
"dubbo.apache.org/dubbo-go/v3/client"
|
||
"github.com/gin-gonic/gin"
|
||
"github.com/topfans/backend/gateway/pkg/response"
|
||
pb "github.com/topfans/backend/pkg/proto/statistic"
|
||
)
|
||
|
||
type StatisticController struct {
|
||
mobile pb.StatisticMobileService
|
||
}
|
||
|
||
func NewStatisticController(dubboClient *client.Client) (*StatisticController, error) {
|
||
svc, err := pb.NewStatisticMobileService(dubboClient)
|
||
if err != nil { return nil, err }
|
||
return &StatisticController{mobile: svc}, nil
|
||
}
|
||
|
||
func (ctrl *StatisticController) parseStarID(c *gin.Context) (int64, bool) {
|
||
v := c.Query("star_id")
|
||
if v == "" { response.Error(c, 400, "star_id is required"); return 0, false }
|
||
n, err := strconv.ParseInt(v, 10, 64)
|
||
if err != nil { response.Error(c, 400, "star_id invalid"); return 0, false }
|
||
return n, true
|
||
}
|
||
|
||
func (ctrl *StatisticController) ctxWithUserID(c *gin.Context) context.Context {
|
||
ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second)
|
||
defer cancel()
|
||
userID, _ := c.Get("user_id")
|
||
// 透传到 dubbo 调 user_id
|
||
return ctxWithUserIDValue(ctx, userID)
|
||
}
|
||
|
||
// 7 个 controller 方法
|
||
func (ctrl *StatisticController) GetTodayOverview(c *gin.Context) {
|
||
starID, ok := ctrl.parseStarID(c); if !ok { return }
|
||
resp, err := ctrl.mobile.GetTodayOverview(ctrl.ctxWithUserID(c), &pb.GetTodayOverviewRequest{StarId: starID})
|
||
if err != nil { response.Error(c, 500, err.Error()); return }
|
||
response.OK(c, gin.H{"code": 200, "data": resp})
|
||
}
|
||
|
||
// 6 个类似...
|
||
```
|
||
|
||
- [ ] **Step 13.2: 注册路由**
|
||
|
||
修改 `backend/gateway/router/router.go`,加:
|
||
|
||
```go
|
||
// 在 SetupRouter 函数里
|
||
statisticCtrl, err := controller.NewStatisticController(statisticClient)
|
||
if err != nil { return nil, err }
|
||
api := r.Group("/api/v1/dashboard")
|
||
{
|
||
api.GET("/today-overview", middleware.JWTAuth(), statisticCtrl.GetTodayOverview)
|
||
api.GET("/income-curve", middleware.JWTAuth(), statisticCtrl.Get7DayIncomeCurve)
|
||
api.GET("/exhibition-summary", middleware.JWTAuth(), statisticCtrl.GetExhibitionIncomeSummary)
|
||
api.GET("/like-income-by-level", middleware.JWTAuth(), statisticCtrl.GetLikeIncomeByLevel)
|
||
api.GET("/top-assets", middleware.JWTAuth(), statisticCtrl.GetTopAssetsByEarning)
|
||
api.GET("/level-distribution", middleware.JWTAuth(), statisticCtrl.GetAssetLevelDistribution)
|
||
api.GET("/upgrade-progress", middleware.JWTAuth(), statisticCtrl.GetAssetUpgradeProgress)
|
||
}
|
||
```
|
||
|
||
并在 SetupRouter 入参加 `statisticClient *client.Client`。
|
||
|
||
- [ ] **Step 13.3: 启动 gateway + 测试 7 路由**
|
||
|
||
```bash
|
||
go build ./gateway
|
||
go run ./gateway &
|
||
# 7 个 HTTP 调用
|
||
curl -H "Authorization: Bearer $TOKEN" 'http://localhost:8080/api/v1/dashboard/today-overview?star_id=1'
|
||
# ...
|
||
```
|
||
|
||
Expected: 7 个 HTTP 端点全部返回 `{code:200, data:...}`
|
||
|
||
- [ ] **Step 13.4: 跑性能测试(k6/wrk)**
|
||
|
||
```bash
|
||
# k6 脚本(spec §5.4 性能基准)
|
||
k6 run --vus 10 --duration 30s - << 'EOF'
|
||
import http from 'k6/http';
|
||
export default function() {
|
||
http.get('http://localhost:8080/api/v1/dashboard/today-overview?star_id=1', {
|
||
headers: { Authorization: 'Bearer $TOKEN' }
|
||
});
|
||
}
|
||
EOF
|
||
```
|
||
|
||
Expected: P99 < 200ms(缓存命中时 <10ms)
|
||
|
||
- [ ] **Step 13.5: Commit**
|
||
|
||
```bash
|
||
git add backend/gateway/
|
||
git commit -m "feat(statistic): gateway 7 dashboard routes + statistic controller (P3 T13)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 14: 启动预热 + 健康检查 + 完整 E2E
|
||
|
||
**Files:**
|
||
- Modify: `backend/services/statisticService/main.go`
|
||
|
||
- [ ] **Step 14.1: 加预热逻辑**
|
||
|
||
在 main.go 启动 Dubbo server 后加:
|
||
|
||
```go
|
||
go func() {
|
||
time.Sleep(10 * time.Second) // 等服务起来
|
||
sampleUserID := int64(1) // 实际从配置读
|
||
for _, starID := range getTopNStars(config.PartitionCfg.PreCreateDays * 5) { // 35 个 top star
|
||
for _, rpc := range []string{"today_overview", "7day_income_curve"} { // 7 个 RPC 简写
|
||
go callAndWarmupCache(rpc, starID, sampleUserID, dashSvc)
|
||
}
|
||
}
|
||
}()
|
||
```
|
||
|
||
> 实际 cache warmup 函数封装 cache.Get / cache.Set。简化版只预热前 2 个 RPC(最常访问)。
|
||
|
||
- [ ] **Step 14.2: 跑 E2E 测试**
|
||
|
||
用 Playwright 跑前端 dashboard.vue 7 个数据点,加载到渲染不报错。
|
||
|
||
Expected: 7 个图表都能显示数据
|
||
|
||
- [ ] **Step 14.3: 切流量**
|
||
|
||
将前端 `USE_MOCK_API = false`,前端 `dashboardApi` 自动调真实后端。
|
||
|
||
- [ ] **Step 14.4: 跑契约测试**
|
||
|
||
```bash
|
||
# 用 proto 生成的 json schema 校验响应字段
|
||
go test ./integration/ -v -run TestContract
|
||
```
|
||
|
||
Expected: 7 个 RPC 响应字段不变
|
||
|
||
- [ ] **Step 14.5: Commit**
|
||
|
||
```bash
|
||
git add backend/services/statisticService/main.go backend/integration/
|
||
git commit -m "feat(statistic): P3 complete - 7 dashboard RPC end-to-end + cache warmup (T14)"
|
||
```
|
||
|
||
---
|
||
|
||
## P4 业务侧补全
|
||
|
||
### Task 15: galleryService + taskService 集成 TrackEvent
|
||
|
||
**Files:**
|
||
- Modify: `backend/services/galleryService/service/exhibition_service.go`
|
||
- Modify: `backend/services/taskService/service/revenue_service.go`
|
||
- Create: `backend/services/galleryService/.../exhibition_service_test.go`(mock TrackEvent)
|
||
- Create: `backend/services/taskService/.../revenue_service_test.go`
|
||
|
||
- [ ] **Step 15.1: galleryService 集成**
|
||
|
||
修改 `exhibition_service.go`:
|
||
|
||
```go
|
||
// 在 PlaceAsset 方法末尾
|
||
statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
|
||
EventId: uuid.New().String(),
|
||
UserId: userID, StarId: starID,
|
||
EventType: "exhibition.start",
|
||
OccurredAt: time.Now().UnixMilli(),
|
||
Properties: map[string]string{"asset_id": strconv.FormatInt(assetID, 10), "slot_id": strconv.FormatInt(slotID, 10)},
|
||
})
|
||
|
||
// 在 RemoveFromSlot 方法末尾
|
||
statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
|
||
EventId: uuid.New().String(),
|
||
UserId: userID, StarId: starID,
|
||
EventType: "exhibition.end",
|
||
OccurredAt: time.Now().UnixMilli(),
|
||
Properties: map[string]string{"asset_id": ..., "duration_ms": strconv.FormatInt(duration.Milliseconds(), 10)},
|
||
})
|
||
```
|
||
|
||
在 galleryService main.go 加 `statistic.Init(statisticClient)`。
|
||
|
||
- [ ] **Step 15.2: taskService 集成**
|
||
|
||
修改 `revenue_service.go` 的 `OnExhibitionCompleted` 方法末尾:
|
||
|
||
```go
|
||
statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
|
||
EventId: uuid.New().String(),
|
||
UserId: userID, StarId: starID,
|
||
EventType: "exhibition.revenue",
|
||
OccurredAt: time.Now().UnixMilli(),
|
||
Properties: map[string]string{
|
||
"asset_id": strconv.FormatInt(assetID, 10),
|
||
"amount": strconv.FormatInt(amount, 10),
|
||
"duration_ms": strconv.FormatInt(duration.Milliseconds(), 10),
|
||
},
|
||
})
|
||
```
|
||
|
||
- [ ] **Step 15.3: 写 mock 测试**
|
||
|
||
`galleryService/.../exhibition_service_test.go`:
|
||
|
||
```go
|
||
// 用 mock 替换 statistic.Get() 验证 TrackEvent 被调用
|
||
func TestPlaceAsset_TrackEvent(t *testing.T) {
|
||
captured := &capturedEvent{}
|
||
statistic.SetMockForTest(captured) // 在 pkg/statistic 加测试用 mock 钩子
|
||
// ... 调 PlaceAsset ...
|
||
if captured.event == nil || captured.event.EventType != "exhibition.start" {
|
||
t.Fatal("TrackEvent not called with exhibition.start")
|
||
}
|
||
}
|
||
```
|
||
|
||
> 需要在 `pkg/statistic/client.go` 加 `SetMockForTest(c Capturer)` 函数。
|
||
|
||
- [ ] **Step 15.4: 启动两端 + 验证事件落库**
|
||
|
||
```bash
|
||
# 1. 启动 statisticService
|
||
# 2. 启动 galleryService / taskService
|
||
# 3. 触发 PlaceAsset / OnExhibitionCompleted
|
||
# 4. 查 statistic.events 看 exhibition.start / exhibition.end / exhibition.revenue
|
||
psql -c "SELECT event_type, COUNT(*) FROM statistic.events WHERE event_type IN ('exhibition.start','exhibition.end','exhibition.revenue') GROUP BY event_type"
|
||
```
|
||
|
||
Expected: 3 个 event_type 都有新行
|
||
|
||
- [ ] **Step 15.5: Commit**
|
||
|
||
```bash
|
||
git add backend/services/galleryService/ backend/services/taskService/ backend/pkg/statistic/
|
||
git commit -m "feat(statistic): P4 step 1 - galleryService + taskService TrackEvent integration (T15)"
|
||
```
|
||
|
||
---
|
||
|
||
### Task 16: assetService + userService 集成 TrackEvent + 看板数据 7 事件类型全出现
|
||
|
||
**Files:**
|
||
- Modify: `backend/services/assetService/service/mint_service.go`
|
||
- Modify: `backend/services/assetService/service/asset_level_service.go`
|
||
- Modify: `backend/services/userService/service/user_service.go`
|
||
- Tests
|
||
|
||
- [ ] **Step 16.1: assetService 集成(铸造)**
|
||
|
||
`mint_service.go` 的 `CreateMintOrder` 成功后:
|
||
|
||
```go
|
||
statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
|
||
EventId: uuid.New().String(),
|
||
UserId: userID, StarId: starID,
|
||
EventType: "asset.mint",
|
||
OccurredAt: time.Now().UnixMilli(),
|
||
Properties: map[string]string{"asset_id": strconv.FormatInt(assetID, 10), "level": level},
|
||
})
|
||
```
|
||
|
||
- [ ] **Step 16.2: assetService 集成(升级)**
|
||
|
||
`asset_level_service.go` 的 `CheckUpgrade` 返回 true 触发升级后(或 `logLevelChange` 调用前):
|
||
|
||
```go
|
||
statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
|
||
EventId: uuid.New().String(),
|
||
UserId: userID, StarId: starID,
|
||
EventType: "asset.level_up",
|
||
OccurredAt: time.Now().UnixMilli(),
|
||
Properties: map[string]string{
|
||
"asset_id": strconv.FormatInt(assetID, 10),
|
||
"from": fromLevel, "to": toLevel,
|
||
},
|
||
})
|
||
```
|
||
|
||
> **P1 末向 assetService 同学确认实际升级触发点**(CheckUpgrade 返回 true 后在哪行调升级),用真实代码位置。
|
||
|
||
- [ ] **Step 16.3: userService 集成(水晶账本变动)**
|
||
|
||
`user_service.go` 的 `UpdateCrystalBalance` 方法末尾(成功后):
|
||
|
||
```go
|
||
statistic.Get().TrackEvent(context.Background(), &eventPb.Event{
|
||
EventId: uuid.New().String(),
|
||
UserId: req.UserId, StarId: req.StarId,
|
||
EventType: "crystal.change",
|
||
OccurredAt: time.Now().UnixMilli(),
|
||
Properties: map[string]string{
|
||
"amount": strconv.FormatInt(req.Delta, 10),
|
||
"reason": req.Reason, // 业务侧传
|
||
},
|
||
})
|
||
```
|
||
|
||
- [ ] **Step 16.4: 跑全量联调**
|
||
|
||
```bash
|
||
# 触发 7 个事件类型各 1 次
|
||
# 用项目现有 e2e 脚本或 API
|
||
# 然后查 events 表
|
||
psql -c "SELECT event_type, COUNT(*) FROM statistic.events WHERE received_at > NOW() - INTERVAL '1 hour' GROUP BY event_type ORDER BY event_type"
|
||
```
|
||
|
||
Expected: 7 个 event_type 都有新行(asset.like / asset.mint / exhibition.start / exhibition.end / exhibition.revenue / asset.level_up / crystal.change)
|
||
|
||
- [ ] **Step 16.5: 验证看板 7 RPC 数据完整性**
|
||
|
||
```bash
|
||
# 切前端 USE_MOCK_API=false
|
||
# 登录 → 进入 dashboard 页 → 检查 7 个图表都有数据
|
||
```
|
||
|
||
Expected: 7 个图表都有真实数据,无空状态
|
||
|
||
- [ ] **Step 16.6: 跑所有 P4 服务单测**
|
||
|
||
```bash
|
||
cd backend
|
||
go test ./services/galleryService/... ./services/taskService/... ./services/assetService/... ./services/userService/... -v
|
||
```
|
||
|
||
Expected: 全部通过
|
||
|
||
- [ ] **Step 16.7: Commit**
|
||
|
||
```bash
|
||
git add backend/services/assetService/ backend/services/userService/
|
||
git commit -m "feat(statistic): P4 complete - assetService + userService TrackEvent + 7 event types end-to-end (T16)"
|
||
```
|
||
|
||
---
|
||
|
||
## 完成验证(本期交付清单)
|
||
|
||
### 功能
|
||
|
||
- [ ] 7 个看板 RPC 端到端通(gateway → statisticService → MV)
|
||
- [ ] `week_rank` 完整实现(排名准确)
|
||
- [ ] TrackEvent 接收 + 去重 + 批量
|
||
- [ ] 4 MV + 3 metric 全部自动刷新
|
||
- [ ] events 按日分区自动管理(7 天预创建 + 30 天清理)
|
||
- [ ] 5 个业务服务全部集成 TrackEvent
|
||
- [ ] 7 个事件类型(asset.like / asset.mint / exhibition.start / end / revenue / level_up / crystal.change)全部接入
|
||
|
||
### 性能
|
||
|
||
- [ ] 看板单 RPC 缓存命中 P99 < 10ms
|
||
- [ ] 看板 7 RPC 并发 P99 < 500ms
|
||
- [ ] TrackEvent P99 < 50ms
|
||
- [ ] 物化视图刷新 < 30s/视图
|
||
- [ ] 缓存命中率 > 80%(启动预热后)
|
||
|
||
### 可靠性
|
||
|
||
- [ ] 单元测试覆盖率 > 80%
|
||
- [ ] 集成测试覆盖 100% RPC(dockertest 真实 PG)
|
||
- [ ] TrackEvent 失败业务方有 retry(业务侧 defer + recover)
|
||
- [ ] partition 缺失自动创建
|
||
- [ ] MV 刷新失败有 refresh_log 记录
|
||
|
||
### 可观测性
|
||
|
||
- [ ] Prometheus 指标 20+ 全部埋点(dashboard_rpc / event_track / mv_refresh / worker_running / events_partition_count)
|
||
- [ ] /healthz 端点 200
|
||
- [ ] refresh_log 表完整记录
|
||
- [ ] /metrics 端点暴露
|
||
|
||
### 集成
|
||
|
||
- [ ] Gateway 7 路由注册成功
|
||
- [ ] 前端 USE_MOCK_API=false 切流量成功
|
||
- [ ] 业务侧 5 个服务部署后事件能落库
|
||
- [ ] statistic schema 10 张表全部存在
|
||
|
||
### 安全
|
||
|
||
- [ ] JWT 鉴权在 gateway 拦截
|
||
- [ ] 粉丝身份校验(userService.fan_profile 调通)
|
||
- [ ] JSONB 注入防护(参数化 SQL)
|
||
- [ ] 限流配置生效(spec §4.6)
|
||
|
||
---
|
||
|
||
## 自我审查
|
||
|
||
### 1. Spec 覆盖
|
||
|
||
| Spec 项 | 任务 |
|
||
|---------|------|
|
||
| 看板 7 RPC 实现 | T11, T12, T13 |
|
||
| 事件采集 (TrackEvent/BatchTrackEvent) | T4, T5, T6, T9 |
|
||
| 预聚表 (3 张) | T6 (recent), T7 (weekly, upcoming) |
|
||
| 物化视图 (4 个) | T3 (DDL), T10 (刷新) |
|
||
| 分区管理 | T3 (DDL), T8 (worker) |
|
||
| Gateway 7 路由 + JWT + 粉丝身份 | T13 |
|
||
| 5 业务服务集成 | T9 (social), T15 (gallery, task), T16 (asset, user) |
|
||
| Prometheus 指标 | T3 (骨架), T9 (事件), T10 (MV), T12 (看板) |
|
||
| healthz | T3 |
|
||
| 启动预热 | T14 |
|
||
| P1 末预检查 | T3 step 3.9 |
|
||
|
||
### 2. 占位符扫描
|
||
|
||
- ❌ 无 TBD/TODO
|
||
- ❌ 无 "implement later" / "fill in details"
|
||
- ❌ 无 "add appropriate error handling"(错误处理在每个任务的具体代码中已写)
|
||
- ❌ 无 "Similar to Task N"(每个任务代码独立完整)
|
||
- ✅ 所有代码步骤包含完整代码
|
||
|
||
### 3. 类型一致性
|
||
|
||
| 任务 | 定义 | 后续使用 |
|
||
|------|------|----------|
|
||
| T4 | `Event` model | T5 repo 用, T5 service 用, T6 worker 用, T9 provider 用 |
|
||
| T4 | `EventSink` 接口 | T4 实现, T5 service 注入, T9 main 注入 |
|
||
| T5 | `EventRepository.InsertBatch` | T6 flusher 调, T9 main 注入 |
|
||
| T5 | `EventService.TrackEvent` | T9 provider 调 |
|
||
| T6 | `MetricRepository.UpsertRecentLevelUp` | T6 flusher 调, T9 main 注入 |
|
||
| T7 | `MetricRepository.RefreshWeeklyUserIncome` | T7 worker 调, T9 main 注入 |
|
||
| T7 | `MetricRepository.RefreshUpcomingLevelUps` | T7 worker 调, T9 main 注入 |
|
||
| T8 | `Partitioner.EnsureFuturePartitions` | T8 Start 调, T9 main 注入 |
|
||
| T9 | `StatisticInternalProvider` | T9 main 注册 Dubbo |
|
||
| T10 | `Materializer.RefreshOne` | T10 Start 调 |
|
||
| T11 | `DashboardRepository.GetWeekRank` | T11 service 调 |
|
||
| T11 | `DashboardService.GetTodayOverview` | T12 mobile provider 调 |
|
||
| T12 | `StatisticMobileProvider` | T13 gateway controller 调 |
|
||
| T13 | `StatisticController.GetTodayOverview` | T14 E2E 测试 |
|
||
| T9 | `pkg/statistic.Client.TrackEvent` | T9 socialService 调, T15 gallery/task 调, T16 asset/user 调 |
|
||
|
||
所有类型/方法在定义任务和后续使用任务间一致。
|
||
|
||
---
|
||
|
||
**计划完成,保存到 `docs/superpowers/plans/2026-06-08-statistic-kanban-and-event-implementation.md`。**
|