diff --git a/scripts/simplestream-test b/scripts/simplestream-test new file mode 100755 index 000000000..010833cdb --- /dev/null +++ b/scripts/simplestream-test @@ -0,0 +1,600 @@ +#!/usr/bin/python +# +# This program is a test jig that does a one-shot run of a test. The +# test being performed can be changed by altering the contents of the +# 'test_json_text' variable declared below with suitable JSON. See +# the 'test-*' PDFs in the documentation directory for details on +# format. +# +# A number of the things being done here will be migrated into the +# REST API code +# +# +# THIS PROGRAM IS NOT INTENDED FOR USE IN PRODUCTION. +# +# + +import datetime +import json +import pscheduler +import requests +import socket +import time +import urlparse + +from dateutil.tz import tzlocal + +test_json_text = """ +{ + "type": "simplestream", + "spec": { + "schema": 1, + "receiver": "dev7", + "dawdle": "PT1S", + "test-material": "Madam, in Eden, I'm Adam.", + "timeout": "PT3S" + } +} +""" + + +# ----------------------------------------------------------------------------- +# CODE BEYOND THIS POINT CONTAINS NO USER-SERVICEABLE PARTS +# ----------------------------------------------------------------------------- + + +test_json = pscheduler.json_load(test_json_text) +test_type = test_json['type'] + + +def server_url(host, path = None, port = 29285): + return 'http://' \ + + host \ + + (':' + str(port) if port is not None else '') \ + + ('/' + path if path is not None else '') + + + +print +print "TEST SPECIFICATION:" +print + +print "JSON:" +print test_json_text.strip() +print +print "Formatted:" + +spec_json = pscheduler.json_dump(test_json['spec']) + +returncode, stdout, stderr = pscheduler.run_program( + [ "pscheduler", "internal", "invoke", "test", test_type, "spec-format" ], + stdin = spec_json + ) + +if returncode == 0: + print stdout.strip() +else: + pscheduler.fail("Failed to format test: " + stderr) + + + +# +# +# SPEC PART +# +# + + +# Figure out the participant list. This we do directly since the code +# will become part of the server-side code. + +returncode, stdout, stderr = pscheduler.run_program( + [ "pscheduler", "internal", "invoke", "test", test_type, "participants" ], + stdin = spec_json + ) + +try: + # TODO: Map to FQDN or leave as localhost? + participants = [ host if host is not None else socket.getfqdn() + for host in pscheduler.json_load(stdout) ] +except Exception as ex: + pscheduler.fail("Unable to load returned participant list: " + str(ex)) +nparticipants = len(participants) + + +print +print "TOOL SELECTION:" +print + +tools = [] + +# TODO: Get local out of the database. If we don't and there's only +# one server thread, this will break. +for participant in participants: + print "Getting tools from", participant + try: + r = requests.get(server_url(participant, "tools"), + params={ 'test': test_json_text }) + if r.status_code != 200: + raise Exception("Bad status code") + tools.append( pscheduler.json_load(str(r.text)) ) + except Exception as ex: + print "No response from", participant, ex + break + +if len(tools) != nparticipants: + pscheduler.fail("Didn't get a full set of tool responses") + + +def pick_tool(lists): + participants = len(lists) + """Count and score the number of times each tool appears in a list + of lists retrieved from servers, then return the name of the tool + that was preferred or None if there were none in common.""" + + # The count is used to determine whether or not a tool is supported + # by all participants. + + # The score is the sum of each tool's position in each list and is + # used to determine its overall preference. Like golf, the tool + # with the smallest score has the highest preference. + + # TODO: At some point, we'll have to account for minimum schema + # version supported, too. (Or will hosts that don't support it just + # bow out?) + + count = {} + score = {} + + for tool_list in lists: + for position in range(len(tool_list)): + + tool = tool_list[position]['name'] + + try: + count[tool] += 1 + except KeyError: + count[tool] = 1 + + try: + score[tool] += position + except KeyError: + score[tool] = position + + # Pick out the tools all lists have in common and their scores. + + common = {} + for tool in count: + if count[tool] == participants: + common[tool] = score[tool] + + # Nothing in common means no thing can be picked. + + if not len(common): + return None + + # Pick out the common tool with the lowest score. + ordered = sorted(common.items(), key=lambda value: value[1]) + return ordered[0][0] + + +tool = pick_tool(tools) +if tool is None: + pscheduler.fail("Couldn't find a tool in common") +print "Selected tool", tool + + +# +# +# TASK PART +# +# + +print +print "TASK CREATION:" +print + +# TODO: The tool needs to be an array. +task = { + 'schema': 1, + 'test': test_json, + 'tool': tool, + 'schedule': { + 'start': 'PT2S', + 'slip': 'PT15S', + 'repeat': 'PT30S', + 'max_runs': 50 + }, + 'archives': [ + { + 'name': 'syslog', + 'data': { 'ident': 'log1' } + }, + { + 'name': 'failer', + 'data': {} + } + ] +} + + +task_data = pscheduler.json_dump(task) + +tasks_posted = [] + +# TODO: Task lead out of the database. If we don't and there's only +# one server thread, this will break. + +# Lead assigns the UUID. +try: + print "Tasking lead participant on", participants[0] + r = requests.post(server_url(participants[0], 'tasks'), + data=task_data) + if r.status_code != 200: + raise Exception("Bad status code: " + r.text) + print " ", r.text + tasks_posted.append(r.text) + lead_url = r.text + task_uuid = urlparse.urlparse(lead_url).path.split('/')[-1] + print " Task URL is", lead_url + print " Task ID is", task_uuid +except Exception as ex: + pscheduler.fail("Error from %s: %s" % (participants[0], ex)) + + +# Other participants get the UUID forced upon them. + + +for participant in range(1,nparticipants): + part_name = participants[participant] + print "Tasking participant %d on %s" % (participant, part_name) + try: + r = requests.post(server_url(part_name, 'tasks/' + task_uuid), + params={ 'participant': participant }, + data=task_data) + if r.status_code != 200: + raise Exception("Bad status code %d: %s", r.status_code, r.text) + print " ", r.text + tasks_posted.append(r.text) + except Exception as ex: + print "Error from %d: %s" % (participant, ex) + break + +# If we failed on any postings, delete the prior ones +if len(tasks_posted) < nparticipants: + print "Removing already-posted tasks:" + for url in tasks_posted: + print " ", url + r = requests.delete(url) + pscheduler.fail("Unable to continue.") + + + +print +print "SCHEDULING:" +print + +# TODO: Figure out the next desired time to run it. +# For now, just punt five seconds. + +print "Time now is ", datetime.datetime.now(tzlocal()) + +# TODO: Get this directly form the database if the lead is local? +r = requests.get(lead_url, params={ 'detail': 1 }) +if r.status_code != 200: + raise Exception("Bad status code") +full_task = pscheduler.json_load(r.text) + +# Calculate the run time range based on the start, then ask the +# servers for what ranges of time they have available. + +run_range_start = pscheduler.iso8601_as_datetime(full_task['detail']['start']) +task_duration = pscheduler.iso8601_as_timedelta(full_task['detail']['duration']) +task_slip = pscheduler.iso8601_as_timedelta(full_task['detail']['slip']) + +run_range_end = run_range_start + task_duration + task_slip + +print "Earliest start ", run_range_start +print "Latest end ", run_range_end +print "Duration ", task_duration + +range_params = { + 'start': pscheduler.datetime_as_iso8601(run_range_start), + 'end': pscheduler.datetime_as_iso8601(run_range_end) +} + +range_set = [] + +for task_url in tasks_posted: + print "Run times from", task_url + r = requests.get(task_url + '/runtimes', params=range_params) + if r.status_code != 200: + raise Exception("Bad status code %d: %s" % (r.status_code, r.text)) + json_ranges = pscheduler.json_load(r.text) + print " ", json_ranges + # TODO: Assert: json_ranges should be an array? + if len(json_ranges) == 0: + pscheduler.fail("Not all servers have a time slot available.") + range_set.append( [ (pscheduler.iso8601_as_datetime(item['lower']), + pscheduler.iso8601_as_datetime(item['upper'])) + for item in json_ranges ] ) + + +def coalesce_ranges(ranges, duration): + """ + Find the earliest mutually-agreeable time from lists provided by + participants. + + The 'ranges' argument is a list of ranges is an array, with one + element for each participant. Each of those elements is a sorted + list of tuples representing available time ranges. Each tuple is + (lower-time, upper-time). + + The 'duration' argument is the minimum length a set of times found + to be common must be to qualify. + + Return value is a tuple of (lower-time, upper-time) if there was a + mutually-agreeable range found, None otherwise. + + NOTES: + + This function makes no attempt to make sure the ranges + are lower-upper, so the GIGO principle applies. + + This function was written to be used with ranges of datetimes but + will work with anything that can be compared for sorting. + """ + + participants = len(ranges) + + if participants < 2: + # TODO: What's a good exception to raise here? + raise ValueError("Must have at least two items") + + matches = [] # Ranges we came up with that matched + + for outer in ranges[0]: + (earliest, latest) = outer + + # Weed out too-short lead proposals + if (latest - earliest) < duration: + continue + + matches = [ outer ] # Ranges in non-leads that work + + # Go through each of the other participants' proposed ranges + + for inner in range(1,participants): + + for candidate in ranges[inner]: + + (lower, upper) = candidate + + # Skip ranges that clearly aren't going to work: + # - Non-overlapping + # - Too short + if upper < earliest or lower > latest \ + or (upper - lower) < duration: + continue + + # Trim the candidate range to just the overlap and add + # its tuple to our list of good ones. + matches.append( (max(earliest, lower), min(upper, latest)) ) + + # A full set means we found a good match and can stop. + if len(matches) == participants: + break + + # If the loop above didn't leave us with a ranges from every + # participant, there isn't one. + if len(matches) < participants: + return None + + # Trim everything to the smallest mutually-acceptable range + (final_lower, final_upper) = matches.pop(0) + for match in matches: + (match_lower, match_upper) = match + final_lower = max(final_lower, match_lower) + final_upper = min(final_upper, match_upper) + + assert final_lower <= final_upper, "Came up with a bogus range." + assert (final_upper - final_lower) >= duration, "Duration is too short." + + return (final_lower, final_upper) + + +# TODO: Tester for coalesce_ranges() +#print coalesce_ranges( [ +# [ (1, 7), (9, 12) ], +## [ (1, 2), (3, 4) ], +# [ (1, 2), (3, 8) ], +## [ (2, 9), (4, 6) ], +## [ (1, 2), (37, 50), (100, 203) ], +# ], 3) + +schedule_range = coalesce_ranges( range_set, task_duration ) +if schedule_range is None: + pscheduler.fail("No mutually-agreeable time to run this task.") + +(schedule_lower, schedule_upper) = schedule_range + +# TODO: If there's random slip and there's room to apply it, do so. +schedule_upper = schedule_lower + task_duration + +print "Scheduling for", schedule_lower, "to", schedule_upper + + +print +print "POSTING RUNS:" +print + +run_params = { 'start': schedule_lower.isoformat() } +run_path = None +runs_posted = [] + + +for participant in range(0, len(tasks_posted)): + + if run_path is None: + + # First one is the lead. Post it and get the UUID. + + r = requests.post( tasks_posted[participant] + '/runs', + params=run_params ) + if r.status_code != 200: + raise Exception("Bad status code: " + r.text) + + run_path = '/runs/' + urlparse.urlparse(r.text).path.split('/')[-1] + + else: + + r = requests.post( tasks_posted[participant] + run_path, + params=run_params ) + if r.status_code != 200: + raise Exception("Bad status code: " + r.text) + + runs_posted.append(r.text) + + print "Participant %d: %s" % (participant, r.text) + + +if len(runs_posted) != nparticipants: + # TODO: Delete the runs we did post. + pscheduler.fail("Didn't successfully post all runs") + +# TODO: Stash the posted runs in the database. + + + + +print +print "FETCHING AND DISTRIBUTING PARTICIPANT DATA:" +print + +part_data = [] + +for run in runs_posted: + + r = requests.get(run) + if r.status_code != 200: + raise Exception("Bad status code: " + r.text) + # TODO: Handle lack of participant data + + print "Fetched run", run + part_data.append( pscheduler.json_load(str(r.text))['participant-data'] ) + + +print "Combined participant data:", part_data + +full_data = { + 'run': pscheduler.json_dump({ 'part-data-full': part_data }) + } + +for run in runs_posted: + + print "Updating", run + r = requests.put(run, params=full_data) + if r.status_code != 200: + raise Exception("Bad status code: " + r.text) + + + +# +# +# +# POST-RUN +# +# +# + + +print +print "WAITING FOR RUN:" +print + +sleeptime = ( schedule_upper - datetime.datetime.now(tzlocal()) ) \ + + datetime.timedelta(seconds=1) + +print "Sleeping for", sleeptime +time.sleep(pscheduler.timedelta_as_seconds(sleeptime)) + +### The code below has been folded into the runner. + +### print +### print "FETCHING RESULTS:" +### print +### +### full_result = [] +### +### for run in runs_posted: +### r = requests.get(run) +### if r.status_code != 200: +### raise Exception("Bad status code: " + r.text) +### try: +### full_result.append(pscheduler.json_load(str(r.text))['result-local']) +### except Exception as ex: +### pscheduler.fail("Unable to load result: %s" % str(ex)) +### +### print "Full result: ", pscheduler.json_dump(full_result) +### +### +### print +### print "STORING FULL RESULTS:" +### print +### +### full_params = { +### 'run': pscheduler.json_dump({ 'result-full' : full_result }) +### } +### for run in runs_posted: +### print "Storing in", run +### r = requests.put(run, params=full_params) +### if r.status_code != 200: +### raise Exception("Bad status code: " + r.text) + + +print +print "FINAL FULL RESULT:" +print + +r = requests.get(runs_posted[0]) +if r.status_code != 200: + raise Exception("Bad status code: " + r.text) + +merged_full_fetched = pscheduler.json_load(r.text) + +print "JSON:" +print json.dumps(merged_full_fetched['result-merged'], \ + sort_keys=True, \ + indent=4, \ + separators=(',', ': ') \ + ) + '\n' + + +for format in [ 'text/plain', 'text/html' ]: + print + print "Formatted %s:" % format + + returncode, stdout, stderr = pscheduler.run_program( + [ "pscheduler", "internal", "invoke", "test", test_type, "result-format", format ], + stdin = pscheduler.json_dump(merged_full_fetched['result-merged']) + ) + + if returncode == 0: + print stdout.strip() + else: + pscheduler.fail("Failed to format test: " + stderr) + + + +# +# Clean Up +# +# TODO: Re-enable prints +if False: + #print "Removing posted tasks:" + for url in tasks_posted: + # print " ", url + r = requests.delete(url) + if r.status_code != 200: + #print " WARNING: Failed %d: %s" % (r.status_code, r.text) + pass