247 lines
10 KiB
247 lines
10 KiB
#!/usr/bin/env python3
from os import path
from shutil import make_archive
from subprocess import Popen, PIPE, STDOUT
from time import sleep
from typing import Union
import urllib.request
from dns import resolver, rdatatype, rdataclass, rdata, update, query, tsigkeyring, tsig, name
except ModuleNotFoundError:
print('You need to install the dnspython package using:\n'
'pip install dnspython\n'
'Then restart the program')
class NSupdate:
Dynamic DNS update from Python
def __init__(self, server: str, zone: str,
keyname: str, keydata: str, keyalgo: name.Name = tsig.HMAC_SHA512) -> None:
Object constructor
:param server: string with master server containing the domain zone
:param zone: string with name of zone
:param keyname: string with name of the key to update the zone
:param keydata: string with content of the key to update the zone
:param keyalgo: dns.tsig method with algorithm for key
self._server = server
self._domain = zone
self._keyring = tsigkeyring.from_text({keyname: keydata})
self._keyalgo = keyalgo
def rec_add(self, rec_key: str, rec_val: str, rec_ttl: int = 60,
rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT) -> None:
Add record to DNS zone
:param rec_key: string with record name
:param rec_val: string with record value
:param rec_ttl: integer with record ttl
:param rec_cls: rdataclass of record
:param rec_typ: rdatatype of record
:return: None, QueryMessage, UpdateMessage, Message after request
rec_val = rdata.from_text(rec_cls, rec_typ, rec_val)
rec_upd = update.Update(zone=self._domain, keyring=self._keyring, keyalgorithm=self._keyalgo)
rec_upd.add(rec_key, rec_ttl, rec_val)
return query.tcp(q=rec_upd, where=self._server)
def rec_del(self, rec_key: str, rec_val: str,
rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT) -> None:
Delete record from DNS zone
:param rec_key: string with record name
:param rec_val: string with record value
:param rec_cls: rdataclass of record
:param rec_typ: rdatatype of record
:return: None, QueryMessage, UpdateMessage, Message after request
rec_val = rdata.from_text(rec_cls, rec_typ, rec_val)
rec_upd = update.Update(zone=self._domain, keyring=self._keyring, keyalgorithm=self._keyalgo)
rec_upd.delete(rec_key, rec_val)
return query.tcp(q=rec_upd, where=self._server)
class HEupdate:
Dynamic DNS update on dyn.dns.he.net from Python
def __init__(self, ddnskey: str, server: str = 'https://dyn.dns.he.net/nic/update') -> None:
Object constructor
:param ddnskey: string with content of the key (password) to update the record
:param server: string with server for dynamic update request
self._server = server
self._ddnskey = ddnskey
def rec_add(self, rec_key: str, rec_val: str) -> None:
Change record data from DNS zone
:param rec_key: string with record fqdn
:param rec_val: string with record value
:return: None, Message after request
if rec_key[-1] == '.':
rec_key = rec_key[:-1]
self._data = 'hostname=' + rec_key + '&' + 'password=' + self._ddnskey + '&' + 'txt=' + rec_val
request = urllib.request.Request(url=self._server, data=bytes(self._data.encode('utf-8')), method='POST')
response = urllib.request.urlopen(request).read()
if response.startswith(b'\xff\xd8'):
return response
return str(response.decode('utf-8'))
class ACMEcert:
ACME launcher for DNS-01 challenge from Python
def __init__(self, zone: str, acme_path: str = None, test: bool = False, force: bool = False) -> None:
Object constructor
:param zone: string with name of zone
:param acme_path: alternative path to bin (example: ~/.acme.sh/acme.sh)
:param test: boolean "test" argument for the acme.sh
:param force: boolean "force" argument for the acme.sh
self._domain = zone
if not acme_path:
self._acme_path = path.expanduser("~") + path.sep + '.acme.sh' + path.sep + 'acme.sh'
if not(path.exists(self._acme_path)):
print('You need to specify the path to the acme.sh\n'
'or perform the default installation using:\n'
'curl https://get.acme.sh | sh\n'
'Then restart the program')
self._acme_test = test
self._acme_force = force
self._acme_mode = '--issue'
def _run(cls, proc_args: list) -> list:
Launch acme.sh
:param proc_args: list of arguments for acme.sh
:return: list of strings with raw stdout from acme.sh
with Popen(proc_args, stdout=PIPE, stderr=STDOUT) as proc:
result = []
for line in proc.stdout:
return result
def get(self) -> Union[None, str, list]:
Get certificate or other result after launch acme.sh
:return: None if skip, string with path if issued, list of records if need update dns
acme_args = [self._acme_path, self._acme_mode,
'-d', self._domain, '-d', '*.' + self._domain,
'--dns', '--yes-I-know-dns-manual-mode-enough-go-ahead-please'
if self._acme_test:
if self._acme_force:
acme_skip, acme_sign, acme_auth = b'Skip, Next renewal time is:', b'Your cert is in', b'TXT value:'
result = []
for line in self._run(acme_args):
if acme_skip in line:
return None
if acme_sign in line:
return path.dirname(line.split(acme_sign)[1].decode('utf-8').strip().replace("'", ""))
if acme_auth in line:
result.append(line.split(acme_auth)[1].decode('utf-8').strip().replace("'", ""))
self._acme_mode = '--renew'
return result
if __name__ == "__main__":
from argparse import ArgumentParser
args = ArgumentParser(
description='Update Let\'s Encrypt wildcard certificate with DNS-01 challenge',
epilog='Dependencies: '
'Python 3 (tested version 3.9.5), '
'installed or downloaded acme.sh, '
'installed dnspython package, '
'dns is supported to dynamic update'
args.add_argument('--domain', type=str, required=True,
help='domain for which the wildcard certificate is issued')
args.add_argument('--server', type=str, required=True,
help='master server containing the domain zone')
args.add_argument('--keyname', type=str, required=True,
help='name of the key to update the zone')
args.add_argument('--keydata', type=str, required=True,
help='content of the key to update the zone')
args.add_argument('--acmepath', type=str, required=False, default=None,
help='alternative path to bin (example: ~/.acme.sh/acme.sh)')
args.add_argument('--force', action='store_true', required=False,
help='"force" argument for the acme.sh')
args.add_argument('--test', action='store_true', required=False,
help='"test" argument for the acme.sh')
args = vars(args.parse_args())
rec_fqdn = '_acme-challenge.' + args['domain'] + '.'
cer_force, cer_test = False, False
if args['force']:
cer_force = True
if args['test']:
cer_test = True
cer = ACMEcert(zone=args['domain'], acme_path=args['acmepath'], force=cer_force, test=cer_test)
dns = None
if 'dns.he.net' in args['server']:
dns = HEupdate(ddnskey=args['keydata'])
dns = NSupdate(zone=args['domain'], server=args['server'], keyname=args['keyname'], keydata=args['keydata'])
attempts_pass, attempts_wait = 0, 60
while attempts_pass < 2:
print('Certificate issue request')
acme_result = cer.get()
if acme_result is None:
print('Skip renewal')
elif type(acme_result) is str:
print('You cert in:', acme_result + '.zip')
make_archive(acme_result, 'zip', acme_result)
elif type(acme_result) is list:
print('Start DNS-01 challenge')
for record_value in acme_result:
dns.rec_add(rec_fqdn, record_value)
attempts_dns_left, attempts_dns_pass = 1, 0
while attempts_dns_left > 0 and attempts_dns_pass < 5:
seconds = attempts_dns_pass * attempts_wait
print('Pause', seconds, 'seconds')
dns_result = resolver.resolve(qname=rec_fqdn, rdtype='TXT')
except resolver.NXDOMAIN:
print('Not found', rec_fqdn)
attempts_dns_left = attempts_dns_left + 1
for record_value_from_acme in acme_result:
print('Check', rec_fqdn, 'IN TXT', record_value_from_acme)
found = False
for record_value_from_dns in dns_result:
if record_value_from_acme == str(record_value_from_dns).replace('"', ""):
print('Found', rec_fqdn, 'IN TXT', record_value_from_acme)
found = True
if not found:
attempts_dns_left = attempts_dns_left + 1
attempts_dns_pass = attempts_dns_pass + 1
attempts_pass = attempts_pass + 1