diff --git a/env/flink/env_flink.cc b/env/flink/env_flink.cc index 87183f131..caa3f738e 100644 --- a/env/flink/env_flink.cc +++ b/env/flink/env_flink.cc @@ -5,6 +5,812 @@ // TODO: // 1. Register flink env to ObjectLibrary -// 2. Implement all methods of env_flink.h -#include "env_flink.h" \ No newline at end of file +#include "env_flink.h" + +#include "jvm_util.h" + +// +// This file defines a Flink environment for ForSt. It uses the JNI call +// to access Flink FileSystem. All files created by one instance of ForSt +// will reside on the actual Flink FileSystem. +// +namespace ROCKSDB_NAMESPACE { + +// Appends to an existing file in Flink FileSystem. +class FlinkWritableFile : public FSWritableFile { + private: + std::string file_path_; + jobject file_system_instance_; + jobject fs_data_output_stream_instance_; + JavaClassCache* class_cache_; + + public: + FlinkWritableFile(jobject file_system_instance, + JavaClassCache* java_class_cache, + const std::string& file_path, const FileOptions& options) + : FSWritableFile(options), + file_path_(file_path), + file_system_instance_(file_system_instance), + class_cache_(java_class_cache) {} + + ~FlinkWritableFile() override { + JNIEnv* jniEnv = getJNIEnv(); + if (fs_data_output_stream_instance_ != nullptr) { + jniEnv->DeleteGlobalRef(fs_data_output_stream_instance_); + } + } + + IOStatus Init() { + JNIEnv* jniEnv = getJNIEnv(); + // Construct Path Instance + jobject pathInstance; + IOStatus status = + class_cache_->ConstructPathInstance(file_path_, &pathInstance); + if (!status.ok()) { + return status; + } + + JavaClassCache::JavaMethodContext fileSystemCreateMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_CREATE); + jobject fsDataOutputStream = jniEnv->CallObjectMethod( + file_system_instance_, fileSystemCreateMethod.javaMethod, pathInstance); + jniEnv->DeleteLocalRef(pathInstance); + if (fsDataOutputStream == nullptr) { + return CheckThenError( + std::string( + "CallObjectMethod Exception when Init FlinkWritableFile, ") + .append(fileSystemCreateMethod.ToString()) + .append(", args: Path(") + .append(file_path_) + .append(")")); + } + fs_data_output_stream_instance_ = jniEnv->NewGlobalRef(fsDataOutputStream); + jniEnv->DeleteLocalRef(fsDataOutputStream); + return IOStatus::OK(); + } + + IOStatus Append(const Slice& data, const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + JNIEnv* jniEnv = getJNIEnv(); + // TODO: check about unsigned long to long + jobject directByteBuffer = + jniEnv->NewDirectByteBuffer((void*)data.data(), data.size()); + + JavaClassCache::JavaMethodContext writeMethod = class_cache_->GetJMethod( + JavaClassCache::JM_FLINK_FS_OUTPUT_STREAM_WRITE); + jniEnv->CallVoidMethod(fs_data_output_stream_instance_, + writeMethod.javaMethod, directByteBuffer); + jniEnv->DeleteLocalRef(directByteBuffer); + + std::string filePath = file_path_; + return CurrentStatus([filePath]() { + return std::string("Exception when Appending file, path: ") + .append(filePath); + }); + } + + IOStatus Append(const Slice& data, const IOOptions& options, + const DataVerificationInfo& /* verification_info */, + IODebugContext* dbg) override { + return Append(data, options, dbg); + } + + IOStatus Flush(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + JavaClassCache::JavaMethodContext flushMethod = class_cache_->GetJMethod( + JavaClassCache::JM_FLINK_FS_OUTPUT_STREAM_FLUSH); + JNIEnv* jniEnv = getJNIEnv(); + jniEnv->CallVoidMethod(fs_data_output_stream_instance_, + flushMethod.javaMethod); + + std::string filePath = file_path_; + return CurrentStatus([filePath]() { + return std::string("Exception when Flush file, path: ").append(filePath); + }); + } + + IOStatus Sync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + JavaClassCache::JavaMethodContext flushMethod = class_cache_->GetJMethod( + JavaClassCache::JM_FLINK_FS_OUTPUT_STREAM_SYNC); + JNIEnv* jniEnv = getJNIEnv(); + jniEnv->CallVoidMethod(fs_data_output_stream_instance_, + flushMethod.javaMethod); + + std::string filePath = file_path_; + return CurrentStatus([filePath]() { + return std::string("Exception when Sync file, path: ").append(filePath); + }); + } + + IOStatus Close(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + JavaClassCache::JavaMethodContext closeMethod = class_cache_->GetJMethod( + JavaClassCache::JM_FLINK_FS_OUTPUT_STREAM_CLOSE); + JNIEnv* jniEnv = getJNIEnv(); + jniEnv->CallVoidMethod(fs_data_output_stream_instance_, + closeMethod.javaMethod); + + std::string filePath = file_path_; + return CurrentStatus([filePath]() { + return std::string("Exception when Close file, path: ").append(filePath); + }); + } +}; + +// Used for reading a file from Flink FileSystem. It implements both +// sequential-read access methods and random read access methods. +class FlinkReadableFile : virtual public FSSequentialFile, + virtual public FSRandomAccessFile { + private: + std::string file_path_; + jobject file_system_instance_; + jobject fs_data_input_stream_instance_; + JavaClassCache* class_cache_; + + public: + FlinkReadableFile(jobject file_system_instance, + JavaClassCache* java_class_cache, + const std::string& file_path) + : file_path_(file_path), + file_system_instance_(file_system_instance), + class_cache_(java_class_cache) {} + + ~FlinkReadableFile() override { + JNIEnv* jniEnv = getJNIEnv(); + if (fs_data_input_stream_instance_ != nullptr) { + jniEnv->DeleteGlobalRef(fs_data_input_stream_instance_); + } + } + + IOStatus Init() { + JNIEnv* jniEnv = getJNIEnv(); + // Construct Path Instance + jobject pathInstance; + IOStatus status = + class_cache_->ConstructPathInstance(file_path_, &pathInstance); + if (!status.ok()) { + return status; + } + + JavaClassCache::JavaMethodContext openMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_OPEN); + jobject fsDataInputStream = jniEnv->CallObjectMethod( + file_system_instance_, openMethod.javaMethod, pathInstance); + jniEnv->DeleteLocalRef(pathInstance); + if (fsDataInputStream == nullptr) { + return CheckThenError( + std::string( + "CallObjectMethod Exception when Init FlinkReadableFile, ") + .append(openMethod.ToString()) + .append(", args: Path(") + .append(file_path_) + .append(")")); + } + + fs_data_input_stream_instance_ = jniEnv->NewGlobalRef(fsDataInputStream); + jniEnv->DeleteLocalRef(fsDataInputStream); + return IOStatus::OK(); + } + + // sequential access, read data at current offset in file + IOStatus Read(size_t n, const IOOptions& /*options*/, Slice* result, + char* scratch, IODebugContext* /*dbg*/) override { + JNIEnv* jniEnv = getJNIEnv(); + // TODO: check about unsigned long to long + jobject directByteBuffer = jniEnv->NewDirectByteBuffer((void*)scratch, n); + + JavaClassCache::JavaMethodContext readMethod = class_cache_->GetJMethod( + JavaClassCache::JM_FLINK_FS_INPUT_STREAM_SEQ_READ); + jint totalBytesRead = + jniEnv->CallIntMethod(fs_data_input_stream_instance_, + readMethod.javaMethod, directByteBuffer); + + jniEnv->DeleteLocalRef(directByteBuffer); + + std::string filePath = file_path_; + IOStatus status = CurrentStatus([filePath]() { + return std::string("Exception when Reading file, path: ") + .append(filePath); + }); + if (!status.ok()) { + return status; + } + + *result = Slice(scratch, totalBytesRead == -1 ? 0 : totalBytesRead); + return IOStatus::OK(); + } + + // random access, read data from specified offset in file + IOStatus Read(uint64_t offset, size_t n, const IOOptions& /*options*/, + Slice* result, char* scratch, + IODebugContext* /*dbg*/) const override { + JNIEnv* jniEnv = getJNIEnv(); + // TODO: check about unsigned long to long + jobject directByteBuffer = jniEnv->NewDirectByteBuffer((void*)scratch, n); + + JavaClassCache::JavaMethodContext readMethod = class_cache_->GetJMethod( + JavaClassCache::JM_FLINK_FS_INPUT_STREAM_RANDOM_READ); + jint totalBytesRead = + jniEnv->CallIntMethod(fs_data_input_stream_instance_, + readMethod.javaMethod, offset, directByteBuffer); + + jniEnv->DeleteLocalRef(directByteBuffer); + + std::string filePath = file_path_; + IOStatus status = CurrentStatus([filePath]() { + return std::string("Exception when Reading file, path: ") + .append(filePath); + }); + if (!status.ok()) { + return status; + } + + *result = Slice(scratch, totalBytesRead == -1 ? 0 : totalBytesRead); + return IOStatus::OK(); + } + + IOStatus Skip(uint64_t n) override { + JNIEnv* jniEnv = getJNIEnv(); + JavaClassCache::JavaMethodContext skipMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FS_INPUT_STREAM_SKIP); + jniEnv->CallVoidMethod(fs_data_input_stream_instance_, + skipMethod.javaMethod, n); + + std::string filePath = file_path_; + return CurrentStatus([filePath]() { + return std::string("Exception when skipping file, path: ") + .append(filePath); + }); + } +}; + +// Simple implementation of FSDirectory, Shouldn't influence the normal usage +class FlinkDirectory : public FSDirectory { + public: + explicit FlinkDirectory(int fd) : fd_(fd) {} + ~FlinkDirectory() {} + + IOStatus Fsync(const IOOptions& /*options*/, + IODebugContext* /*dbg*/) override { + return IOStatus::OK(); + } + + int GetFd() const { return fd_; } + + private: + int fd_; +}; + +FlinkFileSystem::FlinkFileSystem(const std::shared_ptr& base_fs, + const std::string& base_path) + : FileSystemWrapper(base_fs), base_path_(base_path) {} + +FlinkFileSystem::~FlinkFileSystem() { delete class_cache_; } + +Status FlinkFileSystem::Init() { + JNIEnv* jniEnv = getJNIEnv(); + std::unique_ptr javaClassCache; + Status status = JavaClassCache::Create(jniEnv, &javaClassCache); + if (!status.ok()) { + return status; + } + class_cache_ = javaClassCache.release(); + + // Delegate Flink to load real FileSystem (e.g. + // S3FileSystem/OSSFileSystem/...) + JavaClassCache::JavaClassContext fileSystemClass = + class_cache_->GetJClass(JavaClassCache::JC_FLINK_FILE_SYSTEM); + JavaClassCache::JavaMethodContext fileSystemGetMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_GET); + + JavaClassCache::JavaClassContext uriClass = + class_cache_->GetJClass(JavaClassCache::JC_URI); + JavaClassCache::JavaMethodContext uriConstructor = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_URI_CONSTRUCTOR); + + // Construct URI + jstring uriStringArg = jniEnv->NewStringUTF(base_path_.c_str()); + jobject uriInstance = jniEnv->NewObject( + uriClass.javaClass, uriConstructor.javaMethod, uriStringArg); + jniEnv->DeleteLocalRef(uriStringArg); + if (uriInstance == nullptr) { + return CheckThenError( + std::string("NewObject Exception when Init FlinkFileSystem, ") + .append(uriClass.ToString()) + .append(uriConstructor.ToString()) + .append(", args: ") + .append(base_path_)); + } + + // Construct FileSystem + jobject fileSystemInstance = jniEnv->CallStaticObjectMethod( + fileSystemClass.javaClass, fileSystemGetMethod.javaMethod, uriInstance); + jniEnv->DeleteLocalRef(uriInstance); + if (fileSystemInstance == nullptr) { + return CheckThenError( + std::string( + "CallStaticObjectMethod Exception when Init FlinkFileSystem, ") + .append(fileSystemClass.ToString()) + .append(fileSystemGetMethod.ToString()) + .append(", args: URI(") + .append(base_path_) + .append(")")); + } + file_system_instance_ = jniEnv->NewGlobalRef(fileSystemInstance); + jniEnv->DeleteLocalRef(fileSystemInstance); + return Status::OK(); +} + +std::string FlinkFileSystem::ConstructPath(const std::string& fname) { + return fname.at(0) == '/' ? base_path_ + fname : base_path_ + "/" + fname; +} + +// open a file for sequential reading +IOStatus FlinkFileSystem::NewSequentialFile( + const std::string& fname, const FileOptions& options, + std::unique_ptr* result, IODebugContext* dbg) { + result->reset(); + IOStatus status = FileExists(fname, IOOptions(), dbg); + if (!status.ok()) { + return status; + } + + auto f = new FlinkReadableFile(file_system_instance_, class_cache_, + ConstructPath(fname)); + IOStatus valid = f->Init(); + if (!valid.ok()) { + delete f; + return valid; + } + result->reset(f); + return IOStatus::OK(); +} + +// open a file for random reading +IOStatus FlinkFileSystem::NewRandomAccessFile( + const std::string& fname, const FileOptions& options, + std::unique_ptr* result, IODebugContext* dbg) { + result->reset(); + IOStatus status = FileExists(fname, IOOptions(), dbg); + if (!status.ok()) { + return status; + } + + auto f = new FlinkReadableFile(file_system_instance_, class_cache_, + ConstructPath(fname)); + IOStatus valid = f->Init(); + if (!valid.ok()) { + delete f; + return valid; + } + result->reset(f); + return IOStatus::OK(); +} + +// create a new file for writing +IOStatus FlinkFileSystem::NewWritableFile( + const std::string& fname, const FileOptions& options, + std::unique_ptr* result, IODebugContext* /*dbg*/) { + result->reset(); + auto f = new FlinkWritableFile(file_system_instance_, class_cache_, + ConstructPath(fname), options); + IOStatus valid = f->Init(); + if (!valid.ok()) { + delete f; + return valid; + } + result->reset(f); + return IOStatus::OK(); +} + +IOStatus FlinkFileSystem::NewDirectory(const std::string& name, + const IOOptions& options, + std::unique_ptr* result, + IODebugContext* dbg) { + IOStatus s = FileExists(name, options, dbg); + if (s.ok()) { + result->reset(new FlinkDirectory(0)); + } + return s; +} + +IOStatus FlinkFileSystem::FileExists(const std::string& file_name, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) { + std::string filePath = ConstructPath(file_name); + // Construct Path Instance + jobject pathInstance; + IOStatus status = + class_cache_->ConstructPathInstance(filePath, &pathInstance); + if (!status.ok()) { + return status; + } + + // Call exist method + JNIEnv* jniEnv = getJNIEnv(); + JavaClassCache::JavaMethodContext existsMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_EXISTS); + jboolean exists = jniEnv->CallBooleanMethod( + file_system_instance_, existsMethod.javaMethod, pathInstance); + jniEnv->DeleteLocalRef(pathInstance); + + status = CurrentStatus([filePath]() { + return std::string("Exception when FileExists, path: ").append(filePath); + }); + if (!status.ok()) { + return status; + } + + return exists == JNI_TRUE ? IOStatus::OK() : IOStatus::NotFound(); +} + +// TODO: Not Efficient! Consider adding usable methods in FLink FileSystem +IOStatus FlinkFileSystem::GetChildren(const std::string& file_name, + const IOOptions& options, + std::vector* result, + IODebugContext* dbg) { + IOStatus fileExistsStatus = FileExists(file_name, options, dbg); + if (!fileExistsStatus.ok()) { + return fileExistsStatus.IsNotFound() + ? IOStatus::PathNotFound( + std::string("Could not find path when GetChildren, path: ") + .append(ConstructPath(file_name))) + : fileExistsStatus; + } + + std::string filePath = ConstructPath(file_name); + // Construct Path Instance + jobject pathInstance; + IOStatus status = + class_cache_->ConstructPathInstance(filePath, &pathInstance); + if (!status.ok()) { + return status; + } + + JNIEnv* jniEnv = getJNIEnv(); + JavaClassCache::JavaMethodContext listStatusMethod = class_cache_->GetJMethod( + JavaClassCache::JM_FLINK_FILE_SYSTEM_LIST_STATUS); + + auto fileStatusArray = (jobjectArray)jniEnv->CallObjectMethod( + file_system_instance_, listStatusMethod.javaMethod, pathInstance); + jniEnv->DeleteLocalRef(pathInstance); + if (fileStatusArray == nullptr) { + return CheckThenError( + std::string("Exception when CallObjectMethod in GetChildren, ") + .append(listStatusMethod.ToString()) + .append(", args: Path(") + .append(filePath) + .append(")")); + } + + jsize fileStatusArrayLen = jniEnv->GetArrayLength(fileStatusArray); + for (jsize i = 0; i < fileStatusArrayLen; i++) { + jobject fileStatusObj = jniEnv->GetObjectArrayElement(fileStatusArray, i); + if (fileStatusObj == nullptr) { + jniEnv->DeleteLocalRef(fileStatusArray); + return CheckThenError( + "Exception when GetObjectArrayElement in GetChildren"); + } + + JavaClassCache::JavaMethodContext getPathMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_STATUS_GET_PATH); + jobject subPath = + jniEnv->CallObjectMethod(fileStatusObj, getPathMethod.javaMethod); + jniEnv->DeleteLocalRef(fileStatusObj); + if (subPath == nullptr) { + jniEnv->DeleteLocalRef(fileStatusArray); + return CheckThenError( + std::string("Exception when CallObjectMethod in GetChildren, ") + .append(getPathMethod.ToString())); + } + + JavaClassCache::JavaMethodContext pathToStringMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_PATH_TO_STRING); + auto subPathStr = (jstring)jniEnv->CallObjectMethod( + subPath, pathToStringMethod.javaMethod); + jniEnv->DeleteLocalRef(subPath); + const char* str = jniEnv->GetStringUTFChars(subPathStr, nullptr); + result->emplace_back(str); + jniEnv->ReleaseStringUTFChars(subPathStr, str); + jniEnv->DeleteLocalRef(subPathStr); + } + + jniEnv->DeleteLocalRef(fileStatusArray); + return IOStatus::OK(); +} + +IOStatus FlinkFileSystem::DeleteDir(const std::string& file_name, + const IOOptions& options, + IODebugContext* dbg) { + return Delete(file_name, options, dbg, true); +}; + +IOStatus FlinkFileSystem::DeleteFile(const std::string& file_name, + const IOOptions& options, + IODebugContext* dbg) { + return Delete(file_name, options, dbg, false); +} + +IOStatus FlinkFileSystem::Delete(const std::string& file_name, + const IOOptions& options, IODebugContext* dbg, + bool recursive) { + IOStatus fileExistsStatus = FileExists(file_name, options, dbg); + if (!fileExistsStatus.ok()) { + return fileExistsStatus.IsNotFound() + ? IOStatus::PathNotFound( + std::string("Could not find path when Delete, path: ") + .append(ConstructPath(file_name))) + : fileExistsStatus; + } + + std::string filePath = ConstructPath(file_name); + // Construct Path Instance + jobject pathInstance; + IOStatus status = + class_cache_->ConstructPathInstance(filePath, &pathInstance); + if (!status.ok()) { + return status; + } + + // Call delete method + JNIEnv* jniEnv = getJNIEnv(); + JavaClassCache::JavaMethodContext deleteMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_DELETE); + jboolean created = jniEnv->CallBooleanMethod( + file_system_instance_, deleteMethod.javaMethod, pathInstance, recursive); + jniEnv->DeleteLocalRef(pathInstance); + + status = CurrentStatus([filePath]() { + return std::string("Exception when Delete, path: ").append(filePath); + }); + if (!status.ok()) { + return status; + } + + return created ? IOStatus::OK() : IOStatus::IOError(status.ToString()); +} + +IOStatus FlinkFileSystem::CreateDir(const std::string& file_name, + const IOOptions& options, + IODebugContext* dbg) { + IOStatus s = FileExists(file_name, options, dbg); + if (!s.ok()) { + return CreateDirIfMissing(file_name, options, dbg); + } + return IOStatus::IOError(std::string("Exception when CreateDir because Dir (") + .append(file_name) + .append(") exists")); +} + +IOStatus FlinkFileSystem::CreateDirIfMissing(const std::string& file_name, + const IOOptions& options, + IODebugContext* dbg) { + JNIEnv* jniEnv = getJNIEnv(); + + std::string filePath = ConstructPath(file_name); + // Construct Path Instance + jobject pathInstance; + IOStatus status = + class_cache_->ConstructPathInstance(filePath, &pathInstance); + if (!status.ok()) { + return status; + } + + // Call mkdirs method + JavaClassCache::JavaMethodContext mkdirMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_SYSTEM_MKDIR); + jboolean created = jniEnv->CallBooleanMethod( + file_system_instance_, mkdirMethod.javaMethod, pathInstance); + jniEnv->DeleteLocalRef(pathInstance); + status = CurrentStatus([filePath]() { + return std::string("Exception when CreateDirIfMissing, path: ") + .append(filePath); + }); + if (!status.ok()) { + return status; + } + + return created ? IOStatus::OK() : IOStatus::IOError(status.ToString()); +} + +IOStatus FlinkFileSystem::GetFileSize(const std::string& file_name, + const IOOptions& options, uint64_t* size, + IODebugContext* dbg) { + JNIEnv* jniEnv = getJNIEnv(); + jobject fileStatus; + IOStatus status = GetFileStatus(file_name, options, dbg, &fileStatus); + if (!status.ok()) { + return status; + } + + JavaClassCache::JavaMethodContext getLenMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_STATUS_GET_LEN); + jlong fileSize = jniEnv->CallLongMethod(fileStatus, getLenMethod.javaMethod); + jniEnv->DeleteLocalRef(fileStatus); + + status = CurrentStatus([file_name]() { + return std::string("Exception when GetFileSize, file name: ") + .append(file_name); + }); + if (!status.ok()) { + return status; + } + + *size = fileSize; + return IOStatus::OK(); +} + +// 1. Must guarantee that FileStatus::getPath will not be used! +// 2. The life cycle of fileStatus is maintained by caller. +IOStatus FlinkFileSystem::GetFileStatus(const std::string& file_name, + const IOOptions& options, + IODebugContext* dbg, + jobject* fileStatus) { + IOStatus status = FileExists(file_name, options, dbg); + if (!status.ok()) { + return status.IsNotFound() + ? IOStatus::PathNotFound( + std::string( + "Could not find path when GetFileStatus, path: ") + .append(ConstructPath(file_name))) + : status; + } + + std::string filePath = ConstructPath(file_name); + // Construct Path Instance + jobject pathInstance; + status = class_cache_->ConstructPathInstance(filePath, &pathInstance); + if (!status.ok()) { + return status; + } + + // Call getFileStatus method + JNIEnv* jniEnv = getJNIEnv(); + JavaClassCache::JavaMethodContext getFileStatusMethod = + class_cache_->GetJMethod( + JavaClassCache::JM_FLINK_FILE_SYSTEM_GET_FILE_STATUS); + *fileStatus = jniEnv->CallObjectMethod( + file_system_instance_, getFileStatusMethod.javaMethod, pathInstance); + jniEnv->DeleteLocalRef(pathInstance); + + return CurrentStatus([filePath]() { + return std::string("Exception when GetFileStatus, path: ").append(filePath); + }); +} + +IOStatus FlinkFileSystem::GetFileModificationTime(const std::string& file_name, + const IOOptions& options, + uint64_t* time, + IODebugContext* dbg) { + JNIEnv* jniEnv = getJNIEnv(); + jobject fileStatus; + IOStatus status = GetFileStatus(file_name, options, dbg, &fileStatus); + if (!status.ok()) { + return status; + } + + JavaClassCache::JavaMethodContext getModificationTimeMethod = + class_cache_->GetJMethod( + JavaClassCache::JM_FLINK_FILE_STATUS_GET_MODIFICATION_TIME); + jlong fileModificationTime = + jniEnv->CallLongMethod(fileStatus, getModificationTimeMethod.javaMethod); + jniEnv->DeleteLocalRef(fileStatus); + + status = CurrentStatus([file_name]() { + return std::string("Exception when GetFileModificationTime, file name: ") + .append(file_name); + }); + if (!status.ok()) { + return status; + } + + *time = fileModificationTime; + return IOStatus::OK(); +} + +IOStatus FlinkFileSystem::IsDirectory(const std::string& path, + const IOOptions& options, bool* is_dir, + IODebugContext* dbg) { + JNIEnv* jniEnv = getJNIEnv(); + jobject fileStatus; + IOStatus status = GetFileStatus(path, options, dbg, &fileStatus); + if (!status.ok()) { + return status; + } + + JavaClassCache::JavaMethodContext isDirMethod = + class_cache_->GetJMethod(JavaClassCache::JM_FLINK_FILE_STATUS_IS_DIR); + jboolean isDir = + jniEnv->CallBooleanMethod(fileStatus, isDirMethod.javaMethod); + jniEnv->DeleteLocalRef(fileStatus); + + status = CurrentStatus([path]() { + return std::string("Exception when IsDirectory, file name: ").append(path); + }); + if (!status.ok()) { + return status; + } + + *is_dir = isDir; + return IOStatus::OK(); +} + +IOStatus FlinkFileSystem::RenameFile(const std::string& src, + const std::string& target, + const IOOptions& options, + IODebugContext* dbg) { + IOStatus status = FileExists(src, options, dbg); + if (!status.ok()) { + return status.IsNotFound() + ? IOStatus::PathNotFound( + std::string( + "Could not find src path when RenameFile, path: ") + .append(ConstructPath(src))) + : status; + } + + std::string srcFilePath = ConstructPath(src); + // Construct src Path Instance + jobject srcPathInstance; + status = class_cache_->ConstructPathInstance(srcFilePath, &srcPathInstance); + if (!status.ok()) { + return status; + } + + std::string targetFilePath = ConstructPath(src); + // Construct target Path Instance + jobject targetPathInstance; + status = + class_cache_->ConstructPathInstance(targetFilePath, &targetPathInstance); + if (!status.ok()) { + return status; + } + + JNIEnv* jniEnv = getJNIEnv(); + JavaClassCache::JavaMethodContext renameMethod = class_cache_->GetJMethod( + JavaClassCache::JM_FLINK_FILE_SYSTEM_RENAME_FILE); + jboolean renamed = + jniEnv->CallBooleanMethod(file_system_instance_, renameMethod.javaMethod, + srcPathInstance, targetPathInstance); + + status = CurrentStatus([srcFilePath, targetFilePath]() { + return std::string("Exception when RenameFile, src: ") + .append(srcFilePath) + .append(", target: ") + .append(targetFilePath); + }); + if (!status.ok()) { + return status; + } + + return renamed ? IOStatus::OK() : IOStatus::IOError(status.ToString()); +} + +IOStatus FlinkFileSystem::LockFile(const std::string& /*file_name*/, + const IOOptions& /*options*/, + FileLock** lock, IODebugContext* /*dbg*/) { + // There isn't a very good way to atomically check and create a file, + // Since it will not influence the usage of Flink, just leave it OK() now; + *lock = nullptr; + return IOStatus::OK(); +} + +IOStatus FlinkFileSystem::UnlockFile(FileLock* /*lock*/, + const IOOptions& /*options*/, + IODebugContext* /*dbg*/) { + // There isn't a very good way to atomically check and create a file, + // Since it will not influence the usage of Flink, just leave it OK() now; + return IOStatus::OK(); +} + +Status FlinkFileSystem::Create(const std::shared_ptr& base, + const std::string& uri, + std::unique_ptr* result) { + auto* fileSystem = new FlinkFileSystem(base, uri); + Status status = fileSystem->Init(); + result->reset(fileSystem); + return status; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/env/flink/env_flink.h b/env/flink/env_flink.h index d1912a3de..1e87d6c06 100644 --- a/env/flink/env_flink.h +++ b/env/flink/env_flink.h @@ -5,6 +5,7 @@ #pragma once +#include "jni_helper.h" #include "rocksdb/env.h" #include "rocksdb/file_system.h" #include "rocksdb/status.h" @@ -28,16 +29,10 @@ class FlinkFileSystem : public FileSystemWrapper { static const char* kNickName() { return "flink"; } const char* NickName() const override { return kNickName(); } - // Constructor and Destructor - explicit FlinkFileSystem(const std::shared_ptr& base, - const std::string& fsname); + // Destructor ~FlinkFileSystem() override; // Several methods current FileSystem must implement - - std::string GetId() const override; - Status ValidateOptions(const DBOptions& /*db_opts*/, - const ColumnFamilyOptions& /*cf_opts*/) const override; IOStatus NewSequentialFile(const std::string& /*fname*/, const FileOptions& /*options*/, std::unique_ptr* /*result*/, @@ -54,14 +49,14 @@ class FlinkFileSystem : public FileSystemWrapper { const IOOptions& /*options*/, std::unique_ptr* /*result*/, IODebugContext* /*dbg*/) override; - IOStatus FileExists(const std::string& /*fname*/, + IOStatus FileExists(const std::string& /*file_name*/, const IOOptions& /*options*/, IODebugContext* /*dbg*/) override; - IOStatus GetChildren(const std::string& /*path*/, + IOStatus GetChildren(const std::string& /*file_name*/, const IOOptions& /*options*/, std::vector* /*result*/, IODebugContext* /*dbg*/) override; - IOStatus DeleteFile(const std::string& /*fname*/, + IOStatus DeleteFile(const std::string& /*file_name*/, const IOOptions& /*options*/, IODebugContext* /*dbg*/) override; IOStatus CreateDir(const std::string& /*name*/, const IOOptions& /*options*/, @@ -69,9 +64,10 @@ class FlinkFileSystem : public FileSystemWrapper { IOStatus CreateDirIfMissing(const std::string& /*name*/, const IOOptions& /*options*/, IODebugContext* /*dbg*/) override; - IOStatus DeleteDir(const std::string& /*name*/, const IOOptions& /*options*/, + IOStatus DeleteDir(const std::string& /*file_name*/, + const IOOptions& /*options*/, IODebugContext* /*dbg*/) override; - IOStatus GetFileSize(const std::string& /*fname*/, + IOStatus GetFileSize(const std::string& /*file_name*/, const IOOptions& /*options*/, uint64_t* /*size*/, IODebugContext* /*dbg*/) override; IOStatus GetFileModificationTime(const std::string& /*fname*/, @@ -91,6 +87,23 @@ class FlinkFileSystem : public FileSystemWrapper { private: std::string base_path_; + JavaClassCache* class_cache_; + jobject file_system_instance_; + + // Constructor and Destructor + explicit FlinkFileSystem(const std::shared_ptr& base, + const std::string& fsname); + + // Init FileSystem + Status Init(); + + IOStatus Delete(const std::string& /*file_name*/, + const IOOptions& /*options*/, IODebugContext* /*dbg*/, + bool /*recursive*/); + IOStatus GetFileStatus(const std::string& /*file_name*/, + const IOOptions& /*options*/, IODebugContext* /*dbg*/, + jobject* /*fileStatus*/); + std::string ConstructPath(const std::string& /*file_name*/); }; // Returns a `FlinkEnv` with base_path diff --git a/env/flink/jni_helper.cc b/env/flink/jni_helper.cc index 8d1ac5acf..d79ea777d 100644 --- a/env/flink/jni_helper.cc +++ b/env/flink/jni_helper.cc @@ -5,72 +5,320 @@ #include "jni_helper.h" +#include "jvm_util.h" + namespace ROCKSDB_NAMESPACE { -JavaClassCache::JavaClassCache(JNIEnv *env) : jni_env_(env) { +JavaClassCache::JavaClassCache(JNIEnv* env) : jni_env_(env) {} + +JavaClassCache::~JavaClassCache() { + // Release all global ref of cached jclasses + for (const auto& item : cached_java_classes_) { + if (item.javaClass) { + jni_env_->DeleteGlobalRef(item.javaClass); + } + } +} + +IOStatus JavaClassCache::Create(JNIEnv* env, + std::unique_ptr* result) { + auto classCache = new JavaClassCache(env); + IOStatus status = classCache->Init(); + result->reset(classCache); + return status; +} + +IOStatus JavaClassCache::Init() { // Set all class names - cached_java_classes_[JavaClassCache::JC_URI].className = "java/net/URI"; - cached_java_classes_[JavaClassCache::JC_BYTE_BUFFER].className = + cached_java_classes_[CachedJavaClass::JC_URI].className = "java/net/URI"; + cached_java_classes_[CachedJavaClass::JC_BYTE_BUFFER].className = "java/nio/ByteBuffer"; - cached_java_classes_[JavaClassCache::JC_THROWABLE].className = + cached_java_classes_[CachedJavaClass::JC_THROWABLE].className = "java/lang/Throwable"; - cached_java_classes_[JavaClassCache::JC_FLINK_PATH].className = + cached_java_classes_[CachedJavaClass::JC_FLINK_PATH].className = "org/apache/flink/core/fs/Path"; - cached_java_classes_[JavaClassCache::JC_FLINK_FILE_SYSTEM].className = + cached_java_classes_[CachedJavaClass::JC_FLINK_FILE_SYSTEM].className = "org/apache/flink/state/forst/fs/ForStFlinkFileSystem"; - cached_java_classes_[JavaClassCache::JC_FLINK_FILE_STATUS].className = + cached_java_classes_[CachedJavaClass::JC_FLINK_FILE_STATUS].className = "org/apache/flink/core/fs/FileStatus"; - cached_java_classes_[JavaClassCache::JC_FLINK_FS_INPUT_STREAM].className = + cached_java_classes_[CachedJavaClass::JC_FLINK_FS_INPUT_STREAM].className = "org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream"; - cached_java_classes_[JavaClassCache::JC_FLINK_FS_OUTPUT_STREAM].className = + cached_java_classes_[CachedJavaClass::JC_FLINK_FS_OUTPUT_STREAM].className = "org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream"; - // Try best to create and set the jclass objects based on the class names set - // above + // Create and set the jclass objects based on the class names set above int numCachedClasses = - sizeof(cached_java_classes_) / sizeof(javaClassAndName); + sizeof(cached_java_classes_) / sizeof(JavaClassContext); for (int i = 0; i < numCachedClasses; i++) { - initCachedClass(cached_java_classes_[i].className, - &cached_java_classes_[i].javaClass); + IOStatus status = initCachedClass(cached_java_classes_[i].className, + &cached_java_classes_[i].javaClass); + if (!status.ok()) { + return status; + } } -} -JavaClassCache::~JavaClassCache() { - // Release all global ref of cached jclasses - for (const auto &item : cached_java_classes_) { - if (item.javaClass) { - jni_env_->DeleteGlobalRef(item.javaClass); + // Set all method names, signatures and class infos + cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_CONSTRUCTOR] + .javaClassAndName = cached_java_classes_[JC_FLINK_PATH]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_CONSTRUCTOR].methodName = + ""; + cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_CONSTRUCTOR].signature = + "(Lorg/apache/flink/core/fs/Path;)Z"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_TO_STRING] + .javaClassAndName = cached_java_classes_[JC_FLINK_PATH]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_TO_STRING].methodName = + "toString"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_PATH_TO_STRING].signature = + "()Ljava/lang/String;"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_URI_CONSTRUCTOR] + .javaClassAndName = cached_java_classes_[JC_URI]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_URI_CONSTRUCTOR].methodName = + ""; + cached_java_methods_[CachedJavaMethod::JM_FLINK_URI_CONSTRUCTOR].signature = + "(Ljava/lang/String;)V"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET].methodName = + "get"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET].signature = + "(Ljava/net/URI;)Lorg/apache/flink/core/fs/FileSystem;"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_EXISTS] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_EXISTS] + .methodName = "exists"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_EXISTS] + .signature = "(Lorg/apache/flink/core/fs/Path;)Z"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LIST_STATUS] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LIST_STATUS] + .methodName = "listStatus"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_LIST_STATUS] + .signature = + "(Lorg/apache/flink/core/fs/Path;)[Lorg/apache/flink/core/fs/FileStatus;"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET_FILE_STATUS] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET_FILE_STATUS] + .methodName = "getFileStatus"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_GET_FILE_STATUS] + .signature = + "(Lorg/apache/flink/core/fs/Path;)Lorg/apache/flink/core/fs/FileStatus;"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_DELETE] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_DELETE] + .methodName = "delete"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_DELETE] + .signature = "(Lorg/apache/flink/core/fs/Path;Z)Z"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_MKDIR] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_MKDIR] + .methodName = "mkdirs"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_MKDIR].signature = + "(Lorg/apache/flink/core/fs/Path;)Z"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_RENAME_FILE] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_RENAME_FILE] + .methodName = "rename"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_RENAME_FILE] + .signature = + "(Lorg/apache/flink/core/fs/Path;Lorg/apache/flink/core/fs/Path;)Z"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_OPEN] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_OPEN].methodName = + "open"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_OPEN].signature = + "(Lorg/apache/flink/core/fs/Path;)Lorg/apache/flink/state/forst/fs/" + "ByteBufferReadableFSDataInputStream;"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_SEQ_READ] + .javaClassAndName = cached_java_classes_[JC_FLINK_FS_INPUT_STREAM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_SEQ_READ] + .methodName = "readFully"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_SEQ_READ] + .signature = "(Ljava/nio/ByteBuffer;)I"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_RANDOM_READ] + .javaClassAndName = cached_java_classes_[JC_FLINK_FS_INPUT_STREAM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_RANDOM_READ] + .methodName = "readFully"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_RANDOM_READ] + .signature = "(JLjava/nio/ByteBuffer;)I"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_SKIP] + .javaClassAndName = cached_java_classes_[JC_FLINK_FS_INPUT_STREAM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_SKIP] + .methodName = "skip"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_INPUT_STREAM_SKIP] + .signature = "(J)J"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_WRITE] + .javaClassAndName = cached_java_classes_[JC_FLINK_FS_OUTPUT_STREAM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_WRITE] + .methodName = "write"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_WRITE] + .signature = "(Ljava/nio/ByteBuffer;)V"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_FLUSH] + .javaClassAndName = cached_java_classes_[JC_FLINK_FS_OUTPUT_STREAM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_FLUSH] + .methodName = "flush"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_FLUSH] + .signature = "()V"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_SYNC] + .javaClassAndName = cached_java_classes_[JC_FLINK_FS_OUTPUT_STREAM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_SYNC] + .methodName = "sync"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_SYNC] + .signature = "()V"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_CLOSE] + .javaClassAndName = cached_java_classes_[JC_FLINK_FS_OUTPUT_STREAM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_CLOSE] + .methodName = "close"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FS_OUTPUT_STREAM_CLOSE] + .signature = "()V"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_CREATE] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_SYSTEM]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_CREATE] + .methodName = "create"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_SYSTEM_CREATE] + .signature = + "(Lorg/apache/flink/core/fs/Path;)Lorg/apache/flink/state/forst/fs/" + "ByteBufferWritableFSDataOutputStream;"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_PATH] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_STATUS]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_PATH] + .methodName = "getPath"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_PATH] + .signature = "()Lorg/apache/flink/core/fs/Path;"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_LEN] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_STATUS]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_LEN] + .methodName = "getLen"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_LEN] + .signature = "()J"; + + cached_java_methods_ + [CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_MODIFICATION_TIME] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_STATUS]; + cached_java_methods_ + [CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_MODIFICATION_TIME] + .methodName = "getModificationTime"; + cached_java_methods_ + [CachedJavaMethod::JM_FLINK_FILE_STATUS_GET_MODIFICATION_TIME] + .signature = "()J"; + + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_IS_DIR] + .javaClassAndName = cached_java_classes_[JC_FLINK_FILE_STATUS]; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_IS_DIR] + .methodName = "isDir"; + cached_java_methods_[CachedJavaMethod::JM_FLINK_FILE_STATUS_IS_DIR] + .signature = "()Z"; + + // Create and set the jmethod based on the method names and signatures set + // above + int numCachedMethods = + sizeof(cached_java_methods_) / sizeof(JavaMethodContext); + for (int i = 0; i < numCachedMethods; i++) { + cached_java_methods_[i].javaMethod = jni_env_->GetMethodID( + cached_java_methods_[i].javaClassAndName.javaClass, + cached_java_methods_[i].methodName, cached_java_methods_[i].signature); + + if (!cached_java_methods_[i].javaMethod) { + return IOStatus::IOError(std::string("Exception when GetMethodID, ") + .append(cached_java_methods_[i].ToString())); } } + return IOStatus::OK(); } -Status JavaClassCache::initCachedClass(const char *className, - jclass *cachedJclass) { +IOStatus JavaClassCache::initCachedClass(const char* className, + jclass* cachedJclass) { jclass tempLocalClassRef = jni_env_->FindClass(className); if (!tempLocalClassRef) { - return Status::IOError("Exception when FindClass, class name: " + - std::string(className)); + return IOStatus::IOError("Exception when FindClass, class name: " + + std::string(className)); } *cachedJclass = (jclass)jni_env_->NewGlobalRef(tempLocalClassRef); if (!*cachedJclass) { - return Status::IOError("Exception when NewGlobalRef, class name " + - std::string(className)); + return IOStatus::IOError("Exception when NewGlobalRef, class name " + + std::string(className)); } jni_env_->DeleteLocalRef(tempLocalClassRef); - return Status::OK(); + return IOStatus::OK(); +} + +JavaClassCache::JavaClassContext JavaClassCache::GetJClass( + CachedJavaClass cachedJavaClass) { + return cached_java_classes_[cachedJavaClass]; +} + +JavaClassCache::JavaMethodContext JavaClassCache::GetJMethod( + CachedJavaMethod cachedJavaMethod) { + return cached_java_methods_[cachedJavaMethod]; } -Status JavaClassCache::GetJClass(CachedJavaClass cachedJavaClass, - jclass *javaClass) { - jclass targetClass = cached_java_classes_[cachedJavaClass].javaClass; - Status status = Status::OK(); - if (!targetClass) { - status = initCachedClass(cached_java_classes_[cachedJavaClass].className, - &targetClass); +IOStatus JavaClassCache::ConstructPathInstance(const std::string& file_path, + jobject* pathInstance) { + JNIEnv* jniEnv = getJNIEnv(); + JavaClassCache::JavaClassContext pathClass = + GetJClass(JavaClassCache::JC_FLINK_PATH); + JavaClassCache::JavaMethodContext pathConstructor = + GetJMethod(JavaClassCache::JM_FLINK_PATH_CONSTRUCTOR); + jstring pathString = jniEnv->NewStringUTF(file_path.c_str()); + jobject tempPathInstance = jniEnv->NewObject( + pathClass.javaClass, pathConstructor.javaMethod, pathString); + jniEnv->DeleteLocalRef(pathString); + if (tempPathInstance == nullptr) { + return CheckThenError(std::string("Exception when ConstructPathInstance, ") + .append(pathClass.ToString()) + .append(pathConstructor.ToString()) + .append(", args: Path(") + .append(file_path) + .append(")")); } - *javaClass = targetClass; - return status; + *pathInstance = tempPathInstance; + return IOStatus::OK(); +} + +IOStatus CurrentStatus( + const std::function& exceptionMessageIfError) { + JNIEnv* jniEnv = getJNIEnv(); + if (jniEnv->ExceptionCheck()) { + // Throw Exception to Java side, stop any call from Java. + jthrowable throwable = jniEnv->ExceptionOccurred(); + jniEnv->ExceptionDescribe(); + jniEnv->ExceptionClear(); + jniEnv->Throw(throwable); + return IOStatus::IOError(exceptionMessageIfError()); + } + return IOStatus::OK(); +} + +IOStatus CheckThenError(const std::string& exceptionMessageIfError) { + JNIEnv* jniEnv = getJNIEnv(); + if (jniEnv->ExceptionCheck()) { + // Throw Exception to Java side, stop any call from Java. + jthrowable throwable = jniEnv->ExceptionOccurred(); + jniEnv->ExceptionDescribe(); + jniEnv->ExceptionClear(); + jniEnv->Throw(throwable); + } + return IOStatus::IOError(exceptionMessageIfError); } } // namespace ROCKSDB_NAMESPACE \ No newline at end of file diff --git a/env/flink/jni_helper.h b/env/flink/jni_helper.h index 39d9e9f9a..490a4db82 100644 --- a/env/flink/jni_helper.h +++ b/env/flink/jni_helper.h @@ -3,8 +3,11 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include +#include + #include "jni.h" -#include "rocksdb/status.h" +#include "rocksdb/io_status.h" namespace ROCKSDB_NAMESPACE { @@ -24,22 +27,100 @@ class JavaClassCache { NUM_CACHED_CLASSES } CachedJavaClass; - // Constructor and Destructor - explicit JavaClassCache(JNIEnv* env); - ~JavaClassCache(); - - // Get jclass by specific CachedJavaClass - Status GetJClass(CachedJavaClass cachedJavaClass, jclass* javaClass); + // Frequently-used method type representing jmethods which will be cached. + typedef enum { + JM_FLINK_PATH_CONSTRUCTOR, + JM_FLINK_PATH_TO_STRING, + JM_FLINK_URI_CONSTRUCTOR, + JM_FLINK_FILE_SYSTEM_GET, + JM_FLINK_FILE_SYSTEM_EXISTS, + JM_FLINK_FILE_SYSTEM_LIST_STATUS, + JM_FLINK_FILE_SYSTEM_GET_FILE_STATUS, + JM_FLINK_FILE_SYSTEM_DELETE, + JM_FLINK_FILE_SYSTEM_MKDIR, + JM_FLINK_FILE_SYSTEM_RENAME_FILE, + JM_FLINK_FILE_SYSTEM_OPEN, + JM_FLINK_FS_INPUT_STREAM_SEQ_READ, + JM_FLINK_FS_INPUT_STREAM_RANDOM_READ, + JM_FLINK_FS_INPUT_STREAM_SKIP, + JM_FLINK_FS_OUTPUT_STREAM_WRITE, + JM_FLINK_FS_OUTPUT_STREAM_FLUSH, + JM_FLINK_FS_OUTPUT_STREAM_SYNC, + JM_FLINK_FS_OUTPUT_STREAM_CLOSE, + JM_FLINK_FILE_SYSTEM_CREATE, + JM_FLINK_FILE_STATUS_GET_PATH, + JM_FLINK_FILE_STATUS_GET_LEN, + JM_FLINK_FILE_STATUS_GET_MODIFICATION_TIME, + JM_FLINK_FILE_STATUS_IS_DIR, + NUM_CACHED_METHODS + } CachedJavaMethod; - private: - typedef struct { + // jclass with its context description + struct JavaClassContext { jclass javaClass; const char* className; - } javaClassAndName; + std::string ToString() const { + return std::string("className: ").append(className); + } + }; + + // jmethod with its context description + struct JavaMethodContext { + JavaClassContext javaClassAndName; + jmethodID javaMethod; + const char* methodName; + const char* signature; + + std::string ToString() const { + return javaClassAndName.ToString() + .append(", methodName: ") + .append(methodName) + .append(", signature: ") + .append(signature); + } + }; + + // Destructor. + ~JavaClassCache(); + + // Create a unique instance which inits necessary cached classes and methods. + // Return Status representing whether these classes and methods are inited + // correctly or not. + static IOStatus Create(JNIEnv* env, + std::unique_ptr* javaClassCache); + + // Get JavaClassContext by specific CachedJavaClass. + JavaClassContext GetJClass(CachedJavaClass cachedJavaClass); + + // Get JavaMethodContext by specific CachedJavaMethod. + JavaMethodContext GetJMethod(CachedJavaMethod cachedJavaMethod); + + // Construct Java Path Instance based on cached classes and method related to + // Path. + IOStatus ConstructPathInstance(const std::string& /*file_path*/, + jobject* /*pathInstance*/); + + private: JNIEnv* jni_env_; - javaClassAndName cached_java_classes_[JavaClassCache::NUM_CACHED_CLASSES]; + JavaClassContext cached_java_classes_[CachedJavaClass::NUM_CACHED_CLASSES]; + JavaMethodContext cached_java_methods_[CachedJavaMethod::NUM_CACHED_METHODS]; - Status initCachedClass(const char* className, jclass* cachedClass); + // Constructor. + explicit JavaClassCache(JNIEnv* env); + + // Init all classes and methods. + IOStatus Init(); + + // Init cached class. + IOStatus initCachedClass(const char* className, jclass* cachedClass); }; + +// Return current status of JNIEnv. +IOStatus CurrentStatus( + const std::function& /*exceptionMessageIfError*/); + +// Wrap error status of JNIEnv. +IOStatus CheckThenError(const std::string& /*exceptionMessageIfError*/); + } // namespace ROCKSDB_NAMESPACE \ No newline at end of file