Skip to content

Commit

Permalink
implement processing when a connection request is rejected due to an …
Browse files Browse the repository at this point in the history
…error other than the maximum number of connections
  • Loading branch information
t-horikawa committed Jun 24, 2024
1 parent 431e152 commit dd6b8db
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.tsurugidb.tsubakuro.channel.ipc.connection;

import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -26,8 +27,8 @@ public final class IpcConnectorImpl implements Connector {

private static native long getConnectorNative(String name) throws IOException;
private static native long requestNative(long handle) throws IOException;
private static native long waitNative(long handle, long id);
private static native long waitNative(long handle, long id, long timeout) throws TimeoutException;
private static native long waitNative(long handle, long id) throws ConnectException;
private static native long waitNative(long handle, long id, long timeout) throws ConnectException, TimeoutException;
private static native boolean checkNative(long handle, long id);
private static native void closeConnectorNative(long handle);

Expand Down
28 changes: 23 additions & 5 deletions modules/ipc/src/main/native/include/wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -1004,11 +1004,11 @@ class connection_queue
index_queue(std::size_t size, boost::interprocess::managed_shared_memory::segment_manager* mgr) : queue_(mgr), capacity_(size) {
queue_.resize(capacity_);
}
void fill() {
void fill(std::uint8_t admin_slots) {
for (std::size_t i = 0; i < capacity_; i++) {
queue_.at(i) = i;
}
pushed_.store(capacity_);
pushed_.store(capacity_ - admin_slots);
}
void push(std::size_t len) {
boost::interprocess::scoped_lock lock(mutex_);
Expand All @@ -1020,7 +1020,18 @@ class connection_queue
[[nodiscard]] std::size_t try_pop() {
auto current = poped_.load();
while (true) {
if (pushed_.load() == current) {
if (pushed_.load() <= current) {
throw std::runtime_error("no request available");
}
if (poped_.compare_exchange_strong(current, current + 1)) {
return queue_.at(index(current));
}
}
}
[[nodiscard]] std::size_t try_pop(std::uint8_t admin_slots) {
auto current = poped_.load();
while (true) {
if ((pushed_.load() + admin_slots) <= current) {
throw std::runtime_error("no request available");
}
if (poped_.compare_exchange_strong(current, current + 1)) {
Expand Down Expand Up @@ -1129,8 +1140,9 @@ class connection_queue
/**
* @brief Construct a new object.
*/
connection_queue(std::size_t n, boost::interprocess::managed_shared_memory::segment_manager* mgr) : q_free_(n, mgr), q_requested_(n, mgr), v_requested_(n, mgr) {
q_free_.fill();
connection_queue(std::size_t n, boost::interprocess::managed_shared_memory::segment_manager* mgr, std::uint8_t as_n)
: q_free_(n + as_n, mgr), q_requested_(n + as_n, mgr), v_requested_(n + as_n, mgr), admin_slots_(as_n) {
q_free_.fill(as_n);
}
~connection_queue() = default;

Expand All @@ -1147,6 +1159,11 @@ class connection_queue
q_requested_.push(rid);
return rid;
}
std::size_t request_admin() {
auto rid = q_free_.try_pop(admin_slots_);
q_requested_.push(rid);
return rid;
}
std::size_t wait(std::size_t rid, std::int64_t timeout = 0) {
auto& entry = v_requested_.at(rid);
try {
Expand Down Expand Up @@ -1209,6 +1226,7 @@ class connection_queue
boost::interprocess::vector<element, element_allocator> v_requested_;

std::atomic_bool terminate_{false};
std::uint8_t admin_slots_;
boost::interprocess::interprocess_semaphore s_terminated_{0};

std::size_t session_id_{};
Expand Down
29 changes: 18 additions & 11 deletions modules/ipc/src/main/native/src/connectionJNI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,7 @@ JNIEXPORT jlong JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_connection_IpcC
{
connection_container* container = reinterpret_cast<connection_container*>(static_cast<std::uintptr_t>(handle));
try {
if (auto session_id = container->get_connection_queue().request(); session_id != tateyama::common::wire::connection_queue::session_id_indicating_error) {
return session_id;
}
jclass classj = env->FindClass("Ljava/net/ConnectException;");
if (classj == nullptr) { std::abort(); }
env->ThrowNew(classj, "IPC connection establishment failure");
env->DeleteLocalRef(classj);
return 0;
return container->get_connection_queue().request();
} catch (std::runtime_error &e) {
jclass classj = env->FindClass("Ljava/io/IOException;");
if (classj == nullptr) { std::abort(); }
Expand All @@ -80,10 +73,17 @@ JNIEXPORT jlong JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_connection_IpcC
* Signature: (JJ)J
*/
JNIEXPORT jlong JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_connection_IpcConnectorImpl_waitNative__JJ
(JNIEnv *, jclass, jlong handle, jlong id)
(JNIEnv *env, jclass, jlong handle, jlong id)
{
connection_container* container = reinterpret_cast<connection_container*>(static_cast<std::uintptr_t>(handle));
return container->get_connection_queue().wait(id);
if (auto session_id = container->get_connection_queue().wait(id); session_id != tateyama::common::wire::connection_queue::session_id_indicating_error) {
return session_id;
}
jclass classj = env->FindClass("Ljava/net/ConnectException;");
if (classj == nullptr) { std::abort(); }
env->ThrowNew(classj, "IPC connection establishment failure");
env->DeleteLocalRef(classj);
return 0;
}

/*
Expand All @@ -96,7 +96,14 @@ JNIEXPORT jlong JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_connection_IpcC
{
connection_container* container = reinterpret_cast<connection_container*>(static_cast<std::uintptr_t>(handle));
try {
return container->get_connection_queue().wait(id, timeout / 1000);
if (auto session_id = container->get_connection_queue().wait(id, timeout / 1000); session_id != tateyama::common::wire::connection_queue::session_id_indicating_error) {
return session_id;
}
jclass classj = env->FindClass("Ljava/net/ConnectException;");
if (classj == nullptr) { std::abort(); }
env->ThrowNew(classj, "IPC connection establishment failure");
env->DeleteLocalRef(classj);
return 0;
} catch (std::runtime_error &e) {
jclass classj = env->FindClass("Ljava/util/concurrent/TimeoutException;");
if (classj == nullptr) { std::abort(); }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.tsurugidb.tsubakuro.channel.ipc.connection;

import java.net.ConnectException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -40,6 +42,48 @@ void connect() throws Exception {
server.close();
}

@Test
void reject() throws Exception {
ServerConnectionImpl serverConnection;

serverConnection = new ServerConnectionImpl(dbName);
// assertEquals(serverConnection.listen(), 0);

var connector = new IpcConnectorImpl(dbName);
var future = connector.connect();
var id = serverConnection.listen();
assertEquals(id, 1);
serverConnection.reject();

Throwable exception = assertThrows(ConnectException.class, () -> {
future.get();
});
assertEquals("IPC connection establishment failure", exception.getMessage());

serverConnection.close();
}

@Test
void reject_timeout() throws Exception {
ServerConnectionImpl serverConnection;

serverConnection = new ServerConnectionImpl(dbName);
// assertEquals(serverConnection.listen(), 0);

var connector = new IpcConnectorImpl(dbName);
var future = connector.connect();
var id = serverConnection.listen();
assertEquals(id, 1);
serverConnection.reject();

Throwable exception = assertThrows(ConnectException.class, () -> {
future.get(1, TimeUnit.SECONDS);
});
assertEquals("IPC connection establishment failure", exception.getMessage());

serverConnection.close();
}

@Test
void timeout() throws Exception {
try (var serverConnection = new ServerConnectionImpl(dbName)) {
Expand All @@ -52,7 +96,7 @@ void timeout() throws Exception {

var start = System.currentTimeMillis();
Throwable exception = assertThrows(TimeoutException.class, () -> {
var client = (WireImpl) future.get(1, TimeUnit.SECONDS);
future.get(1, TimeUnit.SECONDS);
});
assertEquals("connection response has not been accepted within the specified time", exception.getMessage());
var duration = System.currentTimeMillis() - start;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class ServerConnectionImpl implements Closeable {
private static native long createNative(String name);
private static native long listenNative(long handle);
private static native void acceptNative(long handle, long id);
private static native void rejectNative(long handle);
private static native void closeNative(long handle);

static {
Expand All @@ -36,6 +37,10 @@ public ServerWireImpl accept(long id) throws IOException {
return rv;
}

public void reject() throws IOException {
rejectNative(handle);
}

public void close() throws IOException {
closeNative(handle);
}
Expand Down
2 changes: 1 addition & 1 deletion modules/ipc/src/test/native/include/server_wires.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class connection_container
managed_shared_memory_ =
std::make_unique<boost::interprocess::managed_shared_memory>(boost::interprocess::create_only, name_.c_str(), request_queue_size);
managed_shared_memory_->destroy<connection_queue>(connection_queue::name);
connection_queue_ = managed_shared_memory_->construct<connection_queue>(connection_queue::name)(10, managed_shared_memory_->get_segment_manager());
connection_queue_ = managed_shared_memory_->construct<connection_queue>(connection_queue::name)(10, managed_shared_memory_->get_segment_manager(), 1);
}
catch(const boost::interprocess::interprocess_exception& ex) {
std::abort(); // FIXME
Expand Down
13 changes: 13 additions & 0 deletions modules/ipc/src/test/native/src/server_connectionJNI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ JNIEXPORT void JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_connection_Serve
container->get_connection_queue().accept(container->get_connection_queue().slot(), id);
}

/*
* Class: com_tsurugidb_tsubakuro_channel_ipc_connection_ServerConnectionImpl
* Method: rejectNative
* Signature: (J)V
*/
JNIEXPORT void JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_connection_ServerConnectionImpl_rejectNative
(JNIEnv *, jclass, jlong handle)
{
connection_container* container = reinterpret_cast<connection_container*>(static_cast<std::uintptr_t>(handle));

container->get_connection_queue().reject(container->get_connection_queue().slot());
}

/*
* Class: com_tsurugidb_tsubakuro_channel_ipc_connection_ServerConnectionImpl
* Method: closeNative
Expand Down

0 comments on commit dd6b8db

Please sign in to comment.