From fce00d0ed268a5a2426477b8a331b9b1f98db48e Mon Sep 17 00:00:00 2001 From: James McMullan Date: Tue, 16 Jul 2024 16:06:31 -0400 Subject: [PATCH] Code review changes --- .../hpccsystems/dfs/client/FileUtility.java | 121 +++++++++++++++--- .../dfs/client/HPCCRemoteFileWriter.java | 52 ++++++-- .../dfs/client/HpccRemoteFileReader.java | 115 ++++++++++++----- .../org/hpccsystems/dfs/client/Utils.java | 14 +- 4 files changed, 241 insertions(+), 61 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index ddbdda681..e610bde16 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -34,6 +34,11 @@ import java.nio.file.Paths; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; import org.hpccsystems.commons.ecl.FieldDef; import org.json.JSONArray; @@ -68,6 +73,8 @@ public class FileUtility private static final int NUM_DEFAULT_THREADS = 4; static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120; + private static boolean otelInitialized = false; + private static class TaskContext { public AtomicLong recordsRead = new AtomicLong(0); @@ -83,6 +90,24 @@ private static class TaskContext private long operationStart = 0; private List operationResults = new ArrayList(); + private Span taskSpan = null; + + public void setTaskSpanAttributes(Attributes attributes) + { + synchronized(taskSpan) + { + taskSpan.setAllAttributes(attributes); + } + } + + public void makeTaskSpanCurrent() + { + synchronized(taskSpan) + { + taskSpan.makeCurrent(); + } + } + public boolean hasError() { boolean err = false; @@ -100,6 +125,11 @@ public void addError(String error) { errorMessages.add(error); } + + synchronized(taskSpan) + { + taskSpan.recordException(new Exception(error)); + } } public void addWarn(String warn) @@ -131,6 +161,15 @@ public boolean hasOperation() public void startOperation(String operationName) { + if (taskSpan != null) + { + taskSpan = Utils.createChildSpan(taskSpan, operationName); + } + else + { + taskSpan = Utils.createSpan(operationName); + } + clear(); currentOperationDesc = operationName; operationStart = System.nanoTime(); @@ -148,6 +187,17 @@ public void endOperation(boolean success) return; } + if (success) + { + taskSpan.setStatus(StatusCode.OK); + } + else + { + taskSpan.setStatus(StatusCode.ERROR); + } + + taskSpan.end(); + long totalOperationTime = System.nanoTime(); totalOperationTime -= operationStart; @@ -635,7 +685,7 @@ private static String[] filterFilesByFormat(String[] srcFiles, FileFormat format return filteredFiles.toArray(new String[0]); } - private static void executeTasks(Runnable[] tasks, int numThreads) throws Exception + private static void executeTasks(Runnable[] tasks, int numThreads, TaskContext context) throws Exception { int numTasksPerThread = tasks.length / numThreads; int numResidualTasks = tasks.length % numThreads; @@ -661,6 +711,8 @@ private static void executeTasks(Runnable[] tasks, int numThreads) throws Except public void run() { + context.makeTaskSpanCurrent(); + for (int j = 0; j < numSubTasks; j++) { subTasks[startingSubTask + j].run(); @@ -1097,7 +1149,8 @@ private static void 
performRead(String[] args, TaskContext context) for (int i = 0; i < datasets.length; i++) { String datasetName = datasets[i]; - context.startOperation("Read " + datasetName); + context.startOperation("FileUtility.Read_" + datasetName); + context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString)); HPCCFile file = null; try @@ -1106,7 +1159,8 @@ private static void performRead(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage()); + String error = "Error while attempting to open file: '" + datasetName + "': " + e.getMessage(); + context.addError(error); return; } @@ -1119,7 +1173,8 @@ private static void performRead(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage()); + String error = "Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage(); + context.addError(error); return; } @@ -1165,7 +1220,7 @@ private static void performRead(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1281,7 +1336,9 @@ private static void performReadTest(String[] args, TaskContext context) } String datasetName = cmd.getOptionValue("read_test"); - context.startOperation("Read Test " + datasetName); + context.startOperation("FileUtility.ReadTest_" + datasetName); + + context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString)); HPCCFile file = null; try @@ -1291,7 +1348,7 @@ private static void performReadTest(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage()); + context.addError("Error while attempting to open file: '" + datasetName + "': " + e.getMessage()); return; } @@ -1304,7 +1361,7 @@ private static void performReadTest(String[] args, TaskContext context) } catch (Exception e) { - System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage()); + context.addError("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage()); return; } @@ -1319,16 +1376,16 @@ private static void performReadTest(String[] args, TaskContext context) int filePartIndex = Integer.parseInt(filePartsStrs[i]) - 1; if (filePartIndex < 0 || filePartIndex >= fileParts.length) { - System.out.println("Skipping invalid file part index: " + filePartsStrs[i] + context.addWarn("Skipping invalid file part index: " + filePartsStrs[i] + " outside of range: [0," + fileParts.length + "]"); continue; } filePartList.add(fileParts[filePartIndex]); } catch (NumberFormatException e) { - System.out.println("Skipping invalid file part index: " + filePartsStrs[i]); + context.addWarn("Skipping invalid file part index: " + filePartsStrs[i]); } } } @@ -1354,7 +1410,7 @@ private static void performReadTest(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1465,7 +1521,9 @@ private static void performCopy(String[] args, TaskContext context) String srcFile = copyPairs[i]; String destFile = copyPairs[i+1]; - context.startOperation("Copy " + srcFile + " -> " + destFile); + context.startOperation("FileUtility.Copy_" + srcFile + " -> " + destFile); +
context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.src.url"), srcURL, + AttributeKey.stringKey("server.dest.url"), destURL)); HPCCFile file = null; try @@ -1486,6 +1544,7 @@ private static void performCopy(String[] args, TaskContext context) catch (HpccFileException e) { context.addError("Error while retrieving file parts for: '" + srcFile + "': " + e.getMessage()); + return; } boolean shouldRedistribute = true; @@ -1519,7 +1578,7 @@ private static void performCopy(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1529,7 +1588,7 @@ private static void performCopy(String[] args, TaskContext context) if (context.hasError()) { - return; + return; } try @@ -1643,7 +1702,10 @@ private static void performWrite(String[] args, TaskContext context) String srcFile = writePairs[pairIdx]; String destFile = writePairs[pairIdx+1]; - context.startOperation("Write " + srcFile + " -> " + destFile); + context.startOperation("FileUtility.Write_" + srcFile + "_to_" + destFile); + + Attributes attributes = Attributes.of(AttributeKey.stringKey("server.url"), destURL); + context.setTaskSpanAttributes(attributes); SplitTable[] splitTables = null; String[] srcFiles = null; @@ -1687,7 +1749,7 @@ private static void performWrite(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1743,7 +1805,7 @@ private static void performWrite(String[] args, TaskContext context) try { - executeTasks(tasks, numThreads); + executeTasks(tasks, numThreads, context); } catch (Exception e) { @@ -1779,7 +1841,28 @@ private static void performWrite(String[] args, TaskContext context) */ public static JSONArray run(String[] args) { - AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + if (!otelInitialized) + { + if (Boolean.getBoolean("otel.java.global-autoconfigure.enabled")) + { + System.out.println("OpenTelemetry autoconfiguration enabled with the following values."); + System.out.println("If any of these options are not provided, they will default to values which could require additional CLASSPATH dependencies."); + System.out.println("If missing dependencies arise, the utility will halt!"); + System.out.println(" otel.traces.exporter sys property: " + System.getProperty("otel.traces.exporter")); + System.out.println(" OTEL_TRACES_EXPORTER Env var: " + System.getenv("OTEL_TRACES_EXPORTER")); + System.out.println(" OTEL_TRACES_SAMPLER Env var: " + System.getenv("OTEL_TRACES_SAMPLER")); + System.out.println(" otel.traces.sampler sys property: " + System.getProperty("otel.traces.sampler")); + System.out.println(" otel.logs.exporter: "+ System.getProperty("otel.logs.exporter")); + System.out.println(" OTEL_LOGS_EXPORTER Env var: " + System.getenv("OTEL_LOGS_EXPORTER")); + System.out.println(" otel.metrics.exporter: "+ System.getProperty("otel.metrics.exporter")); + System.out.println(" OTEL_METRICS_EXPORTER Env var: " + System.getenv("OTEL_METRICS_EXPORTER")); + + OpenTelemetry otel = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + Utils.setGlobalOpenTelemetry(otel); + } + + otelInitialized = true; + } Options options = getTopLevelOptions(); CommandLineParser parser = new DefaultParser(); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java index 05092272d..3bc8b93ed 100644 ---
a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCRemoteFileWriter.java @@ -36,7 +36,6 @@ public class HPCCRemoteFileWriter { private static final Logger log = LogManager.getLogger(HPCCRemoteFileWriter.class); - private FieldDef recordDef = null; private DataPartition dataPartition = null; private RowServiceOutputStream outputStream = null; private BinaryRecordWriter binaryRecordWriter = null; @@ -44,9 +43,31 @@ public class HPCCRemoteFileWriter private long recordsWritten = 0; private long openTimeMs = 0; + private FileWriteContext context = null; + private Span writeSpan = null; private String writeSpanName = null; + public static class FileWriteContext + { + public FieldDef recordDef = null; + public CompressionAlgorithm fileCompression = CompressionAlgorithm.DEFAULT; + public int connectTimeoutMs = -1; + public int socketOpTimeoutMs = -1; + public Span parentSpan = null; + } + + private static FileWriteContext constructReadContext(FieldDef recordDef, CompressionAlgorithm fileCompression, int connectTimeoutMs, int socketOpTimeoutMs) + { + FileWriteContext context = new FileWriteContext(); + context.recordDef = recordDef; + context.fileCompression = fileCompression; + context.connectTimeoutMs = connectTimeoutMs; + context.socketOpTimeoutMs = socketOpTimeoutMs; + + return context; + } + /** * A remote file writer. * @@ -110,13 +131,26 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccessor recordAccessor, CompressionAlgorithm fileCompression, int connectTimeoutMs, int socketOpTimeoutMs) throws Exception { - this.recordDef = recordDef; + this(constructReadContext(recordDef, fileCompression, connectTimeoutMs, socketOpTimeoutMs), dp, recordAccessor); + } + + public HPCCRemoteFileWriter(FileWriteContext ctx, DataPartition dp, IRecordAccessor recordAccessor) + throws Exception + { this.dataPartition = dp; + this.context = ctx; this.recordAccessor = recordAccessor; this.writeSpanName = "HPCCRemoteFileWriter.RowService/Write_" + dp.getFileName() + "_" + dp.getThisPart(); - this.writeSpan = Utils.createSpan(writeSpanName); + if (context.parentSpan == null) + { + this.writeSpan = Utils.createSpan(writeSpanName); + } + else + { + this.writeSpan = Utils.createChildSpan(context.parentSpan, writeSpanName); + } String primaryIP = dp.getCopyIP(0); String secondaryIP = ""; @@ -125,22 +159,22 @@ public HPCCRemoteFileWriter(DataPartition dp, FieldDef recordDef, IRecordAccesso secondaryIP = dp.getCopyIP(1); } - Attributes attributes = Attributes.of( AttributeKey.stringKey("server.primary.address"), primaryIP, - AttributeKey.stringKey("server.secondary.address"), secondaryIP, + Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, + AttributeKey.stringKey("server.1.address"), secondaryIP, ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort())); writeSpan.setAllAttributes(attributes); this.outputStream = new RowServiceOutputStream(dataPartition.getCopyIP(0), dataPartition.getPort(), dataPartition.getUseSsl(), - dataPartition.getFileAccessBlob(), this.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0), - fileCompression, connectTimeoutMs, socketOpTimeoutMs, this.writeSpan); + dataPartition.getFileAccessBlob(), context.recordDef, this.dataPartition.getThisPart(), this.dataPartition.getCopyPath(0), + context.fileCompression, 
context.connectTimeoutMs, context.socketOpTimeoutMs, this.writeSpan); this.binaryRecordWriter = new BinaryRecordWriter(this.outputStream); this.binaryRecordWriter.initialize(this.recordAccessor); log.info("HPCCRemoteFileWriter: Opening file part: " + dataPartition.getThisPart() - + " compression: " + fileCompression.name()); + + " compression: " + context.fileCompression.name()); log.trace("Record definition:\n" - + RecordDefinitionTranslator.toJsonRecord(this.recordDef)); + + RecordDefinitionTranslator.toJsonRecord(context.recordDef)); openTimeMs = System.currentTimeMillis(); } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index d8e2a1aef..1bad07312 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -38,7 +38,6 @@ public class HpccRemoteFileReader implements Iterator { private static final Logger log = LogManager.getLogger(HpccRemoteFileReader.class); - private FieldDef originalRecordDef = null; private DataPartition dataPartition = null; private RowServiceInputStream inputStream = null; private BinaryRecordReader binaryRecordReader; @@ -46,18 +45,13 @@ public class HpccRemoteFileReader implements Iterator private boolean handlePrefetch = true; private boolean isClosed = false; private boolean canReadNext = true; - private boolean createPrefetchThread = true; private int retryCount = 0; - private int connectTimeout = 0; - private int readSizeKB = 0; - private int limit = -1; private int maxReadRetries = DEFAULT_READ_RETRIES; - private int socketOpTimeoutMs = 0; private long openTimeMs = 0; private long recordsRead = 0; + private FileReadContext context = null; private Span readSpan = null; - private String readSpanName = null; public static final int NO_RECORD_LIMIT = -1; public static final int DEFAULT_READ_SIZE_OPTION = -1; @@ -71,6 +65,31 @@ public static class FileReadResumeInfo public long recordReaderStreamPos = 0; }; + public static class FileReadContext + { + public FieldDef originalRD = null; + public int connectTimeout = -1; + public int socketOpTimeoutMS = -1; + public int recordReadLimit = -1; + public boolean createPrefetchThread = true; + public int readSizeKB = -1; + public Span parentSpan = null; + }; + + private static FileReadContext constructReadContext(FieldDef originalRD, int connectTimeout, int socketOpTimeoutMS, + int recordReadLimit, boolean createPrefetchThread, int readSizeKB) + { + FileReadContext context = new FileReadContext(); + context.originalRD = originalRD; + context.connectTimeout = connectTimeout; + context.socketOpTimeoutMS = socketOpTimeoutMS; + context.recordReadLimit = recordReadLimit; + context.createPrefetchThread = createPrefetchThread; + context.readSizeKB = readSizeKB; + + return context; + } + /** * Instantiates a new hpcc remote file reader. 
* @@ -204,19 +223,41 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde * @throws Exception * general exception */ - public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception + public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception { - this.handlePrefetch = createPrefetchThread; - this.originalRecordDef = originalRD; + this(constructReadContext(originalRD, connectTimeout, socketOpTimeoutMs, limit, createPrefetchThread, readSizeKB), dp, recBuilder, resumeInfo); + } + + /** + * A remote file reader that reads the part identified by the HpccPart object using the record definition provided. + * + * @param ctx + * the FileReadContext + * @param dp + * the part of the file, name and location + * @param recBuilder + * the IRecordBuilder used to construct records + * @param resumeInfo + * FileReadResumeInfo data required to restart a read from a particular point in a file, null for reading from start + * @throws Exception + * general exception + */ + public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilder recBuilder, FileReadResumeInfo resumeInfo) throws Exception + { + this.context = ctx; + this.handlePrefetch = context.createPrefetchThread; this.dataPartition = dp; this.recordBuilder = recBuilder; - this.readSizeKB = readSizeKB; - this.limit = limit; - this.createPrefetchThread = createPrefetchThread; - this.socketOpTimeoutMs = socketOpTimeoutMs; - this.readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dp.getFileName() + "_" + dp.getThisPart(); - this.readSpan = Utils.createSpan(readSpanName); + String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart(); + if (context.parentSpan != null) + { + this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); + } + else + { + this.readSpan = Utils.createSpan(readSpanName); + } String primaryIP = dp.getCopyIP(0); String secondaryIP = ""; @@ -225,19 +266,18 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde secondaryIP = dp.getCopyIP(1); } - Attributes attributes = Attributes.of( AttributeKey.stringKey("server.primary.address"), primaryIP, - AttributeKey.stringKey("server.secondary.address"), secondaryIP, + Attributes attributes = Attributes.of( AttributeKey.stringKey("server.0.address"), primaryIP, + AttributeKey.stringKey("server.1.address"), secondaryIP, ServerAttributes.SERVER_PORT, Long.valueOf(dp.getPort()), - AttributeKey.longKey("read.size"), Long.valueOf(readSizeKB*1000)); - readSpan.setAllAttributes(attributes); + AttributeKey.longKey("read.size"), Long.valueOf(context.readSizeKB*1000)); + this.readSpan.setAllAttributes(attributes); - if (connectTimeout < 1) { - connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS; + if (context.connectTimeout < 1) { + context.connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS; } - this.connectTimeout = connectTimeout; - if (this.originalRecordDef == null) + if (context.originalRD == null) { Exception e = new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required."); 
this.readSpan.recordException(e); @@ -256,7 +296,9 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde if (resumeInfo == null) { - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, null, false, socketOpTimeoutMs, readSpan); + this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, + context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null, + false, context.socketOpTimeoutMS, this.readSpan); this.binaryRecordReader = new BinaryRecordReader(this.inputStream); this.binaryRecordReader.initialize(this.recordBuilder); @@ -271,7 +313,9 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde restartInfo.streamPos = resumeInfo.inputStreamPos; restartInfo.tokenBin = resumeInfo.tokenBin; - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, projectedRecordDefinition, connectTimeout, limit, createPrefetchThread, readSizeKB, restartInfo, false, socketOpTimeoutMs, this.readSpan); + this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout, + context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo, + false, context.socketOpTimeoutMS, this.readSpan); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) @@ -289,7 +333,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde log.info("HPCCRemoteFileReader: Opening file part: " + dataPartition.getThisPart() + (resumeInfo != null ? 
" resume position: " + resumeInfo.inputStreamPos : "" )); log.trace("Original record definition:\n" - + RecordDefinitionTranslator.toJsonRecord(originalRD) + + RecordDefinitionTranslator.toJsonRecord(context.originalRD) + " projected record definition:\n" + RecordDefinitionTranslator.toJsonRecord(projectedRecordDefinition)); openTimeMs = System.currentTimeMillis(); @@ -315,10 +359,19 @@ private boolean retryRead() try { - this.readSpan = Utils.createSpan(readSpanName); - this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef, - this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread, - this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs, this.readSpan); + String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition.getFileName() + "_" + dataPartition.getThisPart(); + if (context.parentSpan != null) + { + this.readSpan = Utils.createChildSpan(context.parentSpan, readSpanName); + } + else + { + this.readSpan = Utils.createSpan(readSpanName); + } + + this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(), + context.connectTimeout, context.recordReadLimit, context.createPrefetchThread, + context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan); long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos; if (bytesToSkip < 0) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java index 7462c64dc..7ac5b74e5 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java @@ -34,6 +34,11 @@ public static OpenTelemetry getOpenTelemetry() return globalOpenTelemetry; } + public static void setGlobalOpenTelemetry(OpenTelemetry ot) + { + globalOpenTelemetry = ot; + } + public static Tracer getTelemetryTracer() { if (dfsClientTracer == null) @@ -46,17 +51,22 @@ public static Tracer getTelemetryTracer() public static Span createSpan(String name) { - return Utils.getTelemetryTracer().spanBuilder(name) + Span span = Utils.getTelemetryTracer().spanBuilder(name) .setSpanKind(SpanKind.CLIENT) .startSpan(); + span.makeCurrent(); + + return span; } public static Span createChildSpan(Span parentSpan, String name) { - return Utils.getTelemetryTracer().spanBuilder(name) + Span span = Utils.getTelemetryTracer().spanBuilder(name) .setParent(Context.current().with(parentSpan)) .setSpanKind(SpanKind.CLIENT) .startSpan(); + span.makeCurrent(); + return span; } }