213 lines
7.8 KiB
Go
213 lines
7.8 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"flag"
|
||
"fmt"
|
||
"os"
|
||
"os/signal"
|
||
"syscall"
|
||
"time"
|
||
|
||
dubboclient "dubbo.apache.org/dubbo-go/v3/client"
|
||
_ "dubbo.apache.org/dubbo-go/v3/imports"
|
||
"dubbo.apache.org/dubbo-go/v3/protocol"
|
||
_ "dubbo.apache.org/dubbo-go/v3/protocol/triple"
|
||
dubboserver "dubbo.apache.org/dubbo-go/v3/server"
|
||
"github.com/gin-gonic/gin"
|
||
_ "github.com/lib/pq"
|
||
"github.com/redis/go-redis/v9"
|
||
"go.uber.org/zap"
|
||
|
||
"github.com/topfans/backend/pkg/logger"
|
||
pb "github.com/topfans/backend/pkg/proto/statistic"
|
||
pbUser "github.com/topfans/backend/pkg/proto/user"
|
||
"github.com/topfans/backend/services/statisticService/client"
|
||
"github.com/topfans/backend/services/statisticService/config"
|
||
"github.com/topfans/backend/services/statisticService/handler"
|
||
"github.com/topfans/backend/services/statisticService/model"
|
||
"github.com/topfans/backend/services/statisticService/provider"
|
||
"github.com/topfans/backend/services/statisticService/repository"
|
||
"github.com/topfans/backend/services/statisticService/service"
|
||
"github.com/topfans/backend/services/statisticService/sink"
|
||
"github.com/topfans/backend/services/statisticService/worker"
|
||
)
|
||
|
||
// port is the listening port of the Dubbo triple server (default 20009).
// The healthz HTTP server in main listens on this value plus 1000.
var port = flag.Int("port", 20009, "Dubbo service port")
|
||
|
||
func main() {
|
||
// 1. Init logger
|
||
env := os.Getenv("ENV")
|
||
if env == "" {
|
||
env = "development"
|
||
}
|
||
if err := logger.Init(logger.Config{ServiceName: "statistic-service", Environment: env, LogLevel: os.Getenv("LOG_LEVEL")}); err != nil {
|
||
panic(fmt.Sprintf("Failed to init logger: %v", err))
|
||
}
|
||
defer logger.Sync()
|
||
logger.Logger.Info("Starting statisticService...")
|
||
|
||
// 2. Init config
|
||
config.InitConfig()
|
||
|
||
// 3. Open database connection
|
||
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
|
||
config.DBConfig.Host, config.DBConfig.Port, config.DBConfig.User, config.DBConfig.Password,
|
||
config.DBConfig.DBName, config.DBConfig.SSLMode)
|
||
db, err := sql.Open("postgres", dsn)
|
||
if err != nil {
|
||
logger.Logger.Fatal(fmt.Sprintf("Failed to open DB: %v", err))
|
||
}
|
||
db.SetMaxOpenConns(50)
|
||
if err := db.Ping(); err != nil {
|
||
logger.Logger.Fatal(fmt.Sprintf("Failed to ping DB: %v", err))
|
||
}
|
||
defer db.Close()
|
||
logger.Logger.Info("Database connected")
|
||
|
||
// 4. Open Redis connection
|
||
opt, err := redis.ParseURL(config.RedisCfg.URL())
|
||
if err != nil {
|
||
logger.Logger.Fatal(fmt.Sprintf("Failed to parse Redis URL: %v", err))
|
||
}
|
||
rdb := redis.NewClient(opt)
|
||
defer rdb.Close()
|
||
if err := rdb.Ping(context.Background()).Err(); err != nil {
|
||
logger.Logger.Warn(fmt.Sprintf("Failed to ping Redis: %v (continuing, will retry)", err))
|
||
} else {
|
||
logger.Logger.Info("Redis connected")
|
||
}
|
||
|
||
// 5. 事件 channel + sink
|
||
eventCh := make(chan *model.Event, config.ChannelCfg.EventChannelCapacity)
|
||
|
||
// 6. 启动 partitioner(自动管理 events 分区)
|
||
partitioner := worker.NewPartitioner(db, config.DBConfig.Schema,
|
||
config.PartitionCfg.PreCreateDays, config.PartitionCfg.RetentionDays)
|
||
go partitioner.Start(context.Background())
|
||
|
||
// 7. 构造 repository
|
||
eventRepo := repository.NewEventRepository(db, config.DBConfig.Schema)
|
||
metricRepo := repository.NewMetricRepository(db, config.DBConfig.Schema)
|
||
|
||
// 8. 构造 sink + service
|
||
cs := sink.NewChannelEventSink(eventCh)
|
||
whiteList := service.DefaultEventTypeWhitelist
|
||
eventSvc := service.NewEventService(cs, whiteList)
|
||
|
||
// 9. 启动 workers
|
||
flusher := worker.NewEventFlusher(eventCh, eventRepo, metricRepo,
|
||
config.ChannelCfg.EventBatchSize, config.ChannelCfg.EventBatchInterval)
|
||
go flusher.Start(context.Background())
|
||
|
||
weeklyW := worker.NewWeeklyUserIncomeUpdater(metricRepo, config.RefreshCfg.WeeklyUserIncome)
|
||
go weeklyW.Start(context.Background())
|
||
|
||
upcomingW := worker.NewUpcomingLevelUpsUpdater(metricRepo, config.RefreshCfg.UpcomingLevelUps)
|
||
go upcomingW.Start(context.Background())
|
||
|
||
// 10. 构造 provider
|
||
internalProvider := provider.NewStatisticInternalProvider(eventSvc)
|
||
|
||
// 看板 service(需要 dashboard repo + cache + userService 跨服务客户端)
|
||
dashRepo := repository.NewDashboardRepository(db, config.DBConfig.Schema)
|
||
cache := service.NewCache(rdb)
|
||
|
||
// 跨服务 userService 客户端(用于 GetTodayOverview 调 crystal_balance)
|
||
// 默认 URL: tri://localhost:20000(可被 USER_SERVICE_URL 环境变量覆盖)
|
||
userServiceURL := os.Getenv("USER_SERVICE_URL")
|
||
if userServiceURL == "" {
|
||
userServiceURL = "tri://localhost:20000"
|
||
}
|
||
userCli, err := dubboclient.NewClient(dubboclient.WithClientURL(userServiceURL))
|
||
if err != nil {
|
||
logger.Logger.Warn(fmt.Sprintf("userService client failed (degrade): %v", err))
|
||
}
|
||
var userRPC service.UserRPCClient
|
||
if userCli != nil {
|
||
if userSvc, e := pbUser.NewUserSocialService(userCli); e == nil {
|
||
userRPC = client.NewUserServiceClient(userSvc)
|
||
} else {
|
||
logger.Logger.Warn(fmt.Sprintf("userSocialService stub failed: %v", e))
|
||
}
|
||
}
|
||
dashSvc := service.NewDashboardService(dashRepo, cache, userRPC)
|
||
|
||
combinedProvider := provider.NewStatisticCombinedProvider(internalProvider, dashSvc)
|
||
|
||
// 11. Dubbo triple server
|
||
srv, err := dubboserver.NewServer(
|
||
dubboserver.WithServerProtocol(
|
||
protocol.WithPort(*port),
|
||
protocol.WithTriple(),
|
||
),
|
||
)
|
||
if err != nil {
|
||
logger.Logger.Fatal(fmt.Sprintf("Failed to create Dubbo server: %v", err))
|
||
}
|
||
if err := pb.RegisterStatisticServiceHandler(srv, combinedProvider); err != nil {
|
||
logger.Logger.Fatal(fmt.Sprintf("Failed to register StatisticService: %v", err))
|
||
}
|
||
go func() {
|
||
logger.Logger.Info(fmt.Sprintf("statisticService Dubbo server starting on port %d", *port))
|
||
if err := srv.Serve(); err != nil {
|
||
logger.Logger.Fatal(fmt.Sprintf("Dubbo server failed: %v", err))
|
||
}
|
||
}()
|
||
|
||
// 12. Healthz HTTP server on port+1000=21009
|
||
healthPort := *port + 1000
|
||
r := gin.Default()
|
||
h := handler.NewHealthz(db, rdb)
|
||
h.Register(r)
|
||
go func() {
|
||
addr := fmt.Sprintf(":%d", healthPort)
|
||
logger.Logger.Info(fmt.Sprintf("Healthz server starting on %s", addr))
|
||
if err := r.Run(addr); err != nil {
|
||
logger.Logger.Fatal(fmt.Sprintf("Healthz server failed: %v", err))
|
||
}
|
||
}()
|
||
|
||
logger.Logger.Info(fmt.Sprintf("statisticService started, dubbo=%d healthz=%d", *port, healthPort))
|
||
|
||
// 13. 启动预热:避免冷启动瞬间 7 RPC 并发请求冲击 DB
|
||
go warmupCache(combinedProvider)
|
||
|
||
// 14. Graceful shutdown
|
||
quit := make(chan os.Signal, 1)
|
||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||
<-quit
|
||
logger.Logger.Info("Shutting down statisticService...")
|
||
flusher.Stop()
|
||
weeklyW.Stop()
|
||
upcomingW.Stop()
|
||
partitioner.Stop()
|
||
}
|
||
|
||
// warmupCache 启动时预热 7 个看板 RPC 缓存
|
||
// - 用 hardcoded sample star_ids(生产可改为查 public.stars 取前 N)
|
||
// - 每个 RPC 独立 cache key,并发触发
|
||
// - 异步 + 超时控制,不阻塞启动
|
||
func warmupCache(p *provider.StatisticCombinedProvider) {
|
||
// 延迟启动(等服务注册完成 + 第一次 MV 刷新就绪)
|
||
time.Sleep(15 * time.Second)
|
||
|
||
// 预热前 5 个 star(生产环境应改为查 public.stars)
|
||
sampleStarIDs := []int64{1, 2, 3, 4, 5}
|
||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||
defer cancel()
|
||
|
||
logger.Logger.Info("warmup: starting 7 RPCs x 5 stars", zap.Int("stars", len(sampleStarIDs)))
|
||
for _, starID := range sampleStarIDs {
|
||
starID := starID
|
||
go func() { _, _ = p.GetTodayOverview(ctx, &pb.GetTodayOverviewRequest{StarId: starID}) }()
|
||
go func() { _, _ = p.Get7DayIncomeCurve(ctx, &pb.Get7DayIncomeCurveRequest{StarId: starID}) }()
|
||
go func() { _, _ = p.GetExhibitionIncomeSummary(ctx, &pb.GetExhibitionIncomeSummaryRequest{StarId: starID}) }()
|
||
go func() { _, _ = p.GetLikeIncomeByLevel(ctx, &pb.GetLikeIncomeByLevelRequest{StarId: starID}) }()
|
||
go func() { _, _ = p.GetTopAssetsByEarning(ctx, &pb.GetTopAssetsByEarningRequest{StarId: starID}) }()
|
||
go func() { _, _ = p.GetAssetLevelDistribution(ctx, &pb.GetAssetLevelDistributionRequest{StarId: starID}) }()
|
||
go func() { _, _ = p.GetAssetUpgradeProgress(ctx, &pb.GetAssetUpgradeProgressRequest{StarId: starID}) }()
|
||
}
|
||
}
|