From 48329965f8732c51f27248fbbd181215ef41b95c Mon Sep 17 00:00:00 2001 From: xuty <xty50337@hotmail.com> Date: Fri, 27 Mar 2020 21:41:15 +0800 Subject: [PATCH] complete putObject --- lib/src/minio.dart | 70 ++++++++++++++++--- lib/src/minio_uploader.dart | 131 ++++++++++++++++++++++++++++++++---- lib/src/utils.dart | 28 ++++++++ pubspec.yaml | 1 + 4 files changed, 208 insertions(+), 22 deletions(-) diff --git a/lib/src/minio.dart b/lib/src/minio.dart index 1340bc4..752f456 100644 --- a/lib/src/minio.dart +++ b/lib/src/minio.dart @@ -1,14 +1,34 @@ +import 'dart:convert'; + import 'package:http/http.dart'; import 'package:minio/models.dart'; import 'package:minio/src/minio_errors.dart'; import 'package:minio/src/minio_helpers.dart'; import 'package:minio/src/minio_s3.dart'; import 'package:minio/src/minio_sign.dart'; +import 'package:minio/src/minio_uploader.dart'; import 'package:minio/src/utils.dart'; import 'package:xml/xml.dart' as xml; -class MinioRequest extends Request { +class MinioRequest extends BaseRequest { MinioRequest(String method, Uri url) : super(method, url); + + dynamic body; + + @override + ByteStream finalize() { + super.finalize(); + if (body is String) { + return ByteStream.fromBytes(utf8.encode(body)); + } + if (body is List<int>) { + return ByteStream.fromBytes(body); + } + if (body is Stream<List<int>>) { + return ByteStream(body); + } + throw UnsupportedError('unsupported body type: ${body.runtimeType}'); + } } class MinioClient { @@ -30,8 +50,8 @@ class MinioClient { String bucket, String object, String region, - String payload = '', String resource, + dynamic payload = '', Map<String, String> queries, Map<String, String> headers, }) async { @@ -68,8 +88,8 @@ class MinioClient { String bucket, String object, String region, - String payload = '', String resource, + dynamic payload = '', Map<String, String> queries, Map<String, String> headers, }) async { @@ -95,8 +115,8 @@ class MinioClient { String bucket, String object, String region, - String payload = '', String resource, + dynamic payload = '', Map<String, String> queries, Map<String, String> headers, }) async { @@ -200,6 +220,10 @@ class Minio { _client = MinioClient(this); } + final partSize = 64 * 1024 * 1024; + final maximumPartSize = 5 * 1024 * 1024 * 1024; + final maxObjectSize = 5 * 1024 * 1024 * 1024 * 1024; + final String endPoint; final int port; final bool useSSL; @@ -220,10 +244,30 @@ class Minio { return resp.statusCode == 200; } + int calculatePartSize(int size) { + assert(size != null && size >= 0); + + if (size > maxObjectSize) { + throw ArgumentError('size should not be more than $maxObjectSize'); + } + + if (this.partSize != null) { + return this.partSize; + } + + var partSize = this.partSize; + while (true) { + if ((partSize * 10000) > size) { + return partSize; + } + partSize += 16 * 1024 * 1024; + } + } + Future<String> completeMultipartUpload( String bucket, String object, - int uploadId, + String uploadId, List<CompletedPart> parts, ) async { MinioInvalidBucketNameError.check(bucket); @@ -232,7 +276,7 @@ class Minio { assert(uploadId != null); assert(parts != null); - var queries = {'uploadId': 'uploadId'}; + var queries = {'uploadId': uploadId}; var payload = CompleteMultipartUpload(parts).toXml().toString(); final resp = await _client.request( @@ -595,21 +639,29 @@ class Minio { return resp.body; } - Future putObject( + Future<String> putObject( String bucket, String object, Stream<List<int>> data, int size, { Map<String, String> metadata, - }) async { + }) { MinioInvalidBucketNameError.check(bucket); MinioInvalidObjectNameError.check(object); assert(data != null); - assert(size != null && size >= 0); + assert(size >= 0 || size == null); metadata = prependXAMZMeta(metadata ?? {}); // Stream. + + size ??= maxObjectSize; + size = calculatePartSize(size); + + final chunker = BlockStream(size); + final uploader = + MinioUploader(this, _client, bucket, object, size, metadata); + return data.transform(chunker).pipe(uploader); } Future<void> removeBucket(String bucket) async { diff --git a/lib/src/minio_uploader.dart b/lib/src/minio_uploader.dart index 0c87f9a..38d0e85 100644 --- a/lib/src/minio_uploader.dart +++ b/lib/src/minio_uploader.dart @@ -1,37 +1,142 @@ import 'dart:async'; +import 'dart:convert'; +import 'package:convert/convert.dart'; +import 'package:crypto/crypto.dart'; import 'package:minio/minio.dart'; +import 'package:minio/models.dart'; class MinioUploader implements StreamConsumer<List<int>> { MinioUploader( + this.minio, this.client, this.bucket, this.object, this.partSize, - this.metaData, + this.metadata, ); - final Minio client; + final Minio minio; + final MinioClient client; final String bucket; final String object; final int partSize; - final Map<String, String> metaData; + final Map<String, String> metadata; - var emptyStream = true; var partNumber = 1; - var etags = []; - List oldParts; - String id; + String etag; + List<CompletedPart> parts = []; + Map<int, Part> oldParts; + String uploadId; @override - Future addStream(Stream<List<int>> stream) { - // TODO: implement addStream - throw UnimplementedError(); + Future addStream(Stream<List<int>> stream) async { + await for (var chunk in stream) { + List<int> md5digest; + final headers = <String, String>{}; + headers.addAll(metadata); + headers['Content-Length'] = chunk.length.toString(); + if (!client.enableSHA256) { + md5digest = md5.convert(chunk).bytes; + headers['Content-MD5'] = base64.encode(md5digest); + } + + if (this.partNumber == 1 && chunk.length < partSize) { + return uploadInOneGo(chunk, headers); + } + + if (uploadId == null) { + await initMultipartUpload(); + } + + final partNumber = this.partNumber++; + + if (oldParts != null) { + final oldPart = oldParts[partNumber]; + if (oldPart != null) { + md5digest ??= md5.convert(chunk).bytes; + if (hex.encode(md5digest) == oldPart.eTag) { + final part = CompletedPart(oldPart.eTag, partNumber); + parts.add(part); + continue; + } + } + } + + final queries = { + 'partNumber': partNumber, + 'uploadId': uploadId, + }; + + final resp = await client.request( + method: 'PUT', + queries: queries, + headers: headers, + bucket: bucket, + object: object, + ); + + validate(resp); + + var etag = resp.headers['etag']; + if (etag != null) { + etag = etag.replaceAll(RegExp('^"'), '').replaceAll(RegExp(r'"$'), ''); + } + final part = CompletedPart(etag, partNumber); + parts.add(part); + } } @override - Future close() { - // TODO: implement close - throw UnimplementedError(); + Future<String> close() async { + if (uploadId == null) { + return etag; + } + + return minio.completeMultipartUpload(bucket, object, uploadId, parts); + } + + Map<String, String> getHeaders(List<int> chunk) { + final headers = <String, String>{}; + headers.addAll(metadata); + headers['Content-Length'] = chunk.length.toString(); + if (!client.enableSHA256) { + final md5digest = md5.convert(chunk).bytes; + headers['Content-MD5'] = base64.encode(md5digest); + } + return headers; + } + + Future<void> uploadInOneGo( + List<int> chunk, Map<String, String> headers) async { + final resp = await client.request( + method: 'PUT', + headers: headers, + bucket: bucket, + object: object, + payload: chunk, + ); + + validate(resp); + + etag = resp.headers['etag']; + if (etag != null) { + etag = etag.replaceAll(RegExp('^"'), '').replaceAll(RegExp(r'"$'), ''); + } + } + + Future<void> initMultipartUpload() async { + uploadId = await minio.findUploadID(bucket, object); + + if (uploadId == null) { + await minio.initiateNewMultipartUpload(bucket, object, metadata); + return; + } + + final parts = await minio.listParts(bucket, object, uploadId); + final entries = await parts + .asyncMap((part) => MapEntry(part.partNumber, part)) + .toList(); + oldParts = Map.fromEntries(entries); } } diff --git a/lib/src/utils.dart b/lib/src/utils.dart index 36c8f48..36ecc57 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -1,5 +1,8 @@ +import 'dart:async'; import 'dart:convert'; +import 'dart:typed_data'; +import 'package:buffer/buffer.dart'; import 'package:convert/convert.dart'; import 'package:crypto/crypto.dart'; import 'package:xml/xml.dart'; @@ -29,3 +32,28 @@ String encodeQueries(Map<String, String> queries) { } return pairs.join('='); } + +class BlockStream extends StreamTransformerBase<List<int>, List<int>> { + BlockStream(this.size); + + final int size; + + @override + Stream<List<int>> bind(Stream<List<int>> stream) async* { + var buffer = BytesBuffer(); + + await for (var chunk in stream) { + buffer.add(chunk); + if (buffer.length >= size) { + final block = buffer.toBytes(); + yield block.sublist(0, size); + buffer = BytesBuffer(); + buffer.add(block.sublist(size)); + } + } + + if (buffer.length != 0) { + yield buffer.toBytes(); + } + } +} diff --git a/pubspec.yaml b/pubspec.yaml index dd14cf3..d42d6fc 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -13,6 +13,7 @@ dependencies: crypto: ^2.1.4 convert: ^2.1.1 xml: ^3.7.0 + buffer: ^1.0.6 # path: ^1.6.0 dev_dependencies: -- GitLab