Skip to content

Commit

Permalink
Code review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmcmu committed Jul 16, 2024
1 parent fae1052 commit fce00d0
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 61 deletions.
121 changes: 102 additions & 19 deletions dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
import java.nio.file.Paths;

import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;

import org.hpccsystems.commons.ecl.FieldDef;
import org.json.JSONArray;
Expand Down Expand Up @@ -68,6 +73,8 @@ public class FileUtility
private static final int NUM_DEFAULT_THREADS = 4;
static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120;

private static boolean otelInitialized = false;

private static class TaskContext
{
public AtomicLong recordsRead = new AtomicLong(0);
Expand All @@ -83,6 +90,24 @@ private static class TaskContext
private long operationStart = 0;
private List<JSONObject> operationResults = new ArrayList<JSONObject>();

private Span taskSpan = null;

public void setTaskSpanAttributes(Attributes attributes)
{
synchronized(taskSpan)
{
taskSpan.setAllAttributes(attributes);
}
}

public void makeTaskSpanCurrent()
{
synchronized(taskSpan)
{
taskSpan.makeCurrent();
}
}

public boolean hasError()
{
boolean err = false;
Expand All @@ -100,6 +125,11 @@ public void addError(String error)
{
errorMessages.add(error);
}

synchronized(taskSpan)
{
taskSpan.recordException(new Exception(error));
}
}

public void addWarn(String warn)
Expand Down Expand Up @@ -131,6 +161,15 @@ public boolean hasOperation()

public void startOperation(String operationName)
{
if (taskSpan != null)
{
taskSpan = Utils.createChildSpan(taskSpan, operationName);
}
else
{
taskSpan = Utils.createSpan(operationName);
}

clear();
currentOperationDesc = operationName;
operationStart = System.nanoTime();
Expand All @@ -148,6 +187,17 @@ public void endOperation(boolean success)
return;
}

if (success)
{
taskSpan.setStatus(StatusCode.OK);
}
else
{
taskSpan.setStatus(StatusCode.ERROR);
}

taskSpan.end();

long totalOperationTime = System.nanoTime();
totalOperationTime -= operationStart;

Expand Down Expand Up @@ -635,7 +685,7 @@ private static String[] filterFilesByFormat(String[] srcFiles, FileFormat format
return filteredFiles.toArray(new String[0]);
}

private static void executeTasks(Runnable[] tasks, int numThreads) throws Exception
private static void executeTasks(Runnable[] tasks, int numThreads, TaskContext context) throws Exception
{
int numTasksPerThread = tasks.length / numThreads;
int numResidualTasks = tasks.length % numThreads;
Expand All @@ -661,6 +711,8 @@ private static void executeTasks(Runnable[] tasks, int numThreads) throws Except

public void run()
{
context.makeTaskSpanCurrent();

for (int j = 0; j < numSubTasks; j++)
{
subTasks[startingSubTask + j].run();
Expand Down Expand Up @@ -1097,7 +1149,8 @@ private static void performRead(String[] args, TaskContext context)
for (int i = 0; i < datasets.length; i++)
{
String datasetName = datasets[i];
context.startOperation("Read " + datasetName);
context.startOperation("FileUtility.Read_" + datasetName);
context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString));

HPCCFile file = null;
try
Expand All @@ -1106,7 +1159,8 @@ private static void performRead(String[] args, TaskContext context)
}
catch (Exception e)
{
System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage());
String error = "Error while attempting to open file: '" + datasetName + "': " + e.getMessage();
context.addError(error);
return;
}

Expand All @@ -1119,7 +1173,8 @@ private static void performRead(String[] args, TaskContext context)
}
catch (Exception e)
{
System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage());
String error = "Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage();
context.addError(error);
return;
}

Expand Down Expand Up @@ -1165,7 +1220,7 @@ private static void performRead(String[] args, TaskContext context)

try
{
executeTasks(tasks, numThreads);
executeTasks(tasks, numThreads, context);
}
catch (Exception e)
{
Expand Down Expand Up @@ -1281,7 +1336,9 @@ private static void performReadTest(String[] args, TaskContext context)
}

String datasetName = cmd.getOptionValue("read_test");
context.startOperation("Read Test " + datasetName);
context.startOperation("FileUtility.ReadTest_" + datasetName);

context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString));

HPCCFile file = null;
try
Expand All @@ -1291,7 +1348,7 @@ private static void performReadTest(String[] args, TaskContext context)
}
catch (Exception e)
{
System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage());
context.addError("Error while attempting to open file: '" + datasetName + "': " + e.getMessage());
return;
}

Expand All @@ -1304,7 +1361,7 @@ private static void performReadTest(String[] args, TaskContext context)
}
catch (Exception e)
{
System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage());
context.addError("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage());
return;
}

Expand All @@ -1319,16 +1376,15 @@ private static void performReadTest(String[] args, TaskContext context)
int filePartIndex = Integer.parseInt(filePartsStrs[i]) - 1;
if (filePartIndex < 0 || filePartIndex >= fileParts.length)
{
System.out.println("Skipping invalid file part index: " + filePartsStrs[i]
context.addWarn("Skipping invalid file part index: " + filePartsStrs[i]
+ " outside of range: [0," + fileParts.length + "]");
continue;
}

filePartList.add(fileParts[filePartIndex]);
}
catch (NumberFormatException e)
{
System.out.println("Skipping invalid file part index: " + filePartsStrs[i]);
context.addWarn("Skipping invalid file part index: " + filePartsStrs[i]);
}
}
}
Expand All @@ -1354,7 +1410,7 @@ private static void performReadTest(String[] args, TaskContext context)

try
{
executeTasks(tasks, numThreads);
executeTasks(tasks, numThreads, context);
}
catch (Exception e)
{
Expand Down Expand Up @@ -1465,7 +1521,9 @@ private static void performCopy(String[] args, TaskContext context)
String srcFile = copyPairs[i];
String destFile = copyPairs[i+1];

context.startOperation("Copy " + srcFile + " -> " + destFile);
context.startOperation("FileUtility.Copy_ " + srcFile + " -> " + destFile);
context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.src.url"), srcURL,
AttributeKey.stringKey("server.dest.url"), destURL));

HPCCFile file = null;
try
Expand All @@ -1486,6 +1544,7 @@ private static void performCopy(String[] args, TaskContext context)
catch (HpccFileException e)
{
context.addError("Error while retrieving file parts for: '" + srcFile + "': " + e.getMessage());
return;
}

boolean shouldRedistribute = true;
Expand Down Expand Up @@ -1519,7 +1578,7 @@ private static void performCopy(String[] args, TaskContext context)

try
{
executeTasks(tasks, numThreads);
executeTasks(tasks, numThreads, context);
}
catch (Exception e)
{
Expand All @@ -1529,7 +1588,7 @@ private static void performCopy(String[] args, TaskContext context)

if (context.hasError())
{
return;
return;
}

try
Expand Down Expand Up @@ -1643,7 +1702,10 @@ private static void performWrite(String[] args, TaskContext context)
String srcFile = writePairs[pairIdx];
String destFile = writePairs[pairIdx+1];

context.startOperation("Write " + srcFile + " -> " + destFile);
context.startOperation( "FileUtility.Write_" + srcFile + "_to_" + destFile);

Attributes attributes = Attributes.of(AttributeKey.stringKey("server.url"), destURL);
context.setTaskSpanAttributes(attributes);

SplitTable[] splitTables = null;
String[] srcFiles = null;
Expand Down Expand Up @@ -1687,7 +1749,7 @@ private static void performWrite(String[] args, TaskContext context)

try
{
executeTasks(tasks, numThreads);
executeTasks(tasks, numThreads, context);
}
catch (Exception e)
{
Expand Down Expand Up @@ -1743,7 +1805,7 @@ private static void performWrite(String[] args, TaskContext context)

try
{
executeTasks(tasks, numThreads);
executeTasks(tasks, numThreads, context);
}
catch (Exception e)
{
Expand Down Expand Up @@ -1779,7 +1841,28 @@ private static void performWrite(String[] args, TaskContext context)
*/
public static JSONArray run(String[] args)
{
AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
if (!otelInitialized)
{
if (Boolean.getBoolean("otel.java.global-autoconfigure.enabled"))
{
System.out.println("OpenTelemetry autoconfiguration enabled with following values.");
System.out.println("If any of these options are not provided, they will defalt to values which could require additional CLASSPATH dependancies.");
System.out.println("If missing dependancies arise, utility will halt!");
System.out.println(" otel.traces.exporter sys property: " + System.getProperty("otel.traces.exporter"));
System.out.println(" OTEL_TRACES_EXPORTER Env var: " + System.getenv("OTEL_TRACES_EXPORTER"));
System.out.println(" OTEL_TRACES_SAMPLER Env var: " + System.getenv("OTEL_TRACES_SAMPLER"));
System.out.println(" otel.traces.sampler sys property: " + System.getProperty("otel.traces.sampler"));
System.out.println(" otel.logs.exporter: "+ System.getProperty("otel.logs.exporter"));
System.out.println(" OTEL_LOGS_EXPORTER Env var: " + System.getenv("OTEL_LOGS_EXPORTER"));
System.out.println(" otel.metrics.exporter: "+ System.getProperty("otel.metrics.exporter"));
System.out.println(" OTEL_METRICS_EXPORTER Env var: " + System.getenv("OTEL_METRICS_EXPORTER"));

OpenTelemetry otel = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
Utils.setGlobalOpenTelemetry(otel);
}

otelInitialized = true;
}

Options options = getTopLevelOptions();
CommandLineParser parser = new DefaultParser();
Expand Down
Loading

0 comments on commit fce00d0

Please sign in to comment.