Skip to content

Commit

Permalink
unsuccessful fix for jnr#21
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Mar 9, 2016
1 parent 52cd420 commit 98f1a2e
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 30 deletions.
9 changes: 8 additions & 1 deletion src/main/java/jnr/unixsocket/UnixServerSocketChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ public UnixSocketChannel accept() throws IOException {
SockAddrUnix addr = remote.getStruct();
IntByReference len = new IntByReference(addr.getMaximumLength());

int clientfd = Native.accept(getFD(), addr, len);
int clientfd=-1;
try {
begin();
clientfd = Native.accept(getFD(), addr, len);
}
finally {
end(clientfd>=0);
}

if (clientfd < 0) {
throw new IOException("accept failed: " + Native.getLastErrorString());
Expand Down
118 changes: 118 additions & 0 deletions src/test/java/jnr/unixsocket/AcceptInterruptTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package jnr.unixsocket;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import jnr.unixsocket.UnixServerSocketChannel;
import jnr.unixsocket.UnixSocketAddress;
import junit.framework.Assert;

import org.junit.Test;

public class AcceptInterruptTest
{
@Test
public void testAcceptCloseInterrupt() throws Exception
{
File file = File.createTempFile("test", ".sock");
file.delete();
file.deleteOnExit();

final UnixServerSocketChannel channel = UnixServerSocketChannel.open();
channel.socket().bind(new UnixSocketAddress(file));

final AtomicBoolean run = new AtomicBoolean(true);
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch complete = new CountDownLatch(1);
Thread accept = new Acceptor(complete, start, channel, run);

// Start accepting thread
accept.setDaemon(true);
accept.start();
Assert.assertTrue(start.await(5,TimeUnit.SECONDS));

// Mark as no longer running
run.set(false);

// Close and Interrupt
channel.close();
accept.interrupt();
Assert.assertTrue(complete.await(5,TimeUnit.SECONDS));
}

@Test
public void testAcceptInterrupt() throws Exception
{
File file = File.createTempFile("test", ".sock");
file.delete();
file.deleteOnExit();

final UnixServerSocketChannel channel = UnixServerSocketChannel.open();
channel.socket().bind(new UnixSocketAddress(file));

final AtomicBoolean run = new AtomicBoolean(true);
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch complete = new CountDownLatch(1);
Thread accept = new Acceptor(complete, start, channel, run);

// Start accepting thread
accept.setDaemon(true);
accept.start();
Assert.assertTrue(start.await(5,TimeUnit.SECONDS));

// Mark as no longer running
run.set(false);

accept.interrupt();
Assert.assertTrue(complete.await(5,TimeUnit.SECONDS));

}


private final class Acceptor extends Thread {
private final CountDownLatch complete;
private final CountDownLatch start;
private final UnixServerSocketChannel channel;
private final AtomicBoolean run;

private Acceptor(CountDownLatch complete, CountDownLatch start, UnixServerSocketChannel channel,
AtomicBoolean run) {
this.complete = complete;
this.start = start;
this.channel = channel;
this.run = run;
}

@Override public void run()
{
try
{
while(run.get())
{
if (start.getCount()>0)
start.countDown();
try
{
channel.accept();
System.err.println("accepted");
}
catch (IOException e)
{
e.printStackTrace();
}
finally
{
System.err.println("finally");
}
}
}
finally
{
complete.countDown();
}
}
}
}
94 changes: 65 additions & 29 deletions src/test/java/jnr/unixsocket/example/UnixServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.Iterator;
import java.util.logging.Level;
Expand All @@ -30,84 +31,119 @@
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;

public class UnixServer {
public class UnixServer
{

public static void main(String[] args) throws IOException {
public static void main(String[] args) throws IOException
{
java.io.File path = new java.io.File("/tmp/fubar.sock");
path.deleteOnExit();
UnixSocketAddress address = new UnixSocketAddress(path);
UnixServerSocketChannel channel = UnixServerSocketChannel.open();

try {
try
{
Selector sel = NativeSelectorProvider.getInstance().openSelector();
channel.configureBlocking(false);
channel.socket().bind(address);
channel.register(sel, SelectionKey.OP_ACCEPT, new ServerActor(channel, sel));
channel.register(sel,SelectionKey.OP_ACCEPT,new ServerActor(channel,sel));

while (sel.select() > 0) {
System.err.printf("Selecting ...%n");
while (sel.select() > 0)
{
Set<SelectionKey> keys = sel.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while ( iterator.hasNext() ) {
SelectionKey k = iterator.next();
Actor a = (Actor) k.attachment();
if (!a.rxready()) {
System.err.printf("Selected %d%n",keys.size());

for (Iterator<SelectionKey> i = keys.iterator();i.hasNext();)
{
SelectionKey k = i.next();
i.remove();
Actor a = (Actor)k.attachment();
System.err.printf("Key %s actor=%s%n",k,a);
if (!a.rxready())
{
k.cancel();
}
iterator.remove();
}
System.err.printf("selecting ...%n");
}
} catch (IOException ex) {
Logger.getLogger(UnixServerSocket.class.getName()).log(Level.SEVERE, null, ex);
}
catch (IOException ex)
{
Logger.getLogger(UnixServerSocket.class.getName()).log(Level.SEVERE,null,ex);
}

}

static interface Actor {
static interface Actor
{
public boolean rxready();
}

static final class ServerActor implements Actor {
static final class ServerActor implements Actor
{
private final UnixServerSocketChannel channel;
private final Selector selector;

public ServerActor(UnixServerSocketChannel channel, Selector selector) {
public ServerActor(UnixServerSocketChannel channel, Selector selector)
{
this.channel = channel;
this.selector = selector;
}
public final boolean rxready() {
try {

public final boolean rxready()
{
try
{
System.err.printf("%x rxready()%n",hashCode());
UnixSocketChannel client = channel.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ, new ClientActor(client));
ClientActor actor = new ClientActor(client);
System.err.printf("%x accepted%n",actor.hashCode());
client.register(selector,SelectionKey.OP_READ,actor);
return true;
} catch (IOException ex) {
}
catch (IOException ex)
{
ex.printStackTrace();
return false;
}
}
}
static final class ClientActor implements Actor {

static final class ClientActor implements Actor
{
private final UnixSocketChannel channel;

public ClientActor(UnixSocketChannel channel) {
public ClientActor(UnixSocketChannel channel)
{
this.channel = channel;
}

public final boolean rxready() {
try {
public final boolean rxready()
{
try
{
ByteBuffer buf = ByteBuffer.allocate(1024);
int n = channel.read(buf);
UnixSocketAddress remote = channel.getRemoteSocketAddress();
System.out.printf("Read in %d bytes from %s\n", n, remote);

if (n > 0) {
UnixSocketAddress local = channel.getLocalSocketAddress();
System.err.printf("%x Read in %d bytes from %s%n",hashCode(),n,local);

if (n > 0)
{
buf.flip();
channel.write(buf);
return true;
} else if (n < 0) {
}
else if (n < 0)
{
return false;
}

} catch (IOException ex) {
}
catch (IOException ex)
{
ex.printStackTrace();
return false;
}
Expand Down

0 comments on commit 98f1a2e

Please sign in to comment.