667 lines
26 KiB
Markdown
667 lines
26 KiB
Markdown
# 活动实时推送(留言 + 贡献)WebSocket 实施规格
|
||
|
||
> 本文档**替代** `backend/docs/活动留言板接口文档.md` 中关于 HTTP 接口与轮询的部分,作为留言板与贡献(购买道具)实时推送的**实施规格**。
|
||
>
|
||
> 范围:单一活动页(`pages/support-activity/index.vue`)的留言板(`MessageBoard`)与贡献列表(`ContributionList`)统一走 WebSocket 推送,长轮询仅作断线降级。
|
||
|
||
---
|
||
|
||
## 0. 与原文档的关系
|
||
|
||
| 章节 | 状态 | 原因 |
|
||
|------|------|------|
|
||
| 1. 背景与现状 | 保留并更新 | MessageBoard 仍为纯展示组件;贡献列表已从 mock 切到 `useContributionPolling` |
|
||
| 3. 数据库设计 | **保留并新增 activity_messages 表** | 留言需要持久化;`activity_contributions` 表已存在,无须新建 |
|
||
| 4. Proto 定义 | **保留并精简** | 删除 GetLatestActivityMessages 轮询接口;新增 messages 相关 RPC |
|
||
| 5. HTTP 接口设计 | **改造** | 留言列表 / 发送走 HTTP;增量获取贡献走 WebSocket(HTTP 保留为降级) |
|
||
| 6. 错误码扩展 | 保留 | 复用 |
|
||
| 7. 分层实现 | 改造 | 新增 WebSocket Hub 层 |
|
||
| 8. 前端集成要点 | **重写** | 改用 ActivitySocket + useContributionRealtime + useMessageRealtime |
|
||
| 9. 缓存与性能 | 保留并补充 | 新增 Redis Pub/Sub channel |
|
||
| 10. 测试用例 | 改造 | 增加 WS 相关测试 |
|
||
| 11. 开放问题 | 收口 | 留言敏感词 / WebSocket 推送方案由本文档确定 |
|
||
| 12. 变更影响面 | 保留 | |
|
||
|
||
---
|
||
|
||
## 1. 背景与现状
|
||
|
||
### 1.1 前端组件
|
||
|
||
`MessageBoard.vue` 是**纯展示组件**(presentational component),通过 props.messages 接收留言列表。`ContributionList.vue` 通过 `useContributionPolling.js` 每 1 秒轮询一次 `getActivityContributionsLatestApi`,按 `highest_id` 增量取最新 5 条。
|
||
|
||
两个组件都挂在 `pages/support-activity/index.vue` 同一页面,期望**实时**显示:
|
||
|
||
- 用户 A 发出留言 → 同一页面下其他用户能立刻看到
|
||
- 用户 B 购买道具 → 同一页面下所有用户的 `ContributionList` 立刻新增一条气泡
|
||
|
||
### 1.2 后端现状
|
||
|
||
- `services/activityService` 已有 `PurchaseItem` / `BatchPurchaseItem` 写入 `activity_contributions` 表
|
||
- `activity_contributions` 表已存在(`migrate_create_activity_contributions_table.sql`),但**没有 Pub/Sub 广播**
|
||
- 数据库**没有** `activity_messages` 表(需要新建)
|
||
- Redis 已用于连击计数 `combo:{user_id}:{item_type}`,**未**用于 Pub/Sub
|
||
- `backend/gateway/socket/ai_chat_socket.go` 已有 WebSocket Hub 实现,可参照其模式新增活动 Hub
|
||
|
||
### 1.3 前端 WebSocket 现状
|
||
|
||
- `frontend/utils/socket/SocketManager.js` 基础类
|
||
- `frontend/utils/socket/AiChatSocket.js` AI Chat WS 实现
|
||
- `frontend/utils/socket/GlobalSocketManager.js` 统一管理多服务连接
|
||
- 需要新增 `ActivitySocket.js` 走 `/activity`
|
||
|
||
---
|
||
|
||
## 2. 设计目标
|
||
|
||
1. 留言发送走 HTTP POST(可重试,保留接口文档)
|
||
2. 留言接收 / 贡献接收统一走 WebSocket 推送
|
||
3. 单一连接 + topic 频道(`messages` / `contributions`),不增加额外 TCP 连接
|
||
4. WS 断线时前端自动降级为 `getActivityContributionsLatestApi` 轮询
|
||
5. WS 重连后用 `highest_id` 拉差量,补齐漏推
|
||
6. 不破坏现有 `MessageBoard.vue` props 契约,前端只换数据源
|
||
|
||
---
|
||
|
||
## 3. 数据库设计
|
||
|
||
### 3.1 新增表 `activity_messages`
|
||
|
||
```sql
|
||
-- 2026_06_22_012_activity_messages.sql
|
||
CREATE TABLE IF NOT EXISTS public.activity_messages (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
activity_id BIGINT NOT NULL,
|
||
user_id BIGINT NOT NULL,
|
||
star_id BIGINT NOT NULL,
|
||
content VARCHAR(500) NOT NULL,
|
||
status SMALLINT NOT NULL DEFAULT 0,
|
||
created_at BIGINT NOT NULL,
|
||
updated_at BIGINT NOT NULL,
|
||
deleted_at BIGINT,
|
||
CONSTRAINT fk_messages_activity
|
||
FOREIGN KEY (activity_id) REFERENCES public.activities(id) ON DELETE CASCADE,
|
||
CONSTRAINT fk_messages_user
|
||
FOREIGN KEY (user_id) REFERENCES public.users(id),
|
||
CONSTRAINT fk_messages_star
|
||
FOREIGN KEY (star_id) REFERENCES public.stars(star_id)
|
||
);
|
||
|
||
CREATE SEQUENCE IF NOT EXISTS activity_messages_id_seq START WITH 10000;
|
||
|
||
CREATE INDEX IF NOT EXISTS idx_activity_messages_activity_created
|
||
ON public.activity_messages (activity_id, created_at DESC, id DESC)
|
||
WHERE deleted_at IS NULL;
|
||
|
||
CREATE INDEX IF NOT EXISTS idx_activity_messages_user_created
|
||
ON public.activity_messages (user_id, created_at DESC)
|
||
WHERE deleted_at IS NULL;
|
||
|
||
CREATE INDEX IF NOT EXISTS idx_activity_messages_activity_incr
|
||
ON public.activity_messages (activity_id, id DESC)
|
||
WHERE deleted_at IS NULL;
|
||
|
||
COMMENT ON TABLE public.activity_messages IS '活动留言表';
|
||
COMMENT ON COLUMN public.activity_messages.id IS '主键,自增';
|
||
COMMENT ON COLUMN public.activity_messages.activity_id IS 'FK -> activities.id';
|
||
COMMENT ON COLUMN public.activity_messages.user_id IS '留言用户 ID';
|
||
COMMENT ON COLUMN public.activity_messages.star_id IS '所属明星/星球 ID';
|
||
COMMENT ON COLUMN public.activity_messages.content IS '留言正文,1-500 字';
|
||
COMMENT ON COLUMN public.activity_messages.status IS '0=正常|1=隐藏|2=已删除';
|
||
COMMENT ON COLUMN public.activity_messages.created_at IS '留言时间,毫秒时间戳';
|
||
COMMENT ON COLUMN public.activity_messages.updated_at IS '更新时间,毫秒时间戳';
|
||
COMMENT ON COLUMN public.activity_messages.deleted_at IS '软删除时间';
|
||
```
|
||
|
||
> 序列起始 10000 遵循 `CLAUDE.md` 规范。序列健康检查由 `assets_id_seq` 模式扩展为 `pg_sequences WHERE sequencename = 'activity_messages_id_seq'`。
|
||
|
||
### 3.2 现有 `activity_contributions` 表(沿用)
|
||
|
||
字段已包含 `id, activity_id, user_id, star_id, item_id, item_type, item_name, item_icon, quantity, combo_count, created_at`,详见 `migrate_create_activity_contributions_table.sql`。无结构改动。
|
||
|
||
---
|
||
|
||
## 4. Proto 定义
|
||
|
||
在 `backend/proto/activity.proto` 末尾追加:
|
||
|
||
```protobuf
|
||
// ============== 留言相关 RPC ==============
|
||
|
||
message ActivityMessage {
|
||
int64 id = 1;
|
||
int64 activity_id = 2;
|
||
int64 user_id = 3;
|
||
int64 star_id = 4;
|
||
string nickname = 5;
|
||
string avatar_url = 6;
|
||
string content = 7;
|
||
int64 created_at = 8;
|
||
}
|
||
|
||
message ListActivityMessagesRequest {
|
||
int64 activity_id = 1;
|
||
int32 page = 2; // 默认 1
|
||
int32 page_size = 3; // 默认 20,最大 50
|
||
}
|
||
message ListActivityMessagesResponse {
|
||
topfans.common.BaseResponse base = 1;
|
||
repeated ActivityMessage messages = 2;
|
||
int32 page = 3;
|
||
int32 page_size = 4;
|
||
int32 total = 5;
|
||
}
|
||
|
||
message CreateActivityMessageRequest {
|
||
int64 activity_id = 1;
|
||
int64 user_id = 2; // 从 JWT 解析,gateway 注入
|
||
int64 star_id = 3;
|
||
string content = 4;
|
||
}
|
||
message CreateActivityMessageResponse {
|
||
topfans.common.BaseResponse base = 1;
|
||
ActivityMessage message = 2;
|
||
}
|
||
|
||
service ActivityService {
|
||
// ... 现有 RPC ...
|
||
|
||
// 列出活动留言(首次/下拉加载用)
|
||
rpc ListActivityMessages(ListActivityMessagesRequest) returns (ListActivityMessagesResponse) {
|
||
option (google.api.http) = {
|
||
get: "/api/v1/activities/{activity_id}/messages"
|
||
};
|
||
}
|
||
|
||
// 发送一条留言
|
||
rpc CreateActivityMessage(CreateActivityMessageRequest) returns (CreateActivityMessageResponse) {
|
||
option (google.api.http) = {
|
||
post: "/api/v1/activities/{activity_id}/messages"
|
||
body: "*"
|
||
};
|
||
}
|
||
}
|
||
```
|
||
|
||
> **删除** 原文档中的 `GetLatestActivityMessages` 增量接口,理由:贡献实时走 WS,留言实时也走 WS,HTTP 增量接口不必要。
|
||
|
||
---
|
||
|
||
## 5. HTTP 接口设计
|
||
|
||
### 5.1 通用约定
|
||
|
||
- Base URL:`/api/v1`
|
||
- 鉴权:JWT(`middleware.AuthMiddleware()`)
|
||
- 响应结构:沿用 `pkg/response.Response`(`{code, message, data}`)
|
||
- 错误码:沿用 `pkg/errors` 的 gRPC code 映射
|
||
|
||
### 5.2 接口清单
|
||
|
||
| # | 方法 | 路径 | 说明 | 鉴权 |
|
||
|-----|------|------|------|------|
|
||
| 1 | GET | `/api/v1/activities/{activity_id}/messages` | 列出活动留言(首次/下拉) | 需要 |
|
||
| 2 | POST | `/api/v1/activities/{activity_id}/messages` | 发送一条留言 | 需要 |
|
||
| 3 | GET | `/api/v1/activities/{activity_id}/contributions/latest` | 增量获取最新贡献(**已存在,仅作 WS 断线降级**) | 需要 |
|
||
|
||
> 接口 #3 是已有的 `GetLatestContributions`(proto `activity.proto:296`),**不需新写**,只在前端降级路径调用。
|
||
>
|
||
> 注:原 `ListActivityMessages` / `CreateActivityMessage` 的请求/响应字段、错误码、业务规则沿用 `活动留言板接口文档.md §5.3 ~ §5.4`,不在本文档重复。差异点:
|
||
> - 列表查询去掉"分页 20",首版固定 page=1, page_size=20 即可
|
||
> - 错误码增加 `ErrActivityMessageActivityInactive` 映射(活动状态非 active)
|
||
|
||
---
|
||
|
||
## 6. WebSocket 协议
|
||
|
||
### 6.1 连接
|
||
|
||
- 路径:`ws://{host}/activity?token={JWT}`
|
||
- 鉴权:URL token 参数(参考 `ai_chat_socket.go` 的 `validateToken`)
|
||
- 失败:返回 HTTP 401,body `{"type":"auth_response","success":false,"error":"invalid_token"}`
|
||
- 成功:服务端立即 push 一条 `auth_response`:`{"type":"auth_response","success":true,"user_id":..,"star_id":..}`
|
||
- 心跳:30s 客户端 `ping` / 服务端 `pong`(与 ai-chat 一致)
|
||
- 重连:客户端使用 **1s, 2s, 4s, 8s, 16s, 30s 指数退避**,**前 5 次按退避重试;之后以 30s 固定间隔持续重试**(不停止),确保弱网下也能恢复
|
||
|
||
### 6.2 客户端 → 服务端
|
||
|
||
| action | body | 说明 |
|
||
|--------|------|------|
|
||
| `ping` | `{}` | 心跳 |
|
||
| `subscribe` | `{activity_id, topics:["messages","contributions"]}` | 订阅活动主题 |
|
||
| `unsubscribe` | `{activity_id, topics:[...]}` | 取消订阅 |
|
||
|
||
> subscribe 是**幂等**的:同一 (activity_id, topic) 多次订阅只算一次。unsubscribe 同理。
|
||
|
||
### 6.3 服务端 → 客户端
|
||
|
||
| type | body | 触发时机 |
|
||
|------|------|---------|
|
||
| `auth_response` | `{success, user_id, star_id}` | 连接成功 |
|
||
| `pong` | `{}` | 收到 `ping` |
|
||
| `subscribe_response` | `{activity_id, topics:[...]}` | subscribe 成功 |
|
||
| `unsubscribe_response` | `{activity_id, topics:[...]}` | unsubscribe 成功 |
|
||
| `messages_response` | `{activity_id, message:{ActivityMessage}}` | activity_messages 表新写入 |
|
||
| `contributions_response` | `{activity_id, record:{ContributionRecord}}` | activity_contributions 表新写入 |
|
||
| `error` | `{code, message}` | 业务错误(如订阅非法 topic) |
|
||
|
||
### 6.4 Pub/Sub 频道定义
|
||
|
||
| Channel | Publish 端 | Subscribe 端 |
|
||
|---------|-----------|--------------|
|
||
| `act:{activity_id}:messages` | `activityService.CreateActivityMessage` | `gateway/socket/activity_socket.go` |
|
||
| `act:{activity_id}:contributions` | `activityService.PurchaseItem` / `BatchPurchaseItem` | `gateway/socket/activity_socket.go` |
|
||
|
||
发布 payload 格式(与 push 协议一致):
|
||
```json
|
||
{"activity_id": 42, "type": "messages_response", "message": {...}}
|
||
{"activity_id": 42, "type": "contributions_response", "record": {...}}
|
||
```
|
||
|
||
---
|
||
|
||
## 7. 后端实现
|
||
|
||
### 7.1 分层结构
|
||
|
||
```
|
||
HTTP POST /messages
|
||
│
|
||
▼
|
||
gateway/controller/activity_controller.go ← ListActivityMessages / CreateActivityMessage
|
||
│
|
||
│ Dubbo / gRPC (Triple)
|
||
▼
|
||
services/activityService/provider/activity_provider.go ← RPC 入口
|
||
│
|
||
▼
|
||
services/activityService/service/activity_service.go ← 业务编排
|
||
│ 1. 写 PG
|
||
│ 2. Redis Publish "act:{id}:messages"
|
||
▼
|
||
services/activityService/repository/ ← SQL 封装
|
||
|
||
WebSocket /activity
|
||
│
|
||
▼
|
||
gateway/socket/activity_socket.go ← ActivityHub
|
||
│ 1. 启动时 psubscribe "act:*:messages" / "act:*:contributions"
|
||
│ 2. 收到 Publish → 按 (activity_id, topic) 路由到本地连接
|
||
│ 3. writeJSON push 给客户端
|
||
▼
|
||
前端
|
||
```
|
||
|
||
### 7.2 待新增 / 修改文件清单
|
||
|
||
| 类型 | 文件 | 说明 |
|
||
|------|------|------|
|
||
| 新增 | `backend/migrations/2026_06_22_012_activity_messages.sql` | 建表 + 索引 + 注释 |
|
||
| 修改 | `backend/proto/activity.proto` | 追加 3 个 message + 2 个 RPC |
|
||
| 新增 | `backend/services/activityService/repository/activity_messages_repository.go` | 仓储层 |
|
||
| 修改 | `backend/services/activityService/service/activity_service.go` | 追加 2 个业务方法 + 在 `PurchaseItem` / `BatchPurchaseItem` / `CreateActivityMessage` 末尾 Redis Publish |
|
||
| 修改 | `backend/services/activityService/provider/activity_provider.go` | 追加 2 个 RPC 入口(仅做参数透传,业务在 service 层) |
|
||
| 修改 | `backend/services/activityService/repository/activity_repository.go` | 不动;新建 `activity_messages_repository.go` 与之并列 |
|
||
| 新增 | `backend/gateway/socket/activity_socket.go` | ActivityHub(仿 `ai_chat_socket.go` 模式,但**不修改** ai-chat 现有代码) |
|
||
| 修改 | `backend/gateway/router/router.go` | 注册 `r.GET("/activity", ...)` |
|
||
| 修改 | `backend/gateway/main.go` | 创建 ActivityHub 并注入 |
|
||
| 修改 | `backend/gateway/config/config.go` | 加 `WebSocket.ActivityPath` 字段 |
|
||
| 修改 | `backend/services/activityService/configs/config.yaml` | 加 `message_rate_limit_per_min: 5` / `message_limit_per_activity: 100` |
|
||
| 修改 | `backend/pkg/errors/errors.go` | 加 7 个留言错误变量 + ToGRPCCode 映射 |
|
||
| 修改 | `backend/gateway/pkg/response/response.go` | 加 6 条中文错误映射 |
|
||
| 新增 | `frontend/utils/socket/ActivitySocket.js` | 继承 SocketManager |
|
||
| 修改 | `frontend/utils/socket/GlobalSocketManager.js` | 加 `_initActivity()` |
|
||
| 新增 | `frontend/utils/socket/activityHandlers.js` | 注册 messages / contributions 处理器 |
|
||
| 新增 | `frontend/pages/support-activity/composables/useContributionRealtime.js` | 替代 useContributionPolling |
|
||
| 新增 | `frontend/pages/support-activity/composables/useMessageRealtime.js` | 替代 mock |
|
||
| 修改 | `frontend/utils/api.js` | 加 `listActivityMessagesApi` / `createActivityMessageApi` |
|
||
| 修改 | `frontend/pages/support-activity/index.vue` | 接入 useMessageRealtime / useContributionRealtime |
|
||
| 修改 | `frontend/pages/support-activity/components/MessageBoard.vue` | props 不变(仅消费端) |
|
||
|
||
### 7.3 service 层要点
|
||
|
||
`CreateActivityMessage` 业务逻辑:
|
||
|
||
```go
|
||
func (s *activityService) CreateActivityMessage(ctx context.Context, req *pb.CreateActivityMessageRequest) (*pb.CreateActivityMessageResponse, error) {
|
||
// 1. 入参校验
|
||
if strings.TrimSpace(req.Content) == "" { return nil, ErrActivityMessageContentEmpty }
|
||
if utf8.RuneCountInString(req.Content) > 500 { return nil, ErrActivityMessageContentTooLong }
|
||
|
||
// 2. 活动状态
|
||
activity, _ := s.activityRepo.GetActivityByID(req.ActivityId)
|
||
if activity == nil { return nil, ErrActivityNotFound }
|
||
if activity.Status != "active" { return nil, ErrActivityMessageActivityInactive }
|
||
|
||
// 3. 频控(Redis INCR + EXPIRE)
|
||
rateKey := fmt.Sprintf("msg:rate:%d:%d", req.ActivityId, req.UserId)
|
||
count, _ := s.redisClient.Incr(ctx, rateKey).Result()
|
||
if count == 1 { s.redisClient.Expire(ctx, rateKey, 60*time.Second) }
|
||
if count > cfg.MessageRateLimitPerMin { return nil, ErrActivityMessageTooFrequent }
|
||
|
||
// 4. 累计上限
|
||
total, _ := s.messagesRepo.CountByUserActivity(ctx, req.ActivityId, req.UserId)
|
||
if total >= cfg.MessageLimitPerActivity { return nil, ErrActivityMessageLimitReached }
|
||
|
||
// 5. 敏感词(首版本地词表,后续接 dify)
|
||
if containsBannedWord(req.Content) { return nil, ErrActivityMessageContentInvalid }
|
||
|
||
// 6. 写入 + 回查昵称头像
|
||
now := time.Now().UnixMilli()
|
||
msgID, _ := s.messagesRepo.Insert(ctx, repository.ActivityMessage{...})
|
||
profile, _ := s.userRPCClient.GetFanProfile(ctx, req.UserId, req.StarId)
|
||
|
||
msg := buildActivityMessage(msgID, req, profile, now)
|
||
|
||
// 7. Redis Publish(不论客户端是否订阅都要发,方便后续接入审计/分析)
|
||
payload, _ := json.Marshal(map[string]interface{}{
|
||
"activity_id": req.ActivityId,
|
||
"type": "messages_response",
|
||
"message": msg,
|
||
})
|
||
s.redisClient.Publish(ctx, fmt.Sprintf("act:%d:messages", req.ActivityId), payload)
|
||
|
||
return &pb.CreateActivityMessageResponse{
|
||
Base: &pbCommon.BaseResponse{Code: uint32(codes.OK), Message: "ok"},
|
||
Message: msg,
|
||
}, nil
|
||
}
|
||
```
|
||
|
||
`PurchaseItem` 末尾追加:
|
||
|
||
```go
|
||
// 在 PurchaseItem / BatchPurchaseItem 写库成功后
|
||
payload, _ := json.Marshal(map[string]interface{}{
|
||
"activity_id": req.ActivityId,
|
||
"type": "contributions_response",
|
||
"record": buildContributionRecord(inserted),
|
||
})
|
||
s.redisClient.Publish(ctx, fmt.Sprintf("act:%d:contributions", req.ActivityId), payload)
|
||
```
|
||
|
||
### 7.4 ActivityHub 关键代码骨架
|
||
|
||
```go
|
||
// gateway/socket/activity_socket.go
|
||
type ActivityHub struct {
|
||
clients map[int64]*ActivityConn // userId -> conn
|
||
subscriptions map[string]map[*ActivityConn]struct{} // "act:42:messages" -> conns
|
||
redisClient *redis.Client
|
||
activityPath string
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
type ActivityConn struct {
|
||
UserID int64
|
||
StarID int64
|
||
Conn *websocket.Conn
|
||
Send chan []byte
|
||
Hub *ActivityHub
|
||
}
|
||
|
||
func (h *ActivityHub) Run(ctx context.Context) {
|
||
// 启动时 psubscribe 全部 act:*:messages / act:*:contributions
|
||
sub := h.redisClient.PSubscribe(ctx, "act:*:messages", "act:*:contributions")
|
||
ch := sub.Channel()
|
||
for msg := range ch {
|
||
var payload map[string]interface{}
|
||
json.Unmarshal([]byte(msg.Payload), &payload)
|
||
h.fanout(msg.Channel, payload)
|
||
}
|
||
}
|
||
|
||
func (h *ActivityHub) fanout(channel string, payload map[string]interface{}) {
|
||
h.mu.RLock()
|
||
conns := h.subscriptions[channel]
|
||
h.mu.RUnlock()
|
||
for c := range conns {
|
||
c.writeJSON(payload)
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 8. 前端实现
|
||
|
||
### 8.1 字段映射(MessageBoard)
|
||
|
||
| 后端字段 | 前端 props | 转换 |
|
||
|---------|-----------|------|
|
||
| `id` | `id` | 直接 |
|
||
| `nickname` | `user` | 直接 |
|
||
| `avatar_url` | `avatar` | 直接 |
|
||
| `content` | `content` | 直接 |
|
||
| `created_at`(ms) | `time` | `formatRelativeTime(ms)` |
|
||
| — | `isSelf` | `user_id === currentUserId` |
|
||
|
||
### 8.2 ActivitySocket API
|
||
|
||
```js
|
||
// frontend/utils/socket/ActivitySocket.js
|
||
class ActivitySocket extends SocketManager {
|
||
connect(token) { /* 同 AiChat */ }
|
||
subscribe(activityId, topics)
|
||
unsubscribe(activityId, topics)
|
||
// 注册回调
|
||
onMessagesResponse(cb) // cb({activity_id, message})
|
||
onContributionsResponse(cb) // cb({activity_id, record})
|
||
}
|
||
```
|
||
|
||
### 8.3 useContributionRealtime 钩子
|
||
|
||
> 设计要点:**WS 优先**,轮询仅在 WS 断线时启动;轮询与 WS **互斥**,不会同时跑(避免重复记录)。
|
||
|
||
```js
|
||
// composables/useContributionRealtime.js
|
||
export function useContributionRealtime(activityId, isPageActive) {
|
||
// 复用 useContributionPolling 的 records / highestId / 增量合并逻辑
|
||
// 但不直接调 start(),由本钩子按 WS 状态决定何时调 start/stop
|
||
const { records, start, stop, reset, highestIdRef } = useContributionPolling(
|
||
activityId, isPageActive
|
||
)
|
||
const socket = getActivitySocket()
|
||
let usingWS = false // 默认 false,等 WS onConnect 后才置 true
|
||
|
||
function onWsMessage(payload) {
|
||
if (payload.activity_id !== activityId.value) return
|
||
// 与轮询同款 highest_id 增量逻辑
|
||
if (payload.record.id > highestIdRef.value) {
|
||
records.value = [payload.record, ...records.value].slice(0, MAX_RECORDS)
|
||
highestIdRef.value = payload.record.id
|
||
}
|
||
}
|
||
|
||
function onWsConnect() {
|
||
if (usingWS) return
|
||
usingWS = true
|
||
stop() // 停掉可能的轮询
|
||
socket.subscribe(activityId.value, ['contributions'])
|
||
}
|
||
|
||
function onWsDisconnect() {
|
||
if (!usingWS) return
|
||
usingWS = false
|
||
// 降级:开始轮询(highestId 已保留,避免重拉)
|
||
start()
|
||
}
|
||
|
||
socket.onContributionsResponse(onWsMessage)
|
||
socket.on('connect', onWsConnect)
|
||
socket.on('disconnect', onWsDisconnect)
|
||
|
||
// 组件挂载时若已连上立即订阅
|
||
onMounted(() => {
|
||
if (socket.isConnected) onWsConnect()
|
||
})
|
||
|
||
// 卸载清理
|
||
onUnmounted(() => {
|
||
if (usingWS) socket.unsubscribe(activityId.value, ['contributions'])
|
||
socket.off('connect', onWsConnect)
|
||
socket.off('disconnect', onWsDisconnect)
|
||
socket.offContributionsResponse(onWsMessage)
|
||
stop()
|
||
})
|
||
|
||
return { records }
|
||
}
|
||
```
|
||
|
||
> **注意**:`useContributionPolling` 当前不暴露 `highestIdRef`,需要小幅重构让它返回 `{records, start, stop, reset, highestIdRef, latestTimestampRef}`,或在本钩子内自己维护 `highestIdRef`。实施时二选一。
|
||
|
||
### 8.4 useMessageRealtime 钩子
|
||
|
||
```js
|
||
// composables/useMessageRealtime.js
|
||
export function useMessageRealtime(activityId) {
|
||
const messages = ref([])
|
||
const currentUserId = computed(() => store.state.user?.userInfo?.uid)
|
||
const socket = getActivitySocket()
|
||
|
||
async function loadHistory() {
|
||
const res = await listActivityMessagesApi(activityId.value)
|
||
if (res.code === 0) {
|
||
messages.value = res.data.messages.map(toComponentShape)
|
||
}
|
||
}
|
||
|
||
function onMessage(payload) {
|
||
if (payload.activity_id !== activityId.value) return
|
||
messages.value.push(toComponentShape(payload.message))
|
||
if (messages.value.length > 50) messages.value.shift() // 上限保护
|
||
}
|
||
|
||
async function sendMessage(content) {
|
||
const res = await createActivityMessageApi(activityId.value, content)
|
||
if (res.code === 0) {
|
||
// 成功时不本地 push,等 WS 推回来(避免重复)
|
||
// 若 WS 断线:服务端不会推送,由前端 fallback 把 res.data.message 插入本地
|
||
if (!socket.isConnected) {
|
||
messages.value.push(toComponentShape(res.data.message))
|
||
if (messages.value.length > 50) messages.value.shift()
|
||
}
|
||
} else {
|
||
uni.showToast({ title: res.message || '留言失败', icon: 'none' })
|
||
}
|
||
}
|
||
|
||
onMounted(() => {
|
||
loadHistory()
|
||
socket.subscribe(activityId.value, ['messages'])
|
||
socket.onMessagesResponse(onMessage)
|
||
})
|
||
onUnmounted(() => {
|
||
socket.unsubscribe(activityId.value, ['messages'])
|
||
})
|
||
|
||
return { messages, sendMessage }
|
||
}
|
||
```
|
||
|
||
### 8.5 index.vue 集成
|
||
|
||
```vue
|
||
<script setup>
|
||
const { messages: messageList, sendMessage } = useMessageRealtime(activityId)
|
||
const { records } = useContributionRealtime(activityId, isPageActive)
|
||
|
||
function handleSendMessage(text) {
|
||
sendMessage(text)
|
||
}
|
||
</script>
|
||
|
||
<template>
|
||
<MessageBoard :messages="messageList" />
|
||
<ContributionList :records="records" />
|
||
</template>
|
||
```
|
||
|
||
---
|
||
|
||
## 9. 缓存与 Pub/Sub
|
||
|
||
- **频控**:`msg:rate:{activity_id}:{user_id}` INCR + EXPIRE 60s;上限由 `config.yaml` 的 `message_rate_limit_per_min` 控制(默认 5)
|
||
- **累计上限**:`message_limit_per_activity`(默认 100),超过时拒绝发送
|
||
- **连击**:`combo:{user_id}:{item_type}` 沿用现有
|
||
- **Pub/Sub channel**:`act:{activity_id}:messages` / `act:{activity_id}:contributions`
|
||
- **不缓存历史**:实时数据不适合缓存;留言 / 贡献查询走 DB
|
||
|
||
---
|
||
|
||
## 10. 错误码扩展
|
||
|
||
在 `backend/pkg/errors/errors.go` 追加:
|
||
|
||
```go
|
||
var (
|
||
ErrActivityMessageNotFound = errors.New("活动留言不存在")
|
||
ErrActivityMessageTooFrequent = errors.New("留言太频繁,请稍后再试")
|
||
ErrActivityMessageLimitReached = errors.New("当前活动留言已达上限")
|
||
ErrActivityMessageContentEmpty = errors.New("留言内容不能为空")
|
||
ErrActivityMessageContentTooLong = errors.New("留言内容过长,最多500字")
|
||
ErrActivityMessageContentInvalid = errors.New("留言内容包含不当内容")
|
||
ErrActivityMessageActivityInactive = errors.New("活动不在进行中")
|
||
)
|
||
```
|
||
|
||
ToGRPCCode 映射 + 中文 errorMap 与 `活动留言板接口文档.md §6` 一致。
|
||
|
||
---
|
||
|
||
## 11. 测试用例
|
||
|
||
### 11.1 service 层
|
||
|
||
- [ ] CreateActivityMessage:合法 content → 写库 + Publish `act:{id}:messages`
|
||
- [ ] 空 content → ErrActivityMessageContentEmpty
|
||
- [ ] 超 500 字 → ErrActivityMessageContentTooLong
|
||
- [ ] 活动 status=pending → ErrActivityMessageActivityInactive
|
||
- [ ] 1 分钟内 > 5 条 → ErrActivityMessageTooFrequent
|
||
- [ ] 累计 > 100 条 → ErrActivityMessageLimitReached
|
||
- [ ] PurchaseItem:写库后 Publish `act:{id}:contributions`
|
||
- [ ] BatchPurchaseItem:每个 item 写库后 Publish `act:{id}:contributions`(或批量发布一次,节省 Redis 流量;首版每条一次便于前端叠加动画)
|
||
|
||
### 11.2 gateway ActivityHub
|
||
|
||
- [ ] 启动时 psubscribe 模式订阅 `act:*:messages` `act:*:contributions`
|
||
- [ ] 收到 `act:42:messages` 频道 → 只 fanout 到订阅了 (42, messages) 的连接
|
||
- [ ] 收到 `act:42:contributions` 频道 → 不影响 messages 订阅者
|
||
- [ ] 断开连接 → 清理 subscriptions 与 clients
|
||
- [ ] 鉴权失败 → 401
|
||
|
||
### 11.3 前端
|
||
|
||
- [ ] useContributionRealtime:WS connected 时不轮询
|
||
- [ ] WS disconnect → 切到轮询
|
||
- [ ] WS reconnect + subscribe → 切回 WS
|
||
- [ ] useMessageRealtime:loadHistory 失败时降级为空列表
|
||
- [ ] sendMessage 失败 → toast,列表不变化
|
||
|
||
---
|
||
|
||
## 12. 变更影响面(按 CLAUDE.md 自审)
|
||
|
||
- [x] `activities` 表结构不变
|
||
- [x] `activity_contributions` 表结构不变(仅新增 Publish 副作用)
|
||
- [x] `activityService` 现有 RPC 签名不变(仅 PurchaseItem 末尾增加 Publish)
|
||
- [x] `pkg/errors` 追加 7 个错误变量,不影响 ToGRPCCode 现有行为
|
||
- [x] `frontend/pages/support-activity/components/MessageBoard.vue` props 不变
|
||
- [x] `activity_messages_id_seq` 起始 10000,符合 CLAUDE.md 测试数据预留规范
|
||
- [x] WebSocket 路径 `/activity` 与 AI Chat `/ai-chat` 独立,无端口/路径冲突
|
||
- [x] 多 gateway 实例:所有实例均订阅同一组 Pub/Sub,每实例只 fanout 本地连接(Redis 已有的共享通道语义)
|
||
|
||
---
|
||
|
||
## 13. 后续迭代(已收口)
|
||
|
||
| 主题 | 决策 |
|
||
|------|------|
|
||
| 留言敏感词 | 首版本地词表,错误码 `ErrActivityMessageContentInvalid` 预留,后续接 dify |
|
||
| 留言置顶 | 不在本期范围 |
|
||
| 留言删除 | 不在本期范围;软删除字段已就位 |
|
||
| 跨活动聚合 | 不在本期范围;HTTP GET 路径 `/api/v1/me/activity-messages` 留作下期 |
|
||
| 中心页订阅 | 本期只在 `index.vue` 接入;`center.vue` 列表卡片暂不实时刷新 |
|