Commit

desired functionality implemented
robertswh committed Dec 22, 2024
1 parent 23a75ff commit 2e42d2a
Showing 1 changed file with 108 additions and 64 deletions.
172 changes: 108 additions & 64 deletions mbs_results/utilities/csw_to_spp_converter.py
@@ -1,21 +1,66 @@
import fnmatch
from os import listdir
from os.path import isfile, join
from typing import List
import json
import uuid

import pandas as pd

from mbs_results.utilities.utils import convert_column_to_datetime

def create_snapshot(input_directory: str, periods: List[str], output_directory: str) -> None:
    """
    Reads qv and cp files, applies transformations and writes a snapshot.

    Parameters
    ----------
    input_directory : str
        Folder path to the CSW files.
    periods : List[str]
        List of periods to include in the snapshot.
    output_directory : str
        Folder path to write the snapshot to.

    Action
    ------
    Writes a json file in the desired location that looks like an SPP snapshot.

    Example
    -------
    >>> periods = [str(i) for i in range(202201, 202213)] + ["202301", "202302", "202303"]
    >>> input_directory = "path/mbs_anonymised_2024"
    >>> output_directory = "path/mbs-data"
    >>> create_snapshot(input_directory, periods, output_directory)
    """

    qv_df = concat_files_from_pattern(input_directory, "qv*.csv", periods)
    cp_df = concat_files_from_pattern(input_directory, "cp*.csv", periods)

    responses = convert_qv_to_responses(qv_df)
    contributors = convert_cp_to_contributors(cp_df)

    output = {"id": input_directory + str(uuid.uuid4().hex),
              "contributors": contributors.to_dict("list"),
              "responses": responses.to_dict("list")}

    max_period = max([int(period) for period in periods])

    with open(f'{output_directory}/snapshot_qv_cp_{max_period}_{len(periods)}.json', 'w', encoding='utf-8') as f:
        json.dump(output, f, ensure_ascii=False, indent=4)
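
# Rough sketch of the snapshot layout written above (inferred from this code, not
# an official SPP schema): to_dict("list") stores each table column-wise.
#
# {
#     "id": "<input_directory + 32-char uuid hex>",
#     "contributors": {"period": [...], "reference": [...], "status": [...]},
#     "responses": {"reference": [...], "questionnumber": [...],
#                   "response": [...], "adjustedresponse": [...]}
# }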


def get_patern_df(filepath: str, pattern: str) -> pd.DataFrame:
    """Loads as pd dataframe all csv files with pattern.

    Parameters
    ----------
    filepath : str
        Filepath to folder containing desired files.
    pattern : str
        Regex pattern to filter files in the folder based on name.
    """

    filenames = [
        filename for filename in listdir(filepath) if isfile(join(filepath, filename))
    ]

    filenames = fnmatch.filter(filenames, pattern)
    df_list = [pd.read_csv(filepath + "/" + filename) for filename in filenames]
    df = pd.concat(df_list, ignore_index=True)

    return df


def concat_files_from_pattern(directory: str,
                              pattern: str,
                              periods: List[str]) -> pd.DataFrame:
    """
    Loads as a pd dataframe all csv files matching pattern, restricted to the
    periods specified in periods.

    Parameters
    ----------
    directory : str
        Folder path to CSW files.
    pattern : str
        Regex pattern to filter files in the folder based on name.
    periods : List[str]
        List of periods to include in the snapshot.

    Returns
    -------
    pd.DataFrame
        Dataframe containing all matching csv files concatenated together.
    """

    filenames = [
        filename for filename in listdir(directory)
        if isfile(join(directory, filename)) and filename[-10:-4] in periods
    ]

    filenames = fnmatch.filter(filenames, pattern)
    df_list = [pd.read_csv(directory + "/" + filename) for filename in filenames]
    df = pd.concat(df_list, ignore_index=True)

    return df
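
# Illustrative note on the period filter above: filename[-10:-4] picks out the six
# characters just before ".csv", so it assumes names that end with a six-digit
# period, e.g. a hypothetical "qv_029_202201.csv":
#
# >>> "qv_029_202201.csv"[-10:-4]
# '202201'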


def get_qv_and_cp_data(
    cp_path: str,
    qv_path: str,
) -> pd.DataFrame:
    """Reads and joins qv and cp data.

    Parameters
    ----------
    cp_path : str
        Filepath to folder containing cp data.
    qv_path : str
        Filepath to folder containing qv data.

    Returns
    -------
    pd.DataFrame
        Dataframe containing combined qv and cp data.
    """

    qv_df = get_patern_df(qv_path, "qv*.csv")
    cp_df = get_patern_df(cp_path, "cp*.csv")

    qv_and_cp = pd.merge(qv_df, cp_df, how="left", on=["period", "reference"])

    return qv_and_cp


def convert_cp_to_contributors(df):
    """
    Converts a dataframe from a cp file from CSW and returns a dataframe that
    looks like a contributors table from an SPP snapshot.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame from a cp file.

    Returns
    -------
    pd.DataFrame
        Dataframe that looks like a contributors table from a snapshot.
    """

    df["combined_error_marker"] = df.apply(
        lambda x: x["error_mkr"] if x["response_type"] <= 2 else str(x["response_type"]),
        axis=1
    )

    error_marker_map = {"C": "Clear",
                        "O": "Clear - overridden",
                        "E": "Check needed",
                        # TODO: Should W map to check needed or something else?
                        "W": "Check needed",
                        # TODO: Check which ones below are used in SPP
                        "3": "Sample deletion",
                        "4": "Nil1, dead letter",
                        "5": "Nil2, combined return, zero response",
                        "6": "Nil3, out-of-scope",
                        "7": "Nil4, ceased trading",
                        "8": "Nil5, dormant",
                        "9": "Nil6, out-of-scope and insufficient data",
                        "10": "Nil7, in-scope but suspect data",
                        "11": "Dead",
                        "12": "Nil8, part year return, death in year",
                        "13": "Nil9, out of scope and no UK activity"}

    df["status"] = df["combined_error_marker"].map(error_marker_map)

    return df[["period", "reference", "status"]]


def csw_to_spp(
    cp_path: str,
    qv_path: str,
    output_path: str,
    column_map: dict,
    period: str,
    period_range: int,
) -> None:
    """Combines cp and qv files, filters and renames columns based on a mapping, and
    then saves the output as a json file.

    Parameters
    ----------
    cp_path : str
        Filepath to folder containing cp data.
    qv_path : str
        Filepath to folder containing qv data.
    output_path : str
        Filepath to save json file.
    column_map : dict
        Dictionary containing desired columns from qv and cp data as keys and their
        desired names as values.
    period : str
        Date to filter output on (YYYY-MM-DD).
    period_range : int
        Number of months from the period and previous to include in the output.
    """
    qv_and_cp = get_qv_and_cp_data(cp_path, qv_path)

    qv_and_cp["period"] = convert_column_to_datetime(qv_and_cp["period"])

    period = pd.Timestamp(period)

    qv_and_cp = qv_and_cp[
        (qv_and_cp["period"] > period - pd.DateOffset(months=period_range))
        & (qv_and_cp["period"] <= period)
    ]

    qv_and_cp["period"] = qv_and_cp["period"].dt.strftime("%Y%m")

    qv_and_cp = qv_and_cp[column_map.keys()].rename(columns=column_map)

    qv_and_cp.to_json(f"{output_path}_{period.strftime('%Y%m')}_{period_range}.json")


def convert_qv_to_responses(df):
    """
    Converts a dataframe from a qv file from CSW and returns a dataframe that
    looks like a responses table from an SPP snapshot.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame from a qv file.

    Returns
    -------
    pd.DataFrame
        Dataframe that looks like a responses table from a snapshot.
    """

    rename_columns = {"question_no": "questionnumber",
                      "returned_value": "response",
                      "adjusted_value": "adjustedresponse"}

    out_columns = ["reference"] + list(rename_columns.keys())

    return df[out_columns].rename(columns=rename_columns)
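
# Minimal usage sketch for convert_qv_to_responses (made-up data; qv files hold one
# row per reference, period and question):
#
# >>> qv = pd.DataFrame({"reference": [11111, 11111],
# ...                    "question_no": [40, 42],
# ...                    "returned_value": [100, 25],
# ...                    "adjusted_value": [100.0, 25.0]})
# >>> list(convert_qv_to_responses(qv).columns)
# ['reference', 'questionnumber', 'response', 'adjustedresponse']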
