From 512975ed4658cd1e1fab8af929a767a8a65b93e5 Mon Sep 17 00:00:00 2001
From: xuty <xty50337@hotmail.com>
Date: Fri, 27 Mar 2020 19:06:03 +0800
Subject: [PATCH] complete listIncompleteUploads

---
 README.md                   |   2 +-
 lib/src/minio.dart          | 198 ++++++++++++++++++++++++++++++++++--
 lib/src/minio_errors.dart   |   5 +
 lib/src/minio_helpers.dart  |  21 +---
 lib/src/minio_models.dart   |  40 ++++++++
 lib/src/minio_sign.dart     |   1 +
 lib/src/minio_uploader.dart |  37 +++++++
 lib/src/utils.dart          |  31 ++++++
 8 files changed, 310 insertions(+), 25 deletions(-)
 create mode 100644 lib/src/minio_uploader.dart
 create mode 100644 lib/src/utils.dart

diff --git a/README.md b/README.md
index 1a4edf5..b270583 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ Created from templates made available by Stagehand under a BSD-style
 | `removeBucket`         	| putObject              	| presignedPostPolicy  	| getBucketPolicy                         	|
 | `listObjects`          	| fPutObject             	|                      	| setBucketPolicy                         	|
 | listObjectsV2         	| copyObject             	|                      	| listenBucketNotification                	|
-| listIncompleteUploads 	| statObject             	|                      	|                                         	|
+| `listIncompleteUploads`	| statObject             	|                      	|                                         	|
 |                       	| removeObject           	|                      	|                                         	|
 |                       	| removeObjects          	|                      	|                                         	|
 |                       	| removeIncompleteUpload 	|                      	|                                         	|
diff --git a/lib/src/minio.dart b/lib/src/minio.dart
index 890f957..1340bc4 100644
--- a/lib/src/minio.dart
+++ b/lib/src/minio.dart
@@ -1,11 +1,10 @@
-import 'dart:collection';
-
 import 'package:http/http.dart';
 import 'package:minio/models.dart';
 import 'package:minio/src/minio_errors.dart';
 import 'package:minio/src/minio_helpers.dart';
 import 'package:minio/src/minio_s3.dart';
 import 'package:minio/src/minio_sign.dart';
+import 'package:minio/src/utils.dart';
 import 'package:xml/xml.dart' as xml;
 
 class MinioRequest extends Request {
@@ -32,10 +31,11 @@ class MinioClient {
     String object,
     String region,
     String payload = '',
+    String resource,
     Map<String, String> queries,
     Map<String, String> headers,
   }) async {
-    final url = getRequestUrl(bucket, object, queries);
+    final url = getRequestUrl(bucket, object, resource, queries);
     final request = MinioRequest(method, url);
     final date = DateTime.now().toUtc();
     final sha256sum = enableSHA256 ? sha256Hex(payload) : 'UNSIGNED-PAYLOAD';
@@ -69,6 +69,7 @@ class MinioClient {
     String object,
     String region,
     String payload = '',
+    String resource,
     Map<String, String> queries,
     Map<String, String> headers,
   }) async {
@@ -78,6 +79,7 @@ class MinioClient {
       object: object,
       region: region,
       payload: payload,
+      resource: resource,
       queries: queries,
       headers: headers,
     );
@@ -94,6 +96,7 @@ class MinioClient {
     String object,
     String region,
     String payload = '',
+    String resource,
     Map<String, String> queries,
     Map<String, String> headers,
   }) async {
@@ -103,6 +106,7 @@ class MinioClient {
       object: object,
       region: region,
       payload: payload,
+      resource: resource,
       queries: queries,
       headers: headers,
     );
@@ -111,7 +115,12 @@ class MinioClient {
     return response;
   }
 
-  Uri getRequestUrl(String bucket, String object, Map<String, String> query) {
+  Uri getRequestUrl(
+    String bucket,
+    String object,
+    String resource,
+    Map<String, String> queries,
+  ) {
     var host = minio.endPoint.toLowerCase();
     var path = '/';
 
@@ -127,12 +136,16 @@ class MinioClient {
       if (object != null) path = '/${bucket}/${object}';
     }
 
+    final resourcePart = resource != null ? '$resource&' : '';
+    final queryPart = encodeQueries(queries);
+    final query = resourcePart + queryPart;
+
     return Uri(
       scheme: minio.useSSL ? 'https' : 'http',
       host: host,
       port: minio.port,
       pathSegments: path.split('/'),
-      queryParameters: query,
+      query: query,
     );
   }
 
@@ -242,6 +255,38 @@ class Minio {
     return etag;
   }
 
+  Future<String> findUploadID(String bucket, String object) async {
+    MinioInvalidBucketNameError.check(bucket);
+    MinioInvalidObjectNameError.check(object);
+
+    MultipartUpload latestUpload;
+    String keyMarker;
+    String uploadIdMarker;
+    var isTruncated = false;
+
+    do {
+      final result = await listIncompleteUploadsQuery(
+        bucket,
+        object,
+        keyMarker,
+        uploadIdMarker,
+        '',
+      );
+      for (var upload in result.uploads) {
+        if (upload.key != object) continue;
+        if (latestUpload == null ||
+            upload.initiated.isAfter(latestUpload.initiated)) {
+          latestUpload = upload;
+        }
+      }
+      keyMarker = result.nextKeyMarker;
+      uploadIdMarker = result.nextUploadIdMarker;
+      isTruncated = result.isTruncated;
+    } while (isTruncated);
+
+    return latestUpload?.uploadId;
+  }
+
   Future<String> getBucketRegion(String bucket) async {
     MinioInvalidBucketNameError.check(bucket);
 
@@ -309,6 +354,96 @@ class Minio {
     return resp.stream;
   }
 
+  Future<String> initiateNewMultipartUpload(
+    String bucket,
+    String object,
+    Map<String, String> metaData,
+  ) async {
+    MinioInvalidBucketNameError.check(bucket);
+    MinioInvalidObjectNameError.check(object);
+
+    final resp = await _client.request(
+        method: 'POST',
+        bucket: bucket,
+        object: object,
+        headers: metaData,
+        resource: 'uploads');
+
+    validate(resp, expect: 200);
+
+    final node = xml.parse(resp.body);
+    return node.findAllElements('UploadId').first.text;
+  }
+
+  Stream<IncompleteUpload> listIncompleteUploads(
+    String bucket,
+    String prefix, [
+    bool recursive = false,
+  ]) async* {
+    MinioInvalidBucketNameError.check(bucket);
+    MinioInvalidPrefixError.check(prefix);
+
+    final delimiter = recursive ? '' : '/';
+
+    String keyMarker;
+    String uploadIdMarker;
+    var isTruncated = false;
+
+    do {
+      final result = await listIncompleteUploadsQuery(
+        bucket,
+        prefix,
+        keyMarker,
+        uploadIdMarker,
+        delimiter,
+      );
+      for (var upload in result.uploads) {
+        final parts = await listParts(bucket, upload.key, upload.uploadId);
+        final size = await parts.fold(0, (acc, item) => acc + item.size);
+        yield IncompleteUpload(upload: upload, size: size);
+      }
+      keyMarker = result.nextKeyMarker;
+      uploadIdMarker = result.nextUploadIdMarker;
+      isTruncated = result.isTruncated;
+    } while (isTruncated);
+  }
+
+  Future<ListMultipartUploadsOutput> listIncompleteUploadsQuery(
+    String bucket,
+    String prefix,
+    String keyMarker,
+    String uploadIdMarker,
+    String delimiter,
+  ) async {
+    MinioInvalidBucketNameError.check(bucket);
+    MinioInvalidPrefixError.check(prefix);
+
+    var queries = {
+      'uploads': null,
+      'prefix': prefix,
+      'delimiter': delimiter,
+    };
+
+    if (keyMarker != null) {
+      queries['key-marker'] = keyMarker;
+    }
+    if (uploadIdMarker != null) {
+      queries['upload-id-marker'] = uploadIdMarker;
+    }
+
+    final resp = await _client.request(
+      method: 'GET',
+      bucket: bucket,
+      resource: 'uploads',
+      queries: queries,
+    );
+
+    validate(resp);
+
+    final node = xml.parse(resp.body);
+    return ListMultipartUploadsOutput.fromXml(node.root);
+  }
+
   Future<List<Bucket>> listBuckets() async {
     final resp = await _client.request(
       method: 'GET',
@@ -326,6 +461,7 @@ class Minio {
     bool recursive = false,
   }) async* {
     MinioInvalidBucketNameError.check(bucket);
+    MinioInvalidPrefixError.check(prefix);
     final delimiter = recursive ? '' : '/';
 
     var marker = '';
@@ -355,6 +491,7 @@ class Minio {
     int maxKeys,
   ) async {
     MinioInvalidBucketNameError.check(bucket);
+    MinioInvalidPrefixError.check(prefix);
 
     final queries = <String, String>{};
     queries['prefix'] = prefix;
@@ -392,6 +529,49 @@ class Minio {
       ..nextMarker = nextMarker;
   }
 
+  Stream<Part> listParts(
+    String bucket,
+    String object,
+    String uploadId,
+  ) async* {
+    MinioInvalidBucketNameError.check(bucket);
+    MinioInvalidObjectNameError.check(object);
+
+    var marker = 0;
+    var isTruncated = false;
+    do {
+      final result = await listPartsQuery(bucket, object, uploadId, marker);
+      marker = result.nextPartNumberMarker;
+      isTruncated = result.isTruncated;
+      yield* Stream.fromIterable(result.parts);
+    } while (isTruncated);
+  }
+
+  Future<ListPartsOutput> listPartsQuery(
+    String bucket,
+    String object,
+    String uploadId,
+    int marker,
+  ) async {
+    var queries = <String, String>{'uploadId': uploadId};
+
+    if (marker != null && marker != 0) {
+      queries['part-number-marker'] = marker.toString();
+    }
+
+    final resp = await _client.request(
+      method: 'GET',
+      bucket: bucket,
+      object: object,
+      queries: queries,
+    );
+
+    validate(resp);
+
+    final node = xml.parse(resp.body);
+    return ListPartsOutput.fromXml(node.root);
+  }
+
   Future<void> makeBucket(String bucket, [String region]) async {
     MinioInvalidBucketNameError.check(bucket);
     if (this.region != null && region != null && this.region != region) {
@@ -418,12 +598,18 @@ class Minio {
   Future putObject(
     String bucket,
     String object,
-    Stream data,
+    Stream<List<int>> data,
     int size, {
     Map<String, String> metadata,
   }) async {
     MinioInvalidBucketNameError.check(bucket);
     MinioInvalidObjectNameError.check(object);
+
+    assert(data != null);
+    assert(size != null && size >= 0);
+
+    metadata = prependXAMZMeta(metadata ?? {});
+    // Stream.
   }
 
   Future<void> removeBucket(String bucket) async {
diff --git a/lib/src/minio_errors.dart b/lib/src/minio_errors.dart
index 075675d..e46592f 100644
--- a/lib/src/minio_errors.dart
+++ b/lib/src/minio_errors.dart
@@ -65,6 +65,11 @@ class MinioInvalidDateError extends MinioError {
 
 class MinioInvalidPrefixError extends MinioError {
   MinioInvalidPrefixError(String message) : super(message);
+
+  static void check(String prefix) {
+    if (isValidPrefix(prefix)) return;
+    throw MinioInvalidPrefixError('Invalid prefix: $prefix');
+  }
 }
 
 class MinioInvalidBucketPolicyError extends MinioError {
diff --git a/lib/src/minio_helpers.dart b/lib/src/minio_helpers.dart
index 952b0f9..eb8b108 100644
--- a/lib/src/minio_helpers.dart
+++ b/lib/src/minio_helpers.dart
@@ -1,9 +1,3 @@
-import 'dart:convert';
-
-import 'package:convert/convert.dart';
-import 'package:crypto/crypto.dart';
-import 'package:xml/xml.dart';
-
 bool isValidBucketName(String bucket) {
   if (bucket == null) return false;
 
@@ -124,18 +118,9 @@ String makeDateShort(DateTime date) {
       isoDate.substring(8, 10);
 }
 
-String sha256Hex(String data) {
-  return hex.encode(sha256.convert(utf8.encode(data)).bytes);
-}
-
-XmlElement getNodeProp(XmlElement xml, String name) {
-  final result = xml.findElements(name);
-  return result.isNotEmpty ? result.first : null;
-}
-
-Map<String, String> prependXAMZMeta(Map<String, String> metaData) {
-  final newMetadata = Map<String, String>.from(metaData);
-  for (var key in metaData.keys) {
+Map<String, String> prependXAMZMeta(Map<String, String> metadata) {
+  final newMetadata = Map<String, String>.from(metadata);
+  for (var key in metadata.keys) {
     if (!isAmzHeader(key) &&
         !isSupportedHeader(key) &&
         !isStorageclassHeader(key)) {
diff --git a/lib/src/minio_models.dart b/lib/src/minio_models.dart
index 0c71150..4b2957e 100644
--- a/lib/src/minio_models.dart
+++ b/lib/src/minio_models.dart
@@ -29,3 +29,43 @@ class CompleteMultipartUpload {
   /// Array of CompletedPart data types.
   List<CompletedPart> parts;
 }
+
+class ListMultipartUploadsOutput {
+  ListMultipartUploadsOutput.fromXml(XmlElement xml) {
+    isTruncated = getProp(xml, 'IsLatest')?.text?.toUpperCase() == 'TRUE';
+    nextKeyMarker = getProp(xml, 'NextKeyMarker')?.text;
+    nextUploadIdMarker = getProp(xml, 'NextUploadIdMarker')?.text;
+    uploads = xml
+        .findElements('Upload')
+        .map((e) => MultipartUpload.fromXml(e))
+        .toList();
+  }
+
+  bool isTruncated;
+  String nextKeyMarker;
+  String nextUploadIdMarker;
+  List<MultipartUpload> uploads;
+}
+
+class ListPartsOutput {
+  ListPartsOutput.fromXml(XmlElement xml) {
+    isTruncated = getProp(xml, 'IsLatest')?.text?.toUpperCase() == 'TRUE';
+    nextPartNumberMarker =
+        int.parse(getProp(xml, 'NextPartNumberMarker')?.text);
+    parts = xml.findElements('Upload').map((e) => Part.fromXml(e)).toList();
+  }
+
+  bool isTruncated;
+  int nextPartNumberMarker;
+  List<Part> parts;
+}
+
+class IncompleteUpload {
+  IncompleteUpload({
+    this.upload,
+    this.size,
+  });
+  
+  final MultipartUpload upload;
+  final int size;
+}
diff --git a/lib/src/minio_sign.dart b/lib/src/minio_sign.dart
index fa240f4..2915ed6 100644
--- a/lib/src/minio_sign.dart
+++ b/lib/src/minio_sign.dart
@@ -2,6 +2,7 @@ import 'package:convert/convert.dart';
 import 'package:crypto/crypto.dart';
 import 'package:minio/minio.dart';
 import 'package:minio/src/minio_helpers.dart';
+import 'package:minio/src/utils.dart';
 
 const signV4Algorithm = 'AWS4-HMAC-SHA256';
 
diff --git a/lib/src/minio_uploader.dart b/lib/src/minio_uploader.dart
new file mode 100644
index 0000000..0c87f9a
--- /dev/null
+++ b/lib/src/minio_uploader.dart
@@ -0,0 +1,37 @@
+import 'dart:async';
+
+import 'package:minio/minio.dart';
+
+class MinioUploader implements StreamConsumer<List<int>> {
+  MinioUploader(
+    this.client,
+    this.bucket,
+    this.object,
+    this.partSize,
+    this.metaData,
+  );
+
+  final Minio client;
+  final String bucket;
+  final String object;
+  final int partSize;
+  final Map<String, String> metaData;
+
+  var emptyStream = true;
+  var partNumber = 1;
+  var etags = [];
+  List oldParts;
+  String id;
+
+  @override
+  Future addStream(Stream<List<int>> stream) {
+    // TODO: implement addStream
+    throw UnimplementedError();
+  }
+
+  @override
+  Future close() {
+    // TODO: implement close
+    throw UnimplementedError();
+  }
+}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
new file mode 100644
index 0000000..36c8f48
--- /dev/null
+++ b/lib/src/utils.dart
@@ -0,0 +1,31 @@
+import 'dart:convert';
+
+import 'package:convert/convert.dart';
+import 'package:crypto/crypto.dart';
+import 'package:xml/xml.dart';
+
+String sha256Hex(String data) {
+  return hex.encode(sha256.convert(utf8.encode(data)).bytes);
+}
+
+XmlElement getNodeProp(XmlElement xml, String name) {
+  final result = xml.findElements(name);
+  return result.isNotEmpty ? result.first : null;
+}
+
+String encodeQuery(String rawKey, String rawValue) {
+  final pair = [rawKey];
+  if (rawValue != null) {
+    pair.add(Uri.encodeQueryComponent(rawValue));
+  }
+  return pair.join('=');
+}
+
+String encodeQueries(Map<String, String> queries) {
+  final pairs = <String>[];
+  for (var key in queries.keys) {
+    final value = queries[key];
+    pairs.add(encodeQuery(key, value));
+  }
+  return pairs.join('=');
+}
-- 
GitLab