万万哇 发表于 2025-2-15 01:46:24

Mediamtx+Python读取webrtc流

一、功能思绪:
1、我采用ffmpeg -re -stream_loop -1 -i xcc.mp4 -c:v libx264 -profile:v baseline -x264opts "bframes=0:repeat_headers=1" -b:v 1500k -preset fast -f flv rtmp://127.0.0.1:1835/stream/111推流到mediamtx的rtmp上
2、通过mediamtx自带的转流,我将可以直接把推过来的流以webrtc方式访问http://127.0.0.1:8889/stream/111
3、由于我确认http://127.0.0.1:8889/stream/111能够播放出视频,所以我现在想采用python的方式读取http://127.0.0.1:8889/stream/111/whep的流,来实现保存流的每帧图像
涉及到的工具有:


[*]开源视频流服务器—>mediamtx
[*]本地推流rtmp工具—>ffmpeg+lal
[*]编写代码读取webrtc流工具—>PyCharm
https://i-blog.csdnimg.cn/direct/536424a80fd449dba2e1b1bbf68b4249.png
二、代码编写实现:
import asyncio
import json
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
import aiohttp
import cv2
import numpy as np

class DummyVideoTrack(VideoStreamTrack):
    """创建一个虚拟的视频轨道,以防止 WebRTC 报错"""
    def __init__(self):
      super().__init__()

    async def recv(self):
      # 生成一个黑色帧,防止 WebRTC 报错
      width, height = 640, 480
      frame = np.zeros((height, width, 3), dtype=np.uint8)
      return frame

class VideoReceiver:
    def __init__(self):
      self.frame = None

    async def consume_track(self, track):
      while True:
            try:
                frame = await track.recv()
                img = frame.to_ndarray(format="bgr24")
                cv2.imshow("Video", img)
                cv2.waitKey(1)
            except Exception as e:
                print("处理帧错误:", e)
                break

async def send_ice_candidate(candidate, ice_url):
    async with aiohttp.ClientSession() as session:
      async with session.post(
            ice_url,
            data=json.dumps({"candidate": candidate}),
            headers={"Content-Type": "application/json"}
      ) as response:
            if response.status != 200:
                print(f"发送 ICE candidate 错误: {response.status}")

async def run():
    pc = RTCPeerConnection()
    receiver = VideoReceiver()

    # 添加一个虚拟的视频轨道,防止 WebRTC 报错
    pc.addTrack(DummyVideoTrack())


    # 处理接收到的媒体轨道
    def on_track(track):
      print(f"接收到 {track.kind} 轨道")
      if track.kind == "video":
            asyncio.create_task(receiver.consume_track(track))

    pc.add_listener("track", on_track)

    # 监听 ICE 连接状态变化
    def on_ice_connection_state_change():
      print(f"ICE 连接状态: {pc.iceConnectionState}")

    pc.on("iceconnectionstatechange", on_ice_connection_state_change)

    # 监听 ICE Candidate
    def on_ice_candidate(candidate):
      if candidate:
            print(f"新 ICE candidate: {candidate}")
            asyncio.create_task(send_ice_candidate(candidate, ice_url))

    pc.on("icecandidate", on_ice_candidate)

    # 创建 SDP Offer
    offer = await pc.createOffer()
    await pc.setLocalDescription(offer)

    # 发送 Offer 到 WHEP 端点
    whep_url = "http://127.0.0.1:8889/stream/111/whep"
    headers = {"Content-Type": "application/sdp"}

    async with aiohttp.ClientSession() as session:
      async with session.post(
                whep_url,
                data=pc.localDescription.sdp,
                headers=headers
      ) as response:
            if response.status != 201:
                raise Exception(f"服务器返回错误: {response.status}")

            answer_sdp = await response.text()
            answer = RTCSessionDescription(sdp=answer_sdp, type="answer")
            await pc.setRemoteDescription(answer)

            if "Location" in response.headers:
                ice_url = "http://127.0.0.1:8889" + response.headers["Location"]
                print("ICE 协商 URL:", ice_url)

      try:
            while True:
                await asyncio.sleep(1)
      except KeyboardInterrupt:
            pass
      finally:
            await pc.close()

if __name__ == "__main__":
    asyncio.run(run())
三、效果展示:


[*]浏览器可以直接播放webrtc的扩展地址,然后最上层就是代码cv2读取每帧表现出来的画面
https://i-blog.csdnimg.cn/direct/5cac1db8c7794221916cf43f0a428949.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Mediamtx+Python读取webrtc流