# Backend Service Load Testing Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
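**Goal:** Deliver an executable, recoverable, and repeatable load-testing plan for the TopFans backend microservices deployed on a single Alibaba Cloud host (`101.132.250.62`, 4 GB RAM / 2 vCPU, docker-compose), covering four objectives: capacity assessment, SLA baseline, stability, and destructive testing.
**Architecture:**
- Tooling: in-house Go binaries `loadgen` (load source) and `seed` (data preparation), plus Bash monitoring/recovery scripts
- Deployment: the load source runs on an **Alibaba Cloud ECS instance in the same region**; the target is prod `101.132.250.62`
- Data isolation: all test data is isolated under `star_id=999900`; tokens are pre-signed into `users.csv`
- Layered monitoring: loadgen client-side events + host-side `sample.sh` + optional Prometheus export
- Execution window: 02:00-06:00, the overnight low-traffic period
**Tech Stack:** Go 1.25.5, `net/http`, `HdrHistogram-go`, `gonum/plot`, `golang-migrate`, Bash, PostgreSQL 14+, Redis, Prometheus + Grafana, cAdvisor, postgres-exporter, redis-exporter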
**Spec:** `docs/superpowers/specs/2026-06-12-load-testing-design.md`
---
## File Structure
### 1. Load-testing tools (in the repo, deployed to the load source / target host)
```
backend/scripts/loadgen/  # new
├── seed/  # new: data preparation tool
│   ├── main.go  # seed CLI entry point
│   ├── stars.go  # test star
│   ├── users.go  # 1000 test users
│   ├── profiles.go  # fan_profiles + crystals
│   ├── slots_and_exhibits.go  # booth_slots + exhibitions
│   ├── assets.go  # 5000 assets
│   ├── friendships.go  # bidirectional friendships
│   ├── tokens.go  # signs tokens via pkg/jwt
│   ├── sequences.go  # sequence reset (CLAUDE.md convention)
│   ├── cleanup.go  # test data cleanup
│   ├── seed_test.go  # unit tests for stars/users
│   └── README.md
├── loadgen/  # new: main load-test program
│   ├── main.go  # loadgen CLI entry point
│   ├── lib/
│   │   ├── ramp.go  # step-ramp scheduler
│   │   ├── ramp_test.go
│   │   ├── circuit.go  # 6-dimension red-line stop check
│   │   ├── circuit_test.go
│   │   ├── hdr.go  # HDR histogram wrapper
│   │   ├── hdr_test.go
│   │   ├── csv.go  # users.csv loader
│   │   ├── csv_test.go
│   │   ├── client.go  # http.Transport
│   │   ├── client_test.go
│   │   ├── ssh_metrics.go  # ssh tail of metrics-feed
│   │   └── log.go  # single-line stderr dashboard
│   ├── scenarios/
│   │   ├── s1_login.go
│   │   ├── s2_read.go
│   │   ├── s3_like.go
│   │   ├── s4_mint.go  # includes reset scheduling
│   │   ├── s5_dashboard.go
│   │   ├── s6_ranking.go
│   │   ├── s7_place.go
│   │   └── scenarios_test.go  # unit tests for request construction
│   ├── reporter/
│   │   ├── json.go
│   │   ├── csv.go
│   │   ├── plot.go  # gonum/plot three-panel chart
│   │   └── markdown.go
│   ├── preflight.go  # 7 pre-run checks
│   ├── verify.go  # post-run integrity verification
│   └── main_test.go
├── monitor/  # new
│   ├── sample.sh  # background sampling
│   ├── docker-compose.monitor.yml  # Prometheus stack
│   └── grafana-dashboards/
│       ├── 01-host.json  # whole host
│       ├── 02-containers.json  # per container
│       ├── 03-postgres.json  # PG health
│       └── 04-business.json  # business metrics
├── recover/  # new
│   ├── emergency-stop.sh
│   └── restore-from-backup.sh
└── reports/  # new (listed in .gitignore)
    └── .gitkeep
```
### 2. Scripts deployed to prod
```
/opt/topfans/loadtest/  # deployed to prod
├── seed  # seed binary
├── scripts/
│   └── mint_reset.sh  # mint data reset
├── monitor/
│   └── sample.sh
└── recover/
    ├── emergency-stop.sh
    └── restore-from-backup.sh
```
### 3. Tools on the load-source ECS
```
/opt/loadgen/  # deployed to the load source
├── loadgen  # loadgen binary
├── monitor/
│   └── docker-compose.monitor.yml  # optional
├── grafana-dashboards/  # optional
└── reports/  # report output
```
### 4. Files modified/created
- **Modify**: `docker/docker-compose.prod.yml`: `POSTGRES_MAX_CONNECTIONS: 100 → 50`
- **Modify**: `backend/.gitignore`: add an ignore rule for `backend/scripts/loadgen/reports/`
- **Create**: `docs/loadtest/round1/.gitkeep`
- **Create**: `docs/loadtest/round1/prod-vs-local-schema-diff.md` (produced on the morning of Day 1)
---
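## Implementation Phase Overview
| Phase | Duration | Tasks | When |
|---|---|---|---|
| Phase 1: Environment and data verification | 1.5h | Task 1-4 | Day 1 morning |
| Phase 2: PG max_connections fix | 0.5h | Task 5 | Day 1 afternoon, 14:00 |
| Phase 3: Load-source ECS preparation | 1h | Task 6-7 | Day 1 afternoon |
| Phase 4: seed tool development | 3h | Task 8-19 | Day 1 afternoon to evening |
| Phase 5: loadgen library development | 2h | Task 20-29 | Day 1 evening |
| Phase 6: loadgen scenario development | 2h | Task 30-37 | Day 1 evening to early Day 2 |
| Phase 7: Reporting + monitoring + recovery | 1.5h | Task 38-46 | Before the early hours of Day 2 |
| Phase 8: Deployment + rehearsal | 1.5h | Task 47-50 | Day 1 23:00 |
| Phase 9: First load-test window | 4h | Task 51-58 | Day 2 02:00-06:00 |
| Phase 10: Report and decision | 1h | Task 59-61 | Day 3 |
**Total effort for round 1**: about 18 hours (spread over Day 1-3); the effective load-test window is 4 hours.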
---
## Phase 1: Environment and Data Verification (Day 1 morning)
### Task 1: Verify SSH access to prod and local Docker
**Files:**
- No files created; environment checks only
- [ ] **Step 1: SSH to prod to confirm reachability**
Run:
```bash
ssh -o ConnectTimeout=5 root@101.132.250.62 "echo connected && uname -a"
```
Expected: prints `connected` followed by the Linux kernel information.
- [ ] **Step 2: List the prod containers and check the count**
Run:
```bash
ssh root@101.132.250.62 "docker ps --format '{{.Names}}' | sort"
```
Expected: 11 containers, all prefixed with `topfans-`.
- [ ] **Step 3: Verify local Docker is reachable**
Run:
```bash
docker ps --format '{{.Names}}' | sort
```
Expected: `topfans-postgres` and the other containers are listed (there may be more locally).
---
### Task 2: Survey the prod schema and produce a diff report
**Files:**
- Create: `docs/loadtest/round1/prod-vs-local-schema-diff.md`
- [ ] **Step 1: Dump the key table structures on prod**
Run:
```bash
ssh root@101.132.250.62 <<'EOF'
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
for t in users fan_profiles assets asset_likes exhibitions booth_slots stars friendships mint_orders mint_cost_config crystal_transaction_records; do
echo "=== $t ==="
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d topfans -c "\d+ $t"
done
EOF
```
Expected: the columns, indexes, foreign keys, and constraints of each table.
- [ ] **Step 2: Run the same dump locally and compare**
Run:
```bash
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
for t in users fan_profiles assets asset_likes exhibitions booth_slots stars friendships mint_orders mint_cost_config crystal_transaction_records; do
echo "=== $t ==="
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d top-fans -c "\d+ $t"
done
```
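Expected: the local database is named `top-fans` (with a hyphen); prod uses `topfans` (no hyphen).
- [ ] **Step 3: Write the diff report**
Create `docs/loadtest/round1/prod-vs-local-schema-diff.md`:
```markdown
# Prod vs Local Schema Diff
> **Date**: 2026-06-12
> **Method**: compare remote `psql \d+` output with local `psql \d+` output
## Database name difference
- Local docker: `top-fans` (with hyphen)
- prod: `topfans` (no hyphen)
- → Every tool's DSN must be parameterized; pass `--db-name` explicitly when starting seed/loadgen
## Column differences
| Table | Local | prod | Difference | Impact |
|---|---|---|---|---|
| (fill in actual differences) | | | | |
## Index differences
| Table | Index | Local | prod | Impact |
|---|---|---|---|---|
## Conclusion
- [ ] Schemas match, work can start / [ ] Blocking differences exist, migrate first
```
> **Must-check differences** (spec §2.2): `asset_likes.exhibition_id NOT NULL`, `booth_slots.is_enabled` defaulting to false, whether `fan_profiles` has `experience`/`revenue_boost_bps`, and whether the `auto_users` table exists.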
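Because the database name differs between local (`top-fans`) and prod (`topfans`), it helps to build the DSN in one place. The following is a minimal sketch, assuming the libpq-style keyword/value connection string that `lib/pq` accepts; `buildDSN` and `quoteDSNValue` are hypothetical helper names and are not part of the plan's code. Quoting every value keeps an empty password or a value containing spaces from being misread as the next keyword.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteDSNValue wraps a value in single quotes and escapes backslashes and
// single quotes, as the keyword/value connection-string format requires.
func quoteDSNValue(v string) string {
	r := strings.NewReplacer(`\`, `\\`, `'`, `\'`)
	return "'" + r.Replace(v) + "'"
}

// buildDSN (hypothetical helper) assembles a DSN from explicit parameters so
// that the database name is never hard-coded.
func buildDSN(host string, port int, user, pass, dbName string) string {
	return fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
		quoteDSNValue(host), port, quoteDSNValue(user), quoteDSNValue(pass), quoteDSNValue(dbName))
}

func main() {
	// Same code path, two environments: only the --db-name value changes.
	fmt.Println(buildDSN("localhost", 5432, "postgres", "", "top-fans"))
	fmt.Println(buildDSN("localhost", 5432, "postgres", "", "topfans"))
}
```

The empty-password case is the one to watch: an unquoted `password=` followed by a space can swallow the next `key=value` pair in some parsers, which the quoted form avoids.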
---
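### Task 3: Create the working directory structure
**Files:**
- Create: all subdirectories under `backend/scripts/loadgen/`
- Modify: `backend/.gitignore`
- [ ] **Step 1: Create the directory skeleton**
Run:
```bash
mkdir -p backend/scripts/loadgen/{seed,loadgen/lib,loadgen/scenarios,loadgen/reporter,monitor/grafana-dashboards,recover,reports}
mkdir -p docs/loadtest/round1/{monitoring,raw-data}
touch backend/scripts/loadgen/reports/.gitkeep
touch docs/loadtest/round1/.gitkeep
```
- [ ] **Step 2: Exclude the report directories in .gitignore**
Open `backend/.gitignore` (create it if missing) and append the block below. Patterns in a `.gitignore` are matched relative to the directory that contains the file, so the reports rule carries no `backend/` prefix, and the two `docs/` rules only take effect when placed in the repo-root `.gitignore`:
```
# Load test reports (contain tokens); keep the .gitkeep placeholder tracked
scripts/loadgen/reports/*
!scripts/loadgen/reports/.gitkeep
# Put these two in the repo-root .gitignore instead:
# docs/loadtest/round1/monitoring/sample-*.log
# docs/loadtest/round1/raw-data/
```
- [ ] **Step 3: Commit the directory skeleton**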
Run:
```bash
git add backend/scripts/loadgen/ docs/loadtest/ backend/.gitignore
git commit -m "feat(loadtest): scaffold seed/loadgen/monitor/recover directories"
```
---
### Task 4: Set up the shared Go module
**Files:**
- Modify: `backend/go.mod` (verify only)
- Create: `backend/scripts/loadgen/go.mod` (only if a separate module is used)
- [ ] **Step 1: Verify go.sum already includes HdrHistogram and gonum/plot**
Run:
```bash
grep -E "hdrhistogram-go|gonum" backend/go.sum
```
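Expected: both are present (HdrHistogram-go v1.1.2, gonum v0.16.0+). If so, add the subpackages directly inside the main module.
- [ ] **Step 2: Register the new packages in the main module**
> `backend/go.mod` is already a single-module setup, so every `backend/scripts/loadgen/**/*.go` file is picked up by `go build ./...` automatically; a separate go.mod is **not** needed.
Run to verify: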
```bash
cd backend && go build ./scripts/loadgen/...
```
Expected: there are no Go files yet, so the build exits without errors (at most a `matched no packages` warning).
---
## Phase 2: PG max_connections Fix (Day 1 14:00-14:30)
> ⚠️ Must run during the low-traffic window (14:00-14:05); restarting the PG container causes about 30s of downtime.
### Task 5: Apply §2.1 option A (lower max_connections to 50)
**Files:**
- Modify: `docker/docker-compose.prod.yml`
- [ ] **Step 1: Back up docker-compose.prod.yml**
Run:
```bash
cp docker/docker-compose.prod.yml docker/docker-compose.prod.yml.bak.$(date +%Y%m%d)
```
- [ ] **Step 2: Change POSTGRES_MAX_CONNECTIONS**
Open `docker/docker-compose.prod.yml` and locate the environment variables of the postgres service:
```yaml
environment:
POSTGRES_MAX_CONNECTIONS: 100
```
Change it to:
```yaml
environment:
POSTGRES_MAX_CONNECTIONS: 50
```
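- [ ] **Step 3: Recreate the postgres container**
Run:
```bash
cd docker
# `restart` reuses the existing container and ignores compose-file changes;
# `up -d` recreates the service so the new environment value takes effect.
docker-compose -f docker-compose.prod.yml up -d postgres
echo "waiting 30s ..."
sleep 30
```
Expected: the container is back up within ~30s.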
- [ ] **Step 4: Verify that max_connections took effect**
Run:
```bash
ssh root@101.132.250.62 <<'EOF'
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d topfans -c "SHOW max_connections;"
EOF
```
Expected: the output is `50`.
- [ ] **Step 5: Verify the gateway can still connect**
Run:
```bash
curl -s -o /dev/null -w "%{http_code}\n" http://101.132.250.62:8080/health
```
Expected: `200`
- [ ] **Step 6: Commit the docker-compose change**
Run:
```bash
git add docker/docker-compose.prod.yml
git commit -m "fix(pg): reduce max_connections 100→50 to fit 400M cgroup limit"
```
---
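## Phase 3: Load-Source ECS Preparation (Day 1 afternoon)
### Task 6: Create the load-source ECS in the Alibaba Cloud console
**Files:**
- No code; manual steps
- [ ] **Step 1: Create the instance in the ECS console**
- Region: China East 1 (Hangzhou), same as prod
- Instance type: pay-as-you-go, 4 GB / 2 vCPU general purpose (ecs.s6-c1m2.large or an equivalent tier)
- Image: Ubuntu 22.04 / Alibaba Cloud Linux 3
- Public bandwidth: pay-by-traffic or fixed 5 Mbps (assess the bandwidth actually needed)
- Security group: allow all outbound; inbound 22 (SSH) only
- [ ] **Step 2: Record the load-source IP and credentials**
Record the public IP and root password in a secure place, save the SSH private key to the local file `~/.ssh/loadgen-key`, and set its permissions to 600.
- [ ] **Step 3: Verify SSH**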
Run:
```bash
ssh -i ~/.ssh/loadgen-key root@<压源IP> "echo connected && uname -a"
```
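Expected: the command succeeds.
- [ ] **Step 4: Install base tools** (the command assumes the Ubuntu image; Alibaba Cloud Linux 3 uses `dnf`/`yum` instead of `apt`)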
Run:
```bash
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
apt update && apt install -y curl git build-essential ca-certificates
EOF
```
Expected: the tools install successfully.
- [ ] **Step 5: Install Go 1.25.5**
Run:
```bash
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
wget https://go.dev/dl/go1.25.5.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.25.5.linux-amd64.tar.gz
echo 'export PATH=$PATH:/usr/local/go/bin' >> /root/.bashrc
export PATH=$PATH:/usr/local/go/bin
go version
EOF
```
Expected: `go version go1.25.5 linux/amd64`
- [ ] **Step 6: Verify latency from the load source to prod**
Run:
```bash
ssh -i ~/.ssh/loadgen-key root@<压源IP> "ping -c 10 101.132.250.62 | tail -3"
```
Expected: `avg < 10ms` (same-region backbone network).
---
### Task 7: Prepare the deployment directory on the load-source ECS
**Files:**
- No code; directories only
- [ ] **Step 1: Create the load-source working directory**
Run:
```bash
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
mkdir -p /opt/loadgen/{monitor,grafana-dashboards,reports}
echo "/opt/loadgen directories created"
EOF
```
---
## Phase 4: seed Tool Development (Day 1 afternoon to evening)
### Task 8: Initialize seed main.go
**Files:**
- Create: `backend/scripts/loadgen/seed/main.go`
- [ ] **Step 1: Write the main.go skeleton**
Create `backend/scripts/loadgen/seed/main.go`:
```go
package main
import (
"database/sql"
"flag"
"fmt"
"log"
"os"
_ "github.com/lib/pq"
)
const (
LoadtestStarID = int64(999900)
LoadtestUserMin = int64(30000001)
LoadtestUserMax = int64(30001000)
)
type Config struct {
JWTSecret string
DBHost string
DBPort int
DBName string
DBUser string
DBPass string
Reset bool
ResetTok bool
}
func main() {
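cleanup := flag.Bool("cleanup", false, "run cleanup (default: keep baseline)")
cleanupFull := flag.Bool("full", false, "with -cleanup: also delete users/stars")
cleanupStarID := flag.Int64("cleanup-star-id", LoadtestStarID, "star_id to clean (safety)")
// Do not call flag.Parse() here: parseFlags registers the remaining flags and
// then parses once. Parsing earlier would reject flags such as -db-host as undefined.
cfg := parseFlags()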
if *cleanup {
db, err := openDB(cfg)
if err != nil { log.Fatalf("open db: %v", err) }
defer db.Close()
if err := Cleanup(db, *cleanupStarID, *cleanupFull); err != nil {
log.Fatalf("cleanup: %v", err)
}
log.Println("✅ cleanup done")
return
}
if cfg.ResetTok {
if err := GenerateTokensForLoadtest(cfg); err != nil {
log.Fatalf("generate tokens: %v", err)
}
return
}
db, err := openDB(cfg)
if err != nil {
log.Fatalf("open db: %v", err)
}
defer db.Close()
if cfg.Reset {
log.Println("⚠️ --reset enabled, deleting existing loadtest data first")
if err := Cleanup(db, LoadtestStarID, false); err != nil {
log.Fatalf("reset: %v", err)
}
}
if err := runSeed(db, cfg); err != nil {
log.Fatalf("seed failed: %v", err)
}
if err := GenerateTokensForLoadtest(cfg); err != nil {
log.Fatalf("generate tokens: %v", err)
}
log.Println("✅ seed + tokens completed")
}
func parseFlags() *Config {
cfg := &Config{}
flag.StringVar(&cfg.JWTSecret, "jwt-secret", os.Getenv("JWT_SECRET"), "JWT secret (or $JWT_SECRET)")
flag.StringVar(&cfg.DBHost, "db-host", "localhost", "PG host")
flag.IntVar(&cfg.DBPort, "db-port", 5432, "PG port")
flag.StringVar(&cfg.DBName, "db-name", "topfans", "PG database name (local is 'top-fans', with a hyphen)")
flag.StringVar(&cfg.DBUser, "db-user", "postgres", "PG user")
flag.StringVar(&cfg.DBPass, "db-password", os.Getenv("DB_PASSWORD"), "PG password (or $DB_PASSWORD)")
flag.BoolVar(&cfg.Reset, "reset", false, "delete existing test data before seed")
flag.BoolVar(&cfg.ResetTok, "reset-tokens", false, "only re-sign tokens, don't touch data")
flag.Parse()
return cfg
}
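func openDB(cfg *Config) (*sql.DB, error) {
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
cfg.DBHost, cfg.DBPort, cfg.DBUser, cfg.DBPass, cfg.DBName)
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}
// sql.Open only validates the DSN; Ping opens a real connection so a wrong
// host, database name, or password fails here instead of on the first query.
if err := db.Ping(); err != nil {
db.Close()
return nil, err
}
return db, nil
}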
func runSeed(db *sql.DB, cfg *Config) error {
if err := SeedStars(db); err != nil { return fmt.Errorf("seed stars: %w", err) }
log.Println("✓ stars seeded")
if err := SeedUsers(db); err != nil { return fmt.Errorf("seed users: %w", err) }
log.Println("✓ 1000 users seeded")
if err := SeedProfiles(db); err != nil { return fmt.Errorf("seed profiles: %w", err) }
log.Println("✓ 1000 fan_profiles + crystal seeded")
if err := SeedAssets(db); err != nil { return fmt.Errorf("seed assets: %w", err) }
log.Println("✓ 5000 assets seeded")
if err := SeedSlotsAndExhibits(db); err != nil { return fmt.Errorf("seed slots+exhibits: %w", err) }
log.Println("✓ 3000 booth_slots + 2000 exhibitions seeded")
if err := SeedFriendships(db); err != nil { return fmt.Errorf("seed friendships: %w", err) }
log.Println("✓ 10000 friendships seeded")
if err := ResetSequences(db); err != nil { return fmt.Errorf("reset sequences: %w", err) }
log.Println("✓ sequences reset")
return nil
}
```
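The skeleton registers flags in two places, so the order of registration and parsing matters. One way to make that order explicit is a dedicated `flag.FlagSet` that registers everything and parses exactly once. The sketch below is illustrative only: `seedOptions` and `parseSeedArgs` are hypothetical names, it covers just three of the flags, and the rule that `-full` requires `-cleanup` is an assumption added here, not something the plan states.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// seedOptions (hypothetical) holds a subset of the seed CLI options.
type seedOptions struct {
	DBName  string
	Cleanup bool
	Full    bool
}

// parseSeedArgs registers every flag on one FlagSet and parses a single time,
// which also makes the parsing testable without touching os.Args.
func parseSeedArgs(args []string) (*seedOptions, error) {
	fs := flag.NewFlagSet("seed", flag.ContinueOnError)
	opts := &seedOptions{}
	fs.StringVar(&opts.DBName, "db-name", "topfans", "PG database name")
	fs.BoolVar(&opts.Cleanup, "cleanup", false, "run cleanup instead of seeding")
	fs.BoolVar(&opts.Full, "full", false, "with -cleanup: also delete users/stars")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	// Assumed rule for this sketch: -full is meaningless without -cleanup.
	if opts.Full && !opts.Cleanup {
		return nil, fmt.Errorf("-full requires -cleanup")
	}
	return opts, nil
}

func main() {
	opts, err := parseSeedArgs(os.Args[1:])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
	fmt.Printf("%+v\n", *opts)
}
```

With `flag.ContinueOnError`, an unknown flag comes back as an error value instead of terminating the process, so the caller decides how to report it.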
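- [ ] **Step 2: Verify compilation**
Run:
```bash
cd backend && go build ./scripts/loadgen/seed/
```
Expected: no syntax errors in `main.go`. The build still reports `undefined:` for `SeedStars`, `Cleanup`, `GenerateTokensForLoadtest`, and the other functions that Tasks 9-17 add; it becomes clean (no output) once Task 17 is complete.
- [ ] **Step 3: Commit the skeleton**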
Run:
```bash
git add backend/scripts/loadgen/seed/main.go
git commit -m "feat(seed): main.go skeleton with config and DB open"
```
---
### Task 9: stars.go - test star
**Files:**
- Create: `backend/scripts/loadgen/seed/stars.go`
- [ ] **Step 1: Write the SeedStars function**
Create `backend/scripts/loadgen/seed/stars.go`:
```go
package main
import (
"database/sql"
"time"
)
func SeedStars(db *sql.DB) error {
ts := time.Now().UnixMilli()
_, err := db.Exec(`
INSERT INTO stars (star_id, name, identity_id, is_active, created_at, updated_at)
VALUES ($1, 'loadtest_star', 'loadtest_star', true, $2, $2)
ON CONFLICT (star_id) DO NOTHING
`, LoadtestStarID, ts)
return err
}
```
- [ ] **Step 2: Call it from main.go**
Add to `runSeed` in `main.go`:
```go
func runSeed(db *sql.DB, cfg *Config) error {
if err := SeedStars(db); err != nil {
return fmt.Errorf("seed stars: %w", err)
}
log.Println("✓ stars seeded")
return nil
}
```
- [ ] **Step 3: Verify compilation**
Run:
```bash
cd backend && go build ./scripts/loadgen/seed/
```
Expected: success.
- [ ] **Step 4: Commit**
Run:
```bash
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): insert loadtest star (star_id=999900)"
```
---
### Task 10: users.go - 1000 test users
**Files:**
- Create: `backend/scripts/loadgen/seed/users.go`
- [ ] **Step 1: Generate the bcrypt hash offline**
Run:
```bash
cd backend && cat > /tmp/gen_bcrypt.go <<'EOF'
package main
import (
"fmt"
"golang.org/x/crypto/bcrypt"
)
func main() {
h, _ := bcrypt.GenerateFromPassword([]byte("Test@123"), 10)
fmt.Println(string(h))
}
EOF
go run /tmp/gen_bcrypt.go > /tmp/bcrypt_hash.txt
cat /tmp/bcrypt_hash.txt
```
Expected: a single line of the form `$2a$10$...`.
- [ ] **Step 2: Write the SeedUsers function**
Create `backend/scripts/loadgen/seed/users.go`:
```go
package main

import (
	"bytes"
	"database/sql"
	"fmt"
	"os"
	"time"
)

// bcryptHashFile is the file written by Step 1.
const bcryptHashFile = "/tmp/bcrypt_hash.txt"

// SeedUsers inserts the 1000 loadtest users in a single transaction.
func SeedUsers(db *sql.DB) error {
	raw, err := os.ReadFile(bcryptHashFile)
	if err != nil {
		return fmt.Errorf("read bcrypt hash file %s: %w (run Task 10 Step 1 first)", bcryptHashFile, err)
	}
	// Step 1 writes the hash with a trailing newline; strip it so the stored
	// value is the bare hash.
	hash := string(bytes.TrimSpace(raw))
	ts := time.Now().UnixMilli()
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded
	stmt, err := tx.Prepare(`
		INSERT INTO users (id, mobile, password_hash, is_active, created_at, updated_at)
		VALUES ($1, $2, $3, true, $4, $4)
		ON CONFLICT (id) DO NOTHING
	`)
	if err != nil {
		return err
	}
	defer stmt.Close()
	for uid := LoadtestUserMin; uid <= LoadtestUserMax; uid++ {
		// Mobile numbers 19900000001..19900001000 map 1:1 to the user id range.
		mobile := fmt.Sprintf("199%08d", uid-LoadtestUserMin+1)
		if _, err := stmt.Exec(uid, mobile, hash, ts); err != nil {
			return fmt.Errorf("insert user %d: %w", uid, err)
		}
	}
	return tx.Commit()
}
```
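> **Note**: copy the hash generated in Step 1 to `backend/scripts/loadgen/seed/loadtest_bcrypt.txt`. `.gitignore` must not exclude this file, because it is reused across runs, and it contains **only** the hash of a public test password. Before running seed on another machine, copy it to the path named by `bcryptHashFile`.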
- [ ] **Step 3: Commit the hash file**
```bash
# Paste the hash (single line) into backend/scripts/loadgen/seed/loadtest_bcrypt.txt
git add backend/scripts/loadgen/seed/loadtest_bcrypt.txt
git commit -m "chore(seed): store loadtest bcrypt hash for 'Test@123'"
```
> **Risk**: this hash exposes the test password. The spec accepts this ("test data, limited impact if leaked"). Consider injecting it at build time via `-ldflags` so that the file does not need to exist.
- [ ] **Step 4: Call it from main.go**
```go
func runSeed(db *sql.DB, cfg *Config) error {
if err := SeedStars(db); err != nil { return fmt.Errorf("seed stars: %w", err) }
log.Println("✓ stars seeded")
if err := SeedUsers(db); err != nil { return fmt.Errorf("seed users: %w", err) }
log.Println("✓ 1000 users seeded")
return nil
}
```
- [ ] **Step 5: Verify compilation and commit**
```bash
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): insert 1000 loadtest users (id 30000001-30001000)"
```
---
### Task 11: profiles.go - fan_profiles + crystal top-up
**Files:**
- Create: `backend/scripts/loadgen/seed/profiles.go`
- [ ] **Step 1: Write SeedProfiles**
Create `backend/scripts/loadgen/seed/profiles.go`:
```go
package main
import (
"database/sql"
"fmt"
"time"
)
func SeedProfiles(db *sql.DB) error {
ts := time.Now().UnixMilli()
tx, err := db.Begin()
if err != nil { return err }
defer tx.Rollback()
stmt, err := tx.Prepare(`
INSERT INTO fan_profiles (user_id, star_id, nickname, crystal_balance, slot_limit, is_active, created_at, updated_at)
VALUES ($1, $2, $3, 2200, 3, true, $4, $4)
ON CONFLICT (user_id, star_id) DO NOTHING
`)
if err != nil { return err }
defer stmt.Close()
for uid := LoadtestUserMin; uid <= LoadtestUserMax; uid++ {
nick := fmt.Sprintf("loadtest_%d", uid-LoadtestUserMin+1)
if _, err := stmt.Exec(uid, LoadtestStarID, nick, ts); err != nil {
return fmt.Errorf("insert profile %d: %w", uid, err)
}
}
return tx.Commit()
}
```
- [ ] **Step 2: Call it from main.go and commit**
```go
// append to runSeed
if err := SeedProfiles(db); err != nil { return fmt.Errorf("seed profiles: %w", err) }
log.Println("✓ 1000 fan_profiles + crystal seeded")
```
```bash
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/profiles.go backend/scripts/loadgen/seed/main.go
git commit -m "feat(seed): insert fan_profiles with 2200 crystal each"
```
---
### Task 12: assets.go - 5000 assets
**Files:**
- Create: `backend/scripts/loadgen/seed/assets.go`
- [ ] **Step 1: Write SeedAssets**
Create `backend/scripts/loadgen/seed/assets.go`:
```go
package main
import (
"database/sql"
"fmt"
"time"
)
const (
LoadtestPlaceholderURL = "<OSS_URL>/loadtest-placeholder.png" // TODO: replace after the OSS upload in Task 49
AssetsPerUser = 5
)
func SeedAssets(db *sql.DB) error {
ts := time.Now().UnixMilli()
var maxID int64
if err := db.QueryRow("SELECT COALESCE(MAX(id), 0) FROM assets").Scan(&maxID); err != nil {
return err
}
startID := maxID + 1000
tx, err := db.Begin()
if err != nil { return err }
defer tx.Rollback()
stmt, err := tx.Prepare(`
INSERT INTO assets (id, owner_uid, star_id, name, cover_url, info, status, like_count, is_active, created_at, updated_at, grade)
VALUES ($1, $2, $3, $4, $5, 'loadtest', 1, 0, true, $6, $6, 1)
ON CONFLICT (id) DO NOTHING
`)
if err != nil { return err }
defer stmt.Close()
n := int64(0)
for uid := LoadtestUserMin; uid <= LoadtestUserMax; uid++ {
for i := 1; i <= AssetsPerUser; i++ {
aid := startID + n
name := fmt.Sprintf("loadtest_asset_%d_%d", uid, i)
if _, err := stmt.Exec(aid, uid, LoadtestStarID, name, LoadtestPlaceholderURL, ts); err != nil {
return fmt.Errorf("insert asset %d: %w", aid, err)
}
n++
}
}
return tx.Commit()
}
```
- [ ] **Step 2: Call it and commit**
```go
// append to runSeed
if err := SeedAssets(db); err != nil { return fmt.Errorf("seed assets: %w", err) }
log.Println("✓ 5000 assets seeded")
```
```bash
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): insert 5000 loadtest assets (5 per user)"
```
---
### Task 13: slots_and_exhibits.go - booth_slots + exhibitions
**Files:**
- Create: `backend/scripts/loadgen/seed/slots_and_exhibits.go`
- [ ] **Step 1: Write SeedSlotsAndExhibits**
Create `backend/scripts/loadgen/seed/slots_and_exhibits.go`:
```go
package main

import (
	"database/sql"
	"fmt"
	"time"
)

// SeedSlotsAndExhibits creates 3 enabled booth slots per loadtest profile and
// exhibits each user's asset 1 in slot 1 and asset 2 in slot 2.
func SeedSlotsAndExhibits(db *sql.DB) error {
	ts := time.Now().UnixMilli()
	expire := ts + 4*3600*1000 // 4 hours, in milliseconds
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// 1. booth_slots: 3 per user, is_enabled=true.
	// The VALUES list is aliased s(slot_index), so the column is s.slot_index.
	if _, err := tx.Exec(`
		INSERT INTO booth_slots (host_profile_id, user_id, star_id, slot_index, is_enabled, created_at, updated_at)
		SELECT fp.id, fp.user_id, fp.star_id, s.slot_index, true, $1, $1
		FROM fan_profiles fp
		CROSS JOIN (VALUES (1),(2),(3)) AS s(slot_index)
		WHERE fp.star_id = $2
		ON CONFLICT DO NOTHING
	`, ts, LoadtestStarID); err != nil {
		return fmt.Errorf("insert slots: %w", err)
	}

	// 2. exhibitions: asset N of each user goes into slot N (N = $4).
	// Matching the exact asset name keeps asset 2 out of slot 1 and vice versa.
	exStmt, err := tx.Prepare(`
		INSERT INTO exhibitions (asset_id, slot_id, host_profile_id, occupier_uid, occupier_star_id, start_time, expire_at, created_at, updated_at)
		SELECT a.id, bs.slot_id, fp.id, a.owner_uid, $1, $2, $3, $2, $2
		FROM assets a
		JOIN fan_profiles fp ON a.owner_uid = fp.user_id AND fp.star_id = $1
		JOIN booth_slots bs ON bs.host_profile_id = fp.id AND bs.slot_index = $4::int
		WHERE a.star_id = $1
		  AND a.name = 'loadtest_asset_' || a.owner_uid::text || '_' || $4::int::text
		ON CONFLICT DO NOTHING
	`)
	if err != nil {
		return err
	}
	defer exStmt.Close()
	for _, slot := range []int{1, 2} {
		if _, err := exStmt.Exec(LoadtestStarID, ts, expire, slot); err != nil {
			return fmt.Errorf("insert exhibits slot%d: %w", slot, err)
		}
	}
	return tx.Commit()
}
```
> ⚠️ **Precedence pitfall**: in SQL, `AND` binds tighter than `OR`, so a filter such as `... AND a.name LIKE '%_1' OR a.name LIKE '%_2'` silently drops the other conditions for the second branch and must be written as `AND (a.name LIKE '%_1' OR a.name LIKE '%_2')`. Note also that `_` is a single-character wildcard in `LIKE`. Matching one exact asset name per slot avoids both problems.
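The intent stated in the code comments (asset 1 goes to slot 1, asset 2 goes to slot 2, each exhibition expiring after 4 hours) can be written down as two tiny pure functions, which makes the pairing rule easy to unit-test without a database. This is only a sketch: `assetNameForSlot` and `expireAtMillis` are hypothetical helpers that mirror the naming scheme `loadtest_asset_<uid>_<i>` used by `SeedAssets`.

```go
package main

import (
	"fmt"
	"time"
)

// exhibitionTTL mirrors the 4-hour expiry used when seeding exhibitions.
const exhibitionTTL = 4 * time.Hour

// assetNameForSlot (hypothetical helper) returns the exact name of the asset
// that belongs in the given slot for a user.
func assetNameForSlot(uid int64, slotIndex int) string {
	return fmt.Sprintf("loadtest_asset_%d_%d", uid, slotIndex)
}

// expireAtMillis (hypothetical helper) adds the TTL to a start time expressed
// in Unix milliseconds.
func expireAtMillis(startMillis int64) int64 {
	return startMillis + exhibitionTTL.Milliseconds()
}

func main() {
	fmt.Println(assetNameForSlot(30000001, 1))
	fmt.Println(assetNameForSlot(30000001, 2))
	fmt.Println(expireAtMillis(0))
}
```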
- [ ] **Step 2: Call it and commit**
```go
// append to runSeed
if err := SeedSlotsAndExhibits(db); err != nil { return fmt.Errorf("seed slots+exhibits: %w", err) }
log.Println("✓ 3000 booth_slots + 2000 exhibitions seeded")
```
```bash
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): insert booth_slots (3/user) + exhibitions (2/user)"
```
---
### Task 14: friendships.go - bidirectional friendships
**Files:**
- Create: `backend/scripts/loadgen/seed/friendships.go`
- [ ] **Step 1: Write SeedFriendships**
Create `backend/scripts/loadgen/seed/friendships.go`:
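```go
package main

import (
	"database/sql"
	"time"
)

// SeedFriendships links every loadtest user to the 5 users after it and the
// 5 users before it on a ring over the id range, storing both directions.
func SeedFriendships(db *sql.DB) error {
	ts := time.Now().UnixMilli()
	n := LoadtestUserMax - LoadtestUserMin + 1 // ring size (1000)
	_, err := db.Exec(`
		INSERT INTO friendships (user_id, friend_id, star_id, status, intimacy, created_at, updated_at)
		SELECT a.id, b.id, $1, 'accepted', 0, $2, $2
		FROM users a
		JOIN users b ON b.id BETWEEN $3 AND $4 AND b.id <> a.id
		WHERE a.id BETWEEN $3 AND $4
		  AND (
		        ((b.id - a.id) % $5::bigint + $5::bigint) % $5::bigint BETWEEN 1 AND 5
		     OR ((a.id - b.id) % $5::bigint + $5::bigint) % $5::bigint BETWEEN 1 AND 5
		  )
		ON CONFLICT (user_id, friend_id, star_id) DO NOTHING
	`, LoadtestStarID, ts, LoadtestUserMin, LoadtestUserMax, n)
	return err
}
```
> **Note**: the ring distance condition gives every user exactly 10 friends and guarantees that each row has its reverse, so the insert produces 10,000 rows. A plain `(a + 1) % 10 = b % 10` join would instead match 100 partners per user (100,000 rows) in one direction only.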
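The target shape described for this task (about 10 friends per user, every relation stored in both directions, 10,000 rows in total) can be checked offline before touching the database. The sketch below assumes a ring layout in which each user befriends the next `k` users and the reverse row is stored as well; `ringFriendships` and `pair` are hypothetical names used only for this check.

```go
package main

import "fmt"

// pair is one directed friendship row (user_id, friend_id).
type pair struct{ a, b int64 }

// ringFriendships (hypothetical helper) links each id to the next k ids on a
// ring over [minID, maxID] and emits both directions of every link.
// It assumes the ring has more than 2*k members, so no row is duplicated.
func ringFriendships(minID, maxID, k int64) []pair {
	n := maxID - minID + 1
	rows := make([]pair, 0, n*k*2)
	for a := minID; a <= maxID; a++ {
		for step := int64(1); step <= k; step++ {
			b := minID + (a-minID+step)%n
			rows = append(rows, pair{a, b}, pair{b, a})
		}
	}
	return rows
}

func main() {
	rows := ringFriendships(30000001, 30001000, 5)
	seen := make(map[pair]bool, len(rows))
	for _, r := range rows {
		seen[r] = true
	}
	symmetric := true
	for r := range seen {
		if !seen[pair{r.b, r.a}] {
			symmetric = false
		}
	}
	fmt.Println(len(rows), len(seen), symmetric)
}
```

Comparing the row count and the number of distinct pairs is a quick way to confirm that a candidate join condition neither duplicates rows nor leaves one-directional links.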
- [ ] **Step 2: Call it and commit**
```go
// append to runSeed
if err := SeedFriendships(db); err != nil { return fmt.Errorf("seed friendships: %w", err) }
log.Println("✓ 10000 friendships seeded")
```
```bash
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): insert 10000 loadtest friendships (bidirectional, ~10/user)"
```
---
### Task 15: tokens.go - pre-signed JWTs
**Files:**
- Create: `backend/scripts/loadgen/seed/tokens.go`
- [ ] **Step 1: Write GenerateTokensForLoadtest**
Create `backend/scripts/loadgen/seed/tokens.go`:
```go
package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/lib/pq"
	"github.com/topfans/backend/pkg/jwt"
)

type TestUser struct {
	UserID        int64
	Mobile        string
	AssetIDs      []int64
	ExhibitionIDs []int64
}

func GenerateTokensForLoadtest(cfg *Config) error {
	jwt.SetSecret(cfg.JWTSecret)
	// 1. Read each user's asset_ids / exhibition_ids from PG.
	db, err := openDB(cfg)
	if err != nil {
		return err
	}
	defer db.Close()
	rows, err := db.Query(`
		SELECT u.id, u.mobile,
		       COALESCE(array_agg(DISTINCT a.id) FILTER (WHERE a.id IS NOT NULL), '{}'),
		       COALESCE(array_agg(DISTINCT e.id) FILTER (WHERE e.id IS NOT NULL), '{}')
		FROM users u
		LEFT JOIN assets a ON a.owner_uid = u.id AND a.star_id = $1
		LEFT JOIN fan_profiles fp ON fp.user_id = u.id AND fp.star_id = $1
		LEFT JOIN booth_slots bs ON bs.host_profile_id = fp.id AND bs.slot_index IN (1, 2)
		LEFT JOIN exhibitions e ON e.slot_id = bs.slot_id AND e.occupier_star_id = $1
		WHERE u.id BETWEEN $2 AND $3
		GROUP BY u.id, u.mobile
		ORDER BY u.id
	`, LoadtestStarID, LoadtestUserMin, LoadtestUserMax)
	if err != nil {
		return err
	}
	defer rows.Close()
	var users []TestUser
	for rows.Next() {
		var u TestUser
		// database/sql cannot scan a PG array into []int64 directly; pq.Array adapts it.
		if err := rows.Scan(&u.UserID, &u.Mobile, pq.Array(&u.AssetIDs), pq.Array(&u.ExhibitionIDs)); err != nil {
			return err
		}
		users = append(users, u)
	}
	if err := rows.Err(); err != nil {
		return err
	}
	// 2. Sign the tokens and write users.csv.
	f, err := os.Create("users.csv")
	if err != nil {
		return err
	}
	defer f.Close()
	w := csv.NewWriter(f)
	if err := w.Write([]string{"phone", "password", "user_id", "star_id", "jwt_token", "asset_ids", "exhibition_ids"}); err != nil {
		return err
	}
	now := time.Now().UnixMilli()
	for _, u := range users {
		token, err := jwt.GenerateToken(u.UserID, LoadtestStarID, now)
		if err != nil {
			return err
		}
		if err := w.Write([]string{
			u.Mobile, "Test@123",
			strconv.FormatInt(u.UserID, 10),
			strconv.FormatInt(LoadtestStarID, 10), token,
			joinInt64(u.AssetIDs),
			joinInt64(u.ExhibitionIDs),
		}); err != nil {
			return err
		}
	}
	// Flush explicitly and check the writer, so a short write is not reported as success.
	w.Flush()
	if err := w.Error(); err != nil {
		return err
	}
	fmt.Printf("✅ users.csv written: %d rows\n", len(users))
	return nil
}

// joinInt64 renders ids as a ';'-separated list so they fit in one CSV field.
func joinInt64(s []int64) string {
	parts := make([]string, len(s))
	for i, v := range s {
		parts[i] = strconv.FormatInt(v, 10)
	}
	return strings.Join(parts, ";")
}
```
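On the consuming side, the `users.csv` layout written above (seven columns, id lists joined with `;`) has to be parsed back by the load generator. The following is a minimal sketch of such a reader, not the plan's `lib/csv.go`: `csvUser`, `splitInt64`, and `parseUsersCSV` are hypothetical names, and the sample row uses a placeholder instead of a real token.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
)

// csvUser (hypothetical type) mirrors one row of users.csv.
type csvUser struct {
	Phone, Password, Token  string
	UserID, StarID          int64
	AssetIDs, ExhibitionIDs []int64
}

// splitInt64 parses a ';'-joined id list; an empty field yields a nil slice.
func splitInt64(field string) ([]int64, error) {
	if field == "" {
		return nil, nil
	}
	parts := strings.Split(field, ";")
	ids := make([]int64, 0, len(parts))
	for _, p := range parts {
		v, err := strconv.ParseInt(p, 10, 64)
		if err != nil {
			return nil, fmt.Errorf("bad id %q: %w", p, err)
		}
		ids = append(ids, v)
	}
	return ids, nil
}

// parseUsersCSV reads the header plus data rows in the column order
// phone,password,user_id,star_id,jwt_token,asset_ids,exhibition_ids.
func parseUsersCSV(data string) ([]csvUser, error) {
	records, err := csv.NewReader(strings.NewReader(data)).ReadAll()
	if err != nil {
		return nil, err
	}
	if len(records) == 0 || len(records[0]) != 7 {
		return nil, fmt.Errorf("expected a 7-column header")
	}
	users := make([]csvUser, 0, len(records)-1)
	for _, rec := range records[1:] { // skip the header row
		uid, err := strconv.ParseInt(rec[2], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("user_id: %w", err)
		}
		sid, err := strconv.ParseInt(rec[3], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("star_id: %w", err)
		}
		assets, err := splitInt64(rec[5])
		if err != nil {
			return nil, err
		}
		exhibits, err := splitInt64(rec[6])
		if err != nil {
			return nil, err
		}
		users = append(users, csvUser{
			Phone: rec[0], Password: rec[1], Token: rec[4],
			UserID: uid, StarID: sid, AssetIDs: assets, ExhibitionIDs: exhibits,
		})
	}
	return users, nil
}

func main() {
	const sample = "phone,password,user_id,star_id,jwt_token,asset_ids,exhibition_ids\n" +
		"19900000001,Test@123,30000001,999900,placeholder-token,101;102,7\n"
	users, err := parseUsersCSV(sample)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d user(s), first has %d assets\n", len(users), len(users[0].AssetIDs))
}
```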
- [ ] **Step 2: Call it and commit**
```go
// main.go: add the -reset-tokens path (re-sign tokens only, leave data untouched)
func main() {
cfg := parseFlags()
if cfg.ResetTok {
if err := GenerateTokensForLoadtest(cfg); err != nil {
log.Fatalf("generate tokens: %v", err)
}
return
}
// ... existing seed flow
if err := runSeed(db, cfg); err != nil { log.Fatalf("seed failed: %v", err) }
if err := GenerateTokensForLoadtest(cfg); err != nil { log.Fatalf("generate tokens: %v", err) }
log.Println("✅ seed + tokens completed")
}
```
```bash
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): pre-sign JWT for 1000 users, write users.csv"
```
---
### Task 16: sequences.go - sequence reset (CLAUDE.md convention)
**Files:**
- Create: `backend/scripts/loadgen/seed/sequences.go`
- [ ] **Step 1: Write ResetSequences**
Create `backend/scripts/loadgen/seed/sequences.go`:
```go
package main
import (
"database/sql"
"fmt"
)
var loadtestTables = []string{
"users",
"fan_profiles",
"assets",
"booth_slots",
"exhibitions",
"stars",
"asset_likes",
"friendships",
"crystal_transaction_records",
"mint_orders", // UUID primary key, so it has no sequence; dropped in Step 2
}
func ResetSequences(db *sql.DB) error {
for _, tbl := range loadtestTables {
seq := fmt.Sprintf("%s_id_seq", tbl)
var maxID sql.NullInt64
if err := db.QueryRow(fmt.Sprintf("SELECT MAX(id) FROM %s", tbl)).Scan(&maxID); err != nil {
// The table may not exist (some are optional): log and continue
fmt.Printf(" skip %s: %v\n", tbl, err)
continue
}
if !maxID.Valid { continue }
if _, err := db.Exec(fmt.Sprintf("SELECT setval('%s', $1)", seq), maxID.Int64); err != nil {
return fmt.Errorf("setval %s: %w", seq, err)
}
fmt.Printf(" ✓ %s → %d\n", seq, maxID.Int64)
}
return nil
}
```
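> ⚠️ The primary key of `booth_slots` is `slot_id`, not `id`, so **its sequence is named `booth_slots_slot_id_seq`**. Check the other tables the same way, for example the primary-key name of `asset_likes`.
- [ ] **Step 2: Correct the table-to-sequence mapping**
Replace `loadtestTables` with: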
```go
var loadtestSeqs = map[string]string{
"users": "users_id_seq",
"fan_profiles": "fan_profiles_id_seq",
"assets": "assets_id_seq",
"booth_slots": "booth_slots_slot_id_seq", // primary key is slot_id
"exhibitions": "exhibitions_id_seq",
"stars": "stars_star_id_seq", // primary key is star_id (not id!)
"asset_likes": "asset_likes_id_seq",
"friendships": "friendships_id_seq",
"crystal_transaction_records": "crystal_transaction_records_id_seq",
// mint_orders has a UUID primary key and needs no setval
}
// pkColumns maps each table to its primary-key column (stars uses star_id, not id)
var pkColumns = map[string]string{
"users": "id",
"fan_profiles": "id",
"assets": "id",
"booth_slots": "slot_id",
"exhibitions": "id",
"stars": "star_id", // key difference
"asset_likes": "id",
"friendships": "id",
"crystal_transaction_records": "id",
}
func ResetSequences(db *sql.DB) error {
for tbl, seq := range loadtestSeqs {
pk := pkColumns[tbl]
if pk == "" { pk = "id" }
var maxID sql.NullInt64
if err := db.QueryRow(fmt.Sprintf("SELECT MAX(%s) FROM %s", pk, tbl)).Scan(&maxID); err != nil {
fmt.Printf(" skip %s: %v\n", tbl, err)
continue
}
if !maxID.Valid { continue }
if _, err := db.Exec(fmt.Sprintf("SELECT setval('%s', $1)", seq), maxID.Int64); err != nil {
return fmt.Errorf("setval %s: %w", seq, err)
}
fmt.Printf(" ✓ %s → %d\n", seq, maxID.Int64)
}
return nil
}
```
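The sequence names in the mapping follow PostgreSQL's default naming for serial and identity columns, `<table>_<column>_seq`. A small helper can derive the expected name from the primary-key column and flag names that would exceed the 63-byte identifier limit. This is a sketch: `defaultSequenceName` is a hypothetical helper, and it assumes the sequences were created with default names; `pg_get_serial_sequence` on the live database remains the authoritative lookup.

```go
package main

import "fmt"

// maxIdentLen is PostgreSQL's default identifier length limit in bytes.
const maxIdentLen = 63

// defaultSequenceName (hypothetical helper) derives <table>_<pk>_seq and
// refuses names long enough that PostgreSQL would truncate them.
func defaultSequenceName(table, pkColumn string) (string, error) {
	name := table + "_" + pkColumn + "_seq"
	if len(name) > maxIdentLen {
		return "", fmt.Errorf("%q exceeds %d bytes; PostgreSQL would truncate it", name, maxIdentLen)
	}
	return name, nil
}

func main() {
	pk := map[string]string{"users": "id", "stars": "star_id", "booth_slots": "slot_id"}
	for _, tbl := range []string{"users", "stars", "booth_slots"} {
		seq, err := defaultSequenceName(tbl, pk[tbl])
		if err != nil {
			panic(err)
		}
		fmt.Println(tbl, "->", seq)
	}
}
```

Deriving the name from the primary-key column keeps the two maps from drifting apart; a unit test can assert that every entry of the sequence map equals the derived name.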
- [ ] **Step 3: Call it and commit**
```go
// append at the end of runSeed
if err := ResetSequences(db); err != nil { return fmt.Errorf("reset sequences: %w", err) }
log.Println("✓ sequences reset")
```
```bash
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): reset PG sequences (CLAUDE.md compliance)"
```
---
### Task 17: cleanup.go - data cleanup
**Files:**
- Create: `backend/scripts/loadgen/seed/cleanup.go`
- [ ] **Step 1: Write Cleanup**
Create `backend/scripts/loadgen/seed/cleanup.go`:
```go
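package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// Cleanup deletes loadtest data for the loadtest star only. With full=true it
// also removes the loadtest users and the star itself.
func Cleanup(db *sql.DB, starID int64, full bool) error {
	if starID != LoadtestStarID {
		return errors.New("safety: cleanup only accepts loadtest star_id 999900")
	}
	// Child tables first, parents last. Every statement here uses exactly one
	// placeholder; lib/pq rejects a call whose argument count differs from the
	// number of placeholders in the statement.
	byStar := []string{
		"DELETE FROM asset_likes WHERE star_id = $1",
		"DELETE FROM exhibitions USING fan_profiles fp WHERE exhibitions.host_profile_id = fp.id AND fp.star_id = $1",
		"DELETE FROM booth_slots WHERE star_id = $1",
		"DELETE FROM mint_orders WHERE star_id = $1",
		"DELETE FROM crystal_transaction_records WHERE star_id = $1",
		"DELETE FROM friendships WHERE star_id = $1",
		"DELETE FROM assets WHERE star_id = $1",
		"DELETE FROM fan_profiles WHERE star_id = $1",
	}
	// Run everything in one transaction so a failure leaves no half-cleaned state.
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()
	for _, q := range byStar {
		if _, err := tx.Exec(q, starID); err != nil {
			return fmt.Errorf("cleanup %q: %w", q, err)
		}
	}
	if full {
		if _, err := tx.Exec("DELETE FROM users WHERE id BETWEEN $1 AND $2", LoadtestUserMin, LoadtestUserMax); err != nil {
			return fmt.Errorf("cleanup users: %w", err)
		}
		if _, err := tx.Exec("DELETE FROM stars WHERE star_id = $1", starID); err != nil {
			return fmt.Errorf("cleanup stars: %w", err)
		}
	}
	if err := tx.Commit(); err != nil {
		return err
	}
	return ResetSequences(db)
}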
```
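A cleanup routine that mixes statements with different placeholder counts is easy to get wrong, because the driver checks the argument count only at run time. A cheap offline check is to pair each statement with its arguments and compare the highest `$n` placeholder against the argument count. The sketch below assumes statements use placeholders contiguously from `$1`; `stmt`, `highestPlaceholder`, and `checkArgs` are hypothetical names, and the statements in `main` are examples rather than the full cleanup list.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// stmt (hypothetical type) pairs a SQL statement with the arguments it is run with.
type stmt struct {
	SQL  string
	Args []any
}

var placeholderRE = regexp.MustCompile(`\$(\d+)`)

// highestPlaceholder returns the largest n among the $n placeholders in sql.
func highestPlaceholder(sql string) int {
	highest := 0
	for _, m := range placeholderRE.FindAllStringSubmatch(sql, -1) {
		if n, err := strconv.Atoi(m[1]); err == nil && n > highest {
			highest = n
		}
	}
	return highest
}

// checkArgs reports the first statement whose argument count does not match
// its highest placeholder number.
func checkArgs(stmts []stmt) error {
	for _, s := range stmts {
		if want := highestPlaceholder(s.SQL); want != len(s.Args) {
			return fmt.Errorf("%q: %d placeholder(s), %d argument(s)", s.SQL, want, len(s.Args))
		}
	}
	return nil
}

func main() {
	ok := []stmt{
		{"DELETE FROM assets WHERE star_id = $1", []any{int64(999900)}},
		{"DELETE FROM users WHERE id BETWEEN $1 AND $2", []any{int64(30000001), int64(30001000)}},
	}
	fmt.Println(checkArgs(ok))

	// One placeholder but three arguments: the shape that fails at run time.
	bad := []stmt{{"DELETE FROM assets WHERE star_id = $1", []any{int64(999900), int64(30000001), int64(30001000)}}}
	fmt.Println(checkArgs(bad) != nil)
}
```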
- [ ] **Step 2: Add the cleanup subcommand**
Modify `main.go`:
```go
func main() {
cleanup := flag.Bool("cleanup", false, "run cleanup (default: keep baseline)")
cleanupFull := flag.Bool("full", false, "with -cleanup: also delete users/stars")
cleanupStarID := flag.Int64("cleanup-star-id", LoadtestStarID, "star_id to clean (safety)")
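// parseFlags registers the remaining flags and calls flag.Parse() once;
// parsing here as well would reject flags such as -db-host as undefined.
cfg := parseFlags()
if *cleanup {
db, err := openDB(cfg)
if err != nil {
log.Fatalf("open db: %v", err)
}
defer db.Close()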
if err := Cleanup(db, *cleanupStarID, *cleanupFull); err != nil {
log.Fatalf("cleanup: %v", err)
}
log.Println("✅ cleanup done")
return
}
// ... existing seed flow
}
```
- [ ] **Step 3: 验证编译并提交**
```bash
(cd backend && go build ./scripts/loadgen/seed/)
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): cleanup with --cleanup / --full + safety check"
```
---
### Task 18: seed 单元测试
**Files:**
- 创建: `backend/scripts/loadgen/seed/seed_test.go`
- [ ] **Step 1: 写 mobile 编号函数测试**
```go
package main
import (
"fmt"
"testing"
)
func TestMobileNumbering(t *testing.T) {
cases := []struct {
uid int64
want string
}{
{30000001, "19900000001"},
{30000050, "19900000050"},
{30001000, "19900001000"},
}
for _, c := range cases {
got := formatMobile(c.uid)
if got != c.want {
t.Errorf("formatMobile(%d) = %q, want %q", c.uid, got, c.want)
}
}
}
func formatMobile(uid int64) string {
return fmt.Sprintf("199%08d", uid-LoadtestUserMin+1)
}
```
- [ ] **Step 2: 写 sequence 映射测试**
```go
func TestSequenceMapping(t *testing.T) {
cases := map[string]string{
"users": "users_id_seq",
"stars": "stars_star_id_seq",
"booth_slots": "booth_slots_slot_id_seq",
}
for tbl, want := range cases {
got, ok := loadtestSeqs[tbl]
if !ok {
t.Errorf("table %s not in loadtestSeqs", tbl)
continue
}
if got != want {
t.Errorf("seq for %s = %q, want %q", tbl, got, want)
}
}
}
```
- [ ] **Step 3: 写 cleanup safety 测试**
```go
func TestCleanupRejectsInvalidStarID(t *testing.T) {
// The star_id guard must fire before any DB access, so a nil *sql.DB is enough:
// the test needs neither a registered driver nor a running PostgreSQL.
err := Cleanup(nil, 87, false) // 87 is a real business star_id
if err == nil {
t.Fatal("expected error for non-loadtest star_id, got nil")
}
}
- [ ] **Step 4: 运行测试**
```bash
(cd backend && go test ./scripts/loadgen/seed/ -v)
```
Expected: 3 tests pass。
- [ ] **Step 5: 提交**
```bash
git add backend/scripts/loadgen/seed/seed_test.go
git commit -m "test(seed): unit tests for mobile format, seq mapping, cleanup safety"
```
---
### Task 19: seed README + 部署脚本
**Files:**
- 创建: `backend/scripts/loadgen/seed/README.md`
- [ ] **Step 1: 写 README**
````markdown
# seed - load-test data preparation tool
## Purpose
Runs locally on prod to insert 1000 test users, 5000 assets, 3000 booth_slots, 2000 exhibitions and 10000 friendships, sign 1000 JWTs, and write `users.csv`.
## Build
```bash
cd backend && go build -o seed ./scripts/loadgen/seed/
```
## Run on prod
```bash
# 1. Upload the binary
scp seed root@101.132.250.62:/opt/topfans/loadtest/
# 2. SSH in and run it
ssh root@101.132.250.62
cd /opt/topfans/loadtest
# cut -f2- keeps values that themselves contain '='
export DB_PASSWORD=$(grep '^DB_PASSWORD=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
export JWT_SECRET=$(grep '^JWT_SECRET=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
./seed --db-name=topfans --jwt-secret="$JWT_SECRET"
```
## Cleanup
```bash
# Default: keep the 1000 users and the star row for reuse
# (per Cleanup in cleanup.go, star-scoped rows such as assets are deleted)
./seed --cleanup
# Delete everything, including users and stars
./seed --cleanup --full
# Re-sign tokens only (when the JWTs have expired before a second round)
./seed --reset-tokens --jwt-secret="$JWT_SECRET"
```
````
- [ ] **Step 2: 提交**
```bash
git add backend/scripts/loadgen/seed/README.md
git commit -m "docs(seed): README with build/deploy/cleanup usage"
```
---
## Phase 5: loadgen 库开发(Day 1 晚上)
### Task 20: loadgen main.go 骨架
**Files:**
- 创建: `backend/scripts/loadgen/loadgen/main.go`
- [ ] **Step 1: 写 main.go 骨架**
```go
package main
import (
"flag"
"log"
)
func main() {
var (
scenarios = flag.String("scenarios", "", "comma-separated, e.g. S1,S2,S3")
stage = flag.String("stage", "step", "baseline|step|soak|stress")
rps = flag.Int("rps", 0, "single-RPS mode (overrides stage)")
vus = flag.Int("vus", 0, "max concurrent virtual users (default: auto)")
duration = flag.Duration("duration", 0, "single stage duration (default per §5.3)")
interPause = flag.Duration("inter-scenario-pause", 15*60*1_000_000_000, "pause between scenarios")
monitor = flag.String("monitor", "lite", "off|lite|full")
prodSSH = flag.String("prod-ssh", "", "user@host for ssh metrics")
target = flag.String("target", "http://101.132.250.62:8080", "target gateway URL")
cmd = flag.String("cmd", "run", "run|preflight|verify|report")
inputDir = flag.String("input", "", "for cmd=report: input dir")
outputFile = flag.String("output", "./report.md", "for cmd=report: output file")
)
flag.Parse()
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Printf("loadgen starting: cmd=%s scenarios=%s", *cmd, *scenarios)
switch *cmd {
case "run":
if err := runLoadgen(*target, *scenarios, *stage, *rps, *vus, *duration, *interPause, *monitor, *prodSSH); err != nil {
log.Fatalf("run failed: %v", err)
}
case "preflight":
if err := runPreflight(*target, *prodSSH); err != nil { log.Fatalf("preflight: %v", err) }
case "verify":
if err := runVerify(*prodSSH); err != nil { log.Fatalf("verify: %v", err) }
case "report":
if err := runReport(*inputDir, *outputFile); err != nil { log.Fatalf("report: %v", err) }
default:
log.Fatalf("unknown cmd: %s", *cmd)
}
}
```
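> ⚠️ Note: `15*60*1_000_000_000` is 15 minutes expressed in nanoseconds. It compiles because the untyped constant converts to `time.Duration`, but `15*time.Minute` (available in every Go release) states the intent directly.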
- [ ] **Step 2: 修正 duration 字面量**
```go
import "time"
interPause = flag.Duration("inter-scenario-pause", 15*time.Minute, "pause between scenarios")
```
- [ ] **Step 3: 添加占位 runLoadgen 等**
```go
func runLoadgen(target, scenarios, stage string, rps, vus int, duration, interPause time.Duration, monitor, prodSSH string) error {
// TODO: 在后续 Task 填充
return nil
}
func runPreflight(target, prodSSH string) error { return nil }
func runVerify(prodSSH string) error { return nil }
func runReport(input, output string) error { return nil }
```
- [ ] **Step 4: 验证编译并提交**
```bash
(cd backend && go build ./scripts/loadgen/loadgen/)
git add backend/scripts/loadgen/loadgen/
git commit -m "feat(loadgen): main.go skeleton with run/preflight/verify/report subcommands"
```
---
### Task 21: lib/csv.go - users.csv 加载
**Files:**
- 创建: `backend/scripts/loadgen/loadgen/lib/csv.go`
- 创建: `backend/scripts/loadgen/loadgen/lib/csv_test.go`
- [ ] **Step 1: 写 Loader**
```go
package lib
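import (
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
)

// TestUser is one row of users.csv as written by seed.
type TestUser struct {
	Phone         string
	Password      string
	UserID        int64
	StarID        int64
	JWTToken      string
	AssetIDs      []int64
	ExhibitionIDs []int64
}

// LoadUsers reads users.csv (a header plus 7 columns per row) into memory.
// Malformed rows are reported as errors instead of being skipped, so a broken
// file cannot silently shrink the user pool or produce users with ID 0.
func LoadUsers(path string) ([]TestUser, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	r := csv.NewReader(f)
	r.FieldsPerRecord = -1 // field counts are validated below for clearer errors
	header, err := r.Read()
	if err != nil {
		return nil, fmt.Errorf("read header: %w", err)
	}
	if len(header) < 5 || header[0] != "phone" {
		return nil, fmt.Errorf("unexpected users.csv header %q", header)
	}
	var users []TestUser
	for line := 2; ; line++ {
		row, err := r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		if len(row) < 7 {
			return nil, fmt.Errorf("line %d: want 7 fields, got %d", line, len(row))
		}
		uid, err := strconv.ParseInt(row[2], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("line %d: user_id: %w", line, err)
		}
		sid, err := strconv.ParseInt(row[3], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("line %d: star_id: %w", line, err)
		}
		users = append(users, TestUser{
			Phone:         row[0],
			Password:      row[1],
			UserID:        uid,
			StarID:        sid,
			JWTToken:      row[4],
			AssetIDs:      parseIDs(row[5]),
			ExhibitionIDs: parseIDs(row[6]),
		})
	}
	return users, nil
}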
func parseIDs(s string) []int64 {
if s == "" { return nil }
parts := strings.Split(s, ";")
out := make([]int64, 0, len(parts))
for _, p := range parts {
v, err := strconv.ParseInt(p, 10, 64)
if err != nil { continue }
out = append(out, v)
}
return out
}
```
- [ ] **Step 2: 写测试**
```go
package lib
import (
"os"
"path/filepath"
"testing"
)
func TestLoadUsers(t *testing.T) {
dir := t.TempDir()
p := filepath.Join(dir, "users.csv")
content := `phone,password,user_id,star_id,jwt_token,asset_ids,exhibition_ids
19900000001,Test@123,30000001,999900,token1,1000;1001;1002,5000;5001
19900000002,Test@123,30000002,999900,token2,1003,5002
`
if err := os.WriteFile(p, []byte(content), 0644); err != nil { t.Fatal(err) }
users, err := LoadUsers(p)
if err != nil { t.Fatal(err) }
if len(users) != 2 { t.Fatalf("want 2 users, got %d", len(users)) }
u := users[0]
if u.UserID != 30000001 { t.Errorf("uid=%d", u.UserID) }
if len(u.AssetIDs) != 3 { t.Errorf("assets=%v", u.AssetIDs) }
if u.AssetIDs[0] != 1000 { t.Errorf("first asset=%d", u.AssetIDs[0]) }
}
```
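The `asset_ids` and `exhibition_ids` columns carry `;`-separated integers, which `parseIDs` splits on load. The writing side is not shown in this section, so the following is only a sketch of what it might look like: `joinIDs` is a hypothetical helper name, and `parseIDs` is repeated here solely so the round trip runs on its own.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// joinIDs (hypothetical) renders IDs in the form parseIDs expects, e.g. "1000;1001;1002".
func joinIDs(ids []int64) string {
	parts := make([]string, len(ids))
	for i, id := range ids {
		parts[i] = strconv.FormatInt(id, 10)
	}
	return strings.Join(parts, ";")
}

// parseIDs mirrors the loader: empty input yields nil, unparsable tokens are skipped.
func parseIDs(s string) []int64 {
	if s == "" {
		return nil
	}
	parts := strings.Split(s, ";")
	out := make([]int64, 0, len(parts))
	for _, p := range parts {
		v, err := strconv.ParseInt(p, 10, 64)
		if err != nil {
			continue
		}
		out = append(out, v)
	}
	return out
}

func main() {
	cell := joinIDs([]int64{1000, 1001, 1002})
	fmt.Println(cell)           // the CSV cell as it would be written
	fmt.Println(parseIDs(cell)) // the slice the loader would rebuild from it
}
```

Using `;` inside the cell keeps the ID list from colliding with the CSV field separator, so no quoting is needed.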
- [ ] **Step 3: 运行测试**
```bash
(cd backend && go test ./scripts/loadgen/loadgen/lib/ -v -run TestLoadUsers)
```
Expected: PASS。
- [ ] **Step 4: 提交**
```bash
git add backend/scripts/loadgen/loadgen/lib/
git commit -m "feat(loadgen): load users.csv into memory with parseIDs helper"
```
---
### Task 22: lib/client.go - http.Transport 封装
**Files:**
- 创建: `backend/scripts/loadgen/loadgen/lib/client.go`
- [ ] **Step 1: 写 Client**
```go
package lib
import (
"net"
"net/http"
"time"
)
func NewHTTPClient(target string) *http.Client {
dialer := &net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}
transport := &http.Transport{
DialContext: dialer.DialContext,
MaxConnsPerHost: 500,
MaxIdleConns: 500,
MaxIdleConnsPerHost: 200,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
}
return &http.Client{
Transport: transport,
Timeout: 15 * time.Second,
}
}
```
- [ ] **Step 2: 提交**
```bash
(cd backend && go build ./scripts/loadgen/loadgen/lib/)
git add backend/scripts/loadgen/loadgen/lib/client.go
git commit -m "feat(loadgen): HTTP transport with MaxConnsPerHost=500, keep-alive"
```
---
### Task 23: lib/hdr.go - HDR 直方图封装
**Files:**
- 创建: `backend/scripts/loadgen/loadgen/lib/hdr.go`
- 创建: `backend/scripts/loadgen/loadgen/lib/hdr_test.go`
- [ ] **Step 1: 写 LatencyRecorder**
```go
package lib
import (
"sync"
"github.com/HdrHistogram/hdrhistogram-go"
)
type LatencyRecorder struct {
mu sync.Mutex
h *hdrhistogram.Histogram
}
func NewLatencyRecorder() *LatencyRecorder {
// 0-30s, 3 位有效精度 (~0.1% 误差)
return &LatencyRecorder{
h: hdrhistogram.New(1, 30_000_000, 3),
}
}
func (r *LatencyRecorder) Record(latencyUs int64) {
r.mu.Lock()
defer r.mu.Unlock()
if latencyUs < 1 { latencyUs = 1 }
_ = r.h.RecordValue(latencyUs)
}
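// Snapshot returns an independent copy so callers can read percentiles without holding the lock.
// Export/Import round-trips the counts into a fresh histogram.
func (r *LatencyRecorder) Snapshot() *hdrhistogram.Histogram {
r.mu.Lock()
defer r.mu.Unlock()
return hdrhistogram.Import(r.h.Export())
}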
```
- [ ] **Step 2: 写测试**
```go
package lib
import "testing"
func TestLatencyRecorder(t *testing.T) {
r := NewLatencyRecorder()
for i := 0; i < 100; i++ {
r.Record(int64(10_000 + i*100)) // 10ms-20ms
}
snap := r.Snapshot()
p50 := snap.ValueAtQuantile(50.0)
if p50 < 10_000 || p50 > 20_000 {
t.Errorf("p50 out of range: %d", p50)
}
}
```
- [ ] **Step 3: 运行 + 提交**
```bash
(cd backend && go test ./scripts/loadgen/loadgen/lib/ -v -run TestLatencyRecorder)
git add backend/scripts/loadgen/loadgen/lib/
git commit -m "feat(loadgen): HDR histogram wrapper for latency recording"
```
---
### Task 24: lib/log.go - stderr 行仪表
**Files:**
- 创建: `backend/scripts/loadgen/loadgen/lib/log.go`
- [ ] **Step 1: 写 StderrDashboard**
```go
package lib
import (
"fmt"
"io"
"os"
"time"
)
type Dashboard struct {
scenario string
out io.Writer
}
func NewDashboard(scenario string) *Dashboard {
return &Dashboard{scenario: scenario, out: os.Stderr}
}
func (d *Dashboard) PerSecondLine(t time.Time, targetRPS, actualRPS float64, p50, p95, p99 int64, errRate float64, vu int) {
fmt.Fprintf(d.out, "[%s] %-12s | target=%5.0f actual=%6.1f | p50=%4d p95=%4d p99=%5d | err=%.1f%% | vu=%d\n",
t.Format("15:04:05"), d.scenario, targetRPS, actualRPS,
p50/1000, p95/1000, p99/1000, errRate*100, vu)
}
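func (d *Dashboard) PerMinuteSummary(elapsed time.Duration, totalReqs int64, errs int64, h interface{ ValueAtQuantile(float64) int64 }, status2xx, status4xx, status5xx int64) {
// Nothing to summarise yet; returning early also avoids NaN percentages from dividing by zero.
if totalReqs == 0 {
fmt.Fprintf(d.out, " Requests: 0\n")
return
}
p50 := h.ValueAtQuantile(50) / 1000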
p95 := h.ValueAtQuantile(95) / 1000
p99 := h.ValueAtQuantile(99) / 1000
fmt.Fprintf(d.out, "═══════════════════════════════════════════════════════════════\n")
fmt.Fprintf(d.out, " Requests: %d (%.1f/s avg)\n", totalReqs, float64(totalReqs)/elapsed.Seconds())
fmt.Fprintf(d.out, " Errors: %d (%.1f%%)\n", errs, float64(errs)/float64(totalReqs)*100)
fmt.Fprintf(d.out, " Latency p50: %dms p95: %dms p99: %dms\n", p50, p95, p99)
fmt.Fprintf(d.out, " Status: 2xx=%.1f%% 4xx=%.1f%% 5xx=%.1f%%\n",
float64(status2xx)/float64(totalReqs)*100,
float64(status4xx)/float64(totalReqs)*100,
float64(status5xx)/float64(totalReqs)*100)
fmt.Fprintf(d.out, "═══════════════════════════════════════════════════════════════\n")
}
```
- [ ] **Step 2: 验证编译并提交**
```bash
(cd backend && go build ./scripts/loadgen/loadgen/lib/)
git add backend/scripts/loadgen/loadgen/lib/log.go
git commit -m "feat(loadgen): stderr per-second + per-minute dashboard lines"
```
---
### Task 25: lib/ramp.go - 阶梯调度器
**Files:**
- 创建: `backend/scripts/loadgen/loadgen/lib/ramp.go`
- 创建: `backend/scripts/loadgen/loadgen/lib/ramp_test.go`
- [ ] **Step 1: 写 StageScheduler**
```go
package lib
import "time"
type Stage struct {
RPS int
Duration time.Duration
}
type StageScheduler struct {
stages []Stage
idx int
start time.Time
}
func NewStageScheduler(stages []Stage) *StageScheduler {
return &StageScheduler{stages: stages, start: time.Now()}
}
// CurrentRPS returns the target RPS for the current stage, or 0 if all stages done.
func (s *StageScheduler) CurrentRPS() int {
if s.idx >= len(s.stages) { return 0 }
return s.stages[s.idx].RPS
}
// Advance checks if the current stage has elapsed and moves to the next.
func (s *StageScheduler) Advance() bool {
if s.idx >= len(s.stages) { return false }
elapsed := time.Since(s.start)
if elapsed >= s.stages[s.idx].Duration {
s.idx++
s.start = time.Now()
return s.idx < len(s.stages)
}
return true
}
func (s *StageScheduler) StageIndex() int { return s.idx }
func (s *StageScheduler) StageCount() int { return len(s.stages) }
```
- [ ] **Step 2: 写测试**
```go
package lib
import (
"testing"
"time"
)
func TestStageScheduler(t *testing.T) {
s := NewStageScheduler([]Stage{
{RPS: 10, Duration: 50 * time.Millisecond},
{RPS: 20, Duration: 50 * time.Millisecond},
})
if got := s.CurrentRPS(); got != 10 { t.Errorf("first rps=%d", got) }
time.Sleep(60 * time.Millisecond)
if !s.Advance() { t.Fatal("should still have more stages") }
if got := s.CurrentRPS(); got != 20 { t.Errorf("second rps=%d", got) }
time.Sleep(60 * time.Millisecond)
if s.Advance() { t.Fatal("should be done") }
if got := s.CurrentRPS(); got != 0 { t.Errorf("after end rps=%d", got) }
}
```
- [ ] **Step 3: 运行 + 提交**
```bash
(cd backend && go test ./scripts/loadgen/loadgen/lib/ -v -run TestStageScheduler)
git add backend/scripts/loadgen/loadgen/lib/
git commit -m "feat(loadgen): stage scheduler for RPS ramp-up"
```
---
### Task 26: lib/circuit.go - 6 维红线判停
**Files:**
- 创建: `backend/scripts/loadgen/loadgen/lib/circuit.go`
- 创建: `backend/scripts/loadgen/loadgen/lib/circuit_test.go`
> **关键(spec §7.3)**:6 维红线 = R1 错误率 + R2 P99 + R3 5xx(客户端) + R4 PG 连接 + R5 磁盘 + R6 OOM(服务端)。
> R1-R3 来自 loadgen 自身 metrics,R4-R6 来自 `metrics-feed.jsonl`(`sample.sh` 写入 prod 端)。
> **两者都要 check** → CircuitBreaker 既接受 client-side Metrics,也接受 server-side ServerMetrics。
- [ ] **Step 1: 写 CircuitBreaker**
```go
package lib
import (
"sync"
"time"
)
type CircuitState int
const (
CircuitOK CircuitState = iota
CircuitTripped
)
type CircuitBreaker struct {
mu sync.Mutex
state CircuitState
// R1/R2/R3 触发计时
errRateStart time.Time
p99Start time.Time
fiveXXStart time.Time
// R4/R5/R6 触发计时
pgConnStart time.Time
diskStart time.Time
oomSeen bool
// 阈值(详见 spec §7.3)
ErrRate float64 // R1: 0.05
P99ThresholdMs int64 // R2: 3000
FiveXXRate float64 // R3: 0.10
PGConnMax int // R4: 42 (50×85%, max_connections 50 时的 85%)
DiskMinGB int // R5: 5
SustainTime time.Duration // R1/R2: 30s
Sustain5xx time.Duration // R3: 10s
}
func NewCircuitBreaker() *CircuitBreaker {
return &CircuitBreaker{
ErrRate: 0.05,
P99ThresholdMs: 3000,
FiveXXRate: 0.10,
PGConnMax: 42, // 50×85% (spec §2.1 把 max_connections 从 100 改 50 后,§7.3 R4 阈值要相应调整)
DiskMinGB: 5,
SustainTime: 30 * time.Second,
Sustain5xx: 10 * time.Second,
}
}
// ClientMetrics 来自 loadgen 自身(R1/R2/R3)
type ClientMetrics struct {
ErrorRate float64
P99Ms int64
FiveXXRate float64
}
// ServerMetrics 来自 metrics-feed.jsonl 解析(R4/R5/R6)
type ServerMetrics struct {
PGActive int
DiskGB int
OOMEvent bool
}
// Check evaluates the six red lines against one client sample and one server sample.
// R1-R5 trip only after a breach has lasted for the configured sustain window; R6 trips immediately.
func (cb *CircuitBreaker) Check(client ClientMetrics, server ServerMetrics, now time.Time) bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	// sustained starts or clears a per-rule timer and reports whether the breach has lasted for window.
	sustained := func(start *time.Time, breached bool, window time.Duration) bool {
		if !breached {
			*start = time.Time{}
			return false
		}
		if start.IsZero() {
			*start = now
		}
		return now.Sub(*start) >= window
	}
	tripped := false
	// A zero-valued sample means "this caller has no data for that side". It is skipped rather than
	// treated as healthy, so a server-only call does not clear the R1-R3 timers and vice versa.
	if client != (ClientMetrics{}) {
		tripped = sustained(&cb.errRateStart, client.ErrorRate > cb.ErrRate, cb.SustainTime) || tripped     // R1: error rate
		tripped = sustained(&cb.p99Start, client.P99Ms > cb.P99ThresholdMs, cb.SustainTime) || tripped      // R2: P99
		tripped = sustained(&cb.fiveXXStart, client.FiveXXRate > cb.FiveXXRate, cb.Sustain5xx) || tripped   // R3: 5xx rate
	}
	// Without this guard an empty ServerMetrics has DiskGB == 0, which would read as a full disk
	// and trip R5 after 30s whenever the metrics feed is off.
	if server != (ServerMetrics{}) {
		tripped = sustained(&cb.pgConnStart, server.PGActive > cb.PGConnMax, cb.SustainTime) || tripped // R4: PG connections (85% of max_connections=50)
		tripped = sustained(&cb.diskStart, server.DiskGB < cb.DiskMinGB, cb.SustainTime) || tripped     // R5: free disk
	}
	// R6: an OOM event trips immediately and stays latched.
	if server.OOMEvent || cb.oomSeen {
		cb.oomSeen = true
		tripped = true
	}
	if tripped {
		cb.state = CircuitTripped
	}
	return tripped
}

// State returns the breaker state; it takes the lock because Check writes state from other goroutines.
func (cb *CircuitBreaker) State() CircuitState {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	return cb.state
}
```
- [ ] **Step 2: 写测试**
```go
package lib
import (
"testing"
"time"
)
func TestCircuitBreaker_R1(t *testing.T) {
cb := NewCircuitBreaker()
now := time.Now()
if cb.Check(ClientMetrics{ErrorRate: 0.06}, ServerMetrics{}, now) {
t.Error("R1 should not trip on first check")
}
if !cb.Check(ClientMetrics{ErrorRate: 0.06}, ServerMetrics{}, now.Add(31*time.Second)) {
t.Error("R1 should trip after 30s sustain")
}
}
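func TestCircuitBreaker_R2(t *testing.T) {
cb := NewCircuitBreaker()
now := time.Now()
// The first breach only starts the sustain timer; the same breach 31s later trips.
if cb.Check(ClientMetrics{P99Ms: 4000}, ServerMetrics{}, now) {
t.Error("R2 should not trip on first check")
}
if !cb.Check(ClientMetrics{P99Ms: 4000}, ServerMetrics{}, now.Add(31*time.Second)) {
t.Error("R2 P99>3000 sustained 30s should trip")
}
}
func TestCircuitBreaker_R4_PGConn(t *testing.T) {
cb := NewCircuitBreaker()
now := time.Now()
if cb.Check(ClientMetrics{}, ServerMetrics{PGActive: 50}, now) {
t.Error("R4 should not trip on first check")
}
if !cb.Check(ClientMetrics{}, ServerMetrics{PGActive: 50}, now.Add(31*time.Second)) {
t.Error("R4 PG active > 42 sustained should trip")
}
}
func TestCircuitBreaker_R5_Disk(t *testing.T) {
cb := NewCircuitBreaker()
now := time.Now()
if cb.Check(ClientMetrics{}, ServerMetrics{DiskGB: 3}, now) {
t.Error("R5 should not trip on first check")
}
if !cb.Check(ClientMetrics{}, ServerMetrics{DiskGB: 3}, now.Add(31*time.Second)) {
t.Error("R5 disk < 5GB sustained should trip")
}
}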
func TestCircuitBreaker_R6_OOM_Instant(t *testing.T) {
cb := NewCircuitBreaker()
if !cb.Check(ClientMetrics{}, ServerMetrics{OOMEvent: true}, time.Now()) {
t.Error("R6 OOM should trip instantly without sustain")
}
}
func TestCircuitBreaker_Recovers(t *testing.T) {
cb := NewCircuitBreaker()
now := time.Now()
cb.Check(ClientMetrics{ErrorRate: 0.06}, ServerMetrics{}, now)
if cb.Check(ClientMetrics{ErrorRate: 0.01}, ServerMetrics{}, now.Add(10*time.Second)) {
t.Error("should not trip when error drops")
}
if cb.State() != CircuitOK { t.Error("should remain OK") }
}
```
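`Check` takes a client sample and a server sample in one call, but the two arrive from different goroutines (the scenarios and the SSH feed). One way to reconcile that, sketched here under assumptions, is a small holder that keeps the latest sample from each side so a single evaluation loop can pass both to the breaker. `MetricsHub` and its methods are hypothetical names, and the two metric structs are repeated only to keep the example self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of the plan's metric types, so the sketch compiles on its own.
type ClientMetrics struct {
	ErrorRate  float64
	P99Ms      int64
	FiveXXRate float64
}

type ServerMetrics struct {
	PGActive int
	DiskGB   int
	OOMEvent bool
}

// MetricsHub (hypothetical) stores the most recent sample from each source.
type MetricsHub struct {
	mu        sync.Mutex
	client    ClientMetrics
	server    ServerMetrics
	hasClient bool
	hasServer bool
}

// SetClient records the newest client-side sample (R1/R2/R3 inputs).
func (h *MetricsHub) SetClient(c ClientMetrics) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.client, h.hasClient = c, true
}

// SetServer records the newest server-side sample (R4/R5/R6 inputs).
// An OOM flag stays set once seen, so a later sample cannot hide it.
func (h *MetricsHub) SetServer(s ServerMetrics) {
	h.mu.Lock()
	defer h.mu.Unlock()
	s.OOMEvent = s.OOMEvent || h.server.OOMEvent
	h.server, h.hasServer = s, true
}

// Latest returns both samples plus whether each source has reported at least once.
func (h *MetricsHub) Latest() (c ClientMetrics, s ServerMetrics, hasClient, hasServer bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.client, h.server, h.hasClient, h.hasServer
}

func main() {
	hub := &MetricsHub{}
	hub.SetClient(ClientMetrics{ErrorRate: 0.06, P99Ms: 120})
	hub.SetServer(ServerMetrics{PGActive: 30, DiskGB: 12, OOMEvent: true})
	hub.SetServer(ServerMetrics{PGActive: 31, DiskGB: 12}) // OOM remains latched
	c, s, hc, hs := hub.Latest()
	fmt.Printf("client=%+v server=%+v hasClient=%v hasServer=%v\n", c, s, hc, hs)
}
```

A single ticker goroutine could then call `Check(c, s, time.Now())` with both values, which keeps every sustain timer driven by one clock and one caller.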
- [ ] **Step 3: 运行 + 提交**
```bash
(cd backend && go test ./scripts/loadgen/loadgen/lib/ -v -run TestCircuit)
git add backend/scripts/loadgen/loadgen/lib/
git commit -m "feat(loadgen): 6-dim circuit breaker (R1/R2/R3 client + R4/R5/R6 server)"
```
---
### Task 27: lib/ssh_metrics.go - SSH 远程监控 + 解析为 ServerMetrics
**Files:**
- 创建: `backend/scripts/loadgen/loadgen/lib/ssh_metrics.go`
- 创建: `backend/scripts/loadgen/loadgen/lib/ssh_metrics_test.go`
- [ ] **Step 1: 写 TailMetricsFeed + 解析为 ServerMetrics**
```go
package lib
import (
"bufio"
"os/exec"
"strconv"
"strings"
)
type MetricsLine struct {
Timestamp string
Fields map[string]string
}
// TailMetricsFeed runs `ssh host tail -F /opt/topfans/loadtest/metrics-feed.jsonl` and emits parsed lines.
func TailMetricsFeed(sshTarget, path string) (<-chan MetricsLine, error) {
out := make(chan MetricsLine, 100)
cmd := exec.Command("ssh", sshTarget, "tail -F "+path)
stdout, err := cmd.StdoutPipe()
if err != nil { return nil, err }
if err := cmd.Start(); err != nil { return nil, err }
go func() {
defer close(out)
defer cmd.Wait() // reap the ssh process once its stdout closes, so it does not linger as a zombie
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
line := scanner.Text()
ml := parseMetricsLine(line)
select {
case out <- ml:
default: // drop if buffer full
}
}
}()
return out, nil
}
func parseMetricsLine(s string) MetricsLine {
parts := strings.SplitN(s, " ", 2)
ml := MetricsLine{Fields: make(map[string]string)}
if len(parts) >= 1 { ml.Timestamp = parts[0] }
if len(parts) == 2 {
for _, kv := range strings.Fields(parts[1]) {
eq := strings.SplitN(kv, "=", 2)
if len(eq) == 2 { ml.Fields[eq[0]] = eq[1] }
}
}
return ml
}
// ToServerMetrics 解析一行 metrics-feed 为 R4/R5/R6 指标
// 期望格式: "<ts> pg_active=42 disk_free=12 topfans-postgres=12.3% topfans-gateway=5.1% ..."
// OOM 事件通过单独 `oom=true` 标识(由 sample.sh 检测 docker events 注入)
func (ml MetricsLine) ToServerMetrics() ServerMetrics {
sm := ServerMetrics{}
if v, ok := ml.Fields["pg_active"]; ok {
sm.PGActive, _ = strconv.Atoi(v)
}
if v, ok := ml.Fields["disk_free"]; ok {
sm.DiskGB, _ = strconv.Atoi(v)
}
if v, ok := ml.Fields["oom"]; ok && v == "true" {
sm.OOMEvent = true
}
return sm
}
```
- [ ] **Step 2: 写测试**
```go
package lib
import "testing"
func TestParseMetricsLine(t *testing.T) {
ml := parseMetricsLine("1700000000 pg_active=42 disk_free=12 oom=false")
if ml.Timestamp != "1700000000" { t.Errorf("ts=%q", ml.Timestamp) }
if ml.Fields["pg_active"] != "42" { t.Errorf("pg=%q", ml.Fields["pg_active"]) }
if ml.Fields["disk_free"] != "12" { t.Errorf("disk=%q", ml.Fields["disk_free"]) }
}
func TestToServerMetrics(t *testing.T) {
ml := parseMetricsLine("1700000000 pg_active=50 disk_free=3 oom=true")
sm := ml.ToServerMetrics()
if sm.PGActive != 50 { t.Errorf("pg=%d", sm.PGActive) }
if sm.DiskGB != 3 { t.Errorf("disk=%d", sm.DiskGB) }
if !sm.OOMEvent { t.Error("oom should be true") }
}
```
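`ToServerMetrics` ignores `strconv.Atoi` errors, so a value such as `disk_free=12.5` or a missing key becomes 0, which R5 would read as a nearly full disk. A stricter parser could surface those cases as errors instead. The version below is a sketch only: `ServerSample` and `parseServerSample` are hypothetical names, and accepting decimal `disk_free` values is an assumption about what `sample.sh` might emit.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ServerSample (hypothetical) holds the typed R4/R5/R6 inputs from one feed line.
type ServerSample struct {
	PGActive int
	DiskGB   int
	OOM      bool
}

// parseServerSample parses "<ts> key=value ..." and fails loudly when
// pg_active or disk_free is missing or malformed.
func parseServerSample(line string) (ServerSample, error) {
	fields := strings.Fields(line)
	if len(fields) < 2 {
		return ServerSample{}, fmt.Errorf("short metrics line %q", line)
	}
	kv := make(map[string]string, len(fields)-1)
	for _, f := range fields[1:] { // fields[0] is the timestamp
		if k, v, ok := strings.Cut(f, "="); ok {
			kv[k] = v
		}
	}
	pg, err := strconv.Atoi(kv["pg_active"])
	if err != nil {
		return ServerSample{}, fmt.Errorf("pg_active: %w", err)
	}
	disk, err := strconv.ParseFloat(kv["disk_free"], 64)
	if err != nil {
		return ServerSample{}, fmt.Errorf("disk_free: %w", err)
	}
	// Truncate toward zero so 4.9 GB counts as 4 and still breaches a 5 GB floor.
	return ServerSample{PGActive: pg, DiskGB: int(disk), OOM: kv["oom"] == "true"}, nil
}

func main() {
	s, err := parseServerSample("1700000000 pg_active=42 disk_free=12.9 oom=true")
	fmt.Println(s, err)
	_, err = parseServerSample("1700000000 pg_active=42")
	fmt.Println(err)
}
```

With this shape the consumer can skip a bad line and keep the previous sample, rather than feeding a fabricated zero into the breaker.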
- [ ] **Step 3: 验证编译 + 提交**
```bash
(cd backend && go test ./scripts/loadgen/loadgen/lib/ -v -run 'TestParseMetricsLine|TestToServerMetrics')
(cd backend && go build ./scripts/loadgen/loadgen/lib/)
git add backend/scripts/loadgen/loadgen/lib/
git commit -m "feat(loadgen): ssh tail metrics-feed + parse to ServerMetrics for R4/R5/R6"
```
---
### Task 28: loadgen runLoadgen 主循环
**Files:**
- 修改: `backend/scripts/loadgen/loadgen/main.go`
- [ ] **Step 1: 实现 runLoadgen 框架(集成 lib + scenarios)**
```go
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
"time"
"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
"github.com/topfans/backend/scripts/loadgen/loadgen/scenarios"
"github.com/topfans/backend/scripts/loadgen/loadgen/reporter"
)
func runLoadgen(target, scenarioIDs, stage string, rps, vus int, duration, interPause time.Duration, monitorMode, prodSSH, stepSchedule string) error {
users, err := lib.LoadUsers("users.csv")
if err != nil { return fmt.Errorf("load users.csv: %w", err) }
client := lib.NewHTTPClient(target)
recorder := lib.NewLatencyRecorder()
breaker := lib.NewCircuitBreaker()
dashboard := lib.NewDashboard(scenarioIDs)
var errCount, totalCount, fiveXXCount atomic.Int64
var clientLatencyP99Us atomic.Int64 // for R7 (loadgen self-latency drift)
// metrics feed consumer (R4/R5/R6)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if monitorMode != "off" && prodSSH != "" {
feed, err := lib.TailMetricsFeed(prodSSH, "/opt/topfans/loadtest/metrics-feed.jsonl")
if err != nil { return fmt.Errorf("tail metrics: %w", err) }
go consumeServerMetrics(ctx, feed, breaker)
}
// SIGINT handler
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
log.Println("SIGINT received, shutting down gracefully...")
cancel()
}()
// parse step schedule (e.g. "2,5,10,15,25,40")
var stages []int
if stage == "step" {
if stepSchedule == "" {
return fmt.Errorf("--step-schedule required for --stage=step (e.g. --step-schedule='2,5,10,15,25,40')")
}
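for _, s := range strings.Split(stepSchedule, ",") {
var v int
// Reject bad entries instead of skipping them, so a typo cannot silently drop a step.
if _, err := fmt.Sscanf(strings.TrimSpace(s), "%d", &v); err != nil || v <= 0 {
return fmt.Errorf("--step-schedule: invalid entry %q in %q", s, stepSchedule)
}
stages = append(stages, v)
}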
if len(stages) == 0 {
return fmt.Errorf("--step-schedule invalid: %q", stepSchedule)
}
}
// run scenarios
ids := strings.Split(scenarioIDs, ",")
for _, id := range ids {
s, err := scenarios.Get(id, client, users, &errCount, &totalCount, &fiveXXCount, recorder, &clientLatencyP99Us)
if err != nil { return err }
// 给场景传 stage schedule(stage=baseline 不用,step 用)
if err := s.Run(ctx, rps, duration, dashboard, breaker, stages); err != nil {
return err
}
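// Pause between scenarios, but wake up immediately on SIGINT/SIGTERM
// instead of blocking for the full interPause (15 minutes by default).
select {
case <-ctx.Done():
case <-time.After(interPause):
}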
}
// write report
if err := reporter.WriteJSON("report.json", recorder.Snapshot(), totalCount.Load(), errCount.Load(), fiveXXCount.Load()); err != nil {
return err
}
return nil
}
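// consumeServerMetrics feeds metrics-feed.jsonl lines (R4/R5/R6) into CircuitBreaker.Check().
func consumeServerMetrics(ctx context.Context, feed <-chan lib.MetricsLine, breaker *lib.CircuitBreaker) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
var latestServer lib.ServerMetrics
for {
select {
case <-ctx.Done():
return
case ml, ok := <-feed:
if !ok {
return // ssh tail exited; a closed channel would otherwise spin this loop
}
latestServer = ml.ToServerMetrics()
case <-ticker.C:
// Client-side metrics (R1/R2/R3) are expected to be injected by the scenarios;
// this call carries only the server-side R4/R5/R6 sample.
// Caveat: if Check treats an empty ClientMetrics as a healthy sample it clears the R1-R3
// timers, so server-only and client-only calls must not be interleaved without a guard.
// TODO: also inject clientP99 for the R7 linkage, see spec §8.1
if breaker.Check(lib.ClientMetrics{}, latestServer, time.Now()) {
log.Println("circuit breaker tripped by server-side metrics")
}
}
}
}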
```
- [ ] **Step 2: 给 main.go 添加 --step-schedule flag**
修改 `main.go` 的 flag 定义:
```go
stepSchedule = flag.String("step-schedule", "", "comma-separated RPS list for --stage=step, e.g. '2,5,10,15,25,40' for S1")
```
并把 `*stepSchedule` 传给 `runLoadgen` 最后一个参数。
- [ ] **Step 3: 编译 + 提交(可能因 scenarios 缺失失败,先用占位)**
```bash
(cd backend && go build ./scripts/loadgen/loadgen/ 2>&1 | head -20)
```
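> Expected: the build fails because the `scenarios` package (and with it `scenarios.Get`) does not exist yet; continue with Task 30. Note that the calls above pass `&clientLatencyP99Us` to `scenarios.Get` and `stages` to `Run`, while the Task 30 signatures accept neither, so the two sides must be aligned before the build can succeed.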
```bash
git add backend/scripts/loadgen/loadgen/main.go
git commit -m "feat(loadgen): runLoadgen main loop with 6-dim circuit breaker + step schedule flag"
```
---
### Task 29: 全部 lib 测试一次跑通
**Files:**
- 修改: 无,跑测试
- [ ] **Step 1: 运行所有 lib 测试**
```bash
cd backend && go test ./scripts/loadgen/loadgen/lib/ -v
```
Expected: 全部 PASS。
- [ ] **Step 2: 提交(若修改了)**
```bash
git status
# 如有修改,提交
```
---
## Phase 6: loadgen 场景开发(Day 1 晚上-次日凌晨)
### Task 30: scenarios 注册表
**Files:**
- 创建: `backend/scripts/loadgen/loadgen/scenarios/scenarios.go`
- [ ] **Step 1: 写 Scenario 接口 + 注册表**
```go
package scenarios
import (
"context"
"fmt"
"net/http"
"sync/atomic"
"time"
"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
)
type Scenario interface {
Run(ctx context.Context, rpsOverride int, durationOverride time.Duration, dash *lib.Dashboard, breaker *lib.CircuitBreaker) error
}
var registry = map[string]func(client *http.Client, users []lib.TestUser, errCount, totalCount, fiveXXCount *atomic.Int64, rec *lib.LatencyRecorder) Scenario{}
// Get retrieves a scenario by ID (e.g. "S1").
func Get(id string, client *http.Client, users []lib.TestUser, errCount, totalCount, fiveXXCount *atomic.Int64, rec *lib.LatencyRecorder) (Scenario, error) {
factory, ok := registry[id]
if !ok { return nil, fmt.Errorf("unknown scenario: %s", id) }
return factory(client, users, errCount, totalCount, fiveXXCount, rec), nil
}
func register(id string, factory func(*http.Client, []lib.TestUser, *atomic.Int64, *atomic.Int64, *atomic.Int64, *lib.LatencyRecorder) Scenario) {
registry[id] = factory
}
```
- [ ] **Step 2: 提交**
```bash
(cd backend && go build ./scripts/loadgen/loadgen/scenarios/)
git add backend/scripts/loadgen/loadgen/scenarios/scenarios.go
git commit -m "feat(loadgen): scenario interface + registry"
```
---
### Task 31: S1 登录
**Files:**
- 创建: `backend/scripts/loadgen/loadgen/scenarios/s1_login.go`
- [ ] **Step 1: 写 S1**
```go
package scenarios
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"sync/atomic"
"time"
"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
)
type s1Login struct {
client *http.Client
users []lib.TestUser
errCount *atomic.Int64
totalCount *atomic.Int64
fiveXXCount *atomic.Int64
rec *lib.LatencyRecorder
}
func init() { register("S1", newS1) }
func newS1(c *http.Client, u []lib.TestUser, e, t, f *atomic.Int64, r *lib.LatencyRecorder) Scenario {
return &s1Login{client: c, users: u, errCount: e, totalCount: t, fiveXXCount: f, rec: r}
}
func (s *s1Login) Run(ctx context.Context, rpsOverride int, durationOverride time.Duration, dash *lib.Dashboard, breaker *lib.CircuitBreaker) error {
targetRPS := rpsOverride
if targetRPS == 0 { targetRPS = 15 } // 默认拐点
duration := durationOverride
if duration == 0 { duration = 2 * time.Minute }
ticker := time.NewTicker(time.Second / time.Duration(targetRPS))
defer ticker.Stop()
timeout := time.NewTimer(duration)
defer timeout.Stop()
for {
select {
case <-ctx.Done(): return nil
case <-timeout.C: return nil
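case <-ticker.C:
// Stop as soon as any red line has tripped the breaker.
if breaker != nil && breaker.State() == lib.CircuitTripped {
return fmt.Errorf("S1 aborted: circuit breaker tripped")
}
u := s.users[rand.Intn(len(s.users))]
// Fire asynchronously (open loop): a synchronous call would cap the real rate at
// 1/latency and drop ticks. In-flight requests are bounded by the client's 15s timeout.
go s.doLogin(u)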
}
}
}
func (s *s1Login) doLogin(u lib.TestUser) {
body, _ := json.Marshal(map[string]string{"mobile": u.Phone, "password": u.Password})
req, _ := http.NewRequest("POST", "/api/v1/auth/login", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
s.fireRequest(req)
}
func (s *s1Login) fireRequest(req *http.Request) {
start := time.Now()
resp, err := s.client.Do(req)
latency := time.Since(start)
s.rec.Record(latency.Microseconds())
s.totalCount.Add(1)
if err != nil {
s.errCount.Add(1)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 500 {
s.fiveXXCount.Add(1)
s.errCount.Add(1)
} else if resp.StatusCode >= 400 {
s.errCount.Add(1)
}
}
```
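> The request above uses a relative path, so `client.Do` fails with an unsupported-protocol error until a base URL is prepended. Step 3 adds a hard-coded `baseURL`; wiring it to `--target` is left to a later task.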
- [ ] **Step 2: 抽出 fireRequest 到公共文件**
创建 `backend/scripts/loadgen/loadgen/scenarios/common.go`:
```go
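package scenarios
import (
"io"
"net/http"
"sync/atomic"
"time"
"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
)
// doRequest sends req, records its latency, and updates the shared counters.
// 4xx and 5xx both count as errors; 5xx is additionally tracked for R3.
func doRequest(client *http.Client, req *http.Request, rec *lib.LatencyRecorder, errCount, totalCount, fiveXXCount *atomic.Int64) {
start := time.Now()
resp, err := client.Do(req)
latency := time.Since(start) // time to response headers; the body is drained below
rec.Record(latency.Microseconds())
totalCount.Add(1)
if err != nil { errCount.Add(1); return }
defer resp.Body.Close()
// Drain the body so the keep-alive connection can return to the idle pool.
_, _ = io.Copy(io.Discard, resp.Body)
switch {
case resp.StatusCode >= 500:
fiveXXCount.Add(1)
errCount.Add(1)
case resp.StatusCode >= 400:
errCount.Add(1)
}
}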
```
- [ ] **Step 3: 重构 s1 使用公共函数 + 提交**
```go
func (s *s1Login) doLogin(u lib.TestUser) {
body, _ := json.Marshal(map[string]string{"mobile": u.Phone, "password": u.Password})
req, _ := http.NewRequest("POST", baseURL+"/api/v1/auth/login", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
}
const baseURL = "http://101.132.250.62:8080" // TODO: from --target
```
```bash
(cd backend && go build ./scripts/loadgen/loadgen/scenarios/)
git add backend/scripts/loadgen/loadgen/scenarios/
git commit -m "feat(scenario): S1 login + common doRequest helper"
```
---
### Task 32-36: S2-S7 场景(模板化批量)
> S2-S7 的写法与 S1 高度相似。**本计划只给骨架,实施时按 S1 复制**,每个场景 ~80 行 Go。
> **重要**:实施前必读 spec §5.1 + 附录 B(接口路径速查),并在以下**关键代码位置**反查项目源码(以 `grep` 验证最新签名):
| 场景 | handler 路径参考 | 必查项 |
|---|---|---|
| S2 | `backend/services/assetService/provider/asset_mobile_provider.go` | `GET /assets/me/items` query 参数;`GET /assets/:id` 路径 |
| S3 | `backend/services/socialService/provider/social_mobile_provider.go` | like 接口的 `exhibition_id` 来源(query 还是 body) |
| S4 | `backend/services/assetService/provider/asset_mint_provider.go` | precreate / mints 完整 body schema(material_id vs material_url) |
| S5 | `backend/services/statisticService/provider/dashboard_*.go` | 7 个 dashboard 接口的实际路径(可能带版本前缀) |
| S6 | `backend/services/assetService/provider/asset_ranking_provider.go` | `dimension` 枚举值(已确认:displaying/month/total),`star_id` 是否必须 |
| S7 | `backend/services/galleryService/provider/gallery_mobile_provider.go` | place 接口的 slot_id 来源,DELETE 路径格式 |
**Files (批量创建):**
- `backend/scripts/loadgen/loadgen/scenarios/s2_read.go`
- `backend/scripts/loadgen/loadgen/scenarios/s3_like.go`
- `backend/scripts/loadgen/loadgen/scenarios/s4_mint.go`
- `backend/scripts/loadgen/loadgen/scenarios/s5_dashboard.go`
- `backend/scripts/loadgen/loadgen/scenarios/s6_ranking.go`
- `backend/scripts/loadgen/loadgen/scenarios/s7_place.go`
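**Common requirements** (every scenario):
1. Endpoint paths and body/query follow spec §5.1 and Appendix B strictly; grep the handler source to confirm before implementing
2. Pick data dependencies at random from `users[i].AssetIDs/ExhibitionIDs`
3. S4 calls `resetMintData()`, which triggers `mint_reset.sh` over ssh after each stage/round
4. S5: one "user session" = 7 serial `/dashboard/*` requests (baseURL×7); RPS is counted per session
5. S6: one request loops over 24 parameter combinations (2×3×4)
6. S7 uses `slot_index=3`, reserved as the load-test slot
7. Default RPS / duration follow the table in spec §5.3
8. **Each scenario calls register() in init()**
- [ ] **Step 1: Write S2 asset reads**
**Step 1a**: grep the handler to confirm the paths before implementing: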
```bash
grep -rn "GetMyAssets\|/assets/me/items\|GetAssetByID" backend/services/assetService/provider/ | head
```
**Step 1b**: Write S2:
```go
// s2_read.go (excerpt)
func (s *s2Read) Run(ctx, rpsOverride, durationOverride, ..., stages []int) {
	targetRPS := pickRPS(rpsOverride, stages, 250) // in step mode, takes the first entry of stages
	duration := pickDuration(durationOverride, 2*time.Minute)
	ticker := time.NewTicker(...)
	defer ticker.Stop()
	deadline := time.After(duration)
	for { select {
	case <-ctx.Done(): // stop on cancellation
		return
	case <-deadline: // stop once the stage duration has elapsed
		return
	case <-ticker.C:
		u := s.users[rand.Intn(len(s.users))]
		// 50% list, 50% detail (list only when the user has no assets)
		if rand.Float64() < 0.5 || len(u.AssetIDs) == 0 {
			req, _ := http.NewRequest("GET", baseURL+"/api/v1/assets/me/items?page=1&page_size=20", nil)
			req.Header.Set("Authorization", "Bearer "+u.JWTToken)
			doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
		} else {
			assetID := u.AssetIDs[rand.Intn(len(u.AssetIDs))]
			req, _ := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/assets/%d", baseURL, assetID), nil)
			req.Header.Set("Authorization", "Bearer "+u.JWTToken)
			doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
		}
	}}
}
```
- [ ] **Step 2: Write S3 likes (including exhibition_id handling)**
**Step 2a**: grep to confirm how exhibition_id is passed:
```bash
grep -rn "exhibition_id\|ExhibitionID" backend/services/socialService/provider/ | grep -i like
```
**Step 2b**: Write S3 (based on the exhibition_id values recorded during seed):
```go
// s3_like.go
func (s *s3Like) doLike(u lib.TestUser) {
	// Each user has 2 listed exhibitions, backed by assets #1 and #2.
	// Assumption: seed writes ExhibitionIDs[i] for AssetIDs[i]. Verify this in seed first,
	// because seed stores only exID, not the asset_id per exhibition. Picking the two
	// independently would pair an exhibition with the wrong asset about half the time.
	n := len(u.ExhibitionIDs)
	if len(u.AssetIDs) < n {
		n = len(u.AssetIDs)
	}
	if n > 2 {
		n = 2
	}
	if n == 0 {
		return // no listed asset, skip
	}
	i := rand.Intn(n)
	exID, assetID := u.ExhibitionIDs[i], u.AssetIDs[i]
	// 50% like, 50% unlike, so that like_count does not grow monotonically
	if rand.Float64() < 0.5 {
		// POST like
		body := fmt.Sprintf(`{"exhibition_id":%d}`, exID)
		req, _ := http.NewRequest("POST", fmt.Sprintf("%s/api/v1/social/assets/%d/like", baseURL, assetID), strings.NewReader(body))
		req.Header.Set("Content-Type", "application/json")
		req.Header.Set("Authorization", "Bearer "+u.JWTToken)
		doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
	} else {
		// DELETE like
		req, _ := http.NewRequest("DELETE", fmt.Sprintf("%s/api/v1/social/assets/%d/like", baseURL, assetID), nil)
		req.Header.Set("Authorization", "Bearer "+u.JWTToken)
		doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
	}
}
```
> ⚠️ **If grep shows that exhibition_id travels in the query rather than the body**, move the field from the body to the query string.
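If the handler does read `exhibition_id` from the query string, the URL construction could look roughly like this. This is a sketch only: `likeURL` is a hypothetical helper, and the parameter name is assumed to stay `exhibition_id` until the grep in Step 2a confirms it.

```go
package main

import (
	"fmt"
	"net/url"
)

// likeURL builds the like endpoint with exhibition_id carried in the query
// string instead of the JSON body.
func likeURL(baseURL string, assetID, exhibitionID int64) string {
	q := url.Values{}
	q.Set("exhibition_id", fmt.Sprint(exhibitionID))
	return fmt.Sprintf("%s/api/v1/social/assets/%d/like?%s", baseURL, assetID, q.Encode())
}
```

With this variant the POST is sent with a nil body and without the `Content-Type` header.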
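- [ ] **Step 3: Write S4 minting (with reset)**
> ⚠️ **Cross-package reference fix**: `LoadtestPlaceholderURL` lives in the `seed` package (`package main`), and **Go does not allow** the `scenarios` package (`package scenarios`) to import a `package main`.
> **Fix**: move the constant to `loadgen/lib/config.go` (new file); seed and scenarios both read it from the environment, and each side defines its own copy of the default.
**Step 3a**: Create `backend/scripts/loadgen/loadgen/lib/config.go`: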
```go
package lib
import "os"
const DefaultLoadtestPlaceholderURL = "<OSS_URL>/loadtest-placeholder.png"
// LoadtestPlaceholderURL prefers $LOADTEST_PLACEHOLDER_URL and falls back to the default
func LoadtestPlaceholderURL() string {
if v := os.Getenv("LOADTEST_PLACEHOLDER_URL"); v != "" { return v }
return DefaultLoadtestPlaceholderURL
}
const (
LoadtestStarID = int64(999900)
LoadtestUserMin = int64(30000001)
LoadtestUserMax = int64(30001000)
)
```
**Step 3b**: Change seed/assets.go to:
```go
// Replace the original const LoadtestPlaceholderURL with:
func loadtestPlaceholderURL() string {
	if v := os.Getenv("LOADTEST_PLACEHOLDER_URL"); v != "" { return v }
	return "<OSS_URL>/loadtest-placeholder.png" // keep in sync with lib.DefaultLoadtestPlaceholderURL
}
```
**Step 3c**: Write s4_mint.go:
```go
import "os/exec"
import "github.com/topfans/backend/scripts/loadgen/loadgen/lib"
type s4Mint struct {
	// ... common fields
	prodSSH  string
	roundIdx int
}
func (s *s4Mint) Run(ctx, rpsOverride, durationOverride, ..., stages []int) {
	for stageIdx, stageRPS := range stages {
		log.Printf("S4 stage %d: %d RPS × %v", stageIdx, stageRPS, 2*time.Minute)
		runStage(ctx, stageRPS, 2*time.Minute, s)
		log.Printf("S4 stage %d done, resetting...", stageIdx)
		// Trigger the reset over ssh (required after every stage, otherwise the 10k quota runs out)
		cmd := exec.Command("ssh", s.prodSSH, "bash /opt/topfans/loadtest/scripts/mint_reset.sh")
		if out, err := cmd.CombinedOutput(); err != nil {
			log.Printf("⚠️ mint reset failed: %v\n%s", err, out)
			return // without a reset, the next stage would run against an exhausted quota
		}
		s.roundIdx++
	}
}
func (s *s4Mint) doMint(u lib.TestUser) {
	// 1. POST /api/v1/assets/mints/precreate {material_url, name, info, ...}
	precreateBody, _ := json.Marshal(map[string]any{
		"material_url": lib.LoadtestPlaceholderURL(),
		"name":         fmt.Sprintf("loadtest_mint_%d_round%d_%d", u.UserID, s.roundIdx, time.Now().UnixNano()),
		"info":         "loadtest",
		"material_id":  0, // grep to confirm whether this field is needed
	})
	req, _ := http.NewRequest("POST", baseURL+"/api/v1/assets/mints/precreate", bytes.NewReader(precreateBody))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+u.JWTToken)
	// ... doRequest, then parse order_id from the response
	// 2. POST /api/v1/assets/mints {order_id}
}
```
> **Check before implementing**: grep for `precreate` in the assetService handler to confirm the body schema, in particular which of `material_url` and `material_id` is required.
- [ ] **Step 4: Write S5 dashboard**
```go
endpoints := []string{
"/api/v1/dashboard/today-overview",
"/api/v1/dashboard/income-curve",
"/api/v1/dashboard/exhibition-summary",
"/api/v1/dashboard/like-income-by-level",
"/api/v1/dashboard/top-assets",
"/api/v1/dashboard/level-distribution",
"/api/v1/dashboard/upgrade-progress",
}
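func (s *s5Dashboard) doSession(u lib.TestUser) {
	sessionStart := time.Now()
	for _, ep := range endpoints {
		req, _ := http.NewRequest("GET", baseURL+ep, nil)
		req.Header.Set("Authorization", "Bearer "+u.JWTToken)
		doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
	}
	// P95 is reported per session. doRequest already records every request into s.rec,
	// so the session total goes to a separate recorder to keep per-request and per-session
	// samples apart (sessionRec is a hypothetical extra field on s5Dashboard).
	s.sessionRec.Record(time.Since(sessionStart).Microseconds())
}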
```
- [ ] **Step 5: Write S6 rankings**
```go
var (
s6Dimensions = []string{"displaying", "month", "total"}
s6StarIDs = []int64{87, 88, 93, 999900}
s6Endpoints = []string{"/api/v1/rankings/hot", "/api/v1/rankings/original"}
)
func (s *s6Ranking) doOne() {
for _, ep := range s6Endpoints {
for _, dim := range s6Dimensions {
for _, sid := range s6StarIDs {
url := fmt.Sprintf("%s%s?dimension=%s&star_id=%d&page=1&page_size=10",
baseURL, ep, dim, sid)
req, _ := http.NewRequest("GET", url, nil)
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
}
}
}
}
```
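S5 and S6 both fan one scheduled unit out into several HTTP requests, so the rate the gateway sees is a multiple of the scheduled RPS. A small sketch of that conversion, assuming the scheduler counts one `doSession` or one `doOne` call as one unit (the plan states this for S5; for S6 it is an assumption to confirm):

```go
package main

// Requests issued per scheduled unit, taken from the S5 and S6 code above.
const (
	s5RequestsPerSession = 7         // seven dashboard endpoints per session
	s6RequestsPerCall    = 2 * 3 * 4 // endpoints x dimensions x star IDs
)

// backendRPS converts a scheduled rate into the request rate the gateway sees.
func backendRPS(scheduledRPS, fanOut int) int {
	return scheduledRPS * fanOut
}
```

For example, 60 sessions per second in S5 means 420 requests per second at the gateway. If S6 were scheduled at 700 calls per second with all 24 combinations per call, the gateway would see 16,800 requests per second, so it is worth confirming which unit the S6 schedule refers to before the step phase.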
- [ ] **Step 6: Write S7 placement**
```go
// seed does not store a slot_id per user yet; add it during the seed phase:
// in seed/tokens.go, read one more column, booth_slots.slot_id WHERE slot_index=3,
// then add a SlotID3 field to TestUser.
// S7 uses slot_index=3 (reserved as the load-test slot) and picks from the user's
// unlisted assets, AssetIDs[2..4].
func (s *s7Place) doPlace(u lib.TestUser) {
	if u.SlotID3 == 0 || len(u.AssetIDs) < 5 { return } // indices 2..4 need at least 5 assets
	// 50% place, 50% unplace
	if rand.Float64() < 0.5 {
		assetID := u.AssetIDs[2+rand.Intn(3)] // assets #3, #4, #5
body, _ := json.Marshal(map[string]any{
"slot_id": u.SlotID3,
"asset_id": assetID,
})
req, _ := http.NewRequest("POST", baseURL+"/api/v1/galleries/place", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+u.JWTToken)
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
} else {
req, _ := http.NewRequest("DELETE", fmt.Sprintf("%s/api/v1/galleries/slots/%d/asset", baseURL, u.SlotID3), nil)
req.Header.Set("Authorization", "Bearer "+u.JWTToken)
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
}
}
```
- [ ] **Step 7: Extend seed/tokens.go with the SlotID3 field**
In Task 15's `GenerateTokensForLoadtest`, add one column to the `SELECT` (`booth_slots.slot_id` WHERE `slot_index=3`), add `SlotID3 int64` to `TestUser`, and write one more CSV column. Update the `LoadUsers` parser in `csv.go` to match.
- [ ] **Step 8: Verify the build + commit**
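One way to make the parser tolerant of the added column is to locate it by header name rather than by position. The sketch below is illustrative only: the header name `slot_id3` and the helper `readSlotIDs` are assumptions, since the actual `users.csv` layout is defined in Task 15 and `csv.go`.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strconv"
	"strings"
)

// readSlotIDs returns the slot_id3 value of every data row, locating the
// column by its header name (assumed here to be "slot_id3").
func readSlotIDs(data string) ([]int64, error) {
	rows, err := csv.NewReader(strings.NewReader(data)).ReadAll()
	if err != nil {
		return nil, err
	}
	if len(rows) == 0 {
		return nil, fmt.Errorf("empty csv")
	}
	col := -1
	for i, name := range rows[0] {
		if name == "slot_id3" {
			col = i
		}
	}
	if col < 0 {
		return nil, fmt.Errorf("slot_id3 column missing")
	}
	ids := make([]int64, 0, len(rows)-1)
	for _, rec := range rows[1:] {
		id, err := strconv.ParseInt(rec[col], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("bad slot_id3 %q: %w", rec[col], err)
		}
		ids = append(ids, id)
	}
	return ids, nil
}
```

Failing loudly on a missing column means a `users.csv` generated by an older seed binary is rejected at load time instead of silently producing `SlotID3 == 0`, which would make every S7 call a no-op.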
```bash
cd backend && go build ./scripts/loadgen/loadgen/scenarios/
git add backend/scripts/loadgen/loadgen/scenarios/ backend/scripts/loadgen/seed/tokens.go backend/scripts/loadgen/loadgen/lib/csv.go
git commit -m "feat(scenario): S2-S7 implementations (read/like/mint/dashboard/ranking/place) + SlotID3"
```
---
### Task 37: scenarios unit tests (construction logic)
**Files:**
- Create: `backend/scripts/loadgen/loadgen/scenarios/scenarios_test.go`
- [ ] **Step 1: Test the scenario registry**
```go
package scenarios
import (
	"fmt"
	"strings"
	"testing"
)
func TestAllScenariosRegistered(t *testing.T) {
expected := []string{"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
for _, id := range expected {
if _, ok := registry[id]; !ok {
t.Errorf("scenario %s not registered", id)
}
}
}
func TestMintNameFormat(t *testing.T) {
name := fmt.Sprintf("loadtest_mint_%d_round%d", int64(30000001), 3)
if !strings.HasPrefix(name, "loadtest_mint_") {
t.Errorf("name %q must start with loadtest_mint_", name)
}
}
```
- [ ] **Step 2: Run + commit**
```bash
cd backend && go test ./scripts/loadgen/loadgen/scenarios/ -v
git add backend/scripts/loadgen/loadgen/scenarios/scenarios_test.go
git commit -m "test(scenario): verify all 7 scenarios registered, mint name format"
```
---
## Phase 7: Reporting + monitoring + recovery (before the early hours of the next day)
### Task 38: reporter/json.go + csv.go
**Files:**
- Create: `backend/scripts/loadgen/loadgen/reporter/json.go`
- Create: `backend/scripts/loadgen/loadgen/reporter/csv.go`
- [ ] **Step 1: Write the JSON report**
```go
package reporter
import (
"encoding/json"
"os"
"github.com/HdrHistogram/hdrhistogram-go"
)
type RunReport struct {
TotalRequests int64 `json:"total_requests"`
Errors int64 `json:"errors"`
FiveXX int64 `json:"five_xx"`
P50Us int64 `json:"p50_us"`
P95Us int64 `json:"p95_us"`
P99Us int64 `json:"p99_us"`
MaxUs int64 `json:"max_us"`
}
func WriteJSON(path string, h *hdrhistogram.Histogram, total, errs, fiveXX int64) error {
r := RunReport{
TotalRequests: total,
Errors: errs,
FiveXX: fiveXX,
P50Us: h.ValueAtQuantile(50),
P95Us: h.ValueAtQuantile(95),
P99Us: h.ValueAtQuantile(99),
MaxUs: h.Max(),
}
f, err := os.Create(path)
if err != nil { return err }
defer f.Close()
return json.NewEncoder(f).Encode(r)
}
```
- [ ] **Step 2: Write the CSV baseline**
```go
package reporter
import (
	"fmt"
	"os"
)
func WriteBaselineCSV(path string, scenarios []string, baseline map[string]RunReport) error {
f, err := os.Create(path)
if err != nil { return err }
defer f.Close()
_, err = f.WriteString("scenario,total,errors,five_xx,p50_ms,p95_ms,p99_ms,max_ms\n")
if err != nil { return err }
for _, s := range scenarios {
r := baseline[s]
_, err = f.WriteString(fmt.Sprintf("%s,%d,%d,%d,%.2f,%.2f,%.2f,%.2f\n",
s, r.TotalRequests, r.Errors, r.FiveXX,
float64(r.P50Us)/1000, float64(r.P95Us)/1000,
float64(r.P99Us)/1000, float64(r.MaxUs)/1000))
if err != nil { return err }
}
return nil
}
```
- [ ] **Step 3: Verify + commit**
```bash
cd backend && go build ./scripts/loadgen/loadgen/reporter/
git add backend/scripts/loadgen/loadgen/reporter/
git commit -m "feat(reporter): JSON + baseline CSV writers"
```
---
### Task 39: reporter/plot.go - three-series chart
**Files:**
- Create: `backend/scripts/loadgen/loadgen/reporter/plot.go`
- [ ] **Step 1: Write PlotRPSLatencyError**
```go
package reporter
import (
	"gonum.org/v1/gonum/plot"
	"gonum.org/v1/gonum/plot/plotter"
	"gonum.org/v1/gonum/plot/vg"
)
func PlotRPSLatencyError(scenario string, samples []Sample, outPath string) error {
	p := plot.New()
	p.Title.Text = scenario + ": RPS / P99 / Error"
	p.X.Label.Text = "Stage"
	p.Y.Label.Text = "Value"
	rpsPts := make(plotter.XYs, len(samples))
	p99Pts := make(plotter.XYs, len(samples))
	errPts := make(plotter.XYs, len(samples))
	for i, s := range samples {
		rpsPts[i].X = float64(i)
		rpsPts[i].Y = s.RPS
		p99Pts[i].X = float64(i)
		p99Pts[i].Y = s.P99Ms
		errPts[i].X = float64(i)
		errPts[i].Y = s.ErrorRate * 100
	}
	rpsLine, err := plotter.NewLine(rpsPts)
	if err != nil { return err }
	p99Line, err := plotter.NewLine(p99Pts)
	if err != nil { return err }
	errLine, err := plotter.NewLine(errPts)
	if err != nil { return err }
	p.Add(rpsLine, p99Line, errLine)
	p.Legend.Add("RPS", rpsLine)
	p.Legend.Add("P99 ms", p99Line)
	p.Legend.Add("Error %", errLine)
	// Plot.Save takes a file path (the format follows the extension), not an *os.File
	return p.Save(12*vg.Inch, 6*vg.Inch, outPath)
}
type Sample struct {
RPS float64
P99Ms float64
ErrorRate float64
}
```
- [ ] **Step 2: Commit**
```bash
cd backend && go build ./scripts/loadgen/loadgen/reporter/
git add backend/scripts/loadgen/loadgen/reporter/plot.go
git commit -m "feat(reporter): gonum/plot RPS-Latency-Error three-line chart"
```
---
### Task 40: reporter/markdown.go
**Files:**
- Create: `backend/scripts/loadgen/loadgen/reporter/markdown.go`
- [ ] **Step 1: Write GenerateMarkdown**
```go
package reporter
import (
"fmt"
"os"
"time"
)
type ScenarioReport struct {
ID string
Stages []StageReport
KneeRPS int
TopBottleneck string
}
type StageReport struct {
RPS int
Duration time.Duration
P50Ms float64
P95Ms float64
P99Ms float64
ErrorRate float64
}
func GenerateMarkdown(path string, scenarios []ScenarioReport) error {
f, err := os.Create(path)
if err != nil { return err }
defer f.Close()
	fmt.Fprintf(f, "# Load test report\n\n")
	for _, s := range scenarios {
		fmt.Fprintf(f, "## %s\n\n", s.ID)
		fmt.Fprintf(f, "**Knee RPS**: %d\n\n", s.KneeRPS)
		fmt.Fprintf(f, "**Top bottleneck**: %s\n\n", s.TopBottleneck)
		fmt.Fprintf(f, "| Stage | RPS | P50ms | P95ms | P99ms | Err%% |\n")
		fmt.Fprintf(f, "|-------|-----|-------|-------|-------|------|\n")
		for i, st := range s.Stages {
			// Stage column: 1-based stage index followed by the stage duration
			fmt.Fprintf(f, "| %d (%s) | %d | %.1f | %.1f | %.1f | %.1f |\n",
				i+1, st.Duration, st.RPS, st.P50Ms, st.P95Ms, st.P99Ms, st.ErrorRate*100)
		}
fmt.Fprintf(f, "\n")
}
return nil
}
```
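The plan does not say how `KneeRPS` is derived. One simple rule, shown here purely as an assumption, is to take the last stage before either P99 latency or error rate first exceeds a limit. The `stage` type, `kneeRPS`, and both thresholds are hypothetical and would need to follow the red lines defined in the spec.

```go
package main

// stage holds the per-stage metrics this sketch looks at.
type stage struct {
	RPS       int
	P99Ms     float64
	ErrorRate float64 // fraction, 0.01 = 1%
}

// kneeRPS returns the RPS of the last stage before the first one that breaches
// either limit, or 0 if the very first stage already breaches.
func kneeRPS(stages []stage, maxP99Ms, maxErrRate float64) int {
	knee := 0
	for _, st := range stages {
		if st.P99Ms > maxP99Ms || st.ErrorRate > maxErrRate {
			break
		}
		knee = st.RPS
	}
	return knee
}
```

Stopping at the first breach, rather than taking the highest passing stage overall, avoids reporting a knee above a stage that already failed.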
- [ ] **Step 2: Commit**
```bash
cd backend && go build ./scripts/loadgen/loadgen/reporter/
git add backend/scripts/loadgen/loadgen/reporter/markdown.go
git commit -m "feat(reporter): markdown report generator"
```
---
### Task 41: preflight.go - 7 checks before starting load
**Files:**
- Create: `backend/scripts/loadgen/loadgen/preflight.go`
- [ ] **Step 1: Write PreflightChecks**
```go
package main
import (
	"fmt"
	"net/http"
	"os"
	"os/exec"
	"strings"
	"time"

	"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
)
type CheckResult struct {
	Name   string
	Passed bool
	Detail string
}
func runPreflight(target, prodSSH string) error {
	checks := []CheckResult{}
	// ① Gateway /health (resp is nil when err != nil, so read the status only on success)
	client := &http.Client{Timeout: 5 * time.Second}
	status := 0
	resp, err := client.Get(target + "/health")
	if err == nil {
		status = resp.StatusCode
		resp.Body.Close()
	}
	checks = append(checks, CheckResult{
		Name:   "① Gateway /health",
		Passed: err == nil && status == 200,
		Detail: fmt.Sprintf("status=%d err=%v", ifZero(status), err),
	})
	// ② SSH to prod
	cmd := exec.Command("ssh", "-o", "ConnectTimeout=5", prodSSH, "echo connected")
	out, err := cmd.CombinedOutput()
	checks = append(checks, CheckResult{
		Name:   "② SSH to prod",
		Passed: err == nil && strings.Contains(string(out), "connected"),
		Detail: strings.TrimSpace(string(out)),
	})
	// ③ pg_dump backup exists. The file lives on prod, so its size is read over ssh;
	//    a local os.Stat on the load source would always fail.
	cmd = exec.Command("ssh", prodSSH,
		`f=$(ls -t /opt/topfans/backups/pre-loadtest-*.sql 2>/dev/null | head -1); [ -n "$f" ] && stat -c '%n %s' "$f"`)
	out, _ = cmd.Output()
	var backupFile string
	var backupSize int64
	fmt.Sscanf(strings.TrimSpace(string(out)), "%s %d", &backupFile, &backupSize)
	checks = append(checks, CheckResult{
		Name:   "③ pg_dump backup exists (>50MB)",
		Passed: backupSize > 50*1024*1024,
		Detail: fmt.Sprintf("file=%s size=%d", backupFile, ifZero64(backupSize)),
	})
	// ④ Aliyun snapshot (within 24h)
	checks = append(checks, CheckResult{
		Name:   "④ Aliyun snapshot < 24h (manual confirmation)",
		Passed: true, // cannot be verified here; needs manual confirmation
		Detail: "ops must confirm in the ECS console",
	})
	// ⑤ prod free disk > 10GB
	cmd = exec.Command("ssh", prodSSH, "df -B1G /opt | tail -1 | awk '{print $4}'")
	out, _ = cmd.Output()
	var freeGB int
	fmt.Sscanf(strings.TrimSpace(string(out)), "%d", &freeGB)
	checks = append(checks, CheckResult{
		Name:   "⑤ prod free disk > 10GB",
		Passed: freeGB > 10,
		Detail: fmt.Sprintf("free=%dGB", freeGB),
	})
	// ⑥ users.csv loads with 1000 rows. A missing file or a parse error is a failed
	//    check, never a skipped one. Assumes lib.LoadUsers(path) from csv.go.
	users, err := lib.LoadUsers("users.csv")
	checks = append(checks, CheckResult{
		Name:   "⑥ users.csv 1000 rows",
		Passed: err == nil && len(users) == 1000,
		Detail: fmt.Sprintf("rows=%d err=%v", len(users), err),
	})
	// ⑦ JWT_SECRET is set
	if len(os.Getenv("JWT_SECRET")) > 0 {
		checks = append(checks, CheckResult{
			Name:   "⑦ JWT_SECRET set",
			Passed: true,
			Detail: "set",
		})
	} else {
		checks = append(checks, CheckResult{Name: "⑦ JWT_SECRET set", Passed: false, Detail: "empty"})
	}
	// Print every result first, then fail on the first check that did not pass
	for _, c := range checks {
		mark := "✓"
		if !c.Passed { mark = "✗" }
		fmt.Printf("%s %s - %s\n", mark, c.Name, c.Detail)
	}
	for _, c := range checks {
		if !c.Passed { return fmt.Errorf("preflight failed: %s", c.Name) }
	}
	fmt.Println("ALL CHECKS PASSED - ready to start load")
	return nil
}
```
- [ ] **Step 2: Add the helper functions**
```go
func ifZero(v int) int { if v == 0 { return -1 }; return v }
func ifZero64(v int64) int64 { if v == 0 { return -1 }; return v }
```
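Check ⑤ parses the output of `df -B1G`, which prints values such as `25G`. A stricter parser that reports unexpected output instead of silently yielding 0 might look like the sketch below; `parseFreeGB` is a hypothetical helper, not part of the plan.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseFreeGB extracts the integer from df -B1G output such as "25G".
func parseFreeGB(out string) (int, error) {
	s := strings.TrimSuffix(strings.TrimSpace(out), "G")
	gb, err := strconv.Atoi(s)
	if err != nil {
		return 0, fmt.Errorf("unexpected df output %q: %w", out, err)
	}
	return gb, nil
}
```

The returned error can go into `CheckResult.Detail`, which makes an ssh failure distinguishable from a disk that is genuinely full.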
- [ ] **Step 3: Verify + commit**
```bash
cd backend && go build ./scripts/loadgen/loadgen/
git add backend/scripts/loadgen/loadgen/preflight.go
git commit -m "feat(loadgen): preflight 7 checks (health, ssh, backup, disk, csv, jwt)"
```
---
### Task 42: verify.go - post-test integrity checks
**Files:**
- Create: `backend/scripts/loadgen/loadgen/verify.go`
- [ ] **Step 1: Write runVerify**
```go
package main
import (
"fmt"
"os/exec"
"strings"
)
func runVerify(prodSSH string) error {
	// 1. Real data is unchanged (query only star_id != 999900)
	pre := sshPG(prodSSH, "SELECT count(*) FROM mint_orders WHERE star_id != 999900")
	fmt.Printf("  real mint_orders: %s\n", pre)
	// 2. PG connections have dropped back (count only sessions in state 'active')
	conn := sshPG(prodSSH, "SELECT count(*) FROM pg_stat_activity WHERE state = 'active'")
	fmt.Printf("  PG active conn: %s\n", conn)
	// 3. Container RestartCount has not increased (simplified: print the current status only,
	//    without comparing against a baseline)
	out, _ := exec.Command("ssh", prodSSH, "docker ps --format '{{.Names}} {{.Status}}'").Output()
	fmt.Printf("  container status:\n%s\n", out)
	return nil
}
func sshPG(ssh, query string) string {
cmd := exec.Command("ssh", ssh,
fmt.Sprintf(`export PGPASSWORD="${DB_PASSWORD:-postgres123}"; PG=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1); docker exec -e PGPASSWORD="$PGPASSWORD" "$PG" psql -U postgres -d topfans -t -c "%s"`, query))
out, _ := cmd.Output()
return strings.TrimSpace(string(out))
}
```
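`runVerify` prints the counts but always returns nil, so an operator has to compare them by eye. A small comparison helper, sketched here under the assumption that the same query is also captured before the run, would let the command fail automatically. `checkUnchanged` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// checkUnchanged compares two psql count outputs taken before and after the
// run and returns an error when they differ or cannot be parsed.
func checkUnchanged(label, before, after string) error {
	b, err := strconv.ParseInt(strings.TrimSpace(before), 10, 64)
	if err != nil {
		return fmt.Errorf("%s: bad baseline %q: %w", label, before, err)
	}
	a, err := strconv.ParseInt(strings.TrimSpace(after), 10, 64)
	if err != nil {
		return fmt.Errorf("%s: bad result %q: %w", label, after, err)
	}
	if a != b {
		return fmt.Errorf("%s changed: %d -> %d", label, b, a)
	}
	return nil
}
```

Because `sshPG` returns an empty string when ssh or psql fails, treating unparsable output as an error also catches a verification step that never reached the database.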
- [ ] **Step 2: Verify + commit**
```bash
cd backend && go build ./scripts/loadgen/loadgen/
git add backend/scripts/loadgen/loadgen/verify.go
git commit -m "feat(loadgen): post-test verify (real-data integrity, conn count, container status)"
```
---
### Task 43: monitor/sample.sh
**Files:**
- Create: `backend/scripts/loadgen/monitor/sample.sh`
- [ ] **Step 1: Write sample.sh**
```bash
#!/bin/bash
# /opt/topfans/loadtest/monitor/sample.sh
# Background sampler; appends to metrics-feed.jsonl
set -e
OUT="/opt/topfans/loadtest/metrics-feed.jsonl"
INTERVAL=${INTERVAL:-5}
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
[ -z "$PG_CONTAINER" ] && { echo "❌ postgres container not found"; exit 1; }
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
echo "📊 sampling to $OUT every ${INTERVAL}s, pid $$"
while true; do
  TS=$(date +%s)
  # docker stats (single snapshot)
  DOCKER=$(docker stats --no-stream --format '{{.Name}}={{.MemPerc}}' 2>/dev/null | tr '\n' ' ')
  # pg active sessions; fall back to -1 so a failed probe under load does not trip set -e
  # and kill the sampler exactly when it is needed
  PG_ACTIVE=$(docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d topfans -tA -c "SELECT count(*) FROM pg_stat_activity WHERE state='active'" 2>/dev/null || echo -1)
  # free disk (df prints e.g. "25G")
  DISK_FREE=$(df -B1G /opt | tail -1 | awk '{print $4}')
  echo "$TS pg_active=$PG_ACTIVE disk_free=$DISK_FREE $DOCKER" >> "$OUT"
  sleep "$INTERVAL"
done
```
- [ ] **Step 2: Commit**
```bash
chmod +x backend/scripts/loadgen/monitor/sample.sh
git add backend/scripts/loadgen/monitor/sample.sh
git commit -m "feat(monitor): sample.sh writes metrics-feed.jsonl every 5s"
```
---
### Task 44: monitor/docker-compose.monitor.yml
**Files:**
- Create: `backend/scripts/loadgen/monitor/docker-compose.monitor.yml`
- [ ] **Step 1: Write prom + grafana + exporters**
```yaml
version: "3.8"
services:
  cadvisor:
    image: gcr.io/cadvisor/cadvisor:v0.47.0
    container_name: topfans-cadvisor
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:ro
      - /sys:/sys:ro
      - /var/lib/docker/:/var/lib/docker:ro
    ports:
      - "8088:8080"
    restart: unless-stopped
  node-exporter:
    image: prom/node-exporter:v1.7.0
    container_name: topfans-node-exporter
    pid: host
    volumes:
      - /proc:/host/proc:ro
      - /sys:/host/sys:ro
      - /:/rootfs:ro
    ports:
      - "9100:9100"
    restart: unless-stopped
  postgres-exporter:
    image: prometheuscommunity/postgres-exporter:v0.13.2
    container_name: topfans-pg-exporter
    environment:
      DATA_SOURCE_NAME: "postgresql://postgres:${DB_PASSWORD:-postgres123}@postgres:5432/topfans?sslmode=disable"
    ports:
      - "9187:9187"
    restart: unless-stopped
  redis-exporter:
    image: oliver006/redis_exporter:v1.58.0
    container_name: topfans-redis-exporter
    environment:
      REDIS_ADDR: "redis://redis:6379"
    ports:
      - "9121:9121"
    restart: unless-stopped
  prometheus:
    image: prom/prometheus:v2.51.2
    container_name: topfans-prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - ./grafana-dashboards/:/etc/grafana/dashboards/:ro
    ports:
      - "9090:9090"
    restart: unless-stopped
  grafana:
    image: grafana/grafana:10.4.2
    container_name: topfans-grafana
    environment:
      GF_SECURITY_ADMIN_PASSWORD: "${GRAFANA_PASSWORD:-admin}"
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana-dashboards/:/etc/grafana/dashboards/:ro
    ports:
      - "3000:3000"
    restart: unless-stopped
volumes:
  grafana-data:
```
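> ⚠️ **The exporters address `postgres` and `redis` by service name**, so this stack has to join the prod compose network. If it does not, the exporters cannot reach postgres/redis; the simplest workaround for a load test is `network_mode: host` with the exporters pointed at `localhost`.
- [ ] **Step 2: Add the prometheus.yml config**
Create `backend/scripts/loadgen/monitor/prometheus.yml`: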
```yaml
global:
  scrape_interval: 5s
scrape_configs:
  - job_name: cadvisor
    static_configs:
      - targets: ["cadvisor:8080"]
  - job_name: node
    static_configs:
      - targets: ["node-exporter:9100"]
  - job_name: postgres
    static_configs:
      - targets: ["postgres-exporter:9187"]
  - job_name: redis
    static_configs:
      - targets: ["redis-exporter:9121"]
  - job_name: loadgen
    metrics_path: /metrics
    static_configs:
      - targets: ["host.docker.internal:9091"]
```
- [ ] **Step 3: Commit**
```bash
git add backend/scripts/loadgen/monitor/
git commit -m "feat(monitor): prometheus+grafana stack with 4 exporters"
```
---
### Task 45: 4 Grafana dashboards
**Files:**
- Create: `backend/scripts/loadgen/monitor/grafana-dashboards/01-host.json` (whole host)
- Create: `backend/scripts/loadgen/monitor/grafana-dashboards/02-containers.json`
- Create: `backend/scripts/loadgen/monitor/grafana-dashboards/03-postgres.json`
- Create: `backend/scripts/loadgen/monitor/grafana-dashboards/04-business.json`
- [ ] **Step 1: Create 01-host.json (whole host)**
> A full Grafana dashboard JSON is too long (>500 lines), so **this plan uses a placeholder**. When implementing, copy an existing prod dashboard or import one from the official templates.
Minimal usable content:
```json
{
"title": "Load Test - Host Overview",
"panels": [
{
"title": "CPU Usage",
"type": "graph",
"targets": [
{ "expr": "100 - (avg by(instance)(rate(node_cpu_seconds_total{mode=\"idle\"}[1m])) * 100)" }
]
}
]
}
```
- [ ] **Step 2: Create 02-containers.json / 03-postgres.json / 04-business.json the same way**
- [ ] **Step 3: Commit the placeholders**
```bash
git add backend/scripts/loadgen/monitor/grafana-dashboards/
git commit -m "feat(monitor): 4 Grafana dashboard placeholders (host/containers/pg/business)"
```
---
### Task 46: recover/emergency-stop.sh + restore-from-backup.sh
**Files:**
- Create: `backend/scripts/loadgen/recover/emergency-stop.sh`
- Create: `backend/scripts/loadgen/recover/restore-from-backup.sh`
- [ ] **Step 1: Write emergency-stop.sh**
```bash
#!/bin/bash
# One-shot emergency stop
set -e
echo "🚨 emergency stop"
# loadgen runs on the load source; on prod this pkill is a no-op, so also stop it there
pkill -9 loadgen 2>/dev/null || true
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
[ -z "$PG_CONTAINER" ] && { echo "❌ postgres container not found"; exit 1; }
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
# Terminate non-idle sessions that have been running for more than 10 seconds
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d topfans -c "
SELECT pg_terminate_backend(pid) FROM pg_stat_activity
WHERE state != 'idle' AND now() - query_start > interval '10 seconds'
AND usename = 'postgres';
"
cd /opt/topfans/docker
docker-compose -f docker-compose.prod.yml restart
sleep 30
if curl -fs http://localhost:8080/health; then
  echo "✅ gateway recovered"
else
  echo "⚠️ gateway still down"
  exit 1
fi
```
- [ ] **Step 2: Write restore-from-backup.sh**
```bash
#!/bin/bash
# Usage: bash restore-from-backup.sh /opt/topfans/backups/pre-loadtest-XXXX.sql
set -e
BACKUP_FILE=$1
[ -f "$BACKUP_FILE" ] || { echo "❌ backup file not found: $BACKUP_FILE"; exit 1; }
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
[ -z "$PG_CONTAINER" ] && { echo "❌ postgres container not found"; exit 1; }
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
echo "🛑 stopping the application layer..."
docker ps --filter 'name=topfans-' --format '{{.Names}}' \
  | grep -v postgres | grep -v redis | grep -v exporter \
  | xargs -r docker stop
echo "🗑️ dropping and recreating the database..."
# WITH (FORCE) (PostgreSQL 13+) disconnects remaining sessions, e.g. the postgres exporter
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -c "DROP DATABASE topfans WITH (FORCE);"
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -c "CREATE DATABASE topfans;"
echo "📥 restoring $BACKUP_FILE ..."
# ON_ERROR_STOP makes psql exit non-zero on the first failed statement instead of continuing
docker exec -i -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d topfans -v ON_ERROR_STOP=1 < "$BACKUP_FILE"
echo "🚀 starting the application layer..."
cd /opt/topfans/docker
docker-compose -f docker-compose.prod.yml --profile prod up -d
sleep 30
curl -fs http://localhost:8080/health && echo "✅ restore complete"
```
- [ ] **Step 3: Commit**
```bash
chmod +x backend/scripts/loadgen/recover/*.sh
git add backend/scripts/loadgen/recover/
git commit -m "feat(recover): emergency-stop + restore-from-backup scripts"
```
---
## Phase 8: Deployment + rehearsal (Day 1 23:00)
### Task 47: Build + scp the binaries
**Files:**
- No code; deployment commands
- [ ] **Step 1: Build seed + loadgen**
```bash
cd backend
GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -o bin/seed ./scripts/loadgen/seed/
GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -o bin/loadgen ./scripts/loadgen/loadgen/
ls -lh bin/
```
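Expected: two binaries of roughly 10-20MB each.
- [ ] **Step 2: scp seed to prod**
```bash
scp backend/bin/seed root@101.132.250.62:/opt/topfans/loadtest/seed
```
- [ ] **Step 3: scp loadgen to the load source**
```bash
# Options go before the source path; BSD/macOS scp rejects options placed after it
scp -i ~/.ssh/loadgen-key backend/bin/loadgen root@<压源IP>:/opt/loadgen/loadgen
```
- [ ] **Step 4: scp the scripts to prod**
```bash
ssh root@101.132.250.62 "mkdir -p /opt/topfans/loadtest/recover /opt/topfans/loadtest/monitor"
scp backend/scripts/loadgen/recover/*.sh root@101.132.250.62:/opt/topfans/loadtest/recover/
# sample.sh is started from /opt/topfans/loadtest/monitor/ in later tasks, so deploy it too
scp backend/scripts/loadgen/monitor/sample.sh root@101.132.250.62:/opt/topfans/loadtest/monitor/
ssh root@101.132.250.62 "chmod +x /opt/topfans/loadtest/recover/*.sh /opt/topfans/loadtest/monitor/sample.sh"
```
- [ ] **Step 5: Verify the binaries run**
> Note: Go's `flag` package accepts one or two leading dashes, so `-h`, `-help`, and `--help` all print usage. The commands below use `-h`:
```bash
ssh root@101.132.250.62 "/opt/topfans/loadtest/seed -h" 2>&1 | head -20
ssh -i ~/.ssh/loadgen-key root@<压源IP> "/opt/loadgen/loadgen -h" 2>&1 | head -20
```
Expected: both print usage (listing flags such as `-jwt-secret` and `-db-name`).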
---
### Task 48: Upload the placeholder image to the prod OSS
**Files:**
- None; OSS upload
- [ ] **Step 1: Prepare the placeholder image (locally)**
```bash
# Use any 1x1 PNG as the placeholder
python3 -c "
import base64
png = base64.b64decode('iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg==')
open('/tmp/loadtest-placeholder.png','wb').write(png)
"
```
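- [ ] **Step 2: Get a signature from the project's existing endpoint**
Run (spec §4.4):
```bash
# Call /api/v1/assets/oss/upload-signature to get a PUT URL
# PUT the file to loadtest/loadtest-placeholder.png on OSS
# Copy the accessible URL that is returned
```
> When implementing, read `backend/services/assetService/.../oss.go` to find the exact path/parameters of the signature endpoint.
- [ ] **Step 3: Write the URL back into assets.go**
Open `backend/scripts/loadgen/seed/assets.go` and replace:
```go
const LoadtestPlaceholderURL = "<OSS_URL>/loadtest-placeholder.png"
```
→ the real URL. If Step 3b of Task 32-36 has already been applied, the default lives in two places instead: `loadtestPlaceholderURL()` in `seed/assets.go` and `DefaultLoadtestPlaceholderURL` in `loadgen/lib/config.go`. Update both (and rebuild loadgen as well), or export `LOADTEST_PLACEHOLDER_URL` on both machines.
- [ ] **Step 4: Rebuild + redeploy**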
```bash
cd backend && GOOS=linux GOARCH=amd64 go build -o bin/seed ./scripts/loadgen/seed/
scp bin/seed root@101.132.250.62:/opt/topfans/loadtest/seed
```
- [ ] **Step 5: Commit the placeholder URL constant**
```bash
git add backend/scripts/loadgen/seed/assets.go
git commit -m "chore(seed): set LoadtestPlaceholderURL to real OSS URL"
```
---
### Task 49: Run the full seed flow on local docker
**Files:**
- None; local verification
- [ ] **Step 1: Start docker-compose locally**
```bash
cd docker
docker-compose -f docker-compose.local.yml up -d
```
- [ ] **Step 2: Run seed locally**
```bash
cd backend
go build -o bin/seed ./scripts/loadgen/seed/
DB_PASSWORD=postgres123 JWT_SECRET=topfans-secret-key-local-dev-only \
./bin/seed --db-name=top-fans --db-host=localhost
```
Expected:
- Prints "✓ stars/users/profiles/... seeded"
- Ends with "✅ seed + tokens completed"
- `users.csv` (~150KB) is generated in the current directory
- [ ] **Step 3: Verify the row counts**
```bash
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
docker exec -e PGPASSWORD=postgres123 "$PG_CONTAINER" psql -U postgres -d top-fans -c "
SELECT
(SELECT count(*) FROM users WHERE id BETWEEN 30000001 AND 30001000) AS users,
(SELECT count(*) FROM assets WHERE star_id = 999900) AS assets,
(SELECT count(*) FROM fan_profiles WHERE star_id = 999900) AS profiles,
(SELECT count(*) FROM friendships WHERE star_id = 999900) AS friendships;
"
```
Expected: 1000 / 5000 / 1000 / ~10000
- [ ] **Step 4: Run cleanup to verify it is safe**
```bash
DB_PASSWORD=postgres123 ./bin/seed --db-name=top-fans --db-host=localhost --cleanup
# Verify that all star_id=999900 data is gone
```
---
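### Task 50: Rehearsal dry run (--monitor=off mini baseline)
**Files:**
- None; commands only
- [ ] **Step 1: Prepare users.csv on the load source**
```bash
# Copy the users.csv generated on prod to the load source (assumes seed has already run on prod).
# -3 routes the copy through the local machine, so prod needs no credentials for the load source.
scp -3 -i ~/.ssh/loadgen-key root@101.132.250.62:/opt/topfans/loadtest/users.csv root@<压源IP>:/opt/loadgen/
ssh -i ~/.ssh/loadgen-key root@<压源IP> "head -1 /opt/loadgen/users.csv; wc -l /opt/loadgen/users.csv"
```
Expected: 1001 lines (1 header + 1000 data).
- [ ] **Step 2: Start sample.sh in the background on prod**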
```bash
ssh root@101.132.250.62 "nohup bash /opt/topfans/loadtest/monitor/sample.sh > /tmp/sample.log 2>&1 &"
sleep 5
ssh root@101.132.250.62 "tail -3 /opt/topfans/loadtest/metrics-feed.jsonl"
```
Expected: lines containing pg_active / disk_free.
- [ ] **Step 3a: Run the spec baseline (--monitor=off, 1 RPS × 30s × 7 scenarios, per spec §5.3)**
```bash
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
cd /opt/loadgen
for s in S1 S2 S3 S4 S5 S6 S7; do
/opt/loadgen/loadgen --cmd=run --scenarios=$s --rps=1 --duration=30s --monitor=off 2>&1 | tail -15
done
EOF
```
Expected: each scenario ends with a per-minute summary, err close to 0%, P50/P95/P99 stable.
- [ ] **Step 3b: Run the toolchain smoke test (--monitor=off, 10 RPS × 30s × S1, to confirm the red lines do not trip falsely)**
```bash
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
/opt/loadgen/loadgen --cmd=run --scenarios=S1 --rps=10 --duration=30s --monitor=off 2>&1 | tail -15
EOF
```
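Expected: the toolchain runs smoothly, no panic, err close to 0%.
> **Step 3a vs 3b**: 3a verifies the spec baseline (1 RPS); 3b verifies that the toolchain does not trip the red lines at a slightly higher RPS. Both are needed for a complete rehearsal.
- [ ] **Step 4: Wrap up: stop sample.sh + cleanup**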
```bash
ssh root@101.132.250.62 "pkill -f sample.sh || true"
ssh root@101.132.250.62 "cd /opt/topfans/loadtest && ./seed --cleanup"
```
- [ ] **Step 5: Record the dry-run results**
Paste the preflight/dry-run output into `docs/loadtest/round1/seed-validation.log`:
```bash
echo "=== dry run at $(date) ===" >> docs/loadtest/round1/seed-validation.log
# Paste the tail output from the runs above here
```
---
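## Phase 9: First load-test window (Day 2 02:00-06:00)
> **The window is strictly controlled**: load starts at 02:00, wrap-up at 05:45, everything cleaned up by 06:00. Every Task runs inside its time box; on overrun, run emergency-stop immediately and review.
### Task 51: 01:30 - Preflight (7 checks)
**Files:**
- Output: `docs/loadtest/round1/monitoring/preflight.log`
- [ ] **Step 1: Run preflight on the load source**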
```bash
ssh -i ~/.ssh/loadgen-key root@<压源IP> "/opt/loadgen/loadgen --cmd=preflight --target=http://101.132.250.62:8080 --prod-ssh=root@101.132.250.62 2>&1 | tee /opt/loadgen/preflight.log"
```
Expected: all 7 checks show ✓ and the last line starts with "ALL CHECKS PASSED".
- [ ] **Step 2: Pull the log back to the local machine**
```bash
scp -i ~/.ssh/loadgen-key root@<压源IP>:/opt/loadgen/preflight.log docs/loadtest/round1/monitoring/preflight.log
```
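- [ ] **Step 3: If any check fails → stop immediately and open a review ticket**
> Do not proceed to Task 52.
> Check ③ looks for a `pre-loadtest-*.sql` file, which Task 52 only creates at 01:55. Unless an earlier backup already exists on prod, ③ fails at 01:30; either take the backup first or re-run preflight after Task 52.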
---
### Task 52: 01:55 - pg_dump backup
**Files:**
- Output: `/opt/topfans/backups/pre-loadtest-YYYYMMDD-HHMM.sql` (on prod)
- [ ] **Step 1: Back up on prod**
```bash
ssh root@101.132.250.62 <<'EOF'
mkdir -p /opt/topfans/backups
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
# Redirect on the host: pg_dump -f would write inside the container unless that path is bind-mounted
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" pg_dump -U postgres -d topfans \
  > /opt/topfans/backups/pre-loadtest-$(date +%Y%m%d-%H%M).sql
ls -lh /opt/topfans/backups/pre-loadtest-*.sql
EOF
```
Expected: the output file is larger than 50MB.
---
### Task 53: 02:00 - prod seed
**Files:**
- Output: `users.csv` on prod + the load source
- [ ] **Step 1: Run seed on prod**
```bash
ssh root@101.132.250.62 <<'EOF'
cd /opt/topfans/loadtest
# Anchor the key and keep everything after the first "=", so values containing "=" survive
export DB_PASSWORD=$(grep '^DB_PASSWORD=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
export JWT_SECRET=$(grep '^JWT_SECRET=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
./seed --db-name=topfans --jwt-secret="$JWT_SECRET" 2>&1 | tail -20
EOF
```
Expected: all ✓, "✅ seed + tokens completed".
- [ ] **Step 2: Copy users.csv to the load source**
```bash
scp -3 -i ~/.ssh/loadgen-key root@101.132.250.62:/opt/topfans/loadtest/users.csv root@<压源IP>:/opt/loadgen/users.csv
```
- [ ] **Step 3: Verify users.csv has 1001 lines**
```bash
ssh -i ~/.ssh/loadgen-key root@<压源IP> "wc -l /opt/loadgen/users.csv"
```
Expected: `1001`
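
A line count alone does not catch a truncated or malformed row. A minimal sketch in Go of a stricter check, assuming the 1001 lines are one header row plus 1000 users; the column layout of `users.csv` is not specified in this section, so the sketch only checks structure (row count, consistent field count, no empty fields). `validateUsersCSV` is a hypothetical helper, and the two-column sample in `main` is made up to exercise it.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// validateUsersCSV checks that content has one header row plus wantUsers data
// rows and that no field in a data row is empty. encoding/csv already rejects
// rows whose field count differs from the first row.
func validateUsersCSV(content string, wantUsers int) error {
	rows, err := csv.NewReader(strings.NewReader(content)).ReadAll()
	if err != nil {
		return fmt.Errorf("parse csv: %w", err)
	}
	if len(rows) != wantUsers+1 {
		return fmt.Errorf("got %d rows, want %d (header + %d users)", len(rows), wantUsers+1, wantUsers)
	}
	for i, row := range rows[1:] {
		for j, field := range row {
			if strings.TrimSpace(field) == "" {
				return fmt.Errorf("line %d: column %d is empty", i+2, j+1)
			}
		}
	}
	return nil
}

func main() {
	// Hypothetical layout, used only to exercise the check.
	sample := "user_id,token\n1,aaa\n2,bbb\n"
	fmt.Println(validateUsersCSV(sample, 2) == nil) // structure matches
	fmt.Println(validateUsersCSV(sample, 3) == nil) // one user short
}
```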
---
### Task 54: 02:01 - Start sample.sh in the background
**Files:**
- Output: `metrics-feed.jsonl` on prod
- [ ] **Step 1: Start it**
```bash
ssh root@101.132.250.62 <<'EOF'
nohup bash /opt/topfans/loadtest/monitor/sample.sh > /tmp/sample.log 2>&1 &
echo "started pid $!"
sleep 3
tail -2 /opt/topfans/loadtest/metrics-feed.jsonl
EOF
```
Expected: the output shows pg_active / disk_free lines.
---
### Task 55: 02:02-02:29 - Baseline phase (21 min of load plus 1 min pauses)
**Files:**
- Output: `baseline-*.json` × 7
- [ ] **Step 1: Run the baseline on the loadgen host**
```bash
ssh -i ~/.ssh/loadgen-key root@<LOADGEN_IP> <<'EOF'
cd /opt/loadgen
mkdir -p reports/run-$(date +%Y%m%d-%H%M%S)
for s in S1 S2 S3 S4 S5 S6 S7; do
echo "=== $s baseline ==="
/opt/loadgen/loadgen --cmd=run --scenarios=$s --rps=1 --duration=3m --monitor=lite 2>&1 | tee -a reports/baseline-$s.log
echo "" >> reports/baseline-$s.log
sleep 60 # 1min buffer
done
EOF
```
Expected: each of the 7 scenarios runs for 3 min with an error rate near 0% and stable P50/P95/P99.
---
### Task 56: 02:29-05:35 - Step ramp phase (12 min of load + 15 min buffer per scenario)
**Files:**
- Output: `step-*.json` × 7
- [ ] **Step 1: Run the step ramp on the loadgen host (each scenario uses its RPS ladder from the spec §5.3 table)**
```bash
# bash -s: associative arrays need bash; ServerAliveInterval keeps the ~3 h session from idling out
ssh -i ~/.ssh/loadgen-key -o ServerAliveInterval=60 root@<LOADGEN_IP> bash -s <<'EOF'
cd /opt/loadgen
# spec §5.3: independent RPS ladder per scenario (2 min per step)
declare -A SCHEDULES=(
  [S1]="2,5,10,15,25,40"
  [S2]="20,50,100,200,400,700"
  [S3]="5,10,20,40,80,150"
  [S4]="5,10,20,30,50,80"
  [S5]="2,5,10,20,35,60"
  [S6]="20,50,100,200,400,700"
  [S7]="5,10,20,40,80,150"
)
for s in S1 S2 S3 S4 S5 S6 S7; do
  schedule="${SCHEDULES[$s]}"
  echo "=== $s step (RPS=$schedule) ==="
  /opt/loadgen/loadgen --cmd=run --scenarios=$s --stage=step \
    --step-schedule="$schedule" --duration=2m --inter-scenario-pause=0 \
    2>&1 | tee -a reports/step-$s.log
  echo "" >> reports/step-$s.log
  # 15 min buffer between scenarios; 12 min after S7 so the phase ends at 05:35
  if [ "$s" = "S7" ]; then sleep 720; else sleep 900; fi
done
EOF
```
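
Each ladder passed through `--step-schedule` is a comma-separated list of target RPS values. A minimal sketch in Go of how such a string could be validated before a run; `parseStepSchedule` is a hypothetical helper, not the parser inside `loadgen`, and the rule that every step must exceed the previous one is an assumption drawn from the ladders listed above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseStepSchedule turns "2,5,10" into []int{2, 5, 10}. It rejects
// non-integer, non-positive, and non-increasing values, since a ramp only
// makes sense when every step raises the target RPS.
func parseStepSchedule(s string) ([]int, error) {
	parts := strings.Split(s, ",")
	steps := make([]int, 0, len(parts))
	for i, p := range parts {
		n, err := strconv.Atoi(strings.TrimSpace(p))
		if err != nil {
			return nil, fmt.Errorf("step %d: %q is not an integer", i+1, p)
		}
		if n <= 0 {
			return nil, fmt.Errorf("step %d: RPS must be positive, got %d", i+1, n)
		}
		if i > 0 && n <= steps[i-1] {
			return nil, fmt.Errorf("step %d: %d does not exceed previous step %d", i+1, n, steps[i-1])
		}
		steps = append(steps, n)
	}
	return steps, nil
}

func main() {
	steps, err := parseStepSchedule("2,5,10,15,25,40") // S1 ladder from the block above
	fmt.Println(steps, err)
}
```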
Expected time boxes (load and buffer alternate, ~174 min in total):
- 02:29 S1 ramp, 12 min → 02:41-02:56 buffer
- 02:56 S2 ramp, 12 min → 03:08-03:23 buffer
- ... 7 scenarios in sequence; the last one (S7) ends at 05:23, plus a 12 min buffer = 05:35
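
The time boxes follow from simple arithmetic: 6 steps × 2 min = 12 min of load per scenario, then a 15 min buffer, repeated seven times from 02:29. A minimal sketch in Go that prints the whole timeline; `stepTimeline` and `slot` are hypothetical names for illustration. It applies the full 15 min buffer after every scenario, so for S7 it reports 05:38, whereas this plan shortens the last buffer to 12 min to finish at 05:35.

```go
package main

import (
	"fmt"
	"time"
)

// slot is one scenario's time box; all times are "HH:MM" strings.
type slot struct {
	Scenario, Start, End, NextStart string
}

// stepTimeline lays scenarios end to end from start ("HH:MM"), each running
// runMin minutes of load followed by bufferMin minutes of recovery.
func stepTimeline(start string, scenarios []string, runMin, bufferMin int) ([]slot, error) {
	t, err := time.Parse("15:04", start)
	if err != nil {
		return nil, fmt.Errorf("bad start time %q: %w", start, err)
	}
	out := make([]slot, 0, len(scenarios))
	for _, s := range scenarios {
		end := t.Add(time.Duration(runMin) * time.Minute)
		next := end.Add(time.Duration(bufferMin) * time.Minute)
		out = append(out, slot{s, t.Format("15:04"), end.Format("15:04"), next.Format("15:04")})
		t = next
	}
	return out, nil
}

func main() {
	slots, err := stepTimeline("02:29", []string{"S1", "S2", "S3", "S4", "S5", "S6", "S7"}, 12, 15)
	if err != nil {
		panic(err)
	}
	for _, s := range slots {
		fmt.Printf("%s  load %s-%s  buffer until %s\n", s.Scenario, s.Start, s.End, s.NextStart)
	}
}
```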
---
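### Task 57: 05:35-05:40 - Stop the monitor
- [ ] **Step 1: Stop sample.sh and wrap up**
```bash
# [s]ample.sh keeps the pattern from matching the remote shell that runs this command
ssh root@101.132.250.62 "pkill -f '[s]ample.sh' || true"
ssh root@101.132.250.62 "pgrep -af '[s]ample.sh' || echo 'no sample.sh left'" # confirm nothing is left running
```
- [ ] **Step 2: Pull back metrics-feed.jsonl**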
```bash
scp root@101.132.250.62:/opt/topfans/loadtest/metrics-feed.jsonl docs/loadtest/round1/monitoring/sample-$(date +%Y%m%d-%H%M).log
```
---
### Task 58: 05:40-05:50 - Cleanup + verify
- [ ] **Step 1: Cleanup**
```bash
# Task 53 exports DB_PASSWORD before running seed; pass it the same way here
ssh root@101.132.250.62 'cd /opt/topfans/loadtest && DB_PASSWORD=$(grep "^DB_PASSWORD=" /opt/topfans/docker/.env.prod | cut -d= -f2-) ./seed --cleanup'
```
- [ ] **Step 2: Verify**
```bash
ssh -i ~/.ssh/loadgen-key root@<LOADGEN_IP> "/opt/loadgen/loadgen --cmd=verify --prod-ssh=root@101.132.250.62 2>&1 | tee /opt/loadgen/verify.log"
```
- [ ] **Step 3: Pull back verify.log**
```bash
scp -i ~/.ssh/loadgen-key root@<LOADGEN_IP>:/opt/loadgen/verify.log docs/loadtest/round1/monitoring/verify.log
```
- [ ] **Step 4: Pull back reports/**
```bash
scp -r -i ~/.ssh/loadgen-key root@<LOADGEN_IP>:/opt/loadgen/reports/* docs/loadtest/round1/raw-data/
```
- [ ] **Step 5: Release the loadgen ECS instance (if no longer needed)**
> Do not release it yet: round 2 still needs it.
---
## Phase 10: Report and decision (Day 3)
### Task 59: Generate report-round1.md
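> Prerequisite: the `runReport` placeholder from Task 20 must be implemented first (otherwise the `--cmd=report` subcommand is an empty function).
- [ ] **Step 0: Implement runReport (in main.go)**
```go
import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/topfans/backend/scripts/loadgen/loadgen/reporter"
)

// runReport turns the per-scenario JSON files under inputDir into
// baseline.csv, one SVG plot per scenario, and the markdown report at output.
func runReport(inputDir, output string) error {
	// 1. Collect *.json under reports/run-*/
	matches, err := filepath.Glob(filepath.Join(inputDir, "run-*", "*.json"))
	if err != nil {
		return fmt.Errorf("glob reports: %w", err)
	}
	var scenarios []reporter.ScenarioReport
	for _, m := range matches {
		data, err := os.ReadFile(m)
		if err != nil {
			log.Printf("skip %s: %v", m, err) // log instead of dropping the file silently
			continue
		}
		var sr reporter.ScenarioReport
		if err := json.Unmarshal(data, &sr); err != nil {
			log.Printf("skip %s: %v", m, err)
			continue
		}
		scenarios = append(scenarios, sr)
	}
	if len(scenarios) == 0 {
		return fmt.Errorf("no scenario reports found under %s", inputDir)
	}
	// 2. Write baseline.csv
	baselinePath := filepath.Join(inputDir, "baseline.csv")
	if err := reporter.WriteBaselineCSV(baselinePath, scenarios); err != nil {
		return fmt.Errorf("write baseline: %w", err)
	}
	// 3. Write SVG plots (one per scenario); a failed plot is logged, not fatal
	for _, s := range scenarios {
		plotPath := filepath.Join(inputDir, "plot-"+s.ID+".svg")
		if err := reporter.PlotRPSLatencyError(s.ID, samplesFromStages(s.Stages), plotPath); err != nil {
			log.Printf("plot %s failed: %v", s.ID, err)
		}
	}
	// 4. Write the markdown report
	return reporter.GenerateMarkdown(output, scenarios)
}

// samplesFromStages converts per-stage results into plot samples.
func samplesFromStages(stages []reporter.StageReport) []reporter.Sample {
	out := make([]reporter.Sample, len(stages))
	for i, s := range stages {
		out[i] = reporter.Sample{RPS: float64(s.RPS), P99Ms: s.P99Ms, ErrorRate: s.ErrorRate}
	}
	return out
}
```
> When implementing, merge these imports (`encoding/json`, `fmt`, `log`, `os`, `path/filepath`, and `github.com/topfans/backend/scripts/loadgen/loadgen/reporter`) into the existing `import` block of main.go.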
- [ ] **Step 1: Run the reporter on the loadgen host**
```bash
ssh -i ~/.ssh/loadgen-key root@<LOADGEN_IP> <<'EOF'
cd /opt/loadgen
ls -1 reports/
/opt/loadgen/loadgen --cmd=report --input=reports/ --output=report-round1.md
head -100 report-round1.md
EOF
```
- [ ] **Step 2: Pull back the report**
```bash
scp -i ~/.ssh/loadgen-key root@<LOADGEN_IP>:/opt/loadgen/report-round1.md docs/loadtest/round1/report-round1.md
```
- [ ] **Step 3: Pull back the SVG plots**
```bash
scp -i ~/.ssh/loadgen-key root@<LOADGEN_IP>:/opt/loadgen/reports/plot-*.svg docs/loadtest/round1/raw-data/
```
---
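### Task 60: Review meeting (30-60 min)
- [ ] **Step 1: Read report-round1.md**
Open `docs/loadtest/round1/report-round1.md` and identify:
- The knee-point RPS of each scenario
- The main bottlenecks (S1 bcrypt? S5 PG aggregation? S6 Redis cache miss?)
- Which can be fixed by configuration (bcrypt cost, max_connections)
- Which need code changes (add indexes, remove N+1 queries)
- Which are accepted as is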
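
Reading the knee point off each ladder can be made mechanical. A minimal sketch in Go, assuming the stages are ordered by rising RPS and that the knee is the last step that stays within both a P99 limit and an error-rate limit. The `stage` type mirrors the fields used in `runReport` (`RPS`, `P99Ms`, `ErrorRate`) but is a local stand-in, `kneeRPS` is a hypothetical helper, and the 500 ms / 1% limits and the sample numbers are placeholders rather than values from the spec or from any measurement.

```go
package main

import "fmt"

// stage is a local stand-in for one step of a ramp, not the reporter type.
type stage struct {
	RPS       int
	P99Ms     float64
	ErrorRate float64 // fraction: 0.01 means 1%
}

// kneeRPS returns the highest RPS reached before the first step that breaks
// either limit. It returns 0 if the first step already breaks a limit, and
// the last step's RPS if none does (the knee lies beyond the tested ladder).
func kneeRPS(stages []stage, maxP99Ms, maxErrRate float64) int {
	knee := 0
	for _, s := range stages {
		if s.P99Ms > maxP99Ms || s.ErrorRate > maxErrRate {
			break
		}
		knee = s.RPS
	}
	return knee
}

func main() {
	// Made-up numbers for illustration only, not measured results.
	s3 := []stage{{5, 40, 0}, {10, 45, 0}, {20, 60, 0.001}, {40, 180, 0.004}, {80, 950, 0.03}, {150, 2400, 0.21}}
	fmt.Println(kneeRPS(s3, 500, 0.01))
}
```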
- [ ] **Step 2: Write review notes**
Create `docs/loadtest/round1/review-notes.md`:
```markdown
# Round 1 Load Test Review
> **Date**: YYYY-MM-DD
> **Attendees**: <name1>, <name2>
## Knee-point summary
| Scenario | Knee RPS | Target | Gap | Action |
|---|---|---|---|---|
## Main bottlenecks (by priority)
1. **<bottleneck A>**: <scenario> P99=Xms, expected Yms
2. **<bottleneck B>**: ...
## Decisions
- [ ] Fix 1: <description> + owner + deadline
- [ ] Fix 2: ...
- [ ] Accept as is: <scenario + reason>
```
---
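### Task 61: Decide whether to run round 2
- [ ] **Step 1: Evaluate the trigger conditions**
| Condition | Round 2? |
|---|---|
| Knee points already meet business expectations | ❌ Skip |
| Code or configuration was changed | ✅ Run the affected scenarios |
| An unexplained anomaly appeared | ✅ Retest |
- [ ] **Step 2: Write decision.md**
Create `docs/loadtest/round1/decision.md`:
```markdown
# Decision After Round 1
> **Date**: YYYY-MM-DD
## Decision
- [✅ Run round 2 / ❌ Skip round 2]
## Rationale
- <reason 1>
- <reason 2>
## Round 2 scope
- Scenarios: S<X>, S<Y>
- Changes: <what changed>
- Window: round 2, 02:00-06:00
```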
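---
### Task 62: Round 2 prerequisite: re-sign JWTs (if more than 7 days after round 1)
> spec §6.3 C1: JWTs expire after 7 days. If round 2 starts more than 7 days after round 1, re-sign the tokens 30 min before load starts.
- [ ] **Step 1: Check whether re-signing is needed**
```bash
# The report is written about a day after the tokens were signed (Task 53), and integer
# division rounds down, so warn from day 6 to stay inside the 7-day lifetime.
DAYS_SINCE_ROUND1=$(( ( $(date +%s) - $(stat -c %Y docs/loadtest/round1/report-round1.md) ) / 86400 ))
if [ "$DAYS_SINCE_ROUND1" -ge 6 ]; then
  echo "⚠️ $DAYS_SINCE_ROUND1 days since round 1; re-sign the JWTs"
fi
```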
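
Counting whole days hides the boundary case: tokens signed at 02:00 are already expired at 02:00 seven days later, even though an integer day count reads 7 rather than "more than 7". A minimal sketch in Go of a check that compares the expiry time against the end of the planned window. It assumes the signing time from Task 53 was recorded somewhere; `needsResign` and `tokenTTL` are hypothetical names, the dates in `main` are illustrative, and the one-hour margin is an arbitrary allowance for clock skew and overruns.

```go
package main

import (
	"fmt"
	"time"
)

// tokenTTL is the 7-day JWT lifetime cited from spec §6.3 C1.
const tokenTTL = 7 * 24 * time.Hour

// needsResign reports whether tokens signed at signedAtUnix would expire at or
// before the end of the load window plus a safety margin. All arguments are
// Unix seconds (marginSec is a duration in seconds).
func needsResign(signedAtUnix, windowEndUnix, marginSec int64) bool {
	expires := time.Unix(signedAtUnix, 0).Add(tokenTTL)
	deadline := time.Unix(windowEndUnix, 0).Add(time.Duration(marginSec) * time.Second)
	return !expires.After(deadline)
}

func main() {
	signed := time.Date(2026, 6, 13, 2, 0, 0, 0, time.UTC) // hypothetical Task 53 seed time
	fiveDays := signed.Add(5*24*time.Hour + 4*time.Hour)   // round 2 ends 5 days later at 06:00
	sevenDays := signed.Add(7*24*time.Hour + 4*time.Hour)  // round 2 ends 7 days later at 06:00
	fmt.Println(needsResign(signed.Unix(), fiveDays.Unix(), 3600))
	fmt.Println(needsResign(signed.Unix(), sevenDays.Unix(), 3600))
}
```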
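- [ ] **Step 2: Re-sign the tokens on prod**
```bash
ssh root@101.132.250.62 <<'EOF'
cd /opt/topfans/loadtest
# -f2- keeps everything after the first '=', so values containing '=' are not truncated
export DB_PASSWORD=$(grep '^DB_PASSWORD=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
export JWT_SECRET=$(grep '^JWT_SECRET=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
./seed --reset-tokens --jwt-secret="$JWT_SECRET"
EOF
```
- [ ] **Step 3: Sync users.csv to the loadgen host**
```bash
# -3 routes the copy through the local machine, so prod needs no credentials for the loadgen host
scp -3 -i ~/.ssh/loadgen-key root@101.132.250.62:/opt/topfans/loadtest/users.csv root@<LOADGEN_IP>:/opt/loadgen/users.csv
```
> The full implementation plan for round 2 should be a new document, `docs/superpowers/plans/YYYY-MM-DD-load-testing-round2.md` (out of scope for this plan).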
---
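## Completion criteria
The plan counts as **complete** when:
- [ ] All Tasks in Phases 1-8 are done
- [ ] The Phase 9 round 1 window ran end to end and produced `report-round1.md`
- [ ] Phase 10 review and decision are done, plus (optionally) the Task 62 JWT re-sign
- [ ] The `docs/loadtest/round1/` directory is complete (prod-vs-local-schema-diff.md, preflight.log, sample-*.log, baseline/step logs, report-round1.md, review-notes.md, decision.md)
- [ ] All code is committed, with no stray uncommitted .bak / .log files
- [ ] The sequence-reset convention from CLAUDE.md is reflected in seed/sequences.go
- [ ] All 6 red lines from spec §7.3 (R1-R6) are wired into CircuitBreaker
- [ ] The spec §5.3 RPS ladders are injected via the `--step-schedule` flag (not hard-coded)
- [ ] For scenarios S2-S7, request paths were confirmed by grepping the handler source before implementation (see the tables in this plan)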
---
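## Risks and rollback
| Risk | Trigger | Rollback |
|---|---|---|
| seed fails on prod (permissions/schema) | Phase 4 Task 49 integration run fails | Reproduce locally with a dry run; fix the code before going back to prod |
| loadgen fails to compile | Any `go build` error | Fix the code, rebuild, scp again |
| A preflight check fails | Phase 9 Task 51 | Do not start load; review the next day; the window may be rescheduled |
| Load test trips a red line | Any stage of Phase 9 | `emergency-stop.sh`; review the next day |
| Test data pollutes real data | Detected by verify | `restore-from-backup.sh` + review seed cleanup |
| Backup (pg_dump / Aliyun snapshot) times out | Task 52 backup fails | Retry; if it still fails, postpone the whole window |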
---
## Notes
- All commands are verified against local docker before 23:00 on Day 1
- Confirm that loadgen-key exists under ~/.ssh/ before starting
- DB_PASSWORD / JWT_SECRET come from `/opt/topfans/docker/.env.prod` on prod
- If round 2 is triggered, create a new plan document `docs/superpowers/plans/YYYY-MM-DD-load-testing-round2.md`