utils/simplewc.py
2022-04-19 17:06:52 +03:00

247 lines
10 KiB
Python

#!/usr/bin/env python3
from os import path
from shutil import make_archive
from subprocess import Popen, PIPE, STDOUT
from time import sleep
from typing import Union
import urllib.request
try:
from dns import resolver, rdatatype, rdataclass, rdata, update, query, tsigkeyring, tsig, name
except ModuleNotFoundError:
print('You need to install the dnspython package using:\n'
'pip install dnspython\n'
'Then restart the program')
exit(1)
class NSupdate:
    """
    Dynamic DNS update (TSIG-signed) from Python
    """

    def __init__(self, server: str, zone: str,
                 keyname: str, keydata: str, keyalgo: name.Name = tsig.HMAC_SHA512) -> None:
        """
        Object constructor
        :param server: string with master server containing the domain zone
        :param zone: string with name of zone
        :param keyname: string with name of the key to update the zone
        :param keydata: string with content of the key to update the zone
        :param keyalgo: dns.tsig method with algorithm for key
        """
        self._server = server
        self._domain = zone
        # The keyring maps the key name to its secret; it signs every update message.
        self._keyring = tsigkeyring.from_text({keyname: keydata})
        self._keyalgo = keyalgo

    def _new_update(self):
        """
        Create an empty signed update message for the configured zone
        :return: update.Update bound to the zone, keyring and key algorithm
        """
        return update.Update(zone=self._domain, keyring=self._keyring, keyalgorithm=self._keyalgo)

    def _send(self, message):
        """
        Send an update message to the configured server over TCP
        :param message: update.Update to transmit
        :return: whatever query.tcp returns for the request
        """
        return query.tcp(q=message, where=self._server)

    def rec_add(self, rec_key: str, rec_val: str, rec_ttl: int = 60,
                rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT) -> None:
        """
        Add record to DNS zone
        :param rec_key: string with record name
        :param rec_val: string with record value (text form, parsed with rdata.from_text)
        :param rec_ttl: integer with record ttl
        :param rec_cls: rdataclass of record
        :param rec_typ: rdatatype of record
        :return: the response returned by query.tcp (the signature is annotated as None)
        """
        # Parse the value first so a malformed record fails before any message is built.
        record = rdata.from_text(rec_cls, rec_typ, rec_val)
        message = self._new_update()
        message.add(rec_key, rec_ttl, record)
        return self._send(message)

    def rec_del(self, rec_key: str, rec_val: str,
                rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT) -> None:
        """
        Delete record from DNS zone
        :param rec_key: string with record name
        :param rec_val: string with record value (text form, parsed with rdata.from_text)
        :param rec_cls: rdataclass of record
        :param rec_typ: rdatatype of record
        :return: the response returned by query.tcp (the signature is annotated as None)
        """
        record = rdata.from_text(rec_cls, rec_typ, rec_val)
        message = self._new_update()
        message.delete(rec_key, record)
        return self._send(message)
class HEupdate:
    """
    Dynamic DNS update on dyn.dns.he.net from Python
    """

    def __init__(self, ddnskey: str, server: str = 'https://dyn.dns.he.net/nic/update') -> None:
        """
        Object constructor
        :param ddnskey: string with content of the key (password) to update the record
        :param server: string with server for dynamic update request
        """
        self._server = server
        self._ddnskey = ddnskey

    def rec_add(self, rec_key: str, rec_val: str) -> Union[bytes, str]:
        """
        Change record data from DNS zone
        :param rec_key: string with record fqdn (one trailing dot, if present, is removed)
        :param rec_val: string with record value
        :return: reply body decoded as UTF-8 text, or the raw bytes when the
                 body starts with FF D8
        """
        # Local import keeps this class self-contained; the file only imports urllib.request.
        from urllib.parse import urlencode

        # endswith() is safe for an empty name, unlike indexing rec_key[-1].
        hostname = rec_key[:-1] if rec_key.endswith('.') else rec_key
        # urlencode escapes '&', '=', '+', '%' and friends, so a password or value
        # containing them can no longer corrupt the form fields.
        payload = urlencode({'hostname': hostname, 'password': self._ddnskey, 'txt': rec_val})
        request = urllib.request.Request(url=self._server, data=payload.encode('utf-8'), method='POST')
        # The context manager closes the connection even if read() fails.
        with urllib.request.urlopen(request) as reply:
            response = reply.read()
        # A body starting with FF D8 (the JPEG start-of-image marker) is not text
        # and is handed back undecoded.
        # NOTE(review): when the service answers with such a body is not visible here.
        if response.startswith(b'\xff\xd8'):
            return response
        return response.decode('utf-8')
class ACMEcert:
    """
    ACME launcher for DNS-01 challenge from Python
    """

    def __init__(self, zone: str, acme_path: Union[str, None] = None, test: bool = False, force: bool = False) -> None:
        """
        Object constructor
        :param zone: string with name of zone
        :param acme_path: alternative path to bin (example: ~/.acme.sh/acme.sh)
        :param test: boolean "test" argument for the acme.sh
        :param force: boolean "force" argument for the acme.sh
        :raises SystemExit: when acme.sh is not found at the resolved path
        """
        self._domain = zone
        if acme_path:
            # A caller-supplied path must be stored too; a leading "~" is expanded
            # because the documented example uses it.
            self._acme_path = path.expanduser(acme_path)
        else:
            self._acme_path = path.join(path.expanduser("~"), '.acme.sh', 'acme.sh')
        if not path.exists(self._acme_path):
            print('You need to specify the path to the acme.sh\n'
                  'or perform the default installation using:\n'
                  'curl https://get.acme.sh | sh\n'
                  'Then restart the program')
            raise SystemExit(1)
        self._acme_test = test
        self._acme_force = force
        # The first run issues; get() switches this to '--renew' for the follow-up run.
        self._acme_mode = '--issue'

    @classmethod
    def _run(cls, proc_args: list) -> list:
        """
        Launch acme.sh
        :param proc_args: list of arguments for acme.sh (executable first)
        :return: list of raw byte lines from stdout (stderr is merged into it)
        """
        with Popen(proc_args, stdout=PIPE, stderr=STDOUT) as proc:
            return list(proc.stdout)

    @staticmethod
    def _tail(line: bytes, marker: bytes) -> str:
        """
        Extract the text that follows a marker on an acme.sh output line
        :param line: raw output line containing the marker
        :param marker: byte string that precedes the wanted value
        :return: value with surrounding whitespace and single quotes removed
        """
        text = line.split(marker)[1].decode('utf-8').strip().replace("'", "")
        # NOTE(review): some acme.sh releases appear to print "<marker>: value";
        # drop such a separator so the value stays clean - confirm against the
        # acme.sh version in use.
        return text.lstrip(':').strip()

    def get(self) -> Union[None, str, list]:
        """
        Get certificate or other result after launch acme.sh
        :return: None if skip, string with path if issued, list of records if need update dns
        """
        acme_args = [self._acme_path, self._acme_mode,
                     '-d', self._domain, '-d', '*.' + self._domain,
                     '--dns', '--yes-I-know-dns-manual-mode-enough-go-ahead-please'
                     ]
        if self._acme_test:
            acme_args.append('--test')
        if self._acme_force:
            acme_args.append('--force')
        acme_skip, acme_sign, acme_auth = b'Skip, Next renewal time is:', b'Your cert is in', b'TXT value:'
        result = []
        for line in self._run(acme_args):
            if acme_skip in line:
                return None
            if acme_sign in line:
                # The caller wants the directory holding the certificate files.
                return path.dirname(self._tail(line, acme_sign))
            if acme_auth in line:
                result.append(self._tail(line, acme_auth))
        # Neither skipped nor signed: the next call completes the order with --renew.
        self._acme_mode = '--renew'
        return result
if __name__ == "__main__":
    from argparse import ArgumentParser

    def _txt_published(fqdn: str, expected: list) -> bool:
        """
        Check whether every expected TXT value is visible through the resolver
        :param fqdn: string with record fqdn to query
        :param expected: list of strings with TXT values that must all be present
        :return: True when all values were found, False otherwise
        """
        try:
            answers = resolver.resolve(qname=fqdn, rdtype='TXT')
        except (resolver.NXDOMAIN, resolver.NoAnswer, resolver.NoNameservers, resolver.Timeout):
            # Any "no usable answer yet" outcome is a reason to retry, not to crash.
            print('Not found', fqdn)
            return False
        # The text form of a TXT rdata is quoted; strip the quotes before comparing.
        published = {str(answer).replace('"', "") for answer in answers}
        for value in expected:
            print('Check', fqdn, 'IN TXT', value)
            if value not in published:
                return False
            print('Found', fqdn, 'IN TXT', value)
        return True

    parser = ArgumentParser(
        prog='SimpleWC',
        description='Update Let\'s Encrypt wildcard certificate with DNS-01 challenge',
        epilog='Dependencies: '
               'Python 3 (tested version 3.9.5), '
               'installed or downloaded acme.sh, '
               'installed dnspython package, '
               'dns is supported to dynamic update'
    )
    parser.add_argument('--domain', type=str, required=True,
                        help='domain for which the wildcard certificate is issued')
    parser.add_argument('--server', type=str, required=True,
                        help='master server containing the domain zone')
    parser.add_argument('--keyname', type=str, required=True,
                        help='name of the key to update the zone')
    parser.add_argument('--keydata', type=str, required=True,
                        help='content of the key to update the zone')
    parser.add_argument('--acmepath', type=str, required=False, default=None,
                        help='alternative path to bin (example: ~/.acme.sh/acme.sh)')
    parser.add_argument('--force', action='store_true', required=False,
                        help='"force" argument for the acme.sh')
    parser.add_argument('--test', action='store_true', required=False,
                        help='"test" argument for the acme.sh')
    args = vars(parser.parse_args())

    rec_fqdn = '_acme-challenge.' + args['domain'] + '.'
    # store_true already yields booleans, so the flags are passed through directly.
    cer = ACMEcert(zone=args['domain'], acme_path=args['acmepath'], force=args['force'], test=args['test'])
    if 'dns.he.net' in args['server']:
        dns = HEupdate(ddnskey=args['keydata'])
    else:
        dns = NSupdate(zone=args['domain'], server=args['server'], keyname=args['keyname'], keydata=args['keydata'])

    # Two acme.sh runs at most: one to obtain the challenge, one to complete it.
    attempts_max, attempts_dns_max, attempts_wait = 2, 5, 60
    for _ in range(attempts_max):
        print('Certificate issue request')
        acme_result = cer.get()
        if acme_result is None:
            print('Skip renewal')
            break
        if isinstance(acme_result, str):
            print('You cert in:', acme_result + '.zip')
            make_archive(acme_result, 'zip', acme_result)
            break
        if not acme_result:
            # Nothing to publish or verify; waiting on DNS would only burn time.
            print('No DNS-01 challenge values received')
            continue
        print('Start DNS-01 challenge')
        for record_value in acme_result:
            dns.rec_add(rec_fqdn, record_value)
        # Linear back-off (0, 60, 120, ... seconds); stop as soon as every value is visible.
        for attempt in range(attempts_dns_max):
            seconds = attempt * attempts_wait
            print('Pause', seconds, 'seconds')
            sleep(seconds)
            if _txt_published(rec_fqdn, acme_result):
                break