topfans/backend/services/statisticService/repository/metric_repo.go
2026-06-09 00:37:42 +08:00

104 lines
4.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package repository
import (
"context"
"database/sql"
"fmt"
"github.com/topfans/backend/services/statisticService/model"
)
// MetricRepository maintains the pre-aggregated metric tables (the original
// note also mentions materialized views) that are written by the methods of
// this type.
type MetricRepository struct {
// db is the database handle on which every statement of this repository runs.
db *sql.DB
// schema is spliced into the SQL text with fmt.Sprintf as the schema
// qualifier of the metric tables. It is not escaped, so it presumably has to
// come from trusted configuration — TODO confirm at the call site.
schema string
}
// NewMetricRepository builds a MetricRepository that issues its statements on
// db and qualifies the metric tables with schema. Neither argument is
// validated here.
func NewMetricRepository(db *sql.DB, schema string) *MetricRepository {
	repo := new(MetricRepository)
	repo.db = db
	repo.schema = schema
	return repo
}
// UpsertRecentLevelUp records an "asset.level_up" event as a row in
// <schema>.metric_recent_level_ups.
//
// The call is a no-op returning nil when e is nil, when e.EventType is not
// "asset.level_up", or when the "asset_id" or "to" property is empty. The
// asset_name and asset_thumb columns are always written as empty strings.
//
// Despite the name, the statement is a plain INSERT without an ON CONFLICT
// clause. Any error from the database is returned unchanged.
func (r *MetricRepository) UpsertRecentLevelUp(ctx context.Context, e *model.Event) error {
	const levelUpEvent = "asset.level_up"

	if e == nil {
		return nil
	}
	if e.EventType != levelUpEvent {
		return nil
	}

	props := e.Properties
	asset, target := props["asset_id"], props["to"]
	if asset == "" || target == "" {
		// Without both the asset and its new level there is nothing to record.
		return nil
	}

	// Only the schema name is interpolated; all values travel as bind parameters.
	query := fmt.Sprintf(`
INSERT INTO %s.metric_recent_level_ups
(user_id, star_id, asset_id, from_level, to_level, upgrade_time, asset_name, asset_thumb)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, r.schema)

	_, err := r.db.ExecContext(ctx, query,
		e.UserID,
		e.StarID,
		asset,
		props["from"],
		target,
		e.OccurredAt,
		"", // asset_name
		"", // asset_thumb
	)
	return err
}
// RefreshWeeklyUserIncome recomputes, from <schema>.events, every user's total
// positive crystal income for the current week together with the user's rank
// inside their star, and upserts the result into
// <schema>.metric_weekly_user_income.
//
// A PostgreSQL advisory lock (key 123456) keeps several service instances from
// refreshing at the same time; an instance that cannot obtain the lock skips
// this round and returns nil.
//
// The lock is taken with pg_try_advisory_xact_lock inside the same transaction
// that runs the refresh. *sql.DB is a connection pool, so a session-level
// pg_try_advisory_lock followed by separate pool calls could acquire the lock
// on one connection, refresh on another, and attempt the unlock on a third,
// leaving the lock held forever on a pooled connection. A transaction-level
// lock is bound to the transaction's single connection and is released by
// PostgreSQL on commit or rollback, including when ctx is cancelled.
//
// NOTE(review): the WHERE clause compares received_at with a timestamp that
// has been shifted to Asia/Shanghai wall-clock time; if received_at is
// timestamptz the comparison depends on the session TimeZone — confirm.
//
// It returns a wrapped error if the transaction cannot be started, the lock
// query fails, the refresh statement fails, or the commit fails.
func (r *MetricRepository) RefreshWeeklyUserIncome(ctx context.Context) error {
	tx, err := r.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin weekly user income refresh: %w", err)
	}
	// Rollback is a no-op after a successful Commit; on every other path it
	// ends the transaction and thereby releases the advisory lock.
	defer func() { _ = tx.Rollback() }()

	var got bool
	if err := tx.QueryRowContext(ctx, "SELECT pg_try_advisory_xact_lock(123456)").Scan(&got); err != nil {
		return fmt.Errorf("acquire weekly user income lock: %w", err)
	}
	if !got {
		return nil // another instance holds the lock; skip this round
	}

	if _, err := tx.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO %s.metric_weekly_user_income (star_id, user_id, week_start, total_crystal, rank_in_star)
SELECT
star_id, user_id,
DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')::date AS week_start,
SUM(CASE WHEN event_type IN ('exhibition.revenue', 'crystal.change') AND (properties->>'amount')::BIGINT > 0
THEN (properties->>'amount')::BIGINT ELSE 0 END) AS total_crystal,
ROW_NUMBER() OVER (PARTITION BY star_id ORDER BY SUM(CASE WHEN event_type IN ('exhibition.revenue','crystal.change') AND (properties->>'amount')::BIGINT > 0 THEN (properties->>'amount')::BIGINT ELSE 0 END) DESC) AS rank_in_star
FROM %s.events
WHERE event_type IN ('exhibition.revenue', 'crystal.change')
AND received_at >= DATE_TRUNC('week', NOW() AT TIME ZONE 'Asia/Shanghai')
GROUP BY star_id, user_id, DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')
ON CONFLICT (star_id, user_id, week_start) DO UPDATE
SET total_crystal = EXCLUDED.total_crystal, rank_in_star = EXCLUDED.rank_in_star, updated_at = NOW()
`, r.schema, r.schema)); err != nil {
		return fmt.Errorf("refresh weekly user income: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit weekly user income refresh: %w", err)
	}
	return nil
}
// RefreshUpcomingLevelUps recomputes like_progress and duration_progress for
// every eligible asset and upserts them into
// <schema>.metric_upcoming_level_ups.
//
// Data sources:
//   - public.assets (id, owner_uid, star_id, is_active, deleted_at)
//   - public.asset_level_records (current_level, season_likes, season_exhibition_hours)
//   - public.asset_levels (level, level_order, require_likes, require_hours)
//
// Progress is the season-to-date likes (respectively exhibition hours) divided
// by the threshold of the asset's current level, as a percentage capped at 100.
//
// Rows are excluded when:
//  1. the asset is inactive or soft-deleted;
//  2. the asset is already at the highest level_order (nothing left to reach);
//  3. the asset is at the initial level 'N' (per the original note its
//     thresholds are zero, so progress is meaningless).
//
// NULL handling: PostgreSQL's LEAST ignores NULL arguments, so LEAST(100, NULL)
// is 100, not NULL. The COALESCE therefore has to sit inside LEAST; wrapped
// around it, it never fires and a zero threshold or NULL counter would be
// displayed as 100% progress. With the COALESCE inside, such rows report 0.
//
// It returns a wrapped error if the statement fails.
func (r *MetricRepository) RefreshUpcomingLevelUps(ctx context.Context) error {
	_, err := r.db.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO %s.metric_upcoming_level_ups (user_id, star_id, asset_id, like_progress, duration_progress)
SELECT
a.owner_uid AS user_id,
a.star_id,
a.id AS asset_id,
LEAST(100, COALESCE((alr.season_likes::FLOAT / NULLIF(al.require_likes, 0) * 100)::INT, 0)) AS like_progress,
LEAST(100, COALESCE((alr.season_exhibition_hours::FLOAT / NULLIF(al.require_hours, 0) * 100)::INT, 0)) AS duration_progress
FROM public.assets a
JOIN public.asset_level_records alr ON alr.asset_id = a.id
JOIN public.asset_levels al ON al.level = alr.current_level
WHERE a.is_active = TRUE
AND a.deleted_at IS NULL
AND al.level_order < (SELECT MAX(level_order) FROM public.asset_levels)
AND alr.current_level <> 'N'
ON CONFLICT (user_id, star_id, asset_id) DO UPDATE
SET like_progress = EXCLUDED.like_progress,
duration_progress = EXCLUDED.duration_progress,
updated_at = NOW()
`, r.schema))
	if err != nil {
		return fmt.Errorf("refresh upcoming level ups: %w", err)
	}
	return nil
}