Add updated script #1155
base: master
@@ -14,8 +14,11 @@
 import warnings
 warnings.simplefilter(action='ignore', category=UserWarning)

 from dotenv import load_dotenv
 load_dotenv()

 # do you want to delete everything currently in the Carto table when you run this script?
 CLEAR_TABLE_FIRST = True
+REFRESH_ALL_POINT_DATA = False

 # Carto username and API key for account where we will store the data
 CARTO_USER = os.getenv('CARTO_USER')
@@ -77,7 +80,8 @@
 # Important! Before testing this script:
 # Please change this ID OR comment out the getLayerIDs(DATASET_ID) function in the script below
 # Failing to do so will overwrite the last update date on a different dataset on Resource Watch
-DATASET_ID = 'ea208a8b-4559-434b-82ee-95e041596a3a'
+#DATASET_ID = 'ea208a8b-4559-434b-82ee-95e041596a3a'

Review comment (on lines -80 to +84): Is there a reason this is commented out?

 '''
 FUNCTIONS FOR ALL DATASETS
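If the goal here is just to avoid touching the production dataset while testing, one alternative to commenting the ID out is an explicit opt-in flag. A minimal sketch, assuming a hypothetical `UPDATE_RW` environment variable and stubbing the Resource Watch helper (neither is part of this script):

```python
import os

DATASET_ID = 'ea208a8b-4559-434b-82ee-95e041596a3a'

# hypothetical opt-in flag; not defined in the original script
UPDATE_RW = os.getenv('UPDATE_RW', 'false').lower() == 'true'

def getLayerIDs(dataset_id):
    # stub standing in for the real Resource Watch helper
    print(f'would update layers for dataset {dataset_id}')

if UPDATE_RW:
    # only touch the live Resource Watch dataset when explicitly enabled
    getLayerIDs(DATASET_ID)
```

That way the ID never has to be edited for local test runs.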
@@ -158,7 +162,7 @@ def checkCreateTable(table, schema, id_field, time_field=''):
 They should all be checked because their format likely will need to be changed.
 '''

-def fetch_data(src_url):
+def fetch_all_point_data(src_url):
     '''
     Fetch ACLED data via the API
     INPUT   src_url: the url to fetch data from (string)
@@ -173,16 +177,20 @@ def fetch_data(src_url):
     # initialize at 0 so that we can start pulling from page 1 in the loop
     page = 0

+    # number of records to return per page
+    limit = 5000

     # length (number of rows) of new_data
     # initialize at 1 so that the while loop works during first step
     new_count = 1

     # create an empty list to store ids of data
     new_ids = []
     # create an empty dataframe to store data

Review comment: Should this comment be removed?
Review comment: Or updated to say we are creating an empty json array?

-    data_df = pd.DataFrame()
-    # get and parse each page; stop when no new results or max pages
+    # process up to MIN_PAGES even if there are no new results from them
+    accumulated_json = {'data': []}

     while page <= MIN_PAGES or new_count and page < MAX_PAGES:
         try:
             # increment page number in every loop
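For readers tracing the loop that follows: the unchanged `while` condition relies on Python operator precedence and parses as `page <= MIN_PAGES or (new_count and page < MAX_PAGES)`. A self-contained sketch of the accumulate-then-normalize pattern the new code adopts, with a hypothetical `fetch_page` stub in place of the ACLED request and illustrative page limits:

```python
import pandas as pd

MIN_PAGES, MAX_PAGES = 2, 400  # illustrative values, not the script's constants

def fetch_page(page):
    # hypothetical stub standing in for the ACLED API call
    return {'data': [] if page > 3 else [{'event_id_cnty': f'ID{page}'}]}

accumulated_json = {'data': []}
page, new_count = 0, 1
# always process at least MIN_PAGES pages, then keep going while results
# arrive, up to MAX_PAGES; parentheses added per Python precedence
while page <= MIN_PAGES or (new_count and page < MAX_PAGES):
    page += 1
    ds = fetch_page(page)
    new_count = len(ds['data'])
    accumulated_json['data'].extend(ds['data'])

# flatten all accumulated records into one dataframe in a single pass,
# which avoids the deprecated DataFrame.append-in-a-loop pattern
data_df = pd.json_normalize(accumulated_json, record_path=['data'])
print(len(data_df))  # 3 rows with the stub above
```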
@@ -191,39 +199,27 @@ def fetch_data(src_url):
             # create an empty list to store data
             new_rows = []
             # generate the url and pull data for this page
-            r = requests.get(src_url.format(key=ACLED_KEY, user=ACLED_USER, date_start=date_start, date_end=date_end, page=page))
-            # columns of the pandas dataframe
-            """ cols = ["data_id", "event_date", "year", "time_precision", "event_type", "sub_event_type", "actor1", "assoc_actor_1", "inter1",
-                        "actor2", "assoc_actor_2", "inter2", "interaction", "country", "iso3", "region", "admin1", "admin2", "admin3", "location",
-                        "geo_precision", "time_precision", "source", "source_scale", "notes", "fatalities", "latitude", "longitude"] """
-            cols = ['event_id_cnty','event_type', 'latitude', 'longitude']
-
-            # pull data from request response json
-            for obs in r.json()['data']:
-                # append the id to the list for sending to Carto
-                new_ids.append(obs['event_id_cnty'])
-                # create an empty list to store data from this row
-                row = []
-                # go through each column in the Carto table
-                for col in cols:
-                    try:
-                        # add data for remaining fields to the list of data from this row
-                        row.append(obs[col])
-                    except:
-                        logging.debug('{} not available for this row'.format(col))
-                        # if the column we are trying to retrieve doesn't exist in the source data, store blank
-                        row.append('')
-
-                # add the list of values from this row to the list of new data
-                new_rows.append(row)
-
-            # number of new rows added in this page
-            new_count = len(new_rows)
-            # append the new rows to the pandas dataframe
-            data_df = data_df.append(pd.DataFrame(new_rows, columns=cols))
+            current_url = src_url.format(key=ACLED_KEY, user=ACLED_USER, date_start=date_start, date_end=date_end, page=page, limit=limit)
+
+            with requests.get(current_url) as r:
+                print(current_url)
+                if r.ok:
+                    ds = json.loads(r.content)
+                else:
+                    raise ValueError(f'API request failed: {current_url}')
+            assert 'data' in ds
+            new_count = len(ds['data'])
+
+            accumulated_json['data'].extend(ds['data'])
         except:
             logging.error('Could not fetch or process page {}'.format(page))

+    data_df = pd.json_normalize(accumulated_json, record_path=['data'])
+
+    #TODO: cols = ['event_id_cnty','event_type', 'latitude', 'longitude']

Review comment: What needs to be done here?

+    # drop duplicate records by event_id_cnty
+    data_df = data_df.drop_duplicates(['event_id_cnty']).iloc[:, 1:]
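On the `#TODO: cols = ...` question above: one reading, which is an assumption rather than anything stated in the PR, is that the normalized dataframe still needs to be cut down to the four columns the Carto table uses, as the removed code did. A runnable sketch with sample records standing in for the API response:

```python
import pandas as pd

# sample rows standing in for the normalized ACLED response
data_df = pd.DataFrame([
    {'event_id_cnty': 'NGA1001', 'event_type': 'Riots',
     'latitude': 9.08, 'longitude': 8.68, 'notes': 'extra source field'},
])

# assumed intent of the TODO: keep only the columns the Carto table needs
cols = ['event_id_cnty', 'event_type', 'latitude', 'longitude']
# as in the removed code, store a blank when a column is missing from the source
for col in cols:
    if col not in data_df.columns:
        data_df[col] = ''
data_df = data_df[cols]
```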
@@ -234,6 +230,35 @@ def fetch_data(src_url):

     return data_gdf

+#TODO: New function to save source point data output to a Carto table. Run if REFRESH_ALL_POINT_DATA is True
+
+#TODO: New update_point_data function to update existing point data in the Carto table by adding new records and deleting removed ones. Run if REFRESH_ALL_POINT_DATA is False and return all records from Carto as data_gdf.
+
+r = cartosql.getFields('event_id_cnty', CARTO_GEO, f='csv', post=True, user=CARTO_USER, key=CARTO_KEY)

Review comment: Is this needed here?
+def update_point_data(data_gdf):
+    '''
+    Update point data in the Carto table by adding new records and deleting removed ones
+    INPUT   data_gdf: geopandas dataframe storing the point ACLED data (geopandas dataframe)
+    RETURN  data_gdf: geopandas dataframe storing the point ACLED data (geopandas dataframe)
+    '''
+    # get the ids of records already in the Carto table storing the point data
+    r = cartosql.getFields('event_id_cnty', CARTO_GEO, f='csv', post=True, user=CARTO_USER, key=CARTO_KEY)
+
+    # turn the response into a list of ids
+    geo_id = r.text.split('\r\n')[1:-1]
+
+    # create an empty list to store the ids of rows uploaded to Carto
+    uploaded_ids = []
+
+    return data_gdf

Review comment (on lines +239 to +259): It doesn't look like this was finished maybe?
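Agreed that it reads as unfinished: `geo_id` and `uploaded_ids` are built but never used, and the input is returned unchanged. A sketch of one way the reconciliation could be completed; the `cartosql.deleteRows` call and the reuse of `processNewData` for inserts are assumptions modeled on other Resource Watch scripts, not verified against this one:

```python
def update_point_data(data_gdf):
    '''
    Update point data in the Carto table by adding new records and deleting removed ones
    '''
    # ids currently stored in the Carto table
    r = cartosql.getFields('event_id_cnty', CARTO_GEO, f='csv', post=True,
                           user=CARTO_USER, key=CARTO_KEY)
    carto_ids = set(r.text.split('\r\n')[1:-1])

    # ids present in the freshly fetched source data
    source_ids = set(data_gdf['event_id_cnty'])

    # delete records that no longer appear in the source data
    stale_ids = carto_ids - source_ids
    if stale_ids:
        where = "event_id_cnty IN ({})".format(
            ','.join("'{}'".format(uid) for uid in stale_ids))
        cartosql.deleteRows(CARTO_GEO, where, user=CARTO_USER, key=CARTO_KEY)

    # keep only records Carto does not have yet and reuse the script's
    # existing upload path for the insert step
    new_gdf = data_gdf[~data_gdf['event_id_cnty'].isin(carto_ids)]
    processNewData(new_gdf)

    return data_gdf
```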

 def processNewData(data_gdf):
     '''
     Process and upload new data
@@ -558,10 +583,16 @@ def main():
     existing_ids = checkCreateTable(CARTO_TABLE, CARTO_SCHEMA, UID_FIELD)

     # fetch the ACLED data from the past 12 months
-    data_gdf = fetch_data(SOURCE_URL)
+    #TODO: only run if REFRESH_ALL_POINT_DATA is True
+    #TODO: run the update_point_data function if REFRESH_ALL_POINT_DATA is False
+    if REFRESH_ALL_POINT_DATA is True:
+        data_gdf = fetch_all_point_data(SOURCE_URL)
+    else:
+        data_gdf = update_point_data(SOURCE_URL)

     # clear the table before starting, if specified
-    if CLEAR_TABLE_FIRST:
+    if REFRESH_ALL_POINT_DATA:
         logging.info("clearing table")
         # if the table exists
         if cartosql.tableExists(CARTO_TABLE, user=CARTO_USER, key=CARTO_KEY):
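One thing worth flagging in the branch above: `update_point_data(SOURCE_URL)` passes a URL, but the function's docstring expects a geodataframe. A sketch of how the wiring might be intended, under the assumption that the incremental path still fetches a recent window of data before reconciling against Carto:

```python
if REFRESH_ALL_POINT_DATA:
    # full refresh: pull every point from the API
    data_gdf = fetch_all_point_data(SOURCE_URL)
else:
    # incremental: fetch the recent window, then add/delete against Carto
    recent_gdf = fetch_all_point_data(SOURCE_URL)
    data_gdf = update_point_data(recent_gdf)
```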
@@ -584,4 +615,5 @@ def main():
     # Update Resource Watch
     updateResourceWatch(num_new)

-    logging.info('SUCCESS')
+    logging.info('SUCCESS')

Review comment: I think we are mostly not including this in the scripts, though it would be nice to come up with a more standard way of handling this.