15 KiB
15 KiB
修复文档:OnExhibitionCompleted 调用链路重建
日期: 2026-05-16 问题: OnExhibitionCompleted 未被调用,用户累计上架时长 (user_exhibition_hours) 数据异常 影响: 61 次重复调用记录,8 条有效记录 vs 53 小时重复累计
一、问题根因
1.1 当前代码状态(经验证)
| 组件 | 状态 | 位置 | 备注 |
|---|---|---|---|
| OnExhibitionCompleted RPC 定义 | ✅ 存在 | galleryService/client/task_rpc_client.go:51 |
完整实现,但从未被调用 |
| OnExhibitionCompleted 调用处 | ❌ 不存在 | 无任何代码调用 | 已搜索全项目无调用 |
| CleanupWorker NewCleanupWorker 调用 | ⚠️ 存在但实现缺失 | main.go:183 |
调用了但 service/cleanup_worker.go 不存在 |
| AddExhibitionHours 调用 | ❌ 缺失 | revenue_service.go:132-172 |
OnExhibitionCompleted 未调用 |
| revenueService.userClient | ✅ 已注入 | service/revenue_service.go:23 |
|
| main.go 调用签名不匹配 | ⚠️ 存在 | main.go:110 |
传3个参数,函数只接收2个 |
1.2 数据流断点
展位过期 → main.go 调用 NewCleanupWorker
↓
cleanup_worker.go 不存在
↓
无人调用 taskClient.OnExhibitionCompleted()
↓
OnExhibitionCompleted RPC 从未被触发
↓
revenueService.OnExhibitionCompleted 只创建 revenue record
↓
缺少: s.userClient.AddExhibitionHours()
↓
user_exhibition_hours 表无数据来源
1.3 验证过程
# 1. 搜索 OnExhibitionCompleted 调用 - 找到定义但无调用
grep -rn "\.OnExhibitionCompleted\(" backend/services/galleryService/
# 结果: task_rpc_client.go:77 是定义,其他都是 RPC handler
# 2. 确认 taskClient.OnExhibitionCompleted 无人调用
grep -rn "taskClient\.OnExhibitionCompleted\|taskRPCClient\.OnExhibitionCompleted" backend/
# 结果: 无匹配
# 3. 检查 service/ 目录文件
ls backend/services/galleryService/service/
# 结果: 只有 gallery_service.go,没有 cleanup_worker.go
# 4. 检查 revenueService 结构
grep -A 5 "type revenueService struct" backend/services/taskService/service/revenue_service.go
# 结果:
# type revenueService struct {
# revenueRepo repository.RevenueRepository
# userClient client.UserServiceClient ✅ 已注入
# }
# 5. 检查 AddExhibitionHours 调用位置
grep -n "AddExhibitionHours" backend/services/taskService/service/revenue_service.go
# 结果: 无匹配 - 确认未调用
# 6. 检查 main.go 签名不匹配
grep "NewRevenueService" backend/services/taskService/main.go
# 结果: main.go:110 传3个参数 (revenueRepo, userRPCClient, galleryRPCClient)
grep "func NewRevenueService" backend/services/taskService/service/revenue_service.go
# 结果: revenue_service.go:26 只接收2个参数 (revenueRepo, userClient)
1.4 风险分析
如果直接修复代码让 RemoveExhibitionByAsset 或 cleanup worker 调用 AddExhibitionHours:
- 基于当前 61 条记录,会重复累计约 53 小时(61 - 8 = 53 小时的重复)
- 需要先清理重复数据或添加去重机制
二、修复方案
2.1 修复步骤
Step 1: 创建 backend/services/galleryService/service/cleanup_worker.go
package service
import (
"context"
"time"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/pkg/models"
"github.com/topfans/backend/services/galleryService/client"
"github.com/topfans/backend/services/galleryService/repository"
"go.uber.org/zap"
)
type CleanupWorker struct {
repo *repository.GalleryRepository
assetClient client.AssetRPCClient
userClient client.UserRPCClient
taskClient client.TaskRPCClient
stopCh chan struct{}
ticker *time.Ticker
}
func NewCleanupWorker(
repo *repository.GalleryRepository,
assetClient client.AssetRPCClient,
userClient client.UserRPCClient,
taskClient client.TaskRPCClient,
) *CleanupWorker {
return &CleanupWorker{
repo: repo,
assetClient: assetClient,
userClient: userClient,
taskClient: taskClient,
stopCh: make(chan struct{}),
ticker: time.NewTicker(1 * time.Minute),
}
}
func (w *CleanupWorker) Start() {
logger.Logger.Info("CleanupWorker started")
w.cleanup() // 立即执行一次
for {
select {
case <-w.ticker.C:
w.cleanup()
case <-w.stopCh:
logger.Logger.Info("CleanupWorker stopped")
return
}
}
}
func (w *CleanupWorker) Stop() {
w.ticker.Stop()
close(w.stopCh)
}
func (w *CleanupWorker) cleanup() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 获取所有过期展品 (使用毫秒时间戳)
nowMs := time.Now().UnixMilli()
expiredExhibitions, err := w.repo.GetExpiredExhibitions(nowMs)
if err != nil {
logger.Logger.Error("CleanupWorker: failed to get expired exhibitions", zap.Error(err))
return
}
if len(expiredExhibitions) == 0 {
return
}
logger.Logger.Info("CleanupWorker: processing expired exhibitions", zap.Int("count", len(expiredExhibitions)))
for _, exhibition := range expiredExhibitions {
w.processExpiredExhibition(ctx, exhibition)
}
}
func (w *CleanupWorker) processExpiredExhibition(ctx context.Context, exhibition *models.Exhibition) {
// 获取展位信息
slot, err := w.repo.GetSlotByID(exhibition.SlotID)
if err != nil {
logger.Logger.Error("CleanupWorker: failed to get slot",
zap.Int64("slot_id", exhibition.SlotID), zap.Error(err))
return
}
// 计算实际上架时长(毫秒转小时)
startTime := exhibition.PlacedAt.UnixMilli()
expireAt := exhibition.ExpiresAt.UnixMilli()
actualHours := (expireAt - startTime) / 3600000 // 毫秒转小时
if actualHours < 1 {
actualHours = 1
}
// 构建请求
req := &client.OnExhibitionCompletedRequest{
ExhibitionId: exhibition.ID,
AssetId: exhibition.AssetID,
SlotId: exhibition.SlotID,
OccupierUid: exhibition.OccupierUID,
OccupierStarId: exhibition.OccupierStarID,
SlotOwnerUid: slot.UserID,
CrystalAmount: exhibition.CrystalAmount,
StartTime: startTime,
ExpireAt: expireAt,
}
// 调用 OnExhibitionCompleted - 这是缺失的关键调用
resp, err := w.taskClient.OnExhibitionCompleted(ctx, req)
if err != nil {
logger.Logger.Error("CleanupWorker: OnExhibitionCompleted failed",
zap.Int64("exhibition_id", exhibition.ID), zap.Error(err))
return
}
logger.Logger.Info("CleanupWorker: OnExhibitionCompleted succeeded",
zap.Int64("exhibition_id", exhibition.ID),
zap.Int64("revenue_record_id", resp.RevenueRecordId))
// 删除展品记录
if err := w.repo.DeleteExhibition(exhibition.ID); err != nil {
logger.Logger.Error("CleanupWorker: failed to delete exhibition",
zap.Int64("exhibition_id", exhibition.ID), zap.Error(err))
}
}
Step 2: 修改 revenue_service.go 添加 AddExhibitionHours 调用
文件: backend/services/taskService/service/revenue_service.go
在 OnExhibitionCompleted 成功创建 revenue record 后,添加 AddExhibitionHours 调用:
func (s *revenueService) OnExhibitionCompleted(ctx context.Context, req *pb.OnExhibitionCompletedRequest) (*pb.OnExhibitionCompletedResponse, error) {
// ... 现有代码 (检查 self-slot,创建 revenue record) ...
result, err := s.revenueRepo.CreateRevenueRecord(record)
if err != nil {
logger.Logger.Error("OnExhibitionCompleted: failed to create revenue record",
zap.Int64("slot_owner_uid", req.SlotOwnerUid),
zap.Int64("amount", req.CrystalAmount),
zap.Error(err))
return nil, err
}
// ===== 新增:调用 AddExhibitionHours =====
// 计算实际上架时长(毫秒转小时)
startTime := req.StartTime
expireAt := req.ExpireAt
actualHours := (expireAt - startTime) / 3600000
if actualHours < 1 {
actualHours = 1
}
// sourceID 用于去重,避免重复累计
sourceID := fmt.Sprintf("exhibition_%d", req.ExhibitionId)
// 增加用户累计上架时长(收益属于展位主人)
newLevel, levelDelta, crystalReward, err := s.userClient.AddExhibitionHours(
ctx,
req.SlotOwnerUid,
req.OccupierStarId,
actualHours,
sourceID,
)
if err != nil {
logger.Logger.Error("OnExhibitionCompleted: AddExhibitionHours failed",
zap.Int64("slot_owner_uid", req.SlotOwnerUid),
zap.Int64("hours", actualHours),
zap.Error(err))
// 不返回错误,因为收益记录已创建
} else {
logger.Logger.Info("OnExhibitionCompleted: AddExhibitionHours succeeded",
zap.Int64("slot_owner_uid", req.SlotOwnerUid),
zap.Int64("hours", actualHours),
zap.Int32("new_level", newLevel),
zap.Int32("level_delta", levelDelta),
zap.Int64("crystal_reward", crystalReward))
}
// ======================================
logger.Logger.Info("OnExhibitionCompleted: revenue record created",
zap.Int64("record_id", result.ID),
zap.Int64("slot_owner_uid", req.SlotOwnerUid),
zap.Int64("amount", req.CrystalAmount))
return &pb.OnExhibitionCompletedResponse{RevenueRecordId: result.ID}, nil
}
注意: 需要在文件头部添加 fmt import:
import (
"context"
"fmt" // 新增
// ...
)
Step 3: 修复 main.go 签名不匹配
文件: backend/services/taskService/main.go:110
当前代码:
revenueSvc := service.NewRevenueService(revenueRepo, userRPCClient, galleryRPCClient)
有两种修复方式:
方式 A: 如果 galleryRPCClient 不需要,从调用中移除:
revenueSvc := service.NewRevenueService(revenueRepo, userRPCClient)
方式 B: 如果需要 galleryRPCClient,修改 NewRevenueService 签名:
func NewRevenueService(revenueRepo repository.RevenueRepository, userClient client.UserServiceClient, galleryClient client.GalleryServiceClient) RevenueService {
return &revenueService{
revenueRepo: revenueRepo,
userClient: userClient,
galleryClient: galleryClient, // 新增字段
}
}
三、数据修复
3.1 重复数据清理
在修复代码之前,需要先处理现有的重复数据:
-- 查看重复记录
SELECT
user_id,
star_id,
source_id,
COUNT(*) as cnt,
SUM(exhibition_hours) as total_hours
FROM user_exhibition_hours
WHERE source_id LIKE 'exhibition_%'
GROUP BY user_id, star_id, source_id
HAVING COUNT(*) > 1;
-- 保留最新记录,删除重复
DELETE FROM user_exhibition_hours
WHERE id NOT IN (
SELECT MAX(id)
FROM user_exhibition_hours
WHERE source_id LIKE 'exhibition_%'
GROUP BY user_id, star_id, source_id
);
3.2 添加去重逻辑(可选增强)
在 AddExhibitionHours 中使用 source_id 进行去重:
func (r *fanProfileRepository) AddExhibitionHours(userID, starID int64, hours int64, sourceID string) (int32, int32, int64, error) {
// 检查是否已处理过此 source
var existing fanProfile
err := r.db.Where("user_id = ? AND star_id = ? AND exhibition_source_id = ?", userID, starID, sourceID).First(&existing).Error
if err == nil {
// 已存在,跳过(幂等性保证)
return existing.Level, 0, 0, nil
}
// ... 正常逻辑 ...
}
四、验证步骤
4.1 代码验证
- ✅ 确认
cleanup_worker.go文件存在于service/目录 - ✅ 确认
GetExpiredExhibitions方法签名正确(接收int64时间戳) - ✅ 确认 CleanupWorker 调用
taskClient.OnExhibitionCompleted() - ✅ 确认
revenue_service.go中OnExhibitionCompleted调用AddExhibitionHours - ✅ 确认
main.go和NewRevenueService签名匹配
4.2 功能验证
- 启动 galleryService
- 创建一个过期的展品(expire_at < NOW())
- 等待 CleanupWorker 执行(1分钟内)
- 检查 logs 中是否有 "CleanupWorker: processing expired exhibitions"
- 检查
exhibition_revenue_records表是否有新记录 - 检查
user_exhibition_hours表是否有对应的时长记录
4.3 去重验证
- 多次触发同一展品的 OnExhibitionCompleted
- 确认
user_exhibition_hours中只有一条记录 - 确认累计时长正确(不应累加多次)
五、风险与注意事项
- 幂等性: OnExhibitionCompleted 必须是幂等的,重复调用不应产生重复数据
- 事务性: AddExhibitionHours 和 revenue record 创建应该在同一事务中(当前未实现,需要评估)
- 时区问题: 使用 Asia/Shanghai 时区计算时长
- 历史数据: 修复前的历史重复数据需要手动清理
- 编译验证: 修改后需重新编译
go build ./services/galleryService/...和go build ./services/taskService/...
六、文件清单
| 文件路径 | 操作 | 关键修改 |
|---|---|---|
backend/services/galleryService/service/cleanup_worker.go |
新增 | 完整实现 CleanupWorker,调用 taskClient.OnExhibitionCompleted |
backend/services/taskService/service/revenue_service.go |
修改 | OnExhibitionCompleted 中添加 AddExhibitionHours 调用 |
backend/services/taskService/main.go |
修改 | 修复 NewRevenueService 调用签名 |
七、修复后数据流
正确的数据流:
展位过期 (expire_at < NOW())
↓
CleanupWorker.Start() 每分钟扫描
↓
GetExpiredExhibitions() 获取过期展品
↓
对每个展品调用 processExpiredExhibition()
↓
taskClient.OnExhibitionCompleted() RPC 调用 ← 缺失的调用
↓
TaskInternalProvider.OnExhibitionCompleted()
↓
revenueService.OnExhibitionCompleted()
├── 创建 revenue record (slot_owner 获取收益)
└── userClient.AddExhibitionHours() (slot_owner 累加上架时长) ← 缺失的调用
↓
repo.DeleteExhibition() 软删除展品
附:待确认 - main.go 调用签名不匹配
现象
$ grep "NewRevenueService" backend/services/taskService/main.go
→ revenueSvc := service.NewRevenueService(revenueRepo, userRPCClient, galleryRPCClient)
# 传3个参数
$ grep "func NewRevenueService" backend/services/taskService/service/revenue_service.go
→ func NewRevenueService(revenueRepo repository.RevenueRepository, userClient client.UserServiceClient) RevenueService {
# 只接收2个参数
验证方法
cd backend && go build ./services/taskService/...
如果编译报错,说明确实是签名不匹配问题。
可能原因
- 代码未同步 - main.go 或 revenue_service.go 有过修改但另一处未同步
- 存在另一个定义 - 如果编译通过,可能有其他地方定义了 NewRevenueService
搜索结果
全项目搜索只找到一个定义:backend/services/taskService/service/revenue_service.go:26
这表明 main.go 存在编译错误,需要将 galleryRPCClient 移除或修改函数签名。