[ledger] Move to new API.

Change-Id: Id61bb87042b8c3c2ee2bac58b481d4dc1aac5990
diff --git a/examples/ledger/todo_list/lib/src/models/todo_list_model.dart b/examples/ledger/todo_list/lib/src/models/todo_list_model.dart
index f617314..eee3b2a 100644
--- a/examples/ledger/todo_list/lib/src/models/todo_list_model.dart
+++ b/examples/ledger/todo_list/lib/src/models/todo_list_model.dart
@@ -45,12 +45,11 @@
     await _ledger.getRootPage(_page.ctrl.request());
 
     final snapshot = ledger.PageSnapshotProxy();
-    final status = await _page.getSnapshot(
+    await _page.getSnapshotNew(
       snapshot.ctrl.request(),
       new Uint8List(0),
       _pageWatcherBinding.wrap(_PageWatcher(this)),
     );
-    validateLedgerResponse(status, 'Watch');
     _readItems(snapshot);
   }
 
@@ -63,14 +62,12 @@
 
   /// Marks the item of the given [id] as done.
   void markItemDone(List<int> id) {
-    _page.delete(id).then((status) => validateLedgerResponse(status, 'Delete'));
+    _page.deleteNew(id);
   }
 
   /// Adds a new todo item with the given [content].
   void addItem(String content) {
-    _page
-        .put(_makeKey(), utf8.encode(content))
-        .then((status) => validateLedgerResponse(status, 'Put'));
+    _page.putNew(_makeKey(), utf8.encode(content));
   }
 
   void _readItems(ledger.PageSnapshotProxy snapshot) {
diff --git a/public/dart/sledge/lib/src/document/document.dart b/public/dart/sledge/lib/src/document/document.dart
index 315cb15..45926e2 100644
--- a/public/dart/sledge/lib/src/document/document.dart
+++ b/public/dart/sledge/lib/src/document/document.dart
@@ -117,10 +117,12 @@
 
   /// Applies [change] to fields of this document.
   void applyChange(final Change change) {
-    Map<Uint8List, Change> splittedChanges =
-        change.splitByPrefix(_identifierLength);
-    for (final splittedChange in splittedChanges.entries) {
-      _fields[splittedChange.key].applyChange(splittedChange.value);
+    if (_state == DocumentState.available) {
+      Map<Uint8List, Change> splittedChanges =
+          change.splitByPrefix(_identifierLength);
+      for (final splittedChange in splittedChanges.entries) {
+        _fields[splittedChange.key].applyChange(splittedChange.value);
+      }
     }
     _changeController.add(null);
   }
diff --git a/public/dart/sledge/lib/src/sledge.dart b/public/dart/sledge/lib/src/sledge.dart
index e9fb5ac..ac41193 100644
--- a/public/dart/sledge/lib/src/sledge.dart
+++ b/public/dart/sledge/lib/src/sledge.dart
@@ -40,12 +40,10 @@
   // The factories used for fake object injection.
   final LedgerObjectsFactory _ledgerObjectsFactory;
 
-  // Contains the status of the initialization.
-  // ignore: unused_field
-  Future<bool> _initializationSucceeded;
-
   ModificationQueue _modificationQueue;
 
+  Subscription _subscribtion;
+
   /// Default constructor.
   factory Sledge(ComponentContext componentContext, [SledgePageId pageId]) {
     fidl.InterfacePair<ledger.Ledger> ledgerPair = new fidl.InterfacePair();
@@ -65,20 +63,9 @@
     // 2/ Setting a conflict resolver on the LedgerProxy (not yet implemented).
     // 3/ Obtaining a LedgerPageProxy using the LedgerProxy.
     // 4/ Subscribing for change notifications on the LedgerPageProxy.
-    // Any of these steps can fail.
-    //
-    // The following Completer is completed with `false` if an error occurs at
-    // any step. It is completed with `true` if the 4th step finishes
-    // succesfully.
-    //
-    // Operations that require the succesfull initialization of the Sledge
-    // instance await the Future returned by this completer.
-    Completer<bool> initializationCompleter = new Completer<bool>();
 
     _ledgerProxy.ctrl.onConnectionError = () {
-      if (!initializationCompleter.isCompleted) {
-        initializationCompleter.complete(false);
-      }
+      // TODO(jif): Handle disconnection from the Ledger.
     };
 
     _ledgerProxy.ctrl.bind(ledgerHandle);
@@ -86,8 +73,7 @@
     _ledgerProxy.getPage(pageId.id, _pageProxy.ctrl.request());
     _modificationQueue =
         new ModificationQueue(this, _ledgerObjectsFactory, _pageProxy);
-    _subscribe(initializationCompleter);
-    _initializationSucceeded = initializationCompleter.future;
+    _subscribtion = _subscribe();
   }
 
   /// Constructor that takes a new-style binding of ComponentContext
@@ -108,11 +94,9 @@
 
   /// Convenience constructor for tests.
   Sledge.testing(this._pageProxy, this._ledgerObjectsFactory) {
-    Completer<bool> initializationCompleter = new Completer<bool>();
     _modificationQueue =
         new ModificationQueue(this, _ledgerObjectsFactory, _pageProxy);
-    _subscribe(initializationCompleter);
-    _initializationSucceeded = initializationCompleter.future;
+    _subscribtion = _subscribe();
   }
 
   /// Convenience constructor for integration tests.
@@ -122,6 +106,7 @@
 
   /// Closes connection to ledger.
   void close() {
+    _subscribtion.unsubscribe();
     _pageProxy.ctrl.close();
     _ledgerProxy.ctrl.close();
   }
@@ -130,11 +115,7 @@
   /// Returns false if an error occurred and the modification couldn't be
   /// committed.
   /// Returns true otherwise.
-  Future<bool> runInTransaction(Modification modification) async {
-    bool initializationSucceeded = await _initializationSucceeded;
-    if (!initializationSucceeded) {
-      return false;
-    }
+  Future<bool> runInTransaction(Modification modification) {
     return _modificationQueue.queueModification(modification);
   }
 
@@ -211,11 +192,10 @@
   }
 
   /// Subscribes for page.onChange to perform applyChange.
-  Subscription _subscribe(Completer<bool> subscriptionCompleter) {
+  Subscription _subscribe() {
     assert(_modificationQueue.currentTransaction == null,
         '`_subscribe` must be called before any transaction can start.');
-    return new Subscription(
-        _pageProxy, _ledgerObjectsFactory, _applyChange, subscriptionCompleter);
+    return new Subscription(_pageProxy, _ledgerObjectsFactory, _applyChange);
   }
 
   void _verifyThatTransactionHasStarted() {
diff --git a/public/dart/sledge/lib/src/storage/document_storage.dart b/public/dart/sledge/lib/src/storage/document_storage.dart
index f68a708..0cf1bce 100644
--- a/public/dart/sledge/lib/src/storage/document_storage.dart
+++ b/public/dart/sledge/lib/src/storage/document_storage.dart
@@ -28,68 +28,51 @@
 
 /// Stores [document] into [page].
 /// [document] must not be deleted.
-List<Future<ledger.Status>> saveDocumentToPage(
-    Document document, ledger.Page page) {
+void saveDocumentToPage(Document document, ledger.Page page) {
   assert(document.state == DocumentState.available);
-  final updateLedgerFutures = <Future<ledger.Status>>[];
 
   final Uint8List documentPrefix = _documentStorageKeyPrefix(document);
   final Change change = document.getChange();
 
   // Forward the "deletes".
   for (Uint8List deletedKey in change.deletedKeys) {
-    final completer = new Completer<ledger.Status>();
     final Uint8List keyWithDocumentPrefix =
         concatUint8Lists(documentPrefix, deletedKey);
-    page.delete(
+    page.deleteNew(
       keyWithDocumentPrefix,
-      completer.complete,
     );
-    updateLedgerFutures.add(completer.future);
   }
 
   // Forward the "puts".
   for (KeyValue kv in change.changedEntries) {
-    final completer = new Completer<ledger.Status>();
-
     final Uint8List keyWithDocumentPrefix =
         concatUint8Lists(documentPrefix, kv.key);
-    page.put(
+    page.putNew(
       keyWithDocumentPrefix,
       kv.value,
-      completer.complete,
     );
-    updateLedgerFutures.add(completer.future);
   }
-  return updateLedgerFutures;
 }
 
-List<Future<ledger.Status>> _deleteKeyValues(
-    List<KeyValue> keyValues, ledger.Page page) {
-  final updateLedgerFutures = <Future<ledger.Status>>[];
+void _deleteKeyValues(List<KeyValue> keyValues, ledger.Page page) {
   for (KeyValue kv in keyValues) {
-    final completer = new Completer<ledger.Status>();
-    page.delete(
+    page.deleteNew(
       kv.key,
-      completer.complete,
     );
-    updateLedgerFutures.add(completer.future);
   }
-  return updateLedgerFutures;
 }
 
 /// Deletes all the key-values storing [document] from [page] at the time
 /// [snapshot] was taken.
-Future<List<Future<ledger.Status>>> deleteDocumentFromPage(
+Future<void> deleteDocumentFromPage(
     Document document, ledger.Page page, ledger.PageSnapshot snapshot) {
   assert(document.state == DocumentState.pendingDeletion);
   final Uint8List documentPrefix = _documentStorageKeyPrefix(document);
   // TODO: Don't read the values from the snapshot as only the keys are needed.
   Future<List<KeyValue>> futureKeyValues =
       getEntriesFromSnapshotWithPrefix(snapshot, documentPrefix);
-  Future<List<Future<ledger.Status>>> futureList =
-      futureKeyValues.then((List<KeyValue> keyValues) {
-    return _deleteKeyValues(keyValues, page);
+  Future<void> futureList = futureKeyValues.then((List<KeyValue> keyValues) {
+    _deleteKeyValues(keyValues, page);
   });
   return futureList;
 }
diff --git a/public/dart/sledge/lib/src/storage/schema_storage.dart b/public/dart/sledge/lib/src/storage/schema_storage.dart
index 45f3827..1a7ea72 100644
--- a/public/dart/sledge/lib/src/storage/schema_storage.dart
+++ b/public/dart/sledge/lib/src/storage/schema_storage.dart
@@ -2,7 +2,6 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-import 'dart:async';
 import 'dart:convert';
 import 'dart:typed_data';
 
@@ -22,19 +21,15 @@
 }
 
 /// Stores [schema] into [page].
-Future<ledger.Status> saveSchemaToPage(Schema schema, ledger.Page page) {
-  final completer = new Completer<ledger.Status>();
-
+void saveSchemaToPage(Schema schema, ledger.Page page) {
   final Uint8List key = _schemaStorageKey(schema);
 
   String jsonString = json.encode(schema);
   final Uint8List value = getUint8ListFromString(jsonString);
   // TODO: handle the case where |value| is larger than the maximum allowed
   // size.
-  page.put(
+  page.putNew(
     key,
     value,
-    completer.complete,
   );
-  return completer.future;
 }
diff --git a/public/dart/sledge/lib/src/subscription/subscription.dart b/public/dart/sledge/lib/src/subscription/subscription.dart
index 7a04192..fd6594e 100644
--- a/public/dart/sledge/lib/src/subscription/subscription.dart
+++ b/public/dart/sledge/lib/src/subscription/subscription.dart
@@ -2,7 +2,6 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-import 'dart:async';
 import 'dart:typed_data';
 
 import 'package:fidl/fidl.dart';
@@ -22,26 +21,14 @@
   /// Register a watcher for Ledger page, which pass all changes to
   /// _applyChangeCallback.
   Subscription(this._pageProxy, LedgerObjectsFactory ledgerObjectsFactory,
-      this._applyChangeCallback, Completer<bool> subscriptionCompleter)
+      this._applyChangeCallback)
       : _snapshotProxy = ledgerObjectsFactory.newPageSnapshotProxy(),
         _pageWatcherBinding = ledgerObjectsFactory.newPageWatcherBinding() {
-    Completer<ledger.Status> completer = new Completer<ledger.Status>();
-    _pageProxy.getSnapshot(
+    _pageProxy.getSnapshotNew(
       _snapshotProxy.ctrl.request(),
       new Uint8List(0),
       _pageWatcherBinding.wrap(this),
-      completer.complete,
     );
-
-    completer.future.then((ledger.Status status) {
-      if (subscriptionCompleter.isCompleted) {
-        // If an error occurs, `subscriptionCompleter` may have been completed
-        // by the caller before `completer` has ran.
-        return;
-      }
-      bool subscriptionSuccesfull = status == ledger.Status.ok;
-      subscriptionCompleter.complete(subscriptionSuccesfull);
-    });
   }
 
   @override
@@ -59,7 +46,6 @@
     callback(null);
   }
 
-  // TODO: use it.
   /// Ends subscription.
   void unsubscribe() {
     _pageWatcherBinding?.close();
diff --git a/public/dart/sledge/lib/src/transaction.dart b/public/dart/sledge/lib/src/transaction.dart
index 0316fb6..ed91cfd 100644
--- a/public/dart/sledge/lib/src/transaction.dart
+++ b/public/dart/sledge/lib/src/transaction.dart
@@ -32,7 +32,6 @@
   final Sledge _sledge;
   final ledger.PageProxy _pageProxy;
   final ledger.PageSnapshotProxy _pageSnapshotProxy;
-  // TODO: close _pageSnapshotProxy
 
   /// Default constructor.
   Transaction(
@@ -42,82 +41,55 @@
   /// Runs [modification].
   Future<bool> saveModification(Modification modification) async {
     // Start Ledger transaction.
-    final startTransactionCompleter = new Completer<ledger.Status>();
-    _pageProxy.startTransaction(startTransactionCompleter.complete);
-    bool startTransactionOk =
-        (await startTransactionCompleter.future) == ledger.Status.ok;
-    if (!startTransactionOk) {
-      return false;
-    }
+    _pageProxy
+      ..startTransactionNew()
+      // Obtain the snapshot.
+      // All the read operations in |modification| will read from that snapshot.
+      ..getSnapshotNew(
+        _pageSnapshotProxy.ctrl.request(),
+        new Uint8List(0),
+        null,
+      );
 
-    // Obtain the snapshot.
-    // All the read operations in |modification| will read from that snapshot.
-    final snapshotCompleter = new Completer<ledger.Status>();
-    _pageProxy.getSnapshot(
-      _pageSnapshotProxy.ctrl.request(),
-      new Uint8List(0),
-      null,
-      snapshotCompleter.complete,
-    );
-    bool getSnapshotOk = (await snapshotCompleter.future) == ledger.Status.ok;
-    if (!getSnapshotOk) {
-      return false;
-    }
-
-    // Execute the modifications.
-    // The modifications may:
-    // - obtain a handle to a document, which would trigger a call to |getDocument|.
-    // - modify a document. This would result in |documentWasModified| being called.
     try {
-      await modification();
-    } on _RollbackException {
-      await _rollbackModification();
-      return false;
-      // ignore: avoid_catches_without_on_clauses
-    } catch (e) {
-      await _rollbackModification();
-      rethrow;
-    }
-
-    // Iterate through all the documents modified by this transaction and
-    // forward the updates (puts and deletes) to Ledger.
-    final updateLedgerFutures = <Future<ledger.Status>>[];
-    for (final document in _documents) {
-      if (document.state == DocumentState.available) {
-        updateLedgerFutures
-          ..addAll(saveDocumentToPage(document, _pageProxy))
-          ..add(saveSchemaToPage(document.documentId.schema, _pageProxy));
-      } else {
-        final futures = await deleteDocumentFromPage(
-            document, _pageProxy, _pageSnapshotProxy);
-        updateLedgerFutures.addAll(futures);
-      }
-    }
-
-    // Await until all updates have been succesfully executed.
-    // If some updates have failed, rollback.
-    final List<ledger.Status> statuses = await Future.wait(updateLedgerFutures);
-    for (final status in statuses) {
-      if (status != ledger.Status.ok) {
-        await _rollbackModification();
+      // Execute the modifications.
+      // The modifications may:
+      // - obtain a handle to a document, which would trigger a call to |getDocument|.
+      // - modify a document. This would result in |documentWasModified| being called.
+      try {
+        await modification();
+      } on _RollbackException {
+        _rollbackModification();
         return false;
+        // ignore: avoid_catches_without_on_clauses
+      } catch (e) {
+        _rollbackModification();
+        rethrow;
       }
-    }
 
-    // Finish the transaction by commiting. If the commit fails, rollback.
-    final commitCompleter = new Completer<ledger.Status>();
-    _pageProxy.commit(commitCompleter.complete);
-    bool commitOk = (await commitCompleter.future) == ledger.Status.ok;
-    if (!commitOk) {
-      await _rollbackModification();
-      return false;
-    }
+      // Iterate through all the documents modified by this transaction and
+      // forward the updates (puts and deletes) to Ledger.
+      for (final document in _documents) {
+        if (document.state == DocumentState.available) {
+          saveDocumentToPage(document, _pageProxy);
+          saveSchemaToPage(document.documentId.schema, _pageProxy);
+        } else {
+          await deleteDocumentFromPage(
+              document, _pageProxy, _pageSnapshotProxy);
+        }
+      }
 
-    // Notify the documents that the transaction has been completed.
-    _documents
-      ..forEach((Document document) => document.completeTransaction())
-      ..clear();
-    return true;
+      // Finish the transaction by commiting.
+      _pageProxy.commitNew();
+
+      // Notify the documents that the transaction has been completed.
+      _documents
+        ..forEach((Document document) => document.completeTransaction())
+        ..clear();
+      return true;
+    } finally {
+      _pageSnapshotProxy.ctrl.close();
+    }
   }
 
   /// Abort and rollback the transaction
@@ -220,15 +192,10 @@
   }
 
   /// Rollback the documents that were modified during the transaction.
-  Future _rollbackModification() async {
+  void _rollbackModification() {
     _documents
       ..forEach((Document document) => document.rollbackChange())
       ..clear();
-    final completer = new Completer<ledger.Status>();
-    _pageProxy.rollback(completer.complete);
-    bool commitOk = (await completer.future) == ledger.Status.ok;
-    if (!commitOk) {
-      throw new Exception('Transaction failed. Unable to rollback.');
-    }
+    _pageProxy.rollbackNew();
   }
 }
diff --git a/public/dart/sledge/test/fakes/fake_ledger_page.dart b/public/dart/sledge/test/fakes/fake_ledger_page.dart
index 8f9dbc2..79627b6 100644
--- a/public/dart/sledge/test/fakes/fake_ledger_page.dart
+++ b/public/dart/sledge/test/fakes/fake_ledger_page.dart
@@ -35,65 +35,42 @@
     _storageState = new StorageState(onChange);
   }
 
-  ledger.Status putStatus;
-  ledger.Status deleteStatus;
-  ledger.Status startTransactionStatus;
-  ledger.Status commitStatus;
-  ledger.Status rollbackStatus;
-  ledger.Status getSnapshotStatus;
-
-  void resetAllStatus() {
-    putStatus = null;
-    deleteStatus = null;
-    startTransactionStatus = null;
-    commitStatus = null;
-    rollbackStatus = null;
-    getSnapshotStatus = null;
-  }
-
   StorageState get storageState => _storageState;
 
   @override
-  void put(
-      Uint8List key, Uint8List value, void callback(ledger.Status status)) {
+  void putNew(Uint8List key, Uint8List value) {
     _modification.changedEntries.add(new KeyValue(key, value));
-    callback(putStatus ?? ledger.Status.ok);
   }
 
   @override
-  void delete(Uint8List key, void callback(ledger.Status status)) {
+  void deleteNew(Uint8List key) {
     _modification.deletedKeys.add(key);
-    callback(deleteStatus ?? ledger.Status.ok);
   }
 
   @override
-  void startTransaction(void callback(ledger.Status status)) {
+  void startTransactionNew() {
     assert(_modification.changedEntries.isEmpty);
     assert(_modification.deletedKeys.isEmpty);
-    callback(startTransactionStatus ?? ledger.Status.ok);
   }
 
   @override
-  void commit(void callback(ledger.Status status)) {
+  void commitNew() {
     _storageState.applyChange(_modification);
     onChange(_modification);
     _modification.clear();
-    callback(commitStatus ?? ledger.Status.ok);
   }
 
   @override
-  void rollback(void callback(ledger.Status status)) {
+  void rollbackNew() {
     _modification.clear();
-    callback(rollbackStatus ?? ledger.Status.ok);
   }
 
   @override
-  void getSnapshot(Object snapshotRequest, Uint8List keyPrefix, dynamic watcher,
-      void callback(ledger.Status status)) {
+  void getSnapshotNew(
+      Object snapshotRequest, Uint8List keyPrefix, dynamic watcher) {
     if (watcher != null) {
       _watcher = watcher;
     }
-    callback(getSnapshotStatus ?? ledger.Status.ok);
   }
 
   List<ledger.Entry> getEntries(Uint8List keyPrefix) =>
diff --git a/public/dart/sledge/test/integration_tests/sledge_integration_test.dart b/public/dart/sledge/test/integration_tests/sledge_integration_test.dart
index 955cfd5..6f8a054 100644
--- a/public/dart/sledge/test/integration_tests/sledge_integration_test.dart
+++ b/public/dart/sledge/test/integration_tests/sledge_integration_test.dart
@@ -69,7 +69,7 @@
     int someInteger;
     while (someInteger != 43) {
       await passiveSledge.runInTransaction(() async {
-        Document doc = await passiveSledge.getDocument(id);
+        Document doc = await Future(() => passiveSledge.getDocument(id));
         someInteger = doc['someInteger'].value;
       });
     }
diff --git a/public/dart/sledge/test/schema_test.dart b/public/dart/sledge/test/schema_test.dart
index 2731371..41dae49 100644
--- a/public/dart/sledge/test/schema_test.dart
+++ b/public/dart/sledge/test/schema_test.dart
@@ -7,7 +7,6 @@
 
 // TODO: investigate whether we can get rid of the implementation_imports.
 // ignore_for_file: implementation_imports
-import 'package:fidl_fuchsia_ledger/fidl.dart' as ledger;
 import 'package:lib.app.dart/logging.dart';
 import 'package:sledge/sledge.dart';
 import 'package:sledge/src/document/change.dart';
@@ -341,18 +340,17 @@
       expect(transactionSucceed, true);
       expect(doc['someInteger'].value, equals(14));
 
-      // Test case when commit fails.
-      sledge.fakeLedgerPage.commitStatus = ledger.Status.ioError;
+      // Test case when transaction fails.
       transactionSucceed = await sledge.runInTransaction(() async {
         doc['someBool'].value = true;
         doc['someInteger'].value = 42;
+        sledge.abortAndRollback();
       });
       expect(transactionSucceed, false);
       expect(doc['someBool'].value, equals(false));
       expect(doc['someInteger'].value, equals(14));
 
       // Check that after failed transaction we can get successful one.
-      sledge.fakeLedgerPage.resetAllStatus();
       transactionSucceed = await sledge.runInTransaction(() async {
         doc['someInteger'].value = 8;
       });
@@ -380,18 +378,16 @@
       expect(transactionSucceed, true);
       expect(doc['map'].length, equals(1));
 
-      // Test case when commit fails.
-      sledge.fakeLedgerPage.commitStatus = ledger.Status.ioError;
+      // Test case when transaction fails.
       transactionSucceed = await sledge.runInTransaction(() async {
         doc['map']['a'] = new Uint8List.fromList([4]);
         doc['map']['foo'] = new Uint8List.fromList([1, 3]);
+        sledge.abortAndRollback();
       });
       expect(transactionSucceed, false);
       expect(doc['map'].length, equals(1));
       expect(doc['map']['a'], equals([1, 2, 3]));
 
-      // Check that after failed transaction we can get successful one.
-      sledge.fakeLedgerPage.resetAllStatus();
       transactionSucceed = await sledge.runInTransaction(() async {
         doc['map']['foo'] = new Uint8List.fromList([1, 3]);
       });
diff --git a/public/dart/sledge/test/transaction_test.dart b/public/dart/sledge/test/transaction_test.dart
index bcefd31..2556149 100644
--- a/public/dart/sledge/test/transaction_test.dart
+++ b/public/dart/sledge/test/transaction_test.dart
@@ -34,7 +34,7 @@
     List<int> events = <int>[];
     // ignore: unawaited_futures
     sledge.runInTransaction(() async {
-      events.add(0);
+      await new Future(() => events.add(0));
     });
     expect(events, equals(<int>[]));
     await sledge.runInTransaction(() async {