cctv-scheduler/cctv-scheduler.py

4553 lines
183 KiB
Python

#!/usr/bin/env python3
# pylint: disable=C0103,C0302,W0621
"""It's the main executor of cctv-scheduler. Processes incoming parameters and calls subclasses
"""
import calendar
import base64
import datetime
import inspect
import json
import logging
from random import choice
import re
import urllib.request
from argparse import ArgumentParser
from ftplib import FTP
from multiprocessing import Process, Queue
from os import environ, makedirs, path, remove, rmdir, sep, stat, walk
from shutil import copyfile
from string import ascii_letters, digits
from subprocess import Popen, PIPE, STDOUT
from sys import modules, platform
from time import sleep
import requests
from paramiko import AutoAddPolicy, SSHClient, SFTPClient
import smbclient
class Parse:
    """Reader of settings given as a dict, CLI parser, config file or string.

    After construction the object exposes:
        path: path of the config file that was read, '' for any other input.
        data: dictionary as "key":"value" collected from the input.
    """

    # pylint: disable=C0123
    def __init__(self, parameters, block: str = None) -> None:
        """Object constructor.

        Args:
            parameters: dictionary as "key":"value" or
                ArgumentParser class object or
                string path to the file or
                string as "var1=val1;var2=val2".
            block (str, optional): name of target block from text. Defaults to None.
        """
        self.path = ''
        self.data = {}
        # Exact type checks are intentional here (see the pylint pragma above).
        kind = type(parameters)
        if kind is dict:
            self._dict2dict(parameters)
        if kind is ArgumentParser:
            self._dict2dict(self.argv2dict(parameters))
        if kind is str:
            from_file = path.exists(parameters)
            # A string naming an existing file is read as a config,
            # any other string is parsed as inline "var=val" text.
            text = self.conf2strs(parameters) if from_file else parameters
            self._dict2dict(self.strs2dict(text, block))
            if from_file:
                self.path = parameters

    def __str__(self) -> str:
        """Overrides method for print(object).

        Returns:
            str: one "<type> key = value" line per stored item.
        """
        return ''.join(
            str(type(val)) + ' ' + str(key) + ' = ' + str(val) + '\n'
            for key, val in self.data.items()
        )

    def _dict2dict(self, dictionary: dict) -> None:
        """Merges the given items into the stored data.

        Args:
            dictionary (dict): dictionary as "key":"value".
        """
        self.data.update(dictionary)

    # pylint: disable=C0206
    def expand(self, store: str = None) -> dict:
        """Replaces every value "name.conf" by the parsed content of that file.

        Args:
            store (str, optional): path to directory with name.conf. Defaults to None.

        Returns:
            dict: expanded dictionary as "key":{subkey: subval}.
        """
        for key in self.data:
            filename = self.data[key]
            config = store + sep + filename if store else filename
            with open(config, encoding='UTF-8') as file:
                self.data[key] = Parse(file.read()).data
        return self.data

    @classmethod
    def argv2dict(cls, parser: ArgumentParser) -> dict:
        """Converts startup arguments to a dictionary.

        Args:
            parser (ArgumentParser): argparse.ArgumentParser class object.

        Returns:
            dict: dictionary as "key":"value".
        """
        wrapper = ArgumentParser(add_help=False, parents=[parser])
        return vars(wrapper.parse_args())

    @classmethod
    def conf2strs(cls, config: str) -> str:
        """Reads a config file and drops the lines that are comments.

        Args:
            config (str): path to the config file.

        Returns:
            str: string as "var1=val1;\nvar2=val2;".
        """
        with open(config, encoding='UTF-8') as file:
            lines = file.read().splitlines()
        return ''.join(
            line + '\n' for line in lines if not line.lstrip().startswith('#')
        )

    @classmethod
    def strs2dict(cls, strings: str, blockname: str) -> dict:
        """Builds a dictionary from a string containing parameters.

        Args:
            strings (str): string as "var1=val1;var2=val2;".
            blockname (str): name of target block from text.

        Returns:
            dict: dictionary as "key":"value".
        """
        if blockname:
            strings = cls.block(blockname, strings)
        dictionary = {}
        for line in strings.replace('\n', ';').split(';'):
            if line.lstrip().startswith('#') or '=' not in line:
                continue
            # Only the first '=' separates the key, the value may contain more.
            key, value = line.split('=', maxsplit=1)
            dictionary[key.strip()] = value.strip()
        return dictionary

    @classmethod
    def str2bool(cls, value: str) -> bool:
        """Converts a string value to boolean.

        Args:
            value (str): string containing "true" or "false", "yes" or "no", "1" or "0".

        Returns:
            bool: True for "true", "yes" or "1" in any letter case, False otherwise.
        """
        text = str(value).lower()
        return text in ("true", "yes", "1")

    @classmethod
    def block(cls, blockname: str, text: str) -> str:
        """Cuts a block of text between line [blockname] and line [next block] or EOF.

        Args:
            blockname (str): string in [] after which the block starts.
            text (str): string of text from which the block is needed.

        Returns:
            str: string of text between line [block name] and line [next block].
        """
        depth = 1
        capturing = False
        kept = []
        for line in text.splitlines():
            is_header = line.startswith('[')
            if is_header and blockname in line:
                # Remember the bracket depth of the requested header.
                depth = line.count('[')
                capturing = True
            elif is_header and '[' * depth in line:
                # A header of the same or deeper bracket depth ends the block.
                capturing = False
            elif capturing:
                kept.append(line + '\n')
        return ''.join(kept)
class Proc:
    """Find a running process from Python.
    """

    @classmethod
    def _list_windows(cls) -> list:
        """Find all running process with wmi.

        Returns:
            list: dictionaries with descriptions of found processes.
        """
        execlist = []
        separate = b'\r\r\n'
        fields = {
            b'ProcessId': 'execpid',
            b'Name': 'exename',
            b'ExecutablePath': 'exepath',
            b'CommandLine': 'cmdline',
        }
        out, _ = Popen(
            [
                'wmic', 'process', 'get',
                'CommandLine,ExecutablePath,Name,ProcessId',
                '/format:list'
            ],
            stdout=PIPE,
            stderr=PIPE
        ).communicate()
        for record in out.split(separate + separate):
            proc = {'execpid': None, 'exename': None, 'exepath': None, 'cmdline': None}
            for subline in record.split(separate):
                # Split on the first '=' only: command lines often contain '='.
                # The key must match exactly, so "Name=" inside a command line
                # can no longer overwrite the process name.
                key, found, value = subline.partition(b'=')
                name = fields.get(key.strip())
                if found and name:
                    proc[name] = value.decode('utf-8', errors='replace')
            if proc['execpid'] and proc['exename']:
                execlist.append(proc)
        return execlist

    @classmethod
    def _list_linux(cls) -> list:
        """Find all running process with ps.

        Returns:
            list: dictionaries with descriptions of found processes.
        """
        execlist = []
        out, _ = Popen(
            [
                '/bin/ps', '-eo', 'pid,args'
            ],
            stdout=PIPE,
            stderr=PIPE
        ).communicate()
        for line in out.splitlines():
            parts = line.split(None, 1)
            # Skips the "PID COMMAND" header row and any malformed line.
            if len(parts) < 2 or not parts[0].isdigit():
                continue
            execpid = parts[0].decode('utf-8')
            cmdline = parts[1].decode('utf-8', errors='replace')
            exepath = cmdline.split()[0]
            execlist.append(
                {
                    'execpid': execpid,
                    'exename': path.basename(exepath),
                    'exepath': exepath,
                    'cmdline': cmdline
                }
            )
        return execlist

    @classmethod
    def list_all(cls) -> list:
        """Find all running process.

        Returns:
            list: dictionaries with descriptions of found processes,
                None when the platform is not supported.
        """
        if platform.startswith('linux') or platform.startswith('darwin'):
            return cls._list_linux()
        if platform.startswith('win32'):
            return cls._list_windows()
        return None

    @classmethod
    def search(
        cls,
        find: str,
        exclude: str = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> list:
        """Find specified processes.

        Args:
            find (str): find process pid, name or arguments.
            exclude (str, optional): exclude process pid, name or arguments. Defaults to None.
            logger_alias (str, optional): sublogger name. Defaults to function or method name.

        Returns:
            list: dictionaries with descriptions of found processes,
                None when nothing was found or the process list is unavailable.
        """
        local_logger = logging.getLogger(logger_alias)
        proc_found = []
        keys = ('execpid', 'exename', 'exepath', 'cmdline')
        try:
            for proc in cls.list_all():
                # Missing attributes (None) are compared as empty strings so a
                # single incomplete record does not abort the whole search.
                values = [proc.get(key) or '' for key in keys]
                if exclude and any(exclude in value for value in values):
                    continue
                if any(find in value for value in values):
                    proc_found.append(proc)
        except (TypeError, OSError) as error:
            # TypeError: unsupported platform (list_all() returned None) or
            # non-string arguments. OSError: the listing tool cannot be started.
            local_logger.warning(
                msg='ON ' + platform + ' PLATFORM search ERROR: ' + str(error)
            )
        return proc_found or None

    @classmethod
    def kill(cls, pid: int) -> None:
        """Kill the process by means of the OS.

        Args:
            pid (int): process ID.
        """
        command = None
        if platform.startswith('linux') or platform.startswith('darwin'):
            command = ['kill', '-s', 'SIGKILL', str(pid)]
        elif platform.startswith('win32'):
            command = ['taskkill', '/PID', str(pid), '/F']
        if command:
            # Wait for the helper so it does not linger as a zombie process.
            Popen(command).wait()
class FFmpeg:
    """FFmpeg management from Python.
    """

    @classmethod
    def run(
        cls,
        src: (str, type(None)) = None,
        dst: str = None,
        fps: int = None,
        preset: str = None,
        raw: (str, type(None)) = None,
        ffpath: str = None,
        watchdog: bool = False,
        watchsec: int = None,
        onlyonce: bool = False,
        logger_alias: str = inspect.stack()[0].function
    ) -> int:
        """Running the installed ffmpeg.

        Args:
            src (str, type, optional): sources urls, example:
                'rtsp://user:pass@host:554/Streaming/Channels/101, anull'. Defaults to None.
            dst (str, optional): destination url, example: 'rtp://239.0.0.1:5554'. Defaults to None.
            fps (int, optional): frame per second encoding output. Defaults to None.
            preset (str, optional): 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p. Defaults to None.
            raw (str, type, optional): custom ffmpeg parameters string. Defaults to None.
            ffpath (str, optional): custom path to bin, example: /usr/bin/ffmpeg. Defaults to None.
            watchdog (bool, optional): detect ffmpeg freeze and terminate. Defaults to False.
            watchsec (int, optional): seconds to wait before watchdog terminates. Defaults to None.
            onlyonce (bool, optional): detect ffmpeg running copy and terminate. Defaults to False.
            logger_alias (str, optional): sublogger name. Defaults to function or method name.

        Raises:
            FileNotFoundError: the ffmpeg binary was not found.

        Returns:
            int: ffmpeg return code
        """
        local_logger = logging.getLogger(logger_alias)
        binary = cls._bin(ffpath)
        if binary is None:
            raise FileNotFoundError('ffmpeg binary not found')
        # The binary path is kept as one argument: splitting it on whitespace
        # breaks locations such as "C:\Program Files\ffmpeg\bin\ffmpeg.exe".
        if not raw:
            process = (
                [binary]
                + cls._src(src).split()
                + cls._preset(preset, fps).split()
                + cls._dst(dst).split()
            )
        else:
            process = [binary] + raw.split()
        if onlyonce and Proc.search(' '.join(process)):
            local_logger.info(msg='ffmpeg process already exist')
            return 0
        local_logger.info(msg='Starting ' + ' '.join(process))
        with Popen(process, stdout=PIPE, stderr=STDOUT) as proc:
            que = None
            if watchdog:
                que = Queue()
                Process(
                    target=cls._watchdog,
                    args=(proc.pid, watchsec, que,),
                    daemon=True
                ).start()
            for line in proc.stdout:
                if not que:
                    local_logger.debug(msg=line)
                else:
                    # With a watchdog the output is logged by the watchdog itself.
                    que.put(line)
        return proc.returncode

    @classmethod
    def _bin(
        cls,
        ffpath: str,
        tool: str = 'ffmpeg',
        logger_alias: str = inspect.stack()[0].function
    ) -> str:
        """Returns the path to the bin depending on the OS.

        Args:
            ffpath (str): custom path to bin.
            tool (str, optional): 'ffmpeg', 'ffprobe'. Defaults to 'ffmpeg'.
            logger_alias (str, optional): sublogger name. Defaults to function or method name.

        Returns:
            str: path to bin or None, if path does not exist.
        """
        local_logger = logging.getLogger(logger_alias)
        faq = (
            '\n'
            'Main download page: https://ffmpeg.org/download.html\n'
            '\n'
            'Install on Linux (Debian):\n'
            '\tsudo apt install -y ffmpeg\n'
            '\tTarget: /usr/bin/ffmpeg\n'
            '\n'
            'Install on Windows:\n'
            '\tDownload and extract: https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-full.7z\n'
            '\tTarget: "%PROGRAMFILES%\\ffmpeg\\bin\\ffmpeg.exe"\n'
            '\n'
            'Install on MacOS:\n'
            '\tDownload and extract: https://evermeet.cx/ffmpeg/\n'
            '\tTarget: /usr/bin/ffmpeg\n'
        )
        name = 'ffprobe' if tool == 'ffprobe' else 'ffmpeg'
        if not ffpath:
            if platform.startswith('linux') or platform.startswith('darwin'):
                ffpath = '/usr/bin/' + name
            elif platform.startswith('win32'):
                ffpath = environ['PROGRAMFILES'] + "\\ffmpeg\\bin\\" + name + ".exe"
        # ffpath stays empty on an unsupported platform; path.exists(None)
        # would raise TypeError, so the emptiness is checked first.
        if ffpath and path.exists(ffpath):
            return ffpath
        local_logger.warning(msg='ON ' + platform + ' PLATFORM not found ' + tool + faq)
        return None

    @classmethod
    def _src(cls, sources: str) -> str:
        """Parsing sources into ffmpeg format.

        Args:
            sources (str): comma-separated list of sources in string format.

        Returns:
            str: space-separated ffmpeg input arguments for all sources.
        """
        list_sources = []
        for src in sources.split(','):
            src = src.strip()
            if 'null' in src:
                prefix = '-f lavfi -i'
            elif 'rtsp' in src:
                prefix = '-rtsp_transport tcp -i'
            else:
                prefix = '-stream_loop -1 -re -i'
            list_sources.append(' '.join([prefix, src]))
        return ' '.join(list_sources)

    @classmethod
    def _preset(cls, choice: str, fps: int) -> str:
        """Parsing preset into ffmpeg format.

        Args:
            choice (str): preset selection.
            fps (int): frame per second encoding output.

        Returns:
            str: ffmpeg format encoding parameters.
        """
        tune = '-tune zerolatency'
        video = '-c:v copy'
        audio = '-c:a aac -b:a 128k'
        # preset name -> (width, height, video bitrate in kbit/s)
        presets = {
            '240p': (426, 240, 480),
            '360p': (640, 360, 720),
            '480p': (854, 480, 1920),
            '720p': (1280, 720, 3960),
            '1080p': (1920, 1080, 5940),
            '1440p': (2560, 1440, 12960),
            '2160p': (3840, 2160, 32400),
        }
        width, height, kbps = presets.get(choice, (None, None, None))
        if width and height and kbps:
            # A known preset switches from stream copy to re-encoding.
            video = '-vf scale=' + str(width) + ':' + str(height) + ',setsar=1:1'
            video = ' '.join([video, '-c:v libx264 -pix_fmt yuv420p -preset ultrafast'])
            if fps:
                video = ' '.join([video, '-r', str(fps), '-g', str(fps * 2)])
            video = ' '.join([video, '-b:v', str(kbps) + 'k'])
        return ' '.join([tune, video, audio])

    @classmethod
    def _dst(cls, destination: str) -> str:
        """Parsing destination into ffmpeg format.

        Args:
            destination (str): destination path or url.

        Returns:
            str: ffmpeg format destination.
        """
        container = '-f null'
        stdout = '-v debug'  # '-nostdin -nostats' # '-report'
        if destination:
            if 'rtmp' in destination:
                container = '-f flv'
            elif "rtp" in destination:
                container = '-f rtp_mpegts'
        else:
            # No destination: discard the output ("-f null -").
            destination = '-'
        return ' '.join([container, destination, stdout])

    @classmethod
    def _watchdog(
        cls,
        pid: int,
        sec: int,
        que: Queue = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """If no data arrives in the queue, kill the process.

        Args:
            pid (int): process ID.
            sec (int): seconds to wait for data.
            que (Queue, optional): queue pointer. Defaults to None.
            logger_alias (str, optional): sublogger name. Defaults to function or method name.
        """
        local_logger = logging.getLogger(logger_alias)
        if not sec:
            sec = 5
        if que:
            trigger = "Delay between the first packet and last packet in the muxing queue"
            while True:
                while not que.empty():
                    que_str = que.get()
                    local_logger.debug(msg=que_str)
                    # Lines read from the ffmpeg pipe are bytes; matching them
                    # against a str pattern raises TypeError, so decode first.
                    if isinstance(que_str, bytes):
                        text = que_str.decode('utf-8', errors='replace')
                    else:
                        text = str(que_str)
                    if trigger in text:
                        Proc.kill(pid)
                        local_logger.debug(msg='exit by watchdog, reason: ' + trigger)
                sleep(sec)
                if que.empty():
                    Proc.kill(pid)
                    reason = "que is empty for " + str(sec) + " seconds"
                    local_logger.debug(msg='exit by watchdog, reason: ' + reason)
                    break
            raise SystemExit

    @classmethod
    def probe(
        cls,
        target: (str, type(None)) = None,
        raw: (str, type(None)) = None,
        ffpath: str = None
    ) -> (dict, bytes, None):
        """Running the installed ffprobe.

        Args:
            target (str, type, optional): media file path to probe. Defaults to None.
            raw (str, type, optional): custom ffprobe parameters string. Defaults to None.
            ffpath (str, optional): custom path to bin, example: /usr/bin/ffprobe. Defaults to None.

        Returns:
            dict, bytes, None: parsed json (dict) for a target, raw output (bytes)
                for custom parameters, None on any failure.
        """
        binary = cls._bin(ffpath=ffpath, tool='ffprobe')
        if binary is None:
            return None
        if raw:
            command = [binary] + raw.split()
        elif target:
            # Arguments are passed as a list so paths with spaces stay intact.
            command = [
                binary, '-i', target,
                '-v', 'quiet', '-print_format', 'json',
                '-show_format', '-show_programs', '-show_streams'
            ]
        else:
            return None
        with Popen(command, stdout=PIPE, stderr=STDOUT) as process:
            result = process.communicate()
            if process.returncode != 0:
                return None
            if raw:
                return result[0]
            return json.loads(result[0].decode('utf-8'))
class Connect:
"""Set of connection methods (functions) for various protocols.
"""
@staticmethod
# pylint: disable=W0102, W0718
def http(
    url: str,
    method: str = 'GET',
    username: str = '',
    password: str = '',
    authtype: (str, type(None)) = None,
    contenttype: str = 'text/plain',
    contentdata: (str, bytes) = '',
    headers: dict = {},
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Handling HTTP request.

    Args:
        url (str): request url.
        method (str, optional): HTTP request method. Defaults to 'GET'.
        username (str, optional): username for url authentication. Defaults to ''.
        password (str, optional): password for url authentication. Defaults to ''.
        authtype (str, None, optional): digest|basic authentication type. Defaults to None.
        contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'.
        contentdata (str, bytes, optional): content data. Defaults to ''.
        headers (dict, optional): additional headers. Defaults to {}.
        logger_alias (str, optional): sublogger name. Defaults to function or method name.

    Returns:
        dict: {'success':bool,'result':HTTP response or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.http.__annotations__):
        # Work on a private copy: the default dict is shared between calls, so
        # writing into it would leak 'Authorization' and 'Content-Type' headers
        # into later requests (and into the caller's own dict).
        headers = dict(headers)
        if contentdata != '':
            headers['Content-Type'] = contenttype
        if isinstance(contentdata, str):
            contentdata = contentdata.encode('utf-8')

        # Preparing authorization
        if authtype:
            pswd = urllib.request.HTTPPasswordMgrWithDefaultRealm()
            pswd.add_password(None, url, username, password)
            auth = None
            if authtype == 'basic':
                auth = urllib.request.HTTPBasicAuthHandler(pswd)
                token = base64.b64encode((username + ':' + password).encode())
                headers['Authorization'] = 'Basic ' + token.decode('utf-8')
            if authtype == 'digest':
                auth = urllib.request.HTTPDigestAuthHandler(pswd)
            # An unknown authtype leaves no handler to install.
            if auth is not None:
                urllib.request.install_opener(urllib.request.build_opener(auth))

        # Preparing request
        request = urllib.request.Request(
            url=url,
            data=contentdata,
            method=method
        )
        for key, val in headers.items():
            request.add_header(key, val)
        # Only the logged copy of the payload is shortened, the request keeps it all.
        if len(contentdata) > 128:
            contentdata = contentdata[:64] + b' ... ' + contentdata[-64:]

        # Response
        # NOTE(review): credentials are written to the debug log in plain text.
        local_logger.debug(msg=''
            + '\n' + 'uri: ' + url
            + '\n' + 'method: ' + method
            + '\n' + 'username: ' + username
            + '\n' + 'password: ' + password
            + '\n' + 'authtype: ' + str(authtype)
            + '\n' + 'headers: ' + json.dumps(headers, indent=2)
            + '\n' + 'content-data: ' + str(contentdata)
        )
        try:
            with urllib.request.urlopen(request) as reply:
                response = reply.read()
            try:
                response = str(response.decode('utf-8'))
            except UnicodeDecodeError:
                # Binary payloads are returned as bytes.
                pass
            return {"success": True, "result": response}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
@staticmethod
# pylint: disable=W0718
def ftp_file_search(
    root_path: str,
    search_name: (str, type(None)) = None,
    ftp: (FTP, type(None)) = None,
    hostname: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    port: int = 21,
    logger_alias: str = inspect.stack()[0].function
) -> list:
    """Search files over FTP.

    Args:
        root_path (str): where to search.
        search_name (str, None, optional): full or partial filename for the filter.
            Defaults to None.
        ftp (FTP, None, optional): FTP object generated by recursive search.
            Defaults to None.
        hostname (str, None, optional): ftp hostname.
            Defaults to None.
        username (str, None, optional): ftp username.
            Defaults to None.
        password (str, None, optional): ftp password.
            Defaults to None.
        port (int, optional): remote host connection port.
            Defaults to 21.
        logger_alias (str, optional): sublogger name.
            Defaults to function or method name.

    Returns:
        list: sorted list of found files, empty when the connection failed.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.ftp_file_search.__annotations__):
        local_logger.debug(msg=''
            + '\n' + 'root_path: ' + root_path
            + '\n' + 'search_name: ' + str(search_name)
            + '\n' + 'ftp: ' + str(ftp)
            + '\n' + 'hostname: ' + str(hostname)
            + '\n' + 'username: ' + str(username)
            + '\n' + 'password: ' + str(password)
        )
        # parent is True only for the call that opened the connection;
        # recursive calls reuse it and must not close it.
        parent = False
        if not ftp:
            # FTP(host=...) would already connect on the default port before
            # connect() is called, so the object is created unconnected.
            connection = FTP()
            try:
                connection.connect(host=hostname, port=port)
                connection.login(user=username, passwd=password)
            except Exception as error:
                local_logger.debug(msg='error: ' + '\n' + str(error))
                connection.close()
                return []
            ftp = connection
            parent = True
        result = []
        try:
            ftp.cwd(root_path)
            for file in ftp.mlsd():
                name, facts = file[0], file[1]
                if facts['type'] == 'dir':
                    result = result + Connect.ftp_file_search(
                        root_path=root_path + "/" + name,
                        search_name=search_name,
                        ftp=ftp
                    )
                elif facts['type'] == 'file':
                    if not search_name or search_name in name:
                        result.append(root_path + "/" + name)
        finally:
            if parent:
                ftp.close()
        result.sort()
        return result
@staticmethod
# pylint: disable=W0718
def ftp_get_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 21,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Download file from FTP.

    Args:
        src_file (str): /remote/path/to/file.
        dst_file (str): /local/path/to/file.
        hostname (str): ftp hostname.
        username (str): ftp username.
        password (str): ftp password.
        port (int, optional): remote host connection port. Defaults to 21.
        logger_alias (str, optional): sublogger name. Defaults to function or method name.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.ftp_get_file.__annotations__):
        # Created unconnected: FTP(host=...) would connect on port 21 right
        # away, outside the error handling and before the requested port is used.
        ftp = FTP()
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'hostname: ' + hostname
            + '\n' + 'username: ' + username
            + '\n' + 'password: ' + password
        )
        try:
            ftp.connect(host=hostname, port=port)
            ftp.login(user=username, passwd=password)
            with open(dst_file, "wb+") as file:
                ftp.retrbinary(f"RETR {src_file}", file.write)
            ftp.quit()
            local_logger.info(msg=dst_file + ' saved')
            return {"success": True, "result": dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
        finally:
            # Releases the socket on the error path; harmless after quit().
            ftp.close()
@staticmethod
# pylint: disable=W0718
def ftp_put_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 21,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Upload file to FTP.

    Args:
        src_file (str): /local/path/to/file.
        dst_file (str): /remote/path/to/file.
        hostname (str): ftp hostname.
        username (str): ftp username.
        password (str): ftp password.
        port (int, optional): remote host connection port. Defaults to 21.
        logger_alias (str, optional): sublogger name. Defaults to function or method name.

    Returns:
        dict: {'success':bool,'result':/remote/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.ftp_put_file.__annotations__):
        dst_path = dst_file.split('/')[:-1]
        # Created unconnected: FTP(host=...) would connect on port 21 right
        # away, outside the error handling and before the requested port is used.
        ftp = FTP()
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'hostname: ' + hostname
            + '\n' + 'username: ' + username
            + '\n' + 'password: ' + password
        )
        try:
            ftp.connect(host=hostname, port=port)
            ftp.login(user=username, passwd=password)
            # Walk down the destination directories, creating the missing ones.
            for path_item in dst_path:
                if path_item.strip() == '':
                    continue
                try:
                    ftp.cwd(path_item)
                except Exception:
                    ftp.mkd(path_item)
                    ftp.cwd(path_item)
            with open(src_file, "rb") as file:
                ftp.storbinary(f"STOR {dst_file}", file)
            ftp.quit()
            local_logger.info(msg='ftp://' + hostname + dst_file + ' saved')
            return {"success": True, "result": 'ftp://' + hostname + dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
        finally:
            # Releases the socket on the error path; harmless after quit().
            ftp.close()
@staticmethod
# pylint: disable=W0718
def ssh_commands(
    command: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 22,
    logger_alias: str = inspect.stack()[0].function
) -> str:
    """Handling SSH command executing.

    Args:
        command (str): command for executing.
        hostname (str): remote hostname or ip address.
        username (str): remote host username.
        password (str): remote host password.
        port (int, optional): remote host connection port. Defaults to 22.
        logger_alias (str, optional): sublogger name. Defaults to function or method name.

    Returns:
        str: terminal response or 'ERROR'.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.ssh_commands.__annotations__):
        client = SSHClient()
        client.set_missing_host_key_policy(AutoAddPolicy())
        local_logger.debug(msg=''
            + '\n' + 'host: ' + hostname + ':' + str(port)
            + '\n' + 'user: ' + username
            + '\n' + 'pass: ' + password
            + '\n' + 'command: ' + command
        )
        try:
            client.connect(hostname=hostname, username=username, password=password, port=port)
            stdin, stdout, stderr = client.exec_command(command=command, get_pty=True)
            if 'sudo' in command:
                # Answer the sudo password prompt shown on the allocated pty.
                stdin.write(password + '\n')
                stdin.flush()
            stdout.flush()
            data = stdout.read() + stderr.read()
            return data.decode('utf-8')
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return 'ERROR'
        finally:
            # Close the session on success and on failure alike.
            client.close()
@staticmethod
# pylint: disable=W0718
def ssh_file_search(
    root_path: str,
    search_name: (str, type(None)) = None,
    sftp: (SFTPClient, type(None)) = None,
    hostname: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    port: int = 22,
    logger_alias: str = inspect.stack()[0].function
) -> list:
    """Search files over SFTP.

    Args:
        root_path (str): where to search.
        search_name (str, None, optional): full or partial filename for the filter.
            Defaults to None.
        sftp (SFTPClient, None, optional): SFTP object generated by recursive search.
            Defaults to None.
        hostname (str, None, optional): sftp hostname.
            Defaults to None.
        username (str, None, optional): sftp username.
            Defaults to None.
        password (str, None, optional): sftp password.
            Defaults to None.
        port (int, optional): remote host connection port.
            Defaults to 22.
        logger_alias (str, optional): sublogger name.
            Defaults to function or method name.

    Returns:
        list: sorted list of found files, empty when the connection failed.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.ssh_file_search.__annotations__):
        local_logger.debug(msg=''
            + '\n' + 'root_path: ' + root_path
            + '\n' + 'search_name: ' + str(search_name)
            + '\n' + 'sftp: ' + str(sftp)
            + '\n' + 'hostname: ' + str(hostname)
            + '\n' + 'username: ' + str(username)
            + '\n' + 'password: ' + str(password)
        )
        # parent is True only for the call that opened the connection;
        # recursive calls reuse the sftp session and must not close it.
        parent = False
        client = None
        if not sftp:
            try:
                client = SSHClient()
                client.set_missing_host_key_policy(AutoAddPolicy())
                client.connect(
                    hostname=hostname,
                    username=username,
                    password=password,
                    port=port
                )
                sftp = client.open_sftp()
                parent = True
            except Exception as error:
                local_logger.debug(msg='error: ' + '\n' + str(error))
                if client is not None:
                    client.close()
                return []
        result = []
        try:
            for file in sftp.listdir(root_path):
                try:
                    # Every entry is first tried as a directory.
                    result = result + Connect.ssh_file_search(
                        root_path=root_path + '/' + file,
                        search_name=search_name,
                        sftp=sftp
                    )
                except IOError:
                    # Listing failed, so the entry is handled as a plain file.
                    if not search_name or search_name in file:
                        result.append(root_path + '/' + file)
        finally:
            if parent:
                client.close()
        result.sort()
        return result
@staticmethod
# pylint: disable=W0718
def ssh_get_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 22,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Download file from SFTP.

    Args:
        src_file (str): /remote/path/to/file.
        dst_file (str): /local/path/to/file.
        hostname (str): sftp hostname.
        username (str): sftp username.
        password (str): sftp password.
        port (int, optional): remote host connection port. Defaults to 22.
        logger_alias (str, optional): sublogger name. Defaults to function or method name.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.ssh_get_file.__annotations__):
        client = SSHClient()
        client.set_missing_host_key_policy(AutoAddPolicy())
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'host: ' + hostname + ':' + str(port)
            + '\n' + 'user: ' + str(username)
            + '\n' + 'pass: ' + str(password)
        )
        try:
            client.connect(hostname=hostname, username=username, password=password, port=port)
            with client.open_sftp() as sftp:
                sftp.get(remotepath=src_file, localpath=dst_file)
            local_logger.info(msg=dst_file + ' saved')
            return {"success": True, "result": dst_file}
        except Exception as error:
            # Drop the local file. It may not exist at all (e.g. the connection
            # failed), which must not turn the error result into an exception.
            try:
                remove(dst_file)
            except OSError:
                pass
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
        finally:
            client.close()
@staticmethod
# pylint: disable=W0718
def ssh_put_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 22,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Handling SFTP upload file.

    Args:
        src_file (str): /local/path/to/file.
        dst_file (str): /remote/path/to/file.
        hostname (str): remote hostname or ip address.
        username (str): remote host username.
        password (str): remote host password.
        port (int, optional): remote host connection port. Defaults to 22.
        logger_alias (str, optional): sublogger name. Defaults to function or method name.

    Returns:
        dict: {'success':bool,'result':/remote/path/to/file or 'ERROR'}.
    """
    # pylint: disable=C0415
    from shlex import quote

    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.ssh_put_file.__annotations__):
        client = SSHClient()
        client.set_missing_host_key_policy(AutoAddPolicy())
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'host: ' + hostname + ':' + str(port)
            + '\n' + 'user: ' + username
            + '\n' + 'pass: ' + password
        )
        try:
            client.connect(hostname=hostname, username=username, password=password, port=port)
            remote_dir = path.dirname(dst_file)
            if remote_dir:
                # The path is quoted so spaces or shell metacharacters cannot
                # change the command that runs on the remote host.
                _, stdout, _ = client.exec_command('mkdir -p ' + quote(remote_dir))
                # Wait for mkdir to finish before the upload starts.
                stdout.channel.recv_exit_status()
            sftp = client.open_sftp()
            try:
                sftp.put(localpath=src_file, remotepath=dst_file)
                # stat() raises if the uploaded file is not there.
                sftp.stat(dst_file)
            finally:
                sftp.close()
            local_logger.info(msg='sftp://' + hostname + dst_file + ' saved')
            return {"success": True, "result": 'sftp://' + hostname + dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
        finally:
            client.close()
@staticmethod
# pylint: disable=W0718
def smb_file_search(
    root_path: str,
    search_name: (str, type(None)) = None,
    smb: (smbclient._pool.ClientConfig, type(None)) = None,
    hostname: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    port: int = 445,
    logger_alias: str = inspect.stack()[0].function
) -> list:
    """Search files over SMB.

    Args:
        root_path (str): where to search.
        search_name (str, None, optional): full or partial filename for the filter.
            Defaults to None.
        smb (smbclient.ClientConfig, None, optional): object generated by recursive search.
            Defaults to None.
        hostname (str, None, optional): smb hostname.
            Defaults to None.
        username (str, None, optional): smb username.
            Defaults to None.
        password (str, None, optional): smb password.
            Defaults to None.
        port (int, optional): remote host connection port.
            Defaults to 445.
        logger_alias (str, optional): sublogger name.
            Defaults to function or method name.

    Returns:
        list: sorted list of found files.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.smb_file_search.__annotations__):
        local_logger.debug(msg=''
            + '\n' + 'root_path: ' + root_path
            + '\n' + 'search_name: ' + str(search_name)
            + '\n' + 'smb: ' + str(smb)
            + '\n' + 'hostname: ' + str(hostname)
            + '\n' + 'username: ' + str(username)
            + '\n' + 'password: ' + str(password)
        )
        # parent is True only for the call that created the client config;
        # only that call resets the connection cache afterwards.
        parent = False
        if not smb:
            try:
                smb = smbclient.ClientConfig(username=username, password=password)
                parent = True
            except Exception as error:
                local_logger.debug(msg='error: ' + '\n' + str(error))
        result = []
        try:
            for file in smbclient.listdir(path='//' + hostname + root_path, port=port):
                try:
                    # Every entry is first tried as a directory. The port is
                    # passed on so nested listings use the same port as the root.
                    result = result + Connect.smb_file_search(
                        root_path=root_path + '/' + file,
                        search_name=search_name,
                        hostname=hostname,
                        smb=smb,
                        port=port
                    )
                except IOError:
                    # Listing failed, so the entry is handled as a plain file.
                    if not search_name or search_name in file:
                        result.append(root_path + '/' + file)
        finally:
            if parent:
                smbclient.reset_connection_cache()
        result.sort()
        return result
@staticmethod
# pylint: disable=W0718
def smb_get_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 445,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Download file from SMB.

    Args:
        src_file (str): /remote/path/to/file.
        dst_file (str): /local/path/to/file.
        hostname (str): smb hostname.
        username (str): smb username.
        password (str): smb password.
        port (int, optional): remote host connection port. Defaults to 445.
        logger_alias (str, optional): sublogger name. Defaults to function or method name.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.smb_get_file.__annotations__):
        smbclient.ClientConfig(username=username, password=password)
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'host: ' + hostname + ':' + str(port)
            + '\n' + 'user: ' + str(username)
            + '\n' + 'pass: ' + str(password)
        )
        src_file = '//' + hostname + src_file
        chunk_size = 1024 * 1024
        try:
            with smbclient.open_file(src_file, mode="rb", port=port) as remote_file:
                with open(dst_file, "wb+") as local_file:
                    # Copy in chunks so a large file is never held in memory whole.
                    for chunk in iter(lambda: remote_file.read(chunk_size), b''):
                        local_file.write(chunk)
            local_logger.info(msg=dst_file + ' saved')
            return {"success": True, "result": dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
        finally:
            # Drop cached SMB sessions on success and on failure alike.
            smbclient.reset_connection_cache()
@staticmethod
# pylint: disable=W0718
def smb_put_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 445,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Upload file to SMB.

    Missing remote parent directories are created recursively. The file is copied
    in chunks and the smbclient connection cache is reset whether the transfer
    succeeds or fails.

    Args:
        src_file (str): /local/path/to/file.
        dst_file (str): /remote/path/to/file.
        hostname (str): smb hostname.
        username (str): smb username.
        password (str): smb password.
        port (int, optional): remote host connection port. Defaults to 445.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        dict: {'success':bool,'result':'smb:' + //hostname/remote/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.smb_put_file.__annotations__):
        # Imported here so the block does not depend on a file-level import change.
        from shutil import copyfileobj  # pylint: disable=C0415
        smbclient.ClientConfig(username=username, password=password)
        # NOTE(review): credentials are written to the debug log in plain text.
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'host: ' + hostname + ':' + str(port)
            + '\n' + 'user: ' + str(username)
            + '\n' + 'pass: ' + str(password)
        )
        dst_file = '//' + hostname + dst_file
        dst_dir = path.dirname(dst_file)
        try:
            # Use the same port for the directory checks as for the transfer.
            if not smbclient.path.exists(dst_dir, port=port):
                smbclient.makedirs(dst_dir, exist_ok=True, port=port)
            # Open the local source first: a missing source must not
            # leave an empty file on the remote side.
            with open(src_file, "rb") as local_file:
                with smbclient.open_file(dst_file, mode="wb+", port=port) as remote_file:
                    copyfileobj(local_file, remote_file)
            local_logger.info(msg='smb:' + dst_file + ' saved')
            return {"success": True, "result": 'smb:' + dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
        finally:
            # Release cached connections on failure as well as on success.
            smbclient.reset_connection_cache()
@staticmethod
# pylint: disable=W0612
def local_file_search(
    root_path: str,
    search_name: (str, type(None)) = None,
    logger_alias: str = inspect.stack()[0].function
) -> list:
    """Collect files below a local directory.

    Args:
        root_path (str): where to search.
        search_name (str, None, optional): full or partial filename for the filter.
            Defaults to None (every file matches).
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        list: sorted list of found files; the directory part of every
            path is resolved with ``path.realpath``.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.local_file_search.__annotations__):
        local_logger.debug(
            msg='\nroot_path: ' + root_path + '\nsearch_name: ' + str(search_name)
        )
        # An empty or missing filter accepts every file name.
        found = [
            path.join(path.realpath(folder), name)
            for folder, _subdirs, names in walk(root_path, topdown=False)
            for name in names
            if not search_name or search_name in name
        ]
        found.sort()
        return found
@staticmethod
# pylint: disable=W0718
def local_get_file(
    src_file: str,
    dst_file: str,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Like download local file. Interface plug.

    Copies ``src_file`` to ``dst_file``, creating the destination directory first
    when ``dst_file`` has a directory component.

    Args:
        src_file (str): /local/path/to/src/file.
        dst_file (str): /local/path/to/dst/file.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.local_get_file.__annotations__):
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
        )
        try:
            dst_dir = path.dirname(dst_file)
            # A bare filename has an empty dirname and makedirs('') raises,
            # so only create the directory when there is one.
            if dst_dir:
                makedirs(dst_dir, exist_ok=True)
            copyfile(src=src_file, dst=dst_file)
            return {"success": True, "result": dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
@staticmethod
# pylint: disable=W0718
def local_put_file(
    src_file: str,
    dst_file: str,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Like upload local file. Interface plug.

    Copies ``src_file`` to ``dst_file``, creating the destination directory first
    when ``dst_file`` has a directory component.

    Args:
        src_file (str): /local/path/to/src/file.
        dst_file (str): /local/path/to/dst/file.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.local_put_file.__annotations__):
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
        )
        try:
            dst_dir = path.dirname(dst_file)
            # A bare filename has an empty dirname and makedirs('') raises,
            # so only create the directory when there is one.
            if dst_dir:
                makedirs(dst_dir, exist_ok=True)
            copyfile(src=src_file, dst=dst_file)
            return {"success": True, "result": dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
@staticmethod
def parse_connect_params(
    connect_string: str,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """One line connection string separator.

    A string without '://' is returned as a local path with hosttype 'local'.
    Otherwise the part between '://' and the next '/' is split into credentials,
    hostname and port. The credentials are everything before the LAST '@' and the
    password is everything after the FIRST ':' inside them, so passwords may
    contain ':' and '@' (but not '/').

    Args:
        connect_string (str): hosttype://username:password@hostname:hostport/some/path.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        dict: {
            'hostpath': remote_hostpath,
            'hostname': remote_hostname,
            'hostport': remote_hostport,
            'hosttype': remote_hosttype,
            'username': remote_username,
            'password': remote_password
        }
        Parts missing from the string are None. 'hostport' falls back to
        21/22/445 for 'ftp'/'sftp'/'smb'.

    Raises:
        ValueError: if the port part is not an integer.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.parse_connect_params.__annotations__):
        remote_hostpath = connect_string
        remote_hostname = None
        remote_hostport = None
        remote_hosttype = 'local'
        remote_username = None
        remote_password = None
        if '://' in connect_string:
            remote_hosttype, _, remainder = connect_string.partition('://')
            authority, slash, tail = remainder.partition('/')
            # Slice the path off instead of str.replace(), which would also
            # remove later repetitions of the prefix inside the path.
            remote_hostpath = slash + tail
            credentials, at_sign, remote_hostname = authority.rpartition('@')
            if at_sign:
                remote_username, colon, secret = credentials.partition(':')
                # "user@host" carries no password at all.
                if colon:
                    remote_password = secret
            remote_hostname, _, port_text = remote_hostname.partition(':')
            if port_text:
                remote_hostport = int(port_text)
        if not remote_hostport:
            remote_hostport = {'ftp': 21, 'sftp': 22, 'smb': 445}.get(remote_hosttype)
        # NOTE(review): credentials are written to the debug log in plain text.
        local_logger.debug(msg=''
            + '\n' + 'hostpath: ' + str(remote_hostpath)
            + '\n' + 'hostname: ' + str(remote_hostname)
            + '\n' + 'hostport: ' + str(remote_hostport)
            + '\n' + 'hosttype: ' + str(remote_hosttype)
            + '\n' + 'username: ' + str(remote_username)
            + '\n' + 'password: ' + str(remote_password)
        )
        return {
            'hostpath': remote_hostpath,
            'hostname': remote_hostname,
            'hostport': remote_hostport,
            'hosttype': remote_hosttype,
            'username': remote_username,
            'password': remote_password
        }
@classmethod
def file_search(
    cls,
    search_path: str,
    search_name: (str, type(None)) = None,
    hostname: (str, type(None)) = None,
    hostport: (int, type(None)) = None,
    hosttype: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    logger_alias: str = inspect.stack()[0].function
) -> list:
    """Interface for file search over FTP, SFTP, SMB or the local filesystem.

    Args:
        search_path (str): where to search.
            search_path = 'hosttype://username:password@hostname:hostport/some/path'
            has lower priority and parameters are overwritten by
            search_path='/some/path', hostname='hostname', username='username', etc.
        search_name (str, None, optional): full or partial filename for the filter.
            Defaults to None.
        hostname (str, None, optional): remote hostname.
            Defaults to None.
        hostport (int, None, optional): remote host connection port.
            Defaults to None (21/22/445 is chosen by the resulting hosttype).
        hosttype (str, None, optional): 'ftp', 'sftp', 'smb'.
            Defaults to None. Any other resulting value means a local search.
        username (str, None, optional): remote username.
            Defaults to None.
        password (str, None, optional): remote password.
            Defaults to None.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        list: list of found files.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), cls.file_search.__annotations__):
        # NOTE(review): credentials are written to the debug log in plain text.
        local_logger.debug(msg=''
            + '\n' + 'search_path: ' + search_path
            + '\n' + 'search_name: ' + str(search_name)
            + '\n' + 'hostname: ' + str(hostname)
            + '\n' + 'hostport: ' + str(hostport)
            + '\n' + 'hosttype: ' + str(hosttype)
            + '\n' + 'username: ' + str(username)
            + '\n' + 'password: ' + str(password)
        )
        connect = cls.parse_connect_params(connect_string=search_path)
        remote_hostpath = connect['hostpath']
        # Explicit arguments win over the values parsed from the string.
        remote_hostname = hostname or connect['hostname']
        remote_hostport = hostport or connect['hostport']
        remote_hosttype = hosttype or connect['hosttype']
        remote_username = username or connect['username']
        remote_password = password or connect['password']
        if not remote_hostport:
            # The parser derives the default port from the string alone, so an
            # explicit hosttype with a plain path would leave the port unset.
            remote_hostport = {'ftp': 21, 'sftp': 22, 'smb': 445}.get(remote_hosttype)
        if remote_hosttype == 'ftp':
            files_found = cls.ftp_file_search(
                root_path=remote_hostpath,
                search_name=search_name,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        elif remote_hosttype == 'sftp':
            files_found = cls.ssh_file_search(
                root_path=remote_hostpath,
                search_name=search_name,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        elif remote_hosttype == 'smb':
            files_found = cls.smb_file_search(
                root_path=remote_hostpath,
                search_name=search_name,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        else:
            files_found = cls.local_file_search(
                root_path=search_path,
                search_name=search_name,
                logger_alias=logger_alias
            )
        local_logger.debug(msg='files_found:' + '\n' + str(files_found))
        return files_found
@classmethod
def file_download(
    cls,
    src_file: str,
    dst_file: str,
    hostname: (str, type(None)) = None,
    hostport: (int, type(None)) = None,
    hosttype: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Interface for file download over FTP, SFTP, SMB or the local filesystem.

    Args:
        src_file (str):
            src_file = 'hosttype://username:password@hostname:hostport/remote/path/to/file.'
            has lower priority and parameters are overwritten by
            src_file='/remote/path/to/file', hostname='hostname', username='username', etc.
        dst_file (str): /local/path/to/file.
        hostname (str, None, optional): remote hostname.
            Defaults to None.
        hostport (int, None, optional): remote host connection port.
            Defaults to None (21/22/445 is chosen by the resulting hosttype).
        hosttype (str, None, optional): 'ftp', 'sftp', 'smb'.
            Defaults to None. Any other resulting value means a local copy.
        username (str, None, optional): remote username.
            Defaults to None.
        password (str, None, optional): remote password.
            Defaults to None.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), cls.file_download.__annotations__):
        # NOTE(review): credentials are written to the debug log in plain text.
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'hostname: ' + str(hostname)
            + '\n' + 'hostport: ' + str(hostport)
            + '\n' + 'hosttype: ' + str(hosttype)
            + '\n' + 'username: ' + str(username)
            + '\n' + 'password: ' + str(password)
        )
        connect = cls.parse_connect_params(connect_string=src_file)
        remote_hostpath = connect['hostpath']
        # Explicit arguments win over the values parsed from the string.
        remote_hostname = hostname or connect['hostname']
        remote_hostport = hostport or connect['hostport']
        remote_hosttype = hosttype or connect['hosttype']
        remote_username = username or connect['username']
        remote_password = password or connect['password']
        if not remote_hostport:
            # The parser derives the default port from the string alone, so an
            # explicit hosttype with a plain path would leave the port unset.
            remote_hostport = {'ftp': 21, 'sftp': 22, 'smb': 445}.get(remote_hosttype)
        if remote_hosttype == 'ftp':
            result = cls.ftp_get_file(
                src_file=remote_hostpath,
                dst_file=dst_file,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        elif remote_hosttype == 'sftp':
            result = cls.ssh_get_file(
                src_file=remote_hostpath,
                dst_file=dst_file,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        elif remote_hosttype == 'smb':
            result = cls.smb_get_file(
                src_file=remote_hostpath,
                dst_file=dst_file,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        else:
            result = cls.local_get_file(
                src_file=remote_hostpath,
                dst_file=dst_file,
                logger_alias=logger_alias
            )
        local_logger.debug(msg='result: ' + '\n' + str(result))
        return result
@classmethod
def file_upload(
    cls,
    src_file: str,
    dst_file: str,
    hostname: (str, type(None)) = None,
    hostport: (int, type(None)) = None,
    hosttype: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Interface for file upload over FTP, SFTP, SMB or the local filesystem.

    Args:
        src_file (str): /local/path/to/file.
        dst_file (str):
            dst_file = 'hosttype://username:password@hostname:hostport/remote/path/to/file.'
            has lower priority and parameters are overwritten by
            dst_file='/remote/path/to/file', hostname='hostname', username='username', etc.
        hostname (str, None, optional): remote hostname.
            Defaults to None.
        hostport (int, None, optional): remote host connection port.
            Defaults to None (21/22/445 is chosen by the resulting hosttype).
        hosttype (str, None, optional): 'ftp', 'sftp', 'smb'.
            Defaults to None. Any other resulting value means a local copy.
        username (str, None, optional): remote username.
            Defaults to None.
        password (str, None, optional): remote password.
            Defaults to None.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        dict: {'success':bool,'result':/remote/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), cls.file_upload.__annotations__):
        # NOTE(review): credentials are written to the debug log in plain text.
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'hostname: ' + str(hostname)
            + '\n' + 'hostport: ' + str(hostport)
            + '\n' + 'hosttype: ' + str(hosttype)
            + '\n' + 'username: ' + str(username)
            + '\n' + 'password: ' + str(password)
        )
        connect = cls.parse_connect_params(connect_string=dst_file)
        remote_hostpath = connect['hostpath']
        # Explicit arguments win over the values parsed from the string.
        remote_hostname = hostname or connect['hostname']
        remote_hostport = hostport or connect['hostport']
        remote_hosttype = hosttype or connect['hosttype']
        remote_username = username or connect['username']
        remote_password = password or connect['password']
        if not remote_hostport:
            # The parser derives the default port from the string alone, so an
            # explicit hosttype with a plain path would leave the port unset.
            remote_hostport = {'ftp': 21, 'sftp': 22, 'smb': 445}.get(remote_hosttype)
        if remote_hosttype == 'ftp':
            result = cls.ftp_put_file(
                src_file=src_file,
                dst_file=remote_hostpath,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        elif remote_hosttype == 'sftp':
            result = cls.ssh_put_file(
                src_file=src_file,
                dst_file=remote_hostpath,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        elif remote_hosttype == 'smb':
            result = cls.smb_put_file(
                src_file=src_file,
                dst_file=remote_hostpath,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        else:
            result = cls.local_put_file(
                src_file=src_file,
                dst_file=remote_hostpath,
                logger_alias=logger_alias
            )
        local_logger.debug(msg='result: ' + '\n' + str(result))
        return result
class HikISAPI(Connect):
"""Representing Hikvision device with ISAPI.
The class inherits the necessary connection methods of the Connect class
"""
def __init__(
self,
hostname: str,
username: str, userpass: str,
authtype: str = 'digest',
hostport: int = 80, protocol: str = 'http',
channel: int = 101, videoid: int = 1
) -> None:
"""Object constructor.
Args:
hostname (str): camera hostname or ip address.
username (str): camera admin username.
userpass (str): camera admin password.
authtype (str, optional): digest|basic camera authentication type. Defaults to 'digest'.
hostport (int, optional): camera connection port. Defaults to 80.
protocol (str, optional): camera connection protocol. Defaults to 'http'.
channel (int, optional): camera channel id. Defaults to 101.
videoid (int, optional): camera video id. Defaults to 1.
"""
self._host = hostname
self._port = hostport
self._user = username
self._pswd = userpass
self._auth = authtype
self._prot = protocol
self._chan = channel
self._viid = videoid
def __call(
    self,
    url: str, method: str = 'GET',
    contenttype: str = 'application/x-www-form-urlencoded',
    contentdata: str = ''
) -> str:
    """Send a request to the camera with the stored credentials.

    Args:
        url (str): API path for request.
        method (str, optional): HTTP request method. Defaults to 'GET'.
        contenttype (str, optional): Content-Type header.
            Defaults to 'application/x-www-form-urlencoded'.
        contentdata (str, optional): data for send with request. Defaults to ''.

    Returns:
        str: HTTP response content or 'ERROR'.
    """
    request = {
        'url': url,
        'method': method,
        'username': self._user,
        'password': self._pswd,
        'authtype': self._auth,
        'contenttype': contenttype,
        'contentdata': contentdata,
    }
    answer = self.http(**request)
    # The transport reports failure through the 'success' flag.
    return answer['result'] if answer['success'] else 'ERROR'
def capabilities(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Request the PTZ capabilities of the camera.

    Args:
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed. Printing a response with a logger at the INFO level.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = '{}://{}:{}/ISAPI/PTZCtrl/channels/{}/capabilities'.format(
        self._prot, self._host, self._port, self._viid
    )
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    local_logger.info(msg='\n' + answer + '\n')
    return True
def downloadjpeg(
    self,
    dst_file: str = path.splitext(__file__)[0] + '.jpeg',
    x: int = 1920,
    y: int = 1080,
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """Get static picture from camera.

    The destination file is opened only after the camera has answered, so a
    failed request neither creates an empty file nor truncates an existing one.

    Args:
        dst_file (str, optional): abs picture's path to save. Defaults to scriptname+'.jpeg'.
        x (int, optional): picture width. Defaults to 1920.
        y (int, optional): picture height. Defaults to 1080.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    url = (
        self._prot + '://' + self._host + ':' + str(self._port)
        + "/Streaming/channels/" + str(self._viid)
        + "/picture?snapShotImageType=JPEG&videoResolutionWidth="
        + str(x) + "&videoResolutionHeight=" + str(y)
    )
    response = self.__call(url=url, method='GET')
    if response == 'ERROR':
        return False
    # The file is opened in binary mode, so the response must be bytes-like.
    with open(dst_file, "wb") as file:
        file.write(response)
    local_logger.info(msg=dst_file + ' saved')
    return True
def getcamerapos(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Request the current PTZ status (position) of the camera.

    Args:
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed. Printing a response with a logger at the INFO level.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = '{}://{}:{}/ISAPI/PTZCtrl/channels/{}/status'.format(
        self._prot, self._host, self._port, self._chan
    )
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    local_logger.info(msg='\n' + answer + '\n')
    return True
def rebootcamera(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Send the reboot command to the camera.

    Args:
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = '{}://{}:{}/ISAPI/System/reboot'.format(
        self._prot, self._host, self._port
    )
    answer = self.__call(url=endpoint, method="PUT")
    if answer == 'ERROR':
        return False
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def setptzmovyyu(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start tilting the camera up.

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = (
        '{}://{}:{}/PTZ/channels/{}/PTZControl?command=TILT_UP&speed={}&mode=start'
    ).format(self._prot, self._host, self._port, self._viid, speed)
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def setptzmovyyd(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start tilting the camera down.

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = (
        '{}://{}:{}/PTZ/channels/{}/PTZControl?command=TILT_DOWN&speed={}&mode=start'
    ).format(self._prot, self._host, self._port, self._viid, speed)
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def setptzmovxxl(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start panning the camera to the left.

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = (
        '{}://{}:{}/PTZ/channels/{}/PTZControl?command=PAN_LEFT&speed={}&mode=start'
    ).format(self._prot, self._host, self._port, self._viid, speed)
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def setptzmovxxr(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start panning the camera to the right.

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = (
        '{}://{}:{}/PTZ/channels/{}/PTZControl?command=PAN_RIGHT&speed={}&mode=start'
    ).format(self._prot, self._host, self._port, self._viid, speed)
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def setptzmovzzi(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start camera zoom in.

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    # NOTE(review): this method used to send ZOOM_OUT, contradicting its name and
    # docstring; the command now matches them. Confirm on a real device.
    url = (
        self._prot + '://' + self._host + ':' + str(self._port)
        + "/PTZ/channels/" + str(self._viid)
        + "/PTZControl?command=ZOOM_IN&speed=" + str(speed)
        + "&mode=start"
    )
    response = self.__call(url=url, method='GET')
    if response == 'ERROR':
        return False
    local_logger.debug(msg='\n' + response + '\n')
    return True

def setptzmovzzo(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start camera zoom out.

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    # NOTE(review): this method used to send ZOOM_IN, contradicting its name and
    # docstring; the command now matches them. Confirm on a real device.
    url = (
        self._prot + '://' + self._host + ':' + str(self._port)
        + "/PTZ/channels/" + str(self._viid)
        + "/PTZControl?command=ZOOM_OUT&speed=" + str(speed)
        + "&mode=start"
    )
    response = self.__call(url=url, method='GET')
    if response == 'ERROR':
        return False
    local_logger.debug(msg='\n' + response + '\n')
    return True
def setptzpreset(
    self,
    preset: int,
    speed: int = 1,
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """Start moving the camera to a saved preset.

    Args:
        preset (int): saved preset number.
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = (
        '{}://{}:{}/PTZ/channels/{}'
        '/PTZControl?command=GOTO_PRESET&presetNo={}&speed={}&mode=start'
    ).format(self._prot, self._host, self._port, self._viid, preset, speed)
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def setptztostop(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Send the PTZ stop command to the camera.

    Args:
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = (
        '{}://{}:{}/PTZ/channels/{}/PTZControl?command=GOTO_PRESET&mode=stop'
    ).format(self._prot, self._host, self._port, self._viid)
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def setcamerapos(
    self,
    x: int = 0,
    y: int = 0,
    z: int = 0,
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """Move the camera to an absolute position.

    Args:
        x (int, optional): horisontal camera position from 0 to 3600. Defaults to 0.
        y (int, optional): vertical camera position from -900 to 2700. Defaults to 0.
        z (int, optional): zoom camera position from 0 to 1000. Defaults to 0.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = '{}://{}:{}/ISAPI/PTZCtrl/channels/{}/absolute'.format(
        self._prot, self._host, self._port, self._chan
    )
    # y is sent as the elevation, x as the azimuth.
    payload = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        '<PTZData><AbsoluteHigh>'
        '<elevation>{}</elevation>'
        '<azimuth>{}</azimuth>'
        '<absoluteZoom>{}</absoluteZoom>'
        '</AbsoluteHigh></PTZData>'
    ).format(y, x, z)
    answer = self.__call(
        url=endpoint, method="PUT", contenttype="text/xml", contentdata=payload
    )
    if answer == 'ERROR':
        return False
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def __setcameramovcon(
    self,
    x: int = 0,
    y: int = 0,
    z: int = 0,
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """Set camera moving to direction until other signal or 180 seconds elapse.

    Args:
        x (int, optional): acceleration of horizontal camera movement from -100 to 100.
            Defaults to 0.
        y (int, optional): acceleration of vertical camera movement from -100 to 100.
            Defaults to 0.
        z (int, optional): acceleration of zoom camera movement from -100 to 100.
            Defaults to 0.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = '{}://{}:{}/ISAPI/PTZCtrl/channels/{}/continuous'.format(
        self._prot, self._host, self._port, self._chan
    )
    payload = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        '<PTZData>'
        '<pan>{}</pan>'
        '<tilt>{}</tilt>'
        '<zoom>{}</zoom>'
        '</PTZData>'
    ).format(x, y, z)
    answer = self.__call(
        url=endpoint, method="PUT", contenttype="text/xml", contentdata=payload
    )
    if answer == 'ERROR':
        return False
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def __setcameramovmom(
    self,
    x: int = 0,
    y: int = 0,
    z: int = 0,
    t: int = 180000,
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """Set camera moving to direction until other signal or duration elapse.

    After a successful request the call blocks for ``t`` milliseconds.

    Args:
        x (int, optional): acceleration of horizontal camera movement from -100 to 100.
            Defaults to 0.
        y (int, optional): acceleration of vertical camera movement from -100 to 100.
            Defaults to 0.
        z (int, optional): acceleration of zoom camera movement from -100 to 100.
            Defaults to 0.
        t (int, optional): duration in ms of acceleration from 0 to 180000.
            Defaults to 180000.
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = '{}://{}:{}/ISAPI/PTZCtrl/channels/{}/momentary'.format(
        self._prot, self._host, self._port, self._chan
    )
    payload = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        '<PTZData>'
        '<pan>{}</pan>'
        '<tilt>{}</tilt>'
        '<zoom>{}</zoom>'
        '<Momentary><duration>{}</duration></Momentary>'
        '</PTZData>'
    ).format(x, y, z, t)
    answer = self.__call(
        url=endpoint, method="PUT", contenttype="text/xml", contentdata=payload
    )
    if answer == 'ERROR':
        return False
    # Wait for the requested duration before reporting success.
    sleep(t/1000)
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def setcameramov(self, x: int = 0, y: int = 0, z: int = 0, t: int = 0) -> bool:
    """Set camera moving to direction (polymorph abstraction).

    A duration of '-' or 0 selects the continuous movement request, any other
    duration selects the momentary one. All values are converted with ``int()``.

    Args:
        x (int, optional): acceleration of horizontal camera movement from -100 to 100.
            Defaults to 0.
        y (int, optional): acceleration of vertical camera movement from -100 to 100.
            Defaults to 0.
        z (int, optional): acceleration of zoom camera movement from -100 to 100.
            Defaults to 0.
        t (int, optional): duration in ms of acceleration from 0 to 180000.
            Defaults to 0.

    Returns:
        bool: True if successed.
    """
    without_duration = t == '-' or int(t) == 0
    if without_duration:
        return self.__setcameramovcon(x=int(x), y=int(y), z=int(z))
    return self.__setcameramovmom(x=int(x), y=int(y), z=int(z), t=int(t))
def setmovtohome(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Move the camera to its homeposition.

    Args:
        logger_alias (str, optional): sublogger name.
            The default is evaluated once, when the method is defined.

    Returns:
        bool: True if successed.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = '{}://{}:{}/ISAPI/PTZCtrl/channels/{}/homeposition/goto'.format(
        self._prot, self._host, self._port, self._chan
    )
    payload = '<?xml version="1.0" encoding="UTF-8"?><PTZData></PTZData>'
    answer = self.__call(
        url=endpoint, method="PUT", contenttype="text/xml", contentdata=payload
    )
    if answer == 'ERROR':
        return False
    local_logger.debug(msg='\n' + answer + '\n')
    return True
def setposashome(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Store the current camera position as the home position.

    Args:
        logger_alias (str, optional): sublogger name. The default value is computed
            once, when this signature is evaluated, not on every call.

    Returns:
        bool: True unless the device call returned 'ERROR'.
    """
    log = logging.getLogger(logger_alias)
    base = self._prot + '://' + self._host + ':' + str(self._port)
    endpoint = base + "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/homeposition"
    body = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        '<PTZData></PTZData>'
    )
    answer = self.__call(url=endpoint, method="PUT", contenttype="text/xml", contentdata=body)
    succeeded = answer != 'ERROR'
    if succeeded:
        log.debug(msg='\n' + answer + '\n')
    return succeeded
def settextonosd(
    self,
    enabled: str = "true",
    x: int = 0,
    y: int = 0,
    message: str = "",
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """Set message as video overlay text.

    The message is XML-escaped before it is embedded in the request body, so
    text containing '&', '<' or '>' no longer produces a malformed document.

    Args:
        enabled (str, optional): true or false. Defaults to "true".
        x (int, optional): horizontal text position from 0 to video width. Defaults to 0.
        y (int, optional): vertical text position from 0 to video height. Defaults to 0.
        message (str, optional): overlay text content; the placeholder '-' is sent
            as an empty text. Defaults to "".
        logger_alias (str, optional): sublogger name. The default value is computed
            once, when this signature is evaluated, not on every call.

    Returns:
        bool: True unless the device call returned 'ERROR'.
    """
    # Function-scope import keeps the fix self-contained in this method.
    from xml.sax.saxutils import escape

    local_logger = logging.getLogger(logger_alias)
    if message == '-':
        message = ""
    url = (
        self._prot + '://' + self._host + ':' + str(self._port)
        + "/ISAPI/System/Video/inputs/channels/" + str(self._chan)
        + "/overlays/text"
    )
    payload = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        + '<TextOverlayList version="1.0" xmlns="http://www.hikvision.com/ver10/XMLSchema">'
        + '<TextOverlay>'
        + '<id>1</id>'
        + '<enabled>' + enabled + '</enabled>'
        + '<posX>' + str(x) + '</posX>'
        + '<posY>' + str(y) + '</posY>'
        # escape() replaces '&', '<' and '>' with their XML entities.
        + '<displayText>' + escape(message) + '</displayText>'
        + '</TextOverlay>'
        + '</TextOverlayList>'
    )
    response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=payload)
    if response == 'ERROR':
        return False
    local_logger.debug(msg='\n' + response + '\n')
    return True
class Sensor(Connect):
    """Representing sensor connected to remote host.

    The class inherits the necessary connection methods of the Connect class.
    """

    def __init__(
        self,
        hostname: str, username: str, userpass: str,
        nodetype: str, nodename: str,
        hostport: int = 22
    ) -> None:
        """Object constructor.

        Args:
            hostname (str): sensor's remote host hostname or ip address.
            username (str): sensor's remote host username.
            userpass (str): sensor's remote host password.
            nodetype (str): 'ds18b20' or other sensor type.
            nodename (str): 28-1a2b3c4d5e6f (ds18b20 example).
            hostport (int, optional): sensor's remote host connection port. Defaults to 22.
        """
        self._host = hostname
        self._port = hostport
        self._user = username
        self._pswd = userpass
        self._type = nodetype
        self._node = nodename

    def __call(self, command: str) -> str:
        """Send request to sensor's remote host.

        Args:
            command (str): command to poll the sensor.

        Returns:
            str: whatever the inherited ssh_commands() returns for the command.
        """
        return self.ssh_commands(
            command=command,
            hostname=self._host, port=self._port,
            username=self._user, password=self._pswd
        )

    def __temperature(self, nodename: str, logger_alias: str = inspect.stack()[0].function) -> str:
        """Read a ds18b20 sensor and format the value.

        Args:
            nodename (str): 28-1a2b3c4d5e6f (ds18b20 example).
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Returns:
            str: the reading floor-divided by 1000 followed by "'C",
                or 'ERROR' when the poll failed or the response is not an integer.
        """
        local_logger = logging.getLogger(logger_alias)
        command = 'cat /sys/bus/w1/devices/' + nodename + '/temperature'
        response = self.__call(command=command)
        # The password is masked: credentials must not end up in log files.
        local_logger.debug(msg=''
            + '\n' + 'host: ' + self._host + ':' + str(self._port)
            + '\n' + 'user: ' + self._user
            + '\n' + 'pass: ' + '********'
            + '\n' + 'command: ' + command
        )
        if response == 'ERROR':
            return 'ERROR'
        try:
            return str(int(response)//1000) + "'C"
        except (TypeError, ValueError) as error:
            # int() raises only these two for a non-numeric or non-string response.
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return 'ERROR'

    def value(self) -> str:
        """Public method to get sensor value.

        Returns:
            str: sensor value, or 'ERROR' when the sensor type is not supported
                (previously an unsupported type silently produced None).
        """
        if self._type == 'ds18b20':
            return self.__temperature(nodename=self._node)
        logging.getLogger(type(self).__name__).warning(
            "unsupported sensor type: %s", self._type
        )
        return 'ERROR'
class Wordpress(Connect):
    """Set of methods (functions) for Wordpress API.

    Reference: https://developer.wordpress.org/rest-api/reference/

    Args:
        Connect (_type_): class with 'http' method.
    """

    def __init__(
        self,
        hostname: str,
        username: str,
        password: str
    ):
        """Object constructor.

        Attributes are only set when Do.args_valid() accepts the arguments.

        Args:
            hostname (str): www.wordpress.site.
            username (str): wordpress username.
            password (str): wordpress password.
        """
        if Do.args_valid(locals(), self.__init__.__annotations__):
            self._host = hostname
            self._user = username
            self._pass = password
            self.api_event = 'https://' + self._host + '/wp-json/tribe/events/v1/events'
            self.api_media = 'https://' + self._host + '/wp-json/wp/v2/media'
            self.api_pages = 'https://' + self._host + '/wp-json/wp/v2/pages'
            self.url_files = 'https://' + self._host + '/wp-content/uploads'

    @staticmethod
    def _json_for_log(payload) -> str:
        """Render an API payload for debug logs without raising on non-JSON content.

        Args:
            payload: raw 'result' value of an http() response.

        Returns:
            str: pretty-printed JSON, or str(payload) when it cannot be parsed.
        """
        try:
            return json.dumps(json.loads(payload), indent=2)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError; failed requests land here.
            return str(payload)

    def event_create(
        self,
        title: str,
        slug: str,
        date_start: str,
        date_end: str,
        date_publish: (str, type(None)) = None,
        all_day: bool = True,
        description: str = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Create event by 'wp-json' and 'The Events Calendar'.

        Args:
            title (str): event title.
            slug (str): event slug.
            date_start (str): '2022-12-31T23:59:59' format.
            date_end (str): '2022-12-31T23:59:59' format.
            date_publish (str, None, optional): '2022-12-31T23:59:59' format.
                None means the moment of the call (the previous default was frozen
                at import time). Defaults to None.
            all_day (bool, optional): all day event duration flag. Defaults to True.
            description (str, optional): event body. Defaults to None.
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Raises:
            ValueError: date formate is not 2022-12-31T23:59:59.
            ValueError: description can't be empty.

        Returns:
            dict: {'success':bool,'result':'http/url/to/event'}; None when
                Do.args_valid() rejects the arguments.
        """
        local_logger = logging.getLogger(logger_alias)
        if not Do.args_valid(locals(), self.event_create.__annotations__):
            return None
        if date_publish is None:
            date_publish = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        pattern = "^([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})+$"
        stamps = (date_start, date_end, date_publish)
        if not all(re.fullmatch(pattern, stamp) for stamp in stamps):
            raise ValueError("date formate is not 2022-12-31T23:59:59")
        if description == '':
            raise ValueError("description can't be empty")
        event_json = {
            "title": title,
            "slug": slug,
            "date": date_publish,
            "start_date": date_start,
            "end_date": date_end,
            "all_day": str(all_day),
            "description": description
        }
        response = self.http(
            url=self.api_event,
            method='POST',
            username=self._user,
            password=self._pass,
            authtype='basic',
            contenttype='application/json; charset=UTF-8',
            contentdata=json.dumps(event_json, indent=2)
        )
        local_logger.debug(msg=""
            + "\n" + "event API response: "
            + "\n" + self._json_for_log(response['result'])
        )
        if response['success']:
            event_url = json.loads(response['result']).get("url")
            if event_url is not None:
                local_logger.info('event created: %s', event_url)
                return {"success": True, "result": event_url}
        local_logger.warning("event didn't create")
        return {"success": False, "result": "ERROR"}

    def media_search(
        self,
        media_name: (str, type(None)) = None,
        media_type: (str, type(None)) = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Search uploaded media by 'wp-json'.

        Args:
            media_name (str, None, optional): results matching a string. Defaults to None.
            media_type (str, None, optional): application,image,video,audio,text;
                any other value is ignored. Defaults to None.
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Returns:
            dict: {'success':bool,'result':['list/of/link/to/media']}; None when
                Do.args_valid() rejects the arguments.
        """
        local_logger = logging.getLogger(logger_alias)
        if not Do.args_valid(locals(), self.media_search.__annotations__):
            return None
        # Function-scope import keeps the fix self-contained in this method.
        from urllib.parse import quote

        url = self.api_media + '?per_page=100'
        if media_name:
            # Percent-encode so spaces, '&' or '#' in the name cannot break the query.
            url = url + '&search=' + quote(media_name, safe='')
        if media_type in ('application', 'image', 'video', 'audio', 'text'):
            url = url + '&media_type=' + media_type
        response = self.http(url=url, method='GET')
        local_logger.debug(msg=""
            + "\n" + "media API response: "
            + "\n" + self._json_for_log(response['result'])
        )
        if response['success']:
            media_list = [
                media['guid']['rendered'] for media in json.loads(response['result'])
            ]
            return {"success": True, "result": media_list}
        local_logger.warning("media didn't list")
        return {"success": False, "result": "ERROR"}

    def media_upload(
        self,
        mediafile: str,
        mediatype: str,
        aliasfile: str = '',
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Upload media by 'wp-json'.

        Args:
            mediafile (str): path to file.
            mediatype (str): 'image/jpeg', 'video/mp4', etc.
            aliasfile (str, optional): uploaded media name. Defaults to original file.
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Raises:
            FileExistsError: mediafile is not exist.

        Returns:
            dict: {'success':bool,'result':'http/url/to/media'}; None when
                Do.args_valid() rejects the arguments.
        """
        local_logger = logging.getLogger(logger_alias)
        if not Do.args_valid(locals(), self.media_upload.__annotations__):
            return None
        if not path.exists(mediafile):
            raise FileExistsError(mediafile + " is not exist")
        with open(mediafile, mode='rb') as file:
            mediadata = file.read()
        if aliasfile == '':
            aliasfile = path.basename(mediafile)
        response = self.http(
            url=self.api_media,
            method='POST',
            username=self._user,
            password=self._pass,
            authtype='basic',
            contenttype=mediatype,
            contentdata=mediadata,
            headers={
                "Accept": "application/json",
                'Content-Disposition': 'attachment; filename=' + aliasfile,
                'Cache-Control': 'no-cache'
            }
        )
        local_logger.debug(msg=""
            + "\n" + "media API response: "
            + "\n" + self._json_for_log(response['result'])
        )
        if response['success']:
            source_url = json.loads(response['result']).get("source_url")
            if source_url is not None:
                local_logger.info('media uploaded: %s', source_url)
                return {"success": True, "result": source_url}
        local_logger.warning("media didn't upload")
        return {"success": False, "result": "ERROR"}

    def pages_read(
        self,
        page_id: int,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Read page by 'wp-json'.

        Args:
            page_id (int): unique identifier for the page.
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Returns:
            dict: {'success':bool,'result':'page data'}; None when
                Do.args_valid() rejects the arguments.
        """
        local_logger = logging.getLogger(logger_alias)
        if not Do.args_valid(locals(), self.pages_read.__annotations__):
            return None
        page_link = self.api_pages + '/' + str(page_id)
        response = self.http(url=page_link)
        if response['success']:
            local_logger.debug(msg=""
                + "\n" + "wp page API response: "
                + "\n" + self._json_for_log(response['result'])
            )
            return {"success": True, "result": response['result']}
        local_logger.warning("wp page didn't read")
        return {"success": False, "result": "ERROR"}

    def pages_update(
        self,
        page_id: int,
        content: str,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Update page by 'wp-json'.

        Args:
            page_id (int): unique identifier for the page.
            content (str): the content for the page.
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Returns:
            dict: {'success':bool,'result':'http/url/to/page'}; None when
                Do.args_valid() rejects the arguments.
        """
        local_logger = logging.getLogger(logger_alias)
        if not Do.args_valid(locals(), self.pages_update.__annotations__):
            return None
        page_link = self.api_pages + '/' + str(page_id)
        page_json = {
            "content": content
        }
        response = self.http(
            url=page_link,
            method='POST',
            username=self._user,
            password=self._pass,
            authtype='basic',
            contenttype='application/json; charset=UTF-8',
            contentdata=json.dumps(page_json)
        )
        local_logger.debug(msg=""
            + "\n" + "wp page API response: "
            + "\n" + self._json_for_log(response['result'])
        )
        if response['success']:
            link = json.loads(response['result']).get("link")
            if link is not None:
                local_logger.info(msg="wp page " + str(page_id) + " updated: " + link)
                return {"success": True, "result": link}
        local_logger.warning("wp page didn't update")
        return {"success": False, "result": "ERROR"}
class Telegram:
    """Set of methods (functions) for Telegram Bot API.

    Reference: https://core.telegram.org/bots/api#available-methods
    """

    def __init__(self, token: str):
        """Object constructor.

        Attributes are only set when Do.args_valid() accepts the arguments.

        Args:
            token (str): Telegram Bot API access token.
        """
        if Do.args_valid(locals(), self.__init__.__annotations__):
            self._token = token
            self.api_root = 'https://api.telegram.org'
            self.api_path = self.api_root + '/bot' + self._token

    @staticmethod
    def _split_chat(chat: str) -> tuple:
        """Split a 'chatid_threadid' value into its two parts.

        Args:
            chat (str): chat identifier, optionally followed by '_' and a thread id.

        Returns:
            tuple: (chat_id, thread_id); thread_id is None when chat has no '_'.
        """
        if '_' in chat:
            parts = chat.split('_')
            return parts[0], parts[1]
        return chat, None

    def send_message(
        self,
        chat: str,
        text: str,
        parse_mode: str = 'HTML',
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Send text message.

        Args:
            chat (str): unique identifier for the target chat or username of the target channel.
            text (str): text of the message to be sent, 1-4096 characters after entities parsing.
            parse_mode (str, optional): 'HTML', 'Markdown', 'MarkdownV2'. Defaults to 'HTML'.
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Returns:
            dict: {"success":bool,"result":response}; the result is the decoded JSON
                on HTTP 200, otherwise the raw requests response object.
                None when Do.args_valid() rejects the arguments.
        """
        local_logger = logging.getLogger(logger_alias)
        if not Do.args_valid(locals(), self.send_message.__annotations__):
            return None
        url = self.api_path + '/sendMessage'
        chat_id, thread_id = self._split_chat(chat)
        data = {
            "chat_id": chat_id,
            "text": text,
            "parse_mode": parse_mode,
            "disable_notification": True
        }
        if thread_id is not None:
            data["message_thread_id"] = thread_id
        response = requests.post(url=url, json=data, timeout=15)
        if response.status_code == 200:
            response = response.json()
            local_logger.info(msg=""
                + "message '"
                + str(response['result']['message_id'])
                + "' sent to telegram chat "
                + str(chat)
            )
            return {'success': True, 'result': response}
        local_logger.warning(msg="message didn't send to telegram chat " + str(chat))
        return {'success': False, 'result': response}

    def delete_message(
        self,
        chat: str,
        message_id: int,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Delete message.

        Args:
            chat (str): unique identifier for the target chat or username of the target channel.
            message_id (int): identifier of the message to delete.
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Returns:
            dict: {"success":bool,"result":response}; the result is the decoded JSON
                on HTTP 200, otherwise the raw requests response object.
                None when Do.args_valid() rejects the arguments.
        """
        local_logger = logging.getLogger(logger_alias)
        if not Do.args_valid(locals(), self.delete_message.__annotations__):
            return None
        url = self.api_path + '/deleteMessage'
        data = {"chat_id": chat, "message_id": message_id}
        response = requests.post(url=url, json=data, timeout=15)
        if response.status_code == 200:
            response = response.json()
            local_logger.info(msg=""
                + "message '" + str(message_id) + "' deleted from telegram chat "
                + str(chat)
            )
            return {'success': True, 'result': response}
        local_logger.warning(msg=""
            + "message '" + str(message_id) + "' didn't deleted from telegram chat "
            + str(chat)
        )
        return {'success': False, 'result': response}

    def __send_media(
        self,
        chat: str,
        media_path: str,
        media_type: str,
        caption: (str, type(None)),
        parse_mode: str,
        disable_notification: bool,
        additional_url_param: (str, type(None)),
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Send media by api.telegram.org.

        Query values are handed to requests as 'params', so captions and remote
        URLs are percent-encoded instead of being concatenated raw into the URL.

        Args:
            chat (str): unique identifier for the target chat or username of the target channel.
            media_path (str): /local/path/to/file, https://url/to/file, file_id=EXISTFILEID.
            media_type (str): 'document', 'photo', 'video', 'audio'.
            caption (str, None): media caption less 1024 characters.
            parse_mode (str): caption 'HTML', 'Markdown', 'MarkdownV2' parse mode.
            disable_notification (bool): send silently.
            additional_url_param (str, None): example: '&duration=30&width=960&height=540'.
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Raises:
            ValueError: "'media_type' value is wrong"

        Returns:
            dict: {'success':bool,'result':response}; None when
                Do.args_valid() rejects the arguments.
        """
        local_logger = logging.getLogger(logger_alias)
        if not Do.args_valid(locals(), self.__send_media.__annotations__):
            return None
        if media_type not in ('document', 'photo', 'video', 'audio'):
            raise ValueError("'media_type' value is wrong: " + media_type)

        # thread_id is always bound now; it used to be undefined for chats without '_'.
        chat, thread_id = self._split_chat(chat)
        url = self.api_path + '/send' + media_type
        if additional_url_param:
            # Caller-supplied raw query fragment; requests appends 'params' after it.
            url = url + '?' + additional_url_param.lstrip('?&')
        params = {'chat_id': chat}
        if thread_id:
            params['message_thread_id'] = thread_id
        if caption:
            params['caption'] = caption
            params['parse_mode'] = parse_mode
        if disable_notification:
            params['disable_notification'] = 'True'

        if re.match("^(?:http://|https://|file_id=)", media_path):
            # Remote URL or existing file id: passed by reference, nothing uploaded.
            media_path = media_path.replace('file_id=', '')
            params[media_type] = media_path
            response = requests.post(url=url, params=params, timeout=60)
        else:
            # Local file: the context manager closes the handle after the upload.
            with open(media_path, "rb") as media_file:
                response = requests.post(
                    url=url,
                    params=params,
                    files={media_type: media_file},
                    timeout=60
                )

        if response.status_code == 200:
            response = response.json()
            sent = response['result'][media_type]
            # The 'photo' entry is indexed as a list and its last element is used.
            file_id = sent[-1]['file_id'] if media_type == 'photo' else sent['file_id']
            local_logger.info(msg=""
                + media_type
                + " '"
                + str(file_id)
                + "' sent to telegram chat "
                + chat
            )
            return {'success': True, 'result': response}
        local_logger.warning(
            msg=media_type + " " + media_path + " didn't send to telegram chat " + str(chat)
        )
        return {'success': False, 'result': response}

    def send_document(
        self,
        chat: str,
        document: str,
        caption: (str, type(None)) = None,
        parse_mode: str = 'HTML',
        disable_notification: bool = True
    ) -> dict:
        """Send document. See self.__send_media().
        """
        if not Do.args_valid(locals(), self.send_document.__annotations__):
            return None
        return self.__send_media(
            chat=chat,
            media_path=document,
            media_type='document',
            caption=caption,
            parse_mode=parse_mode,
            disable_notification=disable_notification,
            additional_url_param=None
        )

    def send_photo(
        self,
        chat: str,
        photo: str,
        caption: (str, type(None)) = None,
        parse_mode: str = 'HTML',
        disable_notification: bool = True
    ) -> dict:
        """Send photo. See self.__send_media().
        """
        if not Do.args_valid(locals(), self.send_photo.__annotations__):
            return None
        return self.__send_media(
            chat=chat,
            media_path=photo,
            media_type='photo',
            caption=caption,
            parse_mode=parse_mode,
            disable_notification=disable_notification,
            additional_url_param=None
        )

    def send_video(
        self,
        chat: str,
        video: str,
        width: (int, type(None)) = None,
        height: (int, type(None)) = None,
        duration: (int, type(None)) = None,
        caption: (str, type(None)) = None,
        parse_mode: str = 'HTML',
        disable_notification: bool = True
    ) -> dict:
        """Send video. See self.__send_media().

        width, height and duration are forwarded as extra query parameters
        when they are truthy.
        """
        if not Do.args_valid(locals(), self.send_video.__annotations__):
            return None
        extra = ''
        if width:
            extra += '&width=' + str(width)
        if height:
            extra += '&height=' + str(height)
        if duration:
            extra += '&duration=' + str(duration)
        return self.__send_media(
            chat=chat,
            media_path=video,
            media_type='video',
            caption=caption,
            parse_mode=parse_mode,
            disable_notification=disable_notification,
            additional_url_param=extra or None
        )

    def send_audio(
        self,
        chat: str,
        audio: str,
        caption: (str, type(None)) = None,
        parse_mode: str = 'HTML',
        disable_notification: bool = True
    ) -> dict:
        """Send audio. See self.__send_media().
        """
        if not Do.args_valid(locals(), self.send_audio.__annotations__):
            return None
        return self.__send_media(
            chat=chat,
            media_path=audio,
            media_type='audio',
            caption=caption,
            parse_mode=parse_mode,
            disable_notification=disable_notification,
            additional_url_param=None
        )

    def send_mediagroup(
        self,
        chat: str,
        media: dict,
        caption: (str, type(None)) = None,
        parse_mode: str = 'HTML',
        disable_notification: bool = True,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Send media group of photo, video, audio, documents.

        Args:
            chat (str): unique identifier for the target chat or username of the target channel.
            media (dict): {
                name:{'type':'photo',path:'https://url/to/file',caption:text},
                name:{'type':'video',path:'/local/path/to/file',caption:text},
                name:{'type':'audio',path:'file_id=EXISTFILEID',caption:text},
                }. The per-item 'caption' key may be omitted.
            caption (str, None, optional): media caption less 1024 characters; when given
                it is set on the first item and per-item captions are dropped.
                Defaults to None.
            parse_mode (str): caption 'HTML', 'Markdown', 'MarkdownV2' parse mode.
            disable_notification (bool, optional): send silently. Defaults to True.
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Returns:
            dict: {'success':bool,'result':response}; None when
                Do.args_valid() rejects the arguments.
        """
        local_logger = logging.getLogger(logger_alias)
        if not Do.args_valid(locals(), self.send_mediagroup.__annotations__):
            return None
        url = self.api_path + '/sendMediaGroup'
        files = {}
        group = []
        for media_name, item in media.items():
            if re.match("^(?:http://|https://|file_id=)", item['path']):
                files[media_name] = None
                media_source = item['path'].replace('file_id=', '')
            else:
                with open(item['path'], mode='rb') as file:
                    files[media_name] = file.read()
                media_source = "attach://" + media_name
            # A group-level caption overrides the individual ones.
            media_caption = '' if caption else (item.get('caption') or '')
            group.append({
                "type": item['type'],
                "media": media_source,
                "caption": media_caption
            })
        if caption and group:
            group[0]['caption'] = caption
            group[0]['parse_mode'] = parse_mode
        chat_id, thread_id = self._split_chat(chat)
        data = {
            'chat_id': chat_id,
            'media': json.dumps(group),
            "disable_notification": disable_notification
        }
        if thread_id is not None:
            data["message_thread_id"] = thread_id
        response = requests.post(url=url, data=data, files=files, timeout=300)
        if response.status_code == 200:
            response = response.json()
            local_logger.info(msg=""
                + "mediagroup '"
                + str(response['result'][0]['media_group_id'])
                + "' sent to telegram chat "
                + str(chat)
            )
            return {'success': True, 'result': response}
        local_logger.warning(msg="mediagroup didn't send to telegram chat " + str(chat))
        return {'success': False, 'result': response}
class Sequence:
    """Sequence handling.
    """

    @staticmethod
    def run(
        device: HikISAPI,
        sensors: dict,
        sequence: dict,
        temp_path: str,
        records_root_path: str = None,
        records_root_host: str = None,
        records_root_port: int = None,
        records_root_type: str = None,
        records_root_user: str = None,
        records_root_pass: str = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """Sequences executor.

        Every step value is a comma-separated record whose first nine fields are
        read as: action, x, y, z, preset, speed, duration, wait, message.

        Args:
            device (HikISAPI): HikISAPI object.
            sensors (dict): collection as key=sensorname:value=Sensor object.
            sequence (dict): sequence steps collection.
            temp_path (str): path to directory for temp files.
            records_root_path (str, optional): path (local|smb|ftp,sftp) to records directory.
                Defaults to None.
            records_root_host (str, optional): hostname if path on remote host.
                Defaults to None.
            records_root_port (int, optional): connection port if path on remote host.
                Defaults to None.
            records_root_type (str, optional): 'ftp', 'sftp', 'smb' if path on remote host.
                Defaults to None.
            records_root_user (str, optional): username if path on remote host.
                Defaults to None.
            records_root_pass (str, optional): password if path on remote host.
                Defaults to None.
            logger_alias (str, optional): sublogger name. The default value is computed
                once, when this signature is evaluated, not on every call.

        Raises:
            IndexError: a step has fewer than nine comma-separated fields.
        """
        local_logger = logging.getLogger(logger_alias)
        makedirs(temp_path, exist_ok=True)
        # Actions forwarded to the device method of the same name.
        no_argument_actions = (
            'capabilities', 'getcamerapos', 'rebootcamera',
            'setptztostop', 'setmovtohome', 'setposashome'
        )
        speed_only_actions = (
            'setptzmovyyu', 'setptzmovyyd', 'setptzmovxxl',
            'setptzmovxxr', 'setptzmovzzi', 'setptzmovzzo'
        )
        for key, value in sequence.items():
            fields = [field.strip() for field in value.split(',')]
            # Indexing (not slicing) keeps the IndexError for short records.
            action, x, y, z, p, s, t, w, m = (fields[index] for index in range(9))
            if 'sensor-config:' in m:
                sensor_name = m.split(':')[1].strip()
                sensor_value = sensors[sensor_name].value()
                # None is what a sensor of unsupported type yields; treat it like 'ERROR'
                # so the string concatenation below cannot fail.
                if sensor_value is None or sensor_value == 'ERROR':
                    local_logger.warning(msg=""
                        + "the 'ERROR' " + sensor_name + " value"
                        + " has been replaced with an empty string"
                    )
                    m = ""
                else:
                    m = sensor_value
            local_logger.info(msg=''
                + 'action:' + key + ' = ' + action
                + ',' + x + ',' + y + ',' + z
                + ',' + p + ',' + s + ',' + t
                + ',' + w + ',' + m
            )
            if action in no_argument_actions:
                response = getattr(device, action)()
            elif action in speed_only_actions:
                response = getattr(device, action)(speed=int(s))
            elif action == 'setptzpreset':
                response = device.setptzpreset(preset=int(p), speed=int(s))
            elif action == 'setcamerapos':
                response = device.setcamerapos(x=int(x), y=int(y), z=int(z))
            elif action == 'setcameramov':
                response = device.setcameramov(x=int(x), y=int(y), z=int(z), t=t)
            elif action == 'settextonosd':
                response = device.settextonosd(x=int(x), y=int(y), message=m)
            elif action == 'downloadjpeg':
                # One timestamp for the whole step: the file name and the directory
                # parts cannot disagree across a second/day boundary.
                now = datetime.datetime.now()
                dy = now.strftime('%Y')
                dm = now.strftime('%m')
                dv = now.strftime('%V')
                dd = now.strftime('%d')
                records_file_name = (
                    key + '_' + dy + '-' + dm + '-' + dd
                    + '_' + now.strftime('%H') + '.' + now.strftime('%M')
                    + '.' + now.strftime('%S') + '.jpeg'
                )
                src_file = temp_path + sep + records_file_name
                response = False
                if device.downloadjpeg(x=int(x), y=int(y), dst_file=src_file):
                    dst_file = (''
                        + records_root_path
                        + '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/'
                        + records_file_name
                    )
                    upload = Connect.file_upload(
                        src_file=src_file,
                        dst_file=dst_file,
                        hostname=records_root_host,
                        hostport=records_root_port,
                        hosttype=records_root_type,
                        username=records_root_user,
                        password=records_root_pass,
                        logger_alias=logger_alias
                    )
                    if upload['success']:
                        try:
                            remove(src_file)
                        except OSError as error:
                            local_logger.debug(msg='error: ' + '\n' + str(error))
                        response = True
            else:
                # Previously an unknown action left 'response' unbound and crashed below.
                local_logger.warning(msg='unknown action: ' + action)
                response = False
            # 'and', not 'or': float('-') must never be evaluated for the placeholder.
            if w != '-' and float(w) != 0:
                sleep(float(w))
            if response:
                local_logger.info(msg='result:' + key + ' = OK')
            else:
                local_logger.warning(msg='result: ' + key + ' = ERROR')
        try:
            rmdir(temp_path)
        except OSError as error:
            # Best effort: rmdir raises OSError when the directory cannot be removed.
            local_logger.debug(msg='error: ' + '\n' + str(error))
class Convert:
"""Convert handling.
"""
@staticmethod
# pylint: disable=W0612
def run(
image_root_path: (str, list),
image_find_names: list,
video_dest_path: str,
video_dest_sufx: str,
video_scale_x: int,
video_scale_y: int,
video_framerate: int,
video_duration: int,
temp_path: str,
image_root_host: (str, type(None)) = None,
image_root_port: (int, type(None)) = None,
image_root_type: (str, type(None)) = None,
image_root_user: (str, type(None)) = None,
image_root_pass: (str, type(None)) = None,
video_dest_host: (str, type(None)) = None,
video_dest_port: (int, type(None)) = None,
video_dest_type: (str, type(None)) = None,
video_dest_user: (str, type(None)) = None,
video_dest_pass: (str, type(None)) = None,
logger_alias: str = inspect.stack()[0].function
) -> None:
"""Converting executor.
For every requested name: search the source roots for images, download them to
temp_path, write an ffmpeg concat list, run FFmpeg and upload the resulting mp4.
Args:
image_root_path (str, list): source images path.
image_find_names (list): image names to search.
video_dest_path (str): path to destination video.
video_dest_sufx (str): destination video name suffix.
video_scale_x (int): destination video width.
video_scale_y (int): destination video height.
video_framerate (int): destination video frame per second.
video_duration (int): destination video duration.
temp_path (str): path to directory for temp files.
image_root_host (str, None, optional): source images hostname.
Defaults to None.
image_root_port (int, None, optional): source images connection port.
Defaults to None.
image_root_type (str, None, optional): ftp,sftp,smb.
Defaults to None.
image_root_user (str, None, optional): username to source images ftp,sftp,smb path.
Defaults to None.
image_root_pass (str, None, optional): password to source images ftp,sftp,smb path.
Defaults to None.
video_dest_host (str, None, optional): destination video hostname.
Defaults to None.
video_dest_port (int, None, optional): destination video connection port.
Defaults to None.
video_dest_type (str, None, optional): ftp,sftp,smb.
Defaults to None.
video_dest_user (str, None, optional): username to destination video ftp,sftp,smb path.
Defaults to None.
video_dest_pass (str, None, optional): password to destination video ftp,sftp,smb path.
Defaults to None.
logger_alias (str, optional): sublogger name. The default value is computed once,
when this signature is evaluated, not on every call.
"""
local_logger = logging.getLogger(logger_alias)
# A single path string is normalised to a one-element list.
if isinstance(image_root_path, str):
image_root_path = [image_root_path]
# Paths collected here are passed to remove() in the cleanup loop near the end.
temp_files = []
for name in image_find_names:
# Gather matches for this name from every source root.
image_found = []
for image_root in image_root_path:
image_found = image_found + Connect.file_search(
search_path=image_root,
search_name=name,
hostname=image_root_host,
hostport=image_root_port,
hosttype=image_root_type,
username=image_root_user,
password=image_root_pass,
logger_alias=logger_alias
)
makedirs(temp_path, exist_ok=True)
for image in image_found:
# NOTE(review): 'image_root' here is whatever the search loop above left behind
# (its last item), so with several roots the connection parameters appear to be
# parsed from the last root only - confirm this is intended.
connect = Connect.parse_connect_params(connect_string=image_root)
# Explicit arguments win; parsed values only fill parameters that are still falsy.
# Once filled they stay set for all following images and names.
if not image_root_host:
image_root_host = connect['hostname']
if not image_root_port:
image_root_port = connect['hostport']
if not image_root_type:
image_root_type = connect['hosttype']
if not image_root_user:
image_root_user = connect['username']
if not image_root_pass:
image_root_pass = connect['password']
if Connect.file_download(
src_file=image,
dst_file=temp_path + sep + path.basename(image),
hostname=image_root_host,
hostport=image_root_port,
hosttype=image_root_type,
username=image_root_user,
password=image_root_pass,
)['success']:
temp_files.append(temp_path + sep + path.basename(image))
# Build the input list for ffmpeg's concat demuxer from the files found in temp_path.
image_temp_list = temp_path + sep + 'convert.list'
image_amount = 0
with open(image_temp_list, mode='w+', encoding='UTF-8') as converter_list:
for file in Connect.file_search(search_path=temp_path, search_name=name):
converter_list.write("\nfile '" + file + "'")
image_amount += 1
temp_files.append(image_temp_list)
video_converted = name + '_' + video_dest_sufx + '.mp4'
video_temp_path = temp_path + sep + video_converted
# '-r <images>/<duration>' sets the input rate from the image count and duration.
# NOTE(review): when no image was found this becomes '-r 0/<duration>' - verify how
# FFmpeg.run reacts to that.
video_conv_conf = (''
+ '-r '
+ str(image_amount) + '/' + str(video_duration)
+ ' -f concat -safe 0 -i '
+ image_temp_list
+ ' -c:v libx264 -vf scale=' + str(video_scale_x) + ':' + str(video_scale_y)
+ ',fps=' + str(video_framerate) + ',format=yuv420p '
+ video_temp_path
+ ' -y'
)
# A return value of 0 from FFmpeg.run is treated as success.
if FFmpeg.run(raw=video_conv_conf, logger_alias=logger_alias) == 0:
temp_files.append(video_temp_path)
# The upload result is checked but nothing is done with it.
if Connect.file_upload(
src_file=video_temp_path,
dst_file=video_dest_path + '/' + video_converted,
hostname=video_dest_host,
hostport=video_dest_port,
hosttype=video_dest_type,
username=video_dest_user,
password=video_dest_pass,
logger_alias=logger_alias
)['success']:
pass
# Best-effort cleanup: failures to delete are only logged at debug level.
for temp_file in temp_files:
try:
remove(temp_file)
except OSError as error:
local_logger.debug(msg='error: ' + '\n' + str(error))
try:
rmdir(temp_path)
except OSError as error:
local_logger.debug(msg='error: ' + '\n' + str(error))
class Publish:
    """Publish handling.

    Downloads already converted videos of the requested period and publishes
    them to Wordpress and/or Telegram.
    """

    @staticmethod
    def _period_label(date: dict, delimiter: str = '.') -> str:
        """Build the textual label of a period from a 'Do.date_calc' result.

        Args:
            date (dict): result of 'Do.date_calc'.
            delimiter (str, optional): separator between year, month and day.
                Defaults to '.'.

        Returns:
            str: 'yyyy<delimiter>mm<delimiter>dd' for a day, 'yyyy-wNN' for a week,
                'yyyy<delimiter>mm' for a month, 'yyyy' for a year.
        """
        start = date['start']
        year = str(start['y'])
        if date['period'] == 'day':
            return delimiter.join(
                [year, str(start['m']).zfill(2), str(start['d']).zfill(2)]
            )
        if date['period'] == 'week':
            return year + '-w' + str(start['w']).zfill(2)
        if date['period'] == 'month':
            return delimiter.join([year, str(start['m']).zfill(2)])
        return year

    @staticmethod
    def _media_of_period(targets_media_files: dict, period: str, period_name: str) -> dict:
        """Pick the {'name': 'local/path/to/file'} mapping of the period.

        Args:
            targets_media_files (dict): {'period':{'name':'local/path/to/file'}}.
            period (str): period exactly as received from the caller.
            period_name (str): normalized period name ('day','week','month','year').

        Raises:
            KeyError: neither 'period' nor 'period_name' is a key of the mapping.

        Returns:
            dict: {'name':'local/path/to/file'}.
        """
        if period in targets_media_files:
            return targets_media_files[period]
        return targets_media_files[period_name]

    @staticmethod
    def _frame_rate(rate: str) -> int:
        """Convert a 'numerator/denominator' frame rate string to whole frames per second.

        Args:
            rate (str): frame rate such as '25/1' or '30000/1001'.

        Returns:
            int: rounded frames per second; the bare numerator if the denominator
                is missing or zero.
        """
        numerator, _, denominator = rate.partition('/')
        if denominator and int(denominator) != 0:
            return round(int(numerator) / int(denominator))
        return int(numerator)

    @classmethod
    # pylint: disable=W0612
    def run(
        cls,
        video_root_path: (str, list),
        video_find_names: list,
        temp_path: str,
        publish_period: str,
        publish_amount: int,
        video_root_host: (str, type(None)) = None,
        video_root_port: (int, type(None)) = None,
        video_root_type: (str, type(None)) = None,
        video_root_user: (str, type(None)) = None,
        video_root_pass: (str, type(None)) = None,
        wp_site_name: (str, type(None)) = None,
        wp_user_name: (str, type(None)) = None,
        wp_user_pass: (str, type(None)) = None,
        wp_update_page_id: (int, type(None)) = None,
        tg_api_key: (str, type(None)) = None,
        tg_chat_id: (str, type(None)) = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """Publishing executor.

        Searches '<name>_<period label>.mp4' in every source root, downloads the
        files found into a random sub-directory of 'temp_path', runs the Telegram
        and Wordpress routines and finally removes the downloaded files.

        Args:
            video_root_path (str, list): source video path.
            video_find_names (list): video names to search.
            temp_path (str): path to directory for temp files.
            publish_period (str): 'y'|'year','m'|'month','w'|'week','d'|'day'.
            publish_amount (int): +/- periods.
            video_root_host (str, None, optional): source video hostname.
                Defaults to None.
            video_root_port (int, None, optional): source video connection port.
                Defaults to None.
            video_root_type (str, None, optional): ftp,sftp,smb.
                Defaults to None.
            video_root_user (str, None, optional): username to source video ftp,sftp,smb path.
                Defaults to None.
            video_root_pass (str, None, optional): password to source video ftp,sftp,smb path.
                Defaults to None.
            wp_site_name (str, None, optional): www.wordpress.site.
                Defaults to None.
            wp_user_name (str, None, optional): wordpress username.
                Defaults to None.
            wp_user_pass (str, None, optional): wordpress password.
                Defaults to None.
            wp_update_page_id (int, None, optional): unique identifier for the page.
                Defaults to None.
            tg_api_key (str, None, optional): Telegram Bot API access token.
                Defaults to None.
            tg_chat_id (str, None, optional): unique identifier for
                the target chat or username of the target channel.
                Defaults to None.
            logger_alias (str, optional): sublogger name.
                Defaults to the name captured by 'inspect.stack()' at definition time.
        """
        local_logger = logging.getLogger(logger_alias)
        if isinstance(video_root_path, str):
            video_root_path = [video_root_path]
        publish_date = Do.date_calc(period=publish_period, amount=publish_amount)
        # always use the normalized name, 'publish_period' may be a short alias like 'd'
        period_name = publish_date['period']
        video_find_sufx = cls._period_label(publish_date)
        temp_path = temp_path + sep + Do.random_string(8)
        temp_files = []
        video_files = {period_name: {}}
        try:
            makedirs(temp_path, exist_ok=True)
            for name in video_find_names:
                # remember the root of every match: connection details are derived from it
                video_found = []
                for video_root in video_root_path:
                    for video in Connect.file_search(
                        search_path=video_root,
                        search_name=name + '_' + video_find_sufx + '.mp4',
                        hostname=video_root_host,
                        hostport=video_root_port,
                        hosttype=video_root_type,
                        username=video_root_user,
                        password=video_root_pass,
                        logger_alias=logger_alias
                    ):
                        video_found.append((video_root, video))
                for video_root, video in video_found:
                    # explicit parameters win, the rest is taken from the root string;
                    # the method parameters themselves stay untouched for the next search
                    connect = Connect.parse_connect_params(connect_string=video_root)
                    local_file = temp_path + sep + path.basename(video)
                    if Connect.file_download(
                        src_file=video,
                        dst_file=local_file,
                        hostname=video_root_host or connect['hostname'],
                        hostport=video_root_port or connect['hostport'],
                        hosttype=video_root_type or connect['hosttype'],
                        username=video_root_user or connect['username'],
                        password=video_root_pass or connect['password'],
                        logger_alias=logger_alias
                    )['success']:
                        temp_files.append(local_file)
                        video_files[period_name][name] = local_file
            if tg_api_key:
                tg = Telegram(tg_api_key)
                cls.tg_routine_media(
                    tg=tg,
                    targets_media_files=video_files,
                    period=period_name,
                    amount=publish_amount,
                    chat=tg_chat_id,
                    logger_alias=logger_alias
                )
            if wp_site_name:
                wp = Wordpress(
                    hostname=wp_site_name,
                    username=wp_user_name,
                    password=wp_user_pass
                )
                cls.wp_routine_media(
                    wp=wp,
                    targets_media_files=video_files,
                    period=period_name,
                    amount=publish_amount,
                    page_id=wp_update_page_id,
                    logger_alias=logger_alias
                )
        finally:
            # best-effort cleanup, also when a publishing routine raised
            for temp_file in temp_files:
                try:
                    remove(temp_file)
                except OSError as error:
                    local_logger.debug(msg='error: ' + '\n' + str(error))
            try:
                rmdir(temp_path)
            except OSError as error:
                local_logger.debug(msg='error: ' + '\n' + str(error))

    @staticmethod
    def wp_routine_media(
        wp: Wordpress,
        targets_media_files: dict,
        period: str,
        amount: int,
        page_id: int,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Custom Wordpress media routine - upload media, create media events, update media page.

        Only the hard-coded point names that have a local file in
        'targets_media_files' are processed. An entry whose file is already on
        the site is removed from 'targets_media_files'.

        Args:
            wp (Wordpress): wordpress object.
            targets_media_files (dict): {'period':{'name':'local/path/to/file'}}.
            period (str): 'y'|'year','m'|'month','w'|'week','d'|'day'.
            amount (int): +/- periods.
            page_id (int): unique identifier for the page.
            logger_alias (str, optional): sublogger name.
                Defaults to the name captured by 'inspect.stack()' at definition time.

        Returns:
            dict: {'media upload': bool, 'event create': bool, 'pages update': bool}
        """
        Do.args_valid(locals(), Publish.wp_routine_media.__annotations__)
        local_logger = logging.getLogger(logger_alias)
        date = Do.date_calc(period=period, amount=amount)
        period_name = date['period']
        media_files = Publish._media_of_period(targets_media_files, period, period_name)

        # placeholder videos of the site, one per known point and period
        placeholder_root = "https://www.hmp.today/wp-content/uploads/2022/07/"
        placeholder_names = (
            "point-01", "point-02", "point-04", "point-05", "point-11", "point-12"
        )
        placeholder_sufx = {
            "day": "_yyyy.mm_.dd_.mp4",
            "week": "_w.mp4",
            "month": "_yyyy.mm_.mp4",
            "year": "_yyyy.mp4"
        }
        # points without a local file are skipped instead of failing with KeyError
        current_media_links = {
            name: placeholder_root + name + placeholder_sufx[period_name]
            for name in placeholder_names
            if name in media_files
        }
        result = {'media upload': True, 'event create': True, 'pages update': True}

        # media upload
        resolved = set()  # names whose link points to real media on the site
        for name in list(current_media_links):
            media_file = media_files[name]
            file_found = wp.media_search(
                media_name=path.basename(media_file),
                media_type='video'
            )
            if file_found['success'] and len(file_found['result']) > 0:
                local_logger.info(
                    msg=""
                    + "upload skipped, "
                    + media_file
                    + " found on site"
                )
                media_files.pop(name, None)
                current_media_links[name] = file_found['result'][0]
                resolved.add(name)
            else:
                file_upload = wp.media_upload(media_file, 'video/mp4')
                if file_upload['success']:
                    current_media_links[name] = file_upload['result']
                    resolved.add(name)
                else:
                    result['media upload'] = False

        # event create
        date_start = (''
            + str(date['start']['y'])
            + '-'
            + str(date['start']['m']).zfill(2)
            + '-'
            + str(date['start']['d']).zfill(2)
            + 'T00:00:00'
        )
        date_end = (''
            + str(date['end']['y'])
            + '-'
            + str(date['end']['m']).zfill(2)
            + '-'
            + str(date['end']['d']).zfill(2)
            + 'T23:59:59'
        )
        for name, link in current_media_links.items():
            if name not in resolved:
                # An event built around the placeholder link would block the
                # retry: the next run finds the slug on the site and skips it.
                # NOTE(review): confirm that skipping is the intended behaviour.
                result['event create'] = False
                continue
            description = (''
                + '<figure class="wp-block-video">'
                + '<video controls loop src="' + link + '"></video>'
                + '</figure>'
            )
            slug = name + '_' + Publish._period_label(date, delimiter='-')
            title = name + ' ' + Publish._period_label(date, delimiter='.')
            event_api_slug = wp.api_event + '/by-slug/' + slug
            if Connect.http(event_api_slug)['success']:
                local_logger.info(msg="event skipped, " + event_api_slug + " found on site")
            else:
                event_create = wp.event_create(
                    title=title,
                    slug=slug,
                    date_start=date_start,
                    date_end=date_end,
                    date_publish=date_start,
                    description=description
                )
                if not event_create['success']:
                    result['event create'] = False

        # pages update
        page_read = wp.pages_read(page_id)
        if not page_read['success']:
            result['pages update'] = False
            return result
        content = json.loads(page_read['result'])['content']['rendered']
        # file name tail of the period: either a real date or the placeholder text
        reg_exp = {
            'day': (""
                + "_(?:[0-9]{4}|yyyy)"
                + ".(?:[0-9]{2}|mm_)"
                + ".(?:[0-9]{2}|dd_)(?:|-[0-9]).mp4"
            ),
            'week': "(?:_[0-9]{4}-w[0-9]{2}|_w)(?:|-[0-9]).mp4",
            'month': "_(?:[0-9]{4}|yyyy).(?:[0-9]{2}|mm_)(?:|-[0-9]).mp4",
            'year': "_(?:[0-9]{4}|yyyy)(?:|-[0-9]).mp4"
        }[period_name]
        replace = 0
        # names with a failed upload still carry the placeholder link here,
        # so the page falls back to the placeholder until a retry succeeds
        for name, new_str in current_media_links.items():
            pattern = wp.url_files + "/[0-9]{4}/[0-9]{2}/" + name + reg_exp
            for old_str in re.findall(pattern, content):
                if old_str == new_str:
                    local_logger.info(
                        msg=""
                        + "page replace skipped, "
                        + new_str
                        + " found on page"
                    )
                else:
                    content = content.replace(old_str, new_str)
                    replace += 1
                    local_logger.info(msg="page replace " + old_str + " to " + new_str)
        if replace > 0:
            # one request with the accumulated content instead of one per name
            page_update = wp.pages_update(page_id=page_id, content=content)
            result['pages update'] = page_update['success']
        return result

    @staticmethod
    # pylint: disable=W0612
    def tg_routine_media(
        tg: Telegram,
        targets_media_files: dict,
        period: str,
        amount: int,
        chat: str,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Custom Telegram media routine - send mediagroup.

        Every local video is sent on its own first (compressed to half size when
        it reaches the size limit), the intermediate message is deleted and the
        received file identifier is put into the media group.

        Args:
            tg (Telegram): telegram object
            targets_media_files (dict): {'period':{'name':'local/path/to/file'}}.
            period (str): 'y'|'year','m'|'month','w'|'week','d'|'day'.
            amount (int): +/- periods.
            chat (str): unique identifier for the target chat or username of the target channel.
            logger_alias (str, optional): sublogger name.
                Defaults to the name captured by 'inspect.stack()' at definition time.

        Raises:
            ValueError: filename is not local file.

        Returns:
            dict: {'success':bool,'result':response}.
        """
        Do.args_valid(locals(), Publish.tg_routine_media.__annotations__)
        local_logger = logging.getLogger(logger_alias)
        default_caption = (""
            + "`period:` yyyy.mm.dd\n"
            + "`source:` https://www.hmp.today/media\n"
            + "`stream:` https://youtu.be/ikB20ML5oyo"
        )
        group = {
            "cover":
            {
                "type": "photo",
                "path": (''
                    + 'https://www.hmp.today/wp-content/uploads/'
                    + '2021/02/site-slider_hmp-qr_bwg-3840x1705-1.png'
                ),
                "caption": "source: https://www.hmp.today"
            }
        }
        tg_limit_video_size = 50000000
        tmp_files = []
        date = Do.date_calc(period=period, amount=amount)
        current_caption = default_caption.replace(
            'yyyy.mm.dd',
            Publish._period_label(date)
        )
        media_files = Publish._media_of_period(targets_media_files, period, date['period'])
        try:
            for media_name, media_path in media_files.items():
                if re.match("^(?:http://|https://|file_id=)", media_path):
                    raise ValueError(media_name + ' is not local file')
                src_video_info = FFmpeg.probe(target=media_path)
                if not src_video_info:
                    continue
                cur_video_file = media_path
                cur_video_duration = int(float(src_video_info['format']['duration']))
                # presumably the first stream is the video stream - TODO confirm
                cur_video_fps = Publish._frame_rate(
                    src_video_info['streams'][0]['avg_frame_rate']
                )
                cur_video_width = int(src_video_info['streams'][0]['width'])
                cur_video_height = int(src_video_info['streams'][0]['height'])
                if stat(media_path).st_size >= tg_limit_video_size:
                    # target 90% of the limit; guard against a sub-second duration
                    tmp_video_bitrate = int(
                        tg_limit_video_size*0.9/1000*8/max(cur_video_duration, 1)
                    )
                    tmp_video_file = media_path.replace('.mp4', '_compressed.mp4')
                    # half size, rounded down to even values as yuv420p requires
                    tmp_video_width = cur_video_width // 4 * 2
                    tmp_video_height = cur_video_height // 4 * 2
                    compressing_params = ' '.join(
                        [
                            '-i', media_path,
                            '-c:v libx264 -b:v', str(tmp_video_bitrate) + 'k',
                            '-vf scale='
                                + str(tmp_video_width) + ':' + str(tmp_video_height)
                                + ',fps=' + str(cur_video_fps)
                                + ',format=yuv420p -preset veryslow',
                            tmp_video_file, '-y -loglevel quiet -stats'
                        ]
                    )
                    if FFmpeg.run(raw=compressing_params, logger_alias=logger_alias) == 0:
                        local_logger.info(msg=''
                            + media_path + " comressed to " + tmp_video_file
                        )
                        tmp_files.append(tmp_video_file)
                        cur_video_file = tmp_video_file
                        # report the new size only when the smaller file really exists
                        cur_video_width = tmp_video_width
                        cur_video_height = tmp_video_height
                response_upload = tg.send_video(
                    chat=chat,
                    video=cur_video_file,
                    width=cur_video_width,
                    height=cur_video_height,
                    duration=cur_video_duration
                )
                if response_upload:
                    # the single message was only needed to obtain the file identifier
                    response_delete = tg.delete_message(
                        chat=chat,
                        message_id=response_upload['result']['result']['message_id']
                    )
                    group[media_name] = {
                        "type": "video",
                        "path": (
                            'file_id='
                            + response_upload['result']['result']['video']['file_id']
                        )
                    }
            response_result = tg.send_mediagroup(
                chat=chat,
                media=group,
                caption=current_caption,
                parse_mode='Markdown'
            )
        finally:
            # 'remove' returns None, so success is the absence of an exception
            for file in tmp_files:
                try:
                    remove(file)
                    local_logger.info(msg="deleted " + file)
                except OSError as error:
                    local_logger.debug(msg='error: ' + '\n' + str(error))
        return response_result
class Do():
    """Set of various methods (functions) for routine.
    """

    @staticmethod
    def random_string(length: int) -> str:
        """Generate string from lowercase letters, uppercase letters, digits.

        Args:
            length (int): string length.

        Returns:
            str: random string.
        """
        return ''.join(choice(ascii_letters + digits) for _ in range(length))

    @staticmethod
    def args_valid(arguments: dict, annotations: dict) -> bool:
        """Arguments type validating by annotations.

        Args:
            arguments (dict): 'locals()' immediately after starting the function.
            annotations (dict): function.name.__annotations__.

        Raises:
            TypeError: type of argument is not equal type in annotation.

        Returns:
            bool: True if argument types are valid.
        """
        for var_name, var_type in annotations.items():
            if var_name == 'return':
                continue
            if not isinstance(arguments[var_name], var_type):
                raise TypeError(""
                    + "type of '"
                    + var_name
                    + "' = "
                    + str(arguments[var_name])
                    + " is not "
                    + str(var_type)
                )
        return True

    @staticmethod
    def date_calc(
        target: (datetime.date, type(None)) = None,
        amount: int = 0,
        period: (str, type(None)) = None
    ) -> dict:
        """Calculating start/end dates for period: day, week, month, year.

        Args:
            target (datetime.date, None, optional): date in the calculation period.
                Defaults to None, which means the moment of the call.
            amount (int, optional): +/- periods.
                Defaults to 0.
            period (str, None, optional): 'y'|'year','m'|'month','w'|'week','d'|'day'.
                Defaults to None.

        Raises:
            TypeError: type of argument is not equal type in annotation.
            ValueError: 'period' value is wrong.

        Returns:
            dict: {
                'period':day|week|month|year,
                'start':{'y':int,'m':int,'w':int,'d':int},
                'end':{'y':int,'m':int,'w':int,'d':int}
            }.
        """
        Do.args_valid(locals(), Do.date_calc.__annotations__)
        if target is None:
            # resolved per call: a default evaluated in the signature would be
            # frozen at the moment the class was defined
            target = datetime.datetime.now()
        if not period:
            raise ValueError("'period' value is wrong: " + "''")
        if period in ('d', 'day'):
            name = 'day'
            start = end = target + datetime.timedelta(days=amount)
        elif period in ('w', 'week'):
            name = 'week'
            shifted = target + datetime.timedelta(weeks=amount)
            # Monday of the ISO week; plain date arithmetic stays correct for
            # weeks that belong to another ISO year than the calendar year
            start = shifted - datetime.timedelta(days=shifted.weekday())
            end = start + datetime.timedelta(days=6)
        elif period in ('m', 'month'):
            name = 'month'
            # count months from year 0 so that floor division handles both signs
            months = target.year * 12 + (target.month - 1) + amount
            year, month = months // 12, months % 12 + 1
            start = target.replace(year=year, month=month, day=1)
            end = start.replace(day=calendar.monthrange(year, month)[1])
        elif period in ('y', 'year'):
            name = 'year'
            start = target.replace(year=target.year + amount, month=1, day=1)
            end = start.replace(month=12, day=31)
        else:
            raise ValueError("'period' value is wrong: " + period)
        return {
            'period': name,
            'start': {
                'y': start.year,
                'm': start.month,
                'w': start.isocalendar()[1],
                'd': start.day
            },
            'end': {
                'y': end.year,
                'm': end.month,
                'w': end.isocalendar()[1],
                'd': end.day
            }
        }
if __name__ == "__main__":
    # Entry point: parse the command line, read the '[common]' block of the
    # config, set up logging and dispatch exactly one of the modes:
    # broadcast, sequences, or converter/publisher for a selected period.
    time_start = datetime.datetime.now()
    script_root = path.dirname(path.realpath(__file__))

    def _location(config: dict, prefix: str) -> dict:
        """Collect '<prefix>_path|host|port|type|user|pass' from a config block.

        Args:
            config (dict): parsed config block.
            prefix (str): parameters prefix, for example 'video_root'.

        Returns:
            dict: keyword arguments named '<prefix>_<field>'; the path defaults to
                the script directory, the port is converted to int, everything
                else defaults to None.
        """
        location = {prefix + '_path': config.get(prefix + '_path', script_root)}
        for field in ('host', 'port', 'type', 'user', 'pass'):
            location[prefix + '_' + field] = config.get(prefix + '_' + field)
        if prefix + '_port' in config:
            location[prefix + '_port'] = int(config[prefix + '_port'])
        return location

    args = ArgumentParser(
        prog='cctv-scheduler',
        description='Hikvision PTZ IP-Camera management.',
        epilog='Dependencies: '
               '- Python 3 (tested version 3.9.5), '
               '- Python 3 modules: paramiko, smbprotocol '
    )
    args.add_argument(
        '--config', type=str, default=path.splitext(__file__)[0] + '.conf',
        help='custom configuration file path'
    )
    for flag, option, text in (
        ('-b', '--broadcast', 'streaming media to destination'),
        ('-s', '--sequences', 'run sequences from config file'),
        ('-c', '--converter', 'convert JPEG collection to MP4'),
        ('-p', '--publisher', 'publish content from templates')
    ):
        args.add_argument(flag, option, action='store_true', help=text)
    # NOTE(review): the default 0 is also "not selected", so the current
    # day/week/month/year cannot be requested with an explicit 0.
    for flag, unit in (('-d', 'day'), ('-w', 'week'), ('-m', 'month'), ('-y', 'year')):
        args.add_argument(
            flag, '--' + unit, type=int, default=0,
            help=(unit + ' in amount of ' + unit + 's from today, '
                + 'for which the publication or conversion is made')
        )
    args = vars(args.parse_args())

    # defaults, used as they are when the config file or its values are absent
    temp_path = script_root + sep + 'temp'
    log_root = script_root
    log_level = logging.INFO
    if path.exists(args['config']):
        # common
        conf = Parse(parameters=args['config'], block='common')
        temp_path = conf.data.get('temp_path', temp_path)
        log_root = conf.data.get('log_root', log_root)
        if conf.data.get('log_level') in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
            log_level = getattr(logging, conf.data['log_level'])

    logging.basicConfig(
        format='%(asctime)s %(levelname)s: %(name)s: %(message)s',
        datefmt='%Y-%m-%d_%H.%M.%S',
        handlers=[
            logging.FileHandler(
                filename=log_root + sep + path.splitext(path.basename(__file__))[0] + '.log',
                mode='a'
            ),
            logging.StreamHandler()
        ],
        level=log_level
    )
    logging.getLogger("paramiko").setLevel(logging.WARNING)
    logging.getLogger("smbprotocol").setLevel(logging.WARNING)

    if args['broadcast']:
        logging.info(msg='Streaming starts...')
        conf = Parse(parameters=args['config'], block='enable-broadcast')
        for key, value in conf.data.items():
            if value != 'true':
                continue
            broadcast_config = Parse(
                parameters=args['config'],
                block='broadcast-config:' + key
            ).data
            watchsec = None
            if 'watchsec' in broadcast_config:
                watchsec = int(broadcast_config['watchsec'])
            exit_code = FFmpeg.run(
                src=broadcast_config.get('src'),
                dst=broadcast_config.get('dst'),
                fps=broadcast_config.get('fps'),
                preset=broadcast_config.get('preset'),
                ffpath=broadcast_config.get('ffpath'),
                watchdog=broadcast_config.get('watchdog'),
                watchsec=watchsec,
                onlyonce=broadcast_config.get('onlyonce'),
                logger_alias='streaming ' + key
            )
            logging.info(msg=''
                + 'Streaming ' + key + ' finished with exit code: '
                + str(exit_code)
            )
    elif args['sequences']:
        logging.info(msg='Sequence starts...')
        # all enabled sensors are created first and shared by every sequence
        sensors = {}
        conf = Parse(parameters=args['config'], block='enable-sensor')
        for key, value in conf.data.items():
            if value != 'true':
                continue
            device_config = Parse(
                parameters=args['config'],
                block='sensor-config:' + key
            ).data
            sensors[key] = Sensor(
                hostname=device_config['hostname'],
                username=device_config['username'],
                userpass=device_config['userpass'],
                nodetype=device_config['nodetype'],
                nodename=device_config['nodename']
            )
        conf = Parse(parameters=args['config'], block='enable-sequences')
        for key, value in conf.data.items():
            if value != 'true':
                continue
            device_sequence = Parse(
                parameters=args['config'],
                block='camera-sequences:' + key
            ).data
            device_config = Parse(
                parameters=args['config'],
                block='camera-config:' + key
            ).data
            device_entity = HikISAPI(
                hostname=device_config['hostname'],
                username=device_config['username'],
                userpass=device_config['userpass']
            )
            Sequence.run(
                device=device_entity,
                sensors=sensors,
                sequence=device_sequence,
                temp_path=temp_path + sep + Do.random_string(8),
                logger_alias='sequence ' + key,
                **_location(device_config, 'records_root')
            )
            logging.info(msg='Sequence ' + key + ' finished')
    elif args['day'] or args['week'] or args['month'] or args['year']:
        period = None
        amount = None
        # when several units are given the largest one wins
        for unit in ('day', 'week', 'month', 'year'):
            if args[unit]:
                period = unit
                amount = args[unit]
        period = Do.date_calc(period=period, amount=amount)
        start_year = str(period['start']['y'])
        start_month = str(period['start']['m']).zfill(2)
        start_week = str(period['start']['w']).zfill(2)
        start_day = str(period['start']['d']).zfill(2)

        if args['converter']:
            logging.info(msg='Converting starts...')
            conf = Parse(parameters=args['config'], block='enable-convert')
            for key, value in conf.data.items():
                if value != 'true':
                    continue
                convert_config = Parse(
                    parameters=args['config'],
                    block='convert-config:' + key
                ).data
                image_root = _location(convert_config, 'image_root')
                image_root_path = image_root.pop('image_root_path')
                image_find_names = [
                    name.strip() for name in convert_config['image_find_names'].split(',')
                ]
                # images are stored as <root>/<year>/<month>/<week>/<day>
                if period['period'] == 'day':
                    image_root_path = '/'.join(
                        [image_root_path, start_year, start_month, start_week, start_day]
                    )
                    video_dest_sufx = '.'.join([start_year, start_month, start_day])
                    video_duration = 1
                if period['period'] == 'week':
                    week_root_path = '/'.join(
                        [image_root_path, start_year, start_month, start_week]
                    )
                    if period['start']['m'] == period['end']['m']:
                        image_root_path = week_root_path
                    else:
                        # the week is split between the directories of two months
                        image_root_path = [
                            week_root_path,
                            '/'.join(
                                [
                                    image_root_path,
                                    str(period['end']['y']),
                                    str(period['end']['m']).zfill(2),
                                    str(period['end']['w']).zfill(2)
                                ]
                            )
                        ]
                    video_dest_sufx = start_year + '-w' + start_week
                    video_duration = 7
                if period['period'] == 'month':
                    image_root_path = '/'.join([image_root_path, start_year, start_month])
                    video_dest_sufx = '.'.join([start_year, start_month])
                    video_duration = 30
                if period['period'] == 'year':
                    image_root_path = '/'.join([image_root_path, start_year])
                    video_dest_sufx = start_year
                    video_duration = 360
                Convert.run(
                    image_root_path=image_root_path,
                    image_find_names=image_find_names,
                    video_dest_sufx=video_dest_sufx,
                    video_scale_x=int(convert_config['video_scale_x']),
                    video_scale_y=int(convert_config['video_scale_y']),
                    video_framerate=int(convert_config['video_framerate']),
                    video_duration=video_duration,
                    temp_path=temp_path + sep + Do.random_string(8),
                    logger_alias='converting ' + key,
                    **image_root,
                    **_location(convert_config, 'video_dest')
                )
                logging.info(msg='Converting ' + key + ' finished')

        if args['publisher']:
            logging.info(msg='Publishing starts...')
            conf = Parse(parameters=args['config'], block='enable-publish')
            for key, value in conf.data.items():
                if value != 'true':
                    continue
                publish_config = Parse(
                    parameters=args['config'],
                    block='publish-config:' + key
                ).data
                video_find_names = [
                    name.strip() for name in publish_config['video_find_names'].split(',')
                ]
                # a target stays disabled (all None) unless '<target>_enabled' is 'true'
                wp_site_name = None
                wp_user_name = None
                wp_user_pass = None
                wp_update_page_id = None
                if publish_config.get('wp_enabled') == 'true':
                    wp_site_name = publish_config.get('wp_site_name')
                    wp_user_name = publish_config.get('wp_user_name')
                    wp_user_pass = publish_config.get('wp_user_pass')
                    if 'wp_update_page_id' in publish_config:
                        wp_update_page_id = int(publish_config['wp_update_page_id'])
                tg_api_key = None
                tg_chat_id = None
                if publish_config.get('tg_enabled') == 'true':
                    tg_api_key = publish_config.get('tg_api_key')
                    tg_chat_id = publish_config.get('tg_chat_id')
                Publish.run(
                    video_find_names=video_find_names,
                    # Publish.run creates and removes its own random sub-directory;
                    # another level here would be left behind after every run
                    temp_path=temp_path,
                    publish_period=period['period'],
                    publish_amount=amount,
                    wp_site_name=wp_site_name,
                    wp_user_name=wp_user_name,
                    wp_user_pass=wp_user_pass,
                    wp_update_page_id=wp_update_page_id,
                    tg_api_key=tg_api_key,
                    tg_chat_id=tg_chat_id,
                    logger_alias='publishing ' + key,
                    **_location(publish_config, 'video_root')
                )
                logging.info(msg='Publishing ' + key + ' finished')
    else:
        logging.info(msg='Start arguments was not selected. Exit.')

    time_execute = datetime.datetime.now() - time_start
    logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')