flask websocket服务搭建,flask-sock 和 flask-socketio

火影  论坛元老 | 2024-11-2 08:37:10 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1761|帖子 1761|积分 5283

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
flask websocket服务搭建,flask-sock 和 flask-socketio

首先说下socket、socketio 和 websockets 之间的区别

socket是网络上运行的两个程序之间的双向通信链路的一个端点。这是一个非常低级的东西,其他统统都是在 TCP 套接字之上实现的。 WebSocket 是 Web 的标准通信协议。它允许在客户端和服务器之间创建全双工通信通道。 Socket.IO 是一种基于 HTTP 和 WebSocket 构建的通信协议,提供自动重新连接、基于变乱的通知等额外功能。 Flask-SocketIO 是 Socket.IO 服务器端协议的实现,作为 Flask 扩展。
两个主流框架flask-sock 和 flask-socketio
flask-sock是原生的websocket协议,flask-socketio是socketio协议,必要根据你自己的需求选择合适的框架,假如用socketio那么客户端建议也用socketio库举行连接
websocket收发的都是原生的数据包
而socketio在原生的数据包上还加了协议数据好比状态码和监听变乱等
特性WebSocketSocket.IO连接协议基于WebSocket协议封装的WebSocket协议,提供更多功能数据包结构包含Opcode, Payload Length, Mask, Payload Data包含Event name和Data功能界说标准的双向通信,没有内建的重连或变乱支持支持变乱、重连、自动重试、定名空间等功能发送和吸收方式通过raw WebSocket send和onmessage直接发送和吸收封装的数据通过emit和on方法,使用变乱名来识别支持的传输方式只支持WebSocket支持多个传输方式(WebSocket, Polling等)

安装两个库
  1. pip install flask-socketio flask-sock
复制代码
flask-sock例子

服务端example
  1. from flask import Flask, render_template
  2. from flask_sock import Sock
  3. app = Flask(__name__)
  4. app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 25}
  5. sock = Sock(app)
  6. @app.route('/')
  7. def index():
  8.     return render_template('index.html')
  9. @sock.route('/echo')
  10. def echo(ws):
  11.     while True:
  12.         data = ws.receive()
  13.         if data == 'close':
  14.             break
  15.         ws.send(data)
  16. if __name__ == '__main__':
  17.     app.run()
复制代码
客户端example
  1. from flask import Flask, render_template
  2. from flask_sock import Sock
  3. app = Flask(__name__)
  4. app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 25}
  5. sock = Sock(app)
  6. @app.route('/')
  7. def index():
  8.     return render_template('index.html')
  9. @sock.route('/echo')
  10. def echo(ws):
  11.     while True:
  12.         data = ws.receive()
  13.         if data == 'close':
  14.             break
  15.         ws.send(data)
  16. if __name__ == '__main__':
  17.     app.run()
复制代码
flask-socketio例子

服务端example
  1. from flask import Flask, render_template
  2. from flask_socketio import SocketIO, emit
  3. app = Flask(__name__)
  4. app.config['SECRET_KEY'] = 'your_secret_key'  # 设置一个密钥用于会话、Cookie等
  5. socketio = SocketIO(app)
  6. @app.route('/')
  7. def index():
  8.     return render_template('index.html')
  9. @socketio.on('message')
  10. def handle_message(message):
  11.     print('Received message: ' + message)
  12.     emit('message', message)  # Echo the received message back
  13. @socketio.on('close')
  14. def handle_close():
  15.     print('Closing connection')
  16. if __name__ == '__main__':
  17.     socketio.run(app)
复制代码
客户端example
  1. import time
  2. import socketio
  3. # 创建 SocketIO 客户端实例
  4. sio = socketio.Client()
  5. @sio.event
  6. def connect():
  7.     print("Connection established")
  8. @sio.event
  9. def message(data):
  10.     print(f"Message received: {data}")
  11. @sio.event
  12. def disconnect():
  13.     print("Disconnected from server")
  14. # 连接到服务器
  15. sio.connect('http://localhost:5000')
  16. # 发送消息
  17. sio.send("Hello, Socket.IO!")
  18. # 等待一些时间以接收消息
  19. time.sleep(1)
  20. # 发送关闭消息并断开连接
  21. sio.disconnect()
复制代码
假如要用websocket格式来测试这个socketio接口
  1. import websockets
  2. import asyncio
  3. import json
  4. import time
  5. import random
  6. async def test_socketio():
  7.     # Socket.IO 在 WebSocket URL 上添加特定的路径和参数
  8.     uri = "ws://localhost:5000/socket.io/?EIO=4&transport=websocket"
  9.    
  10.     async with websockets.connect(uri) as ws:
  11.         print("Connected to server")
  12.         
  13.         # 发送 Socket.IO 握手消息
  14.         await ws.send("40")  # Socket.IO v4 握手消息
  15.         
  16.         # 发送消息
  17.         message = {
  18.             "type": "message",
  19.             "data": "Hello, Socket.IO!"
  20.         }
  21.         
  22.         # Socket.IO 消息格式: "42" + JSON数组 [event, data]
  23.         socketio_msg = f'42{json.dumps(["message", message])}'
  24.         await ws.send(socketio_msg)
  25.         
  26.         # 接收消息
  27.         try:
  28.             while True:
  29.                 response = await asyncio.wait_for(ws.recv(), timeout=2)
  30.                 print(f"Received: {response}")
  31.                
  32.                 # 如果是 PING 消息(2),回应 PONG(3)
  33.                 if response.startswith("2"):
  34.                     await ws.send("3")
  35.                     continue
  36.                
  37.                 # 解析常规消息
  38.                 if response.startswith("42"):
  39.                     try:
  40.                         data = json.loads(response[2:])
  41.                         print(f"Parsed message: {data}")
  42.                     except json.JSONDecodeError:
  43.                         print("Failed to parse message")
  44.         
  45.         except asyncio.TimeoutError:
  46.             print("No more messages")
  47.         
  48.         # 发送关闭消息
  49.         await ws.send("41")  # Socket.IO 关闭消息
  50. if __name__ == "__main__":
  51.     asyncio.run(test_socketio())
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

火影

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表