Skip to content

Commit

Permalink
[env] Support JNI of FlinkEnv (#12)
Browse files Browse the repository at this point in the history
* [env] Support JNI of FlinkEnv
  • Loading branch information
ljz2051 authored Mar 21, 2024
1 parent a5c920d commit ec88681
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 0 deletions.
21 changes: 21 additions & 0 deletions env/flink/env_flink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -855,4 +855,25 @@ Status FlinkFileSystem::Create(const std::shared_ptr<FileSystem>& base,
result->reset(fileSystem);
return status;
}

Status NewFlinkEnv(const std::string& uri,
std::unique_ptr<Env>* flinkFileSystem) {
std::shared_ptr<FileSystem> fs;
Status s = NewFlinkFileSystem(uri, &fs);
if (s.ok()) {
*flinkFileSystem = NewCompositeEnv(fs);
}
return s;
}

Status NewFlinkFileSystem(const std::string& uri,
std::shared_ptr<FileSystem>* fs) {
std::unique_ptr<FileSystem> flinkFileSystem;
Status s =
FlinkFileSystem::Create(FileSystem::Default(), uri, &flinkFileSystem);
if (s.ok()) {
fs->reset(flinkFileSystem.release());
}
return s;
}
} // namespace ROCKSDB_NAMESPACE
3 changes: 3 additions & 0 deletions java/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/concurrent_task_limiter.cc
rocksjni/config_options.cc
rocksjni/env.cc
rocksjni/env_flink.cc
rocksjni/env_options.cc
rocksjni/event_listener.cc
rocksjni/event_listener_jnicallback.cc
Expand Down Expand Up @@ -157,6 +158,7 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/Filter.java
src/main/java/org/rocksdb/FileOperationInfo.java
src/main/java/org/rocksdb/FlinkCompactionFilter.java
src/main/java/org/rocksdb/FlinkEnv.java
src/main/java/org/rocksdb/FlushJobInfo.java
src/main/java/org/rocksdb/FlushReason.java
src/main/java/org/rocksdb/FlushOptions.java
Expand Down Expand Up @@ -459,6 +461,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4")
org.rocksdb.EnvOptions
org.rocksdb.Filter
org.rocksdb.FlinkCompactionFilter
org.rocksdb.FlinkEnv
org.rocksdb.FlushOptions
org.rocksdb.HashLinkedListMemTableConfig
org.rocksdb.HashSkipListMemTableConfig
Expand Down
63 changes: 63 additions & 0 deletions java/rocksjni/env_flink.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "env/flink/env_flink.h"

#include <jni.h>

#include <vector>

#include "java/rocksjni/portal.h"
#include "rocksdb/env.h"

/*
* Class: org_rocksdb_FlinkEnv
* Method: createFlinkEnv
* Signature: (Ljava/lang/String;)J
*/
jlong Java_org_rocksdb_FlinkEnv_createFlinkEnv(JNIEnv* env, jclass,
jstring base_path) {
jboolean has_exception = JNI_FALSE;
auto path =
ROCKSDB_NAMESPACE::JniUtil::copyStdString(env, base_path, &has_exception);
if (has_exception == JNI_TRUE) {
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(
env, "Could not copy jstring to std::string");
return 0;
}
std::unique_ptr<ROCKSDB_NAMESPACE::Env> flink_env;
auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(path, &flink_env);
if (!status.ok()) {
ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, status);
return 0;
}
auto ptr_as_handle = flink_env.release();
return reinterpret_cast<jlong>(ptr_as_handle);
}

/*
* Class: org_rocksdb_FlinkEnv
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_FlinkEnv_disposeInternal(JNIEnv*, jobject,
jlong jhandle) {
auto* handle = reinterpret_cast<ROCKSDB_NAMESPACE::Env*>(jhandle);
assert(handle != nullptr);
delete handle;
}
41 changes: 41 additions & 0 deletions java/src/main/java/org/rocksdb/FlinkEnv.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.rocksdb;

/**
* Flink Env which proxy all filesystem access to Flink FileSystem.
*/
public class FlinkEnv extends Env {
/**
<p>Creates a new environment that is used for Flink environment.</p>
*
* <p>The caller must delete the result when it is
* no longer needed.</p>
*
* @param basePath the base path string for the given Flink file system,
* formatted as "{fs-schema-supported-by-flink}://xxx"
*/
public FlinkEnv(final String basePath) {
super(createFlinkEnv(basePath));
}

private static native long createFlinkEnv(final String basePath);

@Override protected final native void disposeInternal(final long handle);
}
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ JNI_NATIVE_SOURCES = \
java/rocksjni/config_options.cc \
java/rocksjni/export_import_files_metadatajni.cc \
java/rocksjni/env.cc \
java/rocksjni/env_flink.cc \
java/rocksjni/env_options.cc \
java/rocksjni/event_listener.cc \
java/rocksjni/event_listener_jnicallback.cc \
Expand Down

0 comments on commit ec88681

Please sign in to comment.