Skip to content

Commit

Permalink
apply commnets
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Dec 9, 2024
1 parent 4abe9a9 commit b2869b2
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -39,13 +38,13 @@ public class Disposer extends Thread {

private AtomicBoolean started = new AtomicBoolean();

private Queue<ForegroundFutureResponse<?>> futureResponseQueue = new ArrayDeque<>();
private ConcurrentLinkedQueue<ForegroundFutureResponse<?>> futureResponseQueue = new ConcurrentLinkedQueue<>();

private Queue<DelayedClose> serverResourceQueue = new ArrayDeque<>();
private ConcurrentLinkedQueue<DelayedClose> serverResourceQueue = new ConcurrentLinkedQueue<>();

private AtomicBoolean empty = new AtomicBoolean();

private final AtomicReference<DelayedShutdown> shutdown = new AtomicReference<>();
private ConcurrentLinkedQueue<DelayedShutdown> shutdownQueue = new ConcurrentLinkedQueue<>();

private final AtomicReference<DelayedClose> close = new AtomicReference<>();

Expand Down Expand Up @@ -85,10 +84,7 @@ public void run() {
boolean shutdownProcessed = false;

while (true) {
ForegroundFutureResponse<?> futureResponse;
synchronized (futureResponseQueue) {
futureResponse = futureResponseQueue.poll();
}
var futureResponse = futureResponseQueue.poll();
if (futureResponse != null) {
try {
var obj = futureResponse.retrieve();
Expand Down Expand Up @@ -119,10 +115,7 @@ public void run() {
continue;
}
}
DelayedClose serverResource;
synchronized (serverResourceQueue) {
serverResource = serverResourceQueue.poll();
}
var serverResource = serverResourceQueue.poll();
if (serverResource != null) {
try {
serverResource.delayedClose();
Expand All @@ -134,18 +127,17 @@ public void run() {
notifyEmpty();
if (!shutdownProcessed) {
try {
var sh = shutdown.get();
if (sh != null) {
sh.shutdown();
shutdownProcessed = true;
while (!shutdownQueue.isEmpty()) {
shutdownQueue.poll().shutdown();
}
shutdownProcessed = true;
} catch (IOException e) {
exception = addSuppressed(exception, e);
}
}
var cl = close.get();
if (cl != null) {
if (shutdownProcessed || shutdown.get() == null) {
if (shutdownProcessed || shutdownQueue.isEmpty()) {
try {
cl.delayedClose();
} catch (ServerException | IOException | InterruptedException e) {
Expand All @@ -162,8 +154,13 @@ public void run() {
}

if (exception != null) {
LOG.info(exception.getMessage());
throw new UncheckedIOException(new IOException(exception));
LOG.error(exception.getMessage());
exception.printStackTrace();
if (exception instanceof IOException) {
throw new UncheckedIOException((IOException) exception);
} else {
throw new UncheckedIOException(new IOException(exception));
}
}
}

Expand All @@ -177,12 +174,10 @@ private Exception addSuppressed(Exception exception, Exception e) {
}

synchronized void add(ForegroundFutureResponse<?> futureResponse) {
if (close.get() != null) {
if (close.get() != null || !shutdownQueue.isEmpty()) {
throw new AssertionError("Session already closed");
}
synchronized (futureResponseQueue) {
futureResponseQueue.add(futureResponse);
}
futureResponseQueue.add(futureResponse);
if (!started.getAndSet(true)) {
this.start();
}
Expand All @@ -194,12 +189,10 @@ synchronized void add(ForegroundFutureResponse<?> futureResponse) {
* @param resource the DelayedClose to be added
*/
public synchronized void add(DelayedClose resource) {
if (close.get() != null) {
if (close.get() != null || !shutdownQueue.isEmpty()) {
throw new AssertionError("Session already closed");
}
synchronized (serverResourceQueue) {
serverResourceQueue.add(resource);
}
serverResourceQueue.add(resource);
if (!started.getAndSet(true)) {
this.start();
}
Expand All @@ -212,13 +205,15 @@ public synchronized void add(DelayedClose resource) {
* @param c the clean up procesure to be registered
* @throws IOException An error was occurred in c.shoutdown() execution.
*/
public synchronized void registerDelayedShutdown(DelayedShutdown c) throws IOException {
if (!started.getAndSet(true)) {
empty.set(true);
c.shutdown();
return;
public void registerDelayedShutdown(DelayedShutdown c) throws IOException {
synchronized (this) {
if (started.getAndSet(true)) {
shutdownQueue.add(c);
return;
}
}
shutdown.set(c);
empty.set(true);
c.shutdown();
}

/**
Expand All @@ -231,22 +226,22 @@ public synchronized void registerDelayedShutdown(DelayedShutdown c) throws IOExc
* @throws InterruptedException if interrupted while disposing the session
*/
public synchronized void registerDelayedClose(DelayedClose c) throws ServerException, IOException, InterruptedException {
if (!started.getAndSet(true)) {
empty.set(true);
c.delayedClose();
return;
}
if (futureResponseQueue.isEmpty() && serverResourceQueue.isEmpty()) {
c.delayedClose();
close.set(new DelayedClose() {
@Override
public void delayedClose() {
// do nothing
synchronized (this) {
if (started.getAndSet(true)) {
if (!futureResponseQueue.isEmpty() || !serverResourceQueue.isEmpty()) {
close.set(c);
return;
}
});
return;
close.set(new DelayedClose() {
@Override
public void delayedClose() {
// do nothing
}
});
}
}
close.set(c);
empty.set(true);
c.delayedClose();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class PreparedStatementImpl implements PreparedStatement {

private final SqlService service;
private final ServerResource.CloseHandler closeHandler;
private final AtomicBoolean added = new AtomicBoolean();
private final AtomicBoolean addedToDisposer = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private long timeout = 0;
private TimeUnit unit;
Expand Down Expand Up @@ -101,7 +101,7 @@ public void setCloseTimeout(long t, TimeUnit u) {
@Override
public void close() throws IOException, ServerException, InterruptedException {
if (disposer != null) {
if (!added.getAndSet(true)) {
if (!addedToDisposer.getAndSet(true)) {
disposer.add(new Disposer.DelayedClose() {
@Override
public void delayedClose() throws ServerException, IOException, InterruptedException {
Expand Down
Loading

0 comments on commit b2869b2

Please sign in to comment.