Skip to content

Commit

Permalink
[env] Support JNI of FlinkEnv
Browse files Browse the repository at this point in the history
  • Loading branch information
ljz2051 committed Mar 21, 2024
1 parent a5c920d commit 5c81b44
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 0 deletions.
21 changes: 21 additions & 0 deletions env/flink/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -855,4 +855,25 @@ Status FlinkFileSystem::Create(const std::shared_ptr<FileSystem>& base,
result->reset(fileSystem);
return status;
}

Status NewFlinkEnv(const std::string& uri,
std::unique_ptr<Env>* flinkFileSystem) {
std::shared_ptr<FileSystem> fs;
Status s = NewFlinkFileSystem(uri, &fs);
if (s.ok()) {
*flinkFileSystem = NewCompositeEnv(fs);
}
return s;
}

Status NewFlinkFileSystem(const std::string& uri,
std::shared_ptr<FileSystem>* fs) {
std::unique_ptr<FileSystem> flinkFileSystem;
Status s =
FlinkFileSystem::Create(FileSystem::Default(), uri, &flinkFileSystem);
if (s.ok()) {
fs->reset(flinkFileSystem.release());
}
return s;
}
} // namespace ROCKSDB_NAMESPACE
1 change: 1 addition & 0 deletions java/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/concurrent_task_limiter.cc
rocksjni/config_options.cc
rocksjni/env.cc
rocksjni/env_flink.cc
rocksjni/env_options.cc
rocksjni/event_listener.cc
rocksjni/event_listener_jnicallback.cc
Expand Down
53 changes: 53 additions & 0 deletions java/rocksjni/env_flink.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file implements the "bridge" between Java and C++ and enables
// calling c++ ROCKSDB_NAMESPACE::Env methods from Java side.

#include "env/flink/env_flink.h"

#include <jni.h>

#include <vector>

#include "java/rocksjni/portal.h"
#include "rocksdb/env.h"

/*
* Class: org_rocksdb_FlinkEnv
* Method: createFlinkEnv
* Signature: (Ljava/lang/String;)J
*/
jlong Java_org_rocksdb_FlinkEnv_createFlinkEnv(JNIEnv* env, jclass,
jstring j_fs_name) {
jboolean has_exception = JNI_FALSE;
auto fs_name =
ROCKSDB_NAMESPACE::JniUtil::copyStdString(env, j_fs_name, &has_exception);
if (has_exception == JNI_TRUE) {
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(
env, "Could not copy jstring to std::string");
return 0;
}
std::unique_ptr<ROCKSDB_NAMESPACE::Env> flink_env;
auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(fs_name, &flink_env);
if (!status.ok()) {
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, status);
return 0;
}
auto ptr_as_handle = flink_env.release();
return reinterpret_cast<jlong>(ptr_as_handle);
}

/*
* Class: org_rocksdb_FlinkEnv
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_FlinkEnv_disposeInternal(JNIEnv*, jobject,
jlong jhandle) {
auto* handle = reinterpret_cast<ROCKSDB_NAMESPACE::Env*>(jhandle);
assert(handle != nullptr);
delete handle;
}
27 changes: 27 additions & 0 deletions java/src/main/java/org/rocksdb/FlinkEnv.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

package org.rocksdb;

/**
* Flink Env which proxy all filesystem access to Flink FileSystem.
*/
public class FlinkEnv extends Env {
/**
<p>Creates a new environment that is used for Flink environment.</p>
*
* <p>The caller must delete the result when it is
* no longer needed.</p>
*
* @param fsName the filesystem name as a string in the form "flink://hostname:port/"
*/
public FlinkEnv(final String fsName) {
super(createFlinkEnv(fsName));
}

private static native long createFlinkEnv(final String fsName);

@Override protected final native void disposeInternal(final long handle);
}
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/config_options.cc \
java/rocksjni/export_import_files_metadatajni.cc \
java/rocksjni/env.cc \
java/rocksjni/env_flink.cc \
java/rocksjni/env_options.cc \
java/rocksjni/event_listener.cc \
java/rocksjni/event_listener_jnicallback.cc \
Expand Down

0 comments on commit 5c81b44

Please sign in to comment.