package repository import ( "context" "database/sql" "fmt" "github.com/topfans/backend/services/statisticService/model" ) // MetricRepository 预聚表 / 物化视图维护 type MetricRepository struct { db *sql.DB schema string } // NewMetricRepository 构造 MetricRepository func NewMetricRepository(db *sql.DB, schema string) *MetricRepository { return &MetricRepository{db: db, schema: schema} } // UpsertRecentLevelUp 同步写入最近升级记录(仅当 event_type == "asset.level_up") func (r *MetricRepository) UpsertRecentLevelUp(ctx context.Context, e *model.Event) error { if e == nil || e.EventType != "asset.level_up" { return nil } assetID := e.Properties["asset_id"] fromLevel := e.Properties["from"] toLevel := e.Properties["to"] upgradeTime := e.OccurredAt if assetID == "" || toLevel == "" { return nil } _, err := r.db.ExecContext(ctx, fmt.Sprintf(` INSERT INTO %s.metric_recent_level_ups (user_id, star_id, asset_id, from_level, to_level, upgrade_time, asset_name, asset_thumb) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) `, r.schema), e.UserID, e.StarID, assetID, fromLevel, toLevel, upgradeTime, "", "") return err } // RefreshWeeklyUserIncome 全量重算本周 rank + total(pg_try_advisory_lock 防多实例) func (r *MetricRepository) RefreshWeeklyUserIncome(ctx context.Context) error { var got bool if err := r.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(123456)").Scan(&got); err != nil { return err } if !got { return nil // 抢不到锁本轮跳过 } defer r.db.ExecContext(ctx, "SELECT pg_advisory_unlock(123456)") _, err := r.db.ExecContext(ctx, fmt.Sprintf(` INSERT INTO %s.metric_weekly_user_income (star_id, user_id, week_start, total_crystal, rank_in_star) SELECT star_id, user_id, DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')::date AS week_start, SUM(CASE WHEN event_type IN ('exhibition.revenue', 'crystal.change') AND (properties->>'amount')::BIGINT > 0 THEN (properties->>'amount')::BIGINT ELSE 0 END) AS total_crystal, ROW_NUMBER() OVER (PARTITION BY star_id ORDER BY SUM(CASE WHEN event_type IN ('exhibition.revenue','crystal.change') AND (properties->>'amount')::BIGINT > 0 THEN (properties->>'amount')::BIGINT ELSE 0 END) DESC) AS rank_in_star FROM %s.events WHERE event_type IN ('exhibition.revenue', 'crystal.change') AND received_at >= DATE_TRUNC('week', NOW() AT TIME ZONE 'Asia/Shanghai') GROUP BY star_id, user_id, DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai') ON CONFLICT (star_id, user_id, week_start) DO UPDATE SET total_crystal = EXCLUDED.total_crystal, rank_in_star = EXCLUDED.rank_in_star, updated_at = NOW() `, r.schema, r.schema)) return err } // RefreshUpcomingLevelUps 计算每个 asset 的 like_progress + duration_progress // 数据源: public.assets (id/owner_uid/star_id/is_active/deleted_at/like_count) // + public.asset_level_records (current_level + season_likes + season_exhibition_hours) // + public.asset_levels (level + require_likes + require_hours) // 语义: 用「赛季累计点赞/赛季累计展出时长」除以「升级阈值」得到进度(%) // 排除条件: // 1) 已达最高级(UR)—— 没东西可升 // 2) 初始等级 N —— 进度无意义(require_likes=0, require_hours=0) // 3) NULL 防护 —— LEAST(100, NULL) 在 PG 里返回 100(不是 NULL),会污染显示 func (r *MetricRepository) RefreshUpcomingLevelUps(ctx context.Context) error { _, err := r.db.ExecContext(ctx, fmt.Sprintf(` INSERT INTO %s.metric_upcoming_level_ups (user_id, star_id, asset_id, like_progress, duration_progress) SELECT a.owner_uid AS user_id, a.star_id, a.id AS asset_id, COALESCE(LEAST(100, (alr.season_likes::FLOAT / NULLIF(al.require_likes, 0) * 100)::INT), 0) AS like_progress, COALESCE(LEAST(100, (alr.season_exhibition_hours::FLOAT / NULLIF(al.require_hours, 0) * 100)::INT), 0) AS duration_progress FROM public.assets a JOIN public.asset_level_records alr ON alr.asset_id = a.id JOIN public.asset_levels al ON al.level = alr.current_level WHERE a.is_active = TRUE AND a.deleted_at IS NULL AND al.level_order < (SELECT MAX(level_order) FROM public.asset_levels) AND alr.current_level <> 'N' ON CONFLICT (user_id, star_id, asset_id) DO UPDATE SET like_progress = EXCLUDED.like_progress, duration_progress = EXCLUDED.duration_progress, updated_at = NOW() `, r.schema)) return err }