From 163b0aaa2c9ff630b263cf9ac4a148a111e3594e Mon Sep 17 00:00:00 2001 From: Johan Torin Date: Mon, 4 Dec 2023 11:45:41 +0100 Subject: [PATCH] Adjust SIRI ServiceBus-client parameters to improve performance * Move to ServiceBusReceiveMode.RECEIVE_AND_DELETE instead of PEEK_LOCK. In this mode the message is deleted from the subscription as soon as it has been delivered. In case of an error this means that the message is lost but 1) the data is applied to the graph in another thread anyway and 2) there is nothing that indicates that a second attempt on the same message data would be more successful. * Fetch more than one message at a time, greatly improving message throughput even with small values. Roundtrip latency of the message fetching can in some situations otherwise lead to the client falling behind. Add config parameter 'prefetchCount' with a default value of 10 messages. * Lower the AutoDeleteOnIdle of subscriptions from one day to default one hour, still a conservative value. Despite having automatic removal of subscriptions on shutdown, lingering subscriptions may still happen, for example if the JVM dies. Add config parameter 'autoDeleteOnIdle' with a default value of one hour. --- docs/sandbox/siri/SiriAzureUpdater.md | 62 ++++++++++--------- .../azure/AbstractAzureSiriUpdater.java | 17 +++-- .../azure/SiriAzureUpdaterParameters.java | 20 ++++++ .../azure/SiriAzureUpdaterConfig.java | 15 +++++ 4 files changed, 81 insertions(+), 33 deletions(-) diff --git a/docs/sandbox/siri/SiriAzureUpdater.md b/docs/sandbox/siri/SiriAzureUpdater.md index 75aa58897ec..3b5c536946d 100644 --- a/docs/sandbox/siri/SiriAzureUpdater.md +++ b/docs/sandbox/siri/SiriAzureUpdater.md @@ -21,20 +21,22 @@ To enable the SIRI updater you need to add it to the updaters section of the `ro -| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since | -|------------------------------------------------------------|:---------:|----------------------------------------------------------------|:----------:|---------------------|:-----:| -| type = "siri-azure-et-updater" | `enum` | The type of the updater. | *Required* | | 1.5 | -| [authenticationType](#u__11__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 | -| [customMidnight](#u__11__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 | -| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 | -| [fullyQualifiedNamespace](#u__11__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 | -| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 | -| [servicebus-url](#u__11__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 | -| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 | -| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 | -|    fromDateTime | `string` | Datetime boundary for historical data | *Optional* | `"-P1D"` | 2.2 | -|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na | -|    url | `string` | Endpoint to fetch from | *Optional* | | na | +| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since | +|------------------------------------------------------------|:----------:|------------------------------------------------------------------|:----------:|---------------------|:-----:| +| type = "siri-azure-et-updater" | `enum` | The type of the updater. | *Required* | | 1.5 | +| [authenticationType](#u__11__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 | +| autoDeleteOnIdle | `duration` | The time after which an inactive subscription is removed. | *Optional* | `"PT1H"` | 2.5 | +| [customMidnight](#u__11__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 | +| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 | +| [fullyQualifiedNamespace](#u__11__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 | +| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 | +| prefetchCount | `integer` | The number of messages to fetch from the subscription at a time. | *Optional* | `10` | 2.5 | +| [servicebus-url](#u__11__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 | +| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 | +| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 | +|    fromDateTime | `string` | Datetime boundary for historical data | *Optional* | `"-P1D"` | 2.2 | +|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na | +|    url | `string` | Endpoint to fetch from | *Optional* | | na | ##### Parameter details @@ -107,21 +109,23 @@ Has to be present for authenticationMethod SharedAccessKey. This should be Prima -| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since | -|------------------------------------------------------------|:---------:|----------------------------------------------------------------|:----------:|---------------------|:-----:| -| type = "siri-azure-sx-updater" | `enum` | The type of the updater. | *Required* | | 1.5 | -| [authenticationType](#u__10__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 | -| [customMidnight](#u__10__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 | -| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 | -| [fullyQualifiedNamespace](#u__10__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 | -| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 | -| [servicebus-url](#u__10__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 | -| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 | -| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 | -|    fromDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"-P1D"` | 2.2 | -|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na | -|    toDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"P1D"` | 2.2 | -|    url | `string` | Endpoint to fetch from | *Optional* | | na | +| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since | +|------------------------------------------------------------|:----------:|------------------------------------------------------------------|:----------:|---------------------|:-----:| +| type = "siri-azure-sx-updater" | `enum` | The type of the updater. | *Required* | | 1.5 | +| [authenticationType](#u__10__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 | +| autoDeleteOnIdle | `duration` | The time after which an inactive subscription is removed. | *Optional* | `"PT1H"` | 2.5 | +| [customMidnight](#u__10__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 | +| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 | +| [fullyQualifiedNamespace](#u__10__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 | +| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 | +| prefetchCount | `integer` | The number of messages to fetch from the subscription at a time. | *Optional* | `10` | 2.5 | +| [servicebus-url](#u__10__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 | +| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 | +| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 | +|    fromDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"-P1D"` | 2.2 | +|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na | +|    toDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"P1D"` | 2.2 | +|    url | `string` | Endpoint to fetch from | *Optional* | | na | ##### Parameter details diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java index f96941c7370..3a600a755b1 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/AbstractAzureSiriUpdater.java @@ -10,6 +10,7 @@ import com.azure.messaging.servicebus.administration.ServiceBusAdministrationAsyncClient; import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder; import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions; +import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import com.google.common.base.Preconditions; import com.google.common.io.CharStreams; import java.io.InputStreamReader; @@ -46,6 +47,8 @@ public abstract class AbstractAzureSiriUpdater implements GraphUpdater { private final Consumer messageConsumer = this::messageConsumer; private final Consumer errorConsumer = this::errorConsumer; private final String topicName; + private final Duration autoDeleteOnIdle; + private final int prefetchCount; protected WriteToGraphCallback saveResultOnGraph; private ServiceBusProcessorClient eventProcessor; @@ -73,6 +76,8 @@ public AbstractAzureSiriUpdater(SiriAzureUpdaterParameters config, TransitModel this.dataInitializationUrl = config.getDataInitializationUrl(); this.timeout = config.getTimeout(); this.feedId = config.feedId(); + this.autoDeleteOnIdle = config.getAutoDeleteOnIdle(); + this.prefetchCount = config.getPrefetchCount(); TransitService transitService = new DefaultTransitService(transitModel); this.entityResolver = new EntityResolver(transitService, feedId); this.fuzzyTripMatcher = @@ -122,10 +127,11 @@ public void run() { .buildAsyncClient(); } - // If Idle more then one day, then delete subscription so we don't have old obsolete subscriptions on Azure Service Bus + // Set options var options = new CreateSubscriptionOptions(); options.setDefaultMessageTimeToLive(Duration.of(25, ChronoUnit.HOURS)); - options.setAutoDeleteOnIdle(Duration.ofDays(1)); + // Set subscription to be deleted if idle for a certain time, so that orphaned instances doesn't linger. + options.setAutoDeleteOnIdle(autoDeleteOnIdle); // Make sure there is no old subscription on serviceBus if ( @@ -150,15 +156,18 @@ public void run() { .processor() .topicName(topicName) .subscriptionName(subscriptionName) + .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE) + .prefetchCount(prefetchCount) .processError(errorConsumer) .processMessage(messageConsumer) .buildProcessorClient(); eventProcessor.start(); LOG.info( - "Service Bus processor started for topic {} and subscription {}", + "Service Bus processor started for topic '{}' and subscription '{}', prefetching {} messages.", topicName, - subscriptionName + subscriptionName, + prefetchCount ); ApplicationShutdownSupport.addShutdownHook( diff --git a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java index 0d207d27efe..4b8406da896 100644 --- a/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java +++ b/src/ext/java/org/opentripplanner/ext/siri/updater/azure/SiriAzureUpdaterParameters.java @@ -1,5 +1,7 @@ package org.opentripplanner.ext.siri.updater.azure; +import java.time.Duration; + public abstract class SiriAzureUpdaterParameters { private String configRef; @@ -13,6 +15,8 @@ public abstract class SiriAzureUpdaterParameters { private int timeout; private boolean fuzzyTripMatching; + private Duration autoDeleteOnIdle; + private int prefetchCount; public SiriAzureUpdaterParameters(String type) { this.type = type; @@ -93,4 +97,20 @@ public boolean isFuzzyTripMatching() { public void setFuzzyTripMatching(boolean fuzzyTripMatching) { this.fuzzyTripMatching = fuzzyTripMatching; } + + public Duration getAutoDeleteOnIdle() { + return autoDeleteOnIdle; + } + + public void setAutoDeleteOnIdle(Duration autoDeleteOnIdle) { + this.autoDeleteOnIdle = autoDeleteOnIdle; + } + + public int getPrefetchCount() { + return prefetchCount; + } + + public void setPrefetchCount(int prefetchCount) { + this.prefetchCount = prefetchCount; + } } diff --git a/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java b/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java index 35da716337e..ddb6a967f92 100644 --- a/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java +++ b/src/main/java/org/opentripplanner/standalone/config/routerconfig/updaters/azure/SiriAzureUpdaterConfig.java @@ -4,6 +4,7 @@ import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_2; import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_5; +import java.time.Duration; import java.time.LocalDate; import java.time.Period; import java.time.ZoneId; @@ -41,6 +42,20 @@ public static void populateConfig( .summary("The ID of the feed to apply the updates to.") .asString(null) ); + parameters.setAutoDeleteOnIdle( + c + .of("autoDeleteOnIdle") + .since(V2_5) + .summary("The time after which an inactive subscription is removed.") + .asDuration(Duration.ofHours(1)) + ); + parameters.setPrefetchCount( + c + .of("prefetchCount") + .since(V2_5) + .summary("The number of messages to fetch from the subscription at a time.") + .asInt(10) + ); parameters.setFuzzyTripMatching( c .of("fuzzyTripMatching")