232 lines
6.2 KiB
Go
232 lines
6.2 KiB
Go
package main
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"syscall"
|
|
|
|
"dubbo.apache.org/dubbo-go/v3/client"
|
|
_ "dubbo.apache.org/dubbo-go/v3/imports"
|
|
"dubbo.apache.org/dubbo-go/v3/protocol"
|
|
"dubbo.apache.org/dubbo-go/v3/server"
|
|
|
|
"github.com/topfans/backend/pkg/database"
|
|
"github.com/topfans/backend/pkg/health"
|
|
"github.com/topfans/backend/pkg/logger"
|
|
"github.com/topfans/backend/pkg/models"
|
|
pb "github.com/topfans/backend/pkg/proto/activity"
|
|
pbUser "github.com/topfans/backend/pkg/proto/user"
|
|
activityClient "github.com/topfans/backend/services/activityService/client"
|
|
"github.com/topfans/backend/services/activityService/provider"
|
|
"github.com/topfans/backend/services/activityService/repository"
|
|
"github.com/topfans/backend/services/activityService/service"
|
|
)
|
|
|
|
var (
|
|
port = flag.Int("port", getEnvInt("PORT", 20004), "Dubbo service port")
|
|
dbHost = flag.String("db-host", getEnv("DB_HOST", "localhost"), "Database host")
|
|
dbPort = flag.Int("db-port", getEnvInt("DB_PORT", 5432), "Database port")
|
|
dbUser = flag.String("db-user", getEnv("DB_USER", "postgres"), "Database user")
|
|
dbPassword = flag.String("db-password", getEnv("DB_PASSWORD", ""), "Database password")
|
|
dbName = flag.String("db-name", getEnv("DB_NAME", "top-fans"), "Database name")
|
|
userServiceURL = flag.String("user-service-url", getEnv("USER_SERVICE_URL", "tri://localhost:20000"), "User service URL")
|
|
healthHandler *health.Handler
|
|
)
|
|
|
|
// getEnv returns the value of the environment variable key. When the variable
// is unset or set to the empty string, fallback is returned instead.
func getEnv(key, fallback string) string {
	value, found := os.LookupEnv(key)
	if !found || value == "" {
		return fallback
	}
	return value
}
|
|
|
|
// getEnvInt returns the environment variable key parsed as a base-10 integer.
//
// fallback is returned when the variable is unset, empty, or not a valid
// integer. An invalid value is reported on standard error instead of being
// dropped silently: this helper supplies flag defaults while package-level
// variables are initialized, which happens before main sets up the logger.
func getEnvInt(key string, fallback int) int {
	raw := os.Getenv(key)
	if raw == "" {
		return fallback
	}

	parsed, err := strconv.Atoi(raw)
	if err != nil {
		// The strconv error already quotes the offending value.
		fmt.Fprintf(os.Stderr, "warning: ignoring invalid integer value for %s, using default %d: %v\n", key, fallback, err)
		return fallback
	}
	return parsed
}
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
// 初始化日志(必须在最前面)
|
|
env := os.Getenv("ENV")
|
|
if env == "" {
|
|
env = "development"
|
|
}
|
|
|
|
if err := logger.Init(logger.Config{
|
|
ServiceName: "activity-service",
|
|
Environment: env,
|
|
LogLevel: os.Getenv("LOG_LEVEL"),
|
|
}); err != nil {
|
|
panic(fmt.Sprintf("Failed to initialize logger: %v", err))
|
|
}
|
|
defer logger.Sync()
|
|
|
|
logger.Sugar.Info("Starting Activity Service...")
|
|
|
|
// 初始化数据库
|
|
if err := initDatabase(); err != nil {
|
|
logger.Sugar.Fatalf("Failed to initialize database: %v", err)
|
|
}
|
|
|
|
// 自动迁移数据库表
|
|
if err := autoMigrate(); err != nil {
|
|
logger.Sugar.Fatalf("Failed to migrate database: %v", err)
|
|
}
|
|
|
|
// 初始化 Repository
|
|
activityRepo := repository.NewActivityRepository()
|
|
|
|
// 初始化 User RPC Client
|
|
userRPCClient, err := createUserRPCClient()
|
|
if err != nil {
|
|
logger.Sugar.Fatalf("Failed to create user RPC client: %v", err)
|
|
}
|
|
|
|
// 初始化 Service
|
|
activityService := service.NewActivityService(activityRepo, userRPCClient)
|
|
|
|
// 初始化 Provider
|
|
activityProvider := provider.NewActivityProvider(activityService)
|
|
|
|
// 初始化 Dubbo-go 服务器
|
|
if err := initDubboService(activityProvider); err != nil {
|
|
logger.Sugar.Fatalf("Failed to initialize Dubbo service: %v", err)
|
|
}
|
|
|
|
// 等待信号(优雅关闭)
|
|
logger.Sugar.Info("Activity service started successfully. Press Ctrl+C to exit.")
|
|
gracefulShutdown()
|
|
}
|
|
|
|
// initDatabase 初始化数据库连接
|
|
func initDatabase() error {
|
|
config := database.Config{
|
|
Host: *dbHost,
|
|
Port: *dbPort,
|
|
User: *dbUser,
|
|
Password: *dbPassword,
|
|
DBName: *dbName,
|
|
SSLMode: "disable",
|
|
TimeZone: "Asia/Shanghai",
|
|
}
|
|
|
|
return database.Init(config)
|
|
}
|
|
|
|
// autoMigrate 自动迁移数据库表
|
|
func autoMigrate() error {
|
|
db := database.GetDB()
|
|
if db == nil {
|
|
return fmt.Errorf("database is not initialized")
|
|
}
|
|
|
|
// 迁移活动相关的表
|
|
tables := []interface{}{
|
|
&models.Activity{},
|
|
&models.ActivityItem{},
|
|
&models.ActivityContribution{},
|
|
&models.ActivityUserStats{},
|
|
}
|
|
|
|
for _, table := range tables {
|
|
if err := db.AutoMigrate(table); err != nil {
|
|
return fmt.Errorf("failed to migrate table: %w", err)
|
|
}
|
|
}
|
|
|
|
logger.Sugar.Info("Database migration completed successfully")
|
|
return nil
|
|
}
|
|
|
|
// createUserRPCClient 创建User Service RPC客户端
|
|
func createUserRPCClient() (activityClient.UserRPCClient, error) {
|
|
// 创建 Dubbo 客户端(直连模式)
|
|
cli, err := client.NewClient(
|
|
client.WithClientURL(*userServiceURL),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create Dubbo client: %w", err)
|
|
}
|
|
|
|
// 创建 UserSocialService 客户端
|
|
_, err = pbUser.NewUserSocialService(cli)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create UserSocialService: %w", err)
|
|
}
|
|
|
|
// 创建我们的 RPC 客户端封装
|
|
userRPCClient, err := activityClient.NewUserRPCClient(cli)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create user RPC client: %w", err)
|
|
}
|
|
|
|
logger.Sugar.Infof("User service client created successfully, url: %s", *userServiceURL)
|
|
|
|
return userRPCClient, nil
|
|
}
|
|
|
|
// initDubboService 初始化 Dubbo 服务
|
|
func initDubboService(activityProvider *provider.ActivityProvider) error {
|
|
// 启动健康检查 HTTP 服务器
|
|
healthPort := *port + 1000 // e.g., 20004 -> 21004
|
|
healthHandler = health.NewHandler("activity-service", healthPort)
|
|
healthHandler.Start()
|
|
|
|
// 创建 Dubbo Server
|
|
srv, err := server.NewServer(
|
|
server.WithServerProtocol(
|
|
protocol.WithPort(*port),
|
|
protocol.WithTriple(),
|
|
),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create Dubbo server: %w", err)
|
|
}
|
|
|
|
// 注册服务
|
|
if err := pb.RegisterActivityServiceHandler(srv, activityProvider); err != nil {
|
|
return fmt.Errorf("failed to register ActivityService handler: %w", err)
|
|
}
|
|
|
|
logger.Sugar.Infof("Dubbo-go provider registered successfully, service: topfans.activity.ActivityService, port: %d", *port)
|
|
|
|
// 在后台启动 Dubbo 服务器
|
|
go func() {
|
|
if err := srv.Serve(); err != nil {
|
|
logger.Sugar.Fatalf("Failed to serve Dubbo: %v", err)
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// gracefulShutdown 优雅关闭
|
|
func gracefulShutdown() {
|
|
quit := make(chan os.Signal, 1)
|
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
|
<-quit
|
|
|
|
logger.Sugar.Info("Shutting down Activity Service...")
|
|
|
|
// 关闭健康检查服务器
|
|
if healthHandler != nil {
|
|
healthHandler.Stop()
|
|
}
|
|
|
|
// 关闭数据库连接
|
|
if err := database.Close(); err != nil {
|
|
logger.Sugar.Errorf("Error closing database: %v", err)
|
|
}
|
|
|
|
logger.Sugar.Info("Activity Service stopped")
|
|
}
|