#!/usr/bin/env python3 import json import logging import urllib.request from argparse import ArgumentParser from datetime import datetime from ftplib import FTP from multiprocessing import Process, Queue from os import path, sep, makedirs, remove, replace, environ from subprocess import Popen, PIPE, STDOUT from sys import platform from time import sleep from paramiko import SSHClient, AutoAddPolicy class Parse: """Parser of configs, arguments, parameters. """ def __init__(self, parameters, block: str = None) -> None: """Object constructor. Args: parameters: dictionary as "key":"value" or ArgumentParser class object or string path to the file or string as "var1=val1;var2=val2". block (str, optional): name of target block from text. Defaults to None. """ self.path = '' self.data = {} if type(parameters) is dict: self._dict2dict(parameters) if type(parameters) is ArgumentParser: self._dict2dict(self.argv2dict(parameters)) if type(parameters) is str: if path.exists(parameters): self._dict2dict( self.strs2dict( self.conf2strs(parameters), block ) ) self.path = parameters else: self._dict2dict(self.strs2dict(parameters, block)) def __str__(self) -> str: """Overrides method for print(object). Returns: str: string with contents of the object's dictionary. """ string = '' for key, val in self.data.items(): string += str(type(val)) + ' ' + str(key) + ' = ' + str(val) + '\n' return string def _dict2dict(self, dictionary: dict) -> None: """Updates or adds dictionary data. Args: dictionary (dict): dictionary as "key":"value". """ self.data.update(dictionary) def expand(self, store: str = None) -> dict: """Expand dictionary "key":"name.conf" to dictionary "key":{subkey: subval}. Args: store (str, optional): path to directory with name.conf. Defaults to None. Returns: dict: expanded dictionary as "key":{subkey: subval}. """ for key in self.data: if store: config = store + sep + self.data[key] else: config = self.data[key] with open(config) as file: self.data[key] = Parse(file.read()).data return self.data @classmethod def argv2dict(cls, parser: ArgumentParser) -> dict: """Converts startup arguments to a dictionary. Args: parser (ArgumentParser): argparse.ArgumentParser class object. Returns: dict: dictionary as "key":"value". """ parser = ArgumentParser(add_help=False, parents=[parser]) return vars(parser.parse_args()) @classmethod def conf2strs(cls, config: str) -> str: """Builds a dictionary from a file containing parameters. Args: config (str): path to the config file. Returns: str: string as "var1=val1;\nvar2=val2;". """ with open(config) as file: raw = file.read() strs = '' for line in raw.splitlines(): if not line.lstrip().startswith('#'): strs += line + '\n' return strs @classmethod def strs2dict(cls, strings: str, blockname: str) -> dict: """Builds a dictionary from a strings containing parameters. Args: strings (str): string as "var1=val1;var2=val2;". blockname (str): name of target block from text. Returns: dict: dictionary as "key":"value". """ dictionary = {} if blockname: strings = cls.block(blockname, strings) for line in strings.replace('\n', ';').split(';'): if not line.lstrip().startswith('#') and "=" in line: dictionary[line.split('=')[0].strip()] = line.split('=')[1].strip().split(';')[0].strip() return dictionary @classmethod def str2bool(cls, value: str) -> bool: """Converts a string value to boolean. Args: value (str): string containing "true" or "false", "yes" or "no", "1" or "0". Returns: bool: bool True or False. """ return str(value).lower() in ("true", "yes", "1") @classmethod def block(cls, blockname: str, text: str) -> str: """Cuts a block of text between line [blockname] and line [next block] or EOF. Args: blockname (str): string in [] after which the block starts. text (str): string of text from which the block is needed. Returns: str: string of text between line [block name] and line [next block]. """ level = 1 save = False result = '' for line in text.splitlines(): if line.startswith('[') and blockname in line: level = line.count('[') save = True elif line.startswith('[') and '['*level in line: save = False elif save: result += line + '\n' return result class Connect: """Set of connection methods (functions) for various protocols. """ @staticmethod def http( url: str, method: str = 'GET', username: str = '', password: str = '', authtype: str = None, contenttype: str = 'text/plain', contentdata: str = '' ) -> str: """Handling HTTP request. Args: url (str): request url. method (str, optional): HTTP request method. Defaults to 'GET'. username (str, optional): username for url authentication. Defaults to ''. password (str, optional): password for url authentication. Defaults to ''. authtype (str, optional): digest|basic authentication type. Defaults to None. contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'. contentdata (str, optional): content data. Defaults to ''. Returns: str: HTTP response or 'ERROR'. """ # Preparing authorization if authtype: pswd = urllib.request.HTTPPasswordMgrWithDefaultRealm() pswd.add_password(None, url, username, password) if authtype == 'basic': auth = urllib.request.HTTPBasicAuthHandler(pswd) if authtype == 'digest': auth = urllib.request.HTTPDigestAuthHandler(pswd) urllib.request.install_opener(urllib.request.build_opener(auth)) # Preparing request request = urllib.request.Request(url=url, data=bytes(contentdata.encode('utf-8')), method=method) request.add_header('Content-Type', contenttype) # Response try: response = urllib.request.urlopen(request).read() logging.debug( msg='' + '\n' + 'uri: ' + url + '\n' + 'method: ' + method + '\n' + 'username: ' + username + '\n' + 'password: ' + password + '\n' + 'authtype: ' + authtype + '\n' + 'content-type: ' + contenttype + '\n' + 'content-data: ' + contentdata ) if response.startswith(b'\xff\xd8'): return response else: return str(response.decode('utf-8')) except Exception as error: logging.debug(msg='\n' + 'error: ' + str(error)) return 'ERROR' @staticmethod def ssh_commands(command: str, hostname: str, username: str, password: str, port: int = 22) -> str: """Handling SSH command executing. Args: command (str): command for executing. hostname (str): remote hostname or ip address. username (str): remote host username. password (str): remote host password. port (int, optional): remote host connection port. Defaults to 22. Returns: str: terminal response or 'ERROR'. """ client = SSHClient() client.set_missing_host_key_policy(AutoAddPolicy()) try: client.connect(hostname=hostname, username=username, password=password, port=port) stdin, stdout, stderr = client.exec_command(command=command, get_pty=True) if 'sudo' in command: stdin.write(password + '\n') stdin.flush() stdout.flush() data = stdout.read() + stderr.read() client.close() return data.decode('utf-8') except Exception as error: logging.debug( msg='' + '\n' + 'host: ' + hostname + ':' + str(port) + '\n' + 'user: ' + username + '\n' + 'pass: ' + password + '\n' + 'command: ' + command + '\n' + 'error: ' + str(error) ) return 'ERROR' @staticmethod def ssh_put_file(src_file: str, dst_file: str, hostname: str, username: str, password: str, port: int = 22) -> str: """Handling SFTP upload file. Args: src_file (str): /local/path/to/file. dst_file (str): /remote/path/to/file. hostname (str): remote hostname or ip address. username (str): remote host username. password (str): remote host password. port (int, optional): remote host connection port. Defaults to 22. Returns: str: '/remote/path/to/file' or 'ERROR'. """ client = SSHClient() client.set_missing_host_key_policy(AutoAddPolicy()) try: client.connect(hostname=hostname, username=username, password=password, port=port) client.exec_command('mkdir -p ' + path.dirname(dst_file)) try: sftp = client.open_sftp() sftp.put(localpath=src_file, remotepath=dst_file) sftp.stat(dst_file) sftp.close() return dst_file except Exception as error: logging.debug( msg='' + '\n' + 'dst_file: ' + dst_file + '\n' + 'error: ' + str(error) ) return 'ERROR' except Exception as error: logging.debug( msg='' + '\n' + 'host: ' + hostname + ':' + str(port) + '\n' + 'user: ' + username + '\n' + 'pass: ' + password + '\n' + 'src_file: ' + src_file + '\n' + 'dst_file: ' + dst_file + '\n' + 'error: ' + str(error) ) return 'ERROR' ''' @staticmethod def ssh_get_file(src_file: str, dst_file: str, hostname: str, username: str, password: str, port: int = 22) -> str: """Handling SFTP download file. Args: src_file (str): /remote/path/to/file. dst_file (str): /local/path/to/file. hostname (str): remote hostname or ip address. username (str): remote host username. password (str): remote host password. port (int, optional): remote host connection port. Defaults to 22. Returns: str: '/local/path/to/file' or 'ERROR'. """ client = SSHClient() client.set_missing_host_key_policy(AutoAddPolicy()) try: client.connect(hostname=hostname, username=username, password=password, port=port) with client.open_sftp() as sftp: sftp.get(remotepath=src_file, localpath=dst_file) client.close() except Exception as error: logging.debug( msg='' + '\n' + 'host: ' + hostname + ':' + str(port) + '\n' + 'user: ' + username + '\n' + 'pass: ' + password + '\n' + 'src_file: ' + src_file + '\n' + 'dst_file: ' + dst_file + '\n' + 'error: ' + str(error) ) return 'ERROR' ''' @staticmethod def ftp_put_file(src_file: str, dst_file: str, hostname: str, username: str, password: str) -> bool: dst_path = dst_file.split('/')[:-1] ftp = FTP(host=hostname) try: ftp.login(user=username, passwd=password) for path_item in dst_path: if path_item.strip() == '': continue path_item = path_item.replace('/', '') try: ftp.cwd(path_item) except Exception: ftp.mkd(path_item) ftp.cwd(path_item) except Exception: pass with open(src_file, "rb") as file: ftp.storbinary(f"STOR {dst_file}", file) ftp.quit() return True ''' @staticmethod def ftp_get_file(src_file: str, dst_file: str, hostname: str, username: str, password: str): ftp = FTP(host=hostname) try: ftp.login(user=username, passwd=password) with open(dst_file, "wb") as file: ftp.retrbinary(f"RETR {src_file}", file.write) ftp.quit() except Exception: pass ''' ''' @staticmethod def xmlrpc(): pass ''' class HikISAPI(Connect): """Representing Hikvision device with ISAPI. The class inherits the necessary connection methods of the Connect class """ def __init__( self, hostname: str, username: str, userpass: str, authtype: str = 'digest', hostport: int = 80, protocol: str = 'http', channel: int = 101, videoid: int = 1 ) -> None: """Object constructor. Args: hostname (str): camera hostname or ip address. username (str): camera admin username. userpass (str): camera admin password. authtype (str, optional): digest|basic camera authentication type. Defaults to 'digest'. hostport (int, optional): camera connection port. Defaults to 80. protocol (str, optional): camera connection protocol. Defaults to 'http'. channel (int, optional): camera channel id. Defaults to 101. videoid (int, optional): camera video id. Defaults to 1. """ self._host = hostname self._port = hostport self._user = username self._pswd = userpass self._auth = authtype self._prot = protocol self._chan = channel self._viid = videoid def __call( self, url: str, method: str = 'GET', contenttype: str = 'application/x-www-form-urlencoded', contentdata: str = '' ) -> str: """Send request to camera. Args: url (str): API path for request. method (str, optional): HTTP request method. Defaults to 'GET'. contenttype (str, optional): Content-Type header. Defaults to 'application/x-www-form-urlencoded'. contentdata (str, optional): data for send with request. Defaults to ''. Returns: str: HTTP response content. """ return self.http( url=url, method=method, username=self._user, password=self._pswd, authtype=self._auth, contenttype=contenttype, contentdata=contentdata ) def capabilities(self) -> bool: """Get camera capabilities. Returns: bool: True if successed. Printing a response with a logger at the INFO level. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/ISAPI/PTZCtrl/channels/" + str(self._viid) + "/capabilities" ) response = self.__call(url=url, method='GET') if response != 'ERROR': logging.info(msg='\n' + response + '\n') return True else: return False def downloadjpeg(self, dst_file: str = path.splitext(__file__)[0] + '.jpeg', x: int = 1920, y: int = 1080) -> bool: """Get static picture from camera. Args: dst_file (str, optional): absolute path of picture to save. Defaults to scriptname+'.jpeg'. x (int, optional): picture width. Defaults to 1920. y (int, optional): picture height. Defaults to 1080. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/Streaming/channels/" + str(self._viid) + "/picture?snapShotImageType=JPEG&videoResolutionWidth=" + str(x) + "&videoResolutionHeight=" + str(y) ) with open(dst_file, "wb") as file: response = self.__call(url=url, method='GET') if response != 'ERROR': file.write(response) logging.debug(msg='\n' + dst_file + '\n') return True else: return False def getcamerapos(self) -> bool: """Get current camera position. Returns: bool: True if successed. Printing a response with a logger at the INFO level. """ url = (self._prot + '://' + self._host + ':' + str(self._port) + "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/status") response = self.__call(url=url, method='GET') if response != 'ERROR': logging.info(msg='\n' + response + '\n') return True else: return False def rebootcamera(self) -> bool: """Set camera reboot command. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/ISAPI/System/reboot" ) response = self.__call(url=url, method="PUT") if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def setptzmovyyu(self, speed: int = 1) -> bool: """Start camera moving to up. Args: speed (int, optional): moving speed from 1 to 7. Defaults to 1. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/PTZ/channels/" + str(self._viid) + "/PTZControl?command=TILT_UP&speed=" + str(speed) + "&mode=start" ) response = self.__call(url=url, method='GET') if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def setptzmovyyd(self, speed: int = 1) -> bool: """Start camera moving to down. Args: speed (int, optional): moving speed from 1 to 7. Defaults to 1. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/PTZ/channels/" + str(self._viid) + "/PTZControl?command=TILT_DOWN&speed=" + str(speed) + "&mode=start" ) response = self.__call(url=url, method='GET') if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def setptzmovxxl(self, speed: int = 1) -> bool: """Start camera moving to left. Args: speed (int, optional): moving speed from 1 to 7. Defaults to 1. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/PTZ/channels/" + str(self._viid) + "/PTZControl?command=PAN_LEFT&speed=" + str(speed) + "&mode=start" ) response = self.__call(url=url, method='GET') if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def setptzmovxxr(self, speed: int = 1) -> bool: """Start camera moving to right. Args: speed (int, optional): moving speed from 1 to 7. Defaults to 1. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/PTZ/channels/" + str(self._viid) + "/PTZControl?command=PAN_RIGHT&speed=" + str(speed) + "&mode=start" ) response = self.__call(url=url, method='GET') if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def setptzmovzzi(self, speed: int = 1) -> bool: """Start camera zoom in. Args: speed (int, optional): moving speed from 1 to 7. Defaults to 1. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/PTZ/channels/" + str(self._viid) + "/PTZControl?command=ZOOM_OUT&speed=" + str(speed) + "&mode=start" ) response = self.__call(url=url, method='GET') if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def setptzmovzzo(self, speed: int = 1) -> bool: """Start camera zoom out. Args: speed (int, optional): moving speed from 1 to 7. Defaults to 1. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/PTZ/channels/" + str(self._viid) + "/PTZControl?command=ZOOM_IN&speed=" + str(speed) + "&mode=start" ) response = self.__call(url=url, method='GET') if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def setptzpreset(self, preset: int, speed: int = 1) -> bool: """Start camera moving to preset. Args: preset (int): saved preset number. speed (int, optional): moving speed from 1 to 7. Defaults to 1. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/PTZ/channels/" + str(self._viid) + "/PTZControl?command=GOTO_PRESET&presetNo=" + str(preset) + "&speed=" + str(speed) + "&mode=start" ) response = self.__call(url=url, method='GET') if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def setptztostop(self) -> bool: """Stop any camera moving. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/PTZ/channels/" + str(self._viid) + "/PTZControl?command=GOTO_PRESET&mode=stop" ) response = self.__call(url=url, method='GET') if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def setcamerapos(self, x: int = 0, y: int = 0, z: int = 0) -> bool: """Set camera moving to absolute position. Args: x (int, optional): horisontal camera position from 0 to 3600. Defaults to 0. y (int, optional): vertical camera position from -900 to 2700. Defaults to 0. z (int, optional): zoom camera position from 0 to 1000. Defaults to 0. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/absolute" ) xml = ''.join( '' + '' + '' + str(y) + '' + '' + str(x) + '' + '' + str(z) + '' + '' ) response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml) if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def __setcameramovcon(self, x: int = 0, y: int = 0, z: int = 0) -> bool: """Set camera moving to direction until other signal or 180 seconds elapse. Args: x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0. y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0. z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/continuous" ) xml = ''.join( '' + '' + '' + str(x) + '' + '' + str(y) + '' + '' + str(z) + '' + '' ) response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml) if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def __setcameramovmom(self, x: int = 0, y: int = 0, z: int = 0, t: int = 180000) -> bool: """Set camera moving to direction until other signal or duration elapse. Args: x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0. y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0. z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0. t (int, optional): duration in ms of acceleration from 0 to 180000. Defaults to 180000. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/momentary" ) xml = ''.join( '' + '' + '' + str(x) + '' + '' + str(y) + '' + '' + str(z) + '' + '' + str(t) + '' + '' ) response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml) if response != 'ERROR': sleep(t/1000) logging.debug(msg='\n' + response + '\n') return True else: return False def setcameramov(self, x: int = 0, y: int = 0, z: int = 0, t: int = 0) -> bool: """Set camera moving to direction (polymorph abstraction). Args: x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0. y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0. z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0. t (int, optional): duration in ms of acceleration from 0 to 180000. Defaults to 0. Returns: bool: True if successed. """ if t == '-' or int(t) == 0: return self.__setcameramovcon(x=int(x), y=int(y), z=int(z)) else: return self.__setcameramovmom(x=int(x), y=int(y), z=int(z), t=int(t)) def setmovtohome(self) -> bool: """Set camera moving to homeposition. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/homeposition/goto" ) xml = ''.join( '' + '' ) response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml) if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def setposashome(self) -> bool: """Save current camera position as homeposition. Returns: bool: True if successed. """ url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/homeposition" ) xml = ''.join( '' + '' ) response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml) if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False def settextonosd(self, enabled: str = "true", x: int = 0, y: int = 0, message: str = "") -> bool: """Set message as video overlay text. Args: enabled (str, optional): true or false. Defaults to "true". x (int, optional): horizontal text position from 0 to video width. Defaults to 0. y (int, optional): vertical text position from 0 to video heith. Defaults to 0. message (str, optional): overlay text content. Defaults to "". Returns: bool: True if successed. """ if message == '-': message = "" url = ( self._prot + '://' + self._host + ':' + str(self._port) + "/ISAPI/System/Video/inputs/channels/" + str(self._chan) + "/overlays/text" ) xml = ''.join( '' + '' + '' + '1' + '' + enabled + '' + '' + str(x) + '' + '' + str(y) + '' + '' + message + '' + '' + '' ) response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml) if response != 'ERROR': logging.debug(msg='\n' + response + '\n') return True else: return False class Sensor(Connect): """Representing sensor connected to remote host. The class inherits the necessary connection methods of the Connect class """ def __init__( self, hostname: str, username: str, userpass: str, nodetype: str, nodename: str, hostport: int = 22 ) -> None: """Object constructor. Args: hostname (str): sensor's remote host hostname or ip address. username (str): sensor's remote host username. userpass (str): sensor's remote host password. nodetype (str): 'ds18b20' or other sensor type. nodename (str): 28-1a2b3c4d5e6f (ds18b20 example). hostport (int, optional): sensor's remote host connection port. Defaults to 22. """ self._host = hostname self._port = hostport self._user = username self._pswd = userpass self._type = nodetype self._node = nodename def __call(self, command: str) -> str: """Send request to sensor's remote host. Args: command (str): command to poll the sensor. Returns: str: sensor's remote host response content. """ return self.ssh_commands( command=command, hostname=self._host, port=self._port, username=self._user, password=self._pswd ) def __temperature(self, nodename: str) -> str: """Preparating request for ds18b20 sensor type. Args: nodename (str): 28-1a2b3c4d5e6f (ds18b20 example). Returns: str: formatted string with temperature in Celsius. """ command = 'cat /sys/bus/w1/devices/' + nodename + '/temperature' response = self.__call(command=command) if response != 'ERROR': try: temperature = str(int(response)//1000) + "'C" return temperature except Exception as error: logging.debug( msg='' + '\n' + 'host: ' + self._host + ':' + str(self._port) + '\n' + 'user: ' + self._user + '\n' + 'pass: ' + self._pswd + '\n' + 'command: ' + command + '\n' + 'error: ' + str(error) ) return 'ERROR' def value(self) -> str: """Public method to get sensor value. Returns: str: sensor value. """ if self._type == 'ds18b20': return self.__temperature(nodename=self._node) class Sequence: """Sequence handling. """ @staticmethod def run( device: HikISAPI, sensors: dict, sequence: dict, records_root_path: str = None, records_root_user: str = None, records_root_pass: str = None ) -> None: """Sequences executor. Args: device (HikISAPI): HikISAPI object. sensors (dict): collection as key=sensorname:value=Sensor object. sequence (dict): sequence steps collection. records_root_path (str, optional): path (local|smb|ftp,sftp) to records directory. Defaults to None. records_root_user (str, optional): username if path on remote host. Defaults to None. records_root_pass (str, optional): password if path on remote host. Defaults to None. """ for key, value in sequence.items(): action = value.split(',')[0].strip() x = value.split(',')[1].strip() y = value.split(',')[2].strip() z = value.split(',')[3].strip() p = value.split(',')[4].strip() s = value.split(',')[5].strip() t = value.split(',')[6].strip() w = value.split(',')[7].strip() m = value.split(',')[8].strip() if 'sensor-config:' in m: sensor_name = m.split(':')[1].strip() sensor_value = sensors[sensor_name].value() if sensor_value != 'ERROR': m = sensor_value else: m = '' logging.info( msg=' action:' + key + ' = ' + action + ',' + x + ',' + y + ',' + z + ',' + p + ',' + s + ',' + t + ',' + w + ',' + m ) if action == 'capabilities': response = device.capabilities() elif action == 'getcamerapos': response = device.getcamerapos() elif action == 'rebootcamera': response = device.rebootcamera() elif action == 'setptzmovyyu': response = device.setptzmovyyu(speed=int(s)) elif action == 'setptzmovyyd': response = device.setptzmovyyd(speed=int(s)) elif action == 'setptzmovxxl': response = device.setptzmovxxl(speed=int(s)) elif action == 'setptzmovxxr': response = device.setptzmovxxr(speed=int(s)) elif action == 'setptzmovzzi': response = device.setptzmovzzi(speed=int(s)) elif action == 'setptzmovzzo': response = device.setptzmovzzo(speed=int(s)) elif action == 'setptzpreset': response = device.setptzpreset(preset=int(p), speed=int(s)) elif action == 'setptztostop': response = device.setptztostop() elif action == 'setcamerapos': response = device.setcamerapos(x=int(x), y=int(y), z=int(z)) elif action == 'setcameramov': response = device.setcameramov(x=int(x), y=int(y), z=int(z), t=t) elif action == 'setmovtohome': response = device.setmovtohome() elif action == 'setposashome': response = device.setposashome() elif action == 'settextonosd': response = device.settextonosd(x=int(x), y=int(y), message=m) elif action == 'downloadjpeg': records_root_temp = records_root_path if records_root_temp != path.dirname(path.realpath(__file__)): records_root_temp = path.dirname(path.realpath(__file__)) + sep + 'temp' makedirs(records_root_temp, exist_ok=True) dy = datetime.now().strftime('%Y') dm = datetime.now().strftime('%m') dv = datetime.now().strftime('%V') dd = datetime.now().strftime('%d') th = datetime.now().strftime('%H') tm = datetime.now().strftime('%M') ts = datetime.now().strftime('%S') records_file_name = (key + '_' + dy + '-' + dm + '-' + dd + '_' + th + '.' + tm + '.' + ts + '.jpeg') if device.downloadjpeg(x=int(x), y=int(y), dst_file=records_root_temp + sep + records_file_name): hostname = 'localhost' hostport, hosttype = None, None username = records_root_user userpass = records_root_pass hostpath = records_root_path if '://' in records_root_path: hostname = records_root_path.split('/')[2] hosttype = records_root_path.split('://')[0] if hosttype == 'ftp': hostport = 21 if hosttype == 'sftp': hostport = 22 if hosttype == 'smb': hostport = 445 hostpath = records_root_path.replace(hosttype + '://' + hostname, '') if '@' in hostname: username = hostname.split('@')[0].split(':')[0] userpass = hostname.split('@')[0].split(':')[1] hostname = hostname.split('@')[1] if ':' in hostname: hostport = int(hostname.split(':')[1]) hostname = hostname.split(':')[0] if hosttype == 'ftp': src_file = records_root_temp + sep + records_file_name dst_file = hostpath + '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/' + records_file_name if Connect.ftp_put_file( src_file=src_file, dst_file=dst_file, hostname=hostname, username=username, password=userpass ): try: remove(src_file) except OSError: pass elif hosttype == 'sftp': src_file = records_root_temp + sep + records_file_name dst_file = hostpath + '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/' + records_file_name response = Connect.ssh_put_file( src_file=src_file, dst_file=dst_file, hostname=hostname, port=hostport, username=username, password=userpass) if response != 'ERROR': try: remove(src_file) except OSError: pass response = True else: response = False else: src_file = records_root_temp + sep + records_file_name dst_file = hostpath + sep + dy + sep + dm + sep + dv + sep + dd + sep + records_file_name try: makedirs(hostpath + sep + dy + sep + dm + sep + dv + sep + dd, exist_ok=True) replace(src=src_file, dst=dst_file) response = True except Exception as error: logging.debug( msg='' + '\n' + 'src_file: ' + src_file + '\n' + 'dst_file: ' + dst_file + '\n' + 'error: ' + str(error) ) response = False else: response = False if w != '-' or float(w) != 0: sleep(float(w)) if response: logging.info(msg=' result:' + key + ' = OK') else: logging.warning(msg='result:' + key + ' = ERROR') class Proc: """Find a running process from Python. """ @classmethod def _list_windows(cls) -> list: """Find all running process with wmi. Returns: list: dictionaries with descriptions of found processes. """ execlist = [] separate = b'\r\r\n' out, err = Popen( [ 'wmic', 'process', 'get', 'CommandLine,ExecutablePath,Name,ProcessId', '/format:list' ], stdout=PIPE, stderr=PIPE ).communicate() for line in out.split(separate + separate): execpid, exename, exepath, cmdline = None, None, None, None for subline in line.split(separate): if b'ProcessId=' in subline: execpid = subline.split(b'=')[1].decode('utf-8') if b'Name=' in subline: exename = subline.split(b'=')[1].decode('utf-8') if b'ExecutablePath=' in subline: exepath = subline.split(b'=')[1].decode('utf-8') if b'CommandLine=' in subline: cmdline = subline.split(b'=')[1].decode('utf-8') if execpid and exename: execlist.append( { 'execpid': execpid, 'exename': exename, 'exepath': exepath, 'cmdline': cmdline } ) return execlist @classmethod def _list_linux(cls) -> list: """Find all running process with ps. Returns: list: dictionaries with descriptions of found processes. """ execlist = [] out, err = Popen( [ '/bin/ps', '-eo', 'pid,args' ], stdout=PIPE, stderr=PIPE ).communicate() for line in out.splitlines(): execpid = line.split()[0].decode('utf-8') exepath = line.split()[1].decode('utf-8') exename = path.basename(exepath) cmdline = line.split(None, 1)[1].decode('utf-8') if execpid and exename: execlist.append( { 'execpid': execpid, 'exename': exename, 'exepath': exepath, 'cmdline': cmdline } ) return execlist @classmethod def list_all(cls) -> list: """Find all running process. Returns: list: dictionaries with descriptions of found processes. """ if platform.startswith('linux') or platform.startswith('darwin'): return cls._list_linux() elif platform.startswith('win32'): return cls._list_windows() else: return None @classmethod def search(cls, find: str, exclude: str = None) -> list: """Find specified processes. Args: find (str): find process pid, name or arguments. exclude (str, optional): exclude process pid, name or arguments. Defaults to None. Returns: list: dictionaries with descriptions of found processes. """ proc_found = [] try: for proc in cls.list_all(): if exclude and ( exclude in proc['execpid'] or exclude in proc['exename'] or exclude in proc['exepath'] or exclude in proc['cmdline'] ): pass elif ( find in proc['execpid'] or find in proc['exename'] or find in proc['exepath'] or find in proc['cmdline'] ): proc_found.append(proc) except TypeError as ex: print('ON', platform, 'PLATFORM', 'search ERROR:', ex) finally: if len(proc_found) == 0: return None else: return proc_found @classmethod def kill(cls, pid: int) -> None: """Kill the process by means of the OS. Args: pid (int): process ID. """ if platform.startswith('linux') or platform.startswith('darwin'): Popen(['kill', '-s', 'SIGKILL', str(pid)]) elif platform.startswith('win32'): Popen(['taskkill', '/PID', str(pid), '/F']) class FFmpeg: """FFmpeg management from Python. """ @classmethod def run( cls, src: (str, type(None)) = None, dst: str = None, fps: int = None, preset: str = None, raw: (str, type(None)) = None, ffpath: str = None, watchdog: bool = False, watchsec: int = None, onlyonce: bool = False ) -> int: """Running the installed ffmpeg. Args: src (str, type, optional): sources urls, example: 'rtsp://user:pass@host:554/Streaming/Channels/101, anull'. Defaults to None. dst (str, optional): destination url, example: 'rtp://239.0.0.1:5554'. Defaults to None. fps (int, optional): frame per second encoding output. Defaults to None. preset (str, optional): 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p. Defaults to None. raw (str, type, optional): custom ffmpeg parameters string. Defaults to None. ffpath (str, optional): custom path to bin, example: /usr/bin/ffmpeg. Defaults to None. watchdog (bool, optional): detect ffmpeg freeze and terminate. Defaults to False. watchsec (int, optional): seconds to wait before watchdog terminates. Defaults to None. onlyonce (bool, optional): detect ffmpeg running copy and terminate. Defaults to False. Returns: int: ffmpeg return code """ if not raw: process = ('' + cls._bin(ffpath).split() + cls._src(src).split() + cls._preset(preset, fps).split() + cls._dst(dst).split() ) else: process = cls._bin(ffpath).split() + raw.split() if onlyonce and Proc.search(' '.join(process)): print('Process already exist, exit...') else: logging.info(msg='Starting ' + ' '.join(process)) with Popen(process, stdout=PIPE, stderr=STDOUT) as proc: que = None if watchdog: que = Queue() Process( target=cls._watchdog, args=(proc.pid, watchsec, que,), daemon=True ).start() for line in proc.stdout: if not que: print(line, flush=True) else: que.put(line) return proc.returncode @classmethod def _bin(cls, ffpath: str, tool: str = 'ffmpeg') -> str: """Returns the path to the bin depending on the OS. Args: ffpath (str): custom path to bin. tool (str, optional): 'ffmpeg', 'ffprobe'. Defaults to 'ffmpeg'. Returns: str: path to bin or None, if path does not exist. """ faq = ( '\n' 'Main download page: https://ffmpeg.org/download.html\n' '\n' 'Install on Linux (Debian):\n' '\tsudo apt install -y ffmpeg\n' '\tTarget: /usr/bin/ffmpeg\n' '\n' 'Install on Windows:\n' '\tDownload and extract archive from: https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-full.7z\n' '\tTarget: "%PROGRAMFILES%\\ffmpeg\\bin\\ffmpeg.exe"\n' '\n' 'Install on MacOS:\n' '\tDownload and extract archive from: https://evermeet.cx/ffmpeg/\n' '\tTarget: /usr/bin/ffmpeg\n' ) if not ffpath: if platform.startswith('linux') or platform.startswith('darwin'): if tool == 'ffprobe': ffpath = '/usr/bin/ffprobe' else: ffpath = '/usr/bin/ffmpeg' elif platform.startswith('win32'): if tool == 'ffprobe': ffpath = environ['PROGRAMFILES'] + "\\ffmpeg\\bin\\ffprobe.exe" else: ffpath = environ['PROGRAMFILES'] + "\\ffmpeg\\bin\\ffmpeg.exe" if path.exists(ffpath): return ffpath else: print('ON', platform, 'PLATFORM', 'not found', tool, faq) return None @classmethod def _src(cls, sources: str) -> list: """Parsing sources into ffmpeg format. Args: sources (str): comma-separated list of sources in string format. Returns: list: ffmpeg format list of sources. """ list_sources = [] for src in sources.split(','): src = src.strip() if 'null' in src: src = ' '.join(['-f lavfi -i', src]) elif 'rtsp' in src: src = ' '.join(['-rtsp_transport tcp -i', src]) else: src = ' '.join(['-stream_loop -1 -re -i', src]) list_sources.append(src) return ' '.join(list_sources) @classmethod def _preset(cls, choice: str, fps: int) -> str: """Parsing preset into ffmpeg format. Args: choice (str): preset selection. fps (int): frame per second encoding output. Returns: str: ffmpeg format encoding parameters. """ tune = '-tune zerolatency' video = '-c:v copy' audio = '-c:a aac -b:a 128k' width, height, kbps = None, None, None if choice: if choice == '240p': width, height, kbps = 426, 240, 480 if choice == '360p': width, height, kbps = 640, 360, 720 if choice == '480p': width, height, kbps = 854, 480, 1920 if choice == '720p': width, height, kbps = 1280, 720, 3960 if choice == '1080p': width, height, kbps = 1920, 1080, 5940 if choice == '1440p': width, height, kbps = 2560, 1440, 12960 if choice == '2160p': width, height, kbps = 3840, 2160, 32400 if width and height and kbps: video = ''.join( [ '-vf scale=', str(width), ':', str(height), ',setsar=1:1' ] ) video = ' '.join( [ video, '-c:v libx264 -pix_fmt yuv420p -preset ultrafast' ] ) if fps: video = ' '.join([video, '-r', str(fps), '-g', str(fps * 2)]) video = ' '.join([video, '-b:v', str(kbps) + 'k']) return ' '.join([tune, video, audio]) @classmethod def _dst(cls, destination: str) -> str: """Parsing destination into ffmpeg format. Args: destination (str): destination path or url. Returns: str: ffmpeg format destination. """ container = '-f null' stdout = '-v debug' # '-nostdin -nostats' # '-report' if destination: if 'rtmp' in destination: container = '-f flv' elif "rtp" in destination: container = '-f rtp_mpegts' else: destination = '-' return ' '.join([container, destination, stdout]) @classmethod def _watchdog(cls, pid: int, sec: int, que: Queue = None) -> None: """If no data arrives in the queue, kill the process. Args: pid (int): process ID. sec (int): seconds to wait for data. que (Queue, optional): queue pointer. Defaults to None. """ if not sec: sec = 5 if que: while True: while not que.empty(): print(que.get()) sleep(sec) if que.empty(): Proc.kill(pid) print('exit by watchdog') break exit() @classmethod def probe( cls, target: (str, type(None)) = None, raw: (str, type(None)) = None, ffpath: str = None ) -> (dict, bytes, None): """Running the installed ffprobe. Args: target (str, type, optional): media file path to probe. Defaults to None. raw (str, type, optional): custom ffprobe parameters string. Defaults to None. ffpath (str, optional): custom path to bin, example: /usr/bin/ffprobe. Defaults to None. Returns: dict, bytes, None: ffprobe response or None. """ if not raw: command = ([] + cls._bin(ffpath=ffpath, tool='ffprobe').split() + ('-i ' + target + ' -v quiet -print_format json -show_format -show_programs -show_streams').split() ) else: command = cls._bin(ffpath=ffpath, tool='ffprobe').split() + raw.split() with Popen(command, stdout=PIPE, stderr=STDOUT) as process: result = process.communicate() if process.returncode == 0 and not raw: return json.loads(result[0].decode('utf-8')) elif process.returncode == 0 and raw: return result[0] else: return None if __name__ == "__main__": time_start = datetime.now() args = ArgumentParser( prog='cctv-scheduler', description='Hikvision PTZ IP-Camera management.', epilog='Dependencies: ' '- Python 3 (tested version 3.9.5), ' '- Python 3 modules: paramiko ' ) args.add_argument('--config', type=str, default=path.splitext(__file__)[0] + '.conf', required=False, help='custom configuration file path') args.add_argument('-b', '--broadcast', action='store_true', required=False, help='streaming media to destination') args.add_argument('-s', '--sequences', action='store_true', required=False, help='run sequences from config file') args.add_argument('-c', '--converter', action='store_true', required=False, help='convert JPEG collection to MP4') args.add_argument('-p', '--publisher', action='store_true', required=False, help='publish content from templates') args = vars(args.parse_args()) log_root = path.dirname(path.realpath(__file__)) log_level = 'INFO' if path.exists(args['config']): conf = Parse(parameters=args['config'], block='common') if 'log_root' in conf.data: log_root = conf.data['log_root'] if 'log_level' in conf.data: if conf.data['log_level'] == 'DEBUG': log_level = logging.DEBUG elif conf.data['log_level'] == 'INFO': log_level = logging.INFO elif conf.data['log_level'] == 'WARNING': log_level = logging.WARNING elif conf.data['log_level'] == 'ERROR': log_level = logging.ERROR elif conf.data['log_level'] == 'CRITICAL': log_level = logging.CRITICAL logging.basicConfig( format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d_%H.%M.%S', handlers=[ logging.FileHandler( filename=log_root + sep + path.splitext(path.basename(__file__))[0] + '.log', mode='a' ), logging.StreamHandler() ], level=log_level ) logging.getLogger("paramiko").setLevel(logging.WARNING) if args['broadcast']: logging.info(msg='Starting streaming media to destination') broadcasts = {} conf = Parse(parameters=args['config'], block='enable-broadcast') for key, value in conf.data.items(): if value == 'true': broadcast_config = Parse( parameters=args['config'], block='broadcast-config:' + key ).data src = None if 'src' in broadcast_config: src = broadcast_config['src'] dst = None if 'dst' in broadcast_config: dst = broadcast_config['dst'] fps = None if 'fps' in broadcast_config: fps = broadcast_config['fps'] preset = None if 'preset' in broadcast_config: preset = broadcast_config['preset'] ffpath = None if 'ffpath' in broadcast_config: ffpath = broadcast_config['ffpath'] watchdog = None if 'watchdog' in broadcast_config: watchdog = broadcast_config['watchdog'] watchsec = None if 'watchsec' in broadcast_config: watchsec = int(broadcast_config['watchsec']) onlyonce = None if 'onlyonce' in broadcast_config: onlyonce = broadcast_config['onlyonce'] FFmpeg.run( src=src, dst=dst, fps=fps, preset=preset, ffpath=ffpath, watchdog=watchdog, watchsec=watchsec, onlyonce=onlyonce ) elif args['sequences']: logging.info(msg='Starting PTZ sequences from config file') sensors = {} conf = Parse(parameters=args['config'], block='enable-sensor') for key, value in conf.data.items(): if value == 'true': device_config = Parse( parameters=args['config'], block='sensor-config:' + key ).data device_entity = Sensor( hostname=device_config['hostname'], username=device_config['username'], userpass=device_config['userpass'], nodetype=device_config['nodetype'], nodename=device_config['nodename'] ) sensors[key] = device_entity conf = Parse(parameters=args['config'], block='enable-sequences') for key, value in conf.data.items(): if value == 'true': device_sequence = Parse( parameters=args['config'], block='camera-sequences:' + key ).data device_config = Parse( parameters=args['config'], block='camera-config:' + key ).data device_entity = HikISAPI( hostname=device_config['hostname'], username=device_config['username'], userpass=device_config['userpass'] ) records_root_path = path.dirname(path.realpath(__file__)) records_root_user = None records_root_pass = None if 'records_root_path' in device_config: records_root_path = device_config['records_root_path'] if 'records_root_user' in device_config: records_root_user = device_config['records_root_user'] if 'records_root_pass' in device_config: records_root_pass = device_config['records_root_pass'] Sequence.run( device=device_entity, sensors=sensors, sequence=device_sequence, records_root_path=records_root_path, records_root_user=records_root_user, records_root_pass=records_root_pass ) elif args['converter']: logging.info(msg='Starting convert JPEG collection to MP4') elif args['publisher']: logging.info(msg='Starting publish content from templates') else: logging.info(msg='Start arguments was not selected. Exit.') time_execute = datetime.now() - time_start logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')