Skip to content

Commit

Permalink
Improve streaming and aggregation of replay events through the dashboard
Browse files Browse the repository at this point in the history
  • Loading branch information
bluemonk3y committed Jun 15, 2017
1 parent e3803b1 commit 1e31499
Showing 1 changed file with 63 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,35 @@ public class ChronicleQReplayAggregator implements ReplayAggregator {

private final static Logger LOGGER = Logger.getLogger(ChronicleQReplayAggregator.class);

private final LogRequest request;
private final LogRequest request;
private String uuid;
private String basePath;
private Chronicle queue = null;
private ConcurrentLinkedQueue<ReplayEvent> writeQueue = new ConcurrentLinkedQueue<>();
private LogReplayHandler replayHandler;

private long startTime = new DateTime().getMillis();
private long startTime = new DateTime().getMillis();

private AtomicBoolean cancelled = new AtomicBoolean(false);
private AtomicBoolean cancelled = new AtomicBoolean(false);
private volatile int sent = 0;

ScheduledExecutorService scheduler = ExecutorService.newScheduledThreadPool("services");
private AtomicInteger size = new AtomicInteger();


private volatile int written;
public String listenerId;
public String listenerId;
private ScheduledFuture<?> future;
private int pausePeriod = Integer.getInteger("q.pause",100);
private volatile AtomicInteger writing = new AtomicInteger();
private int MAX_WRITTEN = Integer.getInteger("replay.max.queue", 1 * 100 * 1000);
private volatile AtomicInteger tried = new AtomicInteger();

volatile static int created = 0;

public ChronicleQReplayAggregator(LogRequest request, LogReplayHandler replayHandler, String handlerId) {
this.request = request;
this.replayHandler = replayHandler;
this.listenerId = request.subscriber() + "_id";
this.request = request;
this.replayHandler = replayHandler;
this.listenerId = request.subscriber() + "_id";

try {
String pid =PIDGetter.getPID();
Expand All @@ -95,7 +95,8 @@ public ChronicleQReplayAggregator(LogRequest request, LogReplayHandler replayHan


public void handle(ReplayEvent event) {
if (cancelled.get() || written++ > MAX_WRITTEN) {
tried.incrementAndGet();
if (cancelled.get()) {
if (!cancelled.get()) cancelled.set(true);
return ;
}
Expand All @@ -108,7 +109,7 @@ public void handle(ReplayEvent event) {
} else {
flushQueue();
}
}
}

private void flushQueue() {
bytes = 0;
Expand Down Expand Up @@ -154,39 +155,48 @@ private void persist(List<ReplayEvent> writeQueue) {
ExcerptTailer reader = null;

public void run(){
try {
while (!cancelled.get() && reader.nextIndex()) {
int sizes = reader.readInt();
byte[] bytess = new byte[sizes];
reader.read(bytess);

List<ReplayEvent> items = (List<ReplayEvent>) Convertor.deserialize(bytess);
reader.finish();
try {
while (!cancelled.get() && reader.nextIndex()) {
int sizes = reader.readInt();
byte[] bytess = new byte[sizes];
reader.read(bytess);

List<ReplayEvent> items = (List<ReplayEvent>) Convertor.deserialize(bytess);
reader.finish();
// System.out.println("RUNNER: " + items.size() + " Size:" + size + " Sent:" + sent + " written:" + written);
size.addAndGet(items.size() * -1);
sent += items.size();
size.addAndGet(items.size() * -1);
sent += items.size();

replayHandler.handle(items);
replayHandler.handle(items);

}
} catch (Throwable t){
}
} catch (Throwable t){
LOGGER.warn("LOGGER" + t.getMessage() + " cancelled:" + cancelled + " sent:" + sent + " incoming" + size);
cancelled.set(true);
if (t.getMessage().contains("RetryInvocationException: SendFailed.Throwable:noSender")) {
replayHandler = null;
}
} finally {
if (cancelled.get() || isExpired() || request.isCancelled()) {
cancel();
}
}
}

public Integer age(long now) {
return Long.valueOf((now - startTime)/1000l).intValue();
}

public void cancel() {
if (t.getMessage().contains("RetryInvocationException: SendFailed.Throwable:noSender")) {
replayHandler = null;
}
} finally {
if (cancelled.get() || isExpired() || request.isCancelled()) {
cancel();
}
}
}

public Integer age(long now) {
return Long.valueOf((now - startTime)/1000l).intValue();
}

public void cancel() {
scheduler.schedule(new Runnable() {
@Override
public void run() {
cancelInternal();
}
}, 3, TimeUnit.SECONDS);
}

public void cancelInternal() {
synchronized (this) {
if (this.queue == null) return;

Expand All @@ -200,7 +210,7 @@ public void cancel() {
}
}
this.cancelled.set(true);
LOGGER.info("LOGGER Cancelled:" + request.subscriber() + " Listener:" + listenerId + " written: " + written + " sent:" + sent + " persistedBytes:" + persistedBytes + " persistedQueues:" + queuedPersisted);
LOGGER.info("LOGGER Cancelled:" + request.subscriber() + " Listener:" + listenerId + " tried:" + tried + " written: " + written + " sent:" + sent + " persistedBytes:" + persistedBytes + " persistedQueues:" + queuedPersisted);
// try and coordinate the shutdown
int waiting = 0;
while (writing.get() > 0 && waiting++ < Integer.getInteger("replay.agg.wait.count", 100)) {
Expand Down Expand Up @@ -230,7 +240,7 @@ public void cancel() {
deleteQueueFiles();
}
}
}
}
boolean closed = false;
public void close() {
if (!closed) {
Expand All @@ -250,17 +260,17 @@ private void deleteQueueFiles() {
}

public boolean isExpired() {
return replayHandler == null || this.request.isExpired();
}
return replayHandler == null || this.request.isExpired();
}

public boolean isRunnable() {
boolean itemsQueued = queue.size() > 0;
return itemsQueued && !request.isCancelled() && !request.isExpired() ;
}
public boolean isRunnable() {
boolean itemsQueued = queue.size() > 0;
return itemsQueued && !request.isCancelled() && !request.isExpired() ;
}

public int size() {
return size.get();
}
public int size() {
return size.get();
}

@Override

Expand All @@ -280,4 +290,8 @@ public void flush() {
public LogReplayHandler getReplayHandler() {
return replayHandler;
}

public String toString() {
return this.getClass().getSimpleName() +" :" + request.subscriber() + " Listener:" + listenerId + " tried:" + tried + " written: " + written + " sent:" + sent + " persistedBytes:" + persistedBytes + " persistedQueues:" + queuedPersisted;
}
}

0 comments on commit 1e31499

Please sign in to comment.