| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| import 'dart:async'; |
| import 'dart:convert'; |
| import 'dart:typed_data'; |
| |
| import 'package:fidl/fidl.dart'; |
| import 'package:fidl_fuchsia_mem/fidl.dart' as fuchsia_mem; |
| import 'package:fidl_fuchsia_modular/fidl.dart' as fidl; |
| import 'package:lib.app.dart/logging.dart'; |
| import 'package:zircon/zircon.dart'; |
| |
| import 'link_watcher_host.dart'; |
| |
| export 'package:fidl_fuchsia_modular/fidl.dart'; |
| |
| export 'link_watcher_host.dart'; |
| |
| /// When a value for a given [ref] is not found. |
| class LinkClientNotFoundException extends Error { |
| /// The id/ref that was not found. |
| final String ref; |
| |
| /// Constructor. |
| LinkClientNotFoundException(this.ref); |
| |
| @override |
| String toString() { |
| return 'LinkClientNotFoundException: no value found for "$ref"'; |
| } |
| } |
| |
| /// Client wrapper for [fidl.Link]. |
| /// |
| /// TODO(SO-1126): implement all methods for LinkClient |
| class LinkClient { |
| /// The underlying [Proxy] used to send client requests to the [fidl.Link] |
| /// service. |
| final fidl.LinkProxy proxy = fidl.LinkProxy(); |
| |
| /// The name of the link. |
| final String name; |
| |
| /// Constructor. |
| LinkClient({ |
| this.name, |
| }) { |
| if (name == null) { |
| /// TODO: add a better warning. |
| log.warning('default links will be deprecated soon'); |
| } |
| |
| proxy.ctrl |
| ..onBind = _handleBind |
| ..onClose = _handleClose |
| ..onConnectionError = _handleConnectionError |
| ..onUnbind = _handleUnbind; |
| } |
| |
| final Completer<Null> _bind = Completer<Null>(); |
| |
| /// A future that completes when the [proxy] is bound. |
| Future<Null> get bound => _bind.future; |
| |
| void _handleBind() { |
| log.fine('proxy ready'); |
| _bind.complete(null); |
| } |
| |
| /// Get the decoded JSON value from [fidl.Link#get]. |
| Future<Object> get({ |
| List<String> path, |
| }) async { |
| log.fine('#get($path)'); |
| |
| Completer<Object> completer = Completer<Object>(); |
| |
| try { |
| await bound; |
| } on Exception catch (err, stackTrace) { |
| completer.completeError(err, stackTrace); |
| return completer.future; |
| } |
| |
| // ignore: unawaited_futures |
| proxy.ctrl.error.then((ProxyError err) { |
| if (!completer.isCompleted) { |
| completer.completeError(err); |
| } |
| }); |
| |
| try { |
| proxy.get(path, completer.complete); |
| } on Exception catch (err) { |
| completer.completeError(err); |
| } |
| |
| return completer.future; |
| } |
| |
| /// Future based API for [fidl.Link#set]. |
| Future<Null> set({ |
| List<String> path, |
| Object jsonData, |
| }) async { |
| log.fine('#set($path, $json)'); |
| |
| Completer<Null> completer = Completer<Null>(); |
| |
| try { |
| await bound; |
| } on Exception catch (err, stackTrace) { |
| completer.completeError(err, stackTrace); |
| return completer.future; |
| } |
| |
| String jsonString = json.encode(jsonData); |
| var jsonList = Uint8List.fromList(utf8.encode(jsonString)); |
| var data = fuchsia_mem.Buffer( |
| vmo: SizedVmo.fromUint8List(jsonList), |
| size: jsonList.length, |
| ); |
| |
| // ignore: unawaited_futures |
| proxy.ctrl.error.then((ProxyError err) { |
| if (!completer.isCompleted) { |
| completer.completeError(err); |
| } |
| }); |
| |
| try { |
| proxy.set(path, data); |
| } on Exception catch (err) { |
| completer.completeError(err); |
| } |
| |
| // Since there is no async success path for proxy.set (it is fire and |
| // forget) the best way to check for success is pushing a job onto the end |
| // of the async call stack and checking that the completer didn't enounter |
| // and error, no errors at this stage == success. |
| scheduleMicrotask(() { |
| if (!completer.isCompleted) { |
| completer.complete(null); |
| } |
| }); |
| |
| return completer.future; |
| } |
| |
| final List<LinkWatcherHost> _watchers = <LinkWatcherHost>[]; |
| final List<StreamController<String>> _streams = <StreamController<String>>[]; |
| bool _receivedInitialValue = false; |
| |
| /// Stream based API for [fidl.Link#watch] and [fidl.Link#watchAll]. |
| Stream<String> watch({bool all = false}) { |
| log.fine('#watch(all: $all)'); |
| |
| // TODO(SO-1127): connect the stream's control plane to the underlying link watcher |
| // so that it properly responds to clients requesting listen, pause, resume, |
| // cancel. |
| StreamController<String> controller = StreamController<String>(); |
| _streams.add(controller); |
| |
| bound.then((_) { |
| log.fine('link proxy bound, adding watcher'); |
| |
| LinkWatcherHost watcher = LinkWatcherHost(onNotify: (String data) { |
| // TODO: remove when MI4-940 is done |
| bool isInitialNullData = |
| (data == null || data == 'null') && !_receivedInitialValue; |
| if (!isInitialNullData) { |
| _receivedInitialValue = true; |
| controller.add(data); |
| } |
| }); |
| _watchers.add(watcher); |
| |
| // Using Future#catchError allows any sync errors thrown within the onValue |
| // block below to be caught without needing to add try-catch logic. |
| watcher.wrap().then((InterfaceHandle<LinkWatcher> handle) { |
| if (all) { |
| proxy.watchAll(handle); |
| } else { |
| proxy.watch(handle); |
| } |
| }, onError: controller.addError).catchError(controller.addError); |
| }); |
| |
| return controller.stream; |
| } |
| |
| /// See [fidl.Link#setEntity]. |
| Future<Null> setEntity(String ref) async { |
| assert(ref != null); |
| assert(ref.isNotEmpty); |
| |
| Completer<Null> completer = Completer<Null>(); |
| |
| try { |
| await bound; |
| } on Exception catch (err, stackTrace) { |
| completer.completeError(err, stackTrace); |
| return completer.future; |
| } |
| |
| // ignore: unawaited_futures |
| proxy.ctrl.error.then((ProxyError err) { |
| if (!completer.isCompleted) { |
| completer.completeError(err); |
| } |
| }); |
| |
| try { |
| proxy.setEntity(ref); |
| } on Exception catch (err) { |
| completer.completeError(err); |
| } |
| |
| // Since there is no async success path for proxy.set (it is fire and |
| // forget) the best way to check for success is pushing a job onto the end |
| // of the async call stack and checking that the completer didn't enounter |
| // and error, no errors at this stage == success. |
| scheduleMicrotask(() { |
| if (!completer.isCompleted) { |
| completer.complete(null); |
| } |
| }); |
| |
| return completer.future; |
| } |
| |
| /// See [fidl.Link#getEntity]. |
| Future<String> getEntity() async { |
| Completer<String> completer = Completer<String>(); |
| |
| try { |
| await bound; |
| } on Exception catch (err, stackTrace) { |
| completer.completeError(err, stackTrace); |
| return completer.future; |
| } |
| |
| // ignore: unawaited_futures |
| proxy.ctrl.error.then((ProxyError err) { |
| if (!completer.isCompleted) { |
| completer.completeError(err); |
| } |
| }); |
| |
| void handleEntity(String ref) { |
| completer.complete(ref); |
| } |
| |
| try { |
| proxy.getEntity(handleEntity); |
| } on Exception catch (err) { |
| completer.completeError(err); |
| } |
| |
| return completer.future; |
| } |
| |
| /// Closes the underlying proxy connection, should be called as a response to |
| /// Lifecycle::terminate (see https://goo.gl/MmZ2dc). |
| Future<Null> terminate() async { |
| log.info('terminate called'); |
| proxy.ctrl.close(); |
| return; |
| } |
| |
| void _handleUnbind() { |
| log.fine('proxy unbound'); |
| } |
| |
| void _handleClose() { |
| log.fine('proxy closed'); |
| for (LinkWatcherHost watcher in _watchers) { |
| watcher.terminate(); |
| } |
| _watchers.clear(); |
| |
| for (StreamController<String> stream in _streams) { |
| stream.close(); |
| } |
| |
| log.info('link watchers closed'); |
| } |
| |
| void _handleConnectionError() { |
| log.severe('LinkClient connection error'); |
| } |
| } |