公司在做的产品是基于SIP协议的音视频通讯终端,最新的产品需要进行音视频的测试,之前构建的工具中仅支持音频回环测试,后来做了视频回环功能。近期开会又讲需要完善音视频的测试,测试使用不同的codec完成测试,觉得我之前的回环并不能作为很好的测试条件(因为还涉及到产品自身的音视频采集),所以才有这么一个需求。
其实一开始没有选择使用ffmpeg,而是选择使用了一个我忘记叫什么的库,也是基于C的库,反正不太好用。后来开会说让用ffmpeg,就找到了ffmpeg-python和python-ffmpeg这两个库,但它们都是基于ffmpeg命令行(CLI)的,操作粒度不是很够,找来找去就找到了pyav。
pyav是用cython写的一个库,把ffmpeg的对象重新封装了一遍,以便在python中使用。
但是文档极其稀少,中文文档更是少得可怜。所以就是自己摸着石头过河。
这里我的需求是打开本地的文件,再以rtp流的形式通过本地指定端口发送给目标地址的目标端口。
既需要发送视频也需要发送音频,通过两个不同的端口。
demo期间使用的音频codec是PCMU,使用的视频codec是h264。
音频方面:因为使用pcmu时我希望的ptime是20ms,而在windows系统下time.sleep()无法做到毫秒级等待,所以需要在linux系统中运行;并且为了可以更加准确地做到20ms间隔发包,还需要专门对时间进行校准。
我这里在程序开始前记录了一个standard_start,并且在每次解码新的帧之前对standard_start加20ms,以此作为标准的解码开始时间;然后在解码前记一下时间really_start,启动延迟就是delay_time = really_start - standard_start。在编解码结束、mux流之后,计算time.time() - really_start作为用时used,这样剩余应该sleep的时间就是20ms - delay_time - used。如果大于零就等待,如果小于零就立刻进入下一帧的编解码过程。
视频方面:因为编码器有40帧的缓冲区(不知道在哪里设置,反正确实如此),所以不应该每向编码器队列中入队一帧就等待,而是应该等到编码器有输出的时候再开始等待。然后在结束的时候通过out_packets = self.output_stream.encode()将缓冲区队列中的40帧取出,然后推流。
伪代码大概就是:
# Sketch of the PyAV pipeline; in_file, rtp_url and codec are placeholders.
# Open the input file container.
input_ = av.open(in_file)
# Open the output container in write mode, muxing as RTP.
output = av.open(rtp_url, 'w', format='rtp')
# Add the output stream for the chosen codec.
output_stream = output.add_stream(codec)
# Decode the input frames one by one.
for input_frame in input_.decode(audio=0):
    # Encode each frame into zero or more output packets.
    for output_packet in output_stream.encode(input_frame):
        # Send the packet out through the RTP muxer.
        output.mux(output_packet)
class AVPlayer:
    """Common state shared by the audio and video RTP senders.

    The constructor opens the source media file and initialises every
    output-side attribute to an empty placeholder; subclasses fill those in
    from their ``config`` method.
    """

    def __init__(self, use_port: int, res: str = 'test_30fps_8000hz.avi'):
        """Open the source file and reset the sender state.

        Args:
            use_port: Local port; subclasses put it into the ``localrtpport``
                parameter of the RTP URL.
            res: Media file name, resolved relative to this module's
                directory.
        """
        # Remote endpoint placeholders; nothing in the visible code assigns
        # them after construction.
        self.remote_ip = ''
        self.remote_port = None
        self.use_port = use_port

        # Source media: absolute path plus an opened input container.
        self.res = os.path.join(os.path.dirname(__file__), res)
        self._input = av.open(self.res, 'r')

        # Output side, populated by the subclass's config().
        self.url = ''
        self.url_option = ''
        self.codec = ''
        self.rate = None
        self._output = None
        self.output_stream = None
        self.resampler = None

        # Lifecycle flags.
        self.is_config = False
        self.is_sending = False
class MyAudioPlayer(AVPlayer):
    """Send the audio track of the media file as a G.711 RTP stream.

    ``config`` builds the RTP output and starts a daemon thread that loops
    over the file, emitting one packet per ``ptime`` milliseconds.
    """

    def __init__(self, use_port, res: str = 'test_30fps_8000hz.avi'):
        """Initialise the base player and the audio-specific placeholders.

        Args:
            use_port: Local port used as the RTP source port.
            res: Media file name, resolved relative to this module's
                directory.
        """
        super().__init__(use_port, res)
        self.channels = None
        self.format = None
        self.ptime = None
        self.pkt_size = None

    def config(self, aim_ip, aim_port, codec: str = 'pcmu', ptime: int = 20):
        """Open the RTP output and start sending in a background thread.

        Args:
            aim_ip: Destination IP address.
            aim_port: Destination port.
            codec: ``'pcmu'`` or ``'pcma'``.
            ptime: Packet duration in milliseconds.

        Raises:
            TypeError: If ``codec`` is not one of the supported names.
        """
        if codec == 'pcmu':
            self.codec = 'pcm_mulaw'
        elif codec == 'pcma':
            self.codec = 'pcm_alaw'
        else:
            raise TypeError(f'unsupported audio codec: {codec!r}')
        # Both supported codecs use the same sample rate and channel count.
        self.rate = 8000
        self.channels = 1
        self.format = "mono" if self.channels == 1 else "stereo"
        self.ptime = ptime

        # Integer division keeps pkt_size an int, so the URL reads
        # "pkt_size=172" instead of "pkt_size=172.0".
        self.pkt_size = self.channels * self.rate * self.ptime // 1000
        # The extra 12 is presumably the fixed RTP header size - confirm.
        self.url_option = f'&pkt_size={self.pkt_size + 12}'
        self.url = f'rtp://{aim_ip}:{aim_port}?localrtpport={self.use_port}{self.url_option}'

        self._output = av.open(self.url, 'w', format="rtp")
        self.output_stream = self._output.add_stream(self.codec, self.rate)
        self.output_stream.codec_context.channels = self.channels
        # The resampler re-chunks the decoded audio into pkt_size-sample frames.
        self.resampler = AudioResampler("s16", self.format, self.rate, self.pkt_size)

        self.is_config = True
        self.is_sending = True
        # daemon=True replaces the deprecated Thread.setDaemon().
        thread = threading.Thread(target=self._send, daemon=True)
        thread.start()

    def _send(self):
        """Loop over the file and mux one packet per ``ptime`` interval.

        Each packet has a deadline that advances by exactly one interval, so
        a late packet shortens the following sleep instead of accumulating
        drift. The loop ends once ``is_sending`` becomes false.
        """
        packet_interval = self.ptime / 1000
        # RTP timestamp step per packet; 160 for the default 20 ms at 8 kHz.
        samples_per_packet = self.rate * self.ptime // 1000
        # The monotonic clock keeps pacing immune to wall-clock adjustments.
        deadline = time.monotonic()
        pts = 0
        while self.is_sending:
            with av.open(self.res, 'r') as _input:
                for input_frame in _input.decode(audio=0):
                    # Stop promptly instead of finishing the whole file.
                    if not self.is_sending:
                        break
                    for resample_frame in self.resampler.resample(input_frame):
                        for out_packet in self.output_stream.encode(resample_frame):
                            out_packet.pts = pts
                            out_packet.dts = pts
                            self._output.mux(out_packet)
                            # Time left in this slot after the start delay
                            # and the mux call itself.
                            remaining = deadline + packet_interval - time.monotonic()
                            if remaining > 0:
                                time.sleep(remaining)
                            deadline += packet_interval
                            pts += samples_per_packet
class MyVideoPlayer(AVPlayer):
    """Send the video track of the media file as an H.264 RTP stream.

    ``config`` builds the RTP output and starts a daemon thread that loops
    over the file, pacing encoder output at ``1 / rate`` seconds per packet.
    """

    def __init__(self, use_port, res: str = 'test_30fps_noB_30gop_8000hz_20ms.avi'):
        """Initialise the base player and the video-specific placeholders.

        Args:
            use_port: Local port used as the RTP source port.
            res: Media file name, resolved relative to this module's
                directory.
        """
        super().__init__(use_port, res)
        self.stream_option = {}
        self.pack_mode = None
        self.option = {}

    def config(self, aim_ip, aim_port, codec: str = 'h264', pack_mode: int = 1):
        """Open the RTP output and start sending in a background thread.

        The input container opened by the constructor is read for the frame
        geometry and then closed, so this method can only be called once per
        instance.

        Args:
            aim_ip: Destination IP address.
            aim_port: Destination port.
            codec: Only ``'h264'`` is supported.
            pack_mode: ``0`` selects the ``h264_mode0`` RTP flag with a
                65535-byte packet size; any other value uses the muxer
                defaults.

        Raises:
            TypeError: If ``codec`` is not ``'h264'``.
        """
        if codec != 'h264':
            raise TypeError(f'unsupported video codec: {codec!r}')
        self.codec = 'libx264'
        self.rate = 30
        self.stream_option = {"crf": "25"}

        self.pack_mode = pack_mode
        if pack_mode == 0:
            self.url_option = '&pkt_size=65535'
            self.option = {"rtpflags": "h264_mode0"}
        else:
            self.url_option = ''
            self.option = {}

        self.url = f'rtp://{aim_ip}:{aim_port}?localrtpport={self.use_port}{self.url_option}'
        self._output = av.open(self.url, "w", format="rtp", options=self.option)
        self.output_stream = self._output.add_stream(self.codec, self.rate, options=self.stream_option)

        # Mirror the source geometry and pixel format on the encoder.
        source_ctx = self._input.streams.video[0].codec_context
        self.output_stream.width = source_ctx.width
        self.output_stream.height = source_ctx.height
        self.output_stream.pix_fmt = source_ctx.pix_fmt
        self._input.close()

        self.is_config = True
        self.is_sending = True
        # daemon=True replaces the deprecated Thread.setDaemon().
        thread = threading.Thread(target=self._send, daemon=True)
        thread.start()

    def _mux_paced(self, out_packet, deadline, interval):
        """Mux one packet, sleep out the rest of its slot, return the next deadline."""
        # NOTE(review): forcing dts to pts is only valid if the encoder emits
        # no reordered (B) frames - confirm the encoder settings.
        out_packet.dts = out_packet.pts
        self._output.mux(out_packet)
        # Time left in this slot after the start delay and the mux call.
        remaining = deadline + interval - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)
        return deadline + interval

    def _send(self):
        """Loop over the file, pacing encoder output at the frame rate.

        Pacing is applied per encoder output packet rather than per input
        frame, so the loop does not wait on frames the encoder is still
        buffering. After the loop ends the encoder is flushed and the
        buffered packets are sent with the same pacing.
        """
        frame_interval = 1 / self.rate
        # The monotonic clock keeps pacing immune to wall-clock adjustments.
        deadline = time.monotonic()
        pts = 0
        while self.is_sending:
            with av.open(self.res, 'r') as _input:
                for input_frame in _input.decode(video=0):
                    # Frames are renumbered so pts keeps increasing across
                    # repeated passes over the file.
                    pts += 1
                    input_frame.pts = pts
                    for out_packet in self.output_stream.encode(input_frame):
                        deadline = self._mux_paced(out_packet, deadline, frame_interval)
                    if not self.is_sending:
                        break
        # Calling encode() with no frame drains what the encoder still holds.
        for out_packet in self.output_stream.encode():
            deadline = self._mux_paced(out_packet, deadline, frame_interval)
class Media:
    """Pair one audio sender and one video sender aimed at the same host."""

    def __init__(self, aim_ip: str = '10.20.0.13', aim_a_port: int = 12100,
                 aim_v_port: int = 12150, use_a_port: int = 12100,
                 use_v_port: int = 12150):
        """Create the audio and video players.

        Every argument defaults to the value that used to be hard-coded, so
        ``Media()`` behaves as before.

        Args:
            aim_ip: Destination IP address for both streams.
            aim_a_port: Destination port for audio.
            aim_v_port: Destination port for video.
            use_a_port: Local source port for audio.
            use_v_port: Local source port for video.
        """
        # Kept for compatibility; the players below use their own default
        # media files rather than this path.
        self.res = os.path.join(os.path.dirname(__file__), 'test_30fps_8000hz.avi')
        self.aim_ip = aim_ip
        self.aim_a_port = aim_a_port
        self.aim_v_port = aim_v_port
        self.use_a_port = use_a_port
        self.use_v_port = use_v_port
        self.audio_codec = ''
        self.audio_player = MyAudioPlayer(self.use_a_port)
        self.video_player = MyVideoPlayer(self.use_v_port)

    def send_all(self):
        """Configure both players with their default codecs, which starts sending."""
        self.audio_player.config(self.aim_ip, self.aim_a_port)
        self.video_player.config(self.aim_ip, self.aim_v_port)
本文作者:Yaki
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
预览: