Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deal with invalid sessions for multiple simultaneous downloads #6

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 82 additions & 36 deletions eos_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import re
import time
import hashlib
from random import randint

# part of progress bar code
def viewBar(a,b):
Expand Down Expand Up @@ -143,6 +144,29 @@ def md5(fname):
return hash_md5.hexdigest()


def getSession(accessToken, session_code_url="https://www.arista.com/custom_data/api/cvp/getSessionCode/"):
jsonpost = {'accessToken': accessToken}
result = requests.post(session_code_url, data=json.dumps(jsonpost))
if result.json()["status"]["message"] == 'Access token expired':
print("The API token has expired. Please visit arista.com, click on your profile and select Regenerate Token then re-run the script with the new token.")
sys.exit()
elif result.json()["status"]["message"] == 'Invalid access token':
print("The API token is incorrect. Please visit arista.com, click on your profile and check the Access Token. Then re-run the script with the correct token.")
sys.exit()
session_code = (result.json()["data"]["session_code"])
return(session_code)

def jsonPost(url, token, data={}):
data['sessionCode'] = getSession(accessToken=creds)
result = requests.post(url, json.dumps(data))
i=0
while result.status_code != 200:
i +=1
print("Received HTTP status %s. Refreshing session and retrying (%s)." %(str(result.status_code), str(i)))
data['sessionCode'] = getSession(accessToken=creds)
result = requests.post(url, data)
return(result)

# use argparse to take the user input, can fill in default values here if the user wishes
# especially useful for the API key which won't change for a particular user
warnings.filterwarnings("ignore")
Expand Down Expand Up @@ -184,23 +208,15 @@ def md5(fname):
# the api key needs converting into base64 which outputs a byte value and then decoding to a string
creds = (base64.b64encode(api.encode())).decode("utf-8")

# there are 3 steps to downloading an image via the API, first is to get a session code
session_code_url = "https://www.arista.com/custom_data/api/cvp/getSessionCode/"
jsonpost = {'accessToken': creds}
result = requests.post(session_code_url, data=json.dumps(jsonpost))
if result.json()["status"]["message"] == 'Access token expired':
print("The API token has expired. Please visit arista.com, click on your profile and select Regenerate Token then re-run the script with the new token.")
sys.exit()
elif result.json()["status"]["message"] == 'Invalid access token':
print("The API token is incorrect. Please visit arista.com, click on your profile and check the Access Token. Then re-run the script with the correct token.")
sys.exit()
session_code = (result.json()["data"]["session_code"])

# then get the current folder tree, similar to what you see on the download page in XML format
# Get the current folder tree, similar to what you see on the download page in XML format
folder_tree_url = "https://www.arista.com/custom_data/api/cvp/getFolderTree/"
jsonpost = {'sessionCode': session_code}
result = requests.post(folder_tree_url, data=json.dumps(jsonpost))
folder_tree = (result.json()["data"]["xml"])
result = jsonPost(url=folder_tree_url, token=creds)
try:
folder_tree = (result.json()["data"]["xml"])
except:
time.sleep(randint(1,10))
result = jsonPost(url=folder_tree_url, token=creds)
folder_tree = (result.json()["data"]["xml"])

root = ET.fromstring(folder_tree)
path = ""
Expand All @@ -216,7 +232,7 @@ def md5(fname):
eos_filename = "EOS-2GB-" + image + "-INT.swi" # filename should be something like EOS-4.22.1F-INT.swi
image = image.rstrip("-INT") # image should be 4.22.1F, need to remove the -INT
elif "TerminAttr" in image: # if the user wants a TerminAttr image
z = 3 # corresponds to "CloudVision" top level folder
z = 2 # corresponds to "CloudVision" top level folder
eos_filename = image + "-1.swix" # filename should be something like TerminAttr-1.7.4-1.swix
elif "ipam" in img: # if the user wants a CVP IPAM image
z = 2 # corresponds to "CloudVision" top level folder
Expand Down Expand Up @@ -328,32 +344,52 @@ def md5(fname):
sys.exit()
# the 3rd part of downloading a file is to use the path and session code to get the actual direct download link URL
download_link_url = "https://www.arista.com/custom_data/api/cvp/getDownloadLink/"
jsonpost = {'sessionCode': session_code, 'filePath': path}
result = requests.post(download_link_url, data=json.dumps(jsonpost))
download_link = (result.json()["data"]["url"])
jsonpost = {'filePath': path}
result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
try:
download_link = (result.json()["data"]["url"])
except:
time.sleep(randint(1,10))
result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
download_link = (result.json()["data"]["url"])


print(eos_filename + " is currently downloading....")
# download the file to the current folder
download_file (download_link, eos_filename)
if img == "ipam": # for CVP IPAM there's 2 files to download so this grabs the 2nd file
jsonpost = {'sessionCode': session_code, 'filePath': path2}
result = requests.post(download_link_url, data=json.dumps(jsonpost))
download_link = (result.json()["data"]["url"])
jsonpost = {'filePath': path2}
result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
try:
download_link = (result.json()["data"]["url"])
except:
time.sleep(randint(1,10))
result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
download_link = (result.json()["data"]["url"])
print(ipam_filename + " is currently downloading....")
download_file(download_link, ipam_filename)
elif img == "cloudbuilder": # for CVP CloudBuilder there's 2 files to download so this grabs the 2nd file
jsonpost = {'sessionCode': session_code, 'filePath': path2}
result = requests.post(download_link_url, data=json.dumps(jsonpost))
download_link = (result.json()["data"]["url"])
jsonpost = {'filePath': path2}
result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
try:
download_link = (result.json()["data"]["url"])
except:
time.sleep(randint(1,10))
result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
download_link = (result.json()["data"]["url"])
print(cb_filename + " is currently downloading....")
download_file(download_link, cb_filename)


if (img != 'source') and (img != 'RN'):
jsonpost = {'sessionCode': session_code, 'filePath': sha512_path}
sha512_result = requests.post(download_link_url, data=json.dumps(jsonpost))
sha512_download_link = (sha512_result.json()["data"]["url"])
jsonpost = {'filePath': sha512_path}
sha512_result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
try:
sha512_download_link = (sha512_result.json()["data"]["url"])
except:
time.sleep(randint(1,10))
sha512_result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
sha512_download_link = (sha512_result.json()["data"]["url"])
if "TerminAttr" in image:
download_file (sha512_download_link, eos_filename + '.md5sum')
if "cvp" in image:
Expand All @@ -364,17 +400,27 @@ def md5(fname):
sha512_file = line

if img == "ipam":
jsonpost = {'sessionCode': session_code, 'filePath': sha512_path2}
sha512_result = requests.post(download_link_url, data=json.dumps(jsonpost))
sha512_download_link = (sha512_result.json()["data"]["url"])
jsonpost = {'filePath': sha512_path2}
sha512_result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
try:
sha512_download_link = (sha512_result.json()["data"]["url"])
except:
time.sleep(randint(1,10))
sha512_result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
sha512_download_link = (sha512_result.json()["data"]["url"])
download_file (sha512_download_link, ipam_filename + '.sha512sum')
for line in urllib.request.urlopen(sha512_download_link):
sha512_file2 = line

if img == "cloudbuilder":
jsonpost = {'sessionCode': session_code, 'filePath': sha512_path2}
sha512_result = requests.post(download_link_url, data=json.dumps(jsonpost))
sha512_download_link = (sha512_result.json()["data"]["url"])
jsonpost = {'filePath': sha512_path2}
sha512_result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
try:
sha512_download_link = (sha512_result.json()["data"]["url"])
except:
time.sleep(randint(1,10))
sha512_result = jsonPost(url=download_link_url, data=jsonpost, token=creds)
sha512_download_link = (sha512_result.json()["data"]["url"])
download_file (sha512_download_link, cb_filename + '.sha512sum')
for line in urllib.request.urlopen(sha512_download_link):
sha512_file2 = line
Expand Down Expand Up @@ -417,7 +463,7 @@ def md5(fname):
eos_filename = filename
eos_bundle = image
elif "TerminAttr" in image:
z = 3
z = 2
filename = image + "-1.swix"
terminattr_filename = filename
else:
Expand Down