topfans/frontend/utils/socket/SocketManager.js
Lenticular Studio Agent fd5e317e05 merge: 解决 docker-compose 和 SocketManager 冲突
Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-23 23:06:24 +08:00

404 lines
13 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { getWebSocketBaseUrl } from '../api'
/**
* WebSocket 管理器
* 支持多个 WebSocket 连接,自动管理鉴权、心跳、重连
*/
class SocketManager {
constructor(options = {}) {
this.serviceName = options.serviceName || 'unknown'
this.baseUrl = 'ws://127.0.0.1:8080' // 占位符,等 connect() 时再获取真实地址
this.token = null
this.socket = null
this.heartbeatTimer = null
this.reconnectTimer = null
this.reconnectInterval = options.reconnectInterval || 3000
this.heartbeatInterval = options.heartbeatInterval || 30000
this.maxReconnectAttempts = options.maxReconnectAttempts || 5
this.reconnectAttempts = 0
// 状态
this.isConnected = false
this.isAuthed = false
this.isClosing = false // 标记是否主动关闭
// 事件处理器
this.eventHandlers = {
'connect': [],
'disconnect': [],
'auth_success': [],
'auth_fail': [],
'error': [],
'message': [] // 通用消息处理
}
// 子类可覆盖的消息类型处理
this.messageHandlers = {}
}
/**
* 连接到 WebSocket 服务器
* - 若已有 token 且与新 token 不一致:强制关闭旧连接、清空状态、让子类清理订阅缓存,
* 避免上一个用户(以旧 token 鉴权)的 WS 被新用户复用,导致"另一个账户收不到实时推送"。
*/
async connect(token, path) {
const tokenChanged = !!this.token && this.token !== token
if (tokenChanged) {
console.log(`[${this.serviceName}] Token changed, force reconnecting (old token len=${this.token.length}, new token len=${token.length})`)
this._hardReset()
}
this.token = token
this.path = path
this.reconnectAttempts = 0
this.isClosing = false // 重置关闭状态,允许重连
// 异步获取真实 WebSocket 地址(等待环境检测完成)
this.baseUrl = await getWebSocketBaseUrl()
console.log(`[${this.serviceName}] WebSocket base URL: ${this.baseUrl}`)
this._doConnect()
}
/**
* 硬重置:关闭现有连接、清空状态。子类可覆写 _resetSubscriptions 清理自己持有的状态(如订阅列表)。
*/
_hardReset() {
// 先标记 isClosing=true 防止 onClose 回调里触发 _tryReconnect
this.isClosing = true
if (this.socket) {
try {
if (typeof this.socket.close === 'function') {
this.socket.close()
} else if (typeof this.socket.complete === 'function') {
this.socket.complete()
}
} catch (err) {
console.warn(`[${this.serviceName}] Close old socket error:`, err)
}
}
// _cleanup 清掉 isConnected/isAuthed/heartbeat/reconnectTimer
this._cleanup()
this.socket = null
this.isClosing = false
// 子类钩子:清掉自己持有的订阅/缓存(如 ActivitySocket._topics)
if (typeof this._resetSubscriptions === 'function') {
try {
this._resetSubscriptions()
} catch (err) {
console.warn(`[${this.serviceName}] _resetSubscriptions error:`, err)
}
}
}
_doConnect() {
// 如果已有连接且已连接,不重复连接
if (this.socket && this.isConnected) {
console.log(`[${this.serviceName}] Already connected (${this.isConnected}), skip reconnect`)
return
}
// 防止多组件在同一 tick 里都调 connect() 导致重复创建 socket
if (this._isConnecting) {
console.log(`[${this.serviceName}] Connect already in progress, skip`)
return
}
this._isConnecting = true
console.log(`[${this.serviceName}] _doConnect called, clearing old socket`)
// 清理旧连接
if (this.socket) {
this.socket = null
}
this.isConnected = false
const url = `${this.baseUrl}${this.path}?token=Bearer_${this.token}`
console.log(`[${this.serviceName}] Connecting to ${url}`)
// UniApp: connectSocket 是异步的,需要处理错误
try {
this.socket = uni.connectSocket({
url,
fail: (err) => {
console.error(`[${this.serviceName}] connectSocket fail:`, err)
this._emit('error', { code: 'CONNECT_FAILED', message: err.errMsg || '连接失败' })
}
})
if (!this.socket) {
console.error(`[${this.serviceName}] socket is null`)
this._isConnecting = false
return
}
console.log(`[${this.serviceName}] SocketTask created, checking methods:`, Object.keys(this.socket))
this._setupListeners()
} catch (err) {
console.error(`[${this.serviceName}] Exception during connect:`, err)
this._isConnecting = false
}
}
_setupListeners() {
// 检查 socket 是否有效
if (!this.socket) {
console.error(`[${this.serviceName}] socket is null in _setupListeners`)
return
}
// UniApp SocketTask API: onOpen/onClose/onError/onMessage 是设置回调的方法
// 也可能是事件名是 'open', 'close', 'error', 'message'
const socket = this.socket
const self = this
// 连接打开
if (typeof socket.onOpen === 'function') {
socket.onOpen(function() {
console.log(`[${self.serviceName}] WebSocket connected`)
self.isConnected = true
self._isConnecting = false // 连接成功,允许后续重连
// 清除重连计时器
if (self.reconnectTimer) {
clearTimeout(self.reconnectTimer)
self.reconnectTimer = null
}
self.reconnectAttempts = 0
self._emit('connect')
})
} else if (typeof socket.onopen === 'function') {
// 标准 WebSocket 风格
socket.onopen(function() {
console.log(`[${self.serviceName}] WebSocket connected`)
self.isConnected = true
self._isConnecting = false // 连接成功,允许后续重连
// 清除重连计时器
if (self.reconnectTimer) {
clearTimeout(self.reconnectTimer)
self.reconnectTimer = null
}
self.reconnectAttempts = 0
self._emit('connect')
})
} else {
console.warn(`[${self.serviceName}] Unknown socket API, socket keys:`, Object.keys(socket))
}
// 接收消息
if (typeof socket.onMessage === 'function') {
socket.onMessage(function(event) {
const data = JSON.parse(event.data)
self._handleMessage(data)
})
} else if (typeof socket.onmessage === 'function') {
socket.onmessage(function(event) {
const data = JSON.parse(event.data)
self._handleMessage(data)
})
}
// 连接关闭
if (typeof socket.onClose === 'function') {
socket.onClose(function() {
console.log(`[${self.serviceName}] WebSocket closed`)
self._isConnecting = false // 连接关闭,允许后续重连
self._cleanup()
self._emit('disconnect')
self._tryReconnect()
})
} else if (typeof socket.onclose === 'function') {
socket.onclose(function() {
console.log(`[${self.serviceName}] WebSocket closed`)
self._isConnecting = false // 连接关闭,允许后续重连
self._cleanup()
self._emit('disconnect')
self._tryReconnect()
})
}
// 连接错误
var handleSocketError = function(err) {
console.error(`[${self.serviceName}] WebSocket error:`, err)
// 检查是否是鉴权相关的错误401/403
var errMsg = (err && (err.errMsg || err.message || '')).toLowerCase()
if (errMsg.indexOf('auth') !== -1 || errMsg.indexOf('reject') !== -1 || errMsg.indexOf('401') !== -1) {
console.warn(`[${self.serviceName}] Connection rejected (auth failure), clearing token`)
self._emit('auth_fail', err)
self.close()
return
}
self._emit('error', err)
}
if (typeof socket.onError === 'function') {
socket.onError(handleSocketError)
} else if (typeof socket.onerror === 'function') {
socket.onerror(handleSocketError)
}
}
_handleMessage(data) {
// 触发通用消息事件
this._emit('message', data)
// 根据消息类型处理
const { type, action } = data
// 1. 鉴权响应(通用)
if (type === 'auth_response') {
if (data.success) {
this.isAuthed = true
this._emit('auth_success', data)
this._startHeartbeat()
} else {
this.isAuthed = false
this._emit('auth_fail', data)
this.close()
}
return
}
// 2. 心跳响应(通用)
if (type === 'pong') {
console.log(`[${this.serviceName}] Heartbeat received`)
return
}
// 3. 错误响应(通用)
if (type === 'error') {
this._emit('error', data)
return
}
// 4. 服务特定消息类型处理
const handler = this.messageHandlers[type] || this.messageHandlers[action]
if (handler) {
handler(data)
}
}
/**
* 发送消息
*/
send(data) {
if (!this.socket || !this.isConnected) {
console.warn(`[${this.serviceName}] Socket not connected`)
return false
}
// UniApp: socket.send({ data, success, fail })
if (typeof this.socket.send === 'function') {
this.socket.send({
data: JSON.stringify(data),
fail: (err) => {
console.error(`[${this.serviceName}] send fail:`, err)
}
})
} else {
console.warn(`[${this.serviceName}] socket.send is not a function`)
}
return true
}
/**
* 发送心跳
*/
_startHeartbeat() {
this._stopHeartbeat()
this.heartbeatTimer = setInterval(() => {
if (this.isConnected) {
this.send({ action: 'ping' })
}
}, this.heartbeatInterval)
}
_stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer)
this.heartbeatTimer = null
}
}
/**
* 重连机制
*/
_tryReconnect() {
if (this.isClosing) {
console.log(`[${this.serviceName}] Closing intentionally, skip reconnect`)
return
}
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.warn(`[${this.serviceName}] Max reconnect attempts reached`)
this._emit('error', { code: 'RECONNECT_FAILED', message: '重连次数已达上限' })
return
}
this.reconnectAttempts++
console.log(`[${this.serviceName}] Auto reconnecting... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
this.reconnectTimer = setTimeout(() => {
this._doConnect()
}, this.reconnectInterval)
}
_cleanup() {
this._stopHeartbeat()
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
this.isConnected = false
this.isAuthed = false
}
/**
* 关闭连接
*/
close() {
this.isClosing = true
this._cleanup()
if (this.socket) {
// UniApp: socket.close() 可能不存在,用 complete 代替
if (typeof this.socket.close === 'function') {
this.socket.close()
} else if (typeof this.socket.complete === 'function') {
this.socket.complete()
}
this.socket = null
}
}
// ===== 事件系统 =====
on(event, handler) {
if (this.eventHandlers[event]) {
this.eventHandlers[event].push(handler)
}
return () => this.off(event, handler) // 返回取消订阅函数
}
off(event, handler) {
if (this.eventHandlers[event]) {
this.eventHandlers[event] = this.eventHandlers[event].filter(h => h !== handler)
}
}
_emit(event, data) {
if (this.eventHandlers[event]) {
this.eventHandlers[event].forEach(handler => handler(data))
}
}
// ===== 订阅特定消息类型 =====
/**
* 注册特定消息类型的处理器
* @param {string} type 消息类型或 action
* @param {function} handler 处理函数
*/
registerHandler(type, handler) {
this.messageHandlers[type] = handler
}
}
/** 导出 */
export default SocketManager