diff --git a/flake.lock b/flake.lock
index a0a2e55..96a0677 100644
--- a/flake.lock
+++ b/flake.lock
@@ -5,11 +5,11 @@
         "systems": "systems"
       },
       "locked": {
-        "lastModified": 1692799911,
-        "narHash": "sha256-3eihraek4qL744EvQXsK1Ha6C3CR7nnT8X2qWap4RNk=",
+        "lastModified": 1694529238,
+        "narHash": "sha256-zsNZZGTGnMOf9YpHKJqMSsa0dXbfmxeoJ7xHlrt+xmY=",
         "owner": "numtide",
         "repo": "flake-utils",
-        "rev": "f9e7cf818399d17d347f847525c5a5a8032e4e44",
+        "rev": "ff7b65b44d01cf9ba6a71320833626af21126384",
         "type": "github"
       },
       "original": {
@@ -20,11 +20,11 @@
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1694062546,
-        "narHash": "sha256-PiGI4f2BGnZcedP6slLjCLGLRLXPa9+ogGGgVPfGxys=",
+        "lastModified": 1695318763,
+        "narHash": "sha256-FHVPDRP2AfvsxAdc+AsgFJevMz5VBmnZglFUMlxBkcY=",
         "owner": "NixOS",
         "repo": "nixpkgs",
-        "rev": "b200e0df08f80c32974a6108ce431d8a8a5e6547",
+        "rev": "e12483116b3b51a185a33a272bf351e357ba9a99",
         "type": "github"
       },
       "original": {
diff --git a/lib/src/streams/data_bundle.dart b/lib/src/streams/data_bundle.dart
index ce0c546..2961426 100644
--- a/lib/src/streams/data_bundle.dart
+++ b/lib/src/streams/data_bundle.dart
@@ -5,7 +5,7 @@ import 'dart:typed_data';
 import 'package:arweave/utils.dart';
 import 'package:fpdart/fpdart.dart';
 
-import '../utils/bundle_tag_parser.dart';
+import '../crypto/merkle.dart';
 import './utils.dart';
 import '../models/models.dart';
 import 'data_item.dart';
@@ -26,6 +26,7 @@ class TransactionResult {
   final BigInt reward;
   final String signature;
   final Stream Function() chunkStreamGenerator;
+  final TransactionChunksWithProofs chunks;
 
   TransactionResult({
     required this.id,
@@ -39,6 +40,7 @@ class TransactionResult {
     required this.reward,
     required this.signature,
     required this.chunkStreamGenerator,
+    required this.chunks,
   });
 
   Map toJson() {
@@ -74,64 +76,79 @@ class DataItemFile {
   });
 }
 
-// typedef TransactionTaskEither
-//     = TaskEither;
-// TransactionTaskEither createTransactionTaskEither({
-//   required final Wallet wallet,
-//   String? anchor,
-//   required final String target,
-//   List tags = const [],
-//   required final BigInt quantity,
-//   BigInt? reward,
-//   required final DataStreamGenerator dataStreamGenerator,
-//   required final int dataSize,
-// }) {
-//   if (anchor == null) {
-//     anchor = await getTransactionAnchor();
-//
-//   }
-//
-//   return getOwnerTaskEither(wallet).flatMap((owner) =>
-//       prepareChunksTaskEither(dataStreamGenerator).flatMap((chunksWithProofs) {
-//         final chunks = chunksWithProofs.chunks;
-//         final dataRoot = chunksWithProofs.dataRoot;
-//
-//         return deepHashTaskEither([
-//           toStream(utf8.encode('2')), // Transaction format
-//           toStream(decodeBase64ToBytes(owner)),
-//           toStream(decodeBase64ToBytes(target)),
-//           toStream(utf8.encode(quantity.toString())),
-//           toStream(utf8.encode(reward.toString())),
-//           toStream(decodeBase64ToBytes(anchor)),
-//           toStream(serializeTags(tags: tags)),
-//           toStream(utf8.encode(dataSize.toString())),
-//           toStream(decodeBase64ToBytes(dataRoot)),
-//         ]).flatMap((signatureData) =>
-//             signDataItemTaskEither(wallet: wallet, signatureData: signatureData)
-//                 .flatMap((signResult) {
-//               final transaction = TransactionResult(
-//                 id: signResult.id,
-//                 anchor: anchor,
-//                 owner: owner,
-//                 tags: tags,
-//                 target: target,
-//                 quantity: quantity,
-//                 dataRoot: dataRoot,
-//                 dataSize: dataSize,
-//                 reward: reward,
-//                 signature: encodeBytesToBase64(signResult.signature),
-//                 chunkStreamGenerator: () => getChunks(
-//                   dataStreamGenerator(),
-//                   chunks,
-//                   dataRoot,
-//                   dataSize,
-//                 ),
-//               );
-//
-//               return TaskEither.of(transaction);
-//             }));
-//       }));
-// }
+typedef TransactionTaskEither
+    = TaskEither;
+TransactionTaskEither createTransactionTaskEither({
+  required final Wallet wallet,
+  String? anchor,
+  String? target,
+  List tags = const [],
+  BigInt? quantity,
+  BigInt? reward,
+  required final DataStreamGenerator dataStreamGenerator,
+  required final int dataSize,
+}) {
+  return getTxAnchor(anchor).flatMap((anchor) =>
+      getTxPrice(reward, dataSize, target).flatMap((reward) =>
+          getOwnerTaskEither(wallet).flatMap((owner) =>
+              prepareChunksTaskEither(dataStreamGenerator)
+                  .flatMap((chunksWithProofs) {
+                final chunks = chunksWithProofs.chunks;
+                final dataRoot = chunksWithProofs.dataRoot;
+
+                final bundleTags = [
+                  createTag('Bundle-Format', 'binary'),
+                  createTag('Bundle-Version', '2.0.0'),
+                  ...tags,
+                ];
+                final finalQuantity = quantity ?? BigInt.zero;
+
+                return deepHashTaskEither([
+                  utf8.encode("2"),
+                  decodeBase64ToBytes(owner),
+                  decodeBase64ToBytes(target ?? ''),
+                  utf8.encode(finalQuantity.toString()),
+                  utf8.encode(reward.toString()),
+                  decodeBase64ToBytes(anchor),
+                  bundleTags
+                      .map(
+                        (t) => [
+                          decodeBase64ToBytes(t.name),
+                          decodeBase64ToBytes(t.value),
+                        ],
+                      )
+                      .toList(),
+                  utf8.encode(dataSize.toString()),
+                  decodeBase64ToBytes(dataRoot),
+                ]).flatMap((signatureData) {
+                  return signDataItemTaskEither(
+                          wallet: wallet, signatureData: signatureData)
+                      .flatMap((signResult) {
+                    final transaction = TransactionResult(
+                      id: signResult.id,
+                      anchor: anchor,
+                      owner: owner,
+                      tags: bundleTags,
+                      target: target ?? '',
+                      quantity: finalQuantity,
+                      dataRoot: dataRoot,
+                      dataSize: dataSize,
+                      reward: reward,
+                      signature: encodeBytesToBase64(signResult.signature),
+                      chunkStreamGenerator: () => getChunks(
+                        dataStreamGenerator(),
+                        chunks,
+                        dataRoot,
+                        dataSize,
+                      ),
+                      chunks: chunks,
+                    );
+
+                    return TaskEither.of(transaction);
+                  });
+                });
+              }))));
+}
 
 typedef BundledDataItemResult
     = TaskEither;
diff --git a/lib/src/streams/data_item.dart b/lib/src/streams/data_item.dart
index cab5532..03f3279 100644
--- a/lib/src/streams/data_item.dart
+++ b/lib/src/streams/data_item.dart
@@ -36,7 +36,7 @@ DataItemTaskEither createDataItemTaskEither({
 
   final tagsBytes = serializeTags(tags: tags);
 
-  return deepHashTaskEither([
+  return deepHashStreamTaskEither([
     toStream(utf8.encode('dataitem')),
     toStream(utf8.encode('1')), // Transaction format
     toStream(utf8.encode('1')),
diff --git a/lib/src/streams/errors.dart b/lib/src/streams/errors.dart
index 8ac8953..5758c15 100644
--- a/lib/src/streams/errors.dart
+++ b/lib/src/streams/errors.dart
@@ -8,6 +8,8 @@ class InvalidAnchorSizeError extends StreamTransactionError {}
 
 class DeepHashStreamError extends StreamTransactionError {}
 
+class DeepHashError extends StreamTransactionError {}
+
 class SignatureError extends StreamTransactionError {}
 
 class GetWalletOwnerError extends StreamTransactionError {}
@@ -31,3 +33,7 @@ class TransactionGetOwnerError extends StreamTransactionError {}
 class GetTxAnchorError extends StreamTransactionError {}
 
 class GetTxPriceError extends StreamTransactionError {}
+
+class PostTxHeadersError extends StreamTransactionError {}
+
+class PostTxChunksError extends StreamTransactionError {}
diff --git a/lib/src/streams/streams.dart b/lib/src/streams/streams.dart
index 310a965..10c0848 100644
--- a/lib/src/streams/streams.dart
+++ b/lib/src/streams/streams.dart
@@ -2,4 +2,5 @@ export 'data_bundle.dart';
 export 'data_item.dart';
 export 'data_models.dart';
 export 'deep_hash_stream.dart';
+export 'transaction_uploader.dart';
 export 'utils.dart';
diff --git a/lib/src/streams/transaction_uploader.dart b/lib/src/streams/transaction_uploader.dart
new file mode 100644
index 0000000..6bd14f8
--- /dev/null
+++ b/lib/src/streams/transaction_uploader.dart
@@ -0,0 +1,164 @@
+import 'dart:async';
+import 'dart:convert';
+import 'dart:typed_data';
+
+import 'package:arweave/arweave.dart';
+import 'package:async/async.dart';
+import 'package:fpdart/fpdart.dart';
+import 'package:http/http.dart';
+import 'package:mutex/mutex.dart';
+import 'package:retry/retry.dart';
+
+import '../api/api.dart';
+import '../crypto/crypto.dart';
+import '../utils.dart';
+import 'errors.dart';
+
+/// Errors from /chunk we should never try and continue on.
+const _fatalChunkUploadErrors = [
+  'invalid_json',
+  'chunk_too_big',
+  'data_path_too_big',
+  'offset_too_big',
+  'data_size_too_big',
+  'chunk_proof_ratio_not_attractive',
+  'invalid_proof'
+];
+
+TaskEither> uploadTransaction(
+    TransactionResult transaction) {
+  final arweave = ArweaveApi();
+  final txHeaders = transaction.toJson();
+
+  return _postTransactionHeaderTaskEither(arweave, txHeaders).flatMap((_) {
+    print('call post chunks');
+    return TaskEither.of(_postChunks(arweave, transaction));
+  });
+}
+
+TaskEither _postTransactionHeaderTaskEither(
+    ArweaveApi arweave, Map headers) {
+  return TaskEither.tryCatch(() async {
+    print('Uploading transaction headers...');
+
+    final res = await arweave.post('tx', body: json.encode(headers));
+
+    if (!(res.statusCode >= 200 && res.statusCode < 300)) {
+      print('Unable to upload transaction: ${res.statusCode}');
+      throw Exception('Unable to upload transaction: ${res.statusCode}');
+    }
+
+    return res;
+  }, (error, _) => PostTxHeadersError());
+}
+
+Stream<(int, int)> _postChunks(
+  ArweaveApi arweave,
+  TransactionResult transaction,
+) async* {
+  print('Uploading chunks...');
+  final chunkUploadCompletionStreamController = StreamController();
+  final chunkStream = transaction.chunkStreamGenerator();
+  final chunkQueue = StreamQueue(chunkStream);
+  final totalChunks = transaction.chunks.chunks.length;
+
+  final maxConcurrentChunkUploadCount = 128;
+  int chunkIndex = 0;
+  int uploadedChunks = 0;
+  isComplete() => uploadedChunks >= totalChunks;
+
+  Future uploadChunkAndNotifyOfCompletion(
+      int chunkIndex, TransactionChunk chunk) async {
+    try {
+      await retry(
+        () => _uploadChunk(
+          arweave,
+          chunkIndex,
+          chunk,
+          transaction.chunks.dataRoot,
+        ),
+        onRetry: (exception) {
+          print(
+            'Retrying for chunk $chunkIndex on exception ${exception.toString()}',
+          );
+        },
+      );
+
+      print('chunk uploaded');
+
+      chunkUploadCompletionStreamController.add(chunkIndex);
+    } catch (err) {
+      print('Chunk upload failed at $chunkIndex');
+      chunkUploadCompletionStreamController.addError(err);
+    }
+  }
+
+  final mutex = Mutex();
+
+  // Initiate as many chunk uploads as we can in parallel at the start.
+  // Note that onListen is not actually awaited, so should be protected by Mutex
+  // to ensure that additional chunk uploads are not started before these.
+  chunkUploadCompletionStreamController.onListen = () async {
+    await mutex.protect(() async {
+      while (chunkIndex < totalChunks &&
+          chunkIndex < maxConcurrentChunkUploadCount) {
+        uploadChunkAndNotifyOfCompletion(chunkIndex, await chunkQueue.next);
+        chunkIndex++;
+      }
+    });
+  };
+
+  // Start a new chunk upload if there are still any left to upload and
+  // notify the stream consumer of chunk upload completion events.
+  yield* chunkUploadCompletionStreamController.stream
+      .map((completedChunkIndex) {
+    uploadedChunks++;
+
+    // chunkIndex and chunkQueue must be protected from race conditions by Mutex
+    // Note that the future is not awaited, so it will be queued after returning
+    mutex.protect(() async {
+      if (chunkIndex < totalChunks) {
+        uploadChunkAndNotifyOfCompletion(chunkIndex, await chunkQueue.next);
+        chunkIndex++;
+      } else if (isComplete()) {
+        if (await chunkQueue.hasNext) {
+          throw StateError('Chunks remaining in queue');
+        }
+        chunkQueue.cancel();
+        chunkUploadCompletionStreamController.close();
+      }
+    });
+
+    return (uploadedChunks, totalChunks);
+  });
+}
+
+Future _uploadChunk(ArweaveApi arweave, int chunkIndex,
+    TransactionChunk chunk, Uint8List dataRoot) async {
+  final chunkValid = await validatePath(
+    dataRoot,
+    int.parse(chunk.offset),
+    0,
+    int.parse(chunk.dataSize),
+    decodeBase64ToBytes(chunk.dataPath),
+  );
+
+  if (!chunkValid) {
+    throw StateError('Unable to validate chunk: $chunkIndex');
+  }
+
+  final res = await arweave.post('chunk', body: json.encode(chunk));
+  print('Uploaded chunk $chunkIndex: ${res.statusCode}');
+
+  if (res.statusCode != 200) {
+    final responseError = getResponseError(res);
+
+    if (_fatalChunkUploadErrors.contains(responseError)) {
+      throw StateError(
+          'Fatal error uploading chunk: $chunkIndex: ${res.statusCode} $responseError');
+    } else {
+      throw Exception(
+          'Received non-fatal error while uploading chunk $chunkIndex: ${res.statusCode} $responseError');
+    }
+  }
+}
diff --git a/lib/src/streams/utils.dart b/lib/src/streams/utils.dart
index 32536e2..9ddc097 100644
--- a/lib/src/streams/utils.dart
+++ b/lib/src/streams/utils.dart
@@ -79,7 +79,7 @@ Stream Function() createByteRangeStream(
   };
 }
 
-TaskEither deepHashTaskEither(
+TaskEither deepHashStreamTaskEither(
   final List> inputs,
 ) {
   return TaskEither.tryCatch(() async {
@@ -87,6 +87,17 @@ TaskEither deepHashTaskEither(
   }, (error, _) => DeepHashStreamError());
 }
 
+TaskEither deepHashTaskEither(
+  final List inputs,
+) {
+  return TaskEither.tryCatch(() async {
+    return await deepHash(inputs);
+  }, (error, _) {
+    print(error);
+    return DeepHashError();
+  });
+}
+
 TaskEither signDataItemTaskEither({
   required final Wallet wallet,
   required final Uint8List signatureData,
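
A minimal sketch of how the reworked createTransactionTaskEither in data_bundle.dart might be driven end to end. The wallet construction (Wallet.fromJwk), the file paths, and the exact shape of DataStreamGenerator are assumptions made for illustration; only the parameter names and the TaskEither-based return come from the diff itself.

import 'dart:convert';
import 'dart:io';

// Assumption: the streams API (createTransactionTaskEither, TransactionResult)
// is re-exported from the package entry point.
import 'package:arweave/arweave.dart';

Future<TransactionResult> createSignedTransaction() async {
  // Assumption: Wallet.fromJwk and the key-file path are illustrative only.
  final wallet = Wallet.fromJwk(
    json.decode(await File('wallet.json').readAsString()),
  );

  final data = await File('data.bin').readAsBytes();

  // Anchor and price lookup, owner resolution, chunk preparation, deep hashing
  // and signing all happen inside the TaskEither chain defined in the diff.
  final result = await createTransactionTaskEither(
    wallet: wallet,
    // Assumption: DataStreamGenerator is a zero-argument function returning
    // the payload as a byte stream, matching how dataStreamGenerator() is
    // invoked in data_bundle.dart.
    dataStreamGenerator: () => Stream.fromIterable([data]),
    dataSize: data.length,
  ).run();

  // fpdart: run() resolves the TaskEither to an Either, handled here with match().
  return result.match(
    (error) => throw Exception('Transaction creation failed: $error'),
    (transaction) => transaction,
  );
}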
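
In the same spirit, a sketch of consuming the new uploadTransaction. The generic parameters of its return type are stripped in this view of the diff, but from the body the success value is the Stream<(int, int)> of (uploadedChunks, totalChunks) records produced by _postChunks, which is what this example assumes; the fpdart run()/match() calls and the progress handling are illustrative rather than prescriptive.

import 'package:arweave/arweave.dart';

Future<void> uploadWithProgress(TransactionResult transaction) async {
  // The transaction headers are posted inside the TaskEither; the chunk
  // uploads only begin once the returned stream is listened to.
  final result = await uploadTransaction(transaction).run();

  final progress = result.match(
    (error) {
      // e.g. PostTxHeadersError when the POST to /tx fails.
      print('Transaction header upload failed: $error');
      return null;
    },
    (stream) => stream,
  );
  if (progress == null) return;

  // Each event is an (uploadedChunks, totalChunks) record.
  await for (final (uploaded, total) in progress) {
    print('Uploaded $uploaded of $total chunks');
  }
}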