diff --git a/lib/src/streams/data_bundle.dart b/lib/src/streams/data_bundle.dart index 5689d8d..ca9e9d1 100644 --- a/lib/src/streams/data_bundle.dart +++ b/lib/src/streams/data_bundle.dart @@ -5,8 +5,8 @@ import 'dart:typed_data'; import 'package:arweave/utils.dart'; import 'package:fpdart/fpdart.dart'; -import '../crypto/merkle.dart'; import './utils.dart'; +import '../crypto/merkle.dart'; import '../models/models.dart'; import 'data_item.dart'; import 'data_models.dart'; @@ -151,24 +151,24 @@ TransactionTaskEither createTransactionTaskEither({ } typedef BundledDataItemResult - = TaskEither; + = Future>; BundledDataItemResult createBundledDataItemTaskEither({ required final Wallet wallet, required final List dataItemFiles, required final List tags, -}) { +}) async { final List dataItemList = []; final dataItemCount = dataItemFiles.length; for (var i = 0; i < dataItemCount; i++) { final dataItem = dataItemFiles[i]; - createDataItemTaskEither( + await createDataItemTaskEither( wallet: wallet, dataStream: dataItem.streamGenerator, dataStreamSize: dataItem.dataSize, target: dataItem.target, anchor: dataItem.anchor, tags: dataItem.tags, - ).map((dataItem) => dataItemList.add(dataItem)); + ).map((dataItem) => dataItemList.add(dataItem)).run(); } return createDataBundleTaskEither(TaskEither.of(dataItemList)) diff --git a/lib/src/streams/download.dart b/lib/src/streams/download.dart new file mode 100644 index 0000000..07c4d97 --- /dev/null +++ b/lib/src/streams/download.dart @@ -0,0 +1,214 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:typed_data'; +import 'package:arweave/arweave.dart'; +import 'package:async/async.dart'; +import 'package:http/http.dart'; +import 'http_client/io.dart' if (dart.library.js) 'http_client/browsers.dart'; + +String gqlGetTxInfo(String txId) => ''' +{ + transaction(id: "$txId") { + owner { + key + } + data { + size + } + quantity { + winston + } + fee { + winston + } + anchor + signature + recipient + tags { + name + value + } + bundledIn { + id + } + } +} +'''; + +Future< + ( + Stream>, + void Function(), + )> download({ + required String txId, + String? gatewayHost = 'arweave.net', + Function(double progress, int speed)? onProgress, +}) async { + final downloadUrl = "https://$gatewayHost/$txId"; + final gqlUrl = "https://$gatewayHost/graphql"; + + final gqlResponse = await post( + Uri.parse(gqlUrl), + headers: {'Content-Type': 'application/json'}, + body: jsonEncode({'query': gqlGetTxInfo(txId)}), + ); + + if (gqlResponse.statusCode != 200) { + throw Exception('Failed to download $txId'); + } + + var txData = jsonDecode(gqlResponse.body)['data']['transaction']; + + final txAnchor = txData['anchor']; + final txOwner = txData['owner']['key']; + final txTarget = txData['recipient']; + final txSignature = txData['signature']; + final txDataSize = int.parse(txData['data']['size']); + final isDataItem = txData['bundledIn'] != null; + final txQuantity = int.parse(txData['quantity']['winston']); + final txReward = int.parse(txData['fee']['winston']); + final downloadedTags = txData['tags']; + final List txTags = []; + for (var tag in downloadedTags) { + txTags.add(createTag(tag['name'], tag['value'])); + } + + int bytesDownloaded = 0; + StreamSubscription>? subscription; + final controller = StreamController>(); + + // keep track of progress and download speed + int lastBytes = 0; + setProgressTimer(onProgress) => Timer.periodic( + Duration(milliseconds: 500), + (Timer timer) { + double progress = + double.parse((bytesDownloaded / txDataSize).toStringAsFixed(2)); + int speed = bytesDownloaded - lastBytes; + + onProgress( + progress, speed * 2); // multiply by 2 to get bytes per second + + lastBytes = bytesDownloaded; + }, + ); + + late Timer progressTimer; + + Future startDownload([int startByte = 0]) async { + if (onProgress != null) { + progressTimer = setProgressTimer(onProgress); + } + final request = Request('GET', Uri.parse(downloadUrl)); + if (startByte > 0) { + request.headers['Range'] = 'bytes=$startByte-'; + } + final client = getClient(); + final streamResponse = await client.send(request); + + final splitStream = StreamSplitter(streamResponse.stream); + final downloadStream = splitStream.split(); + final verificationStream = splitStream.split(); + + // Calling `close()` indicates that no further streams will be created, + // causing splitStream to function without an internal buffer. + // The future will be completed when both streams are consumed, so we + // shouldn't await it here. + unawaited(splitStream.close()); + + _verify( + isDataItem: isDataItem, + id: txId, + owner: txOwner, + signature: txSignature, + target: txTarget, + anchor: txAnchor, + tags: txTags, + dataStream: verificationStream.map((list) => Uint8List.fromList(list)), + reward: txReward, + quantity: txQuantity, + dataSize: txDataSize, + ).then((isVerified) { + if (!isVerified) { + controller.addError('failed to verify transaction'); + } + + subscription?.cancel(); + controller.close(); + if (onProgress != null) { + progressTimer.cancel(); + } + }); + + subscription = downloadStream.listen( + (List chunk) { + bytesDownloaded += chunk.length; + controller.sink.add(chunk); + }, + cancelOnError: true, + ); + } + + // TODO: expose pause and resume after implementing them for verification + // void pauseDownload() { + // subscription?.cancel(); + // if (onProgress != null) { + // progressTimer.cancel(); + // } + // } + // + // void resumeDownload() { + // startDownload(bytesDownloaded); + // } + + void cancelDownload() { + controller.addError('download cancelled'); + subscription?.cancel(); + controller.close(); + if (onProgress != null) { + progressTimer.cancel(); + } + } + + startDownload(); + + return (controller.stream, cancelDownload); +} + +_verify({ + required bool isDataItem, + required String id, + required String owner, + required String signature, + required String target, + required String anchor, + required List tags, + required Stream> dataStream, + required int reward, + required int quantity, + required int dataSize, +}) { + if (isDataItem) { + return verifyDataItem( + id: id, + owner: owner, + signature: signature, + target: target, + anchor: anchor, + tags: tags, + dataStream: dataStream.map((list) => Uint8List.fromList(list))); + } else { + return verifyTransaction( + id: id, + owner: owner, + signature: signature, + target: target, + anchor: anchor, + tags: tags, + reward: reward, + quantity: quantity, + dataSize: dataSize, + dataStream: dataStream.map((list) => Uint8List.fromList(list)), + ); + } +} diff --git a/lib/src/streams/http_client/browsers.dart b/lib/src/streams/http_client/browsers.dart new file mode 100644 index 0000000..7a777fc --- /dev/null +++ b/lib/src/streams/http_client/browsers.dart @@ -0,0 +1,7 @@ +import 'package:http/http.dart'; +import 'package:fetch_client/fetch_client.dart'; + +Client getClient() => FetchClient( + mode: RequestMode.cors, + // streamRequests: true, + ); diff --git a/lib/src/streams/http_client/io.dart b/lib/src/streams/http_client/io.dart new file mode 100644 index 0000000..675307f --- /dev/null +++ b/lib/src/streams/http_client/io.dart @@ -0,0 +1,3 @@ +import 'package:http/http.dart'; + +Client getClient() => Client(); diff --git a/lib/src/streams/streams.dart b/lib/src/streams/streams.dart index 10c0848..b4f2056 100644 --- a/lib/src/streams/streams.dart +++ b/lib/src/streams/streams.dart @@ -3,4 +3,5 @@ export 'data_item.dart'; export 'data_models.dart'; export 'deep_hash_stream.dart'; export 'transaction_uploader.dart'; +export 'download.dart'; export 'utils.dart'; diff --git a/lib/src/streams/transaction_uploader.dart b/lib/src/streams/transaction_uploader.dart index 794afeb..36e397a 100644 --- a/lib/src/streams/transaction_uploader.dart +++ b/lib/src/streams/transaction_uploader.dart @@ -4,8 +4,8 @@ import 'dart:typed_data'; import 'package:arweave/arweave.dart'; import 'package:async/async.dart'; +import 'package:dio/dio.dart'; import 'package:fpdart/fpdart.dart'; -import 'package:http/http.dart'; import 'package:mutex/mutex.dart'; import 'package:retry/retry.dart'; @@ -25,22 +25,31 @@ const _fatalChunkUploadErrors = [ 'invalid_proof' ]; -TaskEither> uploadTransaction( - TransactionResult transaction) { +TaskEither, UploadAborter)> + uploadTransaction(TransactionResult transaction) { final arweave = ArweaveApi(); final txHeaders = transaction.toJson(); + final aborter = UploadAborter(); return _postTransactionHeaderTaskEither(arweave, txHeaders).flatMap((_) { - return TaskEither.of(_postChunks(arweave, transaction)); + return TaskEither.of((_postChunks(arweave, transaction, aborter), aborter)); }); } +// TODO: Add Dio to the ArweaveAPI class and use that instead of creating a new +// instance here. TaskEither _postTransactionHeaderTaskEither( ArweaveApi arweave, Map headers) { return TaskEither.tryCatch(() async { - final res = await arweave.post('tx', body: json.encode(headers)); + final endpoint = 'https://arweave.net/tx'; + final Dio dio = Dio(); + final res = await dio.post(endpoint, data: json.encode(headers)); - if (!(res.statusCode >= 200 && res.statusCode < 300)) { + if (res.statusCode == null) { + throw Exception('Unable to upload transaction: ${res.statusCode}'); + } + + if (!(res.statusCode! >= 200 && res.statusCode! < 300)) { throw Exception('Unable to upload transaction: ${res.statusCode}'); } @@ -51,10 +60,18 @@ TaskEither _postTransactionHeaderTaskEither( Stream<(int, int)> _postChunks( ArweaveApi arweave, TransactionResult transaction, + UploadAborter aborter, ) async* { final chunkUploadCompletionStreamController = StreamController(); final chunkStream = transaction.chunkStreamGenerator(); final chunkQueue = StreamQueue(chunkStream); + + if (aborter.aborted) { + throw StateError('Upload aborted'); + } + + aborter.setChunkQueue(chunkQueue); + final totalChunks = transaction.chunks.chunks.length; final maxConcurrentChunkUploadCount = 128; @@ -62,29 +79,6 @@ Stream<(int, int)> _postChunks( int uploadedChunks = 0; isComplete() => uploadedChunks >= totalChunks; - Future uploadChunkAndNotifyOfCompletion( - int chunkIndex, TransactionChunk chunk) async { - try { - await retry( - () => _uploadChunk( - arweave, - chunkIndex, - chunk, - transaction.chunks.dataRoot, - ), - onRetry: (exception) { - print( - 'Retrying for chunk $chunkIndex on exception ${exception.toString()}', - ); - }, - ); - - chunkUploadCompletionStreamController.add(chunkIndex); - } catch (err) { - chunkUploadCompletionStreamController.addError(err); - } - } - final mutex = Mutex(); // Initiate as many chunk uploads as we can in parallel at the start. @@ -94,7 +88,16 @@ Stream<(int, int)> _postChunks( await mutex.protect(() async { while (chunkIndex < totalChunks && chunkIndex < maxConcurrentChunkUploadCount) { - uploadChunkAndNotifyOfCompletion(chunkIndex, await chunkQueue.next); + final chunkUploader = + ChunkUploader(transaction, chunkUploadCompletionStreamController); + + aborter.addChunk(chunkUploader); + + chunkUploader + .uploadChunkAndNotifyOfCompletion(chunkIndex, await chunkQueue.next) + .then((value) { + aborter.removeChunk(chunkUploader); + }); chunkIndex++; } }); @@ -110,7 +113,15 @@ Stream<(int, int)> _postChunks( // Note that the future is not awaited, so it will be queued after returning mutex.protect(() async { if (chunkIndex < totalChunks) { - uploadChunkAndNotifyOfCompletion(chunkIndex, await chunkQueue.next); + final chunkUploader = + ChunkUploader(transaction, chunkUploadCompletionStreamController); + aborter.addChunk(chunkUploader); + + chunkUploader + .uploadChunkAndNotifyOfCompletion(chunkIndex, await chunkQueue.next) + .then((value) { + aborter.removeChunk(chunkUploader); + }); chunkIndex++; } else if (isComplete()) { if (await chunkQueue.hasNext) { @@ -125,31 +136,111 @@ Stream<(int, int)> _postChunks( }); } -Future _uploadChunk(ArweaveApi arweave, int chunkIndex, - TransactionChunk chunk, Uint8List dataRoot) async { - final chunkValid = await validatePath( - dataRoot, - int.parse(chunk.offset), - 0, - int.parse(chunk.dataSize), - decodeBase64ToBytes(chunk.dataPath), - ); - - if (!chunkValid) { - throw StateError('Unable to validate chunk: $chunkIndex'); +class UploadAborter { + StreamQueue? chunkQueue; + List chunkUploaders = []; + bool aborted = false; + + void setChunkQueue(StreamQueue queue) { + chunkQueue = queue; } - final res = await arweave.post('chunk', body: json.encode(chunk)); + void addChunk(ChunkUploader chunk) { + chunkUploaders.add(chunk); + } - if (res.statusCode != 200) { - final responseError = getResponseError(res); + void removeChunk(ChunkUploader chunk) { + chunkUploaders.remove(chunk); + } - if (_fatalChunkUploadErrors.contains(responseError)) { - throw StateError( - 'Fatal error uploading chunk: $chunkIndex: ${res.statusCode} $responseError'); - } else { - throw Exception( - 'Received non-fatal error while uploading chunk $chunkIndex: ${res.statusCode} $responseError'); + Future abort() async { + for (var chunk in chunkUploaders) { + chunk.cancel(); } + + chunkQueue?.cancel(immediate: true); + + aborted = true; + } +} + +class ChunkUploader { + final TransactionResult transaction; + final StreamController chunkUploadCompletionStreamController; + final CancelToken _cancelToken = CancelToken(); + + ChunkUploader(this.transaction, this.chunkUploadCompletionStreamController); + + Future uploadChunkAndNotifyOfCompletion( + int chunkIndex, TransactionChunk chunk) async { + try { + await retry( + () => _uploadChunk( + chunkIndex, + chunk, + transaction.chunks.dataRoot, + ), + onRetry: (exception) { + if (exception is DioException) { + if (exception.type == DioExceptionType.cancel) { + throw exception; + } + } + print( + 'Retrying for chunk $chunkIndex on exception ${exception.toString()}', + ); + }, + ); + + chunkUploadCompletionStreamController.add(chunkIndex); + } catch (err) { + chunkUploadCompletionStreamController.addError(err); + } + } + + Future _uploadChunk( + int chunkIndex, + TransactionChunk chunk, + Uint8List dataRoot, + ) async { + final chunkValid = await validatePath( + dataRoot, + int.parse(chunk.offset), + 0, + int.parse(chunk.dataSize), + decodeBase64ToBytes(chunk.dataPath), + ); + + if (!chunkValid) { + throw StateError('Unable to validate chunk: $chunkIndex'); + } + + // TODO: Add Dio to the ArweaveAPI class and use that instead of creating a new + // instance here. + final dio = Dio(); + + final endpoint = 'https://arweave.net/chunk'; + + final res = await dio.post( + endpoint, + data: json.encode(chunk), + cancelToken: _cancelToken, + ); + + if (res.statusCode != 200) { + final responseError = getResponseErrorFromDioRespose(res); + + if (_fatalChunkUploadErrors.contains(responseError)) { + throw StateError( + 'Fatal error uploading chunk: $chunkIndex: ${res.statusCode} $responseError'); + } else { + throw Exception( + 'Received non-fatal error while uploading chunk $chunkIndex: ${res.statusCode} $responseError'); + } + } + } + + void cancel() { + _cancelToken.cancel(); } } diff --git a/lib/src/streams/utils.dart b/lib/src/streams/utils.dart index c5464c1..be7a86f 100644 --- a/lib/src/streams/utils.dart +++ b/lib/src/streams/utils.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'dart:typed_data'; import 'package:arweave/src/api/api.dart'; @@ -266,3 +267,87 @@ TaskEither getTxPrice( .then((res) => BigInt.parse(res.body)); }, (error, _) => GetTxPriceError()); } + +Future verifyDataItem({ + required String id, + required String owner, + required String signature, + required String target, + required String anchor, + required List tags, + required Stream dataStream, +}) async { + final signatureData = await deepHashStream([ + toStream(utf8.encode('dataitem')), + toStream(utf8.encode('1')), // Transaction format + toStream(utf8.encode('1')), // Signature type + toStream(decodeBase64ToBytes(owner)), + toStream(decodeBase64ToBytes(target)), + toStream(decodeBase64ToBytes(anchor)), + toStream(serializeTags(tags: tags)), + dataStream, + ]); + final claimedSignatureBytes = decodeBase64ToBytes(signature); + + final idHash = await sha256.hash(claimedSignatureBytes); + final expectedId = encodeBytesToBase64(idHash.bytes); + + if (expectedId != id) { + return false; + } + + return rsaPssVerify( + input: signatureData, + signature: claimedSignatureBytes, + modulus: decodeBase64ToBigInt(owner), + publicExponent: BigInt.from(65537)); +} + +Future verifyTransaction({ + required String id, + required String owner, + required String signature, + required String target, + required String anchor, + required int quantity, + required int reward, + required int dataSize, + required List tags, + required Stream dataStream, +}) async { + final chunksWithProofs = + await generateTransactionChunksFromStream(dataStream); + + final signatureData = await deepHash([ + utf8.encode("2"), + decodeBase64ToBytes(owner), + decodeBase64ToBytes(target), + utf8.encode(quantity.toString()), + utf8.encode(reward.toString()), + decodeBase64ToBytes(anchor), + tags + .map( + (t) => [ + decodeBase64ToBytes(t.name), + decodeBase64ToBytes(t.value), + ], + ) + .toList(), + utf8.encode(dataSize.toString()), + chunksWithProofs.dataRoot, + ]); + + final claimedSignatureBytes = decodeBase64ToBytes(signature); + + final idHash = await sha256.hash(claimedSignatureBytes); + final expectedId = encodeBytesToBase64(idHash.bytes); + + if (id != expectedId) return false; + + return rsaPssVerify( + input: signatureData, + signature: claimedSignatureBytes, + modulus: decodeBase64ToBigInt(owner), + publicExponent: publicExponent, + ); +} diff --git a/lib/src/utils.dart b/lib/src/utils.dart index c9fb29f..53ce4f7 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -1,7 +1,8 @@ import 'dart:convert'; import 'dart:typed_data'; -import 'package:http/http.dart'; +import 'package:dio/dio.dart'; +import 'package:http/http.dart' as http; import 'crypto/crypto.dart'; @@ -96,7 +97,7 @@ String winstonToAr(BigInt winston) { } /// Safely get the error from an Arweave HTTP response. -String getResponseError(Response res) { +String getResponseError(http.Response res) { if (res.headers['Content-Type'] == 'application/json') { Map errJson = json.decode(res.body); @@ -110,6 +111,20 @@ String getResponseError(Response res) { return res.body; } +String getResponseErrorFromDioRespose(Response res) { + if (res.headers['Content-Type'] == 'application/json') { + Map errJson = json.decode(res.data); + + if (errJson['data'] != null) { + return errJson['data'] is Map + ? errJson['data']['error'] + : errJson['data']; + } + } + + return res.data; +} + Future ownerToAddress(String owner) async => encodeBytesToBase64( await sha256.hash(decodeBase64ToBytes(owner)).then((res) => res.bytes)); diff --git a/pubspec.yaml b/pubspec.yaml index 34ee432..2cce12a 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -3,10 +3,11 @@ description: "" version: 3.7.0 environment: sdk: ">=3.0.0 <4.0.0" +publish_to: none dependencies: cryptography: ^2.5.0 - http: ^0.13.3 + http: ^1.1.0 json_annotation: ^4.0.1 jwk: ^0.1.0 meta: ^1.7.0 @@ -21,6 +22,11 @@ dependencies: hash: ^1.0.4 fpdart: ^1.1.0 better_cryptography: ^1.0.0+1 + fetch_client: + git: + url: https://github.com/karlprieb/fetch_client.git + ref: ignore-headers + dio: ^5.3.3 dev_dependencies: build_runner: ^2.0.4