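Notification System Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Implement a unified notification system that supports three types (like, system, activity), with like notifications aggregated by target_id at the list layer.
Architecture: A standalone Dubbo Notification Service (writes the notification and its stats in one transaction) + the social service synchronously triggering like notifications + the admin backend writing directly to the shared database to trigger system/activity notifications + GROUP BY aggregation of likes at the list layer.
Tech Stack: Go 1.21+ / Dubbo-go / GORM / PostgreSQL / FastAPI + SQLAlchemy
Reference spec: docs/superpowers/specs/2026-06-16-notification-system-design.md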
File Structure Overview
New files
| Path | Responsibility |
|---|---|
| backend/migrations/2026_06_16_001_create_notifications.sql | DB schema (notifications + notification_stats) |
| backend/proto/notification.proto | RPC interface definitions |
| backend/services/notificationService/main.go | Dubbo startup entry point |
| backend/services/notificationService/configs/config.yaml | Port 20010, DB config |
| backend/services/notificationService/provider/notification_provider.go | RPC implementation |
| backend/services/notificationService/service/notification_service.go | Business logic + transactions |
| backend/services/notificationService/repository/notification_repository.go | notifications CRUD |
| backend/services/notificationService/repository/notification_stats_repository.go | Stats UPSERT |
| backend/services/notificationService/model/notification.go | Data models |
| backend/services/notificationService/repository/notification_repository_test.go | Repository unit tests |
| backend/services/notificationService/service/notification_service_test.go | Business-logic unit tests |
| backend/services/socialService/client/notification_client.go | Dubbo client |
| TopFans-activity-admin/backend/models/notification.py | SQLAlchemy ORM |
| TopFans-activity-admin/backend/crud/notification_crud.py | Business functions |
| TopFans-activity-admin/backend/crud/notification_crud_test.py | CRUD unit tests |
| TopFans-activity-admin/backend/handlers/notification.py | FastAPI handler |
| TopFans-activity-admin/backend/handlers/activity.py | Modification: automatic broadcast |
Modified files
| Path | Change |
|---|---|
| backend/proto/asset.proto | Add name/cover_url to GetAssetForRPCResponse |
| backend/services/assetService/provider/asset_provider.go | Populate the new fields |
| backend/services/socialService/service/asset_like_service.go | Call the notification service at the end of LikeAsset |
| backend/services/socialService/main.go | Inject NotificationClient |
| backend/services/socialService/service/asset_like_service_test.go | New test case (the like still succeeds when the mocked notification call fails) |
| backend/gateway/router/router.go | Register /api/v1/notifications/* |
| backend/gateway/config/config.yaml | Add the notification service backend |
| TopFans-activity-admin/backend/router/__init__.py | Register the notification routes |
| TopFans-activity-admin/backend/main.py | DB schema sync declaration (if SQLAlchemy create_all is used) |
Phase 1: Database Schema
Task 1: Create the notifications migration file
Files:
- Create: backend/migrations/2026_06_16_001_create_notifications.sql
- Step 1: Write the migration file
-- Notification system: main table + stats table
-- Created: 2026-06-16
-- Related: spec §4.1
-- 1. Notifications main table
CREATE TABLE IF NOT EXISTS public.notifications (
    id BIGSERIAL PRIMARY KEY,
    user_id BIGINT NOT NULL,
    star_id BIGINT NOT NULL,
    type VARCHAR(20) NOT NULL, -- like / system / activity
    title VARCHAR(200) NOT NULL,
    content VARCHAR(500),
    data JSONB,
    is_read BOOLEAN NOT NULL DEFAULT FALSE,
    is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
    created_at BIGINT NOT NULL,
    read_at BIGINT
);
-- 2. Indexes
CREATE INDEX IF NOT EXISTS idx_notifications_user_type_created
    ON public.notifications (user_id, star_id, type, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_notifications_user_unread
    ON public.notifications (user_id, star_id, is_read, created_at DESC)
    WHERE is_deleted = FALSE;
-- 3. Notification stats table
CREATE TABLE IF NOT EXISTS public.notification_stats (
    user_id BIGINT NOT NULL,
    star_id BIGINT NOT NULL,
    like_unread_count INT NOT NULL DEFAULT 0,
    system_unread_count INT NOT NULL DEFAULT 0,
    activity_unread_count INT NOT NULL DEFAULT 0,
    total_unread_count INT NOT NULL DEFAULT 0,
    updated_at BIGINT NOT NULL,
    PRIMARY KEY (user_id, star_id)
);
-- 4. Start the id sequence at 10000 (per the CLAUDE.md database conventions)
ALTER SEQUENCE public.notifications_id_seq RESTART WITH 10000;
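- Step 2: Verify the SQL syntax (if psql is available in the environment)
PGPASSWORD=$DB_PASSWORD psql -h localhost -U $DB_USER -d $DB_NAME -f backend/migrations/2026_06_16_001_create_notifications.sql
Expected: both CREATE TABLE statements succeed with no errors.
- Step 3: Verify the table structure
\d public.notifications
\d public.notification_stats
SELECT last_value FROM public.notifications_id_seq;
Expected: all columns and indexes are present; the sequence's last_value = 10000.
- Step 4: Regression check (per the CLAUDE.md conventions)
  - Confirm that no other table was affected; the following query should return every pre-existing table:
    SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename NOT IN ('notifications', 'notification_stats') ORDER BY tablename;
  - The sequence start value of 10000 does not conflict with existing BIGSERIAL tables such as assets/users.
- Step 5: Commit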
git add backend/migrations/2026_06_16_001_create_notifications.sql
git commit -m "feat(notification): add notifications and notification_stats tables"
Phase 2: Proto Definitions
Task 2: Extend asset.proto with the name/cover_url fields
Files:
- Modify: backend/proto/asset.proto:326-333
- Step 1: Modify GetAssetForRPCResponse
Locate lines 326-333 of backend/proto/asset.proto and add the name and cover_url fields:
// Get-asset response (internal RPC)
message GetAssetForRPCResponse {
  topfans.common.BaseResponse base = 1;
  int64 asset_id = 2;   // asset ID
  int64 owner_uid = 3;  // owner ID
  int64 star_id = 4;    // star ID
  int32 status = 5;     // status: 0=Pending, 1=Active
  bool is_active = 6;   // whether the asset is active
  string name = 7;      // collectible name (used in notifications)
  string cover_url = 8; // collectible cover (used in notifications)
}
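- Step 2: Regenerate the proto Go code
cd backend
./gen-swagger.sh 2>/dev/null || true # triggers proto generation
# or call protoc directly:
protoc --go_out=. --go-grpc_out=. proto/asset.proto
Expected: pkg/proto/asset/asset.pb.go is regenerated and compiles.
- Step 3: Regression check
Use mcp__code-review-graph__query_graph_tool to find every caller of GetAssetForRPC:
  - pattern=callers_of target=GetAssetForRPC
  - Confirm that the existing call sites in the social service are not broken by the new fields (the change is backward compatible; the new fields simply arrive as zero values)
- Step 4: Build verification
cd backend
go build ./...
Expected: the build succeeds (the asset service does not populate the new fields yet, but zero-valued fields do not affect runtime behavior).
- Step 5: Commit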
git add backend/proto/asset.proto backend/pkg/proto/asset/
git commit -m "feat(proto): add name and cover_url to GetAssetForRPCResponse"
Task 3: Populate the new fields in the asset provider
Files:
- Modify: backend/services/assetService/provider/asset_provider.go (the GetAssetForRPC method)
- Step 1: Locate the GetAssetForRPC implementation
grep -n "GetAssetForRPC" backend/services/assetService/provider/asset_provider.go
- Step 2: Add name/cover_url to the DB query
Locate the SQL query behind GetAssetForRPC (usually at the asset_repository.go call site) and add the name and cover_url columns to the SELECT.
Reference implementation (asset_repository.go line 222 already exposes the name/cover_url fields):
resp := &assetPb.GetAssetForRPCResponse{
	Base:     baseResp,
	AssetId:  asset.ID,
	OwnerUid: asset.OwnerUID,
	StarId:   asset.StarID,
	Status:   int32(asset.Status),
	IsActive: asset.Status == 1,
	Name:     asset.Name,     // new field
	CoverUrl: asset.CoverURL, // new field
}
- Step 3: Build verification
cd backend
go build ./services/assetService/...
Expected: the build succeeds.
- Step 4: Unit tests (if GetAssetForRPC test cases exist)
cd backend
go test ./services/assetService/repository/ -run TestGetAssetForRPC -v
Expected: existing tests pass; where tests exist, add a case verifying that name/cover_url are returned correctly.
- Step 5: Regression check
Run query_graph pattern=callers_of target=GetAssetForRPC again to confirm, then start the asset service once for an end-to-end check:
cd backend
./start.sh assetService # or run the health check after deployment
curl http://localhost:20003/health
- Step 6: Commit
git add backend/services/assetService/
git commit -m "feat(assetService): populate name and cover_url in GetAssetForRPC"
Task 4: Create notification.proto
Files:
- Create: backend/proto/notification.proto
- Step 1: Write the proto file
syntax = "proto3";
package topfans.notification;
option go_package = "github.com/topfans/backend/pkg/proto/notification;notification";
import "proto/common.proto";
import "google/api/annotations.proto";
import "google/protobuf/struct.proto";
service NotificationService {
rpc CreateNotification(CreateNotificationRequest) returns (CreateNotificationResponse) {
option (google.api.http) = {
post: "/internal/v1/notifications"
body: "*"
};
}
rpc GetNotifications(GetNotificationsRequest) returns (GetNotificationsResponse) {
option (google.api.http) = { get: "/api/v1/notifications" };
}
rpc GetUnreadCount(GetUnreadCountRequest) returns (GetUnreadCountResponse) {
option (google.api.http) = { get: "/api/v1/notifications/unread-count" };
}
rpc MarkAsRead(MarkAsReadRequest) returns (MarkAsReadResponse) {
option (google.api.http) = { post: "/api/v1/notifications/{id}/read" };
}
rpc MarkAsReadByTarget(MarkAsReadByTargetRequest) returns (MarkAsReadByTargetResponse) {
option (google.api.http) = { post: "/api/v1/notifications/targets/{target_id}/read" };
}
rpc MarkAllAsRead(MarkAllAsReadRequest) returns (MarkAllAsReadResponse) {
option (google.api.http) = { post: "/api/v1/notifications/read-all" };
}
rpc DeleteNotification(DeleteNotificationRequest) returns (DeleteNotificationResponse) {
option (google.api.http) = { delete: "/api/v1/notifications/{id}" };
}
rpc DeleteByTarget(DeleteByTargetRequest) returns (DeleteByTargetResponse) {
option (google.api.http) = { delete: "/api/v1/notifications/targets/{target_id}" };
}
}
message Notification {
int64 id = 1; int64 user_id = 2; int64 star_id = 3;
string type = 4; string title = 5; string content = 6;
google.protobuf.Struct data = 7;
bool is_read = 8; int64 created_at = 9; int64 read_at = 10;
// List-layer aggregation: populated only when type=like and rows are aggregated by target_id
bool aggregated = 11;
int32 total_count = 12;
repeated ActorPreview actors = 13;
int64 target_id = 14;
}
message ActorPreview {
int64 user_id = 1; string nickname = 2; string avatar = 3;
int64 liked_at = 4;
}
message CreateNotificationRequest {
int64 user_id = 1; int64 star_id = 2;
string type = 3; string title = 4; string content = 5;
google.protobuf.Struct data = 6;
}
message CreateNotificationResponse {
topfans.common.BaseResponse base = 1;
int64 id = 2;
}
message GetNotificationsRequest {
string type = 1; // like / system / activity / empty = all
string tab = 2; // today / history / empty = all
int32 page = 3; int32 page_size = 4;
}
message GetNotificationsResponse {
topfans.common.BaseResponse base = 1;
repeated Notification items = 2;
int64 total = 3;
int32 page = 4; int32 page_size = 5;
}
message GetUnreadCountRequest {}
message UnreadCount {
int32 like = 1; int32 system = 2; int32 activity = 3; int32 total = 4;
}
message GetUnreadCountResponse {
topfans.common.BaseResponse base = 1;
UnreadCount counts = 2;
}
message MarkAsReadRequest { int64 id = 1; }
message MarkAsReadResponse { topfans.common.BaseResponse base = 1; }
message MarkAsReadByTargetRequest { int64 target_id = 1; }
message MarkAsReadByTargetResponse {
topfans.common.BaseResponse base = 1; int32 affected = 2;
}
message MarkAllAsReadRequest { string type = 1; }
message MarkAllAsReadResponse {
topfans.common.BaseResponse base = 1; int32 affected = 2;
}
message DeleteNotificationRequest { int64 id = 1; }
message DeleteNotificationResponse { topfans.common.BaseResponse base = 1; }
message DeleteByTargetRequest { int64 target_id = 1; }
message DeleteByTargetResponse {
topfans.common.BaseResponse base = 1; int32 affected = 2;
}
- Step 2: Regenerate the Go code
cd backend
protoc --go_out=. --go-grpc_out=. proto/notification.proto
# or
make proto
Expected: pkg/proto/notification/notification.pb.go is generated successfully.
- Step 3: Build verification
cd backend
go build ./pkg/proto/notification/...
Expected: the build succeeds.
- Step 4: Commit
git add backend/proto/notification.proto backend/pkg/proto/notification/
git commit -m "feat(proto): add notification service proto definition"
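The `tab` field of `GetNotificationsRequest` splits results into `today` and `history`. The following is a minimal sketch of one way to compute that boundary, assuming timestamps are Unix milliseconds (as in `created_at`) and that the day boundary follows a fixed UTC offset. `startOfDayMs`, `tabOf`, and the offset parameter are hypothetical names for illustration, not part of the spec.

```go
package main

import (
	"fmt"
	"time"
)

// startOfDayMs returns the Unix-millisecond timestamp of local midnight for
// the day containing nowMs, where "local" means a fixed UTC offset in seconds.
// Hypothetical helper: it makes the time zone an explicit input instead of
// relying on the server's local zone.
func startOfDayMs(nowMs int64, utcOffsetSec int) int64 {
	loc := time.FixedZone("fixed", utcOffsetSec)
	t := time.UnixMilli(nowMs).In(loc)
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, loc).UnixMilli()
}

// tabOf classifies a notification timestamp as "today" or "history".
func tabOf(createdAtMs, nowMs int64, utcOffsetSec int) string {
	if createdAtMs >= startOfDayMs(nowMs, utcOffsetSec) {
		return "today"
	}
	return "history"
}

func main() {
	const midnightUTC = int64(1781568000000) // 2026-06-16T00:00:00Z
	now := midnightUTC + 10*3600*1000        // 10:00 UTC on the same day

	fmt.Println(startOfDayMs(now, 0))         // boundary for a UTC user
	fmt.Println(startOfDayMs(now, 8*3600))    // boundary for a UTC+8 user
	fmt.Println(tabOf(midnightUTC-1, now, 0)) // one millisecond before midnight
}
```

Passing the offset explicitly keeps the today/history split independent of the time zone the service happens to run in; whether the product wants a per-user or a single global boundary is a decision this sketch does not make.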
Phase 3: Notification Service (Go)
Task 5: Create the notification service config file
Files:
- Create: backend/services/notificationService/configs/config.yaml
- Step 1: Copy the socialService config.yaml as a template
mkdir -p backend/services/notificationService/configs
cp backend/services/socialService/configs/config.yaml \
backend/services/notificationService/configs/config.yaml
- Step 2: Change the service name, port, and database name
Following the existing structure of socialService/configs/config.yaml, change:
  - service.name: notification-service
  - service.port: 20010
  - database.dbname: top-fans (shared database)
- Step 3: Commit
git add backend/services/notificationService/configs/
git commit -m "feat(notificationService): add service config"
Task 6: Create the notification data model
Files:
- Create: backend/services/notificationService/model/notification.go
- Step 1: Write the model
package model
// Notification is the notification data model.
type Notification struct {
ID int64 `json:"id" gorm:"primaryKey;column:id"`
UserID int64 `json:"user_id" gorm:"column:user_id;not null;index"`
StarID int64 `json:"star_id" gorm:"column:star_id;not null"`
Type string `json:"type" gorm:"column:type;not null;size:20"`
Title string `json:"title" gorm:"column:title;not null;size:200"`
Content string `json:"content" gorm:"column:content;size:500"`
Data string `json:"data" gorm:"column:data;type:jsonb"` // JSONB stored as string
IsRead bool `json:"is_read" gorm:"column:is_read;not null;default:false"`
IsDeleted bool `json:"is_deleted" gorm:"column:is_deleted;not null;default:false"`
CreatedAt int64 `json:"created_at" gorm:"column:created_at;not null"`
ReadAt int64 `json:"read_at" gorm:"column:read_at"`
}
// TableName returns the table name.
func (Notification) TableName() string {
return "public.notifications"
}
// NotificationStats is the notification stats model.
type NotificationStats struct {
UserID int64 `json:"user_id" gorm:"primaryKey;column:user_id"`
StarID int64 `json:"star_id" gorm:"primaryKey;column:star_id"`
LikeUnreadCount int `json:"like_unread_count" gorm:"column:like_unread_count;not null;default:0"`
SystemUnreadCount int `json:"system_unread_count" gorm:"column:system_unread_count;not null;default:0"`
ActivityUnreadCount int `json:"activity_unread_count" gorm:"column:activity_unread_count;not null;default:0"`
TotalUnreadCount int `json:"total_unread_count" gorm:"column:total_unread_count;not null;default:0"`
UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;not null"`
}
// TableName returns the table name.
func (NotificationStats) TableName() string {
return "public.notification_stats"
}
// ActorPreview is the actor preview used by list-layer aggregation.
type ActorPreview struct {
UserID int64 `json:"user_id"`
Nickname string `json:"nickname"`
Avatar string `json:"avatar"`
LikedAt int64 `json:"liked_at"`
}
// AggregatedNotification is the aggregated query result (returned when type=like).
type AggregatedNotification struct {
ID int64 `json:"id"` // id of the first (most recent) notification in the group
UserID int64 `json:"user_id"`
StarID int64 `json:"star_id"`
Type string `json:"type"`
Title string `json:"title"`
Content string `json:"content"`
TargetID int64 `json:"target_id"`
IsRead bool `json:"is_read"`
CreatedAt int64 `json:"created_at"`
ReadAt int64 `json:"read_at"`
TotalCount int32 `json:"total_count"`
Actors []ActorPreview `json:"actors"`
Data string `json:"data"`
}
- Step 2: Commit
git add backend/services/notificationService/model/
git commit -m "feat(notificationService): add data models"
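Because `Notification.Data` holds the JSONB payload as a string, it helps to pin down the payload shape for like notifications in one place. The sketch below assumes the key names that the aggregated query in Task 7 reads from `data` (`target_id`, `actor_id`, `actor_name`, `actor_avatar`); the `LikeData` type and both helper functions are hypothetical and not part of the plan's file list.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LikeData is a hypothetical shape for the JSONB data column of a like
// notification. The JSON keys match what the aggregation query reads.
type LikeData struct {
	TargetID    int64  `json:"target_id"`
	ActorID     int64  `json:"actor_id"`
	ActorName   string `json:"actor_name"`
	ActorAvatar string `json:"actor_avatar"`
}

// encodeLikeData serializes the payload into the string stored in Notification.Data.
func encodeLikeData(d LikeData) (string, error) {
	b, err := json.Marshal(d)
	if err != nil {
		return "", fmt.Errorf("marshal like data: %w", err)
	}
	return string(b), nil
}

// decodeLikeData parses a stored payload back into the struct.
func decodeLikeData(s string) (LikeData, error) {
	var d LikeData
	if err := json.Unmarshal([]byte(s), &d); err != nil {
		return LikeData{}, fmt.Errorf("unmarshal like data: %w", err)
	}
	return d, nil
}

func main() {
	s, err := encodeLikeData(LikeData{TargetID: 8888, ActorID: 1000, ActorName: "alice"})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

A typed payload like this gives the social service (the writer) and the notification repository (the reader) a single definition of the keys, so a renamed key fails at compile time rather than silently producing an empty actor preview.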
Task 7: Create the notification repository (repository layer)
Files:
- Create: backend/services/notificationService/repository/notification_repository.go
- Step 1: Write the repository implementation
package repository
import (
	"context"
	"database/sql"
	"encoding/json" // used by parseActorLikes
	"fmt"
	"time"
	"github.com/topfans/backend/pkg/database"
	"github.com/topfans/backend/pkg/logger"
	"github.com/topfans/backend/services/notificationService/model"
	"go.uber.org/zap"
)
type NotificationRepository struct {
db *database.DB
}
func NewNotificationRepository(db *database.DB) *NotificationRepository {
return &NotificationRepository{db: db}
}
// Create inserts a notification (called inside a transaction).
func (r *NotificationRepository) Create(ctx context.Context, tx *sql.Tx, n *model.Notification) (int64, error) {
var id int64
err := tx.QueryRowContext(ctx, `
INSERT INTO public.notifications
(user_id, star_id, type, title, content, data, is_read, is_deleted, created_at, read_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
RETURNING id
`, n.UserID, n.StarID, n.Type, n.Title, n.Content, n.Data, n.IsRead, n.IsDeleted, n.CreatedAt, n.ReadAt).Scan(&id)
if err != nil {
logger.Logger.Error("failed to insert notification", zap.Error(err))
return 0, fmt.Errorf("insert notification: %w", err)
}
return id, nil
}
// ListSystemActivity lists system / activity notifications (no aggregation).
func (r *NotificationRepository) ListSystemActivity(ctx context.Context, userID, starID int64, ntype, tab string, page, pageSize int) ([]*model.Notification, int64, error) {
args := []interface{}{userID, starID, ntype}
where := "user_id = $1 AND star_id = $2 AND type = $3 AND is_deleted = FALSE"
if tab == "today" {
startOfDay := startOfTodayMs()
where += " AND created_at >= $4"
args = append(args, startOfDay)
} else if tab == "history" {
startOfDay := startOfTodayMs()
where += " AND created_at < $4"
args = append(args, startOfDay)
}
// count
var total int64
if err := r.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM public.notifications WHERE "+where, args...).Scan(&total); err != nil {
return nil, 0, err
}
// list
offset := (page - 1) * pageSize
args = append(args, pageSize, offset)
limitIdx := len(args) - 1
offsetIdx := len(args)
query := fmt.Sprintf(`
SELECT id, user_id, star_id, type, title, COALESCE(content,''), COALESCE(data::text, '{}'),
is_read, is_deleted, created_at, COALESCE(read_at, 0)
FROM public.notifications
WHERE %s
ORDER BY created_at DESC
LIMIT $%d OFFSET $%d
`, where, limitIdx, offsetIdx)
rows, err := r.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
items := make([]*model.Notification, 0, pageSize)
for rows.Next() {
n := &model.Notification{}
if err := rows.Scan(&n.ID, &n.UserID, &n.StarID, &n.Type, &n.Title, &n.Content, &n.Data, &n.IsRead, &n.IsDeleted, &n.CreatedAt, &n.ReadAt); err != nil {
return nil, 0, err
}
items = append(items, n)
}
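// surface any error that ended the iteration early
if err := rows.Err(); err != nil {
	return nil, 0, err
}
return items, total, nil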
}
// ListLikesAggregated lists like notifications aggregated by target_id.
func (r *NotificationRepository) ListLikesAggregated(ctx context.Context, userID, starID int64, tab string, page, pageSize int) ([]*model.AggregatedNotification, int64, error) {
	const maxActorPreviews = 3
	// The tab filter applies to each group's latest like, so it is a HAVING
	// clause on the aggregate; $3 is bound only when a tab is requested.
	having := ""
	args := []interface{}{userID, starID}
	if tab == "today" {
		having = "HAVING MAX(created_at) >= $3"
		args = append(args, startOfTodayMs())
	} else if tab == "history" {
		having = "HAVING MAX(created_at) < $3"
		args = append(args, startOfTodayMs())
	}
	// count: number of distinct target_id groups that pass the tab filter
	var total int64
	countQuery := fmt.Sprintf(`
		SELECT COUNT(*) FROM (
			SELECT (data->>'target_id')::bigint AS target_id
			FROM public.notifications
			WHERE user_id=$1 AND star_id=$2 AND type='like' AND is_deleted=FALSE
			GROUP BY (data->>'target_id')
			%s
		) t
	`, having)
	if err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil {
		return nil, 0, err
	}
	// Aggregated query: per target_id, return total_count, the latest timestamp,
	// whether every like is read, and the actor list (newest first).
	offset := (page - 1) * pageSize
	args = append(args, pageSize, offset)
	limitIdx, offsetIdx := len(args)-1, len(args)
	query := fmt.Sprintf(`
		WITH agg AS (
			SELECT
				(data->>'target_id')::bigint AS target_id,
				COUNT(*) AS total_count,
				MAX(created_at) AS latest_at,
				BOOL_AND(is_read) AS all_read
			FROM public.notifications
			WHERE user_id=$1 AND star_id=$2 AND type='like' AND is_deleted=FALSE
			GROUP BY (data->>'target_id')
			%s
		),
		first_notif AS (
			SELECT DISTINCT ON ((data->>'target_id')::bigint)
				(data->>'target_id')::bigint AS target_id,
				id, title, content, data, read_at
			FROM public.notifications
			WHERE user_id=$1 AND star_id=$2 AND type='like' AND is_deleted=FALSE
			ORDER BY (data->>'target_id')::bigint, created_at DESC
		),
		actors AS (
			SELECT (data->>'target_id')::bigint AS target_id,
				json_agg(json_build_object(
					'user_id', (data->>'actor_id')::bigint,
					'nickname', COALESCE(data->>'actor_name', ''),
					'avatar', COALESCE(data->>'actor_avatar', ''),
					'liked_at', created_at
				) ORDER BY created_at DESC) AS actor_previews
			FROM public.notifications
			WHERE user_id=$1 AND star_id=$2 AND type='like' AND is_deleted=FALSE
			GROUP BY (data->>'target_id')
		)
		SELECT
			a.target_id, a.total_count, a.latest_at, a.all_read,
			f.id, f.title, COALESCE(f.content, ''), f.data, COALESCE(f.read_at, 0),
			COALESCE(act.actor_previews, '[]'::json) AS actor_previews
		FROM agg a
		JOIN first_notif f ON f.target_id = a.target_id
		LEFT JOIN actors act ON act.target_id = a.target_id
		ORDER BY a.latest_at DESC
		LIMIT $%d OFFSET $%d
	`, having, limitIdx, offsetIdx)
	rows, err := r.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()
	items := make([]*model.AggregatedNotification, 0, pageSize)
	for rows.Next() {
		var item model.AggregatedNotification
		var actorLikesJSON []byte
		if err := rows.Scan(
			&item.TargetID, &item.TotalCount, &item.CreatedAt, &item.IsRead,
			&item.ID, &item.Title, &item.Content, &item.Data, &item.ReadAt,
			&actorLikesJSON,
		); err != nil {
			return nil, 0, err
		}
		item.UserID = userID
		item.StarID = starID
		item.Type = "like"
		// Actor name/avatar come straight from data; keep only the newest few as the preview.
		item.Actors = parseActorLikes(actorLikesJSON)
		if len(item.Actors) > maxActorPreviews {
			item.Actors = item.Actors[:maxActorPreviews]
		}
		items = append(items, &item)
	}
	if err := rows.Err(); err != nil {
		return nil, 0, err
	}
	return items, total, nil
}
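// Note: the actor nickname/avatar are already embedded in the data field when the
// social service writes the notification (data->>'actor_name' / data->>'actor_avatar'),
// so the aggregated query reads them directly from data and the notification service
// does not need to call the user service again (avoiding a redundant RPC).
// MarkAsReadByID marks a single notification as read.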
func (r *NotificationRepository) MarkAsReadByID(ctx context.Context, tx *sql.Tx, userID, starID, id int64, now int64) (int32, error) {
res, err := tx.ExecContext(ctx, `
UPDATE public.notifications
SET is_read = TRUE, read_at = $4
WHERE id = $1 AND user_id = $2 AND star_id = $3 AND is_read = FALSE AND is_deleted = FALSE
`, id, userID, starID, now)
if err != nil {
return 0, err
}
affected, _ := res.RowsAffected()
return int32(affected), nil
}
// MarkAsReadByTarget marks all unread like notifications under one target as read.
func (r *NotificationRepository) MarkAsReadByTarget(ctx context.Context, tx *sql.Tx, userID, starID, targetID, now int64) (int32, error) {
	// Four arguments are bound, so read_at must use $4.
	res, err := tx.ExecContext(ctx, `
		UPDATE public.notifications
		SET is_read = TRUE, read_at = $4
		WHERE user_id=$1 AND star_id=$2 AND type='like'
		AND (data->>'target_id')::bigint = $3
		AND is_read = FALSE AND is_deleted = FALSE
	`, userID, starID, targetID, now)
if err != nil {
return 0, err
}
affected, _ := res.RowsAffected()
return int32(affected), nil
}
// MarkAllAsRead marks all unread notifications of one type as read.
// An empty ntype matches every type.
func (r *NotificationRepository) MarkAllAsRead(ctx context.Context, tx *sql.Tx, userID, starID int64, ntype string, now int64) (int32, error) {
	res, err := tx.ExecContext(ctx, `
		UPDATE public.notifications
		SET is_read = TRUE, read_at = $4
		WHERE user_id=$1 AND star_id=$2
		AND ($3::text = '' OR type = $3::text)
		AND is_read=FALSE AND is_deleted=FALSE
	`, userID, starID, ntype, now)
if err != nil {
return 0, err
}
affected, _ := res.RowsAffected()
return int32(affected), nil
}
// SoftDeleteByID soft-deletes a single notification.
func (r *NotificationRepository) SoftDeleteByID(ctx context.Context, tx *sql.Tx, userID, starID, id int64) (int32, error) {
res, err := tx.ExecContext(ctx, `
UPDATE public.notifications
SET is_deleted = TRUE
WHERE id = $1 AND user_id = $2 AND star_id = $3 AND is_deleted = FALSE
`, id, userID, starID)
if err != nil {
return 0, err
}
affected, _ := res.RowsAffected()
return int32(affected), nil
}
// SoftDeleteByTarget soft-deletes all like notifications under one target.
func (r *NotificationRepository) SoftDeleteByTarget(ctx context.Context, tx *sql.Tx, userID, starID, targetID int64) (int32, error) {
res, err := tx.ExecContext(ctx, `
UPDATE public.notifications
SET is_deleted = TRUE
WHERE user_id=$1 AND star_id=$2 AND type='like'
AND (data->>'target_id')::bigint = $3 AND is_deleted = FALSE
`, userID, starID, targetID)
if err != nil {
return 0, err
}
affected, _ := res.RowsAffected()
return int32(affected), nil
}
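// GetTypeByID returns the type and read state of a notification owned by the user;
// it returns an empty type when no matching row exists.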
func (r *NotificationRepository) GetTypeByID(ctx context.Context, tx *sql.Tx, id, userID, starID int64) (string, bool, error) {
var ntype string
var isRead bool
err := tx.QueryRowContext(ctx, `
SELECT type, is_read FROM public.notifications
WHERE id=$1 AND user_id=$2 AND star_id=$3 AND is_deleted=FALSE
`, id, userID, starID).Scan(&ntype, &isRead)
if err == sql.ErrNoRows {
return "", false, nil
}
if err != nil {
return "", false, err
}
return ntype, isRead, nil
}
// startOfTodayMs returns today's midnight (server local time) as a Unix millisecond timestamp.
func startOfTodayMs() int64 {
now := time.Now()
return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).UnixMilli()
}
// parseActorLikes parses the JSON array (user_id + nickname + avatar + liked_at).
// The values come from the actor_name/actor_avatar keys that the social service writes into data.
func parseActorLikes(data []byte) []model.ActorPreview {
type rawActor struct {
UserID int64 `json:"user_id"`
Nickname string `json:"nickname"`
Avatar string `json:"avatar"`
LikedAt int64 `json:"liked_at"`
}
var raws []rawActor
if err := json.Unmarshal(data, &raws); err != nil {
return nil
}
out := make([]model.ActorPreview, 0, len(raws))
for _, r := range raws {
out = append(out, model.ActorPreview{
UserID: r.UserID, Nickname: r.Nickname, Avatar: r.Avatar, LikedAt: r.LikedAt,
})
}
return out
}
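- Step 2: Add the "encoding/json" import
Make sure the import block includes "encoding/json" (parseActorLikes calls json.Unmarshal); add it if it is missing.
- Step 3: Build verification
cd backend
go build ./services/notificationService/...
Expected: the build succeeds.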
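The list queries in this task build their WHERE clause and `$n` placeholders by hand, which is where off-by-one placeholder bugs tend to appear. As an illustration only, the sketch below numbers every placeholder from the current length of the argument slice. `buildListQuery`, `listQuery`, and the default page size of 20 with a cap of 100 are assumptions made for this example, not values from the spec.

```go
package main

import (
	"fmt"
	"strings"
)

// listQuery holds a parameterized WHERE clause, the paging clause, and the
// arguments in placeholder order.
type listQuery struct {
	Where  string
	Paging string
	Args   []any
}

// buildListQuery numbers each placeholder from len(args), so adding or
// removing a condition never desynchronizes $n and the argument list.
// startOfDay is the Unix-ms boundary between the today and history tabs.
func buildListQuery(userID, starID int64, ntype, tab string, startOfDay int64, page, pageSize int) listQuery {
	if page < 1 {
		page = 1
	}
	if pageSize < 1 || pageSize > 100 {
		pageSize = 20 // hypothetical default and cap
	}
	var conds []string
	var args []any
	add := func(format string, v any) {
		args = append(args, v)
		conds = append(conds, fmt.Sprintf(format, len(args)))
	}
	add("user_id = $%d", userID)
	add("star_id = $%d", starID)
	add("type = $%d", ntype)
	conds = append(conds, "is_deleted = FALSE")
	switch tab {
	case "today":
		add("created_at >= $%d", startOfDay)
	case "history":
		add("created_at < $%d", startOfDay)
	}
	args = append(args, pageSize, (page-1)*pageSize)
	paging := fmt.Sprintf("LIMIT $%d OFFSET $%d", len(args)-1, len(args))
	return listQuery{Where: strings.Join(conds, " AND "), Paging: paging, Args: args}
}

func main() {
	q := buildListQuery(990001, 1, "system", "today", 1781568000000, 2, 20)
	fmt.Println("WHERE", q.Where)
	fmt.Println(q.Paging, q.Args)
}
```

Normalizing `page` and `pageSize` up front also avoids a negative OFFSET when a client sends `page=0`, which PostgreSQL rejects.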
- Step 4: Commit
git add backend/services/notificationService/repository/notification_repository.go
git commit -m "feat(notificationService): add notification repository"
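To make the intended result of ListLikesAggregated easier to reason about (and to unit-test without a database), the grouping can be mirrored in memory. This is only a sketch of the semantics under simplified assumptions: `likeRow`, `likeGroup`, and `aggregateLikes` are hypothetical names, and the real implementation stays in SQL.

```go
package main

import (
	"fmt"
	"sort"
)

// likeRow is a simplified like notification (hypothetical type for this sketch).
type likeRow struct {
	TargetID  int64
	ActorID   int64
	IsRead    bool
	CreatedAt int64 // Unix ms
}

// likeGroup mirrors the per-target columns the aggregated query returns.
type likeGroup struct {
	TargetID   int64
	TotalCount int
	LatestAt   int64
	AllRead    bool
	ActorIDs   []int64 // newest first
}

// aggregateLikes groups rows by TargetID and orders groups by LatestAt descending.
func aggregateLikes(rows []likeRow) []likeGroup {
	sorted := append([]likeRow(nil), rows...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].CreatedAt > sorted[j].CreatedAt
	})

	byTarget := map[int64]*likeGroup{}
	var order []int64
	for _, r := range sorted {
		g, ok := byTarget[r.TargetID]
		if !ok {
			// The first row seen for a target is its newest one, so the
			// order slice is already sorted by LatestAt descending.
			g = &likeGroup{TargetID: r.TargetID, LatestAt: r.CreatedAt, AllRead: true}
			byTarget[r.TargetID] = g
			order = append(order, r.TargetID)
		}
		g.TotalCount++
		g.AllRead = g.AllRead && r.IsRead
		g.ActorIDs = append(g.ActorIDs, r.ActorID)
	}

	out := make([]likeGroup, 0, len(order))
	for _, id := range order {
		out = append(out, *byTarget[id])
	}
	return out
}

func main() {
	groups := aggregateLikes([]likeRow{
		{TargetID: 8888, ActorID: 1, IsRead: true, CreatedAt: 100},
		{TargetID: 7777, ActorID: 2, IsRead: false, CreatedAt: 300},
		{TargetID: 8888, ActorID: 3, IsRead: false, CreatedAt: 200},
	})
	for _, g := range groups {
		fmt.Printf("%+v\n", g)
	}
}
```

A group counts as read only when every like in it is read, which matches `BOOL_AND(is_read)` in the SQL: a single new like on an already-read target makes the whole aggregated row unread again.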
Task 8: Create the notification stats repository
Files:
- Create: backend/services/notificationService/repository/notification_stats_repository.go
- Step 1: Write the stats repository
package repository
import (
"context"
"database/sql"
"fmt"
"github.com/topfans/backend/pkg/database"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/notificationService/model"
"go.uber.org/zap"
)
type NotificationStatsRepository struct {
db *database.DB
}
func NewNotificationStatsRepository(db *database.DB) *NotificationStatsRepository {
return &NotificationStatsRepository{db: db}
}
// IncrementByType adds 1 to the unread count of the given type and to the total, inside a transaction.
func (r *NotificationStatsRepository) IncrementByType(ctx context.Context, tx *sql.Tx, userID, starID int64, ntype string, now int64) error {
var col string
switch ntype {
case "like":
col = "like_unread_count"
case "system":
col = "system_unread_count"
case "activity":
col = "activity_unread_count"
default:
return fmt.Errorf("invalid notification type: %s", ntype)
}
query := fmt.Sprintf(`
INSERT INTO public.notification_stats (user_id, star_id, %s, total_unread_count, updated_at)
VALUES ($1, $2, 1, 1, $3)
ON CONFLICT (user_id, star_id) DO UPDATE SET
%s = public.notification_stats.%s + 1,
total_unread_count = public.notification_stats.total_unread_count + 1,
updated_at = $3
`, col, col, col)
_, err := tx.ExecContext(ctx, query, userID, starID, now)
if err != nil {
logger.Logger.Error("failed to increment notification stats", zap.Error(err))
return err
}
return nil
}
// DecrementByType subtracts N from the unread count of the given type and from the total, inside a transaction.
func (r *NotificationStatsRepository) DecrementByType(ctx context.Context, tx *sql.Tx, userID, starID int64, ntype string, delta int, now int64) error {
if delta <= 0 {
return nil
}
var col string
switch ntype {
case "like":
col = "like_unread_count"
case "system":
col = "system_unread_count"
case "activity":
col = "activity_unread_count"
default:
return fmt.Errorf("invalid notification type: %s", ntype)
}
query := fmt.Sprintf(`
UPDATE public.notification_stats
SET %s = GREATEST(0, %s - $3),
total_unread_count = GREATEST(0, total_unread_count - $3),
updated_at = $4
WHERE user_id = $1 AND star_id = $2
`, col, col)
_, err := tx.ExecContext(ctx, query, userID, starID, delta, now)
if err != nil {
logger.Logger.Error("failed to decrement notification stats", zap.Error(err))
return err
}
return nil
}
// ResetByType sets the unread count of the given type to 0 and lowers the total by the same amount.
func (r *NotificationStatsRepository) ResetByType(ctx context.Context, tx *sql.Tx, userID, starID int64, ntype string, now int64) error {
var col string
switch ntype {
case "like":
col = "like_unread_count"
case "system":
col = "system_unread_count"
case "activity":
col = "activity_unread_count"
case "":
// reset every counter to 0
_, err := tx.ExecContext(ctx, `
UPDATE public.notification_stats
SET like_unread_count = 0, system_unread_count = 0,
activity_unread_count = 0, total_unread_count = 0,
updated_at = $3
WHERE user_id = $1 AND star_id = $2
`, userID, starID, now)
return err
default:
return fmt.Errorf("invalid notification type: %s", ntype)
}
query := fmt.Sprintf(`
UPDATE public.notification_stats
SET %s = 0,
total_unread_count = GREATEST(0, total_unread_count - %s),
updated_at = $3
WHERE user_id = $1 AND star_id = $2
`, col, col)
_, err := tx.ExecContext(ctx, query, userID, starID, now)
return err
}
// Get returns the stats for the given user and star (zero counts if no row exists).
func (r *NotificationStatsRepository) Get(ctx context.Context, userID, starID int64) (*model.NotificationStats, error) {
s := &model.NotificationStats{}
err := r.db.QueryRowContext(ctx, `
SELECT user_id, star_id, like_unread_count, system_unread_count,
activity_unread_count, total_unread_count, updated_at
FROM public.notification_stats
WHERE user_id = $1 AND star_id = $2
`, userID, starID).Scan(&s.UserID, &s.StarID, &s.LikeUnreadCount, &s.SystemUnreadCount,
&s.ActivityUnreadCount, &s.TotalUnreadCount, &s.UpdatedAt)
if err == sql.ErrNoRows {
return &model.NotificationStats{UserID: userID, StarID: starID}, nil
}
if err != nil {
return nil, err
}
return s, nil
}
- Step 2: Build verification
cd backend
go build ./services/notificationService/...
Expected: the build succeeds.
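The three stats operations are meant to keep `total_unread_count` equal to the sum of the per-type counters. The sketch below models one stats row in memory to show that invariant, assuming every decrement is no larger than the current counter (as is the case when the delta is the number of rows just marked read). The `stats` type and its methods are hypothetical stand-ins for the SQL, not code from the plan.

```go
package main

import "fmt"

// stats is an in-memory stand-in for one notification_stats row.
type stats struct {
	Like, System, Activity, Total int
}

// counter returns a pointer to the counter for ntype, or an error for an unknown type.
func (s *stats) counter(ntype string) (*int, error) {
	switch ntype {
	case "like":
		return &s.Like, nil
	case "system":
		return &s.System, nil
	case "activity":
		return &s.Activity, nil
	}
	return nil, fmt.Errorf("invalid notification type: %s", ntype)
}

// clampSub mirrors GREATEST(0, v - n).
func clampSub(v, n int) int {
	if v < n {
		return 0
	}
	return v - n
}

// increment mirrors IncrementByType: type counter +1, total +1.
func (s *stats) increment(ntype string) error {
	c, err := s.counter(ntype)
	if err != nil {
		return err
	}
	*c++
	s.Total++
	return nil
}

// decrement mirrors DecrementByType: both counters drop by n, clamped at 0.
func (s *stats) decrement(ntype string, n int) error {
	if n <= 0 {
		return nil
	}
	c, err := s.counter(ntype)
	if err != nil {
		return err
	}
	*c = clampSub(*c, n)
	s.Total = clampSub(s.Total, n)
	return nil
}

// reset mirrors ResetByType: the total drops by the old type counter, which then becomes 0.
func (s *stats) reset(ntype string) error {
	c, err := s.counter(ntype)
	if err != nil {
		return err
	}
	s.Total = clampSub(s.Total, *c)
	*c = 0
	return nil
}

func main() {
	s := &stats{}
	for i := 0; i < 3; i++ {
		_ = s.increment("like")
	}
	_ = s.increment("system")
	_ = s.decrement("like", 2)
	fmt.Printf("%+v\n", *s)
}
```

Note that the clamp is applied to each counter independently, so a decrement larger than the type counter would pull the total below the sum of the remaining counters; callers should pass the affected-row count rather than an estimate.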
- Step 3: Commit
git add backend/services/notificationService/repository/notification_stats_repository.go
git commit -m "feat(notificationService): add notification stats repository"
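IncrementByType, DecrementByType, and ResetByType each repeat the same type-to-column switch before interpolating the column name into SQL. One possible refactor, shown here as a sketch, is a single whitelist lookup. `unreadColumns`, `unreadColumn`, and `incrementSQL` are hypothetical names; the SQL text follows the UPSERT from Step 1.

```go
package main

import "fmt"

// unreadColumns whitelists the notification types that have a counter column.
// Only values from this map are ever interpolated into SQL.
var unreadColumns = map[string]string{
	"like":     "like_unread_count",
	"system":   "system_unread_count",
	"activity": "activity_unread_count",
}

// unreadColumn maps a notification type to its counter column.
func unreadColumn(ntype string) (string, error) {
	col, ok := unreadColumns[ntype]
	if !ok {
		return "", fmt.Errorf("invalid notification type: %q", ntype)
	}
	return col, nil
}

// incrementSQL builds the UPSERT statement for one whitelisted column.
// Values are still passed as bind parameters ($1..$3).
func incrementSQL(ntype string) (string, error) {
	col, err := unreadColumn(ntype)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf(`
INSERT INTO public.notification_stats (user_id, star_id, %[1]s, total_unread_count, updated_at)
VALUES ($1, $2, 1, 1, $3)
ON CONFLICT (user_id, star_id) DO UPDATE SET
    %[1]s = public.notification_stats.%[1]s + 1,
    total_unread_count = public.notification_stats.total_unread_count + 1,
    updated_at = $3`, col), nil
}

func main() {
	q, err := incrementSQL("like")
	if err != nil {
		panic(err)
	}
	fmt.Println(q)
}
```

Because column names cannot be bound as parameters, the whitelist is what keeps the `fmt.Sprintf` interpolation safe: user-supplied type strings never reach the SQL text.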
Task 9: Write unit tests for the notification repository
Files:
- Create: backend/services/notificationService/repository/notification_repository_test.go
- Step 1: Write the tests
package repository_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/topfans/backend/pkg/database"
"github.com/topfans/backend/services/notificationService/model"
"github.com/topfans/backend/services/notificationService/repository"
)
// Depends on a real test DB, configured through environment variables such as TEST_DB_HOST.
func setupTestDB(t *testing.T) *database.DB {
t.Helper()
db, err := database.NewFromEnv()
if err != nil {
t.Skipf("skipping: test DB not available: %v", err)
}
return db
}
func TestNotificationRepository_CreateAndList(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewNotificationRepository(db)
statsRepo := repository.NewNotificationStatsRepository(db)
ctx := context.Background()
userID, starID := int64(990001), int64(1)
cleanup := func() {
db.ExecContext(ctx, `DELETE FROM public.notifications WHERE user_id=$1 AND star_id=$2`, userID, starID)
db.ExecContext(ctx, `DELETE FROM public.notification_stats WHERE user_id=$1 AND star_id=$2`, userID, starID)
}
cleanup()
defer cleanup()
now := time.Now().UnixMilli()
tx, _ := db.BeginTx(ctx, nil)
id, err := repo.Create(ctx, tx, &model.Notification{
UserID: userID, StarID: starID, Type: "like",
Title: "新点赞", Content: "test", Data: `{}`,
CreatedAt: now,
})
assert.NoError(t, err)
assert.Greater(t, id, int64(0))
err = statsRepo.IncrementByType(ctx, tx, userID, starID, "like", now)
assert.NoError(t, err)
tx.Commit()
// read the row back (ListSystemActivity filters by type only, so it also works for like)
items, total, err := repo.ListSystemActivity(ctx, userID, starID, "like", "", 1, 20)
assert.NoError(t, err)
assert.Equal(t, int64(1), total)
assert.Len(t, items, 1)
assert.Equal(t, id, items[0].ID)
}
func TestNotificationRepository_LikeAggregation(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewNotificationRepository(db)
ctx := context.Background()
userID, starID, targetID := int64(990002), int64(1), int64(8888)
cleanup := func() {
db.ExecContext(ctx, `DELETE FROM public.notifications WHERE user_id=$1 AND star_id=$2`, userID, starID)
}
cleanup()
defer cleanup()
now := time.Now().UnixMilli()
for i := 0; i < 5; i++ {
tx, _ := db.BeginTx(ctx, nil)
_, err := repo.Create(ctx, tx, &model.Notification{
UserID: userID, StarID: starID, Type: "like",
Title: "新点赞", Data: `{"target_id": 8888, "actor_id": 1000}`,
CreatedAt: now + int64(i)*1000,
})
assert.NoError(t, err)
tx.Commit()
}
items, total, err := repo.ListLikesAggregated(ctx, userID, starID, "", 1, 20)
assert.NoError(t, err)
assert.Equal(t, int64(1), total)
assert.Len(t, items, 1)
assert.Equal(t, int32(5), items[0].TotalCount)
assert.Equal(t, targetID, items[0].TargetID)
}
func TestNotificationRepository_MarkAsReadByTarget(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewNotificationRepository(db)
statsRepo := repository.NewNotificationStatsRepository(db)
ctx := context.Background()
userID, starID, targetID := int64(990003), int64(1), int64(7777)
cleanup := func() {
db.ExecContext(ctx, `DELETE FROM public.notifications WHERE user_id=$1 AND star_id=$2`, userID, starID)
db.ExecContext(ctx, `DELETE FROM public.notification_stats WHERE user_id=$1 AND star_id=$2`, userID, starID)
}
cleanup()
defer cleanup()
now := time.Now().UnixMilli()
for i := 0; i < 3; i++ {
tx, _ := db.BeginTx(ctx, nil)
_, _ = repo.Create(ctx, tx, &model.Notification{
UserID: userID, StarID: starID, Type: "like",
Title: "x", Data: `{"target_id": 7777, "actor_id": 1}`,
CreatedAt: now + int64(i)*100,
})
_ = statsRepo.IncrementByType(ctx, tx, userID, starID, "like", now)
tx.Commit()
}
tx, _ := db.BeginTx(ctx, nil)
affected, err := repo.MarkAsReadByTarget(ctx, tx, userID, starID, targetID, now)
_ = statsRepo.DecrementByType(ctx, tx, userID, starID, "like", int(affected), now)
tx.Commit()
assert.NoError(t, err)
assert.Equal(t, int32(3), affected)
}
- Step 2: Run the tests
cd backend
go test ./services/notificationService/repository/ -v -run TestNotificationRepository
Expected: all tests PASS (provided the test DB is available; otherwise they are skipped).
- Step 3: Commit
git add backend/services/notificationService/repository/notification_repository_test.go
git commit -m "test(notificationService): add notification repository tests"
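The grouping that TestNotificationRepository_LikeAggregation checks (five like rows on one target collapsing into a single item with a count of 5) can be pictured in memory. The following is only an illustrative sketch: likeRow, likeGroup and aggregateLikes are hypothetical names, and the repository itself is expected to do the grouping in SQL rather than in Go.

```go
package main

import (
	"fmt"
	"sort"
)

// likeRow is a hypothetical, simplified stand-in for one like notification row.
type likeRow struct {
	TargetID  int64
	ActorID   int64
	CreatedAt int64 // Unix milliseconds
}

// likeGroup is a hypothetical aggregate: one entry per target.
type likeGroup struct {
	TargetID   int64
	TotalCount int32
	LatestAt   int64
}

// aggregateLikes groups rows by TargetID and returns the groups ordered by
// their most recent like, newest first.
func aggregateLikes(rows []likeRow) []likeGroup {
	byTarget := make(map[int64]*likeGroup)
	for _, r := range rows {
		g, ok := byTarget[r.TargetID]
		if !ok {
			g = &likeGroup{TargetID: r.TargetID}
			byTarget[r.TargetID] = g
		}
		g.TotalCount++
		if r.CreatedAt > g.LatestAt {
			g.LatestAt = r.CreatedAt
		}
	}
	groups := make([]likeGroup, 0, len(byTarget))
	for _, g := range byTarget {
		groups = append(groups, *g)
	}
	sort.Slice(groups, func(i, j int) bool { return groups[i].LatestAt > groups[j].LatestAt })
	return groups
}

func main() {
	rows := make([]likeRow, 0, 5)
	for i := int64(0); i < 5; i++ {
		rows = append(rows, likeRow{TargetID: 8888, ActorID: 1000, CreatedAt: 1000 + i*1000})
	}
	groups := aggregateLikes(rows)
	fmt.Println(len(groups), groups[0].TotalCount, groups[0].TargetID) // prints: 1 5 8888
}
```

The same input shape as the test (five rows, target 8888) yields one group, which is why the test asserts total == 1 and TotalCount == 5.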
Task 10: Create the notification service business layer
Files:
- Create: backend/services/notificationService/service/notification_service.go
- Step 1: Write the business layer
package service
import (
	"context"
	"encoding/json"
	"fmt"
	"time"
	"unicode/utf8"
	"github.com/topfans/backend/pkg/database"
	"github.com/topfans/backend/pkg/logger"
	notifPb "github.com/topfans/backend/pkg/proto/notification"
	pbCommon "github.com/topfans/backend/pkg/proto/common"
	"github.com/topfans/backend/services/notificationService/model"
	"github.com/topfans/backend/services/notificationService/repository"
	"go.uber.org/zap"
	"google.golang.org/grpc/codes"
	"google.golang.org/protobuf/types/known/structpb"
)
type NotificationService struct {
	db        *database.DB
	notifRepo *repository.NotificationRepository
	statsRepo *repository.NotificationStatsRepository
}
func NewNotificationService(db *database.DB) *NotificationService {
	return &NotificationService{
		db:        db,
		notifRepo: repository.NewNotificationRepository(db),
		statsRepo: repository.NewNotificationStatsRepository(db),
	}
}
// CreateNotification creates a notification in one transaction (INSERT + stats UPSERT).
func (s *NotificationService) CreateNotification(ctx context.Context, req *notifPb.CreateNotificationRequest) (*notifPb.CreateNotificationResponse, error) {
	// Validate parameters.
	if req.UserId <= 0 || req.StarId <= 0 {
		return errResp(codes.InvalidArgument, "user_id and star_id required"), nil
	}
	if req.Type == "" || req.Title == "" {
		return errResp(codes.InvalidArgument, "type and title required"), nil
	}
	// Count runes rather than bytes: len() would count a CJK character as 3 bytes,
	// while the column limits (200 / 500) are assumed to be in characters.
	if utf8.RuneCountInString(req.Title) > 200 || utf8.RuneCountInString(req.Content) > 500 {
		return errResp(codes.InvalidArgument, "title <= 200, content <= 500"), nil
	}
	if req.Type != "like" && req.Type != "system" && req.Type != "activity" {
		return errResp(codes.InvalidArgument, "type must be like/system/activity"), nil
	}
	// Serialize data.
dataStr := "{}"
if req.Data != nil {
b, err := json.Marshal(req.Data.AsMap())
if err != nil {
return errResp(codes.InvalidArgument, "data marshal failed"), nil
}
dataStr = string(b)
}
now := time.Now().UnixMilli()
n := &model.Notification{
UserID: req.UserId, StarID: req.StarId, Type: req.Type,
Title: req.Title, Content: req.Content, Data: dataStr,
CreatedAt: now,
}
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return errResp(codes.Internal, "begin tx failed"), nil
}
defer tx.Rollback()
id, err := s.notifRepo.Create(ctx, tx, n)
if err != nil {
return errResp(codes.Internal, "create failed"), nil
}
if err := s.statsRepo.IncrementByType(ctx, tx, req.UserId, req.StarId, req.Type, now); err != nil {
return errResp(codes.Internal, "stats update failed"), nil
}
if err := tx.Commit(); err != nil {
return errResp(codes.Internal, "commit failed"), nil
}
logger.Logger.Info("notification created",
zap.Int64("id", id), zap.Int64("user_id", req.UserId),
zap.Int64("star_id", req.StarId), zap.String("type", req.Type))
return &notifPb.CreateNotificationResponse{
Base: okBase(), Id: id,
}, nil
}
// GetNotifications lists notifications; like notifications are aggregated per target.
func (s *NotificationService) GetNotifications(ctx context.Context, userID, starID int64, ntype, tab string, page, pageSize int32) (*notifPb.GetNotificationsResponse, error) {
	// fail builds a typed error response; errResp returns *CreateNotificationResponse and cannot be returned here.
	fail := func(c codes.Code, msg string) (*notifPb.GetNotificationsResponse, error) {
		return &notifPb.GetNotificationsResponse{Base: &pbCommon.BaseResponse{Code: uint32(c), Message: msg, Timestamp: time.Now().UnixMilli()}}, nil
	}
	if page <= 0 {
		page = 1
	}
	if pageSize <= 0 {
		pageSize = 20
	}
	if pageSize > 100 {
		pageSize = 100
	}
	var items []*notifPb.Notification
	var total int64
	if ntype == "like" {
		aggItems, aggTotal, err := s.notifRepo.ListLikesAggregated(ctx, userID, starID, tab, int(page), int(pageSize))
		if err != nil {
			return fail(codes.Internal, "list likes failed")
		}
		total = aggTotal
		for _, a := range aggItems {
			// Build the aggregated title, e.g. "{actor_names} 等 N 人赞了你的《{asset_title}》".
			// asset_title lives in the data field (the social service writes it there).
			assetTitle := extractAssetTitle(a.Data)
			a.Title = buildAggregatedLikeTitle(a.Actors, a.TotalCount, assetTitle)
			items = append(items, aggToProto(a))
		}
	} else {
		// An empty type (all notifications) is not supported yet: merging system and
		// activity in one query is deferred to V1.1.
		if ntype == "" {
			return fail(codes.InvalidArgument, "type is required (like/system/activity)")
		}
		rows, t, err := s.notifRepo.ListSystemActivity(ctx, userID, starID, ntype, tab, int(page), int(pageSize))
		if err != nil {
			return fail(codes.Internal, "list failed")
		}
		total = t
		for _, r := range rows {
			items = append(items, rawToProto(r))
		}
	}
	return &notifPb.GetNotificationsResponse{
		Base:  okBase(),
		Items: items, Total: total,
		Page: page, PageSize: pageSize,
	}, nil
}
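// GetUnreadCount returns the per-type and total unread counters.
func (s *NotificationService) GetUnreadCount(ctx context.Context, userID, starID int64) (*notifPb.GetUnreadCountResponse, error) {
	stats, err := s.statsRepo.Get(ctx, userID, starID)
	if err != nil {
		// errResp returns *CreateNotificationResponse, so build the typed response inline.
		return &notifPb.GetUnreadCountResponse{Base: &pbCommon.BaseResponse{Code: uint32(codes.Internal), Message: "get stats failed", Timestamp: time.Now().UnixMilli()}}, nil
	}
	return &notifPb.GetUnreadCountResponse{
		Base: okBase(),
		Counts: &notifPb.UnreadCount{
			Like:     int32(stats.LikeUnreadCount),
			System:   int32(stats.SystemUnreadCount),
			Activity: int32(stats.ActivityUnreadCount),
			Total:    int32(stats.TotalUnreadCount),
		},
	}, nil
}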
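// MarkAsRead marks one notification as read and decrements the stats counter for its type.
func (s *NotificationService) MarkAsRead(ctx context.Context, userID, starID, id, now int64) (*notifPb.MarkAsReadResponse, error) {
	// fail builds a typed error response; errResp returns *CreateNotificationResponse and cannot be returned here.
	fail := func(c codes.Code, msg string) (*notifPb.MarkAsReadResponse, error) {
		return &notifPb.MarkAsReadResponse{Base: &pbCommon.BaseResponse{Code: uint32(c), Message: msg, Timestamp: time.Now().UnixMilli()}}, nil
	}
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fail(codes.Internal, "begin tx failed")
	}
	defer tx.Rollback()
	ntype, isRead, err := s.notifRepo.GetTypeByID(ctx, tx, id, userID, starID)
	if err != nil {
		return fail(codes.Internal, "lookup failed")
	}
	if ntype == "" {
		return fail(codes.NotFound, "notification not found")
	}
	if isRead {
		return &notifPb.MarkAsReadResponse{Base: okBase()}, nil
	}
	affected, err := s.notifRepo.MarkAsReadByID(ctx, tx, userID, starID, id, now)
	if err != nil {
		return fail(codes.Internal, "mark failed")
	}
	if affected > 0 {
		// A failed stats update rolls back the whole transaction so counters cannot drift.
		if err := s.statsRepo.DecrementByType(ctx, tx, userID, starID, ntype, int(affected), now); err != nil {
			return fail(codes.Internal, "stats update failed")
		}
	}
	if err := tx.Commit(); err != nil {
		return fail(codes.Internal, "commit failed")
	}
	return &notifPb.MarkAsReadResponse{Base: okBase()}, nil
}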
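// MarkAsReadByTarget marks all unread like notifications for one target as read.
func (s *NotificationService) MarkAsReadByTarget(ctx context.Context, userID, starID, targetID, now int64) (*notifPb.MarkAsReadByTargetResponse, error) {
	// fail builds a typed error response; errResp returns *CreateNotificationResponse and cannot be returned here.
	fail := func(c codes.Code, msg string) (*notifPb.MarkAsReadByTargetResponse, error) {
		return &notifPb.MarkAsReadByTargetResponse{Base: &pbCommon.BaseResponse{Code: uint32(c), Message: msg, Timestamp: time.Now().UnixMilli()}}, nil
	}
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fail(codes.Internal, "begin tx failed")
	}
	defer tx.Rollback()
	affected, err := s.notifRepo.MarkAsReadByTarget(ctx, tx, userID, starID, targetID, now)
	if err != nil {
		return fail(codes.Internal, "mark failed")
	}
	if affected > 0 {
		if err := s.statsRepo.DecrementByType(ctx, tx, userID, starID, "like", int(affected), now); err != nil {
			return fail(codes.Internal, "stats update failed")
		}
	}
	if err := tx.Commit(); err != nil {
		return fail(codes.Internal, "commit failed")
	}
	return &notifPb.MarkAsReadByTargetResponse{Base: okBase(), Affected: affected}, nil
}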
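// MarkAllAsRead marks every unread notification of one type (or of all types when ntype is empty) as read.
func (s *NotificationService) MarkAllAsRead(ctx context.Context, userID, starID int64, ntype string, now int64) (*notifPb.MarkAllAsReadResponse, error) {
	// fail builds a typed error response; errResp returns *CreateNotificationResponse and cannot be returned here.
	fail := func(c codes.Code, msg string) (*notifPb.MarkAllAsReadResponse, error) {
		return &notifPb.MarkAllAsReadResponse{Base: &pbCommon.BaseResponse{Code: uint32(c), Message: msg, Timestamp: time.Now().UnixMilli()}}, nil
	}
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fail(codes.Internal, "begin tx failed")
	}
	defer tx.Rollback()
	affected, err := s.notifRepo.MarkAllAsRead(ctx, tx, userID, starID, ntype, now)
	if err != nil {
		return fail(codes.Internal, "mark all failed")
	}
	if affected > 0 && ntype != "" {
		if err := s.statsRepo.DecrementByType(ctx, tx, userID, starID, ntype, int(affected), now); err != nil {
			return fail(codes.Internal, "stats update failed")
		}
	} else if ntype == "" {
		if err := s.statsRepo.ResetByType(ctx, tx, userID, starID, "", now); err != nil {
			return fail(codes.Internal, "stats reset failed")
		}
	}
	if err := tx.Commit(); err != nil {
		return fail(codes.Internal, "commit failed")
	}
	return &notifPb.MarkAllAsReadResponse{Base: okBase(), Affected: affected}, nil
}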
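// DeleteNotification soft-deletes one notification; deleting an unread one also decrements its stats counter.
func (s *NotificationService) DeleteNotification(ctx context.Context, userID, starID, id, now int64) (*notifPb.DeleteNotificationResponse, error) {
	// fail builds a typed error response; errResp returns *CreateNotificationResponse and cannot be returned here.
	fail := func(c codes.Code, msg string) (*notifPb.DeleteNotificationResponse, error) {
		return &notifPb.DeleteNotificationResponse{Base: &pbCommon.BaseResponse{Code: uint32(c), Message: msg, Timestamp: time.Now().UnixMilli()}}, nil
	}
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fail(codes.Internal, "begin tx failed")
	}
	defer tx.Rollback()
	ntype, isRead, err := s.notifRepo.GetTypeByID(ctx, tx, id, userID, starID)
	if err != nil {
		return fail(codes.Internal, "lookup failed")
	}
	if ntype == "" {
		return fail(codes.NotFound, "notification not found")
	}
	affected, err := s.notifRepo.SoftDeleteByID(ctx, tx, userID, starID, id)
	if err != nil {
		return fail(codes.Internal, "delete failed")
	}
	if affected > 0 && !isRead {
		if err := s.statsRepo.DecrementByType(ctx, tx, userID, starID, ntype, int(affected), now); err != nil {
			return fail(codes.Internal, "stats update failed")
		}
	}
	if err := tx.Commit(); err != nil {
		return fail(codes.Internal, "commit failed")
	}
	return &notifPb.DeleteNotificationResponse{Base: okBase()}, nil
}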
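// DeleteByTarget soft-deletes all like notifications for one target.
func (s *NotificationService) DeleteByTarget(ctx context.Context, userID, starID, targetID, now int64) (*notifPb.DeleteByTargetResponse, error) {
	// fail builds a typed error response; errResp returns *CreateNotificationResponse and cannot be returned here.
	fail := func(c codes.Code, msg string) (*notifPb.DeleteByTargetResponse, error) {
		return &notifPb.DeleteByTargetResponse{Base: &pbCommon.BaseResponse{Code: uint32(c), Message: msg, Timestamp: time.Now().UnixMilli()}}, nil
	}
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fail(codes.Internal, "begin tx failed")
	}
	defer tx.Rollback()
	// Count the unread rows first so the stats counter can be decremented by that amount.
	var unreadCount int
	err = tx.QueryRowContext(ctx, `
		SELECT COUNT(*) FROM public.notifications
		WHERE user_id=$1 AND star_id=$2 AND type='like'
		AND (data->>'target_id')::bigint = $3
		AND is_read=FALSE AND is_deleted=FALSE
	`, userID, starID, targetID).Scan(&unreadCount)
	if err != nil {
		return fail(codes.Internal, "count failed")
	}
	affected, err := s.notifRepo.SoftDeleteByTarget(ctx, tx, userID, starID, targetID)
	if err != nil {
		return fail(codes.Internal, "delete failed")
	}
	if unreadCount > 0 {
		if err := s.statsRepo.DecrementByType(ctx, tx, userID, starID, "like", unreadCount, now); err != nil {
			return fail(codes.Internal, "stats update failed")
		}
	}
	if err := tx.Commit(); err != nil {
		return fail(codes.Internal, "commit failed")
	}
	return &notifPb.DeleteByTargetResponse{Base: okBase(), Affected: affected}, nil
}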
// ===== helpers =====
func okBase() *pbCommon.BaseResponse {
return &pbCommon.BaseResponse{Code: uint32(codes.OK), Message: "success", Timestamp: time.Now().UnixMilli()}
}
func errResp(c codes.Code, msg string) *notifPb.CreateNotificationResponse {
return &notifPb.CreateNotificationResponse{Base: &pbCommon.BaseResponse{Code: uint32(c), Message: msg, Timestamp: time.Now().UnixMilli()}}
}
func aggToProto(a *model.AggregatedNotification) *notifPb.Notification {
// Deserialize the Data JSON string into a Struct.
dataStruct, _ := structpb.NewStruct(nil)
if a.Data != "" {
var m map[string]interface{}
if err := json.Unmarshal([]byte(a.Data), &m); err == nil {
dataStruct, _ = structpb.NewStruct(m)
}
}
actors := make([]*notifPb.ActorPreview, 0, len(a.Actors))
for _, x := range a.Actors {
actors = append(actors, &notifPb.ActorPreview{
UserId: x.UserID, Nickname: x.Nickname, Avatar: x.Avatar, LikedAt: x.LikedAt,
})
}
return &notifPb.Notification{
Id: a.ID, UserId: a.UserID, StarId: a.StarID, Type: a.Type,
Title: a.Title, Content: a.Content, Data: dataStruct,
IsRead: a.IsRead, CreatedAt: a.CreatedAt, ReadAt: a.ReadAt,
Aggregated: true, TotalCount: a.TotalCount, Actors: actors, TargetId: a.TargetID,
}
}
func rawToProto(n *model.Notification) *notifPb.Notification {
dataStruct, _ := structpb.NewStruct(nil)
if n.Data != "" {
var m map[string]interface{}
if err := json.Unmarshal([]byte(n.Data), &m); err == nil {
dataStruct, _ = structpb.NewStruct(m)
}
}
return &notifPb.Notification{
Id: n.ID, UserId: n.UserID, StarId: n.StarID, Type: n.Type,
Title: n.Title, Content: n.Content, Data: dataStruct,
IsRead: n.IsRead, CreatedAt: n.CreatedAt, ReadAt: n.ReadAt,
}
}
// extractAssetTitle reads asset_title from the data JSON string.
func extractAssetTitle(dataStr string) string {
if dataStr == "" {
return ""
}
var m map[string]interface{}
if err := json.Unmarshal([]byte(dataStr), &m); err != nil {
return ""
}
if v, ok := m["asset_title"].(string); ok {
return v
}
return ""
}
// buildAggregatedLikeTitle builds the title for aggregated like notifications.
// 1 actor: "{name} 赞了你的《{title}》"
// 2 actors: "{name1}、{name2} 赞了你的《{title}》"
// 3+ actors: "{name1}、{name2} 等 N 人赞了你的《{title}》"
func buildAggregatedLikeTitle(actors []model.ActorPreview, total int32, assetTitle string) string {
names := make([]string, 0, 3)
for i, a := range actors {
if i >= 3 {
break
}
name := a.Nickname
if name == "" {
name = fmt.Sprintf("用户%d", a.UserID)
}
names = append(names, name)
}
title := assetTitle
if title == "" {
title = "你的藏品"
}
switch len(names) {
case 0:
return fmt.Sprintf("有 %d 人赞了你的《%s》", total, title)
case 1:
return fmt.Sprintf("%s 赞了你的《%s》", names[0], title)
case 2:
return fmt.Sprintf("%s、%s 赞了你的《%s》", names[0], names[1], title)
default:
return fmt.Sprintf("%s、%s 等 %d 人赞了你的《%s》", names[0], names[1], total, title)
}
}
- Step 2: Verify the build
cd backend
go build ./services/notificationService/...
Expected: the build succeeds (this may require the import "google.golang.org/protobuf/types/known/structpb").
- Step 3: Commit
git add backend/services/notificationService/service/notification_service.go
git commit -m "feat(notificationService): add notification service business layer"
Task 11: Write unit tests for the notification service
Files:
- Create: backend/services/notificationService/service/notification_service_test.go
- Step 1: Write the tests
package service_test
import (
	"context"
	"testing"
	"github.com/stretchr/testify/assert"
	"github.com/topfans/backend/pkg/database"
	notifPb "github.com/topfans/backend/pkg/proto/notification"
	"github.com/topfans/backend/services/notificationService/service"
)
// setupTestService returns the service together with the DB handle used for test cleanup.
func setupTestService(t *testing.T) (*service.NotificationService, *database.DB) {
	t.Helper()
	db, err := database.NewFromEnv()
	if err != nil {
		t.Skipf("test DB not available: %v", err)
	}
	return service.NewNotificationService(db), db
}
func TestCreateNotification_Validation(t *testing.T) {
	svc, _ := setupTestService(t)
	ctx := context.Background()
	// 3 is codes.InvalidArgument.
	cases := []struct {
		name       string
		req        *notifPb.CreateNotificationRequest
		expectCode uint32
	}{
		{"missing user", &notifPb.CreateNotificationRequest{StarId: 1, Type: "like", Title: "x"}, 3},
		{"missing star", &notifPb.CreateNotificationRequest{UserId: 1, Type: "like", Title: "x"}, 3},
		{"missing type", &notifPb.CreateNotificationRequest{UserId: 1, StarId: 1, Title: "x"}, 3},
		{"bad type", &notifPb.CreateNotificationRequest{UserId: 1, StarId: 1, Type: "x", Title: "x"}, 3},
		{"title too long", &notifPb.CreateNotificationRequest{UserId: 1, StarId: 1, Type: "like", Title: string(make([]byte, 201))}, 3},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			resp, _ := svc.CreateNotification(ctx, c.req)
			assert.Equal(t, c.expectCode, resp.Base.Code)
		})
	}
}
func TestCreateNotification_TransactionRollback(t *testing.T) {
	svc, db := setupTestService(t)
	ctx := context.Background()
	userID, starID := int64(990100), int64(1)
	cleanup := func() {
		db.ExecContext(ctx, `DELETE FROM public.notifications WHERE user_id=$1`, userID)
		db.ExecContext(ctx, `DELETE FROM public.notification_stats WHERE user_id=$1`, userID)
	}
	cleanup()
	defer cleanup()
	// Normal creation (commit path).
	req := &notifPb.CreateNotificationRequest{
		UserId: userID, StarId: starID, Type: "like", Title: "x", Content: "y",
	}
	resp, _ := svc.CreateNotification(ctx, req)
	assert.Equal(t, uint32(0), resp.Base.Code)
	assert.Greater(t, resp.Id, int64(0))
	// Verify that stats were written in the same transaction.
	cntResp, _ := svc.GetUnreadCount(ctx, userID, starID)
	assert.Equal(t, int32(1), cntResp.Counts.Like)
}
- Step 2: Run the tests
cd backend
go test ./services/notificationService/service/ -v
Expected: PASS (provided the test DB is available).
- Step 3: Commit
git add backend/services/notificationService/service/notification_service_test.go
git commit -m "test(notificationService): add service unit tests"
Task 12: Create the notification provider (RPC layer)
Files:
- Create: backend/services/notificationService/provider/notification_provider.go
- Step 1: Write the provider
package provider
import (
"context"
"fmt"
"strconv"
"time"
commonPb "github.com/topfans/backend/pkg/proto/common"
notifPb "github.com/topfans/backend/pkg/proto/notification"
"github.com/topfans/backend/services/notificationService/service"
"google.golang.org/grpc/metadata"
)
type NotificationProvider struct {
notifPb.UnimplementedNotificationServiceServer
svc *service.NotificationService
}
func NewNotificationProvider(svc *service.NotificationService) *NotificationProvider {
return &NotificationProvider{svc: svc}
}
func (p *NotificationProvider) CreateNotification(ctx context.Context, req *notifPb.CreateNotificationRequest) (*notifPb.CreateNotificationResponse, error) {
return p.svc.CreateNotification(ctx, req)
}
func (p *NotificationProvider) GetNotifications(ctx context.Context, req *notifPb.GetNotificationsRequest) (*notifPb.GetNotificationsResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.GetNotificationsResponse{Base: errBase(err)}, nil
}
return p.svc.GetNotifications(ctx, userID, starID, req.Type, req.Tab, req.Page, req.PageSize)
}
func (p *NotificationProvider) GetUnreadCount(ctx context.Context, req *notifPb.GetUnreadCountRequest) (*notifPb.GetUnreadCountResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.GetUnreadCountResponse{Base: errBase(err)}, nil
}
return p.svc.GetUnreadCount(ctx, userID, starID)
}
func (p *NotificationProvider) MarkAsRead(ctx context.Context, req *notifPb.MarkAsReadRequest) (*notifPb.MarkAsReadResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.MarkAsReadResponse{Base: errBase(err)}, nil
}
now := time.Now().UnixMilli()
return p.svc.MarkAsRead(ctx, userID, starID, req.Id, now)
}
func (p *NotificationProvider) MarkAsReadByTarget(ctx context.Context, req *notifPb.MarkAsReadByTargetRequest) (*notifPb.MarkAsReadByTargetResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.MarkAsReadByTargetResponse{Base: errBase(err)}, nil
}
now := time.Now().UnixMilli()
return p.svc.MarkAsReadByTarget(ctx, userID, starID, req.TargetId, now)
}
func (p *NotificationProvider) MarkAllAsRead(ctx context.Context, req *notifPb.MarkAllAsReadRequest) (*notifPb.MarkAllAsReadResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.MarkAllAsReadResponse{Base: errBase(err)}, nil
}
now := time.Now().UnixMilli()
return p.svc.MarkAllAsRead(ctx, userID, starID, req.Type, now)
}
func (p *NotificationProvider) DeleteNotification(ctx context.Context, req *notifPb.DeleteNotificationRequest) (*notifPb.DeleteNotificationResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.DeleteNotificationResponse{Base: errBase(err)}, nil
}
now := time.Now().UnixMilli()
return p.svc.DeleteNotification(ctx, userID, starID, req.Id, now)
}
func (p *NotificationProvider) DeleteByTarget(ctx context.Context, req *notifPb.DeleteByTargetRequest) (*notifPb.DeleteByTargetResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.DeleteByTargetResponse{Base: errBase(err)}, nil
}
now := time.Now().UnixMilli()
return p.svc.DeleteByTarget(ctx, userID, starID, req.TargetId, now)
}
// extractUserStarFromContext extracts user_id / star_id from gRPC metadata.
// The gateway injects x-user-id / x-star-id via metadata.NewOutgoingContext.
// For the actual format, refer to the existing implementation in the social service provider.
func extractUserStarFromContext(ctx context.Context) (int64, int64, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return 0, 0, fmt.Errorf("no metadata in context")
}
uidStrs := md.Get("x-user-id")
sidStrs := md.Get("x-star-id")
if len(uidStrs) == 0 || len(sidStrs) == 0 {
return 0, 0, fmt.Errorf("missing user_id or star_id in metadata")
}
uid, err := strconv.ParseInt(uidStrs[0], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("invalid user_id: %w", err)
}
sid, err := strconv.ParseInt(sidStrs[0], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("invalid star_id: %w", err)
}
return uid, sid, nil
}
func errBase(err error) *commonPb.BaseResponse {
return &commonPb.BaseResponse{Code: 13, Message: err.Error(), Timestamp: time.Now().UnixMilli()}
}
- Step 2: Verify the build
cd backend
go build ./services/notificationService/...
Expected: the build succeeds.
- Step 3: Commit
git add backend/services/notificationService/provider/
git commit -m "feat(notificationService): add RPC provider"
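The header parsing in extractUserStarFromContext can be exercised without a running gateway. The sketch below is a simplified illustration that swaps gRPC metadata for a plain map type (gRPC's metadata.MD is itself a map from lower-case keys to string slices); headers and parseID are hypothetical names, and the extra positive-value check is an assumption that is not part of the provider above.

```go
package main

import (
	"fmt"
	"strconv"
)

// headers is a stand-in for gRPC metadata: lower-case keys mapped to string slices.
type headers map[string][]string

// parseID reads the first value stored under key and parses it as a positive int64.
func parseID(h headers, key string) (int64, error) {
	vals := h[key]
	if len(vals) == 0 {
		return 0, fmt.Errorf("missing %s in metadata", key)
	}
	id, err := strconv.ParseInt(vals[0], 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid %s: %w", key, err)
	}
	if id <= 0 {
		// Assumption of this sketch: IDs are always positive.
		return 0, fmt.Errorf("invalid %s: must be positive", key)
	}
	return id, nil
}

func main() {
	h := headers{"x-user-id": {"42"}, "x-star-id": {"1"}}
	uid, err := parseID(h, "x-user-id")
	fmt.Println(uid, err)

	_, err = parseID(headers{}, "x-star-id")
	fmt.Println(err)
}
```

Factoring the per-key parsing into one helper keeps the missing-header and malformed-header cases distinguishable, which helps when the gateway and the provider disagree on header names.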
Task 13: Create the notification service main.go
Files:
- Create: backend/services/notificationService/main.go
- Step 1: Copy the socialService/main.go template
cp backend/services/socialService/main.go backend/services/notificationService/main.go
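- Step 2: Update the package name, imports, and service registration
Adjustments:
- package main stays unchanged
- Change the import to notificationService "github.com/topfans/backend/services/notificationService"
- Port: 20010
- DB configuration matches social
- Service registration: srv, _ := server.NewServer(...) plus notificationPb.RegisterNotificationServiceServer(srv, notificationProvider)
After these changes the code structure stays consistent with social; for the registration logic, refer to socialService main.go lines 80-150.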
- Step 3: Verify the build
cd backend
go build ./services/notificationService/
Expected: the build succeeds and produces the notificationService executable.
- Step 4: Startup test (optional, local environment)
cd backend
DB_HOST=localhost DB_PORT=5432 DB_USER=postgres DB_PASSWORD=xxx DB_NAME=top-fans \
./notificationService -port 20010 &
sleep 2
curl http://localhost:20010/health
Expected: the process starts and the health check returns 200.
- Step 5: Regression check
- query_graph pattern=callers_of target=NotificationService to confirm that no other service is affected
- Existing services such as social/asset still build: cd backend && go build ./...
- Step 6: Commit
git add backend/services/notificationService/main.go
git commit -m "feat(notificationService): add main entry with dubbo bootstrap"
Phase 4: social service integration
Task 14: Create the notification client (social side)
Files:
- Create: backend/services/socialService/client/notification_client.go
- Step 1: Write the client
package client
import (
"context"
"fmt"
"dubbo.apache.org/dubbo-go/v3/client"
_ "dubbo.apache.org/dubbo-go/v3/imports"
notifPb "github.com/topfans/backend/pkg/proto/notification"
"go.uber.org/zap"
)
type NotificationClient struct {
client notifPb.NotificationService
logger *zap.Logger
}
func NewNotificationClient(serviceURL string, logger *zap.Logger) (*NotificationClient, error) {
cli, err := client.NewClient(client.WithClientURL(serviceURL))
if err != nil {
return nil, fmt.Errorf("failed to create dubbo client: %w", err)
}
notifClient, err := notifPb.NewNotificationService(cli)
if err != nil {
return nil, fmt.Errorf("failed to create notification service client: %w", err)
}
return &NotificationClient{client: notifClient, logger: logger}, nil
}
func (c *NotificationClient) CreateNotification(ctx context.Context, req *notifPb.CreateNotificationRequest) (*notifPb.CreateNotificationResponse, error) {
resp, err := c.client.CreateNotification(ctx, req)
if err != nil {
c.logger.Error("Failed to create notification", zap.Error(err))
return nil, err
}
return resp, nil
}
- Step 2: Verify the build
cd backend
go build ./services/socialService/client/...
Expected: the build succeeds.
- Step 3: Commit
git add backend/services/socialService/client/notification_client.go
git commit -m "feat(socialService): add notification dubbo client"
Task 15: Inject NotificationClient into the social service
Files:
- Modify: backend/services/socialService/main.go
- Modify: backend/services/socialService/service/asset_like_service.go
- Step 1: Add the URL flag and client initialization in main.go
Following the assetServiceURL pattern, add:
notificationServiceURL = flag.String("notification-service-url", getEnv("NOTIFICATION_SERVICE_URL", "tri://localhost:20010"), "Notification service URL")
In the main function, next to the NewAssetLikeService call, add:
notifClient, err := client.NewNotificationClient(*notificationServiceURL, logger.Logger)
if err != nil {
logger.Sugar.Fatalf("Failed to create notification client: %v", err)
}
Update the NewAssetLikeService call to inject notifClient:
assetLikeService := service.NewAssetLikeService(assetClient, socialRepo, notifClient)
- Step 2: Modify the AssetLikeService struct
In backend/services/socialService/service/asset_like_service.go, lines 22-26:
type AssetLikeService struct {
assetClient *client.AssetClient
socialRepo repository.SocialRepository
notificationClient *client.NotificationClient
userClient UserServiceClient // new (if not already injected)
}
func NewAssetLikeService(assetClient *client.AssetClient, socialRepo repository.SocialRepository, notificationClient *client.NotificationClient) *AssetLikeService {
return &AssetLikeService{
assetClient: assetClient,
socialRepo: socialRepo,
notificationClient: notificationClient,
}
}
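- Step 3: Add the notification call at the end of LikeAsset
In the LikeAsset method, insert before line 132 (before the return):
// Create the like notification (on failure only log; the main like path is unaffected).
s.createLikeNotification(ctx, getAssetResp, assetID, userID, starID)
At the end of the file, add the private method createLikeNotification: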
func (s *AssetLikeService) createLikeNotification(ctx context.Context, asset *assetPb.GetAssetForRPCResponse, assetID, userID, starID int64) {
// Skip abnormal data.
if asset.OwnerUid <= 0 {
logger.Logger.Warn("skip notification: invalid owner_uid",
zap.Int64("asset_id", assetID), zap.Int64("owner_uid", asset.OwnerUid))
return
}
// Look up the actor's profile; fall back to the numeric user ID as the name.
actorName := strconv.FormatInt(userID, 10)
var actorAvatar string
if s.userClient != nil {
if actorInfo, err := s.userClient.GetUsersByIDs(ctx, []int64{userID}, starID); err == nil {
if info, exists := actorInfo[userID]; exists && info != nil {
actorName = info.Nickname
actorAvatar = info.Avatar
}
} else {
logger.Logger.Warn("failed to fetch actor info for notification",
zap.Int64("user_id", userID), zap.Error(err))
}
}
// Build the data payload.
data := map[string]interface{}{
"target_type": "asset",
"target_id": assetID,
"actor_id": userID,
"actor_name": actorName,
"actor_avatar": actorAvatar,
"asset_title": asset.Name,
"asset_cover": asset.CoverUrl,
"star_id": starID,
}
dataStruct, err := structpb.NewStruct(data)
if err != nil {
logger.Logger.Error("failed to build notification data struct",
zap.Int64("asset_id", assetID), zap.Error(err))
return
}
// Call the notification service synchronously.
_, notifErr := s.notificationClient.CreateNotification(ctx, &notifPb.CreateNotificationRequest{
UserId: asset.OwnerUid,
StarId: starID,
Type: "like",
Title: "新点赞",
Content: fmt.Sprintf("%s 点赞了你的藏品", actorName),
Data: dataStruct,
})
if notifErr != nil {
logger.Logger.Error("failed to create like notification (like itself succeeded)",
zap.Int64("asset_id", assetID),
zap.Int64("actor_id", userID),
zap.Int64("owner_uid", asset.OwnerUid),
zap.Error(notifErr),
)
return
}
logger.Logger.Info("like notification created",
zap.Int64("asset_id", assetID), zap.Int64("owner_uid", asset.OwnerUid))
}
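Also add "google.golang.org/protobuf/types/known/structpb" to the import block, along with fmt, strconv, and notifPb "github.com/topfans/backend/pkg/proto/notification" if they are not already imported.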
- Step 4: Verify the build
cd backend
go build ./services/socialService/...
Expected: the build succeeds.
- Step 5: Regression check
- query_graph pattern=callers_of target=LikeAsset to confirm that no caller is broken
- Run the existing social service tests: go test ./services/socialService/ -v
- Step 6: Commit
git add backend/services/socialService/
git commit -m "feat(socialService): trigger notification on like (degrade on failure)"
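One detail of the data payload is worth keeping in mind: numbers in a protobuf Struct are stored as doubles, so target_id and actor_id pass through float64 on their way to the notification service. The sketch below shows where that round trip stops being exact; survivesFloat64 and maxSafeInt are hypothetical names, and whether this project's IDs ever approach 2^53 is an open assumption.

```go
package main

import "fmt"

// maxSafeInt is 2^53: up to this magnitude every integer is exactly
// representable as a float64.
const maxSafeInt = int64(1) << 53

// survivesFloat64 reports whether id is unchanged after a round trip
// through float64, which is how numeric Struct values are stored.
func survivesFloat64(id int64) bool {
	return int64(float64(id)) == id
}

func main() {
	fmt.Println(survivesFloat64(8888))           // prints: true
	fmt.Println(survivesFloat64(maxSafeInt + 1)) // prints: false
}
```

Sequential IDs like the ones in the tests are far below that threshold. If snowflake-style 64-bit IDs are ever introduced, sending them as strings in the payload would be one way to avoid silent rounding.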
Task 16: Test that a like-notification failure does not affect the main path
Files:
- Create/Modify: backend/services/socialService/service/asset_like_service_test.go
- Create: backend/services/socialService/client/notification_client_mock.go (mock helper, following the existing MockUserServiceClient pattern in user_rpc_client.go)
- Step 1: Add the mock helper first
backend/services/socialService/client/notification_client_mock.go:
package client
import (
"context"
"sync"
notifPb "github.com/topfans/backend/pkg/proto/notification"
)
// MockNotificationClient is a mock notification client (for tests).
type MockNotificationClient struct {
mu sync.Mutex
CreateErr error
CreateCallCount int32
LastRequest *notifPb.CreateNotificationRequest
}
func (m *MockNotificationClient) CreateNotification(ctx context.Context, req *notifPb.CreateNotificationRequest) (*notifPb.CreateNotificationResponse, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.CreateCallCount++
m.LastRequest = req
if m.CreateErr != nil {
return nil, m.CreateErr
}
return &notifPb.CreateNotificationResponse{Id: 1}, nil
}
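To let AssetLikeService accept the mock (rather than the concrete type *NotificationClient), change the notificationClient field to an interface type. The interface is declared in asset_like_service.go itself, so it is referenced without the client. prefix, and the notificationClient parameter of NewAssetLikeService needs the same type. In asset_like_service.go:
type NotificationClientInterface interface {
	CreateNotification(ctx context.Context, req *notifPb.CreateNotificationRequest) (*notifPb.CreateNotificationResponse, error)
}
type AssetLikeService struct {
	assetClient        *client.AssetClient
	socialRepo         repository.SocialRepository
	notificationClient NotificationClientInterface // now an interface
	userClient         UserServiceClient
}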
- Step 2: Add test cases
func TestLikeAsset_NotificationFailureDoesNotFailLike(t *testing.T) {
	// Inject a mock notification client that returns an error;
	// LikeAsset is still expected to succeed.
	mockNotif := &client.MockNotificationClient{
		CreateErr: fmt.Errorf("notification service down"),
	}
	svc := NewAssetLikeService(realAssetClient, realSocialRepo, mockNotif)
	// Prepare a mock assetClient whose GetAssetForRPC returns OK.
	newCount, err := svc.LikeAsset(ctx, assetID, userID, starID)
	assert.NoError(t, err)
	assert.Greater(t, newCount, int32(0))
}
func TestLikeAsset_NotificationSuccess(t *testing.T) {
	mockNotif := &client.MockNotificationClient{}
	svc := NewAssetLikeService(realAssetClient, realSocialRepo, mockNotif)
	_, err := svc.LikeAsset(ctx, assetID, userID, starID)
	assert.NoError(t, err)
	assert.Equal(t, int32(1), mockNotif.CreateCallCount)
}
(For the concrete mock setup, follow the MockUserServiceClient pattern in the social service's existing user_rpc_client.go. realAssetClient, realSocialRepo, ctx, assetID, userID and starID are placeholders to be supplied by the test setup.)
- Step 3: Run the tests
cd backend
go test ./services/socialService/service/ -v -run TestLikeAsset
Expected: PASS.
- Step 4: Commit
git add backend/services/socialService/service/asset_like_service_test.go
git commit -m "test(socialService): verify notification failure doesn't fail like"
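The degrade-on-failure behaviour these tests target can be shown in isolation. The sketch below is a self-contained illustration with hypothetical names (notifier, failingNotifier, likeService); it replaces the repository and RPC clients with an in-memory counter and is not the project's LikeAsset implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// notifier is a hypothetical narrow interface for the notification dependency.
type notifier interface {
	Notify(ownerID, assetID int64) error
}

// failingNotifier always fails, simulating an unavailable notification service.
type failingNotifier struct{ calls int }

func (f *failingNotifier) Notify(ownerID, assetID int64) error {
	f.calls++
	return errors.New("notification service down")
}

// likeService is a hypothetical in-memory stand-in for the like path.
type likeService struct {
	counts map[int64]int32
	notif  notifier
}

// Like increments the counter first, then notifies on a best-effort basis:
// a notification error is logged and swallowed so the like still succeeds.
func (s *likeService) Like(ownerID, assetID int64) (int32, error) {
	s.counts[assetID]++
	if err := s.notif.Notify(ownerID, assetID); err != nil {
		fmt.Println("notification failed (like itself succeeded):", err)
	}
	return s.counts[assetID], nil
}

func main() {
	n := &failingNotifier{}
	svc := &likeService{counts: map[int64]int32{}, notif: n}
	count, err := svc.Like(7, 8888)
	fmt.Println(count, err, n.calls)
}
```

Depending on a small interface rather than the concrete client is what makes the failing fake injectable; the same idea motivates switching the notificationClient field to an interface type.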
Phase 5: Admin backend integration
Task 17: Create the admin-side Notification ORM models
Files:
- Create: TopFans-activity-admin/backend/models/notification.py
- Step 1: Write the model
from sqlalchemy import Column, BigInteger, String, Boolean, Integer, JSON
from database import Base
class Notification(Base):
    __tablename__ = "notifications"
    id = Column(BigInteger, primary_key=True, index=True)
    user_id = Column(BigInteger, nullable=False, index=True)
    star_id = Column(BigInteger, nullable=False)
    type = Column(String(20), nullable=False)
    title = Column(String(200), nullable=False)
    content = Column(String(500))
    data = Column(JSON)
    is_read = Column(Boolean, default=False, nullable=False)
    is_deleted = Column(Boolean, default=False, nullable=False)
    created_at = Column(BigInteger, nullable=False)
    read_at = Column(BigInteger)
class NotificationStats(Base):
    __tablename__ = "notification_stats"
    user_id = Column(BigInteger, primary_key=True)
    star_id = Column(BigInteger, primary_key=True)
    like_unread_count = Column(Integer, default=0, nullable=False)
    system_unread_count = Column(Integer, default=0, nullable=False)
    activity_unread_count = Column(Integer, default=0, nullable=False)
    total_unread_count = Column(Integer, default=0, nullable=False)
    updated_at = Column(BigInteger, nullable=False)
- Step 2: Register in models/__init__.py
Append to TopFans-activity-admin/backend/models/__init__.py:
from .notification import Notification, NotificationStats
- Step 3: Commit
cd TopFans-activity-admin
git add backend/models/
git commit -m "feat(admin): add Notification and NotificationStats ORM models"
Task 18: Create the admin-side notification CRUD
Files:
- Create: TopFans-activity-admin/backend/crud/notification_crud.py
- Step 1: Write the CRUD
import time
from typing import Optional, List, Literal
from sqlalchemy.orm import Session
from sqlalchemy import text
from models.notification import Notification, NotificationStats
from models.models import User # 假设有 User 模型;如无则用 raw SQL
import logging
logger = logging.getLogger(__name__)
def _now_ms() -> int:
return int(time.time() * 1000)
def _resolve_recipients(
db: Session,
target_type: Literal["user", "star", "all"],
target_value: Optional[int],
) -> List[int]:
"""根据 target_type 解析接收方 user_id 列表"""
if target_type == "user":
return [target_value] if target_value else []
if target_type == "star":
# 查所有 star_id = target_value 的用户的 user_id
rows = db.execute(
text("SELECT user_id FROM user_fan_profiles WHERE star_id = :star_id"),
{"star_id": target_value},
).fetchall()
return [r[0] for r in rows]
if target_type == "all":
rows = db.execute(text("SELECT id FROM users WHERE is_active = TRUE")).fetchall()
return [r[0] for r in rows]
return []


def _upsert_stats(db: Session, user_id: int, star_id: int, ntype: str, delta: int, now: int):
    """Add delta to the per-type and total unread counters, creating the row if needed."""
    # col comes from a fixed whitelist, so interpolating it into the SQL is safe.
    col = {"like": "like_unread_count", "system": "system_unread_count", "activity": "activity_unread_count"}[ntype]
    sql = f"""
        INSERT INTO notification_stats (user_id, star_id, {col}, total_unread_count, updated_at)
        VALUES (:uid, :sid, :delta, :delta, :now)
        ON CONFLICT (user_id, star_id) DO UPDATE SET
            {col} = notification_stats.{col} + :delta,
            total_unread_count = notification_stats.total_unread_count + :delta,
            updated_at = :now
    """
    db.execute(text(sql), {"uid": user_id, "sid": star_id, "delta": delta, "now": now})


def create_system_notification(
    db: Session,
    *,
    title: str,
    content: str,
    target_type: Literal["user", "star", "all"],
    target_value: Optional[int],
    data: Optional[dict] = None,
) -> int:
    """Create a system notification; return the number of rows inserted."""
    recipients = _resolve_recipients(db, target_type, target_value)
    if not recipients:
        logger.warning("no recipients for system notification")
        return 0
    now = _now_ms()
    for uid in recipients:
        n = Notification(
            user_id=uid, star_id=1,  # system notifications default to star_id=1; adjust once multiple stars exist
            type="system", title=title, content=content, data=data,
            created_at=now,
        )
        db.add(n)
        _upsert_stats(db, uid, 1, "system", 1, now)
    # One commit for the whole batch: notifications and counters succeed or fail together.
    db.commit()
    return len(recipients)


def create_activity_notification(
    db: Session,
    *,
    activity_id: int,
    activity_title: str,
    activity_cover: str,
    recipients: List[int],
    data: Optional[dict] = None,
) -> int:
    """Create an activity notification; return the number of rows inserted."""
    if not recipients:
        return 0
    now = _now_ms()
    # Keys passed in `data` override the defaults because they are unpacked last.
    payload = {
        "activity_id": activity_id,
        "activity_title": activity_title,
        "activity_cover": activity_cover,
        **(data or {}),
    }
    for uid in recipients:
        n = Notification(
            user_id=uid, star_id=1,
            type="activity",
            title=activity_title,
            content="新活动上线",  # user-facing copy: "A new activity is live"
            data=payload,
            created_at=now,
        )
        db.add(n)
        _upsert_stats(db, uid, 1, "activity", 1, now)
    db.commit()
    return len(recipients)
- Step 2: Write the tests
TopFans-activity-admin/backend/crud/notification_crud_test.py:
import pytest
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

from database import Base
import crud.notification_crud as nc


@pytest.fixture
def db():
    engine = create_engine("postgresql://postgres:test@localhost:5432/top_fans_test")
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()


def test_create_system_notification_single_user(db):
    count = nc.create_system_notification(
        db, title="Maintenance notice", content="Maintenance tonight at 23:00",
        target_type="user", target_value=12345,
    )
    assert count == 1
    # Verify the stats row. SQLAlchemy 2.0 rejects raw SQL strings, so wrap the query in text().
    row = db.execute(
        text("SELECT system_unread_count FROM notification_stats WHERE user_id = :uid"),
        {"uid": 12345},
    ).first()
    assert row[0] == 1


def test_create_system_notification_all(db):
    # Assumes the test users table holds at least 3 users with is_active = TRUE.
    count = nc.create_system_notification(
        db, title="Broadcast to all", content="Important notice",
        target_type="all", target_value=None,
    )
    assert count >= 3
- Step 3: Run the tests
cd TopFans-activity-admin/backend
source venv/bin/activate
pytest crud/notification_crud_test.py -v
Expected: PASS.
- Step 4: Commit
cd TopFans-activity-admin
git add backend/crud/notification_crud.py backend/crud/notification_crud_test.py
git commit -m "feat(admin): add notification CRUD with multi-target broadcast"
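The counter UPSERT in `_upsert_stats` (Step 1) is the part of this task most worth checking in isolation. Below is a minimal sketch that reproduces the same statement against an in-memory SQLite database, so the increment behaviour can be tried without PostgreSQL. The table layout is an assumption based on the columns the CRUD code touches, and `make_db`, `upsert_stats` and `read_counts` are hypothetical helper names. It also assumes the bundled SQLite is 3.24 or newer, which is when `ON CONFLICT ... DO UPDATE` became available.

```python
import sqlite3

# Whitelist mapping from notification type to counter column, mirroring _upsert_stats.
COLUMNS = {
    "like": "like_unread_count",
    "system": "system_unread_count",
    "activity": "activity_unread_count",
}


def make_db() -> sqlite3.Connection:
    """Create an in-memory stand-in for notification_stats (assumed schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE notification_stats (
            user_id INTEGER NOT NULL,
            star_id INTEGER NOT NULL,
            like_unread_count INTEGER NOT NULL DEFAULT 0,
            system_unread_count INTEGER NOT NULL DEFAULT 0,
            activity_unread_count INTEGER NOT NULL DEFAULT 0,
            total_unread_count INTEGER NOT NULL DEFAULT 0,
            updated_at INTEGER NOT NULL,
            PRIMARY KEY (user_id, star_id)
        )
        """
    )
    return conn


def upsert_stats(conn, user_id, star_id, ntype, delta, now):
    """Insert the stats row, or add delta to the existing counters."""
    col = COLUMNS[ntype]  # unknown types raise KeyError before any SQL is built
    conn.execute(
        f"""
        INSERT INTO notification_stats (user_id, star_id, {col}, total_unread_count, updated_at)
        VALUES (:uid, :sid, :delta, :delta, :now)
        ON CONFLICT (user_id, star_id) DO UPDATE SET
            {col} = {col} + :delta,
            total_unread_count = total_unread_count + :delta,
            updated_at = :now
        """,
        {"uid": user_id, "sid": star_id, "delta": delta, "now": now},
    )


def read_counts(conn, user_id, star_id):
    """Return (like, system, activity, total) unread counters for one row."""
    return conn.execute(
        "SELECT like_unread_count, system_unread_count, activity_unread_count, total_unread_count"
        " FROM notification_stats WHERE user_id = ? AND star_id = ?",
        (user_id, star_id),
    ).fetchone()
```

The first call for a given (user_id, star_id) pair takes the INSERT branch; every later call takes the UPDATE branch, so the per-type counter and the total move together. This relies on a unique key over (user_id, star_id), which the plan's `ON CONFLICT (user_id, star_id)` clause also requires.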
Task 19: Create the admin-side notification handler
Files:
- Create: TopFans-activity-admin/backend/handlers/notification.py
- Step 1: Write the handler
from typing import Literal, Optional

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session

from database import get_db
from middleware.auth import require_admin  # reuse the existing admin auth
import crud.notification_crud as nc

router = APIRouter(prefix="/api/v1/admin/notifications", tags=["admin-notifications"])


class CreateSystemNotificationReq(BaseModel):
    title: str
    content: str
    target_type: Literal["user", "star", "all"]
    target_value: Optional[int] = None
    data: Optional[dict] = None


@router.post("")
def create_system_notification(
    req: CreateSystemNotificationReq,
    db: Session = Depends(get_db),
    admin=Depends(require_admin),
):
    # Validate lengths and the target before touching the database.
    if not req.title or len(req.title) > 200:
        raise HTTPException(400, "title required, <= 200 chars")
    if req.content and len(req.content) > 500:
        raise HTTPException(400, "content <= 500 chars")
    if req.target_type in ("user", "star") and not req.target_value:
        raise HTTPException(400, "target_value required for user/star type")
    count = nc.create_system_notification(
        db, title=req.title, content=req.content,
        target_type=req.target_type, target_value=req.target_value,
        data=req.data,
    )
    return {"affected": count}


@router.post("/activities/{activity_id}/broadcast")
def broadcast_activity(
    activity_id: int,
    db: Session = Depends(get_db),
    admin=Depends(require_admin),
):
    # Look up the activity
    from models.models import MintingActivity
    activity = db.query(MintingActivity).filter(MintingActivity.id == activity_id).first()
    if not activity:
        raise HTTPException(404, "activity not found")
    # Resolve the recipients
    recipients = nc._resolve_recipients(db, target_type="all", target_value=None)
    count = nc.create_activity_notification(
        db,
        activity_id=activity_id,
        activity_title=activity.title,
        activity_cover=activity.icon_url or "",
        recipients=recipients,
    )
    return {"affected": count}
- Step 2: Register the router
Edit TopFans-activity-admin/backend/router/__init__.py:
from handlers import auth, activity, ..., notification
api_router.include_router(notification.router)
- Step 3: Start the server and test
cd TopFans-activity-admin/backend
source venv/bin/activate
uvicorn main:app --reload --port 8080
curl -X POST http://localhost:8080/api/v1/admin/notifications \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{"title":"test","content":"hi","target_type":"user","target_value":12345}'
Expected: 200 + {"affected": 1}.
- Step 4: Commit
cd TopFans-activity-admin
git add backend/handlers/notification.py backend/router/__init__.py
git commit -m "feat(admin): add notification admin HTTP endpoints"
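The request checks in the Step 1 handler can also be expressed as a pure function, which makes them easy to unit-test without starting FastAPI. The sketch below is only an illustration: `validate_system_notification` is a hypothetical helper, and unlike the handler (which raises on the first failing check) it collects every violated rule. In the handler, the `target_type` check is already covered by the `Literal` annotation on the request model.

```python
from typing import List, Optional

VALID_TARGET_TYPES = ("user", "star", "all")


def validate_system_notification(
    title: str,
    content: str,
    target_type: str,
    target_value: Optional[int] = None,
) -> List[str]:
    """Return the list of violated rules; an empty list means the request is valid."""
    errors = []
    if not title or len(title) > 200:
        errors.append("title required, <= 200 chars")
    if content and len(content) > 500:
        errors.append("content <= 500 chars")
    if target_type not in VALID_TARGET_TYPES:
        errors.append("target_type must be user, star, or all")
    elif target_type in ("user", "star") and not target_value:
        # "all" needs no target_value; "user" and "star" do.
        errors.append("target_value required for user/star type")
    return errors
```

For example, `validate_system_notification("test", "hi", "user", 12345)` returns an empty list, matching the curl request in Step 3.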
Task 20: Auto-broadcast from the admin activity handler
Files:
- Modify: TopFans-activity-admin/backend/handlers/activity.py
- Step 1: Locate the create-activity function
grep -n "def create_minting_activity\|def.*create.*activity" \
TopFans-activity-admin/backend/handlers/activity.py
- Step 2: Broadcast automatically after a successful create
Following spec §7.6, insert the broadcast call before the commit:
# Broadcast as soon as the activity has been created
if getattr(payload, "auto_notify", True):
    from crud import notification_crud

    # Assumption: `created` has already been added to the session; flush so created.id is populated.
    db.flush()
    recipients = notification_crud._resolve_recipients(db, "all", None)
    # Note: create_activity_notification() calls db.commit() itself when there are recipients.
    notification_crud.create_activity_notification(
        db, activity_id=created.id,
        activity_title=created.title,
        activity_cover=created.icon_url or "",
        recipients=recipients,
    )
# Then db.commit()
- Step 3: Test
cd TopFans-activity-admin/backend
source venv/bin/activate
uvicorn main:app --reload --port 8080
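# Create an activity through the admin UI or curl
# Verify that activity_unread_count in notification_stats increased by N
- Step 4: Regression check
  - Existing admin activity tests still pass
  - The create-activity API contract is unchanged (the auto_notify field is optional and defaults to True)
- Step 5: Commit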
cd TopFans-activity-admin
git add backend/handlers/activity.py
git commit -m "feat(admin): auto-broadcast activity notification on creation"
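The contract note in Step 4 (auto_notify is optional and defaults to True) follows directly from the `getattr` call in Step 2. A small sketch of that behaviour, using `SimpleNamespace` objects as stand-ins for the request payload; `should_broadcast` and the sample payloads are hypothetical:

```python
from types import SimpleNamespace


def should_broadcast(payload) -> bool:
    """Mirror the Step 2 check: a payload without auto_notify counts as True."""
    return bool(getattr(payload, "auto_notify", True))


# A client that predates the field keeps working and still triggers a broadcast.
old_client = SimpleNamespace(title="Summer drop")
# A client that knows the field can opt out explicitly.
opted_out = SimpleNamespace(title="Silent launch", auto_notify=False)
# An explicit null is falsy, so it also suppresses the broadcast.
explicit_null = SimpleNamespace(title="Draft", auto_notify=None)
```

Only a missing attribute falls back to True. If the payload model declares `auto_notify` with a default of None, the broadcast would be skipped, so the field's default in the request schema should be True as well.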
Phase 6: Gateway route configuration
Task 21: Register the notification paths in the gateway router
Files:
- Modify: backend/gateway/router/router.go
- Step 1: Locate the social route group
grep -n 'v1.Group("/social")\|v1.Group("/assets")' backend/gateway/router/router.go
- Step 2: Insert the notifications route group after the social routes
notifications := v1.Group("/notifications")
{
	notifications.GET("", controller.NotificationController.GetNotifications)
	notifications.GET("/unread-count", controller.NotificationController.GetUnreadCount)
	notifications.POST("/:id/read", controller.NotificationController.MarkAsRead)
	notifications.POST("/targets/:target_id/read", controller.NotificationController.MarkAsReadByTarget)
	notifications.POST("/read-all", controller.NotificationController.MarkAllAsRead)
	notifications.DELETE("/:id", controller.NotificationController.DeleteNotification)
	notifications.DELETE("/targets/:target_id", controller.NotificationController.DeleteByTarget)
}
- Step 3: Implement NotificationController
backend/gateway/controller/notification_controller.go:
package controller

import (
	"strconv"

	"github.com/gin-gonic/gin"
	"github.com/topfans/backend/gateway/pkg/response"
	notifPb "github.com/topfans/backend/pkg/proto/notification"
	"google.golang.org/grpc/metadata"
)

type NotificationController struct {
	client notifPb.NotificationService
}

func NewNotificationController(client notifPb.NotificationService) *NotificationController {
	return &NotificationController{client: client}
}

// withUserCtx reads user_id / star_id from the gin context. Each handler then
// forwards them as gRPC metadata (same pattern as the social controller).
func (c *NotificationController) withUserCtx(g *gin.Context) (userID, starID int64) {
	v1, _ := g.Get("user_id")
	v2, _ := g.Get("star_id")
	userID, _ = v1.(int64)
	starID, _ = v2.(int64)
	return
}

// parseInt parses s as an int and falls back to def when s is empty or invalid.
func parseInt(s string, def int) int {
	if s == "" {
		return def
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		return def
	}
	return n
}

func (c *NotificationController) GetNotifications(g *gin.Context) {
	uid, sid := c.withUserCtx(g)
	ctx := metadata.AppendToOutgoingContext(g.Request.Context(),
		"x-user-id", strconv.FormatInt(uid, 10),
		"x-star-id", strconv.FormatInt(sid, 10),
	)
	resp, err := c.client.GetNotifications(ctx, &notifPb.GetNotificationsRequest{
		Type:     g.Query("type"),
		Tab:      g.Query("tab"),
		Page:     int32(parseInt(g.Query("page"), 1)),
		PageSize: int32(parseInt(g.Query("pageSize"), 20)),
	})
	if err != nil {
		response.Error(g, 500, "rpc failed: "+err.Error())
		return
	}
	response.SuccessWithCode(g, int(resp.Base.Code), resp.Base.Message, gin.H{
		"items":     resp.Items,
		"total":     resp.Total,
		"page":      resp.Page,
		"page_size": resp.PageSize,
	})
}

// GetUnreadCount / MarkAsRead / MarkAsReadByTarget / MarkAllAsRead / DeleteNotification / DeleteByTarget
// follow the same pattern as GetNotifications: read the user context, attach metadata, call the RPC, write the response.
// The repeated code is omitted here (see the existing methods in social_controller.go).
- Step 4: Verify the build
cd backend
go build ./gateway/...
Expected: the build succeeds.
- Step 5: Commit
git add backend/gateway/
git commit -m "feat(gateway): register notification HTTP routes"
Task 22: Add the notification backend to the gateway config
Files:
- Modify: backend/gateway/config/config.yaml
- Step 1: Add the notification service URL
services:
  asset:
    url: tri://localhost:20003
  social:
    url: tri://localhost:20002
  user:
    url: tri://localhost:20000
  notification:
    url: tri://localhost:20010
- Step 2: Start the gateway and verify
cd backend
./start.sh gateway
curl http://localhost:8080/api/v1/notifications/unread-count -H "Authorization: Bearer $JWT"
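Expected: 200 plus a counts response (requires the notification service to be running as well).
- Step 3: Regression check
  - Other routes still work after the gateway starts (for example /api/v1/social/...)
  - query_graph pattern=callers_of target=NotificationController confirms the new controller introduces no circular dependency
- Step 4: Commit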
git add backend/gateway/config/
git commit -m "feat(gateway): add notification service backend config"
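Before running the curl check in Step 2, it can help to confirm that each configured backend is actually listening. The following is a rough sketch under the assumption that the `services` map from Step 1 has already been loaded into a dict. `endpoint` and `is_listening` are hypothetical helpers, and the check only opens a TCP connection; it does not speak the RPC protocol.

```python
import socket
from urllib.parse import urlsplit

# Same values as the config.yaml fragment in Step 1.
SERVICES = {
    "asset": "tri://localhost:20003",
    "social": "tri://localhost:20002",
    "user": "tri://localhost:20000",
    "notification": "tri://localhost:20010",
}


def endpoint(url: str):
    """Split a service URL into (scheme, host, port)."""
    parts = urlsplit(url)
    return parts.scheme, parts.hostname, parts.port


def is_listening(url: str, timeout: float = 0.5) -> bool:
    """Return True if a TCP connection to the service's host and port succeeds."""
    _, host, port = endpoint(url)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    for name, url in SERVICES.items():
        print(f"{name:<14}{url:<28}{'up' if is_listening(url) else 'DOWN'}")
```

A DOWN entry for notification would explain a 500 from /api/v1/notifications/unread-count even though the gateway itself started cleanly.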
Phase 7: Full regression and acceptance
Task 23: Full regression check (required by CLAUDE.md)
- Step 1: Full build
cd backend
go build ./...
cd ../TopFans-activity-admin/backend
source venv/bin/activate
python -c "from main import app; print('admin import OK')"
Expected: everything builds.
- Step 2: Full test run
cd backend
go test ./services/notificationService/... -v
go test ./services/socialService/... -v
go test ./services/assetService/... -v
Expected: all PASS (no FAIL).
cd TopFans-activity-admin/backend
source venv/bin/activate
pytest crud/notification_crud_test.py -v
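Expected: PASS.
- Step 3: Graph regression
# Via mcp__code-review-graph__query_graph_tool
# pattern=callers_of target=GetAssetForRPC (confirm the asset proto extension breaks nothing)
# pattern=callers_of target=LikeAsset (confirm the social change breaks nothing)
# pattern=callers_of target=NotificationService (confirm there is no circular dependency)
- Step 4: Cross-service impact check
  - Confirm that the main path of LikeAsset in the social service is unaffected when the notification call fails (manual verification plus automated test)
  - "Fix A introduces B" audit:
    - A1: asset proto gains fields → B1: old callers stay compatible through zero values (no impact expected)
    - A2: social service adds the notification call → B2: likes still succeed when the notification service is unavailable (test added)
    - A3: gateway route registration → B3: conflicts with existing routes (verify manually)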
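The A2 → B2 item is an ordering rule: the like is recorded first, and the notification is a best-effort side effect whose failure is logged and swallowed. The real code lives in the Go social service; the Python sketch below only models that invariant, and `like_asset`, `NotificationUnavailable` and the in-memory `likes` set are all hypothetical stand-ins.

```python
import logging

logger = logging.getLogger("social")


class NotificationUnavailable(Exception):
    """Hypothetical error raised when the notification service cannot be reached."""


def like_asset(likes: set, user_id: int, asset_id: int, notify) -> bool:
    """Record the like first, then send the notification on a best-effort basis."""
    likes.add((user_id, asset_id))  # main path: must succeed on its own
    try:
        notify(user_id, asset_id)  # side effect: allowed to fail
    except Exception as exc:  # deliberately broad: a notification error never fails the like
        logger.warning("like notification skipped: %s", exc)
    return True


def broken_notify(user_id: int, asset_id: int) -> None:
    """Simulates the notification service being down."""
    raise NotificationUnavailable("notification service down")
```

An automated test for B2 can inject a failing notifier such as `broken_notify` and assert that the like is still stored and the call still reports success.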
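- Step 5: End-to-end smoke test
After starting the full service stack (asset / social / user / notification / gateway), verify by hand:
- User A likes user B's collectible → user B receives a notification
- User B opens the notification list → sees one aggregated card (when several users liked the same item)
- User B taps the unread count → the number decreases by 1
- User B marks as read (MarkAsReadByTarget) → the number becomes 0
- User B deletes (DeleteByTarget) → that target disappears from the list
- Step 6: Cross-check against the acceptance checklist
Check off every item in the §12 acceptance checklist of docs/superpowers/specs/2026-06-16-notification-system-design.md.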
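The expected outcomes in the Step 5 smoke test can be written down as a small in-memory model, which is handy as a reference while clicking through the real stack. This is a sketch under stated assumptions: the `Like` dataclass and its field names (`from_user`, `is_read`) are hypothetical, and the grouping is done in Python here rather than with the GROUP BY that the list layer uses.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Like:
    id: int
    target_id: int
    from_user: int
    is_read: bool = False


def aggregate_cards(likes):
    """Group like notifications by target_id: one card per target."""
    groups = defaultdict(list)
    for n in likes:
        groups[n.target_id].append(n)
    return [
        {
            "target_id": tid,
            "like_count": len(items),
            "unread": sum(1 for n in items if not n.is_read),
        }
        for tid, items in groups.items()
    ]


def mark_read_by_target(likes, target_id):
    """Mark every like on one target as read; return how many rows changed."""
    changed = 0
    for n in likes:
        if n.target_id == target_id and not n.is_read:
            n.is_read = True
            changed += 1
    return changed


def delete_by_target(likes, target_id):
    """Drop every like on one target, so its card disappears from the list."""
    return [n for n in likes if n.target_id != target_id]
```

With two users liking collectible 100 and one liking collectible 200, `aggregate_cards` yields two cards, `mark_read_by_target(likes, 100)` clears both unread likes on the first card in one call, and `delete_by_target(likes, 100)` removes that card entirely.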
Task dependency graph
T1 (migration)
├→ T2 (asset proto extension)
│   └→ T3 (asset provider implementation)
├→ T4 (notification proto)
│   └→ T5-T6 (config + model)
│       └→ T7-T8 (repositories)
│           └→ T9 (repo tests)
│               └→ T10 (service)
│                   └→ T11 (service tests)
│                       └→ T12 (provider)
│                           └→ T13 (main)
└→ T14 (social client)
    └→ T15 (social integration)
        └→ T16 (social tests)
            └→ T17-T20 (admin)
                └→ T21-T22 (gateway)
                    └→ T23 (full regression)
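One way to turn the dependency graph into an execution order is a topological sort. The sketch below uses `graphlib.TopologicalSorter` from the standard library (Python 3.9+). The edge list is an assumption: it reads each `└→` as "depends on the task above it in the same branch", so it should be adjusted if the intended dependencies differ.

```python
from graphlib import TopologicalSorter

# task -> set of tasks it depends on (assumed reading of the graph above)
DEPENDS_ON = {
    "T2": {"T1"},
    "T3": {"T2"},
    "T4": {"T1"},
    "T5-T6": {"T4"},
    "T7-T8": {"T5-T6"},
    "T9": {"T7-T8"},
    "T10": {"T9"},
    "T11": {"T10"},
    "T12": {"T11"},
    "T13": {"T12"},
    "T14": {"T1"},
    "T15": {"T14"},
    "T16": {"T15"},
    "T17-T20": {"T16"},
    "T21-T22": {"T17-T20"},
    "T23": {"T21-T22"},
}


def execution_order():
    """Return one valid order in which every task follows its dependencies."""
    return list(TopologicalSorter(DEPENDS_ON).static_order())


def ready_after(done):
    """Tasks not yet done whose dependencies are all contained in `done`."""
    return sorted(t for t, deps in DEPENDS_ON.items() if deps <= done and t not in done)
```

`ready_after({"T1"})` lists the tasks that can start in parallel once the migration is in place, which is useful when the plan is split across several workers.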
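Key principles
- TDD: write the test → run (red) → write the implementation → run (green) → commit
- DRY: shared error-handling helpers such as okBase / errBase / extractUserStarFromContext
- YAGNI: no unique constraint, no TTL, and no query where an empty type means "all"
- Frequent commits: every Task ends with a commit
- Regression checks: every modifying task ends with query_graph + go build verification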