// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of gcloud.db;

/// A function definition for transactional functions.
///
/// The function will be given a [Transaction] object which can be used to make
/// lookups/queries and queue modifications (inserts/updates/deletes).
typedef TransactionHandler = Future Function(Transaction transaction);

/// A datastore transaction.
///
/// It can be used for making lookups/queries and queue modifications
/// (inserts/updates/deletes). Finally the transaction can be either committed
/// or rolled back.
class Transaction {
  static const int _TRANSACTION_STARTED = 0;
  static const int _TRANSACTION_ROLLED_BACK = 1;
  static const int _TRANSACTION_COMMITTED = 2;

  final DatastoreDB db;
  final ds.Transaction _datastoreTransaction;

  final List<Model> _inserts = [];
  final List<Key> _deletes = [];

  int _transactionState = _TRANSACTION_STARTED;

  Transaction(this.db, this._datastoreTransaction);

  /// Looks up [keys] within this transaction.
  Future<List<T>> lookup<T extends Model>(List<Key> keys) {
    return _lookupHelper<T>(db, keys,
        datastoreTransaction: _datastoreTransaction);
  }

  /// Enqueues [inserts] and [deletes] which should be committed at commit time.
  void queueMutations({List<Model> inserts, List<Key> deletes}) {
    _checkSealed();
    if (inserts != null) {
      _inserts.addAll(inserts);
    }
    if (deletes != null) {
      _deletes.addAll(deletes);
    }
  }

  /// Query for [kind] models with [ancestorKey].
  ///
  /// Note that [ancestorKey] is required, since a transaction is not allowed to
  /// touch/look at an arbitrary number of rows.
  Query<T> query<T extends Model>(Key ancestorKey, {Partition partition}) {
    // TODO(#25): The `partition` element is redundant and should be removed.
    if (partition == null) {
      partition = ancestorKey.partition;
    } else if (ancestorKey.partition != partition) {
      throw ArgumentError(
          'Ancestor queries must have the same partition in the ancestor key '
          'as the partition where the query executes in.');
    }
    _checkSealed();
    return Query<T>(db,
        partition: partition,
        ancestorKey: ancestorKey,
        datastoreTransaction: _datastoreTransaction);
  }

  /// Rolls this transaction back.
  Future rollback() {
    _checkSealed(changeState: _TRANSACTION_ROLLED_BACK);
    return db.datastore.rollback(_datastoreTransaction);
  }

  /// Commits this transaction including all of the queued mutations.
  Future commit() {
    _checkSealed(changeState: _TRANSACTION_COMMITTED);
    return _commitHelper(db,
        inserts: _inserts,
        deletes: _deletes,
        datastoreTransaction: _datastoreTransaction);
  }

  _checkSealed({int changeState}) {
    if (_transactionState == _TRANSACTION_COMMITTED) {
      throw StateError('The transaction has already been committed.');
    } else if (_transactionState == _TRANSACTION_ROLLED_BACK) {
      throw StateError('The transaction has already been rolled back.');
    }
    if (changeState != null) {
      _transactionState = changeState;
    }
  }
}

class Query<T extends Model> {
  final _relationMapping = const <String, ds.FilterRelation>{
    '<': ds.FilterRelation.LessThan,
    '<=': ds.FilterRelation.LessThanOrEqual,
    '>': ds.FilterRelation.GreatherThan,
    '>=': ds.FilterRelation.GreatherThanOrEqual,
    '=': ds.FilterRelation.Equal,
  };

  final DatastoreDB _db;
  final ds.Transaction _transaction;
  final String _kind;

  final Partition _partition;
  final Key _ancestorKey;

  final List<ds.Filter> _filters = [];
  final List<ds.Order> _orders = [];
  int _offset;
  int _limit;

  Query(DatastoreDB dbImpl,
      {Partition partition,
      Key ancestorKey,
      ds.Transaction datastoreTransaction})
      : _db = dbImpl,
        _kind = dbImpl.modelDB.kindName(T),
        _partition = partition,
        _ancestorKey = ancestorKey,
        _transaction = datastoreTransaction;

  /// Adds a filter to this [Query].
  ///
  /// [filterString] has form "name OP" where 'name' is a fieldName of the
  /// model and OP is an operator. The following operators are supported:
  ///
  ///   * '<' (less than)
  ///   * '<=' (less than or equal)
  ///   * '>' (greater than)
  ///   * '>=' (greater than or equal)
  ///   * '=' (equal)
  ///
  /// [comparisonObject] is the object for comparison.
  void filter(String filterString, Object comparisonObject) {
    var parts = filterString.split(' ');
    if (parts.length != 2 || !_relationMapping.containsKey(parts[1])) {
      throw ArgumentError("Invalid filter string '$filterString'.");
    }

    var name = parts[0];
    var comparison = parts[1];
    var propertyName = _convertToDatastoreName(name);

    // This is for backwards compatibility: We allow [datastore.Key]s for now.
    // TODO: We should remove the condition in a major version update of
    // `package:gcloud`.
    if (comparisonObject is! ds.Key) {
      comparisonObject = _db.modelDB
          .toDatastoreValue(_kind, name, comparisonObject, forComparison: true);
    }
    _filters.add(ds.Filter(
        _relationMapping[comparison], propertyName, comparisonObject));
  }

  /// Adds an order to this [Query].
  ///
  /// [orderString] has the form "-name" where 'name' is a fieldName of the model
  /// and the optional '-' says whether the order is descending or ascending.
  void order(String orderString) {
    // TODO: validate [orderString] (e.g. is name valid)
    if (orderString.startsWith('-')) {
      _orders.add(ds.Order(ds.OrderDirection.Decending,
          _convertToDatastoreName(orderString.substring(1))));
    } else {
      _orders.add(ds.Order(
          ds.OrderDirection.Ascending, _convertToDatastoreName(orderString)));
    }
  }

  /// Sets the [offset] of this [Query].
  ///
  /// When running this query, [offset] results will be skipped.
  void offset(int offset) {
    _offset = offset;
  }

  /// Sets the [limit] of this [Query].
  ///
  /// When running this query, a maximum of [limit] results will be returned.
  void limit(int limit) {
    _limit = limit;
  }

  /// Execute this [Query] on the datastore.
  ///
  /// Outside of transactions this method might return stale data or may not
  /// return the newest updates performed on the datastore since updates
  /// will be reflected in the indices in an eventual consistent way.
  Stream<T> run() {
    ds.Key ancestorKey;
    if (_ancestorKey != null) {
      ancestorKey = _db.modelDB.toDatastoreKey(_ancestorKey);
    }
    var query = ds.Query(
        ancestorKey: ancestorKey,
        kind: _kind,
        filters: _filters,
        orders: _orders,
        offset: _offset,
        limit: _limit);

    ds.Partition partition;
    if (_partition != null) {
      partition = ds.Partition(_partition.namespace);
    }

    return StreamFromPages<ds.Entity>((int pageSize) {
      return _db.datastore
          .query(query, transaction: _transaction, partition: partition);
    }).stream.map<T>(_db.modelDB.fromDatastoreEntity);
  }

  // TODO:
  // - add runPaged() returning Page<Model>
  // - add run*() method once we have EntityResult{Entity,Cursor} in low-level
  //   API.

  String _convertToDatastoreName(String name) {
    var propertyName = _db.modelDB.fieldNameToPropertyName(_kind, name);
    if (propertyName == null) {
      throw ArgumentError("Field $name is not available for kind $_kind");
    }
    return propertyName;
  }
}

class DatastoreDB {
  final ds.Datastore datastore;
  final ModelDB _modelDB;
  Partition _defaultPartition;

  DatastoreDB(this.datastore, {ModelDB modelDB, Partition defaultPartition})
      : _modelDB = modelDB != null ? modelDB : ModelDBImpl() {
    _defaultPartition =
        defaultPartition != null ? defaultPartition : Partition(null);
  }

  /// The [ModelDB] used to serialize/deserialize objects.
  ModelDB get modelDB => _modelDB;

  /// Gets the empty key using the default [Partition].
  ///
  /// Model keys with parent set to [emptyKey] will create their own entity
  /// groups.
  Key get emptyKey => defaultPartition.emptyKey;

  /// Gets the default [Partition].
  Partition get defaultPartition => _defaultPartition;

  /// Creates a new [Partition] with namespace [namespace].
  Partition newPartition(String namespace) {
    return Partition(namespace);
  }

  /// Begins a new a new transaction.
  ///
  /// A transaction can touch only a limited number of entity groups. This limit
  /// is currently 5.
  // TODO: Add retries and/or auto commit/rollback.
  Future withTransaction(TransactionHandler transactionHandler) {
    return datastore
        .beginTransaction(crossEntityGroup: true)
        .then((datastoreTransaction) {
      var transaction = Transaction(this, datastoreTransaction);
      return transactionHandler(transaction);
    });
  }

  /// Build a query for [kind] models.
  Query<T> query<T extends Model>({Partition partition, Key ancestorKey}) {
    // TODO(#26): There is only one case where `partition` is not redundant
    // Namely if `ancestorKey == null` and `partition != null`. We could
    // say we get rid of `partition` and enforce `ancestorKey` to
    // be `Partition.emptyKey`?
    if (partition == null) {
      if (ancestorKey != null) {
        partition = ancestorKey.partition;
      } else {
        partition = defaultPartition;
      }
    } else if (ancestorKey != null && partition != ancestorKey.partition) {
      throw ArgumentError(
          'Ancestor queries must have the same partition in the ancestor key '
          'as the partition where the query executes in.');
    }
    return Query<T>(this, partition: partition, ancestorKey: ancestorKey);
  }

  /// Looks up [keys] in the datastore and returns a list of [Model] objects.
  ///
  /// For transactions, please use [beginTransaction] and call the [lookup]
  /// method on it's returned [Transaction] object.
  Future<List<T>> lookup<T extends Model>(List<Key> keys) {
    return _lookupHelper<T>(this, keys);
  }

  /// Add [inserts] to the datastore and remove [deletes] from it.
  ///
  /// The order of inserts and deletes is not specified. When the commit is done
  /// direct lookups will see the effect but non-ancestor queries will see the
  /// change in an eventual consistent way.
  ///
  /// For transactions, please use `beginTransaction` and it's returned
  /// [Transaction] object.
  Future commit({List<Model> inserts, List<Key> deletes}) {
    return _commitHelper(this, inserts: inserts, deletes: deletes);
  }
}

Future _commitHelper(DatastoreDB db,
    {List<Model> inserts,
    List<Key> deletes,
    ds.Transaction datastoreTransaction}) {
  List<ds.Entity> entityInserts, entityAutoIdInserts;
  List<ds.Key> entityDeletes;
  var autoIdModelInserts;
  if (inserts != null) {
    entityInserts = <ds.Entity>[];
    entityAutoIdInserts = <ds.Entity>[];
    autoIdModelInserts = <Model>[];

    for (var model in inserts) {
      // If parent was not explicitly set, we assume this model will map to
      // it's own entity group.
      if (model.parentKey == null) {
        model.parentKey = db.defaultPartition.emptyKey;
      }
      if (model.id == null) {
        autoIdModelInserts.add(model);
        entityAutoIdInserts.add(db.modelDB.toDatastoreEntity(model));
      } else {
        entityInserts.add(db.modelDB.toDatastoreEntity(model));
      }
    }
  }
  if (deletes != null) {
    entityDeletes = deletes.map(db.modelDB.toDatastoreKey).toList();
  }

  return db.datastore
      .commit(
          inserts: entityInserts,
          autoIdInserts: entityAutoIdInserts,
          deletes: entityDeletes,
          transaction: datastoreTransaction)
      .then((ds.CommitResult result) {
    if (entityAutoIdInserts != null && entityAutoIdInserts.isNotEmpty) {
      for (var i = 0; i < result.autoIdInsertKeys.length; i++) {
        var key = db.modelDB.fromDatastoreKey(result.autoIdInsertKeys[i]);
        autoIdModelInserts[i].parentKey = key.parent;
        autoIdModelInserts[i].id = key.id;
      }
    }
  });
}

Future<List<T>> _lookupHelper<T extends Model>(DatastoreDB db, List<Key> keys,
    {ds.Transaction datastoreTransaction}) {
  var entityKeys = keys.map(db.modelDB.toDatastoreKey).toList();
  return db.datastore
      .lookup(entityKeys, transaction: datastoreTransaction)
      .then((List<ds.Entity> entities) {
    return entities.map<T>(db.modelDB.fromDatastoreEntity).toList();
  });
}
