utils/my-route.py

455 lines
16 KiB
Python
Raw Normal View History

2023-05-30 16:32:59 +03:00
#!/usr/bin/env python3
# pylint: disable=C0103,C0114
import logging
import urllib.request
from argparse import ArgumentParser
from datetime import datetime
from os import path, sep, makedirs
from re import match
from sys import platform
from subprocess import Popen, PIPE
class Parse:
"""Parser of configs, arguments, parameters.
"""
def __init__(self, parameters, block: str = None) -> None:
"""Object constructor.
Args:
parameters: dictionary as "key":"value" or
ArgumentParser class object or
string path to the file or
string as "var1=val1;var2=val2".
block (str, optional): name of target block from text. Defaults to None.
"""
self.path = ''
self.data = {}
if isinstance(parameters, dict):
self._dict2dict(parameters)
if isinstance(parameters, ArgumentParser):
self._dict2dict(self.argv2dict(parameters))
if isinstance(parameters, str):
if path.exists(parameters):
self._dict2dict(
self.strs2dict(
self.conf2strs(parameters),
block
)
)
self.path = parameters
else:
self._dict2dict(self.strs2dict(parameters, block))
def __str__(self) -> str:
"""Overrides method for print(object).
Returns:
str: string with contents of the object's dictionary.
"""
string = ''
for key, val in self.data.items():
string += str(type(val)) + ' ' + str(key) + ' = ' + str(val) + '\n'
return string
def _dict2dict(self, dictionary: dict) -> None:
"""Updates or adds dictionary data.
Args:
dictionary (dict): dictionary as "key":"value".
"""
self.data.update(dictionary)
def expand(self, store: str = None) -> dict:
"""Expand dictionary "key":"name.conf" to dictionary "key":{subkey: subval}.
Args:
store (str, optional): path to directory with name.conf. Defaults to None.
Returns:
dict: expanded dictionary as "key":{subkey: subval}.
"""
for key in self.data.items():
if store:
config = store + sep + self.data[key]
else:
config = self.data[key]
with open(config, mode='r', encoding='UTF-8') as file:
self.data[key] = Parse(file.read()).data
return self.data
@classmethod
def argv2dict(cls, parser: ArgumentParser) -> dict:
"""Converts startup arguments to a dictionary.
Args:
parser (ArgumentParser): argparse.ArgumentParser class object.
Returns:
dict: dictionary as "key":"value".
"""
parser = ArgumentParser(add_help=False, parents=[parser])
return vars(parser.parse_args())
@classmethod
def conf2strs(cls, config: str) -> str:
"""Builds a dictionary from a file containing parameters.
Args:
config (str): path to the config file.
Returns:
str: string as "var1=val1;\nvar2=val2;".
"""
with open(config, mode='r', encoding='UTF-8') as file:
raw = file.read()
strs = ''
for line in raw.splitlines():
if not line.lstrip().startswith('#'):
strs += line + '\n'
return strs
@classmethod
def strs2dict(cls, strings: str, blockname: str) -> dict:
"""Builds a dictionary from a strings containing parameters.
Args:
strings (str): string as "var1=val1;var2=val2;".
blockname (str): name of target block from text.
Returns:
dict: dictionary as "key":"value".
"""
dictionary = {}
if blockname:
strings = cls.block(blockname, strings)
for line in strings.replace('\n', ';').split(';'):
if not line.lstrip().startswith('#') and "=" in line:
key = line.split('=')[0].strip()
val = line.split('=')[1].strip().split(';')[0].strip()
dictionary[key] = val
return dictionary
@classmethod
def str2bool(cls, value: str) -> bool:
"""Converts a string value to boolean.
Args:
value (str): string containing "true" or "false", "yes" or "no", "1" or "0".
Returns:
bool: bool True or False.
"""
return str(value).lower() in ("true", "yes", "1")
@classmethod
def block(cls, blockname: str, text: str) -> str:
"""Cuts a block of text between line [blockname] and line [next block] or EOF.
Args:
blockname (str): string in [] after which the block starts.
text (str): string of text from which the block is needed.
Returns:
str: string of text between line [block name] and line [next block].
"""
level = 1
save = False
result = ''
for line in text.splitlines():
if line.startswith('[') and blockname in line:
level = line.count('[')
save = True
elif line.startswith('[') and '['*level in line:
save = False
elif save:
result += line + '\n'
return result
class Connect:
# pylint: disable=W0718
"""Set of connection methods (functions) for various protocols.
"""
@staticmethod
def http(
url: str, method: str = 'GET',
username: str = '', password: str = '', authtype: str = None,
contenttype: str = 'text/plain', contentdata: str = ''
) -> str:
"""Handling HTTP request.
Args:
url (str): request url.
method (str, optional): HTTP request method. Defaults to 'GET'.
username (str, optional): username for url authentication. Defaults to ''.
password (str, optional): password for url authentication. Defaults to ''.
authtype (str, optional): digest|basic authentication type. Defaults to None.
contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'.
contentdata (str, optional): content data. Defaults to ''.
Returns:
str: HTTP response or 'ERROR'.
"""
# Preparing authorization
if authtype:
pswd = urllib.request.HTTPPasswordMgrWithDefaultRealm()
pswd.add_password(None, url, username, password)
if authtype == 'basic':
auth = urllib.request.HTTPBasicAuthHandler(pswd)
if authtype == 'digest':
auth = urllib.request.HTTPDigestAuthHandler(pswd)
urllib.request.install_opener(urllib.request.build_opener(auth))
# Preparing request
request = urllib.request.Request(
url=url,
data=bytes(contentdata.encode('utf-8')),
method=method
)
request.add_header('Content-Type', contenttype)
# Response
try:
response = urllib.request.urlopen(request).read()
logging.debug(
msg=''
+ '\n' + 'uri: ' + url
+ '\n' + 'method: ' + method
+ '\n' + 'username: ' + username
+ '\n' + 'password: ' + password
+ '\n' + 'authtype: ' + str(authtype)
+ '\n' + 'content-type: ' + contenttype
+ '\n' + 'content-data: ' + contentdata
)
if response.startswith(b'\xff\xd8'):
return response
else:
return str(response.decode('utf-8'))
except Exception as error:
logging.debug(msg='\n' + 'error: ' + str(error))
return 'ERROR'
class Route(Connect):
"""Handling route operations.
"""
def __init__(self, gateway: str, cidr_root_path: str, cidr_name_list: list) -> None:
"""Object constructor.
Args:
gateway (str): route gateway ip address.
cidr_root_path (str): cidr db path.
cidr_name_list (list): list of cidr files.
"""
self._gateway = gateway
self._cidr_root_path = cidr_root_path
self._cidr_name_list = cidr_name_list
self._route_list = self.__cidr_name_list_to_route_list()
def __cidr_name_list_to_route_list(self) -> list:
"""Convert files content to route list.
Returns:
list: route list.
"""
route_list = []
for cidr_name in self._cidr_name_list:
cidr_path = self._cidr_root_path + sep + cidr_name
if not path.exists(cidr_path):
if not cidr_name.startswith('_'):
if not self.__cidr_download(cidr_name):
self._cidr_name_list.remove(cidr_name)
else:
self._cidr_name_list.remove(cidr_name)
for cidr_name in self._cidr_name_list:
cidr_path = self._cidr_root_path + sep + cidr_name
with open(cidr_path, mode='r', encoding='UTF-8') as file:
cidr_data = file.read()
for cidr in cidr_data.splitlines():
cidr = cidr.split('#')[0].strip()
if match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d{1,2}$", cidr):
route_list.append(cidr + ' ' + self._gateway)
return route_list
def __cidr_download(self, cidr_name: str) -> bool:
# pylint: disable=W0718,W0719
"""Download CIDR file.
Args:
cidr_name (str): file name.
Raises:
Exception: downloading failed.
Returns:
bool: True - CIDR downloaded, False - there are exceptions.
"""
try:
cidr_url = ''.join(''
+ 'https://raw.githubusercontent.com/'
+ 'herrbischoff/'
+ 'country-ip-blocks/master/'
+ 'ipv4/' + cidr_name
)
response = self.http(url=cidr_url, method='GET')
cidr_path = self._cidr_root_path + sep + cidr_name
if response != 'ERROR':
makedirs(self._cidr_root_path, exist_ok=True)
with open(cidr_path, mode='w+', encoding='UTF-8') as file:
file.write(response)
return True
else:
raise Exception('downloading ' + cidr_name + ' failed')
except Exception as error:
logging.warning(
msg=''
+ str(error)
)
return False
def __cmd(self, command: list) -> None:
"""Executing command by terminal.
Args:
command (list): splitted command by words.
"""
out, err = Popen(
command,
stdout=PIPE,
stderr=PIPE
).communicate()
for line in out.splitlines():
logging.info(msg=line.decode('utf-8'))
for line in err.splitlines():
logging.warning(msg=line.decode('utf-8'))
def ro_add(self) -> None:
"""Add routes specified by config.
"""
for route in self._route_list:
if platform.startswith('linux') or platform.startswith('darwin'):
command = ['ip', 'ro', 'add'] + route.split()
logging.info(msg=' '.join(command))
self.__cmd(command=command)
elif platform.startswith('win32'):
# todo: windows
return False
else:
return False
logging.info(msg='added ' + str(len(self._route_list)) + ' records')
return True
def ro_del(self) -> None:
"""Del routes specified by config.
"""
for route in self._route_list:
if platform.startswith('linux') or platform.startswith('darwin'):
command = ['ip', 'ro', 'del'] + route.split()
logging.info(msg=' '.join(command))
self.__cmd(command=command)
elif platform.startswith('win32'):
# todo: windows
return False
else:
return False
logging.info(msg='deleted ' + str(len(self._route_list)) + ' records')
return True
def update(self) -> None:
"""Update CIDR db, del and add routes specified by config.
"""
for cidr_name in self._cidr_name_list:
if not cidr_name.startswith('_'):
self.__cidr_download(cidr_name=cidr_name)
self.ro_del()
self.ro_add()
def checkroot() -> bool:
# pylint: disable=C0415
"""Crossplatform privileged rights checker.
Returns:
bool: True - if privileged rights, False - if not privileged rights
"""
if platform.startswith('linux') or platform.startswith('darwin'):
from os import geteuid
if geteuid() == 0:
return True
return False
elif platform.startswith('win32'):
import ctypes
return ctypes.windll.shell32.IsUserAnAdmin()
if __name__ == "__main__":
time_start = datetime.now()
args = ArgumentParser(
prog='my-route',
description='Route management by CIDR lists.',
epilog='Dependencies: '
'- Python 3 (tested version 3.9.5), '
'- privileged rights '
)
args.add_argument(
'--config',
type=str,
default=path.splitext(__file__)[0] + '.conf',
required=False,
help='custom configuration file path'
)
args.add_argument('-a', '--add', action='store_true', required=False,
help='add routes specified by config')
args.add_argument('-d', '--del', action='store_true', required=False,
help='del routes specified by config')
args.add_argument('-u', '--update', action='store_true', required=False,
help='update cidr db, del old routes, add new routes')
args = vars(args.parse_args())
cidr_root = path.dirname(path.realpath(__file__)) + sep + 'my-route.db'
log_level = 'INFO'
log_root = path.dirname(path.realpath(__file__))
gateways = {}
if path.exists(args['config']):
conf = Parse(parameters=args['config'], block='common')
if 'cidr_root' in conf.data:
cidr_root = conf.data['cidr_root']
if 'log_root' in conf.data:
log_root = conf.data['log_root']
if 'log_level' in conf.data:
if conf.data['log_level'] == 'DEBUG':
log_level = logging.DEBUG
elif conf.data['log_level'] == 'INFO':
log_level = logging.INFO
elif conf.data['log_level'] == 'WARNING':
log_level = logging.WARNING
elif conf.data['log_level'] == 'ERROR':
log_level = logging.ERROR
elif conf.data['log_level'] == 'CRITICAL':
log_level = logging.CRITICAL
conf = Parse(parameters=args['config'], block='enable-gateway')
for key, value in conf.data.items():
if value == 'true':
gateway_config = Parse(
parameters=args['config'],
block=key
)
gateways[key] = []
for cidr, enable in gateway_config.data.items():
if enable == 'true':
gateways[key].append(cidr)
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d_%H.%M.%S',
handlers=[
logging.FileHandler(
filename=log_root + sep + path.splitext(path.basename(__file__))[0] + '.log',
mode='a'
),
logging.StreamHandler()
],
level=log_level
)
if checkroot():
for key, value in gateways.items():
ro_list = Route(
gateway=key.replace('-', ' '),
cidr_root_path=cidr_root,
cidr_name_list=value
)
if args['update']:
ro_list.update()
elif args['del']:
ro_list.ro_del()
elif args['add']:
ro_list.ro_add()
else:
logging.info(msg='No start arguments selected. Exit.')
break
else:
logging.warning(msg='Restart this as root!')
time_execute = datetime.now() - time_start
logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')