From ddfcda07ae1c79e7172c4e99179573aee58947c2 Mon Sep 17 00:00:00 2001 From: jaredb7 Date: Tue, 6 Jun 2023 20:00:55 +1000 Subject: [PATCH 1/2] Threaded BirdWeather submissions with retry support Run BirdWeather submissions in a separate background thread so they don't hold up or slow down the processing of audio and detections. Instead of creating a new thread to process each BirdWeather submission as it occurs, add the data used for the BirdWeather submission to a queue. Then use a single thread running birdweather_submission_processor in an endless loop to process BirdWeather submissions as they are added to that queue. This avoids accidentally sending too many BirdWeather submissions at once, so we're only ever sending 1 submission at any one time. Add timeout values to the requests, as they will run indefinitely if no timeout is specified. Further, the ability to retry requests was implemented, but only for connection timeouts and specific response codes that can be retried on. An exponential backoff delay is calculated before retrying again to provide adequate delay between requests and ease server pressure. --- scripts/server.py | 265 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 265 insertions(+) diff --git a/scripts/server.py b/scripts/server.py index 4e2953eff..88506ac2c 100755 --- a/scripts/server.py +++ b/scripts/server.py @@ -1,4 +1,6 @@ +import queue import re +import sys from pathlib import Path from tzlocal import get_localzone import datetime @@ -42,6 +44,26 @@ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +# List of our BirdWeather submission threads +bw_worker_threads = list() +bw_submission_queue = queue.Queue() +# BirdWeather Soundscape ID caching +bw_soundscape_submission_id_cache = list() +bw_soundscape_submission_id_cache_limit = 40 +# +# Retry on these response codes +bw_request_retry_on_status = [404, 429, 500, 502, 503, 504] 
+bw_default_request_timeout = 10 +bw_default_post_timeout = 6 * bw_default_request_timeout +# Stop processing once we hit the max number of tries, API might be down so limit the retries, together with the +# bw_default_request_timeout the total retry period is 60 seconds without taking the backoff time between requests into account +bw_request_max_retries = 6 +# Used in an exponential calculation to provide the number of seconds to wait before making the request again +bw_request_backoff_factor = 3.5 + +# DEBUG flag to enable debug output in select functions +debug_birdweather_submissions = False + try: server.bind(ADDR) except BaseException: @@ -302,6 +324,7 @@ def analyzeAudioData(chunks, lat, lon, week, sensitivity, overlap,): for x in range(len(p)): if "Human" in p[x][0]: HUMAN_DETECTED = True + break # Save result and timestamp pred_end = pred_start + 3.0 @@ -618,10 +641,252 @@ def handle_client(conn, addr): conn.close() +def birdweather_submit(bw_submission_data): + soundscape_id = None + # + extra_debug_output = '' + + # Grab the URL and sound data form the supplied dictionary + # Soundscape POST data + soundscape_url = bw_submission_data.pop('soundscape_url') + wave_sound_data = bw_submission_data.pop('gzip_wav_data') + soundscape_filename = bw_submission_data.pop('soundscape_filename') + # Detection post data + detection_url = bw_submission_data.pop('detection_url') + detection_post_json = bw_submission_data.pop('detection_post_json') + + if debug_birdweather_submissions: + print(f'BirdWeather Submission:: DEBUG:: URL: {soundscape_url}', flush=True) + + ################################## + # SOUNDSCAPE UPLOAD ############# + ################################## + # Loop for the max number of retries + for ss_p_rt in range(bw_request_max_retries): + extra_debug_output = '' + # Don't calculate or sleep on the first loop as this is the initial attempt + if ss_p_rt > 0: + # Calculate the backoff time before making another request + request_backoff_time = 
bw_request_backoff_factor * (2 ** (ss_p_rt - 1)) + # We're retrying, retry after the calculated backoff time + print(f'BirdWeather Submission Error:: Retrying after {request_backoff_time}s, Retry ({ss_p_rt} of {bw_request_max_retries})', flush=True) + time.sleep(request_backoff_time) + + try: + # First see if we've already submitted this wave file and get the soundscape id for it + find_existing_soundscape = birdweather_soundscape_id_cache('search', soundscape_filename) + + # Didn't find a soundscape id for the file we're processing, upload it to Birdweather and cache it's soundscape id + if not find_existing_soundscape['soundscape_found']: + if debug_birdweather_submissions: + print(f'BirdWeather Submission:: Did not find soundscape {soundscape_filename} in cache, Posting soundscape to BirdWeather', flush=True) + + # Submit the soundscape submission + soundscape_async_response = requests.post(url=soundscape_url, data=wave_sound_data, + headers={'Content-Type': 'application/octet-stream', + 'Content-Encoding': 'gzip'}, + timeout=bw_default_post_timeout) + # Raise a error if response is not 2XX + soundscape_async_response.raise_for_status() + + # Spit out the whole dict response if debugging + if debug_birdweather_submissions: + print(f'BirdWeather Submission:: DEBUG:: Soundscape POST - RESPONSE: {soundscape_async_response}', flush=True) + + # Extract some data + soundscape_response_json = soundscape_async_response.json() + # Get the soundscape id for the soundscape uploaded + soundscape_id = soundscape_response_json['soundscape']['id'] + + # Cache the soundscape id for corresponding to the filename the wave uploaded + birdweather_soundscape_id_cache('add', soundscape_filename, soundscape_id) + + print(f"BirdWeather Submission:: Soundscape Successfully Uploaded - status:{soundscape_async_response.status_code} soundscape_id:{soundscape_id}", flush=True) + else: + # We found the soundscape filename and the Birdweather Soundscape ID for it, soundscape considered 
uploaded & use the ID for the detection + soundscape_id = find_existing_soundscape['soundscape_id'] + if debug_birdweather_submissions: + print(f"BirdWeather Submission:: Found Existing Soundscape in cache for {soundscape_filename} - using soundscape_id:{soundscape_id}", flush=True) + + # Break the loop, if we reach here then were no exceptions and the detection posted successfully + break + except (requests.exceptions.ConnectionError, requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout) as conn_exec: + print(f"BirdWeather Submission Error:: Soundscape POST - Connection Error! - {conn_exec}", flush=True) + continue + except requests.exceptions.RequestException as request_exc: + # Check if the status code is one that we can try on + if request_exc.response.status_code in bw_request_retry_on_status: + + if debug_birdweather_submissions: + extra_debug_output = f' - {request_exc.response.reason} - {request_exc}' + + print(f"BirdWeather Submission Error:: Soundscape POST - HTTP Request Exception! {extra_debug_output}", flush=True) + continue + else: + print(f"BirdWeather Submission Error:: Soundscape POST - HTTP Request Exception! 
Cannot retry - {request_exc}", flush=True) + # break the loop on non-retryable status + break + except (requests.exceptions.JSONDecodeError, requests.exceptions.InvalidJSONError) as json_error_exec: + print(f'BirdWeather Submission Error:: Soundscape POST - Something went wrong decoding JSON data - {json_error_exec}', flush=True) + except BaseException as ss_ex: + print(f'BirdWeather Submission Error:: Soundscape POST - Something went wrong - {ss_ex}', flush=True) + + ################################## + # DETECTION UPLOAD ############# + ################################## + # Loop for the max number of retries + for detect_p_rt in range(bw_request_max_retries): + extra_debug_output = '' + # Don't calculate or sleep on the first loop as this is the initial attempt + if detect_p_rt > 0: + # Calculate the backoff time before making another request + request_backoff_time = bw_request_backoff_factor * (2 ** (detect_p_rt - 1)) + # We're retrying, retry after the calculated backoff time + print(f'BirdWeather Submission Error:: Retrying after {request_backoff_time}s, Retry ({detect_p_rt} of {bw_request_max_retries})', flush=True) + time.sleep(request_backoff_time) + + # We need to substitute in the soundscape_id into the detection_post_json data, since it the ID isn't available until the soundscape is uploaded + # and because we submitted the full json data with a placeholder set for the soundscape_id + detection_post_json = detection_post_json.replace("{{soundscape_id}}", str(soundscape_id)) + + # Some debugging output if needed + if debug_birdweather_submissions: + print(f'BirdWeather Submission:: DEBUG:: Detection POST - detection_url: {detection_url} - detection_json: {detection_post_json} - Soundscape_ID: {soundscape_id}', flush=True) + + # Submit the detection + try: + # POST detection to server + detection_async_response = requests.post(detection_url, + json=json.loads(detection_post_json), + timeout=bw_default_request_timeout) + # Raise a error if response is not 
2XX + detection_async_response.raise_for_status() + + # Spit out the whole dict response if debugging + if debug_birdweather_submissions: + print(f'BirdWeather Submission:: DEBUG:: Detection POST - RESPONSE: {detection_async_response}', flush=True) + + # Extract some data + detection_response_status_json = detection_async_response.json() + + # Check the response + # Extract the bird detection info to display in the output + bird_detection_string = "N/A" + if 'detection' in detection_response_status_json: + bird_detection_name = detection_response_status_json['detection']['species']['commonName'] + bird_detection_confidence = detection_response_status_json['detection']['confidence'] + bird_detection_timestamp = datetime.datetime.fromisoformat( + detection_response_status_json['detection']['timestamp']) + bird_detection_time = bird_detection_timestamp.time() + bird_detection_string = f"- {bird_detection_time}/{bird_detection_name}/{bird_detection_confidence}" + + if debug_birdweather_submissions: + # Add in the JSON response if debugging just in case we might want to view it + extra_debug_output = f'- json:{detection_response_status_json}' + + print(f"BirdWeather Submission:: Detection Successfully Uploaded - status:{detection_async_response.status_code} {bird_detection_string} {extra_debug_output}", flush=True) + + # Break the loop, if we reach here then were no exceptions and the detection posted successfully + break + except (requests.exceptions.ConnectionError, requests.exceptions.ConnectTimeout, requests.exceptions.ReadTimeout) as conn_exec: + print(f"BirdWeather Submission Error:: Detection POST - Connection Error! 
- {conn_exec}", flush=True) + continue + except requests.exceptions.RequestException as request_exc: + # Check if the status code is one that we can try on + req_status_code = request_exc.response.status_code + if req_status_code in bw_request_retry_on_status: + + if debug_birdweather_submissions: + extra_debug_output = f'- {request_exc.response.reason} - {request_exc}' + + print(f"BirdWeather Submission Error:: Detection POST - HTTP Request Exception! {extra_debug_output}", flush=True) + continue + else: + print( + f"BirdWeather Submission Error:: Detection POST - HTTP Request Exception! Cannot retry - {request_exc}", + flush=True) + # break the loop on non-retryable status + break + except (requests.exceptions.JSONDecodeError, requests.exceptions.InvalidJSONError) as json_error_exec: + print( + f'BirdWeather Submission Error:: Detection POST - Something went wrong decoding JSON data - {json_error_exec}', + flush=True) + except BaseException as dp_ex: + print(f'BirdWeather Submission Error:: Detection POST - Something went wrong - {dp_ex}', flush=True) + + +def birdweather_submission_processor(): + if debug_birdweather_submissions: + print("Starting: birdweather_submission_processor thread") + + # Loop over the queue containing the data used for the BirdWeather submissions + while True: + # Get the BirdWeather submission data, this is a dictionary containing the necessary data + bw_submission_data = bw_submission_queue.get() + + if debug_birdweather_submissions: + print( + f"Processing: Soundscape:{bw_submission_data['soundscape_url']}, Detection:{bw_submission_data['detection_url']}, Detection_Data:{bw_submission_data['detection_post_json']}") + + # Perform the submission + birdweather_submit(bw_submission_data) + # Processing finished so the task is now done + bw_submission_queue.task_done() + + +def birdweather_soundscape_id_cache(mode, soundscape_filename, bw_soundscape_id=None): + global bw_soundscape_submission_id_cache + ss_id_was_found = False + 
ss_id_to_return = 0 + + if debug_birdweather_submissions: + print(f"birdweather_soundscape_id_cache - mode:{mode} - for {soundscape_filename}") + + if mode == 'search': + # Search the list for the filename + for soundscape_submission in bw_soundscape_submission_id_cache: + this_ss_filename = soundscape_submission['soundscape_filename'] + this_ss_id = soundscape_submission['soundscape_id'] + # If this soundscape filename matches the one we're searching for, return the bw soundscape id + if this_ss_filename == soundscape_filename: + ss_id_was_found = True + ss_id_to_return = this_ss_id + if debug_birdweather_submissions: + print( + f"birdweather_soundscape_id_cache - Found {soundscape_filename} with soundscape_id:{ss_id_to_return}") + break + + return {'soundscape_found': ss_id_was_found, 'soundscape_id': ss_id_to_return} + elif mode == 'add': + # If a filename AND soundscape ID has been supplied then we want to store it in the list + if soundscape_filename is not None and bw_soundscape_id is not None: + # Check the length of the list first + if len(bw_soundscape_submission_id_cache) >= bw_soundscape_submission_id_cache_limit: + # Remove the first item in the list before inserting a new item + del bw_soundscape_submission_id_cache[0] + + # Create a new dict containing the appropriate data + new_ss_submission = dict() + new_ss_submission['soundscape_filename'] = soundscape_filename + new_ss_submission['soundscape_id'] = bw_soundscape_id + # Append it to the list + bw_soundscape_submission_id_cache.append(new_ss_submission) + + if debug_birdweather_submissions: + print( + f"birdweather_soundscape_id_cache - Inserting entry soundscape_filename:{soundscape_filename} - soundscape_id:{bw_soundscape_id}") + + def start(): # Load model global INTERPRETER, INCLUDE_LIST, EXCLUDE_LIST INTERPRETER = loadModel() + + # Run the BirdWeather submission queue processor in a thread + bw_submission = threading.Thread(target=birdweather_submission_processor) + 
bw_worker_threads.append(bw_submission) + bw_submission.start() + server.listen() # print(f"[LISTENING] Server is listening on {SERVER}") while True: From a68a3508339a8634efdcdf2977d189a85af22e33 Mon Sep 17 00:00:00 2001 From: jaredb7 Date: Fri, 15 Sep 2023 11:22:48 +1000 Subject: [PATCH 2/2] Updated to include changes in commit #479bd27 & BW improvement added --- scripts/server.py | 98 +++++++++++++++++++++++++++++------------------ 1 file changed, 61 insertions(+), 37 deletions(-) diff --git a/scripts/server.py b/scripts/server.py index 88506ac2c..247324434 100755 --- a/scripts/server.py +++ b/scripts/server.py @@ -585,55 +585,79 @@ def handle_client(conn, addr): if birdweather_id != "99999": try: - if soundscape_uploaded is False: - # POST soundscape to server - soundscape_url = 'https://app.birdweather.com/api/v1/stations/' + \ - birdweather_id + \ - '/soundscapes' + \ - '?timestamp=' + \ - current_iso8601 - - with open(args.i, 'rb') as f: - wav_data = f.read() - gzip_wav_data = gzip.compress(wav_data) - response = requests.post(url=soundscape_url, data=gzip_wav_data, headers={'Content-Type': 'application/octet-stream', - 'Content-Encoding': 'gzip'}) - print("Soundscape POST Response Status - ", response.status_code) - sdata = response.json() - soundscape_id = sdata['soundscape']['id'] - soundscape_uploaded = True + # BirdWeather submissions are now added to a queue and processed in a seperate thread (via birdweather_submission_processor and birdweather_submit) + # We still collect and compile all the URLS and JSON to pass through + + # POST soundscape to server + # Build the URL used when uploading the soundscape + soundscape_url = 'https://app.birdweather.com/api/v1/stations/' + \ + birdweather_id + \ + '/soundscapes' + \ + '?timestamp=' + \ + current_iso8601 + + # Get the filename for this soundscape file + soundscape_filename = Path(args.i).stem + + # Extract the audio + with open(args.i, 'rb') as f: + wav_data = f.read() + + # Compress the audio data using 
gzip + gzip_wav_data = gzip.compress(wav_data) # POST detection to server + # Build the URL and all other data (to be added to a json array used when uploading the detection data detection_url = "https://app.birdweather.com/api/v1/stations/" + birdweather_id + "/detections" start_time = d.split(';')[0] end_time = d.split(';')[1] - post_begin = "{ " + # now_p_start = now + datetime.timedelta(seconds=float(start_time)) current_iso8601 = now_p_start.astimezone(get_localzone()).isoformat() - post_timestamp = "\"timestamp\": \"" + current_iso8601 + "\"," - post_lat = "\"lat\": " + str(args.lat) + "," - post_lon = "\"lon\": " + str(args.lon) + "," - post_soundscape_id = "\"soundscapeId\": " + str(soundscape_id) + "," - post_soundscape_start_time = "\"soundscapeStartTime\": " + start_time + "," - post_soundscape_end_time = "\"soundscapeEndTime\": " + end_time + "," - post_commonName = "\"commonName\": \"" + entry[0].split('_')[1].split("/")[0] + "\"," - post_scientificName = "\"scientificName\": \"" + entry[0].split('_')[0] + "\"," + # Build a dictionary that represents the JSON data that will be posted to BirdWeather + # Instead of hand crafting the JSON string, just to ease maintainability + post_data = dict() + post_data['timestamp'] = current_iso8601 + post_data['lat'] = str(args.lat) + post_data['lon'] = str(args.lon) + post_data['soundscapeId'] = "{{soundscape_id}}" + post_data['soundscapeStartTime'] = start_time + post_data['soundscapeEndTime'] = end_time + post_data['commonName'] = entry[0].split('_')[1].split("/")[0] + post_data['scientificName'] = entry[0].split('_')[0] + + # Determine the algorithm used if model == "BirdNET_GLOBAL_6K_V2.4_Model_FP16": - post_algorithm = "\"algorithm\": " + "\"2p4\"" + "," + post_data['algorithm'] = "2p4" else: - post_algorithm = "\"algorithm\": " + "\"alpha\"" + "," + post_data['algorithm'] = "alpha" - post_confidence = "\"confidence\": " + str(entry[1]) - post_end = " }" + post_data['confidence'] = str(entry[1]) + #### + + # 
Convert the detection data dictionary into a JSON string + post_json = json.dumps(post_data) + # print(post_json) + + if debug_birdweather_submissions: + print( + f'handle_client:: debug_birdweather_submissions:: Add Birdweather queue submission with, soundscape_url:{soundscape_url} - wave_data:{sys.getsizeof(gzip_wav_data) / 1000}KB - detection_url:{detection_url} - json_detection_data:{post_json}') + + # Create a dictionary containing all the data we need to posts a submission to BirdWeather + submission_data = dict() + submission_data['soundscape_url'] = soundscape_url + submission_data['soundscape_filename'] = soundscape_filename + submission_data['gzip_wav_data'] = gzip_wav_data + submission_data['detection_url'] = detection_url + submission_data['detection_post_json'] = post_json + + # Add it to the queue to be processed + bw_submission_queue.put_nowait(submission_data) + + except BaseException as b_exec: + print(f"ERROR: Cannot POST to BirdWeather right now - {b_exec}") - post_json = post_begin + post_timestamp + post_lat + post_lon + post_soundscape_id + post_soundscape_start_time + \ - post_soundscape_end_time + post_commonName + post_scientificName + post_algorithm + post_confidence + post_end - print(post_json) - response = requests.post(detection_url, json=json.loads(post_json)) - print("Detection POST Response Status - ", response.status_code) - except BaseException: - print("Cannot POST right now") conn.send(myReturn.encode(FORMAT)) # time.sleep(3)