Skip to content

Commit

Permalink
resolve issue :datavane/tis#289 add modify process for reader ftp dir…
Browse files Browse the repository at this point in the history
…ectory
  • Loading branch information
baisui1981 committed Feb 26, 2024
1 parent 80132c8 commit 101a805
Showing 1 changed file with 80 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -146,59 +151,68 @@ public boolean isSymbolicLink(String filePath) {
return isExitFlag;
}

HashSet<Res> sourceFiles = new HashSet<Res>();

//HashSet<Res> sourceFiles = new HashSet<Res>();
@Override
public HashSet<Res> getListFiles(String directoryPath, int parentLevel, int maxTraversalLevel) {
return getListFiles(new HashSet<Res>(), directoryPath, new FTPChildPath(), parentLevel, maxTraversalLevel);
}


public HashSet<Res> getListFiles(HashSet<Res> sourceFiles, final String rootPath, FTPChildPath childPath, int parentLevel, int maxTraversalLevel) {
Objects.requireNonNull(sourceFiles, "param sourceFiles can not be null");
if (parentLevel < maxTraversalLevel) {
final String fullPath = childPath.withRootPath(rootPath);// rootPath + IOUtils.DIR_SEPARATOR + childPath.toString();
String parentPath = null;// 父级目录,以'/'结尾
int pathLen = directoryPath.length();
if (directoryPath.contains("*") || directoryPath.contains("?")) {
int pathLen = fullPath.length();
if (fullPath.contains("*") || fullPath.contains("?")) {
// path是正则表达式
String subPath = UnstructuredStorageReaderUtil.getRegexPathParentPath(directoryPath);
String subPath = UnstructuredStorageReaderUtil.getRegexPathParentPath(fullPath);
if (isDirExist(subPath)) {
parentPath = subPath;
} else {
String message = String.format("不能进入目录:[%s]," + "请确认您的配置项path:[%s]存在,且配置的用户有权限进入", subPath,
directoryPath);
fullPath);
LOG.error(message);
throw DataXException.asDataXException(FtpReaderErrorCode.FILE_NOT_EXISTS, message);
}
} else if (isDirExist(directoryPath)) {
} else if (isDirExist(fullPath)) {
// path是目录
if (directoryPath.charAt(pathLen - 1) == IOUtils.DIR_SEPARATOR) {
parentPath = directoryPath;
if (fullPath.charAt(pathLen - 1) == IOUtils.DIR_SEPARATOR) {
parentPath = fullPath;
} else {
parentPath = directoryPath + IOUtils.DIR_SEPARATOR;
parentPath = fullPath + IOUtils.DIR_SEPARATOR;
}
} else if (isFileExist(directoryPath)) {
} else if (isFileExist(fullPath)) {
// path指向具体文件
sourceFiles.add(new Res(directoryPath, directoryPath));
sourceFiles.add(new Res(fullPath, childPath.toString()));
return sourceFiles;
} else if (isSymbolicLink(directoryPath)) {
} else if (isSymbolicLink(fullPath)) {
//path是链接文件
String message = String.format("文件:[%s]是链接文件,当前不支持链接文件的读取", directoryPath);
String message = String.format("文件:[%s]是链接文件,当前不支持链接文件的读取", fullPath);
LOG.error(message);
throw DataXException.asDataXException(FtpReaderErrorCode.LINK_FILE, message);
} else {
String message = String.format("请确认您的配置项path:[%s]存在,且配置的用户有权限读取", directoryPath);
String message = String.format("请确认您的配置项path:[%s]存在,且配置的用户有权限读取", fullPath);
LOG.error(message);
throw DataXException.asDataXException(FtpReaderErrorCode.FILE_NOT_EXISTS, message);
}

try {
FTPFile[] fs = ftpClient.listFiles(new String(directoryPath.getBytes(), FTP.DEFAULT_CONTROL_ENCODING));
FTPFile[] fs = ftpClient.listFiles(new String(fullPath.getBytes(), FTP.DEFAULT_CONTROL_ENCODING));
for (FTPFile ff : fs) {
String strName = ff.getName();
String filePath = parentPath + strName;
final FTPChildPath childFilePath = childPath.append(strName);//(new StringBuffer(childPath)).append(IOUtils.DIR_SEPARATOR).append(strName);

if (ff.isDirectory()) {
if (!(strName.equals(".") || strName.equals(".."))) {
//递归处理
getListFiles(filePath, parentLevel + 1, maxTraversalLevel);
getListFiles(sourceFiles, rootPath
, childFilePath, parentLevel + 1, maxTraversalLevel);
}
} else if (ff.isFile()) {
// 是文件
sourceFiles.add(new Res(filePath, filePath));
sourceFiles.add(new Res(filePath, childFilePath.toString()));
} else if (ff.isSymbolicLink()) {
//是链接文件
String message = String.format("文件:[%s]是链接文件,当前不支持链接文件的读取", filePath);
Expand All @@ -211,20 +225,62 @@ public HashSet<Res> getListFiles(String directoryPath, int parentLevel, int maxT
}
} // end for FTPFile
} catch (IOException e) {
String message = String.format("获取path:[%s] 下文件列表时发生I/O异常,请确认与ftp服务器的连接正常", directoryPath);
String message = String.format("获取path:[%s] 下文件列表时发生I/O异常,请确认与ftp服务器的连接正常", fullPath);
LOG.error(message);
throw DataXException.asDataXException(FtpReaderErrorCode.COMMAND_FTP_IO_EXCEPTION, message, e);
}
return sourceFiles;

} else {
//超出最大递归层数
String message = String.format("获取path:[%s] 下文件列表时超出最大层数,请确认路径[%s]下不存在软连接文件", directoryPath, directoryPath);
LOG.error(message);
throw DataXException.asDataXException(FtpReaderErrorCode.OUT_MAX_DIRECTORY_LEVEL, message);
// String message = String.format("获取path:[%s] 下文件列表时超出最大层数,请确认路径[%s]下不存在软连接文件", directoryPath, directoryPath);
// LOG.error(message);
// throw DataXException.asDataXException(FtpReaderErrorCode.OUT_MAX_DIRECTORY_LEVEL, message);
return sourceFiles;
}
}


private static class FTPChildPath {
private StringBuffer childPath;

public FTPChildPath() {
this(null);
}

public FTPChildPath(StringBuffer childPath) {
this.childPath = childPath;
}

private String withRootPath(String rootPath) {
if (this.childPath == null) {
return rootPath;
} else {
return appendSeparator(rootPath) + childPath;
}
}

private FTPChildPath append(String fileName) {

if (this.childPath == null) {
return new FTPChildPath(new StringBuffer(fileName));
} else {
// final StringBuffer childFilePath = ;
return new FTPChildPath((new StringBuffer(appendSeparator(childPath.toString()))).append(fileName));
}
}

private static String appendSeparator(String path) {
return path + (StringUtils.endsWith(path, String.valueOf(IOUtils.DIR_SEPARATOR)) ? StringUtils.EMPTY : IOUtils.DIR_SEPARATOR);
}

@Override
public String toString() {
return Objects.requireNonNull(childPath).toString();
}
}


@Override
public InputStream getInputStream(String filePath) {
try {
Expand All @@ -236,7 +292,7 @@ public void close() throws IOException {
// 只能被关闭一次
if (closed.compareAndSet(false, true)) {
if (!ftpClient.completePendingCommand()) {
throw new IOException("completePendingCommand faild");
// throw new IOException("completePendingCommand faild");
}
}
}
Expand Down

0 comments on commit 101a805

Please sign in to comment.