| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| import 'dart:async'; |
| import 'dart:typed_data'; |
| |
| import 'package:fidl_fuchsia_ledger/fidl.dart' as ledger; |
| |
| import 'document/change.dart'; |
| import 'document/document.dart'; |
| import 'document/document_id.dart'; |
| import 'document/values/key_value.dart'; |
| import 'ledger_helpers.dart'; |
| import 'query/query.dart'; |
| import 'sledge.dart'; |
| import 'storage/document_storage.dart'; |
| import 'storage/kv_encoding.dart' as sledge_storage; |
| import 'storage/schema_storage.dart'; |
| import 'uint8list_ops.dart'; |
| |
| typedef Modification = Future Function(); |
| |
| /// A private exception used to abort and rollback transactions |
| class _RollbackException implements Exception {} |
| |
| /// Runs a modification and tracks modified documents in order to write the |
| /// changes to Ledger. |
| class Transaction { |
| // List of Documents modified during the transaction. |
| final Set<Document> _documents = <Document>{}; |
| final Sledge _sledge; |
| final ledger.PageProxy _pageProxy; |
| final ledger.PageSnapshotProxy _pageSnapshotProxy; |
| |
| /// Default constructor. |
| Transaction( |
| this._sledge, this._pageProxy, LedgerObjectsFactory ledgerObjectsFactory) |
| : _pageSnapshotProxy = ledgerObjectsFactory.newPageSnapshotProxy(); |
| |
| /// Runs [modification]. |
| Future<bool> saveModification(Modification modification) async { |
| // Start Ledger transaction. |
| _pageProxy |
| ..startTransactionNew() |
| // Obtain the snapshot. |
| // All the read operations in |modification| will read from that snapshot. |
| ..getSnapshotNew( |
| _pageSnapshotProxy.ctrl.request(), |
| new Uint8List(0), |
| null, |
| ); |
| |
| try { |
| // Execute the modifications. |
| // The modifications may: |
| // - obtain a handle to a document, which would trigger a call to |getDocument|. |
| // - modify a document. This would result in |documentWasModified| being called. |
| try { |
| await modification(); |
| } on _RollbackException { |
| _rollbackModification(); |
| return false; |
| // ignore: avoid_catches_without_on_clauses |
| } catch (e) { |
| _rollbackModification(); |
| rethrow; |
| } |
| |
| // Iterate through all the documents modified by this transaction and |
| // forward the updates (puts and deletes) to Ledger. |
| for (final document in _documents) { |
| if (document.state == DocumentState.available) { |
| saveDocumentToPage(document, _pageProxy); |
| saveSchemaToPage(document.documentId.schema, _pageProxy); |
| } else { |
| await deleteDocumentFromPage( |
| document, _pageProxy, _pageSnapshotProxy); |
| } |
| } |
| |
| // Finish the transaction by commiting. |
| _pageProxy.commitNew(); |
| |
| // Notify the documents that the transaction has been completed. |
| _documents |
| ..forEach((Document document) => document.completeTransaction()) |
| ..clear(); |
| return true; |
| } finally { |
| _pageSnapshotProxy.ctrl.close(); |
| } |
| } |
| |
| /// Abort and rollback the transaction |
| void abortAndRollback() { |
| throw _RollbackException(); |
| } |
| |
| /// Notification that [document] was modified. |
| void documentWasModified(Document document) { |
| _documents.add(document); |
| } |
| |
| /// Returns the document identified with [documentId]. |
| /// If the document does not exist, a new document is returned. |
| Future<Document> getDocument(DocumentId documentId) async { |
| final document = new Document(_sledge, documentId); |
| Uint8List keyPrefix = concatUint8Lists( |
| sledge_storage.prefixForType(sledge_storage.KeyValueType.document), |
| documentId.prefix); |
| List<KeyValue> kvs = |
| await getEntriesFromSnapshotWithPrefix(_pageSnapshotProxy, keyPrefix); |
| |
| // Strip the document prefix from the KVs. |
| for (int i = 0; i < kvs.length; i++) { |
| kvs[i] = new KeyValue( |
| getSublistView(kvs[i].key, |
| start: DocumentId.prefixLength + sledge_storage.typePrefixLength), |
| kvs[i].value); |
| } |
| |
| if (kvs.isNotEmpty) { |
| final change = new Change(kvs); |
| document.applyChange(change); |
| } |
| |
| return document; |
| } |
| |
| /// Returns the list of the ids of all documents matching the given [query]. |
| /// The result will not contain any updates in progress in the current transaction. |
| Future<List<DocumentId>> getDocumentIds(Query query) async { |
| final documentIds = <DocumentId>[]; |
| |
| if (query.filtersDocuments()) { |
| // TODO: actually check if index is present. |
| bool indexIsPresent = false; |
| if (indexIsPresent) { |
| Uint8List keyPrefix = query.prefixInIndex(); |
| List<KeyValue> keyValues = await getEntriesFromSnapshotWithPrefix( |
| _pageSnapshotProxy, keyPrefix); |
| for (KeyValue keyValue in keyValues) { |
| documentIds.add(new DocumentId(query.schema, keyValue.value)); |
| } |
| } else { |
| print('getDocumentIds called with missing index'); |
| List<DocumentId> documentIds = |
| await getDocumentIds(new Query(query.schema)); |
| final filteredDocumentIds = <DocumentId>[]; |
| for (final documentId in documentIds) { |
| // TODO(LE-638): Avoid discarding the documents after reading them. |
| final doc = await getDocument(documentId); |
| if (query.documentMatchesQuery(doc)) { |
| filteredDocumentIds.add(documentId); |
| } |
| } |
| return filteredDocumentIds; |
| // TODO: schedule a transaction that builds the index. |
| } |
| } else { |
| Uint8List keyPrefix = concatUint8Lists( |
| sledge_storage.prefixForType(sledge_storage.KeyValueType.document), |
| query.schema.hash); |
| |
| // Get all entries that correspond to the given schema. |
| List<KeyValue> keyValues = |
| await getEntriesFromSnapshotWithPrefix(_pageSnapshotProxy, keyPrefix); |
| // Entries are sorted by key, thus entries corresponding to the same |
| // document will be in consecutive positions. |
| Uint8List currentDocumentSubId; |
| for (KeyValue keyValue in keyValues) { |
| final newDocumentSubId = |
| sledge_storage.documentSubIdFromKey(keyValue.key); |
| if (!uint8ListsAreEqual(currentDocumentSubId, newDocumentSubId)) { |
| documentIds.add(new DocumentId(query.schema, newDocumentSubId)); |
| currentDocumentSubId = newDocumentSubId; |
| } |
| } |
| } |
| return documentIds; |
| } |
| |
| /// Returns whether the document identified with [documentId] exists. |
| Future<bool> documentExists(DocumentId documentId) async { |
| Uint8List keyPrefix = concatUint8Lists( |
| sledge_storage.prefixForType(sledge_storage.KeyValueType.document), |
| documentId.prefix); |
| List<KeyValue> kvs = |
| await getEntriesFromSnapshotWithPrefix(_pageSnapshotProxy, keyPrefix); |
| return kvs.isNotEmpty; |
| } |
| |
| /// Rollback the documents that were modified during the transaction. |
| void _rollbackModification() { |
| _documents |
| ..forEach((Document document) => document.rollbackChange()) |
| ..clear(); |
| _pageProxy.rollbackNew(); |
| } |
| } |