Skip to content

Commit

Permalink
fix: add sizeInBytes to _last_checkpoint and change size to # of …
Browse files Browse the repository at this point in the history
…actions (#1477)

# Description
The `size` field should be the number of actions stored in the
checkpoint while `sizeInBytes` is used for the total size in bytes.

Added `CheckPointBuilder` to make the creation of these easier to use.

# Related Issue(s)
- Closes #1468 

# Documentation


https://github.com/delta-io/delta/blob/master/PROTOCOL.md#last-checkpoint-file
  • Loading branch information
cmackenzie1 authored Jun 20, 2023
1 parent 0dda99b commit e5dd8e2
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 20 deletions.
25 changes: 15 additions & 10 deletions rust/src/action/checkpoints.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
//! Implementation for writing delta checkpoints.

use std::collections::HashMap;
use std::convert::TryFrom;
use std::iter::Iterator;
use std::ops::Add;

use arrow::datatypes::Schema as ArrowSchema;
use arrow::error::ArrowError;
use arrow::json::ReaderBuilder;

use chrono::{DateTime, Datelike, Duration, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
Expand All @@ -12,17 +18,13 @@ use parquet::arrow::ArrowWriter;
use parquet::errors::ParquetError;
use regex::Regex;
use serde_json::Value;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::iter::Iterator;
use std::ops::Add;

use super::{Action, Add as AddAction, MetaData, Protocol, ProtocolError, Txn};
use crate::delta_arrow::delta_log_schema_for_table;
use crate::schema::*;
use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::{open_table_with_version, time_utils, CheckPoint, DeltaTable};
use crate::{schema::*, CheckPointBuilder};

type SchemaPath = Vec<String>;

Expand Down Expand Up @@ -118,9 +120,7 @@ async fn create_checkpoint_for(
let last_checkpoint_path = storage.log_path().child("_last_checkpoint");

debug!("Writing parquet bytes to checkpoint buffer.");
let parquet_bytes = parquet_bytes_from_state(state)?;
let size = parquet_bytes.len() as i64;
let checkpoint = CheckPoint::new(version, size, None);
let (checkpoint, parquet_bytes) = parquet_bytes_from_state(state)?;

let file_name = format!("{version:020}.checkpoint.parquet");
let checkpoint_path = storage.log_path().child(file_name);
Expand Down Expand Up @@ -281,7 +281,9 @@ pub async fn cleanup_expired_logs_for(
}
}

fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, ProtocolError> {
fn parquet_bytes_from_state(
state: &DeltaTableState,
) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> {
let current_metadata = state.current_metadata().ok_or(ProtocolError::NoMetaData)?;

let partition_col_data_types = current_metadata.get_partition_col_data_types();
Expand Down Expand Up @@ -375,7 +377,10 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, Pro
let _ = writer.close()?;
debug!("Finished writing checkpoint parquet buffer.");

Ok(bytes::Bytes::from(bytes))
let checkpoint = CheckPointBuilder::new(state.version(), jsons.len() as i64)
.with_size_in_bytes(bytes.len() as i64)
.build();
Ok((checkpoint, bytes::Bytes::from(bytes)))
}

fn checkpoint_add_from_state(
Expand Down
12 changes: 2 additions & 10 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,11 +791,7 @@ pub(crate) async fn find_latest_check_point_for_version(
continue;
}
if cp.is_none() || curr_ver > cp.unwrap().version {
cp = Some(CheckPoint {
version: curr_ver,
size: 0,
parts: None,
});
cp = Some(CheckPoint::new(curr_ver, 0, None));
}
continue;
}
Expand All @@ -810,11 +806,7 @@ pub(crate) async fn find_latest_check_point_for_version(
if cp.is_none() || curr_ver > cp.unwrap().version {
let parts_str = captures.get(2).unwrap().as_str();
let parts = parts_str.parse().unwrap();
cp = Some(CheckPoint {
version: curr_ver,
size: 0,
parts: Some(parts),
});
cp = Some(CheckPoint::new(curr_ver, 0, Some(parts)));
}
continue;
}
Expand Down
65 changes: 65 additions & 0 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,71 @@ pub use crate::builder::{DeltaTableBuilder, DeltaTableConfig, DeltaVersion};
pub struct CheckPoint {
/// Delta table version
pub(crate) version: i64, // 20 digits decimals
/// The number of actions that are stored in the checkpoint.
pub(crate) size: i64,
/// The number of fragments if the last checkpoint was written in multiple parts. This field is optional.
pub(crate) parts: Option<u32>, // 10 digits decimals
/// The number of bytes of the checkpoint. This field is optional.
pub(crate) size_in_bytes: Option<i64>,
/// The number of AddFile actions in the checkpoint. This field is optional.
pub(crate) num_of_add_files: Option<i64>,
}

/// Builder for CheckPoint
pub struct CheckPointBuilder {
/// Delta table version
pub(crate) version: i64, // 20 digits decimals
/// The number of actions that are stored in the checkpoint.
pub(crate) size: i64,
/// The number of fragments if the last checkpoint was written in multiple parts. This field is optional.
pub(crate) parts: Option<u32>, // 10 digits decimals
/// The number of bytes of the checkpoint. This field is optional.
pub(crate) size_in_bytes: Option<i64>,
/// The number of AddFile actions in the checkpoint. This field is optional.
pub(crate) num_of_add_files: Option<i64>,
}

impl CheckPointBuilder {
/// Creates a new [`CheckPointBuilder`] instance with the provided `version` and `size`.
/// Size is the total number of actions in the checkpoint. See size_in_bytes for total size in bytes.
pub fn new(version: i64, size: i64) -> Self {
CheckPointBuilder {
version,
size,
parts: None,
size_in_bytes: None,
num_of_add_files: None,
}
}

/// The number of fragments if the last checkpoint was written in multiple parts. This field is optional.
pub fn with_parts(mut self, parts: u32) -> Self {
self.parts = Some(parts);
self
}

/// The number of bytes of the checkpoint. This field is optional.
pub fn with_size_in_bytes(mut self, size_in_bytes: i64) -> Self {
self.size_in_bytes = Some(size_in_bytes);
self
}

/// The number of AddFile actions in the checkpoint. This field is optional.
pub fn with_num_of_add_files(mut self, num_of_add_files: i64) -> Self {
self.num_of_add_files = Some(num_of_add_files);
self
}

/// Build the final [`CheckPoint`] struct.
pub fn build(self) -> CheckPoint {
CheckPoint {
version: self.version,
size: self.size,
parts: self.parts,
size_in_bytes: self.size_in_bytes,
num_of_add_files: self.num_of_add_files,
}
}
}

impl CheckPoint {
Expand All @@ -53,6 +116,8 @@ impl CheckPoint {
version,
size,
parts,
size_in_bytes: None,
num_of_add_files: None,
}
}
}
Expand Down

0 comments on commit e5dd8e2

Please sign in to comment.