Thematic analysis output #5

Open · wants to merge 5 commits into master
Changes from 4 commits
25 changes: 14 additions & 11 deletions docker-run.sh
@@ -28,12 +28,12 @@ done


# Check that the correct number of arguments were provided.
if [[ $# -ne 11 ]]; then
if [[ $# -ne 12 ]]; then
echo "Usage: ./docker-run.sh
[--profile-cpu <profile-output-path>]
[--drive-upload <drive-auth-file> <messages-drive-path> <individuals-drive-path> <production-drive-path>]
<user> <google-cloud-credentials-file-path> <phone-number-uuid-table-path>
<s02e01-input-path> <prev-coded-dir> <json-output-path>
<s02e01-input-path> <demog-input-path> <prev-coded-dir> <json-output-path>
<icr-output-dir> <coded-output-dir> <messages-output-csv> <individuals-output-csv> <production-output-csv>"
exit
fi
@@ -43,13 +43,14 @@ USER=$1
GOOGLE_CLOUD_CREDENTIALS_FILE_PATH=$2
INPUT_PHONE_UUID_TABLE=$3
INPUT_S02E01=$4
PREV_CODED_DIR=$5
OUTPUT_JSON=$6
OUTPUT_ICR_DIR=$7
OUTPUT_CODED_DIR=$8
OUTPUT_MESSAGES_CSV=$9
OUTPUT_INDIVIDUALS_CSV=${10}
OUTPUT_PRODUCTION_CSV=${11}
INPUT_DEMOG=$5
PREV_CODED_DIR=$6
OUTPUT_JSON=$7
OUTPUT_ICR_DIR=$8
OUTPUT_CODED_DIR=$9
OUTPUT_MESSAGES_CSV=${10}
OUTPUT_INDIVIDUALS_CSV=${11}
OUTPUT_PRODUCTION_CSV=${12}

# Build an image for this pipeline stage.
docker build --build-arg INSTALL_CPU_PROFILER="$PROFILE_CPU" -t "$IMAGE_NAME" .
@@ -70,7 +71,7 @@ if [[ "$DRIVE_UPLOAD" = true ]]; then
fi
CMD="pipenv run $PROFILE_CPU_CMD python -u pipeline.py $DRIVE_UPLOAD_ARG \
\"$USER\" pipeline_config.json /credentials/google-cloud-credentials.json /data/phone-number-uuid-table-input.json \
/data/s02e01-input.json /data/prev-coded \
/data/s02e01-input.json /data/demog-input.json /data/prev-coded \
/data/output.json /data/output-icr /data/coded \
/data/output-messages.csv /data/output-individuals.csv /data/output-production.csv \
"
@@ -86,6 +87,8 @@ trap finish EXIT
docker cp "$GOOGLE_CLOUD_CREDENTIALS_FILE_PATH" "$container:/credentials/google-cloud-credentials.json"
docker cp "$INPUT_PHONE_UUID_TABLE" "$container:/data/phone-number-uuid-table-input.json"
docker cp "$INPUT_S02E01" "$container:/data/s02e01-input.json"
docker cp "$INPUT_DEMOG" "$container:/data/demog-input.json"


if [[ -d "$PREV_CODED_DIR" ]]; then
docker cp "$PREV_CODED_DIR" "$container:/data/prev-coded"
@@ -111,7 +114,7 @@ mkdir -p "$(dirname "$OUTPUT_INDIVIDUALS_CSV")"
#docker cp "$container:/data/output-individuals.csv" "$OUTPUT_INDIVIDUALS_CSV"

mkdir -p "$(dirname "$OUTPUT_PRODUCTION_CSV")"
#docker cp "$container:/data/output-production.csv" "$OUTPUT_PRODUCTION_CSV"
docker cp "$container:/data/output-production.csv" "$OUTPUT_PRODUCTION_CSV"

if [[ "$PROFILE_CPU" = true ]]; then
mkdir -p "$(dirname "$CPU_PROFILE_OUTPUT_PATH")"
10 changes: 8 additions & 2 deletions pipeline.py
@@ -2,6 +2,7 @@
import os
import random

from core_data_modules.logging import Logger
from core_data_modules.traced_data.io import TracedDataJsonIO
from core_data_modules.util import PhoneNumberUuidTable, IOUtils
from storage.google_drive import drive_client_wrapper
Expand All @@ -11,6 +12,9 @@
from src.apply_manual_codes import ApplyManualCodes
from src.combine_raw_datasets import CombineRawDatasets
from src.translate_rapidpro_keys import TranslateRapidProKeys
from src.production_file import ProductionFile

log = Logger(__name__)

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Runs the post-fetch phase of the CAPYEI pipeline",
@@ -253,7 +257,7 @@
data = TranslateRapidProKeys.translate_rapid_pro_keys(user, data, pipeline_configuration, prev_coded_dir_path)

print("Auto Coding Surveys...")
data = AutoCodeSurveys.auto_code_surveys(user, data, icr_output_dir, coded_dir_path)
data = AutoCodeSurveys.auto_code_surveys(user, data, icr_output_dir, coded_dir_path, prev_coded_dir_path)

data = ProductionFile.generate(data, production_csv_output_path)

@@ -273,7 +277,9 @@
data = TranslateRapidProKeys.translate_rapid_pro_keys(user, data, pipeline_configuration, prev_coded_dir_path)

print("Auto Coding Surveys...")
data = AutoCodeSurveys.auto_code_surveys(user, data, icr_output_dir, coded_dir_path)
data = AutoCodeSurveys.auto_code_surveys(user, data, icr_output_dir, coded_dir_path, prev_coded_dir_path)

data = ProductionFile.generate(data, production_csv_output_path)

print("Applying Manual Codes from Coda...")
data = ApplyManualCodes.apply_manual_codes(user, data, prev_coded_dir_path)
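A quick note on the Logger addition in pipeline.py above: the diff only adds the import and the log = Logger(__name__) instantiation, with no call sites yet. Assuming core_data_modules' Logger exposes the usual leveled methods (info, warning, and so on; an assumption, since none appear in this diff), the existing print(...) progress messages could later become structured log calls:

log.info("Auto Coding Surveys...")  # hypothetical replacement for print("Auto Coding Surveys...")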
10 changes: 5 additions & 5 deletions pipeline_config.json
@@ -1,12 +1,12 @@
{
"RapidProDomain": "textit.in",
"RapidProTokenFileURL": "gs://avf-credentials/icraf-text-it-token.txt",
"RapidProTokenFileURL": "gs://avf-credentials/capyei-text-it-token.txt",
"RapidProKeyRemappings": [
{"RapidProKey": "avf_phone_id", "PipelineKey": "uid"},
{"RapidProKey": "Capyei_Valuable (Value) - capyei_pp", "PipelineKey": "capyei_valuable_raw"},
{"RapidProKey": "Capyei_Valuable (Time) - capyei_pp", "PipelineKey": "capyei_valuable_time"},
{"RapidProKey": "Capyei_Change (Value) - capyei_pp", "PipelineKey": "capyei_change_raw"},
{"RapidProKey": "Capyei_Change (Time) - capyei_pp", "PipelineKey": "capyei_change_time"}
{"RapidProKey": "Capyei_Valuable (Value) - capyei_pp_survey", "PipelineKey": "capyei_valuable_raw"},
{"RapidProKey": "Capyei_Valuable (Time) - capyei_pp_survey", "PipelineKey": "capyei_valuable_time"},
{"RapidProKey": "Capyei_Change (Value) - capyei_pp_survey", "PipelineKey": "capyei_change_raw"},
{"RapidProKey": "Capyei_Change (Time) - capyei_pp_survey", "PipelineKey": "capyei_change_time"}
]
}
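For readers unfamiliar with the remapping block above: each RapidProKey names a field as exported by Rapid Pro, and the paired PipelineKey is the name the rest of the pipeline uses, so this change simply tracks the survey's rename from capyei_pp to capyei_pp_survey. A minimal stand-alone sketch of that renaming step, for illustration only (the pipeline's real implementation lives in src/translate_rapidpro_keys.py and is not part of this diff; remap_keys is a hypothetical helper):

RAPID_PRO_KEY_REMAPPINGS = [
    {"RapidProKey": "avf_phone_id", "PipelineKey": "uid"},
    {"RapidProKey": "Capyei_Valuable (Value) - capyei_pp_survey", "PipelineKey": "capyei_valuable_raw"},
]


def remap_keys(record):
    # Copy the record, then rename each known Rapid Pro key to its pipeline key.
    remapped = dict(record)
    for remapping in RAPID_PRO_KEY_REMAPPINGS:
        if remapping["RapidProKey"] in remapped:
            remapped[remapping["PipelineKey"]] = remapped.pop(remapping["RapidProKey"])
    return remapped


print(remap_keys({"avf_phone_id": "uuid-1234"}))  # -> {'uid': 'uuid-1234'}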

2 changes: 1 addition & 1 deletion run_scripts/1_fetch_raw_data.py
@@ -32,7 +32,7 @@
uuid_table_path = f"{root_data_dir}/UUIDs/phone_uuids.json"

SURVEYS = [
"capyei_pp"
"capyei_pp_survey"
]

TEST_CONTACTS_PATH = os.path.abspath("./test_contact_rapid_pro_ids.json")
2 changes: 1 addition & 1 deletion run_scripts/generate_outputs.sh
@@ -40,7 +40,7 @@ mkdir -p "$DATA_ROOT/Outputs"
cd ..
./docker-run.sh ${CPU_PROFILE_ARG} ${DRIVE_UPLOAD_ARG} \
"$USER" "$GOOGLE_CLOUD_CREDENTIALS_FILE_PATH" "$DATA_ROOT/UUIDs/phone_uuids.json" \
"$DATA_ROOT/Raw Data/capyei_pp.json" "$DATA_ROOT/Coded Coda Files/" \
"$DATA_ROOT/Raw Data/capyei_pp_survey.json" "$DATA_ROOT/Raw Data/capyei_demog_cleaned.json" "$DATA_ROOT/Coded Coda Files/" \
"$DATA_ROOT/Outputs/traced_data.json" \
"$DATA_ROOT/Outputs/ICR/" "$DATA_ROOT/Outputs/Coda Files/" \
"$DATA_ROOT/Outputs/capyei_messages.csv" "$DATA_ROOT/Outputs/capyei_individuals.csv" \
29 changes: 24 additions & 5 deletions src/auto_code_surveys.py
@@ -1,6 +1,7 @@
import time
from os import path
import random
import csv

from core_data_modules.cleaners import Codes, PhoneCleaner
from core_data_modules.cleaners.cleaning_utils import CleaningUtils
@@ -14,11 +15,11 @@

class AutoCodeSurveys(object):
SENT_ON_KEY = "sent_on"
ICR_MESSAGES_COUNT = 200
ICR_MESSAGES_COUNT = 250
ICR_SEED = 0

@classmethod
def auto_code_surveys(cls, user, data, icr_output_dir, coda_output_dir):
def auto_code_surveys(cls, user, data, icr_output_dir, coda_output_dir, prev_coded_dir):
# Auto-code surveys
for plan in PipelineConfiguration.SURVEY_CODING_PLANS:
if plan.cleaner is not None:
@@ -36,9 +37,27 @@ def auto_code_surveys(cls, user, data, icr_output_dir, coda_output_dir):
TracedDataCodaV2IO.export_traced_data_iterable_to_coda_2(
data, plan.raw_field, plan.time_field, plan.id_field, {plan.coded_field: plan.code_scheme}, f
)

# Output messages for thematic analysis
IOUtils.ensure_dirs_exist(icr_output_dir)
for plan in PipelineConfiguration.SURVEY_CODING_PLANS:
Member:
Why does this need to be incremental?

Contributor Author:
The file output, you mean? Because we needed to start thematic analysis before the data collection concluded. Does that answer your question?

Member:
Hmm, on other projects we have been able to regenerate all of the outputs every time the pipeline has re-run (e.g. for the production files on which thematic analysis was done previously, we just overwrote the existing file with a new one that contained the newest data).

I don't think I know enough about this project to comment, @lukechurch

Contributor Author:
The incremental data was coming from one survey. Due to delivery timelines, we started thematic analysis on Saturday on data from the survey sent on Friday, adding new data on Sunday.

    prev_thematic_analysis_input_path = path.join(prev_coded_dir, plan.prev_thematic_analysis_filename)
    with open(prev_thematic_analysis_input_path, "r") as f:
        prev_thematic_analysis_dict = csv.DictReader(f)
        prev_thematic_analysis_messages = [message[plan.raw_field] for message in prev_thematic_analysis_dict]
    rqa_messages = []
    for td in data:
        if plan.raw_field in td:
            if td[plan.raw_field] not in prev_thematic_analysis_messages:
                rqa_messages.append(td)

    icr_output_path = path.join(icr_output_dir, plan.thematic_analysis_filename)
    with open(icr_output_path, "w") as f:
        TracedDataCSVIO.export_traced_data_iterable_to_csv(
            rqa_messages, f, headers=[plan.raw_field]
        )

# Output messages for ICR
IOUtils.ensure_dirs_exist(icr_output_dir)
for plan in PipelineConfiguration.SURVEY_CODING_PLANS:
rqa_messages = []
for td in data:
@@ -51,8 +70,8 @@ def auto_code_surveys(cls, user, data, icr_output_dir, coda_output_dir):
icr_output_path = path.join(icr_output_dir, plan.icr_filename)
with open(icr_output_path, "w") as f:
TracedDataCSVIO.export_traced_data_iterable_to_csv(
icr_messages, f, headers=[plan.run_id_field, plan.raw_field]
icr_messages, f, headers=[plan.raw_field]
)


return data
return data
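The thematic-analysis export earlier in this file diff is the heart of the PR: for each survey coding plan it loads the CSV exported on the previous run, collects the raw messages it contained, and writes only the messages not seen before (note the diff assumes the previous export already exists in prev_coded_dir). A stand-alone sketch of that incremental filter, under the assumption that the previous export is a CSV with one column per raw field, exactly as the diff writes it; the helper names are illustrative, not repository code. Using a set rather than the diff's list makes each membership test constant-time, which matters as exports grow:

import csv


def load_previous_messages(prev_csv_path, raw_field):
    # Collect the raw messages already exported for thematic analysis.
    with open(prev_csv_path, "r") as f:
        return {row[raw_field] for row in csv.DictReader(f)}


def filter_new_messages(data, raw_field, previous_messages):
    # Keep only items whose raw message has not been exported before.
    return [td for td in data
            if raw_field in td and td[raw_field] not in previous_messages]

For example, with raw_field "capyei_valuable_raw", loading "capyei_valuable_thematic.csv" and filtering the traced data yields exactly the rows the diff writes to capyei_valuable_thematic_new.csv.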
17 changes: 16 additions & 1 deletion src/lib/pipeline_configuration.py
@@ -17,11 +17,14 @@ class CodeSchemes(object):
class CodingPlan(object):
def __init__(self, raw_field, coded_field, coda_filename, cleaner=None, code_scheme=None, time_field=None,
run_id_field=None, icr_filename=None, analysis_file_key=None, id_field=None,
binary_code_scheme=None, binary_coded_field=None, binary_analysis_file_key=None):
binary_code_scheme=None, binary_coded_field=None, binary_analysis_file_key=None, course_name=None,
thematic_analysis_filename=None, prev_thematic_analysis_filename=None):
self.raw_field = raw_field
self.coded_field = coded_field
self.coda_filename = coda_filename
self.icr_filename = icr_filename
self.thematic_analysis_filename = thematic_analysis_filename
self.prev_thematic_analysis_filename = prev_thematic_analysis_filename
self.cleaner = cleaner
self.code_scheme = code_scheme
self.time_field = time_field
@@ -30,6 +33,7 @@ def __init__(self, raw_field, coded_field, coda_filename, cleaner=None, code_sch
self.binary_code_scheme = binary_code_scheme
self.binary_coded_field = binary_coded_field
self.binary_analysis_file_key = binary_analysis_file_key
self.course_name = course_name

if id_field is None:
id_field = "{}_id".format(self.raw_field)
Expand All @@ -47,6 +51,8 @@ class PipelineConfiguration(object):
time_field="capyei_valuable_time",
coda_filename="capyei_valuable.json",
icr_filename="capyei_valuable.csv",
thematic_analysis_filename="capyei_valuable_thematic_new.csv",
prev_thematic_analysis_filename="capyei_valuable_thematic.csv",
analysis_file_key="capyei_valuable_",
cleaner=None,
code_scheme=CodeSchemes.VALUABLE),
@@ -56,11 +62,20 @@
time_field="capyei_change_time",
coda_filename="capyei_change.json",
icr_filename="capyei_change.csv",
thematic_analysis_filename="capyei_change_thematic_new.csv",
prev_thematic_analysis_filename="capyei_change_thematic.csv",
analysis_file_key="capyei_change_",
cleaner=None,
code_scheme=CodeSchemes.CHANGE)
]

DEMOGS = [
CodingPlan(raw_field="course_name", coded_field=None, coda_filename=None),
CodingPlan(raw_field="Sex", coded_field=None, coda_filename=None),
CodingPlan(raw_field="age", coded_field=None, coda_filename=None),
CodingPlan(raw_field="training_center", coded_field=None, coda_filename=None),
]

def __init__(self, rapid_pro_domain, rapid_pro_token_file_url, rapid_pro_key_remappings):
"""
:param rapid_pro_domain: URL of the Rapid Pro server to download data from.
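A small detail from the CodingPlan constructor worth surfacing: when id_field is omitted, as in all four DEMOGS plans above, it defaults to the raw field name with an _id suffix. A minimal illustration, assuming the CodingPlan class behaves exactly as the lines shown in this diff suggest:

plan = CodingPlan(raw_field="age", coded_field=None, coda_filename=None)
print(plan.id_field)  # -> "age_id"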