
Commit

fix(uploader)
fixes bugs when updating the upload progress
thiagocarvalhodev committed Sep 26, 2023
1 parent c055c13 commit f97d37a
Showing 5 changed files with 156 additions and 128 deletions.
123 changes: 71 additions & 52 deletions lib/blocs/upload/upload_cubit.dart
@@ -576,7 +576,24 @@ class UploadCubit extends Cubit<UploadState> {
driveKey: driveKey,
);

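// Tracks tasks already reported as complete so each finished file is
// handled only once per progress event.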
List<UploadTask> completedTasks = [];

uploadController.onProgressChange((progress) {
final newCompletedTasks = progress.task.where(
(element) =>
element.status == UploadStatus.complete &&
!completedTasks.contains(element),
);

for (var element in newCompletedTasks) {
completedTasks.add(element);
// TODO: Save each file as soon as its upload finishes
// _saveEntityOnDB(element);
for (var metadata in element.content!) {
logger.d(metadata.metadataTxId ?? 'METADATA IS NULL');
}
}

emit(
UploadInProgressUsingNewUploader(
progress: progress,
@@ -587,62 +604,64 @@ class UploadCubit extends Cubit<UploadState> {
});

uploadController.onDone((tasks) async {
logger.d(tasks.toString());
unawaited(_profileCubit.refreshBalance());
// Single file only
// TODO: abstract to the database interface.
// TODO: improve API for finishing a file upload.
for (var task in tasks) {
final metadatas = task.content;

if (metadatas != null) {
for (var metadata in metadatas) {
if (metadata is ARFSFileUploadMetadata) {
final fileMetadata = metadata;

final revisionAction =
conflictingFiles.containsKey(fileMetadata.name)
? RevisionAction.uploadNewVersion
: RevisionAction.create;

final entity = FileEntity(
dataContentType: fileMetadata.dataContentType,
dataTxId: fileMetadata.dataTxId,
driveId: fileMetadata.driveId,
id: fileMetadata.id,
lastModifiedDate: fileMetadata.lastModifiedDate,
name: fileMetadata.name,
parentFolderId: fileMetadata.parentFolderId,
size: fileMetadata.size,
// TODO: pinnedDataOwnerAddress
);

if (fileMetadata.metadataTxId == null) {
logger.e('Metadata tx id is null');
throw Exception('Metadata tx id is null');
}

entity.txId = fileMetadata.metadataTxId!;

await _driveDao.transaction(() async {
// If path is a blob from drag and drop, use file name. Else use the path field from folder upload
// TODO: Changed this logic. PLEASE REVIEW IT.
final filePath = '${_targetFolder.path}/${metadata.name}';
await _driveDao.writeFileEntity(entity, filePath);
await _driveDao.insertFileRevision(
entity.toRevisionCompanion(
performedAction: revisionAction,
),
);
});
}

// all files are uploaded
emit(UploadComplete());
unawaited(_saveEntityOnDB(task));
}
});
}

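/// Persists the uploaded file's entity and revision to the local database
/// once its upload task finishes.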
Future _saveEntityOnDB(UploadTask task) async {
unawaited(_profileCubit.refreshBalance());
// Single file only
// TODO: abstract to the database interface.
// TODO: improve API for finishing a file upload.
final metadatas = task.content;

if (metadatas != null) {
for (var metadata in metadatas) {
if (metadata is ARFSFileUploadMetadata) {
final fileMetadata = metadata;

final revisionAction = conflictingFiles.containsKey(fileMetadata.name)
? RevisionAction.uploadNewVersion
: RevisionAction.create;

final entity = FileEntity(
dataContentType: fileMetadata.dataContentType,
dataTxId: fileMetadata.dataTxId,
driveId: fileMetadata.driveId,
id: fileMetadata.id,
lastModifiedDate: fileMetadata.lastModifiedDate,
name: fileMetadata.name,
parentFolderId: fileMetadata.parentFolderId,
size: fileMetadata.size,
// TODO: pinnedDataOwnerAddress
);

if (fileMetadata.metadataTxId == null) {
logger.e('Metadata tx id is null');
throw Exception('Metadata tx id is null');
}

entity.txId = fileMetadata.metadataTxId!;

await _driveDao.transaction(() async {
// If the path is a blob from drag and drop, use the file name. Otherwise, use the path field from the folder upload.
// TODO: This logic was changed. PLEASE REVIEW IT.
final filePath = '${_targetFolder.path}/${metadata.name}';
await _driveDao.writeFileEntity(entity, filePath);
await _driveDao.insertFileRevision(
entity.toRevisionCompanion(
performedAction: revisionAction,
),
);
});
}

// all files are uploaded
emit(UploadComplete());
}
});
}
}

Future<void> skipLargeFilesAndCheckForConflicts() async {
2 changes: 1 addition & 1 deletion lib/components/upload_form.dart
@@ -912,7 +912,7 @@ class _UploadFormState extends State<UploadForm> {
break;
}

if (progress.totalSize > 0 && task.dataItem != null) {
if (task.isProgressAvailable && task.dataItem != null) {
progressText =
'${filesize(((task.dataItem!.dataItemSize) * task.progress).ceil())}/${filesize(task.dataItem!.dataItemSize)}';
} else {
102 changes: 54 additions & 48 deletions packages/ardrive_uploader/lib/src/ardrive_uploader.dart
@@ -294,58 +294,75 @@ class _ArDriveUploader implements ArDriveUploader {
required UploadController controller,
}) async {
List<Future<void>> activeUploads = [];
int totalSize = 0; // size accumulator
List<ARFSUploadMetadata> contents = [];
List<UploadTask> tasks = [];
int totalSize = 0;

for (var f in files) {
final metadata = await _metadataGenerator.generateMetadata(
f.$2,
f.$1,
);

final uploadTask = UploadTask(
status: UploadStatus.notStarted,
content: [metadata],
);

tasks.add(uploadTask);

controller.updateProgress(task: uploadTask);

contents.add(metadata);
}

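// Start the uploads with bounded concurrency: keep at most 25 uploads in
// flight and roughly 500 MiB queued at once; files of 500 MiB or more
// first wait for all active uploads to drain.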
for (int i = 0; i < files.length; i++) {
int fileSize = await files[i].$2.length;

if (fileSize >= 500 * 1024 * 1024) {
// File size is >= 500MiB

// Wait for ongoing uploads to complete
await Future.wait(activeUploads);
totalSize = 0; // Reset the accumulator
activeUploads.clear(); // Clear the active uploads
} else {
while (activeUploads.length >= 25 ||
totalSize + fileSize >= 500 * 1024 * 1024) {
await Future.any(activeUploads); // Wait for at least one to complete
// Recalculate totalSize and remove completed futures
totalSize = 0;
var remainingFutures = <Future<void>>[];
for (var future in activeUploads) {
remainingFutures.add(future.whenComplete(() {
totalSize -= fileSize; // Subtract the size of the completed file
remainingFutures.remove(future);
}));
// Add the size of the corresponding file to totalSize
// This part will depend on how you keep track of file sizes
}
activeUploads = remainingFutures;
while (activeUploads.length >= 25 ||
totalSize + fileSize >= 500 * 1024 * 1024) {
await Future.any(activeUploads);

// Remove completed uploads and update totalSize
int recalculatedSize = 0;
List<Future<void>> ongoingUploads = [];

for (var f in activeUploads) {
// TODO: track the actual size of each ongoing upload; using the current
// file's size here is only an approximation.
int ongoingFileSize = await files[i].$2.length;
recalculatedSize += ongoingFileSize;

ongoingUploads.add(f);
}
totalSize += fileSize; // Add to the accumulator

activeUploads = ongoingUploads;
totalSize = recalculatedSize;
}

// Existing logic to start upload
late Future<void> uploadFuture;
totalSize += fileSize;

uploadFuture = _uploadSingleFile(
Future<void> uploadFuture = _uploadSingleFile(
file: files[i].$2,
uploadController: controller,
wallet: wallet,
driveKey: driveKey,
uploadController: controller,
uploadTask: UploadTask(
status: UploadStatus.notStarted,
),
args: files[i].$1,
).then((value) {
// activeUploads.remove(uploadFuture);
metadata: contents[i],
uploadTask: tasks[i],
);

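// Release the upload slot and its queued size whether the upload
// succeeds or fails.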
uploadFuture.then((_) {
activeUploads.remove(uploadFuture);
totalSize -= fileSize;
}).catchError((error) {
activeUploads.remove(uploadFuture);
totalSize -= fileSize;
// TODO: Handle error
});

activeUploads.add(uploadFuture);
}

// Wait for any remaining uploads to complete
await Future.wait(activeUploads);
}

@@ -355,19 +372,8 @@ class _ArDriveUploader implements ArDriveUploader {
required Wallet wallet,
SecretKey? driveKey,
required UploadTask uploadTask,
ARFSUploadMetadataArgs? args,
required ARFSUploadMetadata metadata,
}) async {
final metadata = await _metadataGenerator.generateMetadata(
file,
args,
);

// adds the metadata of the file to the upload task
uploadTask = uploadTask.copyWith(
content: [metadata],
status: UploadStatus.bundling,
);

final createDataBundle = _dataBundler.createDataBundle(
file: file,
metadata: metadata,
14 changes: 11 additions & 3 deletions packages/ardrive_uploader/lib/src/streamed_upload.dart
@@ -3,6 +3,7 @@ import 'package:ardrive_uploader/ardrive_uploader.dart';
import 'package:ardrive_uploader/src/turbo_upload_service_base.dart';
import 'package:ardrive_utils/ardrive_utils.dart';
import 'package:arweave/arweave.dart';
import 'package:flutter/foundation.dart';
import 'package:uuid/uuid.dart';

abstract class StreamedUpload<T, R> {
@@ -47,12 +48,14 @@ class TurboStreamedUpload implements StreamedUpload<DataItemResult, dynamic> {
},
);

print(
'Sending request to turbo. Is possible get progress: ${controller.isPossibleGetProgress}');

handle = handle.copyWith(status: UploadStatus.inProgress);
controller.updateProgress(task: handle);

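// On web, data items larger than 500 MiB are uploaded without byte-level
// progress, so mark the task accordingly before sending.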
if (kIsWeb && handle.dataItem!.dataItemSize > 1024 * 1024 * 500) {
handle.isProgressAvailable = false;
controller.updateProgress(task: handle);
}

// TODO: set whether it's possible to get the progress. Check the turbo web implementation.

// gets the streamed request
@@ -78,6 +81,11 @@ class TurboStreamedUpload implements StreamedUpload<DataItemResult, dynamic> {
.then((value) async {
print('Turbo response: ${value.statusCode}');

if (!handle.isProgressAvailable) {
print('Progress is not available, setting to 1');
handle.progress = 1;
}

controller.updateProgress(
task: handle,
);
43 changes: 19 additions & 24 deletions packages/ardrive_uploader/lib/src/upload_controller.dart
Expand Up @@ -82,9 +82,6 @@ abstract class UploadController {
});
void onProgressChange(Function(UploadProgress progress) callback);

bool get isPossibleGetProgress;
set isPossibleGetProgress(bool value);

factory UploadController(
StreamController<UploadProgress> progressStream,
) {
@@ -210,31 +207,29 @@ class _UploadController implements UploadController {
print('Upload Finished');
};

@override
bool get isPossibleGetProgress => _isPossibleGetProgress;

@override
set isPossibleGetProgress(bool value) => _isPossibleGetProgress = value;

bool _isPossibleGetProgress = true;

@override
final List<UploadTask> tasks = [];

// TODO: calculate progress based on total size, not only on the number of tasks
double calculateTotalProgress(List<UploadTask> tasks) {
double totalProgress = 0.0;

for (var task in tasks) {
if (task.dataItem == null) {
continue;
}

if (task.isProgressAvailable) {
totalProgress += (task.progress * task.dataItem!.dataItemSize);
}
}

return (totalSize(tasks) == 0) ? 0.0 : totalProgress / totalSize(tasks);
// double totalProgress = 0.0;

// for (var task in tasks) {
// if (task.dataItem == null) {
// totalProgress += 0;
// continue;
// }

// if (task.isProgressAvailable) {
// totalProgress += (task.progress * task.dataItem!.dataItemSize);
// }
// }

// return (totalSize(tasks) == 0) ? 0.0 : totalProgress / totalSize(tasks);
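// For now, report the unweighted mean of each task's progress
// (assumes the task list is non-empty).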
return tasks
.map((e) => e.progress)
.reduce((value, element) => value + element) /
tasks.length;
}

int totalSize(List<UploadTask> tasks) {
