add simplewc.py
This commit is contained in:
parent
252dae4b12
commit
83e71ba140
34
README.md
34
README.md
|
@ -7,6 +7,7 @@ Small tools needed to solve immediate tasks independently or as part of a projec
|
||||||
* [`ffmpeger`.py](https://git.hmp.today/pavel.muhortov/utils#ffmpeger-py)
|
* [`ffmpeger`.py](https://git.hmp.today/pavel.muhortov/utils#ffmpeger-py)
|
||||||
* [`procutil`.py](https://git.hmp.today/pavel.muhortov/utils#procutil-py)
|
* [`procutil`.py](https://git.hmp.today/pavel.muhortov/utils#procutil-py)
|
||||||
* [`sendmail`.py](https://git.hmp.today/pavel.muhortov/utils#sendmail-py)
|
* [`sendmail`.py](https://git.hmp.today/pavel.muhortov/utils#sendmail-py)
|
||||||
|
* [`simplewc`.py](https://git.hmp.today/pavel.muhortov/utils#simplewc-py)
|
||||||
____
|
____
|
||||||
## `camsutil`
|
## `camsutil`
|
||||||
**Description:** Creation of a request to the camera API based on the prepared template
|
**Description:** Creation of a request to the camera API based on the prepared template
|
||||||
|
@ -123,7 +124,7 @@ if path.exists(conf):
|
||||||
____
|
____
|
||||||
## `ffmpeger`.py
|
## `ffmpeger`.py
|
||||||
**Description:** FFmpeg management from Python
|
**Description:** FFmpeg management from Python
|
||||||
**Dependencies:** Python 3 (tested version 3.9.5), installed or downloaded ffmpeg, [procutil.py](https://git.hmp.today/pavel.muhortov/utils#procutil-py) in the same directory
|
**Dependencies:** Python 3 (tested version 3.9.5), installed or downloaded [ffmpeg](https://ffmpeg.org/download.html), [procutil.py](https://git.hmp.today/pavel.muhortov/utils#procutil-py) in the same directory
|
||||||
|
|
||||||
| PARAMETERS | DESCRIPTION | DEFAULT|
|
| PARAMETERS | DESCRIPTION | DEFAULT|
|
||||||
|-------------|-------------|--------|
|
|-------------|-------------|--------|
|
||||||
|
@ -139,6 +140,7 @@ ____
|
||||||
|
|
||||||
Example usage in cron with Python:
|
Example usage in cron with Python:
|
||||||
```shell
|
```shell
|
||||||
|
# at every minute
|
||||||
* * * * * /usr/bin/python3 ~/ffmpeger.py -s rtsp://user:pass@host:554/Streaming/Channels/101 --dst rtmp://a.rtmp.youtube.com/live2/YOUKEY --mono --watchdog --sec 30 >> /dev/null 2>&1
|
* * * * * /usr/bin/python3 ~/ffmpeger.py -s rtsp://user:pass@host:554/Streaming/Channels/101 --dst rtmp://a.rtmp.youtube.com/live2/YOUKEY --mono --watchdog --sec 30 >> /dev/null 2>&1
|
||||||
```
|
```
|
||||||
Example usage in terminal with make the script executable:
|
Example usage in terminal with make the script executable:
|
||||||
|
@ -224,3 +226,33 @@ print(log)
|
||||||
```
|
```
|
||||||
|
|
||||||
____
|
____
|
||||||
|
## `simplewc`.py
|
||||||
|
**Description:** Update Let's Encrypt wildcard certificate with DNS-01 challenge
|
||||||
|
**Dependencies:** Python 3 (tested version 3.9.5),
|
||||||
|
installed or downloaded [acme.sh](https://github.com/Neilpang/acme.sh),
|
||||||
|
installed [dnspython](https://github.com/rthalley/dnspython) package,
|
||||||
|
dns is supported to [dynamic update](https://en.wikipedia.org/wiki/Dynamic_DNS)
|
||||||
|
|
||||||
|
| PARAMETERS | DESCRIPTION | DEFAULT|
|
||||||
|
|-------------|-------------|--------|
|
||||||
|
|**--domain**|domain for which the wildcard certificate is issued|**REQUIRED**|
|
||||||
|
|**--server**|master server containing the domain zone|**REQUIRED**|
|
||||||
|
|**--keyname**|name of the key to update the zone|**REQUIRED**|
|
||||||
|
|**--keydata**|content of the key to update the zone|**REQUIRED**|
|
||||||
|
|**[-h]**|print help and exit||
|
||||||
|
|**[--acmepath]**|alternative path to bin (example: ~/.acme.sh/acme.sh)|`None`|
|
||||||
|
|**[--force]**|"force" argument for the acme.sh|`False`|
|
||||||
|
|**[--test]**|"test" argument for the acme.sh|`False`|
|
||||||
|
|
||||||
|
Example usage in cron with Python:
|
||||||
|
```shell
|
||||||
|
# at 00:00 on Monday
|
||||||
|
0 0 * * 1 /usr/bin/python3 ~/simplewc.py --domain EXAMPLE.COM --server 8.8.8.8 --keyname KEY --keydata YOU_KEY_CONTENT > /dev/null
|
||||||
|
```
|
||||||
|
Example usage in terminal with make the script executable:
|
||||||
|
```shell
|
||||||
|
chmod u+x ./simplewc.py
|
||||||
|
./simplewc.py --domain EXAMPLE.COM --server 8.8.8.8 --keyname KEY --keydata YOU_KEY_CONTENT --test --force
|
||||||
|
```
|
||||||
|
|
||||||
|
____
|
||||||
|
|
216
simplewc.py
Normal file
216
simplewc.py
Normal file
|
@ -0,0 +1,216 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
|
||||||
|
from os import path
|
||||||
|
from shutil import make_archive
|
||||||
|
from subprocess import Popen, PIPE, STDOUT
|
||||||
|
from time import sleep
|
||||||
|
from typing import Union
|
||||||
|
try:
|
||||||
|
from dns import resolver, rdatatype, rdataclass, rdata, update, query, tsigkeyring, tsig, name
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
print('You need to install the dnspython package using:\n'
|
||||||
|
'pip install dnspython\n'
|
||||||
|
'Then restart the program')
|
||||||
|
exit(1)
|
||||||
|
try:
|
||||||
|
from sendmail import Mail, str2bool
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
print('You need sendmail.py in the same directory.\n'
|
||||||
|
'Then restart the program')
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
class NSupdate:
|
||||||
|
"""
|
||||||
|
Dynamic DNS update from Python
|
||||||
|
"""
|
||||||
|
def __init__(self, server: str, zone: str,
|
||||||
|
keyname: str, keydata: str, keyalgo: name.Name = tsig.HMAC_SHA512) -> None:
|
||||||
|
"""
|
||||||
|
Object constructor
|
||||||
|
:param server: string with master server containing the domain zone
|
||||||
|
:param zone: string with name of zone
|
||||||
|
:param keyname: string with name of the key to update the zone
|
||||||
|
:param keydata: string with content of the key to update the zone
|
||||||
|
:param keyalgo: dns.tsig method with algorithm for key
|
||||||
|
"""
|
||||||
|
self._server = server
|
||||||
|
self._domain = zone
|
||||||
|
self._keyring = tsigkeyring.from_text({keyname: keydata})
|
||||||
|
self._keyalgo = keyalgo
|
||||||
|
|
||||||
|
def rec_add(self, rec_key: str, rec_val: str, rec_ttl: int = 60,
|
||||||
|
rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT) -> None:
|
||||||
|
"""
|
||||||
|
Add record to DNS zone
|
||||||
|
:param rec_key: string with record name
|
||||||
|
:param rec_val: string with record value
|
||||||
|
:param rec_ttl: integer with record ttl
|
||||||
|
:param rec_cls: rdataclass of record
|
||||||
|
:param rec_typ: rdatatype of record
|
||||||
|
:return: None, QueryMessage, UpdateMessage, Message after request
|
||||||
|
"""
|
||||||
|
rec_val = rdata.from_text(rec_cls, rec_typ, rec_val)
|
||||||
|
rec_upd = update.Update(zone=self._domain, keyring=self._keyring, keyalgorithm=self._keyalgo)
|
||||||
|
rec_upd.add(rec_key, rec_ttl, rec_val)
|
||||||
|
return query.tcp(q=rec_upd, where=self._server)
|
||||||
|
|
||||||
|
def rec_del(self, rec_key: str, rec_val: str,
|
||||||
|
rec_cls: rdataclass.RdataClass = rdataclass.IN, rec_typ: rdatatype.RdataType = rdatatype.TXT) -> None:
|
||||||
|
"""
|
||||||
|
Delete record from DNS zone
|
||||||
|
:param rec_key: string with record name
|
||||||
|
:param rec_val: string with record value
|
||||||
|
:param rec_cls: rdataclass of record
|
||||||
|
:param rec_typ: rdatatype of record
|
||||||
|
:return: None, QueryMessage, UpdateMessage, Message after request
|
||||||
|
"""
|
||||||
|
rec_val = rdata.from_text(rec_cls, rec_typ, rec_val)
|
||||||
|
rec_upd = update.Update(zone=self._domain, keyring=self._keyring, keyalgorithm=self._keyalgo)
|
||||||
|
rec_upd.delete(rec_key, rec_val)
|
||||||
|
return query.tcp(q=rec_upd, where=self._server)
|
||||||
|
|
||||||
|
|
||||||
|
class ACMEcert:
|
||||||
|
"""
|
||||||
|
ACME launcher for DNS-01 challenge from Python
|
||||||
|
"""
|
||||||
|
def __init__(self, zone: str, acme_path: str = None, test: bool = False, force: bool = False) -> None:
|
||||||
|
"""
|
||||||
|
Object constructor
|
||||||
|
:param zone: string with name of zone
|
||||||
|
:param acme_path: alternative path to bin (example: ~/.acme.sh/acme.sh)
|
||||||
|
:param test: boolean "test" argument for the acme.sh
|
||||||
|
:param force: boolean "force" argument for the acme.sh
|
||||||
|
"""
|
||||||
|
self._domain = zone
|
||||||
|
if not acme_path:
|
||||||
|
self._acme_path = path.expanduser("~") + path.sep + '.acme.sh' + path.sep + 'acme.sh'
|
||||||
|
if not(path.exists(self._acme_path)):
|
||||||
|
print('You need to specify the path to the acme.sh\n'
|
||||||
|
'or perform the default installation using:\n'
|
||||||
|
'curl https://get.acme.sh | sh\n'
|
||||||
|
'Then restart the program')
|
||||||
|
exit(1)
|
||||||
|
self._acme_test = test
|
||||||
|
self._acme_force = force
|
||||||
|
self._acme_mode = '--issue'
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _run(cls, proc_args: list) -> list:
|
||||||
|
"""
|
||||||
|
Launch acme.sh
|
||||||
|
:param proc_args: list of arguments for acme.sh
|
||||||
|
:return: list of strings with raw stdout from acme.sh
|
||||||
|
"""
|
||||||
|
with Popen(proc_args, stdout=PIPE, stderr=STDOUT) as proc:
|
||||||
|
result = []
|
||||||
|
for line in proc.stdout:
|
||||||
|
result.append(line)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def get(self) -> Union[None, str, list]:
|
||||||
|
"""
|
||||||
|
Get certificate or other result after launch acme.sh
|
||||||
|
:return: None if skip, string with path if issued, list of records if need update dns
|
||||||
|
"""
|
||||||
|
acme_args = [self._acme_path, self._acme_mode,
|
||||||
|
'-d', self._domain, '-d', '*.' + self._domain,
|
||||||
|
'--dns', '--yes-I-know-dns-manual-mode-enough-go-ahead-please'
|
||||||
|
]
|
||||||
|
if self._acme_test:
|
||||||
|
acme_args.append('--test')
|
||||||
|
if self._acme_force:
|
||||||
|
acme_args.append('--force')
|
||||||
|
acme_skip, acme_sign, acme_auth = b'Skip, Next renewal time is:', b'Your cert is in', b'TXT value:'
|
||||||
|
result = []
|
||||||
|
for line in self._run(acme_args):
|
||||||
|
if acme_skip in line:
|
||||||
|
return None
|
||||||
|
if acme_sign in line:
|
||||||
|
return path.dirname(line.split(acme_sign)[1].decode('utf-8').strip().replace("'", ""))
|
||||||
|
if acme_auth in line:
|
||||||
|
result.append(line.split(acme_auth)[1].decode('utf-8').strip().replace("'", ""))
|
||||||
|
self._acme_mode = '--renew'
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
from argparse import ArgumentParser
|
||||||
|
|
||||||
|
args = ArgumentParser(
|
||||||
|
prog='SimpleWC',
|
||||||
|
description='Update Let\'s Encrypt wildcard certificate with DNS-01 challenge',
|
||||||
|
epilog='Dependencies: '
|
||||||
|
'Python 3 (tested version 3.9.5), '
|
||||||
|
'installed or downloaded acme.sh, '
|
||||||
|
'installed dnspython package, '
|
||||||
|
'dns is supported to dynamic update'
|
||||||
|
)
|
||||||
|
args.add_argument('--domain', type=str, required=True,
|
||||||
|
help='domain for which the wildcard certificate is issued')
|
||||||
|
args.add_argument('--server', type=str, required=True,
|
||||||
|
help='master server containing the domain zone')
|
||||||
|
args.add_argument('--keyname', type=str, required=True,
|
||||||
|
help='name of the key to update the zone')
|
||||||
|
args.add_argument('--keydata', type=str, required=True,
|
||||||
|
help='content of the key to update the zone')
|
||||||
|
args.add_argument('--acmepath', type=str, required=False, default=None,
|
||||||
|
help='alternative path to bin (example: ~/.acme.sh/acme.sh)')
|
||||||
|
args.add_argument('--force', action='store_true', required=False,
|
||||||
|
help='"force" argument for the acme.sh')
|
||||||
|
args.add_argument('--test', action='store_true', required=False,
|
||||||
|
help='"test" argument for the acme.sh')
|
||||||
|
args = vars(args.parse_args())
|
||||||
|
|
||||||
|
rec_fqdn = '_acme-challenge.' + args['domain'] + '.'
|
||||||
|
cer_force, cer_test = False, False
|
||||||
|
if args['force']:
|
||||||
|
cer_force = True
|
||||||
|
if args['test']:
|
||||||
|
cer_test = True
|
||||||
|
|
||||||
|
cer = ACMEcert(zone=args['domain'], acme_path=args['acmepath'], force=cer_force, test=cer_test)
|
||||||
|
dns = NSupdate(zone=args['domain'], server=args['server'], keyname=args['keyname'], keydata=args['keydata'])
|
||||||
|
|
||||||
|
attempts_pass, attempts_wait = 0, 60
|
||||||
|
while attempts_pass < 2:
|
||||||
|
print('Certificate issue request')
|
||||||
|
acme_result = cer.get()
|
||||||
|
if acme_result is None:
|
||||||
|
print('Skip renewal')
|
||||||
|
break
|
||||||
|
elif type(acme_result) is str:
|
||||||
|
print('You cert in:', acme_result + '.zip')
|
||||||
|
make_archive(acme_result, 'zip', acme_result)
|
||||||
|
break
|
||||||
|
elif type(acme_result) is list:
|
||||||
|
print('Start DNS-01 challenge')
|
||||||
|
for record_value in acme_result:
|
||||||
|
dns.rec_add(rec_fqdn, record_value)
|
||||||
|
|
||||||
|
attempts_dns_left, attempts_dns_pass = 1, 0
|
||||||
|
while attempts_dns_left > 0 and attempts_dns_pass < 5:
|
||||||
|
seconds = attempts_dns_pass * attempts_wait
|
||||||
|
print('Pause', seconds, 'seconds')
|
||||||
|
sleep(seconds)
|
||||||
|
try:
|
||||||
|
dns_result = resolver.resolve(qname=rec_fqdn, rdtype='TXT')
|
||||||
|
except resolver.NXDOMAIN:
|
||||||
|
print('Not found', rec_fqdn)
|
||||||
|
attempts_dns_left = attempts_dns_left + 1
|
||||||
|
else:
|
||||||
|
for record_value_from_acme in acme_result:
|
||||||
|
print('Check', rec_fqdn, 'IN TXT', record_value_from_acme)
|
||||||
|
found = False
|
||||||
|
for record_value_from_dns in dns_result:
|
||||||
|
if record_value_from_acme == str(record_value_from_dns).replace('"', ""):
|
||||||
|
print('Found', rec_fqdn, 'IN TXT', record_value_from_acme)
|
||||||
|
found = True
|
||||||
|
break
|
||||||
|
if not found:
|
||||||
|
attempts_dns_left = attempts_dns_left + 1
|
||||||
|
break
|
||||||
|
attempts_dns_pass = attempts_dns_pass + 1
|
||||||
|
attempts_pass = attempts_pass + 1
|
Loading…
Reference in New Issue
Block a user