2023-03-24
技术文档
00

目录

探索
需求
思路
demo

公司在做的产品是基于SIP协议的音视频通讯终端,最新的产品需要进行音视频的测试,之前构建的工具中仅支持音频回环测试,后来做了视频回环功能。近期开会又讲需要完善音视频的测试,测试使用不同的codec完成测试,觉得我之前的回环并不能作为很好的测试条件(因为还涉及到产品自身的音视频采集),所以才有这么一个需求。

探索

其实一开始没有选择使用ffmpeg而是选择使用了一个我忘记叫什么的库,反正不太好用,也是基于C的库,后来开会说让用ffmeg,就找到了ffmpeg-python 和 python-ffmeg这两个库都是基于ffmpeg-CLI的,操作粒度不是很够,找来找去就找到了pyav。

pyav是用cython写的一个库,把ffmeg的对象重新封装了一遍,以便在python中使用。

但是文档极其稀少,中文文档更是少得可怜。所以就是自己摸着石头过河。

需求

这里我的需求是打开本地的文件,再以rtp流的形式通过本地指定端口发送给目标地址的目标端口。

既需要发送视频也需要发送音频,通过两个不同的端口。

demo期间使用的音频codec是PCMU,使用的视频codec是h264。

思路

音频方面 因为pcmu我希望的ptime是20ms,在windows系统下time.sleep()无法做到毫秒级等待,所以需要在linux系统中运行,并且为了可以更加准确的做到20ms间隔发包,还需要专门对时间进行检校。

我这里在程序开始前记录了一个standard_start ,并且在每次解码新的帧之前对standard_start +20ms,以此作为标准的解码开始时间,然后再解码前记了一下时间really_start ,启动延迟delay_time = really_start -standard_start 就是在编解码结束mux流之后计算了一下time.time()-really_start作为用时used ,这样剩余的应该sleep的时间就是20ms - delay_time - used 如果大于零就等待,如果小于零就立刻进入下一帧的编解码过程。

视频方面 因为编码器有40帧的缓冲区(不知道在哪里设置,反正确实如此),所以不应该每向编码器队列中入队一帧就等待,而是应该等待编码器有输出的时候再开始等待。然后再结束的时候通过out_packets = self.output_stream.encode()将缓冲区队列中的40帧取出,然后推流。

demo

伪代码大概就是:

// 打开输入文件容器

input_ = av.open(in_file)

// 打开输出容器

output = av.open(rtp_url)

// 添加输出流

output_stream = output.add_stream(codec)

// 解码输入帧

input_frame = input.decode(audio=0)

// 编码输出包

output_packets = output_stream.encode(input_frame)

// 输出包

output.mux(output_packets)

python
class AVPlayer: def __init__(self, use_port: int, res: str = 'test_30fps_8000hz.avi'): self.remote_ip = '' self.remote_port = None self.use_port = use_port self.res = os.path.join(os.path.dirname(__file__), res) self._input = av.open(self.res, 'r') self.url = '' self.url_option = '' self.codec = '' self.rate = None self._output = None self.output_stream = None self.resampler = None self.is_config = False self.is_sending = False class MyAudioPlayer(AVPlayer): def __init__(self, use_port, res: str = 'test_30fps_8000hz.avi'): super().__init__(use_port, res) self.channels = None self.format = None self.ptime = None self.pkt_size = None def config(self, aim_ip, aim_port, codec: str = 'pcmu', ptime: int = 20): if codec == 'pcmu': self.codec = 'pcm_mulaw' self.rate = 8000 self.channels = 1 elif codec == 'pcma': self.codec = 'pcm_alaw' self.rate = 8000 self.channels = 1 else: raise TypeError if self.channels == 1: self.format = "mono" else: self.format = "stereo" self.ptime = ptime self.pkt_size = self.channels * self.rate * self.ptime / 1000 self.url_option = f'&pkt_size={self.pkt_size + 12}' self.url = f'rtp://{aim_ip}:{aim_port}?localrtpport={self.use_port}{self.url_option}' self._output = av.open(self.url, 'w', format="rtp") self.output_stream = self._output.add_stream(self.codec, self.rate) self.output_stream.codec_context.channels = self.channels self.resampler = AudioResampler("s16", self.format, self.rate, self.pkt_size) self.is_config = True self.is_sending = True thread = threading.Thread(target=self._send) thread.setDaemon(True) thread.start() def _send(self): standard_start = time.time() # 标准启动时 pts = 0 while self.is_sending: with av.open(self.res, 'r') as _input: for input_frame in _input.decode(audio=0): resample_frames = self.resampler.resample(input_frame) for resample_frame in resample_frames: out_packets = self.output_stream.encode(resample_frame) if out_packets: for out_packet in out_packets: out_packet.pts = pts out_packet.dts = pts really_start = time.time() # 实际启动时 delay_time = really_start - standard_start # 启动偏差 self._output.mux(out_packet) used = time.time() - really_start # 实际用时 ptime = (self.ptime / 1000) # 标准用时 if (delay_time + used) < ptime: time.sleep(ptime - used - delay_time) standard_start += ptime pts += 160 _input.close() class MyVideoPlayer(AVPlayer): def __init__(self, use_port, res: str = 'test_30fps_noB_30gop_8000hz_20ms.avi'): super().__init__(use_port, res) self.stream_option = {} self.pack_mode = None self.option = {} def config(self, aim_ip, aim_port, codec: str = 'h264', pack_mode: int = 1): if codec == 'h264': self.codec = 'libx264' self.rate = 30 self.stream_option = {"crf": "25"} else: raise TypeError if pack_mode == 0: self.url_option = '&pkt_size=65535' self.pack_mode = pack_mode self.option = {"rtpflags": "h264_mode0"} else: self.url_option = '' self.pack_mode = pack_mode self.option = {} self.url = f'rtp://{aim_ip}:{aim_port}?localrtpport={self.use_port}{self.url_option}' self._output = av.open(self.url, "w", format="rtp", options=self.option) self.output_stream = self._output.add_stream(self.codec, self.rate, options=self.stream_option) self.output_stream.width = self._input.streams.video[0].codec_context.width self.output_stream.height = self._input.streams.video[0].codec_context.height self.output_stream.pix_fmt = self._input.streams.video[0].codec_context.pix_fmt self._input.close() self.is_config = True self.is_sending = True thread = threading.Thread(target=self._send) thread.setDaemon(True) thread.start() def _send(self): standard_start = time.time() # 标准启动时 pts = 0 while self.is_sending: with av.open(self.res, 'r') as _input: for input_frame in _input.decode(video=0): pts += 1 input_frame.pts = pts out_packets = self.output_stream.encode(input_frame) if out_packets: for out_packet in out_packets: out_packet.dts = out_packet.pts really_start = time.time() # 实际启动时 delay_time = really_start - standard_start # 启动偏差 self._output.mux(out_packet) used = time.time() - really_start # 实际用时 ptime = 1 / self.rate # 标准用时 if (delay_time + used) < ptime: time.sleep(ptime - used - delay_time) standard_start += ptime if not self.is_sending: break out_packets = self.output_stream.encode() for out_packet in out_packets: out_packet.dts = out_packet.pts really_start = time.time() # 实际启动时 delay_time = really_start - standard_start # 启动偏差 self._output.mux(out_packet) used = time.time() - really_start # 实际用时 ptime = 1 / self.rate # 标准用时 if (delay_time + used) < ptime: time.sleep(ptime - used - delay_time) standard_start += ptime class Media: def __init__(self): self.res = os.path.join(os.path.dirname(__file__), 'test_30fps_8000hz.avi') self.aim_ip = '10.20.0.13' self.aim_a_port = 12100 self.aim_v_port = 12150 self.use_a_port = 12100 self.use_v_port = 12150 self.audio_codec = '' self.audio_player = MyAudioPlayer(self.use_a_port) self.video_player = MyVideoPlayer(self.use_v_port) def send_all(self): self.audio_player.config(self.aim_ip, self.aim_a_port) self.video_player.config(self.aim_ip, self.aim_v_port)

本文作者:Yaki

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.14.8