From ad0d503d55f3d3aba68faa513aea020687e5f795 Mon Sep 17 00:00:00 2001
From: xuty <xty50337@hotmail.com>
Date: Wed, 1 Apr 2020 19:51:05 +0800
Subject: [PATCH] finish notification apis

---
 CHANGELOG.md                        |  4 ++
 README.md                           | 12 +++--
 example/minio_example.dart          |  9 ++++
 lib/src/minio.dart                  | 70 ++++++++++++++++++++++++++---
 lib/src/minio_client.dart           | 21 +++++----
 lib/src/minio_models_generated.dart | 17 +++++--
 lib/src/minio_poller.dart           | 66 +++++++++++++++++++++++++++
 lib/src/utils.dart                  | 12 ++++-
 pubspec.yaml                        |  2 +-
 9 files changed, 188 insertions(+), 25 deletions(-)
 create mode 100644 lib/src/minio_poller.dart

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f1155cf..7252306 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.1.5
+
+- support notification apis
+
 ## 0.1.4
 
 - support presignedPostPolicy
diff --git a/README.md b/README.md
index 9454ad9..639551e 100644
--- a/README.md
+++ b/README.md
@@ -4,10 +4,10 @@ This is the _unofficial_ MinIO Dart Client SDK that provides simple APIs to acce
 
 | Bucket operations       | Object operations        | Presigned operations | Bucket Policy & Notification operations |
 |-------------------------|--------------------------|----------------------|-----------------------------------------|
-| [makeBucket]            | [getObject]              | [presignedUrl]       | getBucketNotification                   |
-| [listBuckets]           | [getPartialObject]       | [presignedGetObject] | setBucketNotification                   |
-| [bucketExists]          | [fGetObject]             | [presignedPutObject] | removeAllBucketNotification             |
-| [removeBucket]          | [putObject]              | [presignedPostPolicy]| listenBucketNotification                |
+| [makeBucket]            | [getObject]              | [presignedUrl]       | [getBucketNotification]                 |
+| [listBuckets]           | [getPartialObject]       | [presignedGetObject] | [setBucketNotification]                 |
+| [bucketExists]          | [fGetObject]             | [presignedPutObject] | [removeAllBucketNotification]           |
+| [removeBucket]          | [putObject]              | [presignedPostPolicy]| [listenBucketNotification]              |
 | [listObjects]           | [fPutObject]             |                      | getBucketPolicy                         |
 | [listObjectsV2]         | [copyObject]             |                      | setBucketPolicy                         |
 | [listIncompleteUploads] | [statObject]             |                      |                                         |
@@ -97,3 +97,7 @@ MIT
 [presignedPutObject]: https://pub.dev/documentation/minio/latest/minio/Minio/presignedPutObject.html
 [presignedPostPolicy]: https://pub.dev/documentation/minio/latest/minio/Minio/presignedPostPolicy.html
 
+[getBucketNotification]: https://pub.dev/documentation/minio/latest/minio/Minio/getBucketNotification.html
+[setBucketNotification]: https://pub.dev/documentation/minio/latest/minio/Minio/setBucketNotification.html
+[removeAllBucketNotification]: https://pub.dev/documentation/minio/latest/minio/Minio/removeAllBucketNotification.html
+[listenBucketNotification]: https://pub.dev/documentation/minio/latest/minio/Minio/listenBucketNotification.html
diff --git a/example/minio_example.dart b/example/minio_example.dart
index f5e866e..055e7d8 100644
--- a/example/minio_example.dart
+++ b/example/minio_example.dart
@@ -22,6 +22,13 @@ void main() async {
     print('bucket $bucket already exists');
   }
 
+  final poller = minio.listenBucketNotification(bucket, events: [
+    's3:ObjectCreated:*',
+  ]);
+  poller.stream.listen((event) {
+    print('--- event: ${event['eventName']}');
+  });
+
   final region = await minio.getBucketRegion('00test');
   print('--- object region:');
   print(region);
@@ -69,4 +76,6 @@ void main() async {
 
   await minio.removeBucket(bucket);
   print('--- bucket removed');
+
+  poller.stop();
 }
diff --git a/lib/src/minio.dart b/lib/src/minio.dart
index 0772893..de21301 100644
--- a/lib/src/minio.dart
+++ b/lib/src/minio.dart
@@ -3,6 +3,7 @@ import 'package:minio/models.dart';
 import 'package:minio/src/minio_client.dart';
 import 'package:minio/src/minio_errors.dart';
 import 'package:minio/src/minio_helpers.dart';
+import 'package:minio/src/minio_poller.dart';
 import 'package:minio/src/minio_sign.dart';
 import 'package:minio/src/minio_uploader.dart';
 import 'package:minio/src/utils.dart';
@@ -211,6 +212,23 @@ class Minio {
     return latestUpload?.uploadId;
   }
 
+  /// Return the list of notification configurations stored
+  /// in the S3 provider
+  Future<NotificationConfiguration> getBucketNotification(String bucket) async {
+    MinioInvalidBucketNameError.check(bucket);
+
+    final resp = await _client.request(
+      method: 'GET',
+      bucket: bucket,
+      resource: 'notification',
+    );
+
+    validate(resp, expect: 200);
+
+    final node = xml.parse(resp.body);
+    return NotificationConfiguration.fromXml(node.rootElement);
+  }
+
   /// gets the region of the bucket
   Future<String> getBucketRegion(String bucket) async {
     MinioInvalidBucketNameError.check(bucket);
@@ -378,6 +396,25 @@ class Minio {
     return ListMultipartUploadsOutput.fromXml(node.root);
   }
 
+  /// Listen for notifications on a bucket. Additionally one can provider
+  /// filters for prefix, suffix and events. There is no prior set bucket notification
+  /// needed to use this API. **This is an MinIO extension API** where unique identifiers
+  /// are regitered and unregistered by the server automatically based on incoming requests.
+  NotificationPoller listenBucketNotification(
+    String bucket, {
+    String prefix,
+    String suffix,
+    List<String> events,
+  }) {
+    MinioInvalidBucketNameError.check(bucket);
+
+    final listener =
+        NotificationPoller(_client, bucket, prefix, suffix, events);
+    listener.start();
+
+    return listener;
+  }
+
   /// List of buckets created.
   Future<List<Bucket>> listBuckets() async {
     final resp = await _client.request(
@@ -776,6 +813,12 @@ class Minio {
     return etag.toString();
   }
 
+  /// Remove all bucket notification
+  Future<void> removeAllBucketNotification(bucket) {
+    return setBucketNotification(
+        bucket, NotificationConfiguration(null, null, null));
+  }
+
   /// Remove a bucket.
   Future<void> removeBucket(String bucket) async {
     MinioInvalidBucketNameError.check(bucket);
@@ -836,14 +879,31 @@ class Minio {
       final headers = {'Content-MD5': md5Base64(payload)};
 
       await _client.request(
-          method: 'POST',
-          bucket: bucket,
-          resource: 'delete',
-          headers: headers,
-          payload: payload);
+        method: 'POST',
+        bucket: bucket,
+        resource: 'delete',
+        headers: headers,
+        payload: payload,
+      );
     }
   }
 
+  // Remove all the notification configurations in the S3 provider
+  Future<void> setBucketNotification(
+      String bucket, NotificationConfiguration config) async {
+    MinioInvalidBucketNameError.check(bucket);
+    assert(config != null);
+
+    final resp = await _client.request(
+      method: 'PUT',
+      bucket: bucket,
+      resource: 'notification',
+      payload: config.toXml().toString(),
+    );
+
+    validate(resp, expect: 200);
+  }
+
   /// Stat information of the object.
   Future<StatObjectResult> statObject(String bucket, String object) async {
     MinioInvalidBucketNameError.check(bucket);
diff --git a/lib/src/minio_client.dart b/lib/src/minio_client.dart
index 2aebd12..d153b3e 100644
--- a/lib/src/minio_client.dart
+++ b/lib/src/minio_client.dart
@@ -16,9 +16,12 @@ class MinioRequest extends BaseRequest {
   ByteStream finalize() {
     super.finalize();
     if (body is String) {
+      final data = utf8.encode(body);
+      headers['content-length'] = data.length.toString();
       return ByteStream.fromBytes(utf8.encode(body));
     }
     if (body is List<int>) {
+      headers['content-length'] = body.length.toString();
       return ByteStream.fromBytes(body);
     }
     if (body is Stream<List<int>>) {
@@ -48,7 +51,7 @@ class MinioClient {
   }
 
   final Minio minio;
-  final String userAgent = 'MinIO (Unknown; Unknown) minio-js/0.0.1';
+  final String userAgent = 'MinIO (Unknown; Unknown) minio-dart/0.1.5';
 
   bool enableSHA256;
   bool anonymous;
@@ -61,7 +64,7 @@ class MinioClient {
     String region,
     String resource,
     dynamic payload = '',
-    Map<String, String> queries,
+    Map<String, dynamic> queries,
     Map<String, String> headers,
   }) async {
     region ??= await minio.getBucketRegion(bucket);
@@ -93,10 +96,10 @@ class MinioClient {
     String region,
     String resource,
     dynamic payload = '',
-    Map<String, String> queries,
+    Map<String, dynamic> queries,
     Map<String, String> headers,
   }) async {
-    final stream = _request(
+    final stream = await _request(
       method: method,
       bucket: bucket,
       object: object,
@@ -107,7 +110,7 @@ class MinioClient {
       headers: headers,
     );
 
-    final response = await Response.fromStream(await stream);
+    final response = await Response.fromStream(stream);
     logResponse(response);
 
     return response;
@@ -120,7 +123,7 @@ class MinioClient {
     String region,
     String resource,
     dynamic payload = '',
-    Map<String, String> queries,
+    Map<String, dynamic> queries,
     Map<String, String> headers,
   }) async {
     final response = await _request(
@@ -144,12 +147,12 @@ class MinioClient {
     String object,
     String region,
     String resource,
-    Map<String, String> queries,
+    Map<String, dynamic> queries,
     Map<String, String> headers,
   ) {
     final url = getRequestUrl(bucket, object, resource, queries);
     final request = MinioRequest(method, url);
-    request.headers['host'] = url.host;
+    request.headers['host'] = url.authority;
 
     if (headers != null) {
       request.headers.addAll(headers);
@@ -162,7 +165,7 @@ class MinioClient {
     String bucket,
     String object,
     String resource,
-    Map<String, String> queries,
+    Map<String, dynamic> queries,
   ) {
     var host = minio.endPoint.toLowerCase();
     var path = '/';
diff --git a/lib/src/minio_models_generated.dart b/lib/src/minio_models_generated.dart
index f23949c..c042b50 100644
--- a/lib/src/minio_models_generated.dart
+++ b/lib/src/minio_models_generated.dart
@@ -1,6 +1,7 @@
 import 'package:xml/xml.dart';
 
 XmlElement getProp(XmlElement xml, String name) {
+  if (xml == null) return null;
   final result = xml.findElements(name);
   return result.isNotEmpty ? result.first : null;
 }
@@ -2132,10 +2133,18 @@ class NotificationConfiguration {
   XmlNode toXml() {
     final builder = XmlBuilder();
     builder.element('NotificationConfiguration', nest: () {
-      builder.element('LambdaFunctionConfigurations',
-          nest: lambdaFunctionConfigurations.toXml());
-      builder.element('QueueConfigurations', nest: queueConfigurations.toXml());
-      builder.element('TopicConfigurations', nest: topicConfigurations.toXml());
+      if (lambdaFunctionConfigurations != null) {
+        builder.element('LambdaFunctionConfigurations',
+            nest: lambdaFunctionConfigurations.toXml());
+      }
+      if (queueConfigurations != null) {
+        builder.element('QueueConfigurations',
+            nest: queueConfigurations.toXml());
+      }
+      if (topicConfigurations != null) {
+        builder.element('TopicConfigurations',
+            nest: topicConfigurations.toXml());
+      }
     });
     return builder.build();
   }
diff --git a/lib/src/minio_poller.dart b/lib/src/minio_poller.dart
new file mode 100644
index 0000000..381967a
--- /dev/null
+++ b/lib/src/minio_poller.dart
@@ -0,0 +1,66 @@
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:minio/src/minio_client.dart';
+
+class NotificationPoller {
+  NotificationPoller(
+    this._client,
+    this.bucket,
+    this.prefix,
+    this.suffix,
+    this.events,
+  );
+
+  final MinioClient _client;
+  final String bucket;
+  final String prefix;
+  final String suffix;
+  final List<String> events;
+
+  final _eventStream = StreamController<Map<String, dynamic>>.broadcast();
+  Stream<Map<String, dynamic>> get stream => _eventStream.stream;
+
+  bool _stop = true;
+
+  /// Starts the polling.
+  void start() async {
+    _stop = false;
+    while (!_stop) {
+      await _checkForChanges();
+    }
+  }
+
+  /// Stops the polling.
+  void stop() {
+    _stop = true;
+  }
+
+  Future<void> _checkForChanges() async {
+    // Don't continue if we're looping again but are cancelled.
+    if (_stop) return;
+
+    final queries = {
+      if (prefix != null) 'prefix': prefix,
+      if (suffix != null) 'suffix': suffix,
+      if (events != null) 'events': events,
+    };
+
+    final respStream = await _client.requestStream(
+      method: 'GET',
+      bucket: bucket,
+      queries: queries,
+    );
+
+    await for (var resp in respStream.stream) {
+      if (_stop) break;
+
+      final chunk = utf8.decode(resp);
+      if (chunk.trim().isEmpty) continue;
+
+      final data = json.decode(chunk);
+      final records = List<Map<String, dynamic>>.from(data['Records']);
+      await _eventStream.addStream(Stream.fromIterable(records));
+    }
+  }
+}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index f52f794..6402605 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -48,11 +48,19 @@ String encodeQuery(String rawKey, String rawValue) {
   return pair.join('=');
 }
 
-String encodeQueries(Map<String, String> queries) {
+String encodeQueries(Map<String, dynamic> queries) {
   final pairs = <String>[];
   for (var key in queries.keys) {
     final value = queries[key];
-    pairs.add(encodeQuery(key, value));
+    if (value is String || value == null) {
+      pairs.add(encodeQuery(key, value));
+    } else if (value is Iterable<String>) {
+      for (var val in value) {
+        pairs.add(encodeQuery(key, val));
+      }
+    } else {
+      throw ArgumentError('unsupported value: $value');
+    }
   }
   return pairs.join('&');
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index 1d2accc..b4dd52c 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,6 +1,6 @@
 name: minio
 description: Unofficial MinIO Dart Client SDK that provides simple APIs to access any Amazon S3 compatible object storage server.
-version: 0.1.4
+version: 0.1.5
 homepage: https://github.com/xtyxtyx/minio-dart
 issue_tracker: https://github.com/xtyxtyx/minio-dart/issues
 
-- 
GitLab