cctv-scheduler/archive/0.2/cctv-scheduler.py

1241 lines
47 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import logging
import urllib.request
from argparse import ArgumentParser
from datetime import datetime
from ftplib import FTP
from os import path, sep, makedirs, remove, replace
from time import sleep
from paramiko import SSHClient, AutoAddPolicy
class Parse:
"""Parser of configs, arguments, parameters.
"""
def __init__(self, parameters, block: str = None) -> None:
"""Object constructor.
Args:
parameters: dictionary as "key":"value" or
ArgumentParser class object or
string path to the file or
string as "var1=val1;var2=val2".
block (str, optional): name of target block from text. Defaults to None.
"""
self.path = ''
self.data = {}
if type(parameters) is dict:
self._dict2dict(parameters)
if type(parameters) is ArgumentParser:
self._dict2dict(self.argv2dict(parameters))
if type(parameters) is str:
if path.exists(parameters):
self._dict2dict(
self.strs2dict(
self.conf2strs(parameters),
block
)
)
self.path = parameters
else:
self._dict2dict(self.strs2dict(parameters, block))
def __str__(self) -> str:
"""Overrides method for print(object).
Returns:
str: string with contents of the object's dictionary.
"""
string = ''
for key, val in self.data.items():
string += str(type(val)) + ' ' + str(key) + ' = ' + str(val) + '\n'
return string
def _dict2dict(self, dictionary: dict) -> None:
"""Updates or adds dictionary data.
Args:
dictionary (dict): dictionary as "key":"value".
"""
self.data.update(dictionary)
def expand(self, store: str = None) -> dict:
"""Expand dictionary "key":"name.conf" to dictionary "key":{subkey: subval}.
Args:
store (str, optional): path to directory with name.conf. Defaults to None.
Returns:
dict: expanded dictionary as "key":{subkey: subval}.
"""
for key in self.data:
if store:
config = store + sep + self.data[key]
else:
config = self.data[key]
with open(config) as file:
self.data[key] = Parse(file.read()).data
return self.data
@classmethod
def argv2dict(cls, parser: ArgumentParser) -> dict:
"""Converts startup arguments to a dictionary.
Args:
parser (ArgumentParser): argparse.ArgumentParser class object.
Returns:
dict: dictionary as "key":"value".
"""
parser = ArgumentParser(add_help=False, parents=[parser])
return vars(parser.parse_args())
@classmethod
def conf2strs(cls, config: str) -> str:
"""Builds a dictionary from a file containing parameters.
Args:
config (str): path to the config file.
Returns:
str: string as "var1=val1;\nvar2=val2;".
"""
with open(config) as file:
raw = file.read()
strs = ''
for line in raw.splitlines():
if not line.lstrip().startswith('#'):
strs += line + '\n'
return strs
@classmethod
def strs2dict(cls, strings: str, blockname: str) -> dict:
"""Builds a dictionary from a strings containing parameters.
Args:
strings (str): string as "var1=val1;var2=val2;".
blockname (str): name of target block from text.
Returns:
dict: dictionary as "key":"value".
"""
dictionary = {}
if blockname:
strings = cls.block(blockname, strings)
for line in strings.replace('\n', ';').split(';'):
if not line.lstrip().startswith('#') and "=" in line:
dictionary[line.split('=')[0].strip()] = line.split('=')[1].strip().split(';')[0].strip()
return dictionary
@classmethod
def str2bool(cls, value: str) -> bool:
"""Converts a string value to boolean.
Args:
value (str): string containing "true" or "false", "yes" or "no", "1" or "0".
Returns:
bool: bool True or False.
"""
return str(value).lower() in ("true", "yes", "1")
@classmethod
def block(cls, blockname: str, text: str) -> str:
"""Cuts a block of text between line [blockname] and line [next block] or EOF.
Args:
blockname (str): string in [] after which the block starts.
text (str): string of text from which the block is needed.
Returns:
str: string of text between line [block name] and line [next block].
"""
level = 1
save = False
result = ''
for line in text.splitlines():
if line.startswith('[') and blockname in line:
level = line.count('[')
save = True
elif line.startswith('[') and '['*level in line:
save = False
elif save:
result += line + '\n'
return result
class Connect:
"""Set of connection methods (functions) for various protocols.
"""
@staticmethod
def http(
url: str, method: str = 'GET',
username: str = '', password: str = '', authtype: str = None,
contenttype: str = 'text/plain', contentdata: str = ''
) -> str:
"""Handling HTTP request.
Args:
url (str): request url.
method (str, optional): HTTP request method. Defaults to 'GET'.
username (str, optional): username for url authentication. Defaults to ''.
password (str, optional): password for url authentication. Defaults to ''.
authtype (str, optional): digest|basic authentication type. Defaults to None.
contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'.
contentdata (str, optional): content data. Defaults to ''.
Returns:
str: HTTP response or 'ERROR'.
"""
# Preparing authorization
if authtype:
pswd = urllib.request.HTTPPasswordMgrWithDefaultRealm()
pswd.add_password(None, url, username, password)
if authtype == 'basic':
auth = urllib.request.HTTPBasicAuthHandler(pswd)
if authtype == 'digest':
auth = urllib.request.HTTPDigestAuthHandler(pswd)
urllib.request.install_opener(urllib.request.build_opener(auth))
# Preparing request
request = urllib.request.Request(url=url, data=bytes(contentdata.encode('utf-8')), method=method)
request.add_header('Content-Type', contenttype)
# Response
try:
response = urllib.request.urlopen(request).read()
logging.debug(
msg=''
+ '\n' + 'uri: ' + url
+ '\n' + 'method: ' + method
+ '\n' + 'username: ' + username
+ '\n' + 'password: ' + password
+ '\n' + 'authtype: ' + authtype
+ '\n' + 'content-type: ' + contenttype
+ '\n' + 'content-data: ' + contentdata
)
if response.startswith(b'\xff\xd8'):
return response
else:
return str(response.decode('utf-8'))
except Exception as error:
logging.debug(msg='\n' + 'error: ' + str(error))
return 'ERROR'
@staticmethod
def ssh_commands(command: str, hostname: str, username: str, password: str, port: int = 22) -> str:
"""Handling SSH command executing.
Args:
command (str): command for executing.
hostname (str): remote hostname or ip address.
username (str): remote host username.
password (str): remote host password.
port (int, optional): remote host connection port. Defaults to 22.
Returns:
str: terminal response or 'ERROR'.
"""
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
try:
client.connect(hostname=hostname, username=username, password=password, port=port)
stdin, stdout, stderr = client.exec_command(command=command, get_pty=True)
if 'sudo' in command:
stdin.write(password + '\n')
stdin.flush()
stdout.flush()
data = stdout.read() + stderr.read()
client.close()
return data.decode('utf-8')
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'host: ' + hostname + ':' + str(port)
+ '\n' + 'user: ' + username
+ '\n' + 'pass: ' + password
+ '\n' + 'command: ' + command
+ '\n' + 'error: ' + str(error)
)
return 'ERROR'
@staticmethod
def ssh_put_file(src_file: str, dst_file: str, hostname: str, username: str, password: str, port: int = 22) -> str:
"""Handling SFTP upload file.
Args:
src_file (str): /local/path/to/file.
dst_file (str): /remote/path/to/file.
hostname (str): remote hostname or ip address.
username (str): remote host username.
password (str): remote host password.
port (int, optional): remote host connection port. Defaults to 22.
Returns:
str: '/remote/path/to/file' or 'ERROR'.
"""
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
try:
client.connect(hostname=hostname, username=username, password=password, port=port)
client.exec_command('mkdir -p ' + path.dirname(dst_file))
try:
sftp = client.open_sftp()
sftp.put(localpath=src_file, remotepath=dst_file)
sftp.stat(dst_file)
sftp.close()
return dst_file
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'dst_file: ' + dst_file
+ '\n' + 'error: ' + str(error)
)
return 'ERROR'
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'host: ' + hostname + ':' + str(port)
+ '\n' + 'user: ' + username
+ '\n' + 'pass: ' + password
+ '\n' + 'src_file: ' + src_file
+ '\n' + 'dst_file: ' + dst_file
+ '\n' + 'error: ' + str(error)
)
return 'ERROR'
'''
@staticmethod
def ssh_get_file(src_file: str, dst_file: str, hostname: str, username: str, password: str, port: int = 22) -> str:
"""Handling SFTP download file.
Args:
src_file (str): /remote/path/to/file.
dst_file (str): /local/path/to/file.
hostname (str): remote hostname or ip address.
username (str): remote host username.
password (str): remote host password.
port (int, optional): remote host connection port. Defaults to 22.
Returns:
str: '/local/path/to/file' or 'ERROR'.
"""
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
try:
client.connect(hostname=hostname, username=username, password=password, port=port)
with client.open_sftp() as sftp:
sftp.get(remotepath=src_file, localpath=dst_file)
client.close()
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'host: ' + hostname + ':' + str(port)
+ '\n' + 'user: ' + username
+ '\n' + 'pass: ' + password
+ '\n' + 'src_file: ' + src_file
+ '\n' + 'dst_file: ' + dst_file
+ '\n' + 'error: ' + str(error)
)
return 'ERROR'
'''
@staticmethod
def ftp_put_file(src_file: str, dst_file: str, hostname: str, username: str, password: str) -> bool:
dst_path = dst_file.split('/')[:-1]
ftp = FTP(host=hostname)
try:
ftp.login(user=username, passwd=password)
for path_item in dst_path:
if path_item.strip() == '':
continue
path_item = path_item.replace('/', '')
try:
ftp.cwd(path_item)
except Exception:
ftp.mkd(path_item)
ftp.cwd(path_item)
except Exception:
pass
with open(src_file, "rb") as file:
ftp.storbinary(f"STOR {dst_file}", file)
ftp.quit()
return True
'''
@staticmethod
def ftp_get_file(src_file: str, dst_file: str, hostname: str, username: str, password: str):
ftp = FTP(host=hostname)
try:
ftp.login(user=username, passwd=password)
with open(dst_file, "wb") as file:
ftp.retrbinary(f"RETR {src_file}", file.write)
ftp.quit()
except Exception:
pass
'''
'''
@staticmethod
def xmlrpc():
pass
'''
class HikISAPI(Connect):
"""Representing Hikvision device with ISAPI.
The class inherits the necessary connection methods of the Connect class
"""
def __init__(
self,
hostname: str,
username: str, userpass: str,
authtype: str = 'digest',
hostport: int = 80, protocol: str = 'http',
channel: int = 101, videoid: int = 1
) -> None:
"""Object constructor.
Args:
hostname (str): camera hostname or ip address.
username (str): camera admin username.
userpass (str): camera admin password.
authtype (str, optional): digest|basic camera authentication type. Defaults to 'digest'.
hostport (int, optional): camera connection port. Defaults to 80.
protocol (str, optional): camera connection protocol. Defaults to 'http'.
channel (int, optional): camera channel id. Defaults to 101.
videoid (int, optional): camera video id. Defaults to 1.
"""
self._host = hostname
self._port = hostport
self._user = username
self._pswd = userpass
self._auth = authtype
self._prot = protocol
self._chan = channel
self._viid = videoid
def __call(
self,
url: str, method: str = 'GET',
contenttype: str = 'application/x-www-form-urlencoded',
contentdata: str = ''
) -> str:
"""Send request to camera.
Args:
url (str): API path for request.
method (str, optional): HTTP request method. Defaults to 'GET'.
contenttype (str, optional): Content-Type header. Defaults to 'application/x-www-form-urlencoded'.
contentdata (str, optional): data for send with request. Defaults to ''.
Returns:
str: HTTP response content.
"""
return self.http(
url=url, method=method,
username=self._user, password=self._pswd, authtype=self._auth,
contenttype=contenttype, contentdata=contentdata
)
def capabilities(self) -> bool:
"""Get camera capabilities.
Returns:
bool: True if successed. Printing a response with a logger at the INFO level.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._viid) + "/capabilities"
)
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.info(msg='\n' + response + '\n')
return True
else:
return False
def downloadjpeg(self, dst_file: str = path.splitext(__file__)[0] + '.jpeg', x: int = 1920, y: int = 1080) -> bool:
"""Get static picture from camera.
Args:
dst_file (str, optional): absolute path of picture to save. Defaults to scriptname+'.jpeg'.
x (int, optional): picture width. Defaults to 1920.
y (int, optional): picture height. Defaults to 1080.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/Streaming/channels/" + str(self._viid)
+ "/picture?snapShotImageType=JPEG&videoResolutionWidth="
+ str(x) + "&videoResolutionHeight=" + str(y)
)
with open(dst_file, "wb") as file:
response = self.__call(url=url, method='GET')
if response != 'ERROR':
file.write(response)
logging.debug(msg='\n' + dst_file + '\n')
return True
else:
return False
def getcamerapos(self) -> bool:
"""Get current camera position.
Returns:
bool: True if successed. Printing a response with a logger at the INFO level.
"""
url = (self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/status")
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.info(msg='\n' + response + '\n')
return True
else:
return False
def rebootcamera(self) -> bool:
"""Set camera reboot command.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/System/reboot"
)
response = self.__call(url=url, method="PUT")
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovyyu(self, speed: int = 1) -> bool:
"""Start camera moving to up.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=TILT_UP&speed=" + str(speed)
+ "&mode=start"
)
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovyyd(self, speed: int = 1) -> bool:
"""Start camera moving to down.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=TILT_DOWN&speed=" + str(speed)
+ "&mode=start"
)
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovxxl(self, speed: int = 1) -> bool:
"""Start camera moving to left.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=PAN_LEFT&speed=" + str(speed)
+ "&mode=start"
)
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovxxr(self, speed: int = 1) -> bool:
"""Start camera moving to right.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=PAN_RIGHT&speed=" + str(speed)
+ "&mode=start"
)
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovzzi(self, speed: int = 1) -> bool:
"""Start camera zoom in.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=ZOOM_OUT&speed=" + str(speed)
+ "&mode=start"
)
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovzzo(self, speed: int = 1) -> bool:
"""Start camera zoom out.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=ZOOM_IN&speed=" + str(speed)
+ "&mode=start"
)
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzpreset(self, preset: int, speed: int = 1) -> bool:
"""Start camera moving to preset.
Args:
preset (int): saved preset number.
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=GOTO_PRESET&presetNo=" + str(preset)
+ "&speed=" + str(speed)
+ "&mode=start"
)
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptztostop(self) -> bool:
"""Stop any camera moving.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=GOTO_PRESET&mode=stop"
)
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setcamerapos(self, x: int = 0, y: int = 0, z: int = 0) -> bool:
"""Set camera moving to absolute position.
Args:
x (int, optional): horisontal camera position from 0 to 3600. Defaults to 0.
y (int, optional): vertical camera position from -900 to 2700. Defaults to 0.
z (int, optional): zoom camera position from 0 to 1000. Defaults to 0.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan)
+ "/absolute"
)
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<PTZData><AbsoluteHigh>'
+ '<elevation>' + str(y) + '</elevation>'
+ '<azimuth>' + str(x) + '</azimuth>'
+ '<absoluteZoom>' + str(z) + '</absoluteZoom>'
+ '</AbsoluteHigh></PTZData>'
)
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def __setcameramovcon(self, x: int = 0, y: int = 0, z: int = 0) -> bool:
"""Set camera moving to direction until other signal or 180 seconds elapse.
Args:
x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0.
y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan)
+ "/continuous"
)
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<PTZData>'
+ '<pan>' + str(x) + '</pan>'
+ '<tilt>' + str(y) + '</tilt>'
+ '<zoom>' + str(z) + '</zoom>'
+ '</PTZData>'
)
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def __setcameramovmom(self, x: int = 0, y: int = 0, z: int = 0, t: int = 180000) -> bool:
"""Set camera moving to direction until other signal or duration elapse.
Args:
x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0.
y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0.
t (int, optional): duration in ms of acceleration from 0 to 180000. Defaults to 180000.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan)
+ "/momentary"
)
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<PTZData>'
+ '<pan>' + str(x) + '</pan>'
+ '<tilt>' + str(y) + '</tilt>'
+ '<zoom>' + str(z) + '</zoom>'
+ '<Momentary><duration>' + str(t) + '</duration></Momentary>'
+ '</PTZData>'
)
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
sleep(t/1000)
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setcameramov(self, x: int = 0, y: int = 0, z: int = 0, t: int = 0) -> bool:
"""Set camera moving to direction (polymorph abstraction).
Args:
x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0.
y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0.
t (int, optional): duration in ms of acceleration from 0 to 180000. Defaults to 0.
Returns:
bool: True if successed.
"""
if t == '-' or int(t) == 0:
return self.__setcameramovcon(x=int(x), y=int(y), z=int(z))
else:
return self.__setcameramovmom(x=int(x), y=int(y), z=int(z), t=int(t))
def setmovtohome(self) -> bool:
"""Set camera moving to homeposition.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan)
+ "/homeposition/goto"
)
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<PTZData></PTZData>'
)
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setposashome(self) -> bool:
"""Save current camera position as homeposition.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan)
+ "/homeposition"
)
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<PTZData></PTZData>'
)
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def settextonosd(self, enabled: str = "true", x: int = 0, y: int = 0, message: str = "") -> bool:
"""Set message as video overlay text.
Args:
enabled (str, optional): true or false. Defaults to "true".
x (int, optional): horizontal text position from 0 to video width. Defaults to 0.
y (int, optional): vertical text position from 0 to video heith. Defaults to 0.
message (str, optional): overlay text content. Defaults to "".
Returns:
bool: True if successed.
"""
if message == '-':
message = ""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/System/Video/inputs/channels/" + str(self._chan)
+ "/overlays/text"
)
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<TextOverlayList version="1.0" xmlns="http://www.hikvision.com/ver10/XMLSchema">'
+ '<TextOverlay>'
+ '<id>1</id>'
+ '<enabled>' + enabled + '</enabled>'
+ '<posX>' + str(x) + '</posX>'
+ '<posY>' + str(y) + '</posY>'
+ '<displayText>' + message + '</displayText>'
+ '</TextOverlay>'
+ '</TextOverlayList>'
)
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
class Sensor(Connect):
"""Representing sensor connected to remote host.
The class inherits the necessary connection methods of the Connect class
"""
def __init__(
self,
hostname: str, username: str, userpass: str,
nodetype: str, nodename: str,
hostport: int = 22
) -> None:
"""Object constructor.
Args:
hostname (str): sensor's remote host hostname or ip address.
username (str): sensor's remote host username.
userpass (str): sensor's remote host password.
nodetype (str): 'ds18b20' or other sensor type.
nodename (str): 28-1a2b3c4d5e6f (ds18b20 example).
hostport (int, optional): sensor's remote host connection port. Defaults to 22.
"""
self._host = hostname
self._port = hostport
self._user = username
self._pswd = userpass
self._type = nodetype
self._node = nodename
def __call(self, command: str) -> str:
"""Send request to sensor's remote host.
Args:
command (str): command to poll the sensor.
Returns:
str: sensor's remote host response content.
"""
return self.ssh_commands(
command=command,
hostname=self._host, port=self._port,
username=self._user, password=self._pswd
)
def __temperature(self, nodename: str) -> str:
"""Preparating request for ds18b20 sensor type.
Args:
nodename (str): 28-1a2b3c4d5e6f (ds18b20 example).
Returns:
str: formatted string with temperature in Celsius.
"""
command = 'cat /sys/bus/w1/devices/' + nodename + '/temperature'
response = self.__call(command=command)
if response != 'ERROR':
try:
temperature = str(int(response)//1000) + "'C"
return temperature
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'host: ' + self._host + ':' + str(self._port)
+ '\n' + 'user: ' + self._user
+ '\n' + 'pass: ' + self._pswd
+ '\n' + 'command: ' + command
+ '\n' + 'error: ' + str(error)
)
return 'ERROR'
def value(self) -> str:
"""Public method to get sensor value.
Returns:
str: sensor value.
"""
if self._type == 'ds18b20':
return self.__temperature(nodename=self._node)
class Sequence:
"""Sequence handling.
"""
@staticmethod
def run(
device: HikISAPI, sensors: dict, sequence: dict,
records_root_path: str = None,
records_root_user: str = None,
records_root_pass: str = None
) -> None:
"""Sequences executor.
Args:
device (HikISAPI): HikISAPI object.
sensors (dict): collection as key=sensorname:value=Sensor object.
sequence (dict): sequence steps collection.
records_root_path (str, optional): path (local|smb|ftp,sftp) to records directory. Defaults to None.
records_root_user (str, optional): username if path on remote host. Defaults to None.
records_root_pass (str, optional): password if path on remote host. Defaults to None.
"""
for key, value in sequence.items():
action = value.split(',')[0].strip()
x = value.split(',')[1].strip()
y = value.split(',')[2].strip()
z = value.split(',')[3].strip()
p = value.split(',')[4].strip()
s = value.split(',')[5].strip()
t = value.split(',')[6].strip()
w = value.split(',')[7].strip()
m = value.split(',')[8].strip()
if 'sensor-config:' in m:
sensor_name = m.split(':')[1].strip()
sensor_value = sensors[sensor_name].value()
if sensor_value != 'ERROR':
m = sensor_value
else:
m = ''
logging.info(
msg=' action:' + key + ' = ' + action
+ ',' + x + ',' + y + ',' + z
+ ',' + p + ',' + s + ',' + t
+ ',' + w + ',' + m
)
if action == 'capabilities':
response = device.capabilities()
elif action == 'getcamerapos':
response = device.getcamerapos()
elif action == 'rebootcamera':
response = device.rebootcamera()
elif action == 'setptzmovyyu':
response = device.setptzmovyyu(speed=int(s))
elif action == 'setptzmovyyd':
response = device.setptzmovyyd(speed=int(s))
elif action == 'setptzmovxxl':
response = device.setptzmovxxl(speed=int(s))
elif action == 'setptzmovxxr':
response = device.setptzmovxxr(speed=int(s))
elif action == 'setptzmovzzi':
response = device.setptzmovzzi(speed=int(s))
elif action == 'setptzmovzzo':
response = device.setptzmovzzo(speed=int(s))
elif action == 'setptzpreset':
response = device.setptzpreset(preset=int(p), speed=int(s))
elif action == 'setptztostop':
response = device.setptztostop()
elif action == 'setcamerapos':
response = device.setcamerapos(x=int(x), y=int(y), z=int(z))
elif action == 'setcameramov':
response = device.setcameramov(x=int(x), y=int(y), z=int(z), t=t)
elif action == 'setmovtohome':
response = device.setmovtohome()
elif action == 'setposashome':
response = device.setposashome()
elif action == 'settextonosd':
response = device.settextonosd(x=int(x), y=int(y), message=m)
elif action == 'downloadjpeg':
records_root_temp = records_root_path
if records_root_temp != path.dirname(path.realpath(__file__)):
records_root_temp = path.dirname(path.realpath(__file__)) + sep + 'temp'
makedirs(records_root_temp, exist_ok=True)
dy = datetime.now().strftime('%Y')
dm = datetime.now().strftime('%m')
dv = datetime.now().strftime('%V')
dd = datetime.now().strftime('%d')
th = datetime.now().strftime('%H')
tm = datetime.now().strftime('%M')
ts = datetime.now().strftime('%S')
records_file_name = (key + '_' + dy + '-' + dm + '-' + dd + '_' + th + '.' + tm + '.' + ts + '.jpeg')
if device.downloadjpeg(x=int(x), y=int(y), dst_file=records_root_temp + sep + records_file_name):
hostname = 'localhost'
hostport, hosttype = None, None
username = records_root_user
userpass = records_root_pass
hostpath = records_root_path
if '://' in records_root_path:
hostname = records_root_path.split('/')[2]
hosttype = records_root_path.split('://')[0]
if hosttype == 'ftp':
hostport = 21
if hosttype == 'sftp':
hostport = 22
if hosttype == 'smb':
hostport = 445
hostpath = records_root_path.replace(hosttype + '://' + hostname, '')
if '@' in hostname:
username = hostname.split('@')[0].split(':')[0]
userpass = hostname.split('@')[0].split(':')[1]
hostname = hostname.split('@')[1]
if ':' in hostname:
hostport = int(hostname.split(':')[1])
hostname = hostname.split(':')[0]
if hosttype == 'ftp':
src_file = records_root_temp + sep + records_file_name
dst_file = hostpath + '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/' + records_file_name
if Connect.ftp_put_file(
src_file=src_file,
dst_file=dst_file,
hostname=hostname,
username=username,
password=userpass
):
try:
remove(src_file)
except OSError:
pass
elif hosttype == 'sftp':
src_file = records_root_temp + sep + records_file_name
dst_file = hostpath + '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/' + records_file_name
response = Connect.ssh_put_file(
src_file=src_file, dst_file=dst_file,
hostname=hostname, port=hostport,
username=username, password=userpass)
if response != 'ERROR':
try:
remove(src_file)
except OSError:
pass
response = True
else:
response = False
else:
src_file = records_root_temp + sep + records_file_name
dst_file = hostpath + sep + dy + sep + dm + sep + dv + sep + dd + sep + records_file_name
try:
makedirs(hostpath + sep + dy + sep + dm + sep + dv + sep + dd, exist_ok=True)
replace(src=src_file, dst=dst_file)
response = True
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'src_file: ' + src_file
+ '\n' + 'dst_file: ' + dst_file
+ '\n' + 'error: ' + str(error)
)
response = False
else:
response = False
if w != '-' or float(w) != 0:
sleep(float(w))
if response:
logging.info(msg=' result:' + key + ' = OK')
else:
logging.warning(msg='result:' + key + ' = ERROR')
if __name__ == "__main__":
time_start = datetime.now()
args = ArgumentParser(
prog='cctv-scheduler',
description='Hikvision PTZ IP-Camera management.',
epilog='Dependencies: '
'- Python 3 (tested version 3.9.5), '
'- Python 3 modules: paramiko '
)
args.add_argument('--config', type=str, default=path.splitext(__file__)[0] + '.conf', required=False,
help='custom configuration file path')
args.add_argument('-s', '--sequences', action='store_true', required=False,
help='run sequences from config file')
args.add_argument('-c', '--converter', action='store_true', required=False,
help='convert JPEG collection to MP4')
args.add_argument('-p', '--publisher', action='store_true', required=False,
help='publish content from templates')
args = vars(args.parse_args())
log_root = path.dirname(path.realpath(__file__))
log_level = 'INFO'
if path.exists(args['config']):
conf = Parse(parameters=args['config'], block='common')
if 'log_root' in conf.data:
log_root = conf.data['log_root']
if 'log_level' in conf.data:
if conf.data['log_level'] == 'DEBUG':
log_level = logging.DEBUG
elif conf.data['log_level'] == 'INFO':
log_level = logging.INFO
elif conf.data['log_level'] == 'WARNING':
log_level = logging.WARNING
elif conf.data['log_level'] == 'ERROR':
log_level = logging.ERROR
elif conf.data['log_level'] == 'CRITICAL':
log_level = logging.CRITICAL
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d_%H.%M.%S',
handlers=[
logging.FileHandler(
filename=log_root + sep + path.splitext(path.basename(__file__))[0] + '.log',
mode='a'
),
logging.StreamHandler()
],
level=log_level
)
logging.getLogger("paramiko").setLevel(logging.WARNING)
if args['sequences']:
logging.info(msg='Starting PTZ sequences from config file')
sensors = {}
conf = Parse(parameters=args['config'], block='enable-sensor')
for key, value in conf.data.items():
if value == 'true':
device_config = Parse(
parameters=args['config'],
block='sensor-config:' + key
).data
device_entity = Sensor(
hostname=device_config['hostname'],
username=device_config['username'],
userpass=device_config['userpass'],
nodetype=device_config['nodetype'],
nodename=device_config['nodename']
)
sensors[key] = device_entity
conf = Parse(parameters=args['config'], block='enable-sequences')
for key, value in conf.data.items():
if value == 'true':
device_sequence = Parse(
parameters=args['config'],
block='camera-sequences:' + key
).data
device_config = Parse(
parameters=args['config'],
block='camera-config:' + key
).data
device_entity = HikISAPI(
hostname=device_config['hostname'],
username=device_config['username'],
userpass=device_config['userpass']
)
records_root_path = path.dirname(path.realpath(__file__))
records_root_user = None
records_root_pass = None
if 'records_root_path' in device_config:
records_root_path = device_config['records_root_path']
if 'records_root_user' in device_config:
records_root_user = device_config['records_root_user']
if 'records_root_pass' in device_config:
records_root_pass = device_config['records_root_pass']
Sequence.run(
device=device_entity,
sensors=sensors,
sequence=device_sequence,
records_root_path=records_root_path,
records_root_user=records_root_user,
records_root_pass=records_root_pass
)
elif args['converter']:
logging.info(msg='Starting convert JPEG collection to MP4')
elif args['publisher']:
logging.info(msg='Starting publish content from templates')
else:
logging.info(msg='Start arguments was not selected. Exit.')
time_execute = datetime.now() - time_start
logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')