deleted my-route.py

This commit is contained in:
Pavel Muhortov 2023-08-17 21:27:24 +03:00
parent c170e3f830
commit bb8f868578
3 changed files with 0 additions and 767 deletions

View File

@ -5,7 +5,6 @@ Small tools needed to solve immediate tasks independently or as part of a projec
* [`build-python`.sh](https://git.hmp.today/pavel.muhortov/utils#build-python-sh)
* [`cronutil`](https://git.hmp.today/pavel.muhortov/utils#cronutil)
* [`confutil`.py](https://git.hmp.today/pavel.muhortov/utils#confutil-py)
* [`my-route`.py](https://git.hmp.today/pavel.muhortov/utils#my-route-py)
* [`sendmail`.py](https://git.hmp.today/pavel.muhortov/utils#sendmail-py)
* [`simplewc`.py](https://git.hmp.today/pavel.muhortov/utils#simplewc-py)
@ -127,53 +126,6 @@ if path.exists(conf):
____
## `my-route`.py
**Description:**
> Route management by CIDR lists.
**Dependencies:**
>
> * privileged rights
> * [Python 3](https://www.python.org/downloads/) (tested version 3.9.5 on [Debian GNU/Linux 11](http://ftp.debian.org/debian/dists/bullseye/))
| PARAMETERS | DESCRIPTION | DEFAULT|
|-------------|-------------|--------|
|**[-h]**|print help and exit||
|**[-a, --add]**|add routes specified by config|`None`|
|**[-d, --del]**|del routes specified by config|`None`|
|**[-u, --update]**|update cidr db, del old routes, add new routes|`None`|
|**[--config]**|custom configuration file path|`./my-route.conf`|
Example usage:
```bash
# download
sudo wget https://git.hmp.today/pavel.muhortov/utils/raw/branch/master/my-route.py -O /usr/local/bin/my-route.py
sudo chmod +x /usr/local/bin/my-route.py
```
```bash
# download and edit config file
sudo wget https://git.hmp.today/pavel.muhortov/utils/raw/branch/master/my-route.conf -O /usr/local/bin/my-route.conf
sudo nano /usr/local/bin/my-route.conf
```
```bash
# create and edit cidr file
sudo mkdir /usr/local/bin/my-route.db
sudo tee /usr/local/bin/my-route.db/_custom.cidr > /dev/null <<'EOF'
34.117.59.81/32 # ipinfo.io
EOF
```
```bash
# sudo crontab -e
0 0 * * * /usr/bin/python3 /usr/local/bin/my-route.py --update
```
____
## `sendmail`.py
**Description:**

View File

@ -1,265 +0,0 @@
[common]
# By default, a database directory is created in the same path where the script is located.
# If you need change it, uncomment the parameter and set the path you want.
#cidr_root = /tmp/my-route.db
#
# By default, logs use the same directory where the script is located.
# If you need change it, uncomment the parameter and set the path you want.
#log_root = /var/log/my-route
#
# The default log level is "INFO".
# If you get errors or want to change the logging level, uncomment the parameter and set the level you want:
# DEBUG, INFO, WARNING, ERROR, CRITICAL.
#log_level = DEBUG
[enable-gateway]
# List the gateway block names. Only blocks with the TRUE value will be used.
via-192.168.0.1 = false
dev-wg1 = true
[via-192.168.0.1]
# List of CIDR. Only CIDR with the TRUE value will be used.
_custom.cidr = false
[dev-wg1]
_custom.cidr = true
ad.cidr
ae.cidr
af.cidr
ag.cidr
ai.cidr
al.cidr
am.cidr
ao.cidr
ap.cidr
aq.cidr
ar.cidr
as.cidr
at.cidr
au.cidr
aw.cidr
ax.cidr
az.cidr
ba.cidr
bb.cidr
bd.cidr
be.cidr
bf.cidr
bg.cidr
bh.cidr
bi.cidr
bj.cidr
bl.cidr
bm.cidr
bn.cidr
bo.cidr
bq.cidr
br.cidr
bs.cidr
bt.cidr
bw.cidr
by.cidr
bz.cidr
ca.cidr
cd.cidr
cf.cidr
cg.cidr
ch.cidr
ci.cidr
ck.cidr
cl.cidr
cm.cidr
cn.cidr
co.cidr
cr.cidr
cu.cidr
cv.cidr
cw.cidr
cy.cidr
cz.cidr
de.cidr
dj.cidr
dk.cidr
dm.cidr
do.cidr
dz.cidr
ec.cidr
ee.cidr
eg.cidr
er.cidr
es.cidr
et.cidr
fi.cidr
fj.cidr
fk.cidr
fm.cidr
fo.cidr
fr.cidr
ga.cidr
gb.cidr
gd.cidr
ge.cidr
gf.cidr
gg.cidr
gh.cidr
gi.cidr
gl.cidr
gm.cidr
gn.cidr
gp.cidr
gq.cidr
gr.cidr
gt.cidr
gu.cidr
gw.cidr
gy.cidr
hk.cidr
hn.cidr
hr.cidr
ht.cidr
hu.cidr
id.cidr
ie.cidr
il.cidr
im.cidr
in.cidr
io.cidr
iq.cidr
ir.cidr
is.cidr
it.cidr
je.cidr
jm.cidr
jo.cidr
jp.cidr
ke.cidr
kg.cidr
kh.cidr
ki.cidr
km.cidr
kn.cidr
kp.cidr
kr.cidr
kw.cidr
ky.cidr
kz.cidr
la.cidr
lb.cidr
lc.cidr
li.cidr
lk.cidr
lr.cidr
ls.cidr
lt.cidr
lu.cidr
lv.cidr
ly.cidr
ma.cidr
mc.cidr
md.cidr
me.cidr
mf.cidr
mg.cidr
mh.cidr
mk.cidr
ml.cidr
mm.cidr
mn.cidr
mo.cidr
mp.cidr
mq.cidr
mr.cidr
ms.cidr
mt.cidr
mu.cidr
mv.cidr
mw.cidr
mx.cidr
my.cidr
mz.cidr
na.cidr
nc.cidr
ne.cidr
nf.cidr
ng.cidr
ni.cidr
nl.cidr
no.cidr
np.cidr
nr.cidr
nu.cidr
nz.cidr
om.cidr
pa.cidr
pe.cidr
pf.cidr
pg.cidr
ph.cidr
pk.cidr
pl.cidr
pm.cidr
pr.cidr
ps.cidr
pt.cidr
pw.cidr
py.cidr
qa.cidr
re.cidr
ro.cidr
rs.cidr
ru.cidr
rw.cidr
sa.cidr
sb.cidr
sc.cidr
sd.cidr
se.cidr
sg.cidr
si.cidr
sk.cidr
sl.cidr
sm.cidr
sn.cidr
so.cidr
sr.cidr
ss.cidr
st.cidr
sv.cidr
sx.cidr
sy.cidr
sz.cidr
tc.cidr
td.cidr
tg.cidr
th.cidr
tj.cidr
tk.cidr
tl.cidr
tm.cidr
tn.cidr
to.cidr
tr.cidr
tt.cidr
tv.cidr
tw.cidr
tz.cidr
ua.cidr
ug.cidr
us.cidr
uy.cidr
uz.cidr
va.cidr
vc.cidr
ve.cidr
vg.cidr
vi.cidr
vn.cidr
vu.cidr
wf.cidr
ws.cidr
ye.cidr
yt.cidr
za.cidr
zm.cidr
zw.cidr
zz.cidr

View File

@ -1,454 +0,0 @@
#!/usr/bin/env python3
# pylint: disable=C0103,C0114
import logging
import urllib.request
from argparse import ArgumentParser
from datetime import datetime
from os import path, sep, makedirs
from re import match
from sys import platform
from subprocess import Popen, PIPE
class Parse:
"""Parser of configs, arguments, parameters.
"""
def __init__(self, parameters, block: str = None) -> None:
"""Object constructor.
Args:
parameters: dictionary as "key":"value" or
ArgumentParser class object or
string path to the file or
string as "var1=val1;var2=val2".
block (str, optional): name of target block from text. Defaults to None.
"""
self.path = ''
self.data = {}
if isinstance(parameters, dict):
self._dict2dict(parameters)
if isinstance(parameters, ArgumentParser):
self._dict2dict(self.argv2dict(parameters))
if isinstance(parameters, str):
if path.exists(parameters):
self._dict2dict(
self.strs2dict(
self.conf2strs(parameters),
block
)
)
self.path = parameters
else:
self._dict2dict(self.strs2dict(parameters, block))
def __str__(self) -> str:
"""Overrides method for print(object).
Returns:
str: string with contents of the object's dictionary.
"""
string = ''
for key, val in self.data.items():
string += str(type(val)) + ' ' + str(key) + ' = ' + str(val) + '\n'
return string
def _dict2dict(self, dictionary: dict) -> None:
"""Updates or adds dictionary data.
Args:
dictionary (dict): dictionary as "key":"value".
"""
self.data.update(dictionary)
def expand(self, store: str = None) -> dict:
"""Expand dictionary "key":"name.conf" to dictionary "key":{subkey: subval}.
Args:
store (str, optional): path to directory with name.conf. Defaults to None.
Returns:
dict: expanded dictionary as "key":{subkey: subval}.
"""
for key in self.data.items():
if store:
config = store + sep + self.data[key]
else:
config = self.data[key]
with open(config, mode='r', encoding='UTF-8') as file:
self.data[key] = Parse(file.read()).data
return self.data
@classmethod
def argv2dict(cls, parser: ArgumentParser) -> dict:
"""Converts startup arguments to a dictionary.
Args:
parser (ArgumentParser): argparse.ArgumentParser class object.
Returns:
dict: dictionary as "key":"value".
"""
parser = ArgumentParser(add_help=False, parents=[parser])
return vars(parser.parse_args())
@classmethod
def conf2strs(cls, config: str) -> str:
"""Builds a dictionary from a file containing parameters.
Args:
config (str): path to the config file.
Returns:
str: string as "var1=val1;\nvar2=val2;".
"""
with open(config, mode='r', encoding='UTF-8') as file:
raw = file.read()
strs = ''
for line in raw.splitlines():
if not line.lstrip().startswith('#'):
strs += line + '\n'
return strs
@classmethod
def strs2dict(cls, strings: str, blockname: str) -> dict:
"""Builds a dictionary from a strings containing parameters.
Args:
strings (str): string as "var1=val1;var2=val2;".
blockname (str): name of target block from text.
Returns:
dict: dictionary as "key":"value".
"""
dictionary = {}
if blockname:
strings = cls.block(blockname, strings)
for line in strings.replace('\n', ';').split(';'):
if not line.lstrip().startswith('#') and "=" in line:
key = line.split('=')[0].strip()
val = line.split('=')[1].strip().split(';')[0].strip()
dictionary[key] = val
return dictionary
@classmethod
def str2bool(cls, value: str) -> bool:
"""Converts a string value to boolean.
Args:
value (str): string containing "true" or "false", "yes" or "no", "1" or "0".
Returns:
bool: bool True or False.
"""
return str(value).lower() in ("true", "yes", "1")
@classmethod
def block(cls, blockname: str, text: str) -> str:
"""Cuts a block of text between line [blockname] and line [next block] or EOF.
Args:
blockname (str): string in [] after which the block starts.
text (str): string of text from which the block is needed.
Returns:
str: string of text between line [block name] and line [next block].
"""
level = 1
save = False
result = ''
for line in text.splitlines():
if line.startswith('[') and blockname in line:
level = line.count('[')
save = True
elif line.startswith('[') and '['*level in line:
save = False
elif save:
result += line + '\n'
return result
class Connect:
# pylint: disable=W0718
"""Set of connection methods (functions) for various protocols.
"""
@staticmethod
def http(
url: str, method: str = 'GET',
username: str = '', password: str = '', authtype: str = None,
contenttype: str = 'text/plain', contentdata: str = ''
) -> str:
"""Handling HTTP request.
Args:
url (str): request url.
method (str, optional): HTTP request method. Defaults to 'GET'.
username (str, optional): username for url authentication. Defaults to ''.
password (str, optional): password for url authentication. Defaults to ''.
authtype (str, optional): digest|basic authentication type. Defaults to None.
contenttype (str, optional): 'Content-Type' header. Defaults to 'text/plain'.
contentdata (str, optional): content data. Defaults to ''.
Returns:
str: HTTP response or 'ERROR'.
"""
# Preparing authorization
if authtype:
pswd = urllib.request.HTTPPasswordMgrWithDefaultRealm()
pswd.add_password(None, url, username, password)
if authtype == 'basic':
auth = urllib.request.HTTPBasicAuthHandler(pswd)
if authtype == 'digest':
auth = urllib.request.HTTPDigestAuthHandler(pswd)
urllib.request.install_opener(urllib.request.build_opener(auth))
# Preparing request
request = urllib.request.Request(
url=url,
data=bytes(contentdata.encode('utf-8')),
method=method
)
request.add_header('Content-Type', contenttype)
# Response
try:
response = urllib.request.urlopen(request).read()
logging.debug(
msg=''
+ '\n' + 'uri: ' + url
+ '\n' + 'method: ' + method
+ '\n' + 'username: ' + username
+ '\n' + 'password: ' + password
+ '\n' + 'authtype: ' + str(authtype)
+ '\n' + 'content-type: ' + contenttype
+ '\n' + 'content-data: ' + contentdata
)
if response.startswith(b'\xff\xd8'):
return response
else:
return str(response.decode('utf-8'))
except Exception as error:
logging.debug(msg='\n' + 'error: ' + str(error))
return 'ERROR'
class Route(Connect):
"""Handling route operations.
"""
def __init__(self, gateway: str, cidr_root_path: str, cidr_name_list: list) -> None:
"""Object constructor.
Args:
gateway (str): route gateway ip address.
cidr_root_path (str): cidr db path.
cidr_name_list (list): list of cidr files.
"""
self._gateway = gateway
self._cidr_root_path = cidr_root_path
self._cidr_name_list = cidr_name_list
self._route_list = self.__cidr_name_list_to_route_list()
def __cidr_name_list_to_route_list(self) -> list:
"""Convert files content to route list.
Returns:
list: route list.
"""
route_list = []
for cidr_name in self._cidr_name_list:
cidr_path = self._cidr_root_path + sep + cidr_name
if not path.exists(cidr_path):
if not cidr_name.startswith('_'):
if not self.__cidr_download(cidr_name):
self._cidr_name_list.remove(cidr_name)
else:
self._cidr_name_list.remove(cidr_name)
for cidr_name in self._cidr_name_list:
cidr_path = self._cidr_root_path + sep + cidr_name
with open(cidr_path, mode='r', encoding='UTF-8') as file:
cidr_data = file.read()
for cidr in cidr_data.splitlines():
cidr = cidr.split('#')[0].strip()
if match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d{1,2}$", cidr):
route_list.append(cidr + ' ' + self._gateway)
return route_list
def __cidr_download(self, cidr_name: str) -> bool:
# pylint: disable=W0718,W0719
"""Download CIDR file.
Args:
cidr_name (str): file name.
Raises:
Exception: downloading failed.
Returns:
bool: True - CIDR downloaded, False - there are exceptions.
"""
try:
cidr_url = ''.join(''
+ 'https://raw.githubusercontent.com/'
+ 'herrbischoff/'
+ 'country-ip-blocks/master/'
+ 'ipv4/' + cidr_name
)
response = self.http(url=cidr_url, method='GET')
cidr_path = self._cidr_root_path + sep + cidr_name
if response != 'ERROR':
makedirs(self._cidr_root_path, exist_ok=True)
with open(cidr_path, mode='w+', encoding='UTF-8') as file:
file.write(response)
return True
else:
raise Exception('downloading ' + cidr_name + ' failed')
except Exception as error:
logging.warning(
msg=''
+ str(error)
)
return False
def __cmd(self, command: list) -> None:
"""Executing command by terminal.
Args:
command (list): splitted command by words.
"""
out, err = Popen(
command,
stdout=PIPE,
stderr=PIPE
).communicate()
for line in out.splitlines():
logging.info(msg=line.decode('utf-8'))
for line in err.splitlines():
logging.warning(msg=line.decode('utf-8'))
def ro_add(self) -> None:
"""Add routes specified by config.
"""
for route in self._route_list:
if platform.startswith('linux') or platform.startswith('darwin'):
command = ['ip', 'ro', 'add'] + route.split()
logging.info(msg=' '.join(command))
self.__cmd(command=command)
elif platform.startswith('win32'):
# todo: windows
return False
else:
return False
logging.info(msg='added ' + str(len(self._route_list)) + ' records')
return True
def ro_del(self) -> None:
"""Del routes specified by config.
"""
for route in self._route_list:
if platform.startswith('linux') or platform.startswith('darwin'):
command = ['ip', 'ro', 'del'] + route.split()
logging.info(msg=' '.join(command))
self.__cmd(command=command)
elif platform.startswith('win32'):
# todo: windows
return False
else:
return False
logging.info(msg='deleted ' + str(len(self._route_list)) + ' records')
return True
def update(self) -> None:
"""Update CIDR db, del and add routes specified by config.
"""
for cidr_name in self._cidr_name_list:
if not cidr_name.startswith('_'):
self.__cidr_download(cidr_name=cidr_name)
self.ro_del()
self.ro_add()
def checkroot() -> bool:
# pylint: disable=C0415
"""Crossplatform privileged rights checker.
Returns:
bool: True - if privileged rights, False - if not privileged rights
"""
if platform.startswith('linux') or platform.startswith('darwin'):
from os import geteuid
if geteuid() == 0:
return True
return False
elif platform.startswith('win32'):
import ctypes
return ctypes.windll.shell32.IsUserAnAdmin()
if __name__ == "__main__":
time_start = datetime.now()
args = ArgumentParser(
prog='my-route',
description='Route management by CIDR lists.',
epilog='Dependencies: '
'- Python 3 (tested version 3.9.5), '
'- privileged rights '
)
args.add_argument(
'--config',
type=str,
default=path.splitext(__file__)[0] + '.conf',
required=False,
help='custom configuration file path'
)
args.add_argument('-a', '--add', action='store_true', required=False,
help='add routes specified by config')
args.add_argument('-d', '--del', action='store_true', required=False,
help='del routes specified by config')
args.add_argument('-u', '--update', action='store_true', required=False,
help='update cidr db, del old routes, add new routes')
args = vars(args.parse_args())
cidr_root = path.dirname(path.realpath(__file__)) + sep + 'my-route.db'
log_level = 'INFO'
log_root = path.dirname(path.realpath(__file__))
gateways = {}
if path.exists(args['config']):
conf = Parse(parameters=args['config'], block='common')
if 'cidr_root' in conf.data:
cidr_root = conf.data['cidr_root']
if 'log_root' in conf.data:
log_root = conf.data['log_root']
if 'log_level' in conf.data:
if conf.data['log_level'] == 'DEBUG':
log_level = logging.DEBUG
elif conf.data['log_level'] == 'INFO':
log_level = logging.INFO
elif conf.data['log_level'] == 'WARNING':
log_level = logging.WARNING
elif conf.data['log_level'] == 'ERROR':
log_level = logging.ERROR
elif conf.data['log_level'] == 'CRITICAL':
log_level = logging.CRITICAL
conf = Parse(parameters=args['config'], block='enable-gateway')
for key, value in conf.data.items():
if value == 'true':
gateway_config = Parse(
parameters=args['config'],
block=key
)
gateways[key] = []
for cidr, enable in gateway_config.data.items():
if enable == 'true':
gateways[key].append(cidr)
logging.basicConfig(
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d_%H.%M.%S',
handlers=[
logging.FileHandler(
filename=log_root + sep + path.splitext(path.basename(__file__))[0] + '.log',
mode='a'
),
logging.StreamHandler()
],
level=log_level
)
if checkroot():
for key, value in gateways.items():
ro_list = Route(
gateway=key.replace('-', ' '),
cidr_root_path=cidr_root,
cidr_name_list=value
)
if args['update']:
ro_list.update()
elif args['del']:
ro_list.ro_del()
elif args['add']:
ro_list.ro_add()
else:
logging.info(msg='No start arguments selected. Exit.')
break
else:
logging.warning(msg='Restart this as root!')
time_execute = datetime.now() - time_start
logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')