#!/usr/bin/env python3
# pylint: disable=C0103

"""Cisco port failover.
Cisco ports switching if the target is unreachable.
"""

import datetime
import inspect
import logging
import re
from argparse import ArgumentParser
from os import path, sep
from subprocess import Popen, PIPE, STDOUT
from sys import modules, platform
from time import sleep
from paramiko import SSHClient, AutoAddPolicy


class Connect:
    """Set of connection methods (functions) for various protocols.
    """
    @staticmethod
    # pylint: disable=W0718
    def ssh_commands(
        commands: (str, list),
        hostname: str,
        username: str,
        password: str,
        p_enable: (str, type(None)) = None,
        port: int = 22,
        wait: float = 0.5,
        logger_alias: str = inspect.stack()[0].function
    ) -> str:
        """Handling SSH command executing.

        Opens an interactive shell on the remote host, sends the commands one
        by one and collects everything the shell prints back.

        Args:
            commands (str, list): commands for executing.
            hostname (str): remote hostname or ip address.
            username (str): remote host username.
            password (str): remote host password.
            p_enable (str, None): enable mode password.
                Defaults to remote host password.
            port (int, optional): remote host connection port. Defaults to 22.
            wait (float): delay in seconds between commands. Defaults to 0.5.
            logger_alias (str, optional): sublogger name.
                NOTE: the default is evaluated once, when the class body is
                executed, so it resolves to the enclosing class name rather
                than to the method name.

        Raises:
            TypeError: an argument type does not match its annotation.

        Returns:
            str: terminal response or 'ERROR'.
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), Connect.ssh_commands.__annotations__):
            if isinstance(commands, str):
                commands = [commands]
            if p_enable is None:
                p_enable = password

            # Credentials are masked on purpose: the debug log may end up in
            # a file, and plaintext passwords must not be stored there.
            local_logger.debug(msg=''
                + '\n' + 'host: ' + hostname + ':' + str(port)
                + '\n' + 'user: ' + username
                + '\n' + 'pass: ' + '********'
                + '\n' + 'p_en: ' + '********'
                + '\n' + 'commands: '
                + '\n' + '\n '.join(commands)
            )

            client = SSHClient()
            client.set_missing_host_key_policy(AutoAddPolicy())
            try:
                client.connect(
                    hostname=hostname,
                    username=username,
                    password=password,
                    port=port
                )
                chan = client.invoke_shell()
                chunks = []
                for command in commands:
                    while not chan.send_ready():
                        sleep(wait)
                    sleep(wait)
                    chan.send(command + '\n')
                    while not chan.recv_ready():
                        sleep(wait)
                    sleep(wait)
                    resp = chan.recv(8388608)
                    # 'enable' prompts for a password; answer it right away,
                    # the reply is picked up by the next read.
                    if command == 'enable':
                        chan.send(p_enable + '\n')
                    chunks.append(resp)
                return b''.join(chunks).decode('utf-8')
            except Exception as error:
                # Best effort by design: any failure is reported to the
                # caller as the 'ERROR' marker instead of an exception.
                local_logger.debug(msg='error: ' + '\n' + str(error))
                return 'ERROR'
            finally:
                # Release the connection on the failure path as well.
                client.close()


class Cisco(Connect):
    """Set of management methods (functions) for Cisco devices.
    """
    def __init__(self,
        host: str, user: str, pswd: str, p_en: (str, type(None)) = None,
        port: int = 22, wait: float = 0.05
    ) -> None:
        """Store the connection settings used by every device command.

        Args:
            host (str): device hostname or ip address.
            user (str): ssh username.
            pswd (str): ssh password.
            p_en (str, None): enable mode password. Defaults to ssh password.
            port (int, optional): ssh port. Defaults to 22.
            wait (float, optional): delay in seconds between commands.
                Defaults to 0.05.
        """
        self._host = host
        self._user = user
        self._pswd = pswd
        self._p_en = pswd if p_en is None else p_en
        self._port = port
        self._wait = wait

    def _run(self, commands: list, local_logger: logging.Logger) -> str:
        """Send commands to the device and write the response to debug log."""
        response = self.ssh_commands(
            commands=commands,
            hostname=self._host,
            username=self._user,
            password=self._pswd,
            p_enable=self._p_en,
            port=self._port,
            wait=self._wait
        )
        local_logger.debug(msg=""
            + "\n" + self._host + " response: "
            + "\n" + response
        )
        return response

    def _interfaces_apply(self,
        names: (str, list),
        action: str,
        local_logger: logging.Logger
    ) -> None:
        """Apply one config command ('shutdown'/'no shutdown') to interfaces."""
        if isinstance(names, str):
            names = [names]
        interfaces = []
        for name in names:
            interfaces.extend(['interface ' + name, action])
        commands = (
            ['enable', 'configure terminal']
            + interfaces
            + ['end', 'write', 'exit']
        )
        self._run(commands, local_logger)

    def interface_status(self,
        name: str,
        logger_alias: str = inspect.stack()[0].function
    ) -> str:
        """Get interface status.

        Args:
            name (str): interface name, example: 'gigabitEthernet 1/0/24'.
            logger_alias (str, optional): sublogger name.
                NOTE: the default is evaluated once, when the class body is
                executed, so it resolves to the enclosing class name rather
                than to the method name.

        Raises:
            TypeError: an argument type does not match its annotation.
            ValueError: the response contains none of the known status
                keywords (this includes the 'ERROR' marker returned when the
                SSH session failed).

        Returns:
            str: 'disabled', 'notconnect' or 'connect'.
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), Cisco.interface_status.__annotations__):
            response = self._run(
                ['show interface ' + name + ' status'],
                local_logger
            )
            # Order matters: 'notconnect' contains 'connect', so the more
            # specific keywords are checked first.
            for status in ('disabled', 'notconnect', 'connect'):
                if re.search(status, response):
                    return status
            raise ValueError("port status unknown")

    def interfaces_enable(self,
        names: (str, list),
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """Enable defined interfaces.

        Args:
            names (str, list): interface name list, example: ['Gi0/1','Gi0/2'].
            logger_alias (str, optional): sublogger name.
                NOTE: the default is evaluated once, when the class body is
                executed, so it resolves to the enclosing class name rather
                than to the method name.

        Raises:
            TypeError: an argument type does not match its annotation.

        Returns:
            None.
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), Cisco.interfaces_enable.__annotations__):
            self._interfaces_apply(names, 'no shutdown', local_logger)
        return None

    def interfaces_disable(self,
        names: (str, list),
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """Disable defined interfaces.

        Args:
            names (str, list): interface name list, example: ['Gi0/1','Gi0/2'].
            logger_alias (str, optional): sublogger name.
                NOTE: the default is evaluated once, when the class body is
                executed, so it resolves to the enclosing class name rather
                than to the method name.

        Raises:
            TypeError: an argument type does not match its annotation.

        Returns:
            None.
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), Cisco.interfaces_disable.__annotations__):
            self._interfaces_apply(names, 'shutdown', local_logger)
        return None


class Do():
    """Set of various methods (functions) for routine.
    """
    @staticmethod
    def args_valid(arguments: dict, annotations: dict) -> bool:
        """Arguments type validating by annotations.

        Args:
            arguments (dict): 'locals()' immediately after starting the function.
            annotations (dict): function.name.__annotations__.

        Raises:
            TypeError: type of argument is not equal type in annotation.

        Returns:
            bool: True if argument types are valid.
        """
        for var_name, var_type in annotations.items():
            # The 'return' annotation does not describe an argument.
            if var_name == 'return':
                continue
            if not isinstance(arguments[var_name], var_type):
                raise TypeError(
                    f"type of '{var_name}' = {arguments[var_name]!s}"
                    f" is not {var_type!s}"
                )
        return True

    @staticmethod
    def ping(host: str) -> bool:
        """Send ICMP echo request.

        Args:
            host (str): ip address or hostname.

        Returns:
            bool: True - host replied, False - host didn't respond
        """
        # Windows uses '-n' for the packet count; every other platform is
        # given the POSIX-style '-c'. Options go before the host so that
        # option parsers which stop at the first positional argument still
        # see them. No shell is involved, so output is discarded through the
        # pipe below instead of a shell redirection.
        count_option = '-n' if platform.startswith('win32') else '-c'
        process = ['ping', count_option, '1', host]
        with Popen(process, stdout=PIPE, stderr=STDOUT) as proc:
            proc.communicate()
            return proc.returncode == 0


if __name__ == "__main__":
    time_start = datetime.datetime.now()

    # Always true here, because ArgumentParser is imported at the top.
    if 'argparse' in modules:
        args = ArgumentParser(
            prog='cisco-port-failover',
            description='Cisco ports switching if the target is unreachable.',
            epilog='Dependencies: '
                   '- Python 3 (tested version 3.9.5), '
                   '- Python 3 modules: paramiko '
        )
        args.add_argument('--host', type=str, required=True,
            help='cisco host address')
        args.add_argument('--port', type=int, default=22, required=False,
            help='cisco ssh port')
        args.add_argument('--user', type=str, default='cisco', required=False,
            help='cisco ssh username')
        args.add_argument('--pass', type=str, default='cisco', required=False,
            help='cisco ssh password')
        args.add_argument('--p_en', type=str, default=None, required=False,
            help='cisco enable mode password')
        args.add_argument('--check', action='append', default=[], required=False,
            help='ping target host')
        args.add_argument('--iface', action='append', default=[], required=False,
            help='cisco target interface')
        args.add_argument('--wait', type=float, default=0.05, required=False,
            help='delay in seconds between commands')
        args.add_argument('--log_root', type=str,
            default=path.dirname(path.realpath(__file__)), required=False,
            help='path to log directory')
        args.add_argument('--log_level', type=str, default='INFO', required=False,
            help='DEBUG, INFO, WARNING, ERROR, CRITICAL')
        args = vars(args.parse_args())

    log_root = args['log_root']
    # Level names are upper-case in logging; accept e.g. 'debug' as well.
    log_level = args['log_level'].upper()

    logging.basicConfig(
        format='%(asctime)s %(levelname)s: %(name)s: %(message)s',
        datefmt='%Y-%m-%d_%H.%M.%S',
        handlers=[
            logging.FileHandler(
                filename=log_root + sep
                    + path.splitext(path.basename(__file__))[0] + '.log',
                mode='a'
            ),
            logging.StreamHandler()
        ],
        level=log_level
    )
    logging.getLogger("paramiko").setLevel(logging.WARNING)

    device = Cisco(
        host=args['host'],
        user=args['user'],
        pswd=args['pass'],
        p_en=args['p_en'],
        port=args['port'],
        wait=args['wait']
    )

    # The 'else' branch runs only when no check target replied to ping
    # (which is also the case when no check target was given).
    for address in args['check']:
        if Do.ping(host=address):
            break
    else:
        ifaces = args['iface']
        if not ifaces:
            # Nothing to switch: avoid a pointless config session and the
            # IndexError an empty interface list would cause below.
            logging.warning(msg='no interfaces defined (--iface), nothing to switch')
        else:
            iface_states = {'old': {}, 'new': {}}
            iface_active_old_index = -1

            # get source ports states (~2sec)
            for index, iface in enumerate(ifaces):
                iface_states['old'][iface] = device.interface_status(iface)
                if iface_states['old'][iface] != 'disabled':
                    iface_active_old_index = index

            # choose the new active port: the next one, wrapping around
            iface_active_new_index = iface_active_old_index + 1
            if iface_active_new_index >= len(ifaces):
                iface_active_new_index = 0

            # set ports new states (~4sec)
            device.interfaces_disable(ifaces)
            device.interfaces_enable(ifaces[iface_active_new_index])

            # get current ports states (~2sec)
            for iface in ifaces:
                iface_states['new'][iface] = device.interface_status(iface)

            logging.info(msg=iface_states)

    time_execute = datetime.datetime.now() - time_start
    logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')