#!/usr/bin/env python3
# pylint: disable=C0103,C0114

import logging
import urllib.request
from argparse import ArgumentParser
from datetime import datetime
from os import path, sep, makedirs
from re import match
from sys import platform
from subprocess import Popen, PIPE
from typing import Optional, Union


class Parse:
    """Parser of configs, arguments, parameters.

    The parsed result is kept in ``self.data`` as a flat ``dict``. When the
    object was built from an existing file, ``self.path`` holds that path,
    otherwise it is an empty string.
    """

    def __init__(self, parameters, block: Optional[str] = None) -> None:
        """Object constructor.

        Args:
            parameters: dictionary as "key":"value"
                or ArgumentParser class object
                or string path to the file
                or string as "var1=val1;var2=val2".
            block (str, optional): name of target block from text.
                Only used for string input. Defaults to None.
        """
        self.path = ''
        self.data = {}
        if isinstance(parameters, dict):
            self._dict2dict(parameters)
        elif isinstance(parameters, ArgumentParser):
            self._dict2dict(self.argv2dict(parameters))
        elif isinstance(parameters, str):
            if path.exists(parameters):
                # An existing path is read as a config file.
                self._dict2dict(
                    self.strs2dict(self.conf2strs(parameters), block)
                )
                self.path = parameters
            else:
                # Anything else is treated as inline "var=val;..." text.
                self._dict2dict(self.strs2dict(parameters, block))

    def __str__(self) -> str:
        """Overrides method for print(object).

        Returns:
            str: one "<type> key = value" line per dictionary entry.
        """
        return ''.join(
            str(type(val)) + ' ' + str(key) + ' = ' + str(val) + '\n'
            for key, val in self.data.items()
        )

    def _dict2dict(self, dictionary: dict) -> None:
        """Updates or adds dictionary data.

        Args:
            dictionary (dict): dictionary as "key":"value".
        """
        self.data.update(dictionary)

    def expand(self, store: Optional[str] = None) -> dict:
        """Expand dictionary "key":"name.conf" to "key":{subkey: subval}.

        Every value of ``self.data`` is treated as a file name; the file is
        read and parsed, and the value is replaced by the parsed dictionary.

        Args:
            store (str, optional): path to directory with name.conf.
                Defaults to None (values are used as paths as they are).

        Returns:
            dict: expanded dictionary as "key":{subkey: subval}.
        """
        # Iterate over a snapshot: values are replaced while looping.
        for key, name in list(self.data.items()):
            config = store + sep + name if store else name
            with open(config, mode='r', encoding='UTF-8') as file:
                self.data[key] = Parse(file.read()).data
        return self.data

    @classmethod
    def argv2dict(cls, parser: ArgumentParser) -> dict:
        """Converts startup arguments to a dictionary.

        Args:
            parser (ArgumentParser): argparse.ArgumentParser class object.

        Returns:
            dict: dictionary as "key":"value".
        """
        # Re-wrap the parser without its own help option before parsing.
        wrapper = ArgumentParser(add_help=False, parents=[parser])
        return vars(wrapper.parse_args())

    @classmethod
    def conf2strs(cls, config: str) -> str:
        """Reads a config file and drops its comment lines.

        Args:
            config (str): path to the config file.

        Returns:
            str: file content without lines whose first non-blank
                character is '#', every kept line terminated by a newline.
        """
        with open(config, mode='r', encoding='UTF-8') as file:
            raw = file.read()
        return ''.join(
            line + '\n'
            for line in raw.splitlines()
            if not line.lstrip().startswith('#')
        )

    @classmethod
    def strs2dict(cls, strings: str, blockname: str) -> dict:
        """Builds a dictionary from a strings containing parameters.

        Args:
            strings (str): string as "var1=val1;var2=val2;".
            blockname (str): name of target block from text.

        Returns:
            dict: dictionary as "key":"value".
        """
        dictionary = {}
        if blockname:
            strings = cls.block(blockname, strings)
        # Newlines and ';' are both parameter separators.
        for line in strings.replace('\n', ';').split(';'):
            if line.lstrip().startswith('#') or '=' not in line:
                continue
            # Split on the first '=' only so values may contain '='.
            key, _, val = line.partition('=')
            dictionary[key.strip()] = val.strip()
        return dictionary

    @classmethod
    def str2bool(cls, value: str) -> bool:
        """Converts a string value to boolean.

        Args:
            value (str): string containing "true" or "false",
                "yes" or "no", "1" or "0".

        Returns:
            bool: True for "true", "yes", "1" (case-insensitive),
                False for anything else.
        """
        return str(value).lower() in ("true", "yes", "1")

    @classmethod
    def block(cls, blockname: str, text: str) -> str:
        """Cuts a block of text between line [blockname] and line [next block] or EOF.

        The header is matched by substring: any line that starts with '['
        and contains ``blockname`` opens the block.

        Args:
            blockname (str): string in [] after which the block starts.
            text (str): string of text from which the block is needed.

        Returns:
            str: string of text between line [block name] and line [next block].
        """
        level = 1
        save = False
        kept = []
        for line in text.splitlines():
            is_header = line.startswith('[')
            if is_header and blockname in line:
                # Remember the bracket depth of the opening header.
                level = line.count('[')
                save = True
            elif is_header and '[' * level in line:
                # A header of at least the same bracket depth closes it.
                save = False
            elif save:
                kept.append(line + '\n')
        return ''.join(kept)


class Connect:  # pylint: disable=W0718
    """Set of connection methods (functions) for various protocols.
    """

    @staticmethod
    def http(
        url: str,
        method: str = 'GET',
        username: str = '',
        password: str = '',
        authtype: Optional[str] = None,
        contenttype: str = 'text/plain',
        contentdata: str = ''
    ) -> Union[str, bytes]:
        """Handling HTTP request.

        Args:
            url (str): request url.
            method (str, optional): HTTP request method. Defaults to 'GET'.
            username (str, optional): username for url authentication.
                Defaults to ''.
            password (str, optional): password for url authentication.
                Defaults to ''.
            authtype (str, optional): digest|basic authentication type.
                Defaults to None.
            contenttype (str, optional): 'Content-Type' header.
                Defaults to 'text/plain'.
            contentdata (str, optional): content data. Defaults to ''.

        Returns:
            str: UTF-8 decoded HTTP response body, or 'ERROR' on any failure
                (including an unsupported authtype).
            bytes: raw body when it starts with the bytes FF D8
                (the JPEG start-of-image marker).
        """
        # Preparing authorization
        opener = None
        if authtype:
            auth_handlers = {
                'basic': urllib.request.HTTPBasicAuthHandler,
                'digest': urllib.request.HTTPDigestAuthHandler,
            }
            if authtype not in auth_handlers:
                logging.debug(
                    msg='\n' + 'error: unsupported authtype ' + str(authtype)
                )
                return 'ERROR'
            pswd = urllib.request.HTTPPasswordMgrWithDefaultRealm()
            pswd.add_password(None, url, username, password)
            # Per-call opener: credentials must not leak into later requests
            # through a globally installed opener.
            opener = urllib.request.build_opener(auth_handlers[authtype](pswd))

        try:
            # Preparing request (inside try: a malformed url raises here)
            request = urllib.request.Request(
                url=url,
                data=contentdata.encode('utf-8'),
                method=method
            )
            request.add_header('Content-Type', contenttype)

            # Response
            send = opener.open if opener else urllib.request.urlopen
            with send(request) as reply:
                response = reply.read()
            logging.debug(
                msg=''
                + '\n' + 'uri: ' + url
                + '\n' + 'method: ' + method
                + '\n' + 'username: ' + username
                # Never write the real secret to the log.
                + '\n' + 'password: ' + ('********' if password else '')
                + '\n' + 'authtype: ' + str(authtype)
                + '\n' + 'content-type: ' + contenttype
                + '\n' + 'content-data: ' + contentdata
            )
            if response.startswith(b'\xff\xd8'):
                return response
            return str(response.decode('utf-8'))
        except Exception as error:
            logging.debug(msg='\n' + 'error: ' + str(error))
            return 'ERROR'


class Route(Connect):
    """Handling route operations.
    """

    def __init__(self, gateway: str, cidr_root_path: str, cidr_name_list: list) -> None:
        """Object constructor.

        Missing CIDR files are downloaded (unless their name starts with
        '_') and the route list is built immediately.

        Args:
            gateway (str): route gateway ip address.
            cidr_root_path (str): cidr db path.
            cidr_name_list (list): list of cidr files.
        """
        self._gateway = gateway
        self._cidr_root_path = cidr_root_path
        # Work on a copy so the caller's list is never modified.
        self._cidr_name_list = list(cidr_name_list)
        self._route_list = self.__cidr_name_list_to_route_list()

    def __cidr_name_list_to_route_list(self) -> list:
        """Convert files content to route list.

        Names whose file is missing and cannot be downloaded (or must not
        be downloaded because the name starts with '_') are dropped from
        the stored name list.

        Returns:
            list: route list, items as "<cidr> <gateway>".
        """
        available = []
        for cidr_name in self._cidr_name_list:
            cidr_path = self._cidr_root_path + sep + cidr_name
            if path.exists(cidr_path):
                available.append(cidr_name)
            elif not cidr_name.startswith('_') and self.__cidr_download(cidr_name):
                available.append(cidr_name)
        # Replace instead of removing in place while iterating.
        self._cidr_name_list = available

        route_list = []
        for cidr_name in available:
            cidr_path = self._cidr_root_path + sep + cidr_name
            with open(cidr_path, mode='r', encoding='UTF-8') as file:
                cidr_data = file.read()
            for line in cidr_data.splitlines():
                # Strip trailing comments and surrounding blanks.
                cidr = line.split('#')[0].strip()
                if match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d{1,2}$", cidr):
                    route_list.append(cidr + ' ' + self._gateway)
        return route_list

    def __cidr_download(self, cidr_name: str) -> bool:
        """Download CIDR file.

        Failures are logged as warnings and never raised. An existing file
        is only overwritten after a successful text response.

        Args:
            cidr_name (str): file name.

        Returns:
            bool: True - CIDR downloaded, False - downloading or saving failed.
        """
        cidr_url = (
            'https://raw.githubusercontent.com/'
            'herrbischoff/'
            'country-ip-blocks/master/'
            'ipv4/' + cidr_name
        )
        response = self.http(url=cidr_url, method='GET')
        # http() may return bytes for binary payloads; only text is a CIDR list.
        if not isinstance(response, str) or response == 'ERROR':
            logging.warning(msg='downloading ' + cidr_name + ' failed')
            return False
        cidr_path = self._cidr_root_path + sep + cidr_name
        try:
            makedirs(self._cidr_root_path, exist_ok=True)
            with open(cidr_path, mode='w', encoding='UTF-8') as file:
                file.write(response)
        except OSError as error:
            logging.warning(msg='' + str(error))
            return False
        return True

    def __cmd(self, command: list) -> None:
        """Executing command by terminal.

        stdout lines are logged as info, stderr lines as warnings. A command
        that cannot be started is logged as a warning.

        Args:
            command (list): splitted command by words.
        """
        try:
            with Popen(command, stdout=PIPE, stderr=PIPE) as process:
                out, err = process.communicate()
        except OSError as error:
            logging.warning(msg=str(error))
            return
        for line in out.splitlines():
            logging.info(msg=line.decode('utf-8', errors='replace'))
        for line in err.splitlines():
            logging.warning(msg=line.decode('utf-8', errors='replace'))

    def __ro_apply(self, action: str, summary: str) -> bool:
        """Run "ip ro <action>" for every route of the list.

        Args:
            action (str): "add" or "del".
            summary (str): verb used in the final log record.

        Returns:
            bool: True - all routes processed,
                False - there are routes but the platform is not supported.
        """
        supported = platform.startswith(('linux', 'darwin'))
        for route in self._route_list:
            if not supported:
                # todo: windows
                return False
            command = ['ip', 'ro', action] + route.split()
            logging.info(msg=' '.join(command))
            self.__cmd(command=command)
        logging.info(msg=summary + ' ' + str(len(self._route_list)) + ' records')
        return True

    def ro_add(self) -> bool:
        """Add routes specified by config.

        Returns:
            bool: True - routes processed, False - unsupported platform.
        """
        return self.__ro_apply(action='add', summary='added')

    def ro_del(self) -> bool:
        """Del routes specified by config.

        Returns:
            bool: True - routes processed, False - unsupported platform.
        """
        return self.__ro_apply(action='del', summary='deleted')

    def update(self) -> None:
        """Update CIDR db, del and add routes specified by config.
        """
        for cidr_name in self._cidr_name_list:
            if not cidr_name.startswith('_'):
                self.__cidr_download(cidr_name=cidr_name)
        # Delete with the list built from the previous db content ...
        self.ro_del()
        # ... then rebuild it from the refreshed files before adding.
        self._route_list = self.__cidr_name_list_to_route_list()
        self.ro_add()


def checkroot() -> bool:  # pylint: disable=C0415
    """Crossplatform privileged rights checker.

    Returns:
        bool: True - if privileged rights,
            False - if not privileged rights or the platform is unknown.
    """
    if platform.startswith(('linux', 'darwin')):
        from os import geteuid
        return geteuid() == 0
    if platform.startswith('win32'):
        import ctypes
        return bool(ctypes.windll.shell32.IsUserAnAdmin())
    return False


def main() -> None:
    """Script entry point: read config, set up logging, manage routes.
    """
    time_start = datetime.now()

    parser = ArgumentParser(
        prog='my-route',
        description='Route management by CIDR lists.',
        epilog='Dependencies: '
               '- Python 3 (tested version 3.9.5), '
               '- privileged rights '
    )
    parser.add_argument(
        '--config',
        type=str,
        default=path.splitext(__file__)[0] + '.conf',
        required=False,
        help='custom configuration file path'
    )
    parser.add_argument('-a', '--add', action='store_true', required=False,
                        help='add routes specified by config')
    parser.add_argument('-d', '--del', action='store_true', required=False,
                        help='del routes specified by config')
    parser.add_argument('-u', '--update', action='store_true', required=False,
                        help='update cidr db, del old routes, add new routes')
    args = vars(parser.parse_args())

    # Defaults, used when the config file or a key is missing.
    script_root = path.dirname(path.realpath(__file__))
    cidr_root = script_root + sep + 'my-route.db'
    log_root = script_root
    log_level = logging.INFO
    gateways = {}

    if path.exists(args['config']):
        conf = Parse(parameters=args['config'], block='common')
        cidr_root = conf.data.get('cidr_root', cidr_root)
        log_root = conf.data.get('log_root', log_root)
        log_levels = {
            'DEBUG': logging.DEBUG,
            'INFO': logging.INFO,
            'WARNING': logging.WARNING,
            'ERROR': logging.ERROR,
            'CRITICAL': logging.CRITICAL,
        }
        # Unknown level names keep the default.
        log_level = log_levels.get(conf.data.get('log_level'), log_level)

        conf = Parse(parameters=args['config'], block='enable-gateway')
        for key, value in conf.data.items():
            if Parse.str2bool(value):
                # Each enabled gateway has its own block listing CIDR files.
                gateway_config = Parse(parameters=args['config'], block=key)
                gateways[key] = [
                    cidr
                    for cidr, enable in gateway_config.data.items()
                    if Parse.str2bool(enable)
                ]

    # FileHandler does not create missing directories.
    if log_root:
        makedirs(log_root, exist_ok=True)
    logging.basicConfig(
        format='%(asctime)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d_%H.%M.%S',
        handlers=[
            logging.FileHandler(
                filename=log_root + sep
                + path.splitext(path.basename(__file__))[0] + '.log',
                mode='a'
            ),
            logging.StreamHandler()
        ],
        level=log_level
    )

    if checkroot():
        for key, value in gateways.items():
            # Check first: building a Route already downloads missing files.
            if not (args['update'] or args['del'] or args['add']):
                logging.info(msg='No start arguments selected. Exit.')
                break
            ro_list = Route(
                gateway=key.replace('-', ' '),
                cidr_root_path=cidr_root,
                cidr_name_list=value
            )
            if args['update']:
                ro_list.update()
            elif args['del']:
                ro_list.ro_del()
            else:
                ro_list.ro_add()
    else:
        logging.warning(msg='Restart this as root!')

    time_execute = datetime.now() - time_start
    logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')


if __name__ == "__main__":
    main()