cctv-scheduler/archive/0.2/streaming.py

298 lines
12 KiB
Python

#!/usr/bin/env python3
from multiprocessing import Process, Queue
from os import path, environ
from subprocess import Popen, PIPE, STDOUT
from sys import platform
from time import sleep
class Proc:
"""
Find a running process from Python
"""
@classmethod
def _list_windows(cls) -> list:
"""
Find all running process with wmi
:return: list of dictionaries with descriptions of found processes
"""
execlist = []
separate = b'\r\r\n'
out, err = Popen(['wmic', 'process', 'get', 'CommandLine,ExecutablePath,Name,ProcessId', '/format:list'],
stdout=PIPE, stderr=PIPE).communicate()
for line in out.split(separate + separate):
execpid, exename, exepath, cmdline = None, None, None, None
for subline in line.split(separate):
if b'ProcessId=' in subline:
execpid = subline.split(b'=')[1].decode('utf-8')
if b'Name=' in subline:
exename = subline.split(b'=')[1].decode('utf-8')
if b'ExecutablePath=' in subline:
exepath = subline.split(b'=')[1].decode('utf-8')
if b'CommandLine=' in subline:
cmdline = subline.split(b'=')[1].decode('utf-8')
if execpid and exename:
execlist.append({'execpid': execpid, 'exename': exename, 'exepath': exepath, 'cmdline': cmdline})
return execlist
@classmethod
def _list_linux(cls) -> list:
"""
Find all running process with ps
:return: list of dictionaries with descriptions of found processes
"""
execlist = []
out, err = Popen(['/bin/ps', '-eo', 'pid,args'], stdout=PIPE, stderr=PIPE).communicate()
for line in out.splitlines():
execpid = line.split()[0].decode('utf-8')
exepath = line.split()[1].decode('utf-8')
exename = path.basename(exepath)
cmdline = line.split(None, 1)[1].decode('utf-8')
if execpid and exename:
execlist.append({'execpid': execpid, 'exename': exename, 'exepath': exepath, 'cmdline': cmdline})
return execlist
@classmethod
def list(cls) -> list:
"""
Find all running process
:return: list of dictionaries with descriptions of found processes
"""
if platform.startswith('linux') or platform.startswith('darwin'):
return cls._list_linux()
elif platform.startswith('win32'):
return cls._list_windows()
else:
return None
@classmethod
def search(cls, find: str, exclude: str = None) -> list:
"""
Find specified processes
:param find: find process pid, name or arguments
:param exclude: exclude process pid, name or arguments
:return: list of dictionaries with descriptions of found processes
"""
proc_found = []
try:
for proc in cls.list():
if exclude and (exclude in proc['execpid'] or exclude in proc['exename'] or
exclude in proc['exepath'] or exclude in proc['cmdline']):
pass
elif find in proc['execpid'] or find in proc['exename'] or \
find in proc['exepath'] or find in proc['cmdline']:
proc_found.append(proc)
except TypeError as ex:
print('ON', platform, 'PLATFORM', 'search ERROR:', ex)
finally:
if len(proc_found) == 0:
return None
else:
return proc_found
@classmethod
def kill(cls, pid: int) -> None:
"""
Kill the process by means of the OS
:param pid: process ID
:return: None
"""
if platform.startswith('linux') or platform.startswith('darwin'):
Popen(['kill', '-s', 'SIGKILL', str(pid)])
elif platform.startswith('win32'):
Popen(['taskkill', '/PID', str(pid), '/F'])
class FFmpeg:
"""
FFmpeg management from Python
"""
@classmethod
def run(cls, src: str, preset: str = None, fps: int = None, dst: str = None,
ffpath: str = None, watchdog: bool = False, sec: int = 5, mono: bool = False) -> None:
"""
Running the installed ffmpeg
:param src: sources urls (example: "rtsp://user:pass@host:554/Streaming/Channels/101, anull")
:param preset: 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p
:param fps: frame per second encoding output
:param dst: destination url (example: rtp://239.0.0.1:5554)
:param ffpath: alternative path to bin (example: /usr/bin/ffmpeg)
:param watchdog: detect ffmpeg freeze and terminate
:param sec: seconds to wait before the watchdog terminates
:param mono: detect ffmpeg running copy and terminate
:return: None
"""
process = cls._bin(ffpath).split()+cls._src(src).split()+cls._preset(preset, fps).split()+cls._dst(dst).split()
if mono and Proc.search(' '.join(process)):
print('Process already exist, exit...')
else:
with Popen(process, stdout=PIPE, stderr=STDOUT) as proc:
que = None
if watchdog:
que = Queue()
Process(target=cls._watchdog, args=(proc.pid, sec, que,), daemon=True).start()
for line in proc.stdout:
if not que:
print(line, flush=True)
else:
que.put(line)
exit()
@classmethod
def _bin(cls, path_ffmpeg: str) -> str:
"""
Returns the path to the ffmpeg depending on the OS
:param path_ffmpeg: alternative path to bin
:return: path to ffmpeg
"""
faq = ('\n'
'Main download page: https://ffmpeg.org/download.html\n'
'\n'
'Install on Linux (Debian):\n'
'\tsudo apt install -y ffmpeg\n'
'\tTarget: /usr/bin/ffmpeg\n'
'\n'
'Install on Windows:\n'
'\tDownload and extract archive from: https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-full.7z\n'
'\tTarget: "%PROGRAMFILES%\\ffmpeg\\bin\\ffmpeg.exe"\n'
'\n'
'Install on MacOS:\n'
'\tDownload and extract archive from: https://evermeet.cx/ffmpeg/\n'
'\tTarget: /usr/bin/ffmpeg\n')
if not path_ffmpeg:
if platform.startswith('linux') or platform.startswith('darwin'):
path_ffmpeg = '/usr/bin/ffmpeg'
elif platform.startswith('win32'):
path_ffmpeg = environ['PROGRAMFILES'] + "\\ffmpeg\\bin\\ffmpeg.exe"
if path.exists(path_ffmpeg):
return path_ffmpeg
else:
print('ON', platform, 'PLATFORM', 'not found ffmpeg', faq)
return None
@classmethod
def _src(cls, sources: str) -> list:
"""
Parsing sources into ffmpeg format
:param sources: comma-separated list of sources in string format
:return: ffmpeg format list of sources
"""
list_sources = []
for src in sources.split(','):
src = src.strip()
if 'null' in src:
src = ' '.join(['-f lavfi -i', src])
elif 'rtsp' in src:
src = ' '.join(['-rtsp_transport tcp -i', src])
else:
src = ' '.join(['-stream_loop -1 -re -i', src])
list_sources.append(src)
return ' '.join(list_sources)
@classmethod
def _preset(cls, choice: str, fps: int) -> str:
"""
Parsing preset into ffmpeg format
:param choice: preset selection
:param fps: frame per second encoding output
:return: ffmpeg format encoding parameters
"""
tune = '-tune zerolatency'
video = '-c:v copy'
audio = '-c:a aac -b:a 128k'
width, height, kbps = None, None, None
if choice:
if choice == '240p':
width, height, kbps = 426, 240, 480
if choice == '360p':
width, height, kbps = 640, 360, 720
if choice == '480p':
width, height, kbps = 854, 480, 1920
if choice == '720p':
width, height, kbps = 1280, 720, 3960
if choice == '1080p':
width, height, kbps = 1920, 1080, 5940
if choice == '1440p':
width, height, kbps = 2560, 1440, 12960
if choice == '2160p':
width, height, kbps = 3840, 2160, 32400
if width and height and kbps:
video = ''.join(['-vf scale=', str(width), ':', str(height), ',setsar=1:1'])
video = ' '.join([video, '-c:v libx264 -pix_fmt yuv420p -preset ultrafast'])
if fps:
video = ' '.join([video, '-r', str(fps), '-g', str(fps * 2)])
video = ' '.join([video, '-b:v', str(kbps) + 'k'])
return ' '.join([tune, video, audio])
@classmethod
def _dst(cls, destination: str) -> str:
"""
Parsing destination into ffmpeg format
:param destination:
:return: ffmpeg format destination
"""
container = '-f null'
stdout = '-v debug' # '-nostdin -nostats' # '-report'
if destination:
if 'rtmp' in destination:
container = '-f flv'
elif "rtp" in destination:
container = '-f rtp_mpegts'
else:
destination = '-'
return ' '.join([container, destination, stdout])
@classmethod
def _watchdog(cls, pid: int, sec: int = 5, que: Queue = None) -> None:
"""
If no data arrives in the queue, kill the process
:param pid: process ID
:param sec: seconds to wait for data
:param que: queue pointer
:return: None
"""
if que:
while True:
while not que.empty():
print(que.get())
sleep(sec)
if que.empty():
Proc.kill(pid)
print('exit by watchdog')
break
exit()
if __name__ == "__main__":
from argparse import ArgumentParser
args = ArgumentParser(
prog='streaming',
description='FFmpeg management from Python',
epilog='Dependencies: '
'Python 3 (tested version 3.9.5 on Debian GNU/Linux 11), '
'ffmpeg (tested version 4.3.4 on Debian GNU/Linux 11)'
)
args.add_argument('-s', '--src', type=str, required=True,
help='sources urls (example: "rtsp://user:pass@host:554/Streaming/Channels/101, anull")')
args.add_argument('--preset', type=str, default=None, required=False,
help='240p, 360p, 480p, 720p, 1080p, 1440p, 2160p')
args.add_argument('--fps', type=int, default=None, required=False,
help='frame per second encoding output')
args.add_argument('--dst', type=str, default=None, required=False,
help='destination url (example: rtp://239.0.0.1:5554)')
args.add_argument('--ffpath', type=str, default=None, required=False,
help='alternative path to bin (example: /usr/bin/ffmpeg)')
args.add_argument('--watchdog', action='store_true', required=False,
help='detect ffmpeg freeze and terminate')
args.add_argument('--sec', type=int, default=15, required=False,
help='seconds to wait before the watchdog terminates')
args.add_argument('--mono', action='store_true', required=False,
help='detect ffmpeg running copy and terminate')
args = vars(args.parse_args())
FFmpeg.run(src=args['src'], preset=args['preset'], fps=args['fps'], dst=args['dst'],
ffpath=args['ffpath'], watchdog=args['watchdog'], sec=args['sec'], mono=args['mono'])