topfans/backend/services/statisticService/repository/metric_repo.go
zerosaturation bed8f8e578 feat(statistic): T4-T8 event collection framework (Event + Sink + Repo + Service + Workers)
- Event model + ToJSON
- EventSink interface + ChannelEventSink (non-blocking Submit)
- event_repo: batch INSERT ON CONFLICT DO NOTHING dedup
- event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill
- event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up
- metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock)
- partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup)
- 22 unit + integration tests (model/repo/service/sink/worker)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-08 17:20:53 +08:00

89 lines
3.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package repository
import (
"context"
"database/sql"
"fmt"
"github.com/topfans/backend/services/statisticService/model"
)
// MetricRepository maintains the pre-aggregated metric tables / materialized
// views of the statistic service.
type MetricRepository struct {
	// db is the database handle every statement is issued through.
	db *sql.DB
	// schema is the schema name interpolated in front of the metric table
	// names when the SQL statements are built.
	schema string
}
// NewMetricRepository builds a MetricRepository that issues its statements
// through db and targets the metric tables living in schema.
func NewMetricRepository(db *sql.DB, schema string) *MetricRepository {
	repo := new(MetricRepository)
	repo.db = db
	repo.schema = schema
	return repo
}
// UpsertRecentLevelUp synchronously records a level-up event in
// <schema>.metric_recent_level_ups.
//
// The call is a no-op returning nil when e is nil, when e.EventType is not
// "asset.level_up", or when the "asset_id" or "to" property is empty. The
// "from" property is passed through as-is, and asset_name / asset_thumb are
// written as empty strings. Despite the name, the statement is a plain INSERT
// with no ON CONFLICT clause.
func (r *MetricRepository) UpsertRecentLevelUp(ctx context.Context, e *model.Event) error {
	if e == nil {
		return nil
	}
	if e.EventType != "asset.level_up" {
		return nil
	}

	props := e.Properties
	asset, from, to := props["asset_id"], props["from"], props["to"]
	if asset == "" || to == "" {
		// Not enough information to describe the upgrade; skip silently.
		return nil
	}

	// The SQL text is kept verbatim; only the schema name is interpolated,
	// every value is bound as a parameter.
	query := fmt.Sprintf(`
INSERT INTO %s.metric_recent_level_ups
(user_id, star_id, asset_id, from_level, to_level, upgrade_time, asset_name, asset_thumb)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, r.schema)

	_, err := r.db.ExecContext(ctx, query,
		e.UserID, e.StarID, asset, from, to, e.OccurredAt, "", "")
	return err
}
// RefreshWeeklyUserIncome fully recomputes this week's per-user crystal total
// and per-star rank from <schema>.events and upserts the result into
// <schema>.metric_weekly_user_income.
//
// Only 'exhibition.revenue' and 'crystal.change' events with a positive
// properties->>'amount' contribute to the total. Weeks are computed in the
// Asia/Shanghai time zone.
//
// A PostgreSQL advisory lock (key 123456) prevents several instances from
// refreshing at the same time. If the lock is already held, this round is
// skipped and nil is returned.
//
// The lock is taken with pg_try_advisory_xact_lock inside a transaction, so
// the lock, the INSERT and the release all happen on one connection. A
// session-level lock issued through the *sql.DB pool could be unlocked on a
// different connection and stay held by an idle one. The transaction-level
// lock is released automatically on commit or rollback, including when ctx is
// cancelled. It shares its lock space with pg_try_advisory_lock, so instances
// still running the session-level variant are excluded as well.
//
// NOTE(review): the WHERE clause compares received_at with a
// timestamp-without-time-zone week boundary. If received_at is timestamptz and
// the session TimeZone is not Asia/Shanghai, the boundary is shifted by the
// zone offset. Confirm the column type and session setting.
func (r *MetricRepository) RefreshWeeklyUserIncome(ctx context.Context) error {
	tx, err := r.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin weekly income refresh: %w", err)
	}
	// After a successful Commit, Rollback only reports sql.ErrTxDone, so its
	// result is deliberately ignored. On every other path it releases the
	// advisory lock and returns the connection to the pool.
	defer func() { _ = tx.Rollback() }()

	var got bool
	if err := tx.QueryRowContext(ctx, "SELECT pg_try_advisory_xact_lock(123456)").Scan(&got); err != nil {
		return fmt.Errorf("acquire weekly income advisory lock: %w", err)
	}
	if !got {
		return nil // another instance holds the lock; skip this round
	}

	// The inner query aggregates per (star_id, user_id, week_start). All three
	// selected key columns must be grouped, because week_start depends on
	// received_at and PostgreSQL rejects ungrouped columns. The outer query
	// ranks the aggregated totals within each star and week.
	query := fmt.Sprintf(`
		INSERT INTO %s.metric_weekly_user_income (star_id, user_id, week_start, total_crystal, rank_in_star)
		SELECT
			star_id, user_id, week_start, total_crystal,
			ROW_NUMBER() OVER (PARTITION BY star_id, week_start ORDER BY total_crystal DESC) AS rank_in_star
		FROM (
			SELECT
				star_id, user_id,
				DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')::date AS week_start,
				SUM(CASE WHEN (properties->>'amount')::BIGINT > 0
					THEN (properties->>'amount')::BIGINT ELSE 0 END) AS total_crystal
			FROM %s.events
			WHERE event_type IN ('exhibition.revenue', 'crystal.change')
			  AND received_at >= DATE_TRUNC('week', NOW() AT TIME ZONE 'Asia/Shanghai')
			GROUP BY 1, 2, 3
		) AS weekly
		ON CONFLICT (star_id, user_id, week_start) DO UPDATE
		SET total_crystal = EXCLUDED.total_crystal, rank_in_star = EXCLUDED.rank_in_star, updated_at = NOW()
	`, r.schema, r.schema)

	if _, err := tx.ExecContext(ctx, query); err != nil {
		return fmt.Errorf("refresh weekly user income: %w", err)
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit weekly income refresh: %w", err)
	}
	return nil
}
// RefreshUpcomingLevelUps recomputes like_progress and duration_progress for
// every active, non-deleted asset and upserts them into
// <schema>.metric_upcoming_level_ups, keyed by (user_id, star_id, asset_id).
//
// Each progress value is the ratio of the asset's current like count, or of
// the seconds elapsed since placed_at, to the threshold configured for the
// asset's level, expressed as a percentage and capped at 100.
//
// A zero threshold is mapped to NULL with NULLIF so the division cannot fail.
// LEAST ignores NULL arguments, so such rows end up with a progress of 100.
//
// NOTE: the public.asset_level_config table and column names still need to be
// confirmed with the assetService owners at the end of P1.
func (r *MetricRepository) RefreshUpcomingLevelUps(ctx context.Context) error {
	// The cap is applied before the ::INT cast so that a very large ratio
	// cannot raise "integer out of range" before LEAST clamps it.
	query := fmt.Sprintf(`
		INSERT INTO %s.metric_upcoming_level_ups (user_id, star_id, asset_id, like_progress, duration_progress)
		SELECT
			a.user_id, a.star_id, a.id,
			LEAST(100, a.like_count::FLOAT / NULLIF(alc.upgrade_like_threshold, 0) * 100)::INT AS like_progress,
			LEAST(100, EXTRACT(EPOCH FROM (NOW() - a.placed_at))::FLOAT / NULLIF(alc.upgrade_duration_seconds, 0) * 100)::INT AS duration_progress
		FROM public.assets a
		JOIN public.asset_level_config alc ON alc.level = a.level
		WHERE a.status = 'active' AND a.deleted_at IS NULL
		ON CONFLICT (user_id, star_id, asset_id) DO UPDATE
		SET like_progress = EXCLUDED.like_progress, duration_progress = EXCLUDED.duration_progress, updated_at = NOW()
	`, r.schema)

	if _, err := r.db.ExecContext(ctx, query); err != nil {
		return fmt.Errorf("refresh upcoming level ups: %w", err)
	}
	return nil
}