generated from pavel.muhortov/template-bash
4553 lines
183 KiB
Python
4553 lines
183 KiB
Python
#!/usr/bin/env python3
|
|
# pylint: disable=C0103,C0302,W0621
|
|
|
|
"""It's the main executor of cctv-scheduler. Processes incoming parameters and calls subclasses
|
|
"""
|
|
|
|
import calendar
|
|
import base64
|
|
import datetime
|
|
import inspect
|
|
import json
|
|
import logging
|
|
from random import choice
|
|
import re
|
|
import urllib.request
|
|
from argparse import ArgumentParser
|
|
from ftplib import FTP
|
|
from multiprocessing import Process, Queue
|
|
from os import environ, makedirs, path, remove, rmdir, sep, stat, walk
|
|
from shutil import copyfile
|
|
from string import ascii_letters, digits
|
|
from subprocess import Popen, PIPE, STDOUT
|
|
from sys import modules, platform
|
|
from time import sleep
|
|
import requests
|
|
from paramiko import AutoAddPolicy, SSHClient, SFTPClient
|
|
import smbclient
|
|
|
|
|
|
class Parse:
    """Parser of configs, arguments, parameters.

    The parsed result is kept in ``self.data`` (dict); ``self.path`` holds the
    config file path when the object was built from an existing file.
    """

    # pylint: disable=C0123
    def __init__(self, parameters, block: str = None) -> None:
        """Object constructor.

        Args:
            parameters: dictionary as "key":"value" or
                        ArgumentParser class object or
                        string path to the file or
                        string as "var1=val1;var2=val2".
            block (str, optional): name of target block from text. Defaults to None.
        """
        self.path = ''
        self.data = {}
        # Exact type comparison is kept on purpose (see the C0123 suppression):
        # subclasses of dict/str/ArgumentParser are ignored, as before.
        if type(parameters) is dict:
            self._dict2dict(parameters)
        if type(parameters) is ArgumentParser:
            self._dict2dict(self.argv2dict(parameters))
        if type(parameters) is str:
            if path.exists(parameters):
                # The string names an existing file: read and parse it.
                self._dict2dict(
                    self.strs2dict(self.conf2strs(parameters), block)
                )
                self.path = parameters
            else:
                # Otherwise the string itself is the parameters text.
                self._dict2dict(self.strs2dict(parameters, block))

    def __str__(self) -> str:
        """Overrides method for print(object).

        Returns:
            str: one "<type> key = value" line per item of the object's dictionary.
        """
        return ''.join(
            str(type(val)) + ' ' + str(key) + ' = ' + str(val) + '\n'
            for key, val in self.data.items()
        )

    def _dict2dict(self, dictionary: dict) -> None:
        """Updates or adds dictionary data.

        Args:
            dictionary (dict): dictionary as "key":"value".
        """
        self.data.update(dictionary)

    # pylint: disable=C0206
    def expand(self, store: str = None) -> dict:
        """Expand dictionary "key":"name.conf" to dictionary "key":{subkey: subval}.

        Every value of ``self.data`` is treated as a config file name, the file
        is read and the value is replaced in place by the parsed dictionary.

        Args:
            store (str, optional): path to directory with name.conf. Defaults to None.

        Returns:
            dict: expanded dictionary as "key":{subkey: subval}.
        """
        # Only values are replaced, so iterating over the keys stays safe.
        for key in self.data:
            if store:
                config = store + sep + self.data[key]
            else:
                config = self.data[key]
            with open(config, encoding='UTF-8') as file:
                self.data[key] = Parse(file.read()).data
        return self.data

    @classmethod
    def argv2dict(cls, parser: ArgumentParser) -> dict:
        """Converts startup arguments to a dictionary.

        Args:
            parser (ArgumentParser): argparse.ArgumentParser class object.

        Returns:
            dict: dictionary as "key":"value".
        """
        # Wrapped in a help-less parent parser, then the process argv is parsed.
        parser = ArgumentParser(add_help=False, parents=[parser])
        return vars(parser.parse_args())

    @classmethod
    def conf2strs(cls, config: str) -> str:
        """Reads a config file and drops its comment lines.

        Args:
            config (str): path to the config file.

        Returns:
            str: file text without lines starting with '#',
                 as "var1=val1;\\nvar2=val2;".
        """
        with open(config, encoding='UTF-8') as file:
            raw = file.read()
        return ''.join(
            line + '\n'
            for line in raw.splitlines()
            if not line.lstrip().startswith('#')
        )

    @classmethod
    def strs2dict(cls, strings: str, blockname: str) -> dict:
        """Builds a dictionary from a strings containing parameters.

        Pairs may be separated by ';' or by line breaks. A line that starts
        with '#' is a comment and is skipped as a whole, including anything
        after a ';' on that line; a ';'-separated item starting with '#' is
        skipped too.

        Args:
            strings (str): string as "var1=val1;var2=val2;".
            blockname (str): name of target block from text.

        Returns:
            dict: dictionary as "key":"value".
        """
        dictionary = {}
        if blockname:
            strings = cls.block(blockname, strings)
        for line in strings.split('\n'):
            # Decide about comments per line BEFORE splitting on ';',
            # otherwise "# note; key=val" would leak "key=val".
            if line.lstrip().startswith('#'):
                continue
            for item in line.split(';'):
                if item.lstrip().startswith('#') or '=' not in item:
                    continue
                # Only the first '=' separates key and value.
                key, _, value = item.partition('=')
                dictionary[key.strip()] = value.strip()
        return dictionary

    @classmethod
    def str2bool(cls, value: str) -> bool:
        """Converts a string value to boolean.

        Args:
            value (str): string containing "true" or "false", "yes" or "no", "1" or "0".

        Returns:
            bool: True for "true", "yes" or "1" (case-insensitive), otherwise False.
        """
        return str(value).lower() in ("true", "yes", "1")

    @classmethod
    def block(cls, blockname: str, text: str) -> str:
        """Cuts a block of text between line [blockname] and line [next block] or EOF.

        Args:
            blockname (str): string in [] after which the block starts.
            text (str): string of text from which the block is needed.

        Returns:
            str: string of text between line [block name] and line [next block].
        """
        level = 1
        save = False
        result = ''
        for line in text.splitlines():
            if line.startswith('['):
                # NOTE(review): the header is matched by substring, so
                # "cam1" also opens "[cam10]" - confirm this is intended.
                if blockname in line:
                    # Remember the bracket depth of the opened block.
                    level = line.count('[')
                    save = True
                elif '[' * level in line:
                    # Another header with at least the same depth closes it.
                    save = False
                elif save:
                    result += line + '\n'
            elif save:
                result += line + '\n'
        return result
|
|
|
|
|
|
class Proc:
    """Find and kill running processes from Python using the OS tools.

    Every process is described by a dictionary with the keys
    'execpid', 'exename', 'exepath' and 'cmdline'.
    """

    @classmethod
    def _list_windows(cls) -> list:
        """Find all running process with wmi.

        Returns:
            list: dictionaries with descriptions of found processes.
        """
        execlist = []
        separate = b'\r\r\n'
        wanted = (b'ProcessId', b'Name', b'ExecutablePath', b'CommandLine')
        with Popen(
            [
                'wmic', 'process', 'get',
                'CommandLine,ExecutablePath,Name,ProcessId',
                '/format:list'
            ],
            stdout=PIPE,
            stderr=PIPE
        ) as wmic:
            out, _ = wmic.communicate()
        # Records are separated by an empty line, fields are "Key=Value" lines.
        for record in out.split(separate + separate):
            fields = dict.fromkeys(wanted)
            for subline in record.split(separate):
                # Split on the FIRST '=' only and compare the exact key, so a
                # command line such as "app.exe --Name=x" is neither truncated
                # nor mistaken for the "Name" field.
                key, found, value = subline.partition(b'=')
                key = key.strip()
                if found and key in fields:
                    # NOTE(review): wmic output may use a console code page;
                    # undecodable bytes are replaced instead of raising.
                    fields[key] = value.decode('utf-8', errors='replace')
            if fields[b'ProcessId'] and fields[b'Name']:
                execlist.append(
                    {
                        'execpid': fields[b'ProcessId'],
                        'exename': fields[b'Name'],
                        'exepath': fields[b'ExecutablePath'],
                        'cmdline': fields[b'CommandLine']
                    }
                )
        return execlist

    @classmethod
    def _list_linux(cls) -> list:
        """Find all running process with ps.

        Returns:
            list: dictionaries with descriptions of found processes.
        """
        execlist = []
        with Popen(
            ['/bin/ps', '-eo', 'pid,args'],
            stdout=PIPE,
            stderr=PIPE
        ) as ps_proc:
            out, _ = ps_proc.communicate()
        for line in out.splitlines():
            parts = line.decode('utf-8', errors='replace').split(None, 1)
            # Skip the "PID COMMAND" header and lines without a command.
            if len(parts) < 2 or not parts[0].isdigit():
                continue
            execpid, cmdline = parts
            exepath = cmdline.split()[0]
            exename = path.basename(exepath)
            if execpid and exename:
                execlist.append(
                    {
                        'execpid': execpid,
                        'exename': exename,
                        'exepath': exepath,
                        'cmdline': cmdline
                    }
                )
        return execlist

    @classmethod
    def list_all(cls) -> list:
        """Find all running process.

        Returns:
            list: dictionaries with descriptions of found processes,
                  or None when the platform is not linux, darwin or win32.
        """
        if platform.startswith(('linux', 'darwin')):
            return cls._list_linux()
        if platform.startswith('win32'):
            return cls._list_windows()
        return None

    @classmethod
    def search(
        cls,
        find: str,
        exclude: str = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> list:
        """Find specified processes.

        A process matches when the text is a substring of its pid, name,
        path or command line. Excluding has priority over finding.

        Args:
            find (str): find process pid, name or arguments.
            exclude (str, optional): exclude process pid, name or arguments. Defaults to None.
            logger_alias (str, optional): sublogger name. Defaults to the value of
                inspect.stack()[0].function evaluated when the method is defined.

        Returns:
            list: dictionaries with descriptions of found processes,
                  or None when nothing was found.
        """
        local_logger = logging.getLogger(logger_alias)
        fields = ('execpid', 'exename', 'exepath', 'cmdline')

        def contains(needle, proc) -> bool:
            # Missing (None) fields are treated as empty text so that one
            # incomplete record cannot abort the whole search.
            return any(str(needle) in (proc.get(name) or '') for name in fields)

        processes = cls.list_all()
        if processes is None:
            local_logger.warning(
                msg='ON ' + platform + ' PLATFORM search ERROR: '
                + 'process listing is not supported'
            )
            return None

        proc_found = [
            proc for proc in processes
            if not (exclude and contains(exclude, proc)) and contains(find, proc)
        ]
        # Callers get None (not an empty list) when nothing matched.
        return proc_found or None

    @classmethod
    def kill(cls, pid: int) -> None:
        """Kill the process by means of the OS.

        Args:
            pid (int): process ID.
        """
        command = None
        if platform.startswith(('linux', 'darwin')):
            command = ['kill', '-s', 'SIGKILL', str(pid)]
        elif platform.startswith('win32'):
            command = ['taskkill', '/PID', str(pid), '/F']
        if command:
            # Wait for the short-lived helper so it does not stay a zombie.
            with Popen(command) as killer:
                killer.wait()
|
|
|
|
|
|
class FFmpeg:
    """FFmpeg management from Python.

    Builds ffmpeg/ffprobe command lines, runs them as child processes and can
    watch ffmpeg output to kill a frozen process.
    """

    @classmethod
    def run(
        cls,
        src: (str, type(None)) = None,
        dst: str = None,
        fps: int = None,
        preset: str = None,
        raw: (str, type(None)) = None,
        ffpath: str = None,
        watchdog: bool = False,
        watchsec: int = None,
        onlyonce: bool = False,
        logger_alias: str = inspect.stack()[0].function
    ) -> int:
        """Running the installed ffmpeg.

        Args:
            src (str, type, optional): sources urls, example:
                'rtsp://user:pass@host:554/Streaming/Channels/101, anull'. Defaults to None.
            dst (str, optional): destination url, example: 'rtp://239.0.0.1:5554'. Defaults to None.
            fps (int, optional): frame per second encoding output. Defaults to None.
            preset (str, optional): 240p, 360p, 480p, 720p, 1080p, 1440p, 2160p. Defaults to None.
            raw (str, type, optional): custom ffmpeg parameters string. Defaults to None.
            ffpath (str, optional): custom path to bin, example: /usr/bin/ffmpeg. Defaults to None.
            watchdog (bool, optional): detect ffmpeg freeze and terminate. Defaults to False.
            watchsec (int, optional): seconds to wait before watchdog terminates. Defaults to None.
            onlyonce (bool, optional): detect ffmpeg running copy and terminate. Defaults to False.
            logger_alias (str, optional): sublogger name. Defaults to the value of
                inspect.stack()[0].function evaluated when the method is defined.

        Returns:
            int: ffmpeg return code; 0 when onlyonce found a running copy;
                 127 when the ffmpeg binary was not found.
        """
        local_logger = logging.getLogger(logger_alias)
        ffbin = cls._bin(ffpath)
        if not ffbin:
            # _bin() has already logged where to get ffmpeg.
            return 127

        if raw:
            arguments = raw.split()
        else:
            arguments = (
                cls._src(src).split()
                + cls._preset(preset, fps).split()
                + cls._dst(dst).split()
            )
        # The binary path stays ONE argument: it may contain spaces
        # (e.g. "C:\Program Files\ffmpeg\bin\ffmpeg.exe").
        process = [ffbin] + arguments

        if onlyonce and Proc.search(' '.join(process)):
            local_logger.info(msg='ffmpeg process already exist')
            return 0

        local_logger.info(msg='Starting ' + ' '.join(process))
        with Popen(process, stdout=PIPE, stderr=STDOUT) as proc:
            que = None
            if watchdog:
                que = Queue()
                Process(
                    target=cls._watchdog,
                    args=(proc.pid, watchsec, que,),
                    daemon=True
                ).start()
            # Without a watchdog the output is logged here, otherwise every
            # line is handed over to the watchdog process.
            for line in proc.stdout:
                if que is None:
                    local_logger.debug(msg=line)
                else:
                    que.put(line)
            # End of output does not mean the child is reaped yet.
            proc.wait()
        return proc.returncode

    @classmethod
    def _bin(
        cls,
        ffpath: str,
        tool: str = 'ffmpeg',
        logger_alias: str = inspect.stack()[0].function
    ) -> str:
        """Returns the path to the bin depending on the OS.

        Args:
            ffpath (str): custom path to bin.
            tool (str, optional): 'ffmpeg', 'ffprobe'. Defaults to 'ffmpeg'.
            logger_alias (str, optional): sublogger name. Defaults to the value of
                inspect.stack()[0].function evaluated when the method is defined.

        Returns:
            str: path to bin or None, if path does not exist.
        """
        local_logger = logging.getLogger(logger_alias)
        faq = (
            '\n'
            'Main download page: https://ffmpeg.org/download.html\n'
            '\n'
            'Install on Linux (Debian):\n'
            '\tsudo apt install -y ffmpeg\n'
            '\tTarget: /usr/bin/ffmpeg\n'
            '\n'
            'Install on Windows:\n'
            '\tDownload and extract: https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-full.7z\n'
            '\tTarget: "%PROGRAMFILES%\\ffmpeg\\bin\\ffmpeg.exe"\n'
            '\n'
            'Install on MacOS:\n'
            '\tDownload and extract: https://evermeet.cx/ffmpeg/\n'
            '\tTarget: /usr/bin/ffmpeg\n'
        )
        if not ffpath:
            # Any tool other than 'ffprobe' falls back to ffmpeg.
            binary = 'ffprobe' if tool == 'ffprobe' else 'ffmpeg'
            if platform.startswith(('linux', 'darwin')):
                ffpath = '/usr/bin/' + binary
            elif platform.startswith('win32'):
                ffpath = environ['PROGRAMFILES'] + "\\ffmpeg\\bin\\" + binary + ".exe"
        # ffpath is still empty on an unsupported platform.
        if ffpath and path.exists(ffpath):
            return ffpath
        local_logger.warning(msg='ON ' + platform + ' PLATFORM not found ' + tool + faq)
        return None

    @classmethod
    def _src(cls, sources: str) -> str:
        """Parsing sources into ffmpeg format.

        Args:
            sources (str): comma-separated list of sources in string format.

        Returns:
            str: ffmpeg format input parameters, '' when no sources are given.
        """
        if not sources:
            return ''
        list_sources = []
        for src in sources.split(','):
            src = src.strip()
            if 'null' in src:
                # e.g. "anull": generated by the lavfi virtual device
                prefix = '-f lavfi -i'
            elif 'rtsp' in src:
                prefix = '-rtsp_transport tcp -i'
            else:
                # anything else is looped endlessly at its native rate
                prefix = '-stream_loop -1 -re -i'
            list_sources.append(' '.join([prefix, src]))
        return ' '.join(list_sources)

    @classmethod
    def _preset(cls, choice: str, fps: int) -> str:
        """Parsing preset into ffmpeg format.

        Args:
            choice (str): preset selection.
            fps (int): frame per second encoding output.

        Returns:
            str: ffmpeg format encoding parameters. Without a known preset the
                 video stream is copied, otherwise it is scaled and re-encoded.
        """
        # preset name: (width, height, video bitrate in kbit/s)
        presets = {
            '240p': (426, 240, 480),
            '360p': (640, 360, 720),
            '480p': (854, 480, 1920),
            '720p': (1280, 720, 3960),
            '1080p': (1920, 1080, 5940),
            '1440p': (2560, 1440, 12960),
            '2160p': (3840, 2160, 32400),
        }
        tune = '-tune zerolatency'
        video = '-c:v copy'
        audio = '-c:a aac -b:a 128k'
        if choice in presets:
            width, height, kbps = presets[choice]
            parts = [
                '-vf scale=' + str(width) + ':' + str(height) + ',setsar=1:1',
                '-c:v libx264 -pix_fmt yuv420p -preset ultrafast',
            ]
            if fps:
                # int() so that a value read from a text config also works;
                # the GOP size is two seconds of frames.
                parts += ['-r', str(fps), '-g', str(int(fps) * 2)]
            parts += ['-b:v', str(kbps) + 'k']
            video = ' '.join(parts)
        return ' '.join([tune, video, audio])

    @classmethod
    def _dst(cls, destination: str) -> str:
        """Parsing destination into ffmpeg format.

        Args:
            destination (str): destination path or url.

        Returns:
            str: ffmpeg format destination.
        """
        container = '-f null'
        stdout = '-v debug'  # '-nostdin -nostats'  # '-report'
        if destination:
            # 'rtmp' must be tested first: the text 'rtmp' never contains
            # 'rtp', but keeping the order makes the priority explicit.
            if 'rtmp' in destination:
                container = '-f flv'
            elif "rtp" in destination:
                container = '-f rtp_mpegts'
            # NOTE(review): any other destination keeps '-f null' - confirm.
        else:
            destination = '-'
        return ' '.join([container, destination, stdout])

    @classmethod
    def _watchdog(
        cls,
        pid: int,
        sec: int,
        que: Queue = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """If no data arrives in the queue, kill the process.

        Args:
            pid (int): process ID.
            sec (int): seconds to wait for data.
            que (Queue, optional): queue pointer. Defaults to None.
            logger_alias (str, optional): sublogger name. Defaults to the value of
                inspect.stack()[0].function evaluated when the method is defined.
        """
        local_logger = logging.getLogger(logger_alias)
        if not sec:
            sec = 5
        if que:
            trigger = "Delay between the first packet and last packet in the muxing queue"
            while True:
                while not que.empty():
                    que_str = que.get()
                    local_logger.debug(msg=que_str)
                    # Lines read from the ffmpeg pipe are bytes; decode before
                    # looking for the trigger text.
                    if isinstance(que_str, bytes):
                        text = que_str.decode('utf-8', errors='replace')
                    else:
                        text = str(que_str)
                    if trigger in text:
                        Proc.kill(pid)
                        local_logger.debug(msg='exit by watchdog, reason: ' + trigger)
                sleep(sec)
                if que.empty():
                    # Nothing new for the whole waiting period: ffmpeg is frozen.
                    Proc.kill(pid)
                    reason = "que is empty for " + str(sec) + " seconds"
                    local_logger.debug(msg='exit by watchdog, reason: ' + reason)
                    break
        # Ends the watchdog process, as the original exit() call did.
        raise SystemExit

    @classmethod
    def probe(
        cls,
        target: (str, type(None)) = None,
        raw: (str, type(None)) = None,
        ffpath: str = None
    ) -> (dict, bytes, None):
        """Running the installed ffprobe.

        Args:
            target (str, type, optional): media file path to probe. Defaults to None.
            raw (str, type, optional): custom ffprobe parameters string. Defaults to None.
            ffpath (str, optional): custom path to bin, example: /usr/bin/ffprobe. Defaults to None.

        Returns:
            dict, bytes, None: parsed JSON for a target, raw output bytes for
                raw parameters, None when ffprobe is missing, nothing is given
                to probe or ffprobe exits with an error.
        """
        ffbin = cls._bin(ffpath=ffpath, tool='ffprobe')
        if not ffbin:
            return None
        if raw:
            command = [ffbin] + raw.split()
        elif target:
            # The target stays ONE argument: file paths may contain spaces.
            command = [
                ffbin, '-i', target,
                '-v', 'quiet', '-print_format', 'json',
                '-show_format', '-show_programs', '-show_streams'
            ]
        else:
            return None

        with Popen(command, stdout=PIPE, stderr=STDOUT) as process:
            result = process.communicate()
            if process.returncode != 0:
                return None
            if raw:
                return result[0]
            return json.loads(result[0].decode('utf-8'))
|
|
|
|
|
|
class Connect:
|
|
"""Set of connection methods (functions) for various protocols.
|
|
"""
|
|
@staticmethod
|
|
# pylint: disable=W0102, W0718
|
|
def http(
    url: str,
    method: str = 'GET',
    username: str = '',
    password: str = '',
    authtype: (str, type(None)) = None,
    contenttype: str = 'text/plain',
    contentdata: (str, bytes) = '',
    headers: dict = {},
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Handling HTTP request.

    Args:
        url (str): request url.
        method (str, optional): HTTP request method. Defaults to 'GET'.
        username (str, optional): username for url authentication. Defaults to ''.
        password (str, optional): password for url authentication. Defaults to ''.
        authtype (str, None, optional): digest|basic authentication type. Defaults to None.
        contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'.
        contentdata (str, bytes, optional): content data. Defaults to ''.
        headers (dict, optional): additional headers. Defaults to {}.
            The passed dictionary is not modified.
        logger_alias (str, optional): sublogger name. Defaults to the value of
            inspect.stack()[0].function evaluated when the method is defined.

    Returns:
        dict: {'success':bool,'result':HTTP response or 'ERROR'}.
            The response is text when it decodes as UTF-8, otherwise bytes.
            None is returned when the arguments do not pass Do.args_valid.
    """
    local_logger = logging.getLogger(logger_alias)
    if not Do.args_valid(locals(), Connect.http.__annotations__):
        return None

    # Work on a copy: the default dict is shared between calls, so writing
    # 'Authorization' into it would leak credentials into later requests.
    headers = dict(headers)
    if contentdata != '':
        headers['Content-Type'] = contenttype
    # urllib accepts request data only as bytes.
    if isinstance(contentdata, str):
        contentdata = contentdata.encode('utf-8')

    # Preparing authorization
    handlers = []
    if authtype:
        pswd = urllib.request.HTTPPasswordMgrWithDefaultRealm()
        pswd.add_password(None, url, username, password)
        if authtype == 'basic':
            handlers.append(urllib.request.HTTPBasicAuthHandler(pswd))
            token = base64.b64encode((username + ':' + password).encode())
            headers['Authorization'] = 'Basic ' + token.decode('utf-8')
        elif authtype == 'digest':
            handlers.append(urllib.request.HTTPDigestAuthHandler(pswd))
        # Any other authtype adds no handler (it used to fail on an unbound name).
    # A per-call opener: the credentials are not installed process-wide.
    opener = urllib.request.build_opener(*handlers)

    # Preparing request
    request = urllib.request.Request(
        url=url,
        data=contentdata,
        method=method
    )
    for key, val in headers.items():
        request.add_header(key, val)

    # Only a shortened preview of big payloads goes to the log.
    preview = contentdata
    if len(preview) > 128:
        preview = preview[:64] + b' ... ' + preview[-64:]
    # Secrets are masked in the log.
    shown_headers = dict(headers)
    if 'Authorization' in shown_headers:
        shown_headers['Authorization'] = '***'
    local_logger.debug(msg=''
        + '\n' + 'uri: ' + url
        + '\n' + 'method: ' + method
        + '\n' + 'username: ' + username
        + '\n' + 'password: ' + ('***' if password else '')
        + '\n' + 'authtype: ' + str(authtype)
        + '\n' + 'headers: ' + json.dumps(shown_headers, indent=2)
        + '\n' + 'content-data: ' + str(preview)
    )

    # Response
    try:
        with opener.open(request) as reply:
            response = reply.read()
        try:
            response = str(response.decode('utf-8'))
        except UnicodeDecodeError:
            # Binary payload (e.g. an image) is returned as bytes.
            pass
        return {"success": True, "result": response}
    except Exception as error:
        local_logger.debug(msg='error: ' + '\n' + str(error))
        return {"success": False, "result": "ERROR"}
|
|
|
|
@staticmethod
|
|
# pylint: disable=W0718
|
|
def ftp_file_search(
    root_path: str,
    search_name: (str, type(None)) = None,
    ftp: (FTP, type(None)) = None,
    hostname: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    port: int = 21,
    logger_alias: str = inspect.stack()[0].function
) -> list:
    """Search files over FTP.

    Directories are walked recursively; the connection opened by the first
    call is reused by the nested calls through the ``ftp`` argument.

    Args:
        root_path (str): where to search.
        search_name (str, None, optional): full or partial filename for the filter.
            Defaults to None.
        ftp (FTP, None, optional): FTP object generated by recursive search.
            Defaults to None.
        hostname (str, None, optional): ftp hostname.
            Defaults to None.
        username (str, None, optional): ftp username.
            Defaults to None.
        password (str, None, optional): ftp password.
            Defaults to None.
        port (int, optional): remote host connection port.
            Defaults to 21.
        logger_alias (str, optional): sublogger name. Defaults to the value of
            inspect.stack()[0].function evaluated when the method is defined.

    Returns:
        list: sorted list of found files; an empty list when the connection
            or login failed. None when the arguments do not pass Do.args_valid.
    """
    local_logger = logging.getLogger(logger_alias)
    if not Do.args_valid(locals(), Connect.ftp_file_search.__annotations__):
        return None

    local_logger.debug(msg=''
        + '\n' + 'root_path: ' + root_path
        + '\n' + 'search_name: ' + str(search_name)
        + '\n' + 'ftp: ' + str(ftp)
        + '\n' + 'hostname: ' + str(hostname)
        + '\n' + 'username: ' + str(username)
        + '\n' + 'password: ' + ('***' if password else str(password))
    )

    # parent is True only in the call that opened the connection.
    parent = False
    if ftp is None:
        # FTP() without a host does not connect yet; FTP(host=...) would
        # connect to the default port 21 and ignore ``port``.
        ftp = FTP()
        try:
            ftp.connect(host=hostname, port=port)
            ftp.login(user=username, passwd=password)
            parent = True
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            ftp.close()
            return []

    result = []
    try:
        ftp.cwd(root_path)
        for name, facts in ftp.mlsd():
            kind = facts.get('type')
            if kind == 'dir':
                result = result + Connect.ftp_file_search(
                    root_path=root_path + "/" + name,
                    search_name=search_name,
                    ftp=ftp
                )
            elif kind == 'file':
                # Without a filter every file is taken.
                if not search_name or search_name in name:
                    result.append(root_path + "/" + name)
    finally:
        # The owner of the connection closes it, also when listing failed.
        if parent:
            ftp.close()
    result.sort()
    return result
|
|
|
|
@staticmethod
|
|
# pylint: disable=W0718
|
|
def ftp_get_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 21,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Download file from FTP.

    Args:
        src_file (str): /remote/path/to/file.
        dst_file (str): /local/path/to/file.
        hostname (str): ftp hostname.
        username (str): ftp username.
        password (str): ftp password.
        port (int, optional): remote host connection port. Defaults to 21.
        logger_alias (str, optional): sublogger name. Defaults to the value of
            inspect.stack()[0].function evaluated when the method is defined.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
            None when the arguments do not pass Do.args_valid.
    """
    local_logger = logging.getLogger(logger_alias)
    if not Do.args_valid(locals(), Connect.ftp_get_file.__annotations__):
        return None

    local_logger.debug(msg=''
        + '\n' + 'src_file: ' + src_file
        + '\n' + 'dst_file: ' + dst_file
        + '\n' + 'hostname: ' + hostname
        + '\n' + 'username: ' + username
        + '\n' + 'password: ' + ('***' if password else '')
    )
    # FTP() without a host does not connect yet, so a connection error is
    # raised inside the try block and the custom port is respected.
    ftp = FTP()
    try:
        ftp.connect(host=hostname, port=port)
        ftp.login(user=username, passwd=password)
        with open(dst_file, "wb+") as file:
            ftp.retrbinary(f"RETR {src_file}", file.write)
        ftp.quit()
        local_logger.info(msg=dst_file + ' saved')
        return {"success": True, "result": dst_file}
    except Exception as error:
        local_logger.debug(msg='error: ' + '\n' + str(error))
        return {"success": False, "result": "ERROR"}
    finally:
        # Harmless after quit(); releases the socket after a failure.
        ftp.close()
|
|
|
|
@staticmethod
|
|
# pylint: disable=W0718
|
|
def ftp_put_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 21,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Upload file to FTP.

    Missing directories of the destination path are created one by one.

    Args:
        src_file (str): /local/path/to/file.
        dst_file (str): /remote/path/to/file.
        hostname (str): ftp hostname.
        username (str): ftp username.
        password (str): ftp password.
        port (int, optional): remote host connection port. Defaults to 21.
        logger_alias (str, optional): sublogger name. Defaults to the value of
            inspect.stack()[0].function evaluated when the method is defined.

    Returns:
        dict: {'success':bool,'result':/remote/path/to/file or 'ERROR'}.
            None when the arguments do not pass Do.args_valid.
    """
    local_logger = logging.getLogger(logger_alias)
    if not Do.args_valid(locals(), Connect.ftp_put_file.__annotations__):
        return None

    *dst_path, dst_name = dst_file.split('/')
    local_logger.debug(msg=''
        + '\n' + 'src_file: ' + src_file
        + '\n' + 'dst_file: ' + dst_file
        + '\n' + 'hostname: ' + hostname
        + '\n' + 'username: ' + username
        + '\n' + 'password: ' + ('***' if password else '')
    )
    # FTP() without a host does not connect yet, so a connection error is
    # raised inside the try block and the custom port is respected.
    ftp = FTP()
    try:
        ftp.connect(host=hostname, port=port)
        ftp.login(user=username, passwd=password)
        # An absolute destination is walked from the server root, a relative
        # one from the login directory.
        if dst_file.startswith('/'):
            ftp.cwd('/')
        for path_item in dst_path:
            if path_item.strip() == '':
                continue
            try:
                ftp.cwd(path_item)
            except Exception:
                # The directory does not exist yet: create and enter it.
                ftp.mkd(path_item)
                ftp.cwd(path_item)
        # The target directory is the current one now, so only the file
        # name is stored; repeating the whole path breaks relative paths.
        with open(src_file, "rb") as file:
            ftp.storbinary(f"STOR {dst_name}", file)
        ftp.quit()
        local_logger.info(msg='ftp://' + hostname + dst_file + ' saved')
        return {"success": True, "result": 'ftp://' + hostname + dst_file}
    except Exception as error:
        local_logger.debug(msg='error: ' + '\n' + str(error))
        return {"success": False, "result": "ERROR"}
    finally:
        # Harmless after quit(); releases the socket after a failure.
        ftp.close()
|
|
|
|
@staticmethod
|
|
# pylint: disable=W0718
|
|
def ssh_commands(
    command: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 22,
    logger_alias: str = inspect.stack()[0].function
) -> str:
    """Handling SSH command executing.

    Args:
        command (str): command for executing.
        hostname (str): remote hostname or ip address.
        username (str): remote host username.
        password (str): remote host password.
        port (int, optional): remote host connection port. Defaults to 22.
        logger_alias (str, optional): sublogger name. Defaults to the value of
            inspect.stack()[0].function evaluated when the method is defined.

    Returns:
        str: terminal response or 'ERROR'.
            None when the arguments do not pass Do.args_valid.
    """
    local_logger = logging.getLogger(logger_alias)
    if not Do.args_valid(locals(), Connect.ssh_commands.__annotations__):
        return None

    client = SSHClient()
    # NOTE(review): AutoAddPolicy trusts unknown host keys without asking.
    client.set_missing_host_key_policy(AutoAddPolicy())
    local_logger.debug(msg=''
        + '\n' + 'host: ' + hostname + ':' + str(port)
        + '\n' + 'user: ' + username
        + '\n' + 'pass: ' + ('***' if password else '')
        + '\n' + 'command: ' + command
    )
    try:
        client.connect(hostname=hostname, username=username, password=password, port=port)
        stdin, stdout, stderr = client.exec_command(command=command, get_pty=True)
        if 'sudo' in command:
            # Answer the sudo password prompt on the pseudo-terminal.
            stdin.write(password + '\n')
            stdin.flush()
        stdout.flush()
        data = stdout.read() + stderr.read()
        return data.decode('utf-8')
    except Exception as error:
        local_logger.debug(msg='error: ' + '\n' + str(error))
        return 'ERROR'
    finally:
        # Close the connection on success and on failure.
        client.close()
|
|
|
|
@staticmethod
|
|
# pylint: disable=W0718
|
|
def ssh_file_search(
    root_path: str,
    search_name: (str, type(None)) = None,
    sftp: (SFTPClient, type(None)) = None,
    hostname: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    port: int = 22,
    logger_alias: str = inspect.stack()[0].function
) -> list:
    """Search files over SFTP.

    Directories are walked recursively; the session opened by the first call
    is reused by the nested calls through the ``sftp`` argument.

    Args:
        root_path (str): where to search.
        search_name (str, None, optional): full or partial filename for the filter.
            Defaults to None.
        sftp (SFTPClient, None, optional): SFTP object generated by recursive search.
            Defaults to None.
        hostname (str, None, optional): sftp hostname.
            Defaults to None.
        username (str, None, optional): sftp username.
            Defaults to None.
        password (str, None, optional): sftp password.
            Defaults to None.
        port (int, optional): remote host connection port.
            Defaults to 22.
        logger_alias (str, optional): sublogger name. Defaults to the value of
            inspect.stack()[0].function evaluated when the method is defined.

    Returns:
        list: sorted list of found files; an empty list when the connection
            failed. None when the arguments do not pass Do.args_valid.
    """
    local_logger = logging.getLogger(logger_alias)
    if not Do.args_valid(locals(), Connect.ssh_file_search.__annotations__):
        return None

    local_logger.debug(msg=''
        + '\n' + 'root_path: ' + root_path
        + '\n' + 'search_name: ' + str(search_name)
        + '\n' + 'sftp: ' + str(sftp)
        + '\n' + 'hostname: ' + str(hostname)
        + '\n' + 'username: ' + str(username)
        + '\n' + 'password: ' + ('***' if password else str(password))
    )

    # parent is True only in the call that opened the connection.
    parent = False
    client = None
    if sftp is None:
        client = SSHClient()
        try:
            # NOTE(review): AutoAddPolicy trusts unknown host keys without asking.
            client.set_missing_host_key_policy(AutoAddPolicy())
            client.connect(
                hostname=hostname,
                username=username,
                password=password,
                port=port
            )
            sftp = client.open_sftp()
            parent = True
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            client.close()
            return []

    result = []
    try:
        for file in sftp.listdir(root_path):
            try:
                # Every entry is tried as a directory first ...
                result = result + Connect.ssh_file_search(
                    root_path=root_path + '/' + file,
                    search_name=search_name,
                    sftp=sftp
                )
            except IOError:
                # ... listing it failed, so the entry is handled as a file.
                if not search_name or search_name in file:
                    result.append(root_path + '/' + file)
    finally:
        # The owner of the connection closes it, also when listing failed.
        if parent:
            client.close()
    result.sort()
    return result
|
|
|
|
@staticmethod
|
|
# pylint: disable=W0718
|
|
def ssh_get_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 22,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Download file from SFTP.

    After a failure the local destination file is removed.

    Args:
        src_file (str): /remote/path/to/file.
        dst_file (str): /local/path/to/file.
        hostname (str): sftp hostname.
        username (str): sftp username.
        password (str): sftp password.
        port (int, optional): remote host connection port. Defaults to 22.
        logger_alias (str, optional): sublogger name. Defaults to the value of
            inspect.stack()[0].function evaluated when the method is defined.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
            None when the arguments do not pass Do.args_valid.
    """
    local_logger = logging.getLogger(logger_alias)
    if not Do.args_valid(locals(), Connect.ssh_get_file.__annotations__):
        return None

    client = SSHClient()
    # NOTE(review): AutoAddPolicy trusts unknown host keys without asking.
    client.set_missing_host_key_policy(AutoAddPolicy())
    local_logger.debug(msg=''
        + '\n' + 'src_file: ' + src_file
        + '\n' + 'dst_file: ' + dst_file
        + '\n' + 'host: ' + hostname + ':' + str(port)
        + '\n' + 'user: ' + str(username)
        + '\n' + 'pass: ' + ('***' if password else str(password))
    )
    try:
        client.connect(hostname=hostname, username=username, password=password, port=port)
        with client.open_sftp() as sftp:
            sftp.get(remotepath=src_file, localpath=dst_file)
        local_logger.info(msg=dst_file + ' saved')
        return {"success": True, "result": dst_file}
    except Exception as error:
        local_logger.debug(msg='error: ' + '\n' + str(error))
        # The file may not exist at all (e.g. the connection failed);
        # a failed cleanup must not hide the 'ERROR' result.
        try:
            remove(dst_file)
        except OSError:
            pass
        return {"success": False, "result": "ERROR"}
    finally:
        # Close the connection on success and on failure.
        client.close()
|
|
|
|
@staticmethod
# pylint: disable=W0718
def ssh_put_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 22,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Upload file to a remote host over SFTP.

    The remote parent directory is created with 'mkdir -p' before the upload;
    the directory name is shell-quoted and the command is awaited so that the
    upload cannot start before the directory exists. The SSH connection and
    the SFTP session are always closed before returning.

    Args:
        src_file (str): /local/path/to/file.
        dst_file (str): /remote/path/to/file.
        hostname (str): remote hostname or ip address.
        username (str): remote host username.
        password (str): remote host password.
        port (int, optional): remote host connection port. Defaults to 22.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        dict: {'success':bool,'result':/remote/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.ssh_put_file.__annotations__):
        # Imported here (after argument validation) because the module header
        # does not import shlex.
        from shlex import quote  # pylint: disable=C0415

        client = SSHClient()
        client.set_missing_host_key_policy(AutoAddPolicy())
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'host: ' + hostname + ':' + str(port)
            + '\n' + 'user: ' + username
            + '\n' + 'pass: ' + password
        )
        try:
            client.connect(hostname=hostname, username=username, password=password, port=port)
            remote_dir = path.dirname(dst_file)
            if remote_dir:
                _stdin, stdout, _stderr = client.exec_command('mkdir -p ' + quote(remote_dir))
                # exec_command returns immediately; block until mkdir is done.
                stdout.channel.recv_exit_status()
            with client.open_sftp() as sftp:
                sftp.put(localpath=src_file, remotepath=dst_file)
                # Raises if the uploaded file is not visible on the remote side.
                sftp.stat(dst_file)
            local_logger.info(msg='sftp://' + hostname + dst_file + ' saved')
            return {"success": True, "result": 'sftp://' + hostname + dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
        finally:
            # Release the connection on both the success and the failure path.
            client.close()
|
|
|
|
@staticmethod
# pylint: disable=W0718
def smb_file_search(
    root_path: str,
    search_name: (str, type(None)) = None,
    smb: (smbclient._pool.ClientConfig, type(None)) = None,
    hostname: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    port: int = 445,
    logger_alias: str = inspect.stack()[0].function
) -> list:
    """Search files over SMB.

    Every entry of root_path is searched recursively; an entry whose listing
    raises IOError is treated as a file and kept when it matches search_name.
    The recursion reuses the caller's port and logger.

    Args:
        root_path (str): where to search.
        search_name (str, None, optional): full or partial filename for the filter.
            Defaults to None.
        smb (smbclient.ClientConfig, None, optional): object generated by recursive search.
            Defaults to None.
        hostname (str, None, optional): smb hostname.
            Defaults to None.
        username (str, None, optional): smb username.
            Defaults to None.
        password (str, None, optional): smb password.
            Defaults to None.
        port (int, optional): remote host connection port.
            Defaults to 445.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        list: sorted list of found files.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.smb_file_search.__annotations__):
        local_logger.debug(msg=''
            + '\n' + 'root_path: ' + root_path
            + '\n' + 'search_name: ' + str(search_name)
            + '\n' + 'smb: ' + str(smb)
            + '\n' + 'hostname: ' + str(hostname)
            + '\n' + 'username: ' + str(username)
            + '\n' + 'password: ' + str(password)
        )
        # Only the outermost call configures the client and therefore owns
        # the connection cache reset at the end.
        parent = False
        if not smb:
            try:
                smb = smbclient.ClientConfig(username=username, password=password)
                parent = True
            except Exception as error:
                local_logger.debug(msg='error: ' + '\n' + str(error))

        result = []
        for file in smbclient.listdir(path='//' + hostname + root_path, port=port):
            try:
                result = result + Connect.smb_file_search(
                    root_path=root_path + '/' + file,
                    search_name=search_name,
                    hostname=hostname,
                    smb=smb,
                    # Without the port the nested listing would silently fall
                    # back to 445 on hosts that use a non-default port.
                    port=port,
                    logger_alias=logger_alias
                )
            except IOError:
                # Listing failed: handle the entry as a plain file.
                if not search_name or search_name in file:
                    result.append(root_path + '/' + file)

        if parent:
            smbclient.reset_connection_cache()
        result.sort()
        return result
|
|
|
|
@staticmethod
# pylint: disable=W0718
def smb_get_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 445,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Download file from SMB.

    The file is copied in chunks instead of being loaded into memory as a
    whole. When the copy is interrupted, the incomplete local file is removed.

    Args:
        src_file (str): /remote/path/to/file.
        dst_file (str): /local/path/to/file.
        hostname (str): smb hostname.
        username (str): smb username.
        password (str): smb password.
        port (int, optional): remote host connection port. Defaults to 445.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.smb_get_file.__annotations__):
        # Imported here (after argument validation) because the module header
        # imports only copyfile from shutil.
        from shutil import copyfileobj  # pylint: disable=C0415

        smbclient.ClientConfig(username=username, password=password)
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'host: ' + hostname + ':' + str(port)
            + '\n' + 'user: ' + str(username)
            + '\n' + 'pass: ' + str(password)
        )
        src_file = '//' + hostname + src_file
        # True only while dst_file is opened by this call and not yet complete.
        partial_file = False
        try:
            with smbclient.open_file(src_file, mode="rb", port=port) as remote_file:
                with open(dst_file, "wb+") as local_file:
                    partial_file = True
                    copyfileobj(remote_file, local_file)
            partial_file = False
            smbclient.reset_connection_cache()
            local_logger.info(msg=dst_file + ' saved')
            return {"success": True, "result": dst_file}
        except Exception as error:
            if partial_file:
                # Do not leave a truncated copy that looks like a valid file.
                try:
                    remove(dst_file)
                except OSError as cleanup_error:
                    local_logger.debug(msg='error: ' + '\n' + str(cleanup_error))
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
|
|
|
|
@staticmethod
# pylint: disable=W0718
def smb_put_file(
    src_file: str,
    dst_file: str,
    hostname: str,
    username: str,
    password: str,
    port: int = 445,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Upload file to SMB.

    Missing remote parent directories are created (all levels) on the same
    port as the upload, and the file is copied in chunks instead of being
    loaded into memory as a whole.

    Args:
        src_file (str): /local/path/to/file.
        dst_file (str): /remote/path/to/file.
        hostname (str): smb hostname.
        username (str): smb username.
        password (str): smb password.
        port (int, optional): remote host connection port. Defaults to 445.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        dict: {'success':bool,'result':/remote/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.smb_put_file.__annotations__):
        # Imported here (after argument validation) because the module header
        # imports only copyfile from shutil.
        from shutil import copyfileobj  # pylint: disable=C0415

        smbclient.ClientConfig(username=username, password=password)
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
            + '\n' + 'host: ' + hostname + ':' + str(port)
            + '\n' + 'user: ' + str(username)
            + '\n' + 'pass: ' + str(password)
        )
        dst_file = '//' + hostname + dst_file
        try:
            remote_dir = path.dirname(dst_file)
            # The port is passed to every SMB call so that the directory check
            # and creation reach the same endpoint as the upload itself.
            if not smbclient.path.exists(remote_dir, port=port):
                smbclient.makedirs(remote_dir, exist_ok=True, port=port)
            with smbclient.open_file(dst_file, mode="wb+", port=port) as remote_file:
                with open(src_file, "rb") as local_file:
                    copyfileobj(local_file, remote_file)
            smbclient.reset_connection_cache()
            local_logger.info(msg='smb:' + dst_file + ' saved')
            return {"success": True, "result": 'smb:' + dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
|
|
|
|
@staticmethod
# pylint: disable=W0612
def local_file_search(
    root_path: str,
    search_name: (str, type(None)) = None,
    logger_alias: str = inspect.stack()[0].function
) -> list:
    """Collect files below a local directory.

    Args:
        root_path (str): directory that is walked bottom-up.
        search_name (str, None, optional): substring a filename must contain
            to be kept; every file is kept when it is empty or None.
            Defaults to None.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        list: sorted list of found files, each joined to the real path of
            its directory.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.local_file_search.__annotations__):
        local_logger.debug(msg='\n'.join((
            '',
            'root_path: ' + root_path,
            'search_name: ' + str(search_name),
        )))
        found = []
        for folder, _subdirs, names in walk(root_path, topdown=False):
            found.extend(
                path.join(path.realpath(folder), name)
                for name in names
                if not search_name or search_name in name
            )
        return sorted(found)
|
|
|
|
@staticmethod
# pylint: disable=W0718
def local_get_file(
    src_file: str,
    dst_file: str,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Like download local file. Interface plug.

    Copies src_file to dst_file, creating the destination directory first
    when dst_file has a directory part.

    Args:
        src_file (str): /local/path/to/src/file.
        dst_file (str): /local/path/to/dst/file.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.local_get_file.__annotations__):
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
        )
        try:
            dst_dir = path.dirname(dst_file)
            # A bare filename has an empty dirname and makedirs('') raises,
            # so only create the directory when there is one to create.
            if dst_dir:
                makedirs(dst_dir, exist_ok=True)
            copyfile(src=src_file, dst=dst_file)
            return {"success": True, "result": dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
|
|
|
|
@staticmethod
# pylint: disable=W0718
def local_put_file(
    src_file: str,
    dst_file: str,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Like upload local file. Interface plug.

    Copies src_file to dst_file, creating the destination directory first
    when dst_file has a directory part.

    Args:
        src_file (str): /local/path/to/src/file.
        dst_file (str): /local/path/to/dst/file.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.local_put_file.__annotations__):
        local_logger.debug(msg=''
            + '\n' + 'src_file: ' + src_file
            + '\n' + 'dst_file: ' + dst_file
        )
        try:
            dst_dir = path.dirname(dst_file)
            # A bare filename has an empty dirname and makedirs('') raises,
            # so only create the directory when there is one to create.
            if dst_dir:
                makedirs(dst_dir, exist_ok=True)
            copyfile(src=src_file, dst=dst_file)
            return {"success": True, "result": dst_file}
        except Exception as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
            return {"success": False, "result": "ERROR"}
|
|
|
|
@staticmethod
def parse_connect_params(
    connect_string: str,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """One line connection string separator.

    A string without '://' is returned as a local path (hosttype 'local').
    Otherwise the part between '://' and the next '/' is split into
    credentials, host and port:
      - the host starts after the LAST '@', so the password may contain '@';
      - the password starts after the FIRST ':' of the credentials, so it may
        contain ':'; credentials without ':' yield password None.
    A '/' inside the credentials is not supported: it ends the host part.

    Args:
        connect_string (str): hosttype://username:password@hostname:hostport/some/path.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        dict: {
            'hostpath': remote_hostpath,
            'hostname': remote_hostname,
            'hostport': remote_hostport,
            'hosttype': remote_hosttype,
            'username': remote_username,
            'password': remote_password
        }

    Raises:
        ValueError: the text after the host's ':' is not an integer.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), Connect.parse_connect_params.__annotations__):
        remote_hostpath = None
        remote_hostname = None
        remote_hostport = None
        remote_hosttype = None
        remote_username = None
        remote_password = None
        if '://' in connect_string:
            remote_hosttype, _, remainder = connect_string.partition('://')
            authority, slash, tail = remainder.partition('/')
            remote_hostpath = slash + tail
            credentials, at_sign, remote_hostname = authority.rpartition('@')
            if at_sign:
                remote_username, colon, secret = credentials.partition(':')
                if colon:
                    remote_password = secret
            if ':' in remote_hostname:
                remote_hostname, _, port_text = remote_hostname.partition(':')
                remote_hostport = int(port_text)
        else:
            remote_hostpath = connect_string
            remote_hosttype = 'local'
        if not remote_hostport:
            # Well-known ports of the supported protocols; other host types
            # keep the port as parsed.
            default_ports = {'ftp': 21, 'sftp': 22, 'smb': 445}
            remote_hostport = default_ports.get(remote_hosttype, remote_hostport)

        local_logger.debug(msg=''
            + '\n' + 'hostpath: ' + str(remote_hostpath)
            + '\n' + 'hostname: ' + str(remote_hostname)
            + '\n' + 'hostport: ' + str(remote_hostport)
            + '\n' + 'hosttype: ' + str(remote_hosttype)
            + '\n' + 'username: ' + str(remote_username)
            + '\n' + 'password: ' + str(remote_password)
        )
        return {
            'hostpath': remote_hostpath,
            'hostname': remote_hostname,
            'hostport': remote_hostport,
            'hosttype': remote_hosttype,
            'username': remote_username,
            'password': remote_password
        }
|
|
|
|
@classmethod
def file_search(
    cls,
    search_path: str,
    search_name: (str, type(None)) = None,
    hostname: (str, type(None)) = None,
    hostport: (int, type(None)) = None,
    hosttype: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    logger_alias: str = inspect.stack()[0].function
) -> list:
    """Interface for file search over FTP, SFTP, SMB.

    Connection parameters are parsed from search_path; every explicitly
    passed (truthy) keyword argument overrides the parsed value. Any host
    type other than 'ftp', 'sftp' or 'smb' is searched on the local
    filesystem, using the path part of search_path like the download and
    upload interfaces do.

    Args:
        search_path (str): where to search.
            search_path = 'hosttype://username:password@hostname:hostport/some/path'
            has lower priority and parameters are overwritten by
            search_path='/some/path', hostname='hostname', username='username', etc.
        search_name (str, None, optional): full or partial filename for the filter.
            Defaults to None.
        hostname (str, None, optional): remote hostname.
            Defaults to None.
        hostport (int, None, optional): remote host connection port.
            Defaults to None.
        hosttype (str, None, optional): 'ftp', 'sftp', 'smb'.
            Defaults to None.
        username (str, None, optional): remote username.
            Defaults to None.
        password (str, None, optional): remote password.
            Defaults to None.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        list: list of found files.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), cls.file_search.__annotations__):
        local_logger.debug(msg=''
            + '\n' + 'search_path: ' + search_path
            + '\n' + 'search_name: ' + str(search_name)
            + '\n' + 'hostname: ' + str(hostname)
            + '\n' + 'hostport: ' + str(hostport)
            + '\n' + 'hosttype: ' + str(hosttype)
            + '\n' + 'username: ' + str(username)
            + '\n' + 'password: ' + str(password)
        )

        connect = cls.parse_connect_params(connect_string=search_path)
        remote_hostpath = connect['hostpath']
        # Explicit arguments win over the values parsed from search_path.
        remote_hostname = hostname or connect['hostname']
        remote_hostport = hostport or connect['hostport']
        remote_hosttype = hosttype or connect['hosttype']
        remote_username = username or connect['username']
        remote_password = password or connect['password']

        remote_search = {
            'ftp': cls.ftp_file_search,
            'sftp': cls.ssh_file_search,
            'smb': cls.smb_file_search,
        }
        if remote_hosttype in remote_search:
            files_found = remote_search[remote_hosttype](
                root_path=remote_hostpath,
                search_name=search_name,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        else:
            # For a plain local path the parsed path equals search_path.
            files_found = cls.local_file_search(
                root_path=remote_hostpath,
                search_name=search_name,
                logger_alias=logger_alias
            )

        local_logger.debug(msg='files_found:' + '\n' + str(files_found))
        return files_found
|
|
|
|
@classmethod
def file_download(
    cls,
    src_file: str,
    dst_file: str,
    hostname: (str, type(None)) = None,
    hostport: (int, type(None)) = None,
    hosttype: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Download a file through the FTP, SFTP, SMB or local backend.

    Connection parameters are parsed from src_file; every explicitly passed
    (truthy) keyword argument overrides the parsed value. Any host type other
    than 'ftp', 'sftp' or 'smb' is handled as a local copy.

    Args:
        src_file (str):
            src_file = 'hosttype://username:password@hostname:hostport/remote/path/to/file.'
            has lower priority and parameters are overwritten by
            src_file='/remote/path/to/file', hostname='hostname', username='username', etc.
        dst_file (str): /local/path/to/file.
        hostname (str, None, optional): remote hostname.
            Defaults to None.
        hostport (int, None, optional): remote host connection port.
            Defaults to None.
        hosttype (str, None, optional): 'ftp', 'sftp', 'smb'.
            Defaults to None.
        username (str, None, optional): remote username.
            Defaults to None.
        password (str, None, optional): remote password.
            Defaults to None.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        dict: {'success':bool,'result':/local/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), cls.file_download.__annotations__):
        local_logger.debug(msg='\n'.join((
            '',
            'src_file: ' + src_file,
            'dst_file: ' + dst_file,
            'hostname: ' + str(hostname),
            'hostport: ' + str(hostport),
            'hosttype: ' + str(hosttype),
            'username: ' + str(username),
            'password: ' + str(password),
        )))

        connect = cls.parse_connect_params(connect_string=src_file)
        remote_hostpath = connect['hostpath']
        # Explicit arguments win over the values parsed from src_file.
        remote_hostname = hostname or connect['hostname']
        remote_hostport = hostport or connect['hostport']
        remote_hosttype = hosttype or connect['hosttype']
        remote_username = username or connect['username']
        remote_password = password or connect['password']

        remote_getters = {
            'ftp': cls.ftp_get_file,
            'sftp': cls.ssh_get_file,
            'smb': cls.smb_get_file,
        }
        if remote_hosttype in remote_getters:
            result = remote_getters[remote_hosttype](
                src_file=remote_hostpath,
                dst_file=dst_file,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        else:
            result = cls.local_get_file(
                src_file=remote_hostpath,
                dst_file=dst_file,
                logger_alias=logger_alias
            )

        local_logger.debug(msg='result: ' + '\n' + str(result))
        return result
|
|
|
|
@classmethod
def file_upload(
    cls,
    src_file: str,
    dst_file: str,
    hostname: (str, type(None)) = None,
    hostport: (int, type(None)) = None,
    hosttype: (str, type(None)) = None,
    username: (str, type(None)) = None,
    password: (str, type(None)) = None,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Upload a file through the FTP, SFTP, SMB or local backend.

    Connection parameters are parsed from dst_file; every explicitly passed
    (truthy) keyword argument overrides the parsed value. Any host type other
    than 'ftp', 'sftp' or 'smb' is handled as a local copy.

    Args:
        src_file (str): /local/path/to/file.
        dst_file (str):
            dst_file = 'hosttype://username:password@hostname:hostport/remote/path/to/file.'
            has lower priority and parameters are overwritten by
            dst_file='/remote/path/to/file', hostname='hostname', username='username', etc.
        hostname (str, None, optional): remote hostname.
            Defaults to None.
        hostport (int, None, optional): remote host connection port.
            Defaults to None.
        hosttype (str, None, optional): 'ftp', 'sftp', 'smb'.
            Defaults to None.
        username (str, None, optional): remote username.
            Defaults to None.
        password (str, None, optional): remote password.
            Defaults to None.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        dict: {'success':bool,'result':/remote/path/to/file or 'ERROR'}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), cls.file_upload.__annotations__):
        local_logger.debug(msg='\n'.join((
            '',
            'src_file: ' + src_file,
            'dst_file: ' + dst_file,
            'hostname: ' + str(hostname),
            'hostport: ' + str(hostport),
            'hosttype: ' + str(hosttype),
            'username: ' + str(username),
            'password: ' + str(password),
        )))

        connect = cls.parse_connect_params(connect_string=dst_file)
        remote_hostpath = connect['hostpath']
        # Explicit arguments win over the values parsed from dst_file.
        remote_hostname = hostname or connect['hostname']
        remote_hostport = hostport or connect['hostport']
        remote_hosttype = hosttype or connect['hosttype']
        remote_username = username or connect['username']
        remote_password = password or connect['password']

        remote_putters = {
            'ftp': cls.ftp_put_file,
            'sftp': cls.ssh_put_file,
            'smb': cls.smb_put_file,
        }
        if remote_hosttype in remote_putters:
            result = remote_putters[remote_hosttype](
                src_file=src_file,
                dst_file=remote_hostpath,
                hostname=remote_hostname,
                username=remote_username,
                password=remote_password,
                port=remote_hostport,
                logger_alias=logger_alias
            )
        else:
            result = cls.local_put_file(
                src_file=src_file,
                dst_file=remote_hostpath,
                logger_alias=logger_alias
            )

        local_logger.debug(msg='result: ' + '\n' + str(result))
        return result
|
|
|
|
|
|
class HikISAPI(Connect):
|
|
"""Representing Hikvision device with ISAPI.
|
|
The class inherits the necessary connection methods of the Connect class
|
|
"""
|
|
def __init__(
    self,
    hostname: str,
    username: str, userpass: str,
    authtype: str = 'digest',
    hostport: int = 80, protocol: str = 'http',
    channel: int = 101, videoid: int = 1
) -> None:
    """Store the connection settings of one camera.

    Args:
        hostname (str): camera hostname or ip address.
        username (str): camera admin username.
        userpass (str): camera admin password.
        authtype (str, optional): digest|basic camera authentication type. Defaults to 'digest'.
        hostport (int, optional): camera connection port. Defaults to 80.
        protocol (str, optional): camera connection protocol. Defaults to 'http'.
        channel (int, optional): camera channel id. Defaults to 101.
        videoid (int, optional): camera video id. Defaults to 1.
    """
    # Where the camera is reached.
    self._prot, self._host, self._port = protocol, hostname, hostport
    # How requests are authenticated.
    self._user, self._pswd, self._auth = username, userpass, authtype
    # Which channel and video ids are put into request URLs.
    self._chan, self._viid = channel, videoid
|
|
|
|
def __call(
    self,
    url: str, method: str = 'GET',
    contenttype: str = 'application/x-www-form-urlencoded',
    contentdata: str = ''
) -> str:
    """Send one request to the camera with the stored credentials.

    Args:
        url (str): full request URL.
        method (str, optional): HTTP request method. Defaults to 'GET'.
        contenttype (str, optional): Content-Type header.
            Defaults to 'application/x-www-form-urlencoded'.
        contentdata (str, optional): data sent with the request. Defaults to ''.

    Returns:
        str: the 'result' of the http helper when it reports success,
            otherwise 'ERROR'.
    """
    reply = self.http(
        url=url,
        method=method,
        username=self._user,
        password=self._pswd,
        authtype=self._auth,
        contenttype=contenttype,
        contentdata=contentdata
    )
    return reply['result'] if reply['success'] else 'ERROR'
|
|
|
|
def capabilities(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Request the PTZ capabilities of the camera and log them.

    Args:
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        bool: False when the request helper returned 'ERROR', otherwise True
            after the response was logged at the INFO level.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = self._prot + '://' + self._host + ':' + str(self._port)
    url = endpoint + "/ISAPI/PTZCtrl/channels/" + str(self._viid) + "/capabilities"
    response = self.__call(url=url, method='GET')
    if response == 'ERROR':
        return False
    local_logger.info(msg='\n' + response + '\n')
    return True
|
|
|
|
def downloadjpeg(
    self,
    dst_file: str = path.splitext(__file__)[0] + '.jpeg',
    x: int = 1920,
    y: int = 1080,
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """Get static picture from camera.

    The picture is requested first and dst_file is opened only after the
    request succeeded, so a failed request neither creates an empty file nor
    truncates an existing one.

    Args:
        dst_file (str, optional): abs picture's path to save. Defaults to scriptname+'.jpeg'.
        x (int, optional): picture width. Defaults to 1920.
        y (int, optional): picture height. Defaults to 1080.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True if the picture was saved, False when the request helper
            returned 'ERROR'.
    """
    local_logger = logging.getLogger(logger_alias)
    url = (
        self._prot + '://' + self._host + ':' + str(self._port)
        + "/Streaming/channels/" + str(self._viid)
        + "/picture?snapShotImageType=JPEG&videoResolutionWidth="
        + str(x) + "&videoResolutionHeight=" + str(y)
    )
    response = self.__call(url=url, method='GET')
    if response == 'ERROR':
        return False
    # NOTE(review): the file is opened in binary mode, so the response is
    # presumably bytes for this endpoint - confirm against the http helper.
    with open(dst_file, "wb") as file:
        file.write(response)
    local_logger.info(msg=dst_file + ' saved')
    return True
|
|
|
|
def getcamerapos(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Request the PTZ status of the camera and log it.

    Args:
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        bool: False when the request helper returned 'ERROR', otherwise True
            after the response was logged at the INFO level.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = self._prot + '://' + self._host + ':' + str(self._port)
    url = endpoint + "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/status"
    response = self.__call(url=url, method='GET')
    if response == 'ERROR':
        return False
    local_logger.info(msg='\n' + response + '\n')
    return True
|
|
|
|
def rebootcamera(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Send the reboot command to the camera.

    Args:
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        bool: False when the request helper returned 'ERROR', otherwise True
            after the response was logged at the DEBUG level.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = self._prot + '://' + self._host + ':' + str(self._port)
    url = endpoint + "/ISAPI/System/reboot"
    response = self.__call(url=url, method="PUT")
    if response == 'ERROR':
        return False
    local_logger.debug(msg='\n' + response + '\n')
    return True
|
|
|
|
def setptzmovyyu(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start tilting the camera up (PTZ command TILT_UP, mode start).

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        bool: False when the request helper returned 'ERROR', otherwise True
            after the response was logged at the DEBUG level.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = self._prot + '://' + self._host + ':' + str(self._port)
    query = "/PTZControl?command=TILT_UP&speed=" + str(speed) + "&mode=start"
    url = endpoint + "/PTZ/channels/" + str(self._viid) + query
    response = self.__call(url=url, method='GET')
    if response == 'ERROR':
        return False
    local_logger.debug(msg='\n' + response + '\n')
    return True
|
|
|
|
def setptzmovyyd(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start tilting the camera down (PTZ command TILT_DOWN, mode start).

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        bool: False when the request helper returned 'ERROR', otherwise True
            after the response was logged at the DEBUG level.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = self._prot + '://' + self._host + ':' + str(self._port)
    query = "/PTZControl?command=TILT_DOWN&speed=" + str(speed) + "&mode=start"
    url = endpoint + "/PTZ/channels/" + str(self._viid) + query
    response = self.__call(url=url, method='GET')
    if response == 'ERROR':
        return False
    local_logger.debug(msg='\n' + response + '\n')
    return True
|
|
|
|
def setptzmovxxl(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start panning the camera left (PTZ command PAN_LEFT, mode start).

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        bool: False when the request helper returned 'ERROR', otherwise True
            after the response was logged at the DEBUG level.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = self._prot + '://' + self._host + ':' + str(self._port)
    query = "/PTZControl?command=PAN_LEFT&speed=" + str(speed) + "&mode=start"
    url = endpoint + "/PTZ/channels/" + str(self._viid) + query
    response = self.__call(url=url, method='GET')
    if response == 'ERROR':
        return False
    local_logger.debug(msg='\n' + response + '\n')
    return True
|
|
|
|
def setptzmovxxr(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start panning the camera right (PTZ command PAN_RIGHT, mode start).

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name. The default is computed
            once, at definition time, from inspect.stack()[0].function.

    Returns:
        bool: False when the request helper returned 'ERROR', otherwise True
            after the response was logged at the DEBUG level.
    """
    local_logger = logging.getLogger(logger_alias)
    endpoint = self._prot + '://' + self._host + ':' + str(self._port)
    query = "/PTZControl?command=PAN_RIGHT&speed=" + str(speed) + "&mode=start"
    url = endpoint + "/PTZ/channels/" + str(self._viid) + query
    response = self.__call(url=url, method='GET')
    if response == 'ERROR':
        return False
    local_logger.debug(msg='\n' + response + '\n')
    return True
|
|
|
|
def setptzmovzzi(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start camera zoom in.

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True unless the request helper answered 'ERROR'.
    """
    log = logging.getLogger(logger_alias)
    # NOTE(review): this "zoom in" method sends command=ZOOM_OUT (and the
    # "zoom out" sibling sends ZOOM_IN). Possibly a deliberate workaround for
    # the device's inverted naming - confirm against the camera before changing.
    endpoint = ''.join([
        self._prot, '://', self._host, ':', str(self._port),
        "/PTZ/channels/", str(self._viid),
        "/PTZControl?command=ZOOM_OUT&speed=", str(speed),
        "&mode=start",
    ])
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    log.debug(msg='\n' + answer + '\n')
    return True
|
|
|
|
def setptzmovzzo(self, speed: int = 1, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Start camera zoom out.

    Args:
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True unless the request helper answered 'ERROR'.
    """
    log = logging.getLogger(logger_alias)
    # NOTE(review): this "zoom out" method sends command=ZOOM_IN (and the
    # "zoom in" sibling sends ZOOM_OUT). Possibly a deliberate workaround for
    # the device's inverted naming - confirm against the camera before changing.
    endpoint = ''.join([
        self._prot, '://', self._host, ':', str(self._port),
        "/PTZ/channels/", str(self._viid),
        "/PTZControl?command=ZOOM_IN&speed=", str(speed),
        "&mode=start",
    ])
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    log.debug(msg='\n' + answer + '\n')
    return True
|
|
|
|
def setptzpreset(
    self,
    preset: int,
    speed: int = 1,
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """Send the GOTO_PRESET start command for a saved preset.

    Args:
        preset (int): saved preset number.
        speed (int, optional): moving speed from 1 to 7. Defaults to 1.
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True unless the request helper answered 'ERROR'.
    """
    log = logging.getLogger(logger_alias)
    endpoint = ''.join([
        self._prot, '://', self._host, ':', str(self._port),
        "/PTZ/channels/", str(self._viid),
        "/PTZControl?command=GOTO_PRESET&presetNo=", str(preset),
        "&speed=", str(speed),
        "&mode=start",
    ])
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    log.debug(msg='\n' + answer + '\n')
    return True
|
|
|
|
def setptztostop(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """Stop any camera moving.

    The request reuses the GOTO_PRESET command name with mode=stop.

    Args:
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True unless the request helper answered 'ERROR'.
    """
    log = logging.getLogger(logger_alias)
    endpoint = ''.join([
        self._prot, '://', self._host, ':', str(self._port),
        "/PTZ/channels/", str(self._viid),
        "/PTZControl?command=GOTO_PRESET&mode=stop",
    ])
    answer = self.__call(url=endpoint, method='GET')
    if answer == 'ERROR':
        return False
    log.debug(msg='\n' + answer + '\n')
    return True
|
|
|
|
def setcamerapos(
    self,
    x: int = 0,
    y: int = 0,
    z: int = 0,
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """PUT an absolute pan/tilt/zoom target to the camera.

    Args:
        x (int, optional): horizontal camera position from 0 to 3600. Defaults to 0.
        y (int, optional): vertical camera position from -900 to 2700. Defaults to 0.
        z (int, optional): zoom camera position from 0 to 1000. Defaults to 0.
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True unless the request helper answered 'ERROR'.
    """
    log = logging.getLogger(logger_alias)
    endpoint = ''.join([
        self._prot, '://', self._host, ':', str(self._port),
        "/ISAPI/PTZCtrl/channels/", str(self._chan),
        "/absolute",
    ])
    # x maps to <azimuth>, y to <elevation>, z to <absoluteZoom>.
    payload = ''.join([
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<PTZData><AbsoluteHigh>',
        '<elevation>', str(y), '</elevation>',
        '<azimuth>', str(x), '</azimuth>',
        '<absoluteZoom>', str(z), '</absoluteZoom>',
        '</AbsoluteHigh></PTZData>',
    ])
    answer = self.__call(url=endpoint, method="PUT", contenttype="text/xml", contentdata=payload)
    if answer == 'ERROR':
        return False
    log.debug(msg='\n' + answer + '\n')
    return True
|
|
|
|
def __setcameramovcon(
    self,
    x: int = 0,
    y: int = 0,
    z: int = 0,
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """PUT a continuous-move request (moves until another signal or 180 seconds elapse).

    Args:
        x (int, optional): acceleration of horizontal camera movement from -100 to 100.
            Defaults to 0.
        y (int, optional): acceleration of vertical camera movement from -100 to 100.
            Defaults to 0.
        z (int, optional): acceleration of zoom camera movement from -100 to 100.
            Defaults to 0.
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True unless the request helper answered 'ERROR'.
    """
    log = logging.getLogger(logger_alias)
    endpoint = ''.join([
        self._prot, '://', self._host, ':', str(self._port),
        "/ISAPI/PTZCtrl/channels/", str(self._chan),
        "/continuous",
    ])
    payload = ''.join([
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<PTZData>',
        '<pan>', str(x), '</pan>',
        '<tilt>', str(y), '</tilt>',
        '<zoom>', str(z), '</zoom>',
        '</PTZData>',
    ])
    answer = self.__call(url=endpoint, method="PUT", contenttype="text/xml", contentdata=payload)
    if answer == 'ERROR':
        return False
    log.debug(msg='\n' + answer + '\n')
    return True
|
|
|
|
def __setcameramovmom(
    self,
    x: int = 0,
    y: int = 0,
    z: int = 0,
    t: int = 180000,
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """PUT a momentary-move request and block for its duration.

    Args:
        x (int, optional): acceleration of horizontal camera movement from -100 to 100.
            Defaults to 0.
        y (int, optional): acceleration of vertical camera movement from -100 to 100.
            Defaults to 0.
        z (int, optional): acceleration of zoom camera movement from -100 to 100.
            Defaults to 0.
        t (int, optional): duration in ms of acceleration from 0 to 180000.
            Defaults to 180000.
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True unless the request helper answered 'ERROR'.
    """
    log = logging.getLogger(logger_alias)
    endpoint = ''.join([
        self._prot, '://', self._host, ':', str(self._port),
        "/ISAPI/PTZCtrl/channels/", str(self._chan),
        "/momentary",
    ])
    payload = ''.join([
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<PTZData>',
        '<pan>', str(x), '</pan>',
        '<tilt>', str(y), '</tilt>',
        '<zoom>', str(z), '</zoom>',
        '<Momentary><duration>', str(t), '</duration></Momentary>',
        '</PTZData>',
    ])
    answer = self.__call(url=endpoint, method="PUT", contenttype="text/xml", contentdata=payload)
    if answer == 'ERROR':
        return False
    # Wait out the requested duration (t is milliseconds) before reporting success.
    sleep(t/1000)
    log.debug(msg='\n' + answer + '\n')
    return True
|
|
|
|
def setcameramov(self, x: int = 0, y: int = 0, z: int = 0, t: int = 0) -> bool:
    """Dispatch a move request to the continuous or the momentary variant.

    A duration of '-' or 0 selects the continuous variant; anything else
    selects the momentary one. All values are coerced with int() before use.

    Args:
        x (int, optional): acceleration of horizontal camera movement from -100 to 100.
            Defaults to 0.
        y (int, optional): acceleration of vertical camera movement from -100 to 100.
            Defaults to 0.
        z (int, optional): acceleration of zoom camera movement from -100 to 100.
            Defaults to 0.
        t (int, optional): duration in ms of acceleration from 0 to 180000.
            Defaults to 0.

    Returns:
        bool: True if successed.
    """
    timed = not (t == '-' or int(t) == 0)
    if timed:
        return self.__setcameramovmom(x=int(x), y=int(y), z=int(z), t=int(t))
    return self.__setcameramovcon(x=int(x), y=int(y), z=int(z))
|
|
|
|
def setmovtohome(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """PUT a 'homeposition/goto' request so the camera returns to its home position.

    Args:
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True unless the request helper answered 'ERROR'.
    """
    log = logging.getLogger(logger_alias)
    endpoint = ''.join([
        self._prot, '://', self._host, ':', str(self._port),
        "/ISAPI/PTZCtrl/channels/", str(self._chan),
        "/homeposition/goto",
    ])
    # The request carries an empty PTZData element.
    payload = '<?xml version="1.0" encoding="UTF-8"?>' + '<PTZData></PTZData>'
    answer = self.__call(url=endpoint, method="PUT", contenttype="text/xml", contentdata=payload)
    if answer == 'ERROR':
        return False
    log.debug(msg='\n' + answer + '\n')
    return True
|
|
|
|
def setposashome(self, logger_alias: str = inspect.stack()[0].function) -> bool:
    """PUT a 'homeposition' request to save the current position as home.

    Args:
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True unless the request helper answered 'ERROR'.
    """
    log = logging.getLogger(logger_alias)
    endpoint = ''.join([
        self._prot, '://', self._host, ':', str(self._port),
        "/ISAPI/PTZCtrl/channels/", str(self._chan),
        "/homeposition",
    ])
    # The request carries an empty PTZData element.
    payload = '<?xml version="1.0" encoding="UTF-8"?>' + '<PTZData></PTZData>'
    answer = self.__call(url=endpoint, method="PUT", contenttype="text/xml", contentdata=payload)
    if answer == 'ERROR':
        return False
    log.debug(msg='\n' + answer + '\n')
    return True
|
|
|
|
def settextonosd(
    self,
    enabled: str = "true",
    x: int = 0,
    y: int = 0,
    message: str = "",
    logger_alias: str = inspect.stack()[0].function
) -> bool:
    """Set message as video overlay text.

    The message is XML-escaped before being embedded in the request body, so
    text containing '&', '<' or '>' no longer produces a malformed document.

    Args:
        enabled (str, optional): true or false. Defaults to "true".
        x (int, optional): horizontal text position from 0 to video width. Defaults to 0.
        y (int, optional): vertical text position from 0 to video height. Defaults to 0.
        message (str, optional): overlay text content; '-' is treated as empty.
            Defaults to "".
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        bool: True unless the request helper answered 'ERROR'.
    """
    # Local import keeps this block independent of the file-level import list.
    from xml.sax.saxutils import escape

    local_logger = logging.getLogger(logger_alias)
    # '-' is the placeholder for "no text".
    if message == '-':
        message = ""
    url = (
        self._prot + '://' + self._host + ':' + str(self._port)
        + "/ISAPI/System/Video/inputs/channels/" + str(self._chan)
        + "/overlays/text"
    )
    xml = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        + '<TextOverlayList version="1.0" xmlns="http://www.hikvision.com/ver10/XMLSchema">'
        + '<TextOverlay>'
        + '<id>1</id>'
        + '<enabled>' + enabled + '</enabled>'
        + '<posX>' + str(x) + '</posX>'
        + '<posY>' + str(y) + '</posY>'
        # Escape markup characters so free text cannot break the XML.
        + '<displayText>' + escape(message) + '</displayText>'
        + '</TextOverlay>'
        + '</TextOverlayList>'
    )
    response = self.__call(url=url, method="PUT", contenttype="text/xml", contentdata=xml)
    if response != 'ERROR':
        local_logger.debug(msg='\n' + response + '\n')
        return True
    return False
|
|
|
|
|
|
class Sensor(Connect):
    """Representing sensor connected to remote host.

    The class inherits the necessary connection methods of the Connect class
    (it calls self.ssh_commands to poll the sensor).
    """
    def __init__(
        self,
        hostname: str, username: str, userpass: str,
        nodetype: str, nodename: str,
        hostport: int = 22
    ) -> None:
        """Object constructor.

        Args:
            hostname (str): sensor's remote host hostname or ip address.
            username (str): sensor's remote host username.
            userpass (str): sensor's remote host password.
            nodetype (str): 'ds18b20' or other sensor type.
            nodename (str): 28-1a2b3c4d5e6f (ds18b20 example).
            hostport (int, optional): sensor's remote host connection port. Defaults to 22.
        """
        self._host = hostname
        self._port = hostport
        self._user = username
        self._pswd = userpass
        self._type = nodetype
        self._node = nodename

    def __call(self, command: str) -> str:
        """Send request to sensor's remote host.

        Args:
            command (str): command to poll the sensor.

        Returns:
            str: sensor's remote host response content.
        """
        return self.ssh_commands(
            command=command,
            hostname=self._host, port=self._port,
            username=self._user, password=self._pswd
        )

    # pylint: disable=W0718
    def __temperature(self, nodename: str, logger_alias: str = inspect.stack()[0].function) -> str:
        """Preparating request for ds18b20 sensor type.

        Args:
            nodename (str): 28-1a2b3c4d5e6f (ds18b20 example).
            logger_alias (str, optional): sublogger name. The default is computed once,
                at definition time, from inspect.stack()[0].function.

        Returns:
            str: formatted string with temperature in Celsius, or 'ERROR'.
        """
        local_logger = logging.getLogger(logger_alias)
        command = 'cat /sys/bus/w1/devices/' + nodename + '/temperature'
        response = self.__call(command=command)
        # The password is masked: credentials must not end up in log files.
        local_logger.debug(msg=''
            + '\n' + 'host: ' + self._host + ':' + str(self._port)
            + '\n' + 'user: ' + self._user
            + '\n' + 'pass: ' + '********'
            + '\n' + 'command: ' + command
        )
        if response != 'ERROR':
            try:
                # The raw reading is divided by 1000 with floor division,
                # so fractional degrees are dropped (rounded down).
                temperature = str(int(response)//1000) + "'C"
                return temperature
            except Exception as error:
                local_logger.debug(msg='error: ' + '\n' + str(error))
                return 'ERROR'
        return 'ERROR'

    def value(self) -> str:
        """Public method to get sensor value.

        Returns:
            str: sensor value for the 'ds18b20' type; None for any other type,
                because no other sensor type is handled here.
        """
        if self._type == 'ds18b20':
            return self.__temperature(nodename=self._node)
        return None
|
|
|
|
|
|
class Wordpress(Connect):
    """Set of methods (functions) for Wordpress API.

    Reference: https://developer.wordpress.org/rest-api/reference/

    Args:
        Connect (_type_): class with 'http' method.
    """
    def __init__(
        self,
        hostname: str,
        username: str,
        password: str
    ):
        """Object constructor.

        Args:
            hostname (str): www.wordpress.site.
            username (str): wordpress username.
            password (str): wordpress password.
        """
        if Do.args_valid(locals(), self.__init__.__annotations__):
            self._host = hostname
            self._user = username
            self._pass = password
            self.api_event = 'https://' + self._host + '/wp-json/tribe/events/v1/events'
            self.api_media = 'https://' + self._host + '/wp-json/wp/v2/media'
            self.api_pages = 'https://' + self._host + '/wp-json/wp/v2/pages'
            self.url_files = 'https://' + self._host + '/wp-content/uploads'

    @staticmethod
    def _pretty(raw) -> str:
        """Return an API payload pretty-printed as JSON, or as plain text if it is not JSON."""
        try:
            return json.dumps(json.loads(raw), indent=2)
        except (TypeError, ValueError):
            return str(raw)

    def event_create(
        self,
        title: str,
        slug: str,
        date_start: str,
        date_end: str,
        date_publish: str = None,
        all_day: bool = True,
        description: str = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Create event by 'wp-json' and 'The Events Calendar'.

        Args:
            title (str): event title.
            slug (str): event slug.
            date_start (str): '2022-12-31T23:59:59' format.
            date_end (str): '2022-12-31T23:59:59' format.
            date_publish (str, optional): '2022-12-31T23:59:59' format.
                Defaults to None, which means the local time of the call.
            all_day (bool, optional): all day event duration flag. Defaults to True.
            description (str, optional): event body. Defaults to None.
            logger_alias (str, optional): sublogger name. The default is computed once,
                at definition time, from inspect.stack()[0].function.

        Raises:
            ValueError: date formate is not 2022-12-31T23:59:59.
            ValueError: description can't be empty.

        Returns:
            dict: {'success':bool,'result':'http/url/to/event'}.
        """
        local_logger = logging.getLogger(logger_alias)
        # A default written in the signature is evaluated once, when the method
        # is defined; resolve "now" per call instead.
        if date_publish is None:
            date_publish = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
        if Do.args_valid(locals(), self.event_create.__annotations__):
            pattern = "^([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})+$"
            if (
                not re.fullmatch(pattern, date_start) or
                not re.fullmatch(pattern, date_end) or
                not re.fullmatch(pattern, date_publish)
            ):
                raise ValueError("date formate is not 2022-12-31T23:59:59")
            if description == '':
                raise ValueError("description can't be empty")

            event_json = {
                "title": title,
                "slug": slug,
                "date": date_publish,
                "start_date": date_start,
                "end_date": date_end,
                "all_day": str(all_day),
                "description": description
            }
            response = self.http(
                url=self.api_event,
                method='POST',
                username=self._user,
                password=self._pass,
                authtype='basic',
                contenttype='application/json; charset=UTF-8',
                contentdata=json.dumps(event_json, indent=2)
            )
            # _pretty tolerates non-JSON bodies, so a failed request is logged
            # and reported below instead of raising while formatting the log.
            local_logger.debug(msg=""
                + "\n" + "event API response: "
                + "\n" + self._pretty(response['result'])
            )
            if response['success']:
                created = json.loads(response['result'])
                if "url" in created:
                    local_logger.info('event created: %s', created["url"])
                    return {"success": True, "result": created["url"]}
            # Reached on a failed request or when the reply carries no 'url'.
            local_logger.warning("event didn't create")
            return {"success": False, "result": "ERROR"}

    def media_search(
        self,
        media_name: (str, type(None)) = None,
        media_type: (str, type(None)) = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Search uploaded media by 'wp-json'.

        Args:
            media_name (str, None, optional): results matching a string. Defaults to None.
            media_type (str, None, optional): application,image,video,audio,text.
                Any other value is ignored. Defaults to None.
            logger_alias (str, optional): sublogger name. The default is computed once,
                at definition time, from inspect.stack()[0].function.

        Returns:
            dict: {'success':bool,'result':['list/of/link/to/media']}
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), self.media_search.__annotations__):
            url = self.api_media + '?per_page=100'
            if media_name:
                # NOTE(review): media_name is appended verbatim (no URL encoding);
                # confirm callers never pass spaces or '&'.
                url = url + '&search=' + media_name
            if media_type in ('application', 'image', 'video', 'audio', 'text'):
                url = url + '&media_type=' + media_type

            response = self.http(url=url, method='GET')
            local_logger.debug(msg=""
                + "\n" + "media API response: "
                + "\n" + self._pretty(response['result'])
            )
            if response['success']:
                media_list = [
                    media['guid']['rendered']
                    for media in json.loads(response['result'])
                ]
                return {"success": True, "result": media_list}
            local_logger.warning("media didn't list")
            return {"success": False, "result": "ERROR"}

    def media_upload(
        self,
        mediafile: str,
        mediatype: str,
        aliasfile: str = '',
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Upload media by 'wp-json'.

        Args:
            mediafile (str): path to file.
            mediatype (str): 'image/jpeg', 'video/mp4', etc.
            aliasfile (str, optional): uploaded media name. Defaults to original file.
            logger_alias (str, optional): sublogger name. The default is computed once,
                at definition time, from inspect.stack()[0].function.

        Raises:
            FileExistsError: mediafile is not exist.

        Returns:
            dict: {'success':bool,'result':'http/url/to/media'}.
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), self.media_upload.__annotations__):
            if not path.exists(mediafile):
                # The exception type is kept as-is: callers may already catch it.
                raise FileExistsError(mediafile + " is not exist")
            with open(mediafile, mode='rb') as file:
                mediadata = file.read()
            if aliasfile == '':
                aliasfile = path.basename(mediafile)

            response = self.http(
                url=self.api_media,
                method='POST',
                username=self._user,
                password=self._pass,
                authtype='basic',
                contenttype=mediatype,
                contentdata=mediadata,
                headers={
                    "Accept": "application/json",
                    'Content-Disposition': 'attachment; filename=' + aliasfile,
                    'Cache-Control': 'no-cache'
                }
            )
            local_logger.debug(msg=""
                + "\n" + "media API response: "
                + "\n" + self._pretty(response['result'])
            )
            if response['success']:
                uploaded = json.loads(response['result'])
                if "source_url" in uploaded:
                    local_logger.info('media uploaded: %s', uploaded["source_url"])
                    return {"success": True, "result": uploaded["source_url"]}
            # Reached on a failed request or when the reply carries no 'source_url'.
            local_logger.warning("media didn't upload")
            return {"success": False, "result": "ERROR"}

    def pages_read(
        self,
        page_id: int,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Read page by 'wp-json'.

        Args:
            page_id (int): unique identifier for the page.
            logger_alias (str, optional): sublogger name. The default is computed once,
                at definition time, from inspect.stack()[0].function.

        Returns:
            dict: {'success':bool,'result':'page data'}
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), self.pages_read.__annotations__):
            page_link = self.api_pages + '/' + str(page_id)
            response = self.http(url=page_link)
            if response['success']:
                local_logger.debug(msg=""
                    + "\n" + "wp page API response: "
                    + "\n" + self._pretty(response['result'])
                )
                return {"success": True, "result": response['result']}
            local_logger.warning("wp page didn't read")
            return {"success": False, "result": "ERROR"}

    def pages_update(
        self,
        page_id: int,
        content: str,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Update page by 'wp-json'.

        Args:
            page_id (int): unique identifier for the page.
            content (str): the content for the page.
            logger_alias (str, optional): sublogger name. The default is computed once,
                at definition time, from inspect.stack()[0].function.

        Returns:
            dict: {'success':bool,'result':'http/url/to/page'}
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), self.pages_update.__annotations__):
            page_link = self.api_pages + '/' + str(page_id)
            page_json = {
                "content": content
            }
            response = self.http(
                url=page_link,
                method='POST',
                username=self._user,
                password=self._pass,
                authtype='basic',
                contenttype='application/json; charset=UTF-8',
                contentdata=json.dumps(page_json)
            )
            local_logger.debug(msg=""
                + "\n" + "wp page API response: "
                + "\n" + self._pretty(response['result'])
            )
            if response['success']:
                updated = json.loads(response['result'])
                if "link" in updated:
                    local_logger.info(
                        msg="wp page " + str(page_id) + " updated: " + updated["link"]
                    )
                    return {"success": True, "result": updated["link"]}
            # Reached on a failed request or when the reply carries no 'link'.
            local_logger.warning("wp page didn't update")
            return {"success": False, "result": "ERROR"}
|
|
|
|
|
|
class Telegram:
|
|
"""Set of methods (functions) for Telegram Bot API.
|
|
Reference: https://core.telegram.org/bots/api#available-methods
|
|
"""
|
|
def __init__(self, token: str):
    """Store the bot token and derive the API base URLs from it.

    Args:
        token (str): Telegram Bot API access token.
    """
    # Attributes are only set when the argument check passes.
    if not Do.args_valid(locals(), self.__init__.__annotations__):
        return
    self._token = token
    self.api_root = 'https://api.telegram.org'
    self.api_path = self.api_root + '/bot' + self._token
|
|
|
|
def send_message(
    self,
    chat: str,
    text: str,
    parse_mode: str = 'HTML',
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """POST a silent text message to the 'sendMessage' endpoint.

    A chat given as '<chat>_<thread>' is split on '_': the first part becomes
    chat_id and the second message_thread_id.

    Args:
        chat (str): unique identifier for the target chat or username of the target channel.
        text (str): text of the message to be sent, 1-4096 characters after entities parsing.
        parse_mode (str, optional): 'HTML', 'Markdown', 'MarkdownV2'. Defaults to 'HTML'.
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        dict: {'success': True, 'result': decoded JSON} on HTTP 200, otherwise
            {'success': False, 'result': the raw response object}.
    """
    local_logger = logging.getLogger(logger_alias)
    if not Do.args_valid(locals(), self.send_message.__annotations__):
        return None
    payload = {
        "chat_id": chat,
        "text": text,
        "parse_mode": parse_mode,
        "disable_notification": True
    }
    if '_' in chat:
        pieces = chat.split('_')
        payload["chat_id"] = pieces[0]
        payload["message_thread_id"] = pieces[1]
    reply = requests.post(url=self.api_path + '/sendMessage', json=payload, timeout=15)
    if reply.status_code != 200:
        local_logger.warning(msg="message didn't send to telegram chat " + str(chat))
        return {'success': False, 'result': reply}
    reply = reply.json()
    local_logger.info(
        msg="message '" + str(reply['result']['message_id'])
        + "' sent to telegram chat " + str(chat)
    )
    return {'success': True, 'result': reply}
|
|
|
|
def delete_message(
    self,
    chat: str,
    message_id: int,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """POST a request to the 'deleteMessage' endpoint.

    Args:
        chat (str): unique identifier for the target chat or username of the target channel.
        message_id (int): identifier of the message to delete.
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Returns:
        dict: {'success': True, 'result': decoded JSON} on HTTP 200, otherwise
            {'success': False, 'result': the raw response object}.
    """
    local_logger = logging.getLogger(logger_alias)
    if not Do.args_valid(locals(), self.delete_message.__annotations__):
        return None
    # NOTE(review): unlike send_message, a '<chat>_<thread>' value is passed
    # through unsplit here - confirm callers give a bare chat id.
    reply = requests.post(
        url=self.api_path + '/deleteMessage',
        json={"chat_id": chat, "message_id": message_id},
        timeout=15
    )
    if reply.status_code != 200:
        local_logger.warning(
            msg="message '" + str(message_id) + "' didn't deleted from telegram chat "
            + str(chat)
        )
        return {'success': False, 'result': reply}
    reply = reply.json()
    local_logger.info(
        msg="message '" + str(message_id) + "' deleted from telegram chat "
        + str(chat)
    )
    return {'success': True, 'result': reply}
|
|
|
|
def __send_media(
    self,
    chat: str,
    media_path: str,
    media_type: str,
    caption: (str, type(None)),
    parse_mode: str,
    disable_notification: bool,
    additional_url_param: (str, type(None)),
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Send media by api.telegram.org.

    Remote sources (http://, https://, file_id=) are passed by reference in
    the query string; anything else is opened as a local file and uploaded.

    Args:
        chat (str): unique identifier for the target chat or username of the target channel.
            A '<chat>_<thread>' value is split into chat id and message thread id.
        media_path (str): /local/path/to/file, https://url/to/file, file_id=EXISTFILEID.
        media_type (str): 'document', 'photo', 'video', 'audio'.
        caption (str, None): media caption less 1024 characters.
        parse_mode (str): caption 'HTML', 'Markdown', 'MarkdownV2' parse mode.
        disable_notification (bool): send silently.
        additional_url_param (str, None): example: '&duration=30&width=960&height=540'.
        logger_alias (str, optional): sublogger name. The default is computed once,
            at definition time, from inspect.stack()[0].function.

    Raises:
        ValueError: "'media_type' value is wrong"

    Returns:
        dict: {'success': True, 'result': decoded JSON} on HTTP 200, otherwise
            {'success': False, 'result': the raw response object}.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), self.__send_media.__annotations__):
        if media_type not in ('document', 'photo', 'video', 'audio'):
            raise ValueError("'media_type' value is wrong: " + media_type)

        # Initialised up front: a chat without '_' has no thread, and the name
        # must still exist for the check below.
        thrd = None
        if '_' in chat:
            thrd = chat.split('_')[1]
            chat = chat.split('_')[0]
        url = self.api_path + '/send' + media_type + '?chat_id=' + chat
        if thrd:
            url = url + '&message_thread_id=' + thrd

        # NOTE(review): caption is appended verbatim (no URL encoding) - confirm
        # callers never pass '&' or '#' in captions.
        if caption:
            url = url + '&caption=' + caption + '&parse_mode=' + parse_mode
        if disable_notification:
            url = url + "&disable_notification=True"
        if additional_url_param:
            url = url + additional_url_param

        if re.match("^(?:http://|https://|file_id=)", media_path):
            media_path = media_path.replace('file_id=', '')
            response = requests.post(
                url=url + "&" + media_type + "=" + media_path,
                timeout=60
            )
        else:
            # Context manager closes the file handle once the upload finishes.
            with open(media_path, "rb") as media_file:
                response = requests.post(
                    url=url,
                    files={media_type: media_file},
                    timeout=60
                )

        if response.status_code == 200:
            response = response.json()
            # For 'photo' the result holds a list; the last entry's file_id is
            # logged. Other media types hold a single object.
            if media_type == 'photo':
                file_id = response['result'][media_type][-1]['file_id']
            else:
                file_id = response['result'][media_type]['file_id']
            local_logger.info(msg=""
                + media_type
                + " '"
                + str(file_id)
                + "' sent to telegram chat "
                + chat
            )
            return {'success': True, 'result': response}

        local_logger.warning(
            msg=media_type + " " + media_path + " didn't send to telegram chat " + str(chat)
        )
        return {'success': False, 'result': response}
|
|
|
|
def send_document(
    self,
    chat: str,
    document: str,
    caption: (str, type(None)) = None,
    parse_mode: str = 'HTML',
    disable_notification: bool = True
) -> dict:
    """Send document by delegating to self.__send_media() with media_type 'document'.

    Args:
        chat (str): target chat, forwarded unchanged.
        document (str): forwarded as media_path.
        caption (str, None, optional): forwarded unchanged. Defaults to None.
        parse_mode (str, optional): forwarded unchanged. Defaults to 'HTML'.
        disable_notification (bool, optional): forwarded unchanged. Defaults to True.

    Returns:
        dict: whatever self.__send_media() returns.
    """
    if not Do.args_valid(locals(), self.send_document.__annotations__):
        return None
    return self.__send_media(
        chat=chat,
        media_path=document,
        media_type='document',
        caption=caption,
        parse_mode=parse_mode,
        disable_notification=disable_notification,
        additional_url_param=None
    )
|
|
|
|
def send_photo(
    self,
    chat: str,
    photo: str,
    caption: (str, type(None)) = None,
    parse_mode: str = 'HTML',
    disable_notification: bool = True
) -> dict:
    """Send photo by delegating to self.__send_media() with media_type 'photo'.

    Args:
        chat (str): target chat, forwarded unchanged.
        photo (str): forwarded as media_path.
        caption (str, None, optional): forwarded unchanged. Defaults to None.
        parse_mode (str, optional): forwarded unchanged. Defaults to 'HTML'.
        disable_notification (bool, optional): forwarded unchanged. Defaults to True.

    Returns:
        dict: whatever self.__send_media() returns.
    """
    if not Do.args_valid(locals(), self.send_photo.__annotations__):
        return None
    return self.__send_media(
        chat=chat,
        media_path=photo,
        media_type='photo',
        caption=caption,
        parse_mode=parse_mode,
        disable_notification=disable_notification,
        additional_url_param=None
    )
|
|
|
|
def send_video(
    self,
    chat: str,
    video: str,
    width: (int, type(None)) = None,
    height: (int, type(None)) = None,
    duration: (int, type(None)) = None,
    caption: (str, type(None)) = None,
    parse_mode: str = 'HTML',
    disable_notification: bool = True
) -> dict:
    """Send video by delegating to self.__send_media() with media_type 'video'.

    Truthy width/height/duration values are turned into extra query
    parameters ('&width=..', '&height=..', '&duration=..', in that order).

    Args:
        chat (str): target chat, forwarded unchanged.
        video (str): forwarded as media_path.
        width (int, None, optional): video width. Defaults to None.
        height (int, None, optional): video height. Defaults to None.
        duration (int, None, optional): video duration. Defaults to None.
        caption (str, None, optional): forwarded unchanged. Defaults to None.
        parse_mode (str, optional): forwarded unchanged. Defaults to 'HTML'.
        disable_notification (bool, optional): forwarded unchanged. Defaults to True.

    Returns:
        dict: whatever self.__send_media() returns.
    """
    if not Do.args_valid(locals(), self.send_video.__annotations__):
        return None
    extras = ''.join(
        prefix + str(amount)
        for prefix, amount in (
            ('&width=', width),
            ('&height=', height),
            ('&duration=', duration),
        )
        if amount
    )
    return self.__send_media(
        chat=chat,
        media_path=video,
        media_type='video',
        caption=caption,
        parse_mode=parse_mode,
        disable_notification=disable_notification,
        # An empty string means nothing was supplied; pass None in that case.
        additional_url_param=extras or None
    )
|
|
|
|
def send_audio(
    self,
    chat: str,
    audio: str,
    caption: (str, type(None)) = None,
    parse_mode: str = 'HTML',
    disable_notification: bool = True
) -> dict:
    """Send audio by delegating to self.__send_media() with media_type 'audio'.

    Args:
        chat (str): target chat, forwarded unchanged.
        audio (str): forwarded as media_path.
        caption (str, None, optional): forwarded unchanged. Defaults to None.
        parse_mode (str, optional): forwarded unchanged. Defaults to 'HTML'.
        disable_notification (bool, optional): forwarded unchanged. Defaults to True.

    Returns:
        dict: whatever self.__send_media() returns.
    """
    if not Do.args_valid(locals(), self.send_audio.__annotations__):
        return None
    return self.__send_media(
        chat=chat,
        media_path=audio,
        media_type='audio',
        caption=caption,
        parse_mode=parse_mode,
        disable_notification=disable_notification,
        additional_url_param=None
    )
|
|
|
|
def send_mediagroup(
    self,
    chat: str,
    media: dict,
    caption: (str, type(None)) = None,
    parse_mode: str = 'HTML',
    disable_notification: bool = True,
    logger_alias: str = inspect.stack()[0].function
) -> dict:
    """Send media group of photo, video, audio, documents.

    Args:
        chat (str): unique identifier for the target chat or username of the target channel.
            A value like 'chatid_threadid' is split on '_' into chat_id and message_thread_id.
        media (dict): {
            name:{'type':'photo',path:'https://url/to/file',caption:text},
            name:{'type':'video',path:'/local/path/to/file',caption:text},
            name:{'type':'audio',path:'file_id=EXISTFILEID',caption:text},
            }.
            The 'caption' key of an item is optional.
        caption (str, None, optional): group caption, attached to the first item.
            When set, the per-item captions are not sent. Defaults to None.
        parse_mode (str): caption 'HTML', 'Markdown', 'MarkdownV2' parse mode.
            Applied to the group caption only.
        disable_notification (bool, optional): send silently. Defaults to True.
        logger_alias (str, optional): sublogger name. Defaults to function or method name.

    Raises:
        TypeError: raised by Do.args_valid() when an argument type does not match its annotation.
        OSError: a local media file cannot be opened.

    Returns:
        dict: {'success':bool,'result':response}.
            'result' is the decoded JSON on HTTP 200, otherwise the raw response object.
    """
    local_logger = logging.getLogger(logger_alias)
    if Do.args_valid(locals(), self.send_mediagroup.__annotations__):
        url = self.api_path + '/sendMediaGroup'
        files = {}
        group = []

        for media_name, media_item in media.items():
            media_path = media_item['path']
            if re.match("^(?:http://|https://|file_id=)", media_path):
                # Remote url or already uploaded file: nothing to attach.
                files[media_name] = None
                media_source = media_path.replace('file_id=', '')
            else:
                with open(media_path, mode='rb') as file:
                    files[media_name] = file.read()
                media_source = "attach://" + media_name

            # Items without a 'caption' key are valid, so the key is read with .get().
            media_caption = ''
            if not caption:
                media_caption = media_item.get('caption') or ''

            group.append(
                {
                    "type": media_item['type'],
                    "media": media_source,
                    "caption": media_caption
                }
            )

        # An empty group has no first item to carry the caption.
        if caption and group:
            group[0]['caption'] = caption
            group[0]['parse_mode'] = parse_mode

        data = {
            'chat_id': chat,
            'media': json.dumps(group),
            "disable_notification": disable_notification
        }
        if '_' in chat:
            chat_parts = chat.split('_')
            data["chat_id"] = chat_parts[0]
            data["message_thread_id"] = chat_parts[1]

        response = requests.post(url=url, data=data, files=files, timeout=300)
        if response.status_code == 200:
            response = response.json()
            local_logger.info(msg=""
                + "mediagroup '"
                + str(response['result'][0]['media_group_id'])
                + "' sent to telegram chat "
                + str(chat)
            )
            return {'success': True, 'result': response}
        else:
            local_logger.warning(msg="mediagroup didn't send to telegram chat " + str(chat))
            return {'success': False, 'result': response}
|
|
|
|
|
|
class Sequence:
    """Sequence handling.
    """
    @staticmethod
    def run(
        device: HikISAPI,
        sensors: dict,
        sequence: dict,
        temp_path: str,
        records_root_path: str = None,
        records_root_host: str = None,
        records_root_port: int = None,
        records_root_type: str = None,
        records_root_user: str = None,
        records_root_pass: str = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """Sequences executor.

        Every step is a comma separated string of nine fields:
        action, x, y, z, preset, speed, t, wait, message.
        A message containing 'sensor-config:NAME' is replaced by the value of that sensor.
        A wait of '-' or 0 means "do not wait after the step".

        Args:
            device (HikISAPI): HikISAPI object.
            sensors (dict): collection as key=sensorname:value=Sensor object.
            sequence (dict): sequence steps collection as key=stepname:value=step string.
            temp_path (str): path to directory for temp files.
            records_root_path (str, optional): path (local|smb|ftp,sftp) to records directory.
                Defaults to None.
            records_root_host (str, optional): hostname if path on remote host.
                Defaults to None.
            records_root_port (int, optional): connection port if path on remote host.
                Defaults to None.
            records_root_type (str, optional): 'ftp', 'sftp', 'smb' if path on remote host.
                Defaults to None.
            records_root_user (str, optional): username if path on remote host.
                Defaults to None.
            records_root_pass (str, optional): password if path on remote host.
                Defaults to None.
            logger_alias (str, optional): sublogger name.
                Defaults to function or method name.

        Raises:
            IndexError: a step has fewer than nine fields.
            ValueError: a numeric field of a step cannot be converted.
        """
        local_logger = logging.getLogger(logger_alias)
        makedirs(temp_path, exist_ok=True)

        # Actions that map one-to-one onto a device method of the same name.
        plain_actions = (
            'capabilities', 'getcamerapos', 'rebootcamera',
            'setptztostop', 'setmovtohome', 'setposashome'
        )
        speed_actions = (
            'setptzmovyyu', 'setptzmovyyd', 'setptzmovxxl',
            'setptzmovxxr', 'setptzmovzzi', 'setptzmovzzo'
        )

        for key, value in sequence.items():
            fields = [field.strip() for field in value.split(',')]
            action, x, y, z, p, s, t, w, m = (fields[index] for index in range(9))

            if 'sensor-config:' in m:
                sensor_name = m.split(':')[1].strip()
                sensor_value = sensors[sensor_name].value()
                if sensor_value != 'ERROR':
                    m = sensor_value
                else:
                    local_logger.warning(msg=""
                        + "the 'ERROR' " + sensor_name + " value"
                        + " has been replaced with an empty string"
                    )
                    m = ""
            local_logger.info(msg=''
                + 'action:' + key + ' = ' + action
                + ',' + x + ',' + y + ',' + z
                + ',' + p + ',' + s + ',' + t
                + ',' + w + ',' + m
            )

            # Reset per step, so an unknown action can neither crash on an unbound
            # name nor inherit the result of the previous step.
            response = False
            if action in plain_actions:
                response = getattr(device, action)()
            elif action in speed_actions:
                response = getattr(device, action)(speed=int(s))
            elif action == 'setptzpreset':
                response = device.setptzpreset(preset=int(p), speed=int(s))
            elif action == 'setcamerapos':
                response = device.setcamerapos(x=int(x), y=int(y), z=int(z))
            elif action == 'setcameramov':
                response = device.setcameramov(x=int(x), y=int(y), z=int(z), t=t)
            elif action == 'settextonosd':
                response = device.settextonosd(x=int(x), y=int(y), message=m)
            elif action == 'downloadjpeg':
                # One timestamp for the whole name, so its parts cannot straddle
                # a second, day or year boundary.
                now = datetime.datetime.now()
                records_file_name = (
                    key + '_' + now.strftime('%Y-%m-%d_%H.%M.%S') + '.jpeg'
                )
                src_file = temp_path + sep + records_file_name
                if device.downloadjpeg(x=int(x), y=int(y), dst_file=src_file):
                    dst_file = (''
                        + records_root_path
                        + '/' + now.strftime('%Y/%m/%V/%d') + '/'
                        + records_file_name
                    )
                    upload = Connect.file_upload(
                        src_file=src_file,
                        dst_file=dst_file,
                        hostname=records_root_host,
                        hostport=records_root_port,
                        hosttype=records_root_type,
                        username=records_root_user,
                        password=records_root_pass,
                        logger_alias=logger_alias
                    )
                    if upload['success']:
                        try:
                            remove(src_file)
                        except OSError as error:
                            local_logger.debug(msg='error: ' + '\n' + str(error))
                        response = True
            else:
                local_logger.warning(msg="action:" + key + ' = ' + action + " is unknown")

            # '-' marks "no wait"; it must be excluded before float() is applied.
            if w != '-' and float(w) != 0:
                sleep(float(w))

            if response:
                local_logger.info(msg='result:' + key + ' = OK')
            else:
                local_logger.warning(msg='result: ' + key + ' = ERROR')

        try:
            # Succeeds only when every temp file has been uploaded and removed.
            rmdir(temp_path)
        except OSError as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
|
|
|
|
|
|
class Convert:
    """Convert handling.
    """
    @staticmethod
    # pylint: disable=W0612
    def run(
        image_root_path: (str, list),
        image_find_names: list,
        video_dest_path: str,
        video_dest_sufx: str,
        video_scale_x: int,
        video_scale_y: int,
        video_framerate: int,
        video_duration: int,
        temp_path: str,
        image_root_host: (str, type(None)) = None,
        image_root_port: (int, type(None)) = None,
        image_root_type: (str, type(None)) = None,
        image_root_user: (str, type(None)) = None,
        image_root_pass: (str, type(None)) = None,
        video_dest_host: (str, type(None)) = None,
        video_dest_port: (int, type(None)) = None,
        video_dest_type: (str, type(None)) = None,
        video_dest_user: (str, type(None)) = None,
        video_dest_pass: (str, type(None)) = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """Converting executor.

        For every name: searches the images in all roots, downloads them to 'temp_path',
        writes an ffmpeg concat list, converts it to '<name>_<video_dest_sufx>.mp4' and
        uploads the video to 'video_dest_path'. Temp files are removed at the end.

        Args:
            image_root_path (str, list): source images path.
            image_find_names (list): image names to search.
            video_dest_path (str): path to destination video.
            video_dest_sufx (str): destination video name suffix.
            video_scale_x (int): destination video width.
            video_scale_y (int): destination video height.
            video_framerate (int): destination video frame per second.
            video_duration (int): destination video duration.
            temp_path (str): path to directory for temp files.
            image_root_host (str, None, optional): source images hostname.
                Defaults to None.
            image_root_port (int, None, optional): source images connection port.
                Defaults to None.
            image_root_type (str, None, optional): ftp,sftp,smb.
                Defaults to None.
            image_root_user (str, None, optional): username to source images ftp,sftp,smb path.
                Defaults to None.
            image_root_pass (str, None, optional): password to source images ftp,sftp,smb path.
                Defaults to None.
            video_dest_host (str, None, optional): destination video hostname.
                Defaults to None.
            video_dest_port (int, None, optional): destination video connection port.
                Defaults to None.
            video_dest_type (str, None, optional): ftp,sftp,smb.
                Defaults to None.
            video_dest_user (str, None, optional): username to destination video ftp,sftp,smb path.
                Defaults to None.
            video_dest_pass (str, None, optional): password to destination video ftp,sftp,smb path.
                Defaults to None.
            logger_alias (str, optional): sublogger name.
                Defaults to function or method name.
        """
        local_logger = logging.getLogger(logger_alias)
        if isinstance(image_root_path, str):
            image_root_path = [image_root_path]

        temp_files = []
        for name in image_find_names:
            # Each image is remembered together with the root it was found in, so it
            # is downloaded with that root's connection parameters.
            image_found = []
            for image_root in image_root_path:
                image_found.extend(
                    (image_root, image)
                    for image in Connect.file_search(
                        search_path=image_root,
                        search_name=name,
                        hostname=image_root_host,
                        hostport=image_root_port,
                        hosttype=image_root_type,
                        username=image_root_user,
                        password=image_root_pass,
                        logger_alias=logger_alias
                    )
                )
            makedirs(temp_path, exist_ok=True)

            connects = {}
            for image_root, image in image_found:
                if image_root not in connects:
                    connects[image_root] = Connect.parse_connect_params(
                        connect_string=image_root
                    )
                connect = connects[image_root]
                image_temp_path = temp_path + sep + path.basename(image)
                # Explicit arguments win, the root's own parameters are the fallback.
                if Connect.file_download(
                    src_file=image,
                    dst_file=image_temp_path,
                    hostname=image_root_host or connect['hostname'],
                    hostport=image_root_port or connect['hostport'],
                    hosttype=image_root_type or connect['hosttype'],
                    username=image_root_user or connect['username'],
                    password=image_root_pass or connect['password'],
                    logger_alias=logger_alias
                )['success']:
                    temp_files.append(image_temp_path)

            image_temp_list = temp_path + sep + 'convert.list'
            image_amount = 0
            with open(image_temp_list, mode='w+', encoding='UTF-8') as converter_list:
                for file in Connect.file_search(search_path=temp_path, search_name=name):
                    converter_list.write("\nfile '" + file + "'")
                    image_amount += 1
            temp_files.append(image_temp_list)

            if image_amount == 0:
                # An input rate of '0/<duration>' makes ffmpeg fail; skip it outright.
                local_logger.warning(msg="converting skipped, images of " + name + " not found")
                continue

            video_converted = name + '_' + video_dest_sufx + '.mp4'
            video_temp_path = temp_path + sep + video_converted
            # The input rate spreads all found images over the requested duration.
            video_conv_conf = (''
                + '-r '
                + str(image_amount) + '/' + str(video_duration)
                + ' -f concat -safe 0 -i '
                + image_temp_list
                + ' -c:v libx264 -vf scale=' + str(video_scale_x) + ':' + str(video_scale_y)
                + ',fps=' + str(video_framerate) + ',format=yuv420p '
                + video_temp_path
                + ' -y'
            )

            if FFmpeg.run(raw=video_conv_conf, logger_alias=logger_alias) == 0:
                temp_files.append(video_temp_path)
                Connect.file_upload(
                    src_file=video_temp_path,
                    dst_file=video_dest_path + '/' + video_converted,
                    hostname=video_dest_host,
                    hostport=video_dest_port,
                    hosttype=video_dest_type,
                    username=video_dest_user,
                    password=video_dest_pass,
                    logger_alias=logger_alias
                )

        for temp_file in temp_files:
            try:
                remove(temp_file)
            except OSError as error:
                local_logger.debug(msg='error: ' + '\n' + str(error))
        try:
            rmdir(temp_path)
        except OSError as error:
            local_logger.debug(msg='error: ' + '\n' + str(error))
|
|
|
|
|
|
class Publish:
    """Publish handling.
    """
    @staticmethod
    def _date_label(date_start: dict, period: str, separator: str = '.') -> str:
        """Build the period label used in file names, slugs, titles and captions.

        Args:
            date_start (dict): {'y':int,'m':int,'w':int,'d':int}, the 'start' of Do.date_calc().
            period (str): 'y'|'year','m'|'month','w'|'week','d'|'day'.
            separator (str, optional): separator between year, month and day. Defaults to '.'.

        Raises:
            ValueError: 'period' value is wrong.

        Returns:
            str: 'yyyy<sep>mm<sep>dd', 'yyyy-wNN', 'yyyy<sep>mm' or 'yyyy'.
        """
        year = str(date_start['y'])
        if period in ('d', 'day'):
            return separator.join(
                (year, str(date_start['m']).zfill(2), str(date_start['d']).zfill(2))
            )
        if period in ('w', 'week'):
            return year + '-w' + str(date_start['w']).zfill(2)
        if period in ('m', 'month'):
            return separator.join((year, str(date_start['m']).zfill(2)))
        if period in ('y', 'year'):
            return year
        raise ValueError("'period' value is wrong: '" + str(period) + "'")

    @staticmethod
    def _frame_rate(avg_frame_rate: str) -> int:
        """Convert a rational frame rate like '30000/1001' to rounded frames per second.

        Args:
            avg_frame_rate (str): 'numerator/denominator' or a plain number.

        Returns:
            int: rounded frame rate; the numerator when the denominator is missing or zero.
        """
        numerator, _, denominator = str(avg_frame_rate).partition('/')
        numerator = int(numerator)
        denominator = int(denominator) if denominator else 1
        if denominator == 0:
            return numerator
        return int(round(numerator / denominator))

    @classmethod
    # pylint: disable=W0612
    def run(
        cls,
        video_root_path: (str, list),
        video_find_names: list,
        temp_path: str,
        publish_period: str,
        publish_amount: int,
        video_root_host: (str, type(None)) = None,
        video_root_port: (int, type(None)) = None,
        video_root_type: (str, type(None)) = None,
        video_root_user: (str, type(None)) = None,
        video_root_pass: (str, type(None)) = None,
        wp_site_name: (str, type(None)) = None,
        wp_user_name: (str, type(None)) = None,
        wp_user_pass: (str, type(None)) = None,
        wp_update_page_id: (int, type(None)) = None,
        tg_api_key: (str, type(None)) = None,
        tg_chat_id: (str, type(None)) = None,
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """Publishing executor.

        Downloads '<name>_<period label>.mp4' for every name into a random subdirectory
        of 'temp_path', publishes the files to Telegram and/or Wordpress and removes
        the temp files afterwards, even when publishing raises.

        Args:
            video_root_path (str, list): source video path.
            video_find_names (list): video names to search.
            temp_path (str): path to directory for temp files.
            publish_period (str): 'y'|'year','m'|'month','w'|'week','d'|'day'.
            publish_amount (int): +/- periods.
            video_root_host (str, None, optional): source video hostname.
                Defaults to None.
            video_root_port (int, None, optional): source video connection port.
                Defaults to None.
            video_root_type (str, None, optional): ftp,sftp,smb.
                Defaults to None.
            video_root_user (str, None, optional): username to source video ftp,sftp,smb path.
                Defaults to None.
            video_root_pass (str, None, optional): password to source video ftp,sftp,smb path.
                Defaults to None.
            wp_site_name (str, None, optional): www.wordpress.site.
                Defaults to None.
            wp_user_name (str, None, optional): wordpress username.
                Defaults to None.
            wp_user_pass (str, None, optional): wordpress password.
                Defaults to None.
            wp_update_page_id (int, None, optional): unique identifier for the page.
                Defaults to None.
            tg_api_key (str, None, optional): Telegram Bot API access token.
                Defaults to None.
            tg_chat_id (str, None, optional): unique identifier for
                the target chat or username of the target channel.
                Defaults to None.
            logger_alias (str, optional): sublogger name.
                Defaults to function or method name.
        """
        local_logger = logging.getLogger(logger_alias)
        if isinstance(video_root_path, str):
            video_root_path = [video_root_path]

        publish_date = Do.date_calc(period=publish_period, amount=publish_amount)
        # Normalized period name ('day', 'week', ...); used for every dictionary key
        # below, so a short 'publish_period' like 'd' cannot miss the key.
        period = publish_date['period']
        video_find_sufx = cls._date_label(publish_date['start'], period)

        temp_path = temp_path + sep + Do.random_string(8)
        temp_files = []
        video_files = {period: {}}
        try:
            for name in video_find_names:
                # Each video is remembered together with the root it was found in, so
                # it is downloaded with that root's connection parameters.
                video_found = []
                for video_root in video_root_path:
                    video_found.extend(
                        (video_root, video)
                        for video in Connect.file_search(
                            search_path=video_root,
                            search_name=name + '_' + video_find_sufx + '.mp4',
                            hostname=video_root_host,
                            hostport=video_root_port,
                            hosttype=video_root_type,
                            username=video_root_user,
                            password=video_root_pass,
                            logger_alias=logger_alias
                        )
                    )
                makedirs(temp_path, exist_ok=True)

                connects = {}
                for video_root, video in video_found:
                    if video_root not in connects:
                        connects[video_root] = Connect.parse_connect_params(
                            connect_string=video_root
                        )
                    connect = connects[video_root]
                    video_temp_path = temp_path + sep + path.basename(video)
                    # Explicit arguments win, the root's own parameters are the fallback.
                    if Connect.file_download(
                        src_file=video,
                        dst_file=video_temp_path,
                        hostname=video_root_host or connect['hostname'],
                        hostport=video_root_port or connect['hostport'],
                        hosttype=video_root_type or connect['hosttype'],
                        username=video_root_user or connect['username'],
                        password=video_root_pass or connect['password'],
                        logger_alias=logger_alias
                    )['success']:
                        temp_files.append(video_temp_path)
                        video_files[period][name] = video_temp_path

            if tg_api_key:
                tg = Telegram(tg_api_key)
                cls.tg_routine_media(
                    tg=tg,
                    targets_media_files=video_files,
                    period=period,
                    amount=publish_amount,
                    chat=tg_chat_id,
                    logger_alias=logger_alias
                )
            if wp_site_name:
                wp = Wordpress(
                    hostname=wp_site_name,
                    username=wp_user_name,
                    password=wp_user_pass
                )
                cls.wp_routine_media(
                    wp=wp,
                    targets_media_files=video_files,
                    period=period,
                    amount=publish_amount,
                    page_id=wp_update_page_id,
                    logger_alias=logger_alias
                )
        finally:
            for temp_file in temp_files:
                try:
                    remove(temp_file)
                except OSError as error:
                    local_logger.debug(msg='error: ' + '\n' + str(error))
            try:
                rmdir(temp_path)
            except OSError as error:
                local_logger.debug(msg='error: ' + '\n' + str(error))

    @staticmethod
    def wp_routine_media(
        wp: Wordpress,
        targets_media_files: dict,
        period: str,
        amount: int,
        page_id: int,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Custom Wordpress media routine - upload media, create media events, update media page.

        Names from the built-in default links table that have no file in
        'targets_media_files[period]' are skipped with a warning.
        Names found on the site are removed from 'targets_media_files[period]'.

        Args:
            wp (Wordpress): wordpress object.
            targets_media_files (dict): {'period':{'name':'local/path/to/file'}}.
            period (str): 'day','week','month','year' - the default links table is keyed
                by the full period names.
            amount (int): +/- periods.
            page_id (int): unique identifier for the page.
            logger_alias (str, optional): sublogger name. Defaults to function or method name.

        Raises:
            TypeError: raised by Do.args_valid() when an argument type does not match its annotation.
            KeyError: 'period' is not a key of the default links table.

        Returns:
            dict: {'media upload': bool, 'event create': bool, 'pages update': bool}
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), Publish.wp_routine_media.__annotations__):
            # Placeholder links: '<root><name><suffix of the period>'.
            default_root = 'https://www.hmp.today/wp-content/uploads/2022/07/'
            default_names = (
                'point-01', 'point-02', 'point-04', 'point-05', 'point-11', 'point-12'
            )
            default_sufx = {
                "day": '_yyyy.mm_.dd_.mp4',
                "week": '_w.mp4',
                "month": '_yyyy.mm_.mp4',
                "year": '_yyyy.mp4'
            }
            # Patterns of the link tails which may be replaced on the page.
            page_reg_exp = {
                "day": (""
                    + "_(?:[0-9]{4}|yyyy)"
                    + ".(?:[0-9]{2}|mm_)"
                    + ".(?:[0-9]{2}|dd_)(?:|-[0-9]).mp4"
                ),
                "week": "(?:_[0-9]{4}-w[0-9]{2}|_w)(?:|-[0-9]).mp4",
                "month": "_(?:[0-9]{4}|yyyy).(?:[0-9]{2}|mm_)(?:|-[0-9]).mp4",
                "year": "_(?:[0-9]{4}|yyyy)(?:|-[0-9]).mp4"
            }
            period_sufx = default_sufx[period]
            reg_exp = page_reg_exp[period]

            current_media_links = {}
            for name in default_names:
                if name in targets_media_files[period]:
                    current_media_links[name] = default_root + name + period_sufx
                else:
                    # Nothing was downloaded for this name, so nothing can be published.
                    local_logger.warning(msg="publish skipped, file of " + name + " not found")
            result = {}

            # media upload
            result['media upload'] = True
            for name in list(current_media_links):
                media_file = targets_media_files[period][name]
                file_found = wp.media_search(
                    media_name=path.basename(media_file),
                    media_type='video'
                )
                if file_found['success'] and len(file_found['result']) > 0:
                    local_logger.info(
                        msg=""
                        + "upload skipped, "
                        + media_file
                        + " found on site"
                    )
                    targets_media_files[period].pop(name, None)
                    current_media_links[name] = file_found['result'][0]
                else:
                    file_upload = wp.media_upload(media_file, 'video/mp4')
                    if file_upload['success']:
                        current_media_links[name] = file_upload['result']
                    else:
                        # The placeholder link stays in use for this name.
                        result['media upload'] = False

            # event create
            result['event create'] = True
            date = Do.date_calc(period=period, amount=amount)
            date_start = Publish._date_label(date['start'], 'day', '-') + 'T00:00:00'
            date_end = Publish._date_label(date['end'], 'day', '-') + 'T23:59:59'
            for name, link in current_media_links.items():
                description = (''
                    + '<figure class="wp-block-video">'
                    + '<video controls loop src="' + link + '"></video>'
                    + '</figure>'
                )
                slug = name + '_' + Publish._date_label(date['start'], period, '-')
                title = name + ' ' + Publish._date_label(date['start'], period, '.')

                event_api_slug = wp.api_event + '/by-slug/' + slug
                # NOTE(review): the previous code first compared the link with the default
                # link and skipped on difference, but both sides were read from the same
                # dictionary, so that comparison never matched. The effective behaviour is
                # kept - confirm whether events for placeholder links are wanted.
                if Connect.http(event_api_slug)['success']:
                    local_logger.info(msg="event skipped, " + event_api_slug + " found on site")
                else:
                    event_create = wp.event_create(
                        title=title,
                        slug=slug,
                        date_start=date_start,
                        date_end=date_end,
                        date_publish=date_start,
                        description=description
                    )
                    if not event_create['success']:
                        result['event create'] = False

            # pages update
            result['pages update'] = True
            page_read = wp.pages_read(page_id)
            if page_read['success']:
                content = json.loads(page_read['result'])['content']['rendered']
                replace = 0
                for name, new_str in current_media_links.items():
                    pattern = wp.url_files + "/[0-9]{4}/[0-9]{2}/" + name + reg_exp
                    for old_str in re.findall(pattern, content):
                        if old_str == new_str:
                            local_logger.info(
                                msg=""
                                + "page replace skipped, "
                                + new_str
                                + " found on page"
                            )
                        else:
                            content = content.replace(old_str, new_str)
                            replace += 1
                            local_logger.info(msg="page replace " + old_str + " to " + new_str)

                # All replacements are sent with a single page update.
                if replace > 0:
                    page_update = wp.pages_update(page_id=page_id, content=content)
                    result['pages update'] = page_update['success']
            else:
                # The page could not be read, so it was not updated either.
                result['pages update'] = False

            return result

    @staticmethod
    # pylint: disable=W0612
    def tg_routine_media(
        tg: Telegram,
        targets_media_files: dict,
        period: str,
        amount: int,
        chat: str,
        logger_alias: str = inspect.stack()[0].function
    ) -> dict:
        """Custom Telegram media routine - send mediagroup.

        Every video is uploaded as a single message to obtain its 'file_id', that message
        is deleted and the collected 'file_id's are sent together as one media group.
        Files of 50000000 bytes or more are compressed to half width and height before upload.

        Args:
            tg (Telegram): telegram object
            targets_media_files (dict): {'period':{'name':'local/path/to/file'}}.
            period (str): 'y'|'year','m'|'month','w'|'week','d'|'day';
                must also be the key used in 'targets_media_files'.
            amount (int): +/- periods.
            chat (str): unique identifier for the target chat or username of the target channel.
            logger_alias (str, optional): sublogger name. Defaults to function or method name.

        Raises:
            TypeError: raised by Do.args_valid() when an argument type does not match its annotation.
            ValueError: filename is not local file.

        Returns:
            dict: {'success':bool,'result':response}.
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), Publish.tg_routine_media.__annotations__):
            default_caption = (""
                + "`period:` yyyy.mm.dd\n"
                + "`source:` https://www.hmp.today/media\n"
                + "`stream:` https://youtu.be/ikB20ML5oyo"
            )
            group = {
                "cover":
                    {
                        "type": "photo",
                        "path": (''
                            + 'https://www.hmp.today/wp-content/uploads/'
                            + '2021/02/site-slider_hmp-qr_bwg-3840x1705-1.png'
                        ),
                        "caption": "source: https://www.hmp.today"
                    }
            }
            tmp_files = []
            tg_limit_video_size = 50000000

            date = Do.date_calc(period=period, amount=amount)
            caption_date = Publish._date_label(date['start'], period)
            current_caption = default_caption.replace('yyyy.mm.dd', caption_date)

            for media_name, media_path in targets_media_files[period].items():
                if re.match("^(?:http://|https://|file_id=)", media_path):
                    raise ValueError(media_name + ' is not local file')

                src_video_info = FFmpeg.probe(target=media_path)
                if not src_video_info:
                    continue

                src_video_stream = src_video_info['streams'][0]
                cur_video_file = media_path
                cur_video_duration = int(float(src_video_info['format']['duration']))
                cur_video_fps = Publish._frame_rate(src_video_stream['avg_frame_rate'])
                cur_video_width = int(src_video_stream['width'])
                cur_video_height = int(src_video_stream['height'])

                if stat(media_path).st_size >= tg_limit_video_size:
                    # Bitrate in kbit/s which fits 90% of the limit over the whole duration.
                    tmp_video_bitrate = int(
                        tg_limit_video_size*0.9/1000*8/max(cur_video_duration, 1)
                    )
                    tmp_video_file = media_path.replace('.mp4', '_compressed.mp4')
                    tmp_video_width = int(cur_video_width/2)
                    tmp_video_height = int(cur_video_height/2)

                    compressing_params = ' '.join(
                        [
                            '-i', media_path,
                            '-c:v libx264 -b:v', str(tmp_video_bitrate) + 'k',
                            '-vf scale='
                            + str(tmp_video_width) + ':' + str(tmp_video_height)
                            + ',fps=' + str(cur_video_fps)
                            + ',format=yuv420p -preset veryslow',
                            tmp_video_file, '-y -loglevel quiet -stats'
                        ]
                    )
                    if FFmpeg.run(raw=compressing_params) == 0:
                        local_logger.info(msg=''
                            + media_path + " comressed to " + tmp_video_file
                        )
                        tmp_files.append(tmp_video_file)
                        cur_video_file = tmp_video_file
                        # The halved size is reported only for the file that really has it.
                        cur_video_width = tmp_video_width
                        cur_video_height = tmp_video_height

                response_upload = tg.send_video(
                    chat=chat,
                    video=cur_video_file,
                    width=cur_video_width,
                    height=cur_video_height,
                    duration=cur_video_duration
                )
                # On failure 'result' is not the decoded response and has no 'message_id'.
                if response_upload and response_upload.get('success'):
                    uploaded = response_upload['result']['result']
                    tg.delete_message(
                        chat=chat,
                        message_id=uploaded['message_id']
                    )
                    group[media_name] = {
                        "type": "video",
                        "path": 'file_id=' + uploaded['video']['file_id']
                    }
                else:
                    local_logger.warning(msg=media_name + " skipped, upload to telegram failed")

            response_result = tg.send_mediagroup(
                chat=chat,
                media=group,
                caption=current_caption,
                parse_mode='Markdown'
            )
            for file in tmp_files:
                # remove() returns None, so success is the absence of OSError.
                try:
                    remove(file)
                    local_logger.info(msg="deleted " + file)
                except OSError as error:
                    local_logger.debug(msg='error: ' + '\n' + str(error))
            return response_result
|
|
|
|
|
|
class Do():
    """Set of various methods (functions) for routine.
    """
    @staticmethod
    def random_string(length: int) -> str:
        """Generate string from lowercase letters, uppercase letters, digits.

        Args:
            length (int): string length.

        Returns:
            str: random string.
        """
        alphabet = ascii_letters + digits
        return ''.join(choice(alphabet) for _ in range(length))

    @staticmethod
    def args_valid(arguments: dict, annotations: dict) -> bool:
        """Arguments type validating by annotations.

        Args:
            arguments (dict): 'locals()' immediately after starting the function.
            annotations (dict): function.name.__annotations__.

        Raises:
            TypeError: type of argument is not equal type in annotation.

        Returns:
            bool: True if argument types are valid.
        """
        for var_name, var_type in annotations.items():
            # the 'return' annotation does not describe an argument
            if var_name == 'return':
                continue
            if not isinstance(arguments[var_name], var_type):
                raise TypeError(""
                    + "type of '"
                    + var_name
                    + "' = "
                    + str(arguments[var_name])
                    + " is not "
                    + str(var_type)
                )
        return True

    @staticmethod
    def date_calc(
        target: (datetime.date, type(None)) = None,
        amount: int = 0,
        period: (str, type(None)) = None
    ) -> dict:
        """Calculating start/end dates for period: day, week, month, year.

        Args:
            target (datetime.date, optional): date in the calculation period.
                Defaults to None, which means "now" at the moment of the call.
            amount (int, optional): +/- periods.
                Defaults to 0.
            period (str, type, optional): 'y'|'year','m'|'month','w'|'week','d'|'day'.
                Defaults to None.

        Raises:
            TypeError: type of argument is not equal type in annotation.
            ValueError: 'period' value is wrong.

        Returns:
            dict: {
                'period':day|week|month|year,
                'start':{'y':int,'m':int,'w':int,'d':int},
                'end':{'y':int,'m':int,'w':int,'d':int}
            }.
        """
        Do.args_valid(locals(), Do.date_calc.__annotations__)
        # A default of 'datetime.datetime.now()' in the signature would be evaluated
        # only once, when the function is defined; resolve "now" on every call instead.
        if target is None:
            target = datetime.datetime.now()

        date = {}
        if not period:
            raise ValueError("'period' value is wrong: " + "''")
        if period in ('d', 'day'):
            target = delta = target + datetime.timedelta(days=amount)
            date['period'] = 'day'
        elif period in ('w', 'week'):
            shifted = target + datetime.timedelta(weeks=amount)
            # Step back to the Monday of the shifted date's ISO week. Combining the
            # calendar year with the ISO week number is wrong around New Year, where
            # the ISO year differs from the calendar year (e.g. 2024-12-30 is 2025-W01).
            target = shifted - datetime.timedelta(days=shifted.weekday())
            delta = target + datetime.timedelta(days=6)
            date['period'] = 'week'
        elif period in ('m', 'month'):
            # month number wraps into 1..12, the overflow goes to the year
            delta_month = (target.month + amount) % 12 or 12
            delta_year = target.year + (target.month + amount - 1) // 12
            delta_days = calendar.monthrange(delta_year, delta_month)[1]
            target = target.replace(year=delta_year, month=delta_month, day=1)
            delta = target.replace(day=delta_days)
            date['period'] = 'month'
        elif period in ('y', 'year'):
            target = target.replace(year=target.year + amount, month=1, day=1)
            delta = target.replace(month=12, day=31)
            date['period'] = 'year'
        else:
            raise ValueError("'period' value is wrong: " + period)

        date['start'] = {
            'y': target.year,
            'm': target.month,
            'w': target.isocalendar()[1],
            'd': target.day
        }
        date['end'] = {
            'y': delta.year,
            'm': delta.month,
            'w': delta.isocalendar()[1],
            'd': delta.day
        }
        return date
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: parses the command line, reads the 'common' block of the
    # configuration file, configures logging and then runs exactly one mode:
    #   --broadcast  streams every enabled 'broadcast-config:<name>' through FFmpeg.run,
    #   --sequences  runs every enabled camera sequence through Sequence.run,
    #   --day/--week/--month/--year combined with --converter and/or --publisher
    #                runs Convert.run and/or Publish.run for the calculated period.
    # NOTE(review): block nesting was reconstructed from a source whose indentation
    # was lost; in particular, logging setup and mode dispatch are assumed to run
    # even when the config file does not exist - confirm against the original file.
    time_start = datetime.datetime.now()

    if 'argparse' in modules:
        args = ArgumentParser(
            prog='cctv-scheduler',
            description='Hikvision PTZ IP-Camera management.',
            epilog='Dependencies: '
                   '- Python 3 (tested version 3.9.5), '
                   '- Python 3 modules: paramiko, smbprotocol '
        )
        args.add_argument('--config', type=str, default=path.splitext(__file__)[0] + '.conf',
                          required=False,
                          help='custom configuration file path')
        args.add_argument('-b', '--broadcast', action='store_true', required=False,
                          help='streaming media to destination')
        args.add_argument('-s', '--sequences', action='store_true', required=False,
                          help='run sequences from config file')
        args.add_argument('-c', '--converter', action='store_true', required=False,
                          help='convert JPEG collection to MP4')
        args.add_argument('-p', '--publisher', action='store_true', required=False,
                          help='publish content from templates')
        # the four period selectors differ only by their name
        period_units = ('day', 'week', 'month', 'year')
        for unit in period_units:
            args.add_argument('-' + unit[0], '--' + unit, type=int, default=0, required=False,
                              help=(unit + ' in amount of ' + unit + 's from today, '
                                    + 'for which the publication or conversion is made')
                              )
        args = vars(args.parse_args())

        script_root = path.dirname(path.realpath(__file__))
        temp_path = script_root + sep + 'temp'
        log_root = script_root
        log_level = 'INFO'
        if path.exists(args['config']):

            # common
            conf = Parse(parameters=args['config'], block='common')
            temp_path = conf.data.get('temp_path', temp_path)
            log_root = conf.data.get('log_root', log_root)
            # an unknown or missing level name keeps the default
            log_level = {
                'DEBUG': logging.DEBUG,
                'INFO': logging.INFO,
                'WARNING': logging.WARNING,
                'ERROR': logging.ERROR,
                'CRITICAL': logging.CRITICAL
            }.get(conf.data.get('log_level'), log_level)
        logging.basicConfig(
            format='%(asctime)s %(levelname)s: %(name)s: %(message)s',
            datefmt='%Y-%m-%d_%H.%M.%S',
            handlers=[
                logging.FileHandler(
                    filename=log_root + sep + path.splitext(path.basename(__file__))[0] + '.log',
                    mode='a'
                ),
                logging.StreamHandler()
            ],
            level=log_level
        )
        # keep third-party libraries quiet below WARNING
        logging.getLogger("paramiko").setLevel(logging.WARNING)
        logging.getLogger("smbprotocol").setLevel(logging.WARNING)

        if args['broadcast']:
            logging.info(msg='Streaming starts...')

            conf = Parse(parameters=args['config'], block='enable-broadcast')
            for key, value in conf.data.items():
                if value != 'true':
                    continue
                broadcast_config = Parse(
                    parameters=args['config'],
                    block='broadcast-config:' + key
                ).data
                # every broadcast option is optional: a missing key is passed as None
                exit_code = FFmpeg.run(
                    src=broadcast_config.get('src'),
                    dst=broadcast_config.get('dst'),
                    fps=broadcast_config.get('fps'),
                    preset=broadcast_config.get('preset'),
                    ffpath=broadcast_config.get('ffpath'),
                    watchdog=broadcast_config.get('watchdog'),
                    watchsec=(
                        int(broadcast_config['watchsec'])
                        if 'watchsec' in broadcast_config else None
                    ),
                    onlyonce=broadcast_config.get('onlyonce'),
                    logger_alias='streaming ' + key
                )
                logging.info(msg=''
                    + 'Streaming ' + key + ' finished with exit code: '
                    + str(exit_code)
                )

        elif args['sequences']:
            logging.info(msg='Sequence starts...')

            # all enabled sensors are collected first and handed to every sequence
            sensors = {}
            conf = Parse(parameters=args['config'], block='enable-sensor')
            for key, value in conf.data.items():
                if value != 'true':
                    continue
                device_config = Parse(
                    parameters=args['config'],
                    block='sensor-config:' + key
                ).data
                sensors[key] = Sensor(
                    hostname=device_config['hostname'],
                    username=device_config['username'],
                    userpass=device_config['userpass'],
                    nodetype=device_config['nodetype'],
                    nodename=device_config['nodename']
                )

            conf = Parse(parameters=args['config'], block='enable-sequences')
            for key, value in conf.data.items():
                if value != 'true':
                    continue
                device_sequence = Parse(
                    parameters=args['config'],
                    block='camera-sequences:' + key
                ).data
                device_config = Parse(
                    parameters=args['config'],
                    block='camera-config:' + key
                ).data
                device_entity = HikISAPI(
                    hostname=device_config['hostname'],
                    username=device_config['username'],
                    userpass=device_config['userpass']
                )
                Sequence.run(
                    device=device_entity,
                    sensors=sensors,
                    sequence=device_sequence,
                    temp_path=temp_path + sep + Do.random_string(8),
                    records_root_path=device_config.get('records_root_path', script_root),
                    records_root_host=device_config.get('records_root_host'),
                    records_root_port=(
                        int(device_config['records_root_port'])
                        if 'records_root_port' in device_config else None
                    ),
                    records_root_type=device_config.get('records_root_type'),
                    records_root_user=device_config.get('records_root_user'),
                    records_root_pass=device_config.get('records_root_pass'),
                    logger_alias='sequence ' + key
                )
                logging.info(msg='Sequence ' + key + ' finished')

        elif any(args[unit] for unit in period_units):
            # when several period options are given, the largest unit wins
            period = None
            amount = None
            for unit in period_units:
                if args[unit]:
                    period = unit
                    amount = args[unit]
            period = Do.date_calc(period=period, amount=amount)

            if args['converter']:
                logging.info(msg='Converting starts...')

                # zero-padded string parts of the period borders used in paths and suffixes
                start_y = str(period['start']['y'])
                start_m = str(period['start']['m']).zfill(2)
                start_w = str(period['start']['w']).zfill(2)
                start_d = str(period['start']['d']).zfill(2)
                end_y = str(period['end']['y'])
                end_m = str(period['end']['m']).zfill(2)
                end_w = str(period['end']['w']).zfill(2)

                conf = Parse(parameters=args['config'], block='enable-convert')
                for key, value in conf.data.items():
                    if value != 'true':
                        continue
                    convert_config = Parse(
                        parameters=args['config'],
                        block='convert-config:' + key
                    ).data
                    image_root_base = convert_config.get('image_root_path', script_root)
                    # the names are parsed unconditionally: they do not depend on
                    # whether a password is configured
                    image_find_names = [
                        name.strip()
                        for name in convert_config['image_find_names'].split(',')
                    ]

                    if period['period'] == 'day':
                        image_root_path = '/'.join(
                            (image_root_base, start_y, start_m, start_w, start_d)
                        )
                        video_dest_sufx = start_y + '.' + start_m + '.' + start_d
                        video_duration = 1
                    elif period['period'] == 'week':
                        image_root_path = '/'.join(
                            (image_root_base, start_y, start_m, start_w)
                        )
                        if period['start']['m'] != period['end']['m']:
                            # the week is split between two month directories
                            image_root_path = [
                                image_root_path,
                                '/'.join((image_root_base, end_y, end_m, end_w))
                            ]
                        video_dest_sufx = start_y + '-w' + start_w
                        video_duration = 7
                    elif period['period'] == 'month':
                        image_root_path = '/'.join((image_root_base, start_y, start_m))
                        video_dest_sufx = start_y + '.' + start_m
                        video_duration = 30
                    elif period['period'] == 'year':
                        image_root_path = image_root_base + '/' + start_y
                        video_dest_sufx = start_y
                        video_duration = 360

                    Convert.run(
                        image_root_path=image_root_path,
                        image_root_host=convert_config.get('image_root_host'),
                        image_root_port=(
                            int(convert_config['image_root_port'])
                            if 'image_root_port' in convert_config else None
                        ),
                        image_root_type=convert_config.get('image_root_type'),
                        image_root_user=convert_config.get('image_root_user'),
                        image_root_pass=convert_config.get('image_root_pass'),
                        image_find_names=image_find_names,
                        video_dest_path=convert_config.get('video_dest_path', script_root),
                        video_dest_sufx=video_dest_sufx,
                        video_dest_host=convert_config.get('video_dest_host'),
                        video_dest_port=(
                            int(convert_config['video_dest_port'])
                            if 'video_dest_port' in convert_config else None
                        ),
                        video_dest_type=convert_config.get('video_dest_type'),
                        video_dest_user=convert_config.get('video_dest_user'),
                        video_dest_pass=convert_config.get('video_dest_pass'),
                        video_scale_x=int(convert_config['video_scale_x']),
                        video_scale_y=int(convert_config['video_scale_y']),
                        video_framerate=int(convert_config['video_framerate']),
                        video_duration=video_duration,
                        temp_path=temp_path + sep + Do.random_string(8),
                        logger_alias='converting ' + key
                    )
                    logging.info(msg='Converting ' + key + ' finished')

            if args['publisher']:
                logging.info(msg='Publishing starts...')

                conf = Parse(parameters=args['config'], block='enable-publish')
                for key, value in conf.data.items():
                    if value != 'true':
                        continue
                    publish_config = Parse(
                        parameters=args['config'],
                        block='publish-config:' + key
                    ).data
                    video_find_names = [
                        name.strip()
                        for name in publish_config['video_find_names'].split(',')
                    ]

                    # WordPress settings are read only when the target is enabled
                    wp_site_name = None
                    wp_user_name = None
                    wp_user_pass = None
                    wp_update_page_id = None
                    if publish_config.get('wp_enabled') == 'true':
                        wp_site_name = publish_config.get('wp_site_name')
                        wp_user_name = publish_config.get('wp_user_name')
                        wp_user_pass = publish_config.get('wp_user_pass')
                        if 'wp_update_page_id' in publish_config:
                            wp_update_page_id = int(publish_config['wp_update_page_id'])

                    # Telegram settings are read only when the target is enabled
                    tg_api_key = None
                    tg_chat_id = None
                    if publish_config.get('tg_enabled') == 'true':
                        tg_api_key = publish_config.get('tg_api_key')
                        tg_chat_id = publish_config.get('tg_chat_id')

                    Publish.run(
                        video_root_path=publish_config.get('video_root_path', script_root),
                        video_root_host=publish_config.get('video_root_host'),
                        video_root_port=(
                            int(publish_config['video_root_port'])
                            if 'video_root_port' in publish_config else None
                        ),
                        video_root_type=publish_config.get('video_root_type'),
                        video_root_user=publish_config.get('video_root_user'),
                        video_root_pass=publish_config.get('video_root_pass'),
                        video_find_names=video_find_names,
                        temp_path=temp_path + sep + Do.random_string(8),
                        publish_period=period['period'],
                        publish_amount=amount,
                        wp_site_name=wp_site_name,
                        wp_user_name=wp_user_name,
                        wp_user_pass=wp_user_pass,
                        wp_update_page_id=wp_update_page_id,
                        tg_api_key=tg_api_key,
                        tg_chat_id=tg_chat_id,
                        logger_alias='publishing ' + key
                    )
                    logging.info(msg='Publishing ' + key + ' finished')

        else:
            logging.info(msg='Start arguments was not selected. Exit.')

        time_execute = datetime.datetime.now() - time_start
        logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')
|