Skip to content

Commit

Permalink
fix: Avoid concurrency issue when multiple requests are made with the…
Browse files Browse the repository at this point in the history
… same key (#46)
  • Loading branch information
mirland authored Oct 31, 2024
1 parent c997515 commit a23dd83
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dart-coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
- main

jobs:
build:
check_coverage:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand Down
62 changes: 46 additions & 16 deletions lib/src/implementations/stock_impl.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// ignore_for_file: public_member_api_docs

import 'dart:async';
import 'dart:math';

import 'package:meta/meta.dart';
import 'package:mutex/mutex.dart';
import 'package:rxdart/rxdart.dart';
import 'package:stock/src/extensions/future_stream_internal_extensions.dart';
Expand All @@ -15,6 +17,9 @@ import 'package:stock/src/stock_request.dart';
import 'package:stock/src/stock_response.dart';
import 'package:stock/src/stock_response_extensions.dart';

@visibleForTesting
typedef SessionStreamKey = double;

class StockImpl<Key, Output> implements Stock<Key, Output> {
StockImpl({
required Fetcher<Key, Output> fetcher,
Expand All @@ -25,8 +30,11 @@ class StockImpl<Key, Output> implements Stock<Key, Output> {
final FactoryFetcher<Key, Output> _fetcher;
final SourceOfTruth<Key, Output>? _sourceOfTruth;

final Map<Key, int> _writingMap = {};
final _writingLock = Mutex();
@visibleForTesting
final Map<Key, Set<SessionStreamKey>> sessionFetcherPendingRequests = {};
@visibleForTesting
final Map<Key, Set<SessionStreamKey>> currentStreamSessions = {};
final _notifyPendingRequestsLock = Mutex();

@override
Future<Output> fresh(Key key) =>
Expand Down Expand Up @@ -70,6 +78,8 @@ class StockImpl<Key, Output> implements Stock<Key, Output> {
StockRequest<Key> request,
SourceOfTruth<Key, Output> sourceOfTruth,
) async* {
final sessionKey = Random.secure().nextDouble();

final controller = StreamController<StockResponse<Output?>>.broadcast();
final syncLock = Mutex();
await syncLock.acquire();
Expand All @@ -79,26 +89,35 @@ class StockImpl<Key, Output> implements Stock<Key, Output> {
request: request,
sourceOfTruth: sourceOfTruth,
emitMutex: syncLock,
sessionStreamKey: sessionKey,
);

final sourceOfTruthSubscription = _generateSourceOfTruthStreamSubscription(
request: request,
sourceOfTruth: sourceOfTruth,
dataStreamController: controller,
dbLock: syncLock,
sessionStreamKey: sessionKey,
);

yield* controller.stream.whereDataNotNull().doOnCancel(() async {
currentStreamSessions[request.key]?.remove(sessionKey);
await fetcherSubscription.cancel();
await sourceOfTruthSubscription.cancel();
});
}).doOnListen(
() => _notifyPendingRequestsLock.protect(() async {
currentStreamSessions[request.key] =
(currentStreamSessions[request.key] ?? {})..add(sessionKey);
}),
);
}

StreamSubscription<void> _generateNetworkStream({
required StockRequest<Key> request,
required SourceOfTruth<Key, Output>? sourceOfTruth,
required Mutex emitMutex,
required StreamController<StockResponse<Output?>> dataStreamController,
required SessionStreamKey sessionStreamKey,
}) =>
Stream.fromFuture(
_shouldStartNetworkStream(request, dataStreamController),
Expand All @@ -113,24 +132,35 @@ class StockImpl<Key, Output> implements Stock<Key, Output> {
.listen(
(response) => emitMutex.protect(() async {
if (response is StockResponseData<Output>) {
await _writingLock
.protect(() async => _incrementWritingMap(request, 1));
await _notifyPendingRequestsLock.protect(
() async => _setupSessionPendingRequest(
request,
currentStreamSessions[request.key]!,
),
);
final writerResult = await sourceOfTruth
?.write(request.key, response.value)
.mapToResponse(ResponseOrigin.fetcher);
if (writerResult is StockResponseError) {
dataStreamController.add(writerResult.swapType());
await _writingLock
.protect(() async => _incrementWritingMap(request, -1));
await _notifyPendingRequestsLock.protect(
() async => sessionFetcherPendingRequests[request.key]
?.remove(sessionStreamKey),
);
}
} else {
dataStreamController.add(response);
}
}),
);

int _incrementWritingMap(StockRequest<Key> request, int increment) =>
_writingMap[request.key] = (_writingMap[request.key] ?? 0) + increment;
void _setupSessionPendingRequest(
StockRequest<Key> request,
Set<SessionStreamKey> currentSessions,
) =>
sessionFetcherPendingRequests[request.key] =
(sessionFetcherPendingRequests[request.key] ?? {})
..addAll(currentSessions);

Stream<StockResponse<Output>> _startNetworkFlow(
bool shouldFetchNewValue,
Expand Down Expand Up @@ -167,20 +197,20 @@ class StockImpl<Key, Output> implements Stock<Key, Output> {
required SourceOfTruth<Key, Output> sourceOfTruth,
required Mutex dbLock,
required StreamController<StockResponse<Output?>> dataStreamController,
required SessionStreamKey sessionStreamKey,
}) {
var initialSyncDone = false;
final sourceOfTruthSubscription = sourceOfTruth
.reader(request.key)
.mapToResponse(ResponseOrigin.sourceOfTruth)
.listen((response) async {
if (response is StockResponseData<Output?>) {
final fetcherData = await _writingLock.protect(() async {
final writingKeyData = (_writingMap[request.key] ?? -1) > 0;
if (writingKeyData) {
_incrementWritingMap(request, -1);
}
return writingKeyData;
});
final fetcherData = await _notifyPendingRequestsLock.protect(
() async =>
sessionFetcherPendingRequests[request.key]
?.remove(sessionStreamKey) ??
false,
);
dataStreamController.add(
StockResponseData(
fetcherData ? ResponseOrigin.fetcher : response.origin,
Expand Down
70 changes: 55 additions & 15 deletions pubspec.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ dependencies:

dev_dependencies:
build_runner: ^2.4.9
dartx: ^1.2.0
equatable: ^2.0.5
lints: ^3.0.0
mockito: ^5.4.4
test: ^1.25.5
Expand Down
11 changes: 11 additions & 0 deletions test/common/mock_key_value.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import 'package:equatable/equatable.dart';

class MockIntKeyValue extends Equatable {
const MockIntKeyValue(this.key, this.value);

final int key;
final int value;

@override
List<Object> get props => [key];
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:stock/src/source_of_truth.dart';

Expand Down
11 changes: 11 additions & 0 deletions test/common/stock_test_extensions.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import 'dart:async';

import 'package:dartx/dartx.dart';
import 'package:stock/src/implementations/stock_impl.dart';
import 'package:stock/src/stock.dart';
import 'package:stock/src/stock_response.dart';

Expand All @@ -25,6 +27,15 @@ extension StockExtensions<Key, Output> on Stock<Key, Output> {
(value) =>
value.map((items) => items.removeStacktraceIfNeeded()).toList(),
);

int get currentStreamSessionCount => (this as StockImpl)
.currentStreamSessions
.entries
.sumBy((e) => e.value.length);
int get sessionFetcherPendingRequests => (this as StockImpl)
.sessionFetcherPendingRequests
.entries
.sumBy((e) => e.value.length);
}

class ResultListener<T> {
Expand Down
Loading

0 comments on commit a23dd83

Please sign in to comment.