Skip to content

Commit

Permalink
Merge pull request opentripplanner#5892 from leonardehrenfried/update…
Browse files Browse the repository at this point in the history
…-semantics

Use enum instead of boolean for real time update semantics
  • Loading branch information
leonardehrenfried authored Jun 13, 2024
2 parents b9e54d2 + dfce217 commit b1a7c53
Show file tree
Hide file tree
Showing 17 changed files with 115 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.opentripplanner.updater.spi.UpdateError.UpdateErrorType.TRIP_NOT_FOUND;
import static org.opentripplanner.updater.spi.UpdateError.UpdateErrorType.TRIP_NOT_FOUND_IN_PATTERN;
import static org.opentripplanner.updater.spi.UpdateError.UpdateErrorType.UNKNOWN;
import static org.opentripplanner.updater.trip.UpdateIncrementality.FULL_DATASET;

import java.time.LocalDate;
import java.util.ArrayList;
Expand All @@ -30,6 +31,7 @@
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.spi.UpdateSuccess;
import org.opentripplanner.updater.trip.TimetableSnapshotManager;
import org.opentripplanner.updater.trip.UpdateIncrementality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;
Expand Down Expand Up @@ -83,15 +85,16 @@ public SiriTimetableSnapshotSource(
* FIXME RT_AB: TripUpdate is the GTFS term, and these SIRI ETs are never converted into that
* same internal model.
*
* @param fullDataset true iff the list with updates represent all updates that are active right
* now, i.e. all previous updates should be disregarded
* @param updates SIRI EstimatedTimetable deliveries that should be applied atomically.
* @param incrementality the incrementality of the update, for example if updates represent all
* updates that are active right now, i.e. all previous updates should be
* disregarded
* @param updates SIRI EstimatedTimetable deliveries that should be applied atomically.
*/
public UpdateResult applyEstimatedTimetable(
@Nullable SiriFuzzyTripMatcher fuzzyTripMatcher,
EntityResolver entityResolver,
String feedId,
boolean fullDataset,
UpdateIncrementality incrementality,
List<EstimatedTimetableDeliveryStructure> updates
) {
if (updates == null) {
Expand All @@ -102,7 +105,7 @@ public UpdateResult applyEstimatedTimetable(
List<Result<UpdateSuccess, UpdateError>> results = new ArrayList<>();

snapshotManager.withLock(() -> {
if (fullDataset) {
if (incrementality == FULL_DATASET) {
// Remove all updates from the buffer
snapshotManager.clearBuffer(feedId);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opentripplanner.ext.siri.updater;

import java.util.Optional;
import org.opentripplanner.updater.trip.UpdateIncrementality;
import uk.org.siri.siri20.Siri;

/**
Expand All @@ -18,10 +19,10 @@ public interface EstimatedTimetableSource {
Optional<Siri> getUpdates();

/**
* @return true iff the last list with updates represent all updates that are active right now,
* i.e. all previous updates should be disregarded
* @return The incrementality of the last collection of updates.
* {@link UpdateIncrementality}
*/
boolean getFullDatasetValueOfLastUpdates();
UpdateIncrementality incrementalityOfLastUpdates();

String getFeedId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opentripplanner.updater.spi.GraphUpdater;
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.updater.trip.UpdateIncrementality;
import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -372,7 +373,7 @@ private void processSiriData(ByteString data) {
fuzzyTripMatcher,
entityResolver,
feedId,
false,
UpdateIncrementality.DIFFERENTIAL,
estimatedTimetableDeliveries
);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package org.opentripplanner.ext.siri.updater;

import static org.opentripplanner.updater.trip.UpdateIncrementality.DIFFERENTIAL;
import static org.opentripplanner.updater.trip.UpdateIncrementality.FULL_DATASET;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
import org.opentripplanner.framework.io.OtpHttpClientException;
import org.opentripplanner.updater.spi.HttpHeaders;
import org.opentripplanner.updater.trip.UpdateIncrementality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.Siri;
Expand All @@ -26,10 +30,9 @@ public class SiriETHttpTripUpdateSource implements EstimatedTimetableSource {
private final String requestorRef;

/**
* True iff the last list with updates represent all updates that are active right now, i.e. all
* previous updates should be disregarded
* The incrementality of the last received collection of updates.
*/
private boolean fullDataset = true;
private UpdateIncrementality updateIncrementality = FULL_DATASET;
private ZonedDateTime lastTimestamp = ZonedDateTime.now().minusMonths(1);

public SiriETHttpTripUpdateSource(Parameters parameters) {
Expand Down Expand Up @@ -61,7 +64,7 @@ public Optional<Siri> getUpdates() {
lastTimestamp = serviceDelivery.getResponseTimestamp();

//All subsequent requests will return changes since last request
fullDataset = false;
updateIncrementality = DIFFERENTIAL;
return siri;
} catch (OtpHttpClientException e) {
LOG.info("Failed after {} ms", (System.currentTimeMillis() - t1));
Expand All @@ -74,8 +77,8 @@ public Optional<Siri> getUpdates() {
}

@Override
public boolean getFullDatasetValueOfLastUpdates() {
return fullDataset;
public UpdateIncrementality incrementalityOfLastUpdates() {
return updateIncrementality;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void runPolling() {
do {
var updates = updateSource.getUpdates();
if (updates.isPresent()) {
boolean fullDataset = updateSource.getFullDatasetValueOfLastUpdates();
var incrementality = updateSource.incrementalityOfLastUpdates();
ServiceDelivery serviceDelivery = updates.get().getServiceDelivery();
moreData = Boolean.TRUE.equals(serviceDelivery.isMoreData());
// Mark this updater as primed after last page of updates. Copy moreData into a final
Expand All @@ -104,7 +104,7 @@ public void runPolling() {
fuzzyTripMatcher,
entityResolver,
feedId,
fullDataset,
incrementality,
etds
);
ResultLogger.logUpdateResult(feedId, "siri-et", result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opentripplanner.transit.service.TransitModel;
import org.opentripplanner.updater.spi.ResultLogger;
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.trip.UpdateIncrementality;
import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics;
import org.rutebanken.siri20.util.SiriXml;
import org.slf4j.Logger;
Expand Down Expand Up @@ -102,7 +103,7 @@ private Future<?> processMessage(List<EstimatedTimetableDeliveryStructure> updat
fuzzyTripMatcher(),
entityResolver(),
feedId,
false,
UpdateIncrementality.DIFFERENTIAL,
updates
);
ResultLogger.logUpdateResultErrors(feedId, "siri-et", result);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.opentripplanner.updater.trip;

import static org.opentripplanner.updater.trip.UpdateIncrementality.DIFFERENTIAL;
import static org.opentripplanner.updater.trip.UpdateIncrementality.FULL_DATASET;

import com.google.protobuf.ExtensionRegistry;
import com.google.transit.realtime.GtfsRealtime;
import com.google.transit.realtime.GtfsRealtime.FeedEntity;
Expand All @@ -25,7 +28,7 @@ public class GtfsRealtimeTripUpdateSource {
private final String feedId;
private final String url;
private final HttpHeaders headers;
private boolean fullDataset = true;
private UpdateIncrementality updateIncrementality = FULL_DATASET;
private final ExtensionRegistry registry = ExtensionRegistry.newInstance();
private final OtpHttpClient otpHttpClient;

Expand All @@ -41,7 +44,7 @@ public List<TripUpdate> getUpdates() {
FeedMessage feedMessage;
List<FeedEntity> feedEntityList;
List<TripUpdate> updates = null;
fullDataset = true;
updateIncrementality = FULL_DATASET;
try {
// Decode message
feedMessage =
Expand All @@ -61,7 +64,7 @@ public List<TripUpdate> getUpdates() {
.getIncrementality()
.equals(GtfsRealtime.FeedHeader.Incrementality.DIFFERENTIAL)
) {
fullDataset = false;
updateIncrementality = DIFFERENTIAL;
}

// Create List of TripUpdates
Expand All @@ -85,10 +88,10 @@ public String toString() {
}

/**
* @return true iff the last list with updates represent all updates that are active right now,
* i.e. all previous updates should be disregarded
* @return the incrementality of the last list with updates, i.e. if all previous updates
* should be disregarded
*/
public boolean getFullDatasetValueOfLastUpdates() {
return fullDataset;
public UpdateIncrementality incrementalityOfLastUpdates() {
return updateIncrementality;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.opentripplanner.updater.trip;

import static org.opentripplanner.updater.trip.UpdateIncrementality.DIFFERENTIAL;
import static org.opentripplanner.updater.trip.UpdateIncrementality.FULL_DATASET;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.transit.realtime.GtfsRealtime;
import java.net.URI;
Expand Down Expand Up @@ -139,7 +142,7 @@ public void connectionLost(Throwable cause) {
@Override
public void messageArrived(String topic, MqttMessage message) {
List<GtfsRealtime.TripUpdate> updates = null;
boolean fullDataset = true;
UpdateIncrementality updateIncrementality = FULL_DATASET;
try {
// Decode message
GtfsRealtime.FeedMessage feedMessage = GtfsRealtime.FeedMessage.PARSER.parseFrom(
Expand All @@ -156,7 +159,7 @@ public void messageArrived(String topic, MqttMessage message) {
.getIncrementality()
.equals(GtfsRealtime.FeedHeader.Incrementality.DIFFERENTIAL)
) {
fullDataset = false;
updateIncrementality = DIFFERENTIAL;
}

// Create List of TripUpdates
Expand All @@ -177,7 +180,7 @@ public void messageArrived(String topic, MqttMessage message) {
snapshotSource,
fuzzyTripMatcher,
backwardsDelayPropagationType,
fullDataset,
updateIncrementality,
updates,
feedId,
recordMetrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ public void setup(WriteToGraphCallback writeToGraphCallback) {
public void runPolling() {
// Get update lists from update source
List<TripUpdate> updates = updateSource.getUpdates();
boolean fullDataset = updateSource.getFullDatasetValueOfLastUpdates();
var incrementality = updateSource.incrementalityOfLastUpdates();

if (updates != null) {
// Handle trip updates via graph writer runnable
TripUpdateGraphWriterRunnable runnable = new TripUpdateGraphWriterRunnable(
snapshotSource,
fuzzyTripMatcher,
backwardsDelayPropagationType,
fullDataset,
incrementality,
updates,
feedId,
recordMetrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import static org.opentripplanner.updater.spi.UpdateError.UpdateErrorType.TOO_FEW_STOPS;
import static org.opentripplanner.updater.spi.UpdateError.UpdateErrorType.TRIP_ALREADY_EXISTS;
import static org.opentripplanner.updater.spi.UpdateError.UpdateErrorType.TRIP_NOT_FOUND;
import static org.opentripplanner.updater.trip.UpdateIncrementality.DIFFERENTIAL;
import static org.opentripplanner.updater.trip.UpdateIncrementality.FULL_DATASET;

import com.google.common.base.Preconditions;
import com.google.common.collect.Multimaps;
Expand Down Expand Up @@ -132,15 +134,14 @@ public TimetableSnapshotSource(
*
* @param backwardsDelayPropagationType Defines when delays are propagated to previous stops and
* if these stops are given the NO_DATA flag.
* @param fullDataset true if the list with updates represent all updates that
* are active right now, i.e. all previous updates should be
* disregarded
* @param updateIncrementality Determines the incrementality of the updates. FULL updates clear the buffer
* of all previous updates for the given feed id.
* @param updates GTFS-RT TripUpdate's that should be applied atomically
*/
public UpdateResult applyTripUpdates(
GtfsRealtimeFuzzyTripMatcher fuzzyTripMatcher,
BackwardsDelayPropagationType backwardsDelayPropagationType,
boolean fullDataset,
UpdateIncrementality updateIncrementality,
List<TripUpdate> updates,
String feedId
) {
Expand All @@ -153,7 +154,7 @@ public UpdateResult applyTripUpdates(
List<Result<UpdateSuccess, UpdateError>> results = new ArrayList<>();

snapshotManager.withLock(() -> {
if (fullDataset) {
if (updateIncrementality == FULL_DATASET) {
// Remove all updates from the buffer
snapshotManager.clearBuffer(feedId);
}
Expand Down Expand Up @@ -201,7 +202,7 @@ public UpdateResult applyTripUpdates(
final TripDescriptor.ScheduleRelationship tripScheduleRelationship = determineTripScheduleRelationship(
tripDescriptor
);
if (!fullDataset) {
if (updateIncrementality == DIFFERENTIAL) {
purgePatternModifications(tripScheduleRelationship, tripId, serviceDate);
}

Expand Down Expand Up @@ -229,13 +230,13 @@ public UpdateResult applyTripUpdates(
tripId,
serviceDate,
CancelationType.CANCEL,
fullDataset
updateIncrementality
);
case DELETED -> handleCanceledTrip(
tripId,
serviceDate,
CancelationType.DELETE,
fullDataset
updateIncrementality
);
case REPLACEMENT -> validateAndHandleModifiedTrip(
tripUpdate,
Expand Down Expand Up @@ -268,7 +269,7 @@ public UpdateResult applyTripUpdates(

var updateResult = UpdateResult.ofResults(results);

if (fullDataset) {
if (updateIncrementality == FULL_DATASET) {
logUpdateResult(feedId, failuresByRelationship, updateResult);
}
return updateResult;
Expand Down Expand Up @@ -1065,11 +1066,11 @@ private Result<UpdateSuccess, UpdateError> handleCanceledTrip(
FeedScopedId tripId,
final LocalDate serviceDate,
CancelationType cancelationType,
boolean fullDataset
UpdateIncrementality incrementality
) {
var canceledPreviouslyAddedTrip = fullDataset
? false
: cancelPreviouslyAddedTrip(tripId, serviceDate, cancelationType);
var canceledPreviouslyAddedTrip =
incrementality != FULL_DATASET &&
cancelPreviouslyAddedTrip(tripId, serviceDate, cancelationType);

// if previously an added trip was removed, there can't be a scheduled trip to remove
if (canceledPreviouslyAddedTrip) {
Expand Down
Loading

0 comments on commit b1a7c53

Please sign in to comment.