topfans/docs/superpowers/plans/2026-04-14-task-service-implementation.md

43 KiB
Raw Blame History

taskService 实现计划

面向智能体工程师: 必需子技能:使用 superpowers:subagent-driven-development(推荐)或 superpowers:executing-plans 来逐任务实现本计划。步骤使用复选框(- [ ])语法进行跟踪。

目标: 实现 taskServiceGo Dubbo-go后端包含移动端 API 和移动端前端页面(每日任务、引导流程、展示收益)。

架构:

  • taskServiceGo Dubbo-go 服务,端口 20005暴露 HTTP/Triple 移动端 API 和内部 RPCTaskInternalService
  • 移动端前端Vue/uni-app 页面,通过 gateway 调用 taskService HTTP API
  • 共享 PostgreSQL 数据库存储任务数据
  • taskService 调用 userService RPC 发放水晶/经验奖励

技术栈: Go (dubbo-go v3)、GORM、PostgreSQL、Vue/uni-app


文件结构

后端 (taskService)

backend/
├── proto/
│   └── task.proto                          # 新增TaskMobileService + TaskInternalService 定义
├── pkg/
│   └── proto/
│       └── task/
│           ├── task.pb.go                  # 新增proto 编译生成
│           └── task.triple.go              # 新增proto 编译生成
├── services/
│   └── taskService/                       # 新增
│       ├── main.go                         # 新增:服务入口
│       ├── config/
│       │   └── task_config.go             # 新增:配置
│       ├── model/
│       │   └── task_models.go             # 新增:所有任务表对应的 GORM 模型
│       ├── repository/
│       │   ├── daily_task_repo.go         # 新增
│       │   ├── onboarding_repo.go          # 新增
│       │   └── revenue_repo.go            # 新增
│       ├── service/
│       │   ├── daily_task_service.go       # 新增
│       │   ├── onboarding_service.go       # 新增
│       │   └── revenue_service.go          # 新增
│       ├── provider/
│       │   ├── task_mobile_provider.go     # 新增:移动端 APIHTTP/Triple
│       │   └── task_internal_provider.go   # 新增:内部 RPCTaskInternalService
│       ├── worker/
│       │   └── daily_reset_worker.go      # 新增:每日 05:00 重置 + 自动发放
│       └── client/
│           └── user_rpc_client.go          # 新增:调用 userService 的 UpdateCrystalBalance、AddExperience

前端(移动端)

frontend/
├── pages/
│   └── tasks/                              # 新增:任务页面目录
│       ├── daily-tasks.vue                  # 新增:每日任务页面
│       ├── guide.vue                        # 新增:引导任务页面
│       └── revenue.vue                      # 新增:展示收益页面
├── utils/
│   └── task-api.js                         # 新增taskService API 调用封装
└── pages.json                              # 修改:新增页面路由

数据库迁移

backend/
└── scripts/
    └── v001_init_task_tables.sql           # 新增7 张任务相关表的 DDL

关键前提:必须先在 user.proto 中添加 AddExperience

设计文档(设计文档第 101 行)要求在 UserSocialService 上添加 AddExperience RPC但当前 backend/proto/user.proto 中没有该 RPC。taskService 编译依赖此 RPC必须先完成。

修改:backend/proto/user.proto — 在 UserSocialService 中添加:

// 更新经验值请求内部RPC调用用于taskService发放经验奖励
message AddExperienceRequest {
  int64 user_id = 1;
  int64 star_id = 2;
  int64 delta = 3;
}

message AddExperienceResponse {
  topfans.common.BaseResponse base = 1;
  int64 new_experience = 2;
}

// 在 service UserSocialService 中添加:
rpc AddExperience(AddExperienceRequest) returns (AddExperienceResponse);

编辑完 user.proto 后,运行:cd backend && sh scripts/compile-proto.sh


阶段一taskService 后端基础设施

任务 1Proto 定义

文件:

  • 新增:backend/proto/task.proto
syntax = "proto3";

package topfans.task;

option go_package = "github.com/topfans/backend/pkg/proto/task;task";

import "proto/common.proto";
import "google/api/annotations.proto";

// ==================== 每日任务 ====================

message DailyTaskItem {
  string task_key = 1;
  int64 star_id = 2;
  string name = 3;
  string description = 4;
  int64 crystal_reward = 5;
  int64 exp_reward = 6;
  string status = 7;       // pending/completed/claimed
  bool can_claim = 8;
}

message GetDailyTasksRequest {
  int64 star_id = 1;
}

message GetDailyTasksResponse {
  topfans.common.BaseResponse base = 1;
  int64 star_id = 2;
  repeated DailyTaskItem tasks = 3;
}

message ReportEventRequest {
  string event_type = 1;  // 如 "daily_browse_asset", "daily_login"
  int64 star_id = 2;
}

message ReportEventResponse {
  topfans.common.BaseResponse base = 1;
  bool success = 2;
  string task_key = 3;
  bool task_completed = 4;
  string message = 5;
}

message ClaimDailyTaskRequest {
  string task_key = 1;
  int64 star_id = 2;
}

message ClaimDailyTaskResponse {
  topfans.common.BaseResponse base = 1;
  bool success = 2;
}

message ClaimAllDailyTasksRequest {
  int64 star_id = 1;
}

message ClaimAllDailyTasksResponse {
  topfans.common.BaseResponse base = 1;
  int32 claimed_count = 2;
}

// ==================== 引导任务 ====================

message OnboardingStage {
  int32 stage = 1;
  string name = 2;
  repeated string required_task_keys = 3;
  int64 crystal_reward = 4;
  int64 exp_reward = 5;
  string status = 6;         // pending/completed/in_progress
  bool is_current = 7;
}

message CompleteGuideRequest {
  string task_key = 1;
}

message CompleteGuideResponse {
  topfans.common.BaseResponse base = 1;
  int64 user_id = 2;
  int32 current_stage = 3;
  string status = 4;         // pending/in_progress/completed
  repeated OnboardingStage stages = 5;
}

message GetOnboardingStatusRequest {}

message GetOnboardingStatusResponse {
  topfans.common.BaseResponse base = 1;
  int64 user_id = 2;
  int32 current_stage = 3;
  string status = 4;
  repeated OnboardingStage stages = 5;
}

message AdvanceStageRequest {
  int32 target_stage = 1;
}

message AdvanceStageResponse {
  topfans.common.BaseResponse base = 1;
  int32 current_stage = 2;
  string status = 3;
  repeated OnboardingStage stages = 4;
}

message ClaimOnboardingRewardRequest {
  int32 stage = 1;
}

message ClaimOnboardingRewardResponse {
  topfans.common.BaseResponse base = 1;
  bool success = 2;
}

// ==================== 展示收益 ====================

message ExhibitionRevenueItem {
  int64 id = 1;
  int64 star_id = 2;
  int64 exhibition_id = 3;
  int64 asset_id = 4;
  int64 slot_id = 5;
  string slot_type = 6;      // own/friend
  int64 crystal_amount = 7;
  int64 cycle_start_time = 8;
  int64 cycle_end_time = 9;
  string status = 10;         // claimable/claimed/failed
  bool can_claim = 11;
}

message GetExhibitionRevenueRequest {
  int64 star_id = 1;
  string status = 2;          // 可选筛选
  int32 page = 3;
  int32 page_size = 4;
}

message GetExhibitionRevenueResponse {
  topfans.common.BaseResponse base = 1;
  repeated ExhibitionRevenueItem items = 2;
  int64 total = 3;
  int32 page = 4;
  int32 page_size = 5;
}

message ClaimExhibitionRevenueRequest {
  int64 revenue_id = 1;
  int64 star_id = 2;
}

message ClaimExhibitionRevenueResponse {
  topfans.common.BaseResponse base = 1;
  bool success = 2;
}

message ClaimAllExhibitionRevenueRequest {
  int64 star_id = 1;
}

message ClaimAllExhibitionRevenueResponse {
  topfans.common.BaseResponse base = 1;
  int32 claimed_count = 2;
}

// ==================== 内部RPC服务 ====================

message InitUserTasksRequest {
  int64 user_id = 1;
  int64 star_id = 2;
}

message InitUserTasksResponse {
  topfans.common.BaseResponse base = 1;
  bool success = 2;
}

message OnExhibitionCompletedRequest {
  int64 exhibition_id = 1;
  int64 asset_id = 2;
  int64 slot_id = 3;
  int64 occupier_uid = 4;
  int64 occupier_star_id = 5;
  int64 slot_owner_uid = 6;
  int64 crystal_amount = 7;
  int64 start_time = 8;
  int64 expire_at = 9;
}

message OnExhibitionCompletedResponse {
  topfans.common.BaseResponse base = 1;
  int64 revenue_record_id = 2;
}

// ==================== Mobile API Service ====================

service TaskMobileService {
  rpc GetDailyTasks(GetDailyTasksRequest) returns (GetDailyTasksResponse) {
    option (google.api.http) = { get: "/api/tasks/daily"; };
  }
  rpc ReportEvent(ReportEventRequest) returns (ReportEventResponse) {
    option (google.api.http) = { post: "/api/tasks/report-event"; body: "*"; };
  }
  rpc ClaimDailyTask(ClaimDailyTaskRequest) returns (ClaimDailyTaskResponse) {
    option (google.api.http) = { post: "/api/tasks/daily/claim"; body: "*"; };
  }
  rpc ClaimAllDailyTasks(ClaimAllDailyTasksRequest) returns (ClaimAllDailyTasksResponse) {
    option (google.api.http) = { post: "/api/tasks/daily/claim-all"; body: "*"; };
  }
  rpc CompleteGuide(CompleteGuideRequest) returns (CompleteGuideResponse) {
    option (google.api.http) = { post: "/api/tasks/guide/complete"; body: "*"; };
  }
  rpc GetOnboardingStatus(GetOnboardingStatusRequest) returns (GetOnboardingStatusResponse) {
    option (google.api.http) = { get: "/api/tasks/onboarding/status"; };
  }
  rpc AdvanceStage(AdvanceStageRequest) returns (AdvanceStageResponse) {
    option (google.api.http) = { post: "/api/tasks/onboarding/advance-stage"; body: "*"; };
  }
  rpc ClaimOnboardingReward(ClaimOnboardingRewardRequest) returns (ClaimOnboardingRewardResponse) {
    option (google.api.http) = { post: "/api/tasks/onboarding/claim-reward"; body: "*"; };
  }
  rpc GetExhibitionRevenue(GetExhibitionRevenueRequest) returns (GetExhibitionRevenueResponse) {
    option (google.api.http) = { get: "/api/tasks/exhibition-revenue"; };
  }
  rpc ClaimExhibitionRevenue(ClaimExhibitionRevenueRequest) returns (ClaimExhibitionRevenueResponse) {
    option (google.api.http) = { post: "/api/tasks/exhibition-revenue/claim"; body: "*"; };
  }
  rpc ClaimAllExhibitionRevenue(ClaimAllExhibitionRevenueRequest) returns (ClaimAllExhibitionRevenueResponse) {
    option (google.api.http) = { post: "/api/tasks/exhibition-revenue/claim-all"; body: "*"; };
  }
}

// ==================== Internal RPC Service ====================

service TaskInternalService {
  rpc InitUserTasks(InitUserTasksRequest) returns (InitUserTasksResponse);
  rpc OnExhibitionCompleted(OnExhibitionCompletedRequest) returns (OnExhibitionCompletedResponse);
}

任务 2数据库模型

文件:

  • 新增:backend/services/taskService/model/task_models.go
package model

// TaskDefinition 任务定义表
type TaskDefinition struct {
    ID            int64   `gorm:"primaryKey;column:id;autoIncrement"`
    StarID        *int64  `gorm:"column:star_id;index"`                         // NULL=全局默认
    TaskKey       string  `gorm:"column:task_key;size:50;not null"`
    TaskType      string  `gorm:"column:task_type;size:20;not null"`           // daily/onboarding
    Name          string  `gorm:"column:name;size:100;not null"`
    Description   string  `gorm:"column:description;type:text"`
    CrystalReward int64   `gorm:"column:crystal_reward;default:0"`
    ExpReward     int64   `gorm:"column:exp_reward;default:0"`
    SortOrder     int     `gorm:"column:sort_order;default:0"`
    IsActive      bool    `gorm:"column:is_active;default:true"`
    CreatedAt     int64   `gorm:"column:created_at"`
    UpdatedAt     int64   `gorm:"column:updated_at"`
}

func (TaskDefinition) TableName() string { return "task_definitions" }

// UserDailyTaskProgress 每日任务进度表
type UserDailyTaskProgress struct {
    ID          int64  `gorm:"primaryKey;column:id;autoIncrement"`
    UserID      int64  `gorm:"column:user_id;not null;index:idx_daily_user_star_key"`
    StarID      int64  `gorm:"column:star_id;not null;index:idx_daily_user_star_key"`
    TaskKey     string `gorm:"column:task_key;size:50;not null;index:idx_daily_user_star_key"`
    Status      string `gorm:"column:status;size:20;default:pending"` // pending/completed/claimed
    CompletedAt *int64 `gorm:"column:completed_at"`
    ClaimedAt   *int64 `gorm:"column:claimed_at"`
    CreatedAt   int64  `gorm:"column:created_at"`
    UpdatedAt   int64  `gorm:"column:updated_at"`
}

func (UserDailyTaskProgress) TableName() string { return "user_daily_task_progress" }

// UserOnboardingProgress 引导任务进度表
type UserOnboardingProgress struct {
    ID          int64  `gorm:"primaryKey;column:id;autoIncrement"`
    UserID      int64  `gorm:"column:user_id;not null;index:idx_onboard_user_key"`
    TaskKey     string `gorm:"column:task_key;size:50;not null;index:idx_onboard_user_key"`
    Status      string `gorm:"column:status;size:20;default:pending"`
    CompletedAt *int64 `gorm:"column:completed_at"`
    ClaimedAt   *int64 `gorm:"column:claimed_at"`
    CreatedAt   int64  `gorm:"column:created_at"`
    UpdatedAt   int64  `gorm:"column:updated_at"`
}

func (UserOnboardingProgress) TableName() string { return "user_onboarding_progress" }

// UserOnboardingStatus 引导流程状态表per-user非 per-star
type UserOnboardingStatus struct {
    UserID                    int64  `gorm:"primaryKey;column:user_id"`
    CurrentStage              int    `gorm:"column:current_stage;default:0"`
    Status                    string `gorm:"column:status;size:20;default:pending"`
    IsFirstLoginBonusClaimed  bool   `gorm:"column:is_first_login_bonus_claimed;default:false"` // 废弃字段
    HasFriendDisplayBonus     bool   `gorm:"column:has_friend_display_bonus;default:false"`     // 废弃字段
    CompletedAt               *int64 `gorm:"column:completed_at"`
    ClaimedAt                 *int64 `gorm:"column:claimed_at"`
    CreatedAt                 int64  `gorm:"column:created_at"`
    UpdatedAt                 int64  `gorm:"column:updated_at"`
}

func (UserOnboardingStatus) TableName() string { return "user_onboarding_status" }

// OnboardingStageConfig 引导阶段配置表
type OnboardingStageConfig struct {
    ID               int64    `gorm:"primaryKey;column:id;autoIncrement"`
    Stage            int      `gorm:"column:stage;not null;uniqueIndex"`
    Name             string   `gorm:"column:name;size:100;not null"`
    Description      string   `gorm:"column:description;type:text"`
    RequiredTaskKeys []string `gorm:"column:required_task_keys;text[]"` // PostgreSQL 数组
    CrystalReward    int64    `gorm:"column:crystal_reward;default:0"`
    ExpReward        int64    `gorm:"column:exp_reward;default:0"`
    SortOrder        int      `gorm:"column:sort_order;default:0"`
    IsActive         bool     `gorm:"column:is_active;default:true"`
    CreatedAt        int64    `gorm:"column:created_at"`
    UpdatedAt        int64    `gorm:"column:updated_at"`
}

func (OnboardingStageConfig) TableName() string { return "onboarding_stage_config" }

// ExhibitionRevenueRecord 展示收益记录表
type ExhibitionRevenueRecord struct {
    ID             int64  `gorm:"primaryKey;column:id;autoIncrement"`
    UserID         int64  `gorm:"column:user_id;not null;index:idx_revenue_user_star"`
    StarID         int64  `gorm:"column:star_id;not null;index:idx_revenue_user_star"`
    ExhibitionID   int64  `gorm:"column:exhibition_id;not null"`
    AssetID        int64  `gorm:"column:asset_id;not null"`
    SlotID         int64  `gorm:"column:slot_id;not null"`
    SlotOwnerUID   int64  `gorm:"column:slot_owner_uid;not null"`
    SlotType       string `gorm:"column:slot_type;size:20;not null"` // own/friend
    CrystalAmount  int64  `gorm:"column:crystal_amount;not null"`
    CycleStartTime int64  `gorm:"column:cycle_start_time;not null"`
    CycleEndTime   int64  `gorm:"column:cycle_end_time;not null"`
    Status         string `gorm:"column:status;size:20;default:claimable"` // claimable/claimed/failed
    ClaimedAt      *int64 `gorm:"column:claimed_at"`
    CreatedAt      int64  `gorm:"column:created_at"`
}

func (ExhibitionRevenueRecord) TableName() string { return "exhibition_revenue_records" }

// TaskResetLog 重置日志表
type TaskResetLog struct {
    ID          int64 `gorm:"primaryKey;column:id;autoIncrement"`
    ResetType   string `gorm:"column:reset_type;size:20;not null"` // daily
    LastResetAt int64  `gorm:"column:last_reset_at;not null"`
    CreatedAt   int64  `gorm:"column:created_at"`
}

func (TaskResetLog) TableName() string { return "task_reset_log" }

任务 3配置文件

文件:

  • 新增:backend/services/taskService/config/task_config.go
package config

import (
    "flag"
    "log"
    "os"
    "strconv"
)

type DatabaseConfig struct {
    Host, Password, DBName, SSLMode, TimeZone string
    Port                                         int
    User                                         string
}

type ServiceURLs struct {
    UserService string
}

type WorkerConfig struct {
    ResetHour, ResetMinute int
    RevenueBatchSize      int
    RevenueMaxRetries     int
}

var (
    DBConfig             = &DatabaseConfig{}
    ServiceURLsConfig    = &ServiceURLs{UserService: "tri://localhost:20000"}
    WorkerConfigData     = &WorkerConfig{
        ResetHour: 5, ResetMinute: 0,
        RevenueBatchSize: 100, RevenueMaxRetries: 3,
    }
)

func getEnv(key, fallback string) string {
    if v := os.Getenv(key); v != "" { return v }
    return fallback
}

func getEnvInt(key string, fallback int) int {
    if v := os.Getenv(key); v != "" {
        if n, err := strconv.Atoi(v); err == nil { return n }
    }
    return fallback
}

func InitConfig() {
    flag.StringVar(&DBConfig.Host, "db-host", getEnv("DB_HOST", "localhost"), "数据库主机")
    flag.IntVar(&DBConfig.Port, "db-port", getEnvInt("DB_PORT", 5432), "数据库端口")
    flag.StringVar(&DBConfig.User, "db-user", getEnv("DB_USER", "postgres"), "数据库用户名")
    flag.StringVar(&DBConfig.Password, "db-password", getEnv("DB_PASSWORD", ""), "数据库密码")
    flag.StringVar(&DBConfig.DBName, "db-name", getEnv("DB_NAME", "topfans"), "数据库名称")
    flag.StringVar(&DBConfig.SSLMode, "db-sslmode", "disable", "数据库 SSL 模式")
    flag.StringVar(&ServiceURLsConfig.UserService, "user-service-url", getEnv("USER_SERVICE_URL", "tri://localhost:20000"), "User Service RPC 地址")
    flag.IntVar(&WorkerConfigData.ResetHour, "reset-hour", getEnvInt("RESET_HOUR", 5), "每日重置小时Asia/Shanghai")
    flag.IntVar(&WorkerConfigData.ResetMinute, "reset-minute", getEnvInt("RESET_MINUTE", 0), "每日重置分钟")
    flag.IntVar(&WorkerConfigData.RevenueBatchSize, "revenue-batch-size", getEnvInt("REVENUE_BATCH_SIZE", 100), "收益自动发放批次大小")
    flag.IntVar(&WorkerConfigData.RevenueMaxRetries, "revenue-max-retries", getEnvInt("REVENUE_MAX_RETRIES", 3), "收益自动发放最大重试次数")
    flag.Parse()
    log.Println("taskService 配置初始化完成")
    log.Printf("  数据库: %s:%d/%s", DBConfig.Host, DBConfig.Port, DBConfig.DBName)
    log.Printf("  User Service: %s", ServiceURLsConfig.UserService)
    log.Printf("  重置时间: %02d:%02d Asia/Shanghai", WorkerConfigData.ResetHour, WorkerConfigData.ResetMinute)
}

任务 4userService RPC 客户端

文件:

  • 新增:backend/services/taskService/client/user_rpc_client.go

模式参考 assetService/client/user_rpc_client.go。客户端封装体接收的是服务接口(而非 *client.Client)。服务接口创建(在 main.go 中调用 pbUser.NewUserSocialService)在 main.go 中完成。

package client

import (
    "context"
    "github.com/topfans/backend/pkg/logger"
    pbUser "github.com/topfans/backend/pkg/proto/user"
    "go.uber.org/zap"
)

type UserServiceClient interface {
    UpdateCrystalBalance(ctx context.Context, userID, starID int64, delta int64) (int64, error)
    AddExperience(ctx context.Context, userID, starID int64, delta int64) (int64, error)
}

type userServiceClient struct {
    client pbUser.UserSocialService
}

func NewUserServiceClient(client pbUser.UserSocialService) UserServiceClient {
    return &userServiceClient{client: client}
}

func (c *userServiceClient) UpdateCrystalBalance(ctx context.Context, userID, starID int64, delta int64) (int64, error) {
    logger.Logger.Debug("Calling UserService.UpdateCrystalBalance",
        zap.Int64("user_id", userID), zap.Int64("star_id", starID), zap.Int64("delta", delta))
    resp, err := c.client.UpdateCrystalBalance(ctx, &pbUser.UpdateCrystalBalanceRequest{
        UserId: userID, StarId: starID, Delta: delta,
    })
    if err != nil {
        logger.Logger.Error("UserService.UpdateCrystalBalance failed", zap.Error(err))
        return 0, err
    }
    if resp.Base.Code != 0 {
        logger.Logger.Warn("UpdateCrystalBalance non-zero code", zap.Int32("code", int32(resp.Base.Code)))
        return 0, err
    }
    return resp.NewBalance, nil
}

func (c *userServiceClient) AddExperience(ctx context.Context, userID, starID int64, delta int64) (int64, error) {
    logger.Logger.Debug("Calling UserService.AddExperience",
        zap.Int64("user_id", userID), zap.Int64("star_id", starID), zap.Int64("delta", delta))
    resp, err := c.client.AddExperience(ctx, &pbUser.AddExperienceRequest{
        UserId: userID, StarId: starID, Delta: delta,
    })
    if err != nil {
        logger.Logger.Error("UserService.AddExperience failed", zap.Error(err))
        return 0, err
    }
    if resp.Base.Code != 0 {
        logger.Logger.Warn("AddExperience non-zero code", zap.Int32("code", int32(resp.Base.Code)))
        return 0, err
    }
    return resp.NewExperience, nil
}

阶段二Repository 层

任务 5每日任务 Repository

文件:

  • 新增:backend/services/taskService/repository/daily_task_repo.go

主要方法:

  • GetUserDailyProgress(userID, starID, taskKey) (*model.UserDailyTaskProgress, error)
  • GetOrCreateDailyProgress(userID, starID, taskKey, def *model.TaskDefinition) (*model.UserDailyTaskProgress, error)
  • ListDailyTasksByUser(userID, starID) ([]*model.UserDailyTaskProgress, error)
  • ListActiveDailyTaskDefinitions(starID) ([]*model.TaskDefinition, error) — 返回 star_id 相关 + 全局默认
  • UpdateDailyProgress(progress *model.UserDailyTaskProgress) error
  • ResetAllDailyTasks() (int64, error) — 重置所有非 pending 状态为 pending
  • InitDailyTasksForUser(userID, starID) error — InitUserTasks RPC 调用,为该用户创建该 star_id 下所有每日任务进度
  • AcquireAdvisoryLock(lockID int64) boolSELECT pg_try_advisory_lock($1),返回是否获取成功
  • ReleaseAdvisoryLock(lockID int64)SELECT pg_advisory_unlock($1)

任务 6引导任务 Repository

文件:

  • 新增:backend/services/taskService/repository/onboarding_repo.go

主要方法:

  • GetOnboardingStatus(userID int64) (*model.UserOnboardingStatus, error)
  • GetOrCreateOnboardingStatus(userID int64) (*model.UserOnboardingStatus, error) — 获取或新建current_stage=0, status=pending
  • UpdateOnboardingStatus(status *model.UserOnboardingStatus) error
  • GetUserOnboardingProgress(userID int64, taskKey string) (*model.UserOnboardingProgress, error)
  • GetOrCreateOnboardingProgress(userID int64, taskKey string) (*model.UserOnboardingProgress, error)
  • ListActiveStageConfigs() ([]*model.OnboardingStageConfig, error)
  • ListUserOnboardingProgressByUser(userID int64) ([]*model.UserOnboardingProgress, error)
  • GetStageConfig(stage int) (*model.OnboardingStageConfig, error)

任务 7收益 Repository

文件:

  • 新增:backend/services/taskService/repository/revenue_repo.go

主要方法:

  • CreateRevenueRecord(record *model.ExhibitionRevenueRecord) (*model.ExhibitionRevenueRecord, error)
  • GetRevenueRecord(id int64) (*model.ExhibitionRevenueRecord, error)
  • ListRevenueByUser(userID, starID int64, status string, page, pageSize int) ([]*model.ExhibitionRevenueRecord, int64, error)
  • ClaimRevenueRecord(id int64, userID int64) (bool, error) — 乐观锁:仅在 status='claimable' 时更新,返回是否更新成功
  • UpdateRevenueStatus(id int64, status string) error — Worker 自动发放后调用claimed/failed
  • ListClaimableRevenue(limit int) ([]*model.ExhibitionRevenueRecord, error) — Worker 自动发放用,LIMIT batchSize

阶段三Service 层

任务 8每日任务 Service

文件:

  • 新增:backend/services/taskService/service/daily_task_service.go

主要方法:

  • GetDailyTasks(ctx, userID, starID) (*pb.GetDailyTasksResponse, error) — 合并任务定义与用户进度can_claim=(status==completed)
  • ReportEvent(ctx, userID, starID, eventType) (*pb.ReportEventResponse, error) — eventType 映射到 task_key标记任务完成
  • ClaimDailyTask(ctx, userID, starID, taskKey) (*pb.ClaimDailyTaskResponse, error) — 验证 completed调用水晶和经验奖励标记 claimed
  • ClaimAllDailyTasks(ctx, userID, starID) (*pb.ClaimAllDailyTasksResponse, error) — 查找所有已完成任务,批量领取

任务 9引导任务 Service

文件:

  • 新增:backend/services/taskService/service/onboarding_service.go

主要方法:

  • GetOnboardingStatus(ctx, userID) (*pb.GetOnboardingStatusResponse, error)
  • CompleteGuide(ctx, userID, taskKey) (*pb.CompleteGuideResponse, error) — 标记引导进度完成(不是每日任务),返回完整阶段列表
  • AdvanceStage(ctx, userID, targetStage) (*pb.AdvanceStageResponse, error) — 验证当前阶段所有任务已完成,再推进阶段
  • ClaimOnboardingReward(ctx, userID, stage) (*pb.ClaimOnboardingRewardResponse, error) — 验证阶段已完成,发放奖励,标记 claimed
  • InitUserTasks(ctx, userID, starID) error — TaskInternalService.InitUserTasks RPC 调用:创建 user_onboarding_status若不存在+ 调用 dailyRepo.InitDailyTasksForUser

任务 10收益 Service

文件:

  • 新增:backend/services/taskService/service/revenue_service.go

主要方法:

  • GetExhibitionRevenue(ctx, userID, starID, status string, page, pageSize) (*pb.GetExhibitionRevenueResponse, error)
  • ClaimExhibitionRevenue(ctx, userID, starID, revenueID) (*pb.ClaimExhibitionRevenueResponse, error) — 使用 repo.ClaimRevenueRecord 乐观锁
  • ClaimAllExhibitionRevenue(ctx, userID, starID) (*pb.ClaimAllExhibitionRevenueResponse, error) — 查找所有可领取收益,批量领取
  • OnExhibitionCompleted(ctx, req) (*pb.OnExhibitionCompletedResponse, error)关键:仅在 slot_owner_uid != occupier_uid 时创建收益记录自己展位不产生收益status='claimable'

阶段四Provider 层API 处理器)

任务 11移动端 API Provider

文件:

  • 新增:backend/services/taskService/provider/task_mobile_provider.go

实现 TaskMobileService 接口proto 生成)。通过 Dubbo context attachments 提取 user_idgateway 认证中间件设置)。参考 galleryService/provider/gallery_provider.go

辅助函数(定义在文件顶部):

func getUserIDFromCtx(ctx context.Context) int64 {
    // Dubbo 通过 context attachments 从 gateway 认证中间件传递 user_id
    // Triple 协议通过 rpcCtx 或 attachments 获取
    if attachments := ctx.Value("attachments").(map[string]string); attachments != nil {
        if uid := attachments["user_id"]; uid != "" {
            if id, _ := strconv.ParseInt(uid, 10, 64); id > 0 { return id }
        }
    }
    return 0
}

每个 handler提取 userID → 调用 service 方法 → 返回响应。

任务 12内部 RPC Provider

文件:

  • 新增:backend/services/taskService/provider/task_internal_provider.go

实现 TaskInternalService

  • InitUserTasks(ctx, req) (*pb.InitUserTasksResponse, error) — 提取 req.UserIdreq.StarId,调用 onboardingService.InitUserTasks(ctx, userID, starID)
  • OnExhibitionCompleted(ctx, req) (*pb.OnExhibitionCompletedResponse, error) — 调用 revenueService.OnExhibitionCompleted

内部 RPC 不需要认证中间件(由其他服务调用)。


阶段五Worker

任务 13每日重置 Worker

文件:

  • 新增:backend/services/taskService/worker/daily_reset_worker.go
package worker

import (
    "context"
    "fmt"
    "strconv"
    "sync"
    "time"

    "github.com/topfans/backend/pkg/logger"
    "github.com/topfans/backend/services/taskService/client"
    "github.com/topfans/backend/services/taskService/config"
    "github.com/topfans/backend/services/taskService/repository"
    "go.uber.org/zap"
)

type DailyResetWorker struct {
    dailyRepo   *repository.DailyTaskRepository
    revenueRepo *repository.RevenueRepository
    userClient  client.UserServiceClient
    stopCh      chan struct{}
    wg          sync.WaitGroup
}

func NewDailyResetWorker(dailyRepo *repository.DailyTaskRepository, revenueRepo *repository.RevenueRepository, userClient client.UserServiceClient) *DailyResetWorker {
    return &DailyResetWorker{
        dailyRepo:   dailyRepo,
        revenueRepo: revenueRepo,
        userClient:  userClient,
        stopCh:      make(chan struct{}),
    }
}

func (w *DailyResetWorker) Start() {
    w.wg.Add(1)
    go w.runLoop()
    logger.Logger.Info("DailyResetWorker started")
}

func (w *DailyResetWorker) Stop() {
    close(w.stopCh)
    w.wg.Wait()
    logger.Logger.Info("DailyResetWorker stopped")
}

func (w *DailyResetWorker) runLoop() {
    defer w.wg.Done()
    for {
        now := time.Now()
        loc, _ := time.LoadLocation("Asia/Shanghai")
        next := time.Date(now.Year(), now.Month(), now.Day(), config.WorkerConfigData.ResetHour, config.WorkerConfigData.ResetMinute, 0, 0, loc)
        if next.Before(now) {
            next = next.Add(24 * time.Hour)
        }
        waitDuration := next.Sub(now)
        logger.Logger.Info(fmt.Sprintf("Next daily reset at %s (in %v)", next.Format(time.RFC3339), waitDuration))

        select {
        case <-time.After(waitDuration):
            w.doResetAndAutoClaim()
        case <-w.stopCh:
            return
        }
    }
}

func (w *DailyResetWorker) doResetAndAutoClaim() {
    lockKey := time.Now().Format("20060102")
    lockID, _ := strconv.ParseInt(lockKey, 10, 64)
    acquired := w.dailyRepo.AcquireAdvisoryLock(lockID)
    if !acquired {
        logger.Logger.Info("Another instance is running daily reset, skipping")
        return
    }
    defer w.dailyRepo.ReleaseAdvisoryLock(lockID)

    // 1. 重置每日任务
    resetCount, err := w.dailyRepo.ResetAllDailyTasks()
    if err != nil {
        logger.Logger.Error("Failed to reset daily tasks", zap.Error(err))
    } else {
        logger.Logger.Info(fmt.Sprintf("Daily tasks reset: %d records updated", resetCount))
    }

    // 2. 自动发放展示收益
    w.autoClaimExhibitionRevenue()
}

func (w *DailyResetWorker) autoClaimExhibitionRevenue() {
    batchSize := config.WorkerConfigData.RevenueBatchSize
    maxRetries := config.WorkerConfigData.RevenueMaxRetries
    totalClaimed := 0

    for {
        records, err := w.revenueRepo.ListClaimableRevenue(batchSize)
        if err != nil {
            logger.Logger.Error("Failed to list claimable revenue", zap.Error(err))
            break
        }
        if len(records) == 0 {
            break
        }

        for _, record := range records {
            var lastErr error
            for attempt := 0; attempt < maxRetries; attempt++ {
                _, err := w.userClient.UpdateCrystalBalance(context.Background(), record.UserID, record.StarID, record.CrystalAmount)
                if err == nil {
                    w.revenueRepo.UpdateRevenueStatus(record.ID, "claimed")
                    totalClaimed++
                    break
                }
                lastErr = err
                time.Sleep(100 * time.Millisecond)
            }
            if lastErr != nil {
                w.revenueRepo.UpdateRevenueStatus(record.ID, "failed")
                logger.Logger.Error("Failed to auto-claim revenue after retries",
                    zap.Int64("record_id", record.ID), zap.Error(lastErr))
            }
        }
    }
    logger.Logger.Info(fmt.Sprintf("Auto-claim completed: %d records claimed", totalClaimed))
}

阶段六main.go

任务 14服务入口

文件:

  • 新增:backend/services/taskService/main.go
package main

import (
    "flag"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    dubboclient "dubbo.apache.org/dubbo-go/v3/client"
    _ "dubbo.apache.org/dubbo-go/v3/imports"
    "dubbo.apache.org/dubbo-go/v3/protocol"
    "dubbo.apache.org/dubbo-go/v3/server"

    "github.com/topfans/backend/pkg/database"
    "github.com/topfans/backend/pkg/logger"
    "github.com/topfans/backend/pkg/proto/task"
    pbUser "github.com/topfans/backend/pkg/proto/user"
    "github.com/topfans/backend/services/taskService/client"
    "github.com/topfans/backend/services/taskService/config"
    "github.com/topfans/backend/services/taskService/model"
    "github.com/topfans/backend/services/taskService/provider"
    "github.com/topfans/backend/services/taskService/repository"
    "github.com/topfans/backend/services/taskService/service"
    "github.com/topfans/backend/services/taskService/worker"
)

var port = flag.Int("port", 20005, "Dubbo service port")

func main() {
    flag.Parse()

    // 1. Init logger必须最前
    env := os.Getenv("ENV")
    if env == "" { env = "development" }
    if err := logger.Init(logger.Config{ServiceName: "task-service", Environment: env, LogLevel: os.Getenv("LOG_LEVEL")}); err != nil {
        panic(fmt.Sprintf("Failed to init logger: %v", err))
    }
    defer logger.Sync()
    logger.Logger.Info("Starting taskService...")

    // 2. Init config读取 flags/env
    config.InitConfig()

    // 3. Init database + auto-migrate
    dbConfig := database.Config{
        Host: config.DBConfig.Host, Port: config.DBConfig.Port,
        User: config.DBConfig.User, Password: config.DBConfig.Password,
        DBName: config.DBConfig.DBName, SSLMode: config.DBConfig.SSLMode,
        TimeZone: "Asia/Shanghai",
    }
    if err := database.Init(dbConfig); err != nil {
        logger.Logger.Fatal(fmt.Sprintf("Failed to init DB: %v", err))
    }
    db := database.GetDB()
    if err := db.AutoMigrate(
        &model.TaskDefinition{},
        &model.UserDailyTaskProgress{},
        &model.UserOnboardingProgress{},
        &model.UserOnboardingStatus{},
        &model.OnboardingStageConfig{},
        &model.ExhibitionRevenueRecord{},
        &model.TaskResetLog{},
    ); err != nil {
        logger.Logger.Fatal(fmt.Sprintf("Failed to migrate tables: %v", err))
    }
    logger.Logger.Info("Database initialized")

    // 4. Init repositories
    dailyRepo := repository.NewDailyTaskRepository(db)
    onboardingRepo := repository.NewOnboardingRepository(db)
    revenueRepo := repository.NewRevenueRepository(db)
    logger.Logger.Info("Repositories initialized")

    // 5. Init userService Dubbo client + RPC client
    userCli, err := dubboclient.NewClient(dubboclient.WithClientURL(config.ServiceURLsConfig.UserService))
    if err != nil {
        logger.Logger.Fatal(fmt.Sprintf("Failed to create userService client: %v", err))
    }
    userServiceClient, err := pbUser.NewUserSocialService(userCli)
    if err != nil {
        logger.Logger.Fatal(fmt.Sprintf("Failed to get userService: %v", err))
    }
    userRPCClient := client.NewUserServiceClient(userServiceClient)
    logger.Logger.Info("User RPC client initialized")

    // 6. Init services
    dailySvc := service.NewDailyTaskService(dailyRepo, userRPCClient)
    onboardingSvc := service.NewOnboardingService(onboardingRepo, dailyRepo, userRPCClient)
    revenueSvc := service.NewRevenueService(revenueRepo, userRPCClient)
    logger.Logger.Info("Services initialized")

    // 7. Init workergoroutine 中启动)
    resetWorker := worker.NewDailyResetWorker(dailyRepo, revenueRepo, userRPCClient)
    go resetWorker.Start()
    logger.Logger.Info("Reset worker started")

    // 8. Init providers
    mobileProvider := provider.NewTaskMobileProvider(dailySvc, onboardingSvc, revenueSvc)
    internalProvider := provider.NewTaskInternalProvider(onboardingSvc, revenueSvc)
    logger.Logger.Info("Providers initialized")

    // 9. Create Dubbo server on port 20005
    srv, err := server.NewServer(
        server.WithServerProtocol(protocol.WithPort(*port), protocol.WithTriple()),
    )
    if err != nil {
        logger.Logger.Fatal(fmt.Sprintf("Failed to create server: %v", err))
    }
    if err := task.RegisterTaskMobileServiceHandler(srv, mobileProvider); err != nil {
        logger.Logger.Fatal(fmt.Sprintf("Failed to register TaskMobileService: %v", err))
    }
    if err := task.RegisterTaskInternalServiceHandler(srv, internalProvider); err != nil {
        logger.Logger.Fatal(fmt.Sprintf("Failed to register TaskInternalService: %v", err))
    }
    if err := srv.Serve(); err != nil {
        logger.Logger.Fatal(fmt.Sprintf("Failed to serve: %v", err))
    }
    logger.Logger.Info(fmt.Sprintf("taskService started on port %d", *port))

    // 10. Graceful shutdown
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
    logger.Logger.Info("Shutting down taskService...")
    resetWorker.Stop()
}

阶段七Proto 编译

任务 15Proto 编译

步骤 1 修改 backend/scripts/compile-proto.sh

  • 在循环(第 56 行左右)中添加 task
  • 在清理块(第 159 行左右)中添加 task
  • 在 activity.proto 编译块之后添加 task.proto 编译块

mkdir 循环中:

for name in common user social asset gallery ranking activity task; do

mkdir 创建目录:

mkdir -p pkg/proto/task

activity.proto 编译块之后添加:

# 编译 task.proto
echo "📦 编译 task.proto ..."
protoc --proto_path=proto \
  --proto_path=. \
  --go_out=pkg/proto/task \
  --go_opt=paths=source_relative \
  --go-triple_out=pkg/proto/task \
  --go-triple_opt=paths=source_relative \
  task.proto
echo "✅ task.proto 编译完成"

清理循环中也需要添加 task

步骤 2 运行:cd backend && sh scripts/compile-proto.sh


阶段九:移动端前端

任务 16任务 API 工具函数

文件:

  • 新增:frontend/utils/task-api.js

复用 frontend/utils/api.js 中的 request 封装:

import { request } from './api.js'

// baseURL 已配置在 api.js 中,指向 gateway端口 8080
// 所有 task API 通过 gateway 路由到 taskServiceTriple/HTTP

export const getDailyTasks = (starId) =>
  request({ url: '/api/tasks/daily', data: { star_id: starId } })

export const reportEvent = (eventType, starId) =>
  request({ url: '/api/tasks/report-event', method: 'POST', data: { event_type: eventType, star_id: starId } })

export const claimDailyTask = (taskKey, starId) =>
  request({ url: '/api/tasks/daily/claim', method: 'POST', data: { task_key: taskKey, star_id: starId } })

export const claimAllDailyTasks = (starId) =>
  request({ url: '/api/tasks/daily/claim-all', method: 'POST', data: { star_id: starId } })

export const completeGuide = (taskKey) =>
  request({ url: '/api/tasks/guide/complete', method: 'POST', data: { task_key: taskKey } })

export const getOnboardingStatus = () =>
  request({ url: '/api/tasks/onboarding/status' })

export const advanceStage = (targetStage) =>
  request({ url: '/api/tasks/onboarding/advance-stage', method: 'POST', data: { target_stage: targetStage } })

export const claimOnboardingReward = (stage) =>
  request({ url: '/api/tasks/onboarding/claim-reward', method: 'POST', data: { stage } })

export const getExhibitionRevenue = (starId, status, page, pageSize) =>
  request({ url: '/api/tasks/exhibition-revenue', data: { star_id: starId, status, page, page_size: pageSize } })

export const claimExhibitionRevenue = (revenueId, starId) =>
  request({ url: '/api/tasks/exhibition-revenue/claim', method: 'POST', data: { revenue_id: revenueId, star_id: starId } })

export const claimAllExhibitionRevenue = (starId) =>
  request({ url: '/api/tasks/exhibition-revenue/claim-all', method: 'POST', data: { star_id: starId } })

任务 17每日任务页面

文件:

  • 新增:frontend/pages/tasks/daily-tasks.vue

功能:

  • 通过 uni.getStorageSync('fan_profile') 获取当前 star_id
  • 挂载时调用 getDailyTasks(starId)
  • 展示任务列表名称、描述、水晶奖励、经验奖励、状态标签pending=蓝色, completed=绿色可领取, claimed=灰色)
  • "领取"按钮仅在 can_claim=true 时可点击
  • 底部有"一键领取"按钮(有待领取任务时显示)
  • 调用 claimDailyTask / claimAllDailyTasks,带加载状态
  • 支持下拉刷新 uni.startPullDownRefresh
  • 任务列表为空时显示空状态

任务 18引导任务页面

文件:

  • 新增:frontend/pages/tasks/guide.vue

功能:

  • 挂载时调用 getOnboardingStatus()
  • 展示阶段列表:当前阶段高亮
  • 每阶段显示:名称、所需任务 keys含已完成/未完成标记)
  • "进入下一阶段"按钮 — 仅当当前阶段所有任务完成时才可点击,调用 advanceStage(targetStage)
  • 当前阶段"领取奖励"按钮 — 调用 claimOnboardingReward(currentStage)
  • 用户完成每个引导步骤后(如进入 square_home调用 completeGuide

任务 19收益页面

文件:

  • 新增:frontend/pages/tasks/revenue.vue

功能:

  • Tab 切换:"可领取" / "已领取",使用 uni segmented-control 或 Tab 样式按钮
  • 默认 Tab = "可领取",显示 can_claim=true 的项目
  • 每条记录:展品缩略图(无可用则占位)、展位类型标签("自己的展位"/"好友展位")、水晶数量、周期时间范围
  • 单条"领取"按钮和"一键领取"按钮
  • 分页加载 uni.loadMore
  • 下拉刷新

任务 20页面路由

文件:

  • 修改:frontend/pages.json

pages 数组中添加:

{
  "path": "pages/tasks/daily-tasks",
  "style": { "navigationStyle": "custom" }
},
{
  "path": "pages/tasks/guide",
  "style": { "navigationStyle": "custom" }
},
{
  "path": "pages/tasks/revenue",
  "style": { "navigationStyle": "custom" }
}

阶段十:集成与测试

任务 21手动 API 测试

通过 gateway 测试(假设 taskService 已在 Dubbo 注册,或通过 gateway HTTP 路由到 20005

  1. GET /api/tasks/daily?star_id=1 → 返回任务列表
  2. POST /api/tasks/report-event {event_type:"daily_login",star_id:1} → task_completed=true
  3. GET /api/tasks/onboarding/status → 返回 current_stage:0
  4. POST /api/tasks/guide/complete {task_key:"square_home"} → 创建进度,返回阶段信息
  5. GET /api/tasks/exhibition-revenue?star_id=1&page=1&page_size=20 → 返回收益列表
  6. POST /api/tasks/exhibition-revenue/claim {revenue_id:1,star_id:1} → 领取单个
  7. POST /api/tasks/exhibition-revenue/claim-all {star_id:1} → 一键领取