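taskService Implementation Plan
For agentic workers: Required sub-skill: use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task by task. Steps are tracked with checkbox (- [ ]) syntax.
Goal: Implement the taskService backend (Go, Dubbo-go), including the mobile API and the mobile frontend pages (daily tasks, onboarding flow, exhibition revenue).
Architecture:
- taskService: Go Dubbo-go service on port 20005, exposing the HTTP/Triple mobile API and an internal RPC (TaskInternalService)
- Mobile frontend: Vue/uni-app pages that call the taskService HTTP API through the gateway
- A shared PostgreSQL database stores the task data
- taskService calls the userService RPC to grant crystal/experience rewards
Tech stack: Go (dubbo-go v3), GORM, PostgreSQL, Vue/uni-app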
File structure
Backend (taskService)
backend/
├── proto/
│   └── task.proto  # New: TaskMobileService + TaskInternalService definitions
├── pkg/
│   └── proto/
│       └── task/
│           ├── task.pb.go  # New: generated by the proto compiler
│           └── task.triple.go  # New: generated by the proto compiler
└── services/
    └── taskService/  # New
        ├── main.go  # New: service entry point
        ├── config/
        │   └── task_config.go  # New: configuration
        ├── model/
        │   └── task_models.go  # New: GORM models for all task tables
        ├── repository/
        │   ├── daily_task_repo.go  # New
        │   ├── onboarding_repo.go  # New
        │   └── revenue_repo.go  # New
        ├── service/
        │   ├── daily_task_service.go  # New
        │   ├── onboarding_service.go  # New
        │   └── revenue_service.go  # New
        ├── provider/
        │   ├── task_mobile_provider.go  # New: mobile API (HTTP/Triple)
        │   └── task_internal_provider.go  # New: internal RPC (TaskInternalService)
        ├── worker/
        │   └── daily_reset_worker.go  # New: daily 05:00 reset + auto-claim
        └── client/
            └── user_rpc_client.go  # New: calls userService UpdateCrystalBalance and AddExperience
Frontend (mobile)
frontend/
├── pages/
│   └── tasks/  # New: task pages directory
│       ├── daily-tasks.vue  # New: daily tasks page
│       ├── guide.vue  # New: onboarding guide page
│       └── revenue.vue  # New: exhibition revenue page
├── utils/
│   └── task-api.js  # New: wrapper for taskService API calls
└── pages.json  # Modified: adds the new page routes
Database migration
backend/
└── scripts/
    └── v001_init_task_tables.sql  # New: DDL for the 7 task-related tables
Key prerequisite: AddExperience must first be added to user.proto
The design doc (line 101) requires an AddExperience RPC on UserSocialService, but the current backend/proto/user.proto does not define it. taskService depends on this RPC at compile time, so it must be added first.
Modify: backend/proto/user.proto, adding the following for UserSocialService:
// Request to update experience (internal RPC, used by taskService to grant experience rewards)
message AddExperienceRequest {
int64 user_id = 1;
int64 star_id = 2;
int64 delta = 3;
}
message AddExperienceResponse {
topfans.common.BaseResponse base = 1;
int64 new_experience = 2;
}
// Add inside service UserSocialService:
rpc AddExperience(AddExperienceRequest) returns (AddExperienceResponse);
After editing user.proto, run: cd backend && sh scripts/compile-proto.sh
Phase 1: taskService backend infrastructure
Task 1: Proto definitions
Files:
- New: backend/proto/task.proto
syntax = "proto3";
package topfans.task;
option go_package = "github.com/topfans/backend/pkg/proto/task;task";
import "proto/common.proto";
import "google/api/annotations.proto";
// ==================== Daily tasks ====================
message DailyTaskItem {
string task_key = 1;
int64 star_id = 2;
string name = 3;
string description = 4;
int64 crystal_reward = 5;
int64 exp_reward = 6;
string status = 7; // pending/completed/claimed
bool can_claim = 8;
}
message GetDailyTasksRequest {
int64 star_id = 1;
}
message GetDailyTasksResponse {
topfans.common.BaseResponse base = 1;
int64 star_id = 2;
repeated DailyTaskItem tasks = 3;
}
message ReportEventRequest {
string event_type = 1; // e.g. "daily_browse_asset", "daily_login"
int64 star_id = 2;
}
message ReportEventResponse {
topfans.common.BaseResponse base = 1;
bool success = 2;
string task_key = 3;
bool task_completed = 4;
string message = 5;
}
message ClaimDailyTaskRequest {
string task_key = 1;
int64 star_id = 2;
}
message ClaimDailyTaskResponse {
topfans.common.BaseResponse base = 1;
bool success = 2;
}
message ClaimAllDailyTasksRequest {
int64 star_id = 1;
}
message ClaimAllDailyTasksResponse {
topfans.common.BaseResponse base = 1;
int32 claimed_count = 2;
}
// ==================== Onboarding tasks ====================
message OnboardingStage {
int32 stage = 1;
string name = 2;
repeated string required_task_keys = 3;
int64 crystal_reward = 4;
int64 exp_reward = 5;
string status = 6; // pending/completed/in_progress
bool is_current = 7;
}
message CompleteGuideRequest {
string task_key = 1;
}
message CompleteGuideResponse {
topfans.common.BaseResponse base = 1;
int64 user_id = 2;
int32 current_stage = 3;
string status = 4; // pending/in_progress/completed
repeated OnboardingStage stages = 5;
}
message GetOnboardingStatusRequest {}
message GetOnboardingStatusResponse {
topfans.common.BaseResponse base = 1;
int64 user_id = 2;
int32 current_stage = 3;
string status = 4;
repeated OnboardingStage stages = 5;
}
message AdvanceStageRequest {
int32 target_stage = 1;
}
message AdvanceStageResponse {
topfans.common.BaseResponse base = 1;
int32 current_stage = 2;
string status = 3;
repeated OnboardingStage stages = 4;
}
message ClaimOnboardingRewardRequest {
int32 stage = 1;
}
message ClaimOnboardingRewardResponse {
topfans.common.BaseResponse base = 1;
bool success = 2;
}
// ==================== Exhibition revenue ====================
message ExhibitionRevenueItem {
int64 id = 1;
int64 star_id = 2;
int64 exhibition_id = 3;
int64 asset_id = 4;
int64 slot_id = 5;
string slot_type = 6; // own/friend
int64 crystal_amount = 7;
int64 cycle_start_time = 8;
int64 cycle_end_time = 9;
string status = 10; // claimable/claimed/failed
bool can_claim = 11;
}
message GetExhibitionRevenueRequest {
int64 star_id = 1;
string status = 2; // optional filter
int32 page = 3;
int32 page_size = 4;
}
message GetExhibitionRevenueResponse {
topfans.common.BaseResponse base = 1;
repeated ExhibitionRevenueItem items = 2;
int64 total = 3;
int32 page = 4;
int32 page_size = 5;
}
message ClaimExhibitionRevenueRequest {
int64 revenue_id = 1;
int64 star_id = 2;
}
message ClaimExhibitionRevenueResponse {
topfans.common.BaseResponse base = 1;
bool success = 2;
}
message ClaimAllExhibitionRevenueRequest {
int64 star_id = 1;
}
message ClaimAllExhibitionRevenueResponse {
topfans.common.BaseResponse base = 1;
int32 claimed_count = 2;
}
// ==================== Internal RPC messages ====================
message InitUserTasksRequest {
int64 user_id = 1;
int64 star_id = 2;
}
message InitUserTasksResponse {
topfans.common.BaseResponse base = 1;
bool success = 2;
}
message OnExhibitionCompletedRequest {
int64 exhibition_id = 1;
int64 asset_id = 2;
int64 slot_id = 3;
int64 occupier_uid = 4;
int64 occupier_star_id = 5;
int64 slot_owner_uid = 6;
int64 crystal_amount = 7;
int64 start_time = 8;
int64 expire_at = 9;
}
message OnExhibitionCompletedResponse {
topfans.common.BaseResponse base = 1;
int64 revenue_record_id = 2;
}
// ==================== Mobile API Service ====================
service TaskMobileService {
rpc GetDailyTasks(GetDailyTasksRequest) returns (GetDailyTasksResponse) {
option (google.api.http) = { get: "/api/tasks/daily"; };
}
rpc ReportEvent(ReportEventRequest) returns (ReportEventResponse) {
option (google.api.http) = { post: "/api/tasks/report-event"; body: "*"; };
}
rpc ClaimDailyTask(ClaimDailyTaskRequest) returns (ClaimDailyTaskResponse) {
option (google.api.http) = { post: "/api/tasks/daily/claim"; body: "*"; };
}
rpc ClaimAllDailyTasks(ClaimAllDailyTasksRequest) returns (ClaimAllDailyTasksResponse) {
option (google.api.http) = { post: "/api/tasks/daily/claim-all"; body: "*"; };
}
rpc CompleteGuide(CompleteGuideRequest) returns (CompleteGuideResponse) {
option (google.api.http) = { post: "/api/tasks/guide/complete"; body: "*"; };
}
rpc GetOnboardingStatus(GetOnboardingStatusRequest) returns (GetOnboardingStatusResponse) {
option (google.api.http) = { get: "/api/tasks/onboarding/status"; };
}
rpc AdvanceStage(AdvanceStageRequest) returns (AdvanceStageResponse) {
option (google.api.http) = { post: "/api/tasks/onboarding/advance-stage"; body: "*"; };
}
rpc ClaimOnboardingReward(ClaimOnboardingRewardRequest) returns (ClaimOnboardingRewardResponse) {
option (google.api.http) = { post: "/api/tasks/onboarding/claim-reward"; body: "*"; };
}
rpc GetExhibitionRevenue(GetExhibitionRevenueRequest) returns (GetExhibitionRevenueResponse) {
option (google.api.http) = { get: "/api/tasks/exhibition-revenue"; };
}
rpc ClaimExhibitionRevenue(ClaimExhibitionRevenueRequest) returns (ClaimExhibitionRevenueResponse) {
option (google.api.http) = { post: "/api/tasks/exhibition-revenue/claim"; body: "*"; };
}
rpc ClaimAllExhibitionRevenue(ClaimAllExhibitionRevenueRequest) returns (ClaimAllExhibitionRevenueResponse) {
option (google.api.http) = { post: "/api/tasks/exhibition-revenue/claim-all"; body: "*"; };
}
}
// ==================== Internal RPC Service ====================
service TaskInternalService {
rpc InitUserTasks(InitUserTasksRequest) returns (InitUserTasksResponse);
rpc OnExhibitionCompleted(OnExhibitionCompletedRequest) returns (OnExhibitionCompletedResponse);
}
Task 2: Database models
Files:
- New: backend/services/taskService/model/task_models.go
package model
// TaskDefinition maps the task definition table.
type TaskDefinition struct {
ID int64 `gorm:"primaryKey;column:id;autoIncrement"`
StarID *int64 `gorm:"column:star_id;index"` // NULL = global default
TaskKey string `gorm:"column:task_key;size:50;not null"`
TaskType string `gorm:"column:task_type;size:20;not null"` // daily/onboarding
Name string `gorm:"column:name;size:100;not null"`
Description string `gorm:"column:description;type:text"`
CrystalReward int64 `gorm:"column:crystal_reward;default:0"`
ExpReward int64 `gorm:"column:exp_reward;default:0"`
SortOrder int `gorm:"column:sort_order;default:0"`
IsActive bool `gorm:"column:is_active;default:true"`
CreatedAt int64 `gorm:"column:created_at"`
UpdatedAt int64 `gorm:"column:updated_at"`
}
func (TaskDefinition) TableName() string { return "task_definitions" }
// UserDailyTaskProgress maps the daily task progress table.
type UserDailyTaskProgress struct {
ID int64 `gorm:"primaryKey;column:id;autoIncrement"`
UserID int64 `gorm:"column:user_id;not null;index:idx_daily_user_star_key"`
StarID int64 `gorm:"column:star_id;not null;index:idx_daily_user_star_key"`
TaskKey string `gorm:"column:task_key;size:50;not null;index:idx_daily_user_star_key"`
Status string `gorm:"column:status;size:20;default:pending"` // pending/completed/claimed
CompletedAt *int64 `gorm:"column:completed_at"`
ClaimedAt *int64 `gorm:"column:claimed_at"`
CreatedAt int64 `gorm:"column:created_at"`
UpdatedAt int64 `gorm:"column:updated_at"`
}
func (UserDailyTaskProgress) TableName() string { return "user_daily_task_progress" }
// UserOnboardingProgress maps the onboarding task progress table.
type UserOnboardingProgress struct {
ID int64 `gorm:"primaryKey;column:id;autoIncrement"`
UserID int64 `gorm:"column:user_id;not null;index:idx_onboard_user_key"`
TaskKey string `gorm:"column:task_key;size:50;not null;index:idx_onboard_user_key"`
Status string `gorm:"column:status;size:20;default:pending"`
CompletedAt *int64 `gorm:"column:completed_at"`
ClaimedAt *int64 `gorm:"column:claimed_at"`
CreatedAt int64 `gorm:"column:created_at"`
UpdatedAt int64 `gorm:"column:updated_at"`
}
func (UserOnboardingProgress) TableName() string { return "user_onboarding_progress" }
// UserOnboardingStatus maps the onboarding flow status table (per user, not per star).
type UserOnboardingStatus struct {
UserID int64 `gorm:"primaryKey;column:user_id"`
CurrentStage int `gorm:"column:current_stage;default:0"`
Status string `gorm:"column:status;size:20;default:pending"`
IsFirstLoginBonusClaimed bool `gorm:"column:is_first_login_bonus_claimed;default:false"` // deprecated field
HasFriendDisplayBonus bool `gorm:"column:has_friend_display_bonus;default:false"` // deprecated field
CompletedAt *int64 `gorm:"column:completed_at"`
ClaimedAt *int64 `gorm:"column:claimed_at"`
CreatedAt int64 `gorm:"column:created_at"`
UpdatedAt int64 `gorm:"column:updated_at"`
}
func (UserOnboardingStatus) TableName() string { return "user_onboarding_status" }
// OnboardingStageConfig maps the onboarding stage configuration table.
type OnboardingStageConfig struct {
ID int64 `gorm:"primaryKey;column:id;autoIncrement"`
Stage int `gorm:"column:stage;not null;uniqueIndex"`
Name string `gorm:"column:name;size:100;not null"`
Description string `gorm:"column:description;type:text"`
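RequiredTaskKeys []string `gorm:"column:required_task_keys;type:text[]"` // PostgreSQL array; a plain []string needs a Scanner/Valuer (for example pq.StringArray from github.com/lib/pq) to round-trip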
CrystalReward int64 `gorm:"column:crystal_reward;default:0"`
ExpReward int64 `gorm:"column:exp_reward;default:0"`
SortOrder int `gorm:"column:sort_order;default:0"`
IsActive bool `gorm:"column:is_active;default:true"`
CreatedAt int64 `gorm:"column:created_at"`
UpdatedAt int64 `gorm:"column:updated_at"`
}
func (OnboardingStageConfig) TableName() string { return "onboarding_stage_config" }
// ExhibitionRevenueRecord maps the exhibition revenue record table.
type ExhibitionRevenueRecord struct {
ID int64 `gorm:"primaryKey;column:id;autoIncrement"`
UserID int64 `gorm:"column:user_id;not null;index:idx_revenue_user_star"`
StarID int64 `gorm:"column:star_id;not null;index:idx_revenue_user_star"`
ExhibitionID int64 `gorm:"column:exhibition_id;not null"`
AssetID int64 `gorm:"column:asset_id;not null"`
SlotID int64 `gorm:"column:slot_id;not null"`
SlotOwnerUID int64 `gorm:"column:slot_owner_uid;not null"`
SlotType string `gorm:"column:slot_type;size:20;not null"` // own/friend
CrystalAmount int64 `gorm:"column:crystal_amount;not null"`
CycleStartTime int64 `gorm:"column:cycle_start_time;not null"`
CycleEndTime int64 `gorm:"column:cycle_end_time;not null"`
Status string `gorm:"column:status;size:20;default:claimable"` // claimable/claimed/failed
ClaimedAt *int64 `gorm:"column:claimed_at"`
CreatedAt int64 `gorm:"column:created_at"`
}
func (ExhibitionRevenueRecord) TableName() string { return "exhibition_revenue_records" }
// TaskResetLog maps the reset log table.
type TaskResetLog struct {
ID int64 `gorm:"primaryKey;column:id;autoIncrement"`
ResetType string `gorm:"column:reset_type;size:20;not null"` // daily
LastResetAt int64 `gorm:"column:last_reset_at;not null"`
CreatedAt int64 `gorm:"column:created_at"`
}
func (TaskResetLog) TableName() string { return "task_reset_log" }
Task 3: Configuration
Files:
- New: backend/services/taskService/config/task_config.go
package config
import (
"flag"
"log"
"os"
"strconv"
)
type DatabaseConfig struct {
Host, Password, DBName, SSLMode, TimeZone string
Port int
User string
}
type ServiceURLs struct {
UserService string
}
type WorkerConfig struct {
ResetHour, ResetMinute int
RevenueBatchSize int
RevenueMaxRetries int
}
var (
DBConfig = &DatabaseConfig{}
ServiceURLsConfig = &ServiceURLs{UserService: "tri://localhost:20000"}
WorkerConfigData = &WorkerConfig{
ResetHour: 5, ResetMinute: 0,
RevenueBatchSize: 100, RevenueMaxRetries: 3,
}
)
func getEnv(key, fallback string) string {
if v := os.Getenv(key); v != "" { return v }
return fallback
}
func getEnvInt(key string, fallback int) int {
if v := os.Getenv(key); v != "" {
if n, err := strconv.Atoi(v); err == nil { return n }
}
return fallback
}
func InitConfig() {
	flag.StringVar(&DBConfig.Host, "db-host", getEnv("DB_HOST", "localhost"), "database host")
	flag.IntVar(&DBConfig.Port, "db-port", getEnvInt("DB_PORT", 5432), "database port")
	flag.StringVar(&DBConfig.User, "db-user", getEnv("DB_USER", "postgres"), "database user")
	flag.StringVar(&DBConfig.Password, "db-password", getEnv("DB_PASSWORD", ""), "database password")
	flag.StringVar(&DBConfig.DBName, "db-name", getEnv("DB_NAME", "topfans"), "database name")
	flag.StringVar(&DBConfig.SSLMode, "db-sslmode", "disable", "database SSL mode")
	flag.StringVar(&ServiceURLsConfig.UserService, "user-service-url", getEnv("USER_SERVICE_URL", "tri://localhost:20000"), "User Service RPC address")
	flag.IntVar(&WorkerConfigData.ResetHour, "reset-hour", getEnvInt("RESET_HOUR", 5), "daily reset hour (Asia/Shanghai)")
	flag.IntVar(&WorkerConfigData.ResetMinute, "reset-minute", getEnvInt("RESET_MINUTE", 0), "daily reset minute")
	flag.IntVar(&WorkerConfigData.RevenueBatchSize, "revenue-batch-size", getEnvInt("REVENUE_BATCH_SIZE", 100), "batch size for revenue auto-claim")
	flag.IntVar(&WorkerConfigData.RevenueMaxRetries, "revenue-max-retries", getEnvInt("REVENUE_MAX_RETRIES", 3), "max retries for revenue auto-claim")
	// This is the only flag.Parse() call the service needs; it also parses flags declared in other packages.
	flag.Parse()
	log.Println("taskService config initialized")
	log.Printf(" Database: %s:%d/%s", DBConfig.Host, DBConfig.Port, DBConfig.DBName)
	log.Printf(" User Service: %s", ServiceURLsConfig.UserService)
	log.Printf(" Reset time: %02d:%02d Asia/Shanghai", WorkerConfigData.ResetHour, WorkerConfigData.ResetMinute)
}
Task 4: userService RPC client
Files:
- New: backend/services/taskService/client/user_rpc_client.go
Follow the pattern in assetService/client/user_rpc_client.go. The client wrapper holds the service interface (not a *client.Client). The service interface itself is created in main.go by calling pbUser.NewUserSocialService.
package client

import (
	"context"
	"fmt"

	"github.com/topfans/backend/pkg/logger"
	pbUser "github.com/topfans/backend/pkg/proto/user"
	"go.uber.org/zap"
)

// UserServiceClient is the subset of userService RPCs that taskService uses to grant rewards.
type UserServiceClient interface {
	UpdateCrystalBalance(ctx context.Context, userID, starID int64, delta int64) (int64, error)
	AddExperience(ctx context.Context, userID, starID int64, delta int64) (int64, error)
}

type userServiceClient struct {
	client pbUser.UserSocialService
}

// NewUserServiceClient wraps the generated service interface created in main.go.
func NewUserServiceClient(client pbUser.UserSocialService) UserServiceClient {
	return &userServiceClient{client: client}
}

// UpdateCrystalBalance applies delta to the user's crystal balance and returns the new balance.
func (c *userServiceClient) UpdateCrystalBalance(ctx context.Context, userID, starID int64, delta int64) (int64, error) {
	logger.Logger.Debug("Calling UserService.UpdateCrystalBalance",
		zap.Int64("user_id", userID), zap.Int64("star_id", starID), zap.Int64("delta", delta))
	resp, err := c.client.UpdateCrystalBalance(ctx, &pbUser.UpdateCrystalBalanceRequest{
		UserId: userID, StarId: starID, Delta: delta,
	})
	if err != nil {
		logger.Logger.Error("UserService.UpdateCrystalBalance failed", zap.Error(err))
		return 0, err
	}
	// Generated getters are nil-safe, so a missing Base does not panic.
	if code := resp.GetBase().GetCode(); code != 0 {
		// err is nil on this path, so return an explicit error; (0, nil) would look like a successful grant.
		logger.Logger.Warn("UpdateCrystalBalance non-zero code", zap.Int32("code", int32(code)))
		return 0, fmt.Errorf("UpdateCrystalBalance returned code %d", int32(code))
	}
	return resp.NewBalance, nil
}

// AddExperience applies delta to the user's experience and returns the new total.
func (c *userServiceClient) AddExperience(ctx context.Context, userID, starID int64, delta int64) (int64, error) {
	logger.Logger.Debug("Calling UserService.AddExperience",
		zap.Int64("user_id", userID), zap.Int64("star_id", starID), zap.Int64("delta", delta))
	resp, err := c.client.AddExperience(ctx, &pbUser.AddExperienceRequest{
		UserId: userID, StarId: starID, Delta: delta,
	})
	if err != nil {
		logger.Logger.Error("UserService.AddExperience failed", zap.Error(err))
		return 0, err
	}
	if code := resp.GetBase().GetCode(); code != 0 {
		logger.Logger.Warn("AddExperience non-zero code", zap.Int32("code", int32(code)))
		return 0, fmt.Errorf("AddExperience returned code %d", int32(code))
	}
	return resp.NewExperience, nil
}
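Because the service layer depends on the UserServiceClient interface rather than on a Dubbo client, it can be exercised without a running userService. The following is a minimal sketch of that idea. The fakeUserClient type, the grantReward helper, and runDemo are hypothetical names introduced only for illustration; the interface is copied from the plan.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// UserServiceClient mirrors the interface defined in user_rpc_client.go.
type UserServiceClient interface {
	UpdateCrystalBalance(ctx context.Context, userID, starID int64, delta int64) (int64, error)
	AddExperience(ctx context.Context, userID, starID int64, delta int64) (int64, error)
}

// account identifies one (user, star) pair.
type account struct{ userID, starID int64 }

// fakeUserClient is a hypothetical in-memory test double; it is not part of the plan.
type fakeUserClient struct {
	crystals map[account]int64
	exp      map[account]int64
	failNext bool // when true, the next crystal update fails once
}

func newFakeUserClient() *fakeUserClient {
	return &fakeUserClient{crystals: map[account]int64{}, exp: map[account]int64{}}
}

func (f *fakeUserClient) UpdateCrystalBalance(_ context.Context, userID, starID, delta int64) (int64, error) {
	if f.failNext {
		f.failNext = false
		return 0, errors.New("simulated RPC failure")
	}
	k := account{userID, starID}
	f.crystals[k] += delta
	return f.crystals[k], nil
}

func (f *fakeUserClient) AddExperience(_ context.Context, userID, starID, delta int64) (int64, error) {
	k := account{userID, starID}
	f.exp[k] += delta
	return f.exp[k], nil
}

// grantReward depends only on the interface, as a service method would.
func grantReward(ctx context.Context, c UserServiceClient, userID, starID, crystals, exp int64) error {
	if _, err := c.UpdateCrystalBalance(ctx, userID, starID, crystals); err != nil {
		return fmt.Errorf("grant crystals: %w", err)
	}
	if _, err := c.AddExperience(ctx, userID, starID, exp); err != nil {
		return fmt.Errorf("grant experience: %w", err)
	}
	return nil
}

// runDemo grants 50 crystals and 10 experience to user 1 under star 7.
func runDemo(fail bool) (crystals, exp int64, err error) {
	fake := newFakeUserClient()
	fake.failNext = fail
	err = grantReward(context.Background(), fake, 1, 7, 50, 10)
	return fake.crystals[account{1, 7}], fake.exp[account{1, 7}], err
}

func main() {
	fmt.Println(runDemo(false))
	fmt.Println(runDemo(true))
}
```

In this sketch a failed crystal grant stops before experience is granted; whether the real service should roll back or retry partial grants is a design decision the plan leaves open.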
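Phase 2: Repository layer
Task 5: Daily task repository
Files:
- New: backend/services/taskService/repository/daily_task_repo.go
Main methods:
- GetUserDailyProgress(userID, starID, taskKey) (*model.UserDailyTaskProgress, error)
- GetOrCreateDailyProgress(userID, starID, taskKey, def *model.TaskDefinition) (*model.UserDailyTaskProgress, error)
- ListDailyTasksByUser(userID, starID) ([]*model.UserDailyTaskProgress, error)
- ListActiveDailyTaskDefinitions(starID) ([]*model.TaskDefinition, error): returns the star_id-specific definitions plus the global defaults
- UpdateDailyProgress(progress *model.UserDailyTaskProgress) error
- ResetAllDailyTasks() (int64, error): resets every non-pending record to pending
- InitDailyTasksForUser(userID, starID) error: called from the InitUserTasks RPC; creates progress rows for all daily tasks under this star_id for the user
- AcquireAdvisoryLock(lockID int64) bool: runs SELECT pg_try_advisory_lock($1) and returns whether the lock was acquired
- ReleaseAdvisoryLock(lockID int64): runs SELECT pg_advisory_unlock($1)
These advisory locks are session-scoped in PostgreSQL, so the acquire and release calls must run on the same database connection; with a pooled GORM handle that means pinning one connection for the duration of the lock.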
Task 6: Onboarding repository
Files:
- New: backend/services/taskService/repository/onboarding_repo.go
Main methods:
- GetOnboardingStatus(userID int64) (*model.UserOnboardingStatus, error)
- GetOrCreateOnboardingStatus(userID int64) (*model.UserOnboardingStatus, error): fetches the row or creates it (current_stage=0, status=pending)
- UpdateOnboardingStatus(status *model.UserOnboardingStatus) error
- GetUserOnboardingProgress(userID int64, taskKey string) (*model.UserOnboardingProgress, error)
- GetOrCreateOnboardingProgress(userID int64, taskKey string) (*model.UserOnboardingProgress, error)
- ListActiveStageConfigs() ([]*model.OnboardingStageConfig, error)
- ListUserOnboardingProgressByUser(userID int64) ([]*model.UserOnboardingProgress, error)
- GetStageConfig(stage int) (*model.OnboardingStageConfig, error)
Task 7: Revenue repository
Files:
- New: backend/services/taskService/repository/revenue_repo.go
Main methods:
- CreateRevenueRecord(record *model.ExhibitionRevenueRecord) (*model.ExhibitionRevenueRecord, error)
- GetRevenueRecord(id int64) (*model.ExhibitionRevenueRecord, error)
- ListRevenueByUser(userID, starID int64, status string, page, pageSize int) ([]*model.ExhibitionRevenueRecord, int64, error)
- ClaimRevenueRecord(id int64, userID int64) (bool, error): optimistic lock; updates the row only when status='claimable' and returns whether the update succeeded
- UpdateRevenueStatus(id int64, status string) error: called by the worker after auto-claim (claimed/failed)
- ListClaimableRevenue(limit int) ([]*model.ExhibitionRevenueRecord, error): used by the worker's auto-claim, with LIMIT batchSize
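The optimistic-lock behaviour of ClaimRevenueRecord can be illustrated without a database. The sketch below uses a hypothetical in-memory store (memStore, revenueRow, and raceClaims are illustrative names, not part of the plan) in which a mutex stands in for the row lock PostgreSQL takes during an UPDATE. The SQL in the comment is an assumed shape for the real statement, not something the plan specifies.

```go
package main

import (
	"fmt"
	"sync"
)

// revenueRow is a hypothetical stand-in for one exhibition_revenue_records row.
type revenueRow struct {
	userID int64
	status string
}

// memStore imitates the table; the mutex plays the role of the database row lock.
type memStore struct {
	mu   sync.Mutex
	rows map[int64]*revenueRow
}

// claimRevenueRecord mirrors a conditional update such as (assumed SQL):
//
//	UPDATE exhibition_revenue_records SET status = 'claimed'
//	WHERE id = ? AND user_id = ? AND status = 'claimable'
//
// and reports whether exactly one row changed.
func (s *memStore) claimRevenueRecord(id, userID int64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	row, ok := s.rows[id]
	if !ok || row.userID != userID || row.status != "claimable" {
		return false
	}
	row.status = "claimed"
	return true
}

// raceClaims fires n concurrent claims at the same record and counts the winners.
func raceClaims(s *memStore, id, userID int64, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.claimRevenueRecord(id, userID) {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	s := &memStore{rows: map[int64]*revenueRow{1: {userID: 42, status: "claimable"}}}
	fmt.Println("winners:", raceClaims(s, 1, 42, 10))
}
```

With GORM, the equivalent check would typically be whether RowsAffected on the update result equals 1; only the caller that wins the update should go on to grant crystals.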
Phase 3: Service layer
Task 8: Daily task service
Files:
- New: backend/services/taskService/service/daily_task_service.go
Main methods:
- GetDailyTasks(ctx, userID, starID) (*pb.GetDailyTasksResponse, error): merges task definitions with the user's progress; can_claim = (status == completed)
- ReportEvent(ctx, userID, starID, eventType) (*pb.ReportEventResponse, error): maps eventType to a task_key and marks that task completed
- ClaimDailyTask(ctx, userID, starID, taskKey) (*pb.ClaimDailyTaskResponse, error): verifies the task is completed, grants the crystal and experience rewards, and marks it claimed
- ClaimAllDailyTasks(ctx, userID, starID) (*pb.ClaimAllDailyTasksResponse, error): finds all completed tasks and claims them in bulk
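The merge performed by GetDailyTasks and the event lookup in ReportEvent can be sketched as two pure functions. The struct types below are trimmed, hypothetical stand-ins for the GORM model and the proto message, and the eventToTaskKey table assumes that the two event types named in task.proto map to task keys of the same name, which the plan does not state.

```go
package main

import "fmt"

// Status values as listed in the proto comments.
const (
	StatusPending   = "pending"
	StatusCompleted = "completed"
	StatusClaimed   = "claimed"
)

// TaskDefinition is a trimmed stand-in for model.TaskDefinition.
type TaskDefinition struct {
	TaskKey       string
	Name          string
	CrystalReward int64
}

// DailyTaskItem is a trimmed stand-in for the DailyTaskItem proto message.
type DailyTaskItem struct {
	TaskKey       string
	Name          string
	CrystalReward int64
	Status        string
	CanClaim      bool
}

// mergeDailyTasks joins definitions with per-user progress (task_key -> status).
// A task with no progress row is reported as pending.
func mergeDailyTasks(defs []TaskDefinition, progress map[string]string) []DailyTaskItem {
	items := make([]DailyTaskItem, 0, len(defs))
	for _, d := range defs {
		status, ok := progress[d.TaskKey]
		if !ok {
			status = StatusPending
		}
		items = append(items, DailyTaskItem{
			TaskKey:       d.TaskKey,
			Name:          d.Name,
			CrystalReward: d.CrystalReward,
			Status:        status,
			CanClaim:      status == StatusCompleted,
		})
	}
	return items
}

// eventToTaskKey is an assumed mapping; real keys come from task_definitions.
var eventToTaskKey = map[string]string{
	"daily_login":        "daily_login",
	"daily_browse_asset": "daily_browse_asset",
}

// taskKeyForEvent returns the task key for an event type, or false if unknown.
func taskKeyForEvent(eventType string) (string, bool) {
	key, ok := eventToTaskKey[eventType]
	return key, ok
}

func main() {
	defs := []TaskDefinition{
		{TaskKey: "daily_login", Name: "Daily login", CrystalReward: 10},
		{TaskKey: "daily_browse_asset", Name: "Browse an asset", CrystalReward: 5},
	}
	progress := map[string]string{"daily_login": StatusCompleted}
	for _, item := range mergeDailyTasks(defs, progress) {
		fmt.Printf("%+v\n", item)
	}
	fmt.Println(taskKeyForEvent("daily_login"))
}
```

Rejecting unknown event types at the lookup keeps ReportEvent from creating progress rows for tasks that have no definition.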
Task 9: Onboarding service
Files:
- New: backend/services/taskService/service/onboarding_service.go
Main methods:
- GetOnboardingStatus(ctx, userID) (*pb.GetOnboardingStatusResponse, error)
- CompleteGuide(ctx, userID, taskKey) (*pb.CompleteGuideResponse, error): marks the onboarding progress entry completed (this is not a daily task) and returns the full stage list
- AdvanceStage(ctx, userID, targetStage) (*pb.AdvanceStageResponse, error): verifies that every task of the current stage is completed, then advances the stage
- ClaimOnboardingReward(ctx, userID, stage) (*pb.ClaimOnboardingRewardResponse, error): verifies that the stage is completed, grants the reward, and marks it claimed
- InitUserTasks(ctx, userID, starID) error: invoked by the TaskInternalService.InitUserTasks RPC; creates user_onboarding_status (if missing) and calls dailyRepo.InitDailyTasksForUser
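The precondition that AdvanceStage enforces can be sketched as a pure check over the stage's required_task_keys. StageConfig here is a trimmed, hypothetical view of onboarding_stage_config, "first_like" is an invented task key, and the rule that targetStage must equal the current stage plus one is an assumption the plan does not spell out.

```go
package main

import (
	"errors"
	"fmt"
)

// StageConfig is a trimmed, hypothetical view of onboarding_stage_config.
type StageConfig struct {
	Stage            int
	RequiredTaskKeys []string
}

var (
	errInvalidTarget   = errors.New("target stage must be the current stage + 1")
	errStageIncomplete = errors.New("current stage has unfinished tasks")
)

// missingTasks returns the required keys that are not yet completed.
func missingTasks(required []string, completed map[string]bool) []string {
	var missing []string
	for _, key := range required {
		if !completed[key] {
			missing = append(missing, key)
		}
	}
	return missing
}

// advanceStage returns the new stage, or the unchanged stage plus an error.
func advanceStage(current StageConfig, targetStage int, completed map[string]bool) (int, error) {
	if targetStage != current.Stage+1 {
		return current.Stage, errInvalidTarget
	}
	if missing := missingTasks(current.RequiredTaskKeys, completed); len(missing) > 0 {
		return current.Stage, fmt.Errorf("%w: %v", errStageIncomplete, missing)
	}
	return targetStage, nil
}

func main() {
	stage := StageConfig{Stage: 1, RequiredTaskKeys: []string{"square_home", "first_like"}}
	done := map[string]bool{"square_home": true}
	fmt.Println(advanceStage(stage, 2, done))
	done["first_like"] = true
	fmt.Println(advanceStage(stage, 2, done))
}
```

Returning the list of missing keys in the error gives the guide page enough detail to show which steps still block the "next stage" button.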
Task 10: Revenue service
Files:
- New: backend/services/taskService/service/revenue_service.go
Main methods:
- GetExhibitionRevenue(ctx, userID, starID, status string, page, pageSize) (*pb.GetExhibitionRevenueResponse, error)
- ClaimExhibitionRevenue(ctx, userID, starID, revenueID) (*pb.ClaimExhibitionRevenueResponse, error): uses the optimistic lock in repo.ClaimRevenueRecord
- ClaimAllExhibitionRevenue(ctx, userID, starID) (*pb.ClaimAllExhibitionRevenueResponse, error): finds all claimable revenue and claims it in bulk
- OnExhibitionCompleted(ctx, req) (*pb.OnExhibitionCompletedResponse, error): key rule: a revenue record is created only when slot_owner_uid != occupier_uid (a user's own slot produces no revenue), with status='claimable'
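The OnExhibitionCompleted rule can be sketched as a single builder function. The two structs are trimmed, hypothetical stand-ins for the request message and the GORM model. Crediting the record to the occupier (UserID = occupier_uid, StarID = occupier_star_id) and labelling it slot_type "friend" are assumptions made for illustration; the plan only fixes the ownership check and the initial status.

```go
package main

import "fmt"

// exhibitionCompleted carries the OnExhibitionCompletedRequest fields used here.
type exhibitionCompleted struct {
	ExhibitionID, AssetID, SlotID int64
	OccupierUID, OccupierStarID   int64
	SlotOwnerUID                  int64
	CrystalAmount                 int64
	StartTime, ExpireAt           int64
}

// revenueRecord is a trimmed stand-in for model.ExhibitionRevenueRecord.
type revenueRecord struct {
	UserID, StarID                int64
	ExhibitionID, AssetID, SlotID int64
	SlotOwnerUID                  int64
	SlotType                      string
	CrystalAmount                 int64
	CycleStartTime, CycleEndTime  int64
	Status                        string
}

// buildRevenueRecord returns (record, true) when a record should be created
// and (zero value, false) when the occupier owns the slot.
func buildRevenueRecord(req exhibitionCompleted) (revenueRecord, bool) {
	if req.SlotOwnerUID == req.OccupierUID {
		return revenueRecord{}, false // own slot: no revenue
	}
	return revenueRecord{
		UserID:         req.OccupierUID,    // assumption: the occupier receives the revenue
		StarID:         req.OccupierStarID, // assumption: follows the occupier's star
		ExhibitionID:   req.ExhibitionID,
		AssetID:        req.AssetID,
		SlotID:         req.SlotID,
		SlotOwnerUID:   req.SlotOwnerUID,
		SlotType:       "friend", // assumption: a slot owned by someone else
		CrystalAmount:  req.CrystalAmount,
		CycleStartTime: req.StartTime,
		CycleEndTime:   req.ExpireAt,
		Status:         "claimable",
	}, true
}

func main() {
	own := exhibitionCompleted{OccupierUID: 1, SlotOwnerUID: 1, CrystalAmount: 30}
	friend := exhibitionCompleted{OccupierUID: 1, OccupierStarID: 7, SlotOwnerUID: 2, CrystalAmount: 30}
	_, createdOwn := buildRevenueRecord(own)
	rec, createdFriend := buildRevenueRecord(friend)
	fmt.Println(createdOwn, createdFriend, rec.Status, rec.CrystalAmount)
}
```

Keeping the decision in a pure function makes the "own slot produces no revenue" rule easy to unit test separately from the repository call that persists the record.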
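Phase 4: Provider layer (API handlers)
Task 11: Mobile API provider
Files:
- New: backend/services/taskService/provider/task_mobile_provider.go
Implements the TaskMobileService interface (generated from the proto). The user_id is extracted from the Dubbo context attachments (set by the gateway auth middleware). See galleryService/provider/gallery_provider.go for reference.
Helper function (defined at the top of the file):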
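func getUserIDFromCtx(ctx context.Context) int64 {
	// Dubbo passes user_id from the gateway auth middleware through context attachments.
	// With the Triple protocol it is read from rpcCtx or the attachments.
	// The "attachments" key and the map[string]string type follow this plan; verify both against the dubbo-go version in use.
	attachments, ok := ctx.Value("attachments").(map[string]string)
	if !ok {
		// Two-value assertion: a missing or differently typed value yields 0 instead of a panic.
		return 0
	}
	id, err := strconv.ParseInt(attachments["user_id"], 10, 64)
	if err != nil || id <= 0 {
		return 0
	}
	return id
}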
Each handler: extract userID → call the service method → return the response.
Task 12: Internal RPC provider
Files:
- New: backend/services/taskService/provider/task_internal_provider.go
Implements TaskInternalService:
- InitUserTasks(ctx, req) (*pb.InitUserTasksResponse, error): reads req.UserId and req.StarId and calls onboardingService.InitUserTasks(ctx, userID, starID)
- OnExhibitionCompleted(ctx, req) (*pb.OnExhibitionCompletedResponse, error): calls revenueService.OnExhibitionCompleted
The internal RPC needs no auth middleware (it is called by other services).
Phase 5: Worker
Task 13: Daily reset worker
Files:
- New: backend/services/taskService/worker/daily_reset_worker.go
package worker
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/taskService/client"
"github.com/topfans/backend/services/taskService/config"
"github.com/topfans/backend/services/taskService/repository"
"go.uber.org/zap"
)
type DailyResetWorker struct {
dailyRepo *repository.DailyTaskRepository
revenueRepo *repository.RevenueRepository
userClient client.UserServiceClient
stopCh chan struct{}
wg sync.WaitGroup
}
func NewDailyResetWorker(dailyRepo *repository.DailyTaskRepository, revenueRepo *repository.RevenueRepository, userClient client.UserServiceClient) *DailyResetWorker {
return &DailyResetWorker{
dailyRepo: dailyRepo,
revenueRepo: revenueRepo,
userClient: userClient,
stopCh: make(chan struct{}),
}
}
func (w *DailyResetWorker) Start() {
w.wg.Add(1)
go w.runLoop()
logger.Logger.Info("DailyResetWorker started")
}
func (w *DailyResetWorker) Stop() {
close(w.stopCh)
w.wg.Wait()
logger.Logger.Info("DailyResetWorker stopped")
}
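func (w *DailyResetWorker) runLoop() {
	defer w.wg.Done()
	// Load the zone once. Asia/Shanghai has no DST, so a fixed UTC+8 offset is a safe fallback when tzdata is missing.
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		logger.Logger.Warn("Failed to load Asia/Shanghai, falling back to fixed UTC+8", zap.Error(err))
		loc = time.FixedZone("CST", 8*60*60)
	}
	for {
		// Read the clock in the target zone so the calendar date is right even when the host runs in another zone.
		now := time.Now().In(loc)
		next := time.Date(now.Year(), now.Month(), now.Day(), config.WorkerConfigData.ResetHour, config.WorkerConfigData.ResetMinute, 0, 0, loc)
		if !next.After(now) {
			next = next.AddDate(0, 0, 1)
		}
		waitDuration := next.Sub(now)
		logger.Logger.Info(fmt.Sprintf("Next daily reset at %s (in %v)", next.Format(time.RFC3339), waitDuration))
		select {
		case <-time.After(waitDuration):
			w.doResetAndAutoClaim()
		case <-w.stopCh:
			return
		}
	}
}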
func (w *DailyResetWorker) doResetAndAutoClaim() {
lockKey := time.Now().Format("20060102")
lockID, _ := strconv.ParseInt(lockKey, 10, 64)
acquired := w.dailyRepo.AcquireAdvisoryLock(lockID)
if !acquired {
logger.Logger.Info("Another instance is running daily reset, skipping")
return
}
defer w.dailyRepo.ReleaseAdvisoryLock(lockID)
// 1. Reset daily tasks
resetCount, err := w.dailyRepo.ResetAllDailyTasks()
if err != nil {
logger.Logger.Error("Failed to reset daily tasks", zap.Error(err))
} else {
logger.Logger.Info(fmt.Sprintf("Daily tasks reset: %d records updated", resetCount))
}
// 2. Auto-claim exhibition revenue
w.autoClaimExhibitionRevenue()
}
func (w *DailyResetWorker) autoClaimExhibitionRevenue() {
batchSize := config.WorkerConfigData.RevenueBatchSize
maxRetries := config.WorkerConfigData.RevenueMaxRetries
totalClaimed := 0
for {
records, err := w.revenueRepo.ListClaimableRevenue(batchSize)
if err != nil {
logger.Logger.Error("Failed to list claimable revenue", zap.Error(err))
break
}
if len(records) == 0 {
break
}
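		for _, record := range records {
			var lastErr error
			paid := false
			for attempt := 0; attempt < maxRetries; attempt++ {
				_, lastErr = w.userClient.UpdateCrystalBalance(context.Background(), record.UserID, record.StarID, record.CrystalAmount)
				if lastErr == nil {
					paid = true
					break
				}
				time.Sleep(100 * time.Millisecond)
			}
			// Decide the final status once, after the retry loop, so a record that succeeds on a later attempt is not also marked failed.
			status := "failed"
			if paid {
				status = "claimed"
				totalClaimed++
			} else {
				logger.Logger.Error("Failed to auto-claim revenue after retries",
					zap.Int64("record_id", record.ID), zap.Error(lastErr))
			}
			if err := w.revenueRepo.UpdateRevenueStatus(record.ID, status); err != nil {
				// The record would stay claimable and be listed (and paid) again by the next batch, so stop here.
				logger.Logger.Error("Failed to update revenue status",
					zap.Int64("record_id", record.ID), zap.Error(err))
				return
			}
		}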
}
logger.Logger.Info(fmt.Sprintf("Auto-claim completed: %d records claimed", totalClaimed))
}
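The scheduling arithmetic in runLoop and the lock ID derivation in doResetAndAutoClaim are easier to verify as pure functions of the current time. The sketch below is one way to factor them out; nextResetTime, dailyLockID, clock, and the fixed UTC+8 zone are hypothetical names chosen for illustration, and the fixed zone is used only so the example runs without a tz database.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// shanghai is a fixed UTC+8 zone standing in for Asia/Shanghai (which has no DST).
var shanghai = time.FixedZone("UTC+8", 8*60*60)

// nextResetTime returns the first hour:minute instant strictly after now,
// computed in now's own time zone.
func nextResetTime(now time.Time, hour, minute int) time.Time {
	next := time.Date(now.Year(), now.Month(), now.Day(), hour, minute, 0, 0, now.Location())
	if !next.After(now) {
		next = next.AddDate(0, 0, 1)
	}
	return next
}

// dailyLockID turns the calendar date into an int64 such as 20240110,
// suitable as an advisory lock key that is unique per day.
func dailyLockID(now time.Time) int64 {
	id, err := strconv.ParseInt(now.Format("20060102"), 10, 64)
	if err != nil {
		return 0 // not expected: the layout produces digits only
	}
	return id
}

// clock builds a January 2024 timestamp in the shanghai zone (demo helper).
func clock(day, hour, minute int) time.Time {
	return time.Date(2024, time.January, day, hour, minute, 0, 0, shanghai)
}

func main() {
	for _, now := range []time.Time{clock(10, 4, 30), clock(10, 5, 0), clock(31, 23, 0)} {
		next := nextResetTime(now, 5, 0)
		fmt.Printf("now=%s next=%s lock=%d\n",
			now.Format("01-02 15:04"), next.Format("01-02 15:04"), dailyLockID(next))
	}
}
```

Using "not after" rather than "before" means a wake-up that lands exactly on the reset instant schedules the following day instead of firing twice.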
Phase 6: main.go
Task 14: Service entry point
Files:
- New: backend/services/taskService/main.go
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"syscall"
dubboclient "dubbo.apache.org/dubbo-go/v3/client"
_ "dubbo.apache.org/dubbo-go/v3/imports"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/server"
"github.com/topfans/backend/pkg/database"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/pkg/proto/task"
pbUser "github.com/topfans/backend/pkg/proto/user"
"github.com/topfans/backend/services/taskService/client"
"github.com/topfans/backend/services/taskService/config"
"github.com/topfans/backend/services/taskService/model"
"github.com/topfans/backend/services/taskService/provider"
"github.com/topfans/backend/services/taskService/repository"
"github.com/topfans/backend/services/taskService/service"
"github.com/topfans/backend/services/taskService/worker"
)
var port = flag.Int("port", 20005, "Dubbo service port")
func main() {
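// Flags are parsed once inside config.InitConfig(); parsing here would reject the db-* flags, which are only registered there.
// 1. Init logger (must come first)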
env := os.Getenv("ENV")
if env == "" { env = "development" }
if err := logger.Init(logger.Config{ServiceName: "task-service", Environment: env, LogLevel: os.Getenv("LOG_LEVEL")}); err != nil {
panic(fmt.Sprintf("Failed to init logger: %v", err))
}
defer logger.Sync()
logger.Logger.Info("Starting taskService...")
// 2. Init config (registers flags, reads env, and calls flag.Parse)
config.InitConfig()
// 3. Init database + auto-migrate
dbConfig := database.Config{
Host: config.DBConfig.Host, Port: config.DBConfig.Port,
User: config.DBConfig.User, Password: config.DBConfig.Password,
DBName: config.DBConfig.DBName, SSLMode: config.DBConfig.SSLMode,
TimeZone: "Asia/Shanghai",
}
if err := database.Init(dbConfig); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to init DB: %v", err))
}
db := database.GetDB()
if err := db.AutoMigrate(
&model.TaskDefinition{},
&model.UserDailyTaskProgress{},
&model.UserOnboardingProgress{},
&model.UserOnboardingStatus{},
&model.OnboardingStageConfig{},
&model.ExhibitionRevenueRecord{},
&model.TaskResetLog{},
); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to migrate tables: %v", err))
}
logger.Logger.Info("Database initialized")
// 4. Init repositories
dailyRepo := repository.NewDailyTaskRepository(db)
onboardingRepo := repository.NewOnboardingRepository(db)
revenueRepo := repository.NewRevenueRepository(db)
logger.Logger.Info("Repositories initialized")
// 5. Init userService Dubbo client + RPC client
userCli, err := dubboclient.NewClient(dubboclient.WithClientURL(config.ServiceURLsConfig.UserService))
if err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to create userService client: %v", err))
}
userServiceClient, err := pbUser.NewUserSocialService(userCli)
if err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to get userService: %v", err))
}
userRPCClient := client.NewUserServiceClient(userServiceClient)
logger.Logger.Info("User RPC client initialized")
// 6. Init services
dailySvc := service.NewDailyTaskService(dailyRepo, userRPCClient)
onboardingSvc := service.NewOnboardingService(onboardingRepo, dailyRepo, userRPCClient)
revenueSvc := service.NewRevenueService(revenueRepo, userRPCClient)
logger.Logger.Info("Services initialized")
// 7. Init worker (Start launches its own goroutine; calling it directly keeps wg.Add ahead of any Stop)
resetWorker := worker.NewDailyResetWorker(dailyRepo, revenueRepo, userRPCClient)
resetWorker.Start()
logger.Logger.Info("Reset worker started")
// 8. Init providers
mobileProvider := provider.NewTaskMobileProvider(dailySvc, onboardingSvc, revenueSvc)
internalProvider := provider.NewTaskInternalProvider(onboardingSvc, revenueSvc)
logger.Logger.Info("Providers initialized")
// 9. Create Dubbo server on port 20005
srv, err := server.NewServer(
server.WithServerProtocol(protocol.WithPort(*port), protocol.WithTriple()),
)
if err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to create server: %v", err))
}
if err := task.RegisterTaskMobileServiceHandler(srv, mobileProvider); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to register TaskMobileService: %v", err))
}
if err := task.RegisterTaskInternalServiceHandler(srv, internalProvider); err != nil {
logger.Logger.Fatal(fmt.Sprintf("Failed to register TaskInternalService: %v", err))
}
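// Serve blocks while the server is running, so run it in a goroutine to keep the shutdown path below reachable.
go func() {
	if err := srv.Serve(); err != nil {
		logger.Logger.Fatal(fmt.Sprintf("Failed to serve: %v", err))
	}
}()
logger.Logger.Info(fmt.Sprintf("taskService starting on port %d", *port))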
// 10. Graceful shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
logger.Logger.Info("Shutting down taskService...")
resetWorker.Stop()
}
Phase 7: Proto compilation
Task 15: Proto compilation
Step 1: Modify backend/scripts/compile-proto.sh:
- Add task to the loop (around line 56)
- Add task to the cleanup block (around line 159)
- Add a task.proto compile block after the activity.proto compile block
In the mkdir loop:
for name in common user social asset gallery ranking activity task; do
Create the directory with mkdir:
mkdir -p pkg/proto/task
Add after the activity.proto compile block:
# Compile task.proto
echo "📦 Compiling task.proto ..."
protoc --proto_path=proto \
--proto_path=. \
--go_out=pkg/proto/task \
--go_opt=paths=source_relative \
--go-triple_out=pkg/proto/task \
--go-triple_opt=paths=source_relative \
task.proto
echo "✅ task.proto compiled"
task must also be added to the cleanup loop.
Step 2: Run: cd backend && sh scripts/compile-proto.sh
Phase 9: Mobile frontend
Task 16: Task API helper functions
Files:
- New: frontend/utils/task-api.js
Reuse the request wrapper from frontend/utils/api.js:
import { request } from './api.js'
// baseURL is already configured in api.js and points at the gateway (port 8080)
// All task API calls are routed through the gateway to taskService (Triple/HTTP)
export const getDailyTasks = (starId) =>
request({ url: '/api/tasks/daily', data: { star_id: starId } })
export const reportEvent = (eventType, starId) =>
request({ url: '/api/tasks/report-event', method: 'POST', data: { event_type: eventType, star_id: starId } })
export const claimDailyTask = (taskKey, starId) =>
request({ url: '/api/tasks/daily/claim', method: 'POST', data: { task_key: taskKey, star_id: starId } })
export const claimAllDailyTasks = (starId) =>
request({ url: '/api/tasks/daily/claim-all', method: 'POST', data: { star_id: starId } })
export const completeGuide = (taskKey) =>
request({ url: '/api/tasks/guide/complete', method: 'POST', data: { task_key: taskKey } })
export const getOnboardingStatus = () =>
request({ url: '/api/tasks/onboarding/status' })
export const advanceStage = (targetStage) =>
request({ url: '/api/tasks/onboarding/advance-stage', method: 'POST', data: { target_stage: targetStage } })
export const claimOnboardingReward = (stage) =>
request({ url: '/api/tasks/onboarding/claim-reward', method: 'POST', data: { stage } })
export const getExhibitionRevenue = (starId, status, page, pageSize) =>
request({ url: '/api/tasks/exhibition-revenue', data: { star_id: starId, status, page, page_size: pageSize } })
export const claimExhibitionRevenue = (revenueId, starId) =>
request({ url: '/api/tasks/exhibition-revenue/claim', method: 'POST', data: { revenue_id: revenueId, star_id: starId } })
export const claimAllExhibitionRevenue = (starId) =>
request({ url: '/api/tasks/exhibition-revenue/claim-all', method: 'POST', data: { star_id: starId } })
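Task 17: Daily tasks page
Files:
- New: frontend/pages/tasks/daily-tasks.vue
Features:
- Get the current star_id via uni.getStorageSync('fan_profile')
- Call getDailyTasks(starId) on mount
- Show the task list: name, description, crystal reward, experience reward, and a status tag (pending = blue, completed = green and claimable, claimed = gray)
- The "Claim" button is clickable only when can_claim=true
- A "Claim all" button at the bottom (shown when there are tasks waiting to be claimed)
- Call claimDailyTask / claimAllDailyTasks with a loading state
- Support pull-to-refresh via uni.startPullDownRefresh
- Show an empty state when the task list is empty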
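Task 18: Onboarding guide page
Files:
- New: frontend/pages/tasks/guide.vue
Features:
- Call getOnboardingStatus() on mount
- Show the stage list with the current stage highlighted
- Each stage shows its name and required task keys (marked as done or not done)
- "Next stage" button: clickable only when every task in the current stage is complete; calls advanceStage(targetStage)
- "Claim reward" button for the current stage: calls claimOnboardingReward(currentStage)
- After the user completes each guide step (for example, entering square_home), call completeGuide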
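Task 19: Revenue page
Files:
- New: frontend/pages/tasks/revenue.vue
Features:
- Tab switch: "Claimable" / "Claimed", using uni segmented-control or tab-style buttons
- Default tab = "Claimable", showing items with can_claim=true
- Each record: exhibit thumbnail (placeholder if none is available), slot type tag ("Own slot" / "Friend's slot"), crystal amount, cycle time range
- A per-record "Claim" button and a "Claim all" button
- Paginated loading via uni.loadMore
- Pull-to-refresh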
Task 20: Page routes
Files:
- Modify: frontend/pages.json
Add to the pages array:
{
"path": "pages/tasks/daily-tasks",
"style": { "navigationStyle": "custom" }
},
{
"path": "pages/tasks/guide",
"style": { "navigationStyle": "custom" }
},
{
"path": "pages/tasks/revenue",
"style": { "navigationStyle": "custom" }
}
Phase 10: Integration and testing
Task 21: Manual API testing
Test through the gateway (assuming taskService is registered with Dubbo, or the gateway routes HTTP to port 20005):
- GET /api/tasks/daily?star_id=1 → returns the task list
- POST /api/tasks/report-event {event_type:"daily_login",star_id:1} → task_completed=true
- GET /api/tasks/onboarding/status → returns current_stage:0
- POST /api/tasks/guide/complete {task_key:"square_home"} → creates the progress entry and returns stage info
- GET /api/tasks/exhibition-revenue?star_id=1&page=1&page_size=20 → returns the revenue list
- POST /api/tasks/exhibition-revenue/claim {revenue_id:1,star_id:1} → claims a single record
- POST /api/tasks/exhibition-revenue/claim-all {star_id:1} → claims all records