Skip to content

Commit

Permalink
Add initial version (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
mirland authored Aug 24, 2022
1 parent b7157e6 commit fa967b9
Show file tree
Hide file tree
Showing 39 changed files with 2,337 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*.mocks.dart linguist-generated=true
*.freezed.dart linguist-generated=true
*.g.dart linguist-generated=true
*.gr.dart linguist-generated=true
pubspec.lock linguist-generated=true
Gemfile.lock linguist-generated=true
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @xmartlabs/flutter-open-source
33 changes: 33 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Miscellaneous
*.class
*.log
*.pyc
*.swp
.DS_Store
.atom/
.buildlog/
.history
.svn/
migrate_working_dir/

# IntelliJ related
*.iml
*.ipr
*.iws
.idea/

# The .vscode folder contains launch configuration and tasks you configure in
# VS Code which you may wish to be included in version control, so this line
# is commented out by default.
#.vscode/

# Flutter/Dart/Pub related
# Libraries should not include pubspec.lock, per https://dart.dev/guides/libraries/private-files#pubspeclock.
/pubspec.lock
**/doc/api/
.dart_tool/
.packages
build/

# Coverage
/coverage
10 changes: 10 additions & 0 deletions .metadata
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# This file tracks properties of this Flutter project.
# Used by Flutter tool to assess capabilities and perform upgrades etc.
#
# This file should be version controlled and should not be manually edited.

version:
revision: bcea432bce54a83306b3c00a7ad0ed98f777348d
channel: beta

project_type: package
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 0.0.1

* TODO: Describe initial release.
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TODO: Add your license here.
4 changes: 4 additions & 0 deletions analysis_options.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
include: package:flutter_lints/flutter.yaml

# Additional information about this file can be found at
# https://dart.dev/guides/language/analysis-options
8 changes: 8 additions & 0 deletions lib/errors.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class StockError extends Error {
final String message;

StockError(this.message);

@override
String toString() => "StockError: $message";
}
11 changes: 11 additions & 0 deletions lib/fetcher.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import 'package:stock/src/factory_fetcher.dart';

class Fetcher<Key, Output> {
static Fetcher<Key, Output> ofFuture<Key, Output>(
Future<Output> Function(Key key) futureFactory) =>
FutureFetcher(futureFactory);

static Fetcher<Key, Output> ofStream<Key, Output>(
Stream<Output> Function(Key key) streamFactory) =>
StreamFetcher(streamFactory);
}
43 changes: 43 additions & 0 deletions lib/source_of_truth.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import 'dart:async';

import 'package:flutter/widgets.dart';

class SourceOfTruth<Key, Output> {
Stream<Output?> Function(Key key) reader;
Future<void> Function(Key key, Output? output) writer;

SourceOfTruth({required this.reader, required this.writer});
}

class CachedSourceOfTruth<Key, T> implements SourceOfTruth<Key, T> {
final _streamController = StreamController<T?>.broadcast();

late Map<Key, T?> _cachedValues;

@override
late Stream<T?> Function(Key key) reader;
@override
late Future<void> Function(Key key, T? output) writer;

CachedSourceOfTruth([Map<Key, T?>? cachedValues]) {
_cachedValues = {if (cachedValues != null) ...cachedValues};
reader = generateReader;
writer = generateWriter;
}

@protected
@visibleForTesting
void setCachedValue(Key key, T? t) => _cachedValues[key] = t;

@protected
Stream<T?> generateReader(Key key) async* {
yield _cachedValues[key];
yield* _streamController.stream;
}

@protected
Future<void> generateWriter(Key key, T? value) async {
setCachedValue(key, value);
_streamController.add(value);
}
}
20 changes: 20 additions & 0 deletions lib/src/extensions/future_stream_extensions.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import 'package:rxdart/rxdart.dart';
import 'package:stock/store_response.dart';

extension StreamExtensions<T> on Stream<T> {
Stream<StoreResponse<T>> mapToResponse(ResponseOrigin origin) =>
map((data) => StoreResponse.data(origin, data))
.onErrorReturnWith((error, stacktrace) {
return StoreResponse.error(origin, error, stacktrace);
});
}

extension FutureExtensions<T> on Future<T> {
Future<StoreResponse<T>> mapToResponse(ResponseOrigin origin) async {
try {
return StoreResponse.data(origin, await this);
} catch (error, stacktrace) {
return StoreResponse.error(origin, error, stacktrace);
}
}
}
36 changes: 36 additions & 0 deletions lib/src/extensions/store_response_extensions.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import 'package:stock/errors.dart';
import 'package:stock/store_response.dart';

extension StoreResponseExtensions<T> on StoreResponse<T> {
StoreResponse<R> swapType<R>() {
return map(
data: (response) => StoreResponse.data(origin, response.value as R),
loading: (_) => StoreResponse.loading(origin),
error: (response) =>
StoreResponse.error(origin, response.error, response.stackTrace),
);
}

T requireData() => map(
data: (response) => response.value,
loading: (_) => throw StockError('There is no data in loading'),
error: (response) => throw response.error,
);

T? get data => map(
data: (response) => response.value,
loading: (_) => null,
error: (response) => null,
);

bool get isLoading => this is StoreResponseLoading;

bool get isError => this is StoreResponseError;
}

extension StoreResponseStreamExtensions<T> on Stream<StoreResponse<T?>> {
Stream<StoreResponse<T>> whereDataNotNull() => where(
(event) =>
event is StoreResponseData<T?> ? event.requireData() != null : true,
).map((event) => event.swapType<T>());
}
16 changes: 16 additions & 0 deletions lib/src/factory_fetcher.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import 'package:stock/fetcher.dart';

class FactoryFetcher<Key, Output> implements Fetcher<Key, Output> {
Stream<Output> Function(Key key) factory;

FactoryFetcher(this.factory);
}

class FutureFetcher<Key, Output> extends FactoryFetcher<Key, Output> {
FutureFetcher(Future<Output> Function(Key key) factory)
: super((key) => Stream.fromFuture(factory(key)));
}

class StreamFetcher<Key, Output> extends FactoryFetcher<Key, Output> {
StreamFetcher(factory) : super(factory);
}
181 changes: 181 additions & 0 deletions lib/src/store_impl.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import 'dart:async';

import 'package:mutex/mutex.dart';
import 'package:rxdart/rxdart.dart';
import 'package:stock/fetcher.dart';
import 'package:stock/source_of_truth.dart';
import 'package:stock/src/extensions/future_stream_extensions.dart';
import 'package:stock/src/extensions/store_response_extensions.dart';
import 'package:stock/src/factory_fetcher.dart';
import 'package:stock/src/wrapped_source_of_truth.dart';
import 'package:stock/store.dart';
import 'package:stock/store_request.dart';
import 'package:stock/store_response.dart';

class StoreImpl<Key, Output> implements Store<Key, Output> {
final Fetcher<Key, Output> _fetcher;
final SourceOfTruth<Key, Output>? _sourceOfTruth;

final Map<Key, int> _writingMap = {};
final _writingLock = Mutex();

StoreImpl({
required Fetcher<Key, Output> fetcher,
required SourceOfTruth<Key, Output>? sourceOfTruth,
}) : _fetcher = fetcher,
_sourceOfTruth = sourceOfTruth;

@override
Future<Output> fresh(Key key) =>
_generateCombinedNetworkAndSourceOfTruthStream(
StoreRequest(key: key, refresh: true),
WriteWrappedSourceOfTruth(_sourceOfTruth),
_fetcher as FactoryFetcher<Key, Output>,
)
.where((event) => event is! StoreResponseLoading)
.where((event) => event.origin == ResponseOrigin.fetcher)
.first
.then((value) => value.requireData());

@override
Future<Output> get(Key key) => stream(key, refresh: false)
.where((event) => event is! StoreResponseLoading)
.first
.then((value) => value.requireData());

@override
Stream<StoreResponse<Output>> stream(Key key, {refresh = true}) =>
streamFromRequest(StoreRequest(
key: key,
refresh: refresh,
));

Stream<StoreResponse<Output>> streamFromRequest(StoreRequest<Key> request) =>
_generateCombinedNetworkAndSourceOfTruthStream(
request,
_sourceOfTruth == null ? CachedSourceOfTruth() : _sourceOfTruth!,
_fetcher as FactoryFetcher<Key, Output>,
);

Stream<StoreResponse<Output>> _generateCombinedNetworkAndSourceOfTruthStream(
StoreRequest<Key> request,
SourceOfTruth<Key, Output> sourceOfTruth,
FactoryFetcher<Key, Output> fetcher,
) async* {
final StreamController<StoreResponse<Output?>> controller =
StreamController.broadcast();
final syncLock = Mutex();
await syncLock.acquire();

final fetcherSubscription = _generateNetworkStream(
dataStreamController: controller,
request: request,
sourceOfTruth: sourceOfTruth,
fetcher: fetcher,
emitMutex: syncLock,
);

final sourceOfTruthSubscription = _generateSourceOfTruthStreamSubscription(
request: request,
sourceOfTruth: sourceOfTruth,
dataStreamController: controller,
dbLock: syncLock,
);

yield* controller.stream.whereDataNotNull().doOnCancel(() async {
await fetcherSubscription.cancel();
await sourceOfTruthSubscription.cancel();
});
}

StreamSubscription _generateNetworkStream({
required StoreRequest<Key> request,
required SourceOfTruth<Key, Output>? sourceOfTruth,
required FactoryFetcher<Key, Output> fetcher,
required Mutex emitMutex,
required StreamController<StoreResponse<Output?>> dataStreamController,
}) =>
Stream.fromFuture(
_shouldStartNetworkStream(request, dataStreamController))
.flatMap((shouldFetchNewValue) => _startNetworkFlow(
shouldFetchNewValue, dataStreamController, fetcher, request))
.listen((response) => emitMutex.protect(() async {
if (response is StoreResponseData<Output>) {
await _writingLock
.protect(() async => _incrementWritingMap(request, 1));
var writerResult = await sourceOfTruth
?.writer(request.key, response.value)
.mapToResponse(ResponseOrigin.fetcher);
if (writerResult is StoreResponseError) {
dataStreamController.add(writerResult.swapType());
await _writingLock
.protect(() async => _incrementWritingMap(request, -1));
}
} else {
dataStreamController.add(response);
}
}));

int _incrementWritingMap(StoreRequest<Key> request, int increment) =>
_writingMap[request.key] = (_writingMap[request.key] ?? 0) + increment;

Stream<StoreResponse<Output>> _startNetworkFlow(
bool shouldFetchNewValue,
StreamController<StoreResponse<dynamic>> dataStreamController,
FactoryFetcher<Key, Output> fetcher,
StoreRequest<Key> request) {
if (shouldFetchNewValue) {
dataStreamController
.add(StoreResponseLoading<Output>(ResponseOrigin.fetcher));
return fetcher.factory(request.key).mapToResponse(ResponseOrigin.fetcher);
} else {
return Rx.never<StoreResponse<Output>>();
}
}

Future<bool> _shouldStartNetworkStream(StoreRequest<Key> request,
StreamController<StoreResponse<Output?>> dataStreamController) async {
if (request.refresh) {
return true;
}
return await dataStreamController.stream
.where((event) => event.origin == ResponseOrigin.sourceOfTruth)
.where((event) => !event.isLoading)
.first
.then((value) => value.data == null);
}

StreamSubscription _generateSourceOfTruthStreamSubscription({
required StoreRequest<Key> request,
required SourceOfTruth<Key, Output> sourceOfTruth,
required Mutex dbLock,
required StreamController<StoreResponse<Output?>> dataStreamController,
}) {
var initialSyncDone = false;
final sourceOfTruthSubscription = sourceOfTruth
.reader(request.key)
.mapToResponse(ResponseOrigin.sourceOfTruth)
.listen((response) async {
if (response is StoreResponseData<Output?>) {
final fetcherData = await _writingLock.protect(() async {
final writingKeyData = (_writingMap[request.key] ?? -1) > 0;
if (writingKeyData) {
_incrementWritingMap(request, -1);
}
return writingKeyData;
});
dataStreamController.add(StoreResponseData(
fetcherData ? ResponseOrigin.fetcher : response.origin,
response.value,
));
} else {
dataStreamController.add(response.swapType());
}
if (dbLock.isLocked && !initialSyncDone) {
initialSyncDone = true;
dbLock.release();
}
});
return sourceOfTruthSubscription;
}
}
Loading

0 comments on commit fa967b9

Please sign in to comment.