topfans/docs/superpowers/specs/2026-05-13-contribution-realtime-display-design.md
2026-05-14 15:59:56 +08:00

960 lines
31 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 用户贡献实时显示方案
## 1. 概述
### 1.1 需求描述
在活动页面实时展示所有用户的贡献动态,包括:
- 贡献者头像 / 昵称
- 道具图标 / 名称
- 贡献数量
### 1.2 展示形式
- **列表形式**:最新贡献显示在顶部
- **生命周期**:页面可见时显示,不可见时暂停
- **消失逻辑**:每条记录 5 秒无新数据则逐条淡出消失
### 1.3 技术方案
- 后端Gateway 层新增 HTTP APIActivityService 处理业务逻辑
- 前端:轮询 composable 与列表组件
- 数据库:`activity_contributions` 表已存在
---
## 2. 数据库设计
### 2.1 ER 关系
```
┌─────────────────┐ ┌──────────────────────────┐
│ activities │ │ activity_contributions │
│─────────────────│ │───────────────────────────│
│ id │──────< │ activity_id │
│ star_id │ │ user_id │
│ ... │ │ star_id │
└─────────────────┘ │ item_id │
│ item_type │
│ quantity │
│ contribution_points │
│ created_at毫秒时间戳
└──────────────────────────┘
│ LEFT JOIN
┌─────────────────┐
│ users │
│─────────────────│
│ id │
│ nickname │
│ avatar_url │
└─────────────────┘
│ LEFT JOIN
┌─────────────────┐
│ activity_items │
│─────────────────│
│ id │
│ item_name │
│ icon_url │
└─────────────────┘
```
### 2.2 表结构
**activity_contributions活动贡献记录表**
| 字段 | 类型 | 约束 | 说明 |
|------|------|------|------|
| id | bigint | PK, AUTO_INCREMENT | 贡献记录 ID |
| activity_id | bigint | NOT NULL, INDEX | 关联活动 ID |
| user_id | bigint | NOT NULL | 贡献者用户 ID |
| star_id | bigint | NOT NULL | 明星 ID |
| item_id | bigint | NOT NULL | 道具 ID |
| item_type | varchar(50) | NOT NULL | 道具类型,如 `firework`、`megaphone` |
| quantity | int | DEFAULT 1 | 本次贡献数量 |
| crystal_spent | bigint | NOT NULL | 花费的水晶数量 |
| contribution_points | bigint | NOT NULL | 贡献值 |
| created_at | bigint | NOT NULL, INDEX | 创建时间(毫秒时间戳) |
**索引设计**
| 索引名 | 索引字段 | 类型 | 用途 |
|--------|----------|------|------|
| idx_activity_id | (activity_id) | BTREE | 按活动 ID 筛选 |
| idx_activity_created | (activity_id, created_at DESC, id DESC) | BTREE | 按活动时间降序+ID降序查询支持 since_timestamp + since_id 轮询 |
> `created_at DESC` 索引可加速 `WHERE activity_id = ? AND created_at > sinceTimestamp` 查询。
### 2.3 写入时机
`purchaseItem` 成功后,**同步写入**此表。
**连击计数器Redis**
用户每次点击购买道具时,更新 Redis 连击计数器:
- **Key**`combo:{user_id}:{item_type}`
- **Value**:当前连击数
- **TTL**3 秒(无新点击则过期)
```
用户点击购买 → Redis INCR combo:{user_id}:{item_type} + EXPIRE 3秒
```
### 2.4 查询时获取连击数
前端轮询时,后端从 Redis 获取当前连击数,合并到返回结果中:
```
GET combo:{user_id}:{item_type} → 返回当前计数
```
**连击逻辑**
- 同一用户、同一道具3 秒内的多次点击累加显示
- 不同道具:独立计数,互不影响
---
## 3. 后端 API 设计
### 3.1 接口概述
| 接口 | 方法 | 路径 | 说明 |
|------|------|------|------|
| 获取最新贡献记录 | GET | `/api/v1/activity/:activityId/contributions/latest` | 实时轮询用 |
### 3.2 获取最新贡献记录接口
**请求**
```
GET /api/v1/activity/:activityId/contributions/latest
```
**Query 参数**
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|------|------|------|--------|------|
| since_timestamp | int64 | 否 | 0 | 起始时间戳(毫秒),获取该时间之后的最新记录 |
| since_id | int64 | 否 | 0 | 起始记录 ID用于同时间戳内的二次筛选 |
| limit | int32 | 否 | 1 | 每次拉取条数,最大 5 |
**响应(成功)**
```json
{
"code": 0,
"message": "success",
"data": {
"records": [
{
"id": 12345,
"activity_id": 1001,
"user_id": 888,
"user_nickname": "小明",
"user_avatar": "https://example.com/avatar.jpg",
"item_type": "firework",
"item_name": "烟花",
"item_icon": "https://example.com/firework.png",
"quantity": 3,
"contribution_points": 3,
"combo_count": 2,
"created_at": 1747133400000
}
],
"latest_timestamp": 1747133400000,
"latest_id": 12345
}
}
```
**字段说明**
| 字段 | 说明 |
|------|------|
| quantity | 本次 contribution 的实际贡献数量 |
| combo_count | 从 Redis 获取的当前连击数3秒内同用户同道具的累计点击 |
**响应(失败)**
```json
{
"code": 40401,
"message": "活动不存在",
"data": null
}
```
### 3.3 接口流程
```
请求到达
参数校验activityId 格式、limit 范围)
Gateway 调用 ActivityService直接查 Repository
查询 activity_contributions 表,并联表查询用户信息和道具信息
SELECT c.id, c.activity_id, c.user_id,
u.nickname as user_nickname, u.avatar_url as user_avatar,
i.item_type, i.item_name, i.icon_url as item_icon,
c.quantity, c.contribution_points, c.created_at
FROM activity_contributions c
LEFT JOIN users u ON c.user_id = u.id
LEFT JOIN activity_items i ON c.item_id = i.id
WHERE c.activity_id = ?
AND (c.created_at > ? OR (c.created_at = ? AND c.id > ?))
ORDER BY c.created_at DESC, c.id DESC
LIMIT limit
组装响应
latest_timestamp = 本次返回的最新记录时间戳
latest_id = 本次返回的最大 ID
combo_count = 从 Redis 获取连击数
返回 code: 0 + records + latest_timestamp + latest_id
```
> 使用时间戳 + ID 双重条件查询,好处是:
> 1. 用户退出页面再进来时,用列表中最新记录的时间戳和 ID 作为起点
> 2. 同一毫秒内多条记录时,用 ID 做二次筛选,不会漏数据
> 3. `ORDER BY created_at DESC, id DESC` 保证同时间戳内 ID 大的在前
### 3.4 后端实现
#### Gateway 层HTTP API
新增 Controller`gateway/controller/contribution_controller.go`
```go
package controller
type ContributionController struct {
db *gorm.DB
}
func NewContributionController(db *gorm.DB) *ContributionController
// GetLatestContributions 获取最新贡献记录
// @Summary 获取活动最新贡献记录(实时轮询用)
// @Tags contributions
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param activityId path int64 true "活动ID"
// @Param since_timestamp query int64 false "起始时间戳(毫秒)"
// @Param since_id query int64 false "起始记录ID用于同时间戳内的二次筛选"
// @Param limit query int32 false "拉取条数默认1最大5"
// @Success 200 {object} response.Response
// @Router /api/v1/activity/{activityId}/contributions/latest [get]
func (ctrl *ContributionController) GetLatestContributions(c *gin.Context)
```
#### Repository 层
```go
// services/activityService/repository/activity_repository.go
// GetLatestContributions 获取最新的贡献记录(用于实时轮询)
// sinceTimestamp: 起始时间戳sinceId: 起始记录ID用于同时间戳内的二次筛选
func (r *activityRepository) GetLatestContributions(activityID int64, sinceTimestamp int64, sinceId int64, limit int) ([]*models.ActivityContribution, int64, int64, error) {
query := r.db.Model(&models.ActivityContribution{}).
Where("activity_id = ?", activityID)
if sinceTimestamp > 0 && sinceId > 0 {
// 同一毫秒内用 ID 做二次筛选
query = query.Where("(created_at > ? OR (created_at = ? AND id > ?))", sinceTimestamp, sinceTimestamp, sinceId)
} else if sinceTimestamp > 0 {
query = query.Where("created_at > ?", sinceTimestamp)
}
var contributions []*models.ActivityContribution
if err := query.Order("created_at DESC, id DESC").Limit(limit).Find(&contributions).Error; err != nil {
return nil, 0, 0, err
}
// 返回本次返回的最新时间戳和最大 ID
var latestTimestamp, latestId int64
if len(contributions) > 0 {
latestTimestamp = contributions[0].CreatedAt
latestId = contributions[0].Id
}
return contributions, latestTimestamp, latestId, nil
}
```
**获取连击数Service 层)**
```go
func (s *ActivityService) GetComboCount(userID int64, itemType string) (int64, error) {
key := fmt.Sprintf("combo:%d:%s", userID, itemType)
count, err := s.redis.Get(key).Int64()
if err != nil || count == 0 {
return 1, nil // 无连击时返回 1
}
return count, nil
}
```
#### 写入时机purchaseItem 改造)
当前 `PurchaseItem` 成功后已调用 `CreateContribution`需确保冗余字段user_nickname、user_avatar、item_name、item_icon一并写入。
**连击计数**
```go
// Redis INCR + EXPIRE
rdb.Incr(ctx, fmt.Sprintf("combo:%d:%s", userID, itemType))
rdb.Expire(ctx, fmt.Sprintf("combo:%d:%s", userID, itemType), 3*time.Second)
```
#### 查询时获取连击数
```go
// 从 Redis 获取 combo_count
comboCount, _ := rdb.Get(ctx, fmt.Sprintf("combo:%d:%s", userID, itemType)).Int64()
```
---
## 4. 前端架构设计
### 4.1 目录结构
```
frontend/pages/support-activity/
├── components/
│ └── ContributionList.vue # 新增:贡献列表组件
├── composables/
│ └── useContributionPolling.js # 新增:贡献轮询逻辑
└── index.vue
```
### 4.2 数据流
```
页面可见
useContributionPolling.start()
首次全量拉取GET /api/v1/activity/{id}/contributions/latest?since_timestamp=0&since_id=0&limit=5
records = 返回的最新记录(最多 5 条)
latestTimestamp = 本次返回的最新时间戳
latestId = 本次返回的最大 ID
定时轮询(每秒)
GET /api/v1/activity/{id}/contributions/latest?since_timestamp={latestTimestamp}&since_id={latestId}&limit=1
比对 records
- 有新记录(时间戳更新 或 同时间戳内 ID 更新)→ 插入到列表头部,移除超出的记录,重置所有记录计时器
- 无新记录 → 不更新,计时器继续倒计时
更新 latestTimestamp 和 latestId
页面不可见
useContributionPolling.stop() — 停止轮询,保留 latestTimestamp 和 records
页面重新可见
useContributionPolling.start() — 直接用现有的 latestTimestamp 继续轮询
```
### 4.3 轮询策略
| 参数 | 值 | 说明 |
|------|-----|------|
| 轮询间隔 | 1000ms | 每秒拉取一次,确保实时感 |
| 首次请求 | 全量拉取 5 条最新记录 | 获取最近 5 条实时贡献 |
| 轮询请求 | limit=1 | 每秒拉取 1 条,保持实时感 |
| 列表上限 | 5 条 | 超出后移除最旧记录 |
| 消失计时 | 每条记录 5 秒无更新则逐条淡出消失 | 无新数据时,最早那条记录 5 秒后先消失,后续记录依次前移并同样计时 |
| 暂停条件 | onHide | 页面隐藏时停止轮询,保留 latestTimestamp、latestId 和 records |
| 恢复条件 | onShow | 页面显示时继续轮询,使用现有的 latestTimestamp 和 latestId |
| 切换活动 | - | activityId 变化时,调用 reset() 重置所有状态 |
---
## 5. 组件设计
### 5.1 ContributionList.vue
**功能**
- 展示贡献记录滚动列表,最新记录在顶部
- 页面不可见时整体隐藏v-if
- 新记录有淡入动画,记录超时淡出消失
**Props**
| 属性 | 类型 | 必填 | 说明 |
|------|------|------|------|
| activityId | string | 是 | 活动 ID |
**模板结构**
```vue
<template>
<view class="contribution-list" v-if="visible">
<view class="list-header">
<text class="header-title">实时贡献</text>
</view>
<scroll-view class="list-content" scroll-y>
<view
v-for="(record, index) in records"
:key="record.id"
class="contribution-item"
:class="{ 'new-item': index === 0, 'fading-out': record.fading }"
>
<image class="user-avatar" :src="record.user_avatar" mode="aspectFill" />
<text class="user-nickname">{{ record.user_nickname }}</text>
<text class="contribute-text">贡献了</text>
<image class="item-icon" :src="record.item_icon" mode="aspectFill" />
<text class="item-name">{{ record.item_name }}</text>
<text class="item-quantity">x{{ record.combo_count > 1 ? record.combo_count : record.quantity }}</text>
</view>
</scroll-view>
</view>
</template>
```
**样式要点**
- 固定高度(建议 200rpx超出部分滚动
- 半透明背景rgba(0,0,0,0.3)
- 每条记录一行:头像 + 昵称 + "贡献了" + 道具图标 + 名称 + 数量
- 新记录index === 0有 0.3s 淡入动画
- 消失时添加 fading 类0.5s 淡出动画
### 5.2 useContributionPolling.js
**功能**
- 管理贡献记录的轮询逻辑
- 提供 start / stop 方法
- 自动处理增量拉取和列表更新
- 每条记录独立计时,超时淡出消失
**接口**
```javascript
// 输入
activityId: Ref<string>
// 输出
{
records, // Ref<ContributionRecord[]> — 当前展示的贡献列表
visible, // Ref<boolean> — 列表是否可见(用于 v-if 控制)
loading, // Ref<boolean> — 是否正在加载
error, // Ref<string | null> — 错误信息
start, // () => void — 开始轮询
stop, // () => void — 停止轮询(保留状态)
reset // () => void — 重置所有状态(切换活动时用)
}
// 内部状态
{
latestTimestamp, // number — 当前列表中最新记录的时间戳(继续轮询的起点)
latestId, // number — 当前列表中最新记录的 ID同时间戳内二次筛选用
pollingTimer, // number — setInterval 返回的 timer ID
isPolling, // boolean — 是否正在轮询
recordTimers, // Map — 记录 ID -> 定时器
}
```
**核心逻辑**
```javascript
const MAX_RECORDS = 5
const POLL_INTERVAL = 1000 // 每秒拉取
const RECORD_TTL = 5000 // 每条记录 5 秒后消失
let latestTimestamp = 0 // 初始为 0获取所有最新记录
let latestId = 0 // 初始为 0
const recordTimers = new Map() // 记录 ID -> 定时器
function resetRecordTimer(record) {
// 清除已有定时器
if (recordTimers.has(record.id)) {
clearTimeout(recordTimers.get(record.id))
recordTimers.delete(record.id)
}
// 清除淡出状态(新数据到来时重置)
const target = records.value.find(r => r.id === record.id)
if (target) {
target.fading = false
}
// 设置新的消失定时器
const timer = setTimeout(() => {
// 标记为淡出状态
const target = records.value.find(r => r.id === record.id)
if (target) {
target.fading = true
// 等待动画完成后移除
setTimeout(() => {
records.value = records.value.filter(r => r.id !== record.id)
recordTimers.delete(record.id)
}, 500)
} else {
recordTimers.delete(record.id)
}
}, RECORD_TTL)
recordTimers.set(record.id, timer)
}
async function fetchLatest() {
// 拉取 since_timestamp 和 since_id 之后的最新记录
const res = await fetchActivityContributionsLatest(activityId.value, latestTimestamp, latestId, 1)
if (res.records.length === 0) return
const newRecord = res.records[0]
// 检测到新记录(时间戳更新,或时间戳相同但 ID 更新)
const isNew = newRecord.created_at > latestTimestamp ||
(newRecord.created_at === latestTimestamp && newRecord.id > latestId)
if (isNew) {
// 重置所有现有记录的计时器(新数据到来,刷新列表)
records.value.forEach(resetRecordTimer)
// 新记录插入到列表头部
records.value = [newRecord, ...records.value].slice(0, MAX_RECORDS)
// 为新记录启动消失计时器
resetRecordTimer(newRecord)
// 更新时间戳和 ID
latestTimestamp = newRecord.created_at
latestId = newRecord.id
}
}
function start() {
if (pollingTimer) return
isPolling.value = true
// 如果 latestTimestamp 为 0说明是首次或切换活动清空列表
if (latestTimestamp === 0) {
records.value = []
}
fetchLatest()
pollingTimer = setInterval(fetchLatest, POLL_INTERVAL)
}
function stop() {
if (pollingTimer) {
clearInterval(pollingTimer)
pollingTimer = null
}
// 停止所有记录的计时器
recordTimers.forEach(timer => clearTimeout(timer))
recordTimers.clear()
isPolling.value = false
// 不清空 records、latestTimestamp、latestId保留状态以便恢复
}
function reset() {
// 切换活动时调用,重置所有状态
stop()
latestTimestamp = 0
latestId = 0
records.value = []
}
// 监听页面显示/隐藏
onShow(() => {
visible.value = true
start()
})
onHide(() => {
visible.value = false
stop()
})
```
> `fetchActivityContributionsLatest` 调用 `GET /api/v1/activity/{id}/contributions/latest?since_timestamp={sinceTimestamp}&since_id={sinceId}&limit={limit}`
>
> 新增 `reset()` 方法,切换活动时调用,重置所有状态。
---
## 6. 页面集成
### 6.1 组件引入
`ThemeBanner` 下方、`StageArea` 上方引入:
```vue
<!-- ThemeBanner 下方 -->
<ThemeBanner v-if="config" ... />
<!-- 贡献列表 -->
<ContributionList
v-if="activityId && !isLoading"
:activity-id="activityId"
class="contribution-list-wrapper"
/>
<!-- StageArea -->
<StageArea v-if="config" ... />
```
### 6.2 样式控制
```css
.contribution-list-wrapper {
width: 100%;
padding: 0 24rpx;
}
.contribution-item.fading-out {
animation: fadeOut 0.5s forwards;
}
@keyframes fadeOut {
from { opacity: 1; }
to { opacity: 0; }
}
```
---
## 7. 性能与优化
| 优化点 | 方案 |
|--------|------|
| 列表更新 | 新记录插入到头部,使用展开运算符 |
| 去重 | 以 `record.id` 作为 v-for 的 :key + 时间戳+ID 双重比对 |
| 列表上限 | 超过 5 条时移除最旧记录 |
| 记录消失 | 每条记录 5 秒无更新则淡出消失 |
| 内存占用 | 页面隐藏时停止计时器,保留 records |
| 轮询策略 | 每秒拉取 1 条,保持实时感 |
| 同毫秒去重 | 使用 `since_timestamp + since_id` 双重条件,同一毫秒多条不漏 |
---
## 8. 错误处理
| 场景 | 处理方式 |
|------|----------|
| 接口返回 404活动不存在 | 组件隐藏,不影响页面其他功能 |
| 接口返回 500服务器错误 | 静默忽略,继续等待下一次 |
| 网络断开 | 页面 onHide 会停止轮询;恢复后 onShow 继续轮询 |
| 返回数据为空 | 不更新列表,不更新 latestTimestamp 和 latestId |
| 切换活动 | 调用 reset() 重置所有状态 |
| 同毫秒多条记录 | 使用 since_timestamp + since_id 双重筛选,不会漏数据 |
| 连击显示 | combo_count 从 Redis 获取3秒内同用户同道具累加 |
---
## 9. 实施步骤
### 9.1 后端实现
1. [x] 在 `activity_contributions` 表联表查询用户信息和道具信息LEFT JOIN users, LEFT JOIN activity_items
2. [ ] 在 Redis 中实现连击计数器 `combo:{user_id}:{item_type}`TTL 3秒
3. [ ] 在 `activity_repository.go` 添加 `GetLatestContributions` 方法
4. [ ] 在 Gateway 层添加 `contribution_controller.go`
5. [ ] 在路由注册新接口 `/api/v1/activity/:activityId/contributions/latest`
6. [ ] 验证 `purchaseItem``CreateContribution` 正常工作,并更新 Redis 连击计数器
### 9.2 前端 API 层
1. [ ] 在 `frontend/utils/activity-config.js` 中新增 `fetchActivityContributionsLatest` 方法
### 9.3 前端组件
1. [ ] 创建 `useContributionPolling.js`
2. [ ] 创建 `ContributionList.vue`
### 9.4 页面集成
1. [ ] 在 `index.vue` 中引入 `ContributionList`
### 9.5 测试验证
1. [ ] 轮询正常:每秒间隔拉取数据
2. [ ] 增量正确:新贡献出现时列表头部插入
3. [ ] 消失逻辑:无新数据 5 秒后记录淡出消失
4. [ ] 页面切换onHide 停止onShow 继续latestTimestamp 保持)
5. [ ] 列表上限:超过 5 条时移除最旧记录
---
---
## 11. Redis 缓存 + 时间窗口合并 + 分布式锁优化
### 11.1 背景与目标
#### 背景
- 前端已实现每秒轮询 `GET /api/v1/activity/:activityId/contributions/latest`
- 如果 10,000 用户同时轮询,每秒 10,000 次 DB 查询,存在性能瓶颈
#### 目标
- **秒级合并**10s/12s/13s 的查询请求,统一在 10s 窗口执行一次 DB 查询
- **有新数据立即返回**:如果缓存中检测到新记录,直接返回最新数据
- **无新数据合并查询**:窗口内无写入时,后续请求复用缓存结果
#### 性能指标
- 10k 并发轮询 → 实际 DB 查询频率 ≤ 1次/秒(正常状态)
- 缓存未命中时首个请求有 100-300ms 延迟,后续请求 < 5ms
- 缓存 TTL 5秒滚动窗口
### 11.2 整体流程
```
用户轮询请求 (10k 并发)
Gateway API
查询 Redis 缓存
activity:{id}:contributions:latest
├── 有缓存 + 窗口有效now_ms - updated_at < 1000ms
│ │
│ ▼
│ 检查 sinceTimestamp
│ - sinceTimestamp <= updated_at → 缓存数据够新,直接返回
│ - sinceTimestamp > updated_at → 继续查 DB数据可能不够新
└── 缓存不存在 / 过期
SETNX 加分布式锁
lock: activity:{id}:contributions:lock
(5秒自动释放防止死锁)
├── 获取锁成功
│ │
│ ▼
│ 查询 PostgreSQL
│ 回填 Redis (TTL=5秒)
│ 释放锁
│ │
│ ▼
│ 返回数据
└── 获取锁失败
等待 100ms 重试
(最多 3 次,避免长时间等待)
```
### 11.3 Redis Key 设计
| Key | 类型 | 说明 | TTL |
|-----|------|------|-----|
| `activity:{activityId}:contributions:latest` | Hash | 最新贡献记录缓存 | 5秒 |
| `activity:{activityId}:contributions:lock` | String | 分布式锁 | 5秒 |
### 11.4 缓存数据结构
```json
{
"records": [
{
"id": 12345,
"user_id": 1001,
"user_nickname": "用户昵称",
"user_avatar": "https://...",
"item_id": 1,
"item_type": "gift_flower",
"item_name": "玫瑰花",
"item_icon": "https://...",
"quantity": 10,
"crystal_spent": 100,
"contribution_points": 50,
"combo_count": 2,
"created_at": 1747133400000
}
],
"updated_at": 1747133400000,
"latest_id": 12345
}
```
**字段说明:**
- `records`: 最新贡献记录数组最多 5 **每条记录的 combo_count 已合并**
- `updated_at`: 窗口时间戳毫秒级 Unix 时间戳用于判断是否在有效窗口内
- `latest_id`: 最新记录的 ID用于增量检测
**时间戳单位**统一使用**毫秒**与前端 `sinceTimestamp` 参数单位一致
### 11.5 查询流程(伪代码)
```go
func GetContributionsLatest(activityId int64, sinceTimestamp int64, sinceId int64, limit int) ([]*ContributionRecord, error) {
ctx := context.Background()
cacheKey := fmt.Sprintf("activity:%d:contributions:latest", activityId)
lockKey := fmt.Sprintf("activity:%d:contributions:lock", activityId)
nowMs := time.Now().UnixMilli()
// 1. 尝试获取缓存
cached := redis.Get(ctx, cacheKey)
if cached != nil {
cache := parseCache(cached)
// 缓存有效1秒窗口内→ 检查 sinceTimestamp 是否在窗口内
if cache != nil && nowMs-cache.UpdatedAt < 1000 {
// sinceTimestamp <= updated_at说明请求的数据在缓存窗口内直接返回
if sinceTimestamp <= cache.UpdatedAt {
return cache.Records, nil
}
// sinceTimestamp > updated_at缓存数据可能不够新继续查 DB
}
}
// 2. 缓存不存在或过期或数据不够新,尝试加锁
locked := redis.SetNX(ctx, lockKey, "1", 5*time.Second)
if !locked {
// 3. 获取锁失败,等待重试
for i := 0; i < 3; i++ {
time.Sleep(100 * time.Millisecond)
cached := redis.Get(ctx, cacheKey)
if cached != nil {
cache := parseCache(cached)
if cache != nil && nowMs-cache.UpdatedAt < 1000 && sinceTimestamp <= cache.UpdatedAt {
return cache.Records, nil
}
}
}
// 重试 3 次后仍失败,返回错误或旧缓存
return nil, errors.New("cache unavailable after retry")
}
// 4. 获取锁成功,查 DB
defer redis.Del(ctx, lockKey)
records, err := db.QueryContributions(activityId, sinceTimestamp, sinceId, limit)
if err != nil {
return nil, err
}
// 5. 回填缓存TTL 5秒
if len(records) > 0 {
// 为每条记录获取最新的 combo_count
for _, record := range records {
comboCount, _ := redis.Get(ctx, fmt.Sprintf("combo:%d:%s", record.UserId, record.ItemType)).Int64()
if comboCount > 0 {
record.ComboCount = comboCount
}
}
cache := &ContributionCache{
Records: records,
UpdatedAt: nowMs,
LatestId: records[0].Id,
}
redis.Set(ctx, cacheKey, cache, 5*time.Second)
}
return records, nil
}
```
### 11.6 写入流程(使缓存失效)
`PurchaseItem` 成功写入 `activity_contributions`
```go
func (s *activityService) PurchaseItem(...) error {
// ... 原有的购买逻辑 ...
// 1. 写数据库
err := s.repo.CreateContribution(contribution)
if err != nil {
return err
}
// 2. 使缓存失效(不是更新,是删除)
cacheKey := fmt.Sprintf("activity:%d:contributions:latest", activityId)
redis.Del(ctx, cacheKey)
return nil
}
```
**为什么不更新缓存而是删除?**
- 如果更新缓存需要处理并发写入的 race condition
- 删除缓存让下次查询触发重建逻辑更简单且正确
### 11.7 窗口合并策略
#### 时间窗口对齐
- 窗口粒度**1 **1000 毫秒可调整
- 查询时如果 `now_ms - cache.updated_at < 1000ms`认为是同一窗口
- 超过 1 缓存过期下次查询触发重建
#### sinceTimestamp 过滤逻辑
- 前端传 `sinceTimestamp`毫秒进行增量查询
- 缓存命中时判断 `sinceTimestamp <= cache.updated_at`
- `true` 请求的数据在缓存窗口内直接返回缓存
- `false` 缓存数据不够新继续查 DB
#### 有新数据时的处理
- 后端对比 `sinceId`
- `sinceId > cache.latest_id` 有新数据返回最新记录
- `sinceId <= cache.latest_id` 无新数据返回缓存数据
#### combo_count 合并
- 缓存回填时 Redis 获取每条记录的 `combo:{user_id}:{item_type}`
- 合并到 `record.combo_count` 字段
- 如果 Redis 无值视为 1
### 11.8 分布式锁设计
#### 锁 Key
```
lock: activity:{activityId}:contributions:lock
```
#### 锁参数
- **TTL**: 5 防止进程崩溃导致死锁
- **重试**: 获取失败后等待 100ms 重试最多 3
- **释放**: 使用后立即删除`DEL` 命令
#### 可靠性说明
- 锁仅用于防止**缓存击穿**cache stampede
- 锁持有时间极短一次 DB 查询 10-50ms
- 单实例 Redis 足够无需 Redlock
### 11.9 错误处理
| 场景 | 处理方式 |
|------|----------|
| Redis 不可用 | 回退到直接查 DB降级 |
| 获取锁失败 + 重试 3 次后仍失败 | 返回 503 Service Unavailable 或返回旧缓存 |
| DB 查询失败 | 返回错误前端显示重试 |
| 缓存为空无数据 | 返回空数组不缓存 |
### 11.10 影响范围
#### 需要修改的文件
1. **Gateway 层**
- `gateway/controller/activity_controller.go` 添加 `GetContributionsLatest` 方法如果尚未添加
2. **Service 层**
- `services/activityService/provider/activity_provider.go` 实现缓存逻辑 + 锁逻辑
3. **Repository 层**
- `services/activityService/repository/activity_repository.go` `GetLatestContributions` 查询方法如果尚未添加
4. **Proto 定义**如使用 Dubbo RPC
- `pkg/proto/activity/activity.proto` 添加 `GetContributionsLatestRequest/Response`
- 重新生成 `activity.pb.go`
#### 不需要修改的文件
- 前端 `ContributionList.vue` `useContributionPolling.js` 无需改动接口兼容
### 11.11 测试要点
1. **并发测试**10k 请求同时发起验证 DB 只查询 1
2. **缓存失效测试**Purchase 验证缓存被正确删除
3. **锁竞争测试**缓存失效瞬间多个请求抢锁验证只有一个请求查 DB
4. **降级测试**Redis 不可用时验证服务能回退到直连 DB
5. **增量查询测试**传入 `sinceId`验证只返回增量数据
### 11.12 后续优化(可选)
1. **多级缓存**引入本地内存缓存 Go `sync.Map`减少 Redis 请求
2. **窗口动态调整**根据并发量动态调整窗口大小
3. **监控告警**监控缓存命中率锁等待时间DB 查询 QPS