diff --git a/README.md b/README.md index 1a4edf53221160a2a97be48d880bbf2132945507..b2705839f13afb560b244a3d94ed6a2e3bf5f860 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ Created from templates made available by Stagehand under a BSD-style | `removeBucket` | putObject | presignedPostPolicy | getBucketPolicy | | `listObjects` | fPutObject | | setBucketPolicy | | listObjectsV2 | copyObject | | listenBucketNotification | -| listIncompleteUploads | statObject | | | +| `listIncompleteUploads` | statObject | | | | | removeObject | | | | | removeObjects | | | | | removeIncompleteUpload | | | diff --git a/lib/src/minio.dart b/lib/src/minio.dart index 890f9570b4afd8b4f56a4c6888b863dc682a29ed..1340bc4b8016ed1d57a7c42c8c4d12073b61d804 100644 --- a/lib/src/minio.dart +++ b/lib/src/minio.dart @@ -1,11 +1,10 @@ -import 'dart:collection'; - import 'package:http/http.dart'; import 'package:minio/models.dart'; import 'package:minio/src/minio_errors.dart'; import 'package:minio/src/minio_helpers.dart'; import 'package:minio/src/minio_s3.dart'; import 'package:minio/src/minio_sign.dart'; +import 'package:minio/src/utils.dart'; import 'package:xml/xml.dart' as xml; class MinioRequest extends Request { @@ -32,10 +31,11 @@ class MinioClient { String object, String region, String payload = '', + String resource, Map<String, String> queries, Map<String, String> headers, }) async { - final url = getRequestUrl(bucket, object, queries); + final url = getRequestUrl(bucket, object, resource, queries); final request = MinioRequest(method, url); final date = DateTime.now().toUtc(); final sha256sum = enableSHA256 ? sha256Hex(payload) : 'UNSIGNED-PAYLOAD'; @@ -69,6 +69,7 @@ class MinioClient { String object, String region, String payload = '', + String resource, Map<String, String> queries, Map<String, String> headers, }) async { @@ -78,6 +79,7 @@ class MinioClient { object: object, region: region, payload: payload, + resource: resource, queries: queries, headers: headers, ); @@ -94,6 +96,7 @@ class MinioClient { String object, String region, String payload = '', + String resource, Map<String, String> queries, Map<String, String> headers, }) async { @@ -103,6 +106,7 @@ class MinioClient { object: object, region: region, payload: payload, + resource: resource, queries: queries, headers: headers, ); @@ -111,7 +115,12 @@ class MinioClient { return response; } - Uri getRequestUrl(String bucket, String object, Map<String, String> query) { + Uri getRequestUrl( + String bucket, + String object, + String resource, + Map<String, String> queries, + ) { var host = minio.endPoint.toLowerCase(); var path = '/'; @@ -127,12 +136,16 @@ class MinioClient { if (object != null) path = '/${bucket}/${object}'; } + final resourcePart = resource != null ? '$resource&' : ''; + final queryPart = encodeQueries(queries); + final query = resourcePart + queryPart; + return Uri( scheme: minio.useSSL ? 'https' : 'http', host: host, port: minio.port, pathSegments: path.split('/'), - queryParameters: query, + query: query, ); } @@ -242,6 +255,38 @@ class Minio { return etag; } + Future<String> findUploadID(String bucket, String object) async { + MinioInvalidBucketNameError.check(bucket); + MinioInvalidObjectNameError.check(object); + + MultipartUpload latestUpload; + String keyMarker; + String uploadIdMarker; + var isTruncated = false; + + do { + final result = await listIncompleteUploadsQuery( + bucket, + object, + keyMarker, + uploadIdMarker, + '', + ); + for (var upload in result.uploads) { + if (upload.key != object) continue; + if (latestUpload == null || + upload.initiated.isAfter(latestUpload.initiated)) { + latestUpload = upload; + } + } + keyMarker = result.nextKeyMarker; + uploadIdMarker = result.nextUploadIdMarker; + isTruncated = result.isTruncated; + } while (isTruncated); + + return latestUpload?.uploadId; + } + Future<String> getBucketRegion(String bucket) async { MinioInvalidBucketNameError.check(bucket); @@ -309,6 +354,96 @@ class Minio { return resp.stream; } + Future<String> initiateNewMultipartUpload( + String bucket, + String object, + Map<String, String> metaData, + ) async { + MinioInvalidBucketNameError.check(bucket); + MinioInvalidObjectNameError.check(object); + + final resp = await _client.request( + method: 'POST', + bucket: bucket, + object: object, + headers: metaData, + resource: 'uploads'); + + validate(resp, expect: 200); + + final node = xml.parse(resp.body); + return node.findAllElements('UploadId').first.text; + } + + Stream<IncompleteUpload> listIncompleteUploads( + String bucket, + String prefix, [ + bool recursive = false, + ]) async* { + MinioInvalidBucketNameError.check(bucket); + MinioInvalidPrefixError.check(prefix); + + final delimiter = recursive ? '' : '/'; + + String keyMarker; + String uploadIdMarker; + var isTruncated = false; + + do { + final result = await listIncompleteUploadsQuery( + bucket, + prefix, + keyMarker, + uploadIdMarker, + delimiter, + ); + for (var upload in result.uploads) { + final parts = await listParts(bucket, upload.key, upload.uploadId); + final size = await parts.fold(0, (acc, item) => acc + item.size); + yield IncompleteUpload(upload: upload, size: size); + } + keyMarker = result.nextKeyMarker; + uploadIdMarker = result.nextUploadIdMarker; + isTruncated = result.isTruncated; + } while (isTruncated); + } + + Future<ListMultipartUploadsOutput> listIncompleteUploadsQuery( + String bucket, + String prefix, + String keyMarker, + String uploadIdMarker, + String delimiter, + ) async { + MinioInvalidBucketNameError.check(bucket); + MinioInvalidPrefixError.check(prefix); + + var queries = { + 'uploads': null, + 'prefix': prefix, + 'delimiter': delimiter, + }; + + if (keyMarker != null) { + queries['key-marker'] = keyMarker; + } + if (uploadIdMarker != null) { + queries['upload-id-marker'] = uploadIdMarker; + } + + final resp = await _client.request( + method: 'GET', + bucket: bucket, + resource: 'uploads', + queries: queries, + ); + + validate(resp); + + final node = xml.parse(resp.body); + return ListMultipartUploadsOutput.fromXml(node.root); + } + Future<List<Bucket>> listBuckets() async { final resp = await _client.request( method: 'GET', @@ -326,6 +461,7 @@ class Minio { bool recursive = false, }) async* { MinioInvalidBucketNameError.check(bucket); + MinioInvalidPrefixError.check(prefix); final delimiter = recursive ? '' : '/'; var marker = ''; @@ -355,6 +491,7 @@ class Minio { int maxKeys, ) async { MinioInvalidBucketNameError.check(bucket); + MinioInvalidPrefixError.check(prefix); final queries = <String, String>{}; queries['prefix'] = prefix; @@ -392,6 +529,49 @@ class Minio { ..nextMarker = nextMarker; } + Stream<Part> listParts( + String bucket, + String object, + String uploadId, + ) async* { + MinioInvalidBucketNameError.check(bucket); + MinioInvalidObjectNameError.check(object); + + var marker = 0; + var isTruncated = false; + do { + final result = await listPartsQuery(bucket, object, uploadId, marker); + marker = result.nextPartNumberMarker; + isTruncated = result.isTruncated; + yield* Stream.fromIterable(result.parts); + } while (isTruncated); + } + + Future<ListPartsOutput> listPartsQuery( + String bucket, + String object, + String uploadId, + int marker, + ) async { + var queries = <String, String>{'uploadId': uploadId}; + + if (marker != null && marker != 0) { + queries['part-number-marker'] = marker.toString(); + } + + final resp = await _client.request( + method: 'GET', + bucket: bucket, + object: object, + queries: queries, + ); + + validate(resp); + + final node = xml.parse(resp.body); + return ListPartsOutput.fromXml(node.root); + } + Future<void> makeBucket(String bucket, [String region]) async { MinioInvalidBucketNameError.check(bucket); if (this.region != null && region != null && this.region != region) { @@ -418,12 +598,18 @@ class Minio { Future putObject( String bucket, String object, - Stream data, + Stream<List<int>> data, int size, { Map<String, String> metadata, }) async { MinioInvalidBucketNameError.check(bucket); MinioInvalidObjectNameError.check(object); + + assert(data != null); + assert(size != null && size >= 0); + + metadata = prependXAMZMeta(metadata ?? {}); + // Stream. } Future<void> removeBucket(String bucket) async { diff --git a/lib/src/minio_errors.dart b/lib/src/minio_errors.dart index 075675d627eb88fa501e452ad061b39dea2d62f1..e46592fc7957511962aaa009eb1a5ea08eef149b 100644 --- a/lib/src/minio_errors.dart +++ b/lib/src/minio_errors.dart @@ -65,6 +65,11 @@ class MinioInvalidDateError extends MinioError { class MinioInvalidPrefixError extends MinioError { MinioInvalidPrefixError(String message) : super(message); + + static void check(String prefix) { + if (isValidPrefix(prefix)) return; + throw MinioInvalidPrefixError('Invalid prefix: $prefix'); + } } class MinioInvalidBucketPolicyError extends MinioError { diff --git a/lib/src/minio_helpers.dart b/lib/src/minio_helpers.dart index 952b0f917853993cd58c138769f6d25b7aead415..eb8b108dac1207d1cbd82bf59c5d8a7cf03ba30d 100644 --- a/lib/src/minio_helpers.dart +++ b/lib/src/minio_helpers.dart @@ -1,9 +1,3 @@ -import 'dart:convert'; - -import 'package:convert/convert.dart'; -import 'package:crypto/crypto.dart'; -import 'package:xml/xml.dart'; - bool isValidBucketName(String bucket) { if (bucket == null) return false; @@ -124,18 +118,9 @@ String makeDateShort(DateTime date) { isoDate.substring(8, 10); } -String sha256Hex(String data) { - return hex.encode(sha256.convert(utf8.encode(data)).bytes); -} - -XmlElement getNodeProp(XmlElement xml, String name) { - final result = xml.findElements(name); - return result.isNotEmpty ? result.first : null; -} - -Map<String, String> prependXAMZMeta(Map<String, String> metaData) { - final newMetadata = Map<String, String>.from(metaData); - for (var key in metaData.keys) { +Map<String, String> prependXAMZMeta(Map<String, String> metadata) { + final newMetadata = Map<String, String>.from(metadata); + for (var key in metadata.keys) { if (!isAmzHeader(key) && !isSupportedHeader(key) && !isStorageclassHeader(key)) { diff --git a/lib/src/minio_models.dart b/lib/src/minio_models.dart index 0c71150c358511330cfae0af3ac91ed520b66b10..4b2957e09ab919cf089de5eb15075d68bbd49f02 100644 --- a/lib/src/minio_models.dart +++ b/lib/src/minio_models.dart @@ -29,3 +29,43 @@ class CompleteMultipartUpload { /// Array of CompletedPart data types. List<CompletedPart> parts; } + +class ListMultipartUploadsOutput { + ListMultipartUploadsOutput.fromXml(XmlElement xml) { + isTruncated = getProp(xml, 'IsLatest')?.text?.toUpperCase() == 'TRUE'; + nextKeyMarker = getProp(xml, 'NextKeyMarker')?.text; + nextUploadIdMarker = getProp(xml, 'NextUploadIdMarker')?.text; + uploads = xml + .findElements('Upload') + .map((e) => MultipartUpload.fromXml(e)) + .toList(); + } + + bool isTruncated; + String nextKeyMarker; + String nextUploadIdMarker; + List<MultipartUpload> uploads; +} + +class ListPartsOutput { + ListPartsOutput.fromXml(XmlElement xml) { + isTruncated = getProp(xml, 'IsLatest')?.text?.toUpperCase() == 'TRUE'; + nextPartNumberMarker = + int.parse(getProp(xml, 'NextPartNumberMarker')?.text); + parts = xml.findElements('Upload').map((e) => Part.fromXml(e)).toList(); + } + + bool isTruncated; + int nextPartNumberMarker; + List<Part> parts; +} + +class IncompleteUpload { + IncompleteUpload({ + this.upload, + this.size, + }); + + final MultipartUpload upload; + final int size; +} diff --git a/lib/src/minio_sign.dart b/lib/src/minio_sign.dart index fa240f4321451780e06f0803ee09a949e92430b9..2915ed6d6c6dddf90d759043c7fa717564563271 100644 --- a/lib/src/minio_sign.dart +++ b/lib/src/minio_sign.dart @@ -2,6 +2,7 @@ import 'package:convert/convert.dart'; import 'package:crypto/crypto.dart'; import 'package:minio/minio.dart'; import 'package:minio/src/minio_helpers.dart'; +import 'package:minio/src/utils.dart'; const signV4Algorithm = 'AWS4-HMAC-SHA256'; diff --git a/lib/src/minio_uploader.dart b/lib/src/minio_uploader.dart new file mode 100644 index 0000000000000000000000000000000000000000..0c87f9a172cf726c2d9f0621151bcd93b83967cb --- /dev/null +++ b/lib/src/minio_uploader.dart @@ -0,0 +1,37 @@ +import 'dart:async'; + +import 'package:minio/minio.dart'; + +class MinioUploader implements StreamConsumer<List<int>> { + MinioUploader( + this.client, + this.bucket, + this.object, + this.partSize, + this.metaData, + ); + + final Minio client; + final String bucket; + final String object; + final int partSize; + final Map<String, String> metaData; + + var emptyStream = true; + var partNumber = 1; + var etags = []; + List oldParts; + String id; + + @override + Future addStream(Stream<List<int>> stream) { + // TODO: implement addStream + throw UnimplementedError(); + } + + @override + Future close() { + // TODO: implement close + throw UnimplementedError(); + } +} diff --git a/lib/src/utils.dart b/lib/src/utils.dart new file mode 100644 index 0000000000000000000000000000000000000000..36c8f48278211208fe21c6ea60b8693a132b3332 --- /dev/null +++ b/lib/src/utils.dart @@ -0,0 +1,31 @@ +import 'dart:convert'; + +import 'package:convert/convert.dart'; +import 'package:crypto/crypto.dart'; +import 'package:xml/xml.dart'; + +String sha256Hex(String data) { + return hex.encode(sha256.convert(utf8.encode(data)).bytes); +} + +XmlElement getNodeProp(XmlElement xml, String name) { + final result = xml.findElements(name); + return result.isNotEmpty ? result.first : null; +} + +String encodeQuery(String rawKey, String rawValue) { + final pair = [rawKey]; + if (rawValue != null) { + pair.add(Uri.encodeQueryComponent(rawValue)); + } + return pair.join('='); +} + +String encodeQueries(Map<String, String> queries) { + final pairs = <String>[]; + for (var key in queries.keys) { + final value = queries[key]; + pairs.add(encodeQuery(key, value)); + } + return pairs.join('='); +}