Skip to content

Commit

Permalink
revisions
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebates123 authored and rustydb committed Nov 14, 2024
1 parent d9be1d0 commit faa64be
Showing 1 changed file with 228 additions and 46 deletions.
274 changes: 228 additions & 46 deletions canu/config/switches.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,27 @@
import json
from scrapli import Scrapli
import click
import logging

requests.packages.urllib3.disable_warnings(
requests.packages.urllib3.exceptions.InsecureRequestWarning
)

logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def login(session, ip, username, password):
url = f"https://{ip}/rest/v10.11/login"
credentials = {"username": username, "password": password}
try:
response = session.post(url, data=credentials, verify=False)
response.raise_for_status()
logger.info(f"Successfully logged in to {ip}")
except requests.exceptions.RequestException as e:
click.secho(f"Error during login: {str(e)}", fg="red")
logger.error(f"Error during login: {str(e)}")
sys.exit(1)


Expand All @@ -32,8 +39,9 @@ def start_dryrun(session, ip, input_file):
headers = {"Content-Type": "text/plain"}
response = session.post(url, data=data, headers=headers, verify=False)
response.raise_for_status()
logger.info(f"Started dryrun on {ip}")
except requests.exceptions.RequestException as e:
click.secho(f"Error starting dryrun: {str(e)}", fg="red")
logger.error(f"Error starting dryrun: {str(e)}")
sys.exit(1)


Expand All @@ -47,9 +55,12 @@ def get_dryrun_results(session, ip):
if result["state"] != "pending":
break
time.sleep(5)
logger.info(f"Dryrun results obtained from {ip}")
if result["state"] == "error":
logger.error(f"Dryrun failed. Error details: {result['errors']}")
return result
except requests.exceptions.RequestException as e:
click.secho(f"Error getting dryrun results: {str(e)}", fg="red")
logger.error(f"Error getting dryrun results: {str(e)}")
sys.exit(1)


Expand All @@ -58,36 +69,59 @@ def clear_dryrun_results(session, ip):
try:
response = session.delete(url, verify=False)
response.raise_for_status()
logger.info(f"Dryrun results cleared on {ip}")
except requests.exceptions.RequestException as e:
click.secho(f"Error clearing dryrun results: {str(e)}", fg="red")
logger.error(f"Error clearing dryrun results: {str(e)}")
sys.exit(1)


def get_running_config(session, host):
running_config_url = f"https://{host}/rest/v10.11/configs/running-config"
response = session.get(running_config_url, verify=False)
logger.info(f"Running config obtained from {host}")
return response.json()


def update_config(config, mgmt_int, subsystems):
config["System"]["mgmt_intf"] = mgmt_int
config["System"]["subsystems"] = subsystems
logger.info("Config updated with mgmt_intf and subsystems")
return config


def write_config_json(config, filename):
with open(filename, "w") as file:
json.dump(config, file, indent=4)
logger.info(f"Config written to {filename}")


def upload_config_startup(session, host, config_json, force):
if force:
configs_url = f"https://{host}/rest/v10.11/fullconfigs/running-config"
logger.info("Force flag enabled. Writing to running configuration.")
else:
configs_url = f"https://{host}/rest/v10.11/fullconfigs/startup-config"

def upload_config_startup(session, host, config_json):
configs_url = f"https://{host}/rest/v10.11/fullconfigs/startup-config"
headers = {"Content-Type": "application/json"}
data = config_json
response = session.put(
configs_url, json=data, headers=headers, verify=False, timeout=120
)
return response.status_code
response = None # Initialize response variable
try:
response = session.put(
configs_url, json=data, headers=headers, verify=False, timeout=60
)
response.raise_for_status()
if force:
logger.info(f"Config uploaded to running-config on {host}")
else:
logger.info(f"Config uploaded to startup-config on {host}")
return response.status_code, response
except requests.exceptions.RequestException as e:
logger.error(
f"Error uploading config to {'running-config' if force else 'startup-config'} on {host}: {str(e)}"
)
if response is not None:
logger.error(f"Response content: {response.text}")
return None, response


def show_checkpoint_diff(ip, username, password):
Expand All @@ -102,6 +136,7 @@ def show_checkpoint_diff(ip, username, password):
conn = Scrapli(**device)
conn.open()
diff_output = conn.send_command("checkpoint diff startup-config running-config")
logger.info(f"Checkpoint diff obtained from {ip}")
return diff_output.result


Expand All @@ -116,6 +151,7 @@ def copy_startup_running(ip, username, password):
conn = Scrapli(**device)
conn.open()
output = conn.send_command("copy startup-config running-config")
logger.info(f"Copied startup-config to running-config on {ip}")
return output.result


Expand All @@ -133,56 +169,202 @@ def copy_running_startup(ip, username, password):
return output.result


def set_checkpoint_timer(session, host, minutes):
url = f"https://{host}/rest/v10.11/configs/autocheckpoint"
headers = {"Content-Type": "application/json"}
data = {"minutes": minutes}
try:
response = session.post(url, headers=headers, json=data, verify=False)
response.raise_for_status()
logger.info(f"Checkpoint timer set to {minutes} minutes on {host}")
except requests.exceptions.RequestException as e:
if (
response.status_code == 400
and "Checkpoint auto timer is already ON" in response.text
):
logger.warning(
f"Checkpoint auto timer is already ON on {host}. Acknowledging..."
)
acknowledge_autocheckpoint(session, host)
logger.info(f"Acknowledged existing checkpoint on {host}")
set_checkpoint_timer(
session, host, minutes
) # Retry setting the checkpoint timer
else:
logger.error(f"Error setting checkpoint timer on {host}: {str(e)}")
logger.error(f"Response content: {response.text}")
raise


def acknowledge_autocheckpoint(session, host):
url = f"https://{host}/rest/v10.11/configs/autocheckpoint"
headers = {"Content-Type": "application/json"}
try:
response = session.put(url, headers=headers, verify=False)
response.raise_for_status()
logger.info(f"Acknowledged autocheckpoint on {host}")
except requests.exceptions.RequestException as e:
logger.error(f"Error acknowledging autocheckpoint on {host}: {str(e)}")
logger.error(f"Response content: {response.text}")
raise


def copy_running_to_checkpoint(session, host, checkpoint_name):
url = f"https://{host}/rest/v10.11/fullconfigs/{checkpoint_name}?from=%2Frest%2Fv10.11%2Ffullconfigs%2Frunning-config"
headers = {"accept": "*/*"}
try:
response = session.put(url, headers=headers, verify=False)
response.raise_for_status()
logger.info(
f"Copied running configuration to checkpoint '{checkpoint_name}' on {host}"
)
except requests.exceptions.RequestException as e:
if response.status_code == 400 and "already exists" in response.text:
logger.warning(
f"Checkpoint '{checkpoint_name}' already exists or a checkpoint with the same config on {host}. Skipping checkpoint creation."
)
else:
logger.error(
f"Error copying running configuration to checkpoint '{checkpoint_name}' on {host}: {str(e)}"
)
logger.error(f"Response content: {response.text}")
raise


def verify_connectivity(ip, username, password, timeout=120, interval=10):
start_time = time.time()
while time.time() - start_time < timeout:
try:
device = {
"host": ip,
"auth_username": username,
"auth_password": password,
"auth_strict_key": False,
"platform": "aruba_aoscx",
}
conn = Scrapli(**device)
conn.open()
logger.info(f"Successfully connected to {ip} after configuration change")
return True
except Exception as e:
logger.warning(f"Failed to connect to {ip}: {str(e)}")
time.sleep(interval)
logger.error(f"Failed to connect to {ip} within the specified timeout")
return False


def logout(session, ip):
url = f"https://{ip}/rest/v10.11/logout"
try:
response = session.post(url, verify=False)
response.raise_for_status()
logger.info(f"Logged out from {ip}")
except requests.exceptions.RequestException as e:
click.secho(f"Error during logout: {str(e)}", fg="red")
logger.error(f"Error during logout: {str(e)}")
sys.exit(1)


def main():
@click.command()
@click.option("--log", is_flag=True, help="Enable logging")
@click.option("--ip", prompt="Enter switch IP", help="Switch IP address")
@click.option("--username", prompt="Enter your username", help="Username")
@click.option("--password", prompt="Enter your password", help="Password")
@click.option(
"--config",
prompt="Enter path to the config file",
help="Path to the configuration file",
)
@click.option(
"--force",
is_flag=True,
help="Force write to running configuration. Needed for Aruba-CX virtual switch.",
)
def main(log, ip, username, password, config, force):
if not log:
logging.disable(logging.CRITICAL)

session = requests.Session()
ip = "10.102.193.4"
username = input("Enter your username: ")
password = input("Enter your password: ")
input_file = "/Users/lucasbates/canu-container/canu/canu/config/sw-leaf-bmc-001.cfg"
checkpoint_name = f"ac_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}"
validated_config_json = (
f"{os.path.dirname(os.path.abspath(__file__))}/{ip}_{checkpoint_name}.json"
)
login(session, ip, username, password)
start_dryrun(session, ip, input_file)
result = get_dryrun_results(session, ip)
if result["state"] == "error":
click.secho("Dryrun failed.", fg="red")
# Handle error parsing here
else:
click.secho("Dryrun Succeeded", fg="green")
original_config = json.loads(result["configs"]["json"])
running_config = get_running_config(session, ip)
mgmt_int = running_config["System"]["mgmt_intf"]
subsystems = running_config["System"]["subsystems"]
updated_config = update_config(original_config, mgmt_int, subsystems)
write_config_json(updated_config, validated_config_json)
status_code = upload_config_startup(session, ip, updated_config)
if status_code != 200:
click.secho("Failed to upload the configuration.", fg="red")
raise Exception("Failed to upload the configuration.")
diff_output = show_checkpoint_diff(ip, username, password)
print(diff_output)
user_confirmation = input(
"Do you want to replace the running config with the uploaded config? (y/n): "
)
if user_confirmation.lower() == "y":
copy_startup_running(ip, username, password)
else:
# copy the running configuration back to the startup configuration
copy_running_startup(ip, username, password)
clear_dryrun_results(session, ip)
logout(session, ip)

try:
login_successful = False
try:
login(session, ip, username, password)
login_successful = True
start_dryrun(session, ip, config)
result = get_dryrun_results(session, ip)
if result["state"] == "error":
logger.error(f"Dryrun failed. Error details: {result['errors']}")
else:
logger.info("Dryrun Succeeded")
original_config = json.loads(result["configs"]["json"])
running_config = get_running_config(session, ip)
copy_running_to_checkpoint(session, ip, checkpoint_name)
mgmt_int = running_config["System"]["mgmt_intf"]
subsystems = running_config["System"]["subsystems"]
updated_config = update_config(original_config, mgmt_int, subsystems)
write_config_json(updated_config, validated_config_json)
set_checkpoint_timer(session, ip, 2)
status_code = None
try:
# Other code...
status_code = upload_config_startup(
session, ip, updated_config, force
)
# Other code...
except Exception as e:
logger.exception(
f"An error occurred during configuration upload: {str(e)}"
)
raise

# Move the block outside of the try-except block
if status_code is None:
logger.error(
"Uploaded configuration failed. Rolling back to the previous configuration."
)
if verify_connectivity(ip, username, password):
logger.info(
f"Roll Back Successful. Connection re-established with {ip}."
)
else:
logger.error(
f"Failed to re-establish connection with {ip}. Aborting configuration upload."
)
raise Exception("Failed to upload configuration")

if not force:
diff_output = show_checkpoint_diff(ip, username, password)
print(diff_output)
user_confirmation = input(
"Do you want to replace the running config with the uploaded config? (y/n): "
)
if user_confirmation.lower() == "y":
copy_startup_running(ip, username, password)
if not verify_connectivity(ip, username, password):
logger.error(
"Uploaded configuration failed. Rolling back to the previous configuration."
)
copy_running_startup(ip, username, password)
else:
# Copy the running configuration to a checkpoint
copy_running_to_checkpoint(session, ip, checkpoint_name)
else:
logger.info("Force flag enabled. Skipping user confirmation.")

# Acknowledge autocheckpoint at the end of the "Dryrun Succeeded" block
acknowledge_autocheckpoint(session, ip)

clear_dryrun_results(session, ip)
except Exception as e:
logger.exception(f"An error occurred: {str(e)}")
raise
finally:
if login_successful:
logout(session, ip)


if __name__ == "__main__":
Expand Down

0 comments on commit faa64be

Please sign in to comment.