Skip to content

Commit

Permalink
Support metadata_log_entries metadata table
Browse files Browse the repository at this point in the history
  • Loading branch information
rshkv committed Dec 25, 2024
1 parent 8ca0ab6 commit 067ada8
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 8 deletions.
143 changes: 141 additions & 2 deletions crates/iceberg/src/metadata_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
use std::sync::Arc;

use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMillisecondType};
use arrow_array::types::{Int32Type, Int64Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};

use crate::spec::TableMetadataRef;
use crate::spec::{SnapshotRef, TableMetadataRef};
use crate::table::Table;
use crate::Result;

Expand All @@ -36,20 +36,27 @@ use crate::Result;
#[derive(Debug)]
pub struct MetadataScan {
    // Shared, immutable snapshot of the table metadata this scan reads from.
    metadata_ref: TableMetadataRef,
    // Location of the *current* metadata file, if the table tracks one.
    // Kept separately because the metadata log only records *previous* files.
    metadata_location: Option<String>,
}

impl MetadataScan {
    /// Creates a new metadata scan over the given table.
    pub fn new(table: &Table) -> Self {
        let metadata_ref = table.metadata_ref();
        let metadata_location = table.metadata_location().map(String::from);
        Self {
            metadata_ref,
            metadata_location,
        }
    }

    /// Returns the snapshots of the table.
    pub fn snapshots(&self) -> Result<RecordBatch> {
        SnapshotsTable::scan(self)
    }

    /// Returns the metadata log entries of the table.
    pub fn metadata_log_entries(&self) -> Result<RecordBatch> {
        MetadataLogEntriesTable::scan(self)
    }
}

/// Table metadata scan.
Expand Down Expand Up @@ -137,6 +144,87 @@ impl MetadataTable for SnapshotsTable {
}
}

/// Metadata log entries table.
///
/// Use to inspect the current and historical metadata files in the table.
/// Contains every metadata file and the time it was added. For each metadata
/// file, the table contains information about the latest snapshot at the time.
pub struct MetadataLogEntriesTable;

impl MetadataLogEntriesTable {
    /// Resolves the latest snapshot as of `timestamp_ms_inclusive`: the last
    /// snapshot-log entry whose timestamp is less than or equal to the given
    /// timestamp.
    ///
    /// Returns `None` when no snapshot existed at that time, or when the
    /// resolved snapshot id is no longer present in the table metadata.
    fn snapshot_id_as_of_time(
        table_metadata: &TableMetadataRef,
        timestamp_ms_inclusive: i64,
    ) -> Option<&SnapshotRef> {
        let mut snapshot_id = None;
        // The table metadata snapshot log is chronological, so once an entry
        // is newer than the requested timestamp no later entry can match and
        // we can stop scanning.
        for log_entry in table_metadata.history() {
            if log_entry.timestamp_ms > timestamp_ms_inclusive {
                break;
            }
            snapshot_id = Some(log_entry.snapshot_id);
        }
        // The snapshot may have been expired since the log entry was written,
        // hence `and_then` rather than `map`.
        snapshot_id.and_then(|id| table_metadata.snapshot_by_id(id))
    }
}

impl MetadataTable for MetadataLogEntriesTable {
    /// Arrow schema of the metadata log entries table: one row per metadata
    /// file, with the latest snapshot information at that point in time.
    /// The `latest_*` columns are nullable because a metadata file may
    /// predate the first snapshot, or reference an expired snapshot.
    fn schema() -> Schema {
        Schema::new(vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
                false,
            ),
            Field::new("file", DataType::Utf8, false),
            Field::new("latest_snapshot_id", DataType::Int64, true),
            Field::new("latest_schema_id", DataType::Int32, true),
            Field::new("latest_sequence_number", DataType::Int64, true),
        ])
    }

    /// Builds one record batch containing every historical metadata file from
    /// the metadata log, plus the current metadata file (when its location is
    /// known), each annotated with the latest snapshot as of its timestamp.
    fn scan(scan: &MetadataScan) -> Result<RecordBatch> {
        let mut timestamp =
            PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
        let mut file = StringBuilder::new();
        let mut latest_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
        let mut latest_schema_id = PrimitiveBuilder::<Int32Type>::new();
        let mut latest_sequence_number = PrimitiveBuilder::<Int64Type>::new();

        // Appends a single row; shared between the historical entries below
        // and the current metadata file.
        let mut append_metadata_log_entry = |timestamp_ms: i64, metadata_file: &str| {
            timestamp.append_value(timestamp_ms);
            file.append_value(metadata_file);

            // Snapshot columns are null when no snapshot existed at the time
            // (or the snapshot has since been expired).
            let snapshot = Self::snapshot_id_as_of_time(&scan.metadata_ref, timestamp_ms);
            latest_snapshot_id.append_option(snapshot.map(|s| s.snapshot_id()));
            latest_schema_id.append_option(snapshot.and_then(|s| s.schema_id()));
            latest_sequence_number.append_option(snapshot.map(|s| s.sequence_number()));
        };

        for metadata_log_entry in scan.metadata_ref.metadata_log() {
            append_metadata_log_entry(
                metadata_log_entry.timestamp_ms,
                &metadata_log_entry.metadata_file,
            );
        }

        // The metadata log only records *previous* files, so the current
        // metadata file is appended separately, stamped with the metadata's
        // last-updated time.
        if let Some(current_metadata_location) = &scan.metadata_location {
            append_metadata_log_entry(
                scan.metadata_ref.last_updated_ms(),
                current_metadata_location,
            );
        }

        Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![
            Arc::new(timestamp.finish()),
            Arc::new(file.finish()),
            Arc::new(latest_snapshot_id.finish()),
            Arc::new(latest_schema_id.finish()),
            Arc::new(latest_sequence_number.finish()),
        ])?)
    }
}

#[cfg(test)]
mod tests {
use expect_test::{expect, Expect};
Expand Down Expand Up @@ -262,4 +350,55 @@ mod tests {
Some("committed_at"),
);
}

#[test]
// Verifies the metadata_log_entries scan: the current metadata file must be
// included as the last row, and the snapshot-derived columns must match the
// expect-test snapshots below (null for the entry that predates any snapshot).
fn test_metadata_log_entries_table() {
let table = TableTestFixture::new().table;
let record_batch = table.metadata_scan().metadata_log_entries().unwrap();

// Check the current metadata location is included
// (it is not part of the metadata log itself, so the scan must append it).
let current_metadata_location = table.metadata_location().unwrap();
assert!(record_batch
.column_by_name("file")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap()
.iter()
.any(|location| location.is_some_and(|l| l.eq(current_metadata_location))));

// Snapshot-compare schema and column contents. The "file" column holds
// temp-dir paths that vary per run, so it is skipped; rows are keyed by
// "timestamp" for stable ordering.
check_record_batch(
record_batch,
expect![[r#"
Field { name: "timestamp", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "file", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "latest_snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "latest_schema_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "latest_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
expect![[r#"
timestamp: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
[
1970-01-01T00:25:15.100+00:00,
2020-10-14T01:22:53.590+00:00,
],
file: (skipped),
latest_snapshot_id: PrimitiveArray<Int64>
[
null,
3055729675574597004,
],
latest_schema_id: PrimitiveArray<Int32>
[
null,
1,
],
latest_sequence_number: PrimitiveArray<Int64>
[
null,
1,
]"#]],
&["file"],
Some("timestamp"),
);
}
}
14 changes: 8 additions & 6 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -993,19 +993,21 @@ pub mod tests {
let table_location = tmp_dir.path().join("table1");
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
// This is a past metadata location in the metadata log
let table_metadata1_location = table_location.join("metadata/v1.json");
// This is the actual location of current metadata
let template_json_location = format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
);

let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

let table_metadata = {
let template_json_str = fs::read_to_string(format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
let template_json_str = fs::read_to_string(&template_json_location).unwrap();
let mut context = Context::new();
context.insert("table_location", &table_location);
context.insert("manifest_list_1_location", &manifest_list1_location);
Expand All @@ -1020,7 +1022,7 @@ pub mod tests {
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io.clone())
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
.metadata_location(template_json_location)
.build()
.unwrap();

Expand Down

0 comments on commit 067ada8

Please sign in to comment.