- StatisticInternalProvider: TrackEvent/BatchTrackEvent
- StatisticCombinedProvider: all 9 RPCs (7 dashboard + 2 event) on single service
- materializer: 4 MV REFRESH CONCURRENTLY + pg_try_advisory_lock + refresh_log
- dashboard_repo: 7 aggregation SQLs (week_rank / 7d curve / top5 / level dist / upgrade progress)
- dashboard_service: 7 RPCs with Redis 5min TTL + cache miss protection (1min empty)
- Cache wrapper: JSON serialize + format dash:{rpc}:{starID}:{userID}
- main.go: integrated workers + Dubbo triple server :20009
- cross-service userService.GetFanProfile (for crystal_balance)
- client/user_rpc_client.go
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
103 lines
3.8 KiB
Go
103 lines
3.8 KiB
Go
package provider
|
||
|
||
import (
|
||
"context"
|
||
"strconv"
|
||
"time"
|
||
|
||
"go.uber.org/zap"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
pb "github.com/topfans/backend/pkg/proto/statistic"
|
||
"github.com/topfans/backend/services/statisticService/metrics"
|
||
"github.com/topfans/backend/services/statisticService/service"
|
||
)
|
||
|
||
// StatisticCombinedProvider 完整 StatisticService 实现(9 RPC)
|
||
// - 看板 7 RPC(T12 由 DashboardService 实现)
|
||
// - 事件 2 RPC(T9 由 StatisticInternalProvider 实现)
|
||
type StatisticCombinedProvider struct {
|
||
*StatisticInternalProvider
|
||
dashSvc *service.DashboardService
|
||
}
|
||
|
||
// NewStatisticCombinedProvider 构造
|
||
func NewStatisticCombinedProvider(internal *StatisticInternalProvider, dashSvc *service.DashboardService) *StatisticCombinedProvider {
|
||
return &StatisticCombinedProvider{
|
||
StatisticInternalProvider: internal,
|
||
dashSvc: dashSvc,
|
||
}
|
||
}
|
||
|
||
// userIDFromContext extracts the caller's user ID from ctx.
//
// It reads the value stored under the string key "user_id" and accepts three
// representations: a base-10 string, an int64, or an int. Every other case
// (key absent, unsupported value type, or a string that does not parse as a
// 64-bit signed integer) yields 0.
func userIDFromContext(ctx context.Context) int64 {
	// A type switch on a nil interface value matches no case, so a missing
	// key falls through to the final return.
	switch v := ctx.Value("user_id").(type) {
	case string:
		id, err := strconv.ParseInt(v, 10, 64)
		if err != nil {
			// On a range error ParseInt returns the clamped max/min int64
			// rather than 0, so the error must be checked explicitly to
			// avoid treating an overflowing string as a real user ID.
			return 0
		}
		return id
	case int64:
		return v
	case int:
		return int64(v)
	}
	return 0
}
|
||
|
||
func (p *StatisticCombinedProvider) recordRPC(rpc string, start time.Time, err error) {
|
||
status := "ok"
|
||
if err != nil {
|
||
status = "error"
|
||
}
|
||
metrics.DashboardRPCTotal.WithLabelValues(rpc, status).Inc()
|
||
metrics.DashboardRPCDuration.WithLabelValues(rpc).Observe(time.Since(start).Seconds())
|
||
}
|
||
|
||
// ===== Dashboard RPCs (7) =====
|
||
|
||
func (p *StatisticCombinedProvider) GetTodayOverview(ctx context.Context, req *pb.GetTodayOverviewRequest) (*pb.GetTodayOverviewResponse, error) {
|
||
t0 := time.Now()
|
||
defer func() { p.recordRPC("GetTodayOverview", t0, nil) }()
|
||
resp, err := p.dashSvc.GetTodayOverview(ctx, userIDFromContext(ctx), req.StarId)
|
||
if err != nil {
|
||
logger.Logger.Warn("GetTodayOverview failed", zap.Error(err))
|
||
}
|
||
return resp, err
|
||
}
|
||
|
||
func (p *StatisticCombinedProvider) Get7DayIncomeCurve(ctx context.Context, req *pb.Get7DayIncomeCurveRequest) (*pb.Get7DayIncomeCurveResponse, error) {
|
||
t0 := time.Now()
|
||
defer func() { p.recordRPC("Get7DayIncomeCurve", t0, nil) }()
|
||
return p.dashSvc.Get7DayIncomeCurve(ctx, userIDFromContext(ctx), req.StarId)
|
||
}
|
||
|
||
func (p *StatisticCombinedProvider) GetExhibitionIncomeSummary(ctx context.Context, req *pb.GetExhibitionIncomeSummaryRequest) (*pb.GetExhibitionIncomeSummaryResponse, error) {
|
||
t0 := time.Now()
|
||
defer func() { p.recordRPC("GetExhibitionIncomeSummary", t0, nil) }()
|
||
return p.dashSvc.GetExhibitionIncomeSummary(ctx, userIDFromContext(ctx), req.StarId)
|
||
}
|
||
|
||
func (p *StatisticCombinedProvider) GetLikeIncomeByLevel(ctx context.Context, req *pb.GetLikeIncomeByLevelRequest) (*pb.GetLikeIncomeByLevelResponse, error) {
|
||
t0 := time.Now()
|
||
defer func() { p.recordRPC("GetLikeIncomeByLevel", t0, nil) }()
|
||
return p.dashSvc.GetLikeIncomeByLevel(ctx, userIDFromContext(ctx), req.StarId)
|
||
}
|
||
|
||
func (p *StatisticCombinedProvider) GetTopAssetsByEarning(ctx context.Context, req *pb.GetTopAssetsByEarningRequest) (*pb.GetTopAssetsByEarningResponse, error) {
|
||
t0 := time.Now()
|
||
defer func() { p.recordRPC("GetTopAssetsByEarning", t0, nil) }()
|
||
return p.dashSvc.GetTopAssetsByEarning(ctx, userIDFromContext(ctx), req.StarId)
|
||
}
|
||
|
||
func (p *StatisticCombinedProvider) GetAssetLevelDistribution(ctx context.Context, req *pb.GetAssetLevelDistributionRequest) (*pb.GetAssetLevelDistributionResponse, error) {
|
||
t0 := time.Now()
|
||
defer func() { p.recordRPC("GetAssetLevelDistribution", t0, nil) }()
|
||
return p.dashSvc.GetAssetLevelDistribution(ctx, userIDFromContext(ctx), req.StarId)
|
||
}
|
||
|
||
func (p *StatisticCombinedProvider) GetAssetUpgradeProgress(ctx context.Context, req *pb.GetAssetUpgradeProgressRequest) (*pb.GetAssetUpgradeProgressResponse, error) {
|
||
t0 := time.Now()
|
||
defer func() { p.recordRPC("GetAssetUpgradeProgress", t0, nil) }()
|
||
return p.dashSvc.GetAssetUpgradeProgress(ctx, userIDFromContext(ctx), req.StarId)
|
||
}
|