Mediamtx+Python读取webrtc流
一、功能思绪:1、我采用ffmpeg -re -stream_loop -1 -i xcc.mp4 -c:v libx264 -profile:v baseline -x264opts "bframes=0:repeat_headers=1" -b:v 1500k -preset fast -f flv rtmp://127.0.0.1:1835/stream/111推流到mediamtx的rtmp上
2、通过mediamtx自带的转流,我将可以直接把推过来的流以webrtc方式访问http://127.0.0.1:8889/stream/111
3、由于我确认http://127.0.0.1:8889/stream/111能够播放出视频,所以我现在想采用python的方式读取http://127.0.0.1:8889/stream/111/whep的流,来实现保存流的每帧图像
涉及到的工具有:
[*]开源视频流服务器—>mediamtx
[*]本地推流rtmp工具—>ffmpeg+lal
[*]编写代码读取webrtc流工具—>PyCharm
https://i-blog.csdnimg.cn/direct/536424a80fd449dba2e1b1bbf68b4249.png
二、代码编写实现:
import asyncio
import json
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
import aiohttp
import cv2
import numpy as np
class DummyVideoTrack(VideoStreamTrack):
"""创建一个虚拟的视频轨道,以防止 WebRTC 报错"""
def __init__(self):
super().__init__()
async def recv(self):
# 生成一个黑色帧,防止 WebRTC 报错
width, height = 640, 480
frame = np.zeros((height, width, 3), dtype=np.uint8)
return frame
class VideoReceiver:
def __init__(self):
self.frame = None
async def consume_track(self, track):
while True:
try:
frame = await track.recv()
img = frame.to_ndarray(format="bgr24")
cv2.imshow("Video", img)
cv2.waitKey(1)
except Exception as e:
print("处理帧错误:", e)
break
async def send_ice_candidate(candidate, ice_url):
async with aiohttp.ClientSession() as session:
async with session.post(
ice_url,
data=json.dumps({"candidate": candidate}),
headers={"Content-Type": "application/json"}
) as response:
if response.status != 200:
print(f"发送 ICE candidate 错误: {response.status}")
async def run():
pc = RTCPeerConnection()
receiver = VideoReceiver()
# 添加一个虚拟的视频轨道,防止 WebRTC 报错
pc.addTrack(DummyVideoTrack())
# 处理接收到的媒体轨道
def on_track(track):
print(f"接收到 {track.kind} 轨道")
if track.kind == "video":
asyncio.create_task(receiver.consume_track(track))
pc.add_listener("track", on_track)
# 监听 ICE 连接状态变化
def on_ice_connection_state_change():
print(f"ICE 连接状态: {pc.iceConnectionState}")
pc.on("iceconnectionstatechange", on_ice_connection_state_change)
# 监听 ICE Candidate
def on_ice_candidate(candidate):
if candidate:
print(f"新 ICE candidate: {candidate}")
asyncio.create_task(send_ice_candidate(candidate, ice_url))
pc.on("icecandidate", on_ice_candidate)
# 创建 SDP Offer
offer = await pc.createOffer()
await pc.setLocalDescription(offer)
# 发送 Offer 到 WHEP 端点
whep_url = "http://127.0.0.1:8889/stream/111/whep"
headers = {"Content-Type": "application/sdp"}
async with aiohttp.ClientSession() as session:
async with session.post(
whep_url,
data=pc.localDescription.sdp,
headers=headers
) as response:
if response.status != 201:
raise Exception(f"服务器返回错误: {response.status}")
answer_sdp = await response.text()
answer = RTCSessionDescription(sdp=answer_sdp, type="answer")
await pc.setRemoteDescription(answer)
if "Location" in response.headers:
ice_url = "http://127.0.0.1:8889" + response.headers["Location"]
print("ICE 协商 URL:", ice_url)
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
pass
finally:
await pc.close()
if __name__ == "__main__":
asyncio.run(run())
三、效果展示:
[*]浏览器可以直接播放webrtc的扩展地址,然后最上层就是代码cv2读取每帧表现出来的画面
https://i-blog.csdnimg.cn/direct/5cac1db8c7794221916cf43f0a428949.png
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]