diff --git a/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcAgentController.java b/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcAgentController.java index 429ffb961b..1822d8d1f9 100644 --- a/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcAgentController.java +++ b/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcAgentController.java @@ -65,10 +65,10 @@ public BrpcAgentController(AgentControllerConfig config) { .applicationName(appInstance.getQualifiedName()) .clientId("ctrl") .server(new RoundRobinEndPointProvider(endpoints)) - .workerThreads(2) + .ioThreads(2) .maxRetry(3) - .retryInterval(Duration.ofSeconds(2)) - .connectionTimeout(config.getClient().getConnectionTimeout()) + .retryBackOff(Duration.ofSeconds(2)) + .connectionTimeout(Duration.ofMillis(config.getClient().getConnectionTimeout())) .header(Headers.HEADER_VERSION, AgentBuildVersion.getString()) .header(Headers.HEADER_START_TIME, String.valueOf(ManagementFactory.getRuntimeMXBean().getStartTime())) .build(); diff --git a/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcEventMessageChannel.java b/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcEventMessageChannel.java index 9e4693dc77..fcea3beab8 100644 --- a/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcEventMessageChannel.java +++ b/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcEventMessageChannel.java @@ -60,8 +60,8 @@ public BrpcEventMessageChannel(DispatcherConfig dispatcherConfig) { .clientId("event") .server(new RoundRobinEndPointProvider(endpoints)) .maxRetry(3) - .retryInterval(Duration.ofMillis(200)) - .connectionTimeout(dispatcherConfig.getClient().getConnectionTimeout()) + .retryBackOff(Duration.ofMillis(200)) + .connectionTimeout(Duration.ofMillis(dispatcherConfig.getClient().getConnectionTimeout())) .build(); this.dispatcherConfig = dispatcherConfig; diff --git a/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcMetricMessageChannel.java b/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcMetricMessageChannel.java index 83f7068003..c5711f1159 100644 --- a/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcMetricMessageChannel.java +++ b/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcMetricMessageChannel.java @@ -97,8 +97,8 @@ public BrpcMetricMessageChannel(DispatcherConfig dispatcherConfig) { .clientId("metrics") .server(new RoundRobinEndPointProvider(endpoints)) .maxRetry(3) - .retryInterval(Duration.ofMillis(100)) - .connectionTimeout(dispatcherConfig.getClient().getConnectionTimeout()) + .retryBackOff(Duration.ofMillis(100)) + .connectionTimeout(Duration.ofMillis(dispatcherConfig.getClient().getConnectionTimeout())) .build(); this.dispatcherConfig = dispatcherConfig; diff --git a/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcTraceMessageChannel.java b/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcTraceMessageChannel.java index bc106a91b5..07c57a7810 100644 --- a/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcTraceMessageChannel.java +++ b/agent/agent-dispatcher-brpc/src/main/java/org/bithon/agent/dispatcher/brpc/BrpcTraceMessageChannel.java @@ -61,8 +61,8 @@ public BrpcTraceMessageChannel(DispatcherConfig dispatcherConfig) { .clientId("trace") .server(new RoundRobinEndPointProvider(endpoints)) .maxRetry(3) - .retryInterval(Duration.ofMillis(200)) - .connectionTimeout(dispatcherConfig.getClient().getConnectionTimeout()) + .retryBackOff(Duration.ofMillis(200)) + .connectionTimeout(Duration.ofMillis(dispatcherConfig.getClient().getConnectionTimeout())) .build(); this.dispatcherConfig = dispatcherConfig; diff --git a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/Dispatcher.java b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/Dispatcher.java index 59b37feb86..ac8df26512 100644 --- a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/Dispatcher.java +++ b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/Dispatcher.java @@ -23,7 +23,7 @@ import org.bithon.agent.observability.dispatcher.config.DispatcherConfig; import org.bithon.agent.observability.dispatcher.task.BlockingQueue; import org.bithon.agent.observability.dispatcher.task.DispatchTask; -import org.bithon.agent.observability.dispatcher.task.IMessageQueue; +import org.bithon.agent.observability.dispatcher.task.IThreadSafeQueue; import org.bithon.component.commons.logging.ILogAdaptor; import org.bithon.component.commons.logging.LoggerFactory; @@ -112,7 +112,7 @@ private synchronized void startTask(int port) { messageChannel::sendMessage); } - private IMessageQueue createQueue(DispatcherConfig config) { + private IThreadSafeQueue createQueue(DispatcherConfig config) { return new BlockingQueue(config.getQueueSize()); } diff --git a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/config/DispatcherConfig.java b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/config/DispatcherConfig.java index 39980bf7c5..bbe64d7c11 100644 --- a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/config/DispatcherConfig.java +++ b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/config/DispatcherConfig.java @@ -29,7 +29,7 @@ public class DispatcherConfig { private Map messageDebug = Collections.emptyMap(); public enum QueueFullStrategy { - DISCARD, + DISCARD_NEWEST, DISCARD_OLDEST } diff --git a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/BatchMessageQueue.java b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/BatchMessageQueue.java index fea34e81d8..b28550ce33 100644 --- a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/BatchMessageQueue.java +++ b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/BatchMessageQueue.java @@ -28,9 +28,9 @@ * @author Frank Chen * @date 19/4/23 10:31 pm */ -public class BatchMessageQueue implements IMessageQueue { +public class BatchMessageQueue implements IThreadSafeQueue { - private final IMessageQueue delegate; + private final IThreadSafeQueue delegate; private final int batchSize; private List fetched; @@ -39,7 +39,7 @@ public int getBatchSize() { return batchSize; } - public BatchMessageQueue(IMessageQueue delegate, int batchSize) { + public BatchMessageQueue(IThreadSafeQueue delegate, int batchSize) { this.delegate = delegate; this.batchSize = batchSize; } diff --git a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/BlockingQueue.java b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/BlockingQueue.java index 4b50756594..18b01eb90a 100644 --- a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/BlockingQueue.java +++ b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/BlockingQueue.java @@ -24,7 +24,7 @@ * @author frank.chen021@outlook.com * @date 2021/5/14 10:46 上午 */ -public class BlockingQueue implements IMessageQueue { +public class BlockingQueue implements IThreadSafeQueue { private final LinkedBlockingQueue queue; private final int capacity; diff --git a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/DispatchTask.java b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/DispatchTask.java index 86be165966..354d338c09 100644 --- a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/DispatchTask.java +++ b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/DispatchTask.java @@ -32,10 +32,10 @@ public class DispatchTask { private static final ILogAdaptor LOG = LoggerFactory.getLogger(DispatchTask.class); private final Consumer underlyingSender; - private final IMessageQueue queue; + private final IThreadSafeQueue queue; private final DispatcherConfig.QueueFullStrategy queueFullStrategy; private volatile boolean isRunning = true; - private volatile boolean isTaskEnd = false; + private volatile boolean isTaskEnded = false; /** * in millisecond @@ -43,7 +43,7 @@ public class DispatchTask { private final long flushTime; public DispatchTask(String taskName, - IMessageQueue queue, + IThreadSafeQueue queue, DispatcherConfig config, Consumer underlyingSender) { this.flushTime = Math.max(10, config.getFlushTime()); @@ -54,7 +54,7 @@ public DispatchTask(String taskName, while (isRunning) { dispatch(true); } - isTaskEnd = true; + isTaskEnded = true; }, taskName + "-sender").start(); } @@ -85,6 +85,9 @@ private void dispatch(boolean waitIfEmpty) { } } + /** + * User code might call this public method in multiple threads, thread-safe must be guaranteed. + */ public void accept(Object message) { if (!isRunning) { return; @@ -97,15 +100,19 @@ public void accept(Object message) { // but because the underlying queue is already a concurrency-supported structure, // adding such a lock to solve this edge case does not gain much // - if (DispatcherConfig.QueueFullStrategy.DISCARD.equals(this.queueFullStrategy)) { - // The return is ignored if the 'offer' fails to run + if (DispatcherConfig.QueueFullStrategy.DISCARD_NEWEST.equals(this.queueFullStrategy)) { + // The 'message' will be discarded if the queue is full this.queue.offer(message); } else if (DispatcherConfig.QueueFullStrategy.DISCARD_OLDEST.equals(this.queueFullStrategy)) { // Discard the oldest in the queue + int discarded = 0; while (!queue.offer(message)) { - LOG.error("Failed offer element to the queue, capacity = {}. Discarding the oldest...", this.queue.capacity()); + discarded++; queue.pop(); } + if (discarded > 0) { + LOG.error("Failed offer element to the queue, capacity = {}. Discarded the {} oldest entry", this.queue.capacity(), discarded); + } } else { throw new UnsupportedOperationException("Not supported now"); } @@ -116,7 +123,7 @@ public void stop() { isRunning = false; // Wait for the send task to complete - while (!isTaskEnd) { + while (!isTaskEnded) { try { Thread.sleep(50); } catch (InterruptedException ignored) { diff --git a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/IMessageQueue.java b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/IThreadSafeQueue.java similarity index 97% rename from agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/IMessageQueue.java rename to agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/IThreadSafeQueue.java index a46de8b35d..572d905dee 100644 --- a/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/IMessageQueue.java +++ b/agent/agent-observability/src/main/java/org/bithon/agent/observability/dispatcher/task/IThreadSafeQueue.java @@ -21,7 +21,7 @@ /** * @author frankchen */ -public interface IMessageQueue { +public interface IThreadSafeQueue { boolean offer(Object items); diff --git a/agent/agent-observability/src/test/java/org/bithon/agent/observability/dispatcher/task/BlockingQueueTest.java b/agent/agent-observability/src/test/java/org/bithon/agent/observability/dispatcher/task/BlockingQueueTest.java index 9839cbc01c..8ce4973e3d 100644 --- a/agent/agent-observability/src/test/java/org/bithon/agent/observability/dispatcher/task/BlockingQueueTest.java +++ b/agent/agent-observability/src/test/java/org/bithon/agent/observability/dispatcher/task/BlockingQueueTest.java @@ -33,7 +33,7 @@ public class BlockingQueueTest { static class QueueTestDelegation { - private final IMessageQueue queue; + private final IThreadSafeQueue queue; private long elapsed = 0; private Object takenObject; diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/ServiceRegistry.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/ServiceRegistry.java index 6f5d52d01c..7282affcde 100644 --- a/component/component-brpc/src/main/java/org/bithon/component/brpc/ServiceRegistry.java +++ b/component/component-brpc/src/main/java/org/bithon/component/brpc/ServiceRegistry.java @@ -73,7 +73,7 @@ private void addService(Class interfaceType, Object serviceImpl) { Map registryPerService = registry.computeIfAbsent(registryItem.getServiceName(), v -> new ConcurrentHashMap<>(7)); ServiceInvoker existingRegistry = registryPerService.putIfAbsent(registryItem.getMethodName(), - new ServiceInvoker(serviceImpl, method, registryItem.isOneway())); + new ServiceInvoker(registryItem, serviceImpl, method)); if (existingRegistry != null) { throw new DuplicateServiceException(interfaceType, method, @@ -95,15 +95,16 @@ public boolean contains(String service) { } public static class ServiceInvoker { - private final Method method; + private final ServiceRegistryItem metadata; + private final Object serviceImpl; - private final boolean isOneway; + private final Method method; private final Type[] parameterTypes; - public ServiceInvoker(Object serviceImpl, Method method, boolean isOneway) { - this.method = method; + public ServiceInvoker(ServiceRegistryItem metadata, Object serviceImpl, Method method) { + this.metadata = metadata; this.serviceImpl = serviceImpl; - this.isOneway = isOneway; + this.method = method; this.parameterTypes = method.getGenericParameterTypes(); } @@ -111,8 +112,8 @@ public Object invoke(Object[] args) throws InvocationTargetException, IllegalAcc return method.invoke(serviceImpl, args); } - public boolean isOneway() { - return isOneway; + public ServiceRegistryItem getMetadata() { + return metadata; } public Type[] getParameterTypes() { diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcClient.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcClient.java index 7dfab099e5..dd821a8ce1 100644 --- a/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcClient.java +++ b/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcClient.java @@ -53,6 +53,7 @@ import java.io.Closeable; import java.time.Duration; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -68,7 +69,7 @@ public class BrpcClient implements IBrpcChannel, Closeable { private final IEndPointProvider server; private final ServiceRegistry serviceRegistry = new ServiceRegistry(); private NioEventLoopGroup bossGroup; - private final Duration retryInterval; + private final Duration retryBackoff; private final int maxRetry; /** @@ -88,45 +89,46 @@ public class BrpcClient implements IBrpcChannel, Closeable { /** * Use {@link BrpcClientBuilder} to create instance. - * - * @param nWorkerThreads if it's 0, worker threads will be default to Runtime.getRuntime().availableProcessors() * 2 */ - BrpcClient(IEndPointProvider server, - int nWorkerThreads, - int maxRetry, - Duration retryInterval, - String appName, - String clientId, - Duration connectionTimeout) { + BrpcClient(BrpcClientBuilder builder) { Preconditions.checkIfTrue(StringUtils.hasText("appName"), "appName can't be blank."); - Preconditions.checkIfTrue(maxRetry > 0, "maxRetry must be at least 1."); - this.server = Preconditions.checkArgumentNotNull("server", server); - this.maxRetry = maxRetry; - this.retryInterval = retryInterval; - this.appName = appName; + this.server = Preconditions.checkArgumentNotNull("server", builder.server); + this.maxRetry = Math.max(1, builder.maxRetry); + this.retryBackoff = builder.retryBackoff; + this.appName = builder.appName; this.invocationManager = new InvocationManager(); - this.bossGroup = new NioEventLoopGroup(nWorkerThreads, NamedThreadFactory.of("brpc-c-work-" + clientId)); - this.bootstrap = new Bootstrap(); - this.bootstrap.group(this.bossGroup) - .channel(NioSocketChannel.class) - .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(WriteBufferWaterMark.DEFAULT.low(), 1024 * 1024)) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); - pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); - pipeline.addLast("decoder", new ServiceMessageInDecoder()); - pipeline.addLast("encoder", new ServiceMessageOutEncoder(invocationManager)); - pipeline.addLast(new ClientChannelManager()); - pipeline.addLast(new ServiceMessageChannelHandler(serviceRegistry, invocationManager)); - } - }); + this.bossGroup = new NioEventLoopGroup(builder.ioThreads, NamedThreadFactory.of("brpc-c-io-" + builder.clientId)); + this.bootstrap = new Bootstrap().group(this.bossGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, builder.keepAlive) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); + pipeline.addLast("decoder", new ServiceMessageInDecoder()); + pipeline.addLast("encoder", new ServiceMessageOutEncoder(invocationManager)); + pipeline.addLast(new ClientChannelManager()); + pipeline.addLast(new ServiceMessageChannelHandler(serviceRegistry, Runnable::run, invocationManager)); + } + }); - this.connectionTimeout = connectionTimeout; + if (builder.lowMaterMark > 0 && builder.highMaterMark > 0) { + this.bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(builder.lowMaterMark, builder.highMaterMark)); + } + + this.connectionTimeout = builder.connectionTimeout; + + if (builder.headers != null) { + for (Map.Entry entry : headers.entrySet()) { + String k = entry.getKey(); + String v = entry.getValue(); + this.setHeader(k, v); + } + } } @Override @@ -210,7 +212,7 @@ private void doConnect(int maxRetry) { server.getHost(), server.getPort(), maxRetry - i - 1); - Thread.sleep(retryInterval.toMillis()); + Thread.sleep(retryBackoff.toMillis()); } } catch (InterruptedException ignored) { } diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcClientBuilder.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcClientBuilder.java index e4f9daf79d..6e6baffa07 100644 --- a/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcClientBuilder.java +++ b/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcClientBuilder.java @@ -28,29 +28,32 @@ * @date 2022/12/10 14:10 */ public class BrpcClientBuilder { - private IEndPointProvider server; - private int workerThreads = 1; + boolean keepAlive = true; + int lowMaterMark = 0; + int highMaterMark = 0; + int ioThreads = 1; - private int maxRetry = 30; - private Duration retryInterval = Duration.ofMillis(100); + int maxRetry = 30; + Duration retryBackoff = Duration.ofMillis(100); - private String appName = "brpc-client"; + String appName = "brpc-client"; - private Map headers; + IEndPointProvider server; + Map headers; /** * The name that is used to set to threads of this client. * Although the default name is the same as {@link #appName} above, - * it differs from it because one application might have more than 1 brpc clients to serve different needs, + * it differs from it because one application might have more than one brpc client to serve different needs, * to mark the difference of these clients, this client id helps. */ - private String clientId = "brpc-client"; + String clientId = "brpc-client"; /** - * The default value is 200, which is originally used in previous versions. + * The default value is 200ms, which is originally used in previous versions. * We keep it as compatibility. */ - private int connectionTimeout = 200; + Duration connectionTimeout = Duration.ofMillis(200); public static BrpcClientBuilder builder() { return new BrpcClientBuilder(); @@ -66,8 +69,8 @@ public BrpcClientBuilder server(IEndPointProvider server) { return this; } - public BrpcClientBuilder workerThreads(int nWorkerThreads) { - this.workerThreads = nWorkerThreads; + public BrpcClientBuilder ioThreads(int ioThreads) { + this.ioThreads = ioThreads; return this; } @@ -76,8 +79,8 @@ public BrpcClientBuilder maxRetry(int maxRetry) { return this; } - public BrpcClientBuilder retryInterval(Duration retryInterval) { - this.retryInterval = retryInterval; + public BrpcClientBuilder retryBackOff(Duration retryBackOff) { + this.retryBackoff = retryBackOff; return this; } @@ -99,26 +102,26 @@ public BrpcClientBuilder header(String name, String value) { return this; } - public BrpcClientBuilder connectionTimeout(int connectionTimeout) { + public BrpcClientBuilder connectionTimeout(Duration connectionTimeout) { this.connectionTimeout = connectionTimeout; return this; } + public BrpcClientBuilder keepAlive(boolean keepAlive) { + return this; + } + + public BrpcClientBuilder lowMaterMark(int low) { + this.lowMaterMark = low; + return this; + } + + public BrpcClientBuilder highMaterMark(int high) { + this.highMaterMark = high; + return this; + } + public BrpcClient build() { - BrpcClient brpcClient = new BrpcClient(server, - workerThreads, - maxRetry, - retryInterval, - appName, - clientId, - Duration.ofMillis(connectionTimeout)); - if (headers != null) { - for (Map.Entry entry : headers.entrySet()) { - String k = entry.getKey(); - String v = entry.getValue(); - brpcClient.setHeader(k, v); - } - } - return brpcClient; + return new BrpcClient(this); } } diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcServer.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcServer.java index 0a490c9976..493853833f 100644 --- a/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcServer.java +++ b/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcServer.java @@ -30,6 +30,7 @@ import org.bithon.component.brpc.message.out.ServiceRequestMessageOut; import org.bithon.component.commons.concurrency.NamedThreadFactory; import org.bithon.component.commons.logging.LoggerFactory; +import org.bithon.component.commons.utils.Preconditions; import org.bithon.shaded.io.netty.bootstrap.ServerBootstrap; import org.bithon.shaded.io.netty.buffer.PooledByteBufAllocator; import org.bithon.shaded.io.netty.channel.Channel; @@ -39,6 +40,7 @@ import org.bithon.shaded.io.netty.channel.ChannelInitializer; import org.bithon.shaded.io.netty.channel.ChannelOption; import org.bithon.shaded.io.netty.channel.ChannelPipeline; +import org.bithon.shaded.io.netty.channel.WriteBufferWaterMark; import org.bithon.shaded.io.netty.channel.nio.NioEventLoopGroup; import org.bithon.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; import org.bithon.shaded.io.netty.channel.socket.nio.NioSocketChannel; @@ -56,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.stream.Collectors; /** @@ -63,24 +66,50 @@ */ public class BrpcServer implements Closeable { - private final NioEventLoopGroup bossGroup; - private final NioEventLoopGroup workerGroup; - private final ServiceRegistry serviceRegistry = new ServiceRegistry(); + private final ServerBootstrap serverBootstrap; + private final NioEventLoopGroup acceptorGroup; + private final NioEventLoopGroup ioGroup; + private final ServiceRegistry serviceRegistry = new ServiceRegistry(); private final SessionManager sessionManager; - private final InvocationManager invocationManager; - public BrpcServer(String serverId) { - this(serverId, Runtime.getRuntime().availableProcessors()); - } + BrpcServer(BrpcServerBuilder builder) { + Preconditions.checkNotNull(builder.serverId, "serverId must be set"); - public BrpcServer(String serverId, int nWorkerThreads) { - this.bossGroup = new NioEventLoopGroup(1, NamedThreadFactory.of("brpc-server-" + serverId)); - this.workerGroup = new NioEventLoopGroup(nWorkerThreads, NamedThreadFactory.of("brpc-s-work-" + serverId)); + this.acceptorGroup = new NioEventLoopGroup(1, NamedThreadFactory.of("brpc-s-acceptor-" + builder.serverId)); + this.ioGroup = new NioEventLoopGroup(builder.ioThreads, NamedThreadFactory.of("brpc-s-io-" + builder.serverId)); this.invocationManager = new InvocationManager(); this.sessionManager = new SessionManager(this.invocationManager); + + final Executor executor = builder.executor; + this.serverBootstrap = new ServerBootstrap() + .group(acceptorGroup, ioGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_BACKLOG, builder.backlog) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, false) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); + pipeline.addLast("decoder", new ServiceMessageInDecoder()); + pipeline.addLast("encoder", new ServiceMessageOutEncoder(invocationManager)); + pipeline.addLast(new IdleStateHandler(builder.idleSeconds, 0, 0)); + pipeline.addLast(sessionManager); + pipeline.addLast(new ServiceMessageChannelHandler(serviceRegistry, executor, invocationManager)); + } + }); + + if (builder.lowMaterMark > 0 && builder.highMaterMark > 0) { + this.serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(builder.lowMaterMark, builder.highMaterMark)); + } } /** @@ -95,33 +124,6 @@ public BrpcServer bindService(Object impl) { } public BrpcServer start(int port) { - return start(port, 180); - } - - public BrpcServer start(int port, int idleTimeout) { - - ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap - .group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.SO_BACKLOG, 1024) - .childOption(ChannelOption.SO_KEEPALIVE, false) - .childOption(ChannelOption.TCP_NODELAY, true) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(NioSocketChannel ch) { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); - pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); - pipeline.addLast("decoder", new ServiceMessageInDecoder()); - pipeline.addLast("encoder", new ServiceMessageOutEncoder(invocationManager)); - pipeline.addLast(new IdleStateHandler(idleTimeout, 0, 0)); - pipeline.addLast(sessionManager); - pipeline.addLast(new ServiceMessageChannelHandler(serviceRegistry, invocationManager)); - } - }); try { serverBootstrap.bind(port).sync(); } catch (InterruptedException e) { @@ -134,11 +136,11 @@ protected void initChannel(NioSocketChannel ch) { @Override public void close() { try { - bossGroup.shutdownGracefully().sync(); + acceptorGroup.shutdownGracefully().sync(); } catch (InterruptedException ignored) { } try { - workerGroup.shutdownGracefully().sync(); + ioGroup.shutdownGracefully().sync(); } catch (InterruptedException ignored) { } } diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcServerBuilder.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcServerBuilder.java new file mode 100644 index 0000000000..4330886dd8 --- /dev/null +++ b/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/BrpcServerBuilder.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020 bithon.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.bithon.component.brpc.channel; + +import java.util.concurrent.Executor; + +/** + * @author frank.chen021@outlook.com + * @date 2024/4/30 22:18 + */ +public class BrpcServerBuilder { + String serverId; + int idleSeconds = 180; + int backlog = 1024; + int ioThreads = Runtime.getRuntime().availableProcessors(); + int lowMaterMark = 0; + int highMaterMark = 0; + Executor executor = null; + + public static BrpcServerBuilder builder() { + return new BrpcServerBuilder(); + } + + public BrpcServerBuilder serverId(String serverId) { + this.serverId = serverId; + return this; + } + + public BrpcServerBuilder ioThreads(int ioThreads) { + this.ioThreads = ioThreads; + return this; + } + + public BrpcServerBuilder idleSeconds(int idleSeconds) { + this.idleSeconds = idleSeconds; + return this; + } + + public BrpcServerBuilder backlog(int backlog) { + this.backlog = backlog; + return this; + } + + /** + * The executor that executes the service call + */ + public BrpcServerBuilder executor(Executor executor) { + this.executor = executor; + return this; + } + + public BrpcServer build() { + return new BrpcServer(this); + } +} diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/ServiceMessageChannelHandler.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/ServiceMessageChannelHandler.java index 17e6beac54..b51961ea24 100644 --- a/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/ServiceMessageChannelHandler.java +++ b/component/component-brpc/src/main/java/org/bithon/component/brpc/channel/ServiceMessageChannelHandler.java @@ -17,7 +17,6 @@ package org.bithon.component.brpc.channel; import org.bithon.component.brpc.ServiceRegistry; -import org.bithon.component.brpc.invocation.IServiceInvocationExecutor; import org.bithon.component.brpc.invocation.InvocationManager; import org.bithon.component.brpc.invocation.ServiceInvocationRunnable; import org.bithon.component.brpc.message.ServiceMessage; @@ -26,71 +25,64 @@ import org.bithon.component.brpc.message.in.ServiceResponseMessageIn; import org.bithon.component.commons.logging.ILogAdaptor; import org.bithon.component.commons.logging.LoggerFactory; +import org.bithon.component.commons.utils.Preconditions; import org.bithon.component.commons.utils.StringUtils; import org.bithon.shaded.io.netty.channel.ChannelHandler; import org.bithon.shaded.io.netty.channel.ChannelHandlerContext; -import org.bithon.shaded.io.netty.channel.ChannelInboundHandlerAdapter; +import org.bithon.shaded.io.netty.channel.SimpleChannelInboundHandler; import org.bithon.shaded.io.netty.handler.codec.DecoderException; import java.io.IOException; +import java.util.concurrent.Executor; /** * @author frankchen */ @ChannelHandler.Sharable -class ServiceMessageChannelHandler extends ChannelInboundHandlerAdapter { +class ServiceMessageChannelHandler extends SimpleChannelInboundHandler { private static final ILogAdaptor LOG = LoggerFactory.getLogger(ServiceMessageChannelHandler.class); - private final IServiceInvocationExecutor executor; + private final Executor executor; private final ServiceRegistry serviceRegistry; private final InvocationManager invocationManager; - private boolean channelDebugEnabled; - - /** - * Instantiate an instance which calls the service in worker threads - */ - ServiceMessageChannelHandler(ServiceRegistry serviceRegistry, - InvocationManager invocationManager) { - this(serviceRegistry, ServiceInvocationRunnable::run, invocationManager); - } /** * Instantiate an instance which calls the service in specified executor. */ public ServiceMessageChannelHandler(ServiceRegistry serviceRegistry, - IServiceInvocationExecutor executor, + Executor executor, InvocationManager invocationManager) { - this.serviceRegistry = serviceRegistry; - this.executor = executor; - this.invocationManager = invocationManager; + this.serviceRegistry = Preconditions.checkArgumentNotNull("serviceRegistry", serviceRegistry); + this.invocationManager = Preconditions.checkArgumentNotNull("invocationManager", invocationManager); + this.executor = executor == null ? Runnable::run : executor; } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (!(msg instanceof ServiceMessage)) { - return; - } - - ServiceMessage message = (ServiceMessage) msg; - switch (message.getMessageType()) { + protected void channelRead0(ChannelHandlerContext ctx, ServiceMessage msg) { + switch (msg.getMessageType()) { case ServiceMessageType.CLIENT_REQUEST_ONEWAY: case ServiceMessageType.CLIENT_REQUEST: case ServiceMessageType.CLIENT_REQUEST_V2: - ServiceRequestMessageIn request = (ServiceRequestMessageIn) message; - if (channelDebugEnabled) { - LOG.info("Receiving request, txId={}, service={}#{}", request.getTransactionId(), request.getServiceName(), request.getMethodName()); + ServiceRequestMessageIn request = (ServiceRequestMessageIn) msg; + if (LOG.isDebugEnabled()) { + LOG.debug("Receiving request, txId={}, service={}#{}", request.getTransactionId(), request.getServiceName(), request.getMethodName()); } - executor.execute(new ServiceInvocationRunnable(serviceRegistry, ctx.channel(), (ServiceRequestMessageIn) message)); + ServiceInvocationRunnable.execute(serviceRegistry, + ctx.channel(), (ServiceRequestMessageIn) msg, + this.executor); break; + case ServiceMessageType.SERVER_RESPONSE: - if (channelDebugEnabled) { - LOG.info("Receiving response, txId={}", message.getTransactionId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Receiving response, txId={}", msg.getTransactionId()); } - invocationManager.onResponse((ServiceResponseMessageIn) message); + + invocationManager.handleResponse((ServiceResponseMessageIn) msg); break; + default: - LOG.warn("Receiving unknown message: {}", message.getMessageType()); + LOG.warn("Receiving unknown message: {}", msg.getMessageType()); break; } } @@ -105,6 +97,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } return; } + if (cause instanceof IOException) { // do not log stack trace for known exceptions LOG.error("Exception({}) occurred on channel({} --> {}) when processing message: {}", @@ -131,12 +124,4 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) { } ctx.fireChannelWritabilityChanged(); } - - public boolean isChannelDebugEnabled() { - return channelDebugEnabled; - } - - public void setChannelDebugEnabled(boolean channelDebugEnabled) { - this.channelDebugEnabled = channelDebugEnabled; - } } diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/IServiceInvocationExecutor.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/IServiceInvocationExecutor.java deleted file mode 100644 index 72685efa68..0000000000 --- a/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/IServiceInvocationExecutor.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2020 bithon.org - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.bithon.component.brpc.invocation; - -/** - * @author frankchen - */ -public interface IServiceInvocationExecutor { - void execute(ServiceInvocationRunnable runnable); -} diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/InvocationManager.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/InvocationManager.java index adea11a9ed..75d01e0562 100644 --- a/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/InvocationManager.java +++ b/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/InvocationManager.java @@ -41,7 +41,7 @@ *

* Note: the concept 'client' here is a relative concept. * It could be a network client, which connects to an RPC server, - * it could also be an RPC server which calls service provided by a network client. + * it could also be an RPC server that calls service provided by a network client. * * @author frankchen */ @@ -83,10 +83,14 @@ public Object invoke(String appName, timeoutMillisecond); } - public byte[] invoke(IBrpcChannel channelWriter, + /** + * Invoke a remote service method and returns the raw byte-stream response. + * This is used for proxy. + */ + public byte[] invoke(IBrpcChannel channel, ServiceRequestMessageOut serviceRequest, long timeoutMillisecond) throws Throwable { - return (byte[]) invoke(channelWriter, serviceRequest, null, timeoutMillisecond); + return (byte[]) invoke(channel, serviceRequest, null, timeoutMillisecond); } private Object invoke(IBrpcChannel channel, @@ -125,8 +129,7 @@ private Object invoke(IBrpcChannel channel, if (!serviceRequest.isOneway()) { inflightRequest = new InflightRequest(serviceRequest.getServiceName(), serviceRequest.getMethodName(), - returnObjectType, - System.currentTimeMillis()); + returnObjectType); this.inflightRequests.put(serviceRequest.getTransactionId(), inflightRequest); } @@ -155,26 +158,28 @@ private Object invoke(IBrpcChannel channel, remoteEndpoint); } - //make sure it has been cleared when timeout + // Make sure it has been cleared when timeout inflightRequests.remove(serviceRequest.getTransactionId()); if (inflightRequest.exception != null) { throw inflightRequest.exception; } - if (!inflightRequest.responseReceived) { - throw new TimeoutException(remoteEndpoint.toString(), - serviceRequest.getServiceName(), - serviceRequest.getMethodName(), - timeoutMillisecond); + if (inflightRequest.responseAt > 0) { + // Response has been collected, then return the object. + // NOTE: The return object might be NULL + return inflightRequest.returnObject; } - return inflightRequest.returnObject; + throw new TimeoutException(remoteEndpoint.toString(), + serviceRequest.getServiceName(), + serviceRequest.getMethodName(), + timeoutMillisecond); } return null; } - public void onResponse(ServiceResponseMessageIn response) { + public void handleResponse(ServiceResponseMessageIn response) { long txId = response.getTransactionId(); InflightRequest inflightRequest = inflightRequests.remove(txId); if (inflightRequest == null) { @@ -197,29 +202,30 @@ public void onResponse(ServiceResponseMessageIn response) { } else { try { inflightRequest.returnObject = inflightRequest.returnObjectType == null ? - response.getReturnAsRaw() : - response.getReturningAsObject(inflightRequest.returnObjectType); + response.getReturnAsRaw() : + response.getReturningAsObject(inflightRequest.returnObjectType); } catch (IOException e) { inflightRequest.exception = new ServiceInvocationException(e, "Failed to deserialize the received response: %s", e.getMessage()); } } synchronized (inflightRequest) { - inflightRequest.responseReceived = true; + inflightRequest.responseAt = System.currentTimeMillis(); inflightRequest.notify(); } } - public void onClientException(long txId, Throwable e) { + /** + * Handle exception raised at caller side before the RPC call request is issued to server side + */ + public void handleException(long txId, Throwable e) { InflightRequest inflightRequest = inflightRequests.remove(txId); if (inflightRequest == null) { return; } - inflightRequest.exception = e; - synchronized (inflightRequest) { - inflightRequest.responseReceived = true; + inflightRequest.exception = e; inflightRequest.notify(); } } @@ -239,25 +245,19 @@ static class InflightRequest { private InflightRequest(String serviceName, String methodName, - Type returnObjectType, - long requestAt) { + Type returnObjectType) { this.serviceName = serviceName; this.methodName = methodName; this.returnObjectType = returnObjectType; - this.requestAt = requestAt; + this.requestAt = System.currentTimeMillis(); } /** * The deserialized response object. * If {@link #returnObjectType} is NULL, then this object holds raw byte-stream of the response. */ - Object returnObject; - - /** - * Indicate whether this request has a response. - * This is required so that {@link #returnObject} might be null - */ - boolean responseReceived; - Throwable exception; + volatile long responseAt; + volatile Object returnObject; + volatile Throwable exception; } } diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/ServiceInvocationRunnable.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/ServiceInvocationRunnable.java index 8ed1bc3fbf..11f1015ac0 100644 --- a/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/ServiceInvocationRunnable.java +++ b/component/component-brpc/src/main/java/org/bithon/component/brpc/invocation/ServiceInvocationRunnable.java @@ -27,25 +27,82 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.Executor; /** * @author frankchen */ public class ServiceInvocationRunnable implements Runnable { - private final ServiceRegistry serviceRegistry; private final Channel channel; - private final ServiceRequestMessageIn serviceRequest; - public ServiceInvocationRunnable(ServiceRegistry serviceRegistry, - Channel channel, - ServiceRequestMessageIn serviceRequest) { - this.serviceRegistry = serviceRegistry; - this.serviceRequest = serviceRequest; + private final long txId; + private final ServiceRegistry.ServiceInvoker serviceInvoker; + private final Object[] args; + + public ServiceInvocationRunnable(Channel channel, + long txId, + ServiceRegistry.ServiceInvoker serviceInvoker, + Object[] args) { this.channel = channel; + this.txId = txId; + this.serviceInvoker = serviceInvoker; + this.args = args; } + @Override public void run() { + try { + Object ret; + try { + ret = serviceInvoker.invoke(this.args); + } catch (IllegalArgumentException e) { + throw new BadRequestException("[Client=%s] Bad Request: Service[%s#%s] exception: Illegal argument", + channel.remoteAddress().toString(), + serviceInvoker.getMetadata().getServiceName(), + serviceInvoker.getMetadata().getMethodName()); + } catch (IllegalAccessException e) { + throw new ServiceInvocationException("[Client=%s] Service[%s#%s] exception: %s", + channel.remoteAddress().toString(), + serviceInvoker.getMetadata().getServiceName(), + serviceInvoker.getMetadata().getMethodName(), + e.getMessage()); + } catch (InvocationTargetException e) { + throw new ServiceInvocationException(e.getTargetException(), + "[Client=%s] Service[%s#%s] invocation exception", + channel.remoteAddress().toString(), + serviceInvoker.getMetadata().getServiceName(), + serviceInvoker.getMetadata().getMethodName()); + } + + if (!serviceInvoker.getMetadata().isOneway()) { + ServiceResponseMessageOut.builder() + .serverResponseAt(System.currentTimeMillis()) + .txId(this.txId) + .serializer(serviceInvoker.getMetadata().getSerializer()) + .returning(ret) + .send(channel); + } + } catch (ServiceInvocationException e) { + Throwable cause = e.getCause() != null ? e.getCause() : e; + LoggerFactory.getLogger(ServiceInvocationRunnable.class).error(StringUtils.format("[Client=%s] Service Invocation on %s#%s", + channel.remoteAddress().toString(), + serviceInvoker.getMetadata().getServiceName(), + serviceInvoker.getMetadata().getMethodName()), + cause); + ServiceResponseMessageOut.builder() + .serverResponseAt(System.currentTimeMillis()) + .txId(this.txId) + .serializer(serviceInvoker.getMetadata().getSerializer()) + .exception(cause) + .send(channel); + } + } + + public static void execute(ServiceRegistry serviceRegistry, + Channel channel, + ServiceRequestMessageIn serviceRequest, + Executor executor) { try { if (serviceRequest.getServiceName() == null) { throw new BadRequestException("[Client=%s] serviceName is null", channel.remoteAddress().toString()); @@ -64,26 +121,15 @@ public void run() { serviceRequest.getMethodName()); } - Object ret; try { - ret = serviceInvoker.invoke(serviceRequest.readArgs(serviceInvoker.getParameterTypes())); - } catch (IllegalArgumentException e) { - throw new BadRequestException("[Client=%s] Bad Request: Service[%s#%s] exception: Illegal argument", - channel.remoteAddress().toString(), - serviceRequest.getServiceName(), - serviceRequest.getMethodName()); - } catch (IllegalAccessException e) { - throw new ServiceInvocationException("[Client=%s] Service[%s#%s] exception: %s", - channel.remoteAddress().toString(), - serviceRequest.getServiceName(), - serviceRequest.getMethodName(), - e.getMessage()); - } catch (InvocationTargetException e) { - throw new ServiceInvocationException(e.getTargetException(), - "[Client=%s] Service[%s#%s] invocation exception", - channel.remoteAddress().toString(), - serviceRequest.getServiceName(), - serviceRequest.getMethodName()); + // read args outside the thread pool + // so that the messages in the netty buffer are consumed in the netty's IO thread + Object[] args = serviceRequest.readArgs(serviceInvoker.getParameterTypes()); + + executor.execute(new ServiceInvocationRunnable(channel, + serviceRequest.getTransactionId(), + serviceInvoker, + args)); } catch (IOException e) { throw new BadRequestException("[Client=%s] Bad Request: Service[%s#%s]: %s", channel.remoteAddress().toString(), @@ -91,15 +137,6 @@ public void run() { serviceRequest.getMethodName(), e.getMessage()); } - - if (!serviceInvoker.isOneway()) { - ServiceResponseMessageOut.builder() - .serverResponseAt(System.currentTimeMillis()) - .txId(serviceRequest.getTransactionId()) - .serializer(serviceRequest.getSerializer()) - .returning(ret) - .send(channel); - } } catch (ServiceInvocationException e) { Throwable cause = e.getCause() != null ? e.getCause() : e; LoggerFactory.getLogger(ServiceInvocationRunnable.class).error(StringUtils.format("[Client=%s] Service Invocation on %s#%s", diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/message/in/ServiceMessageInDecoder.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/message/in/ServiceMessageInDecoder.java index 4f24d5f041..50cd403970 100644 --- a/component/component-brpc/src/main/java/org/bithon/component/brpc/message/in/ServiceMessageInDecoder.java +++ b/component/component-brpc/src/main/java/org/bithon/component/brpc/message/in/ServiceMessageInDecoder.java @@ -31,7 +31,8 @@ * Decode input stream to an incoming service message, either it's {@link ServiceRequestMessageIn} * or {@link ServiceResponseMessageIn} *

- * Note that the {@link ByteToMessageDecoder} DOES NOT allow its subclasses to be {@link org.bithon.shaded.io.netty.channel.ChannelHandler.Sharable}. + * + * NOTE that the {@link ByteToMessageDecoder} DOES NOT allow its subclasses to be {@link org.bithon.shaded.io.netty.channel.ChannelHandler.Sharable}. * However, {@link org.bithon.shaded.io.netty.handler.codec.MessageToByteEncoder} CAN BE shared * * @author frankchen @@ -51,16 +52,21 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t is.pushLimit(in.readableBytes()); int messageType = is.readInt32(); - if (messageType == ServiceMessageType.CLIENT_REQUEST - || messageType == ServiceMessageType.CLIENT_REQUEST_ONEWAY - || messageType == ServiceMessageType.CLIENT_REQUEST_V2) { - out.add(new ServiceRequestMessageIn(messageType).decode(is)); - } else if (messageType == ServiceMessageType.SERVER_RESPONSE) { - out.add(new ServiceResponseMessageIn().decode(is)); - } else { - throw new UnknownMessageException(ctx.channel().remoteAddress().toString(), - ctx.channel().localAddress().toString(), - messageType); + switch (messageType) { + case ServiceMessageType.CLIENT_REQUEST: + case ServiceMessageType.CLIENT_REQUEST_ONEWAY: + case ServiceMessageType.CLIENT_REQUEST_V2: + out.add(new ServiceRequestMessageIn(messageType).decode(is)); + break; + + case ServiceMessageType.SERVER_RESPONSE: + out.add(new ServiceResponseMessageIn().decode(is)); + break; + + default: + throw new UnknownMessageException(ctx.channel().remoteAddress().toString(), + ctx.channel().localAddress().toString(), + messageType); } } } diff --git a/component/component-brpc/src/main/java/org/bithon/component/brpc/message/out/ServiceMessageOutEncoder.java b/component/component-brpc/src/main/java/org/bithon/component/brpc/message/out/ServiceMessageOutEncoder.java index 22a55860b9..c1c413a094 100644 --- a/component/component-brpc/src/main/java/org/bithon/component/brpc/message/out/ServiceMessageOutEncoder.java +++ b/component/component-brpc/src/main/java/org/bithon/component/brpc/message/out/ServiceMessageOutEncoder.java @@ -68,8 +68,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ServiceMessageOut out = ((ServiceMessageEncodingException) cause).out; if (out.getMessageType() == ServiceMessageType.CLIENT_REQUEST || out.getMessageType() == ServiceMessageType.CLIENT_REQUEST_V2) { - invocationManager.onClientException(((ServiceMessageEncodingException) cause).out.getTransactionId(), - cause.getCause()); + invocationManager.handleException(((ServiceMessageEncodingException) cause).out.getTransactionId(), + cause.getCause()); return; } } diff --git a/component/component-brpc/src/test/java/org/bithon/component/brpc/BrpcRpcTest.java b/component/component-brpc/src/test/java/org/bithon/component/brpc/BrpcRpcTest.java index 1b3992b71f..148faa1a51 100644 --- a/component/component-brpc/src/test/java/org/bithon/component/brpc/BrpcRpcTest.java +++ b/component/component-brpc/src/test/java/org/bithon/component/brpc/BrpcRpcTest.java @@ -21,6 +21,7 @@ import org.bithon.component.brpc.channel.BrpcClient; import org.bithon.component.brpc.channel.BrpcClientBuilder; import org.bithon.component.brpc.channel.BrpcServer; +import org.bithon.component.brpc.channel.BrpcServerBuilder; import org.bithon.component.brpc.endpoint.EndPoint; import org.bithon.component.brpc.example.ExampleServiceImpl; import org.bithon.component.brpc.example.IExampleService; @@ -47,9 +48,12 @@ public class BrpcRpcTest { @BeforeClass public static void setup() { - brpcServer = new BrpcServer("test") - .bindService(new ExampleServiceImpl()) - .start(8070, idleSeconds); + brpcServer = BrpcServerBuilder.builder() + .serverId("test") + .idleSeconds(idleSeconds) + .build() + .bindService(new ExampleServiceImpl()) + .start(8070); } @AfterClass @@ -80,8 +84,8 @@ public void testBasicCases() { // test map Assert.assertEquals( - ImmutableMap.of("k1", "v1", "k2", "v2"), - exampleService.mergeMap(ImmutableMap.of("k1", "v1"), ImmutableMap.of("k2", "v2")) + ImmutableMap.of("k1", "v1", "k2", "v2"), + exampleService.mergeMap(ImmutableMap.of("k1", "v1"), ImmutableMap.of("k2", "v2")) ); } } @@ -93,14 +97,14 @@ public void testNullArgument() { // test the 2nd argument is null Assert.assertEquals( - ImmutableMap.of("k1", "v1"), - service.mergeMap(ImmutableMap.of("k1", "v1"), null) + ImmutableMap.of("k1", "v1"), + service.mergeMap(ImmutableMap.of("k1", "v1"), null) ); // test the 1st argument is null Assert.assertEquals( - ImmutableMap.of("k2", "v2"), - service.mergeMap(null, ImmutableMap.of("k2", "v2")) + ImmutableMap.of("k2", "v2"), + service.mergeMap(null, ImmutableMap.of("k2", "v2")) ); // test both arguments are null @@ -124,18 +128,18 @@ public void testMultipleSendMessageLite() { IExampleService exampleService = ch.getRemoteService(IExampleService.class); Assert.assertEquals("/1-/2", exampleService.sendWebMetrics1( - WebRequestMetrics.newBuilder().setUri("/1").build(), - WebRequestMetrics.newBuilder().setUri("/2").build() + WebRequestMetrics.newBuilder().setUri("/1").build(), + WebRequestMetrics.newBuilder().setUri("/2").build() )); Assert.assertEquals("/2-/3", exampleService.sendWebMetrics2( - "/2", - WebRequestMetrics.newBuilder().setUri("/3").build() + "/2", + WebRequestMetrics.newBuilder().setUri("/3").build() )); Assert.assertEquals("/4-/5", exampleService.sendWebMetrics3( - WebRequestMetrics.newBuilder().setUri("/4").build(), - "/5" + WebRequestMetrics.newBuilder().setUri("/4").build(), + "/5" )); } } @@ -352,8 +356,8 @@ public String ping() { IExampleService.class); Assert.assertEquals(2, client1Services.size()); Assert.assertEquals(ImmutableSet.of("pong1", "pong2"), ImmutableSet.of( - client1Services.get(0).ping(), - client1Services.get(1).ping() + client1Services.get(0).ping(), + client1Services.get(1).ping() )); // @@ -385,10 +389,10 @@ public void testJsonSerializer() { // test map Assert.assertEquals( - ImmutableMap.of("k1", "v1", "k2", "v2"), - exampleService.mergeWithJson( - ImmutableMap.of("k1", "v1"), - ImmutableMap.of("k2", "v2")) + ImmutableMap.of("k1", "v1", "k2", "v2"), + exampleService.mergeWithJson( + ImmutableMap.of("k1", "v1"), + ImmutableMap.of("k2", "v2")) ); } } @@ -411,7 +415,7 @@ public void testServiceWithZeroArgument() { @Test public void testCallNotRegisteredService() { - try (BrpcServer brpcServer = new BrpcServer("test").start(18070)) { + try (BrpcServer brpcServer = BrpcServerBuilder.builder().serverId("test").build().start(18070)) { try (BrpcClient ch = BrpcClientBuilder.builder().server("127.0.0.1", 18070).build()) { try { // IExampleService is not registered at remote, ServiceNotFoundException should be thrown diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml index 30b13b53bd..5539f316c5 100644 --- a/dev/checkstyle-suppressions.xml +++ b/dev/checkstyle-suppressions.xml @@ -24,7 +24,7 @@ "https://checkstyle.org/dtds/suppressions_1_2.dtd"> - + diff --git a/server/agent-controller/src/main/java/org/bithon/server/agent/controller/service/AgentControllerServer.java b/server/agent-controller/src/main/java/org/bithon/server/agent/controller/service/AgentControllerServer.java index 14c6f2ee86..14ebebce52 100644 --- a/server/agent-controller/src/main/java/org/bithon/server/agent/controller/service/AgentControllerServer.java +++ b/server/agent-controller/src/main/java/org/bithon/server/agent/controller/service/AgentControllerServer.java @@ -18,6 +18,7 @@ import lombok.extern.slf4j.Slf4j; import org.bithon.component.brpc.channel.BrpcServer; +import org.bithon.component.brpc.channel.BrpcServerBuilder; import org.bithon.component.commons.utils.Preconditions; import org.bithon.server.agent.controller.config.AgentControllerConfig; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -46,15 +47,17 @@ public class AgentControllerServer implements SmartLifecycle { private final AgentSettingLoader loader; - public AgentControllerServer(AgentSettingLoader loader, Environment env) { + public AgentControllerServer(AgentSettingLoader loader, + Environment env) { AgentControllerConfig config = Binder.get(env).bind("bithon.agent-controller", AgentControllerConfig.class).get(); Preconditions.checkIfTrue(config.getPort() > 1000 && config.getPort() < 65535, "The port of bithon.agent-controller property must be in the range of [1000, 65535)"); this.port = config.getPort(); - this.brpcServer = new BrpcServer("ctrl"); - this.brpcServer.bindService(new AgentSettingFetcher(loader)); - this.loader = loader; + this.brpcServer = BrpcServerBuilder.builder() + .serverId("ctrl") + .build() + .bindService(new AgentSettingFetcher(loader)); } public BrpcServer getBrpcServer() { diff --git a/server/collector/src/main/java/org/bithon/server/collector/brpc/BrpcCollectorServer.java b/server/collector/src/main/java/org/bithon/server/collector/brpc/BrpcCollectorServer.java index 1b31457b77..e8a0a2a57d 100644 --- a/server/collector/src/main/java/org/bithon/server/collector/brpc/BrpcCollectorServer.java +++ b/server/collector/src/main/java/org/bithon/server/collector/brpc/BrpcCollectorServer.java @@ -19,11 +19,15 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.bithon.component.brpc.channel.BrpcServer; +import org.bithon.component.brpc.channel.BrpcServerBuilder; +import org.bithon.component.commons.concurrency.NamedThreadFactory; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; -import java.util.Optional; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author frank.chen021@outlook.com @@ -44,14 +48,22 @@ public class BrpcCollectorServer { System.setProperty("org.bithon.shaded.io.netty.maxDirectMemory", "0"); } - - public synchronized ServiceGroup addService(String name, Object implementation, int port) { + public synchronized ServiceGroup addService(String group, Object implementation, int port) { ServiceGroup serviceGroup = serviceGroups.computeIfAbsent(port, k -> new ServiceGroup()); - serviceGroup.getServices().put(name, implementation); + serviceGroup.getServices().put(group, implementation); if (serviceGroup.brpcServer == null) { // Create a server with the first service name as the server id - serviceGroup.brpcServer = new BrpcServer(name); + serviceGroup.brpcServer = BrpcServerBuilder.builder() + .serverId(group) + .executor(new ThreadPoolExecutor(1, + Runtime.getRuntime().availableProcessors(), + 3, + TimeUnit.MINUTES, + new LinkedBlockingQueue<>(1024), + NamedThreadFactory.of("brpc-executor-" + group), + new ThreadPoolExecutor.CallerRunsPolicy())) + .build(); serviceGroup.start(port); log.info("Started Brpc services [{}] at port {}", String.join(",", serviceGroup.services.keySet()), @@ -62,14 +74,6 @@ public synchronized ServiceGroup addService(String name, Object implementation, return serviceGroup; } - public BrpcServer findServer(String serviceName) { - Optional serviceGroup = this.serviceGroups.values() - .stream() - .filter((sg -> sg.getServices().containsKey(serviceName))) - .findFirst(); - return serviceGroup.map(ServiceGroup::getBrpcServer).orElse(null); - } - @Getter public static class ServiceGroup { /** diff --git a/server/web-service/src/main/java/org/bithon/server/web/service/agent/sql/table/AgentServiceProxyFactory.java b/server/web-service/src/main/java/org/bithon/server/web/service/agent/sql/table/AgentServiceProxyFactory.java index a95669767e..c89d13f154 100644 --- a/server/web-service/src/main/java/org/bithon/server/web/service/agent/sql/table/AgentServiceProxyFactory.java +++ b/server/web-service/src/main/java/org/bithon/server/web/service/agent/sql/table/AgentServiceProxyFactory.java @@ -257,15 +257,15 @@ public void writeAsync(ServiceRequestMessageOut serviceRequest) throws IOExcepti discoveryServiceInvoker.getExecutor()) .thenAccept((responseBytes) -> { try { - ServiceResponseMessageIn in = ServiceResponseMessageIn.from(new ByteArrayInputStream(responseBytes)); - invocationManager.onResponse(in); + ServiceResponseMessageIn response = ServiceResponseMessageIn.from(new ByteArrayInputStream(responseBytes)); + invocationManager.handleResponse(response); } catch (IOException e) { - invocationManager.onClientException(txId, e); + invocationManager.handleException(txId, e); } }) .whenComplete((v, ex) -> { if (ex != null) { - invocationManager.onClientException(txId, ex.getCause() != null ? ex.getCause() : ex); + invocationManager.handleException(txId, ex.getCause() != null ? ex.getCause() : ex); } }); }