cisco-management/cisco-port-failover.py

378 lines
13 KiB
Python

#!/usr/bin/env python3
# pylint: disable=C0103
"""Cisco port failover. Cisco ports switching if the target is unreachable.
"""
import datetime
import inspect
import logging
import re
from argparse import ArgumentParser
from os import path, sep
from subprocess import Popen, PIPE, STDOUT
from sys import modules, platform
from time import sleep
from paramiko import SSHClient, AutoAddPolicy
class Connect:
    """Set of connection methods (functions) for various protocols.
    """
    @staticmethod
    # pylint: disable=W0718
    def ssh_commands(
        commands: (str, list),
        hostname: str,
        username: str,
        password: str,
        p_enable: (str, type(None)) = None,
        port: int = 22,
        wait: float = 0.5,
        logger_alias: str = inspect.stack()[0].function
    ) -> str:
        """Run commands in an interactive SSH shell and collect the output.

        The annotations of this method are used at runtime: they are passed to
        'Do.args_valid' as the second argument of 'isinstance', so they must
        stay plain types or tuples of types.

        Args:
            commands (str, list): command or list of commands to execute.
            hostname (str): remote hostname or ip address.
            username (str): remote host username.
            password (str): remote host password.
            p_enable (str, None): enable mode password.
                Defaults to remote host password.
            port (int, optional): remote host connection port. Defaults to 22.
            wait (float): delay in seconds between commands. Defaults to 0.5.
            logger_alias (str, optional): sublogger name. The default is
                evaluated once while the class body is executed, so it is the
                name of the enclosing class, not of this method.

        Raises:
            TypeError: raised by 'Do.args_valid' when an argument type
                does not match its annotation.

        Returns:
            str: terminal response or 'ERROR' if the session failed.
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), Connect.ssh_commands.__annotations__):
            if isinstance(commands, str):
                commands = [commands]
            if p_enable is None:
                p_enable = password

            client = SSHClient()
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification; confirm that this is acceptable for the network.
            client.set_missing_host_key_policy(AutoAddPolicy())

            # Secrets are masked: the log file must never contain passwords.
            local_logger.debug(msg=''
                + '\n' + 'host: ' + hostname + ':' + str(port)
                + '\n' + 'user: ' + username
                + '\n' + 'pass: ' + '********'
                + '\n' + 'p_en: ' + '********'
                + '\n' + 'commands: '
                + '\n' + '\n '.join(commands)
            )

            try:
                client.connect(
                    hostname=hostname,
                    username=username,
                    password=password,
                    port=port
                )
                chan = client.invoke_shell()
                chunks = []
                for command in commands:
                    # Wait until the channel accepts data, then give the
                    # remote side one more pause before sending.
                    while not chan.send_ready():
                        sleep(wait)
                    sleep(wait)
                    chan.send(command + '\n')

                    # Wait for the reply, pause once more so it can
                    # accumulate, then read what is buffered.
                    while not chan.recv_ready():
                        sleep(wait)
                    sleep(wait)
                    chunks.append(chan.recv(8388608))

                    # 'enable' answers with a password prompt.
                    if command == 'enable':
                        chan.send(p_enable + '\n')
                # A stray undecodable byte must not turn a successful
                # session into 'ERROR'.
                return b''.join(chunks).decode('utf-8', errors='replace')
            except Exception as error:
                # Logged as an error so a failed session is visible at the
                # default log level; callers still receive 'ERROR'.
                local_logger.error(msg='error: ' + '\n' + str(error))
                return 'ERROR'
            finally:
                # Release the connection on success and on failure alike.
                client.close()
class Cisco(Connect):
    """Set of management methods (functions) for Cisco devices.
    """
    def __init__(self,
        host: str,
        user: str,
        pswd: str,
        p_en: (str, type(None)) = None,
        port: int = 22,
        wait: float = 0.05
    ) -> None:
        """Store connection parameters of the device.

        Args:
            host (str): device hostname or ip address.
            user (str): ssh username.
            pswd (str): ssh password.
            p_en (str, None): enable mode password. Defaults to ssh password.
            port (int, optional): ssh port. Defaults to 22.
            wait (float, optional): delay in seconds between commands.
                Defaults to 0.05.
        """
        self._host = host
        self._user = user
        self._pswd = pswd
        self._p_en = pswd if p_en is None else p_en
        self._port = port
        self._wait = wait

    def _execute(self, commands: list, local_logger: logging.Logger) -> str:
        """Send commands to the device and write the response to the debug log.
        """
        response = self.ssh_commands(
            commands=commands,
            hostname=self._host,
            username=self._user,
            password=self._pswd,
            p_enable=self._p_en,
            port=self._port,
            wait=self._wait
        )
        local_logger.debug(msg=""
            + "\n" + self._host + " response: "
            + "\n" + response
        )
        return response

    def _interfaces_apply(self, names, action: str, local_logger: logging.Logger) -> None:
        """Apply one interface-level command ('shutdown', 'no shutdown') to interfaces.
        """
        if isinstance(names, str):
            names = [names]
        if not names:
            # Nothing to configure: do not open a session and rewrite the
            # device configuration for an empty list.
            local_logger.debug(msg='interface list is empty, nothing to do')
            return None
        per_interface = []
        for name in names:
            per_interface += ['interface ' + name, action]
        self._execute(
            ['enable', 'configure terminal'] + per_interface + ['end', 'write', 'exit'],
            local_logger
        )
        return None

    def interface_status(self,
        name: str,
        logger_alias: str = inspect.stack()[0].function
    ) -> str:
        """Get interface status.

        Runs 'show interface <name> status' and looks for a status keyword
        anywhere in the response.

        Args:
            name (str): interface name, example: 'gigabitEthernet 1/0/24'.
            logger_alias (str, optional): sublogger name. The default is
                evaluated once while the class body is executed, so it is the
                name of the enclosing class, not of this method.

        Raises:
            TypeError: raised by 'Do.args_valid' when an argument type
                does not match its annotation.
            ValueError: the response contains none of the known statuses
                (this includes the 'ERROR' result of a failed ssh session).

        Returns:
            str: 'disabled', 'notconnect' or 'connect'.
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), Cisco.interface_status.__annotations__):
            response = self._execute(
                ['show interface ' + name + ' status'],
                local_logger
            )
            # Order matters: 'notconnect' contains 'connect', so the more
            # specific keywords are tested first.
            for status in ('disabled', 'notconnect', 'connect'):
                if status in response:
                    return status
            raise ValueError("port status unknown")

    def interfaces_enable(self,
        names: (str, list),
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """Enable defined interfaces ('no shutdown') and save the configuration.

        Args:
            names (str, list): interface name or list of names,
                example: ['Gi0/1','Gi0/2'].
            logger_alias (str, optional): sublogger name. The default is
                evaluated once while the class body is executed, so it is the
                name of the enclosing class, not of this method.

        Raises:
            TypeError: raised by 'Do.args_valid' when an argument type
                does not match its annotation.

        Returns:
            None.
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), Cisco.interfaces_enable.__annotations__):
            self._interfaces_apply(names, 'no shutdown', local_logger)
        return None

    def interfaces_disable(self,
        names: (str, list),
        logger_alias: str = inspect.stack()[0].function
    ) -> None:
        """Disable defined interfaces ('shutdown') and save the configuration.

        Args:
            names (str, list): interface name or list of names,
                example: ['Gi0/1','Gi0/2'].
            logger_alias (str, optional): sublogger name. The default is
                evaluated once while the class body is executed, so it is the
                name of the enclosing class, not of this method.

        Raises:
            TypeError: raised by 'Do.args_valid' when an argument type
                does not match its annotation.

        Returns:
            None.
        """
        local_logger = logging.getLogger(logger_alias)
        if Do.args_valid(locals(), Cisco.interfaces_disable.__annotations__):
            self._interfaces_apply(names, 'shutdown', local_logger)
        return None
class Do():
    """Set of various methods (functions) for routine.
    """
    @staticmethod
    def args_valid(arguments: dict, annotations: dict) -> bool:
        """Arguments type validating by annotations.

        Every annotation except 'return' is used as the second argument of
        'isinstance', so it must be a type or a tuple of types.

        Args:
            arguments (dict): 'locals()' immediately after starting the function.
            annotations (dict): function.name.__annotations__.

        Raises:
            TypeError: type of argument is not equal type in annotation.

        Returns:
            bool: True if argument types are valid.
        """
        for var_name, var_type in annotations.items():
            if var_name == 'return':
                continue
            if not isinstance(arguments[var_name], var_type):
                raise TypeError(""
                    + "type of '"
                    + var_name
                    + "' = "
                    + str(arguments[var_name])
                    + " is not "
                    + str(var_type)
                )
        return True

    @staticmethod
    def _ping_command(host: str, system: str = platform) -> list:
        """Build the argument list of a single-echo 'ping' call.

        Args:
            host (str): ip address or hostname.
            system (str, optional): platform identifier in 'sys.platform'
                format. Defaults to the current platform.

        Returns:
            list: command and arguments for 'subprocess.Popen'.
        """
        # The command runs without a shell, so redirections such as '>> NUL'
        # would reach ping as literal arguments; the output is discarded by
        # the caller through a pipe instead.
        # Options go before the host so that option parsers which stop at the
        # first operand still see them.
        if system.startswith('win32'):
            return ['ping', '-n', '1', host]
        # POSIX-style flags are also the fallback for platforms not listed
        # explicitly, instead of leaving the command undefined.
        return ['ping', '-c', '1', host]

    @staticmethod
    def ping(host: str) -> bool:
        """Send ICMP echo request.

        Args:
            host (str): ip address or hostname.

        Returns:
            bool: True - host replied (ping exit code is 0),
                False - host didn't respond.
        """
        with Popen(Do._ping_command(host), stdout=PIPE, stderr=STDOUT) as proc:
            proc.communicate()
            return proc.returncode == 0
if __name__ == "__main__":
    # Script entry point: ping the check targets and, when none of them
    # replies, move the active role to the next interface in the list.
    time_start = datetime.datetime.now()

    parser = ArgumentParser(
        prog='cisco-port-failover',
        description='Cisco ports switching if the target is unreachable.',
        epilog='Dependencies: '
               '- Python 3 (tested version 3.9.5), '
               '- Python 3 modules: paramiko '
    )
    parser.add_argument('--host', type=str, required=True,
                        help='cisco host address')
    parser.add_argument('--port', type=int, default=22, required=False,
                        help='cisco ssh port')
    parser.add_argument('--user', type=str, default='cisco', required=False,
                        help='cisco ssh username')
    parser.add_argument('--pass', type=str, default='cisco', required=False,
                        help='cisco ssh password')
    parser.add_argument('--p_en', type=str, default=None, required=False,
                        help='cisco enable mode password')
    parser.add_argument('--check', action='append', default=[], required=False,
                        help='ping target host')
    parser.add_argument('--iface', action='append', default=[], required=False,
                        help='cisco target interface')
    parser.add_argument('--wait', type=float, default=0.05, required=False,
                        help='delay in seconds between commands')
    parser.add_argument('--log_root', type=str,
                        default=path.dirname(path.realpath(__file__)),
                        required=False,
                        help='path to log directory')
    parser.add_argument('--log_level', type=str, default='INFO', required=False,
                        help='DEBUG, INFO, WARNING, ERROR, CRITICAL')
    args = vars(parser.parse_args())

    log_root = args['log_root']
    # Level names are upper-case in 'logging'; accept any letter case.
    log_level = args['log_level'].upper()
    logging.basicConfig(
        format='%(asctime)s %(levelname)s: %(name)s: %(message)s',
        datefmt='%Y-%m-%d_%H.%M.%S',
        handlers=[
            logging.FileHandler(
                filename=log_root + sep + path.splitext(path.basename(__file__))[0] + '.log',
                mode='a'
            ),
            logging.StreamHandler()
        ],
        level=log_level
    )
    logging.getLogger("paramiko").setLevel(logging.WARNING)

    device = Cisco(
        host=args['host'],
        user=args['user'],
        pswd=args['pass'],
        p_en=args['p_en'],
        port=args['port'],
        wait=args['wait']
    )

    ifaces = args['iface']
    # 'any' stops at the first target that replies. With no '--check'
    # targets at all nothing is reachable, so the switch is performed.
    if any(Do.ping(host=address) for address in args['check']):
        logging.info(msg='target hosts are available. Nothing to do.')
    elif not ifaces:
        # Without interfaces there is nothing to switch; do not touch the
        # device configuration.
        logging.error(msg='no interfaces defined (--iface). Nothing to switch.')
    else:
        iface_states = {'old': {}, 'new': {}}
        iface_active_old_index = -1

        # get source ports states (~2sec)
        # The last interface that is not 'disabled' is treated as active.
        for index, iface in enumerate(ifaces):
            iface_states['old'][iface] = device.interface_status(iface)
            if iface_states['old'][iface] != 'disabled':
                iface_active_old_index = index

        # choice new active port: the next one in the list, wrapping around
        iface_active_new_index = (iface_active_old_index + 1) % len(ifaces)

        # set ports new states (~4sec)
        device.interfaces_disable(ifaces)
        device.interfaces_enable(ifaces[iface_active_new_index])

        # get current ports states (~2sec)
        for iface in ifaces:
            iface_states['new'][iface] = device.interface_status(iface)
        logging.info(msg=iface_states)

    time_execute = datetime.datetime.now() - time_start
    logging.info(msg='execution time is ' + str(time_execute) + '. Exit.')