feat: Use a datetime column in flights-3m.parquet #642

Merged · 7 commits · Dec 9, 2024

Changes from all commits
data/flights-3m.parquet (binary file modified, not shown)
datapackage.json (4 changes: 2 additions & 2 deletions)

@@ -21,7 +21,7 @@
    }
  ],
  "version": "2.11.0",
-  "created": "2024-12-04T12:47:49.897600+00:00",
+  "created": "2024-12-06T16:14:38.044099+00:00",
  "resources": [
    {
      "name": "7zip.png",
@@ -1060,7 +1060,7 @@
"fields": [
{
"name": "date",
"type": "integer"
"type": "datetime"
},
{
"name": "delay",
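The schema change above is the substance of this file's diff: the date field of flights-3m.parquet is now declared as a datetime rather than an integer. A minimal sketch of how a consumer might verify the new physical type (assuming pandas/pyarrow are installed and the repository's data/flights-3m.parquet is on disk; the exact timestamp resolution is not stated in this PR):

    import pyarrow.parquet as pq

    # Read only the Parquet footer and inspect the declared schema.
    schema = pq.read_schema("data/flights-3m.parquet")
    print(schema.field("date").type)  # expected: some timestamp type, e.g. timestamp[ms]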
scripts/flights.py (121 changes: 43 additions & 78 deletions)

@@ -1,3 +1,10 @@
+# /// script
+# requires-python = ">=3.12"
+# dependencies = [
+#     "pandas",
+#     "pyarrow>=14.0.0",
+# ]
+# ///
"""
Process U.S. DOT On-Time Flight Performance data into a simplified CSV/JSON/Parquet format.

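A side note on the new header block in the first hunk: it is PEP 723 inline script metadata, which declares the script's Python requirement and dependencies in the file itself. Assuming a PEP 723-aware runner such as uv is installed, the script can presumably be executed directly, e.g. uv run scripts/flights.py, with pandas and pyarrow resolved automatically.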
@@ -252,6 +259,7 @@ def process_flights_data(
    df: pd.DataFrame,
    num_rows: Optional[int] = None,
    random_seed: int = 42,
+    datetime_convert: bool = True,
    datetime_format: DateTimeFormat = DateTimeFormat.MMDDHHMM,
    flag_date_changes: bool = False,
    columns: Optional[List[str]] = None,
@@ -345,7 +353,8 @@ def process_flights_data(
    result['date_changed'] = df['date_changed']

    # Format datetime according to specified format
-    result = format_datetime(result, datetime_format)
+    if datetime_convert:
+        result = format_datetime(result, datetime_format)

    # Handle sampling if requested
    if num_rows is not None and num_rows < len(result):
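The new datetime_convert flag gates the string-formatting step: the Parquet path can now keep a native datetime column, while CSV/JSON output still receives formatted text. A rough standalone sketch of the two paths (toy data; the strftime pattern is an assumption standing in for the script's format_datetime helper):

    import pandas as pd

    df = pd.DataFrame({"date": pd.to_datetime(["2024-03-01 06:30", "2024-03-02 18:45"])})

    # Parquet path (datetime_convert=False): keep datetime64 so the file stores real timestamps.
    parquet_ready = df

    # CSV/JSON path (datetime_convert=True): render to text, here approximating MMDDHHMM.
    text_ready = df.assign(date=df["date"].dt.strftime("%m%d%H%M"))

    print(parquet_ready["date"].dtype)  # datetime64[ns]
    print(text_ready["date"].tolist())  # ['03010630', '03021845']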
@@ -446,35 +455,6 @@ def print_verbose_stats(stats: Dict[str, Any]) -> None:
logging.info(f" {', '.join(stats['airports']['destinations'])}")
logging.info("\n")

def save_output(
df: pd.DataFrame,
output_format: OutputFormat,
base_filename: str,
datetime_format: DateTimeFormat,
verbose: bool = False,
parquet_config: Optional[ParquetConfig] = None
) -> None:
"""Save the DataFrame in the specified format and optionally show statistics."""
if verbose:
stats = get_dataset_stats(df, datetime_format)
print_verbose_stats(stats)

if output_format == OutputFormat.CSV:
output_file = f"{base_filename}.csv"
df.to_csv(output_file, index=False, encoding='utf-8')
elif output_format == OutputFormat.JSON:
output_file = f"{base_filename}.json"
records = df.to_dict(orient='records')
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(records, f, separators=(',', ':'))
else: # PARQUET
if parquet_config is None:
parquet_config = ParquetConfig()
output_file = f"{base_filename}.parquet"
save_as_parquet(df, output_file, parquet_config)

logging.info(f"Successfully created {output_file} with {len(df)} rows")

def add_parquet_arguments(parser: argparse.ArgumentParser) -> None:
"""Add Parquet-specific arguments to the parser."""
parquet_group = parser.add_argument_group('Parquet options')
@@ -651,75 +631,58 @@ def load_zip_files(pattern: str) -> pd.DataFrame:
    return pd.concat(dfs, ignore_index=True)


-def save_as_parquet(
-    df: pd.DataFrame,
-    filename: str,
-    config: ParquetConfig
-) -> None:
+def save_as_parquet(df: pd.DataFrame, filename: str, config: ParquetConfig) -> None:
    """Save DataFrame as Parquet file with specified configuration."""
-    # Convert pandas DataFrame to Arrow Table
-    table = pa.Table.from_pandas(df, preserve_index=False)
-
-    # Prepare write options
-    write_options = {
-        'compression': config.compression,
-        'use_dictionary': config.enable_dictionary,
-        'write_statistics': config.write_statistics,
-    }
-
-    # Add ZSTD compression level if using ZSTD
-    if config.compression == 'zstd':
-        write_options['compression_level'] = config.compression_level
-
-    # Write the Parquet file
-    pq.write_table(
-        table,
+    is_zstd = config.compression == "zstd"
+    kwds = {"compression_level": config.compression_level} if is_zstd else {}
+    df.to_parquet(
        filename,
-        **write_options
+        compression=config.compression,
+        index=False,
+        use_dictionary=config.enable_dictionary,
+        write_statistics=config.write_statistics,
+        **kwds,
    )

    # Get and log file statistics
    file_stats = os.stat(filename)
-    parquet_file = pq.ParquetFile(filename)
-    logging.info(f"Parquet file statistics:")
-    logging.info(f" - File size: {file_stats.st_size / (1024*1024):.2f} MB")
-    logging.info(f" - Number of row groups: {parquet_file.num_row_groups}")
-    logging.info(f" - Compression: {config.compression}" +
-                 (f" (level {config.compression_level})" if config.compression == 'zstd' else ""))
-    logging.info(f" - Row group size: {config.row_group_size / (1024*1024):.0f} MB")
+    level = f" (level {config.compression_level})" if is_zstd else ""
+    msg = (
+        f"Parquet file statistics:\n"
+        f" - File size: {file_stats.st_size / (1024*1024):.2f} MB\n"
+        f" - Number of row groups: {pq.ParquetFile(filename).num_row_groups}\n"
+        f" - Compression: {config.compression}{level}\n"
+        f" - Row group size: {config.row_group_size / (1024*1024):.0f} MB"
+    )
+    logging.info(msg)

def save_output(
    df: pd.DataFrame,
    output_format: OutputFormat,
    base_filename: str,
    datetime_format: DateTimeFormat,
+    parquet_config: ParquetConfig,
    verbose: bool = False,
-    parquet_config: Optional[ParquetConfig] = None
) -> None:
    """Save the DataFrame in the specified format and optionally show statistics."""
    if verbose:
        stats = get_dataset_stats(df, datetime_format)
        print_verbose_stats(stats)

+    output_filename = f"{base_filename}.{output_format.value}"
    if output_format == OutputFormat.CSV:
-        output_file = f"{base_filename}.csv"
-        df.to_csv(output_file, index=False, encoding='utf-8')
+        df.to_csv(output_filename, index=False)
    elif output_format == OutputFormat.JSON:
-        output_file = f"{base_filename}.json"
-        records = df.to_dict(orient='records')
-        with open(output_file, 'w', encoding='utf-8') as f:
-            json.dump(records, f, separators=(',', ':'))
+        s = json.dumps(df.to_dict(orient="records"), separators=(",", ":"))
+        Path(output_filename).write_text(s, encoding="utf-8")
    else: # PARQUET
-        if parquet_config is None:
-            parquet_config = ParquetConfig()
-        output_file = f"{base_filename}.parquet"
-        save_as_parquet(df, output_file, parquet_config)
-
-    logging.info(f"Successfully created {output_file} with {len(df)} rows")
+        save_as_parquet(df, output_filename, parquet_config)

+    logging.info(f"Successfully created {output_filename} with {len(df)} rows")

def main():
    args = parse_args()
-    parquet_config = ParquetConfig.from_args(args) if args.format == 'parquet' else None
+    is_parquet = args.format == 'parquet'
+    parquet_config = ParquetConfig.from_args(args) if is_parquet else ParquetConfig()
    base_filename = args.output
    zip_pattern = str(Path(args.input_dir) / '*On_Time_Reporting*.zip')
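The save_as_parquet rewrite leans on the fact that pandas forwards keyword arguments it does not recognize to the underlying Parquet library, so pyarrow-level options (use_dictionary, write_statistics, compression_level) can be passed straight through DataFrame.to_parquet instead of building a pyarrow Table manually. A small isolated example of the same pattern (hypothetical file name):

    import pandas as pd

    df = pd.DataFrame({"date": pd.to_datetime(["2024-03-01 06:30"]), "delay": [12]})

    # compression is a pandas-level argument; the remaining keywords are
    # forwarded to pyarrow.parquet.write_table by the pyarrow engine.
    df.to_parquet(
        "example.parquet",
        engine="pyarrow",
        compression="zstd",
        compression_level=19,
        index=False,
        use_dictionary=True,
        write_statistics=True,
    )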

@@ -728,11 +691,13 @@ def main():
    columns = args.columns.split(',') if args.columns else None

    raw_df = load_zip_files(zip_pattern)
+    datetime_format = DateTimeFormat(args.datetime_format)
    processed_df = process_flights_data(
        raw_df,
        num_rows=args.num_rows,
        random_seed=args.seed,
-        datetime_format=DateTimeFormat(args.datetime_format),
+        datetime_convert=not is_parquet,
+        datetime_format=datetime_format,
        flag_date_changes=args.flag_date_changes,
        columns=columns,
        start_date=args.start_date,
@@ -743,7 +708,7 @@ def main():
        processed_df,
        output_format=OutputFormat(args.format),
        base_filename=base_filename,
-        datetime_format=DateTimeFormat(args.datetime_format),
+        datetime_format=datetime_format,
        verbose=args.verbose,
        parquet_config=parquet_config
    )
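Taken together, the user-visible effect is that flights-3m.parquet now round-trips with a genuine timestamp dtype rather than an encoded integer. A quick check one might run against the updated artifact (assuming pandas with a pyarrow-backed Parquet engine):

    import pandas as pd

    df = pd.read_parquet("data/flights-3m.parquet", columns=["date", "delay"])
    print(df["date"].dtype)  # expected: a datetime64 dtype rather than an integer dtype
    print(df.head())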