add additional headers to Connect.http()

This commit is contained in:
Pavel Muhortov 2023-06-18 10:44:43 +03:00
parent db76bb8e0b
commit 699492f77f

View File

@ -1,6 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# pylint: disable=C0103,C0302,C0114,W0621
import base64
import json import json
import logging import logging
import urllib.request import urllib.request
@ -18,6 +20,7 @@ from paramiko import SSHClient, AutoAddPolicy
class Parse: class Parse:
"""Parser of configs, arguments, parameters. """Parser of configs, arguments, parameters.
""" """
# pylint: disable=C0123
def __init__(self, parameters, block: str = None) -> None: def __init__(self, parameters, block: str = None) -> None:
"""Object constructor. """Object constructor.
@ -65,6 +68,7 @@ class Parse:
""" """
self.data.update(dictionary) self.data.update(dictionary)
# pylint: disable=C0206
def expand(self, store: str = None) -> dict: def expand(self, store: str = None) -> dict:
"""Expand dictionary "key":"name.conf" to dictionary "key":{subkey: subval}. """Expand dictionary "key":"name.conf" to dictionary "key":{subkey: subval}.
@ -79,7 +83,7 @@ class Parse:
config = store + sep + self.data[key] config = store + sep + self.data[key]
else: else:
config = self.data[key] config = self.data[key]
with open(config) as file: with open(config, encoding='UTF-8') as file:
self.data[key] = Parse(file.read()).data self.data[key] = Parse(file.read()).data
return self.data return self.data
@ -106,7 +110,7 @@ class Parse:
Returns: Returns:
str: string as "var1=val1;\nvar2=val2;". str: string as "var1=val1;\nvar2=val2;".
""" """
with open(config) as file: with open(config, encoding='UTF-8') as file:
raw = file.read() raw = file.read()
strs = '' strs = ''
for line in raw.splitlines(): for line in raw.splitlines():
@ -130,7 +134,9 @@ class Parse:
strings = cls.block(blockname, strings) strings = cls.block(blockname, strings)
for line in strings.replace('\n', ';').split(';'): for line in strings.replace('\n', ';').split(';'):
if not line.lstrip().startswith('#') and "=" in line: if not line.lstrip().startswith('#') and "=" in line:
dictionary[line.split('=')[0].strip()] = line.split('=')[1].strip().split(';')[0].strip() dictionary[line.split('=')[0].strip()] = (
line.split('=')[1].strip().split(';')[0].strip()
)
return dictionary return dictionary
@classmethod @classmethod
@ -171,28 +177,41 @@ class Parse:
class Connect: class Connect:
# pylint: disable=W0105
"""Set of connection methods (functions) for various protocols. """Set of connection methods (functions) for various protocols.
""" """
@staticmethod @staticmethod
# pylint: disable=W0102, W0718
def http( def http(
url: str, method: str = 'GET', url: str,
username: str = '', password: str = '', authtype: str = None, method: str = 'GET',
contenttype: str = 'text/plain', contentdata: str = '' username: str = '',
) -> str: password: str = '',
authtype: (str, type(None)) = None,
contenttype: str = 'text/plain',
contentdata: (str, bytes) = '',
headers: dict = {}
) -> dict:
"""Handling HTTP request. """Handling HTTP request.
Args: Args:
url (str): request url. url (str): Handling HTTP request.
method (str, optional): HTTP request method. Defaults to 'GET'. method (str, optional): HTTP request method. Defaults to 'GET'.
username (str, optional): username for url authentication. Defaults to ''. username (str, optional): username for url authentication. Defaults to ''.
password (str, optional): password for url authentication. Defaults to ''. password (str, optional): password for url authentication. Defaults to ''.
authtype (str, optional): digest|basic authentication type. Defaults to None. authtype (str, None, optional): digest|basic authentication type. Defaults to None.
contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'. contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'.
contentdata (str, optional): content data. Defaults to ''. contentdata (str, bytes, optional): content data. Defaults to ''.
headers (dict, optional): additional headers. Defaults to {}.
Returns: Returns:
str: HTTP response or 'ERROR'. dict: {'success':bool,'result':HTTP response or 'ERROR'}.
""" """
if Do.args_valid(locals(), Connect.http.__annotations__):
if contentdata != '':
headers['Content-Type'] = contenttype
if isinstance(contentdata, str):
contentdata = bytes(contentdata.encode('utf-8'))
# Preparing authorization # Preparing authorization
if authtype: if authtype:
@ -200,37 +219,51 @@ class Connect:
pswd.add_password(None, url, username, password) pswd.add_password(None, url, username, password)
if authtype == 'basic': if authtype == 'basic':
auth = urllib.request.HTTPBasicAuthHandler(pswd) auth = urllib.request.HTTPBasicAuthHandler(pswd)
token = base64.b64encode((username + ':' + password).encode())
headers['Authorization'] = 'Basic ' + token.decode('utf-8')
if authtype == 'digest': if authtype == 'digest':
auth = urllib.request.HTTPDigestAuthHandler(pswd) auth = urllib.request.HTTPDigestAuthHandler(pswd)
urllib.request.install_opener(urllib.request.build_opener(auth)) urllib.request.install_opener(urllib.request.build_opener(auth))
# Preparing request # Preparing request
request = urllib.request.Request(url=url, data=bytes(contentdata.encode('utf-8')), method=method) request = urllib.request.Request(
request.add_header('Content-Type', contenttype) url=url,
data=contentdata,
# Response method=method
try: )
response = urllib.request.urlopen(request).read() for key, val in headers.items():
logging.debug( request.add_header(key, val)
msg='' if len(contentdata) > 128:
contentdata = contentdata[:64] + b' ... ' + contentdata[-64:]
logging.debug(msg=''
+ '\n' + 'uri: ' + url + '\n' + 'uri: ' + url
+ '\n' + 'method: ' + method + '\n' + 'method: ' + method
+ '\n' + 'username: ' + username + '\n' + 'username: ' + username
+ '\n' + 'password: ' + password + '\n' + 'password: ' + password
+ '\n' + 'authtype: ' + authtype + '\n' + 'authtype: ' + str(authtype)
+ '\n' + 'content-type: ' + contenttype + '\n' + 'headers: ' + json.dumps(headers, indent=2)
+ '\n' + 'content-data: ' + contentdata + '\n' + 'content-data: ' + str(contentdata)
) )
if response.startswith(b'\xff\xd8'):
return response # Response
else: try:
return str(response.decode('utf-8')) response = urllib.request.urlopen(request).read()
if not response.startswith(b'\xff\xd8'):
response = str(response.decode('utf-8'))
return {"success": True, "result": response}
except Exception as error: except Exception as error:
logging.debug(msg='\n' + 'error: ' + str(error)) logging.debug(msg='\n' + 'error: ' + str(error))
return 'ERROR' return {"success": False, "result": "ERROR"}
@staticmethod @staticmethod
def ssh_commands(command: str, hostname: str, username: str, password: str, port: int = 22) -> str: # pylint: disable=W0718
def ssh_commands(
command: str,
hostname: str,
username: str,
password: str,
port: int = 22
) -> str:
"""Handling SSH command executing. """Handling SSH command executing.
Args: Args:
@ -267,7 +300,15 @@ class Connect:
return 'ERROR' return 'ERROR'
@staticmethod @staticmethod
def ssh_put_file(src_file: str, dst_file: str, hostname: str, username: str, password: str, port: int = 22) -> str: # pylint: disable=W0718
def ssh_put_file(
src_file: str,
dst_file: str,
hostname: str,
username: str,
password: str,
port: int = 22
) -> str:
"""Handling SFTP upload file. """Handling SFTP upload file.
Args: Args:
@ -346,7 +387,14 @@ class Connect:
return 'ERROR' return 'ERROR'
''' '''
@staticmethod @staticmethod
def ftp_put_file(src_file: str, dst_file: str, hostname: str, username: str, password: str) -> bool: # pylint: disable=W0718,C0116
def ftp_put_file(
src_file: str,
dst_file: str,
hostname: str,
username: str,
password: str
) -> bool:
dst_path = dst_file.split('/')[:-1] dst_path = dst_file.split('/')[:-1]
ftp = FTP(host=hostname) ftp = FTP(host=hostname)
try: try:
@ -430,17 +478,22 @@ class HikISAPI(Connect):
Args: Args:
url (str): API path for request. url (str): API path for request.
method (str, optional): HTTP request method. Defaults to 'GET'. method (str, optional): HTTP request method. Defaults to 'GET'.
contenttype (str, optional): Content-Type header. Defaults to 'application/x-www-form-urlencoded'. contenttype (str, optional): Content-Type header.
Defaults to 'application/x-www-form-urlencoded'.
contentdata (str, optional): data for send with request. Defaults to ''. contentdata (str, optional): data for send with request. Defaults to ''.
Returns: Returns:
str: HTTP response content. str: HTTP response content or 'ERROR'.
""" """
return self.http( response = self.http(
url=url, method=method, url=url, method=method,
username=self._user, password=self._pswd, authtype=self._auth, username=self._user, password=self._pswd, authtype=self._auth,
contenttype=contenttype, contentdata=contentdata contenttype=contenttype, contentdata=contentdata
) )
if response['success']:
return response['result']
else:
return 'ERROR'
def capabilities(self) -> bool: def capabilities(self) -> bool:
"""Get camera capabilities. """Get camera capabilities.
@ -459,11 +512,16 @@ class HikISAPI(Connect):
else: else:
return False return False
def downloadjpeg(self, dst_file: str = path.splitext(__file__)[0] + '.jpeg', x: int = 1920, y: int = 1080) -> bool: def downloadjpeg(
self,
dst_file: str = path.splitext(__file__)[0] + '.jpeg',
x: int = 1920,
y: int = 1080
) -> bool:
"""Get static picture from camera. """Get static picture from camera.
Args: Args:
dst_file (str, optional): absolute path of picture to save. Defaults to scriptname+'.jpeg'. dst_file (str, optional): abs picture's path to save. Defaults to scriptname+'.jpeg'.
x (int, optional): picture width. Defaults to 1920. x (int, optional): picture width. Defaults to 1920.
y (int, optional): picture height. Defaults to 1080. y (int, optional): picture height. Defaults to 1080.
@ -491,8 +549,10 @@ class HikISAPI(Connect):
Returns: Returns:
bool: True if successed. Printing a response with a logger at the INFO level. bool: True if successed. Printing a response with a logger at the INFO level.
""" """
url = (self._prot + '://' + self._host + ':' + str(self._port) url = (
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/status") self._prot + '://' + self._host + ':' + str(self._port)
+ "/ISAPI/PTZCtrl/channels/" + str(self._chan) + "/status"
)
response = self.__call(url=url, method='GET') response = self.__call(url=url, method='GET')
if response != 'ERROR': if response != 'ERROR':
logging.info(msg='\n' + response + '\n') logging.info(msg='\n' + response + '\n')
@ -726,9 +786,12 @@ class HikISAPI(Connect):
"""Set camera moving to direction until other signal or 180 seconds elapse. """Set camera moving to direction until other signal or 180 seconds elapse.
Args: Args:
x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0. x (int, optional): acceleration of horizontal camera movement from -100 to 100.
y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0. Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0. y (int, optional): acceleration of vertical camera movement from -100 to 100.
Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100.
Defaults to 0.
Returns: Returns:
bool: True if successed. bool: True if successed.
@ -757,10 +820,14 @@ class HikISAPI(Connect):
"""Set camera moving to direction until other signal or duration elapse. """Set camera moving to direction until other signal or duration elapse.
Args: Args:
x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0. x (int, optional): acceleration of horizontal camera movement from -100 to 100.
y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0. Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0. y (int, optional): acceleration of vertical camera movement from -100 to 100.
t (int, optional): duration in ms of acceleration from 0 to 180000. Defaults to 180000. Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100.
Defaults to 0.
t (int, optional): duration in ms of acceleration from 0 to 180000.
Defaults to 180000.
Returns: Returns:
bool: True if successed. bool: True if successed.
@ -791,10 +858,14 @@ class HikISAPI(Connect):
"""Set camera moving to direction (polymorph abstraction). """Set camera moving to direction (polymorph abstraction).
Args: Args:
x (int, optional): acceleration of horizontal camera movement from -100 to 100. Defaults to 0. x (int, optional): acceleration of horizontal camera movement from -100 to 100.
y (int, optional): acceleration of vertical camera movement from -100 to 100. Defaults to 0. Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100. Defaults to 0. y (int, optional): acceleration of vertical camera movement from -100 to 100.
t (int, optional): duration in ms of acceleration from 0 to 180000. Defaults to 0. Defaults to 0.
z (int, optional): acceleration of zoom camera movement from -100 to 100.
Defaults to 0.
t (int, optional): duration in ms of acceleration from 0 to 180000.
Defaults to 0.
Returns: Returns:
bool: True if successed. bool: True if successed.
@ -848,7 +919,13 @@ class HikISAPI(Connect):
else: else:
return False return False
def settextonosd(self, enabled: str = "true", x: int = 0, y: int = 0, message: str = "") -> bool: def settextonosd(
self,
enabled: str = "true",
x: int = 0,
y: int = 0,
message: str = ""
) -> bool:
"""Set message as video overlay text. """Set message as video overlay text.
Args: Args:
@ -929,6 +1006,7 @@ class Sensor(Connect):
username=self._user, password=self._pswd username=self._user, password=self._pswd
) )
# pylint: disable=W0718
def __temperature(self, nodename: str) -> str: def __temperature(self, nodename: str) -> str:
"""Preparating request for ds18b20 sensor type. """Preparating request for ds18b20 sensor type.
@ -969,6 +1047,7 @@ class Sequence:
"""Sequence handling. """Sequence handling.
""" """
@staticmethod @staticmethod
# pylint: disable=W0718
def run( def run(
device: HikISAPI, sensors: dict, sequence: dict, device: HikISAPI, sensors: dict, sequence: dict,
records_root_path: str = None, records_root_path: str = None,
@ -981,9 +1060,12 @@ class Sequence:
device (HikISAPI): HikISAPI object. device (HikISAPI): HikISAPI object.
sensors (dict): collection as key=sensorname:value=Sensor object. sensors (dict): collection as key=sensorname:value=Sensor object.
sequence (dict): sequence steps collection. sequence (dict): sequence steps collection.
records_root_path (str, optional): path (local|smb|ftp,sftp) to records directory. Defaults to None. records_root_path (str, optional): path (local|smb|ftp,sftp) to records directory.
records_root_user (str, optional): username if path on remote host. Defaults to None. Defaults to None.
records_root_pass (str, optional): password if path on remote host. Defaults to None. records_root_user (str, optional): username if path on remote host.
Defaults to None.
records_root_pass (str, optional): password if path on remote host.
Defaults to None.
""" """
for key, value in sequence.items(): for key, value in sequence.items():
action = value.split(',')[0].strip() action = value.split(',')[0].strip()
@ -1002,8 +1084,8 @@ class Sequence:
m = sensor_value m = sensor_value
else: else:
m = '' m = ''
logging.info( logging.info(msg=''
msg=' action:' + key + ' = ' + action + ' action:' + key + ' = ' + action
+ ',' + x + ',' + y + ',' + z + ',' + x + ',' + y + ',' + z
+ ',' + p + ',' + s + ',' + t + ',' + p + ',' + s + ',' + t
+ ',' + w + ',' + m + ',' + w + ',' + m
@ -1052,8 +1134,14 @@ class Sequence:
th = datetime.now().strftime('%H') th = datetime.now().strftime('%H')
tm = datetime.now().strftime('%M') tm = datetime.now().strftime('%M')
ts = datetime.now().strftime('%S') ts = datetime.now().strftime('%S')
records_file_name = (key + '_' + dy + '-' + dm + '-' + dd + '_' + th + '.' + tm + '.' + ts + '.jpeg') records_file_name = (
if device.downloadjpeg(x=int(x), y=int(y), dst_file=records_root_temp + sep + records_file_name): key + '_' + dy + '-' + dm + '-' + dd + '_' + th + '.' + tm + '.' + ts + '.jpeg'
)
if device.downloadjpeg(
x=int(x),
y=int(y),
dst_file=records_root_temp + sep + records_file_name
):
hostname = 'localhost' hostname = 'localhost'
hostport, hosttype = None, None hostport, hosttype = None, None
username = records_root_user username = records_root_user
@ -1078,7 +1166,11 @@ class Sequence:
hostname = hostname.split(':')[0] hostname = hostname.split(':')[0]
if hosttype == 'ftp': if hosttype == 'ftp':
src_file = records_root_temp + sep + records_file_name src_file = records_root_temp + sep + records_file_name
dst_file = hostpath + '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/' + records_file_name dst_file = (
hostpath
+ '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/'
+ records_file_name
)
if Connect.ftp_put_file( if Connect.ftp_put_file(
src_file=src_file, src_file=src_file,
dst_file=dst_file, dst_file=dst_file,
@ -1092,7 +1184,11 @@ class Sequence:
pass pass
elif hosttype == 'sftp': elif hosttype == 'sftp':
src_file = records_root_temp + sep + records_file_name src_file = records_root_temp + sep + records_file_name
dst_file = hostpath + '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/' + records_file_name dst_file = (
hostpath
+ '/' + dy + '/' + dm + '/' + dv + '/' + dd + '/'
+ records_file_name
)
response = Connect.ssh_put_file( response = Connect.ssh_put_file(
src_file=src_file, dst_file=dst_file, src_file=src_file, dst_file=dst_file,
hostname=hostname, port=hostport, hostname=hostname, port=hostport,
@ -1107,14 +1203,19 @@ class Sequence:
response = False response = False
else: else:
src_file = records_root_temp + sep + records_file_name src_file = records_root_temp + sep + records_file_name
dst_file = hostpath + sep + dy + sep + dm + sep + dv + sep + dd + sep + records_file_name dst_file = (
hostpath
+ sep + dy + sep + dm + sep + dv + sep + dd + sep
+ records_file_name)
try: try:
makedirs(hostpath + sep + dy + sep + dm + sep + dv + sep + dd, exist_ok=True) makedirs(
hostpath + sep + dy + sep + dm + sep + dv + sep + dd,
exist_ok=True
)
replace(src=src_file, dst=dst_file) replace(src=src_file, dst=dst_file)
response = True response = True
except Exception as error: except Exception as error:
logging.debug( logging.debug(msg=''
msg=''
+ '\n' + 'src_file: ' + src_file + '\n' + 'src_file: ' + src_file
+ '\n' + 'dst_file: ' + dst_file + '\n' + 'dst_file: ' + dst_file
+ '\n' + 'error: ' + str(error) + '\n' + 'error: ' + str(error)
@ -1134,6 +1235,7 @@ class Proc:
"""Find a running process from Python. """Find a running process from Python.
""" """
@classmethod @classmethod
# pylint: disable=W0612
def _list_windows(cls) -> list: def _list_windows(cls) -> list:
"""Find all running process with wmi. """Find all running process with wmi.
@ -1174,6 +1276,7 @@ class Proc:
return execlist return execlist
@classmethod @classmethod
# pylint: disable=W0612
def _list_linux(cls) -> list: def _list_linux(cls) -> list:
"""Find all running process with ps. """Find all running process with ps.
@ -1219,6 +1322,7 @@ class Proc:
return None return None
@classmethod @classmethod
# pylint: disable=W0150
def search(cls, find: str, exclude: str = None) -> list: def search(cls, find: str, exclude: str = None) -> list:
"""Find specified processes. """Find specified processes.
@ -1301,7 +1405,7 @@ class FFmpeg:
int: ffmpeg return code int: ffmpeg return code
""" """
if not raw: if not raw:
process = ('' process = ([]
+ cls._bin(ffpath).split() + cls._bin(ffpath).split()
+ cls._src(src).split() + cls._src(src).split()
+ cls._preset(preset, fps).split() + cls._preset(preset, fps).split()
@ -1350,11 +1454,11 @@ class FFmpeg:
'\tTarget: /usr/bin/ffmpeg\n' '\tTarget: /usr/bin/ffmpeg\n'
'\n' '\n'
'Install on Windows:\n' 'Install on Windows:\n'
'\tDownload and extract archive from: https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-full.7z\n' '\tDownload and extract: https://www.gyan.dev/ffmpeg/builds/ffmpeg-release-full.7z\n'
'\tTarget: "%PROGRAMFILES%\\ffmpeg\\bin\\ffmpeg.exe"\n' '\tTarget: "%PROGRAMFILES%\\ffmpeg\\bin\\ffmpeg.exe"\n'
'\n' '\n'
'Install on MacOS:\n' 'Install on MacOS:\n'
'\tDownload and extract archive from: https://evermeet.cx/ffmpeg/\n' '\tDownload and extract: https://evermeet.cx/ffmpeg/\n'
'\tTarget: /usr/bin/ffmpeg\n' '\tTarget: /usr/bin/ffmpeg\n'
) )
if not ffpath: if not ffpath:
@ -1565,7 +1669,8 @@ if __name__ == "__main__":
'- Python 3 (tested version 3.9.5), ' '- Python 3 (tested version 3.9.5), '
'- Python 3 modules: paramiko ' '- Python 3 modules: paramiko '
) )
args.add_argument('--config', type=str, default=path.splitext(__file__)[0] + '.conf', required=False, args.add_argument('--config', type=str, default=path.splitext(__file__)[0] + '.conf',
required=False,
help='custom configuration file path') help='custom configuration file path')
args.add_argument('-b', '--broadcast', action='store_true', required=False, args.add_argument('-b', '--broadcast', action='store_true', required=False,
help='streaming media to destination') help='streaming media to destination')