Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PE-5029: adds a parameter to by pass the download verification #57

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 136 additions & 118 deletions lib/src/streams/download.dart
Original file line number Diff line number Diff line change
@@ -1,39 +1,13 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:arweave/arweave.dart';
import 'package:arweave/src/utils/graph_ql_utils.dart';
import 'package:async/async.dart';
import 'package:http/http.dart';
import 'http_client/io.dart' if (dart.library.js) 'http_client/browsers.dart';

String gqlGetTxInfo(String txId) => '''
{
transaction(id: "$txId") {
owner {
key
}
data {
size
}
quantity {
winston
}
fee {
winston
}
anchor
signature
recipient
tags {
name
value
}
bundledIn {
id
}
}
}
''';
import 'http_client/io.dart' if (dart.library.js) 'http_client/browsers.dart';

Future<
(
Expand All @@ -43,47 +17,26 @@ Future<
required String txId,
String? gatewayHost = 'arweave.net',
Function(double progress, int speed)? onProgress,
bool verifyDownload = true,
}) async {
final downloadUrl = "https://$gatewayHost/$txId";
final gqlUrl = "https://$gatewayHost/graphql";

final gqlResponse = await post(
Uri.parse(gqlUrl),
headers: {'Content-Type': 'application/json'},
body: jsonEncode({'query': gqlGetTxInfo(txId)}),
);

if (gqlResponse.statusCode != 200) {
throw Exception('Failed to download $txId');
}

var txData = jsonDecode(gqlResponse.body)['data']['transaction'];

final txAnchor = txData['anchor'];
final txOwner = txData['owner']['key'];
final txTarget = txData['recipient'];
final txSignature = txData['signature'];
final txDataSize = int.parse(txData['data']['size']);
final isDataItem = txData['bundledIn'] != null;
final txQuantity = int.parse(txData['quantity']['winston']);
final txReward = int.parse(txData['fee']['winston']);
final downloadedTags = txData['tags'];
final List<Tag> txTags = [];
for (var tag in downloadedTags) {
txTags.add(createTag(tag['name'], tag['value']));
}

int bytesDownloaded = 0;
StreamSubscription<List<int>>? subscription;
final controller = StreamController<List<int>>();

final txData = await _getTransactionData(
txId: txId,
gatewayHost: gatewayHost!,
);

// keep track of progress and download speed
int lastBytes = 0;
setProgressTimer(onProgress) => Timer.periodic(
Duration(milliseconds: 500),
(Timer timer) {
double progress =
double.parse((bytesDownloaded / txDataSize).toStringAsFixed(2));
double progress = double.parse(
(bytesDownloaded / txData.dataSize).toStringAsFixed(2));
int speed = bytesDownloaded - lastBytes;

onProgress(
Expand All @@ -106,46 +59,62 @@ Future<
final client = getClient();
final streamResponse = await client.send(request);

final splitStream = StreamSplitter(streamResponse.stream);
final downloadStream = splitStream.split();
final verificationStream = splitStream.split();

// Calling `close()` indicates that no further streams will be created,
// causing splitStream to function without an internal buffer.
// The future will be completed when both streams are consumed, so we
// shouldn't await it here.
unawaited(splitStream.close());

_verify(
isDataItem: isDataItem,
id: txId,
owner: txOwner,
signature: txSignature,
target: txTarget,
anchor: txAnchor,
tags: txTags,
dataStream: verificationStream.map((list) => Uint8List.fromList(list)),
reward: txReward,
quantity: txQuantity,
dataSize: txDataSize,
).then((isVerified) {
if (!isVerified) {
controller.addError('failed to verify transaction');
}

subscription?.cancel();
controller.close();
if (onProgress != null) {
progressTimer.cancel();
}
});
Stream<List<int>> downloadStream;

if (verifyDownload) {
final splitStream = StreamSplitter(streamResponse.stream);

downloadStream = splitStream.split();
final verificationStream = splitStream.split();

// Calling `close()` indicates that no further streams will be created,
// causing splitStream to function without an internal buffer.
// The future will be completed when both streams are consumed, so we
// shouldn't await it here.
unawaited(splitStream.close());

_verify(
dataStream: verificationStream,
txData: txData,
txId: txId,
).then((isVerified) {
if (!isVerified) {
// TODO: maybe using a custom Exception here would be better? e.g. ValidationException
controller.addError('failed to verify transaction');
}

controller.close();
subscription?.cancel();
});
} else {
/// If we don't need to verify the download, we can just return the
/// stream directly.
downloadStream = streamResponse.stream;
}

subscription = downloadStream.listen(
(List<int> chunk) {
bytesDownloaded += chunk.length;
controller.sink.add(chunk);
},
cancelOnError: true,
onError: (e) {
print('[arweave]: Error downloading $txId');

controller.addError(e);
controller.close();

if (onProgress != null) {
progressTimer.cancel();
}
},
onDone: () {
if (onProgress != null) {
progressTimer.cancel();
}

controller.close();
},
);
}

Expand Down Expand Up @@ -175,40 +144,89 @@ Future<
return (controller.stream, cancelDownload);
}

_verify({
required bool isDataItem,
required String id,
required String owner,
required String signature,
required String target,
required String anchor,
required List<Tag> tags,
// TODO: maybe move this to a separate file? An utils file maybe?
// We problably have the same logic on ardrive-app project. Maybe we can
// create a package with this logic and use it on both projects.
Future<TransactionData> _getTransactionData({
required String txId,
required String gatewayHost,
}) async {
final gqlUrl = "https://$gatewayHost/graphql";

final gqlResponse = await post(
Uri.parse(gqlUrl),
headers: {'Content-Type': 'application/json'},
body: jsonEncode({'query': gqlGetTxInfo(txId)}),
);

if (gqlResponse.statusCode != 200) {
throw Exception('Failed to download $txId');
}

var txDataJson = jsonDecode(gqlResponse.body)['data']['transaction'];

return TransactionData.fromJson(txDataJson);
}

Future<bool> _verify({
required TransactionData txData,
required String txId,
required Stream<List<int>> dataStream,
required int reward,
required int quantity,
required int dataSize,
}) {
if (isDataItem) {
if (txData.isDataItem) {
return verifyDataItem(
id: id,
owner: owner,
signature: signature,
target: target,
anchor: anchor,
tags: tags,
id: txId,
owner: txData.owner,
signature: txData.signature,
target: txData.target,
anchor: txData.anchor,
tags: txData.tags,
dataStream: dataStream.map((list) => Uint8List.fromList(list)));
} else {
return verifyTransaction(
id: id,
owner: owner,
signature: signature,
target: target,
anchor: anchor,
tags: tags,
reward: reward,
quantity: quantity,
dataSize: dataSize,
id: txId,
owner: txData.owner,
signature: txData.signature,
target: txData.target,
anchor: txData.anchor,
tags: txData.tags,
reward: txData.reward,
quantity: txData.quantity,
dataSize: txData.dataSize,
dataStream: dataStream.map((list) => Uint8List.fromList(list)),
);
}
}

class TransactionData {
late String anchor;
late String owner;
late String target;
late String signature;
late int dataSize;
late bool isDataItem;
late int quantity;
late int reward;
late List<Tag> tags;

TransactionData.fromJson(Map<String, dynamic> json) {
parseData(json);
}

void parseData(Map<String, dynamic> jsonData) {
anchor = jsonData['anchor'];
owner = jsonData['owner']['key'];
target = jsonData['recipient'];
signature = jsonData['signature'];
dataSize = int.parse(jsonData['data']['size']);
isDataItem = jsonData['bundledIn'] != null;
quantity = int.parse(jsonData['quantity']['winston']);
reward = int.parse(jsonData['fee']['winston']);

var downloadedTags = jsonData['tags'];
tags = [];
for (var tag in downloadedTags) {
tags.add(createTag(tag['name'], tag['value']));
}
}
}
28 changes: 28 additions & 0 deletions lib/src/utils/graph_ql_utils.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
String gqlGetTxInfo(String txId) => '''
{
transaction(id: "$txId") {
owner {
key
}
data {
size
}
quantity {
winston
}
fee {
winston
}
anchor
signature
recipient
tags {
name
value
}
bundledIn {
id
}
}
}
''';
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: arweave
description: ""
version: 3.8.2
version: 3.8.3
environment:
sdk: ">=3.0.0 <4.0.0"
publish_to: none
Expand Down
Loading