Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dianfu committed Apr 28, 2024
1 parent f992be1 commit 490fd60
Showing 1 changed file with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -176,7 +175,7 @@ private PythonDependencyManager(ReadableConfig config) {
private void addPythonFile(Configuration pythonDependencyConfig, String filePath) {
Preconditions.checkNotNull(filePath);
String fileKey = generateUniqueFileKey(PYTHON_FILE_PREFIX, filePath);
registerCachedFileIfNotExist(config, fileKey, filePath);
registerCachedFileIfNotExist(fileKey, filePath);
if (!pythonDependencyConfig.contains(PYTHON_FILES_DISTRIBUTED_CACHE_INFO)) {
pythonDependencyConfig.set(
PYTHON_FILES_DISTRIBUTED_CACHE_INFO, new LinkedHashMap<>());
Expand Down Expand Up @@ -222,7 +221,7 @@ private void setPythonRequirements(

String fileKey =
generateUniqueFileKey(PYTHON_REQUIREMENTS_FILE_PREFIX, requirementsFilePath);
registerCachedFileIfNotExist(config, fileKey, requirementsFilePath);
registerCachedFileIfNotExist(fileKey, requirementsFilePath);
pythonDependencyConfig
.get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO)
.put(FILE, fileKey);
Expand All @@ -231,7 +230,7 @@ private void setPythonRequirements(
String cacheDirKey =
generateUniqueFileKey(
PYTHON_REQUIREMENTS_CACHE_PREFIX, requirementsCachedDir);
registerCachedFileIfNotExist(config, cacheDirKey, requirementsCachedDir);
registerCachedFileIfNotExist(cacheDirKey, requirementsCachedDir);
pythonDependencyConfig
.get(PYTHON_REQUIREMENTS_FILE_DISTRIBUTED_CACHE_INFO)
.put(CACHE, cacheDirKey);
Expand All @@ -256,7 +255,7 @@ private void addPythonArchive(
String fileKey =
generateUniqueFileKey(
PYTHON_ARCHIVE_PREFIX, archivePath + PARAM_DELIMITER + targetDir);
registerCachedFileIfNotExist(config, fileKey, archivePath);
registerCachedFileIfNotExist(fileKey, archivePath);
pythonDependencyConfig
.get(PYTHON_ARCHIVES_DISTRIBUTED_CACHE_INFO)
.put(fileKey, targetDir);
Expand Down Expand Up @@ -334,19 +333,36 @@ private String generateUniqueFileKey(String prefix, String hashString) {
"%s_%s", prefix, StringUtils.byteToHexString(messageDigest.digest()));
}

// NOTE(review): this span is a diff rendered without +/- markers, so two versions of
// the same method are interleaved. The next five lines (the signature that takes a
// ReadableConfig, through the orElseGet call) look like the removed side of the change
// -- confirm against the commit before relying on them.
private void registerCachedFileIfNotExist(ReadableConfig config, String name, String path) {
final Set<String> cachedFiles =
config.getOptional(PipelineOptions.CACHED_FILES)
.map(LinkedHashSet::new)
.orElseGet(LinkedHashSet::new);
/**
 * Appends a {@code name}/{@code path} entry to {@link PipelineOptions#CACHED_FILES}
 * unless an existing entry already carries the same {@code name}.
 *
 * <p>Existing entries are compared by the {@code "name"} value parsed out of each entry
 * string, not by the whole string, so an entry with the same name but a different path
 * is left untouched and no second entry is added.
 *
 * <p>{@code config} is not a parameter of this version; it is resolved from the
 * enclosing scope (presumably a field of the class -- verify in the full file).
 *
 * @param name key stored under {@code "name"} in the new entry and used for the
 *     duplicate check
 * @param path value stored under {@code "path"} in the new entry
 */
private void registerCachedFileIfNotExist(String name, String path) {
// Pair every existing entry string with the "name" value parsed out of it:
// field 0 = parsed name, field 1 = the original, unparsed entry string.
final List<Tuple2<String, String>> cachedFilePairs =
config.getOptional(PipelineOptions.CACHED_FILES).orElse(new ArrayList<>())
.stream()
.map(
m ->
Tuple2.of(
ConfigurationUtils.parseStringToMap(m)
.get("name"),
m))
.collect(Collectors.toList());

// Names that are already registered.
final Set<String> cachedFileNames =
cachedFilePairs.stream()
.map(f -> (String) f.getField(0))
.collect(Collectors.toSet());
// An entry with this name already exists: leave the option as it is.
if (cachedFileNames.contains(name)) {
return;
}

// Recover the original entry strings, in their existing order, so the new entry can
// be appended after them.
// NOTE(review): Collectors.toList() makes no guarantee that the returned list is
// mutable, yet add() is called on it below; Collectors.toCollection(ArrayList::new)
// would make the mutability explicit.
final List<String> cachedFiles =
cachedFilePairs.stream()
.map(f -> (String) f.getField(1))
.collect(Collectors.toList());
// Build the new entry as a name/path map and convert it to its string form.
Map<String, String> map = new HashMap<>();
map.put("name", name);
map.put("path", path);
cachedFiles.add(ConfigurationUtils.convertValue(map, String.class));

// NOTE(review): the next two lines (the set call that wraps cachedFiles in a new
// ArrayList) look like the removed side of the diff; the single-line set call after
// them is its replacement -- confirm against the commit.
// The cast throws ClassCastException if config is not a WritableConfig.
((WritableConfig) config)
.set(PipelineOptions.CACHED_FILES, new ArrayList<>(cachedFiles));
((WritableConfig) config).set(PipelineOptions.CACHED_FILES, cachedFiles);
}

private void removeCachedFilesByPrefix(String prefix) {
Expand Down

0 comments on commit 490fd60

Please sign in to comment.