add local paths support

This commit is contained in:
Pavel Muhortov 2023-06-30 15:18:20 +03:00
parent c3b2d897c7
commit fa636bf603
2 changed files with 225 additions and 228 deletions

View File

@ -93,7 +93,7 @@ userpass = pass
#
# If a record directory on a remote host is used, a username and password must be specified.
# Supported protocols:
# FTP, SFTP, SMB.
# FTP, SFTP, SMB or local path.
records_root_path = ftp://user:pass@192.168.254.254:21/Records/camera.test.local
#
# One line parameters string has lower priority and parameters are overwritten by
@ -155,7 +155,7 @@ step999 = rebootcamera, -, -, -, -, -, -,
image_find_names = step071, image-01, image-02
# If image root or destination video directories on a remote host is used, username and password must be specified.
# Supported protocols:
# FTP, SFTP, SMB.
# FTP, SFTP, SMB or local path.
image_root_path = sftp://user:pass@192.168.254.254/Records/camera.test.local
#
# One line parameters string has lower priority and parameters are overwritten by
@ -207,26 +207,26 @@ video_framerate = 25
video_find_names = step071, image-01, image-02
# If a video directory on a remote host is used, a username and password must be specified.
# Supported protocols:
# FTP, SFTP, SMB.
video_root_path = ftp://user:pass@192.168.254.254/Downloads
# FTP, SFTP, SMB or local path.
video_root_path = /home/user/Downloads
#
# One line parameters string has lower priority and parameters are overwritten by
# separated parameter variables if you use both.
#
#video_dest_path = /Downloads
#video_dest_path = /home/user/Downloads
#
#video_dest_host = 192.168.254.254
#video_dest_host =
#
# Optionality you can set custom connection port:
#video_dest_port = 21
#video_dest_port =
#
# You must set connection type (ftp is faster than sftp, sftp is faster than smb):
# ftp, sftp, smb.
#video_dest_type = ftp
#video_dest_type =
#
#video_dest_user = user
#video_dest_user =
#
#video_dest_pass = pass
#video_dest_pass =
#
# Optionality you can enable or disable publishing by Wordpress:
# true - Wordpress enabled, false - Wordpress disbaled.

View File

@ -17,6 +17,7 @@ from argparse import ArgumentParser
from ftplib import FTP
from multiprocessing import Process, Queue
from os import environ, makedirs, path, remove, rmdir, sep, stat, walk
from shutil import copyfile
from string import ascii_letters, digits
from subprocess import Popen, PIPE, STDOUT
from sys import modules, platform
@ -689,52 +690,6 @@ class Connect:
local_logger.debug(msg='error: ' + '\n' + str(error))
return {"success": False, "result": "ERROR"}
@staticmethod
# pylint: disable=W0718
def ssh_commands(
command: str,
hostname: str,
username: str,
password: str,
port: int = 22,
logger_alias: str = inspect.stack()[0].function
) -> str:
"""Handling SSH command executing.
Args:
command (str): command for executing.
hostname (str): remote hostname or ip address.
username (str): remote host username.
password (str): remote host password.
port (int, optional): remote host connection port. Defaults to 22.
logger_alias (str, optional): sublogger name. Defaults to function or method name.
Returns:
str: terminal response or 'ERROR'.
"""
local_logger = logging.getLogger(logger_alias)
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
local_logger.debug(msg=''
+ '\n' + 'host: ' + hostname + ':' + str(port)
+ '\n' + 'user: ' + username
+ '\n' + 'pass: ' + password
+ '\n' + 'command: ' + command
)
try:
client.connect(hostname=hostname, username=username, password=password, port=port)
stdin, stdout, stderr = client.exec_command(command=command, get_pty=True)
if 'sudo' in command:
stdin.write(password + '\n')
stdin.flush()
stdout.flush()
data = stdout.read() + stderr.read()
client.close()
return data.decode('utf-8')
except Exception as error:
local_logger.debug(msg='error: ' + '\n' + str(error))
return 'ERROR'
@staticmethod
# pylint: disable=W0718
def ftp_file_search(
@ -914,6 +869,53 @@ class Connect:
local_logger.debug(msg='error: ' + '\n' + str(error))
return {"success": False, "result": "ERROR"}
@staticmethod
# pylint: disable=W0718
def ssh_commands(
command: str,
hostname: str,
username: str,
password: str,
port: int = 22,
logger_alias: str = inspect.stack()[0].function
) -> str:
"""Handling SSH command executing.
Args:
command (str): command for executing.
hostname (str): remote hostname or ip address.
username (str): remote host username.
password (str): remote host password.
port (int, optional): remote host connection port. Defaults to 22.
logger_alias (str, optional): sublogger name. Defaults to function or method name.
Returns:
str: terminal response or 'ERROR'.
"""
local_logger = logging.getLogger(logger_alias)
if Do.args_valid(locals(), Connect.ssh_commands.__annotations__):
client = SSHClient()
client.set_missing_host_key_policy(AutoAddPolicy())
local_logger.debug(msg=''
+ '\n' + 'host: ' + hostname + ':' + str(port)
+ '\n' + 'user: ' + username
+ '\n' + 'pass: ' + password
+ '\n' + 'command: ' + command
)
try:
client.connect(hostname=hostname, username=username, password=password, port=port)
stdin, stdout, stderr = client.exec_command(command=command, get_pty=True)
if 'sudo' in command:
stdin.write(password + '\n')
stdin.flush()
stdout.flush()
data = stdout.read() + stderr.read()
client.close()
return data.decode('utf-8')
except Exception as error:
local_logger.debug(msg='error: ' + '\n' + str(error))
return 'ERROR'
@staticmethod
# pylint: disable=W0718
def ssh_file_search(
@ -1264,9 +1266,8 @@ class Connect:
local_logger.debug(msg='error: ' + '\n' + str(error))
return {"success": False, "result": "ERROR"}
@staticmethod
# pylint: disable=W0612
def local_file_search(
root_path: str,
search_name: (str, type(None)) = None,
@ -1290,9 +1291,19 @@ class Connect:
+ '\n' + 'root_path: ' + root_path
+ '\n' + 'search_name: ' + str(search_name)
)
return []
result = []
for root, dirs, files in walk(root_path, topdown=False):
for file in files:
if search_name:
if search_name in file:
result.append(path.join(path.realpath(root), file))
else:
result.append(path.join(path.realpath(root), file))
result.sort()
return result
@staticmethod
# pylint: disable=W0718
def local_get_file(
src_file: str,
dst_file: str,
@ -1314,9 +1325,16 @@ class Connect:
+ '\n' + 'src_file: ' + src_file
+ '\n' + 'dst_file: ' + dst_file
)
try:
makedirs(path.dirname(dst_file), exist_ok=True)
copyfile(src=src_file, dst=dst_file)
return {"success": True, "result": dst_file}
except Exception as error:
local_logger.debug(msg='error: ' + '\n' + str(error))
return {"success": False, "result": "ERROR"}
@staticmethod
# pylint: disable=W0718
def local_put_file(
src_file: str,
dst_file: str,
@ -1338,10 +1356,14 @@ class Connect:
+ '\n' + 'src_file: ' + src_file
+ '\n' + 'dst_file: ' + dst_file
)
try:
makedirs(path.dirname(dst_file), exist_ok=True)
copyfile(src=src_file, dst=dst_file)
return {"success": True, "result": dst_file}
except Exception as error:
local_logger.debug(msg='error: ' + '\n' + str(error))
return {"success": False, "result": "ERROR"}
@staticmethod
def parse_connect_params(
connect_string: str,
@ -3314,7 +3336,7 @@ class Convert:
image_temp_list = temp_path + sep + 'convert.list'
image_amount = 0
with open(image_temp_list, mode='w+', encoding='UTF-8') as converter_list:
for file in Do.file_search(root_path=temp_path, search_name=name):
for file in Connect.file_search(search_path=temp_path, search_name=name):
converter_list.write("\nfile '" + file + "'")
image_amount += 1
temp_files.append(image_temp_list)
@ -3360,9 +3382,10 @@ class Convert:
class Publish:
"""Publish handling.
"""
@staticmethod
@classmethod
# pylint: disable=W0612
def run(
cls,
video_root_path: (str, list),
video_find_names: list,
temp_path: str,
@ -3490,7 +3513,7 @@ class Publish:
if tg_api_key:
tg = Telegram(tg_api_key)
Do.tg_routine_media(
cls.tg_routine_media(
tg=tg,
targets_media_files=video_files,
period=publish_date['period'],
@ -3504,7 +3527,7 @@ class Publish:
username=wp_user_name,
password=wp_user_pass
)
Do.wp_routine_media(
cls.wp_routine_media(
wp=wp,
targets_media_files=video_files,
period=publish_date['period'],
@ -3523,160 +3546,6 @@ class Publish:
except OSError as error:
local_logger.debug(msg='error: ' + '\n' + str(error))
class Do():
"""Set of various methods (functions) for routine.
"""
@staticmethod
def random_string(length: int) -> str:
"""Generate string from lowercase letters, uppercase letters, digits.
Args:
length (int): string lenght.
Returns:
str: random string.
"""
return ''.join(choice(ascii_letters + digits) for i in range(length))
@staticmethod
def args_valid(arguments: dict, annotations: dict) -> bool:
"""Arguments type validating by annotations.
Args:
arguments (dict): 'locals()' immediately after starting the function.
annotations (dict): function.name.__annotations__.
Raises:
TypeError: type of argument is not equal type in annotation.
Returns:
bool: True if argument types are valid.
"""
for var_name, var_type in annotations.items():
if not var_name == 'return':
if not isinstance(arguments[var_name], var_type):
raise TypeError(""
+ "type of '"
+ var_name
+ "' = "
+ str(arguments[var_name])
+ " is not "
+ str(var_type)
)
return True
@staticmethod
def date_calc(
target: datetime.date = datetime.datetime.now(),
amount: int = 0,
period: (str, type(None)) = None
) -> dict:
"""Calculating start/end dates for period: day, week, month, year.
Args:
target (datetime.date, optional): date in the calculation period.
Defaults to now.
amount (int, optional): +/- periods.
Defaults to 0.
period (str, type, optional): 'y'|'year','m'|'month','w'|'week','d'|'day'.
Defaults to None.
Raises:
ValueError: 'period' value is wrong.
Returns:
dict: {
'period':day|week|month|year,
'start':{'y':int,'m':int,'w':int,'d':int},
'end':{'y':int,'m':int,'w':int,'d':int}
}.
"""
if Do.args_valid(locals(), Do.date_calc.__annotations__):
date = {}
if not period:
raise ValueError("'period' value is wrong: " + "''")
elif period == 'd' or period == 'day':
delta = target + datetime.timedelta(days=amount)
target = delta
date['period'] = 'day'
elif period == 'w' or period == 'week':
delta = target + datetime.timedelta(weeks=amount)
target_week = str(delta.year) + '-W' + str(delta.isocalendar()[1])
target = datetime.datetime.strptime(target_week + '-1', "%G-W%V-%u")
delta = target + datetime.timedelta(days=6)
date['period'] = 'week'
elif period == 'm' or period == 'month':
delta_month = (target.month + amount) % 12
if not delta_month:
delta_month = 12
delta_year = target.year + ((target.month) + amount - 1) // 12
delta_days = calendar.monthrange(delta_year, delta_month)[1]
delta = target = target.replace(
year=delta_year,
month=delta_month,
day=1
)
delta = delta.replace(
year=delta_year,
month=delta_month,
day=delta_days
)
date['period'] = 'month'
elif period == 'y' or period == 'year':
target = target.replace(
year=target.year + amount,
month=1,
day=1
)
delta = target.replace(
year=target.year,
month=12,
day=31
)
date['period'] = 'year'
else:
raise ValueError("'period' value is wrong: " + period)
date['start'] = {
'y': target.year,
'm': target.month,
'w': target.isocalendar()[1],
'd': target.day
}
date['end'] = {
'y': delta.year,
'm': delta.month,
'w': delta.isocalendar()[1],
'd': delta.day
}
return date
@staticmethod
# pylint: disable=W0612
def file_search(root_path: str, search_name: (str, type(None)) = None) -> list:
"""Search files.
Args:
root_path (str): where to search.
search_name (str, type, optional): full or partial filename for the filter.
Defaults to None.
Returns:
list: list of found files.
"""
found_list = []
if Do.args_valid(locals(), Do.file_search.__annotations__):
for root, dirs, files in walk(root_path, topdown=False):
for file in files:
if search_name:
if search_name in file:
found_list.append(path.join(path.realpath(root), file))
else:
found_list.append(path.join(path.realpath(root), file))
found_list.sort()
return found_list
@staticmethod
def wp_routine_media(
wp: Wordpress,
@ -3700,7 +3569,7 @@ class Do():
dict: {'media upload': bool, 'event create': bool, 'pages update': bool}
"""
local_logger = logging.getLogger(logger_alias)
if Do.args_valid(locals(), Do.wp_routine_media.__annotations__):
if Do.args_valid(locals(), Publish.wp_routine_media.__annotations__):
default_media_links = {
"day": {
"point-01": (
@ -3968,7 +3837,7 @@ class Do():
dict: {'success':bool,'result':response}.
"""
local_logger = logging.getLogger(logger_alias)
if Do.args_valid(locals(), Do.tg_routine_media.__annotations__):
if Do.args_valid(locals(), Publish.tg_routine_media.__annotations__):
default_caption = (""
+ "`period:` yyyy.mm.dd\n"
+ "`source:` https://www.hmp.today/media\n"
@ -4087,6 +3956,134 @@ class Do():
return response_result
class Do():
"""Set of various methods (functions) for routine.
"""
@staticmethod
def random_string(length: int) -> str:
"""Generate string from lowercase letters, uppercase letters, digits.
Args:
length (int): string lenght.
Returns:
str: random string.
"""
return ''.join(choice(ascii_letters + digits) for i in range(length))
@staticmethod
def args_valid(arguments: dict, annotations: dict) -> bool:
"""Arguments type validating by annotations.
Args:
arguments (dict): 'locals()' immediately after starting the function.
annotations (dict): function.name.__annotations__.
Raises:
TypeError: type of argument is not equal type in annotation.
Returns:
bool: True if argument types are valid.
"""
for var_name, var_type in annotations.items():
if not var_name == 'return':
if not isinstance(arguments[var_name], var_type):
raise TypeError(""
+ "type of '"
+ var_name
+ "' = "
+ str(arguments[var_name])
+ " is not "
+ str(var_type)
)
return True
@staticmethod
def date_calc(
target: datetime.date = datetime.datetime.now(),
amount: int = 0,
period: (str, type(None)) = None
) -> dict:
"""Calculating start/end dates for period: day, week, month, year.
Args:
target (datetime.date, optional): date in the calculation period.
Defaults to now.
amount (int, optional): +/- periods.
Defaults to 0.
period (str, type, optional): 'y'|'year','m'|'month','w'|'week','d'|'day'.
Defaults to None.
Raises:
ValueError: 'period' value is wrong.
Returns:
dict: {
'period':day|week|month|year,
'start':{'y':int,'m':int,'w':int,'d':int},
'end':{'y':int,'m':int,'w':int,'d':int}
}.
"""
if Do.args_valid(locals(), Do.date_calc.__annotations__):
date = {}
if not period:
raise ValueError("'period' value is wrong: " + "''")
elif period == 'd' or period == 'day':
delta = target + datetime.timedelta(days=amount)
target = delta
date['period'] = 'day'
elif period == 'w' or period == 'week':
delta = target + datetime.timedelta(weeks=amount)
target_week = str(delta.year) + '-W' + str(delta.isocalendar()[1])
target = datetime.datetime.strptime(target_week + '-1', "%G-W%V-%u")
delta = target + datetime.timedelta(days=6)
date['period'] = 'week'
elif period == 'm' or period == 'month':
delta_month = (target.month + amount) % 12
if not delta_month:
delta_month = 12
delta_year = target.year + ((target.month) + amount - 1) // 12
delta_days = calendar.monthrange(delta_year, delta_month)[1]
delta = target = target.replace(
year=delta_year,
month=delta_month,
day=1
)
delta = delta.replace(
year=delta_year,
month=delta_month,
day=delta_days
)
date['period'] = 'month'
elif period == 'y' or period == 'year':
target = target.replace(
year=target.year + amount,
month=1,
day=1
)
delta = target.replace(
year=target.year,
month=12,
day=31
)
date['period'] = 'year'
else:
raise ValueError("'period' value is wrong: " + period)
date['start'] = {
'y': target.year,
'm': target.month,
'w': target.isocalendar()[1],
'd': target.day
}
date['end'] = {
'y': delta.year,
'm': delta.month,
'w': delta.isocalendar()[1],
'd': delta.day
}
return date
if __name__ == "__main__":
time_start = datetime.datetime.now()