openvpn-management/ovpn_status.py

236 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""It's the OpenVPN server status parser.
"""
import json
import re
import time
from argparse import ArgumentParser
from ipaddress import IPv4Network
from os import path
from cryptography import x509
import requests
def status(stats_file: str, client_filter: str = '.*', client_geo: bool = False) -> dict:
"""OpenVPN status log parser.
Args:
stats_file (str): path to OpenVPN status log file.
client_filter (str, optional): client names filter by regex. Defaults to '.*'.
client_geo (bool, optional): check client real ip geo location. Defaults to False.
Returns:
dict: {
'stats_updated': timestamp,
'clients_count': int,
'clients_found': int,
'data': [
{
"name": str,
"r_ip": str,
"r_cc": str,
"v_ip": str,
"b_rx": int,
"b_tx": int,
"t_cs": timestamp,
"t_cd": int
},
]
}
"""
with open(stats_file, mode='r', encoding='utf-8') as file:
stats_data = file.read()
if re.match("^OpenVPN CLIENT LIST", stats_data):
# status-version 1
stats_vers = 1
dlm = ','
elif re.match("^TITLE,", stats_data):
# status-version 2
stats_vers = 2
dlm = ','
elif re.match("^TITLE\t", stats_data):
# status-version 3
stats_vers = 3
dlm = '\t'
else:
stats_vers = 0
clients_array = []
clients_count = -1
clients_found = 0
stats_updated = -1
if stats_vers == 0:
pass
elif stats_vers == 1:
updated_r = re.search('Updated' + dlm + '.*', stats_data).group(0)
stats_updated = updated_r.replace('Updated' + dlm, '')
updated_t = time.mktime(time.strptime(stats_updated, "%Y-%m-%d %H:%M:%S"))
clients_s = 'Common Name,Real Address,Bytes Received,Bytes Sent,Connected Since'
clients_e = 'ROUTING TABLE'
clients_r = re.search(clients_s + "(.*)" + clients_e, stats_data, re.DOTALL).group(0)
stats_clients = re.sub(clients_s + '\n', '', re.sub(clients_e, '', clients_r))
routing_s = 'Virtual Address,Common Name,Real Address,Last Ref'
routing_e = 'GLOBAL STATS'
routing_r = re.search(routing_s + "(.*)" + routing_e, stats_data, re.DOTALL).group(0)
stats_routing = re.sub(routing_s + '\n', '', re.sub(routing_e, '', routing_r))
clients_count = len(stats_clients.splitlines())
if clients_count > 0:
for client in stats_clients.splitlines():
client_name = client.split(dlm)[0]
client_r_ip = client.split(dlm)[1].split(':')[0]
client_r_cc = '--'
if client_geo:
client_r_cc = ip_geo(addr=client_r_ip)
if re.findall(client_filter, client_name):
regex_v_ip = re.compile('.*' + dlm.join(client.split(dlm)[:2]) + '.*')
clients_array.append(
{
'name': client_name,
'r_ip': client_r_ip,
'r_cc': client_r_cc,
'v_ip': regex_v_ip.search(stats_routing).group(0).split(dlm)[0],
'b_rx': int(client.split(dlm)[2]),
'b_tx': int(client.split(dlm)[3]),
't_cs': client.split(dlm)[4],
't_cd': int(updated_t) - int(
time.mktime(
time.strptime(client.split(dlm)[4], "%Y-%m-%d %H:%M:%S")
)
)
}
)
clients_found += 1
else:
updated_r = re.search('TIME' + dlm + '.*', stats_data).group(0)
updated_t = updated_r.split(dlm)[2]
stats_updated = updated_r.split(dlm)[1]
stats_clients = '\n'.join(re.findall("^CLIENT_LIST.*", stats_data, re.MULTILINE))
clients_count = len(stats_clients.splitlines())
if clients_count > 0:
for client in stats_clients.splitlines():
client_name = client.split(dlm)[1]
client_r_ip = client.split(dlm)[2].split(':')[0]
client_r_cc = '--'
if client_geo:
client_r_cc = ip_geo(addr=client_r_ip)
if re.search(client_filter, client_name):
clients_array.append(
{
'name': client_name,
'r_ip': client_r_ip,
'r_cc': client_r_cc,
'v_ip': client.split(dlm)[3],
'b_rx': int(client.split(dlm)[5]),
'b_tx': int(client.split(dlm)[6]),
't_cs': client.split(dlm)[7],
't_cd': int(updated_t) - int(client.split(dlm)[8])
}
)
clients_found += 1
clients_stats = {
'stats_updated': stats_updated,
'clients_count': clients_count,
'clients_found': clients_found,
'data': clients_array,
}
return clients_stats
def ip_num(addr: str, mask: (str, int)) -> int:
"""OpenVPN client ip limit calculator (without --ifconfig-pool-linear).
Args:
addr (str): server subnet.
mask (str, int): server mask.
Returns:
int: ip limit.
"""
return int(IPv4Network(addr + '/' + mask).num_addresses/4-1)
def ce_exp(cert_path: str) -> int:
"""Get certificate expiration time.
Args:
cert_path (str): path to certificate file.
Returns:
int: certificate expiration time in epoch.
"""
with open(cert_path, mode='rb') as cert_file:
cert_data = x509.load_pem_x509_certificate(cert_file.read())
return int(cert_data.not_valid_after.timestamp())
def ip_geo(addr: str) -> str:
"""Get ip address geo location.
Args:
addr (str): ip address.
Returns:
str: country code.
"""
try:
request = 'https://geolocation-db.com/json/' + addr
response = requests.get(request, timeout=5)
result = json.loads(response.content.decode())
return result['country_code']
except requests.exceptions.RequestException:
return '--'
if __name__ == "__main__":
args = ArgumentParser(
prog='ovpn_status',
description='OpenVPN server status parser',
epilog='Dependencies: '
'- Python 3 (tested version 3.9.5), '
'- Python 3 modules: cryptography, requests '
)
args.add_argument('-s', '--server_conf', type=str, required=True,
help='path to OpenVPN server configuration file')
args.add_argument('-f', '--filter', type=str, default='.*', required=False,
help='client names filter by regex')
args.add_argument('-g', '--geo', action='store_true', required=False,
help='check client real ip geo location (may be slow)')
args = vars(args.parse_args())
if path.exists(args['server_conf']):
with open(args['server_conf'], mode='r', encoding='utf-8') as conf_file:
conf_data = conf_file.read()
st_file_conf = re.search(r'status\s+\S*', conf_data, re.MULTILINE).group(0)
st_file_path = re.sub(r'status\s+', '', st_file_conf)
json_data = status(
stats_file=st_file_path,
client_filter=args['filter'],
client_geo=args['geo']
)
ca_file_conf = re.search(r'ca\s+\S*', conf_data, re.MULTILINE).group(0)
ca_file_path = re.sub(r'ca\s+', '', ca_file_conf)
json_data['ca_expiration'] = ce_exp(cert_path=ca_file_path) - int(time.time())
ce_file_conf = re.search(r'cert\s+\S*', conf_data, re.MULTILINE).group(0)
ce_file_path = re.sub(r'cert\s+', '', ce_file_conf)
json_data['ce_expiration'] = ce_exp(cert_path=ce_file_path) - int(time.time())
network_conf = re.search(r'server\s+\S*\s+\S*', conf_data, re.MULTILINE).group(0)
network_pool = re.sub(r'server\s+', '', network_conf)
network_addr = re.sub(r'\s+\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}', '', network_pool)
network_mask = re.sub(r'\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}\s+', '', network_pool)
json_data['clients_limit'] = ip_num(addr=network_addr, mask=network_mask)
print(json.dumps(json_data, indent=2))