From 02c0da037f1f4ce50b2c5d8fc60ac1fbf0bde14e Mon Sep 17 00:00:00 2001 From: SunWuyuan Date: Sat, 25 Oct 2025 17:10:22 +0800 Subject: [PATCH] feat: integrate Socket.IO for real-time updates and online device management - Added Socket.IO dependency to enable real-time communication. - Initialized Socket.IO in the server and bound it to the HTTP server. - Implemented functionality to allow clients to join device channels using KV tokens. - Added endpoints to retrieve online devices and broadcast key changes. - Enhanced existing routes to include device names in responses. - Implemented broadcasting of key changes for KV operations. - Updated documentation to reflect the new Socket.IO integration and usage. --- SOCKET_API.md | 565 +++++++++++++++++++++++++++++++++++++++++++++ bin/www | 4 + package.json | 1 + pnpm-lock.yaml | 134 ++++++++++- routes/apps.js | 2 + routes/device.js | 35 ++- routes/kv-token.js | 36 ++- utils/socket.js | 206 +++++++++++++++++ 8 files changed, 978 insertions(+), 5 deletions(-) create mode 100644 SOCKET_API.md create mode 100644 utils/socket.js diff --git a/SOCKET_API.md b/SOCKET_API.md new file mode 100644 index 0000000..a907cb2 --- /dev/null +++ b/SOCKET_API.md @@ -0,0 +1,565 @@ +# Socket.IO 实时频道接口文档(前端) + +## 概述 + +ClassworksKV 提供基于 Socket.IO 的实时键值变更通知服务。前端使用 **KV token**(应用安装 token)加入频道,服务端会自动将 token 映射到对应设备的 uuid 房间。**同一设备的不同 token 会被归入同一频道**,因此多个客户端/应用可以共享实时更新。 + +**重要变更**:不再支持直接使用 uuid 加入频道,所有连接必须使用有效的 KV token。 + +## 安装依赖 + +前端项目安装 Socket.IO 客户端: + +```bash +# npm +npm install socket.io-client + +# pnpm +pnpm add socket.io-client + +# yarn +yarn add socket.io-client +``` + +## 连接服务器 + +### 基础连接 + +```typescript +import { io, Socket } from 'socket.io-client'; + +const SERVER_URL = 'http://localhost:3000'; // 替换为实际服务器地址 + +const socket: Socket = io(SERVER_URL, { + transports: ['websocket'], +}); +``` + +### 连接时自动加入频道(推荐) + +在连接握手时通过 query 参数传入 token,自动加入对应设备频道: + +```typescript +const socket = io(SERVER_URL, { + transports: ['websocket'], + query: { + token: '', // 或使用 apptoken 参数 + }, +}); + +// 监听加入成功 +socket.on('joined', (info) => { + console.log('已加入频道:', info); + // { by: 'token', uuid: 'device-uuid-xxx' } +}); + +// 监听加入失败 +socket.on('join-error', (error) => { + console.error('加入频道失败:', error); + // { by: 'token', reason: 'invalid_token' } +}); +``` + +## 事件接口 + +### 1. 客户端发送的事件 + +#### `join-token` - 使用 token 加入频道 + +连接后按需加入频道。 + +**载荷格式:** +```typescript +{ + token?: string; // KV token(二选一) + apptoken?: string; // 或使用 apptoken 字段 +} +``` + +**示例:** +```typescript +socket.emit('join-token', { token: '' }); +``` + +--- + +#### `leave-token` - 使用 token 离开频道 + +离开指定 token 对应的设备频道。 + +**载荷格式:** +```typescript +{ + token?: string; + apptoken?: string; +} +``` + +**示例:** +```typescript +socket.emit('leave-token', { token: '' }); +``` + +--- + +#### `leave-all` - 离开所有频道 + +断开前清理,离开该连接加入的所有设备频道。 + +**载荷:** 无 + +**示例:** +```typescript +socket.emit('leave-all'); +``` + +--- + +### 2. 服务端发送的事件 + +#### `joined` - 加入成功通知 + +当成功加入频道后,服务端会发送此事件。 + +**载荷格式:** +```typescript +{ + by: 'token'; + uuid: string; // 设备 uuid(用于调试/日志) +} +``` + +**示例:** +```typescript +socket.on('joined', (info) => { + console.log(`成功加入设备 ${info.uuid} 的频道`); +}); +``` + +--- + +#### `join-error` - 加入失败通知 + +token 无效或查询失败时触发。 + +**载荷格式:** +```typescript +{ + by: 'token'; + reason: 'invalid_token'; // 失败原因 +} +``` + +**示例:** +```typescript +socket.on('join-error', (error) => { + console.error('Token 无效,无法加入频道'); +}); +``` + +--- + +#### `kv-key-changed` - 键值变更广播 + +当设备下的 KV 键被创建/更新/删除时,向该设备频道内所有连接广播此事件。 + +**载荷格式:** +```typescript +{ + uuid: string; // 设备 uuid + key: string; // 变更的键名 + action: 'upsert' | 'delete'; // 操作类型 + + // 仅 action='upsert' 时存在: + created?: boolean; // 是否首次创建 + updatedAt?: string; // 更新时间(ISO 8601) + batch?: boolean; // 是否为批量导入中的单条 + + // 仅 action='delete' 时存在: + deletedAt?: string; // 删除时间(ISO 8601) +} +``` + +**示例:** +```typescript +socket.on('kv-key-changed', (msg) => { + if (msg.action === 'upsert') { + console.log(`键 ${msg.key} 已${msg.created ? '创建' : '更新'}`); + // 刷新本地缓存或重新获取数据 + } else if (msg.action === 'delete') { + console.log(`键 ${msg.key} 已删除`); + // 从本地缓存移除 + } +}); +``` + +**载荷示例:** + +- 新建/更新键: + ```json + { + "uuid": "device-001", + "key": "settings/theme", + "action": "upsert", + "created": false, + "updatedAt": "2025-10-25T08:30:00.000Z" + } + ``` + +- 删除键: + ```json + { + "uuid": "device-001", + "key": "settings/theme", + "action": "delete", + "deletedAt": "2025-10-25T08:35:00.000Z" + } + ``` + +- 批量导入中的单条: + ```json + { + "uuid": "device-001", + "key": "config/version", + "action": "upsert", + "created": true, + "updatedAt": "2025-10-25T08:40:00.000Z", + "batch": true + } + ``` + +--- + +#### `device-joined` - 设备频道连接数变化(可选) + +当有新连接加入某设备频道时广播,用于显示在线人数。 + +**载荷格式:** +```typescript +{ + uuid: string; // 设备 uuid + connections: number; // 当前连接数 +} +``` + +**示例:** +```typescript +socket.on('device-joined', (info) => { + console.log(`设备 ${info.uuid} 当前有 ${info.connections} 个连接`); +}); +``` + +--- + +## 完整使用示例 + +### React Hook 封装 + +```typescript +import { useEffect, useRef } from 'react'; +import { io, Socket } from 'socket.io-client'; + +const SERVER_URL = import.meta.env.VITE_SERVER_URL || 'http://localhost:3000'; + +interface KvKeyChange { + uuid: string; + key: string; + action: 'upsert' | 'delete'; + created?: boolean; + updatedAt?: string; + deletedAt?: string; + batch?: boolean; +} + +export function useKvChannel( + token: string | null, + onKeyChanged?: (event: KvKeyChange) => void +) { + const socketRef = useRef(null); + + useEffect(() => { + if (!token) return; + + // 创建连接并加入频道 + const socket = io(SERVER_URL, { + transports: ['websocket'], + query: { token }, + }); + + socket.on('joined', (info) => { + console.log('已加入设备频道:', info.uuid); + }); + + socket.on('join-error', (err) => { + console.error('加入频道失败:', err.reason); + }); + + socket.on('kv-key-changed', (msg: KvKeyChange) => { + onKeyChanged?.(msg); + }); + + socketRef.current = socket; + + return () => { + socket.emit('leave-all'); + socket.close(); + }; + }, [token]); + + return socketRef.current; +} +``` + +### Vue Composable 封装 + +```typescript +import { ref, watch, onUnmounted } from 'vue'; +import { io, Socket } from 'socket.io-client'; + +const SERVER_URL = import.meta.env.VITE_SERVER_URL || 'http://localhost:3000'; + +export function useKvChannel(token: Ref) { + const socket = ref(null); + const isConnected = ref(false); + const deviceUuid = ref(null); + + watch(token, (newToken) => { + // 清理旧连接 + if (socket.value) { + socket.value.emit('leave-all'); + socket.value.close(); + socket.value = null; + } + + if (!newToken) return; + + // 创建新连接 + const s = io(SERVER_URL, { + transports: ['websocket'], + query: { token: newToken }, + }); + + s.on('connect', () => { + isConnected.value = true; + }); + + s.on('disconnect', () => { + isConnected.value = false; + }); + + s.on('joined', (info) => { + deviceUuid.value = info.uuid; + console.log('已加入设备频道:', info.uuid); + }); + + s.on('join-error', (err) => { + console.error('加入失败:', err.reason); + }); + + socket.value = s; + }, { immediate: true }); + + onUnmounted(() => { + if (socket.value) { + socket.value.emit('leave-all'); + socket.value.close(); + } + }); + + return { socket, isConnected, deviceUuid }; +} +``` + +### 使用示例(React) + +```tsx +import { useKvChannel } from './hooks/useKvChannel'; + +function MyComponent() { + const token = localStorage.getItem('kv-token'); + + useKvChannel(token, (event) => { + console.log('KV 变更:', event); + + if (event.action === 'upsert') { + // 更新本地状态或重新获取数据 + fetchKeyValue(event.key); + } else if (event.action === 'delete') { + // 从本地移除 + removeFromCache(event.key); + } + }); + + return
实时监听中...
; +} +``` + +--- + +## REST API:查询在线设备 + +除了 Socket.IO 实时事件,还提供 HTTP 接口查询当前在线设备列表。 + +### `GET /devices/online` + +**响应格式:** +```typescript +{ + success: true; + devices: Array<{ + uuid: string; // 设备 uuid + connections: number; // 当前连接数 + name: string | null; // 设备名称(若已设置) + }>; +} +``` + +**示例:** +```typescript +const response = await fetch(`${SERVER_URL}/devices/online`); +const data = await response.json(); + +console.log('在线设备:', data.devices); +// [{ uuid: 'device-001', connections: 3, name: 'My Device' }, ...] +``` + +--- + +## 获取 KV Token + +前端需要先获取有效的 KV token 才能加入频道。Token 通过以下接口获取: + +### 安装应用获取 token + +**接口:** `POST /apps/devices/:uuid/install/:appId` + +**认证:** 需要设备 UUID 认证(密码或账户 JWT) + +**响应包含:** +```typescript +{ + id: string; + appId: string; + token: string; // 用于 KV 操作和加入频道 + note: string | null; + name: string | null; // 等同于 note,便于展示 + installedAt: string; +} +``` + +### 列出设备已有的 token + +**接口:** `GET /apps/tokens?uuid=` + +**响应:** +```typescript +{ + success: true; + tokens: Array<{ + id: string; + token: string; + appId: string; + installedAt: string; + note: string | null; + name: string | null; // 等同于 note + }>; + deviceUuid: string; +} +``` + +--- + +## 注意事项与最佳实践 + +1. **Token 必需**:所有连接必须提供有效的 KV token,不再支持直接使用 uuid。 + +2. **频道归并**:同一设备的不同 token 会自动归入同一房间(以设备 uuid 为房间名),因此多个应用/客户端可以共享实时更新。 + +3. **连接管理**: + - 组件卸载时调用 `leave-all` 或 `leave-token` 清理连接 + - 避免频繁创建/销毁连接,建议在应用全局维护单个 socket 实例 + +4. **重连处理**: + - Socket.IO 客户端内置自动重连 + - 在 `connect` 事件后重新 emit `join-token` 确保重连后仍在频道内(或在握手时传 token 自动加入) + +5. **CORS 配置**: + - 服务端通过环境变量 `FRONTEND_URL` 控制允许的来源 + - 未设置时默认为 `*`(允许所有来源) + - 生产环境建议设置为前端实际域名 + +6. **错误处理**: + - 监听 `join-error` 事件处理 token 无效情况 + - 监听 `connect_error` 处理网络连接失败 + +7. **性能优化**: + - 批量导入时会逐条广播,前端可根据 `batch: true` 标记做去抖处理 + - 建议在本地维护 KV 缓存,收到变更通知时增量更新而非全量刷新 + +--- + +## 环境变量配置 + +服务端需要配置以下环境变量: + +```env +# Socket.IO CORS 允许的来源 +FRONTEND_URL=http://localhost:5173 + +# 服务器端口(可选,默认 3000) +PORT=3000 +``` + +--- + +## 常见问题 + +### Q: 如何支持多个设备? + +A: 对每个设备的 token 分别调用 `join-token`,或在连接时传入一个 token,后续通过事件加入其他设备。 + +```typescript +socket.emit('join-token', { token: token1 }); +socket.emit('join-token', { token: token2 }); +``` + +### Q: 广播延迟有多大? + +A: 通常在毫秒级,取决于网络状况。WebSocket 连接建立后,广播几乎实时。 + +### Q: Token 过期怎么办? + +A: Token 本身不会过期,除非手动删除应用安装记录。如收到 `join-error`,检查 token 是否已被卸载。 + +### Q: 可以在 Node.js 后端使用吗? + +A: 可以,使用相同的 socket.io-client 包,接口完全一致。 + +--- + +## 更新日志 + +### v1.1.0 (2025-10-25) + +**破坏性变更:** +- 移除直接使用 uuid 加入频道的接口(`join-device` / `leave-device`) +- 现在必须使用 KV token 通过 `join-token` 或握手 query 加入 + +**新增:** +- `leave-all` 事件:离开所有已加入的频道 +- 握手时支持 `token` 和 `apptoken` 两种参数名 + +**改进:** +- 同一设备的不同 token 自动归入同一房间 +- 优化在线设备计数准确性 + +--- + +## 技术支持 + +如有问题,请查阅: +- 服务端源码:`utils/socket.js` +- KV 路由:`routes/kv-token.js` +- 设备管理:`routes/device.js` + +或提交 Issue 到项目仓库。 diff --git a/bin/www b/bin/www index 01f4156..2546a83 100644 --- a/bin/www +++ b/bin/www @@ -6,6 +6,7 @@ import app from '../app.js'; import { createServer } from 'http'; +import { initSocket } from '../utils/socket.js'; /** * Get port from environment and store in Express. @@ -20,6 +21,9 @@ app.set("port", port); var server = createServer(app); +// 初始化 Socket.IO 并绑定到 HTTP Server +initSocket(server); + /** * Listen on provided port, on all network interfaces. */ diff --git a/package.json b/package.json index afdbf38..feab494 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "js-base64": "^3.7.7", "jsonwebtoken": "^9.0.2", "morgan": "~1.10.0", + "socket.io": "^4.8.1", "uuid": "^11.1.0" }, "devDependencies": { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cd87d07..af36691 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -71,6 +71,9 @@ importers: morgan: specifier: ~1.10.0 version: 1.10.0 + socket.io: + specifier: ^4.8.1 + version: 4.8.1 uuid: specifier: ^11.1.0 version: 11.1.0 @@ -1078,6 +1081,9 @@ packages: cpu: [x64] os: [win32] + '@socket.io/component-emitter@3.1.2': + resolution: {integrity: sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==} + '@standard-schema/spec@1.0.0': resolution: {integrity: sha512-m2bOd0f2RT9k8QJx1JN85cZYyH1RqFBdlwtkSlf4tBDYLCiiZnv1fIIwacK6cqwXavOydf0NPToMQgpKq+dVlA==} @@ -1201,6 +1207,9 @@ packages: '@types/connect@3.4.38': resolution: {integrity: sha512-K6uROf1LD88uDQqJCktA4yzL1YYAK6NgfsI0v/mTgyPKWsX1CnJ0XPSDhViejru1GcRkLWb8RlzFYJRqGUbaug==} + '@types/cors@2.8.19': + resolution: {integrity: sha512-mFNylyeyqN93lfe/9CSxOGREz8cpzAhH+E93xJ4xWQf62V8sQ/24reV2nyzUWM6H6Xji+GGHpkbLe7pVoUEskg==} + '@types/estree@1.0.8': resolution: {integrity: sha512-dWHzHa2WqEXI/O1E9OjrocMTKJl2mSrEolh1Iomrv6U+JuNwaHXsXx9bLu5gG7BUWFIN0skIQJQ/L1rIex4X6w==} @@ -1347,6 +1356,10 @@ packages: peerDependencies: vue: ^3.5.0 + accepts@1.3.8: + resolution: {integrity: sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==} + engines: {node: '>= 0.6'} + accepts@2.0.0: resolution: {integrity: sha512-5cvg6CtKwfgdmVqY1WIiXKc3Q1bkRqGLi+2W/6ao+6Y7gu/RCwRuAhGEzh5B4KlszSuTLgZYuqFqo5bImjNKng==} engines: {node: '>= 0.6'} @@ -1405,6 +1418,10 @@ packages: balanced-match@1.0.2: resolution: {integrity: sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==} + base64id@2.0.0: + resolution: {integrity: sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==} + engines: {node: ^4.5.0 || >= 5.9} + basic-auth@2.0.1: resolution: {integrity: sha512-NF+epuEdnUYVlGuhaxbbq+dvJttwLnGY+YixlXlME5KpQ5W3CnXA5cVTneY3SPbPDRkcjMbifrwmFYcClgOZeg==} engines: {node: '>= 0.8'} @@ -1547,6 +1564,15 @@ packages: supports-color: optional: true + debug@4.3.7: + resolution: {integrity: sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==} + engines: {node: '>=6.0'} + peerDependencies: + supports-color: '*' + peerDependenciesMeta: + supports-color: + optional: true + debug@4.4.1: resolution: {integrity: sha512-KcKCqiftBJcZr++7ykoDIEwSa3XWowTfNPo92BYxjXiyYEVrUQh2aLyhxBCwww+heortUFxEJYcRzosstTEBYQ==} engines: {node: '>=6.0'} @@ -1615,6 +1641,14 @@ packages: resolution: {integrity: sha512-Q0n9HRi4m6JuGIV1eFlmvJB7ZEVxu93IrMyiMsGC0lrMJMWzRgx6WGquyfQgZVb31vhGgXnfmPNNXmxnOkRBrg==} engines: {node: '>= 0.8'} + engine.io-parser@5.2.3: + resolution: {integrity: sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==} + engines: {node: '>=10.0.0'} + + engine.io@6.6.4: + resolution: {integrity: sha512-ZCkIjSYNDyGn0R6ewHDtXgns/Zre/NT6Agvq1/WobF7JXgFff4SeDroKiCO3fNJreU9YG429Sc81o4w5ok/W5g==} + engines: {node: '>=10.2.0'} + enhanced-resolve@5.18.3: resolution: {integrity: sha512-d4lC8xfavMeBjzGr2vECC3fsGXziXZQyJxD868h2M/mBI3PwAuODxAkLkq5HYuvrPYcUtiLzsTo8U3PgX3Ocww==} engines: {node: '>=10.13.0'} @@ -2049,6 +2083,10 @@ packages: engines: {node: ^18 || >=20} hasBin: true + negotiator@0.6.3: + resolution: {integrity: sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==} + engines: {node: '>= 0.6'} + negotiator@1.0.0: resolution: {integrity: sha512-8Ofs/AUQh8MaEcrlq5xOX0CQ9ypTF5dl78mjlMNfOK08fzpgTHQRQPBxcPlEtIw0yRpws+Zo/3r+5WRby7u3Gg==} engines: {node: '>= 0.6'} @@ -2300,6 +2338,17 @@ packages: resolution: {integrity: sha512-ZX99e6tRweoUXqR+VBrslhda51Nh5MTQwou5tnUDgbtyM0dBgmhEDtWGP/xbKn6hqfPRHujUNwz5fy/wbbhnpw==} engines: {node: '>= 0.4'} + socket.io-adapter@2.5.5: + resolution: {integrity: sha512-eLDQas5dzPgOWCk9GuuJC2lBqItuhKI4uxGgo9aIV7MYbk2h9Q6uULEh8WBzThoI7l+qU9Ast9fVUmkqPP9wYg==} + + socket.io-parser@4.2.4: + resolution: {integrity: sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==} + engines: {node: '>=10.0.0'} + + socket.io@4.8.1: + resolution: {integrity: sha512-oZ7iUCxph8WYRHHcjBEc9unw3adt5CmSNlppj/5Q4k2RIrhl8Z5yY2Xr4j9zj0+wzVZ0bxmYoGSzKJnRl6A4yg==} + engines: {node: '>=10.2.0'} + source-map-js@1.2.1: resolution: {integrity: sha512-UXWMKhLOwVKb728IUtQPXxfYU+usdybtUrK/8uGE8CQMvrhOpwvzDBwj0QhSL7MQc7vIsISBG8VQ8+IDQxpfQA==} engines: {node: '>=0.10.0'} @@ -2523,6 +2572,18 @@ packages: wrappy@1.0.2: resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + ws@8.17.1: + resolution: {integrity: sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + xtend@4.0.2: resolution: {integrity: sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==} engines: {node: '>=0.4'} @@ -3591,6 +3652,8 @@ snapshots: '@rollup/rollup-win32-x64-msvc@4.52.3': optional: true + '@socket.io/component-emitter@3.1.2': {} + '@standard-schema/spec@1.0.0': {} '@swc/helpers@0.5.17': @@ -3692,6 +3755,10 @@ snapshots: dependencies: '@types/node': 22.15.17 + '@types/cors@2.8.19': + dependencies: + '@types/node': 24.6.1 + '@types/estree@1.0.8': {} '@types/memcached@2.2.10': @@ -3713,7 +3780,6 @@ snapshots: '@types/node@24.6.1': dependencies: undici-types: 7.13.0 - optional: true '@types/oracledb@6.5.2': dependencies: @@ -3904,6 +3970,11 @@ snapshots: dependencies: vue: 3.5.22(typescript@5.8.3) + accepts@1.3.8: + dependencies: + mime-types: 2.1.35 + negotiator: 0.6.3 + accepts@2.0.0: dependencies: mime-types: 3.0.1 @@ -3955,6 +4026,8 @@ snapshots: balanced-match@1.0.2: {} + base64id@2.0.0: {} + basic-auth@2.0.1: dependencies: safe-buffer: 5.1.2 @@ -4099,6 +4172,10 @@ snapshots: dependencies: ms: 2.0.0 + debug@4.3.7: + dependencies: + ms: 2.1.3 + debug@4.4.1: dependencies: ms: 2.1.3 @@ -4146,6 +4223,24 @@ snapshots: encodeurl@2.0.0: {} + engine.io-parser@5.2.3: {} + + engine.io@6.6.4: + dependencies: + '@types/cors': 2.8.19 + '@types/node': 24.6.1 + accepts: 1.3.8 + base64id: 2.0.0 + cookie: 0.7.2 + cors: 2.8.5 + debug: 4.3.7 + engine.io-parser: 5.2.3 + ws: 8.17.1 + transitivePeerDependencies: + - bufferutil + - supports-color + - utf-8-validate + enhanced-resolve@5.18.3: dependencies: graceful-fs: 4.2.11 @@ -4596,6 +4691,8 @@ snapshots: nanoid@5.1.6: {} + negotiator@0.6.3: {} + negotiator@1.0.0: {} node-addon-api@8.3.1: {} @@ -4906,6 +5003,36 @@ snapshots: side-channel-map: 1.0.1 side-channel-weakmap: 1.0.2 + socket.io-adapter@2.5.5: + dependencies: + debug: 4.3.7 + ws: 8.17.1 + transitivePeerDependencies: + - bufferutil + - supports-color + - utf-8-validate + + socket.io-parser@4.2.4: + dependencies: + '@socket.io/component-emitter': 3.1.2 + debug: 4.3.7 + transitivePeerDependencies: + - supports-color + + socket.io@4.8.1: + dependencies: + accepts: 1.3.8 + base64id: 2.0.0 + cors: 2.8.5 + debug: 4.3.7 + engine.io: 6.6.4 + socket.io-adapter: 2.5.5 + socket.io-parser: 4.2.4 + transitivePeerDependencies: + - bufferutil + - supports-color + - utf-8-validate + source-map-js@1.2.1: {} speakingurl@14.0.1: {} @@ -4978,8 +5105,7 @@ snapshots: undici-types@7.11.0: {} - undici-types@7.13.0: - optional: true + undici-types@7.13.0: {} unpipe@1.0.0: {} @@ -5085,6 +5211,8 @@ snapshots: wrappy@1.0.2: {} + ws@8.17.1: {} + xtend@4.0.2: {} y18n@5.0.8: {} diff --git a/routes/apps.js b/routes/apps.js index b1bf386..2aebfd9 100644 --- a/routes/apps.js +++ b/routes/apps.js @@ -75,6 +75,7 @@ router.post( appId: installation.appId, token: installation.token, note: installation.note, + name: installation.note, // 备注同时作为名称返回 installedAt: installation.createdAt, }); }) @@ -146,6 +147,7 @@ router.get( appId: install.appId, installedAt: install.installedAt, note: install.note, + name: install.note, // 备注同时作为名称返回 })); return res.json({ diff --git a/routes/device.js b/routes/device.js index 67951b2..4ff82d0 100644 --- a/routes/device.js +++ b/routes/device.js @@ -5,6 +5,7 @@ import { PrismaClient } from "@prisma/client"; import crypto from "crypto"; import errors from "../utils/errors.js"; import { hashPassword, verifyDevicePassword } from "../utils/crypto.js"; +import { getOnlineDevices } from "../utils/socket.js"; const prisma = new PrismaClient(); @@ -322,4 +323,36 @@ router.delete( }) ); -export default router; \ No newline at end of file +export default router; + +/** + * GET /devices/online + * 查询在线设备(WebSocket 已连接) + * 返回:[{ uuid, connections, name? }] + */ +router.get( + "/online", + errors.catchAsync(async (req, res) => { + const list = getOnlineDevices(); + + if (list.length === 0) { + return res.json({ success: true, devices: [] }); + } + + // 补充设备名称 + const uuids = list.map((x) => x.uuid); + const rows = await prisma.device.findMany({ + where: { uuid: { in: uuids } }, + select: { uuid: true, name: true }, + }); + const nameMap = new Map(rows.map((r) => [r.uuid, r.name])); + + const devices = list.map((x) => ({ + uuid: x.uuid, + connections: x.connections, + name: nameMap.get(x.uuid) || null, + })); + + res.json({ success: true, devices }); + }) +); \ No newline at end of file diff --git a/routes/kv-token.js b/routes/kv-token.js index 674703a..2df04d5 100644 --- a/routes/kv-token.js +++ b/routes/kv-token.js @@ -1,6 +1,7 @@ import { Router } from "express"; const router = Router(); import kvStore from "../utils/kvStore.js"; +import { broadcastKeyChanged } from "../utils/socket.js"; import { kvTokenAuth } from "../middleware/kvTokenAuth.js"; import errors from "../utils/errors.js"; import { PrismaClient } from "@prisma/client"; @@ -219,7 +220,7 @@ router.post( req.connection.socket?.remoteAddress || ""; - const results = []; + const results = []; const errorList = []; // 批量处理所有键值对 @@ -230,6 +231,17 @@ router.post( key: result.key, created: result.createdAt.getTime() === result.updatedAt.getTime(), }); + // 广播每个键的变更 + const uuid = res.locals.device?.uuid; + if (uuid) { + broadcastKeyChanged(uuid, { + key: result.key, + action: "upsert", + created: result.createdAt.getTime() === result.updatedAt.getTime(), + updatedAt: result.updatedAt, + batch: true, + }); + } } catch (error) { errorList.push({ key, @@ -273,6 +285,18 @@ router.post( ""; const result = await kvStore.upsert(deviceId, key, value, creatorIp); + + // 广播单个键的变更 + const uuid = res.locals.device?.uuid; + if (uuid) { + broadcastKeyChanged(uuid, { + key: result.key, + action: "upsert", + created: result.createdAt.getTime() === result.updatedAt.getTime(), + updatedAt: result.updatedAt, + }); + } + return res.status(200).json({ deviceId: result.deviceId, key: result.key, @@ -300,6 +324,16 @@ router.delete( ); } + // 广播删除 + const uuid = res.locals.device?.uuid; + if (uuid) { + broadcastKeyChanged(uuid, { + key, + action: "delete", + deletedAt: new Date(), + }); + } + // 204状态码表示成功但无内容返回 return res.status(204).end(); }) diff --git a/utils/socket.js b/utils/socket.js new file mode 100644 index 0000000..3b264d1 --- /dev/null +++ b/utils/socket.js @@ -0,0 +1,206 @@ +/** + * Socket.IO 管理与事件转发 + * + * 功能: + * - 初始化 Socket.IO 并与 HTTP Server 绑定 + * - 前端使用 KV token 加入设备频道(自动映射到对应设备 uuid 房间) + * - 同一设备的不同 token 会被归入同一频道 + * - 维护在线设备列表 + * - 提供广播 KV 键变更的工具方法 + */ + +import { Server } from "socket.io"; +import { PrismaClient } from "@prisma/client"; + +// Socket.IO 单例实例 +let io = null; + +// 在线设备映射:uuid -> Set +const onlineMap = new Map(); +const prisma = new PrismaClient(); + +/** + * 初始化 Socket.IO + * @param {import('http').Server} server HTTP Server 实例 + */ +export function initSocket(server) { + if (io) return io; + + const allowOrigin = process.env.FRONTEND_URL || "*"; + + io = new Server(server, { + cors: { + origin: allowOrigin, + methods: ["GET", "POST"], + credentials: true, + }, + }); + + io.on("connection", (socket) => { + // 初始化每个连接所加入的设备房间集合 + socket.data.deviceUuids = new Set(); + + // 仅允许通过 query.token/apptoken 加入 + const qToken = socket.handshake?.query?.token || socket.handshake?.query?.apptoken; + if (qToken && typeof qToken === "string") { + joinByToken(socket, qToken).catch(() => {}); + } + + // 客户端使用 KV token 加入房间 + socket.on("join-token", (payload) => { + const token = payload?.token || payload?.apptoken; + if (typeof token === "string" && token.length > 0) { + joinByToken(socket, token).catch(() => {}); + } + }); + + // 客户端使用 token 离开房间 + socket.on("leave-token", async (payload) => { + try { + const token = payload?.token || payload?.apptoken; + if (typeof token !== "string" || token.length === 0) return; + const appInstall = await prisma.appInstall.findUnique({ + where: { token }, + include: { device: { select: { uuid: true } } }, + }); + const uuid = appInstall?.device?.uuid; + if (uuid) leaveDeviceRoom(socket, uuid); + } catch { + // ignore + } + }); + + // 离开所有已加入的设备房间 + socket.on("leave-all", () => { + const uuids = Array.from(socket.data.deviceUuids || []); + uuids.forEach((u) => leaveDeviceRoom(socket, u)); + }); + + // 聊天室:发送文本消息到加入的设备频道 + socket.on("chat:send", (data) => { + try { + const text = typeof data === "string" ? data : data?.text; + if (typeof text !== "string") return; + const trimmed = text.trim(); + if (!trimmed) return; + + // 限制消息最大长度,避免滥用 + const MAX_LEN = 2000; + const safeText = trimmed.length > MAX_LEN ? trimmed.slice(0, MAX_LEN) : trimmed; + + const uuids = Array.from(socket.data.deviceUuids || []); + if (uuids.length === 0) return; + + const at = new Date().toISOString(); + const payload = { text: safeText, at, senderId: socket.id }; + + uuids.forEach((uuid) => { + io.to(uuid).emit("chat:message", { uuid, ...payload }); + }); + } catch (err) { + console.error("chat:send error:", err); + } + }); + + socket.on("disconnect", () => { + const uuids = Array.from(socket.data.deviceUuids || []); + uuids.forEach((u) => removeOnline(u, socket.id)); + }); + }); + + return io; +} + +/** 返回 Socket.IO 实例 */ +export function getIO() { + return io; +} + +/** + * 让 socket 加入设备房间并记录在线 + * @param {import('socket.io').Socket} socket + * @param {string} uuid + */ +function joinDeviceRoom(socket, uuid) { + socket.join(uuid); + if (!socket.data.deviceUuids) socket.data.deviceUuids = new Set(); + socket.data.deviceUuids.add(uuid); + // 记录在线 + const set = onlineMap.get(uuid) || new Set(); + set.add(socket.id); + onlineMap.set(uuid, set); + // 可选:通知加入 + io.to(uuid).emit("device-joined", { uuid, connections: set.size }); +} + +/** + * 让 socket 离开设备房间并更新在线表 + * @param {import('socket.io').Socket} socket + * @param {string} uuid + */ +function leaveDeviceRoom(socket, uuid) { + socket.leave(uuid); + if (socket.data.deviceUuids) socket.data.deviceUuids.delete(uuid); + removeOnline(uuid, socket.id); +} + +function removeOnline(uuid, socketId) { + const set = onlineMap.get(uuid); + if (!set) return; + set.delete(socketId); + if (set.size === 0) { + onlineMap.delete(uuid); + } else { + onlineMap.set(uuid, set); + } +} + +/** + * 广播某设备下 KV 键已变更 + * @param {string} uuid 设备 uuid + * @param {object} payload { key, action: 'upsert'|'delete'|'batch', updatedAt?, created? } + */ +export function broadcastKeyChanged(uuid, payload) { + if (!io || !uuid) return; + io.to(uuid).emit("kv-key-changed", { uuid, ...payload }); +} + +/** + * 获取在线设备列表 + * @returns {Array<{uuid:string, connections:number}>} + */ +export function getOnlineDevices() { + const list = []; + for (const [uuid, set] of onlineMap.entries()) { + list.push({ uuid, connections: set.size }); + } + // 默认按连接数降序 + return list.sort((a, b) => b.connections - a.connections); +} + +export default { + initSocket, + getIO, + broadcastKeyChanged, + getOnlineDevices, +}; + +/** + * 通过 KV token 让 socket 加入对应设备的房间 + * @param {import('socket.io').Socket} socket + * @param {string} token + */ +async function joinByToken(socket, token) { + const appInstall = await prisma.appInstall.findUnique({ + where: { token }, + include: { device: { select: { uuid: true } } }, + }); + const uuid = appInstall?.device?.uuid; + if (uuid) { + joinDeviceRoom(socket, uuid); + // 可选:回执 + socket.emit("joined", { by: "token", uuid }); + } else { + socket.emit("join-error", { by: "token", reason: "invalid_token" }); + } +}