-
Notifications
You must be signed in to change notification settings - Fork 0
/
export_weekly_ad_contacts.py
106 lines (89 loc) · 4.85 KB
/
export_weekly_ad_contacts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import argparse
import csv
import json
import sys
from core_data_modules.cleaners import Codes, PhoneCleaner
from core_data_modules.logging import Logger
from core_data_modules.traced_data.io import TracedDataJsonIO
from id_infrastructure.firestore_uuid_table import FirestoreUuidTable
from storage.google_cloud import google_cloud_utils
# Module-level logger, named after this module; used for all progress messages below.
log = Logger(__name__)
# Raise the interpreter's recursion limit far above Python's default.
# NOTE(review): presumably required because (de)serialising TracedData recurses deeply - confirm.
sys.setrecursionlimit(50000)
# Script entry point.
#
# Reads participant uuids from one or more TracedData JSONL files, drops every participant who
# withdrew consent, re-identifies the remaining uuids as urns via the Firestore uuid table,
# optionally keeps only urns belonging to the requested mobile network operators, and writes the
# result to a CSV with one "URN:<namespace>" column per urn namespace.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Exports weekly ad contacts from analysis Traced Data")

    parser.add_argument("--target-mnos", nargs="?", action="store",
                        help="Comma-separated list of mobile network operators to filter for. "
                             "For example, to export Golis/Hormuud urns only, use '--target-mnos=golis,hormud'")
    parser.add_argument("google_cloud_credentials_file_path", metavar="google-cloud-credentials-file-path",
                        help="Path to a Google Cloud service account credentials file to use to access the "
                             "credentials bucket")
    parser.add_argument("uuid_table_credentials_file_url", metavar="uuid-table-credentials-file-url",
                        help="GS URL to the Firebase credentials file to use for the uuid table")
    parser.add_argument("uuid_table_name", metavar="uuid-table-name",
                        help="Name of the uuid table to use to re-identify the participants")
    parser.add_argument("traced_data_paths", metavar="traced-data-paths", nargs="+",
                        help="Paths to the traced data files (either messages or individuals) to extract phone "
                             "numbers from")
    parser.add_argument("csv_output_file_path", metavar="csv-output-file-path",
                        help="Path to a CSV file to write the contacts from the locations of interest to. "
                             "Exported file is in a format suitable for direct upload to Rapid Pro")

    args = parser.parse_args()

    # None means "no operator filter". Otherwise split on commas, tolerating whitespace around each
    # entry and ignoring empty entries, so "golis, hormud" behaves the same as "golis,hormud".
    if args.target_mnos is None:
        target_mnos = None
    else:
        target_mnos = [mno.strip() for mno in args.target_mnos.split(",") if mno.strip()]
    google_cloud_credentials_file_path = args.google_cloud_credentials_file_path
    uuid_table_credentials_file_url = args.uuid_table_credentials_file_url
    uuid_table_name = args.uuid_table_name
    traced_data_paths = args.traced_data_paths
    csv_output_file_path = args.csv_output_file_path

    log.info("Initialising uuid table client...")
    # The uuid table credentials are stored as a JSON blob at the given GS URL.
    credentials = json.loads(google_cloud_utils.download_blob_to_string(
        google_cloud_credentials_file_path,
        uuid_table_credentials_file_url
    ))

    uuid_table = FirestoreUuidTable.init_from_credentials(
        credentials,
        uuid_table_name,
        None  # We can use None here because we're not going to create any ids
    )
    log.info("Initialised uuid table client")

    # Collect candidate uuids and opted-out uuids across all input files first, so that an opt-out
    # recorded in any one file excludes that participant everywhere.
    uuids = set()
    opt_out_uuids = set()
    for path in traced_data_paths:
        log.info(f"Loading previous traced data from file '{path}'...")
        with open(path) as f:
            data = TracedDataJsonIO.import_jsonl_to_traced_data_iterable(f)
        log.info(f"Loaded {len(data)} traced data objects")

        for td in data:
            participant_uuid = td["participant_uuid"]

            if td["consent_withdrawn"] == Codes.TRUE:
                opt_out_uuids.add(participant_uuid)

            # Only ids carrying the participant-uuid prefix are kept for re-identification.
            if participant_uuid.startswith("avf-participant-uuid-"):
                uuids.add(participant_uuid)

    log.info(f"Loaded {len(uuids)} uuids from TracedData (of which {len(opt_out_uuids)} uuids withdrew consent)")
    uuids = uuids - opt_out_uuids
    log.info(f"Proceeding with {len(uuids)} opt-in uuids")

    log.info(f"Converting {len(uuids)} uuids to urns...")
    urn_lut = uuid_table.uuid_to_data_batch(uuids)
    urns = {urn_lut[uuid] for uuid in uuids}
    log.info(f"Converted {len(uuids)} uuids to {len(urns)} urns")

    if target_mnos is not None:
        log.info(f"Filtering {len(urns)} urns for those from operators {target_mnos}...")
        filtered_urns = {urn for urn in urns if PhoneCleaner.clean_operator(urn) in target_mnos}
        log.info(f"Filtered urns for those from operators {target_mnos}. {len(filtered_urns)}/{len(urns)} urns remain")
        urns = filtered_urns

    # Export contacts CSV
    # NOTE(review): this progress message is logged at warning level while every other one uses
    # info; kept as-is in case it is deliberate for an export of re-identified contacts - confirm.
    log.warning(f"Exporting {len(urns)} urns to {csv_output_file_path}...")
    # newline="" is what the csv module requires of files it writes to; without it the explicit
    # "\n" line terminator below would be translated to "\r\n" on platforms that rewrite newlines.
    with open(csv_output_file_path, "w", newline="") as f:
        # One column per urn namespace, i.e. the text before the first ":" of each urn.
        urn_namespaces = {urn.split(":", 1)[0] for urn in urns}
        headers = [f"URN:{namespace}" for namespace in sorted(urn_namespaces)]

        writer = csv.DictWriter(f, fieldnames=headers, lineterminator="\n")
        writer.writeheader()

        # Sorted so that repeated exports of the same contacts produce byte-identical files
        # (set iteration order for strings varies between interpreter runs).
        for urn in sorted(urns):
            # Split on the first ":" only, so that a urn whose path itself contains ":" is
            # exported in full rather than being truncated at the second colon.
            # Raises ValueError if the urn has no ":" separator at all.
            namespace, value = urn.split(":", 1)
            writer.writerow({
                f"URN:{namespace}": value
            })
    log.info(f"Wrote {len(urns)} urns to {csv_output_file_path}")