[ledger] Move to new API.
Change-Id: Id61bb87042b8c3c2ee2bac58b481d4dc1aac5990
diff --git a/examples/ledger/todo_list/lib/src/models/todo_list_model.dart b/examples/ledger/todo_list/lib/src/models/todo_list_model.dart
index f617314..eee3b2a 100644
--- a/examples/ledger/todo_list/lib/src/models/todo_list_model.dart
+++ b/examples/ledger/todo_list/lib/src/models/todo_list_model.dart
@@ -45,12 +45,11 @@
await _ledger.getRootPage(_page.ctrl.request());
final snapshot = ledger.PageSnapshotProxy();
- final status = await _page.getSnapshot(
+ await _page.getSnapshotNew(
snapshot.ctrl.request(),
new Uint8List(0),
_pageWatcherBinding.wrap(_PageWatcher(this)),
);
- validateLedgerResponse(status, 'Watch');
_readItems(snapshot);
}
@@ -63,14 +62,12 @@
/// Marks the item of the given [id] as done.
void markItemDone(List<int> id) {
- _page.delete(id).then((status) => validateLedgerResponse(status, 'Delete'));
+ _page.deleteNew(id);
}
/// Adds a new todo item with the given [content].
void addItem(String content) {
- _page
- .put(_makeKey(), utf8.encode(content))
- .then((status) => validateLedgerResponse(status, 'Put'));
+ _page.putNew(_makeKey(), utf8.encode(content));
}
void _readItems(ledger.PageSnapshotProxy snapshot) {
diff --git a/public/dart/sledge/lib/src/document/document.dart b/public/dart/sledge/lib/src/document/document.dart
index 315cb15..45926e2 100644
--- a/public/dart/sledge/lib/src/document/document.dart
+++ b/public/dart/sledge/lib/src/document/document.dart
@@ -117,10 +117,12 @@
/// Applies [change] to fields of this document.
void applyChange(final Change change) {
- Map<Uint8List, Change> splittedChanges =
- change.splitByPrefix(_identifierLength);
- for (final splittedChange in splittedChanges.entries) {
- _fields[splittedChange.key].applyChange(splittedChange.value);
+ if (_state == DocumentState.available) {
+ Map<Uint8List, Change> splittedChanges =
+ change.splitByPrefix(_identifierLength);
+ for (final splittedChange in splittedChanges.entries) {
+ _fields[splittedChange.key].applyChange(splittedChange.value);
+ }
}
_changeController.add(null);
}
diff --git a/public/dart/sledge/lib/src/sledge.dart b/public/dart/sledge/lib/src/sledge.dart
index e9fb5ac..ac41193 100644
--- a/public/dart/sledge/lib/src/sledge.dart
+++ b/public/dart/sledge/lib/src/sledge.dart
@@ -40,12 +40,10 @@
// The factories used for fake object injection.
final LedgerObjectsFactory _ledgerObjectsFactory;
- // Contains the status of the initialization.
- // ignore: unused_field
- Future<bool> _initializationSucceeded;
-
ModificationQueue _modificationQueue;
+ Subscription _subscribtion;
+
/// Default constructor.
factory Sledge(ComponentContext componentContext, [SledgePageId pageId]) {
fidl.InterfacePair<ledger.Ledger> ledgerPair = new fidl.InterfacePair();
@@ -65,20 +63,9 @@
// 2/ Setting a conflict resolver on the LedgerProxy (not yet implemented).
// 3/ Obtaining a LedgerPageProxy using the LedgerProxy.
// 4/ Subscribing for change notifications on the LedgerPageProxy.
- // Any of these steps can fail.
- //
- // The following Completer is completed with `false` if an error occurs at
- // any step. It is completed with `true` if the 4th step finishes
- // succesfully.
- //
- // Operations that require the succesfull initialization of the Sledge
- // instance await the Future returned by this completer.
- Completer<bool> initializationCompleter = new Completer<bool>();
_ledgerProxy.ctrl.onConnectionError = () {
- if (!initializationCompleter.isCompleted) {
- initializationCompleter.complete(false);
- }
+ // TODO(jif): Handle disconnection from the Ledger.
};
_ledgerProxy.ctrl.bind(ledgerHandle);
@@ -86,8 +73,7 @@
_ledgerProxy.getPage(pageId.id, _pageProxy.ctrl.request());
_modificationQueue =
new ModificationQueue(this, _ledgerObjectsFactory, _pageProxy);
- _subscribe(initializationCompleter);
- _initializationSucceeded = initializationCompleter.future;
+ _subscribtion = _subscribe();
}
/// Constructor that takes a new-style binding of ComponentContext
@@ -108,11 +94,9 @@
/// Convenience constructor for tests.
Sledge.testing(this._pageProxy, this._ledgerObjectsFactory) {
- Completer<bool> initializationCompleter = new Completer<bool>();
_modificationQueue =
new ModificationQueue(this, _ledgerObjectsFactory, _pageProxy);
- _subscribe(initializationCompleter);
- _initializationSucceeded = initializationCompleter.future;
+ _subscribtion = _subscribe();
}
/// Convenience constructor for integration tests.
@@ -122,6 +106,7 @@
/// Closes connection to ledger.
void close() {
+ _subscribtion.unsubscribe();
_pageProxy.ctrl.close();
_ledgerProxy.ctrl.close();
}
@@ -130,11 +115,7 @@
/// Returns false if an error occurred and the modification couldn't be
/// committed.
/// Returns true otherwise.
- Future<bool> runInTransaction(Modification modification) async {
- bool initializationSucceeded = await _initializationSucceeded;
- if (!initializationSucceeded) {
- return false;
- }
+ Future<bool> runInTransaction(Modification modification) {
return _modificationQueue.queueModification(modification);
}
@@ -211,11 +192,10 @@
}
/// Subscribes for page.onChange to perform applyChange.
- Subscription _subscribe(Completer<bool> subscriptionCompleter) {
+ Subscription _subscribe() {
assert(_modificationQueue.currentTransaction == null,
'`_subscribe` must be called before any transaction can start.');
- return new Subscription(
- _pageProxy, _ledgerObjectsFactory, _applyChange, subscriptionCompleter);
+ return new Subscription(_pageProxy, _ledgerObjectsFactory, _applyChange);
}
void _verifyThatTransactionHasStarted() {
diff --git a/public/dart/sledge/lib/src/storage/document_storage.dart b/public/dart/sledge/lib/src/storage/document_storage.dart
index f68a708..0cf1bce 100644
--- a/public/dart/sledge/lib/src/storage/document_storage.dart
+++ b/public/dart/sledge/lib/src/storage/document_storage.dart
@@ -28,68 +28,51 @@
/// Stores [document] into [page].
/// [document] must not be deleted.
-List<Future<ledger.Status>> saveDocumentToPage(
- Document document, ledger.Page page) {
+void saveDocumentToPage(Document document, ledger.Page page) {
assert(document.state == DocumentState.available);
- final updateLedgerFutures = <Future<ledger.Status>>[];
final Uint8List documentPrefix = _documentStorageKeyPrefix(document);
final Change change = document.getChange();
// Forward the "deletes".
for (Uint8List deletedKey in change.deletedKeys) {
- final completer = new Completer<ledger.Status>();
final Uint8List keyWithDocumentPrefix =
concatUint8Lists(documentPrefix, deletedKey);
- page.delete(
+ page.deleteNew(
keyWithDocumentPrefix,
- completer.complete,
);
- updateLedgerFutures.add(completer.future);
}
// Forward the "puts".
for (KeyValue kv in change.changedEntries) {
- final completer = new Completer<ledger.Status>();
-
final Uint8List keyWithDocumentPrefix =
concatUint8Lists(documentPrefix, kv.key);
- page.put(
+ page.putNew(
keyWithDocumentPrefix,
kv.value,
- completer.complete,
);
- updateLedgerFutures.add(completer.future);
}
- return updateLedgerFutures;
}
-List<Future<ledger.Status>> _deleteKeyValues(
- List<KeyValue> keyValues, ledger.Page page) {
- final updateLedgerFutures = <Future<ledger.Status>>[];
+void _deleteKeyValues(List<KeyValue> keyValues, ledger.Page page) {
for (KeyValue kv in keyValues) {
- final completer = new Completer<ledger.Status>();
- page.delete(
+ page.deleteNew(
kv.key,
- completer.complete,
);
- updateLedgerFutures.add(completer.future);
}
- return updateLedgerFutures;
}
/// Deletes all the key-values storing [document] from [page] at the time
/// [snapshot] was taken.
-Future<List<Future<ledger.Status>>> deleteDocumentFromPage(
+Future<void> deleteDocumentFromPage(
Document document, ledger.Page page, ledger.PageSnapshot snapshot) {
assert(document.state == DocumentState.pendingDeletion);
final Uint8List documentPrefix = _documentStorageKeyPrefix(document);
// TODO: Don't read the values from the snapshot as only the keys are needed.
Future<List<KeyValue>> futureKeyValues =
getEntriesFromSnapshotWithPrefix(snapshot, documentPrefix);
- Future<List<Future<ledger.Status>>> futureList =
- futureKeyValues.then((List<KeyValue> keyValues) {
- return _deleteKeyValues(keyValues, page);
+ Future<void> futureList = futureKeyValues.then((List<KeyValue> keyValues) {
+ _deleteKeyValues(keyValues, page);
});
return futureList;
}
diff --git a/public/dart/sledge/lib/src/storage/schema_storage.dart b/public/dart/sledge/lib/src/storage/schema_storage.dart
index 45f3827..1a7ea72 100644
--- a/public/dart/sledge/lib/src/storage/schema_storage.dart
+++ b/public/dart/sledge/lib/src/storage/schema_storage.dart
@@ -2,7 +2,6 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
@@ -22,19 +21,15 @@
}
/// Stores [schema] into [page].
-Future<ledger.Status> saveSchemaToPage(Schema schema, ledger.Page page) {
- final completer = new Completer<ledger.Status>();
-
+void saveSchemaToPage(Schema schema, ledger.Page page) {
final Uint8List key = _schemaStorageKey(schema);
String jsonString = json.encode(schema);
final Uint8List value = getUint8ListFromString(jsonString);
// TODO: handle the case where |value| is larger than the maximum allowed
// size.
- page.put(
+ page.putNew(
key,
value,
- completer.complete,
);
- return completer.future;
}
diff --git a/public/dart/sledge/lib/src/subscription/subscription.dart b/public/dart/sledge/lib/src/subscription/subscription.dart
index 7a04192..fd6594e 100644
--- a/public/dart/sledge/lib/src/subscription/subscription.dart
+++ b/public/dart/sledge/lib/src/subscription/subscription.dart
@@ -2,7 +2,6 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-import 'dart:async';
import 'dart:typed_data';
import 'package:fidl/fidl.dart';
@@ -22,26 +21,14 @@
/// Register a watcher for Ledger page, which pass all changes to
/// _applyChangeCallback.
Subscription(this._pageProxy, LedgerObjectsFactory ledgerObjectsFactory,
- this._applyChangeCallback, Completer<bool> subscriptionCompleter)
+ this._applyChangeCallback)
: _snapshotProxy = ledgerObjectsFactory.newPageSnapshotProxy(),
_pageWatcherBinding = ledgerObjectsFactory.newPageWatcherBinding() {
- Completer<ledger.Status> completer = new Completer<ledger.Status>();
- _pageProxy.getSnapshot(
+ _pageProxy.getSnapshotNew(
_snapshotProxy.ctrl.request(),
new Uint8List(0),
_pageWatcherBinding.wrap(this),
- completer.complete,
);
-
- completer.future.then((ledger.Status status) {
- if (subscriptionCompleter.isCompleted) {
- // If an error occurs, `subscriptionCompleter` may have been completed
- // by the caller before `completer` has ran.
- return;
- }
- bool subscriptionSuccesfull = status == ledger.Status.ok;
- subscriptionCompleter.complete(subscriptionSuccesfull);
- });
}
@override
@@ -59,7 +46,6 @@
callback(null);
}
- // TODO: use it.
/// Ends subscription.
void unsubscribe() {
_pageWatcherBinding?.close();
diff --git a/public/dart/sledge/lib/src/transaction.dart b/public/dart/sledge/lib/src/transaction.dart
index 0316fb6..ed91cfd 100644
--- a/public/dart/sledge/lib/src/transaction.dart
+++ b/public/dart/sledge/lib/src/transaction.dart
@@ -32,7 +32,6 @@
final Sledge _sledge;
final ledger.PageProxy _pageProxy;
final ledger.PageSnapshotProxy _pageSnapshotProxy;
- // TODO: close _pageSnapshotProxy
/// Default constructor.
Transaction(
@@ -42,82 +41,55 @@
/// Runs [modification].
Future<bool> saveModification(Modification modification) async {
// Start Ledger transaction.
- final startTransactionCompleter = new Completer<ledger.Status>();
- _pageProxy.startTransaction(startTransactionCompleter.complete);
- bool startTransactionOk =
- (await startTransactionCompleter.future) == ledger.Status.ok;
- if (!startTransactionOk) {
- return false;
- }
+ _pageProxy
+ ..startTransactionNew()
+ // Obtain the snapshot.
+ // All the read operations in |modification| will read from that snapshot.
+ ..getSnapshotNew(
+ _pageSnapshotProxy.ctrl.request(),
+ new Uint8List(0),
+ null,
+ );
- // Obtain the snapshot.
- // All the read operations in |modification| will read from that snapshot.
- final snapshotCompleter = new Completer<ledger.Status>();
- _pageProxy.getSnapshot(
- _pageSnapshotProxy.ctrl.request(),
- new Uint8List(0),
- null,
- snapshotCompleter.complete,
- );
- bool getSnapshotOk = (await snapshotCompleter.future) == ledger.Status.ok;
- if (!getSnapshotOk) {
- return false;
- }
-
- // Execute the modifications.
- // The modifications may:
- // - obtain a handle to a document, which would trigger a call to |getDocument|.
- // - modify a document. This would result in |documentWasModified| being called.
try {
- await modification();
- } on _RollbackException {
- await _rollbackModification();
- return false;
- // ignore: avoid_catches_without_on_clauses
- } catch (e) {
- await _rollbackModification();
- rethrow;
- }
-
- // Iterate through all the documents modified by this transaction and
- // forward the updates (puts and deletes) to Ledger.
- final updateLedgerFutures = <Future<ledger.Status>>[];
- for (final document in _documents) {
- if (document.state == DocumentState.available) {
- updateLedgerFutures
- ..addAll(saveDocumentToPage(document, _pageProxy))
- ..add(saveSchemaToPage(document.documentId.schema, _pageProxy));
- } else {
- final futures = await deleteDocumentFromPage(
- document, _pageProxy, _pageSnapshotProxy);
- updateLedgerFutures.addAll(futures);
- }
- }
-
- // Await until all updates have been succesfully executed.
- // If some updates have failed, rollback.
- final List<ledger.Status> statuses = await Future.wait(updateLedgerFutures);
- for (final status in statuses) {
- if (status != ledger.Status.ok) {
- await _rollbackModification();
+ // Execute the modifications.
+ // The modifications may:
+ // - obtain a handle to a document, which would trigger a call to |getDocument|.
+ // - modify a document. This would result in |documentWasModified| being called.
+ try {
+ await modification();
+ } on _RollbackException {
+ _rollbackModification();
return false;
+ // ignore: avoid_catches_without_on_clauses
+ } catch (e) {
+ _rollbackModification();
+ rethrow;
}
- }
- // Finish the transaction by commiting. If the commit fails, rollback.
- final commitCompleter = new Completer<ledger.Status>();
- _pageProxy.commit(commitCompleter.complete);
- bool commitOk = (await commitCompleter.future) == ledger.Status.ok;
- if (!commitOk) {
- await _rollbackModification();
- return false;
- }
+ // Iterate through all the documents modified by this transaction and
+ // forward the updates (puts and deletes) to Ledger.
+ for (final document in _documents) {
+ if (document.state == DocumentState.available) {
+ saveDocumentToPage(document, _pageProxy);
+ saveSchemaToPage(document.documentId.schema, _pageProxy);
+ } else {
+ await deleteDocumentFromPage(
+ document, _pageProxy, _pageSnapshotProxy);
+ }
+ }
- // Notify the documents that the transaction has been completed.
- _documents
- ..forEach((Document document) => document.completeTransaction())
- ..clear();
- return true;
+ // Finish the transaction by commiting.
+ _pageProxy.commitNew();
+
+ // Notify the documents that the transaction has been completed.
+ _documents
+ ..forEach((Document document) => document.completeTransaction())
+ ..clear();
+ return true;
+ } finally {
+ _pageSnapshotProxy.ctrl.close();
+ }
}
/// Abort and rollback the transaction
@@ -220,15 +192,10 @@
}
/// Rollback the documents that were modified during the transaction.
- Future _rollbackModification() async {
+ void _rollbackModification() {
_documents
..forEach((Document document) => document.rollbackChange())
..clear();
- final completer = new Completer<ledger.Status>();
- _pageProxy.rollback(completer.complete);
- bool commitOk = (await completer.future) == ledger.Status.ok;
- if (!commitOk) {
- throw new Exception('Transaction failed. Unable to rollback.');
- }
+ _pageProxy.rollbackNew();
}
}
diff --git a/public/dart/sledge/test/fakes/fake_ledger_page.dart b/public/dart/sledge/test/fakes/fake_ledger_page.dart
index 8f9dbc2..79627b6 100644
--- a/public/dart/sledge/test/fakes/fake_ledger_page.dart
+++ b/public/dart/sledge/test/fakes/fake_ledger_page.dart
@@ -35,65 +35,42 @@
_storageState = new StorageState(onChange);
}
- ledger.Status putStatus;
- ledger.Status deleteStatus;
- ledger.Status startTransactionStatus;
- ledger.Status commitStatus;
- ledger.Status rollbackStatus;
- ledger.Status getSnapshotStatus;
-
- void resetAllStatus() {
- putStatus = null;
- deleteStatus = null;
- startTransactionStatus = null;
- commitStatus = null;
- rollbackStatus = null;
- getSnapshotStatus = null;
- }
-
StorageState get storageState => _storageState;
@override
- void put(
- Uint8List key, Uint8List value, void callback(ledger.Status status)) {
+ void putNew(Uint8List key, Uint8List value) {
_modification.changedEntries.add(new KeyValue(key, value));
- callback(putStatus ?? ledger.Status.ok);
}
@override
- void delete(Uint8List key, void callback(ledger.Status status)) {
+ void deleteNew(Uint8List key) {
_modification.deletedKeys.add(key);
- callback(deleteStatus ?? ledger.Status.ok);
}
@override
- void startTransaction(void callback(ledger.Status status)) {
+ void startTransactionNew() {
assert(_modification.changedEntries.isEmpty);
assert(_modification.deletedKeys.isEmpty);
- callback(startTransactionStatus ?? ledger.Status.ok);
}
@override
- void commit(void callback(ledger.Status status)) {
+ void commitNew() {
_storageState.applyChange(_modification);
onChange(_modification);
_modification.clear();
- callback(commitStatus ?? ledger.Status.ok);
}
@override
- void rollback(void callback(ledger.Status status)) {
+ void rollbackNew() {
_modification.clear();
- callback(rollbackStatus ?? ledger.Status.ok);
}
@override
- void getSnapshot(Object snapshotRequest, Uint8List keyPrefix, dynamic watcher,
- void callback(ledger.Status status)) {
+ void getSnapshotNew(
+ Object snapshotRequest, Uint8List keyPrefix, dynamic watcher) {
if (watcher != null) {
_watcher = watcher;
}
- callback(getSnapshotStatus ?? ledger.Status.ok);
}
List<ledger.Entry> getEntries(Uint8List keyPrefix) =>
diff --git a/public/dart/sledge/test/integration_tests/sledge_integration_test.dart b/public/dart/sledge/test/integration_tests/sledge_integration_test.dart
index 955cfd5..6f8a054 100644
--- a/public/dart/sledge/test/integration_tests/sledge_integration_test.dart
+++ b/public/dart/sledge/test/integration_tests/sledge_integration_test.dart
@@ -69,7 +69,7 @@
int someInteger;
while (someInteger != 43) {
await passiveSledge.runInTransaction(() async {
- Document doc = await passiveSledge.getDocument(id);
+ Document doc = await Future(() => passiveSledge.getDocument(id));
someInteger = doc['someInteger'].value;
});
}
diff --git a/public/dart/sledge/test/schema_test.dart b/public/dart/sledge/test/schema_test.dart
index 2731371..41dae49 100644
--- a/public/dart/sledge/test/schema_test.dart
+++ b/public/dart/sledge/test/schema_test.dart
@@ -7,7 +7,6 @@
// TODO: investigate whether we can get rid of the implementation_imports.
// ignore_for_file: implementation_imports
-import 'package:fidl_fuchsia_ledger/fidl.dart' as ledger;
import 'package:lib.app.dart/logging.dart';
import 'package:sledge/sledge.dart';
import 'package:sledge/src/document/change.dart';
@@ -341,18 +340,17 @@
expect(transactionSucceed, true);
expect(doc['someInteger'].value, equals(14));
- // Test case when commit fails.
- sledge.fakeLedgerPage.commitStatus = ledger.Status.ioError;
+ // Test case when transaction fails.
transactionSucceed = await sledge.runInTransaction(() async {
doc['someBool'].value = true;
doc['someInteger'].value = 42;
+ sledge.abortAndRollback();
});
expect(transactionSucceed, false);
expect(doc['someBool'].value, equals(false));
expect(doc['someInteger'].value, equals(14));
// Check that after failed transaction we can get successful one.
- sledge.fakeLedgerPage.resetAllStatus();
transactionSucceed = await sledge.runInTransaction(() async {
doc['someInteger'].value = 8;
});
@@ -380,18 +378,16 @@
expect(transactionSucceed, true);
expect(doc['map'].length, equals(1));
- // Test case when commit fails.
- sledge.fakeLedgerPage.commitStatus = ledger.Status.ioError;
+ // Test case when transaction fails.
transactionSucceed = await sledge.runInTransaction(() async {
doc['map']['a'] = new Uint8List.fromList([4]);
doc['map']['foo'] = new Uint8List.fromList([1, 3]);
+ sledge.abortAndRollback();
});
expect(transactionSucceed, false);
expect(doc['map'].length, equals(1));
expect(doc['map']['a'], equals([1, 2, 3]));
- // Check that after failed transaction we can get successful one.
- sledge.fakeLedgerPage.resetAllStatus();
transactionSucceed = await sledge.runInTransaction(() async {
doc['map']['foo'] = new Uint8List.fromList([1, 3]);
});
diff --git a/public/dart/sledge/test/transaction_test.dart b/public/dart/sledge/test/transaction_test.dart
index bcefd31..2556149 100644
--- a/public/dart/sledge/test/transaction_test.dart
+++ b/public/dart/sledge/test/transaction_test.dart
@@ -34,7 +34,7 @@
List<int> events = <int>[];
// ignore: unawaited_futures
sledge.runInTransaction(() async {
- events.add(0);
+ await new Future(() => events.add(0));
});
expect(events, equals(<int>[]));
await sledge.runInTransaction(() async {