diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs index 6e47eb796..a33ccf9fc 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_scan.rs @@ -20,11 +20,11 @@ use std::sync::Arc; use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder}; -use arrow_array::types::{Int64Type, TimestampMillisecondType}; +use arrow_array::types::{Int32Type, Int64Type, TimestampMillisecondType}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use crate::spec::TableMetadataRef; +use crate::spec::{SnapshotRef, TableMetadataRef}; use crate::table::Table; use crate::Result; @@ -36,6 +36,7 @@ use crate::Result; #[derive(Debug)] pub struct MetadataScan { metadata_ref: TableMetadataRef, + metadata_location: Option, } impl MetadataScan { @@ -43,6 +44,7 @@ impl MetadataScan { pub fn new(table: &Table) -> Self { Self { metadata_ref: table.metadata_ref(), + metadata_location: table.metadata_location().map(String::from), } } @@ -50,6 +52,11 @@ impl MetadataScan { pub fn snapshots(&self) -> Result { SnapshotsTable::scan(self) } + + /// Return the metadata log entries of the table. + pub fn metadata_log_entries(&self) -> Result { + MetadataLogEntriesTable::scan(self) + } } /// Table metadata scan. @@ -137,6 +144,87 @@ impl MetadataTable for SnapshotsTable { } } +/// Metadata log entries table. +/// +/// Use to inspect the current and historical metadata files in the table. +/// Contains every metadata file and the time it was added. For each metadata +/// file, the table contains information about the latest snapshot at the time. +pub struct MetadataLogEntriesTable; + +impl MetadataLogEntriesTable { + fn snapshot_id_as_of_time( + table_metadata: &TableMetadataRef, + timestamp_ms_inclusive: i64, + ) -> Option<&SnapshotRef> { + let mut snapshot_id = None; + // The table metadata snapshot log is chronological + for log_entry in table_metadata.history() { + if log_entry.timestamp_ms <= timestamp_ms_inclusive { + snapshot_id = Some(log_entry.snapshot_id); + } + } + snapshot_id.and_then(|id| table_metadata.snapshot_by_id(id)) + } +} + +impl MetadataTable for MetadataLogEntriesTable { + fn schema() -> Schema { + Schema::new(vec![ + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), + false, + ), + Field::new("file", DataType::Utf8, false), + Field::new("latest_snapshot_id", DataType::Int64, true), + Field::new("latest_schema_id", DataType::Int32, true), + Field::new("latest_sequence_number", DataType::Int64, true), + ]) + } + + fn scan(scan: &MetadataScan) -> Result { + let mut timestamp = + PrimitiveBuilder::::new().with_timezone("+00:00"); + let mut file = StringBuilder::new(); + let mut latest_snapshot_id = PrimitiveBuilder::::new(); + let mut latest_schema_id = PrimitiveBuilder::::new(); + let mut latest_sequence_number = PrimitiveBuilder::::new(); + + let mut append_metadata_log_entry = |timestamp_ms: i64, metadata_file: &str| { + timestamp.append_value(timestamp_ms); + file.append_value(metadata_file); + + let snapshot = + MetadataLogEntriesTable::snapshot_id_as_of_time(&scan.metadata_ref, timestamp_ms); + latest_snapshot_id.append_option(snapshot.map(|s| s.snapshot_id())); + latest_schema_id.append_option(snapshot.and_then(|s| s.schema_id())); + latest_sequence_number.append_option(snapshot.map(|s| s.sequence_number())); + }; + + for metadata_log_entry in scan.metadata_ref.metadata_log() { + append_metadata_log_entry( + metadata_log_entry.timestamp_ms, + &metadata_log_entry.metadata_file, + ); + } + + if let Some(current_metadata_location) = &scan.metadata_location { + append_metadata_log_entry( + scan.metadata_ref.last_updated_ms(), + ¤t_metadata_location, + ); + } + + Ok(RecordBatch::try_new(Arc::new(Self::schema()), vec![ + Arc::new(timestamp.finish()), + Arc::new(file.finish()), + Arc::new(latest_snapshot_id.finish()), + Arc::new(latest_schema_id.finish()), + Arc::new(latest_sequence_number.finish()), + ])?) + } +} + #[cfg(test)] mod tests { use expect_test::{expect, Expect}; @@ -262,4 +350,55 @@ mod tests { Some("committed_at"), ); } + + #[test] + fn test_metadata_log_entries_table() { + let table = TableTestFixture::new().table; + let record_batch = table.metadata_scan().metadata_log_entries().unwrap(); + + // Check the current metadata location is included + let current_metadata_location = table.metadata_location().unwrap(); + assert!(record_batch + .column_by_name("file") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .any(|location| location.is_some_and(|l| l.eq(current_metadata_location)))); + + check_record_batch( + record_batch, + expect![[r#" + Field { name: "timestamp", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "file", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "latest_snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "latest_schema_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "latest_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + expect![[r#" + timestamp: PrimitiveArray + [ + 1970-01-01T00:25:15.100+00:00, + 2020-10-14T01:22:53.590+00:00, + ], + file: (skipped), + latest_snapshot_id: PrimitiveArray + [ + null, + 3055729675574597004, + ], + latest_schema_id: PrimitiveArray + [ + null, + 1, + ], + latest_sequence_number: PrimitiveArray + [ + null, + 1, + ]"#]], + &["file"], + Some("timestamp"), + ); + } } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index f5a213e99..f14e05421 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -993,7 +993,13 @@ pub mod tests { let table_location = tmp_dir.path().join("table1"); let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro"); + // This is a past metadata location in the metadata log let table_metadata1_location = table_location.join("metadata/v1.json"); + // This is the actual location of current metadata + let template_json_location = format!( + "{}/testdata/example_table_metadata_v2.json", + env!("CARGO_MANIFEST_DIR") + ); let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) .unwrap() @@ -1001,11 +1007,7 @@ pub mod tests { .unwrap(); let table_metadata = { - let template_json_str = fs::read_to_string(format!( - "{}/testdata/example_table_metadata_v2.json", - env!("CARGO_MANIFEST_DIR") - )) - .unwrap(); + let template_json_str = fs::read_to_string(&template_json_location).unwrap(); let mut context = Context::new(); context.insert("table_location", &table_location); context.insert("manifest_list_1_location", &manifest_list1_location); @@ -1020,7 +1022,7 @@ pub mod tests { .metadata(table_metadata) .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) .file_io(file_io.clone()) - .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) + .metadata_location(template_json_location) .build() .unwrap();