From d6cd1e13f800269b6328626782d70d3efa3d8830 Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Mon, 9 Dec 2024 16:52:08 +0000 Subject: [PATCH 1/3] parallel raptor cache generation --- .../transit/RaptorTransferIndex.java | 35 +++++++++++-------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java b/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java index 1d9b804067c..f8d0cf1f696 100644 --- a/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java +++ b/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.List; import java.util.function.Function; +import java.util.stream.IntStream; import org.opentripplanner.raptor.api.model.RaptorTransfer; import org.opentripplanner.street.search.request.StreetSearchRequest; @@ -35,27 +36,33 @@ public static RaptorTransferIndex create( reversedTransfers.add(new ArrayList<>()); } - for (int fromStop = 0; fromStop < transfersByStopIndex.size(); fromStop++) { - // The transfers are filtered so that there is only one possible directional transfer - // for a stop pair. - var transfers = transfersByStopIndex - .get(fromStop) - .stream() - .flatMap(s -> s.asRaptorTransfer(request).stream()) - .collect( - toMap(RaptorTransfer::stop, Function.identity(), (a, b) -> a.c1() < b.c1() ? a : b) - ) - .values(); + IntStream + .range(0, transfersByStopIndex.size()) + .parallel() + .forEach(fromStop -> { + // The transfers are filtered so that there is only one possible directional transfer + // for a stop pair. + var transfers = transfersByStopIndex + .get(fromStop) + .stream() + .flatMap(s -> s.asRaptorTransfer(request).stream()) + .collect( + toMap(RaptorTransfer::stop, Function.identity(), (a, b) -> a.c1() < b.c1() ? a : b) + ) + .values(); - forwardTransfers.get(fromStop).addAll(transfers); + // forwardTransfers is not modified here, and no two threads will access the same element + // in it, so this is still thread safe. + forwardTransfers.get(fromStop).addAll(transfers); + }); - for (RaptorTransfer forwardTransfer : transfers) { + for (int fromStop = 0; fromStop < transfersByStopIndex.size(); fromStop++) { + for (var forwardTransfer : forwardTransfers.get(fromStop)) { reversedTransfers .get(forwardTransfer.stop()) .add(DefaultRaptorTransfer.reverseOf(fromStop, forwardTransfer)); } } - return new RaptorTransferIndex(forwardTransfers, reversedTransfers); } From 1bf3f655ab25bb7194d8d0980665c58d34b4f359 Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Thu, 12 Dec 2024 13:43:09 +0000 Subject: [PATCH 2/3] add feature check around the parallel operation --- .../transit/RaptorTransferIndex.java | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java b/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java index f8d0cf1f696..aa335939086 100644 --- a/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java +++ b/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.function.Function; import java.util.stream.IntStream; +import org.opentripplanner.framework.application.OTPFeature; import org.opentripplanner.raptor.api.model.RaptorTransfer; import org.opentripplanner.street.search.request.StreetSearchRequest; @@ -36,25 +37,26 @@ public static RaptorTransferIndex create( reversedTransfers.add(new ArrayList<>()); } - IntStream - .range(0, transfersByStopIndex.size()) - .parallel() - .forEach(fromStop -> { - // The transfers are filtered so that there is only one possible directional transfer - // for a stop pair. - var transfers = transfersByStopIndex - .get(fromStop) - .stream() - .flatMap(s -> s.asRaptorTransfer(request).stream()) - .collect( - toMap(RaptorTransfer::stop, Function.identity(), (a, b) -> a.c1() < b.c1() ? a : b) - ) - .values(); + var stopIndices = IntStream.range(0, transfersByStopIndex.size()); + if (OTPFeature.ParallelRouting.isOn()) { + stopIndices = stopIndices.parallel(); + } + stopIndices.forEach(fromStop -> { + // The transfers are filtered so that there is only one possible directional transfer + // for a stop pair. + var transfers = transfersByStopIndex + .get(fromStop) + .stream() + .flatMap(s -> s.asRaptorTransfer(request).stream()) + .collect( + toMap(RaptorTransfer::stop, Function.identity(), (a, b) -> a.c1() < b.c1() ? a : b) + ) + .values(); - // forwardTransfers is not modified here, and no two threads will access the same element - // in it, so this is still thread safe. - forwardTransfers.get(fromStop).addAll(transfers); - }); + // forwardTransfers is not modified here, and no two threads will access the same element + // in it, so this is still thread safe. + forwardTransfers.get(fromStop).addAll(transfers); + }); for (int fromStop = 0; fromStop < transfersByStopIndex.size(); fromStop++) { for (var forwardTransfer : forwardTransfers.get(fromStop)) { From cded8677ecdae7553713395976c5502b20e41661 Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Thu, 12 Dec 2024 15:16:49 +0000 Subject: [PATCH 3/3] Always parallelize cache building during server startup --- .../raptoradapter/transit/RaptorTransferIndex.java | 13 +++++++++++-- .../transit/request/RaptorRequestTransferCache.java | 5 +++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java b/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java index aa335939086..61ece117388 100644 --- a/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java +++ b/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/RaptorTransferIndex.java @@ -25,9 +25,16 @@ public RaptorTransferIndex( this.reversedTransfers = reversedTransfers.stream().map(List::copyOf).toArray(List[]::new); } + /** + * Create an index to be put into the transfer cache + * + * @param isRuntimeRequest true if the request originates from the client during the runtime, + * false if the request comes from transferCacheRequests in router-config.json + */ public static RaptorTransferIndex create( List> transfersByStopIndex, - StreetSearchRequest request + StreetSearchRequest request, + boolean isRuntimeRequest ) { var forwardTransfers = new ArrayList>(transfersByStopIndex.size()); var reversedTransfers = new ArrayList>(transfersByStopIndex.size()); @@ -38,7 +45,9 @@ public static RaptorTransferIndex create( } var stopIndices = IntStream.range(0, transfersByStopIndex.size()); - if (OTPFeature.ParallelRouting.isOn()) { + // we want to always parallelize the cache building during the startup + // and only parallelize during runtime requests if the feature flag is on + if (!isRuntimeRequest || OTPFeature.ParallelRouting.isOn()) { stopIndices = stopIndices.parallel(); } stopIndices.forEach(fromStop -> { diff --git a/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/request/RaptorRequestTransferCache.java b/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/request/RaptorRequestTransferCache.java index d778f491142..c7e09e12729 100644 --- a/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/request/RaptorRequestTransferCache.java +++ b/application/src/main/java/org/opentripplanner/routing/algorithm/raptoradapter/transit/request/RaptorRequestTransferCache.java @@ -38,7 +38,8 @@ public void put(List> transfersByStopIndex, RouteRequest request) final CacheKey cacheKey = new CacheKey(transfersByStopIndex, request); final RaptorTransferIndex raptorTransferIndex = RaptorTransferIndex.create( transfersByStopIndex, - cacheKey.request + cacheKey.request, + false ); LOG.info("Initializing cache with request: {}", cacheKey.options); @@ -58,7 +59,7 @@ private CacheLoader cacheLoader() { @Override public RaptorTransferIndex load(CacheKey cacheKey) { LOG.info("Adding runtime request to cache: {}", cacheKey.options); - return RaptorTransferIndex.create(cacheKey.transfersByStopIndex, cacheKey.request); + return RaptorTransferIndex.create(cacheKey.transfersByStopIndex, cacheKey.request, true); } }; }