blob: 48e4b9a6148400afbeb6cd5f97e68d68dcb71f05 [file] [log] [blame]
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::any::Any;
use std::cell::Cell;
use std::sync::Arc;
use failure::Error;
use fuchsia_async as fasync;
use fuchsia_wayland_core as wl;
use fuchsia_zircon as zx;
use futures::channel::mpsc;
use futures::prelude::*;
use futures::select;
use parking_lot::Mutex;
use wayland::WlDisplayEvent;
use crate::display::DISPLAY_SINGLETON_OBJECT_ID;
use crate::object::{MessageReceiver, ObjectLookupError, ObjectMap, ObjectRef, RequestReceiver};
use crate::registry::Registry;
type Task = Box<FnMut(&mut Client) -> Result<(), Error> + 'static>;
/// The state of a single client connection. Each client connection will have
/// have its own zircon channel and its own set of protocol objects. The
/// |Registry| is the only piece of global state that is shared between
/// clients.
pub struct Client {
/// The zircon channel used to communicate with this client.
chan: Arc<fasync::Channel>,
/// The set of objects for this client.
objects: ObjectMap,
/// A pointer to the global registry.
registry: Arc<Mutex<Registry>>,
/// An incoming task queue of closures to be invoked on the client. These
/// closures will be invoked with a mutable reference to the `Client`,
/// providing a way for background tasks to access client resources.
tasks: mpsc::UnboundedReceiver<Task>,
/// The sending endpoint for the task channel.
task_queue: TaskQueue,
/// A monotonically increasing value that can be embedded in certain events.
next_event_serial: Cell<u32>,
protocol_logging: bool,
}
impl Client {
/// Creates a new client.
pub fn new(chan: fasync::Channel, registry: Arc<Mutex<Registry>>) -> Self {
let (sender, receiver) = mpsc::unbounded();
Client {
registry,
chan: Arc::new(chan),
objects: ObjectMap::new(),
tasks: receiver,
task_queue: TaskQueue(sender),
next_event_serial: Cell::new(0),
protocol_logging: false,
}
}
#[allow(dead_code)]
pub fn next_event_serial(&self) -> u32 {
let serial = self.next_event_serial.get();
self.next_event_serial.set(serial + 1);
serial
}
/// Enables or disables protocol message logging.
pub fn set_protocol_logging(&mut self, enabled: bool) {
self.protocol_logging = enabled;
}
/// Returns `true` if protocol messages should be logged.
pub(crate) fn protocol_logging(&self) -> bool {
self.protocol_logging
}
/// Returns a object that can post messages to the `Client`.
pub fn task_queue(&self) -> TaskQueue {
self.task_queue.clone()
}
/// Spawns an async task that waits for messages to be received on the
/// zircon channel, decodes the messages, and dispatches them to the
/// corresponding |MessageReceiver|s.
pub fn start(mut self) {
fasync::spawn_local(
async move {
let mut buffer = zx::MessageBuf::new();
loop {
select! {
// Fusing: we exit when `recv_msg` fails, so we don't
// need to worry about fast-looping when the channel is
// closed.
message = self.chan.recv_msg(&mut buffer).fuse() => {
// We got a new message over the zircon channel.
if let Err(e) = message {
println!("Failed to receive message on the channel {}", e);
return;
}
// Dispatch the message.
if let Err(e) = self.receive_message(buffer.into()) {
println!("Failed to receive message on the channel {}", e);
return;
}
buffer = zx::MessageBuf::new();
},
// Fusing: we panic immediately if the task queue ever returns
// `None`, so no need to track state of the channel between
// loop iterations. NOTE: for this to remain true, no other code
// can be given access to mutate `self.tasks`.
task = self.tasks.next().fuse() => {
// A new closure has been received.
//
// We unwrap since we retain a reference to the
// sending endpoint of the channel, preventing it
// from closing.
let mut task = task.expect("Task stream has unexpectedly closed.");
if let Err(e) = task(&mut self) {
println!("Failed to run wayland task: {}", e);
return;
}
},
}
}
},
);
}
/// A pointer to the global registry for this client.
pub fn registry(&self) -> Arc<Mutex<Registry>> {
self.registry.clone()
}
/// Looks up an object in the map and returns a downcasted reference to
/// the implementation.
pub fn get_object<T: Any>(&self, id: wl::ObjectId) -> Result<&T, ObjectLookupError> {
self.objects.get(id)
}
/// Looks up an object in the map and returns a downcasted mutable
/// reference to the implementation.
pub fn get_object_mut<T: Any>(
&mut self, id: wl::ObjectId,
) -> Result<&mut T, ObjectLookupError> {
self.objects.get_mut(id)
}
/// Adds a new object into the map that will handle messages with the sender
/// set to |id|. When a message is received with the corresponding |id|, the
/// message will be decoded and forwarded to the |RequestReceiver|.
///
/// Returns Err if there is already an object for |id| in this |ObjectMap|.
pub fn add_object<I: wl::Interface + 'static, R: RequestReceiver<I> + 'static>(
&mut self, id: u32, receiver: R,
) -> Result<ObjectRef<R>, Error> {
self.objects.add_object(id, receiver)
}
/// Adds an object to the map using the low-level primitives. It's favorable
/// to use instead |add_object| if the wayland interface for the object is
/// statically known.
pub fn add_object_raw(
&mut self, id: wl::ObjectId, receiver: Box<MessageReceiver>,
request_spec: &'static wl::MessageGroupSpec,
) -> Result<(), Error> {
self.objects.add_object_raw(id, receiver, request_spec)
}
/// Deletes the object `id` from the local object map and send a notification to the
/// client confirming that `id` can be reused.
pub fn delete_id(&mut self, id: wl::ObjectId) -> Result<(), Error> {
self.objects.delete(id)?;
self.post(DISPLAY_SINGLETON_OBJECT_ID, WlDisplayEvent::DeleteId { id })
}
/// Reads the message header to find the target for this message and then
/// forwards the message to the associated |MessageReceiver|.
///
/// Returns Err if no object is associated with the sender field in the
/// message header, or if the objects receiver itself fails.
pub(crate) fn receive_message(&mut self, mut message: wl::Message) -> Result<(), Error> {
while !message.is_empty() {
let header = message.read_header()?;
// Lookup the table entry for this object & fail if there is no entry
// found.
let (receiver, spec) = self.objects.lookup_internal(&header)?;
// Decode the argument stream and invoke the |MessageReceiver|.
let args = message.read_args(spec.0)?;
receiver(header.sender, header.opcode, args, self)?;
}
Ok(())
}
pub fn post<E: wl::IntoMessage>(&self, sender: wl::ObjectId, event: E) -> Result<(), Error> {
if self.protocol_logging() {
println!("<-e-- {}", event.log(sender));
}
let message = event.into_message(sender)?;
let (bytes, mut handles) = message.take();
self.chan.write(&bytes, &mut handles)?;
Ok(())
}
}
/// A `TaskQueue` enables asynchronous operations to post tasks back to the
/// `Client`.
///
/// Ex:
/// let foo: ObjectRef<Foo> = get_foo_ref();
/// let tasks = client.task_queue();
/// task.post(|client| {
/// let foo = foo.get(client);
/// foo.handle_delayed_operation();
/// });
#[derive(Clone)]
pub struct TaskQueue(mpsc::UnboundedSender<Task>);
impl TaskQueue {
/// Posts the closure to be run as soon as possible.
pub fn post<F>(&self, f: F)
where
F: FnMut(&mut Client) -> Result<(), Error> + 'static,
{
self.0
.unbounded_send(Box::new(f))
.expect("failed to send task");
}
}