Skip to content

Commit

Permalink
make Transaction and PreparedStatement close() execute asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Nov 21, 2024
1 parent b0c9c6e commit 9172b47
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.ChannelResponse;
import com.tsurugidb.tsubakuro.client.SessionAlreadyClosedException;
import com.tsurugidb.tsubakuro.exception.ServerException;
import com.tsurugidb.tsubakuro.util.ServerResource;

/**
Expand All @@ -38,21 +39,38 @@ public class Disposer extends Thread {

private AtomicBoolean started = new AtomicBoolean();

private Queue<ForegroundFutureResponse<?>> queue = new ArrayDeque<>();
private Queue<ForegroundFutureResponse<?>> futureResponseQueue = new ArrayDeque<>();

private AtomicBoolean finished = new AtomicBoolean();
private Queue<DelayedClose> serverResourceQueue = new ArrayDeque<>();

private final AtomicReference<CleanUp> cleanUp = new AtomicReference<>();
private AtomicBoolean empty = new AtomicBoolean();

private final AtomicReference<DelayedShutdown> shutdown = new AtomicReference<>();

private final AtomicReference<DelayedClose> close = new AtomicReference<>();

/**
* Enclodure of delayed clean up procedure.
*/
public interface CleanUp {
public interface DelayedShutdown {
/**
* clean up procedure.
* @throws IOException An error was occurred while cleanUP() is executed.
*/
void cleanUp() throws IOException;
void shutdown() throws IOException;
}

/**
* Enclodure of delayed clean up procedure.
*/
public interface DelayedClose {
/**
* invoke the close() procedure of its belonging object.
* @throws ServerException if error was occurred while disposing this session
* @throws IOException if I/O error was occurred while disposing this session
* @throws InterruptedException if interrupted while disposing this session
*/
void delayedClose() throws ServerException, IOException, InterruptedException;
}

/**
Expand All @@ -67,8 +85,8 @@ public void run() {

while (true) {
ForegroundFutureResponse<?> futureResponse;
synchronized (queue) {
futureResponse = queue.poll();
synchronized (futureResponseQueue) {
futureResponse = futureResponseQueue.poll();
}
if (futureResponse != null) {
try {
Expand All @@ -89,7 +107,7 @@ public void run() {
continue;
} catch (TimeoutException e) {
// Let's try again
queue.add(futureResponse);
futureResponseQueue.add(futureResponse);
continue;
} catch (Exception e) { // should not occur
if (exception == null) {
Expand All @@ -99,10 +117,22 @@ public void run() {
}
continue;
}
} else {
if (cleanUp.get() != null) {
break;
}
DelayedClose serverResource;
synchronized (serverResourceQueue) {
serverResource = serverResourceQueue.poll();
}
if (serverResource != null) {
try {
serverResource.delayedClose();
} catch (ServerException | IOException | InterruptedException e) {
exception = addSuppressed(exception, e);
}
continue;
}
notifyEmpty();
if (shutdown.get() != null || close.get() != null) {
break;
}
try {
Thread.sleep(10);
Expand All @@ -112,12 +142,20 @@ public void run() {
}

try {
cleanUp.get().cleanUp();
var sh = shutdown.get();
if (sh != null) {
sh.shutdown();
}
} catch (IOException e) {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
exception = addSuppressed(exception, e);
} finally {
try {
var cl = close.get();
if (cl != null) {
cl.delayedClose();
}
} catch (ServerException | IOException | InterruptedException e) {
exception = addSuppressed(exception, e);
}
}

Expand All @@ -127,38 +165,86 @@ public void run() {
}
}

void add(ForegroundFutureResponse<?> futureResponse) {
private Exception addSuppressed(Exception exception, Exception e) {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
return exception;
}

synchronized void add(ForegroundFutureResponse<?> futureResponse) {
if (shutdown.get() != null || close.get() != null) {
throw new AssertionError("Session already closed");
}
synchronized (futureResponseQueue) {
futureResponseQueue.add(futureResponse);
}
if (!started.getAndSet(true)) {
this.start();
}
}

/**
* Add a DelayedClose object containing a close procedure for a certain ServerResource object.
* If disposer thread has not started, a disposer thread will be started.
* @param resource the DelayedClose to be added
*/
public synchronized void add(DelayedClose resource) {
if (shutdown.get() != null || close.get() != null) {
throw new AssertionError("Session already closed");
}
synchronized (serverResourceQueue) {
serverResourceQueue.add(resource);
}
if (!started.getAndSet(true)) {
this.start();
}
synchronized (queue) {
queue.add(futureResponse);
}

/**
* Register a delayed shutdown procesure of the Session.
* If disposer thread has not started, cleanUp() is executed.
* NOTE: This method is assumed to be called only in close and/or shutdown of a Session.
* @param c the clean up procesure to be registered
* @throws IOException An error was occurred when c.shoutdown() has been immediately executed.
*/
public synchronized void registerDelayedShutdown(DelayedShutdown c) throws IOException {
if (!started.getAndSet(true)) {
c.shutdown();
empty.set(true);
return;
}
shutdown.set(c);
}

/**
* Register a clean up procesure.
* Register a delayed close object in charge of asynchronous close of the Session.
* If disposer thread has not started, cleanUp() is executed.
* NOTE: This method is assumed to be called only in close and/or shutdown of a Session.
* @param c the clean up procesure to be registered
* @throws IOException An error was occurred when CleanUP c has been immediately executed.
* @throws ServerException if error was occurred while disposing this session
* @throws IOException if I/O error was occurred while disposing this session
* @throws InterruptedException if interrupted while disposing this session
*/
public void registerCleanUp(CleanUp c) throws IOException {
public synchronized void registerDelayedClose(DelayedClose c) throws ServerException, IOException, InterruptedException {
if (!started.getAndSet(true)) {
c.cleanUp();
c.delayedClose();
empty.set(true);
return;
}
cleanUp.set(c);
close.set(c);
}

/**
* Wait until the release of the server resource corresponding to the response
* closed without getting is completed.
* NOTE: This method must be called with the guarantee that no subsequent add() will be called.
*/
public synchronized void waitForFinish() {
public synchronized void waitForEmpty() {
if (started.get()) {
while (finished.get()) {
while (!empty.get()) {
try {
wait();
} catch (InterruptedException e) {
Expand All @@ -168,8 +254,8 @@ public synchronized void waitForFinish() {
}
}

private synchronized void notifyFinish() {
finished.set(false);
private synchronized void notifyEmpty() {
empty.set(true);
notify();
}
}
Loading

0 comments on commit 9172b47

Please sign in to comment.