Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
karlprieb committed Sep 21, 2023
1 parent 7183b84 commit a8d8e68
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 39 deletions.
130 changes: 118 additions & 12 deletions lib/src/streams/data_bundle.dart
Original file line number Diff line number Diff line change
@@ -1,15 +1,63 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:arweave/utils.dart';
import 'package:fpdart/fpdart.dart';

import '../utils/bundle_tag_parser.dart';
import './utils.dart';
import '../models/models.dart';
import 'data_item.dart';
import 'data_models.dart';
import 'errors.dart';

class TransactionResult {
static const int format = 2;

final String id;
final String anchor;
final String owner;
final List<Tag> tags;
final String target;
final BigInt quantity;
final String dataRoot;
final int dataSize;
final BigInt reward;
final String signature;
final Stream<TransactionChunk> Function() chunkStreamGenerator;

TransactionResult({
required this.id,
required this.anchor,
required this.owner,
required this.tags,
required this.target,
required this.quantity,
required this.dataRoot,
required this.dataSize,
required this.reward,
required this.signature,
required this.chunkStreamGenerator,
});

Map<String, dynamic> toJson() {
return {
'format': 2,
'id': id,
'last_tx': anchor,
'owner': owner,
'tags': tags.map((tag) => tag.toJson()).toList(),
'target': target,
'quantity': quantity.toString(),
'data_root': dataRoot,
'data_size': dataSize.toString(),
'reward': reward.toString(),
'signature': signature,
};
}
}

class DataItemFile {
final int dataSize;
final DataStreamGenerator streamGenerator;
Expand All @@ -26,29 +74,87 @@ class DataItemFile {
});
}

typedef BundledDataItemResult = TaskEither<DataItemError, DataItemResult>;
// typedef TransactionTaskEither
// = TaskEither<StreamTransactionError, TransactionResult>;
// TransactionTaskEither createTransactionTaskEither({
// required final Wallet wallet,
// String? anchor,
// required final String target,
// List<Tag> tags = const [],
// required final BigInt quantity,
// BigInt? reward,
// required final DataStreamGenerator dataStreamGenerator,
// required final int dataSize,
// }) {
// if (anchor == null) {
// anchor = await getTransactionAnchor();
//
// }
//
// return getOwnerTaskEither(wallet).flatMap((owner) =>
// prepareChunksTaskEither(dataStreamGenerator).flatMap((chunksWithProofs) {
// final chunks = chunksWithProofs.chunks;
// final dataRoot = chunksWithProofs.dataRoot;
//
// return deepHashTaskEither([
// toStream(utf8.encode('2')), // Transaction format
// toStream(decodeBase64ToBytes(owner)),
// toStream(decodeBase64ToBytes(target)),
// toStream(utf8.encode(quantity.toString())),
// toStream(utf8.encode(reward.toString())),
// toStream(decodeBase64ToBytes(anchor)),
// toStream(serializeTags(tags: tags)),
// toStream(utf8.encode(dataSize.toString())),
// toStream(decodeBase64ToBytes(dataRoot)),
// ]).flatMap((signatureData) =>
// signDataItemTaskEither(wallet: wallet, signatureData: signatureData)
// .flatMap((signResult) {
// final transaction = TransactionResult(
// id: signResult.id,
// anchor: anchor,
// owner: owner,
// tags: tags,
// target: target,
// quantity: quantity,
// dataRoot: dataRoot,
// dataSize: dataSize,
// reward: reward,
// signature: encodeBytesToBase64(signResult.signature),
// chunkStreamGenerator: () => getChunks(
// dataStreamGenerator(),
// chunks,
// dataRoot,
// dataSize,
// ),
// );
//
// return TaskEither.of(transaction);
// }));
// }));
// }

typedef BundledDataItemResult
= TaskEither<StreamTransactionError, DataItemResult>;
BundledDataItemResult createBundledDataItemTaskEither({
required final Wallet wallet,
required final List<DataItemFile> dataItemFiles,
required final List<Tag> tags,
}) {
final List<DataItemTaskEither> dataItemTaskEitherList = [];
final List<DataItemResult> dataItemList = [];
final dataItemCount = dataItemFiles.length;
for (var i = 0; i < dataItemCount; i++) {
final dataItem = dataItemFiles[i];

final dataItemTaskEither = createDataItemTaskEither(
createDataItemTaskEither(
wallet: wallet,
dataStream: dataItem.streamGenerator,
dataStreamSize: dataItem.dataSize,
target: dataItem.target,
anchor: dataItem.anchor,
tags: dataItem.tags,
);
dataItemTaskEitherList.add(dataItemTaskEither);
).map((dataItem) => dataItemList.add(dataItem));
}

return createDataBundleTaskEither(dataItemTaskEitherList)
return createDataBundleTaskEither(TaskEither.of(dataItemList))
.flatMap((dataBundle) {
final dataBundleStream = dataBundle.stream;
final dataBundleSize = dataBundle.dataBundleStreamSize;
Expand All @@ -71,15 +177,15 @@ BundledDataItemResult createBundledDataItemTaskEither({
}

DataBundleTaskEither createDataBundleTaskEither(
final List<DataItemTaskEither> dataItemsTaskEither,
final TaskEither<StreamTransactionError, List<DataItemResult>> dataItems,
) {
return dataItemsTaskEither.sequenceTaskEitherSeq().flatMap((dataItems) {
final dataItemsLength = dataItems.length;
return dataItems.flatMap((dataItemResults) {
final dataItemsLength = dataItemResults.length;
final headers = Uint8List(dataItemsLength * 64);
int dataItemsSize = 0;

for (var i = 0; i < dataItemsLength; i++) {
final dataItem = dataItems[i];
final dataItem = dataItemResults[i];
final id = decodeBase64ToBytes(dataItem.id);
final dataItemLength = dataItem.dataItemSize;

Expand All @@ -104,7 +210,7 @@ DataBundleTaskEither createDataBundleTaskEither(

final bundleGenerator = combineStreamAndFunctionList(
Stream.fromIterable([bundleHeaders]),
dataItems.map((dataItem) => dataItem.streamGenerator).toList());
dataItemResults.map((dataItem) => dataItem.streamGenerator).toList());

return TaskEither.of(DataBundleResult(
dataBundleStreamSize: bundleHeaders.length + dataItemsSize,
Expand Down
5 changes: 3 additions & 2 deletions lib/src/streams/data_models.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class SignDataItemResult {
}

typedef DataStreamGenerator = Stream<Uint8List> Function();
typedef DataItemTaskEither = TaskEither<DataItemError, DataItemResult>;
typedef DataItemTaskEither = TaskEither<StreamTransactionError, DataItemResult>;

class DataItemResult {
final String id;
Expand Down Expand Up @@ -53,7 +53,8 @@ class ProcessedDataItem {
});
}

typedef DataBundleTaskEither = TaskEither<DataItemError, DataBundleResult>;
typedef DataBundleTaskEither
= TaskEither<StreamTransactionError, DataBundleResult>;

class DataBundleResult {
final int dataBundleStreamSize;
Expand Down
32 changes: 15 additions & 17 deletions lib/src/streams/errors.dart
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
abstract class DataItemError implements Exception {}
abstract class StreamTransactionError implements Exception {}

class DataItemCreationError extends DataItemError {}
class DataItemCreationError extends StreamTransactionError {}

class InvalidTargetSizeError extends DataItemError {}
class InvalidTargetSizeError extends StreamTransactionError {}

class InvalidAnchorSizeError extends DataItemError {}
class InvalidAnchorSizeError extends StreamTransactionError {}

class DeepHashStreamError extends DataItemError {}
class DeepHashStreamError extends StreamTransactionError {}

class SignatureError extends DataItemError {}
class SignatureError extends StreamTransactionError {}

class GetWalletOwnerError extends DataItemError {}
class GetWalletOwnerError extends StreamTransactionError {}

class ProcessedDataItemHeadersError extends DataItemError {}
class ProcessedDataItemHeadersError extends StreamTransactionError {}

class DecodeBase64ToBytesError extends DataItemError {}
class DecodeBase64ToBytesError extends StreamTransactionError {}

class SerializeTagsError extends DataItemError {}
class SerializeTagsError extends StreamTransactionError {}

abstract class TransactionError implements Exception {}
class GenerateTransactionChunksError extends StreamTransactionError {}

class GenerateTransactionChunksError extends TransactionError {}
class PrepareChunksError extends StreamTransactionError {}

class PrepareChunksError extends TransactionError {}
class TransactionDeepHashError extends StreamTransactionError {}

class TransactionDeepHashError extends TransactionError {}
class TransactionSignatureError extends StreamTransactionError {}

class TransactionSignatureError extends TransactionError {}

class TransactionGetOwnerError extends TransactionError {}
class TransactionGetOwnerError extends StreamTransactionError {}
66 changes: 58 additions & 8 deletions lib/src/streams/utils.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import 'dart:async';
import 'dart:typed_data';

import 'package:arweave/src/api/api.dart';
import 'package:arweave/src/models/models.dart';
import 'package:arweave/src/streams/transaction.dart';
import 'package:async/async.dart';
import 'package:fpdart/fpdart.dart';

import '../crypto/crypto.dart';
Expand Down Expand Up @@ -77,15 +80,15 @@ Stream<Uint8List> Function() createByteRangeStream(
};
}

TaskEither<DataItemError, Uint8List> deepHashTaskEither(
TaskEither<StreamTransactionError, Uint8List> deepHashTaskEither(
final List<Stream<Uint8List>> inputs,
) {
return TaskEither.tryCatch(() async {
return await deepHashStream(inputs);
}, (error, _) => DeepHashStreamError());
}

TaskEither<DataItemError, SignDataItemResult> signDataItemTaskEither({
TaskEither<StreamTransactionError, SignDataItemResult> signDataItemTaskEither({
required final Wallet wallet,
required final Uint8List signatureData,
}) {
Expand All @@ -100,12 +103,13 @@ TaskEither<DataItemError, SignDataItemResult> signDataItemTaskEither({
}, (error, _) => SignatureError());
}

TaskEither<DataItemError, String> getOwnerTaskEither(final Wallet wallet) =>
TaskEither<StreamTransactionError, String> getOwnerTaskEither(
final Wallet wallet) =>
TaskEither.tryCatch(() async {
return await wallet.getOwner();
}, (error, _) => GetWalletOwnerError());

TaskEither<DataItemError, bool> verifySignatureTaskEither({
TaskEither<StreamTransactionError, bool> verifySignatureTaskEither({
required final Uint8List owner,
required final Uint8List signature,
required final Uint8List signatureData,
Expand Down Expand Up @@ -133,15 +137,15 @@ TaskEither<DataItemError, bool> verifySignatureTaskEither({
}, (error, _) => SignatureError());
}

TaskEither<DataItemError, Uint8List> decodeBase64ToBytesTaskEither(
TaskEither<StreamTransactionError, Uint8List> decodeBase64ToBytesTaskEither(
final String input,
) {
return TaskEither.tryCatch(() async {
return decodeBase64ToBytes(input);
}, (error, _) => DecodeBase64ToBytesError());
}

TaskEither<DataItemError, Uint8List> decodeAndCheckTargetTaskEither(
TaskEither<StreamTransactionError, Uint8List> decodeAndCheckTargetTaskEither(
final String target,
) {
return TaskEither.tryCatch(() async {
Expand All @@ -153,7 +157,7 @@ TaskEither<DataItemError, Uint8List> decodeAndCheckTargetTaskEither(
}, (error, _) => DecodeBase64ToBytesError());
}

TaskEither<DataItemError, Uint8List> decodeAndCheckAnchorTaskEither(
TaskEither<StreamTransactionError, Uint8List> decodeAndCheckAnchorTaskEither(
final String target,
) {
return TaskEither.tryCatch(() async {
Expand All @@ -165,10 +169,56 @@ TaskEither<DataItemError, Uint8List> decodeAndCheckAnchorTaskEither(
}, (error, _) => DecodeBase64ToBytesError());
}

TaskEither<DataItemError, Uint8List> serializeTagsTaskEither(
TaskEither<StreamTransactionError, Uint8List> serializeTagsTaskEither(
final List<Tag> tags,
) {
return TaskEither.tryCatch(() async {
return serializeTags(tags: tags);
}, (error, _) => SerializeTagsError());
}

TaskEither<StreamTransactionError, PrepareChunksResult> prepareChunksTaskEither(
final DataStreamGenerator dataStreamGenerator,
) {
return generateTransactionChunksTaskEither(dataStreamGenerator)
.flatMap((chunks) {
final dataRoot = encodeBytesToBase64(chunks.dataRoot);
return TaskEither.of(PrepareChunksResult(
chunks: chunks,
dataRoot: dataRoot,
));
});
}

TaskEither<StreamTransactionError, TransactionChunksWithProofs>
generateTransactionChunksTaskEither(
DataStreamGenerator dataStreamGenerator) =>
TaskEither.tryCatch(
() async => await generateTransactionChunksFromStream(
dataStreamGenerator()),
(e, s) => GenerateTransactionChunksError());

Stream<TransactionChunk> getChunks(Stream<Uint8List> dataStream,
TransactionChunksWithProofs chunks, String dataRoot, int dataSize) async* {
final chunker = ChunkedStreamReader(dataStream);

for (var i = 0; i < chunks.chunks.length; i++) {
final chunk = chunks.chunks[i];
final proof = chunks.proofs[i];

final chunkSize = chunk.maxByteRange - chunk.minByteRange;
final chunkData = await chunker.readBytes(chunkSize);

yield TransactionChunk(
dataRoot: dataRoot,
dataSize: dataSize.toString(),
dataPath: encodeBytesToBase64(proof.proof),
offset: proof.offset.toString(),
chunk: encodeBytesToBase64(chunkData),
);
}
await chunker.cancel();
}

Future<String> getTransactionAnchor() =>
ArweaveApi().get('tx_anchor').then((res) => res.body);

0 comments on commit a8d8e68

Please sign in to comment.