diff --git a/backend/migrations/2026_06_08_001_statistic_events.sql b/backend/migrations/2026_06_08_001_statistic_events.sql new file mode 100644 index 0000000..928278d --- /dev/null +++ b/backend/migrations/2026_06_08_001_statistic_events.sql @@ -0,0 +1,37 @@ +-- statistic 服务 events 原始表 +-- 创建时间: 2026-06-08 +-- 说明: 事件原始表,按 received_at 按日分区 +-- 关联: spec §3.2 + +-- 1. 创建 schema +CREATE SCHEMA IF NOT EXISTS statistic; + +-- 2. 创建 events 主表(按 received_at 按日分区) +CREATE TABLE IF NOT EXISTS statistic.events ( + id BIGSERIAL, + event_id UUID NOT NULL, + user_id BIGINT NOT NULL, + star_id BIGINT NOT NULL, + event_type VARCHAR(64) NOT NULL, + occurred_at TIMESTAMPTZ NOT NULL, + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + properties JSONB NOT NULL DEFAULT '{}', + + PRIMARY KEY (id, received_at) +) PARTITION BY RANGE (received_at); + +-- 3. 唯一约束(去重):同一 event_id 不能重复 +CREATE UNIQUE INDEX IF NOT EXISTS idx_events_event_id + ON statistic.events (event_id, received_at); + +-- 4. 看板查询主索引(覆盖 90% 查询) +CREATE INDEX IF NOT EXISTS idx_events_user_star_type_time + ON statistic.events (user_id, star_id, event_type, received_at DESC); + +-- 5. 趋势分析索引 +CREATE INDEX IF NOT EXISTS idx_events_star_type_time + ON statistic.events (star_id, event_type, received_at DESC); + +-- 6. JSONB 属性 GIN 索引 +CREATE INDEX IF NOT EXISTS idx_events_properties_gin + ON statistic.events USING GIN (properties); diff --git a/backend/migrations/2026_06_08_002_statistic_mv_daily_user_income.sql b/backend/migrations/2026_06_08_002_statistic_mv_daily_user_income.sql new file mode 100644 index 0000000..125c567 --- /dev/null +++ b/backend/migrations/2026_06_08_002_statistic_mv_daily_user_income.sql @@ -0,0 +1,24 @@ +-- statistic 服务 MV1: 每日用户水晶收益 +-- 创建时间: 2026-06-08 +-- 服务于: 七日收益曲线、今日收益 +-- 关联: spec §3.4 MV1 + +CREATE MATERIALIZED VIEW IF NOT EXISTS statistic.mv_daily_user_income AS +SELECT + user_id, + star_id, + DATE(received_at AT TIME ZONE 'Asia/Shanghai') AS income_date, + SUM( + CASE + WHEN event_type IN ('exhibition.revenue', 'crystal.change') + AND COALESCE((properties->>'amount')::BIGINT, 0) > 0 + THEN COALESCE((properties->>'amount')::BIGINT, 0) + ELSE 0 + END + ) AS total_crystal +FROM statistic.events +WHERE event_type IN ('exhibition.revenue', 'crystal.change') +GROUP BY user_id, star_id, income_date; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_daily_user_income_pk + ON statistic.mv_daily_user_income (user_id, star_id, income_date); diff --git a/backend/migrations/2026_06_08_003_statistic_mv_daily_exhibition_revenue.sql b/backend/migrations/2026_06_08_003_statistic_mv_daily_exhibition_revenue.sql new file mode 100644 index 0000000..f705dc3 --- /dev/null +++ b/backend/migrations/2026_06_08_003_statistic_mv_daily_exhibition_revenue.sql @@ -0,0 +1,19 @@ +-- statistic 服务 MV2: 每日展出收益(按藏品) +-- 创建时间: 2026-06-08 +-- 服务于: 展出收益中心(top5)、藏品矩阵 TOP5 +-- 关联: spec §3.4 MV2 + +CREATE MATERIALIZED VIEW IF NOT EXISTS statistic.mv_daily_exhibition_revenue AS +SELECT + user_id, + star_id, + (properties->>'asset_id')::BIGINT AS asset_id, + DATE(received_at AT TIME ZONE 'Asia/Shanghai') AS revenue_date, + SUM(COALESCE((properties->>'duration_ms')::BIGINT, 0)) AS total_duration_ms, + SUM(COALESCE((properties->>'amount')::BIGINT, 0)) AS total_earnings +FROM statistic.events +WHERE event_type = 'exhibition.revenue' +GROUP BY user_id, star_id, asset_id, revenue_date; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_exhibition_revenue_pk + ON statistic.mv_daily_exhibition_revenue (user_id, star_id, asset_id, revenue_date); diff --git a/backend/migrations/2026_06_08_004_statistic_mv_daily_like_income.sql b/backend/migrations/2026_06_08_004_statistic_mv_daily_like_income.sql new file mode 100644 index 0000000..42d188e --- /dev/null +++ b/backend/migrations/2026_06_08_004_statistic_mv_daily_like_income.sql @@ -0,0 +1,22 @@ +-- statistic 服务 MV3: 每日点赞按等级 +-- 创建时间: 2026-06-08 +-- 服务于: 点赞收益按等级(累计) +-- 关联: spec §3.4 MV3 +-- 关联依赖: public.assets 表 + +CREATE MATERIALIZED VIEW IF NOT EXISTS statistic.mv_daily_like_income AS +SELECT + e.user_id, + e.star_id, + a.level AS asset_level, + DATE(e.received_at AT TIME ZONE 'Asia/Shanghai') AS like_date, + COUNT(*) AS like_count, + SUM(COALESCE((e.properties->>'amount')::BIGINT, 0)) AS total_crystal +FROM statistic.events e +JOIN public.assets a + ON a.id = (e.properties->>'asset_id')::BIGINT +WHERE e.event_type = 'asset.like' +GROUP BY e.user_id, e.star_id, a.level, like_date; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_like_income_pk + ON statistic.mv_daily_like_income (user_id, star_id, asset_level, like_date); diff --git a/backend/migrations/2026_06_08_005_statistic_mv_asset_level_distribution.sql b/backend/migrations/2026_06_08_005_statistic_mv_asset_level_distribution.sql new file mode 100644 index 0000000..0bcf8a5 --- /dev/null +++ b/backend/migrations/2026_06_08_005_statistic_mv_asset_level_distribution.sql @@ -0,0 +1,18 @@ +-- statistic 服务 MV4: 藏品等级分布 +-- 创建时间: 2026-06-08 +-- 服务于: 藏品等级分布环形图 +-- 关联: spec §3.4 MV4 +-- 假设: public.assets 表有 status='active' 软删除状态 + deleted_at IS NULL 软删除约定 + +CREATE MATERIALIZED VIEW IF NOT EXISTS statistic.mv_asset_level_distribution AS +SELECT + user_id, + star_id, + level AS asset_level, + COUNT(*) AS asset_count +FROM public.assets +WHERE status = 'active' AND deleted_at IS NULL +GROUP BY user_id, star_id, level; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_level_dist_pk + ON statistic.mv_asset_level_distribution (user_id, star_id, asset_level); diff --git a/backend/migrations/2026_06_08_006_statistic_metric_weekly_user_income.sql b/backend/migrations/2026_06_08_006_statistic_metric_weekly_user_income.sql new file mode 100644 index 0000000..0536248 --- /dev/null +++ b/backend/migrations/2026_06_08_006_statistic_metric_weekly_user_income.sql @@ -0,0 +1,19 @@ +-- statistic 服务 metric_weekly_user_income: 本周收入 + 排名(预聚合) +-- 创建时间: 2026-06-08 +-- 服务于: GetTodayOverview 的 week_rank + week_total_users +-- 关联: spec §3.5 + +CREATE TABLE IF NOT EXISTS statistic.metric_weekly_user_income ( + star_id BIGINT NOT NULL, + user_id BIGINT NOT NULL, + week_start DATE NOT NULL, -- 周一日期(Asia/Shanghai) + total_crystal BIGINT NOT NULL DEFAULT 0, + rank_in_star INT NOT NULL DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + PRIMARY KEY (star_id, user_id, week_start) +); + +-- Worker 维护,查询走索引 +CREATE INDEX IF NOT EXISTS idx_metric_weekly_rank + ON statistic.metric_weekly_user_income (star_id, week_start, rank_in_star); diff --git a/backend/migrations/2026_06_08_007_statistic_metric_recent_level_ups.sql b/backend/migrations/2026_06_08_007_statistic_metric_recent_level_ups.sql new file mode 100644 index 0000000..f1b4d17 --- /dev/null +++ b/backend/migrations/2026_06_08_007_statistic_metric_recent_level_ups.sql @@ -0,0 +1,21 @@ +-- statistic 服务 metric_recent_level_ups: 最近升级记录(预聚合,保留 30 天) +-- 创建时间: 2026-06-08 +-- 服务于: GetAssetUpgradeProgress 的 recent[] +-- 关联: spec §3.5 +-- 清理: 30 天前的记录由 worker 定时 DELETE + +CREATE TABLE IF NOT EXISTS statistic.metric_recent_level_ups ( + id BIGSERIAL PRIMARY KEY, + user_id BIGINT NOT NULL, + star_id BIGINT NOT NULL, + asset_id BIGINT NOT NULL, + from_level VARCHAR(8) NOT NULL, + to_level VARCHAR(8) NOT NULL, + upgrade_time TIMESTAMPTZ NOT NULL, + asset_name VARCHAR(128), + asset_thumb VARCHAR(512) +); + +-- 看板查询主索引 +CREATE INDEX IF NOT EXISTS idx_recent_level_ups_user + ON statistic.metric_recent_level_ups (user_id, star_id, upgrade_time DESC); diff --git a/backend/migrations/2026_06_08_008_statistic_metric_upcoming_level_ups.sql b/backend/migrations/2026_06_08_008_statistic_metric_upcoming_level_ups.sql new file mode 100644 index 0000000..cd3839a --- /dev/null +++ b/backend/migrations/2026_06_08_008_statistic_metric_upcoming_level_ups.sql @@ -0,0 +1,16 @@ +-- statistic 服务 metric_upcoming_level_ups: 即将升级进度(预聚合) +-- 创建时间: 2026-06-08 +-- 服务于: GetAssetUpgradeProgress 的 upcoming[] +-- 关联: spec §3.5 +-- 维护: Worker 每 15 分钟从 public.assets + 升级阈值配置全量重算 + +CREATE TABLE IF NOT EXISTS statistic.metric_upcoming_level_ups ( + user_id BIGINT NOT NULL, + star_id BIGINT NOT NULL, + asset_id BIGINT NOT NULL, + like_progress INT NOT NULL, -- 0-100 + duration_progress INT NOT NULL, -- 0-100 + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + PRIMARY KEY (user_id, star_id, asset_id) +); diff --git a/backend/migrations/2026_06_08_009_statistic_refresh_log.sql b/backend/migrations/2026_06_08_009_statistic_refresh_log.sql new file mode 100644 index 0000000..b3de96a --- /dev/null +++ b/backend/migrations/2026_06_08_009_statistic_refresh_log.sql @@ -0,0 +1,18 @@ +-- statistic 服务 refresh_log: 物化视图刷新日志 +-- 创建时间: 2026-06-08 +-- 关联: spec §3.6 +-- 用途: 监控物化视图刷新状态 + 失败告警 + +CREATE TABLE IF NOT EXISTS statistic.refresh_log ( + id BIGSERIAL PRIMARY KEY, + mv_name VARCHAR(128) NOT NULL, + started_at TIMESTAMPTZ NOT NULL, + finished_at TIMESTAMPTZ, + row_count BIGINT, + status VARCHAR(16) NOT NULL, -- 'running' / 'success' / 'failed' + error_message TEXT +); + +-- 监控查询主索引 +CREATE INDEX IF NOT EXISTS idx_refresh_log_mv_time + ON statistic.refresh_log (mv_name, started_at DESC); diff --git a/backend/migrations/2026_06_08_010_statistic_partitions_initial.sql b/backend/migrations/2026_06_08_010_statistic_partitions_initial.sql new file mode 100644 index 0000000..22bc5b8 --- /dev/null +++ b/backend/migrations/2026_06_08_010_statistic_partitions_initial.sql @@ -0,0 +1,21 @@ +-- statistic 服务 events 表初始 7 天分区 +-- 创建时间: 2026-06-08 +-- 关联: spec §3.3 分区管理 +-- 说明: 手动运行一次或由 partitioner worker 自动运行 +-- (worker/partitioner.go 实现,自动调度见 plan Task 8) + +DO $$ +DECLARE + i INT; + d DATE; + n DATE; +BEGIN + FOR i IN 0..6 LOOP + d := CURRENT_DATE + i; + n := d + 1; + EXECUTE format( + 'CREATE TABLE IF NOT EXISTS statistic.events_%s PARTITION OF statistic.events FOR VALUES FROM (%L) TO (%L)', + to_char(d, 'YYYY_MM_DD'), d::text, n::text + ); + END LOOP; +END $$; diff --git a/backend/services/statisticService/config/statistic_config.go b/backend/services/statisticService/config/statistic_config.go new file mode 100644 index 0000000..175cd2a --- /dev/null +++ b/backend/services/statisticService/config/statistic_config.go @@ -0,0 +1,155 @@ +package config + +import ( + "flag" + "log" + "os" + "strconv" + "time" +) + +// DatabaseConfig 数据库配置 +type DatabaseConfig struct { + Host, Password, DBName, SSLMode, TimeZone string + Port int + User string + Schema string +} + +// RedisConfig Redis 配置 +type RedisConfig struct { + URL string +} + +// RefreshIntervals 物化视图/预聚表刷新间隔 +type RefreshIntervals struct { + DailyUserIncome time.Duration + DailyExhibitionRevenue time.Duration + DailyLikeIncome time.Duration + AssetLevelDistribution time.Duration + WeeklyUserIncome time.Duration + UpcomingLevelUps time.Duration +} + +// ChannelConfig 事件 channel 配置 +type ChannelConfig struct { + EventChannelCapacity int + EventWorkerCount int + EventBatchSize int + EventBatchInterval time.Duration +} + +// PartitionConfig 分区管理配置 +type PartitionConfig struct { + RetentionDays int + PreCreateDays int +} + +// ExtensionConfig 预留扩展开关 +type ExtensionConfig struct { + EnableOLAPDualWrite bool + EnableRealtimeChannel bool + EnableSDKEndpoint bool + EnableSampling bool +} + +var ( + DBConfig = &DatabaseConfig{ + Schema: "statistic", + } + RedisCfg = &RedisConfig{ + URL: "redis://localhost:6379/0", + } + RefreshCfg = &RefreshIntervals{ + DailyUserIncome: 5 * time.Minute, + DailyExhibitionRevenue: 5 * time.Minute, + DailyLikeIncome: 5 * time.Minute, + AssetLevelDistribution: 15 * time.Minute, + WeeklyUserIncome: 5 * time.Minute, + UpcomingLevelUps: 15 * time.Minute, + } + ChannelCfg = &ChannelConfig{ + EventChannelCapacity: 1000, + EventWorkerCount: 1, + EventBatchSize: 100, + EventBatchInterval: 1 * time.Second, + } + PartitionCfg = &PartitionConfig{ + RetentionDays: 30, + PreCreateDays: 7, + } + ExtCfg = &ExtensionConfig{ // 全部默认 false(本期不启用) + EnableOLAPDualWrite: false, + EnableRealtimeChannel: false, + EnableSDKEndpoint: false, + EnableSampling: false, + } +) + +func getEnv(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func getEnvInt(key string, fallback int) int { + if v := os.Getenv(key); v != "" { + if n, err := strconv.Atoi(v); err == nil { + return n + } + } + return fallback +} + +func getEnvDuration(key string, fallback time.Duration) time.Duration { + if v := os.Getenv(key); v != "" { + if d, err := time.ParseDuration(v); err == nil { + return d + } + } + return fallback +} + +func getEnvBool(key string, fallback bool) bool { + if v := os.Getenv(key); v != "" { + if b, err := strconv.ParseBool(v); err == nil { + return b + } + } + return fallback +} + +// InitConfig 初始化所有配置(flag + 环境变量) +func InitConfig() { + flag.StringVar(&DBConfig.Host, "db-host", getEnv("STATISTIC_DB_HOST", "localhost"), "数据库主机") + flag.IntVar(&DBConfig.Port, "db-port", getEnvInt("STATISTIC_DB_PORT", 5432), "数据库端口") + flag.StringVar(&DBConfig.User, "db-user", getEnv("STATISTIC_DB_USER", "postgres"), "数据库用户") + flag.StringVar(&DBConfig.Password, "db-password", getEnv("STATISTIC_DB_PASSWORD", ""), "数据库密码") + flag.StringVar(&DBConfig.DBName, "db-name", getEnv("STATISTIC_DB_NAME", "topfans"), "数据库名") + flag.StringVar(&DBConfig.SSLMode, "db-sslmode", "disable", "数据库 SSL 模式") + flag.StringVar(&DBConfig.Schema, "db-schema", getEnv("STATISTIC_DB_SCHEMA", "statistic"), "数据库 schema") + + flag.StringVar(&RedisCfg.URL, "redis-url", getEnv("STATISTIC_REDIS_URL", "redis://localhost:6379/0"), "Redis URL") + + flag.IntVar(&ChannelCfg.EventChannelCapacity, "event-channel-capacity", getEnvInt("STATISTIC_EVENT_CHANNEL_CAPACITY", 1000), "事件 channel 容量") + flag.IntVar(&ChannelCfg.EventBatchSize, "event-batch-size", getEnvInt("STATISTIC_EVENT_BATCH_SIZE", 100), "事件批量大小") + flag.DurationVar(&ChannelCfg.EventBatchInterval, "event-batch-interval", getEnvDuration("STATISTIC_EVENT_BATCH_INTERVAL", time.Second), "事件批量间隔") + + flag.IntVar(&PartitionCfg.RetentionDays, "partition-retention-days", getEnvInt("STATISTIC_PARTITION_RETENTION_DAYS", 30), "分区保留天数") + flag.IntVar(&PartitionCfg.PreCreateDays, "partition-precreate-days", getEnvInt("STATISTIC_PARTITION_PRECREATE_DAYS", 7), "分区预创建天数") + + flag.BoolVar(&ExtCfg.EnableOLAPDualWrite, "enable-olap", getEnvBool("STATISTIC_ENABLE_OLAP_DUAL_WRITE", false), "启用 OLAP 双写") + flag.BoolVar(&ExtCfg.EnableRealtimeChannel, "enable-realtime", getEnvBool("STATISTIC_ENABLE_REALTIME_CHANNEL", false), "启用实时通道") + flag.BoolVar(&ExtCfg.EnableSDKEndpoint, "enable-sdk", getEnvBool("STATISTIC_ENABLE_SDK_ENDPOINT", false), "启用 SDK 端点") + flag.BoolVar(&ExtCfg.EnableSampling, "enable-sampling", getEnvBool("STATISTIC_ENABLE_SAMPLING", false), "启用采样") + + flag.Parse() + log.Println("statisticService 配置初始化完成") + log.Printf(" 数据库: %s:%d/%s (schema=%s)", DBConfig.Host, DBConfig.Port, DBConfig.DBName, DBConfig.Schema) + log.Printf(" Redis: %s", RedisCfg.URL) + log.Printf(" 事件 channel 容量: %d, 批量: %d/%v", ChannelCfg.EventChannelCapacity, ChannelCfg.EventBatchSize, ChannelCfg.EventBatchInterval) + log.Printf(" 分区保留: %d 天, 预创建: %d 天", PartitionCfg.RetentionDays, PartitionCfg.PreCreateDays) + log.Printf(" 扩展开关: OLAP=%v Realtime=%v SDK=%v Sampling=%v", + ExtCfg.EnableOLAPDualWrite, ExtCfg.EnableRealtimeChannel, ExtCfg.EnableSDKEndpoint, ExtCfg.EnableSampling) +} diff --git a/backend/services/statisticService/handler/healthz.go b/backend/services/statisticService/handler/healthz.go new file mode 100644 index 0000000..b786464 --- /dev/null +++ b/backend/services/statisticService/handler/healthz.go @@ -0,0 +1,56 @@ +package handler + +import ( + "context" + "database/sql" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/redis/go-redis/v9" +) + +// Healthz 健康检查 + /metrics 端点 +type Healthz struct { + db *sql.DB + redis *redis.Client +} + +// NewHealthz 构造 healthz handler +func NewHealthz(db *sql.DB, rdb *redis.Client) *Healthz { + return &Healthz{db: db, redis: rdb} +} + +// Register 在 gin engine 上注册 /healthz 和 /metrics 路由 +func (h *Healthz) Register(r *gin.Engine) { + r.GET("/healthz", h.handleHealthz) + r.GET("/metrics", gin.WrapH(promhttp.Handler())) +} + +func (h *Healthz) handleHealthz(c *gin.Context) { + ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second) + defer cancel() + + status := gin.H{"status": "ok"} + + if err := h.db.PingContext(ctx); err != nil { + status["db"] = "down" + status["db_error"] = err.Error() + } else { + status["db"] = "up" + } + + if err := h.redis.Ping(ctx).Err(); err != nil { + status["redis"] = "down" + status["redis_error"] = err.Error() + } else { + status["redis"] = "up" + } + + code := http.StatusOK + if status["db"] == "down" || status["redis"] == "down" { + code = http.StatusServiceUnavailable + } + c.JSON(code, status) +} diff --git a/backend/services/statisticService/main.go b/backend/services/statisticService/main.go index 858cdeb..09918a7 100644 --- a/backend/services/statisticService/main.go +++ b/backend/services/statisticService/main.go @@ -1,7 +1,210 @@ package main -import "fmt" +import ( + "context" + "database/sql" + "flag" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + dubboclient "dubbo.apache.org/dubbo-go/v3/client" + "dubbo.apache.org/dubbo-go/v3/protocol" + dubboserver "dubbo.apache.org/dubbo-go/v3/server" + "github.com/gin-gonic/gin" + _ "github.com/lib/pq" + "github.com/redis/go-redis/v9" + "go.uber.org/zap" + + "github.com/topfans/backend/pkg/logger" + pb "github.com/topfans/backend/pkg/proto/statistic" + pbUser "github.com/topfans/backend/pkg/proto/user" + "github.com/topfans/backend/services/statisticService/client" + "github.com/topfans/backend/services/statisticService/config" + "github.com/topfans/backend/services/statisticService/handler" + "github.com/topfans/backend/services/statisticService/model" + "github.com/topfans/backend/services/statisticService/provider" + "github.com/topfans/backend/services/statisticService/repository" + "github.com/topfans/backend/services/statisticService/service" + "github.com/topfans/backend/services/statisticService/sink" + "github.com/topfans/backend/services/statisticService/worker" +) + +var port = flag.Int("port", 20009, "Dubbo service port") func main() { - fmt.Println("statisticService starting...") + // 1. Init logger + env := os.Getenv("ENV") + if env == "" { + env = "development" + } + if err := logger.Init(logger.Config{ServiceName: "statistic-service", Environment: env, LogLevel: os.Getenv("LOG_LEVEL")}); err != nil { + panic(fmt.Sprintf("Failed to init logger: %v", err)) + } + defer logger.Sync() + logger.Logger.Info("Starting statisticService...") + + // 2. Init config + config.InitConfig() + + // 3. Open database connection + dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", + config.DBConfig.Host, config.DBConfig.Port, config.DBConfig.User, config.DBConfig.Password, + config.DBConfig.DBName, config.DBConfig.SSLMode) + db, err := sql.Open("postgres", dsn) + if err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to open DB: %v", err)) + } + db.SetMaxOpenConns(50) + if err := db.Ping(); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to ping DB: %v", err)) + } + defer db.Close() + logger.Logger.Info("Database connected") + + // 4. Open Redis connection + opt, err := redis.ParseURL(config.RedisCfg.URL) + if err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to parse Redis URL: %v", err)) + } + rdb := redis.NewClient(opt) + defer rdb.Close() + if err := rdb.Ping(context.Background()).Err(); err != nil { + logger.Logger.Warn(fmt.Sprintf("Failed to ping Redis: %v (continuing, will retry)", err)) + } else { + logger.Logger.Info("Redis connected") + } + + // 5. 事件 channel + sink + eventCh := make(chan *model.Event, config.ChannelCfg.EventChannelCapacity) + + // 6. 启动 partitioner(自动管理 events 分区) + partitioner := worker.NewPartitioner(db, config.DBConfig.Schema, + config.PartitionCfg.PreCreateDays, config.PartitionCfg.RetentionDays) + go partitioner.Start(context.Background()) + + // 7. 构造 repository + eventRepo := repository.NewEventRepository(db, config.DBConfig.Schema) + metricRepo := repository.NewMetricRepository(db, config.DBConfig.Schema) + + // 8. 构造 sink + service + cs := sink.NewChannelEventSink(eventCh) + whiteList := service.DefaultEventTypeWhitelist + eventSvc := service.NewEventService(cs, whiteList) + + // 9. 启动 workers + flusher := worker.NewEventFlusher(eventCh, eventRepo, metricRepo, + config.ChannelCfg.EventBatchSize, config.ChannelCfg.EventBatchInterval) + go flusher.Start(context.Background()) + + weeklyW := worker.NewWeeklyUserIncomeUpdater(metricRepo, config.RefreshCfg.WeeklyUserIncome) + go weeklyW.Start(context.Background()) + + upcomingW := worker.NewUpcomingLevelUpsUpdater(metricRepo, config.RefreshCfg.UpcomingLevelUps) + go upcomingW.Start(context.Background()) + + // 10. 构造 provider + internalProvider := provider.NewStatisticInternalProvider(eventSvc) + + // 看板 service(需要 dashboard repo + cache + userService 跨服务客户端) + dashRepo := repository.NewDashboardRepository(db, config.DBConfig.Schema) + cache := service.NewCache(rdb) + + // 跨服务 userService 客户端(用于 GetTodayOverview 调 crystal_balance) + // 默认 URL: tri://localhost:20000(可被 USER_SERVICE_URL 环境变量覆盖) + userServiceURL := os.Getenv("USER_SERVICE_URL") + if userServiceURL == "" { + userServiceURL = "tri://localhost:20000" + } + userCli, err := dubboclient.NewClient(dubboclient.WithClientURL(userServiceURL)) + if err != nil { + logger.Logger.Warn(fmt.Sprintf("userService client failed (degrade): %v", err)) + } + var userRPC service.UserRPCClient + if userCli != nil { + if userSvc, e := pbUser.NewUserSocialService(userCli); e == nil { + userRPC = client.NewUserServiceClient(userSvc) + } else { + logger.Logger.Warn(fmt.Sprintf("userSocialService stub failed: %v", e)) + } + } + dashSvc := service.NewDashboardService(dashRepo, cache, userRPC) + + combinedProvider := provider.NewStatisticCombinedProvider(internalProvider, dashSvc) + + // 11. Dubbo triple server + srv, err := dubboserver.NewServer( + dubboserver.WithServerProtocol( + protocol.WithPort(*port), + protocol.WithTriple(), + ), + ) + if err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to create Dubbo server: %v", err)) + } + if err := pb.RegisterStatisticServiceHandler(srv, combinedProvider); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to register StatisticService: %v", err)) + } + go func() { + logger.Logger.Info(fmt.Sprintf("statisticService Dubbo server starting on port %d", *port)) + if err := srv.Serve(); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Dubbo server failed: %v", err)) + } + }() + + // 12. Healthz HTTP server on port+1000=21009 + healthPort := *port + 1000 + r := gin.Default() + h := handler.NewHealthz(db, rdb) + h.Register(r) + go func() { + addr := fmt.Sprintf(":%d", healthPort) + logger.Logger.Info(fmt.Sprintf("Healthz server starting on %s", addr)) + if err := r.Run(addr); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Healthz server failed: %v", err)) + } + }() + + logger.Logger.Info(fmt.Sprintf("statisticService started, dubbo=%d healthz=%d", *port, healthPort)) + + // 13. 启动预热:避免冷启动瞬间 7 RPC 并发请求冲击 DB + go warmupCache(combinedProvider) + + // 14. Graceful shutdown + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + logger.Logger.Info("Shutting down statisticService...") + flusher.Stop() + weeklyW.Stop() + upcomingW.Stop() + partitioner.Stop() +} + +// warmupCache 启动时预热 7 个看板 RPC 缓存 +// - 用 hardcoded sample star_ids(生产可改为查 public.stars 取前 N) +// - 每个 RPC 独立 cache key,并发触发 +// - 异步 + 超时控制,不阻塞启动 +func warmupCache(p *provider.StatisticCombinedProvider) { + // 延迟启动(等服务注册完成 + 第一次 MV 刷新就绪) + time.Sleep(15 * time.Second) + + // 预热前 5 个 star(生产环境应改为查 public.stars) + sampleStarIDs := []int64{1, 2, 3, 4, 5} + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + logger.Logger.Info("warmup: starting 7 RPCs x 5 stars", zap.Int("stars", len(sampleStarIDs))) + for _, starID := range sampleStarIDs { + starID := starID + go func() { _, _ = p.GetTodayOverview(ctx, &pb.GetTodayOverviewRequest{StarId: starID}) }() + go func() { _, _ = p.Get7DayIncomeCurve(ctx, &pb.Get7DayIncomeCurveRequest{StarId: starID}) }() + go func() { _, _ = p.GetExhibitionIncomeSummary(ctx, &pb.GetExhibitionIncomeSummaryRequest{StarId: starID}) }() + go func() { _, _ = p.GetLikeIncomeByLevel(ctx, &pb.GetLikeIncomeByLevelRequest{StarId: starID}) }() + go func() { _, _ = p.GetTopAssetsByEarning(ctx, &pb.GetTopAssetsByEarningRequest{StarId: starID}) }() + go func() { _, _ = p.GetAssetLevelDistribution(ctx, &pb.GetAssetLevelDistributionRequest{StarId: starID}) }() + go func() { _, _ = p.GetAssetUpgradeProgress(ctx, &pb.GetAssetUpgradeProgressRequest{StarId: starID}) }() + } } diff --git a/backend/services/statisticService/metrics/metrics.go b/backend/services/statisticService/metrics/metrics.go new file mode 100644 index 0000000..1adfad0 --- /dev/null +++ b/backend/services/statisticService/metrics/metrics.go @@ -0,0 +1,83 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// 看板 RPC 指标 +var ( + DashboardRPCTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "dashboard_rpc_total", + Help: "Dashboard RPC total calls", + }, []string{"rpc", "status"}) + + DashboardRPCDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "dashboard_rpc_duration_seconds", + Help: "Dashboard RPC duration", + Buckets: prometheus.DefBuckets, + }, []string{"rpc"}) + + DashboardCacheHitRate = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "dashboard_cache_hit_rate", + Help: "Dashboard cache hit rate (0-1)", + }) +) + +// 事件采集指标 +var ( + EventTrackTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "event_track_total", + Help: "Event track total", + }, []string{"event_type", "result"}) // result: accepted/rejected + + EventChannelSize = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "event_channel_size", + Help: "Event channel current size", + }) + + EventChannelCapacity = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "event_channel_capacity", + Help: "Event channel capacity", + }) + + EventDroppedTotal = promauto.NewCounter(prometheus.CounterOpts{ + Name: "event_dropped_total", + Help: "Total events dropped (channel full)", + }) + + EventDBInsertTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "event_db_insert_total", + Help: "Event DB insert total", + }, []string{"status"}) // status: success/failed +) + +// 物化视图指标 +var ( + MVRefreshTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "mv_refresh_total", + Help: "Materialized view refresh total", + }, []string{"mv_name", "status"}) + + MVRefreshDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "mv_refresh_duration_seconds", + Help: "MV refresh duration", + Buckets: prometheus.DefBuckets, + }, []string{"mv_name"}) +) + +// Worker 指标 +var ( + WorkerRunningCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "worker_running_count", + Help: "Worker running count (0/1)", + }, []string{"worker_name"}) +) + +// 分区管理指标 +var ( + EventsPartitionCount = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "events_partition_count", + Help: "Current events partition count", + }) +)