Skip to content

Commit

Permalink
Merge pull request #500 from project-tsurugi/r
Browse files Browse the repository at this point in the history
return to 87b3aba
  • Loading branch information
t-horikawa authored Nov 22, 2024
2 parents 847b577 + 0ea1836 commit 712a02a
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 413 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
*/
package com.tsurugidb.tsubakuro.channel.common.connection;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nonnull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.tsurugidb.tsubakuro.channel.common.connection.wire.impl.ChannelResponse;
import com.tsurugidb.tsubakuro.client.SessionAlreadyClosedException;
import com.tsurugidb.tsubakuro.exception.ServerException;
// import com.tsurugidb.tsubakuro.client.SessionAlreadyClosedException;
import com.tsurugidb.tsubakuro.util.ServerResource;

/**
Expand All @@ -39,54 +39,32 @@ public class Disposer extends Thread {

private AtomicBoolean started = new AtomicBoolean();

private Queue<ForegroundFutureResponse<?>> futureResponseQueue = new ArrayDeque<>();

private Queue<DelayedClose> serverResourceQueue = new ArrayDeque<>();

private AtomicBoolean empty = new AtomicBoolean();

private final AtomicReference<DelayedShutdown> shutdown = new AtomicReference<>();
private AtomicBoolean sessionClosed = new AtomicBoolean();

private final AtomicReference<DelayedClose> close = new AtomicReference<>();
private Queue<ForegroundFutureResponse<?>> queue = new ArrayDeque<>();

/**
* Enclodure of delayed clean up procedure.
*/
public interface DelayedShutdown {
/**
* clean up procedure.
* @throws IOException An error was occurred while cleanUP() is executed.
*/
void shutdown() throws IOException;
}
private AtomicBoolean queueHasEntry = new AtomicBoolean();

/**
* Enclodure of delayed clean up procedure.
*/
public interface DelayedClose {
/**
* invoke the close() procedure of its belonging object.
* @throws ServerException if error was occurred while disposing this session
* @throws IOException if I/O error was occurred while disposing this session
* @throws InterruptedException if interrupted while disposing this session
*/
void delayedClose() throws ServerException, IOException, InterruptedException;
}
private ServerResource session;

/**
* Creates a new instance.
* @param session the current session which this blongs to
*/
public Disposer() {
public Disposer(@Nonnull ServerResource session) {
Objects.requireNonNull(session);
this.session = session;
}

@Override
public void run() {
Exception exception = null;

while (true) {
ForegroundFutureResponse<?> futureResponse;
synchronized (futureResponseQueue) {
futureResponse = futureResponseQueue.poll();
synchronized (queue) {
futureResponse = queue.poll();
}
if (sessionClosed.get() && futureResponse == null) {
break;
}
if (futureResponse != null) {
try {
Expand All @@ -99,163 +77,80 @@ public void run() {
// Server resource has not created at the server
continue;
} catch (SessionAlreadyClosedException e) {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
continue;
// Server resource has been disposed by the session close
throw new AssertionError("SessionAlreadyClosedException should not occur in the current server implementation"); // FIXME remove this line
// continue;
} catch (TimeoutException e) {
// Let's try again
futureResponseQueue.add(futureResponse);
queue.add(futureResponse);
continue;
} catch (Exception e) { // should not occur
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
} catch (Exception e) {
// should not occur
LOG.info(e.getMessage());
continue;
}
} else {
notifyQueueIsEmpty();
}
DelayedClose serverResource;
synchronized (serverResourceQueue) {
serverResource = serverResourceQueue.poll();
}
if (serverResource != null) {
if (!sessionClosed.get()) {
try {
serverResource.delayedClose();
} catch (ServerException | IOException | InterruptedException e) {
exception = addSuppressed(exception, e);
}
continue;
}
notifyEmpty();
if (shutdown.get() != null || close.get() != null) {
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// No problem, it's OK
}
}

try {
var sh = shutdown.get();
if (sh != null) {
sh.shutdown();
}
} catch (IOException e) {
exception = addSuppressed(exception, e);
} finally {
try {
var cl = close.get();
if (cl != null) {
cl.delayedClose();
Thread.sleep(10);
} catch (InterruptedException e) {
// No problem, it's OK
}
} catch (ServerException | IOException | InterruptedException e) {
exception = addSuppressed(exception, e);
}
}

if (exception != null) {
LOG.info(exception.getMessage());
throw new UncheckedIOException(new IOException(exception));
}
}

private Exception addSuppressed(Exception exception, Exception e) {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
return exception;
}

synchronized void add(ForegroundFutureResponse<?> futureResponse) {
if (shutdown.get() != null || close.get() != null) {
throw new AssertionError("Session already closed");
}
synchronized (futureResponseQueue) {
futureResponseQueue.add(futureResponse);
}
if (!started.getAndSet(true)) {
this.start();
}
// FIXME Revive these lines when the server implementation improves.
// try {
// session.close();
// } catch (Exception e) {
// LOG.error(e.getMessage());
// }
}

/**
* Add a DelayedClose object containing a close procedure for a certain ServerResource object.
* If disposer thread has not started, a disposer thread will be started.
* @param resource the DelayedClose to be added
* Receive notification that the session is to be closed soon and
* let the caller know if the session can be closed immediately.
* NOTE: This method is assumed to be called from Session.close() only.
* @return true if the session can be closed immediately
* as the queue that stores unhandled ForegroundFutureResponse is empty
*/
public synchronized void add(DelayedClose resource) {
if (shutdown.get() != null || close.get() != null) {
throw new AssertionError("Session already closed");
}
synchronized (serverResourceQueue) {
serverResourceQueue.add(resource);
}
if (!started.getAndSet(true)) {
this.start();
public boolean prepareCloseAndIsEmpty() {
// FIXME Remove the following line when the server implementation improves.
waitForFinishDisposal();
synchronized (queue) {
sessionClosed.set(true);
return queue.isEmpty();
}
}

/**
* Register a delayed shutdown procesure of the Session.
* If disposer thread has not started, cleanUp() is executed.
* NOTE: This method is assumed to be called only in close and/or shutdown of a Session.
* @param c the clean up procesure to be registered
* @throws IOException An error was occurred when c.shoutdown() has been immediately executed.
*/
public synchronized void registerDelayedShutdown(DelayedShutdown c) throws IOException {
void add(ForegroundFutureResponse<?> futureResponse) {
if (!started.getAndSet(true)) {
c.shutdown();
empty.set(true);
return;
this.start();
}
shutdown.set(c);
}

/**
* Register a delayed close object in charge of asynchronous close of the Session.
* If disposer thread has not started, cleanUp() is executed.
* NOTE: This method is assumed to be called only in close and/or shutdown of a Session.
* @param c the clean up procesure to be registered
* @throws ServerException if error was occurred while disposing this session
* @throws IOException if I/O error was occurred while disposing this session
* @throws InterruptedException if interrupted while disposing this session
*/
public synchronized void registerDelayedClose(DelayedClose c) throws ServerException, IOException, InterruptedException {
if (!started.getAndSet(true)) {
c.delayedClose();
empty.set(true);
return;
synchronized (queue) {
queue.add(futureResponse);
queueHasEntry.set(true);
}
close.set(c);
}

/**
* Wait until the release of the server resource corresponding to the response
* closed without getting is completed.
* NOTE: This method must be called with the guarantee that no subsequent add() will be called.
*/
public synchronized void waitForEmpty() {
if (started.get()) {
while (!empty.get()) {
try {
wait();
} catch (InterruptedException e) {
continue;
}
public synchronized void waitForFinishDisposal() {
while (queueHasEntry.get()) {
try {
wait();
} catch (InterruptedException e) {
continue;
}
}
}

private synchronized void notifyEmpty() {
empty.set(true);
private synchronized void notifyQueueIsEmpty() {
queueHasEntry.set(false);
notify();
}
}
Loading

0 comments on commit 712a02a

Please sign in to comment.