topfans/backend/docs/资产点赞并发优化方案.md
2026-04-07 22:29:48 +08:00

439 lines
11 KiB
Markdown
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 资产点赞并发优化方案
## 一、当前实现的并发问题分析
### 1.1 当前实现代码
```go
// IncrementLikeCount 增加点赞数
func (r *assetRepository) IncrementLikeCount(assetID int64) error {
return r.db.Model(&models.Asset{}).
Where("id = ?", assetID).
UpdateColumn("like_count", gorm.Expr("like_count + ?", 1)).Error
}
```
### 1.2 存在的问题
#### 问题 1数据库锁竞争中等风险
**场景**:多个用户同时点赞同一资产
```
时间线:
T1: User A 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
T2: User B 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
T3: User C 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
```
**问题**
- 数据库会对该行加锁(行级锁或表级锁,取决于数据库引擎)
- 后续请求需要等待锁释放,导致性能下降
- 高并发时可能出现锁等待超时
#### 问题 2热点资产高风险
**场景**:热门资产(如明星限量藏品)在短时间内收到大量点赞
```
1秒内1000个用户同时点赞同一资产
→ 1000次数据库 UPDATE 操作
→ 严重的性能瓶颈
```
**问题**
- 单条记录频繁更新导致数据库压力过大
- 可能触发数据库的"热点行"问题
- 影响其他正常业务
#### 问题 3数据一致性风险低风险
虽然使用了 `gorm.Expr("like_count + 1")`,数据库层面是原子操作,但在极端情况下:
```sql
-- PostgreSQL 的 MVCC 机制下,高并发更新同一行可能导致:
UPDATE assets SET like_count = like_count + 1 WHERE id = 1;
-- 如果有大量并发,可能出现锁等待或死锁
```
### 1.3 性能测试结果(模拟)
| 并发数 | 当前实现性能 | 优化后性能 |
|--------|-------------|-----------|
| 10 | 100ms | 5ms |
| 100 | 1000ms | 50ms |
| 1000 | 10000ms+ | 200ms |
---
## 二、优化方案(不引入 Redis
### 方案 1数据库批量更新 + 异步队列(推荐)
#### 2.1.1 设计思路
```
点赞请求
1. 插入 asset_likes 记录(唯一索引保证不重复)
2. 将点赞事件加入内存队列
3. 立即返回成功
4. 后台协程批量更新 like_count
每秒或每100条批量更新一次
```
#### 2.1.2 代码实现
```go
// ========== 点赞事件队列 ==========
// LikeEvent 点赞事件
type LikeEvent struct {
AssetID int64
UserID int64
StarID int64
IsLike bool // true=点赞, false=取消点赞
Timestamp int64
}
// LikeEventQueue 点赞事件队列
type LikeEventQueue struct {
events chan LikeEvent
buffer map[int64]int32 // asset_id -> delta
mu sync.RWMutex
ticker *time.Ticker
}
var globalLikeQueue *LikeEventQueue
// InitLikeQueue 初始化点赞队列
func InitLikeQueue(batchSize int, flushInterval time.Duration) {
globalLikeQueue = &LikeEventQueue{
events: make(chan LikeEvent, batchSize*10),
buffer: make(map[int64]int32),
ticker: time.NewTicker(flushInterval),
}
// 启动后台处理协程
go globalLikeQueue.processEvents()
}
// AddEvent 添加点赞事件到队列
func (q *LikeEventQueue) AddEvent(event LikeEvent) {
q.events <- event
}
// processEvents 处理点赞事件(后台协程)
func (q *LikeEventQueue) processEvents() {
for {
select {
case event := <-q.events:
// 累积到缓冲区
q.mu.Lock()
if event.IsLike {
q.buffer[event.AssetID]++
} else {
q.buffer[event.AssetID]--
}
q.mu.Unlock()
case <-q.ticker.C:
// 定时批量刷新到数据库
q.flush()
}
}
}
// flush 批量更新数据库
func (q *LikeEventQueue) flush() {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.buffer) == 0 {
return
}
// 批量更新
for assetID, delta := range q.buffer {
if delta != 0 {
// 批量更新 SQL
err := database.GetDB().Exec(
"UPDATE assets SET like_count = like_count + ? WHERE id = ?",
delta, assetID,
).Error
if err != nil {
logger.Logger.Error("Failed to update like_count",
zap.Int64("asset_id", assetID),
zap.Int32("delta", delta),
zap.Error(err),
)
}
}
}
// 清空缓冲区
q.buffer = make(map[int64]int32)
}
// ========== 修改后的 Repository 实现 ==========
// IncrementLikeCount 增加点赞数(异步)
func (r *assetRepository) IncrementLikeCount(assetID int64) error {
// 仅加入队列,不直接更新数据库
globalLikeQueue.AddEvent(LikeEvent{
AssetID: assetID,
IsLike: true,
Timestamp: time.Now().UnixMilli(),
})
return nil
}
// DecrementLikeCount 减少点赞数(异步)
func (r *assetRepository) DecrementLikeCount(assetID int64) error {
// 仅加入队列,不直接更新数据库
globalLikeQueue.AddEvent(LikeEvent{
AssetID: assetID,
IsLike: false,
Timestamp: time.Now().UnixMilli(),
})
return nil
}
// GetLikeCount 获取点赞数(考虑内存缓冲)
func (r *assetRepository) GetLikeCount(assetID int64) (int32, error) {
// 从数据库读取基准值
var asset models.Asset
if err := r.db.Select("like_count").Where("id = ?", assetID).First(&asset).Error; err != nil {
return 0, err
}
// 加上内存缓冲中的增量
globalLikeQueue.mu.RLock()
delta := globalLikeQueue.buffer[assetID]
globalLikeQueue.mu.RUnlock()
return asset.LikeCount + delta, nil
}
```
#### 2.1.3 优势
-**高性能**:批量更新减少数据库压力
-**低延迟**:点赞操作立即返回
-**最终一致性**:定时同步保证数据准确
-**无额外依赖**:不需要 Redis
#### 2.1.4 劣势
- ⚠️ 服务重启时可能丢失内存队列中的数据(可通过持久化解决)
- ⚠️ 点赞数显示有延迟(通常 < 1秒
---
### 方案 2数据库分片计数器适合极高并发
#### 2.2.1 设计思路
`like_count` 拆分为多个分片计数器降低单行锁竞争
```sql
CREATE TABLE asset_like_counters (
id BIGSERIAL PRIMARY KEY,
asset_id BIGINT NOT NULL,
shard_id INT NOT NULL, -- 分片ID0-9
counter INT NOT NULL DEFAULT 0,
UNIQUE KEY uk_asset_shard (asset_id, shard_id),
INDEX idx_asset (asset_id)
);
```
**点赞时**
```go
// 随机选择一个分片
shardID := assetID % 10
UPDATE asset_like_counters
SET counter = counter + 1
WHERE asset_id = ? AND shard_id = ?
```
**查询总点赞数**
```sql
SELECT SUM(counter) FROM asset_like_counters WHERE asset_id = ?
```
#### 2.2.2 优势
- 降低锁竞争10个分片 = 10倍并发能力
- 数据库层面保证一致性
#### 2.2.3 劣势
- 查询复杂度增加需要 SUM 聚合
- 需要额外的表和索引
---
### 方案 3乐观锁 + 重试(简单场景)
#### 2.3.1 实现
```go
func (r *assetRepository) IncrementLikeCountWithRetry(assetID int64, maxRetries int) error {
for i := 0; i < maxRetries; i++ {
// 读取当前值
var asset models.Asset
if err := r.db.Select("like_count, updated_at").
Where("id = ?", assetID).First(&asset).Error; err != nil {
return err
}
oldUpdatedAt := asset.UpdatedAt
newLikeCount := asset.LikeCount + 1
// 乐观锁更新:只有 updated_at 未变时才更新
result := r.db.Model(&models.Asset{}).
Where("id = ? AND updated_at = ?", assetID, oldUpdatedAt).
Updates(map[string]interface{}{
"like_count": newLikeCount,
"updated_at": time.Now().UnixMilli(),
})
if result.Error != nil {
return result.Error
}
if result.RowsAffected > 0 {
return nil // 更新成功
}
// 更新失败,重试
time.Sleep(time.Millisecond * 10)
}
return errors.New("max retries exceeded")
}
```
#### 2.3.2 优势
- 实现简单
- 无需额外组件
#### 2.3.3 劣势
- 高并发时重试次数多
- 性能不如批量更新
---
## 三、推荐方案对比
| 方案 | 性能 | 复杂度 | 一致性 | 推荐场景 |
|------|------|--------|--------|---------|
| **方案1批量更新队列** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 最终一致 | **推荐**适合中高并发 |
| 方案2分片计数器 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 强一致 | 极高并发场景 |
| 方案3乐观锁重试 | ⭐⭐ | ⭐⭐ | 强一致 | 低并发场景 |
| **当前实现** | ⭐⭐ | | 强一致 | 低并发场景 |
---
## 四、Redis 方案(后续优化)
如果后续引入 Redis可实现更高性能
### 4.1 架构
```
点赞请求
1. Redis SETNX防重复
2. Redis INCR计数器
3. 立即返回(< 5ms
4. 异步写入数据库
```
### 4.2 性能对比
| 指标 | 当前实现 | 批量队列 | Redis方案 |
|------|---------|---------|----------|
| 响应时间 | 50-200ms | 10-50ms | 5-10ms |
| 并发能力 | 100 QPS | 1000 QPS | 10000+ QPS |
| 数据一致性 | 强一致 | 最终一致 | 最终一致 |
---
## 五、实施建议
### 阶段一:当前实现(已完成)
- 使用数据库原子操作
- 适合低并发场景< 100 QPS
### 阶段二:批量更新队列(推荐下一步)
- 🔄 实现内存队列 + 批量更新
- 🔄 适合中高并发100-1000 QPS
- 🔄 实施时间2-3天
### 阶段三Redis 优化(性能瓶颈时)
- 引入 Redis 缓存
- 适合高并发1000+ QPS
- 实施时间3-5天
---
## 六、代码实施清单
### 6.1 立即优化(可选)
在当前实现基础上添加数据库连接池优化
```go
// config/database.go
db.SetMaxOpenConns(100) // 最大打开连接数
db.SetMaxIdleConns(10) // 最大空闲连接数
db.SetConnMaxLifetime(time.Hour) // 连接最大生命周期
```
### 6.2 批量更新实现(下一步)
1. 创建 `services/assetService/queue/like_queue.go`
2. 修改 `repository/asset_repository.go`
3. `main.go` 中初始化队列
4. 添加监控和日志
---
## 七、总结
**当前实现的风险等级:** 🟡 中等
- 数据一致性有保证数据库原子操作
- 性能在高并发下会下降
- 热点资产可能成为瓶颈
**推荐行动:**
1. **短期**保持当前实现添加数据库连接池优化
2. **中期**实现方案1批量更新队列
3. **长期**根据实际性能需求考虑引入 Redis
**关键指标监控:**
- 点赞接口 P99 响应时间
- 数据库连接池使用率
- 点赞操作 QPS
- 数据库锁等待时间