Skip to content

Commit

Permalink
update regex to find prev_perm in table_id (#52)
Browse files Browse the repository at this point in the history
* update regex to find prev_perm in table_id

* fix regex

* update old pipeline
  • Loading branch information
patrick-troy authored Oct 9, 2024
1 parent b804da5 commit c05b23d
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 5 deletions.
2 changes: 1 addition & 1 deletion liiatools/cin_census_pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def create_current_view(archive: DataframeArchive, process_folder: FS) -> FS:

def create_reports(current_folder: FS, process_folder: FS):
export_folder = process_folder.makedirs("export", recreate=True)
aggregate = DataframeAggregator(current_folder, load_pipeline_config())
aggregate = DataframeAggregator(current_folder, load_pipeline_config(), dataset="cin")
aggregate_data = aggregate.current()

for report in ["PAN"]:
Expand Down
5 changes: 3 additions & 2 deletions liiatools/common/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ class DataframeAggregator:
Only tables and columns defined in the pipeline config are aggregated.
"""

def __init__(self, fs: FS, config: PipelineConfig):
def __init__(self, fs: FS, config: PipelineConfig, dataset: str):
self.fs = fs
self.config = config
self.dataset = dataset

def list_files(self) -> List[str]:
"""
Expand All @@ -37,7 +38,7 @@ def load_file(self, file) -> DataContainer:
Load a file from the current directory.
"""
data = DataContainer()
table_id = re.search(r"_([a-zA-Z0-9]*)\.", file)
table_id = re.search(fr"{self.dataset}_([a-zA-Z0-9_]*)\.", file)

for table_spec in self.config.table_list:
if table_id and table_id.group(1) == table_spec.id:
Expand Down
2 changes: 1 addition & 1 deletion liiatools/ssda903_pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def create_current_view(archive: DataframeArchive, process_folder: FS) -> FS:

def create_reports(current_folder: FS, process_folder: FS):
export_folder = process_folder.makedirs("export", recreate=True)
aggregate = DataframeAggregator(current_folder, load_pipeline_config())
aggregate = DataframeAggregator(current_folder, load_pipeline_config(), dataset="ssda903")
aggregate_data = aggregate.current()

for report in ["PAN", "SUFFICIENCY"]:
Expand Down
2 changes: 1 addition & 1 deletion liiatools_pipeline/ops/common_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def create_reports(
f"current/{config.dataset}", recreate=True
)
log.info("Aggregating Data Frames...")
aggregate = DataframeAggregator(session_folder, pipeline_config(config))
aggregate = DataframeAggregator(session_folder, pipeline_config(config), config.dataset)
aggregate_data = aggregate.current()
log.debug(f"Using config: {config}")
for report in pipeline_config(config).retention_period.keys():
Expand Down

0 comments on commit c05b23d

Please sign in to comment.