Skip to content

Commit

Permalink
REF TRACKED II
Browse files Browse the repository at this point in the history
  • Loading branch information
finnegancarroll committed Jun 20, 2024
1 parent e8b7913 commit 15d13a2
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.file.RefTrackedOnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.utils.TransferManager;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

Expand Down Expand Up @@ -74,7 +75,8 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
if (fileInfo.name().startsWith(VIRTUAL_FILE_PREFIX)) {
return new ByteArrayIndexInput(fileInfo.physicalName(), fileInfo.metadata().hash().bytes);
}
return new OnDemandBlockSnapshotIndexInput(fileInfo, localStoreDir, transferManager);

return new RefTrackedOnDemandBlockSnapshotIndexInput(fileInfo, localStoreDir, transferManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
*/
protected final long originalFileSize;

public OnDemandBlockSnapshotIndexInput(OnDemandBlockSnapshotIndexInput sourceObj) {
this(sourceObj.fileInfo, sourceObj.directory, sourceObj.transferManager);
// ensures that clones may be positioned at the same point as the blocked file they were cloned from
this.cloneBlock(sourceObj);
}

public OnDemandBlockSnapshotIndexInput(FileInfo fileInfo, FSDirectory directory, TransferManager transferManager) {
this(
"BlockedSnapshotIndexInput(path=\""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.file;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.FSDirectory;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.remote.utils.TransferManager;

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.ArrayList;

/**
* Maintains a list of weak references to all clones. On close ensure all clones are closed as well.
* Per {@link IndexInput} cloned IndexInputs can be closed with the parent.
*/
public class RefTrackedOnDemandBlockSnapshotIndexInput extends OnDemandBlockSnapshotIndexInput {

private final ArrayList<WeakReference<IndexInput>> cloneRefs;

public RefTrackedOnDemandBlockSnapshotIndexInput(OnDemandBlockSnapshotIndexInput parentObject, ArrayList<WeakReference<IndexInput>> parentRefList) {
super(parentObject);
this.cloneRefs = parentRefList;
}

public RefTrackedOnDemandBlockSnapshotIndexInput(BlobStoreIndexShardSnapshot.FileInfo fileInfo, FSDirectory directory, TransferManager transferManager) {
super(fileInfo, directory, transferManager);
this.cloneRefs = new ArrayList<>();
}

@Override
public RefTrackedOnDemandBlockSnapshotIndexInput clone() {
RefTrackedOnDemandBlockSnapshotIndexInput retClone = new RefTrackedOnDemandBlockSnapshotIndexInput(super.clone(), cloneRefs);
cloneRefs.add(new WeakReference<>(retClone));
return retClone;
}

@Override
public void close() throws IOException {
if (!isClone) {
for (WeakReference<IndexInput> ref : cloneRefs) {
IndexInput input = ref.get();
if (input != null) {
System.out.println("CLOSING AN UNCLOSED REF: " + input);

// input.close();
}

System.out.println("NO CLOSE - REF ALREADY NULL: " + input);

// cloneRefs.remove(ref); // For now don't remove for debugging
}
}
super.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ private void evict() {
iterator.remove();
// Notify the listener only if the entry was evicted
data.remove(node.key, node);

System.out.println("EVICTING: " + node.key.toString());

usage -= node.weight;
statsCounter.recordEviction(node.weight);
listener.onRemoval(new RemovalNotification<>(node.key, node.value, RemovalReason.CAPACITY));
Expand Down

0 comments on commit 15d13a2

Please sign in to comment.