| // Copyright 2021 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| use { |
| crate::fuchsia::{errors::map_to_status, file::FxFile, node::OpenedNode}, |
| anyhow::Error, |
| fidl::endpoints::ServerEnd, |
| fidl_fuchsia_hardware_block as block, |
| fidl_fuchsia_hardware_block_volume::{ |
| self as volume, VolumeAndNodeMarker, VolumeAndNodeRequest, |
| }, |
| fidl_fuchsia_io as fio, |
| fuchsia_async::{self as fasync, FifoReadable, FifoWritable}, |
| fuchsia_zircon as zx, |
| futures::{stream::TryStreamExt, try_join}, |
| fxfs::{ |
| errors::FxfsError, |
| round::{round_down, round_up}, |
| }, |
| remote_block_device::{BlockFifoRequest, BlockFifoResponse}, |
| rustc_hash::FxHashMap as HashMap, |
| std::{ |
| collections::BTreeMap, |
| hash::{Hash, Hasher}, |
| sync::Mutex, |
| }, |
| vfs::{execution_scope::ExecutionScope, file::File, node::Node}, |
| }; |
| |
// Multiple Block I/O requests may be sent as a group.
// Notes:
// - the group is identified by `group_id` in the request
// - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
//   flag is set.
// - when processing a request of a group fails, subsequent requests of that
//   group will not be processed.
//
// Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
//
// FifoMessageGroup keeps track of the relevant BlockFifoResponse fields for
// a group of requests. Only `status` and `count` need to be updated.
struct FifoMessageGroup {
    // Identifier shared by every request in the group (`group` field of the
    // BlockFifoRequest).
    group_id: u16,
    // First failure status recorded while processing the group's requests;
    // stays ZX_OK until a request fails.
    status: zx::sys::zx_status_t,
    // Number of requests belonging to this group processed so far.
    count: u32,
}
| |
| impl Hash for FifoMessageGroup { |
| fn hash<H: Hasher>(&self, state: &mut H) { |
| self.group_id.hash(state); |
| } |
| } |
| |
| impl FifoMessageGroup { |
| // Initialise a FifoMessageGroup given the request group ID. |
| // `count` is set to 0 as no requests has been processed yet. |
| fn new(group_id: u16) -> Self { |
| Self { group_id, status: zx::sys::ZX_OK, count: 0 } |
| } |
| |
| // Takes the FifoMessageGroup and converts it to a BlockFifoResponse. |
| // Note that this doesn't return the request ID, it needs to be set |
| // after extracting the BlockFifoResponse before sending it |
| fn into_response(self) -> BlockFifoResponse { |
| return BlockFifoResponse { |
| status: self.status, |
| group: self.group_id, |
| count: self.count, |
| ..Default::default() |
| }; |
| } |
| |
| fn increment_count(&mut self) { |
| self.count += 1; |
| } |
| |
| fn set_status(&mut self, status: zx::sys::zx_status_t) { |
| self.status = status; |
| } |
| |
| fn is_err(&self) -> bool { |
| self.status != zx::sys::ZX_OK |
| } |
| } |
| |
| struct FifoMessageGroups(HashMap<u16, FifoMessageGroup>); |
| |
| // Keeps track of all the group requests that are currently being processed |
| impl FifoMessageGroups { |
| fn new() -> Self { |
| Self(HashMap::default()) |
| } |
| |
| // Returns the current MessageGroup with this group ID |
| fn get(&mut self, group_id: u16) -> &mut FifoMessageGroup { |
| self.0.entry(group_id).or_insert_with(|| FifoMessageGroup::new(group_id)) |
| } |
| |
| // Remove a group when `BlockIoFlag::GROUP_LAST` flag is set. |
| fn remove(&mut self, group_id: u16) -> FifoMessageGroup { |
| match self.0.remove(&group_id) { |
| Some(group) => group, |
| // `remove(group_id)` can be called when the group has not yet been |
| // added to this FifoMessageGroups. In which case, return a default |
| // MessageGroup. |
| None => FifoMessageGroup::new(group_id), |
| } |
| } |
| } |
| |
// This is the default slice size (32 KiB) used for Volumes in devices.
const DEVICE_VOLUME_SLICE_SIZE: u64 = 32 * 1024;
| |
/// Implements server to handle Block requests
pub struct BlockServer {
    // The file backing this block device.
    file: OpenedNode<FxFile>,
    // Scope used to spawn servers for cloned connections.
    scope: ExecutionScope,
    // Channel taken (exactly once) by `run()` to serve the Volume/Node
    // protocol.
    server_channel: Option<zx::Channel>,
    // Client end of the block FIFO; handed out on the first
    // `SessionRequest::GetFifo` and `None` afterwards.
    maybe_server_fifo: Mutex<Option<zx::Fifo>>,
    // Grouped FIFO requests currently in flight.
    message_groups: Mutex<FifoMessageGroups>,
    // VMOs attached by clients, keyed by their assigned vmoid.
    vmos: Mutex<BTreeMap<u16, zx::Vmo>>,
}
| |
| impl BlockServer { |
| /// Creates a new BlockServer given a server channel to listen on. |
| pub fn new( |
| file: OpenedNode<FxFile>, |
| scope: ExecutionScope, |
| server_channel: zx::Channel, |
| ) -> BlockServer { |
| BlockServer { |
| file, |
| scope, |
| server_channel: Some(server_channel), |
| maybe_server_fifo: Mutex::new(None), |
| message_groups: Mutex::new(FifoMessageGroups::new()), |
| vmos: Mutex::new(BTreeMap::new()), |
| } |
| } |
| |
| // Returns a VMO id that is currently not being used |
| fn get_vmo_id(&self, vmo: zx::Vmo) -> Option<u16> { |
| let mut vmos = self.vmos.lock().unwrap(); |
| let mut prev_id = 0; |
| for &id in vmos.keys() { |
| if id != prev_id + 1 { |
| let vmo_id = prev_id + 1; |
| vmos.insert(vmo_id, vmo); |
| return Some(vmo_id); |
| } |
| prev_id = id; |
| } |
| if prev_id < std::u16::MAX { |
| let vmo_id = prev_id + 1; |
| vmos.insert(vmo_id, vmo); |
| Some(vmo_id) |
| } else { |
| None |
| } |
| } |
| |
| async fn handle_blockio_write(&self, request: &BlockFifoRequest) -> Result<(), Error> { |
| let block_size = self.file.get_block_size(); |
| |
| let data = { |
| let vmos = self.vmos.lock().unwrap(); |
| let vmo = vmos.get(&request.vmoid).ok_or(FxfsError::NotFound)?; |
| let mut buffer = vec![0u8; (request.length as u64 * block_size) as usize]; |
| vmo.read(&mut buffer[..], request.vmo_offset * block_size)?; |
| buffer |
| }; |
| |
| self.file.write_at_uncached(request.dev_offset * block_size as u64, &data[..]).await?; |
| |
| Ok(()) |
| } |
| |
| async fn handle_blockio_read(&self, request: &BlockFifoRequest) -> Result<(), Error> { |
| let block_size = self.file.get_block_size(); |
| |
| let mut buffer = vec![0u8; (request.length as u64 * block_size) as usize]; |
| let bytes_read = self |
| .file |
| .read_at_uncached(request.dev_offset * (block_size as u64), &mut buffer[..]) |
| .await?; |
| |
| // Fill in the rest of the buffer if bytes_read is less than the requested amount |
| buffer[bytes_read as usize..].fill(0); |
| |
| let vmos = self.vmos.lock().unwrap(); |
| let vmo = vmos.get(&request.vmoid).ok_or(FxfsError::NotFound)?; |
| vmo.write(&buffer[..], request.vmo_offset * block_size)?; |
| |
| Ok(()) |
| } |
| |
| async fn process_fifo_request(&self, request: &BlockFifoRequest) -> zx::sys::zx_status_t { |
| fn into_raw_status(result: Result<(), Error>) -> zx::sys::zx_status_t { |
| let status: zx::Status = result.map_err(|e| map_to_status(e)).into(); |
| status.into_raw() |
| } |
| |
| match remote_block_device::BlockOpcode::from_primitive(request.command.opcode) { |
| Some(remote_block_device::BlockOpcode::CloseVmo) => { |
| let mut vmos = self.vmos.lock().unwrap(); |
| match vmos.remove(&request.vmoid) { |
| Some(_vmo) => zx::sys::ZX_OK, |
| None => zx::sys::ZX_ERR_NOT_FOUND, |
| } |
| } |
| Some(remote_block_device::BlockOpcode::Write) => { |
| into_raw_status(self.handle_blockio_write(&request).await) |
| } |
| Some(remote_block_device::BlockOpcode::Read) => { |
| into_raw_status(self.handle_blockio_read(&request).await) |
| } |
| // TODO(https://fxbug.dev/42171261): simply returning ZX_OK since we're |
| // writing to device and no need to flush cache, but need to |
| // check that flush goes down the stack |
| Some(remote_block_device::BlockOpcode::Flush) => zx::sys::ZX_OK, |
| // TODO(https://fxbug.dev/42171261) |
| Some(remote_block_device::BlockOpcode::Trim) => zx::sys::ZX_OK, |
| None => panic!("Unexpected message, request {:?}", request.command.opcode), |
| } |
| } |
| |
    // Handles one FIFO request end-to-end, returning the response to send, if
    // any. A grouped request (GROUP_ITEM set, GROUP_LAST clear) contributes to
    // the group's state and returns None; the single response for the whole
    // group is produced by the request carrying GROUP_LAST.
    async fn handle_fifo_request(&self, request: BlockFifoRequest) -> Option<BlockFifoResponse> {
        let flags = remote_block_device::BlockIoFlag::from_bits_truncate(request.command.flags);
        let is_group = flags.contains(remote_block_device::BlockIoFlag::GROUP_ITEM);
        let wants_reply = flags.contains(remote_block_device::BlockIoFlag::GROUP_LAST);

        // Set up the BlockFifoResponse for this request, but do no process request yet
        let mut maybe_reply = {
            if is_group {
                let mut groups = self.message_groups.lock().unwrap();
                if wants_reply {
                    // Last request of the group: take the accumulated state
                    // and turn it into the group's reply.
                    let mut group = groups.remove(request.group);
                    group.increment_count();

                    // This occurs when a previous request in this group has failed
                    if group.is_err() {
                        let mut reply = group.into_response();
                        reply.reqid = request.reqid;
                        // No need to process this request
                        return Some(reply);
                    }

                    let mut response = group.into_response();
                    response.reqid = request.reqid;
                    Some(response)
                } else {
                    // Intermediate group request: just count it; no reply is
                    // sent for these.
                    let group = groups.get(request.group);
                    group.increment_count();

                    if group.is_err() {
                        // No need to process this request
                        return None;
                    }
                    None
                }
            } else {
                // Ungrouped requests always get their own reply.
                Some(BlockFifoResponse { reqid: request.reqid, count: 1, ..Default::default() })
            }
        };
        let status = self.process_fifo_request(&request).await;
        // Status only needs to be updated in the reply if it's not OK
        if status != zx::sys::ZX_OK {
            match &mut maybe_reply {
                None => {
                    // maybe_reply will only be None if it's part of a group request
                    self.message_groups.lock().unwrap().get(request.group).set_status(status);
                }
                Some(reply) => {
                    reply.status = status;
                }
            }
        }
        maybe_reply
    }
| |
| fn handle_clone_request(&self, object: zx::Channel) { |
| let file = OpenedNode::new(self.file.clone()); |
| let scope_cloned = self.scope.clone(); |
| self.scope.spawn(async move { |
| let mut cloned_server = BlockServer::new(file, scope_cloned, object); |
| let _ = cloned_server.run().await; |
| }); |
| } |
| |
    // Dispatches a single Volume/Node FIDL request. Many operations are still
    // unimplemented and answer NOT_SUPPORTED (see the TODOs on each arm).
    async fn handle_request(&self, request: VolumeAndNodeRequest) -> Result<(), Error> {
        match request {
            VolumeAndNodeRequest::GetInfo { responder } => {
                let block_size = self.file.get_block_size();
                // Round the file size up to a whole number of blocks.
                let block_count =
                    (self.file.get_size().await.unwrap() + block_size - 1) / block_size;
                responder.send(Ok(&block::BlockInfo {
                    block_count,
                    block_size: block_size as u32,
                    max_transfer_size: 1024 * 1024,
                    flags: block::Flag::empty(),
                }))?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::GetStats { clear: _, responder } => {
                responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
            }
            VolumeAndNodeRequest::OpenSession { session, control_handle: _ } => {
                let stream = session.into_stream()?;
                let () = stream
                    .try_for_each(|request| async {
                        let () = match request {
                            block::SessionRequest::GetFifo { responder } => {
                                // The fifo can only be handed out once; later
                                // callers get NO_RESOURCES.
                                match self.maybe_server_fifo.lock().unwrap().take() {
                                    Some(fifo) => responder.send(Ok(fifo))?,
                                    None => {
                                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?
                                    }
                                }
                            }
                            block::SessionRequest::AttachVmo { vmo, responder } => {
                                match self.get_vmo_id(vmo) {
                                    Some(vmo_id) => {
                                        responder.send(Ok(&block::VmoId { id: vmo_id }))?
                                    }
                                    None => {
                                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?
                                    }
                                }
                            }
                            // TODO(https://fxbug.dev/42171261): close fifo
                            block::SessionRequest::Close { responder } => responder.send(Ok(()))?,
                        };
                        Ok(())
                    })
                    .await?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::ReadBlocks {
                responder,
                vmo: _,
                length: _,
                dev_offset: _,
                vmo_offset: _,
            } => {
                responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::WriteBlocks {
                responder,
                vmo: _,
                length: _,
                dev_offset: _,
                vmo_offset: _,
            } => {
                responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::GetTypeGuid { responder } => {
                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::GetInstanceGuid { responder } => {
                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::GetName { responder } => {
                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
            }
            VolumeAndNodeRequest::QuerySlices { start_slices, responder } => {
                // Initialise slices with default value.
                let default = volume::VsliceRange { allocated: false, count: 0 };
                let mut slices = [default; volume::MAX_SLICE_REQUESTS as usize];

                // Fill in one entry per requested start slice; stop at the
                // first error and report its status.
                let mut status = zx::sys::ZX_OK;
                let mut response_count = 0;
                for (slice, start_slice) in slices.iter_mut().zip(start_slices.into_iter()) {
                    match self.file.is_allocated(start_slice * DEVICE_VOLUME_SLICE_SIZE).await {
                        Ok((allocated, bytes)) => {
                            slice.count = round_up(bytes, DEVICE_VOLUME_SLICE_SIZE).unwrap();
                            slice.allocated = allocated;
                            response_count += 1;
                        }
                        Err(e) => {
                            status = e.into_raw();
                            break;
                        }
                    }
                }
                responder.send(status, &slices, response_count)?;
            }
            // TODO(https://fxbug.dev/42171261): need to check if this returns the right information.
            VolumeAndNodeRequest::GetVolumeInfo { responder } => {
                match self.file.get_attrs().await {
                    Ok(attr) => {
                        let allocated_bytes = attr.storage_size;
                        let unallocated_bytes =
                            self.file.get_size_uncached().await - allocated_bytes;
                        // NOTE(review): `allocated_slices` is a byte value
                        // rounded to a slice boundary, not a slice count, and
                        // is summed with `unallocated_bytes` below — confirm
                        // the intended units (see the TODO above).
                        let allocated_slices =
                            round_up(allocated_bytes, DEVICE_VOLUME_SLICE_SIZE).unwrap();
                        let unallocated_bytes =
                            round_down(unallocated_bytes, DEVICE_VOLUME_SLICE_SIZE);
                        let manager = volume::VolumeManagerInfo {
                            slice_size: DEVICE_VOLUME_SLICE_SIZE,
                            slice_count: allocated_slices + unallocated_bytes,
                            assigned_slice_count: allocated_slices,
                            maximum_slice_count: allocated_slices + unallocated_bytes,
                            max_virtual_slice: allocated_slices + unallocated_bytes,
                        };
                        let volume_info = volume::VolumeInfo {
                            partition_slice_count: allocated_slices,
                            slice_limit: 0,
                        };
                        responder.send(zx::sys::ZX_OK, Some(&manager), Some(&volume_info))?;
                    }
                    Err(e) => {
                        responder.send(e.into_raw(), None, None)?;
                    }
                }
            }
            VolumeAndNodeRequest::Extend { start_slice, slice_count, responder } => {
                // TODO(https://fxbug.dev/42171261): this is a hack. When extend is called, the extent is
                // expected to be set as allocated. The easiest way to do this is to just
                // write an extent of zeroed data. Another issue here is the size. The memory
                // allocated here should be bounded to what's available.
                let data = vec![0u8; (slice_count * DEVICE_VOLUME_SLICE_SIZE) as usize];
                match self
                    .file
                    .write_at_uncached(start_slice * DEVICE_VOLUME_SLICE_SIZE, data[..].into())
                    .await
                {
                    Ok(_) => responder.send(zx::sys::ZX_OK)?,
                    Err(status) => responder.send(status.into_raw())?,
                };
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::Shrink { start_slice: _, slice_count: _, responder } => {
                responder.send(zx::sys::ZX_OK)?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::Destroy { responder } => {
                responder.send(zx::sys::ZX_OK)?;
            }
            VolumeAndNodeRequest::Clone { flags: _, object, control_handle: _ } => {
                // Have to move this into a non-async function to avoid Rust compiler's
                // complaint about recursive async functions
                self.handle_clone_request(object.into_channel());
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::Reopen {
                rights_request: _,
                object_request: _,
                control_handle: _,
            } => {}
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::Close { responder } => {
                responder.send(Ok(()))?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::GetConnectionInfo { responder } => {
                // TODO(https://fxbug.dev/42157659): Fill in rights and available operations.
                let info = fio::ConnectionInfo::default();
                responder.send(info)?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::Sync { responder } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
            VolumeAndNodeRequest::GetAttr { responder } => {
                match self.file.get_attrs().await {
                    Ok(mut attrs) => {
                        // Present the file as a block device to callers.
                        attrs.mode = fio::MODE_TYPE_BLOCK_DEVICE;
                        responder.send(zx::sys::ZX_OK, &attrs)?;
                    }
                    Err(e) => {
                        // The protocol requires attributes even on failure;
                        // send an all-zero placeholder alongside the status.
                        let attrs = fio::NodeAttributes {
                            mode: 0,
                            id: fio::INO_UNKNOWN,
                            content_size: 0,
                            storage_size: 0,
                            link_count: 0,
                            creation_time: 0,
                            modification_time: 0,
                        };
                        responder.send(e.into_raw(), &attrs)?;
                    }
                };
            }
            // TODO(https://fxbug.dev/42171261) VolumeAndNodeRequest::GetAttributes { query, responder }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::SetAttr { flags: _, attributes: _, responder } => {
                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::GetAttributes { query: _, responder } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
            VolumeAndNodeRequest::UpdateAttributes { payload: _, responder } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
            VolumeAndNodeRequest::ListExtendedAttributes { iterator, .. } => {
                iterator.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
            }
            VolumeAndNodeRequest::GetExtendedAttribute { responder, .. } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
            VolumeAndNodeRequest::SetExtendedAttribute { responder, .. } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
            VolumeAndNodeRequest::RemoveExtendedAttribute { responder, .. } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::GetFlags { responder } => {
                responder.send(zx::sys::ZX_OK, fio::OpenFlags::NODE_REFERENCE)?;
            }
            // TODO(https://fxbug.dev/42171261)
            VolumeAndNodeRequest::SetFlags { flags: _, responder } => {
                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
            }
            // TODO(https://fxbug.dev/42056856)
            VolumeAndNodeRequest::Query { responder } => {
                responder.send(fio::NODE_PROTOCOL_NAME.as_bytes())?;
            }
            VolumeAndNodeRequest::QueryFilesystem { responder } => {
                match self.file.query_filesystem() {
                    Ok(info) => responder.send(zx::sys::ZX_OK, Some(&info))?,
                    Err(e) => responder.send(e.into_raw(), None)?,
                }
            }
            VolumeAndNodeRequest::ConnectToDeviceFidl { server, control_handle: _ } => {
                // This should serve *only* Volume, but it's a pain to write that without
                // duplication.
                //
                // TODO(https://fxbug.dev/42171261): make it so.
                // TODO(https://fxbug.dev/42054916): make it so.
                self.handle_clone_request(server);
            }
            VolumeAndNodeRequest::ConnectToController { server, control_handle: _ } => {
                // This should serve *only* Controller, but it's a pain to write that without
                // duplication.
                //
                // TODO(https://fxbug.dev/42171261): make it so.
                // TODO(https://fxbug.dev/42054916): make it so.
                self.handle_clone_request(server.into_channel());
            }
            VolumeAndNodeRequest::Bind { driver: _, responder } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
            VolumeAndNodeRequest::Rebind { driver: _, responder } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
            VolumeAndNodeRequest::UnbindChildren { responder } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
            VolumeAndNodeRequest::ScheduleUnbind { responder } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
            VolumeAndNodeRequest::GetTopologicalPath { responder } => {
                responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
            }
        }
        Ok(())
    }
| |
| async fn handle_requests( |
| &self, |
| server: fidl::endpoints::ServerEnd<VolumeAndNodeMarker>, |
| ) -> Result<(), Error> { |
| server |
| .into_stream()? |
| .map_err(|e| e.into()) |
| .try_for_each_concurrent(None, |request| self.handle_request(request)) |
| .await?; |
| Ok(()) |
| } |
| |
    /// Runs the server: serves the Volume/Node protocol on the channel passed
    /// to `new` and, concurrently, processes block requests arriving on the
    /// FIFO. Returns when both sides have finished (or on the first error).
    pub async fn run(&mut self) -> Result<(), Error> {
        let server = ServerEnd::<VolumeAndNodeMarker>::new(self.server_channel.take().unwrap());

        // Create a fifo pair
        let (server_fifo, client_fifo) =
            zx::Fifo::create(16, std::mem::size_of::<BlockFifoRequest>())?;
        // Stash the client end so the first GetFifo session request can
        // hand it out.
        self.maybe_server_fifo = Mutex::new(Some(client_fifo));

        // Handling requests from fifo
        let fifo_future = async {
            let fifo = fasync::Fifo::<BlockFifoRequest, BlockFifoResponse>::from_fifo(server_fifo);
            loop {
                match fifo.read_entry().await {
                    Ok(request) => {
                        if let Some(response) = self.handle_fifo_request(request).await {
                            fifo.write_entries(std::slice::from_ref(&response)).await?;
                        }
                        // if `self.handle_fifo_request(..)` returns None, then
                        // there's no reply for this request. This occurs for
                        // requests part of a group request where
                        // `BlockIoFlag::GROUP_LAST` flag is not set.
                    }
                    // PEER_CLOSED means the client hung up: a clean shutdown
                    // of the fifo side.
                    Err(zx::Status::PEER_CLOSED) => break Result::<_, Error>::Ok(()),
                    Err(e) => break Err(e.into()),
                }
            }
        };

        // Handling requests from fidl
        let channel_future = async {
            self.handle_requests(server).await?;
            // This is temporary for when client doesn't call for fifo
            self.maybe_server_fifo.lock().unwrap().take();
            Ok(())
        };

        try_join!(fifo_future, channel_future)?;
        Ok(())
    }
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use { |
| crate::fuchsia::testing::{open_file_checked, TestFixture}, |
| fidl::endpoints::{ClientEnd, ServerEnd}, |
| fidl_fuchsia_device::ControllerMarker, |
| fidl_fuchsia_hardware_block::BlockMarker, |
| fidl_fuchsia_hardware_block_volume::VolumeAndNodeMarker, |
| fidl_fuchsia_io as fio, |
| fs_management::{filesystem::Filesystem, Blobfs}, |
| fuchsia_merkle::MerkleTree, |
| fuchsia_zircon as zx, |
| futures::join, |
| remote_block_device::{BlockClient, RemoteBlockClient, VmoId}, |
| rustc_hash::FxHashSet as HashSet, |
| }; |
| |
    // Smoke test: formats and fscks blobfs on top of a BlockServer-backed
    // file, proving the server speaks enough of the block protocol for a real
    // filesystem client.
    #[fuchsia::test(threads = 10)]
    async fn test_block_server() {
        let fixture = TestFixture::new().await;
        let (client, server) = fidl::endpoints::create_proxy::<ControllerMarker>().unwrap();
        join!(
            async {
                let mut blobfs = Filesystem::new(client, Blobfs::default());
                blobfs.format().await.expect("format blobfs failed");
                blobfs.fsck().await.expect("fsck failed");
            },
            async {
                let root = fixture.root();

                // Create and size the backing file before re-opening it as a
                // block device.
                let file = open_file_checked(
                    &root,
                    fio::OpenFlags::CREATE
                        | fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::NOT_DIRECTORY,
                    "block_device",
                )
                .await;
                let () = file
                    .resize(2 * 1024 * 1024)
                    .await
                    .expect("resize failed")
                    .map_err(zx::Status::from_raw)
                    .expect("resize error");
                let () = file
                    .close()
                    .await
                    .expect("close failed")
                    .map_err(zx::Status::from_raw)
                    .expect("close error");

                root.open(
                    fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::BLOCK_DEVICE,
                    fio::ModeType::empty(),
                    "block_device",
                    server.into_channel().into(),
                )
                .expect("open failed");
            }
        );
        fixture.close().await;
    }
| |
    // Verifies that Clone produces independent connections: a write through
    // one clone is visible through another, even after the original
    // connection has been dropped.
    #[fuchsia::test(threads = 10)]
    async fn test_clone() {
        let fixture = TestFixture::new().await;
        let (client_channel, server_channel) = zx::Channel::create();
        let (client_channel_copy1, server_channel_copy1) = zx::Channel::create();
        let (client_channel_copy2, server_channel_copy2) = zx::Channel::create();

        join!(
            async {
                // Putting original block device in its own execution scope to test that the
                // clone will work independent of the original
                {
                    let original_block_device =
                        ClientEnd::<VolumeAndNodeMarker>::new(client_channel)
                            .into_proxy()
                            .expect("convert into proxy failed");
                    original_block_device
                        .clone(
                            fio::OpenFlags::CLONE_SAME_RIGHTS,
                            ServerEnd::new(server_channel_copy1),
                        )
                        .expect("clone failed");
                    original_block_device
                        .clone(
                            fio::OpenFlags::CLONE_SAME_RIGHTS,
                            ServerEnd::new(server_channel_copy2),
                        )
                        .expect("clone failed");
                }

                let block_device_cloned1 = RemoteBlockClient::new(
                    ClientEnd::<BlockMarker>::new(client_channel_copy1)
                        .into_proxy()
                        .expect("create proxy"),
                )
                .await
                .expect("create new RemoteBlockClient failed");
                let block_device_cloned2 = RemoteBlockClient::new(
                    ClientEnd::<BlockMarker>::new(client_channel_copy2)
                        .into_proxy()
                        .expect("create proxy"),
                )
                .await
                .expect("create new RemoteBlockClient failed");

                let offset = block_device_cloned1.block_size() as usize;
                let len = block_device_cloned1.block_size() as usize;
                // Must write with length as a multiple of the block_size
                let write_buf = vec![0xa3u8; len];
                // Write to "foo" via block_device_cloned1
                block_device_cloned1
                    .write_at(write_buf[..].into(), offset as u64)
                    .await
                    .expect("write_at failed");
                // Read the same range back via the second clone and compare.
                let mut read_buf = vec![0u8; len];
                block_device_cloned2
                    .read_at(read_buf.as_mut_slice().into(), offset as u64)
                    .await
                    .expect("read_at failed");
                assert_eq!(&read_buf, &write_buf);
            },
            async {
                let root = fixture.root();
                root.open(
                    fio::OpenFlags::CREATE
                        | fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::BLOCK_DEVICE,
                    fio::ModeType::empty(),
                    "foo",
                    ServerEnd::new(server_channel),
                )
                .expect("open failed");
            }
        );
        fixture.close().await;
    }
| |
    // Verifies that repeatedly attaching the same VMO yields a distinct vmoid
    // each time (ids are never reused while still attached).
    #[fuchsia::test(threads = 10)]
    async fn test_attach_vmo() {
        let fixture = TestFixture::new().await;
        let (client_channel, server_channel) = zx::Channel::create();
        join!(
            async {
                let remote_block_device = RemoteBlockClient::new(
                    ClientEnd::<BlockMarker>::new(client_channel)
                        .into_proxy()
                        .expect("create proxy"),
                )
                .await
                .expect("RemoteBlockClient::new failed");
                let mut vmo_set = HashSet::default();
                let vmo = zx::Vmo::create(1).expect("Vmo::create failed");
                for _ in 1..5 {
                    match remote_block_device.attach_vmo(&vmo).await {
                        Ok(vmo_id) => {
                            // TODO(https://fxbug.dev/42171261): need to detach vmoid. into_id() is a
                            // temporary solution. Remove this after detaching vmo has been
                            // implemented
                            // Make sure that vmo_id is unique
                            assert_eq!(vmo_set.insert(vmo_id.into_id()), true);
                        }
                        Err(e) => panic!("unexpected error {:?}", e),
                    }
                }
            },
            async {
                let root = fixture.root();
                root.open(
                    fio::OpenFlags::CREATE
                        | fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::BLOCK_DEVICE,
                    fio::ModeType::empty(),
                    "foo",
                    ServerEnd::new(server_channel),
                )
                .expect("open failed");
            }
        );
        fixture.close().await;
    }
| |
    // Verifies that detaching an attached VMO succeeds once, and a second
    // detach of the same vmoid fails (the id is gone from the server).
    #[fuchsia::test(threads = 10)]
    async fn test_detach_vmo() {
        let fixture = TestFixture::new().await;
        let (client_channel, server_channel) = zx::Channel::create();
        join!(
            async {
                let remote_block_device = RemoteBlockClient::new(
                    ClientEnd::<BlockMarker>::new(client_channel)
                        .into_proxy()
                        .expect("create proxy"),
                )
                .await
                .expect("RemoteBlockClient::new failed");
                let vmo = zx::Vmo::create(1).expect("Vmo::create failed");
                let vmo_id = remote_block_device.attach_vmo(&vmo).await.expect("attach_vmo failed");
                // Keep a copy of the raw id so we can attempt a double detach.
                let vmo_id_copy = VmoId::new(vmo_id.id());
                remote_block_device.detach_vmo(vmo_id).await.expect("detach failed");
                remote_block_device.detach_vmo(vmo_id_copy).await.expect_err("detach succeeded");
            },
            async {
                let root = fixture.root();
                root.open(
                    fio::OpenFlags::CREATE
                        | fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::BLOCK_DEVICE,
                    fio::ModeType::empty(),
                    "foo",
                    ServerEnd::new(server_channel),
                )
                .expect("open failed");
            }
        );
        fixture.close().await;
    }
| |
    // Writes one block and reads back a surrounding three-block window,
    // checking the written block round-trips and the neighbouring blocks read
    // as zeros.
    #[fuchsia::test(threads = 10)]
    async fn test_read_write_files() {
        let fixture = TestFixture::new().await;
        let (client_channel, server_channel) = zx::Channel::create();
        join!(
            async {
                let remote_block_device = RemoteBlockClient::new(
                    ClientEnd::<BlockMarker>::new(client_channel)
                        .into_proxy()
                        .expect("create proxy"),
                )
                .await
                .expect("RemoteBlockClient::new failed");
                let vmo = zx::Vmo::create(131072).expect("create vmo failed");
                let vmo_id = remote_block_device.attach_vmo(&vmo).await.expect("attach_vmo failed");

                // Must write with length as a multiple of the block_size
                let offset = remote_block_device.block_size() as usize;
                let len = remote_block_device.block_size() as usize;
                let write_buf = vec![0xa3u8; len];
                remote_block_device
                    .write_at(write_buf[..].into(), offset as u64)
                    .await
                    .expect("write_at failed");

                // Read back an extra block either side
                let mut read_buf = vec![0u8; len + 2 * remote_block_device.block_size() as usize];
                remote_block_device
                    .read_at(read_buf.as_mut_slice().into(), 0)
                    .await
                    .expect("read_at failed");

                // We expect the extra block on the LHS of the read_buf to be 0
                assert_eq!(&read_buf[..offset], &vec![0; offset][..]);

                assert_eq!(&read_buf[offset..offset + len], &write_buf);

                // We expect the extra block on the RHS of the read_buf to be 0
                assert_eq!(
                    &read_buf[offset + len..],
                    &vec![0; remote_block_device.block_size() as usize][..]
                );

                remote_block_device.detach_vmo(vmo_id).await.expect("detach failed");
            },
            async {
                let root = fixture.root();
                root.open(
                    fio::OpenFlags::CREATE
                        | fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::BLOCK_DEVICE,
                    fio::ModeType::empty(),
                    "foo",
                    ServerEnd::new(server_channel),
                )
                .expect("open failed");
            }
        );
        fixture.close().await;
    }
| |
    // Verifies that a Flush FIFO request completes successfully (the server
    // currently acknowledges flush without additional work — see the TODO in
    // `process_fifo_request`).
    #[fuchsia::test(threads = 10)]
    async fn test_flush_is_called() {
        let fixture = TestFixture::new().await;
        let (client_channel, server_channel) = zx::Channel::create();
        join!(
            async {
                let remote_block_device = RemoteBlockClient::new(
                    ClientEnd::<BlockMarker>::new(client_channel)
                        .into_proxy()
                        .expect("create proxy"),
                )
                .await
                .expect("RemoteBlockClient::new failed");
                remote_block_device.flush().await.expect("flush failed");
            },
            async {
                let root = fixture.root();
                root.open(
                    fio::OpenFlags::CREATE
                        | fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::BLOCK_DEVICE,
                    fio::ModeType::empty(),
                    "foo",
                    ServerEnd::new(server_channel),
                )
                .expect("open failed");
            }
        );
        fixture.close().await;
    }
| |
    // Verifies GetAttr reports the node as a block device
    // (MODE_TYPE_BLOCK_DEVICE), which `handle_request` overrides on the
    // file's attributes.
    #[fuchsia::test(threads = 10)]
    async fn test_getattr() {
        let fixture = TestFixture::new().await;
        let (client_channel, server_channel) = zx::Channel::create();

        join!(
            async {
                let original_block_device = ClientEnd::<VolumeAndNodeMarker>::new(client_channel)
                    .into_proxy()
                    .expect("convert into proxy failed");
                let (status, attr) =
                    original_block_device.get_attr().await.expect("get_attr failed");
                zx::Status::ok(status).expect("block get_attr failed");
                assert_eq!(attr.mode, fio::MODE_TYPE_BLOCK_DEVICE);
            },
            async {
                let root = fixture.root();
                root.open(
                    fio::OpenFlags::CREATE
                        | fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::BLOCK_DEVICE,
                    fio::ModeType::empty(),
                    "foo",
                    ServerEnd::new(server_channel),
                )
                .expect("open failed");
            }
        );
        fixture.close().await;
    }
| |
    // Verifies GetInfo: block_count * block_size must equal the size the
    // backing file was resized to.
    #[fuchsia::test(threads = 10)]
    async fn test_get_info() {
        let fixture = TestFixture::new().await;
        let (client_channel, server_channel) = zx::Channel::create();
        let file_size = 2 * 1024 * 1024;
        join!(
            async {
                let original_block_device = ClientEnd::<VolumeAndNodeMarker>::new(client_channel)
                    .into_proxy()
                    .expect("convert into proxy failed");
                let info = original_block_device
                    .get_info()
                    .await
                    .expect("get_info failed")
                    .map_err(zx::Status::from_raw)
                    .expect("block get_info failed");
                assert_eq!(info.block_count * u64::from(info.block_size), file_size);
            },
            async {
                let root = fixture.root();
                // Create and size the backing file, then re-open it as a
                // block device.
                let file = open_file_checked(
                    &root,
                    fio::OpenFlags::CREATE
                        | fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::NOT_DIRECTORY,
                    "block_device",
                )
                .await;
                let () = file
                    .resize(file_size)
                    .await
                    .expect("resize failed")
                    .map_err(zx::Status::from_raw)
                    .expect("resize error");
                let () = file
                    .close()
                    .await
                    .expect("close failed")
                    .map_err(zx::Status::from_raw)
                    .expect("close error");

                root.open(
                    fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::BLOCK_DEVICE,
                    fio::ModeType::empty(),
                    "block_device",
                    ServerEnd::new(server_channel),
                )
                .expect("open failed");
            }
        );
        fixture.close().await;
    }
| |
    // End-to-end test: runs a full blobfs lifecycle (format, mount, write a
    // blob, unmount, remount, read it back) on top of the block server.
    #[fuchsia::test(threads = 10)]
    async fn test_blobfs() {
        let fixture = TestFixture::new().await;
        let (client, server) = fidl::endpoints::create_proxy::<ControllerMarker>().unwrap();
        join!(
            async {
                let mut blobfs = Filesystem::new(client, Blobfs::default());
                blobfs.format().await.expect("format blobfs failed");
                blobfs.fsck().await.expect("fsck failed");
                // Mount blobfs
                let serving = blobfs.serve().await.expect("serve blobfs failed");

                // Blobfs names files by the merkle root of their contents.
                let content = String::from("Hello world!").into_bytes();
                let merkle_root_hash =
                    MerkleTree::from_reader(&content[..]).unwrap().root().to_string();
                {
                    let file = fuchsia_fs::directory::open_file(
                        serving.root(),
                        &merkle_root_hash,
                        fio::OpenFlags::CREATE | fio::OpenFlags::RIGHT_WRITABLE,
                    )
                    .await
                    .expect("open file failed");
                    let () = file
                        .resize(content.len() as u64)
                        .await
                        .expect("resize failed")
                        .map_err(zx::Status::from_raw)
                        .expect("resize error");
                    let _: u64 = file
                        .write(&content)
                        .await
                        .expect("write to file failed")
                        .map_err(zx::Status::from_raw)
                        .expect("write to file error");
                }
                // Check that blobfs can be successfully unmounted
                serving.shutdown().await.expect("shutdown blobfs failed");

                // Remount and verify the blob persisted across the unmount.
                let serving = blobfs.serve().await.expect("serve blobfs failed");
                {
                    let file = fuchsia_fs::directory::open_file(
                        serving.root(),
                        &merkle_root_hash,
                        fio::OpenFlags::RIGHT_READABLE,
                    )
                    .await
                    .expect("open file failed");
                    let read_content =
                        fuchsia_fs::file::read(&file).await.expect("read from file failed");
                    assert_eq!(content, read_content);
                }

                serving.shutdown().await.expect("shutdown blobfs failed");
            },
            async {
                let root = fixture.root();

                // Create and size the backing file, then re-open it as a
                // block device.
                let file = open_file_checked(
                    &root,
                    fio::OpenFlags::CREATE
                        | fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::NOT_DIRECTORY,
                    "block_device",
                )
                .await;
                let () = file
                    .resize(5 * 1024 * 1024)
                    .await
                    .expect("resize failed")
                    .map_err(zx::Status::from_raw)
                    .expect("resize error");
                let () = file
                    .close()
                    .await
                    .expect("close failed")
                    .map_err(zx::Status::from_raw)
                    .expect("close error");

                root.open(
                    fio::OpenFlags::RIGHT_READABLE
                        | fio::OpenFlags::RIGHT_WRITABLE
                        | fio::OpenFlags::BLOCK_DEVICE,
                    fio::ModeType::empty(),
                    "block_device",
                    server.into_channel().into(),
                )
                .expect("open failed");
            }
        );
        fixture.close().await;
    }
| } |