diff --git a/env/flink/env_flink.cc b/env/flink/env_flink.cc index 290aa215b..9ff8f5b6d 100644 --- a/env/flink/env_flink.cc +++ b/env/flink/env_flink.cc @@ -855,4 +855,25 @@ Status FlinkFileSystem::Create(const std::shared_ptr& base, result->reset(fileSystem); return status; } + +Status NewFlinkEnv(const std::string& uri, + std::unique_ptr* flinkFileSystem) { + std::shared_ptr fs; + Status s = NewFlinkFileSystem(uri, &fs); + if (s.ok()) { + *flinkFileSystem = NewCompositeEnv(fs); + } + return s; +} + +Status NewFlinkFileSystem(const std::string& uri, + std::shared_ptr* fs) { + std::unique_ptr flinkFileSystem; + Status s = + FlinkFileSystem::Create(FileSystem::Default(), uri, &flinkFileSystem); + if (s.ok()) { + fs->reset(flinkFileSystem.release()); + } + return s; +} } // namespace ROCKSDB_NAMESPACE diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 9c4e9d308..759f9967a 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -30,6 +30,7 @@ set(JNI_NATIVE_SOURCES rocksjni/concurrent_task_limiter.cc rocksjni/config_options.cc rocksjni/env.cc + rocksjni/env_flink.cc rocksjni/env_options.cc rocksjni/event_listener.cc rocksjni/event_listener_jnicallback.cc @@ -157,6 +158,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/Filter.java src/main/java/org/rocksdb/FileOperationInfo.java src/main/java/org/rocksdb/FlinkCompactionFilter.java + src/main/java/org/rocksdb/FlinkEnv.java src/main/java/org/rocksdb/FlushJobInfo.java src/main/java/org/rocksdb/FlushReason.java src/main/java/org/rocksdb/FlushOptions.java @@ -459,6 +461,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4") org.rocksdb.EnvOptions org.rocksdb.Filter org.rocksdb.FlinkCompactionFilter + org.rocksdb.FlinkEnv org.rocksdb.FlushOptions org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig diff --git a/java/rocksjni/env_flink.cc b/java/rocksjni/env_flink.cc new file mode 100644 index 000000000..f6d4b44ca --- /dev/null +++ b/java/rocksjni/env_flink.cc @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "env/flink/env_flink.h" + +#include + +#include + +#include "java/rocksjni/portal.h" +#include "rocksdb/env.h" + +/* + * Class: org_rocksdb_FlinkEnv + * Method: createFlinkEnv + * Signature: (Ljava/lang/String;)J + */ +jlong Java_org_rocksdb_FlinkEnv_createFlinkEnv(JNIEnv* env, jclass, + jstring base_path) { + jboolean has_exception = JNI_FALSE; + auto path = + ROCKSDB_NAMESPACE::JniUtil::copyStdString(env, base_path, &has_exception); + if (has_exception == JNI_TRUE) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew( + env, "Could not copy jstring to std::string"); + return 0; + } + std::unique_ptr flink_env; + auto status = ROCKSDB_NAMESPACE::NewFlinkEnv(path, &flink_env); + if (!status.ok()) { + ROCKSDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, status); + return 0; + } + auto ptr_as_handle = flink_env.release(); + return reinterpret_cast(ptr_as_handle); +} + +/* + * Class: org_rocksdb_FlinkEnv + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_FlinkEnv_disposeInternal(JNIEnv*, jobject, + jlong jhandle) { + auto* handle = reinterpret_cast(jhandle); + assert(handle != nullptr); + delete handle; +} diff --git a/java/src/main/java/org/rocksdb/FlinkEnv.java b/java/src/main/java/org/rocksdb/FlinkEnv.java new file mode 100644 index 000000000..91e6d46b6 --- /dev/null +++ b/java/src/main/java/org/rocksdb/FlinkEnv.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.rocksdb; + +/** + * Flink Env which proxy all filesystem access to Flink FileSystem. + */ +public class FlinkEnv extends Env { + /** +

Creates a new environment that is used for Flink environment.

+ * + *

The caller must delete the result when it is + * no longer needed.

+ * + * @param basePath the base path string for the given Flink file system, + * formatted as "{fs-schema-supported-by-flink}://xxx" + */ + public FlinkEnv(final String basePath) { + super(createFlinkEnv(basePath)); + } + + private static native long createFlinkEnv(final String basePath); + + @Override protected final native void disposeInternal(final long handle); +} \ No newline at end of file diff --git a/src.mk b/src.mk index 9629e7ec8..41f4c0076 100644 --- a/src.mk +++ b/src.mk @@ -663,6 +663,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/config_options.cc \ java/rocksjni/export_import_files_metadatajni.cc \ java/rocksjni/env.cc \ + java/rocksjni/env_flink.cc \ java/rocksjni/env_options.cc \ java/rocksjni/event_listener.cc \ java/rocksjni/event_listener_jnicallback.cc \