#!/usr/bin/env python3

from os import path
from shutil import make_archive
from subprocess import Popen, PIPE, STDOUT
from time import sleep
from typing import Union
import urllib.request
from urllib.parse import urlencode

try:
    from dns import resolver, rdatatype, rdataclass, rdata, update, query, tsigkeyring, tsig, name
except ModuleNotFoundError:
    print('You need to install the dnspython package using:\n'
          'pip install dnspython\n'
          'Then restart the program')
    exit(1)


class NSupdate:
    """
    Dynamic DNS update from Python

    Builds TSIG-signed dnspython update messages for one zone and sends them
    to the configured server over TCP.
    """
    def __init__(self, server: str, zone: str,
                 keyname: str, keydata: str, keyalgo: name.Name = tsig.HMAC_SHA512,
                 timeout: Union[float, None] = None) -> None:
        """
        Object constructor
        :param server: string with master server containing the domain zone
                       (NOTE(review): dns.query.tcp takes this as ``where``, which is
                       documented as an address — confirm callers do not pass a hostname)
        :param zone: string with name of zone
        :param keyname: string with name of the key to update the zone
        :param keydata: string with content of the key to update the zone
        :param keyalgo: dns.tsig method with algorithm for key
        :param timeout: seconds to wait for the server, None (default) waits indefinitely
        """
        self._server = server
        self._domain = zone
        self._keyring = tsigkeyring.from_text({keyname: keydata})
        self._keyalgo = keyalgo
        self._timeout = timeout

    def _new_update(self):
        """Create an empty update message for the zone, signed with the configured key."""
        return update.Update(zone=self._domain, keyring=self._keyring, keyalgorithm=self._keyalgo)

    def _send(self, rec_upd):
        """Send an update message to the server over TCP and return its response."""
        return query.tcp(q=rec_upd, where=self._server, timeout=self._timeout)

    def rec_add(self, rec_key: str, rec_val: str, rec_ttl: int = 60,
                rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT):
        """
        Add record to DNS zone
        :param rec_key: string with record name
        :param rec_val: string with record value
        :param rec_ttl: integer with record ttl
        :param rec_cls: rdataclass of record
        :param rec_typ: rdatatype of record
        :return: response message returned by dns.query.tcp for the update request
        """
        rec_upd = self._new_update()
        rec_upd.add(rec_key, rec_ttl, rdata.from_text(rec_cls, rec_typ, rec_val))
        return self._send(rec_upd)

    def rec_del(self, rec_key: str, rec_val: str,
                rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT):
        """
        Delete record from DNS zone
        :param rec_key: string with record name
        :param rec_val: string with record value
        :param rec_cls: rdataclass of record
        :param rec_typ: rdatatype of record
        :return: response message returned by dns.query.tcp for the update request
        """
        rec_upd = self._new_update()
        rec_upd.delete(rec_key, rdata.from_text(rec_cls, rec_typ, rec_val))
        return self._send(rec_upd)


class HEupdate:
    """
    Dynamic DNS update on dyn.dns.he.net from Python

    Sends one form-encoded HTTP POST per record update.
    """
    def __init__(self, ddnskey: str, server: str = 'https://dyn.dns.he.net/nic/update') -> None:
        """
        Object constructor
        :param ddnskey: string with content of the key (password) to update the record
        :param server: string with server for dynamic update request
        """
        self._server = server
        self._ddnskey = ddnskey

    def _build_payload(self, rec_key: str, rec_val: str) -> bytes:
        """
        Build the form-encoded POST body for one update
        :param rec_key: string with record fqdn (a single trailing dot is dropped)
        :param rec_val: string with record value
        :return: bytes with url-encoded "hostname", "password" and "txt" fields
        """
        # endswith() instead of rec_key[-1] so an empty name does not raise IndexError
        if rec_key.endswith('.'):
            rec_key = rec_key[:-1]
        # urlencode escapes '&', '=', '+', spaces etc. so the key or value cannot break the form
        fields = {'hostname': rec_key, 'password': self._ddnskey, 'txt': rec_val}
        return urlencode(fields).encode('utf-8')

    def rec_add(self, rec_key: str, rec_val: str) -> Union[str, bytes]:
        """
        Change record data from DNS zone
        :param rec_key: string with record fqdn
        :param rec_val: string with record value
        :return: server reply decoded as UTF-8 text, or the raw bytes when the reply
                 starts with b'\\xff\\xd8'
        """
        payload = self._build_payload(rec_key, rec_val)
        request = urllib.request.Request(url=self._server, data=payload, method='POST')
        # the context manager closes the HTTP connection even if read() fails
        with urllib.request.urlopen(request) as reply:
            response = reply.read()
        # NOTE(review): b'\xff\xd8' is the JPEG start-of-image marker; presumably this
        # guards against decoding a binary reply as text — confirm why it is expected.
        if response.startswith(b'\xff\xd8'):
            return response
        return response.decode('utf-8')


class ACMEcert:
    """
    ACME launcher for DNS-01 challenge from Python

    Runs acme.sh in manual DNS mode for a zone and its wildcard, and interprets
    the marker lines acme.sh prints.
    """
    def __init__(self, zone: str, acme_path: Union[str, None] = None,
                 test: bool = False, force: bool = False) -> None:
        """
        Object constructor
        :param zone: string with name of zone
        :param acme_path: alternative path to bin (example: ~/.acme.sh/acme.sh)
        :param test: boolean "test" argument for the acme.sh
        :param force: boolean "force" argument for the acme.sh
        :raises SystemExit: with code 1 when acme.sh is not found at the resolved path
        """
        self._domain = zone
        if acme_path:
            # an explicit path must be stored too (it was previously dropped, which made
            # the existence check below fail with AttributeError); "~" is expanded so the
            # documented example works even when the shell did not expand it
            self._acme_path = path.expanduser(acme_path)
        else:
            self._acme_path = path.join(path.expanduser("~"), '.acme.sh', 'acme.sh')
        if not path.exists(self._acme_path):
            print('You need to specify the path to the acme.sh\n'
                  'or perform the default installation using:\n'
                  'curl https://get.acme.sh | sh\n'
                  'Then restart the program')
            # same effect as exit(1) but does not depend on the site-provided builtin
            raise SystemExit(1)
        self._acme_test = test
        self._acme_force = force
        self._acme_mode = '--issue'

    @classmethod
    def _run(cls, proc_args: list) -> list:
        """
        Launch acme.sh
        :param proc_args: list of arguments for acme.sh
        :return: list of bytes lines with raw stdout (stderr merged in) from acme.sh
        """
        with Popen(proc_args, stdout=PIPE, stderr=STDOUT) as proc:
            return list(proc.stdout)

    @staticmethod
    def _parse_output(lines: list) -> Union[None, str, list]:
        """
        Interpret acme.sh output
        :param lines: list of bytes lines as returned by _run
        :return: None on the first "skip" marker, string with the certificate directory
                 on the first "cert is in" marker, otherwise the list of TXT values
                 collected from the "TXT value:" lines (may be empty)
        """
        acme_skip, acme_sign, acme_auth = b'Skip, Next renewal time is:', b'Your cert is in', b'TXT value:'
        result = []
        for line in lines:
            if acme_skip in line:
                return None
            if acme_sign in line:
                return path.dirname(line.split(acme_sign)[1].decode('utf-8').strip().replace("'", ""))
            if acme_auth in line:
                result.append(line.split(acme_auth)[1].decode('utf-8').strip().replace("'", ""))
        return result

    def get(self) -> Union[None, str, list]:
        """
        Get certificate or other result after launch acme.sh
        :return: None if skip, string with path if issued, list of records if need update dns
        """
        acme_args = [self._acme_path, self._acme_mode,
                     '-d', self._domain, '-d', '*.' + self._domain,
                     '--dns', '--yes-I-know-dns-manual-mode-enough-go-ahead-please'
                     ]
        if self._acme_test:
            acme_args.append('--test')
        if self._acme_force:
            acme_args.append('--force')
        result = self._parse_output(self._run(acme_args))
        if isinstance(result, list):
            # neither "skip" nor "cert issued" was reported, so the next call uses --renew
            self._acme_mode = '--renew'
        return result


if __name__ == "__main__":
    # Command-line entry point: ask acme.sh for a wildcard certificate, publish the
    # DNS-01 TXT values it requests, wait until they resolve, then ask again.
    from argparse import ArgumentParser

    args = ArgumentParser(
        prog='SimpleWC',
        description='Update Let\'s Encrypt wildcard certificate with DNS-01 challenge',
        epilog='Dependencies: '
               'Python 3 (tested version 3.9.5), '
               'installed or downloaded acme.sh, '
               'installed dnspython package, '
               'dns is supported to dynamic update'
    )
    args.add_argument('--domain', type=str, required=True,
                      help='domain for which the wildcard certificate is issued')
    args.add_argument('--server', type=str, required=True,
                      help='master server containing the domain zone')
    args.add_argument('--keyname', type=str, required=True,
                      help='name of the key to update the zone')
    args.add_argument('--keydata', type=str, required=True,
                      help='content of the key to update the zone')
    args.add_argument('--acmepath', type=str, required=False, default=None,
                      help='alternative path to bin (example: ~/.acme.sh/acme.sh)')
    args.add_argument('--force', action='store_true', required=False,
                      help='"force" argument for the acme.sh')
    args.add_argument('--test', action='store_true', required=False,
                      help='"test" argument for the acme.sh')
    args = vars(args.parse_args())

    rec_fqdn = '_acme-challenge.' + args['domain'] + '.'

    # store_true options are already booleans, so they are passed through directly
    cer = ACMEcert(zone=args['domain'], acme_path=args['acmepath'], force=args['force'], test=args['test'])
    if 'dns.he.net' in args['server']:
        # the he.net updater always talks to its default endpoint; only the key is used
        dns = HEupdate(ddnskey=args['keydata'])
    else:
        dns = NSupdate(zone=args['domain'], server=args['server'], keyname=args['keyname'], keydata=args['keydata'])

    attempts_wait = 60
    # at most two acme.sh runs: one that requests the challenge, one that completes it
    for _ in range(2):
        print('Certificate issue request')
        acme_result = cer.get()
        if acme_result is None:
            print('Skip renewal')
            break
        elif isinstance(acme_result, str):
            print('You cert in:', acme_result + '.zip')
            make_archive(acme_result, 'zip', acme_result)
            break
        elif isinstance(acme_result, list):
            if not acme_result:
                # acme.sh printed no TXT values, so there is nothing to publish or wait for
                continue
            print('Start DNS-01 challenge')
            for record_value in acme_result:
                dns.rec_add(rec_fqdn, record_value)

            # Poll up to 5 times with a growing pause (0, 60, 120, ... seconds) and stop
            # as soon as every requested value is visible; the previous counter was never
            # decremented, so all 5 passes ran even after the records were found.
            for attempts_dns_pass in range(5):
                seconds = attempts_dns_pass * attempts_wait
                print('Pause', seconds, 'seconds')
                sleep(seconds)
                try:
                    dns_result = resolver.resolve(qname=rec_fqdn, rdtype='TXT')
                except (resolver.NXDOMAIN, resolver.NoAnswer, resolver.NoNameservers):
                    # the record is not visible yet; treat every "no data" outcome as a retry
                    print('Not found', rec_fqdn)
                    continue
                published = {str(record).replace('"', "") for record in dns_result}
                all_found = True
                for record_value_from_acme in acme_result:
                    print('Check', rec_fqdn, 'IN TXT', record_value_from_acme)
                    if record_value_from_acme not in published:
                        all_found = False
                        break
                    print('Found', rec_fqdn, 'IN TXT', record_value_from_acme)
                if all_found:
                    break