blob: 9fb539b17ad176e0175e8c414d76ca22b54e7b16 [file] [log] [blame]
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
anyhow::{format_err, Context as _, Error},
fidl::{
client::QueryResponseFut,
endpoints::{ClientEnd, Proxy},
},
fidl_fuchsia_sysmem::{
AllocatorProxy, BufferCollectionConstraints, BufferCollectionInfo2, BufferCollectionMarker,
BufferCollectionProxy, BufferCollectionTokenMarker, BufferMemorySettings,
},
fuchsia_zircon as zx,
futures::{
future::{FusedFuture, Future},
ready,
task::{Context, Poll},
FutureExt,
},
log::error,
std::convert::TryInto,
std::pin::Pin,
};
/// A set of buffers that have been allocated with the SysmemAllocator.
#[derive(Debug)]
pub struct SysmemAllocatedBuffers {
buffers: Vec<zx::Vmo>,
settings: BufferMemorySettings,
_buffer_collection: BufferCollectionProxy,
}
impl SysmemAllocatedBuffers {
/// Settings of the buffers that are available through `SysmemAllocator::get`
/// Returns None if the buffers are not allocated yet.
pub fn settings(&self) -> &BufferMemorySettings {
&self.settings
}
/// Get a VMO which has been allocated from the
pub fn get_mut(&mut self, idx: u32) -> Option<&mut zx::Vmo> {
let idx = idx as usize;
return self.buffers.get_mut(idx);
}
/// Get the number of VMOs that have been allocated.
/// Returns None if the allocation is not complete yet.
pub fn len(&self) -> u32 {
self.buffers.len().try_into().expect("buffers should fit in u32")
}
}
/// A Future that communicates with the `fuchsia.sysmem.Allocator` service to allocate shared
/// buffers.
pub enum SysmemAllocation {
Pending,
/// Waiting for the Sync response from the Allocator
WaitingForSync {
future: QueryResponseFut<()>,
token_fn: Option<Box<dyn FnOnce() -> () + Send + Sync>>,
buffer_collection: BufferCollectionProxy,
},
/// Waiting for the buffers to be allocated, which should eventually happen after delivering the token.
WaitingForAllocation(
QueryResponseFut<(zx::zx_status_t, BufferCollectionInfo2)>,
BufferCollectionProxy,
),
/// Allocation is completed. The status here represents whether it completed successfully (ZX_OK) or an error.
Done(zx::Status),
}
impl SysmemAllocation {
/// A pending allocation which has not been started, and will never finish.
pub fn pending() -> Self {
Self::Pending
}
/// Allocate a new shared memory collection, using `allocator` to communicate with the Allocator
/// service. `constraints` will be used to allocate the collection. A shared collection token
/// client end will be provided to the `token_target_fn` once the request has been synced with
/// the collection. This token can be used with `SysmemAllocation::shared` to finish allocating
/// the shared buffers or provided to another service to share allocation, or duplicated to
/// share this memory with more than one other client.
pub fn allocate<
F: FnOnce(ClientEnd<BufferCollectionTokenMarker>) -> () + 'static + Send + Sync,
>(
allocator: AllocatorProxy,
constraints: BufferCollectionConstraints,
token_target_fn: F,
) -> Result<Self, Error> {
let (client_token, client_token_request) =
fidl::endpoints::create_proxy::<BufferCollectionTokenMarker>()?;
allocator
.allocate_shared_collection(client_token_request)
.context("Allocating shared collection")?;
// Duplicate to get another BufferCollectionToken to the same collection.
let (token, token_request) = fidl::endpoints::create_endpoints()?;
client_token.duplicate(std::u32::MAX, token_request)?;
let client_end_token =
ClientEnd::new(client_token.into_channel().unwrap().into_zx_channel());
let mut res = Self::bind(allocator, client_end_token, constraints)?;
if let Self::WaitingForSync { token_fn, .. } = &mut res {
token_fn.replace(Box::new(move || token_target_fn(token)));
}
Ok(res)
}
/// Bind to a shared memory collection, using `allocator` to communicate with the Allocator
/// service and a `token` which has already been allocated. `constraints` is set to communicate
/// the requirements of this client.
pub fn bind(
allocator: AllocatorProxy,
token: ClientEnd<BufferCollectionTokenMarker>,
mut constraints: BufferCollectionConstraints,
) -> Result<Self, Error> {
let (buffer_collection, collection_request) =
fidl::endpoints::create_proxy::<BufferCollectionMarker>()?;
allocator.bind_shared_collection(token, collection_request)?;
buffer_collection
.set_constraints(true, &mut constraints)
.context("sending constraints to sysmem")?;
Ok(Self::WaitingForSync {
future: buffer_collection.sync(),
token_fn: None,
buffer_collection,
})
}
/// Advances a synced collection to wait for the allocation of the buffers, after synced.
/// Delivers the token to the target as the collection is aware of it now and can reliably
/// detect when all tokens have been turned in and constraints have been set.
fn synced(&mut self) -> Result<(), Error> {
*self = match std::mem::replace(self, Self::Done(zx::Status::BAD_STATE)) {
Self::WaitingForSync { future: _, token_fn, buffer_collection } => {
if let Some(deliver_token_fn) = token_fn {
deliver_token_fn();
}
Self::WaitingForAllocation(
buffer_collection.wait_for_buffers_allocated(),
buffer_collection,
)
}
_ => Self::Done(zx::Status::BAD_STATE),
};
if let Self::Done(e) = self {
return Err(e.into_io_error().into());
}
Ok(())
}
/// Finish once the allocation has completed. Returns the buffers and marks the allocation as
/// complete.
fn allocated(
&mut self,
status: zx::zx_status_t,
mut buffer_info: BufferCollectionInfo2,
) -> Result<SysmemAllocatedBuffers, Error> {
match std::mem::replace(self, Self::Done(zx::Status::from_raw(status))) {
Self::WaitingForAllocation(_, buffer_collection) => {
let num_buffers = buffer_info.buffer_count.try_into()?;
let mut buffers = Vec::new();
for buffer in buffer_info.buffers[0..num_buffers].iter_mut() {
buffers.push(buffer.vmo.take().ok_or(format_err!("missing buffer"))?);
}
Ok(SysmemAllocatedBuffers {
buffers,
settings: buffer_info.settings.buffer_settings,
_buffer_collection: buffer_collection,
})
}
_ => Err(format_err!("allocation complete but not in the right state")),
}
}
}
impl FusedFuture for SysmemAllocation {
fn is_terminated(&self) -> bool {
match self {
Self::Done(_) => true,
_ => false,
}
}
}
impl Future for SysmemAllocation {
type Output = Result<SysmemAllocatedBuffers, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let s = Pin::into_inner(self);
if let Self::WaitingForSync { future, .. } = s {
match ready!(future.poll_unpin(cx)) {
Err(e) => {
error!("SysmemAllocator error: {:?}", e);
return Poll::Ready(Err(e.into()));
}
Ok(()) => {
if let Err(e) = s.synced() {
return Poll::Ready(Err(e));
}
}
};
}
if let Self::WaitingForAllocation(future, _) = s {
match ready!(future.poll_unpin(cx)) {
Ok((status, buffer_collection)) => {
return Poll::Ready(s.allocated(status, buffer_collection))
}
Err(e) => {
error!("SysmemAllocator waiting error: {:?}", e);
Poll::Ready(Err(e.into()))
}
}
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use fidl::encoding::Decodable;
use fidl_fuchsia_sysmem::{
AllocatorMarker, AllocatorRequest, BufferCollectionRequest, BufferCollectionTokenProxy,
BufferCollectionTokenRequest, BufferCollectionTokenRequestStream, CoherencyDomain,
HeapType, ImageFormatConstraints, SingleBufferSettings, VmoBuffer,
};
use fuchsia_async::{self as fasync, pin_mut};
use futures::StreamExt;
use crate::buffer_collection_constraints::BUFFER_COLLECTION_CONSTRAINTS_DEFAULT;
fn assert_tokens_connected(
exec: &mut fasync::Executor,
proxy: &BufferCollectionTokenProxy,
requests: &mut BufferCollectionTokenRequestStream,
) {
let mut sync_fut = proxy.sync();
match exec.run_until_stalled(&mut requests.next()) {
Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Sync { responder }))) => {
responder.send().expect("respond to sync")
}
x => panic!("Expected vended token to be connected, got {:?}", x),
};
// The sync future is ready now.
assert!(exec.run_until_stalled(&mut sync_fut).is_ready());
}
#[test]
fn allocate_future() {
let mut exec = fasync::Executor::new().expect("executor creation");
let (proxy, mut allocator_requests) =
fidl::endpoints::create_proxy_and_stream::<AllocatorMarker>().unwrap();
let (sender, mut receiver) = futures::channel::oneshot::channel();
let token_fn = move |token| {
sender.send(token).expect("should be able to send token");
};
let mut allocation =
SysmemAllocation::allocate(proxy, BUFFER_COLLECTION_CONSTRAINTS_DEFAULT, token_fn)
.expect("starting should work");
let mut token_requests_1 = match exec.run_until_stalled(&mut allocator_requests.next()) {
Poll::Ready(Some(Ok(AllocatorRequest::AllocateSharedCollection {
token_request,
..
}))) => token_request.into_stream().expect("request into stream"),
x => panic!("Expected a shared allocation request, got {:?}", x),
};
let mut token_requests_2 = match exec.run_until_stalled(&mut token_requests_1.next()) {
Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Duplicate {
rights_attenuation_mask: _,
token_request,
..
}))) => token_request.into_stream().expect("duplicate request into stream"),
x => panic!("Expected a duplication request, got {:?}", x),
};
let (token_client_1, mut collection_requests_1) = match exec
.run_until_stalled(&mut allocator_requests.next())
{
Poll::Ready(Some(Ok(AllocatorRequest::BindSharedCollection {
token,
buffer_collection_request,
..
}))) => (
token.into_proxy().unwrap(),
buffer_collection_request.into_stream().expect("collection request into stream"),
),
x => panic!("Expected Bind Shared Collection, got: {:?}", x),
};
// The token turned into the allocator for binding should be connected to the server on allocating.
assert_tokens_connected(&mut exec, &token_client_1, &mut token_requests_1);
match exec.run_until_stalled(&mut collection_requests_1.next()) {
Poll::Ready(Some(Ok(BufferCollectionRequest::SetConstraints { .. }))) => {}
x => panic!("Expected buffer constraints request, got {:?}", x),
};
let sync_responder = match exec.run_until_stalled(&mut collection_requests_1.next()) {
Poll::Ready(Some(Ok(BufferCollectionRequest::Sync { responder }))) => responder,
x => panic!("Expected a sync request, got {:?}", x),
};
// The sysmem allocator is now waiting for the sync from the collection
assert!(exec.run_until_stalled(&mut allocation).is_pending());
// When it gets a response that the collection is synced, it vends the token out
sync_responder.send().expect("respond to sync request");
assert!(exec.run_until_stalled(&mut allocation).is_pending());
let token_client_2 = match receiver.try_recv() {
Ok(Some(token)) => token.into_proxy().unwrap(),
x => panic!("Should have a token sent to the fn, got {:?}", x),
};
// token_client_2 should be attached to the token_requests_2 that we handed over to sysmem
// (in the token duplicate)
assert_tokens_connected(&mut exec, &token_client_2, &mut token_requests_2);
// We should have received a wait for the buffers to be allocated in our collection
const SIZE_BYTES: u32 = 1024;
let buffer_settings = BufferMemorySettings {
size_bytes: SIZE_BYTES,
is_physically_contiguous: true,
is_secure: false,
coherency_domain: CoherencyDomain::Ram,
heap: HeapType::SystemRam,
};
match exec.run_until_stalled(&mut collection_requests_1.next()) {
Poll::Ready(Some(Ok(BufferCollectionRequest::WaitForBuffersAllocated {
responder,
}))) => {
let single_buffer_settings = SingleBufferSettings {
buffer_settings,
has_image_format_constraints: false,
image_format_constraints: ImageFormatConstraints::new_empty(),
};
let mut buffer_collection_info = BufferCollectionInfo2 {
buffer_count: 1,
settings: single_buffer_settings,
..BufferCollectionInfo2::new_empty()
};
buffer_collection_info.buffers[0] = VmoBuffer {
vmo: Some(zx::Vmo::create(SIZE_BYTES.into()).expect("vmo creation")),
vmo_usable_start: 0,
};
responder
.send(zx::Status::OK.into_raw(), &mut buffer_collection_info)
.expect("send collection response")
}
x => panic!("Expected WaitForBuffersAllocated, got {:?}", x),
};
// The allocator should now be finished!
let mut buffers = match exec.run_until_stalled(&mut allocation) {
Poll::Pending => panic!("allocation should be done"),
Poll::Ready(res) => res.expect("successful allocation"),
};
assert_eq!(1, buffers.len());
assert!(buffers.get_mut(0).is_some());
assert_eq!(buffers.settings(), &buffer_settings);
}
#[test]
fn with_system_allocator() {
let mut exec = fasync::Executor::new().expect("executor creation");
let sysmem_client = fuchsia_component::client::connect_to_service::<AllocatorMarker>()
.expect("connect to allocator");
let mut buffer_constraints = BufferCollectionConstraints {
min_buffer_count: 2,
has_buffer_memory_constraints: true,
..BUFFER_COLLECTION_CONSTRAINTS_DEFAULT
};
buffer_constraints.buffer_memory_constraints.min_size_bytes = 4096;
let (sender, mut receiver) = futures::channel::oneshot::channel();
let token_fn = move |token| {
sender.send(token).expect("should be able to send token");
};
let mut allocation =
SysmemAllocation::allocate(sysmem_client.clone(), buffer_constraints, token_fn)
.expect("start allocator");
// Receive the token. From here on, using the token, the test becomes the other client to
// the Allocator sharing the memory.
let token = loop {
assert!(exec.run_until_stalled(&mut allocation).is_pending());
if let Poll::Ready(x) = exec.run_until_stalled(&mut receiver) {
break x;
}
};
let token = token.expect("receive token");
let (buffer_collection_client, buffer_collection_requests) =
fidl::endpoints::create_proxy::<BufferCollectionMarker>().expect("proxy creation");
sysmem_client.bind_shared_collection(token, buffer_collection_requests).expect("bind okay");
buffer_collection_client
.set_constraints(true, &mut buffer_constraints)
.expect("constraints should send okay");
let allocation_fut = buffer_collection_client.wait_for_buffers_allocated();
pin_mut!(allocation_fut);
let (status, buffers) =
exec.run_singlethreaded(&mut allocation_fut).expect("allocation success");
assert_eq!(zx::Status::OK.into_raw(), status);
// Allocator should be ready now.
let allocated_buffers = match exec.run_until_stalled(&mut allocation) {
Poll::Ready(bufs) => bufs.expect("allocation success"),
x => panic!("Expected ready, got {:?}", x),
};
let _allocator_settings = allocated_buffers.settings();
assert_eq!(buffers.buffer_count, allocated_buffers.len());
}
}