diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 686b88e19..c10d904b6 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -1285,7 +1285,7 @@ mod tests { .null_order(NullOrder::First) .build(), ) - .build() + .build_unbound() .unwrap(), ) .build(); diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 434ce4d05..bd579d598 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -17,20 +17,19 @@ //! Catalog API for Apache Iceberg -use serde_derive::{Deserialize, Serialize}; -use urlencoding::encode; - use crate::spec::{ FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, UnboundPartitionSpec, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; use async_trait::async_trait; +use serde_derive::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::Debug; use std::mem::take; use std::ops::Deref; use typed_builder::TypedBuilder; +use urlencoding::encode; use uuid::Uuid; /// The catalog API for Iceberg Rust. @@ -866,7 +865,7 @@ mod tests { .transform(Transform::Bucket(4)) .build(), ) - .build() + .build_unbound() .unwrap(), }; diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs index 01a1eddea..a4d2f7dc7 100644 --- a/crates/iceberg/src/spec/sort.rs +++ b/crates/iceberg/src/spec/sort.rs @@ -18,7 +18,12 @@ /*! * Sorting */ +use crate::error::Result; +use crate::spec::Schema; +use crate::{Error, ErrorKind}; +use core::fmt; use serde::{Deserialize, Serialize}; +use std::fmt::Formatter; use std::sync::Arc; use typed_builder::TypedBuilder; @@ -26,7 +31,7 @@ use super::transform::Transform; /// Reference to [`SortOrder`]. pub type SortOrderRef = Arc; -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Copy, Clone)] /// Sort direction in a partition, either ascending or descending pub enum SortDirection { /// Ascending @@ -37,7 +42,16 @@ pub enum SortDirection { Descending, } -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +impl fmt::Display for SortDirection { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match *self { + SortDirection::Ascending => write!(f, "ascending"), + SortDirection::Descending => write!(f, "descending"), + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Copy, Clone)] /// Describes the order of null values when sorted. pub enum NullOrder { #[serde(rename = "nulls-first")] @@ -48,6 +62,15 @@ pub enum NullOrder { Last, } +impl fmt::Display for NullOrder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match *self { + NullOrder::First => write!(f, "first"), + NullOrder::Last => write!(f, "last"), + } + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)] #[serde(rename_all = "kebab-case")] /// Entry for every column that is to be sorted @@ -62,9 +85,20 @@ pub struct SortField { pub null_order: NullOrder, } +impl fmt::Display for SortField { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "SortField {{ source_id: {}, transform: {}, direction: {}, null_order: {} }}", + self.source_id, self.transform, self.direction, self.null_order + ) + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Builder, Default)] #[serde(rename_all = "kebab-case")] #[builder(setter(prefix = "with"))] +#[builder(build_fn(skip))] /// A sort order is defined by a sort order id and a list of sort fields. /// The order of the sort fields within the list defines the order in which the sort is applied to the data. pub struct SortOrder { @@ -77,11 +111,21 @@ pub struct SortOrder { } impl SortOrder { + const UNSORTED_ORDER_ID: i64 = 0; + /// Create sort order builder pub fn builder() -> SortOrderBuilder { SortOrderBuilder::default() } + /// Create an unbound unsorted order + pub fn unsorted_order() -> SortOrder { + SortOrder { + order_id: SortOrder::UNSORTED_ORDER_ID, + fields: Vec::new(), + } + } + /// Returns true if the sort order is unsorted. /// /// A [`SortOrder`] is unsorted if it has no sort fields. @@ -90,13 +134,82 @@ impl SortOrder { } } +impl SortOrderBuilder { + /// Creates a new unbound sort order. + pub fn build_unbound(&self) -> Result { + let fields = self.fields.clone().unwrap_or_default(); + return match (self.order_id, fields.as_slice()) { + (Some(SortOrder::UNSORTED_ORDER_ID) | None, []) => Ok(SortOrder::unsorted_order()), + (_, []) => Err(Error::new( + ErrorKind::Unexpected, + format!("Unsorted order ID must be {}", SortOrder::UNSORTED_ORDER_ID), + )), + (Some(SortOrder::UNSORTED_ORDER_ID), [..]) => Err(Error::new( + ErrorKind::Unexpected, + format!( + "Sort order ID {} is reserved for unsorted order", + SortOrder::UNSORTED_ORDER_ID + ), + )), + (maybe_order_id, [..]) => Ok(SortOrder { + order_id: maybe_order_id.unwrap_or(1), + fields: fields.to_vec(), + }), + }; + } + + /// Creates a new bound sort order. + pub fn build(&self, schema: Schema) -> Result { + let unbound_sort_order = self.build_unbound()?; + SortOrderBuilder::check_compatibility(unbound_sort_order, schema) + } + + /// Returns the given sort order if it is compatible with the given schema + fn check_compatibility(sort_order: SortOrder, schema: Schema) -> Result { + let sort_fields = &sort_order.fields; + for sort_field in sort_fields { + match schema.field_by_id(sort_field.source_id) { + None => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Cannot find source column for sort field: {sort_field}"), + )) + } + Some(source_field) => { + let source_type = source_field.field_type.as_ref(); + + if !source_type.is_primitive() { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Cannot sort by non-primitive source field: {source_type}"), + )); + } + + let field_transform = sort_field.transform; + if field_transform.result_type(source_type).is_err() { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Invalid source type {source_type} for transform {field_transform}" + ), + )); + } + } + } + } + + Ok(sort_order) + } +} + #[cfg(test)] mod tests { use super::*; + use crate::spec::{ListType, NestedField, PrimitiveType, Type}; #[test] - fn sort_field() { - let sort_field = r#" + fn test_sort_field() { + let spec = r#" { "transform": "bucket[4]", "source-id": 3, @@ -105,7 +218,7 @@ mod tests { } "#; - let field: SortField = serde_json::from_str(sort_field).unwrap(); + let field: SortField = serde_json::from_str(spec).unwrap(); assert_eq!(Transform::Bucket(4), field.transform); assert_eq!(3, field.source_id); assert_eq!(SortDirection::Descending, field.direction); @@ -113,8 +226,8 @@ mod tests { } #[test] - fn sort_order() { - let sort_order = r#" + fn test_sort_order() { + let spec = r#" { "order-id": 1, "fields": [ { @@ -131,7 +244,7 @@ mod tests { } "#; - let order: SortOrder = serde_json::from_str(sort_order).unwrap(); + let order: SortOrder = serde_json::from_str(spec).unwrap(); assert_eq!(Transform::Identity, order.fields[0].transform); assert_eq!(2, order.fields[0].source_id); assert_eq!(SortDirection::Ascending, order.fields[0].direction); @@ -142,4 +255,226 @@ mod tests { assert_eq!(SortDirection::Descending, order.fields[1].direction); assert_eq!(NullOrder::Last, order.fields[1].null_order); } + + #[test] + fn test_build_unbound_should_return_err_if_unsorted_order_does_not_have_an_order_id_of_zero() { + assert_eq!( + SortOrder::builder() + .with_order_id(1) + .build_unbound() + .expect_err("Expected an Err value") + .message(), + "Unsorted order ID must be 0" + ) + } + + #[test] + fn test_build_unbound_should_return_err_if_order_id_equals_zero_is_used_for_anything_other_than_unsorted_order( + ) { + assert_eq!( + SortOrder::builder() + .with_order_id(SortOrder::UNSORTED_ORDER_ID) + .with_sort_field( + SortField::builder() + .source_id(2) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Identity) + .build() + ) + .build_unbound() + .expect_err("Expected an Err value") + .message(), + "Sort order ID 0 is reserved for unsorted order" + ) + } + + #[test] + fn test_build_unbound_should_return_unsorted_sort_order() { + assert_eq!( + SortOrder::builder() + .with_order_id(SortOrder::UNSORTED_ORDER_ID) + .build_unbound() + .expect("Expected an Ok value"), + SortOrder::unsorted_order() + ) + } + + #[test] + fn test_build_unbound_should_return_sort_order_with_given_order_id_and_sort_fields() { + let sort_field = SortField::builder() + .source_id(2) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Identity) + .build(); + + assert_eq!( + SortOrder::builder() + .with_order_id(2) + .with_sort_field(sort_field.clone()) + .build_unbound() + .expect("Expected an Ok value"), + SortOrder { + order_id: 2, + fields: vec![sort_field] + } + ) + } + + #[test] + fn test_build_unbound_should_return_sort_order_with_given_sort_fields_and_defaults_to_1_if_missing_an_order_id( + ) { + let sort_field = SortField::builder() + .source_id(2) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Identity) + .build(); + + assert_eq!( + SortOrder::builder() + .with_sort_field(sort_field.clone()) + .build_unbound() + .expect("Expected an Ok value"), + SortOrder { + order_id: 1, + fields: vec![sort_field] + } + ) + } + + #[test] + fn test_build_should_return_err_if_sort_order_field_is_not_present_in_schema() { + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![NestedField::required( + 1, + "foo", + Type::Primitive(PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap(); + + let sort_order_builder_result = SortOrder::builder() + .with_sort_field( + SortField::builder() + .source_id(2) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Identity) + .build(), + ) + .build(schema); + + assert_eq!( + sort_order_builder_result + .expect_err("Expected an Err value") + .message(), + "Cannot find source column for sort field: SortField { source_id: 2, transform: identity, direction: ascending, null_order: first }" + ) + } + + #[test] + fn test_build_should_return_err_if_source_field_is_not_a_primitive_type() { + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![NestedField::required( + 1, + "foo", + Type::List(ListType { + element_field: NestedField::list_element( + 2, + Type::Primitive(PrimitiveType::String), + true, + ) + .into(), + }), + ) + .into()]) + .build() + .unwrap(); + + let sort_order_builder_result = SortOrder::builder() + .with_sort_field( + SortField::builder() + .source_id(1) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Identity) + .build(), + ) + .build(schema); + + assert_eq!( + sort_order_builder_result + .expect_err("Expected an Err value") + .message(), + "Cannot sort by non-primitive source field: list" + ) + } + + #[test] + fn test_build_should_return_err_if_source_field_type_is_not_supported_by_transform() { + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![NestedField::required( + 1, + "foo", + Type::Primitive(PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap(); + + let sort_order_builder_result = SortOrder::builder() + .with_sort_field( + SortField::builder() + .source_id(1) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Year) + .build(), + ) + .build(schema); + + assert_eq!( + sort_order_builder_result + .expect_err("Expected an Err value") + .message(), + "Invalid source type int for transform year" + ) + } + + #[test] + fn test_build_should_return_valid_sort_order() { + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let sort_field = SortField::builder() + .source_id(2) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Identity) + .build(); + + let sort_order_builder_result = SortOrder::builder() + .with_sort_field(sort_field.clone()) + .build(schema); + + assert_eq!( + sort_order_builder_result.expect("Expected an Ok value"), + SortOrder { + order_id: 1, + fields: vec![sort_field], + } + ) + } } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 9903d6e4b..a6eb05c6c 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -1097,7 +1097,10 @@ mod tests { .build() .unwrap(); - let sort_order = SortOrder::builder().with_order_id(0).build().unwrap(); + let sort_order = SortOrder::builder() + .with_order_id(0) + .build_unbound() + .unwrap(); let snapshot = Snapshot::builder() .with_snapshot_id(638933773299822130) @@ -1221,7 +1224,7 @@ mod tests { direction: SortDirection::Descending, null_order: NullOrder::Last, }) - .build() + .build_unbound() .unwrap(); let snapshot1 = Snapshot::builder() @@ -1346,7 +1349,7 @@ mod tests { direction: SortDirection::Descending, null_order: NullOrder::Last, }) - .build() + .build_unbound() .unwrap(); let expected = TableMetadata { diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index b26581327..165fb8950 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -140,12 +140,13 @@ impl<'a> ReplaceSortOrderAction<'a> { /// Finished building the action and apply it to the transaction. pub fn apply(mut self) -> Result> { + let unbound_sort_order = SortOrder::builder() + .with_fields(self.sort_fields) + .build_unbound()?; + let updates = vec![ TableUpdate::AddSortOrder { - sort_order: SortOrder { - fields: self.sort_fields, - ..SortOrder::default() - }, + sort_order: unbound_sort_order, }, TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }, ];