Skip to content

Commit

Permalink
API: Add function for removing unused specs from metadata.json
Browse files Browse the repository at this point in the history
Previously there was no way to remove partition specs from a table once they were
added. To fix this we add an api which searches through all reachable manifest
files and records their specsIds. Any specIds which do not find are marked for
removal which is done through a serializable commit.
  • Loading branch information
RussellSpitzer authored and advancedxy committed Sep 23, 2024
1 parent 4482565 commit 5a3ec7c
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 0 deletions.
31 changes: 31 additions & 0 deletions api/src/main/java/org/apache/iceberg/MetadataMaintenance.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

/** APIs for table metadata maintenance, such as removing unused partition specs. */
public interface MetadataMaintenance {
/**
* Remove any partition specs from the Metadata that are no longer used in any data files.
*
* <p>Always preserves the current default spec even if it has not yet been used.
*
* @return a new {@link RemoveUnusedSpecs}
*/
RemoveUnusedSpecs removeUnusedSpecs();
}
29 changes: 29 additions & 0 deletions api/src/main/java/org/apache/iceberg/RemoveUnusedSpecs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;

/**
* API for removing partition specs from the metadata which are not the default spec and no longer
* refer to any datafiles in the table.
*
* <p>{@link #apply()} returns the specs that will remain if committed on the current metadata
*/
public interface RemoveUnusedSpecs extends PendingUpdate<List<PartitionSpec>> {}
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ default IncrementalChangelogScan newIncrementalChangelogScan() {
*/
AppendFiles newAppend();

/**
* Create a new {@link MetadataMaintenance maintenance API} to perform metadata maintenance
* operations.
*
* @return a new {@link MetadataMaintenance}
*/
default MetadataMaintenance maintenance() {
throw new UnsupportedOperationException(
"Maintenance operations are not supported by " + getClass().getName());
}

/**
* Create a new {@link AppendFiles append API} to add files to this table and commit.
*
Expand Down
32 changes: 32 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataMaintenance.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

class BaseMetadataMaintenance implements MetadataMaintenance {
private final TableOperations ops;

BaseMetadataMaintenance(TableOperations ops) {
this.ops = ops;
}

@Override
public RemoveUnusedSpecs removeUnusedSpecs() {
return new BaseRemoveUnusedSpecs(ops);
}
}
113 changes: 113 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRemoveUnusedSpecs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;

/**
* Implementation of RemoveUnusedSpecs API to remove unused partition specs.
*
* <p>When committing, these changes will be applied to the latest table metadata. Commit conflicts
* will be resolved by recalculating which specs are no longer in use again in the latest metadata
* and retrying.
*/
class BaseRemoveUnusedSpecs implements RemoveUnusedSpecs {
private final TableOperations ops;

private TableMetadata base;

BaseRemoveUnusedSpecs(TableOperations ops) {
this.ops = ops;
this.base = ops.current();
}

@Override
public List<PartitionSpec> apply() {
TableMetadata newMetadata = internalApply();
return newMetadata.specs();
}

@Override
public void commit() {
this.base = ops.refresh();
Tasks.foreach(ops)
.retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
.exponentialBackoff(
base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(
taskOps -> {
TableMetadata newMetadata = internalApply();
taskOps.commit(base, newMetadata);
});
}

private Iterable<ManifestFile> reachableManifests() {
Iterable<Snapshot> snapshots = base.snapshots();
Iterable<Iterable<ManifestFile>> manifestIterables =
Iterables.transform(snapshots, snapshot -> snapshot.allManifests(ops.io()));
try (CloseableIterable<ManifestFile> iterable =
new ParallelIterable<>(manifestIterables, ThreadPools.getWorkerPool())) {
return Sets.newHashSet(iterable);
} catch (IOException e) {
throw new UncheckedIOException("Failed to close parallel iterable", e);
}
}

private TableMetadata internalApply() {
this.base = ops.refresh();
List<PartitionSpec> specs = base.specs();
int currentSpecId = base.defaultSpecId();

Set<Integer> specsInUse =
Sets.newHashSet(Iterables.transform(reachableManifests(), ManifestFile::partitionSpecId));

specsInUse.add(currentSpecId);

List<PartitionSpec> specsToRemove =
specs.stream()
.filter(spec -> !specsInUse.contains(spec.specId()))
.collect(Collectors.toList());

return TableMetadata.buildFrom(base).removeUnusedSpecs(specsToRemove).build();
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ public ReplaceSortOrder replaceSortOrder() {
return new BaseReplaceSortOrder(ops);
}

@Override
public MetadataMaintenance maintenance() {
return new BaseMetadataMaintenance(ops);
}

@Override
public UpdateLocation updateLocation() {
return new SetLocation(ops);
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,12 @@ public UpdateProperties updateProperties() {
return BaseTransaction.this.updateProperties();
}

@Override
public MetadataMaintenance maintenance() {
throw new UnsupportedOperationException(
"Cannot perform metadata maintenance as part of a transaction");
}

@Override
public ReplaceSortOrder replaceSortOrder() {
return BaseTransaction.this.replaceSortOrder();
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
}
}

class RemoveUnusedSpecs implements MetadataUpdate {
private final Set<Integer> specIds;

public RemoveUnusedSpecs(Set<Integer> specIds) {
this.specIds = specIds;
}

public Set<Integer> specIds() {
return specIds;
}
}

class AddSortOrder implements MetadataUpdate {
private final UnboundSortOrder sortOrder;

Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,11 @@ public AppendFiles newAppend() {
throw new UnsupportedOperationException(errorMsg("newAppend"));
}

@Override
public MetadataMaintenance maintenance() {
throw new UnsupportedOperationException(errorMsg("metadataMaintenance"));
}

@Override
public RewriteFiles newRewrite() {
throw new UnsupportedOperationException(errorMsg("newRewrite"));
Expand Down
40 changes: 40 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,46 @@ public Builder setDefaultPartitionSpec(int specId) {
return this;
}

Builder removeUnusedSpecs(Iterable<PartitionSpec> specsToRemove) {
Set<Integer> specIdsToRemove = Sets.newHashSet();
for (PartitionSpec spec : specsToRemove) {
Preconditions.checkArgument(
spec.specId() != defaultSpecId, "Cannot remove default partition spec");
PartitionSpec toBeRemoved = specsById.remove(spec.specId());
Preconditions.checkArgument(
toBeRemoved == null || toBeRemoved.equals(spec),
"Cannot remove an unknown spec, spec id: %s",
spec.specId());
if (toBeRemoved != null) {
specIdsToRemove.add(spec.specId());
}
}
this.specs =
specs.stream()
.filter(s -> !specIdsToRemove.contains(s.specId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemoveUnusedSpecs(specIdsToRemove));
return this;
}

Builder removeUnusedSpecsById(Iterable<Integer> specIds) {
Set<Integer> specIdsToRemove = Sets.newHashSet();
for (Integer specId : specIds) {
Preconditions.checkArgument(
specId != defaultSpecId, "Cannot remove default partition spec");
PartitionSpec toBeRemoved = specsById.remove(specId);
if (toBeRemoved != null) {
specIdsToRemove.add(specId);
}
}
this.specs =
specs.stream()
.filter(s -> !specIdsToRemove.contains(s.specId()))
.collect(Collectors.toList());
changes.add(new MetadataUpdate.RemoveUnusedSpecs(specIdsToRemove));
return this;
}

public Builder addPartitionSpec(UnboundPartitionSpec spec) {
addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId)));
return this;
Expand Down
Loading

0 comments on commit 5a3ec7c

Please sign in to comment.