247 lines
10 KiB
Python
247 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
|
|
|
|
from os import path
|
|
from shutil import make_archive
|
|
from subprocess import Popen, PIPE, STDOUT
|
|
from time import sleep
|
|
from typing import Union
|
|
import urllib.request
|
|
try:
|
|
from dns import resolver, rdatatype, rdataclass, rdata, update, query, tsigkeyring, tsig, name
|
|
except ModuleNotFoundError:
|
|
print('You need to install the dnspython package using:\n'
|
|
'pip install dnspython\n'
|
|
'Then restart the program')
|
|
exit(1)
|
|
|
|
|
|
class NSupdate:
|
|
"""
|
|
Dynamic DNS update from Python
|
|
"""
|
|
def __init__(self, server: str, zone: str,
|
|
keyname: str, keydata: str, keyalgo: name.Name = tsig.HMAC_SHA512) -> None:
|
|
"""
|
|
Object constructor
|
|
:param server: string with master server containing the domain zone
|
|
:param zone: string with name of zone
|
|
:param keyname: string with name of the key to update the zone
|
|
:param keydata: string with content of the key to update the zone
|
|
:param keyalgo: dns.tsig method with algorithm for key
|
|
"""
|
|
self._server = server
|
|
self._domain = zone
|
|
self._keyring = tsigkeyring.from_text({keyname: keydata})
|
|
self._keyalgo = keyalgo
|
|
|
|
def rec_add(self, rec_key: str, rec_val: str, rec_ttl: int = 60,
|
|
rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT) -> None:
|
|
"""
|
|
Add record to DNS zone
|
|
:param rec_key: string with record name
|
|
:param rec_val: string with record value
|
|
:param rec_ttl: integer with record ttl
|
|
:param rec_cls: rdataclass of record
|
|
:param rec_typ: rdatatype of record
|
|
:return: None, QueryMessage, UpdateMessage, Message after request
|
|
"""
|
|
rec_val = rdata.from_text(rec_cls, rec_typ, rec_val)
|
|
rec_upd = update.Update(zone=self._domain, keyring=self._keyring, keyalgorithm=self._keyalgo)
|
|
rec_upd.add(rec_key, rec_ttl, rec_val)
|
|
return query.tcp(q=rec_upd, where=self._server)
|
|
|
|
def rec_del(self, rec_key: str, rec_val: str,
|
|
rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT) -> None:
|
|
"""
|
|
Delete record from DNS zone
|
|
:param rec_key: string with record name
|
|
:param rec_val: string with record value
|
|
:param rec_cls: rdataclass of record
|
|
:param rec_typ: rdatatype of record
|
|
:return: None, QueryMessage, UpdateMessage, Message after request
|
|
"""
|
|
rec_val = rdata.from_text(rec_cls, rec_typ, rec_val)
|
|
rec_upd = update.Update(zone=self._domain, keyring=self._keyring, keyalgorithm=self._keyalgo)
|
|
rec_upd.delete(rec_key, rec_val)
|
|
return query.tcp(q=rec_upd, where=self._server)
|
|
|
|
|
|
class HEupdate:
|
|
"""
|
|
Dynamic DNS update on dyn.dns.he.net from Python
|
|
"""
|
|
def __init__(self, ddnskey: str, server: str = 'https://dyn.dns.he.net/nic/update') -> None:
|
|
"""
|
|
Object constructor
|
|
:param ddnskey: string with content of the key (password) to update the record
|
|
:param server: string with server for dynamic update request
|
|
"""
|
|
self._server = server
|
|
self._ddnskey = ddnskey
|
|
|
|
def rec_add(self, rec_key: str, rec_val: str) -> None:
|
|
"""
|
|
Change record data from DNS zone
|
|
:param rec_key: string with record fqdn
|
|
:param rec_val: string with record value
|
|
:return: None, Message after request
|
|
"""
|
|
if rec_key[-1] == '.':
|
|
rec_key = rec_key[:-1]
|
|
self._data = 'hostname=' + rec_key + '&' + 'password=' + self._ddnskey + '&' + 'txt=' + rec_val
|
|
request = urllib.request.Request(url=self._server, data=bytes(self._data.encode('utf-8')), method='POST')
|
|
response = urllib.request.urlopen(request).read()
|
|
if response.startswith(b'\xff\xd8'):
|
|
return response
|
|
else:
|
|
return str(response.decode('utf-8'))
|
|
|
|
|
|
class ACMEcert:
|
|
"""
|
|
ACME launcher for DNS-01 challenge from Python
|
|
"""
|
|
def __init__(self, zone: str, acme_path: str = None, test: bool = False, force: bool = False) -> None:
|
|
"""
|
|
Object constructor
|
|
:param zone: string with name of zone
|
|
:param acme_path: alternative path to bin (example: ~/.acme.sh/acme.sh)
|
|
:param test: boolean "test" argument for the acme.sh
|
|
:param force: boolean "force" argument for the acme.sh
|
|
"""
|
|
self._domain = zone
|
|
if not acme_path:
|
|
self._acme_path = path.expanduser("~") + path.sep + '.acme.sh' + path.sep + 'acme.sh'
|
|
if not(path.exists(self._acme_path)):
|
|
print('You need to specify the path to the acme.sh\n'
|
|
'or perform the default installation using:\n'
|
|
'curl https://get.acme.sh | sh\n'
|
|
'Then restart the program')
|
|
exit(1)
|
|
self._acme_test = test
|
|
self._acme_force = force
|
|
self._acme_mode = '--issue'
|
|
|
|
@classmethod
|
|
def _run(cls, proc_args: list) -> list:
|
|
"""
|
|
Launch acme.sh
|
|
:param proc_args: list of arguments for acme.sh
|
|
:return: list of strings with raw stdout from acme.sh
|
|
"""
|
|
with Popen(proc_args, stdout=PIPE, stderr=STDOUT) as proc:
|
|
result = []
|
|
for line in proc.stdout:
|
|
result.append(line)
|
|
return result
|
|
|
|
def get(self) -> Union[None, str, list]:
|
|
"""
|
|
Get certificate or other result after launch acme.sh
|
|
:return: None if skip, string with path if issued, list of records if need update dns
|
|
"""
|
|
acme_args = [self._acme_path, self._acme_mode,
|
|
'-d', self._domain, '-d', '*.' + self._domain,
|
|
'--dns', '--yes-I-know-dns-manual-mode-enough-go-ahead-please'
|
|
]
|
|
if self._acme_test:
|
|
acme_args.append('--test')
|
|
if self._acme_force:
|
|
acme_args.append('--force')
|
|
acme_skip, acme_sign, acme_auth = b'Skip, Next renewal time is:', b'Your cert is in', b'TXT value:'
|
|
result = []
|
|
for line in self._run(acme_args):
|
|
if acme_skip in line:
|
|
return None
|
|
if acme_sign in line:
|
|
return path.dirname(line.split(acme_sign)[1].decode('utf-8').strip().replace("'", ""))
|
|
if acme_auth in line:
|
|
result.append(line.split(acme_auth)[1].decode('utf-8').strip().replace("'", ""))
|
|
self._acme_mode = '--renew'
|
|
return result
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from argparse import ArgumentParser
|
|
|
|
args = ArgumentParser(
|
|
prog='SimpleWC',
|
|
description='Update Let\'s Encrypt wildcard certificate with DNS-01 challenge',
|
|
epilog='Dependencies: '
|
|
'Python 3 (tested version 3.9.5), '
|
|
'installed or downloaded acme.sh, '
|
|
'installed dnspython package, '
|
|
'dns is supported to dynamic update'
|
|
)
|
|
args.add_argument('--domain', type=str, required=True,
|
|
help='domain for which the wildcard certificate is issued')
|
|
args.add_argument('--server', type=str, required=True,
|
|
help='master server containing the domain zone')
|
|
args.add_argument('--keyname', type=str, required=True,
|
|
help='name of the key to update the zone')
|
|
args.add_argument('--keydata', type=str, required=True,
|
|
help='content of the key to update the zone')
|
|
args.add_argument('--acmepath', type=str, required=False, default=None,
|
|
help='alternative path to bin (example: ~/.acme.sh/acme.sh)')
|
|
args.add_argument('--force', action='store_true', required=False,
|
|
help='"force" argument for the acme.sh')
|
|
args.add_argument('--test', action='store_true', required=False,
|
|
help='"test" argument for the acme.sh')
|
|
args = vars(args.parse_args())
|
|
|
|
rec_fqdn = '_acme-challenge.' + args['domain'] + '.'
|
|
cer_force, cer_test = False, False
|
|
if args['force']:
|
|
cer_force = True
|
|
if args['test']:
|
|
cer_test = True
|
|
|
|
cer = ACMEcert(zone=args['domain'], acme_path=args['acmepath'], force=cer_force, test=cer_test)
|
|
dns = None
|
|
if 'dns.he.net' in args['server']:
|
|
dns = HEupdate(ddnskey=args['keydata'])
|
|
else:
|
|
dns = NSupdate(zone=args['domain'], server=args['server'], keyname=args['keyname'], keydata=args['keydata'])
|
|
|
|
attempts_pass, attempts_wait = 0, 60
|
|
while attempts_pass < 2:
|
|
print('Certificate issue request')
|
|
acme_result = cer.get()
|
|
if acme_result is None:
|
|
print('Skip renewal')
|
|
break
|
|
elif type(acme_result) is str:
|
|
print('You cert in:', acme_result + '.zip')
|
|
make_archive(acme_result, 'zip', acme_result)
|
|
break
|
|
elif type(acme_result) is list:
|
|
print('Start DNS-01 challenge')
|
|
for record_value in acme_result:
|
|
dns.rec_add(rec_fqdn, record_value)
|
|
|
|
attempts_dns_left, attempts_dns_pass = 1, 0
|
|
while attempts_dns_left > 0 and attempts_dns_pass < 5:
|
|
seconds = attempts_dns_pass * attempts_wait
|
|
print('Pause', seconds, 'seconds')
|
|
sleep(seconds)
|
|
try:
|
|
dns_result = resolver.resolve(qname=rec_fqdn, rdtype='TXT')
|
|
except resolver.NXDOMAIN:
|
|
print('Not found', rec_fqdn)
|
|
attempts_dns_left = attempts_dns_left + 1
|
|
else:
|
|
for record_value_from_acme in acme_result:
|
|
print('Check', rec_fqdn, 'IN TXT', record_value_from_acme)
|
|
found = False
|
|
for record_value_from_dns in dns_result:
|
|
if record_value_from_acme == str(record_value_from_dns).replace('"', ""):
|
|
print('Found', rec_fqdn, 'IN TXT', record_value_from_acme)
|
|
found = True
|
|
break
|
|
if not found:
|
|
attempts_dns_left = attempts_dns_left + 1
|
|
break
|
|
attempts_dns_pass = attempts_dns_pass + 1
|
|
attempts_pass = attempts_pass + 1
|