blob: 6f98ef06193c7858c22eb8a76d9f93a2340ec5ef [file] [log] [blame]
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of gcloud.db;
/// A function definition for transactional functions.
///
/// The function will be given a [Transaction] object which can be used to make
/// lookups/queries and queue modifications (inserts/updates/deletes).
///
/// The future returned by [DatastoreDB.withTransaction] completes with the
/// result of this function. [DatastoreDB.withTransaction] does not commit or
/// roll back the transaction itself, so ending the transaction via
/// [Transaction.commit] or [Transaction.rollback] is left to this function.
typedef TransactionHandler = Future Function(Transaction transaction);
/// A transaction on the datastore.
///
/// Lookups and queries are executed against the underlying datastore
/// transaction, while modifications (inserts/updates/deletes) are only queued
/// locally until [commit] is called.
///
/// A transaction is ended by either [commit] or [rollback]. Once ended,
/// [queueMutations], [query], [commit] and [rollback] throw a [StateError].
/// Note that [lookup] does not perform this check itself.
class Transaction {
  static const int _TRANSACTION_STARTED = 0;
  static const int _TRANSACTION_ROLLED_BACK = 1;
  static const int _TRANSACTION_COMMITTED = 2;

  /// The database through which this transaction performs its operations.
  final DatastoreDB db;

  final ds.Transaction _datastoreTransaction;

  // Mutations collected by [queueMutations]; handed to the datastore on
  // [commit].
  final List<Model> _inserts = [];
  final List<Key> _deletes = [];

  // One of the `_TRANSACTION_*` constants.
  int _transactionState = _TRANSACTION_STARTED;

  Transaction(this.db, this._datastoreTransaction);

  /// Fetches the models identified by [keys] as part of this transaction.
  Future<List<T>> lookup<T extends Model>(List<Key> keys) =>
      _lookupHelper<T>(db, keys, datastoreTransaction: _datastoreTransaction);

  /// Queues [inserts] and [deletes] so that they are applied by [commit].
  ///
  /// Either argument may be omitted. Throws a [StateError] if this transaction
  /// has already been committed or rolled back.
  void queueMutations({List<Model> inserts, List<Key> deletes}) {
    _checkSealed();
    _inserts.addAll(inserts ?? const <Model>[]);
    _deletes.addAll(deletes ?? const <Key>[]);
  }

  /// Creates a query for [T] models below [ancestorKey].
  ///
  /// [ancestorKey] is mandatory because a transaction is not allowed to
  /// touch/look at an arbitrary number of rows.
  ///
  /// If [partition] is given it has to be equal to the partition of
  /// [ancestorKey], otherwise an [ArgumentError] is thrown. Throws a
  /// [StateError] if this transaction has already been committed or rolled
  /// back.
  Query<T> query<T extends Model>(Key ancestorKey, {Partition partition}) {
    // TODO(#25): The `partition` element is redundant and should be removed.
    if (partition != null && ancestorKey.partition != partition) {
      throw ArgumentError(
          'Ancestor queries must have the same partition in the ancestor key '
          'as the partition where the query executes in.');
    }
    partition ??= ancestorKey.partition;

    // The argument check above intentionally runs before the state check.
    _checkSealed();
    return Query<T>(
      db,
      partition: partition,
      ancestorKey: ancestorKey,
      datastoreTransaction: _datastoreTransaction,
    );
  }

  /// Discards this transaction.
  ///
  /// The transaction is marked as rolled back before the request is sent to
  /// the datastore. Throws a [StateError] if it was already ended.
  Future rollback() {
    _checkSealed(changeState: _TRANSACTION_ROLLED_BACK);
    return db.datastore.rollback(_datastoreTransaction);
  }

  /// Commits this transaction together with every queued mutation.
  ///
  /// The transaction is marked as committed before the request is sent to the
  /// datastore. Throws a [StateError] if it was already ended.
  Future commit() {
    _checkSealed(changeState: _TRANSACTION_COMMITTED);
    return _commitHelper(
      db,
      inserts: _inserts,
      deletes: _deletes,
      datastoreTransaction: _datastoreTransaction,
    );
  }

  // Throws a [StateError] if this transaction was already committed or rolled
  // back. Otherwise moves it into [changeState], if one was given.
  _checkSealed({int changeState}) {
    switch (_transactionState) {
      case _TRANSACTION_COMMITTED:
        throw StateError('The transaction has already been committed.');
      case _TRANSACTION_ROLLED_BACK:
        throw StateError('The transaction has already been rolled back.');
    }
    _transactionState = changeState ?? _transactionState;
  }
}
/// A query for [T] models which is configured step by step and then executed
/// with [run].
///
/// Filters, orders, offset and limit are accumulated on this object through
/// [filter], [order], [offset] and [limit].
class Query<T extends Model> {
  // Maps the operator part of a filter string to the datastore relation.
  final _relationMapping = const <String, ds.FilterRelation>{
    '<': ds.FilterRelation.LessThan,
    '<=': ds.FilterRelation.LessThanOrEqual,
    '>': ds.FilterRelation.GreatherThan,
    '>=': ds.FilterRelation.GreatherThanOrEqual,
    '=': ds.FilterRelation.Equal,
  };

  final DatastoreDB _db;
  final ds.Transaction _transaction;
  final String _kind;

  final Partition _partition;
  final Key _ancestorKey;

  final List<ds.Filter> _filters = [];
  final List<ds.Order> _orders = [];
  int _offset;
  int _limit;

  /// Creates a query on [dbImpl] for the kind that its model database
  /// associates with [T].
  Query(DatastoreDB dbImpl,
      {Partition partition,
      Key ancestorKey,
      ds.Transaction datastoreTransaction})
      : _db = dbImpl,
        _kind = dbImpl.modelDB.kindName(T),
        _partition = partition,
        _ancestorKey = ancestorKey,
        _transaction = datastoreTransaction;

  /// Adds a filter to this [Query].
  ///
  /// [filterString] has the form "name OP", where 'name' is a field name of
  /// the model and OP is one of the following operators:
  ///
  ///   * '<' (less than)
  ///   * '<=' (less than or equal)
  ///   * '>' (greater than)
  ///   * '>=' (greater than or equal)
  ///   * '=' (equal)
  ///
  /// [comparisonObject] is the object to compare against.
  ///
  /// Throws an [ArgumentError] if [filterString] does not consist of exactly
  /// two space separated parts, if the operator is unknown or if the field is
  /// not available for the kind.
  void filter(String filterString, Object comparisonObject) {
    final tokens = filterString.split(' ');
    final relation = tokens.length == 2 ? _relationMapping[tokens[1]] : null;
    if (relation == null) {
      throw ArgumentError("Invalid filter string '$filterString'.");
    }

    final fieldName = tokens[0];
    final propertyName = _convertToDatastoreName(fieldName);

    // This is for backwards compatibility: We allow [datastore.Key]s for now.
    // TODO: We should remove the condition in a major version update of
    // `package:gcloud`.
    final value = comparisonObject is ds.Key
        ? comparisonObject
        : _db.modelDB.toDatastoreValue(_kind, fieldName, comparisonObject,
            forComparison: true);
    _filters.add(ds.Filter(relation, propertyName, value));
  }

  /// Adds an order to this [Query].
  ///
  /// [orderString] is a field name of the model, optionally prefixed with '-'.
  /// With the prefix the order is descending, without it ascending.
  ///
  /// Throws an [ArgumentError] if the field is not available for the kind.
  void order(String orderString) {
    // TODO: validate [orderString] (e.g. is name valid)
    final descending = orderString.startsWith('-');
    final fieldName = descending ? orderString.substring(1) : orderString;
    _orders.add(ds.Order(
        descending ? ds.OrderDirection.Decending : ds.OrderDirection.Ascending,
        _convertToDatastoreName(fieldName)));
  }

  /// Sets the [offset] of this [Query].
  ///
  /// When the query is run, the first [offset] results are skipped.
  void offset(int offset) {
    _offset = offset;
  }

  /// Sets the [limit] of this [Query].
  ///
  /// When the query is run, at most [limit] results are returned.
  void limit(int limit) {
    _limit = limit;
  }

  /// Executes this [Query] on the datastore.
  ///
  /// Outside of transactions this method might return stale data or may not
  /// return the newest updates performed on the datastore since updates
  /// will be reflected in the indices in an eventual consistent way.
  Stream<T> run() {
    final ancestorKey =
        _ancestorKey == null ? null : _db.modelDB.toDatastoreKey(_ancestorKey);
    final query = ds.Query(
      ancestorKey: ancestorKey,
      kind: _kind,
      filters: _filters,
      orders: _orders,
      offset: _offset,
      limit: _limit,
    );
    final partition =
        _partition == null ? null : ds.Partition(_partition.namespace);

    // The page size handed to the callback is not used.
    final pages = StreamFromPages<ds.Entity>((int pageSize) => _db.datastore
        .query(query, transaction: _transaction, partition: partition));
    return pages.stream.map<T>(_db.modelDB.fromDatastoreEntity);
  }

  // TODO:
  // - add runPaged() returning Page<Model>
  // - add run*() method once we have EntityResult{Entity,Cursor} in low-level
  //   API.

  // Translates the model field [name] into its datastore property name.
  // Throws an [ArgumentError] if the model database knows no such field.
  String _convertToDatastoreName(String name) {
    final propertyName = _db.modelDB.fieldNameToPropertyName(_kind, name);
    if (propertyName != null) return propertyName;
    throw ArgumentError("Field $name is not available for kind $_kind");
  }
}
/// Gives access to a datastore in terms of [Model] objects and [Key]s.
///
/// Models are converted to and from datastore entities with [modelDB].
class DatastoreDB {
  /// The underlying datastore all operations are sent to.
  final ds.Datastore datastore;

  final ModelDB _modelDB;
  final Partition _defaultPartition;

  /// Creates a database on top of [datastore].
  ///
  /// If [modelDB] is omitted a [ModelDBImpl] is used. If [defaultPartition] is
  /// omitted a [Partition] with a `null` namespace is used.
  DatastoreDB(this.datastore, {ModelDB modelDB, Partition defaultPartition})
      : _modelDB = modelDB ?? ModelDBImpl(),
        _defaultPartition = defaultPartition ?? Partition(null);

  /// The [ModelDB] used to serialize/deserialize objects.
  ModelDB get modelDB => _modelDB;

  /// Gets the empty key using the default [Partition].
  ///
  /// Model keys with parent set to [emptyKey] will create their own entity
  /// groups.
  Key get emptyKey => defaultPartition.emptyKey;

  /// Gets the default [Partition].
  Partition get defaultPartition => _defaultPartition;

  /// Creates a new [Partition] with namespace [namespace].
  Partition newPartition(String namespace) => Partition(namespace);

  /// Begins a new transaction and calls [transactionHandler] with it.
  ///
  /// The returned future completes with the result of [transactionHandler].
  /// The transaction is neither committed nor rolled back automatically.
  ///
  /// A transaction can touch only a limited number of entity groups. This limit
  /// is currently 5.
  // TODO: Add retries and/or auto commit/rollback.
  Future withTransaction(TransactionHandler transactionHandler) {
    return datastore
        .beginTransaction(crossEntityGroup: true)
        .then((datastoreTransaction) {
      var transaction = Transaction(this, datastoreTransaction);
      return transactionHandler(transaction);
    });
  }

  /// Builds a query for [T] models.
  ///
  /// If [partition] is omitted, the partition of [ancestorKey] is used, or
  /// [defaultPartition] if there is no [ancestorKey] either.
  ///
  /// Throws an [ArgumentError] if both [partition] and [ancestorKey] are given
  /// but [ancestorKey] belongs to a different partition.
  Query<T> query<T extends Model>({Partition partition, Key ancestorKey}) {
    // TODO(#26): There is only one case where `partition` is not redundant
    // Namely if `ancestorKey == null` and `partition != null`. We could
    // say we get rid of `partition` and enforce `ancestorKey` to
    // be `Partition.emptyKey`?
    if (partition == null) {
      partition =
          ancestorKey != null ? ancestorKey.partition : defaultPartition;
    } else if (ancestorKey != null && partition != ancestorKey.partition) {
      throw ArgumentError(
          'Ancestor queries must have the same partition in the ancestor key '
          'as the partition where the query executes in.');
    }
    return Query<T>(this, partition: partition, ancestorKey: ancestorKey);
  }

  /// Looks up [keys] in the datastore and returns a list of [Model] objects.
  ///
  /// For transactions, please use [withTransaction] and call the
  /// [Transaction.lookup] method on the [Transaction] object it provides.
  Future<List<T>> lookup<T extends Model>(List<Key> keys) {
    return _lookupHelper<T>(this, keys);
  }

  /// Adds [inserts] to the datastore and removes [deletes] from it.
  ///
  /// The order of inserts and deletes is not specified. When the commit is done
  /// direct lookups will see the effect but non-ancestor queries will see the
  /// change in an eventual consistent way.
  ///
  /// For transactions, please use [withTransaction] and the [Transaction]
  /// object it provides.
  Future commit({List<Model> inserts, List<Key> deletes}) {
    return _commitHelper(this, inserts: inserts, deletes: deletes);
  }
}
/// Commits [inserts] and [deletes] to the datastore of [db], optionally within
/// [datastoreTransaction].
///
/// Models without a parent key get the empty key of the default partition of
/// [db] assigned, i.e. the passed-in model objects are modified.
///
/// Models whose `id` is `null` are committed as auto-id inserts. Once the
/// commit has completed, their `parentKey` and `id` are updated from the keys
/// reported by the datastore.
///
/// If [inserts] (or [deletes]) is `null`, `null` is passed on to the datastore
/// for the corresponding lists.
Future _commitHelper(DatastoreDB db,
    {List<Model> inserts,
    List<Key> deletes,
    ds.Transaction datastoreTransaction}) {
  List<ds.Entity> entityInserts, entityAutoIdInserts;
  List<ds.Key> entityDeletes;

  // Kept in lockstep with `entityAutoIdInserts`, so that the i-th allocated
  // key can be written back to the i-th model after the commit.
  final autoIdModelInserts = <Model>[];

  if (inserts != null) {
    entityInserts = <ds.Entity>[];
    entityAutoIdInserts = <ds.Entity>[];

    for (var model in inserts) {
      // If parent was not explicitly set, we assume this model will map to
      // its own entity group.
      model.parentKey ??= db.defaultPartition.emptyKey;

      final entity = db.modelDB.toDatastoreEntity(model);
      if (model.id == null) {
        autoIdModelInserts.add(model);
        entityAutoIdInserts.add(entity);
      } else {
        entityInserts.add(entity);
      }
    }
  }

  if (deletes != null) {
    entityDeletes = deletes.map(db.modelDB.toDatastoreKey).toList();
  }

  return db.datastore
      .commit(
          inserts: entityInserts,
          autoIdInserts: entityAutoIdInserts,
          deletes: entityDeletes,
          transaction: datastoreTransaction)
      .then((ds.CommitResult result) {
    // Only touch `autoIdInsertKeys` if auto-id inserts were actually sent.
    if (autoIdModelInserts.isNotEmpty) {
      for (var i = 0; i < result.autoIdInsertKeys.length; i++) {
        var key = db.modelDB.fromDatastoreKey(result.autoIdInsertKeys[i]);
        autoIdModelInserts[i].parentKey = key.parent;
        autoIdModelInserts[i].id = key.id;
      }
    }
  });
}
/// Looks up [keys] in the datastore of [db], optionally within
/// [datastoreTransaction], and converts the resulting entities to [T] models
/// using the model database of [db].
Future<List<T>> _lookupHelper<T extends Model>(DatastoreDB db, List<Key> keys,
    {ds.Transaction datastoreTransaction}) {
  final toDatastoreKey = db.modelDB.toDatastoreKey;
  final datastoreKeys = <ds.Key>[];
  for (final key in keys) {
    datastoreKeys.add(toDatastoreKey(key));
  }

  return db.datastore
      .lookup(datastoreKeys, transaction: datastoreTransaction)
      .then((List<ds.Entity> found) =>
          found.map<T>(db.modelDB.fromDatastoreEntity).toList());
}