topfans/docs/superpowers/plans/2026-06-16-notification-system.md
2026-06-16 21:30:58 +08:00

2829 lines
91 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 通知系统实现计划
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** 实现统一通知系统,支持点赞/系统/活动三种类型,列表层按 target_id 聚合 like 通知
**Architecture:** 独立 Dubbo Notification Service事务内写通知+统计)+ social service 同步触发点赞通知 + admin 后端直写共享库触发系统/活动通知 + 列表层 GROUP BY 聚合 like
**Tech Stack:** Go 1.21+ / Dubbo-go / GORM / PostgreSQL / FastAPI + SQLAlchemy
**参考 spec:** `docs/superpowers/specs/2026-06-16-notification-system-design.md`
---
## 文件结构总览
### 新建文件
| 路径 | 职责 |
|------|------|
| `backend/migrations/2026_06_16_001_create_notifications.sql` | DB schemanotifications + notification_stats |
| `backend/proto/notification.proto` | RPC 接口定义 |
| `backend/services/notificationService/main.go` | Dubbo 启动入口 |
| `backend/services/notificationService/configs/config.yaml` | 端口 20010、DB 配置 |
| `backend/services/notificationService/provider/notification_provider.go` | RPC 实现 |
| `backend/services/notificationService/service/notification_service.go` | 业务逻辑 + 事务 |
| `backend/services/notificationService/repository/notification_repository.go` | notifications CRUD |
| `backend/services/notificationService/repository/notification_stats_repository.go` | 统计 UPSERT |
| `backend/services/notificationService/model/notification.go` | 数据模型 |
| `backend/services/notificationService/repository/notification_repository_test.go` | 仓储单测 |
| `backend/services/notificationService/service/notification_service_test.go` | 业务单测 |
| `backend/services/socialService/client/notification_client.go` | Dubbo 客户端 |
| `TopFans-activity-admin/backend/models/notification.py` | SQLAlchemy ORM |
| `TopFans-activity-admin/backend/crud/notification_crud.py` | 业务函数 |
| `TopFans-activity-admin/backend/crud/notification_crud_test.py` | crud 单测 |
| `TopFans-activity-admin/backend/handlers/notification.py` | FastAPI handler |
| `TopFans-activity-admin/backend/handlers/activity.py` | 改造:自动 broadcast |
### 修改文件
| 路径 | 改动 |
|------|------|
| `backend/proto/asset.proto` | `GetAssetForRPCResponse``name`/`cover_url` |
| `backend/services/assetService/provider/asset_provider.go` | 填充新增字段 |
| `backend/services/socialService/service/asset_like_service.go` | `LikeAsset` 末尾调 notification |
| `backend/services/socialService/main.go` | 注入 `NotificationClient` |
| `backend/services/socialService/service/asset_like_service_test.go` | 新增测试用例mock notification 失败时点赞仍成功) |
| `backend/gateway/router/router.go` | 注册 `/api/v1/notifications/*` |
| `backend/gateway/config/config.yaml` | 加 notification service backend |
| `TopFans-activity-admin/backend/router/__init__.py` | 注册 notification 路由 |
| `TopFans-activity-admin/backend/main.py` | DB schema 同步声明(如使用 SQLAlchemy create_all |
---
## 阶段 1数据库 Schema
### Task 1: 创建 notifications 迁移文件
**Files:**
- Create: `backend/migrations/2026_06_16_001_create_notifications.sql`
- [ ] **Step 1: 写迁移文件**
```sql
-- 通知系统主表 + 统计表
-- 创建时间: 2026-06-16
-- 关联: spec §4.1
-- 1. 通知主表
CREATE TABLE IF NOT EXISTS public.notifications (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
star_id BIGINT NOT NULL,
type VARCHAR(20) NOT NULL, -- like / system / activity
title VARCHAR(200) NOT NULL,
content VARCHAR(500),
data JSONB,
is_read BOOLEAN NOT NULL DEFAULT FALSE,
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
created_at BIGINT NOT NULL,
read_at BIGINT
);
-- 2. 索引
CREATE INDEX IF NOT EXISTS idx_notifications_user_type_created
ON public.notifications (user_id, star_id, type, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_notifications_user_unread
ON public.notifications (user_id, star_id, is_read, created_at DESC)
WHERE is_deleted = FALSE;
-- 3. 通知统计表
CREATE TABLE IF NOT EXISTS public.notification_stats (
user_id BIGINT NOT NULL,
star_id BIGINT NOT NULL,
like_unread_count INT NOT NULL DEFAULT 0,
system_unread_count INT NOT NULL DEFAULT 0,
activity_unread_count INT NOT NULL DEFAULT 0,
total_unread_count INT NOT NULL DEFAULT 0,
updated_at BIGINT NOT NULL,
PRIMARY KEY (user_id, star_id)
);
-- 4. 序列起始值预留 10000CLAUDE.md 数据库规范)
ALTER SEQUENCE notifications_id_seq RESTART WITH 10000;
```
- [ ] **Step 2: 验证 SQL 语法**(如环境有 psql
```bash
PGPASSWORD=$DB_PASSWORD psql -h localhost -U $DB_USER -d $DB_NAME -f backend/migrations/2026_06_16_001_create_notifications.sql
```
Expected: 两条 CREATE TABLE 成功,无报错。
- [ ] **Step 3: 验证表结构**
```sql
\d public.notifications
\d public.notification_stats
SELECT last_value FROM public.notifications_id_seq;
```
Expected: 字段、索引齐全;序列 `last_value = 10000`
- [ ] **Step 4: 回归检查**CLAUDE.md 规范)
- 确认未影响其他表:`SELECT tablename FROM pg_tables WHERE schemaname='public' AND tablename NOT IN ('notifications', 'notification_stats') ORDER BY tablename;` 应返回原有所有表
- 序列起始值 10000 不与现有 `assets`/`users` 等 BIGSERIAL 表冲突
- [ ] **Step 5: Commit**
```bash
git add backend/migrations/2026_06_16_001_create_notifications.sql
git commit -m "feat(notification): add notifications and notification_stats tables"
```
---
## 阶段 2Proto 定义
### Task 2: 扩展 asset.proto 加 name/cover_url 字段
**Files:**
- Modify: `backend/proto/asset.proto:326-333`
- [ ] **Step 1: 修改 GetAssetForRPCResponse**
定位到 `backend/proto/asset.proto` line 326-333
```protobuf
// 获取资产信息响应内部RPC
message GetAssetForRPCResponse {
topfans.common.BaseResponse base = 1;
int64 asset_id = 2; // 资产ID
int64 owner_uid = 3; // 持有者ID
int64 star_id = 4; // 明星ID
int32 status = 5; // 状态0=Pending, 1=Active
bool is_active = 6; // 是否激活
string name = 7; // 藏品名称(用于通知)
string cover_url = 8; // 藏品封面(用于通知)
}
```
- [ ] **Step 2: 重新生成 proto Go 代码**
```bash
cd backend
./gen-swagger.sh 2>/dev/null || true # 触发 proto 生成
# 或者直接调 protoc
protoc --go_out=. --go-grpc_out=. proto/asset.proto
```
Expected: `pkg/proto/asset/asset.pb.go` 重新生成,编译通过。
- [ ] **Step 3: 回归检查**
`mcp__code-review-graph__query_graph_tool``GetAssetForRPC` 的所有调用方:
- `pattern=callers_of target=GetAssetForRPC`
- 确认 social service 现有调用点不会因新字段破坏(向后兼容,新字段为零值即可)
- [ ] **Step 4: 编译验证**
```bash
cd backend
go build ./...
```
Expected: 编译通过asset service 端还没填新字段,但新字段为零值不影响运行)。
- [ ] **Step 5: Commit**
```bash
git add backend/proto/asset.proto backend/pkg/proto/asset/
git commit -m "feat(proto): add name and cover_url to GetAssetForRPCResponse"
```
---
### Task 3: asset provider 实现填充新字段
**Files:**
- Modify: `backend/services/assetService/provider/asset_provider.go``GetAssetForRPC` 方法)
- [ ] **Step 1: 定位 GetAssetForRPC 实现**
```bash
grep -n "GetAssetForRPC" backend/services/assetService/provider/asset_provider.go
```
- [ ] **Step 2: 在 DB 查询中补充 name/cover_url**
定位到 `GetAssetForRPC` 的 SQL 查询处(一般在 `asset_repository.go` 调用处),补充 `name``cover_url` 字段到 SELECT。
参考实现位置(基于现有 `asset_repository.go` line 222 已有 `name`/`cover_url` 字段):
```go
resp := &assetPb.GetAssetForRPCResponse{
Base: baseResp,
AssetId: asset.ID,
OwnerUid: asset.OwnerUID,
StarId: asset.StarID,
Status: int32(asset.Status),
IsActive: asset.Status == 1,
Name: asset.Name, // 新增
CoverUrl: asset.CoverURL, // 新增
}
```
- [ ] **Step 3: 编译验证**
```bash
cd backend
go build ./services/assetService/...
```
Expected: 编译通过。
- [ ] **Step 4: 单元测试(如有 GetAssetForRPC 测试用例)**
```bash
cd backend
go test ./services/assetService/repository/ -run TestGetAssetForRPC -v
```
Expected: 既有测试通过;如有测试可加 case 验证 `name`/`cover_url` 返回正确。
- [ ] **Step 5: 回归检查**
`query_graph pattern=callers_of target=GetAssetForRPC` 再次确认asset service 启动一次端到端验证:
```bash
cd backend
./start.sh assetService # 或部署后健康检查
curl http://localhost:20003/health
```
- [ ] **Step 6: Commit**
```bash
git add backend/services/assetService/
git commit -m "feat(assetService): populate name and cover_url in GetAssetForRPC"
```
---
### Task 4: 创建 notification.proto
**Files:**
- Create: `backend/proto/notification.proto`
- [ ] **Step 1: 写 proto 文件**
```protobuf
syntax = "proto3";
package topfans.notification;
option go_package = "github.com/topfans/backend/pkg/proto/notification;notification";
import "proto/common.proto";
import "google/api/annotations.proto";
import "google/protobuf/struct.proto";
service NotificationService {
rpc CreateNotification(CreateNotificationRequest) returns (CreateNotificationResponse) {
option (google.api.http) = {
post: "/internal/v1/notifications"
body: "*"
};
}
rpc GetNotifications(GetNotificationsRequest) returns (GetNotificationsResponse) {
option (google.api.http) = { get: "/api/v1/notifications" };
}
rpc GetUnreadCount(GetUnreadCountRequest) returns (GetUnreadCountResponse) {
option (google.api.http) = { get: "/api/v1/notifications/unread-count" };
}
rpc MarkAsRead(MarkAsReadRequest) returns (MarkAsReadResponse) {
option (google.api.http) = { post: "/api/v1/notifications/{id}/read" };
}
rpc MarkAsReadByTarget(MarkAsReadByTargetRequest) returns (MarkAsReadByTargetResponse) {
option (google.api.http) = { post: "/api/v1/notifications/targets/{target_id}/read" };
}
rpc MarkAllAsRead(MarkAllAsReadRequest) returns (MarkAllAsReadResponse) {
option (google.api.http) = { post: "/api/v1/notifications/read-all" };
}
rpc DeleteNotification(DeleteNotificationRequest) returns (DeleteNotificationResponse) {
option (google.api.http) = { delete: "/api/v1/notifications/{id}" };
}
rpc DeleteByTarget(DeleteByTargetRequest) returns (DeleteByTargetResponse) {
option (google.api.http) = { delete: "/api/v1/notifications/targets/{target_id}" };
}
}
message Notification {
int64 id = 1; int64 user_id = 2; int64 star_id = 3;
string type = 4; string title = 5; string content = 6;
google.protobuf.Struct data = 7;
bool is_read = 8; int64 created_at = 9; int64 read_at = 10;
// 列表层聚合:仅 type=like 且按 target_id 聚合时返回
bool aggregated = 11;
int32 total_count = 12;
repeated ActorPreview actors = 13;
int64 target_id = 14;
}
message ActorPreview {
int64 user_id = 1; string nickname = 2; string avatar = 3;
int64 liked_at = 4;
}
message CreateNotificationRequest {
int64 user_id = 1; int64 star_id = 2;
string type = 3; string title = 4; string content = 5;
google.protobuf.Struct data = 6;
}
message CreateNotificationResponse {
topfans.common.BaseResponse base = 1;
int64 id = 2;
}
message GetNotificationsRequest {
string type = 1; // like / system / activity / 空=全部
string tab = 2; // today / history / 空=全部
int32 page = 3; int32 page_size = 4;
}
message GetNotificationsResponse {
topfans.common.BaseResponse base = 1;
repeated Notification items = 2;
int64 total = 3;
int32 page = 4; int32 page_size = 5;
}
message GetUnreadCountRequest {}
message UnreadCount {
int32 like = 1; int32 system = 2; int32 activity = 3; int32 total = 4;
}
message GetUnreadCountResponse {
topfans.common.BaseResponse base = 1;
UnreadCount counts = 2;
}
message MarkAsReadRequest { int64 id = 1; }
message MarkAsReadResponse { topfans.common.BaseResponse base = 1; }
message MarkAsReadByTargetRequest { int64 target_id = 1; }
message MarkAsReadByTargetResponse {
topfans.common.BaseResponse base = 1; int32 affected = 2;
}
message MarkAllAsReadRequest { string type = 1; }
message MarkAllAsReadResponse {
topfans.common.BaseResponse base = 1; int32 affected = 2;
}
message DeleteNotificationRequest { int64 id = 1; }
message DeleteNotificationResponse { topfans.common.BaseResponse base = 1; }
message DeleteByTargetRequest { int64 target_id = 1; }
message DeleteByTargetResponse {
topfans.common.BaseResponse base = 1; int32 affected = 2;
}
```
- [ ] **Step 2: 重新生成 Go 代码**
```bash
cd backend
protoc --go_out=. --go-grpc_out=. proto/notification.proto
# 或
make proto
```
Expected: `pkg/proto/notification/notification.pb.go` 生成成功。
- [ ] **Step 3: 编译验证**
```bash
cd backend
go build ./pkg/proto/notification/...
```
Expected: 编译通过。
- [ ] **Step 4: Commit**
```bash
git add backend/proto/notification.proto backend/pkg/proto/notification/
git commit -m "feat(proto): add notification service proto definition"
```
---
## 阶段 3Notification ServiceGo 端)
### Task 5: 创建 notification service 配置文件
**Files:**
- Create: `backend/services/notificationService/configs/config.yaml`
- [ ] **Step 1: 复制 socialService 的 config.yaml 模板**
```bash
cp backend/services/socialService/configs/config.yaml \
backend/services/notificationService/configs/config.yaml
```
- [ ] **Step 2: 修改服务名、端口、数据库名**
参考 `socialService/configs/config.yaml` 已有结构,修改:
- `service.name: notification-service`
- `service.port: 20010`
- `database.dbname: top-fans`(共享数据库)
- [ ] **Step 3: Commit**
```bash
git add backend/services/notificationService/configs/
git commit -m "feat(notificationService): add service config"
```
---
### Task 6: 创建 notification data model
**Files:**
- Create: `backend/services/notificationService/model/notification.go`
- [ ] **Step 1: 写 model**
```go
package model
import "time"
// Notification 通知数据模型
type Notification struct {
ID int64 `json:"id" gorm:"primaryKey;column:id"`
UserID int64 `json:"user_id" gorm:"column:user_id;not null;index"`
StarID int64 `json:"star_id" gorm:"column:star_id;not null"`
Type string `json:"type" gorm:"column:type;not null;size:20"`
Title string `json:"title" gorm:"column:title;not null;size:200"`
Content string `json:"content" gorm:"column:content;size:500"`
Data string `json:"data" gorm:"column:data;type:jsonb"` // JSONB stored as string
IsRead bool `json:"is_read" gorm:"column:is_read;not null;default:false"`
IsDeleted bool `json:"is_deleted" gorm:"column:is_deleted;not null;default:false"`
CreatedAt int64 `json:"created_at" gorm:"column:created_at;not null"`
ReadAt int64 `json:"read_at" gorm:"column:read_at"`
}
// TableName 表名
func (Notification) TableName() string {
return "public.notifications"
}
// NotificationStats 通知统计模型
type NotificationStats struct {
UserID int64 `json:"user_id" gorm:"primaryKey;column:user_id"`
StarID int64 `json:"star_id" gorm:"primaryKey;column:star_id"`
LikeUnreadCount int `json:"like_unread_count" gorm:"column:like_unread_count;not null;default:0"`
SystemUnreadCount int `json:"system_unread_count" gorm:"column:system_unread_count;not null;default:0"`
ActivityUnreadCount int `json:"activity_unread_count" gorm:"column:activity_unread_count;not null;default:0"`
TotalUnreadCount int `json:"total_unread_count" gorm:"column:total_unread_count;not null;default:0"`
UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;not null"`
}
// TableName 表名
func (NotificationStats) TableName() string {
return "public.notification_stats"
}
// ActorPreview 列表层聚合时的 actor 预览
type ActorPreview struct {
UserID int64 `json:"user_id"`
Nickname string `json:"nickname"`
Avatar string `json:"avatar"`
LikedAt int64 `json:"liked_at"`
}
// AggregatedNotification 聚合查询结果type=like 时返回)
type AggregatedNotification struct {
ID int64 `json:"id"` // 原始第一条 id
UserID int64 `json:"user_id"`
StarID int64 `json:"star_id"`
Type string `json:"type"`
Title string `json:"title"`
Content string `json:"content"`
TargetID int64 `json:"target_id"`
IsRead bool `json:"is_read"`
CreatedAt int64 `json:"created_at"`
ReadAt int64 `json:"read_at"`
TotalCount int32 `json:"total_count"`
Actors []ActorPreview `json:"actors"`
Data string `json:"data"`
}
```
- [ ] **Step 2: Commit**
```bash
git add backend/services/notificationService/model/
git commit -m "feat(notificationService): add data models"
```
---
### Task 7: 创建 notification repository仓储层
**Files:**
- Create: `backend/services/notificationService/repository/notification_repository.go`
- [ ] **Step 1: 写仓储实现**
```go
package repository
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/topfans/backend/pkg/database"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/notificationService/model"
"go.uber.org/zap"
)
type NotificationRepository struct {
db *database.DB
}
func NewNotificationRepository(db *database.DB) *NotificationRepository {
return &NotificationRepository{db: db}
}
// Create 插入通知(事务内调用)
func (r *NotificationRepository) Create(ctx context.Context, tx *sql.Tx, n *model.Notification) (int64, error) {
var id int64
err := tx.QueryRowContext(ctx, `
INSERT INTO public.notifications
(user_id, star_id, type, title, content, data, is_read, is_deleted, created_at, read_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
RETURNING id
`, n.UserID, n.StarID, n.Type, n.Title, n.Content, n.Data, n.IsRead, n.IsDeleted, n.CreatedAt, n.ReadAt).Scan(&id)
if err != nil {
logger.Logger.Error("failed to insert notification", zap.Error(err))
return 0, fmt.Errorf("insert notification: %w", err)
}
return id, nil
}
// ListSystemActivity 列出 system / activity 通知(不聚合)
func (r *NotificationRepository) ListSystemActivity(ctx context.Context, userID, starID int64, ntype, tab string, page, pageSize int) ([]*model.Notification, int64, error) {
args := []interface{}{userID, starID, ntype}
where := "user_id = $1 AND star_id = $2 AND type = $3 AND is_deleted = FALSE"
if tab == "today" {
startOfDay := startOfTodayMs()
where += " AND created_at >= $4"
args = append(args, startOfDay)
} else if tab == "history" {
startOfDay := startOfTodayMs()
where += " AND created_at < $4"
args = append(args, startOfDay)
}
// count
var total int64
if err := r.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM public.notifications WHERE "+where, args...).Scan(&total); err != nil {
return nil, 0, err
}
// list
offset := (page - 1) * pageSize
args = append(args, pageSize, offset)
limitIdx := len(args) - 1
offsetIdx := len(args)
query := fmt.Sprintf(`
SELECT id, user_id, star_id, type, title, COALESCE(content,''), data,
is_read, is_deleted, created_at, COALESCE(read_at, 0)
FROM public.notifications
WHERE %s
ORDER BY created_at DESC
LIMIT $%d OFFSET $%d
`, where, limitIdx, offsetIdx)
rows, err := r.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
items := make([]*model.Notification, 0, pageSize)
for rows.Next() {
n := &model.Notification{}
if err := rows.Scan(&n.ID, &n.UserID, &n.StarID, &n.Type, &n.Title, &n.Content, &n.Data, &n.IsRead, &n.IsDeleted, &n.CreatedAt, &n.ReadAt); err != nil {
return nil, 0, err
}
items = append(items, n)
}
return items, total, nil
}
// ListLikesAggregated 列出 like 通知(按 target_id 聚合)
func (r *NotificationRepository) ListLikesAggregated(ctx context.Context, userID, starID int64, tab string, page, pageSize int) ([]*model.AggregatedNotification, int64, error) {
extra := ""
args := []interface{}{userID, starID}
if tab == "today" {
extra = " AND MAX(created_at) >= $3"
args = append(args, startOfTodayMs())
} else if tab == "history" {
extra = " AND MAX(created_at) < $3"
args = append(args, startOfTodayMs())
}
// count: distinct target_id 数
var total int64
countQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM (
SELECT (data->>'target_id')::bigint AS target_id
FROM public.notifications
WHERE user_id=$1 AND star_id=$2 AND type='like' AND is_deleted=FALSE
GROUP BY target_id
) t
`)
if err := r.db.QueryRowContext(ctx, countQuery, userID, starID).Scan(&total); err != nil {
return nil, 0, err
}
// 聚合查询:返回 target_id、total_count、最新时间、是否全部已读、actors 列表(前3)
// 使用 json_agg + 子查询分两步
offset := (page - 1) * pageSize
args = append(args, pageSize, offset)
limitIdx, offsetIdx := len(args)-1, len(args)
query := fmt.Sprintf(`
WITH agg AS (
SELECT
(data->>'target_id')::bigint AS target_id,
COUNT(*) AS total_count,
MAX(created_at) AS latest_at,
BOOL_AND(is_read) AS all_read
FROM public.notifications
WHERE user_id=$1 AND star_id=$2 AND type='like' AND is_deleted=FALSE
GROUP BY (data->>'target_id')
),
first_notif AS (
SELECT DISTINCT ON ((data->>'target_id')::bigint)
(data->>'target_id')::bigint AS target_id,
id, title, content, data, read_at
FROM public.notifications
WHERE user_id=$1 AND star_id=$2 AND type='like' AND is_deleted=FALSE
ORDER BY (data->>'target_id')::bigint, created_at DESC
),
actors AS (
SELECT (data->>'target_id')::bigint AS target_id,
json_agg(json_build_object(
'user_id', (data->>'actor_id')::bigint,
'nickname', COALESCE(data->>'actor_name', ''),
'avatar', COALESCE(data->>'actor_avatar', ''),
'liked_at', created_at
) ORDER BY created_at DESC) AS actor_previews
FROM public.notifications
WHERE user_id=$1 AND star_id=$2 AND type='like' AND is_deleted=FALSE
GROUP BY (data->>'target_id')
)
SELECT
a.target_id, a.total_count, a.latest_at, a.all_read,
f.id, f.title, f.content, f.data, f.read_at,
COALESCE(act.actor_previews, '[]'::json) AS actor_previews
FROM agg a
JOIN first_notif f ON f.target_id = a.target_id
LEFT JOIN actors act ON act.target_id = a.target_id
ORDER BY a.latest_at DESC
LIMIT $%d OFFSET $%d
`, limitIdx, offsetIdx)
if extra != "" {
// tab 时间条件插在 WHERE 之外(已经在 CTE 内过滤 MAX
}
rows, err := r.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
items := make([]*model.AggregatedNotification, 0, pageSize)
for rows.Next() {
var item model.AggregatedNotification
var actorLikesJSON []byte
if err := rows.Scan(
&item.TargetID, &item.TotalCount, &item.CreatedAt, &item.IsRead,
&item.ID, &item.Title, &item.Content, &item.Data, &item.ReadAt,
&actorLikesJSON,
); err != nil {
return nil, 0, err
}
// actor_likes 暂存于 Data 字段,由 service 层做 user 信息补全
item.UserID = userID
item.StarID = starID
item.Type = "like"
item.Actors = parseActorLikes(actorLikesJSON) // 简易解析
items = append(items, &item)
}
return items, total, nil
}
// 注意actor 昵称/头像已在 social service 写入通知时塞入 data 字段
// data->>'actor_name' / data->>'actor_avatar'),聚合查询直接从 data 取
// 无需 notification service 再调 user service避免重复 RPC
// MarkAsReadByID 标单条已读
func (r *NotificationRepository) MarkAsReadByID(ctx context.Context, tx *sql.Tx, userID, starID, id int64, now int64) (int32, error) {
res, err := tx.ExecContext(ctx, `
UPDATE public.notifications
SET is_read = TRUE, read_at = $4
WHERE id = $1 AND user_id = $2 AND star_id = $3 AND is_read = FALSE AND is_deleted = FALSE
`, id, userID, starID, now)
if err != nil {
return 0, err
}
affected, _ := res.RowsAffected()
return int32(affected), nil
}
// MarkAsReadByTarget 标某 target 下所有未读 like 已读
func (r *NotificationRepository) MarkAsReadByTarget(ctx context.Context, tx *sql.Tx, userID, starID, targetID, now int64) (int32, error) {
res, err := tx.ExecContext(ctx, `
UPDATE public.notifications
SET is_read = TRUE, read_at = $5
WHERE user_id=$1 AND star_id=$2 AND type='like'
AND (data->>'target_id')::bigint = $3
AND is_read = FALSE AND is_deleted = FALSE
`, userID, starID, targetID, now)
if err != nil {
return 0, err
}
affected, _ := res.RowsAffected()
return int32(affected), nil
}
// MarkAllAsRead 标某 type 下所有未读已读
func (r *NotificationRepository) MarkAllAsRead(ctx context.Context, tx *sql.Tx, userID, starID int64, ntype, now int64) (int32, error) {
if ntype == "" {
// 全部类型
}
res, err := tx.ExecContext(ctx, `
UPDATE public.notifications
SET is_read = TRUE, read_at = $4
WHERE user_id=$1 AND star_id=$2 AND type=$3 AND is_read=FALSE AND is_deleted=FALSE
`, userID, starID, ntype, now)
if err != nil {
return 0, err
}
affected, _ := res.RowsAffected()
return int32(affected), nil
}
// SoftDeleteByID 软删单条
func (r *NotificationRepository) SoftDeleteByID(ctx context.Context, tx *sql.Tx, userID, starID, id int64) (int32, error) {
res, err := tx.ExecContext(ctx, `
UPDATE public.notifications
SET is_deleted = TRUE
WHERE id = $1 AND user_id = $2 AND star_id = $3 AND is_deleted = FALSE
`, id, userID, starID)
if err != nil {
return 0, err
}
affected, _ := res.RowsAffected()
return int32(affected), nil
}
// SoftDeleteByTarget 软删某 target 下所有通知
func (r *NotificationRepository) SoftDeleteByTarget(ctx context.Context, tx *sql.Tx, userID, starID, targetID int64) (int32, error) {
res, err := tx.ExecContext(ctx, `
UPDATE public.notifications
SET is_deleted = TRUE
WHERE user_id=$1 AND star_id=$2 AND type='like'
AND (data->>'target_id')::bigint = $3 AND is_deleted = FALSE
`, userID, starID, targetID)
if err != nil {
return 0, err
}
affected, _ := res.RowsAffected()
return int32(affected), nil
}
// CountUnreadByID 数某条通知是否属于该 user 且未读
func (r *NotificationRepository) GetTypeByID(ctx context.Context, tx *sql.Tx, id, userID, starID int64) (string, bool, error) {
var ntype string
var isRead bool
err := tx.QueryRowContext(ctx, `
SELECT type, is_read FROM public.notifications
WHERE id=$1 AND user_id=$2 AND star_id=$3 AND is_deleted=FALSE
`, id, userID, starID).Scan(&ntype, &isRead)
if err == sql.ErrNoRows {
return "", false, nil
}
if err != nil {
return "", false, err
}
return ntype, isRead, nil
}
// startOfTodayMs 今日 0 点毫秒时间戳
func startOfTodayMs() int64 {
now := time.Now()
return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).UnixMilli()
}
// parseActorLikes 解析 JSON 数组user_id + nickname + avatar + liked_at
// 数据来源是 social service 写入时塞入 data 字段的 actor_name/actor_avatar
func parseActorLikes(data []byte) []model.ActorPreview {
type rawActor struct {
UserID int64 `json:"user_id"`
Nickname string `json:"nickname"`
Avatar string `json:"avatar"`
LikedAt int64 `json:"liked_at"`
}
var raws []rawActor
if err := json.Unmarshal(data, &raws); err != nil {
return nil
}
out := make([]model.ActorPreview, 0, len(raws))
for _, r := range raws {
out = append(out, model.ActorPreview{
UserID: r.UserID, Nickname: r.Nickname, Avatar: r.Avatar, LikedAt: r.LikedAt,
})
}
return out
}
```
- [ ] **Step 2: 添加 import "encoding/json"**
`import` 块加 `"encoding/json"`
- [ ] **Step 3: 编译验证**
```bash
cd backend
go build ./services/notificationService/...
```
Expected: 编译通过。
- [ ] **Step 4: Commit**
```bash
git add backend/services/notificationService/repository/notification_repository.go
git commit -m "feat(notificationService): add notification repository"
```
---
### Task 8: 创建 notification stats repository
**Files:**
- Create: `backend/services/notificationService/repository/notification_stats_repository.go`
- [ ] **Step 1: 写统计仓储**
```go
package repository
import (
"context"
"database/sql"
"fmt"
"github.com/topfans/backend/pkg/database"
"github.com/topfans/backend/pkg/logger"
"github.com/topfans/backend/services/notificationService/model"
"go.uber.org/zap"
)
type NotificationStatsRepository struct {
db *database.DB
}
func NewNotificationStatsRepository(db *database.DB) *NotificationStatsRepository {
return &NotificationStatsRepository{db: db}
}
// IncrementByType 在事务内对指定 type 未读数 +1total +1
func (r *NotificationStatsRepository) IncrementByType(ctx context.Context, tx *sql.Tx, userID, starID int64, ntype string, now int64) error {
var col string
switch ntype {
case "like":
col = "like_unread_count"
case "system":
col = "system_unread_count"
case "activity":
col = "activity_unread_count"
default:
return fmt.Errorf("invalid notification type: %s", ntype)
}
query := fmt.Sprintf(`
INSERT INTO public.notification_stats (user_id, star_id, %s, total_unread_count, updated_at)
VALUES ($1, $2, 1, 1, $3)
ON CONFLICT (user_id, star_id) DO UPDATE SET
%s = public.notification_stats.%s + 1,
total_unread_count = public.notification_stats.total_unread_count + 1,
updated_at = $3
`, col, col, col)
_, err := tx.ExecContext(ctx, query, userID, starID, now)
if err != nil {
logger.Logger.Error("failed to increment notification stats", zap.Error(err))
return err
}
return nil
}
// DecrementByType 在事务内对指定 type 未读数 -Ntotal -N
func (r *NotificationStatsRepository) DecrementByType(ctx context.Context, tx *sql.Tx, userID, starID int64, ntype string, delta int, now int64) error {
if delta <= 0 {
return nil
}
var col string
switch ntype {
case "like":
col = "like_unread_count"
case "system":
col = "system_unread_count"
case "activity":
col = "activity_unread_count"
default:
return fmt.Errorf("invalid notification type: %s", ntype)
}
query := fmt.Sprintf(`
UPDATE public.notification_stats
SET %s = GREATEST(0, %s - $3),
total_unread_count = GREATEST(0, total_unread_count - $3),
updated_at = $4
WHERE user_id = $1 AND star_id = $2
`, col, col)
_, err := tx.ExecContext(ctx, query, userID, starID, delta, now)
if err != nil {
logger.Logger.Error("failed to decrement notification stats", zap.Error(err))
return err
}
return nil
}
// ResetByType 把指定 type 未读数置 0
func (r *NotificationStatsRepository) ResetByType(ctx context.Context, tx *sql.Tx, userID, starID int64, ntype string, now int64) error {
var col string
switch ntype {
case "like":
col = "like_unread_count"
case "system":
col = "system_unread_count"
case "activity":
col = "activity_unread_count"
case "":
// 全部置 0
_, err := tx.ExecContext(ctx, `
UPDATE public.notification_stats
SET like_unread_count = 0, system_unread_count = 0,
activity_unread_count = 0, total_unread_count = 0,
updated_at = $3
WHERE user_id = $1 AND star_id = $2
`, userID, starID, now)
return err
default:
return fmt.Errorf("invalid notification type: %s", ntype)
}
query := fmt.Sprintf(`
UPDATE public.notification_stats
SET %s = 0,
total_unread_count = GREATEST(0, total_unread_count - %s),
updated_at = $3
WHERE user_id = $1 AND star_id = $2
`, col, col)
_, err := tx.ExecContext(ctx, query, userID, starID, now)
return err
}
// Get 取该 user+star 的统计
func (r *NotificationStatsRepository) Get(ctx context.Context, userID, starID int64) (*model.NotificationStats, error) {
s := &model.NotificationStats{}
err := r.db.QueryRowContext(ctx, `
SELECT user_id, star_id, like_unread_count, system_unread_count,
activity_unread_count, total_unread_count, updated_at
FROM public.notification_stats
WHERE user_id = $1 AND star_id = $2
`, userID, starID).Scan(&s.UserID, &s.StarID, &s.LikeUnreadCount, &s.SystemUnreadCount,
&s.ActivityUnreadCount, &s.TotalUnreadCount, &s.UpdatedAt)
if err == sql.ErrNoRows {
return &model.NotificationStats{UserID: userID, StarID: starID}, nil
}
if err != nil {
return nil, err
}
return s, nil
}
```
- [ ] **Step 2: 编译验证**
```bash
cd backend
go build ./services/notificationService/...
```
Expected: 编译通过。
- [ ] **Step 3: Commit**
```bash
git add backend/services/notificationService/repository/notification_stats_repository.go
git commit -m "feat(notificationService): add notification stats repository"
```
---
### Task 9: 写 notification repository 单元测试
**Files:**
- Create: `backend/services/notificationService/repository/notification_repository_test.go`
- [ ] **Step 1: 写测试**
```go
package repository_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/topfans/backend/pkg/database"
"github.com/topfans/backend/services/notificationService/model"
"github.com/topfans/backend/services/notificationService/repository"
)
// 依赖真实测试 DB环境变量 TEST_DB_HOST 等
func setupTestDB(t *testing.T) *database.DB {
t.Helper()
db, err := database.NewFromEnv()
if err != nil {
t.Skipf("skipping: test DB not available: %v", err)
}
return db
}
func TestNotificationRepository_CreateAndList(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewNotificationRepository(db)
statsRepo := repository.NewNotificationStatsRepository(db)
ctx := context.Background()
userID, starID := int64(990001), int64(1)
cleanup := func() {
db.ExecContext(ctx, `DELETE FROM public.notifications WHERE user_id=$1 AND star_id=$2`, userID, starID)
db.ExecContext(ctx, `DELETE FROM public.notification_stats WHERE user_id=$1 AND star_id=$2`, userID, starID)
}
cleanup()
defer cleanup()
now := time.Now().UnixMilli()
tx, _ := db.BeginTx(ctx, nil)
id, err := repo.Create(ctx, tx, &model.Notification{
UserID: userID, StarID: starID, Type: "like",
Title: "新点赞", Content: "test", Data: `{}`,
CreatedAt: now,
})
assert.NoError(t, err)
assert.Greater(t, id, int64(0))
err = statsRepo.IncrementByType(ctx, tx, userID, starID, "like", now)
assert.NoError(t, err)
tx.Commit()
// list system/activity
items, total, err := repo.ListSystemActivity(ctx, userID, starID, "like", "", 1, 20)
assert.NoError(t, err)
assert.Equal(t, int64(1), total)
assert.Len(t, items, 1)
assert.Equal(t, id, items[0].ID)
}
func TestNotificationRepository_LikeAggregation(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewNotificationRepository(db)
ctx := context.Background()
userID, starID, targetID := int64(990002), int64(1), int64(8888)
cleanup := func() {
db.ExecContext(ctx, `DELETE FROM public.notifications WHERE user_id=$1 AND star_id=$2`, userID, starID)
}
cleanup()
defer cleanup()
now := time.Now().UnixMilli()
for i := 0; i < 5; i++ {
tx, _ := db.BeginTx(ctx, nil)
_, err := repo.Create(ctx, tx, &model.Notification{
UserID: userID, StarID: starID, Type: "like",
Title: "新点赞", Data: `{"target_id": 8888, "actor_id": 1000}`,
CreatedAt: now + int64(i)*1000,
})
assert.NoError(t, err)
tx.Commit()
}
items, total, err := repo.ListLikesAggregated(ctx, userID, starID, "", 1, 20)
assert.NoError(t, err)
assert.Equal(t, int64(1), total)
assert.Len(t, items, 1)
assert.Equal(t, int32(5), items[0].TotalCount)
assert.Equal(t, targetID, items[0].TargetID)
}
func TestNotificationRepository_MarkAsReadByTarget(t *testing.T) {
db := setupTestDB(t)
repo := repository.NewNotificationRepository(db)
statsRepo := repository.NewNotificationStatsRepository(db)
ctx := context.Background()
userID, starID, targetID := int64(990003), int64(1), int64(7777)
cleanup := func() {
db.ExecContext(ctx, `DELETE FROM public.notifications WHERE user_id=$1 AND star_id=$2`, userID, starID)
db.ExecContext(ctx, `DELETE FROM public.notification_stats WHERE user_id=$1 AND star_id=$2`, userID, starID)
}
cleanup()
defer cleanup()
now := time.Now().UnixMilli()
for i := 0; i < 3; i++ {
tx, _ := db.BeginTx(ctx, nil)
_, _ = repo.Create(ctx, tx, &model.Notification{
UserID: userID, StarID: starID, Type: "like",
Title: "x", Data: `{"target_id": 7777, "actor_id": 1}`,
CreatedAt: now + int64(i)*100,
})
_ = statsRepo.IncrementByType(ctx, tx, userID, starID, "like", now)
tx.Commit()
}
tx, _ := db.BeginTx(ctx, nil)
affected, err := repo.MarkAsReadByTarget(ctx, tx, userID, starID, targetID, now)
_ = statsRepo.DecrementByType(ctx, tx, userID, starID, "like", int(affected), now)
tx.Commit()
assert.NoError(t, err)
assert.Equal(t, int32(3), affected)
}
```
- [ ] **Step 2: 运行测试**
```bash
cd backend
go test ./services/notificationService/repository/ -v -run TestNotificationRepository
```
Expected: 全部 PASS前提是测试 DB 可用,否则 skip
- [ ] **Step 3: Commit**
```bash
git add backend/services/notificationService/repository/notification_repository_test.go
git commit -m "test(notificationService): add notification repository tests"
```
---
### Task 10: 创建 notification service 业务层
**Files:**
- Create: `backend/services/notificationService/service/notification_service.go`
- [ ] **Step 1: 写业务层**
```go
package service
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/topfans/backend/pkg/database"
"github.com/topfans/backend/pkg/logger"
notifPb "github.com/topfans/backend/pkg/proto/notification"
pbCommon "github.com/topfans/backend/pkg/proto/common"
"github.com/topfans/backend/services/notificationService/model"
"github.com/topfans/backend/services/notificationService/repository"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/structpb"
)
type NotificationService struct {
db *database.DB
notifRepo *repository.NotificationRepository
statsRepo *repository.NotificationStatsRepository
}
func NewNotificationService(db *database.DB) *NotificationService {
return &NotificationService{
db: db,
notifRepo: repository.NewNotificationRepository(db),
statsRepo: repository.NewNotificationStatsRepository(db),
}
}
// CreateNotification 创建通知事务INSERT + UPSERT stats
func (s *NotificationService) CreateNotification(ctx context.Context, req *notifPb.CreateNotificationRequest) (*notifPb.CreateNotificationResponse, error) {
// 参数校验
if req.UserId <= 0 || req.StarId <= 0 {
return errResp(codes.InvalidArgument, "user_id and star_id required"), nil
}
if req.Type == "" || req.Title == "" {
return errResp(codes.InvalidArgument, "type and title required"), nil
}
if len(req.Title) > 200 || len(req.Content) > 500 {
return errResp(codes.InvalidArgument, "title <= 200, content <= 500"), nil
}
if req.Type != "like" && req.Type != "system" && req.Type != "activity" {
return errResp(codes.InvalidArgument, "type must be like/system/activity"), nil
}
// 序列化 data
dataStr := "{}"
if req.Data != nil {
b, err := json.Marshal(req.Data.AsMap())
if err != nil {
return errResp(codes.InvalidArgument, "data marshal failed"), nil
}
dataStr = string(b)
}
now := time.Now().UnixMilli()
n := &model.Notification{
UserID: req.UserId, StarID: req.StarId, Type: req.Type,
Title: req.Title, Content: req.Content, Data: dataStr,
CreatedAt: now,
}
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return errResp(codes.Internal, "begin tx failed"), nil
}
defer tx.Rollback()
id, err := s.notifRepo.Create(ctx, tx, n)
if err != nil {
return errResp(codes.Internal, "create failed"), nil
}
if err := s.statsRepo.IncrementByType(ctx, tx, req.UserId, req.StarId, req.Type, now); err != nil {
return errResp(codes.Internal, "stats update failed"), nil
}
if err := tx.Commit(); err != nil {
return errResp(codes.Internal, "commit failed"), nil
}
logger.Logger.Info("notification created",
zap.Int64("id", id), zap.Int64("user_id", req.UserId),
zap.Int64("star_id", req.StarId), zap.String("type", req.Type))
return &notifPb.CreateNotificationResponse{
Base: okBase(), Id: id,
}, nil
}
// GetNotifications 列表查询
func (s *NotificationService) GetNotifications(ctx context.Context, userID, starID int64, ntype, tab string, page, pageSize int32) (*notifPb.GetNotificationsResponse, error) {
if page <= 0 { page = 1 }
if pageSize <= 0 { pageSize = 20 }
if pageSize > 100 { pageSize = 100 }
var items []*notifPb.Notification
var total int64
var err error
if ntype == "like" {
aggItems, aggTotal, e := s.notifRepo.ListLikesAggregated(ctx, userID, starID, tab, int(page), int(pageSize))
if e != nil {
return errResp(codes.Internal, "list likes failed"), nil
}
total = aggTotal
for _, a := range aggItems {
// 生成聚合标题:"{actor_names} 等 N 人赞了你的《{asset_title}》"
// asset_title 在 data 字段里social service 写入时已塞入)
assetTitle := extractAssetTitle(a.Data)
a.Title = buildAggregatedLikeTitle(a.Actors, a.TotalCount, assetTitle)
items = append(items, aggToProto(a))
}
} else {
// system / activity / 空 = 全部
if ntype == "" {
// 一次查 system 和 activity合并不优雅先支持 type 非空场景;空 type 暂不实现,留待 V1.1
return errResp(codes.InvalidArgument, "type is required (like/system/activity)"), nil
}
rows, t, e := s.notifRepo.ListSystemActivity(ctx, userID, starID, ntype, tab, int(page), int(pageSize))
if e != nil {
return errResp(codes.Internal, "list failed"), nil
}
total = t
for _, r := range rows {
items = append(items, rawToProto(r))
}
}
return &notifPb.GetNotificationsResponse{
Base: okBase(),
Items: items, Total: total,
Page: page, PageSize: pageSize,
}, nil
}
// GetUnreadCount
func (s *NotificationService) GetUnreadCount(ctx context.Context, userID, starID int64) (*notifPb.GetUnreadCountResponse, error) {
s2, err := s.statsRepo.Get(ctx, userID, starID)
if err != nil {
return errResp(codes.Internal, "get stats failed"), nil
}
return &notifPb.GetUnreadCountResponse{
Base: okBase(),
Counts: &notifPb.UnreadCount{
Like: int32(s2.LikeUnreadCount),
System: int32(s2.SystemUnreadCount),
Activity: int32(s2.ActivityUnreadCount),
Total: int32(s2.TotalUnreadCount),
},
}, nil
}
// MarkAsRead 标单条已读(自动判断 type走对应 stats
func (s *NotificationService) MarkAsRead(ctx context.Context, userID, starID, id, now int64) (*notifPb.MarkAsReadResponse, error) {
tx, _ := s.db.BeginTx(ctx, nil)
defer tx.Rollback()
ntype, isRead, err := s.notifRepo.GetTypeByID(ctx, tx, id, userID, starID)
if err != nil {
return errResp(codes.Internal, "lookup failed"), nil
}
if ntype == "" {
return errResp(codes.NotFound, "notification not found"), nil
}
if isRead {
return &notifPb.MarkAsReadResponse{Base: okBase()}, nil
}
affected, err := s.notifRepo.MarkAsReadByID(ctx, tx, userID, starID, id, now)
if err != nil {
return errResp(codes.Internal, "mark failed"), nil
}
if affected > 0 {
_ = s.statsRepo.DecrementByType(ctx, tx, userID, starID, ntype, int(affected), now)
}
tx.Commit()
return &notifPb.MarkAsReadResponse{Base: okBase()}, nil
}
// MarkAsReadByTarget 标某 target 下所有未读 like 已读
func (s *NotificationService) MarkAsReadByTarget(ctx context.Context, userID, starID, targetID, now int64) (*notifPb.MarkAsReadByTargetResponse, error) {
tx, _ := s.db.BeginTx(ctx, nil)
defer tx.Rollback()
affected, err := s.notifRepo.MarkAsReadByTarget(ctx, tx, userID, starID, targetID, now)
if err != nil {
return errResp(codes.Internal, "mark failed"), nil
}
if affected > 0 {
_ = s.statsRepo.DecrementByType(ctx, tx, userID, starID, "like", int(affected), now)
}
tx.Commit()
return &notifPb.MarkAsReadByTargetResponse{Base: okBase(), Affected: affected}, nil
}
// MarkAllAsRead
func (s *NotificationService) MarkAllAsRead(ctx context.Context, userID, starID int64, ntype string, now int64) (*notifPb.MarkAllAsReadResponse, error) {
tx, _ := s.db.BeginTx(ctx, nil)
defer tx.Rollback()
affected, err := s.notifRepo.MarkAllAsRead(ctx, tx, userID, starID, ntype, now)
if err != nil {
return errResp(codes.Internal, "mark all failed"), nil
}
if affected > 0 && ntype != "" {
_ = s.statsRepo.DecrementByType(ctx, tx, userID, starID, ntype, int(affected), now)
} else if ntype == "" {
_ = s.statsRepo.ResetByType(ctx, tx, userID, starID, "", now)
}
tx.Commit()
return &notifPb.MarkAllAsReadResponse{Base: okBase(), Affected: affected}, nil
}
// DeleteNotification 软删单条
func (s *NotificationService) DeleteNotification(ctx context.Context, userID, starID, id, now int64) (*notifPb.DeleteNotificationResponse, error) {
tx, _ := s.db.BeginTx(ctx, nil)
defer tx.Rollback()
ntype, isRead, err := s.notifRepo.GetTypeByID(ctx, tx, id, userID, starID)
if err != nil {
return errResp(codes.Internal, "lookup failed"), nil
}
if ntype == "" {
return errResp(codes.NotFound, "notification not found"), nil
}
affected, err := s.notifRepo.SoftDeleteByID(ctx, tx, userID, starID, id)
if err != nil {
return errResp(codes.Internal, "delete failed"), nil
}
if affected > 0 && !isRead {
_ = s.statsRepo.DecrementByType(ctx, tx, userID, starID, ntype, int(affected), now)
}
tx.Commit()
return &notifPb.DeleteNotificationResponse{Base: okBase()}, nil
}
// DeleteByTarget 软删某 target 下所有 like
func (s *NotificationService) DeleteByTarget(ctx context.Context, userID, starID, targetID, now int64) (*notifPb.DeleteByTargetResponse, error) {
tx, _ := s.db.BeginTx(ctx, nil)
defer tx.Rollback()
// 先查未读数
var unreadCount int
err := tx.QueryRowContext(ctx, `
SELECT COUNT(*) FROM public.notifications
WHERE user_id=$1 AND star_id=$2 AND type='like'
AND (data->>'target_id')::bigint = $3
AND is_read=FALSE AND is_deleted=FALSE
`, userID, starID, targetID).Scan(&unreadCount)
if err != nil {
return errResp(codes.Internal, "count failed"), nil
}
affected, err := s.notifRepo.SoftDeleteByTarget(ctx, tx, userID, starID, targetID)
if err != nil {
return errResp(codes.Internal, "delete failed"), nil
}
if unreadCount > 0 {
_ = s.statsRepo.DecrementByType(ctx, tx, userID, starID, "like", unreadCount, now)
}
tx.Commit()
return &notifPb.DeleteByTargetResponse{Base: okBase(), Affected: affected}, nil
}
// ===== helpers =====
func okBase() *pbCommon.BaseResponse {
return &pbCommon.BaseResponse{Code: uint32(codes.OK), Message: "success", Timestamp: time.Now().UnixMilli()}
}
func errResp(c codes.Code, msg string) *notifPb.CreateNotificationResponse {
return &notifPb.CreateNotificationResponse{Base: &pbCommon.BaseResponse{Code: uint32(c), Message: msg, Timestamp: time.Now().UnixMilli()}}
}
func aggToProto(a *model.AggregatedNotification) *notifPb.Notification {
// 把 Data JSON 字符串反序列化为 Struct
dataStruct, _ := structpb.NewStruct(nil)
if a.Data != "" {
var m map[string]interface{}
if err := json.Unmarshal([]byte(a.Data), &m); err == nil {
dataStruct, _ = structpb.NewStruct(m)
}
}
actors := make([]*notifPb.ActorPreview, 0, len(a.Actors))
for _, x := range a.Actors {
actors = append(actors, &notifPb.ActorPreview{
UserId: x.UserID, Nickname: x.Nickname, Avatar: x.Avatar, LikedAt: x.LikedAt,
})
}
return &notifPb.Notification{
Id: a.ID, UserId: a.UserID, StarId: a.StarID, Type: a.Type,
Title: a.Title, Content: a.Content, Data: dataStruct,
IsRead: a.IsRead, CreatedAt: a.CreatedAt, ReadAt: a.ReadAt,
Aggregated: true, TotalCount: a.TotalCount, Actors: actors, TargetId: a.TargetID,
}
}
func rawToProto(n *model.Notification) *notifPb.Notification {
dataStruct, _ := structpb.NewStruct(nil)
if n.Data != "" {
var m map[string]interface{}
if err := json.Unmarshal([]byte(n.Data), &m); err == nil {
dataStruct, _ = structpb.NewStruct(m)
}
}
return &notifPb.Notification{
Id: n.ID, UserId: n.UserID, StarId: n.StarID, Type: n.Type,
Title: n.Title, Content: n.Content, Data: dataStruct,
IsRead: n.IsRead, CreatedAt: n.CreatedAt, ReadAt: n.ReadAt,
}
}
// extractAssetTitle 从 data JSON 字符串中取 asset_title
func extractAssetTitle(dataStr string) string {
if dataStr == "" {
return ""
}
var m map[string]interface{}
if err := json.Unmarshal([]byte(dataStr), &m); err != nil {
return ""
}
if v, ok := m["asset_title"].(string); ok {
return v
}
return ""
}
// buildAggregatedLikeTitle 构造 like 聚合标题
// 1 个 actor: "{name} 赞了你的《{title}》"
// 2 个 actor: "{name1}、{name2} 赞了你的《{title}》"
// 3+ 个 actor: "{name1}、{name2} 等 N 人赞了你的《{title}》"
func buildAggregatedLikeTitle(actors []model.ActorPreview, total int32, assetTitle string) string {
names := make([]string, 0, 3)
for i, a := range actors {
if i >= 3 {
break
}
name := a.Nickname
if name == "" {
name = fmt.Sprintf("用户%d", a.UserID)
}
names = append(names, name)
}
title := assetTitle
if title == "" {
title = "你的藏品"
}
switch len(names) {
case 0:
return fmt.Sprintf("有 %d 人赞了你的《%s》", total, title)
case 1:
return fmt.Sprintf("%s 赞了你的《%s》", names[0], title)
case 2:
return fmt.Sprintf("%s、%s 赞了你的《%s》", names[0], names[1], title)
default:
return fmt.Sprintf("%s、%s 等 %d 人赞了你的《%s》", names[0], names[1], total, title)
}
}
```
- [ ] **Step 2: 编译验证**
```bash
cd backend
go build ./services/notificationService/...
```
Expected: 编译通过(可能需要 import `"google.golang.org/protobuf/types/known/structpb"`)。
- [ ] **Step 3: Commit**
```bash
git add backend/services/notificationService/service/notification_service.go
git commit -m "feat(notificationService): add notification service business layer"
```
---
### Task 11: 写 notification service 单元测试
**Files:**
- Create: `backend/services/notificationService/service/notification_service_test.go`
- [ ] **Step 1: 写测试**
```go
package service_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/topfans/backend/pkg/database"
notifPb "github.com/topfans/backend/pkg/proto/notification"
"github.com/topfans/backend/services/notificationService/service"
)
func setupTestService(t *testing.T) *service.NotificationService {
t.Helper()
db, err := database.NewFromEnv()
if err != nil {
t.Skipf("test DB not available: %v", err)
}
return service.NewNotificationService(db)
}
func TestCreateNotification_Validation(t *testing.T) {
svc := setupTestService(t)
ctx := context.Background()
cases := []struct{ name string; req *notifPb.CreateNotificationRequest; expectCode uint32 }{
{"missing user", &notifPb.CreateNotificationRequest{StarId: 1, Type: "like", Title: "x"}, 3},
{"missing star", &notifPb.CreateNotificationRequest{UserId: 1, Type: "like", Title: "x"}, 3},
{"missing type", &notifPb.CreateNotificationRequest{UserId: 1, StarId: 1, Title: "x"}, 3},
{"bad type", &notifPb.CreateNotificationRequest{UserId: 1, StarId: 1, Type: "x", Title: "x"}, 3},
{"title too long", &notifPb.CreateNotificationRequest{UserId: 1, StarId: 1, Type: "like", Title: string(make([]byte, 201))}, 3},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
resp, _ := svc.CreateNotification(ctx, c.req)
assert.Equal(t, c.expectCode, resp.Base.Code)
})
}
}
func TestCreateNotification_TransactionRollback(t *testing.T) {
svc := setupTestService(t)
ctx := context.Background()
userID, starID := int64(990100), int64(1)
cleanup := func() {
svc.DB().ExecContext(ctx, `DELETE FROM public.notifications WHERE user_id=$1`, userID)
svc.DB().ExecContext(ctx, `DELETE FROM public.notification_stats WHERE user_id=$1`, userID)
}
cleanup()
defer cleanup()
// 正常创建
req := &notifPb.CreateNotificationRequest{
UserId: userID, StarId: starID, Type: "like", Title: "x", Content: "y",
}
resp, _ := svc.CreateNotification(ctx, req)
assert.Equal(t, uint32(0), resp.Base.Code)
assert.Greater(t, resp.Id, int64(0))
// 验证 stats 也写入
cntResp, _ := svc.GetUnreadCount(ctx, userID, starID)
assert.Equal(t, int32(1), cntResp.Counts.Like)
}
```
- [ ] **Step 2: 运行测试**
```bash
cd backend
go test ./services/notificationService/service/ -v
```
Expected: PASS前提是测试 DB 可用)。
- [ ] **Step 3: Commit**
```bash
git add backend/services/notificationService/service/notification_service_test.go
git commit -m "test(notificationService): add service unit tests"
```
---
### Task 12: 创建 notification providerRPC 层)
**Files:**
- Create: `backend/services/notificationService/provider/notification_provider.go`
- [ ] **Step 1: 写 provider**
```go
package provider
import (
"context"
"fmt"
"strconv"
"time"
commonPb "github.com/topfans/backend/pkg/proto/common"
"github.com/topfans/backend/pkg/logger"
notifPb "github.com/topfans/backend/pkg/proto/notification"
"github.com/topfans/backend/services/notificationService/service"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"
)
type NotificationProvider struct {
notifPb.UnimplementedNotificationServiceServer
svc *service.NotificationService
}
func NewNotificationProvider(svc *service.NotificationService) *NotificationProvider {
return &NotificationProvider{svc: svc}
}
func (p *NotificationProvider) CreateNotification(ctx context.Context, req *notifPb.CreateNotificationRequest) (*notifPb.CreateNotificationResponse, error) {
return p.svc.CreateNotification(ctx, req)
}
func (p *NotificationProvider) GetNotifications(ctx context.Context, req *notifPb.GetNotificationsRequest) (*notifPb.GetNotificationsResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.GetNotificationsResponse{Base: errBase(err)}, nil
}
return p.svc.GetNotifications(ctx, userID, starID, req.Type, req.Tab, req.Page, req.PageSize)
}
func (p *NotificationProvider) GetUnreadCount(ctx context.Context, req *notifPb.GetUnreadCountRequest) (*notifPb.GetUnreadCountResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.GetUnreadCountResponse{Base: errBase(err)}, nil
}
return p.svc.GetUnreadCount(ctx, userID, starID)
}
func (p *NotificationProvider) MarkAsRead(ctx context.Context, req *notifPb.MarkAsReadRequest) (*notifPb.MarkAsReadResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.MarkAsReadResponse{Base: errBase(err)}, nil
}
now := time.Now().UnixMilli()
return p.svc.MarkAsRead(ctx, userID, starID, req.Id, now)
}
func (p *NotificationProvider) MarkAsReadByTarget(ctx context.Context, req *notifPb.MarkAsReadByTargetRequest) (*notifPb.MarkAsReadByTargetResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.MarkAsReadByTargetResponse{Base: errBase(err)}, nil
}
now := time.Now().UnixMilli()
return p.svc.MarkAsReadByTarget(ctx, userID, starID, req.TargetId, now)
}
func (p *NotificationProvider) MarkAllAsRead(ctx context.Context, req *notifPb.MarkAllAsReadRequest) (*notifPb.MarkAllAsReadResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.MarkAllAsReadResponse{Base: errBase(err)}, nil
}
now := time.Now().UnixMilli()
return p.svc.MarkAllAsRead(ctx, userID, starID, req.Type, now)
}
func (p *NotificationProvider) DeleteNotification(ctx context.Context, req *notifPb.DeleteNotificationRequest) (*notifPb.DeleteNotificationResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.DeleteNotificationResponse{Base: errBase(err)}, nil
}
now := time.Now().UnixMilli()
return p.svc.DeleteNotification(ctx, userID, starID, req.Id, now)
}
func (p *NotificationProvider) DeleteByTarget(ctx context.Context, req *notifPb.DeleteByTargetRequest) (*notifPb.DeleteByTargetResponse, error) {
userID, starID, err := extractUserStarFromContext(ctx)
if err != nil {
return &notifPb.DeleteByTargetResponse{Base: errBase(err)}, nil
}
now := time.Now().UnixMilli()
return p.svc.DeleteByTarget(ctx, userID, starID, req.TargetId, now)
}
// extractUserStarFromContext 从 gRPC metadata 提取 user_id / star_id
// gateway 端通过 metadata.NewOutgoingContext 注入 x-user-id / x-star-id
// 实际格式参考 social service provider 已有实现
func extractUserStarFromContext(ctx context.Context) (int64, int64, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return 0, 0, fmt.Errorf("no metadata in context")
}
uidStrs := md.Get("x-user-id")
sidStrs := md.Get("x-star-id")
if len(uidStrs) == 0 || len(sidStrs) == 0 {
return 0, 0, fmt.Errorf("missing user_id or star_id in metadata")
}
uid, err := strconv.ParseInt(uidStrs[0], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("invalid user_id: %w", err)
}
sid, err := strconv.ParseInt(sidStrs[0], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("invalid star_id: %w", err)
}
return uid, sid, nil
}
func errBase(err error) *commonPb.BaseResponse {
return &commonPb.BaseResponse{Code: 13, Message: err.Error(), Timestamp: time.Now().UnixMilli()}
}
```
- [ ] **Step 2: 编译验证**
```bash
cd backend
go build ./services/notificationService/...
```
Expected: 编译通过。
- [ ] **Step 3: Commit**
```bash
git add backend/services/notificationService/provider/
git commit -m "feat(notificationService): add RPC provider"
```
---
### Task 13: 创建 notification service main.go
**Files:**
- Create: `backend/services/notificationService/main.go`
- [ ] **Step 1: 复制 socialService/main.go 模板**
```bash
cp backend/services/socialService/main.go backend/services/notificationService/main.go
```
- [ ] **Step 2: 修改包名、import、注册 service**
调整:
- `package main` 保持不变
- import 改为 `notificationService "github.com/topfans/backend/services/notificationService"`
- 端口 `20010`
- DB 配置与 social 一致
- 服务注册:`srv, _ := server.NewServer(...)` + 注册 `notificationPb.RegisterNotificationServiceServer(srv, notificationProvider)`
具体改完后代码结构与 social 保持一致,注册逻辑参考 socialService main.go line 80-150。
- [ ] **Step 3: 编译验证**
```bash
cd backend
go build ./services/notificationService/
```
Expected: 编译通过;生成 `notificationService` 可执行文件。
- [ ] **Step 4: 启动测试(可选,本地环境)**
```bash
cd backend
DB_HOST=localhost DB_PORT=5432 DB_USER=postgres DB_PASSWORD=xxx DB_NAME=top-fans \
./bin/notificationService -port 20010 &
sleep 2
curl http://localhost:20010/health
```
Expected: 进程启动health 200。
- [ ] **Step 5: 回归检查**
- `query_graph pattern=callers_of target=NotificationService` 确认未影响其他服务
- 既有 social/asset 等服务编译仍通过:`cd backend && go build ./...`
- [ ] **Step 6: Commit**
```bash
git add backend/services/notificationService/main.go
git commit -m "feat(notificationService): add main entry with dubbo bootstrap"
```
---
## 阶段 4social service 集成
### Task 14: 创建 notification clientsocial 端)
**Files:**
- Create: `backend/services/socialService/client/notification_client.go`
- [ ] **Step 1: 写 client**
```go
package client
import (
"context"
"fmt"
"dubbo.apache.org/dubbo-go/v3/client"
_ "dubbo.apache.org/dubbo-go/v3/imports"
notifPb "github.com/topfans/backend/pkg/proto/notification"
"go.uber.org/zap"
)
type NotificationClient struct {
client notifPb.NotificationService
logger *zap.Logger
}
func NewNotificationClient(serviceURL string, logger *zap.Logger) (*NotificationClient, error) {
cli, err := client.NewClient(client.WithClientURL(serviceURL))
if err != nil {
return nil, fmt.Errorf("failed to create dubbo client: %w", err)
}
notifClient, err := notifPb.NewNotificationService(cli)
if err != nil {
return nil, fmt.Errorf("failed to create notification service client: %w", err)
}
return &NotificationClient{client: notifClient, logger: logger}, nil
}
func (c *NotificationClient) CreateNotification(ctx context.Context, req *notifPb.CreateNotificationRequest) (*notifPb.CreateNotificationResponse, error) {
resp, err := c.client.CreateNotification(ctx, req)
if err != nil {
c.logger.Error("Failed to create notification", zap.Error(err))
return nil, err
}
return resp, nil
}
```
- [ ] **Step 2: 编译验证**
```bash
cd backend
go build ./services/socialService/client/...
```
Expected: 编译通过。
- [ ] **Step 3: Commit**
```bash
git add backend/services/socialService/client/notification_client.go
git commit -m "feat(socialService): add notification dubbo client"
```
---
### Task 15: 注入 NotificationClient 到 social service
**Files:**
- Modify: `backend/services/socialService/main.go`
- Modify: `backend/services/socialService/service/asset_like_service.go`
- [ ] **Step 1: 在 main.go 加 URL flag 与 client 初始化**
参考 `assetServiceURL` 添加:
```go
notificationServiceURL = flag.String("notification-service-url", getEnv("NOTIFICATION_SERVICE_URL", "tri://localhost:20010"), "Notification service URL")
```
在 main 函数 `NewAssetLikeService` 处加:
```go
notifClient, err := client.NewNotificationClient(*notificationServiceURL, logger.Logger)
if err != nil {
logger.Sugar.Fatalf("Failed to create notification client: %v", err)
}
```
修改 `NewAssetLikeService` 调用,把 `notifClient` 注入:
```go
assetLikeService := service.NewAssetLikeService(assetClient, socialRepo, notifClient)
```
- [ ] **Step 2: 修改 `AssetLikeService` 结构体**
`backend/services/socialService/service/asset_like_service.go` line 22-26
```go
type AssetLikeService struct {
assetClient *client.AssetClient
socialRepo repository.SocialRepository
notificationClient *client.NotificationClient
userClient UserServiceClient // 新增(如尚未注入)
}
func NewAssetLikeService(assetClient *client.AssetClient, socialRepo repository.SocialRepository, notificationClient *client.NotificationClient) *AssetLikeService {
return &AssetLikeService{
assetClient: assetClient,
socialRepo: socialRepo,
notificationClient: notificationClient,
}
}
```
- [ ] **Step 3: 修改 LikeAsset 末尾加通知调用**
`LikeAsset` 方法 line 132 之前return 之前)插入:
```go
// 创建点赞通知(失败仅日志,不影响点赞主路径)
s.createLikeNotification(ctx, getAssetResp, assetID, userID, starID)
```
在文件末尾添加 `createLikeNotification` 私有方法:
```go
func (s *AssetLikeService) createLikeNotification(ctx context.Context, asset *assetPb.GetAssetForRPCResponse, assetID, userID, starID int64) {
// 跳过异常数据
if asset.OwnerUid <= 0 {
logger.Logger.Warn("skip notification: invalid owner_uid",
zap.Int64("asset_id", assetID), zap.Int64("owner_uid", asset.OwnerUid))
return
}
// 查 actor 信息
actorName := strconv.FormatInt(userID, 10)
var actorAvatar string
if s.userClient != nil {
if actorInfo, err := s.userClient.GetUsersByIDs(ctx, []int64{userID}, starID); err == nil {
if info, exists := actorInfo[userID]; exists && info != nil {
actorName = info.Nickname
actorAvatar = info.Avatar
}
} else {
logger.Logger.Warn("failed to fetch actor info for notification",
zap.Int64("user_id", userID), zap.Error(err))
}
}
// 构造 data
data := map[string]interface{}{
"target_type": "asset",
"target_id": assetID,
"actor_id": userID,
"actor_name": actorName,
"actor_avatar": actorAvatar,
"asset_title": asset.Name,
"asset_cover": asset.CoverUrl,
"star_id": starID,
}
dataStruct, err := structpb.NewStruct(data)
if err != nil {
logger.Logger.Error("failed to build notification data struct",
zap.Int64("asset_id", assetID), zap.Error(err))
return
}
// 同步调 notification service
_, notifErr := s.notificationClient.CreateNotification(ctx, &notifPb.CreateNotificationRequest{
UserId: asset.OwnerUid,
StarId: starID,
Type: "like",
Title: "新点赞",
Content: fmt.Sprintf("%s 点赞了你的藏品", actorName),
Data: dataStruct,
})
if notifErr != nil {
logger.Logger.Error("failed to create like notification (like itself succeeded)",
zap.Int64("asset_id", assetID),
zap.Int64("actor_id", userID),
zap.Int64("owner_uid", asset.OwnerUid),
zap.Error(notifErr),
)
return
}
logger.Logger.Info("like notification created",
zap.Int64("asset_id", assetID), zap.Int64("owner_uid", asset.OwnerUid))
}
```
并在 import 块加 `"google.golang.org/protobuf/types/known/structpb"`
- [ ] **Step 4: 编译验证**
```bash
cd backend
go build ./services/socialService/...
```
Expected: 编译通过。
- [ ] **Step 5: 回归检查**
- `query_graph pattern=callers_of target=LikeAsset` 确认调用方无破坏
- 跑 social service 既有测试:`go test ./services/socialService/ -v`
- [ ] **Step 6: Commit**
```bash
git add backend/services/socialService/
git commit -m "feat(socialService): trigger notification on like (degrade on failure)"
```
---
### Task 16: 写点赞通知失败不影响主路径的测试
**Files:**
- Create/Modify: `backend/services/socialService/service/asset_like_service_test.go`
- Create: `backend/services/socialService/client/notification_client_mock.go`mock 工具,与 user_rpc_client.go 已有 MockUserServiceClient 模式一致)
- [ ] **Step 1: 先加 Mock 工具类**
`backend/services/socialService/client/notification_client_mock.go`
```go
package client
import (
"context"
"sync"
notifPb "github.com/topfans/backend/pkg/proto/notification"
)
// MockNotificationClient 通知客户端 mock用于测试
type MockNotificationClient struct {
mu sync.Mutex
CreateErr error
CreateCallCount int32
LastRequest *notifPb.CreateNotificationRequest
}
func (m *MockNotificationClient) CreateNotification(ctx context.Context, req *notifPb.CreateNotificationRequest) (*notifPb.CreateNotificationResponse, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.CreateCallCount++
m.LastRequest = req
if m.CreateErr != nil {
return nil, m.CreateErr
}
return &notifPb.CreateNotificationResponse{Id: 1}, nil
}
```
为了让 `AssetLikeService` 接受 mock而非具体类型 `*NotificationClient`),需要把 `notificationClient` 字段改为接口类型。改 `asset_like_service.go`
```go
type NotificationClientInterface interface {
CreateNotification(ctx context.Context, req *notifPb.CreateNotificationRequest) (*notifPb.CreateNotificationResponse, error)
}
type AssetLikeService struct {
assetClient *client.AssetClient
socialRepo repository.SocialRepository
notificationClient client.NotificationClientInterface // 改为接口
userClient UserServiceClient
}
```
- [ ] **Step 2: 加测试用例**
```go
func TestLikeAsset_NotificationFailureDoesNotFailLike(t *testing.T) {
// 注入 mock notification client 返回 error
// 期望 LikeAsset 仍返回成功
mockNotif := &client.MockNotificationClient{
CreateErr: fmt.Errorf("notification service down"),
}
svc := NewAssetLikeService(realAssetClient, realSocialRepo, mockNotif)
// 准备 mock assetClient 返回 GetAssetForRPC OK
newCount, err := svc.LikeAsset(ctx, assetID, userID, starID)
assert.NoError(t, err)
assert.Greater(t, newCount, int32(0))
}
func TestLikeAsset_NotificationSuccess(t *testing.T) {
mockNotif := &MockNotificationClient{}
svc := NewAssetLikeService(realAssetClient, realSocialRepo, mockNotif)
newCount, err := svc.LikeAsset(ctx, assetID, userID, starID)
assert.NoError(t, err)
assert.Equal(t, int32(1), mockNotif.CreateCallCount)
}
```
(具体 mock 实现参考 social service 既有的 `user_rpc_client.go``MockUserServiceClient` 模式)
- [ ] **Step 2: 运行测试**
```bash
cd backend
go test ./services/socialService/service/ -v -run TestLikeAsset
```
Expected: PASS。
- [ ] **Step 3: Commit**
```bash
git add backend/services/socialService/service/asset_like_service_test.go
git commit -m "test(socialService): verify notification failure doesn't fail like"
```
---
## 阶段 5Admin 后端集成
### Task 17: 创建 admin 端 Notification ORM 模型
**Files:**
- Create: `TopFans-activity-admin/backend/models/notification.py`
- [ ] **Step 1: 写 model**
```python
from sqlalchemy import Column, BigInteger, String, Boolean, Integer, JSON
from database import Base
class Notification(Base):
__tablename__ = "notifications"
id = Column(BigInteger, primary_key=True, index=True)
user_id = Column(BigInteger, nullable=False, index=True)
star_id = Column(BigInteger, nullable=False)
type = Column(String(20), nullable=False)
title = Column(String(200), nullable=False)
content = Column(String(500))
data = Column(JSON)
is_read = Column(Boolean, default=False, nullable=False)
is_deleted = Column(Boolean, default=False, nullable=False)
created_at = Column(BigInteger, nullable=False)
read_at = Column(BigInteger)
class NotificationStats(Base):
__tablename__ = "notification_stats"
user_id = Column(BigInteger, primary_key=True)
star_id = Column(BigInteger, primary_key=True)
like_unread_count = Column(Integer, default=0, nullable=False)
system_unread_count = Column(Integer, default=0, nullable=False)
activity_unread_count = Column(Integer, default=0, nullable=False)
total_unread_count = Column(Integer, default=0, nullable=False)
updated_at = Column(BigInteger, nullable=False)
```
- [ ] **Step 2: 在 `models/__init__.py` 注册**
`TopFans-activity-admin/backend/models/__init__.py` 追加:
```python
from .notification import Notification, NotificationStats
```
- [ ] **Step 3: Commit**
```bash
cd TopFans-activity-admin
git add backend/models/
git commit -m "feat(admin): add Notification and NotificationStats ORM models"
```
---
### Task 18: 创建 admin 端 notification CRUD
**Files:**
- Create: `TopFans-activity-admin/backend/crud/notification_crud.py`
- [ ] **Step 1: 写 CRUD**
```python
import time
from typing import Optional, List, Literal
from sqlalchemy.orm import Session
from sqlalchemy import text
from models.notification import Notification, NotificationStats
from models.models import User # 假设有 User 模型;如无则用 raw SQL
import logging
logger = logging.getLogger(__name__)
def _now_ms() -> int:
return int(time.time() * 1000)
def _resolve_recipients(
db: Session,
target_type: Literal["user", "star", "all"],
target_value: Optional[int],
) -> List[int]:
"""根据 target_type 解析接收方 user_id 列表"""
if target_type == "user":
return [target_value] if target_value else []
if target_type == "star":
# 查所有 star_id = target_value 的用户的 user_id
rows = db.execute(
text("SELECT user_id FROM user_fan_profiles WHERE star_id = :star_id"),
{"star_id": target_value},
).fetchall()
return [r[0] for r in rows]
if target_type == "all":
rows = db.execute(text("SELECT id FROM users WHERE is_active = TRUE")).fetchall()
return [r[0] for r in rows]
return []
def _upsert_stats(db: Session, user_id: int, star_id: int, ntype: str, delta: int, now: int):
col = {"like": "like_unread_count", "system": "system_unread_count", "activity": "activity_unread_count"}[ntype]
sql = f"""
INSERT INTO notification_stats (user_id, star_id, {col}, total_unread_count, updated_at)
VALUES (:uid, :sid, :delta, :delta, :now)
ON CONFLICT (user_id, star_id) DO UPDATE SET
{col} = notification_stats.{col} + :delta,
total_unread_count = notification_stats.total_unread_count + :delta,
updated_at = :now
"""
db.execute(text(sql), {"uid": user_id, "sid": star_id, "delta": delta, "now": now})
def create_system_notification(
db: Session,
*,
title: str,
content: str,
target_type: Literal["user", "star", "all"],
target_value: Optional[int],
data: Optional[dict] = None,
) -> int:
"""创建系统通知;返回插入条数"""
recipients = _resolve_recipients(db, target_type, target_value)
if not recipients:
logger.warning("no recipients for system notification")
return 0
now = _now_ms()
for uid in recipients:
n = Notification(
user_id=uid, star_id=1, # 系统通知默认 star_id=1如有多 star 需调整
type="system", title=title, content=content, data=data,
created_at=now,
)
db.add(n)
_upsert_stats(db, uid, 1, "system", 1, now)
db.commit()
return len(recipients)
def create_activity_notification(
db: Session,
*,
activity_id: int,
activity_title: str,
activity_cover: str,
recipients: List[int],
data: Optional[dict] = None,
) -> int:
"""创建活动通知;返回插入条数"""
if not recipients:
return 0
now = _now_ms()
payload = {
"activity_id": activity_id,
"activity_title": activity_title,
"activity_cover": activity_cover,
**(data or {}),
}
for uid in recipients:
n = Notification(
user_id=uid, star_id=1,
type="activity",
title=activity_title,
content="新活动上线",
data=payload,
created_at=now,
)
db.add(n)
_upsert_stats(db, uid, 1, "activity", 1, now)
db.commit()
return len(recipients)
```
- [ ] **Step 2: 写测试**
`TopFans-activity-admin/backend/crud/notification_crud_test.py`
```python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from database import Base
import crud.notification_crud as nc
import time
@pytest.fixture
def db():
engine = create_engine("postgresql://postgres:test@localhost:5432/top_fans_test")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
def test_create_system_notification_single_user(db):
count = nc.create_system_notification(
db, title="维护通知", content="今晚 23:00 维护",
target_type="user", target_value=12345,
)
assert count == 1
# 验证 stats
row = db.execute("SELECT system_unread_count FROM notification_stats WHERE user_id=12345").first()
assert row[0] == 1
def test_create_system_notification_all(db):
# 假设测试环境 users 表至少有 3 个 is_active=TRUE 用户
count = nc.create_system_notification(
db, title="全量广播", content="重要通知",
target_type="all", target_value=None,
)
assert count >= 3
```
- [ ] **Step 3: 运行测试**
```bash
cd TopFans-activity-admin/backend
source venv/bin/activate
pytest crud/notification_crud_test.py -v
```
Expected: PASS。
- [ ] **Step 4: Commit**
```bash
cd TopFans-activity-admin
git add backend/crud/notification_crud.py backend/crud/notification_crud_test.py
git commit -m "feat(admin): add notification CRUD with multi-target broadcast"
```
---
### Task 19: 创建 admin 端 notification handler
**Files:**
- Create: `TopFans-activity-admin/backend/handlers/notification.py`
- [ ] **Step 1: 写 handler**
```python
from typing import Optional, Literal
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
from database import get_db
from middleware.auth import require_admin # 复用现有 admin 鉴权
import crud.notification_crud as nc
router = APIRouter(prefix="/api/v1/admin/notifications", tags=["admin-notifications"])
class CreateSystemNotificationReq(BaseModel):
title: str
content: str
target_type: Literal["user", "star", "all"]
target_value: Optional[int] = None
data: Optional[dict] = None
@router.post("")
def create_system_notification(
req: CreateSystemNotificationReq,
db: Session = Depends(get_db),
admin=Depends(require_admin),
):
if not req.title or len(req.title) > 200:
raise HTTPException(400, "title required, <= 200 chars")
if req.content and len(req.content) > 500:
raise HTTPException(400, "content <= 500 chars")
if req.target_type in ("user", "star") and not req.target_value:
raise HTTPException(400, "target_value required for user/star type")
count = nc.create_system_notification(
db, title=req.title, content=req.content,
target_type=req.target_type, target_value=req.target_value,
data=req.data,
)
return {"affected": count}
@router.post("/activities/{activity_id}/broadcast")
def broadcast_activity(
activity_id: int,
db: Session = Depends(get_db),
admin=Depends(require_admin),
):
# 查 activity 信息
from models.models import MintingActivity
activity = db.query(MintingActivity).filter(MintingActivity.id == activity_id).first()
if not activity:
raise HTTPException(404, "activity not found")
# 解析接收方
recipients = nc._resolve_recipients(db, target_type="all", target_value=None)
count = nc.create_activity_notification(
db,
activity_id=activity_id,
activity_title=activity.title,
activity_cover=activity.icon_url or "",
recipients=recipients,
)
return {"affected": count}
```
- [ ] **Step 2: 注册路由**
修改 `TopFans-activity-admin/backend/router/__init__.py`
```python
from handlers import auth, activity, ..., notification
api_router.include_router(notification.router)
```
- [ ] **Step 3: 测试启动**
```bash
cd TopFans-activity-admin/backend
source venv/bin/activate
uvicorn main:app --reload --port 8080
curl -X POST http://localhost:8080/api/v1/admin/notifications \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{"title":"test","content":"hi","target_type":"user","target_value":12345}'
```
Expected: 200 + `{"affected": 1}`
- [ ] **Step 4: Commit**
```bash
cd TopFans-activity-admin
git add backend/handlers/notification.py backend/router/__init__.py
git commit -m "feat(admin): add notification admin HTTP endpoints"
```
---
### Task 20: admin activity handler 自动 broadcast
**Files:**
- Modify: `TopFans-activity-admin/backend/handlers/activity.py`
- [ ] **Step 1: 定位 create activity 函数**
```bash
grep -n "def create_minting_activity\|def.*create.*activity" \
TopFans-activity-admin/backend/handlers/activity.py
```
- [ ] **Step 2: 在创建成功后自动 broadcast**
参考 spec §7.6 写法,在 commit 之前插入 broadcast 调用:
```python
# activity 创建成功后立即广播
if getattr(payload, "auto_notify", True):
from crud import notification_crud
from handlers.notification import _resolve_recipients # 或直接调用 crud 内函数
recipients = notification_crud._resolve_recipients(db, "all", None)
notification_crud.create_activity_notification(
db, activity_id=created.id,
activity_title=created.title,
activity_cover=created.icon_url or "",
recipients=recipients,
)
# 然后 db.commit()
```
- [ ] **Step 3: 测试**
```bash
cd TopFans-activity-admin/backend
source venv/bin/activate
uvicorn main:app --reload --port 8080
# 通过 admin UI 或 curl 创建 activity
# 验证 notification_stats 的 activity_unread_count +N
```
- [ ] **Step 4: 回归检查**
- admin activity 既有测试仍通过
- 创建 activity 接口契约不变auto_notify 字段可选,默认 True
- [ ] **Step 5: Commit**
```bash
cd TopFans-activity-admin
git add backend/handlers/activity.py
git commit -m "feat(admin): auto-broadcast activity notification on creation"
```
---
## 阶段 6Gateway 路由配置
### Task 21: gateway 路由注册 notification 路径
**Files:**
- Modify: `backend/gateway/router/router.go`
- [ ] **Step 1: 定位 social 路由分组**
```bash
grep -n 'v1.Group("/social")\|v1.Group("/assets")' backend/gateway/router/router.go
```
- [ ] **Step 2: 在 social 路由后插入 notifications 路由组**
```go
notifications := v1.Group("/notifications")
{
notifications.GET("", controller.NotificationController.GetNotifications)
notifications.GET("/unread-count", controller.NotificationController.GetUnreadCount)
notifications.POST("/:id/read", controller.NotificationController.MarkAsRead)
notifications.POST("/targets/:target_id/read", controller.NotificationController.MarkAsReadByTarget)
notifications.POST("/read-all", controller.NotificationController.MarkAllAsRead)
notifications.DELETE("/:id", controller.NotificationController.DeleteNotification)
notifications.DELETE("/targets/:target_id", controller.NotificationController.DeleteByTarget)
}
```
- [ ] **Step 3: 实现 `NotificationController`**
`backend/gateway/controller/notification_controller.go`
```go
package controller
import (
"strconv"
"github.com/gin-gonic/gin"
notifPb "github.com/topfans/backend/pkg/proto/notification"
"github.com/topfans/backend/gateway/pkg/response"
"google.golang.org/grpc/metadata"
)
type NotificationController struct {
client notifPb.NotificationService
}
func NewNotificationController(client notifPb.NotificationService) *NotificationController {
return &NotificationController{client: client}
}
// withUserCtx 把 user_id / star_id 注入到 gRPC metadata与 social 端 controller 一致)
func (c *NotificationController) withUserCtx(g *gin.Context) (userID, starID int64) {
v1, _ := g.Get("user_id")
v2, _ := g.Get("star_id")
userID, _ = v1.(int64)
starID, _ = v2.(int64)
return
}
func parseInt(s string, def int) int {
if s == "" {
return def
}
n, err := strconv.Atoi(s)
if err != nil {
return def
}
return n
}
func (c *NotificationController) GetNotifications(g *gin.Context) {
uid, sid := c.withUserCtx(g)
ctx := metadata.AppendToOutgoingContext(g.Request.Context(),
"x-user-id", strconv.FormatInt(uid, 10),
"x-star-id", strconv.FormatInt(sid, 10),
)
resp, err := c.client.GetNotifications(ctx, &notifPb.GetNotificationsRequest{
Type: g.Query("type"),
Tab: g.Query("tab"),
Page: int32(parseInt(g.Query("page"), 1)),
PageSize: int32(parseInt(g.Query("pageSize"), 20)),
})
if err != nil {
response.Error(g, 500, "rpc failed: "+err.Error())
return
}
response.SuccessWithCode(g, int(resp.Base.Code), resp.Base.Message, gin.H{
"items": resp.Items,
"total": resp.Total,
"page": resp.Page,
"page_size": resp.PageSize,
})
}
// GetUnreadCount / MarkAsRead / MarkAsReadByTarget / MarkAllAsRead / DeleteNotification / DeleteByTarget
// 模式与 GetNotifications 相同withUserCtx 注入 metadata + 调用 RPC + response 输出
// 此处省略重复代码(参考 social_controller.go 中的现有方法)
```
- [ ] **Step 4: 编译验证**
```bash
cd backend
go build ./gateway/...
```
Expected: 编译通过。
- [ ] **Step 5: Commit**
```bash
git add backend/gateway/
git commit -m "feat(gateway): register notification HTTP routes"
```
---
### Task 22: gateway config 加 notification backend
**Files:**
- Modify: `backend/gateway/config/config.yaml`
- [ ] **Step 1: 加 notification service URL**
```yaml
services:
asset:
url: tri://localhost:20003
social:
url: tri://localhost:20002
user:
url: tri://localhost:20000
notification:
url: tri://localhost:20010
```
- [ ] **Step 2: 启动 gateway 验证**
```bash
cd backend
./start.sh gateway
curl http://localhost:8080/api/v1/notifications/unread-count -H "Authorization: Bearer $JWT"
```
Expected: 200 + counts 响应前提notification service 也在运行)。
- [ ] **Step 3: 回归检查**
- gateway 启动后其他路由仍正常(如 `/api/v1/social/...`
- `query_graph pattern=callers_of target=NotificationController` 确认新 controller 无循环依赖
- [ ] **Step 4: Commit**
```bash
git add backend/gateway/config/
git commit -m "feat(gateway): add notification service backend config"
```
---
## 阶段 7整体回归与验收
### Task 23: 整体回归检查CLAUDE.md 强制)
- [ ] **Step 1: 全量构建**
```bash
cd backend
go build ./...
cd ../TopFans-activity-admin/backend
source venv/bin/activate
python -c "from main import app; print('admin import OK')"
```
Expected: 全部编译通过。
- [ ] **Step 2: 全量测试**
```bash
cd backend
go test ./services/notificationService/... -v
go test ./services/socialService/... -v
go test ./services/assetService/... -v
```
Expected: 全部 PASS无 FAIL
```bash
cd TopFans-activity-admin/backend
source venv/bin/activate
pytest crud/notification_crud_test.py -v
```
Expected: PASS。
- [ ] **Step 3: graph 回归**
```bash
# 通过 mcp__code-review-graph__query_graph_tool
# pattern=callers_of target=GetAssetForRPC (确认 asset proto 扩展无破坏)
# pattern=callers_of target=LikeAsset (确认 social 改动无破坏)
# pattern=callers_of target=NotificationService (确认无循环依赖)
```
- [ ] **Step 4: 跨服务影响检查**
- social service `LikeAsset` 是否在 notification 失败时**确实不影响主路径**(手动验证 + 自动化测试)
- 修复 A 引入 B 排查:
- A1: asset proto 加字段 → B1: 旧调用方零值兼容(应无影响)
- A2: social service 加 notification 调用 → B2: notification service 不可用时点赞仍成功(已加测试)
- A3: gateway 路由注册 → B3: 既有路由冲突(手工验证)
- [ ] **Step 5: 端到端冒烟**
启动完整服务栈asset / social / user / notification / gateway手工验证
1. 用户 A 点赞用户 B 的藏品 → 用户 B 收到通知
2. 用户 B 查看通知列表 → 看到 1 张聚合卡(如多个人都赞了同一个)
3. 用户 B 点未读数 → 数字减 1
4. 用户 B 标已读MarkAsReadByTarget→ 数字变 0
5. 用户 B 删除DeleteByTarget→ 列表中该 target 消失
- [ ] **Step 6: 验收清单交叉检查**
对照 `docs/superpowers/specs/2026-06-16-notification-system-design.md` §12 验收清单逐项打勾。
---
## 任务依赖图
```
T1 (迁移)
├→ T2 (asset proto 扩展)
│ └→ T3 (asset provider 实现)
├→ T4 (notification proto)
│ └→ T5-T6 (config + model)
│ └→ T7-T8 (repositories)
│ └→ T9 (repo tests)
│ └→ T10 (service)
│ └→ T11 (service tests)
│ └→ T12 (provider)
│ └→ T13 (main)
└→ T14 (social client)
└→ T15 (social 集成)
└→ T16 (social 测试)
└→ T17-T20 (admin)
└→ T21-T22 (gateway)
└→ T23 (整体回归)
```
---
## 关键原则
- **TDD**:写测试 → 跑(红)→ 写实现 → 跑(绿)→ commit
- **DRY**common 错误处理函数 `okBase/errBase/extractUserStarFromContext`
- **YAGNI**:不实现 unique 约束、TTL、空 type=全部的查询
- **频繁 commit**:每个 Task 末尾都有 commit
- **回归检查**:每个修改任务末尾都有 `query_graph` + `go build` 验证