diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 3226701d8..8d9bf23b6 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -165,7 +165,6 @@ private static StreamContext constructStreamContext(FieldDef rd, FieldDef pRd, i private String rowServiceVersion = ""; private Span fileReadSpan = null; - private String traceContextHeader = null; private Span readRequestSpan = null; private int readRequestCount = 0; @@ -472,7 +471,6 @@ public RowServiceInputStream(StreamContext context, DataPartition dp, RestartInf if (context.fileReadSpan != null && context.fileReadSpan.getSpanContext().isValid()) { this.fileReadSpan = context.fileReadSpan; - this.traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(fileReadSpan); } int copycount = dataPart.getCopyCount(); @@ -1038,7 +1036,7 @@ private int startFetch() if (inFetchingMode) { if (this.simulateFail) this.handle = -1; - readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest() : this.makeHandleRequest(); + readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest(readRequestSpan) : this.makeHandleRequest(readRequestSpan); try { @@ -1118,7 +1116,7 @@ private int startFetch() { inTokenRetry = true; - String retryTrans = this.makeTokenRequest(); + String retryTrans = this.makeTokenRequest(readRequestSpan); int len = retryTrans.length(); try { @@ -1345,7 +1343,7 @@ private void finishFetch() // Create the read ahead request if (this.simulateFail) this.handle = -1; - String readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest() : this.makeHandleRequest(); + String readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest(readRequestSpan) : this.makeHandleRequest(readRequestSpan); try { @@ -2002,7 +2000,7 @@ private void makeActive() throws HpccFileException try { - String msg = makeGetVersionRequest(); + String msg = makeGetVersionRequest(versionSpan); int msgLen = msg.length(); this.dos.writeInt(msgLen); @@ -2069,11 +2067,11 @@ private void makeActive() throws HpccFileException if (this.tokenBin == null) { this.tokenBin = new byte[0]; - readTrans = makeInitialRequest(); + readTrans = makeInitialRequest(readRequestSpan); } else { - readTrans = makeTokenRequest(); + readTrans = makeTokenRequest(readRequestSpan); } int transLen = readTrans.length(); @@ -2395,14 +2393,15 @@ private void makeFetchObject(StringBuilder sb) } } - private String makeGetVersionRequest() + private String makeGetVersionRequest(Span versionSpan) { + String traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(versionSpan); final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }"; return versionMsg; } - private String makeInitialRequest() + private String makeInitialRequest(Span span) { StringBuilder sb = new StringBuilder(256); @@ -2410,6 +2409,7 @@ private String makeInitialRequest() sb.append("{ \"format\" : \"binary\", \n"); sb.append("\"replyLimit\" : " + this.initialReadSizeKB + ",\n"); + String traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(span); final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; sb.append(trace); @@ -2478,7 +2478,7 @@ private String makeNodeObject() return sb.toString(); } - private String makeHandleRequest() + private String makeHandleRequest(Span span) { StringBuilder sb = new StringBuilder(256); @@ -2487,6 +2487,7 @@ private String makeHandleRequest() sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\","); sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); + String traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(span); final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; sb.append(trace); @@ -2505,7 +2506,7 @@ private String makeHandleRequest() return sb.toString(); } - private String makeTokenRequest() + private String makeTokenRequest(Span span) { StringBuilder sb = new StringBuilder(256); @@ -2513,6 +2514,7 @@ private String makeTokenRequest() sb.append("{ \"format\" : \"binary\",\n"); sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); + String traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(span); final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; sb.append(trace); @@ -2529,13 +2531,14 @@ private String makeTokenRequest() return sb.toString(); } - private String makeCloseHandleRequest() + private String makeCloseHandleRequest(Span span) { StringBuilder sb = new StringBuilder(256); sb.append("{ \"format\" : \"binary\",\n"); sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\","); + String traceContextHeader = org.hpccsystems.ws.client.utils.Utils.getTraceParentHeader(span); final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : ""; sb.append(trace); @@ -2562,7 +2565,7 @@ private void sendCloseFileRequest() throws IOException closeSpan.setStatus(StatusCode.OK); } - String closeFileRequest = makeCloseHandleRequest(); + String closeFileRequest = makeCloseHandleRequest(closeSpan); int jsonRequestLen = closeFileRequest.length(); try