export_weekly_non_relevant_contacts.py
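"""
Exports a CSV of contacts who sent only non-relevant messages (e.g. greetings, opt-ins, show-time questions)
to the target dataset in the past week, so that a tailored follow-up advert can be sent to them.

Example usage (all file names and the dataset name below are illustrative):
    python export_weekly_non_relevant_contacts.py google_cloud_credentials.json pipeline_configuration.json \
        rqa_s01e01 messages_traced_data.jsonl non_relevant_contacts.csv
"""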
import argparse
import csv
import json
import sys
from core_data_modules.analysis import analysis_utils, AnalysisConfiguration
from core_data_modules.cleaners import Codes
from core_data_modules.logging import Logger
from core_data_modules.traced_data.io import TracedDataJsonIO
from id_infrastructure.firestore_uuid_table import FirestoreUuidTable
from storage.google_cloud import google_cloud_utils
from src.lib import PipelineConfiguration

log = Logger(__name__)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Generates a list of phone numbers that sent non-relevant "
                                                 "messages in the past week from traced data, so that we can "
                                                 "re-send them a tailored advert")

    parser.add_argument("--exclusion-list-file-path", nargs="?",
                        help="List of phone numbers to exclude from the ad group")

    parser.add_argument("google_cloud_credentials_file_path", metavar="google-cloud-credentials-file-path",
                        help="Path to a Google Cloud service account credentials file to use to access the "
                             "credentials bucket")
    parser.add_argument("pipeline_configuration_file_path", metavar="pipeline-configuration-file",
                        help="Path to the pipeline configuration json file")
    parser.add_argument("target_dataset_name", metavar="target-dataset-name",
                        help="Target dataset name to check message relevance against")
    parser.add_argument("traced_data_paths", metavar="traced-data-paths", nargs="+",
                        help="Paths to the traced data files (either messages or individuals) to extract phone "
                             "numbers from")
    parser.add_argument("csv_output_file_path", metavar="csv-output-file-path",
                        help="Path to a CSV file to write the exported contacts to. "
                             "The exported file is in a format suitable for direct upload to Rapid Pro")

    args = parser.parse_args()

    exclusion_list_file_path = args.exclusion_list_file_path
    google_cloud_credentials_file_path = args.google_cloud_credentials_file_path
    pipeline_configuration_file_path = args.pipeline_configuration_file_path
    target_dataset_name = args.target_dataset_name
    traced_data_paths = args.traced_data_paths
    csv_output_file_path = args.csv_output_file_path
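
    # Raise the recursion limit: TracedData objects carry their full processing history, and loading files with
    # long histories can recurse more deeply than Python's default limit allows (assumed reason for this limit).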
    sys.setrecursionlimit(10000)

    log.info("Loading Pipeline Configuration File...")
    with open(pipeline_configuration_file_path) as f:
        pipeline_configuration = PipelineConfiguration.from_configuration_file(f)
    Logger.set_project_name(pipeline_configuration.pipeline_name)
    log.debug(f"Pipeline name is {pipeline_configuration.pipeline_name}")

    log.info("Downloading Firestore UUID Table credentials...")
    firestore_uuid_table_credentials = json.loads(google_cloud_utils.download_blob_to_string(
        google_cloud_credentials_file_path,
        pipeline_configuration.uuid_table.firebase_credentials_file_url
    ))
    phone_number_uuid_table = FirestoreUuidTable.init_from_credentials(
        firestore_uuid_table_credentials,
        pipeline_configuration.uuid_table.table_name,
        pipeline_configuration.uuid_table.uuid_prefix
    )
    log.info("Initialised the Firestore UUID table")
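
    # The Firestore uuid table maps the de-identified participant uuids stored in the traced data back to phone
    # numbers; it is used at the end of this script to re-identify the contacts being exported.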

    uuids = set()
    for path in traced_data_paths:
        # Load the traced data
        log.info(f"Loading previous traced data from file '{path}'...")
        with open(path) as f:
            data = TracedDataJsonIO.import_jsonl_to_traced_data_iterable(f)
        log.info(f"Loaded {len(data)} traced data objects")

        for td in data:
            if td["consent_withdrawn"] == Codes.TRUE:
                continue

            # Check for contacts who only sent messages labelled as "showtime_question", "greeting", "opt_in"
            # or "NC" in the target episodes. We will send them a tailored follow-up SMS with the target
            # episode question.
            for plan in PipelineConfiguration.RQA_CODING_PLANS:
                if plan.dataset_name == target_dataset_name:
                    for cc in plan.coding_configurations:
                        analysis_configurations = AnalysisConfiguration(plan.dataset_name,
                                                                        plan.raw_field,
                                                                        cc.coded_field,
                                                                        cc.code_scheme)
                        codes = analysis_utils.get_codes_from_td(td, analysis_configurations)
                        if not analysis_utils.relevant(td, "consent_withdrawn", analysis_configurations):
                            for code in codes:
                                if code.string_value in ["showtime_question", "greeting", "opt_in", "NC"]:
                                    uuids.add(td["uid"])

    log.info(f"Loaded {len(uuids)} uuids from TracedData")

    if exclusion_list_file_path is not None:
        # Load the exclusion list
        log.info(f"Loading the exclusion list from {exclusion_list_file_path}...")
        with open(exclusion_list_file_path) as f:
            exclusion_list = json.load(f)
        log.info(f"Loaded {len(exclusion_list)} numbers to exclude")

        # Remove any uuids that are in the exclusion list
        log.info("Removing exclusion list uuids from the contacts group")
        removed = 0
        for uuid in set(uuids):
            if uuid in exclusion_list:
                removed += 1
                uuids.remove(uuid)
        log.info(f"Removed {removed} uuids; {len(uuids)} remain")

    # Convert the uuids to phone numbers
    log.info(f"Converting {len(uuids)} uuids to phone numbers...")
    uuid_phone_number_lut = phone_number_uuid_table.uuid_to_data_batch(uuids)
    phone_numbers = set()
    skipped_uuids = set()
    for uuid in uuids:
        # Some uuids are no longer re-identifiable due to a uuid table consistency issue between OCHA and
        # WorldBank-PLR
        if uuid in uuid_phone_number_lut:
            phone_numbers.add(f"+{uuid_phone_number_lut[uuid]}")
        else:
            skipped_uuids.add(uuid)
    log.info(f"Successfully converted {len(phone_numbers)} uuids to phone numbers.")
    log.warning(f"Unable to re-identify {len(skipped_uuids)} uuids")

    # Export contacts CSV
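    # The header uses the "URN:Tel" and "Name" columns expected by Rapid Pro's contact import; only the phone
    # number URN is filled in for each row, so the "Name" column is left blank.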
    log.warning(f"Exporting {len(phone_numbers)} phone numbers to {csv_output_file_path}...")
    with open(csv_output_file_path, "w") as f:
        writer = csv.DictWriter(f, fieldnames=["URN:Tel", "Name"], lineterminator="\n")
        writer.writeheader()

        for n in phone_numbers:
            writer.writerow({
                "URN:Tel": n
            })
    log.info(f"Wrote {len(phone_numbers)} contacts to {csv_output_file_path}")