王海鱼 发表于 2024-9-8 22:07:57

Python服务器和客户端功能库之websockets使用详解

https://i-blog.csdnimg.cn/direct/ac60d1e6cc9c4156a5ed7ad7a25a42aa.png

概要

WebSockets 是一种在单个 TCP 毗连上举行全双工通信的协议,特殊实用于需要低延伸和高频率数据传输的实时应用,比方在线游戏、谈天应用和实时数据流。websockets 是一个基于 asyncio 的 Python 库,旨在提供简单易用的 WebSockets 服务器和客户端功能。本文将详细介绍 websockets 库,包括其安装方法、主要特性、基本和高级功能,以及现实应用场景,资助全面了解并掌握该库的使用。
安装

要使用 websockets 库,首先需要安装它。以下是安装步骤:
使用 pip 安装

可以通过 pip 直接安装 websockets:
pip install websockets
确认安装

安装完成后,可以通过以下命令确认安装是否成功:
python -c "import websockets; print(websockets.__version__)"
特性


[*] 简单易用:提供简洁的 API,方便快速上手。
[*] 基于 asyncio:利用 Python 的 asyncio 库实现异步 I/O 操纵,支持高并发。
[*] 全双工通信:支持在单个毗连上同时举行数据发送和吸取。
[*] 支持多种协议:兼容 WebSocket 协议,支持 SSL/TLS 加密。
[*] 灵活扩展:支持自定义协媾和中间件,方便扩展功能。
基本功能

创建 WebSocket 服务器

可以使用 websockets.serve 创建一个简单的 WebSocket 服务器:
import asyncio
import websockets

async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(message)

start_server = websockets.serve(echo, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
创建 WebSocket 客户端

可以使用 websockets.connect 创建一个简单的 WebSocket 客户端:
import asyncio
import websockets

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        await websocket.send("Hello, World!")
        response = await websocket.recv()
        print(f"< {response}")

asyncio.get_event_loop().run_until_complete(hello())
处理非常

可以在服务器和客户端代码中处理毗连和传输中的非常:
import asyncio
import websockets

async def echo(websocket, path):
    try:
        async for message in websocket:
            await websocket.send(message)
    except websockets.ConnectionClosed as e:
        print(f"Connection closed: {e}")

start_server = websockets.serve(echo, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
高级功能

广播消息

可以实现消息广播功能,将消息发送给所有毗连的客户端:
import asyncio
import websockets

connected_clients = set()

async def handler(websocket, path):
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            await asyncio.wait()
    finally:
        connected_clients.remove(websocket)

start_server = websockets.serve(handler, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
SSL/TLS 加密

可以为 WebSocket 服务器添加 SSL/TLS 加密,确保数据传输安全:
import asyncio
import ssl
import websockets

ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile="path/to/certfile", keyfile="path/to/keyfile")

async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(message)

start_server = websockets.serve(echo, "localhost", 8765, ssl=ssl_context)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
自定义协议

可以实现自定义 WebSocket 协议,扩展 WebSocket 的功能:
import asyncio
import websockets

class CustomProtocol(websockets.WebSocketServerProtocol):
    async def process_request(self, path, request_headers):
        if path != "/custom":
            return (404, [], b"Not Found")

start_server = websockets.serve(echo, "localhost", 8765, create_protocol=CustomProtocol)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
现实应用场景

实时谈天应用

在实时谈天应用中,通过 websockets 实现消息的实时传输和广播。
import asyncio
import websockets

connected_clients = set()

async def chat_handler(websocket, path):
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            await asyncio.wait()
    finally:
        connected_clients.remove(websocket)

start_server = websockets.serve(chat_handler, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
实时数据流

在实时数据流应用中,通过 websockets 实现服务器向客户端实时推送数据。
import asyncio
import websockets
import random
import time

async def data_stream(websocket, path):
    while True:
        data = random.randint(1, 100)
        await websocket.send(str(data))
        await asyncio.sleep(1)

start_server = websockets.serve(data_stream, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
在线游戏

在多人在线游戏中,通过 websockets 实现玩家间的实时通信和游戏状态同步。
import asyncio
import websockets

players = set()

async def game_handler(websocket, path):
    players.add(websocket)
    try:
        async for message in websocket:
            await asyncio.wait()
    finally:
        players.remove(websocket)

start_server = websockets.serve(game_handler, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
实时监控体系

在实时监控体系中,通过 websockets 实现服务器向多个客户端实时推送监控数据。
import asyncio
import websockets
import random

connected_clients = set()

async def monitor_handler(websocket, path):
    connected_clients.add(websocket)
    try:
        while True:
            data = random.randint(1, 100)
            await asyncio.wait()
            await asyncio.sleep(1)
    finally:
        connected_clients.remove(websocket)

start_server = websockets.serve(monitor_handler, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
总结

websockets 库是一个功能强大且易于使用的工具,能够资助开发者在各种应用场景中实现 WebSocket 通信。通过支持全双工通信、基于 asyncio 的高并发处理、多种协议支持和灵活扩展,websockets 提供了强大的功能和灵活的扩展本领。本文详细介绍了 websockets 库的安装方法、主要特性、基本和高级功能,以及现实应用场景。希望本文能资助各人全面掌握 websockets 库的使用,并在现实项目中发挥其优势。无论是在实时谈天、实时数据流照旧在线游戏和实时监控体系中,websockets 库都将是一个得力的工具。

如果你以为文章还不错,请各人 点赞、分享、留言 下,因为这将是我持续输出更多优质文章的最强动力!



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Python服务器和客户端功能库之websockets使用详解