From ecb4dfc3c49e977fe6c39ad38124fc0119d96dc2 Mon Sep 17 00:00:00 2001 From: xuty <xty50337@hotmail.com> Date: Tue, 4 Jan 2022 17:50:34 +0800 Subject: [PATCH] Improve upload progress granularity --- lib/io.dart | 3 ++- lib/src/minio.dart | 3 ++- lib/src/minio_client.dart | 2 ++ lib/src/minio_uploader.dart | 35 +++++++++++++++++++++++++---------- lib/src/utils.dart | 29 ++++++++++++++++------------- test/minio_stream_test.dart | 4 +++- test/minio_test.dart | 8 ++++---- 7 files changed, 54 insertions(+), 30 deletions(-) diff --git a/lib/io.dart b/lib/io.dart index 2a278f3..39dbcd7 100644 --- a/lib/io.dart +++ b/lib/io.dart @@ -1,4 +1,5 @@ import 'dart:io'; +import 'dart:typed_data'; import 'package:minio/src/minio.dart'; import 'package:minio/src/minio_errors.dart'; @@ -31,7 +32,7 @@ extension MinioX on Minio { return putObject( bucket, object, - file.openRead(), + file.openRead().cast<Uint8List>(), size: stat.size, metadata: metadata, ); diff --git a/lib/src/minio.dart b/lib/src/minio.dart index b3a7298..f5b4a5c 100644 --- a/lib/src/minio.dart +++ b/lib/src/minio.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:typed_data'; import 'package:minio/models.dart'; import 'package:minio/src/minio_client.dart'; @@ -911,7 +912,7 @@ class Minio { Future<String> putObject( String bucket, String object, - Stream<List<int>> data, { + Stream<Uint8List> data, { int? size, int? chunkSize, Map<String, String>? metadata, diff --git a/lib/src/minio_client.dart b/lib/src/minio_client.dart index f47be2c..40ca29a 100644 --- a/lib/src/minio_client.dart +++ b/lib/src/minio_client.dart @@ -163,6 +163,7 @@ class MinioClient { dynamic payload = '', Map<String, dynamic>? queries, Map<String, String>? headers, + void Function(int)? onProgress, }) async { final stream = await _request( method: method, @@ -173,6 +174,7 @@ class MinioClient { resource: resource, queries: queries, headers: headers, + onProgress: onProgress, ); final response = await MinioResponse.fromStream(stream); diff --git a/lib/src/minio_uploader.dart b/lib/src/minio_uploader.dart index d554939..ae74afa 100644 --- a/lib/src/minio_uploader.dart +++ b/lib/src/minio_uploader.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:typed_data'; import 'package:convert/convert.dart'; import 'package:crypto/crypto.dart'; @@ -9,7 +10,7 @@ import 'package:minio/src/minio_client.dart'; import 'package:minio/src/minio_helpers.dart'; import 'package:minio/src/utils.dart'; -class MinioUploader implements StreamConsumer<List<int>> { +class MinioUploader implements StreamConsumer<Uint8List> { MinioUploader( this.minio, this.client, @@ -39,8 +40,11 @@ class MinioUploader implements StreamConsumer<List<int>> { String? _uploadId; + // The number of bytes uploaded of the current part. + int? bytesUploaded; + @override - Future addStream(Stream<List<int>> stream) async { + Future addStream(Stream<Uint8List> stream) async { await for (var chunk in stream) { List<int>? md5digest; final headers = <String, String>{}; @@ -53,7 +57,6 @@ class MinioUploader implements StreamConsumer<List<int>> { if (_partNumber == 1 && chunk.length < partSize) { _etag = await _uploadChunk(chunk, headers, null); - onProgress?.call(chunk.length); return; } @@ -83,7 +86,6 @@ class MinioUploader implements StreamConsumer<List<int>> { final etag = await _uploadChunk(chunk, headers, queries); final part = CompletedPart(etag, partNumber); _parts[part] = chunk.length; - _reportMultipartUploadProgress(); } } @@ -106,7 +108,7 @@ class MinioUploader implements StreamConsumer<List<int>> { } Future<String?> _uploadChunk( - List<int> chunk, + Uint8List chunk, Map<String, String> headers, Map<String, String?>? queries, ) async { @@ -116,7 +118,8 @@ class MinioUploader implements StreamConsumer<List<int>> { queries: queries, bucket: bucket, object: object, - payload: chunk, + payload: Stream.value(chunk).transform(BlockStream(1 << 16)), + onProgress: _updateProgress, ); validate(resp); @@ -146,10 +149,22 @@ class MinioUploader implements StreamConsumer<List<int>> { _oldParts = Map.fromEntries(entries); } - void _reportMultipartUploadProgress() { - if (onProgress != null) { - final bytes = _parts.values.reduce((a, b) => a + b); - onProgress!(bytes); + void _updateProgress(int bytesUploaded) { + this.bytesUploaded = bytesUploaded; + _reportUploadProgress(); + } + + void _reportUploadProgress() { + if (onProgress == null || bytesUploaded == null) { + return; + } + + var totalBytesUploaded = bytesUploaded!; + + for (var part in _parts.keys) { + totalBytesUploaded += _parts[part]!; } + + onProgress!(totalBytesUploaded); } } diff --git a/lib/src/utils.dart b/lib/src/utils.dart index 67fdde0..cba532b 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -1,8 +1,8 @@ import 'dart:async'; import 'dart:convert'; import 'dart:math' as math; +import 'dart:typed_data'; -import 'package:buffer/buffer.dart'; import 'package:convert/convert.dart'; import 'package:crypto/crypto.dart'; import 'package:intl/intl.dart'; @@ -63,26 +63,29 @@ String encodeQueries(Map<String, dynamic> queries) { return pairs.join('&'); } -class BlockStream extends StreamTransformerBase<List<int>, List<int>> { +class BlockStream extends StreamTransformerBase<Uint8List, Uint8List> { BlockStream(this.size); final int size; @override - Stream<List<int>> bind(Stream<List<int>> stream) async* { - var buffer = BytesBuffer(); - + Stream<Uint8List> bind(Stream<Uint8List> stream) async* { await for (var chunk in stream) { - buffer.add(chunk); - if (buffer.length >= size) { - final block = buffer.toBytes(); - yield block.sublist(0, size); - buffer = BytesBuffer(); - buffer.add(block.sublist(size)); + if (chunk.length < size) { + yield chunk; + continue; } - } - yield buffer.toBytes(); + final blocks = chunk.length ~/ size; + + for (var i = 0; i < blocks; i++) { + yield Uint8List.sublistView(chunk, i * size, (i + 1) * size); + } + + if (blocks * size < chunk.length) { + yield Uint8List.sublistView(chunk, blocks * size); + } + } } } diff --git a/test/minio_stream_test.dart b/test/minio_stream_test.dart index fbc031c..8456f1d 100644 --- a/test/minio_stream_test.dart +++ b/test/minio_stream_test.dart @@ -1,3 +1,5 @@ +import 'dart:typed_data'; + import 'package:test/test.dart'; import 'helpers.dart'; @@ -6,7 +8,7 @@ void main() { group('MinioByteStream', () { final bucketName = uniqueName(); final objectName = 'content-length-test'; - final testData = [1, 2, 3, 4, 5]; + final testData = Uint8List.fromList([1, 2, 3, 4, 5]); setUpAll(() async { final minio = getMinioClient(); diff --git a/test/minio_test.dart b/test/minio_test.dart index c1eab81..550e42c 100644 --- a/test/minio_test.dart +++ b/test/minio_test.dart @@ -510,7 +510,7 @@ void testStatObject() { final minio = getMinioClient(); final bucketName = uniqueName(); final objectName = uniqueName(); - final data = [1, 2, 3, 4, 5]; + final data = Uint8List.fromList([1, 2, 3, 4, 5]); setUpAll(() async { await minio.makeBucket(bucketName); @@ -591,7 +591,7 @@ void testRemoveObject() { final minio = getMinioClient(); final bucketName = uniqueName(); final objectName = uniqueName(); - final data = [1, 2, 3, 4, 5]; + final data = Uint8List.fromList([1, 2, 3, 4, 5]); setUpAll(() async { await minio.makeBucket(bucketName); @@ -628,7 +628,7 @@ void testListObjects() { final minio = getMinioClient(); final bucketName = uniqueName(); final objectName = uniqueName(); - final data = [1, 2, 3, 4, 5]; + final data = Uint8List.fromList([1, 2, 3, 4, 5]); setUpAll(() async { await minio.makeBucket(bucketName); @@ -657,7 +657,7 @@ void testListObjects() { final minio = getMinioClient(); final bucket = uniqueName(); final object = 'new folder/new file.txt'; - final data = [1, 2, 3, 4, 5]; + final data = Uint8List.fromList([1, 2, 3, 4, 5]); setUpAll(() async { await minio.makeBucket(bucket); -- GitLab