From 0dbcd1a902d02c732455e1eb8f4668d2e1968a08 Mon Sep 17 00:00:00 2001
From: Avra Saslow
Date: Wed, 12 Jul 2023 07:22:44 -0700
Subject: [PATCH] Add updated script

---
 .../contents/src/__init__.py | 108 ++++++++++++------
 1 file changed, 70 insertions(+), 38 deletions(-)

diff --git a/soc_016_conflict_protest_events/contents/src/__init__.py b/soc_016_conflict_protest_events/contents/src/__init__.py
index 766b4e83..f1025771 100644
--- a/soc_016_conflict_protest_events/contents/src/__init__.py
+++ b/soc_016_conflict_protest_events/contents/src/__init__.py
@@ -14,8 +14,11 @@ import warnings
 warnings.simplefilter(action='ignore', category=UserWarning)
 
+from dotenv import load_dotenv
+load_dotenv()
+
 # do you want to delete everything currently in the Carto table when you run this script?
-CLEAR_TABLE_FIRST = True
+REFRESH_ALL_POINT_DATA = False
 
 # Carto username and API key for account where we will store the data
 CARTO_USER = os.getenv('CARTO_USER')
@@ -77,7 +80,8 @@
 # Important! Before testing this script:
 # Please change this ID OR comment out the getLayerIDs(DATASET_ID) function in the script below
 # Failing to do so will overwrite the last update date on a different dataset on Resource Watch
-DATASET_ID = 'ea208a8b-4559-434b-82ee-95e041596a3a'
+
+#DATASET_ID = 'ea208a8b-4559-434b-82ee-95e041596a3a'
 
 '''
 FUNCTIONS FOR ALL DATASETS
@@ -158,7 +162,7 @@ def checkCreateTable(table, schema, id_field, time_field=''):
 They should all be checked because their format likely will need to be changed.
 '''
 
-def fetch_data(src_url):
+def fetch_all_point_data(src_url):
     '''
     Fetch ACLED data via the API
     INPUT   src_url: the url to fetch data from (string)
@@ -173,6 +177,9 @@ def fetch_data(src_url):
     # initialize at 0 so that we can start pulling from page 1 in the loop
     page = 0
 
+    # number of records to return per page
+    limit = 5000
+
     # length (number of rows) of new_data
     # initialize at 1 so that the while loop works during first step
     new_count = 1
@@ -180,9 +187,10 @@
     # create an empty list to store ids of data
     new_ids = []
     # create an empty dataframe to store data
-    data_df = pd.DataFrame()
 
     # get and parse each page; stop when no new results or max pages
     # process up to MIN_PAGES even if there are no new results from them
+    accumulated_json = {'data': []}
+
     while page <= MIN_PAGES or new_count and page < MAX_PAGES:
         try:
             # increment page number in every loop
             page += 1
             # create an empty list to store data
             new_rows = []
             # generate the url and pull data for this page
-            r = requests.get(src_url.format(key=ACLED_KEY, user=ACLED_USER, date_start=date_start, date_end=date_end, page=page))
-            # columns of the pandas dataframe
-            """ cols = ["data_id", "event_date", "year", "time_precision", "event_type", "sub_event_type", "actor1", "assoc_actor_1", "inter1",
-            "actor2", "assoc_actor_2", "inter2", "interaction", "country", "iso3", "region", "admin1", "admin2", "admin3", "location",
-            "geo_precision", "time_precision", "source", "source_scale", "notes", "fatalities", "latitude", "longitude"] """
-            cols = ['event_id_cnty','event_type', 'latitude', 'longitude']
-
-            # pull data from request response json
-            for obs in r.json()['data']:
-                # append the id to the list for sending to Carto
-                new_ids.append(obs['event_id_cnty'])
-                # create an empty list to store data from this row
-                row = []
-                # go through each column in the Carto table
-                for col in cols:
-                    try:
-                        # add data for remaining fields to the list of data from this row
-                        row.append(obs[col])
-                    except:
-                        logging.debug('{} not available for this row'.format(col))
-                        # if the column we are trying to retrieve doesn't exist in the source data, store blank
-                        row.append('')
-
-                # add the list of values from this row to the list of new data
-                new_rows.append(row)
-
-            # number of new rows added in this page
-            new_count = len(new_rows)
-            # append the new rows to the pandas dataframe
-            data_df = data_df.append(pd.DataFrame(new_rows, columns=cols))
-
+            current_url = src_url.format(key=ACLED_KEY, user=ACLED_USER, date_start=date_start, date_end=date_end, page=page, limit=limit)
+
+            with requests.get(current_url) as r:
+                logging.debug(current_url)
+                if r.ok:
+                    ds = json.loads(r.content)
+                else:
+                    raise ValueError(f'API request failed: {current_url}')
+                assert 'data' in ds
+                new_count = len(ds['data'])
+
+                accumulated_json['data'].extend(ds['data'])
         except:
             logging.error('Could not fetch or process page {}'.format(page))
+
+    data_df = pd.json_normalize(accumulated_json, record_path=['data'])
+
+    #TODO: cols = ['event_id_cnty','event_type', 'latitude', 'longitude']
+
     # drop duplicate records by event_id_cnty
     data_df = data_df.drop_duplicates(['event_id_cnty']).iloc[:, 1:]
@@ -234,6 +230,35 @@
     return data_gdf
 
+#TODO: New function to save the source point data output to a Carto table. Run if REFRESH_ALL_POINT_DATA is True.
+
+#TODO: New update_point_data function to update existing point data in the Carto table by adding new records and deleting removed ones. Run if REFRESH_ALL_POINT_DATA is False and return all records from Carto as data_gdf.
+
+def update_point_data(data_gdf):
+    '''
+    Update point data in the Carto table by adding new records and removing deleted ones
+    INPUT   data_gdf: geopandas dataframe storing the point ACLED data (geopandas dataframe)
+    RETURN  data_gdf: geopandas dataframe storing the point ACLED data (geopandas dataframe)
+    '''
+    # get the ids of events already stored in the Carto table
+    r = cartosql.getFields('event_id_cnty', CARTO_GEO, f='csv', post=True, user=CARTO_USER, key=CARTO_KEY)
+
+    # turn the response into a list of ids
+    geo_id = r.text.split('\r\n')[1:-1]
+
+    # create an empty list to store the ids of rows uploaded to Carto
+    uploaded_ids = []
+
+    return data_gdf
+
 def processNewData(data_gdf):
     '''
     Process and upload new data
@@ -558,10 +583,16 @@ def main():
     existing_ids = checkCreateTable(CARTO_TABLE, CARTO_SCHEMA, UID_FIELD)
 
     # fetch the ACLED data in the past 12 month
-    data_gdf = fetch_data(SOURCE_URL)
+    #TODO: only run fetch_all_point_data if REFRESH_ALL_POINT_DATA is True
+    #TODO: run the update_point_data function if REFRESH_ALL_POINT_DATA is False
+
+    if REFRESH_ALL_POINT_DATA:
+        data_gdf = fetch_all_point_data(SOURCE_URL)
+    else:
+        data_gdf = update_point_data(SOURCE_URL)
 
     # clear the table before starting, if specified
-    if CLEAR_TABLE_FIRST:
+    if REFRESH_ALL_POINT_DATA:
         logging.info("clearing table")
         # if the table exists
         if cartosql.tableExists(CARTO_TABLE, user=CARTO_USER, key=CARTO_KEY):
@@ -584,4 +615,5 @@ def main():
     # Update Resource Watch
     updateResourceWatch(num_new)
 
-    logging.info('SUCCESS')
\ No newline at end of file
+    logging.info('SUCCESS')
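
Note: update_point_data() is still a stub, and main() currently passes SOURCE_URL into it even though its docstring expects a geodataframe. Below is a minimal sketch of how the incremental branch could be completed, assuming main() is rewired to call data_gdf = update_point_data(fetch_all_point_data(SOURCE_URL)). It relies on the same module-level globals as the stub (CARTO_GEO, CARTO_USER, CARTO_KEY) and reuses cartosql.getFields() exactly as the stub does; cartosql.deleteRows() is used by other Resource Watch near-real-time scripts, but treat its exact signature here as an assumption rather than a confirmed API.

    def update_point_data(data_gdf):
        '''
        Update point data in the Carto table: delete records that no longer
        appear at the source and return only the rows Carto has not seen yet,
        so processNewData() can upload them.
        INPUT   data_gdf: geopandas dataframe of freshly fetched ACLED points
        RETURN  geopandas dataframe containing only the new records
        '''
        # ids already stored in Carto; the first csv line is the header and
        # the final element after the split is an empty string, so drop both
        r = cartosql.getFields('event_id_cnty', CARTO_GEO, f='csv', post=True,
                               user=CARTO_USER, key=CARTO_KEY)
        carto_ids = set(r.text.split('\r\n')[1:-1])

        # ids present in the latest pull from the ACLED API
        source_ids = set(data_gdf['event_id_cnty'])

        # delete records that have disappeared from the source
        # (cartosql.deleteRows(table, where, ...) is assumed here)
        stale_ids = carto_ids - source_ids
        if stale_ids:
            where = "event_id_cnty in ({})".format(
                ','.join("'{}'".format(uid) for uid in stale_ids))
            cartosql.deleteRows(CARTO_GEO, where, user=CARTO_USER, key=CARTO_KEY)

        # keep only genuinely new rows for upload
        return data_gdf[~data_gdf['event_id_cnty'].isin(carto_ids)]

With something along these lines in place, a run with REFRESH_ALL_POINT_DATA = False only uploads new events and prunes deleted ones, instead of re-fetching and rebuilding the whole table on every run.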