Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ian UI d2 1728 identity buckets oom #492

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.uid2.operator.service;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;

public interface IModifiedBucketEncryptStream extends ReadStream<Buffer>, WriteStream<Buffer> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to check UID2 Java Coding Guidelines in conf

IModifiedBucketEncryptStream exceptionHandler(Handler<Throwable> handler);

Future<Void> write(Buffer buffer);

void write(Buffer buffer, Handler<AsyncResult<Void>> handler);

void end(Handler<AsyncResult<Void>> handler);

WriteStream<Buffer> setWriteQueueMaxSize(int i);

boolean writeQueueFull();

WriteStream<Buffer> drainHandler(Handler<Void> handler);

ReadStream<Buffer> handler(Handler<Buffer> handler);

ReadStream<Buffer> pause();

ReadStream<Buffer> resume();

ReadStream<Buffer> fetch(long l);

ReadStream<Buffer> endHandler(Handler<Void> handler);

@Override
default Pipe<Buffer> pipe() {
return ReadStream.super.pipe();
}

@Override
default Future<Void> pipeTo(WriteStream<Buffer> dst) {
return ReadStream.super.pipeTo(dst);
}

@Override
default void pipeTo(WriteStream<Buffer> dst, Handler<AsyncResult<Void>> handler) {
ReadStream.super.pipeTo(dst, handler);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package com.uid2.operator.service;

import com.uid2.shared.Utils;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;


public class ModifiedBucketEncodeStream implements IModifiedBucketEncryptStream {
private static final Logger LOGGER = LoggerFactory.getLogger(ModifiedBucketEncodeStream.class);

private final Context context;

private Handler<Throwable> exceptionHandler;
private Handler<Void> endHandler;
private Handler<Buffer> dataHandler;
private Handler<Void> drainHandler;

private boolean readInProgress;
private boolean incomingStreamEnded = false;
private boolean outgoingStreamEnded = false;
private long maxBufferSizeBytes = 5242880; // 5 MB
private long demand = Long.MAX_VALUE;

Buffer data;

public ModifiedBucketEncodeStream(Context context) {
this.context = context;
this.data = Buffer.buffer();
}

@Override
public synchronized IModifiedBucketEncryptStream exceptionHandler(Handler<Throwable> handler) {
this.exceptionHandler = handler;
return this;
}

@Override
public synchronized Future<Void> write(Buffer buffer) {
synchronized (this) {
data.appendBuffer(buffer);
}
return Future.succeededFuture();
}

@Override
public synchronized void write(Buffer buffer, Handler<AsyncResult<Void>> handler) {
synchronized (this) {
data.appendBuffer(buffer);
}
succeededAsyncResult(handler);
}

private void succeededAsyncResult(Handler<AsyncResult<Void>> handler) {
handler.handle(new AsyncResult<>() {
@Override
public Void result() {
return null;
}

@Override
public Throwable cause() {
return null;
}

@Override
public boolean succeeded() {
return true;
}

@Override
public boolean failed() {
return false;
}
});
}

@Override
public synchronized void end(Handler<AsyncResult<Void>> handler) {
this.incomingStreamEnded = true;
succeededAsyncResult(handler);
}

@Override
public synchronized WriteStream<Buffer> setWriteQueueMaxSize(int i) {
maxBufferSizeBytes = i;
return this;
}

@Override
public synchronized boolean writeQueueFull() {
return data.length() > maxBufferSizeBytes;
}

@Override
public synchronized WriteStream<Buffer> drainHandler(Handler<Void> handler) {
this.drainHandler = handler;
return this;
}

// ReadStream methods

@Override
public synchronized ReadStream<Buffer> handler(Handler<Buffer> handler) {
this.dataHandler = handler;
if (handler != null && demand > 0) {
read();
}
return this;
}

@Override
public synchronized ReadStream<Buffer> pause() {
this.demand = 0;
return this;
}

@Override
public synchronized ReadStream<Buffer> resume() {
fetch(Long.MAX_VALUE);
return this;
}

@Override
public synchronized ReadStream<Buffer> fetch(long amount) {
demand = Long.MAX_VALUE - amount >= demand ? demand + amount : Long.MAX_VALUE;
read();
return this;
}

@Override
public synchronized ReadStream<Buffer> endHandler(Handler<Void> handler) {
this.endHandler = handler;
return this;
}

private void read() {
if (this.readInProgress) {
if (!incomingStreamEnded || !outgoingStreamEnded) {
this.context.runOnContext(v -> this.read());
}
return;
}

if (demand <= 0) {
return;
}
demand--;

this.readInProgress = true;

this.context.executeBlocking(() -> {
String chunk = "";
if (data.length() == 0) {
return chunk;
}

synchronized (this) {
if (data.length() % 3 == 0 || incomingStreamEnded) {
chunk = Utils.toBase64String(data.getBytes());
data = Buffer.buffer();
outgoingStreamEnded = true;
} else if ((data.length() - 1) % 3 == 0) {
chunk = Utils.toBase64String(Arrays.copyOfRange(data.getBytes(), 0, data.length() - 1));
data = Buffer.buffer(Arrays.copyOfRange(data.getBytes(), data.length() - 1, data.length()));
} else {
chunk = Utils.toBase64String(Arrays.copyOfRange(data.getBytes(), 0, data.length() - 2));
data = Buffer.buffer(Arrays.copyOfRange(data.getBytes(), data.length() - 2, data.length()));
}
return chunk;
}
}, asyncResult -> {
this.dataHandler.handle(Buffer.buffer(asyncResult.result()));
this.readInProgress = false;
scheduleNextRead();
});
}

private synchronized void scheduleNextRead() {
if (demand > 0 && (!incomingStreamEnded || !outgoingStreamEnded)) {
context.runOnContext(unused -> read());
} else if (outgoingStreamEnded && endHandler != null) {
endHandler.handle(null);
}
}
}
Loading
Loading