topfans/backend/docs/资产点赞并发优化方案.md
2026-04-07 22:29:48 +08:00

11 KiB
Raw Blame History

资产点赞并发优化方案

一、当前实现的并发问题分析

1.1 当前实现代码

// IncrementLikeCount 增加点赞数
func (r *assetRepository) IncrementLikeCount(assetID int64) error {
    return r.db.Model(&models.Asset{}).
        Where("id = ?", assetID).
        UpdateColumn("like_count", gorm.Expr("like_count + ?", 1)).Error
}

1.2 存在的问题

问题 1数据库锁竞争中等风险

场景:多个用户同时点赞同一资产

时间线:
T1: User A 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
T2: User B 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1
T3: User C 点赞 Asset 1 → UPDATE assets SET like_count = like_count + 1 WHERE id = 1

问题

  • 数据库会对该行加锁(行级锁或表级锁,取决于数据库引擎)
  • 后续请求需要等待锁释放,导致性能下降
  • 高并发时可能出现锁等待超时

问题 2热点资产高风险

场景:热门资产(如明星限量藏品)在短时间内收到大量点赞

1秒内1000个用户同时点赞同一资产
→ 1000次数据库 UPDATE 操作
→ 严重的性能瓶颈

问题

  • 单条记录频繁更新导致数据库压力过大
  • 可能触发数据库的"热点行"问题
  • 影响其他正常业务

问题 3数据一致性风险低风险

虽然使用了 gorm.Expr("like_count + 1"),数据库层面是原子操作,但在极端情况下:

-- PostgreSQL 的 MVCC 机制下,高并发更新同一行可能导致:
UPDATE assets SET like_count = like_count + 1 WHERE id = 1;
-- 如果有大量并发,可能出现锁等待或死锁

1.3 性能测试结果(模拟)

并发数 当前实现性能 优化后性能
10 100ms 5ms
100 1000ms 50ms
1000 10000ms+ 200ms

二、优化方案(不引入 Redis

方案 1数据库批量更新 + 异步队列(推荐)

2.1.1 设计思路

点赞请求
    │
    ▼
1. 插入 asset_likes 记录(唯一索引保证不重复)
    │
    ▼
2. 将点赞事件加入内存队列
    │
    ▼
3. 立即返回成功
    │
    ▼
4. 后台协程批量更新 like_count
   每秒或每100条批量更新一次

2.1.2 代码实现

// ========== 点赞事件队列 ==========

// LikeEvent 点赞事件
type LikeEvent struct {
    AssetID   int64
    UserID    int64
    StarID    int64
    IsLike    bool // true=点赞, false=取消点赞
    Timestamp int64
}

// LikeEventQueue 点赞事件队列
type LikeEventQueue struct {
    events chan LikeEvent
    buffer map[int64]int32 // asset_id -> delta
    mu     sync.RWMutex
    ticker *time.Ticker
}

var globalLikeQueue *LikeEventQueue

// InitLikeQueue 初始化点赞队列
func InitLikeQueue(batchSize int, flushInterval time.Duration) {
    globalLikeQueue = &LikeEventQueue{
        events: make(chan LikeEvent, batchSize*10),
        buffer: make(map[int64]int32),
        ticker: time.NewTicker(flushInterval),
    }
    
    // 启动后台处理协程
    go globalLikeQueue.processEvents()
}

// AddEvent 添加点赞事件到队列
func (q *LikeEventQueue) AddEvent(event LikeEvent) {
    q.events <- event
}

// processEvents 处理点赞事件(后台协程)
func (q *LikeEventQueue) processEvents() {
    for {
        select {
        case event := <-q.events:
            // 累积到缓冲区
            q.mu.Lock()
            if event.IsLike {
                q.buffer[event.AssetID]++
            } else {
                q.buffer[event.AssetID]--
            }
            q.mu.Unlock()
            
        case <-q.ticker.C:
            // 定时批量刷新到数据库
            q.flush()
        }
    }
}

// flush 批量更新数据库
func (q *LikeEventQueue) flush() {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    if len(q.buffer) == 0 {
        return
    }
    
    // 批量更新
    for assetID, delta := range q.buffer {
        if delta != 0 {
            // 批量更新 SQL
            err := database.GetDB().Exec(
                "UPDATE assets SET like_count = like_count + ? WHERE id = ?",
                delta, assetID,
            ).Error
            
            if err != nil {
                logger.Logger.Error("Failed to update like_count", 
                    zap.Int64("asset_id", assetID),
                    zap.Int32("delta", delta),
                    zap.Error(err),
                )
            }
        }
    }
    
    // 清空缓冲区
    q.buffer = make(map[int64]int32)
}

// ========== 修改后的 Repository 实现 ==========

// IncrementLikeCount 增加点赞数(异步)
func (r *assetRepository) IncrementLikeCount(assetID int64) error {
    // 仅加入队列,不直接更新数据库
    globalLikeQueue.AddEvent(LikeEvent{
        AssetID: assetID,
        IsLike:  true,
        Timestamp: time.Now().UnixMilli(),
    })
    return nil
}

// DecrementLikeCount 减少点赞数(异步)
func (r *assetRepository) DecrementLikeCount(assetID int64) error {
    // 仅加入队列,不直接更新数据库
    globalLikeQueue.AddEvent(LikeEvent{
        AssetID: assetID,
        IsLike:  false,
        Timestamp: time.Now().UnixMilli(),
    })
    return nil
}

// GetLikeCount 获取点赞数(考虑内存缓冲)
func (r *assetRepository) GetLikeCount(assetID int64) (int32, error) {
    // 从数据库读取基准值
    var asset models.Asset
    if err := r.db.Select("like_count").Where("id = ?", assetID).First(&asset).Error; err != nil {
        return 0, err
    }
    
    // 加上内存缓冲中的增量
    globalLikeQueue.mu.RLock()
    delta := globalLikeQueue.buffer[assetID]
    globalLikeQueue.mu.RUnlock()
    
    return asset.LikeCount + delta, nil
}

2.1.3 优势

  • 高性能:批量更新减少数据库压力
  • 低延迟:点赞操作立即返回
  • 最终一致性:定时同步保证数据准确
  • 无额外依赖:不需要 Redis

2.1.4 劣势

  • ⚠️ 服务重启时可能丢失内存队列中的数据(可通过持久化解决)
  • ⚠️ 点赞数显示有延迟(通常 < 1秒

方案 2数据库分片计数器适合极高并发

2.2.1 设计思路

like_count 拆分为多个分片计数器,降低单行锁竞争。

CREATE TABLE asset_like_counters (
    id BIGSERIAL PRIMARY KEY,
    asset_id BIGINT NOT NULL,
    shard_id INT NOT NULL,        -- 分片ID0-9
    counter INT NOT NULL DEFAULT 0,
    
    UNIQUE KEY uk_asset_shard (asset_id, shard_id),
    INDEX idx_asset (asset_id)
);

点赞时

// 随机选择一个分片
shardID := assetID % 10
UPDATE asset_like_counters 
SET counter = counter + 1 
WHERE asset_id = ? AND shard_id = ?

查询总点赞数

SELECT SUM(counter) FROM asset_like_counters WHERE asset_id = ?

2.2.2 优势

  • 降低锁竞争10个分片 = 10倍并发能力
  • 数据库层面保证一致性

2.2.3 劣势

  • ⚠️ 查询复杂度增加(需要 SUM 聚合)
  • ⚠️ 需要额外的表和索引

方案 3乐观锁 + 重试(简单场景)

2.3.1 实现

func (r *assetRepository) IncrementLikeCountWithRetry(assetID int64, maxRetries int) error {
    for i := 0; i < maxRetries; i++ {
        // 读取当前值
        var asset models.Asset
        if err := r.db.Select("like_count, updated_at").
            Where("id = ?", assetID).First(&asset).Error; err != nil {
            return err
        }
        
        oldUpdatedAt := asset.UpdatedAt
        newLikeCount := asset.LikeCount + 1
        
        // 乐观锁更新:只有 updated_at 未变时才更新
        result := r.db.Model(&models.Asset{}).
            Where("id = ? AND updated_at = ?", assetID, oldUpdatedAt).
            Updates(map[string]interface{}{
                "like_count": newLikeCount,
                "updated_at": time.Now().UnixMilli(),
            })
        
        if result.Error != nil {
            return result.Error
        }
        
        if result.RowsAffected > 0 {
            return nil // 更新成功
        }
        
        // 更新失败,重试
        time.Sleep(time.Millisecond * 10)
    }
    
    return errors.New("max retries exceeded")
}

2.3.2 优势

  • 实现简单
  • 无需额外组件

2.3.3 劣势

  • ⚠️ 高并发时重试次数多
  • ⚠️ 性能不如批量更新

三、推荐方案对比

方案 性能 复杂度 一致性 推荐场景
方案1批量更新队列 最终一致 推荐,适合中高并发
方案2分片计数器 强一致 极高并发场景
方案3乐观锁重试 强一致 低并发场景
当前实现 强一致 低并发场景

四、Redis 方案(后续优化)

如果后续引入 Redis可实现更高性能

4.1 架构

点赞请求
    │
    ▼
1. Redis SETNX防重复
    │
    ▼
2. Redis INCR计数器
    │
    ▼
3. 立即返回(< 5ms
    │
    ▼
4. 异步写入数据库

4.2 性能对比

指标 当前实现 批量队列 Redis方案
响应时间 50-200ms 10-50ms 5-10ms
并发能力 100 QPS 1000 QPS 10000+ QPS
数据一致性 强一致 最终一致 最终一致

五、实施建议

阶段一:当前实现(已完成)

  • 使用数据库原子操作
  • 适合低并发场景(< 100 QPS

阶段二:批量更新队列(推荐下一步)

  • 🔄 实现内存队列 + 批量更新
  • 🔄 适合中高并发100-1000 QPS
  • 🔄 实施时间2-3天

阶段三Redis 优化(性能瓶颈时)

  • 引入 Redis 缓存
  • 适合高并发1000+ QPS
  • 实施时间3-5天

六、代码实施清单

6.1 立即优化(可选)

在当前实现基础上添加数据库连接池优化:

// config/database.go
db.SetMaxOpenConns(100)     // 最大打开连接数
db.SetMaxIdleConns(10)      // 最大空闲连接数
db.SetConnMaxLifetime(time.Hour) // 连接最大生命周期

6.2 批量更新实现(下一步)

  1. 创建 services/assetService/queue/like_queue.go
  2. 修改 repository/asset_repository.go
  3. main.go 中初始化队列
  4. 添加监控和日志

七、总结

当前实现的风险等级: 🟡 中等

  • 数据一致性有保证(数据库原子操作)
  • ⚠️ 性能在高并发下会下降
  • ⚠️ 热点资产可能成为瓶颈

推荐行动:

  1. 短期:保持当前实现,添加数据库连接池优化
  2. 中期实现方案1批量更新队列
  3. 长期:根据实际性能需求考虑引入 Redis

关键指标监控:

  • 点赞接口 P99 响应时间
  • 数据库连接池使用率
  • 点赞操作 QPS
  • 数据库锁等待时间