upload_analysis_files.py
import argparse
import json
import os
from glob import glob

from core_data_modules.logging import Logger
from storage.google_cloud import google_cloud_utils
from storage.google_drive import drive_client_wrapper

from src.lib import PipelineConfiguration

log = Logger(__name__)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Uploads analysis output files to Google Drive")

    parser.add_argument("user", help="User launching this program")
    parser.add_argument("pipeline_run_mode",
                        help="Whether this pipeline run generated analysis files ('all-stages') or not "
                             "('auto-code-only')",
                        choices=["all-stages", "auto-code-only"])
    parser.add_argument("google_cloud_credentials_file_path", metavar="google-cloud-credentials-file-path",
                        help="Path to a Google Cloud service account credentials file to use to access the "
                             "credentials bucket")
    parser.add_argument("pipeline_configuration_file_path", metavar="pipeline-configuration-file-path",
                        help="Path to the pipeline configuration json file")
    parser.add_argument("run_id", metavar="run-id",
                        help="Identifier of this pipeline run")
    parser.add_argument("production_csv_input_path", metavar="production-csv-input-path",
                        help="Path to a CSV file with raw messages and demographic responses, for use in "
                             "radio show production")
    parser.add_argument("messages_csv_input_path", metavar="messages-csv-input-path",
                        help="Path to an analysis dataset CSV where messages are the unit for analysis (i.e. one "
                             "message per row)")
    parser.add_argument("individuals_csv_input_path", metavar="individuals-csv-input-path",
                        help="Path to an analysis dataset CSV where respondents are the unit for analysis (i.e. one "
                             "respondent per row, with all their messages joined into a single cell)")
    parser.add_argument("automated_analysis_input_dir", metavar="automated-analysis-input-dir",
                        help="Directory to read the automated analysis outputs from")

    args = parser.parse_args()
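
    # Unpack the command-line arguments into local variables.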
    user = args.user
    pipeline_run_mode = args.pipeline_run_mode
    google_cloud_credentials_file_path = args.google_cloud_credentials_file_path
    pipeline_configuration_file_path = args.pipeline_configuration_file_path
    run_id = args.run_id
    production_csv_input_path = args.production_csv_input_path
    messages_csv_input_path = args.messages_csv_input_path
    individuals_csv_input_path = args.individuals_csv_input_path
    automated_analysis_input_dir = args.automated_analysis_input_dir
log.info("Loading Pipeline Configuration File...")
with open(pipeline_configuration_file_path) as f:
pipeline_configuration = PipelineConfiguration.from_configuration_file(f)
Logger.set_project_name(pipeline_configuration.pipeline_name)
log.debug(f"Pipeline name is {pipeline_configuration.pipeline_name}")
    # Upload to Google Drive, if requested.
    if pipeline_configuration.drive_upload is not None:
        log.info("Downloading Google Drive service account credentials...")
        credentials_info = json.loads(google_cloud_utils.download_blob_to_string(
            google_cloud_credentials_file_path, pipeline_configuration.drive_upload.drive_credentials_file_url))
        drive_client_wrapper.init_client_from_info(credentials_info)

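        # In all-stages mode, upload the production, messages, and individuals CSVs plus all automated
        # analysis outputs; in auto-code-only mode, only the production CSV exists, so upload that alone.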
log.info("Uploading Analysis CSVs to Google Drive...")
if pipeline_run_mode == "all-stages":
production_csv_drive_dir = os.path.dirname(pipeline_configuration.drive_upload.production_upload_path)
production_csv_drive_file_name = os.path.basename(pipeline_configuration.drive_upload.production_upload_path)
drive_client_wrapper.update_or_create(production_csv_input_path, production_csv_drive_dir,
target_file_name=production_csv_drive_file_name,
target_folder_is_shared_with_me=True, recursive=True,
fix_duplicates=True)
messages_csv_drive_dir = os.path.dirname(pipeline_configuration.drive_upload.messages_upload_path)
messages_csv_drive_file_name = os.path.basename(pipeline_configuration.drive_upload.messages_upload_path)
drive_client_wrapper.update_or_create(messages_csv_input_path, messages_csv_drive_dir,
target_file_name=messages_csv_drive_file_name,
target_folder_is_shared_with_me=True, recursive=True,
fix_duplicates=True)
individuals_csv_drive_dir = os.path.dirname(pipeline_configuration.drive_upload.individuals_upload_path)
individuals_csv_drive_file_name = os.path.basename(pipeline_configuration.drive_upload.individuals_upload_path)
drive_client_wrapper.update_or_create(individuals_csv_input_path, individuals_csv_drive_dir,
target_file_name=individuals_csv_drive_file_name,
target_folder_is_shared_with_me=True, recursive=True,
fix_duplicates=True)
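            # Upload the automated analysis outputs, mirroring the local directory layout on Drive.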
            paths_to_upload = glob(f"{automated_analysis_input_dir}/*.csv")
            log.info(f"Uploading {len(paths_to_upload)} CSVs to Drive...")
            drive_client_wrapper.update_or_create_batch(
                paths_to_upload, pipeline_configuration.drive_upload.automated_analysis_dir,
                target_folder_is_shared_with_me=True, recursive=True, fix_duplicates=True)

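            # Upload the PNG graphs from the automated analysis graphs/ subdirectory.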
            paths_to_upload = glob(f"{automated_analysis_input_dir}/graphs/*.png")
            log.info(f"Uploading {len(paths_to_upload)} graphs to Drive...")
            drive_client_wrapper.update_or_create_batch(
                paths_to_upload, f"{pipeline_configuration.drive_upload.automated_analysis_dir}/graphs",
                target_folder_is_shared_with_me=True, recursive=True, fix_duplicates=True)

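            # Upload the region-level maps.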
            paths_to_upload = glob(f"{automated_analysis_input_dir}/maps/regions/*.png")
            log.info(f"Uploading {len(paths_to_upload)} region maps to Drive...")
            drive_client_wrapper.update_or_create_batch(
                paths_to_upload, f"{pipeline_configuration.drive_upload.automated_analysis_dir}/maps/regions",
                target_folder_is_shared_with_me=True, recursive=True, fix_duplicates=True)

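            # Upload the district-level maps.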
            paths_to_upload = glob(f"{automated_analysis_input_dir}/maps/districts/*.png")
            log.info(f"Uploading {len(paths_to_upload)} district maps to Drive...")
            drive_client_wrapper.update_or_create_batch(
                paths_to_upload, f"{pipeline_configuration.drive_upload.automated_analysis_dir}/maps/districts",
                target_folder_is_shared_with_me=True, recursive=True, fix_duplicates=True)

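            # Upload the Mogadishu maps.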
            paths_to_upload = glob(f"{automated_analysis_input_dir}/maps/mogadishu/*.png")
            log.info(f"Uploading {len(paths_to_upload)} Mogadishu maps to Drive...")
            drive_client_wrapper.update_or_create_batch(
                paths_to_upload, f"{pipeline_configuration.drive_upload.automated_analysis_dir}/maps/mogadishu",
                target_folder_is_shared_with_me=True, recursive=True, fix_duplicates=True)
        else:
            assert pipeline_run_mode == "auto-code-only", "pipeline run mode must be either auto-code-only or all-stages"
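            # Auto-code-only runs produce no analysis datasets, so only the production CSV is uploaded.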
            production_csv_drive_dir = os.path.dirname(pipeline_configuration.drive_upload.production_upload_path)
            production_csv_drive_file_name = os.path.basename(
                pipeline_configuration.drive_upload.production_upload_path)
            drive_client_wrapper.update_or_create(production_csv_input_path, production_csv_drive_dir,
                                                  target_file_name=production_csv_drive_file_name,
                                                  target_folder_is_shared_with_me=True, recursive=True,
                                                  fix_duplicates=True)
    else:
        log.info("Skipping uploading to Google Drive (because the pipeline configuration json does not contain the key "
                 "'DriveUploadPaths')")