Python服务器和客户端功能库之websockets使用详解

打印 上一主题 下一主题

主题 531|帖子 531|积分 1593




概要

WebSockets 是一种在单个 TCP 毗连上举行全双工通信的协议,特殊实用于需要低延伸和高频率数据传输的实时应用,比方在线游戏、谈天应用和实时数据流。websockets 是一个基于 asyncio 的 Python 库,旨在提供简单易用的 WebSockets 服务器和客户端功能。本文将详细介绍 websockets 库,包括其安装方法、主要特性、基本和高级功能,以及现实应用场景,资助全面了解并掌握该库的使用。

安装

要使用 websockets 库,首先需要安装它。以下是安装步骤:
使用 pip 安装

可以通过 pip 直接安装 websockets:
  1. pip install websockets
复制代码
确认安装

安装完成后,可以通过以下命令确认安装是否成功:
  1. python -c "import websockets; print(websockets.__version__)"
复制代码
特性


  • 简单易用:提供简洁的 API,方便快速上手。
  • 基于 asyncio:利用 Python 的 asyncio 库实现异步 I/O 操纵,支持高并发。
  • 全双工通信:支持在单个毗连上同时举行数据发送和吸取。
  • 支持多种协议:兼容 WebSocket 协议,支持 SSL/TLS 加密。
  • 灵活扩展:支持自定义协媾和中间件,方便扩展功能。
基本功能

创建 WebSocket 服务器

可以使用 websockets.serve 创建一个简单的 WebSocket 服务器:
  1. import asyncio
  2. import websockets
  3. async def echo(websocket, path):
  4.     async for message in websocket:
  5.         await websocket.send(message)
  6. start_server = websockets.serve(echo, "localhost", 8765)
  7. asyncio.get_event_loop().run_until_complete(start_server)
  8. asyncio.get_event_loop().run_forever()
复制代码
创建 WebSocket 客户端

可以使用 websockets.connect 创建一个简单的 WebSocket 客户端:
  1. import asyncio
  2. import websockets
  3. async def hello():
  4.     uri = "ws://localhost:8765"
  5.     async with websockets.connect(uri) as websocket:
  6.         await websocket.send("Hello, World!")
  7.         response = await websocket.recv()
  8.         print(f"< {response}")
  9. asyncio.get_event_loop().run_until_complete(hello())
复制代码
处理非常

可以在服务器和客户端代码中处理毗连和传输中的非常:
  1. import asyncio
  2. import websockets
  3. async def echo(websocket, path):
  4.     try:
  5.         async for message in websocket:
  6.             await websocket.send(message)
  7.     except websockets.ConnectionClosed as e:
  8.         print(f"Connection closed: {e}")
  9. start_server = websockets.serve(echo, "localhost", 8765)
  10. asyncio.get_event_loop().run_until_complete(start_server)
  11. asyncio.get_event_loop().run_forever()
复制代码
高级功能

广播消息

可以实现消息广播功能,将消息发送给所有毗连的客户端:
  1. import asyncio
  2. import websockets
  3. connected_clients = set()
  4. async def handler(websocket, path):
  5.     connected_clients.add(websocket)
  6.     try:
  7.         async for message in websocket:
  8.             await asyncio.wait([client.send(message) for client in connected_clients])
  9.     finally:
  10.         connected_clients.remove(websocket)
  11. start_server = websockets.serve(handler, "localhost", 8765)
  12. asyncio.get_event_loop().run_until_complete(start_server)
  13. asyncio.get_event_loop().run_forever()
复制代码
SSL/TLS 加密

可以为 WebSocket 服务器添加 SSL/TLS 加密,确保数据传输安全:
  1. import asyncio
  2. import ssl
  3. import websockets
  4. ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  5. ssl_context.load_cert_chain(certfile="path/to/certfile", keyfile="path/to/keyfile")
  6. async def echo(websocket, path):
  7.     async for message in websocket:
  8.         await websocket.send(message)
  9. start_server = websockets.serve(echo, "localhost", 8765, ssl=ssl_context)
  10. asyncio.get_event_loop().run_until_complete(start_server)
  11. asyncio.get_event_loop().run_forever()
复制代码
自定义协议

可以实现自定义 WebSocket 协议,扩展 WebSocket 的功能:
  1. import asyncio
  2. import websockets
  3. class CustomProtocol(websockets.WebSocketServerProtocol):
  4.     async def process_request(self, path, request_headers):
  5.         if path != "/custom":
  6.             return (404, [], b"Not Found")
  7. start_server = websockets.serve(echo, "localhost", 8765, create_protocol=CustomProtocol)
  8. asyncio.get_event_loop().run_until_complete(start_server)
  9. asyncio.get_event_loop().run_forever()
复制代码
现实应用场景

实时谈天应用

在实时谈天应用中,通过 websockets 实现消息的实时传输和广播。
  1. import asyncio
  2. import websockets
  3. connected_clients = set()
  4. async def chat_handler(websocket, path):
  5.     connected_clients.add(websocket)
  6.     try:
  7.         async for message in websocket:
  8.             await asyncio.wait([client.send(message) for client in connected_clients])
  9.     finally:
  10.         connected_clients.remove(websocket)
  11. start_server = websockets.serve(chat_handler, "localhost", 8765)
  12. asyncio.get_event_loop().run_until_complete(start_server)
  13. asyncio.get_event_loop().run_forever()
复制代码
实时数据流

在实时数据流应用中,通过 websockets 实现服务器向客户端实时推送数据。
  1. import asyncio
  2. import websockets
  3. import random
  4. import time
  5. async def data_stream(websocket, path):
  6.     while True:
  7.         data = random.randint(1, 100)
  8.         await websocket.send(str(data))
  9.         await asyncio.sleep(1)
  10. start_server = websockets.serve(data_stream, "localhost", 8765)
  11. asyncio.get_event_loop().run_until_complete(start_server)
  12. asyncio.get_event_loop().run_forever()
复制代码
在线游戏

在多人在线游戏中,通过 websockets 实现玩家间的实时通信和游戏状态同步。
  1. import asyncio
  2. import websockets
  3. players = set()
  4. async def game_handler(websocket, path):
  5.     players.add(websocket)
  6.     try:
  7.         async for message in websocket:
  8.             await asyncio.wait([player.send(message) for player in players])
  9.     finally:
  10.         players.remove(websocket)
  11. start_server = websockets.serve(game_handler, "localhost", 8765)
  12. asyncio.get_event_loop().run_until_complete(start_server)
  13. asyncio.get_event_loop().run_forever()
复制代码
实时监控体系

在实时监控体系中,通过 websockets 实现服务器向多个客户端实时推送监控数据。
  1. import asyncio
  2. import websockets
  3. import random
  4. connected_clients = set()
  5. async def monitor_handler(websocket, path):
  6.     connected_clients.add(websocket)
  7.     try:
  8.         while True:
  9.             data = random.randint(1, 100)
  10.             await asyncio.wait([client.send(str(data)) for client in connected_clients])
  11.             await asyncio.sleep(1)
  12.     finally:
  13.         connected_clients.remove(websocket)
  14. start_server = websockets.serve(monitor_handler, "localhost", 8765)
  15. asyncio.get_event_loop().run_until_complete(start_server)
  16. asyncio.get_event_loop().run_forever()
复制代码
总结

websockets 库是一个功能强大且易于使用的工具,能够资助开发者在各种应用场景中实现 WebSocket 通信。通过支持全双工通信、基于 asyncio 的高并发处理、多种协议支持和灵活扩展,websockets 提供了强大的功能和灵活的扩展本领。本文详细介绍了 websockets 库的安装方法、主要特性、基本和高级功能,以及现实应用场景。希望本文能资助各人全面掌握 websockets 库的使用,并在现实项目中发挥其优势。无论是在实时谈天、实时数据流照旧在线游戏和实时监控体系中,websockets 库都将是一个得力的工具。

如果你以为文章还不错,请各人 点赞、分享、留言 下,因为这将是我持续输出更多优质文章的最强动力!



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王海鱼

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表