- Event model + ToJSON - EventSink interface + ChannelEventSink (non-blocking Submit) - event_repo: batch INSERT ON CONFLICT DO NOTHING dedup - event_service: 7-type whitelist + 1KB props limit + ReceivedAt auto-fill - event_flusher: 100/1s batch + sync metric_recent_level_ups on level_up - metric_weekly + metric_upcoming workers (5min/15min with pg_try_advisory_lock) - partitioner: 7-day pre-create + 30-day cleanup (00:05 create / 00:30 cleanup) - 22 unit + integration tests (model/repo/service/sink/worker) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
89 lines
3.6 KiB
Go
89 lines
3.6 KiB
Go
package repository
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
)
|
||
|
||
// MetricRepository maintains the pre-aggregated metric tables / materialized
// views.
type MetricRepository struct {
	// db is the database handle the repository's statements are executed on.
	db *sql.DB
	// schema is the schema name interpolated in front of the metric table
	// names when the SQL text is built.
	schema string
}
|
||
|
||
// NewMetricRepository 构造 MetricRepository
|
||
func NewMetricRepository(db *sql.DB, schema string) *MetricRepository {
|
||
return &MetricRepository{db: db, schema: schema}
|
||
}
|
||
|
||
// UpsertRecentLevelUp 同步写入最近升级记录(仅当 event_type == "asset.level_up")
|
||
func (r *MetricRepository) UpsertRecentLevelUp(ctx context.Context, e *model.Event) error {
|
||
if e == nil || e.EventType != "asset.level_up" {
|
||
return nil
|
||
}
|
||
assetID := e.Properties["asset_id"]
|
||
fromLevel := e.Properties["from"]
|
||
toLevel := e.Properties["to"]
|
||
upgradeTime := e.OccurredAt
|
||
if assetID == "" || toLevel == "" {
|
||
return nil
|
||
}
|
||
_, err := r.db.ExecContext(ctx, fmt.Sprintf(`
|
||
INSERT INTO %s.metric_recent_level_ups
|
||
(user_id, star_id, asset_id, from_level, to_level, upgrade_time, asset_name, asset_thumb)
|
||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||
`, r.schema),
|
||
e.UserID, e.StarID, assetID, fromLevel, toLevel, upgradeTime, "", "")
|
||
return err
|
||
}
|
||
|
||
// RefreshWeeklyUserIncome 全量重算本周 rank + total(pg_try_advisory_lock 防多实例)
|
||
func (r *MetricRepository) RefreshWeeklyUserIncome(ctx context.Context) error {
|
||
var got bool
|
||
if err := r.db.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(123456)").Scan(&got); err != nil {
|
||
return err
|
||
}
|
||
if !got {
|
||
return nil // 抢不到锁本轮跳过
|
||
}
|
||
defer r.db.ExecContext(ctx, "SELECT pg_advisory_unlock(123456)")
|
||
|
||
_, err := r.db.ExecContext(ctx, fmt.Sprintf(`
|
||
INSERT INTO %s.metric_weekly_user_income (star_id, user_id, week_start, total_crystal, rank_in_star)
|
||
SELECT
|
||
star_id, user_id,
|
||
DATE_TRUNC('week', received_at AT TIME ZONE 'Asia/Shanghai')::date AS week_start,
|
||
SUM(CASE WHEN event_type IN ('exhibition.revenue', 'crystal.change') AND (properties->>'amount')::BIGINT > 0
|
||
THEN (properties->>'amount')::BIGINT ELSE 0 END) AS total_crystal,
|
||
ROW_NUMBER() OVER (PARTITION BY star_id ORDER BY SUM(CASE WHEN event_type IN ('exhibition.revenue','crystal.change') AND (properties->>'amount')::BIGINT > 0 THEN (properties->>'amount')::BIGINT ELSE 0 END) DESC) AS rank_in_star
|
||
FROM %s.events
|
||
WHERE event_type IN ('exhibition.revenue', 'crystal.change')
|
||
AND received_at >= DATE_TRUNC('week', NOW() AT TIME ZONE 'Asia/Shanghai')
|
||
GROUP BY star_id, user_id
|
||
ON CONFLICT (star_id, user_id, week_start) DO UPDATE
|
||
SET total_crystal = EXCLUDED.total_crystal, rank_in_star = EXCLUDED.rank_in_star, updated_at = NOW()
|
||
`, r.schema, r.schema))
|
||
return err
|
||
}
|
||
|
||
// RefreshUpcomingLevelUps 计算每个 asset 的 like_progress + duration_progress
|
||
// 注: public.asset_level_config 表名/字段名需 P1 末向 assetService 同学确认
|
||
func (r *MetricRepository) RefreshUpcomingLevelUps(ctx context.Context) error {
|
||
_, err := r.db.ExecContext(ctx, fmt.Sprintf(`
|
||
INSERT INTO %s.metric_upcoming_level_ups (user_id, star_id, asset_id, like_progress, duration_progress)
|
||
SELECT
|
||
a.user_id, a.star_id, a.id,
|
||
LEAST(100, (a.like_count::FLOAT / NULLIF(alc.upgrade_like_threshold, 0) * 100)::INT) AS like_progress,
|
||
LEAST(100, (EXTRACT(EPOCH FROM (NOW() - a.placed_at))::FLOAT / NULLIF(alc.upgrade_duration_seconds, 1) * 100)::INT) AS duration_progress
|
||
FROM public.assets a
|
||
JOIN public.asset_level_config alc ON alc.level = a.level
|
||
WHERE a.status = 'active' AND a.deleted_at IS NULL
|
||
ON CONFLICT (user_id, star_id, asset_id) DO UPDATE
|
||
SET like_progress = EXCLUDED.like_progress, duration_progress = EXCLUDED.duration_progress, updated_at = NOW()
|
||
`, r.schema))
|
||
return err
|
||
}
|