Skip to content

Commit

Permalink
Merge pull request #53 from ardriveapp/PE-3697
Browse files Browse the repository at this point in the history
PE-3697: Download tx
  • Loading branch information
thiagocarvalhodev authored Oct 26, 2023
2 parents 4aa4377 + 05b7049 commit e2c98c1
Show file tree
Hide file tree
Showing 9 changed files with 482 additions and 60 deletions.
10 changes: 5 additions & 5 deletions lib/src/streams/data_bundle.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import 'dart:typed_data';
import 'package:arweave/utils.dart';
import 'package:fpdart/fpdart.dart';

import '../crypto/merkle.dart';
import './utils.dart';
import '../crypto/merkle.dart';
import '../models/models.dart';
import 'data_item.dart';
import 'data_models.dart';
Expand Down Expand Up @@ -151,24 +151,24 @@ TransactionTaskEither createTransactionTaskEither({
}

typedef BundledDataItemResult
= TaskEither<StreamTransactionError, DataItemResult>;
= Future<TaskEither<StreamTransactionError, DataItemResult>>;
BundledDataItemResult createBundledDataItemTaskEither({
required final Wallet wallet,
required final List<DataItemFile> dataItemFiles,
required final List<Tag> tags,
}) {
}) async {
final List<DataItemResult> dataItemList = [];
final dataItemCount = dataItemFiles.length;
for (var i = 0; i < dataItemCount; i++) {
final dataItem = dataItemFiles[i];
createDataItemTaskEither(
await createDataItemTaskEither(
wallet: wallet,
dataStream: dataItem.streamGenerator,
dataStreamSize: dataItem.dataSize,
target: dataItem.target,
anchor: dataItem.anchor,
tags: dataItem.tags,
).map((dataItem) => dataItemList.add(dataItem));
).map((dataItem) => dataItemList.add(dataItem)).run();
}

return createDataBundleTaskEither(TaskEither.of(dataItemList))
Expand Down
214 changes: 214 additions & 0 deletions lib/src/streams/download.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
import 'package:arweave/arweave.dart';
import 'package:async/async.dart';
import 'package:http/http.dart';
import 'http_client/io.dart' if (dart.library.js) 'http_client/browsers.dart';

String gqlGetTxInfo(String txId) => '''
{
transaction(id: "$txId") {
owner {
key
}
data {
size
}
quantity {
winston
}
fee {
winston
}
anchor
signature
recipient
tags {
name
value
}
bundledIn {
id
}
}
}
''';

Future<
(
Stream<List<int>>,
void Function(),
)> download({
required String txId,
String? gatewayHost = 'arweave.net',
Function(double progress, int speed)? onProgress,
}) async {
final downloadUrl = "https://$gatewayHost/$txId";
final gqlUrl = "https://$gatewayHost/graphql";

final gqlResponse = await post(
Uri.parse(gqlUrl),
headers: {'Content-Type': 'application/json'},
body: jsonEncode({'query': gqlGetTxInfo(txId)}),
);

if (gqlResponse.statusCode != 200) {
throw Exception('Failed to download $txId');
}

var txData = jsonDecode(gqlResponse.body)['data']['transaction'];

final txAnchor = txData['anchor'];
final txOwner = txData['owner']['key'];
final txTarget = txData['recipient'];
final txSignature = txData['signature'];
final txDataSize = int.parse(txData['data']['size']);
final isDataItem = txData['bundledIn'] != null;
final txQuantity = int.parse(txData['quantity']['winston']);
final txReward = int.parse(txData['fee']['winston']);
final downloadedTags = txData['tags'];
final List<Tag> txTags = [];
for (var tag in downloadedTags) {
txTags.add(createTag(tag['name'], tag['value']));
}

int bytesDownloaded = 0;
StreamSubscription<List<int>>? subscription;
final controller = StreamController<List<int>>();

// keep track of progress and download speed
int lastBytes = 0;
setProgressTimer(onProgress) => Timer.periodic(
Duration(milliseconds: 500),
(Timer timer) {
double progress =
double.parse((bytesDownloaded / txDataSize).toStringAsFixed(2));
int speed = bytesDownloaded - lastBytes;

onProgress(
progress, speed * 2); // multiply by 2 to get bytes per second

lastBytes = bytesDownloaded;
},
);

late Timer progressTimer;

Future<void> startDownload([int startByte = 0]) async {
if (onProgress != null) {
progressTimer = setProgressTimer(onProgress);
}
final request = Request('GET', Uri.parse(downloadUrl));
if (startByte > 0) {
request.headers['Range'] = 'bytes=$startByte-';
}
final client = getClient();
final streamResponse = await client.send(request);

final splitStream = StreamSplitter(streamResponse.stream);
final downloadStream = splitStream.split();
final verificationStream = splitStream.split();

// Calling `close()` indicates that no further streams will be created,
// causing splitStream to function without an internal buffer.
// The future will be completed when both streams are consumed, so we
// shouldn't await it here.
unawaited(splitStream.close());

_verify(
isDataItem: isDataItem,
id: txId,
owner: txOwner,
signature: txSignature,
target: txTarget,
anchor: txAnchor,
tags: txTags,
dataStream: verificationStream.map((list) => Uint8List.fromList(list)),
reward: txReward,
quantity: txQuantity,
dataSize: txDataSize,
).then((isVerified) {
if (!isVerified) {
controller.addError('failed to verify transaction');
}

subscription?.cancel();
controller.close();
if (onProgress != null) {
progressTimer.cancel();
}
});

subscription = downloadStream.listen(
(List<int> chunk) {
bytesDownloaded += chunk.length;
controller.sink.add(chunk);
},
cancelOnError: true,
);
}

// TODO: expose pause and resume after implementing them for verification
// void pauseDownload() {
// subscription?.cancel();
// if (onProgress != null) {
// progressTimer.cancel();
// }
// }
//
// void resumeDownload() {
// startDownload(bytesDownloaded);
// }

void cancelDownload() {
controller.addError('download cancelled');
subscription?.cancel();
controller.close();
if (onProgress != null) {
progressTimer.cancel();
}
}

startDownload();

return (controller.stream, cancelDownload);
}

_verify({
required bool isDataItem,
required String id,
required String owner,
required String signature,
required String target,
required String anchor,
required List<Tag> tags,
required Stream<List<int>> dataStream,
required int reward,
required int quantity,
required int dataSize,
}) {
if (isDataItem) {
return verifyDataItem(
id: id,
owner: owner,
signature: signature,
target: target,
anchor: anchor,
tags: tags,
dataStream: dataStream.map((list) => Uint8List.fromList(list)));
} else {
return verifyTransaction(
id: id,
owner: owner,
signature: signature,
target: target,
anchor: anchor,
tags: tags,
reward: reward,
quantity: quantity,
dataSize: dataSize,
dataStream: dataStream.map((list) => Uint8List.fromList(list)),
);
}
}
7 changes: 7 additions & 0 deletions lib/src/streams/http_client/browsers.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import 'package:http/http.dart';
import 'package:fetch_client/fetch_client.dart';

Client getClient() => FetchClient(
mode: RequestMode.cors,
// streamRequests: true,
);
3 changes: 3 additions & 0 deletions lib/src/streams/http_client/io.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import 'package:http/http.dart';

Client getClient() => Client();
1 change: 1 addition & 0 deletions lib/src/streams/streams.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export 'data_item.dart';
export 'data_models.dart';
export 'deep_hash_stream.dart';
export 'transaction_uploader.dart';
export 'download.dart';
export 'utils.dart';
Loading

0 comments on commit e2c98c1

Please sign in to comment.