Skip to content

Commit

Permalink
Merge pull request #1 from fluent/master
Browse files Browse the repository at this point in the history
pull master
  • Loading branch information
mxk1235 committed Sep 3, 2014
2 parents 5e8f2c2 + cc34053 commit cb20c25
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
7 changes: 4 additions & 3 deletions src/main/java/org/fluentd/logger/sender/RawSocketSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ private void connect() throws IOException {
}
}

private void reconnect(boolean forceReconnection) throws IOException {
private void reconnect() throws IOException {
if (socket == null) {
connect();
} else if (forceReconnection || socket.isClosed() || (!socket.isConnected())) {
} else if (socket.isClosed() || (!socket.isConnected())) {
close();
connect();
}
Expand Down Expand Up @@ -178,7 +178,7 @@ private synchronized boolean send(byte[] bytes) {
public synchronized void flush() {
try {
// check whether connection is established or not
reconnect(!reconnector.isErrorHistoryEmpty());
reconnect();
// write data
out.write(getBuffer());
out.flush();
Expand All @@ -187,6 +187,7 @@ public synchronized void flush() {
} catch (IOException e) {
LOG.error(this.getClass().getName(), "flush", e);
reconnector.addErrorHistory(System.currentTimeMillis());
close();
}
}

Expand Down
19 changes: 15 additions & 4 deletions src/test/java/org/fluentd/logger/TestFluentLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.fluentd.logger.sender.Event;
import org.fluentd.logger.sender.NullSender;
import org.fluentd.logger.util.MockFluentd;
import org.junit.Ignore;
import org.junit.Test;
import org.msgpack.MessagePack;
import org.msgpack.unpacker.Unpacker;
Expand All @@ -20,6 +19,8 @@
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class TestFluentLogger {
private Logger _logger = LoggerFactory.getLogger(TestFluentLogger.class);
Expand Down Expand Up @@ -227,26 +228,29 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {

// start loggers
FluentLogger logger = FluentLogger.getLogger("testtag", host, port);
assertFalse(logger.isConnected());
{
Map<String, Object> data = new HashMap<String, Object>();
data.put("k1", "v1");
data.put("k2", "v2");
logger.log("test01", data);
}
assertTrue(logger.isConnected());

TimeUnit.MILLISECONDS.sleep(500);
_logger.info("Closing the current fluentd instance");
fluentd1.closeClientSockets();
fluentd1.close();

TimeUnit.MILLISECONDS.sleep(500);

assertTrue(logger.isConnected());
{
Map<String, Object> data = new HashMap<String, Object>();
data.put("k3", "v3");
data.put("k4", "v4");
logger.log("test01", data);
}
assertFalse(logger.isConnected());

final List<Event> elist2 = new ArrayList<Event>();
MockFluentd fluentd2 = new MockFluentd(port, new MockFluentd.MockProcess() {
Expand Down Expand Up @@ -274,19 +278,18 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
data.put("k6", "v6");
logger.log("test01", data);
}
assertTrue(logger.isConnected());

// close loggers
FluentLogger.closeAll();
Thread.sleep(2000);

fluentd2.close();


// wait for unpacking event data on fluentd
TimeUnit.MILLISECONDS.sleep(2000);
threadManager.join();


// check data
assertEquals(1, elist1.size());
assertEquals("testtag.test01", elist1.get(0).tag);
Expand Down Expand Up @@ -361,6 +364,14 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {

final FluentLogger logger = FluentLogger.getLogger(null, host, port);
ExecutorService executorService = Executors.newFixedThreadPool(N);
/*
* Each thread emits the following events LOOP times
* Thread#0: {'0' => 0}
* Thread#1: {'0' => 0, '1' => 1}
* Thread#2: {'0' => 0, '1' => 1, '2' => 2}
* :
* Thread#(N-1): {'0' => 0, '1' => 1, '2' => 2 ... '(N-1)' => (N-1)}
*/
for (int i = 0; i < N; i++) {
final int ii = i;
executorService.execute(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;


Expand Down Expand Up @@ -275,9 +276,11 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
fluentd.start();

Sender sender = new RawSocketSender("localhost", port);
assertFalse(sender.isConnected());
Map<String, Object> data = new HashMap<String, Object>();
data.put("key0", "v0");
sender.emit("tag0", data);
assertTrue(sender.isConnected());

// close fluentd to make the next sending failed
TimeUnit.MILLISECONDS.sleep(500);
Expand All @@ -289,6 +292,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
data = new HashMap<String, Object>();
data.put("key0", "v1");
sender.emit("tag0", data);
assertFalse(sender.isConnected());

// wait to avoid the suppression of reconnection
TimeUnit.MILLISECONDS.sleep(500);
Expand Down

0 comments on commit cb20c25

Please sign in to comment.