Skip to content

Commit

Permalink
HPCC4J-667 DFSClient: Request traceparents should use current active …
Browse files Browse the repository at this point in the history
…span

- Changed RowServiceInputStream to use the active span instead of the top level read span for the traceparent on requests

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Dec 3, 2024
1 parent 96d8009 commit a0ad6e3
Showing 1 changed file with 38 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private static StreamContext constructStreamContext(FieldDef rd, FieldDef pRd, i
return ctx;
}

public static final int DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE = 25;
public static final int DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE = 5;
public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout
public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations
public static final int DEFAULT_MAX_CONCURRENT_CONNECTION_STARTUPS = 100;
Expand Down Expand Up @@ -165,7 +165,6 @@ private static StreamContext constructStreamContext(FieldDef rd, FieldDef pRd, i
private String rowServiceVersion = "";

private Span fileReadSpan = null;
private String traceContextHeader = null;

private Span readRequestSpan = null;
private int readRequestCount = 0;
Expand Down Expand Up @@ -472,7 +471,6 @@ public RowServiceInputStream(StreamContext context, DataPartition dp, RestartInf
if (context.fileReadSpan != null && context.fileReadSpan.getSpanContext().isValid())
{
this.fileReadSpan = context.fileReadSpan;
this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(fileReadSpan);
}

int copycount = dataPart.getCopyCount();
Expand Down Expand Up @@ -1038,7 +1036,7 @@ private int startFetch()
if (inFetchingMode)
{
if (this.simulateFail) this.handle = -1;
readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest() : this.makeHandleRequest();
readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest(readRequestSpan) : this.makeHandleRequest(readRequestSpan);

try
{
Expand Down Expand Up @@ -1118,7 +1116,7 @@ private int startFetch()
{
inTokenRetry = true;

String retryTrans = this.makeTokenRequest();
String retryTrans = this.makeTokenRequest(readRequestSpan);
int len = retryTrans.length();
try
{
Expand Down Expand Up @@ -1345,7 +1343,7 @@ private void finishFetch()

// Create the read ahead request
if (this.simulateFail) this.handle = -1;
String readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest() : this.makeHandleRequest();
String readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest(readRequestSpan) : this.makeHandleRequest(readRequestSpan);

try
{
Expand Down Expand Up @@ -2002,7 +2000,7 @@ private void makeActive() throws HpccFileException

try
{
String msg = makeGetVersionRequest();
String msg = makeGetVersionRequest(versionSpan);
int msgLen = msg.length();

this.dos.writeInt(msgLen);
Expand Down Expand Up @@ -2069,11 +2067,11 @@ private void makeActive() throws HpccFileException
if (this.tokenBin == null)
{
this.tokenBin = new byte[0];
readTrans = makeInitialRequest();
readTrans = makeInitialRequest(readRequestSpan);
}
else
{
readTrans = makeTokenRequest();
readTrans = makeTokenRequest(readRequestSpan);
}

int transLen = readTrans.length();
Expand Down Expand Up @@ -2395,15 +2393,25 @@ private void makeFetchObject(StringBuilder sb)
}
}

private String makeGetVersionRequest()
private String makeGetVersionRequest(Span versionSpan)
{
String traceContextHeader = null;
if (versionSpan != null) {
traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(versionSpan);
}

final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : "";
final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }";
return versionMsg;
}

private String makeInitialRequest()
private String makeInitialRequest(Span span)
{
String traceContextHeader = null;
if (span != null) {
traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(span);
}

StringBuilder sb = new StringBuilder(256);

sb.append(RFCCodes.RFCStreamReadCmd);
Expand Down Expand Up @@ -2478,8 +2486,13 @@ private String makeNodeObject()
return sb.toString();
}

private String makeHandleRequest()
private String makeHandleRequest(Span span)
{
String traceContextHeader = null;
if (span != null) {
traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(span);
}

StringBuilder sb = new StringBuilder(256);

sb.append(RFCCodes.RFCStreamReadCmd);
Expand All @@ -2505,8 +2518,13 @@ private String makeHandleRequest()
return sb.toString();
}

private String makeTokenRequest()
private String makeTokenRequest(Span span)
{
String traceContextHeader = null;
if (span != null) {
traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(span);
}

StringBuilder sb = new StringBuilder(256);

sb.append(RFCCodes.RFCStreamReadCmd);
Expand All @@ -2529,8 +2547,13 @@ private String makeTokenRequest()
return sb.toString();
}

private String makeCloseHandleRequest()
private String makeCloseHandleRequest(Span span)
{
String traceContextHeader = null;
if (span != null) {
traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(span);
}

StringBuilder sb = new StringBuilder(256);

sb.append("{ \"format\" : \"binary\",\n");
Expand Down Expand Up @@ -2562,7 +2585,7 @@ private void sendCloseFileRequest() throws IOException
closeSpan.setStatus(StatusCode.OK);
}

String closeFileRequest = makeCloseHandleRequest();
String closeFileRequest = makeCloseHandleRequest(closeSpan);
int jsonRequestLen = closeFileRequest.length();

try
Expand Down

0 comments on commit a0ad6e3

Please sign in to comment.