From 98f1a2ed1683fad4407e7ca948440bfca4d0e379 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 10 Mar 2016 05:19:32 +1100 Subject: [PATCH] unsuccessful fix for #21 --- .../unixsocket/UnixServerSocketChannel.java | 9 +- .../jnr/unixsocket/AcceptInterruptTest.java | 118 ++++++++++++++++++ .../jnr/unixsocket/example/UnixServer.java | 94 +++++++++----- 3 files changed, 191 insertions(+), 30 deletions(-) create mode 100644 src/test/java/jnr/unixsocket/AcceptInterruptTest.java diff --git a/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java b/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java index aa85ff7..b15f15c 100644 --- a/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java +++ b/src/main/java/jnr/unixsocket/UnixServerSocketChannel.java @@ -53,7 +53,14 @@ public UnixSocketChannel accept() throws IOException { SockAddrUnix addr = remote.getStruct(); IntByReference len = new IntByReference(addr.getMaximumLength()); - int clientfd = Native.accept(getFD(), addr, len); + int clientfd=-1; + try { + begin(); + clientfd = Native.accept(getFD(), addr, len); + } + finally { + end(clientfd>=0); + } if (clientfd < 0) { throw new IOException("accept failed: " + Native.getLastErrorString()); diff --git a/src/test/java/jnr/unixsocket/AcceptInterruptTest.java b/src/test/java/jnr/unixsocket/AcceptInterruptTest.java new file mode 100644 index 0000000..2bd1824 --- /dev/null +++ b/src/test/java/jnr/unixsocket/AcceptInterruptTest.java @@ -0,0 +1,118 @@ +package jnr.unixsocket; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import jnr.unixsocket.UnixServerSocketChannel; +import jnr.unixsocket.UnixSocketAddress; +import junit.framework.Assert; + +import org.junit.Test; + +public class AcceptInterruptTest +{ + @Test + public void testAcceptCloseInterrupt() throws Exception + { + File file = File.createTempFile("test", ".sock"); + file.delete(); + file.deleteOnExit(); + + final UnixServerSocketChannel channel = UnixServerSocketChannel.open(); + channel.socket().bind(new UnixSocketAddress(file)); + + final AtomicBoolean run = new AtomicBoolean(true); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch complete = new CountDownLatch(1); + Thread accept = new Acceptor(complete, start, channel, run); + + // Start accepting thread + accept.setDaemon(true); + accept.start(); + Assert.assertTrue(start.await(5,TimeUnit.SECONDS)); + + // Mark as no longer running + run.set(false); + + // Close and Interrupt + channel.close(); + accept.interrupt(); + Assert.assertTrue(complete.await(5,TimeUnit.SECONDS)); + } + + @Test + public void testAcceptInterrupt() throws Exception + { + File file = File.createTempFile("test", ".sock"); + file.delete(); + file.deleteOnExit(); + + final UnixServerSocketChannel channel = UnixServerSocketChannel.open(); + channel.socket().bind(new UnixSocketAddress(file)); + + final AtomicBoolean run = new AtomicBoolean(true); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch complete = new CountDownLatch(1); + Thread accept = new Acceptor(complete, start, channel, run); + + // Start accepting thread + accept.setDaemon(true); + accept.start(); + Assert.assertTrue(start.await(5,TimeUnit.SECONDS)); + + // Mark as no longer running + run.set(false); + + accept.interrupt(); + Assert.assertTrue(complete.await(5,TimeUnit.SECONDS)); + + } + + + private final class Acceptor extends Thread { + private final CountDownLatch complete; + private final CountDownLatch start; + private final UnixServerSocketChannel channel; + private final AtomicBoolean run; + + private Acceptor(CountDownLatch complete, CountDownLatch start, UnixServerSocketChannel channel, + AtomicBoolean run) { + this.complete = complete; + this.start = start; + this.channel = channel; + this.run = run; + } + + @Override public void run() + { + try + { + while(run.get()) + { + if (start.getCount()>0) + start.countDown(); + try + { + channel.accept(); + System.err.println("accepted"); + } + catch (IOException e) + { + e.printStackTrace(); + } + finally + { + System.err.println("finally"); + } + } + } + finally + { + complete.countDown(); + } + } + } +} diff --git a/src/test/java/jnr/unixsocket/example/UnixServer.java b/src/test/java/jnr/unixsocket/example/UnixServer.java index 2358d42..536d604 100644 --- a/src/test/java/jnr/unixsocket/example/UnixServer.java +++ b/src/test/java/jnr/unixsocket/example/UnixServer.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.util.Iterator; import java.util.Set; import java.util.Iterator; import java.util.logging.Level; @@ -30,84 +31,119 @@ import jnr.unixsocket.UnixSocketAddress; import jnr.unixsocket.UnixSocketChannel; -public class UnixServer { +public class UnixServer +{ - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws IOException + { java.io.File path = new java.io.File("/tmp/fubar.sock"); path.deleteOnExit(); UnixSocketAddress address = new UnixSocketAddress(path); UnixServerSocketChannel channel = UnixServerSocketChannel.open(); - try { + try + { Selector sel = NativeSelectorProvider.getInstance().openSelector(); channel.configureBlocking(false); channel.socket().bind(address); - channel.register(sel, SelectionKey.OP_ACCEPT, new ServerActor(channel, sel)); + channel.register(sel,SelectionKey.OP_ACCEPT,new ServerActor(channel,sel)); - while (sel.select() > 0) { + System.err.printf("Selecting ...%n"); + while (sel.select() > 0) + { Set keys = sel.selectedKeys(); - Iterator iterator = keys.iterator(); - while ( iterator.hasNext() ) { - SelectionKey k = iterator.next(); - Actor a = (Actor) k.attachment(); - if (!a.rxready()) { + System.err.printf("Selected %d%n",keys.size()); + + for (Iterator i = keys.iterator();i.hasNext();) + { + SelectionKey k = i.next(); + i.remove(); + Actor a = (Actor)k.attachment(); + System.err.printf("Key %s actor=%s%n",k,a); + if (!a.rxready()) + { k.cancel(); } iterator.remove(); } + System.err.printf("selecting ...%n"); } - } catch (IOException ex) { - Logger.getLogger(UnixServerSocket.class.getName()).log(Level.SEVERE, null, ex); + } + catch (IOException ex) + { + Logger.getLogger(UnixServerSocket.class.getName()).log(Level.SEVERE,null,ex); } } - static interface Actor { + static interface Actor + { public boolean rxready(); } - static final class ServerActor implements Actor { + static final class ServerActor implements Actor + { private final UnixServerSocketChannel channel; private final Selector selector; - public ServerActor(UnixServerSocketChannel channel, Selector selector) { + public ServerActor(UnixServerSocketChannel channel, Selector selector) + { this.channel = channel; this.selector = selector; } - public final boolean rxready() { - try { + + public final boolean rxready() + { + try + { + System.err.printf("%x rxready()%n",hashCode()); UnixSocketChannel client = channel.accept(); client.configureBlocking(false); - client.register(selector, SelectionKey.OP_READ, new ClientActor(client)); + ClientActor actor = new ClientActor(client); + System.err.printf("%x accepted%n",actor.hashCode()); + client.register(selector,SelectionKey.OP_READ,actor); return true; - } catch (IOException ex) { + } + catch (IOException ex) + { + ex.printStackTrace(); return false; } } } - static final class ClientActor implements Actor { + + static final class ClientActor implements Actor + { private final UnixSocketChannel channel; - public ClientActor(UnixSocketChannel channel) { + public ClientActor(UnixSocketChannel channel) + { this.channel = channel; } - public final boolean rxready() { - try { + public final boolean rxready() + { + try + { ByteBuffer buf = ByteBuffer.allocate(1024); int n = channel.read(buf); - UnixSocketAddress remote = channel.getRemoteSocketAddress(); - System.out.printf("Read in %d bytes from %s\n", n, remote); - - if (n > 0) { + UnixSocketAddress local = channel.getLocalSocketAddress(); + System.err.printf("%x Read in %d bytes from %s%n",hashCode(),n,local); + + if (n > 0) + { buf.flip(); channel.write(buf); return true; - } else if (n < 0) { + } + else if (n < 0) + { return false; } - } catch (IOException ex) { + } + catch (IOException ex) + { ex.printStackTrace(); return false; }