Skip to content

Commit

Permalink
Merge pull request #1928 from Danielle9897/RDoc-3010-concurrent-subsc…
Browse files Browse the repository at this point in the history
…riptions-for-pr

RDoc-3010 [Node.js] Subscriptions > Concurrent subscriptions [Replace C# samples]
  • Loading branch information
ppekrol authored Oct 22, 2024
2 parents 1318ccc + da2462a commit ad3575f
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Concurrent Subscriptions
---

{NOTE: }

* With **Concurrent Subscriptions**, multiple data subscription workers can connect to the same subscription task simultaneously.

* Each worker is assigned a different batch of documents to process.

* By processing different batches in parallel, multiple workers can significantly accelerate the consumption of the subscription's contents.

* Documents that were assigned to workers whose connection has ended unexpectedly,
can be reassigned by the server to available workers.
See [connection failure](../../client-api/data-subscriptions/concurrent-subscriptions#connection-failure) below.

* In this page:
* [Defining concurrent workers](../../client-api/data-subscriptions/concurrent-subscriptions#defining-concurrent-workers)
* [Dropping a connection](../../client-api/data-subscriptions/concurrent-subscriptions#dropping-a-connection)
* [Connection failure](../../client-api/data-subscriptions/concurrent-subscriptions#connection-failure)

{NOTE/}

---

{PANEL: Defining concurrent workers}

Concurrent workers are defined similarly to other workers, except their
[strategy](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription#worker-strategies)
is set to [SubscriptionOpeningStrategy.Concurrent](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription#multiple-workers-per-subscription-strategy).

* To define a concurrent worker:
* Create the worker using [GetSubscriptionWorker](../../client-api/data-subscriptions/consumption/api-overview#create-the-subscription-worker).
* Pass it a [SubscriptionWorkerOptions](../../client-api/data-subscriptions/consumption/api-overview#subscriptionworkeroptions) instance.
* Set the strategy to `SubscriptionOpeningStrategy.Concurrent`

* Usage:
* Define two concurrent workers
{CODE conSub_defineWorkers@ClientApi\DataSubscriptions\ConcurrentSubscriptions.cs /}
* Run both workers
{CODE conSub_runWorkers@ClientApi\DataSubscriptions\ConcurrentSubscriptions.cs /}

{PANEL/}

{PANEL: Dropping a connection}

* Use `Subscriptions.DropSubscriptionWorker` to **forcefully disconnect**
the specified worker from the subscription it is connected to.
{CODE-BLOCK: csharp}
public void DropSubscriptionWorker<T>(SubscriptionWorker<T> worker, string database = null)
{CODE-BLOCK/}

* Usage:
{CODE conSub_dropWorker@ClientApi\DataSubscriptions\ConcurrentSubscriptions.cs /}

{PANEL/}

{PANEL: Connection failure}

* When a concurrent worker's connection ends unexpectedly,
the server may reassign the documents this worker has been processing to any other concurrent worker that is available.
* A worker that reconnects after a connection failure will be assigned a **new** batch of documents.
It is **not** guaranteed that the new batch will contain the same documents this worker was processing before the disconnection.
* As a result, documents may be processed more than once:
- first by a worker that disconnected unexpectedly without acknowledging the completion of its assigned documents,
- and later by other workers the documents are reassigned to.

{PANEL/}

## Related Articles

**Data Subscriptions**:

- [How to Create a Data Subscription](../../client-api/data-subscriptions/creation/how-to-create-data-subscription)
- [How to Consume a Data Subscription](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription)
- [Maintenance Operations](../../client-api/data-subscriptions/advanced-topics/maintenance-operations)
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Concurrent Subscriptions
---

{NOTE: }

* With **Concurrent Subscriptions**, multiple data subscription workers can connect to the same subscription task simultaneously.

* Each worker is assigned a different batch of documents to process.

* By processing different batches in parallel, multiple workers can significantly accelerate the consumption of the subscription's contents.

* Documents that were assigned to workers whose connection has ended unexpectedly,
can be reassigned by the server to available workers.
See [connection failure](../../client-api/data-subscriptions/concurrent-subscriptions#connection-failure) below.

* In this page:
* [Defining concurrent workers](../../client-api/data-subscriptions/concurrent-subscriptions#defining-concurrent-workers)
* [Dropping a connection](../../client-api/data-subscriptions/concurrent-subscriptions#dropping-a-connection)
* [Connection failure](../../client-api/data-subscriptions/concurrent-subscriptions#connection-failure)

{NOTE/}

---

{PANEL: Defining concurrent workers}

Concurrent workers are defined similarly to other workers, except their
[strategy](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription#worker-strategies)
is set to [Concurrent](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription#multiple-workers-per-subscription-strategy).

* To define a concurrent worker:
* Create the worker using [getSubscriptionWorker](../../client-api/data-subscriptions/consumption/api-overview#create-the-subscription-worker).
* Pass it a [subscription worker options](../../client-api/data-subscriptions/consumption/api-overview#subscription-worker-options) object.
* Set the strategy to `Concurrent`

* Usage:
* Define two concurrent workers
{CODE:nodejs concurrent_1@client-api\dataSubscriptions\concurrentSubscriptions.js /}
* Run both workers
{CODE:nodejs concurrent_2@client-api\dataSubscriptions\concurrentSubscriptions.js /}

{PANEL/}

{PANEL: Dropping a connection}

* Use `dropSubscriptionWorker` to **forcefully disconnect**
the specified worker from the subscription it is connected to.

* Use `dropConnection` to disconnect ALL workers connected to the specified subscription.

{CODE:nodejs concurrent_3@client-api\dataSubscriptions\concurrentSubscriptions.js /}

{CODE:nodejs drop_syntax@client-api\dataSubscriptions\concurrentSubscriptions.js /}

{PANEL/}

{PANEL: Connection failure}

* When a concurrent worker's connection ends unexpectedly,
the server may reassign the documents this worker has been processing to any other concurrent worker that is available.
* A worker that reconnects after a connection failure will be assigned a **new** batch of documents.
It is **not** guaranteed that the new batch will contain the same documents this worker was processing before the disconnection.
* As a result, documents may be processed more than once:
- first by a worker that disconnected unexpectedly without acknowledging the completion of its assigned documents,
- and later by other workers the documents are reassigned to.

{PANEL/}

## Related Articles

**Data Subscriptions**:

- [How to Create a Data Subscription](../../client-api/data-subscriptions/creation/how-to-create-data-subscription)
- [How to Consume a Data Subscription](../../client-api/data-subscriptions/consumption/how-to-consume-data-subscription)
- [Maintenance Operations](../../client-api/data-subscriptions/advanced-topics/maintenance-operations)
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
using Raven.Client.Documents;
using Raven.Client.Documents.Subscriptions;
using Raven.Documentation.Samples.Orders;

namespace Raven.Documentation.Samples.ClientApi.ConcurrentDataSubscriptions
{
public class ConcurrentDataSubscriptions
{
public void Create3ConcurrentSubscriptions()
{
var store = new DocumentStore
{
Urls = new[] { "http://127.0.0.1:8080" },
Database = "sampleDB"
};
store.Initialize();

#region conSub_defineWorkers
// Define concurrent subscription workers
var worker1 = store.Subscriptions.GetSubscriptionWorker<Order>(
// Set the worker to connect to the "Get all orders" subscription task
new SubscriptionWorkerOptions("Get all orders")
{
// Set Concurrent strategy
Strategy = SubscriptionOpeningStrategy.Concurrent,
MaxDocsPerBatch = 20
});

var worker2 = store.Subscriptions.GetSubscriptionWorker<Order>(
new SubscriptionWorkerOptions("Get all orders")
{
Strategy = SubscriptionOpeningStrategy.Concurrent,
MaxDocsPerBatch = 20
});
#endregion

/*
#region conSub_OnEstablishedSubscriptionConnection
worker1.OnEstablishedSubscriptionConnection += () =>
{
// your logic here
};
#endregion
*/

#region conSub_defineStrategy
var worker3 = store.Subscriptions.GetSubscriptionWorker<Order>(
new SubscriptionWorkerOptions("Get all orders")
{
// Concurrent strategy
Strategy = SubscriptionOpeningStrategy.Concurrent,
MaxDocsPerBatch = 20
});
#endregion

#region conSub_runWorkers
// Start the concurrent worker.
// Workers will connect concurrently to the "Get all rders" subscription task.
var worker1Task = worker1.Run(batch =>
{
// Process batch
foreach (var item in batch.Items)
{
// Process item
}
});

var worker2Task = worker2.Run(batch =>
{
// Process batch
foreach (var item in batch.Items)
{
// Process item
}
});
#endregion

#region conSub_dropWorker
// Drop a concurrent subscription worker
store.Subscriptions.DropSubscriptionWorker(worker2);
#endregion

var worker3Task = worker3.Run(batch =>
{
foreach (var item in batch.Items)
{
// Process item
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import * as assert from "assert";
import { DocumentStore } from "ravendb";

const documentStore = new DocumentStore();
const session = documentStore.openSession();

class InvalidOperationException extends Error {
constructor(message) {
super(message);
this.name = "InvalidOperationException";
}
}

async function concurrentSubscriptions() {

{
//region concurrent_1
// Define 2 concurrent subscription workers
// ========================================

const options = {
// Set concurrent strategy
strategy: "Concurrent",
subscriptionName: "Get all orders",
maxDocsPerBatch: 20
};

const worker1 = documentStore.subscriptions.getSubscriptionWorker(options);
const worker2 = documentStore.subscriptions.getSubscriptionWorker(options);
//endregion
}

{
//region concurrent_2
worker1.on("batch", (batch, callback) => {
try {
for (const item of batch.items) {
// Process item
}
callback();

} catch(err) {
callback(err);
}
});

worker2.on("batch", (batch, callback) => {
try {
for (const item of batch.items) {
// Process item
}
callback();

} catch(err) {
callback(err);
}
});
//endregion
}

{
//region concurrent_3
// Drop connection for worker2
await documentStore.subscriptions.dropSubscriptionWorker(worker2);
//endregion
}
}

{
//region drop_syntax
// Available overloads:
dropConnection(options);
dropConnection(options, database);
dropSubscriptionWorker(worker);
dropSubscriptionWorker(worker, database);
//endregion
}

0 comments on commit ad3575f

Please sign in to comment.