diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java index 5108c19ab..be9d5f0b4 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java @@ -14,7 +14,11 @@ import org.hpccsystems.commons.ecl.FieldDef; import org.hpccsystems.commons.ecl.RecordDefinitionTranslator; + import org.hpccsystems.dfs.client.RowServiceOutputStream; +import org.hpccsystems.dfs.client.Utils; + +import io.opentelemetry.api.trace.Span; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -37,6 +41,9 @@ public class HPCCRemoteFileWriter<T> private long recordsWritten = 0; private long openTimeMs = 0; + private Span writeSpan = null; + private String writeSpanName = null; + /** * A remote file writer. * @@ -105,9 +112,12 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso this.recordAccessor = recordAccessor; + this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart(); + this.writeSpan = Utils.createRootSpan(writeSpanName); + this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(), dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0), - fileCompression, connectTimeoutMs, socketOpTimeoutMs); + fileCompression, connectTimeoutMs, socketOpTimeoutMs, this.writeSpan); this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream); this.binaryRecordWriter.initialize(this.recordAccessor); @@ -161,6 +171,8 @@ public void close() throws Exception this.report(); this.binaryRecordWriter.finalize(); + this.writeSpan.end(); + long closeTimeMs = System.currentTimeMillis(); double writeTimeS = (closeTimeMs - openTimeMs) / 1000.0; log.info("HPCCRemoteFileWriter: Closing file part: " + dataPartition.getThisPart() diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index a1d161fe3..2cddf9299 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -12,9 +12,14 @@ *******************************************************************************/ package org.hpccsystems.dfs.client; +import org.hpccsystems.dfs.client.Utils; + import org.hpccsystems.commons.ecl.FieldDef; import org.hpccsystems.commons.ecl.RecordDefinitionTranslator; import org.hpccsystems.commons.errors.HpccFileException; + +import io.opentelemetry.api.trace.Span; + import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -48,6 +53,9 @@ public class HpccRemoteFileReader<T> implements Iterator<T> private long openTimeMs = 0; private long recordsRead = 0; + private Span readSpan = null; + private String readSpanName = null; + public static final int NO_RECORD_LIMIT = -1; public static final int DEFAULT_READ_SIZE_OPTION = -1; public static final int DEFAULT_CONNECT_TIMEOUT_OPTION = -1; @@ -204,6 +212,9 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde this.createPrefetchThread = createPrefetchThread; this.socketOpTimeoutMs = socketOpTimeoutMs; + this.readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dp.getFileName() + "_" + dp.getThisPart(); + this.readSpan = Utils.createRootSpan(readSpanName); + if (connectTimeout < 1) { connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS; @@ -212,18 +223,20 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde if (this.originalRecordDef == null) { + this.readSpan.end(); throw new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required."); } FieldDef projectedRecordDefinition = recBuilder.getRecordDefinition(); if (projectedRecordDefinition == null) { + this.readSpan.end(); throw new Exception("IRecordBuilder does not have a valid record definition."); } if (resumeInfo == null) { - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs); + this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, readSpan); this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -238,10 +251,12 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde restartInfo.streamPos = resumeInfo.inputStreamPos; restartInfo.tokenBin = resumeInfo.tokenBin; - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs); + this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs, this.readSpan); + long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) { + this.readSpan.end(); throw new Exception("Unable to restart unexpected stream pos in record reader."); } this.inputStream.skip(bytesToSkip); @@ -279,9 +294,11 @@ private boolean retryRead() try { + this.readSpan = Utils.createRootSpan(readSpanName); this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, - this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs); + this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs, this.readSpan); + long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) { @@ -294,6 +311,7 @@ private boolean retryRead() } catch (Exception e) { + this.readSpan.end(); log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e); return false; } @@ -499,7 +517,9 @@ public void close() throws Exception return; } + this.readSpan.end(); report(); + this.inputStream.close(); isClosed = true; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 2399ef1e6..15c58bab5 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -41,6 +41,15 @@ import org.hpccsystems.commons.network.Network; import org.hpccsystems.generated.CompileTimeConstants; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Context; +import io.opentelemetry.semconv.ExceptionAttributes; +import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.ServiceAttributes; + /** * An input stream that uses the row service provided by the HPCC platform to read a particular file part. */ @@ -63,6 +72,9 @@ public class RowServiceInputStream extends InputStream implements IProfilable private java.io.DataOutputStream dos = null; private String rowServiceVersion = ""; + private Span readSpan = null; + private String readSpanTraceID = ""; + private int filePartCopyIndexPointer = 0; //pointer into the prioritizedCopyIndexes struct private List<Integer> prioritizedCopyIndexes = new ArrayList<Integer>(); @@ -296,7 +308,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co */ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching) throws Exception { - this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, DEFAULT_SOCKET_OP_TIMEOUT_MS); + this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, DEFAULT_SOCKET_OP_TIMEOUT_MS, null); } /** @@ -325,7 +337,42 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co * @throws Exception * general exception */ - public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception + public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, + RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception + { + this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, socketOpTimeoutMS, null); + } + + /** + * A plain socket connect to a THOR node for remote read + * + * @param dp + * the data partition to read + * @param rd + * the JSON definition for the read input and output + * @param pRd + * the projected record definition + * @param connectTimeout + * the connection timeout in milliseconds + * @param limit + * the record limit to use for reading the dataset. -1 implies no limit + * @param createPrefetchThread + * Wether or not this inputstream should handle prefetching itself or if prefetch will be called externally + * @param maxReadSizeInKB + * max readsize in kilobytes + * @param restartInfo + * information used to restart a read from a particular stream position + * @param isFetching + * Will this input stream be used to serviced batched fetch requests + * @param socketOpTimeoutMS + * Socket (read / write) operation timeout in milliseconds + * @param readSpan + * OpenTelemetry span to use for tracing + * @throws Exception + * general exception + */ + public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, + RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS, Span readSpan) throws Exception { this.recordDefinition = rd; this.projectedRecordDefinition = pRd; @@ -344,6 +391,13 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.dataPart = dp; + this.readSpan = readSpan; + if (readSpan != null) + { + this.readSpanTraceID = "00-" + readSpan.getSpanContext().getTraceId() + + "-" + readSpan.getSpanContext().getSpanId() + "-00"; + } + int copycount = dataPart.getCopyCount(); for (int index = 0; index < copycount; index++) { @@ -369,7 +423,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co this.tokenBin = restartInfo.tokenBin; this.streamPos = restartInfo.streamPos; this.streamPosOfFetchStart = this.streamPos; - } + } String prefix = "RowServiceInputStream constructor, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; if (inFetchingMode == false) @@ -389,7 +443,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co } catch (Exception e) { - prefetchException = new HpccFileException(prefix + "Error while batch fetch warm starting: " + e.getMessage()); + setPrefetchException(new HpccFileException(prefix + "Error while batch fetch warm starting: " + e.getMessage())); } blockingRequestFinished.set(true); @@ -426,6 +480,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co if (prefetchException != null) { + // Will already be recorded in span throw prefetchException; } } @@ -494,6 +549,20 @@ RestartInformation getRestartInformationForStreamPos(long streamPos) return restartInfo; } + + private void setPrefetchException(HpccFileException e) + { + this.prefetchException = e; + + if (readSpan != null) + { + Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + ExceptionAttributes.EXCEPTION_MESSAGE, e.getMessage()); + readSpan.recordException(e, attributes); + } + } + private boolean setNextFilePartCopy() { if (filePartCopyIndexPointer + 1 >= prioritizedCopyIndexes.size()) return false; @@ -654,7 +723,15 @@ public void startBlockingFetchRequest(List<Long> fetchOffsets) throws Exception { if (inFetchingMode == false) { - throw new Exception("Error: attempted to start a fetch request for an input stream in sequential read mode."); + Exception wrappedException = new Exception("Error: attempted to start a fetch request for an input stream in sequential read mode."); + if (readSpan != null) + { + Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); + readSpan.recordException(wrappedException, attributes); + } + throw wrappedException; } // Clear stream information, but keep streamPos & markPos as they are due to potential wrapping counting streams @@ -736,6 +813,14 @@ private int startFetch() } String prefix = "RowServiceInputStream.startFetch(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + if (readSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + AttributeKey.longKey("read.offset"), streamPos, + AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); + readSpan.addEvent("RowServiceInputStream.readRequest", attributes); + } //------------------------------------------------------------------------------ // If we haven't made the connection active, activate it now and send the @@ -755,7 +840,7 @@ private int startFetch() } catch (HpccFileException e) { - prefetchException = e; + setPrefetchException(e); try { close(); @@ -781,7 +866,7 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e); + setPrefetchException(new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e)); try { close(); @@ -805,7 +890,7 @@ private int startFetch() } catch (HpccFileException e) { - prefetchException = e; + setPrefetchException(e); try { close(); @@ -816,7 +901,7 @@ private int startFetch() if (response.errorCode != RFCCodes.RFCStreamNoError) { - prefetchException = new HpccFileException(prefix + response.errorMessage); + setPrefetchException(new HpccFileException(prefix + response.errorMessage)); try { close(); @@ -836,7 +921,7 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException(prefix + "response length was < 0; error closing file:" + e.getMessage()); + setPrefetchException(new HpccFileException(prefix + "response length was < 0; error closing file:" + e.getMessage())); } return -1; } @@ -860,13 +945,13 @@ private int startFetch() } catch (IOException e) { - prefetchException = new HpccFileException(prefix + "Failed on remote read read retry:" + e.getMessage(), e); + setPrefetchException(new HpccFileException(prefix + "Failed on remote read read retry:" + e.getMessage(), e)); return -1; } } else if (this.handle == 0) { - prefetchException = new HpccFileException(prefix + "response.handle was null, Read retry failed"); + setPrefetchException(new HpccFileException(prefix + "response.handle was null, Read retry failed")); try { close(); @@ -900,7 +985,7 @@ else if (this.handle == 0) } catch (IOException e) { - prefetchException = new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e); + setPrefetchException(new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e)); try { close(); @@ -951,7 +1036,15 @@ private void readDataInFetch() bytesToRead = this.dis.available(); if (bytesToRead < 0) { - throw new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes."); + IOException wrappedException = new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes."); + if (readSpan != null) + { + Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); + readSpan.recordException(wrappedException, attributes); + } + throw wrappedException; } // Either due to a bug in the JVM or due to a design issue @@ -969,7 +1062,7 @@ private void readDataInFetch() } catch (IOException e) { - prefetchException = new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e); + setPrefetchException(new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e)); try { close(); @@ -1030,7 +1123,7 @@ private void finishFetch() } catch (IOException e) { - prefetchException = new HpccFileException(prefix + "Error during finish request read block: " + e.getMessage(), e); + setPrefetchException(new HpccFileException(prefix + "Error during finish request read block: " + e.getMessage(), e)); try { close(); @@ -1038,12 +1131,30 @@ private void finishFetch() catch(Exception ie){} } + if (readSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + AttributeKey.longKey("read.bytesRead"), Long.valueOf(totalDataInCurrentRequest)); + readSpan.addEvent("RowServiceInputStream.readResponse", attributes); + } + //------------------------------------------------------------------------------ // Send read ahead request //------------------------------------------------------------------------------ if (inFetchingMode == false) { + if (readSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + AttributeKey.longKey("read.offset"), streamPos, + AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000)); + readSpan.addEvent("RowServiceInputStream.readRequest", attributes); + } + + // Create the read ahead request if (this.simulateFail) this.handle = -1; String readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest() : this.makeHandleRequest(); @@ -1057,7 +1168,7 @@ private void finishFetch() } catch (IOException e) { - prefetchException = new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e); + setPrefetchException(new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e)); try { close(); @@ -1222,7 +1333,15 @@ public int available() throws IOException if (availBytes == 0) { // this.bufferWriteMutex.release(); - throw new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0"); + IOException wrappedException = new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0"); + if (readSpan != null) + { + Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); + readSpan.recordException(wrappedException, attributes); + } + throw wrappedException; } } @@ -1254,6 +1373,7 @@ public void close() throws IOException catch(Exception e){} } + this.sendCloseFileRequest(); this.dos.close(); @@ -1558,6 +1678,13 @@ private void makeActive() throws HpccFileException this.handle = 0; String prefix = "RowServiceInputStream.makeActive, file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; + if (readSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort())); + readSpan.addEvent("RowServiceInputStream.connect", attributes); + } + boolean needsRetry = false; do { @@ -1625,6 +1752,13 @@ private void makeActive() throws HpccFileException // Check protocol version //------------------------------------------------------------------------------ + if (readSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort())); + readSpan.addEvent("RowServiceInputStream.versionRequest", attributes); + } + try { String msg = makeGetVersionRequest(); @@ -1659,6 +1793,14 @@ private void makeActive() throws HpccFileException } rowServiceVersion = new String(versionBytes, HPCCCharSet); + + if (readSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + ServiceAttributes.SERVICE_VERSION, rowServiceVersion); + readSpan.addEvent("RowServiceInputStream.versionResponse", attributes); + } } //------------------------------------------------------------------------------ @@ -1986,7 +2128,8 @@ private void makeFetchObject(StringBuilder sb) private String makeGetVersionRequest() { - final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", \"format\": \"binary\" }"; + final String trace = readSpan != null ? "\"_trace/traceparent\" : \"" + readSpanTraceID + "\",\n" : ""; + final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }"; return versionMsg; } @@ -1998,6 +2141,11 @@ private String makeInitialRequest() sb.append("{ \"format\" : \"binary\", \n"); sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); + if (readSpan != null) + { + sb.append("\"_trace/traceparent\" : \"" + readSpanTraceID + "\",\n"); + } + if (!useOldProtocol) { sb.append("\"command\" : \"newstream\", \n"); @@ -2066,6 +2214,11 @@ private String makeHandleRequest() sb.append("{ \"format\" : \"binary\",\n"); sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\","); + if (readSpan != null) + { + sb.append("\"_trace/traceparent\" : \"" + readSpanTraceID + "\",\n"); + } + if (!useOldProtocol) { sb.append("\"command\" : \"continue\", \n"); @@ -2089,6 +2242,11 @@ private String makeTokenRequest() sb.append("{ \"format\" : \"binary\",\n"); sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n"); + if (readSpan != null) + { + sb.append("\"_trace/traceparent\" : \"" + readSpanTraceID + "\",\n"); + } + if (!useOldProtocol) { sb.append("\"command\" : \"newstream\", \n"); @@ -2108,6 +2266,12 @@ private String makeCloseHandleRequest() sb.append("{ \"format\" : \"binary\",\n"); sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\","); + + if (readSpan != null) + { + sb.append("\"_trace/traceparent\" : \"" + readSpanTraceID + "\",\n"); + } + sb.append(" \"command\" : \"close\""); sb.append("\n}"); @@ -2145,7 +2309,15 @@ private void sendCloseFileRequest() throws IOException } catch (HpccFileException e) { - throw new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e); + IOException wrappedException = new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e); + if (readSpan != null) + { + Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(), + ServerAttributes.SERVER_PORT, Long.valueOf(getPort()), + ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage()); + readSpan.recordException(wrappedException, attributes); + } + throw wrappedException; } } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index be85d89e8..4f1cc8282 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -27,6 +27,11 @@ import javax.net.ssl.SSLSocketFactory; import org.json.JSONObject; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.semconv.ServerAttributes; + import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.hpccsystems.commons.ecl.RecordDefinitionTranslator; @@ -62,6 +67,9 @@ public class RowServiceOutputStream extends OutputStream private long handle = -1; private ByteBuffer scratchBuffer = ByteBuffer.allocate(SCRATCH_BUFFER_LEN); + private Span writeSpan = null; + private String writeSpanTraceID = ""; + private static class RowServiceResponse { int len = 0; @@ -154,7 +162,7 @@ private static class RowServiceResponse RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath, CompressionAlgorithm fileCompression, int connectTimeoutMs) throws Exception { - this(ip,port,useSSL,accessToken,recordDef,filePartIndex,filePartPath,fileCompression, connectTimeoutMs, DEFAULT_SOCKET_OP_TIMEOUT_MS); + this(ip,port,useSSL,accessToken,recordDef,filePartIndex,filePartPath,fileCompression, connectTimeoutMs, DEFAULT_SOCKET_OP_TIMEOUT_MS, null); } /** @@ -185,6 +193,40 @@ private static class RowServiceResponse */ RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath, CompressionAlgorithm fileCompression, int connectTimeoutMs, int sockOpTimeoutMS) throws Exception + { + this(ip,port,useSSL,accessToken,recordDef,filePartIndex,filePartPath,fileCompression, connectTimeoutMs, sockOpTimeoutMS, null); + } + + /** + * Creates RowServiceOutputStream to be used to stream data to target dafilesrv on HPCC cluster. + * + * @param ip + * the ip + * @param port + * the port + * @param useSSL + * the use SSL + * @param accessToken + * the access token + * @param recordDef + * the record def + * @param filePartIndex + * the file part index + * @param filePartPath + * the file part path + * @param fileCompression + * the file compression + * @param connectTimeoutMs + * the socket connect timeout in ms (default is 5000) + * @param socketOpTimeoutMS + * the socket operation(read/write) timeout in ms (default is 15000) + * @param writeSpan + * the opentelemetry span to use for tracing + * @throws Exception + * the exception + */ + RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath, + CompressionAlgorithm fileCompression, int connectTimeoutMs, int sockOpTimeoutMS, Span writeSpan) throws Exception { this.rowServiceIP = ip; this.rowServicePort = port; @@ -194,6 +236,14 @@ private static class RowServiceResponse this.accessToken = accessToken; this.compressionAlgo = fileCompression; this.sockOpTimeoutMs = sockOpTimeoutMS; + this.writeSpan = writeSpan; + + if (this.writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.addEvent("RowServiceOutputStream.connect", attributes); + } try { @@ -236,13 +286,29 @@ private static class RowServiceResponse { String errorMessage = "Exception occured while attempting to connect to row service (" + rowServiceIP + ":" + rowServicePort + "): " + e.getMessage(); log.error(errorMessage); - throw new Exception(errorMessage); + + Exception wrappedException = new Exception(errorMessage, e); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } //------------------------------------------------------------------------------ // Check protocol version //------------------------------------------------------------------------------ + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.addEvent("RowServiceOutputStream.versionRequest", attributes); + } + try { String msg = makeGetVersionRequest(); @@ -254,7 +320,15 @@ private static class RowServiceResponse } catch (IOException e) { - throw new HpccFileException("Failed on initial remote read read trans", e); + HpccFileException wrappedException = new HpccFileException("Failed on initial remote read read trans", e); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } RowServiceResponse response = readResponse(); @@ -273,7 +347,15 @@ private static class RowServiceResponse } catch (IOException e) { - throw new HpccFileException("Error while attempting to read version response.", e); + HpccFileException wrappedException = new HpccFileException("Error while attempting to read version response.", e); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } rowServiceVersion = new String(versionBytes, StandardCharsets.ISO_8859_1); @@ -287,7 +369,8 @@ private static class RowServiceResponse private String makeGetVersionRequest() { - final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", \"format\": \"binary\" }"; + final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : ""; + final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }"; return versionMsg; } @@ -295,8 +378,10 @@ private void makeInitialWriteRequest() throws Exception { String jsonRecordDef = RecordDefinitionTranslator.toJsonRecord(this.recordDef).toString(); + final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : ""; String initialRequest = "\n{\n" + " \"format\" : \"binary\",\n" + + trace + " \"replyLimit\" : " + SCRATCH_BUFFER_LEN + ",\n" + (useOldProtocol ? "" : "\"command\" : \"newstream\",\n") + " \"node\" : {\n" @@ -336,16 +421,27 @@ private void makeInitialWriteRequest() throws Exception if (response.errorCode != RFCCodes.RFCStreamNoError) { - throw new IOException(response.errorMessage); + IOException wrappedException = new IOException(response.errorMessage); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } } private String makeCloseHandleRequest() { + final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : ""; + StringBuilder sb = new StringBuilder(256); sb.delete(0, sb.length()); sb.append("{ \"format\" : \"binary\",\n"); + sb.append(trace); sb.append(" \"handle\" : \"" + Long.toString(this.handle) + "\","); sb.append(" \"command\" : \"close\""); sb.append("\n}"); @@ -373,7 +469,15 @@ private void sendCloseFileRequest() throws IOException } catch (IOException e) { - throw new IOException("Failed on close file with error: ", e); + IOException wrappedException = new IOException("Failed on close file with error: ", e); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } RowServiceResponse response = null; @@ -383,12 +487,28 @@ private void sendCloseFileRequest() throws IOException } catch (HpccFileException e) { - throw new IOException("Failed to close file. Unable to read response with error: ", e); + IOException wrappedException = new IOException("Failed to close file. Unable to read response with error: ", e); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } if (response.errorCode != RFCCodes.RFCStreamNoError) { - throw new IOException(response.errorMessage); + IOException wrappedException = new IOException(response.errorMessage); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } } @@ -451,7 +571,15 @@ private RowServiceResponse readResponse() throws HpccFileException if (response.len < 4) { - throw new HpccFileException("Early data termination, no handle"); + HpccFileException wrappedException = new HpccFileException("Early data termination, no handle"); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } response.handle = dis.readInt(); @@ -459,7 +587,15 @@ private RowServiceResponse readResponse() throws HpccFileException } catch (IOException e) { - throw new HpccFileException("Error while attempting to read row service response: ", e); + HpccFileException wrappedException = new HpccFileException("Error while attempting to read row service response: ", e); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } return response; @@ -480,7 +616,15 @@ public void close() throws IOException } else if (bytesWritten == 0 && compressionAlgo != CompressionAlgorithm.NONE) { - throw new IOException("Fatal error while closing file. Writing compressed files with 0 length is not supported with the remote HPCC cluster."); + IOException wrappedException = new IOException("Fatal error while closing file. Writing compressed files with 0 length is not supported with the remote HPCC cluster."); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } this.socket.close(); @@ -513,7 +657,10 @@ public void write(byte[] b) throws IOException */ public void write(byte[] b, int off, int len) throws IOException { + final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : ""; + String request = "{ \"format\" : \"binary\", \"handle\" : \"" + this.handle + "\"" + + trace + (useOldProtocol ? "" : ", \"command\" : \"continue\"") + " }"; byte[] jsonRequestData = request.getBytes("ISO-8859-1"); @@ -547,12 +694,28 @@ public void write(byte[] b, int off, int len) throws IOException } catch (HpccFileException e) { - throw new IOException("Failed during write operation. Unable to read response with error: ", e); + IOException wrappedException = new IOException("Failed during write operation. Unable to read response with error: ", e); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } if (response.errorCode != RFCCodes.RFCStreamNoError) { - throw new IOException(response.errorMessage); + IOException wrappedException = new IOException(response.errorMessage); + if (writeSpan != null) + { + Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP, + ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort)); + writeSpan.recordException(wrappedException, attributes); + } + + throw wrappedException; } } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java new file mode 100644 index 000000000..377cca974 --- /dev/null +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java @@ -0,0 +1,43 @@ +/******************************************************************************* + * HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + *******************************************************************************/ +package org.hpccsystems.dfs.client; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; + +public class Utils +{ + public static Tracer getTelemetryTracer() + { + return GlobalOpenTelemetry.get().getTracer("DFSClient"); + } + + public static Span createRootSpan(String name) + { + return Utils.getTelemetryTracer().spanBuilder(name) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + } + + public static Span createChildSpan(Span parentSpan, String name) + { + return Utils.getTelemetryTracer().spanBuilder(name) + .setParent(Context.current().with(parentSpan)) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + } + +}