diff --git a/backend/proto/task.proto b/backend/proto/task.proto new file mode 100644 index 0000000..28891ef --- /dev/null +++ b/backend/proto/task.proto @@ -0,0 +1,241 @@ +syntax = "proto3"; + +package topfans.task; + +option go_package = "github.com/topfans/backend/pkg/proto/task;task"; + +import "proto/common.proto"; +import "google/api/annotations.proto"; + +// ==================== 每日任务 ==================== + +message DailyTaskItem { + string task_key = 1; + int64 star_id = 2; + string name = 3; + string description = 4; + int64 crystal_reward = 5; + int64 exp_reward = 6; + string status = 7; // pending/completed/claimed + bool can_claim = 8; +} + +message GetDailyTasksRequest { + int64 star_id = 1; +} + +message GetDailyTasksResponse { + topfans.common.BaseResponse base = 1; + int64 star_id = 2; + repeated DailyTaskItem tasks = 3; +} + +message ReportEventRequest { + string event_type = 1; // 如 "daily_browse_asset", "daily_login" + int64 star_id = 2; +} + +message ReportEventResponse { + topfans.common.BaseResponse base = 1; + bool success = 2; + string task_key = 3; + bool task_completed = 4; + string message = 5; +} + +message ClaimDailyTaskRequest { + string task_key = 1; + int64 star_id = 2; +} + +message ClaimDailyTaskResponse { + topfans.common.BaseResponse base = 1; + bool success = 2; +} + +message ClaimAllDailyTasksRequest { + int64 star_id = 1; +} + +message ClaimAllDailyTasksResponse { + topfans.common.BaseResponse base = 1; + int32 claimed_count = 2; +} + +// ==================== 引导任务 ==================== + +message OnboardingStage { + int32 stage = 1; + string name = 2; + repeated string required_task_keys = 3; + int64 crystal_reward = 4; + int64 exp_reward = 5; + string status = 6; // pending/completed/in_progress + bool is_current = 7; +} + +message CompleteGuideRequest { + string task_key = 1; +} + +message CompleteGuideResponse { + topfans.common.BaseResponse base = 1; + int64 user_id = 2; + int32 current_stage = 3; + string status = 4; // pending/in_progress/completed + repeated OnboardingStage stages = 5; +} + +message GetOnboardingStatusRequest {} + +message GetOnboardingStatusResponse { + topfans.common.BaseResponse base = 1; + int64 user_id = 2; + int32 current_stage = 3; + string status = 4; + repeated OnboardingStage stages = 5; +} + +message AdvanceStageRequest { + int32 target_stage = 1; +} + +message AdvanceStageResponse { + topfans.common.BaseResponse base = 1; + int32 current_stage = 2; + string status = 3; + repeated OnboardingStage stages = 4; +} + +message ClaimOnboardingRewardRequest { + int32 stage = 1; +} + +message ClaimOnboardingRewardResponse { + topfans.common.BaseResponse base = 1; + bool success = 2; +} + +// ==================== 展示收益 ==================== + +message ExhibitionRevenueItem { + int64 id = 1; + int64 star_id = 2; + int64 exhibition_id = 3; + int64 asset_id = 4; + int64 slot_id = 5; + string slot_type = 6; // own/friend + int64 crystal_amount = 7; + int64 cycle_start_time = 8; + int64 cycle_end_time = 9; + string status = 10; // claimable/claimed/failed + bool can_claim = 11; +} + +message GetExhibitionRevenueRequest { + int64 star_id = 1; + string status = 2; // 可选筛选 + int32 page = 3; + int32 page_size = 4; +} + +message GetExhibitionRevenueResponse { + topfans.common.BaseResponse base = 1; + repeated ExhibitionRevenueItem items = 2; + int64 total = 3; + int32 page = 4; + int32 page_size = 5; +} + +message ClaimExhibitionRevenueRequest { + int64 revenue_id = 1; + int64 star_id = 2; +} + +message ClaimExhibitionRevenueResponse { + topfans.common.BaseResponse base = 1; + bool success = 2; +} + +message ClaimAllExhibitionRevenueRequest { + int64 star_id = 1; +} + +message ClaimAllExhibitionRevenueResponse { + topfans.common.BaseResponse base = 1; + int32 claimed_count = 2; +} + +// ==================== 内部RPC服务 ==================== + +message InitUserTasksRequest { + int64 user_id = 1; + int64 star_id = 2; +} + +message InitUserTasksResponse { + topfans.common.BaseResponse base = 1; + bool success = 2; +} + +message OnExhibitionCompletedRequest { + int64 exhibition_id = 1; + int64 asset_id = 2; + int64 slot_id = 3; + int64 occupier_uid = 4; + int64 occupier_star_id = 5; + int64 slot_owner_uid = 6; + int64 crystal_amount = 7; + int64 start_time = 8; + int64 expire_at = 9; +} + +message OnExhibitionCompletedResponse { + topfans.common.BaseResponse base = 1; + int64 revenue_record_id = 2; +} + +// ==================== Mobile API Service ==================== + +service TaskMobileService { + rpc GetDailyTasks(GetDailyTasksRequest) returns (GetDailyTasksResponse) { + option (google.api.http) = { get: "/api/tasks/daily"; }; + } + rpc ReportEvent(ReportEventRequest) returns (ReportEventResponse) { + option (google.api.http) = { post: "/api/tasks/report-event"; body: "*"; }; + } + rpc ClaimDailyTask(ClaimDailyTaskRequest) returns (ClaimDailyTaskResponse) { + option (google.api.http) = { post: "/api/tasks/daily/claim"; body: "*"; }; + } + rpc ClaimAllDailyTasks(ClaimAllDailyTasksRequest) returns (ClaimAllDailyTasksResponse) { + option (google.api.http) = { post: "/api/tasks/daily/claim-all"; body: "*"; }; + } + rpc CompleteGuide(CompleteGuideRequest) returns (CompleteGuideResponse) { + option (google.api.http) = { post: "/api/tasks/guide/complete"; body: "*"; }; + } + rpc GetOnboardingStatus(GetOnboardingStatusRequest) returns (GetOnboardingStatusResponse) { + option (google.api.http) = { get: "/api/tasks/onboarding/status"; }; + } + rpc AdvanceStage(AdvanceStageRequest) returns (AdvanceStageResponse) { + option (google.api.http) = { post: "/api/tasks/onboarding/advance-stage"; body: "*"; }; + } + rpc ClaimOnboardingReward(ClaimOnboardingRewardRequest) returns (ClaimOnboardingRewardResponse) { + option (google.api.http) = { post: "/api/tasks/onboarding/claim-reward"; body: "*"; }; + } + rpc GetExhibitionRevenue(GetExhibitionRevenueRequest) returns (GetExhibitionRevenueResponse) { + option (google.api.http) = { get: "/api/tasks/exhibition-revenue"; }; + } + rpc ClaimExhibitionRevenue(ClaimExhibitionRevenueRequest) returns (ClaimExhibitionRevenueResponse) { + option (google.api.http) = { post: "/api/tasks/exhibition-revenue/claim"; body: "*"; }; + } + rpc ClaimAllExhibitionRevenue(ClaimAllExhibitionRevenueRequest) returns (ClaimAllExhibitionRevenueResponse) { + option (google.api.http) = { post: "/api/tasks/exhibition-revenue/claim-all"; body: "*"; }; + } +} + +// ==================== Internal RPC Service ==================== + +service TaskInternalService { + rpc InitUserTasks(InitUserTasksRequest) returns (InitUserTasksResponse); + rpc OnExhibitionCompleted(OnExhibitionCompletedRequest) returns (OnExhibitionCompletedResponse); +} diff --git a/backend/proto/user.proto b/backend/proto/user.proto index 8e596bb..f4c3a73 100644 --- a/backend/proto/user.proto +++ b/backend/proto/user.proto @@ -211,6 +211,19 @@ message UpdateAssetsCountResponse { int32 new_count = 2; // 更新后的资产数量 } +// 增加经验值请求(内部RPC调用,用于taskService增加经验) +message AddExperienceRequest { + int64 user_id = 1; // 用户ID + int64 star_id = 2; // 明星ID + int64 delta = 3; // 变化量(正数增加,负数减少) +} + +// 增加经验值响应 +message AddExperienceResponse { + topfans.common.BaseResponse base = 1; + int64 new_experience = 2; // 更新后的经验值 +} + // 获取当前登录用户信息请求 message GetCurrentUserRequest { // 空请求,从Token中获取user_id和star_id @@ -401,6 +414,9 @@ service UserSocialService { // 内部RPC:更新资产数量(仅供assetService调用) rpc UpdateAssetsCount(UpdateAssetsCountRequest) returns (UpdateAssetsCountResponse); + // 内部RPC:增加经验值(仅供taskService调用) + rpc AddExperience(AddExperienceRequest) returns (AddExperienceResponse); + rpc GetCurrentUser(GetCurrentUserRequest) returns (GetCurrentUserResponse) { option (google.api.http) = { get: "/api/v1/auth/me" diff --git a/backend/scripts/compile-proto.sh b/backend/scripts/compile-proto.sh index a638704..3022577 100755 --- a/backend/scripts/compile-proto.sh +++ b/backend/scripts/compile-proto.sh @@ -53,7 +53,7 @@ echo "" # 预先创建目标目录 echo "📁 创建目标目录..." -for name in common user social asset gallery ranking activity; do +for name in common user social asset gallery ranking activity task; do mkdir -p "pkg/proto/$name" done echo "" @@ -146,6 +146,19 @@ protoc --proto_path=proto \ echo "✅ activity.proto 编译完成" echo "" +# 编译 task.proto +echo "📦 编译 task.proto ..." +protoc --proto_path=proto \ + --proto_path=. \ + --go_out=pkg/proto/task \ + --go_opt=paths=source_relative \ + --go-triple_out=pkg/proto/task \ + --go-triple_opt=paths=source_relative \ + task.proto + +echo "✅ task.proto 编译完成" +echo "" + # 清理可能存在的冗余目录和文件 echo "🔄 清理冗余文件..." @@ -156,7 +169,7 @@ if [ -d "github.com" ]; then fi # 删除 proto 目录下的生成文件(如果存在) -for name in common user social asset gallery ranking activity; do +for name in common user social asset gallery ranking activity task; do if [ -f "proto/$name.pb.go" ]; then rm "proto/$name.pb.go" echo " ✅ proto/$name.pb.go 已清理" diff --git a/backend/services/taskService/client/user_rpc_client.go b/backend/services/taskService/client/user_rpc_client.go new file mode 100644 index 0000000..15ed003 --- /dev/null +++ b/backend/services/taskService/client/user_rpc_client.go @@ -0,0 +1,55 @@ +package client + +import ( + "context" + "github.com/topfans/backend/pkg/logger" + pbUser "github.com/topfans/backend/pkg/proto/user" + "go.uber.org/zap" +) + +type UserServiceClient interface { + UpdateCrystalBalance(ctx context.Context, userID, starID int64, delta int64) (int64, error) + AddExperience(ctx context.Context, userID, starID int64, delta int64) (int64, error) +} + +type userServiceClient struct { + client pbUser.UserSocialService +} + +func NewUserServiceClient(client pbUser.UserSocialService) UserServiceClient { + return &userServiceClient{client: client} +} + +func (c *userServiceClient) UpdateCrystalBalance(ctx context.Context, userID, starID int64, delta int64) (int64, error) { + logger.Logger.Debug("Calling UserService.UpdateCrystalBalance", + zap.Int64("user_id", userID), zap.Int64("star_id", starID), zap.Int64("delta", delta)) + resp, err := c.client.UpdateCrystalBalance(ctx, &pbUser.UpdateCrystalBalanceRequest{ + UserId: userID, StarId: starID, Delta: delta, + }) + if err != nil { + logger.Logger.Error("UserService.UpdateCrystalBalance failed", zap.Error(err)) + return 0, err + } + if resp.Base.Code != 0 { + logger.Logger.Warn("UpdateCrystalBalance non-zero code", zap.Int32("code", int32(resp.Base.Code))) + return 0, err + } + return resp.NewBalance, nil +} + +func (c *userServiceClient) AddExperience(ctx context.Context, userID, starID int64, delta int64) (int64, error) { + logger.Logger.Debug("Calling UserService.AddExperience", + zap.Int64("user_id", userID), zap.Int64("star_id", starID), zap.Int64("delta", delta)) + resp, err := c.client.AddExperience(ctx, &pbUser.AddExperienceRequest{ + UserId: userID, StarId: starID, Delta: delta, + }) + if err != nil { + logger.Logger.Error("UserService.AddExperience failed", zap.Error(err)) + return 0, err + } + if resp.Base.Code != 0 { + logger.Logger.Warn("AddExperience non-zero code", zap.Int32("code", int32(resp.Base.Code))) + return 0, err + } + return resp.NewExperience, nil +} diff --git a/backend/services/taskService/config/task_config.go b/backend/services/taskService/config/task_config.go new file mode 100644 index 0000000..010f202 --- /dev/null +++ b/backend/services/taskService/config/task_config.go @@ -0,0 +1,64 @@ +package config + +import ( + "flag" + "log" + "os" + "strconv" +) + +type DatabaseConfig struct { + Host, Password, DBName, SSLMode, TimeZone string + Port int + User string +} + +type ServiceURLs struct { + UserService string +} + +type WorkerConfig struct { + ResetHour, ResetMinute int + RevenueBatchSize int + RevenueMaxRetries int +} + +var ( + DBConfig = &DatabaseConfig{} + ServiceURLsConfig = &ServiceURLs{UserService: "tri://localhost:20000"} + WorkerConfigData = &WorkerConfig{ + ResetHour: 5, ResetMinute: 0, + RevenueBatchSize: 100, RevenueMaxRetries: 3, + } +) + +func getEnv(key, fallback string) string { + if v := os.Getenv(key); v != "" { return v } + return fallback +} + +func getEnvInt(key string, fallback int) int { + if v := os.Getenv(key); v != "" { + if n, err := strconv.Atoi(v); err == nil { return n } + } + return fallback +} + +func InitConfig() { + flag.StringVar(&DBConfig.Host, "db-host", getEnv("DB_HOST", "localhost"), "数据库主机") + flag.IntVar(&DBConfig.Port, "db-port", getEnvInt("DB_PORT", 5432), "数据库端口") + flag.StringVar(&DBConfig.User, "db-user", getEnv("DB_USER", "postgres"), "数据库用户名") + flag.StringVar(&DBConfig.Password, "db-password", getEnv("DB_PASSWORD", ""), "数据库密码") + flag.StringVar(&DBConfig.DBName, "db-name", getEnv("DB_NAME", "topfans"), "数据库名称") + flag.StringVar(&DBConfig.SSLMode, "db-sslmode", "disable", "数据库 SSL 模式") + flag.StringVar(&ServiceURLsConfig.UserService, "user-service-url", getEnv("USER_SERVICE_URL", "tri://localhost:20000"), "User Service RPC 地址") + flag.IntVar(&WorkerConfigData.ResetHour, "reset-hour", getEnvInt("RESET_HOUR", 5), "每日重置小时(Asia/Shanghai)") + flag.IntVar(&WorkerConfigData.ResetMinute, "reset-minute", getEnvInt("RESET_MINUTE", 0), "每日重置分钟") + flag.IntVar(&WorkerConfigData.RevenueBatchSize, "revenue-batch-size", getEnvInt("REVENUE_BATCH_SIZE", 100), "收益自动发放批次大小") + flag.IntVar(&WorkerConfigData.RevenueMaxRetries, "revenue-max-retries", getEnvInt("REVENUE_MAX_RETRIES", 3), "收益自动发放最大重试次数") + flag.Parse() + log.Println("taskService 配置初始化完成") + log.Printf(" 数据库: %s:%d/%s", DBConfig.Host, DBConfig.Port, DBConfig.DBName) + log.Printf(" User Service: %s", ServiceURLsConfig.UserService) + log.Printf(" 重置时间: %02d:%02d Asia/Shanghai", WorkerConfigData.ResetHour, WorkerConfigData.ResetMinute) +} diff --git a/backend/services/taskService/main.go b/backend/services/taskService/main.go new file mode 100644 index 0000000..4467706 --- /dev/null +++ b/backend/services/taskService/main.go @@ -0,0 +1,132 @@ +package main + +import ( + "flag" + "fmt" + "os" + "os/signal" + "syscall" + + dubboclient "dubbo.apache.org/dubbo-go/v3/client" + _ "dubbo.apache.org/dubbo-go/v3/imports" + "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/server" + + "github.com/topfans/backend/pkg/database" + "github.com/topfans/backend/pkg/logger" + pbUser "github.com/topfans/backend/pkg/proto/user" + "github.com/topfans/backend/services/taskService/client" + "github.com/topfans/backend/services/taskService/config" + "github.com/topfans/backend/services/taskService/model" + "github.com/topfans/backend/services/taskService/provider" + "github.com/topfans/backend/services/taskService/repository" + "github.com/topfans/backend/services/taskService/service" + "github.com/topfans/backend/services/taskService/worker" +) + +var port = flag.Int("port", 20005, "Dubbo service port") + +func main() { + flag.Parse() + + // 1. Init logger(必须最前) + env := os.Getenv("ENV") + if env == "" { + env = "development" + } + if err := logger.Init(logger.Config{ServiceName: "task-service", Environment: env, LogLevel: os.Getenv("LOG_LEVEL")}); err != nil { + panic(fmt.Sprintf("Failed to init logger: %v", err)) + } + defer logger.Sync() + logger.Logger.Info("Starting taskService...") + + // 2. Init config(读取 flags/env) + config.InitConfig() + + // 3. Init database + auto-migrate + dbConfig := database.Config{ + Host: config.DBConfig.Host, + Port: config.DBConfig.Port, + User: config.DBConfig.User, + Password: config.DBConfig.Password, + DBName: config.DBConfig.DBName, + SSLMode: config.DBConfig.SSLMode, + TimeZone: "Asia/Shanghai", + } + if err := database.Init(dbConfig); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to init DB: %v", err)) + } + db := database.GetDB() + if err := db.AutoMigrate( + &model.TaskDefinition{}, + &model.UserDailyTaskProgress{}, + &model.UserOnboardingProgress{}, + &model.UserOnboardingStatus{}, + &model.OnboardingStageConfig{}, + &model.ExhibitionRevenueRecord{}, + &model.TaskResetLog{}, + ); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to migrate tables: %v", err)) + } + logger.Logger.Info("Database initialized") + + // 4. Init repositories + dailyRepo := repository.NewDailyTaskRepository(db) + onboardingRepo := repository.NewOnboardingRepository(db) + revenueRepo := repository.NewRevenueRepository(db) + logger.Logger.Info("Repositories initialized") + + // 5. Init userService Dubbo client + RPC client + userCli, err := dubboclient.NewClient( + dubboclient.WithClientURL(config.ServiceURLsConfig.UserService), + ) + if err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to create userService client: %v", err)) + } + userServiceClient, err := pbUser.NewUserSocialService(userCli) + if err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to get userService: %v", err)) + } + userRPCClient := client.NewUserServiceClient(userServiceClient) + logger.Logger.Info("User RPC client initialized") + + // 6. Init services + dailySvc := service.NewDailyTaskService(dailyRepo, userRPCClient) + onboardingSvc := service.NewOnboardingService(onboardingRepo, dailyRepo, userRPCClient) + revenueSvc := service.NewRevenueService(revenueRepo, userRPCClient) + logger.Logger.Info("Services initialized") + + // 7. Init worker(goroutine 中启动) + resetWorker := worker.NewDailyResetWorker(dailyRepo, revenueRepo, userRPCClient) + go resetWorker.Start() + logger.Logger.Info("Reset worker started") + + // 8. Init providers + mobileProvider := provider.NewTaskMobileProvider(dailySvc, onboardingSvc, revenueSvc) + internalProvider := provider.NewTaskInternalProvider(onboardingSvc, revenueSvc) + logger.Logger.Info("Providers initialized") + + // 9. Create Dubbo server on port 20005 + srv, err := server.NewServer( + server.WithServerProtocol( + protocol.WithPort(*port), + protocol.WithTriple(), + ), + ) + if err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to create server: %v", err)) + } + + // 注意:这里需要 TaskMobileService 和 TaskInternalService 的 RegisterHandler + // 这些会在 proto 编译后生成。在 proto 编译之前,这里会报编译错误。 + // Task 15 完成 proto 编译后,需要将下面的注释替换为实际的注册代码。 + + logger.Logger.Info(fmt.Sprintf("taskService configured on port %d, awaiting proto registration", *port)) + + // 10. Graceful shutdown + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + logger.Logger.Info("Shutting down taskService...") + resetWorker.Stop() +} diff --git a/backend/services/taskService/model/task_models.go b/backend/services/taskService/model/task_models.go new file mode 100644 index 0000000..d907b0a --- /dev/null +++ b/backend/services/taskService/model/task_models.go @@ -0,0 +1,110 @@ +package model + +// TaskDefinition 任务定义表 +type TaskDefinition struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + StarID *int64 `gorm:"column:star_id;index"` // NULL=全局默认 + TaskKey string `gorm:"column:task_key;size:50;not null"` + TaskType string `gorm:"column:task_type;size:20;not null"` // daily/onboarding + Name string `gorm:"column:name;size:100;not null"` + Description string `gorm:"column:description;type:text"` + CrystalReward int64 `gorm:"column:crystal_reward;default:0"` + ExpReward int64 `gorm:"column:exp_reward;default:0"` + SortOrder int `gorm:"column:sort_order;default:0"` + IsActive bool `gorm:"column:is_active;default:true"` + CreatedAt int64 `gorm:"column:created_at"` + UpdatedAt int64 `gorm:"column:updated_at"` +} + +func (TaskDefinition) TableName() string { return "task_definitions" } + +// UserDailyTaskProgress 每日任务进度表 +type UserDailyTaskProgress struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + UserID int64 `gorm:"column:user_id;not null;index:idx_daily_user_star_key"` + StarID int64 `gorm:"column:star_id;not null;index:idx_daily_user_star_key"` + TaskKey string `gorm:"column:task_key;size:50;not null;index:idx_daily_user_star_key"` + Status string `gorm:"column:status;size:20;default:pending"` // pending/completed/claimed + CompletedAt *int64 `gorm:"column:completed_at"` + ClaimedAt *int64 `gorm:"column:claimed_at"` + CreatedAt int64 `gorm:"column:created_at"` + UpdatedAt int64 `gorm:"column:updated_at"` +} + +func (UserDailyTaskProgress) TableName() string { return "user_daily_task_progress" } + +// UserOnboardingProgress 引导任务进度表 +type UserOnboardingProgress struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + UserID int64 `gorm:"column:user_id;not null;index:idx_onboard_user_key"` + TaskKey string `gorm:"column:task_key;size:50;not null;index:idx_onboard_user_key"` + Status string `gorm:"column:status;size:20;default:pending"` + CompletedAt *int64 `gorm:"column:completed_at"` + ClaimedAt *int64 `gorm:"column:claimed_at"` + CreatedAt int64 `gorm:"column:created_at"` + UpdatedAt int64 `gorm:"column:updated_at"` +} + +func (UserOnboardingProgress) TableName() string { return "user_onboarding_progress" } + +// UserOnboardingStatus 引导流程状态表(per-user,非 per-star) +type UserOnboardingStatus struct { + UserID int64 `gorm:"primaryKey;column:user_id"` + CurrentStage int `gorm:"column:current_stage;default:0"` + Status string `gorm:"column:status;size:20;default:pending"` + IsFirstLoginBonusClaimed bool `gorm:"column:is_first_login_bonus_claimed;default:false"` // 废弃字段 + HasFriendDisplayBonus bool `gorm:"column:has_friend_display_bonus;default:false"` // 废弃字段 + CompletedAt *int64 `gorm:"column:completed_at"` + ClaimedAt *int64 `gorm:"column:claimed_at"` + CreatedAt int64 `gorm:"column:created_at"` + UpdatedAt int64 `gorm:"column:updated_at"` +} + +func (UserOnboardingStatus) TableName() string { return "user_onboarding_status" } + +// OnboardingStageConfig 引导阶段配置表 +type OnboardingStageConfig struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + Stage int `gorm:"column:stage;not null;uniqueIndex"` + Name string `gorm:"column:name;size:100;not null"` + Description string `gorm:"column:description;type:text"` + RequiredTaskKeys []string `gorm:"column:required_task_keys;text[]"` // PostgreSQL 数组 + CrystalReward int64 `gorm:"column:crystal_reward;default:0"` + ExpReward int64 `gorm:"column:exp_reward;default:0"` + SortOrder int `gorm:"column:sort_order;default:0"` + IsActive bool `gorm:"column:is_active;default:true"` + CreatedAt int64 `gorm:"column:created_at"` + UpdatedAt int64 `gorm:"column:updated_at"` +} + +func (OnboardingStageConfig) TableName() string { return "onboarding_stage_config" } + +// ExhibitionRevenueRecord 展示收益记录表 +type ExhibitionRevenueRecord struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + UserID int64 `gorm:"column:user_id;not null;index:idx_revenue_user_star"` + StarID int64 `gorm:"column:star_id;not null;index:idx_revenue_user_star"` + ExhibitionID int64 `gorm:"column:exhibition_id;not null"` + AssetID int64 `gorm:"column:asset_id;not null"` + SlotID int64 `gorm:"column:slot_id;not null"` + SlotOwnerUID int64 `gorm:"column:slot_owner_uid;not null"` + SlotType string `gorm:"column:slot_type;size:20;not null"` // own/friend + CrystalAmount int64 `gorm:"column:crystal_amount;not null"` + CycleStartTime int64 `gorm:"column:cycle_start_time;not null"` + CycleEndTime int64 `gorm:"column:cycle_end_time;not null"` + Status string `gorm:"column:status;size:20;default:claimable"` // claimable/claimed/failed + ClaimedAt *int64 `gorm:"column:claimed_at"` + CreatedAt int64 `gorm:"column:created_at"` +} + +func (ExhibitionRevenueRecord) TableName() string { return "exhibition_revenue_records" } + +// TaskResetLog 重置日志表 +type TaskResetLog struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + ResetType string `gorm:"column:reset_type;size:20;not null"` // daily + LastResetAt int64 `gorm:"column:last_reset_at;not null"` + CreatedAt int64 `gorm:"column:created_at"` +} + +func (TaskResetLog) TableName() string { return "task_reset_log" } diff --git a/backend/services/taskService/provider/task_internal_provider.go b/backend/services/taskService/provider/task_internal_provider.go new file mode 100644 index 0000000..d194d20 --- /dev/null +++ b/backend/services/taskService/provider/task_internal_provider.go @@ -0,0 +1,80 @@ +package provider + +import ( + "context" + + "github.com/topfans/backend/pkg/logger" + pb "github.com/topfans/backend/pkg/proto/task" + "github.com/topfans/backend/services/taskService/service" + "go.uber.org/zap" +) + +// TaskInternalProvider 实现 TaskInternalService 接口(内部 RPC) +type TaskInternalProvider struct { + onboardingSvc service.OnboardingService + revenueSvc service.RevenueService +} + +func NewTaskInternalProvider( + onboardingSvc service.OnboardingService, + revenueSvc service.RevenueService, +) *TaskInternalProvider { + return &TaskInternalProvider{ + onboardingSvc: onboardingSvc, + revenueSvc: revenueSvc, + } +} + +// InitUserTasks 创建用户的 onboarding status 和每日任务进度 +// 由 userService 或其他服务在用户注册/新增粉丝身份时调用 +func (p *TaskInternalProvider) InitUserTasks(ctx context.Context, req *pb.InitUserTasksRequest) (*pb.InitUserTasksResponse, error) { + logger.Logger.Info("TaskInternalProvider.InitUserTasks called", + zap.Int64("user_id", req.UserId), + zap.Int64("star_id", req.StarId)) + + err := p.onboardingSvc.InitUserTasks(ctx, req.UserId, req.StarId) + if err != nil { + logger.Logger.Error("TaskInternalProvider.InitUserTasks failed", + zap.Int64("user_id", req.UserId), + zap.Int64("star_id", req.StarId), + zap.Error(err)) + return &pb.InitUserTasksResponse{ + Success: false, + }, nil + } + + logger.Logger.Info("TaskInternalProvider.InitUserTasks succeeded", + zap.Int64("user_id", req.UserId), + zap.Int64("star_id", req.StarId)) + + return &pb.InitUserTasksResponse{ + Success: true, + }, nil +} + +// OnExhibitionCompleted 当展位到期完成时由 galleryService 调用 +// 创建展示收益记录(仅在 slot_owner_uid != occupier_uid 时) +func (p *TaskInternalProvider) OnExhibitionCompleted(ctx context.Context, req *pb.OnExhibitionCompletedRequest) (*pb.OnExhibitionCompletedResponse, error) { + logger.Logger.Info("TaskInternalProvider.OnExhibitionCompleted called", + zap.Int64("exhibition_id", req.ExhibitionId), + zap.Int64("asset_id", req.AssetId), + zap.Int64("slot_id", req.SlotId), + zap.Int64("occupier_uid", req.OccupierUid), + zap.Int64("occupier_star_id", req.OccupierStarId), + zap.Int64("slot_owner_uid", req.SlotOwnerUid), + zap.Int64("crystal_amount", req.CrystalAmount)) + + resp, err := p.revenueSvc.OnExhibitionCompleted(ctx, req) + if err != nil { + logger.Logger.Error("TaskInternalProvider.OnExhibitionCompleted failed", + zap.Int64("exhibition_id", req.ExhibitionId), + zap.Error(err)) + return nil, err + } + + logger.Logger.Info("TaskInternalProvider.OnExhibitionCompleted succeeded", + zap.Int64("exhibition_id", req.ExhibitionId), + zap.Int64("revenue_record_id", resp.RevenueRecordId)) + + return resp, nil +} diff --git a/backend/services/taskService/provider/task_mobile_provider.go b/backend/services/taskService/provider/task_mobile_provider.go new file mode 100644 index 0000000..5601de0 --- /dev/null +++ b/backend/services/taskService/provider/task_mobile_provider.go @@ -0,0 +1,242 @@ +package provider + +import ( + "context" + "fmt" + "strconv" + + "github.com/topfans/backend/pkg/logger" + pb "github.com/topfans/backend/pkg/proto/task" + "github.com/topfans/backend/services/taskService/service" + "go.uber.org/zap" +) + +// TaskMobileProvider 实现 TaskMobileService 接口 +type TaskMobileProvider struct { + dailySvc service.DailyTaskService + onboardingSvc service.OnboardingService + revenueSvc service.RevenueService +} + +func NewTaskMobileProvider( + dailySvc service.DailyTaskService, + onboardingSvc service.OnboardingService, + revenueSvc service.RevenueService, +) *TaskMobileProvider { + return &TaskMobileProvider{ + dailySvc: dailySvc, + onboardingSvc: onboardingSvc, + revenueSvc: revenueSvc, + } +} + +// extractUserInfoFromDubboAttachments 从 Dubbo attachments 提取用户信息 +func extractUserInfoFromDubboAttachments(ctx context.Context) (int64, int64, error) { + // Dubbo-go 使用 constant.AttachmentKey 获取 attachments + // 但这里使用通用的 context.Value 方式 + if attachments := ctx.Value("attachments"); attachments != nil { + if attMap, ok := attachments.(map[string]interface{}); ok { + userID, starID := extractUserInfoFromMap(attMap) + if userID > 0 && starID > 0 { + return userID, starID, nil + } + } + } + return 0, 0, fmt.Errorf("failed to extract user info from Dubbo attachments") +} + +// extractUserInfoFromMap 从 map 中提取用户信息 +func extractUserInfoFromMap(attMap map[string]interface{}) (int64, int64) { + var userID, starID int64 + + if uid, ok := attMap["user_id"]; ok { + switch v := uid.(type) { + case int64: + userID = v + case float64: + userID = int64(v) + case string: + if parsed, err := strconv.ParseInt(v, 10, 64); err == nil { + userID = parsed + } + } + } + + if sid, ok := attMap["star_id"]; ok { + switch v := sid.(type) { + case int64: + starID = v + case float64: + starID = int64(v) + case string: + if parsed, err := strconv.ParseInt(v, 10, 64); err == nil { + starID = parsed + } + } + } + + return userID, starID +} + +func (p *TaskMobileProvider) GetDailyTasks(ctx context.Context, req *pb.GetDailyTasksRequest) (*pb.GetDailyTasksResponse, error) { + userID, starID, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("GetDailyTasks: failed to extract user", zap.Error(err)) + return &pb.GetDailyTasksResponse{ + Tasks: []*pb.DailyTaskItem{}, + }, nil + } + + logger.Logger.Debug("GetDailyTasks", + zap.Int64("user_id", userID), + zap.Int64("star_id", starID)) + + return p.dailySvc.GetDailyTasks(ctx, userID, starID) +} + +func (p *TaskMobileProvider) ReportEvent(ctx context.Context, req *pb.ReportEventRequest) (*pb.ReportEventResponse, error) { + userID, _, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("ReportEvent: failed to extract user", zap.Error(err)) + return &pb.ReportEventResponse{ + Success: false, + }, nil + } + + logger.Logger.Debug("ReportEvent", + zap.Int64("user_id", userID), + zap.String("event_type", req.EventType), + zap.Int64("star_id", req.StarId)) + + return p.dailySvc.ReportEvent(ctx, userID, req.StarId, req.EventType) +} + +func (p *TaskMobileProvider) ClaimDailyTask(ctx context.Context, req *pb.ClaimDailyTaskRequest) (*pb.ClaimDailyTaskResponse, error) { + userID, _, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("ClaimDailyTask: failed to extract user", zap.Error(err)) + return &pb.ClaimDailyTaskResponse{Success: false}, nil + } + + logger.Logger.Debug("ClaimDailyTask", + zap.Int64("user_id", userID), + zap.String("task_key", req.TaskKey), + zap.Int64("star_id", req.StarId)) + + return p.dailySvc.ClaimDailyTask(ctx, userID, req.StarId, req.TaskKey) +} + +func (p *TaskMobileProvider) ClaimAllDailyTasks(ctx context.Context, req *pb.ClaimAllDailyTasksRequest) (*pb.ClaimAllDailyTasksResponse, error) { + userID, _, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("ClaimAllDailyTasks: failed to extract user", zap.Error(err)) + return &pb.ClaimAllDailyTasksResponse{ClaimedCount: 0}, nil + } + + logger.Logger.Debug("ClaimAllDailyTasks", + zap.Int64("user_id", userID), + zap.Int64("star_id", req.StarId)) + + return p.dailySvc.ClaimAllDailyTasks(ctx, userID, req.StarId) +} + +func (p *TaskMobileProvider) CompleteGuide(ctx context.Context, req *pb.CompleteGuideRequest) (*pb.CompleteGuideResponse, error) { + userID, _, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("CompleteGuide: failed to extract user", zap.Error(err)) + return &pb.CompleteGuideResponse{}, nil + } + + logger.Logger.Debug("CompleteGuide", + zap.Int64("user_id", userID), + zap.String("task_key", req.TaskKey)) + + return p.onboardingSvc.CompleteGuide(ctx, userID, req.TaskKey) +} + +func (p *TaskMobileProvider) GetOnboardingStatus(ctx context.Context, req *pb.GetOnboardingStatusRequest) (*pb.GetOnboardingStatusResponse, error) { + userID, _, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("GetOnboardingStatus: failed to extract user", zap.Error(err)) + return &pb.GetOnboardingStatusResponse{}, nil + } + + logger.Logger.Debug("GetOnboardingStatus", + zap.Int64("user_id", userID)) + + return p.onboardingSvc.GetOnboardingStatus(ctx, userID) +} + +func (p *TaskMobileProvider) AdvanceStage(ctx context.Context, req *pb.AdvanceStageRequest) (*pb.AdvanceStageResponse, error) { + userID, _, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("AdvanceStage: failed to extract user", zap.Error(err)) + return &pb.AdvanceStageResponse{}, nil + } + + logger.Logger.Debug("AdvanceStage", + zap.Int64("user_id", userID), + zap.Int32("target_stage", req.TargetStage)) + + return p.onboardingSvc.AdvanceStage(ctx, userID, req.TargetStage) +} + +func (p *TaskMobileProvider) ClaimOnboardingReward(ctx context.Context, req *pb.ClaimOnboardingRewardRequest) (*pb.ClaimOnboardingRewardResponse, error) { + userID, _, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("ClaimOnboardingReward: failed to extract user", zap.Error(err)) + return &pb.ClaimOnboardingRewardResponse{Success: false}, nil + } + + logger.Logger.Debug("ClaimOnboardingReward", + zap.Int64("user_id", userID), + zap.Int32("stage", req.Stage)) + + return p.onboardingSvc.ClaimOnboardingReward(ctx, userID, req.Stage) +} + +func (p *TaskMobileProvider) GetExhibitionRevenue(ctx context.Context, req *pb.GetExhibitionRevenueRequest) (*pb.GetExhibitionRevenueResponse, error) { + userID, starID, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("GetExhibitionRevenue: failed to extract user", zap.Error(err)) + return &pb.GetExhibitionRevenueResponse{ + Items: []*pb.ExhibitionRevenueItem{}, + }, nil + } + + logger.Logger.Debug("GetExhibitionRevenue", + zap.Int64("user_id", userID), + zap.Int64("star_id", starID), + zap.String("status", req.Status)) + + return p.revenueSvc.GetExhibitionRevenue(ctx, userID, starID, req.Status, req.Page, req.PageSize) +} + +func (p *TaskMobileProvider) ClaimExhibitionRevenue(ctx context.Context, req *pb.ClaimExhibitionRevenueRequest) (*pb.ClaimExhibitionRevenueResponse, error) { + userID, starID, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("ClaimExhibitionRevenue: failed to extract user", zap.Error(err)) + return &pb.ClaimExhibitionRevenueResponse{Success: false}, nil + } + + logger.Logger.Debug("ClaimExhibitionRevenue", + zap.Int64("user_id", userID), + zap.Int64("star_id", starID), + zap.Int64("revenue_id", req.RevenueId)) + + return p.revenueSvc.ClaimExhibitionRevenue(ctx, userID, starID, req.RevenueId) +} + +func (p *TaskMobileProvider) ClaimAllExhibitionRevenue(ctx context.Context, req *pb.ClaimAllExhibitionRevenueRequest) (*pb.ClaimAllExhibitionRevenueResponse, error) { + userID, starID, err := extractUserInfoFromDubboAttachments(ctx) + if err != nil { + logger.Logger.Error("ClaimAllExhibitionRevenue: failed to extract user", zap.Error(err)) + return &pb.ClaimAllExhibitionRevenueResponse{ClaimedCount: 0}, nil + } + + logger.Logger.Debug("ClaimAllExhibitionRevenue", + zap.Int64("user_id", userID), + zap.Int64("star_id", starID)) + + return p.revenueSvc.ClaimAllExhibitionRevenue(ctx, userID, starID) +} diff --git a/backend/services/taskService/repository/daily_task_repo.go b/backend/services/taskService/repository/daily_task_repo.go new file mode 100644 index 0000000..45ea551 --- /dev/null +++ b/backend/services/taskService/repository/daily_task_repo.go @@ -0,0 +1,189 @@ +package repository + +import ( + "errors" + "time" + + "github.com/topfans/backend/services/taskService/model" + "gorm.io/gorm" +) + +// DailyTaskRepository 每日任务数据访问层接口 +type DailyTaskRepository interface { + // GetUserDailyProgress 获取用户指定每日任务的进度 + GetUserDailyProgress(userID, starID int64, taskKey string) (*model.UserDailyTaskProgress, error) + // GetOrCreateDailyProgress 获取或创建用户每日任务进度 + GetOrCreateDailyProgress(userID, starID int64, taskKey string, def *model.TaskDefinition) (*model.UserDailyTaskProgress, error) + // ListDailyTasksByUser 获取用户的所有每日任务进度 + ListDailyTasksByUser(userID, starID int64) ([]*model.UserDailyTaskProgress, error) + // ListActiveDailyTaskDefinitions 获取所有活跃的每日任务定义(包括star特定和全局默认) + ListActiveDailyTaskDefinitions(starID int64) ([]*model.TaskDefinition, error) + // UpdateDailyProgress 更新每日任务进度 + UpdateDailyProgress(progress *model.UserDailyTaskProgress) error + // ResetAllDailyTasks 重置所有非pending状态的每日任务为pending + ResetAllDailyTasks() (int64, error) + // InitDailyTasksForUser 为用户初始化该star的所有每日任务进度 + InitDailyTasksForUser(userID, starID int64) error + // AcquireAdvisoryLock 获取PostgreSQL advisory lock + AcquireAdvisoryLock(lockID int64) bool + // ReleaseAdvisoryLock 释放PostgreSQL advisory lock + ReleaseAdvisoryLock(lockID int64) +} + +// dailyTaskRepository Repository实现 +type dailyTaskRepository struct { + db *gorm.DB +} + +// NewDailyTaskRepository 创建Repository实例 +func NewDailyTaskRepository(db *gorm.DB) DailyTaskRepository { + return &dailyTaskRepository{db: db} +} + +// GetUserDailyProgress 获取用户指定每日任务的进度 +func (r *dailyTaskRepository) GetUserDailyProgress(userID, starID int64, taskKey string) (*model.UserDailyTaskProgress, error) { + var progress model.UserDailyTaskProgress + err := r.db.Where("user_id = ? AND star_id = ? AND task_key = ?", userID, starID, taskKey). + First(&progress).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + return nil, err + } + return &progress, nil +} + +// GetOrCreateDailyProgress 获取或创建用户每日任务进度 +func (r *dailyTaskRepository) GetOrCreateDailyProgress(userID, starID int64, taskKey string, def *model.TaskDefinition) (*model.UserDailyTaskProgress, error) { + progress, err := r.GetUserDailyProgress(userID, starID, taskKey) + if err != nil { + return nil, err + } + if progress != nil { + return progress, nil + } + + // 创建新的进度记录 + now := time.Now().UnixMilli() + progress = &model.UserDailyTaskProgress{ + UserID: userID, + StarID: starID, + TaskKey: taskKey, + Status: "pending", + CreatedAt: now, + UpdatedAt: now, + } + + if def != nil { + progress.CreatedAt = now + progress.UpdatedAt = now + } + + err = r.db.Create(progress).Error + if err != nil { + return nil, err + } + return progress, nil +} + +// ListDailyTasksByUser 获取用户的所有每日任务进度 +func (r *dailyTaskRepository) ListDailyTasksByUser(userID, starID int64) ([]*model.UserDailyTaskProgress, error) { + var progressList []*model.UserDailyTaskProgress + err := r.db.Where("user_id = ? AND star_id = ?", userID, starID). + Order("id ASC"). + Find(&progressList).Error + return progressList, err +} + +// ListActiveDailyTaskDefinitions 获取所有活跃的每日任务定义(包括star特定和全局默认) +func (r *dailyTaskRepository) ListActiveDailyTaskDefinitions(starID int64) ([]*model.TaskDefinition, error) { + var definitions []*model.TaskDefinition + err := r.db.Where("is_active = true AND (star_id = ? OR star_id IS NULL)", starID). + Where("task_type = ?", "daily"). + Order("sort_order ASC, id ASC"). + Find(&definitions).Error + return definitions, err +} + +// UpdateDailyProgress 更新每日任务进度 +func (r *dailyTaskRepository) UpdateDailyProgress(progress *model.UserDailyTaskProgress) error { + progress.UpdatedAt = time.Now().UnixMilli() + return r.db.Save(progress).Error +} + +// ResetAllDailyTasks 重置所有非pending状态的每日任务为pending +func (r *dailyTaskRepository) ResetAllDailyTasks() (int64, error) { + now := time.Now().UnixMilli() + result := r.db.Model(&model.UserDailyTaskProgress{}). + Where("status != ?", "pending"). + Updates(map[string]interface{}{ + "status": "pending", + "completed_at": nil, + "claimed_at": nil, + "updated_at": now, + }) + return result.RowsAffected, result.Error +} + +// InitDailyTasksForUser 为用户初始化该star的所有每日任务进度 +func (r *dailyTaskRepository) InitDailyTasksForUser(userID, starID int64) error { + // 获取所有活跃的每日任务定义 + definitions, err := r.ListActiveDailyTaskDefinitions(starID) + if err != nil { + return err + } + + if len(definitions) == 0 { + return nil + } + + now := time.Now().UnixMilli() + + return r.db.Transaction(func(tx *gorm.DB) error { + for _, def := range definitions { + // 检查是否已存在该任务的进度 + var count int64 + err := tx.Model(&model.UserDailyTaskProgress{}). + Where("user_id = ? AND star_id = ? AND task_key = ?", userID, starID, def.TaskKey). + Count(&count).Error + if err != nil { + return err + } + + // 如果已存在,跳过 + if count > 0 { + continue + } + + // 创建新的进度记录 + progress := &model.UserDailyTaskProgress{ + UserID: userID, + StarID: starID, + TaskKey: def.TaskKey, + Status: "pending", + CreatedAt: now, + UpdatedAt: now, + } + if err := tx.Create(progress).Error; err != nil { + return err + } + } + return nil + }) +} + +// AcquireAdvisoryLock 获取PostgreSQL advisory lock +func (r *dailyTaskRepository) AcquireAdvisoryLock(lockID int64) bool { + var result bool + err := r.db.Raw("SELECT pg_try_advisory_lock(?)", lockID).Scan(&result).Error + if err != nil { + return false + } + return result +} + +// ReleaseAdvisoryLock 释放PostgreSQL advisory lock +func (r *dailyTaskRepository) ReleaseAdvisoryLock(lockID int64) { + r.db.Exec("SELECT pg_advisory_unlock(?)", lockID) +} diff --git a/backend/services/taskService/repository/onboarding_repo.go b/backend/services/taskService/repository/onboarding_repo.go new file mode 100644 index 0000000..c08cfd9 --- /dev/null +++ b/backend/services/taskService/repository/onboarding_repo.go @@ -0,0 +1,117 @@ +package repository + +import ( + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/taskService/model" + "go.uber.org/zap" + "gorm.io/gorm" +) + +type OnboardingRepository interface { + GetOnboardingStatus(userID int64) (*model.UserOnboardingStatus, error) + GetOrCreateOnboardingStatus(userID int64) (*model.UserOnboardingStatus, error) + UpdateOnboardingStatus(status *model.UserOnboardingStatus) error + GetUserOnboardingProgress(userID int64, taskKey string) (*model.UserOnboardingProgress, error) + GetOrCreateOnboardingProgress(userID int64, taskKey string) (*model.UserOnboardingProgress, error) + ListActiveStageConfigs() ([]*model.OnboardingStageConfig, error) + ListUserOnboardingProgressByUser(userID int64) ([]*model.UserOnboardingProgress, error) + GetStageConfig(stage int) (*model.OnboardingStageConfig, error) +} + +type onboardingRepository struct { + db *gorm.DB +} + +func NewOnboardingRepository(db *gorm.DB) OnboardingRepository { + return &onboardingRepository{db: db} +} + +func (r *onboardingRepository) GetOnboardingStatus(userID int64) (*model.UserOnboardingStatus, error) { + var status model.UserOnboardingStatus + err := r.db.Where("user_id = ?", userID).First(&status).Error + if err != nil { + return nil, err + } + return &status, nil +} + +func (r *onboardingRepository) GetOrCreateOnboardingStatus(userID int64) (*model.UserOnboardingStatus, error) { + var status model.UserOnboardingStatus + now := gorm.NowFunc().Unix() + err := r.db.Where("user_id = ?", userID).FirstOrCreate(&status, &model.UserOnboardingStatus{ + UserID: userID, + CurrentStage: 0, + Status: "pending", + CreatedAt: now, + UpdatedAt: now, + }).Error + if err != nil { + logger.Logger.Error("Failed to GetOrCreateOnboardingStatus", zap.Int64("user_id", userID), zap.Error(err)) + return nil, err + } + return &status, nil +} + +func (r *onboardingRepository) UpdateOnboardingStatus(status *model.UserOnboardingStatus) error { + status.UpdatedAt = gorm.NowFunc().Unix() + if err := r.db.Save(status).Error; err != nil { + logger.Logger.Error("Failed to UpdateOnboardingStatus", zap.Int64("user_id", status.UserID), zap.Error(err)) + return err + } + return nil +} + +func (r *onboardingRepository) GetUserOnboardingProgress(userID int64, taskKey string) (*model.UserOnboardingProgress, error) { + var progress model.UserOnboardingProgress + err := r.db.Where("user_id = ? AND task_key = ?", userID, taskKey).First(&progress).Error + if err != nil { + return nil, err + } + return &progress, nil +} + +func (r *onboardingRepository) GetOrCreateOnboardingProgress(userID int64, taskKey string) (*model.UserOnboardingProgress, error) { + var progress model.UserOnboardingProgress + now := gorm.NowFunc().Unix() + err := r.db.Where("user_id = ? AND task_key = ?", userID, taskKey).FirstOrCreate(&progress, &model.UserOnboardingProgress{ + UserID: userID, + TaskKey: taskKey, + Status: "pending", + CreatedAt: now, + UpdatedAt: now, + }).Error + if err != nil { + logger.Logger.Error("Failed to GetOrCreateOnboardingProgress", zap.Int64("user_id", userID), zap.String("task_key", taskKey), zap.Error(err)) + return nil, err + } + return &progress, nil +} + +func (r *onboardingRepository) ListActiveStageConfigs() ([]*model.OnboardingStageConfig, error) { + var configs []*model.OnboardingStageConfig + err := r.db.Where("is_active = ?", true).Order("sort_order ASC").Find(&configs).Error + if err != nil { + logger.Logger.Error("Failed to ListActiveStageConfigs", zap.Error(err)) + return nil, err + } + return configs, nil +} + +func (r *onboardingRepository) ListUserOnboardingProgressByUser(userID int64) ([]*model.UserOnboardingProgress, error) { + var progressList []*model.UserOnboardingProgress + err := r.db.Where("user_id = ?", userID).Find(&progressList).Error + if err != nil { + logger.Logger.Error("Failed to ListUserOnboardingProgressByUser", zap.Int64("user_id", userID), zap.Error(err)) + return nil, err + } + return progressList, nil +} + +func (r *onboardingRepository) GetStageConfig(stage int) (*model.OnboardingStageConfig, error) { + var config model.OnboardingStageConfig + err := r.db.Where("stage = ? AND is_active = ?", stage, true).First(&config).Error + if err != nil { + return nil, err + } + return &config, nil +} diff --git a/backend/services/taskService/repository/revenue_repo.go b/backend/services/taskService/repository/revenue_repo.go new file mode 100644 index 0000000..d58dad1 --- /dev/null +++ b/backend/services/taskService/repository/revenue_repo.go @@ -0,0 +1,102 @@ +package repository + +import ( + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/taskService/model" + "go.uber.org/zap" + "gorm.io/gorm" +) + +type RevenueRepository interface { + CreateRevenueRecord(record *model.ExhibitionRevenueRecord) (*model.ExhibitionRevenueRecord, error) + GetRevenueRecord(id int64) (*model.ExhibitionRevenueRecord, error) + ListRevenueByUser(userID, starID int64, status string, page, pageSize int) ([]*model.ExhibitionRevenueRecord, int64, error) + ClaimRevenueRecord(id int64, userID int64) (bool, error) + UpdateRevenueStatus(id int64, status string) error + ListClaimableRevenue(limit int) ([]*model.ExhibitionRevenueRecord, error) +} + +type revenueRepository struct { + db *gorm.DB +} + +func NewRevenueRepository(db *gorm.DB) RevenueRepository { + return &revenueRepository{db: db} +} + +func (r *revenueRepository) CreateRevenueRecord(record *model.ExhibitionRevenueRecord) (*model.ExhibitionRevenueRecord, error) { + record.CreatedAt = gorm.NowFunc().Unix() + if err := r.db.Create(record).Error; err != nil { + logger.Logger.Error("Failed to CreateRevenueRecord", zap.Int64("user_id", record.UserID), zap.Error(err)) + return nil, err + } + return record, nil +} + +func (r *revenueRepository) GetRevenueRecord(id int64) (*model.ExhibitionRevenueRecord, error) { + var record model.ExhibitionRevenueRecord + if err := r.db.First(&record, id).Error; err != nil { + return nil, err + } + return &record, nil +} + +func (r *revenueRepository) ListRevenueByUser(userID, starID int64, status string, page, pageSize int) ([]*model.ExhibitionRevenueRecord, int64, error) { + var records []*model.ExhibitionRevenueRecord + var total int64 + + query := r.db.Model(&model.ExhibitionRevenueRecord{}).Where("user_id = ? AND star_id = ?", userID, starID) + if status != "" { + query = query.Where("status = ?", status) + } + + if err := query.Count(&total).Error; err != nil { + logger.Logger.Error("Failed to count revenue records", zap.Int64("user_id", userID), zap.Int64("star_id", starID), zap.Error(err)) + return nil, 0, err + } + + offset := (page - 1) * pageSize + if err := query.Order("created_at DESC").Offset(offset).Limit(pageSize).Find(&records).Error; err != nil { + logger.Logger.Error("Failed to ListRevenueByUser", zap.Int64("user_id", userID), zap.Int64("star_id", starID), zap.Error(err)) + return nil, 0, err + } + + return records, total, nil +} + +func (r *revenueRepository) ClaimRevenueRecord(id int64, userID int64) (bool, error) { + // 乐观锁:只有在 status='claimable' 时才更新 + now := gorm.NowFunc().Unix() + result := r.db.Model(&model.ExhibitionRevenueRecord{}). + Where("id = ? AND user_id = ? AND status = ?", id, userID, "claimable"). + Updates(map[string]interface{}{ + "status": "claimed", + "claimed_at": now, + }) + + if result.Error != nil { + logger.Logger.Error("Failed to ClaimRevenueRecord", zap.Int64("id", id), zap.Int64("user_id", userID), zap.Error(result.Error)) + return false, result.Error + } + + return result.RowsAffected > 0, nil +} + +func (r *revenueRepository) UpdateRevenueStatus(id int64, status string) error { + err := r.db.Model(&model.ExhibitionRevenueRecord{}).Where("id = ?", id).Update("status", status).Error + if err != nil { + logger.Logger.Error("Failed to UpdateRevenueStatus", zap.Int64("id", id), zap.String("status", status), zap.Error(err)) + return err + } + return nil +} + +func (r *revenueRepository) ListClaimableRevenue(limit int) ([]*model.ExhibitionRevenueRecord, error) { + var records []*model.ExhibitionRevenueRecord + err := r.db.Where("status = ?", "claimable").Limit(limit).Find(&records).Error + if err != nil { + logger.Logger.Error("Failed to ListClaimableRevenue", zap.Error(err)) + return nil, err + } + return records, nil +} diff --git a/backend/services/taskService/worker/daily_reset_worker.go b/backend/services/taskService/worker/daily_reset_worker.go new file mode 100644 index 0000000..0b81255 --- /dev/null +++ b/backend/services/taskService/worker/daily_reset_worker.go @@ -0,0 +1,149 @@ +package worker + +import ( + "context" + "fmt" + "strconv" + "sync" + "time" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/taskService/client" + "github.com/topfans/backend/services/taskService/config" + "github.com/topfans/backend/services/taskService/repository" + "go.uber.org/zap" +) + +type DailyResetWorker struct { + dailyRepo repository.DailyTaskRepository + revenueRepo repository.RevenueRepository + userClient client.UserServiceClient + stopCh chan struct{} + wg sync.WaitGroup +} + +func NewDailyResetWorker( + dailyRepo repository.DailyTaskRepository, + revenueRepo repository.RevenueRepository, + userClient client.UserServiceClient, +) *DailyResetWorker { + return &DailyResetWorker{ + dailyRepo: dailyRepo, + revenueRepo: revenueRepo, + userClient: userClient, + stopCh: make(chan struct{}), + } +} + +func (w *DailyResetWorker) Start() { + w.wg.Add(1) + go w.runLoop() + logger.Logger.Info("DailyResetWorker started") +} + +func (w *DailyResetWorker) Stop() { + close(w.stopCh) + w.wg.Wait() + logger.Logger.Info("DailyResetWorker stopped") +} + +func (w *DailyResetWorker) runLoop() { + defer w.wg.Done() + + for { + // 计算下一个 05:00 Asia/Shanghai + now := time.Now() + loc, _ := time.LoadLocation("Asia/Shanghai") + next := time.Date(now.Year(), now.Month(), now.Day(), config.WorkerConfigData.ResetHour, config.WorkerConfigData.ResetMinute, 0, 0, loc) + if next.Before(now) { + next = next.Add(24 * time.Hour) + } + waitDuration := next.Sub(now) + + logger.Logger.Info(fmt.Sprintf("DailyResetWorker: next reset at %s (in %v)", next.Format(time.RFC3339), waitDuration)) + + select { + case <-time.After(waitDuration): + w.doResetAndAutoClaim() + case <-w.stopCh: + logger.Logger.Info("DailyResetWorker: received stop signal, exiting") + return + } + } +} + +func (w *DailyResetWorker) doResetAndAutoClaim() { + // 使用 Advisory Lock 防止多实例重复执行 + lockKey := time.Now().Format("20060102") + lockID, err := strconv.ParseInt(lockKey, 10, 64) + if err != nil { + logger.Logger.Error("DailyResetWorker: failed to parse lock key", zap.String("lock_key", lockKey), zap.Error(err)) + return + } + + acquired := w.dailyRepo.AcquireAdvisoryLock(lockID) + if !acquired { + logger.Logger.Info("DailyResetWorker: another instance is running daily reset, skipping") + return + } + defer func() { + if err := w.dailyRepo.ReleaseAdvisoryLock(lockID); err != nil { + logger.Logger.Error("DailyResetWorker: failed to release advisory lock", zap.Error(err)) + } + }() + + // 1. 重置每日任务 + resetCount, err := w.dailyRepo.ResetAllDailyTasks() + if err != nil { + logger.Logger.Error("DailyResetWorker: failed to reset daily tasks", zap.Error(err)) + } else { + logger.Logger.Info(fmt.Sprintf("DailyResetWorker: daily tasks reset: %d records updated", resetCount)) + } + + // 2. 自动发放展示收益 + w.autoClaimExhibitionRevenue() +} + +func (w *DailyResetWorker) autoClaimExhibitionRevenue() { + batchSize := config.WorkerConfigData.RevenueBatchSize + maxRetries := config.WorkerConfigData.RevenueMaxRetries + totalClaimed := 0 + + for { + records, err := w.revenueRepo.ListClaimableRevenue(batchSize) + if err != nil { + logger.Logger.Error("DailyResetWorker: failed to list claimable revenue", zap.Error(err)) + break + } + if len(records) == 0 { + break + } + + for _, record := range records { + var lastErr error + for attempt := 0; attempt < maxRetries; attempt++ { + _, err := w.userClient.UpdateCrystalBalance(context.Background(), record.UserID, record.StarID, record.CrystalAmount) + if err == nil { + if err := w.revenueRepo.UpdateRevenueStatus(record.ID, "claimed"); err != nil { + logger.Logger.Error("DailyResetWorker: failed to update status to claimed", + zap.Int64("record_id", record.ID), zap.Error(err)) + } + totalClaimed++ + break + } + lastErr = err + time.Sleep(100 * time.Millisecond) + } + if lastErr != nil { + if err := w.revenueRepo.UpdateRevenueStatus(record.ID, "failed"); err != nil { + logger.Logger.Error("DailyResetWorker: failed to update status to failed", + zap.Int64("record_id", record.ID), zap.Error(err)) + } + logger.Logger.Error("DailyResetWorker: failed to auto-claim revenue after retries", + zap.Int64("record_id", record.ID), zap.Error(lastErr)) + } + } + } + + logger.Logger.Info(fmt.Sprintf("DailyResetWorker: auto-claim completed: %d records claimed", totalClaimed)) +} diff --git a/docs/superpowers/plans/2026-04-14-task-service-implementation.md b/docs/superpowers/plans/2026-04-14-task-service-implementation.md new file mode 100644 index 0000000..4fc8002 --- /dev/null +++ b/docs/superpowers/plans/2026-04-14-task-service-implementation.md @@ -0,0 +1,1178 @@ +# taskService 实现计划 + +> **面向智能体工程师:** 必需子技能:使用 `superpowers:subagent-driven-development`(推荐)或 `superpowers:executing-plans` 来逐任务实现本计划。步骤使用复选框(`- [ ]`)语法进行跟踪。 + +**目标:** 实现 taskService(Go Dubbo-go)后端,包含移动端 API 和移动端前端页面(每日任务、引导流程、展示收益)。 + +**架构:** +- taskService:Go Dubbo-go 服务,端口 20005,暴露 HTTP/Triple 移动端 API 和内部 RPC(TaskInternalService) +- 移动端前端:Vue/uni-app 页面,通过 gateway 调用 taskService HTTP API +- 共享 PostgreSQL 数据库存储任务数据 +- taskService 调用 userService RPC 发放水晶/经验奖励 + +**技术栈:** Go (dubbo-go v3)、GORM、PostgreSQL、Vue/uni-app + +--- + +## 文件结构 + +### 后端 (taskService) + +``` +backend/ +├── proto/ +│ └── task.proto # 新增:TaskMobileService + TaskInternalService 定义 +├── pkg/ +│ └── proto/ +│ └── task/ +│ ├── task.pb.go # 新增:proto 编译生成 +│ └── task.triple.go # 新增:proto 编译生成 +├── services/ +│ └── taskService/ # 新增 +│ ├── main.go # 新增:服务入口 +│ ├── config/ +│ │ └── task_config.go # 新增:配置 +│ ├── model/ +│ │ └── task_models.go # 新增:所有任务表对应的 GORM 模型 +│ ├── repository/ +│ │ ├── daily_task_repo.go # 新增 +│ │ ├── onboarding_repo.go # 新增 +│ │ └── revenue_repo.go # 新增 +│ ├── service/ +│ │ ├── daily_task_service.go # 新增 +│ │ ├── onboarding_service.go # 新增 +│ │ └── revenue_service.go # 新增 +│ ├── provider/ +│ │ ├── task_mobile_provider.go # 新增:移动端 API(HTTP/Triple) +│ │ └── task_internal_provider.go # 新增:内部 RPC(TaskInternalService) +│ ├── worker/ +│ │ └── daily_reset_worker.go # 新增:每日 05:00 重置 + 自动发放 +│ └── client/ +│ └── user_rpc_client.go # 新增:调用 userService 的 UpdateCrystalBalance、AddExperience +``` + +### 前端(移动端) + +``` +frontend/ +├── pages/ +│ └── tasks/ # 新增:任务页面目录 +│ ├── daily-tasks.vue # 新增:每日任务页面 +│ ├── guide.vue # 新增:引导任务页面 +│ └── revenue.vue # 新增:展示收益页面 +├── utils/ +│ └── task-api.js # 新增:taskService API 调用封装 +└── pages.json # 修改:新增页面路由 +``` + +### 数据库迁移 + +``` +backend/ +└── scripts/ + └── v001_init_task_tables.sql # 新增:7 张任务相关表的 DDL +``` + +--- + +## 关键前提:必须先在 user.proto 中添加 AddExperience + +设计文档(设计文档第 101 行)要求在 `UserSocialService` 上添加 `AddExperience` RPC,但当前 `backend/proto/user.proto` 中没有该 RPC。**taskService 编译依赖此 RPC,必须先完成。** + +**修改:`backend/proto/user.proto`** — 在 `UserSocialService` 中添加: + +```protobuf +// 更新经验值请求(内部RPC调用,用于taskService发放经验奖励) +message AddExperienceRequest { + int64 user_id = 1; + int64 star_id = 2; + int64 delta = 3; +} + +message AddExperienceResponse { + topfans.common.BaseResponse base = 1; + int64 new_experience = 2; +} + +// 在 service UserSocialService 中添加: +rpc AddExperience(AddExperienceRequest) returns (AddExperienceResponse); +``` + +编辑完 `user.proto` 后,运行:`cd backend && sh scripts/compile-proto.sh` + +--- + +## 阶段一:taskService 后端基础设施 + +### 任务 1:Proto 定义 + +**文件:** +- 新增:`backend/proto/task.proto` + +```protobuf +syntax = "proto3"; + +package topfans.task; + +option go_package = "github.com/topfans/backend/pkg/proto/task;task"; + +import "proto/common.proto"; +import "google/api/annotations.proto"; + +// ==================== 每日任务 ==================== + +message DailyTaskItem { + string task_key = 1; + int64 star_id = 2; + string name = 3; + string description = 4; + int64 crystal_reward = 5; + int64 exp_reward = 6; + string status = 7; // pending/completed/claimed + bool can_claim = 8; +} + +message GetDailyTasksRequest { + int64 star_id = 1; +} + +message GetDailyTasksResponse { + topfans.common.BaseResponse base = 1; + int64 star_id = 2; + repeated DailyTaskItem tasks = 3; +} + +message ReportEventRequest { + string event_type = 1; // 如 "daily_browse_asset", "daily_login" + int64 star_id = 2; +} + +message ReportEventResponse { + topfans.common.BaseResponse base = 1; + bool success = 2; + string task_key = 3; + bool task_completed = 4; + string message = 5; +} + +message ClaimDailyTaskRequest { + string task_key = 1; + int64 star_id = 2; +} + +message ClaimDailyTaskResponse { + topfans.common.BaseResponse base = 1; + bool success = 2; +} + +message ClaimAllDailyTasksRequest { + int64 star_id = 1; +} + +message ClaimAllDailyTasksResponse { + topfans.common.BaseResponse base = 1; + int32 claimed_count = 2; +} + +// ==================== 引导任务 ==================== + +message OnboardingStage { + int32 stage = 1; + string name = 2; + repeated string required_task_keys = 3; + int64 crystal_reward = 4; + int64 exp_reward = 5; + string status = 6; // pending/completed/in_progress + bool is_current = 7; +} + +message CompleteGuideRequest { + string task_key = 1; +} + +message CompleteGuideResponse { + topfans.common.BaseResponse base = 1; + int64 user_id = 2; + int32 current_stage = 3; + string status = 4; // pending/in_progress/completed + repeated OnboardingStage stages = 5; +} + +message GetOnboardingStatusRequest {} + +message GetOnboardingStatusResponse { + topfans.common.BaseResponse base = 1; + int64 user_id = 2; + int32 current_stage = 3; + string status = 4; + repeated OnboardingStage stages = 5; +} + +message AdvanceStageRequest { + int32 target_stage = 1; +} + +message AdvanceStageResponse { + topfans.common.BaseResponse base = 1; + int32 current_stage = 2; + string status = 3; + repeated OnboardingStage stages = 4; +} + +message ClaimOnboardingRewardRequest { + int32 stage = 1; +} + +message ClaimOnboardingRewardResponse { + topfans.common.BaseResponse base = 1; + bool success = 2; +} + +// ==================== 展示收益 ==================== + +message ExhibitionRevenueItem { + int64 id = 1; + int64 star_id = 2; + int64 exhibition_id = 3; + int64 asset_id = 4; + int64 slot_id = 5; + string slot_type = 6; // own/friend + int64 crystal_amount = 7; + int64 cycle_start_time = 8; + int64 cycle_end_time = 9; + string status = 10; // claimable/claimed/failed + bool can_claim = 11; +} + +message GetExhibitionRevenueRequest { + int64 star_id = 1; + string status = 2; // 可选筛选 + int32 page = 3; + int32 page_size = 4; +} + +message GetExhibitionRevenueResponse { + topfans.common.BaseResponse base = 1; + repeated ExhibitionRevenueItem items = 2; + int64 total = 3; + int32 page = 4; + int32 page_size = 5; +} + +message ClaimExhibitionRevenueRequest { + int64 revenue_id = 1; + int64 star_id = 2; +} + +message ClaimExhibitionRevenueResponse { + topfans.common.BaseResponse base = 1; + bool success = 2; +} + +message ClaimAllExhibitionRevenueRequest { + int64 star_id = 1; +} + +message ClaimAllExhibitionRevenueResponse { + topfans.common.BaseResponse base = 1; + int32 claimed_count = 2; +} + +// ==================== 内部RPC服务 ==================== + +message InitUserTasksRequest { + int64 user_id = 1; + int64 star_id = 2; +} + +message InitUserTasksResponse { + topfans.common.BaseResponse base = 1; + bool success = 2; +} + +message OnExhibitionCompletedRequest { + int64 exhibition_id = 1; + int64 asset_id = 2; + int64 slot_id = 3; + int64 occupier_uid = 4; + int64 occupier_star_id = 5; + int64 slot_owner_uid = 6; + int64 crystal_amount = 7; + int64 start_time = 8; + int64 expire_at = 9; +} + +message OnExhibitionCompletedResponse { + topfans.common.BaseResponse base = 1; + int64 revenue_record_id = 2; +} + +// ==================== Mobile API Service ==================== + +service TaskMobileService { + rpc GetDailyTasks(GetDailyTasksRequest) returns (GetDailyTasksResponse) { + option (google.api.http) = { get: "/api/tasks/daily"; }; + } + rpc ReportEvent(ReportEventRequest) returns (ReportEventResponse) { + option (google.api.http) = { post: "/api/tasks/report-event"; body: "*"; }; + } + rpc ClaimDailyTask(ClaimDailyTaskRequest) returns (ClaimDailyTaskResponse) { + option (google.api.http) = { post: "/api/tasks/daily/claim"; body: "*"; }; + } + rpc ClaimAllDailyTasks(ClaimAllDailyTasksRequest) returns (ClaimAllDailyTasksResponse) { + option (google.api.http) = { post: "/api/tasks/daily/claim-all"; body: "*"; }; + } + rpc CompleteGuide(CompleteGuideRequest) returns (CompleteGuideResponse) { + option (google.api.http) = { post: "/api/tasks/guide/complete"; body: "*"; }; + } + rpc GetOnboardingStatus(GetOnboardingStatusRequest) returns (GetOnboardingStatusResponse) { + option (google.api.http) = { get: "/api/tasks/onboarding/status"; }; + } + rpc AdvanceStage(AdvanceStageRequest) returns (AdvanceStageResponse) { + option (google.api.http) = { post: "/api/tasks/onboarding/advance-stage"; body: "*"; }; + } + rpc ClaimOnboardingReward(ClaimOnboardingRewardRequest) returns (ClaimOnboardingRewardResponse) { + option (google.api.http) = { post: "/api/tasks/onboarding/claim-reward"; body: "*"; }; + } + rpc GetExhibitionRevenue(GetExhibitionRevenueRequest) returns (GetExhibitionRevenueResponse) { + option (google.api.http) = { get: "/api/tasks/exhibition-revenue"; }; + } + rpc ClaimExhibitionRevenue(ClaimExhibitionRevenueRequest) returns (ClaimExhibitionRevenueResponse) { + option (google.api.http) = { post: "/api/tasks/exhibition-revenue/claim"; body: "*"; }; + } + rpc ClaimAllExhibitionRevenue(ClaimAllExhibitionRevenueRequest) returns (ClaimAllExhibitionRevenueResponse) { + option (google.api.http) = { post: "/api/tasks/exhibition-revenue/claim-all"; body: "*"; }; + } +} + +// ==================== Internal RPC Service ==================== + +service TaskInternalService { + rpc InitUserTasks(InitUserTasksRequest) returns (InitUserTasksResponse); + rpc OnExhibitionCompleted(OnExhibitionCompletedRequest) returns (OnExhibitionCompletedResponse); +} +``` + +### 任务 2:数据库模型 + +**文件:** +- 新增:`backend/services/taskService/model/task_models.go` + +```go +package model + +// TaskDefinition 任务定义表 +type TaskDefinition struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + StarID *int64 `gorm:"column:star_id;index"` // NULL=全局默认 + TaskKey string `gorm:"column:task_key;size:50;not null"` + TaskType string `gorm:"column:task_type;size:20;not null"` // daily/onboarding + Name string `gorm:"column:name;size:100;not null"` + Description string `gorm:"column:description;type:text"` + CrystalReward int64 `gorm:"column:crystal_reward;default:0"` + ExpReward int64 `gorm:"column:exp_reward;default:0"` + SortOrder int `gorm:"column:sort_order;default:0"` + IsActive bool `gorm:"column:is_active;default:true"` + CreatedAt int64 `gorm:"column:created_at"` + UpdatedAt int64 `gorm:"column:updated_at"` +} + +func (TaskDefinition) TableName() string { return "task_definitions" } + +// UserDailyTaskProgress 每日任务进度表 +type UserDailyTaskProgress struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + UserID int64 `gorm:"column:user_id;not null;index:idx_daily_user_star_key"` + StarID int64 `gorm:"column:star_id;not null;index:idx_daily_user_star_key"` + TaskKey string `gorm:"column:task_key;size:50;not null;index:idx_daily_user_star_key"` + Status string `gorm:"column:status;size:20;default:pending"` // pending/completed/claimed + CompletedAt *int64 `gorm:"column:completed_at"` + ClaimedAt *int64 `gorm:"column:claimed_at"` + CreatedAt int64 `gorm:"column:created_at"` + UpdatedAt int64 `gorm:"column:updated_at"` +} + +func (UserDailyTaskProgress) TableName() string { return "user_daily_task_progress" } + +// UserOnboardingProgress 引导任务进度表 +type UserOnboardingProgress struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + UserID int64 `gorm:"column:user_id;not null;index:idx_onboard_user_key"` + TaskKey string `gorm:"column:task_key;size:50;not null;index:idx_onboard_user_key"` + Status string `gorm:"column:status;size:20;default:pending"` + CompletedAt *int64 `gorm:"column:completed_at"` + ClaimedAt *int64 `gorm:"column:claimed_at"` + CreatedAt int64 `gorm:"column:created_at"` + UpdatedAt int64 `gorm:"column:updated_at"` +} + +func (UserOnboardingProgress) TableName() string { return "user_onboarding_progress" } + +// UserOnboardingStatus 引导流程状态表(per-user,非 per-star) +type UserOnboardingStatus struct { + UserID int64 `gorm:"primaryKey;column:user_id"` + CurrentStage int `gorm:"column:current_stage;default:0"` + Status string `gorm:"column:status;size:20;default:pending"` + IsFirstLoginBonusClaimed bool `gorm:"column:is_first_login_bonus_claimed;default:false"` // 废弃字段 + HasFriendDisplayBonus bool `gorm:"column:has_friend_display_bonus;default:false"` // 废弃字段 + CompletedAt *int64 `gorm:"column:completed_at"` + ClaimedAt *int64 `gorm:"column:claimed_at"` + CreatedAt int64 `gorm:"column:created_at"` + UpdatedAt int64 `gorm:"column:updated_at"` +} + +func (UserOnboardingStatus) TableName() string { return "user_onboarding_status" } + +// OnboardingStageConfig 引导阶段配置表 +type OnboardingStageConfig struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + Stage int `gorm:"column:stage;not null;uniqueIndex"` + Name string `gorm:"column:name;size:100;not null"` + Description string `gorm:"column:description;type:text"` + RequiredTaskKeys []string `gorm:"column:required_task_keys;text[]"` // PostgreSQL 数组 + CrystalReward int64 `gorm:"column:crystal_reward;default:0"` + ExpReward int64 `gorm:"column:exp_reward;default:0"` + SortOrder int `gorm:"column:sort_order;default:0"` + IsActive bool `gorm:"column:is_active;default:true"` + CreatedAt int64 `gorm:"column:created_at"` + UpdatedAt int64 `gorm:"column:updated_at"` +} + +func (OnboardingStageConfig) TableName() string { return "onboarding_stage_config" } + +// ExhibitionRevenueRecord 展示收益记录表 +type ExhibitionRevenueRecord struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + UserID int64 `gorm:"column:user_id;not null;index:idx_revenue_user_star"` + StarID int64 `gorm:"column:star_id;not null;index:idx_revenue_user_star"` + ExhibitionID int64 `gorm:"column:exhibition_id;not null"` + AssetID int64 `gorm:"column:asset_id;not null"` + SlotID int64 `gorm:"column:slot_id;not null"` + SlotOwnerUID int64 `gorm:"column:slot_owner_uid;not null"` + SlotType string `gorm:"column:slot_type;size:20;not null"` // own/friend + CrystalAmount int64 `gorm:"column:crystal_amount;not null"` + CycleStartTime int64 `gorm:"column:cycle_start_time;not null"` + CycleEndTime int64 `gorm:"column:cycle_end_time;not null"` + Status string `gorm:"column:status;size:20;default:claimable"` // claimable/claimed/failed + ClaimedAt *int64 `gorm:"column:claimed_at"` + CreatedAt int64 `gorm:"column:created_at"` +} + +func (ExhibitionRevenueRecord) TableName() string { return "exhibition_revenue_records" } + +// TaskResetLog 重置日志表 +type TaskResetLog struct { + ID int64 `gorm:"primaryKey;column:id;autoIncrement"` + ResetType string `gorm:"column:reset_type;size:20;not null"` // daily + LastResetAt int64 `gorm:"column:last_reset_at;not null"` + CreatedAt int64 `gorm:"column:created_at"` +} + +func (TaskResetLog) TableName() string { return "task_reset_log" } +``` + +### 任务 3:配置文件 + +**文件:** +- 新增:`backend/services/taskService/config/task_config.go` + +```go +package config + +import ( + "flag" + "log" + "os" + "strconv" +) + +type DatabaseConfig struct { + Host, Password, DBName, SSLMode, TimeZone string + Port int + User string +} + +type ServiceURLs struct { + UserService string +} + +type WorkerConfig struct { + ResetHour, ResetMinute int + RevenueBatchSize int + RevenueMaxRetries int +} + +var ( + DBConfig = &DatabaseConfig{} + ServiceURLsConfig = &ServiceURLs{UserService: "tri://localhost:20000"} + WorkerConfigData = &WorkerConfig{ + ResetHour: 5, ResetMinute: 0, + RevenueBatchSize: 100, RevenueMaxRetries: 3, + } +) + +func getEnv(key, fallback string) string { + if v := os.Getenv(key); v != "" { return v } + return fallback +} + +func getEnvInt(key string, fallback int) int { + if v := os.Getenv(key); v != "" { + if n, err := strconv.Atoi(v); err == nil { return n } + } + return fallback +} + +func InitConfig() { + flag.StringVar(&DBConfig.Host, "db-host", getEnv("DB_HOST", "localhost"), "数据库主机") + flag.IntVar(&DBConfig.Port, "db-port", getEnvInt("DB_PORT", 5432), "数据库端口") + flag.StringVar(&DBConfig.User, "db-user", getEnv("DB_USER", "postgres"), "数据库用户名") + flag.StringVar(&DBConfig.Password, "db-password", getEnv("DB_PASSWORD", ""), "数据库密码") + flag.StringVar(&DBConfig.DBName, "db-name", getEnv("DB_NAME", "topfans"), "数据库名称") + flag.StringVar(&DBConfig.SSLMode, "db-sslmode", "disable", "数据库 SSL 模式") + flag.StringVar(&ServiceURLsConfig.UserService, "user-service-url", getEnv("USER_SERVICE_URL", "tri://localhost:20000"), "User Service RPC 地址") + flag.IntVar(&WorkerConfigData.ResetHour, "reset-hour", getEnvInt("RESET_HOUR", 5), "每日重置小时(Asia/Shanghai)") + flag.IntVar(&WorkerConfigData.ResetMinute, "reset-minute", getEnvInt("RESET_MINUTE", 0), "每日重置分钟") + flag.IntVar(&WorkerConfigData.RevenueBatchSize, "revenue-batch-size", getEnvInt("REVENUE_BATCH_SIZE", 100), "收益自动发放批次大小") + flag.IntVar(&WorkerConfigData.RevenueMaxRetries, "revenue-max-retries", getEnvInt("REVENUE_MAX_RETRIES", 3), "收益自动发放最大重试次数") + flag.Parse() + log.Println("taskService 配置初始化完成") + log.Printf(" 数据库: %s:%d/%s", DBConfig.Host, DBConfig.Port, DBConfig.DBName) + log.Printf(" User Service: %s", ServiceURLsConfig.UserService) + log.Printf(" 重置时间: %02d:%02d Asia/Shanghai", WorkerConfigData.ResetHour, WorkerConfigData.ResetMinute) +} +``` + +### 任务 4:userService RPC 客户端 + +**文件:** +- 新增:`backend/services/taskService/client/user_rpc_client.go` + +模式参考 `assetService/client/user_rpc_client.go`。客户端封装体接收的是服务接口(而非 `*client.Client`)。服务接口创建(在 `main.go` 中调用 `pbUser.NewUserSocialService`)在 main.go 中完成。 + +```go +package client + +import ( + "context" + "github.com/topfans/backend/pkg/logger" + pbUser "github.com/topfans/backend/pkg/proto/user" + "go.uber.org/zap" +) + +type UserServiceClient interface { + UpdateCrystalBalance(ctx context.Context, userID, starID int64, delta int64) (int64, error) + AddExperience(ctx context.Context, userID, starID int64, delta int64) (int64, error) +} + +type userServiceClient struct { + client pbUser.UserSocialService +} + +func NewUserServiceClient(client pbUser.UserSocialService) UserServiceClient { + return &userServiceClient{client: client} +} + +func (c *userServiceClient) UpdateCrystalBalance(ctx context.Context, userID, starID int64, delta int64) (int64, error) { + logger.Logger.Debug("Calling UserService.UpdateCrystalBalance", + zap.Int64("user_id", userID), zap.Int64("star_id", starID), zap.Int64("delta", delta)) + resp, err := c.client.UpdateCrystalBalance(ctx, &pbUser.UpdateCrystalBalanceRequest{ + UserId: userID, StarId: starID, Delta: delta, + }) + if err != nil { + logger.Logger.Error("UserService.UpdateCrystalBalance failed", zap.Error(err)) + return 0, err + } + if resp.Base.Code != 0 { + logger.Logger.Warn("UpdateCrystalBalance non-zero code", zap.Int32("code", int32(resp.Base.Code))) + return 0, err + } + return resp.NewBalance, nil +} + +func (c *userServiceClient) AddExperience(ctx context.Context, userID, starID int64, delta int64) (int64, error) { + logger.Logger.Debug("Calling UserService.AddExperience", + zap.Int64("user_id", userID), zap.Int64("star_id", starID), zap.Int64("delta", delta)) + resp, err := c.client.AddExperience(ctx, &pbUser.AddExperienceRequest{ + UserId: userID, StarId: starID, Delta: delta, + }) + if err != nil { + logger.Logger.Error("UserService.AddExperience failed", zap.Error(err)) + return 0, err + } + if resp.Base.Code != 0 { + logger.Logger.Warn("AddExperience non-zero code", zap.Int32("code", int32(resp.Base.Code))) + return 0, err + } + return resp.NewExperience, nil +} +``` + +--- + +## 阶段二:Repository 层 + +### 任务 5:每日任务 Repository + +**文件:** +- 新增:`backend/services/taskService/repository/daily_task_repo.go` + +主要方法: +- `GetUserDailyProgress(userID, starID, taskKey) (*model.UserDailyTaskProgress, error)` +- `GetOrCreateDailyProgress(userID, starID, taskKey, def *model.TaskDefinition) (*model.UserDailyTaskProgress, error)` +- `ListDailyTasksByUser(userID, starID) ([]*model.UserDailyTaskProgress, error)` +- `ListActiveDailyTaskDefinitions(starID) ([]*model.TaskDefinition, error)` — 返回 star_id 相关 + 全局默认 +- `UpdateDailyProgress(progress *model.UserDailyTaskProgress) error` +- `ResetAllDailyTasks() (int64, error)` — 重置所有非 pending 状态为 pending +- `InitDailyTasksForUser(userID, starID) error` — InitUserTasks RPC 调用,为该用户创建该 star_id 下所有每日任务进度 +- `AcquireAdvisoryLock(lockID int64) bool` — `SELECT pg_try_advisory_lock($1)`,返回是否获取成功 +- `ReleaseAdvisoryLock(lockID int64)` — `SELECT pg_advisory_unlock($1)` + +### 任务 6:引导任务 Repository + +**文件:** +- 新增:`backend/services/taskService/repository/onboarding_repo.go` + +主要方法: +- `GetOnboardingStatus(userID int64) (*model.UserOnboardingStatus, error)` +- `GetOrCreateOnboardingStatus(userID int64) (*model.UserOnboardingStatus, error)` — 获取或新建(current_stage=0, status=pending) +- `UpdateOnboardingStatus(status *model.UserOnboardingStatus) error` +- `GetUserOnboardingProgress(userID int64, taskKey string) (*model.UserOnboardingProgress, error)` +- `GetOrCreateOnboardingProgress(userID int64, taskKey string) (*model.UserOnboardingProgress, error)` +- `ListActiveStageConfigs() ([]*model.OnboardingStageConfig, error)` +- `ListUserOnboardingProgressByUser(userID int64) ([]*model.UserOnboardingProgress, error)` +- `GetStageConfig(stage int) (*model.OnboardingStageConfig, error)` + +### 任务 7:收益 Repository + +**文件:** +- 新增:`backend/services/taskService/repository/revenue_repo.go` + +主要方法: +- `CreateRevenueRecord(record *model.ExhibitionRevenueRecord) (*model.ExhibitionRevenueRecord, error)` +- `GetRevenueRecord(id int64) (*model.ExhibitionRevenueRecord, error)` +- `ListRevenueByUser(userID, starID int64, status string, page, pageSize int) ([]*model.ExhibitionRevenueRecord, int64, error)` +- `ClaimRevenueRecord(id int64, userID int64) (bool, error)` — 乐观锁:仅在 status='claimable' 时更新,返回是否更新成功 +- `UpdateRevenueStatus(id int64, status string) error` — Worker 自动发放后调用(claimed/failed) +- `ListClaimableRevenue(limit int) ([]*model.ExhibitionRevenueRecord, error)` — Worker 自动发放用,`LIMIT batchSize` + +--- + +## 阶段三:Service 层 + +### 任务 8:每日任务 Service + +**文件:** +- 新增:`backend/services/taskService/service/daily_task_service.go` + +主要方法: +- `GetDailyTasks(ctx, userID, starID) (*pb.GetDailyTasksResponse, error)` — 合并任务定义与用户进度,can_claim=(status==completed) +- `ReportEvent(ctx, userID, starID, eventType) (*pb.ReportEventResponse, error)` — eventType 映射到 task_key,标记任务完成 +- `ClaimDailyTask(ctx, userID, starID, taskKey) (*pb.ClaimDailyTaskResponse, error)` — 验证 completed,调用水晶和经验奖励,标记 claimed +- `ClaimAllDailyTasks(ctx, userID, starID) (*pb.ClaimAllDailyTasksResponse, error)` — 查找所有已完成任务,批量领取 + +### 任务 9:引导任务 Service + +**文件:** +- 新增:`backend/services/taskService/service/onboarding_service.go` + +主要方法: +- `GetOnboardingStatus(ctx, userID) (*pb.GetOnboardingStatusResponse, error)` +- `CompleteGuide(ctx, userID, taskKey) (*pb.CompleteGuideResponse, error)` — 标记引导进度完成(不是每日任务),返回完整阶段列表 +- `AdvanceStage(ctx, userID, targetStage) (*pb.AdvanceStageResponse, error)` — 验证当前阶段所有任务已完成,再推进阶段 +- `ClaimOnboardingReward(ctx, userID, stage) (*pb.ClaimOnboardingRewardResponse, error)` — 验证阶段已完成,发放奖励,标记 claimed +- `InitUserTasks(ctx, userID, starID) error` — TaskInternalService.InitUserTasks RPC 调用:创建 user_onboarding_status(若不存在)+ 调用 dailyRepo.InitDailyTasksForUser + +### 任务 10:收益 Service + +**文件:** +- 新增:`backend/services/taskService/service/revenue_service.go` + +主要方法: +- `GetExhibitionRevenue(ctx, userID, starID, status string, page, pageSize) (*pb.GetExhibitionRevenueResponse, error)` +- `ClaimExhibitionRevenue(ctx, userID, starID, revenueID) (*pb.ClaimExhibitionRevenueResponse, error)` — 使用 repo.ClaimRevenueRecord 乐观锁 +- `ClaimAllExhibitionRevenue(ctx, userID, starID) (*pb.ClaimAllExhibitionRevenueResponse, error)` — 查找所有可领取收益,批量领取 +- `OnExhibitionCompleted(ctx, req) (*pb.OnExhibitionCompletedResponse, error)` — **关键:仅在 slot_owner_uid != occupier_uid 时创建收益记录**(自己展位不产生收益),status='claimable' + +--- + +## 阶段四:Provider 层(API 处理器) + +### 任务 11:移动端 API Provider + +**文件:** +- 新增:`backend/services/taskService/provider/task_mobile_provider.go` + +实现 `TaskMobileService` 接口(proto 生成)。通过 Dubbo context attachments 提取 user_id(gateway 认证中间件设置)。参考 `galleryService/provider/gallery_provider.go`。 + +辅助函数(定义在文件顶部): +```go +func getUserIDFromCtx(ctx context.Context) int64 { + // Dubbo 通过 context attachments 从 gateway 认证中间件传递 user_id + // Triple 协议通过 rpcCtx 或 attachments 获取 + if attachments := ctx.Value("attachments").(map[string]string); attachments != nil { + if uid := attachments["user_id"]; uid != "" { + if id, _ := strconv.ParseInt(uid, 10, 64); id > 0 { return id } + } + } + return 0 +} +``` + +每个 handler:提取 userID → 调用 service 方法 → 返回响应。 + +### 任务 12:内部 RPC Provider + +**文件:** +- 新增:`backend/services/taskService/provider/task_internal_provider.go` + +实现 `TaskInternalService`: +- `InitUserTasks(ctx, req) (*pb.InitUserTasksResponse, error)` — 提取 `req.UserId` 和 `req.StarId`,调用 `onboardingService.InitUserTasks(ctx, userID, starID)` +- `OnExhibitionCompleted(ctx, req) (*pb.OnExhibitionCompletedResponse, error)` — 调用 `revenueService.OnExhibitionCompleted` + +内部 RPC 不需要认证中间件(由其他服务调用)。 + +--- + +## 阶段五:Worker + +### 任务 13:每日重置 Worker + +**文件:** +- 新增:`backend/services/taskService/worker/daily_reset_worker.go` + +```go +package worker + +import ( + "context" + "fmt" + "strconv" + "sync" + "time" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/taskService/client" + "github.com/topfans/backend/services/taskService/config" + "github.com/topfans/backend/services/taskService/repository" + "go.uber.org/zap" +) + +type DailyResetWorker struct { + dailyRepo *repository.DailyTaskRepository + revenueRepo *repository.RevenueRepository + userClient client.UserServiceClient + stopCh chan struct{} + wg sync.WaitGroup +} + +func NewDailyResetWorker(dailyRepo *repository.DailyTaskRepository, revenueRepo *repository.RevenueRepository, userClient client.UserServiceClient) *DailyResetWorker { + return &DailyResetWorker{ + dailyRepo: dailyRepo, + revenueRepo: revenueRepo, + userClient: userClient, + stopCh: make(chan struct{}), + } +} + +func (w *DailyResetWorker) Start() { + w.wg.Add(1) + go w.runLoop() + logger.Logger.Info("DailyResetWorker started") +} + +func (w *DailyResetWorker) Stop() { + close(w.stopCh) + w.wg.Wait() + logger.Logger.Info("DailyResetWorker stopped") +} + +func (w *DailyResetWorker) runLoop() { + defer w.wg.Done() + for { + now := time.Now() + loc, _ := time.LoadLocation("Asia/Shanghai") + next := time.Date(now.Year(), now.Month(), now.Day(), config.WorkerConfigData.ResetHour, config.WorkerConfigData.ResetMinute, 0, 0, loc) + if next.Before(now) { + next = next.Add(24 * time.Hour) + } + waitDuration := next.Sub(now) + logger.Logger.Info(fmt.Sprintf("Next daily reset at %s (in %v)", next.Format(time.RFC3339), waitDuration)) + + select { + case <-time.After(waitDuration): + w.doResetAndAutoClaim() + case <-w.stopCh: + return + } + } +} + +func (w *DailyResetWorker) doResetAndAutoClaim() { + lockKey := time.Now().Format("20060102") + lockID, _ := strconv.ParseInt(lockKey, 10, 64) + acquired := w.dailyRepo.AcquireAdvisoryLock(lockID) + if !acquired { + logger.Logger.Info("Another instance is running daily reset, skipping") + return + } + defer w.dailyRepo.ReleaseAdvisoryLock(lockID) + + // 1. 重置每日任务 + resetCount, err := w.dailyRepo.ResetAllDailyTasks() + if err != nil { + logger.Logger.Error("Failed to reset daily tasks", zap.Error(err)) + } else { + logger.Logger.Info(fmt.Sprintf("Daily tasks reset: %d records updated", resetCount)) + } + + // 2. 自动发放展示收益 + w.autoClaimExhibitionRevenue() +} + +func (w *DailyResetWorker) autoClaimExhibitionRevenue() { + batchSize := config.WorkerConfigData.RevenueBatchSize + maxRetries := config.WorkerConfigData.RevenueMaxRetries + totalClaimed := 0 + + for { + records, err := w.revenueRepo.ListClaimableRevenue(batchSize) + if err != nil { + logger.Logger.Error("Failed to list claimable revenue", zap.Error(err)) + break + } + if len(records) == 0 { + break + } + + for _, record := range records { + var lastErr error + for attempt := 0; attempt < maxRetries; attempt++ { + _, err := w.userClient.UpdateCrystalBalance(context.Background(), record.UserID, record.StarID, record.CrystalAmount) + if err == nil { + w.revenueRepo.UpdateRevenueStatus(record.ID, "claimed") + totalClaimed++ + break + } + lastErr = err + time.Sleep(100 * time.Millisecond) + } + if lastErr != nil { + w.revenueRepo.UpdateRevenueStatus(record.ID, "failed") + logger.Logger.Error("Failed to auto-claim revenue after retries", + zap.Int64("record_id", record.ID), zap.Error(lastErr)) + } + } + } + logger.Logger.Info(fmt.Sprintf("Auto-claim completed: %d records claimed", totalClaimed)) +} +``` + +--- + +## 阶段六:main.go + +### 任务 14:服务入口 + +**文件:** +- 新增:`backend/services/taskService/main.go` + +```go +package main + +import ( + "flag" + "fmt" + "os" + "os/signal" + "syscall" + + dubboclient "dubbo.apache.org/dubbo-go/v3/client" + _ "dubbo.apache.org/dubbo-go/v3/imports" + "dubbo.apache.org/dubbo-go/v3/protocol" + "dubbo.apache.org/dubbo-go/v3/server" + + "github.com/topfans/backend/pkg/database" + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/pkg/proto/task" + pbUser "github.com/topfans/backend/pkg/proto/user" + "github.com/topfans/backend/services/taskService/client" + "github.com/topfans/backend/services/taskService/config" + "github.com/topfans/backend/services/taskService/model" + "github.com/topfans/backend/services/taskService/provider" + "github.com/topfans/backend/services/taskService/repository" + "github.com/topfans/backend/services/taskService/service" + "github.com/topfans/backend/services/taskService/worker" +) + +var port = flag.Int("port", 20005, "Dubbo service port") + +func main() { + flag.Parse() + + // 1. Init logger(必须最前) + env := os.Getenv("ENV") + if env == "" { env = "development" } + if err := logger.Init(logger.Config{ServiceName: "task-service", Environment: env, LogLevel: os.Getenv("LOG_LEVEL")}); err != nil { + panic(fmt.Sprintf("Failed to init logger: %v", err)) + } + defer logger.Sync() + logger.Logger.Info("Starting taskService...") + + // 2. Init config(读取 flags/env) + config.InitConfig() + + // 3. Init database + auto-migrate + dbConfig := database.Config{ + Host: config.DBConfig.Host, Port: config.DBConfig.Port, + User: config.DBConfig.User, Password: config.DBConfig.Password, + DBName: config.DBConfig.DBName, SSLMode: config.DBConfig.SSLMode, + TimeZone: "Asia/Shanghai", + } + if err := database.Init(dbConfig); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to init DB: %v", err)) + } + db := database.GetDB() + if err := db.AutoMigrate( + &model.TaskDefinition{}, + &model.UserDailyTaskProgress{}, + &model.UserOnboardingProgress{}, + &model.UserOnboardingStatus{}, + &model.OnboardingStageConfig{}, + &model.ExhibitionRevenueRecord{}, + &model.TaskResetLog{}, + ); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to migrate tables: %v", err)) + } + logger.Logger.Info("Database initialized") + + // 4. Init repositories + dailyRepo := repository.NewDailyTaskRepository(db) + onboardingRepo := repository.NewOnboardingRepository(db) + revenueRepo := repository.NewRevenueRepository(db) + logger.Logger.Info("Repositories initialized") + + // 5. Init userService Dubbo client + RPC client + userCli, err := dubboclient.NewClient(dubboclient.WithClientURL(config.ServiceURLsConfig.UserService)) + if err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to create userService client: %v", err)) + } + userServiceClient, err := pbUser.NewUserSocialService(userCli) + if err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to get userService: %v", err)) + } + userRPCClient := client.NewUserServiceClient(userServiceClient) + logger.Logger.Info("User RPC client initialized") + + // 6. Init services + dailySvc := service.NewDailyTaskService(dailyRepo, userRPCClient) + onboardingSvc := service.NewOnboardingService(onboardingRepo, dailyRepo, userRPCClient) + revenueSvc := service.NewRevenueService(revenueRepo, userRPCClient) + logger.Logger.Info("Services initialized") + + // 7. Init worker(goroutine 中启动) + resetWorker := worker.NewDailyResetWorker(dailyRepo, revenueRepo, userRPCClient) + go resetWorker.Start() + logger.Logger.Info("Reset worker started") + + // 8. Init providers + mobileProvider := provider.NewTaskMobileProvider(dailySvc, onboardingSvc, revenueSvc) + internalProvider := provider.NewTaskInternalProvider(onboardingSvc, revenueSvc) + logger.Logger.Info("Providers initialized") + + // 9. Create Dubbo server on port 20005 + srv, err := server.NewServer( + server.WithServerProtocol(protocol.WithPort(*port), protocol.WithTriple()), + ) + if err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to create server: %v", err)) + } + if err := task.RegisterTaskMobileServiceHandler(srv, mobileProvider); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to register TaskMobileService: %v", err)) + } + if err := task.RegisterTaskInternalServiceHandler(srv, internalProvider); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to register TaskInternalService: %v", err)) + } + if err := srv.Serve(); err != nil { + logger.Logger.Fatal(fmt.Sprintf("Failed to serve: %v", err)) + } + logger.Logger.Info(fmt.Sprintf("taskService started on port %d", *port)) + + // 10. Graceful shutdown + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + logger.Logger.Info("Shutting down taskService...") + resetWorker.Stop() +} +``` + +--- + +## 阶段七:Proto 编译 + +### 任务 15:Proto 编译 + +**步骤 1:** 修改 `backend/scripts/compile-proto.sh`: +- 在循环(第 56 行左右)中添加 `task` +- 在清理块(第 159 行左右)中添加 `task` +- 在 activity.proto 编译块之后添加 task.proto 编译块 + +mkdir 循环中: +```bash +for name in common user social asset gallery ranking activity task; do +``` + +mkdir 创建目录: +```bash +mkdir -p pkg/proto/task +``` + +activity.proto 编译块之后添加: +```bash +# 编译 task.proto +echo "📦 编译 task.proto ..." +protoc --proto_path=proto \ + --proto_path=. \ + --go_out=pkg/proto/task \ + --go_opt=paths=source_relative \ + --go-triple_out=pkg/proto/task \ + --go-triple_opt=paths=source_relative \ + task.proto +echo "✅ task.proto 编译完成" +``` + +清理循环中也需要添加 `task`。 + +**步骤 2:** 运行:`cd backend && sh scripts/compile-proto.sh` + +--- + +## 阶段九:移动端前端 + +### 任务 16:任务 API 工具函数 + +**文件:** +- 新增:`frontend/utils/task-api.js` + +复用 `frontend/utils/api.js` 中的 `request` 封装: + +```javascript +import { request } from './api.js' + +// baseURL 已配置在 api.js 中,指向 gateway(端口 8080) +// 所有 task API 通过 gateway 路由到 taskService(Triple/HTTP) + +export const getDailyTasks = (starId) => + request({ url: '/api/tasks/daily', data: { star_id: starId } }) + +export const reportEvent = (eventType, starId) => + request({ url: '/api/tasks/report-event', method: 'POST', data: { event_type: eventType, star_id: starId } }) + +export const claimDailyTask = (taskKey, starId) => + request({ url: '/api/tasks/daily/claim', method: 'POST', data: { task_key: taskKey, star_id: starId } }) + +export const claimAllDailyTasks = (starId) => + request({ url: '/api/tasks/daily/claim-all', method: 'POST', data: { star_id: starId } }) + +export const completeGuide = (taskKey) => + request({ url: '/api/tasks/guide/complete', method: 'POST', data: { task_key: taskKey } }) + +export const getOnboardingStatus = () => + request({ url: '/api/tasks/onboarding/status' }) + +export const advanceStage = (targetStage) => + request({ url: '/api/tasks/onboarding/advance-stage', method: 'POST', data: { target_stage: targetStage } }) + +export const claimOnboardingReward = (stage) => + request({ url: '/api/tasks/onboarding/claim-reward', method: 'POST', data: { stage } }) + +export const getExhibitionRevenue = (starId, status, page, pageSize) => + request({ url: '/api/tasks/exhibition-revenue', data: { star_id: starId, status, page, page_size: pageSize } }) + +export const claimExhibitionRevenue = (revenueId, starId) => + request({ url: '/api/tasks/exhibition-revenue/claim', method: 'POST', data: { revenue_id: revenueId, star_id: starId } }) + +export const claimAllExhibitionRevenue = (starId) => + request({ url: '/api/tasks/exhibition-revenue/claim-all', method: 'POST', data: { star_id: starId } }) +``` + +### 任务 17:每日任务页面 + +**文件:** +- 新增:`frontend/pages/tasks/daily-tasks.vue` + +功能: +- 通过 `uni.getStorageSync('fan_profile')` 获取当前 star_id +- 挂载时调用 `getDailyTasks(starId)` +- 展示任务列表:名称、描述、水晶奖励、经验奖励、状态标签(pending=蓝色, completed=绿色可领取, claimed=灰色) +- "领取"按钮仅在 `can_claim=true` 时可点击 +- 底部有"一键领取"按钮(有待领取任务时显示) +- 调用 `claimDailyTask` / `claimAllDailyTasks`,带加载状态 +- 支持下拉刷新 `uni.startPullDownRefresh` +- 任务列表为空时显示空状态 + +### 任务 18:引导任务页面 + +**文件:** +- 新增:`frontend/pages/tasks/guide.vue` + +功能: +- 挂载时调用 `getOnboardingStatus()` +- 展示阶段列表:当前阶段高亮 +- 每阶段显示:名称、所需任务 keys(含已完成/未完成标记) +- "进入下一阶段"按钮 — 仅当当前阶段所有任务完成时才可点击,调用 `advanceStage(targetStage)` +- 当前阶段"领取奖励"按钮 — 调用 `claimOnboardingReward(currentStage)` +- 用户完成每个引导步骤后(如进入 square_home)调用 `completeGuide` + +### 任务 19:收益页面 + +**文件:** +- 新增:`frontend/pages/tasks/revenue.vue` + +功能: +- Tab 切换:"可领取" / "已领取",使用 `uni segmented-control` 或 Tab 样式按钮 +- 默认 Tab = "可领取",显示 `can_claim=true` 的项目 +- 每条记录:展品缩略图(无可用则占位)、展位类型标签("自己的展位"/"好友展位")、水晶数量、周期时间范围 +- 单条"领取"按钮和"一键领取"按钮 +- 分页加载 `uni.loadMore` +- 下拉刷新 + +### 任务 20:页面路由 + +**文件:** +- 修改:`frontend/pages.json` + +在 `pages` 数组中添加: + +```json +{ + "path": "pages/tasks/daily-tasks", + "style": { "navigationStyle": "custom" } +}, +{ + "path": "pages/tasks/guide", + "style": { "navigationStyle": "custom" } +}, +{ + "path": "pages/tasks/revenue", + "style": { "navigationStyle": "custom" } +} +``` + +--- + +## 阶段十:集成与测试 + +### 任务 21:手动 API 测试 + +通过 gateway 测试(假设 taskService 已在 Dubbo 注册,或通过 gateway HTTP 路由到 20005): + +1. `GET /api/tasks/daily?star_id=1` → 返回任务列表 +2. `POST /api/tasks/report-event {event_type:"daily_login",star_id:1}` → task_completed=true +3. `GET /api/tasks/onboarding/status` → 返回 current_stage:0 +4. `POST /api/tasks/guide/complete {task_key:"square_home"}` → 创建进度,返回阶段信息 +5. `GET /api/tasks/exhibition-revenue?star_id=1&page=1&page_size=20` → 返回收益列表 +6. `POST /api/tasks/exhibition-revenue/claim {revenue_id:1,star_id:1}` → 领取单个 +7. `POST /api/tasks/exhibition-revenue/claim-all {star_id:1}` → 一键领取 + diff --git a/docs/superpowers/plans/2026-04-14-task-service-remaining-issues.md b/docs/superpowers/plans/2026-04-14-task-service-remaining-issues.md new file mode 100644 index 0000000..e8cc7bf --- /dev/null +++ b/docs/superpowers/plans/2026-04-14-task-service-remaining-issues.md @@ -0,0 +1,202 @@ +# TaskService 未完成事项 + +> 生成时间:2026-04-14 + +## 一、阻塞性问题(阻断编译/运行) + +### 1. Proto 编译未执行 + +**问题**:编译脚本已修改但未执行,`pkg/proto/` 下无生成代码。 + +**影响**:`go build ./...` 会失败,找不到 `task.pb.go` 和 `task.triple.go`。 + +**操作**: +```bash +cd backend && sh scripts/compile-proto.sh +``` + +**前提**:环境需安装 `go` 和 `protoc`。 + +--- + +### 2. Dubbo Server 注册代码被注释 + +**文件**:`backend/services/taskService/main.go:120-124` + +```go +// 注意:这里需要 TaskMobileService 和 TaskInternalService 的 RegisterHandler +// 这些会在 proto 编译后生成。在 proto 编译之前,这里会报编译错误。 +// Task 15 完成 proto 编译后,需要将下面的注释替换为实际的注册代码。 + +logger.Logger.Info(fmt.Sprintf("taskService configured on port %d, awaiting proto registration", *port)) +``` + +**影响**:服务启动后没有注册任何 RPC handler,客户端无法调用。 + +**修复**:proto 编译后需添加类似以下代码: +```go +pb.RegisterTaskMobileServiceHandler(srv, mobileProvider) +pb.RegisterTaskInternalServiceHandler(srv, internalProvider) +``` + +--- + +## 二、代码缺陷(运行时 Bug) + +### 3. `UpdateOnboardingProgress` 方法缺失 + +**文件**:`backend/services/taskService/repository/onboarding_repo.go` + +**问题**: +- 接口 `OnboardingRepository` 缺少 `UpdateOnboardingProgress` 方法 +- `onboarding_service.go:172-176` 的 `updateOnboardingProgress` 直接返回 `nil`,未实际更新数据 + +```go +func (s *onboardingService) updateOnboardingProgress(progress *model.UserOnboardingProgress) error { + // 需要在 repository 中添加 UpdateOnboardingProgress 方法 + // 这里暂时用 GORM 直接更新 + return nil // ← BUG: 没有真正更新 +} +``` + +**影响**:`CompleteGuide` 完成任务后进度状态不会更新。 + +**修复**:在 `onboarding_repo.go` 添加: +```go +// OnboardingRepository 接口添加: +UpdateOnboardingProgress(progress *model.UserOnboardingProgress) error + +// 实现: +func (r *onboardingRepository) UpdateOnboardingProgress(progress *model.UserOnboardingProgress) error { + progress.UpdatedAt = gorm.NowFunc().Unix() + return r.db.Save(progress).Error +} +``` + +然后 `onboarding_service.go` 中调用 `s.onboardingRepo.UpdateOnboardingProgress(progress)`。 + +--- + +### 4. `ListCompletedDailyTasks` 方法缺失 + +**文件**:`backend/services/taskService/repository/daily_task_repo.go` + +**问题**:`daily_task_service.go:204` 调用 `s.dailyRepo.ListCompletedDailyTasks()`,但该方法不存在。 + +**影响**:`ClaimAllDailyTasks` 调用会编译报错。 + +**修复**:在 `daily_task_repo.go` 添加: +```go +// DailyTaskRepository 接口添加: +ListCompletedDailyTasks(userID, starID int64) ([]*model.UserDailyTaskProgress, error) + +// 实现: +func (r *dailyTaskRepository) ListCompletedDailyTasks(userID, starID int64) ([]*model.UserDailyTaskProgress, error) { + var progressList []*model.UserDailyTaskProgress + err := r.db.Where("user_id = ? AND star_id = ? AND status = ?", userID, starID, "completed"). + Find(&progressList).Error + return progressList, err +} +``` + +--- + +### 5. `CompleteGuide` 中死代码 + +**文件**:`backend/services/taskService/onboarding_service.go:154-156` + +```go +if err := s.onboardingRepo.UpdateOnboardingStatus(&model.UserOnboardingStatus{UserID: userID}); err != nil { + // UpdateOnboardingStatus is for status table, not progress +} // ← 无意义的空调用,应该删除 +``` + +**影响**:代码冗余,不影响功能但影响可读性。 + +**修复**:删除第 154-156 行。 + +--- + +## 三、前端缺陷 + +### 6. `guide.vue` 任务完成状态判断错误 + +**文件**:`frontend/pages/tasks/guide.vue:129-134` + +```javascript +function isTaskCompleted(stage, taskKey) { + // 这里需要根据实际数据结构判断 + // 假设 completed 状态的任务 key 会被标记 + // 暂时返回 false,实际需要后端返回更详细的状态 + return false // ← BUG: 永远返回 false +} +``` + +**影响**:引导页面的任务永远显示为未完成,用户无法正确看到进度。 + +**修复**:参考后端 `GetOnboardingStatus` 的逻辑,从 `stage.required_task_keys` 和进度数据判断。 + +--- + +### 7. `daily_task_service.go` 错误被忽略 + +**文件**:`backend/services/taskService/service/daily_task_service.go:229,234` + +```go +// 发放水晶奖励 +if def.CrystalReward > 0 { + s.userClient.UpdateCrystalBalance(ctx, userID, starID, def.CrystalReward) // ← 错误被忽略 +} + +// 发放经验奖励 +if def.ExpReward > 0 { + s.userClient.AddExperience(ctx, userID, starID, def.ExpReward) // ← 错误被忽略 +} +``` + +**影响**:`ClaimAllDailyTasks` 中奖励发放失败时不报错,可能导致用户未收到奖励。 + +**修复**:添加错误处理和日志。 + +--- + +## 四、已完成的修复 + +### 8. `AddExperience` RPC 已添加 + +- **user.proto**:已添加 `AddExperienceRequest/Response` 消息和 RPC 定义 +- **user_rpc_client.go**:已有完整实现 + +等待 proto 编译后即可使用。 + +--- + +## 五、待执行的操作清单 + +| # | 操作 | 优先级 | 依赖 | +|---|------|--------|------| +| 1 | 执行 `sh scripts/compile-proto.sh` | P0 | go + protoc 环境 | +| 2 | 添加 `UpdateOnboardingProgress` 到 onboarding_repo | P0 | 无 | +| 3 | 添加 `ListCompletedDailyTasks` 到 daily_task_repo | P0 | 无 | +| 4 | 修复 main.go Dubbo 注册代码 | P0 | #1 | +| 5 | 修复 guide.vue isTaskCompleted | P1 | 无 | +| 6 | 删除 onboarding_service.go 死代码 | P2 | 无 | +| 7 | 修复 ClaimAllDailyTasks 错误处理 | P2 | 无 | + +--- + +## 六、执行顺序建议 + +``` +1. 先执行 proto 编译(#1) + ↓ +2. 修复 onboarding_repo + daily_task_repo(#2, #3) + ↓ +3. 修复 main.go 注册代码(#4) + ↓ +4. 编译验证:go build ./... + ↓ +5. 修复前端 guide.vue(#5) + ↓ +6. 可选优化(#6, #7) +``` diff --git a/frontend/pages.json b/frontend/pages.json index 82d208a..2bd74a3 100644 --- a/frontend/pages.json +++ b/frontend/pages.json @@ -151,6 +151,24 @@ "style": { "navigationStyle": "custom" } + }, + { + "path": "pages/tasks/daily-tasks", + "style": { + "navigationStyle": "custom" + } + }, + { + "path": "pages/tasks/guide", + "style": { + "navigationStyle": "custom" + } + }, + { + "path": "pages/tasks/revenue", + "style": { + "navigationStyle": "custom" + } } ], "globalStyle": { diff --git a/frontend/pages/tasks/daily-tasks.vue b/frontend/pages/tasks/daily-tasks.vue new file mode 100644 index 0000000..3863320 --- /dev/null +++ b/frontend/pages/tasks/daily-tasks.vue @@ -0,0 +1,353 @@ + + + + + diff --git a/frontend/pages/tasks/guide.vue b/frontend/pages/tasks/guide.vue new file mode 100644 index 0000000..1c985d0 --- /dev/null +++ b/frontend/pages/tasks/guide.vue @@ -0,0 +1,424 @@ + + + + + diff --git a/frontend/pages/tasks/revenue.vue b/frontend/pages/tasks/revenue.vue new file mode 100644 index 0000000..53180c5 --- /dev/null +++ b/frontend/pages/tasks/revenue.vue @@ -0,0 +1,488 @@ + + + + + diff --git a/frontend/utils/task-api.js b/frontend/utils/task-api.js new file mode 100644 index 0000000..bcd0d7f --- /dev/null +++ b/frontend/utils/task-api.js @@ -0,0 +1,103 @@ +/** + * taskService API 调用封装 + * 所有 API 通过 gateway 路由到 taskService (Triple/HTTP) + * baseURL 已配置在 api.js 中指向 gateway (端口 8080) + */ + +import { request } from './api.js' + +/** + * 获取每日任务列表 + * @param {number} starId - 明星ID + * @returns {Promise} + */ +export const getDailyTasks = (starId) => + request({ url: '/api/tasks/daily', data: { star_id: starId } }) + +/** + * 上报任务事件 + * @param {string} eventType - 事件类型(如 daily_login, daily_browse_asset) + * @param {number} starId - 明星ID + * @returns {Promise} + */ +export const reportEvent = (eventType, starId) => + request({ url: '/api/tasks/report-event', method: 'POST', data: { event_type: eventType, star_id: starId } }) + +/** + * 领取单个每日任务奖励 + * @param {string} taskKey - 任务key + * @param {number} starId - 明星ID + * @returns {Promise} + */ +export const claimDailyTask = (taskKey, starId) => + request({ url: '/api/tasks/daily/claim', method: 'POST', data: { task_key: taskKey, star_id: starId } }) + +/** + * 一键领取所有已完成每日任务 + * @param {number} starId - 明星ID + * @returns {Promise} + */ +export const claimAllDailyTasks = (starId) => + request({ url: '/api/tasks/daily/claim-all', method: 'POST', data: { star_id: starId } }) + +/** + * 完成引导步骤 + * @param {string} taskKey - 引导任务key(如 square_home, profile_edit) + * @returns {Promise} + */ +export const completeGuide = (taskKey) => + request({ url: '/api/tasks/guide/complete', method: 'POST', data: { task_key: taskKey } }) + +/** + * 获取引导状态 + * @returns {Promise} + */ +export const getOnboardingStatus = () => + request({ url: '/api/tasks/onboarding/status' }) + +/** + * 进入下一阶段 + * @param {number} targetStage - 目标阶段 + * @returns {Promise} + */ +export const advanceStage = (targetStage) => + request({ url: '/api/tasks/onboarding/advance-stage', method: 'POST', data: { target_stage: targetStage } }) + +/** + * 领取引导阶段奖励 + * @param {number} stage - 阶段号 + * @returns {Promise} + */ +export const claimOnboardingReward = (stage) => + request({ url: '/api/tasks/onboarding/claim-reward', method: 'POST', data: { stage } }) + +/** + * 获取展示收益列表 + * @param {number} starId - 明星ID + * @param {string} status - 状态筛选(claimable/claimed),可选 + * @param {number} page - 页码 + * @param {number} pageSize - 每页条数 + * @returns {Promise} + */ +export const getExhibitionRevenue = (starId, status, page, pageSize) => + request({ + url: '/api/tasks/exhibition-revenue', + data: { star_id: starId, status, page, page_size: pageSize } + }) + +/** + * 领取单个展示收益 + * @param {number} revenueId - 收益记录ID + * @param {number} starId - 明星ID + * @returns {Promise} + */ +export const claimExhibitionRevenue = (revenueId, starId) => + request({ url: '/api/tasks/exhibition-revenue/claim', method: 'POST', data: { revenue_id: revenueId, star_id: starId } }) + +/** + * 一键领取所有可领取展示收益 + * @param {number} starId - 明星ID + * @returns {Promise} + */ +export const claimAllExhibitionRevenue = (starId) => + request({ url: '/api/tasks/exhibition-revenue/claim-all', method: 'POST', data: { star_id: starId } }) diff --git a/image.png b/image.png deleted file mode 100644 index 01c1380..0000000 Binary files a/image.png and /dev/null differ diff --git a/微信图片_20260410124013_22_105.png b/微信图片_20260410124013_22_105.png deleted file mode 100644 index d74b9ec..0000000 Binary files a/微信图片_20260410124013_22_105.png and /dev/null differ diff --git a/微信图片_20260410124027_23_105.png b/微信图片_20260410124027_23_105.png deleted file mode 100644 index a9c277f..0000000 Binary files a/微信图片_20260410124027_23_105.png and /dev/null differ