| // Copyright 2019 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "src/storage/blobfs/pager/user-pager.h" |
| |
| #include <fuchsia/scheduler/cpp/fidl.h> |
| #include <lib/fdio/directory.h> |
| #include <lib/fit/defer.h> |
| #include <lib/fzl/owned-vmo-mapper.h> |
| #include <lib/syslog/cpp/macros.h> |
| #include <lib/zx/thread.h> |
| #include <limits.h> |
| #include <zircon/status.h> |
| #include <zircon/threads.h> |
| |
| #include <algorithm> |
| #include <memory> |
| |
| #include <fbl/auto_call.h> |
| #include <fs/trace.h> |
| |
| #include "src/storage/blobfs/format.h" |
| #include "src/storage/blobfs/metrics.h" |
| #include "src/storage/lib/watchdog/include/lib/watchdog/operations.h" |
| |
| namespace blobfs { |
| namespace pager { |
| |
| UserPager::UserPager(size_t decompression_buffer_size, BlobfsMetrics* metrics) |
| : decompression_buffer_size_(decompression_buffer_size), metrics_(metrics) {} |
| |
| zx::status<std::unique_ptr<UserPager>> UserPager::Create( |
| std::unique_ptr<TransferBuffer> uncompressed_buffer, |
| std::unique_ptr<TransferBuffer> compressed_buffer, size_t decompression_buffer_size, |
| BlobfsMetrics* metrics, bool sandbox_decompression) { |
| ZX_DEBUG_ASSERT(metrics != nullptr && uncompressed_buffer != nullptr && |
| uncompressed_buffer->vmo().is_valid() && compressed_buffer != nullptr && |
| compressed_buffer->vmo().is_valid()); |
| |
| if (uncompressed_buffer->size() % kBlobfsBlockSize || |
| compressed_buffer->size() % kBlobfsBlockSize || |
| decompression_buffer_size % kBlobfsBlockSize) { |
| return zx::error(ZX_ERR_INVALID_ARGS); |
| } |
| if (compressed_buffer->size() < decompression_buffer_size) { |
| return zx::error(ZX_ERR_INVALID_ARGS); |
| } |
| |
| TRACE_DURATION("blobfs", "UserPager::Create"); |
| |
| auto pager = std::unique_ptr<UserPager>(new UserPager(decompression_buffer_size, metrics)); |
| pager->uncompressed_transfer_buffer_ = std::move(uncompressed_buffer); |
| pager->compressed_transfer_buffer_ = std::move(compressed_buffer); |
| |
| zx_status_t status = |
| pager->compressed_mapper_.Map(pager->compressed_transfer_buffer_->vmo(), 0, |
| pager->compressed_transfer_buffer_->size(), ZX_VM_PERM_READ); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "Failed to map the compressed TransferBuffer: " |
| << zx_status_get_string(status); |
| return zx::error(status); |
| } |
| |
| status = zx::vmo::create(pager->decompression_buffer_size_, 0, &pager->decompression_buffer_); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "Failed to create decompression buffer: " << zx_status_get_string(status); |
| return zx::error(status); |
| } |
| |
| if (sandbox_decompression) { |
| status = zx::vmo::create(kDecompressionBufferSize, 0, &pager->sandbox_buffer_); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "blobfs: Failed to create sandbox buffer: %s\n" |
| << zx_status_get_string(status); |
| return zx::error(status); |
| } |
| |
| auto client_or = ExternalDecompressorClient::Create(pager->sandbox_buffer_, |
| pager->compressed_transfer_buffer_->vmo()); |
| if (!client_or.is_ok()) { |
| return zx::error(client_or.status_value()); |
| } |
| pager->decompressor_client_ = std::move(client_or.value()); |
| } |
| |
| // Create the pager object. |
| status = zx::pager::create(0, &pager->pager_); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "Cannot initialize pager"; |
| return zx::error(status); |
| } |
| |
| // Start the pager thread. |
| thrd_t thread; |
| status = pager->pager_loop_.StartThread("blobfs-pager-thread", &thread); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "Could not start pager thread"; |
| return zx::error(status); |
| } |
| |
| // Set a scheduling deadline profile for the blobfs-pager-thread. This is purely a performance |
| // optimization, and failure to do so is not fatal. So in the case of an error encountered |
| // in any of the steps within |SetDeadlineProfile|, we log a warning, and successfully return the |
| // UserPager instance. |
| SetDeadlineProfile(thread); |
| |
| // Initialize and start the watchdog. |
| pager->watchdog_ = fs_watchdog::CreateWatchdog(); |
| zx::status<> watchdog_status = pager->watchdog_->Start(); |
| if (!watchdog_status.is_ok()) { |
| FX_LOGS(ERROR) << "Could not start pager watchdog"; |
| return zx::error(watchdog_status.status_value()); |
| } |
| |
| return zx::ok(std::move(pager)); |
| } |
| |
| void UserPager::SetDeadlineProfile(thrd_t thread) { |
| zx::channel channel0, channel1; |
| zx_status_t status = zx::channel::create(0u, &channel0, &channel1); |
| if (status != ZX_OK) { |
| FX_LOGS(WARNING) << "Could not create channel pair: " << zx_status_get_string(status); |
| return; |
| } |
| |
| // Connect to the scheduler profile provider service. |
| status = fdio_service_connect( |
| (std::string("/svc_blobfs/") + fuchsia::scheduler::ProfileProvider::Name_).c_str(), |
| channel0.release()); |
| if (status != ZX_OK) { |
| FX_LOGS(WARNING) << "Could not connect to scheduler profile provider: " |
| << zx_status_get_string(status); |
| return; |
| } |
| |
| fuchsia::scheduler::ProfileProvider_SyncProxy provider(std::move(channel1)); |
| |
| zx_status_t fidl_status = ZX_OK; |
| zx::profile profile; |
| |
| // Deadline profile parameters for the pager thread. |
| // Details on the performance analysis to arrive at these numbers can be found in fxbug.dev/56291. |
| // |
| // TODO(fxbug.dev/40858): Migrate to the role-based API when available, instead of hard |
| // coding parameters. |
| const zx_duration_t capacity = ZX_USEC(1800); |
| const zx_duration_t deadline = ZX_USEC(2800); |
| const zx_duration_t period = deadline; |
| |
| status = provider.GetDeadlineProfile( |
| capacity, deadline, period, "/boot/bin/blobfs:blobfs-pager-thread", &fidl_status, &profile); |
| |
| if (status != ZX_OK || fidl_status != ZX_OK) { |
| FX_LOGS(WARNING) << "Failed to get deadline profile: " << zx_status_get_string(status) << ", " |
| << zx_status_get_string(fidl_status); |
| } else { |
| auto pager_thread = zx::unowned_thread(thrd_get_zx_handle(thread)); |
| // Set the deadline profile. |
| status = pager_thread->set_profile(profile, 0); |
| if (status != ZX_OK) { |
| FX_LOGS(WARNING) << "Failed to set deadline profile: " << zx_status_get_string(status); |
| } |
| } |
| } |
| |
| UserPager::ReadRange UserPager::GetBlockAlignedReadRange(const UserPagerInfo& info, uint64_t offset, |
| uint64_t length) const { |
| ZX_DEBUG_ASSERT(offset < info.data_length_bytes); |
| // Clamp the range to the size of the blob. |
| length = std::min(length, info.data_length_bytes - offset); |
| |
| // Align to the block size for verification. (In practice this means alignment to 8k). |
| zx_status_t status = info.verifier->Align(&offset, &length); |
| // This only happens if the info.verifier thinks that [offset,length) is out of range, which |
| // will only happen if |verifier| was initialized with a different length than the rest of |info| |
| // (which is a programming error). |
| ZX_DEBUG_ASSERT(status == ZX_OK); |
| |
| ZX_DEBUG_ASSERT(offset % kBlobfsBlockSize == 0); |
| ZX_DEBUG_ASSERT(length % kBlobfsBlockSize == 0 || offset + length == info.data_length_bytes); |
| |
| return {.offset = offset, .length = length}; |
| } |
| |
| UserPager::ReadRange UserPager::GetBlockAlignedExtendedRange(const UserPagerInfo& info, |
| uint64_t offset, |
| uint64_t length) const { |
| // TODO(rashaeqbal): Consider making the cluster size dynamic once we have prefetch read |
| // efficiency metrics from the kernel - i.e. what percentage of prefetched pages are actually |
| // used. Note that dynamic prefetch sizing might not play well with compression, since we |
| // always need to read in entire compressed frames. |
| // |
| // TODO(rashaeqbal): Consider extending the range backwards as well. Will need some way to track |
| // populated ranges. |
| // |
| // Read in at least 32KB at a time. This gives us the best performance numbers w.r.t. memory |
| // savings and observed latencies. Detailed results from experiments to tune this can be found in |
| // fxbug.dev/48519. |
| constexpr uint64_t kReadAheadClusterSize = (32 * (1 << 10)); |
| |
| size_t read_ahead_offset = offset; |
| size_t read_ahead_length = std::max(kReadAheadClusterSize, length); |
| read_ahead_length = std::min(read_ahead_length, info.data_length_bytes - read_ahead_offset); |
| |
| // Align to the block size for verification. (In practice this means alignment to 8k). |
| return GetBlockAlignedReadRange(info, read_ahead_offset, read_ahead_length); |
| } |
| |
| PagerErrorStatus UserPager::TransferPagesToVmo(uint64_t offset, uint64_t length, const zx::vmo& vmo, |
| const UserPagerInfo& info) { |
| size_t end; |
| if (add_overflow(offset, length, &end)) { |
| FX_LOGS(ERROR) << "pager transfer range would overflow (off=" << offset << ", len=" << length |
| << ")"; |
| return PagerErrorStatus::kErrBadState; |
| } |
| |
| static const fs_watchdog::FsOperationType kOperation( |
| fs_watchdog::FsOperationType::CommonFsOperation::PageFault, std::chrono::seconds(60)); |
| [[maybe_unused]] fs_watchdog::FsOperationTracker tracker(&kOperation, watchdog_.get()); |
| |
| if (info.decompressor != nullptr) { |
| return TransferChunkedPagesToVmo(offset, length, vmo, info); |
| } else { |
| return TransferUncompressedPagesToVmo(offset, length, vmo, info); |
| } |
| } |
| |
| // The requested range is aligned in multiple steps as follows: |
| // 1. The range is extended to speculatively read in 32k at a time. |
| // 2. The extended range is further aligned for Merkle tree verification later. |
| // 3. This range is read in chunks equal to the size of the uncompressed_transfer_buffer_. Each |
| // chunk is verified as it is read in, and spliced into the destination VMO with supply_pages(). |
| // |
| // The assumption here is that the transfer buffer is sized per the alignment requirements for |
| // Merkle tree verification. We have static asserts in place to check this assumption - the transfer |
| // buffer (256MB) is 8k block aligned. |
| PagerErrorStatus UserPager::TransferUncompressedPagesToVmo(uint64_t requested_offset, |
| uint64_t requested_length, |
| const zx::vmo& vmo, |
| const UserPagerInfo& info) { |
| ZX_DEBUG_ASSERT(!info.decompressor); |
| |
| const auto [start_offset, total_length] = |
| GetBlockAlignedExtendedRange(info, requested_offset, requested_length); |
| |
| TRACE_DURATION("blobfs", "UserPager::TransferUncompressedPagesToVmo", "offset", start_offset, |
| "length", total_length); |
| |
| uint64_t offset = start_offset; |
| uint64_t length_remaining = total_length; |
| uint64_t length; |
| |
| // Read in multiples of the transfer buffer size. In practice we should only require one iteration |
| // for the majority of cases, since the transfer buffer is 256MB. |
| while (length_remaining > 0) { |
| length = std::min(uncompressed_transfer_buffer_->size(), length_remaining); |
| |
| auto decommit = fit::defer([this, length = length]() { |
| // Decommit pages in the transfer buffer that might have been populated. All blobs share the |
| // same transfer buffer - this prevents data leaks between different blobs. |
| uncompressed_transfer_buffer_->vmo().op_range( |
| ZX_VMO_OP_DECOMMIT, 0, fbl::round_up(length, kBlobfsBlockSize), nullptr, 0); |
| }); |
| |
| // Read from storage into the transfer buffer. |
| auto populate_status = uncompressed_transfer_buffer_->Populate(offset, length, info); |
| if (!populate_status.is_ok()) { |
| FX_LOGS(ERROR) << "TransferUncompressed: Failed to populate transfer vmo: " |
| << populate_status.status_string(); |
| return ToPagerErrorStatus(populate_status.status_value()); |
| } |
| |
| const uint64_t rounded_length = fbl::round_up<uint64_t, uint64_t>(length, PAGE_SIZE); |
| |
| // The block size is a multiple of the page size and |length| has already been block aligned. If |
| // |rounded_length| is greater than |length| then |length| isn't block aligned because it's at |
| // the end of the blob. In the compact layout the Merkle tree can share the last block of the |
| // data and may have been read into the transfer buffer. The Merkle tree needs to be removed |
| // before transfering the pages to the destination VMO. |
| static_assert(kBlobfsBlockSize % PAGE_SIZE == 0); |
| if (rounded_length > length) { |
| zx_status_t status = uncompressed_transfer_buffer_->vmo().op_range( |
| ZX_VMO_OP_ZERO, length, rounded_length - length, nullptr, 0); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "TransferUncompressed: Failed to remove Merkle tree from " |
| "transfer buffer: " |
| << zx_status_get_string(status); |
| return ToPagerErrorStatus(status); |
| } |
| } |
| |
| // Verify the pages read in. |
| { |
| fzl::VmoMapper mapping; |
| // We need to unmap the transfer VMO before its pages can be transferred to the destination |
| // VMO, via |zx_pager_supply_pages|. |
| auto unmap = fit::defer([&]() { mapping.Unmap(); }); |
| |
| // Map the transfer VMO in order to pass the verifier a pointer to the data. |
| zx_status_t status = |
| mapping.Map(uncompressed_transfer_buffer_->vmo(), 0, rounded_length, ZX_VM_PERM_READ); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "TransferUncompressed: Failed to map transfer buffer: " |
| << zx_status_get_string(status); |
| return ToPagerErrorStatus(status); |
| } |
| |
| status = info.verifier->VerifyPartial(mapping.start(), length, offset, rounded_length); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "TransferUncompressed: Failed to verify data: " |
| << zx_status_get_string(status); |
| return ToPagerErrorStatus(status); |
| } |
| } |
| |
| ZX_DEBUG_ASSERT(offset % PAGE_SIZE == 0); |
| // Move the pages from the transfer buffer to the destination VMO. |
| zx_status_t status = |
| pager_.supply_pages(vmo, offset, rounded_length, uncompressed_transfer_buffer_->vmo(), 0); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "TransferUncompressed: Failed to supply pages to paged VMO: " |
| << zx_status_get_string(status); |
| return ToPagerErrorStatus(status); |
| } |
| |
| length_remaining -= length; |
| offset += length; |
| } |
| |
| fbl::String merkle_root_hash = info.verifier->digest().ToString(); |
| metrics_->IncrementPageIn(merkle_root_hash, start_offset, total_length); |
| |
| return PagerErrorStatus::kOK; |
| } |
| |
| // The requested range is aligned in multiple steps as follows: |
| // 1. The desired uncompressed range is aligned for Merkle tree verification. |
| // 2. This range is extended to span complete compression frames / chunks, since that is the |
| // granularity we can decompress data in. The result of this alignment produces a |
| // CompressionMapping, which contains the mapping of the requested uncompressed range to the |
| // compressed range that needs to be read in from disk. |
| // 3. The uncompressed range is processed in chunks equal to the decompression_buffer_size_. For |
| // each chunk, we compute the CompressionMapping to determine the compressed range that needs to be |
| // read in. Each chunk is uncompressed and verified as it is read in, and spliced into the |
| // destination VMO with supply_pages(). |
| // |
| // There are two assumptions we make here: First that the decompression buffer is sized per the |
| // alignment requirements for Merkle tree verification. And second that the transfer buffer is sized |
| // such that it can accommodate all the compressed data for the decompression buffer, i.e. the |
| // transfer buffer should work with the worst case compression ratio of 1. We have static asserts in |
| // place to check both these assumptions - the transfer buffer is the same size as the decompression |
| // buffer (256MB), and both these buffers are 8k block aligned. |
| PagerErrorStatus UserPager::TransferChunkedPagesToVmo(uint64_t requested_offset, |
| uint64_t requested_length, const zx::vmo& vmo, |
| const UserPagerInfo& info) { |
| ZX_DEBUG_ASSERT(info.decompressor); |
| |
| const auto [offset, length] = GetBlockAlignedReadRange(info, requested_offset, requested_length); |
| |
| TRACE_DURATION("blobfs", "UserPager::TransferChunkedPagesToVmo", "offset", offset, "length", |
| length); |
| |
| fbl::String merkle_root_hash = info.verifier->digest().ToString(); |
| |
| size_t current_decompressed_offset = offset; |
| size_t desired_decompressed_end = offset + length; |
| |
| // Read in multiples of the decompression buffer size. In practice we should only require one |
| // iteration for the majority of cases, since the decompression buffer is 256MB. |
| while (current_decompressed_offset < desired_decompressed_end) { |
| size_t current_decompressed_length = desired_decompressed_end - current_decompressed_offset; |
| zx::status<CompressionMapping> mapping_status = info.decompressor->MappingForDecompressedRange( |
| current_decompressed_offset, current_decompressed_length, decompression_buffer_size_); |
| |
| if (!mapping_status.is_ok()) { |
| FX_LOGS(ERROR) << "TransferChunked: Failed to find range for [" << offset << ", " |
| << current_decompressed_offset + current_decompressed_length |
| << "): " << mapping_status.status_string(); |
| return ToPagerErrorStatus(mapping_status.status_value()); |
| } |
| CompressionMapping mapping = mapping_status.value(); |
| |
| // The compressed frame may not fall at a block aligned address, but we read in block aligned |
| // chunks. This offset will be applied to the buffer we pass to decompression. |
| // TODO(jfsulliv): Caching blocks which span frames may be useful for performance. |
| size_t offset_of_compressed_data = mapping.compressed_offset % kBlobfsBlockSize; |
| |
| // Read from storage into the transfer buffer. |
| size_t read_offset = fbl::round_down(mapping.compressed_offset, kBlobfsBlockSize); |
| size_t read_len = (mapping.compressed_length + offset_of_compressed_data); |
| |
| auto decommit_compressed = fit::defer([this, length = read_len]() { |
| // Decommit pages in the transfer buffer that might have been populated. All blobs share the |
| // same transfer buffer - this prevents data leaks between different blobs. |
| compressed_transfer_buffer_->vmo().op_range( |
| ZX_VMO_OP_DECOMMIT, 0, fbl::round_up(length, kBlobfsBlockSize), nullptr, 0); |
| }); |
| |
| auto populate_status = compressed_transfer_buffer_->Populate(read_offset, read_len, info); |
| if (!populate_status.is_ok()) { |
| FX_LOGS(ERROR) << "TransferChunked: Failed to populate transfer vmo: " |
| << populate_status.status_string(); |
| return ToPagerErrorStatus(populate_status.status_value()); |
| } |
| |
| auto decommit_decompressed = fit::defer([this, length = mapping.decompressed_length]() { |
| // Decommit pages in the decompression buffer that might have been populated. All blobs share |
| // the same transfer buffer - this prevents data leaks between different blobs. |
| decompression_buffer_.op_range(ZX_VMO_OP_DECOMMIT, 0, fbl::round_up(length, kBlobfsBlockSize), |
| nullptr, 0); |
| }); |
| |
| // Map the decompression VMO. |
| fzl::VmoMapper decompressed_mapper; |
| if (zx_status_t status = |
| decompressed_mapper.Map(decompression_buffer_, 0, mapping.decompressed_length, |
| ZX_VM_PERM_READ | ZX_VM_PERM_WRITE) != ZX_OK) { |
| FX_LOGS(ERROR) << "TransferChunked: Failed to map decompress buffer: " |
| << zx_status_get_string(status); |
| return ToPagerErrorStatus(status); |
| } |
| auto unmap_decompression = fit::defer([&]() { decompressed_mapper.Unmap(); }); |
| |
| fs::Ticker ticker(metrics_->Collecting()); |
| size_t decompressed_size = mapping.decompressed_length; |
| zx_status_t decompress_status; |
| if (decompressor_client_) { |
| ZX_DEBUG_ASSERT(sandbox_buffer_.is_valid()); |
| auto decommit_sandbox = fit::defer([this, length = mapping.decompressed_length]() { |
| // Decommit pages in the sandbox buffer that might have been populated. All blobs share |
| // the same sandbox buffer - this prevents data leaks between different blobs. |
| sandbox_buffer_.op_range(ZX_VMO_OP_DECOMMIT, 0, fbl::round_up(length, kBlobfsBlockSize), |
| nullptr, 0); |
| }); |
| ExternalSeekableDecompressor decompressor(decompressor_client_.get(), |
| info.decompressor.get()); |
| decompress_status = decompressor.DecompressRange( |
| offset_of_compressed_data, mapping.compressed_length, mapping.decompressed_length); |
| if (decompress_status == ZX_OK) { |
| zx_status_t read_status = |
| sandbox_buffer_.read(decompressed_mapper.start(), 0, mapping.decompressed_length); |
| if (read_status != ZX_OK) { |
| FX_LOGS(ERROR) << "TransferChunked: Failed to copy from sandbox buffer: " |
| << zx_status_get_string(read_status); |
| return ToPagerErrorStatus(read_status); |
| } |
| } |
| } else { |
| // Decompress the data |
| uint8_t* src = static_cast<uint8_t*>(compressed_mapper_.start()) + offset_of_compressed_data; |
| decompress_status = info.decompressor->DecompressRange( |
| decompressed_mapper.start(), &decompressed_size, src, mapping.compressed_length, |
| mapping.decompressed_offset); |
| } |
| if (decompress_status != ZX_OK) { |
| FX_LOGS(ERROR) << "TransferChunked: Failed to decompress: " |
| << zx_status_get_string(decompress_status); |
| return ToPagerErrorStatus(decompress_status); |
| } |
| metrics_->paged_read_metrics().IncrementDecompression(CompressionAlgorithm::CHUNKED, |
| decompressed_size, ticker.End(), |
| decompressor_client_ != nullptr); |
| |
| // Verify the decompressed pages. |
| const uint64_t rounded_length = |
| fbl::round_up<uint64_t, uint64_t>(mapping.decompressed_length, PAGE_SIZE); |
| zx_status_t status = |
| info.verifier->VerifyPartial(decompressed_mapper.start(), mapping.decompressed_length, |
| mapping.decompressed_offset, rounded_length); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "TransferChunked: Failed to verify data: " << zx_status_get_string(status); |
| return ToPagerErrorStatus(status); |
| } |
| |
| decompressed_mapper.Unmap(); |
| |
| // Move the pages from the decompression buffer to the destination VMO. |
| status = pager_.supply_pages(vmo, mapping.decompressed_offset, rounded_length, |
| decompression_buffer_, 0); |
| if (status != ZX_OK) { |
| FX_LOGS(ERROR) << "TransferChunked: Failed to supply pages to paged VMO: " |
| << zx_status_get_string(status); |
| return ToPagerErrorStatus(status); |
| } |
| metrics_->IncrementPageIn(merkle_root_hash, read_offset, read_len); |
| |
| // Advance the required decompressed offset based on how much has already been populated. |
| current_decompressed_offset = mapping.decompressed_offset + mapping.decompressed_length; |
| } |
| |
| return PagerErrorStatus::kOK; |
| } |
| |
| } // namespace pager |
| } // namespace blobfs |