23 KiB
用户贡献连击聚合推送方案
1. 概述
1.1 背景
应援活动页面(support-activity/index.vue)实时展示用户的贡献动态(ContributionList.vue)。同一个用户在短时间内连续多次送同一个道具时,应当合并为一条 UI 项(数量累加),而不是显示 N 条独立的"道具×1"。
1.2 当前问题(实测)
用户连续点击"道具A" 3 次,前端实际显示:
小明 送烟花 ×1 ← 第 1 条
小明 送烟花 ×2 ← 第 2 条
小明 送烟花 ×3 ← 第 3 条
期望显示:
小明 送烟花 ×3 ← 唯一一条
1.3 根因
后端已有 Redis 连击计数机制(combo:{userID}:{itemType},TTL 3 秒,INCR),但只把累计值贴在每条 record 的 combo_count 字段上,并没有真正合并推送:
PurchaseItem每次购买都会:- 写一行
activity_contributions(DB 行 ID 单调递增) INCR combo计数器- 立即
Publish一条独立的ContributionRecord到 Redis Channelact:{activityId}:contributions
- 写一行
- 前端
useContributionRealtime.js:31-34收到 WS 推送后直接[...records, record].slice(-5)追加 - 模板
ContributionList.vue:43-46用combo_count > 1 ? combo_count : quantity显示:第 1 条 combo_count=1 走 quantity 分支显示 ×1,第 2 条 combo_count=2 显示 ×2 ……
也就是说 Redis 聚合是"半个聚合":字段存在但推送层和展示层都没合并。
1.4 目标
| 项目 | 目标 |
|---|---|
| 用户感知 | 同一用户 3 秒内送同一道具 N 次 → 前端只看到 1 条 UI 项,数量 = N |
| 后端约束 | DB 仍然每条购买写 1 行(排行榜/统计需要粒度) |
| 推送约束 | WS 在 3 秒窗口内只推送 1 条合并后的 record |
| 前端约束 | 模板与现有轮询/WS 接收逻辑改动最小 |
| 可靠性 | 多副本 / 进程重启不丢推送 |
2. 设计总览
2.1 数据流
用户第 1 次购买 ──┐
├─→ DB: 各写 1 行 contribution(共 N 行独立行)
用户第 2 次购买 ──┤
├─→ Redis Stream + Hash: 累加 aggregate 字段(quantity、首次信息)
用户第 N 次购买 ──┘
↓
(3 秒窗口结束) ──→ Worker 从 Stream 取出到期条目 ──→ Publish 1 条合并 record ──→ 前端追加 1 条 UI 项
2.2 设计选型
延迟推送的 3 种候选方案对比:
| 方案 | 可靠性 | 实现成本 | 适用场景 |
|---|---|---|---|
A. goroutine + time.Sleep |
⚠ 中(重启丢推送) | ⭐ 最低 | 单副本、低 QPS |
| B. Redis Stream 延迟队列 | ✅ 高(Stream 持久化、重启不丢、消费者组支持多副本) | ⭐⭐ 中 | 本项目采用:多副本、生产级 |
| C. Redis Keyspace Notifications | ⚠ 中(订阅断开期间事件丢失) | ⭐⭐ 中 | Redis 由运维统一管理 |
采用 B:项目已多副本 / 容器化部署(CLAUDE.md 提到 docker 目录、Kitex 微服务),goroutine 方案重启即丢推送,不可接受。
3. Redis Key 设计
| Key | 类型 | TTL | 用途 |
|---|---|---|---|
combo:stream:contributions |
Stream | 永久(MAXLEN 限制) | 延迟推送任务队列 |
combo:agg:{userID}:{itemType} |
Hash | 5s | 同一 (user, itemType) 在 3 秒窗口内的累计信息 |
combo:agg:lock:{userID}:{itemType} |
String "1" |
3s | "是否已写入 Stream",用 SET NX EX 3 防重复入队 |
Stream entry 字段:
activity_id user_id item_type first_id first_created_at expire_at_ms
expire_at_ms= 入队时now + 3000,worker 取出后判断now >= expire_at_ms才推送MAXLEN ~ 100000(仅作兜底,正常负载远低于此)
agg Hash 字段:
activity_id user_id star_id item_id item_name item_icon
nickname avatar_url quantity first_id first_created_at
quantity用HINCRBY累加- 其余字段首次写入后保持不变
4. 后端改造
4.1 文件改动清单
| 文件 | 改动 |
|---|---|
services/activityService/service/activity_service.go |
PurchaseItem 改造;新增 enqueueAggregatedContribution、consumeComboStream |
services/activityService/service/combo_worker.go |
新增:Stream worker(独立 goroutine,main 启动) |
services/activityService/main.go |
启动 worker goroutine |
services/activityService/service/activity_service.go::GetLatestContributions |
combo_count 来源改为 HGET combo:agg quantity,聚合已过期时回退为 1 |
4.2 PurchaseItem 改造点(activity_service.go:432-473)
// === PurchaseItem 改造 ===
// 1. DB 写入不变
err = s.activityRepo.CreateContribution(contribution)
// 2. 删除原来的:incrementComboCount + 立即 Publish
// 3. 新增:写入聚合 Hash + 入队 Stream
if s.redisClient != nil {
aggKey := fmt.Sprintf("combo:agg:%d:%s", userID, req.ItemType)
lockKey := fmt.Sprintf("combo:agg:lock:%d:%s", userID, req.ItemType)
// (a) 累加 quantity + 首次写入其他字段
// 注:先全部写入,最后再 Expire —— 避免 hash 刚创建就被先 EXPIRE 抹掉 TTL
s.redisClient.HIncrBy(ctx, aggKey, "quantity", int64(req.Quantity))
// HSetNX 保护非 quantity 字段不覆盖
s.redisClient.HSetNX(ctx, aggKey, "activity_id", req.ActivityId)
s.redisClient.HSetNX(ctx, aggKey, "user_id", userID)
s.redisClient.HSetNX(ctx, aggKey, "star_id", req.StarId)
s.redisClient.HSetNX(ctx, aggKey, "item_id", item.ID)
s.redisClient.HSetNX(ctx, aggKey, "item_name", itemName)
s.redisClient.HSetNX(ctx, aggKey, "item_icon", itemIcon)
s.redisClient.HSetNX(ctx, aggKey, "nickname", nickname)
s.redisClient.HSetNX(ctx, aggKey, "avatar_url", avatarURL)
s.redisClient.HSetNX(ctx, aggKey, "first_id", contribution.ID)
s.redisClient.HSetNX(ctx, aggKey, "first_created_at", contribution.CreatedAt)
// 所有写入完成后再续 TTL,确保 3 秒窗口内 hash 始终存在
s.redisClient.Expire(ctx, aggKey, 5*time.Second)
// (b) 仅首次入队 Stream(SET NX EX 3 防重复)
isFirst, _ := s.redisClient.SetNX(ctx, lockKey, "1", 3*time.Second).Result()
if isFirst {
expireAt := time.Now().Add(3 * time.Second).UnixMilli()
s.redisClient.XAdd(ctx, &redis.XAddArgs{
Stream: "combo:stream:contributions",
MaxLen: 100000,
Approx: true,
Values: map[string]interface{}{
"activity_id": req.ActivityId,
"user_id": userID,
"item_type": req.ItemType,
"first_id": contribution.ID,
"first_created_at": contribution.CreatedAt,
"expire_at_ms": expireAt,
},
})
}
}
4.3 新增:combo_worker.go(Stream 消费者)
package service
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
"github.com/zerosaturation/go-topsdk/util/logger"
pb "topfans/pkg/proto/activity"
)
// StartComboStreamWorker 启动 Stream 消费者(main 中调用一次)
func (s *activityService) StartComboStreamWorker(ctx context.Context) {
streamKey := "combo:stream:contributions"
consumerGroup := "combo-publishers"
consumerName := fmt.Sprintf("worker-%d", time.Now().UnixNano()%100000)
// 创建消费者组(已存在则忽略 BUSYGROUP)
_ = s.redisClient.XGroupCreateMkStream(ctx, streamKey, consumerGroup, "0").Err()
go func() {
for {
select {
case <-ctx.Done():
return
default:
}
// 阻塞读取新条目(最多阻塞 1s)
streams, err := s.redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: consumerGroup,
Consumer: consumerName,
Streams: []string{streamKey, ">"},
Count: 10,
Block: 1 * time.Second,
}).Result()
if err == redis.Nil {
continue
}
if err != nil {
logger.Logger.Warn("XReadGroup error", zap.Error(err))
time.Sleep(time.Second)
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
s.processComboEntry(ctx, msg)
// 确认消费
s.redisClient.XAck(ctx, streamKey, consumerGroup, msg.ID)
}
}
}
}()
}
// processComboEntry 处理一条 Stream 条目:
// - 等到 expire_at_ms 才推送(保证窗口结束)
// - 读取 agg Hash 推送合并 record
func (s *activityService) processComboEntry(ctx context.Context, msg redis.XMessage) {
expireAt, _ := strconv.ParseInt(msg.Values["expire_at_ms"].(string), 10, 64)
now := time.Now().UnixMilli()
if now < expireAt {
// 还没到窗口结束,挂起一段时间后再处理(用 XClaim 重投递或本地 sleep)
time.Sleep(time.Duration(expireAt-now) * time.Millisecond)
}
userID, _ := strconv.ParseInt(msg.Values["user_id"].(string), 10, 64)
activityID, _ := strconv.ParseInt(msg.Values["activity_id"].(string), 10, 64)
itemType := msg.Values["item_type"].(string)
aggKey := fmt.Sprintf("combo:agg:%d:%s", userID, itemType)
lockKey := fmt.Sprintf("combo:agg:lock:%d:%s", userID, itemType)
fields, err := s.redisClient.HGetAll(ctx, aggKey).Result()
if err != nil || len(fields) == 0 {
return
}
quantity, _ := strconv.ParseInt(fields["quantity"], 10, 32)
firstID, _ := strconv.ParseInt(fields["first_id"], 10, 64)
firstCreatedAt, _ := strconv.ParseInt(fields["first_created_at"], 10, 64)
record := &pb.ContributionRecord{
Id: firstID,
UserId: userID,
Nickname: fields["nickname"],
AvatarUrl: fields["avatar_url"],
StarId: parseInt64(fields["star_id"]),
ItemId: parseInt64(fields["item_id"]),
ItemType: itemType,
ItemName: fields["item_name"],
ItemIcon: fields["item_icon"],
Quantity: int32(quantity),
ComboCount: int32(quantity),
CreatedAt: firstCreatedAt,
}
payload, _ := json.Marshal(map[string]interface{}{
"activity_id": activityID,
"type": "contributions_response",
"record": record,
})
s.redisClient.Publish(ctx, fmt.Sprintf("act:%d:contributions", activityID), payload)
s.redisClient.Del(ctx, aggKey, lockKey)
}
窗口等待策略:worker 取出条目后用本地
time.Sleep等待expire_at_ms再推送。简单可靠;如果 worker 重启,条目会通过消费者组的 pending list 重新投递,重启期间不会丢推送。
4.4 main.go 改动
// services/activityService/main.go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 在 NewActivityService 之后启动 worker
activityService := service.NewActivityService(...)
activityService.StartComboStreamWorker(ctx)
4.5 删除旧代码
删除 activity_service.go::incrementComboCount 和 getComboCount(已被新方案取代),但保留 comboKey 以备工具方法调用。
4.6 GetLatestContributions 改造(轮询路径合并)
GetLatestContributions 当前从 SQL 返回每条独立 contribution 行。3 秒窗口内同一个 user+item 的 N 行必须合并,否则前端轮询路径会出现"道具×1、×2、×3"。
改造点:在 SQL 查询后、组装响应前,对窗口内的行做内存合并:
// GetLatestContributions 末尾,组装 records 之前
// 1. SQL 返回按 created_at DESC 排序的独立行
// 2. 按 (user_id, item_id) 分组,3 秒内累加 quantity、id 取最早的(first_id)
const COMBO_WINDOW_MS = 3000
merged := make([]*pb.ContributionRecord, 0, len(contributions))
i := 0
for i < len(contributions) {
first := contributions[i]
j := i + 1
var totalQty int32 = first.Quantity
for j < len(contributions) &&
contributions[j].UserID == first.UserID &&
contributions[j].ItemID == first.ItemID &&
contributions[j].CreatedAt-first.CreatedAt <= COMBO_WINDOW_MS {
totalQty += contributions[j].Quantity
j++
}
records[i] = &pb.ContributionRecord{
Id: first.ID, // 用 first_id,前端按 id 去重
...
Quantity: totalQty,
ComboCount: totalQty,
CreatedAt: first.CreatedAt,
}
i = j
}
关键不变量:合并后的 record.id = 同组最早一行 DB 行的 ID,与 WS 推送保持一致 —— 前端按 id 去重时不会把"轮询拉到的一行"与"WS 推过来的合并 record"当成两条不同的 UI 项。
5. 前端处理
5.1 改动范围
| 文件 | 改动 |
|---|---|
frontend/pages/support-activity/composables/useContributionRealtime.js |
抽取 mergeComboRecords 工具函数;WS 与轮询两条路径共用,按 (user_id, item_id) + 3 秒窗口合并 |
frontend/pages/support-activity/composables/useContributionPolling.js |
调用 mergeComboRecords 后再写入 records |
frontend/pages/support-activity/components/ContributionList.vue |
模板去掉 combo_count 兜底分支,直接用 quantity |
5.2 新增:composables/useContributionRealtime.js::mergeComboRecords
/**
* 将一组 contribution record 按 (user_id, item_id) + 时间窗口合并
* - 同一 user+item 在 COMBO_WINDOW_MS 内多条 → 合并为 1 条
* - 合并后 id = 同组最早 id(与后端 WS 推送的 first_id 一致)
* - quantity = 同组 quantity 之和
* - 顺序保持输入顺序(最新在前)
*/
const COMBO_WINDOW_MS = 3000
export function mergeComboRecords(records) {
if (!Array.isArray(records) || records.length === 0) return records
// 已有 records 索引(按 user_id+item_id 找最新一条)
const indexByKey = new Map() // key = `${user_id}:${item_id}` -> records index
const result = [...records]
// 倒序遍历:新→旧,新的覆盖旧的(聚合窗口内则合并到最新一条)
for (let i = result.length - 1; i >= 0; i--) {
const r = result[i]
const key = `${r.user_id}:${r.item_id}`
const existingIdx = indexByKey.get(key)
if (existingIdx !== undefined) {
const existing = result[existingIdx]
// 两者时间差在 3 秒内 → 合并
if (Math.abs(existing.created_at - r.created_at) <= COMBO_WINDOW_MS) {
existing.quantity += r.quantity
existing.combo_count = existing.quantity
// id 取较早的(first_id)
if (r.id < existing.id) existing.id = r.id
// 删除被合并的旧条目
result.splice(i, 1)
continue
}
}
indexByKey.set(key, i)
}
return result
}
5.3 WS 与轮询两条路径都调用合并
WS 路径(useContributionRealtime.js::onWsMessage):
function onWsMessage(payload) {
if (!payload || Number(payload.activity_id) !== Number(activityId.value)) return
if (!payload.record) return
const record = payload.record
if (record.id > highestIdRef()) {
const merged = mergeComboRecords([...records.value, record])
records.value = merged.slice(-MAX_RECORDS)
}
}
轮询路径(useContributionPolling.js::fetchLatest):
// 在 [...newRecords, ...records.value].slice(0, MAX_RECORDS) 后增加 mergeComboRecords
if (isNew) {
records.value.forEach(resetRecordTimer)
const combined = mergeComboRecords([...newRecords, ...records.value])
records.value = combined.slice(0, MAX_RECORDS)
newRecords.forEach(resetRecordTimer)
latestTimestamp = firstRecord.created_at
latestId = firstRecord.id
}
5.4 模板调整(ContributionList.vue:43-46)
Before:
<text
class="item-count"
:class="getCountSizeClass(record.combo_count > 1 ? record.combo_count : record.quantity)"
>{{ record.combo_count > 1 ? record.combo_count : record.quantity }}</text>
After:
<text
class="item-count"
:class="getCountSizeClass(record.quantity)"
>{{ record.quantity }}</text>
后端合并后
quantity === combo_count,模板可直接用quantity;getCountSizeClass函数保留。useContributionRealtime.js:31-34的record.id > highestIdRef()去重保护保留,作为幂等兜底。
6. 错误处理
| 场景 | 处理 |
|---|---|
| Redis 不可用 | 降级为每条 Purchase 立即 Publish 单条 record(保留旧行为) |
| Worker 进程崩溃 / 重启 | 消费者组 pending list 在重启时 XAUTOCLAIM 重新投递,不丢推送 |
| 用户在 3 秒末尾继续点击(marker 已被 worker 取出但尚未 Publish) | 此时 hash 的 quantity 已累加;worker 取出时 sleep 到 expire_at_ms 后会读取到最新 quantity,合并推送完整结果 |
| HSetNX 之前 hash 已被 worker 删除(极小窗口竞态) | processComboEntry 读到空 hash → 直接 return;下一条 purchase 会重新入队 |
| 同一 user 不同 item_type | 不同 key,互不影响 |
| 跨 activity | act:{activityId}:contributions channel 已带 activity_id,前端按 activity_id 过滤 |
7. 性能与一致性
7.1 性能指标
- DB 写入:每次 Purchase 1 行(与现状一致)
- Redis 操作:1 次 HINCRBY + N 次 HSETNX + 1 次 SETNX + 1 次 XADD ≈ 4-6 次 Redis 调用(Pipeline 可优化到 1 次)
- WS 推送:3 秒内 N 次 Purchase → 1 条合并 record(与现状 N 条相比降低 N 倍推送频率)
- Worker 开销:单 goroutine,每秒 XReadGroup 一次,常驻内存 < 1MB
7.2 排行榜一致性
activity_contributions 表每条独立行写入不变:
GetContributionRanking(activity_service.go:811-919)按 sum(quantity) 聚合- 3 秒窗口合并只在推送层做,不影响 DB 聚合结果
8. 影响范围
8.1 后端需要修改的文件
-
services/activityService/service/activity_service.goPurchaseItem替换 combo 计数与推送逻辑(activity_service.go:432-473)BatchPurchaseItem同样改造(activity_service.go:726-755)GetLatestContributions增加内存合并:按(user_id, item_id)+ 3 秒窗口累加 quantity(activity_service.go:1017-1104)- 删除
incrementComboCount、getComboCount(保留comboKey)
-
services/activityService/service/combo_worker.go(新增)StartComboStreamWorkerprocessComboEntry
-
services/activityService/main.go- 启动 worker goroutine
8.2 前端需要修改的文件
-
frontend/pages/support-activity/composables/useContributionRealtime.js- 新增
mergeComboRecords工具函数 onWsMessage调用 mergeComboRecords
- 新增
-
frontend/pages/support-activity/composables/useContributionPolling.js- 引入
mergeComboRecords fetchLatest在合并新记录后调用 mergeComboRecords
- 引入
-
frontend/pages/support-activity/components/ContributionList.vue- 模板去掉
combo_count兜底分支(line 43-46)
- 模板去掉
8.3 不需要修改的文件
gateway/socket/activity_socket.goRedis PubSub 订阅(payload 格式不变)gateway/controller/activity_controller.goHTTP 转换(响应字段不变)- Repository 层 SQL
- 数据库 schema
9. 测试验证
9.1 单元测试
PurchaseItem在 Redis 不可用时降级为立即推送(与旧行为一致)processComboEntry在 hash 为空时跳过- XAdd MAXLEN 不会无限增长
9.2 集成测试
| 场景 | 预期 |
|---|---|
| 单次购买 | 1 条 DB 行 + 1 条 WS 推送,quantity=1 |
| 1 秒内连买 3 次同一道具 | 3 条 DB 行 + 1 条合并 WS 推送,quantity=3,record.id = 第 1 条的 id |
| 1 秒内连买 3 次不同道具 | 3 条 DB 行 + 3 条独立 WS 推送(每条 quantity=1) |
| 跨 3 秒购买 | 第 1 段合并成 1 条推送;3 秒后再来 1 次 → 第 2 段合并成 1 条推送 |
| Redis 故障 | 降级为立即推送(与旧行为一致) |
| Worker 重启 | pending list 重新投递,不丢推送 |
| 多副本部署 | SETNX 保证仅 1 个副本入队;消费者组保证推送幂等 |
9.3 前端验证
- 快速点 3 次同一道具 → UI 仅 1 条新项,数量从 1 累加到 3
- 切换道具 → 不同 UI 项独立显示
- 跨 3 秒 → 出现 2 条独立 UI 项
10. 风险与缓解
| 风险 | 缓解 |
|---|---|
notify-keyspace-events 未开启导致 HSETNX 监听不到? |
不依赖 Keyspace Notifications,仅用 Stream |
XReadGroup Block 阻塞 → worker 协程泄漏 |
用 ctx.Done 退出 + main 关闭时 cancel |
| Stream 内存增长 | MAXLEN ~ 100000 截断 |
| HSetNX 与 HIncrBy 竞态导致 quantity 丢失? | HSetNX 在 IncrBy 之前,pipeline 内顺序保证 |
| worker 取条目后 sleep 期间崩溃 → 条目留在 pending | XAUTOCLAIM 重新认领(消费者组自带机制) |
combo_count 字段外部消费方 |
仅前端展示用,前端改造一并删除兜底分支 |
11. 实施步骤
- 在 Redis 创建消费者组
combo-publishers(运维一次性脚本,或在 worker 启动时 XGroupCreateMkStream) - 新增
combo_worker.go,实现StartComboStreamWorker+processComboEntry - 在
main.go启动 worker - 改造
PurchaseItem与BatchPurchaseItem:删除incrementComboCount/getComboCount;改为 agg hash + Stream 入队 - 改造
GetLatestContributions:增加内存合并逻辑(按 user_id+item_id + 3 秒窗口累加 quantity) - 前端新增
mergeComboRecords工具函数;WS 与轮询两条路径共用 - 前端模板:去掉
combo_count兜底分支 - 集成测试:单测、连击 3 次推送合并、轮询 API 合并、多副本部署验证
- 监控告警:worker pending list 长度告警、Stream 长度告警
12. 后续优化(可选)
- Pipeline 优化:把 HIncrBy + HSetNX + SetNX + XAdd 合并到一个 Pipeline
- 批量推送:worker 一次处理多条同 user 条目(用 consumer group 多个 consumer 并行)
- Lua 脚本:原子化"累加 + 入队",省 4-6 次 Redis 调用为 1 次