topfans/backend/services/statisticService/service/dashboard_service.go
zerosaturation dd9952ccc9 feat(statistic): T9-T12 dashboard 7 RPCs (Provider + Materializer + Service + Cache)
- StatisticInternalProvider: TrackEvent/BatchTrackEvent
- StatisticCombinedProvider: all 9 RPCs (7 dashboard + 2 event) on single service
- materializer: 4 MV REFRESH CONCURRENTLY + pg_try_advisory_lock + refresh_log
- dashboard_repo: 7 aggregation SQLs (week_rank / 7d curve / top5 / level dist / upgrade progress)
- dashboard_service: 7 RPCs with Redis 5min TTL + cache miss protection (1min empty)
- Cache wrapper: JSON serialize + format dash:{rpc}:{starID}:{userID}
- main.go: integrated workers + Dubbo triple server :20009
- cross-service userService.GetFanProfile (for crystal_balance)
- client/user_rpc_client.go

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 17:20:53 +08:00

223 lines
6.9 KiB
Go

package service
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
pb "github.com/topfans/backend/pkg/proto/statistic"
"github.com/topfans/backend/services/statisticService/repository"
)
// UserRPCClient is the interface used for cross-service calls to userService.
// DashboardService only depends on this abstraction; GetTodayOverview tolerates
// a nil client and then reports a crystal balance of 0.
type UserRPCClient interface {
// GetCrystalBalance returns the crystal balance of userID under starID,
// or an error when the balance could not be obtained.
GetCrystalBalance(ctx context.Context, userID, starID int64) (int64, error)
}
// DashboardService implements the business logic of the seven dashboard RPCs.
// Each RPC method first consults the cache and, on a miss, reads from the
// repository and stores the assembled response back into the cache.
type DashboardService struct {
// repo provides the dashboard aggregation queries.
repo *repository.DashboardRepository
// cache stores serialized responses keyed by RPC name, starID and userID.
cache *Cache
// userRPC fetches the crystal balance from userService; it may be nil.
userRPC UserRPCClient
}
// NewDashboardService wires a DashboardService from its three collaborators:
// the aggregation repository, the response cache, and the userService client.
// The arguments are stored as given; no validation is performed here.
func NewDashboardService(repo *repository.DashboardRepository, cache *Cache, userRPC UserRPCClient) *DashboardService {
	svc := new(DashboardService)
	svc.repo = repo
	svc.cache = cache
	svc.userRPC = userRPC
	return svc
}
// ===== 1. Today overview =====

// GetTodayOverview returns the "today overview" card for userID under starID:
// crystal balance, today's income, and the weekly rank with its population size.
//
// A cached response is returned when the cache reports a hit; cache read errors
// are treated as a miss. On a miss the weekly rank and today's income are read
// from the repository, and any repository error is returned unchanged.
//
// The crystal balance comes from userService. A failure there is not fatal: it
// is logged and the balance falls back to 0. Such a degraded response is NOT
// written to the cache, so a transient userService error cannot pin a wrong
// zero balance for the lifetime of the cache entry. The trade-off is that
// requests are served uncached while userService keeps failing. When no
// userRPC client is configured the balance is 0 by design and the response is
// cached as usual.
func (s *DashboardService) GetTodayOverview(ctx context.Context, userID, starID int64) (*pb.GetTodayOverviewResponse, error) {
	key := CacheKey("today_overview", starID, userID)

	var cached pb.GetTodayOverviewResponse
	// Best-effort cache read: an error is handled exactly like a miss.
	if ok, _ := s.cache.GetJSON(ctx, key, &cached); ok {
		return &cached, nil
	}

	part, err := s.repo.GetWeekRank(ctx, userID, starID)
	if err != nil {
		return nil, err
	}
	todayIncome, err := s.repo.GetTodayIncome(ctx, userID, starID)
	if err != nil {
		return nil, err
	}

	var crystal int64
	// cacheable stays true unless the balance had to be substituted with the
	// fallback value because the RPC failed.
	cacheable := true
	if s.userRPC != nil {
		balance, rpcErr := s.userRPC.GetCrystalBalance(ctx, userID, starID)
		if rpcErr != nil {
			cacheable = false
			logger.Logger.Warn("userService.GetCrystalBalance failed, use 0",
				zap.Int64("user_id", userID),
				zap.Int64("star_id", starID),
				zap.Error(rpcErr))
		} else {
			crystal = balance
		}
	}

	resp := &pb.GetTodayOverviewResponse{
		CrystalBalance: crystal,
		TodayIncome:    todayIncome,
		WeekRank:       part.WeekRank,
		WeekTotalUsers: part.WeekTotalUsers,
	}
	if cacheable {
		// Best-effort cache write: a failure only costs a future cache miss.
		_ = s.cache.SetJSON(ctx, key, resp)
	}
	return resp, nil
}
// ===== 2. Seven-day income curve =====

// Get7DayIncomeCurve returns the per-day income points of userID under starID
// together with the total income and its average over seven days.
//
// A cached response is returned on a cache hit (read errors count as a miss).
// Otherwise the points and total come from the repository, whose error is
// returned unchanged, and the assembled response is cached best-effort.
func (s *DashboardService) Get7DayIncomeCurve(ctx context.Context, userID, starID int64) (*pb.Get7DayIncomeCurveResponse, error) {
	cacheKey := CacheKey("7day_income_curve", starID, userID)

	hit := new(pb.Get7DayIncomeCurveResponse)
	if found, _ := s.cache.GetJSON(ctx, cacheKey, hit); found {
		return hit, nil
	}

	daily, sum, err := s.repo.Get7DayIncomeCurve(ctx, userID, starID)
	if err != nil {
		return nil, err
	}

	out := new(pb.Get7DayIncomeCurveResponse)
	out.TotalIncome = sum
	// The average is always taken over the full seven-day window.
	out.AvgIncome = avgInt64(sum, 7)
	for i := range daily {
		point := &pb.DailyIncomePoint{
			Date:    daily[i].Date,
			Income:  daily[i].Income,
			IsToday: daily[i].IsToday,
			IsPeak:  daily[i].IsPeak,
		}
		out.Points = append(out.Points, point)
	}

	// Cache write is best-effort; its error is intentionally ignored.
	_ = s.cache.SetJSON(ctx, cacheKey, out)
	return out, nil
}
// ===== 3. Exhibition income center =====

// GetExhibitionIncomeSummary returns the exhibition income summary of userID
// under starID: the aggregate counters plus the Top5 exhibition items.
//
// A cached response is returned on a cache hit (read errors count as a miss).
// Otherwise the summary is read from the repository, whose error is returned
// unchanged, and the assembled response is cached best-effort.
func (s *DashboardService) GetExhibitionIncomeSummary(ctx context.Context, userID, starID int64) (*pb.GetExhibitionIncomeSummaryResponse, error) {
	cacheKey := CacheKey("exhibition_summary", starID, userID)

	hit := new(pb.GetExhibitionIncomeSummaryResponse)
	if found, _ := s.cache.GetJSON(ctx, cacheKey, hit); found {
		return hit, nil
	}

	summary, err := s.repo.GetExhibitionIncomeSummary(ctx, userID, starID)
	if err != nil {
		return nil, err
	}

	out := new(pb.GetExhibitionIncomeSummaryResponse)
	out.ExhibitingCount = summary.ExhibitingCount
	out.StarbookCount = summary.StarbookCount
	out.TotalDuration = summary.TotalDuration
	out.TotalEarnings = summary.TotalEarnings
	for i := range summary.Top5 {
		entry := &pb.TopExhibitionItem{
			AssetId:     summary.Top5[i].AssetID,
			AssetName:   summary.Top5[i].AssetName,
			AssetThumb:  summary.Top5[i].AssetThumb,
			Duration_7D: summary.Top5[i].Duration7d,
			Earnings_7D: summary.Top5[i].Earnings7d,
			AvgEarnings: summary.Top5[i].AvgEarnings,
		}
		out.Top5 = append(out.Top5, entry)
	}

	// Cache write is best-effort; its error is intentionally ignored.
	_ = s.cache.SetJSON(ctx, cacheKey, out)
	return out, nil
}
// ===== 4. Like income by level =====

// GetLikeIncomeByLevel returns the like income of userID under starID broken
// down per level, along with the total like count and total income.
//
// A cached response is returned on a cache hit (read errors count as a miss).
// Otherwise the data is read from the repository, whose error is returned
// unchanged, and the assembled response is cached best-effort.
func (s *DashboardService) GetLikeIncomeByLevel(ctx context.Context, userID, starID int64) (*pb.GetLikeIncomeByLevelResponse, error) {
	cacheKey := CacheKey("like_income_by_level", starID, userID)

	hit := new(pb.GetLikeIncomeByLevelResponse)
	if found, _ := s.cache.GetJSON(ctx, cacheKey, hit); found {
		return hit, nil
	}

	perLevel, likeCount, income, err := s.repo.GetLikeIncomeByLevel(ctx, userID, starID)
	if err != nil {
		return nil, err
	}

	out := new(pb.GetLikeIncomeByLevelResponse)
	out.TotalLikeCount = likeCount
	out.TotalIncome = income
	for i := range perLevel {
		entry := &pb.LikeIncomeLevelItem{
			Level:       perLevel[i].Level,
			AssetCount:  perLevel[i].AssetCount,
			TotalIncome: perLevel[i].Income,
			Thumb:       perLevel[i].Thumb,
		}
		out.Levels = append(out.Levels, entry)
	}

	// Cache write is best-effort; its error is intentionally ignored.
	_ = s.cache.SetJSON(ctx, cacheKey, out)
	return out, nil
}
// ===== 5. Top 5 assets =====

// GetTopAssetsByEarning returns the top-earning assets of userID under starID,
// each with its total earnings and rank as supplied by the repository.
//
// A cached response is returned on a cache hit (read errors count as a miss).
// Otherwise the rows are read from the repository, whose error is returned
// unchanged, and the assembled response is cached best-effort.
func (s *DashboardService) GetTopAssetsByEarning(ctx context.Context, userID, starID int64) (*pb.GetTopAssetsByEarningResponse, error) {
	cacheKey := CacheKey("top_assets", starID, userID)

	hit := new(pb.GetTopAssetsByEarningResponse)
	if found, _ := s.cache.GetJSON(ctx, cacheKey, hit); found {
		return hit, nil
	}

	ranked, err := s.repo.GetTopAssetsByEarning(ctx, userID, starID)
	if err != nil {
		return nil, err
	}

	out := new(pb.GetTopAssetsByEarningResponse)
	for i := range ranked {
		entry := &pb.TopAssetItem{
			AssetId:       ranked[i].AssetID,
			AssetName:     ranked[i].AssetName,
			AssetThumb:    ranked[i].AssetThumb,
			TotalEarnings: ranked[i].TotalEarnings,
			Rank:          ranked[i].Rank,
		}
		out.Items = append(out.Items, entry)
	}

	// Cache write is best-effort; its error is intentionally ignored.
	_ = s.cache.SetJSON(ctx, cacheKey, out)
	return out, nil
}
// ===== 6. Level distribution =====

// GetAssetLevelDistribution returns, for userID under starID, one item per
// level carrying the Level, Count and Total values supplied by the repository.
//
// A cached response is returned on a cache hit (read errors count as a miss).
// Otherwise the items are read from the repository, whose error is returned
// unchanged, and the assembled response is cached best-effort.
func (s *DashboardService) GetAssetLevelDistribution(ctx context.Context, userID, starID int64) (*pb.GetAssetLevelDistributionResponse, error) {
	cacheKey := CacheKey("level_distribution", starID, userID)

	hit := new(pb.GetAssetLevelDistributionResponse)
	if found, _ := s.cache.GetJSON(ctx, cacheKey, hit); found {
		return hit, nil
	}

	buckets, err := s.repo.GetAssetLevelDistribution(ctx, userID, starID)
	if err != nil {
		return nil, err
	}

	out := new(pb.GetAssetLevelDistributionResponse)
	for idx := range buckets {
		entry := &pb.AssetLevelItem{
			Level: buckets[idx].Level,
			Count: buckets[idx].Count,
			Total: buckets[idx].Total,
		}
		out.Items = append(out.Items, entry)
	}

	// Cache write is best-effort; its error is intentionally ignored.
	_ = s.cache.SetJSON(ctx, cacheKey, out)
	return out, nil
}
// ===== 7. Upgrade progress =====

// GetAssetUpgradeProgress returns two lists for userID under starID: assets
// with their like/duration progress (Upcoming) and assets with their new level
// and upgrade time (Recent).
//
// A cached response is returned on a cache hit (read errors count as a miss).
// Otherwise both lists are read from the repository, whose error is returned
// unchanged, and the assembled response is cached best-effort.
func (s *DashboardService) GetAssetUpgradeProgress(ctx context.Context, userID, starID int64) (*pb.GetAssetUpgradeProgressResponse, error) {
	cacheKey := CacheKey("upgrade_progress", starID, userID)

	hit := new(pb.GetAssetUpgradeProgressResponse)
	if found, _ := s.cache.GetJSON(ctx, cacheKey, hit); found {
		return hit, nil
	}

	pending, done, err := s.repo.GetAssetUpgradeProgress(ctx, userID, starID)
	if err != nil {
		return nil, err
	}

	out := new(pb.GetAssetUpgradeProgressResponse)
	for i := range pending {
		next := &pb.UpcomingLevelUpItem{
			AssetId:          pending[i].AssetID,
			AssetName:        pending[i].AssetName,
			AssetThumb:       pending[i].AssetThumb,
			LikeProgress:     pending[i].LikeProgress,
			DurationProgress: pending[i].DurationProgress,
		}
		out.Upcoming = append(out.Upcoming, next)
	}
	for i := range done {
		past := &pb.RecentLevelUpItem{
			AssetId:     done[i].AssetID,
			AssetName:   done[i].AssetName,
			AssetThumb:  done[i].AssetThumb,
			NewLevel:    done[i].NewLevel,
			UpgradeTime: done[i].UpgradeTime,
		}
		out.Recent = append(out.Recent, past)
	}

	// Cache write is best-effort; its error is intentionally ignored.
	_ = s.cache.SetJSON(ctx, cacheKey, out)
	return out, nil
}
// avgInt64 returns total divided by n using Go integer division, which
// truncates toward zero. It returns 0 when n is 0 instead of panicking on a
// division by zero.
func avgInt64(total int64, n int) int64 {
	if n != 0 {
		return total / int64(n)
	}
	return 0
}
// Blank-identifier references that keep the "fmt" and "time" imports in use,
// so the file still compiles when no other code in it refers to them.
var (
	_ = fmt.Sprintf
	_ = time.Second
)