Skip to content

Commit

Permalink
feat: uds 支持协程
Browse files Browse the repository at this point in the history
  • Loading branch information
goodjava committed Nov 11, 2024
1 parent ece4c13 commit 8b85d2c
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
*/
public class NetUtils {

public static EventLoopGroup getEventLoopGroup() {
// if (CommonUtils.isMac() && CommonUtils.isArch64()) {
// return new NioEventLoopGroup();
// }
public static EventLoopGroup getEventLoopGroup(boolean remote) {
if (remote) {
return new NioEventLoopGroup();
}
if (CommonUtils.isWindows()) {
return new NioEventLoopGroup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public UdsClient(String id) {


private EventLoopGroup getEventLoopGroup() {
return NetUtils.getEventLoopGroup();
return NetUtils.getEventLoopGroup(this.remote);
}


Expand Down Expand Up @@ -135,10 +135,6 @@ public void call(Object msg) {
Send.send(this.channel, command);
}


/**
* 发送OpenAI流式请求
*/
public void stream(UdsCommand command, ClientStreamCallback callback) {
Map<String, String> attachments = command.getAttachments();
// 注册回调
Expand Down Expand Up @@ -169,13 +165,23 @@ public UdsCommand call(UdsCommand req) {
}
log.debug("start send,id:{}", id);
Send.send(channel, req);

wheelTimer.newTimeout(() -> {
log.warn("check async udsClient time out auto close:{},{}", req.getId(), req.getTimeout());
HashMap<String, Object> map = reqMap.remove(req.getId());
if (null != map) {
CompletableFuture<Object> f = (CompletableFuture<Object>) map.get("future");
if (null != f && !f.isDone()) {
future.completeExceptionally(
new TimeoutException("Request timeout: " + req.getTimeout())
);
}
}
}, req.getTimeout() + 350);

//异步还是同步
if (req.isAsync()) {
req.setCompletableFuture(future);
wheelTimer.newTimeout(() -> {
log.warn("check async udsClient time out auto close:{},{}", req.getId(), req.getTimeout());
reqMap.remove(req.getId());
}, req.getTimeout() + 350);
return req;
}
return (UdsCommand) future.get(req.getTimeout(), TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.base.Stopwatch;
import com.xiaomi.data.push.common.*;
import com.xiaomi.data.push.uds.WheelTimer.UdsWheelTimer;
import com.xiaomi.data.push.uds.context.TraceContext;
import com.xiaomi.data.push.uds.context.TraceEvent;
import com.xiaomi.data.push.uds.context.UdsServerContext;
Expand All @@ -31,6 +32,8 @@
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -42,7 +45,6 @@
import java.nio.file.Paths;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
Expand All @@ -58,6 +60,10 @@ public class UdsServer implements IServer<UdsCommand> {

private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();

// 创建时间轮,可以根据需要调整tick时间和轮子大小
private UdsWheelTimer wheelTimer = new UdsWheelTimer();


/**
* 是否使用remote模式(标准tcp)
*/
Expand Down Expand Up @@ -97,8 +103,8 @@ public void start(String path) {
this.path = path;
delPath();
boolean mac = CommonUtils.isMac();
EventLoopGroup bossGroup = NetUtils.getEventLoopGroup();
EventLoopGroup workerGroup = NetUtils.getEventLoopGroup();
EventLoopGroup bossGroup = NetUtils.getEventLoopGroup(this.remote);
EventLoopGroup workerGroup = NetUtils.getEventLoopGroup(this.remote);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
Expand Down Expand Up @@ -187,9 +193,24 @@ public UdsCommand call(UdsCommand req) {
TraceContext context = new TraceContext();
context.enter();
long id = req.getId();

// 创建超时任务
Timeout timeout = wheelTimer.newTimeout(() -> {
CompletableFuture<UdsCommand> future = reqMap.remove(id);
if (future != null && !future.isDone()) {
future.completeExceptionally(
new TimeoutException("Request timeout: " + req.getTimeout())
);
}
}, req.getTimeout());

try {
String app = req.getApp();
CompletableFuture<UdsCommand> future = new CompletableFuture<>();

// 添加完成时取消定时任务的回调
future.whenComplete((k, v) -> timeout.cancel());

reqMap.put(id, future);
Channel channel = UdsServerContext.ins().channel(app);
if (null == channel || !channel.isOpen()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

/**
* @author [email protected]
Expand Down

This file was deleted.

0 comments on commit 8b85d2c

Please sign in to comment.