Skip to content

Commit

Permalink
Rewrite downloader using virtual threads (#702)
Browse files Browse the repository at this point in the history
  • Loading branch information
msbarry authored Nov 1, 2023
1 parent bf72949 commit 44f22b2
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ private Path getPath(String name, String type, Path defaultPath, String defaultU

private void download() {
var timer = stats.startStage("download");
Downloader downloader = Downloader.create(config(), stats());
Downloader downloader = Downloader.create(config());
for (ToDownload toDownload : toDownload) {
if (profile.caresAboutSource(toDownload.id)) {
downloader.add(toDownload.id, toDownload.url, toDownload.path);
Expand All @@ -919,7 +919,7 @@ private void download() {
private void ensureInputFilesExist() {
for (InputPath inputPath : inputPaths) {
if (profile.caresAboutSource(inputPath.id) && !Files.exists(inputPath.path)) {
throw new IllegalArgumentException(inputPath.path + " does not exist");
throw new IllegalArgumentException(inputPath.path + " does not exist. Run with --download to fetch it");
}
}
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package com.onthegomap.planetiler.util;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.ClosedFileSystemException;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
Expand Down Expand Up @@ -263,7 +268,7 @@ public static void unzipResource(String resource, Path dest) {
* @throws UncheckedIOException if an IO exception occurs
*/
public static void safeCopy(InputStream inputStream, Path destPath) {
try (var outputStream = Files.newOutputStream(destPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
try (var outputStream = Files.newOutputStream(destPath, StandardOpenOption.CREATE, WRITE)) {
int totalSize = 0;

int nBytes;
Expand Down Expand Up @@ -310,7 +315,7 @@ public static void unzip(InputStream input, Path destDir) {

try (
var out = Files.newOutputStream(destination, StandardOpenOption.CREATE_NEW,
StandardOpenOption.WRITE)
WRITE)
) {
totalEntryArchive++;
while ((nBytes = zip.read(buffer)) > 0) {
Expand Down Expand Up @@ -366,4 +371,16 @@ public static boolean isNewer(Path src, Path dest) {
return true;
}
}

/** Expands the file at {@code path} to {@code size} bytes. */
public static void setLength(Path path, long size) {
try (var fc = FileChannel.open(path, CREATE, WRITE)) {
int written = fc.write(ByteBuffer.allocate(1), size - 1);
if (written != 1) {
throw new IOException("Unable to expand " + path + " to " + size);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class TopOsmTiles {
TopOsmTiles(PlanetilerConfig config, Stats stats) {
this.config = config;
this.stats = stats;
downloader = Downloader.create(config, stats);
downloader = Downloader.create(config);
}

Reader fetch(LocalDate date) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ default void runAndWrapException() {
throwFatalException(e);
}
}

static Runnable wrap(RunnableThatThrows thrower) {
return thrower::runAndWrapException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@
import static org.junit.jupiter.api.Assertions.*;

import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.stats.Stats;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -25,26 +23,25 @@ class DownloaderTest {
@TempDir
Path path;
private final PlanetilerConfig config = PlanetilerConfig.defaults();
private final Stats stats = Stats.inMemory();
private long downloads = 0;
private AtomicLong downloads = new AtomicLong(0);

private Downloader mockDownloader(Map<String, byte[]> resources, boolean supportsRange, int maxLength) {
return new Downloader(config, stats, 2L) {
private Downloader mockDownloader(Map<String, byte[]> resources, boolean supportsRange) {
return new Downloader(config, 2L) {

@Override
InputStream openStream(String url) {
downloads++;
downloads.incrementAndGet();
assertTrue(resources.containsKey(url), "no resource for " + url);
byte[] bytes = resources.get(url);
return new ByteArrayInputStream(maxLength < bytes.length ? Arrays.copyOf(bytes, maxLength) : bytes);
return new ByteArrayInputStream(bytes);
}

@Override
InputStream openStreamRange(String url, long start, long end) {
assertTrue(supportsRange, "does not support range");
downloads++;
downloads.incrementAndGet();
assertTrue(resources.containsKey(url), "no resource for " + url);
byte[] result = new byte[Math.min(maxLength, (int) (end - start))];
byte[] result = new byte[(int) (end - start)];
byte[] bytes = resources.get(url);
for (int i = (int) start; i < start + result.length; i++) {
result[(int) (i - start)] = bytes[i];
Expand All @@ -53,39 +50,36 @@ InputStream openStreamRange(String url, long start, long end) {
}

@Override
CompletableFuture<ResourceMetadata> httpHead(String url) {
ResourceMetadata httpHead(String url) {
String[] parts = url.split("#");
if (parts.length > 1) {
int redirectNum = Integer.parseInt(parts[1]);
String next = redirectNum <= 1 ? parts[0] : (parts[0] + "#" + (redirectNum - 1));
return CompletableFuture.supplyAsync(
() -> new ResourceMetadata(Optional.of(next), url, 0, supportsRange));
return new ResourceMetadata(Optional.of(next), url, 0, supportsRange);
}
byte[] bytes = resources.get(url);
return CompletableFuture.supplyAsync(
() -> new ResourceMetadata(Optional.empty(), url, bytes.length, supportsRange));
return new ResourceMetadata(Optional.empty(), url, bytes.length, supportsRange);
}
};
}

@ParameterizedTest
@CsvSource({
"false,100,0",
"true,100,0",
"true,2,0",
"false,100,1",
"false,100,2",
"true,2,4",
"false,0",
"true,0",
"false,1",
"false,2",
"true,4",
})
void testDownload(boolean range, int maxLength, int redirects) throws Exception {
void testDownload(boolean range, int redirects) throws Exception {
Path dest = path.resolve("out");
String string = "0123456789";
String url = "http://url";
String initialUrl = url + (redirects > 0 ? "#" + redirects : "");
Map<String, byte[]> resources = new ConcurrentHashMap<>();

byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
Downloader downloader = mockDownloader(resources, range, maxLength);
Downloader downloader = mockDownloader(resources, range);

// fails if no data
var resource1 = new Downloader.ResourceToDownload("resource", initialUrl, dest);
Expand All @@ -102,10 +96,10 @@ void testDownload(boolean range, int maxLength, int redirects) throws Exception
assertEquals(10, resource2.bytesDownloaded());

// does not re-request if size is the same
downloads = 0;
downloads.set(0);
var resource3 = new Downloader.ResourceToDownload("resource", initialUrl, dest);
downloader.downloadIfNecessary(resource3).get();
assertEquals(0, downloads);
assertEquals(0, downloads.get());
assertEquals(string, Files.readString(dest));
assertEquals(FileUtils.size(path), FileUtils.size(dest));
assertEquals(0, resource3.bytesDownloaded());
Expand All @@ -115,15 +109,15 @@ void testDownload(boolean range, int maxLength, int redirects) throws Exception
String newContent = "54321";
resources.put(url, newContent.getBytes(StandardCharsets.UTF_8));
downloader.downloadIfNecessary(resource4).get();
assertTrue(downloads > 0, "downloads were " + downloads);
assertTrue(downloads.get() > 0, "downloads were " + downloads);
assertEquals(newContent, Files.readString(dest));
assertEquals(FileUtils.size(path), FileUtils.size(dest));
assertEquals(5, resource4.bytesDownloaded());
}

@Test
void testDownloadFailsIfTooBig() {
var downloader = new Downloader(config, stats, 2L) {
var downloader = new Downloader(config, 2L) {

@Override
InputStream openStream(String url) {
Expand All @@ -136,8 +130,8 @@ InputStream openStreamRange(String url, long start, long end) {
}

@Override
CompletableFuture<ResourceMetadata> httpHead(String url) {
return CompletableFuture.completedFuture(new ResourceMetadata(Optional.empty(), url, Long.MAX_VALUE, true));
ResourceMetadata httpHead(String url) {
return new ResourceMetadata(Optional.empty(), url, Long.MAX_VALUE, true);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,11 @@ void testWalkPathWithPatternSingleZip() {
List.of("/shapefile/stations.shp", "/shapefile/stations.shx"),
matchingPaths.stream().map(Path::toString).sorted().toList());
}

@Test
void testExpandFile() throws IOException {
Path path = tmpDir.resolve("toExpand");
FileUtils.setLength(path, 1000);
assertEquals(1000, Files.size(path));
}
}

0 comments on commit 44f22b2

Please sign in to comment.