diff --git a/src/main/java/org/fluentd/logger/sender/RawSocketSender.java b/src/main/java/org/fluentd/logger/sender/RawSocketSender.java index 5b440dc..bddd9f8 100644 --- a/src/main/java/org/fluentd/logger/sender/RawSocketSender.java +++ b/src/main/java/org/fluentd/logger/sender/RawSocketSender.java @@ -82,10 +82,10 @@ private void connect() throws IOException { } } - private void reconnect(boolean forceReconnection) throws IOException { + private void reconnect() throws IOException { if (socket == null) { connect(); - } else if (forceReconnection || socket.isClosed() || (!socket.isConnected())) { + } else if (socket.isClosed() || (!socket.isConnected())) { close(); connect(); } @@ -178,7 +178,7 @@ private synchronized boolean send(byte[] bytes) { public synchronized void flush() { try { // check whether connection is established or not - reconnect(!reconnector.isErrorHistoryEmpty()); + reconnect(); // write data out.write(getBuffer()); out.flush(); @@ -187,6 +187,7 @@ public synchronized void flush() { } catch (IOException e) { LOG.error(this.getClass().getName(), "flush", e); reconnector.addErrorHistory(System.currentTimeMillis()); + close(); } } diff --git a/src/test/java/org/fluentd/logger/TestFluentLogger.java b/src/test/java/org/fluentd/logger/TestFluentLogger.java index 3a9d7b2..abfe7d1 100644 --- a/src/test/java/org/fluentd/logger/TestFluentLogger.java +++ b/src/test/java/org/fluentd/logger/TestFluentLogger.java @@ -3,7 +3,6 @@ import org.fluentd.logger.sender.Event; import org.fluentd.logger.sender.NullSender; import org.fluentd.logger.util.MockFluentd; -import org.junit.Ignore; import org.junit.Test; import org.msgpack.MessagePack; import org.msgpack.unpacker.Unpacker; @@ -20,6 +19,8 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TestFluentLogger { private Logger _logger = LoggerFactory.getLogger(TestFluentLogger.class); @@ -227,12 +228,14 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { // start loggers FluentLogger logger = FluentLogger.getLogger("testtag", host, port); + assertFalse(logger.isConnected()); { Map data = new HashMap(); data.put("k1", "v1"); data.put("k2", "v2"); logger.log("test01", data); } + assertTrue(logger.isConnected()); TimeUnit.MILLISECONDS.sleep(500); _logger.info("Closing the current fluentd instance"); @@ -240,13 +243,14 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { fluentd1.close(); TimeUnit.MILLISECONDS.sleep(500); - + assertTrue(logger.isConnected()); { Map data = new HashMap(); data.put("k3", "v3"); data.put("k4", "v4"); logger.log("test01", data); } + assertFalse(logger.isConnected()); final List elist2 = new ArrayList(); MockFluentd fluentd2 = new MockFluentd(port, new MockFluentd.MockProcess() { @@ -274,6 +278,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { data.put("k6", "v6"); logger.log("test01", data); } + assertTrue(logger.isConnected()); // close loggers FluentLogger.closeAll(); @@ -281,12 +286,10 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { fluentd2.close(); - // wait for unpacking event data on fluentd TimeUnit.MILLISECONDS.sleep(2000); threadManager.join(); - // check data assertEquals(1, elist1.size()); assertEquals("testtag.test01", elist1.get(0).tag); @@ -361,6 +364,14 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { final FluentLogger logger = FluentLogger.getLogger(null, host, port); ExecutorService executorService = Executors.newFixedThreadPool(N); + /* + * Each thread emits the following events LOOP times + * Thread#0: {'0' => 0} + * Thread#1: {'0' => 0, '1' => 1} + * Thread#2: {'0' => 0, '1' => 1, '2' => 2} + * : + * Thread#(N-1): {'0' => 0, '1' => 1, '2' => 2 ... '(N-1)' => (N-1)} + */ for (int i = 0; i < N; i++) { final int ii = i; executorService.execute(new Runnable() { diff --git a/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java b/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java index ef234a5..11e9592 100644 --- a/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java +++ b/src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -275,9 +276,11 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { fluentd.start(); Sender sender = new RawSocketSender("localhost", port); + assertFalse(sender.isConnected()); Map data = new HashMap(); data.put("key0", "v0"); sender.emit("tag0", data); + assertTrue(sender.isConnected()); // close fluentd to make the next sending failed TimeUnit.MILLISECONDS.sleep(500); @@ -289,6 +292,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { data = new HashMap(); data.put("key0", "v1"); sender.emit("tag0", data); + assertFalse(sender.isConnected()); // wait to avoid the suppression of reconnection TimeUnit.MILLISECONDS.sleep(500);