Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing InboundMessage with NativeInboundMessage for deprecation #13126

Merged
merged 2 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -113,7 +114,7 @@ public void aggregate(ReleasableBytesReference content) {
}
}

public InboundMessage finishAggregation() throws IOException {
public NativeInboundMessage finishAggregation() throws IOException {
ensureOpen();
final ReleasableBytesReference releasableContent;
if (isFirstContent()) {
Expand All @@ -127,7 +128,7 @@ public InboundMessage finishAggregation() throws IOException {
}

final BreakerControl breakerControl = new BreakerControl(circuitBreaker);
final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent, breakerControl);
final NativeInboundMessage aggregated = new NativeInboundMessage(currentHeader, releasableContent, breakerControl);
boolean success = false;
try {
if (aggregated.getHeader().needsToReadVariableHeader()) {
Expand All @@ -142,7 +143,7 @@ public InboundMessage finishAggregation() throws IOException {
if (isShortCircuited()) {
aggregated.close();
success = true;
return new InboundMessage(aggregated.getHeader(), aggregationException);
return new NativeInboundMessage(aggregated.getHeader(), aggregationException);
} else {
success = true;
return aggregated;
Expand Down
108 changes: 0 additions & 108 deletions server/src/main/java/org/opensearch/transport/InboundMessage.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void messageReceived(
long slowLogThresholdMs,
TransportMessageListener messageListener
) throws IOException {
InboundMessage inboundMessage = (InboundMessage) message;
NativeInboundMessage inboundMessage = (NativeInboundMessage) message;
TransportLogger.logInboundMessage(channel, inboundMessage);
if (inboundMessage.isPing()) {
keepAlive.receiveKeepAlive(channel);
Expand All @@ -122,7 +123,7 @@ public void messageReceived(

private void handleMessage(
TcpChannel channel,
InboundMessage message,
NativeInboundMessage message,
long startTime,
long slowLogThresholdMs,
TransportMessageListener messageListener
Expand Down Expand Up @@ -194,7 +195,7 @@ private Map<String, Collection<String>> extractHeaders(Map<String, String> heade
private <T extends TransportRequest> void handleRequest(
TcpChannel channel,
Header header,
InboundMessage message,
NativeInboundMessage message,
TransportMessageListener messageListener
) throws IOException {
final String action = header.getActionName();
Expand Down
12 changes: 0 additions & 12 deletions server/src/main/java/org/opensearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -761,18 +761,6 @@ protected void serverAcceptedChannel(TcpChannel channel) {
*/
protected abstract void stopInternal();

/**
* @deprecated use {@link #inboundMessage(TcpChannel, ProtocolInboundMessage)}
* Handles inbound message that has been decoded.
*
* @param channel the channel the message is from
* @param message the message
*/
@Deprecated(since = "2.14.0", forRemoval = true)
public void inboundMessage(TcpChannel channel, InboundMessage message) {
inboundMessage(channel, (ProtocolInboundMessage) message);
}

/**
* Handles inbound message that has been decoded.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.IOException;

Expand All @@ -64,7 +65,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) {
}
}

static void logInboundMessage(TcpChannel channel, InboundMessage message) {
static void logInboundMessage(TcpChannel channel, NativeInboundMessage message) {
if (logger.isTraceEnabled()) {
try {
String logMessage = format(channel, message, "READ");
Expand Down Expand Up @@ -136,7 +137,7 @@ private static String format(TcpChannel channel, BytesReference message, String
return sb.toString();
}

private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException {
private static String format(TcpChannel channel, NativeInboundMessage message, String event) throws IOException {
final StringBuilder sb = new StringBuilder();
sb.append(channel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.transport.InboundAggregator;
import org.opensearch.transport.InboundBytesHandler;
import org.opensearch.transport.InboundDecoder;
import org.opensearch.transport.InboundMessage;
import org.opensearch.transport.ProtocolInboundMessage;
import org.opensearch.transport.StatsTracker;
import org.opensearch.transport.TcpChannel;
Expand All @@ -32,7 +31,7 @@
public class NativeInboundBytesHandler implements InboundBytesHandler {

private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);
private static final NativeInboundMessage PING_MESSAGE = new NativeInboundMessage(null, true);

private final ArrayDeque<ReleasableBytesReference> pending;
private final InboundDecoder decoder;
Expand Down Expand Up @@ -152,7 +151,7 @@ private void forwardFragments(
messageHandler.accept(channel, PING_MESSAGE);
} else if (fragment == InboundDecoder.END_CONTENT) {
assert aggregator.isAggregating();
try (InboundMessage aggregated = aggregator.finishAggregation()) {
try (NativeInboundMessage aggregated = aggregator.finishAggregation()) {
statsTracker.markMessageReceived();
messageHandler.accept(channel, aggregated);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void testInboundAggregation() throws IOException {
}

// Signal EOS
InboundMessage aggregated = aggregator.finishAggregation();
NativeInboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertFalse(aggregated.isPing());
Expand Down Expand Up @@ -138,7 +139,7 @@ public void testInboundUnknownAction() throws IOException {
assertEquals(0, content.refCount());

// Signal EOS
InboundMessage aggregated = aggregator.finishAggregation();
NativeInboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertTrue(aggregated.isShortCircuit());
Expand All @@ -161,7 +162,7 @@ public void testCircuitBreak() throws IOException {
content1.close();

// Signal EOS
InboundMessage aggregated1 = aggregator.finishAggregation();
NativeInboundMessage aggregated1 = aggregator.finishAggregation();

assertEquals(0, content1.refCount());
assertThat(aggregated1, notNullValue());
Expand All @@ -180,7 +181,7 @@ public void testCircuitBreak() throws IOException {
content2.close();

// Signal EOS
InboundMessage aggregated2 = aggregator.finishAggregation();
NativeInboundMessage aggregated2 = aggregator.finishAggregation();

assertEquals(1, content2.refCount());
assertThat(aggregated2, notNullValue());
Expand All @@ -199,7 +200,7 @@ public void testCircuitBreak() throws IOException {
content3.close();

// Signal EOS
InboundMessage aggregated3 = aggregator.finishAggregation();
NativeInboundMessage aggregated3 = aggregator.finishAggregation();

assertEquals(1, content3.refCount());
assertThat(aggregated3, notNullValue());
Expand Down Expand Up @@ -263,7 +264,7 @@ public void testFinishAggregationWillFinishHeader() throws IOException {
content.close();

// Signal EOS
InboundMessage aggregated = aggregator.finishAggregation();
NativeInboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertFalse(header.needsToReadVariableHeader());
Expand Down
Loading
Loading