generated from pavel.muhortov/template-bash
236 lines
8.7 KiB
Python
236 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
|
|
"""It's the OpenVPN server status parser.
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
import time
|
|
from argparse import ArgumentParser
|
|
from ipaddress import IPv4Network
|
|
from os import path
|
|
from cryptography import x509
|
|
import requests
|
|
|
|
|
|
def status(stats_file: str, client_filter: str = '.*', client_geo: bool = False) -> dict:
|
|
"""OpenVPN status log parser.
|
|
|
|
Args:
|
|
stats_file (str): path to OpenVPN status log file.
|
|
client_filter (str, optional): client names filter by regex. Defaults to '.*'.
|
|
client_geo (bool, optional): check client real ip geo location. Defaults to False.
|
|
|
|
Returns:
|
|
dict: {
|
|
'stats_updated': timestamp,
|
|
'clients_count': int,
|
|
'clients_found': int,
|
|
'data': [
|
|
{
|
|
"name": str,
|
|
"r_ip": str,
|
|
"r_cc": str,
|
|
"v_ip": str,
|
|
"b_rx": int,
|
|
"b_tx": int,
|
|
"t_cs": timestamp,
|
|
"t_cd": int
|
|
},
|
|
]
|
|
}
|
|
"""
|
|
with open(stats_file, mode='r', encoding='utf-8') as file:
|
|
stats_data = file.read()
|
|
|
|
if re.match("^OpenVPN CLIENT LIST", stats_data):
|
|
# status-version 1
|
|
stats_vers = 1
|
|
dlm = ','
|
|
elif re.match("^TITLE,", stats_data):
|
|
# status-version 2
|
|
stats_vers = 2
|
|
dlm = ','
|
|
elif re.match("^TITLE\t", stats_data):
|
|
# status-version 3
|
|
stats_vers = 3
|
|
dlm = '\t'
|
|
else:
|
|
stats_vers = 0
|
|
|
|
clients_array = []
|
|
clients_count = -1
|
|
clients_found = 0
|
|
stats_updated = -1
|
|
|
|
if stats_vers == 0:
|
|
pass
|
|
elif stats_vers == 1:
|
|
updated_r = re.search('Updated' + dlm + '.*', stats_data).group(0)
|
|
stats_updated = updated_r.replace('Updated' + dlm, '')
|
|
updated_t = time.mktime(time.strptime(stats_updated, "%Y-%m-%d %H:%M:%S"))
|
|
|
|
clients_s = 'Common Name,Real Address,Bytes Received,Bytes Sent,Connected Since'
|
|
clients_e = 'ROUTING TABLE'
|
|
clients_r = re.search(clients_s + "(.*)" + clients_e, stats_data, re.DOTALL).group(0)
|
|
stats_clients = re.sub(clients_s + '\n', '', re.sub(clients_e, '', clients_r))
|
|
|
|
routing_s = 'Virtual Address,Common Name,Real Address,Last Ref'
|
|
routing_e = 'GLOBAL STATS'
|
|
routing_r = re.search(routing_s + "(.*)" + routing_e, stats_data, re.DOTALL).group(0)
|
|
stats_routing = re.sub(routing_s + '\n', '', re.sub(routing_e, '', routing_r))
|
|
|
|
clients_count = len(stats_clients.splitlines())
|
|
if clients_count > 0:
|
|
for client in stats_clients.splitlines():
|
|
client_name = client.split(dlm)[0]
|
|
client_r_ip = client.split(dlm)[1].split(':')[0]
|
|
client_r_cc = '--'
|
|
if client_geo:
|
|
client_r_cc = ip_geo(addr=client_r_ip)
|
|
if re.findall(client_filter, client_name):
|
|
regex_v_ip = re.compile('.*' + dlm.join(client.split(dlm)[:2]) + '.*')
|
|
clients_array.append(
|
|
{
|
|
'name': client_name,
|
|
'r_ip': client_r_ip,
|
|
'r_cc': client_r_cc,
|
|
'v_ip': regex_v_ip.search(stats_routing).group(0).split(dlm)[0],
|
|
'b_rx': int(client.split(dlm)[2]),
|
|
'b_tx': int(client.split(dlm)[3]),
|
|
't_cs': client.split(dlm)[4],
|
|
't_cd': int(updated_t) - int(
|
|
time.mktime(
|
|
time.strptime(client.split(dlm)[4], "%Y-%m-%d %H:%M:%S")
|
|
)
|
|
)
|
|
}
|
|
)
|
|
clients_found += 1
|
|
else:
|
|
updated_r = re.search('TIME' + dlm + '.*', stats_data).group(0)
|
|
updated_t = updated_r.split(dlm)[2]
|
|
stats_updated = updated_r.split(dlm)[1]
|
|
stats_clients = '\n'.join(re.findall("^CLIENT_LIST.*", stats_data, re.MULTILINE))
|
|
|
|
clients_count = len(stats_clients.splitlines())
|
|
if clients_count > 0:
|
|
for client in stats_clients.splitlines():
|
|
client_name = client.split(dlm)[1]
|
|
client_r_ip = client.split(dlm)[2].split(':')[0]
|
|
client_r_cc = '--'
|
|
if client_geo:
|
|
client_r_cc = ip_geo(addr=client_r_ip)
|
|
if re.search(client_filter, client_name):
|
|
clients_array.append(
|
|
{
|
|
'name': client_name,
|
|
'r_ip': client_r_ip,
|
|
'r_cc': client_r_cc,
|
|
'v_ip': client.split(dlm)[3],
|
|
'b_rx': int(client.split(dlm)[5]),
|
|
'b_tx': int(client.split(dlm)[6]),
|
|
't_cs': client.split(dlm)[7],
|
|
't_cd': int(updated_t) - int(client.split(dlm)[8])
|
|
}
|
|
)
|
|
clients_found += 1
|
|
|
|
clients_stats = {
|
|
'stats_updated': stats_updated,
|
|
'clients_count': clients_count,
|
|
'clients_found': clients_found,
|
|
'data': clients_array,
|
|
}
|
|
return clients_stats
|
|
|
|
|
|
def ip_num(addr: str, mask: (str, int)) -> int:
|
|
"""OpenVPN client ip limit calculator (without --ifconfig-pool-linear).
|
|
|
|
Args:
|
|
addr (str): server subnet.
|
|
mask (str, int): server mask.
|
|
|
|
Returns:
|
|
int: ip limit.
|
|
"""
|
|
return int(IPv4Network(addr + '/' + mask).num_addresses/4-1)
|
|
|
|
|
|
def ce_exp(cert_path: str) -> int:
|
|
"""Get certificate expiration time.
|
|
|
|
Args:
|
|
cert_path (str): path to certificate file.
|
|
|
|
Returns:
|
|
int: certificate expiration time in epoch.
|
|
"""
|
|
with open(cert_path, mode='rb') as cert_file:
|
|
cert_data = x509.load_pem_x509_certificate(cert_file.read())
|
|
return int(cert_data.not_valid_after.timestamp())
|
|
|
|
|
|
def ip_geo(addr: str) -> str:
|
|
"""Get ip address geo location.
|
|
|
|
Args:
|
|
addr (str): ip address.
|
|
|
|
Returns:
|
|
str: country code.
|
|
"""
|
|
try:
|
|
request = 'https://geolocation-db.com/json/' + addr
|
|
response = requests.get(request, timeout=5)
|
|
result = json.loads(response.content.decode())
|
|
return result['country_code']
|
|
except requests.exceptions.RequestException:
|
|
return '--'
|
|
|
|
|
|
if __name__ == "__main__":
|
|
args = ArgumentParser(
|
|
prog='ovpn_status',
|
|
description='OpenVPN server status parser',
|
|
epilog='Dependencies: '
|
|
'- Python 3 (tested version 3.9.5), '
|
|
'- Python 3 modules: cryptography, requests '
|
|
)
|
|
args.add_argument('-s', '--server_conf', type=str, required=True,
|
|
help='path to OpenVPN server configuration file')
|
|
args.add_argument('-f', '--filter', type=str, default='.*', required=False,
|
|
help='client names filter by regex')
|
|
args.add_argument('-g', '--geo', action='store_true', required=False,
|
|
help='check client real ip geo location (may be slow)')
|
|
args = vars(args.parse_args())
|
|
|
|
if path.exists(args['server_conf']):
|
|
with open(args['server_conf'], mode='r', encoding='utf-8') as conf_file:
|
|
conf_data = conf_file.read()
|
|
|
|
st_file_conf = re.search(r'status\s+\S*', conf_data, re.MULTILINE).group(0)
|
|
st_file_path = re.sub(r'status\s+', '', st_file_conf)
|
|
json_data = status(
|
|
stats_file=st_file_path,
|
|
client_filter=args['filter'],
|
|
client_geo=args['geo']
|
|
)
|
|
|
|
ca_file_conf = re.search(r'ca\s+\S*', conf_data, re.MULTILINE).group(0)
|
|
ca_file_path = re.sub(r'ca\s+', '', ca_file_conf)
|
|
json_data['ca_expiration'] = ce_exp(cert_path=ca_file_path) - int(time.time())
|
|
|
|
ce_file_conf = re.search(r'cert\s+\S*', conf_data, re.MULTILINE).group(0)
|
|
ce_file_path = re.sub(r'cert\s+', '', ce_file_conf)
|
|
json_data['ce_expiration'] = ce_exp(cert_path=ce_file_path) - int(time.time())
|
|
|
|
network_conf = re.search(r'server\s+\S*\s+\S*', conf_data, re.MULTILINE).group(0)
|
|
network_pool = re.sub(r'server\s+', '', network_conf)
|
|
network_addr = re.sub(r'\s+\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}', '', network_pool)
|
|
network_mask = re.sub(r'\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}\s+', '', network_pool)
|
|
json_data['clients_limit'] = ip_num(addr=network_addr, mask=network_mask)
|
|
|
|
print(json.dumps(json_data, indent=2))
|