Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC4J-542 DFSClient: Create JUnit for read retry #648

Merged
merged 2 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
package org.hpccsystems.dfs.client;

import java.io.Serializable;
import java.security.InvalidParameterException;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;

import org.hpccsystems.commons.ecl.FileFilter;
import org.hpccsystems.commons.errors.HpccFileException;
Expand Down Expand Up @@ -261,11 +265,60 @@ public String[] getCopyLocations()
public String getCopyIP(int copyindex)
{
int copiescount = copyLocations.length;
if (copyindex < 0 || copyindex >= copiescount) return null;
if (copyindex < 0 || copyindex >= copiescount)
{
return null;
}

return copyLocations[copyindex];
}

/**
* Set the copy IP
*
* @param copyIndex
* the copyindex
* @param copyIP The IP of the file part copy
*/
public void setCopyIP(int copyIndex, String copyIP)
{
if (copyIndex < 0 || copyIndex >= copyLocations.length)
{
return;
}

copyLocations[copyIndex] = copyIP;
}

/**
* Add file part copy
*
* @param index The index at which to insert the file part copy
* @param copyIP The IP of the new file part copy
* @param copyPath The path of the new file part copy
*/
public void add(int index, String copyIP, String copyPath) throws Exception
{
if (index < 0 || index > copyLocations.length)
{
throw new InvalidParameterException("Insertion index: " + index + " is invalid."
+ "Expected index in range of: [0," + copyLocations.length + "]");
}

if (copyIP == null || copyPath == null)
{
throw new InvalidParameterException("Copy IP or Path are invalid, must be non-null.");
}

List<String> copyLocationsList = new ArrayList<>(Arrays.asList(copyLocations));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if either index, copyIP, or copyPath are invalid, we should throw

copyLocationsList.add(index, copyIP);
copyLocations = copyLocationsList.toArray(new String[0]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems a little bit extraneous, not sure what type copyLocations is, but are we not able to append the copyIP directly? and if not, are we using the most appropriate datastructure to track copyIPs, copypaths?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, the existing type is a simple array; I had considered moving this to a List but other parts of the code are expecting an array, so it would require the internal list be converted to an Array at those locations. I thought it would be better to do the conversion here in the add() function because this unlikely to be used outside of test cases.


List<String> copyPathList = new ArrayList<>(Arrays.asList(copyPaths));
copyPathList.add(index, copyPath);
copyPaths = copyPathList.toArray(new String[0]);
}

/**
* Count of copies available for this file part.
* @return copy locations size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,11 @@ public String getIP()
return this.dataPart.getCopyIP(prioritizedCopyIndexes.get(getFilePartCopy()));
}

private String getCopyPath()
{
return this.dataPart.getCopyPath(prioritizedCopyIndexes.get(getFilePartCopy()));
}

private int getFilePartCopy()
{
return filePartCopyIndexPointer;
Expand Down Expand Up @@ -1528,150 +1533,156 @@ private void makeActive() throws HpccFileException
this.active.set(false);
this.handle = 0;

try
boolean needsRetry = false;
do
{
log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + (getFilePartCopy() + 1) + "' on IP: '"
+ getIP() + "'");

needsRetry = false;
try
{
if (getUseSSL())
log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '"
+ (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'" + " for Path: '" + getCopyPath() + "'");
try
{
if (getUseSSL())
{
SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault();
sock = (SSLSocket) ssf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);

log.debug("Attempting SSL handshake...");
((SSLSocket) sock).startHandshake();
log.debug("SSL handshake successful...");
log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
else
{
SocketFactory sf = SocketFactory.getDefault();
sock = sf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
}

this.sock.setSoTimeout(socketOpTimeoutMs);

log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
catch (java.net.UnknownHostException e)
{
SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault();
sock = (SSLSocket) ssf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);

log.debug("Attempting SSL handshake...");
((SSLSocket) sock).startHandshake();
log.debug("SSL handshake successful...");
log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
throw new HpccFileException("Bad file part IP address or host name: " + this.getIP(), e);
}
else
catch (java.io.IOException e)
{
SocketFactory sf = SocketFactory.getDefault();
sock = sf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
throw new HpccFileException(e);
}

this.sock.setSoTimeout(socketOpTimeoutMs);
try
{
this.dos = new java.io.DataOutputStream(sock.getOutputStream());
this.dis = new java.io.DataInputStream(sock.getInputStream());
}
catch (java.io.IOException e)
{
throw new HpccFileException("Failed to create streams", e);
}

log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
catch (java.net.UnknownHostException e)
{
throw new HpccFileException("Bad file part addr " + this.getIP(), e);
}
catch (java.io.IOException e)
{
throw new HpccFileException(e);
}
//------------------------------------------------------------------------------
// Check protocol version
//------------------------------------------------------------------------------

try
{
this.dos = new java.io.DataOutputStream(sock.getOutputStream());
this.dis = new java.io.DataInputStream(sock.getInputStream());
}
catch (java.io.IOException e)
{
throw new HpccFileException("Failed to create streams", e);
}
try
{
String msg = makeGetVersionRequest();
int msgLen = msg.length();

//------------------------------------------------------------------------------
// Check protocol version
//------------------------------------------------------------------------------
this.dos.writeInt(msgLen);
this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Failed on initial remote read trans", e);
}

try
{
String msg = makeGetVersionRequest();
int msgLen = msg.length();
RowServiceResponse response = readResponse();
if (response.len == 0)
{
useOldProtocol = true;
}
else
{
useOldProtocol = false;

this.dos.writeInt(msgLen);
this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Failed on initial remote read trans", e);
}
byte[] versionBytes = new byte[response.len];
try
{
this.dis.readFully(versionBytes);
}
catch (IOException e)
{
throw new HpccFileException("Error while attempting to read version response.", e);
}

RowServiceResponse response = readResponse();
if (response.len == 0)
{
useOldProtocol = true;
}
else
{
useOldProtocol = false;
rowServiceVersion = new String(versionBytes, HPCCCharSet);
}

//------------------------------------------------------------------------------
// Send initial read request
//------------------------------------------------------------------------------

byte[] versionBytes = new byte[response.len];
try
{
this.dis.readFully(versionBytes);
String readTrans = null;
if (this.tokenBin == null)
{
this.tokenBin = new byte[0];
readTrans = makeInitialRequest();
}
else
{
readTrans = makeTokenRequest();
}

int transLen = readTrans.length();
this.dos.writeInt(transLen);
this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Error while attempting to read version response.", e);
throw new HpccFileException("Failed on initial remote read read trans", e);
}

rowServiceVersion = new String(versionBytes, HPCCCharSet);
}

//------------------------------------------------------------------------------
// Send initial read request
//------------------------------------------------------------------------------

try
{
String readTrans = null;
if (this.tokenBin == null)
{
this.tokenBin = new byte[0];
readTrans = makeInitialRequest();
}
else
if (CompileTimeConstants.PROFILE_CODE)
{
readTrans = makeTokenRequest();
firstByteTimeNS = System.nanoTime();
}

int transLen = readTrans.length();
this.dos.writeInt(transLen);
this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
this.dos.flush();
this.active.set(true);
}
catch (IOException e)
catch (Exception e)
{
throw new HpccFileException("Failed on initial remote read read trans", e);
}
log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP()
+ "'");
log.error(e.getMessage());

if (CompileTimeConstants.PROFILE_CODE)
{
firstByteTimeNS = System.nanoTime();
needsRetry = true;
if (!setNextFilePartCopy())
{
throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
}
}

this.active.set(true);
}
catch (Exception e)
{
log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP()
+ "'");
log.error(e.getMessage());

if (!setNextFilePartCopy())
// This should be a multi exception
throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
}
} while (needsRetry);
}

/* Notes on protocol:
Expand Down
Loading
Loading