generated from pavel.muhortov/template-bash
1241 lines
47 KiB
Python
1241 lines
47 KiB
Python
#!/usr/bin/env python3
|
|
|
|
|
|
import logging
|
|
import urllib.request
|
|
from argparse import ArgumentParser
|
|
from datetime import datetime
|
|
from ftplib import FTP
|
|
from os import path, sep, makedirs, remove, replace
|
|
from time import sleep
|
|
from paramiko import SSHClient, AutoAddPolicy
|
|
|
|
|
|
class Parse:
    """Parser of configs, arguments, parameters.

    Parsed key/value pairs are kept in ``self.data``. ``self.path`` holds the
    config file path when the parameters were read from a file, else ''.
    """

    def __init__(self, parameters, block: str = None) -> None:
        """Object constructor.

        Args:
            parameters: dictionary as "key":"value" or
                ArgumentParser class object or
                string path to the file or
                string as "var1=val1;var2=val2".
            block (str, optional): name of target block from text. Defaults to None.
        """
        self.path = ''
        self.data = {}
        if isinstance(parameters, dict):
            self._dict2dict(parameters)
        elif isinstance(parameters, ArgumentParser):
            self._dict2dict(self.argv2dict(parameters))
        elif isinstance(parameters, str):
            if path.exists(parameters):
                # The string names an existing file: parse the file content.
                self._dict2dict(
                    self.strs2dict(self.conf2strs(parameters), block)
                )
                self.path = parameters
            else:
                # Otherwise the string itself is the parameter list.
                self._dict2dict(self.strs2dict(parameters, block))

    def __str__(self) -> str:
        """Overrides method for print(object).

        Returns:
            str: one "<type> key = value" line per item of the object's dictionary.
        """
        return ''.join(
            str(type(val)) + ' ' + str(key) + ' = ' + str(val) + '\n'
            for key, val in self.data.items()
        )

    def _dict2dict(self, dictionary: dict) -> None:
        """Updates or adds dictionary data.

        Args:
            dictionary (dict): dictionary as "key":"value".
        """
        self.data.update(dictionary)

    def expand(self, store: str = None) -> dict:
        """Expand dictionary "key":"name.conf" to dictionary "key":{subkey: subval}.

        Every current value is treated as a config file name; the file is read
        and its parsed content replaces the value.

        Args:
            store (str, optional): path to directory with name.conf. Defaults to None.

        Returns:
            dict: expanded dictionary as "key":{subkey: subval}.
        """
        # Only existing keys are reassigned, so the dictionary size does not
        # change while it is being iterated.
        for key in self.data:
            config = store + sep + self.data[key] if store else self.data[key]
            with open(config) as file:
                self.data[key] = Parse(file.read()).data
        return self.data

    @classmethod
    def argv2dict(cls, parser: ArgumentParser) -> dict:
        """Converts startup arguments to a dictionary.

        Args:
            parser (ArgumentParser): argparse.ArgumentParser class object.

        Returns:
            dict: dictionary as "key":"value".
        """
        # Wrap the given parser as a parent so its own -h/--help stays intact.
        parser = ArgumentParser(add_help=False, parents=[parser])
        return vars(parser.parse_args())

    @classmethod
    def conf2strs(cls, config: str) -> str:
        """Reads a config file and drops its comment lines.

        Args:
            config (str): path to the config file.

        Returns:
            str: string as "var1=val1;\nvar2=val2;" (every kept line ends with a newline).
        """
        with open(config) as file:
            lines = file.read().splitlines()
        return ''.join(
            line + '\n' for line in lines
            if not line.lstrip().startswith('#')
        )

    @classmethod
    def strs2dict(cls, strings: str, blockname: str = None) -> dict:
        """Builds a dictionary from a strings containing parameters.

        Args:
            strings (str): string as "var1=val1;var2=val2;".
            blockname (str, optional): name of target block from text. Defaults to None.

        Returns:
            dict: dictionary as "key":"value".
        """
        dictionary = {}
        if blockname:
            strings = cls.block(blockname, strings)
        for line in strings.replace('\n', ';').split(';'):
            if line.lstrip().startswith('#') or '=' not in line:
                continue
            # Split on the first '=' only, so values that contain '='
            # themselves (urls with query strings, etc.) are kept whole.
            key, _, val = line.partition('=')
            dictionary[key.strip()] = val.strip()
        return dictionary

    @classmethod
    def str2bool(cls, value: str) -> bool:
        """Converts a string value to boolean.

        Args:
            value (str): string containing "true" or "false", "yes" or "no", "1" or "0".

        Returns:
            bool: True for "true", "yes" or "1" (case-insensitive), False otherwise.
        """
        return str(value).lower() in ("true", "yes", "1")

    @classmethod
    def block(cls, blockname: str, text: str) -> str:
        """Cuts a block of text between line [blockname] and line [next block] or EOF.

        Args:
            blockname (str): string in [] after which the block starts.
            text (str): string of text from which the block is needed.

        Returns:
            str: string of text between line [block name] and line [next block].
        """
        level = 1
        save = False
        result = ''
        for line in text.splitlines():
            if line.startswith('[') and blockname in line:
                # Header of the wanted block: remember its bracket depth.
                level = line.count('[')
                save = True
            elif line.startswith('[') and '[' * level in line:
                # Another header of at least the same bracket depth ends the block.
                save = False
            elif save:
                result += line + '\n'
        return result
|
|
|
|
|
|
class Connect:
    """Set of connection methods (functions) for various protocols.

    All methods are static; failures are reported through the return value
    (see each method) and detailed at the DEBUG logging level.
    Passwords are never written to the log.
    """

    @staticmethod
    def http(
        url: str, method: str = 'GET',
        username: str = '', password: str = '', authtype: str = None,
        contenttype: str = 'text/plain', contentdata: str = ''
    ) -> str:
        """Handling HTTP request.

        Args:
            url (str): request url.
            method (str, optional): HTTP request method. Defaults to 'GET'.
            username (str, optional): username for url authentication. Defaults to ''.
            password (str, optional): password for url authentication. Defaults to ''.
            authtype (str, optional): digest|basic authentication type. Defaults to None.
            contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'.
            contentdata (str, optional): content data. Defaults to ''.

        Returns:
            str: HTTP response decoded as UTF-8, or 'ERROR'.
                A response starting with the JPEG magic bytes is returned
                as raw ``bytes`` instead.
        """

        # Preparing authorization
        if authtype:
            handlers = {
                'basic': urllib.request.HTTPBasicAuthHandler,
                'digest': urllib.request.HTTPDigestAuthHandler,
            }
            if authtype not in handlers:
                logging.debug(msg='\n' + 'error: unsupported authtype: ' + str(authtype))
                return 'ERROR'
            pswd = urllib.request.HTTPPasswordMgrWithDefaultRealm()
            pswd.add_password(None, url, username, password)
            # NOTE: install_opener() is process-wide, later urlopen() calls reuse it.
            urllib.request.install_opener(
                urllib.request.build_opener(handlers[authtype](pswd))
            )

        try:
            # Preparing request
            request = urllib.request.Request(
                url=url, data=contentdata.encode('utf-8'), method=method
            )
            request.add_header('Content-Type', contenttype)

            # Response
            with urllib.request.urlopen(request) as reply:
                response = reply.read()
            logging.debug(
                msg=''
                + '\n' + 'uri: ' + url
                + '\n' + 'method: ' + method
                + '\n' + 'username: ' + username
                + '\n' + 'password: ' + ('***' if password else '')
                # authtype may be None, so it must be converted explicitly
                + '\n' + 'authtype: ' + str(authtype)
                + '\n' + 'content-type: ' + contenttype
                + '\n' + 'content-data: ' + contentdata
            )
            # JPEG pictures start with FF D8 and must stay binary.
            if response.startswith(b'\xff\xd8'):
                return response
            return response.decode('utf-8')
        except Exception as error:
            logging.debug(msg='\n' + 'error: ' + str(error))
            return 'ERROR'

    @staticmethod
    def ssh_commands(command: str, hostname: str, username: str, password: str, port: int = 22) -> str:
        """Handling SSH command executing.

        Args:
            command (str): command for executing.
            hostname (str): remote hostname or ip address.
            username (str): remote host username.
            password (str): remote host password.
            port (int, optional): remote host connection port. Defaults to 22.

        Returns:
            str: terminal response (stdout followed by stderr) or 'ERROR'.
        """
        client = SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
        client.set_missing_host_key_policy(AutoAddPolicy())
        try:
            client.connect(hostname=hostname, username=username, password=password, port=port)
            stdin, stdout, stderr = client.exec_command(command=command, get_pty=True)
            if 'sudo' in command:
                # Answer the sudo password prompt on the allocated pty.
                stdin.write(password + '\n')
                stdin.flush()
            stdout.flush()
            data = stdout.read() + stderr.read()
            return data.decode('utf-8')
        except Exception as error:
            logging.debug(
                msg=''
                + '\n' + 'host: ' + hostname + ':' + str(port)
                + '\n' + 'user: ' + username
                + '\n' + 'command: ' + command
                + '\n' + 'error: ' + str(error)
            )
            return 'ERROR'
        finally:
            # Release the connection on success and on failure alike.
            client.close()

    @staticmethod
    def ssh_put_file(src_file: str, dst_file: str, hostname: str, username: str, password: str, port: int = 22) -> str:
        """Handling SFTP upload file.

        Args:
            src_file (str): /local/path/to/file.
            dst_file (str): /remote/path/to/file.
            hostname (str): remote hostname or ip address.
            username (str): remote host username.
            password (str): remote host password.
            port (int, optional): remote host connection port. Defaults to 22.

        Returns:
            str: '/remote/path/to/file' or 'ERROR'.
        """
        from shlex import quote

        client = SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
        client.set_missing_host_key_policy(AutoAddPolicy())
        try:
            client.connect(hostname=hostname, username=username, password=password, port=port)
            # Quote the directory (spaces, shell metacharacters) and wait for
            # mkdir to finish, otherwise the upload may start too early.
            _, stdout, _ = client.exec_command('mkdir -p ' + quote(path.dirname(dst_file)))
            stdout.channel.recv_exit_status()
            try:
                sftp = client.open_sftp()
                try:
                    sftp.put(localpath=src_file, remotepath=dst_file)
                    # stat() raises if the uploaded file is not there.
                    sftp.stat(dst_file)
                finally:
                    sftp.close()
                return dst_file
            except Exception as error:
                logging.debug(
                    msg=''
                    + '\n' + 'dst_file: ' + dst_file
                    + '\n' + 'error: ' + str(error)
                )
                return 'ERROR'
        except Exception as error:
            logging.debug(
                msg=''
                + '\n' + 'host: ' + hostname + ':' + str(port)
                + '\n' + 'user: ' + username
                + '\n' + 'src_file: ' + src_file
                + '\n' + 'dst_file: ' + dst_file
                + '\n' + 'error: ' + str(error)
            )
            return 'ERROR'
        finally:
            client.close()

    '''
    @staticmethod
    def ssh_get_file(src_file: str, dst_file: str, hostname: str, username: str, password: str, port: int = 22) -> str:
        """Handling SFTP download file.

        Args:
            src_file (str): /remote/path/to/file.
            dst_file (str): /local/path/to/file.
            hostname (str): remote hostname or ip address.
            username (str): remote host username.
            password (str): remote host password.
            port (int, optional): remote host connection port. Defaults to 22.

        Returns:
            str: '/local/path/to/file' or 'ERROR'.
        """
        client = SSHClient()
        client.set_missing_host_key_policy(AutoAddPolicy())
        try:
            client.connect(hostname=hostname, username=username, password=password, port=port)
            with client.open_sftp() as sftp:
                sftp.get(remotepath=src_file, localpath=dst_file)
            client.close()
        except Exception as error:
            logging.debug(
                msg=''
                + '\n' + 'host: ' + hostname + ':' + str(port)
                + '\n' + 'user: ' + username
                + '\n' + 'pass: ' + password
                + '\n' + 'src_file: ' + src_file
                + '\n' + 'dst_file: ' + dst_file
                + '\n' + 'error: ' + str(error)
            )
            return 'ERROR'
    '''

    @staticmethod
    def ftp_put_file(src_file: str, dst_file: str, hostname: str, username: str, password: str) -> bool:
        """Handling FTP upload file.

        Logs in, walks (creating when missing) every directory of dst_file
        and stores src_file with the STOR command.

        Args:
            src_file (str): /local/path/to/file.
            dst_file (str): /remote/path/to/file.
            hostname (str): remote hostname or ip address.
            username (str): remote host username.
            password (str): remote host password.

        Returns:
            bool: True if the file was stored.

        Raises:
            Exception: connection and upload errors are propagated to the caller.
        """
        dst_path = dst_file.split('/')[:-1]
        # The context manager closes the connection even if the upload fails.
        with FTP(host=hostname) as ftp:
            try:
                ftp.login(user=username, passwd=password)
                for path_item in dst_path:
                    if path_item.strip() == '':
                        continue
                    try:
                        ftp.cwd(path_item)
                    except Exception:
                        # Directory is missing (or not enterable): create it.
                        ftp.mkd(path_item)
                        ftp.cwd(path_item)
            except Exception as error:
                # Best effort, as before, but no longer silent:
                # a real problem will surface in STOR below.
                logging.debug(
                    msg=''
                    + '\n' + 'host: ' + hostname
                    + '\n' + 'user: ' + username
                    + '\n' + 'dst_file: ' + dst_file
                    + '\n' + 'error: ' + str(error)
                )

            # NOTE(review): STOR gets the full dst_file although the working
            # directory was already changed above - confirm this is intended
            # for relative paths.
            with open(src_file, "rb") as file:
                ftp.storbinary(f"STOR {dst_file}", file)
        return True

    '''
    @staticmethod
    def ftp_get_file(src_file: str, dst_file: str, hostname: str, username: str, password: str):
        ftp = FTP(host=hostname)
        try:
            ftp.login(user=username, passwd=password)
            with open(dst_file, "wb") as file:
                ftp.retrbinary(f"RETR {src_file}", file.write)
            ftp.quit()
        except Exception:
            pass
    '''

    '''
    @staticmethod
    def xmlrpc():
        pass
    '''
|
|
|
|
|
|
class HikISAPI(Connect):
    """Representing Hikvision device with ISAPI.

    The class inherits the necessary connection methods of the Connect class
    """

    def __init__(
        self,
        hostname: str,
        username: str, userpass: str,
        authtype: str = 'digest',
        hostport: int = 80, protocol: str = 'http',
        channel: int = 101, videoid: int = 1
    ) -> None:
        """Object constructor.

        Args:
            hostname (str): camera hostname or ip address.
            username (str): camera admin username.
            userpass (str): camera admin password.
            authtype (str, optional): digest|basic camera authentication type. Defaults to 'digest'.
            hostport (int, optional): camera connection port. Defaults to 80.
            protocol (str, optional): camera connection protocol. Defaults to 'http'.
            channel (int, optional): camera channel id. Defaults to 101.
            videoid (int, optional): camera video id. Defaults to 1.
        """
        self._host = hostname
        self._port = hostport
        self._user = username
        self._pswd = userpass
        self._auth = authtype
        self._prot = protocol
        self._chan = channel
        self._viid = videoid

    def __call(
        self,
        url: str, method: str = 'GET',
        contenttype: str = 'application/x-www-form-urlencoded',
        contentdata: str = ''
    ) -> str:
        """Send request to camera.

        Args:
            url (str): API path for request.
            method (str, optional): HTTP request method. Defaults to 'GET'.
            contenttype (str, optional): Content-Type header. Defaults to 'application/x-www-form-urlencoded'.
            contentdata (str, optional): data for send with request. Defaults to ''.

        Returns:
            str: HTTP response content.
        """
        return self.http(
            url=url, method=method,
            username=self._user, password=self._pswd, authtype=self._auth,
            contenttype=contenttype, contentdata=contentdata
        )

    def __url(self, api_path: str) -> str:
        """Build full request url as protocol://host:port + api_path."""
        return self._prot + '://' + self._host + ':' + str(self._port) + api_path

    def __report(self, response: str, level: int = logging.DEBUG) -> bool:
        """Log a successful response at the given level; False for 'ERROR'."""
        if response == 'ERROR':
            return False
        logging.log(level, '\n' + response + '\n')
        return True

    def __ptzcontrol(self, query: str) -> bool:
        """Send GET /PTZ/channels/<videoid>/PTZControl?command=<query>."""
        url = self.__url(
            "/PTZ/channels/" + str(self._viid) + "/PTZControl?command=" + query
        )
        return self.__report(self.__call(url=url, method='GET'))

    def __put_xml(self, api_path: str, body: str) -> str:
        """PUT an XML document (declaration is prepended) and return the raw response."""
        return self.__call(
            url=self.__url(api_path), method="PUT", contenttype="text/xml",
            contentdata='<?xml version="1.0" encoding="UTF-8"?>' + body
        )

    def capabilities(self) -> bool:
        """Get camera capabilities.

        Returns:
            bool: True if successed. Printing a response with a logger at the INFO level.
        """
        url = self.__url("/ISAPI/PTZCtrl/channels/" + str(self._viid) + "/capabilities")
        return self.__report(self.__call(url=url, method='GET'), logging.INFO)

    def downloadjpeg(self, dst_file: str = path.splitext(__file__)[0] + '.jpeg', x: int = 1920, y: int = 1080) -> bool:
        """Get static picture from camera.

        The destination file is written only after a JPEG has been received,
        so a failed request neither creates nor truncates it.

        Args:
            dst_file (str, optional): absolute path of picture to save. Defaults to scriptname+'.jpeg'.
            x (int, optional): picture width. Defaults to 1920.
            y (int, optional): picture height. Defaults to 1080.

        Returns:
            bool: True if successed.
        """
        url = self.__url(
            "/Streaming/channels/" + str(self._viid)
            + "/picture?snapShotImageType=JPEG&videoResolutionWidth="
            + str(x) + "&videoResolutionHeight=" + str(y)
        )
        response = self.__call(url=url, method='GET')
        # http() hands back bytes only for JPEG data; 'ERROR' or any other
        # text answer is a str and cannot be stored as a picture.
        if not isinstance(response, bytes):
            logging.debug(msg='\n' + 'error: no picture received for ' + dst_file + '\n')
            return False
        with open(dst_file, "wb") as file:
            file.write(response)
        logging.debug(msg='\n' + dst_file + '\n')
        return True

    def getcamerapos(self) -> bool:
        """Get current camera position.

        Returns:
            bool: True if successed. Printing a response with a logger at the INFO level.
        """
        url = self.__url("/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/status")
        return self.__report(self.__call(url=url, method='GET'), logging.INFO)

    def rebootcamera(self) -> bool:
        """Set camera reboot command.

        Returns:
            bool: True if successed.
        """
        url = self.__url("/ISAPI/System/reboot")
        return self.__report(self.__call(url=url, method="PUT"))

    def setptzmovyyu(self, speed: int = 1) -> bool:
        """Start camera moving to up.

        Args:
            speed (int, optional): moving speed from 1 to 7. Defaults to 1.

        Returns:
            bool: True if successed.
        """
        return self.__ptzcontrol("TILT_UP&speed=" + str(speed) + "&mode=start")

    def setptzmovyyd(self, speed: int = 1) -> bool:
        """Start camera moving to down.

        Args:
            speed (int, optional): moving speed from 1 to 7. Defaults to 1.

        Returns:
            bool: True if successed.
        """
        return self.__ptzcontrol("TILT_DOWN&speed=" + str(speed) + "&mode=start")

    def setptzmovxxl(self, speed: int = 1) -> bool:
        """Start camera moving to left.

        Args:
            speed (int, optional): moving speed from 1 to 7. Defaults to 1.

        Returns:
            bool: True if successed.
        """
        return self.__ptzcontrol("PAN_LEFT&speed=" + str(speed) + "&mode=start")

    def setptzmovxxr(self, speed: int = 1) -> bool:
        """Start camera moving to right.

        Args:
            speed (int, optional): moving speed from 1 to 7. Defaults to 1.

        Returns:
            bool: True if successed.
        """
        return self.__ptzcontrol("PAN_RIGHT&speed=" + str(speed) + "&mode=start")

    def setptzmovzzi(self, speed: int = 1) -> bool:
        """Start camera zoom in.

        Args:
            speed (int, optional): moving speed from 1 to 7. Defaults to 1.

        Returns:
            bool: True if successed.
        """
        # NOTE(review): "zoom in" sends ZOOM_OUT (and setptzmovzzo sends ZOOM_IN);
        # kept as is - confirm against the device whether this is intentional.
        return self.__ptzcontrol("ZOOM_OUT&speed=" + str(speed) + "&mode=start")

    def setptzmovzzo(self, speed: int = 1) -> bool:
        """Start camera zoom out.

        Args:
            speed (int, optional): moving speed from 1 to 7. Defaults to 1.

        Returns:
            bool: True if successed.
        """
        # NOTE(review): "zoom out" sends ZOOM_IN, see setptzmovzzi.
        return self.__ptzcontrol("ZOOM_IN&speed=" + str(speed) + "&mode=start")

    def setptzpreset(self, preset: int, speed: int = 1) -> bool:
        """Start camera moving to preset.

        Args:
            preset (int): saved preset number.
            speed (int, optional): moving speed from 1 to 7. Defaults to 1.

        Returns:
            bool: True if successed.
        """
        return self.__ptzcontrol(
            "GOTO_PRESET&presetNo=" + str(preset)
            + "&speed=" + str(speed)
            + "&mode=start"
        )

    def setptztostop(self) -> bool:
        """Stop any camera moving.

        Returns:
            bool: True if successed.
        """
        return self.__ptzcontrol("GOTO_PRESET&mode=stop")

    def setcamerapos(self, x: int = 0, y: int = 0, z: int = 0) -> bool:
        """Set camera moving to absolute position.

        Args:
            x (int, optional): horisontal camera position from 0 to 3600. Defaults to 0.
            y (int, optional): vertical camera position from -900 to 2700. Defaults to 0.
            z (int, optional): zoom camera position from 0 to 1000. Defaults to 0.

        Returns:
            bool: True if successed.
        """
        xml = (
            '<PTZData><AbsoluteHigh>'
            + '<elevation>' + str(y) + '</elevation>'
            + '<azimuth>' + str(x) + '</azimuth>'
            + '<absoluteZoom>' + str(z) + '</absoluteZoom>'
            + '</AbsoluteHigh></PTZData>'
        )
        return self.__report(self.__put_xml(
            "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/absolute", xml
        ))

    def __setcameramovcon(self, x: int = 0, y: int = 0, z: int = 0) -> bool:
        """Set camera moving to direction until other signal or 180 seconds elapse.

        Args:
            x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0.
            y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0.
            z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0.

        Returns:
            bool: True if successed.
        """
        xml = (
            '<PTZData>'
            + '<pan>' + str(x) + '</pan>'
            + '<tilt>' + str(y) + '</tilt>'
            + '<zoom>' + str(z) + '</zoom>'
            + '</PTZData>'
        )
        return self.__report(self.__put_xml(
            "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/continuous", xml
        ))

    def __setcameramovmom(self, x: int = 0, y: int = 0, z: int = 0, t: int = 180000) -> bool:
        """Set camera moving to direction until other signal or duration elapse.

        Blocks for the whole duration after a successful request.

        Args:
            x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0.
            y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0.
            z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0.
            t (int, optional): duration in ms of acceleration from 0 to 180000. Defaults to 180000.

        Returns:
            bool: True if successed.
        """
        xml = (
            '<PTZData>'
            + '<pan>' + str(x) + '</pan>'
            + '<tilt>' + str(y) + '</tilt>'
            + '<zoom>' + str(z) + '</zoom>'
            + '<Momentary><duration>' + str(t) + '</duration></Momentary>'
            + '</PTZData>'
        )
        response = self.__put_xml(
            "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/momentary", xml
        )
        if response != 'ERROR':
            # Wait until the camera has finished the timed movement.
            sleep(t/1000)
        return self.__report(response)

    def setcameramov(self, x: int = 0, y: int = 0, z: int = 0, t: int = 0) -> bool:
        """Set camera moving to direction (polymorph abstraction).

        Args:
            x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0.
            y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0.
            z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0.
            t (int, optional): duration in ms of acceleration from 0 to 180000. Defaults to 0.
                '-' is accepted as "no duration".

        Returns:
            bool: True if successed.
        """
        # No duration means continuous movement, otherwise a timed one.
        if t == '-' or int(t) == 0:
            return self.__setcameramovcon(x=int(x), y=int(y), z=int(z))
        return self.__setcameramovmom(x=int(x), y=int(y), z=int(z), t=int(t))

    def setmovtohome(self) -> bool:
        """Set camera moving to homeposition.

        Returns:
            bool: True if successed.
        """
        return self.__report(self.__put_xml(
            "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/homeposition/goto",
            '<PTZData></PTZData>'
        ))

    def setposashome(self) -> bool:
        """Save current camera position as homeposition.

        Returns:
            bool: True if successed.
        """
        return self.__report(self.__put_xml(
            "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/homeposition",
            '<PTZData></PTZData>'
        ))

    def settextonosd(self, enabled: str = "true", x: int = 0, y: int = 0, message: str = "") -> bool:
        """Set message as video overlay text.

        Args:
            enabled (str, optional): true or false. Defaults to "true".
            x (int, optional): horizontal text position from 0 to video width. Defaults to 0.
            y (int, optional): vertical text position from 0 to video heith. Defaults to 0.
            message (str, optional): overlay text content, '-' means empty. Defaults to "".

        Returns:
            bool: True if successed.
        """
        from xml.sax.saxutils import escape

        if message == '-':
            message = ""
        xml = (
            '<TextOverlayList version="1.0" xmlns="http://www.hikvision.com/ver10/XMLSchema">'
            + '<TextOverlay>'
            + '<id>1</id>'
            # XML booleans are lower-case; also accepts a real bool
            + '<enabled>' + str(enabled).lower() + '</enabled>'
            + '<posX>' + str(x) + '</posX>'
            + '<posY>' + str(y) + '</posY>'
            # escape &, < and > so that any text keeps the document well-formed
            + '<displayText>' + escape(str(message)) + '</displayText>'
            + '</TextOverlay>'
            + '</TextOverlayList>'
        )
        return self.__report(self.__put_xml(
            "/ISAPI/System/Video/inputs/channels/" + str(self._chan) + "/overlays/text",
            xml
        ))
|
|
|
|
|
|
class Sensor(Connect):
    """Representing sensor connected to remote host.

    The class inherits the necessary connection methods of the Connect class
    """

    def __init__(
        self,
        hostname: str, username: str, userpass: str,
        nodetype: str, nodename: str,
        hostport: int = 22
    ) -> None:
        """Object constructor.

        Args:
            hostname (str): sensor's remote host hostname or ip address.
            username (str): sensor's remote host username.
            userpass (str): sensor's remote host password.
            nodetype (str): 'ds18b20' or other sensor type.
            nodename (str): 28-1a2b3c4d5e6f (ds18b20 example).
            hostport (int, optional): sensor's remote host connection port. Defaults to 22.
        """
        self._host = hostname
        self._port = hostport
        self._user = username
        self._pswd = userpass
        self._type = nodetype
        self._node = nodename

    def __call(self, command: str) -> str:
        """Send request to sensor's remote host.

        Args:
            command (str): command to poll the sensor.

        Returns:
            str: sensor's remote host response content.
        """
        return self.ssh_commands(
            command=command,
            hostname=self._host, port=self._port,
            username=self._user, password=self._pswd
        )

    def __temperature(self, nodename: str) -> str:
        """Preparating request for ds18b20 sensor type.

        Args:
            nodename (str): 28-1a2b3c4d5e6f (ds18b20 example).

        Returns:
            str: formatted string with temperature in Celsius or 'ERROR'.
        """
        command = 'cat /sys/bus/w1/devices/' + nodename + '/temperature'
        response = self.__call(command=command)
        # Every failure path yields 'ERROR', never an implicit None.
        if response == 'ERROR':
            return 'ERROR'
        try:
            # The response is divided by 1000 (floor division) to get whole degrees.
            return str(int(response)//1000) + "'C"
        except (TypeError, ValueError) as error:
            # The host answered with something that is not a number.
            logging.debug(
                msg=''
                + '\n' + 'host: ' + self._host + ':' + str(self._port)
                + '\n' + 'user: ' + self._user
                + '\n' + 'command: ' + command
                + '\n' + 'error: ' + str(error)
            )
            return 'ERROR'

    def value(self) -> str:
        """Public method to get sensor value.

        Returns:
            str: sensor value or 'ERROR' (including an unsupported sensor type).
        """
        if self._type == 'ds18b20':
            return self.__temperature(nodename=self._node)
        logging.debug(msg='\n' + 'error: unsupported sensor type: ' + str(self._type))
        return 'ERROR'
|
|
|
|
|
|
class Sequence:
    """Sequence handling.
    """
    @staticmethod
    def run(
        device: HikISAPI, sensors: dict, sequence: dict,
        records_root_path: str = None,
        records_root_user: str = None,
        records_root_pass: str = None
    ) -> None:
        """Sequences executor.

        Every step value is a comma-separated string with nine fields:
        action, x, y, z, preset, speed, t, wait, message. Fields a given
        action does not use may hold the '-' placeholder. A message of the
        form 'sensor-config:<name>' is replaced by that sensor's value
        (or by an empty string when the sensor is unavailable).

        Args:
            device (HikISAPI): HikISAPI object.
            sensors (dict): collection as key=sensorname:value=Sensor object.
            sequence (dict): sequence steps collection.
            records_root_path (str, optional): path (local|smb|ftp,sftp) to records directory. Defaults to None.
            records_root_user (str, optional): username if path on remote host. Defaults to None.
            records_root_pass (str, optional): password if path on remote host. Defaults to None.
        """
        for key, value in sequence.items():
            fields = [field.strip() for field in value.split(',')]
            if len(fields) < 9:
                # A malformed step is reported and skipped instead of
                # aborting the remaining steps with an IndexError.
                logging.debug(
                    msg='step ' + key + ' has ' + str(len(fields))
                    + ' fields, 9 expected: ' + value
                )
                logging.warning(msg='result:' + key + ' = ERROR')
                continue
            action, x, y, z, p, s, t, w, m = fields[:9]

            if 'sensor-config:' in m:
                sensor = sensors.get(m.split(':')[1].strip())
                sensor_value = sensor.value() if sensor is not None else 'ERROR'
                # Only a real string reading may be used as the message;
                # anything else would break the string building below.
                if isinstance(sensor_value, str) and sensor_value != 'ERROR':
                    m = sensor_value
                else:
                    m = ''

            logging.info(
                msg=' action:' + key + ' = ' + action
                + ',' + x + ',' + y + ',' + z
                + ',' + p + ',' + s + ',' + t
                + ',' + w + ',' + m
            )

            # Default result, so an unknown action is reported as a failure
            # instead of reading an unbound or stale variable.
            response = False
            if action == 'capabilities':
                response = device.capabilities()
            elif action == 'getcamerapos':
                response = device.getcamerapos()
            elif action == 'rebootcamera':
                response = device.rebootcamera()
            elif action == 'setptzmovyyu':
                response = device.setptzmovyyu(speed=int(s))
            elif action == 'setptzmovyyd':
                response = device.setptzmovyyd(speed=int(s))
            elif action == 'setptzmovxxl':
                response = device.setptzmovxxl(speed=int(s))
            elif action == 'setptzmovxxr':
                response = device.setptzmovxxr(speed=int(s))
            elif action == 'setptzmovzzi':
                response = device.setptzmovzzi(speed=int(s))
            elif action == 'setptzmovzzo':
                response = device.setptzmovzzo(speed=int(s))
            elif action == 'setptzpreset':
                response = device.setptzpreset(preset=int(p), speed=int(s))
            elif action == 'setptztostop':
                response = device.setptztostop()
            elif action == 'setcamerapos':
                response = device.setcamerapos(x=int(x), y=int(y), z=int(z))
            elif action == 'setcameramov':
                response = device.setcameramov(x=int(x), y=int(y), z=int(z), t=t)
            elif action == 'setmovtohome':
                response = device.setmovtohome()
            elif action == 'setposashome':
                response = device.setposashome()
            elif action == 'settextonosd':
                response = device.settextonosd(x=int(x), y=int(y), message=m)
            elif action == 'downloadjpeg':
                response = Sequence._download_jpeg(
                    device=device, key=key, x=x, y=y,
                    records_root_path=records_root_path,
                    records_root_user=records_root_user,
                    records_root_pass=records_root_pass
                )
            else:
                logging.warning(msg='unknown action:' + key + ' = ' + action)

            # '-' means "no pause"; it must never reach float().
            if w != '-':
                delay = float(w)
                if delay > 0:
                    sleep(delay)

            if response:
                logging.info(msg=' result:' + key + ' = OK')
            else:
                logging.warning(msg='result:' + key + ' = ERROR')

    @staticmethod
    def _remove_temp(file_path: str) -> None:
        """Delete an uploaded temporary file; failures are only logged."""
        try:
            remove(file_path)
        except OSError as error:
            logging.debug(
                msg=''
                + '\n' + 'src_file: ' + file_path
                + '\n' + 'error: ' + str(error)
            )

    @staticmethod
    def _download_jpeg(
        device, key: str, x: str, y: str,
        records_root_path: str = None,
        records_root_user: str = None,
        records_root_pass: str = None
    ) -> bool:
        """Download one snapshot and store it under <root>/YYYY/MM/WW/DD/.

        Args:
            device: object providing downloadjpeg(x, y, dst_file).
            key (str): step name, used as the file name prefix.
            x (str): first numeric argument passed to downloadjpeg.
            y (str): second numeric argument passed to downloadjpeg.
            records_root_path (str, optional): local path or ftp://, sftp:// url.
                Defaults to the directory of this script.
            records_root_user (str, optional): username if path on remote host.
            records_root_pass (str, optional): password if path on remote host.

        Returns:
            bool: True when the snapshot reached its destination.
        """
        script_root = path.dirname(path.realpath(__file__))
        if records_root_path is None:
            records_root_path = script_root

        # Snapshots bound for another location are staged in ./temp first.
        records_root_temp = records_root_path
        if records_root_temp != script_root:
            records_root_temp = script_root + sep + 'temp'
            makedirs(records_root_temp, exist_ok=True)

        # One timestamp for everything, so the file name and the dated
        # directories cannot disagree around a second/day boundary.
        now = datetime.now()
        dated_dirs = tuple(now.strftime(fmt) for fmt in ('%Y', '%m', '%V', '%d'))
        records_file_name = key + '_' + now.strftime('%Y-%m-%d_%H.%M.%S') + '.jpeg'
        src_file = records_root_temp + sep + records_file_name

        if not device.downloadjpeg(x=int(x), y=int(y), dst_file=src_file):
            return False

        hostname = 'localhost'
        hostport, hosttype = None, None
        username = records_root_user
        userpass = records_root_pass
        hostpath = records_root_path
        if '://' in records_root_path:
            hostname = records_root_path.split('/')[2]
            hosttype = records_root_path.split('://')[0]
            hostport = {'ftp': 21, 'sftp': 22, 'smb': 445}.get(hosttype)
            hostpath = records_root_path.replace(hosttype + '://' + hostname, '')
        if '@' in hostname:
            credentials, _, hostname = hostname.partition('@')
            username, has_password, password = credentials.partition(':')
            # A url without a password keeps the separately supplied one.
            if has_password:
                userpass = password
        if ':' in hostname:
            hostname, _, port_text = hostname.partition(':')
            hostport = int(port_text)

        if hosttype == 'ftp':
            dst_file = '/'.join((hostpath,) + dated_dirs + (records_file_name,))
            uploaded = bool(Connect.ftp_put_file(
                src_file=src_file,
                dst_file=dst_file,
                hostname=hostname,
                username=username,
                password=userpass
            ))
            if uploaded:
                Sequence._remove_temp(src_file)
            return uploaded

        if hosttype == 'sftp':
            dst_file = '/'.join((hostpath,) + dated_dirs + (records_file_name,))
            uploaded = Connect.ssh_put_file(
                src_file=src_file, dst_file=dst_file,
                hostname=hostname, port=hostport,
                username=username, password=userpass
            ) != 'ERROR'
            if uploaded:
                Sequence._remove_temp(src_file)
            return uploaded

        # Any other destination is treated as a local path.
        dst_root = sep.join((hostpath,) + dated_dirs)
        dst_file = dst_root + sep + records_file_name
        try:
            makedirs(dst_root, exist_ok=True)
            replace(src=src_file, dst=dst_file)
        except OSError as error:
            logging.debug(
                msg=''
                + '\n' + 'src_file: ' + src_file
                + '\n' + 'dst_file: ' + dst_file
                + '\n' + 'error: ' + str(error)
            )
            return False
        return True
|
|
|
|
|
|
if __name__ == "__main__":
    # Reference point for the execution-time report logged at the end.
    time_start = datetime.now()

    # Command line: an optional config path plus three mode switches.
    args = ArgumentParser(
        prog='cctv-scheduler',
        description='Hikvision PTZ IP-Camera management.',
        epilog='Dependencies: - Python 3 (tested version 3.9.5), - Python 3 modules: paramiko '
    )
    args.add_argument(
        '--config', type=str, required=False,
        default=path.splitext(__file__)[0] + '.conf',
        help='custom configuration file path'
    )
    for short_flag, long_flag, flag_help in (
        ('-s', '--sequences', 'run sequences from config file'),
        ('-c', '--converter', 'convert JPEG collection to MP4'),
        ('-p', '--publisher', 'publish content from templates'),
    ):
        args.add_argument(short_flag, long_flag, action='store_true', required=False, help=flag_help)
    args = vars(args.parse_args())

    # Logging defaults; the 'common' block of the config may override them.
    script_root = path.dirname(path.realpath(__file__))
    log_root = script_root
    log_level = 'INFO'
    if path.exists(args['config']):
        conf = Parse(parameters=args['config'], block='common')
        log_root = conf.data.get('log_root', log_root)
        for level_name in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
            if conf.data.get('log_level') == level_name:
                log_level = getattr(logging, level_name)
    log_file = log_root + sep + path.splitext(path.basename(__file__))[0] + '.log'
    logging.basicConfig(
        format='%(asctime)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d_%H.%M.%S',
        handlers=[
            logging.FileHandler(filename=log_file, mode='a'),
            logging.StreamHandler()
        ],
        level=log_level
    )
    # Keep paramiko's own messages below WARNING out of the log.
    logging.getLogger("paramiko").setLevel(logging.WARNING)

    if args['sequences']:
        logging.info(msg='Starting PTZ sequences from config file')

        # Build a Sensor for every entry switched on in 'enable-sensor'.
        sensors = {}
        conf = Parse(parameters=args['config'], block='enable-sensor')
        for key, value in conf.data.items():
            if value != 'true':
                continue
            device_config = Parse(
                parameters=args['config'],
                block='sensor-config:' + key
            ).data
            sensors[key] = Sensor(
                hostname=device_config['hostname'],
                username=device_config['username'],
                userpass=device_config['userpass'],
                nodetype=device_config['nodetype'],
                nodename=device_config['nodename']
            )

        # Run the sequence of every camera switched on in 'enable-sequences'.
        conf = Parse(parameters=args['config'], block='enable-sequences')
        for key, value in conf.data.items():
            if value != 'true':
                continue
            device_sequence = Parse(
                parameters=args['config'],
                block='camera-sequences:' + key
            ).data
            device_config = Parse(
                parameters=args['config'],
                block='camera-config:' + key
            ).data
            device_entity = HikISAPI(
                hostname=device_config['hostname'],
                username=device_config['username'],
                userpass=device_config['userpass']
            )
            # Records go next to the script unless the camera config says otherwise.
            records_root_path = device_config.get('records_root_path', script_root)
            records_root_user = device_config.get('records_root_user')
            records_root_pass = device_config.get('records_root_pass')
            Sequence.run(
                device=device_entity,
                sensors=sensors,
                sequence=device_sequence,
                records_root_path=records_root_path,
                records_root_user=records_root_user,
                records_root_pass=records_root_pass
            )
    elif args['converter']:
        logging.info(msg='Starting convert JPEG collection to MP4')
    elif args['publisher']:
        logging.info(msg='Starting publish content from templates')
    else:
        logging.info(msg='Start arguments was not selected. Exit.')

    time_execute = datetime.now() - time_start
    logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')
|