Spark 3.3: IcebergSource extends SessionConfigSupport (#11625)
pan3793 authored Nov 23, 2024
1 parent 5e09cdc commit eddf9a1
Showing 2 changed files with 54 additions and 1 deletion.
IcebergSource.java
@@ -38,6 +38,7 @@
 import org.apache.spark.sql.connector.catalog.CatalogManager;
 import org.apache.spark.sql.connector.catalog.CatalogPlugin;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
 import org.apache.spark.sql.connector.catalog.SupportsCatalogOptions;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -61,7 +62,8 @@
  * <p>The above list is in order of priority. For example: a matching catalog will take priority
  * over any namespace resolution.
  */
-public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions {
+public class IcebergSource
+    implements DataSourceRegister, SupportsCatalogOptions, SessionConfigSupport {
   private static final String DEFAULT_CATALOG_NAME = "default_iceberg";
   private static final String DEFAULT_CACHE_CATALOG_NAME = "default_cache_iceberg";
   private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME;
@@ -80,6 +82,11 @@ public String shortName() {
     return "iceberg";
   }
 
+  @Override
+  public String keyPrefix() {
+    return shortName();
+  }
+
   @Override
   public StructType inferSchema(CaseInsensitiveStringMap options) {
     return null;
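Context for the change above: implementing SessionConfigSupport with keyPrefix() returning "iceberg" lets Spark forward any session configuration named spark.datasource.iceberg.&lt;option&gt; into the options map of every read or write that goes through this source, as if .option("&lt;option&gt;", ...) had been passed on that operation. A minimal sketch of the effect (not part of this commit; the class name, SparkSession setup, and table path are hypothetical):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SessionConfSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").getOrCreate();

        // Spark rewrites "spark.datasource.iceberg.<option>" session confs into
        // per-operation data source options for this source, so this single
        // session-wide setting stamps every subsequent Iceberg snapshot
        // summary in this session with foo=bar:
        spark.conf().set("spark.datasource.iceberg.snapshot-property.foo", "bar");

        // Hypothetical table path, used only for illustration:
        Dataset<Row> df = spark.read().format("iceberg").load("/tmp/warehouse/db/tbl");
        df.write().format("iceberg").mode("append").save("/tmp/warehouse/db/tbl");
      }
    }

The second changed file adds a test exercising both directions: a write option (snapshot-property.foo) and a read option (snapshot-id), each supplied purely through session configuration.
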
@@ -2185,6 +2185,52 @@ private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier)
     assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected);
   }
 
+  @Test
+  public void testSessionConfigSupport() {
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "session_config_table");
+    Table table = createTable(tableIdentifier, SCHEMA, spec);
+
+    List<SimpleRecord> initialRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+
+    Dataset<Row> df = spark.createDataFrame(initialRecords, SimpleRecord.class);
+
+    df.select("id", "data")
+        .write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    long s1 = table.currentSnapshot().snapshotId();
+
+    withSQLConf(
+        // set write option through session configuration
+        ImmutableMap.of("spark.datasource.iceberg.snapshot-property.foo", "bar"),
+        () -> {
+          df.select("id", "data")
+              .write()
+              .format("iceberg")
+              .mode(SaveMode.Append)
+              .save(loadLocation(tableIdentifier));
+        });
+
+    table.refresh();
+    assertThat(table.currentSnapshot().summary()).containsEntry("foo", "bar");
+
+    withSQLConf(
+        // set read option through session configuration
+        ImmutableMap.of("spark.datasource.iceberg.snapshot-id", String.valueOf(s1)),
+        () -> {
+          Dataset<Row> result = spark.read().format("iceberg").load(loadLocation(tableIdentifier));
+          List<SimpleRecord> actual = result.as(Encoders.bean(SimpleRecord.class)).collectAsList();
+          assertThat(actual)
+              .as("Rows must match")
+              .containsExactlyInAnyOrderElementsOf(initialRecords);
+        });
+  }
+
   private GenericData.Record manifestRecord(
       Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
     GenericRecordBuilder builder =
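One practical note, sketched under the assumption (from Spark's session-config extraction, not stated in this commit) that explicitly passed reader/writer options take precedence when Spark merges them with extracted session configs. The snapshot ids and table path below are hypothetical:

    // Session-level default: Iceberg reads in this session time-travel to 1234.
    spark.conf().set("spark.datasource.iceberg.snapshot-id", "1234");

    // Uses snapshot 1234 via the session conf:
    spark.read().format("iceberg").load("/tmp/warehouse/db/tbl");

    // An explicit option should override the session conf for this read only:
    spark.read()
        .format("iceberg")
        .option("snapshot-id", "5678")
        .load("/tmp/warehouse/db/tbl");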
