Commit ccd200c: transaction uploader

karlprieb committed Sep 27, 2023
1 parent 24fccb7 commit ccd200c

Showing 7 changed files with 266 additions and 67 deletions.
12 changes: 6 additions & 6 deletions flake.lock

Some generated files are not rendered by default.

135 changes: 76 additions & 59 deletions lib/src/streams/data_bundle.dart
@@ -5,7 +5,7 @@ import 'dart:typed_data';
import 'package:arweave/utils.dart';
import 'package:fpdart/fpdart.dart';

import '../utils/bundle_tag_parser.dart';
import '../crypto/merkle.dart';
import './utils.dart';
import '../models/models.dart';
import 'data_item.dart';
@@ -26,6 +26,7 @@ class TransactionResult {
final BigInt reward;
final String signature;
final Stream<TransactionChunk> Function() chunkStreamGenerator;
final TransactionChunksWithProofs chunks;

TransactionResult({
required this.id,
@@ -39,6 +40,7 @@ class TransactionResult {
required this.reward,
required this.signature,
required this.chunkStreamGenerator,
required this.chunks,
});

Map<String, dynamic> toJson() {
@@ -74,64 +76,79 @@ class DataItemFile {
});
}

// typedef TransactionTaskEither
// = TaskEither<StreamTransactionError, TransactionResult>;
// TransactionTaskEither createTransactionTaskEither({
// required final Wallet wallet,
// String? anchor,
// required final String target,
// List<Tag> tags = const [],
// required final BigInt quantity,
// BigInt? reward,
// required final DataStreamGenerator dataStreamGenerator,
// required final int dataSize,
// }) {
// if (anchor == null) {
// anchor = await getTransactionAnchor();
//
// }
//
// return getOwnerTaskEither(wallet).flatMap((owner) =>
// prepareChunksTaskEither(dataStreamGenerator).flatMap((chunksWithProofs) {
// final chunks = chunksWithProofs.chunks;
// final dataRoot = chunksWithProofs.dataRoot;
//
// return deepHashTaskEither([
// toStream(utf8.encode('2')), // Transaction format
// toStream(decodeBase64ToBytes(owner)),
// toStream(decodeBase64ToBytes(target)),
// toStream(utf8.encode(quantity.toString())),
// toStream(utf8.encode(reward.toString())),
// toStream(decodeBase64ToBytes(anchor)),
// toStream(serializeTags(tags: tags)),
// toStream(utf8.encode(dataSize.toString())),
// toStream(decodeBase64ToBytes(dataRoot)),
// ]).flatMap((signatureData) =>
// signDataItemTaskEither(wallet: wallet, signatureData: signatureData)
// .flatMap((signResult) {
// final transaction = TransactionResult(
// id: signResult.id,
// anchor: anchor,
// owner: owner,
// tags: tags,
// target: target,
// quantity: quantity,
// dataRoot: dataRoot,
// dataSize: dataSize,
// reward: reward,
// signature: encodeBytesToBase64(signResult.signature),
// chunkStreamGenerator: () => getChunks(
// dataStreamGenerator(),
// chunks,
// dataRoot,
// dataSize,
// ),
// );
//
// return TaskEither.of(transaction);
// }));
// }));
// }
typedef TransactionTaskEither
= TaskEither<StreamTransactionError, TransactionResult>;
TransactionTaskEither createTransactionTaskEither({
required final Wallet wallet,
String? anchor,
String? target,
List<Tag> tags = const [],
BigInt? quantity,
BigInt? reward,
required final DataStreamGenerator dataStreamGenerator,
required final int dataSize,
}) {
return getTxAnchor(anchor).flatMap((anchor) =>
getTxPrice(reward, dataSize, target).flatMap((reward) =>
getOwnerTaskEither(wallet).flatMap((owner) =>
prepareChunksTaskEither(dataStreamGenerator)
.flatMap((chunksWithProofs) {
final chunks = chunksWithProofs.chunks;
final dataRoot = chunksWithProofs.dataRoot;

final bundleTags = [
createTag('Bundle-Format', 'binary'),
createTag('Bundle-Version', '2.0.0'),
...tags,
];
final finalQuantity = quantity ?? BigInt.zero;

return deepHashTaskEither([
utf8.encode("2"),
decodeBase64ToBytes(owner),
decodeBase64ToBytes(target ?? ''),
utf8.encode(finalQuantity.toString()),
utf8.encode(reward.toString()),
decodeBase64ToBytes(anchor),
bundleTags
.map(
(t) => [
decodeBase64ToBytes(t.name),
decodeBase64ToBytes(t.value),
],
)
.toList(),
utf8.encode(dataSize.toString()),
decodeBase64ToBytes(dataRoot),
]).flatMap((signatureData) {
return signDataItemTaskEither(
wallet: wallet, signatureData: signatureData)
.flatMap((signResult) {
final transaction = TransactionResult(
id: signResult.id,
anchor: anchor,
owner: owner,
tags: bundleTags,
target: target ?? '',
quantity: finalQuantity,
dataRoot: dataRoot,
dataSize: dataSize,
reward: reward,
signature: encodeBytesToBase64(signResult.signature),
chunkStreamGenerator: () => getChunks(
dataStreamGenerator(),
chunks,
dataRoot,
dataSize,
),
chunks: chunks,
);

return TaskEither.of(transaction);
});
});
}))));
}
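
A minimal usage sketch for createTransactionTaskEither, assuming a loaded Wallet and a DataStreamGenerator for the payload; the tag and print calls are illustrative, not part of this commit:

Future<void> createExample(
  Wallet wallet,
  DataStreamGenerator dataStreamGenerator,
  int dataSize,
) async {
  // run() executes the TaskEither and yields an Either of error or result.
  final result = await createTransactionTaskEither(
    wallet: wallet,
    tags: [createTag('App-Name', 'my-app')],
    dataStreamGenerator: dataStreamGenerator,
    dataSize: dataSize,
  ).run();

  result.match(
    (error) => print('Failed to create transaction: $error'),
    (transaction) => print('Created transaction: ${transaction.id}'),
  );
}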

typedef BundledDataItemResult
= TaskEither<StreamTransactionError, DataItemResult>;
2 changes: 1 addition & 1 deletion lib/src/streams/data_item.dart
@@ -36,7 +36,7 @@ DataItemTaskEither createDataItemTaskEither({

final tagsBytes = serializeTags(tags: tags);

return deepHashTaskEither([
return deepHashStreamTaskEither([
toStream(utf8.encode('dataitem')),
toStream(utf8.encode('1')), // Transaction format
toStream(utf8.encode('1')),
6 changes: 6 additions & 0 deletions lib/src/streams/errors.dart
@@ -8,6 +8,8 @@ class InvalidAnchorSizeError extends StreamTransactionError {}

class DeepHashStreamError extends StreamTransactionError {}

class DeepHashError extends StreamTransactionError {}

class SignatureError extends StreamTransactionError {}

class GetWalletOwnerError extends StreamTransactionError {}
@@ -31,3 +33,7 @@ class TransactionGetOwnerError extends StreamTransactionError {}
class GetTxAnchorError extends StreamTransactionError {}

class GetTxPriceError extends StreamTransactionError {}

class PostTxHeadersError extends StreamTransactionError {}

class PostTxChunksError extends StreamTransactionError {}
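
All of these extend StreamTransactionError, so callers can branch on the concrete subtype when reporting failures. A small illustrative helper, not part of this commit:

String describeStreamError(StreamTransactionError error) {
  // Map the new upload errors to readable messages; anything else falls
  // through to a generic description.
  if (error is PostTxHeadersError) return 'Posting transaction headers failed';
  if (error is PostTxChunksError) return 'Posting transaction chunks failed';
  if (error is DeepHashError) return 'Deep hash computation failed';
  return 'Stream transaction error: $error';
}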
1 change: 1 addition & 0 deletions lib/src/streams/streams.dart
@@ -2,4 +2,5 @@ export 'data_bundle.dart';
export 'data_item.dart';
export 'data_models.dart';
export 'deep_hash_stream.dart';
export 'transaction_uploader.dart';
export 'utils.dart';
164 changes: 164 additions & 0 deletions lib/src/streams/transaction_uploader.dart
@@ -0,0 +1,164 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:arweave/arweave.dart';
import 'package:async/async.dart';
import 'package:fpdart/fpdart.dart';
import 'package:http/http.dart';
import 'package:mutex/mutex.dart';
import 'package:retry/retry.dart';

import '../api/api.dart';
import '../crypto/crypto.dart';
import '../utils.dart';
import 'errors.dart';

/// Errors from /chunk that are fatal; we should never retry or continue after them.
const _fatalChunkUploadErrors = [
'invalid_json',
'chunk_too_big',
'data_path_too_big',
'offset_too_big',
'data_size_too_big',
'chunk_proof_ratio_not_attractive',
'invalid_proof'
];

TaskEither<StreamTransactionError, Stream<(int, int)>> uploadTransaction(
TransactionResult transaction) {
final arweave = ArweaveApi();
final txHeaders = transaction.toJson();

return _postTransactionHeaderTaskEither(arweave, txHeaders).flatMap((_) {
print('call post chunks');
return TaskEither.of(_postChunks(arweave, transaction));
});
}
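
A sketch of how a caller might run the upload and consume the progress stream of (uploadedChunks, totalChunks) records; the error handling here is illustrative only:

Future<void> uploadExample(TransactionResult transaction) async {
  final result = await uploadTransaction(transaction).run();

  await result.match(
    (error) async => print('Upload failed: $error'),
    (progress) async {
      // Each event is (uploadedChunks, totalChunks).
      await for (final (uploaded, total) in progress) {
        print('Uploaded $uploaded of $total chunks');
      }
    },
  );
}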

TaskEither<StreamTransactionError, Response> _postTransactionHeaderTaskEither(
ArweaveApi arweave, Map<String, dynamic> headers) {
return TaskEither.tryCatch(() async {
print('Uploading transaction headers...');

final res = await arweave.post('tx', body: json.encode(headers));

if (!(res.statusCode >= 200 && res.statusCode < 300)) {
print('Unable to upload transaction: ${res.statusCode}');
throw Exception('Unable to upload transaction: ${res.statusCode}');
}

return res;
}, (error, _) => PostTxHeadersError());
}

Stream<(int, int)> _postChunks(
ArweaveApi arweave,
TransactionResult transaction,
) async* {
print('Uploading chunks...');
final chunkUploadCompletionStreamController = StreamController<int>();
final chunkStream = transaction.chunkStreamGenerator();
final chunkQueue = StreamQueue(chunkStream);
final totalChunks = transaction.chunks.chunks.length;

final maxConcurrentChunkUploadCount = 128;
int chunkIndex = 0;
int uploadedChunks = 0;
bool isComplete() => uploadedChunks >= totalChunks;

Future<void> uploadChunkAndNotifyOfCompletion(
int chunkIndex, TransactionChunk chunk) async {
try {
await retry(
() => _uploadChunk(
arweave,
chunkIndex,
chunk,
transaction.chunks.dataRoot,
),
onRetry: (exception) {
print(
'Retrying for chunk $chunkIndex on exception ${exception.toString()}',
);
},
);

print('chunk uploaded');

chunkUploadCompletionStreamController.add(chunkIndex);
} catch (err) {
print('Chunk upload failed at $chunkIndex');
chunkUploadCompletionStreamController.addError(err);
}
}

final mutex = Mutex();

// Initiate as many chunk uploads as we can in parallel at the start.
// Note that onListen is not awaited, so it must hold the mutex to ensure
// that additional chunk uploads are not started before these initial ones.
chunkUploadCompletionStreamController.onListen = () async {
await mutex.protect(() async {
while (chunkIndex < totalChunks &&
chunkIndex < maxConcurrentChunkUploadCount) {
uploadChunkAndNotifyOfCompletion(chunkIndex, await chunkQueue.next);
chunkIndex++;
}
});
};

// Start a new chunk upload if there are still any left to upload and
// notify the stream consumer of chunk upload completion events.
yield* chunkUploadCompletionStreamController.stream
.map((completedChunkIndex) {
uploadedChunks++;

// chunkIndex and chunkQueue must be protected from race conditions by the mutex.
// The future is intentionally not awaited, so it is queued to run after this
// callback returns.
mutex.protect(() async {
if (chunkIndex < totalChunks) {
uploadChunkAndNotifyOfCompletion(chunkIndex, await chunkQueue.next);
chunkIndex++;
} else if (isComplete()) {
if (await chunkQueue.hasNext) {
throw StateError('Chunks remaining in queue');
}
chunkQueue.cancel();
chunkUploadCompletionStreamController.close();
}
});

return (uploadedChunks, totalChunks);
});
}
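
The loop above seeds up to maxConcurrentChunkUploadCount uploads, then starts one new upload per completion event, so at most that many requests are in flight at once. A stripped-down sketch of the same bounded-parallelism idea (illustrative, assuming dart:async is imported as it is in this file):

// Runs the given tasks with at most `limit` in flight, emitting the number
// of completed tasks after each completion.
Stream<int> runBounded(List<Future<void> Function()> tasks, int limit) async* {
  if (tasks.isEmpty) return;
  final completions = StreamController<int>();
  var started = 0;
  var done = 0;

  void startNext() {
    final index = started++;
    tasks[index]().then((_) => completions.add(index),
        onError: completions.addError);
  }

  // Seed up to `limit` tasks, mirroring onListen above.
  while (started < tasks.length && started < limit) {
    startNext();
  }

  // Start one new task per completion, mirroring the map callback above.
  yield* completions.stream.map((_) {
    done++;
    if (done >= tasks.length) {
      completions.close();
    } else if (started < tasks.length) {
      startNext();
    }
    return done;
  });
}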

Future<void> _uploadChunk(ArweaveApi arweave, int chunkIndex,
TransactionChunk chunk, Uint8List dataRoot) async {
final chunkValid = await validatePath(
dataRoot,
int.parse(chunk.offset),
0,
int.parse(chunk.dataSize),
decodeBase64ToBytes(chunk.dataPath),
);

if (!chunkValid) {
throw StateError('Unable to validate chunk: $chunkIndex');
}

final res = await arweave.post('chunk', body: json.encode(chunk));
print('Uploaded chunk $chunkIndex: ${res.statusCode}');

if (res.statusCode != 200) {
final responseError = getResponseError(res);

if (_fatalChunkUploadErrors.contains(responseError)) {
throw StateError(
'Fatal error uploading chunk: $chunkIndex: ${res.statusCode} $responseError');
} else {
throw Exception(
'Received non-fatal error while uploading chunk $chunkIndex: ${res.statusCode} $responseError');
}
}
}