topfans/backend/services/statisticService/main.go
2026-06-09 00:37:42 +08:00

213 lines
7.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"database/sql"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"
dubboclient "dubbo.apache.org/dubbo-go/v3/client"
_ "dubbo.apache.org/dubbo-go/v3/imports"
"dubbo.apache.org/dubbo-go/v3/protocol"
_ "dubbo.apache.org/dubbo-go/v3/protocol/triple"
dubboserver "dubbo.apache.org/dubbo-go/v3/server"
"github.com/gin-gonic/gin"
_ "github.com/lib/pq"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
"github.com/topfans/backend/pkg/logger"
pb "github.com/topfans/backend/pkg/proto/statistic"
pbUser "github.com/topfans/backend/pkg/proto/user"
"github.com/topfans/backend/services/statisticService/client"
"github.com/topfans/backend/services/statisticService/config"
"github.com/topfans/backend/services/statisticService/handler"
"github.com/topfans/backend/services/statisticService/model"
"github.com/topfans/backend/services/statisticService/provider"
"github.com/topfans/backend/services/statisticService/repository"
"github.com/topfans/backend/services/statisticService/service"
"github.com/topfans/backend/services/statisticService/sink"
"github.com/topfans/backend/services/statisticService/worker"
)
var port = flag.Int("port", 20009, "Dubbo service port")
func main() {
// 1. Init logger
env := os.Getenv("ENV")
if env == "" {
env = "development"
}
if err := logger.Init(logger.Config{ServiceName: "statistic-service", Environment: env, LogLevel: os.Getenv("LOG_LEVEL")}); err != nil {
panic(fmt.Sprintf("Failed to init logger: %v", err))
}
defer logger.Sync()
logger.Logger.Info("Starting statisticService...")
// 2. Init config
config.InitConfig()
// 3. Open database connection
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
config.DBConfig.Host, config.DBConfig.Port, config.DBConfig.User, config.DBConfig.Password,
config.DBConfig.DBName, config.DBConfig.SSLMode)
db, err := sql.Open("postgres", dsn)
if err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to open DB: %v", err))
}
db.SetMaxOpenConns(50)
if err := db.Ping(); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to ping DB: %v", err))
}
defer db.Close()
logger.Logger.Info("Database connected")
// 4. Open Redis connection
opt, err := redis.ParseURL(config.RedisCfg.URL())
if err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to parse Redis URL: %v", err))
}
rdb := redis.NewClient(opt)
defer rdb.Close()
if err := rdb.Ping(context.Background()).Err(); err != nil {
logger.Logger.Warn(fmt.Sprintf("Failed to ping Redis: %v (continuing, will retry)", err))
} else {
logger.Logger.Info("Redis connected")
}
// 5. 事件 channel + sink
eventCh := make(chan *model.Event, config.ChannelCfg.EventChannelCapacity)
// 6. 启动 partitioner自动管理 events 分区)
partitioner := worker.NewPartitioner(db, config.DBConfig.Schema,
config.PartitionCfg.PreCreateDays, config.PartitionCfg.RetentionDays)
go partitioner.Start(context.Background())
// 7. 构造 repository
eventRepo := repository.NewEventRepository(db, config.DBConfig.Schema)
metricRepo := repository.NewMetricRepository(db, config.DBConfig.Schema)
// 8. 构造 sink + service
cs := sink.NewChannelEventSink(eventCh)
whiteList := service.DefaultEventTypeWhitelist
eventSvc := service.NewEventService(cs, whiteList)
// 9. 启动 workers
flusher := worker.NewEventFlusher(eventCh, eventRepo, metricRepo,
config.ChannelCfg.EventBatchSize, config.ChannelCfg.EventBatchInterval)
go flusher.Start(context.Background())
weeklyW := worker.NewWeeklyUserIncomeUpdater(metricRepo, config.RefreshCfg.WeeklyUserIncome)
go weeklyW.Start(context.Background())
upcomingW := worker.NewUpcomingLevelUpsUpdater(metricRepo, config.RefreshCfg.UpcomingLevelUps)
go upcomingW.Start(context.Background())
// 10. 构造 provider
internalProvider := provider.NewStatisticInternalProvider(eventSvc)
// 看板 service需要 dashboard repo + cache + userService 跨服务客户端)
dashRepo := repository.NewDashboardRepository(db, config.DBConfig.Schema)
cache := service.NewCache(rdb)
// 跨服务 userService 客户端(用于 GetTodayOverview 调 crystal_balance
// 默认 URL: tri://localhost:20000可被 USER_SERVICE_URL 环境变量覆盖)
userServiceURL := os.Getenv("USER_SERVICE_URL")
if userServiceURL == "" {
userServiceURL = "tri://localhost:20000"
}
userCli, err := dubboclient.NewClient(dubboclient.WithClientURL(userServiceURL))
if err != nil {
logger.Logger.Warn(fmt.Sprintf("userService client failed (degrade): %v", err))
}
var userRPC service.UserRPCClient
if userCli != nil {
if userSvc, e := pbUser.NewUserSocialService(userCli); e == nil {
userRPC = client.NewUserServiceClient(userSvc)
} else {
logger.Logger.Warn(fmt.Sprintf("userSocialService stub failed: %v", e))
}
}
dashSvc := service.NewDashboardService(dashRepo, cache, userRPC)
combinedProvider := provider.NewStatisticCombinedProvider(internalProvider, dashSvc)
// 11. Dubbo triple server
srv, err := dubboserver.NewServer(
dubboserver.WithServerProtocol(
protocol.WithPort(*port),
protocol.WithTriple(),
),
)
if err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to create Dubbo server: %v", err))
}
if err := pb.RegisterStatisticServiceHandler(srv, combinedProvider); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to register StatisticService: %v", err))
}
go func() {
logger.Logger.Info(fmt.Sprintf("statisticService Dubbo server starting on port %d", *port))
if err := srv.Serve(); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Dubbo server failed: %v", err))
}
}()
// 12. Healthz HTTP server on port+1000=21009
healthPort := *port + 1000
r := gin.Default()
h := handler.NewHealthz(db, rdb)
h.Register(r)
go func() {
addr := fmt.Sprintf(":%d", healthPort)
logger.Logger.Info(fmt.Sprintf("Healthz server starting on %s", addr))
if err := r.Run(addr); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Healthz server failed: %v", err))
}
}()
logger.Logger.Info(fmt.Sprintf("statisticService started, dubbo=%d healthz=%d", *port, healthPort))
// 13. 启动预热:避免冷启动瞬间 7 RPC 并发请求冲击 DB
go warmupCache(combinedProvider)
// 14. Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
logger.Logger.Info("Shutting down statisticService...")
flusher.Stop()
weeklyW.Stop()
upcomingW.Stop()
partitioner.Stop()
}
// warmupCache 启动时预热 7 个看板 RPC 缓存
// - 用 hardcoded sample star_ids生产可改为查 public.stars 取前 N
// - 每个 RPC 独立 cache key并发触发
// - 异步 + 超时控制,不阻塞启动
func warmupCache(p *provider.StatisticCombinedProvider) {
// 延迟启动(等服务注册完成 + 第一次 MV 刷新就绪)
time.Sleep(15 * time.Second)
// 预热前 5 个 star生产环境应改为查 public.stars
sampleStarIDs := []int64{1, 2, 3, 4, 5}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
logger.Logger.Info("warmup: starting 7 RPCs x 5 stars", zap.Int("stars", len(sampleStarIDs)))
for _, starID := range sampleStarIDs {
starID := starID
go func() { _, _ = p.GetTodayOverview(ctx, &pb.GetTodayOverviewRequest{StarId: starID}) }()
go func() { _, _ = p.Get7DayIncomeCurve(ctx, &pb.Get7DayIncomeCurveRequest{StarId: starID}) }()
go func() { _, _ = p.GetExhibitionIncomeSummary(ctx, &pb.GetExhibitionIncomeSummaryRequest{StarId: starID}) }()
go func() { _, _ = p.GetLikeIncomeByLevel(ctx, &pb.GetLikeIncomeByLevelRequest{StarId: starID}) }()
go func() { _, _ = p.GetTopAssetsByEarning(ctx, &pb.GetTopAssetsByEarningRequest{StarId: starID}) }()
go func() { _, _ = p.GetAssetLevelDistribution(ctx, &pb.GetAssetLevelDistributionRequest{StarId: starID}) }()
go func() { _, _ = p.GetAssetUpgradeProgress(ctx, &pb.GetAssetUpgradeProgressRequest{StarId: starID}) }()
}
}