From 83e71ba1402e0e0508c5d397ae521238ba058d72 Mon Sep 17 00:00:00 2001 From: PavelMuhortov Date: Sat, 10 Jul 2021 11:06:54 +0300 Subject: [PATCH] add simplewc.py --- README.md | 34 ++++++++- simplewc.py | 216 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 249 insertions(+), 1 deletion(-) create mode 100644 simplewc.py diff --git a/README.md b/README.md index c861a49..8f89042 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ Small tools needed to solve immediate tasks independently or as part of a projec * [`ffmpeger`.py](https://git.hmp.today/pavel.muhortov/utils#ffmpeger-py) * [`procutil`.py](https://git.hmp.today/pavel.muhortov/utils#procutil-py) * [`sendmail`.py](https://git.hmp.today/pavel.muhortov/utils#sendmail-py) +* [`simplewc`.py](https://git.hmp.today/pavel.muhortov/utils#simplewc-py) ____ ## `camsutil` **Description:** Creation of a request to the camera API based on the prepared template @@ -123,7 +124,7 @@ if path.exists(conf): ____ ## `ffmpeger`.py **Description:** FFmpeg management from Python -**Dependencies:** Python 3 (tested version 3.9.5), installed or downloaded ffmpeg, [procutil.py](https://git.hmp.today/pavel.muhortov/utils#procutil-py) in the same directory +**Dependencies:** Python 3 (tested version 3.9.5), installed or downloaded [ffmpeg](https://ffmpeg.org/download.html), [procutil.py](https://git.hmp.today/pavel.muhortov/utils#procutil-py) in the same directory | PARAMETERS | DESCRIPTION | DEFAULT| |-------------|-------------|--------| @@ -139,6 +140,7 @@ ____ Example usage in cron with Python: ```shell +# at every minute * * * * * /usr/bin/python3 ~/ffmpeger.py -s rtsp://user:pass@host:554/Streaming/Channels/101 --dst rtmp://a.rtmp.youtube.com/live2/YOUKEY --mono --watchdog --sec 30 >> /dev/null 2>&1 ``` Example usage in terminal with make the script executable: @@ -224,3 +226,33 @@ print(log) ``` ____ +## `simplewc`.py +**Description:** Update Let's Encrypt wildcard certificate with DNS-01 challenge +**Dependencies:** Python 3 (tested version 3.9.5), +installed or downloaded [acme.sh](https://github.com/Neilpang/acme.sh), +installed [dnspython](https://github.com/rthalley/dnspython) package, +dns is supported to [dynamic update](https://en.wikipedia.org/wiki/Dynamic_DNS) + +| PARAMETERS | DESCRIPTION | DEFAULT| +|-------------|-------------|--------| +|**--domain**|domain for which the wildcard certificate is issued|**REQUIRED**| +|**--server**|master server containing the domain zone|**REQUIRED**| +|**--keyname**|name of the key to update the zone|**REQUIRED**| +|**--keydata**|content of the key to update the zone|**REQUIRED**| +|**[-h]**|print help and exit|| +|**[--acmepath]**|alternative path to bin (example: ~/.acme.sh/acme.sh)|`None`| +|**[--force]**|"force" argument for the acme.sh|`False`| +|**[--test]**|"test" argument for the acme.sh|`False`| + +Example usage in cron with Python: +```shell +# at 00:00 on Monday +0 0 * * 1 /usr/bin/python3 ~/simplewc.py --domain EXAMPLE.COM --server 8.8.8.8 --keyname KEY --keydata YOU_KEY_CONTENT > /dev/null +``` +Example usage in terminal with make the script executable: +```shell +chmod u+x ./simplewc.py +./simplewc.py --domain EXAMPLE.COM --server 8.8.8.8 --keyname KEY --keydata YOU_KEY_CONTENT --test --force +``` + +____ diff --git a/simplewc.py b/simplewc.py new file mode 100644 index 0000000..1f94354 --- /dev/null +++ b/simplewc.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 + + +from os import path +from shutil import make_archive +from subprocess import Popen, PIPE, STDOUT +from time import sleep +from typing import Union +try: + from dns import resolver, rdatatype, rdataclass, rdata, update, query, tsigkeyring, tsig, name +except ModuleNotFoundError: + print('You need to install the dnspython package using:\n' + 'pip install dnspython\n' + 'Then restart the program') + exit(1) +try: + from sendmail import Mail, str2bool +except ModuleNotFoundError: + print('You need sendmail.py in the same directory.\n' + 'Then restart the program') + exit(1) + + +class NSupdate: + """ + Dynamic DNS update from Python + """ + def __init__(self, server: str, zone: str, + keyname: str, keydata: str, keyalgo: name.Name = tsig.HMAC_SHA512) -> None: + """ + Object constructor + :param server: string with master server containing the domain zone + :param zone: string with name of zone + :param keyname: string with name of the key to update the zone + :param keydata: string with content of the key to update the zone + :param keyalgo: dns.tsig method with algorithm for key + """ + self._server = server + self._domain = zone + self._keyring = tsigkeyring.from_text({keyname: keydata}) + self._keyalgo = keyalgo + + def rec_add(self, rec_key: str, rec_val: str, rec_ttl: int = 60, + rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT) -> None: + """ + Add record to DNS zone + :param rec_key: string with record name + :param rec_val: string with record value + :param rec_ttl: integer with record ttl + :param rec_cls: rdataclass of record + :param rec_typ: rdatatype of record + :return: None, QueryMessage, UpdateMessage, Message after request + """ + rec_val = rdata.from_text(rec_cls, rec_typ, rec_val) + rec_upd = update.Update(zone=self._domain, keyring=self._keyring, keyalgorithm=self._keyalgo) + rec_upd.add(rec_key, rec_ttl, rec_val) + return query.tcp(q=rec_upd, where=self._server) + + def rec_del(self, rec_key: str, rec_val: str, + rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT) -> None: + """ + Delete record from DNS zone + :param rec_key: string with record name + :param rec_val: string with record value + :param rec_cls: rdataclass of record + :param rec_typ: rdatatype of record + :return: None, QueryMessage, UpdateMessage, Message after request + """ + rec_val = rdata.from_text(rec_cls, rec_typ, rec_val) + rec_upd = update.Update(zone=self._domain, keyring=self._keyring, keyalgorithm=self._keyalgo) + rec_upd.delete(rec_key, rec_val) + return query.tcp(q=rec_upd, where=self._server) + + +class ACMEcert: + """ + ACME launcher for DNS-01 challenge from Python + """ + def __init__(self, zone: str, acme_path: str = None, test: bool = False, force: bool = False) -> None: + """ + Object constructor + :param zone: string with name of zone + :param acme_path: alternative path to bin (example: ~/.acme.sh/acme.sh) + :param test: boolean "test" argument for the acme.sh + :param force: boolean "force" argument for the acme.sh + """ + self._domain = zone + if not acme_path: + self._acme_path = path.expanduser("~") + path.sep + '.acme.sh' + path.sep + 'acme.sh' + if not(path.exists(self._acme_path)): + print('You need to specify the path to the acme.sh\n' + 'or perform the default installation using:\n' + 'curl https://get.acme.sh | sh\n' + 'Then restart the program') + exit(1) + self._acme_test = test + self._acme_force = force + self._acme_mode = '--issue' + + @classmethod + def _run(cls, proc_args: list) -> list: + """ + Launch acme.sh + :param proc_args: list of arguments for acme.sh + :return: list of strings with raw stdout from acme.sh + """ + with Popen(proc_args, stdout=PIPE, stderr=STDOUT) as proc: + result = [] + for line in proc.stdout: + result.append(line) + return result + + def get(self) -> Union[None, str, list]: + """ + Get certificate or other result after launch acme.sh + :return: None if skip, string with path if issued, list of records if need update dns + """ + acme_args = [self._acme_path, self._acme_mode, + '-d', self._domain, '-d', '*.' + self._domain, + '--dns', '--yes-I-know-dns-manual-mode-enough-go-ahead-please' + ] + if self._acme_test: + acme_args.append('--test') + if self._acme_force: + acme_args.append('--force') + acme_skip, acme_sign, acme_auth = b'Skip, Next renewal time is:', b'Your cert is in', b'TXT value:' + result = [] + for line in self._run(acme_args): + if acme_skip in line: + return None + if acme_sign in line: + return path.dirname(line.split(acme_sign)[1].decode('utf-8').strip().replace("'", "")) + if acme_auth in line: + result.append(line.split(acme_auth)[1].decode('utf-8').strip().replace("'", "")) + self._acme_mode = '--renew' + return result + + +if __name__ == "__main__": + from argparse import ArgumentParser + + args = ArgumentParser( + prog='SimpleWC', + description='Update Let\'s Encrypt wildcard certificate with DNS-01 challenge', + epilog='Dependencies: ' + 'Python 3 (tested version 3.9.5), ' + 'installed or downloaded acme.sh, ' + 'installed dnspython package, ' + 'dns is supported to dynamic update' + ) + args.add_argument('--domain', type=str, required=True, + help='domain for which the wildcard certificate is issued') + args.add_argument('--server', type=str, required=True, + help='master server containing the domain zone') + args.add_argument('--keyname', type=str, required=True, + help='name of the key to update the zone') + args.add_argument('--keydata', type=str, required=True, + help='content of the key to update the zone') + args.add_argument('--acmepath', type=str, required=False, default=None, + help='alternative path to bin (example: ~/.acme.sh/acme.sh)') + args.add_argument('--force', action='store_true', required=False, + help='"force" argument for the acme.sh') + args.add_argument('--test', action='store_true', required=False, + help='"test" argument for the acme.sh') + args = vars(args.parse_args()) + + rec_fqdn = '_acme-challenge.' + args['domain'] + '.' + cer_force, cer_test = False, False + if args['force']: + cer_force = True + if args['test']: + cer_test = True + + cer = ACMEcert(zone=args['domain'], acme_path=args['acmepath'], force=cer_force, test=cer_test) + dns = NSupdate(zone=args['domain'], server=args['server'], keyname=args['keyname'], keydata=args['keydata']) + + attempts_pass, attempts_wait = 0, 60 + while attempts_pass < 2: + print('Certificate issue request') + acme_result = cer.get() + if acme_result is None: + print('Skip renewal') + break + elif type(acme_result) is str: + print('You cert in:', acme_result + '.zip') + make_archive(acme_result, 'zip', acme_result) + break + elif type(acme_result) is list: + print('Start DNS-01 challenge') + for record_value in acme_result: + dns.rec_add(rec_fqdn, record_value) + + attempts_dns_left, attempts_dns_pass = 1, 0 + while attempts_dns_left > 0 and attempts_dns_pass < 5: + seconds = attempts_dns_pass * attempts_wait + print('Pause', seconds, 'seconds') + sleep(seconds) + try: + dns_result = resolver.resolve(qname=rec_fqdn, rdtype='TXT') + except resolver.NXDOMAIN: + print('Not found', rec_fqdn) + attempts_dns_left = attempts_dns_left + 1 + else: + for record_value_from_acme in acme_result: + print('Check', rec_fqdn, 'IN TXT', record_value_from_acme) + found = False + for record_value_from_dns in dns_result: + if record_value_from_acme == str(record_value_from_dns).replace('"', ""): + print('Found', rec_fqdn, 'IN TXT', record_value_from_acme) + found = True + break + if not found: + attempts_dns_left = attempts_dns_left + 1 + break + attempts_dns_pass = attempts_dns_pass + 1 + attempts_pass = attempts_pass + 1