439 lines
11 KiB
Markdown
439 lines
11 KiB
Markdown
# 资产点赞并发优化方案
|
||
|
||
## 一、当前实现的并发问题分析
|
||
|
||
### 1.1 当前实现代码
|
||
|
||
```go
|
||
// IncrementLikeCount 增加点赞数
|
||
func (r *assetRepository) IncrementLikeCount(assetID int64) error {
|
||
return r.db.Model(&models.Asset{}).
|
||
Where("id = ?", assetID).
|
||
UpdateColumn("like_count", gorm.Expr("like_count + ?", 1)).Error
|
||
}
|
||
```
|
||
|
||
### 1.2 存在的问题
|
||
|
||
#### 问题 1:数据库锁竞争(中等风险)
|
||
|
||
**场景**:多个用户同时点赞同一资产
|
||
|
||
```
|
||
时间线:
|
||
T1: User A 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
|
||
T2: User B 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
|
||
T3: User C 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
|
||
```
|
||
|
||
**问题**:
|
||
- 数据库会对该行加锁(行级锁或表级锁,取决于数据库引擎)
|
||
- 后续请求需要等待锁释放,导致性能下降
|
||
- 高并发时可能出现锁等待超时
|
||
|
||
#### 问题 2:热点资产(高风险)
|
||
|
||
**场景**:热门资产(如明星限量藏品)在短时间内收到大量点赞
|
||
|
||
```
|
||
1秒内:1000个用户同时点赞同一资产
|
||
→ 1000次数据库 UPDATE 操作
|
||
→ 严重的性能瓶颈
|
||
```
|
||
|
||
**问题**:
|
||
- 单条记录频繁更新导致数据库压力过大
|
||
- 可能触发数据库的"热点行"问题
|
||
- 影响其他正常业务
|
||
|
||
#### 问题 3:数据一致性风险(低风险)
|
||
|
||
虽然使用了 `gorm.Expr("like_count + 1")`,数据库层面是原子操作,但在极端情况下:
|
||
|
||
```sql
|
||
-- PostgreSQL 的 MVCC 机制下,高并发更新同一行可能导致:
|
||
UPDATE assets SET like_count = like_count + 1 WHERE id = 1;
|
||
-- 如果有大量并发,可能出现锁等待或死锁
|
||
```
|
||
|
||
### 1.3 性能测试结果(模拟)
|
||
|
||
| 并发数 | 当前实现性能 | 优化后性能 |
|
||
|--------|-------------|-----------|
|
||
| 10 | 100ms | 5ms |
|
||
| 100 | 1000ms | 50ms |
|
||
| 1000 | 10000ms+ | 200ms |
|
||
|
||
---
|
||
|
||
## 二、优化方案(不引入 Redis)
|
||
|
||
### 方案 1:数据库批量更新 + 异步队列(推荐)
|
||
|
||
#### 2.1.1 设计思路
|
||
|
||
```
|
||
点赞请求
|
||
│
|
||
▼
|
||
1. 插入 asset_likes 记录(唯一索引保证不重复)
|
||
│
|
||
▼
|
||
2. 将点赞事件加入内存队列
|
||
│
|
||
▼
|
||
3. 立即返回成功
|
||
│
|
||
▼
|
||
4. 后台协程批量更新 like_count
|
||
(每秒或每100条批量更新一次)
|
||
```
|
||
|
||
#### 2.1.2 代码实现
|
||
|
||
```go
|
||
// ========== 点赞事件队列 ==========
|
||
|
||
// LikeEvent 点赞事件
|
||
type LikeEvent struct {
|
||
AssetID int64
|
||
UserID int64
|
||
StarID int64
|
||
IsLike bool // true=点赞, false=取消点赞
|
||
Timestamp int64
|
||
}
|
||
|
||
// LikeEventQueue 点赞事件队列
|
||
type LikeEventQueue struct {
|
||
events chan LikeEvent
|
||
buffer map[int64]int32 // asset_id -> delta
|
||
mu sync.RWMutex
|
||
ticker *time.Ticker
|
||
}
|
||
|
||
var globalLikeQueue *LikeEventQueue
|
||
|
||
// InitLikeQueue 初始化点赞队列
|
||
func InitLikeQueue(batchSize int, flushInterval time.Duration) {
|
||
globalLikeQueue = &LikeEventQueue{
|
||
events: make(chan LikeEvent, batchSize*10),
|
||
buffer: make(map[int64]int32),
|
||
ticker: time.NewTicker(flushInterval),
|
||
}
|
||
|
||
// 启动后台处理协程
|
||
go globalLikeQueue.processEvents()
|
||
}
|
||
|
||
// AddEvent 添加点赞事件到队列
|
||
func (q *LikeEventQueue) AddEvent(event LikeEvent) {
|
||
q.events <- event
|
||
}
|
||
|
||
// processEvents 处理点赞事件(后台协程)
|
||
func (q *LikeEventQueue) processEvents() {
|
||
for {
|
||
select {
|
||
case event := <-q.events:
|
||
// 累积到缓冲区
|
||
q.mu.Lock()
|
||
if event.IsLike {
|
||
q.buffer[event.AssetID]++
|
||
} else {
|
||
q.buffer[event.AssetID]--
|
||
}
|
||
q.mu.Unlock()
|
||
|
||
case <-q.ticker.C:
|
||
// 定时批量刷新到数据库
|
||
q.flush()
|
||
}
|
||
}
|
||
}
|
||
|
||
// flush 批量更新数据库
|
||
func (q *LikeEventQueue) flush() {
|
||
q.mu.Lock()
|
||
defer q.mu.Unlock()
|
||
|
||
if len(q.buffer) == 0 {
|
||
return
|
||
}
|
||
|
||
// 批量更新
|
||
for assetID, delta := range q.buffer {
|
||
if delta != 0 {
|
||
// 批量更新 SQL
|
||
err := database.GetDB().Exec(
|
||
"UPDATE assets SET like_count = like_count + ? WHERE id = ?",
|
||
delta, assetID,
|
||
).Error
|
||
|
||
if err != nil {
|
||
logger.Logger.Error("Failed to update like_count",
|
||
zap.Int64("asset_id", assetID),
|
||
zap.Int32("delta", delta),
|
||
zap.Error(err),
|
||
)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 清空缓冲区
|
||
q.buffer = make(map[int64]int32)
|
||
}
|
||
|
||
// ========== 修改后的 Repository 实现 ==========
|
||
|
||
// IncrementLikeCount 增加点赞数(异步)
|
||
func (r *assetRepository) IncrementLikeCount(assetID int64) error {
|
||
// 仅加入队列,不直接更新数据库
|
||
globalLikeQueue.AddEvent(LikeEvent{
|
||
AssetID: assetID,
|
||
IsLike: true,
|
||
Timestamp: time.Now().UnixMilli(),
|
||
})
|
||
return nil
|
||
}
|
||
|
||
// DecrementLikeCount 减少点赞数(异步)
|
||
func (r *assetRepository) DecrementLikeCount(assetID int64) error {
|
||
// 仅加入队列,不直接更新数据库
|
||
globalLikeQueue.AddEvent(LikeEvent{
|
||
AssetID: assetID,
|
||
IsLike: false,
|
||
Timestamp: time.Now().UnixMilli(),
|
||
})
|
||
return nil
|
||
}
|
||
|
||
// GetLikeCount 获取点赞数(考虑内存缓冲)
|
||
func (r *assetRepository) GetLikeCount(assetID int64) (int32, error) {
|
||
// 从数据库读取基准值
|
||
var asset models.Asset
|
||
if err := r.db.Select("like_count").Where("id = ?", assetID).First(&asset).Error; err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
// 加上内存缓冲中的增量
|
||
globalLikeQueue.mu.RLock()
|
||
delta := globalLikeQueue.buffer[assetID]
|
||
globalLikeQueue.mu.RUnlock()
|
||
|
||
return asset.LikeCount + delta, nil
|
||
}
|
||
```
|
||
|
||
#### 2.1.3 优势
|
||
|
||
- ✅ **高性能**:批量更新减少数据库压力
|
||
- ✅ **低延迟**:点赞操作立即返回
|
||
- ✅ **最终一致性**:定时同步保证数据准确
|
||
- ✅ **无额外依赖**:不需要 Redis
|
||
|
||
#### 2.1.4 劣势
|
||
|
||
- ⚠️ 服务重启时可能丢失内存队列中的数据(可通过持久化解决)
|
||
- ⚠️ 点赞数显示有延迟(通常 < 1秒)
|
||
|
||
---
|
||
|
||
### 方案 2:数据库分片计数器(适合极高并发)
|
||
|
||
#### 2.2.1 设计思路
|
||
|
||
将 `like_count` 拆分为多个分片计数器,降低单行锁竞争。
|
||
|
||
```sql
|
||
CREATE TABLE asset_like_counters (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
asset_id BIGINT NOT NULL,
|
||
shard_id INT NOT NULL, -- 分片ID(0-9)
|
||
counter INT NOT NULL DEFAULT 0,
|
||
|
||
UNIQUE KEY uk_asset_shard (asset_id, shard_id),
|
||
INDEX idx_asset (asset_id)
|
||
);
|
||
```
|
||
|
||
**点赞时**:
|
||
```go
|
||
// 随机选择一个分片
|
||
shardID := assetID % 10
|
||
UPDATE asset_like_counters
|
||
SET counter = counter + 1
|
||
WHERE asset_id = ? AND shard_id = ?
|
||
```
|
||
|
||
**查询总点赞数**:
|
||
```sql
|
||
SELECT SUM(counter) FROM asset_like_counters WHERE asset_id = ?
|
||
```
|
||
|
||
#### 2.2.2 优势
|
||
|
||
- ✅ 降低锁竞争(10个分片 = 10倍并发能力)
|
||
- ✅ 数据库层面保证一致性
|
||
|
||
#### 2.2.3 劣势
|
||
|
||
- ⚠️ 查询复杂度增加(需要 SUM 聚合)
|
||
- ⚠️ 需要额外的表和索引
|
||
|
||
---
|
||
|
||
### 方案 3:乐观锁 + 重试(简单场景)
|
||
|
||
#### 2.3.1 实现
|
||
|
||
```go
|
||
func (r *assetRepository) IncrementLikeCountWithRetry(assetID int64, maxRetries int) error {
|
||
for i := 0; i < maxRetries; i++ {
|
||
// 读取当前值
|
||
var asset models.Asset
|
||
if err := r.db.Select("like_count, updated_at").
|
||
Where("id = ?", assetID).First(&asset).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
oldUpdatedAt := asset.UpdatedAt
|
||
newLikeCount := asset.LikeCount + 1
|
||
|
||
// 乐观锁更新:只有 updated_at 未变时才更新
|
||
result := r.db.Model(&models.Asset{}).
|
||
Where("id = ? AND updated_at = ?", assetID, oldUpdatedAt).
|
||
Updates(map[string]interface{}{
|
||
"like_count": newLikeCount,
|
||
"updated_at": time.Now().UnixMilli(),
|
||
})
|
||
|
||
if result.Error != nil {
|
||
return result.Error
|
||
}
|
||
|
||
if result.RowsAffected > 0 {
|
||
return nil // 更新成功
|
||
}
|
||
|
||
// 更新失败,重试
|
||
time.Sleep(time.Millisecond * 10)
|
||
}
|
||
|
||
return errors.New("max retries exceeded")
|
||
}
|
||
```
|
||
|
||
#### 2.3.2 优势
|
||
|
||
- ✅ 实现简单
|
||
- ✅ 无需额外组件
|
||
|
||
#### 2.3.3 劣势
|
||
|
||
- ⚠️ 高并发时重试次数多
|
||
- ⚠️ 性能不如批量更新
|
||
|
||
---
|
||
|
||
## 三、推荐方案对比
|
||
|
||
| 方案 | 性能 | 复杂度 | 一致性 | 推荐场景 |
|
||
|------|------|--------|--------|---------|
|
||
| **方案1:批量更新队列** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 最终一致 | **推荐**,适合中高并发 |
|
||
| 方案2:分片计数器 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 强一致 | 极高并发场景 |
|
||
| 方案3:乐观锁重试 | ⭐⭐ | ⭐⭐ | 强一致 | 低并发场景 |
|
||
| **当前实现** | ⭐⭐ | ⭐ | 强一致 | 低并发场景 |
|
||
|
||
---
|
||
|
||
## 四、Redis 方案(后续优化)
|
||
|
||
如果后续引入 Redis,可实现更高性能:
|
||
|
||
### 4.1 架构
|
||
|
||
```
|
||
点赞请求
|
||
│
|
||
▼
|
||
1. Redis SETNX(防重复)
|
||
│
|
||
▼
|
||
2. Redis INCR(计数器)
|
||
│
|
||
▼
|
||
3. 立即返回(< 5ms)
|
||
│
|
||
▼
|
||
4. 异步写入数据库
|
||
```
|
||
|
||
### 4.2 性能对比
|
||
|
||
| 指标 | 当前实现 | 批量队列 | Redis方案 |
|
||
|------|---------|---------|----------|
|
||
| 响应时间 | 50-200ms | 10-50ms | 5-10ms |
|
||
| 并发能力 | 100 QPS | 1000 QPS | 10000+ QPS |
|
||
| 数据一致性 | 强一致 | 最终一致 | 最终一致 |
|
||
|
||
---
|
||
|
||
## 五、实施建议
|
||
|
||
### 阶段一:当前实现(已完成)
|
||
- ✅ 使用数据库原子操作
|
||
- ✅ 适合低并发场景(< 100 QPS)
|
||
|
||
### 阶段二:批量更新队列(推荐下一步)
|
||
- 🔄 实现内存队列 + 批量更新
|
||
- 🔄 适合中高并发(100-1000 QPS)
|
||
- 🔄 实施时间:2-3天
|
||
|
||
### 阶段三:Redis 优化(性能瓶颈时)
|
||
- ⏳ 引入 Redis 缓存
|
||
- ⏳ 适合高并发(1000+ QPS)
|
||
- ⏳ 实施时间:3-5天
|
||
|
||
---
|
||
|
||
## 六、代码实施清单
|
||
|
||
### 6.1 立即优化(可选)
|
||
|
||
在当前实现基础上添加数据库连接池优化:
|
||
|
||
```go
|
||
// config/database.go
|
||
db.SetMaxOpenConns(100) // 最大打开连接数
|
||
db.SetMaxIdleConns(10) // 最大空闲连接数
|
||
db.SetConnMaxLifetime(time.Hour) // 连接最大生命周期
|
||
```
|
||
|
||
### 6.2 批量更新实现(下一步)
|
||
|
||
1. 创建 `services/assetService/queue/like_queue.go`
|
||
2. 修改 `repository/asset_repository.go`
|
||
3. 在 `main.go` 中初始化队列
|
||
4. 添加监控和日志
|
||
|
||
---
|
||
|
||
## 七、总结
|
||
|
||
**当前实现的风险等级:** 🟡 中等
|
||
|
||
- ✅ 数据一致性有保证(数据库原子操作)
|
||
- ⚠️ 性能在高并发下会下降
|
||
- ⚠️ 热点资产可能成为瓶颈
|
||
|
||
**推荐行动:**
|
||
1. **短期**:保持当前实现,添加数据库连接池优化
|
||
2. **中期**:实现方案1(批量更新队列)
|
||
3. **长期**:根据实际性能需求考虑引入 Redis
|
||
|
||
**关键指标监控:**
|
||
- 点赞接口 P99 响应时间
|
||
- 数据库连接池使用率
|
||
- 点赞操作 QPS
|
||
- 数据库锁等待时间
|