diff --git a/README.md b/README.md index a8dcc8b..3b678bc 100644 --- a/README.md +++ b/README.md @@ -1,42 +1,51 @@ -# template-python +# cisco-management -Template repository for projects on python +Wireguard management and monitoring utils. -* [`script.py`](https://git.hmp.today/pavel.muhortov/template-python#script-py) +* [`cisco-port-failover`.py](https://git.hmp.today/pavel.muhortov/cisco-management#cisco-port-failover-py) ____ -## `script.py` +## `cisco-port-failover`.py **Description:** -> returning current username if privileged rights are exist -> or -> returning error, if privileged rights are not exist +> Cisco ports switching if the target is unreachable. **Dependencies:** > -> * Python 3 (tested version 3.9.5) +> * privileged rights +> * [Python 3](https://www.python.org/downloads/) (tested version 3.9.5 on [Debian GNU/Linux 11](http://ftp.debian.org/debian/dists/bullseye/)) +> * [paramiko](https://www.paramiko.org/) Python 3 module (tested version 3.1.0) -| PARAMETERS | DESCRIPTION | DEFAULT| -|-------------|-------------|--------| -|**[-s,--show]**|"" - execution with pauses.
"qn" - execution without pauses.|| -|**[-c,--conf]**|path to configuration file|`./script.conf`| +| PARAMETERS | DESCRIPTION | DEFAULT | +|--------------|------------------------|---------------| +|**[--host]**|cisco host address|**REQUIRED**| +|**[--port]**|cisco ssh port|`22`| +|**[--user]**|cisco ssh username|`cisco`| +|**[--pass]**|cisco ssh password|`cisco`| +|**[--p_en]**|cisco enable mode password|the same ssh password| +|**[--check]**|ping target host|`None`| +|**[--iface]**|cisco target interface|`None`| +|**[--wait]**|delay in seconds between commands|`0.05`| +|**[--log_root]**|path to log directory|the same script path| +|**[--log_level]**|DEBUG, INFO, WARNING, ERROR, CRITICAL|`INFO`| -Example usage in terminal with Python on Linux: +Example usage with cron: -```shell -python3 ./script.py +```bash +# install dependencies +sudo pip install paramiko +# download +sudo wget https://git.hmp.today/pavel.muhortov/cisco-management/raw/branch/master/cisco-port-failover.py -O /usr/local/bin/cisco-port-failover.py +sudo chmod +x /usr/local/bin/cisco-port-failover.py ``` -Example usage in terminal with make the script executable on Linux: - -```shell -chmod u+x ./script.py -script.py -s qn -c ./script.conf +```bash +# sudo crontab -e +*/2 * * * * /usr/bin/python3 /usr/local/bin/cisco-port-failover.py --host 10.0.0.1 --user USER --pass PASS --check 10.0.1.1 --check 10.0.1.2 --check 10.0.1.3 --iface Gi1/0/1 --iface Gi1/0/2 --log_root /var/log ``` -Example usage in terminal with Python on Windows: - -```shell -python .\script.py +```bash +# watching +tail -f /var/log/cisco-port-failover.log ``` diff --git a/cisco-port-failover.py b/cisco-port-failover.py new file mode 100644 index 0000000..292fe65 --- /dev/null +++ b/cisco-port-failover.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +# pylint: disable=C0103 + +"""Cisco port failover. Cisco ports switching if the target is unreachable. +""" + +import datetime +import inspect +import logging +import re +from argparse import ArgumentParser +from os import path, sep +from subprocess import Popen, PIPE, STDOUT +from sys import modules, platform +from time import sleep +from paramiko import SSHClient, AutoAddPolicy + + +class Connect: + """Set of connection methods (functions) for various protocols. + """ + @staticmethod + # pylint: disable=W0718 + def ssh_commands( + commands: (str, list), + hostname: str, + username: str, + password: str, + p_enable: (str, type(None)) = None, + port: int = 22, + wait: float = 0.5, + logger_alias: str = inspect.stack()[0].function + ) -> str: + """Handling SSH command executing. + + Args: + commands (str, list): commands for executing. + hostname (str): remote hostname or ip address. + username (str): remote host username. + password (str): remote host password. + p_enable (str, None): enable mode password. Defaults to remote host password. + port (int, optional): remote host connection port. Defaults to 22. + wait (float): delay in seconds between commands. Defaults to 0.5. + logger_alias (str, optional): sublogger name. Defaults to function or method name. + + Returns: + str: terminal response or 'ERROR'. + """ + local_logger = logging.getLogger(logger_alias) + if Do.args_valid(locals(), Connect.ssh_commands.__annotations__): + if isinstance(commands, str): + commands = [commands] + if p_enable is None: + p_enable = password + + client = SSHClient() + client.set_missing_host_key_policy(AutoAddPolicy()) + local_logger.debug(msg='' + + '\n' + 'host: ' + hostname + ':' + str(port) + + '\n' + 'user: ' + username + + '\n' + 'pass: ' + password + + '\n' + 'p_en: ' + p_enable + + '\n' + 'commands: ' + + '\n' + '\n '.join(commands) + ) + try: + client.connect(hostname=hostname, username=username, password=password, port=port) + chan = client.invoke_shell() + data = b'' + + for command in commands: + while not chan.send_ready(): + sleep(wait) + sleep(wait) + chan.send(command + '\n') + + while not chan.recv_ready(): + sleep(wait) + sleep(wait) + resp = chan.recv(8388608) + #print(resp.decode('utf-8')) + + if command == 'enable': + chan.send(p_enable + '\n') + + data += resp + + client.close() + return data.decode('utf-8') + except Exception as error: + local_logger.debug(msg='error: ' + '\n' + str(error)) + return 'ERROR' + + +class Cisco(Connect): + """Set of management methods (functions) for Cisco devices. + """ + def __init__(self, + host: str, + user: str, + pswd: str, + p_en: (str, type(None)) = None, + port: int = 22, + wait: float = 0.05 + ) -> None: + self._host = host + self._user = user + self._pswd = pswd + self._p_en = p_en + self._port = port + self._wait = wait + if self._p_en is None: + self._p_en = self._pswd + + def interface_status(self, + name: str, + logger_alias: str = inspect.stack()[0].function + ) -> str: + """Get interface status. + + Args: + name (str): interface name, example: 'gigabitEthernet 1/0/24'. + logger_alias (str, optional): sublogger name. Defaults to function or method name. + + Raises: + ValueError: _description_ + + Returns: + str: _description_ + """ + local_logger = logging.getLogger(logger_alias) + if Do.args_valid(locals(), Cisco.interface_status.__annotations__): + commands = ['show interface ' + name + ' status'] + + response = self.ssh_commands( + commands=commands, + hostname=self._host, + username=self._user, + password=self._pswd, + p_enable=self._p_en, + port=self._port, + wait=self._wait + ) + local_logger.debug(msg="" + + "\n" + self._host + " response: " + + "\n" + response + ) + if re.search(".*disabled.*", response, ): + return 'disabled' + elif re.search(".*notconnect.*", response, ): + return 'notconnect' + elif re.search(".*connect.*", response): + return 'connect' + else: + raise ValueError("port status unknown") + + def interfaces_enable(self, + names: (str, list), + logger_alias: str = inspect.stack()[0].function + ) -> None: + """Enable defined interfaces. + + Args: + names (str, list): interface name list, example: ['Gi0/1','Gi0/2']. + logger_alias (str, optional): sublogger name. Defaults to function or method name. + + Returns: + None. + """ + local_logger = logging.getLogger(logger_alias) + if Do.args_valid(locals(), Cisco.interfaces_enable.__annotations__): + if isinstance(names, str): + names = [names] + interfaces = [] + for name in names: + interfaces.append('interface ' + name) + interfaces.append('no shutdown') + + commands = ['enable', 'configure terminal'] + interfaces + ['end','write','exit'] + + response = self.ssh_commands( + commands=commands, + hostname=self._host, + username=self._user, + password=self._pswd, + p_enable=self._p_en, + port=self._port, + wait=self._wait + ) + local_logger.debug(msg="" + + "\n" + self._host + " response: " + + "\n" + response + ) + return None + + def interfaces_disable(self, + names: (str, list), + logger_alias: str = inspect.stack()[0].function + ) -> None: + """Disable defined interfaces. + + Args: + names (str, list): interface name list, example: ['Gi0/1','Gi0/2']. + logger_alias (str, optional): sublogger name. Defaults to function or method name. + + Returns: + None. + """ + local_logger = logging.getLogger(logger_alias) + if Do.args_valid(locals(), Cisco.interfaces_disable.__annotations__): + if isinstance(names, str): + names = [names] + interfaces = [] + for name in names: + interfaces.append('interface ' + name) + interfaces.append('shutdown') + + commands = ['enable', 'configure terminal'] + interfaces + ['end','write','exit'] + + response = self.ssh_commands( + commands=commands, + hostname=self._host, + username=self._user, + password=self._pswd, + p_enable=self._p_en, + port=self._port, + wait=self._wait + ) + local_logger.debug(msg="" + + "\n" + self._host + " response: " + + "\n" + response + ) + return None + + +class Do(): + """Set of various methods (functions) for routine. + """ + @staticmethod + def args_valid(arguments: dict, annotations: dict) -> bool: + """Arguments type validating by annotations. + + Args: + arguments (dict): 'locals()' immediately after starting the function. + annotations (dict): function.name.__annotations__. + + Raises: + TypeError: type of argument is not equal type in annotation. + + Returns: + bool: True if argument types are valid. + """ + for var_name, var_type in annotations.items(): + if not var_name == 'return': + if not isinstance(arguments[var_name], var_type): + raise TypeError("" + + "type of '" + + var_name + + "' = " + + str(arguments[var_name]) + + " is not " + + str(var_type) + ) + return True + + @staticmethod + def ping(host: str) -> bool: + """Send ICMP echo request. + + Args: + host (str): ip address or hostname. + + Returns: + bool: True - host replied, False - host didn't respond + """ + if platform.startswith('linux') or platform.startswith('darwin'): + process = ['ping', host, '-c', '1'] + elif platform.startswith('win32'): + process = ['ping', host, '-n', '1', '>>', 'NUL'] + + with Popen(process, stdout=PIPE, stderr=STDOUT) as proc: + proc.communicate() + if proc.returncode == 0: + return True + return False + + +if __name__ == "__main__": + time_start = datetime.datetime.now() + + if 'argparse' in modules: + args = ArgumentParser( + prog='cisco-port-failover', + description='Cisco ports switching if the target is unreachable.', + epilog='Dependencies: ' + '- Python 3 (tested version 3.9.5), ' + '- Python 3 modules: paramiko ' + ) + args.add_argument('--host', type=str, required=True, + help='cisco host address') + args.add_argument('--port', type=int, default=22, required=False, + help='cisco ssh port') + args.add_argument('--user', type=str, default='cisco', required=False, + help='cisco ssh username') + args.add_argument('--pass', type=str, default='cisco', required=False, + help='cisco ssh password') + args.add_argument('--p_en', type=str, default=None, required=False, + help='cisco enable mode password') + args.add_argument('--check', action='append', default=[], required=False, + help='ping target host') + args.add_argument('--iface', action='append', default=[], required=False, + help='cisco target interface') + args.add_argument('--wait', type=float, default=0.05, required=False, + help='delay in seconds between commands') + args.add_argument('--log_root', type=str, default=path.dirname(path.realpath(__file__)), + required=False, + help='path to log directory') + args.add_argument('--log_level', type=str, default='INFO', required=False, + help='DEBUG, INFO, WARNING, ERROR, CRITICAL') + args = vars(args.parse_args()) + + log_root = args['log_root'] + log_level = args['log_level'] + + logging.basicConfig( + format='%(asctime)s %(levelname)s: %(name)s: %(message)s', + datefmt='%Y-%m-%d_%H.%M.%S', + handlers=[ + logging.FileHandler( + filename=log_root + sep + path.splitext(path.basename(__file__))[0] + '.log', + mode='a' + ), + logging.StreamHandler() + ], + level=log_level + ) + logging.getLogger("paramiko").setLevel(logging.WARNING) + + device = Cisco( + host=args['host'], + user=args['user'], + pswd=args['pass'], + p_en=args['p_en'], + port=args['port'], + wait=args['wait'] + ) + for address in args['check']: + if Do.ping(host=address): + break + else: + iface_states = {'old': {}, 'new': {}} + iface_active_old_index = -1 + + # get source ports states (~2sec) + for iface in args['iface']: + iface_states['old'][iface] = device.interface_status(iface) + if iface_states['old'][iface] != 'disabled': + iface_active_old_index = args['iface'].index(iface) + + # choice new active port + iface_active_new_index = iface_active_old_index + 1 + if iface_active_new_index >= len(args['iface']): + iface_active_new_index = 0 + + # set ports new states (~4sec) + device.interfaces_disable(args['iface']) + device.interfaces_enable(args['iface'][iface_active_new_index]) + + # get current ports states (~2sec) + for iface in args['iface']: + iface_states['new'][iface] = device.interface_status(iface) + + logging.info(msg=iface_states) + + time_execute = datetime.datetime.now() - time_start + logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')