#!/usr/bin/env python3
"""Issue or renew a Let's Encrypt wildcard certificate with acme.sh (DNS-01 challenge),
publishing the challenge TXT records through dynamic DNS updates."""
from os import path
from shutil import make_archive
from subprocess import Popen, PIPE, STDOUT
from time import sleep
from typing import Optional, Union

try:
    from dns import (resolver, rdatatype, rdataclass, rdata, update, query,
                     tsigkeyring, tsig, name, message)
except ModuleNotFoundError:
    print('You need to install the dnspython package using:\n'
          'pip install dnspython\n'
          'Then restart the program')
    # The ``exit`` builtin is injected by ``site`` for interactive use only;
    # raising SystemExit has the same effect and always works.
    raise SystemExit(1)


class NSupdate:
    """
    Dynamic DNS update from Python
    """

    def __init__(self, server: str, zone: str, keyname: str, keydata: str,
                 keyalgo: name.Name = tsig.HMAC_SHA512) -> None:
        """
        Object constructor
        :param server: string with master server containing the domain zone
        :param zone: string with name of zone
        :param keyname: string with name of the key to update the zone
        :param keydata: string with content of the key to update the zone
        :param keyalgo: dns.tsig method with algorithm for key
        """
        self._server = server
        self._domain = zone
        self._keyring = tsigkeyring.from_text({keyname: keydata})
        self._keyalgo = keyalgo

    def _new_update(self) -> update.Update:
        """Build an empty, TSIG-signed update message for the configured zone."""
        return update.Update(zone=self._domain, keyring=self._keyring,
                             keyalgorithm=self._keyalgo)

    def rec_add(self, rec_key: str, rec_val: str, rec_ttl: int = 60,
                rec_cls: rdataclass.RdataClass = rdataclass.IN,
                rec_typ: rdatatype.RdataType = rdatatype.TXT) -> message.Message:
        """
        Add record to DNS zone
        :param rec_key: string with record name
        :param rec_val: string with record value
        :param rec_ttl: integer with record ttl
        :param rec_cls: rdataclass of record
        :param rec_typ: rdatatype of record
        :return: response message received from the server over TCP
        """
        record = rdata.from_text(rec_cls, rec_typ, rec_val)
        request = self._new_update()
        request.add(rec_key, rec_ttl, record)
        return query.tcp(q=request, where=self._server)

    def rec_del(self, rec_key: str, rec_val: str,
                rec_cls: rdataclass.RdataClass = rdataclass.IN,
                rec_typ: rdatatype.RdataType = rdatatype.TXT) -> message.Message:
        """
        Delete record from DNS zone
        :param rec_key: string with record name
        :param rec_val: string with record value
        :param rec_cls: rdataclass of record
        :param rec_typ: rdatatype of record
        :return: response message received from the server over TCP
        """
        record = rdata.from_text(rec_cls, rec_typ, rec_val)
        request = self._new_update()
        request.delete(rec_key, record)
        return query.tcp(q=request, where=self._server)


class ACMEcert:
    """
    ACME launcher for DNS-01 challenge from Python
    """

    def __init__(self, zone: str, acme_path: Optional[str] = None,
                 test: bool = False, force: bool = False) -> None:
        """
        Object constructor
        :param zone: string with name of zone
        :param acme_path: alternative path to bin (example: ~/.acme.sh/acme.sh)
        :param test: boolean "test" argument for the acme.sh
        :param force: boolean "force" argument for the acme.sh
        :raises SystemExit: when acme.sh does not exist at the resolved path
        """
        self._domain = zone
        if acme_path:
            # Previously an explicit path was silently dropped, leaving
            # self._acme_path undefined. "~" is expanded because the
            # documented example uses it.
            self._acme_path = path.expanduser(acme_path)
        else:
            self._acme_path = path.join(path.expanduser('~'), '.acme.sh', 'acme.sh')
        if not path.exists(self._acme_path):
            print('You need to specify the path to the acme.sh\n'
                  'or perform the default installation using:\n'
                  'curl https://get.acme.sh | sh\n'
                  'Then restart the program')
            raise SystemExit(1)
        self._acme_test = test
        self._acme_force = force
        # First call issues; get() switches this to '--renew' afterwards.
        self._acme_mode = '--issue'

    @classmethod
    def _run(cls, proc_args: list) -> list:
        """
        Launch acme.sh
        :param proc_args: list of arguments for acme.sh
        :return: list of raw output lines (bytes), stderr merged into stdout
        """
        with Popen(proc_args, stdout=PIPE, stderr=STDOUT) as proc:
            return list(proc.stdout)

    @staticmethod
    def _value_after(line: bytes, marker: bytes) -> str:
        """Return the text following *marker* in *line*, stripped of whitespace and single quotes."""
        return line.split(marker)[1].decode('utf-8').strip().replace("'", "")

    def get(self) -> Union[None, str, list]:
        """
        Get certificate or other result after launch acme.sh
        :return: None if skip, string with path if issued, list of records if need update dns
        """
        acme_args = [self._acme_path, self._acme_mode,
                     '-d', self._domain,
                     '-d', '*.' + self._domain,
                     '--dns',
                     '--yes-I-know-dns-manual-mode-enough-go-ahead-please']
        if self._acme_test:
            acme_args.append('--test')
        if self._acme_force:
            acme_args.append('--force')

        # Markers searched for in the acme.sh output.
        acme_skip = b'Skip, Next renewal time is:'
        acme_sign = b'Your cert is in'
        acme_auth = b'TXT value:'

        result = []
        for line in self._run(acme_args):
            if acme_skip in line:
                return None
            if acme_sign in line:
                # Return the directory that holds the certificate file.
                return path.dirname(self._value_after(line, acme_sign))
            if acme_auth in line:
                result.append(self._value_after(line, acme_auth))
        # Subsequent calls must complete the pending order instead of starting a new one.
        self._acme_mode = '--renew'
        return result


def _txt_records_visible(fqdn: str, expected: list) -> bool:
    """
    Check whether every expected TXT value is currently resolvable
    :param fqdn: string with fully-qualified record name
    :param expected: list of strings with TXT values that must be present
    :return: True when all values were found, False otherwise
    """
    try:
        answer = resolver.resolve(qname=fqdn, rdtype='TXT')
    except (resolver.NXDOMAIN, resolver.NoAnswer):
        # NoAnswer: the name exists but carries no TXT record yet.
        print('Not found', fqdn)
        return False
    published = {str(record).replace('"', "") for record in answer}
    for value in expected:
        print('Check', fqdn, 'IN TXT', value)
        if value not in published:
            return False
        print('Found', fqdn, 'IN TXT', value)
    return True


def main() -> None:
    """Parse the command line and run the issue / DNS-01 challenge / renew cycle."""
    from argparse import ArgumentParser

    parser = ArgumentParser(
        prog='SimpleWC',
        description='Update Let\'s Encrypt wildcard certificate with DNS-01 challenge',
        epilog='Dependencies: '
               'Python 3 (tested version 3.9.5), '
               'installed or downloaded acme.sh, '
               'installed dnspython package, '
               'dns is supported to dynamic update'
    )
    parser.add_argument('--domain', type=str, required=True,
                        help='domain for which the wildcard certificate is issued')
    parser.add_argument('--server', type=str, required=True,
                        help='master server containing the domain zone')
    parser.add_argument('--keyname', type=str, required=True,
                        help='name of the key to update the zone')
    parser.add_argument('--keydata', type=str, required=True,
                        help='content of the key to update the zone')
    parser.add_argument('--acmepath', type=str, required=False, default=None,
                        help='alternative path to bin (example: ~/.acme.sh/acme.sh)')
    parser.add_argument('--force', action='store_true', required=False,
                        help='"force" argument for the acme.sh')
    parser.add_argument('--test', action='store_true', required=False,
                        help='"test" argument for the acme.sh')
    args = vars(parser.parse_args())

    rec_fqdn = '_acme-challenge.' + args['domain'] + '.'
    cer = ACMEcert(zone=args['domain'], acme_path=args['acmepath'],
                   force=args['force'], test=args['test'])
    updater = NSupdate(zone=args['domain'], server=args['server'],
                       keyname=args['keyname'], keydata=args['keydata'])

    issue_attempts = 2   # one '--issue' run plus one '--renew' run
    dns_checks = 5       # maximum number of DNS look-ups per challenge
    attempts_wait = 60   # seconds added to the pause before each further look-up

    for _ in range(issue_attempts):
        print('Certificate issue request')
        acme_result = cer.get()
        if acme_result is None:
            print('Skip renewal')
            break
        if isinstance(acme_result, str):
            print('You cert in:', acme_result + '.zip')
            make_archive(acme_result, 'zip', acme_result)
            break

        print('Start DNS-01 challenge')
        for record_value in acme_result:
            updater.rec_add(rec_fqdn, record_value)
        for dns_pass in range(dns_checks):
            seconds = dns_pass * attempts_wait
            print('Pause', seconds, 'seconds')
            sleep(seconds)
            # Stop polling as soon as every challenge value is visible; the
            # old counter-based condition could never become false, so all
            # five pauses were always taken.
            if _txt_records_visible(rec_fqdn, acme_result):
                break


if __name__ == "__main__":
    main()