diff --git a/CHANGELOG.md b/CHANGELOG.md index f1155cf743510f0e7e276497f5150d58fb5c8735..7252306c59e88b2aa96f3fd302f2307e2d4cba73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.1.5 + +- support notification apis + ## 0.1.4 - support presignedPostPolicy diff --git a/README.md b/README.md index 9454ad94774ef4902b8cdd4390e19d6f0a08c02e..639551ed3d27b11a161d715828f27dc9835c8e90 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,10 @@ This is the _unofficial_ MinIO Dart Client SDK that provides simple APIs to acce | Bucket operations | Object operations | Presigned operations | Bucket Policy & Notification operations | |-------------------------|--------------------------|----------------------|-----------------------------------------| -| [makeBucket] | [getObject] | [presignedUrl] | getBucketNotification | -| [listBuckets] | [getPartialObject] | [presignedGetObject] | setBucketNotification | -| [bucketExists] | [fGetObject] | [presignedPutObject] | removeAllBucketNotification | -| [removeBucket] | [putObject] | [presignedPostPolicy]| listenBucketNotification | +| [makeBucket] | [getObject] | [presignedUrl] | [getBucketNotification] | +| [listBuckets] | [getPartialObject] | [presignedGetObject] | [setBucketNotification] | +| [bucketExists] | [fGetObject] | [presignedPutObject] | [removeAllBucketNotification] | +| [removeBucket] | [putObject] | [presignedPostPolicy]| [listenBucketNotification] | | [listObjects] | [fPutObject] | | getBucketPolicy | | [listObjectsV2] | [copyObject] | | setBucketPolicy | | [listIncompleteUploads] | [statObject] | | | @@ -97,3 +97,7 @@ MIT [presignedPutObject]: https://pub.dev/documentation/minio/latest/minio/Minio/presignedPutObject.html [presignedPostPolicy]: https://pub.dev/documentation/minio/latest/minio/Minio/presignedPostPolicy.html +[getBucketNotification]: https://pub.dev/documentation/minio/latest/minio/Minio/getBucketNotification.html +[setBucketNotification]: https://pub.dev/documentation/minio/latest/minio/Minio/setBucketNotification.html +[removeAllBucketNotification]: https://pub.dev/documentation/minio/latest/minio/Minio/removeAllBucketNotification.html +[listenBucketNotification]: https://pub.dev/documentation/minio/latest/minio/Minio/listenBucketNotification.html diff --git a/example/minio_example.dart b/example/minio_example.dart index f5e866ee722dcf20580b486671aa7ad852b16f6c..055e7d82fc68e1e658e347a17a25fe2cc429a7e9 100644 --- a/example/minio_example.dart +++ b/example/minio_example.dart @@ -22,6 +22,13 @@ void main() async { print('bucket $bucket already exists'); } + final poller = minio.listenBucketNotification(bucket, events: [ + 's3:ObjectCreated:*', + ]); + poller.stream.listen((event) { + print('--- event: ${event['eventName']}'); + }); + final region = await minio.getBucketRegion('00test'); print('--- object region:'); print(region); @@ -69,4 +76,6 @@ void main() async { await minio.removeBucket(bucket); print('--- bucket removed'); + + poller.stop(); } diff --git a/lib/src/minio.dart b/lib/src/minio.dart index 0772893058046c6261f725ef28a49634859cd742..de213016de111030ef511c8ecc0c717bde8d3e4b 100644 --- a/lib/src/minio.dart +++ b/lib/src/minio.dart @@ -3,6 +3,7 @@ import 'package:minio/models.dart'; import 'package:minio/src/minio_client.dart'; import 'package:minio/src/minio_errors.dart'; import 'package:minio/src/minio_helpers.dart'; +import 'package:minio/src/minio_poller.dart'; import 'package:minio/src/minio_sign.dart'; import 'package:minio/src/minio_uploader.dart'; import 'package:minio/src/utils.dart'; @@ -211,6 +212,23 @@ class Minio { return latestUpload?.uploadId; } + /// Return the list of notification configurations stored + /// in the S3 provider + Future<NotificationConfiguration> getBucketNotification(String bucket) async { + MinioInvalidBucketNameError.check(bucket); + + final resp = await _client.request( + method: 'GET', + bucket: bucket, + resource: 'notification', + ); + + validate(resp, expect: 200); + + final node = xml.parse(resp.body); + return NotificationConfiguration.fromXml(node.rootElement); + } + /// gets the region of the bucket Future<String> getBucketRegion(String bucket) async { MinioInvalidBucketNameError.check(bucket); @@ -378,6 +396,25 @@ class Minio { return ListMultipartUploadsOutput.fromXml(node.root); } + /// Listen for notifications on a bucket. Additionally one can provider + /// filters for prefix, suffix and events. There is no prior set bucket notification + /// needed to use this API. **This is an MinIO extension API** where unique identifiers + /// are regitered and unregistered by the server automatically based on incoming requests. + NotificationPoller listenBucketNotification( + String bucket, { + String prefix, + String suffix, + List<String> events, + }) { + MinioInvalidBucketNameError.check(bucket); + + final listener = + NotificationPoller(_client, bucket, prefix, suffix, events); + listener.start(); + + return listener; + } + /// List of buckets created. Future<List<Bucket>> listBuckets() async { final resp = await _client.request( @@ -776,6 +813,12 @@ class Minio { return etag.toString(); } + /// Remove all bucket notification + Future<void> removeAllBucketNotification(bucket) { + return setBucketNotification( + bucket, NotificationConfiguration(null, null, null)); + } + /// Remove a bucket. Future<void> removeBucket(String bucket) async { MinioInvalidBucketNameError.check(bucket); @@ -836,14 +879,31 @@ class Minio { final headers = {'Content-MD5': md5Base64(payload)}; await _client.request( - method: 'POST', - bucket: bucket, - resource: 'delete', - headers: headers, - payload: payload); + method: 'POST', + bucket: bucket, + resource: 'delete', + headers: headers, + payload: payload, + ); } } + // Remove all the notification configurations in the S3 provider + Future<void> setBucketNotification( + String bucket, NotificationConfiguration config) async { + MinioInvalidBucketNameError.check(bucket); + assert(config != null); + + final resp = await _client.request( + method: 'PUT', + bucket: bucket, + resource: 'notification', + payload: config.toXml().toString(), + ); + + validate(resp, expect: 200); + } + /// Stat information of the object. Future<StatObjectResult> statObject(String bucket, String object) async { MinioInvalidBucketNameError.check(bucket); diff --git a/lib/src/minio_client.dart b/lib/src/minio_client.dart index 2aebd12a8fdb04ef1f7fd1c5e6771f83434c3544..d153b3e344dbcdc6913635e8255663b39c70c799 100644 --- a/lib/src/minio_client.dart +++ b/lib/src/minio_client.dart @@ -16,9 +16,12 @@ class MinioRequest extends BaseRequest { ByteStream finalize() { super.finalize(); if (body is String) { + final data = utf8.encode(body); + headers['content-length'] = data.length.toString(); return ByteStream.fromBytes(utf8.encode(body)); } if (body is List<int>) { + headers['content-length'] = body.length.toString(); return ByteStream.fromBytes(body); } if (body is Stream<List<int>>) { @@ -48,7 +51,7 @@ class MinioClient { } final Minio minio; - final String userAgent = 'MinIO (Unknown; Unknown) minio-js/0.0.1'; + final String userAgent = 'MinIO (Unknown; Unknown) minio-dart/0.1.5'; bool enableSHA256; bool anonymous; @@ -61,7 +64,7 @@ class MinioClient { String region, String resource, dynamic payload = '', - Map<String, String> queries, + Map<String, dynamic> queries, Map<String, String> headers, }) async { region ??= await minio.getBucketRegion(bucket); @@ -93,10 +96,10 @@ class MinioClient { String region, String resource, dynamic payload = '', - Map<String, String> queries, + Map<String, dynamic> queries, Map<String, String> headers, }) async { - final stream = _request( + final stream = await _request( method: method, bucket: bucket, object: object, @@ -107,7 +110,7 @@ class MinioClient { headers: headers, ); - final response = await Response.fromStream(await stream); + final response = await Response.fromStream(stream); logResponse(response); return response; @@ -120,7 +123,7 @@ class MinioClient { String region, String resource, dynamic payload = '', - Map<String, String> queries, + Map<String, dynamic> queries, Map<String, String> headers, }) async { final response = await _request( @@ -144,12 +147,12 @@ class MinioClient { String object, String region, String resource, - Map<String, String> queries, + Map<String, dynamic> queries, Map<String, String> headers, ) { final url = getRequestUrl(bucket, object, resource, queries); final request = MinioRequest(method, url); - request.headers['host'] = url.host; + request.headers['host'] = url.authority; if (headers != null) { request.headers.addAll(headers); @@ -162,7 +165,7 @@ class MinioClient { String bucket, String object, String resource, - Map<String, String> queries, + Map<String, dynamic> queries, ) { var host = minio.endPoint.toLowerCase(); var path = '/'; diff --git a/lib/src/minio_models_generated.dart b/lib/src/minio_models_generated.dart index f23949cc0e548bcc7421530a709ebd96867662aa..c042b50c2cc4aea10902e3ef0bb56c3ef31db5f7 100644 --- a/lib/src/minio_models_generated.dart +++ b/lib/src/minio_models_generated.dart @@ -1,6 +1,7 @@ import 'package:xml/xml.dart'; XmlElement getProp(XmlElement xml, String name) { + if (xml == null) return null; final result = xml.findElements(name); return result.isNotEmpty ? result.first : null; } @@ -2132,10 +2133,18 @@ class NotificationConfiguration { XmlNode toXml() { final builder = XmlBuilder(); builder.element('NotificationConfiguration', nest: () { - builder.element('LambdaFunctionConfigurations', - nest: lambdaFunctionConfigurations.toXml()); - builder.element('QueueConfigurations', nest: queueConfigurations.toXml()); - builder.element('TopicConfigurations', nest: topicConfigurations.toXml()); + if (lambdaFunctionConfigurations != null) { + builder.element('LambdaFunctionConfigurations', + nest: lambdaFunctionConfigurations.toXml()); + } + if (queueConfigurations != null) { + builder.element('QueueConfigurations', + nest: queueConfigurations.toXml()); + } + if (topicConfigurations != null) { + builder.element('TopicConfigurations', + nest: topicConfigurations.toXml()); + } }); return builder.build(); } diff --git a/lib/src/minio_poller.dart b/lib/src/minio_poller.dart new file mode 100644 index 0000000000000000000000000000000000000000..381967aac71b068441b5485c000a91e11490cd95 --- /dev/null +++ b/lib/src/minio_poller.dart @@ -0,0 +1,66 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:minio/src/minio_client.dart'; + +class NotificationPoller { + NotificationPoller( + this._client, + this.bucket, + this.prefix, + this.suffix, + this.events, + ); + + final MinioClient _client; + final String bucket; + final String prefix; + final String suffix; + final List<String> events; + + final _eventStream = StreamController<Map<String, dynamic>>.broadcast(); + Stream<Map<String, dynamic>> get stream => _eventStream.stream; + + bool _stop = true; + + /// Starts the polling. + void start() async { + _stop = false; + while (!_stop) { + await _checkForChanges(); + } + } + + /// Stops the polling. + void stop() { + _stop = true; + } + + Future<void> _checkForChanges() async { + // Don't continue if we're looping again but are cancelled. + if (_stop) return; + + final queries = { + if (prefix != null) 'prefix': prefix, + if (suffix != null) 'suffix': suffix, + if (events != null) 'events': events, + }; + + final respStream = await _client.requestStream( + method: 'GET', + bucket: bucket, + queries: queries, + ); + + await for (var resp in respStream.stream) { + if (_stop) break; + + final chunk = utf8.decode(resp); + if (chunk.trim().isEmpty) continue; + + final data = json.decode(chunk); + final records = List<Map<String, dynamic>>.from(data['Records']); + await _eventStream.addStream(Stream.fromIterable(records)); + } + } +} diff --git a/lib/src/utils.dart b/lib/src/utils.dart index f52f794e179a4ed24812c8db44f30628e93ac55e..640260523f82ea0cfa7328dd0a23fec17ee9635e 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -48,11 +48,19 @@ String encodeQuery(String rawKey, String rawValue) { return pair.join('='); } -String encodeQueries(Map<String, String> queries) { +String encodeQueries(Map<String, dynamic> queries) { final pairs = <String>[]; for (var key in queries.keys) { final value = queries[key]; - pairs.add(encodeQuery(key, value)); + if (value is String || value == null) { + pairs.add(encodeQuery(key, value)); + } else if (value is Iterable<String>) { + for (var val in value) { + pairs.add(encodeQuery(key, val)); + } + } else { + throw ArgumentError('unsupported value: $value'); + } } return pairs.join('&'); } diff --git a/pubspec.yaml b/pubspec.yaml index 1d2accc84f39945a3e789b4d11159c2479e93afa..b4dd52c0eef2b79dc7af6364612989a3797a4aec 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: minio description: Unofficial MinIO Dart Client SDK that provides simple APIs to access any Amazon S3 compatible object storage server. -version: 0.1.4 +version: 0.1.5 homepage: https://github.com/xtyxtyx/minio-dart issue_tracker: https://github.com/xtyxtyx/minio-dart/issues