| // Copyright 2020 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| // !!! IMPORTANT !!! |
| // Most of the definitions in this file need to be kept in sync with: |
| // - //sdk/banjo/fuchsia.hardware.block/block.banjo; |
| // - //zircon/public/system/public/zircon/device/block.h. |
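
//! A client library for talking to a remote block device over the Zircon block fifo
//! protocol: requests are queued onto the fifo and a background FifoPoller future routes
//! responses back to the originating request futures.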
| |
| use { |
| anyhow::{ensure, Error}, |
| fidl_fuchsia_hardware_block as block, |
| fuchsia_async::{self as fasync, FifoReadable, FifoWritable}, |
| fuchsia_zircon::{self as zx, HandleBased}, |
| futures::channel::oneshot, |
| std::{ |
| collections::HashMap, |
| convert::TryInto, |
| future::Future, |
| ops::DerefMut, |
| pin::Pin, |
| sync::{Arc, Mutex}, |
| task::{Context, Poll, Waker}, |
| }, |
| }; |
| |
| pub use cache::Cache; |
| |
| pub mod cache; |
| |
| const BLOCK_VMOID_INVALID: u16 = 0; |
| const TEMP_VMO_SIZE: usize = 65536; |
| |
| const BLOCKIO_READ: u32 = 1; |
| const BLOCKIO_WRITE: u32 = 2; |
| const BLOCKIO_FLUSH: u32 = 3; |
| const _BLOCKIO_TRIM: u32 = 4; |
| const BLOCKIO_CLOSE_VMO: u32 = 5; |
| |
| #[repr(C)] |
| #[derive(Default)] |
| struct BlockFifoRequest { |
| op_code: u32, |
| request_id: u32, |
| group_id: u16, |
| vmoid: u16, |
| block_count: u32, |
| vmo_block: u64, |
| device_block: u64, |
| trace_flow_id: u64, |
| } |
| |
| #[repr(C)] |
| #[derive(Default)] |
| struct BlockFifoResponse { |
| status: i32, |
| request_id: u32, |
| group_id: u16, |
| reserved1: u16, |
| count: u32, |
| reserved2: u64, |
| reserved3: u64, |
| reserved4: u64, |
| } |
| |
// Safety: both structs are repr(C) plain-old-data types matching the wire format exchanged
// over the block fifo, so they are safe to read from and write to the fifo.
unsafe impl fasync::FifoEntry for BlockFifoRequest {}
unsafe impl fasync::FifoEntry for BlockFifoResponse {}
| |
/// A source buffer for writes to the device: either a range of a previously attached VMO, or a
/// plain memory slice (which is copied through an internal temporary VMO).
pub enum BufferSlice<'a> {
| VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 }, |
| Memory(&'a [u8]), |
| } |
| |
| impl<'a> BufferSlice<'a> { |
| pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self { |
| BufferSlice::VmoId { vmo_id, offset, length } |
| } |
| } |
| |
| impl<'a> From<&'a [u8]> for BufferSlice<'a> { |
| fn from(buf: &'a [u8]) -> Self { |
| BufferSlice::Memory(buf) |
| } |
| } |
| |
/// A destination buffer for reads from the device: either a range of a previously attached VMO,
/// or a plain memory slice (which is copied through an internal temporary VMO).
pub enum MutableBufferSlice<'a> {
| VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 }, |
| Memory(&'a mut [u8]), |
| } |
| |
| impl<'a> MutableBufferSlice<'a> { |
| pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self { |
| MutableBufferSlice::VmoId { vmo_id, offset, length } |
| } |
| } |
| |
| impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> { |
| fn from(buf: &'a mut [u8]) -> Self { |
| MutableBufferSlice::Memory(buf) |
| } |
| } |
| |
| #[derive(Default)] |
| struct RequestState { |
| result: Option<zx::Status>, |
| waker: Option<Waker>, |
| } |
| |
| #[derive(Default)] |
| struct FifoState { |
| // The fifo. |
| fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>, |
| |
| // The next request ID to be used. |
| next_request_id: u32, |
| |
| // A queue of messages to be sent on the fifo. |
| queue: std::collections::VecDeque<BlockFifoRequest>, |
| |
| // Map from request ID to RequestState. |
| map: HashMap<u32, RequestState>, |
| |
| // The waker for the FifoPoller. |
| poller_waker: Option<Waker>, |
| } |
| |
| impl FifoState { |
| fn terminate(&mut self) { |
| self.fifo.take(); |
for request_state in self.map.values_mut() {
| request_state.result.get_or_insert(zx::Status::CANCELED); |
| if let Some(waker) = request_state.waker.take() { |
| waker.wake(); |
| } |
| } |
| if let Some(waker) = self.poller_waker.take() { |
| waker.wake(); |
| } |
| } |
| } |
| |
| type FifoStateRef = Arc<Mutex<FifoState>>; |
| |
| // A future used for fifo responses. |
| struct ResponseFuture { |
| request_id: u32, |
| fifo_state: FifoStateRef, |
| } |
| |
| impl ResponseFuture { |
| fn new(fifo_state: FifoStateRef, request_id: u32) -> Self { |
| ResponseFuture { request_id, fifo_state } |
| } |
| } |
| |
| impl Future for ResponseFuture { |
| type Output = Result<(), zx::Status>; |
| |
| fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> { |
| let mut state = self.fifo_state.lock().unwrap(); |
| let request_state = state.map.get_mut(&self.request_id).unwrap(); |
| if let Some(result) = request_state.result { |
| Poll::Ready(result.into()) |
| } else { |
| request_state.waker.replace(context.waker().clone()); |
| Poll::Pending |
| } |
| } |
| } |
| |
| impl Drop for ResponseFuture { |
| fn drop(&mut self) { |
| self.fifo_state.lock().unwrap().map.remove(&self.request_id).unwrap(); |
| } |
| } |
| |
/// Wraps a VMO ID registered with the block device. Dropping a VmoId that has not been detached
/// (via RemoteBlockDevice::detach_vmo) will panic.
| pub struct VmoId(u16); |
| |
| impl VmoId { |
| fn take(&mut self) -> VmoId { |
| let vmo_id = VmoId(self.0); |
| self.0 = BLOCK_VMOID_INVALID; |
| vmo_id |
| } |
| |
| fn into_id(mut self) -> u16 { |
| let id = self.0; |
| self.0 = BLOCK_VMOID_INVALID; |
| id |
| } |
| |
| fn id(&self) -> u16 { |
| self.0 |
| } |
| } |
| |
| impl Drop for VmoId { |
| fn drop(&mut self) { |
| assert_eq!(self.0, BLOCK_VMOID_INVALID, "Did you forget to detach?"); |
| } |
| } |
| |
| /// Represents a connection to a remote block device. |
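///
/// A minimal usage sketch (illustrative only, not compiled as a doc-test; assumes `channel` is
/// a zx::Channel connected to a block device and that the calling thread runs a fuchsia-async
/// executor):
///
/// ```ignore
/// let device = RemoteBlockDevice::new(channel)?;
/// // Round-trip one block of data through process memory.
/// let buf = vec![0xa3u8; device.block_size() as usize];
/// device.write_at(buf[..].into(), 0).await?;
/// let mut readback = vec![0u8; buf.len()];
/// device.read_at(readback.as_mut_slice().into(), 0).await?;
/// assert_eq!(readback, buf);
/// device.flush().await?;
/// ```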
| pub struct RemoteBlockDevice { |
| device: Mutex<block::BlockSynchronousProxy>, |
| block_size: u32, |
| block_count: u64, |
| fifo_state: FifoStateRef, |
| temp_vmo: futures::lock::Mutex<zx::Vmo>, |
| temp_vmo_id: VmoId, |
| } |
| |
| impl RemoteBlockDevice { |
| /// Returns a connection to a remote block device via the given channel. |
| pub fn new(channel: zx::Channel) -> Result<Self, Error> { |
| let device = Self::from_channel(channel)?; |
| fasync::Task::spawn(FifoPoller { fifo_state: device.fifo_state.clone() }).detach(); |
| Ok(device) |
| } |
| |
/// Returns a connection to a remote block device via the given channel, but spawns a separate
/// thread for polling the fifo, which makes it work in cases where no executor is configured
/// for the calling thread.
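///
/// A sketch of use from a thread without an executor (illustrative only):
///
/// ```ignore
/// let device = RemoteBlockDevice::new_sync(channel)?;
/// futures::executor::block_on(device.flush())?;
/// ```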
| pub fn new_sync(channel: zx::Channel) -> Result<Self, Error> { |
| // The fifo needs to be instantiated from the thread that has the executor as that's where |
| // the fifo registers for notifications to be delivered. |
| let (sender, receiver) = oneshot::channel::<Result<Self, Error>>(); |
| std::thread::spawn(move || { |
| let mut executor = fasync::Executor::new().expect("failed to create executor"); |
| let maybe_device = RemoteBlockDevice::from_channel(channel); |
| let fifo_state = maybe_device.as_ref().ok().map(|device| device.fifo_state.clone()); |
| let _ = sender.send(maybe_device); |
| if let Some(fifo_state) = fifo_state { |
| executor.run_singlethreaded(FifoPoller { fifo_state }); |
| } |
| }); |
| futures::executor::block_on(receiver).unwrap() |
| } |
| |
| fn from_channel(channel: zx::Channel) -> Result<Self, Error> { |
| let mut block_device = block::BlockSynchronousProxy::new(channel); |
| let (status, maybe_info) = block_device.get_info(zx::Time::INFINITE)?; |
| let info = maybe_info.ok_or(zx::Status::from_raw(status))?; |
| let (status, maybe_fifo) = block_device.get_fifo(zx::Time::INFINITE)?; |
| let fifo = fasync::Fifo::from_fifo(maybe_fifo.ok_or(zx::Status::from_raw(status))?)?; |
| let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() })); |
| let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?; |
| let (status, maybe_vmo_id) = block_device |
| .attach_vmo(temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?, zx::Time::INFINITE)?; |
| let temp_vmo_id = VmoId(maybe_vmo_id.ok_or(zx::Status::from_raw(status))?.id); |
| let device = Self { |
| device: Mutex::new(block_device), |
| block_size: info.block_size, |
| block_count: info.block_count, |
| fifo_state, |
| temp_vmo: futures::lock::Mutex::new(temp_vmo), |
| temp_vmo_id, |
| }; |
| Ok(device) |
| } |
| |
| /// Wraps AttachVmo from fuchsia.hardware.block::Block. |
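///
/// For repeated transfers, attaching a VMO avoids the copy through the library's internal
/// temporary VMO that plain memory slices incur. A sketch (illustrative only; offsets and
/// lengths are in bytes and must be multiples of the block size):
///
/// ```ignore
/// let vmo = zx::Vmo::create(131072)?;
/// let vmo_id = device.attach_vmo(&vmo)?;
/// let len = device.block_size() as u64;
/// device.write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, len), 0).await?;
/// device.detach_vmo(vmo_id).await?;
/// ```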
| pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, Error> { |
| let mut device = self.device.lock().unwrap(); |
| let (status, maybe_vmo_id) = device |
| .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?, zx::Time::INFINITE)?; |
| Ok(VmoId(maybe_vmo_id.ok_or(zx::Status::from_raw(status))?.id)) |
| } |
| |
| /// Detaches the given vmo-id from the device. |
| pub async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), Error> { |
| self.send(BlockFifoRequest { |
| op_code: BLOCKIO_CLOSE_VMO, |
| vmoid: vmo_id.into_id(), |
| ..Default::default() |
| }) |
| .await |
| } |
| |
| fn to_blocks(&self, bytes: u64) -> Result<u64, Error> { |
| ensure!(bytes % self.block_size as u64 == 0, "bad alignment"); |
| Ok(bytes / self.block_size as u64) |
| } |
| |
| // Sends the request and waits for the response. |
| async fn send(&self, mut request: BlockFifoRequest) -> Result<(), Error> { |
| let request_id; |
| { |
| let mut state = self.fifo_state.lock().unwrap(); |
| if state.fifo.is_none() { |
| // Fifo has been closed. |
| return Err(zx::Status::CANCELED.into()); |
| } |
| request_id = state.next_request_id; |
state.next_request_id = state.next_request_id.wrapping_add(1);
| assert!( |
| state.map.insert(request_id, RequestState::default()).is_none(), |
| "request id in use!" |
| ); |
| request.request_id = request_id; |
| state.queue.push_back(request); |
| if let Some(waker) = state.poller_waker.take() { |
| waker.wake(); |
| } |
| } |
| Ok(ResponseFuture::new(self.fifo_state.clone(), request_id).await?) |
| } |
| |
/// Reads from the device at |device_offset| (in bytes) into the given buffer slice. The offset,
/// and any VMO offset and length, must be multiples of the block size.
| pub async fn read_at( |
| &self, |
| buffer_slice: MutableBufferSlice<'_>, |
| device_offset: u64, |
| ) -> Result<(), Error> { |
| match buffer_slice { |
| MutableBufferSlice::VmoId { vmo_id, offset, length } => { |
| self.send(BlockFifoRequest { |
| op_code: BLOCKIO_READ, |
| vmoid: vmo_id.id(), |
| block_count: self.to_blocks(length)?.try_into()?, |
| vmo_block: self.to_blocks(offset)?, |
| device_block: self.to_blocks(device_offset)?, |
| ..Default::default() |
| }) |
| .await? |
| } |
| MutableBufferSlice::Memory(mut slice) => { |
| let temp_vmo = self.temp_vmo.lock().await; |
| let mut device_block = self.to_blocks(device_offset)?; |
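// Bounce the data through the shared temporary VMO in chunks of at most TEMP_VMO_SIZE
// bytes: each iteration issues a block read into the VMO and then copies the bytes out to
// the caller's slice.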
| loop { |
| let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len()); |
| let block_count = self.to_blocks(to_do as u64)? as u32; |
| self.send(BlockFifoRequest { |
| op_code: BLOCKIO_READ, |
| vmoid: self.temp_vmo_id.id(), |
block_count,
| vmo_block: 0, |
device_block,
| ..Default::default() |
| }) |
| .await?; |
| temp_vmo.read(&mut slice[..to_do], 0)?; |
| if to_do == slice.len() { |
| break; |
| } |
| device_block += block_count as u64; |
| slice = &mut slice[to_do..]; |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
/// Writes the data in |buffer_slice| to the device at |device_offset| (in bytes). The offset,
/// and any VMO offset and length, must be multiples of the block size.
| pub async fn write_at( |
| &self, |
| buffer_slice: BufferSlice<'_>, |
| device_offset: u64, |
| ) -> Result<(), Error> { |
| match buffer_slice { |
| BufferSlice::VmoId { vmo_id, offset, length } => { |
| self.send(BlockFifoRequest { |
| op_code: BLOCKIO_WRITE, |
| vmoid: vmo_id.id(), |
| block_count: self.to_blocks(length)?.try_into()?, |
| vmo_block: self.to_blocks(offset)?, |
| device_block: self.to_blocks(device_offset)?, |
| ..Default::default() |
| }) |
| .await?; |
| } |
| BufferSlice::Memory(mut slice) => { |
| let temp_vmo = self.temp_vmo.lock().await; |
| let mut device_block = self.to_blocks(device_offset)?; |
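// Bounce the data through the shared temporary VMO in chunks: each iteration copies the
// caller's bytes into the VMO and then issues a block write from it.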
| loop { |
| let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len()); |
| let block_count = self.to_blocks(to_do as u64)? as u32; |
| temp_vmo.write(&slice[..to_do], 0)?; |
| self.send(BlockFifoRequest { |
| op_code: BLOCKIO_WRITE, |
| vmoid: self.temp_vmo_id.id(), |
block_count,
| vmo_block: 0, |
device_block,
| ..Default::default() |
| }) |
| .await?; |
| if to_do == slice.len() { |
| break; |
| } |
| device_block += block_count as u64; |
| slice = &slice[to_do..]; |
| } |
| } |
| } |
| Ok(()) |
| } |
| |
/// Flushes previously written data; sends a BLOCKIO_FLUSH request over the fifo.
| pub async fn flush(&self) -> Result<(), Error> { |
| self.send(BlockFifoRequest { |
| op_code: BLOCKIO_FLUSH, |
| vmoid: BLOCK_VMOID_INVALID, |
| ..Default::default() |
| }) |
| .await |
| } |
| |
/// Returns the block size of the device, in bytes.
pub fn block_size(&self) -> u32 {
self.block_size
}

/// Returns the size of the device, in blocks.
pub fn block_count(&self) -> u64 {
| self.block_count |
| } |
| |
| /// Returns true if the remote fifo is still connected. |
| pub fn is_connected(&self) -> bool { |
| self.fifo_state.lock().unwrap().fifo.is_some() |
| } |
| } |
| |
| impl Drop for RemoteBlockDevice { |
| fn drop(&mut self) { |
// It's OK to leak the VMO id because the server will release all attached VMOs when the
// fifo is torn down.
| self.temp_vmo_id.take().into_id(); |
| // Ignore errors here as there is not much we can do about it. |
| let _ = self.device.lock().unwrap().close_fifo(zx::Time::INFINITE); |
| } |
| } |
| |
// FifoPoller is a future responsible for sending queued requests over the fifo and for
// routing responses back to their waiting ResponseFutures.
| struct FifoPoller { |
| fifo_state: FifoStateRef, |
| } |
| |
| impl Future for FifoPoller { |
| type Output = (); |
| |
| fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> { |
| let mut state_lock = self.fifo_state.lock().unwrap(); |
| let state = state_lock.deref_mut(); // So that we can split the borrow. |
| |
| let fifo = if let Some(fifo) = state.fifo.as_ref() { |
| fifo |
| } else { |
| return Poll::Ready(()); |
| }; |
| |
| // Send requests. |
| loop { |
| let slice = state.queue.as_slices().0; |
| if slice.is_empty() { |
| break; |
| } |
| match fifo.write(context, slice) { |
| Poll::Ready(Ok(sent)) => { |
| state.queue.drain(0..sent); |
| } |
| Poll::Ready(Err(_)) => { |
| state.terminate(); |
| return Poll::Ready(()); |
| } |
| Poll::Pending => { |
| break; |
| } |
| } |
| } |
| |
| // Receive responses. |
| while let Poll::Ready(result) = fifo.read(context) { |
| match result { |
| Ok(Some(response)) => { |
| let request_id = response.request_id; |
// If the request isn't in the map, assume its ResponseFuture was dropped (e.g. a
// cancelled read).
| if let Some(request_state) = state.map.get_mut(&request_id) { |
| request_state.result.replace(zx::Status::from_raw(response.status)); |
| if let Some(waker) = request_state.waker.take() { |
| waker.wake(); |
| } |
| } |
| } |
| _ => { |
| state.terminate(); |
| return Poll::Ready(()); |
| } |
| } |
| } |
| |
| state.poller_waker = Some(context.waker().clone()); |
| Poll::Pending |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| super::{ |
| BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice, RemoteBlockDevice, |
| }, |
| fidl_fuchsia_hardware_block::{self as block, BlockRequest}, |
| fuchsia_async::{self as fasync, FifoReadable, FifoWritable}, |
| fuchsia_zircon as zx, |
| futures::{ |
| future::{AbortHandle, Abortable, TryFutureExt}, |
| join, |
| stream::{futures_unordered::FuturesUnordered, StreamExt}, |
| }, |
| ramdevice_client::RamdiskClient, |
| }; |
| |
| const RAMDISK_BLOCK_SIZE: u64 = 1024; |
| const RAMDISK_BLOCK_COUNT: u64 = 1024; |
| |
| pub fn make_ramdisk() -> (RamdiskClient, RemoteBlockDevice) { |
| isolated_driver_manager::launch_isolated_driver_manager() |
| .expect("launch_isolated_driver_manager failed"); |
| ramdevice_client::wait_for_device("/dev/misc/ramctl", std::time::Duration::from_secs(10)) |
| .expect("ramctl did not appear"); |
| let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT) |
| .expect("RamdiskClient::create failed"); |
| let remote_block_device = |
| RemoteBlockDevice::new(ramdisk.open().expect("ramdisk.open failed")) |
| .expect("RemoteBlockDevice::new failed"); |
| assert_eq!(remote_block_device.block_size, 1024); |
| (ramdisk, remote_block_device) |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_against_ram_disk() { |
| let (_ramdisk, remote_block_device) = make_ramdisk(); |
| |
| let stats_before = remote_block_device |
| .device |
| .lock() |
| .unwrap() |
| .get_stats(false, zx::Time::INFINITE) |
| .expect("get_stats failed"); |
| assert_eq!(stats_before.0, zx::Status::OK.into_raw()); |
| let stats_before = stats_before.1.expect("Processing get_stats result failed"); |
| |
| let vmo = zx::Vmo::create(131072).expect("Vmo::create failed"); |
| vmo.write(b"hello", 5).expect("vmo.write failed"); |
| let vmo_id = remote_block_device.attach_vmo(&vmo).expect("attach_vmo failed"); |
| remote_block_device |
| .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0) |
| .await |
| .expect("write_at failed"); |
| remote_block_device |
| .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0) |
| .await |
| .expect("read_at failed"); |
| let mut buf: [u8; 5] = Default::default(); |
| vmo.read(&mut buf, 1029).expect("vmo.read failed"); |
| assert_eq!(&buf, b"hello"); |
| remote_block_device.detach_vmo(vmo_id).await.expect("detach_vmo failed"); |
| |
// Check that the stats are what we expect them to be.
| let stats_after = remote_block_device |
| .device |
| .lock() |
| .unwrap() |
| .get_stats(false, zx::Time::INFINITE) |
| .expect("get_stats failed"); |
| assert_eq!(stats_after.0, zx::Status::OK.into_raw()); |
| let stats_after = stats_after.1.expect("Processing get_stats result failed"); |
| // write stats |
| assert_eq!( |
| stats_before.write.success.total_calls + 1, |
| stats_after.write.success.total_calls |
| ); |
| assert_eq!( |
| stats_before.write.success.bytes_transferred + 1024, |
| stats_after.write.success.bytes_transferred |
| ); |
| assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls); |
| // read stats |
| assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls); |
| assert_eq!( |
| stats_before.read.success.bytes_transferred + 2048, |
| stats_after.read.success.bytes_transferred |
| ); |
| assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_against_ram_disk_with_flush() { |
| let (_ramdisk, remote_block_device) = make_ramdisk(); |
| |
| let stats_before = remote_block_device |
| .device |
| .lock() |
| .unwrap() |
| .get_stats(false, zx::Time::INFINITE) |
| .expect("get_stats failed"); |
| assert_eq!(stats_before.0, zx::Status::OK.into_raw()); |
| let stats_before = stats_before.1.expect("Processing get_stats result failed"); |
| |
| let vmo = zx::Vmo::create(131072).expect("Vmo::create failed"); |
| vmo.write(b"hello", 5).expect("vmo.write failed"); |
| let vmo_id = remote_block_device.attach_vmo(&vmo).expect("attach_vmo failed"); |
| remote_block_device |
| .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0) |
| .await |
| .expect("write_at failed"); |
| remote_block_device.flush().await.expect("flush failed"); |
| remote_block_device |
| .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0) |
| .await |
| .expect("read_at failed"); |
| let mut buf: [u8; 5] = Default::default(); |
| vmo.read(&mut buf, 1029).expect("vmo.read failed"); |
| assert_eq!(&buf, b"hello"); |
| remote_block_device.detach_vmo(vmo_id).await.expect("detach_vmo failed"); |
| |
// Check that the stats are what we expect them to be.
| let stats_after = remote_block_device |
| .device |
| .lock() |
| .unwrap() |
| .get_stats(false, zx::Time::INFINITE) |
| .expect("get_stats failed"); |
| assert_eq!(stats_after.0, zx::Status::OK.into_raw()); |
| let stats_after = stats_after.1.expect("Processing get_stats result failed"); |
| // write stats |
| assert_eq!( |
| stats_before.write.success.total_calls + 1, |
| stats_after.write.success.total_calls |
| ); |
| assert_eq!( |
| stats_before.write.success.bytes_transferred + 1024, |
| stats_after.write.success.bytes_transferred |
| ); |
| assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls); |
| // flush stats |
| assert_eq!( |
| stats_before.flush.success.total_calls + 1, |
| stats_after.flush.success.total_calls |
| ); |
| assert_eq!(stats_before.flush.failure.total_calls, stats_after.flush.failure.total_calls); |
| // read stats |
| assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls); |
| assert_eq!( |
| stats_before.read.success.bytes_transferred + 2048, |
| stats_after.read.success.bytes_transferred |
| ); |
| assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_alignment() { |
| let (_ramdisk, remote_block_device) = make_ramdisk(); |
| let vmo = zx::Vmo::create(131072).expect("Vmo::create failed"); |
| let vmo_id = remote_block_device.attach_vmo(&vmo).expect("attach_vmo failed"); |
| remote_block_device |
| .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1) |
| .await |
| .expect_err("expected failure due to bad alignment"); |
| remote_block_device.detach_vmo(vmo_id).await.expect("detach_vmo failed"); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_parallel_io() { |
| let (_ramdisk, remote_block_device) = make_ramdisk(); |
| let vmo = zx::Vmo::create(131072).expect("Vmo::create failed"); |
| let vmo_id = remote_block_device.attach_vmo(&vmo).expect("attach_vmo failed"); |
| let mut reads = Vec::new(); |
| for _ in 0..1024 { |
| reads.push( |
| remote_block_device |
| .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0) |
| .inspect_err(|e| panic!("read should have succeeded: {}", e)), |
| ); |
| } |
| futures::future::join_all(reads).await; |
| remote_block_device.detach_vmo(vmo_id).await.expect("detach_vmo failed"); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_closed_device() { |
| let (ramdisk, remote_block_device) = make_ramdisk(); |
| let vmo = zx::Vmo::create(131072).expect("Vmo::create failed"); |
| let vmo_id = remote_block_device.attach_vmo(&vmo).expect("attach_vmo failed"); |
| let mut reads = Vec::new(); |
| for _ in 0..1024 { |
| reads.push( |
| remote_block_device |
| .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0), |
| ); |
| } |
| assert!(remote_block_device.is_connected()); |
| let _ = futures::join!(futures::future::join_all(reads), async { |
| ramdisk.destroy().expect("ramdisk.destroy failed") |
| }); |
| // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing. |
| while remote_block_device |
| .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0) |
| .await |
| .is_ok() |
| {} |
| |
// Sometimes the FIFO will start rejecting requests before the FIFO is actually closed, so
// we can get false positives from is_connected.
| while remote_block_device.is_connected() { |
| // Sleep for a bit to minimise lock contention. |
| fasync::Timer::new(fasync::Time::after(zx::Duration::from_millis(500))).await; |
| } |
| |
// But once is_connected returns false, it should keep returning false.
assert!(!remote_block_device.is_connected());
| let _ = remote_block_device.detach_vmo(vmo_id).await; |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_cancelled_reads() { |
| let (_ramdisk, remote_block_device) = make_ramdisk(); |
| let vmo = zx::Vmo::create(131072).expect("Vmo::create failed"); |
| let vmo_id = remote_block_device.attach_vmo(&vmo).expect("attach_vmo failed"); |
| { |
| let mut reads = FuturesUnordered::new(); |
| for _ in 0..1024 { |
| reads.push( |
| remote_block_device |
| .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0), |
| ); |
| } |
// Read the first 500 results and then discard the rest.
| for _ in 0..500 { |
| reads.next().await; |
| } |
| } |
| remote_block_device.detach_vmo(vmo_id).await.expect("detach_vmo failed"); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
async fn test_parallel_large_read_and_write_with_memory_succeeds() {
| let (_ramdisk, remote_block_device) = make_ramdisk(); |
| let remote_block_device_ref = &remote_block_device; |
| let test_one = |offset, len, fill| async move { |
| let buf = vec![fill; len]; |
| remote_block_device_ref |
| .write_at(buf[..].into(), offset) |
| .await |
| .expect("write_at failed"); |
// Read back an extra block on either side.
| let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize]; |
| remote_block_device_ref |
| .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE) |
| .await |
| .expect("read_at failed"); |
| assert_eq!( |
| &read_buf[0..RAMDISK_BLOCK_SIZE as usize], |
| &[0; RAMDISK_BLOCK_SIZE as usize][..] |
| ); |
| assert_eq!( |
| &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len], |
| &buf[..] |
| ); |
| assert_eq!( |
| &read_buf[RAMDISK_BLOCK_SIZE as usize + len..], |
| &[0; RAMDISK_BLOCK_SIZE as usize][..] |
| ); |
| }; |
| const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize; |
| join!( |
| test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8), |
| test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8) |
| ); |
| } |
| |
// Implements a dummy server which test cases can use to verify that channel messages and
// fifo operations are received, via the channel_handler and fifo_handler closures passed
// to FakeBlockServer::new.
| struct FakeBlockServer<'a> { |
| server_channel: Option<zx::Channel>, |
| channel_handler: Box<dyn Fn(&BlockRequest) -> bool + 'a>, |
| fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>, |
| } |
| |
| impl<'a> FakeBlockServer<'a> { |
| // Creates a new FakeBlockServer given a channel to listen on. |
| // |
// The 'channel_handler' and 'fifo_handler' closures allow customizing how the server
// handles requests received over the channel or the fifo, respectively.
//
// 'channel_handler' receives each message before it is handled by the default
// implementation and can return 'true' to indicate that the message has been fully handled
// and needs no further processing.
//
// 'fifo_handler' takes a BlockFifoRequest and produces the response that the
// FakeBlockServer sends back over the fifo.
| fn new( |
| server_channel: zx::Channel, |
| channel_handler: impl Fn(&BlockRequest) -> bool + 'a, |
| fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a, |
| ) -> FakeBlockServer<'a> { |
| FakeBlockServer { |
| server_channel: Some(server_channel), |
| channel_handler: Box::new(channel_handler), |
| fifo_handler: Box::new(fifo_handler), |
| } |
| } |
| |
| // Runs the server |
| async fn run(&mut self) { |
| let server = fidl::endpoints::ServerEnd::<block::BlockMarker>::new( |
| self.server_channel.take().unwrap(), |
| ); |
| |
| // Set up a mock server. |
| let (server_fifo, client_fifo) = |
| zx::Fifo::create(16, std::mem::size_of::<BlockFifoRequest>()) |
| .expect("Fifo::create failed"); |
| let maybe_server_fifo = std::sync::Mutex::new(Some(client_fifo)); |
| |
| let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair(); |
| let fifo_future = Abortable::new( |
| async { |
| let fifo = fasync::Fifo::from_fifo(server_fifo).expect("from_fifo failed"); |
| while let Some(request) = fifo.read_entry().await.expect("read_entry failed") { |
| let response = self.fifo_handler.as_ref()(request); |
| fifo.write_entries(std::slice::from_ref(&response)) |
| .await |
| .expect("write_entries failed"); |
| } |
| }, |
| fifo_future_abort_registration, |
| ); |
| |
| let channel_future = async { |
| server |
| .into_stream() |
| .expect("into_stream failed") |
| .for_each(|request| async { |
| let request = request.expect("unexpected fidl error"); |
| |
// Give the test a chance to intercept and potentially handle the message.
| if self.channel_handler.as_ref()(&request) { |
| return; |
| } |
| |
| match request { |
| BlockRequest::GetInfo { responder } => { |
| let mut block_info = block::BlockInfo { |
| block_count: 1024, |
| block_size: 512, |
| max_transfer_size: 1024 * 1024, |
| flags: 0, |
| reserved: 0, |
| }; |
| responder |
| .send(zx::sys::ZX_OK, Some(&mut block_info)) |
| .expect("send failed"); |
| } |
| BlockRequest::GetFifo { responder } => { |
| responder |
| .send(zx::sys::ZX_OK, maybe_server_fifo.lock().unwrap().take()) |
| .expect("send failed"); |
| } |
| BlockRequest::AttachVmo { vmo: _, responder } => { |
| let mut vmo_id = block::VmoId { id: 1 }; |
| responder |
| .send(zx::sys::ZX_OK, Some(&mut vmo_id)) |
| .expect("send failed"); |
| } |
| BlockRequest::CloseFifo { responder } => { |
| fifo_future_abort.abort(); |
| responder.send(zx::sys::ZX_OK).expect("send failed"); |
| } |
| _ => panic!("Unexpected message"), |
| } |
| }) |
| .await; |
| }; |
| |
| let _result = join!(fifo_future, channel_future); |
// _result can be Err(Aborted) since the CloseFifo handler calls abort(), which is expected.
| } |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_block_fifo_close_is_called() { |
| let close_called = std::sync::Mutex::new(false); |
| let (client, server) = zx::Channel::create().expect("Channel::create failed"); |
| |
| // Have to spawn this on a different thread because RemoteBlockDevice uses a synchronous |
| // client and we are using a single threaded executor. |
| std::thread::spawn(move || { |
| let _remote_block_device = |
| RemoteBlockDevice::new_sync(client).expect("RemoteBlockDevice::new_sync failed"); |
| // The drop here should cause CloseFifo to be sent. |
| }); |
| |
| let channel_handler = |request: &BlockRequest| -> bool { |
| if let BlockRequest::CloseFifo { .. } = request { |
| *close_called.lock().unwrap() = true; |
| } |
| false |
| }; |
| let mut fake_server = FakeBlockServer::new(server, channel_handler, |_| unreachable!()); |
| fake_server.run().await; |
| |
| // After the server has finished running, we can check to see that close was called. |
| assert!(*close_called.lock().unwrap()); |
| } |
| |
| #[fasync::run_singlethreaded(test)] |
| async fn test_block_flush_is_called() { |
| let flush_called = std::sync::Mutex::new(false); |
| let (client, server) = zx::Channel::create().expect("Channel::create failed"); |
| |
| // Have to spawn this on a different thread because RemoteBlockDevice uses a synchronous |
| // client and we are using a single threaded executor. |
| std::thread::spawn(move || { |
| let remote_block_device = |
| RemoteBlockDevice::new_sync(client).expect("RemoteBlockDevice::new_sync failed"); |
| futures::executor::block_on(remote_block_device.flush()) |
| .expect("RemoteBlockDevice::flush failed"); |
| }); |
| |
| let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse { |
| *flush_called.lock().unwrap() = true; |
| assert_eq!(request.op_code, super::BLOCKIO_FLUSH); |
| BlockFifoResponse { |
| status: zx::Status::OK.into_raw(), |
| request_id: request.request_id, |
| ..Default::default() |
| } |
| }; |
| let mut fake_server = FakeBlockServer::new(server, |_| false, fifo_handler); |
| fake_server.run().await; |
| |
// After the server has finished running, we can check to see that flush was called.
| assert!(*flush_called.lock().unwrap()); |
| } |
| } |