31 KiB
用户贡献实时显示方案
1. 概述
1.1 需求描述
在活动页面实时展示所有用户的贡献动态,包括:
- 贡献者头像 / 昵称
- 道具图标 / 名称
- 贡献数量
1.2 展示形式
- 列表形式:最新贡献显示在顶部
- 生命周期:页面可见时显示,不可见时暂停
- 消失逻辑:每条记录 5 秒无新数据则逐条淡出消失
1.3 技术方案
- 后端:Gateway 层新增 HTTP API,ActivityService 处理业务逻辑
- 前端:轮询 composable 与列表组件
- 数据库:
activity_contributions表已存在
2. 数据库设计
2.1 ER 关系
┌─────────────────┐ ┌──────────────────────────┐
│ activities │ │ activity_contributions │
│─────────────────│ │───────────────────────────│
│ id │──────< │ activity_id │
│ star_id │ │ user_id │
│ ... │ │ star_id │
└─────────────────┘ │ item_id │
│ item_type │
│ quantity │
│ contribution_points │
│ created_at(毫秒时间戳) │
└──────────────────────────┘
│
│ LEFT JOIN
▼
┌─────────────────┐
│ users │
│─────────────────│
│ id │
│ nickname │
│ avatar_url │
└─────────────────┘
│
│ LEFT JOIN
▼
┌─────────────────┐
│ activity_items │
│─────────────────│
│ id │
│ item_name │
│ icon_url │
└─────────────────┘
2.2 表结构
activity_contributions(活动贡献记录表)
| 字段 | 类型 | 约束 | 说明 |
|---|---|---|---|
| id | bigint | PK, AUTO_INCREMENT | 贡献记录 ID |
| activity_id | bigint | NOT NULL, INDEX | 关联活动 ID |
| user_id | bigint | NOT NULL | 贡献者用户 ID |
| star_id | bigint | NOT NULL | 明星 ID |
| item_id | bigint | NOT NULL | 道具 ID |
| item_type | varchar(50) | NOT NULL | 道具类型,如 firework、megaphone |
| quantity | int | DEFAULT 1 | 本次贡献数量 |
| crystal_spent | bigint | NOT NULL | 花费的水晶数量 |
| contribution_points | bigint | NOT NULL | 贡献值 |
| created_at | bigint | NOT NULL, INDEX | 创建时间(毫秒时间戳) |
索引设计
| 索引名 | 索引字段 | 类型 | 用途 |
|---|---|---|---|
| idx_activity_id | (activity_id) | BTREE | 按活动 ID 筛选 |
| idx_activity_created | (activity_id, created_at DESC, id DESC) | BTREE | 按活动时间降序+ID降序查询,支持 since_timestamp + since_id 轮询 |
created_at DESC索引可加速WHERE activity_id = ? AND created_at > sinceTimestamp查询。
2.3 写入时机
purchaseItem 成功后,同步写入此表。
连击计数器(Redis):
用户每次点击购买道具时,更新 Redis 连击计数器:
- Key:
combo:{user_id}:{item_type} - Value:当前连击数
- TTL:3 秒(无新点击则过期)
用户点击购买 → Redis INCR combo:{user_id}:{item_type} + EXPIRE 3秒
2.4 查询时获取连击数
前端轮询时,后端从 Redis 获取当前连击数,合并到返回结果中:
GET combo:{user_id}:{item_type} → 返回当前计数
连击逻辑:
- 同一用户、同一道具:3 秒内的多次点击累加显示
- 不同道具:独立计数,互不影响
3. 后端 API 设计
3.1 接口概述
| 接口 | 方法 | 路径 | 说明 |
|---|---|---|---|
| 获取最新贡献记录 | GET | /api/v1/activity/:activityId/contributions/latest |
实时轮询用 |
3.2 获取最新贡献记录接口
请求
GET /api/v1/activity/:activityId/contributions/latest
Query 参数
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| since_timestamp | int64 | 否 | 0 | 起始时间戳(毫秒),获取该时间之后的最新记录 |
| since_id | int64 | 否 | 0 | 起始记录 ID(用于同时间戳内的二次筛选) |
| limit | int32 | 否 | 1 | 每次拉取条数,最大 5 |
响应(成功)
{
"code": 0,
"message": "success",
"data": {
"records": [
{
"id": 12345,
"activity_id": 1001,
"user_id": 888,
"user_nickname": "小明",
"user_avatar": "https://example.com/avatar.jpg",
"item_type": "firework",
"item_name": "烟花",
"item_icon": "https://example.com/firework.png",
"quantity": 3,
"contribution_points": 3,
"combo_count": 2,
"created_at": 1747133400000
}
],
"latest_timestamp": 1747133400000,
"latest_id": 12345
}
}
字段说明:
| 字段 | 说明 |
|---|---|
| quantity | 本次 contribution 的实际贡献数量 |
| combo_count | 从 Redis 获取的当前连击数(3秒内同用户同道具的累计点击) |
响应(失败)
{
"code": 40401,
"message": "活动不存在",
"data": null
}
3.3 接口流程
请求到达
│
▼
参数校验(activityId 格式、limit 范围)
│
▼
Gateway 调用 ActivityService(直接查 Repository)
│
▼
查询 activity_contributions 表,并联表查询用户信息和道具信息
SELECT c.id, c.activity_id, c.user_id,
u.nickname as user_nickname, u.avatar_url as user_avatar,
i.item_type, i.item_name, i.icon_url as item_icon,
c.quantity, c.contribution_points, c.created_at
FROM activity_contributions c
LEFT JOIN users u ON c.user_id = u.id
LEFT JOIN activity_items i ON c.item_id = i.id
WHERE c.activity_id = ?
AND (c.created_at > ? OR (c.created_at = ? AND c.id > ?))
ORDER BY c.created_at DESC, c.id DESC
LIMIT limit
│
▼
组装响应
latest_timestamp = 本次返回的最新记录时间戳
latest_id = 本次返回的最大 ID
combo_count = 从 Redis 获取连击数
│
▼
返回 code: 0 + records + latest_timestamp + latest_id
使用时间戳 + ID 双重条件查询,好处是:
- 用户退出页面再进来时,用列表中最新记录的时间戳和 ID 作为起点
- 同一毫秒内多条记录时,用 ID 做二次筛选,不会漏数据
ORDER BY created_at DESC, id DESC保证同时间戳内 ID 大的在前
3.4 后端实现
Gateway 层(HTTP API)
新增 Controller:gateway/controller/contribution_controller.go
package controller
type ContributionController struct {
db *gorm.DB
}
func NewContributionController(db *gorm.DB) *ContributionController
// GetLatestContributions 获取最新贡献记录
// @Summary 获取活动最新贡献记录(实时轮询用)
// @Tags contributions
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param activityId path int64 true "活动ID"
// @Param since_timestamp query int64 false "起始时间戳(毫秒)"
// @Param since_id query int64 false "起始记录ID(用于同时间戳内的二次筛选)"
// @Param limit query int32 false "拉取条数,默认1,最大5"
// @Success 200 {object} response.Response
// @Router /api/v1/activity/{activityId}/contributions/latest [get]
func (ctrl *ContributionController) GetLatestContributions(c *gin.Context)
Repository 层
// services/activityService/repository/activity_repository.go
// GetLatestContributions 获取最新的贡献记录(用于实时轮询)
// sinceTimestamp: 起始时间戳,sinceId: 起始记录ID(用于同时间戳内的二次筛选)
func (r *activityRepository) GetLatestContributions(activityID int64, sinceTimestamp int64, sinceId int64, limit int) ([]*models.ActivityContribution, int64, int64, error) {
query := r.db.Model(&models.ActivityContribution{}).
Where("activity_id = ?", activityID)
if sinceTimestamp > 0 && sinceId > 0 {
// 同一毫秒内用 ID 做二次筛选
query = query.Where("(created_at > ? OR (created_at = ? AND id > ?))", sinceTimestamp, sinceTimestamp, sinceId)
} else if sinceTimestamp > 0 {
query = query.Where("created_at > ?", sinceTimestamp)
}
var contributions []*models.ActivityContribution
if err := query.Order("created_at DESC, id DESC").Limit(limit).Find(&contributions).Error; err != nil {
return nil, 0, 0, err
}
// 返回本次返回的最新时间戳和最大 ID
var latestTimestamp, latestId int64
if len(contributions) > 0 {
latestTimestamp = contributions[0].CreatedAt
latestId = contributions[0].Id
}
return contributions, latestTimestamp, latestId, nil
}
获取连击数(Service 层):
func (s *ActivityService) GetComboCount(userID int64, itemType string) (int64, error) {
key := fmt.Sprintf("combo:%d:%s", userID, itemType)
count, err := s.redis.Get(key).Int64()
if err != nil || count == 0 {
return 1, nil // 无连击时返回 1
}
return count, nil
}
写入时机(purchaseItem 改造)
当前 PurchaseItem 成功后已调用 CreateContribution,需确保冗余字段(user_nickname、user_avatar、item_name、item_icon)一并写入。
连击计数:
// Redis INCR + EXPIRE
rdb.Incr(ctx, fmt.Sprintf("combo:%d:%s", userID, itemType))
rdb.Expire(ctx, fmt.Sprintf("combo:%d:%s", userID, itemType), 3*time.Second)
查询时获取连击数
// 从 Redis 获取 combo_count
comboCount, _ := rdb.Get(ctx, fmt.Sprintf("combo:%d:%s", userID, itemType)).Int64()
4. 前端架构设计
4.1 目录结构
frontend/pages/support-activity/
├── components/
│ └── ContributionList.vue # 新增:贡献列表组件
├── composables/
│ └── useContributionPolling.js # 新增:贡献轮询逻辑
└── index.vue
4.2 数据流
页面可见
│
▼
useContributionPolling.start()
│
▼
首次全量拉取:GET /api/v1/activity/{id}/contributions/latest?since_timestamp=0&since_id=0&limit=5
│
▼
records = 返回的最新记录(最多 5 条)
latestTimestamp = 本次返回的最新时间戳
latestId = 本次返回的最大 ID
│
▼
定时轮询(每秒)
│
▼
GET /api/v1/activity/{id}/contributions/latest?since_timestamp={latestTimestamp}&since_id={latestId}&limit=1
│
▼
比对 records:
- 有新记录(时间戳更新 或 同时间戳内 ID 更新)→ 插入到列表头部,移除超出的记录,重置所有记录计时器
- 无新记录 → 不更新,计时器继续倒计时
│
▼
更新 latestTimestamp 和 latestId
页面不可见
│
▼
useContributionPolling.stop() — 停止轮询,保留 latestTimestamp 和 records
页面重新可见
│
▼
useContributionPolling.start() — 直接用现有的 latestTimestamp 继续轮询
4.3 轮询策略
| 参数 | 值 | 说明 |
|---|---|---|
| 轮询间隔 | 1000ms | 每秒拉取一次,确保实时感 |
| 首次请求 | 全量拉取 5 条最新记录 | 获取最近 5 条实时贡献 |
| 轮询请求 | limit=1 | 每秒拉取 1 条,保持实时感 |
| 列表上限 | 5 条 | 超出后移除最旧记录 |
| 消失计时 | 每条记录 5 秒无更新则逐条淡出消失 | 无新数据时,最早那条记录 5 秒后先消失,后续记录依次前移并同样计时 |
| 暂停条件 | onHide | 页面隐藏时停止轮询,保留 latestTimestamp、latestId 和 records |
| 恢复条件 | onShow | 页面显示时继续轮询,使用现有的 latestTimestamp 和 latestId |
| 切换活动 | - | activityId 变化时,调用 reset() 重置所有状态 |
5. 组件设计
5.1 ContributionList.vue
功能
- 展示贡献记录滚动列表,最新记录在顶部
- 页面不可见时整体隐藏(v-if)
- 新记录有淡入动画,记录超时淡出消失
Props
| 属性 | 类型 | 必填 | 说明 |
|---|---|---|---|
| activityId | string | 是 | 活动 ID |
模板结构
<template>
<view class="contribution-list" v-if="visible">
<view class="list-header">
<text class="header-title">实时贡献</text>
</view>
<scroll-view class="list-content" scroll-y>
<view
v-for="(record, index) in records"
:key="record.id"
class="contribution-item"
:class="{ 'new-item': index === 0, 'fading-out': record.fading }"
>
<image class="user-avatar" :src="record.user_avatar" mode="aspectFill" />
<text class="user-nickname">{{ record.user_nickname }}</text>
<text class="contribute-text">贡献了</text>
<image class="item-icon" :src="record.item_icon" mode="aspectFill" />
<text class="item-name">{{ record.item_name }}</text>
<text class="item-quantity">x{{ record.combo_count > 1 ? record.combo_count : record.quantity }}</text>
</view>
</scroll-view>
</view>
</template>
样式要点
- 固定高度(建议 200rpx),超出部分滚动
- 半透明背景(rgba(0,0,0,0.3))
- 每条记录一行:头像 + 昵称 + "贡献了" + 道具图标 + 名称 + 数量
- 新记录(index === 0)有 0.3s 淡入动画
- 消失时添加 fading 类,0.5s 淡出动画
5.2 useContributionPolling.js
功能
- 管理贡献记录的轮询逻辑
- 提供 start / stop 方法
- 自动处理增量拉取和列表更新
- 每条记录独立计时,超时淡出消失
接口
// 输入
activityId: Ref<string>
// 输出
{
records, // Ref<ContributionRecord[]> — 当前展示的贡献列表
visible, // Ref<boolean> — 列表是否可见(用于 v-if 控制)
loading, // Ref<boolean> — 是否正在加载
error, // Ref<string | null> — 错误信息
start, // () => void — 开始轮询
stop, // () => void — 停止轮询(保留状态)
reset // () => void — 重置所有状态(切换活动时用)
}
// 内部状态
{
latestTimestamp, // number — 当前列表中最新记录的时间戳(继续轮询的起点)
latestId, // number — 当前列表中最新记录的 ID(同时间戳内二次筛选用)
pollingTimer, // number — setInterval 返回的 timer ID
isPolling, // boolean — 是否正在轮询
recordTimers, // Map — 记录 ID -> 定时器
}
核心逻辑
const MAX_RECORDS = 5
const POLL_INTERVAL = 1000 // 每秒拉取
const RECORD_TTL = 5000 // 每条记录 5 秒后消失
let latestTimestamp = 0 // 初始为 0,获取所有最新记录
let latestId = 0 // 初始为 0
const recordTimers = new Map() // 记录 ID -> 定时器
function resetRecordTimer(record) {
// 清除已有定时器
if (recordTimers.has(record.id)) {
clearTimeout(recordTimers.get(record.id))
recordTimers.delete(record.id)
}
// 清除淡出状态(新数据到来时重置)
const target = records.value.find(r => r.id === record.id)
if (target) {
target.fading = false
}
// 设置新的消失定时器
const timer = setTimeout(() => {
// 标记为淡出状态
const target = records.value.find(r => r.id === record.id)
if (target) {
target.fading = true
// 等待动画完成后移除
setTimeout(() => {
records.value = records.value.filter(r => r.id !== record.id)
recordTimers.delete(record.id)
}, 500)
} else {
recordTimers.delete(record.id)
}
}, RECORD_TTL)
recordTimers.set(record.id, timer)
}
async function fetchLatest() {
// 拉取 since_timestamp 和 since_id 之后的最新记录
const res = await fetchActivityContributionsLatest(activityId.value, latestTimestamp, latestId, 1)
if (res.records.length === 0) return
const newRecord = res.records[0]
// 检测到新记录(时间戳更新,或时间戳相同但 ID 更新)
const isNew = newRecord.created_at > latestTimestamp ||
(newRecord.created_at === latestTimestamp && newRecord.id > latestId)
if (isNew) {
// 重置所有现有记录的计时器(新数据到来,刷新列表)
records.value.forEach(resetRecordTimer)
// 新记录插入到列表头部
records.value = [newRecord, ...records.value].slice(0, MAX_RECORDS)
// 为新记录启动消失计时器
resetRecordTimer(newRecord)
// 更新时间戳和 ID
latestTimestamp = newRecord.created_at
latestId = newRecord.id
}
}
function start() {
if (pollingTimer) return
isPolling.value = true
// 如果 latestTimestamp 为 0,说明是首次或切换活动,清空列表
if (latestTimestamp === 0) {
records.value = []
}
fetchLatest()
pollingTimer = setInterval(fetchLatest, POLL_INTERVAL)
}
function stop() {
if (pollingTimer) {
clearInterval(pollingTimer)
pollingTimer = null
}
// 停止所有记录的计时器
recordTimers.forEach(timer => clearTimeout(timer))
recordTimers.clear()
isPolling.value = false
// 不清空 records、latestTimestamp、latestId,保留状态以便恢复
}
function reset() {
// 切换活动时调用,重置所有状态
stop()
latestTimestamp = 0
latestId = 0
records.value = []
}
// 监听页面显示/隐藏
onShow(() => {
visible.value = true
start()
})
onHide(() => {
visible.value = false
stop()
})
fetchActivityContributionsLatest调用GET /api/v1/activity/{id}/contributions/latest?since_timestamp={sinceTimestamp}&since_id={sinceId}&limit={limit}新增
reset()方法,切换活动时调用,重置所有状态。
6. 页面集成
6.1 组件引入
在 ThemeBanner 下方、StageArea 上方引入:
<!-- ThemeBanner 下方 -->
<ThemeBanner v-if="config" ... />
<!-- 贡献列表 -->
<ContributionList
v-if="activityId && !isLoading"
:activity-id="activityId"
class="contribution-list-wrapper"
/>
<!-- StageArea -->
<StageArea v-if="config" ... />
6.2 样式控制
.contribution-list-wrapper {
width: 100%;
padding: 0 24rpx;
}
.contribution-item.fading-out {
animation: fadeOut 0.5s forwards;
}
@keyframes fadeOut {
from { opacity: 1; }
to { opacity: 0; }
}
7. 性能与优化
| 优化点 | 方案 |
|---|---|
| 列表更新 | 新记录插入到头部,使用展开运算符 |
| 去重 | 以 record.id 作为 v-for 的 :key + 时间戳+ID 双重比对 |
| 列表上限 | 超过 5 条时移除最旧记录 |
| 记录消失 | 每条记录 5 秒无更新则淡出消失 |
| 内存占用 | 页面隐藏时停止计时器,保留 records |
| 轮询策略 | 每秒拉取 1 条,保持实时感 |
| 同毫秒去重 | 使用 since_timestamp + since_id 双重条件,同一毫秒多条不漏 |
8. 错误处理
| 场景 | 处理方式 |
|---|---|
| 接口返回 404(活动不存在) | 组件隐藏,不影响页面其他功能 |
| 接口返回 500(服务器错误) | 静默忽略,继续等待下一次 |
| 网络断开 | 页面 onHide 会停止轮询;恢复后 onShow 继续轮询 |
| 返回数据为空 | 不更新列表,不更新 latestTimestamp 和 latestId |
| 切换活动 | 调用 reset() 重置所有状态 |
| 同毫秒多条记录 | 使用 since_timestamp + since_id 双重筛选,不会漏数据 |
| 连击显示 | combo_count 从 Redis 获取,3秒内同用户同道具累加 |
9. 实施步骤
9.1 后端实现
- 在
activity_contributions表联表查询用户信息和道具信息(LEFT JOIN users, LEFT JOIN activity_items) - 在 Redis 中实现连击计数器
combo:{user_id}:{item_type},TTL 3秒 - 在
activity_repository.go添加GetLatestContributions方法 - 在 Gateway 层添加
contribution_controller.go - 在路由注册新接口
/api/v1/activity/:activityId/contributions/latest - 验证
purchaseItem后CreateContribution正常工作,并更新 Redis 连击计数器
9.2 前端 API 层
- 在
frontend/utils/activity-config.js中新增fetchActivityContributionsLatest方法
9.3 前端组件
- 创建
useContributionPolling.js - 创建
ContributionList.vue
9.4 页面集成
- 在
index.vue中引入ContributionList
9.5 测试验证
- 轮询正常:每秒间隔拉取数据
- 增量正确:新贡献出现时列表头部插入
- 消失逻辑:无新数据 5 秒后记录淡出消失
- 页面切换:onHide 停止,onShow 继续(latestTimestamp 保持)
- 列表上限:超过 5 条时移除最旧记录
11. Redis 缓存 + 时间窗口合并 + 分布式锁优化
11.1 背景与目标
背景
- 前端已实现每秒轮询
GET /api/v1/activity/:activityId/contributions/latest - 如果 10,000 用户同时轮询,每秒 10,000 次 DB 查询,存在性能瓶颈
目标
- 秒级合并:10s/12s/13s 的查询请求,统一在 10s 窗口执行一次 DB 查询
- 有新数据立即返回:如果缓存中检测到新记录,直接返回最新数据
- 无新数据合并查询:窗口内无写入时,后续请求复用缓存结果
性能指标
- 10k 并发轮询 → 实际 DB 查询频率 ≤ 1次/秒(正常状态)
- 缓存未命中时首个请求有 100-300ms 延迟,后续请求 < 5ms
- 缓存 TTL 5秒,滚动窗口
11.2 整体流程
用户轮询请求 (10k 并发)
│
▼
Gateway API
│
▼
查询 Redis 缓存
activity:{id}:contributions:latest
│
├── 有缓存 + 窗口有效(now_ms - updated_at < 1000ms)
│ │
│ ▼
│ 检查 sinceTimestamp:
│ - sinceTimestamp <= updated_at → 缓存数据够新,直接返回
│ - sinceTimestamp > updated_at → 继续查 DB(数据可能不够新)
│
└── 缓存不存在 / 过期
│
▼
SETNX 加分布式锁
lock: activity:{id}:contributions:lock
(5秒自动释放,防止死锁)
│
├── 获取锁成功
│ │
│ ▼
│ 查询 PostgreSQL
│ 回填 Redis (TTL=5秒)
│ 释放锁
│ │
│ ▼
│ 返回数据
│
└── 获取锁失败
│
▼
等待 100ms 重试
(最多 3 次,避免长时间等待)
11.3 Redis Key 设计
| Key | 类型 | 说明 | TTL |
|---|---|---|---|
activity:{activityId}:contributions:latest |
Hash | 最新贡献记录缓存 | 5秒 |
activity:{activityId}:contributions:lock |
String | 分布式锁 | 5秒 |
11.4 缓存数据结构
{
"records": [
{
"id": 12345,
"user_id": 1001,
"user_nickname": "用户昵称",
"user_avatar": "https://...",
"item_id": 1,
"item_type": "gift_flower",
"item_name": "玫瑰花",
"item_icon": "https://...",
"quantity": 10,
"crystal_spent": 100,
"contribution_points": 50,
"combo_count": 2,
"created_at": 1747133400000
}
],
"updated_at": 1747133400000,
"latest_id": 12345
}
字段说明:
records: 最新贡献记录数组(最多 5 条),每条记录的 combo_count 已合并updated_at: 窗口时间戳(毫秒级 Unix 时间戳),用于判断是否在有效窗口内latest_id: 最新记录的 ID,用于增量检测
时间戳单位:统一使用毫秒,与前端 sinceTimestamp 参数单位一致
11.5 查询流程(伪代码)
func GetContributionsLatest(activityId int64, sinceTimestamp int64, sinceId int64, limit int) ([]*ContributionRecord, error) {
ctx := context.Background()
cacheKey := fmt.Sprintf("activity:%d:contributions:latest", activityId)
lockKey := fmt.Sprintf("activity:%d:contributions:lock", activityId)
nowMs := time.Now().UnixMilli()
// 1. 尝试获取缓存
cached := redis.Get(ctx, cacheKey)
if cached != nil {
cache := parseCache(cached)
// 缓存有效(1秒窗口内)→ 检查 sinceTimestamp 是否在窗口内
if cache != nil && nowMs-cache.UpdatedAt < 1000 {
// sinceTimestamp <= updated_at,说明请求的数据在缓存窗口内,直接返回
if sinceTimestamp <= cache.UpdatedAt {
return cache.Records, nil
}
// sinceTimestamp > updated_at,缓存数据可能不够新,继续查 DB
}
}
// 2. 缓存不存在或过期或数据不够新,尝试加锁
locked := redis.SetNX(ctx, lockKey, "1", 5*time.Second)
if !locked {
// 3. 获取锁失败,等待重试
for i := 0; i < 3; i++ {
time.Sleep(100 * time.Millisecond)
cached := redis.Get(ctx, cacheKey)
if cached != nil {
cache := parseCache(cached)
if cache != nil && nowMs-cache.UpdatedAt < 1000 && sinceTimestamp <= cache.UpdatedAt {
return cache.Records, nil
}
}
}
// 重试 3 次后仍失败,返回错误或旧缓存
return nil, errors.New("cache unavailable after retry")
}
// 4. 获取锁成功,查 DB
defer redis.Del(ctx, lockKey)
records, err := db.QueryContributions(activityId, sinceTimestamp, sinceId, limit)
if err != nil {
return nil, err
}
// 5. 回填缓存(TTL 5秒)
if len(records) > 0 {
// 为每条记录获取最新的 combo_count
for _, record := range records {
comboCount, _ := redis.Get(ctx, fmt.Sprintf("combo:%d:%s", record.UserId, record.ItemType)).Int64()
if comboCount > 0 {
record.ComboCount = comboCount
}
}
cache := &ContributionCache{
Records: records,
UpdatedAt: nowMs,
LatestId: records[0].Id,
}
redis.Set(ctx, cacheKey, cache, 5*time.Second)
}
return records, nil
}
11.6 写入流程(使缓存失效)
在 PurchaseItem 成功写入 activity_contributions 后:
func (s *activityService) PurchaseItem(...) error {
// ... 原有的购买逻辑 ...
// 1. 写数据库
err := s.repo.CreateContribution(contribution)
if err != nil {
return err
}
// 2. 使缓存失效(不是更新,是删除)
cacheKey := fmt.Sprintf("activity:%d:contributions:latest", activityId)
redis.Del(ctx, cacheKey)
return nil
}
为什么不更新缓存而是删除?
- 如果更新缓存,需要处理并发写入的 race condition
- 删除缓存让下次查询触发重建,逻辑更简单且正确
11.7 窗口合并策略
时间窗口对齐
- 窗口粒度:1 秒(1000 毫秒,可调整)
- 查询时:如果
now_ms - cache.updated_at < 1000ms,认为是同一窗口 - 超过 1 秒:缓存过期,下次查询触发重建
sinceTimestamp 过滤逻辑
- 前端传
sinceTimestamp(毫秒)进行增量查询 - 缓存命中时,判断
sinceTimestamp <= cache.updated_at:true→ 请求的数据在缓存窗口内,直接返回缓存false→ 缓存数据不够新,继续查 DB
有新数据时的处理
- 后端对比
sinceId:sinceId > cache.latest_id→ 有新数据,返回最新记录sinceId <= cache.latest_id→ 无新数据,返回缓存数据
combo_count 合并
- 缓存回填时,从 Redis 获取每条记录的
combo:{user_id}:{item_type}值 - 合并到
record.combo_count字段 - 如果 Redis 无值,视为 1
11.8 分布式锁设计
锁 Key
lock: activity:{activityId}:contributions:lock
锁参数
- TTL: 5 秒(防止进程崩溃导致死锁)
- 重试: 获取失败后等待 100ms 重试,最多 3 次
- 释放: 使用后立即删除(
DEL命令)
可靠性说明
- 锁仅用于防止缓存击穿(cache stampede)
- 锁持有时间极短(一次 DB 查询,约 10-50ms)
- 单实例 Redis 足够,无需 Redlock
11.9 错误处理
| 场景 | 处理方式 |
|---|---|
| Redis 不可用 | 回退到直接查 DB(降级) |
| 获取锁失败 + 重试 3 次后仍失败 | 返回 503 Service Unavailable 或返回旧缓存 |
| DB 查询失败 | 返回错误,前端显示重试 |
| 缓存为空(无数据) | 返回空数组,不缓存 |
11.10 影响范围
需要修改的文件
-
Gateway 层
gateway/controller/activity_controller.go— 添加GetContributionsLatest方法(如果尚未添加)
-
Service 层
services/activityService/provider/activity_provider.go— 实现缓存逻辑 + 锁逻辑
-
Repository 层
services/activityService/repository/activity_repository.go—GetLatestContributions查询方法(如果尚未添加)
-
Proto 定义(如使用 Dubbo RPC)
pkg/proto/activity/activity.proto— 添加GetContributionsLatestRequest/Response- 重新生成
activity.pb.go
不需要修改的文件
- 前端
ContributionList.vue和useContributionPolling.js无需改动(接口兼容)
11.11 测试要点
- 并发测试:10k 请求同时发起,验证 DB 只查询 1 次
- 缓存失效测试:Purchase 后,验证缓存被正确删除
- 锁竞争测试:缓存失效瞬间,多个请求抢锁,验证只有一个请求查 DB
- 降级测试:Redis 不可用时,验证服务能回退到直连 DB
- 增量查询测试:传入
sinceId,验证只返回增量数据
11.12 后续优化(可选)
- 多级缓存:引入本地内存缓存(如 Go 的
sync.Map),减少 Redis 请求 - 窗口动态调整:根据并发量动态调整窗口大小
- 监控告警:监控缓存命中率、锁等待时间、DB 查询 QPS