| // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| part of gcloud.storage; |
| |
| const String _ABSOLUTE_PREFIX = 'gs://'; |
| const String _DIRECTORY_DELIMITER = 'gs://'; |
| |
| /// Representation of an absolute name consisting of bucket name and object |
| /// name. |
| class _AbsoluteName { |
| String bucketName; |
| String objectName; |
| |
| _AbsoluteName.parse(String absoluteName) { |
| if (!absoluteName.startsWith(_ABSOLUTE_PREFIX)) { |
| throw FormatException("Absolute name '$absoluteName' does not start " |
| "with '$_ABSOLUTE_PREFIX'"); |
| } |
| int index = absoluteName.indexOf('/', _ABSOLUTE_PREFIX.length); |
| if (index == -1 || index == _ABSOLUTE_PREFIX.length) { |
| throw FormatException("Absolute name '$absoluteName' does not have " |
| "a bucket name"); |
| } |
| if (index == absoluteName.length - 1) { |
| throw FormatException("Absolute name '$absoluteName' does not have " |
| "an object name"); |
| } |
| bucketName = absoluteName.substring(_ABSOLUTE_PREFIX.length, index); |
| objectName = absoluteName.substring(index + 1); |
| } |
| } |
| |
| /// Storage API implementation providing access to buckets. |
| class _StorageImpl implements Storage { |
| final String project; |
| final storage_api.StorageApi _api; |
| |
| _StorageImpl(http.Client client, this.project) |
| : _api = storage_api.StorageApi(client); |
| |
| Future createBucket(String bucketName, |
| {PredefinedAcl predefinedAcl, Acl acl}) { |
| var bucket = storage_api.Bucket()..name = bucketName; |
| var predefinedName = predefinedAcl != null ? predefinedAcl._name : null; |
| if (acl != null) { |
| bucket.acl = acl._toBucketAccessControlList(); |
| } |
| return _api.buckets |
| .insert(bucket, project, predefinedAcl: predefinedName) |
| .then((bucket) => null); |
| } |
| |
| Future deleteBucket(String bucketName) { |
| return _api.buckets.delete(bucketName); |
| } |
| |
| Bucket bucket(String bucketName, |
| {PredefinedAcl defaultPredefinedObjectAcl, Acl defaultObjectAcl}) { |
| return _BucketImpl( |
| this, bucketName, defaultPredefinedObjectAcl, defaultObjectAcl); |
| } |
| |
| Future<bool> bucketExists(String bucketName) { |
| notFoundError(e) { |
| return e is storage_api.DetailedApiRequestError && e.status == 404; |
| } |
| |
| return _api.buckets |
| .get(bucketName) |
| .then((_) => true) |
| .catchError((e) => false, test: notFoundError); |
| } |
| |
| Future<BucketInfo> bucketInfo(String bucketName) { |
| return _api.buckets |
| .get(bucketName, projection: 'full') |
| .then((bucket) => _BucketInfoImpl(bucket)); |
| } |
| |
| Stream<String> listBucketNames() { |
| Future<_BucketPageImpl> firstPage(int pageSize) { |
| return _listBuckets(pageSize, null) |
| .then((response) => _BucketPageImpl(this, pageSize, response)); |
| } |
| |
| return StreamFromPages<String>(firstPage).stream; |
| } |
| |
| Future<Page<String>> pageBucketNames({int pageSize = 50}) { |
| return _listBuckets(pageSize, null).then((response) { |
| return _BucketPageImpl(this, pageSize, response); |
| }); |
| } |
| |
| Future copyObject(String src, String dest) { |
| var srcName = _AbsoluteName.parse(src); |
| var destName = _AbsoluteName.parse(dest); |
| return _api.objects |
| .copy(null, srcName.bucketName, srcName.objectName, destName.bucketName, |
| destName.objectName) |
| .then((_) => null); |
| } |
| |
| Future<storage_api.Buckets> _listBuckets(int pageSize, String nextPageToken) { |
| return _api.buckets |
| .list(project, maxResults: pageSize, pageToken: nextPageToken); |
| } |
| } |
| |
| class _BucketInfoImpl implements BucketInfo { |
| final storage_api.Bucket _bucket; |
| |
| _BucketInfoImpl(this._bucket); |
| |
| String get bucketName => _bucket.name; |
| |
| String get etag => _bucket.etag; |
| |
| DateTime get created => _bucket.timeCreated; |
| |
| String get id => _bucket.id; |
| |
| Acl get acl => Acl._fromBucketAcl(_bucket); |
| } |
| |
| /// Bucket API implementation providing access to objects. |
| class _BucketImpl implements Bucket { |
| final storage_api.StorageApi _api; |
| PredefinedAcl _defaultPredefinedObjectAcl; |
| Acl _defaultObjectAcl; |
| final String bucketName; |
| |
| _BucketImpl(_StorageImpl storage, this.bucketName, |
| this._defaultPredefinedObjectAcl, this._defaultObjectAcl) |
| : this._api = storage._api; |
| |
| String absoluteObjectName(String objectName) { |
| return '$_ABSOLUTE_PREFIX$bucketName/$objectName'; |
| } |
| |
| StreamSink<List<int>> write(String objectName, |
| {int length, |
| ObjectMetadata metadata, |
| Acl acl, |
| PredefinedAcl predefinedAcl, |
| String contentType}) { |
| storage_api.Object object; |
| if (metadata == null) { |
| metadata = _ObjectMetadata(acl: acl, contentType: contentType); |
| } else { |
| if (acl != null) { |
| metadata = metadata.replace(acl: acl); |
| } |
| if (contentType != null) { |
| metadata = metadata.replace(contentType: contentType); |
| } |
| } |
| _ObjectMetadata objectMetadata = metadata as _ObjectMetadata; |
| object = objectMetadata._object; |
| |
| // If no predefined ACL is passed use the default (if any). |
| String predefinedName; |
| if (predefinedAcl != null || _defaultPredefinedObjectAcl != null) { |
| var predefined = |
| predefinedAcl != null ? predefinedAcl : _defaultPredefinedObjectAcl; |
| predefinedName = predefined._name; |
| } |
| |
| // If no ACL is passed use the default (if any). |
| if (object.acl == null && _defaultObjectAcl != null) { |
| object.acl = _defaultObjectAcl._toObjectAccessControlList(); |
| } |
| |
| // Fill properties not passed in metadata. |
| object.name = objectName; |
| |
| var sink = _MediaUploadStreamSink( |
| _api, bucketName, objectName, object, predefinedName, length); |
| return sink; |
| } |
| |
| Future<ObjectInfo> writeBytes(String objectName, List<int> bytes, |
| {ObjectMetadata metadata, |
| Acl acl, |
| PredefinedAcl predefinedAcl, |
| String contentType}) { |
| _MediaUploadStreamSink sink = write(objectName, |
| length: bytes.length, |
| metadata: metadata, |
| acl: acl, |
| predefinedAcl: predefinedAcl, |
| contentType: contentType) as _MediaUploadStreamSink; |
| sink.add(bytes); |
| return sink.close(); |
| } |
| |
| Stream<List<int>> read(String objectName, {int offset, int length}) async* { |
| if (offset == null) { |
| offset = 0; |
| } |
| |
| if (offset != 0 && length == null) { |
| throw ArgumentError('length must have a value if offset is non-zero.'); |
| } |
| |
| var options = storage_api.DownloadOptions.FullMedia; |
| |
| if (length != null) { |
| if (length <= 0) { |
| throw ArgumentError.value( |
| length, 'length', 'If provided, length must greater than zero.'); |
| } |
| // For ByteRange, end is *inclusive*. |
| var end = offset + length - 1; |
| var range = storage_api.ByteRange(offset, end); |
| assert(range.length == length); |
| options = storage_api.PartialDownloadOptions(range); |
| } |
| |
| commons.Media media = (await _api.objects.get(bucketName, objectName, |
| downloadOptions: options)) as commons.Media; |
| |
| yield* media.stream; |
| } |
| |
| Future<ObjectInfo> info(String objectName) { |
| return _api.objects |
| .get(bucketName, objectName, projection: 'full') |
| .then((object) => _ObjectInfoImpl(object as storage_api.Object)); |
| } |
| |
| Future delete(String objectName) { |
| return _api.objects.delete(bucketName, objectName); |
| } |
| |
| Stream<BucketEntry> list({String prefix}) { |
| Future<_ObjectPageImpl> firstPage(int pageSize) { |
| return _listObjects(bucketName, prefix, _DIRECTORY_DELIMITER, 50, null) |
| .then( |
| (response) => _ObjectPageImpl(this, prefix, pageSize, response)); |
| } |
| |
| return StreamFromPages<BucketEntry>(firstPage).stream; |
| } |
| |
| Future<Page<BucketEntry>> page({String prefix, int pageSize = 50}) { |
| return _listObjects( |
| bucketName, prefix, _DIRECTORY_DELIMITER, pageSize, null) |
| .then((response) { |
| return _ObjectPageImpl(this, prefix, pageSize, response); |
| }); |
| } |
| |
| Future updateMetadata(String objectName, ObjectMetadata metadata) { |
| // TODO: support other ObjectMetadata implementations? |
| _ObjectMetadata md = metadata as _ObjectMetadata; |
| var object = md._object; |
| if (md._object.acl == null && _defaultObjectAcl == null) { |
| throw ArgumentError('ACL is required for update'); |
| } |
| if (md.contentType == null) { |
| throw ArgumentError('Content-Type is required for update'); |
| } |
| if (md._object.acl == null) { |
| md._object.acl = _defaultObjectAcl._toObjectAccessControlList(); |
| } |
| return _api.objects.update(object, bucketName, objectName); |
| } |
| |
| Future<storage_api.Objects> _listObjects(String bucketName, String prefix, |
| String delimiter, int pageSize, String nextPageToken) { |
| return _api.objects.list(bucketName, |
| prefix: prefix, |
| delimiter: delimiter, |
| maxResults: pageSize, |
| pageToken: nextPageToken); |
| } |
| } |
| |
| class _BucketPageImpl implements Page<String> { |
| final _StorageImpl _storage; |
| final int _pageSize; |
| final String _nextPageToken; |
| final List<String> items; |
| |
| _BucketPageImpl(this._storage, this._pageSize, storage_api.Buckets response) |
| : items = List(response.items != null ? response.items.length : 0), |
| _nextPageToken = response.nextPageToken { |
| for (int i = 0; i < items.length; i++) { |
| items[i] = response.items[i].name; |
| } |
| } |
| |
| bool get isLast => _nextPageToken == null; |
| |
| Future<Page<String>> next({int pageSize}) { |
| if (isLast) return Future.value(null); |
| if (pageSize == null) pageSize = this._pageSize; |
| |
| return _storage._listBuckets(pageSize, _nextPageToken).then((response) { |
| return _BucketPageImpl(_storage, pageSize, response); |
| }); |
| } |
| } |
| |
| class _ObjectPageImpl implements Page<BucketEntry> { |
| final _BucketImpl _bucket; |
| final String _prefix; |
| final int _pageSize; |
| final String _nextPageToken; |
| final List<BucketEntry> items; |
| |
| _ObjectPageImpl( |
| this._bucket, this._prefix, this._pageSize, storage_api.Objects response) |
| : items = List((response.items != null ? response.items.length : 0) + |
| (response.prefixes != null ? response.prefixes.length : 0)), |
| _nextPageToken = response.nextPageToken { |
| var prefixes = 0; |
| if (response.prefixes != null) { |
| for (int i = 0; i < response.prefixes.length; i++) { |
| items[i] = BucketEntry._directory(response.prefixes[i]); |
| } |
| prefixes = response.prefixes.length; |
| } |
| if (response.items != null) { |
| for (int i = 0; i < response.items.length; i++) { |
| items[prefixes + i] = BucketEntry._object(response.items[i].name); |
| } |
| } |
| } |
| |
| bool get isLast => _nextPageToken == null; |
| |
| Future<Page<BucketEntry>> next({int pageSize}) { |
| if (isLast) return Future.value(null); |
| if (pageSize == null) pageSize = this._pageSize; |
| |
| return _bucket |
| ._listObjects(_bucket.bucketName, _prefix, _DIRECTORY_DELIMITER, |
| pageSize, _nextPageToken) |
| .then((response) { |
| return _ObjectPageImpl(_bucket, _prefix, pageSize, response); |
| }); |
| } |
| } |
| |
| class _ObjectGenerationImpl implements ObjectGeneration { |
| final String objectGeneration; |
| final int metaGeneration; |
| |
| _ObjectGenerationImpl(this.objectGeneration, this.metaGeneration); |
| } |
| |
| class _ObjectInfoImpl implements ObjectInfo { |
| final storage_api.Object _object; |
| final ObjectMetadata _metadata; |
| Uri _downloadLink; |
| ObjectGeneration _generation; |
| |
| _ObjectInfoImpl(storage_api.Object object) |
| : _object = object, |
| _metadata = _ObjectMetadata._(object); |
| |
| String get name => _object.name; |
| |
| int get length => int.parse(_object.size); |
| |
| DateTime get updated => _object.updated; |
| |
| String get etag => _object.etag; |
| |
| List<int> get md5Hash => base64.decode(_object.md5Hash); |
| |
| int get crc32CChecksum { |
| var list = base64.decode(_object.crc32c); |
| return (list[3] << 24) | (list[2] << 16) | (list[1] << 8) | list[0]; |
| } |
| |
| Uri get downloadLink { |
| if (_downloadLink == null) { |
| _downloadLink = Uri.parse(_object.mediaLink); |
| } |
| return _downloadLink; |
| } |
| |
| ObjectGeneration get generation { |
| if (_generation == null) { |
| _generation = _ObjectGenerationImpl( |
| _object.generation, int.parse(_object.metageneration)); |
| } |
| return _generation; |
| } |
| |
| /// Additional metadata. |
| ObjectMetadata get metadata => _metadata; |
| } |
| |
| class _ObjectMetadata implements ObjectMetadata { |
| final storage_api.Object _object; |
| Acl _cachedAcl; |
| ObjectGeneration _cachedGeneration; |
| Map<String, String> _cachedCustom; |
| |
| _ObjectMetadata( |
| {Acl acl, |
| String contentType, |
| String contentEncoding, |
| String cacheControl, |
| String contentDisposition, |
| String contentLanguage, |
| Map<String, String> custom}) |
| : _object = storage_api.Object() { |
| _object.acl = acl != null ? acl._toObjectAccessControlList() : null; |
| _object.contentType = contentType; |
| _object.contentEncoding = contentEncoding; |
| _object.cacheControl = cacheControl; |
| _object.contentDisposition = contentDisposition; |
| _object.contentLanguage = contentLanguage; |
| if (custom != null) _object.metadata = custom; |
| } |
| |
| _ObjectMetadata._(this._object); |
| |
| Acl get acl { |
| if (_cachedAcl == null) { |
| _cachedAcl = Acl._fromObjectAcl(_object); |
| } |
| return _cachedAcl; |
| } |
| |
| String get contentType => _object.contentType; |
| |
| String get contentEncoding => _object.contentEncoding; |
| |
| String get cacheControl => _object.cacheControl; |
| |
| String get contentDisposition => _object.contentDisposition; |
| |
| String get contentLanguage => _object.contentLanguage; |
| |
| ObjectGeneration get generation { |
| if (_cachedGeneration == null) { |
| _cachedGeneration = ObjectGeneration( |
| _object.generation, int.parse(_object.metageneration)); |
| } |
| return _cachedGeneration; |
| } |
| |
| Map<String, String> get custom { |
| if (_object.metadata == null) return null; |
| if (_cachedCustom == null) { |
| _cachedCustom = UnmodifiableMapView<String, String>(_object.metadata); |
| } |
| return _cachedCustom; |
| } |
| |
| ObjectMetadata replace( |
| {Acl acl, |
| String contentType, |
| String contentEncoding, |
| String cacheControl, |
| String contentDisposition, |
| String contentLanguage, |
| Map<String, String> custom}) { |
| return _ObjectMetadata( |
| acl: acl != null ? acl : this.acl, |
| contentType: contentType != null ? contentType : this.contentType, |
| contentEncoding: |
| contentEncoding != null ? contentEncoding : this.contentEncoding, |
| cacheControl: cacheControl != null ? cacheControl : this.cacheControl, |
| contentDisposition: contentDisposition != null |
| ? contentDisposition |
| : this.contentEncoding, |
| contentLanguage: |
| contentLanguage != null ? contentLanguage : this.contentEncoding, |
| custom: custom != null ? Map.from(custom) : this.custom); |
| } |
| } |
| |
| /// Implementation of StreamSink which handles Google media upload. |
| /// It provides a StreamSink and logic which selects whether to use normal |
| /// media upload (multipart mime) or resumable media upload. |
| class _MediaUploadStreamSink implements StreamSink<List<int>> { |
| static const int _DEFAULT_MAX_NORMAL_UPLOAD_LENGTH = 1024 * 1024; |
| final storage_api.StorageApi _api; |
| final String _bucketName; |
| final String _objectName; |
| final storage_api.Object _object; |
| final String _predefinedAcl; |
| final int _length; |
| final int _maxNormalUploadLength; |
| int _bufferLength = 0; |
| final List<List<int>> buffer = List<List<int>>(); |
| final _controller = StreamController<List<int>>(sync: true); |
| StreamSubscription _subscription; |
| StreamController<List<int>> _resumableController; |
| final _doneCompleter = Completer<ObjectInfo>(); |
| |
| static const int _STATE_LENGTH_KNOWN = 0; |
| static const int _STATE_PROBING_LENGTH = 1; |
| static const int _STATE_DECIDED_RESUMABLE = 2; |
| int _state; |
| |
| _MediaUploadStreamSink(this._api, this._bucketName, this._objectName, |
| this._object, this._predefinedAcl, this._length, |
| [this._maxNormalUploadLength = _DEFAULT_MAX_NORMAL_UPLOAD_LENGTH]) { |
| if (_length != null) { |
| // If the length is known in advance decide on the upload strategy |
| // immediately |
| _state = _STATE_LENGTH_KNOWN; |
| if (_length <= _maxNormalUploadLength) { |
| _startNormalUpload(_controller.stream, _length); |
| } else { |
| _startResumableUpload(_controller.stream, _length); |
| } |
| } else { |
| _state = _STATE_PROBING_LENGTH; |
| // If the length is not known in advance decide on the upload strategy |
| // later. Start buffering until enough data has been read to decide. |
| _subscription = _controller.stream |
| .listen(_onData, onDone: _onDone, onError: _onError); |
| } |
| } |
| |
| void add(List<int> event) { |
| _controller.add(event); |
| } |
| |
| void addError(errorEvent, [StackTrace stackTrace]) { |
| _controller.addError(errorEvent, stackTrace); |
| } |
| |
| Future addStream(Stream<List<int>> stream) { |
| return _controller.addStream(stream); |
| } |
| |
| Future<ObjectInfo> close() { |
| _controller.close(); |
| return _doneCompleter.future; |
| } |
| |
| Future get done => _doneCompleter.future; |
| |
| _onData(List<int> data) { |
| assert(_state != _STATE_LENGTH_KNOWN); |
| if (_state == _STATE_PROBING_LENGTH) { |
| buffer.add(data); |
| _bufferLength += data.length; |
| if (_bufferLength > _maxNormalUploadLength) { |
| // Start resumable upload. |
| // TODO: Avoid using another stream-controller. |
| _resumableController = StreamController<List<int>>(sync: true); |
| buffer.forEach(_resumableController.add); |
| _startResumableUpload(_resumableController.stream, _length); |
| _state = _STATE_DECIDED_RESUMABLE; |
| } |
| } else { |
| assert(_state == _STATE_DECIDED_RESUMABLE); |
| _resumableController.add(data); |
| } |
| } |
| |
| _onDone() { |
| if (_state == _STATE_PROBING_LENGTH) { |
| // As the data is already cached don't bother to wait on somebody |
| // listening on the stream before adding the data. |
| _startNormalUpload(Stream<List<int>>.fromIterable(buffer), _bufferLength); |
| } else { |
| _resumableController.close(); |
| } |
| } |
| |
| _onError(e, StackTrace s) { |
| // If still deciding on the strategy complete with error. Otherwise |
| // forward the error for default processing. |
| if (_state == _STATE_PROBING_LENGTH) { |
| _completeError(e, s); |
| } else { |
| _resumableController.addError(e, s); |
| } |
| } |
| |
| _completeError(e, StackTrace s) { |
| if (_state != _STATE_LENGTH_KNOWN) { |
| // Always cancel subscription on error. |
| _subscription.cancel(); |
| } |
| _doneCompleter.completeError(e, s); |
| } |
| |
| void _startNormalUpload(Stream<List<int>> stream, int length) { |
| var contentType = _object.contentType != null |
| ? _object.contentType |
| : 'application/octet-stream'; |
| var media = storage_api.Media(stream, length, contentType: contentType); |
| _api.objects |
| .insert(_object, _bucketName, |
| name: _objectName, |
| predefinedAcl: _predefinedAcl, |
| uploadMedia: media, |
| uploadOptions: storage_api.UploadOptions.Default) |
| .then((response) { |
| _doneCompleter.complete(_ObjectInfoImpl(response)); |
| }, onError: _completeError); |
| } |
| |
| void _startResumableUpload(Stream<List<int>> stream, int length) { |
| var contentType = _object.contentType != null |
| ? _object.contentType |
| : 'application/octet-stream'; |
| var media = storage_api.Media(stream, length, contentType: contentType); |
| _api.objects |
| .insert(_object, _bucketName, |
| name: _objectName, |
| predefinedAcl: _predefinedAcl, |
| uploadMedia: media, |
| uploadOptions: storage_api.UploadOptions.Resumable) |
| .then((response) { |
| _doneCompleter.complete(_ObjectInfoImpl(response)); |
| }, onError: _completeError); |
| } |
| } |