# 用户贡献实时显示方案
## 1. 概述
### 1.1 需求描述
在活动页面实时展示所有用户的贡献动态,包括:
- 贡献者头像 / 昵称
- 道具图标 / 名称
- 贡献数量
### 1.2 展示形式
- **列表形式**:最新贡献显示在顶部
- **生命周期**:页面可见时显示,不可见时暂停
- **消失逻辑**:每条记录 5 秒无新数据则逐条淡出消失
### 1.3 技术方案
- 后端:Gateway 层新增 HTTP API,ActivityService 处理业务逻辑
- 前端:轮询 composable 与列表组件
- 数据库:`activity_contributions` 表已存在
---
## 2. 数据库设计
### 2.1 ER 关系
```
┌─────────────────┐ ┌──────────────────────────┐
│ activities │ │ activity_contributions │
│─────────────────│ │───────────────────────────│
│ id │──────< │ activity_id │
│ star_id │ │ user_id │
│ ... │ │ star_id │
└─────────────────┘ │ item_id │
│ item_type │
│ quantity │
│ contribution_points │
│ created_at(毫秒时间戳) │
└──────────────────────────┘
│
│ LEFT JOIN
▼
┌─────────────────┐
│ users │
│─────────────────│
│ id │
│ nickname │
│ avatar_url │
└─────────────────┘
│
│ LEFT JOIN
▼
┌─────────────────┐
│ activity_items │
│─────────────────│
│ id │
│ item_name │
│ icon_url │
└─────────────────┘
```
### 2.2 表结构
**activity_contributions(活动贡献记录表)**
| 字段 | 类型 | 约束 | 说明 |
|------|------|------|------|
| id | bigint | PK, AUTO_INCREMENT | 贡献记录 ID |
| activity_id | bigint | NOT NULL, INDEX | 关联活动 ID |
| user_id | bigint | NOT NULL | 贡献者用户 ID |
| star_id | bigint | NOT NULL | 明星 ID |
| item_id | bigint | NOT NULL | 道具 ID |
| item_type | varchar(50) | NOT NULL | 道具类型,如 `firework`、`megaphone` |
| quantity | int | DEFAULT 1 | 本次贡献数量 |
| crystal_spent | bigint | NOT NULL | 花费的水晶数量 |
| contribution_points | bigint | NOT NULL | 贡献值 |
| created_at | bigint | NOT NULL, INDEX | 创建时间(毫秒时间戳) |
**索引设计**
| 索引名 | 索引字段 | 类型 | 用途 |
|--------|----------|------|------|
| idx_activity_id | (activity_id) | BTREE | 按活动 ID 筛选 |
| idx_activity_created | (activity_id, created_at DESC, id DESC) | BTREE | 按活动时间降序+ID降序查询,支持 since_timestamp + since_id 轮询 |
> `created_at DESC` 索引可加速 `WHERE activity_id = ? AND created_at > sinceTimestamp` 查询。
### 2.3 写入时机
`purchaseItem` 成功后,**同步写入**此表。
**连击计数器(Redis)**:
用户每次点击购买道具时,更新 Redis 连击计数器:
- **Key**:`combo:{user_id}:{item_type}`
- **Value**:当前连击数
- **TTL**:3 秒(无新点击则过期)
```
用户点击购买 → Redis INCR combo:{user_id}:{item_type} + EXPIRE 3秒
```
### 2.4 查询时获取连击数
前端轮询时,后端从 Redis 获取当前连击数,合并到返回结果中:
```
GET combo:{user_id}:{item_type} → 返回当前计数
```
**连击逻辑**:
- 同一用户、同一道具:3 秒内的多次点击累加显示
- 不同道具:独立计数,互不影响
---
## 3. 后端 API 设计
### 3.1 接口概述
| 接口 | 方法 | 路径 | 说明 |
|------|------|------|------|
| 获取最新贡献记录 | GET | `/api/v1/activity/:activityId/contributions/latest` | 实时轮询用 |
### 3.2 获取最新贡献记录接口
**请求**
```
GET /api/v1/activity/:activityId/contributions/latest
```
**Query 参数**
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|------|------|------|--------|------|
| since_timestamp | int64 | 否 | 0 | 起始时间戳(毫秒),获取该时间之后的最新记录 |
| since_id | int64 | 否 | 0 | 起始记录 ID(用于同时间戳内的二次筛选) |
| limit | int32 | 否 | 1 | 每次拉取条数,最大 5 |
**响应(成功)**
```json
{
"code": 0,
"message": "success",
"data": {
"records": [
{
"id": 12345,
"activity_id": 1001,
"user_id": 888,
"user_nickname": "小明",
"user_avatar": "https://example.com/avatar.jpg",
"item_type": "firework",
"item_name": "烟花",
"item_icon": "https://example.com/firework.png",
"quantity": 3,
"contribution_points": 3,
"combo_count": 2,
"created_at": 1747133400000
}
],
"latest_timestamp": 1747133400000,
"latest_id": 12345
}
}
```
**字段说明**:
| 字段 | 说明 |
|------|------|
| quantity | 本次 contribution 的实际贡献数量 |
| combo_count | 从 Redis 获取的当前连击数(3秒内同用户同道具的累计点击) |
**响应(失败)**
```json
{
"code": 40401,
"message": "活动不存在",
"data": null
}
```
### 3.3 接口流程
```
请求到达
│
▼
参数校验(activityId 格式、limit 范围)
│
▼
Gateway 调用 ActivityService(直接查 Repository)
│
▼
查询 activity_contributions 表,并联表查询用户信息和道具信息
SELECT c.id, c.activity_id, c.user_id,
u.nickname as user_nickname, u.avatar_url as user_avatar,
i.item_type, i.item_name, i.icon_url as item_icon,
c.quantity, c.contribution_points, c.created_at
FROM activity_contributions c
LEFT JOIN users u ON c.user_id = u.id
LEFT JOIN activity_items i ON c.item_id = i.id
WHERE c.activity_id = ?
AND (c.created_at > ? OR (c.created_at = ? AND c.id > ?))
ORDER BY c.created_at DESC, c.id DESC
LIMIT limit
│
▼
组装响应
latest_timestamp = 本次返回的最新记录时间戳
latest_id = 本次返回的最大 ID
combo_count = 从 Redis 获取连击数
│
▼
返回 code: 0 + records + latest_timestamp + latest_id
```
> 使用时间戳 + ID 双重条件查询,好处是:
> 1. 用户退出页面再进来时,用列表中最新记录的时间戳和 ID 作为起点
> 2. 同一毫秒内多条记录时,用 ID 做二次筛选,不会漏数据
> 3. `ORDER BY created_at DESC, id DESC` 保证同时间戳内 ID 大的在前
### 3.4 后端实现
#### Gateway 层(HTTP API)
新增 Controller:`gateway/controller/contribution_controller.go`
```go
package controller
type ContributionController struct {
db *gorm.DB
}
func NewContributionController(db *gorm.DB) *ContributionController
// GetLatestContributions 获取最新贡献记录
// @Summary 获取活动最新贡献记录(实时轮询用)
// @Tags contributions
// @Accept json
// @Produce json
// @Security BearerAuth
// @Param activityId path int64 true "活动ID"
// @Param since_timestamp query int64 false "起始时间戳(毫秒)"
// @Param since_id query int64 false "起始记录ID(用于同时间戳内的二次筛选)"
// @Param limit query int32 false "拉取条数,默认1,最大5"
// @Success 200 {object} response.Response
// @Router /api/v1/activity/{activityId}/contributions/latest [get]
func (ctrl *ContributionController) GetLatestContributions(c *gin.Context)
```
#### Repository 层
```go
// services/activityService/repository/activity_repository.go
// GetLatestContributions 获取最新的贡献记录(用于实时轮询)
// sinceTimestamp: 起始时间戳,sinceId: 起始记录ID(用于同时间戳内的二次筛选)
func (r *activityRepository) GetLatestContributions(activityID int64, sinceTimestamp int64, sinceId int64, limit int) ([]*models.ActivityContribution, int64, int64, error) {
query := r.db.Model(&models.ActivityContribution{}).
Where("activity_id = ?", activityID)
if sinceTimestamp > 0 && sinceId > 0 {
// 同一毫秒内用 ID 做二次筛选
query = query.Where("(created_at > ? OR (created_at = ? AND id > ?))", sinceTimestamp, sinceTimestamp, sinceId)
} else if sinceTimestamp > 0 {
query = query.Where("created_at > ?", sinceTimestamp)
}
var contributions []*models.ActivityContribution
if err := query.Order("created_at DESC, id DESC").Limit(limit).Find(&contributions).Error; err != nil {
return nil, 0, 0, err
}
// 返回本次返回的最新时间戳和最大 ID
var latestTimestamp, latestId int64
if len(contributions) > 0 {
latestTimestamp = contributions[0].CreatedAt
latestId = contributions[0].Id
}
return contributions, latestTimestamp, latestId, nil
}
```
**获取连击数(Service 层)**:
```go
func (s *ActivityService) GetComboCount(userID int64, itemType string) (int64, error) {
key := fmt.Sprintf("combo:%d:%s", userID, itemType)
count, err := s.redis.Get(key).Int64()
if err != nil || count == 0 {
return 1, nil // 无连击时返回 1
}
return count, nil
}
```
#### 写入时机(purchaseItem 改造)
当前 `PurchaseItem` 成功后已调用 `CreateContribution`,需确保冗余字段(user_nickname、user_avatar、item_name、item_icon)一并写入。
**连击计数**:
```go
// Redis INCR + EXPIRE
rdb.Incr(ctx, fmt.Sprintf("combo:%d:%s", userID, itemType))
rdb.Expire(ctx, fmt.Sprintf("combo:%d:%s", userID, itemType), 3*time.Second)
```
#### 查询时获取连击数
```go
// 从 Redis 获取 combo_count
comboCount, _ := rdb.Get(ctx, fmt.Sprintf("combo:%d:%s", userID, itemType)).Int64()
```
---
## 4. 前端架构设计
### 4.1 目录结构
```
frontend/pages/support-activity/
├── components/
│ └── ContributionList.vue # 新增:贡献列表组件
├── composables/
│ └── useContributionPolling.js # 新增:贡献轮询逻辑
└── index.vue
```
### 4.2 数据流
```
页面可见
│
▼
useContributionPolling.start()
│
▼
首次全量拉取:GET /api/v1/activity/{id}/contributions/latest?since_timestamp=0&since_id=0&limit=5
│
▼
records = 返回的最新记录(最多 5 条)
latestTimestamp = 本次返回的最新时间戳
latestId = 本次返回的最大 ID
│
▼
定时轮询(每秒)
│
▼
GET /api/v1/activity/{id}/contributions/latest?since_timestamp={latestTimestamp}&since_id={latestId}&limit=1
│
▼
比对 records:
- 有新记录(时间戳更新 或 同时间戳内 ID 更新)→ 插入到列表头部,移除超出的记录,重置所有记录计时器
- 无新记录 → 不更新,计时器继续倒计时
│
▼
更新 latestTimestamp 和 latestId
页面不可见
│
▼
useContributionPolling.stop() — 停止轮询,保留 latestTimestamp 和 records
页面重新可见
│
▼
useContributionPolling.start() — 直接用现有的 latestTimestamp 继续轮询
```
### 4.3 轮询策略
| 参数 | 值 | 说明 |
|------|-----|------|
| 轮询间隔 | 1000ms | 每秒拉取一次,确保实时感 |
| 首次请求 | 全量拉取 5 条最新记录 | 获取最近 5 条实时贡献 |
| 轮询请求 | limit=1 | 每秒拉取 1 条,保持实时感 |
| 列表上限 | 5 条 | 超出后移除最旧记录 |
| 消失计时 | 每条记录 5 秒无更新则逐条淡出消失 | 无新数据时,最早那条记录 5 秒后先消失,后续记录依次前移并同样计时 |
| 暂停条件 | onHide | 页面隐藏时停止轮询,保留 latestTimestamp、latestId 和 records |
| 恢复条件 | onShow | 页面显示时继续轮询,使用现有的 latestTimestamp 和 latestId |
| 切换活动 | - | activityId 变化时,调用 reset() 重置所有状态 |
---
## 5. 组件设计
### 5.1 ContributionList.vue
**功能**
- 展示贡献记录滚动列表,最新记录在顶部
- 页面不可见时整体隐藏(v-if)
- 新记录有淡入动画,记录超时淡出消失
**Props**
| 属性 | 类型 | 必填 | 说明 |
|------|------|------|------|
| activityId | string | 是 | 活动 ID |
**模板结构**
```vue
{{ record.user_nickname }}
贡献了
{{ record.item_name }}
x{{ record.combo_count > 1 ? record.combo_count : record.quantity }}
```
**样式要点**
- 固定高度(建议 200rpx),超出部分滚动
- 半透明背景(rgba(0,0,0,0.3))
- 每条记录一行:头像 + 昵称 + "贡献了" + 道具图标 + 名称 + 数量
- 新记录(index === 0)有 0.3s 淡入动画
- 消失时添加 fading 类,0.5s 淡出动画
### 5.2 useContributionPolling.js
**功能**
- 管理贡献记录的轮询逻辑
- 提供 start / stop 方法
- 自动处理增量拉取和列表更新
- 每条记录独立计时,超时淡出消失
**接口**
```javascript
// 输入
activityId: Ref
// 输出
{
records, // Ref — 当前展示的贡献列表
visible, // Ref — 列表是否可见(用于 v-if 控制)
loading, // Ref — 是否正在加载
error, // Ref — 错误信息
start, // () => void — 开始轮询
stop, // () => void — 停止轮询(保留状态)
reset // () => void — 重置所有状态(切换活动时用)
}
// 内部状态
{
latestTimestamp, // number — 当前列表中最新记录的时间戳(继续轮询的起点)
latestId, // number — 当前列表中最新记录的 ID(同时间戳内二次筛选用)
pollingTimer, // number — setInterval 返回的 timer ID
isPolling, // boolean — 是否正在轮询
recordTimers, // Map — 记录 ID -> 定时器
}
```
**核心逻辑**
```javascript
const MAX_RECORDS = 5
const POLL_INTERVAL = 1000 // 每秒拉取
const RECORD_TTL = 5000 // 每条记录 5 秒后消失
let latestTimestamp = 0 // 初始为 0,获取所有最新记录
let latestId = 0 // 初始为 0
const recordTimers = new Map() // 记录 ID -> 定时器
function resetRecordTimer(record) {
// 清除已有定时器
if (recordTimers.has(record.id)) {
clearTimeout(recordTimers.get(record.id))
recordTimers.delete(record.id)
}
// 清除淡出状态(新数据到来时重置)
const target = records.value.find(r => r.id === record.id)
if (target) {
target.fading = false
}
// 设置新的消失定时器
const timer = setTimeout(() => {
// 标记为淡出状态
const target = records.value.find(r => r.id === record.id)
if (target) {
target.fading = true
// 等待动画完成后移除
setTimeout(() => {
records.value = records.value.filter(r => r.id !== record.id)
recordTimers.delete(record.id)
}, 500)
} else {
recordTimers.delete(record.id)
}
}, RECORD_TTL)
recordTimers.set(record.id, timer)
}
async function fetchLatest() {
// 拉取 since_timestamp 和 since_id 之后的最新记录
const res = await fetchActivityContributionsLatest(activityId.value, latestTimestamp, latestId, 1)
if (res.records.length === 0) return
const newRecord = res.records[0]
// 检测到新记录(时间戳更新,或时间戳相同但 ID 更新)
const isNew = newRecord.created_at > latestTimestamp ||
(newRecord.created_at === latestTimestamp && newRecord.id > latestId)
if (isNew) {
// 重置所有现有记录的计时器(新数据到来,刷新列表)
records.value.forEach(resetRecordTimer)
// 新记录插入到列表头部
records.value = [newRecord, ...records.value].slice(0, MAX_RECORDS)
// 为新记录启动消失计时器
resetRecordTimer(newRecord)
// 更新时间戳和 ID
latestTimestamp = newRecord.created_at
latestId = newRecord.id
}
}
function start() {
if (pollingTimer) return
isPolling.value = true
// 如果 latestTimestamp 为 0,说明是首次或切换活动,清空列表
if (latestTimestamp === 0) {
records.value = []
}
fetchLatest()
pollingTimer = setInterval(fetchLatest, POLL_INTERVAL)
}
function stop() {
if (pollingTimer) {
clearInterval(pollingTimer)
pollingTimer = null
}
// 停止所有记录的计时器
recordTimers.forEach(timer => clearTimeout(timer))
recordTimers.clear()
isPolling.value = false
// 不清空 records、latestTimestamp、latestId,保留状态以便恢复
}
function reset() {
// 切换活动时调用,重置所有状态
stop()
latestTimestamp = 0
latestId = 0
records.value = []
}
// 监听页面显示/隐藏
onShow(() => {
visible.value = true
start()
})
onHide(() => {
visible.value = false
stop()
})
```
> `fetchActivityContributionsLatest` 调用 `GET /api/v1/activity/{id}/contributions/latest?since_timestamp={sinceTimestamp}&since_id={sinceId}&limit={limit}`
>
> 新增 `reset()` 方法,切换活动时调用,重置所有状态。
---
## 6. 页面集成
### 6.1 组件引入
在 `ThemeBanner` 下方、`StageArea` 上方引入:
```vue
```
### 6.2 样式控制
```css
.contribution-list-wrapper {
width: 100%;
padding: 0 24rpx;
}
.contribution-item.fading-out {
animation: fadeOut 0.5s forwards;
}
@keyframes fadeOut {
from { opacity: 1; }
to { opacity: 0; }
}
```
---
## 7. 性能与优化
| 优化点 | 方案 |
|--------|------|
| 列表更新 | 新记录插入到头部,使用展开运算符 |
| 去重 | 以 `record.id` 作为 v-for 的 :key + 时间戳+ID 双重比对 |
| 列表上限 | 超过 5 条时移除最旧记录 |
| 记录消失 | 每条记录 5 秒无更新则淡出消失 |
| 内存占用 | 页面隐藏时停止计时器,保留 records |
| 轮询策略 | 每秒拉取 1 条,保持实时感 |
| 同毫秒去重 | 使用 `since_timestamp + since_id` 双重条件,同一毫秒多条不漏 |
---
## 8. 错误处理
| 场景 | 处理方式 |
|------|----------|
| 接口返回 404(活动不存在) | 组件隐藏,不影响页面其他功能 |
| 接口返回 500(服务器错误) | 静默忽略,继续等待下一次 |
| 网络断开 | 页面 onHide 会停止轮询;恢复后 onShow 继续轮询 |
| 返回数据为空 | 不更新列表,不更新 latestTimestamp 和 latestId |
| 切换活动 | 调用 reset() 重置所有状态 |
| 同毫秒多条记录 | 使用 since_timestamp + since_id 双重筛选,不会漏数据 |
| 连击显示 | combo_count 从 Redis 获取,3秒内同用户同道具累加 |
---
## 9. 实施步骤
### 9.1 后端实现
1. [x] 在 `activity_contributions` 表联表查询用户信息和道具信息(LEFT JOIN users, LEFT JOIN activity_items)
2. [ ] 在 Redis 中实现连击计数器 `combo:{user_id}:{item_type}`,TTL 3秒
3. [ ] 在 `activity_repository.go` 添加 `GetLatestContributions` 方法
4. [ ] 在 Gateway 层添加 `contribution_controller.go`
5. [ ] 在路由注册新接口 `/api/v1/activity/:activityId/contributions/latest`
6. [ ] 验证 `purchaseItem` 后 `CreateContribution` 正常工作,并更新 Redis 连击计数器
### 9.2 前端 API 层
1. [ ] 在 `frontend/utils/activity-config.js` 中新增 `fetchActivityContributionsLatest` 方法
### 9.3 前端组件
1. [ ] 创建 `useContributionPolling.js`
2. [ ] 创建 `ContributionList.vue`
### 9.4 页面集成
1. [ ] 在 `index.vue` 中引入 `ContributionList`
### 9.5 测试验证
1. [ ] 轮询正常:每秒间隔拉取数据
2. [ ] 增量正确:新贡献出现时列表头部插入
3. [ ] 消失逻辑:无新数据 5 秒后记录淡出消失
4. [ ] 页面切换:onHide 停止,onShow 继续(latestTimestamp 保持)
5. [ ] 列表上限:超过 5 条时移除最旧记录
---
---
## 11. Redis 缓存 + 时间窗口合并 + 分布式锁优化
### 11.1 背景与目标
#### 背景
- 前端已实现每秒轮询 `GET /api/v1/activity/:activityId/contributions/latest`
- 如果 10,000 用户同时轮询,每秒 10,000 次 DB 查询,存在性能瓶颈
#### 目标
- **秒级合并**:10s/12s/13s 的查询请求,统一在 10s 窗口执行一次 DB 查询
- **有新数据立即返回**:如果缓存中检测到新记录,直接返回最新数据
- **无新数据合并查询**:窗口内无写入时,后续请求复用缓存结果
#### 性能指标
- 10k 并发轮询 → 实际 DB 查询频率 ≤ 1次/秒(正常状态)
- 缓存未命中时首个请求有 100-300ms 延迟,后续请求 < 5ms
- 缓存 TTL 5秒,滚动窗口
### 11.2 整体流程
```
用户轮询请求 (10k 并发)
│
▼
Gateway API
│
▼
查询 Redis 缓存
activity:{id}:contributions:latest
│
├── 有缓存 + 窗口有效(now_ms - updated_at < 1000ms)
│ │
│ ▼
│ 检查 sinceTimestamp:
│ - sinceTimestamp <= updated_at → 缓存数据够新,直接返回
│ - sinceTimestamp > updated_at → 继续查 DB(数据可能不够新)
│
└── 缓存不存在 / 过期
│
▼
SETNX 加分布式锁
lock: activity:{id}:contributions:lock
(5秒自动释放,防止死锁)
│
├── 获取锁成功
│ │
│ ▼
│ 查询 PostgreSQL
│ 回填 Redis (TTL=5秒)
│ 释放锁
│ │
│ ▼
│ 返回数据
│
└── 获取锁失败
│
▼
等待 100ms 重试
(最多 3 次,避免长时间等待)
```
### 11.3 Redis Key 设计
| Key | 类型 | 说明 | TTL |
|-----|------|------|-----|
| `activity:{activityId}:contributions:latest` | Hash | 最新贡献记录缓存 | 5秒 |
| `activity:{activityId}:contributions:lock` | String | 分布式锁 | 5秒 |
### 11.4 缓存数据结构
```json
{
"records": [
{
"id": 12345,
"user_id": 1001,
"user_nickname": "用户昵称",
"user_avatar": "https://...",
"item_id": 1,
"item_type": "gift_flower",
"item_name": "玫瑰花",
"item_icon": "https://...",
"quantity": 10,
"crystal_spent": 100,
"contribution_points": 50,
"combo_count": 2,
"created_at": 1747133400000
}
],
"updated_at": 1747133400000,
"latest_id": 12345
}
```
**字段说明:**
- `records`: 最新贡献记录数组(最多 5 条),**每条记录的 combo_count 已合并**
- `updated_at`: 窗口时间戳(毫秒级 Unix 时间戳),用于判断是否在有效窗口内
- `latest_id`: 最新记录的 ID,用于增量检测
**时间戳单位**:统一使用**毫秒**,与前端 `sinceTimestamp` 参数单位一致
### 11.5 查询流程(伪代码)
```go
func GetContributionsLatest(activityId int64, sinceTimestamp int64, sinceId int64, limit int) ([]*ContributionRecord, error) {
ctx := context.Background()
cacheKey := fmt.Sprintf("activity:%d:contributions:latest", activityId)
lockKey := fmt.Sprintf("activity:%d:contributions:lock", activityId)
nowMs := time.Now().UnixMilli()
// 1. 尝试获取缓存
cached := redis.Get(ctx, cacheKey)
if cached != nil {
cache := parseCache(cached)
// 缓存有效(1秒窗口内)→ 检查 sinceTimestamp 是否在窗口内
if cache != nil && nowMs-cache.UpdatedAt < 1000 {
// sinceTimestamp <= updated_at,说明请求的数据在缓存窗口内,直接返回
if sinceTimestamp <= cache.UpdatedAt {
return cache.Records, nil
}
// sinceTimestamp > updated_at,缓存数据可能不够新,继续查 DB
}
}
// 2. 缓存不存在或过期或数据不够新,尝试加锁
locked := redis.SetNX(ctx, lockKey, "1", 5*time.Second)
if !locked {
// 3. 获取锁失败,等待重试
for i := 0; i < 3; i++ {
time.Sleep(100 * time.Millisecond)
cached := redis.Get(ctx, cacheKey)
if cached != nil {
cache := parseCache(cached)
if cache != nil && nowMs-cache.UpdatedAt < 1000 && sinceTimestamp <= cache.UpdatedAt {
return cache.Records, nil
}
}
}
// 重试 3 次后仍失败,返回错误或旧缓存
return nil, errors.New("cache unavailable after retry")
}
// 4. 获取锁成功,查 DB
defer redis.Del(ctx, lockKey)
records, err := db.QueryContributions(activityId, sinceTimestamp, sinceId, limit)
if err != nil {
return nil, err
}
// 5. 回填缓存(TTL 5秒)
if len(records) > 0 {
// 为每条记录获取最新的 combo_count
for _, record := range records {
comboCount, _ := redis.Get(ctx, fmt.Sprintf("combo:%d:%s", record.UserId, record.ItemType)).Int64()
if comboCount > 0 {
record.ComboCount = comboCount
}
}
cache := &ContributionCache{
Records: records,
UpdatedAt: nowMs,
LatestId: records[0].Id,
}
redis.Set(ctx, cacheKey, cache, 5*time.Second)
}
return records, nil
}
```
### 11.6 写入流程(使缓存失效)
在 `PurchaseItem` 成功写入 `activity_contributions` 后:
```go
func (s *activityService) PurchaseItem(...) error {
// ... 原有的购买逻辑 ...
// 1. 写数据库
err := s.repo.CreateContribution(contribution)
if err != nil {
return err
}
// 2. 使缓存失效(不是更新,是删除)
cacheKey := fmt.Sprintf("activity:%d:contributions:latest", activityId)
redis.Del(ctx, cacheKey)
return nil
}
```
**为什么不更新缓存而是删除?**
- 如果更新缓存,需要处理并发写入的 race condition
- 删除缓存让下次查询触发重建,逻辑更简单且正确
### 11.7 窗口合并策略
#### 时间窗口对齐
- 窗口粒度:**1 秒**(1000 毫秒,可调整)
- 查询时:如果 `now_ms - cache.updated_at < 1000ms`,认为是同一窗口
- 超过 1 秒:缓存过期,下次查询触发重建
#### sinceTimestamp 过滤逻辑
- 前端传 `sinceTimestamp`(毫秒)进行增量查询
- 缓存命中时,判断 `sinceTimestamp <= cache.updated_at`:
- `true` → 请求的数据在缓存窗口内,直接返回缓存
- `false` → 缓存数据不够新,继续查 DB
#### 有新数据时的处理
- 后端对比 `sinceId`:
- `sinceId > cache.latest_id` → 有新数据,返回最新记录
- `sinceId <= cache.latest_id` → 无新数据,返回缓存数据
#### combo_count 合并
- 缓存回填时,从 Redis 获取每条记录的 `combo:{user_id}:{item_type}` 值
- 合并到 `record.combo_count` 字段
- 如果 Redis 无值,视为 1
### 11.8 分布式锁设计
#### 锁 Key
```
lock: activity:{activityId}:contributions:lock
```
#### 锁参数
- **TTL**: 5 秒(防止进程崩溃导致死锁)
- **重试**: 获取失败后等待 100ms 重试,最多 3 次
- **释放**: 使用后立即删除(`DEL` 命令)
#### 可靠性说明
- 锁仅用于防止**缓存击穿**(cache stampede)
- 锁持有时间极短(一次 DB 查询,约 10-50ms)
- 单实例 Redis 足够,无需 Redlock
### 11.9 错误处理
| 场景 | 处理方式 |
|------|----------|
| Redis 不可用 | 回退到直接查 DB(降级) |
| 获取锁失败 + 重试 3 次后仍失败 | 返回 503 Service Unavailable 或返回旧缓存 |
| DB 查询失败 | 返回错误,前端显示重试 |
| 缓存为空(无数据) | 返回空数组,不缓存 |
### 11.10 影响范围
#### 需要修改的文件
1. **Gateway 层**
- `gateway/controller/activity_controller.go` — 添加 `GetContributionsLatest` 方法(如果尚未添加)
2. **Service 层**
- `services/activityService/provider/activity_provider.go` — 实现缓存逻辑 + 锁逻辑
3. **Repository 层**
- `services/activityService/repository/activity_repository.go` — `GetLatestContributions` 查询方法(如果尚未添加)
4. **Proto 定义**(如使用 Dubbo RPC)
- `pkg/proto/activity/activity.proto` — 添加 `GetContributionsLatestRequest/Response`
- 重新生成 `activity.pb.go`
#### 不需要修改的文件
- 前端 `ContributionList.vue` 和 `useContributionPolling.js` 无需改动(接口兼容)
### 11.11 测试要点
1. **并发测试**:10k 请求同时发起,验证 DB 只查询 1 次
2. **缓存失效测试**:Purchase 后,验证缓存被正确删除
3. **锁竞争测试**:缓存失效瞬间,多个请求抢锁,验证只有一个请求查 DB
4. **降级测试**:Redis 不可用时,验证服务能回退到直连 DB
5. **增量查询测试**:传入 `sinceId`,验证只返回增量数据
### 11.12 后续优化(可选)
1. **多级缓存**:引入本地内存缓存(如 Go 的 `sync.Map`),减少 Redis 请求
2. **窗口动态调整**:根据并发量动态调整窗口大小
3. **监控告警**:监控缓存命中率、锁等待时间、DB 查询 QPS