Skip to content

Commit

Permalink
HPCC4J-639 Add concurrent connection startup limit
Browse files Browse the repository at this point in the history
- Added a limit to the number of connections that can be started simultaneously
- Reduced the size of the initial read request
- Exposed parameters in FileUtility

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Sep 24, 2024
1 parent 735de52 commit 06e5a4e
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public JSONObject end(boolean success)

public int readRetries = HpccRemoteFileReader.DEFAULT_READ_RETRIES;
public int socketOpTimeoutMS = RowServiceInputStream.DEFAULT_SOCKET_OP_TIMEOUT_MS;
public int initialReadSizeKB = RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB;

public void setCurrentOperationSpanAttributes(Attributes attributes)
{
Expand Down Expand Up @@ -352,6 +353,26 @@ private static String[] getCredentials(CommandLine cmd)
return new String[] {user, pass};
}

private static void applyGlobalConfig(CommandLine cmd)
{
int concurrentStartups = -1;
String concurrentStartupsStr = cmd.getOptionValue("connection_startup_limit", "" + -1);
try
{
concurrentStartups = Integer.parseInt(concurrentStartupsStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for connection_startup_limit: "
+ concurrentStartupsStr + ", must be an integer.");
}

if (concurrentStartups > 0)
{
RowServiceInputStream.setMaxConcurrentConnectionStartups(concurrentStartups);
}
}

private static enum FileFormat
{
THOR,
Expand Down Expand Up @@ -589,6 +610,23 @@ private static int getSocketOpTimeoutMS(CommandLine cmd)
return socketOpTimeoutS * 1000;
}

private static int getInitialReadSizeKB(CommandLine cmd)
{
int initialReadSizeKB = RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB;
String initialReadSizeStr = cmd.getOptionValue("initial_read_size", "" + initialReadSizeKB);
try
{
initialReadSizeKB = Integer.parseInt(initialReadSizeStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for initial_read_size: "
+ initialReadSizeStr + ", must be an integer. Defaulting to: " + RowServiceInputStream.DEFAULT_INITIAL_REQUEST_READ_SIZE_KB + "KB.");
}

return initialReadSizeKB;
}

private static Options getReadOptions()
{
Options options = new Options();
Expand All @@ -602,6 +640,7 @@ private static Options getReadOptions()
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently.");

options.addOption(Option.builder("read")
.argName("files")
Expand All @@ -622,12 +661,14 @@ private static Options getReadTestOptions()
options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("access_expiry_seconds", true, "Access token expiration seconds.");
options.addOption("initial_read_size", true, "The size of the initial read request in KB sent to the rowservice.");
options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice.");
options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice.");
options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster.");
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently.");

options.addOption(Option.builder("file_parts")
.argName("_file_parts")
Expand All @@ -651,6 +692,7 @@ private static Options getCopyOptions()
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently.");

options.addOption(Option.builder("copy")
.argName("files")
Expand All @@ -673,6 +715,7 @@ private static Options getWriteOptions()
options.addRequiredOption("dest_cluster", "Destination Cluster Name", true, "Specifies the name of the cluster to write files back to.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently.");

options.addOption(Option.builder("write")
.argName("files")
Expand Down Expand Up @@ -903,18 +946,21 @@ public void run()
HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext();
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.initialReadSizeKB = context.initialReadSizeKB;
readContext.readSizeKB = readRequestSize;
readContext.socketOpTimeoutMS = context.socketOpTimeoutMS;

HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(readContext, filePart, new HPCCRecordBuilder(recordDef));
fileReader.getInputStream().setReadRequestDelay(readRequestDelay);
fileReader.setMaxReadRetries(context.readRetries);

long recCount = 0;
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
context.getCurrentOperation().recordsRead.incrementAndGet();
recCount++;
}
context.getCurrentOperation().recordsRead.addAndGet(recCount);

fileReader.close();
context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
Expand Down Expand Up @@ -942,6 +988,7 @@ private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, Split
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.socketOpTimeoutMS = context.socketOpTimeoutMS;
readContext.initialReadSizeKB = context.initialReadSizeKB;

final HpccRemoteFileReader<HPCCRecord> filePartReader = new HpccRemoteFileReader<HPCCRecord>(readContext, fileParts[taskIndex], new HPCCRecordBuilder(recordDef));
filePartReader.setMaxReadRetries(context.readRetries);
Expand All @@ -963,13 +1010,15 @@ public void run()
{
try
{
long recCount = 0;
while (fileReader.hasNext())
{
splitTable.addRecordPosition(fileReader.getStreamPosition());
HPCCRecord record = fileReader.next();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsRead.incrementAndGet();
recCount++;
}
context.getCurrentOperation().recordsRead.addAndGet(recCount);

splitTable.finish(fileReader.getStreamPosition());

Expand Down Expand Up @@ -1068,6 +1117,7 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre
readContext.parentSpan = context.getCurrentOperation().operationSpan;
readContext.originalRD = recordDef;
readContext.socketOpTimeoutMS = context.socketOpTimeoutMS;
readContext.initialReadSizeKB = context.initialReadSizeKB;
filePartReaders[j] = new HpccRemoteFileReader<HPCCRecord>(readContext, inFilePart, new HPCCRecordBuilder(recordDef));
filePartReaders[j].setMaxReadRetries(context.readRetries);
}
Expand All @@ -1091,14 +1141,18 @@ public void run()
{
for (int k = 0; k < fileReaders.length; k++)
{
long recordsRead = 0;
long recordsWritten = 0;
HpccRemoteFileReader<HPCCRecord> fileReader = fileReaders[k];
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsWritten.incrementAndGet();
context.getCurrentOperation().recordsRead.incrementAndGet();
recordsRead++;
recordsWritten++;
}
context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten);
context.getCurrentOperation().recordsRead.addAndGet(recordsRead);

fileReader.close();
context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
Expand Down Expand Up @@ -1251,14 +1305,19 @@ public void run()
splitEnd = endingSplit.splitEnd;
}

long recordsRead = 0;
long recordsWritten = 0;
while (fileReader.hasNext() && fileReader.getStreamPosAfterLastRecord() < splitEnd)
{
HPCCRecord record = (HPCCRecord) fileReader.getNext();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsWritten.incrementAndGet();
context.getCurrentOperation().recordsRead.incrementAndGet();
recordsRead++;
recordsWritten++;
}

context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten);
context.getCurrentOperation().recordsRead.addAndGet(recordsRead);

context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord());
inputStreams[j].close();
}
Expand Down Expand Up @@ -1299,6 +1358,8 @@ private static void performRead(String[] args, TaskContext context)
String user = creds[0];
String pass = creds[1];

applyGlobalConfig(cmd);

String outputPath = cmd.getOptionValue("out",".");

int numThreads = NUM_DEFAULT_THREADS;
Expand All @@ -1321,6 +1382,7 @@ private static void performRead(String[] args, TaskContext context)

context.readRetries = getReadRetries(cmd);
context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd);
context.initialReadSizeKB = getInitialReadSizeKB(cmd);

FileFormat format = FileFormat.THOR;
switch (formatStr.toUpperCase())
Expand Down Expand Up @@ -1501,6 +1563,8 @@ private static void performReadTest(String[] args, TaskContext context)
String user = creds[0];
String pass = creds[1];

applyGlobalConfig(cmd);

String outputPath = cmd.getOptionValue("out",".");

int numThreads = NUM_DEFAULT_THREADS;
Expand Down Expand Up @@ -1553,6 +1617,7 @@ private static void performReadTest(String[] args, TaskContext context)

context.readRetries = getReadRetries(cmd);
context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd);
context.initialReadSizeKB = getInitialReadSizeKB(cmd);

String formatStr = cmd.getOptionValue("format");
if (formatStr == null)
Expand Down Expand Up @@ -1733,6 +1798,8 @@ private static void performCopy(String[] args, TaskContext context)
String user = creds[0];
String pass = creds[1];

applyGlobalConfig(cmd);

String destClusterName = cmd.getOptionValue("dest_cluster");

String srcURL = cmd.getOptionValue("url");
Expand Down Expand Up @@ -1781,6 +1848,7 @@ private static void performCopy(String[] args, TaskContext context)

context.readRetries = getReadRetries(cmd);
context.socketOpTimeoutMS = getSocketOpTimeoutMS(cmd);
context.initialReadSizeKB = getInitialReadSizeKB(cmd);

for (int i = 0; i < copyPairs.length; i+=2)
{
Expand Down Expand Up @@ -1937,6 +2005,8 @@ private static void performWrite(String[] args, TaskContext context)
String user = creds[0];
String pass = creds[1];

applyGlobalConfig(cmd);

String destClusterName = cmd.getOptionValue("dest_cluster");

String srcURL = cmd.getOptionValue("url");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static class FileReadContext
public int socketOpTimeoutMS = -1;
public int recordReadLimit = -1;
public boolean createPrefetchThread = true;
public int initialReadSizeKB = -1;
public int readSizeKB = -1;
public int readRequestSpanBatchSize = -1; // The number of read requests before creating a new span
public Span parentSpan = null;
Expand All @@ -92,6 +93,21 @@ private static FileReadContext constructReadContext(FieldDef originalRD, int con
return context;
}

private static RowServiceInputStream.StreamContext constructStreamContext(FileReadContext readContext)
{
RowServiceInputStream.StreamContext context = new RowServiceInputStream.StreamContext();
context.recordDefinition = readContext.originalRD;
context.recordReadLimit = readContext.recordReadLimit;
context.createPrefetchThread = readContext.createPrefetchThread;
context.maxReadSizeKB = readContext.readSizeKB;
context.initialReadSizeKB = readContext.initialReadSizeKB;
context.connectTimeoutMS = readContext.connectTimeout;
context.socketOpTimeoutMS = readContext.socketOpTimeoutMS;
context.createPrefetchThread = readContext.createPrefetchThread;

return context;
}

/**
* Instantiates a new hpcc remote file reader.
*
Expand Down Expand Up @@ -287,12 +303,15 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
throw e;
}

RowServiceInputStream.StreamContext streamContext = constructStreamContext(context);
streamContext.projectedRecordDefinition = projectedRecordDefinition;
streamContext.fileReadSpan = this.readSpan;

if (resumeInfo == null)
{
this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, null,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, null);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);

this.binaryRecordReader = new BinaryRecordReader(this.inputStream);
this.binaryRecordReader.initialize(this.recordBuilder);

Expand All @@ -307,9 +326,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
restartInfo.streamPos = resumeInfo.inputStreamPos;
restartInfo.tokenBin = resumeInfo.tokenBin;

this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD, projectedRecordDefinition, context.connectTimeout,
context.recordReadLimit, context.createPrefetchThread, context.readSizeKB, restartInfo,
false, context.socketOpTimeoutMS, this.readSpan);
this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, restartInfo);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);

long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
Expand Down Expand Up @@ -386,9 +403,11 @@ private boolean retryRead()
{
this.readSpan = createReadSpan(context, dataPartition);

this.inputStream = new RowServiceInputStream(this.dataPartition, context.originalRD,this.recordBuilder.getRecordDefinition(),
context.connectTimeout, context.recordReadLimit, context.createPrefetchThread,
context.readSizeKB, restartInfo, false, context.socketOpTimeoutMS, this.readSpan);
RowServiceInputStream.StreamContext streamContext = constructStreamContext(context);
streamContext.projectedRecordDefinition = this.recordBuilder.getRecordDefinition();
streamContext.fileReadSpan = this.readSpan;

this.inputStream = new RowServiceInputStream(streamContext, this.dataPartition, restartInfo);
this.inputStream.setReadRequestSpanBatchSize(context.readRequestSpanBatchSize);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
Expand Down
Loading

0 comments on commit 06e5a4e

Please sign in to comment.