Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36868] Use file system methods with string parameters via JNI #27

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#-----------------------------------------------

FORST_VERSION ?= 0.1.3-beta
FORST_VERSION ?= 0.1.4-beta

BASH_EXISTS := $(shell which bash)
SHELL := $(shell which bash)
Expand Down
183 changes: 45 additions & 138 deletions env/flink/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,13 @@ class FlinkWritableFile : public FSWritableFile {

IOStatus Init() {
JNIEnv* jniEnv = getJNIEnv();
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(file_path_, &pathInstance);
if (!status.ok()) {
return status;
}
jstring pathString = jniEnv->NewStringUTF(file_path_.c_str());

JavaClassCache::JavaMethodContext fileSystemCreateMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_CREATE);
jobject fsDataOutputStream = jniEnv->CallObjectMethod(
file_system_instance_, fileSystemCreateMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, fileSystemCreateMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);
if (fsDataOutputStream == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
Expand Down Expand Up @@ -184,19 +178,13 @@ class FlinkReadableFile : virtual public FSSequentialFile,

IOStatus Init() {
JNIEnv* jniEnv = getJNIEnv();
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(file_path_, &pathInstance);
if (!status.ok()) {
return status;
}
jstring pathString = jniEnv->NewStringUTF(file_path_.c_str());

JavaClassCache::JavaMethodContext openMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_OPEN);
jobject fsDataInputStream = jniEnv->CallObjectMethod(
file_system_instance_, openMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, openMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);
if (fsDataInputStream == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
Expand Down Expand Up @@ -345,29 +333,13 @@ Status FlinkFileSystem::Init() {
JavaClassCache::JavaMethodContext fileSystemGetMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_GET);

JavaClassCache::JavaClassContext uriClass =
class_cache_->GetJClass(JavaClassCache::JC_URI);
JavaClassCache::JavaMethodContext uriConstructor =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_URI_CONSTRUCTOR);

// Construct URI
jstring uriStringArg = jniEnv->NewStringUTF(base_path_.c_str());
jobject uriInstance = jniEnv->NewObject(
uriClass.javaClass, uriConstructor.javaMethod, uriStringArg);
jniEnv->DeleteLocalRef(uriStringArg);
if (uriInstance == nullptr) {
return CheckThenError(
std::string("NewObject Exception when Init FlinkFileSystem, ")
.append(uriClass.ToString())
.append(uriConstructor.ToString())
.append(", args: ")
.append(base_path_));
}

// Construct FileSystem
jobject fileSystemInstance = jniEnv->CallStaticObjectMethod(
fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance);
jniEnv->DeleteLocalRef(uriInstance);
fileSystemClass.javaClass, fileSystemGetMethod.javaMethod,
uriStringArg);
jniEnv->DeleteLocalRef(uriStringArg);
if (fileSystemInstance == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string(
Expand Down Expand Up @@ -472,23 +444,17 @@ IOStatus FlinkFileSystem::FileExists(const std::string& file_name,
const IOOptions& /*options*/,
IODebugContext* /*dbg*/) {
std::string filePath = ConstructPath(file_name);
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(filePath, &pathInstance);
if (!status.ok()) {
return status;
}
JNIEnv* jniEnv = getJNIEnv();
jstring pathString = jniEnv->NewStringUTF(filePath.c_str());

// Call exist method
JNIEnv* jniEnv = getJNIEnv();
JavaClassCache::JavaMethodContext existsMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_EXISTS);
jboolean exists = jniEnv->CallBooleanMethod(
file_system_instance_, existsMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, existsMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);

status = CurrentStatus([filePath]() {
IOStatus status = CurrentStatus([filePath]() {
return std::string("Exception when FileExists, path: ").append(filePath);
});
if (!status.ok()) {
Expand All @@ -513,21 +479,15 @@ IOStatus FlinkFileSystem::GetChildren(const std::string& file_name,
}

std::string filePath = ConstructPath(file_name);
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(filePath, &pathInstance);
if (!status.ok()) {
return status;
}

JNIEnv* jniEnv = getJNIEnv();
jstring pathString = jniEnv->NewStringUTF(filePath.c_str());

JavaClassCache::JavaMethodContext listStatusMethod = class_cache_->GetJMethod(
JavaClassCache::JM_FLINK_FILE_SYSTEM_LIST_STATUS);

auto fileStatusArray = (jobjectArray)jniEnv->CallObjectMethod(
file_system_instance_, listStatusMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, listStatusMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);
if (fileStatusArray == nullptr || jniEnv->ExceptionCheck()) {
return CheckThenError(
std::string("Exception when CallObjectMethod in GetChildren, ")
Expand All @@ -548,26 +508,14 @@ IOStatus FlinkFileSystem::GetChildren(const std::string& file_name,

JavaClassCache::JavaMethodContext getPathMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_STATUS_GET_PATH);
jobject subPath =
jniEnv->CallObjectMethod(fileStatusObj, getPathMethod.javaMethod);
jniEnv->DeleteLocalRef(fileStatusObj);
if (subPath == nullptr || jniEnv->ExceptionCheck()) {
jniEnv->DeleteLocalRef(fileStatusArray);
return CheckThenError(
std::string("Exception when CallObjectMethod in GetChildren, ")
.append(getPathMethod.ToString()));
}

JavaClassCache::JavaMethodContext pathToStringMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_PATH_TO_STRING);
auto subPathStr = (jstring)jniEnv->CallObjectMethod(
subPath, pathToStringMethod.javaMethod);
jniEnv->DeleteLocalRef(subPath);
fileStatusObj, getPathMethod.javaMethod);
jniEnv->DeleteLocalRef(fileStatusObj);
if (subPathStr == nullptr || jniEnv->ExceptionCheck()) {
jniEnv->DeleteLocalRef(fileStatusArray);
return CheckThenError(
std::string("Exception when CallObjectMethod in GetChildren, ")
.append(pathToStringMethod.ToString()));
.append(getPathMethod.ToString()));
}

const char* str = jniEnv->GetStringUTFChars(subPathStr, nullptr);
Expand Down Expand Up @@ -603,25 +551,19 @@ IOStatus FlinkFileSystem::Delete(const std::string& file_name,
.append(ConstructPath(file_name)))
: fileExistsStatus;
}
JNIEnv* jniEnv = getJNIEnv();

std::string filePath = ConstructPath(file_name);
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(filePath, &pathInstance);
if (!status.ok()) {
return status;
}
jstring pathString = jniEnv->NewStringUTF(filePath.c_str());

// Call delete method
JNIEnv* jniEnv = getJNIEnv();
JavaClassCache::JavaMethodContext deleteMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_DELETE);
jboolean deleted = jniEnv->CallBooleanMethod(
file_system_instance_, deleteMethod.javaMethod, pathInstance, recursive);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, deleteMethod.javaMethod, pathString, recursive);
jniEnv->DeleteLocalRef(pathString);

status = CurrentStatus([filePath]() {
IOStatus status = CurrentStatus([filePath]() {
return std::string("Exception when Delete, path: ").append(filePath);
});
if (!status.ok()) {
Expand Down Expand Up @@ -652,21 +594,15 @@ IOStatus FlinkFileSystem::CreateDirIfMissing(const std::string& file_name,
JNIEnv* jniEnv = getJNIEnv();

std::string filePath = ConstructPath(file_name);
// Construct Path Instance
jobject pathInstance;
IOStatus status =
class_cache_->ConstructPathInstance(filePath, &pathInstance);
if (!status.ok()) {
return status;
}
jstring pathString = jniEnv->NewStringUTF(filePath.c_str());

// Call mkdirs method
JavaClassCache::JavaMethodContext mkdirMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_MKDIR);
jboolean created = jniEnv->CallBooleanMethod(
file_system_instance_, mkdirMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
status = CurrentStatus([filePath]() {
file_system_instance_, mkdirMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);
IOStatus status = CurrentStatus([filePath]() {
return std::string("Exception when CreateDirIfMissing, path: ")
.append(filePath);
});
Expand Down Expand Up @@ -722,22 +658,18 @@ IOStatus FlinkFileSystem::GetFileStatus(const std::string& file_name,
: status;
}

JNIEnv* jniEnv = getJNIEnv();

std::string filePath = ConstructPath(file_name);
// Construct Path Instance
jobject pathInstance;
status = class_cache_->ConstructPathInstance(filePath, &pathInstance);
if (!status.ok()) {
return status;
}
jstring pathString = jniEnv->NewStringUTF(filePath.c_str());

// Call getFileStatus method
JNIEnv* jniEnv = getJNIEnv();
JavaClassCache::JavaMethodContext getFileStatusMethod =
class_cache_->GetJMethod(
JavaClassCache::JM_FLINK_FILE_SYSTEM_GET_FILE_STATUS);
*fileStatus = jniEnv->CallObjectMethod(
file_system_instance_, getFileStatusMethod.javaMethod, pathInstance);
jniEnv->DeleteLocalRef(pathInstance);
file_system_instance_, getFileStatusMethod.javaMethod, pathString);
jniEnv->DeleteLocalRef(pathString);

return CurrentStatus([filePath]() {
return std::string("Exception when GetFileStatus, path: ").append(filePath);
Expand Down Expand Up @@ -818,30 +750,17 @@ IOStatus FlinkFileSystem::RenameFile(const std::string& src,
JNIEnv* jniEnv = getJNIEnv();

std::string srcFilePath = ConstructPath(src);
// Construct src Path Instance
jobject srcPathInstance;
status = class_cache_->ConstructPathInstance(srcFilePath, &srcPathInstance);
if (!status.ok()) {
return status;
}

jstring srcPathString = jniEnv->NewStringUTF(srcFilePath.c_str());
std::string targetFilePath = ConstructPath(target);
// Construct target Path Instance
jobject targetPathInstance;
status =
class_cache_->ConstructPathInstance(targetFilePath, &targetPathInstance);
if (!status.ok()) {
jniEnv->DeleteLocalRef(srcPathInstance);
return status;
}
jstring targetPathString = jniEnv->NewStringUTF(targetFilePath.c_str());

JavaClassCache::JavaMethodContext renameMethod = class_cache_->GetJMethod(
JavaClassCache::JM_FLINK_FILE_SYSTEM_RENAME_FILE);
jboolean renamed =
jniEnv->CallBooleanMethod(file_system_instance_, renameMethod.javaMethod,
srcPathInstance, targetPathInstance);
jniEnv->DeleteLocalRef(srcPathInstance);
jniEnv->DeleteLocalRef(targetPathInstance);
srcPathString, targetPathString);
jniEnv->DeleteLocalRef(srcPathString);
jniEnv->DeleteLocalRef(targetPathString);

status = CurrentStatus([srcFilePath, targetFilePath]() {
return std::string("Exception when RenameFile, src: ")
Expand Down Expand Up @@ -895,30 +814,18 @@ IOStatus FlinkFileSystem::LinkFile(const std::string& src,
JNIEnv* jniEnv = getJNIEnv();

std::string srcFilePath = ConstructPath(src);
// Construct src Path Instance
jobject srcPathInstance;
status = class_cache_->ConstructPathInstance(srcFilePath, &srcPathInstance);
if (!status.ok()) {
return status;
}
jstring srcPathString = jniEnv->NewStringUTF(srcFilePath.c_str());

std::string targetFilePath = ConstructPath(target);
// Construct target Path Instance
jobject targetPathInstance;
status =
class_cache_->ConstructPathInstance(targetFilePath, &targetPathInstance);
if (!status.ok()) {
jniEnv->DeleteLocalRef(srcPathInstance);
return status;
}
jstring targetPathString = jniEnv->NewStringUTF(targetFilePath.c_str());

JavaClassCache::JavaMethodContext linkMethod =
class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_LINK_FILE);
jint linked =
jniEnv->CallIntMethod(file_system_instance_, linkMethod.javaMethod,
srcPathInstance, targetPathInstance);
jniEnv->DeleteLocalRef(srcPathInstance);
jniEnv->DeleteLocalRef(targetPathInstance);
srcPathString, targetPathString);
jniEnv->DeleteLocalRef(srcPathString);
jniEnv->DeleteLocalRef(targetPathString);

status = CurrentStatus([srcFilePath, targetFilePath]() {
return std::string("Exception when LinkFile, src: ")
Expand Down
Loading
Loading