一、功能思绪:
- 1、我采用ffmpeg -re -stream_loop -1 -i xcc.mp4 -c:v libx264 -profile:v baseline -x264opts "bframes=0:repeat_headers=1" -b:v 1500k -preset fast -f flv rtmp://127.0.0.1:1835/stream/111推流到mediamtx的rtmp上
- 2、通过mediamtx自带的转流,我将可以直接把推过来的流以webrtc方式访问http://127.0.0.1:8889/stream/111
- 3、由于我确认http://127.0.0.1:8889/stream/111能够播放出视频,所以我现在想采用python的方式读取http://127.0.0.1:8889/stream/111/whep的流,来实现保存流的每帧图像
复制代码 涉及到的工具有:
- 开源视频流服务器—>mediamtx
- 本地推流rtmp工具—>ffmpeg+lal
- 编写代码读取webrtc流工具—>
yCharm
二、代码编写实现:
- import asyncio
- import json
- from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
- import aiohttp
- import cv2
- import numpy as np
- class DummyVideoTrack(VideoStreamTrack):
- """创建一个虚拟的视频轨道,以防止 WebRTC 报错"""
- def __init__(self):
- super().__init__()
- async def recv(self):
- # 生成一个黑色帧,防止 WebRTC 报错
- width, height = 640, 480
- frame = np.zeros((height, width, 3), dtype=np.uint8)
- return frame
- class VideoReceiver:
- def __init__(self):
- self.frame = None
- async def consume_track(self, track):
- while True:
- try:
- frame = await track.recv()
- img = frame.to_ndarray(format="bgr24")
- cv2.imshow("Video", img)
- cv2.waitKey(1)
- except Exception as e:
- print("处理帧错误:", e)
- break
- async def send_ice_candidate(candidate, ice_url):
- async with aiohttp.ClientSession() as session:
- async with session.post(
- ice_url,
- data=json.dumps({"candidate": candidate}),
- headers={"Content-Type": "application/json"}
- ) as response:
- if response.status != 200:
- print(f"发送 ICE candidate 错误: {response.status}")
- async def run():
- pc = RTCPeerConnection()
- receiver = VideoReceiver()
- # 添加一个虚拟的视频轨道,防止 WebRTC 报错
- pc.addTrack(DummyVideoTrack())
- # 处理接收到的媒体轨道
- def on_track(track):
- print(f"接收到 {track.kind} 轨道")
- if track.kind == "video":
- asyncio.create_task(receiver.consume_track(track))
- pc.add_listener("track", on_track)
- # 监听 ICE 连接状态变化
- def on_ice_connection_state_change():
- print(f"ICE 连接状态: {pc.iceConnectionState}")
- pc.on("iceconnectionstatechange", on_ice_connection_state_change)
- # 监听 ICE Candidate
- def on_ice_candidate(candidate):
- if candidate:
- print(f"新 ICE candidate: {candidate}")
- asyncio.create_task(send_ice_candidate(candidate, ice_url))
- pc.on("icecandidate", on_ice_candidate)
- # 创建 SDP Offer
- offer = await pc.createOffer()
- await pc.setLocalDescription(offer)
- # 发送 Offer 到 WHEP 端点
- whep_url = "http://127.0.0.1:8889/stream/111/whep"
- headers = {"Content-Type": "application/sdp"}
- async with aiohttp.ClientSession() as session:
- async with session.post(
- whep_url,
- data=pc.localDescription.sdp,
- headers=headers
- ) as response:
- if response.status != 201:
- raise Exception(f"服务器返回错误: {response.status}")
- answer_sdp = await response.text()
- answer = RTCSessionDescription(sdp=answer_sdp, type="answer")
- await pc.setRemoteDescription(answer)
- if "Location" in response.headers:
- ice_url = "http://127.0.0.1:8889" + response.headers["Location"]
- print("ICE 协商 URL:", ice_url)
- try:
- while True:
- await asyncio.sleep(1)
- except KeyboardInterrupt:
- pass
- finally:
- await pc.close()
- if __name__ == "__main__":
- asyncio.run(run())
复制代码 三、效果展示:
- 浏览器可以直接播放webrtc的扩展地址,然后最上层就是代码cv2读取每帧表现出来的画面
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |