"""Build an SPP-style snapshot (json) from CSW qv and cp csv files."""

import fnmatch
import json
import uuid
from os import listdir
from os.path import isfile, join
from typing import List

import pandas as pd

# Translates the combined CSW marker into the status text written to the
# snapshot. Letter keys come from `error_mkr`; numeric keys are the string form
# of `response_type` (used when response_type is greater than 2).
_STATUS_BY_MARKER = {
    "C": "Clear",
    "O": "Clear - overridden",
    "E": "Check needed",
    # TODO: Should W map to check needed or something else?
    "W": "Check needed",
    # TODO: Check which ones below are used in SPP
    "3": "Sample deletion",
    "4": "Nil1, dead letter",
    "5": "Nil2, combined return, zero response",
    "6": "Nil3, out-of-scope",
    "7": "Nil4, ceased trading",
    "8": "Nil5, dormant",
    "9": "Nil6, out-of-scope and insufficient data",
    "10": "Nil7, in-scope but suspect data",
    "11": "Dead",
    "12": "Nil8, part year return, death in year",
    "13": "Nil9, out of scope and no UK activity",
}


def create_snapshot(
    input_directory: str, periods: List[str], output_directory: str
) -> None:
    """
    Reads qv and cp files, applies transformations and writes snapshot.

    Parameters
    ----------
    input_directory : str
        Folder path to CSW files.
    periods : List[str]
        List of periods to include in the snapshot.
    output_directory : str
        Folder path in which the snapshot json file is written.

    Action
    ------
    Writes a json file named
    ``snapshot_qv_cp_{latest period}_{number of periods}.json`` in
    `output_directory`, with the keys "id", "contributors" and "responses".

    Raises
    ------
    ValueError
        If no qv or cp file matches the requested periods.

    Example
    -------
    >>> periods = [str(i) for i in range(202201, 202213)]
    >>> periods += ["202301", "202302", "202303"]
    >>> input_directory = "path/mbs_anonymised_2024"
    >>> output_directory = "path/mbs-data"
    >>> create_snapshot(input_directory, periods, output_directory)
    """
    qv_df = concat_files_from_pattern(input_directory, "qv*.csv", periods)
    cp_df = concat_files_from_pattern(input_directory, "cp*.csv", periods)

    responses = convert_qv_to_responses(qv_df)
    contributors = convert_cp_to_contributors(cp_df)

    # Write the converted tables: the raw CSW frames still carry CSW column
    # names and no status column, so they do not look like an SPP snapshot.
    output = {
        "id": input_directory + uuid.uuid4().hex,
        "contributors": contributors.to_dict("list"),
        "responses": responses.to_dict("list"),
    }

    max_period = max(int(period) for period in periods)
    output_file = join(
        output_directory, f"snapshot_qv_cp_{max_period}_{len(periods)}.json"
    )

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=4)


def concat_files_from_pattern(
    directory: str, pattern: str, periods: List[str]
) -> pd.DataFrame:
    """
    Loads as one dataframe all csv files matching pattern and periods.

    Parameters
    ----------
    directory : str
        Folder path to CSW files.
    pattern : str
        Shell-style wildcard pattern (as understood by `fnmatch`) used to
        filter files in the folder based on name, e.g. "qv*.csv".
    periods : List[str]
        List of periods to include. A file is selected when the six
        characters directly before its four-character extension
        (``filename[-10:-4]``) are one of these periods.

    Returns
    -------
    pd.DataFrame
        Row-wise concatenation of every selected file, with a fresh index.

    Raises
    ------
    ValueError
        If no file in `directory` matches both `pattern` and `periods`.
    """
    wanted_periods = {str(period) for period in periods}

    # Sorted so the row order does not depend on the arbitrary listdir order.
    filenames = sorted(
        filename
        for filename in fnmatch.filter(listdir(directory), pattern)
        if isfile(join(directory, filename))
        and filename[-10:-4] in wanted_periods
    )

    if not filenames:
        # pd.concat would raise a ValueError with no context; keep the
        # exception type but say which inputs produced no files.
        raise ValueError(
            f"No files matching '{pattern}' for the requested periods "
            f"found in '{directory}'"
        )

    df_list = [pd.read_csv(join(directory, filename)) for filename in filenames]

    return pd.concat(df_list, ignore_index=True)


def convert_cp_to_contributors(df: pd.DataFrame) -> pd.DataFrame:
    """
    Converts a dataframe from a cp file from CSW and returns a dataframe that
    looks like a contributors table from an SPP snapshot.

    The status is derived from `error_mkr` when `response_type` is 2 or less,
    otherwise from `response_type` itself. Markers without an entry in the
    mapping yield a missing status. The input dataframe is not modified.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame from a cp file; must contain the columns "period",
        "reference", "error_mkr" and "response_type".

    Returns
    -------
    pd.DataFrame
        Dataframe with columns "period", "reference" and "status".
    """
    response_type = df["response_type"]

    # Keep error_mkr where response_type <= 2, else fall back to the
    # response_type rendered as text so it can be looked up in the mapping.
    combined_error_marker = df["error_mkr"].where(
        response_type <= 2, response_type.astype(str)
    )

    contributors = df[["period", "reference"]].copy()
    contributors["status"] = combined_error_marker.map(_STATUS_BY_MARKER)

    return contributors


def convert_qv_to_responses(df: pd.DataFrame) -> pd.DataFrame:
    """
    Converts a dataframe from a qv file from CSW and returns a dataframe that
    looks like a responses table from an SPP snapshot.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame from a qv file; must contain the columns "reference",
        "question_no", "returned_value" and "adjusted_value".

    Returns
    -------
    pd.DataFrame
        Dataframe with columns "reference", "period" (only when the input has
        it), "questionnumber", "response" and "adjustedresponse".
    """
    rename_columns = {
        "question_no": "questionnumber",
        "returned_value": "response",
        "adjusted_value": "adjustedresponse",
    }

    out_columns = ["reference"]
    # A snapshot can span several periods, so keep the period next to each
    # response whenever the qv data provides it.
    if "period" in df.columns:
        out_columns.append("period")
    out_columns.extend(rename_columns)

    # `columns=` is required: a bare mapping would be applied to the index
    # labels and leave the column names untouched.
    return df[out_columns].rename(columns=rename_columns)