Skip to content

Commit

Permalink
[FLINK-36819] Support link() in ForStFlinkFileSystem
Browse files Browse the repository at this point in the history
  • Loading branch information
fredia committed Dec 5, 2024
1 parent 7394878 commit 36c8046
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 0 deletions.
63 changes: 63 additions & 0 deletions env/flink/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,69 @@ IOStatus FlinkFileSystem::UnlockFile(FileLock* /*lock*/,
return IOStatus::OK();
}

IOStatus FlinkFileSystem::LinkFile(const std::string& src,
const std::string& target,
const IOOptions& options,
IODebugContext* dbg) {
IOStatus status = FileExists(src, options, dbg);
if (!status.ok()) {
return status.IsNotFound()
? IOStatus::PathNotFound(
std::string(
"Could not find src path when linkFile, path: ")
.append(ConstructPath(src)))
: status;
}

JNIEnv* jniEnv = getJNIEnv();

std::string srcFilePath = ConstructPath(src);
// Construct src Path Instance
jobject srcPathInstance;
status = class_cache_->ConstructPathInstance(srcFilePath, &srcPathInstance);
if (!status.ok()) {
return status;
}

std::string targetFilePath = ConstructPath(target);
// Construct target Path Instance
jobject targetPathInstance;
status =
class_cache_->ConstructPathInstance(targetFilePath, &targetPathInstance);
if (!status.ok()) {
jniEnv->DeleteLocalRef(srcPathInstance);
return status;
}

JavaClassCache::JavaMethodContext linkMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_LINK_FILE);
jint linked =
jniEnv->CallIntMethod(file_system_instance_, linkMethod.javaMethod,
srcPathInstance, targetPathInstance);
jniEnv->DeleteLocalRef(srcPathInstance);
jniEnv->DeleteLocalRef(targetPathInstance);

status = CurrentStatus([srcFilePath, targetFilePath]() {
return std::string("Exception when LinkFile, src: ")
.append(srcFilePath)
.append(", target: ")
.append(targetFilePath);
});
if (!status.ok()) {
return status;
}

if (linked == -1) {
return IOStatus::NotSupported();
} else if (linked > 0) {
return IOStatus::IOError(std::string("Exception when LinkFile, src: ")
.append(srcFilePath)
.append(", target: ")
.append(targetFilePath));
}
return IOStatus::OK();
}

Status FlinkFileSystem::Create(const std::shared_ptr<FileSystem>& base,
const std::string& uri,
std::unique_ptr<FileSystem>* result,
Expand Down
4 changes: 4 additions & 0 deletions env/flink/env_flink.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ class FlinkFileSystem : public FileSystemWrapper {
const IOOptions& /*options*/, bool* /*is_dir*/,
IODebugContext* /*dbg*/) override;

IOStatus LinkFile(const std::string& /*src*/, const std::string& /*target*/,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) override;

private:
const std::string base_path_;
JavaClassCache* class_cache_;
Expand Down
8 changes: 8 additions & 0 deletions env/flink/jni_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ IOStatus JavaClassCache::Init() {
.signature =
"(Lorg/apache/flink/core/fs/Path;Lorg/apache/flink/core/fs/Path;)Z";

cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LINK_FILE]
.javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM];
cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LINK_FILE]
.methodName = "link";
cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LINK_FILE]
.signature =
"(Lorg/apache/flink/core/fs/Path;Lorg/apache/flink/core/fs/Path;)I";

cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_OPEN]
.javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM];
cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_OPEN].methodName =
Expand Down
1 change: 1 addition & 0 deletions env/flink/jni_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class JavaClassCache {
JM_FLINK_FILE_STATUS_GET_LEN,
JM_FLINK_FILE_STATUS_GET_MODIFICATION_TIME,
JM_FLINK_FILE_STATUS_IS_DIR,
JM_FLINK_FILE_SYSTEM_LINK_FILE,
NUM_CACHED_METHODS
} CachedJavaMethod;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public boolean rename(Path src, Path dst) throws IOException {
return flinkFS.rename(src, dst);
}

public int link(Path src, Path dst) throws IOException {
// let forstdb copy the file
return -1;
}

@Override
public boolean isDistributedFS() {
return flinkFS.isDistributedFS();
Expand Down

0 comments on commit 36c8046

Please sign in to comment.