blob: 840409ed2286c1b1da133b0c2ca43754bff8d5ba [file] [log] [blame]
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library gcloud.datastore_impl;
import 'dart:async';
import 'package:googleapis/datastore/v1.dart' as api;
import 'package:http/http.dart' as http;
import '../common.dart' show Page;
import '../datastore.dart' as datastore;
/// [datastore.Transaction] implementation used by this library.
// NOTE(review): the constructor and the end of this class are not visible
// in this view.
class TransactionImpl implements datastore.Transaction {
/// Transaction handle. It is read back via
/// `(transaction as TransactionImpl).data` when commit, lookup, query and
/// rollback requests are built.
final String data;
/// [datastore.Datastore] implementation that issues its requests through the
/// generated [api.DatastoreApi] client.
class DatastoreImpl implements datastore.Datastore {
// NOTE(review): the elements of this list literal are not visible in this
// view.
static const List<String> SCOPES = <String>[
/// Generated API client that every request of this class goes through.
final api.DatastoreApi _api;
/// Project id passed along with every API call and written into the
/// partition id of converted keys.
final String _project;
/// Creates a datastore client that sends its requests over [client].
///
/// The [project] parameter is the name of the cloud project (it should not
/// start with a `s~`).
DatastoreImpl(http.Client client, String project)
: _api = api.DatastoreApi(client),
_project = project;
/// Converts a [datastore.Key] into an [api.Key].
///
/// The partition id of the result is built from [_project] and the
/// namespace of `key.partition`. Each path element gets its `kind` copied;
/// an `int` id is encoded as a decimal string and a `String` id is used
/// as-is.
///
/// When [enforceId] is `true` and an element's id is neither an `int` nor a
/// `String`, a [datastore.ApplicationError] is thrown.
// NOTE(review): several lines below lost tokens during extraction (the
// iteration over the key's elements and the assignment targets of the id
// branches); restore them from the upstream source before relying on this
// block.
api.Key _convertDatastore2ApiKey(datastore.Key key, {bool enforceId = true}) {
var apiKey = api.Key();
apiKey.partitionId = api.PartitionId()
..projectId = _project
..namespaceId = key.partition.namespace;
apiKey.path = element) {
final part = api.PathElement();
part.kind = element.kind;
final id =;
// Integer ids are sent as strings.
if (id is int) { = '$id';
} else if (id is String) { = id;
} else if (enforceId) {
// No usable id on this element and the caller asked for one.
throw datastore.ApplicationError(
'Error while encoding entity key: Using `null` as the id is not '
return part;
return apiKey;
/// Converts an [api.Key] received from the server into a [datastore.Key].
///
/// Every path element becomes a [datastore.KeyElement]; one of the two
/// visible branches parses its value with `int.parse`. An element that
/// matches neither branch raises a [datastore.DatastoreError].
///
/// The partition is created from `key.partitionId.namespaceId` when the key
/// has a partition id; otherwise `partition` is left `null`.
// NOTE(review): the element iteration and the branch conditions below lost
// tokens during extraction.
static datastore.Key _convertApi2DatastoreKey(api.Key key) {
var elements = element) {
if ( != null) {
return datastore.KeyElement(element.kind, int.parse(;
} else if ( != null) {
return datastore.KeyElement(element.kind,;
} else {
throw datastore.DatastoreError(
'Invalid server response: Expected allocated name/id.');
datastore.Partition partition;
if (key.partitionId != null) {
partition = datastore.Partition(key.partitionId.namespaceId);
// TODO: assert projectId.
return datastore.Key(elements, partition: partition);
/// Reports whether [a] and [b] describe the same key.
///
/// The keys match when their paths have the same length, their partition
/// ids are either both absent or agree on project id and namespace id, and
/// every pair of path elements agrees on `id`, `name` and `kind`.
bool _compareApiKey(api.Key a, api.Key b) {
  final length = a.path.length;
  if (length != b.path.length) return false;

  // FIXME(Issue #2): Is this comparison working correctly?
  final left = a.partitionId;
  final right = b.partitionId;
  if (left == null) {
    if (right != null) return false;
  } else {
    if (right == null) return false;
    if (left.projectId != right.projectId) return false;
    if (left.namespaceId != right.namespaceId) return false;
  }

  for (var index = 0; index < length; index++) {
    final x = a.path[index];
    final y = b.path[index];
    final same = x.id == y.id && x.name == y.name && x.kind == y.kind;
    if (!same) return false;
  }
  return true;
}
/// Converts a Dart property [value] into an [api.Value].
///
/// Scalar results carry `excludeFromIndexes = !indexed`. Supported inputs:
/// `null`, `bool`, `int` (sent as a decimal string), `double`, `String`,
/// `DateTime` (sent as `toIso8601String()`), [datastore.BlobValue],
/// [datastore.Key] (converted with `enforceId: false`) and `List`.
///
/// List items are converted with `lists: false`, so a list nested inside a
/// list is rejected. Passing a list while [lists] is `false` throws an
/// [Exception]; any other unsupported type throws an [UnsupportedError].
api.Value _convertDatastore2ApiPropertyValue(value, bool indexed,
{bool lists = true}) {
var apiValue = api.Value()..excludeFromIndexes = !indexed;
if (value == null) {
return apiValue..nullValue = "NULL_VALUE";
} else if (value is bool) {
return apiValue..booleanValue = value;
} else if (value is int) {
return apiValue..integerValue = '$value';
} else if (value is double) {
return apiValue..doubleValue = value;
} else if (value is String) {
return apiValue..stringValue = value;
} else if (value is DateTime) {
return apiValue..timestampValue = value.toIso8601String();
} else if (value is datastore.BlobValue) {
return apiValue..blobValueAsBytes = value.bytes;
} else if (value is datastore.Key) {
return apiValue
..keyValue = _convertDatastore2ApiKey(value, enforceId: false);
} else if (value is List) {
if (!lists) {
// FIXME(Issue #3): Consistently handle exceptions.
throw Exception('List values are not allowed.');
convertItem(i) =>
_convertDatastore2ApiPropertyValue(i, indexed, lists: false);
// The array wrapper is a fresh api.Value, so `excludeFromIndexes` is only
// set on the individual items, not on the wrapper.
// NOTE(review): the expression assigned to `values` below was lost during
// extraction.
return api.Value()
..arrayValue =
(api.ArrayValue()..values =;
} else {
throw UnsupportedError(
'Types ${value.runtimeType} cannot be used for serializing.');
/// Converts an [api.Value] received from the server into a Dart value.
///
/// The fields of [value] are probed in a fixed order and the first non-null
/// one wins: boolean, integer (parsed with `int.parse`), double, string,
/// timestamp (parsed with `DateTime.parse`), blob (wrapped in a
/// [datastore.BlobValue]), key, array. Entity and geo-point values throw an
/// [UnsupportedError]. If no field is set, `null` is returned.
static dynamic _convertApi2DatastoreProperty(api.Value value) {
if (value.booleanValue != null)
return value.booleanValue;
else if (value.integerValue != null)
return int.parse(value.integerValue);
else if (value.doubleValue != null)
return value.doubleValue;
else if (value.stringValue != null)
return value.stringValue;
else if (value.timestampValue != null)
return DateTime.parse(value.timestampValue);
else if (value.blobValue != null)
return datastore.BlobValue(value.blobValueAsBytes);
else if (value.keyValue != null)
return _convertApi2DatastoreKey(value.keyValue);
else if (value.arrayValue != null && value.arrayValue.values != null)
// NOTE(review): the rest of this return expression was lost during
// extraction.
return value.arrayValue.values
else if (value.entityValue != null)
throw UnsupportedError('Entity values are not supported.');
else if (value.geoPointValue != null)
throw UnsupportedError('GeoPoint values are not supported.');
// None of the probed fields is set.
return null;
/// Converts an [api.Entity] received from the server into a
/// [datastore.Entity].
///
/// Each property value is converted with [_convertApi2DatastoreProperty].
/// The result is built from the converted key, the converted properties and
/// the `unindexedProperties` set.
// NOTE(review): the property iteration and the statement guarded by the
// `excludeFromIndexes` check lost tokens during extraction; presumably the
// property name is added to `unindexedProperties` there — confirm against
// the upstream source.
static datastore.Entity _convertApi2DatastoreEntity(api.Entity entity) {
var unindexedProperties = Set<String>();
var properties = <String, Object>{};
if ( != null) { name, api.Value value) {
properties[name] = _convertApi2DatastoreProperty(value);
if (value.excludeFromIndexes != null && value.excludeFromIndexes) {
return datastore.Entity(_convertApi2DatastoreKey(entity.key), properties,
unIndexedProperties: unindexedProperties);
/// Converts a [datastore.Entity] into an [api.Entity].
///
/// The key is converted with [_convertDatastore2ApiKey], forwarding
/// [enforceId]. Each property value is converted with
/// [_convertDatastore2ApiPropertyValue].
///
/// A property is marked as indexed only when `entity.unIndexedProperties`
/// is non-null and does not contain its name; when that set is `null`,
/// `indexed` stays `false` for every property.
// NOTE(review): the receivers of the property-map accesses below were lost
// during extraction.
api.Entity _convertDatastore2ApiEntity(datastore.Entity entity,
{bool enforceId = false}) {
var apiEntity = api.Entity();
apiEntity.key = _convertDatastore2ApiKey(entity.key, enforceId: enforceId); = {};
if ( != null) {
for (var key in {
var value =[key];
bool indexed = false;
if (entity.unIndexedProperties != null) {
indexed = !entity.unIndexedProperties.contains(key);
var property = _convertDatastore2ApiPropertyValue(value, indexed);[key] = property;
return apiEntity;
/// Maps each [datastore.FilterRelation] to the operator string that
/// [_convertDatastore2ApiFilter] assigns to `PropertyFilter.op`.
// NOTE(review): the closing line of this literal is not visible in this
// view.
static Map<datastore.FilterRelation, String> relationMapping = const {
datastore.FilterRelation.LessThan: 'LESS_THAN',
datastore.FilterRelation.LessThanOrEqual: 'LESS_THAN_OR_EQUAL',
datastore.FilterRelation.Equal: 'EQUAL',
datastore.FilterRelation.GreatherThan: 'GREATER_THAN',
datastore.FilterRelation.GreatherThanOrEqual: 'GREATER_THAN_OR_EQUAL',
/// Converts a single [datastore.Filter] into an [api.Filter] that wraps a
/// property filter.
///
/// The operator is looked up in [relationMapping]; a relation without an
/// entry throws an [ArgumentError]. The filter value is converted with
/// `indexed` set to `true` and lists disallowed.
// NOTE(review): the property-reference assignment below lost tokens during
// extraction.
api.Filter _convertDatastore2ApiFilter(datastore.Filter filter) {
var pf = api.PropertyFilter();
var operator = relationMapping[filter.relation];
if (operator == null) {
throw ArgumentError('Unknown filter relation: ${filter.relation}.');
pf.op = operator; = api.PropertyReference() = ;
pf.value =
_convertDatastore2ApiPropertyValue(filter.value, true, lists: false);
return api.Filter()..propertyFilter = pf;
/// Builds an [api.Filter] whose property filter uses the `HAS_ANCESTOR`
/// operator with [key] as its value.
///
/// The key is converted with `enforceId: true`.
// NOTE(review): the property-reference assignment below (involving
// `'__key__'`) lost tokens during extraction.
api.Filter _convertDatastoreAncestorKey2ApiFilter(datastore.Key key) {
var pf = api.PropertyFilter();
pf.op = 'HAS_ANCESTOR'; = api.PropertyReference() = '__key__';
pf.value = api.Value()
..keyValue = _convertDatastore2ApiKey(key, enforceId: true);
return api.Filter()..propertyFilter = pf;
/// Combines [filters] and an optional [ancestorKey] into one composite
/// [api.Filter] with operator `AND`.
///
/// Returns `null` when there are no filters (null or empty list) and no
/// ancestor key.
// NOTE(review): the expression assigned to `compFilter.filters` and the
// body of the `else` branch are not visible in this view; presumably the
// ancestor filter is appended to the existing list there — confirm.
api.Filter _convertDatastore2ApiFilters(
List<datastore.Filter> filters, datastore.Key ancestorKey) {
if ((filters == null || filters.isEmpty) && ancestorKey == null) {
return null;
var compFilter = api.CompositeFilter();
if (filters != null) {
compFilter.filters =;
if (ancestorKey != null) {
var filter = _convertDatastoreAncestorKey2ApiFilter(ancestorKey);
if (compFilter.filters == null) {
// No property filters were given: the ancestor filter is the only one.
compFilter.filters = [filter];
} else {
compFilter.op = 'AND';
return api.Filter()..compositeFilter = compFilter;
/// Converts a [datastore.Order] into an [api.PropertyOrder].
///
/// The direction depends on whether `order.direction` equals
/// [datastore.OrderDirection.Ascending].
// NOTE(review): the property-name assignment, the two direction values and
// the final cascade lost tokens during extraction.
api.PropertyOrder _convertDatastore2ApiOrder(datastore.Order order) {
var property = api.PropertyReference() = order.propertyName;
var direction = order.direction == datastore.OrderDirection.Ascending
return api.PropertyOrder()
..direction = direction = property;
/// Converts a list of [datastore.Order]s into API property orders.
///
/// Returns `null` when [orders] is `null`.
// NOTE(review): the non-null branch is not visible in this view.
List<api.PropertyOrder> _convertDatastore2ApiOrders(
List<datastore.Order> orders) {
if (orders == null) return null;
/// Translates a failed API call into a failed future carrying the matching
/// datastore error.
///
/// An [api.DetailedApiRequestError] is mapped by HTTP status:
/// 400 becomes a [datastore.ApplicationError] with the original message,
/// 409 becomes a [datastore.TransactionAbortedError] and 412 becomes a
/// [datastore.NeedIndexError]. Everything else is passed through unchanged.
/// The original [stack] is always preserved.
static Future<Null> _handleError(error, StackTrace stack) {
  if (error is api.DetailedApiRequestError) {
    switch (error.status) {
      case 400:
        return Future.error(datastore.ApplicationError(error.message), stack);
      case 409:
        // NOTE: This is reported as:
        // "too much contention on these datastore entities"
        // TODO:
        return Future.error(datastore.TransactionAbortedError(), stack);
      case 412:
        return Future.error(datastore.NeedIndexError(), stack);
    }
  }
  // Unknown error type or unmapped status: forward as-is.
  return Future.error(error, stack);
}
/// Sends an allocate-ids request for [keys] to the project.
///
/// The keys are converted with `enforceId: false`. A failure of the API
/// call is routed through [_handleError].
// NOTE(review): the key iteration and the body of the response callback
// are not visible in this view.
Future<List<datastore.Key>> allocateIds(List<datastore.Key> keys) {
var request = api.AllocateIdsRequest();
..keys = {
return _convertDatastore2ApiKey(key, enforceId: false);
return _api.projects.allocateIds(request, _project).then((response) {
}, onError: _handleError);
/// Starts a new transaction and completes with a [TransactionImpl] wrapping
/// the transaction handle returned by the server.
///
/// [crossEntityGroup] is accepted but not used when building the request.
/// A failure of the API call is routed through [_handleError].
Future<datastore.Transaction> beginTransaction(
    {bool crossEntityGroup = false}) {
  final pending =
      _api.projects.beginTransaction(api.BeginTransactionRequest(), _project);
  return pending.then((response) => TransactionImpl(response.transaction),
      onError: _handleError);
}
/// Commits a batch of mutations, optionally inside [transaction].
///
/// The request mode is `TRANSACTIONAL` when [transaction] is given and
/// `NON_TRANSACTIONAL` otherwise. Mutations are built in this order:
/// [inserts] as upserts (keys converted with `enforceId: true`),
/// [autoIdInserts] as inserts (`enforceId: false`), then [deletes].
///
/// When [autoIdInserts] is non-empty, keys are read back from the mutation
/// results and passed to the returned [datastore.CommitResult]; otherwise
/// the result is built with `null` keys. A failure of the API call is
/// routed through [_handleError].
// NOTE(review): the statements that append each mutation and the selection
// of the auto-id slice of `mutationResults` lost tokens during extraction.
Future<datastore.CommitResult> commit(
{List<datastore.Entity> inserts,
List<datastore.Entity> autoIdInserts,
List<datastore.Key> deletes,
datastore.Transaction transaction}) {
var request = api.CommitRequest();
if (transaction != null) {
request.mode = 'TRANSACTIONAL';
request.transaction = (transaction as TransactionImpl).data;
} else {
request.mode = 'NON_TRANSACTIONAL';
var mutations = request.mutations = <api.Mutation>[];
if (inserts != null) {
for (int i = 0; i < inserts.length; i++) {
..upsert = _convertDatastore2ApiEntity(inserts[i], enforceId: true));
// Index of the first auto-id mutation within `mutations`; -1 when there
// are no auto-id inserts.
int autoIdStartIndex = -1;
if (autoIdInserts != null) {
autoIdStartIndex = mutations.length;
for (int i = 0; i < autoIdInserts.length; i++) {
..insert =
_convertDatastore2ApiEntity(autoIdInserts[i], enforceId: false));
if (deletes != null) {
for (int i = 0; i < deletes.length; i++) {
..delete = _convertDatastore2ApiKey(deletes[i], enforceId: true));
return _api.projects.commit(request, _project).then((result) {
List<datastore.Key> keys;
if (autoIdInserts != null && autoIdInserts.isNotEmpty) {
List<api.MutationResult> mutationResults = result.mutationResults;
// The response must contain a result for every auto-id insert.
assert(autoIdStartIndex != -1);
assert(mutationResults.length >=
(autoIdStartIndex + autoIdInserts.length));
keys = mutationResults
(api.MutationResult r) => _convertApi2DatastoreKey(r.key))
return datastore.CommitResult(keys);
}, onError: _handleError);
/// Looks up the entities for [keys], optionally inside [transaction].
///
/// The keys are converted with `enforceId: true`. The returned list has one
/// slot per requested key, in request order: the converted entity when the
/// key appears in the response's `found` list, or `null` when it appears in
/// `missing`.
///
/// Throws a [datastore.DatastoreError] when the response contains deferred
/// keys or when a requested key is in neither `found` nor `missing`. A
/// failure of the API call is routed through [_handleError].
// NOTE(review): the key iteration that builds `apiKeys` and the tail of the
// deferred-keys error message are not visible in this view.
Future<List<datastore.Entity>> lookup(List<datastore.Key> keys,
{datastore.Transaction transaction}) {
var apiKeys = {
return _convertDatastore2ApiKey(key, enforceId: true);
var request = api.LookupRequest();
request.keys = apiKeys;
if (transaction != null) {
// TODO: Make readOptions more configurable.
request.readOptions = api.ReadOptions();
request.readOptions.transaction = (transaction as TransactionImpl).data;
return _api.projects.lookup(request, _project).then((response) {
if (response.deferred != null && response.deferred.isNotEmpty) {
throw datastore.DatastoreError(
'Could not successfully look up all keys due to resource '
// NOTE: This is worst-case O(n^2)!
// Maybe we can optimize this somehow. But the API says:
// message LookupResponse {
// // The order of results in these fields is undefined and has no relation to
// // the order of the keys in the input.
// // Entities found as ResultType.FULL entities.
// repeated EntityResult found = 1;
// // Entities not found as ResultType.KEY_ONLY entities.
// repeated EntityResult missing = 2;
// // A list of keys that were not looked up due to resource constraints.
// repeated Key deferred = 3;
// }
// One slot per requested key, filled in request order.
var entities = List<datastore.Entity>(apiKeys.length);
for (int i = 0; i < apiKeys.length; i++) {
var apiKey = apiKeys[i];
bool found = false;
// First try to match the key against the entities the server found.
if (response.found != null) {
for (var result in response.found) {
if (_compareApiKey(apiKey, result.entity.key)) {
entities[i] = _convertApi2DatastoreEntity(result.entity);
found = true;
if (found) continue;
// Otherwise the key must be reported as missing.
if (response.missing != null) {
for (var result in response.missing) {
if (_compareApiKey(apiKey, result.entity.key)) {
entities[i] = null;
found = true;
if (!found) {
throw datastore.DatastoreError('Invalid server response: '
'Tried to lookup ${apiKey.toJson()} but entity was neither in '
'missing nor in found.');
return entities;
}, onError: _handleError);
/// Runs [query] and returns the first page of results.
///
/// The API query is built from the query's filters, ancestor key, orders
/// and offset, plus its kind when one is set. [transaction], if given, is
/// put into the request's read options; [partition], if given, supplies the
/// namespace of the request's partition id. The query's limit is handed to
/// [QueryPageImpl.runQuery] instead of being set on the request here.
// NOTE(review): the kind-expression assignment lost tokens during
// extraction, and the final statement is truncated in this view.
Future<Page<datastore.Entity>> query(datastore.Query query,
{datastore.Partition partition, datastore.Transaction transaction}) {
// NOTE: We explicitly do not set 'limit' here, since this is handled by
// QueryPageImpl.runQuery.
var apiQuery = api.Query()
..filter = _convertDatastore2ApiFilters(query.filters, query.ancestorKey)
..order = _convertDatastore2ApiOrders(query.orders)
..offset = query.offset;
if (query.kind != null) {
apiQuery.kind = [api.KindExpression() = query.kind];
var request = api.RunQueryRequest();
request.query = apiQuery;
if (transaction != null) {
// TODO: Make readOptions more configurable.
request.readOptions = api.ReadOptions();
request.readOptions.transaction = (transaction as TransactionImpl).data;
if (partition != null) {
request.partitionId = api.PartitionId()
..namespaceId = partition.namespace;
return QueryPageImpl.runQuery(_api, _project, request, query.limit)
/// Rolls back [transaction].
///
/// [transaction] must be a [TransactionImpl]; its handle is sent in the
/// rollback request. A failure of the API call is routed through
/// [_handleError].
Future rollback(datastore.Transaction transaction) {
  // TODO: Handle [transaction]
  final request = api.RollbackRequest();
  request.transaction = (transaction as TransactionImpl).data;
  final pending = _api.projects.rollback(request, _project);
  return pending.catchError(_handleError);
}
/// One page of query results, holding the request needed to fetch the page
/// after it.
class QueryPageImpl implements Page<datastore.Entity> {
// NOTE(review): no use of this constant is visible in this view;
// presumably it is the default batch limit in [runQuery] — confirm.
static const int MAX_ENTITIES_PER_RESPONSE = 2000;
/// API client used to run the follow-up query.
final api.DatastoreApi _api;
/// Project id the query is run against.
final String _project;
/// Request that is passed to [runQuery] to fetch the next page.
final api.RunQueryRequest _nextRequest;
/// Entities of this page.
final List<datastore.Entity> _entities;
/// Whether this is the final page.
final bool _isLast;
// This might be `null` in which case we request as many as we can get.
final int _remainingNumberOfEntities;
/// Creates a page from already-fetched state; pages are produced by
/// [runQuery].
QueryPageImpl(this._api, this._project, this._nextRequest, this._entities,
this._isLast, this._remainingNumberOfEntities);
/// Runs [request] against [project] and wraps the response batch in a
/// [QueryPageImpl].
///
/// [limit] is the number of entities the caller still wants (`null` for no
/// limit); the per-request limit written to `request.query.limit` is
/// derived from [batchSize] and [limit].
///
/// The response is validated and a [datastore.DatastoreError] is thrown
/// when the server skipped a different number of results than the
/// requested offset, returned more entities than [limit], signalled
/// `MORE_RESULTS_AFTER_LIMIT` while returning fewer entities than the batch
/// limit, or omitted the end cursor although the query is not done.
///
/// For a non-final page, [request] is mutated in place (offset reset to 0,
/// start cursor set to the batch's end cursor) and reused as the next
/// page's request.
// NOTE(review): several lines of this method are missing from this view:
// the default assigned to `batchLimit` when `batchSize` is null (as shown,
// `batchLimit` would still be null in the comparison that follows), the
// tail of the entity conversion chain, the tail of the skipped-results
// error message, and the end of the `then` call.
static Future<QueryPageImpl> runQuery(api.DatastoreApi api, String project,
api.RunQueryRequest request, int limit,
{int batchSize}) {
int batchLimit = batchSize;
if (batchLimit == null) {
if (limit != null && limit < batchLimit) {
batchLimit = limit;
request.query.limit = batchLimit;
return api.projects.runQuery(request, project).then((response) {
var returnedEntities = const <datastore.Entity>[];
var batch = response.batch;
if (batch.entityResults != null) {
returnedEntities = batch.entityResults
.map((result) => result.entity)
// This check is only necessary for the first request/response pair
// (if offset was supplied).
if (request.query.offset != null &&
request.query.offset > 0 &&
request.query.offset != response.batch.skippedResults) {
throw datastore.DatastoreError(
'Server did not skip over the specified ${request.query.offset} '
if (limit != null && returnedEntities.length > limit) {
throw datastore.DatastoreError(
'Server returned more entities then the limit for the request'
'(${request.query.limit}) was.');
// FIXME: TODO: Big hack!
// It looks like Apiary/Atlas is currently broken.
if (limit != null &&
returnedEntities.length < batchLimit &&
response.batch.moreResults == 'MORE_RESULTS_AFTER_LIMIT') {
throw new datastore.DatastoreError(
'Server returned response with less entities then the limit was, '
'but signals there are more results after the limit.');
// In case a limit was specified, we need to subtract the number of
// entities we already got.
// (the checks above guarantee that this subtraction is >= 0).
int remainingEntities;
if (limit != null) {
remainingEntities = limit - returnedEntities.length;
// If the server signals there are more entities and we either have no
// limit or our limit has not been reached, we set `moreBatches` to
// `true`.
bool moreBatches = (remainingEntities == null || remainingEntities > 0) &&
response.batch.moreResults == 'MORE_RESULTS_AFTER_LIMIT';
bool gotAll = limit != null && remainingEntities == 0;
bool noMore = response.batch.moreResults == 'NO_MORE_RESULTS';
bool isLast = gotAll || noMore;
// As a sanity check, we assert that `moreBatches XOR isLast`.
assert(isLast != moreBatches);
// FIXME: TODO: Big hack!
// It looks like Apiary/Atlas is currently broken.
if (moreBatches && returnedEntities.isEmpty) {
print('Warning: Api to Google Cloud Datastore returned bogus response. '
'Trying a workaround.');
isLast = true;
moreBatches = false;
if (!isLast && response.batch.endCursor == null) {
throw datastore.DatastoreError(
'Server did not supply an end cursor, even though the query '
'is not done.');
if (isLast) {
return QueryPageImpl(
api, project, request, returnedEntities, true, null);
} else {
// NOTE: We reuse the old RunQueryRequest object here.
// The offset will be 0 from now on, since the first request will have
// skipped over the first `offset` results.
request.query.offset = 0;
// Furthermore we set the startCursor to the endCursor of the previous
// result batch, so we can continue where we left off.
request.query.startCursor = batch.endCursor;
return QueryPageImpl(
api, project, request, returnedEntities, false, remainingEntities);
/// Whether this is the final page of the query.
bool get isLast {
  return _isLast;
}
/// The entities contained in this page.
List<datastore.Entity> get items {
  return _entities;
}
Future<Page<datastore.Entity>> next({int pageSize}) {
// NOTE: We do not respect [pageSize] here, the only mechanism we can
// really use is `query.limit`, but this is user-specified when making
// the query.
if (isLast) {
return Future.sync(() {
throw ArgumentError('Cannot call next() on last page.');
return QueryPageImpl.runQuery(
_api, _project, _nextRequest, _remainingNumberOfEntities)