cctv-scheduler/cctv-scheduler.py

2758 lines
103 KiB
Python
Raw Normal View History

2023-03-12 14:27:03 +03:00
#!/usr/bin/env python3
# pylint: disable=C0103,C0302,C0114,W0621
2023-03-12 14:27:03 +03:00
2023-06-18 10:58:35 +03:00
import calendar
import base64
2023-06-18 07:55:58 +03:00
import json
2023-03-12 14:27:03 +03:00
import logging
2023-06-18 10:50:44 +03:00
import re
2023-03-12 14:27:03 +03:00
import urllib.request
from argparse import ArgumentParser
from datetime import datetime
from ftplib import FTP
from multiprocessing import Process, Queue
2023-06-18 11:02:12 +03:00
from os import environ, makedirs, path, remove, replace, sep, walk
from subprocess import Popen, PIPE, STDOUT
from sys import platform
2023-03-12 14:27:03 +03:00
from time import sleep
2023-06-18 10:55:19 +03:00
import requests
2023-03-12 14:27:03 +03:00
from paramiko import SSHClient, AutoAddPolicy
class Parse:
"""Parser of configs, arguments, parameters.
"""
# pylint: disable=C0123
2023-03-12 14:27:03 +03:00
def __init__(self, parameters, block: str = None) -> None:
"""Object constructor.
Args:
parameters: dictionary as "key":"value" or
ArgumentParser class object or
string path to the file or
string as "var1=val1;var2=val2".
block (str, optional): name of target block from text. Defaults to None.
"""
self.path = ''
self.data = {}
if type(parameters) is dict:
self._dict2dict(parameters)
if type(parameters) is ArgumentParser:
self._dict2dict(self.argv2dict(parameters))
if type(parameters) is str:
if path.exists(parameters):
self._dict2dict(
self.strs2dict(
self.conf2strs(parameters),
block
)
)
self.path = parameters
else:
self._dict2dict(self.strs2dict(parameters, block))
def __str__(self) -> str:
"""Overrides method for print(object).
Returns:
str: string with contents of the object's dictionary.
"""
string = ''
for key, val in self.data.items():
string += str(type(val)) + ' ' + str(key) + ' = ' + str(val) + '\n'
return string
def _dict2dict(self, dictionary: dict) -> None:
"""Updates or adds dictionary data.
Args:
dictionary (dict): dictionary as "key":"value".
"""
self.data.update(dictionary)
# pylint: disable=C0206
2023-03-12 14:27:03 +03:00
def expand(self, store: str = None) -> dict:
"""Expand dictionary "key":"name.conf" to dictionary "key":{subkey: subval}.
Args:
store (str, optional): path to directory with name.conf. Defaults to None.
Returns:
dict: expanded dictionary as "key":{subkey: subval}.
"""
for key in self.data:
if store:
config = store + sep + self.data[key]
else:
config = self.data[key]
with open(config, encoding='UTF-8') as file:
2023-03-12 14:27:03 +03:00
self.data[key] = Parse(file.read()).data
return self.data
@classmethod
def argv2dict(cls, parser: ArgumentParser) -> dict:
"""Converts startup arguments to a dictionary.
Args:
parser (ArgumentParser): argparse.ArgumentParser class object.
Returns:
dict: dictionary as "key":"value".
"""
parser = ArgumentParser(add_help=False, parents=[parser])
return vars(parser.parse_args())
@classmethod
def conf2strs(cls, config: str) -> str:
"""Builds a dictionary from a file containing parameters.
Args:
config (str): path to the config file.
Returns:
str: string as "var1=val1;\nvar2=val2;".
"""
with open(config, encoding='UTF-8') as file:
2023-03-12 14:27:03 +03:00
raw = file.read()
strs = ''
for line in raw.splitlines():
if not line.lstrip().startswith('#'):
strs += line + '\n'
return strs
@classmethod
def strs2dict(cls, strings: str, blockname: str) -> dict:
"""Builds a dictionary from a strings containing parameters.
Args:
strings (str): string as "var1=val1;var2=val2;".
blockname (str): name of target block from text.
Returns:
dict: dictionary as "key":"value".
"""
dictionary = {}
if blockname:
strings = cls.block(blockname, strings)
for line in strings.replace('\n', ';').split(';'):
if not line.lstrip().startswith('#') and "=" in line:
dictionary[line.split('=')[0].strip()] = (
line.split('=')[1].strip().split(';')[0].strip()
)
2023-03-12 14:27:03 +03:00
return dictionary
@classmethod
def str2bool(cls, value: str) -> bool:
"""Converts a string value to boolean.
Args:
value (str): string containing "true" or "false", "yes" or "no", "1" or "0".
Returns:
bool: bool True or False.
"""
return str(value).lower() in ("true", "yes", "1")
@classmethod
def block(cls, blockname: str, text: str) -> str:
"""Cuts a block of text between line [blockname] and line [next block] or EOF.
Args:
blockname (str): string in [] after which the block starts.
text (str): string of text from which the block is needed.
Returns:
str: string of text between line [block name] and line [next block].
"""
level = 1
save = False
result = ''
for line in text.splitlines():
if line.startswith('[') and blockname in line:
level = line.count('[')
save = True
elif line.startswith('[') and '['*level in line:
save = False
elif save:
result += line + '\n'
return result
class Connect:
# pylint: disable=W0105
2023-03-12 14:27:03 +03:00
"""Set of connection methods (functions) for various protocols.
"""
@staticmethod
# pylint: disable=W0102, W0718
2023-03-12 14:27:03 +03:00
def http(
url: str,
method: str = 'GET',
username: str = '',
password: str = '',
authtype: (str, type(None)) = None,
contenttype: str = 'text/plain',
contentdata: (str, bytes) = '',
headers: dict = {}
) -> dict:
2023-03-12 14:27:03 +03:00
"""Handling HTTP request.
Args:
url (str): Handling HTTP request.
2023-03-12 14:27:03 +03:00
method (str, optional): HTTP request method. Defaults to 'GET'.
username (str, optional): username for url authentication. Defaults to ''.
password (str, optional): password for url authentication. Defaults to ''.
authtype (str, None, optional): digest|basic authentication type. Defaults to None.
2023-03-12 14:27:03 +03:00
contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'.
contentdata (str, bytes, optional): content data. Defaults to ''.
headers (dict, optional): additional headers. Defaults to {}.
2023-03-12 14:27:03 +03:00
Returns:
dict: {'success':bool,'result':HTTP response or 'ERROR'}.
2023-03-12 14:27:03 +03:00
"""
if Do.args_valid(locals(), Connect.http.__annotations__):
if contentdata != '':
headers['Content-Type'] = contenttype
if isinstance(contentdata, str):
contentdata = bytes(contentdata.encode('utf-8'))
# Preparing authorization
if authtype:
pswd = urllib.request.HTTPPasswordMgrWithDefaultRealm()
pswd.add_password(None, url, username, password)
if authtype == 'basic':
auth = urllib.request.HTTPBasicAuthHandler(pswd)
token = base64.b64encode((username + ':' + password).encode())
headers['Authorization'] = 'Basic ' + token.decode('utf-8')
if authtype == 'digest':
auth = urllib.request.HTTPDigestAuthHandler(pswd)
urllib.request.install_opener(urllib.request.build_opener(auth))
# Preparing request
request = urllib.request.Request(
url=url,
data=contentdata,
method=method
2023-03-12 14:27:03 +03:00
)
for key, val in headers.items():
request.add_header(key, val)
if len(contentdata) > 128:
contentdata = contentdata[:64] + b' ... ' + contentdata[-64:]
logging.debug(msg=''
+ '\n' + 'uri: ' + url
+ '\n' + 'method: ' + method
+ '\n' + 'username: ' + username
+ '\n' + 'password: ' + password
+ '\n' + 'authtype: ' + str(authtype)
+ '\n' + 'headers: ' + json.dumps(headers, indent=2)
+ '\n' + 'content-data: ' + str(contentdata)
)
# Response
try:
response = urllib.request.urlopen(request).read()
if not response.startswith(b'\xff\xd8'):
response = str(response.decode('utf-8'))
return {"success": True, "result": response}
except Exception as error:
logging.debug(msg='\n' + 'error: ' + str(error))
return {"success": False, "result": "ERROR"}
2023-03-12 14:27:03 +03:00
@staticmethod
# pylint: disable=W0718
def ssh_commands(
command: str,
hostname: str,
username: str,
password: str,
port: int = 22
) -> str:
2023-03-12 14:27:03 +03:00
"""Handling SSH command executing.
Args:
command (str): command for executing.
hostname (str): remote hostname or ip address.
username (str): remote host username.
password (str): remote host password.
port (int, optional): remote host connection port. Defaults to 22.
Returns:
str: terminal response or 'ERROR'.
"""
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
try:
client.connect(hostname=hostname, username=username, password=password, port=port)
stdin, stdout, stderr = client.exec_command(command=command, get_pty=True)
if 'sudo' in command:
stdin.write(password + '\n')
stdin.flush()
stdout.flush()
data = stdout.read() + stderr.read()
client.close()
return data.decode('utf-8')
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'host: ' + hostname + ':' + str(port)
+ '\n' + 'user: ' + username
+ '\n' + 'pass: ' + password
+ '\n' + 'command: ' + command
+ '\n' + 'error: ' + str(error)
)
return 'ERROR'
@staticmethod
# pylint: disable=W0718
def ssh_put_file(
src_file: str,
dst_file: str,
hostname: str,
username: str,
password: str,
port: int = 22
) -> str:
2023-03-12 14:27:03 +03:00
"""Handling SFTP upload file.
Args:
src_file (str): /local/path/to/file.
dst_file (str): /remote/path/to/file.
hostname (str): remote hostname or ip address.
username (str): remote host username.
password (str): remote host password.
port (int, optional): remote host connection port. Defaults to 22.
Returns:
str: '/remote/path/to/file' or 'ERROR'.
"""
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
try:
client.connect(hostname=hostname, username=username, password=password, port=port)
client.exec_command('mkdir -p ' + path.dirname(dst_file))
try:
sftp = client.open_sftp()
sftp.put(localpath=src_file, remotepath=dst_file)
sftp.stat(dst_file)
sftp.close()
return dst_file
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'dst_file: ' + dst_file
+ '\n' + 'error: ' + str(error)
)
return 'ERROR'
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'host: ' + hostname + ':' + str(port)
+ '\n' + 'user: ' + username
+ '\n' + 'pass: ' + password
+ '\n' + 'src_file: ' + src_file
+ '\n' + 'dst_file: ' + dst_file
+ '\n' + 'error: ' + str(error)
)
return 'ERROR'
'''
@staticmethod
def ssh_get_file(src_file: str, dst_file: str, hostname: str, username: str, password: str, port: int = 22) -> str:
"""Handling SFTP download file.
Args:
src_file (str): /remote/path/to/file.
dst_file (str): /local/path/to/file.
hostname (str): remote hostname or ip address.
username (str): remote host username.
password (str): remote host password.
port (int, optional): remote host connection port. Defaults to 22.
Returns:
str: '/local/path/to/file' or 'ERROR'.
"""
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
try:
client.connect(hostname=hostname, username=username, password=password, port=port)
with client.open_sftp() as sftp:
sftp.get(remotepath=src_file, localpath=dst_file)
client.close()
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'host: ' + hostname + ':' + str(port)
+ '\n' + 'user: ' + username
+ '\n' + 'pass: ' + password
+ '\n' + 'src_file: ' + src_file
+ '\n' + 'dst_file: ' + dst_file
+ '\n' + 'error: ' + str(error)
)
return 'ERROR'
'''
@staticmethod
# pylint: disable=W0718,C0116
def ftp_put_file(
src_file: str,
dst_file: str,
hostname: str,
username: str,
password: str
) -> bool:
2023-03-12 14:27:03 +03:00
dst_path = dst_file.split('/')[:-1]
ftp = FTP(host=hostname)
try:
ftp.login(user=username, passwd=password)
for path_item in dst_path:
if path_item.strip() == '':
continue
path_item = path_item.replace('/', '')
try:
ftp.cwd(path_item)
except Exception:
ftp.mkd(path_item)
ftp.cwd(path_item)
except Exception:
pass
with open(src_file, "rb") as file:
ftp.storbinary(f"STOR {dst_file}", file)
ftp.quit()
return True
'''
@staticmethod
def ftp_get_file(src_file: str, dst_file: str, hostname: str, username: str, password: str):
ftp = FTP(host=hostname)
try:
ftp.login(user=username, passwd=password)
with open(dst_file, "wb") as file:
ftp.retrbinary(f"RETR {src_file}", file.write)
ftp.quit()
except Exception:
pass
'''
'''
@staticmethod
def xmlrpc():
pass
'''
class HikISAPI(Connect):
"""Representing Hikvision device with ISAPI.
The class inherits the necessary connection methods of the Connect class
"""
def __init__(
self,
hostname: str,
username: str, userpass: str,
authtype: str = 'digest',
hostport: int = 80, protocol: str = 'http',
channel: int = 101, videoid: int = 1
) -> None:
2023-03-12 14:27:03 +03:00
"""Object constructor.
Args:
hostname (str): camera hostname or ip address.
username (str): camera admin username.
userpass (str): camera admin password.
authtype (str, optional): digest|basic camera authentication type. Defaults to 'digest'.
hostport (int, optional): camera connection port. Defaults to 80.
protocol (str, optional): camera connection protocol. Defaults to 'http'.
channel (int, optional): camera channel id. Defaults to 101.
videoid (int, optional): camera video id. Defaults to 1.
"""
self._host = hostname
self._port = hostport
self._user = username
self._pswd = userpass
self._auth = authtype
self._prot = protocol
self._chan = channel
self._viid = videoid
def __call(
self,
url: str, method: str = 'GET',
contenttype: str = 'application/x-www-form-urlencoded',
contentdata: str = ''
) -> str:
2023-03-12 14:27:03 +03:00
"""Send request to camera.
Args:
url (str): API path for request.
method (str, optional): HTTP request method. Defaults to 'GET'.
contenttype (str, optional): Content-Type header.
Defaults to 'application/x-www-form-urlencoded'.
2023-03-12 14:27:03 +03:00
contentdata (str, optional): data for send with request. Defaults to ''.
Returns:
str: HTTP response content or 'ERROR'.
2023-03-12 14:27:03 +03:00
"""
response = self.http(
2023-03-12 14:27:03 +03:00
url=url, method=method,
username=self._user, password=self._pswd, authtype=self._auth,
contenttype=contenttype, contentdata=contentdata
)
if response['success']:
return response['result']
else:
return 'ERROR'
2023-03-12 14:27:03 +03:00
def capabilities(self) -> bool:
"""Get camera capabilities.
Returns:
bool: True if successed. Printing a response with a logger at the INFO level.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._viid) + "/capabilities"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.info(msg='\n' + response + '\n')
return True
else:
return False
def downloadjpeg(
self,
dst_file: str = path.splitext(__file__)[0] + '.jpeg',
x: int = 1920,
y: int = 1080
) -> bool:
2023-03-12 14:27:03 +03:00
"""Get static picture from camera.
Args:
dst_file (str, optional): abs picture's path to save. Defaults to scriptname+'.jpeg'.
2023-03-12 14:27:03 +03:00
x (int, optional): picture width. Defaults to 1920.
y (int, optional): picture height. Defaults to 1080.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/Streaming/channels/" + str(self._viid)
+ "/picture?snapShotImageType=JPEG&videoResolutionWidth="
+ str(x) + "&videoResolutionHeight=" + str(y)
)
2023-03-12 14:27:03 +03:00
with open(dst_file, "wb") as file:
response = self.__call(url=url, method='GET')
if response != 'ERROR':
file.write(response)
logging.debug(msg='\n' + dst_file + '\n')
return True
else:
return False
def getcamerapos(self) -> bool:
"""Get current camera position.
Returns:
bool: True if successed. Printing a response with a logger at the INFO level.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/status"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.info(msg='\n' + response + '\n')
return True
else:
return False
def rebootcamera(self) -> bool:
"""Set camera reboot command.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/System/reboot"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method="PUT")
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovyyu(self, speed: int = 1) -> bool:
"""Start camera moving to up.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=TILT_UP&speed=" + str(speed)
+ "&mode=start"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovyyd(self, speed: int = 1) -> bool:
"""Start camera moving to down.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=TILT_DOWN&speed=" + str(speed)
+ "&mode=start"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovxxl(self, speed: int = 1) -> bool:
"""Start camera moving to left.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=PAN_LEFT&speed=" + str(speed)
+ "&mode=start"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovxxr(self, speed: int = 1) -> bool:
"""Start camera moving to right.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=PAN_RIGHT&speed=" + str(speed)
+ "&mode=start"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovzzi(self, speed: int = 1) -> bool:
"""Start camera zoom in.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=ZOOM_OUT&speed=" + str(speed)
+ "&mode=start"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzmovzzo(self, speed: int = 1) -> bool:
"""Start camera zoom out.
Args:
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=ZOOM_IN&speed=" + str(speed)
+ "&mode=start"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptzpreset(self, preset: int, speed: int = 1) -> bool:
"""Start camera moving to preset.
Args:
preset (int): saved preset number.
speed (int, optional): moving speed from 1 to 7. Defaults to 1.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=GOTO_PRESET&presetNo=" + str(preset)
+ "&speed=" + str(speed)
+ "&mode=start"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setptztostop(self) -> bool:
"""Stop any camera moving.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/PTZ/channels/" + str(self._viid)
+ "/PTZControl?command=GOTO_PRESET&mode=stop"
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method='GET')
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setcamerapos(self, x: int = 0, y: int = 0, z: int = 0) -> bool:
"""Set camera moving to absolute position.
Args:
x (int, optional): horisontal camera position from 0 to 3600. Defaults to 0.
y (int, optional): vertical camera position from -900 to 2700. Defaults to 0.
z (int, optional): zoom camera position from 0 to 1000. Defaults to 0.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan)
+ "/absolute"
)
2023-03-12 14:27:03 +03:00
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<PTZData><AbsoluteHigh>'
+ '<elevation>' + str(y) + '</elevation>'
+ '<azimuth>' + str(x) + '</azimuth>'
+ '<absoluteZoom>' + str(z) + '</absoluteZoom>'
+ '</AbsoluteHigh></PTZData>'
)
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def __setcameramovcon(self, x: int = 0, y: int = 0, z: int = 0) -> bool:
"""Set camera moving to direction until other signal or 180 seconds elapse.
Args:
x (int, optional): acceleration of horizontal camera movement from -100 to 100.
Defaults to 0.
y (int, optional): acceleration of vertical camera movement from -100 to 100.
Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100.
Defaults to 0.
2023-03-12 14:27:03 +03:00
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan)
+ "/continuous"
)
2023-03-12 14:27:03 +03:00
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<PTZData>'
+ '<pan>' + str(x) + '</pan>'
+ '<tilt>' + str(y) + '</tilt>'
+ '<zoom>' + str(z) + '</zoom>'
+ '</PTZData>'
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def __setcameramovmom(self, x: int = 0, y: int = 0, z: int = 0, t: int = 180000) -> bool:
"""Set camera moving to direction until other signal or duration elapse.
Args:
x (int, optional): acceleration of horizontal camera movement from -100 to 100.
Defaults to 0.
y (int, optional): acceleration of vertical camera movement from -100 to 100.
Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100.
Defaults to 0.
t (int, optional): duration in ms of acceleration from 0 to 180000.
Defaults to 180000.
2023-03-12 14:27:03 +03:00
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan)
+ "/momentary"
)
2023-03-12 14:27:03 +03:00
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<PTZData>'
+ '<pan>' + str(x) + '</pan>'
+ '<tilt>' + str(y) + '</tilt>'
+ '<zoom>' + str(z) + '</zoom>'
+ '<Momentary><duration>' + str(t) + '</duration></Momentary>'
+ '</PTZData>'
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
sleep(t/1000)
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setcameramov(self, x: int = 0, y: int = 0, z: int = 0, t: int = 0) -> bool:
"""Set camera moving to direction (polymorph abstraction).
Args:
x (int, optional): acceleration of horizontal camera movement from -100 to 100.
Defaults to 0.
y (int, optional): acceleration of vertical camera movement from -100 to 100.
Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100.
Defaults to 0.
t (int, optional): duration in ms of acceleration from 0 to 180000.
Defaults to 0.
2023-03-12 14:27:03 +03:00
Returns:
bool: True if successed.
"""
if t == '-' or int(t) == 0:
return self.__setcameramovcon(x=int(x), y=int(y), z=int(z))
else:
return self.__setcameramovmom(x=int(x), y=int(y), z=int(z), t=int(t))
def setmovtohome(self) -> bool:
"""Set camera moving to homeposition.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan)
+ "/homeposition/goto"
)
2023-03-12 14:27:03 +03:00
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<PTZData></PTZData>'
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def setposashome(self) -> bool:
"""Save current camera position as homeposition.
Returns:
bool: True if successed.
"""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan)
+ "/homeposition"
)
2023-03-12 14:27:03 +03:00
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<PTZData></PTZData>'
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
def settextonosd(
self,
enabled: str = "true",
x: int = 0,
y: int = 0,
message: str = ""
) -> bool:
2023-03-12 14:27:03 +03:00
"""Set message as video overlay text.
Args:
enabled (str, optional): true or false. Defaults to "true".
x (int, optional): horizontal text position from 0 to video width. Defaults to 0.
y (int, optional): vertical text position from 0 to video heith. Defaults to 0.
message (str, optional): overlay text content. Defaults to "".
Returns:
bool: True if successed.
"""
if message == '-':
message = ""
url = (
self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/System/Video/inputs/channels/" + str(self._chan)
+ "/overlays/text"
)
2023-03-12 14:27:03 +03:00
xml = ''.join(
'<?xml version="1.0" encoding="UTF-8"?>'
+ '<TextOverlayList version="1.0" xmlns="http://www.hikvision.com/ver10/XMLSchema">'
+ '<TextOverlay>'
+ '<id>1</id>'
+ '<enabled>' + enabled + '</enabled>'
+ '<posX>' + str(x) + '</posX>'
+ '<posY>' + str(y) + '</posY>'
+ '<displayText>' + message + '</displayText>'
+ '</TextOverlay>'
+ '</TextOverlayList>'
)
2023-03-12 14:27:03 +03:00
response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
if response != 'ERROR':
logging.debug(msg='\n' + response + '\n')
return True
else:
return False
class Sensor(Connect):
"""Representing sensor connected to remote host.
The class inherits the necessary connection methods of the Connect class
"""
def __init__(
self,
hostname: str, username: str, userpass: str,
nodetype: str, nodename: str,
hostport: int = 22
) -> None:
2023-03-12 14:27:03 +03:00
"""Object constructor.
Args:
hostname (str): sensor's remote host hostname or ip address.
username (str): sensor's remote host username.
userpass (str): sensor's remote host password.
nodetype (str): 'ds18b20' or other sensor type.
nodename (str): 28-1a2b3c4d5e6f (ds18b20 example).
hostport (int, optional): sensor's remote host connection port. Defaults to 22.
"""
self._host = hostname
self._port = hostport
self._user = username
self._pswd = userpass
self._type = nodetype
self._node = nodename
def __call(self, command: str) -> str:
"""Send request to sensor's remote host.
Args:
command (str): command to poll the sensor.
Returns:
str: sensor's remote host response content.
"""
return self.ssh_commands(
command=command,
hostname=self._host, port=self._port,
username=self._user, password=self._pswd
)
# pylint: disable=W0718
2023-03-12 14:27:03 +03:00
def __temperature(self, nodename: str) -> str:
"""Preparating request for ds18b20 sensor type.
Args:
nodename (str): 28-1a2b3c4d5e6f (ds18b20 example).
Returns:
str: formatted string with temperature in Celsius.
"""
command = 'cat /sys/bus/w1/devices/' + nodename + '/temperature'
response = self.__call(command=command)
if response != 'ERROR':
try:
temperature = str(int(response)//1000) + "'C"
return temperature
except Exception as error:
logging.debug(
msg=''
+ '\n' + 'host: ' + self._host + ':' + str(self._port)
+ '\n' + 'user: ' + self._user
+ '\n' + 'pass: ' + self._pswd
+ '\n' + 'command: ' + command
+ '\n' + 'error: ' + str(error)
)
return 'ERROR'
def value(self) -> str:
"""Public method to get sensor value.
Returns:
str: sensor value.
"""
if self._type == 'ds18b20':
return self.__temperature(nodename=self._node)
2023-06-18 10:50:44 +03:00
class Wordpress(Connect):
"""Set of methods (functions) for Wordpress API.
Reference: https://developer.wordpress.org/rest-api/reference/
Args:
Connect (_type_): class with 'http' method.
"""
def __init__(
self,
hostname: str,
username: str,
password: str
):
"""Object constructor.
Args:
hostname (str, optional): www.wordpress.site.
username (str, optional): wordpress username.
password (str, optional): wordpress passwrod.
"""
if Do.args_valid(locals(), self.__init__.__annotations__):
self._host = hostname
self._user = username
self._pass = password
self.api_event = 'https://' + self._host + '/wp-json/tribe/events/v1/events'
self.api_media = 'https://' + self._host + '/wp-json/wp/v2/media'
self.api_pages = 'https://' + self._host + '/wp-json/wp/v2/pages'
self.url_files = 'https://' + self._host + '/wp-content/uploads'
def event_create(
self,
title: str,
slug: str,
date_start: str,
date_end: str,
date_publish: str = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
all_day: bool = True,
description: str = None
) -> dict:
"""Create event by 'wp-json' and 'The Events Calendar'.
Args:
title (str, optional): event title.
slug (str, optional): event slug.
date_start (str, optional): '2022-12-31T23:59:59' format.
date_end (str, optional): '2022-12-31T23:59:59' format.
date_publish (_type_, optional): '2022-12-31T23:59:59' format. Defaults to now.
all_day (bool, optional): all day event duration flag. Defaults to True.
description (str, optional): event body. Defaults to None.
Raises:
ValueError: date formate is not 2022-12-31T23:59:59.
ValueError: description can't be empty.
Returns:
dict: {'success':bool,'result':'http/url/to/event'}.
"""
if Do.args_valid(locals(), self.event_create.__annotations__):
pattern = "^([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})+$"
if (
not re.fullmatch(pattern, date_start) or
not re.fullmatch(pattern, date_end) or
not re.fullmatch(pattern, date_publish)
):
raise ValueError("date formate is not 2022-12-31T23:59:59")
if description == '':
raise ValueError("description can't be empty")
event_json = {
"title": title,
"slug": slug,
"date": date_publish,
"start_date": date_start,
"end_date": date_end,
"all_day": str(all_day),
"description": description
}
response = self.http(
url=self.api_event,
method='POST',
username=self._user,
password=self._pass,
authtype='basic',
contenttype='application/json; charset=UTF-8',
contentdata=json.dumps(event_json, indent=2)
)
logging.debug(
msg=""
+ "\n" + "event API response: "
+ "\n" + json.dumps(json.loads(response['result']), indent=2)
)
if response['success']:
for key, val in json.loads(response['result']).items():
if key == "url":
logging.info('event created: %s', val)
return {"success": True, "result": val}
else:
logging.warning("event didn't create")
return {"success": False, "result": "ERROR"}
def media_search(
self,
media_name: (str, type(None)) = None,
media_type: (str, type(None)) = None
) -> dict:
"""Search uploaded media by 'wp-json'.
Args:
media_name (str, type, optional): results matching a string. Defaults to None.
media_type (str, type, optional): application,image,video,audio,text. Defaults to None.
Returns:
dict: {'success':bool,'result':['list/of/link/to/media']}
"""
if Do.args_valid(locals(), self.media_search.__annotations__):
url = self.api_media + '?per_page=100'
if media_name:
url = url + '&search=' + media_name
if (media_type == 'application' or
media_type == 'image' or
media_type == 'video' or
media_type == 'audio' or
media_type == 'text'
):
url = url + '&media_type=' + media_type
media_list = []
response = self.http(url=url, method='GET')
logging.debug(
msg=""
+ "\n" + "media API response: "
+ "\n" + json.dumps(json.loads(response['result']), indent=2)
)
if response['success']:
for media in json.loads(response['result']):
media_list.append(media['guid']['rendered'])
return {"success": True, "result": media_list}
else:
logging.warning("media didn't list")
return {"success": False, "result": "ERROR"}
def media_upload(
self,
mediafile: str,
mediatype: str,
aliasfile: str = ''
) -> dict:
"""Upload media by 'wp-json'.
Args:
mediafile (str, optional): path to file.
mediatype (str, optional): 'image/jpeg', 'video/mp4', etc.
aliasfile (str, optional): uploaded media name. Defaults to original file.
Raises:
FileExistsError: mediafile is not exist.
Returns:
dict: {'success':bool,'result':'http/url/to/media'}.
"""
if Do.args_valid(locals(), self.media_upload.__annotations__):
if not path.exists(mediafile):
raise FileExistsError(mediafile + " is not exist")
else:
with open(mediafile, mode='rb') as file:
mediadata = file.read()
if aliasfile == '':
aliasfile = path.basename(mediafile)
response = self.http(
url=self.api_media,
method='POST',
username=self._user,
password=self._pass,
authtype='basic',
contenttype=mediatype,
contentdata=mediadata,
headers={
"Accept": "application/json",
'Content-Disposition': 'attachment; filename=' + aliasfile,
'Cache-Control': 'no-cache'
}
)
logging.debug(
msg=""
+ "\n" + "media API response: "
+ "\n" + json.dumps(json.loads(response['result']), indent=2)
)
if response['success']:
for key, val in json.loads(response['result']).items():
if key == "source_url":
logging.info('media uploaded: %s', val)
return {"success": True, "result": val}
else:
logging.warning("media didn't upload")
return {"success": False, "result": "ERROR"}
def pages_read(
self,
page_id: int
) -> dict:
"""Read page by 'wp-json'.
Args:
page_id (int): unique identifier for the page.
Returns:
dict: {'success':bool,'result':'page data'}
"""
if Do.args_valid(locals(), self.pages_read.__annotations__):
page_link = self.api_pages + '/' + str(page_id)
response = self.http(url=page_link)
if response['success']:
logging.debug(
msg=""
+ "\n" + "wp page API response: "
+ "\n" + json.dumps(json.loads(response['result']), indent=2)
)
return {"success": True, "result": response['result']}
else:
logging.warning("wp page didn't read")
return {"success": False, "result": "ERROR"}
def pages_update(
self,
page_id: int,
content: str
) -> dict:
"""Update page by 'wp-json'.
Args:
page_id (int): unique identifier for the page.
content (str): the content for the page.
Returns:
dict: {'success':bool,'result':'http/url/to/page'}
"""
if Do.args_valid(locals(), self.pages_update.__annotations__):
page_link = self.api_pages + '/' + str(page_id)
page_json = {
"content": content
}
response = self.http(
url=page_link,
method='POST',
username=self._user,
password=self._pass,
authtype='basic',
contenttype='application/json; charset=UTF-8',
contentdata=json.dumps(page_json)
)
logging.debug(
msg=""
+ "\n" + "wp page API response: "
+ "\n" + json.dumps(json.loads(response['result']), indent=2)
)
if response['success']:
for key, val in json.loads(response['result']).items():
if key == "link":
logging.info(msg="wp page " + str(page_id) + " updated: " + val)
return {"success": True, "result": val}
else:
logging.warning("wp page didn't update")
return {"success": False, "result": "ERROR"}
2023-06-18 10:55:19 +03:00
class Telegram():
"""Set of methods (functions) for Telegram Bot API.
Reference: https://core.telegram.org/bots/api#available-methods
"""
def __init__(self, token: str):
"""Object constructor.
Args:
token (str): Telegram Bot API access token.
"""
if Do.args_valid(locals(), self.__init__.__annotations__):
self._token = token
self.api_root = 'https://api.telegram.org'
self.api_path = self.api_root + '/bot' + self._token
def send_message(self, chat: str, text: str, parse_mode: str = 'HTML') -> dict:
"""Send text message.
Args:
chat (str): unique identifier for the target chat or username of the target channel.
text (str): text of the message to be sent, 1-4096 characters after entities parsing.
parse_mode (str, optional): 'HTML', 'Markdown', 'MarkdownV2'. Defaults to 'HTML'.
Returns:
dict: {"success":bool,"result":"API response" or "ERROR"}
"""
if Do.args_valid(locals(), self.send_message.__annotations__):
url=self.api_path + '/sendMessage'
data = {
"chat_id": chat,
"text": text,
"parse_mode": parse_mode,
"disable_notification": True
}
response = requests.post(url=url, json=data, timeout=15)
if response.status_code == 200:
response = response.json()
logging.info(msg=""
+ "message '"
+ str(response['result']['message_id'])
+ "' sent to telegram chat "
+ str(chat)
)
return {'success': True, 'result': response}
else:
logging.warning(msg="message didn't send to telegram chat " + str(chat))
return {'success': False, 'result': response}
def delete_message(self, chat: str, message_id: int) -> dict:
"""Delete message.
Args:
chat (str): unique identifier for the target chat or username of the target channel.
message_id (int): identifier of the message to delete.
Returns:
dict: {"success":bool,"result":"API response" or "ERROR"}
"""
if Do.args_valid(locals(), self.delete_message.__annotations__):
url=self.api_path + '/deleteMessage'
data = {"chat_id": chat, "message_id": message_id}
response = requests.post(url=url, json=data, timeout=15)
if response.status_code == 200:
response = response.json()
logging.info(msg=""
+ "message '" + str(message_id) + "' deleted from telegram chat "
+ str(chat)
)
return {'success': True, 'result': response}
else:
logging.warning(msg=""
+ "message '" + str(message_id) + "' didn't deleted from telegram chat "
+ str(chat)
)
return {'success': False, 'result': response}
def __send_media(
self,
chat: str,
media_path: str,
media_type: str,
caption: (str, type(None)),
parse_mode: str,
disable_notification: bool,
additional_url_param: (str, type(None))
) -> dict:
"""Send media by api.telegram.org.
Args:
chat (str): unique identifier for the target chat or username of the target channel.
media_path (str): /local/path/to/file, https://url/to/file, file_id=EXISTFILEID.
media_type (str): 'document', 'photo', 'video', 'audio'.
caption (str, None): media caption less 1024 characters.
parse_mode (str): caption 'HTML', 'Markdown', 'MarkdownV2' parse mode.
disable_notification (bool): send silently.
additional_url_param (str, None): example: '&duration=30&width=960&height=540'.
Raises:
ValueError: "'media_type' value is wrong"
Returns:
dict: {'success':bool,'result':response}.
"""
if Do.args_valid(locals(), self.__send_media.__annotations__):
if (
media_type == 'document' or
media_type == 'photo' or
media_type == 'video' or
media_type == 'audio'
):
url = self.api_path + '/send' + media_type + '?chat_id=' + chat
else:
raise ValueError("'media_type' value is wrong: " + media_type)
if caption:
url = url + '&caption=' + caption + '&parse_mode=' + parse_mode
if disable_notification:
url = url + "&disable_notification=True"
if additional_url_param:
url = url + additional_url_param
if re.match("^(?:http://|https://|file_id=)", media_path):
media_path = media_path.replace('file_id=', '')
response = requests.post(
url=url + "&" + media_type + "=" + media_path,
timeout=60
)
if response.status_code == 200:
response = response.json()
if media_type == 'photo':
file_id = response['result'][media_type][-1]['file_id']
else:
file_id = response['result'][media_type]['file_id']
logging.info(msg=""
+ media_type
+ " '"
+ str(file_id)
+ "' sent to telegram chat "
+ chat
)
return {'success': True, 'result': response}
else:
response = requests.post(
url=url,
files={media_type: open(media_path, "rb")},
timeout=60
)
if response.status_code == 200:
response = response.json()
if media_type == 'photo':
file_id = response['result'][media_type][-1]['file_id']
else:
file_id = response['result'][media_type]['file_id']
logging.info(msg=""
+ media_type
+ " '"
+ str(file_id)
+ "' sent to telegram chat "
+ chat
)
return {'success': True, 'result': response}
logging.warning(
msg=media_type + " " + media_path + " didn't send to telegram chat " + str(chat)
)
return {'success': False, 'result': response}
def send_document(
self,
chat: str,
document: str,
caption: (str, type(None)) = None,
parse_mode: str = 'HTML',
disable_notification: bool = True
) -> dict:
"""Send document. See self.__send_media().
"""
if Do.args_valid(locals(), self.send_document.__annotations__):
return self.__send_media(
chat=chat,
media_path=document,
media_type='document',
caption=caption,
parse_mode=parse_mode,
disable_notification=disable_notification,
additional_url_param=None
)
def send_photo(
self,
chat: str,
photo: str,
caption: (str, type(None)) = None,
parse_mode: str = 'HTML',
disable_notification: bool = True
) -> dict:
"""Send photo. See self.__send_media().
"""
if Do.args_valid(locals(), self.send_photo.__annotations__):
return self.__send_media(
chat=chat,
media_path=photo,
media_type='photo',
caption=caption,
parse_mode=parse_mode,
disable_notification=disable_notification,
additional_url_param=None
)
def send_video(
self,
chat: str,
video: str,
width: (int, type(None)) = None,
height: (int, type(None)) = None,
duration: (int, type(None)) = None,
caption: (str, type(None)) = None,
parse_mode: str = 'HTML',
disable_notification: bool = True
) -> dict:
"""Send video. See self.__send_media().
"""
if Do.args_valid(locals(), self.send_video.__annotations__):
if width or height or duration:
additional_url_param = ''
if width:
additional_url_param += '&width=' + str(width)
if height:
additional_url_param += '&height=' + str(height)
if duration:
additional_url_param += '&duration=' + str(duration)
else:
additional_url_param = None
return self.__send_media(
chat=chat,
media_path=video,
media_type='video',
caption=caption,
parse_mode=parse_mode,
disable_notification=disable_notification,
additional_url_param=additional_url_param
)
def send_audio(
self,
chat: str,
audio: str,
caption: (str, type(None)) = None,
parse_mode: str = 'HTML',
disable_notification: bool = True
) -> dict:
"""Send audio. See self.__send_media().
"""
if Do.args_valid(locals(), self.send_audio.__annotations__):
return self.__send_media(
chat=chat,
media_path=audio,
media_type='audio',
caption=caption,
parse_mode=parse_mode,
disable_notification=disable_notification,
additional_url_param=None
)
def send_mediagroup(
self,
chat: str,
media: dict,
caption: (str, type(None)) = None,
parse_mode: str = 'HTML',
disable_notification: bool = True
) -> dict:
"""Send media group of photo, video, audio, documents.
Args:
chat (str): unique identifier for the target chat or username of the target channel.
media (dict): {
name:{'type':'photo',path:'https://url/to/file',caption:text},
name:{'type':'video',path:'/local/path/to/file',caption:text},
name:{'type':'audio',path:'file_id=EXISTFILEID',caption:text},
}.
caption (str, type, optional): media caption less 1024 characters. Defaults to None.
parse_mode (str): caption 'HTML', 'Markdown', 'MarkdownV2' parse mode.
disable_notification (bool, optional): send silently. Defaults to True.
Returns:
dict: {'success':bool,'result':response}.
"""
if Do.args_valid(locals(), self.send_mediagroup.__annotations__):
url=self.api_path + '/sendMediaGroup'
files = {}
group = []
for media_name in media.keys():
if re.match("^(?:http://|https://|file_id=)", media[media_name]['path']):
files[media_name] = None
media_source = media[media_name]['path'].replace('file_id=', '')
else:
with open(media[media_name]['path'], mode='rb') as file:
files[media_name] = file.read()
media_source = "attach://" + media_name
if not caption and media[media_name]['caption']:
media_caption = media[media_name]['caption']
else:
media_caption = ''
group.append({
"type": media[media_name]['type'],
"media": media_source,
"caption": media_caption
}
)
if caption:
group[0]['caption'] = caption
group[0]['parse_mode'] = parse_mode
data = {
'chat_id': chat,
'media': json.dumps(group),
"disable_notification": disable_notification
}
response = requests.post(url=url, data=data, files=files, timeout=300)
if response.status_code == 200:
response = response.json()
logging.info(msg=""
+ "mediagroup '"
+ str(response['result'][0]['media_group_id'])
+ "' sent to telegram chat "
+ str(chat)
)
return {'success': True, 'result': response}
logging.warning(msg="mediagroup didn't send to telegram chat " + str(chat))
return {'success': False, 'result': response}
2023-03-12 14:27:03 +03:00
class Sequence:
"""Sequence handling.
"""
@staticmethod
# pylint: disable=W0718
2023-03-12 14:27:03 +03:00
def run(
device: HikISAPI, sensors: dict, sequence: dict,
records_root_path: str = None,
records_root_user: str = None,
records_root_pass: str = None
) -> None:
2023-03-12 14:27:03 +03:00
"""Sequences executor.
Args:
device (HikISAPI): HikISAPI object.
sensors (dict): collection as key=sensorname:value=Sensor object.
sequence (dict): sequence steps collection.
records_root_path (str, optional): path (local|smb|ftp,sftp) to records directory.
Defaults to None.
records_root_user (str, optional): username if path on remote host.
Defaults to None.
records_root_pass (str, optional): password if path on remote host.
Defaults to None.
2023-03-12 14:27:03 +03:00
"""
for key, value in sequence.items():
action = value.split(',')[0].strip()
x = value.split(',')[1].strip()
y = value.split(',')[2].strip()
z = value.split(',')[3].strip()
p = value.split(',')[4].strip()
s = value.split(',')[5].strip()
t = value.split(',')[6].strip()
w = value.split(',')[7].strip()
m = value.split(',')[8].strip()
if 'sensor-config:' in m:
sensor_name = m.split(':')[1].strip()
sensor_value = sensors[sensor_name].value()
if sensor_value != 'ERROR':
m = sensor_value
else:
m = ''
logging.info(msg=''
+ ' action:' + key + ' = ' + action
2023-03-12 14:27:03 +03:00
+ ',' + x + ',' + y + ',' + z
+ ',' + p + ',' + s + ',' + t
+ ',' + w + ',' + m
)
2023-03-12 14:27:03 +03:00
if action == 'capabilities':
response = device.capabilities()
elif action == 'getcamerapos':
response = device.getcamerapos()
elif action == 'rebootcamera':
response = device.rebootcamera()
elif action == 'setptzmovyyu':
response = device.setptzmovyyu(speed=int(s))
elif action == 'setptzmovyyd':
response = device.setptzmovyyd(speed=int(s))
elif action == 'setptzmovxxl':
response = device.setptzmovxxl(speed=int(s))
elif action == 'setptzmovxxr':
response = device.setptzmovxxr(speed=int(s))
elif action == 'setptzmovzzi':
response = device.setptzmovzzi(speed=int(s))
elif action == 'setptzmovzzo':
response = device.setptzmovzzo(speed=int(s))
elif action == 'setptzpreset':
response = device.setptzpreset(preset=int(p), speed=int(s))
elif action == 'setptztostop':
response = device.setptztostop()
elif action == 'setcamerapos':
response = device.setcamerapos(x=int(x), y=int(y), z=int(z))
elif action == 'setcameramov':
response = device.setcameramov(x=int(x), y=int(y), z=int(z), t=t)
elif action == 'setmovtohome':
response = device.setmovtohome()
elif action == 'setposashome':
response = device.setposashome()
elif action == 'settextonosd':
response = device.settextonosd(x=int(x), y=int(y), message=m)
elif action == 'downloadjpeg':
records_root_temp = records_root_path
if records_root_temp != path.dirname(path.realpath(__file__)):
records_root_temp = path.dirname(path.realpath(__file__)) + sep + 'temp'
makedirs(records_root_temp, exist_ok=True)
dy = datetime.now().strftime('%Y')
dm = datetime.now().strftime('%m')
dv = datetime.now().strftime('%V')
dd = datetime.now().strftime('%d')
th = datetime.now().strftime('%H')
tm = datetime.now().strftime('%M')
ts = datetime.now().strftime('%S')
records_file_name = (
key + '_' + dy + '-' + dm + '-' + dd + '_' + th + '.' + tm + '.' + ts + '.jpeg'
)
if device.downloadjpeg(
x=int(x),
y=int(y),
dst_file=records_root_temp + sep + records_file_name
):
2023-03-12 14:27:03 +03:00
hostname = 'localhost'
hostport, hosttype = None, None
username = records_root_user
userpass = records_root_pass
hostpath = records_root_path
if '://' in records_root_path:
hostname = records_root_path.split('/')[2]
hosttype = records_root_path.split('://')[0]
if hosttype == 'ftp':
hostport = 21
if hosttype == 'sftp':
hostport = 22
if hosttype == 'smb':
hostport = 445
hostpath = records_root_path.replace(hosttype + '://' + hostname, '')
if '@' in hostname:
username = hostname.split('@')[0].split(':')[0]
userpass = hostname.split('@')[0].split(':')[1]
hostname = hostname.split('@')[1]
if ':' in hostname:
hostport = int(hostname.split(':')[1])
hostname = hostname.split(':')[0]
if hosttype == 'ftp':
src_file = records_root_temp + sep + records_file_name
dst_file = (
hostpath
+ '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/'
+ records_file_name
)
2023-03-12 14:27:03 +03:00
if Connect.ftp_put_file(
src_file=src_file,
dst_file=dst_file,
hostname=hostname,
username=username,
password=userpass
):
try:
remove(src_file)
except OSError:
pass
elif hosttype == 'sftp':
src_file = records_root_temp + sep + records_file_name
dst_file = (
hostpath
+ '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/'
+ records_file_name
)
2023-03-12 14:27:03 +03:00
response = Connect.ssh_put_file(
src_file=src_file, dst_file=dst_file,
hostname=hostname, port=hostport,
username=username, password=userpass)
if response != 'ERROR':
try:
remove(src_file)
except OSError:
pass
response = True
else:
response = False
else:
src_file = records_root_temp + sep + records_file_name
dst_file = (
hostpath
+ sep + dy + sep + dm + sep + dv + sep + dd + sep
+ records_file_name)
2023-03-12 14:27:03 +03:00
try:
makedirs(
hostpath + sep + dy + sep + dm + sep + dv + sep + dd,
exist_ok=True
)
2023-03-12 14:27:03 +03:00
replace(src=src_file, dst=dst_file)
response = True
except Exception as error:
logging.debug(msg=''
2023-03-12 14:27:03 +03:00
+ '\n' + 'src_file: ' + src_file
+ '\n' + 'dst_file: ' + dst_file
+ '\n' + 'error: ' + str(error)
)
2023-03-12 14:27:03 +03:00
response = False
else:
response = False
if w != '-' or float(w) != 0:
sleep(float(w))
if response:
logging.info(msg=' result:' + key + ' = OK')
else:
logging.warning(msg='result:' + key + ' = ERROR')
class Proc:
"""Find a running process from Python.
"""
@classmethod
# pylint: disable=W0612
def _list_windows(cls) -> list:
"""Find all running process with wmi.
Returns:
list: dictionaries with descriptions of found processes.
"""
execlist = []
separate = b'\r\r\n'
out, err = Popen(
[
'wmic', 'process', 'get',
'CommandLine,ExecutablePath,Name,ProcessId',
'/format:list'
],
stdout=PIPE,
stderr=PIPE
).communicate()
for line in out.split(separate + separate):
execpid, exename, exepath, cmdline = None, None, None, None
for subline in line.split(separate):
if b'ProcessId=' in subline:
execpid = subline.split(b'=')[1].decode('utf-8')
if b'Name=' in subline:
exename = subline.split(b'=')[1].decode('utf-8')
if b'ExecutablePath=' in subline:
exepath = subline.split(b'=')[1].decode('utf-8')
if b'CommandLine=' in subline:
cmdline = subline.split(b'=')[1].decode('utf-8')
if execpid and exename:
execlist.append(
{
'execpid': execpid,
'exename': exename,
'exepath': exepath,
'cmdline': cmdline
}
)
return execlist
@classmethod
# pylint: disable=W0612
def _list_linux(cls) -> list:
"""Find all running process with ps.
Returns:
list: dictionaries with descriptions of found processes.
"""
execlist = []
out, err = Popen(
[
'/bin/ps', '-eo', 'pid,args'
],
stdout=PIPE,
stderr=PIPE
).communicate()
for line in out.splitlines():
execpid = line.split()[0].decode('utf-8')
exepath = line.split()[1].decode('utf-8')
exename = path.basename(exepath)
cmdline = line.split(None, 1)[1].decode('utf-8')
if execpid and exename:
execlist.append(
{
'execpid': execpid,
'exename': exename,
'exepath': exepath,
'cmdline': cmdline
}
)
return execlist
@classmethod
2023-06-18 07:55:58 +03:00
def list_all(cls) -> list:
"""Find all running process.
Returns:
list: dictionaries with descriptions of found processes.
"""
if platform.startswith('linux') or platform.startswith('darwin'):
return cls._list_linux()
elif platform.startswith('win32'):
return cls._list_windows()
else:
return None
@classmethod
# pylint: disable=W0150
def search(cls, find: str, exclude: str = None) -> list:
"""Find specified processes.
Args:
find (str): find process pid, name or arguments.
exclude (str, optional): exclude process pid, name or arguments. Defaults to None.
Returns:
list: dictionaries with descriptions of found processes.
"""
proc_found = []
try:
2023-06-18 07:55:58 +03:00
for proc in cls.list_all():
if exclude and (
exclude in proc['execpid'] or
exclude in proc['exename'] or
exclude in proc['exepath'] or
exclude in proc['cmdline']
):
pass
elif (
find in proc['execpid'] or
find in proc['exename'] or
find in proc['exepath'] or
find in proc['cmdline']
):
proc_found.append(proc)
except TypeError as ex:
print('ON', platform, 'PLATFORM', 'search ERROR:', ex)
finally:
if len(proc_found) == 0:
return None
else:
return proc_found
@classmethod
def kill(cls, pid: int) -> None:
"""Kill the process by means of the OS.
Args:
pid (int): process ID.
"""
if platform.startswith('linux') or platform.startswith('darwin'):
Popen(['kill', '-s', 'SIGKILL', str(pid)])
elif platform.startswith('win32'):
Popen(['taskkill', '/PID', str(pid), '/F'])
class FFmpeg:
"""FFmpeg management from Python.
"""
@classmethod
def run(
cls,
2023-06-18 07:55:58 +03:00
src: (str, type(None)) = None,
dst: str = None,
fps: int = None,
preset: str = None,
2023-06-18 07:55:58 +03:00
raw: (str, type(None)) = None,
ffpath: str = None,
watchdog: bool = False,
watchsec: int = None,
onlyonce: bool = False
2023-06-18 07:55:58 +03:00
) -> int:
"""Running the installed ffmpeg.
Args:
2023-06-18 07:55:58 +03:00
src (str, type, optional): sources urls, example:
'rtsp://user:pass@host:554/Streaming/Channels/101, anull'. Defaults to None.
dst (str, optional): destination url, example: 'rtp://239.0.0.1:5554'. Defaults to None.
fps (int, optional): frame per second encoding output. Defaults to None.
preset (str, optional): 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p. Defaults to None.
2023-06-18 07:55:58 +03:00
raw (str, type, optional): custom ffmpeg parameters string. Defaults to None.
ffpath (str, optional): custom path to bin, example: /usr/bin/ffmpeg. Defaults to None.
watchdog (bool, optional): detect ffmpeg freeze and terminate. Defaults to False.
2023-06-18 07:55:58 +03:00
watchsec (int, optional): seconds to wait before watchdog terminates. Defaults to None.
onlyonce (bool, optional): detect ffmpeg running copy and terminate. Defaults to False.
2023-06-18 07:55:58 +03:00
Returns:
int: ffmpeg return code
"""
2023-06-18 07:55:58 +03:00
if not raw:
process = ([]
2023-06-18 07:55:58 +03:00
+ cls._bin(ffpath).split()
+ cls._src(src).split()
+ cls._preset(preset, fps).split()
+ cls._dst(dst).split()
)
2023-06-18 07:55:58 +03:00
else:
process = cls._bin(ffpath).split() + raw.split()
if onlyonce and Proc.search(' '.join(process)):
print('Process already exist, exit...')
else:
logging.info(msg='Starting ' + ' '.join(process))
with Popen(process, stdout=PIPE, stderr=STDOUT) as proc:
que = None
if watchdog:
que = Queue()
Process(
target=cls._watchdog,
args=(proc.pid, watchsec, que,),
daemon=True
).start()
for line in proc.stdout:
if not que:
print(line, flush=True)
else:
que.put(line)
return proc.returncode
@classmethod
2023-06-18 07:55:58 +03:00
def _bin(cls, ffpath: str, tool: str = 'ffmpeg') -> str:
"""Returns the path to the bin depending on the OS.
Args:
2023-06-18 07:55:58 +03:00
ffpath (str): custom path to bin.
tool (str, optional): 'ffmpeg', 'ffprobe'. Defaults to 'ffmpeg'.
Returns:
2023-06-18 07:55:58 +03:00
str: path to bin or None, if path does not exist.
"""
faq = (
'\n'
'Main download page: https://ffmpeg.org/download.html\n'
'\n'
'Install on Linux (Debian):\n'
'\tsudo apt install -y ffmpeg\n'
'\tTarget: /usr/bin/ffmpeg\n'
'\n'
'Install on Windows:\n'
'\tDownload and extract: https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-full.7z\n'
'\tTarget: "%PROGRAMFILES%\\ffmpeg\\bin\\ffmpeg.exe"\n'
'\n'
'Install on MacOS:\n'
'\tDownload and extract: https://evermeet.cx/ffmpeg/\n'
'\tTarget: /usr/bin/ffmpeg\n'
2023-06-18 07:55:58 +03:00
)
if not ffpath:
if platform.startswith('linux') or platform.startswith('darwin'):
2023-06-18 07:55:58 +03:00
if tool == 'ffprobe':
ffpath = '/usr/bin/ffprobe'
else:
ffpath = '/usr/bin/ffmpeg'
elif platform.startswith('win32'):
2023-06-18 07:55:58 +03:00
if tool == 'ffprobe':
ffpath = environ['PROGRAMFILES'] + "\\ffmpeg\\bin\\ffprobe.exe"
else:
ffpath = environ['PROGRAMFILES'] + "\\ffmpeg\\bin\\ffmpeg.exe"
if path.exists(ffpath):
return ffpath
else:
2023-06-18 07:55:58 +03:00
print('ON', platform, 'PLATFORM', 'not found', tool, faq)
return None
@classmethod
def _src(cls, sources: str) -> list:
"""Parsing sources into ffmpeg format.
Args:
sources (str): comma-separated list of sources in string format.
Returns:
list: ffmpeg format list of sources.
"""
list_sources = []
for src in sources.split(','):
src = src.strip()
if 'null' in src:
src = ' '.join(['-f lavfi -i', src])
elif 'rtsp' in src:
src = ' '.join(['-rtsp_transport tcp -i', src])
else:
src = ' '.join(['-stream_loop -1 -re -i', src])
list_sources.append(src)
return ' '.join(list_sources)
@classmethod
def _preset(cls, choice: str, fps: int) -> str:
"""Parsing preset into ffmpeg format.
Args:
choice (str): preset selection.
fps (int): frame per second encoding output.
Returns:
str: ffmpeg format encoding parameters.
"""
tune = '-tune zerolatency'
video = '-c:v copy'
audio = '-c:a aac -b:a 128k'
width, height, kbps = None, None, None
if choice:
if choice == '240p':
width, height, kbps = 426, 240, 480
if choice == '360p':
width, height, kbps = 640, 360, 720
if choice == '480p':
width, height, kbps = 854, 480, 1920
if choice == '720p':
width, height, kbps = 1280, 720, 3960
if choice == '1080p':
width, height, kbps = 1920, 1080, 5940
if choice == '1440p':
width, height, kbps = 2560, 1440, 12960
if choice == '2160p':
width, height, kbps = 3840, 2160, 32400
if width and height and kbps:
video = ''.join(
[
'-vf scale=',
str(width), ':', str(height),
',setsar=1:1'
]
)
video = ' '.join(
[
video,
'-c:v libx264 -pix_fmt yuv420p -preset ultrafast'
]
)
if fps:
video = ' '.join([video, '-r', str(fps), '-g', str(fps * 2)])
video = ' '.join([video, '-b:v', str(kbps) + 'k'])
return ' '.join([tune, video, audio])
@classmethod
def _dst(cls, destination: str) -> str:
"""Parsing destination into ffmpeg format.
Args:
destination (str): destination path or url.
Returns:
str: ffmpeg format destination.
"""
container = '-f null'
stdout = '-v debug' # '-nostdin -nostats' # '-report'
if destination:
if 'rtmp' in destination:
container = '-f flv'
elif "rtp" in destination:
container = '-f rtp_mpegts'
else:
destination = '-'
return ' '.join([container, destination, stdout])
@classmethod
def _watchdog(cls, pid: int, sec: int, que: Queue = None) -> None:
"""If no data arrives in the queue, kill the process.
Args:
pid (int): process ID.
sec (int): seconds to wait for data.
que (Queue, optional): queue pointer. Defaults to None.
"""
if not sec:
sec = 5
if que:
while True:
while not que.empty():
print(que.get())
sleep(sec)
if que.empty():
Proc.kill(pid)
print('exit by watchdog')
break
exit()
2023-06-18 07:55:58 +03:00
@classmethod
def probe(
cls,
target: (str, type(None)) = None,
raw: (str, type(None)) = None,
ffpath: str = None
) -> (dict, bytes, None):
"""Running the installed ffprobe.
Args:
target (str, type, optional): media file path to probe. Defaults to None.
raw (str, type, optional): custom ffprobe parameters string. Defaults to None.
ffpath (str, optional): custom path to bin, example: /usr/bin/ffprobe. Defaults to None.
Returns:
dict, bytes, None: ffprobe response or None.
"""
if not raw:
command = ([]
+ cls._bin(ffpath=ffpath, tool='ffprobe').split()
+ ('-i ' + target
+ ' -v quiet -print_format json -show_format -show_programs -show_streams').split()
)
else:
command = cls._bin(ffpath=ffpath, tool='ffprobe').split() + raw.split()
with Popen(command, stdout=PIPE, stderr=STDOUT) as process:
result = process.communicate()
if process.returncode == 0 and not raw:
return json.loads(result[0].decode('utf-8'))
elif process.returncode == 0 and raw:
return result[0]
else:
return None
2023-06-18 08:12:18 +03:00
class Do():
"""Set of various methods (functions) for routine.
"""
@staticmethod
def args_valid(arguments: dict, annotations: dict) -> bool:
"""Arguments type validating by annotations.
Args:
arguments (dict): 'locals()' immediately after starting the function.
annotations (dict): function.name.__annotations__.
Raises:
TypeError: type of argument is not equal type in annotation.
Returns:
bool: True if argument types are valid.
"""
for var_name, var_type in annotations.items():
if not var_name == 'return':
if not isinstance(arguments[var_name], var_type):
raise TypeError(""
+ "type of '"
+ var_name
+ "' = "
+ str(arguments[var_name])
+ " is not "
+ str(var_type)
)
return True
2023-06-18 10:58:35 +03:00
@staticmethod
def date_calc(
target: datetime.date = datetime.datetime.now(),
amount: int = 0,
period: str = None
) -> dict:
"""Calculating start/end dates for period: day, week, month, year.
Args:
target (datetime.date, optional): date in the calculation period. Defaults to now.
amount (int, optional): +/- periods. Defaults to 0.
period (str, optional): 'y'|'year','m'|'month','w'|'week','d'|'day'. Defaults to None.
Raises:
ValueError: 'period' value is wrong.
Returns:
dict: {
'start':{'y':int,'m':int,'w':int,'d':int},
'end':{'y':int,'m':int,'w':int,'d':int}
}.
"""
if Do.args_valid(locals(), Do.date_calc.__annotations__):
date = {}
if not period:
raise ValueError("'period' value is wrong: " + "''")
elif period == 'd' or period == 'day':
delta = target + datetime.timedelta(days=amount)
target = delta
date['period'] = 'day'
elif period == 'w' or period == 'week':
delta = target + datetime.timedelta(weeks=amount)
target_week = str(delta.year) + '-W' + str(delta.isocalendar()[1])
target = datetime.datetime.strptime(target_week + '-1', "%G-W%V-%u")
delta = target + datetime.timedelta(days=6)
date['period'] = 'week'
elif period == 'm' or period == 'month':
delta_month = (target.month + amount) % 12
if not delta_month:
delta_month = 12
delta_year = target.year + ((target.month) + amount - 1) // 12
delta_days = calendar.monthrange(delta_year, delta_month)[1]
delta = target = target.replace(
year=delta_year,
month=delta_month,
day=1
)
delta = delta.replace(
year=delta_year,
month=delta_month,
day=delta_days
)
date['period'] = 'month'
elif period == 'y' or period == 'year':
target = target.replace(
year=target.year + amount,
month=1,
day=1
)
delta = target.replace(
year=target.year,
month=12,
day=31
)
date['period'] = 'year'
else:
raise ValueError("'period' value is wrong: " + period)
date['start'] = {
'y': target.year,
'm': target.month,
'w': target.isocalendar()[1],
'd': target.day
}
date['end'] = {
'y': delta.year,
'm': delta.month,
'w': delta.isocalendar()[1],
'd': delta.day
}
return date
2023-06-18 11:02:12 +03:00
@staticmethod
# pylint: disable=W0612
def file_search(root_path: str, search_name: (str, type(None)) = None) -> list:
"""Search files.
Args:
root_path (str): where to search.
search_name (str, type, optional): full or partial filename for the filter.
Defaults to None.
Returns:
list: list of found files.
"""
found_list = []
if Do.args_valid(locals(), Do.file_search.__annotations__):
for root, dirs, files in walk(root_path, topdown=False):
for file in files:
if search_name:
if search_name in file:
found_list.append(path.join(path.realpath(root), file))
else:
found_list.append(path.join(path.realpath(root), file))
return found_list
2023-06-18 13:01:31 +03:00
@staticmethod
def wp_routine_media(
wp: Wordpress,
targets_media_files: dict,
period: str,
amount: int,
page_id: int
) -> dict:
"""Custom Wordpress media routine - upload media, create media events, update media page.
Args:
wp (Wordpress): wordpress object.
targets_media_files (dict): {'period':{'name':'local/path/to/file'}}.
period (str, optional): 'y','m','w','d'.
amount (int, optional): +/- periods.
page_id (int): unique identifier for the page.
Returns:
dict: {'media upload': bool, 'event create': bool, 'pages update': bool}
"""
if Do.args_valid(locals(), Do.wp_routine_media.__annotations__):
default_media_links = {
"d": {
"point-01": "https://www.hmp.today/wp-content/uploads/2022/07/point-01_yyyy.mm_.dd_.mp4",
"point-02": "https://www.hmp.today/wp-content/uploads/2022/07/point-02_yyyy.mm_.dd_.mp4",
"point-04": "https://www.hmp.today/wp-content/uploads/2022/07/point-04_yyyy.mm_.dd_.mp4",
"point-05": "https://www.hmp.today/wp-content/uploads/2022/07/point-05_yyyy.mm_.dd_.mp4",
"point-11": "https://www.hmp.today/wp-content/uploads/2022/07/point-11_yyyy.mm_.dd_.mp4",
"point-12": "https://www.hmp.today/wp-content/uploads/2022/07/point-12_yyyy.mm_.dd_.mp4"
},
"w": {
"point-01": "https://www.hmp.today/wp-content/uploads/2022/07/point-01_w.mp4",
"point-02": "https://www.hmp.today/wp-content/uploads/2022/07/point-02_w.mp4",
"point-04": "https://www.hmp.today/wp-content/uploads/2022/07/point-04_w.mp4",
"point-05": "https://www.hmp.today/wp-content/uploads/2022/07/point-05_w.mp4",
"point-11": "https://www.hmp.today/wp-content/uploads/2022/07/point-11_w.mp4",
"point-12": "https://www.hmp.today/wp-content/uploads/2022/07/point-12_w.mp4"
},
"m": {
"point-01": "https://www.hmp.today/wp-content/uploads/2022/07/point-01_yyyy.mm_.mp4",
"point-02": "https://www.hmp.today/wp-content/uploads/2022/07/point-02_yyyy.mm_.mp4",
"point-04": "https://www.hmp.today/wp-content/uploads/2022/07/point-04_yyyy.mm_.mp4",
"point-05": "https://www.hmp.today/wp-content/uploads/2022/07/point-05_yyyy.mm_.mp4",
"point-11": "https://www.hmp.today/wp-content/uploads/2022/07/point-11_yyyy.mm_.mp4",
"point-12": "https://www.hmp.today/wp-content/uploads/2022/07/point-12_yyyy.mm_.mp4"
},
"y": {
"point-01": "https://www.hmp.today/wp-content/uploads/2022/07/point-01_yyyy.mp4",
"point-02": "https://www.hmp.today/wp-content/uploads/2022/07/point-02_yyyy.mp4",
"point-04": "https://www.hmp.today/wp-content/uploads/2022/07/point-04_yyyy.mp4",
"point-05": "https://www.hmp.today/wp-content/uploads/2022/07/point-05_yyyy.mp4",
"point-11": "https://www.hmp.today/wp-content/uploads/2022/07/point-11_yyyy.mp4",
"point-12": "https://www.hmp.today/wp-content/uploads/2022/07/point-12_yyyy.mp4"
}
}
current_media_links = default_media_links[period]
result = {}
# media upload
result['media upload'] = True
for name, link in current_media_links.items():
file_found = wp.media_search(
media_name=path.basename(targets_media_files[name]),
media_type='video'
)
if file_found['success'] and len(file_found['result']) > 0:
logging.info(
msg=""
+ "upload skipped, "
+ targets_media_files[name]
+ " found on site"
)
targets_media_files.pop(name, None)
current_media_links[name] = file_found['result'][0]
else:
file_upload = wp.media_upload(targets_media_files[name], 'video/mp4')
if file_upload['success']:
current_media_links[name] = file_upload['result']
else:
result['media upload'] = False
# event create
result['event create'] = True
for name, link in current_media_links.items():
description = (''
+ '<figure class="wp-block-video">'
+ '<video controls loop src="' + link + '"></video>'
+ '</figure>'
)
date = Do.date_calc(period=period, amount=amount)
date_start = (''
+ str(date['start']['y'])
+ '-'
+ str(date['start']['m']).zfill(2)
+ '-'
+ str(date['start']['d']).zfill(2)
+ 'T00:00:00'
)
date_end = (''
+ str(date['end']['y'])
+ '-'
+ str(date['end']['m']).zfill(2)
+ '-'
+ str(date['end']['d']).zfill(2)
+ 'T23:59:59'
)
if period == 'd':
slug = (''
+ name
+ '_'
+ str(date['start']['y'])
+ '-'
+ str(date['start']['m']).zfill(2)
+ '-'
+ str(date['start']['d']).zfill(2)
)
title = (''
+ name
+ ' '
+ str(date['start']['y'])
+ '.'
+ str(date['start']['m']).zfill(2)
+ '.'
+ str(date['start']['d']).zfill(2)
)
if period == 'w':
slug = (''
+ name
+ '_'
+ str(date['start']['y'])
+ '-w'
+ str(date['start']['w']).zfill(2)
)
title = (''
+ name
+ ' '
+ str(date['start']['y'])
+ '-w'
+ str(date['start']['w']).zfill(2)
)
if period == 'm':
slug = (''
+ name
+ '_'
+ str(date['start']['y'])
+ '-'
+ str(date['start']['m']).zfill(2)
)
title = (''
+ name
+ ' '
+ str(date['start']['y'])
+ '.'
+ str(date['start']['m']).zfill(2)
)
if period == 'y':
slug = (''
+ name
+ '_'
+ str(date['start']['y'])
)
title = (''
+ name
+ ' '
+ str(date['start']['y'])
)
event_api_slug = wp.api_event + '/by-slug/' + slug
if current_media_links[name] != default_media_links[period][name]:
pass
elif Connect.http(event_api_slug)['success']:
logging.info(msg="event skipped, " + event_api_slug + " found on site")
else:
event_create = wp.event_create(
title=title,
slug=slug,
date_start=date_start,
date_end=date_end,
date_publish=date_start,
description=description
)
if not event_create['success']:
result['event create'] = False
# pages update
result['pages update'] = True
page_read = wp.pages_read(page_id)
if page_read['success']:
content = json.loads(page_read['result'])['content']['rendered']
for name, link in current_media_links.items():
if period == 'd':
reg_exp = (""
+ "_(?:[0-9]{4}|yyyy)"
+ ".(?:[0-9]{2}|mm_)"
+ ".(?:[0-9]{2}|dd_)(?:|-[0-9]).mp4"
)
if period == 'w':
reg_exp = "(?:_[0-9]{4}-w[0-9]{2}|_w)(?:|-[0-9]).mp4"
if period == 'm':
reg_exp = "_(?:[0-9]{4}|yyyy).(?:[0-9]{2}|mm_)(?:|-[0-9]).mp4"
if period == 'y':
reg_exp = "_(?:[0-9]{4}|yyyy)(?:|-[0-9])"
replace = 0
new_str = link
pattern = wp.url_files + "/[0-9]{4}/[0-9]{2}/" + name + reg_exp
for old_str in re.findall(pattern, content):
if old_str == new_str:
logging.info(
msg=""
+ "page replace skipped, "
+ new_str
+ " found on page"
)
else:
content = content.replace(old_str, new_str)
replace += 1
logging.info(msg="page replace" + old_str + " to " + new_str)
if replace > 0:
page_update = wp.pages_update(page_id = page_id, content = content)
result['pages update'] = page_update['success']
return result
2023-06-18 08:12:18 +03:00
2023-03-12 14:27:03 +03:00
if __name__ == "__main__":
time_start = datetime.now()
args = ArgumentParser(
prog='cctv-scheduler',
description='Hikvision PTZ IP-Camera management.',
epilog='Dependencies: '
'- Python 3 (tested version 3.9.5), '
'- Python 3 modules: paramiko '
)
args.add_argument('--config', type=str, default=path.splitext(__file__)[0] + '.conf',
required=False,
2023-03-12 14:27:03 +03:00
help='custom configuration file path')
args.add_argument('-b', '--broadcast', action='store_true', required=False,
help='streaming media to destination')
2023-03-12 14:27:03 +03:00
args.add_argument('-s', '--sequences', action='store_true', required=False,
help='run sequences from config file')
args.add_argument('-c', '--converter', action='store_true', required=False,
help='convert JPEG collection to MP4')
args.add_argument('-p', '--publisher', action='store_true', required=False,
help='publish content from templates')
args = vars(args.parse_args())
log_root = path.dirname(path.realpath(__file__))
log_level = 'INFO'
if path.exists(args['config']):
conf = Parse(parameters=args['config'], block='common')
if 'log_root' in conf.data:
log_root = conf.data['log_root']
if 'log_level' in conf.data:
if conf.data['log_level'] == 'DEBUG':
log_level = logging.DEBUG
elif conf.data['log_level'] == 'INFO':
log_level = logging.INFO
elif conf.data['log_level'] == 'WARNING':
log_level = logging.WARNING
elif conf.data['log_level'] == 'ERROR':
log_level = logging.ERROR
elif conf.data['log_level'] == 'CRITICAL':
log_level = logging.CRITICAL
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d_%H.%M.%S',
handlers=[
logging.FileHandler(
filename=log_root + sep + path.splitext(path.basename(__file__))[0] + '.log',
mode='a'
),
logging.StreamHandler()
],
level=log_level
)
logging.getLogger("paramiko").setLevel(logging.WARNING)
if args['broadcast']:
logging.info(msg='Starting streaming media to destination')
broadcasts = {}
conf = Parse(parameters=args['config'], block='enable-broadcast')
for key, value in conf.data.items():
if value == 'true':
broadcast_config = Parse(
parameters=args['config'],
block='broadcast-config:' + key
).data
src = None
if 'src' in broadcast_config:
src = broadcast_config['src']
dst = None
if 'dst' in broadcast_config:
dst = broadcast_config['dst']
fps = None
if 'fps' in broadcast_config:
fps = broadcast_config['fps']
preset = None
if 'preset' in broadcast_config:
preset = broadcast_config['preset']
ffpath = None
if 'ffpath' in broadcast_config:
ffpath = broadcast_config['ffpath']
watchdog = None
if 'watchdog' in broadcast_config:
watchdog = broadcast_config['watchdog']
watchsec = None
if 'watchsec' in broadcast_config:
watchsec = int(broadcast_config['watchsec'])
2023-06-18 07:55:58 +03:00
onlyonce = None
if 'onlyonce' in broadcast_config:
onlyonce = broadcast_config['onlyonce']
FFmpeg.run(
src=src,
dst=dst,
fps=fps,
preset=preset,
ffpath=ffpath,
watchdog=watchdog,
watchsec=watchsec,
onlyonce=onlyonce
2023-06-18 07:55:58 +03:00
)
elif args['sequences']:
2023-03-12 14:27:03 +03:00
logging.info(msg='Starting PTZ sequences from config file')
sensors = {}
conf = Parse(parameters=args['config'], block='enable-sensor')
for key, value in conf.data.items():
if value == 'true':
device_config = Parse(
parameters=args['config'],
block='sensor-config:' + key
).data
device_entity = Sensor(
hostname=device_config['hostname'],
username=device_config['username'],
userpass=device_config['userpass'],
nodetype=device_config['nodetype'],
nodename=device_config['nodename']
)
sensors[key] = device_entity
conf = Parse(parameters=args['config'], block='enable-sequences')
for key, value in conf.data.items():
if value == 'true':
device_sequence = Parse(
parameters=args['config'],
block='camera-sequences:' + key
).data
device_config = Parse(
parameters=args['config'],
block='camera-config:' + key
).data
device_entity = HikISAPI(
hostname=device_config['hostname'],
username=device_config['username'],
userpass=device_config['userpass']
)
records_root_path = path.dirname(path.realpath(__file__))
records_root_user = None
records_root_pass = None
if 'records_root_path' in device_config:
records_root_path = device_config['records_root_path']
if 'records_root_user' in device_config:
records_root_user = device_config['records_root_user']
if 'records_root_pass' in device_config:
records_root_pass = device_config['records_root_pass']
Sequence.run(
device=device_entity,
sensors=sensors,
sequence=device_sequence,
records_root_path=records_root_path,
records_root_user=records_root_user,
records_root_pass=records_root_pass
)
elif args['converter']:
logging.info(msg='Starting convert JPEG collection to MP4')
elif args['publisher']:
logging.info(msg='Starting publish content from templates')
else:
logging.info(msg='Start arguments was not selected. Exit.')
time_execute = datetime.now() - time_start
logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')