Backend Service Load Testing Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
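Goal: Deliver an executable, recoverable and repeatable load-testing plan for the TopFans backend microservices deployed on a single Alibaba Cloud host (101.132.250.62, 4G/2C, docker-compose), covering four objectives: capacity assessment, SLA baseline, stability, and destructive (breaking-point) testing.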
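Architecture:
- Tool stack: custom Go binaries loadgen (load generator) + seed (data preparation), plus Bash monitoring/recovery scripts
- Deployment: the load source runs on an Alibaba Cloud ECS in the same region; the target is prod 101.132.250.62
- Data isolation: all test data is isolated under star_id=999900; tokens are pre-signed into users.csv
- Monitoring layers: loadgen client-side events + host-level sample.sh + optional Prometheus exporters
- Execution window: 02:00-06:00, the business off-peak period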
Tech Stack: Go 1.25.5、net/http、HdrHistogram-go、gonum/plot、golang-migrate、Bash、PostgreSQL 14+、Redis、Prometheus + Grafana、cAdvisor、postgres-exporter、redis-exporter
Spec: docs/superpowers/specs/2026-06-12-load-testing-design.md
File structure
1. Load-test tools (in the repo; deployed to the load source / target host)
backend/scripts/loadgen/ # new
├── seed/ # new: data preparation tool
│ ├── main.go # seed CLI entry point
│ ├── stars.go # test star
│ ├── users.go # 1000 test users
│ ├── profiles.go # fan_profiles + crystals
│ ├── slots_and_exhibits.go # booth_slots + exhibitions
│ ├── assets.go # 5000 assets
│ ├── friendships.go # bidirectional friendships
│ ├── tokens.go # signs tokens with pkg/jwt
│ ├── sequences.go # sequence reset (CLAUDE.md convention)
│ ├── cleanup.go # test-data cleanup
│ ├── seed_test.go # stars/users unit tests
│ └── README.md
├── loadgen/ # new: main load-test program
│ ├── main.go # loadgen CLI entry point
│ ├── lib/
│ │ ├── ramp.go # step-ramp scheduler
│ │ ├── ramp_test.go
│ │ ├── circuit.go # 6-dimension red-line stop conditions
│ │ ├── circuit_test.go
│ │ ├── hdr.go # HDR histogram wrapper
│ │ ├── hdr_test.go
│ │ ├── csv.go # users.csv loader
│ │ ├── csv_test.go
│ │ ├── client.go # http.Transport
│ │ ├── client_test.go
│ │ ├── ssh_metrics.go # ssh tail of metrics-feed
│ │ └── log.go # single-line stderr progress meter
│ ├── scenarios/
│ │ ├── s1_login.go
│ │ ├── s2_read.go
│ │ ├── s3_like.go
│ │ ├── s4_mint.go # includes reset scheduling
│ │ ├── s5_dashboard.go
│ │ ├── s6_ranking.go
│ │ ├── s7_place.go
│ │ └── scenarios_test.go # unit tests for request construction
│ ├── reporter/
│ │ ├── json.go
│ │ ├── csv.go
│ │ ├── plot.go # gonum/plot three-panel chart
│ │ └── markdown.go
│ ├── preflight.go # 7 pre-run checks
│ ├── verify.go # post-run integrity check
│ └── main_test.go
├── monitor/ # new
│ ├── sample.sh # background sampling
│ ├── docker-compose.monitor.yml # Prometheus stack
│ └── grafana-dashboards/
│ ├── 01-host.json # whole host
│ ├── 02-containers.json # per container
│ ├── 03-postgres.json # PG health
│ └── 04-business.json # business metrics
├── recover/ # new
│ ├── emergency-stop.sh
│ └── restore-from-backup.sh
└── reports/ # new (git-ignored)
└── .gitkeep
2. Scripts deployed to prod
/opt/topfans/loadtest/ # deployed to prod
├── seed # seed binary
├── scripts/
│ └── mint_reset.sh # mint data reset
├── monitor/
│ └── sample.sh
└── recover/
├── emergency-stop.sh
└── restore-from-backup.sh
3. Tools on the load-source ECS
/opt/loadgen/ # deployed to the load source
├── loadgen # loadgen binary
├── monitor/
│ └── docker-compose.monitor.yml # optional
├── grafana-dashboards/ # optional
└── reports/ # report output
4. Files modified/created
- Modify: docker/docker-compose.prod.yml (POSTGRES_MAX_CONNECTIONS: 100 → 50)
- Modify: backend/.gitignore (ignore the loadgen reports directory, scripts/loadgen/reports/)
- Create: docs/loadtest/round1/.gitkeep
- Create: docs/loadtest/round1/prod-vs-local-schema-diff.md (produced on Day 1 morning)
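Implementation phases overview
| Phase | Duration | Tasks | When |
|---|---|---|---|
| Phase 1: Environment and data verification | 1.5h | Task 1-4 | Day 1 morning |
| Phase 2: PG max_connections fix | 0.5h | Task 5 | Day 1 afternoon, 14:00 |
| Phase 3: Load-source ECS setup | 1h | Task 6-7 | Day 1 afternoon |
| Phase 4: seed tool development | 3h | Task 8-19 | Day 1 afternoon-evening |
| Phase 5: loadgen library development | 2h | Task 20-29 | Day 1 evening |
| Phase 6: loadgen scenario development | 2h | Task 30-37 | Day 1 evening to early Day 2 |
| Phase 7: Reporting + monitoring + recovery | 1.5h | Task 38-46 | Before early Day 2 |
| Phase 8: Deployment + dry run | 1.5h | Task 47-50 | Day 1 23:00 |
| Phase 9: Round-1 load-test window | 4h | Task 51-58 | Day 2 02:00-06:00 |
| Phase 10: Report and decisions | 1h | Task 59-61 | Day 3 |
Total effort for round 1: about 18 hours (spread over Day 1-3), with an effective load-testing window of 4 hours.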
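Phase 1: Environment and data verification (Day 1 morning)
Task 1: Verify that prod (over SSH) and local docker are reachable
Files:
- None created; environment checks only
- Step 1: SSH into prod to verify reachability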
Run:
ssh -o ConnectTimeout=5 root@101.132.250.62 "echo connected && uname -a"
Expected: prints connected followed by the Linux kernel information.
- Step 2: List the prod containers and check the count
Run:
ssh root@101.132.250.62 "docker ps --format '{{.Names}}' | sort"
Expected: 11 containers, all prefixed with topfans-.
- Step 3: Verify that local docker is reachable
Run:
docker ps --format '{{.Names}}' | sort
Expected: topfans-postgres and the other containers are listed (local may have more).
Task 2: Survey the prod schema and write a diff report
Files:
- Create: docs/loadtest/round1/prod-vs-local-schema-diff.md
- Step 1: Dump the key table definitions on prod
Run:
ssh root@101.132.250.62 <<'EOF'
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
for t in users fan_profiles assets asset_likes exhibitions booth_slots stars friendships mint_orders mint_cost_config crystal_transaction_records; do
echo "=== $t ==="
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d topfans -c "\d+ $t"
done
EOF
Expected: columns, indexes, foreign keys and constraints of every table.
- Step 2: Run the same dump locally and compare
Run:
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
for t in users fan_profiles assets asset_likes exhibitions booth_slots stars friendships mint_orders mint_cost_config crystal_transaction_records; do
echo "=== $t ==="
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d top-fans -c "\d+ $t"
done
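Expected: the local database is top-fans (with a hyphen); prod is topfans (no hyphen).
- Step 3: Write the diff report
Create docs/loadtest/round1/prod-vs-local-schema-diff.md:
# Prod vs Local Schema Diff
> **Date**: 2026-06-12
> **Method**: remote `psql \d+` compared with local `psql \d+`
## Database name difference
- Local docker: `top-fans` (with a hyphen)
- prod: `topfans` (no hyphen)
- → Every tool must take its DSN as parameters; seed/loadgen pass `--db-name` explicitly at startup
## Column differences
| Table | Local | prod | Difference | Impact |
|---|---|---|---|---|
| (fill in the actual differences) | | | | |
## Index differences
| Table | Index | Local | prod | Impact |
|---|---|---|---|---|
## Conclusion
- [ ] Schemas match, OK to proceed / [ ] Blocking differences found, migrate first
Must-check differences (spec §2.2): whether asset_likes.exhibition_id is NOT NULL, whether booth_slots.is_enabled defaults to false, whether fan_profiles has experience / revenue_boost_bps, and whether the auto_users table exists.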
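Task 3: Create the working directory structure
Files:
- Create: all subdirectories of backend/scripts/loadgen/
- Modify: backend/.gitignore and the repository-root .gitignore
- Step 1: Create the directory skeleton
Run:
mkdir -p backend/scripts/loadgen/{seed,loadgen/lib,loadgen/scenarios,loadgen/reporter,monitor/grafana-dashboards,recover,reports}
mkdir -p docs/loadtest/round1/{monitoring,raw-data}
touch backend/scripts/loadgen/reports/.gitkeep
touch docs/loadtest/round1/.gitkeep
- Step 2: Ignore the report directories in .gitignore
Patterns in a .gitignore are relative to the directory that contains it, so backend/.gitignore cannot match anything under docs/, and a `backend/...` prefix inside it would point at backend/backend/. Ignoring the whole reports/ directory would also silently drop reports/.gitkeep from `git add`. Append to backend/.gitignore (create it if missing):
# Load test reports (contain tokens); keep the .gitkeep placeholder tracked
scripts/loadgen/reports/*
!scripts/loadgen/reports/.gitkeep
Append to the repository-root .gitignore:
docs/loadtest/round1/monitoring/sample-*.log
docs/loadtest/round1/raw-data/
- Step 3: Commit the directory skeleton
Run:
git add backend/scripts/loadgen/ docs/loadtest/ backend/.gitignore .gitignore
git commit -m "feat(loadtest): scaffold seed/loadgen/monitor/recover directories"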
Task 4: Shared Go module configuration
Files:
- Modify: backend/go.mod (confirm only)
- Create: backend/scripts/loadgen/go.mod (only if a separate module is used)
- Step 1: Verify that go.sum already contains HdrHistogram + gonum/plot
Run:
grep -E "hdrhistogram-go|gonum" backend/go.sum
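Expected: both are present (HdrHistogram-go v1.1.2, gonum v0.16.0+). If so, add the new sub-packages inside the main module.
- Step 2: Register the new packages in the main module
Because backend/go.mod is a single module, every backend/scripts/loadgen/**/*.go file is picked up by go build ./... automatically, so no separate go.mod is needed.
Run to verify:
cd backend && go build ./scripts/loadgen/...
Expected: there are no Go files yet, so the build reports no errors (at most a "matched no packages" warning).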
Phase 2: PG max_connections fix (Day 1 14:00-14:30)
⚠️ Must run during the business off-peak slot (14:00-14:05); restarting the PG container causes about 30 s of downtime.
Task 5: Apply §2.1 option A (lower max_connections to 50)
Files:
- Modify: docker/docker-compose.prod.yml
- Step 1: Back up docker-compose.prod.yml
Run:
cp docker/docker-compose.prod.yml docker/docker-compose.prod.yml.bak.$(date +%Y%m%d)
- Step 2: Change POSTGRES_MAX_CONNECTIONS
Open docker/docker-compose.prod.yml and find the environment variables of the postgres service:
environment:
POSTGRES_MAX_CONNECTIONS: 100
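Change it to:
environment:
POSTGRES_MAX_CONNECTIONS: 50
Note: the official postgres image does not read POSTGRES_MAX_CONNECTIONS on its own; confirm that the compose file wires it into the server command (for example `command: postgres -c max_connections=50`). Step 4 checks the value that actually takes effect.
- Step 3: Recreate the postgres container on prod
Copy the edited file to the prod compose directory first. `docker-compose restart` restarts the existing container without applying compose-file changes, so recreate it with `up -d` instead.
Run (on prod):
cd /opt/topfans/docker # prod compose directory (where .env.prod lives); adjust if different
docker-compose -f docker-compose.prod.yml up -d postgres
echo "waiting 30s ..."
sleep 30
Expected: the container is recreated within ~30s.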
- Step 4: Verify that max_connections took effect
Run:
ssh root@101.132.250.62 <<'EOF'
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d topfans -c "SHOW max_connections;"
EOF
Expected: prints 50.
- Step 5: Verify that the gateway is still reachable
Run:
curl -s -o /dev/null -w "%{http_code}\n" http://101.132.250.62:8080/health
Expected: 200.
- Step 6: Commit the docker-compose change
Run:
git add docker/docker-compose.prod.yml
git commit -m "fix(pg): reduce max_connections 100→50 to fit 400M cgroup limit"
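Lowering max_connections only helps if the services' connection pools fit under it. PostgreSQL keeps `superuser_reserved_connections` (default 3) slots for superusers, and postgres-exporter plus any ad-hoc psql session also hold connections. The sketch below shows the budget check. The service names and pool sizes are hypothetical placeholders for each service's `SetMaxOpenConns` value.

```go
package main

import (
	"fmt"
	"sort"
)

// budget returns the headroom left under max_connections after the slots PG
// reserves for superusers and the sum of every pool limit (negative = over budget).
func budget(pools map[string]int, maxConns, reserved int) int {
	total := 0
	for _, n := range pools {
		total += n
	}
	return maxConns - reserved - total
}

func main() {
	// Hypothetical per-service pool limits (SetMaxOpenConns); not taken from the plan.
	pools := map[string]int{"user": 10, "asset": 10, "exhibition": 8, "mint": 8, "social": 6}
	names := make([]string, 0, len(pools))
	for name := range pools {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("%-12s %d\n", name, pools[name])
	}
	// 50 = new max_connections; 3 = PostgreSQL's default superuser_reserved_connections.
	headroom := budget(pools, 50, 3)
	fmt.Printf("headroom: %d\n", headroom) // 50 - 3 - 42 = 5
	if headroom < 0 {
		fmt.Println("over budget: lower the pool sizes before the load test")
	}
}
```

The headroom left over is what postgres-exporter and manual psql sessions have to share during the test.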
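Phase 3: Load-source ECS setup (Day 1 afternoon)
Task 6: Create the load-source ECS in the Alibaba Cloud console
Files:
- None (manual steps, no code)
- Step 1: Create the instance in the ECS console
  - Region: East China 1 (Hangzhou), same as prod
  - Instance type: pay-as-you-go, 4G/2C general purpose (ecs.s6-c1m2.large or an equivalent size)
  - Image: Ubuntu 22.04 / Alibaba Cloud Linux 3
  - Public bandwidth: pay-by-traffic, or a fixed 5 Mbps (check it against the traffic the test actually needs)
  - Security group: all outbound allowed; inbound only port 22 (SSH)
- Step 2: Record the load-source IP and credentials
Store the public IP and root password somewhere safe locally, and save the SSH private key as ~/.ssh/loadgen-key with permissions 600 (chmod 600 ~/.ssh/loadgen-key).
- Step 3: Verify SSH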
Run:
ssh -i ~/.ssh/loadgen-key root@<压源IP> "echo connected && uname -a"
Expected: succeeds.
- Step 4: Install base tools (apt applies to the Ubuntu image; on Alibaba Cloud Linux 3 use dnf)
Run:
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
apt update && apt install -y curl git build-essential ca-certificates
EOF
Expected: the tools install successfully.
- Step 5: Install Go 1.25.5
Run:
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
wget https://go.dev/dl/go1.25.5.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.25.5.linux-amd64.tar.gz
echo 'export PATH=$PATH:/usr/local/go/bin' >> /root/.bashrc
export PATH=$PATH:/usr/local/go/bin
go version
EOF
Expected: go version go1.25.5 linux/amd64.
- Step 6: Check the latency from the load source to prod
Run:
ssh -i ~/.ssh/loadgen-key root@<压源IP> "ping -c 10 101.132.250.62 | tail -3"
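Expected: avg < 10 ms (same-region backbone network).
Task 7: Prepare the load-source ECS directories
Files:
- None (directories only, no code)
- Step 1: Create the load-source working directories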
Run:
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
mkdir -p /opt/loadgen/{monitor,grafana-dashboards,reports}
echo "/opt/loadgen directories created"
EOF
Phase 4: seed tool development (Day 1 afternoon-evening)
Task 8: Initialize seed main.go
Files:
- Create: backend/scripts/loadgen/seed/main.go
- Step 1: Write the main.go skeleton
Create backend/scripts/loadgen/seed/main.go:
package main

import (
	"database/sql"
	"flag"
	"fmt"
	"log"
	"os"

	_ "github.com/lib/pq"
)

const (
	LoadtestStarID  = int64(999900)
	LoadtestUserMin = int64(30000001)
	LoadtestUserMax = int64(30001000)
)

type Config struct {
	JWTSecret     string
	DBHost        string
	DBPort        int
	DBName        string
	DBUser        string
	DBPass        string
	Reset         bool  // delete existing test data before seeding
	ResetTok      bool  // only re-sign tokens, don't touch data
	Cleanup       bool  // run cleanup instead of seeding
	CleanupFull   bool  // with Cleanup: also delete users/stars
	CleanupStarID int64 // star_id to clean (safety check)
}

func main() {
	// parseFlags registers every flag and calls flag.Parse exactly once.
	// (Calling flag.Parse before all flags are defined rejects flags such as --db-name.)
	cfg := parseFlags()

	if cfg.Cleanup {
		db, err := openDB(cfg)
		if err != nil {
			log.Fatalf("open db: %v", err)
		}
		defer db.Close()
		if err := Cleanup(db, cfg.CleanupStarID, cfg.CleanupFull); err != nil {
			log.Fatalf("cleanup: %v", err)
		}
		log.Println("✅ cleanup done")
		return
	}

	if cfg.ResetTok {
		if err := GenerateTokensForLoadtest(cfg); err != nil {
			log.Fatalf("generate tokens: %v", err)
		}
		return
	}

	db, err := openDB(cfg)
	if err != nil {
		log.Fatalf("open db: %v", err)
	}
	defer db.Close()

	if cfg.Reset {
		log.Println("⚠️ --reset enabled, deleting existing loadtest data first")
		if err := Cleanup(db, LoadtestStarID, false); err != nil {
			log.Fatalf("reset: %v", err)
		}
	}
	if err := runSeed(db, cfg); err != nil {
		log.Fatalf("seed failed: %v", err)
	}
	if err := GenerateTokensForLoadtest(cfg); err != nil {
		log.Fatalf("generate tokens: %v", err)
	}
	log.Println("✅ seed + tokens completed")
}

func parseFlags() *Config {
	cfg := &Config{}
	flag.StringVar(&cfg.JWTSecret, "jwt-secret", os.Getenv("JWT_SECRET"), "JWT secret (or $JWT_SECRET)")
	flag.StringVar(&cfg.DBHost, "db-host", "localhost", "PG host")
	flag.IntVar(&cfg.DBPort, "db-port", 5432, "PG port")
	flag.StringVar(&cfg.DBName, "db-name", "topfans", "PG database name (local docker uses 'top-fans' with a hyphen)")
	flag.StringVar(&cfg.DBUser, "db-user", "postgres", "PG user")
	flag.StringVar(&cfg.DBPass, "db-password", os.Getenv("DB_PASSWORD"), "PG password (or $DB_PASSWORD)")
	flag.BoolVar(&cfg.Reset, "reset", false, "delete existing test data before seed")
	flag.BoolVar(&cfg.ResetTok, "reset-tokens", false, "only re-sign tokens, don't touch data")
	flag.BoolVar(&cfg.Cleanup, "cleanup", false, "run cleanup (default: keep users/stars baseline)")
	flag.BoolVar(&cfg.CleanupFull, "full", false, "with -cleanup: also delete users/stars")
	flag.Int64Var(&cfg.CleanupStarID, "cleanup-star-id", LoadtestStarID, "star_id to clean (safety)")
	flag.Parse()
	return cfg
}

func openDB(cfg *Config) (*sql.DB, error) {
	dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
		cfg.DBHost, cfg.DBPort, cfg.DBUser, cfg.DBPass, cfg.DBName)
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}
	// sql.Open only validates its arguments; Ping fails fast on a bad host or password.
	if err := db.Ping(); err != nil {
		db.Close()
		return nil, err
	}
	return db, nil
}

func runSeed(db *sql.DB, cfg *Config) error {
	if err := SeedStars(db); err != nil { return fmt.Errorf("seed stars: %w", err) }
	log.Println("✓ stars seeded")
	if err := SeedUsers(db); err != nil { return fmt.Errorf("seed users: %w", err) }
	log.Println("✓ 1000 users seeded")
	if err := SeedProfiles(db); err != nil { return fmt.Errorf("seed profiles: %w", err) }
	log.Println("✓ 1000 fan_profiles + crystal seeded")
	if err := SeedAssets(db); err != nil { return fmt.Errorf("seed assets: %w", err) }
	log.Println("✓ 5000 assets seeded")
	if err := SeedSlotsAndExhibits(db); err != nil { return fmt.Errorf("seed slots+exhibits: %w", err) }
	log.Println("✓ 3000 booth_slots + 2000 exhibitions seeded")
	if err := SeedFriendships(db); err != nil { return fmt.Errorf("seed friendships: %w", err) }
	log.Println("✓ 10000 friendships seeded")
	if err := ResetSequences(db); err != nil { return fmt.Errorf("reset sequences: %w", err) }
	log.Println("✓ sequences reset")
	return nil
}
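- Step 2: Verify it compiles
Run:
cd backend && go build ./scripts/loadgen/seed/
Expected: this main.go already calls functions added in Tasks 9-17 (SeedStars through ResetSequences, Cleanup, GenerateTokensForLoadtest), so on its own the build fails with `undefined:` errors. To commit a compiling skeleton, temporarily reduce runSeed to `return nil` and comment out the Cleanup and GenerateTokensForLoadtest calls, then restore them as each task lands. With those stubs the build succeeds with no output.
- Step 3: Commit the skeleton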
Run:
git add backend/scripts/loadgen/seed/main.go
git commit -m "feat(seed): main.go skeleton with config and DB open"
Task 9: stars.go - test star
Files:
- Create: backend/scripts/loadgen/seed/stars.go
- Step 1: Write SeedStars
Create backend/scripts/loadgen/seed/stars.go:
package main
import (
"database/sql"
"time"
)
func SeedStars(db *sql.DB) error {
ts := time.Now().UnixMilli()
_, err := db.Exec(`
INSERT INTO stars (star_id, name, identity_id, is_active, created_at, updated_at)
VALUES ($1, 'loadtest_star', 'loadtest_star', true, $2, $2)
ON CONFLICT (star_id) DO NOTHING
`, LoadtestStarID, ts)
return err
}
- Step 2: Call it from main.go
Extend runSeed in main.go:
func runSeed(db *sql.DB, cfg *Config) error {
if err := SeedStars(db); err != nil {
return fmt.Errorf("seed stars: %w", err)
}
log.Println("✓ stars seeded")
return nil
}
- Step 3: Verify it compiles
Run:
cd backend && go build ./scripts/loadgen/seed/
Expected: success.
- Step 4: Commit
Run:
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): insert loadtest star (star_id=999900)"
Task 10: users.go - 1000 test users
Files:
- Create: backend/scripts/loadgen/seed/users.go
- Step 1: Generate the bcrypt hash offline
Run:
cd backend && cat > /tmp/gen_bcrypt.go <<'EOF'
package main
import (
"fmt"
"golang.org/x/crypto/bcrypt"
)
func main() {
h, _ := bcrypt.GenerateFromPassword([]byte("Test@123"), 10)
fmt.Println(string(h))
}
EOF
go run /tmp/gen_bcrypt.go > /tmp/bcrypt_hash.txt
cat /tmp/bcrypt_hash.txt
Expected: a single line starting with $2a$10$.
- Step 2: Write SeedUsers
Create backend/scripts/loadgen/seed/users.go:
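package main

import (
	"database/sql"
	_ "embed" // required for //go:embed
	"fmt"
	"strings"
	"time"
)

// loadtestBcryptHash holds the bcrypt hash of the shared test password "Test@123"
// generated in Step 1. The file is committed next to this source file and embedded
// at build time, so the seed binary on prod does not have to find it on disk.
//
//go:embed loadtest_bcrypt.txt
var loadtestBcryptHash string

func SeedUsers(db *sql.DB) error {
	// TrimSpace drops the trailing newline written by fmt.Println in Step 1.
	hash := strings.TrimSpace(loadtestBcryptHash)
	if !strings.HasPrefix(hash, "$2") {
		return fmt.Errorf("loadtest_bcrypt.txt does not hold a bcrypt hash (run Task 10 Step 1 first)")
	}
	ts := time.Now().UnixMilli()
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()
	stmt, err := tx.Prepare(`
		INSERT INTO users (id, mobile, password_hash, is_active, created_at, updated_at)
		VALUES ($1, $2, $3, true, $4, $4)
		ON CONFLICT (id) DO NOTHING
	`)
	if err != nil {
		return err
	}
	defer stmt.Close()
	for uid := LoadtestUserMin; uid <= LoadtestUserMax; uid++ {
		mobile := fmt.Sprintf("199%08d", uid-LoadtestUserMin+1)
		if _, err := stmt.Exec(uid, mobile, hash, ts); err != nil {
			return fmt.Errorf("insert user %d: %w", uid, err)
		}
	}
	return tx.Commit()
}
Note: put the hash from Step 1 into backend/scripts/loadgen/seed/loadtest_bcrypt.txt (a single line) and make sure no .gitignore rule excludes it. It has to be committed because every build reuses it, and it only contains the hash of a public test password.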
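- Step 3: Commit the hash file
cp /tmp/bcrypt_hash.txt backend/scripts/loadgen/seed/loadtest_bcrypt.txt # one line: the hash from Step 1
git add backend/scripts/loadgen/seed/loadtest_bcrypt.txt
git commit -m "chore(seed): store loadtest bcrypt hash for 'Test@123'"
Risk: this hash exposes the test password. The spec accepts this ("test data, limited impact if leaked"). Injecting the hash at build time via -ldflags would avoid keeping the file in the repo.
- Step 4: Call it from main.go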
func runSeed(db *sql.DB, cfg *Config) error {
if err := SeedStars(db); err != nil { return fmt.Errorf("seed stars: %w", err) }
log.Println("✓ stars seeded")
if err := SeedUsers(db); err != nil { return fmt.Errorf("seed users: %w", err) }
log.Println("✓ 1000 users seeded")
return nil
}
- Step 5: Verify it compiles and commit
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): insert 1000 loadtest users (id 30000001-30001000)"
Task 11: profiles.go - fan_profiles + crystal top-up
Files:
- Create: backend/scripts/loadgen/seed/profiles.go
- Step 1: Write SeedProfiles
Create backend/scripts/loadgen/seed/profiles.go:
package main
import (
"database/sql"
"fmt"
"time"
)
func SeedProfiles(db *sql.DB) error {
ts := time.Now().UnixMilli()
tx, err := db.Begin()
if err != nil { return err }
defer tx.Rollback()
stmt, err := tx.Prepare(`
INSERT INTO fan_profiles (user_id, star_id, nickname, crystal_balance, slot_limit, is_active, created_at, updated_at)
VALUES ($1, $2, $3, 2200, 3, true, $4, $4)
ON CONFLICT (user_id, star_id) DO NOTHING
`)
if err != nil { return err }
defer stmt.Close()
for uid := LoadtestUserMin; uid <= LoadtestUserMax; uid++ {
nick := fmt.Sprintf("loadtest_%d", uid-LoadtestUserMin+1)
if _, err := stmt.Exec(uid, LoadtestStarID, nick, ts); err != nil {
return fmt.Errorf("insert profile %d: %w", uid, err)
}
}
return tx.Commit()
}
- Step 2: Call it from main.go and commit
// append to runSeed
if err := SeedProfiles(db); err != nil { return fmt.Errorf("seed profiles: %w", err) }
log.Println("✓ 1000 fan_profiles + crystal seeded")
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/profiles.go backend/scripts/loadgen/seed/main.go
git commit -m "feat(seed): insert fan_profiles with 2200 crystal each"
Task 12: assets.go - 5000 assets
Files:
- Create: backend/scripts/loadgen/seed/assets.go
- Step 1: Write SeedAssets
Create backend/scripts/loadgen/seed/assets.go:
package main
import (
"database/sql"
"fmt"
"time"
)
const (
LoadtestPlaceholderURL = "<OSS_URL>/loadtest-placeholder.png" // TODO: replace after the OSS upload in Task 49
AssetsPerUser = 5
)
func SeedAssets(db *sql.DB) error {
ts := time.Now().UnixMilli()
var maxID int64
if err := db.QueryRow("SELECT COALESCE(MAX(id), 0) FROM assets").Scan(&maxID); err != nil {
return err
}
startID := maxID + 1000
tx, err := db.Begin()
if err != nil { return err }
defer tx.Rollback()
stmt, err := tx.Prepare(`
INSERT INTO assets (id, owner_uid, star_id, name, cover_url, info, status, like_count, is_active, created_at, updated_at, grade)
VALUES ($1, $2, $3, $4, $5, 'loadtest', 1, 0, true, $6, $6, 1)
ON CONFLICT (id) DO NOTHING
`)
if err != nil { return err }
defer stmt.Close()
n := int64(0)
for uid := LoadtestUserMin; uid <= LoadtestUserMax; uid++ {
for i := 1; i <= AssetsPerUser; i++ {
aid := startID + n
name := fmt.Sprintf("loadtest_asset_%d_%d", uid, i)
if _, err := stmt.Exec(aid, uid, LoadtestStarID, name, LoadtestPlaceholderURL, ts); err != nil {
return fmt.Errorf("insert asset %d: %w", aid, err)
}
n++
}
}
return tx.Commit()
}
- Step 2: Call it and commit
// append to runSeed
if err := SeedAssets(db); err != nil { return fmt.Errorf("seed assets: %w", err) }
log.Println("✓ 5000 assets seeded")
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): insert 5000 loadtest assets (5 per user)"
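SeedAssets assigns IDs deterministically, so the inserted range can be computed without another query. The loop counter n equals `(uid - LoadtestUserMin) * AssetsPerUser + (i - 1)`. The sketch below reproduces that arithmetic, using hypothetical helper names and a made-up `maxID`. It could back a verify step that checks all 5000 expected IDs exist.

```go
package main

import "fmt"

const (
	userMin       = int64(30000001) // LoadtestUserMin
	userMax       = int64(30001000) // LoadtestUserMax
	assetsPerUser = int64(5)        // AssetsPerUser
)

// assetID mirrors the SeedAssets loop: n counts inserts in (uid, i) order,
// so n = (uid-userMin)*assetsPerUser + (i-1), and the ID is startID + n.
func assetID(startID, uid, i int64) int64 {
	return startID + (uid-userMin)*assetsPerUser + (i - 1)
}

// assetRange returns the first and last ID SeedAssets will insert.
func assetRange(startID int64) (int64, int64) {
	return assetID(startID, userMin, 1), assetID(startID, userMax, assetsPerUser)
}

func main() {
	maxID := int64(12345) // hypothetical MAX(id) read from the assets table
	first, last := assetRange(maxID + 1000)
	fmt.Println(first, last, last-first+1) // 13345 18344 5000
}
```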
Task 13: slots_and_exhibits.go - booth_slots + exhibitions
Files:
- Create: backend/scripts/loadgen/seed/slots_and_exhibits.go
- Step 1: Write SeedSlotsAndExhibits
Create backend/scripts/loadgen/seed/slots_and_exhibits.go:
package main

import (
	"database/sql"
	"fmt"
	"time"
)

func SeedSlotsAndExhibits(db *sql.DB) error {
	ts := time.Now().UnixMilli()
	expire := ts + 4*3600*1000 // 4 hours
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// 1. booth_slots: 3 per user, is_enabled=true (the column may default to false).
	if _, err := tx.Exec(`
		INSERT INTO booth_slots (host_profile_id, user_id, star_id, slot_index, is_enabled, created_at, updated_at)
		SELECT fp.id, fp.user_id, fp.star_id, s.slot_index, true, $1, $1
		FROM fan_profiles fp
		CROSS JOIN (VALUES (1),(2),(3)) AS s(slot_index)
		WHERE fp.star_id = $2
		ON CONFLICT DO NOTHING
	`, ts, LoadtestStarID); err != nil {
		return fmt.Errorf("insert slots: %w", err)
	}

	// 2. exhibitions: asset k of each user is placed on slot k (k = 1, 2).
	// The exact name 'loadtest_asset_<uid>_<k>' ties each asset to its slot.
	exStmt, err := tx.Prepare(`
		INSERT INTO exhibitions (asset_id, slot_id, host_profile_id, occupier_uid, occupier_star_id, start_time, expire_at, created_at, updated_at)
		SELECT a.id, bs.slot_id, fp.id, a.owner_uid, $1, $2, $3, $2, $2
		FROM assets a
		JOIN fan_profiles fp ON a.owner_uid = fp.user_id AND fp.star_id = $1
		JOIN booth_slots bs ON bs.host_profile_id = fp.id AND bs.slot_index = $4
		WHERE a.star_id = $1
		  AND a.name = 'loadtest_asset_' || a.owner_uid || '_' || bs.slot_index
		ON CONFLICT DO NOTHING
	`)
	if err != nil {
		return err
	}
	defer exStmt.Close()
	for k := 1; k <= 2; k++ { // asset 1 → slot 1, asset 2 → slot 2
		if _, err := exStmt.Exec(LoadtestStarID, ts, expire, k); err != nil {
			return fmt.Errorf("insert exhibits slot%d: %w", k, err)
		}
	}
	return tx.Commit()
}
⚠️ Operator-precedence pitfall: filtering with `... AND a.name LIKE 'loadtest_asset_%' AND a.name LIKE '%_1' OR a.name LIKE '%_2'` parses as `(... AND ...) OR a.name LIKE '%_2'`, because AND binds tighter than OR, so rows outside star 999900 match. Parenthesizing the OR fixes that, but it would still put both assets on both slots, and `_` is a single-character wildcard in LIKE. Matching the exact per-slot name, as above, avoids all three problems.
- Step 2: Call it and commit
// append to runSeed
if err := SeedSlotsAndExhibits(db); err != nil { return fmt.Errorf("seed slots+exhibits: %w", err) }
log.Println("✓ 3000 booth_slots + 2000 exhibitions seeded")
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): insert booth_slots (3/user) + exhibitions (2/user)"
Task 14: friendships.go - bidirectional friendships
Files:
- Create: backend/scripts/loadgen/seed/friendships.go
- Step 1: Write SeedFriendships
Create backend/scripts/loadgen/seed/friendships.go:
package main
import (
"database/sql"
"time"
)
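func SeedFriendships(db *sql.DB) error {
	ts := time.Now().UnixMilli()
	n := LoadtestUserMax - LoadtestUserMin + 1 // 1000 users arranged on a ring
	// Ring rule: a and b are friends when their ring distance is 1..5 in either
	// direction. The rule is symmetric, so every friendship is inserted both ways:
	// 1000 users x 10 friends = 10,000 rows.
	_, err := db.Exec(`
		INSERT INTO friendships (user_id, friend_id, star_id, status, intimacy, created_at, updated_at)
		SELECT a.id, b.id, $1, 'accepted', 0, $2, $2
		FROM users a, users b
		WHERE a.id BETWEEN $3 AND $4
		  AND b.id BETWEEN $3 AND $4
		  AND (((b.id - a.id) % $5 + $5) % $5 BETWEEN 1 AND 5
		    OR ((b.id - a.id) % $5 + $5) % $5 BETWEEN $5 - 5 AND $5 - 1)
		ON CONFLICT (user_id, friend_id, star_id) DO NOTHING
	`, LoadtestStarID, ts, LoadtestUserMin, LoadtestUserMax, n)
	return err
}
Note: a pair at ring distance 0 (a = b) is excluded automatically. The earlier modular rule `((a.id - $3) + 1) % 10 = (b.id - $3) % 10` matched 100 partners per user in one direction only (about 100,000 rows, none of them mutual), not 10,000 bidirectional rows.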
- Step 2: Call it and commit
// append to runSeed
if err := SeedFriendships(db); err != nil { return fmt.Errorf("seed friendships: %w", err) }
log.Println("✓ 10000 friendships seeded")
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): insert 10000 loadtest friendships (bidirectional, ~10/user)"
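A pairing rule is cheap to check in memory before it runs against prod. The sketch below enumerates a ring rule, under which users are friends when their index distance around a ring of 1000 is 1 to 5 in either direction, and confirms the row count and symmetry. `ringPairs` and `isSymmetric` are hypothetical helpers, and you can swap in whatever predicate the SQL actually uses.

```go
package main

import "fmt"

type pair struct{ a, b int }

// ringPairs returns every directed (a, b) pair whose forward distance around a
// ring of n users is within k in either direction (i.e. 1..k or n-k..n-1).
func ringPairs(n, k int) []pair {
	var out []pair
	for a := 0; a < n; a++ {
		for b := 0; b < n; b++ {
			if a == b {
				continue
			}
			d := ((b-a)%n + n) % n // forward distance from a to b
			if d <= k || d >= n-k {
				out = append(out, pair{a, b})
			}
		}
	}
	return out
}

// isSymmetric reports whether every (a, b) has a matching (b, a).
func isSymmetric(ps []pair) bool {
	set := make(map[pair]bool, len(ps))
	for _, p := range ps {
		set[p] = true
	}
	for _, p := range ps {
		if !set[pair{p.b, p.a}] {
			return false
		}
	}
	return true
}

func main() {
	ps := ringPairs(1000, 5)
	fmt.Println(len(ps), isSymmetric(ps)) // 10000 true
}
```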
Task 15: tokens.go - pre-signed JWTs
Files:
- Create: backend/scripts/loadgen/seed/tokens.go
- Step 1: Write GenerateTokensForLoadtest
Create backend/scripts/loadgen/seed/tokens.go:
package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/lib/pq"
	"github.com/topfans/backend/pkg/jwt"
)

type TestUser struct {
	UserID        int64
	Mobile        string
	AssetIDs      []int64
	ExhibitionIDs []int64
}

func GenerateTokensForLoadtest(cfg *Config) error {
	jwt.SetSecret(cfg.JWTSecret)

	// 1. Read each user's asset_ids / exhibition_ids from PG.
	db, err := openDB(cfg)
	if err != nil {
		return err
	}
	defer db.Close()
	rows, err := db.Query(`
		SELECT u.id, u.mobile,
		       COALESCE(array_agg(DISTINCT a.id) FILTER (WHERE a.id IS NOT NULL), '{}'),
		       COALESCE(array_agg(DISTINCT e.id) FILTER (WHERE e.id IS NOT NULL), '{}')
		FROM users u
		LEFT JOIN assets a ON a.owner_uid = u.id AND a.star_id = $1
		LEFT JOIN fan_profiles fp ON fp.user_id = u.id AND fp.star_id = $1
		LEFT JOIN booth_slots bs ON bs.host_profile_id = fp.id AND bs.slot_index IN (1, 2)
		LEFT JOIN exhibitions e ON e.slot_id = bs.slot_id AND e.occupier_star_id = $1
		WHERE u.id BETWEEN $2 AND $3
		GROUP BY u.id, u.mobile
		ORDER BY u.id
	`, LoadtestStarID, LoadtestUserMin, LoadtestUserMax)
	if err != nil {
		return err
	}
	defer rows.Close()
	var users []TestUser
	for rows.Next() {
		var u TestUser
		// lib/pq cannot scan a PG array into []int64 directly; pq.Array adapts it.
		if err := rows.Scan(&u.UserID, &u.Mobile, pq.Array(&u.AssetIDs), pq.Array(&u.ExhibitionIDs)); err != nil {
			return err
		}
		users = append(users, u)
	}
	if err := rows.Err(); err != nil {
		return err
	}

	// 2. Sign tokens and write users.csv (it contains live tokens: keep it out of git).
	f, err := os.Create("users.csv")
	if err != nil {
		return err
	}
	defer f.Close()
	w := csv.NewWriter(f)
	if err := w.Write([]string{"phone", "password", "user_id", "star_id", "jwt_token", "asset_ids", "exhibition_ids"}); err != nil {
		return err
	}
	now := time.Now().UnixMilli()
	starID := strconv.FormatInt(LoadtestStarID, 10)
	for _, u := range users {
		token, err := jwt.GenerateToken(u.UserID, LoadtestStarID, now)
		if err != nil {
			return err
		}
		if err := w.Write([]string{
			u.Mobile, "Test@123",
			strconv.FormatInt(u.UserID, 10),
			starID, token,
			joinInt64(u.AssetIDs),
			joinInt64(u.ExhibitionIDs),
		}); err != nil {
			return err
		}
	}
	// Flush explicitly and check: a deferred Flush would drop write errors silently.
	w.Flush()
	if err := w.Error(); err != nil {
		return fmt.Errorf("write users.csv: %w", err)
	}
	fmt.Printf("✅ users.csv written: %d rows\n", len(users))
	return nil
}

func joinInt64(s []int64) string {
	parts := make([]string, len(s))
	for i, v := range s {
		parts[i] = strconv.FormatInt(v, 10)
	}
	return strings.Join(parts, ";")
}
- Step 2: Call it and commit
// main.go: add the --reset-tokens path (re-sign only, data untouched)
func main() {
cfg := parseFlags()
if cfg.ResetTok {
if err := GenerateTokensForLoadtest(cfg); err != nil {
log.Fatalf("generate tokens: %v", err)
}
return
}
// ... existing seed flow
if err := runSeed(db, cfg); err != nil { log.Fatalf("seed failed: %v", err) }
if err := GenerateTokensForLoadtest(cfg); err != nil { log.Fatalf("generate tokens: %v", err) }
log.Println("✅ seed + tokens completed")
}
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): pre-sign JWT for 1000 users, write users.csv"
Task 16: sequences.go - sequence reset (CLAUDE.md convention)
Files:
- Create: backend/scripts/loadgen/seed/sequences.go
- Step 1: Write ResetSequences
Create backend/scripts/loadgen/seed/sequences.go:
package main
import (
"database/sql"
"fmt"
)
var loadtestTables = []string{
"users",
"fan_profiles",
"assets",
"booth_slots",
"exhibitions",
"stars",
"asset_likes",
"friendships",
"crystal_transaction_records",
"mint_orders", // UUID primary key, no integer sequence (dropped in Step 2)
}
func ResetSequences(db *sql.DB) error {
for _, tbl := range loadtestTables {
seq := fmt.Sprintf("%s_id_seq", tbl)
var maxID sql.NullInt64
if err := db.QueryRow(fmt.Sprintf("SELECT MAX(id) FROM %s", tbl)).Scan(&maxID); err != nil {
// the table may not exist (some are optional): log it and continue
fmt.Printf(" skip %s: %v\n", tbl, err)
continue
}
if !maxID.Valid { continue }
if _, err := db.Exec(fmt.Sprintf("SELECT setval('%s', $1)", seq), maxID.Int64); err != nil {
return fmt.Errorf("setval %s: %w", seq, err)
}
fmt.Printf(" ✓ %s → %d\n", seq, maxID.Int64)
}
return nil
}
⚠️ booth_slots' primary key is slot_id, not id, so its sequence is booth_slots_slot_id_seq (stars likewise uses star_id). Check the other tables the same way, for example the primary-key name of asset_likes.
- Step 2: Fix the table-to-sequence mapping
Replace loadtestTables and ResetSequences with:
var loadtestSeqs = map[string]string{
"users": "users_id_seq",
"fan_profiles": "fan_profiles_id_seq",
"assets": "assets_id_seq",
"booth_slots": "booth_slots_slot_id_seq", // PK is slot_id
"exhibitions": "exhibitions_id_seq",
"stars": "stars_star_id_seq", // PK is star_id (not id!)
"asset_likes": "asset_likes_id_seq",
"friendships": "friendships_id_seq",
"crystal_transaction_records": "crystal_transaction_records_id_seq",
// mint_orders has a UUID primary key, no setval needed
}
// pkColumns maps each table to its primary-key column (stars uses star_id, not id)
var pkColumns = map[string]string{
"users": "id",
"fan_profiles": "id",
"assets": "id",
"booth_slots": "slot_id",
"exhibitions": "id",
"stars": "star_id", // key difference
"asset_likes": "id",
"friendships": "id",
"crystal_transaction_records": "id",
}
func ResetSequences(db *sql.DB) error {
for tbl, seq := range loadtestSeqs {
pk := pkColumns[tbl]
if pk == "" { pk = "id" }
var maxID sql.NullInt64
if err := db.QueryRow(fmt.Sprintf("SELECT MAX(%s) FROM %s", pk, tbl)).Scan(&maxID); err != nil {
fmt.Printf(" skip %s: %v\n", tbl, err)
continue
}
if !maxID.Valid { continue }
if _, err := db.Exec(fmt.Sprintf("SELECT setval('%s', $1)", seq), maxID.Int64); err != nil {
return fmt.Errorf("setval %s: %w", seq, err)
}
fmt.Printf(" ✓ %s → %d\n", seq, maxID.Int64)
}
return nil
}
- Step 3: Call it and commit
// append at the end of runSeed
if err := ResetSequences(db); err != nil { return fmt.Errorf("reset sequences: %w", err) }
log.Println("✓ sequences reset")
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): reset PG sequences (CLAUDE.md compliance)"
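The hard-coded map breaks as soon as a sequence name departs from the `<table>_<pk>_seq` convention. PostgreSQL's `pg_get_serial_sequence(table, column)` looks up the sequence owned by a column, so a single statement per table can do the lookup, the MAX and the setval together. The sketch below uses a hypothetical `setvalSQL` helper. Because the table and column names are interpolated, they must come from a fixed allow-list such as pkColumns.

```go
package main

import (
	"fmt"
	"sort"
)

// setvalSQL builds one statement that finds the sequence owned by table.pk and
// moves it to MAX(pk). HAVING (allowed without GROUP BY) yields no row for an
// empty table, so setval is never called with NULL. table and pk are
// interpolated: only pass names from a fixed allow-list.
func setvalSQL(table, pk string) string {
	return fmt.Sprintf(
		"SELECT setval(pg_get_serial_sequence('%s', '%s'), MAX(%s)) FROM %s HAVING MAX(%s) IS NOT NULL",
		table, pk, pk, table, pk)
}

func main() {
	pkColumns := map[string]string{"users": "id", "stars": "star_id", "booth_slots": "slot_id"}
	tables := make([]string, 0, len(pkColumns))
	for t := range pkColumns {
		tables = append(tables, t)
	}
	sort.Strings(tables)
	for _, t := range tables {
		fmt.Println(setvalSQL(t, pkColumns[t])) // each line would be passed to db.Exec
	}
}
```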
Task 17: cleanup.go - data cleanup
Files:
- Create: backend/scripts/loadgen/seed/cleanup.go
- Step 1: Write Cleanup
Create backend/scripts/loadgen/seed/cleanup.go:
package main
import (
"database/sql"
"errors"
"fmt"
)
func Cleanup(db *sql.DB, starID int64, full bool) error {
	if starID != LoadtestStarID {
		return errors.New("safety: cleanup only accepts loadtest star_id 999900")
	}
	// Each statement carries exactly the arguments it references: passing extra
	// arguments (or leaving $1 unused) makes lib/pq / PostgreSQL reject the call.
	// Dependent rows are deleted before the rows they reference.
	type stmt struct {
		query string
		args  []any
	}
	byStar := []any{starID}
	stmts := []stmt{
		{"DELETE FROM asset_likes WHERE star_id = $1", byStar},
		{"DELETE FROM exhibitions USING fan_profiles fp WHERE exhibitions.host_profile_id = fp.id AND fp.star_id = $1", byStar},
		{"DELETE FROM booth_slots WHERE star_id = $1", byStar},
		{"DELETE FROM mint_orders WHERE star_id = $1", byStar},
		{"DELETE FROM crystal_transaction_records WHERE star_id = $1", byStar},
		{"DELETE FROM friendships WHERE star_id = $1", byStar},
		{"DELETE FROM assets WHERE star_id = $1", byStar},
		{"DELETE FROM fan_profiles WHERE star_id = $1", byStar},
	}
	if full {
		stmts = append(stmts,
			stmt{"DELETE FROM users WHERE id BETWEEN $1 AND $2", []any{LoadtestUserMin, LoadtestUserMax}},
			stmt{"DELETE FROM stars WHERE star_id = $1", byStar},
		)
	}
	for _, s := range stmts {
		if _, err := db.Exec(s.query, s.args...); err != nil {
			return fmt.Errorf("cleanup %q: %w", s.query, err)
		}
	}
	return ResetSequences(db)
}
- Step 2: Add the cleanup path
Modify main.go so the cleanup flags are registered in parseFlags (Config.Cleanup, CleanupFull, CleanupStarID) instead of being parsed separately in main:
// parseFlags registers -cleanup, -full and -cleanup-star-id alongside the DB flags
// and calls flag.Parse once; main only reads the parsed Config.
func main() {
	cfg := parseFlags()
	if cfg.Cleanup {
		db, err := openDB(cfg)
		if err != nil {
			log.Fatalf("open db: %v", err)
		}
		defer db.Close()
		if err := Cleanup(db, cfg.CleanupStarID, cfg.CleanupFull); err != nil {
			log.Fatalf("cleanup: %v", err)
		}
		log.Println("✅ cleanup done")
		return
	}
	// ... existing seed flow
}
- Step 3: Verify it compiles and commit
cd backend && go build ./scripts/loadgen/seed/
git add backend/scripts/loadgen/seed/
git commit -m "feat(seed): cleanup (keeps users/stars by default) / --full + safety check"
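A statement and its argument list drift apart easily. lib/pq rejects a call whose argument count differs from the statement's, and PostgreSQL typically fails with "could not determine data type of parameter $1" when a placeholder in range is never used. The sketch below adds a cheap guard, using a hypothetical `checkArgs` helper that a unit test could run over every cleanup statement. It is regex-based, so a literal `$1` inside a quoted string would also be counted.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

var placeholderRe = regexp.MustCompile(`\$(\d+)`)

// checkArgs verifies that query references exactly $1..$nargs, each at least once.
func checkArgs(query string, nargs int) error {
	used := map[int]bool{}
	for _, m := range placeholderRe.FindAllStringSubmatch(query, -1) {
		if n, err := strconv.Atoi(m[1]); err == nil {
			used[n] = true
		}
	}
	for i := 1; i <= nargs; i++ {
		if !used[i] {
			return fmt.Errorf("$%d is passed but never referenced", i)
		}
	}
	if len(used) != nargs {
		return fmt.Errorf("query references %d placeholders, %d args passed", len(used), nargs)
	}
	return nil
}

func main() {
	checks := []struct {
		query string
		nargs int
	}{
		{"DELETE FROM assets WHERE star_id = $1", 3},             // extra args: rejected
		{"DELETE FROM users WHERE id BETWEEN $2 AND $3", 3},      // $1 unused: rejected
		{"DELETE FROM users WHERE id BETWEEN $1 AND $2", 2},      // ok
	}
	for _, c := range checks {
		fmt.Printf("%-48s nargs=%d err=%v\n", c.query, c.nargs, checkArgs(c.query, c.nargs))
	}
}
```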
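Task 18: seed unit tests
Files:
- Create: backend/scripts/loadgen/seed/seed_test.go
- Step 1: Write the mobile-numbering test
package main
// imports cover Steps 1-3: fmt for formatMobile, database/sql for the cleanup test
import (
	"database/sql"
	"fmt"
	"testing"
)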
func TestMobileNumbering(t *testing.T) {
cases := []struct {
uid int64
want string
}{
{30000001, "19900000001"},
{30000050, "19900000050"},
{30001000, "19900001000"},
}
for _, c := range cases {
got := formatMobile(c.uid)
if got != c.want {
t.Errorf("formatMobile(%d) = %q, want %q", c.uid, got, c.want)
}
}
}
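// formatMobile must stay identical to the Sprintf in SeedUsers; moving it into
// users.go and calling it there would make this test cover the real code.
func formatMobile(uid int64) string {
	return fmt.Sprintf("199%08d", uid-LoadtestUserMin+1)
}
- Step 2: Write the sequence-mapping test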
func TestSequenceMapping(t *testing.T) {
cases := map[string]string{
"users": "users_id_seq",
"stars": "stars_star_id_seq",
"booth_slots": "booth_slots_slot_id_seq",
}
for tbl, want := range cases {
got, ok := loadtestSeqs[tbl]
if !ok {
t.Errorf("table %s not in loadtestSeqs", tbl)
continue
}
if got != want {
t.Errorf("seq for %s = %q, want %q", tbl, got, want)
}
}
}
- Step 3: Write the cleanup safety test
func TestCleanupRejectsInvalidStarID(t *testing.T) {
db, _ := sql.Open("postgres", "host=localhost sslmode=disable")
defer db.Close()
err := Cleanup(db, 87, false) // a real production star_id
if err == nil {
t.Fatal("expected error for non-loadtest star_id, got nil")
}
}
- Step 4: Run the tests
cd backend && go test ./scripts/loadgen/seed/ -v
Expected: 3 tests pass.
- Step 5: Commit
git add backend/scripts/loadgen/seed/seed_test.go
git commit -m "test(seed): unit tests for mobile format, seq mapping, cleanup safety"
Task 19: seed README + deployment script
Files:
- Create: backend/scripts/loadgen/seed/README.md
- Step 1: Write the README
# seed - load-test data preparation tool
## Purpose
Runs on the prod host: inserts 1000 test users, 5000 assets, 3000 booth_slots, 2000 exhibitions and 10000 friendships, signs 1000 JWTs and writes `users.csv`.
## Build
```bash
cd backend && go build -o seed ./scripts/loadgen/seed/
```
## Run on prod
# 1. Upload the binary
scp seed root@101.132.250.62:/opt/topfans/loadtest/
# 2. SSH in and run it
ssh root@101.132.250.62
cd /opt/topfans/loadtest
# cut -f2- keeps values that themselves contain '=' (e.g. base64 secrets)
export DB_PASSWORD=$(grep '^DB_PASSWORD=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
export JWT_SECRET=$(grep '^JWT_SECRET=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
./seed --db-name=topfans --jwt-secret="$JWT_SECRET"
## Cleanup
# Keep the 1000 users and the test star for reuse (profiles, assets, slots, exhibitions, likes, friendships, orders and crystal records are deleted)
./seed --cleanup
# Delete everything
./seed --cleanup --full
# Re-sign tokens only (e.g. when the JWTs have expired before round 2)
./seed --reset-tokens --jwt-secret="$JWT_SECRET"
- Step 2: Commit
git add backend/scripts/loadgen/seed/README.md
git commit -m "docs(seed): README with build/deploy/cleanup usage"
Phase 5: loadgen library development (Day 1 evening)
Task 20: loadgen main.go skeleton
Files:
- Create: backend/scripts/loadgen/loadgen/main.go
- Step 1: Write the main.go skeleton
package main
import (
"flag"
"log"
)
func main() {
var (
scenarios = flag.String("scenarios", "", "comma-separated, e.g. S1,S2,S3")
stage = flag.String("stage", "step", "baseline|step|soak|stress")
rps = flag.Int("rps", 0, "single-RPS mode (overrides stage)")
vus = flag.Int("vus", 0, "max concurrent virtual users (default: auto)")
duration = flag.Duration("duration", 0, "single stage duration (default per §5.3)")
interPause = flag.Duration("inter-scenario-pause", 15*60*1_000_000_000, "pause between scenarios")
monitor = flag.String("monitor", "lite", "off|lite|full")
prodSSH = flag.String("prod-ssh", "", "user@host for ssh metrics")
target = flag.String("target", "http://101.132.250.62:8080", "target gateway URL")
cmd = flag.String("cmd", "run", "run|preflight|verify|report")
inputDir = flag.String("input", "", "for cmd=report: input dir")
outputFile = flag.String("output", "./report.md", "for cmd=report: output file")
)
flag.Parse()
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Printf("loadgen starting: cmd=%s scenarios=%s", *cmd, *scenarios)
switch *cmd {
case "run":
if err := runLoadgen(*target, *scenarios, *stage, *rps, *vus, *duration, *interPause, *monitor, *prodSSH); err != nil {
log.Fatalf("run failed: %v", err)
}
case "preflight":
if err := runPreflight(*target, *prodSSH); err != nil { log.Fatalf("preflight: %v", err) }
case "verify":
if err := runVerify(*prodSSH); err != nil { log.Fatalf("verify: %v", err) }
case "report":
if err := runReport(*inputDir, *outputFile); err != nil { log.Fatalf("report: %v", err) }
default:
log.Fatalf("unknown cmd: %s", *cmd)
}
}
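⚠️ Note: 15*60*1_000_000_000 is 15 minutes expressed in nanoseconds. It compiles, but 15*time.Minute states the same value readably. time.Minute has been in the standard library since Go 1.0, so no Go 1.21+ requirement applies.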
- Step 2: Fix the duration literal
import "time"
interPause = flag.Duration("inter-scenario-pause", 15*time.Minute, "pause between scenarios")
- Step 3: Add placeholder runLoadgen and the other subcommand stubs
func runLoadgen(target, scenarios, stage string, rps, vus int, duration, interPause time.Duration, monitor, prodSSH string) error {
// TODO: filled in by a later task (Task 28)
return nil
}
func runPreflight(target, prodSSH string) error { return nil }
func runVerify(prodSSH string) error { return nil }
func runReport(input, output string) error { return nil }
- Step 4: Verify the build and commit
cd backend && go build ./scripts/loadgen/loadgen/
git add backend/scripts/loadgen/loadgen/
git commit -m "feat(loadgen): main.go skeleton with run/preflight/verify/report subcommands"
Task 21: lib/csv.go - load users.csv
Files:
- Create: backend/scripts/loadgen/loadgen/lib/csv.go
- Create: backend/scripts/loadgen/loadgen/lib/csv_test.go
- Step 1: Write the loader
package lib
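import (
"encoding/csv"
"fmt"
"io"
"os"
"strconv"
"strings"
)
// TestUser is one row of users.csv as written by seed.
type TestUser struct {
Phone string
Password string
UserID int64
StarID int64
JWTToken string
AssetIDs []int64
ExhibitionIDs []int64
}
// LoadUsers reads users.csv. Expected columns, in order:
// phone,password,user_id,star_id,jwt_token,asset_ids,exhibition_ids
// asset_ids and exhibition_ids are ';'-separated lists.
// A malformed row is an error (not skipped) so seed bugs surface before the run.
func LoadUsers(path string) ([]TestUser, error) {
f, err := os.Open(path)
if err != nil { return nil, err }
defer f.Close()
r := csv.NewReader(f)
r.FieldsPerRecord = -1 // column count is checked below, with the line number
header, err := r.Read()
if err != nil { return nil, fmt.Errorf("%s: read header: %w", path, err) }
if len(header) < 7 || header[0] != "phone" {
return nil, fmt.Errorf("%s: unexpected header %v", path, header)
}
var users []TestUser
for {
row, err := r.Read()
if err == io.EOF { break }
if err != nil { return nil, err }
line, _ := r.FieldPos(0)
if len(row) < 7 {
return nil, fmt.Errorf("%s:%d: want 7 columns, got %d", path, line, len(row))
}
uid, err := strconv.ParseInt(row[2], 10, 64)
if err != nil { return nil, fmt.Errorf("%s:%d: user_id: %w", path, line, err) }
sid, err := strconv.ParseInt(row[3], 10, 64)
if err != nil { return nil, fmt.Errorf("%s:%d: star_id: %w", path, line, err) }
users = append(users, TestUser{
Phone: row[0],
Password: row[1],
UserID: uid,
StarID: sid,
JWTToken: row[4],
AssetIDs: parseIDs(row[5]),
ExhibitionIDs: parseIDs(row[6]),
})
}
return users, nil
}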
func parseIDs(s string) []int64 {
if s == "" { return nil }
parts := strings.Split(s, ";")
out := make([]int64, 0, len(parts))
for _, p := range parts {
v, err := strconv.ParseInt(p, 10, 64)
if err != nil { continue }
out = append(out, v)
}
return out
}
- Step 2: Write the test
package lib
import (
"os"
"path/filepath"
"testing"
)
func TestLoadUsers(t *testing.T) {
dir := t.TempDir()
p := filepath.Join(dir, "users.csv")
content := `phone,password,user_id,star_id,jwt_token,asset_ids,exhibition_ids
19900000001,Test@123,30000001,999900,token1,1000;1001;1002,5000;5001
19900000002,Test@123,30000002,999900,token2,1003,5002
`
if err := os.WriteFile(p, []byte(content), 0644); err != nil { t.Fatal(err) }
users, err := LoadUsers(p)
if err != nil { t.Fatal(err) }
if len(users) != 2 { t.Fatalf("want 2 users, got %d", len(users)) }
u := users[0]
if u.UserID != 30000001 { t.Errorf("uid=%d", u.UserID) }
if len(u.AssetIDs) != 3 { t.Errorf("assets=%v", u.AssetIDs) }
if u.AssetIDs[0] != 1000 { t.Errorf("first asset=%d", u.AssetIDs[0]) }
}
- Step 3: Run the test
cd backend && go test ./scripts/loadgen/loadgen/lib/ -v -run TestLoadUsers
Expected: PASS.
- Step 4: Commit
git add backend/scripts/loadgen/loadgen/lib/
git commit -m "feat(loadgen): load users.csv into memory with parseIDs helper"
Task 22: lib/client.go - http.Transport wrapper
Files:
- Create: backend/scripts/loadgen/loadgen/lib/client.go
- Step 1: Write the client
package lib
import (
"net"
"net/http"
"time"
)
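// NewHTTPClient returns a keep-alive client tuned for a single target host.
// MaxConnsPerHost=500 caps concurrent connections to the gateway; connections beyond
// MaxIdleConnsPerHost=200 are closed after use instead of being pooled.
// The 15s overall timeout stays below the 30s ceiling of the latency histogram (lib/hdr.go).
// target is not used yet: the scenarios build absolute URLs themselves.
func NewHTTPClient(target string) *http.Client {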
dialer := &net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}
transport := &http.Transport{
DialContext: dialer.DialContext,
MaxConnsPerHost: 500,
MaxIdleConns: 500,
MaxIdleConnsPerHost: 200,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
}
return &http.Client{
Transport: transport,
Timeout: 15 * time.Second,
}
}
- Step 2: Build and commit
cd backend && go build ./scripts/loadgen/loadgen/lib/
git add backend/scripts/loadgen/loadgen/lib/client.go
git commit -m "feat(loadgen): HTTP transport with MaxConnsPerHost=500, keep-alive"
Task 23: lib/hdr.go - HDR histogram wrapper
Files:
- Create: backend/scripts/loadgen/loadgen/lib/hdr.go
- Create: backend/scripts/loadgen/loadgen/lib/hdr_test.go
- Step 1: Write LatencyRecorder
package lib
import (
"sync"
"github.com/HdrHistogram/hdrhistogram-go"
)
type LatencyRecorder struct {
mu sync.Mutex
h *hdrhistogram.Histogram
}
func NewLatencyRecorder() *LatencyRecorder {
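// Values are microseconds: 1µs to 30s, 3 significant digits (~0.1% relative error).
return &LatencyRecorder{
h: hdrhistogram.New(1, 30_000_000, 3),
}
}
// Record adds one latency sample in microseconds. Values above 30s are rejected by
// RecordValue and dropped; the HTTP client's 15s timeout keeps samples below that.
func (r *LatencyRecorder) Record(latencyUs int64) {
r.mu.Lock()
defer r.mu.Unlock()
if latencyUs < 1 { latencyUs = 1 }
_ = r.h.RecordValue(latencyUs)
}
// Snapshot returns an independent copy (via Export/Import) that can be queried without the lock.
func (r *LatencyRecorder) Snapshot() *hdrhistogram.Histogram {
r.mu.Lock()
defer r.mu.Unlock()
return hdrhistogram.Import(r.h.Export())
}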
- Step 2: Write the test
package lib
import "testing"
func TestLatencyRecorder(t *testing.T) {
r := NewLatencyRecorder()
for i := 0; i < 100; i++ {
r.Record(int64(10_000 + i*100)) // 10ms to 19.9ms, in µs
}
snap := r.Snapshot()
p50 := snap.ValueAtQuantile(50.0)
if p50 < 10_000 || p50 > 20_000 {
t.Errorf("p50 out of range: %d", p50)
}
}
- Step 3: Run + commit
cd backend && go test ./scripts/loadgen/loadgen/lib/ -v -run TestLatencyRecorder
git add backend/scripts/loadgen/loadgen/lib/
git commit -m "feat(loadgen): HDR histogram wrapper for latency recording"
Task 24: lib/log.go - stderr line dashboard
Files:
- Create: backend/scripts/loadgen/loadgen/lib/log.go
- Step 1: Write the stderr dashboard (Dashboard)
package lib
import (
"fmt"
"io"
"os"
"time"
)
type Dashboard struct {
scenario string
out io.Writer
}
func NewDashboard(scenario string) *Dashboard {
return &Dashboard{scenario: scenario, out: os.Stderr}
}
// PerSecondLine prints one status line; p50/p95/p99 are passed in µs and printed in ms.
func (d *Dashboard) PerSecondLine(t time.Time, targetRPS, actualRPS float64, p50, p95, p99 int64, errRate float64, vu int) {
fmt.Fprintf(d.out, "[%s] %-12s | target=%5.0f actual=%6.1f | p50=%4d p95=%4d p99=%5d | err=%.1f%% | vu=%d\n",
t.Format("15:04:05"), d.scenario, targetRPS, actualRPS,
p50/1000, p95/1000, p99/1000, errRate*100, vu)
}
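// PerMinuteSummary prints a framed summary; h holds latencies in µs.
func (d *Dashboard) PerMinuteSummary(elapsed time.Duration, totalReqs int64, errs int64, h interface{ ValueAtQuantile(float64) int64 }, status2xx, status4xx, status5xx int64) {
if totalReqs == 0 || elapsed <= 0 {
// avoid NaN/+Inf percentages for an empty window
fmt.Fprintf(d.out, " [%s] no requests in this window\n", d.scenario)
return
}
p50 := h.ValueAtQuantile(50) / 1000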
p95 := h.ValueAtQuantile(95) / 1000
p99 := h.ValueAtQuantile(99) / 1000
fmt.Fprintf(d.out, "═══════════════════════════════════════════════════════════════\n")
fmt.Fprintf(d.out, " Requests: %d (%.1f/s avg)\n", totalReqs, float64(totalReqs)/elapsed.Seconds())
fmt.Fprintf(d.out, " Errors: %d (%.1f%%)\n", errs, float64(errs)/float64(totalReqs)*100)
fmt.Fprintf(d.out, " Latency p50: %dms p95: %dms p99: %dms\n", p50, p95, p99)
fmt.Fprintf(d.out, " Status: 2xx=%.1f%% 4xx=%.1f%% 5xx=%.1f%%\n",
float64(status2xx)/float64(totalReqs)*100,
float64(status4xx)/float64(totalReqs)*100,
float64(status5xx)/float64(totalReqs)*100)
fmt.Fprintf(d.out, "═══════════════════════════════════════════════════════════════\n")
}
- Step 2: Verify the build and commit
cd backend && go build ./scripts/loadgen/loadgen/lib/
git add backend/scripts/loadgen/loadgen/lib/log.go
git commit -m "feat(loadgen): stderr per-second + per-minute dashboard lines"
Task 25: lib/ramp.go - stepped RPS scheduler
Files:
- Create: backend/scripts/loadgen/loadgen/lib/ramp.go
- Create: backend/scripts/loadgen/loadgen/lib/ramp_test.go
- Step 1: Write StageScheduler
package lib
import "time"
type Stage struct {
RPS int
Duration time.Duration
}
type StageScheduler struct {
stages []Stage
idx int
start time.Time
}
func NewStageScheduler(stages []Stage) *StageScheduler {
return &StageScheduler{stages: stages, start: time.Now()}
}
// CurrentRPS returns the target RPS for the current stage, or 0 if all stages done.
func (s *StageScheduler) CurrentRPS() int {
if s.idx >= len(s.stages) { return 0 }
return s.stages[s.idx].RPS
}
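// Advance checks if the current stage has elapsed and moves to the next.
// It reports whether a stage is still active. It runs no timer of its own: it must be polled,
// and each new stage is timed from the call that advanced to it. Not safe for concurrent use.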
func (s *StageScheduler) Advance() bool {
if s.idx >= len(s.stages) { return false }
elapsed := time.Since(s.start)
if elapsed >= s.stages[s.idx].Duration {
s.idx++
s.start = time.Now()
return s.idx < len(s.stages)
}
return true
}
func (s *StageScheduler) StageIndex() int { return s.idx }
func (s *StageScheduler) StageCount() int { return len(s.stages) }
- Step 2: Write the test
package lib
import (
"testing"
"time"
)
func TestStageScheduler(t *testing.T) {
s := NewStageScheduler([]Stage{
{RPS: 10, Duration: 50 * time.Millisecond},
{RPS: 20, Duration: 50 * time.Millisecond},
})
if got := s.CurrentRPS(); got != 10 { t.Errorf("first rps=%d", got) }
time.Sleep(60 * time.Millisecond)
if !s.Advance() { t.Fatal("should still have more stages") }
if got := s.CurrentRPS(); got != 20 { t.Errorf("second rps=%d", got) }
time.Sleep(60 * time.Millisecond)
if s.Advance() { t.Fatal("should be done") }
if got := s.CurrentRPS(); got != 0 { t.Errorf("after end rps=%d", got) }
}
- Step 3: Run + commit
cd backend && go test ./scripts/loadgen/loadgen/lib/ -v -run TestStageScheduler
git add backend/scripts/loadgen/loadgen/lib/
git commit -m "feat(loadgen): stage scheduler for RPS ramp-up"
Task 26: lib/circuit.go - 6-dimension red-line circuit breaker
Files:
- Create: backend/scripts/loadgen/loadgen/lib/circuit.go
- Create: backend/scripts/loadgen/loadgen/lib/circuit_test.go
Key point (spec §7.3): the six red lines are R1 error rate, R2 P99 and R3 5xx rate on the client side, plus R4 PG connections, R5 free disk and R6 OOM on the server side. R1-R3 come from loadgen's own metrics; R4-R6 come from metrics-feed.jsonl, which sample.sh writes on prod. Both sides must be checked, so CircuitBreaker.Check takes the client-side ClientMetrics and the server-side ServerMetrics together.
- Step 1: Write CircuitBreaker
package lib
import (
"sync"
"time"
)
type CircuitState int
const (
CircuitOK CircuitState = iota
CircuitTripped
)
type CircuitBreaker struct {
mu sync.Mutex
state CircuitState
// R1/R2/R3: when each breach started (zero = not breached)
errRateStart time.Time
p99Start time.Time
fiveXXStart time.Time
// R4/R5: when each breach started; R6: whether an OOM has been seen
pgConnStart time.Time
diskStart time.Time
oomSeen bool
// Thresholds (see spec §7.3)
ErrRate float64 // R1: 0.05
P99ThresholdMs int64 // R2: 3000
FiveXXRate float64 // R3: 0.10
PGConnMax int // R4: 42 (85% of max_connections=50)
DiskMinGB int // R5: 5
SustainTime time.Duration // R1/R2/R4/R5: 30s
Sustain5xx time.Duration // R3: 10s
}
func NewCircuitBreaker() *CircuitBreaker {
return &CircuitBreaker{
ErrRate: 0.05,
P99ThresholdMs: 3000,
FiveXXRate: 0.10,
PGConnMax: 42, // 85% of 50: spec §2.1 lowers max_connections from 100 to 50, so the §7.3 R4 threshold scales down with it
DiskMinGB: 5,
SustainTime: 30 * time.Second,
Sustain5xx: 10 * time.Second,
}
}
// ClientMetrics comes from loadgen itself (R1/R2/R3)
type ClientMetrics struct {
ErrorRate float64
P99Ms int64
FiveXXRate float64
}
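// ServerMetrics is parsed from metrics-feed.jsonl (R4/R5/R6).
// The zero value has DiskGB=0, which breaches R5; always pass a real (or neutral) sample.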
type ServerMetrics struct {
PGActive int
DiskGB int
OOMEvent bool
}
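// Check evaluates client + server metrics together: any red line breached continuously for its
// sustain window (R6: immediately) trips the breaker, and once tripped it stays tripped.
// Each call resets the timers of red lines that are not breached, so both sides must carry their
// latest values in the same call (passing ClientMetrics{} or ServerMetrics{} distorts the timers).
func (cb *CircuitBreaker) Check(client ClientMetrics, server ServerMetrics, now time.Time) bool {
cb.mu.Lock()
defer cb.mu.Unlock()
if cb.state == CircuitTripped { return true }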
// R1: error rate
if client.ErrorRate > cb.ErrRate {
if cb.errRateStart.IsZero() { cb.errRateStart = now }
if now.Sub(cb.errRateStart) >= cb.SustainTime {
cb.state = CircuitTripped
return true
}
} else { cb.errRateStart = time.Time{} }
// R2: P99
if client.P99Ms > cb.P99ThresholdMs {
if cb.p99Start.IsZero() { cb.p99Start = now }
if now.Sub(cb.p99Start) >= cb.SustainTime {
cb.state = CircuitTripped
return true
}
} else { cb.p99Start = time.Time{} }
// R3: 5xx
if client.FiveXXRate > cb.FiveXXRate {
if cb.fiveXXStart.IsZero() { cb.fiveXXStart = now }
if now.Sub(cb.fiveXXStart) >= cb.Sustain5xx {
cb.state = CircuitTripped
return true
}
} else { cb.fiveXXStart = time.Time{} }
// R4: active PG connections (threshold 42 = 85% of max_connections=50)
if server.PGActive > cb.PGConnMax {
if cb.pgConnStart.IsZero() { cb.pgConnStart = now }
if now.Sub(cb.pgConnStart) >= cb.SustainTime {
cb.state = CircuitTripped
return true
}
} else { cb.pgConnStart = time.Time{} }
// R5: free disk space (GB)
if server.DiskGB < cb.DiskMinGB {
if cb.diskStart.IsZero() { cb.diskStart = now }
if now.Sub(cb.diskStart) >= cb.SustainTime {
cb.state = CircuitTripped
return true
}
} else { cb.diskStart = time.Time{} }
// R6: OOM event (trips immediately, no sustain window)
if server.OOMEvent || cb.oomSeen {
cb.oomSeen = true
cb.state = CircuitTripped
return true
}
return false
}
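// State reports the current state; it takes the lock because Check may run concurrently.
func (cb *CircuitBreaker) State() CircuitState {
cb.mu.Lock()
defer cb.mu.Unlock()
return cb.state
}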
- Step 2: Write the tests
package lib
import (
"testing"
"time"
)
// healthy satisfies R4/R5/R6, so each test exercises only the red line it names
// (a zero ServerMetrics has DiskGB=0 and would trip R5 on its own).
var healthy = ServerMetrics{PGActive: 5, DiskGB: 50}
func TestCircuitBreaker_R1(t *testing.T) {
cb := NewCircuitBreaker()
now := time.Now()
if cb.Check(ClientMetrics{ErrorRate: 0.06}, healthy, now) {
t.Error("R1 should not trip on first check")
}
if !cb.Check(ClientMetrics{ErrorRate: 0.06}, healthy, now.Add(31*time.Second)) {
t.Error("R1 should trip after 30s sustain")
}
}
func TestCircuitBreaker_R2(t *testing.T) {
cb := NewCircuitBreaker()
now := time.Now()
// The first breach only starts the sustain timer; a single call can never trip R1-R5.
if cb.Check(ClientMetrics{P99Ms: 4000}, healthy, now) {
t.Error("R2 should not trip on first check")
}
if !cb.Check(ClientMetrics{P99Ms: 4000}, healthy, now.Add(31*time.Second)) {
t.Error("R2 P99>3000 sustained 30s should trip")
}
}
func TestCircuitBreaker_R4_PGConn(t *testing.T) {
cb := NewCircuitBreaker()
now := time.Now()
busy := ServerMetrics{PGActive: 50, DiskGB: 50}
cb.Check(ClientMetrics{}, busy, now)
if !cb.Check(ClientMetrics{}, busy, now.Add(31*time.Second)) {
t.Error("R4 PG active > 42 sustained should trip")
}
}
func TestCircuitBreaker_R5_Disk(t *testing.T) {
cb := NewCircuitBreaker()
now := time.Now()
lowDisk := ServerMetrics{PGActive: 5, DiskGB: 3}
cb.Check(ClientMetrics{}, lowDisk, now)
if !cb.Check(ClientMetrics{}, lowDisk, now.Add(31*time.Second)) {
t.Error("R5 disk < 5GB sustained should trip")
}
}
func TestCircuitBreaker_R6_OOM_Instant(t *testing.T) {
cb := NewCircuitBreaker()
oom := ServerMetrics{PGActive: 5, DiskGB: 50, OOMEvent: true}
if !cb.Check(ClientMetrics{}, oom, time.Now()) {
t.Error("R6 OOM should trip instantly without sustain")
}
}
func TestCircuitBreaker_Recovers(t *testing.T) {
cb := NewCircuitBreaker()
now := time.Now()
cb.Check(ClientMetrics{ErrorRate: 0.06}, healthy, now)
if cb.Check(ClientMetrics{ErrorRate: 0.01}, healthy, now.Add(10*time.Second)) {
t.Error("should not trip when error drops")
}
// Recovery resets the timer: a new breach has to last the full 30s again.
if cb.Check(ClientMetrics{ErrorRate: 0.06}, healthy, now.Add(41*time.Second)) {
t.Error("sustain timer should restart after recovery")
}
if cb.State() != CircuitOK { t.Error("should remain OK") }
}
- Step 3: Run + commit
cd backend && go test ./scripts/loadgen/loadgen/lib/ -v -run TestCircuit
git add backend/scripts/loadgen/loadgen/lib/
git commit -m "feat(loadgen): 6-dim circuit breaker (R1/R2/R3 client + R4/R5/R6 server)"
Task 27: lib/ssh_metrics.go - SSH remote monitoring + parsing into ServerMetrics
Files:
- Create: backend/scripts/loadgen/loadgen/lib/ssh_metrics.go
- Create: backend/scripts/loadgen/loadgen/lib/ssh_metrics_test.go
- Step 1: Write TailMetricsFeed + parsing into ServerMetrics
package lib
import (
"bufio"
"os/exec"
"strconv"
"strings"
)
type MetricsLine struct {
Timestamp string
Fields map[string]string
}
// TailMetricsFeed runs `ssh <sshTarget> tail -F <path>` and emits parsed lines.
// Despite the .jsonl name, each line is "<ts> key=value key=value ..." (see parseMetricsLine).
// The channel closes when ssh exits. The ssh process is not tied to a context, so nothing
// stops it explicitly when the run ends.
func TailMetricsFeed(sshTarget, path string) (<-chan MetricsLine, error) {
out := make(chan MetricsLine, 100)
cmd := exec.Command("ssh", sshTarget, "tail -F "+path)
stdout, err := cmd.StdoutPipe()
if err != nil { return nil, err }
if err := cmd.Start(); err != nil { return nil, err }
go func() {
defer close(out)
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
ml := parseMetricsLine(scanner.Text())
select {
case out <- ml:
default: // buffer full: drop the sample (an oom=true line could be lost this way)
}
}
_ = cmd.Wait() // reap ssh once its stdout is exhausted
}()
return out, nil
}
func parseMetricsLine(s string) MetricsLine {
parts := strings.SplitN(s, " ", 2)
ml := MetricsLine{Fields: make(map[string]string)}
if len(parts) >= 1 { ml.Timestamp = parts[0] }
if len(parts) == 2 {
for _, kv := range strings.Fields(parts[1]) {
eq := strings.SplitN(kv, "=", 2)
if len(eq) == 2 { ml.Fields[eq[0]] = eq[1] }
}
}
return ml
}
// ToServerMetrics extracts the R4/R5/R6 fields from one metrics-feed line.
// Expected format: "<ts> pg_active=42 disk_free=12 topfans-postgres=12.3% topfans-gateway=5.1% ..."
// OOM events are flagged by a separate oom=true field (sample.sh injects it from docker events).
// A missing or unparsable field stays 0; note that disk_free=0 breaches R5.
func (ml MetricsLine) ToServerMetrics() ServerMetrics {
sm := ServerMetrics{}
if v, ok := ml.Fields["pg_active"]; ok {
sm.PGActive, _ = strconv.Atoi(v)
}
if v, ok := ml.Fields["disk_free"]; ok {
// accept "12" or "12.4" (GB) and truncate to whole GB
if f, err := strconv.ParseFloat(v, 64); err == nil { sm.DiskGB = int(f) }
}
if v, ok := ml.Fields["oom"]; ok && v == "true" {
sm.OOMEvent = true
}
return sm
}
- Step 2: Write the tests
package lib
import "testing"
func TestParseMetricsLine(t *testing.T) {
ml := parseMetricsLine("1700000000 pg_active=42 disk_free=12 oom=false")
if ml.Timestamp != "1700000000" { t.Errorf("ts=%q", ml.Timestamp) }
if ml.Fields["pg_active"] != "42" { t.Errorf("pg=%q", ml.Fields["pg_active"]) }
if ml.Fields["disk_free"] != "12" { t.Errorf("disk=%q", ml.Fields["disk_free"]) }
}
func TestToServerMetrics(t *testing.T) {
ml := parseMetricsLine("1700000000 pg_active=50 disk_free=3 oom=true")
sm := ml.ToServerMetrics()
if sm.PGActive != 50 { t.Errorf("pg=%d", sm.PGActive) }
if sm.DiskGB != 3 { t.Errorf("disk=%d", sm.DiskGB) }
if !sm.OOMEvent { t.Error("oom should be true") }
}
- Step 3: Run the tests, build + commit
cd backend && go test ./scripts/loadgen/loadgen/lib/ -v -run 'TestParseMetricsLine|TestToServerMetrics' && go build ./scripts/loadgen/loadgen/lib/
git add backend/scripts/loadgen/loadgen/lib/
git commit -m "feat(loadgen): ssh tail metrics-feed + parse to ServerMetrics for R4/R5/R6"
Task 28: loadgen runLoadgen main loop
Files:
- Modify: backend/scripts/loadgen/loadgen/main.go
- Step 1: Implement the runLoadgen framework (wiring lib + scenarios)
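import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
"github.com/topfans/backend/scripts/loadgen/loadgen/reporter"
"github.com/topfans/backend/scripts/loadgen/loadgen/scenarios"
)
func runLoadgen(target, scenarioIDs, stage string, rps, vus int, duration, interPause time.Duration, monitorMode, prodSSH, stepSchedule string) error {
users, err := lib.LoadUsers("users.csv")
if err != nil { return fmt.Errorf("load users.csv: %w", err) }
client := lib.NewHTTPClient(target)
recorder := lib.NewLatencyRecorder()
breaker := lib.NewCircuitBreaker()
dashboard := lib.NewDashboard(scenarioIDs)
var errCount, totalCount, fiveXXCount atomic.Int64
// R7 (loadgen self-latency drift, spec §8.1) is not wired up yet.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Latest server-side sample for R4/R5/R6; nil until metrics-feed delivers one.
var latestServer atomic.Pointer[lib.ServerMetrics]
if monitorMode != "off" && prodSSH != "" {
feed, err := lib.TailMetricsFeed(prodSSH, "/opt/topfans/loadtest/metrics-feed.jsonl")
if err != nil { return fmt.Errorf("tail metrics: %w", err) }
go consumeServerMetrics(ctx, feed, &latestServer)
}
// One loop evaluates all six red lines from both sides and cancels ctx on a trip.
go watchBreaker(ctx, cancel, breaker, &latestServer, &errCount, &totalCount, &fiveXXCount, recorder)
// SIGINT/SIGTERM: cancel ctx so the scenarios return and the report is still written.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
log.Println("signal received, shutting down gracefully...")
cancel()
}()
// Parse the step schedule (e.g. "2,5,10,15,25,40"); stages stays nil for the other modes.
var stages []int
if stage == "step" {
if stepSchedule == "" {
return fmt.Errorf("--step-schedule required for --stage=step (e.g. --step-schedule='2,5,10,15,25,40')")
}
for _, p := range strings.Split(stepSchedule, ",") {
v, err := strconv.Atoi(strings.TrimSpace(p))
if err != nil || v <= 0 {
return fmt.Errorf("--step-schedule invalid entry %q in %q", p, stepSchedule)
}
stages = append(stages, v)
}
}
// Run the scenarios in order, pausing between them (not after the last one).
ids := strings.Split(scenarioIDs, ",")
for i, id := range ids {
s, err := scenarios.Get(strings.TrimSpace(id), client, users, &errCount, &totalCount, &fiveXXCount, recorder)
if err != nil { return err }
if err := s.Run(ctx, rps, duration, dashboard, breaker, stages); err != nil {
return err
}
if ctx.Err() != nil { break } // tripped or interrupted: skip the remaining scenarios
if i < len(ids)-1 {
select {
case <-ctx.Done():
case <-time.After(interPause):
}
}
}
// Write the report even when the run was cut short.
return reporter.WriteJSON("report.json", recorder.Snapshot(), totalCount.Load(), errCount.Load(), fiveXXCount.Load())
}
// consumeServerMetrics keeps the most recent metrics-feed sample for watchBreaker.
func consumeServerMetrics(ctx context.Context, feed <-chan lib.MetricsLine, latest *atomic.Pointer[lib.ServerMetrics]) {
for {
select {
case <-ctx.Done():
return
case ml, ok := <-feed:
if !ok {
log.Println("⚠️ metrics feed closed (ssh exited); R4/R5/R6 keep using the last sample")
return
}
sm := ml.ToServerMetrics()
latest.Store(&sm)
}
}
}
// watchBreaker feeds CircuitBreaker.Check once per second with both sides in the same call:
// error and 5xx rates over the last 1s window, P99 from the cumulative histogram (so it reacts slowly).
func watchBreaker(ctx context.Context, cancel context.CancelFunc, breaker *lib.CircuitBreaker, latest *atomic.Pointer[lib.ServerMetrics], errCount, totalCount, fiveXXCount *atomic.Int64, rec *lib.LatencyRecorder) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var lastTotal, lastErr, last5xx int64
for {
select {
case <-ctx.Done():
return
case now := <-ticker.C:
total, errs, fives := totalCount.Load(), errCount.Load(), fiveXXCount.Load()
var cm lib.ClientMetrics
if d := total - lastTotal; d > 0 {
cm.ErrorRate = float64(errs-lastErr) / float64(d)
cm.FiveXXRate = float64(fives-last5xx) / float64(d)
}
cm.P99Ms = rec.Snapshot().ValueAtQuantile(99) / 1000 // µs → ms
lastTotal, lastErr, last5xx = total, errs, fives
// Without monitoring data, use a neutral sample that passes R4/R5/R6
// (a zero ServerMetrics has DiskGB=0 and would trip R5).
sm := lib.ServerMetrics{DiskGB: breaker.DiskMinGB}
if p := latest.Load(); p != nil { sm = *p }
if breaker.Check(cm, sm, now) {
log.Printf("🛑 circuit breaker tripped: client=%+v server=%+v", cm, sm)
cancel()
return
}
}
}
}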
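- Step 2: Add the --step-schedule flag to main.go
Add to the flag definitions in main.go:
stepSchedule = flag.String("step-schedule", "", "comma-separated RPS list for --stage=step, e.g. '2,5,10,15,25,40' for S1")
Then pass *stepSchedule as the last argument of runLoadgen, and delete the placeholder runLoadgen from Task 20 Step 3 (otherwise the function is declared twice).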
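- Step 3: Build + commit (the build may fail while scenarios is still missing; placeholders are fine for now)
cd backend && go build ./scripts/loadgen/loadgen/ 2>&1 | head -20
Expected: the build fails because the scenarios package / scenarios.Get does not exist yet; continue with Task 30.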
git add backend/scripts/loadgen/loadgen/main.go
git commit -m "feat(loadgen): runLoadgen main loop with 6-dim circuit breaker + step schedule flag"
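The step-schedule parsing inside runLoadgen is easier to unit-test as a separate function. Here is a sketch (parseStepSchedule is a hypothetical name). Note that fmt.Sscanf("10x", "%d", &v) succeeds with v=10, and an empty entry leaves v at 0, so a Sscanf-based loop silently accepts or drops typos that this version rejects.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseStepSchedule turns "2,5,10,15,25,40" into []int{2, 5, 10, 15, 25, 40}.
// Every entry must be a positive integer; surrounding spaces are allowed.
func parseStepSchedule(s string) ([]int, error) {
	if strings.TrimSpace(s) == "" {
		return nil, fmt.Errorf("empty step schedule")
	}
	parts := strings.Split(s, ",")
	stages := make([]int, 0, len(parts))
	for _, p := range parts {
		v, err := strconv.Atoi(strings.TrimSpace(p))
		if err != nil || v <= 0 {
			return nil, fmt.Errorf("invalid step %q in %q", p, s)
		}
		stages = append(stages, v)
	}
	return stages, nil
}
```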
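Task 29: run all lib tests together
Files:
- Modify: none (tests only)
- Step 1: Run all lib tests
cd backend && go test ./scripts/loadgen/loadgen/lib/ -v
Expected: all PASS.
- Step 2: Commit (only if anything changed)
git status
# if anything changed, commit it
Phase 6: loadgen scenario development (Day 1 evening to the early hours of the next day)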
Task 30: scenario registry
Files:
- Create: backend/scripts/loadgen/loadgen/scenarios/scenarios.go
- Step 1: Write the Scenario interface + registry
package scenarios
import (
"context"
"fmt"
"net/http"
"sync/atomic"
"time"
"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
)
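// Scenario is one load pattern (S1-S7). stages is the parsed --step-schedule (nil unless --stage=step).
type Scenario interface {
Run(ctx context.Context, rpsOverride int, durationOverride time.Duration, dash *lib.Dashboard, breaker *lib.CircuitBreaker, stages []int) error
}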
var registry = map[string]func(client *http.Client, users []lib.TestUser, errCount, totalCount, fiveXXCount *atomic.Int64, rec *lib.LatencyRecorder) Scenario{}
// Get retrieves a scenario by ID (e.g. "S1").
func Get(id string, client *http.Client, users []lib.TestUser, errCount, totalCount, fiveXXCount *atomic.Int64, rec *lib.LatencyRecorder) (Scenario, error) {
factory, ok := registry[id]
if !ok { return nil, fmt.Errorf("unknown scenario: %s", id) }
return factory(client, users, errCount, totalCount, fiveXXCount, rec), nil
}
func register(id string, factory func(*http.Client, []lib.TestUser, *atomic.Int64, *atomic.Int64, *atomic.Int64, *lib.LatencyRecorder) Scenario) {
registry[id] = factory
}
- Step 2: Build and commit
cd backend && go build ./scripts/loadgen/loadgen/scenarios/
git add backend/scripts/loadgen/loadgen/scenarios/scenarios.go
git commit -m "feat(loadgen): scenario interface + registry"
Task 31: S1 login
Files:
- Create: backend/scripts/loadgen/loadgen/scenarios/s1_login.go
- Step 1: Write S1
package scenarios
import (
"bytes"
"context"
"encoding/json"
"math/rand"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
)
type s1Login struct {
client *http.Client
users []lib.TestUser
errCount *atomic.Int64
totalCount *atomic.Int64
fiveXXCount *atomic.Int64
rec *lib.LatencyRecorder
}
func init() { register("S1", newS1) }
func newS1(c *http.Client, u []lib.TestUser, e, t, f *atomic.Int64, r *lib.LatencyRecorder) Scenario {
return &s1Login{client: c, users: u, errCount: e, totalCount: t, fiveXXCount: f, rec: r}
}
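// Run fires login requests at a fixed rate until the duration elapses or ctx is cancelled.
// Each tick dispatches its request in a goroutine (open-loop load), so a slow response does not
// delay the next one; maxInFlight caps concurrent requests.
func (s *s1Login) Run(ctx context.Context, rpsOverride int, durationOverride time.Duration, dash *lib.Dashboard, breaker *lib.CircuitBreaker, stages []int) error {
targetRPS := rpsOverride
if targetRPS == 0 && len(stages) > 0 { targetRPS = stages[0] } // step mode: first stage only, for now
if targetRPS == 0 { targetRPS = 15 } // default: the knee point
duration := durationOverride
if duration == 0 { duration = 2 * time.Minute }
const maxInFlight = 200 // illustrative cap; could come from --vus later
sem := make(chan struct{}, maxInFlight)
var wg sync.WaitGroup
defer wg.Wait() // let in-flight requests finish before returning
ticker := time.NewTicker(time.Second / time.Duration(targetRPS))
defer ticker.Stop()
timeout := time.NewTimer(duration)
defer timeout.Stop()
for {
select {
case <-ctx.Done(): return nil
case <-timeout.C: return nil
case <-ticker.C:
select {
case sem <- struct{}{}:
default:
continue // loadgen saturated: skip this tick rather than fall behind
}
u := s.users[rand.Intn(len(s.users))]
wg.Add(1)
go func() {
defer wg.Done()
defer func() { <-sem }()
s.doLogin(u)
}()
}
}
}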
func (s *s1Login) doLogin(u lib.TestUser) {
body, _ := json.Marshal(map[string]string{"mobile": u.Phone, "password": u.Password})
req, _ := http.NewRequest("POST", "/api/v1/auth/login", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
s.fireRequest(req)
}
func (s *s1Login) fireRequest(req *http.Request) {
start := time.Now()
resp, err := s.client.Do(req)
latency := time.Since(start)
s.rec.Record(latency.Microseconds())
s.totalCount.Add(1)
if err != nil {
s.errCount.Add(1)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 500 {
s.fiveXXCount.Add(1)
s.errCount.Add(1)
} else if resp.StatusCode >= 400 {
s.errCount.Add(1)
}
}
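The real request needs a baseURL prefix. As written, the path is relative, so client.Do fails with an "unsupported protocol scheme" error. Step 3 prepends a baseURL constant, and a later task handles the target URL centrally.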
- Step 2: Extract fireRequest into a shared file
Create backend/scripts/loadgen/loadgen/scenarios/common.go:
package scenarios
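import (
"io"
"net/http"
"sync/atomic"
"time"
"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
)
// doRequest sends req, records its latency (time to response headers), and updates the shared
// counters: transport errors and 4xx count as errors; 5xx count both as errors and as 5xx.
func doRequest(client *http.Client, req *http.Request, rec *lib.LatencyRecorder, errCount, totalCount, fiveXXCount *atomic.Int64) {
start := time.Now()
resp, err := client.Do(req)
latency := time.Since(start)
rec.Record(latency.Microseconds())
totalCount.Add(1)
if err != nil { errCount.Add(1); return }
// Read the body to EOF before closing so the keep-alive connection can be reused.
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()
switch {
case resp.StatusCode >= 500:
fiveXXCount.Add(1)
errCount.Add(1)
case resp.StatusCode >= 400:
errCount.Add(1)
}
}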
- Step 3: Refactor S1 onto the shared helper (delete s1Login.fireRequest) + commit
func (s *s1Login) doLogin(u lib.TestUser) {
body, _ := json.Marshal(map[string]string{"mobile": u.Phone, "password": u.Password})
req, _ := http.NewRequest("POST", baseURL+"/api/v1/auth/login", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
}
const baseURL = "http://101.132.250.62:8080" // TODO: from --target
cd backend && go build ./scripts/loadgen/loadgen/scenarios/
git add backend/scripts/loadgen/loadgen/scenarios/
git commit -m "feat(scenario): S1 login + common doRequest helper"
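Task 32-36: S2-S7 scenarios (templated batch)
S2-S7 are written almost exactly like S1. This plan only gives skeletons: copy S1 when implementing; each scenario is about 80 lines of Go. Important: before implementing, read spec §5.1 + Appendix B (endpoint path quick reference), and cross-check the project source at the key locations below (use grep to confirm the latest signatures):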
| Scenario | Handler path reference | Must check |
|---|---|---|
| S2 | backend/services/assetService/provider/asset_mobile_provider.go | GET /assets/me/items query parameters; GET /assets/:id path |
| S3 | backend/services/socialService/provider/social_mobile_provider.go | where the like endpoint takes exhibition_id from (query or body) |
| S4 | backend/services/assetService/provider/asset_mint_provider.go | full body schema of precreate / mints (material_id vs material_url) |
| S5 | backend/services/statisticService/provider/dashboard_*.go | actual paths of the 7 dashboard endpoints (may carry a version prefix) |
| S6 | backend/services/assetService/provider/asset_ranking_provider.go | dimension enum values (confirmed: displaying/month/total); whether star_id is required |
| S7 | backend/services/galleryService/provider/gallery_mobile_provider.go | where the place endpoint takes slot_id from; DELETE path format |
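Files (created in one batch):
- backend/scripts/loadgen/loadgen/scenarios/s2_read.go
- backend/scripts/loadgen/loadgen/scenarios/s3_like.go
- backend/scripts/loadgen/loadgen/scenarios/s4_mint.go
- backend/scripts/loadgen/loadgen/scenarios/s5_dashboard.go
- backend/scripts/loadgen/loadgen/scenarios/s6_ranking.go
- backend/scripts/loadgen/loadgen/scenarios/s7_place.go
Common requirements (every scenario):
- Endpoint paths and body/query follow spec §5.1 + Appendix B strictly; grep the handler source to confirm before implementing
- Data dependencies are picked at random from users[i].AssetIDs / ExhibitionIDs
- S4 calls resetMintData() after every stage/round, which triggers mint_reset.sh over ssh
- S5: one "user session" = 7 serial /dashboard/* requests (baseURL × 7); RPS counts sessions
- S6: each request loop cycles through 24 parameter combinations (2×3×4)
- S7 uses slot_index=3 as the slot reserved for load testing
- Default RPS / duration follow the table in spec §5.3
- Every scenario calls register() in its init()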
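S6 is meant to cycle through 24 parameter combinations (2×3×4). Only the dimension values (displaying/month/total) are confirmed in the handler table. The other two axes used in the usage example (scope, page) are placeholders until asset_ranking_provider.go has been checked. Here is a sketch that builds every combination as an encoded query string. combos is a hypothetical helper.

```go
package main

import (
	"net/url"
	"sort"
)

// combos returns every combination of the given query parameters (a Cartesian product)
// as encoded query strings. Keys are processed in sorted order so the output is deterministic.
func combos(axes map[string][]string) []string {
	keys := make([]string, 0, len(axes))
	for k := range axes {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := []url.Values{{}}
	for _, k := range keys {
		var next []url.Values
		for _, base := range out {
			for _, v := range axes[k] {
				q := url.Values{}
				for bk, bv := range base {
					q[bk] = bv
				}
				q.Set(k, v)
				next = append(next, q)
			}
		}
		out = next
	}
	qs := make([]string, len(out))
	for i, q := range out {
		qs[i] = q.Encode() // Encode also sorts by key
	}
	return qs
}
```

Usage, with placeholder axes: `combos(map[string][]string{"dimension": {"displaying", "month", "total"}, "scope": {"star", "global"}, "page": {"1", "2", "3", "4"}})`. The scenario can then walk the resulting slice round-robin, one query string per request.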
- Step 1: Write S2 (asset reads)
Step 1a: before implementing, grep the handler to confirm the paths:
grep -rn "GetMyAssets\|/assets/me/items\|GetAssetByID" backend/services/assetService/provider/ | head
Step 1b: write S2:
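// s2_read.go (excerpt). s2Read has the same fields as s1Login; pickRPS / pickDuration
// are small helpers to add to common.go.
func (s *s2Read) Run(ctx context.Context, rpsOverride int, durationOverride time.Duration, dash *lib.Dashboard, breaker *lib.CircuitBreaker, stages []int) error {
targetRPS := pickRPS(rpsOverride, stages, 250) // step mode: first entry of stages
duration := pickDuration(durationOverride, 2*time.Minute)
ticker := time.NewTicker(time.Second / time.Duration(targetRPS))
defer ticker.Stop()
timeout := time.NewTimer(duration)
defer timeout.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-timeout.C:
return nil
case <-ticker.C:
u := s.users[rand.Intn(len(s.users))]
// 50%: list, 50%: detail (list only if the user owns no assets, since rand.Intn(0) panics)
url := baseURL + "/api/v1/assets/me/items?page=1&page_size=20"
if rand.Float64() >= 0.5 && len(u.AssetIDs) > 0 {
url = fmt.Sprintf("%s/api/v1/assets/%d", baseURL, u.AssetIDs[rand.Intn(len(u.AssetIDs))])
}
req, err := http.NewRequest("GET", url, nil)
if err != nil { continue }
req.Header.Set("Authorization", "Bearer "+u.JWTToken)
// Synchronous for brevity: at 250 RPS any latency above 4ms lowers the real rate,
// so an open-loop run should dispatch through a bounded goroutine pool instead.
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
}
}
}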
- Step 2: Write S3 likes (including exhibition_id handling)
Step 2a: grep to confirm how exhibition_id is passed:
grep -rn "exhibition_id\|ExhibitionID" backend/services/socialService/provider/ | grep -i like
Step 2b: write S3 (using the exhibition_ids recorded during seeding):
// s3_like.go (excerpt)
func (s *s3Like) doLike(u lib.TestUser) {
// Each user has 2 exhibitions: their asset #1 and #2 are on display.
// Assumption: seed writes ExhibitionIDs[i] for AssetIDs[i], so one index picks a matching
// pair (the original picked the asset independently); verify against seed before relying on it.
if len(u.ExhibitionIDs) == 0 {
return // no exhibitions for this user: skip
}
i := rand.Intn(len(u.ExhibitionIDs))
if i >= len(u.AssetIDs) {
return
}
exID, assetID := u.ExhibitionIDs[i], u.AssetIDs[i]
url := fmt.Sprintf("%s/api/v1/social/assets/%d/like", baseURL, assetID)
// 50% like, 50% unlike (alternating keeps like_count from growing monotonically).
// Caveat: unliking a never-liked asset (or liking twice) may return 4xx, which doRequest
// counts as errors; confirm how the handler responds.
var req *http.Request
if rand.Float64() < 0.5 {
body := fmt.Sprintf(`{"exhibition_id":%d}`, exID)
req, _ = http.NewRequest("POST", url, strings.NewReader(body))
req.Header.Set("Content-Type", "application/json")
} else {
req, _ = http.NewRequest("DELETE", url, nil)
}
req.Header.Set("Authorization", "Bearer "+u.JWTToken)
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
}
⚠️ If grep shows that exhibition_id is passed as a query parameter rather than in the body, move it from the body into the query string.
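Picking like or unlike at random means some unlikes hit assets the user never liked, and some likes repeat. The handler may answer those with 4xx, and doRequest would count them as errors, pushing R1 toward its threshold. One option, sketched here under the assumption that only this loadgen process changes these likes, is to remember per-(user, asset) state and alternate. likeToggler is a hypothetical type.

```go
package main

import "sync"

type likeKey struct{ userID, assetID int64 }

// likeToggler remembers which (user, asset) pairs this process has liked, so that the
// next action for a pair is always the opposite one. Safe for concurrent use.
type likeToggler struct {
	mu    sync.Mutex
	liked map[likeKey]bool
}

func newLikeToggler() *likeToggler { return &likeToggler{liked: map[likeKey]bool{}} }

// next flips the stored state and returns true for "send POST like",
// false for "send DELETE like". Every pair starts as "not liked".
func (t *likeToggler) next(userID, assetID int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	k := likeKey{userID, assetID}
	t.liked[k] = !t.liked[k]
	return t.liked[k]
}
```

The "not liked" starting state only holds if the seeded data has no likes from these users. If a previous round was interrupted, clean up its likes first.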
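- Step 3: Write S4 minting (including reset)
⚠️ Cross-package reference fix: LoadtestPlaceholderURL lives in the seed package (package main), and Go does not allow the scenarios package (package scenarios) to import a main package. Fix: put the constant in loadgen/lib/config.go (new file). Seed and scenarios both read it from the environment, with the default value defined once on each side. (seed could also import lib directly, since lib is not a main package.)
Step 3a: create backend/scripts/loadgen/loadgen/lib/config.go: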
package lib
import "os"
const DefaultLoadtestPlaceholderURL = "<OSS_URL>/loadtest-placeholder.png"
// LoadtestPlaceholderURL prefers $LOADTEST_PLACEHOLDER_URL and falls back to the default.
func LoadtestPlaceholderURL() string {
if v := os.Getenv("LOADTEST_PLACEHOLDER_URL"); v != "" { return v }
return DefaultLoadtestPlaceholderURL
}
// Load-test identifiers; keep in sync with seed's constants.
const (
LoadtestStarID = int64(999900)
LoadtestUserMin = int64(30000001)
LoadtestUserMax = int64(30001000)
)
Step 3b: seed/assets.go 改为:
// 把原 const LoadtestPlaceholderURL 改为:
func loadtestPlaceholderURL() string {
if v := os.Getenv("LOADTEST_PLACEHOLDER_URL"); v != "" { return v }
return "<OSS_URL>/loadtest-placeholder.png" // 与 lib.DefaultLoadtestPlaceholderURL 同步
}
Step 3c: 写 s4_mint.go:
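import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os/exec"
"time"
"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
)
type s4Mint struct {
// ... common fields
prodSSH string
roundIdx int
}
// Run: the remaining parameters of the Task 20 scenario interface are omitted here.
func (s *s4Mint) Run(ctx context.Context, stages []int) {
for stageIdx, stageRPS := range stages {
if ctx.Err() != nil {
log.Printf("S4 cancelled before stage %d: %v", stageIdx, ctx.Err())
return
}
log.Printf("S4 stage %d: %d RPS × %v", stageIdx, stageRPS, 2*time.Minute)
runStage(ctx, stageRPS, 2*time.Minute, s)
log.Printf("S4 stage %d done, resetting...", stageIdx)
// Reset via ssh after every stage, otherwise the 10k mint quota runs out.
// CommandContext kills ssh if the run is cancelled (emergency stop / circuit breaker).
cmd := exec.CommandContext(ctx, "ssh", s.prodSSH, "bash /opt/topfans/loadtest/scripts/mint_reset.sh")
if out, err := cmd.CombinedOutput(); err != nil {
// Without a reset the next stage would only measure quota-exhausted errors, so stop.
log.Printf("⚠️ mint reset failed, stopping S4: %v\n%s", err, out)
return
}
s.roundIdx++
}
}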
func (s *s4Mint) doMint(u lib.TestUser) {
// 1. POST /api/v1/assets/mints/precreate {material_url, name, info, ...}
precreateBody, _ := json.Marshal(map[string]any{
"material_url": lib.LoadtestPlaceholderURL(),
"name": fmt.Sprintf("loadtest_mint_%d_round%d_%d", u.UserID, s.roundIdx, time.Now().UnixNano()),
"info": "loadtest",
"material_id": 0, // 如有需要 grep 确认
})
req, _ := http.NewRequest("POST", baseURL+"/api/v1/assets/mints/precreate", bytes.NewReader(precreateBody))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+u.JWTToken)
// ... doRequest, 解析返回的 order_id
// 2. POST /api/v1/assets/mints {order_id}
}
Check before implementing: grep `precreate` in the assetService handler to confirm the request body schema, in particular which of `material_url` and `material_id` is required.
- Step 4: 写 S5 看板
endpoints := []string{
"/api/v1/dashboard/today-overview",
"/api/v1/dashboard/income-curve",
"/api/v1/dashboard/exhibition-summary",
"/api/v1/dashboard/like-income-by-level",
"/api/v1/dashboard/top-assets",
"/api/v1/dashboard/level-distribution",
"/api/v1/dashboard/upgrade-progress",
}
func (s *s5Dashboard) doSession(u lib.TestUser) {
sessionStart := time.Now()
for _, ep := range endpoints {
req, _ := http.NewRequest("GET", baseURL+ep, nil)
req.Header.Set("Authorization", "Bearer "+u.JWTToken)
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
}
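// Session-level P95 goes into its own recorder: doRequest already records every
// per-request latency into s.rec, so recording the session there would mix two distributions.
// sessionRec is a new field (same type as rec) to add to s5Dashboard.
s.sessionRec.Record(time.Since(sessionStart).Microseconds())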
}
- Step 5: 写 S6 榜单
var (
s6Dimensions = []string{"displaying", "month", "total"}
s6StarIDs = []int64{87, 88, 93, 999900}
s6Endpoints = []string{"/api/v1/rankings/hot", "/api/v1/rankings/original"}
)
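// Each call sends ONE request with a random endpoint/dimension/star combination.
// Nested loops over all combinations would send 2×3×4 = 24 requests per call, so a
// 700 RPS stage would put ~16,800 req/s on the wire if the scheduler counts doOne calls as RPS.
func (s *s6Ranking) doOne() {
ep := s6Endpoints[rand.Intn(len(s6Endpoints))]
dim := s6Dimensions[rand.Intn(len(s6Dimensions))]
sid := s6StarIDs[rand.Intn(len(s6StarIDs))]
url := fmt.Sprintf("%s%s?dimension=%s&star_id=%d&page=1&page_size=10",
baseURL, ep, dim, sid)
req, _ := http.NewRequest("GET", url, nil)
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)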
}
- Step 6: 写 S7 上架
// seed 没存 slot_id per user, 需在 seed 阶段补充:
// 在 seed/tokens.go 里多读一列 booth_slots.slot_id WHERE slot_index=3
// 然后 TestUser 加 SlotID3 字段
// S7 用 slot_index=3(留作压测槽位),从 user 未上架的 asset_ids[2..4] 选
func (s *s7Place) doPlace(u lib.TestUser) {
if u.SlotID3 == 0 || len(u.AssetIDs) < 5 { return } // the place branch reads AssetIDs[2..4]
// 50% place, 50% unplace
if rand.Float64() < 0.5 {
assetID := u.AssetIDs[2+rand.Intn(3)] // 3,4,5 号
body, _ := json.Marshal(map[string]any{
"slot_id": u.SlotID3,
"asset_id": assetID,
})
req, _ := http.NewRequest("POST", baseURL+"/api/v1/galleries/place", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+u.JWTToken)
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
} else {
req, _ := http.NewRequest("DELETE", fmt.Sprintf("%s/api/v1/galleries/slots/%d/asset", baseURL, u.SlotID3), nil)
req.Header.Set("Authorization", "Bearer "+u.JWTToken)
doRequest(s.client, req, s.rec, s.errCount, s.totalCount, s.fiveXXCount)
}
}
- Step 7: 补充 seed/tokens.go 增加 SlotID3 字段
在 Task 15 GenerateTokensForLoadtest 中,SELECT 多加一列 booth_slots.slot_id WHERE slot_index=3,TestUser 加 SlotID3 int64,CSV 多写一列。同步 csv.go 的 LoadUsers 解析。
- Step 8: 验证编译 + 提交
cd backend && go build ./scripts/loadgen/loadgen/scenarios/
git add backend/scripts/loadgen/loadgen/scenarios/ backend/scripts/loadgen/seed/tokens.go backend/scripts/loadgen/loadgen/lib/csv.go
git commit -m "feat(scenario): S2-S7 implementations (read/like/mint/dashboard/ranking/place) + SlotID3"
Task 37: scenarios 单元测试(构造逻辑)
Files:
- Create: backend/scripts/loadgen/loadgen/scenarios/scenarios_test.go
- Step 1: Test the scenario registry
package scenarios
import (
"fmt"
"strings"
"testing"
)
func TestAllScenariosRegistered(t *testing.T) {
expected := []string{"S1", "S2", "S3", "S4", "S5", "S6", "S7"}
for _, id := range expected {
if _, ok := registry[id]; !ok {
t.Errorf("scenario %s not registered", id)
}
}
}
func TestMintNameFormat(t *testing.T) {
name := fmt.Sprintf("loadtest_mint_%d_round%d", int64(30000001), 3)
if !strings.HasPrefix(name, "loadtest_mint_") {
t.Errorf("name %q must start with loadtest_mint_", name)
}
}
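`TestMintNameFormat` builds its own string, so it cannot catch a change in `doMint`. A minimal sketch of sharing one formatter between `doMint` and the test; `mintName` and `isLoadtestMint` are hypothetical helpers, and the format copies the `loadtest_mint_<user>_round<N>_<nonce>` pattern used in S4.

```go
package main

import (
	"fmt"
	"strings"
)

const mintNamePrefix = "loadtest_mint_"

// mintName builds the asset name S4 sends to precreate; nonce keeps names unique within a round.
func mintName(userID int64, round int, nonce int64) string {
	return fmt.Sprintf("%s%d_round%d_%d", mintNamePrefix, userID, round, nonce)
}

// isLoadtestMint reports whether a name was produced by mintName (e.g. for cleanup filters).
func isLoadtestMint(name string) bool {
	return strings.HasPrefix(name, mintNamePrefix)
}
```

`doMint` would then call `mintName(u.UserID, s.roundIdx, time.Now().UnixNano())`.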
- Step 2: 运行 + 提交
cd backend && go test ./scripts/loadgen/loadgen/scenarios/ -v
git add backend/scripts/loadgen/loadgen/scenarios/scenarios_test.go
git commit -m "test(scenario): verify all 7 scenarios registered, mint name format"
Phase 7: 报告 + 监控 + 恢复(次日凌晨前)
Task 38: reporter/json.go + csv.go
Files:
- Create: backend/scripts/loadgen/loadgen/reporter/json.go
- Create: backend/scripts/loadgen/loadgen/reporter/csv.go
- Step 1: Write the JSON report
package reporter
import (
"encoding/json"
"os"
"github.com/HdrHistogram/hdrhistogram-go"
)
type RunReport struct {
TotalRequests int64 `json:"total_requests"`
Errors int64 `json:"errors"`
FiveXX int64 `json:"five_xx"`
P50Us int64 `json:"p50_us"`
P95Us int64 `json:"p95_us"`
P99Us int64 `json:"p99_us"`
MaxUs int64 `json:"max_us"`
}
func WriteJSON(path string, h *hdrhistogram.Histogram, total, errs, fiveXX int64) error {
r := RunReport{
TotalRequests: total,
Errors: errs,
FiveXX: fiveXX,
P50Us: h.ValueAtQuantile(50),
P95Us: h.ValueAtQuantile(95),
P99Us: h.ValueAtQuantile(99),
MaxUs: h.Max(),
}
f, err := os.Create(path)
if err != nil { return err }
defer f.Close()
return json.NewEncoder(f).Encode(r)
}
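`WriteJSON` passes 50/95/99 to `ValueAtQuantile`, which takes a percentile on a 0-100 scale. For reference, this stdlib-only sketch computes the nearest-rank percentile on raw samples; HdrHistogram works on bucketed values, so its results agree only to the histogram's configured precision (`percentileNearestRank` is a hypothetical helper).

```go
package main

import (
	"math"
	"sort"
)

// percentileNearestRank returns the smallest sample s such that at least q% of
// the samples are <= s. q is in (0, 100], the same scale as ValueAtQuantile.
func percentileNearestRank(samplesUs []int64, q float64) int64 {
	if len(samplesUs) == 0 {
		return 0
	}
	s := append([]int64(nil), samplesUs...) // sort a copy, leave the caller's slice alone
	sort.Slice(s, func(i, j int) bool { return s[i] < s[j] })
	rank := int(math.Ceil(q * float64(len(s)) / 100))
	if rank < 1 {
		rank = 1
	}
	if rank > len(s) {
		rank = len(s)
	}
	return s[rank-1]
}
```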
- Step 2: 写 CSV baseline
package reporter
import (
"fmt"
"os"
)
// WriteBaselineCSV writes one row per scenario; latencies are converted from µs to ms.
func WriteBaselineCSV(path string, scenarios []string, baseline map[string]RunReport) error {
f, err := os.Create(path)
if err != nil { return err }
defer f.Close()
if _, err := fmt.Fprintln(f, "scenario,total,errors,five_xx,p50_ms,p95_ms,p99_ms,max_ms"); err != nil { return err }
for _, s := range scenarios {
r, ok := baseline[s]
if !ok { continue } // no baseline JSON for this scenario: skip instead of writing a row of zeros
if _, err := fmt.Fprintf(f, "%s,%d,%d,%d,%.2f,%.2f,%.2f,%.2f\n",
s, r.TotalRequests, r.Errors, r.FiveXX,
float64(r.P50Us)/1000, float64(r.P95Us)/1000,
float64(r.P99Us)/1000, float64(r.MaxUs)/1000); err != nil { return err }
}
return nil
}
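An alternative sketch of the same baseline writer using `encoding/csv`, which quotes fields if a scenario ID ever contains a comma and reports buffered write errors through `Error()`. The lowercase `runReport` mirrors `RunReport` for a standalone example; the function names are hypothetical.

```go
package main

import (
	"encoding/csv"
	"io"
	"strconv"
	"strings"
)

// runReport mirrors the RunReport fields the baseline needs.
type runReport struct {
	TotalRequests, Errors, FiveXX int64
	P50Us, P95Us, P99Us, MaxUs    int64
}

// usToMs formats microseconds as milliseconds with two decimals.
func usToMs(us int64) string { return strconv.FormatFloat(float64(us)/1000, 'f', 2, 64) }

// writeBaselineCSV writes the header plus one row per scenario, skipping missing ones.
func writeBaselineCSV(w io.Writer, order []string, baseline map[string]runReport) error {
	cw := csv.NewWriter(w)
	header := []string{"scenario", "total", "errors", "five_xx", "p50_ms", "p95_ms", "p99_ms", "max_ms"}
	if err := cw.Write(header); err != nil {
		return err
	}
	for _, id := range order {
		r, ok := baseline[id]
		if !ok {
			continue
		}
		row := []string{id,
			strconv.FormatInt(r.TotalRequests, 10), strconv.FormatInt(r.Errors, 10), strconv.FormatInt(r.FiveXX, 10),
			usToMs(r.P50Us), usToMs(r.P95Us), usToMs(r.P99Us), usToMs(r.MaxUs)}
		if err := cw.Write(row); err != nil {
			return err
		}
	}
	cw.Flush() // csv.Writer buffers; errors surface via Error() after Flush
	return cw.Error()
}

// baselineCSVString is a convenience wrapper for tests and logging.
func baselineCSVString(order []string, baseline map[string]runReport) (string, error) {
	var b strings.Builder
	err := writeBaselineCSV(&b, order, baseline)
	return b.String(), err
}
```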
- Step 3: 验证 + 提交
cd backend && go build ./scripts/loadgen/loadgen/reporter/
git add backend/scripts/loadgen/loadgen/reporter/
git commit -m "feat(reporter): JSON + baseline CSV writers"
Task 39: reporter/plot.go - 三联图
Files:
- Create: backend/scripts/loadgen/loadgen/reporter/plot.go
- Step 1: Write PlotRPSLatencyError
package reporter

import (
	"fmt"

	"gonum.org/v1/plot"
	"gonum.org/v1/plot/plotter"
	"gonum.org/v1/plot/plotutil"
	"gonum.org/v1/plot/vg"
)

// PlotRPSLatencyError draws RPS, P99 (ms) and error rate (%) per stage as three lines.
// All three share one Y axis, so RPS usually sets the scale.
func PlotRPSLatencyError(scenario string, samples []Sample, outPath string) error {
	p := plot.New()
	p.Title.Text = scenario + " - RPS / P99 / Error"
	p.X.Label.Text = "Stage"
	p.Y.Label.Text = "Value"
	rpsPts := make(plotter.XYs, len(samples))
	p99Pts := make(plotter.XYs, len(samples))
	errPts := make(plotter.XYs, len(samples))
	for i, s := range samples {
		rpsPts[i].X, rpsPts[i].Y = float64(i), s.RPS
		p99Pts[i].X, p99Pts[i].Y = float64(i), s.P99Ms
		errPts[i].X, errPts[i].Y = float64(i), s.ErrorRate*100
	}
	lines := make([]*plotter.Line, 3)
	for i, pts := range []plotter.XYs{rpsPts, p99Pts, errPts} {
		l, err := plotter.NewLine(pts)
		if err != nil {
			return fmt.Errorf("line %d: %w", i, err)
		}
		l.Color = plotutil.Color(i) // distinct colors; by default every line is drawn black
		lines[i] = l
	}
	p.Add(lines[0], lines[1], lines[2])
	p.Legend.Add("RPS", lines[0])
	p.Legend.Add("P99 ms", lines[1])
	p.Legend.Add("Error %", lines[2])
	// Plot.Save takes a file name (not an io.Writer) and picks the format from its extension (.svg, .png, ...).
	return p.Save(12*vg.Inch, 6*vg.Inch, outPath)
}
type Sample struct {
RPS float64
P99Ms float64
ErrorRate float64
}
- Step 2: 提交
cd backend && go build ./scripts/loadgen/loadgen/reporter/
git add backend/scripts/loadgen/loadgen/reporter/plot.go
git commit -m "feat(reporter): gonum/plot RPS-Latency-Error three-line chart"
Task 40: reporter/markdown.go
Files:
- Create: backend/scripts/loadgen/loadgen/reporter/markdown.go
- Step 1: Write GenerateMarkdown
package reporter
import (
"fmt"
"os"
"time"
)
type ScenarioReport struct {
ID string
Stages []StageReport
KneeRPS int
TopBottleneck string
}
type StageReport struct {
RPS int
Duration time.Duration
P50Ms float64
P95Ms float64
P99Ms float64
ErrorRate float64
}
func GenerateMarkdown(path string, scenarios []ScenarioReport) error {
f, err := os.Create(path)
if err != nil { return err }
defer f.Close()
fmt.Fprintf(f, "# 压测报告\n\n")
for _, s := range scenarios {
fmt.Fprintf(f, "## %s\n\n", s.ID)
fmt.Fprintf(f, "**拐点 RPS**: %d\n\n", s.KneeRPS)
fmt.Fprintf(f, "**Top 瓶颈**: %s\n\n", s.TopBottleneck)
fmt.Fprintf(f, "| Stage | Target RPS | Duration | P50 ms | P95 ms | P99 ms | Err %% |\n")
fmt.Fprintf(f, "|-------|------------|----------|--------|--------|--------|-------|\n")
for i, st := range s.Stages {
// Stage = 1-based stage index
fmt.Fprintf(f, "| %d | %d | %s | %.1f | %.1f | %.1f | %.1f |\n",
i+1, st.RPS, st.Duration, st.P50Ms, st.P95Ms, st.P99Ms, st.ErrorRate*100)
}
fmt.Fprintf(f, "\n")
}
return nil
}
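`ScenarioReport.KneeRPS` is printed but nothing above computes it. A minimal sketch of one common definition, the highest RPS of the leading run of stages that stays within both thresholds; `kneeRPS` is hypothetical, and the real P99 and error-rate limits should come from the spec's red lines, not from the example values in the test.

```go
package main

// stage is a trimmed StageReport: target RPS plus measured P99 and error rate.
type stage struct {
	RPS       int
	P99Ms     float64
	ErrorRate float64 // fraction, e.g. 0.01 = 1%
}

// kneeRPS returns the RPS of the last stage before the first threshold violation.
// Later stages are ignored even if they recover; 0 means the first stage already failed.
func kneeRPS(stages []stage, maxP99Ms, maxErrRate float64) int {
	knee := 0
	for _, s := range stages {
		if s.P99Ms > maxP99Ms || s.ErrorRate > maxErrRate {
			break
		}
		knee = s.RPS
	}
	return knee
}
```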
- Step 2: 提交
cd backend && go build ./scripts/loadgen/loadgen/reporter/
git add backend/scripts/loadgen/loadgen/reporter/markdown.go
git commit -m "feat(reporter): markdown report generator"
Task 41: preflight.go - 7 项开压前检查
Files:
- Create: backend/scripts/loadgen/loadgen/preflight.go
- Step 1: Write the preflight checks
package main

import (
	"fmt"
	"net/http"
	"os"
	"os/exec"
	"strings"
	"time"

	"github.com/topfans/backend/scripts/loadgen/loadgen/lib"
)

type CheckResult struct {
	Name   string
	Passed bool
	Detail string
}

// runPreflight runs the 7 pre-test checks. Run it from /opt/loadgen on the load source,
// because check ⑥ opens users.csv relative to the working directory.
func runPreflight(target, prodSSH string) error {
	var checks []CheckResult
	add := func(name string, passed bool, detail string) {
		checks = append(checks, CheckResult{Name: name, Passed: passed, Detail: detail})
	}

	// ① Gateway /health. resp is nil whenever err != nil, so only read it on success.
	client := &http.Client{Timeout: 5 * time.Second}
	status := -1
	resp, err := client.Get(target + "/health")
	if err == nil {
		status = resp.StatusCode
		resp.Body.Close()
	}
	add("① Gateway /health", status == http.StatusOK, fmt.Sprintf("status=%d err=%v", status, err))

	// ② SSH to prod
	out, err := exec.Command("ssh", "-o", "ConnectTimeout=5", prodSSH, "echo connected").CombinedOutput()
	add("② SSH to prod", err == nil && strings.Contains(string(out), "connected"), strings.TrimSpace(string(out)))

	// ③ Latest pg_dump backup > 50MB. The file is on prod, so its size is read over ssh
	// (os.Stat would look at the load source's own disk).
	out, _ = exec.Command("ssh", prodSSH,
		`f=$(ls -t /opt/topfans/backups/pre-loadtest-*.sql 2>/dev/null | head -1); [ -n "$f" ] && echo "$f $(stat -c %s "$f")"`).Output()
	var backupFile string
	var size int64
	n, _ := fmt.Sscanf(strings.TrimSpace(string(out)), "%s %d", &backupFile, &size)
	add("③ pg_dump backup exists (>50MB)", n == 2 && size > 50*1024*1024, fmt.Sprintf("file=%q size=%d", backupFile, size))

	// ④ Aliyun snapshot < 24h: cannot be checked from here
	add("④ Aliyun snapshot < 24h (manual)", true, "ops must confirm in the ECS console")

	// ⑤ prod free disk > 10GB; -1 means ssh/df produced no number
	out, _ = exec.Command("ssh", prodSSH, "df -B1G /opt | tail -1 | awk '{print $4}'").Output()
	freeGB := -1
	if _, err := fmt.Sscanf(strings.TrimSpace(string(out)), "%d", &freeGB); err != nil {
		freeGB = -1
	}
	add("⑤ prod free disk > 10GB", freeGB > 10, fmt.Sprintf("free=%dGB", freeGB))

	// ⑥ users.csv: every failure path must produce a ✗ row instead of silently skipping the check.
	// Assumes lib.LoadUsers(path) ([]lib.TestUser, error) from lib/csv.go.
	if users, err := lib.LoadUsers("users.csv"); err != nil {
		add("⑥ users.csv 1000 rows", false, err.Error())
	} else {
		add("⑥ users.csv 1000 rows", len(users) == 1000, fmt.Sprintf("rows=%d", len(users)))
	}

	// ⑦ JWT_SECRET present in this process's environment
	jwtDetail := "empty"
	if os.Getenv("JWT_SECRET") != "" {
		jwtDetail = "set"
	}
	add("⑦ JWT_SECRET set", jwtDetail == "set", jwtDetail)

	failed := 0
	for _, c := range checks {
		mark := "✓"
		if !c.Passed {
			mark = "✗"
			failed++
		}
		fmt.Printf("%s %s: %s\n", mark, c.Name, c.Detail)
	}
	if failed > 0 {
		return fmt.Errorf("preflight failed: %d of %d checks", failed, len(checks))
	}
	fmt.Println("ALL CHECKS PASSED - ready to start the load test")
	return nil
}
- Step 2: Verify + commit
cd backend && go build ./scripts/loadgen/loadgen/
git add backend/scripts/loadgen/loadgen/preflight.go
git commit -m "feat(loadgen): preflight 7 checks (health, ssh, backup, disk, csv, jwt)"
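Checks ③ and ⑤ turn remote command output into a number, and `Sscanf` on empty output (ssh failed, no file) otherwise reads as 0. A small helper that makes "no output" an explicit error could back those checks; `parseRemoteInt` is a hypothetical name, shown as a sketch.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseRemoteInt parses the single integer printed by a remote command such as
// `df -B1G /opt | tail -1 | awk '{print $4}'` or `stat -c %s <file>`.
// Empty output usually means ssh or the command failed, so it is an error, not 0.
func parseRemoteInt(out []byte) (int64, error) {
	s := strings.TrimSpace(string(out))
	if s == "" {
		return 0, fmt.Errorf("empty output")
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("parse %q: %w", s, err)
	}
	return n, nil
}
```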
Task 42: verify.go - 压测后完整性校验
Files:
- Create: backend/scripts/loadgen/loadgen/verify.go
- Step 1: Write runVerify
package main
import (
"fmt"
"os/exec"
"strings"
)
func runVerify(prodSSH string) error {
// 1. 真实数据未变(只查询 star_id != 999900)
pre := sshPG(prodSSH, "SELECT count(*) FROM mint_orders WHERE star_id != 999900")
fmt.Printf(" real mint_orders: %s\n", pre)
// 2. PG 连接已回落
conn := sshPG(prodSSH, "SELECT count(*) FROM pg_stat_activity")
fmt.Printf(" PG active conn: %s\n", conn)
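// 3. Container restart counts. `docker ps` status does not include RestartCount, so use
// docker inspect; compare against values recorded before the run (this version only prints them).
out, _ := exec.Command("ssh", prodSSH, "docker inspect -f '{{.Name}} {{.RestartCount}}' $(docker ps -q)").Output()
fmt.Printf(" container restart counts:\n%s\n", out)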
return nil
}
func sshPG(ssh, query string) string {
cmd := exec.Command("ssh", ssh,
fmt.Sprintf(`export PGPASSWORD="${DB_PASSWORD:-postgres123}"; PG=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1); docker exec -e PGPASSWORD="$PGPASSWORD" "$PG" psql -U postgres -d topfans -t -c "%s"`, query))
out, _ := cmd.Output()
return strings.TrimSpace(string(out))
}
- Step 2: 验证 + 提交
cd backend && go build ./scripts/loadgen/loadgen/
git add backend/scripts/loadgen/loadgen/verify.go
git commit -m "feat(loadgen): post-test verify (real-data integrity, conn count, container status)"
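To turn the printed restart counts into a pass/fail verdict, verify needs the counts captured before the run and a diff afterwards. A minimal sketch, assuming both snapshots come from `docker inspect -f '{{.Name}} {{.RestartCount}}'`; the function names are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// parseRestartCounts parses "<name> <count>" lines as printed by
// docker inspect -f '{{.Name}} {{.RestartCount}}'.
func parseRestartCounts(out string) (map[string]int, error) {
	counts := map[string]int{}
	for _, line := range strings.Split(strings.TrimSpace(out), "\n") {
		if line == "" {
			continue
		}
		f := strings.Fields(line)
		if len(f) != 2 {
			return nil, fmt.Errorf("unexpected line %q", line)
		}
		n, err := strconv.Atoi(f[1])
		if err != nil {
			return nil, fmt.Errorf("line %q: %w", line, err)
		}
		counts[f[0]] = n
	}
	return counts, nil
}

// restartedDuringRun returns, sorted, the containers whose restart count grew.
// A container missing from before is compared against 0.
func restartedDuringRun(before, after map[string]int) []string {
	var out []string
	for name, n := range after {
		if n > before[name] {
			out = append(out, name)
		}
	}
	sort.Strings(out)
	return out
}
```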
Task 43: monitor/sample.sh
Files:
- Create: backend/scripts/loadgen/monitor/sample.sh
- Step 1: Write sample.sh
#!/bin/bash
# /opt/topfans/loadtest/monitor/sample.sh
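# Samples in the background and appends to metrics-feed.jsonl. Despite the .jsonl suffix,
# each line is space-separated "<ts> key=value ...", not JSON.
# The effective period is INTERVAL plus however long `docker stats --no-stream` takes.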
set -e
OUT="/opt/topfans/loadtest/metrics-feed.jsonl"
INTERVAL=${INTERVAL:-5}
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
[ -z "$PG_CONTAINER" ] && { echo "❌ 找不到 postgres 容器"; exit 1; }
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
echo "📊 sampling to $OUT every ${INTERVAL}s, pid $$"
while true; do
TS=$(date +%s)
# docker stats (一次性)
DOCKER=$(docker stats --no-stream --format '{{.Name}}={{.MemPerc}}' 2>/dev/null | tr '\n' ' ')
# active PG connections; "|| echo -1" keeps set -e from killing the sampler if psql fails mid-test
PG_ACTIVE=$(docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d topfans -tA -c "SELECT count(*) FROM pg_stat_activity WHERE state='active'" 2>/dev/null || echo -1)
# disk
DISK_FREE=$(df -B1G /opt | tail -1 | awk '{print $4}')
echo "$TS pg_active=$PG_ACTIVE disk_free=$DISK_FREE $DOCKER" >> "$OUT"
sleep "$INTERVAL"
done
- Step 2: 提交
chmod +x backend/scripts/loadgen/monitor/sample.sh
git add backend/scripts/loadgen/monitor/sample.sh
git commit -m "feat(monitor): sample.sh writes metrics-feed.jsonl every 5s"
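On the load-source side, each sample.sh line has to be parsed before the circuit breaker can use it. This sketch parses the `<ts> pg_active=<n> disk_free=<GB> <container>=<mem%> ...` format the script writes; `sample` and `parseSampleLine` are hypothetical and not the actual lib/ssh_metrics.go code.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// sample is one line written by sample.sh.
type sample struct {
	TS         int64
	PGActive   int                // -1 when psql failed (empty field or "-1")
	DiskFreeGB int                // -1 when missing
	MemPerc    map[string]float64 // container name -> memory %
}

func parseSampleLine(line string) (sample, error) {
	f := strings.Fields(line)
	if len(f) == 0 {
		return sample{}, fmt.Errorf("empty line")
	}
	ts, err := strconv.ParseInt(f[0], 10, 64)
	if err != nil {
		return sample{}, fmt.Errorf("timestamp: %w", err)
	}
	s := sample{TS: ts, PGActive: -1, DiskFreeGB: -1, MemPerc: map[string]float64{}}
	for _, kv := range f[1:] {
		k, v, ok := strings.Cut(kv, "=")
		if !ok {
			continue // ignore tokens that are not key=value
		}
		switch k {
		case "pg_active":
			if n, err := strconv.Atoi(v); err == nil {
				s.PGActive = n
			}
		case "disk_free":
			if n, err := strconv.Atoi(v); err == nil {
				s.DiskFreeGB = n
			}
		default: // docker stats entries look like "12.50%"
			if p, err := strconv.ParseFloat(strings.TrimSuffix(v, "%"), 64); err == nil {
				s.MemPerc[k] = p
			}
		}
	}
	return s, nil
}
```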
Task 44: monitor/docker-compose.monitor.yml
Files:
- Create: backend/scripts/loadgen/monitor/docker-compose.monitor.yml
- Step 1: Write prom + grafana + exporters
version: "3.8"
services:
cadvisor:
image: gcr.io/cadvisor/cadvisor:v0.47.0
container_name: topfans-cadvisor
volumes:
- /:/rootfs:ro
- /var/run:/var/run:ro
- /sys:/sys:ro
- /var/lib/docker/:/var/lib/docker:ro
ports:
- "8088:8080"
restart: unless-stopped
node-exporter:
image: prom/node-exporter:v1.7.0
container_name: topfans-node-exporter
pid: host
volumes:
- /proc:/host/proc:ro
- /sys:/host/sys:ro
- /:/rootfs:ro
ports:
- "9100:9100"
restart: unless-stopped
postgres-exporter:
image: prometheuscommunity/postgres-exporter:v0.13.2
container_name: topfans-pg-exporter
environment:
DATA_SOURCE_NAME: "postgresql://postgres:${DB_PASSWORD:-postgres123}@postgres:5432/topfans?sslmode=disable"
ports:
- "9187:9187"
restart: unless-stopped
redis-exporter:
image: oliver006/redis_exporter:v1.58.0
container_name: topfans-redis-exporter
environment:
REDIS_ADDR: "redis://redis:6379"
ports:
- "9121:9121"
restart: unless-stopped
prometheus:
image: prom/prometheus:v2.51.2
container_name: topfans-prometheus
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml:ro
- ./grafana-dashboards/:/etc/grafana/dashboards/:ro
ports:
- "9090:9090"
restart: unless-stopped
grafana:
image: grafana/grafana:10.4.2
container_name: topfans-grafana
environment:
GF_SECURITY_ADMIN_PASSWORD: "${GRAFANA_PASSWORD:-admin}"
volumes:
- grafana-data:/var/lib/grafana
- ./grafana-dashboards/:/etc/grafana/dashboards/:ro
ports:
- "3000:3000"
restart: unless-stopped
volumes:
grafana-data:
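⚠️ The exporters reach `postgres` and `redis` by service name, which only resolves if this stack joins the prod compose network (declare it as an `external` network). The alternative, simpler for a load test, is `network_mode: host` on every service with the addresses (and the targets in prometheus.yml) changed to `localhost`. Without one of the two, the exporters cannot connect to postgres/redis.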
- Step 2: 添加 prometheus.yml 配置
创建 backend/scripts/loadgen/monitor/prometheus.yml:
global:
scrape_interval: 5s
scrape_configs:
- job_name: cadvisor
static_configs:
- targets: ["cadvisor:8080"]
- job_name: node
static_configs:
- targets: ["node-exporter:9100"]
- job_name: postgres
static_configs:
- targets: ["postgres-exporter:9187"]
- job_name: redis
static_configs:
- targets: ["redis-exporter:9121"]
- job_name: loadgen
metrics_path: /metrics
static_configs:
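- targets: ["<压源IP>:9091"] # loadgen runs on the load-source ECS; host.docker.internal would point at this prod host (and needs extra_hosts on Linux)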
- Step 3: 提交
git add backend/scripts/loadgen/monitor/
git commit -m "feat(monitor): prometheus+grafana stack with 4 exporters"
Task 45: 4 个 Grafana 面板
Files:
- Create: backend/scripts/loadgen/monitor/grafana-dashboards/01-host.json (host overview)
- Create: backend/scripts/loadgen/monitor/grafana-dashboards/02-containers.json
- Create: backend/scripts/loadgen/monitor/grafana-dashboards/03-postgres.json
- Create: backend/scripts/loadgen/monitor/grafana-dashboards/04-business.json
- Step 1: Create 01-host.json (host overview)
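A complete Grafana dashboard JSON is too long (>500 lines), so this plan uses a placeholder; when implementing, copy an existing prod dashboard or import an official template. Grafana only loads files from /etc/grafana/dashboards if a file-based dashboard provider pointing there is provisioned under /etc/grafana/provisioning/dashboards/.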
最小可用内容:
{
"title": "Load Test - Host Overview",
"panels": [
{
"title": "CPU Usage",
"type": "graph",
"targets": [
{ "expr": "100 - (avg by(instance)(rate(node_cpu_seconds_total{mode=\"idle\"}[1m])) * 100)" }
]
}
]
}
- Step 2: Create 02-containers.json / 03-postgres.json / 04-business.json the same way
- Step 3: Commit the placeholders
git add backend/scripts/loadgen/monitor/grafana-dashboards/
git commit -m "feat(monitor): 4 Grafana dashboard placeholders (host/containers/pg/business)"
Task 46: recover/emergency-stop.sh + restore-from-backup.sh
Files:
- Create: backend/scripts/loadgen/recover/emergency-stop.sh
- Create: backend/scripts/loadgen/recover/restore-from-backup.sh
- Step 1: Write emergency-stop.sh
#!/bin/bash
# 一键灭火
set -e
echo "🚨 emergency stop"
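# loadgen runs on the load-source ECS, not on prod. LOADGEN_SSH (e.g. root@<压源IP>) is a placeholder to set before running.
if [ -n "$LOADGEN_SSH" ]; then ssh -o ConnectTimeout=5 "$LOADGEN_SSH" "pkill -9 -x loadgen" || true; fi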
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
[ -z "$PG_CONTAINER" ] && { echo "❌ 找不到 postgres 容器"; exit 1; }
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d topfans -c "
SELECT pg_terminate_backend(pid) FROM pg_stat_activity
WHERE state != 'idle' AND now() - query_start > interval '10 seconds'
AND usename = 'postgres';
"
cd /opt/topfans/docker
docker-compose -f docker-compose.prod.yml --profile prod restart # same profile as restore-from-backup.sh, so profiled services restart too
sleep 30
if curl -fs http://localhost:8080/health; then
echo "✅ gateway 恢复"
else
echo "⚠️ gateway 仍未恢复"
exit 1
fi
- Step 2: 写 restore-from-backup.sh
#!/bin/bash
# 用法: bash restore-from-backup.sh /opt/topfans/backups/pre-loadtest-XXXX.sql
set -e
BACKUP_FILE=$1
[ -f "$BACKUP_FILE" ] || { echo "❌ 备份文件不存在: $BACKUP_FILE"; exit 1; }
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
[ -z "$PG_CONTAINER" ] && { echo "❌ 找不到 postgres 容器"; exit 1; }
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
echo "🛑 停应用层..."
docker ps --filter 'name=topfans-' --format '{{.Names}}' \
| grep -v postgres | grep -v redis | grep -v exporter \
| xargs -r docker stop
echo "🗑️ 删库重建..."
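# WITH (FORCE) (PostgreSQL 13+) terminates leftover sessions such as postgres-exporter; a plain DROP fails while anyone is connected
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -c "DROP DATABASE topfans WITH (FORCE);"
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -c "CREATE DATABASE topfans;"
echo "📥 Restoring $BACKUP_FILE ..."
# ON_ERROR_STOP makes psql exit non-zero on the first SQL error, so set -e catches a broken restore
docker exec -i -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" psql -U postgres -d topfans -v ON_ERROR_STOP=1 < "$BACKUP_FILE"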
echo "🚀 启动应用层..."
cd /opt/topfans/docker
docker-compose -f docker-compose.prod.yml --profile prod up -d
sleep 30
curl -fs http://localhost:8080/health && echo "✅ 恢复完成"
- Step 3: 提交
chmod +x backend/scripts/loadgen/recover/*.sh
git add backend/scripts/loadgen/recover/
git commit -m "feat(recover): emergency-stop + restore-from-backup scripts"
Phase 8: 部署 + 预演(Day 1 23:00)
Task 47: 编译 + scp 二进制
Files:
- None (deployment commands only)
- Step 1: Build seed + loadgen
cd backend
GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -o bin/seed ./scripts/loadgen/seed/
GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -o bin/loadgen ./scripts/loadgen/loadgen/
ls -lh bin/
Expected: 两个 ~10-20MB 的二进制。
- Step 2: scp seed 到 prod
scp backend/bin/seed root@101.132.250.62:/opt/topfans/loadtest/seed
- Step 3: scp loadgen 到压源
scp -i ~/.ssh/loadgen-key backend/bin/loadgen root@<压源IP>:/opt/loadgen/loadgen # options before operands: BSD/macOS scp stops parsing options at the first file
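- Step 4: scp scripts to prod (recover/*.sh, plus monitor/sample.sh, which Tasks 50 and 54 start from /opt/topfans/loadtest/monitor/)
ssh root@101.132.250.62 "mkdir -p /opt/topfans/loadtest/recover /opt/topfans/loadtest/monitor"
scp backend/scripts/loadgen/recover/*.sh root@101.132.250.62:/opt/topfans/loadtest/recover/
scp backend/scripts/loadgen/monitor/sample.sh root@101.132.250.62:/opt/topfans/loadtest/monitor/
ssh root@101.132.250.62 "chmod +x /opt/topfans/loadtest/recover/*.sh /opt/topfans/loadtest/monitor/sample.sh"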
- Step 5: 验证可执行
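Note: Go's `flag` package treats one and two dashes the same, so -h, -help, --h and --help all print usage (and exit with status 0 since Go 1.15). -h is used below:
ssh root@101.132.250.62 "/opt/topfans/loadtest/seed -h" 2>&1 | head -20
ssh -i ~/.ssh/loadgen-key root@<压源IP> "/opt/loadgen/loadgen -h" 2>&1 | head -20
Expected: both print their usage. Go's default usage lists flags with a single dash, e.g. -jwt-secret and -db-name for seed and -cmd for loadgen.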
Task 48: 在 prod 上传占位图到 OSS
Files:
- None (OSS upload)
- Step 1: Prepare the placeholder image (locally)
# 用任意 1x1 PNG 作为占位
python3 -c "
import base64
png = base64.b64decode('iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg==')
open('/tmp/loadtest-placeholder.png','wb').write(png)
"
- Step 2: 用项目现有接口拿签名
Run(spec §4.4):
# 调 /api/v1/assets/oss/upload-signature 拿 PUT URL
# PUT 文件到 OSS 的 loadtest/loadtest-placeholder.png
# 拷贝返回的可访问 URL
When implementing, read backend/services/assetService/.../oss.go to find the exact path and parameters of the signature endpoint.
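- Step 3: Write the URL into the placeholder default
Open backend/scripts/loadgen/loadgen/lib/config.go (see Step 3a of the S4 mint step) and replace
const DefaultLoadtestPlaceholderURL = "<OSS_URL>/loadtest-placeholder.png"
with the real URL. If backend/scripts/loadgen/seed/assets.go still keeps its own fallback copy, set it to the same value. Alternatively, skip the rebuild and export LOADTEST_PLACEHOLDER_URL=<real URL> when running seed and loadgen.
- Step 4: Rebuild and redeploy both binaries (both read the URL)
cd backend && GOOS=linux GOARCH=amd64 go build -o bin/seed ./scripts/loadgen/seed/
GOOS=linux GOARCH=amd64 go build -o bin/loadgen ./scripts/loadgen/loadgen/
scp bin/seed root@101.132.250.62:/opt/topfans/loadtest/seed
scp -i ~/.ssh/loadgen-key bin/loadgen root@<压源IP>:/opt/loadgen/loadgen
- Step 5: Commit the placeholder URL
git add backend/scripts/loadgen/loadgen/lib/config.go backend/scripts/loadgen/seed/assets.go
git commit -m "chore(loadgen): set placeholder URL to real OSS URL"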
Task 49: 在本地 docker 跑 seed 完整流程
Files:
- None (local verification)
- Step 1: Start docker-compose locally
cd docker
docker-compose -f docker-compose.local.yml up -d
- Step 2: 在本地跑 seed
cd backend
go build -o bin/seed ./scripts/loadgen/seed/
DB_PASSWORD=postgres123 JWT_SECRET=topfans-secret-key-local-dev-only \
./bin/seed --db-name=top-fans --db-host=localhost
Expected:
- Output "✓ stars/users/profiles/... seeded"
- Ends with "✅ seed + tokens completed"
- users.csv (~150KB) is created in the current directory
- Step 3: Verify row counts
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
docker exec -e PGPASSWORD=postgres123 "$PG_CONTAINER" psql -U postgres -d top-fans -c "
SELECT
(SELECT count(*) FROM users WHERE id BETWEEN 30000001 AND 30001000) AS users,
(SELECT count(*) FROM assets WHERE star_id = 999900) AS assets,
(SELECT count(*) FROM fan_profiles WHERE star_id = 999900) AS profiles,
(SELECT count(*) FROM friendships WHERE star_id = 999900) AS friendships;
"
Expected: 1000 / 5000 / 1000 / ~10000
- Step 4: 跑 cleanup 验证安全
DB_PASSWORD=postgres123 ./bin/seed --db-name=top-fans --db-host=localhost --cleanup
# Re-run the Step 3 query: all four counts should now be 0 (star_id=999900 data fully removed)
Task 50: 预演 dry run(--monitor=off mini baseline)
Files:
- None (commands only)
- Step 1: Prepare users.csv on the load source
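# Copy the users.csv generated on prod to the load source. -3 routes the copy through this machine,
# so prod does not need SSH access to the load source.
scp -3 -i ~/.ssh/loadgen-key root@101.132.250.62:/opt/topfans/loadtest/users.csv root@<压源IP>:/opt/loadgen/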
ssh -i ~/.ssh/loadgen-key root@<压源IP> "head -1 /opt/loadgen/users.csv; wc -l /opt/loadgen/users.csv"
Expected: 1001 行(1 header + 1000 data)。
- Step 2: 在 prod 上启 sample.sh 后台
ssh root@101.132.250.62 "nohup bash /opt/topfans/loadtest/monitor/sample.sh > /tmp/sample.log 2>&1 &"
sleep 5
ssh root@101.132.250.62 "tail -3 /opt/topfans/loadtest/metrics-feed.jsonl"
Expected: 看到 pg_active / disk_free 行。
- Step 3a: 跑 spec baseline(--monitor=off, 1 RPS × 30s × 7 场景,符合 spec §5.3)
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
cd /opt/loadgen
for s in S1 S2 S3 S4 S5 S6 S7; do
/opt/loadgen/loadgen --cmd=run --scenarios=$s --rps=1 --duration=30s --monitor=off 2>&1 | tail -15
done
EOF
Expected: 每个场景末尾输出 per-minute summary,err 接近 0%,P50/P95/P99 稳定。
- Step 3b: 跑工具链 smoke(--monitor=off, 10 RPS × 30s × S1,验证红线不误触)
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
/opt/loadgen/loadgen --cmd=run --scenarios=S1 --rps=10 --duration=30s --monitor=off 2>&1 | tail -15
EOF
Expected: toolchain 顺畅,无 panic,err 接近 0%。
Step 3a vs 3b 区别:3a 验证 spec baseline(1 RPS),3b 验证工具链在稍高 RPS 下不误触红线。两步都做才完整。
- Step 4: 收尾:停 sample.sh + cleanup
ssh root@101.132.250.62 "pkill -f sample.sh || true"
ssh root@101.132.250.62 "cd /opt/topfans/loadtest && ./seed --cleanup"
- Step 5: 记录 dry-run 结果
把 preflight/dry-run 输出粘贴到 docs/loadtest/round1/seed-validation.log:
echo "=== dry run at $(date) ===" >> docs/loadtest/round1/seed-validation.log
# 把执行 tail 输出贴进去
Phase 9: 第一轮压测窗口(Day 2 02:00-06:00)
窗口严格控制:02:00 开压,05:45 收尾,06:00 全部清理。所有 Task 按时间盒执行,超时立即 emergency-stop + review。
Task 51: 01:30 - Preflight(7 项)
Files:
- Output: docs/loadtest/round1/monitoring/preflight.log
- Step 1: Run preflight on the load source
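ssh -i ~/.ssh/loadgen-key root@<压源IP> "cd /opt/loadgen && /opt/loadgen/loadgen --cmd=preflight --target=http://101.132.250.62:8080 --prod-ssh=root@101.132.250.62 2>&1 | tee /opt/loadgen/preflight.log"
Expected: all 7 checks ✓, last line starts with "ALL CHECKS PASSED".
Ordering caveat: check ③ needs the backup from Task 52 and check ⑥ needs the users.csv from Task 53, so at 01:30 those two are expected to fail. Fix everything else then, and re-run this step after Task 53 (before Task 55); the re-run must be all ✓. Check ⑦ reads JWT_SECRET from the ssh session's environment, so export it in the command if the load-source shell does not set it.
- Step 2: Pull the log back locally
scp -i ~/.ssh/loadgen-key root@<压源IP>:/opt/loadgen/preflight.log docs/loadtest/round1/monitoring/preflight.log
- Step 3: If any check fails (other than ③/⑥ in the 01:30 run), stop immediately and file a review ticket.
Do not proceed to Task 52.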
Task 52: 01:55 - pg_dump 备份
Files:
- Output: /opt/topfans/backups/pre-loadtest-YYYYMMDD-HHMM.sql (on prod)
- Step 1: Back up on prod
ssh root@101.132.250.62 <<'EOF'
mkdir -p /opt/topfans/backups
export PGPASSWORD="${DB_PASSWORD:-postgres123}"
PG_CONTAINER=$(docker ps --filter 'name=postgres' --format '{{.Names}}' | grep -v exporter | head -1)
# pg_dump writes to stdout and the redirect runs on the host; "-f <path>" would write inside the container
docker exec -e PGPASSWORD="$PGPASSWORD" "$PG_CONTAINER" pg_dump -U postgres -d topfans \
> /opt/topfans/backups/pre-loadtest-$(date +%Y%m%d-%H%M).sql
ls -lh /opt/topfans/backups/pre-loadtest-*.sql
EOF
Expected: 输出文件 > 50MB。
Task 53: 02:00 - prod seed
Files:
- Output: users.csv on prod and on the load source
- Step 1: Run seed on prod
ssh root@101.132.250.62 <<'EOF'
cd /opt/topfans/loadtest
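# Anchor on the key name and keep everything after the first '=' (secrets may contain '=')
export DB_PASSWORD=$(grep '^DB_PASSWORD=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
export JWT_SECRET=$(grep '^JWT_SECRET=' /opt/topfans/docker/.env.prod | cut -d= -f2-)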
./seed --db-name=topfans --jwt-secret="$JWT_SECRET" 2>&1 | tail -20
EOF
Expected: 全部 ✓, "✅ seed + tokens completed"。
- Step 2: 复制 users.csv 到压源
scp -3 -i ~/.ssh/loadgen-key root@101.132.250.62:/opt/topfans/loadtest/users.csv root@<压源IP>:/opt/loadgen/users.csv
- Step 3: 验证 users.csv 1001 行
ssh -i ~/.ssh/loadgen-key root@<压源IP> "wc -l /opt/loadgen/users.csv"
Expected: 1001。
Task 54: 02:01 - 启动 sample.sh 后台
Files:
- Output: metrics-feed.jsonl on prod
- Step 1: Start
ssh root@101.132.250.62 <<'EOF'
nohup bash /opt/topfans/loadtest/monitor/sample.sh > /tmp/sample.log 2>&1 &
echo "started pid $!"
sleep 3
tail -2 /opt/topfans/loadtest/metrics-feed.jsonl
EOF
Expected: 看到 pg_active / disk_free 行。
Task 55: 02:02-02:29 - Baseline phase (27 min: 7 × 3 min runs + 6 × 1 min buffers)
Files:
- Output: baseline-*.json × 7
- Step 1: Run the baseline on the load source
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
cd /opt/loadgen
mkdir -p reports/run-$(date +%Y%m%d-%H%M%S)
for s in S1 S2 S3 S4 S5 S6 S7; do
echo "=== $s baseline ==="
/opt/loadgen/loadgen --cmd=run --scenarios=$s --rps=1 --duration=3m --monitor=lite 2>&1 | tee -a reports/baseline-$s.log
echo "" >> reports/baseline-$s.log
if [ "$s" != S7 ]; then sleep 60; fi # 1 min buffer between scenarios, none after the last
done
EOF
Expected: 7 个场景各 3min,err 接近 0%,P50/P95/P99 稳定。
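Task 56: 02:29-05:35 - Step ramp phase (12 min per scenario + 15 min buffer between scenarios)
Files:
- Output: step-*.json × 7
- Step 1: Run the step ramp on the load source (each scenario uses its own RPS ladder from the spec §5.3 table)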
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
cd /opt/loadgen
# spec §5.3 每场景独立 RPS 阶梯(每阶 2min)
declare -A SCHEDULES=(
[S1]="2,5,10,15,25,40"
[S2]="20,50,100,200,400,700"
[S3]="5,10,20,40,80,150"
[S4]="5,10,20,30,50,80"
[S5]="2,5,10,20,35,60"
[S6]="20,50,100,200,400,700"
[S7]="5,10,20,40,80,150"
)
for s in S1 S2 S3 S4 S5 S6 S7; do
schedule="${SCHEDULES[$s]}"
echo "=== $s step (RPS=$schedule) ==="
/opt/loadgen/loadgen --cmd=run --scenarios=$s --stage=step \
--step-schedule="$schedule" --duration=2m --inter-scenario-pause=0 \
2>&1 | tee -a reports/step-$s.log
echo "" >> reports/step-$s.log
if [ "$s" != S7 ]; then sleep 900; fi # 15 min buffer, none after the last scenario (keeps the 05:35 end)
done
EOF
预期时间盒(交错执行,总 ~174 min):
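- 02:29-02:41 S1 ramp (12 min), buffer until 02:56
- 02:56-03:08 S2, buffer until 03:23
- 03:23-03:35 S3, buffer until 03:50
- 03:50-04:02 S4 (plus SSH reset time between stages), buffer until 04:17
- 04:17-04:29 S5, buffer until 04:44
- 04:44-04:56 S6, buffer until 05:11
- 05:11-05:23 S7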
- ... 共 7 场景交错,最后场景 05:23 结束 + 12min buffer = 05:35
Task 57: 05:35-05:40 - 停 monitor
- Step 1: 停 sample.sh + 收尾
ssh root@101.132.250.62 "pkill -f sample.sh || true"
ssh root@101.132.250.62 "ps aux | grep sample.sh | grep -v grep" # 确认无残留
- Step 2: 拉回 metrics-feed.jsonl
scp root@101.132.250.62:/opt/topfans/loadtest/metrics-feed.jsonl docs/loadtest/round1/monitoring/sample-$(date +%Y%m%d-%H%M).log
Task 58: 05:40-05:50 - cleanup + verify
- Step 1: cleanup
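ssh root@101.132.250.62 'cd /opt/topfans/loadtest && export DB_PASSWORD=$(grep "^DB_PASSWORD=" /opt/topfans/docker/.env.prod | cut -d= -f2-) && ./seed --db-name=topfans --cleanup'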
- Step 2: verify
ssh -i ~/.ssh/loadgen-key root@<压源IP> "/opt/loadgen/loadgen --cmd=verify --prod-ssh=root@101.132.250.62 2>&1 | tee /opt/loadgen/verify.log"
- Step 3: 拉回 verify.log
scp -i ~/.ssh/loadgen-key root@<压源IP>:/opt/loadgen/verify.log docs/loadtest/round1/monitoring/verify.log
- Step 4: 拉回 reports/
scp -r -i ~/.ssh/loadgen-key root@<压源IP>:/opt/loadgen/reports/* docs/loadtest/round1/raw-data/
- Step 5: 释放压源 ECS(如不再用)
暂不释放 — 第二轮压测还要用。
Phase 10: 报告与决策(Day 3)
Task 59: 生成 report-round1.md
Prerequisite: the `runReport` placeholder from Task 20 must be implemented first (otherwise the `--cmd=report` subcommand is an empty function).
- Step 0: 实现 runReport(在 main.go)
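import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sort"
	"strings"

	"github.com/topfans/backend/scripts/loadgen/loadgen/reporter"
)

// runReport builds baseline.csv, one SVG plot per scenario and the markdown report.
// Assumed layout (matching the Task 55/56 outputs): run-*/baseline-<ID>.json holds a
// reporter.RunReport written by WriteJSON; run-*/step-<ID>.json holds a reporter.ScenarioReport.
// The schemas differ, so each is decoded into its own type.
func runReport(inputDir, output string) error {
	// 1. Baseline: one RunReport per scenario, keyed by the ID in the file name.
	baseline := map[string]reporter.RunReport{}
	var ids []string
	files, _ := filepath.Glob(filepath.Join(inputDir, "run-*", "baseline-*.json"))
	for _, m := range files {
		var r reporter.RunReport
		if err := readJSON(m, &r); err != nil {
			log.Printf("skip %s: %v", m, err)
			continue
		}
		id := strings.TrimSuffix(strings.TrimPrefix(filepath.Base(m), "baseline-"), ".json")
		if _, seen := baseline[id]; !seen {
			ids = append(ids, id)
		}
		baseline[id] = r // Glob returns paths in lexical order, so the newest run-* wins
	}
	sort.Strings(ids)
	if err := reporter.WriteBaselineCSV(filepath.Join(inputDir, "baseline.csv"), ids, baseline); err != nil {
		return fmt.Errorf("write baseline: %w", err)
	}
	// 2. Step ramps: one ScenarioReport per file.
	var scenarios []reporter.ScenarioReport
	files, _ = filepath.Glob(filepath.Join(inputDir, "run-*", "step-*.json"))
	for _, m := range files {
		var sr reporter.ScenarioReport
		if err := readJSON(m, &sr); err != nil {
			log.Printf("skip %s: %v", m, err)
			continue
		}
		scenarios = append(scenarios, sr)
	}
	// 3. One SVG plot per scenario.
	for _, s := range scenarios {
		plotPath := filepath.Join(inputDir, "plot-"+s.ID+".svg")
		if err := reporter.PlotRPSLatencyError(s.ID, samplesFromStages(s.Stages), plotPath); err != nil {
			log.Printf("plot %s failed: %v", s.ID, err)
		}
	}
	// 4. Markdown report.
	return reporter.GenerateMarkdown(output, scenarios)
}

func readJSON(path string, v any) error {
	data, err := os.ReadFile(path)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, v)
}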
func samplesFromStages(stages []reporter.StageReport) []reporter.Sample {
out := make([]reporter.Sample, len(stages))
for i, s := range stages {
out[i] = reporter.Sample{RPS: float64(s.RPS), P99Ms: s.P99Ms, ErrorRate: s.ErrorRate}
}
return out
}
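When implementing, merge the imports `runReport` needs (at least `encoding/json`, `path/filepath` and `github.com/topfans/backend/scripts/loadgen/loadgen/reporter`) into the existing `import` block of main.go.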
- Step 1: 在压源跑 reporter
ssh -i ~/.ssh/loadgen-key root@<压源IP> <<'EOF'
cd /opt/loadgen
ls -1 reports/
/opt/loadgen/loadgen --cmd=report --input=reports/ --output=report-round1.md
cat report-round1.md | head -100
EOF
- Step 2: 拉回报告
scp -i ~/.ssh/loadgen-key root@<压源IP>:/opt/loadgen/report-round1.md docs/loadtest/round1/report-round1.md
- Step 3: 拉回 SVG 图
scp -i ~/.ssh/loadgen-key root@<压源IP>:/opt/loadgen/reports/plot-*.svg docs/loadtest/round1/raw-data/
Task 60: 审查会议(30-60 min)
- Step 1: 阅读 report-round1.md
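Open docs/loadtest/round1/report-round1.md and identify:
- the knee RPS of each scenario
- the main bottlenecks (S1 bcrypt? S5 PG aggregation? S6 Redis cache misses?)
- which can be fixed by configuration (bcrypt cost, max_connections)
- which need code changes (add an index, fix N+1 queries)
- which are accepted as-is
- Step 2: Write the review notes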
创建 docs/loadtest/round1/review-notes.md:
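```markdown
# Round 1 Load Test Review

> **Date**: YYYY-MM-DD
> **Participants**: <name1>, <name2>

## Knee-point summary

| Scenario | Knee RPS | Target | Gap | Action |
|---|---|---|---|---|

## Main bottlenecks (by priority)

1. **<Bottleneck A>**: <scenario> P99=Xms, expected Yms
2. **<Bottleneck B>**: ...

## Decisions

- [ ] Fix 1: <description> + owner + deadline
- [ ] Fix 2: ...
- [ ] Accept as-is: <scenario + reason>
```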
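To fill the knee-point summary the same way for every scenario, the knee can be read off the per-stage results mechanically. The following is a minimal sketch under one assumed definition: the knee is the last ramp stage before either P99 latency exceeds the scenario's SLA or the error rate exceeds a threshold. The `Stage` type, the `kneeRPS` function and the thresholds in `main` are hypothetical. The real limits should come from the spec's SLA targets and the R1-R6 red lines:

```go
package main

import "fmt"

// Stage is a hypothetical per-stage summary (one step of the RPS ramp).
type Stage struct {
	RPS       float64 // RPS of this step
	P99Ms     float64 // P99 latency in ms
	ErrorRate float64 // failed requests / total, 0..1
}

// kneeRPS returns the RPS of the last stage that met both limits, and
// false if even the first stage violated them. Stages must be ordered by
// increasing RPS. If every stage passes, the result is only a lower bound.
func kneeRPS(stages []Stage, maxP99Ms, maxErrRate float64) (float64, bool) {
	knee, found := 0.0, false
	for _, s := range stages {
		if s.P99Ms > maxP99Ms || s.ErrorRate > maxErrRate {
			break // first violating stage: the previous one is the knee
		}
		knee, found = s.RPS, true
	}
	return knee, found
}

func main() {
	// Illustrative values only, not real measurements.
	stages := []Stage{
		{RPS: 50, P99Ms: 40},
		{RPS: 100, P99Ms: 60, ErrorRate: 0.001},
		{RPS: 200, P99Ms: 450, ErrorRate: 0.03},
	}
	knee, ok := kneeRPS(stages, 200, 0.01) // hypothetical SLA: P99 <= 200 ms, errors <= 1%
	fmt.Println(knee, ok)
}
```

The function stops at the first violating stage rather than skipping it. Once a service is saturated, a later stage that happens to look healthy usually means load was shed, so it should not be counted as recovered capacity.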
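Task 61: Decide whether to run a second round
- Step 1: Check the trigger conditions

| Condition | Second round? |
|---|---|
| Knee points already meet business expectations | ❌ Skip |
| Code or configuration was changed | ✅ Rerun the affected scenarios |
| An unexplained anomaly appeared | ✅ Retest |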
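The table does not state which row wins when several conditions hold at once. The sketch below writes the rule down under one assumption: any ✅ row overrides the ❌ row, so a change or an anomaly triggers round 2 even when knee points already meet expectations. The case where none of the conditions hold is not covered by the table and is left to the review meeting. `Round1Outcome` and `decideRound2` are hypothetical names:

```go
package main

import "fmt"

// Round1Outcome holds the three conditions from the decision table (hypothetical type).
type Round1Outcome struct {
	KneeMeetsTarget     bool // knee points already meet business expectations
	CodeOrConfigChanged bool // fixes were applied after round 1
	UnexplainedAnomaly  bool // behaviour that nobody could explain
}

// decideRound2 applies the table, assuming any ✅ row overrides the ❌ row.
func decideRound2(o Round1Outcome) (bool, []string) {
	var reasons []string
	if o.CodeOrConfigChanged {
		reasons = append(reasons, "rerun the scenarios affected by the change")
	}
	if o.UnexplainedAnomaly {
		reasons = append(reasons, "retest to reproduce the anomaly")
	}
	if len(reasons) > 0 {
		return true, reasons
	}
	if o.KneeMeetsTarget {
		return false, []string{"knee points meet expectations"}
	}
	// Targets missed, but nothing changed and nothing unexplained: not covered by the table.
	return false, []string{"not covered by the table: decide in the review meeting"}
}

func main() {
	run, reasons := decideRound2(Round1Outcome{KneeMeetsTarget: true, CodeOrConfigChanged: true})
	fmt.Println(run, reasons)
}
```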
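- Step 2: Write decision.md
Create docs/loadtest/round1/decision.md:
```markdown
# Post-Round-1 Decision

> **Date**: YYYY-MM-DD

## Decision

- [✅ Trigger round 2 / ❌ Skip round 2]

## Rationale

- <reason 1>
- <reason 2>

## Round 2 scope

- Scenarios: S<X>, S<Y>
- Changes: <what was changed>
- Window: round 2, 02:00-06:00
```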
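Task 62: Round 2 prerequisite: re-sign JWTs (if more than 7 days have passed since round 1)
spec §6.3 C1: JWTs expire after 7 days. If round 2 starts more than 7 days after round 1, re-sign the tokens 30 min before the load starts.
- Step 1: Decide whether re-signing is needed
```bash
# Days since round 1, from the local report file's mtime (GNU stat; on macOS use `stat -f %m`).
# The mtime reflects when the file was copied or checked out, so treat it as approximate; if in doubt, re-sign.
DAYS_SINCE_ROUND1=$(( ( $(date +%s) - $(stat -c %Y docs/loadtest/round1/report-round1.md) ) / 86400 ))
# Integer division rounds down (7.9 days -> 7), so ">7 days" means -ge 7, not -gt 7
if [ "$DAYS_SINCE_ROUND1" -ge 7 ]; then
  echo "⚠️ $DAYS_SINCE_ROUND1 days since round 1: JWTs must be re-signed"
fi
```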
- Step 2: Re-sign the tokens on prod
```bash
ssh root@101.132.250.62 <<'EOF'
set -e
cd /opt/topfans/loadtest
# '^KEY=' avoids matching similarly named keys; -f2- keeps values that contain '='
export DB_PASSWORD=$(grep '^DB_PASSWORD=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
export JWT_SECRET=$(grep '^JWT_SECRET=' /opt/topfans/docker/.env.prod | cut -d= -f2-)
./seed --reset-tokens --jwt-secret="$JWT_SECRET"
EOF
```
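- Step 3: Sync users.csv to the load-generator host
```bash
# -3 routes the copy through the local machine, so prod needs no credentials for the loadgen host
scp -3 -i ~/.ssh/loadgen-key root@101.132.250.62:/opt/topfans/loadtest/users.csv root@<loadgen-IP>:/opt/loadgen/users.csv
```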
The full implementation plan for round 2 should be created as a new document, `docs/superpowers/plans/YYYY-MM-DD-load-testing-round2.md` (out of scope for this plan).
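Completion criteria
The plan as a whole is complete when:
- All tasks in Phases 1-8 are complete
- The Phase 9 round-1 load-test window ran end to end and produced `report-round1.md`
- The Phase 10 review and decision are complete, plus the (optional) Task 62 JWT re-signing
- The `docs/loadtest/round1/` directory is complete (prod-vs-local-schema-diff.md, preflight.log, sample-*.log, baseline/step logs, report-round1.md, review-notes.md, decision.md)
- All code is committed, with no uncommitted .bak / .log files
- The CLAUDE.md sequence-reset convention is implemented in seed/sequences.go
- All 6 red-line dimensions (R1-R6) from spec §7.3 are wired into CircuitBreaker
- The RPS step schedule from spec §5.3 is injected via the `--step-schedule` flag (not hard-coded)
- The request paths for scenarios S2-S7 were confirmed by grepping the handler source before implementation (see the table earlier in the plan)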
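Risks and rollback

| Risk | Trigger | Rollback |
|---|---|---|
| seed fails on prod (permissions/schema) | Phase 4 Task 49 integration test fails | Reproduce locally with dry-run; fix the code before going back to prod |
| loadgen fails to compile | Any `go build` error | Fix the code, rebuild, scp again |
| A preflight check fails | Phase 9 Task 51 | Do not start the load; review the next day; the window may be rescheduled |
| The load test trips a red line | Any Phase 9 stage | Run emergency-stop.sh; review the next day |
| Real data is polluted | Detected by verify | Run restore-from-backup.sh and review the seed cleanup |
| Alibaba Cloud snapshot times out | Task 52 backup fails | Retry; if it still fails, postpone the whole window |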
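Notes
- All commands were integration-tested against the local docker setup before 23:00 on Day 1
- SSH: before starting, confirm that `~/.ssh/loadgen-key` exists
- DB_PASSWORD / JWT_SECRET come from `/opt/topfans/docker/.env.prod` on prod
- If round 2 is triggered, create a new plan document `docs/superpowers/plans/YYYY-MM-DD-load-testing-round2.md`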