import 'dart:async';
import 'dart:convert';

import 'package:convert/convert.dart';
import 'package:crypto/crypto.dart';
import 'package:minio/minio.dart';
import 'package:minio/models.dart';
import 'package:minio/src/utils.dart';

class MinioUploader implements StreamConsumer<List<int>> {
  MinioUploader(
    this.minio,
    this.client,
    this.bucket,
    this.object,
    this.partSize,
    this.metadata,
  );

  final Minio minio;
  final MinioClient client;
  final String bucket;
  final String object;
  final int partSize;
  final Map<String, String> metadata;

  var partNumber = 1;
  String etag;
  List<CompletedPart> parts = [];
  Map<int, Part> oldParts;
  String uploadId;

  @override
  Future addStream(Stream<List<int>> stream) async {
    await for (var chunk in stream) {
      List<int> md5digest;
      final headers = <String, String>{};
      headers.addAll(metadata);
      headers['Content-Length'] = chunk.length.toString();
      if (!client.enableSHA256) {
        md5digest = md5.convert(chunk).bytes;
        headers['Content-MD5'] = base64.encode(md5digest);
      }

      if (this.partNumber == 1 && chunk.length < partSize) {
        this.etag = await upload(chunk, headers, null);
        return;
      }

      if (uploadId == null) {
        await initMultipartUpload();
      }

      final partNumber = this.partNumber++;

      if (oldParts != null) {
        final oldPart = oldParts[partNumber];
        if (oldPart != null) {
          md5digest ??= md5.convert(chunk).bytes;
          if (hex.encode(md5digest) == oldPart.eTag) {
            final part = CompletedPart(oldPart.eTag, partNumber);
            parts.add(part);
            continue;
          }
        }
      }

      final queries = {
        'partNumber': partNumber,
        'uploadId': uploadId,
      };

      final etag = await upload(chunk, headers, queries);
      final part = CompletedPart(etag, partNumber);
      parts.add(part);
    }
  }

  @override
  Future<String> close() async {
    if (uploadId == null) return etag;
    return minio.completeMultipartUpload(bucket, object, uploadId, parts);
  }

  Map<String, String> getHeaders(List<int> chunk) {
    final headers = <String, String>{};
    headers.addAll(metadata);
    headers['Content-Length'] = chunk.length.toString();
    if (!client.enableSHA256) {
      final md5digest = md5.convert(chunk).bytes;
      headers['Content-MD5'] = base64.encode(md5digest);
    }
    return headers;
  }

  Future<String> upload(
    List<int> chunk,
    Map<String, String> headers,
    Map<String, String> queries,
  ) async {
    final resp = await client.request(
      method: 'PUT',
      headers: headers,
      queries: queries,
      bucket: bucket,
      object: object,
      payload: chunk,
    );

    validate(resp);

    var etag = resp.headers['etag'];
    if (etag != null) {
      etag = trimDoubleQuote(etag);
    }

    return etag;
  }

  Future<void> initMultipartUpload() async {
    uploadId = await minio.findUploadID(bucket, object);

    if (uploadId == null) {
      await minio.initiateNewMultipartUpload(bucket, object, metadata);
      return;
    }

    final parts = await minio.listParts(bucket, object, uploadId);
    final entries = await parts
        .asyncMap((part) => MapEntry(part.partNumber, part))
        .toList();
    oldParts = Map.fromEntries(entries);
  }
}