// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/storage/blobfs/blob.h"
#include <assert.h>
#include <ctype.h>
#include <fidl/fuchsia.io/cpp/wire.h>
#include <fuchsia/device/c/fidl.h>
#include <lib/fit/defer.h>
#include <lib/sync/completion.h>
#include <lib/syslog/cpp/macros.h>
#include <lib/zx/status.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <zircon/assert.h>
#include <zircon/errors.h>
#include <zircon/status.h>
#include <zircon/syscalls.h>
#include <algorithm>
#include <iterator>
#include <memory>
#include <string_view>
#include <utility>
#include <vector>
#include <fbl/algorithm.h>
#include <fbl/ref_ptr.h>
#include <fbl/string_buffer.h>
#include <safemath/checked_math.h>
#include "src/lib/digest/digest.h"
#include "src/lib/digest/merkle-tree.h"
#include "src/lib/digest/node-digest.h"
#include "src/lib/storage/vfs/cpp/journal/data_streamer.h"
#include "src/storage/blobfs/blob_data_producer.h"
#include "src/storage/blobfs/blob_layout.h"
#include "src/storage/blobfs/blob_verifier.h"
#include "src/storage/blobfs/blobfs.h"
#include "src/storage/blobfs/common.h"
#include "src/storage/blobfs/compression/chunked.h"
#include "src/storage/blobfs/compression/streaming_chunked_decompressor.h"
#include "src/storage/blobfs/compression_settings.h"
#include "src/storage/blobfs/format.h"
#include "src/storage/blobfs/iterator/allocated_extent_iterator.h"
#include "src/storage/blobfs/iterator/block_iterator.h"
#include "src/storage/blobfs/iterator/extent_iterator.h"
#include "src/storage/blobfs/iterator/node_populator.h"
#include "src/storage/blobfs/iterator/vector_extent_iterator.h"
namespace blobfs {
namespace {
// When performing streaming writes, to ensure block alignment, we must cache data in memory before
// it is streamed into the writeback buffer. This threshold is measured in blocks: the lower it is,
// the less memory is used during streaming writes, at the expense of performing more (smaller)
// unbuffered IO operations.
constexpr size_t kCacheFlushThreshold = 4;
static_assert(kCacheFlushThreshold <= Blobfs::WriteBufferBlockCount(),
"Number of cached blocks exceeds size of writeback cache.");
// Maximum amount of data which can be kept in memory while decompressing pre-compressed blobs. Must
// be big enough to hold the largest decompressed chunk of a blob but small enough to prevent denial
// of service attacks via memory exhaustion. Arbitrarily set at 256 MiB to match the pager. Chunks
// may not be page aligned, thus maximum memory consumption may be one page more than this amount.
constexpr uint64_t kMaxDecompressionMemoryUsage = 256 * (1ull << 20);
const size_t kSystemPageSize = zx_system_get_page_size();
} // namespace
// Data used exclusively during writeback.
struct Blob::WriteInfo {
WriteInfo() = default;
// Not copyable or movable because merkle_tree_creator has a pointer to digest.
WriteInfo(const WriteInfo&) = delete;
WriteInfo& operator=(const WriteInfo&) = delete;
// We leave room in the merkle tree buffer to add padding before the merkle tree which might be
// required with the compact blob layout.
uint8_t* merkle_tree(uint64_t block_size) const {
ZX_ASSERT_MSG(merkle_tree_buffer, "Merkle tree buffer should not be nullptr");
return merkle_tree_buffer.get() + block_size;
}
// Amount of data written via the fuchsia.io Write() or Append() methods thus far.
uint64_t bytes_written = 0;
std::vector<ReservedExtent> extents;
std::vector<ReservedNode> node_indices;
std::optional<BlobCompressor> compressor;
// The fused write error. Once writing has failed, we return the same error on subsequent
// writes in case a higher layer dropped the error and returned a short write instead.
zx_status_t write_error = ZX_ERR_BAD_STATE;
// As data is written, we build the merkle tree using this.
digest::MerkleTreeCreator merkle_tree_creator;
// The merkle tree creator stores the root digest here.
uint8_t digest[digest::kSha256Length];
// The merkle tree creator stores the rest of the tree here. The buffer includes space for
// padding. See the comment for merkle_tree() above.
std::unique_ptr<uint8_t[]> merkle_tree_buffer;
// The old blob that this write is replacing.
fbl::RefPtr<Blob> old_blob;
fzl::OwnedVmoMapper buffer;
std::unique_ptr<BlobLayout> blob_layout = nullptr;
std::unique_ptr<fs::DataStreamer> streamer = nullptr;
// If true, indicates we are streaming the current blob to disk as it is written. This implies
// that dynamic compression is disabled (i.e. if true, |compressor| should be std::nullopt).
bool streaming_write = false;
// Amount of data persisted to disk thus far (always <= bytes_written). Always 0 if streaming
// writes are disabled.
uint64_t bytes_persisted = 0;
BlockIterator block_iter = BlockIterator{nullptr};
// Format of the data being written/persisted to disk.
CompressionAlgorithm data_format = CompressionAlgorithm::kUncompressed;
// Size of the data to be persisted to disk for this blob.
uint64_t data_size = 0;
// Seek table of the incoming data if |data_format| is CompressionAlgorithm::kChunked.
chunked_compression::SeekTable seek_table;
// Decompressor to use on incoming data if |data_format| is CompressionAlgorithm::kChunked.
std::unique_ptr<StreamingChunkedDecompressor> streaming_decompressor;
};
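// The null blob has no data and no Merkle tree; verification amounts to checking that the
// expected digest matches the hash of zero-length input.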
zx_status_t Blob::VerifyNullBlob() const {
ZX_ASSERT_MSG(blob_size_ == 0, "Inode blob size is not zero: %lu", blob_size_);
auto verifier_or = BlobVerifier::CreateWithoutTree(digest(), blobfs_->GetMetrics(), 0,
&blobfs_->blob_corruption_notifier());
if (verifier_or.is_error())
return verifier_or.error_value();
return verifier_or->Verify(nullptr, 0, 0);
}
uint64_t Blob::FileSize() const {
std::lock_guard lock(mutex_);
if (state() == BlobState::kReadable)
return blob_size_;
return 0;
}
Blob::Blob(Blobfs* bs, const digest::Digest& digest, CompressionAlgorithm data_format)
: CacheNode(*bs->vfs(), digest), blobfs_(bs), write_info_(std::make_unique<WriteInfo>()) {
write_info_->data_format = data_format;
}
Blob::Blob(Blobfs* bs, uint32_t node_index, const Inode& inode)
: CacheNode(*bs->vfs(), digest::Digest(inode.merkle_root_hash)),
blobfs_(bs),
state_(BlobState::kReadable),
syncing_state_(SyncingState::kDone),
map_index_(node_index),
blob_size_(inode.blob_size),
block_count_(inode.block_count) {}
zx_status_t Blob::WriteNullBlob() {
ZX_DEBUG_ASSERT(blob_size_ == 0);
ZX_DEBUG_ASSERT(block_count_ == 0);
if (zx_status_t status = VerifyNullBlob(); status != ZX_OK) {
return status;
}
BlobTransaction transaction;
if (zx_status_t status = WriteMetadata(transaction); status != ZX_OK) {
return status;
}
transaction.Commit(*blobfs_->GetJournal(), {},
[blob = fbl::RefPtr(this)]() { blob->CompleteSync(); });
return MarkReadable();
}
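// Prepares this blob to receive |size_data| bytes of data. Decides up front whether dynamic
// compression and streaming writes will be used, reserves the inode, and sets up the write
// buffers. For pre-compressed blobs the uncompressed size is not yet known here; it is derived
// later from the seek table in InitializeDecompressor().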
zx_status_t Blob::PrepareWrite(uint64_t size_data, bool compress) {
if (size_data > 0 && fbl::round_up(size_data, GetBlockSize()) == 0) {
// Fail early if |size_data| would overflow when rounded up to block size.
return ZX_ERR_OUT_OF_RANGE;
}
std::lock_guard lock(mutex_);
if (state() != BlobState::kEmpty) {
return ZX_ERR_BAD_STATE;
}
// Make sure we don't compress pre-compressed data.
if (write_info_->data_format != CompressionAlgorithm::kUncompressed) {
compress = false;
}
// Streaming writes are only supported when we're not doing dynamic compression.
write_info_->streaming_write = blobfs_->use_streaming_writes() && !compress;
write_info_->data_size = size_data;
// If incoming data isn't compressed, then we already know the size of the blob.
if (size_data > 0 && write_info_->data_format == CompressionAlgorithm::kUncompressed) {
blob_size_ = size_data;
zx_status_t status = InitializeMerkleBuffer();
if (status != ZX_OK) {
return status;
}
}
// Reserve a node for the blob's inode. We might need more nodes for extents later.
zx_status_t status = blobfs_->GetAllocator()->ReserveNodes(1, &write_info_->node_indices);
if (status != ZX_OK) {
return status;
}
map_index_ = write_info_->node_indices[0].index();
// Initialize write buffers. For compressed blobs, we only write into the compression buffer.
// For uncompressed or pre-compressed blobs, we write into the data vmo.
if (size_data > 0) {
if (compress) {
write_info_->compressor =
BlobCompressor::Create(blobfs_->write_compression_settings(), blob_size_);
if (!write_info_->compressor) {
// TODO(fxbug.dev/70356): Make BlobCompressor::Create return the actual error instead.
// Replace ZX_ERR_INTERNAL with the correct error once fxbug.dev/70356 is fixed.
FX_LOGS(ERROR) << "Failed to initialize compressor: " << ZX_ERR_INTERNAL;
return ZX_ERR_INTERNAL;
}
} else {
VmoNameBuffer name = FormatWritingBlobDataVmoName(digest());
const uint64_t block_aligned_size = fbl::round_up(size_data, kBlobfsBlockSize);
status = write_info_->buffer.CreateAndMap(block_aligned_size, name.c_str());
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to create vmo for writing blob " << digest()
<< " (vmo size = " << block_aligned_size
<< "): " << zx_status_get_string(status);
return status;
}
}
write_info_->streamer =
std::make_unique<fs::DataStreamer>(blobfs_->GetJournal(), Blobfs::WriteBufferBlockCount());
write_info_->streaming_write = blobfs_->use_streaming_writes() && !compress;
}
set_state(BlobState::kDataWrite);
// Special case for the null blob: We skip the write phase.
return write_info_->data_size == 0 ? WriteNullBlob() : ZX_OK;
}
void Blob::SetOldBlob(Blob& blob) {
std::lock_guard lock(mutex_);
write_info_->old_blob = fbl::RefPtr(&blob);
}
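// Reserves all blocks required by |blob_layout| plus any additional nodes needed to hold the
// blob's extents. If the first reservation attempt fails with ZX_ERR_NO_SPACE while other blocks
// are still reserved, a sync is forced to flush pending frees and the reservation is retried once.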
zx_status_t Blob::SpaceAllocate() {
ZX_DEBUG_ASSERT(write_info_->blob_layout != nullptr);
ZX_DEBUG_ASSERT(write_info_->blob_layout->TotalBlockCount() != 0);
TRACE_DURATION("blobfs", "Blob::SpaceAllocate", "block_count",
write_info_->blob_layout->TotalBlockCount());
fs::Ticker ticker;
std::vector<ReservedExtent> extents;
std::vector<ReservedNode> nodes;
// Reserve space for the blob.
const uint64_t block_count = write_info_->blob_layout->TotalBlockCount();
const uint64_t reserved_blocks = blobfs_->GetAllocator()->ReservedBlockCount();
zx_status_t status = blobfs_->GetAllocator()->ReserveBlocks(block_count, &extents);
if (status == ZX_ERR_NO_SPACE && reserved_blocks > 0) {
// It's possible that a blob has just been unlinked but has yet to be flushed through the
// journal, and the blocks are still reserved, so if that looks likely, force a flush and then
// try again. This might need to be revisited if/when blobfs becomes multi-threaded.
sync_completion_t sync;
blobfs_->Sync([&](zx_status_t) { sync_completion_signal(&sync); });
sync_completion_wait(&sync, ZX_TIME_INFINITE);
status = blobfs_->GetAllocator()->ReserveBlocks(block_count, &extents);
}
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to allocate " << write_info_->blob_layout->TotalBlockCount()
<< " blocks for blob: " << zx_status_get_string(status);
return status;
}
if (extents.size() > kMaxExtentsPerBlob) {
FX_LOGS(ERROR) << "Error: Block reservation requires too many extents (" << extents.size()
<< " vs " << kMaxExtentsPerBlob << " max)";
return ZX_ERR_BAD_STATE;
}
// Reserve space for all additional nodes necessary to contain this blob. The inode has already
// been reserved in Blob::PrepareWrite. Hence, we need to reserve one less node here.
size_t node_count = NodePopulator::NodeCountForExtents(extents.size()) - 1;
status = blobfs_->GetAllocator()->ReserveNodes(node_count, &nodes);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to reserve " << node_count
<< " nodes for blob: " << zx_status_get_string(status);
return status;
}
write_info_->extents = std::move(extents);
write_info_->node_indices.insert(write_info_->node_indices.end(),
std::make_move_iterator(nodes.begin()),
std::make_move_iterator(nodes.end()));
write_info_->block_iter =
BlockIterator(std::make_unique<VectorExtentIterator>(write_info_->extents));
block_count_ = block_count;
blobfs_->GetMetrics()->UpdateAllocation(blob_size_, ticker.End());
return ZX_OK;
}
bool Blob::IsDataLoaded() const {
// Data is served out of the paged_vmo().
return paged_vmo().is_valid();
}
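// Writes the blob's inode (and any extent container nodes) into |transaction|. For the empty
// blob there are no extents, so only the single reserved inode is marked allocated and persisted.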
zx_status_t Blob::WriteMetadata(BlobTransaction& transaction) {
TRACE_DURATION("blobfs", "Blobfs::WriteMetadata");
ZX_DEBUG_ASSERT(state() == BlobState::kDataWrite);
if (block_count_) {
// We utilize the NodePopulator class to take our reserved blocks and nodes and fill the
// persistent map with an allocated inode / container.
// If |on_node| is invoked on a node, it means that node was necessary to represent this
// blob. Persist the node back to durable storage.
auto on_node = [this, &transaction](uint32_t node_index) {
blobfs_->PersistNode(node_index, transaction);
};
// If |on_extent| is invoked on an extent, it was necessary to represent this blob. Persist
// the allocation of these blocks back to durable storage.
auto on_extent = [this, &transaction](ReservedExtent& extent) {
blobfs_->PersistBlocks(extent, transaction);
return NodePopulator::IterationCommand::Continue;
};
auto mapped_inode_or_error = blobfs_->GetNode(map_index_);
if (mapped_inode_or_error.is_error()) {
return mapped_inode_or_error.status_value();
}
InodePtr mapped_inode = std::move(mapped_inode_or_error).value();
*mapped_inode = Inode{
.blob_size = blob_size_,
.block_count = safemath::checked_cast<uint32_t>(block_count_),
};
digest().CopyTo(mapped_inode->merkle_root_hash);
NodePopulator populator(blobfs_->GetAllocator(), std::move(write_info_->extents),
std::move(write_info_->node_indices));
zx_status_t status = populator.Walk(on_node, on_extent);
ZX_ASSERT_MSG(status == ZX_OK, "populator.Walk failed with error: %s",
zx_status_get_string(status));
SetCompressionAlgorithm(&*mapped_inode, write_info_->data_format);
} else {
// Special case: Empty node.
ZX_DEBUG_ASSERT(write_info_->node_indices.size() == 1);
auto mapped_inode_or_error = blobfs_->GetNode(map_index_);
if (mapped_inode_or_error.is_error()) {
return mapped_inode_or_error.status_value();
}
InodePtr mapped_inode = std::move(mapped_inode_or_error).value();
*mapped_inode = Inode{};
digest().CopyTo(mapped_inode->merkle_root_hash);
blobfs_->GetAllocator()->MarkInodeAllocated(std::move(write_info_->node_indices[0]));
blobfs_->PersistNode(map_index_, transaction);
}
return ZX_OK;
}
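// Appends up to |len| bytes to the blob. Data is staged either in the dynamic compressor or in
// the write buffer VMO, the Merkle tree is updated incrementally (via the streaming decompressor
// for pre-compressed blobs, which is initialized lazily once the seek table can be parsed), and
// once all |data_size| bytes have arrived Commit() finalizes the blob.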
zx_status_t Blob::WriteInternal(const void* data, size_t len, size_t* actual) {
TRACE_DURATION("blobfs", "Blobfs::WriteInternal", "data", data, "len", len);
if (state() == BlobState::kError) {
return write_info_ ? write_info_->write_error : ZX_ERR_BAD_STATE;
}
if (len == 0) {
return ZX_OK;
}
if (state() != BlobState::kDataWrite) {
return ZX_ERR_BAD_STATE;
}
const size_t to_write = std::min(len, write_info_->data_size - write_info_->bytes_written);
// If we're doing dynamic compression, write the incoming data into the compressor, otherwise
// cache the data in the write buffer VMO.
if (write_info_->compressor) {
if (zx_status_t status = write_info_->compressor->Update(data, to_write); status != ZX_OK) {
return status;
}
} else {
ZX_DEBUG_ASSERT(write_info_->buffer.vmo().is_valid());
if (zx_status_t status =
write_info_->buffer.vmo().write(data, write_info_->bytes_written, to_write);
status != ZX_OK) {
FX_LOGS(ERROR) << "VMO write failed: " << zx_status_get_string(status);
return status;
}
}
// If the blob data is pre-compressed, ensure we've initialized the decompressor first.
// This requires that we've buffered enough data to decode the seek table.
if (write_info_->data_format == CompressionAlgorithm::kChunked &&
write_info_->streaming_decompressor == nullptr) {
zx::status status = InitializeDecompressor(write_info_->bytes_written + to_write);
if (status.is_error()) {
return status.error_value();
}
// We couldn't initialize the decompressor yet since we don't have enough data.
if (!status.value()) {
ZX_DEBUG_ASSERT((write_info_->bytes_written + to_write) < write_info_->data_size);
*actual = to_write;
write_info_->bytes_written += to_write;
return ZX_OK;
}
ZX_DEBUG_ASSERT(write_info_->streaming_decompressor != nullptr);
ZX_DEBUG_ASSERT(blob_size_ > 0);
}
// If we're doing streaming writes, try to persist all the data we have buffered so far.
if (write_info_->streaming_write) {
if (zx_status_t status = StreamBufferedData(write_info_->bytes_written + to_write);
status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to perform streaming write: " << zx_status_get_string(status);
return status;
}
}
// Update the Merkle tree with the incoming data. If the blob is pre-compressed, we use the
// decompressor to update the Merkle tree via a callback; otherwise we update it directly.
if (write_info_->streaming_decompressor) {
zx::status status =
write_info_->streaming_decompressor->Update({static_cast<const uint8_t*>(data), to_write});
if (status.is_error()) {
return status.error_value();
}
} else {
if (zx_status_t status = write_info_->merkle_tree_creator.Append(data, to_write);
status != ZX_OK) {
FX_LOGS(ERROR) << "MerkleTreeCreator::Append failed: " << zx_status_get_string(status);
return status;
}
}
*actual = to_write;
write_info_->bytes_written += to_write;
// More data to write.
if (write_info_->bytes_written < write_info_->data_size) {
return ZX_OK;
}
return Commit();
}
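// Finalizes the write: verifies the computed digest, decides whether dynamic compression actually
// saved space, allocates space if streaming writes have not already done so, and then writes any
// remaining data plus the Merkle tree before flushing and committing metadata.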
zx_status_t Blob::Commit() {
if (digest() != write_info_->digest) {
// Downloaded blob did not match provided digest.
FX_LOGS(ERROR) << "downloaded blob did not match provided digest " << digest();
return ZX_ERR_IO_DATA_INTEGRITY;
}
const size_t merkle_size = write_info_->merkle_tree_creator.GetTreeLength();
bool compress = write_info_->compressor.has_value();
if (compress) {
if (zx_status_t status = write_info_->compressor->End(); status != ZX_OK) {
return status;
}
// If we're using the chunked compressor, abort compression if we're not going to get any
// savings. We can't easily do it for the other compression formats without changing the
// decompression API to support streaming.
if (write_info_->compressor->algorithm() == CompressionAlgorithm::kChunked &&
fbl::round_up(write_info_->compressor->Size() + merkle_size, GetBlockSize()) >=
fbl::round_up(blob_size_ + merkle_size, GetBlockSize())) {
compress = false;
}
}
if (compress) {
write_info_->data_format = write_info_->compressor->algorithm();
write_info_->data_size = write_info_->compressor->Size();
}
fs::Duration generation_time;
// For non-streaming writes, we lazily allocate space.
if (!write_info_->streaming_write) {
if (zx_status_t status = InitializeBlobLayout(); status != ZX_OK) {
return status;
}
if (zx_status_t status = SpaceAllocate(); status != ZX_OK) {
return status;
}
}
std::variant<std::monostate, DecompressBlobDataProducer, SimpleBlobDataProducer> data;
BlobDataProducer* data_ptr = nullptr;
if (compress) {
// The data comes from the compression buffer.
data_ptr = &data.emplace<SimpleBlobDataProducer>(
cpp20::span(static_cast<const uint8_t*>(write_info_->compressor->Data()),
write_info_->compressor->Size()));
} else if (write_info_->compressor) {
// In this case, we've decided against compressing because there are no savings, so we have to
// decompress.
zx::status producer_or =
DecompressBlobDataProducer::Create(*write_info_->compressor, blob_size_);
if (producer_or.is_error()) {
return producer_or.error_value();
}
data_ptr = &data.emplace<DecompressBlobDataProducer>(std::move(producer_or).value());
} else {
// The data comes from the data buffer.
const uint8_t* buff = static_cast<const uint8_t*>(write_info_->buffer.start());
data_ptr = &data.emplace<SimpleBlobDataProducer>(
cpp20::span(buff + write_info_->bytes_persisted,
write_info_->data_size - write_info_->bytes_persisted));
}
SimpleBlobDataProducer merkle(cpp20::span(write_info_->merkle_tree(GetBlockSize()), merkle_size));
MergeBlobDataProducer producer = [&, data_size = write_info_->data_size,
&blob_layout = write_info_->blob_layout]() {
switch (blob_layout->Format()) {
case BlobLayoutFormat::kDeprecatedPaddedMerkleTreeAtStart:
// Write the merkle data first followed by the data. The merkle data should be a multiple
// of the block size so we don't need any padding.
ZX_ASSERT_MSG(merkle.GetRemainingBytes() % GetBlockSize() == 0,
"Merkle data size :%lu not a multiple of blobfs block size %lu",
merkle.GetRemainingBytes(), GetBlockSize());
return MergeBlobDataProducer(merkle, *data_ptr, /*padding=*/0);
case BlobLayoutFormat::kCompactMerkleTreeAtEnd:
// Write the data followed by the merkle tree. There might be some padding between the
// data and the merkle tree.
const size_t padding = blob_layout->MerkleTreeOffset() - data_size;
return MergeBlobDataProducer(*data_ptr, merkle, padding);
}
}();
// Calculate outstanding amount of data to write, and where, in terms of blocks.
uint64_t block_count = write_info_->blob_layout->TotalBlockCount();
uint64_t block_offset = 0;
// If we already streamed some data to disk, update |block_count| and |block_offset| accordingly.
if (write_info_->bytes_persisted > 0) {
if (write_info_->bytes_persisted < write_info_->data_size) {
// Continue writing data from last position (which should be block aligned).
ZX_DEBUG_ASSERT((write_info_->bytes_persisted % GetBlockSize()) == 0);
block_offset = write_info_->bytes_persisted / GetBlockSize();
block_count = write_info_->blob_layout->TotalBlockCount() - block_offset;
} else {
// Already streamed blob data to disk, only the Merkle tree remains.
block_count = write_info_->blob_layout->MerkleTreeBlockCount();
block_offset = write_info_->blob_layout->MerkleTreeBlockOffset();
}
}
// Write remaining data to disk, if any.
if (block_count > 0) {
zx_status_t status = WriteData(block_count, block_offset, producer, *write_info_->streamer);
if (status != ZX_OK) {
return status;
}
}
// No more data to write. Flush data to disk and commit metadata.
fs::Ticker ticker; // Tracking enqueue time.
if (zx_status_t status = FlushData(); status != ZX_OK) {
return status;
}
blobfs_->GetMetrics()->UpdateClientWrite(block_count_ * GetBlockSize(), merkle_size, ticker.End(),
generation_time);
return MarkReadable();
}
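// Flushes all queued data writes through the journal's data streamer, releases the write-time
// buffers, and commits the metadata transaction. If this write replaces an existing blob, the old
// blob is freed and evicted from the cache. Blocks until the data (but not the metadata) has been
// written out.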
zx_status_t Blob::FlushData() {
// Enqueue the blob's final data work. Metadata must be enqueued separately.
zx_status_t data_status = ZX_ERR_IO;
sync_completion_t data_written;
// Issue the signal when the callback is destroyed rather than in the callback because the
// callback won't get called in some error paths.
auto data_written_finished = fit::defer([&] { sync_completion_signal(&data_written); });
auto write_all_data = write_info_->streamer->Flush().then(
[&data_status, data_written_finished = std::move(data_written_finished)](
const fpromise::result<void, zx_status_t>& result) {
data_status = result.is_ok() ? ZX_OK : result.error();
return result;
});
// Discard things we don't need any more. This has to be after the Flush call above to ensure
// all data has been copied from these buffers.
write_info_->buffer.Reset();
// FreePagedVmo() will return the reference that keeps this object alive on behalf of the paging
// system so we can free it outside the lock. However, when a Blob is being written it can't be
// mapped so we know there should be no pager reference. Otherwise, calling FreePagedVmo() would
// invalidate future uses of the mapped data.
//
// If in the future we need to support memory mapping a paged VMO (like we allow mapping and using
// the portions of a blob that are already known), then this code will have to be changed to not
// free the VMO here (which will in turn require other changes).
fbl::RefPtr<fs::Vnode> pager_reference = FreePagedVmo();
ZX_DEBUG_ASSERT(!pager_reference);
write_info_->compressor.reset();
// Wrap all pending writes with a strong reference to this Blob, so that it stays
// alive while there are writes in progress acting on it.
BlobTransaction transaction;
if (zx_status_t status = WriteMetadata(transaction); status != ZX_OK) {
return status;
}
if (write_info_->old_blob) {
zx_status_t status = blobfs_->FreeInode(write_info_->old_blob->Ino(), transaction);
ZX_ASSERT_MSG(status == ZX_OK, "FreeInode failed with error: %s", zx_status_get_string(status));
auto& cache = GetCache();
status = cache.Evict(write_info_->old_blob);
ZX_ASSERT_MSG(status == ZX_OK, "Failed to evict old blob with error: %s",
zx_status_get_string(status));
status = cache.Add(fbl::RefPtr(this));
ZX_ASSERT_MSG(status == ZX_OK, "Failed to add blob to cache with error: %s",
zx_status_get_string(status));
}
transaction.Commit(*blobfs_->GetJournal(), std::move(write_all_data),
[self = fbl::RefPtr(this)]() {});
// It's not safe to continue until all data has been written because we might need to reload it
// (e.g. if the blob is immediately read after writing), and the journal caches data in ring
// buffers, so wait until that has happened. We don't need to wait for the metadata because we
// cache that.
sync_completion_wait(&data_written, ZX_TIME_INFINITE);
if (data_status != ZX_OK) {
return data_status;
}
return ZX_OK;
}
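// Streams |block_count| blocks of data from |producer| to disk, starting |block_offset| blocks
// into the blob's on-disk layout. The final block is zero-padded if the producer returns a
// partial block.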
zx_status_t Blob::WriteData(uint64_t block_count, uint64_t block_offset, BlobDataProducer& producer,
fs::DataStreamer& streamer) {
if (zx_status_t status = IterateToBlock(&write_info_->block_iter, block_offset);
status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to iterate to block offset " << block_offset << ": "
<< zx_status_get_string(status);
return status;
}
const uint64_t data_start = DataStartBlock(blobfs_->Info());
return StreamBlocks(
&write_info_->block_iter, block_count,
[&](uint64_t vmo_offset, uint64_t dev_offset, uint64_t block_count) {
while (block_count) {
if (producer.NeedsFlush()) {
// Queued operations might point at buffers that are about to be invalidated, so we have
// to force those operations to be issued which will cause them to be copied.
streamer.IssueOperations();
}
auto data = producer.Consume(block_count * GetBlockSize());
if (data.is_error())
return data.error_value();
ZX_ASSERT_MSG(!data->empty(), "Data span for writing should not be empty.");
storage::UnbufferedOperation op = {.data = data->data(),
.op = {
.type = storage::OperationType::kWrite,
.dev_offset = dev_offset + data_start,
.length = data->size() / GetBlockSize(),
}};
// Pad if necessary.
const size_t alignment = data->size() % GetBlockSize();
if (alignment > 0) {
memset(const_cast<uint8_t*>(data->end()), 0, GetBlockSize() - alignment);
++op.op.length;
}
block_count -= op.op.length;
dev_offset += op.op.length;
streamer.StreamData(std::move(op));
} // while (block_count)
return ZX_OK;
});
}
zx_status_t Blob::MarkReadable() {
if (readable_event_.is_valid()) {
zx_status_t status = readable_event_.signal(0u, ZX_USER_SIGNAL_0);
if (status != ZX_OK) {
LatchWriteError(status);
return status;
}
}
set_state(BlobState::kReadable);
syncing_state_ = SyncingState::kSyncing;
write_info_.reset();
return ZX_OK;
}
void Blob::LatchWriteError(zx_status_t write_error) {
if (state_ != BlobState::kDataWrite) {
return;
}
if (zx_status_t status = GetCache().Evict(fbl::RefPtr(this)); status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to evict blob from cache: " << zx_status_get_string(status);
}
set_state(BlobState::kError);
write_info_->write_error = write_error;
}
zx_status_t Blob::GetReadableEvent(zx::event* out) {
TRACE_DURATION("blobfs", "Blobfs::GetReadableEvent");
zx_status_t status;
// This is the first 'wait until read event' request received.
if (!readable_event_.is_valid()) {
status = zx::event::create(0, &readable_event_);
if (status != ZX_OK) {
return status;
} else if (state() == BlobState::kReadable) {
readable_event_.signal(0u, ZX_USER_SIGNAL_0);
}
}
zx::event out_event;
status = readable_event_.duplicate(ZX_RIGHTS_BASIC, &out_event);
if (status != ZX_OK) {
return status;
}
*out = std::move(out_event);
return ZX_OK;
}
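// Creates a copy-on-write child of the paged VMO with the requested |rights|. Executable rights
// are only granted when blobfs holds a valid VMEX resource.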
zx_status_t Blob::CloneDataVmo(zx_rights_t rights, zx::vmo* out_vmo) {
TRACE_DURATION("blobfs", "Blobfs::CloneVmo", "rights", rights);
if (state_ != BlobState::kReadable) {
return ZX_ERR_BAD_STATE;
}
if (blob_size_ == 0) {
return ZX_ERR_BAD_STATE;
}
auto status = LoadVmosFromDisk();
if (status != ZX_OK) {
return status;
}
zx::vmo clone;
status = paged_vmo().create_child(ZX_VMO_CHILD_SNAPSHOT_AT_LEAST_ON_WRITE, 0, blob_size_, &clone);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to create child VMO: " << zx_status_get_string(status);
return status;
}
DidClonePagedVmo();
// Only add the exec right to the VMO if explicitly requested. (Saves a syscall if we're just
// going to drop the right again in the replace() call below.)
if (rights & ZX_RIGHT_EXECUTE) {
// Check if the VMEX resource held by Blobfs is valid and fail if it isn't. We do this to make
// sure that we aren't implicitly relying on the ZX_POL_AMBIENT_MARK_VMO_EXEC job policy.
const zx::resource& vmex = blobfs_->vmex_resource();
if (!vmex.is_valid()) {
FX_LOGS(ERROR) << "No VMEX resource available, executable blobs unsupported";
return ZX_ERR_NOT_SUPPORTED;
}
if ((status = clone.replace_as_executable(vmex, &clone)) != ZX_OK) {
return status;
}
}
// Narrow rights to those requested.
if ((status = clone.replace(rights, &clone)) != ZX_OK) {
return status;
}
*out_vmo = std::move(clone);
return ZX_OK;
}
zx_status_t Blob::ReadInternal(void* data, size_t len, size_t off, size_t* actual) {
TRACE_DURATION("blobfs", "Blobfs::ReadInternal", "len", len, "off", off);
// The common case is that the blob is already loaded. To allow multiple readers, it's important
// to avoid taking an exclusive lock unless necessary.
fs::SharedLock lock(mutex_);
// Only expect this to be called when the blob is open. The fidl API guarantees this but tests
// can easily forget to open the blob before trying to read.
ZX_DEBUG_ASSERT(open_count() > 0);
if (state_ != BlobState::kReadable)
return ZX_ERR_BAD_STATE;
if (!IsDataLoaded()) {
// Release the shared lock and load the data from within an exclusive lock. LoadVmosFromDisk()
// can be called multiple times so the race condition caused by this unlocking will be benign.
lock.unlock();
{
// Load the VMO data from within the lock.
std::lock_guard exclusive_lock(mutex_);
if (zx_status_t status = LoadVmosFromDisk(); status != ZX_OK)
return status;
}
lock.lock();
// The readable state should never change (from the value we checked at the top of this
// function) by attempting to load from disk; that only happens when we try to write.
ZX_DEBUG_ASSERT(state_ == BlobState::kReadable);
}
if (blob_size_ == 0) {
*actual = 0;
return ZX_OK;
}
if (off >= blob_size_) {
*actual = 0;
return ZX_OK;
}
if (len > (blob_size_ - off)) {
len = blob_size_ - off;
}
ZX_DEBUG_ASSERT(IsDataLoaded());
// Send reads through the pager. This will potentially page-in the data by reentering us from the
// kernel on the pager thread.
ZX_DEBUG_ASSERT(paged_vmo().is_valid());
if (zx_status_t status = paged_vmo().read(data, off, len); status != ZX_OK)
return status;
*actual = len;
return ZX_OK;
}
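// Creates the pager-backed VMO for this blob and loads the information needed to page data in on
// demand; the blob's contents are not read from disk until they are first accessed.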
zx_status_t Blob::LoadPagedVmosFromDisk() {
ZX_ASSERT_MSG(!IsDataLoaded(), "Data VMO is already loaded.");
// If there is an overridden cache policy for pager-backed blobs, apply it now. Otherwise the
// system-wide default will be used.
std::optional<CachePolicy> cache_policy = blobfs_->pager_backed_cache_policy();
if (cache_policy) {
set_overridden_cache_policy(*cache_policy);
}
zx::status<LoaderInfo> load_info_or =
blobfs_->loader().LoadBlob(map_index_, &blobfs_->blob_corruption_notifier());
if (load_info_or.is_error())
return load_info_or.error_value();
// Make the vmo.
if (auto status = EnsureCreatePagedVmo(load_info_or->layout->FileBlockAlignedSize());
status.is_error())
return status.error_value();
// Commit the other load information.
loader_info_ = std::move(*load_info_or);
return ZX_OK;
}
zx_status_t Blob::LoadVmosFromDisk() {
// We expect the file to be open in FIDL for this to be called. Whether the paged vmo is
// registered with the pager is dependent on the HasReferences() flag so this should not get
// out-of-sync.
ZX_DEBUG_ASSERT(HasReferences());
if (IsDataLoaded())
return ZX_OK;
if (blob_size_ == 0) {
// Null blobs don't need any loading, just verification that they're correct.
return VerifyNullBlob();
}
zx_status_t status = LoadPagedVmosFromDisk();
if (status == ZX_OK)
SetPagedVmoName(true);
syncing_state_ = SyncingState::kDone;
return status;
}
zx_status_t Blob::QueueUnlink() {
std::lock_guard lock(mutex_);
deletable_ = true;
// Attempt to purge in case the blob has been unlinked with no open fds.
return TryPurge();
}
zx_status_t Blob::Verify() {
{
std::lock_guard lock(mutex_);
if (auto status = LoadVmosFromDisk(); status != ZX_OK)
return status;
}
// For non-pager-backed blobs, commit the entire blob in memory. This will cause all of the pages
// to be verified as they are read in (or for the null blob we just verify immediately). If the
// commit operation fails due to a verification failure, we do propagate the error back via the
// return status.
//
// This is a read-only operation on the blob so can be done with the shared lock. Since it will
// reenter the Blob object on the pager thread to satisfy this request, it actually MUST be done
// with only the shared lock or the reentrance on the pager thread will deadlock us.
{
fs::SharedLock lock(mutex_);
// There is a race condition if somehow this blob was unloaded in between the above exclusive
// lock and the shared lock in this block. Currently this is not possible because there is only
// one thread processing fidl messages and paging events on the pager threads can't unload the
// blob.
//
// But in the future certain changes might make this theoretically possible (though very
// difficult to imagine in practice). If this were to happen, we would prefer to err on the side
// of reporting a blob valid rather than mistakenly reporting errors that might cause a valid
// blob to be deleted.
if (state_ != BlobState::kReadable)
return ZX_OK;
if (blob_size_ == 0) {
// It's the null blob, so just verify.
return VerifyNullBlob();
}
return paged_vmo().op_range(ZX_VMO_OP_COMMIT, 0, blob_size_, nullptr, 0);
}
}
void Blob::OnNoPagedVmoClones() {
// Override the default behavior of PagedVnode to avoid clearing the paged_vmo. We keep this
// alive for caching purposes as long as this object is alive, and this object's lifetime is
// managed by the BlobCache.
if (!HasReferences()) {
// Mark the name to help identify that the VMO is unused.
SetPagedVmoName(false);
// Hint that the VMO's pages are no longer needed, and can be evicted under memory pressure. If
// a page is accessed again, it will lose the hint.
zx_status_t status = paged_vmo().op_range(ZX_VMO_OP_DONT_NEED, 0, blob_size_, nullptr, 0);
if (status != ZX_OK) {
FX_LOGS(WARNING) << "Hinting DONT_NEED on blob " << digest()
<< " failed: " << zx_status_get_string(status);
}
// This might have been the last reference to a deleted blob, so try purging it.
if (zx_status_t status = TryPurge(); status != ZX_OK) {
FX_LOGS(WARNING) << "Purging blob " << digest()
<< " failed: " << zx_status_get_string(status);
}
}
}
BlobCache& Blob::GetCache() { return blobfs_->GetCache(); }
bool Blob::ShouldCache() const {
std::lock_guard lock(mutex_);
return state() == BlobState::kReadable;
}
void Blob::ActivateLowMemory() {
// The reference returned by FreePagedVmo() needs to be released outside of the lock since it
// could be keeping this class in scope.
fbl::RefPtr<fs::Vnode> pager_reference;
{
std::lock_guard lock(mutex_);
// We shouldn't be putting the blob into a low-memory state while it is still mapped.
//
// It is common for tests to trigger this assert during Blobfs tear-down. This will happen when
// the "no clones" message was not delivered before destruction. This can happen if the test
// code kept a vmo reference, but can also happen when there are no clones because the delivery
// of this message depends on running the message loop which is easy to skip in a test.
//
// Often, the solution is to call RunUntilIdle() on the loop after the test code has cleaned up
// its mappings but before deleting Blobfs. This will allow the pending notifications to be
// delivered.
ZX_ASSERT_MSG(!has_clones(), "Cannot put blob in low memory state as it is mapped via clones.");
pager_reference = FreePagedVmo();
loader_info_ = LoaderInfo(); // Release the verifiers and associated Merkle data.
}
// When the pager_reference goes out of scope here, it could delete |this|.
}
Blob::~Blob() { ActivateLowMemory(); }
fs::VnodeProtocolSet Blob::GetProtocols() const { return fs::VnodeProtocol::kFile; }
bool Blob::ValidateRights(fs::Rights rights) const {
// To acquire write access to a blob, it must be empty.
//
// TODO(fxbug.dev/67659) If we run FIDL on multiple threads (we currently don't) there is a race
// condition here where another thread could start writing at the same time. Decide whether we
// support FIDL from multiple threads and if so, whether this condition is important.
std::lock_guard lock(mutex_);
return !rights.write || state() == BlobState::kEmpty;
}
zx_status_t Blob::GetNodeInfoForProtocol([[maybe_unused]] fs::VnodeProtocol protocol,
[[maybe_unused]] fs::Rights rights,
fs::VnodeRepresentation* info) {
std::lock_guard lock(mutex_);
zx::event observer;
if (zx_status_t status = GetReadableEvent(&observer); status != ZX_OK) {
return status;
}
*info = fs::VnodeRepresentation::File{.observer = std::move(observer)};
return ZX_OK;
}
zx_status_t Blob::Read(void* data, size_t len, size_t off, size_t* out_actual) {
TRACE_DURATION("blobfs", "Blob::Read", "len", len, "off", off);
return blobfs_->node_operations().read.Track(
[&] { return ReadInternal(data, len, off, out_actual); });
}
zx_status_t Blob::Write(const void* data, size_t len, size_t offset, size_t* out_actual) {
TRACE_DURATION("blobfs", "Blob::Write", "len", len, "off", offset);
return blobfs_->node_operations().write.Track([&] {
std::lock_guard lock(mutex_);
*out_actual = 0;
if (state() == BlobState::kDataWrite && offset != write_info_->bytes_written) {
FX_LOGS(ERROR) << "only append is currently supported (requested_offset: " << offset
<< ", expected: " << write_info_->bytes_written << ")";
return ZX_ERR_NOT_SUPPORTED;
}
zx_status_t status = WriteInternal(data, len, out_actual);
// Fuse any write errors for the next call as higher layers (e.g. zxio) may treat write errors
// as short writes.
if (status != ZX_OK) {
LatchWriteError(status);
}
return status;
});
}
zx_status_t Blob::Append(const void* data, size_t len, size_t* out_end, size_t* out_actual) {
TRACE_DURATION("blobfs", "Blob::Append", "len", len);
return blobfs_->node_operations().append.Track([&] {
std::lock_guard lock(mutex_);
*out_actual = 0;
zx_status_t status = WriteInternal(data, len, out_actual);
// Fuse any write errors for the next call as higher layers (e.g. zxio) may treat write errors
// as short writes.
if (status != ZX_OK) {
LatchWriteError(status);
} else if (state() == BlobState::kDataWrite) {
ZX_DEBUG_ASSERT(write_info_ != nullptr);
*out_end = write_info_->bytes_written;
} else {
*out_end = blob_size_;
}
return status;
});
}
zx_status_t Blob::GetAttributes(fs::VnodeAttributes* a) {
TRACE_DURATION("blobfs", "Blob::GetAttributes");
return blobfs_->node_operations().get_attr.Track([&] {
// FileSize() expects to be called outside the lock.
auto content_size = FileSize();
std::lock_guard lock(mutex_);
*a = fs::VnodeAttributes();
a->mode = V_TYPE_FILE | V_IRUSR | V_IXUSR;
a->inode = map_index_;
a->content_size = content_size;
a->storage_size = block_count_ * GetBlockSize();
a->link_count = 1;
a->creation_time = 0;
a->modification_time = 0;
return ZX_OK;
});
}
zx_status_t Blob::Truncate(size_t len) {
TRACE_DURATION("blobfs", "Blob::Truncate", "len", len);
return blobfs_->node_operations().truncate.Track([&] {
return PrepareWrite(len, blobfs_->ShouldCompress() && len > kCompressionSizeThresholdBytes);
});
}
#ifdef __Fuchsia__
zx::status<std::string> Blob::GetDevicePath() const { return blobfs_->Device()->GetDevicePath(); }
zx_status_t Blob::GetVmo(fuchsia_io::wire::VmoFlags flags, zx::vmo* out_vmo) {
static_assert(sizeof flags == sizeof(uint32_t),
"Underlying type of |flags| has changed, update conversion below.");
TRACE_DURATION("blobfs", "Blob::GetVmo", "flags", static_cast<uint32_t>(flags));
std::lock_guard lock(mutex_);
// Only expect this to be called when the blob is open. The fidl API guarantees this but tests
// can easily forget to open the blob before getting the VMO.
ZX_DEBUG_ASSERT(open_count() > 0);
if (flags & fuchsia_io::wire::VmoFlags::kWrite) {
return ZX_ERR_NOT_SUPPORTED;
} else if (flags & fuchsia_io::wire::VmoFlags::kSharedBuffer) {
return ZX_ERR_NOT_SUPPORTED;
}
// Let clients map and set the names of their VMOs.
zx_rights_t rights = ZX_RIGHTS_BASIC | ZX_RIGHT_MAP | ZX_RIGHTS_PROPERTY;
// We can ignore VmoFlags::PRIVATE_CLONE since private / shared access to the underlying VMO can
// both be satisfied with a clone due to the immutability of blobfs blobs.
rights |= (flags & fuchsia_io::wire::VmoFlags::kRead) ? ZX_RIGHT_READ : 0;
rights |= (flags & fuchsia_io::wire::VmoFlags::kExecute) ? ZX_RIGHT_EXECUTE : 0;
return CloneDataVmo(rights, out_vmo);
}
#endif // defined(__Fuchsia__)
void Blob::Sync(SyncCallback on_complete) {
// This function will issue its callbacks on either the current thread or the journal thread. The
// vnode interface says this is OK.
TRACE_DURATION("blobfs", "Blob::Sync");
auto event = blobfs_->node_operations().sync.NewEvent();
// Wraps `on_complete` to record the result into `event` as well.
SyncCallback completion_callback = [on_complete = std::move(on_complete),
event = std::move(event)](zx_status_t status) mutable {
on_complete(status);
event.SetStatus(status);
};
SyncingState state;
{
std::scoped_lock guard(mutex_);
state = syncing_state_;
}
switch (state) {
case SyncingState::kDataIncomplete: {
// It doesn't make sense to sync a partial blob since it can't have its proper
// content-addressed name without all the data.
completion_callback(ZX_ERR_BAD_STATE);
break;
}
case SyncingState::kSyncing: {
// The blob data is complete. When this happens the Blob object will automatically write its
// metadata, but it may not get flushed for some time. This call both encourages the sync to
// happen "soon" and provides a way to get notified when it does.
auto trace_id = TRACE_NONCE();
TRACE_FLOW_BEGIN("blobfs", "Blob.sync", trace_id);
blobfs_->Sync(std::move(completion_callback));
break;
}
case SyncingState::kDone: {
// All metadata has already been synced. Calling Sync() is a no-op.
completion_callback(ZX_OK);
break;
}
}
}
// This function will get called on an arbitrary pager worker thread.
void Blob::VmoRead(uint64_t offset, uint64_t length) {
TRACE_DURATION("blobfs", "Blob::VmoRead", "offset", offset, "length", length);
// It's important that this function use only a shared read lock. This is for performance (to
// allow multiple page requests to be run in parallel) and to prevent deadlock with the non-paged
// Read() path. The non-paged path is implemented by reading from the vmo, which will recursively
// call into this code, so taking an exclusive lock would deadlock.
fs::SharedLock lock(mutex_);
if (!paged_vmo()) {
// Races with calling FreePagedVmo() on another thread can result in stale read requests. Ignore
// them if the VMO is gone.
return;
}
ZX_DEBUG_ASSERT(IsDataLoaded());
std::optional vfs_opt = vfs();
ZX_ASSERT(vfs_opt.has_value());
fs::PagedVfs& vfs = vfs_opt.value().get();
if (is_corrupt_) {
FX_LOGS(ERROR) << "Blobfs failing page request because blob was previously found corrupt.";
if (auto error_result = vfs.ReportPagerError(paged_vmo(), offset, length, ZX_ERR_BAD_STATE);
error_result.is_error()) {
FX_LOGS(ERROR) << "Failed to report pager error to kernel: " << error_result.status_string();
}
return;
}
auto page_supplier = PageLoader::PageSupplier(
[&vfs, &dest_vmo = paged_vmo()](uint64_t offset, uint64_t length, const zx::vmo& aux_vmo,
uint64_t aux_offset) {
return vfs.SupplyPages(dest_vmo, offset, length, aux_vmo, aux_offset);
});
PagerErrorStatus pager_error_status =
blobfs_->page_loader().TransferPages(page_supplier, offset, length, loader_info_);
if (pager_error_status != PagerErrorStatus::kOK) {
FX_LOGS(ERROR) << "Pager failed to transfer pages to the blob, error: "
<< zx_status_get_string(static_cast<zx_status_t>(pager_error_status));
if (auto error_result = vfs.ReportPagerError(paged_vmo(), offset, length,
static_cast<zx_status_t>(pager_error_status));
error_result.is_error()) {
FX_LOGS(ERROR) << "Failed to report pager error to kernel: " << error_result.status_string();
}
// We've signaled a failure and unblocked outstanding page requests for this range. If the pager
// error was a verification error, fail future requests as well - we should not service further
// page requests on a corrupt blob.
//
// Note that we cannot simply detach the VMO from the pager here. There might be outstanding
// page requests which have been queued but are yet to be serviced. These need to be handled
// correctly - if the VMO is detached, there will be no way for us to communicate failure to
// the kernel, since zx_pager_op_range() requires a valid pager VMO handle. Without being able
// to make a call to zx_pager_op_range() to indicate a failed page request, the faulting thread
// would hang indefinitely.
if (pager_error_status == PagerErrorStatus::kErrDataIntegrity)
is_corrupt_ = true;
}
}
bool Blob::HasReferences() const { return open_count() > 0 || has_clones(); }
void Blob::CompleteSync() {
// Called on the journal thread when the syncing is complete.
{
std::scoped_lock guard(mutex_);
syncing_state_ = SyncingState::kDone;
}
}
void Blob::WillTeardownFilesystem() {
// Be careful to release the pager reference outside the lock.
fbl::RefPtr<fs::Vnode> pager_reference;
{
std::lock_guard lock(mutex_);
pager_reference = FreePagedVmo();
}
// When pager_reference goes out of scope here, it could cause |this| to be deleted.
}
zx_status_t Blob::OpenNode([[maybe_unused]] ValidatedOptions options,
fbl::RefPtr<Vnode>* out_redirect) {
std::lock_guard lock(mutex_);
if (IsDataLoaded() && open_count() == 1) {
// Just went from an unopened node that already had data to an opened node (the open_count()
// reflects the new state).
//
// This normally means that the node was closed but cached, and we're not re-opening it. This
// means we have to mark things as being open and register for the corresponding notifications.
//
// It's also possible to get in this state if there was a memory mapping for a file that
// was otherwise closed. In that case we don't need to do anything but the operations here
// can be performed multiple times with no bad effects. Avoiding these calls in the "mapped but
// opened" state would mean checking for no mappings which bundles this code more tightly to
// the HasReferences() implementation that is better avoided.
SetPagedVmoName(true);
}
return ZX_OK;
}
zx_status_t Blob::CloseNode() {
TRACE_DURATION("blobfs", "Blob::CloseNode");
return blobfs_->node_operations().close.Track([&] {
std::lock_guard lock(mutex_);
if (paged_vmo() && !HasReferences()) {
// Mark the name to help identify that the VMO is unused.
SetPagedVmoName(false);
// Hint that the VMO's pages are no longer needed, and can be evicted under memory pressure.
// If a page is accessed again, it will lose the hint.
zx_status_t status = paged_vmo().op_range(ZX_VMO_OP_DONT_NEED, 0, blob_size_, nullptr, 0);
if (status != ZX_OK) {
FX_LOGS(WARNING) << "Hinting DONT_NEED on blob " << digest()
<< " failed: " << zx_status_get_string(status);
}
}
// Attempt purge in case blob was unlinked prior to close.
return TryPurge();
});
}
zx_status_t Blob::TryPurge() {
if (Purgeable()) {
return Purge();
}
return ZX_OK;
}
zx_status_t Blob::Purge() {
ZX_DEBUG_ASSERT(Purgeable());
if (state_ == BlobState::kReadable) {
// A readable blob should only be purged if it has been unlinked.
ZX_ASSERT_MSG(deletable_, "Should not purge blob which is not unlinked.");
BlobTransaction transaction;
if (zx_status_t status = blobfs_->FreeInode(map_index_, transaction); status != ZX_OK)
return status;
transaction.Commit(*blobfs_->GetJournal());
}
// If the blob is in the error state, it should have already been evicted from
// the cache (see LatchWriteError).
if (state_ != BlobState::kError) {
if (zx_status_t status = GetCache().Evict(fbl::RefPtr(this)); status != ZX_OK)
return status;
}
set_state(BlobState::kPurged);
return ZX_OK;
}
uint64_t Blob::GetBlockSize() const { return blobfs_->Info().block_size; }
void Blob::SetPagedVmoName(bool active) {
VmoNameBuffer name =
active ? FormatBlobDataVmoName(digest()) : FormatInactiveBlobDataVmoName(digest());
// Ignore failures, the name is for informational purposes only.
paged_vmo().set_property(ZX_PROP_NAME, name.data(), name.size());
}
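// Computes the on-disk layout (data and Merkle tree placement) from the uncompressed blob size
// and the size of the data that will actually be stored.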
zx_status_t Blob::InitializeBlobLayout() {
ZX_DEBUG_ASSERT(blob_size_ > 0);
ZX_DEBUG_ASSERT(write_info_->data_size > 0);
zx::status blob_layout_or = BlobLayout::CreateFromSizes(
GetBlobLayoutFormat(blobfs_->Info()), blob_size_, write_info_->data_size, kBlobfsBlockSize);
if (blob_layout_or.is_error()) {
FX_LOGS(ERROR) << "Failed to create blob layout: " << blob_layout_or.status_string();
return blob_layout_or.status_value();
}
write_info_->blob_layout = std::move(blob_layout_or.value());
return ZX_OK;
}
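// Sizes and allocates the Merkle tree buffer for |blob_size_| bytes of content and points the
// tree creator at it. The buffer reserves an extra block of space for the zero padding that the
// compact layout may require.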
zx_status_t Blob::InitializeMerkleBuffer() {
ZX_DEBUG_ASSERT(blob_size_ > 0);
ZX_DEBUG_ASSERT(write_info_->merkle_tree_buffer == nullptr);
write_info_->merkle_tree_creator.SetUseCompactFormat(
ShouldUseCompactMerkleTreeFormat(GetBlobLayoutFormat(blobfs_->Info())));
zx_status_t status = write_info_->merkle_tree_creator.SetDataLength(blob_size_);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to set Merkle tree data length to " << blob_size_
<< " bytes: " << zx_status_get_string(status);
return status;
}
const size_t tree_len = write_info_->merkle_tree_creator.GetTreeLength();
// Allow for zero padding before and after.
write_info_->merkle_tree_buffer = std::make_unique<uint8_t[]>(tree_len + GetBlockSize());
status =
write_info_->merkle_tree_creator.SetTree(write_info_->merkle_tree(GetBlockSize()), tree_len,
&write_info_->digest, sizeof write_info_->digest);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to set Merkle tree data length to " << blob_size_
<< " bytes: " << zx_status_get_string(status);
return status;
}
return ZX_OK;
}
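// Called during streaming writes once |buff_pos| bytes have been buffered. Lazily allocates space
// on the first call, then flushes block-aligned chunks of at least kCacheFlushThreshold blocks to
// disk and decommits the corresponding pages from the write buffer. For the deprecated padded
// layout, any trailing data is flushed once the full payload has been buffered.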
zx_status_t Blob::StreamBufferedData(uint64_t buff_pos) {
// Ensure we allocated space for the blob before writing it to disk.
if (write_info_->blob_layout == nullptr) {
zx_status_t status = InitializeBlobLayout();
if (status != ZX_OK) {
return status;
}
status = SpaceAllocate();
if (status != ZX_OK) {
return status;
}
}
ZX_DEBUG_ASSERT(buff_pos >= write_info_->bytes_persisted);
ZX_DEBUG_ASSERT(buff_pos <= write_info_->data_size);
const uint64_t buffered_bytes = buff_pos - write_info_->bytes_persisted;
// Write as many block-aligned bytes from the buffer to disk as we can.
if (buffered_bytes >= (kCacheFlushThreshold * GetBlockSize())) {
const uint64_t write_amount = fbl::round_down(buffered_bytes, GetBlockSize());
ZX_DEBUG_ASSERT(write_info_->bytes_persisted % GetBlockSize() == 0);
const uint64_t start_block =
write_info_->blob_layout->DataBlockOffset() + write_info_->bytes_persisted / GetBlockSize();
const uint8_t* buffer = static_cast<const uint8_t*>(write_info_->buffer.start());
SimpleBlobDataProducer data({buffer + write_info_->bytes_persisted, write_amount});
zx_status_t status =
WriteData(write_amount / GetBlockSize(), start_block, data, *write_info_->streamer);
if (status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to stream blob data to disk: " << zx_status_get_string(status);
return status;
}
// Ensure data is copied into writeback cache so we can decommit those pages from the buffer.
write_info_->streamer->IssueOperations();
write_info_->bytes_persisted += write_amount;
// Decommit now unused pages from the buffer.
const uint64_t page_aligned_offset =
fbl::round_down(write_info_->bytes_persisted, kSystemPageSize);
status = zx_vmo_op_range(write_info_->buffer.vmo().get(), ZX_VMO_OP_DECOMMIT, 0,
page_aligned_offset, nullptr, 0);
if (status != ZX_OK) {
return status;
}
}
// To simplify the Commit logic when using the deprecated format (Merkle tree at beginning), if we
// received all data for the blob, enqueue the remaining data so we only have the Merkle tree left
// to write to disk. This ensures Commit only has to deal with contiguous chunks of data.
if (buff_pos == write_info_->data_size &&
write_info_->blob_layout->Format() == BlobLayoutFormat::kDeprecatedPaddedMerkleTreeAtStart) {
if (zx_status_t status = WriteRemainingDataForDeprecatedFormat(); status != ZX_OK) {
return status;
}
ZX_DEBUG_ASSERT(write_info_->bytes_persisted == write_info_->data_size);
}
return ZX_OK;
}
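// For the deprecated padded layout (Merkle tree at the start), writes whatever data remains in
// the buffer so that only the Merkle tree is left for Commit() to persist, then rewinds the block
// iterator since the tree lives at the beginning of the blob.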
zx_status_t Blob::WriteRemainingDataForDeprecatedFormat() {
ZX_DEBUG_ASSERT(write_info_->blob_layout->Format() ==
BlobLayoutFormat::kDeprecatedPaddedMerkleTreeAtStart);
if (write_info_->bytes_persisted < write_info_->data_size) {
// All data persisted so far should have been block aligned.
ZX_DEBUG_ASSERT(write_info_->bytes_persisted % GetBlockSize() == 0);
const size_t remaining_bytes = write_info_->data_size - write_info_->bytes_persisted;
const size_t remaining_bytes_aligned = fbl::round_up(remaining_bytes, GetBlockSize());
const uint64_t block_count = remaining_bytes_aligned / GetBlockSize();
const uint64_t block_offset = write_info_->blob_layout->DataBlockOffset() +
(write_info_->bytes_persisted / GetBlockSize());
const uint8_t* buff = static_cast<const uint8_t*>(write_info_->buffer.start());
// The data buffer is already padded to ensure it's a multiple of the block size.
SimpleBlobDataProducer data({buff + write_info_->bytes_persisted, remaining_bytes_aligned});
if (zx_status_t status = WriteData(block_count, block_offset, data, *write_info_->streamer);
status != ZX_OK) {
FX_LOGS(ERROR) << "Failed to write final block to disk: " << zx_status_get_string(status);
return status;
}
write_info_->bytes_persisted += remaining_bytes;
}
ZX_DEBUG_ASSERT(write_info_->bytes_persisted == write_info_->data_size);
// We've now persisted all of the blob's data to disk. The only remaining thing to write out is
// the Merkle tree, which is at the first block, so we need to reset the block iterator before
// writing any more data to disk.
write_info_->block_iter =
BlockIterator(std::make_unique<VectorExtentIterator>(write_info_->extents));
return ZX_OK;
}
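// Attempts to parse the seek table from the first |buff_pos| buffered bytes of a pre-compressed
// blob. Returns ok(false) if more data is needed; otherwise validates chunk sizes, derives the
// uncompressed blob size, and creates the streaming decompressor that feeds decompressed data
// into the Merkle tree creator.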
zx::status<bool> Blob::InitializeDecompressor(size_t buff_pos) {
// Try to load the seek table and initialize the decompressor.
chunked_compression::HeaderReader reader;
// Parse against the known compressed size of the incoming data (|data_size|). The maximum
// decompressed chunk size is validated below to prevent any potential memory exhaustion.
const chunked_compression::Status status = reader.Parse(
write_info_->buffer.start(), buff_pos, write_info_->data_size, &write_info_->seek_table);
// If we don't have enough data to read the seek table yet, wait until we have more data.
if (status == chunked_compression::kStatusErrBufferTooSmall) {
return zx::ok(false);
}
if (status != chunked_compression::kStatusOk) {
return zx::error(chunked_compression::ToZxStatus(status));
}
if (write_info_->seek_table.Entries().empty()) {
FX_LOGS(ERROR) << "Decoded seek table has no entries!";
return zx::error(ZX_ERR_OUT_OF_RANGE);
}
// The StreamingChunkedDecompressor decommits chunks as they are decompressed, so we just need to
// ensure the maximum decompressed chunk size does not exceed our set upper bound.
using chunked_compression::SeekTableEntry;
const size_t largest_decompressed_size =
std::max_element(write_info_->seek_table.Entries().begin(),
write_info_->seek_table.Entries().end(),
[](const SeekTableEntry& a, const SeekTableEntry& b) {
return a.decompressed_size < b.decompressed_size;
})
->decompressed_size;
if (largest_decompressed_size > kMaxDecompressionMemoryUsage) {
FX_LOGS(ERROR) << "Largest seek table entry (decompressed size = " << largest_decompressed_size
<< ") exceeds set memory consumption limit (" << kMaxDecompressionMemoryUsage
<< ")!";
return zx::error(ZX_ERR_NO_MEMORY);
}
// Set the blob size from the decompressed length, then initialize the Merkle tree buffer and
// streaming decompressor.
blob_size_ = write_info_->seek_table.DecompressedSize();
if (zx_status_t status = InitializeMerkleBuffer(); status != ZX_OK) {
return zx::error(status);
}
zx::status decompressor_or = StreamingChunkedDecompressor::Create(
*blobfs_->decompression_connector(), write_info_->seek_table,
[&merkle_tree_creator =
write_info_->merkle_tree_creator](cpp20::span<const uint8_t> data) -> zx::status<> {
if (zx_status_t status = merkle_tree_creator.Append(data.data(), data.size());
status != ZX_OK) {
FX_LOGS(ERROR) << "MerkleTreeCreator::Append failed: " << zx_status_get_string(status);
return zx::error(status);
}
return zx::ok();
});
if (decompressor_or.is_error()) {
return zx::error(decompressor_or.error_value());
}
write_info_->streaming_decompressor = std::move(decompressor_or.value());
return zx::ok(true);
}
} // namespace blobfs