diff --git a/lib/src/minio.dart b/lib/src/minio.dart index 5884ace9bc87b13326fb7ae106c73238739acce2..b3a7298233b5f4378a5053d0bed14e17d3da07eb 100644 --- a/lib/src/minio.dart +++ b/lib/src/minio.dart @@ -277,7 +277,7 @@ class Minio { return json.decode(resp.body); } - /// gets the region of the bucket + /// Gets the region of [bucket]. The region is cached for subsequent calls. Future<String> getBucketRegion(String bucket) async { MinioInvalidBucketNameError.check(bucket); @@ -841,7 +841,7 @@ class Minio { postPolicy.formData['x-amz-signature'] = signature; final url = _client .getBaseRequest('POST', postPolicy.formData['bucket'], null, region, - null, null, null) + null, null, null, null) .url; var portStr = (port == 80 || port == 443) ? '' : ':$port'; var urlStr = '${url.scheme}://${url.host}$portStr${url.path}'; @@ -902,29 +902,36 @@ class Minio { resource, reqParams, {}, + null, ); return presignSignatureV4(this, request, region, requestDate, expires); } - /// Uploads the object. + /// Uploads the object. Returns the ETag of the uploaded object. Future<String> putObject( String bucket, String object, Stream<List<int>> data, { int? size, + int? chunkSize, Map<String, String>? metadata, + void Function(int)? onProgress, }) async { MinioInvalidBucketNameError.check(bucket); MinioInvalidObjectNameError.check(object); - assert(size == null || size >= 0); + if (size != null && size < 0) { + throw MinioInvalidArgumentError('invalid size value: $size'); + } + + if (chunkSize != null && chunkSize < 5 * 1024 * 1024) { + throw MinioInvalidArgumentError('Minimum chunk size is 5MB'); + } metadata = prependXAMZMeta(metadata ?? <String, String>{}); - size ??= maxObjectSize; - final partSize = _calculatePartSize(size); + final partSize = chunkSize ?? _calculatePartSize(size ?? maxObjectSize); - final chunker = BlockStream(partSize); final uploader = MinioUploader( this, _client, @@ -932,7 +939,9 @@ class Minio { object, partSize, metadata, + onProgress, ); + final chunker = BlockStream(partSize); final etag = await data.transform(chunker).pipe(uploader); return etag.toString(); } diff --git a/lib/src/minio_client.dart b/lib/src/minio_client.dart index 4d70696338d59e20eb859f18e0a7d1d391dd82cd..f47be2c904af7884517aaee8757021e66120a0e7 100644 --- a/lib/src/minio_client.dart +++ b/lib/src/minio_client.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; @@ -9,26 +10,48 @@ import 'package:minio/src/minio_sign.dart'; import 'package:minio/src/utils.dart'; class MinioRequest extends BaseRequest { - MinioRequest(String method, Uri url) : super(method, url); + MinioRequest(String method, Uri url, {this.onProgress}) : super(method, url); dynamic body; + final void Function(int)? onProgress; + @override ByteStream finalize() { super.finalize(); + + late final ByteStream byteStream; + if (body is String) { final data = utf8.encode(body); headers['content-length'] = data.length.toString(); - return ByteStream.fromBytes(utf8.encode(body)); - } - if (body is List<int>) { + byteStream = ByteStream.fromBytes(utf8.encode(body)); + } else if (body is List<int>) { headers['content-length'] = body.length.toString(); - return ByteStream.fromBytes(body); + byteStream = ByteStream.fromBytes(body); + } else if (body is Stream<List<int>>) { + byteStream = ByteStream(body); + } else { + throw UnsupportedError('unsupported body type: ${body.runtimeType}'); } - if (body is Stream<List<int>>) { - return ByteStream(body); + + if (onProgress == null) { + return byteStream; } - throw UnsupportedError('unsupported body type: ${body.runtimeType}'); + + var bytesRead = 0; + + return ByteStream( + byteStream.transform( + StreamTransformer.fromHandlers( + handleData: (data, sink) { + sink.add(data); + bytesRead += data.length; + onProgress!(bytesRead); + }, + ), + ), + ); } MinioRequest replace({ @@ -103,6 +126,7 @@ class MinioClient { dynamic payload = '', Map<String, dynamic>? queries, Map<String, String>? headers, + void Function(int)? onProgress, }) async { if (bucket != null) { region ??= await minio.getBucketRegion(bucket); @@ -111,7 +135,7 @@ class MinioClient { region ??= 'us-east-1'; final request = getBaseRequest( - method, bucket, object, region, resource, queries, headers); + method, bucket, object, region, resource, queries, headers, onProgress); request.body = payload; final date = DateTime.now().toUtc(); @@ -190,9 +214,10 @@ class MinioClient { String? resource, Map<String, dynamic>? queries, Map<String, String>? headers, + void Function(int)? onProgress, ) { final url = getRequestUrl(bucket, object, resource, queries); - final request = MinioRequest(method, url); + final request = MinioRequest(method, url, onProgress: onProgress); request.headers['host'] = url.authority; if (headers != null) { diff --git a/lib/src/minio_uploader.dart b/lib/src/minio_uploader.dart index 66a1c4285aee812b8422f54ae3186f7d68db8bf1..d554939a6605a78939e23dc5e982ab5d98a21862 100644 --- a/lib/src/minio_uploader.dart +++ b/lib/src/minio_uploader.dart @@ -17,6 +17,7 @@ class MinioUploader implements StreamConsumer<List<int>> { this.object, this.partSize, this.metadata, + this.onProgress, ); final Minio minio; @@ -25,12 +26,18 @@ class MinioUploader implements StreamConsumer<List<int>> { final String object; final int partSize; final Map<String, String> metadata; + final void Function(int)? onProgress; - var partNumber = 1; - String? etag; - List<CompletedPart> parts = []; - Map<int?, Part>? oldParts; - String? uploadId; + var _partNumber = 1; + + String? _etag; + + // Complete object upload, value is the length of the part. + final _parts = <CompletedPart, int>{}; + + Map<int?, Part>? _oldParts; + + String? _uploadId; @override Future addStream(Stream<List<int>> stream) async { @@ -44,24 +51,25 @@ class MinioUploader implements StreamConsumer<List<int>> { headers['Content-MD5'] = base64.encode(md5digest); } - if (this.partNumber == 1 && chunk.length < partSize) { - this.etag = await upload(chunk, headers, null); + if (_partNumber == 1 && chunk.length < partSize) { + _etag = await _uploadChunk(chunk, headers, null); + onProgress?.call(chunk.length); return; } - if (uploadId == null) { - await initMultipartUpload(); + if (_uploadId == null) { + await _initMultipartUpload(); } - final partNumber = this.partNumber++; + final partNumber = _partNumber++; - if (oldParts != null) { - final oldPart = oldParts![partNumber]; + if (_oldParts != null) { + final oldPart = _oldParts![partNumber]; if (oldPart != null) { md5digest ??= md5.convert(chunk).bytes; if (hex.encode(md5digest) == oldPart.eTag) { final part = CompletedPart(oldPart.eTag, partNumber); - parts.add(part); + _parts[part] = oldPart.size!; continue; } } @@ -69,19 +77,21 @@ class MinioUploader implements StreamConsumer<List<int>> { final queries = <String, String?>{ 'partNumber': '$partNumber', - 'uploadId': uploadId, + 'uploadId': _uploadId, }; - final etag = await upload(chunk, headers, queries); + final etag = await _uploadChunk(chunk, headers, queries); final part = CompletedPart(etag, partNumber); - parts.add(part); + _parts[part] = chunk.length; + _reportMultipartUploadProgress(); } } @override Future<String?> close() async { - if (uploadId == null) return etag; - return minio.completeMultipartUpload(bucket, object, uploadId!, parts); + if (_uploadId == null) return _etag; + return minio.completeMultipartUpload( + bucket, object, _uploadId!, _parts.keys.toList()); } Map<String, String> getHeaders(List<int> chunk) { @@ -95,7 +105,7 @@ class MinioUploader implements StreamConsumer<List<int>> { return headers; } - Future<String?> upload( + Future<String?> _uploadChunk( List<int> chunk, Map<String, String> headers, Map<String, String?>? queries, @@ -112,29 +122,34 @@ class MinioUploader implements StreamConsumer<List<int>> { validate(resp); var etag = resp.headers['etag']; - if (etag != null) { - etag = trimDoubleQuote(etag); - } + if (etag != null) etag = trimDoubleQuote(etag); return etag; } - Future<void> initMultipartUpload() async { + Future<void> _initMultipartUpload() async { //FIXME: this code still causes Signature Error //FIXME: https://github.com/xtyxtyx/minio-dart/issues/7 //TODO: uncomment when fixed // uploadId = await minio.findUploadId(bucket, object); - if (uploadId == null) { - uploadId = + if (_uploadId == null) { + _uploadId = await minio.initiateNewMultipartUpload(bucket, object, metadata); return; } - final parts = minio.listParts(bucket, object, uploadId!); + final parts = minio.listParts(bucket, object, _uploadId!); final entries = await parts .asyncMap((part) => MapEntry(part.partNumber, part)) .toList(); - oldParts = Map.fromEntries(entries); + _oldParts = Map.fromEntries(entries); + } + + void _reportMultipartUploadProgress() { + if (onProgress != null) { + final bytes = _parts.values.reduce((a, b) => a + b); + onProgress!(bytes); + } } } diff --git a/test/minio_test.dart b/test/minio_test.dart index 0cde5e72dc97b5e827e626f4de963672b346c63c..c1eab81c499712fc0589b49241b4f54b92a2cdbb 100644 --- a/test/minio_test.dart +++ b/test/minio_test.dart @@ -388,6 +388,19 @@ void testPutObject() { expect(stat.size, equals(objectData.length)); await minio.removeObject(bucketName, objectName); }); + + test('progress report works', () async { + final objectName = uniqueName(); + int? progress; + await minio.putObject( + bucketName, + objectName, + Stream.value(objectData), + onProgress: (bytes) => progress = bytes, + ); + expect(progress, equals(objectData.length)); + await minio.removeObject(bucketName, objectName); + }); }); }