#!/usr/bin/env python3 from multiprocessing import Process, Queue from os import path, environ from subprocess import Popen, PIPE, STDOUT from sys import platform from time import sleep class Proc: """ Find a running process from Python """ @classmethod def _list_windows(cls) -> list: """ Find all running process with wmi :return: list of dictionaries with descriptions of found processes """ execlist = [] separate = b'\r\r\n' out, err = Popen(['wmic', 'process', 'get', 'CommandLine,ExecutablePath,Name,ProcessId', '/format:list'], stdout=PIPE, stderr=PIPE).communicate() for line in out.split(separate + separate): execpid, exename, exepath, cmdline = None, None, None, None for subline in line.split(separate): if b'ProcessId=' in subline: execpid = subline.split(b'=')[1].decode('utf-8') if b'Name=' in subline: exename = subline.split(b'=')[1].decode('utf-8') if b'ExecutablePath=' in subline: exepath = subline.split(b'=')[1].decode('utf-8') if b'CommandLine=' in subline: cmdline = subline.split(b'=')[1].decode('utf-8') if execpid and exename: execlist.append({'execpid': execpid, 'exename': exename, 'exepath': exepath, 'cmdline': cmdline}) return execlist @classmethod def _list_linux(cls) -> list: """ Find all running process with ps :return: list of dictionaries with descriptions of found processes """ execlist = [] out, err = Popen(['/bin/ps', '-eo', 'pid,args'], stdout=PIPE, stderr=PIPE).communicate() for line in out.splitlines(): execpid = line.split()[0].decode('utf-8') exepath = line.split()[1].decode('utf-8') exename = path.basename(exepath) cmdline = line.split(None, 1)[1].decode('utf-8') if execpid and exename: execlist.append({'execpid': execpid, 'exename': exename, 'exepath': exepath, 'cmdline': cmdline}) return execlist @classmethod def list(cls) -> list: """ Find all running process :return: list of dictionaries with descriptions of found processes """ if platform.startswith('linux') or platform.startswith('darwin'): return cls._list_linux() elif platform.startswith('win32'): return cls._list_windows() else: return None @classmethod def search(cls, find: str, exclude: str = None) -> list: """ Find specified processes :param find: find process pid, name or arguments :param exclude: exclude process pid, name or arguments :return: list of dictionaries with descriptions of found processes """ proc_found = [] try: for proc in cls.list(): if exclude and (exclude in proc['execpid'] or exclude in proc['exename'] or exclude in proc['exepath'] or exclude in proc['cmdline']): pass elif find in proc['execpid'] or find in proc['exename'] or \ find in proc['exepath'] or find in proc['cmdline']: proc_found.append(proc) except TypeError as ex: print('ON', platform, 'PLATFORM', 'search ERROR:', ex) finally: if len(proc_found) == 0: return None else: return proc_found @classmethod def kill(cls, pid: int) -> None: """ Kill the process by means of the OS :param pid: process ID :return: None """ if platform.startswith('linux') or platform.startswith('darwin'): Popen(['kill', '-s', 'SIGKILL', str(pid)]) elif platform.startswith('win32'): Popen(['taskkill', '/PID', str(pid), '/F']) class FFmpeg: """ FFmpeg management from Python """ @classmethod def run(cls, src: str, preset: str = None, fps: int = None, dst: str = None, ffpath: str = None, watchdog: bool = False, sec: int = 5, mono: bool = False) -> None: """ Running the installed ffmpeg :param src: sources urls (example: "rtsp://user:pass@host:554/Streaming/Channels/101, anull") :param preset: 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p :param fps: frame per second encoding output :param dst: destination url (example: rtp://239.0.0.1:5554) :param ffpath: alternative path to bin (example: /usr/bin/ffmpeg) :param watchdog: detect ffmpeg freeze and terminate :param sec: seconds to wait before the watchdog terminates :param mono: detect ffmpeg running copy and terminate :return: None """ process = cls._bin(ffpath).split()+cls._src(src).split()+cls._preset(preset, fps).split()+cls._dst(dst).split() if mono and Proc.search(' '.join(process)): print('Process already exist, exit...') else: with Popen(process, stdout=PIPE, stderr=STDOUT) as proc: que = None if watchdog: que = Queue() Process(target=cls._watchdog, args=(proc.pid, sec, que,), daemon=True).start() for line in proc.stdout: if not que: print(line, flush=True) else: que.put(line) exit() @classmethod def _bin(cls, path_ffmpeg: str) -> str: """ Returns the path to the ffmpeg depending on the OS :param path_ffmpeg: alternative path to bin :return: path to ffmpeg """ faq = ('\n' 'Main download page: https://ffmpeg.org/download.html\n' '\n' 'Install on Linux (Debian):\n' '\tsudo apt install -y ffmpeg\n' '\tTarget: /usr/bin/ffmpeg\n' '\n' 'Install on Windows:\n' '\tDownload and extract archive from: https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-full.7z\n' '\tTarget: "%PROGRAMFILES%\\ffmpeg\\bin\\ffmpeg.exe"\n' '\n' 'Install on MacOS:\n' '\tDownload and extract archive from: https://evermeet.cx/ffmpeg/\n' '\tTarget: /usr/bin/ffmpeg\n') if not path_ffmpeg: if platform.startswith('linux') or platform.startswith('darwin'): path_ffmpeg = '/usr/bin/ffmpeg' elif platform.startswith('win32'): path_ffmpeg = environ['PROGRAMFILES'] + "\\ffmpeg\\bin\\ffmpeg.exe" if path.exists(path_ffmpeg): return path_ffmpeg else: print('ON', platform, 'PLATFORM', 'not found ffmpeg', faq) return None @classmethod def _src(cls, sources: str) -> list: """ Parsing sources into ffmpeg format :param sources: comma-separated list of sources in string format :return: ffmpeg format list of sources """ list_sources = [] for src in sources.split(','): src = src.strip() if 'null' in src: src = ' '.join(['-f lavfi -i', src]) elif 'rtsp' in src: src = ' '.join(['-rtsp_transport tcp -i', src]) else: src = ' '.join(['-stream_loop -1 -re -i', src]) list_sources.append(src) return ' '.join(list_sources) @classmethod def _preset(cls, choice: str, fps: int) -> str: """ Parsing preset into ffmpeg format :param choice: preset selection :param fps: frame per second encoding output :return: ffmpeg format encoding parameters """ tune = '-tune zerolatency' video = '-c:v copy' audio = '-c:a aac -b:a 128k' width, height, kbps = None, None, None if choice: if choice == '240p': width, height, kbps = 426, 240, 480 if choice == '360p': width, height, kbps = 640, 360, 720 if choice == '480p': width, height, kbps = 854, 480, 1920 if choice == '720p': width, height, kbps = 1280, 720, 3960 if choice == '1080p': width, height, kbps = 1920, 1080, 5940 if choice == '1440p': width, height, kbps = 2560, 1440, 12960 if choice == '2160p': width, height, kbps = 3840, 2160, 32400 if width and height and kbps: video = ''.join(['-vf scale=', str(width), ':', str(height), ',setsar=1:1']) video = ' '.join([video, '-c:v libx264 -pix_fmt yuv420p -preset ultrafast']) if fps: video = ' '.join([video, '-r', str(fps), '-g', str(fps * 2)]) video = ' '.join([video, '-b:v', str(kbps) + 'k']) return ' '.join([tune, video, audio]) @classmethod def _dst(cls, destination: str) -> str: """ Parsing destination into ffmpeg format :param destination: :return: ffmpeg format destination """ container = '-f null' stdout = '-v debug' # '-nostdin -nostats' # '-report' if destination: if 'rtmp' in destination: container = '-f flv' elif "rtp" in destination: container = '-f rtp_mpegts' else: destination = '-' return ' '.join([container, destination, stdout]) @classmethod def _watchdog(cls, pid: int, sec: int = 5, que: Queue = None) -> None: """ If no data arrives in the queue, kill the process :param pid: process ID :param sec: seconds to wait for data :param que: queue pointer :return: None """ if que: while True: while not que.empty(): print(que.get()) sleep(sec) if que.empty(): Proc.kill(pid) print('exit by watchdog') break exit() if __name__ == "__main__": from argparse import ArgumentParser args = ArgumentParser( prog='streaming', description='FFmpeg management from Python', epilog='Dependencies: ' 'Python 3 (tested version 3.9.5 on Debian GNU/Linux 11), ' 'ffmpeg (tested version 4.3.4 on Debian GNU/Linux 11)' ) args.add_argument('-s', '--src', type=str, required=True, help='sources urls (example: "rtsp://user:pass@host:554/Streaming/Channels/101, anull")') args.add_argument('--preset', type=str, default=None, required=False, help='240p, 360p, 480p, 720p, 1080p, 1440p, 2160p') args.add_argument('--fps', type=int, default=None, required=False, help='frame per second encoding output') args.add_argument('--dst', type=str, default=None, required=False, help='destination url (example: rtp://239.0.0.1:5554)') args.add_argument('--ffpath', type=str, default=None, required=False, help='alternative path to bin (example: /usr/bin/ffmpeg)') args.add_argument('--watchdog', action='store_true', required=False, help='detect ffmpeg freeze and terminate') args.add_argument('--sec', type=int, default=15, required=False, help='seconds to wait before the watchdog terminates') args.add_argument('--mono', action='store_true', required=False, help='detect ffmpeg running copy and terminate') args = vars(args.parse_args()) FFmpeg.run(src=args['src'], preset=args['preset'], fps=args['fps'], dst=args['dst'], ffpath=args['ffpath'], watchdog=args['watchdog'], sec=args['sec'], mono=args['mono'])