Skip to content

Commit

Permalink
spare response slot for update expiration_ ime
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Aug 8, 2024
1 parent ecfd61c commit d3b4192
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ public class ResponseBox {

private final Link link;
private final Queues queues;
private SlotEntry[] boxes = new SlotEntry[SIZE];
private SlotEntry[] boxes = new SlotEntry[SIZE + 1]; // '+ 1' for urgent request
private boolean intentionalClose = false;

public ResponseBox(@Nonnull Link link) {
this.link = link;
this.queues = new Queues(link);
for (byte i = 0; i < SIZE; i++) {
for (int i = 0; i < SIZE + 1; i++) {
boxes[i] = new SlotEntry(i);
queues.addSlot(boxes[i]);
}
Expand All @@ -51,12 +51,28 @@ public ChannelResponse register(@Nonnull byte[] header, @Nonnull byte[] payload)
return channelResponse;
}

ChannelResponse registerUrgent(@Nonnull byte[] header, @Nonnull byte[] payload) throws IOException {
var slotEntry = boxes[SIZE];
if (slotEntry.channelResponse() == null) {
var channelResponse = new ChannelResponse(link, slotEntry.slot());
slotEntry.channelResponse(channelResponse);
slotEntry.requestMessage(payload);
link.send(slotEntry.slot(), header, payload, channelResponse);
return channelResponse;
}
throw new IOException("urgent slot is in use");
}

public void push(int slot, byte[] payload) {
var slotEntry = boxes[slot];
var channelResponse = slotEntry.channelResponse();
if (channelResponse != null) {
channelResponse.setMainResponse(ByteBuffer.wrap(payload));
returnEntryToQueue(slotEntry);
if (slot < SIZE) {
returnEntryToQueue(slotEntry);
} else {
slotEntry.resetChannelResponse();
}
return;
}
LOG.error("invalid slotEntry is used: slot={}, payload={}", slot, payload);
Expand All @@ -66,7 +82,11 @@ public void push(int slot, byte[] payload) {
public void push(int slot, IOException e) {
var slotEntry = boxes[slot];
slotEntry.channelResponse().setMainResponse(e);
returnEntryToQueue(slotEntry);
if (slot < SIZE) {
returnEntryToQueue(slotEntry);
} else {
slotEntry.resetChannelResponse();
}
}

private void returnEntryToQueue(SlotEntry slotEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,38 @@ public FutureResponse<? extends Response> send(int serviceId, @Nonnull ByteBuffe
return send(serviceId, payload.array());
}

/**
* Send an Urgent Request to the server via the native wire.
* @param serviceId the destination service ID
* @param payload the Request message in byte[]
* @return a Future response message corresponding the request
* @throws IOException error occurred in ByteBuffer variant of send()
*/
public FutureResponse<? extends Response> sendUrgent(int serviceId, @Nonnull byte[] payload) throws IOException {
if (closed.get()) {
throw new IOException("already closed");
}
var header = FrameworkRequest.Header.newBuilder()
.setServiceMessageVersionMajor(SERVICE_MESSAGE_VERSION_MAJOR)
.setServiceMessageVersionMinor(SERVICE_MESSAGE_VERSION_MINOR)
.setServiceId(serviceId)
.setSessionId(sessionId())
.build();
var response = responseBox.registerUrgent(toDelimitedByteArray(header), payload);
return FutureResponse.wrap(Owner.of(response));
}

/**
* Send an Urgent Request to the server via the native wire.
* @param serviceId the destination service ID
* @param payload the Request message in ByteBuffer
* @return a Future response message corresponding the request
* @throws IOException error occurred in responseBox.register()
*/
public FutureResponse<? extends Response> sendUrgent(int serviceId, @Nonnull ByteBuffer payload) throws IOException {
return send(serviceId, payload.array());
}

/**
* Create a ResultSetWire without a name, meaning that this wire is not connected
* @return ResultSetWireImpl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,23 @@ public Void process(ByteBuffer payload) throws IOException, ServerException, Int
}
}

<R> FutureResponse<R> sendUpdateExpirationTime(
int serviceId,
@Nonnull byte[] payload,
@Nonnull ResponseProcessor<R> processor) throws IOException {
Objects.requireNonNull(payload);
Objects.requireNonNull(processor);
if (wire instanceof WireImpl) {
FutureResponse<? extends Response> future = ((WireImpl) wire).sendUrgent(serviceId, payload);
return convert(future, processor);
}
FutureResponse<? extends Response> future = wire.send(serviceId, payload);
return convert(future, processor);
}

@Override
public FutureResponse<Void> updateExpirationTime() throws IOException {
return send(
return sendUpdateExpirationTime(
SERVICE_ID,
toDelimitedByteArray(newRequest()
.setUpdateExpirationTime(CoreRequest.UpdateExpirationTime.newBuilder())
Expand All @@ -196,7 +210,7 @@ public FutureResponse<Void> updateExpirationTime() throws IOException {

@Override
public FutureResponse<Void> updateExpirationTime(long t, @Nonnull TimeUnit u) throws IOException {
return send(
return sendUpdateExpirationTime(
SERVICE_ID,
toDelimitedByteArray(newRequest()
.setUpdateExpirationTime(CoreRequest.UpdateExpirationTime.newBuilder()
Expand Down

0 comments on commit d3b4192

Please sign in to comment.