Skip to content

Commit

Permalink
feat: Support RemovePartitionSpec in REST catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
advancedxy committed Sep 23, 2024
1 parent 5a3ec7c commit a85edcf
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 15 deletions.
9 changes: 7 additions & 2 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,21 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

class RemoveUnusedSpecs implements MetadataUpdate {
class RemovePartitionSpecs implements MetadataUpdate {
private final Set<Integer> specIds;

public RemoveUnusedSpecs(Set<Integer> specIds) {
public RemovePartitionSpecs(Set<Integer> specIds) {
this.specIds = specIds;
}

public Set<Integer> specIds() {
return specIds;
}

@Override
public void applyTo(TableMetadata.Builder metadataBuilder) {
metadataBuilder.removeUnusedSpecsById(specIds);
}
}

class AddSortOrder implements MetadataUpdate {
Expand Down
20 changes: 20 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ private MetadataUpdateParser() {}
static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version";
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";

// AssignUUID
private static final String UUID = "uuid";
Expand Down Expand Up @@ -126,6 +127,9 @@ private MetadataUpdateParser() {}
// SetCurrentViewVersion
private static final String VIEW_VERSION_ID = "view-version-id";

// RemovePartitionSpecs
private static final String PARTITION_SPEC_IDS = "partition-spec-ids";

private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
Expand All @@ -149,6 +153,7 @@ private MetadataUpdateParser() {}
.put(MetadataUpdate.SetLocation.class, SET_LOCATION)
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
.buildOrThrow();

public static String toJson(MetadataUpdate metadataUpdate) {
Expand Down Expand Up @@ -241,6 +246,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
writeSetCurrentViewVersionId(
(MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator);
break;
case REMOVE_PARTITION_SPECS:
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
break;
default:
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -312,6 +320,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
return readAddViewVersion(jsonNode);
case SET_CURRENT_VIEW_VERSION:
return readCurrentViewVersionId(jsonNode);
case REMOVE_PARTITION_SPECS:
return readRemoveUnusedSpecs(jsonNode);
default:
throw new UnsupportedOperationException(
String.format("Cannot convert metadata update action to json: %s", action));
Expand Down Expand Up @@ -447,6 +457,11 @@ private static void writeSetCurrentViewVersionId(
gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId());
}

private static void writeRemovePartitionSpecs(
MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen) throws IOException {
JsonUtil.writeIntegerArray(PARTITION_SPEC_IDS, metadataUpdate.specIds(), gen);
}

private static MetadataUpdate readAssignUUID(JsonNode node) {
String uuid = JsonUtil.getString(UUID, node);
return new MetadataUpdate.AssignUUID(uuid);
Expand Down Expand Up @@ -596,4 +611,9 @@ private static MetadataUpdate readAddViewVersion(JsonNode node) {
private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
return new MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node));
}

private static MetadataUpdate readRemoveUnusedSpecs(JsonNode node) {
return new MetadataUpdate.RemovePartitionSpecs(
JsonUtil.getIntegerSet(PARTITION_SPEC_IDS, node));
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,7 @@ Builder removeUnusedSpecs(Iterable<PartitionSpec> specsToRemove) {
specs.stream()
.filter(s -> !specIdsToRemove.contains(s.specId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemoveUnusedSpecs(specIdsToRemove));
changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
return this;
}

Expand All @@ -1144,7 +1144,7 @@ Builder removeUnusedSpecsById(Iterable<Integer> specIds) {
specs.stream()
.filter(s -> !specIdsToRemove.contains(s.specId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemoveUnusedSpecs(specIdsToRemove));
changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
return this;
}

Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/org/apache/iceberg/UpdateRequirements.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ private Builder update(MetadataUpdate update) {
update((MetadataUpdate.SetDefaultPartitionSpec) update);
} else if (update instanceof MetadataUpdate.SetDefaultSortOrder) {
update((MetadataUpdate.SetDefaultSortOrder) update);
} else if (update instanceof MetadataUpdate.RemovePartitionSpecs) {
update((MetadataUpdate.RemovePartitionSpecs) update);
}

return this;
Expand Down Expand Up @@ -173,6 +175,26 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) {
}
}

private void update(MetadataUpdate.RemovePartitionSpecs unused) {
// require that the default partition spec has not changed
if (!setSpecId) {
if (base != null && !isReplace) {
require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
}
this.setSpecId = true;
}
// require that all the branch has not changed, so that old specs won't be written.
if (base != null && !isReplace) {
base.refs()
.forEach(
(name, ref) -> {
if (ref.isBranch() && !name.equals(SnapshotRef.MAIN_BRANCH)) {
require(new UpdateRequirement.AssertRefSnapshotID(name, ref.snapshotId()));
}
});
}
}

private List<UpdateRequirement> build() {
return requirements.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.TestTemplate;

public class TestRemoveUnusedSpecs extends TestBase {
public class TestRemovePartitionSpecs extends TestBase {

@TestTemplate
public void testRemoveAllButCurrent() {
Expand Down
13 changes: 3 additions & 10 deletions core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.junit.jupiter.api.Assumptions;
Expand Down Expand Up @@ -1172,15 +1171,9 @@ public void testRemoveUnusedSpec() {
List<PartitionSpec> specsToRetain = remove.apply();
assertThat(specsToRetain).hasSize(1).map(PartitionSpec::specId).contains(currentSpecId);

if (catalog instanceof RESTCatalog) {
// RESTCatalog does not support maintenance operations yet
// TODO: Remove this once REST spec is updated to support remove unused spec
assertThatThrownBy(remove::commit);
} else {
remove.commit();
Table loaded = catalog.loadTable(TABLE);
assertThat(loaded.specs().values()).hasSameElementsAs(specsToRetain);
}
remove.commit();
Table loaded = catalog.loadTable(TABLE);
assertThat(loaded.specs().values()).hasSameElementsAs(specsToRetain);
}

@Test
Expand Down

0 comments on commit a85edcf

Please sign in to comment.