#!/usr/bin/env python3 # pylint: disable=W0612 """It's the Wireguard server status parser. """ import json import re import time from argparse import ArgumentParser from os import path, walk from subprocess import Popen, PIPE, STDOUT import requests def status(configs_root: str, client_filter: str = '.*', client_geo: bool = False) -> dict: """Wireguard server's peers status parser. Args: configs_root (str): root path to peers configs. client_filter (str, optional): client names filter by regex. Defaults to '.*'. client_geo (bool, optional): check client real ip geo location. Defaults to False. Returns: dict: { 'stats_updated': timestamp, 'clients_count': int, 'clients_found': int, 'data': [ { "name": str, "r_ip": str, "r_cc": str, "v_ip": str, "b_rx": int, "b_tx": int, "t_lh": timestamp, }, ] } """ clients_dump = wg_dmp() clients_array = [] clients_count = len(clients_dump) clients_found = 0 stats_updated = time.strftime("%Y-%m-%d %H:%M:%S") if clients_count > 0: configs_dump = [] for root, dirs, files in walk(configs_root, topdown=False): for file_name in files: file_path = path.join(path.realpath(root), file_name) with open(file_path, mode='r', encoding='utf-8') as file: try: file_data = file.read() configs_dump.append( { 'file_name': file_name, 'file_path': file_path, 'file_data': file_data } ) except UnicodeDecodeError: pass for client in clients_dump: client_name = client['name'] for config in configs_dump: if client_name in config['file_data'] and not client['w_if'] in config['file_name']: client_name = config['file_name'].replace('.key', '').replace('.conf', '') break if re.findall(client_filter, client_name) or re.findall(client_filter, client['name']): reject_after_time = 180 client_dl = int(time.mktime(time.strptime(stats_updated, "%Y-%m-%d %H:%M:%S"))) if client['p_lh'] + reject_after_time >= client_dl: client_r_ip = client['r_ip'] client_r_cc = '--' if client_geo and re.match(r'\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}', client_r_ip): client_r_cc = ip_geo(addr=client_r_ip) client_t_lh = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(client['p_lh'])) clients_array.append( { 'name': client_name, 'r_ip': client_r_ip, 'r_cc': client_r_cc, 'v_ip': client['v_ip'], 'b_rx': client['b_rx'], 'b_tx': client['b_tx'], 't_lh': client_t_lh } ) clients_found += 1 clients_stats = { 'stats_updated': stats_updated, 'clients_count': clients_count, 'clients_found': clients_found, 'data': clients_array, } return clients_stats def wg_dmp() -> list: """Parse 'wg show all dump' result. Returns: list: [ { "name": str, "r_ip": str, "v_ip": str, "b_rx": int, "b_tx": int, "p_lh": int, "i_ka": int, "w_if": str }, ] """ process = ['wg', 'show', 'all', 'dump'] with Popen(process, stdout=PIPE, stderr=STDOUT) as proc: dlm = '\t' dump_text = proc.stdout.read().decode('utf-8') peer_list = [] for dump_line in dump_text.splitlines(): line_list = dump_line.split(dlm) w_if = line_list[0] if len(line_list) == 5: pass if len(line_list) == 9: peer_list.append( { "name": line_list[1], "r_ip": line_list[3].split(':')[0], "v_ip": line_list[4], "b_rx": int(line_list[6]), "b_tx": int(line_list[7]), "p_lh": int(line_list[5]), "i_ka": int(line_list[8]), "w_if": w_if } ) return peer_list def ip_geo(addr: str) -> str: """Get ip address geo location. Args: addr (str): ip address. Returns: str: country code. """ try: request = 'https://geolocation-db.com/json/' + addr response = requests.get(request, timeout=5) result = json.loads(response.content.decode()) return result['country_code'] except requests.exceptions.RequestException: return '--' if __name__ == "__main__": args = ArgumentParser( prog='wg_status', description='Wireguard server status parser', epilog='Dependencies: ' '- Python 3 (tested version 3.9.5), ' '- Python 3 modules: requests ' ) args.add_argument('-p', '--peers_root', type=str, default='/etc/wireguard/pki', required=False, help='root path to peers configs or public keys') args.add_argument('-f', '--filter', type=str, default='.*', required=False, help='client names filter by regex') args.add_argument('-g', '--geo', action='store_true', required=False, help='check client real ip geo location (may be slow)') args = vars(args.parse_args()) json_data = status( configs_root=args['peers_root'], client_filter=args['filter'], client_geo=args['geo'] ) print(json.dumps(json_data, indent=2))