Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc improvement #800

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ public BrpcAgentController(AgentControllerConfig config) {
.applicationName(appInstance.getQualifiedName())
.clientId("ctrl")
.server(new RoundRobinEndPointProvider(endpoints))
.workerThreads(2)
.ioThreads(2)
.maxRetry(3)
.retryInterval(Duration.ofSeconds(2))
.connectionTimeout(config.getClient().getConnectionTimeout())
.retryBackOff(Duration.ofSeconds(2))
.connectionTimeout(Duration.ofMillis(config.getClient().getConnectionTimeout()))
.header(Headers.HEADER_VERSION, AgentBuildVersion.getString())
.header(Headers.HEADER_START_TIME, String.valueOf(ManagementFactory.getRuntimeMXBean().getStartTime()))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public BrpcEventMessageChannel(DispatcherConfig dispatcherConfig) {
.clientId("event")
.server(new RoundRobinEndPointProvider(endpoints))
.maxRetry(3)
.retryInterval(Duration.ofMillis(200))
.connectionTimeout(dispatcherConfig.getClient().getConnectionTimeout())
.retryBackOff(Duration.ofMillis(200))
.connectionTimeout(Duration.ofMillis(dispatcherConfig.getClient().getConnectionTimeout()))
.build();

this.dispatcherConfig = dispatcherConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public BrpcMetricMessageChannel(DispatcherConfig dispatcherConfig) {
.clientId("metrics")
.server(new RoundRobinEndPointProvider(endpoints))
.maxRetry(3)
.retryInterval(Duration.ofMillis(100))
.connectionTimeout(dispatcherConfig.getClient().getConnectionTimeout())
.retryBackOff(Duration.ofMillis(100))
.connectionTimeout(Duration.ofMillis(dispatcherConfig.getClient().getConnectionTimeout()))
.build();
this.dispatcherConfig = dispatcherConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public BrpcTraceMessageChannel(DispatcherConfig dispatcherConfig) {
.clientId("trace")
.server(new RoundRobinEndPointProvider(endpoints))
.maxRetry(3)
.retryInterval(Duration.ofMillis(200))
.connectionTimeout(dispatcherConfig.getClient().getConnectionTimeout())
.retryBackOff(Duration.ofMillis(200))
.connectionTimeout(Duration.ofMillis(dispatcherConfig.getClient().getConnectionTimeout()))
.build();

this.dispatcherConfig = dispatcherConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.bithon.agent.observability.dispatcher.config.DispatcherConfig;
import org.bithon.agent.observability.dispatcher.task.BlockingQueue;
import org.bithon.agent.observability.dispatcher.task.DispatchTask;
import org.bithon.agent.observability.dispatcher.task.IMessageQueue;
import org.bithon.agent.observability.dispatcher.task.IThreadSafeQueue;
import org.bithon.component.commons.logging.ILogAdaptor;
import org.bithon.component.commons.logging.LoggerFactory;

Expand Down Expand Up @@ -112,7 +112,7 @@ private synchronized void startTask(int port) {
messageChannel::sendMessage);
}

private IMessageQueue createQueue(DispatcherConfig config) {
private IThreadSafeQueue createQueue(DispatcherConfig config) {
return new BlockingQueue(config.getQueueSize());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class DispatcherConfig {
private Map<String, Boolean> messageDebug = Collections.emptyMap();

public enum QueueFullStrategy {
DISCARD,
DISCARD_NEWEST,
DISCARD_OLDEST
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
* @author Frank Chen
* @date 19/4/23 10:31 pm
*/
public class BatchMessageQueue implements IMessageQueue {
public class BatchMessageQueue implements IThreadSafeQueue {

private final IMessageQueue delegate;
private final IThreadSafeQueue delegate;

private final int batchSize;
private List<Object> fetched;
Expand All @@ -39,7 +39,7 @@ public int getBatchSize() {
return batchSize;
}

public BatchMessageQueue(IMessageQueue delegate, int batchSize) {
public BatchMessageQueue(IThreadSafeQueue delegate, int batchSize) {
this.delegate = delegate;
this.batchSize = batchSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* @author [email protected]
* @date 2021/5/14 10:46 上午
*/
public class BlockingQueue implements IMessageQueue {
public class BlockingQueue implements IThreadSafeQueue {
private final LinkedBlockingQueue<Object> queue;
private final int capacity;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ public class DispatchTask {
private static final ILogAdaptor LOG = LoggerFactory.getLogger(DispatchTask.class);

private final Consumer<Object> underlyingSender;
private final IMessageQueue queue;
private final IThreadSafeQueue queue;
private final DispatcherConfig.QueueFullStrategy queueFullStrategy;
private volatile boolean isRunning = true;
private volatile boolean isTaskEnd = false;
private volatile boolean isTaskEnded = false;

/**
* in millisecond
*/
private final long flushTime;

public DispatchTask(String taskName,
IMessageQueue queue,
IThreadSafeQueue queue,
DispatcherConfig config,
Consumer<Object> underlyingSender) {
this.flushTime = Math.max(10, config.getFlushTime());
Expand All @@ -54,7 +54,7 @@ public DispatchTask(String taskName,
while (isRunning) {
dispatch(true);
}
isTaskEnd = true;
isTaskEnded = true;
}, taskName + "-sender").start();
}

Expand Down Expand Up @@ -85,6 +85,9 @@ private void dispatch(boolean waitIfEmpty) {
}
}

/**
* User code might call this public method in multiple threads, thread-safe must be guaranteed.
*/
public void accept(Object message) {
if (!isRunning) {
return;
Expand All @@ -97,15 +100,19 @@ public void accept(Object message) {
// but because the underlying queue is already a concurrency-supported structure,
// adding such a lock to solve this edge case does not gain much
//
if (DispatcherConfig.QueueFullStrategy.DISCARD.equals(this.queueFullStrategy)) {
// The return is ignored if the 'offer' fails to run
if (DispatcherConfig.QueueFullStrategy.DISCARD_NEWEST.equals(this.queueFullStrategy)) {
// The 'message' will be discarded if the queue is full
this.queue.offer(message);
} else if (DispatcherConfig.QueueFullStrategy.DISCARD_OLDEST.equals(this.queueFullStrategy)) {
// Discard the oldest in the queue
int discarded = 0;
while (!queue.offer(message)) {
LOG.error("Failed offer element to the queue, capacity = {}. Discarding the oldest...", this.queue.capacity());
discarded++;
queue.pop();
}
if (discarded > 0) {
LOG.error("Failed offer element to the queue, capacity = {}. Discarded the {} oldest entry", this.queue.capacity(), discarded);
}
} else {
throw new UnsupportedOperationException("Not supported now");
}
Expand All @@ -116,7 +123,7 @@ public void stop() {
isRunning = false;

// Wait for the send task to complete
while (!isTaskEnd) {
while (!isTaskEnded) {
try {
Thread.sleep(50);
} catch (InterruptedException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* @author frankchen
*/
public interface IMessageQueue {
public interface IThreadSafeQueue {

boolean offer(Object items);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class BlockingQueueTest {

static class QueueTestDelegation {
private final IMessageQueue queue;
private final IThreadSafeQueue queue;
private long elapsed = 0;
private Object takenObject;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private void addService(Class<?> interfaceType, Object serviceImpl) {
Map<String, ServiceInvoker> registryPerService = registry.computeIfAbsent(registryItem.getServiceName(),
v -> new ConcurrentHashMap<>(7));
ServiceInvoker existingRegistry = registryPerService.putIfAbsent(registryItem.getMethodName(),
new ServiceInvoker(serviceImpl, method, registryItem.isOneway()));
new ServiceInvoker(registryItem, serviceImpl, method));
if (existingRegistry != null) {
throw new DuplicateServiceException(interfaceType,
method,
Expand All @@ -95,24 +95,25 @@ public boolean contains(String service) {
}

public static class ServiceInvoker {
private final Method method;
private final ServiceRegistryItem metadata;

private final Object serviceImpl;
private final boolean isOneway;
private final Method method;
private final Type[] parameterTypes;

public ServiceInvoker(Object serviceImpl, Method method, boolean isOneway) {
this.method = method;
public ServiceInvoker(ServiceRegistryItem metadata, Object serviceImpl, Method method) {
this.metadata = metadata;
this.serviceImpl = serviceImpl;
this.isOneway = isOneway;
this.method = method;
this.parameterTypes = method.getGenericParameterTypes();
}

public Object invoke(Object[] args) throws InvocationTargetException, IllegalAccessException {
return method.invoke(serviceImpl, args);
}

public boolean isOneway() {
return isOneway;
public ServiceRegistryItem getMetadata() {
return metadata;
}

public Type[] getParameterTypes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

import java.io.Closeable;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -68,7 +69,7 @@ public class BrpcClient implements IBrpcChannel, Closeable {
private final IEndPointProvider server;
private final ServiceRegistry serviceRegistry = new ServiceRegistry();
private NioEventLoopGroup bossGroup;
private final Duration retryInterval;
private final Duration retryBackoff;
private final int maxRetry;

/**
Expand All @@ -88,45 +89,46 @@ public class BrpcClient implements IBrpcChannel, Closeable {

/**
* Use {@link BrpcClientBuilder} to create instance.
*
* @param nWorkerThreads if it's 0, worker threads will be default to Runtime.getRuntime().availableProcessors() * 2
*/
BrpcClient(IEndPointProvider server,
int nWorkerThreads,
int maxRetry,
Duration retryInterval,
String appName,
String clientId,
Duration connectionTimeout) {
BrpcClient(BrpcClientBuilder builder) {
Preconditions.checkIfTrue(StringUtils.hasText("appName"), "appName can't be blank.");
Preconditions.checkIfTrue(maxRetry > 0, "maxRetry must be at least 1.");

this.server = Preconditions.checkArgumentNotNull("server", server);
this.maxRetry = maxRetry;
this.retryInterval = retryInterval;
this.appName = appName;
this.server = Preconditions.checkArgumentNotNull("server", builder.server);
this.maxRetry = Math.max(1, builder.maxRetry);
this.retryBackoff = builder.retryBackoff;
this.appName = builder.appName;

this.invocationManager = new InvocationManager();
this.bossGroup = new NioEventLoopGroup(nWorkerThreads, NamedThreadFactory.of("brpc-c-work-" + clientId));
this.bootstrap = new Bootstrap();
this.bootstrap.group(this.bossGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WriteBufferWaterMark.DEFAULT.low(), 1024 * 1024))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("decoder", new ServiceMessageInDecoder());
pipeline.addLast("encoder", new ServiceMessageOutEncoder(invocationManager));
pipeline.addLast(new ClientChannelManager());
pipeline.addLast(new ServiceMessageChannelHandler(serviceRegistry, invocationManager));
}
});
this.bossGroup = new NioEventLoopGroup(builder.ioThreads, NamedThreadFactory.of("brpc-c-io-" + builder.clientId));
this.bootstrap = new Bootstrap().group(this.bossGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, builder.keepAlive)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("decoder", new ServiceMessageInDecoder());
pipeline.addLast("encoder", new ServiceMessageOutEncoder(invocationManager));
pipeline.addLast(new ClientChannelManager());
pipeline.addLast(new ServiceMessageChannelHandler(serviceRegistry, Runnable::run, invocationManager));
}
});

this.connectionTimeout = connectionTimeout;
if (builder.lowMaterMark > 0 && builder.highMaterMark > 0) {
this.bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(builder.lowMaterMark, builder.highMaterMark));
}

this.connectionTimeout = builder.connectionTimeout;

if (builder.headers != null) {
for (Map.Entry<String, String> entry : headers.entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
this.setHeader(k, v);
}
}
}

@Override
Expand Down Expand Up @@ -210,7 +212,7 @@ private void doConnect(int maxRetry) {
server.getHost(),
server.getPort(),
maxRetry - i - 1);
Thread.sleep(retryInterval.toMillis());
Thread.sleep(retryBackoff.toMillis());
}
} catch (InterruptedException ignored) {
}
Expand Down
Loading
Loading