diff --git a/backend/services/statisticService/client/user_rpc_client.go b/backend/services/statisticService/client/user_rpc_client.go new file mode 100644 index 0000000..460f19c --- /dev/null +++ b/backend/services/statisticService/client/user_rpc_client.go @@ -0,0 +1,39 @@ +package client + +import ( + "context" + + pbUser "github.com/topfans/backend/pkg/proto/user" +) + +// UserServiceClient 封装 userService 的 Dubbo 调用 +type UserServiceClient struct { + userSocial pbUser.UserSocialService +} + +// NewUserServiceClient 构造 +func NewUserServiceClient(svc pbUser.UserSocialService) *UserServiceClient { + return &UserServiceClient{userSocial: svc} +} + +// GetCrystalBalance 调 userService.GetFanProfile 取 crystal_balance +// (userService 没单独 GetCrystalBalance,FanProfile 含此字段) +func (c *UserServiceClient) GetCrystalBalance(ctx context.Context, userID, starID int64) (int64, error) { + resp, err := c.userSocial.GetFanProfile(ctx, &pbUser.GetFanProfileRequest{ + UserId: userID, + StarId: starID, + }) + if err != nil { + return 0, err + } + if resp == nil || resp.Base == nil { + return 0, nil + } + if resp.Base.Code != 0 { + return 0, nil + } + if resp.Profile == nil { + return 0, nil + } + return resp.Profile.CrystalBalance, nil +} diff --git a/backend/services/statisticService/provider/statistic_combined_provider.go b/backend/services/statisticService/provider/statistic_combined_provider.go new file mode 100644 index 0000000..74020f3 --- /dev/null +++ b/backend/services/statisticService/provider/statistic_combined_provider.go @@ -0,0 +1,102 @@ +package provider + +import ( + "context" + "strconv" + "time" + + "go.uber.org/zap" + + "github.com/topfans/backend/pkg/logger" + pb "github.com/topfans/backend/pkg/proto/statistic" + "github.com/topfans/backend/services/statisticService/metrics" + "github.com/topfans/backend/services/statisticService/service" +) + +// StatisticCombinedProvider 完整 StatisticService 实现(9 RPC) +// - 看板 7 RPC(T12 由 DashboardService 实现) +// - 事件 2 RPC(T9 由 StatisticInternalProvider 实现) +type StatisticCombinedProvider struct { + *StatisticInternalProvider + dashSvc *service.DashboardService +} + +// NewStatisticCombinedProvider 构造 +func NewStatisticCombinedProvider(internal *StatisticInternalProvider, dashSvc *service.DashboardService) *StatisticCombinedProvider { + return &StatisticCombinedProvider{ + StatisticInternalProvider: internal, + dashSvc: dashSvc, + } +} + +func userIDFromContext(ctx context.Context) int64 { + if v := ctx.Value("user_id"); v != nil { + switch s := v.(type) { + case string: + n, _ := strconv.ParseInt(s, 10, 64) + return n + case int64: + return s + case int: + return int64(s) + } + } + return 0 +} + +func (p *StatisticCombinedProvider) recordRPC(rpc string, start time.Time, err error) { + status := "ok" + if err != nil { + status = "error" + } + metrics.DashboardRPCTotal.WithLabelValues(rpc, status).Inc() + metrics.DashboardRPCDuration.WithLabelValues(rpc).Observe(time.Since(start).Seconds()) +} + +// ===== 看板 7 RPC ===== + +func (p *StatisticCombinedProvider) GetTodayOverview(ctx context.Context, req *pb.GetTodayOverviewRequest) (*pb.GetTodayOverviewResponse, error) { + t0 := time.Now() + defer func() { p.recordRPC("GetTodayOverview", t0, nil) }() + resp, err := p.dashSvc.GetTodayOverview(ctx, userIDFromContext(ctx), req.StarId) + if err != nil { + logger.Logger.Warn("GetTodayOverview failed", zap.Error(err)) + } + return resp, err +} + +func (p *StatisticCombinedProvider) Get7DayIncomeCurve(ctx context.Context, req *pb.Get7DayIncomeCurveRequest) (*pb.Get7DayIncomeCurveResponse, error) { + t0 := time.Now() + defer func() { p.recordRPC("Get7DayIncomeCurve", t0, nil) }() + return p.dashSvc.Get7DayIncomeCurve(ctx, userIDFromContext(ctx), req.StarId) +} + +func (p *StatisticCombinedProvider) GetExhibitionIncomeSummary(ctx context.Context, req *pb.GetExhibitionIncomeSummaryRequest) (*pb.GetExhibitionIncomeSummaryResponse, error) { + t0 := time.Now() + defer func() { p.recordRPC("GetExhibitionIncomeSummary", t0, nil) }() + return p.dashSvc.GetExhibitionIncomeSummary(ctx, userIDFromContext(ctx), req.StarId) +} + +func (p *StatisticCombinedProvider) GetLikeIncomeByLevel(ctx context.Context, req *pb.GetLikeIncomeByLevelRequest) (*pb.GetLikeIncomeByLevelResponse, error) { + t0 := time.Now() + defer func() { p.recordRPC("GetLikeIncomeByLevel", t0, nil) }() + return p.dashSvc.GetLikeIncomeByLevel(ctx, userIDFromContext(ctx), req.StarId) +} + +func (p *StatisticCombinedProvider) GetTopAssetsByEarning(ctx context.Context, req *pb.GetTopAssetsByEarningRequest) (*pb.GetTopAssetsByEarningResponse, error) { + t0 := time.Now() + defer func() { p.recordRPC("GetTopAssetsByEarning", t0, nil) }() + return p.dashSvc.GetTopAssetsByEarning(ctx, userIDFromContext(ctx), req.StarId) +} + +func (p *StatisticCombinedProvider) GetAssetLevelDistribution(ctx context.Context, req *pb.GetAssetLevelDistributionRequest) (*pb.GetAssetLevelDistributionResponse, error) { + t0 := time.Now() + defer func() { p.recordRPC("GetAssetLevelDistribution", t0, nil) }() + return p.dashSvc.GetAssetLevelDistribution(ctx, userIDFromContext(ctx), req.StarId) +} + +func (p *StatisticCombinedProvider) GetAssetUpgradeProgress(ctx context.Context, req *pb.GetAssetUpgradeProgressRequest) (*pb.GetAssetUpgradeProgressResponse, error) { + t0 := time.Now() + defer func() { p.recordRPC("GetAssetUpgradeProgress", t0, nil) }() + return p.dashSvc.GetAssetUpgradeProgress(ctx, userIDFromContext(ctx), req.StarId) +} diff --git a/backend/services/statisticService/provider/statistic_internal_provider.go b/backend/services/statisticService/provider/statistic_internal_provider.go new file mode 100644 index 0000000..da241fc --- /dev/null +++ b/backend/services/statisticService/provider/statistic_internal_provider.go @@ -0,0 +1,65 @@ +package provider + +import ( + "context" + "time" + + "go.uber.org/zap" + + "github.com/topfans/backend/pkg/logger" + pb "github.com/topfans/backend/pkg/proto/statistic" + eventPb "github.com/topfans/backend/pkg/proto/event" + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/service" +) + +// StatisticInternalProvider 实现 Dubbo StatisticService 中的事件相关 RPC +// (TrackEvent / BatchTrackEvent) +// 业务侧通过 gRPC 调用此处 +type StatisticInternalProvider struct { + eventSvc *service.EventService +} + +// NewStatisticInternalProvider 构造 +func NewStatisticInternalProvider(eventSvc *service.EventService) *StatisticInternalProvider { + return &StatisticInternalProvider{eventSvc: eventSvc} +} + +// TrackEvent 接收单个事件 +func (p *StatisticInternalProvider) TrackEvent(ctx context.Context, e *eventPb.Event) (*pb.TrackEventResponse, error) { + logger.Logger.Debug("StatisticInternalProvider.TrackEvent", + zap.String("event_id", e.EventId), + zap.String("event_type", e.EventType)) + return p.eventSvc.TrackEvent(ctx, toModel(e)) +} + +// BatchTrackEvent 批量接收事件 +func (p *StatisticInternalProvider) BatchTrackEvent(ctx context.Context, req *eventPb.BatchEventRequest) (*pb.TrackEventResponse, error) { + events := make([]*model.Event, 0, len(req.Events)) + for _, e := range req.Events { + events = append(events, toModel(e)) + } + return p.eventSvc.BatchTrackEvent(ctx, events) +} + +// toModel protobuf -> domain model +func toModel(e *eventPb.Event) *model.Event { + occurredAt := time.UnixMilli(e.OccurredAt) + receivedAt := time.UnixMilli(e.ReceivedAt) + if receivedAt.IsZero() { + receivedAt = time.Now() + } + props := e.Properties + if props == nil { + props = map[string]string{} + } + return &model.Event{ + EventID: e.EventId, + UserID: e.UserId, + StarID: e.StarId, + EventType: e.EventType, + OccurredAt: occurredAt, + ReceivedAt: receivedAt, + Properties: props, + } +} diff --git a/backend/services/statisticService/repository/dashboard_repo.go b/backend/services/statisticService/repository/dashboard_repo.go new file mode 100644 index 0000000..7830870 --- /dev/null +++ b/backend/services/statisticService/repository/dashboard_repo.go @@ -0,0 +1,379 @@ +package repository + +import ( + "context" + "database/sql" + "fmt" + "time" +) + +// DashboardRepository 看板数据访问 +type DashboardRepository struct { + db *sql.DB + schema string +} + +// NewDashboardRepository 构造 +func NewDashboardRepository(db *sql.DB, schema string) *DashboardRepository { + return &DashboardRepository{db: db, schema: schema} +} + +// TodayOverviewPart week_rank + week_total_users +type TodayOverviewPart struct { + WeekRank int32 + WeekTotalUsers int32 +} + +// GetWeekRank + week total users(从预聚表) +func (r *DashboardRepository) GetWeekRank(ctx context.Context, userID, starID int64) (*TodayOverviewPart, error) { + weekStart := weekStartMonday(time.Now()) + var rank sql.NullInt64 + err := r.db.QueryRowContext(ctx, fmt.Sprintf(` + SELECT rank_in_star FROM %s.metric_weekly_user_income + WHERE star_id=$1 AND user_id=$2 AND week_start=$3 + `, r.schema), starID, userID, weekStart).Scan(&rank) + if err == sql.ErrNoRows { + return &TodayOverviewPart{WeekRank: -1, WeekTotalUsers: 0}, nil + } + if err != nil { + return nil, err + } + var totalUsers int32 + err = r.db.QueryRowContext(ctx, fmt.Sprintf(` + SELECT COUNT(*)::INT FROM %s.metric_weekly_user_income + WHERE star_id=$1 AND week_start=$2 AND total_crystal > 0 + `, r.schema), starID, weekStart).Scan(&totalUsers) + if err != nil { + return nil, err + } + return &TodayOverviewPart{ + WeekRank: int32(rank.Int64), + WeekTotalUsers: totalUsers, + }, nil +} + +// GetTodayIncome 今日收入 +func (r *DashboardRepository) GetTodayIncome(ctx context.Context, userID, starID int64) (int64, error) { + var income sql.NullInt64 + err := r.db.QueryRowContext(ctx, fmt.Sprintf(` + SELECT COALESCE(SUM(CASE WHEN (properties->>'amount')::BIGINT > 0 + THEN (properties->>'amount')::BIGINT ELSE 0 END), 0) + FROM %s.events + WHERE user_id=$1 AND star_id=$2 + AND event_type IN ('exhibition.revenue', 'crystal.change') + AND received_at >= DATE_TRUNC('day', NOW() AT TIME ZONE 'Asia/Shanghai') + `, r.schema), userID, starID).Scan(&income) + if err != nil && err != sql.ErrNoRows { + return 0, err + } + return income.Int64, nil +} + +// DailyIncomePoint 七日曲线 +type DailyIncomePoint struct { + Date string + Income int64 + IsToday bool + IsPeak bool +} + +// Get7DayIncomeCurve 七日收益曲线 +func (r *DashboardRepository) Get7DayIncomeCurve(ctx context.Context, userID, starID int64) ([]DailyIncomePoint, int64, error) { + rows, err := r.db.QueryContext(ctx, fmt.Sprintf(` + SELECT income_date::text, COALESCE(total_crystal, 0) + FROM %s.mv_daily_user_income + WHERE user_id=$1 AND star_id=$2 + AND income_date >= (DATE_TRUNC('day', NOW() AT TIME ZONE 'Asia/Shanghai') - INTERVAL '6 days')::date + ORDER BY income_date ASC + `, r.schema), userID, starID) + if err != nil { + return nil, 0, err + } + defer rows.Close() + points := make([]DailyIncomePoint, 0, 7) + var total, peak int64 + for rows.Next() { + var p DailyIncomePoint + if err := rows.Scan(&p.Date, &p.Income); err != nil { + return nil, 0, err + } + points = append(points, p) + total += p.Income + if p.Income > peak { + peak = p.Income + } + } + today := time.Now().In(shanghaiLoc()).Format("2006-01-02") + for i := range points { + points[i].IsToday = points[i].Date == today + points[i].IsPeak = points[i].Income == peak && peak > 0 + } + return points, total, nil +} + +// GetExhibitionIncomeSummary 展出收益中心 +type ExhibitionSummary struct { + ExhibitingCount int32 + StarbookCount int32 + TotalDuration string + TotalEarnings int64 + Top5 []TopExhibitionRow +} + +// TopExhibitionRow top5 藏品 +type TopExhibitionRow struct { + AssetID int64 + AssetName string + AssetThumb string + Duration7d string + Earnings7d int64 + AvgEarnings int32 +} + +func (r *DashboardRepository) GetExhibitionIncomeSummary(ctx context.Context, userID, starID int64) (*ExhibitionSummary, error) { + // 简化实现:直接统计 events(生产环境用 mv_daily_exhibition_revenue) + rows, err := r.db.QueryContext(ctx, fmt.Sprintf(` + SELECT + (properties->>'asset_id')::BIGINT AS asset_id, + SUM((properties->>'amount')::BIGINT) AS earnings, + SUM((properties->>'duration_ms')::BIGINT) AS duration_ms + FROM %s.events + WHERE user_id=$1 AND star_id=$2 + AND event_type='exhibition.revenue' + AND received_at >= NOW() - INTERVAL '7 days' + GROUP BY asset_id + ORDER BY earnings DESC + LIMIT 5 + `, r.schema), userID, starID) + if err != nil { + return nil, err + } + defer rows.Close() + var top5 []TopExhibitionRow + var totalEarnings int64 + for rows.Next() { + var t TopExhibitionRow + var durationMs int64 + if err := rows.Scan(&t.AssetID, &t.Earnings7d, &durationMs); err != nil { + return nil, err + } + t.Duration7d = formatDuration(durationMs) + if t.Earnings7d > 0 { + t.AvgEarnings = int32(t.Earnings7d / 7) + } + totalEarnings += t.Earnings7d + top5 = append(top5, t) + } + + // 简化:exhibiting_count / starbook_count / total_duration 留 0 + return &ExhibitionSummary{ + ExhibitingCount: 0, + StarbookCount: 0, + TotalDuration: "0:00:00:00", + TotalEarnings: totalEarnings, + Top5: top5, + }, nil +} + +// GetLikeIncomeByLevel 点赞按等级 +type LikeIncomeLevelRow struct { + Level string + AssetCount int32 + Income int64 + Thumb string +} + +func (r *DashboardRepository) GetLikeIncomeByLevel(ctx context.Context, userID, starID int64) ([]LikeIncomeLevelRow, int64, int64, error) { + rows, err := r.db.QueryContext(ctx, fmt.Sprintf(` + SELECT a.level, COUNT(*), SUM(COALESCE((e.properties->>'amount')::BIGINT, 0)), COALESCE(MIN(a.cover_url), '') + FROM %s.events e JOIN public.assets a ON a.id = (e.properties->>'asset_id')::BIGINT + WHERE e.user_id=$1 AND e.star_id=$2 AND e.event_type='asset.like' + GROUP BY a.level + ORDER BY SUM(COALESCE((e.properties->>'amount')::BIGINT, 0)) DESC + `, r.schema), userID, starID) + if err != nil { + return nil, 0, 0, err + } + defer rows.Close() + var levels []LikeIncomeLevelRow + var totalIncome, totalCount int64 + for rows.Next() { + var r LikeIncomeLevelRow + if err := rows.Scan(&r.Level, &r.AssetCount, &r.Income, &r.Thumb); err != nil { + return nil, 0, 0, err + } + levels = append(levels, r) + totalIncome += r.Income + totalCount += int64(r.AssetCount) + } + return levels, totalCount, totalIncome, nil +} + +// GetTopAssetsByEarning 藏品 TOP5 +type TopAssetRow struct { + AssetID int64 + AssetName string + AssetThumb string + TotalEarnings int64 + Rank int32 +} + +func (r *DashboardRepository) GetTopAssetsByEarning(ctx context.Context, userID, starID int64) ([]TopAssetRow, error) { + rows, err := r.db.QueryContext(ctx, fmt.Sprintf(` + SELECT + (properties->>'asset_id')::BIGINT, + COALESCE(MIN(a.name), ''), + COALESCE(MIN(a.cover_url), ''), + SUM((properties->>'amount')::BIGINT) + FROM %s.events e LEFT JOIN public.assets a ON a.id = (e.properties->>'asset_id')::BIGINT + WHERE e.user_id=$1 AND e.star_id=$2 + AND e.event_type='exhibition.revenue' + AND e.received_at >= NOW() - INTERVAL '7 days' + GROUP BY (properties->>'asset_id')::BIGINT + ORDER BY SUM((properties->>'amount')::BIGINT) DESC + LIMIT 5 + `, r.schema), userID, starID) + if err != nil { + return nil, err + } + defer rows.Close() + var rows2 []TopAssetRow + var rank int32 = 1 + for rows.Next() { + var t TopAssetRow + if err := rows.Scan(&t.AssetID, &t.AssetName, &t.AssetThumb, &t.TotalEarnings); err != nil { + return nil, err + } + t.Rank = rank + rank++ + rows2 = append(rows2, t) + } + return rows2, nil +} + +// GetAssetLevelDistribution 等级分布 +type AssetLevelCount struct { + Level string + Count int32 + Total int32 +} + +func (r *DashboardRepository) GetAssetLevelDistribution(ctx context.Context, userID, starID int64) ([]AssetLevelCount, error) { + rows, err := r.db.QueryContext(ctx, ` + SELECT level, COUNT(*)::INT + FROM public.assets + WHERE user_id=$1 AND star_id=$2 + AND status='active' AND deleted_at IS NULL + GROUP BY level + ORDER BY COUNT(*) DESC + `, userID, starID) + if err != nil { + return nil, err + } + defer rows.Close() + var out []AssetLevelCount + var total int32 + for rows.Next() { + var a AssetLevelCount + if err := rows.Scan(&a.Level, &a.Count); err != nil { + return nil, err + } + total += a.Count + out = append(out, a) + } + for i := range out { + out[i].Total = total + } + return out, nil +} + +// GetAssetUpgradeProgress 升级进度 +type UpcomingLevelUpRow struct { + AssetID int64 + AssetName string + AssetThumb string + LikeProgress int32 + DurationProgress int32 +} +type RecentLevelUpRow struct { + AssetID int64 + AssetName string + AssetThumb string + NewLevel string + UpgradeTime int64 +} + +func (r *DashboardRepository) GetAssetUpgradeProgress(ctx context.Context, userID, starID int64) ([]UpcomingLevelUpRow, []RecentLevelUpRow, error) { + // upcoming 从 metric_upcoming_level_ups + rows, err := r.db.QueryContext(ctx, fmt.Sprintf(` + SELECT m.asset_id, COALESCE(a.name, ''), COALESCE(a.cover_url, ''), m.like_progress, m.duration_progress + FROM %s.metric_upcoming_level_ups m LEFT JOIN public.assets a ON a.id = m.asset_id + WHERE m.user_id=$1 AND m.star_id=$2 + ORDER BY m.like_progress + m.duration_progress DESC + LIMIT 10 + `, r.schema), userID, starID) + if err != nil { + return nil, nil, err + } + defer rows.Close() + var upcoming []UpcomingLevelUpRow + for rows.Next() { + var u UpcomingLevelUpRow + if err := rows.Scan(&u.AssetID, &u.AssetName, &u.AssetThumb, &u.LikeProgress, &u.DurationProgress); err != nil { + return nil, nil, err + } + upcoming = append(upcoming, u) + } + + // recent 从 metric_recent_level_ups + rows2, err := r.db.QueryContext(ctx, fmt.Sprintf(` + SELECT asset_id, COALESCE(asset_name, ''), COALESCE(asset_thumb, ''), to_level, EXTRACT(EPOCH FROM upgrade_time)::BIGINT + FROM %s.metric_recent_level_ups + WHERE user_id=$1 AND star_id=$2 + AND upgrade_time >= NOW() - INTERVAL '30 days' + ORDER BY upgrade_time DESC + LIMIT 5 + `, r.schema), userID, starID) + if err != nil { + return upcoming, nil, err + } + defer rows2.Close() + var recent []RecentLevelUpRow + for rows2.Next() { + var r RecentLevelUpRow + if err := rows2.Scan(&r.AssetID, &r.AssetName, &r.AssetThumb, &r.NewLevel, &r.UpgradeTime); err != nil { + return upcoming, nil, err + } + recent = append(recent, r) + } + return upcoming, recent, nil +} + +// helpers + +func shanghaiLoc() *time.Location { + loc, _ := time.LoadLocation("Asia/Shanghai") + if loc == nil { + loc = time.FixedZone("Asia/Shanghai", 8*3600) + } + return loc +} + +func weekStartMonday(t time.Time) time.Time { + t2 := t.In(shanghaiLoc()) + offset := (int(t2.Weekday()) + 6) % 7 // Mon=0 + return time.Date(t2.Year(), t2.Month(), t2.Day()-offset, 0, 0, 0, 0, shanghaiLoc()) +} + +func formatDuration(ms int64) string { + totalSec := ms / 1000 + h := totalSec / 3600 + m := (totalSec % 3600) / 60 + s := totalSec % 60 + if h >= 24 { + d := h / 24 + h = h % 24 + return fmt.Sprintf("%d:%02d:%02d:%02d", d, h, m, s) + } + return fmt.Sprintf("%d:%02d:%02d", h, m, s) +} diff --git a/backend/services/statisticService/service/cache.go b/backend/services/statisticService/service/cache.go new file mode 100644 index 0000000..b8f0b53 --- /dev/null +++ b/backend/services/statisticService/service/cache.go @@ -0,0 +1,64 @@ +package service + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/redis/go-redis/v9" +) + +// Cache Redis 缓存封装(5min TTL + 1min 空值 TTL 防穿透) +type Cache struct { + rdb *redis.Client + ttl time.Duration + emptyTTL time.Duration + missedHits int64 +} + +// NewCache 构造 +func NewCache(rdb *redis.Client) *Cache { + return &Cache{ + rdb: rdb, + ttl: 5 * time.Minute, + emptyTTL: 1 * time.Minute, + } +} + +// GetJSON 取缓存 + JSON 反序列化 +func (c *Cache) GetJSON(ctx context.Context, key string, dst interface{}) (bool, error) { + v, err := c.rdb.Get(ctx, key).Result() + if err == redis.Nil { + return false, nil + } + if err != nil { + return false, err + } + if v == "null" { + return false, nil // 缓存穿透防护的空标记 + } + if err := json.Unmarshal([]byte(v), dst); err != nil { + return false, err + } + return true, nil +} + +// SetJSON 序列化 + 缓存(5min TTL) +func (c *Cache) SetJSON(ctx context.Context, key string, value interface{}) error { + b, err := json.Marshal(value) + if err != nil { + return err + } + return c.rdb.Set(ctx, key, b, c.ttl).Err() +} + +// SetEmpty 缓存空值(1min TTL,防穿透) +func (c *Cache) SetEmpty(ctx context.Context, key string) error { + return c.rdb.Set(ctx, key, "null", c.emptyTTL).Err() +} + +// CacheKey 看板缓存 key 格式 +func CacheKey(rpc string, starID, userID int64) string { + return fmt.Sprintf("dash:%s:%d:%d", rpc, starID, userID) +} diff --git a/backend/services/statisticService/service/dashboard_service.go b/backend/services/statisticService/service/dashboard_service.go new file mode 100644 index 0000000..bafb3b1 --- /dev/null +++ b/backend/services/statisticService/service/dashboard_service.go @@ -0,0 +1,222 @@ +package service + +import ( + "context" + "fmt" + "time" + + "go.uber.org/zap" + + "github.com/topfans/backend/pkg/logger" + pb "github.com/topfans/backend/pkg/proto/statistic" + "github.com/topfans/backend/services/statisticService/repository" +) + +// UserRPCClient 跨服务调用 userService 的接口 +type UserRPCClient interface { + GetCrystalBalance(ctx context.Context, userID, starID int64) (int64, error) +} + +// DashboardService 看板 7 RPC 业务逻辑 +type DashboardService struct { + repo *repository.DashboardRepository + cache *Cache + userRPC UserRPCClient +} + +// NewDashboardService 构造 +func NewDashboardService(repo *repository.DashboardRepository, cache *Cache, userRPC UserRPCClient) *DashboardService { + return &DashboardService{repo: repo, cache: cache, userRPC: userRPC} +} + +// ===== 1. 今日概览 ===== +func (s *DashboardService) GetTodayOverview(ctx context.Context, userID, starID int64) (*pb.GetTodayOverviewResponse, error) { + key := CacheKey("today_overview", starID, userID) + var cached pb.GetTodayOverviewResponse + if ok, _ := s.cache.GetJSON(ctx, key, &cached); ok { + return &cached, nil + } + + part, err := s.repo.GetWeekRank(ctx, userID, starID) + if err != nil { + return nil, err + } + todayIncome, err := s.repo.GetTodayIncome(ctx, userID, starID) + if err != nil { + return nil, err + } + var crystal int64 + if s.userRPC != nil { + if c, err := s.userRPC.GetCrystalBalance(ctx, userID, starID); err == nil { + crystal = c + } else { + logger.Logger.Warn("userService.GetCrystalBalance failed, use 0", zap.Error(err)) + } + } + resp := &pb.GetTodayOverviewResponse{ + CrystalBalance: crystal, + TodayIncome: todayIncome, + WeekRank: part.WeekRank, + WeekTotalUsers: part.WeekTotalUsers, + } + _ = s.cache.SetJSON(ctx, key, resp) + return resp, nil +} + +// ===== 2. 七日收益曲线 ===== +func (s *DashboardService) Get7DayIncomeCurve(ctx context.Context, userID, starID int64) (*pb.Get7DayIncomeCurveResponse, error) { + key := CacheKey("7day_income_curve", starID, userID) + var cached pb.Get7DayIncomeCurveResponse + if ok, _ := s.cache.GetJSON(ctx, key, &cached); ok { + return &cached, nil + } + points, total, err := s.repo.Get7DayIncomeCurve(ctx, userID, starID) + if err != nil { + return nil, err + } + resp := &pb.Get7DayIncomeCurveResponse{ + TotalIncome: total, + AvgIncome: avgInt64(total, 7), + } + for _, p := range points { + resp.Points = append(resp.Points, &pb.DailyIncomePoint{ + Date: p.Date, Income: p.Income, IsToday: p.IsToday, IsPeak: p.IsPeak, + }) + } + _ = s.cache.SetJSON(ctx, key, resp) + return resp, nil +} + +// ===== 3. 展出收益中心 ===== +func (s *DashboardService) GetExhibitionIncomeSummary(ctx context.Context, userID, starID int64) (*pb.GetExhibitionIncomeSummaryResponse, error) { + key := CacheKey("exhibition_summary", starID, userID) + var cached pb.GetExhibitionIncomeSummaryResponse + if ok, _ := s.cache.GetJSON(ctx, key, &cached); ok { + return &cached, nil + } + sum, err := s.repo.GetExhibitionIncomeSummary(ctx, userID, starID) + if err != nil { + return nil, err + } + resp := &pb.GetExhibitionIncomeSummaryResponse{ + ExhibitingCount: sum.ExhibitingCount, + StarbookCount: sum.StarbookCount, + TotalDuration: sum.TotalDuration, + TotalEarnings: sum.TotalEarnings, + } + for _, t := range sum.Top5 { + resp.Top5 = append(resp.Top5, &pb.TopExhibitionItem{ + AssetId: t.AssetID, AssetName: t.AssetName, AssetThumb: t.AssetThumb, + Duration_7D: t.Duration7d, Earnings_7D: t.Earnings7d, AvgEarnings: t.AvgEarnings, + }) + } + _ = s.cache.SetJSON(ctx, key, resp) + return resp, nil +} + +// ===== 4. 点赞按等级 ===== +func (s *DashboardService) GetLikeIncomeByLevel(ctx context.Context, userID, starID int64) (*pb.GetLikeIncomeByLevelResponse, error) { + key := CacheKey("like_income_by_level", starID, userID) + var cached pb.GetLikeIncomeByLevelResponse + if ok, _ := s.cache.GetJSON(ctx, key, &cached); ok { + return &cached, nil + } + levels, totalCount, totalIncome, err := s.repo.GetLikeIncomeByLevel(ctx, userID, starID) + if err != nil { + return nil, err + } + resp := &pb.GetLikeIncomeByLevelResponse{ + TotalLikeCount: totalCount, + TotalIncome: totalIncome, + } + for _, l := range levels { + resp.Levels = append(resp.Levels, &pb.LikeIncomeLevelItem{ + Level: l.Level, AssetCount: l.AssetCount, TotalIncome: l.Income, Thumb: l.Thumb, + }) + } + _ = s.cache.SetJSON(ctx, key, resp) + return resp, nil +} + +// ===== 5. 藏品 TOP5 ===== +func (s *DashboardService) GetTopAssetsByEarning(ctx context.Context, userID, starID int64) (*pb.GetTopAssetsByEarningResponse, error) { + key := CacheKey("top_assets", starID, userID) + var cached pb.GetTopAssetsByEarningResponse + if ok, _ := s.cache.GetJSON(ctx, key, &cached); ok { + return &cached, nil + } + rows, err := s.repo.GetTopAssetsByEarning(ctx, userID, starID) + if err != nil { + return nil, err + } + resp := &pb.GetTopAssetsByEarningResponse{} + for _, t := range rows { + resp.Items = append(resp.Items, &pb.TopAssetItem{ + AssetId: t.AssetID, AssetName: t.AssetName, AssetThumb: t.AssetThumb, + TotalEarnings: t.TotalEarnings, Rank: t.Rank, + }) + } + _ = s.cache.SetJSON(ctx, key, resp) + return resp, nil +} + +// ===== 6. 等级分布 ===== +func (s *DashboardService) GetAssetLevelDistribution(ctx context.Context, userID, starID int64) (*pb.GetAssetLevelDistributionResponse, error) { + key := CacheKey("level_distribution", starID, userID) + var cached pb.GetAssetLevelDistributionResponse + if ok, _ := s.cache.GetJSON(ctx, key, &cached); ok { + return &cached, nil + } + items, err := s.repo.GetAssetLevelDistribution(ctx, userID, starID) + if err != nil { + return nil, err + } + resp := &pb.GetAssetLevelDistributionResponse{} + for _, i := range items { + resp.Items = append(resp.Items, &pb.AssetLevelItem{ + Level: i.Level, Count: i.Count, Total: i.Total, + }) + } + _ = s.cache.SetJSON(ctx, key, resp) + return resp, nil +} + +// ===== 7. 升级进度 ===== +func (s *DashboardService) GetAssetUpgradeProgress(ctx context.Context, userID, starID int64) (*pb.GetAssetUpgradeProgressResponse, error) { + key := CacheKey("upgrade_progress", starID, userID) + var cached pb.GetAssetUpgradeProgressResponse + if ok, _ := s.cache.GetJSON(ctx, key, &cached); ok { + return &cached, nil + } + upcoming, recent, err := s.repo.GetAssetUpgradeProgress(ctx, userID, starID) + if err != nil { + return nil, err + } + resp := &pb.GetAssetUpgradeProgressResponse{} + for _, u := range upcoming { + resp.Upcoming = append(resp.Upcoming, &pb.UpcomingLevelUpItem{ + AssetId: u.AssetID, AssetName: u.AssetName, AssetThumb: u.AssetThumb, + LikeProgress: u.LikeProgress, DurationProgress: u.DurationProgress, + }) + } + for _, r := range recent { + resp.Recent = append(resp.Recent, &pb.RecentLevelUpItem{ + AssetId: r.AssetID, AssetName: r.AssetName, AssetThumb: r.AssetThumb, + NewLevel: r.NewLevel, UpgradeTime: r.UpgradeTime, + }) + } + _ = s.cache.SetJSON(ctx, key, resp) + return resp, nil +} + +// helper +func avgInt64(total int64, n int) int64 { + if n == 0 { + return 0 + } + return total / int64(n) +} + +// sentinel +var _ = fmt.Sprintf +var _ = time.Second