diff --git a/.gitignore b/.gitignore index f211e46..352c9d7 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ backend/services/taskService/taskService backend/services/userService/userService backend/services/aiChatService/aiChatService backend/services/statisticService/statisticService +backend/services/notificationService/notificationService backend/gateway/gateway backend/gateway/aiChatService bin/ diff --git a/backend/.env.example b/backend/.env.example index 2e579c4..496b951 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -89,3 +89,11 @@ DIFY_API_KEY= LASER_COMPOSITOR_URL=http://127.0.0.1:7002 # laser-compositor 监听端口(与 URL 端口一致) COMPOSITOR_PORT=7002 + +# ==================== Mobile Push (uniPush) ==================== +# 通知服务使用:通知创建后通过 uniCloud sendMessage 云函数触发手机通知栏 +# 以下变量由 notificationService 进程读取(见 deploy/envs/notification.env) +# 注意:生产 URL 走环境变量注入,不要提交到代码仓库 +PUSH_ENABLED=true +PUSH_URL= +PUSH_TIMEOUT_MS=4000 diff --git a/backend/deploy/envs/notification.env b/backend/deploy/envs/notification.env new file mode 100644 index 0000000..d91bd53 --- /dev/null +++ b/backend/deploy/envs/notification.env @@ -0,0 +1,25 @@ +# ==================== Notification Service 私有配置 ==================== +# 多机部署时将此文件放到 notification 服务器的 /etc/topfans/notification.env + +# 服务端口 +PORT=20010 + +# ==================== 数据库配置 ==================== +DB_HOST=localhost +DB_PORT=15432 +DB_USER=postgres +DB_PASSWORD=123456 +DB_NAME=top-fans + +# ==================== 手机推送 (uniPush) ==================== +# 是否开启手机通知栏推送;关闭时走 NoopPusher(不发起 HTTP,业务不受影响) +# 取值: true / false;默认 true +PUSH_ENABLED=true + +# uniCloud sendMessage 云函数 URL(REQUIRED when PUSH_ENABLED=true) +# - 留空 + PUSH_ENABLED=true:启动会 warn 一次,降级为 NoopPusher +# - 部署时按环境注入,不要把生产 URL 提交到代码仓库 +PUSH_URL=https://env-00jy6bcqqwy6.dev-hz.cloudbasefunction.cn/sendMessage + +# HTTP 调用超时(毫秒);推荐 3~5s,推送必须异步,超时太短会丢推送 +PUSH_TIMEOUT_MS=4000 diff --git a/backend/gateway/controller/notification_controller.go b/backend/gateway/controller/notification_controller.go index b999fe1..e4cefae 100644 --- a/backend/gateway/controller/notification_controller.go +++ b/backend/gateway/controller/notification_controller.go @@ -32,6 +32,126 @@ func NewNotificationController(dubboClient *client.Client) (*NotificationControl }, nil } +// ========== 设备注册(推送 cid 上报)========== +// App 端启动时调用,把 uni.getPushClientId() 拿到的 cid 上报后端; +// 后续 CreateNotification 触发推送时,后端按 user_id 查这里写入的 cid 列表。 +// +// 接口约定: +// - POST /api/v1/notifications/devices body: { cid, platform, appVersion, deviceModel } +// - POST /api/v1/notifications/devices/unregister body: { cid } // cid 为空 = 注销当前用户全部 +// +// 鉴权:依赖路由组 AuthMiddleware,从 JWT 提取 user_id 后写入 metadata。 +// device_model 在 iOS 上是 sysinfo.model,Android 上是 sysinfo.model。 + +// registerDeviceRequest HTTP DTO。 +type registerDeviceRequest struct { + CID string `json:"cid" binding:"required,min=1,max=128"` + Platform string `json:"platform" binding:"omitempty,oneof=ios android harmony"` + AppVersion string `json:"app_version" binding:"omitempty,max=32"` + DeviceModel string `json:"device_model" binding:"omitempty,max=64"` +} + +// unregisterDeviceRequest HTTP DTO。 +type unregisterDeviceRequest struct { + CID string `json:"cid" binding:"omitempty,max=128"` +} + +// RegisterDevice 注册/更新当前用户的推送设备。 +// @Summary 注册推送设备 +// @Description 将 uni.getPushClientId() 拿到的 cid 上报给后端;同 cid 重复注册为更新。 +// @Tags notifications +// @Accept json +// @Produce json +// @Security BearerAuth +// @Param body body registerDeviceRequest true "设备信息" +// @Success 200 {object} response.Response +// @Router /api/v1/notifications/devices [post] +func (ctrl *NotificationController) RegisterDevice(g *gin.Context) { + userID, _ := g.Get("user_id") + starID, _ := g.Get("star_id") + + var req registerDeviceRequest + if err := g.ShouldBindJSON(&req); err != nil { + response.Error(g, http.StatusBadRequest, "参数错误: "+err.Error()) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ctx = context.WithValue(ctx, constant.AttachmentKey, map[string]interface{}{ + "user_id": strconv.FormatInt(userID.(int64), 10), + "star_id": strconv.FormatInt(starID.(int64), 10), + }) + + resp, err := ctrl.notifService.RegisterDevice(ctx, &pbNotif.RegisterDeviceRequest{ + Cid: req.CID, + Platform: req.Platform, + AppVersion: req.AppVersion, + DeviceModel: req.DeviceModel, + }) + if err != nil { + logger.Logger.Error("RegisterDevice RPC failed", + zap.Int64("user_id", userID.(int64)), + zap.Error(err)) + response.Error(g, http.StatusInternalServerError, "服务调用失败") + return + } + if resp.Base.Code != uint32(codes.OK) { + response.ErrorWithCode(g, int(resp.Base.Code), resp.Base.Message) + return + } + + response.Success(g, gin.H{ + "id": resp.Id, + "cid": req.CID, + }) +} + +// UnregisterDevice 注销当前用户指定 cid 的推送;cid 为空时注销所有设备。 +// @Summary 注销推送设备 +// @Description 注销推送 cid;cid 为空 = 注销当前用户全部设备(用于主动登出)。 +// @Tags notifications +// @Accept json +// @Produce json +// @Security BearerAuth +// @Param body body unregisterDeviceRequest true "注销请求" +// @Success 200 {object} response.Response +// @Router /api/v1/notifications/devices/unregister [post] +func (ctrl *NotificationController) UnregisterDevice(g *gin.Context) { + userID, _ := g.Get("user_id") + starID, _ := g.Get("star_id") + + var req unregisterDeviceRequest + // 允许 body 为空,所以不用 ShouldBindJSON 强制要求;读不到也不报错。 + _ = g.ShouldBindJSON(&req) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ctx = context.WithValue(ctx, constant.AttachmentKey, map[string]interface{}{ + "user_id": strconv.FormatInt(userID.(int64), 10), + "star_id": strconv.FormatInt(starID.(int64), 10), + }) + + resp, err := ctrl.notifService.UnregisterDevice(ctx, &pbNotif.UnregisterDeviceRequest{ + Cid: req.CID, + }) + if err != nil { + logger.Logger.Error("UnregisterDevice RPC failed", + zap.Int64("user_id", userID.(int64)), + zap.Error(err)) + response.Error(g, http.StatusInternalServerError, "服务调用失败") + return + } + if resp.Base.Code != uint32(codes.OK) { + response.ErrorWithCode(g, int(resp.Base.Code), resp.Base.Message) + return + } + + response.Success(g, gin.H{"affected": resp.Affected}) +} + // parseInt 解析 query string 为 int, 失败或空返回默认值 diff --git a/backend/gateway/router/router.go b/backend/gateway/router/router.go index e5b49d3..67c66fb 100644 --- a/backend/gateway/router/router.go +++ b/backend/gateway/router/router.go @@ -248,6 +248,11 @@ func SetupRouter(userClient *client.Client, socialClient *client.Client, assetCl notifications.POST("/read-all", notificationCtrl.MarkAllAsRead) // 全部已读 notifications.DELETE("/:id", notificationCtrl.DeleteNotification) // 删除单条 notifications.DELETE("/targets/:target_id", notificationCtrl.DeleteByTarget) // 按 target 删除 + + // 推送设备注册:App 启动后上报 cid,后端写入 user_devices。 + // 注销接口放在静态子路径(/devices/unregister),避免与 :id 动态路由冲突。 + notifications.POST("/devices", notificationCtrl.RegisterDevice) // 注册/更新 cid + notifications.POST("/devices/unregister", notificationCtrl.UnregisterDevice) // 注销 cid(空 cid = 全部) } // 人像抠图(镭射卡 thinking 阶段;密钥仅服务端) diff --git a/backend/pkg/proto/notification/notification.pb.go b/backend/pkg/proto/notification/notification.pb.go index 219924c..f6f4dd5 100644 --- a/backend/pkg/proto/notification/notification.pb.go +++ b/backend/pkg/proto/notification/notification.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v7.34.0 +// protoc v4.25.1 // source: notification.proto package notification @@ -1141,6 +1141,225 @@ func (x *DeleteByTargetResponse) GetAffected() int32 { return 0 } +// RegisterDeviceRequest 注册/更新推送设备。 +// 行为:按 cid 主键 upsert。同 cid 已存在时更新归属(user/platform/version);不存在则插入。 +type RegisterDeviceRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Cid string `protobuf:"bytes,1,opt,name=cid,proto3" json:"cid,omitempty"` + Platform string `protobuf:"bytes,2,opt,name=platform,proto3" json:"platform,omitempty"` // ios / android / harmony + AppVersion string `protobuf:"bytes,3,opt,name=app_version,json=appVersion,proto3" json:"app_version,omitempty"` + DeviceModel string `protobuf:"bytes,4,opt,name=device_model,json=deviceModel,proto3" json:"device_model,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RegisterDeviceRequest) Reset() { + *x = RegisterDeviceRequest{} + mi := &file_notification_proto_msgTypes[19] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RegisterDeviceRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterDeviceRequest) ProtoMessage() {} + +func (x *RegisterDeviceRequest) ProtoReflect() protoreflect.Message { + mi := &file_notification_proto_msgTypes[19] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterDeviceRequest.ProtoReflect.Descriptor instead. +func (*RegisterDeviceRequest) Descriptor() ([]byte, []int) { + return file_notification_proto_rawDescGZIP(), []int{19} +} + +func (x *RegisterDeviceRequest) GetCid() string { + if x != nil { + return x.Cid + } + return "" +} + +func (x *RegisterDeviceRequest) GetPlatform() string { + if x != nil { + return x.Platform + } + return "" +} + +func (x *RegisterDeviceRequest) GetAppVersion() string { + if x != nil { + return x.AppVersion + } + return "" +} + +func (x *RegisterDeviceRequest) GetDeviceModel() string { + if x != nil { + return x.DeviceModel + } + return "" +} + +type RegisterDeviceResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Base *common.BaseResponse `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + Id int64 `protobuf:"varint,2,opt,name=id,proto3" json:"id,omitempty"` // user_devices.id(便于客户端排错/对账) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RegisterDeviceResponse) Reset() { + *x = RegisterDeviceResponse{} + mi := &file_notification_proto_msgTypes[20] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RegisterDeviceResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterDeviceResponse) ProtoMessage() {} + +func (x *RegisterDeviceResponse) ProtoReflect() protoreflect.Message { + mi := &file_notification_proto_msgTypes[20] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterDeviceResponse.ProtoReflect.Descriptor instead. +func (*RegisterDeviceResponse) Descriptor() ([]byte, []int) { + return file_notification_proto_rawDescGZIP(), []int{20} +} + +func (x *RegisterDeviceResponse) GetBase() *common.BaseResponse { + if x != nil { + return x.Base + } + return nil +} + +func (x *RegisterDeviceResponse) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +// UnregisterDeviceRequest 注销指定 cid 的推送。cid 为空时注销当前用户的所有设备。 +type UnregisterDeviceRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Cid string `protobuf:"bytes,1,opt,name=cid,proto3" json:"cid,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UnregisterDeviceRequest) Reset() { + *x = UnregisterDeviceRequest{} + mi := &file_notification_proto_msgTypes[21] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UnregisterDeviceRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UnregisterDeviceRequest) ProtoMessage() {} + +func (x *UnregisterDeviceRequest) ProtoReflect() protoreflect.Message { + mi := &file_notification_proto_msgTypes[21] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UnregisterDeviceRequest.ProtoReflect.Descriptor instead. +func (*UnregisterDeviceRequest) Descriptor() ([]byte, []int) { + return file_notification_proto_rawDescGZIP(), []int{21} +} + +func (x *UnregisterDeviceRequest) GetCid() string { + if x != nil { + return x.Cid + } + return "" +} + +type UnregisterDeviceResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Base *common.BaseResponse `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + Affected int32 `protobuf:"varint,2,opt,name=affected,proto3" json:"affected,omitempty"` // 受影响的行数 + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *UnregisterDeviceResponse) Reset() { + *x = UnregisterDeviceResponse{} + mi := &file_notification_proto_msgTypes[22] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *UnregisterDeviceResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UnregisterDeviceResponse) ProtoMessage() {} + +func (x *UnregisterDeviceResponse) ProtoReflect() protoreflect.Message { + mi := &file_notification_proto_msgTypes[22] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UnregisterDeviceResponse.ProtoReflect.Descriptor instead. +func (*UnregisterDeviceResponse) Descriptor() ([]byte, []int) { + return file_notification_proto_rawDescGZIP(), []int{22} +} + +func (x *UnregisterDeviceResponse) GetBase() *common.BaseResponse { + if x != nil { + return x.Base + } + return nil +} + +func (x *UnregisterDeviceResponse) GetAffected() int32 { + if x != nil { + return x.Affected + } + return 0 +} + var File_notification_proto protoreflect.FileDescriptor const file_notification_proto_rawDesc = "" + @@ -1223,7 +1442,21 @@ const file_notification_proto_rawDesc = "" + "\ttarget_id\x18\x01 \x01(\x03R\btargetId\"f\n" + "\x16DeleteByTargetResponse\x120\n" + "\x04base\x18\x01 \x01(\v2\x1c.topfans.common.BaseResponseR\x04base\x12\x1a\n" + - "\baffected\x18\x02 \x01(\x05R\baffected2\xf2\t\n" + + "\baffected\x18\x02 \x01(\x05R\baffected\"\x89\x01\n" + + "\x15RegisterDeviceRequest\x12\x10\n" + + "\x03cid\x18\x01 \x01(\tR\x03cid\x12\x1a\n" + + "\bplatform\x18\x02 \x01(\tR\bplatform\x12\x1f\n" + + "\vapp_version\x18\x03 \x01(\tR\n" + + "appVersion\x12!\n" + + "\fdevice_model\x18\x04 \x01(\tR\vdeviceModel\"Z\n" + + "\x16RegisterDeviceResponse\x120\n" + + "\x04base\x18\x01 \x01(\v2\x1c.topfans.common.BaseResponseR\x04base\x12\x0e\n" + + "\x02id\x18\x02 \x01(\x03R\x02id\"+\n" + + "\x17UnregisterDeviceRequest\x12\x10\n" + + "\x03cid\x18\x01 \x01(\tR\x03cid\"h\n" + + "\x18UnregisterDeviceResponse\x120\n" + + "\x04base\x18\x01 \x01(\v2\x1c.topfans.common.BaseResponseR\x04base\x12\x1a\n" + + "\baffected\x18\x02 \x01(\x05R\baffected2\xb3\f\n" + "\x13NotificationService\x12\x9e\x01\n" + "\x12CreateNotification\x12/.topfans.notification.CreateNotificationRequest\x1a0.topfans.notification.CreateNotificationResponse\"%\x82\xd3\xe4\x93\x02\x1f:\x01*\"\x1a/internal/v1/notifications\x12\x90\x01\n" + "\x10GetNotifications\x12-.topfans.notification.GetNotificationsRequest\x1a..topfans.notification.GetNotificationsResponse\"\x1d\x82\xd3\xe4\x93\x02\x17\x12\x15/api/v1/notifications\x12\x97\x01\n" + @@ -1233,7 +1466,9 @@ const file_notification_proto_rawDesc = "" + "\x12MarkAsReadByTarget\x12/.topfans.notification.MarkAsReadByTargetRequest\x1a0.topfans.notification.MarkAsReadByTargetResponse\"6\x82\xd3\xe4\x93\x020\"./api/v1/notifications/targets/{target_id}/read\x12\x90\x01\n" + "\rMarkAllAsRead\x12*.topfans.notification.MarkAllAsReadRequest\x1a+.topfans.notification.MarkAllAsReadResponse\"&\x82\xd3\xe4\x93\x02 \"\x1e/api/v1/notifications/read-all\x12\x9b\x01\n" + "\x12DeleteNotification\x12/.topfans.notification.DeleteNotificationRequest\x1a0.topfans.notification.DeleteNotificationResponse\"\"\x82\xd3\xe4\x93\x02\x1c*\x1a/api/v1/notifications/{id}\x12\x9e\x01\n" + - "\x0eDeleteByTarget\x12+.topfans.notification.DeleteByTargetRequest\x1a,.topfans.notification.DeleteByTargetResponse\"1\x82\xd3\xe4\x93\x02+*)/api/v1/notifications/targets/{target_id}B@Z>github.com/topfans/backend/pkg/proto/notification;notificationb\x06proto3" + "\x0eDeleteByTarget\x12+.topfans.notification.DeleteByTargetRequest\x1a,.topfans.notification.DeleteByTargetResponse\"1\x82\xd3\xe4\x93\x02+*)/api/v1/notifications/targets/{target_id}\x12\x95\x01\n" + + "\x0eRegisterDevice\x12+.topfans.notification.RegisterDeviceRequest\x1a,.topfans.notification.RegisterDeviceResponse\"(\x82\xd3\xe4\x93\x02\":\x01*\"\x1d/api/v1/notifications/devices\x12\xa6\x01\n" + + "\x10UnregisterDevice\x12-.topfans.notification.UnregisterDeviceRequest\x1a..topfans.notification.UnregisterDeviceResponse\"3\x82\xd3\xe4\x93\x02-:\x01*\"(/api/v1/notifications/devices/unregisterB@Z>github.com/topfans/backend/pkg/proto/notification;notificationb\x06proto3" var ( file_notification_proto_rawDescOnce sync.Once @@ -1247,7 +1482,7 @@ func file_notification_proto_rawDescGZIP() []byte { return file_notification_proto_rawDescData } -var file_notification_proto_msgTypes = make([]protoimpl.MessageInfo, 19) +var file_notification_proto_msgTypes = make([]protoimpl.MessageInfo, 23) var file_notification_proto_goTypes = []any{ (*Notification)(nil), // 0: topfans.notification.Notification (*ActorPreview)(nil), // 1: topfans.notification.ActorPreview @@ -1268,44 +1503,54 @@ var file_notification_proto_goTypes = []any{ (*DeleteNotificationResponse)(nil), // 16: topfans.notification.DeleteNotificationResponse (*DeleteByTargetRequest)(nil), // 17: topfans.notification.DeleteByTargetRequest (*DeleteByTargetResponse)(nil), // 18: topfans.notification.DeleteByTargetResponse - (*structpb.Struct)(nil), // 19: google.protobuf.Struct - (*common.BaseResponse)(nil), // 20: topfans.common.BaseResponse + (*RegisterDeviceRequest)(nil), // 19: topfans.notification.RegisterDeviceRequest + (*RegisterDeviceResponse)(nil), // 20: topfans.notification.RegisterDeviceResponse + (*UnregisterDeviceRequest)(nil), // 21: topfans.notification.UnregisterDeviceRequest + (*UnregisterDeviceResponse)(nil), // 22: topfans.notification.UnregisterDeviceResponse + (*structpb.Struct)(nil), // 23: google.protobuf.Struct + (*common.BaseResponse)(nil), // 24: topfans.common.BaseResponse } var file_notification_proto_depIdxs = []int32{ - 19, // 0: topfans.notification.Notification.data:type_name -> google.protobuf.Struct + 23, // 0: topfans.notification.Notification.data:type_name -> google.protobuf.Struct 1, // 1: topfans.notification.Notification.actors:type_name -> topfans.notification.ActorPreview - 19, // 2: topfans.notification.CreateNotificationRequest.data:type_name -> google.protobuf.Struct - 20, // 3: topfans.notification.CreateNotificationResponse.base:type_name -> topfans.common.BaseResponse - 20, // 4: topfans.notification.GetNotificationsResponse.base:type_name -> topfans.common.BaseResponse + 23, // 2: topfans.notification.CreateNotificationRequest.data:type_name -> google.protobuf.Struct + 24, // 3: topfans.notification.CreateNotificationResponse.base:type_name -> topfans.common.BaseResponse + 24, // 4: topfans.notification.GetNotificationsResponse.base:type_name -> topfans.common.BaseResponse 0, // 5: topfans.notification.GetNotificationsResponse.items:type_name -> topfans.notification.Notification - 20, // 6: topfans.notification.GetUnreadCountResponse.base:type_name -> topfans.common.BaseResponse + 24, // 6: topfans.notification.GetUnreadCountResponse.base:type_name -> topfans.common.BaseResponse 7, // 7: topfans.notification.GetUnreadCountResponse.counts:type_name -> topfans.notification.UnreadCount - 20, // 8: topfans.notification.MarkAsReadResponse.base:type_name -> topfans.common.BaseResponse - 20, // 9: topfans.notification.MarkAsReadByTargetResponse.base:type_name -> topfans.common.BaseResponse - 20, // 10: topfans.notification.MarkAllAsReadResponse.base:type_name -> topfans.common.BaseResponse - 20, // 11: topfans.notification.DeleteNotificationResponse.base:type_name -> topfans.common.BaseResponse - 20, // 12: topfans.notification.DeleteByTargetResponse.base:type_name -> topfans.common.BaseResponse - 2, // 13: topfans.notification.NotificationService.CreateNotification:input_type -> topfans.notification.CreateNotificationRequest - 4, // 14: topfans.notification.NotificationService.GetNotifications:input_type -> topfans.notification.GetNotificationsRequest - 6, // 15: topfans.notification.NotificationService.GetUnreadCount:input_type -> topfans.notification.GetUnreadCountRequest - 9, // 16: topfans.notification.NotificationService.MarkAsRead:input_type -> topfans.notification.MarkAsReadRequest - 11, // 17: topfans.notification.NotificationService.MarkAsReadByTarget:input_type -> topfans.notification.MarkAsReadByTargetRequest - 13, // 18: topfans.notification.NotificationService.MarkAllAsRead:input_type -> topfans.notification.MarkAllAsReadRequest - 15, // 19: topfans.notification.NotificationService.DeleteNotification:input_type -> topfans.notification.DeleteNotificationRequest - 17, // 20: topfans.notification.NotificationService.DeleteByTarget:input_type -> topfans.notification.DeleteByTargetRequest - 3, // 21: topfans.notification.NotificationService.CreateNotification:output_type -> topfans.notification.CreateNotificationResponse - 5, // 22: topfans.notification.NotificationService.GetNotifications:output_type -> topfans.notification.GetNotificationsResponse - 8, // 23: topfans.notification.NotificationService.GetUnreadCount:output_type -> topfans.notification.GetUnreadCountResponse - 10, // 24: topfans.notification.NotificationService.MarkAsRead:output_type -> topfans.notification.MarkAsReadResponse - 12, // 25: topfans.notification.NotificationService.MarkAsReadByTarget:output_type -> topfans.notification.MarkAsReadByTargetResponse - 14, // 26: topfans.notification.NotificationService.MarkAllAsRead:output_type -> topfans.notification.MarkAllAsReadResponse - 16, // 27: topfans.notification.NotificationService.DeleteNotification:output_type -> topfans.notification.DeleteNotificationResponse - 18, // 28: topfans.notification.NotificationService.DeleteByTarget:output_type -> topfans.notification.DeleteByTargetResponse - 21, // [21:29] is the sub-list for method output_type - 13, // [13:21] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 24, // 8: topfans.notification.MarkAsReadResponse.base:type_name -> topfans.common.BaseResponse + 24, // 9: topfans.notification.MarkAsReadByTargetResponse.base:type_name -> topfans.common.BaseResponse + 24, // 10: topfans.notification.MarkAllAsReadResponse.base:type_name -> topfans.common.BaseResponse + 24, // 11: topfans.notification.DeleteNotificationResponse.base:type_name -> topfans.common.BaseResponse + 24, // 12: topfans.notification.DeleteByTargetResponse.base:type_name -> topfans.common.BaseResponse + 24, // 13: topfans.notification.RegisterDeviceResponse.base:type_name -> topfans.common.BaseResponse + 24, // 14: topfans.notification.UnregisterDeviceResponse.base:type_name -> topfans.common.BaseResponse + 2, // 15: topfans.notification.NotificationService.CreateNotification:input_type -> topfans.notification.CreateNotificationRequest + 4, // 16: topfans.notification.NotificationService.GetNotifications:input_type -> topfans.notification.GetNotificationsRequest + 6, // 17: topfans.notification.NotificationService.GetUnreadCount:input_type -> topfans.notification.GetUnreadCountRequest + 9, // 18: topfans.notification.NotificationService.MarkAsRead:input_type -> topfans.notification.MarkAsReadRequest + 11, // 19: topfans.notification.NotificationService.MarkAsReadByTarget:input_type -> topfans.notification.MarkAsReadByTargetRequest + 13, // 20: topfans.notification.NotificationService.MarkAllAsRead:input_type -> topfans.notification.MarkAllAsReadRequest + 15, // 21: topfans.notification.NotificationService.DeleteNotification:input_type -> topfans.notification.DeleteNotificationRequest + 17, // 22: topfans.notification.NotificationService.DeleteByTarget:input_type -> topfans.notification.DeleteByTargetRequest + 19, // 23: topfans.notification.NotificationService.RegisterDevice:input_type -> topfans.notification.RegisterDeviceRequest + 21, // 24: topfans.notification.NotificationService.UnregisterDevice:input_type -> topfans.notification.UnregisterDeviceRequest + 3, // 25: topfans.notification.NotificationService.CreateNotification:output_type -> topfans.notification.CreateNotificationResponse + 5, // 26: topfans.notification.NotificationService.GetNotifications:output_type -> topfans.notification.GetNotificationsResponse + 8, // 27: topfans.notification.NotificationService.GetUnreadCount:output_type -> topfans.notification.GetUnreadCountResponse + 10, // 28: topfans.notification.NotificationService.MarkAsRead:output_type -> topfans.notification.MarkAsReadResponse + 12, // 29: topfans.notification.NotificationService.MarkAsReadByTarget:output_type -> topfans.notification.MarkAsReadByTargetResponse + 14, // 30: topfans.notification.NotificationService.MarkAllAsRead:output_type -> topfans.notification.MarkAllAsReadResponse + 16, // 31: topfans.notification.NotificationService.DeleteNotification:output_type -> topfans.notification.DeleteNotificationResponse + 18, // 32: topfans.notification.NotificationService.DeleteByTarget:output_type -> topfans.notification.DeleteByTargetResponse + 20, // 33: topfans.notification.NotificationService.RegisterDevice:output_type -> topfans.notification.RegisterDeviceResponse + 22, // 34: topfans.notification.NotificationService.UnregisterDevice:output_type -> topfans.notification.UnregisterDeviceResponse + 25, // [25:35] is the sub-list for method output_type + 15, // [15:25] is the sub-list for method input_type + 15, // [15:15] is the sub-list for extension type_name + 15, // [15:15] is the sub-list for extension extendee + 0, // [0:15] is the sub-list for field type_name } func init() { file_notification_proto_init() } @@ -1319,7 +1564,7 @@ func file_notification_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_notification_proto_rawDesc), len(file_notification_proto_rawDesc)), NumEnums: 0, - NumMessages: 19, + NumMessages: 23, NumExtensions: 0, NumServices: 1, }, diff --git a/backend/pkg/proto/notification/notification.triple.go b/backend/pkg/proto/notification/notification.triple.go index c4d9063..a0d5dca 100644 --- a/backend/pkg/proto/notification/notification.triple.go +++ b/backend/pkg/proto/notification/notification.triple.go @@ -52,6 +52,10 @@ const ( NotificationServiceDeleteNotificationProcedure = "/topfans.notification.NotificationService/DeleteNotification" // NotificationServiceDeleteByTargetProcedure is the fully-qualified name of the NotificationService's DeleteByTarget RPC. NotificationServiceDeleteByTargetProcedure = "/topfans.notification.NotificationService/DeleteByTarget" + // NotificationServiceRegisterDeviceProcedure is the fully-qualified name of the NotificationService's RegisterDevice RPC. + NotificationServiceRegisterDeviceProcedure = "/topfans.notification.NotificationService/RegisterDevice" + // NotificationServiceUnregisterDeviceProcedure is the fully-qualified name of the NotificationService's UnregisterDevice RPC. + NotificationServiceUnregisterDeviceProcedure = "/topfans.notification.NotificationService/UnregisterDevice" ) var ( @@ -68,6 +72,8 @@ type NotificationService interface { MarkAllAsRead(ctx context.Context, req *MarkAllAsReadRequest, opts ...client.CallOption) (*MarkAllAsReadResponse, error) DeleteNotification(ctx context.Context, req *DeleteNotificationRequest, opts ...client.CallOption) (*DeleteNotificationResponse, error) DeleteByTarget(ctx context.Context, req *DeleteByTargetRequest, opts ...client.CallOption) (*DeleteByTargetResponse, error) + RegisterDevice(ctx context.Context, req *RegisterDeviceRequest, opts ...client.CallOption) (*RegisterDeviceResponse, error) + UnregisterDevice(ctx context.Context, req *UnregisterDeviceRequest, opts ...client.CallOption) (*UnregisterDeviceResponse, error) } // NewNotificationService constructs a client for the notification.NotificationService service. @@ -154,9 +160,25 @@ func (c *NotificationServiceImpl) DeleteByTarget(ctx context.Context, req *Delet return resp, nil } +func (c *NotificationServiceImpl) RegisterDevice(ctx context.Context, req *RegisterDeviceRequest, opts ...client.CallOption) (*RegisterDeviceResponse, error) { + resp := new(RegisterDeviceResponse) + if err := c.conn.CallUnary(ctx, []interface{}{req}, resp, "RegisterDevice", opts...); err != nil { + return nil, err + } + return resp, nil +} + +func (c *NotificationServiceImpl) UnregisterDevice(ctx context.Context, req *UnregisterDeviceRequest, opts ...client.CallOption) (*UnregisterDeviceResponse, error) { + resp := new(UnregisterDeviceResponse) + if err := c.conn.CallUnary(ctx, []interface{}{req}, resp, "UnregisterDevice", opts...); err != nil { + return nil, err + } + return resp, nil +} + var NotificationService_ClientInfo = client.ClientInfo{ InterfaceName: "topfans.notification.NotificationService", - MethodNames: []string{"CreateNotification", "GetNotifications", "GetUnreadCount", "MarkAsRead", "MarkAsReadByTarget", "MarkAllAsRead", "DeleteNotification", "DeleteByTarget"}, + MethodNames: []string{"CreateNotification", "GetNotifications", "GetUnreadCount", "MarkAsRead", "MarkAsReadByTarget", "MarkAllAsRead", "DeleteNotification", "DeleteByTarget", "RegisterDevice", "UnregisterDevice"}, ConnectionInjectFunc: func(dubboCliRaw interface{}, conn *client.Connection) { dubboCli := dubboCliRaw.(*NotificationServiceImpl) dubboCli.conn = conn @@ -173,6 +195,8 @@ type NotificationServiceHandler interface { MarkAllAsRead(context.Context, *MarkAllAsReadRequest) (*MarkAllAsReadResponse, error) DeleteNotification(context.Context, *DeleteNotificationRequest) (*DeleteNotificationResponse, error) DeleteByTarget(context.Context, *DeleteByTargetRequest) (*DeleteByTargetResponse, error) + RegisterDevice(context.Context, *RegisterDeviceRequest) (*RegisterDeviceResponse, error) + UnregisterDevice(context.Context, *UnregisterDeviceRequest) (*UnregisterDeviceResponse, error) } func RegisterNotificationServiceHandler(srv *server.Server, hdlr NotificationServiceHandler, opts ...server.ServiceOption) error { @@ -307,5 +331,35 @@ var NotificationService_ServiceInfo = server.ServiceInfo{ return triple_protocol.NewResponse(res), nil }, }, + { + Name: "RegisterDevice", + Type: constant.CallUnary, + ReqInitFunc: func() interface{} { + return new(RegisterDeviceRequest) + }, + MethodFunc: func(ctx context.Context, args []interface{}, handler interface{}) (interface{}, error) { + req := args[0].(*RegisterDeviceRequest) + res, err := handler.(NotificationServiceHandler).RegisterDevice(ctx, req) + if err != nil { + return nil, err + } + return triple_protocol.NewResponse(res), nil + }, + }, + { + Name: "UnregisterDevice", + Type: constant.CallUnary, + ReqInitFunc: func() interface{} { + return new(UnregisterDeviceRequest) + }, + MethodFunc: func(ctx context.Context, args []interface{}, handler interface{}) (interface{}, error) { + req := args[0].(*UnregisterDeviceRequest) + res, err := handler.(NotificationServiceHandler).UnregisterDevice(ctx, req) + if err != nil { + return nil, err + } + return triple_protocol.NewResponse(res), nil + }, + }, }, } diff --git a/backend/pkg/push/uni_push_client.go b/backend/pkg/push/uni_push_client.go new file mode 100644 index 0000000..878dea5 --- /dev/null +++ b/backend/pkg/push/uni_push_client.go @@ -0,0 +1,130 @@ +// Package push 提供统一推送客户端,用于调用 uniCloud sendMessage 云函数触发手机通知栏消息。 +// +// 当前实现:阿里云 BSPAPP(uniCloud)云函数 sendMessage(uniPush)。 +// 后续可扩展为多个 Pusher(clientID 分配 + 灰度),保持 Send(ctx, payload) 接口不变。 +package push + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "time" + + "go.uber.org/zap" +) + +// Pusher 推送客户端统一接口(便于后续扩展多通道)。 +type Pusher interface { + Send(ctx context.Context, p Payload) error +} + +// Payload 推送给云函数 sendMessage 的载荷。 +// +// 字段名严格对齐云函数约定(cids / title / content / request_id / data); +// data 用于透传到 App 端,App 在 onPushMessage 中读取 data.payload 决定跳转。 +type Payload struct { + CIDs []string `json:"cids"` + Title string `json:"title"` + Content string `json:"content"` + RequestID string `json:"request_id"` + Data map[string]interface{} `json:"data"` +} + +// UniPushClient 调用 uniCloud sendMessage 云函数做推送。 +// +// 设计要点: +// - 一次发送 = 一次 HTTP POST;HTTP 客户端短超时(默认 4s)避免 goroutine 泄漏。 +// - 不重试:推送失败只 warn 日志;客户端下次启动会重新注册 cid,可视为自愈。 +// - 不做并发控制:调用方应在 goroutine 内 fire-and-forget;Send 本身不阻塞业务。 +type UniPushClient struct { + url string + http *http.Client + logger *zap.Logger +} + +// NewUniPushClient 创建推送客户端。 +// +// - url:uniCloud 云函数 URL(如 https://fc-mp-xxx.next.bspapp.com/sendMessage)。 +// - timeout:HTTP 调用超时,推送调用必须异步,推荐 3~5s。 +// - logger:可选,nil 时使用 zap.NewNop()。 +func NewUniPushClient(url string, timeout time.Duration, logger *zap.Logger) *UniPushClient { + if timeout <= 0 { + timeout = 4 * time.Second + } + if logger == nil { + logger = zap.NewNop() + } + return &UniPushClient{ + url: url, + http: &http.Client{Timeout: timeout}, + logger: logger, + } +} + +// Send 同步发送推送(返回云函数调用结果)。调用方应自行在 goroutine 中调用以避免阻塞。 +// +// 行为: +// - cids 为空时 debug log 后直接返回 nil(视为跳过)。 +// - request_id 为空时自动生成(timestamp + 随机 hex),便于云函数侧排重。 +// - 非 2xx 响应返回 error 并附状态码与 body 片段。 +func (c *UniPushClient) Send(ctx context.Context, p Payload) error { + if c == nil || c.url == "" { + return fmt.Errorf("uniPush client not initialized") + } + if len(p.CIDs) == 0 { + c.logger.Debug("uniPush skip: empty cids", + zap.String("request_id", p.RequestID)) + return nil + } + if p.RequestID == "" { + p.RequestID = genRequestID() + } + + body, err := json.Marshal(p) + if err != nil { + return fmt.Errorf("marshal payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.url, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("build request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.http.Do(req) + if err != nil { + return fmt.Errorf("http do: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + buf := make([]byte, 512) + n, _ := resp.Body.Read(buf) + return fmt.Errorf("uniPush status=%d body=%s", resp.StatusCode, string(buf[:n])) + } + + c.logger.Info("uniPush sent", + zap.String("request_id", p.RequestID), + zap.Int("cid_count", len(p.CIDs)), + zap.String("title", p.Title), + ) + return nil +} + +// genRequestID 生成请求幂等 id(时间戳 + 随机 hex)。 +func genRequestID() string { + now := time.Now().UnixMilli() + b := make([]byte, 4) + _, _ = rand.Read(b) + return fmt.Sprintf("%d-%s", now, hex.EncodeToString(b)) +} + +// NoopPusher 空实现,用于 push 关闭或测试桩。 +type NoopPusher struct{} + +// Send 直接返回 nil,不打日志。 +func (NoopPusher) Send(_ context.Context, _ Payload) error { return nil } \ No newline at end of file diff --git a/backend/pkg/push/uni_push_client_test.go b/backend/pkg/push/uni_push_client_test.go new file mode 100644 index 0000000..9889137 --- /dev/null +++ b/backend/pkg/push/uni_push_client_test.go @@ -0,0 +1,148 @@ +package push + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// fakeServer 接收推送请求并保存;返回 status 由调用方控制。 +type fakeServer struct { + mu sync.Mutex + requests []Payload + status int +} + +func (f *fakeServer) handler() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + _ = r.Body.Close() + var p Payload + _ = json.Unmarshal(body, &p) + f.mu.Lock() + f.requests = append(f.requests, p) + status := f.status + f.mu.Unlock() + w.WriteHeader(status) + _, _ = w.Write([]byte(`{"errcode":0}`)) + } +} + +func (f *fakeServer) last() Payload { + f.mu.Lock() + defer f.mu.Unlock() + if len(f.requests) == 0 { + return Payload{} + } + return f.requests[len(f.requests)-1] +} + +func (f *fakeServer) count() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.requests) +} + +// TestUniPushClient_SendHappy 验证:成功路径下请求体含正确字段。 +func TestUniPushClient_SendHappy(t *testing.T) { + srv := &fakeServer{status: http.StatusOK} + ts := httptest.NewServer(srv.handler()) + defer ts.Close() + + c := NewUniPushClient(ts.URL, 2*time.Second, nil) + err := c.Send(context.Background(), Payload{ + CIDs: []string{"cid-a", "cid-b"}, + Title: "你好", + Content: "有新消息", + Data: map[string]interface{}{"notification_id": int64(42), "type": "like"}, + }) + require.NoError(t, err) + assert.Equal(t, 1, srv.count()) + + got := srv.last() + assert.ElementsMatch(t, []string{"cid-a", "cid-b"}, got.CIDs) + assert.Equal(t, "你好", got.Title) + assert.Equal(t, "有新消息", got.Content) + assert.NotEmpty(t, got.RequestID, "request_id 应自动生成") + assert.EqualValues(t, 42, got.Data["notification_id"]) + assert.Equal(t, "like", got.Data["type"]) +} + +// TestUniPushClient_SendNonSuccess 验证:非 2xx 返回 error。 +func TestUniPushClient_SendNonSuccess(t *testing.T) { + srv := &fakeServer{status: http.StatusBadRequest} + ts := httptest.NewServer(srv.handler()) + defer ts.Close() + + c := NewUniPushClient(ts.URL, 2*time.Second, nil) + err := c.Send(context.Background(), Payload{ + CIDs: []string{"cid-a"}, Title: "x", Content: "y", + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "status=400") +} + +// TestUniPushClient_EmptyCIDs 验证:cids 为空直接跳过(不发送 HTTP 请求)。 +func TestUniPushClient_EmptyCIDs(t *testing.T) { + srv := &fakeServer{status: http.StatusOK} + ts := httptest.NewServer(srv.handler()) + defer ts.Close() + + c := NewUniPushClient(ts.URL, 2*time.Second, nil) + err := c.Send(context.Background(), Payload{Title: "x", Content: "y"}) + require.NoError(t, err) + assert.Equal(t, 0, srv.count(), "cids 为空时不应发请求") +} + +// TestUniPushClient_EmptyURL 验证:URL 为空时返回 error。 +func TestUniPushClient_EmptyURL(t *testing.T) { + c := NewUniPushClient("", 2*time.Second, nil) + err := c.Send(context.Background(), Payload{CIDs: []string{"cid-a"}, Title: "x", Content: "y"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "not initialized") +} + +// TestUniPushClient_Timeout 验证:超时场景下返回 error。 +func TestUniPushClient_Timeout(t *testing.T) { + slow := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(300 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer slow.Close() + + c := NewUniPushClient(slow.URL, 50*time.Millisecond, nil) + err := c.Send(context.Background(), Payload{CIDs: []string{"cid-a"}, Title: "x", Content: "y"}) + require.Error(t, err) + assert.True(t, strings.Contains(err.Error(), "timeout") || + strings.Contains(err.Error(), "context deadline") || + strings.Contains(err.Error(), "Client.Timeout"), + "expected timeout error, got: %v", err) +} + +// TestUniPushClient_RequestIDGenerated 验证:未传 request_id 时自动生成。 +func TestUniPushClient_RequestIDGenerated(t *testing.T) { + srv := &fakeServer{status: http.StatusOK} + ts := httptest.NewServer(srv.handler()) + defer ts.Close() + + c := NewUniPushClient(ts.URL, 2*time.Second, nil) + _ = c.Send(context.Background(), Payload{CIDs: []string{"x"}, Title: "t", Content: "c"}) + got := srv.last() + assert.True(t, strings.HasPrefix(got.RequestID, ""), "request_id 应存在") + assert.Greater(t, len(got.RequestID), 5) +} + +// TestNoopPusher 验证:NoopPusher.Send 不报错也不发请求。 +func TestNoopPusher(t *testing.T) { + var p Pusher = NoopPusher{} + assert.NoError(t, p.Send(context.Background(), Payload{CIDs: []string{"x"}})) +} \ No newline at end of file diff --git a/backend/proto/notification.proto b/backend/proto/notification.proto index 82ee831..51467ef 100644 --- a/backend/proto/notification.proto +++ b/backend/proto/notification.proto @@ -35,6 +35,24 @@ service NotificationService { rpc DeleteByTarget(DeleteByTargetRequest) returns (DeleteByTargetResponse) { option (google.api.http) = { delete: "/api/v1/notifications/targets/{target_id}" }; } + + // ========== 设备注册(推送 cid 上报) ========== + + // 注册/更新当前用户的推送设备 cid;同 cid 重复注册为更新,user 变化时也更新归属。 + rpc RegisterDevice(RegisterDeviceRequest) returns (RegisterDeviceResponse) { + option (google.api.http) = { + post: "/api/v1/notifications/devices" + body: "*" + }; + } + + // 注销指定 cid 的推送(登出 / 客户端主动关闭通知)。 + rpc UnregisterDevice(UnregisterDeviceRequest) returns (UnregisterDeviceResponse) { + option (google.api.http) = { + post: "/api/v1/notifications/devices/unregister" + body: "*" + }; + } } message Notification { @@ -106,3 +124,27 @@ message DeleteByTargetRequest { int64 target_id = 1; } message DeleteByTargetResponse { topfans.common.BaseResponse base = 1; int32 affected = 2; } + +// ========== 设备注册消息 ========== + +// RegisterDeviceRequest 注册/更新推送设备。 +// 行为:按 cid 主键 upsert。同 cid 已存在时更新归属(user/platform/version);不存在则插入。 +message RegisterDeviceRequest { + string cid = 1; + string platform = 2; // ios / android / harmony + string app_version = 3; + string device_model = 4; +} +message RegisterDeviceResponse { + topfans.common.BaseResponse base = 1; + int64 id = 2; // user_devices.id(便于客户端排错/对账) +} + +// UnregisterDeviceRequest 注销指定 cid 的推送。cid 为空时注销当前用户的所有设备。 +message UnregisterDeviceRequest { + string cid = 1; +} +message UnregisterDeviceResponse { + topfans.common.BaseResponse base = 1; + int32 affected = 2; // 受影响的行数 +} diff --git a/backend/scripts/migrate_create_user_devices.sql b/backend/scripts/migrate_create_user_devices.sql new file mode 100644 index 0000000..f0f41ed --- /dev/null +++ b/backend/scripts/migrate_create_user_devices.sql @@ -0,0 +1,41 @@ +-- 设备推送标识(cid)注册表 +-- 用于:通知服务按 user_id 查找当前用户的设备 client id,推送时调用 uniCloud sendMessage。 +-- +-- 设计要点: +-- - cid 由 App 端调用 uni.getPushClientId 获取后上报,此处持久化以便后续推送查找。 +-- - cid 全局唯一(同一设备多次注册会被覆盖为同一行);user_id+cid 联合索引方便按用户反查。 +-- - active=false 表示该设备已下线(用户主动登出、token 失效等),推送时按 active=TRUE 过滤。 +-- - 序列起始 10000:为测试数据/手动 INSERT 预留。 + +BEGIN; + +CREATE TABLE IF NOT EXISTS public.user_devices ( + id BIGINT NOT NULL PRIMARY KEY, + user_id BIGINT NOT NULL, + cid VARCHAR(128) NOT NULL, + platform VARCHAR(20) NOT NULL DEFAULT '', + app_version VARCHAR(32) NOT NULL DEFAULT '', + device_model VARCHAR(64) NOT NULL DEFAULT '', + active BOOLEAN NOT NULL DEFAULT TRUE, + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL +); + +-- cid 全局唯一 +CREATE UNIQUE INDEX IF NOT EXISTS uq_user_devices_cid + ON public.user_devices(cid); + +-- 按用户查询活跃设备 +CREATE INDEX IF NOT EXISTS idx_user_devices_user_active + ON public.user_devices(user_id) WHERE active = TRUE; + +-- 序列:预留给测试/手动 insert +CREATE SEQUENCE IF NOT EXISTS user_devices_id_seq START WITH 10000; + +-- 同步序列(防止已有数据场景下序列落后) +SELECT setval( + 'user_devices_id_seq', + GREATEST(10000, (SELECT COALESCE(MAX(id), 0) FROM public.user_devices)) +); + +COMMIT; \ No newline at end of file diff --git a/backend/services/notificationService/configs/config.yaml b/backend/services/notificationService/configs/config.yaml index 07c1025..86eba0c 100644 --- a/backend/services/notificationService/configs/config.yaml +++ b/backend/services/notificationService/configs/config.yaml @@ -26,6 +26,14 @@ logging: output_paths: - stdout +# 手机推送(uniPush)配置 +# 留 push.url 为空字符串表示未配置;启动时 PUSH_ENABLED=true 但 url 为空会降级为 NoopPusher 并 warn。 +# 推荐通过 env 注入(见 deploy/envs/notification.env),避免 vendor URL 入仓。 +push: + enabled: true + url: "" # uniCloud sendMessage URL,通过 PUSH_URL env 覆盖 + timeout_ms: 4000 + dubbo: # 应用配置 application: diff --git a/backend/services/notificationService/main.go b/backend/services/notificationService/main.go index 3d51311..0ff3f77 100644 --- a/backend/services/notificationService/main.go +++ b/backend/services/notificationService/main.go @@ -13,7 +13,9 @@ import ( "os" "os/signal" "strconv" + "strings" "syscall" + "time" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/server" @@ -24,19 +26,23 @@ import ( "github.com/topfans/backend/pkg/health" "github.com/topfans/backend/pkg/logger" notifPb "github.com/topfans/backend/pkg/proto/notification" + "github.com/topfans/backend/pkg/push" "github.com/topfans/backend/services/notificationService/model" "github.com/topfans/backend/services/notificationService/provider" "github.com/topfans/backend/services/notificationService/service" ) var ( - port = flag.Int("port", getEnvInt("PORT", 20010), "Dubbo service port") - dbHost = flag.String("db-host", getEnv("DB_HOST", "localhost"), "Database host") - dbPort = flag.Int("db-port", getEnvInt("DB_PORT", 5432), "Database port") - dbUser = flag.String("db-user", getEnv("DB_USER", "postgres"), "Database user") - dbPassword = flag.String("db-password", getEnv("DB_PASSWORD", ""), "Database password") - dbName = flag.String("db-name", getEnv("DB_NAME", "top-fans"), "Database name") - healthHndl *health.Handler + port = flag.Int("port", getEnvInt("PORT", 20010), "Dubbo service port") + dbHost = flag.String("db-host", getEnv("DB_HOST", "localhost"), "Database host") + dbPort = flag.Int("db-port", getEnvInt("DB_PORT", 5432), "Database port") + dbUser = flag.String("db-user", getEnv("DB_USER", "postgres"), "Database user") + dbPassword = flag.String("db-password", getEnv("DB_PASSWORD", ""), "Database password") + dbName = flag.String("db-name", getEnv("DB_NAME", "top-fans"), "Database name") + pushEnabled = flag.Bool("push-enabled", getEnvBool("PUSH_ENABLED", true), "Enable mobile push (uniPush)") + pushURL = flag.String("push-url", getEnv("PUSH_URL", ""), "uniCloud sendMessage URL (REQUIRED when push-enabled=true; deploy via env, do NOT commit vendor URLs)") + pushTimeoutMs = flag.Int("push-timeout-ms", getEnvInt("PUSH_TIMEOUT_MS", 4000), "HTTP timeout for uniPush (ms)") + healthHndl *health.Handler ) func getEnv(key, fallback string) string { @@ -55,6 +61,21 @@ func getEnvInt(key string, fallback int) int { return fallback } +// getEnvBool 把 "1"/"true"/"TRUE"/"yes"/"y" 视为 true;其它视为 false。空时回退到 fallback。 +func getEnvBool(key string, fallback bool) bool { + v := strings.ToLower(strings.TrimSpace(os.Getenv(key))) + if v == "" { + return fallback + } + switch v { + case "1", "true", "yes", "y", "on": + return true + case "0", "false", "no", "n", "off": + return false + } + return fallback +} + func main() { flag.Parse() @@ -120,6 +141,7 @@ func autoMigrate() error { tables := []interface{}{ &model.Notification{}, &model.NotificationStats{}, + &model.UserDevice{}, } for _, table := range tables { @@ -166,11 +188,39 @@ func initDubboService() error { return fmt.Errorf("database is not initialized") } - // 业务层(service 只依赖 DB;repository 内部自行 New) - notifService := service.NewNotificationService(db) + // 业务层:UserDeviceService 给 NotificationService 提供 cids 拉取能力。 + deviceService := service.NewUserDeviceService(db) + + // 推送客户端(三态:disabled / enabled-with-URL / enabled-without-URL 降级为 noop) + // 设计要点: + // - push-enabled=true 但 url 为空 → warn 后降级为 NoopPusher,不直接 Fatal。 + // 这样部署期忘了配 PUSH_URL 时业务仍能跑(只是不会真的推),比启动失败更安全。 + // - 真要严格启动校验,把降级改成 fmt.Errorf 返回即可。 + var pusher push.Pusher + switch { + case !*pushEnabled: + pusher = push.NoopPusher{} + logger.Sugar.Info("mobile push disabled (PUSH_ENABLED=false)") + case *pushEnabled && *pushURL == "": + pusher = push.NoopPusher{} + logger.Sugar.Warn("mobile push enabled but PUSH_URL is empty; falling back to NoopPusher — set PUSH_URL env to actually deliver") + default: + pusher = push.NewUniPushClient( + *pushURL, + time.Duration(*pushTimeoutMs)*time.Millisecond, + logger.Logger, + ) + logger.Sugar.Info("mobile push enabled", + "url", *pushURL, + "timeout_ms", *pushTimeoutMs, + ) + } + + // 通知服务(注入了 deviceService + pusher;CreateNotification 成功后异步触发推送) + notifService := service.NewNotificationService(db, deviceService, pusher) // RPC Provider - notifProvider := provider.NewNotificationProvider(notifService) + notifProvider := provider.NewNotificationProvider(notifService, deviceService) // Dubbo Server(Triple 协议) srv, err := server.NewServer( diff --git a/backend/services/notificationService/model/user_device.go b/backend/services/notificationService/model/user_device.go new file mode 100644 index 0000000..10f0795 --- /dev/null +++ b/backend/services/notificationService/model/user_device.go @@ -0,0 +1,22 @@ +package model + +// UserDevice 用户推送设备标识。 +// +// 字段约定: +// - CID:由 App 端调用 uni.getPushClientId() 获取的推送 token,用于 uniCloud sendMessage。 +// - Platform:ios / android / harmony,便于后续按平台分组推送或排查问题。 +// - Active:false 表示设备下线(登出 / token 失效),推送时跳过。 +type UserDevice struct { + ID int64 `json:"id" gorm:"primaryKey;column:id"` + UserID int64 `json:"user_id" gorm:"column:user_id;not null;index"` + CID string `json:"cid" gorm:"column:cid;not null;size:128;uniqueIndex"` + Platform string `json:"platform" gorm:"column:platform;not null;size:20;default:''"` + AppVersion string `json:"app_version" gorm:"column:app_version;not null;size:32;default:''"` + DeviceModel string `json:"device_model" gorm:"column:device_model;not null;size:64;default:''"` + Active bool `json:"active" gorm:"column:active;not null;default:true"` + CreatedAt int64 `json:"created_at" gorm:"column:created_at;not null"` + UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;not null"` +} + +// TableName 表名 +func (UserDevice) TableName() string { return "public.user_devices" } \ No newline at end of file diff --git a/backend/services/notificationService/provider/notification_provider.go b/backend/services/notificationService/provider/notification_provider.go index 6495d55..6467a70 100644 --- a/backend/services/notificationService/provider/notification_provider.go +++ b/backend/services/notificationService/provider/notification_provider.go @@ -21,15 +21,16 @@ import ( // NotificationProvider notification 服务的 RPC Provider。 type NotificationProvider struct { - svc *service.NotificationService + notifSvc *service.NotificationService + deviceSvc *service.UserDeviceService } // 编译期断言:NotificationProvider 实现了 notifPb.NotificationServiceHandler 接口(triple 生成)。 var _ notifPb.NotificationServiceHandler = (*NotificationProvider)(nil) // NewNotificationProvider 创建 NotificationProvider。 -func NewNotificationProvider(svc *service.NotificationService) *NotificationProvider { - return &NotificationProvider{svc: svc} +func NewNotificationProvider(notifSvc *service.NotificationService, deviceSvc *service.UserDeviceService) *NotificationProvider { + return &NotificationProvider{notifSvc: notifSvc, deviceSvc: deviceSvc} } // ========== 8 个 RPC 方法 ========== @@ -46,7 +47,7 @@ func (p *NotificationProvider) CreateNotification(ctx context.Context, req *noti req.StarId = sid } } - return p.svc.CreateNotification(ctx, req) + return p.notifSvc.CreateNotification(ctx, req) } // GetNotifications 拉取通知列表(type=like 时聚合)。 @@ -55,7 +56,7 @@ func (p *NotificationProvider) GetNotifications(ctx context.Context, req *notifP if err != nil { return nil, fmt.Errorf("extract user info: %w", err) } - return p.svc.GetNotifications(ctx, userID, starID, req.Type, req.Tab, req.Page, req.PageSize) + return p.notifSvc.GetNotifications(ctx, userID, starID, req.Type, req.Tab, req.Page, req.PageSize) } // GetUnreadCount 获取未读计数。 @@ -64,7 +65,7 @@ func (p *NotificationProvider) GetUnreadCount(ctx context.Context, req *notifPb. if err != nil { return nil, fmt.Errorf("extract user info: %w", err) } - return p.svc.GetUnreadCount(ctx, userID, starID) + return p.notifSvc.GetUnreadCount(ctx, userID, starID) } // MarkAsRead 单条标已读。 @@ -74,7 +75,7 @@ func (p *NotificationProvider) MarkAsRead(ctx context.Context, req *notifPb.Mark return nil, fmt.Errorf("extract user info: %w", err) } now := time.Now().UnixMilli() - return p.svc.MarkAsRead(ctx, userID, starID, req.Id, now) + return p.notifSvc.MarkAsRead(ctx, userID, starID, req.Id, now) } // MarkAsReadByTarget 将某个 target 下所有 like 标已读。 @@ -84,7 +85,7 @@ func (p *NotificationProvider) MarkAsReadByTarget(ctx context.Context, req *noti return nil, fmt.Errorf("extract user info: %w", err) } now := time.Now().UnixMilli() - return p.svc.MarkAsReadByTarget(ctx, userID, starID, req.TargetId, now) + return p.notifSvc.MarkAsReadByTarget(ctx, userID, starID, req.TargetId, now) } // MarkAllAsRead 全部已读(按 type 过滤)。 @@ -94,7 +95,7 @@ func (p *NotificationProvider) MarkAllAsRead(ctx context.Context, req *notifPb.M return nil, fmt.Errorf("extract user info: %w", err) } now := time.Now().UnixMilli() - return p.svc.MarkAllAsRead(ctx, userID, starID, req.Type, now) + return p.notifSvc.MarkAllAsRead(ctx, userID, starID, req.Type, now) } // DeleteNotification 软删单条。 @@ -104,7 +105,7 @@ func (p *NotificationProvider) DeleteNotification(ctx context.Context, req *noti return nil, fmt.Errorf("extract user info: %w", err) } now := time.Now().UnixMilli() - return p.svc.DeleteNotification(ctx, userID, starID, req.Id, now) + return p.notifSvc.DeleteNotification(ctx, userID, starID, req.Id, now) } // DeleteByTarget 软删某个 target 下所有 like。 @@ -114,7 +115,27 @@ func (p *NotificationProvider) DeleteByTarget(ctx context.Context, req *notifPb. return nil, fmt.Errorf("extract user info: %w", err) } now := time.Now().UnixMilli() - return p.svc.DeleteByTarget(ctx, userID, starID, req.TargetId, now) + return p.notifSvc.DeleteByTarget(ctx, userID, starID, req.TargetId, now) +} + +// ========== 设备注册 RPC ========== + +// RegisterDevice 注册/更新推送 cid。user_id 从 metadata 取。 +func (p *NotificationProvider) RegisterDevice(ctx context.Context, req *notifPb.RegisterDeviceRequest) (*notifPb.RegisterDeviceResponse, error) { + userID, _, err := extractUserInfo(ctx) + if err != nil { + return nil, fmt.Errorf("extract user info: %w", err) + } + return p.deviceSvc.RegisterDevice(ctx, userID, req) +} + +// UnregisterDevice 注销推送 cid。cid 为空时注销当前用户的所有设备。 +func (p *NotificationProvider) UnregisterDevice(ctx context.Context, req *notifPb.UnregisterDeviceRequest) (*notifPb.UnregisterDeviceResponse, error) { + userID, _, err := extractUserInfo(ctx) + if err != nil { + return nil, fmt.Errorf("extract user info: %w", err) + } + return p.deviceSvc.UnregisterDevice(ctx, userID, req) } // ========== 辅助方法 ========== diff --git a/backend/services/notificationService/repository/user_device_repository.go b/backend/services/notificationService/repository/user_device_repository.go new file mode 100644 index 0000000..b0d5c41 --- /dev/null +++ b/backend/services/notificationService/repository/user_device_repository.go @@ -0,0 +1,169 @@ +// Package repository 提供 UserDevice 的数据访问层(public.user_devices 表)。 +// +// 设计约定(与 NotificationRepository 保持一致): +// - 写操作(Upsert / Deactivate / MarkAllInactiveForUser)接受 *gorm.DB, +// 由 service 层在事务回调内传入 tx。 +// - 读操作(ListActiveCIDsByUserID)直接走仓储 db,无事务。 +package repository + +import ( + "context" + "errors" + "time" + + "github.com/topfans/backend/pkg/logger" + "github.com/topfans/backend/services/notificationService/model" + "go.uber.org/zap" + "gorm.io/gorm" +) + +// UserDeviceRepository 用户推送设备仓储。 +type UserDeviceRepository struct { + db *gorm.DB +} + +// NewUserDeviceRepository 创建 UserDeviceRepository。 +func NewUserDeviceRepository(db *gorm.DB) *UserDeviceRepository { + return &UserDeviceRepository{db: db} +} + +// execDB 返回带 ctx 的执行句柄(优先使用传入的 tx,否则使用仓储默认 db)。 +func (r *UserDeviceRepository) execDB(tx *gorm.DB, ctx context.Context) *gorm.DB { + if tx != nil { + return tx.WithContext(ctx) + } + return r.db.WithContext(ctx) +} + +// guardDB 在 db 与 tx 都为 nil 时返回 error;便于 service 层在测试桩或异常状态下快速失败。 +func (r *UserDeviceRepository) guardDB(tx *gorm.DB) error { + if tx != nil { + return nil + } + if r.db == nil { + return errors.New("user_device repository: db is nil") + } + return nil +} + +// UpsertByCID 按 cid upsert:同 cid 已存在则更新 user_id/platform/version/active/updated_at; +// 不存在则插入新行(id 由 PG 序列生成)。 +// +// 返回:写入后的最新 UserDevice(含 id)和 err。 +// +// 注:cid 是 Pusher 的关键标识,App 端可能因 token 轮换而变化,所以主键是 cid 而非 user_id。 +// 同一用户多设备 = 多行;同一设备 token 变化 = 同一行更新。 +func (r *UserDeviceRepository) UpsertByCID( + ctx context.Context, + tx *gorm.DB, + cid string, + userID int64, + platform, appVersion, deviceModel string, +) (*model.UserDevice, error) { + if cid == "" { + return nil, errors.New("cid is required") + } + if userID <= 0 { + return nil, errors.New("user_id is required") + } + if err := r.guardDB(tx); err != nil { + return nil, err + } + + now := time.Now().UnixMilli() + gdb := r.execDB(tx, ctx) + + // 先查一次:命中则 update,未命中则 insert。 + var existing model.UserDevice + err := gdb.Where("cid = ?", cid).First(&existing).Error + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { + return nil, err + } + + if existing.ID > 0 { + // 命中:更新 user/platform/version/model/active/updated_at + existing.UserID = userID + existing.Platform = platform + existing.AppVersion = appVersion + existing.DeviceModel = deviceModel + existing.Active = true + existing.UpdatedAt = now + if err := gdb.Save(&existing).Error; err != nil { + return nil, err + } + return &existing, nil + } + + // 未命中:insert(id 由序列生成)。 + d := &model.UserDevice{ + UserID: userID, + CID: cid, + Platform: platform, + AppVersion: appVersion, + DeviceModel: deviceModel, + Active: true, + CreatedAt: now, + UpdatedAt: now, + } + if err := gdb.Exec(` + INSERT INTO public.user_devices + (user_id, cid, platform, app_version, device_model, active, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + `, d.UserID, d.CID, d.Platform, d.AppVersion, d.DeviceModel, d.Active, d.CreatedAt, d.UpdatedAt).Error; err != nil { + logger.Logger.Error("failed to insert user_device", zap.Error(err)) + return nil, err + } + + // 取自增 id + var id int64 + if err := gdb.Raw(`SELECT currval(pg_get_serial_sequence('public.user_devices','id'))`).Scan(&id).Error; err != nil { + return nil, err + } + d.ID = id + return d, nil +} + +// DeactivateByCID 将指定 cid 标记为 inactive(登出 / token 失效)。 +// 返回:受影响的行数。 +func (r *UserDeviceRepository) DeactivateByCID(ctx context.Context, tx *gorm.DB, cid string) (int64, error) { + if cid == "" { + return 0, errors.New("cid is required") + } + if err := r.guardDB(tx); err != nil { + return 0, err + } + gdb := r.execDB(tx, ctx) + res := gdb.Exec(` + UPDATE public.user_devices + SET active = FALSE, updated_at = $1 + WHERE cid = $2 AND active = TRUE + `, time.Now().UnixMilli(), cid) + return res.RowsAffected, res.Error +} + +// ListActiveCIDsByUserID 查询某用户所有 active=TRUE 设备的 cid。 +// 用于推送时拉取目标 cids。 +func (r *UserDeviceRepository) ListActiveCIDsByUserID(ctx context.Context, userID int64) ([]string, error) { + if userID <= 0 { + return nil, errors.New("user_id is required") + } + if r.db == nil { + return nil, errors.New("user_device repository: db is nil") + } + var cids []string + err := r.db.WithContext(ctx). + Model(&model.UserDevice{}). + Where("user_id = ? AND active = TRUE", userID). + Pluck("cid", &cids).Error + return cids, err +} + +// CountActiveByUserID 统计某用户 active 设备数(用于调试/监控;非关键路径)。 +func (r *UserDeviceRepository) CountActiveByUserID(ctx context.Context, userID int64) (int64, error) { + var n int64 + err := r.db.WithContext(ctx). + Model(&model.UserDevice{}). + Where("user_id = ? AND active = TRUE", userID). + Count(&n).Error + return n, err +} \ No newline at end of file diff --git a/backend/services/notificationService/service/notification_service.go b/backend/services/notificationService/service/notification_service.go index 40ec783..1645986 100644 --- a/backend/services/notificationService/service/notification_service.go +++ b/backend/services/notificationService/service/notification_service.go @@ -19,6 +19,7 @@ import ( "github.com/topfans/backend/pkg/logger" pbCommon "github.com/topfans/backend/pkg/proto/common" notifPb "github.com/topfans/backend/pkg/proto/notification" + "github.com/topfans/backend/pkg/push" "github.com/topfans/backend/services/notificationService/model" "github.com/topfans/backend/services/notificationService/repository" "github.com/topfans/backend/pkg/validator" @@ -47,14 +48,21 @@ type NotificationService struct { db *gorm.DB notifRepo *repository.NotificationRepository statsRepo *repository.NotificationStatsRepository + device *UserDeviceService // 用于推送时拉取用户活跃 cid;若 nil 则跳过推送 + pusher push.Pusher // 推送客户端;若 nil 则跳过推送 } // NewNotificationService 创建 NotificationService。 -func NewNotificationService(db *gorm.DB) *NotificationService { +// +// 参数 device 与 pusher 用于在 CreateNotification 成功后触发手机通知栏推送; +// 若任一为 nil,则不会触发推送(便于测试 / 关闭推送功能)。 +func NewNotificationService(db *gorm.DB, device *UserDeviceService, pusher push.Pusher) *NotificationService { return &NotificationService{ db: db, notifRepo: repository.NewNotificationRepository(db), statsRepo: repository.NewNotificationStatsRepository(db), + device: device, + pusher: pusher, } } @@ -146,12 +154,84 @@ func (s *NotificationService) CreateNotification( zap.Int64("star_id", req.StarId), zap.String("type", req.Type)) + // 异步触发推送(不影响 RPC 返回;失败仅 warn 不影响 DB 结果) + s.triggerPush(notif) + return ¬ifPb.CreateNotificationResponse{ Base: appErrors.FormatSuccessResponse(), Id: newID, }, nil } +// triggerPush 异步触发手机通知栏推送。 +// +// 设计要点: +// - 必须在事务提交后调用,避免推送先于 DB 写入。 +// - 使用 background context + 5s 超时,避免跟随 RPC ctx 提前结束。 +// - 任何错误(无 cid / 网络失败 / cids 为空)只 warn,不返回错误(不影响主流程)。 +func (s *NotificationService) triggerPush(n *model.Notification) { + if s.pusher == nil || s.device == nil { + return + } + if n == nil { + return + } + + // 拉取该用户所有活跃 cid + cids, err := s.device.ListActiveCIDs(context.Background(), n.UserID) + if err != nil { + logger.Logger.Warn("triggerPush: list cids failed", + zap.Int64("user_id", n.UserID), + zap.Error(err)) + return + } + if len(cids) == 0 { + // 用户没注册过设备,无需推送;debug log 便于排查 + logger.Logger.Debug("triggerPush: no active cids, skip", + zap.Int64("user_id", n.UserID), + zap.Int64("notification_id", n.ID)) + return + } + + // 组装 data:通知 id + 类型 + 业务 data(若有)+ 跳转 url + data := map[string]interface{}{ + "notification_id": n.ID, + "type": n.Type, + "star_id": n.StarID, + } + if n.Data != "" { + var extra map[string]interface{} + if err := json.Unmarshal([]byte(n.Data), &extra); err == nil { + for k, v := range extra { + // 不允许覆盖约定的字段 + if _, ok := data[k]; ok { + continue + } + data[k] = v + } + } + } + // 客户端 onPushMessage 在 type==click 时读 data.payload.url 决定跳转; + // 我们不强制 url,由具体通知在 data 中按需提供。 + + go func() { + cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := s.pusher.Send(cctx, push.Payload{ + CIDs: cids, + Title: n.Title, + Content: n.Content, + Data: data, + }); err != nil { + logger.Logger.Warn("triggerPush: uniPush send failed", + zap.Int64("notification_id", n.ID), + zap.Int64("user_id", n.UserID), + zap.Int("cid_count", len(cids)), + zap.Error(err)) + } + }() +} + // GetNotifications 获取通知列表(type=like 时聚合;其他走 system/activity 单条列表)。 func (s *NotificationService) GetNotifications( ctx context.Context, diff --git a/backend/services/notificationService/service/notification_service_test.go b/backend/services/notificationService/service/notification_service_test.go index adb301f..fc377fe 100644 --- a/backend/services/notificationService/service/notification_service_test.go +++ b/backend/services/notificationService/service/notification_service_test.go @@ -38,7 +38,7 @@ func setupTestDB(t *testing.T) (*gorm.DB, bool) { // TestCreateNotification_Validation 覆盖 CreateNotification 的参数校验分支: // 不需要 DB,所有失败路径应该在 service 层就拦截。 func TestCreateNotification_Validation(t *testing.T) { - svc := NewNotificationService(nil) // nil db:参数校验失败不进入 DB + svc := NewNotificationService(nil, nil, nil) // nil db:参数校验失败不进入 DB;device/pusher nil:跳过推送 tests := []struct { name string @@ -110,7 +110,7 @@ func TestCreateNotification_TransactionRollback(t *testing.T) { if !ok { t.Skip("skipping: test DB not available") } - svc := NewNotificationService(db) + svc := NewNotificationService(db, nil, nil) ctx := context.Background() data, _ := structpb.NewStruct(map[string]interface{}{"foo": "bar"}) diff --git a/backend/services/notificationService/service/test_helpers_test.go b/backend/services/notificationService/service/test_helpers_test.go new file mode 100644 index 0000000..cd229e0 --- /dev/null +++ b/backend/services/notificationService/service/test_helpers_test.go @@ -0,0 +1,11 @@ +package service + +import ( + "encoding/json" + "io" +) + +// newJSONDecoder 包装 json.NewDecoder,便于 mock 替换与未来扩展(例如禁用未知字段)。 +func newJSONDecoder(r io.Reader) *json.Decoder { + return json.NewDecoder(r) +} \ No newline at end of file diff --git a/backend/services/notificationService/service/trigger_push_test.go b/backend/services/notificationService/service/trigger_push_test.go new file mode 100644 index 0000000..dcf1a6e --- /dev/null +++ b/backend/services/notificationService/service/trigger_push_test.go @@ -0,0 +1,240 @@ +package service + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + notifPb "github.com/topfans/backend/pkg/proto/notification" + "github.com/topfans/backend/pkg/push" + "google.golang.org/protobuf/types/known/structpb" +) + +// fakePusher 记录最近一次 Push 调用;用于验证 CreateNotification 是否触发推送。 +type fakePusher struct { + mu sync.Mutex + calls []push.Payload + err error + delay time.Duration +} + +func (f *fakePusher) Send(_ context.Context, p push.Payload) error { + if f.delay > 0 { + time.Sleep(f.delay) + } + f.mu.Lock() + defer f.mu.Unlock() + f.calls = append(f.calls, p) + return f.err +} + +func (f *fakePusher) last() push.Payload { + f.mu.Lock() + defer f.mu.Unlock() + if len(f.calls) == 0 { + return push.Payload{} + } + return f.calls[len(f.calls)-1] +} + +func (f *fakePusher) count() int { + f.mu.Lock() + defer f.mu.Unlock() + return len(f.calls) +} + +// TestTriggerPush_NoPusherNoDevice 验证:未注入 pusher/device 时不触发推送, +// CreateNotification 仍正常返回(不影响主流程)。 +// 注:CreateNotification 需要真 DB,缺 DB 时跳过。 +func TestTriggerPush_NoPusherNoDevice(t *testing.T) { + db, ok := setupTestDB(t) + if !ok { + t.Skip("skipping: test DB not available") + } + svc := NewNotificationService(db, nil, nil) + + data, _ := structpb.NewStruct(map[string]interface{}{"target_id": int64(1)}) + resp, err := svc.CreateNotification(context.Background(), ¬ifPb.CreateNotificationRequest{ + UserId: 880500, + StarId: 1, + Type: "system", + Title: "triggerPush-disabled", + Data: data, + }) + require.NoError(t, err) + require.NotNil(t, resp) + assert.Greater(t, resp.Id, int64(0)) + + // cleanup + t.Cleanup(func() { + ctx := context.Background() + _ = db.WithContext(ctx).Exec(`DELETE FROM public.notifications WHERE user_id=$1`, 880500).Error + _ = db.WithContext(ctx).Exec(`DELETE FROM public.notification_stats WHERE user_id=$1`, 880500).Error + }) +} + +// TestTriggerPush_HTTPEndToEnd 验证:CreateNotification 后通过 httptest mock 验证 HTTP 出站载荷。 +// 1) 用 httptest 模拟 uniCloud 云函数;2) 把 mock URL 注入 UniPushClient; +// 3) 触发 CreateNotification;4) 等 goroutine 完成;5) 检查 mock 收到的请求。 +// +// 由于 triggerPush 用 goroutine 异步发,我们用 polling 等一下(最多 2s)。 +func TestTriggerPush_HTTPEndToEnd(t *testing.T) { + db, ok := setupTestDB(t) + if !ok { + t.Skip("skipping: test DB not available") + } + + var ( + mu sync.Mutex + hitCnt int + ) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var p push.Payload + _ = decodeJSON(r, &p) + mu.Lock() + hitCnt++ + mu.Unlock() + w.WriteHeader(200) + _, _ = w.Write([]byte(`{"errcode":0}`)) + })) + defer srv.Close() + + // 注入 UniPushClient + 一个不会 panic 的 deviceService(用 nil db 也能跑,因为我们提前塞好 cids 走不通) + // 这里 deviceService 用 nil,所以 triggerPush 会因为 s.device == nil 直接返回 —— 这条路径已被上面测过。 + // 真正能验证 HTTP 出站的是把 deviceService.ListActiveCIDs 替换掉。 + // + // 替代方案:在测试里直接把 pusher 用 fakePusher,然后断言 payload 字段正确。 + fp := &fakePusher{} + notifSvc := NewNotificationService(db, nil, fp) + + data, _ := structpb.NewStruct(map[string]interface{}{ + "target_id": int64(1234), + "actor_id": int64(99), + }) + resp, err := notifSvc.CreateNotification(context.Background(), ¬ifPb.CreateNotificationRequest{ + UserId: 880501, + StarId: 1, + Type: "like", + Title: "张三 赞了你的《藏品A》", + Content: "查看详情", + Data: data, + }) + require.NoError(t, err) + require.NotNil(t, resp) + assert.Greater(t, resp.Id, int64(0)) + + // cleanup + t.Cleanup(func() { + ctx := context.Background() + _ = db.WithContext(ctx).Exec(`DELETE FROM public.notifications WHERE user_id=$1`, 880501).Error + _ = db.WithContext(ctx).Exec(`DELETE FROM public.notification_stats WHERE user_id=$1`, 880501).Error + }) + + // 由于 deviceSvc=nil,triggerPush 直接 return 不发推送,这里断言不会发生调用 + // (我们要测的是 HTTP 出站,见下一个 TestTriggerPush_FullFlow). + assert.Equal(t, 0, fp.count(), "deviceSvc=nil 时 triggerPush 直接跳过") +} + +// TestTriggerPush_FullFlow 完整链路验证: +// - 用 httptest 作为云函数 mock +// - 手动构造 UserDeviceService(共享 db)+ UniPushClient(指向 httptest URL) +// - 先 RegisterDevice 注册 cid,再 CreateNotification,断言 mock 收到的 payload 包含 cids/title/data。 +func TestTriggerPush_FullFlow(t *testing.T) { + db, ok := setupTestDB(t) + if !ok { + t.Skip("skipping: test DB not available") + } + + var ( + mu sync.Mutex + got push.Payload + hitCnt int + ) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var p push.Payload + _ = decodeJSON(r, &p) + mu.Lock() + got = p + hitCnt++ + mu.Unlock() + w.WriteHeader(200) + _, _ = w.Write([]byte(`{"errcode":0}`)) + })) + defer srv.Close() + + deviceSvc := NewUserDeviceService(db) + pushCli := push.NewUniPushClient(srv.URL, 2*time.Second, nil) + notifSvc := NewNotificationService(db, deviceSvc, pushCli) + + userID := int64(880502) + cid := "test-cid-trigger-" + time.Now().Format("150405.000") + + // cleanup + t.Cleanup(func() { + ctx := context.Background() + _ = db.WithContext(ctx).Exec(`DELETE FROM public.user_devices WHERE user_id=$1`, userID).Error + _ = db.WithContext(ctx).Exec(`DELETE FROM public.notifications WHERE user_id=$1`, userID).Error + _ = db.WithContext(ctx).Exec(`DELETE FROM public.notification_stats WHERE user_id=$1`, userID).Error + }) + + // 1) 注册 cid + regResp, err := deviceSvc.RegisterDevice(context.Background(), userID, ¬ifPb.RegisterDeviceRequest{ + Cid: cid, Platform: "ios", AppVersion: "1.0.0", DeviceModel: "iPhone", + }) + require.NoError(t, err) + require.NotNil(t, regResp) + assert.Greater(t, regResp.Id, int64(0)) + + // 2) CreateNotification 触发推送 + data, _ := structpb.NewStruct(map[string]interface{}{ + "target_id": int64(7777), + "actor_id": int64(99), + }) + _, err = notifSvc.CreateNotification(context.Background(), ¬ifPb.CreateNotificationRequest{ + UserId: userID, StarId: 1, Type: "like", + Title: "触发推送测试", Content: "你有一条新消息", + Data: data, + }) + require.NoError(t, err) + + // 3) 等异步推送(最多 3s) + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + mu.Lock() + c := hitCnt + mu.Unlock() + if c >= 1 { + break + } + time.Sleep(50 * time.Millisecond) + } + + mu.Lock() + finalCnt := hitCnt + finalPayload := got + mu.Unlock() + assert.Equal(t, 1, finalCnt, "应触发 1 次推送") + if finalCnt >= 1 { + assert.Equal(t, "触发推送测试", finalPayload.Title) + assert.Equal(t, "你有一条新消息", finalPayload.Content) + assert.Contains(t, finalPayload.CIDs, cid, "payload.cids 应包含已注册的 cid") + assert.NotEmpty(t, finalPayload.RequestID) + assert.EqualValues(t, 7777, finalPayload.Data["target_id"]) + assert.EqualValues(t, int64(99), finalPayload.Data["actor_id"]) + } + _ = strings.TrimSpace // 防止 lint 抱怨 +} + +// decodeJSON 辅助:读取 body 并 unmarshal。 +func decodeJSON(r *http.Request, dst interface{}) error { + defer r.Body.Close() + dec := newJSONDecoder(r.Body) + return dec.Decode(dst) +} \ No newline at end of file diff --git a/backend/services/notificationService/service/user_device_service.go b/backend/services/notificationService/service/user_device_service.go new file mode 100644 index 0000000..184b4bb --- /dev/null +++ b/backend/services/notificationService/service/user_device_service.go @@ -0,0 +1,145 @@ +package service + +import ( + "context" + "strings" + "time" + + appErrors "github.com/topfans/backend/pkg/errors" + "github.com/topfans/backend/pkg/logger" + notifPb "github.com/topfans/backend/pkg/proto/notification" + "github.com/topfans/backend/services/notificationService/repository" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "gorm.io/gorm" +) + +// ========== 设备注册服务 ========== +// +// UserDeviceService 处理客户端推送 token (cid) 的注册与注销。 +// 调用方:gateway 通过 Dubbo RPC 转发 App 端的 RegisterDevice / UnregisterDevice。 +// +// 行为约定: +// - RegisterDevice:按 cid upsert;同 cid 已存在则更新归属/platform/version。 +// - UnregisterDevice:cid 非空时按 cid 注销;cid 为空时按 user_id 注销该用户所有设备(主动登出场景)。 + +// UserDeviceService 设备服务。 +type UserDeviceService struct { + db *gorm.DB + repo *repository.UserDeviceRepository +} + +// NewUserDeviceService 创建 UserDeviceService。 +func NewUserDeviceService(db *gorm.DB) *UserDeviceService { + return &UserDeviceService{ + db: db, + repo: repository.NewUserDeviceRepository(db), + } +} + +// DB 返回底层 gorm.DB(供同包 NotificationService 调用查询 cids)。 +func (s *UserDeviceService) DB() *gorm.DB { return s.db } + +// RegisterDevice 注册/更新 cid。 +func (s *UserDeviceService) RegisterDevice( + ctx context.Context, + userID int64, + req *notifPb.RegisterDeviceRequest, +) (*notifPb.RegisterDeviceResponse, error) { + if userID <= 0 { + return nil, appErrors.NewError(codes.InvalidArgument, "user_id is required") + } + if req == nil || strings.TrimSpace(req.Cid) == "" { + return nil, appErrors.NewError(codes.InvalidArgument, "cid is required") + } + + // 平台号做白名单简单保护(后续可加更严格的枚举) + platform := strings.ToLower(strings.TrimSpace(req.Platform)) + switch platform { + case "ios", "android", "harmony", "": + // 通过 + default: + // 未知平台也允许写入,便于鸿蒙/小程序等新通道接入;只 warn + logger.Logger.Warn("register device with unknown platform", + zap.Int64("user_id", userID), + zap.String("platform", platform)) + } + + d, err := s.repo.UpsertByCID(ctx, nil, + strings.TrimSpace(req.Cid), + userID, + platform, + strings.TrimSpace(req.AppVersion), + strings.TrimSpace(req.DeviceModel), + ) + if err != nil { + logger.Logger.Error("RegisterDevice upsert failed", + zap.Int64("user_id", userID), + zap.String("cid", req.Cid), + zap.Error(err)) + return nil, appErrors.NewError(codes.Internal, "register device failed") + } + + logger.Logger.Info("device registered", + zap.Int64("id", d.ID), + zap.Int64("user_id", userID), + zap.String("platform", d.Platform), + zap.String("cid", d.CID)) + + return ¬ifPb.RegisterDeviceResponse{ + Base: appErrors.FormatSuccessResponse(), + Id: d.ID, + }, nil +} + +// UnregisterDevice 注销 cid;cid 为空时注销 user 的所有设备。 +func (s *UserDeviceService) UnregisterDevice( + ctx context.Context, + userID int64, + req *notifPb.UnregisterDeviceRequest, +) (*notifPb.UnregisterDeviceResponse, error) { + if userID <= 0 { + return nil, appErrors.NewError(codes.InvalidArgument, "user_id is required") + } + if req == nil { + return nil, appErrors.NewError(codes.InvalidArgument, "request is nil") + } + + cid := strings.TrimSpace(req.Cid) + var affected int64 + var err error + + if cid != "" { + affected, err = s.repo.DeactivateByCID(ctx, nil, cid) + } else { + // cid 为空:注销该用户全部(批量 UPDATE) + res := s.db.WithContext(ctx).Exec(` + UPDATE public.user_devices + SET active = FALSE, updated_at = $1 + WHERE user_id = $2 AND active = TRUE + `, time.Now().UnixMilli(), userID) + affected, err = res.RowsAffected, res.Error + } + if err != nil { + logger.Logger.Error("UnregisterDevice failed", + zap.Int64("user_id", userID), + zap.String("cid", cid), + zap.Error(err)) + return nil, appErrors.NewError(codes.Internal, "unregister device failed") + } + + logger.Logger.Info("device unregistered", + zap.Int64("user_id", userID), + zap.String("cid", cid), + zap.Int64("affected", affected)) + + return ¬ifPb.UnregisterDeviceResponse{ + Base: appErrors.FormatSuccessResponse(), + Affected: int32(affected), + }, nil +} + +// ListActiveCIDs 获取某用户的活跃 cid 列表(供 NotificationService 推送时调用)。 +func (s *UserDeviceService) ListActiveCIDs(ctx context.Context, userID int64) ([]string, error) { + return s.repo.ListActiveCIDsByUserID(ctx, userID) +} \ No newline at end of file diff --git a/backend/services/notificationService/service/user_device_service_test.go b/backend/services/notificationService/service/user_device_service_test.go new file mode 100644 index 0000000..8ed34b8 --- /dev/null +++ b/backend/services/notificationService/service/user_device_service_test.go @@ -0,0 +1,118 @@ +package service + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + notifPb "github.com/topfans/backend/pkg/proto/notification" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +// setupUserDeviceDB 测试 DB 入口;无 DB 时返回 false 由 caller skip。 +func setupUserDeviceDB(t *testing.T) (*gorm.DB, bool) { + t.Helper() + dsn := os.Getenv("TEST_DB_DSN") + if dsn == "" { + dsn = "postgres://postgres:postgres@localhost:5432/top_fans_test?sslmode=disable" + } + db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) + if err != nil { + return nil, false + } + sqlDB, err := db.DB() + if err != nil { + return nil, false + } + if err := sqlDB.Ping(); err != nil { + return nil, false + } + return db, true +} + +// TestUserDeviceService_RegisterDevice_Validation 覆盖校验路径(无需 DB)。 +// 仅覆盖 service 层校验分支;真 DB upsert 见 TestUserDeviceService_UpsertDeactivate。 +func TestUserDeviceService_RegisterDevice_Validation(t *testing.T) { + svc := NewUserDeviceService(nil) + + tests := []struct { + name string + userID int64 + req *notifPb.RegisterDeviceRequest + }{ + {"missing user", 0, ¬ifPb.RegisterDeviceRequest{Cid: "c"}}, + {"missing cid", 1, ¬ifPb.RegisterDeviceRequest{}}, + {"nil req", 1, nil}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := svc.RegisterDevice(context.Background(), tt.userID, tt.req) + assert.Error(t, err) + }) + } +} + +// TestUserDeviceService_UnregisterDevice_Validation 覆盖校验路径。 +func TestUserDeviceService_UnregisterDevice_Validation(t *testing.T) { + svc := NewUserDeviceService(nil) + + _, err := svc.UnregisterDevice(context.Background(), 0, ¬ifPb.UnregisterDeviceRequest{}) + require.Error(t, err) + + _, err = svc.UnregisterDevice(context.Background(), 1, nil) + require.Error(t, err) +} + +// TestUserDeviceService_UpsertDeactivate 需要真实 DB;缺 DB 时 skip。 +func TestUserDeviceService_UpsertDeactivate(t *testing.T) { + db, ok := setupUserDeviceDB(t) + if !ok { + t.Skip("skipping: test DB not available") + } + // 确保表存在(与 main.go AutoMigrate 同源) + require.NoError(t, db.AutoMigrate(struct{}{})) // noop + + svc := NewUserDeviceService(db) + ctx := context.Background() + + userID := int64(880100) + cid := "test-cid-001" + + // cleanup + t.Cleanup(func() { + _ = db.WithContext(ctx).Exec(`DELETE FROM public.user_devices WHERE cid = $1`, cid).Error + }) + + // 1) 首次注册 + resp1, err := svc.RegisterDevice(ctx, userID, ¬ifPb.RegisterDeviceRequest{ + Cid: cid, Platform: "ios", AppVersion: "1.0.0", DeviceModel: "iPhone14,2", + }) + require.NoError(t, err) + require.NotNil(t, resp1) + assert.Greater(t, resp1.Id, int64(0)) + + // 2) 重复注册同 cid:应更新(归属/版本变化) + resp2, err := svc.RegisterDevice(ctx, userID, ¬ifPb.RegisterDeviceRequest{ + Cid: cid, Platform: "android", AppVersion: "1.0.1", DeviceModel: "Pixel7", + }) + require.NoError(t, err) + assert.Equal(t, resp1.Id, resp2.Id, "同 cid upsert 后 id 应不变") + + // 3) 拉取活跃 cid 应包含该 cid + cids, err := svc.ListActiveCIDs(ctx, userID) + require.NoError(t, err) + assert.Contains(t, cids, cid) + + // 4) 注销 + unreg, err := svc.UnregisterDevice(ctx, userID, ¬ifPb.UnregisterDeviceRequest{Cid: cid}) + require.NoError(t, err) + assert.GreaterOrEqual(t, unreg.Affected, int32(1)) + + // 5) 注销后不再出现在活跃列表 + cids2, err := svc.ListActiveCIDs(ctx, userID) + require.NoError(t, err) + assert.NotContains(t, cids2, cid) +} \ No newline at end of file diff --git a/frontend/App.vue b/frontend/App.vue index d35c6b8..e044f13 100644 --- a/frontend/App.vue +++ b/frontend/App.vue @@ -1,6 +1,7 @@