From 426f1ca8ac78e4d9b13059e799007cc0e2d9bd76 Mon Sep 17 00:00:00 2001
From: xuty <xty50337@hotmail.com>
Date: Tue, 4 Jan 2022 23:04:31 +0800
Subject: [PATCH] Update stream file upload

---
 CHANGELOG.md              |  3 +++
 lib/src/minio.dart        |  2 +-
 lib/src/minio_client.dart |  2 +-
 lib/src/utils.dart        | 29 +++++++++++++++++++++++++++--
 pubspec.yaml              |  2 +-
 test/minio_test.dart      | 17 ++++++++++++++++-
 test/utils_test.dart      | 23 +++++++++++++++++++++--
 7 files changed, 70 insertions(+), 8 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c138251..d6a5452 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+# 3.3.3-pre
+- Update stream file upload
+
 # 3.3.2-pre
 - Add tests
 
diff --git a/lib/src/minio.dart b/lib/src/minio.dart
index f5b4a5c..89ea1b9 100644
--- a/lib/src/minio.dart
+++ b/lib/src/minio.dart
@@ -942,7 +942,7 @@ class Minio {
       metadata,
       onProgress,
     );
-    final chunker = BlockStream(partSize);
+    final chunker = MinChunkSize(partSize);
     final etag = await data.transform(chunker).pipe(uploader);
     return etag.toString();
   }
diff --git a/lib/src/minio_client.dart b/lib/src/minio_client.dart
index af0d2ef..9361ea1 100644
--- a/lib/src/minio_client.dart
+++ b/lib/src/minio_client.dart
@@ -39,7 +39,7 @@ class MinioRequest extends BaseRequest {
       headers['content-length'] = body.length.toString();
     }
 
-    stream = stream.transform(BlockStream(1 << 16));
+    stream = stream.transform(MaxChunkSize(1 << 16));
 
     if (onProgress == null) {
       return ByteStream(stream);
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index cba532b..307cf27 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -63,8 +63,8 @@ String encodeQueries(Map<String, dynamic> queries) {
   return pairs.join('&');
 }
 
-class BlockStream extends StreamTransformerBase<Uint8List, Uint8List> {
-  BlockStream(this.size);
+class MaxChunkSize extends StreamTransformerBase<Uint8List, Uint8List> {
+  MaxChunkSize(this.size);
 
   final int size;
 
@@ -89,6 +89,31 @@ class BlockStream extends StreamTransformerBase<Uint8List, Uint8List> {
   }
 }
 
+class MinChunkSize extends StreamTransformerBase<Uint8List, Uint8List> {
+  MinChunkSize(this.size);
+
+  final int size;
+
+  @override
+  Stream<Uint8List> bind(Stream<Uint8List> stream) async* {
+    var buffer = BytesBuilder(copy: false);
+
+    await for (var chunk in stream) {
+      buffer.add(chunk);
+
+      if (buffer.length < size) {
+        continue;
+      }
+
+      yield buffer.takeBytes();
+    }
+
+    if (buffer.isNotEmpty) {
+      yield buffer.takeBytes();
+    }
+  }
+}
+
 String trimDoubleQuote(String str) {
   return str.replaceAll(RegExp('^"'), '').replaceAll(RegExp(r'"$'), '');
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index 1334210..3f2abbf 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,6 +1,6 @@
 name: minio
 description: Unofficial MinIO Dart Client SDK that provides simple APIs to access any Amazon S3 compatible object storage server.
-version: 3.3.2-pre
+version: 3.3.3-pre
 homepage: https://github.com/xtyxtyx/minio-dart
 issue_tracker: https://github.com/xtyxtyx/minio-dart/issues
 
diff --git a/test/minio_test.dart b/test/minio_test.dart
index 3d0d0c2..9a1df13 100644
--- a/test/minio_test.dart
+++ b/test/minio_test.dart
@@ -4,6 +4,7 @@ import 'dart:typed_data';
 import 'package:minio/io.dart';
 import 'package:minio/minio.dart';
 import 'package:minio/src/minio_models_generated.dart';
+import 'package:minio/src/utils.dart';
 import 'package:test/test.dart';
 
 import 'helpers.dart';
@@ -412,7 +413,21 @@ void testPutObject() {
       expect(stat.size, equals(dataLength));
     });
 
-    test('large file upload works', () async {
+    test('stream upload works', () async {
+      final objectName = uniqueName();
+      final dataLength = 1024 * 1024;
+      final data = Uint8List.fromList(List<int>.generate(dataLength, (i) => i));
+      await minio.putObject(
+        bucketName,
+        objectName,
+        Stream.value(data).transform(MaxChunkSize(123)),
+      );
+      final stat = await minio.statObject(bucketName, objectName);
+      await minio.removeObject(bucketName, objectName);
+      expect(stat.size, equals(dataLength));
+    });
+
+    test('multipart file upload works', () async {
       final objectName = uniqueName();
       final dataLength = 12 * 1024 * 1024;
       final data = Uint8List.fromList(List<int>.generate(dataLength, (i) => i));
diff --git a/test/utils_test.dart b/test/utils_test.dart
index ae4d946..03b4bad 100644
--- a/test/utils_test.dart
+++ b/test/utils_test.dart
@@ -39,7 +39,7 @@ void testRfc7231Time() {
 }
 
 void testBlockStream() {
-  test('BlockStream can split chunks to blocks', () async {
+  test('MaxChunkSize works', () async {
     final streamData = [
       Uint8List.fromList([1, 2]),
       Uint8List.fromList([3, 4, 5, 6]),
@@ -47,7 +47,7 @@ void testBlockStream() {
       Uint8List.fromList([10, 11, 12, 13]),
     ];
 
-    final stream = Stream.fromIterable(streamData).transform(BlockStream(3));
+    final stream = Stream.fromIterable(streamData).transform(MaxChunkSize(3));
 
     expect(
       await stream.toList(),
@@ -61,4 +61,23 @@ void testBlockStream() {
       ]),
     );
   });
+
+  test('MinChunkSize works', () async {
+    final streamData = [
+      Uint8List.fromList([1, 2]),
+      Uint8List.fromList([3, 4, 5, 6]),
+      Uint8List.fromList([7, 8, 9]),
+      Uint8List.fromList([10, 11, 12, 13]),
+    ];
+
+    final stream = Stream.fromIterable(streamData).transform(MinChunkSize(5));
+
+    expect(
+      await stream.toList(),
+      equals([
+        Uint8List.fromList([1, 2, 3, 4, 5, 6]),
+        Uint8List.fromList([7, 8, 9, 10, 11, 12, 13]),
+      ]),
+    );
+  });
 }
-- 
GitLab