From 6dca48f14e80f3dcd816cdbc4c9b07a12ce0a1ba Mon Sep 17 00:00:00 2001 From: James McMullan Date: Thu, 12 Oct 2023 14:21:55 -0400 Subject: [PATCH 1/2] HPCC4J-542 DFSClient: Create JUnit for read retry - Added file part failure retry test - Fixed retry issue on initial connection Signed-off-by: James McMullan <James.McMullan@lexisnexis.com> --- .../hpccsystems/dfs/client/DataPartition.java | 35 +++ .../dfs/client/RowServiceInputStream.java | 236 +++++++++--------- .../dfs/client/DFSReadWriteTest.java | 47 ++++ 3 files changed, 203 insertions(+), 115 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index bc648b49c..9ecdd07cc 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -16,6 +16,9 @@ package org.hpccsystems.dfs.client; import java.io.Serializable; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; import org.hpccsystems.commons.ecl.FileFilter; import org.hpccsystems.commons.errors.HpccFileException; @@ -266,6 +269,38 @@ public String getCopyIP(int copyindex) return copyLocations[copyindex]; } + /** + * Set the copy IP + * + * @param copyIndex + * the copyindex + * @param copyIP The IP of the file part copy + */ + public void setCopyIP(int copyIndex, String copyIP) + { + if (copyIndex < 0 || copyIndex >= copyLocations.length) return; + + copyLocations[copyIndex] = copyIP; + } + + /** + * Add file part copy + * + * @param index The index at which to insert the file part copy + * @param copyIP The IP of the new file part copy + * @param copyPath The path of the new file part copy + */ + public void add(int index, String copyIP, String copyPath) + { + List copyLocationsList = new ArrayList<>(Arrays.asList(copyLocations)); + copyLocationsList.add(index, copyIP); + copyLocations = copyLocationsList.toArray(new 
String[0]); + + List copyPathList = new ArrayList<>(Arrays.asList(copyPaths)); + copyPathList.add(index, copyPath); + copyPaths = copyPathList.toArray(new String[0]); + } + /** * Count of copies available for this file part. * @return copy locations size diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 795fd8478..d8218b767 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -1528,150 +1528,156 @@ private void makeActive() throws HpccFileException this.active.set(false); this.handle = 0; - try + boolean needsRetry = false; + do { - log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + (getFilePartCopy() + 1) + "' on IP: '" - + getIP() + "'"); - + needsRetry = false; try { - if (getUseSSL()) + log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'"); + try { - SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault(); - sock = (SSLSocket) ssf.createSocket(); - - // Optimize for bandwidth over latency and connection time. 
- // We are opening up a long standing connection and potentially reading a significant amount of - // data - // So we don't care as much about individual packet latency or connection time overhead - sock.setPerformancePreferences(0, 1, 2); - sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); - - log.debug("Attempting SSL handshake..."); - ((SSLSocket) sock).startHandshake(); - log.debug("SSL handshake successful..."); - log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); + if (getUseSSL()) + { + SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault(); + sock = (SSLSocket) ssf.createSocket(); + + // Optimize for bandwidth over latency and connection time. + // We are opening up a long standing connection and potentially reading a significant amount of + // data + // So we don't care as much about individual packet latency or connection time overhead + sock.setPerformancePreferences(0, 1, 2); + sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + + log.debug("Attempting SSL handshake..."); + ((SSLSocket) sock).startHandshake(); + log.debug("SSL handshake successful..."); + log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); + } + else + { + SocketFactory sf = SocketFactory.getDefault(); + sock = sf.createSocket(); + + // Optimize for bandwidth over latency and connection time. 
+ // We are opening up a long standing connection and potentially reading a significant amount of + // data + // So we don't care as much about individual packet latency or connection time overhead + sock.setPerformancePreferences(0, 1, 2); + sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + } + + this.sock.setSoTimeout(socketOpTimeoutMs); + + log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); } - else + catch (java.net.UnknownHostException e) { - SocketFactory sf = SocketFactory.getDefault(); - sock = sf.createSocket(); - - // Optimize for bandwidth over latency and connection time. - // We are opening up a long standing connection and potentially reading a significant amount of - // data - // So we don't care as much about individual packet latency or connection time overhead - sock.setPerformancePreferences(0, 1, 2); - sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + throw new HpccFileException("Bad file part addr " + this.getIP(), e); + } + catch (java.io.IOException e) + { + throw new HpccFileException(e); } - this.sock.setSoTimeout(socketOpTimeoutMs); + try + { + this.dos = new java.io.DataOutputStream(sock.getOutputStream()); + this.dis = new java.io.DataInputStream(sock.getInputStream()); + } + catch (java.io.IOException e) + { + throw new HpccFileException("Failed to create streams", e); + } - log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); - } - catch (java.net.UnknownHostException e) - { - throw new HpccFileException("Bad file part addr " + this.getIP(), e); - } - catch (java.io.IOException e) - { - throw new HpccFileException(e); - } + //------------------------------------------------------------------------------ + // Check protocol version + //------------------------------------------------------------------------------ - 
try - { - this.dos = new java.io.DataOutputStream(sock.getOutputStream()); - this.dis = new java.io.DataInputStream(sock.getInputStream()); - } - catch (java.io.IOException e) - { - throw new HpccFileException("Failed to create streams", e); - } + try + { + String msg = makeGetVersionRequest(); + int msgLen = msg.length(); - //------------------------------------------------------------------------------ - // Check protocol version - //------------------------------------------------------------------------------ + this.dos.writeInt(msgLen); + this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen); + this.dos.flush(); + } + catch (IOException e) + { + throw new HpccFileException("Failed on initial remote read trans", e); + } - try - { - String msg = makeGetVersionRequest(); - int msgLen = msg.length(); + RowServiceResponse response = readResponse(); + if (response.len == 0) + { + useOldProtocol = true; + } + else + { + useOldProtocol = false; - this.dos.writeInt(msgLen); - this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen); - this.dos.flush(); - } - catch (IOException e) - { - throw new HpccFileException("Failed on initial remote read trans", e); - } + byte[] versionBytes = new byte[response.len]; + try + { + this.dis.readFully(versionBytes); + } + catch (IOException e) + { + throw new HpccFileException("Error while attempting to read version response.", e); + } - RowServiceResponse response = readResponse(); - if (response.len == 0) - { - useOldProtocol = true; - } - else - { - useOldProtocol = false; + rowServiceVersion = new String(versionBytes, HPCCCharSet); + } + + //------------------------------------------------------------------------------ + // Send initial read request + //------------------------------------------------------------------------------ - byte[] versionBytes = new byte[response.len]; try { - this.dis.readFully(versionBytes); + String readTrans = null; + if (this.tokenBin == null) + { + this.tokenBin = new byte[0]; + readTrans = 
makeInitialRequest(); + } + else + { + readTrans = makeTokenRequest(); + } + + int transLen = readTrans.length(); + this.dos.writeInt(transLen); + this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen); + this.dos.flush(); } catch (IOException e) { - throw new HpccFileException("Error while attempting to read version response.", e); + throw new HpccFileException("Failed on initial remote read read trans", e); } - rowServiceVersion = new String(versionBytes, HPCCCharSet); - } - - //------------------------------------------------------------------------------ - // Send initial read request - //------------------------------------------------------------------------------ - - try - { - String readTrans = null; - if (this.tokenBin == null) - { - this.tokenBin = new byte[0]; - readTrans = makeInitialRequest(); - } - else + if (CompileTimeConstants.PROFILE_CODE) { - readTrans = makeTokenRequest(); + firstByteTimeNS = System.nanoTime(); } - int transLen = readTrans.length(); - this.dos.writeInt(transLen); - this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen); - this.dos.flush(); + this.active.set(true); } - catch (IOException e) + catch (Exception e) { - throw new HpccFileException("Failed on initial remote read read trans", e); - } + log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + + "'"); + log.error(e.getMessage()); - if (CompileTimeConstants.PROFILE_CODE) - { - firstByteTimeNS = System.nanoTime(); + needsRetry = true; + if (!setNextFilePartCopy()) + { + throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); + } } - - this.active.set(true); - } - catch (Exception e) - { - log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() - + "'"); - log.error(e.getMessage()); - - if (!setNextFilePartCopy()) - // This should be a multi exception - throw new 
HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); - } + } while (needsRetry); } /* Notes on protocol: diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index 2fc59c86d..2c6f0cd80 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -1039,6 +1039,53 @@ public void emptyCompressedFileTest() } } + @Test + public void filePartReadRetryTest() + { + { + HPCCFile readFile = null; + try + { + readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass); + DataPartition[] fileParts = readFile.getFileParts(); + for (int i = 0; i < fileParts.length; i++) + { + String firstCopyIP = fileParts[i].getCopyIP(0); + String firstCopyPath = fileParts[i].getCopyPath(0); + fileParts[i].setCopyIP(0, "1.1.1.1"); + fileParts[i].add(1, firstCopyIP, firstCopyPath); + } + + List records = readFile(readFile, null, false); + System.out.println("Record count: " + records.size()); + } + catch (Exception e) + { + Assert.fail(e.getMessage()); + } + } + + { + HPCCFile readFile = null; + try + { + readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass); + DataPartition[] fileParts = readFile.getFileParts(); + for (int i = 0; i < fileParts.length; i++) + { + fileParts[i].add(0,"1.1.1.1", ""); + } + + List records = readFile(readFile, null, false); + System.out.println("Record count: " + records.size()); + } + catch (Exception e) + { + Assert.fail(e.getMessage()); + } + } + } + @Test public void invalidSignatureTest() { From 280aaddf6aeddb1e14f106fa8212f5e1a48a51ed Mon Sep 17 00:00:00 2001 From: James McMullan Date: Fri, 13 Oct 2023 08:59:40 -0400 Subject: [PATCH 2/2] Code review changes --- .../hpccsystems/dfs/client/DataPartition.java | 24 ++++++++++++++++--- .../dfs/client/RowServiceInputStream.java | 9 +++++-- 2 files 
changed, 28 insertions(+), 5 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index 9ecdd07cc..589e62f4c 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -16,6 +16,7 @@ package org.hpccsystems.dfs.client; import java.io.Serializable; +import java.security.InvalidParameterException; import java.util.List; import java.util.ArrayList; import java.util.Arrays; @@ -264,7 +265,10 @@ public String[] getCopyLocations() public String getCopyIP(int copyindex) { int copiescount = copyLocations.length; - if (copyindex < 0 || copyindex >= copiescount) return null; + if (copyindex < 0 || copyindex >= copiescount) + { + return null; + } return copyLocations[copyindex]; } @@ -278,7 +282,10 @@ public String getCopyIP(int copyindex) */ public void setCopyIP(int copyIndex, String copyIP) { - if (copyIndex < 0 || copyIndex >= copyLocations.length) return; + if (copyIndex < 0 || copyIndex >= copyLocations.length) + { + return; + } copyLocations[copyIndex] = copyIP; } @@ -290,8 +297,19 @@ public void setCopyIP(int copyIndex, String copyIP) * @param copyIP The IP of the new file part copy * @param copyPath The path of the new file part copy */ - public void add(int index, String copyIP, String copyPath) + public void add(int index, String copyIP, String copyPath) throws Exception { + if (index < 0 || index > copyLocations.length) + { + throw new InvalidParameterException("Insertion index: " + index + " is invalid." 
+ + "Expected index in range of: [0," + copyLocations.length + "]"); + } + + if (copyIP == null || copyPath == null) + { + throw new InvalidParameterException("Copy IP or Path are invalid, must be non-null."); + } + List copyLocationsList = new ArrayList<>(Arrays.asList(copyLocations)); copyLocationsList.add(index, copyIP); copyLocations = copyLocationsList.toArray(new String[0]); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index d8218b767..0ea652111 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -520,6 +520,11 @@ public String getIP() return this.dataPart.getCopyIP(prioritizedCopyIndexes.get(getFilePartCopy())); } + private String getCopyPath() + { + return this.dataPart.getCopyPath(prioritizedCopyIndexes.get(getFilePartCopy())); + } + private int getFilePartCopy() { return filePartCopyIndexPointer; @@ -1535,7 +1540,7 @@ private void makeActive() throws HpccFileException try { log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" - + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'"); + + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'" + " for Path: '" + getCopyPath() + "'"); try { if (getUseSSL()) @@ -1574,7 +1579,7 @@ private void makeActive() throws HpccFileException } catch (java.net.UnknownHostException e) { - throw new HpccFileException("Bad file part addr " + this.getIP(), e); + throw new HpccFileException("Bad file part IP address or host name: " + this.getIP(), e); } catch (java.io.IOException e) {