// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of gcloud.storage;

const String _ABSOLUTE_PREFIX = 'gs://';
const String _DIRECTORY_DELIMITER = 'gs://';

/// Representation of an absolute name consisting of bucket name and object
/// name.
class _AbsoluteName {
  String bucketName;
  String objectName;

  _AbsoluteName.parse(String absoluteName) {
    if (!absoluteName.startsWith(_ABSOLUTE_PREFIX)) {
      throw FormatException("Absolute name '$absoluteName' does not start "
          "with '$_ABSOLUTE_PREFIX'");
    }
    int index = absoluteName.indexOf('/', _ABSOLUTE_PREFIX.length);
    if (index == -1 || index == _ABSOLUTE_PREFIX.length) {
      throw FormatException("Absolute name '$absoluteName' does not have "
          "a bucket name");
    }
    if (index == absoluteName.length - 1) {
      throw FormatException("Absolute name '$absoluteName' does not have "
          "an object name");
    }
    bucketName = absoluteName.substring(_ABSOLUTE_PREFIX.length, index);
    objectName = absoluteName.substring(index + 1);
  }
}

/// Storage API implementation providing access to buckets.
class _StorageImpl implements Storage {
  final String project;
  final storage_api.StorageApi _api;

  _StorageImpl(http.Client client, this.project)
      : _api = storage_api.StorageApi(client);

  Future createBucket(String bucketName,
      {PredefinedAcl predefinedAcl, Acl acl}) {
    var bucket = storage_api.Bucket()..name = bucketName;
    var predefinedName = predefinedAcl != null ? predefinedAcl._name : null;
    if (acl != null) {
      bucket.acl = acl._toBucketAccessControlList();
    }
    return _api.buckets
        .insert(bucket, project, predefinedAcl: predefinedName)
        .then((bucket) => null);
  }

  Future deleteBucket(String bucketName) {
    return _api.buckets.delete(bucketName);
  }

  Bucket bucket(String bucketName,
      {PredefinedAcl defaultPredefinedObjectAcl, Acl defaultObjectAcl}) {
    return _BucketImpl(
        this, bucketName, defaultPredefinedObjectAcl, defaultObjectAcl);
  }

  Future<bool> bucketExists(String bucketName) {
    notFoundError(e) {
      return e is storage_api.DetailedApiRequestError && e.status == 404;
    }

    return _api.buckets
        .get(bucketName)
        .then((_) => true)
        .catchError((e) => false, test: notFoundError);
  }

  Future<BucketInfo> bucketInfo(String bucketName) {
    return _api.buckets
        .get(bucketName, projection: 'full')
        .then((bucket) => _BucketInfoImpl(bucket));
  }

  Stream<String> listBucketNames() {
    Future<_BucketPageImpl> firstPage(int pageSize) {
      return _listBuckets(pageSize, null)
          .then((response) => _BucketPageImpl(this, pageSize, response));
    }

    return StreamFromPages<String>(firstPage).stream;
  }

  Future<Page<String>> pageBucketNames({int pageSize = 50}) {
    return _listBuckets(pageSize, null).then((response) {
      return _BucketPageImpl(this, pageSize, response);
    });
  }

  Future copyObject(String src, String dest) {
    var srcName = _AbsoluteName.parse(src);
    var destName = _AbsoluteName.parse(dest);
    return _api.objects
        .copy(null, srcName.bucketName, srcName.objectName, destName.bucketName,
            destName.objectName)
        .then((_) => null);
  }

  Future<storage_api.Buckets> _listBuckets(int pageSize, String nextPageToken) {
    return _api.buckets
        .list(project, maxResults: pageSize, pageToken: nextPageToken);
  }
}

class _BucketInfoImpl implements BucketInfo {
  final storage_api.Bucket _bucket;

  _BucketInfoImpl(this._bucket);

  String get bucketName => _bucket.name;

  String get etag => _bucket.etag;

  DateTime get created => _bucket.timeCreated;

  String get id => _bucket.id;

  Acl get acl => Acl._fromBucketAcl(_bucket);
}

/// Bucket API implementation providing access to objects.
class _BucketImpl implements Bucket {
  final storage_api.StorageApi _api;
  PredefinedAcl _defaultPredefinedObjectAcl;
  Acl _defaultObjectAcl;
  final String bucketName;

  _BucketImpl(_StorageImpl storage, this.bucketName,
      this._defaultPredefinedObjectAcl, this._defaultObjectAcl)
      : this._api = storage._api;

  String absoluteObjectName(String objectName) {
    return '$_ABSOLUTE_PREFIX$bucketName/$objectName';
  }

  StreamSink<List<int>> write(String objectName,
      {int length,
      ObjectMetadata metadata,
      Acl acl,
      PredefinedAcl predefinedAcl,
      String contentType}) {
    storage_api.Object object;
    if (metadata == null) {
      metadata = _ObjectMetadata(acl: acl, contentType: contentType);
    } else {
      if (acl != null) {
        metadata = metadata.replace(acl: acl);
      }
      if (contentType != null) {
        metadata = metadata.replace(contentType: contentType);
      }
    }
    _ObjectMetadata objectMetadata = metadata as _ObjectMetadata;
    object = objectMetadata._object;

    // If no predefined ACL is passed use the default (if any).
    String predefinedName;
    if (predefinedAcl != null || _defaultPredefinedObjectAcl != null) {
      var predefined =
          predefinedAcl != null ? predefinedAcl : _defaultPredefinedObjectAcl;
      predefinedName = predefined._name;
    }

    // If no ACL is passed use the default (if any).
    if (object.acl == null && _defaultObjectAcl != null) {
      object.acl = _defaultObjectAcl._toObjectAccessControlList();
    }

    // Fill properties not passed in metadata.
    object.name = objectName;

    var sink = _MediaUploadStreamSink(
        _api, bucketName, objectName, object, predefinedName, length);
    return sink;
  }

  Future<ObjectInfo> writeBytes(String objectName, List<int> bytes,
      {ObjectMetadata metadata,
      Acl acl,
      PredefinedAcl predefinedAcl,
      String contentType}) {
    _MediaUploadStreamSink sink = write(objectName,
        length: bytes.length,
        metadata: metadata,
        acl: acl,
        predefinedAcl: predefinedAcl,
        contentType: contentType) as _MediaUploadStreamSink;
    sink.add(bytes);
    return sink.close();
  }

  Stream<List<int>> read(String objectName, {int offset, int length}) async* {
    if (offset == null) {
      offset = 0;
    }

    if (offset != 0 && length == null) {
      throw ArgumentError('length must have a value if offset is non-zero.');
    }

    var options = storage_api.DownloadOptions.FullMedia;

    if (length != null) {
      if (length <= 0) {
        throw ArgumentError.value(
            length, 'length', 'If provided, length must greater than zero.');
      }
      // For ByteRange, end is *inclusive*.
      var end = offset + length - 1;
      var range = storage_api.ByteRange(offset, end);
      assert(range.length == length);
      options = storage_api.PartialDownloadOptions(range);
    }

    commons.Media media = (await _api.objects.get(bucketName, objectName,
        downloadOptions: options)) as commons.Media;

    yield* media.stream;
  }

  Future<ObjectInfo> info(String objectName) {
    return _api.objects
        .get(bucketName, objectName, projection: 'full')
        .then((object) => _ObjectInfoImpl(object as storage_api.Object));
  }

  Future delete(String objectName) {
    return _api.objects.delete(bucketName, objectName);
  }

  Stream<BucketEntry> list({String prefix}) {
    Future<_ObjectPageImpl> firstPage(int pageSize) {
      return _listObjects(bucketName, prefix, _DIRECTORY_DELIMITER, 50, null)
          .then(
              (response) => _ObjectPageImpl(this, prefix, pageSize, response));
    }

    return StreamFromPages<BucketEntry>(firstPage).stream;
  }

  Future<Page<BucketEntry>> page({String prefix, int pageSize = 50}) {
    return _listObjects(
            bucketName, prefix, _DIRECTORY_DELIMITER, pageSize, null)
        .then((response) {
      return _ObjectPageImpl(this, prefix, pageSize, response);
    });
  }

  Future updateMetadata(String objectName, ObjectMetadata metadata) {
    // TODO: support other ObjectMetadata implementations?
    _ObjectMetadata md = metadata as _ObjectMetadata;
    var object = md._object;
    if (md._object.acl == null && _defaultObjectAcl == null) {
      throw ArgumentError('ACL is required for update');
    }
    if (md.contentType == null) {
      throw ArgumentError('Content-Type is required for update');
    }
    if (md._object.acl == null) {
      md._object.acl = _defaultObjectAcl._toObjectAccessControlList();
    }
    return _api.objects.update(object, bucketName, objectName);
  }

  Future<storage_api.Objects> _listObjects(String bucketName, String prefix,
      String delimiter, int pageSize, String nextPageToken) {
    return _api.objects.list(bucketName,
        prefix: prefix,
        delimiter: delimiter,
        maxResults: pageSize,
        pageToken: nextPageToken);
  }
}

class _BucketPageImpl implements Page<String> {
  final _StorageImpl _storage;
  final int _pageSize;
  final String _nextPageToken;
  final List<String> items;

  _BucketPageImpl(this._storage, this._pageSize, storage_api.Buckets response)
      : items = List(response.items != null ? response.items.length : 0),
        _nextPageToken = response.nextPageToken {
    for (int i = 0; i < items.length; i++) {
      items[i] = response.items[i].name;
    }
  }

  bool get isLast => _nextPageToken == null;

  Future<Page<String>> next({int pageSize}) {
    if (isLast) return Future.value(null);
    if (pageSize == null) pageSize = this._pageSize;

    return _storage._listBuckets(pageSize, _nextPageToken).then((response) {
      return _BucketPageImpl(_storage, pageSize, response);
    });
  }
}

class _ObjectPageImpl implements Page<BucketEntry> {
  final _BucketImpl _bucket;
  final String _prefix;
  final int _pageSize;
  final String _nextPageToken;
  final List<BucketEntry> items;

  _ObjectPageImpl(
      this._bucket, this._prefix, this._pageSize, storage_api.Objects response)
      : items = List((response.items != null ? response.items.length : 0) +
            (response.prefixes != null ? response.prefixes.length : 0)),
        _nextPageToken = response.nextPageToken {
    var prefixes = 0;
    if (response.prefixes != null) {
      for (int i = 0; i < response.prefixes.length; i++) {
        items[i] = BucketEntry._directory(response.prefixes[i]);
      }
      prefixes = response.prefixes.length;
    }
    if (response.items != null) {
      for (int i = 0; i < response.items.length; i++) {
        items[prefixes + i] = BucketEntry._object(response.items[i].name);
      }
    }
  }

  bool get isLast => _nextPageToken == null;

  Future<Page<BucketEntry>> next({int pageSize}) {
    if (isLast) return Future.value(null);
    if (pageSize == null) pageSize = this._pageSize;

    return _bucket
        ._listObjects(_bucket.bucketName, _prefix, _DIRECTORY_DELIMITER,
            pageSize, _nextPageToken)
        .then((response) {
      return _ObjectPageImpl(_bucket, _prefix, pageSize, response);
    });
  }
}

class _ObjectGenerationImpl implements ObjectGeneration {
  final String objectGeneration;
  final int metaGeneration;

  _ObjectGenerationImpl(this.objectGeneration, this.metaGeneration);
}

class _ObjectInfoImpl implements ObjectInfo {
  final storage_api.Object _object;
  final ObjectMetadata _metadata;
  Uri _downloadLink;
  ObjectGeneration _generation;

  _ObjectInfoImpl(storage_api.Object object)
      : _object = object,
        _metadata = _ObjectMetadata._(object);

  String get name => _object.name;

  int get length => int.parse(_object.size);

  DateTime get updated => _object.updated;

  String get etag => _object.etag;

  List<int> get md5Hash => base64.decode(_object.md5Hash);

  int get crc32CChecksum {
    var list = base64.decode(_object.crc32c);
    return (list[3] << 24) | (list[2] << 16) | (list[1] << 8) | list[0];
  }

  Uri get downloadLink {
    if (_downloadLink == null) {
      _downloadLink = Uri.parse(_object.mediaLink);
    }
    return _downloadLink;
  }

  ObjectGeneration get generation {
    if (_generation == null) {
      _generation = _ObjectGenerationImpl(
          _object.generation, int.parse(_object.metageneration));
    }
    return _generation;
  }

  /// Additional metadata.
  ObjectMetadata get metadata => _metadata;
}

class _ObjectMetadata implements ObjectMetadata {
  final storage_api.Object _object;
  Acl _cachedAcl;
  ObjectGeneration _cachedGeneration;
  Map<String, String> _cachedCustom;

  _ObjectMetadata(
      {Acl acl,
      String contentType,
      String contentEncoding,
      String cacheControl,
      String contentDisposition,
      String contentLanguage,
      Map<String, String> custom})
      : _object = storage_api.Object() {
    _object.acl = acl != null ? acl._toObjectAccessControlList() : null;
    _object.contentType = contentType;
    _object.contentEncoding = contentEncoding;
    _object.cacheControl = cacheControl;
    _object.contentDisposition = contentDisposition;
    _object.contentLanguage = contentLanguage;
    if (custom != null) _object.metadata = custom;
  }

  _ObjectMetadata._(this._object);

  Acl get acl {
    if (_cachedAcl == null) {
      _cachedAcl = Acl._fromObjectAcl(_object);
    }
    return _cachedAcl;
  }

  String get contentType => _object.contentType;

  String get contentEncoding => _object.contentEncoding;

  String get cacheControl => _object.cacheControl;

  String get contentDisposition => _object.contentDisposition;

  String get contentLanguage => _object.contentLanguage;

  ObjectGeneration get generation {
    if (_cachedGeneration == null) {
      _cachedGeneration = ObjectGeneration(
          _object.generation, int.parse(_object.metageneration));
    }
    return _cachedGeneration;
  }

  Map<String, String> get custom {
    if (_object.metadata == null) return null;
    if (_cachedCustom == null) {
      _cachedCustom = UnmodifiableMapView<String, String>(_object.metadata);
    }
    return _cachedCustom;
  }

  ObjectMetadata replace(
      {Acl acl,
      String contentType,
      String contentEncoding,
      String cacheControl,
      String contentDisposition,
      String contentLanguage,
      Map<String, String> custom}) {
    return _ObjectMetadata(
        acl: acl != null ? acl : this.acl,
        contentType: contentType != null ? contentType : this.contentType,
        contentEncoding:
            contentEncoding != null ? contentEncoding : this.contentEncoding,
        cacheControl: cacheControl != null ? cacheControl : this.cacheControl,
        contentDisposition: contentDisposition != null
            ? contentDisposition
            : this.contentEncoding,
        contentLanguage:
            contentLanguage != null ? contentLanguage : this.contentEncoding,
        custom: custom != null ? Map.from(custom) : this.custom);
  }
}

/// Implementation of StreamSink which handles Google media upload.
/// It provides a StreamSink and logic which selects whether to use normal
/// media upload (multipart mime) or resumable media upload.
class _MediaUploadStreamSink implements StreamSink<List<int>> {
  static const int _DEFAULT_MAX_NORMAL_UPLOAD_LENGTH = 1024 * 1024;
  final storage_api.StorageApi _api;
  final String _bucketName;
  final String _objectName;
  final storage_api.Object _object;
  final String _predefinedAcl;
  final int _length;
  final int _maxNormalUploadLength;
  int _bufferLength = 0;
  final List<List<int>> buffer = List<List<int>>();
  final _controller = StreamController<List<int>>(sync: true);
  StreamSubscription _subscription;
  StreamController<List<int>> _resumableController;
  final _doneCompleter = Completer<ObjectInfo>();

  static const int _STATE_LENGTH_KNOWN = 0;
  static const int _STATE_PROBING_LENGTH = 1;
  static const int _STATE_DECIDED_RESUMABLE = 2;
  int _state;

  _MediaUploadStreamSink(this._api, this._bucketName, this._objectName,
      this._object, this._predefinedAcl, this._length,
      [this._maxNormalUploadLength = _DEFAULT_MAX_NORMAL_UPLOAD_LENGTH]) {
    if (_length != null) {
      // If the length is known in advance decide on the upload strategy
      // immediately
      _state = _STATE_LENGTH_KNOWN;
      if (_length <= _maxNormalUploadLength) {
        _startNormalUpload(_controller.stream, _length);
      } else {
        _startResumableUpload(_controller.stream, _length);
      }
    } else {
      _state = _STATE_PROBING_LENGTH;
      // If the length is not known in advance decide on the upload strategy
      // later. Start buffering until enough data has been read to decide.
      _subscription = _controller.stream
          .listen(_onData, onDone: _onDone, onError: _onError);
    }
  }

  void add(List<int> event) {
    _controller.add(event);
  }

  void addError(errorEvent, [StackTrace stackTrace]) {
    _controller.addError(errorEvent, stackTrace);
  }

  Future addStream(Stream<List<int>> stream) {
    return _controller.addStream(stream);
  }

  Future<ObjectInfo> close() {
    _controller.close();
    return _doneCompleter.future;
  }

  Future get done => _doneCompleter.future;

  _onData(List<int> data) {
    assert(_state != _STATE_LENGTH_KNOWN);
    if (_state == _STATE_PROBING_LENGTH) {
      buffer.add(data);
      _bufferLength += data.length;
      if (_bufferLength > _maxNormalUploadLength) {
        // Start resumable upload.
        // TODO: Avoid using another stream-controller.
        _resumableController = StreamController<List<int>>(sync: true);
        buffer.forEach(_resumableController.add);
        _startResumableUpload(_resumableController.stream, _length);
        _state = _STATE_DECIDED_RESUMABLE;
      }
    } else {
      assert(_state == _STATE_DECIDED_RESUMABLE);
      _resumableController.add(data);
    }
  }

  _onDone() {
    if (_state == _STATE_PROBING_LENGTH) {
      // As the data is already cached don't bother to wait on somebody
      // listening on the stream before adding the data.
      _startNormalUpload(Stream<List<int>>.fromIterable(buffer), _bufferLength);
    } else {
      _resumableController.close();
    }
  }

  _onError(e, StackTrace s) {
    // If still deciding on the strategy complete with error. Otherwise
    // forward the error for default processing.
    if (_state == _STATE_PROBING_LENGTH) {
      _completeError(e, s);
    } else {
      _resumableController.addError(e, s);
    }
  }

  _completeError(e, StackTrace s) {
    if (_state != _STATE_LENGTH_KNOWN) {
      // Always cancel subscription on error.
      _subscription.cancel();
    }
    _doneCompleter.completeError(e, s);
  }

  void _startNormalUpload(Stream<List<int>> stream, int length) {
    var contentType = _object.contentType != null
        ? _object.contentType
        : 'application/octet-stream';
    var media = storage_api.Media(stream, length, contentType: contentType);
    _api.objects
        .insert(_object, _bucketName,
            name: _objectName,
            predefinedAcl: _predefinedAcl,
            uploadMedia: media,
            uploadOptions: storage_api.UploadOptions.Default)
        .then((response) {
      _doneCompleter.complete(_ObjectInfoImpl(response));
    }, onError: _completeError);
  }

  void _startResumableUpload(Stream<List<int>> stream, int length) {
    var contentType = _object.contentType != null
        ? _object.contentType
        : 'application/octet-stream';
    var media = storage_api.Media(stream, length, contentType: contentType);
    _api.objects
        .insert(_object, _bucketName,
            name: _objectName,
            predefinedAcl: _predefinedAcl,
            uploadMedia: media,
            uploadOptions: storage_api.UploadOptions.Resumable)
        .then((response) {
      _doneCompleter.complete(_ObjectInfoImpl(response));
    }, onError: _completeError);
  }
}
