diff --git a/Documentation/5.4/Raven.Documentation.Pages/client-api/data-subscriptions/concurrent-subscriptions.dotnet.markdown b/Documentation/5.4/Raven.Documentation.Pages/client-api/data-subscriptions/concurrent-subscriptions.dotnet.markdown new file mode 100644 index 000000000..9784d8a3f --- /dev/null +++ b/Documentation/5.4/Raven.Documentation.Pages/client-api/data-subscriptions/concurrent-subscriptions.dotnet.markdown @@ -0,0 +1,75 @@ +# Concurrent Subscriptions +--- + +{NOTE: } + +* With **Concurrent Subscriptions**, multiple data subscription workers can connect to the same subscription task simultaneously. + +* Each worker is assigned a different batch of documents to process. + +* By processing different batches in parallel, multiple workers can significantly accelerate the consumption of the subscription's contents. + +* Documents that were assigned to workers whose connection has ended unexpectedly, + can be reassigned by the server to available workers. + See [connection failure](../../client-api/data-subscriptions/concurrent-subscriptions#connection-failure) below. + +* In this page: + * [Defining concurrent workers](../../client-api/data-subscriptions/concurrent-subscriptions#defining-concurrent-workers) + * [Dropping a connection](../../client-api/data-subscriptions/concurrent-subscriptions#dropping-a-connection) + * [Connection failure](../../client-api/data-subscriptions/concurrent-subscriptions#connection-failure) + +{NOTE/} + +--- + +{PANEL: Defining concurrent workers} + +Concurrent workers are defined similarly to other workers, except their +[strategy](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription#worker-strategies) +is set to [SubscriptionOpeningStrategy.Concurrent](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription#multiple-workers-per-subscription-strategy). + +* To define a concurrent worker: + * Create the worker using [GetSubscriptionWorker](../../client-api/data-subscriptions/consumption/api-overview#create-the-subscription-worker). + * Pass it a [SubscriptionWorkerOptions](../../client-api/data-subscriptions/consumption/api-overview#subscriptionworkeroptions) instance. + * Set the strategy to `SubscriptionOpeningStrategy.Concurrent` + +* Usage: + * Define two concurrent workers + {CODE conSub_defineWorkers@ClientApi\DataSubscriptions\ConcurrentSubscriptions.cs /} + * Run both workers + {CODE conSub_runWorkers@ClientApi\DataSubscriptions\ConcurrentSubscriptions.cs /} + +{PANEL/} + +{PANEL: Dropping a connection} + +* Use `Subscriptions.DropSubscriptionWorker` to **forcefully disconnect** + the specified worker from the subscription it is connected to. + {CODE-BLOCK: csharp} + public void DropSubscriptionWorker(SubscriptionWorker worker, string database = null) + {CODE-BLOCK/} + +* Usage: + {CODE conSub_dropWorker@ClientApi\DataSubscriptions\ConcurrentSubscriptions.cs /} + +{PANEL/} + +{PANEL: Connection failure} + +* When a concurrent worker's connection ends unexpectedly, + the server may reassign the documents this worker has been processing to any other concurrent worker that is available. +* A worker that reconnects after a connection failure will be assigned a **new** batch of documents. + It is **not** guaranteed that the new batch will contain the same documents this worker was processing before the disconnection. +* As a result, documents may be processed more than once: + - first by a worker that disconnected unexpectedly without acknowledging the completion of its assigned documents, + - and later by other workers the documents are reassigned to. + +{PANEL/} + +## Related Articles + +**Data Subscriptions**: + +- [How to Create a Data Subscription](../../client-api/data-subscriptions/creation/how-to-create-data-subscription) +- [How to Consume a Data Subscription](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription) +- [Maintenance Operations](../../client-api/data-subscriptions/advanced-topics/maintenance-operations) diff --git a/Documentation/5.4/Raven.Documentation.Pages/client-api/data-subscriptions/concurrent-subscriptions.js.markdown b/Documentation/5.4/Raven.Documentation.Pages/client-api/data-subscriptions/concurrent-subscriptions.js.markdown new file mode 100644 index 000000000..2ab44a968 --- /dev/null +++ b/Documentation/5.4/Raven.Documentation.Pages/client-api/data-subscriptions/concurrent-subscriptions.js.markdown @@ -0,0 +1,75 @@ +# Concurrent Subscriptions +--- + +{NOTE: } + +* With **Concurrent Subscriptions**, multiple data subscription workers can connect to the same subscription task simultaneously. + +* Each worker is assigned a different batch of documents to process. + +* By processing different batches in parallel, multiple workers can significantly accelerate the consumption of the subscription's contents. + +* Documents that were assigned to workers whose connection has ended unexpectedly, + can be reassigned by the server to available workers. + See [connection failure](../../client-api/data-subscriptions/concurrent-subscriptions#connection-failure) below. + +* In this page: + * [Defining concurrent workers](../../client-api/data-subscriptions/concurrent-subscriptions#defining-concurrent-workers) + * [Dropping a connection](../../client-api/data-subscriptions/concurrent-subscriptions#dropping-a-connection) + * [Connection failure](../../client-api/data-subscriptions/concurrent-subscriptions#connection-failure) + +{NOTE/} + +--- + +{PANEL: Defining concurrent workers} + +Concurrent workers are defined similarly to other workers, except their +[strategy](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription#worker-strategies) +is set to [Concurrent](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription#multiple-workers-per-subscription-strategy). + +* To define a concurrent worker: + * Create the worker using [getSubscriptionWorker](../../client-api/data-subscriptions/consumption/api-overview#create-the-subscription-worker). + * Pass it a [subscription worker options](../../client-api/data-subscriptions/consumption/api-overview#subscription-worker-options) object. + * Set the strategy to `Concurrent` + +* Usage: + * Define two concurrent workers + {CODE:nodejs concurrent_1@client-api\dataSubscriptions\concurrentSubscriptions.js /} + * Run both workers + {CODE:nodejs concurrent_2@client-api\dataSubscriptions\concurrentSubscriptions.js /} + +{PANEL/} + +{PANEL: Dropping a connection} + +* Use `dropSubscriptionWorker` to **forcefully disconnect** + the specified worker from the subscription it is connected to. + +* Use `dropConnection` to disconnect ALL workers connected to the specified subscription. + +{CODE:nodejs concurrent_3@client-api\dataSubscriptions\concurrentSubscriptions.js /} + +{CODE:nodejs drop_syntax@client-api\dataSubscriptions\concurrentSubscriptions.js /} + +{PANEL/} + +{PANEL: Connection failure} + +* When a concurrent worker's connection ends unexpectedly, + the server may reassign the documents this worker has been processing to any other concurrent worker that is available. +* A worker that reconnects after a connection failure will be assigned a **new** batch of documents. + It is **not** guaranteed that the new batch will contain the same documents this worker was processing before the disconnection. +* As a result, documents may be processed more than once: + - first by a worker that disconnected unexpectedly without acknowledging the completion of its assigned documents, + - and later by other workers the documents are reassigned to. + +{PANEL/} + +## Related Articles + +**Data Subscriptions**: + +- [How to Create a Data Subscription](../../client-api/data-subscriptions/creation/how-to-create-data-subscription) +- [How to Consume a Data Subscription](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription) +- [Maintenance Operations](../../client-api/data-subscriptions/advanced-topics/maintenance-operations) diff --git a/Documentation/5.4/Samples/csharp/Raven.Documentation.Samples/ClientApi/DataSubscriptions/ConcurrentSubscriptions.cs b/Documentation/5.4/Samples/csharp/Raven.Documentation.Samples/ClientApi/DataSubscriptions/ConcurrentSubscriptions.cs new file mode 100644 index 000000000..cbca0a757 --- /dev/null +++ b/Documentation/5.4/Samples/csharp/Raven.Documentation.Samples/ClientApi/DataSubscriptions/ConcurrentSubscriptions.cs @@ -0,0 +1,92 @@ +using Raven.Client.Documents; +using Raven.Client.Documents.Subscriptions; +using Raven.Documentation.Samples.Orders; + +namespace Raven.Documentation.Samples.ClientApi.ConcurrentDataSubscriptions +{ + public class ConcurrentDataSubscriptions + { + public void Create3ConcurrentSubscriptions() + { + var store = new DocumentStore + { + Urls = new[] { "http://127.0.0.1:8080" }, + Database = "sampleDB" + }; + store.Initialize(); + + #region conSub_defineWorkers + // Define concurrent subscription workers + var worker1 = store.Subscriptions.GetSubscriptionWorker( + // Set the worker to connect to the "Get all orders" subscription task + new SubscriptionWorkerOptions("Get all orders") + { + // Set Concurrent strategy + Strategy = SubscriptionOpeningStrategy.Concurrent, + MaxDocsPerBatch = 20 + }); + + var worker2 = store.Subscriptions.GetSubscriptionWorker( + new SubscriptionWorkerOptions("Get all orders") + { + Strategy = SubscriptionOpeningStrategy.Concurrent, + MaxDocsPerBatch = 20 + }); + #endregion + + /* + #region conSub_OnEstablishedSubscriptionConnection + worker1.OnEstablishedSubscriptionConnection += () => + { + // your logic here + }; + #endregion + */ + + #region conSub_defineStrategy + var worker3 = store.Subscriptions.GetSubscriptionWorker( + new SubscriptionWorkerOptions("Get all orders") + { + // Concurrent strategy + Strategy = SubscriptionOpeningStrategy.Concurrent, + MaxDocsPerBatch = 20 + }); + #endregion + + #region conSub_runWorkers + // Start the concurrent worker. + // Workers will connect concurrently to the "Get all rders" subscription task. + var worker1Task = worker1.Run(batch => + { + // Process batch + foreach (var item in batch.Items) + { + // Process item + } + }); + + var worker2Task = worker2.Run(batch => + { + // Process batch + foreach (var item in batch.Items) + { + // Process item + } + }); + #endregion + + #region conSub_dropWorker + // Drop a concurrent subscription worker + store.Subscriptions.DropSubscriptionWorker(worker2); + #endregion + + var worker3Task = worker3.Run(batch => + { + foreach (var item in batch.Items) + { + // Process item + } + }); + } + } +} diff --git a/Documentation/5.4/Samples/nodejs/client-api/dataSubscriptions/concurrentSubscriptions.js b/Documentation/5.4/Samples/nodejs/client-api/dataSubscriptions/concurrentSubscriptions.js new file mode 100644 index 000000000..77d2f8ae9 --- /dev/null +++ b/Documentation/5.4/Samples/nodejs/client-api/dataSubscriptions/concurrentSubscriptions.js @@ -0,0 +1,77 @@ +import * as assert from "assert"; +import { DocumentStore } from "ravendb"; + +const documentStore = new DocumentStore(); +const session = documentStore.openSession(); + +class InvalidOperationException extends Error { + constructor(message) { + super(message); + this.name = "InvalidOperationException"; + } +} + +async function concurrentSubscriptions() { + + { + //region concurrent_1 + // Define 2 concurrent subscription workers + // ======================================== + + const options = { + // Set concurrent strategy + strategy: "Concurrent", + subscriptionName: "Get all orders", + maxDocsPerBatch: 20 + }; + + const worker1 = documentStore.subscriptions.getSubscriptionWorker(options); + const worker2 = documentStore.subscriptions.getSubscriptionWorker(options); + //endregion + } + + { + //region concurrent_2 + worker1.on("batch", (batch, callback) => { + try { + for (const item of batch.items) { + // Process item + } + callback(); + + } catch(err) { + callback(err); + } + }); + + worker2.on("batch", (batch, callback) => { + try { + for (const item of batch.items) { + // Process item + } + callback(); + + } catch(err) { + callback(err); + } + }); + //endregion + } + + { + //region concurrent_3 + // Drop connection for worker2 + await documentStore.subscriptions.dropSubscriptionWorker(worker2); + //endregion + } +} + +{ +//region drop_syntax +// Available overloads: +dropConnection(options); +dropConnection(options, database); +dropSubscriptionWorker(worker); +dropSubscriptionWorker(worker, database); +//endregion +}