// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
crate::fuchsia::{
fxblob::{blob::FxBlob, BlobDirectory},
pager::PagerBacked,
volume::FxVolume,
},
anyhow::Error,
arrayref::array_refs,
event_listener::EventListener,
fuchsia_async as fasync,
fuchsia_hash::Hash,
fuchsia_zircon as zx,
fxfs::{
drop_event::DropEvent,
errors::FxfsError,
log::*,
object_handle::{ReadObjectHandle, WriteObjectHandle},
object_store::ObjectDescriptor,
},
std::{
collections::btree_map::{BTreeMap, Entry},
mem::size_of,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
},
};
const REPLAY_THREADS: usize = 2;
/// Paired with a ProfileState that is currently recording, takes messages to be written into the
/// current profile. This should be dropped before the recording is stopped to ensure that all
/// messages have been flushed to the writer.
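///
/// # Example
///
/// A minimal sketch, assuming a `ProfileState` (`state`), a `WriteObjectHandle` (`handle`), and
/// a blob `hash` and page `offset` are already in scope:
/// ```ignore
/// let mut recorder = state.record_new(handle);
/// recorder.record(hash, offset)?; // Queue a page-in event for `hash` at `offset`.
/// drop(recorder); // Flush any partially filled buffer to the writer task.
/// state.stop_profiler(); // Block until the writer task has finished.
/// ```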
pub struct Recorder {
sender: async_channel::Sender<Vec<u8>>,
buffer: Vec<u8>,
offset: usize,
}
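/// A single profile entry: the blob's merkle root `Hash` followed by the page offset encoded as
/// a little-endian `u64`. Profile blocks are zero-initialized, so a partially filled block ends
/// in all-zero entries (see `is_zeroes`).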
struct Message {
hash: Hash,
// Don't bother with offset+length. The kernel is going to split it up and align it one way, and
// then we're going to change it all with read-ahead/read-around.
offset: u64,
}
impl Message {
fn encode_to(hash: &Hash, offset: u64, dest: &mut [u8]) {
dest[..size_of::<Hash>()].clone_from_slice(hash.as_bytes());
dest[size_of::<Hash>()..].clone_from_slice(offset.to_le_bytes().as_ref());
}
fn decode_from(src: &[u8; size_of::<Message>()]) -> Self {
let (first, second) = array_refs!(src, size_of::<Hash>(), size_of::<u64>());
Self { hash: Hash::from_array(*first), offset: u64::from_le_bytes(*second) }
}
fn is_zeroes(&self) -> bool {
self.hash == Hash::from_array([0u8; size_of::<Hash>()]) && self.offset == 0
}
}
impl Recorder {
/// Record a blob page-in request for the given hash and offset.
pub fn record(&mut self, hash: Hash, offset: u64) -> Result<(), Error> {
// Encode `Message` into the buffer.
let offset_next = self.offset + size_of::<Message>();
Message::encode_to(
&hash,
offset,
&mut self.buffer.as_mut_slice()[self.offset..offset_next],
);
self.offset = offset_next;
if self.offset + size_of::<Message>() > self.buffer.len() {
// Use try_send to avoid awaiting; the channel is unbounded anyway.
let len = self.buffer.len();
self.sender.try_send(std::mem::replace(&mut self.buffer, vec![0u8; len]))?;
self.offset = 0;
}
Ok(())
}
}
impl Drop for Recorder {
fn drop(&mut self) {
// The Recorder should be dropped to flush entries first; if the channel is closed then we've
// already shut down the receiver.
debug_assert!(!self.sender.is_closed());
// Best-effort: send whatever messages have already been buffered.
if self.offset > 0 {
let buffer = std::mem::take(&mut self.buffer);
let _ = self.sender.try_send(buffer);
}
}
}
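/// A single page-in request for a replay thread to service.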
struct Request {
file: Arc<FxBlob>,
offset: u64,
}
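/// Tracks an in-flight recording: closing `sender_clone` stops the writer task, and
/// `stopped_listener` resolves once that task has dropped its `DropEvent`.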
struct RecordingState {
stopped_listener: EventListener,
sender_clone: async_channel::Sender<Vec<u8>>,
}
struct ReplayState {
// Using a DropEvent to trigger task shutdown if this goes out of scope.
stop_on_drop: DropEvent,
stopped_listener: EventListener,
}
enum State {
None,
Recording(RecordingState),
Replaying(ReplayState),
}
/// Holds the current profile recording or replay state, and provides methods for state transitions.
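///
/// Typical lifecycle (sketch): `record_new()` starts capturing page-in requests and
/// `stop_profiler()` flushes and closes the recording; later, `replay_profile()` replays a
/// recorded profile and `stop_profiler()` tears the replay down once warm-up is done.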
pub struct ProfileState(State);
impl ProfileState {
pub fn new() -> Self {
Self(State::None)
}
/// Returns true if currently recording or replaying.
pub fn busy(&self) -> bool {
!matches!(self.0, State::None)
}
/// Creates a new recording and returns the `Recorder` object to record to. Shutdown of the
/// recording begins when the associated `Recorder` is dropped and completes when
/// `stop_profiler()` is called.
pub fn record_new<T: WriteObjectHandle>(&mut self, handle: T) -> Recorder {
self.stop_profiler();
let finished_event = DropEvent::new();
let (sender, receiver) = async_channel::unbounded::<Vec<u8>>();
let sender_clone = sender.clone();
let stopped_listener = (*finished_event).listen();
let buffer = vec![0u8; handle.block_size() as usize];
fasync::Task::spawn(async move {
// DropEvent should fire when this task is done.
let _finished_event = finished_event;
while let Ok(buffer) = receiver.recv().await {
let mut io_buf = handle.allocate_buffer(buffer.len()).await;
// While I'd prefer to avoid this memcpy, making the io Buffer `Send` would be very messy.
io_buf.as_mut_slice().clone_from_slice(buffer.as_slice());
if let Err(e) = handle.write_or_append(None, io_buf.as_ref()).await {
error!("Failed to write profile block: {:?}", e);
break;
}
}
})
.detach();
self.0 = State::Recording(RecordingState { stopped_listener, sender_clone });
Recorder { sender, buffer, offset: 0 }
}
/// Stop in-flight recording and replaying. This method will block until the resources have been
/// cleaned up.
pub fn stop_profiler(&mut self) {
let old = std::mem::replace(&mut self.0, State::None);
match old {
State::Recording(recording) => {
// This closes the channel for *all* senders, which starts the shutdown.
recording.sender_clone.close();
recording.stopped_listener.wait();
}
State::Replaying(replaying) => {
let ReplayState { stop_on_drop, stopped_listener } = replaying;
std::mem::drop(stop_on_drop);
stopped_listener.wait();
}
State::None => {}
}
}
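/// Worker loop for a replay thread: pulls page-in requests off the channel and commits a single
/// page of the blob's VMO, which faults the data in through the pager.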
fn page_in_thread(queue: async_channel::Receiver<Request>, stop_processing: Arc<AtomicBool>) {
while let Ok(request) = queue.recv_blocking() {
let _ = request.file.vmo().op_range(
zx::VmoOp::COMMIT,
request.offset,
zx::system_get_page_size() as u64,
);
if stop_processing.load(Ordering::Relaxed) {
break;
}
}
}
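/// Reads the profile from `handle`, decodes each `Message`, resolves each hash to an open blob
/// via the volume's root `BlobDirectory` (memoizing lookups in `local_cache`), and queues a
/// page-in `Request` for the replay threads.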
async fn read_and_queue<T: ReadObjectHandle>(
handle: T,
volume: Arc<FxVolume>,
sender: &async_channel::Sender<Request>,
local_cache: &mut BTreeMap<Hash, Arc<FxBlob>>,
) -> Result<(), Error> {
let root_dir = volume
.get_or_load_node(
volume.store().root_directory_object_id(),
ObjectDescriptor::Directory,
None,
)
.await?
.into_any()
.downcast::<BlobDirectory>()
.map_err(|_| FxfsError::Inconsistent)?;
let mut io_buf = handle.allocate_buffer(handle.block_size() as usize).await;
let file_size = handle.get_size() as usize;
let mut offset = 0;
while offset < file_size {
let actual = handle
.read(offset as u64, io_buf.as_mut())
.await
.map_err(|e| e.context(format!("Failed to read at offset: {}", offset)))?;
if actual != io_buf.len() {
// This is unexpected due to how it's written, but recoverable.
warn!("Partial profile read at {} of length {}", offset, actual);
}
offset += actual;
let mut local_offset = 0;
for next_offset in std::ops::RangeInclusive::new(size_of::<Message>(), actual)
.step_by(size_of::<Message>())
{
let msg = Message::decode_from(
io_buf.as_slice()[local_offset..next_offset].try_into().unwrap(),
);
local_offset = next_offset;
// Ignore trailing zeroes. This is technically a valid entry but extremely unlikely
// and will only break an optimization.
if msg.is_zeroes() {
break;
}
let file = match local_cache.entry(msg.hash) {
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => match root_dir.lookup_blob(msg.hash).await {
Err(e) => {
warn!("Failed to open object {} from profile: {:?}", msg.hash, e);
continue;
}
Ok(file) => {
entry.insert(file.clone());
file
}
},
};
sender.send(Request { file, offset: msg.offset }).await.unwrap();
}
}
Ok(())
}
/// Reads the given handle to parse a profile and replays it by requesting pages via
/// ZX_VMO_OP_COMMIT on blocking background threads.
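///
/// # Example
///
/// A minimal sketch, assuming `handle` reads back a previously recorded profile and `volume` is
/// the `Arc<FxVolume>` it was recorded against:
/// ```ignore
/// state.replay_profile(handle, volume.clone());
/// // ... once enough data has been paged in (or at shutdown):
/// state.stop_profiler();
/// ```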
pub fn replay_profile<T: ReadObjectHandle>(&mut self, handle: T, volume: Arc<FxVolume>) {
self.stop_profiler();
let stop_on_drop = DropEvent::new();
let (sender, receiver) = async_channel::unbounded::<Request>();
let shutting_down = Arc::new(AtomicBool::new(false));
// Create an unbounded async_channel; an async task reads the profile and populates the channel,
// then N threads consume it and touch pages.
let mut replay_threads = Vec::with_capacity(REPLAY_THREADS);
for _ in 0..REPLAY_THREADS {
let stop_processing = shutting_down.clone();
let queue = receiver.clone();
replay_threads.push(std::thread::spawn(move || {
Self::page_in_thread(queue, stop_processing);
}));
}
let stopped_event = DropEvent::new();
let stopped_listener = stopped_event.listen();
let start_shutdown = stop_on_drop.listen();
fasync::Task::spawn(async move {
// Move the drop event into the task so it fires when the task completes.
let _stopped_event = stopped_event;
// Hold the items in cache until replay is stopped.
let mut local_cache: BTreeMap<Hash, Arc<FxBlob>> = BTreeMap::new();
if let Err(e) = Self::read_and_queue(handle, volume, &sender, &mut local_cache).await {
error!("Failed to read back profile: {:?}", e);
}
sender.close();
start_shutdown.await;
shutting_down.store(true, Ordering::Relaxed);
for thread in replay_threads {
let _ = thread.join();
}
})
.detach();
self.0 = State::Replaying(ReplayState { stop_on_drop, stopped_listener });
}
}
impl Drop for ProfileState {
fn drop(&mut self) {
self.stop_profiler();
}
}
#[cfg(test)]
mod tests {
use {
super::{Message, ProfileState, Request},
crate::fuchsia::{
fxblob::{
blob::FxBlob,
testing::{self as testing, BlobFixture},
BlobDirectory,
},
pager::PagerBacked,
},
anyhow::Error,
async_trait::async_trait,
delivery_blob::CompressionMode,
event_listener::{Event, EventListener},
fuchsia_async as fasync,
fuchsia_hash::Hash,
fxfs::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle},
std::{
collections::BTreeMap,
mem::size_of,
sync::{Arc, Mutex},
time::Duration,
},
storage_device::{
buffer::{BufferRef, MutableBufferRef},
buffer_allocator::{BufferAllocator, BufferFuture, BufferSource},
},
};
struct FakeReaderWriterInner {
data: Vec<u8>,
delays: Vec<EventListener>,
}
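/// In-memory stand-in for a profile file: writes append to `data`, reads copy back out of it,
/// and queued `delays` let a test stall individual operations to exercise shutdown ordering.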
struct FakeReaderWriter {
allocator: BufferAllocator,
inner: Arc<Mutex<FakeReaderWriterInner>>,
}
const BLOCK_SIZE: usize = 4096;
impl FakeReaderWriter {
fn new() -> Self {
Self {
allocator: BufferAllocator::new(BLOCK_SIZE, BufferSource::new(BLOCK_SIZE * 8)),
inner: Arc::new(Mutex::new(FakeReaderWriterInner {
data: Vec::new(),
delays: Vec::new(),
})),
}
}
fn push_delay(&self, delay: EventListener) {
self.inner.lock().unwrap().delays.insert(0, delay);
}
}
impl ObjectHandle for FakeReaderWriter {
fn object_id(&self) -> u64 {
0
}
fn block_size(&self) -> u64 {
self.allocator.block_size() as u64
}
fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
self.allocator.allocate_buffer(size)
}
}
impl WriteObjectHandle for FakeReaderWriter {
async fn write_or_append(
&self,
offset: Option<u64>,
buf: BufferRef<'_>,
) -> Result<u64, Error> {
// We only append for now.
assert!(offset.is_none());
let delay = self.inner.lock().unwrap().delays.pop();
if let Some(delay) = delay {
delay.await;
}
// This relocking has a TOCTOU flavour, but it shouldn't matter for this application.
self.inner.lock().unwrap().data.extend_from_slice(buf.as_slice());
Ok(buf.len() as u64)
}
async fn truncate(&self, _size: u64) -> Result<(), Error> {
unreachable!();
}
async fn flush(&self) -> Result<(), Error> {
unreachable!();
}
}
#[async_trait]
impl ReadObjectHandle for FakeReaderWriter {
async fn read(&self, offset: u64, mut buf: MutableBufferRef<'_>) -> Result<usize, Error> {
let delay = self.inner.lock().unwrap().delays.pop();
if let Some(delay) = delay {
delay.await;
}
// This relocking has a TOCTOU flavour, but it shouldn't matter for this application.
let inner = self.inner.lock().unwrap();
assert!(offset as usize <= inner.data.len());
let offset_end = std::cmp::min(offset as usize + buf.len(), inner.data.len());
let size = offset_end - offset as usize;
buf.as_mut_slice()[..size].clone_from_slice(&inner.data[offset as usize..offset_end]);
Ok(size)
}
fn get_size(&self) -> u64 {
self.inner.lock().unwrap().data.len() as u64
}
}
#[fuchsia::test(threads = 10)]
async fn test_recording_basic() {
let mut state = ProfileState::new();
let handle = FakeReaderWriter::new();
let inner = handle.inner.clone();
assert_eq!(inner.lock().unwrap().data.len(), 0);
{
// Drop recorder when finished writing to flush data.
let mut recorder = state.record_new(handle);
recorder.record(Hash::from_array([88u8; size_of::<Hash>()]), 0).unwrap();
}
state.stop_profiler();
assert_eq!(inner.lock().unwrap().data.len(), BLOCK_SIZE);
}
#[fuchsia::test(threads = 10)]
async fn test_recording_more_than_block() {
let mut state = ProfileState::new();
let handle = FakeReaderWriter::new();
let inner = handle.inner.clone();
let fixture = testing::new_blob_fixture().await;
assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
let message_count = (fixture.fs().block_size() as usize / size_of::<Message>()) + 1;
assert_eq!(inner.lock().unwrap().data.len(), 0);
let hash;
{
hash = fixture.write_blob(&[88u8], CompressionMode::Never).await;
// Drop recorder when finished writing to flush data.
let mut recorder = state.record_new(handle);
for _ in 0..message_count {
recorder.record(hash, 4096).unwrap();
}
}
state.stop_profiler();
{
let data = &inner.lock().unwrap().data;
assert_eq!(data.len(), BLOCK_SIZE * 2);
}
let mut result = FakeReaderWriter::new();
result.inner = inner.clone();
let mut local_cache: BTreeMap<Hash, Arc<FxBlob>> = BTreeMap::new();
let (sender, receiver) = async_channel::unbounded::<Request>();
let volume = fixture.volume().volume().clone();
let task = fasync::Task::spawn(async move {
ProfileState::read_and_queue(result, volume, &sender, &mut local_cache).await.unwrap();
});
let mut recv_count = 0;
while let Ok(msg) = receiver.recv().await {
recv_count += 1;
assert_eq!(msg.file.root(), hash);
assert_eq!(msg.offset, 4096);
}
task.await;
assert_eq!(recv_count, message_count);
fixture.close().await;
}
#[fuchsia::test(threads = 10)]
async fn test_replay_profile() {
// Create all the files that we need first, then restart the filesystem to clear cache.
let mut state = ProfileState::new();
let handle = FakeReaderWriter::new();
let data_original = handle.inner.clone();
let mut hashes = Vec::new();
let device = {
let fixture = testing::new_blob_fixture().await;
assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
let message_count = (fixture.fs().block_size() as usize / size_of::<Message>()) + 1;
let mut recorder = state.record_new(handle);
// Page in the zero offsets only to avoid readahead strangeness.
for i in 0..message_count {
let hash =
fixture.write_blob(i.to_le_bytes().as_slice(), CompressionMode::Never).await;
hashes.push(hash);
recorder.record(hash, 0).unwrap();
}
fixture.close().await
};
device.ensure_unique();
state.stop_profiler();
device.reopen(false);
let fixture = testing::open_blob_fixture(device).await;
{
// Need the root BlobDirectory so we can look up blobs and check their VMOs' committed bytes.
let dir = fixture
.volume()
.root()
.clone()
.into_any()
.downcast::<BlobDirectory>()
.expect("Root should be BlobDirectory");
// Ensure that nothing is paged in right now.
for hash in &hashes {
let blob = dir.lookup_blob(*hash).await.expect("Opening blob");
assert_eq!(blob.vmo().info().unwrap().committed_bytes, 0);
}
let mut handle = FakeReaderWriter::new();
// New handle with the recorded data gets replayed on the volume.
handle.inner = data_original;
state.replay_profile(handle, fixture.volume().volume().clone());
// Await all data being played back by checking that things have paged in.
for hash in &hashes {
let blob = dir.lookup_blob(*hash).await.expect("Opening blob");
while blob.vmo().info().unwrap().committed_bytes == 0 {
fasync::Timer::new(Duration::from_millis(25)).await;
}
// The replay task shouldn't increment the blob's open count. This ensures that we
// can still delete blobs while we are replaying a profile.
assert!(blob.mark_to_be_purged());
}
state.stop_profiler();
}
fixture.close().await;
}
// Doesn't ensure that anything reads back properly, just that everything shuts down when
// stopped early.
#[fuchsia::test(threads = 10)]
async fn test_replay_profile_stop_early() {
// Create all the files that we need first, then restart the filesystem to clear cache.
let mut state = ProfileState::new();
let fixture = testing::new_blob_fixture().await;
let recording_handle = FakeReaderWriter::new();
let inner = recording_handle.inner.clone();
// Queue up more than 2 pages. Doesn't matter that they're all the same.
{
let hash = fixture.write_blob(&[0, 1, 2, 3], CompressionMode::Never).await;
let mut recorder = state.record_new(recording_handle);
assert_eq!(BLOCK_SIZE as u64, fixture.fs().block_size());
let message_count = (fixture.fs().block_size() as usize / size_of::<Message>()) * 2 + 1;
for _ in 0..message_count {
recorder.record(hash, 0).unwrap();
}
}
state.stop_profiler();
let mut replay_handle = FakeReaderWriter::new();
replay_handle.inner = inner.clone();
let delay1 = Event::new();
replay_handle.push_delay(delay1.listen());
let delay2 = Event::new();
replay_handle.push_delay(delay2.listen());
// Don't let delay1 hold us up. Only wait on the second read.
delay1.notify(usize::MAX);
state.replay_profile(replay_handle, fixture.volume().volume().clone());
// Let it make some small amount of progress.
fasync::Timer::new(Duration::from_millis(5)).await;
fasync::Task::spawn(async move {
// Let the profiler wait on this a little.
fasync::Timer::new(Duration::from_millis(5)).await;
delay2.notify(usize::MAX);
})
.detach();
// Should block on the second read for a bit, then cleanly shut down.
state.stop_profiler();
fixture.close().await;
}
}