diff --git a/README.md b/README.md index 0496920..043d786 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,6 @@ Small tools needed to solve immediate tasks independently or as part of a projec * [`build-python`.sh](https://git.hmp.today/pavel.muhortov/utils#build-python-sh) * [`cronutil`](https://git.hmp.today/pavel.muhortov/utils#cronutil) * [`confutil`.py](https://git.hmp.today/pavel.muhortov/utils#confutil-py) -* [`my-route`.py](https://git.hmp.today/pavel.muhortov/utils#my-route-py) * [`sendmail`.py](https://git.hmp.today/pavel.muhortov/utils#sendmail-py) * [`simplewc`.py](https://git.hmp.today/pavel.muhortov/utils#simplewc-py) @@ -127,53 +126,6 @@ if path.exists(conf): ____ -## `my-route`.py - -**Description:** -> Route management by CIDR lists. - -**Dependencies:** -> -> * privileged rights -> * [Python 3](https://www.python.org/downloads/) (tested version 3.9.5 on [Debian GNU/Linux 11](http://ftp.debian.org/debian/dists/bullseye/)) - -| PARAMETERS | DESCRIPTION | DEFAULT| -|-------------|-------------|--------| -|**[-h]**|print help and exit|| -|**[-a, --add]**|add routes specified by config|`None`| -|**[-d, --del]**|del routes specified by config|`None`| -|**[-u, --update]**|update cidr db, del old routes, add new routes|`None`| -|**[--config]**|custom configuration file path|`./my-route.conf`| - -Example usage: - -```bash -# download -sudo wget https://git.hmp.today/pavel.muhortov/utils/raw/branch/master/my-route.py -O /usr/local/bin/my-route.py -sudo chmod +x /usr/local/bin/my-route.py -``` - -```bash -# download and edit config file -sudo wget https://git.hmp.today/pavel.muhortov/utils/raw/branch/master/my-route.conf -O /usr/local/bin/my-route.conf -sudo nano /usr/local/bin/my-route.conf -``` - -```bash -# create and edit cidr file -sudo mkdir /usr/local/bin/my-route.db -sudo tee /usr/local/bin/my-route.db/_custom.cidr > /dev/null <<'EOF' -34.117.59.81/32 # ipinfo.io -EOF -``` - -```bash -# sudo crontab -e -0 0 * * * /usr/bin/python3 /usr/local/bin/my-route.py --update -``` - -____ - ## `sendmail`.py **Description:** diff --git a/my-route.conf b/my-route.conf deleted file mode 100755 index 5e547b8..0000000 --- a/my-route.conf +++ /dev/null @@ -1,265 +0,0 @@ -[common] -# By default, a database directory is created in the same path where the script is located. -# If you need change it, uncomment the parameter and set the path you want. -#cidr_root = /tmp/my-route.db -# -# By default, logs use the same directory where the script is located. -# If you need change it, uncomment the parameter and set the path you want. -#log_root = /var/log/my-route -# -# The default log level is "INFO". -# If you get errors or want to change the logging level, uncomment the parameter and set the level you want: -# DEBUG, INFO, WARNING, ERROR, CRITICAL. -#log_level = DEBUG - -[enable-gateway] -# List the gateway block names. Only blocks with the TRUE value will be used. -via-192.168.0.1 = false -dev-wg1 = true - -[via-192.168.0.1] -# List of CIDR. Only CIDR with the TRUE value will be used. -_custom.cidr = false - -[dev-wg1] -_custom.cidr = true -ad.cidr -ae.cidr -af.cidr -ag.cidr -ai.cidr -al.cidr -am.cidr -ao.cidr -ap.cidr -aq.cidr -ar.cidr -as.cidr -at.cidr -au.cidr -aw.cidr -ax.cidr -az.cidr -ba.cidr -bb.cidr -bd.cidr -be.cidr -bf.cidr -bg.cidr -bh.cidr -bi.cidr -bj.cidr -bl.cidr -bm.cidr -bn.cidr -bo.cidr -bq.cidr -br.cidr -bs.cidr -bt.cidr -bw.cidr -by.cidr -bz.cidr -ca.cidr -cd.cidr -cf.cidr -cg.cidr -ch.cidr -ci.cidr -ck.cidr -cl.cidr -cm.cidr -cn.cidr -co.cidr -cr.cidr -cu.cidr -cv.cidr -cw.cidr -cy.cidr -cz.cidr -de.cidr -dj.cidr -dk.cidr -dm.cidr -do.cidr -dz.cidr -ec.cidr -ee.cidr -eg.cidr -er.cidr -es.cidr -et.cidr -fi.cidr -fj.cidr -fk.cidr -fm.cidr -fo.cidr -fr.cidr -ga.cidr -gb.cidr -gd.cidr -ge.cidr -gf.cidr -gg.cidr -gh.cidr -gi.cidr -gl.cidr -gm.cidr -gn.cidr -gp.cidr -gq.cidr -gr.cidr -gt.cidr -gu.cidr -gw.cidr -gy.cidr -hk.cidr -hn.cidr -hr.cidr -ht.cidr -hu.cidr -id.cidr -ie.cidr -il.cidr -im.cidr -in.cidr -io.cidr -iq.cidr -ir.cidr -is.cidr -it.cidr -je.cidr -jm.cidr -jo.cidr -jp.cidr -ke.cidr -kg.cidr -kh.cidr -ki.cidr -km.cidr -kn.cidr -kp.cidr -kr.cidr -kw.cidr -ky.cidr -kz.cidr -la.cidr -lb.cidr -lc.cidr -li.cidr -lk.cidr -lr.cidr -ls.cidr -lt.cidr -lu.cidr -lv.cidr -ly.cidr -ma.cidr -mc.cidr -md.cidr -me.cidr -mf.cidr -mg.cidr -mh.cidr -mk.cidr -ml.cidr -mm.cidr -mn.cidr -mo.cidr -mp.cidr -mq.cidr -mr.cidr -ms.cidr -mt.cidr -mu.cidr -mv.cidr -mw.cidr -mx.cidr -my.cidr -mz.cidr -na.cidr -nc.cidr -ne.cidr -nf.cidr -ng.cidr -ni.cidr -nl.cidr -no.cidr -np.cidr -nr.cidr -nu.cidr -nz.cidr -om.cidr -pa.cidr -pe.cidr -pf.cidr -pg.cidr -ph.cidr -pk.cidr -pl.cidr -pm.cidr -pr.cidr -ps.cidr -pt.cidr -pw.cidr -py.cidr -qa.cidr -re.cidr -ro.cidr -rs.cidr -ru.cidr -rw.cidr -sa.cidr -sb.cidr -sc.cidr -sd.cidr -se.cidr -sg.cidr -si.cidr -sk.cidr -sl.cidr -sm.cidr -sn.cidr -so.cidr -sr.cidr -ss.cidr -st.cidr -sv.cidr -sx.cidr -sy.cidr -sz.cidr -tc.cidr -td.cidr -tg.cidr -th.cidr -tj.cidr -tk.cidr -tl.cidr -tm.cidr -tn.cidr -to.cidr -tr.cidr -tt.cidr -tv.cidr -tw.cidr -tz.cidr -ua.cidr -ug.cidr -us.cidr -uy.cidr -uz.cidr -va.cidr -vc.cidr -ve.cidr -vg.cidr -vi.cidr -vn.cidr -vu.cidr -wf.cidr -ws.cidr -ye.cidr -yt.cidr -za.cidr -zm.cidr -zw.cidr -zz.cidr diff --git a/my-route.py b/my-route.py deleted file mode 100755 index ccf3140..0000000 --- a/my-route.py +++ /dev/null @@ -1,454 +0,0 @@ -#!/usr/bin/env python3 -# pylint: disable=C0103,C0114 - -import logging -import urllib.request -from argparse import ArgumentParser -from datetime import datetime -from os import path, sep, makedirs -from re import match -from sys import platform -from subprocess import Popen, PIPE - - -class Parse: - """Parser of configs, arguments, parameters. - """ - def __init__(self, parameters, block: str = None) -> None: - """Object constructor. - Args: - parameters: dictionary as "key":"value" or - ArgumentParser class object or - string path to the file or - string as "var1=val1;var2=val2". - block (str, optional): name of target block from text. Defaults to None. - """ - self.path = '' - self.data = {} - if isinstance(parameters, dict): - self._dict2dict(parameters) - if isinstance(parameters, ArgumentParser): - self._dict2dict(self.argv2dict(parameters)) - if isinstance(parameters, str): - if path.exists(parameters): - self._dict2dict( - self.strs2dict( - self.conf2strs(parameters), - block - ) - ) - self.path = parameters - else: - self._dict2dict(self.strs2dict(parameters, block)) - def __str__(self) -> str: - """Overrides method for print(object). - Returns: - str: string with contents of the object's dictionary. - """ - string = '' - for key, val in self.data.items(): - string += str(type(val)) + ' ' + str(key) + ' = ' + str(val) + '\n' - return string - def _dict2dict(self, dictionary: dict) -> None: - """Updates or adds dictionary data. - Args: - dictionary (dict): dictionary as "key":"value". - """ - self.data.update(dictionary) - def expand(self, store: str = None) -> dict: - """Expand dictionary "key":"name.conf" to dictionary "key":{subkey: subval}. - Args: - store (str, optional): path to directory with name.conf. Defaults to None. - Returns: - dict: expanded dictionary as "key":{subkey: subval}. - """ - for key in self.data.items(): - if store: - config = store + sep + self.data[key] - else: - config = self.data[key] - with open(config, mode='r', encoding='UTF-8') as file: - self.data[key] = Parse(file.read()).data - return self.data - @classmethod - def argv2dict(cls, parser: ArgumentParser) -> dict: - """Converts startup arguments to a dictionary. - Args: - parser (ArgumentParser): argparse.ArgumentParser class object. - Returns: - dict: dictionary as "key":"value". - """ - parser = ArgumentParser(add_help=False, parents=[parser]) - return vars(parser.parse_args()) - @classmethod - def conf2strs(cls, config: str) -> str: - """Builds a dictionary from a file containing parameters. - Args: - config (str): path to the config file. - Returns: - str: string as "var1=val1;\nvar2=val2;". - """ - with open(config, mode='r', encoding='UTF-8') as file: - raw = file.read() - strs = '' - for line in raw.splitlines(): - if not line.lstrip().startswith('#'): - strs += line + '\n' - return strs - @classmethod - def strs2dict(cls, strings: str, blockname: str) -> dict: - """Builds a dictionary from a strings containing parameters. - Args: - strings (str): string as "var1=val1;var2=val2;". - blockname (str): name of target block from text. - Returns: - dict: dictionary as "key":"value". - """ - dictionary = {} - if blockname: - strings = cls.block(blockname, strings) - for line in strings.replace('\n', ';').split(';'): - if not line.lstrip().startswith('#') and "=" in line: - key = line.split('=')[0].strip() - val = line.split('=')[1].strip().split(';')[0].strip() - dictionary[key] = val - return dictionary - @classmethod - def str2bool(cls, value: str) -> bool: - """Converts a string value to boolean. - Args: - value (str): string containing "true" or "false", "yes" or "no", "1" or "0". - Returns: - bool: bool True or False. - """ - return str(value).lower() in ("true", "yes", "1") - @classmethod - def block(cls, blockname: str, text: str) -> str: - """Cuts a block of text between line [blockname] and line [next block] or EOF. - Args: - blockname (str): string in [] after which the block starts. - text (str): string of text from which the block is needed. - Returns: - str: string of text between line [block name] and line [next block]. - """ - level = 1 - save = False - result = '' - for line in text.splitlines(): - if line.startswith('[') and blockname in line: - level = line.count('[') - save = True - elif line.startswith('[') and '['*level in line: - save = False - elif save: - result += line + '\n' - return result - - -class Connect: - # pylint: disable=W0718 - """Set of connection methods (functions) for various protocols. - """ - @staticmethod - def http( - url: str, method: str = 'GET', - username: str = '', password: str = '', authtype: str = None, - contenttype: str = 'text/plain', contentdata: str = '' - ) -> str: - """Handling HTTP request. - Args: - url (str): request url. - method (str, optional): HTTP request method. Defaults to 'GET'. - username (str, optional): username for url authentication. Defaults to ''. - password (str, optional): password for url authentication. Defaults to ''. - authtype (str, optional): digest|basic authentication type. Defaults to None. - contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'. - contentdata (str, optional): content data. Defaults to ''. - Returns: - str: HTTP response or 'ERROR'. - """ - # Preparing authorization - if authtype: - pswd = urllib.request.HTTPPasswordMgrWithDefaultRealm() - pswd.add_password(None, url, username, password) - if authtype == 'basic': - auth = urllib.request.HTTPBasicAuthHandler(pswd) - if authtype == 'digest': - auth = urllib.request.HTTPDigestAuthHandler(pswd) - urllib.request.install_opener(urllib.request.build_opener(auth)) - # Preparing request - request = urllib.request.Request( - url=url, - data=bytes(contentdata.encode('utf-8')), - method=method - ) - request.add_header('Content-Type', contenttype) - # Response - try: - response = urllib.request.urlopen(request).read() - logging.debug( - msg='' - + '\n' + 'uri: ' + url - + '\n' + 'method: ' + method - + '\n' + 'username: ' + username - + '\n' + 'password: ' + password - + '\n' + 'authtype: ' + str(authtype) - + '\n' + 'content-type: ' + contenttype - + '\n' + 'content-data: ' + contentdata - ) - if response.startswith(b'\xff\xd8'): - return response - else: - return str(response.decode('utf-8')) - except Exception as error: - logging.debug(msg='\n' + 'error: ' + str(error)) - return 'ERROR' - - -class Route(Connect): - """Handling route operations. - """ - def __init__(self, gateway: str, cidr_root_path: str, cidr_name_list: list) -> None: - """Object constructor. - - Args: - gateway (str): route gateway ip address. - cidr_root_path (str): cidr db path. - cidr_name_list (list): list of cidr files. - """ - self._gateway = gateway - self._cidr_root_path = cidr_root_path - self._cidr_name_list = cidr_name_list - self._route_list = self.__cidr_name_list_to_route_list() - - def __cidr_name_list_to_route_list(self) -> list: - """Convert files content to route list. - - Returns: - list: route list. - """ - route_list = [] - for cidr_name in self._cidr_name_list: - cidr_path = self._cidr_root_path + sep + cidr_name - if not path.exists(cidr_path): - if not cidr_name.startswith('_'): - if not self.__cidr_download(cidr_name): - self._cidr_name_list.remove(cidr_name) - else: - self._cidr_name_list.remove(cidr_name) - for cidr_name in self._cidr_name_list: - cidr_path = self._cidr_root_path + sep + cidr_name - with open(cidr_path, mode='r', encoding='UTF-8') as file: - cidr_data = file.read() - for cidr in cidr_data.splitlines(): - cidr = cidr.split('#')[0].strip() - if match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d{1,2}$", cidr): - route_list.append(cidr + ' ' + self._gateway) - return route_list - - def __cidr_download(self, cidr_name: str) -> bool: - # pylint: disable=W0718,W0719 - """Download CIDR file. - - Args: - cidr_name (str): file name. - - Raises: - Exception: downloading failed. - - Returns: - bool: True - CIDR downloaded, False - there are exceptions. - """ - try: - cidr_url = ''.join('' - + 'https://raw.githubusercontent.com/' - + 'herrbischoff/' - + 'country-ip-blocks/master/' - + 'ipv4/' + cidr_name - ) - response = self.http(url=cidr_url, method='GET') - cidr_path = self._cidr_root_path + sep + cidr_name - if response != 'ERROR': - makedirs(self._cidr_root_path, exist_ok=True) - with open(cidr_path, mode='w+', encoding='UTF-8') as file: - file.write(response) - return True - else: - raise Exception('downloading ' + cidr_name + ' failed') - except Exception as error: - logging.warning( - msg='' - + str(error) - ) - return False - - def __cmd(self, command: list) -> None: - """Executing command by terminal. - - Args: - command (list): splitted command by words. - """ - out, err = Popen( - command, - stdout=PIPE, - stderr=PIPE - ).communicate() - for line in out.splitlines(): - logging.info(msg=line.decode('utf-8')) - for line in err.splitlines(): - logging.warning(msg=line.decode('utf-8')) - - def ro_add(self) -> None: - """Add routes specified by config. - """ - for route in self._route_list: - if platform.startswith('linux') or platform.startswith('darwin'): - command = ['ip', 'ro', 'add'] + route.split() - logging.info(msg=' '.join(command)) - self.__cmd(command=command) - elif platform.startswith('win32'): - # todo: windows - return False - else: - return False - logging.info(msg='added ' + str(len(self._route_list)) + ' records') - return True - - def ro_del(self) -> None: - """Del routes specified by config. - """ - for route in self._route_list: - if platform.startswith('linux') or platform.startswith('darwin'): - command = ['ip', 'ro', 'del'] + route.split() - logging.info(msg=' '.join(command)) - self.__cmd(command=command) - elif platform.startswith('win32'): - # todo: windows - return False - else: - return False - logging.info(msg='deleted ' + str(len(self._route_list)) + ' records') - return True - - def update(self) -> None: - """Update CIDR db, del and add routes specified by config. - """ - for cidr_name in self._cidr_name_list: - if not cidr_name.startswith('_'): - self.__cidr_download(cidr_name=cidr_name) - self.ro_del() - self.ro_add() - - -def checkroot() -> bool: - # pylint: disable=C0415 - """Crossplatform privileged rights checker. - - Returns: - bool: True - if privileged rights, False - if not privileged rights - """ - if platform.startswith('linux') or platform.startswith('darwin'): - from os import geteuid - if geteuid() == 0: - return True - return False - elif platform.startswith('win32'): - import ctypes - return ctypes.windll.shell32.IsUserAnAdmin() - - -if __name__ == "__main__": - time_start = datetime.now() - - args = ArgumentParser( - prog='my-route', - description='Route management by CIDR lists.', - epilog='Dependencies: ' - '- Python 3 (tested version 3.9.5), ' - '- privileged rights ' - ) - args.add_argument( - '--config', - type=str, - default=path.splitext(__file__)[0] + '.conf', - required=False, - help='custom configuration file path' - ) - args.add_argument('-a', '--add', action='store_true', required=False, - help='add routes specified by config') - args.add_argument('-d', '--del', action='store_true', required=False, - help='del routes specified by config') - args.add_argument('-u', '--update', action='store_true', required=False, - help='update cidr db, del old routes, add new routes') - args = vars(args.parse_args()) - - cidr_root = path.dirname(path.realpath(__file__)) + sep + 'my-route.db' - log_level = 'INFO' - log_root = path.dirname(path.realpath(__file__)) - gateways = {} - if path.exists(args['config']): - - conf = Parse(parameters=args['config'], block='common') - if 'cidr_root' in conf.data: - cidr_root = conf.data['cidr_root'] - if 'log_root' in conf.data: - log_root = conf.data['log_root'] - if 'log_level' in conf.data: - if conf.data['log_level'] == 'DEBUG': - log_level = logging.DEBUG - elif conf.data['log_level'] == 'INFO': - log_level = logging.INFO - elif conf.data['log_level'] == 'WARNING': - log_level = logging.WARNING - elif conf.data['log_level'] == 'ERROR': - log_level = logging.ERROR - elif conf.data['log_level'] == 'CRITICAL': - log_level = logging.CRITICAL - - conf = Parse(parameters=args['config'], block='enable-gateway') - for key, value in conf.data.items(): - if value == 'true': - gateway_config = Parse( - parameters=args['config'], - block=key - ) - gateways[key] = [] - for cidr, enable in gateway_config.data.items(): - if enable == 'true': - gateways[key].append(cidr) - - logging.basicConfig( - format='%(asctime)s %(levelname)s: %(message)s', - datefmt='%Y-%m-%d_%H.%M.%S', - handlers=[ - logging.FileHandler( - filename=log_root + sep + path.splitext(path.basename(__file__))[0] + '.log', - mode='a' - ), - logging.StreamHandler() - ], - level=log_level - ) - - if checkroot(): - for key, value in gateways.items(): - ro_list = Route( - gateway=key.replace('-', ' '), - cidr_root_path=cidr_root, - cidr_name_list=value - ) - - if args['update']: - ro_list.update() - elif args['del']: - ro_list.ro_del() - elif args['add']: - ro_list.ro_add() - else: - logging.info(msg='No start arguments selected. Exit.') - break - else: - logging.warning(msg='Restart this as root!') - - time_execute = datetime.now() - time_start - logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')