#!/usr/bin/env python3 """It's the OpenVPN server status parser. """ import json import re import time from argparse import ArgumentParser from ipaddress import IPv4Network from os import path from cryptography import x509 import requests def status(stats_file: str, client_filter: str = '.*', client_geo: bool = False) -> dict: """OpenVPN status log parser. Args: stats_file (str): path to OpenVPN status log file. client_filter (str, optional): client names filter by regex. Defaults to '.*'. client_geo (bool, optional): check client real ip geo location Defaults to False. Returns: dict: { 'stats_updated': timestamp, 'clients_count': int, 'clients_found': int, 'data': [ { "name": str, "r_ip": str, "r_cc": str, "v_ip": str, "b_rx": int, "b_tx": int, "t_cs": timestamp, "t_cd": int }, ] } """ with open(stats_file, mode='r', encoding='utf-8') as file: stats_data = file.read() if re.match("^OpenVPN CLIENT LIST", stats_data): # status-version 1 stats_vers = 1 dlm = ',' elif re.match("^TITLE,", stats_data): # status-version 2 stats_vers = 2 dlm = ',' elif re.match("^TITLE\t", stats_data): # status-version 3 stats_vers = 3 dlm = '\t' else: stats_vers = 0 clients_array = [] clients_count = -1 clients_found = 0 stats_updated = -1 if stats_vers == 0: pass elif stats_vers == 1: updated_r = re.search('Updated' + dlm + '.*', stats_data).group(0) stats_updated = updated_r.replace('Updated' + dlm, '') updated_t = time.mktime(time.strptime(stats_updated, "%Y-%m-%d %H:%M:%S")) clients_s = 'Common Name,Real Address,Bytes Received,Bytes Sent,Connected Since' clients_e = 'ROUTING TABLE' clients_r = re.search(clients_s + "(.*)" + clients_e, stats_data, re.DOTALL).group(0) stats_clients = re.sub(clients_s + '\n', '', re.sub(clients_e, '', clients_r)) routing_s = 'Virtual Address,Common Name,Real Address,Last Ref' routing_e = 'GLOBAL STATS' routing_r = re.search(routing_s + "(.*)" + routing_e, stats_data, re.DOTALL).group(0) stats_routing = re.sub(routing_s + '\n', '', re.sub(routing_e, '', routing_r)) clients_count = len(stats_clients.splitlines()) if clients_count > 0: for client in stats_clients.splitlines(): client_name = client.split(dlm)[0] client_r_ip = client.split(dlm)[1].split(':')[0] client_r_cc = '--' if client_geo: client_r_cc = ip_geo(addr=client_r_ip) if re.findall(client_filter, client_name): regex_v_ip = re.compile('.*' + dlm.join(client.split(dlm)[:2]) + '.*') clients_array.append( { 'name': client_name, 'r_ip': client_r_ip, 'r_cc': client_r_cc, 'v_ip': regex_v_ip.search(stats_routing).group(0).split(dlm)[0], 'b_rx': int(client.split(dlm)[2]), 'b_tx': int(client.split(dlm)[3]), 't_cs': client.split(dlm)[4], 't_cd': int(updated_t) - int( time.mktime( time.strptime(client.split(dlm)[4], "%Y-%m-%d %H:%M:%S") ) ) } ) clients_found += 1 else: updated_r = re.search('TIME' + dlm + '.*', stats_data).group(0) updated_t = updated_r.split(dlm)[2] stats_updated = updated_r.split(dlm)[1] stats_clients = '\n'.join(re.findall("^CLIENT_LIST.*", stats_data, re.MULTILINE)) clients_count = len(stats_clients.splitlines()) if clients_count > 0: for client in stats_clients.splitlines(): client_name = client.split(dlm)[1] client_r_ip = client.split(dlm)[2].split(':')[0] client_r_cc = '--' if client_geo: client_r_cc = ip_geo(addr=client_r_ip) if re.search(client_filter, client_name): clients_array.append( { 'name': client_name, 'r_ip': client_r_ip, 'r_cc': client_r_cc, 'v_ip': client.split(dlm)[3], 'b_rx': int(client.split(dlm)[5]), 'b_tx': int(client.split(dlm)[6]), 't_cs': client.split(dlm)[7], 't_cd': int(updated_t) - int(client.split(dlm)[8]) } ) clients_found += 1 clients_stats = { 'stats_updated': stats_updated, 'clients_count': clients_count, 'clients_found': clients_found, 'data': clients_array, } return clients_stats def ip_num(addr: str, mask: (str, int)) -> int: """OpenVPN client ip limit calculator (without --ifconfig-pool-linear). Args: addr (str): server subnet. mask (str, int): server mask. Returns: int: ip limit. """ return int(IPv4Network(addr + '/' + mask).num_addresses/4-1) def ce_exp(cert_path: str) -> int: """Get certificate expiration time. Args: cert_path (str): path to certificate file. Returns: int: certificate expiration time in epoch. """ with open(cert_path, mode='rb') as cert_file: cert_data = x509.load_pem_x509_certificate(cert_file.read()) return int(cert_data.not_valid_after.timestamp()) def ip_geo(addr: str) -> str: """Get ip address geo location. Args: addr (str): ip address. Returns: str: country code. """ try: request = 'https://geolocation-db.com/json/' + addr response = requests.get(request, timeout=5) result = json.loads(response.content.decode()) return result['country_code'] except requests.exceptions.RequestException: return '--' if __name__ == "__main__": args = ArgumentParser( prog='ovpn_status', description='OpenVPN server status parser', epilog='Dependencies: ' '- Python 3 (tested version 3.9.5), ' '- Python 3 modules: cryptography, requests ' ) args.add_argument('-s', '--server_conf', type=str, required=True, help='path to OpenVPN server configuration file') args.add_argument('-f', '--filter', type=str, default='.*', required=False, help='client names filter by regex') args.add_argument('-g', '--geo', action='store_true', required=False, help='check client real ip geo location (may be slow)') args = vars(args.parse_args()) if path.exists(args['server_conf']): with open(args['server_conf'], mode='r', encoding='utf-8') as conf_file: conf_data = conf_file.read() st_file_conf = re.search(r'status\s+\S*', conf_data, re.MULTILINE).group(0) st_file_path = re.sub(r'status\s+', '', st_file_conf) json_data = status( stats_file=st_file_path, client_filter=args['filter'], client_geo=args['geo'] ) ca_file_conf = re.search(r'ca\s+\S*', conf_data, re.MULTILINE).group(0) ca_file_path = re.sub(r'ca\s+', '', ca_file_conf) json_data['ca_expiration'] = ce_exp(cert_path=ca_file_path) - int(time.time()) ce_file_conf = re.search(r'cert\s+\S*', conf_data, re.MULTILINE).group(0) ce_file_path = re.sub(r'cert\s+', '', ce_file_conf) json_data['ce_expiration'] = ce_exp(cert_path=ce_file_path) - int(time.time()) network_conf = re.search(r'server\s+\S*\s+\S*', conf_data, re.MULTILINE).group(0) network_pool = re.sub(r'server\s+', '', network_conf) network_addr = re.sub(r'\s+\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}', '', network_pool) network_mask = re.sub(r'\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}\s+', '', network_pool) json_data['clients_limit'] = ip_num(addr=network_addr, mask=network_mask) print(json.dumps(json_data, indent=2))