wireguard-management/wg_status.py

196 lines
6.4 KiB
Python
Executable File

#!/usr/bin/env python3
# pylint: disable=W0612
"""It's the Wireguard server status parser.
"""
import json
import re
import time
from argparse import ArgumentParser
from os import path, walk
from subprocess import Popen, PIPE, STDOUT
import requests
def status(configs_root: str, client_filter: str = '.*', client_geo: bool = False) -> dict:
"""Wireguard server's peers status parser.
Args:
configs_root (str): root path to peers configs.
client_filter (str, optional): client names filter by regex. Defaults to '.*'.
client_geo (bool, optional): check client real ip geo location. Defaults to False.
Returns:
dict: {
'stats_updated': timestamp,
'clients_count': int,
'clients_found': int,
'data': [
{
"name": str,
"r_ip": str,
"r_cc": str,
"v_ip": str,
"b_rx": int,
"b_tx": int,
"t_lh": timestamp,
},
]
}
"""
clients_dump = wg_dmp()
clients_array = []
clients_count = len(clients_dump)
clients_found = 0
stats_updated = time.strftime("%Y-%m-%d %H:%M:%S")
if clients_count > 0:
configs_dump = []
for root, dirs, files in walk(configs_root, topdown=False):
for file_name in files:
file_path = path.join(path.realpath(root), file_name)
with open(file_path, mode='r', encoding='utf-8') as file:
try:
file_data = file.read()
configs_dump.append(
{
'file_name': file_name,
'file_path': file_path,
'file_data': file_data
}
)
except UnicodeDecodeError:
pass
for client in clients_dump:
client_name = client['name']
for config in configs_dump:
if client_name in config['file_data'] and not client['w_if'] in config['file_name']:
client_name = config['file_name'].replace('.key', '').replace('.conf', '')
break
if re.findall(client_filter, client_name) or re.findall(client_filter, client['name']):
reject_after_time = 180
client_dl = int(time.mktime(time.strptime(stats_updated, "%Y-%m-%d %H:%M:%S")))
if client['p_lh'] + reject_after_time >= client_dl:
client_r_ip = client['r_ip']
client_r_cc = '--'
if client_geo and re.match(r'\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}', client_r_ip):
client_r_cc = ip_geo(addr=client_r_ip)
client_t_lh = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(client['p_lh']))
clients_array.append(
{
'name': client_name,
'r_ip': client_r_ip,
'r_cc': client_r_cc,
'v_ip': client['v_ip'],
'b_rx': client['b_rx'],
'b_tx': client['b_tx'],
't_lh': client_t_lh
}
)
clients_found += 1
clients_stats = {
'stats_updated': stats_updated,
'clients_count': clients_count,
'clients_found': clients_found,
'data': clients_array,
}
return clients_stats
def wg_dmp() -> list:
"""Parse 'wg show all dump' result.
Returns:
list: [
{
"name": str,
"r_ip": str,
"v_ip": str,
"b_rx": int,
"b_tx": int,
"p_lh": int,
"i_ka": int,
"w_if": str
},
]
"""
process = ['wg', 'show', 'all', 'dump']
with Popen(process, stdout=PIPE, stderr=STDOUT) as proc:
dlm = '\t'
dump_text = proc.stdout.read().decode('utf-8')
peer_list = []
for dump_line in dump_text.splitlines():
line_list = dump_line.split(dlm)
w_if = line_list[0]
if len(line_list) == 5:
pass
if len(line_list) == 9:
try:
i_ka = int(line_list[8])
except ValueError:
i_ka = 0
peer_list.append(
{
"name": line_list[1],
"r_ip": line_list[3].split(':')[0],
"v_ip": line_list[4],
"b_rx": int(line_list[6]),
"b_tx": int(line_list[7]),
"p_lh": int(line_list[5]),
"i_ka": i_ka,
"w_if": w_if
}
)
return peer_list
def ip_geo(addr: str) -> str:
"""Get ip address geo location.
Args:
addr (str): ip address.
Returns:
str: country code.
"""
try:
request = 'https://geolocation-db.com/json/' + addr
response = requests.get(request, timeout=5)
result = json.loads(response.content.decode())
return result['country_code']
except requests.exceptions.RequestException:
return '--'
if __name__ == "__main__":
args = ArgumentParser(
prog='wg_status',
description='Wireguard server status parser',
epilog='Dependencies: '
'- Python 3 (tested version 3.9.5), '
'- Python 3 modules: requests '
)
args.add_argument('-p', '--peers_root', type=str, default='/etc/wireguard/pki', required=False,
help='root path to peers configs or public keys')
args.add_argument('-f', '--filter', type=str, default='.*', required=False,
help='client names filter by regex')
args.add_argument('-g', '--geo', action='store_true', required=False,
help='check client real ip geo location (may be slow)')
args = vars(args.parse_args())
json_data = status(
configs_root=args['peers_root'],
client_filter=args['filter'],
client_geo=args['geo']
)
print(json.dumps(json_data, indent=2))