Skip to content

Commit

Permalink
Merge pull request opentripplanner#5741 from Skanetrafiken/adjust-sir…
Browse files Browse the repository at this point in the history
…i-servicebus-client-parameters

Adjust SIRI ServiceBus-client parameters to improve performance
  • Loading branch information
jtorin authored Mar 14, 2024
2 parents 378cceb + 163b0aa commit 5edc5ae
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 33 deletions.
62 changes: 33 additions & 29 deletions docs/sandbox/siri/SiriAzureUpdater.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ To enable the SIRI updater you need to add it to the updaters section of the `ro
<!-- siri-azure-et-updater BEGIN -->
<!-- NOTE! This section is auto-generated. Do not change, change doc in code instead. -->

| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:---------:|----------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-et-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__11__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| [customMidnight](#u__11__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__11__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| [servicebus-url](#u__11__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |
| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:----------:|------------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-et-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__11__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| autoDeleteOnIdle | `duration` | The time after which an inactive subscription is removed. | *Optional* | `"PT1H"` | 2.5 |
| [customMidnight](#u__11__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__11__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| prefetchCount | `integer` | The number of messages to fetch from the subscription at a time. | *Optional* | `10` | 2.5 |
| [servicebus-url](#u__11__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |


##### Parameter details
Expand Down Expand Up @@ -107,21 +109,23 @@ Has to be present for authenticationMethod SharedAccessKey. This should be Prima
<!-- siri-azure-sx-updater BEGIN -->
<!-- NOTE! This section is auto-generated. Do not change, change doc in code instead. -->

| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:---------:|----------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-sx-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__10__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| [customMidnight](#u__10__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__10__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| [servicebus-url](#u__10__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    toDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"P1D"` | 2.2 |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |
| Config Parameter | Type | Summary | Req./Opt. | Default Value | Since |
|------------------------------------------------------------|:----------:|------------------------------------------------------------------|:----------:|---------------------|:-----:|
| type = "siri-azure-sx-updater" | `enum` | The type of the updater. | *Required* | | 1.5 |
| [authenticationType](#u__10__authenticationType) | `enum` | Which authentication type to use | *Optional* | `"sharedaccesskey"` | 2.5 |
| autoDeleteOnIdle | `duration` | The time after which an inactive subscription is removed. | *Optional* | `"PT1H"` | 2.5 |
| [customMidnight](#u__10__customMidnight) | `integer` | Time on which time breaks into new day. | *Optional* | `0` | 2.2 |
| feedId | `string` | The ID of the feed to apply the updates to. | *Optional* | | 2.2 |
| [fullyQualifiedNamespace](#u__10__fullyQualifiedNamespace) | `string` | Service Bus fully qualified namespace used for authentication. | *Optional* | | 2.5 |
| fuzzyTripMatching | `boolean` | Whether to apply fuzzyTripMatching on the updates | *Optional* | `false` | 2.2 |
| prefetchCount | `integer` | The number of messages to fetch from the subscription at a time. | *Optional* | `10` | 2.5 |
| [servicebus-url](#u__10__servicebus_url) | `string` | Service Bus connection used for authentication. | *Optional* | | 2.2 |
| topic | `string` | Service Bus topic to connect to. | *Optional* | | 2.2 |
| history | `object` | Configuration for fetching historical data on startup | *Optional* | | 2.2 |
|    fromDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"-P1D"` | 2.2 |
|    timeout | `integer` | Timeout in milliseconds | *Optional* | `300000` | na |
|    toDateTime | `string` | Datetime boundary for historical data. | *Optional* | `"P1D"` | 2.2 |
|    url | `string` | Endpoint to fetch from | *Optional* | | na |


##### Parameter details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationAsyncClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.google.common.base.Preconditions;
import com.google.common.io.CharStreams;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -46,6 +47,8 @@ public abstract class AbstractAzureSiriUpdater implements GraphUpdater {
private final Consumer<ServiceBusReceivedMessageContext> messageConsumer = this::messageConsumer;
private final Consumer<ServiceBusErrorContext> errorConsumer = this::errorConsumer;
private final String topicName;
private final Duration autoDeleteOnIdle;
private final int prefetchCount;

protected WriteToGraphCallback saveResultOnGraph;
private ServiceBusProcessorClient eventProcessor;
Expand Down Expand Up @@ -73,6 +76,8 @@ public AbstractAzureSiriUpdater(SiriAzureUpdaterParameters config, TransitModel
this.dataInitializationUrl = config.getDataInitializationUrl();
this.timeout = config.getTimeout();
this.feedId = config.feedId();
this.autoDeleteOnIdle = config.getAutoDeleteOnIdle();
this.prefetchCount = config.getPrefetchCount();
TransitService transitService = new DefaultTransitService(transitModel);
this.entityResolver = new EntityResolver(transitService, feedId);
this.fuzzyTripMatcher =
Expand Down Expand Up @@ -122,10 +127,11 @@ public void run() {
.buildAsyncClient();
}

// If Idle more then one day, then delete subscription so we don't have old obsolete subscriptions on Azure Service Bus
// Set options
var options = new CreateSubscriptionOptions();
options.setDefaultMessageTimeToLive(Duration.of(25, ChronoUnit.HOURS));
options.setAutoDeleteOnIdle(Duration.ofDays(1));
// Set subscription to be deleted if idle for a certain time, so that orphaned instances doesn't linger.
options.setAutoDeleteOnIdle(autoDeleteOnIdle);

// Make sure there is no old subscription on serviceBus
if (
Expand All @@ -150,15 +156,18 @@ public void run() {
.processor()
.topicName(topicName)
.subscriptionName(subscriptionName)
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.prefetchCount(prefetchCount)
.processError(errorConsumer)
.processMessage(messageConsumer)
.buildProcessorClient();

eventProcessor.start();
LOG.info(
"Service Bus processor started for topic {} and subscription {}",
"Service Bus processor started for topic '{}' and subscription '{}', prefetching {} messages.",
topicName,
subscriptionName
subscriptionName,
prefetchCount
);

ApplicationShutdownSupport.addShutdownHook(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opentripplanner.ext.siri.updater.azure;

import java.time.Duration;

public abstract class SiriAzureUpdaterParameters {

private String configRef;
Expand All @@ -13,6 +15,8 @@ public abstract class SiriAzureUpdaterParameters {
private int timeout;

private boolean fuzzyTripMatching;
private Duration autoDeleteOnIdle;
private int prefetchCount;

public SiriAzureUpdaterParameters(String type) {
this.type = type;
Expand Down Expand Up @@ -93,4 +97,20 @@ public boolean isFuzzyTripMatching() {
public void setFuzzyTripMatching(boolean fuzzyTripMatching) {
this.fuzzyTripMatching = fuzzyTripMatching;
}

public Duration getAutoDeleteOnIdle() {
return autoDeleteOnIdle;
}

public void setAutoDeleteOnIdle(Duration autoDeleteOnIdle) {
this.autoDeleteOnIdle = autoDeleteOnIdle;
}

public int getPrefetchCount() {
return prefetchCount;
}

public void setPrefetchCount(int prefetchCount) {
this.prefetchCount = prefetchCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_2;
import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_5;

import java.time.Duration;
import java.time.LocalDate;
import java.time.Period;
import java.time.ZoneId;
Expand Down Expand Up @@ -41,6 +42,20 @@ public static void populateConfig(
.summary("The ID of the feed to apply the updates to.")
.asString(null)
);
parameters.setAutoDeleteOnIdle(
c
.of("autoDeleteOnIdle")
.since(V2_5)
.summary("The time after which an inactive subscription is removed.")
.asDuration(Duration.ofHours(1))
);
parameters.setPrefetchCount(
c
.of("prefetchCount")
.since(V2_5)
.summary("The number of messages to fetch from the subscription at a time.")
.asInt(10)
);
parameters.setFuzzyTripMatching(
c
.of("fuzzyTripMatching")
Expand Down

0 comments on commit 5edc5ae

Please sign in to comment.