# fastapi_websocket **Repository Path**: duans/fastapi_websocket ## Basic Information - **Project Name**: fastapi_websocket - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-03-03 - **Last Updated**: 2026-03-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## websocket WebSocket 是一种网络通信协议,它在单个 TCP 连接上提供全双工(full-duplex)通信信道。与传统的 HTTP 请求 - 响应模式不同,WebSocket 允许服务器主动向客户端推送数据,非常适合需要实时交互的应用场景(如在线聊天、实时游戏、股票行情推送、协同编辑等)。websocket即在web场景中实现socket双向通信。 以下是 WebSocket 的核心特点和基本概念: ### 核心特点 - **持久连接**:一旦建立连接,它会一直保持打开状态,直到客户端或服务器主动关闭。这避免了 HTTP 短连接带来的频繁握手开销。 - **双向通信**:客户端和服务器可以随时互相发送数据,无需等待对方的请求。 - **低延迟**:由于不需要每次通信都进行 HTTP 头部传输和握手,数据传输的延迟极低。 - **轻量级帧**:数据以“帧”的形式传输,头部开销很小(最小只有 2 字节)。 ### 工作原理 WebSocket 的连接建立过程通常始于一个标准的 HTTP 请求(称为“握手”): 1. 握手(Handshake) - 客户端发送一个特殊的 HTTP 请求,包含 `Upgrade: websocket` 和 `Connection: Upgrade` 头信息。 - 服务器如果支持 WebSocket,会返回一个 `101 Switching Protocols` 状态码,表示协议切换成功。 2. 数据传输 - 握手完成后,HTTP 连接升级为 WebSocket 连接。 - 双方通过二进制帧或文本帧直接交换数据。 3. 关闭连接 - 任何一方都可以发送关闭帧来终止连接。  ### http和websocket的区别  ### 代码示例 #### **前端 (JavaScript)** 浏览器原生支持 WebSocket API: ```js // 创建连接 const socket = new WebSocket('ws://example.com/socket'); // const socket = new WebSocket('wss://example.com/socket'); // 连接打开时触发 socket.onopen = function(event) { console.log('连接已建立'); // 发送数据 // socket.send(JSON.stringify({ type:"chat":content:'你好,服务器!' })); socket.send('你好,服务器!'); }; // 收到消息时触发 socket.onmessage = function(event) { // 如果服务端发送的是json, 需要进行格式化 console.log('收到消息:', JSON.parse(event.data)); }; // 发生错误时触发 socket.onerror = function(error) { console.error('发生错误:', error); }; // 连接关闭时触发 socket.onclose = function(event) { console.log('连接已关闭'); }; ``` 心跳检测 ```js const socket = new WebSocket('ws://example.com'); let heartbeatInterval; let timeoutTimer; socket.onopen = () => { console.log('连接建立'); // 启动心跳发送 (每 30 秒) heartbeatInterval = setInterval(() => { if (socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify({ type: 'ping', ts: Date.now() })); // 设置超时检测:如果 10 秒内没收到 pong,认为断开 timeoutTimer = setTimeout(() => { console.warn('心跳超时,关闭连接'); socket.close(); }, 10000); } }, 30000); }; socket.onmessage = (event) => { const data = JSON.parse(event.data); if (data.type === 'pong') { // 收到 pong,清除超时定时器 if (timeoutTimer) clearTimeout(timeoutTimer); } // 处理其他业务消息... }; socket.onclose = () => { if (heartbeatInterval) clearInterval(heartbeatInterval); if (timeoutTimer) clearTimeout(timeoutTimer); }; ``` *注意:安全环境下(HTTPS)通常使用 `wss://` 协议(相当于 HTTP over SSL/TLS)。* #### **后端 (python 示例 - 使用** `websockets` ) ```python import asyncio import websockets # websocket: 系统参数, 保存了客户端相关信息 async def handler(websocket): try: async for message in websocket: # 处理业务消息 print(f"收到消息: {message}") # 将客户端发送的消息原样发送给了客户端 await websocket.send(f"Echo: {message}") except websockets.exceptions.ConnectionClosed: print("连接已关闭") # start_server 参数配置: # ping_interval=20: 每 20 秒发送一次 ping # ping_timeout=20: 如果 20 秒内没收到 pong,则关闭连接 async def main(): # 创建一个websocket实例 start_server = websockets.serve( handler, "localhost", 8765, # ping_interval=20, # ping_timeout=20 ) # 启动一个事件循环 asyncio.run(main()) ``` ### 常见应用场景 - **即时通讯 (IM)**:微信、Slack 等聊天软件的实时消息推送。 - **在线游戏**:多人竞技游戏的实时状态同步。 - **金融交易**:股票、加密货币的实时价格更新。 - **协作工具**:Google Docs 等多人同时编辑文档的光标同步和内容更新。 - **实时监控**:服务器监控仪表盘、物联网 (IoT) 设备数据上报。 ### fastapi中使用websocket ```python # app.py from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles from typing import List import json # 创建fastapi应用实例 app = FastAPI() # 挂载静态页面 app.mount("/static", StaticFiles(directory="static"), name="static") # 自定义类实现管理websocket客户端连接 class ConnectionManager: def __init__(self): # 存储所有活跃的 websocket 连接 self.active_connections: List[WebSocket] = [] # 当客户端建立连接的时候, 自动执行connect方法 async def connect(self, websocket: WebSocket): # 接受websocket连接 await websocket.accept() # 将新连接添加到列表 self.active_connections.append(websocket) print(f"新用户加入,当前在线人数: {len(self.active_connections)}") # 当客户端断开链接的时候, 自动执行disconnect方法 def disconnect(self, websocket: WebSocket): if websocket in self.active_connections: # 从列表中移除断开的连接 self.active_connections.remove(websocket) print(f"用户离开,当前在线人数: {len(self.active_connections)}") # 发送个人消息给指定的websocket连接 async def send_personal_message(self, message: str, websocket: WebSocket): await websocket.send_text(message) # 广播消息给所有连接,或者除了发送者之外的所有连接 async def broadcast(self, message: str, sender_websocket: WebSocket = None): """ 广播消息给所有连接,或者除了发送者之外的所有连接 :param message: 要发送的消息 :param sender_websocket: 可选,如果是群聊,通常不发给发送者自己(因为客户端已经显示了) """ for connection in self.active_connections: if connection != sender_websocket: try: await connection.send_text(message) except Exception as e: # 如果发送失败(例如连接已断开),移除该连接 print(f"发送消息失败,移除连接: {e}") self.disconnect(connection) # 初始化连接管理器 manager = ConnectionManager() # new Websocket('ws://localhost:8000/ws/user001') # 监听websocket链接 @app.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: str): # 连接到管理器 await manager.connect(websocket) try: # 广播欢迎消息 await manager.broadcast(json.dumps( { 'client_id':'系统', 'message':f'{client_id} 加入了群聊' } ), sender_websocket=websocket) while True: # 接收消息 data = await websocket.receive_text() # 构建消息对象(可选:包含发送者ID) message_data = { "client_id": client_id, "message": data } # 将消息字典转换为json字符串 formatted_message = json.dumps(message_data) # 广播给其他人 await manager.broadcast(formatted_message, sender_websocket=websocket) except WebSocketDisconnect: manager.disconnect(websocket) await manager.broadcast(f"用户 {client_id} 离开了群聊") except Exception as e: manager.disconnect(websocket) print(f"发生错误: {e}") # if __name__ == "__main__": # import uvicorn # uvicorn.run(app, host="0.0.0.0", port=8000) ############################################################ # 启动服务器 # # uvicorn app:app --reload --host 0.0.0.0 --port 8000 # # # ############################################################ ``` 前端页面 ```html