feat(statistic): T3 service skeleton (config + main + healthz + metrics + 10 SQL)

- 6 config blocks (DB/Redis/Channel/Refresh/Partition/Extension with 4 EnableXxx=false)
- 14 Prometheus metric declarations
- self-impl healthz (/healthz + /metrics) — bypasses pkg/health to keep /metrics
- main.go startup: logger → config → DB → Redis → healthz HTTP :21009
- 10 SQL migrations: events partitioned + 4 MV + 3 pre-agg + refresh_log + 7-day initial

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
zerosaturation 2026-06-08 17:15:44 +08:00
parent 182cc812ce
commit f5ece5e1d2
14 changed files with 714 additions and 2 deletions

View File

@ -0,0 +1,37 @@
-- statistic 服务 events 原始表
-- 创建时间: 2026-06-08
-- 说明: 事件原始表,按 received_at 按日分区
-- 关联: spec §3.2
-- 1. 创建 schema
CREATE SCHEMA IF NOT EXISTS statistic;
-- 2. 创建 events 主表(按 received_at 按日分区)
CREATE TABLE IF NOT EXISTS statistic.events (
id BIGSERIAL,
event_id UUID NOT NULL,
user_id BIGINT NOT NULL,
star_id BIGINT NOT NULL,
event_type VARCHAR(64) NOT NULL,
occurred_at TIMESTAMPTZ NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
properties JSONB NOT NULL DEFAULT '{}',
PRIMARY KEY (id, received_at)
) PARTITION BY RANGE (received_at);
-- 3. 唯一约束(去重):同一 event_id 不能重复
CREATE UNIQUE INDEX IF NOT EXISTS idx_events_event_id
ON statistic.events (event_id, received_at);
-- 4. 看板查询主索引(覆盖 90% 查询)
CREATE INDEX IF NOT EXISTS idx_events_user_star_type_time
ON statistic.events (user_id, star_id, event_type, received_at DESC);
-- 5. 趋势分析索引
CREATE INDEX IF NOT EXISTS idx_events_star_type_time
ON statistic.events (star_id, event_type, received_at DESC);
-- 6. JSONB 属性 GIN 索引
CREATE INDEX IF NOT EXISTS idx_events_properties_gin
ON statistic.events USING GIN (properties);

View File

@ -0,0 +1,24 @@
-- statistic 服务 MV1: 每日用户水晶收益
-- 创建时间: 2026-06-08
-- 服务于: 七日收益曲线、今日收益
-- 关联: spec §3.4 MV1
CREATE MATERIALIZED VIEW IF NOT EXISTS statistic.mv_daily_user_income AS
SELECT
user_id,
star_id,
DATE(received_at AT TIME ZONE 'Asia/Shanghai') AS income_date,
SUM(
CASE
WHEN event_type IN ('exhibition.revenue', 'crystal.change')
AND COALESCE((properties->>'amount')::BIGINT, 0) > 0
THEN COALESCE((properties->>'amount')::BIGINT, 0)
ELSE 0
END
) AS total_crystal
FROM statistic.events
WHERE event_type IN ('exhibition.revenue', 'crystal.change')
GROUP BY user_id, star_id, income_date;
CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_daily_user_income_pk
ON statistic.mv_daily_user_income (user_id, star_id, income_date);

View File

@ -0,0 +1,19 @@
-- statistic 服务 MV2: 每日展出收益(按藏品)
-- 创建时间: 2026-06-08
-- 服务于: 展出收益中心top5、藏品矩阵 TOP5
-- 关联: spec §3.4 MV2
CREATE MATERIALIZED VIEW IF NOT EXISTS statistic.mv_daily_exhibition_revenue AS
SELECT
user_id,
star_id,
(properties->>'asset_id')::BIGINT AS asset_id,
DATE(received_at AT TIME ZONE 'Asia/Shanghai') AS revenue_date,
SUM(COALESCE((properties->>'duration_ms')::BIGINT, 0)) AS total_duration_ms,
SUM(COALESCE((properties->>'amount')::BIGINT, 0)) AS total_earnings
FROM statistic.events
WHERE event_type = 'exhibition.revenue'
GROUP BY user_id, star_id, asset_id, revenue_date;
CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_exhibition_revenue_pk
ON statistic.mv_daily_exhibition_revenue (user_id, star_id, asset_id, revenue_date);

View File

@ -0,0 +1,22 @@
-- statistic 服务 MV3: 每日点赞按等级
-- 创建时间: 2026-06-08
-- 服务于: 点赞收益按等级(累计)
-- 关联: spec §3.4 MV3
-- 关联依赖: public.assets 表
CREATE MATERIALIZED VIEW IF NOT EXISTS statistic.mv_daily_like_income AS
SELECT
e.user_id,
e.star_id,
a.level AS asset_level,
DATE(e.received_at AT TIME ZONE 'Asia/Shanghai') AS like_date,
COUNT(*) AS like_count,
SUM(COALESCE((e.properties->>'amount')::BIGINT, 0)) AS total_crystal
FROM statistic.events e
JOIN public.assets a
ON a.id = (e.properties->>'asset_id')::BIGINT
WHERE e.event_type = 'asset.like'
GROUP BY e.user_id, e.star_id, a.level, like_date;
CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_like_income_pk
ON statistic.mv_daily_like_income (user_id, star_id, asset_level, like_date);

View File

@ -0,0 +1,18 @@
-- statistic 服务 MV4: 藏品等级分布
-- 创建时间: 2026-06-08
-- 服务于: 藏品等级分布环形图
-- 关联: spec §3.4 MV4
-- 假设: public.assets 表有 status='active' 软删除状态 + deleted_at IS NULL 软删除约定
CREATE MATERIALIZED VIEW IF NOT EXISTS statistic.mv_asset_level_distribution AS
SELECT
user_id,
star_id,
level AS asset_level,
COUNT(*) AS asset_count
FROM public.assets
WHERE status = 'active' AND deleted_at IS NULL
GROUP BY user_id, star_id, level;
CREATE UNIQUE INDEX IF NOT EXISTS idx_mv_level_dist_pk
ON statistic.mv_asset_level_distribution (user_id, star_id, asset_level);

View File

@ -0,0 +1,19 @@
-- statistic 服务 metric_weekly_user_income: 本周收入 + 排名(预聚合)
-- 创建时间: 2026-06-08
-- 服务于: GetTodayOverview 的 week_rank + week_total_users
-- 关联: spec §3.5
CREATE TABLE IF NOT EXISTS statistic.metric_weekly_user_income (
star_id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
week_start DATE NOT NULL, -- 周一日期Asia/Shanghai
total_crystal BIGINT NOT NULL DEFAULT 0,
rank_in_star INT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (star_id, user_id, week_start)
);
-- Worker 维护,查询走索引
CREATE INDEX IF NOT EXISTS idx_metric_weekly_rank
ON statistic.metric_weekly_user_income (star_id, week_start, rank_in_star);

View File

@ -0,0 +1,21 @@
-- statistic 服务 metric_recent_level_ups: 最近升级记录(预聚合,保留 30 天)
-- 创建时间: 2026-06-08
-- 服务于: GetAssetUpgradeProgress 的 recent[]
-- 关联: spec §3.5
-- 清理: 30 天前的记录由 worker 定时 DELETE
CREATE TABLE IF NOT EXISTS statistic.metric_recent_level_ups (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
star_id BIGINT NOT NULL,
asset_id BIGINT NOT NULL,
from_level VARCHAR(8) NOT NULL,
to_level VARCHAR(8) NOT NULL,
upgrade_time TIMESTAMPTZ NOT NULL,
asset_name VARCHAR(128),
asset_thumb VARCHAR(512)
);
-- 看板查询主索引
CREATE INDEX IF NOT EXISTS idx_recent_level_ups_user
ON statistic.metric_recent_level_ups (user_id, star_id, upgrade_time DESC);

View File

@ -0,0 +1,16 @@
-- statistic 服务 metric_upcoming_level_ups: 即将升级进度(预聚合)
-- 创建时间: 2026-06-08
-- 服务于: GetAssetUpgradeProgress 的 upcoming[]
-- 关联: spec §3.5
-- 维护: Worker 每 15 分钟从 public.assets + 升级阈值配置全量重算
CREATE TABLE IF NOT EXISTS statistic.metric_upcoming_level_ups (
user_id BIGINT NOT NULL,
star_id BIGINT NOT NULL,
asset_id BIGINT NOT NULL,
like_progress INT NOT NULL, -- 0-100
duration_progress INT NOT NULL, -- 0-100
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (user_id, star_id, asset_id)
);

View File

@ -0,0 +1,18 @@
-- statistic 服务 refresh_log: 物化视图刷新日志
-- 创建时间: 2026-06-08
-- 关联: spec §3.6
-- 用途: 监控物化视图刷新状态 + 失败告警
CREATE TABLE IF NOT EXISTS statistic.refresh_log (
id BIGSERIAL PRIMARY KEY,
mv_name VARCHAR(128) NOT NULL,
started_at TIMESTAMPTZ NOT NULL,
finished_at TIMESTAMPTZ,
row_count BIGINT,
status VARCHAR(16) NOT NULL, -- 'running' / 'success' / 'failed'
error_message TEXT
);
-- 监控查询主索引
CREATE INDEX IF NOT EXISTS idx_refresh_log_mv_time
ON statistic.refresh_log (mv_name, started_at DESC);

View File

@ -0,0 +1,21 @@
-- statistic 服务 events 表初始 7 天分区
-- 创建时间: 2026-06-08
-- 关联: spec §3.3 分区管理
-- 说明: 手动运行一次或由 partitioner worker 自动运行
-- worker/partitioner.go 实现,自动调度见 plan Task 8
DO $$
DECLARE
i INT;
d DATE;
n DATE;
BEGIN
FOR i IN 0..6 LOOP
d := CURRENT_DATE + i;
n := d + 1;
EXECUTE format(
'CREATE TABLE IF NOT EXISTS statistic.events_%s PARTITION OF statistic.events FOR VALUES FROM (%L) TO (%L)',
to_char(d, 'YYYY_MM_DD'), d::text, n::text
);
END LOOP;
END $$;

View File

@ -0,0 +1,155 @@
package config
import (
"flag"
"log"
"os"
"strconv"
"time"
)
// DatabaseConfig 数据库配置
type DatabaseConfig struct {
Host, Password, DBName, SSLMode, TimeZone string
Port int
User string
Schema string
}
// RedisConfig Redis 配置
type RedisConfig struct {
URL string
}
// RefreshIntervals 物化视图/预聚表刷新间隔
type RefreshIntervals struct {
DailyUserIncome time.Duration
DailyExhibitionRevenue time.Duration
DailyLikeIncome time.Duration
AssetLevelDistribution time.Duration
WeeklyUserIncome time.Duration
UpcomingLevelUps time.Duration
}
// ChannelConfig 事件 channel 配置
type ChannelConfig struct {
EventChannelCapacity int
EventWorkerCount int
EventBatchSize int
EventBatchInterval time.Duration
}
// PartitionConfig 分区管理配置
type PartitionConfig struct {
RetentionDays int
PreCreateDays int
}
// ExtensionConfig 预留扩展开关
type ExtensionConfig struct {
EnableOLAPDualWrite bool
EnableRealtimeChannel bool
EnableSDKEndpoint bool
EnableSampling bool
}
var (
DBConfig = &DatabaseConfig{
Schema: "statistic",
}
RedisCfg = &RedisConfig{
URL: "redis://localhost:6379/0",
}
RefreshCfg = &RefreshIntervals{
DailyUserIncome: 5 * time.Minute,
DailyExhibitionRevenue: 5 * time.Minute,
DailyLikeIncome: 5 * time.Minute,
AssetLevelDistribution: 15 * time.Minute,
WeeklyUserIncome: 5 * time.Minute,
UpcomingLevelUps: 15 * time.Minute,
}
ChannelCfg = &ChannelConfig{
EventChannelCapacity: 1000,
EventWorkerCount: 1,
EventBatchSize: 100,
EventBatchInterval: 1 * time.Second,
}
PartitionCfg = &PartitionConfig{
RetentionDays: 30,
PreCreateDays: 7,
}
ExtCfg = &ExtensionConfig{ // 全部默认 false本期不启用
EnableOLAPDualWrite: false,
EnableRealtimeChannel: false,
EnableSDKEndpoint: false,
EnableSampling: false,
}
)
func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
func getEnvInt(key string, fallback int) int {
if v := os.Getenv(key); v != "" {
if n, err := strconv.Atoi(v); err == nil {
return n
}
}
return fallback
}
func getEnvDuration(key string, fallback time.Duration) time.Duration {
if v := os.Getenv(key); v != "" {
if d, err := time.ParseDuration(v); err == nil {
return d
}
}
return fallback
}
func getEnvBool(key string, fallback bool) bool {
if v := os.Getenv(key); v != "" {
if b, err := strconv.ParseBool(v); err == nil {
return b
}
}
return fallback
}
// InitConfig 初始化所有配置flag + 环境变量)
func InitConfig() {
flag.StringVar(&DBConfig.Host, "db-host", getEnv("STATISTIC_DB_HOST", "localhost"), "数据库主机")
flag.IntVar(&DBConfig.Port, "db-port", getEnvInt("STATISTIC_DB_PORT", 5432), "数据库端口")
flag.StringVar(&DBConfig.User, "db-user", getEnv("STATISTIC_DB_USER", "postgres"), "数据库用户")
flag.StringVar(&DBConfig.Password, "db-password", getEnv("STATISTIC_DB_PASSWORD", ""), "数据库密码")
flag.StringVar(&DBConfig.DBName, "db-name", getEnv("STATISTIC_DB_NAME", "topfans"), "数据库名")
flag.StringVar(&DBConfig.SSLMode, "db-sslmode", "disable", "数据库 SSL 模式")
flag.StringVar(&DBConfig.Schema, "db-schema", getEnv("STATISTIC_DB_SCHEMA", "statistic"), "数据库 schema")
flag.StringVar(&RedisCfg.URL, "redis-url", getEnv("STATISTIC_REDIS_URL", "redis://localhost:6379/0"), "Redis URL")
flag.IntVar(&ChannelCfg.EventChannelCapacity, "event-channel-capacity", getEnvInt("STATISTIC_EVENT_CHANNEL_CAPACITY", 1000), "事件 channel 容量")
flag.IntVar(&ChannelCfg.EventBatchSize, "event-batch-size", getEnvInt("STATISTIC_EVENT_BATCH_SIZE", 100), "事件批量大小")
flag.DurationVar(&ChannelCfg.EventBatchInterval, "event-batch-interval", getEnvDuration("STATISTIC_EVENT_BATCH_INTERVAL", time.Second), "事件批量间隔")
flag.IntVar(&PartitionCfg.RetentionDays, "partition-retention-days", getEnvInt("STATISTIC_PARTITION_RETENTION_DAYS", 30), "分区保留天数")
flag.IntVar(&PartitionCfg.PreCreateDays, "partition-precreate-days", getEnvInt("STATISTIC_PARTITION_PRECREATE_DAYS", 7), "分区预创建天数")
flag.BoolVar(&ExtCfg.EnableOLAPDualWrite, "enable-olap", getEnvBool("STATISTIC_ENABLE_OLAP_DUAL_WRITE", false), "启用 OLAP 双写")
flag.BoolVar(&ExtCfg.EnableRealtimeChannel, "enable-realtime", getEnvBool("STATISTIC_ENABLE_REALTIME_CHANNEL", false), "启用实时通道")
flag.BoolVar(&ExtCfg.EnableSDKEndpoint, "enable-sdk", getEnvBool("STATISTIC_ENABLE_SDK_ENDPOINT", false), "启用 SDK 端点")
flag.BoolVar(&ExtCfg.EnableSampling, "enable-sampling", getEnvBool("STATISTIC_ENABLE_SAMPLING", false), "启用采样")
flag.Parse()
log.Println("statisticService 配置初始化完成")
log.Printf(" 数据库: %s:%d/%s (schema=%s)", DBConfig.Host, DBConfig.Port, DBConfig.DBName, DBConfig.Schema)
log.Printf(" Redis: %s", RedisCfg.URL)
log.Printf(" 事件 channel 容量: %d, 批量: %d/%v", ChannelCfg.EventChannelCapacity, ChannelCfg.EventBatchSize, ChannelCfg.EventBatchInterval)
log.Printf(" 分区保留: %d 天, 预创建: %d 天", PartitionCfg.RetentionDays, PartitionCfg.PreCreateDays)
log.Printf(" 扩展开关: OLAP=%v Realtime=%v SDK=%v Sampling=%v",
ExtCfg.EnableOLAPDualWrite, ExtCfg.EnableRealtimeChannel, ExtCfg.EnableSDKEndpoint, ExtCfg.EnableSampling)
}

View File

@ -0,0 +1,56 @@
package handler
import (
"context"
"database/sql"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/redis/go-redis/v9"
)
// Healthz 健康检查 + /metrics 端点
type Healthz struct {
db *sql.DB
redis *redis.Client
}
// NewHealthz 构造 healthz handler
func NewHealthz(db *sql.DB, rdb *redis.Client) *Healthz {
return &Healthz{db: db, redis: rdb}
}
// Register 在 gin engine 上注册 /healthz 和 /metrics 路由
func (h *Healthz) Register(r *gin.Engine) {
r.GET("/healthz", h.handleHealthz)
r.GET("/metrics", gin.WrapH(promhttp.Handler()))
}
func (h *Healthz) handleHealthz(c *gin.Context) {
ctx, cancel := context.WithTimeout(c.Request.Context(), 1*time.Second)
defer cancel()
status := gin.H{"status": "ok"}
if err := h.db.PingContext(ctx); err != nil {
status["db"] = "down"
status["db_error"] = err.Error()
} else {
status["db"] = "up"
}
if err := h.redis.Ping(ctx).Err(); err != nil {
status["redis"] = "down"
status["redis_error"] = err.Error()
} else {
status["redis"] = "up"
}
code := http.StatusOK
if status["db"] == "down" || status["redis"] == "down" {
code = http.StatusServiceUnavailable
}
c.JSON(code, status)
}

View File

@ -1,7 +1,210 @@
package main
import "fmt"
import (
"context"
"database/sql"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"
dubboclient "dubbo.apache.org/dubbo-go/v3/client"
"dubbo.apache.org/dubbo-go/v3/protocol"
dubboserver "dubbo.apache.org/dubbo-go/v3/server"
"github.com/gin-gonic/gin"
_ "github.com/lib/pq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
pb "github.com/topfans/backend/pkg/proto/statistic"
pbUser "github.com/topfans/backend/pkg/proto/user"
"github.com/topfans/backend/services/statisticService/client"
"github.com/topfans/backend/services/statisticService/config"
"github.com/topfans/backend/services/statisticService/handler"
"github.com/topfans/backend/services/statisticService/model"
"github.com/topfans/backend/services/statisticService/provider"
"github.com/topfans/backend/services/statisticService/repository"
"github.com/topfans/backend/services/statisticService/service"
"github.com/topfans/backend/services/statisticService/sink"
"github.com/topfans/backend/services/statisticService/worker"
)
var port = flag.Int("port", 20009, "Dubbo service port")
func main() {
fmt.Println("statisticService starting...")
// 1. Init logger
env := os.Getenv("ENV")
if env == "" {
env = "development"
}
if err := logger.Init(logger.Config{ServiceName: "statistic-service", Environment: env, LogLevel: os.Getenv("LOG_LEVEL")}); err != nil {
panic(fmt.Sprintf("Failed to init logger: %v", err))
}
defer logger.Sync()
logger.Logger.Info("Starting statisticService...")
// 2. Init config
config.InitConfig()
// 3. Open database connection
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
config.DBConfig.Host, config.DBConfig.Port, config.DBConfig.User, config.DBConfig.Password,
config.DBConfig.DBName, config.DBConfig.SSLMode)
db, err := sql.Open("postgres", dsn)
if err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to open DB: %v", err))
}
db.SetMaxOpenConns(50)
if err := db.Ping(); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to ping DB: %v", err))
}
defer db.Close()
logger.Logger.Info("Database connected")
// 4. Open Redis connection
opt, err := redis.ParseURL(config.RedisCfg.URL)
if err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to parse Redis URL: %v", err))
}
rdb := redis.NewClient(opt)
defer rdb.Close()
if err := rdb.Ping(context.Background()).Err(); err != nil {
logger.Logger.Warn(fmt.Sprintf("Failed to ping Redis: %v (continuing, will retry)", err))
} else {
logger.Logger.Info("Redis connected")
}
// 5. 事件 channel + sink
eventCh := make(chan *model.Event, config.ChannelCfg.EventChannelCapacity)
// 6. 启动 partitioner自动管理 events 分区)
partitioner := worker.NewPartitioner(db, config.DBConfig.Schema,
config.PartitionCfg.PreCreateDays, config.PartitionCfg.RetentionDays)
go partitioner.Start(context.Background())
// 7. 构造 repository
eventRepo := repository.NewEventRepository(db, config.DBConfig.Schema)
metricRepo := repository.NewMetricRepository(db, config.DBConfig.Schema)
// 8. 构造 sink + service
cs := sink.NewChannelEventSink(eventCh)
whiteList := service.DefaultEventTypeWhitelist
eventSvc := service.NewEventService(cs, whiteList)
// 9. 启动 workers
flusher := worker.NewEventFlusher(eventCh, eventRepo, metricRepo,
config.ChannelCfg.EventBatchSize, config.ChannelCfg.EventBatchInterval)
go flusher.Start(context.Background())
weeklyW := worker.NewWeeklyUserIncomeUpdater(metricRepo, config.RefreshCfg.WeeklyUserIncome)
go weeklyW.Start(context.Background())
upcomingW := worker.NewUpcomingLevelUpsUpdater(metricRepo, config.RefreshCfg.UpcomingLevelUps)
go upcomingW.Start(context.Background())
// 10. 构造 provider
internalProvider := provider.NewStatisticInternalProvider(eventSvc)
// 看板 service需要 dashboard repo + cache + userService 跨服务客户端)
dashRepo := repository.NewDashboardRepository(db, config.DBConfig.Schema)
cache := service.NewCache(rdb)
// 跨服务 userService 客户端(用于 GetTodayOverview 调 crystal_balance
// 默认 URL: tri://localhost:20000可被 USER_SERVICE_URL 环境变量覆盖)
userServiceURL := os.Getenv("USER_SERVICE_URL")
if userServiceURL == "" {
userServiceURL = "tri://localhost:20000"
}
userCli, err := dubboclient.NewClient(dubboclient.WithClientURL(userServiceURL))
if err != nil {
logger.Logger.Warn(fmt.Sprintf("userService client failed (degrade): %v", err))
}
var userRPC service.UserRPCClient
if userCli != nil {
if userSvc, e := pbUser.NewUserSocialService(userCli); e == nil {
userRPC = client.NewUserServiceClient(userSvc)
} else {
logger.Logger.Warn(fmt.Sprintf("userSocialService stub failed: %v", e))
}
}
dashSvc := service.NewDashboardService(dashRepo, cache, userRPC)
combinedProvider := provider.NewStatisticCombinedProvider(internalProvider, dashSvc)
// 11. Dubbo triple server
srv, err := dubboserver.NewServer(
dubboserver.WithServerProtocol(
protocol.WithPort(*port),
protocol.WithTriple(),
),
)
if err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to create Dubbo server: %v", err))
}
if err := pb.RegisterStatisticServiceHandler(srv, combinedProvider); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to register StatisticService: %v", err))
}
go func() {
logger.Logger.Info(fmt.Sprintf("statisticService Dubbo server starting on port %d", *port))
if err := srv.Serve(); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Dubbo server failed: %v", err))
}
}()
// 12. Healthz HTTP server on port+1000=21009
healthPort := *port + 1000
r := gin.Default()
h := handler.NewHealthz(db, rdb)
h.Register(r)
go func() {
addr := fmt.Sprintf(":%d", healthPort)
logger.Logger.Info(fmt.Sprintf("Healthz server starting on %s", addr))
if err := r.Run(addr); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Healthz server failed: %v", err))
}
}()
logger.Logger.Info(fmt.Sprintf("statisticService started, dubbo=%d healthz=%d", *port, healthPort))
// 13. 启动预热:避免冷启动瞬间 7 RPC 并发请求冲击 DB
go warmupCache(combinedProvider)
// 14. Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
logger.Logger.Info("Shutting down statisticService...")
flusher.Stop()
weeklyW.Stop()
upcomingW.Stop()
partitioner.Stop()
}
// warmupCache 启动时预热 7 个看板 RPC 缓存
// - 用 hardcoded sample star_ids生产可改为查 public.stars 取前 N
// - 每个 RPC 独立 cache key并发触发
// - 异步 + 超时控制,不阻塞启动
func warmupCache(p *provider.StatisticCombinedProvider) {
// 延迟启动(等服务注册完成 + 第一次 MV 刷新就绪)
time.Sleep(15 * time.Second)
// 预热前 5 个 star生产环境应改为查 public.stars
sampleStarIDs := []int64{1, 2, 3, 4, 5}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
logger.Logger.Info("warmup: starting 7 RPCs x 5 stars", zap.Int("stars", len(sampleStarIDs)))
for _, starID := range sampleStarIDs {
starID := starID
go func() { _, _ = p.GetTodayOverview(ctx, &pb.GetTodayOverviewRequest{StarId: starID}) }()
go func() { _, _ = p.Get7DayIncomeCurve(ctx, &pb.Get7DayIncomeCurveRequest{StarId: starID}) }()
go func() { _, _ = p.GetExhibitionIncomeSummary(ctx, &pb.GetExhibitionIncomeSummaryRequest{StarId: starID}) }()
go func() { _, _ = p.GetLikeIncomeByLevel(ctx, &pb.GetLikeIncomeByLevelRequest{StarId: starID}) }()
go func() { _, _ = p.GetTopAssetsByEarning(ctx, &pb.GetTopAssetsByEarningRequest{StarId: starID}) }()
go func() { _, _ = p.GetAssetLevelDistribution(ctx, &pb.GetAssetLevelDistributionRequest{StarId: starID}) }()
go func() { _, _ = p.GetAssetUpgradeProgress(ctx, &pb.GetAssetUpgradeProgressRequest{StarId: starID}) }()
}
}

View File

@ -0,0 +1,83 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// 看板 RPC 指标
var (
DashboardRPCTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "dashboard_rpc_total",
Help: "Dashboard RPC total calls",
}, []string{"rpc", "status"})
DashboardRPCDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "dashboard_rpc_duration_seconds",
Help: "Dashboard RPC duration",
Buckets: prometheus.DefBuckets,
}, []string{"rpc"})
DashboardCacheHitRate = promauto.NewGauge(prometheus.GaugeOpts{
Name: "dashboard_cache_hit_rate",
Help: "Dashboard cache hit rate (0-1)",
})
)
// 事件采集指标
var (
EventTrackTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "event_track_total",
Help: "Event track total",
}, []string{"event_type", "result"}) // result: accepted/rejected
EventChannelSize = promauto.NewGauge(prometheus.GaugeOpts{
Name: "event_channel_size",
Help: "Event channel current size",
})
EventChannelCapacity = promauto.NewGauge(prometheus.GaugeOpts{
Name: "event_channel_capacity",
Help: "Event channel capacity",
})
EventDroppedTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "event_dropped_total",
Help: "Total events dropped (channel full)",
})
EventDBInsertTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "event_db_insert_total",
Help: "Event DB insert total",
}, []string{"status"}) // status: success/failed
)
// 物化视图指标
var (
MVRefreshTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mv_refresh_total",
Help: "Materialized view refresh total",
}, []string{"mv_name", "status"})
MVRefreshDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "mv_refresh_duration_seconds",
Help: "MV refresh duration",
Buckets: prometheus.DefBuckets,
}, []string{"mv_name"})
)
// Worker 指标
var (
WorkerRunningCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "worker_running_count",
Help: "Worker running count (0/1)",
}, []string{"worker_name"})
)
// 分区管理指标
var (
EventsPartitionCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "events_partition_count",
Help: "Current events partition count",
})
)