diff --git a/lib/io.dart b/lib/io.dart index 2a278f3ce550844e9e7e229f79c6faf9dc983aba..39dbcd731e91302c98983afd7863bec664e748b9 100644 --- a/lib/io.dart +++ b/lib/io.dart @@ -1,4 +1,5 @@ import 'dart:io'; +import 'dart:typed_data'; import 'package:minio/src/minio.dart'; import 'package:minio/src/minio_errors.dart'; @@ -31,7 +32,7 @@ extension MinioX on Minio { return putObject( bucket, object, - file.openRead(), + file.openRead().cast<Uint8List>(), size: stat.size, metadata: metadata, ); diff --git a/lib/src/minio.dart b/lib/src/minio.dart index b3a7298233b5f4378a5053d0bed14e17d3da07eb..f5b4a5ce8ca0c2700bc167406f355b886c7f4f63 100644 --- a/lib/src/minio.dart +++ b/lib/src/minio.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:typed_data'; import 'package:minio/models.dart'; import 'package:minio/src/minio_client.dart'; @@ -911,7 +912,7 @@ class Minio { Future<String> putObject( String bucket, String object, - Stream<List<int>> data, { + Stream<Uint8List> data, { int? size, int? chunkSize, Map<String, String>? metadata, diff --git a/lib/src/minio_client.dart b/lib/src/minio_client.dart index f47be2c904af7884517aaee8757021e66120a0e7..40ca29a783bdbe22c8a8b8fd7d1de92927581e8b 100644 --- a/lib/src/minio_client.dart +++ b/lib/src/minio_client.dart @@ -163,6 +163,7 @@ class MinioClient { dynamic payload = '', Map<String, dynamic>? queries, Map<String, String>? headers, + void Function(int)? onProgress, }) async { final stream = await _request( method: method, @@ -173,6 +174,7 @@ class MinioClient { resource: resource, queries: queries, headers: headers, + onProgress: onProgress, ); final response = await MinioResponse.fromStream(stream); diff --git a/lib/src/minio_uploader.dart b/lib/src/minio_uploader.dart index d554939a6605a78939e23dc5e982ab5d98a21862..ae74afa76498ac74a3a17c8d76d96a6d80696a4d 100644 --- a/lib/src/minio_uploader.dart +++ b/lib/src/minio_uploader.dart @@ -1,5 +1,6 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:typed_data'; import 'package:convert/convert.dart'; import 'package:crypto/crypto.dart'; @@ -9,7 +10,7 @@ import 'package:minio/src/minio_client.dart'; import 'package:minio/src/minio_helpers.dart'; import 'package:minio/src/utils.dart'; -class MinioUploader implements StreamConsumer<List<int>> { +class MinioUploader implements StreamConsumer<Uint8List> { MinioUploader( this.minio, this.client, @@ -39,8 +40,11 @@ class MinioUploader implements StreamConsumer<List<int>> { String? _uploadId; + // The number of bytes uploaded of the current part. + int? bytesUploaded; + @override - Future addStream(Stream<List<int>> stream) async { + Future addStream(Stream<Uint8List> stream) async { await for (var chunk in stream) { List<int>? md5digest; final headers = <String, String>{}; @@ -53,7 +57,6 @@ class MinioUploader implements StreamConsumer<List<int>> { if (_partNumber == 1 && chunk.length < partSize) { _etag = await _uploadChunk(chunk, headers, null); - onProgress?.call(chunk.length); return; } @@ -83,7 +86,6 @@ class MinioUploader implements StreamConsumer<List<int>> { final etag = await _uploadChunk(chunk, headers, queries); final part = CompletedPart(etag, partNumber); _parts[part] = chunk.length; - _reportMultipartUploadProgress(); } } @@ -106,7 +108,7 @@ class MinioUploader implements StreamConsumer<List<int>> { } Future<String?> _uploadChunk( - List<int> chunk, + Uint8List chunk, Map<String, String> headers, Map<String, String?>? queries, ) async { @@ -116,7 +118,8 @@ class MinioUploader implements StreamConsumer<List<int>> { queries: queries, bucket: bucket, object: object, - payload: chunk, + payload: Stream.value(chunk).transform(BlockStream(1 << 16)), + onProgress: _updateProgress, ); validate(resp); @@ -146,10 +149,22 @@ class MinioUploader implements StreamConsumer<List<int>> { _oldParts = Map.fromEntries(entries); } - void _reportMultipartUploadProgress() { - if (onProgress != null) { - final bytes = _parts.values.reduce((a, b) => a + b); - onProgress!(bytes); + void _updateProgress(int bytesUploaded) { + this.bytesUploaded = bytesUploaded; + _reportUploadProgress(); + } + + void _reportUploadProgress() { + if (onProgress == null || bytesUploaded == null) { + return; + } + + var totalBytesUploaded = bytesUploaded!; + + for (var part in _parts.keys) { + totalBytesUploaded += _parts[part]!; } + + onProgress!(totalBytesUploaded); } } diff --git a/lib/src/utils.dart b/lib/src/utils.dart index 67fdde0a034b4aa6020ac22491cf68a833728dee..cba532bbc5118e56e013ecad3e3f13f57fec3913 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -1,8 +1,8 @@ import 'dart:async'; import 'dart:convert'; import 'dart:math' as math; +import 'dart:typed_data'; -import 'package:buffer/buffer.dart'; import 'package:convert/convert.dart'; import 'package:crypto/crypto.dart'; import 'package:intl/intl.dart'; @@ -63,26 +63,29 @@ String encodeQueries(Map<String, dynamic> queries) { return pairs.join('&'); } -class BlockStream extends StreamTransformerBase<List<int>, List<int>> { +class BlockStream extends StreamTransformerBase<Uint8List, Uint8List> { BlockStream(this.size); final int size; @override - Stream<List<int>> bind(Stream<List<int>> stream) async* { - var buffer = BytesBuffer(); - + Stream<Uint8List> bind(Stream<Uint8List> stream) async* { await for (var chunk in stream) { - buffer.add(chunk); - if (buffer.length >= size) { - final block = buffer.toBytes(); - yield block.sublist(0, size); - buffer = BytesBuffer(); - buffer.add(block.sublist(size)); + if (chunk.length < size) { + yield chunk; + continue; } - } - yield buffer.toBytes(); + final blocks = chunk.length ~/ size; + + for (var i = 0; i < blocks; i++) { + yield Uint8List.sublistView(chunk, i * size, (i + 1) * size); + } + + if (blocks * size < chunk.length) { + yield Uint8List.sublistView(chunk, blocks * size); + } + } } } diff --git a/test/minio_stream_test.dart b/test/minio_stream_test.dart index fbc031c2665dca77c60fe6c36502351220a0ca8b..8456f1de80d59d83977867821d503f58ba231020 100644 --- a/test/minio_stream_test.dart +++ b/test/minio_stream_test.dart @@ -1,3 +1,5 @@ +import 'dart:typed_data'; + import 'package:test/test.dart'; import 'helpers.dart'; @@ -6,7 +8,7 @@ void main() { group('MinioByteStream', () { final bucketName = uniqueName(); final objectName = 'content-length-test'; - final testData = [1, 2, 3, 4, 5]; + final testData = Uint8List.fromList([1, 2, 3, 4, 5]); setUpAll(() async { final minio = getMinioClient(); diff --git a/test/minio_test.dart b/test/minio_test.dart index c1eab81c499712fc0589b49241b4f54b92a2cdbb..550e42c4862aaf548ba57d35fb0d902c9a73de9e 100644 --- a/test/minio_test.dart +++ b/test/minio_test.dart @@ -510,7 +510,7 @@ void testStatObject() { final minio = getMinioClient(); final bucketName = uniqueName(); final objectName = uniqueName(); - final data = [1, 2, 3, 4, 5]; + final data = Uint8List.fromList([1, 2, 3, 4, 5]); setUpAll(() async { await minio.makeBucket(bucketName); @@ -591,7 +591,7 @@ void testRemoveObject() { final minio = getMinioClient(); final bucketName = uniqueName(); final objectName = uniqueName(); - final data = [1, 2, 3, 4, 5]; + final data = Uint8List.fromList([1, 2, 3, 4, 5]); setUpAll(() async { await minio.makeBucket(bucketName); @@ -628,7 +628,7 @@ void testListObjects() { final minio = getMinioClient(); final bucketName = uniqueName(); final objectName = uniqueName(); - final data = [1, 2, 3, 4, 5]; + final data = Uint8List.fromList([1, 2, 3, 4, 5]); setUpAll(() async { await minio.makeBucket(bucketName); @@ -657,7 +657,7 @@ void testListObjects() { final minio = getMinioClient(); final bucket = uniqueName(); final object = 'new folder/new file.txt'; - final data = [1, 2, 3, 4, 5]; + final data = Uint8List.fromList([1, 2, 3, 4, 5]); setUpAll(() async { await minio.makeBucket(bucket);