火影 发表于 2024-11-2 08:37:10

flask websocket服务搭建,flask-sock 和 flask-socketio

flask websocket服务搭建,flask-sock 和 flask-socketio

首先说下socket、socketio 和 websockets 之间的区别

socket是网络上运行的两个程序之间的双向通信链路的一个端点。这是一个非常低级的东西,其他统统都是在 TCP 套接字之上实现的。 WebSocket 是 Web 的标准通信协议。它允许在客户端和服务器之间创建全双工通信通道。 Socket.IO 是一种基于 HTTP 和 WebSocket 构建的通信协议,提供自动重新连接、基于变乱的通知等额外功能。 Flask-SocketIO 是 Socket.IO 服务器端协议的实现,作为 Flask 扩展。
两个主流框架flask-sock 和 flask-socketio
flask-sock是原生的websocket协议,flask-socketio是socketio协议,必要根据你自己的需求选择合适的框架,假如用socketio那么客户端建议也用socketio库举行连接
websocket收发的都是原生的数据包
而socketio在原生的数据包上还加了协议数据好比状态码和监听变乱等
特性WebSocketSocket.IO连接协议基于WebSocket协议封装的WebSocket协议,提供更多功能数据包结构包含Opcode, Payload Length, Mask, Payload Data包含Event name和Data功能界说标准的双向通信,没有内建的重连或变乱支持支持变乱、重连、自动重试、定名空间等功能发送和吸收方式通过raw WebSocket send和onmessage直接发送和吸收封装的数据通过emit和on方法,使用变乱名来识别支持的传输方式只支持WebSocket支持多个传输方式(WebSocket, Polling等)

安装两个库
pip install flask-socketio flask-sock
flask-sock例子

服务端example
from flask import Flask, render_template
from flask_sock import Sock

app = Flask(__name__)
app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 25}

sock = Sock(app)


@app.route('/')
def index():
    return render_template('index.html')


@sock.route('/echo')
def echo(ws):
    while True:
      data = ws.receive()
      if data == 'close':
            break
      ws.send(data)


if __name__ == '__main__':
    app.run()
客户端example
from flask import Flask, render_template
from flask_sock import Sock

app = Flask(__name__)
app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 25}

sock = Sock(app)


@app.route('/')
def index():
    return render_template('index.html')


@sock.route('/echo')
def echo(ws):
    while True:
      data = ws.receive()
      if data == 'close':
            break
      ws.send(data)


if __name__ == '__main__':
    app.run()
flask-socketio例子

服务端example
from flask import Flask, render_template
from flask_socketio import SocketIO, emit

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'# 设置一个密钥用于会话、Cookie等

socketio = SocketIO(app)

@app.route('/')
def index():
    return render_template('index.html')

@socketio.on('message')
def handle_message(message):
    print('Received message: ' + message)
    emit('message', message)# Echo the received message back

@socketio.on('close')
def handle_close():
    print('Closing connection')

if __name__ == '__main__':
    socketio.run(app)
客户端example
import time
import socketio

# 创建 SocketIO 客户端实例
sio = socketio.Client()

@sio.event
def connect():
    print("Connection established")

@sio.event
def message(data):
    print(f"Message received: {data}")

@sio.event
def disconnect():
    print("Disconnected from server")

# 连接到服务器
sio.connect('http://localhost:5000')

# 发送消息
sio.send("Hello, Socket.IO!")

# 等待一些时间以接收消息
time.sleep(1)

# 发送关闭消息并断开连接
sio.disconnect()
假如要用websocket格式来测试这个socketio接口
import websockets
import asyncio
import json
import time
import random

async def test_socketio():
    # Socket.IO 在 WebSocket URL 上添加特定的路径和参数
    uri = "ws://localhost:5000/socket.io/?EIO=4&transport=websocket"
   
    async with websockets.connect(uri) as ws:
      print("Connected to server")
      
      # 发送 Socket.IO 握手消息
      await ws.send("40")# Socket.IO v4 握手消息
      
      # 发送消息
      message = {
            "type": "message",
            "data": "Hello, Socket.IO!"
      }
      
      # Socket.IO 消息格式: "42" + JSON数组
      socketio_msg = f'42{json.dumps(["message", message])}'
      await ws.send(socketio_msg)
      
      # 接收消息
      try:
            while True:
                response = await asyncio.wait_for(ws.recv(), timeout=2)
                print(f"Received: {response}")
               
                # 如果是 PING 消息(2),回应 PONG(3)
                if response.startswith("2"):
                  await ws.send("3")
                  continue
               
                # 解析常规消息
                if response.startswith("42"):
                  try:
                        data = json.loads(response)
                        print(f"Parsed message: {data}")
                  except json.JSONDecodeError:
                        print("Failed to parse message")
      
      except asyncio.TimeoutError:
            print("No more messages")
      
      # 发送关闭消息
      await ws.send("41")# Socket.IO 关闭消息

if __name__ == "__main__":
    asyncio.run(test_socketio())

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: flask websocket服务搭建,flask-sock 和 flask-socketio