generated from pavel.muhortov/template-bash
196 lines
6.4 KiB
Python
Executable File
196 lines
6.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# pylint: disable=W0612
|
|
|
|
"""It's the Wireguard server status parser.
|
|
"""
|
|
|
|
|
|
import json
|
|
import re
|
|
import time
|
|
from argparse import ArgumentParser
|
|
from os import path, walk
|
|
from subprocess import Popen, PIPE, STDOUT
|
|
import requests
|
|
|
|
|
|
def status(configs_root: str, client_filter: str = '.*', client_geo: bool = False) -> dict:
    """Wireguard server's peers status parser.

    Takes the peers reported by wg_dmp(), replaces each peer's public key with a
    readable name when a file under configs_root contains that key, and reports
    only the peers that match client_filter and had a recent handshake.

    Args:
        configs_root (str): root path to peers configs.
        client_filter (str, optional): client names filter by regex. Defaults to '.*'.
        client_geo (bool, optional): check client real ip geo location. Defaults to False.

    Returns:
        dict: {
            'stats_updated': timestamp,
            'clients_count': int,
            'clients_found': int,
            'data': [
                {
                    "name": str,
                    "r_ip": str,
                    "r_cc": str,
                    "v_ip": str,
                    "b_rx": int,
                    "b_tx": int,
                    "t_lh": timestamp,
                },
            ]
        }
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    # Peers whose latest handshake is older than this many seconds are not reported.
    reject_after_time = 180
    # Dots are escaped and the whole string must match, so only dotted-quad
    # addresses are sent to the geo lookup.
    ipv4_pattern = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'

    clients_dump = wg_dmp()
    clients_array = []

    # Take "now" once so the reported timestamp and the handshake deadline agree.
    now = time.time()
    stats_updated = time.strftime(time_format, time.localtime(now))
    client_dl = int(now)

    if clients_dump:
        configs_dump = _read_peer_configs(configs_root)
        name_filter = re.compile(client_filter)

        for client in clients_dump:
            public_key = client['name']
            client_name = public_key
            for config in configs_dump:
                # A file mentioning the key names the client, unless the file name
                # contains the interface name (presumably the server's own
                # config/key file - confirm against the pki layout).
                if public_key in config['file_data'] and client['w_if'] not in config['file_name']:
                    client_name = config['file_name'].replace('.key', '').replace('.conf', '')
                    break

            # The filter may match either the resolved name or the raw public key.
            if not (name_filter.search(client_name) or name_filter.search(public_key)):
                continue
            if client['p_lh'] + reject_after_time < client_dl:
                continue

            client_r_ip = client['r_ip']
            client_r_cc = '--'
            if client_geo and re.fullmatch(ipv4_pattern, client_r_ip):
                client_r_cc = ip_geo(addr=client_r_ip)

            clients_array.append(
                {
                    'name': client_name,
                    'r_ip': client_r_ip,
                    'r_cc': client_r_cc,
                    'v_ip': client['v_ip'],
                    'b_rx': client['b_rx'],
                    'b_tx': client['b_tx'],
                    't_lh': time.strftime(time_format, time.localtime(client['p_lh']))
                }
            )

    return {
        'stats_updated': stats_updated,
        'clients_count': len(clients_dump),
        'clients_found': len(clients_array),
        'data': clients_array,
    }


def _read_peer_configs(configs_root: str) -> list:
    """Read every UTF-8 text file below configs_root.

    Files that cannot be opened or decoded are skipped, so one unreadable file
    does not prevent the status report from being built.

    Args:
        configs_root (str): root path to peers configs.

    Returns:
        list: dicts with 'file_name', 'file_path' and 'file_data' keys.
    """
    configs_dump = []
    for root, _dirs, files in walk(configs_root, topdown=False):
        real_root = path.realpath(root)
        for file_name in files:
            file_path = path.join(real_root, file_name)
            try:
                with open(file_path, mode='r', encoding='utf-8') as file:
                    file_data = file.read()
            except (UnicodeDecodeError, OSError):
                continue
            configs_dump.append(
                {
                    'file_name': file_name,
                    'file_path': file_path,
                    'file_data': file_data
                }
            )
    return configs_dump
|
|
|
|
|
|
def wg_dmp() -> list:
    """Parse 'wg show all dump' result.

    Only tab-separated lines with exactly 9 fields (peer lines) are parsed;
    every other line of the command output is ignored.

    Returns:
        list: [
            {
                "name": str,
                "r_ip": str,
                "v_ip": str,
                "b_rx": int,
                "b_tx": int,
                "p_lh": int,
                "i_ka": int,
                "w_if": str
            },
        ]
    """
    process = ['wg', 'show', 'all', 'dump']
    with Popen(process, stdout=PIPE, stderr=STDOUT) as proc:
        # communicate() reads until EOF and waits for the process to exit.
        dump_text = proc.communicate()[0].decode('utf-8')

    peer_list = []
    for dump_line in dump_text.splitlines():
        line_list = dump_line.split('\t')
        if len(line_list) != 9:
            continue
        w_if, name, _psk, endpoint, v_ip, p_lh, b_rx, b_tx, keepalive = line_list
        try:
            i_ka = int(keepalive)
        except ValueError:
            # Non-numeric keepalive value (e.g. 'off') is reported as 0.
            i_ka = 0
        peer_list.append(
            {
                "name": name,
                "r_ip": _endpoint_host(endpoint),
                "v_ip": v_ip,
                "b_rx": int(b_rx),
                "b_tx": int(b_tx),
                "p_lh": int(p_lh),
                "i_ka": i_ka,
                "w_if": w_if
            }
        )
    return peer_list


def _endpoint_host(endpoint: str) -> str:
    """Return the host part of a 'host:port' endpoint string.

    The port is split off from the right so that bracketed IPv6 endpoints such
    as '[2001:db8::1]:51820' keep their full address; the brackets are removed.
    A value without a colon (e.g. '(none)') is returned unchanged.
    """
    host = endpoint.rsplit(':', 1)[0]
    if host.startswith('[') and host.endswith(']'):
        host = host[1:-1]
    return host
|
|
|
|
|
|
def ip_geo(addr: str) -> str:
    """Get ip address geo location.

    Queries https://geolocation-db.com/json/<addr> with a 5 second timeout and
    reads the 'country_code' field of the JSON answer.

    Args:
        addr (str): ip address.

    Returns:
        str: country code, or '--' when the request fails, the answer is not
            valid JSON, or it carries no usable 'country_code' string.
    """
    request = 'https://geolocation-db.com/json/' + addr
    try:
        response = requests.get(request, timeout=5)
        result = json.loads(response.content.decode())
        country_code = result['country_code']
    except (requests.exceptions.RequestException, ValueError, KeyError, TypeError):
        # ValueError: undecodable or non-JSON body; KeyError/TypeError: payload
        # without the expected field. The lookup is best-effort only.
        return '--'
    if not isinstance(country_code, str) or not country_code:
        return '--'
    return country_code
|
|
|
|
|
|
if __name__ == "__main__":
    # Command line entry point: parse the options, build the report, print it as JSON.
    parser = ArgumentParser(
        prog='wg_status',
        description='Wireguard server status parser',
        epilog='Dependencies: '
               '- Python 3 (tested version 3.9.5), '
               '- Python 3 modules: requests '
    )
    parser.add_argument(
        '-p', '--peers_root',
        type=str,
        default='/etc/wireguard/pki',
        required=False,
        help='root path to peers configs or public keys',
    )
    parser.add_argument(
        '-f', '--filter',
        type=str,
        default='.*',
        required=False,
        help='client names filter by regex',
    )
    parser.add_argument(
        '-g', '--geo',
        action='store_true',
        required=False,
        help='check client real ip geo location (may be slow)',
    )
    options = parser.parse_args()

    report = status(
        configs_root=options.peers_root,
        client_filter=options.filter,
        client_geo=options.geo,
    )
    print(json.dumps(report, indent=2))
|