From 7cb1166e61ab4a6f703dcf80b12a015e996e2966 Mon Sep 17 00:00:00 2001
From: James McMullan <jpmcmu@gmail.com>
Date: Fri, 14 Jun 2024 09:18:42 -0400
Subject: [PATCH] HPCC4J-611 Add OpenTelemetry tracing to dfsclient

- Added tracing support to read pathways
- Added tracing support to write pathways

Signed-off-by: James McMullan James.McMullan@lexisnexis.com
---
 .../dfs/client/HPCCRemoteFileWriter.java      |  14 +-
 .../dfs/client/HpccRemoteFileReader.java      |  26 ++-
 .../dfs/client/RowServiceInputStream.java     | 212 ++++++++++++++++--
 .../dfs/client/RowServiceOutputStream.java    | 191 ++++++++++++++--
 .../org/hpccsystems/dfs/client/Utils.java     |  43 ++++
 5 files changed, 448 insertions(+), 38 deletions(-)
 create mode 100644 dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java

diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java
index 5108c19ab..be9d5f0b4 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java
@@ -14,7 +14,11 @@
 
 import org.hpccsystems.commons.ecl.FieldDef;
 import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
+
 import org.hpccsystems.dfs.client.RowServiceOutputStream;
+import org.hpccsystems.dfs.client.Utils;
+
+import io.opentelemetry.api.trace.Span;
 
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
@@ -37,6 +41,9 @@ public class HPCCRemoteFileWriter<T>
     private long                   recordsWritten     = 0;
     private long                   openTimeMs         = 0;
 
+    private Span                   writeSpan          = null;
+    private String                 writeSpanName      = null;
+
     /**
      * A remote file writer.
      *
@@ -105,9 +112,12 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso
 
         this.recordAccessor = recordAccessor;
 
+        this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart();
+        this.writeSpan = Utils.createRootSpan(writeSpanName);
+
         this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(),
                 dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0),
-                fileCompression, connectTimeoutMs, socketOpTimeoutMs);
+                fileCompression, connectTimeoutMs, socketOpTimeoutMs, this.writeSpan);
 
         this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream);
         this.binaryRecordWriter.initialize(this.recordAccessor);
@@ -161,6 +171,8 @@ public void close() throws Exception
         this.report();
         this.binaryRecordWriter.finalize();
 
+        this.writeSpan.end();
+
         long closeTimeMs = System.currentTimeMillis();
         double writeTimeS = (closeTimeMs -  openTimeMs) / 1000.0;
         log.info("HPCCRemoteFileWriter: Closing file part: " + dataPartition.getThisPart()
diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java
index a1d161fe3..2cddf9299 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java
@@ -12,9 +12,14 @@
  *******************************************************************************/
 package org.hpccsystems.dfs.client;
 
+import org.hpccsystems.dfs.client.Utils;
+
 import org.hpccsystems.commons.ecl.FieldDef;
 import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
 import org.hpccsystems.commons.errors.HpccFileException;
+
+import io.opentelemetry.api.trace.Span;
+
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
 
@@ -48,6 +53,9 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
     private long                  openTimeMs        = 0;
     private long                  recordsRead       = 0;
 
+    private Span                  readSpan          = null;
+    private String                readSpanName      = null;
+
     public static final int    NO_RECORD_LIMIT                  = -1;
     public static final int    DEFAULT_READ_SIZE_OPTION         = -1;
     public static final int    DEFAULT_CONNECT_TIMEOUT_OPTION   = -1;
@@ -204,6 +212,9 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
         this.createPrefetchThread = createPrefetchThread;
         this.socketOpTimeoutMs = socketOpTimeoutMs;
 
+        this.readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dp.getFileName() + "_" + dp.getThisPart();
+        this.readSpan = Utils.createRootSpan(readSpanName);
+
         if (connectTimeout < 1)
         {
             connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS;
@@ -212,18 +223,20 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
 
         if (this.originalRecordDef == null)
         {
+            this.readSpan.end();
             throw new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
         }
 
         FieldDef projectedRecordDefinition = recBuilder.getRecordDefinition();
         if (projectedRecordDefinition == null)
         {
+            this.readSpan.end();
             throw new Exception("IRecordBuilder does not have a valid record definition.");
         }
 
         if (resumeInfo == null)
         {
-            this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs);
+            this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, readSpan);
             this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
             this.binaryRecordReader.initialize(this.recordBuilder);
 
@@ -238,10 +251,12 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
             restartInfo.streamPos = resumeInfo.inputStreamPos;
             restartInfo.tokenBin = resumeInfo.tokenBin;
 
-            this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs);
+            this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs, this.readSpan);
+
             long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
             if (bytesToSkip < 0)
             {
+                this.readSpan.end();
                 throw new Exception("Unable to restart unexpected stream pos in record reader.");
             }
             this.inputStream.skip(bytesToSkip);
@@ -279,9 +294,11 @@ private boolean retryRead()
 
             try
             {
+                this.readSpan = Utils.createRootSpan(readSpanName);
                 this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef,
                         this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread,
-                        this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs);
+                        this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs, this.readSpan);
+
                 long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
                 if (bytesToSkip < 0)
                 {
@@ -294,6 +311,7 @@ private boolean retryRead()
             }
             catch (Exception e)
             {
+                this.readSpan.end();
                 log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e);
                 return false;
             }
@@ -499,7 +517,9 @@ public void close() throws Exception
             return;
         }
 
+        this.readSpan.end();
         report();
+
         this.inputStream.close();
         isClosed = true;
 
diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java
index 2399ef1e6..15c58bab5 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java
@@ -41,6 +41,15 @@
 import org.hpccsystems.commons.network.Network;
 import org.hpccsystems.generated.CompileTimeConstants;
 
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.semconv.ExceptionAttributes;
+import io.opentelemetry.semconv.ServerAttributes;
+import io.opentelemetry.semconv.ServiceAttributes;
+
 /**
  * An input stream that uses the row service provided by the HPCC platform to read a particular file part.
  */
@@ -63,6 +72,9 @@ public class RowServiceInputStream extends InputStream implements IProfilable
     private java.io.DataOutputStream dos = null;
     private String                   rowServiceVersion = "";
 
+    private Span                     readSpan = null;
+    private String                   readSpanTraceID = "";
+
     private int                      filePartCopyIndexPointer = 0;  //pointer into the prioritizedCopyIndexes struct
     private List<Integer>            prioritizedCopyIndexes = new ArrayList<Integer>();
 
@@ -296,7 +308,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
      */
     public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching) throws Exception
     {
-        this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, DEFAULT_SOCKET_OP_TIMEOUT_MS);
+        this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, DEFAULT_SOCKET_OP_TIMEOUT_MS, null);
     }
 
     /**
@@ -325,7 +337,42 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
      * @throws Exception
      *            general exception
      */
-    public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception
+    public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB,
+                                RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS) throws Exception
+    {
+        this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restartInfo, isFetching, socketOpTimeoutMS, null);
+    }
+
+    /**
+     * A plain socket connect to a THOR node for remote read
+     *
+     * @param dp
+     *            the data partition to read
+     * @param rd
+     *            the JSON definition for the read input and output
+     * @param pRd
+     *            the projected record definition
+     * @param connectTimeout
+     *               the connection timeout in milliseconds
+     * @param limit
+     *            the record limit to use for reading the dataset. -1 implies no limit
+     * @param createPrefetchThread
+     *            Wether or not this inputstream should handle prefetching itself or if prefetch will be called externally
+     * @param maxReadSizeInKB
+     *            max readsize in kilobytes
+     * @param restartInfo
+     *            information used to restart a read from a particular stream position
+     * @param isFetching
+     *            Will this input stream be used to serviced batched fetch requests
+     * @param socketOpTimeoutMS
+     *            Socket (read / write) operation timeout in milliseconds
+     * @param readSpan
+     *            OpenTelemetry span to use for tracing
+     * @throws Exception
+     *            general exception
+     */
+    public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB,
+                                RestartInformation restartInfo, boolean isFetching, int socketOpTimeoutMS, Span readSpan) throws Exception
     {
         this.recordDefinition = rd;
         this.projectedRecordDefinition = pRd;
@@ -344,6 +391,13 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
 
         this.dataPart = dp;
 
+        this.readSpan = readSpan;
+        if (readSpan != null)
+        {
+            this.readSpanTraceID = "00-" + readSpan.getSpanContext().getTraceId()
+                                + "-" + readSpan.getSpanContext().getSpanId() + "-00";
+        }
+
         int copycount = dataPart.getCopyCount();
         for (int index = 0; index < copycount; index++)
         {
@@ -369,7 +423,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
             this.tokenBin = restartInfo.tokenBin;
             this.streamPos = restartInfo.streamPos;
             this.streamPosOfFetchStart = this.streamPos;
-        }   
+        }
         String prefix = "RowServiceInputStream constructor, file "  + dataPart.getFileName() +  " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
 
         if (inFetchingMode == false)
@@ -389,7 +443,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
                 }
                 catch (Exception e)
                 {
-                    prefetchException = new HpccFileException(prefix + "Error while batch fetch warm starting: " + e.getMessage());
+                    setPrefetchException(new HpccFileException(prefix + "Error while batch fetch warm starting: " + e.getMessage()));
                 }
 
                 blockingRequestFinished.set(true);
@@ -426,6 +480,7 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
 
             if (prefetchException != null)
             {
+                // Will already be recorded in span
                 throw prefetchException;
             }
         }
@@ -494,6 +549,20 @@ RestartInformation getRestartInformationForStreamPos(long streamPos)
         return restartInfo;
     }
 
+
+    private void setPrefetchException(HpccFileException e)
+    {
+        this.prefetchException = e;
+
+        if (readSpan != null)
+        {
+            Attributes attributes = Attributes.of(  ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
+                                                    ExceptionAttributes.EXCEPTION_MESSAGE, e.getMessage());
+            readSpan.recordException(e, attributes);
+        }
+    }
+
     private boolean setNextFilePartCopy()
     {
         if (filePartCopyIndexPointer + 1 >= prioritizedCopyIndexes.size()) return false;
@@ -654,7 +723,15 @@ public void startBlockingFetchRequest(List<Long> fetchOffsets) throws Exception
     {
         if (inFetchingMode == false)
         {
-            throw new Exception("Error: attempted to start a fetch request for an input stream in sequential read mode.");
+            Exception wrappedException = new Exception("Error: attempted to start a fetch request for an input stream in sequential read mode.");
+            if (readSpan != null)
+            {
+                Attributes attributes = Attributes.of(  ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                        ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
+                                                        ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage());
+                readSpan.recordException(wrappedException, attributes);
+            }
+            throw wrappedException;
         }
 
         // Clear stream information, but keep streamPos & markPos as they are due to potential wrapping counting streams
@@ -736,6 +813,14 @@ private int startFetch()
         }
         String prefix = "RowServiceInputStream.startFetch(), file "   + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
 
+        if (readSpan != null)
+        {
+            Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
+                                                    AttributeKey.longKey("read.offset"), streamPos,
+                                                    AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000));
+            readSpan.addEvent("RowServiceInputStream.readRequest", attributes);
+        }
 
         //------------------------------------------------------------------------------
         // If we haven't made the connection active, activate it now and send the
@@ -755,7 +840,7 @@ private int startFetch()
             }
             catch (HpccFileException e)
             {
-                prefetchException = e;
+                setPrefetchException(e);
                 try
                 {
                     close();
@@ -781,7 +866,7 @@ private int startFetch()
                 }
                 catch (IOException e)
                 {
-                    prefetchException = new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e);
+                    setPrefetchException(new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e));
                     try
                     {
                         close();
@@ -805,7 +890,7 @@ private int startFetch()
             }
             catch (HpccFileException e)
             {
-                prefetchException = e;
+                setPrefetchException(e);
                 try
                 {
                     close();
@@ -816,7 +901,7 @@ private int startFetch()
 
             if (response.errorCode != RFCCodes.RFCStreamNoError)
             {
-                prefetchException = new HpccFileException(prefix + response.errorMessage);
+                setPrefetchException(new HpccFileException(prefix + response.errorMessage));
                 try
                 {
                     close();
@@ -836,7 +921,7 @@ private int startFetch()
                 }
                 catch (IOException e)
                 {
-                    prefetchException = new HpccFileException(prefix + "response length was < 0; error closing file:" + e.getMessage());
+                    setPrefetchException(new HpccFileException(prefix + "response length was < 0; error closing file:" + e.getMessage()));
                 }
                 return -1;
             }
@@ -860,13 +945,13 @@ private int startFetch()
                 }
                 catch (IOException e)
                 {
-                    prefetchException = new HpccFileException(prefix + "Failed on remote read read retry:" + e.getMessage(), e);
+                    setPrefetchException(new HpccFileException(prefix + "Failed on remote read read retry:" + e.getMessage(), e));
                     return -1;
                 }
             }
             else if (this.handle == 0)
             {
-                prefetchException = new HpccFileException(prefix + "response.handle was null, Read retry failed");
+                setPrefetchException(new HpccFileException(prefix + "response.handle was null, Read retry failed"));
                 try
                 {
                     close();
@@ -900,7 +985,7 @@ else if (this.handle == 0)
         }
         catch (IOException e)
         {
-            prefetchException = new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e);
+            setPrefetchException(new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e));
             try
             {
                 close();
@@ -951,7 +1036,15 @@ private void readDataInFetch()
                 bytesToRead = this.dis.available();
                 if (bytesToRead < 0)
                 {
-                    throw new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes.");
+                    IOException wrappedException = new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes.");
+                    if (readSpan != null)
+                    {
+                        Attributes attributes = Attributes.of(  ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                                ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
+                                                                ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage());
+                        readSpan.recordException(wrappedException, attributes);
+                    }
+                    throw wrappedException;
                 }
 
                 // Either due to a bug in the JVM or due to a design issue
@@ -969,7 +1062,7 @@ private void readDataInFetch()
             }
             catch (IOException e)
             {
-                prefetchException = new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e);
+                setPrefetchException(new HpccFileException(prefix + "Error during read block:" + e.getMessage(), e));
                 try
                 {
                     close();
@@ -1030,7 +1123,7 @@ private void finishFetch()
         }
         catch (IOException e)
         {
-            prefetchException = new HpccFileException(prefix + "Error during finish request read block: " + e.getMessage(), e);
+            setPrefetchException(new HpccFileException(prefix + "Error during finish request read block: " + e.getMessage(), e));
             try
             {
                 close();
@@ -1038,12 +1131,30 @@ private void finishFetch()
             catch(Exception ie){}
         }
 
+        if (readSpan != null)
+        {
+            Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                  ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
+                                                  AttributeKey.longKey("read.bytesRead"), Long.valueOf(totalDataInCurrentRequest));
+            readSpan.addEvent("RowServiceInputStream.readResponse", attributes);
+        }
+
         //------------------------------------------------------------------------------
         // Send read ahead request
         //------------------------------------------------------------------------------
 
         if (inFetchingMode == false)
         {
+            if (readSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
+                                                    AttributeKey.longKey("read.offset"), streamPos,
+                                                    AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000));
+                readSpan.addEvent("RowServiceInputStream.readRequest", attributes);
+            }
+
+
             // Create the read ahead request
             if (this.simulateFail) this.handle = -1;
             String readAheadRequest = (this.forceTokenUse) ? this.makeTokenRequest() : this.makeHandleRequest();
@@ -1057,7 +1168,7 @@ private void finishFetch()
             }
             catch (IOException e)
             {
-                prefetchException = new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e);
+                setPrefetchException(new HpccFileException(prefix + "Failure sending read ahead transaction:" + e.getMessage(), e));
                 try
                 {
                     close();
@@ -1222,7 +1333,15 @@ public int available() throws IOException
             if (availBytes == 0)
             {
                 // this.bufferWriteMutex.release();
-                throw new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0");
+                IOException wrappedException = new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0");
+                if (readSpan != null)
+                {
+                    Attributes attributes = Attributes.of(  ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                            ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
+                                                            ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage());
+                    readSpan.recordException(wrappedException, attributes);
+                }
+                throw wrappedException;
             }
         }
 
@@ -1254,6 +1373,7 @@ public void close() throws IOException
                 catch(Exception e){}
             }
 
+
             this.sendCloseFileRequest();
 
             this.dos.close();
@@ -1558,6 +1678,13 @@ private void makeActive() throws HpccFileException
         this.handle = 0;
         String prefix = "RowServiceInputStream.makeActive, file "  + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";
 
+        if (readSpan != null)
+        {
+            Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(getPort()));
+            readSpan.addEvent("RowServiceInputStream.connect", attributes);
+        }
+
         boolean needsRetry = false;
         do
         {
@@ -1625,6 +1752,13 @@ private void makeActive() throws HpccFileException
                 // Check protocol version
                 //------------------------------------------------------------------------------
 
+                if (readSpan != null)
+                {
+                    Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                            ServerAttributes.SERVER_PORT, Long.valueOf(getPort()));
+                    readSpan.addEvent("RowServiceInputStream.versionRequest", attributes);
+                }
+
                 try
                 {
                     String msg = makeGetVersionRequest();
@@ -1659,6 +1793,14 @@ private void makeActive() throws HpccFileException
                     }
 
                     rowServiceVersion = new String(versionBytes, HPCCCharSet);
+
+                    if (readSpan != null)
+                    {
+                        Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                            ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
+                                                            ServiceAttributes.SERVICE_VERSION, rowServiceVersion);
+                        readSpan.addEvent("RowServiceInputStream.versionResponse", attributes);
+                    }
                 }
 
                 //------------------------------------------------------------------------------
@@ -1986,7 +2128,8 @@ private void makeFetchObject(StringBuilder sb)
 
     private String makeGetVersionRequest()
     {
-        final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", \"format\": \"binary\" }";
+        final String trace =  readSpan != null ? "\"_trace/traceparent\" : \"" + readSpanTraceID + "\",\n" : "";
+        final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }";
         return versionMsg;
     }
 
@@ -1998,6 +2141,11 @@ private String makeInitialRequest()
         sb.append("{ \"format\" : \"binary\", \n");
         sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n");
 
+        if (readSpan != null)
+        {
+            sb.append("\"_trace/traceparent\" : \"" + readSpanTraceID + "\",\n");
+        }
+
         if (!useOldProtocol)
         {
             sb.append("\"command\" : \"newstream\", \n");
@@ -2066,6 +2214,11 @@ private String makeHandleRequest()
         sb.append("{ \"format\" : \"binary\",\n");
         sb.append("  \"handle\" : \"" + Integer.toString(this.handle) + "\",");
 
+        if (readSpan != null)
+        {
+            sb.append("\"_trace/traceparent\" : \"" + readSpanTraceID + "\",\n");
+        }
+
         if (!useOldProtocol)
         {
             sb.append("\"command\" : \"continue\", \n");
@@ -2089,6 +2242,11 @@ private String makeTokenRequest()
         sb.append("{ \"format\" : \"binary\",\n");
         sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n");
 
+        if (readSpan != null)
+        {
+            sb.append("\"_trace/traceparent\" : \"" + readSpanTraceID + "\",\n");
+        }
+
         if (!useOldProtocol)
         {
             sb.append("\"command\" : \"newstream\", \n");
@@ -2108,6 +2266,12 @@ private String makeCloseHandleRequest()
 
         sb.append("{ \"format\" : \"binary\",\n");
         sb.append("  \"handle\" : \"" + Integer.toString(this.handle) + "\",");
+
+        if (readSpan != null)
+        {
+            sb.append("\"_trace/traceparent\" : \"" + readSpanTraceID + "\",\n");
+        }
+
         sb.append("  \"command\" : \"close\"");
         sb.append("\n}");
 
@@ -2145,7 +2309,15 @@ private void sendCloseFileRequest() throws IOException
         }
         catch (HpccFileException e)
         {
-            throw new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e);
+            IOException wrappedException = new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e);
+            if (readSpan != null)
+            {
+                Attributes attributes = Attributes.of(  ServerAttributes.SERVER_ADDRESS, getIP(),
+                                                        ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
+                                                        ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage());
+                readSpan.recordException(wrappedException, attributes);
+            }
+            throw wrappedException;
         }
     }
 
diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java
index be85d89e8..4f1cc8282 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java
@@ -27,6 +27,11 @@
 import javax.net.ssl.SSLSocketFactory;
 
 import org.json.JSONObject;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.semconv.ServerAttributes;
+
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
 import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
@@ -62,6 +67,9 @@ public class RowServiceOutputStream extends OutputStream
     private long                 handle                        = -1;
     private ByteBuffer           scratchBuffer                 = ByteBuffer.allocate(SCRATCH_BUFFER_LEN);
 
+    private Span                 writeSpan                     = null;
+    private String               writeSpanTraceID              = "";
+
     private static class RowServiceResponse
     {
         int len = 0;
@@ -154,7 +162,7 @@ private static class RowServiceResponse
     RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath,
             CompressionAlgorithm fileCompression, int connectTimeoutMs) throws Exception
     {
-        this(ip,port,useSSL,accessToken,recordDef,filePartIndex,filePartPath,fileCompression, connectTimeoutMs, DEFAULT_SOCKET_OP_TIMEOUT_MS);
+        this(ip,port,useSSL,accessToken,recordDef,filePartIndex,filePartPath,fileCompression, connectTimeoutMs, DEFAULT_SOCKET_OP_TIMEOUT_MS, null);
     }
 
     /**
@@ -185,6 +193,40 @@ private static class RowServiceResponse
      */
     RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath,
             CompressionAlgorithm fileCompression, int connectTimeoutMs, int sockOpTimeoutMS) throws Exception
+    {
+        this(ip,port,useSSL,accessToken,recordDef,filePartIndex,filePartPath,fileCompression, connectTimeoutMs, sockOpTimeoutMS, null);
+    }
+
+    /**
+     * Creates RowServiceOutputStream to be used to stream data to target dafilesrv on HPCC cluster.
+     *
+     * @param ip
+     *            the ip
+     * @param port
+     *            the port
+     * @param useSSL
+     *            the use SSL
+     * @param accessToken
+     *            the access token
+     * @param recordDef
+     *            the record def
+     * @param filePartIndex
+     *            the file part index
+     * @param filePartPath
+     *            the file part path
+     * @param fileCompression
+     *            the file compression
+     * @param connectTimeoutMs
+     *            the socket connect timeout in ms (default is 5000)
+     * @param socketOpTimeoutMS
+     *            the socket operation(read/write) timeout in ms (default is 15000)
+     * @param writeSpan
+     *            the opentelemetry span to use for tracing
+     * @throws Exception
+     *             the exception
+     */
+    RowServiceOutputStream(String ip, int port, boolean useSSL, String accessToken, FieldDef recordDef, int filePartIndex, String filePartPath,
+            CompressionAlgorithm fileCompression, int connectTimeoutMs, int sockOpTimeoutMS, Span writeSpan) throws Exception
     {
         this.rowServiceIP = ip;
         this.rowServicePort = port;
@@ -194,6 +236,14 @@ private static class RowServiceResponse
         this.accessToken = accessToken;
         this.compressionAlgo = fileCompression;
         this.sockOpTimeoutMs = sockOpTimeoutMS;
+        this.writeSpan = writeSpan;
+
+        if (this.writeSpan != null)
+        {
+            Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                  ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+            writeSpan.addEvent("RowServiceOutputStream.connect", attributes);
+        }
 
         try
         {
@@ -236,13 +286,29 @@ private static class RowServiceResponse
         {
             String errorMessage = "Exception occured while attempting to connect to row service (" + rowServiceIP + ":" + rowServicePort + "): " + e.getMessage();
             log.error(errorMessage);
-            throw new Exception(errorMessage);
+
+            Exception wrappedException = new Exception(errorMessage, e);
+            if (writeSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                      ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                writeSpan.recordException(wrappedException, attributes);
+            }
+
+            throw wrappedException;
         }
 
         //------------------------------------------------------------------------------
         // Check protocol version
         //------------------------------------------------------------------------------
 
+        if (writeSpan != null)
+        {
+            Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+            writeSpan.addEvent("RowServiceOutputStream.versionRequest", attributes);
+        }
+
         try
         {
             String msg = makeGetVersionRequest();
@@ -254,7 +320,15 @@ private static class RowServiceResponse
         }
         catch (IOException e)
         {
-            throw new HpccFileException("Failed on initial remote read read trans", e);
+            HpccFileException wrappedException = new HpccFileException("Failed on initial remote read read trans", e);
+            if (writeSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                      ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                writeSpan.recordException(wrappedException, attributes);
+            }
+
+            throw wrappedException;
         }
 
         RowServiceResponse response = readResponse();
@@ -273,7 +347,15 @@ private static class RowServiceResponse
             }
             catch (IOException e)
             {
-                throw new HpccFileException("Error while attempting to read version response.", e);
+                HpccFileException wrappedException = new HpccFileException("Error while attempting to read version response.", e);
+                if (writeSpan != null)
+                {
+                    Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                        ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                    writeSpan.recordException(wrappedException, attributes);
+                }
+
+                throw wrappedException;
             }
 
             rowServiceVersion = new String(versionBytes, StandardCharsets.ISO_8859_1);
@@ -287,7 +369,8 @@ private static class RowServiceResponse
 
     private String makeGetVersionRequest()
     {
-        final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", \"format\": \"binary\" }";
+        final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : "";
+        final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }";
         return versionMsg;
     }
 
@@ -295,8 +378,10 @@ private void makeInitialWriteRequest() throws Exception
     {
         String jsonRecordDef = RecordDefinitionTranslator.toJsonRecord(this.recordDef).toString();
 
+        final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : "";
         String initialRequest = "\n{\n"
                 + "    \"format\" : \"binary\",\n"
+                + trace
                 + "    \"replyLimit\" : " + SCRATCH_BUFFER_LEN + ",\n"
                 + (useOldProtocol ? "" : "\"command\" : \"newstream\",\n")
                 + "    \"node\" : {\n"
@@ -336,16 +421,27 @@ private void makeInitialWriteRequest() throws Exception
 
         if (response.errorCode != RFCCodes.RFCStreamNoError)
         {
-            throw new IOException(response.errorMessage);
+            IOException wrappedException = new IOException(response.errorMessage);
+            if (writeSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                writeSpan.recordException(wrappedException, attributes);
+            }
+
+            throw wrappedException;
         }
     }
 
     private String makeCloseHandleRequest()
     {
+        final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : "";
+
         StringBuilder sb = new StringBuilder(256);
         sb.delete(0, sb.length());
 
         sb.append("{ \"format\" : \"binary\",\n");
+        sb.append(trace);
         sb.append("  \"handle\" : \"" + Long.toString(this.handle) + "\",");
         sb.append("  \"command\" : \"close\"");
         sb.append("\n}");
@@ -373,7 +469,15 @@ private void sendCloseFileRequest() throws IOException
         }
         catch (IOException e)
         {
-            throw new IOException("Failed on close file with error: ", e);
+            IOException wrappedException = new IOException("Failed on close file with error: ", e);
+            if (writeSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                writeSpan.recordException(wrappedException, attributes);
+            }
+
+            throw wrappedException;
         }
 
         RowServiceResponse response = null;
@@ -383,12 +487,28 @@ private void sendCloseFileRequest() throws IOException
         }
         catch (HpccFileException e)
         {
-            throw new IOException("Failed to close file. Unable to read response with error: ", e);
+            IOException wrappedException = new IOException("Failed to close file. Unable to read response with error: ", e);
+            if (writeSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                writeSpan.recordException(wrappedException, attributes);
+            }
+
+            throw wrappedException;
         }
 
         if (response.errorCode != RFCCodes.RFCStreamNoError)
         {
-            throw new IOException(response.errorMessage);
+            IOException wrappedException = new IOException(response.errorMessage);
+            if (writeSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                writeSpan.recordException(wrappedException, attributes);
+            }
+
+            throw wrappedException;
         }
     }
 
@@ -451,7 +571,15 @@ private RowServiceResponse readResponse() throws HpccFileException
 
             if (response.len < 4)
             {
-                throw new HpccFileException("Early data termination, no handle");
+                HpccFileException wrappedException = new HpccFileException("Early data termination, no handle");
+                if (writeSpan != null)
+                {
+                    Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                        ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                    writeSpan.recordException(wrappedException, attributes);
+                }
+
+                throw wrappedException;
             }
 
             response.handle = dis.readInt();
@@ -459,7 +587,15 @@ private RowServiceResponse readResponse() throws HpccFileException
         }
         catch (IOException e)
         {
-            throw new HpccFileException("Error while attempting to read row service response: ", e);
+            HpccFileException wrappedException = new HpccFileException("Error while attempting to read row service response: ", e);
+            if (writeSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                writeSpan.recordException(wrappedException, attributes);
+            }
+
+            throw wrappedException;
         }
 
         return response;
@@ -480,7 +616,15 @@ public void close() throws IOException
         }
         else if (bytesWritten == 0 && compressionAlgo != CompressionAlgorithm.NONE)
         {
-            throw new IOException("Fatal error while closing file. Writing compressed files with 0 length is not supported with the remote HPCC cluster.");
+            IOException wrappedException = new IOException("Fatal error while closing file. Writing compressed files with 0 length is not supported with the remote HPCC cluster.");
+            if (writeSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                writeSpan.recordException(wrappedException, attributes);
+            }
+
+            throw wrappedException;
         }
 
         this.socket.close();
@@ -513,7 +657,10 @@ public void write(byte[] b) throws IOException
      */
     public void write(byte[] b, int off, int len) throws IOException
     {
+        final String trace = writeSpan != null ? "\"_trace/traceparent\" : \"" + writeSpanTraceID + "\",\n" : "";
+
         String request = "{ \"format\" : \"binary\", \"handle\" : \"" + this.handle + "\""
+                       + trace
                        + (useOldProtocol ? "" : ", \"command\" : \"continue\"") + " }";
         byte[] jsonRequestData = request.getBytes("ISO-8859-1");
 
@@ -547,12 +694,28 @@ public void write(byte[] b, int off, int len) throws IOException
         }
         catch (HpccFileException e)
         {
-            throw new IOException("Failed during write operation. Unable to read response with error: ", e);
+            IOException wrappedException = new IOException("Failed during write operation. Unable to read response with error: ", e);
+            if (writeSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                writeSpan.recordException(wrappedException, attributes);
+            }
+
+            throw wrappedException;
         }
 
         if (response.errorCode != RFCCodes.RFCStreamNoError)
         {
-            throw new IOException(response.errorMessage);
+            IOException wrappedException = new IOException(response.errorMessage);
+            if (writeSpan != null)
+            {
+                Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, this.rowServiceIP,
+                                                    ServerAttributes.SERVER_PORT, Long.valueOf(this.rowServicePort));
+                writeSpan.recordException(wrappedException, attributes);
+            }
+
+            throw wrappedException;
         }
     }
 
diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java
new file mode 100644
index 000000000..377cca974
--- /dev/null
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *******************************************************************************/
+package org.hpccsystems.dfs.client;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+
+public class Utils
+{
+    public static Tracer getTelemetryTracer()
+    {
+        return GlobalOpenTelemetry.get().getTracer("DFSClient");
+    }
+
+    public static Span createRootSpan(String name)
+    {
+        return Utils.getTelemetryTracer().spanBuilder(name)
+                                        .setSpanKind(SpanKind.CLIENT)
+                                        .startSpan();
+    }
+
+    public static Span createChildSpan(Span parentSpan, String name)
+    {
+        return Utils.getTelemetryTracer().spanBuilder(name)
+                                        .setParent(Context.current().with(parentSpan))
+                                        .setSpanKind(SpanKind.CLIENT)
+                                        .startSpan();
+    }
+
+}