blob: 1b5d03160e33a9cbe56c452d37e60aa5a8a648c0 [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/storage/blobfs/pager/user_pager.h"
#include <fuchsia/scheduler/cpp/fidl.h>
#include <lib/fdio/directory.h>
#include <lib/fit/defer.h>
#include <lib/fzl/owned-vmo-mapper.h>
#include <lib/syslog/cpp/macros.h>
#include <lib/zx/thread.h>
#include <limits.h>
#include <zircon/status.h>
#include <zircon/threads.h>
#include <algorithm>
#include <memory>
#include <fbl/auto_call.h>
#include <fs/trace.h>
#include "src/storage/blobfs/format.h"
#include "src/storage/blobfs/metrics.h"
#include "src/storage/lib/watchdog/include/lib/watchdog/operations.h"
namespace blobfs {
namespace pager {
struct ReadRange {
uint64_t offset;
uint64_t length;
// Returns a range which covers [offset, offset+length), adjusted for alignment.
// The returned range will have the following guarantees:
// - The range will contain [offset, offset+length).
// - The returned offset will be block-aligned.
// - The end of the returned range is *either* block-aligned or is the end of the file.
// - The range will be adjusted for verification (see |BlobVerifier::Align|).
// The range needs to be extended before actually populating the transfer buffer with pages, as
// absent pages will cause page faults during verification on the userpager thread, causing it to
// block against itself indefinitely.
// For example:
// |...input_range...|
// |..data_block..|..data_block..|..data_block..|
// |........output_range.........|
ReadRange GetBlockAlignedReadRange(const UserPagerInfo& info, uint64_t offset, uint64_t length) {
ZX_DEBUG_ASSERT(offset < info.data_length_bytes);
// Clamp the range to the size of the blob.
length = std::min(length, info.data_length_bytes - offset);
// Align to the block size for verification. (In practice this means alignment to 8k).
zx_status_t status = info.verifier->Align(&offset, &length);
// This only happens if the info.verifier thinks that [offset,length) is out of range, which
// will only happen if |verifier| was initialized with a different length than the rest of |info|
// (which is a programming error).
ZX_DEBUG_ASSERT(offset % kBlobfsBlockSize == 0);
ZX_DEBUG_ASSERT(length % kBlobfsBlockSize == 0 || offset + length == info.data_length_bytes);
return {.offset = offset, .length = length};
// Returns a range at least as big as GetBlockAlignedReadRange(), extended by an implementation
// defined read-ahead algorithm.
// The same alignment guarantees for GetBlockAlignedReadRange() apply.
ReadRange GetBlockAlignedExtendedRange(const UserPagerInfo& info, uint64_t offset,
uint64_t length) {
// TODO(rashaeqbal): Consider making the cluster size dynamic once we have prefetch read
// efficiency metrics from the kernel - i.e. what percentage of prefetched pages are actually
// used. Note that dynamic prefetch sizing might not play well with compression, since we
// always need to read in entire compressed frames.
// TODO(rashaeqbal): Consider extending the range backwards as well. Will need some way to track
// populated ranges.
// Read in at least 32KB at a time. This gives us the best performance numbers w.r.t. memory
// savings and observed latencies. Detailed results from experiments to tune this can be found in
constexpr uint64_t kReadAheadClusterSize = (32 * (1 << 10));
size_t read_ahead_offset = offset;
size_t read_ahead_length = std::max(kReadAheadClusterSize, length);
read_ahead_length = std::min(read_ahead_length, info.data_length_bytes - read_ahead_offset);
// Align to the block size for verification. (In practice this means alignment to 8k).
return GetBlockAlignedReadRange(info, read_ahead_offset, read_ahead_length);
// Helper function to apply a scheduling deadline profile to the pager |thread|. Called from
// |UserPager::Create| after starting the pager thread.
void SetDeadlineProfile(thrd_t thread) {
zx::channel channel0, channel1;
zx_status_t status = zx::channel::create(0u, &channel0, &channel1);
if (status != ZX_OK) {
FX_LOGS(WARNING) << "Could not create channel pair: " << zx_status_get_string(status);
// Connect to the scheduler profile provider service.
status = fdio_service_connect(
(std::string("/svc_blobfs/") + fuchsia::scheduler::ProfileProvider::Name_).c_str(),
if (status != ZX_OK) {
FX_LOGS(WARNING) << "Could not connect to scheduler profile provider: "
<< zx_status_get_string(status);
fuchsia::scheduler::ProfileProvider_SyncProxy provider(std::move(channel1));
zx_status_t fidl_status = ZX_OK;
zx::profile profile;
// Deadline profile parameters for the pager thread.
// Details on the performance analysis to arrive at these numbers can be found in
// TODO( Migrate to the role-based API when available, instead of hard
// coding parameters.
const zx_duration_t capacity = ZX_USEC(1800);
const zx_duration_t deadline = ZX_USEC(2800);
const zx_duration_t period = deadline;
status = provider.GetDeadlineProfile(
capacity, deadline, period, "/boot/bin/blobfs:blobfs-pager-thread", &fidl_status, &profile);
if (status != ZX_OK || fidl_status != ZX_OK) {
FX_LOGS(WARNING) << "Failed to get deadline profile: " << zx_status_get_string(status) << ", "
<< zx_status_get_string(fidl_status);
} else {
auto pager_thread = zx::unowned_thread(thrd_get_zx_handle(thread));
// Set the deadline profile.
status = pager_thread->set_profile(profile, 0);
if (status != ZX_OK) {
FX_LOGS(WARNING) << "Failed to set deadline profile: " << zx_status_get_string(status);
UserPager::Worker::Worker(size_t decompression_buffer_size, BlobfsMetrics* metrics)
: decompression_buffer_size_(decompression_buffer_size), metrics_(metrics) {}
zx::status<std::unique_ptr<UserPager::Worker>> UserPager::Worker::Create(
std::unique_ptr<TransferBuffer> uncompressed_buffer,
std::unique_ptr<TransferBuffer> compressed_buffer, size_t decompression_buffer_size,
BlobfsMetrics* metrics, bool sandbox_decompression) {
ZX_DEBUG_ASSERT(metrics != nullptr && uncompressed_buffer != nullptr &&
uncompressed_buffer->vmo().is_valid() && compressed_buffer != nullptr &&
if (uncompressed_buffer->size() % kBlobfsBlockSize ||
compressed_buffer->size() % kBlobfsBlockSize ||
decompression_buffer_size % kBlobfsBlockSize) {
return zx::error(ZX_ERR_INVALID_ARGS);
if (compressed_buffer->size() < decompression_buffer_size) {
return zx::error(ZX_ERR_INVALID_ARGS);
TRACE_DURATION("blobfs", "UserPager::Worker::Create");
auto worker =
std::unique_ptr<UserPager::Worker>(new UserPager::Worker(decompression_buffer_size, metrics));
worker->uncompressed_transfer_buffer_ = std::move(uncompressed_buffer);
worker->compressed_transfer_buffer_ = std::move(compressed_buffer);
zx_status_t status =
worker->compressed_mapper_.Map(worker->compressed_transfer_buffer_->vmo(), 0,
worker->compressed_transfer_buffer_->size(), ZX_VM_PERM_READ);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to map the compressed TransferBuffer: "
<< zx_status_get_string(status);
return zx::error(status);
status = zx::vmo::create(worker->decompression_buffer_size_, 0, &worker->decompression_buffer_);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to create decompression buffer: " << zx_status_get_string(status);
return zx::error(status);
if (sandbox_decompression) {
status = zx::vmo::create(kDecompressionBufferSize, 0, &worker->sandbox_buffer_);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to create sandbox buffer: %s\n" << zx_status_get_string(status);
return zx::error(status);
auto client_or = ExternalDecompressorClient::Create(worker->sandbox_buffer_,
if (!client_or.is_ok()) {
return zx::error(client_or.status_value());
worker->decompressor_client_ = std::move(client_or.value());
return zx::ok(std::move(worker));
PagerErrorStatus UserPager::Worker::TransferPagesToVmo(uint64_t offset, uint64_t length,
const zx::vmo& vmo,
const UserPagerInfo& info,
const zx::pager& pager) {
size_t end;
if (add_overflow(offset, length, &end)) {
FX_LOGS(ERROR) << "pager transfer range would overflow (off=" << offset << ", len=" << length
<< ") for blob " << info.verifier->digest();
return PagerErrorStatus::kErrBadState;
if (info.decompressor != nullptr) {
return TransferChunkedPagesToVmo(offset, length, vmo, info, pager);
} else {
return TransferUncompressedPagesToVmo(offset, length, vmo, info, pager);
// The requested range is aligned in multiple steps as follows:
// 1. The range is extended to speculatively read in 32k at a time.
// 2. The extended range is further aligned for Merkle tree verification later.
// 3. This range is read in chunks equal to the size of the uncompressed_transfer_buffer_. Each
// chunk is verified as it is read in, and spliced into the destination VMO with supply_pages().
// The assumption here is that the transfer buffer is sized per the alignment requirements for
// Merkle tree verification. We have static asserts in place to check this assumption - the transfer
// buffer (256MB) is 8k block aligned.
PagerErrorStatus UserPager::Worker::TransferUncompressedPagesToVmo(uint64_t requested_offset,
uint64_t requested_length,
const zx::vmo& vmo,
const UserPagerInfo& info,
const zx::pager& pager) {
const auto [start_offset, total_length] =
GetBlockAlignedExtendedRange(info, requested_offset, requested_length);
TRACE_DURATION("blobfs", "UserPager::TransferUncompressedPagesToVmo", "offset", start_offset,
"length", total_length);
uint64_t offset = start_offset;
uint64_t length_remaining = total_length;
uint64_t length;
// Read in multiples of the transfer buffer size. In practice we should only require one iteration
// for the majority of cases, since the transfer buffer is 256MB.
while (length_remaining > 0) {
length = std::min(uncompressed_transfer_buffer_->size(), length_remaining);
auto decommit = fit::defer([this, length = length]() {
// Decommit pages in the transfer buffer that might have been populated. All blobs share the
// same transfer buffer - this prevents data leaks between different blobs.
ZX_VMO_OP_DECOMMIT, 0, fbl::round_up(length, kBlobfsBlockSize), nullptr, 0);
// Read from storage into the transfer buffer.
auto populate_status = uncompressed_transfer_buffer_->Populate(offset, length, info);
if (!populate_status.is_ok()) {
FX_LOGS(ERROR) << "TransferUncompressed: Failed to populate transfer vmo for blob "
<< info.verifier->digest() << ": " << populate_status.status_string();
return ToPagerErrorStatus(populate_status.status_value());
const uint64_t rounded_length = fbl::round_up<uint64_t, uint64_t>(length, PAGE_SIZE);
// The block size is a multiple of the page size and |length| has already been block aligned. If
// |rounded_length| is greater than |length| then |length| isn't block aligned because it's at
// the end of the blob. In the compact layout the Merkle tree can share the last block of the
// data and may have been read into the transfer buffer. The Merkle tree needs to be removed
// before transfering the pages to the destination VMO.
static_assert(kBlobfsBlockSize % PAGE_SIZE == 0);
if (rounded_length > length) {
zx_status_t status = uncompressed_transfer_buffer_->vmo().op_range(
ZX_VMO_OP_ZERO, length, rounded_length - length, nullptr, 0);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "TransferUncompressed: Failed to remove Merkle tree from "
"transfer buffer: "
<< zx_status_get_string(status);
return ToPagerErrorStatus(status);
// Verify the pages read in.
fzl::VmoMapper mapping;
// We need to unmap the transfer VMO before its pages can be transferred to the destination
// VMO, via |zx_pager_supply_pages|.
auto unmap = fit::defer([&]() { mapping.Unmap(); });
// Map the transfer VMO in order to pass the verifier a pointer to the data.
zx_status_t status =
mapping.Map(uncompressed_transfer_buffer_->vmo(), 0, rounded_length, ZX_VM_PERM_READ);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "TransferUncompressed: Failed to map transfer buffer: "
<< zx_status_get_string(status);
return ToPagerErrorStatus(status);
status = info.verifier->VerifyPartial(mapping.start(), length, offset, rounded_length);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "TransferUncompressed: Failed to verify data for blob "
<< info.verifier->digest() << ": " << zx_status_get_string(status);
return ToPagerErrorStatus(status);
// Move the pages from the transfer buffer to the destination VMO.
zx_status_t status =
pager.supply_pages(vmo, offset, rounded_length, uncompressed_transfer_buffer_->vmo(), 0);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "TransferUncompressed: Failed to supply pages to paged VMO for blob "
<< info.verifier->digest() << ": " << zx_status_get_string(status);
return ToPagerErrorStatus(status);
length_remaining -= length;
offset += length;
fbl::String merkle_root_hash = info.verifier->digest().ToString();
metrics_->IncrementPageIn(merkle_root_hash, start_offset, total_length);
return PagerErrorStatus::kOK;
// The requested range is aligned in multiple steps as follows:
// 1. The desired uncompressed range is aligned for Merkle tree verification.
// 2. This range is extended to span complete compression frames / chunks, since that is the
// granularity we can decompress data in. The result of this alignment produces a
// CompressionMapping, which contains the mapping of the requested uncompressed range to the
// compressed range that needs to be read in from disk.
// 3. The uncompressed range is processed in chunks equal to the decompression_buffer_size_. For
// each chunk, we compute the CompressionMapping to determine the compressed range that needs to be
// read in. Each chunk is uncompressed and verified as it is read in, and spliced into the
// destination VMO with supply_pages().
// There are two assumptions we make here: First that the decompression buffer is sized per the
// alignment requirements for Merkle tree verification. And second that the transfer buffer is sized
// such that it can accommodate all the compressed data for the decompression buffer, i.e. the
// transfer buffer should work with the worst case compression ratio of 1. We have static asserts in
// place to check both these assumptions - the transfer buffer is the same size as the decompression
// buffer (256MB), and both these buffers are 8k block aligned.
PagerErrorStatus UserPager::Worker::TransferChunkedPagesToVmo(uint64_t requested_offset,
uint64_t requested_length,
const zx::vmo& vmo,
const UserPagerInfo& info,
const zx::pager& pager) {
const auto [offset, length] = GetBlockAlignedReadRange(info, requested_offset, requested_length);
TRACE_DURATION("blobfs", "UserPager::TransferChunkedPagesToVmo", "offset", offset, "length",
fbl::String merkle_root_hash = info.verifier->digest().ToString();
size_t current_decompressed_offset = offset;
size_t desired_decompressed_end = offset + length;
// Read in multiples of the decompression buffer size. In practice we should only require one
// iteration for the majority of cases, since the decompression buffer is 256MB.
while (current_decompressed_offset < desired_decompressed_end) {
size_t current_decompressed_length = desired_decompressed_end - current_decompressed_offset;
zx::status<CompressionMapping> mapping_status = info.decompressor->MappingForDecompressedRange(
current_decompressed_offset, current_decompressed_length, decompression_buffer_size_);
if (!mapping_status.is_ok()) {
FX_LOGS(ERROR) << "TransferChunked: Failed to find range for [" << offset << ", "
<< current_decompressed_offset + current_decompressed_length << ") for blob "
<< info.verifier->digest() << ": " << mapping_status.status_string();
return ToPagerErrorStatus(mapping_status.status_value());
CompressionMapping mapping = mapping_status.value();
// The compressed frame may not fall at a block aligned address, but we read in block aligned
// chunks. This offset will be applied to the buffer we pass to decompression.
// TODO(jfsulliv): Caching blocks which span frames may be useful for performance.
size_t offset_of_compressed_data = mapping.compressed_offset % kBlobfsBlockSize;
// Read from storage into the transfer buffer.
size_t read_offset = fbl::round_down(mapping.compressed_offset, kBlobfsBlockSize);
size_t read_len = (mapping.compressed_length + offset_of_compressed_data);
auto decommit_compressed = fit::defer([this, length = read_len]() {
// Decommit pages in the transfer buffer that might have been populated. All blobs share the
// same transfer buffer - this prevents data leaks between different blobs.
ZX_VMO_OP_DECOMMIT, 0, fbl::round_up(length, kBlobfsBlockSize), nullptr, 0);
auto populate_status = compressed_transfer_buffer_->Populate(read_offset, read_len, info);
if (!populate_status.is_ok()) {
FX_LOGS(ERROR) << "TransferChunked: Failed to populate transfer vmo for blob "
<< info.verifier->digest() << ": " << populate_status.status_string();
return ToPagerErrorStatus(populate_status.status_value());
auto decommit_decompressed = fit::defer([this, length = mapping.decompressed_length]() {
// Decommit pages in the decompression buffer that might have been populated. All blobs share
// the same transfer buffer - this prevents data leaks between different blobs.
decompression_buffer_.op_range(ZX_VMO_OP_DECOMMIT, 0, fbl::round_up(length, kBlobfsBlockSize),
nullptr, 0);
// Map the decompression VMO.
fzl::VmoMapper decompressed_mapper;
if (zx_status_t status =
decompressed_mapper.Map(decompression_buffer_, 0, mapping.decompressed_length,
FX_LOGS(ERROR) << "TransferChunked: Failed to map decompress buffer: "
<< zx_status_get_string(status);
return ToPagerErrorStatus(status);
auto unmap_decompression = fit::defer([&]() { decompressed_mapper.Unmap(); });
fs::Ticker ticker(metrics_->Collecting());
size_t decompressed_size = mapping.decompressed_length;
zx_status_t decompress_status;
if (decompressor_client_) {
auto decommit_sandbox = fit::defer([this, length = mapping.decompressed_length]() {
// Decommit pages in the sandbox buffer that might have been populated. All blobs share
// the same sandbox buffer - this prevents data leaks between different blobs.
sandbox_buffer_.op_range(ZX_VMO_OP_DECOMMIT, 0, fbl::round_up(length, kBlobfsBlockSize),
nullptr, 0);
ExternalSeekableDecompressor decompressor(decompressor_client_.get(),
decompress_status = decompressor.DecompressRange(
offset_of_compressed_data, mapping.compressed_length, mapping.decompressed_length);
if (decompress_status == ZX_OK) {
zx_status_t read_status =, 0, mapping.decompressed_length);
if (read_status != ZX_OK) {
FX_LOGS(ERROR) << "TransferChunked: Failed to copy from sandbox buffer: "
<< zx_status_get_string(read_status);
return ToPagerErrorStatus(read_status);
} else {
// Decompress the data
uint8_t* src = static_cast<uint8_t*>(compressed_mapper_.start()) + offset_of_compressed_data;
decompress_status = info.decompressor->DecompressRange(
decompressed_mapper.start(), &decompressed_size, src, mapping.compressed_length,
if (decompress_status != ZX_OK) {
FX_LOGS(ERROR) << "TransferChunked: Failed to decompress for blob " << info.verifier->digest()
<< ": " << zx_status_get_string(decompress_status);
return ToPagerErrorStatus(decompress_status);
decompressed_size, ticker.End(),
decompressor_client_ != nullptr);
// Verify the decompressed pages.
const uint64_t rounded_length =
fbl::round_up<uint64_t, uint64_t>(mapping.decompressed_length, PAGE_SIZE);
zx_status_t status =
info.verifier->VerifyPartial(decompressed_mapper.start(), mapping.decompressed_length,
mapping.decompressed_offset, rounded_length);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "TransferChunked: Failed to verify data for blob "
<< info.verifier->digest() << ": " << zx_status_get_string(status);
return ToPagerErrorStatus(status);
// Move the pages from the decompression buffer to the destination VMO.
status = pager.supply_pages(vmo, mapping.decompressed_offset, rounded_length,
decompression_buffer_, 0);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "TransferChunked: Failed to supply pages to paged VMO for blob "
<< info.verifier->digest() << ": " << zx_status_get_string(status);
return ToPagerErrorStatus(status);
metrics_->IncrementPageIn(merkle_root_hash, read_offset, read_len);
// Advance the required decompressed offset based on how much has already been populated.
current_decompressed_offset = mapping.decompressed_offset + mapping.decompressed_length;
return PagerErrorStatus::kOK;
UserPager::UserPager(std::unique_ptr<Worker> worker) : worker_(std::move(worker)) {}
zx::status<std::unique_ptr<UserPager>> UserPager::Create(
std::unique_ptr<TransferBuffer> uncompressed_buffer,
std::unique_ptr<TransferBuffer> compressed_buffer, size_t decompression_buffer_size,
BlobfsMetrics* metrics, bool sandbox_decompression) {
auto worker_or =
UserPager::Worker::Create(std::move(uncompressed_buffer), std::move(compressed_buffer),
decompression_buffer_size, metrics, sandbox_decompression);
if (worker_or.is_error()) {
return worker_or.take_error();
auto pager = std::unique_ptr<UserPager>(new UserPager(std::move(worker_or.value())));
// Create the pager object.
zx_status_t status = zx::pager::create(0, &pager->pager_);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Cannot initialize pager";
return zx::error(status);
// Start the worker thread.
thrd_t thread;
status = pager->pager_loop_.StartThread("blobfs-pager-thread", &thread);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Could not start pager thread";
return zx::error(status);
// Set a scheduling deadline profile for the blobfs-pager-thread. This is purely a performance
// optimization, and failure to do so is not fatal. So in the case of an error encountered
// in any of the steps within |SetDeadlineProfile|, we log a warning, and successfully return the
// UserPager instance.
// Initialize and start the watchdog.
pager->watchdog_ = fs_watchdog::CreateWatchdog();
zx::status<> watchdog_status = pager->watchdog_->Start();
if (!watchdog_status.is_ok()) {
FX_LOGS(ERROR) << "Could not start pager watchdog";
return zx::error(watchdog_status.status_value());
return zx::ok(std::move(pager));
PagerErrorStatus UserPager::TransferPagesToVmo(uint64_t offset, uint64_t length, const zx::vmo& vmo,
const UserPagerInfo& info) {
static const fs_watchdog::FsOperationType kOperation(
fs_watchdog::FsOperationType::CommonFsOperation::PageFault, std::chrono::seconds(60));
[[maybe_unused]] fs_watchdog::FsOperationTracker tracker(&kOperation, watchdog_.get());
// TODO( This will eventually contain logic for selecting which worker will
// handle this request.
return worker_->TransferPagesToVmo(offset, length, vmo, info, pager_);
} // namespace pager
} // namespace blobfs