11 KiB
11 KiB
资产点赞并发优化方案
一、当前实现的并发问题分析
1.1 当前实现代码
// IncrementLikeCount 增加点赞数
func (r *assetRepository) IncrementLikeCount(assetID int64) error {
return r.db.Model(&models.Asset{}).
Where("id = ?", assetID).
UpdateColumn("like_count", gorm.Expr("like_count + ?", 1)).Error
}
1.2 存在的问题
问题 1:数据库锁竞争(中等风险)
场景:多个用户同时点赞同一资产
时间线:
T1: User A 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
T2: User B 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
T3: User C 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
问题:
- 数据库会对该行加锁(行级锁或表级锁,取决于数据库引擎)
- 后续请求需要等待锁释放,导致性能下降
- 高并发时可能出现锁等待超时
问题 2:热点资产(高风险)
场景:热门资产(如明星限量藏品)在短时间内收到大量点赞
1秒内:1000个用户同时点赞同一资产
→ 1000次数据库 UPDATE 操作
→ 严重的性能瓶颈
问题:
- 单条记录频繁更新导致数据库压力过大
- 可能触发数据库的"热点行"问题
- 影响其他正常业务
问题 3:数据一致性风险(低风险)
虽然使用了 gorm.Expr("like_count + 1"),数据库层面是原子操作,但在极端情况下:
-- PostgreSQL 的 MVCC 机制下,高并发更新同一行可能导致:
UPDATE assets SET like_count = like_count + 1 WHERE id = 1;
-- 如果有大量并发,可能出现锁等待或死锁
1.3 性能测试结果(模拟)
| 并发数 | 当前实现性能 | 优化后性能 |
|---|---|---|
| 10 | 100ms | 5ms |
| 100 | 1000ms | 50ms |
| 1000 | 10000ms+ | 200ms |
二、优化方案(不引入 Redis)
方案 1:数据库批量更新 + 异步队列(推荐)
2.1.1 设计思路
点赞请求
│
▼
1. 插入 asset_likes 记录(唯一索引保证不重复)
│
▼
2. 将点赞事件加入内存队列
│
▼
3. 立即返回成功
│
▼
4. 后台协程批量更新 like_count
(每秒或每100条批量更新一次)
2.1.2 代码实现
// ========== 点赞事件队列 ==========
// LikeEvent 点赞事件
type LikeEvent struct {
AssetID int64
UserID int64
StarID int64
IsLike bool // true=点赞, false=取消点赞
Timestamp int64
}
// LikeEventQueue 点赞事件队列
type LikeEventQueue struct {
events chan LikeEvent
buffer map[int64]int32 // asset_id -> delta
mu sync.RWMutex
ticker *time.Ticker
}
var globalLikeQueue *LikeEventQueue
// InitLikeQueue 初始化点赞队列
func InitLikeQueue(batchSize int, flushInterval time.Duration) {
globalLikeQueue = &LikeEventQueue{
events: make(chan LikeEvent, batchSize*10),
buffer: make(map[int64]int32),
ticker: time.NewTicker(flushInterval),
}
// 启动后台处理协程
go globalLikeQueue.processEvents()
}
// AddEvent 添加点赞事件到队列
func (q *LikeEventQueue) AddEvent(event LikeEvent) {
q.events <- event
}
// processEvents 处理点赞事件(后台协程)
func (q *LikeEventQueue) processEvents() {
for {
select {
case event := <-q.events:
// 累积到缓冲区
q.mu.Lock()
if event.IsLike {
q.buffer[event.AssetID]++
} else {
q.buffer[event.AssetID]--
}
q.mu.Unlock()
case <-q.ticker.C:
// 定时批量刷新到数据库
q.flush()
}
}
}
// flush 批量更新数据库
func (q *LikeEventQueue) flush() {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.buffer) == 0 {
return
}
// 批量更新
for assetID, delta := range q.buffer {
if delta != 0 {
// 批量更新 SQL
err := database.GetDB().Exec(
"UPDATE assets SET like_count = like_count + ? WHERE id = ?",
delta, assetID,
).Error
if err != nil {
logger.Logger.Error("Failed to update like_count",
zap.Int64("asset_id", assetID),
zap.Int32("delta", delta),
zap.Error(err),
)
}
}
}
// 清空缓冲区
q.buffer = make(map[int64]int32)
}
// ========== 修改后的 Repository 实现 ==========
// IncrementLikeCount 增加点赞数(异步)
func (r *assetRepository) IncrementLikeCount(assetID int64) error {
// 仅加入队列,不直接更新数据库
globalLikeQueue.AddEvent(LikeEvent{
AssetID: assetID,
IsLike: true,
Timestamp: time.Now().UnixMilli(),
})
return nil
}
// DecrementLikeCount 减少点赞数(异步)
func (r *assetRepository) DecrementLikeCount(assetID int64) error {
// 仅加入队列,不直接更新数据库
globalLikeQueue.AddEvent(LikeEvent{
AssetID: assetID,
IsLike: false,
Timestamp: time.Now().UnixMilli(),
})
return nil
}
// GetLikeCount 获取点赞数(考虑内存缓冲)
func (r *assetRepository) GetLikeCount(assetID int64) (int32, error) {
// 从数据库读取基准值
var asset models.Asset
if err := r.db.Select("like_count").Where("id = ?", assetID).First(&asset).Error; err != nil {
return 0, err
}
// 加上内存缓冲中的增量
globalLikeQueue.mu.RLock()
delta := globalLikeQueue.buffer[assetID]
globalLikeQueue.mu.RUnlock()
return asset.LikeCount + delta, nil
}
2.1.3 优势
- ✅ 高性能:批量更新减少数据库压力
- ✅ 低延迟:点赞操作立即返回
- ✅ 最终一致性:定时同步保证数据准确
- ✅ 无额外依赖:不需要 Redis
2.1.4 劣势
- ⚠️ 服务重启时可能丢失内存队列中的数据(可通过持久化解决)
- ⚠️ 点赞数显示有延迟(通常 < 1秒)
方案 2:数据库分片计数器(适合极高并发)
2.2.1 设计思路
将 like_count 拆分为多个分片计数器,降低单行锁竞争。
CREATE TABLE asset_like_counters (
id BIGSERIAL PRIMARY KEY,
asset_id BIGINT NOT NULL,
shard_id INT NOT NULL, -- 分片ID(0-9)
counter INT NOT NULL DEFAULT 0,
UNIQUE KEY uk_asset_shard (asset_id, shard_id),
INDEX idx_asset (asset_id)
);
点赞时:
// 随机选择一个分片
shardID := assetID % 10
UPDATE asset_like_counters
SET counter = counter + 1
WHERE asset_id = ? AND shard_id = ?
查询总点赞数:
SELECT SUM(counter) FROM asset_like_counters WHERE asset_id = ?
2.2.2 优势
- ✅ 降低锁竞争(10个分片 = 10倍并发能力)
- ✅ 数据库层面保证一致性
2.2.3 劣势
- ⚠️ 查询复杂度增加(需要 SUM 聚合)
- ⚠️ 需要额外的表和索引
方案 3:乐观锁 + 重试(简单场景)
2.3.1 实现
func (r *assetRepository) IncrementLikeCountWithRetry(assetID int64, maxRetries int) error {
for i := 0; i < maxRetries; i++ {
// 读取当前值
var asset models.Asset
if err := r.db.Select("like_count, updated_at").
Where("id = ?", assetID).First(&asset).Error; err != nil {
return err
}
oldUpdatedAt := asset.UpdatedAt
newLikeCount := asset.LikeCount + 1
// 乐观锁更新:只有 updated_at 未变时才更新
result := r.db.Model(&models.Asset{}).
Where("id = ? AND updated_at = ?", assetID, oldUpdatedAt).
Updates(map[string]interface{}{
"like_count": newLikeCount,
"updated_at": time.Now().UnixMilli(),
})
if result.Error != nil {
return result.Error
}
if result.RowsAffected > 0 {
return nil // 更新成功
}
// 更新失败,重试
time.Sleep(time.Millisecond * 10)
}
return errors.New("max retries exceeded")
}
2.3.2 优势
- ✅ 实现简单
- ✅ 无需额外组件
2.3.3 劣势
- ⚠️ 高并发时重试次数多
- ⚠️ 性能不如批量更新
三、推荐方案对比
| 方案 | 性能 | 复杂度 | 一致性 | 推荐场景 |
|---|---|---|---|---|
| 方案1:批量更新队列 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 最终一致 | 推荐,适合中高并发 |
| 方案2:分片计数器 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 强一致 | 极高并发场景 |
| 方案3:乐观锁重试 | ⭐⭐ | ⭐⭐ | 强一致 | 低并发场景 |
| 当前实现 | ⭐⭐ | ⭐ | 强一致 | 低并发场景 |
四、Redis 方案(后续优化)
如果后续引入 Redis,可实现更高性能:
4.1 架构
点赞请求
│
▼
1. Redis SETNX(防重复)
│
▼
2. Redis INCR(计数器)
│
▼
3. 立即返回(< 5ms)
│
▼
4. 异步写入数据库
4.2 性能对比
| 指标 | 当前实现 | 批量队列 | Redis方案 |
|---|---|---|---|
| 响应时间 | 50-200ms | 10-50ms | 5-10ms |
| 并发能力 | 100 QPS | 1000 QPS | 10000+ QPS |
| 数据一致性 | 强一致 | 最终一致 | 最终一致 |
五、实施建议
阶段一:当前实现(已完成)
- ✅ 使用数据库原子操作
- ✅ 适合低并发场景(< 100 QPS)
阶段二:批量更新队列(推荐下一步)
- 🔄 实现内存队列 + 批量更新
- 🔄 适合中高并发(100-1000 QPS)
- 🔄 实施时间:2-3天
阶段三:Redis 优化(性能瓶颈时)
- ⏳ 引入 Redis 缓存
- ⏳ 适合高并发(1000+ QPS)
- ⏳ 实施时间:3-5天
六、代码实施清单
6.1 立即优化(可选)
在当前实现基础上添加数据库连接池优化:
// config/database.go
db.SetMaxOpenConns(100) // 最大打开连接数
db.SetMaxIdleConns(10) // 最大空闲连接数
db.SetConnMaxLifetime(time.Hour) // 连接最大生命周期
6.2 批量更新实现(下一步)
- 创建
services/assetService/queue/like_queue.go - 修改
repository/asset_repository.go - 在
main.go中初始化队列 - 添加监控和日志
七、总结
当前实现的风险等级: 🟡 中等
- ✅ 数据一致性有保证(数据库原子操作)
- ⚠️ 性能在高并发下会下降
- ⚠️ 热点资产可能成为瓶颈
推荐行动:
- 短期:保持当前实现,添加数据库连接池优化
- 中期:实现方案1(批量更新队列)
- 长期:根据实际性能需求考虑引入 Redis
关键指标监控:
- 点赞接口 P99 响应时间
- 数据库连接池使用率
- 点赞操作 QPS
- 数据库锁等待时间