[blobfs][vnode] Move Vnode code to vnode.cpp
The Vnode implementation lived in the Blobfs VFS
source file (blobfs.cpp); this was discovered while
looking for the constructor. Move it into the
existing vnode.cpp.
TEST=blobfs-integration-tests
Change-Id: I06f0ebf8f47d508b4da3b055c5368bcb3413ca25
diff --git a/system/ulib/blobfs/blobfs.cpp b/system/ulib/blobfs/blobfs.cpp
index 8f47405..fca567e 100644
--- a/system/ulib/blobfs/blobfs.cpp
+++ b/system/ulib/blobfs/blobfs.cpp
@@ -14,11 +14,6 @@
#include <blobfs/blobfs.h>
#include <blobfs/extent-reserver.h>
-#include <blobfs/iterator/allocated-extent-iterator.h>
-#include <blobfs/iterator/block-iterator.h>
-#include <blobfs/iterator/extent-iterator.h>
-#include <blobfs/iterator/node-populator.h>
-#include <blobfs/iterator/vector-extent-iterator.h>
#include <blobfs/lz4.h>
#include <blobfs/node-reserver.h>
#include <digest/digest.h>
@@ -117,758 +112,8 @@
return ZX_OK;
}
-// A wrapper around "Enqueue" for content which risks being larger
-// than the writeback buffer.
-//
-// For content which is smaller than 3/4 the size of the writeback buffer: the
-// content is enqueued to |work| without flushing.
-//
-// For content which is larger than 3/4 the size of the writeback buffer: flush
-// the data by enqueueing it to the writeback thread in chunks until the
-// remainder is small enough to comfortably fit within the writeback buffer.
-zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work, Blobfs* blobfs, VnodeBlob* vn,
- const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks) {
- const size_t kMaxChunkBlocks = (3 * blobfs->WritebackCapacity()) / 4;
- uint64_t delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
- while (nblocks > 0) {
- (*work)->Enqueue(vmo, relative_block, absolute_block, delta_blocks);
- relative_block += delta_blocks;
- absolute_block += delta_blocks;
- nblocks -= delta_blocks;
- delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
- if (nblocks) {
- fbl::unique_ptr<WritebackWork> tmp;
- zx_status_t status = blobfs->CreateWork(&tmp, vn);
- if (status != ZX_OK) {
- return status;
- }
- if ((status = blobfs->EnqueueWork(std::move(*work), EnqueueType::kData)) != ZX_OK) {
- return status;
- }
- *work = std::move(tmp);
- }
- }
- return ZX_OK;
-}
-
} // namespace
-zx_status_t VnodeBlob::Verify() const {
- TRACE_DURATION("blobfs", "Blobfs::Verify");
- fs::Ticker ticker(blobfs_->CollectingMetrics());
-
- const void* data = inode_.blob_size ? GetData() : nullptr;
- const void* tree = inode_.blob_size ? GetMerkle() : nullptr;
- const uint64_t data_size = inode_.blob_size;
- const uint64_t merkle_size = MerkleTree::GetTreeLength(data_size);
- // TODO(smklein): We could lazily verify more of the VMO if
- // we could fault in pages on-demand.
- //
- // For now, we aggressively verify the entire VMO up front.
- Digest digest;
- digest = reinterpret_cast<const uint8_t*>(&digest_[0]);
- zx_status_t status = MerkleTree::Verify(data, data_size, tree,
- merkle_size, 0, data_size, digest);
- blobfs_->UpdateMerkleVerifyMetrics(data_size, merkle_size, ticker.End());
-
- if (status != ZX_OK) {
- char name[Digest::kLength * 2 + 1];
- ZX_ASSERT(digest.ToString(name, sizeof(name)) == ZX_OK);
- FS_TRACE_ERROR("blobfs verify(%s) Failure: %s\n", name, zx_status_get_string(status));
- }
-
- return status;
-}
-
-zx_status_t VnodeBlob::InitVmos() {
- TRACE_DURATION("blobfs", "Blobfs::InitVmos");
-
- if (mapping_.vmo()) {
- return ZX_OK;
- }
-
- uint64_t data_blocks = BlobDataBlocks(inode_);
- uint64_t merkle_blocks = MerkleTreeBlocks(inode_);
- uint64_t num_blocks = data_blocks + merkle_blocks;
-
- if (num_blocks == 0) {
- // No need to initialize VMO for null blob.
- return ZX_OK;
- }
-
- // Reverts blob back to uninitialized state on error.
- auto cleanup = fbl::MakeAutoCall([this]() { BlobCloseHandles(); });
-
- size_t vmo_size;
- if (mul_overflow(num_blocks, kBlobfsBlockSize, &vmo_size)) {
- FS_TRACE_ERROR("Multiplication overflow");
- return ZX_ERR_OUT_OF_RANGE;
- }
-
- zx_status_t status = mapping_.CreateAndMap(vmo_size, "blob");
- if (status != ZX_OK) {
- FS_TRACE_ERROR("Failed to initialize vmo; error: %d\n", status);
- return status;
- }
- if ((status = blobfs_->AttachVmo(mapping_.vmo(), &vmoid_)) != ZX_OK) {
- FS_TRACE_ERROR("Failed to attach VMO to block device; error: %d\n", status);
- return status;
- }
-
- if ((inode_.header.flags & kBlobFlagLZ4Compressed) != 0) {
- if ((status = InitCompressed()) != ZX_OK) {
- return status;
- }
- } else {
- if ((status = InitUncompressed()) != ZX_OK) {
- return status;
- }
- }
- if ((status = Verify()) != ZX_OK) {
- return status;
- }
-
- cleanup.cancel();
- return ZX_OK;
-}
-
-zx_status_t VnodeBlob::InitCompressed() {
- TRACE_DURATION("blobfs", "Blobfs::InitCompressed", "size", inode_.blob_size,
- "blocks", inode_.block_count);
- fs::Ticker ticker(blobfs_->CollectingMetrics());
- fs::ReadTxn txn(blobfs_);
- uint32_t merkle_blocks = MerkleTreeBlocks(inode_);
-
- fzl::OwnedVmoMapper compressed_mapper;
- uint32_t compressed_blocks = (inode_.block_count - merkle_blocks);
- size_t compressed_size;
- if (mul_overflow(compressed_blocks, kBlobfsBlockSize, &compressed_size)) {
- FS_TRACE_ERROR("Multiplication overflow\n");
- return ZX_ERR_OUT_OF_RANGE;
- }
- zx_status_t status = compressed_mapper.CreateAndMap(compressed_size, "compressed-blob");
- if (status != ZX_OK) {
- FS_TRACE_ERROR("Failed to initialized compressed vmo; error: %d\n", status);
- return status;
- }
- vmoid_t compressed_vmoid;
- status = blobfs_->AttachVmo(compressed_mapper.vmo(), &compressed_vmoid);
- if (status != ZX_OK) {
- FS_TRACE_ERROR("Failed to attach commpressed VMO to blkdev: %d\n", status);
- return status;
- }
-
- auto detach = fbl::MakeAutoCall([this, &compressed_vmoid]() {
- blobfs_->DetachVmo(compressed_vmoid);
- });
-
- const uint64_t kDataStart = DataStartBlock(blobfs_->info_);
- AllocatedExtentIterator extent_iter(blobfs_->allocator_.get(), GetMapIndex());
- BlockIterator block_iter(&extent_iter);
-
- // Read the uncompressed merkle tree into the start of the blob's VMO.
- status = StreamBlocks(&block_iter, merkle_blocks,
- [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
- txn.Enqueue(vmoid_, vmo_offset, dev_offset + kDataStart, length);
- return ZX_OK;
- });
- if (status != ZX_OK) {
- return status;
- }
-
- // Read the compressed blocks into the compressed VMO, accounting for the merkle blocks
- // which have already been seen.
- ZX_DEBUG_ASSERT(block_iter.BlockIndex() == merkle_blocks);
-
- status = StreamBlocks(&block_iter, compressed_blocks,
- [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
- txn.Enqueue(compressed_vmoid, vmo_offset - merkle_blocks, dev_offset + kDataStart,
- length);
- return ZX_OK;
- });
-
- if (status != ZX_OK) {
- return status;
- }
-
- if ((status = txn.Transact()) != ZX_OK) {
- FS_TRACE_ERROR("Failed to flush read transaction: %d\n", status);
- return status;
- }
-
- fs::Duration read_time = ticker.End();
- ticker.Reset();
-
- // Decompress the compressed data into the target buffer.
- size_t target_size = inode_.blob_size;
- status = Decompressor::Decompress(GetData(), &target_size,
- compressed_mapper.start(), &compressed_size);
- if (status != ZX_OK) {
- FS_TRACE_ERROR("Failed to decompress data: %d\n", status);
- return status;
- } else if (target_size != inode_.blob_size) {
- FS_TRACE_ERROR("Failed to fully decompress blob (%zu of %zu expected)\n",
- target_size, inode_.blob_size);
- return ZX_ERR_IO_DATA_INTEGRITY;
- }
-
- blobfs_->UpdateMerkleDecompressMetrics((compressed_blocks) * kBlobfsBlockSize,
- inode_.blob_size, read_time, ticker.End());
- return ZX_OK;
-}
-
-zx_status_t VnodeBlob::InitUncompressed() {
- TRACE_DURATION("blobfs", "Blobfs::InitUncompressed", "size", inode_.blob_size,
- "blocks", inode_.block_count);
- fs::Ticker ticker(blobfs_->CollectingMetrics());
- fs::ReadTxn txn(blobfs_);
- AllocatedExtentIterator extent_iter(blobfs_->allocator_.get(), GetMapIndex());
- BlockIterator block_iter(&extent_iter);
- // Read both the uncompressed merkle tree and data.
- const uint64_t blob_data_blocks = BlobDataBlocks(inode_);
- const uint64_t merkle_blocks = MerkleTreeBlocks(inode_);
- if (blob_data_blocks + merkle_blocks > std::numeric_limits<uint32_t>::max()) {
- return ZX_ERR_IO_DATA_INTEGRITY;
- }
- const uint32_t length = static_cast<uint32_t>(blob_data_blocks + merkle_blocks);
- const uint64_t data_start = DataStartBlock(blobfs_->info_);
- zx_status_t status = StreamBlocks(&block_iter, length,
- [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
- txn.Enqueue(vmoid_, vmo_offset, dev_offset + data_start, length);
- return ZX_OK;
- });
-
- if (status != ZX_OK) {
- return status;
- }
-
- status = txn.Transact();
- if (status != ZX_OK) {
- return status;
- }
- blobfs_->UpdateMerkleDiskReadMetrics(length * kBlobfsBlockSize, ticker.End());
- return status;
-}
-
-void VnodeBlob::PopulateInode(uint32_t node_index) {
- ZX_DEBUG_ASSERT(map_index_ == 0);
- SetState(kBlobStateReadable);
- map_index_ = node_index;
- Inode* inode = blobfs_->GetNode(node_index);
- inode_ = *inode;
-}
-
-uint64_t VnodeBlob::SizeData() const {
- if (GetState() == kBlobStateReadable) {
- return inode_.blob_size;
- }
- return 0;
-}
-
-VnodeBlob::VnodeBlob(Blobfs* bs, const Digest& digest)
- : blobfs_(bs),
- flags_(kBlobStateEmpty), syncing_(false), clone_watcher_(this) {
- digest.CopyTo(digest_, sizeof(digest_));
-}
-
-VnodeBlob::VnodeBlob(Blobfs* bs)
- : blobfs_(bs),
- flags_(kBlobStateEmpty | kBlobFlagDirectory),
- syncing_(false), clone_watcher_(this) {}
-
-void VnodeBlob::BlobCloseHandles() {
- mapping_.Reset();
- readable_event_.reset();
-}
-
-zx_status_t VnodeBlob::SpaceAllocate(uint64_t size_data) {
- TRACE_DURATION("blobfs", "Blobfs::SpaceAllocate", "size_data", size_data);
- fs::Ticker ticker(blobfs_->CollectingMetrics());
-
- if (GetState() != kBlobStateEmpty) {
- return ZX_ERR_BAD_STATE;
- }
-
- auto write_info = fbl::make_unique<WritebackInfo>();
-
- // Initialize the inode with known fields.
- memset(inode_.merkle_root_hash, 0, Digest::kLength);
- inode_.blob_size = size_data;
- inode_.block_count = MerkleTreeBlocks(inode_) + static_cast<uint32_t>(BlobDataBlocks(inode_));
-
- // Special case for the null blob: We skip the write phase.
- if (inode_.blob_size == 0) {
- zx_status_t status = blobfs_->ReserveNodes(1, &write_info->node_indices);
- if (status != ZX_OK) {
- return status;
- }
- map_index_ = write_info->node_indices[0].index();
- write_info_ = std::move(write_info);
-
- if ((status = Verify()) != ZX_OK) {
- return status;
- }
- SetState(kBlobStateDataWrite);
- if ((status = WriteMetadata()) != ZX_OK) {
- FS_TRACE_ERROR("Null blob metadata fail: %d\n", status);
- return status;
- }
- return ZX_OK;
- }
-
- fbl::Vector<ReservedExtent> extents;
- fbl::Vector<ReservedNode> nodes;
-
- // Reserve space for the blob.
- zx_status_t status = blobfs_->ReserveBlocks(inode_.block_count, &extents);
- if (status != ZX_OK) {
- return status;
- }
- if (extents.size() > kMaxBlobExtents) {
- FS_TRACE_ERROR("Error: Block reservation requires too many extents (%zu vs %zu max)\n",
- extents.size(), kMaxBlobExtents);
- return ZX_ERR_BAD_STATE;
- }
- const ExtentCountType extent_count = static_cast<ExtentCountType>(extents.size());
-
- // Reserve space for all the nodes necessary to contain this blob.
- size_t node_count = NodePopulator::NodeCountForExtents(extent_count);
- status = blobfs_->ReserveNodes(node_count, &nodes);
- if (status != ZX_OK) {
- return status;
- }
-
- if (inode_.blob_size >= kCompressionMinBytesSaved) {
- size_t max = write_info->compressor.BufferMax(inode_.blob_size);
- status = write_info->compressed_blob.CreateAndMap(max, "compressed-blob");
- if (status != ZX_OK) {
- return status;
- }
- status = write_info->compressor.Initialize(write_info->compressed_blob.start(),
- write_info->compressed_blob.size());
- if (status != ZX_OK) {
- FS_TRACE_ERROR("blobfs: Failed to initialize compressor: %d\n", status);
- return status;
- }
- }
-
- // Open VMOs, so we can begin writing after allocate succeeds.
- fzl::OwnedVmoMapper mapping;
- if ((status = mapping.CreateAndMap(inode_.block_count * kBlobfsBlockSize, "blob")) != ZX_OK) {
- return status;
- }
- if ((status = blobfs_->AttachVmo(mapping.vmo(), &vmoid_)) != ZX_OK) {
- return status;
- }
-
- map_index_ = nodes[0].index();
- mapping_ = std::move(mapping);
- write_info->extents = std::move(extents);
- write_info->node_indices = std::move(nodes);
- write_info_ = std::move(write_info);
-
- SetState(kBlobStateDataWrite);
- blobfs_->UpdateAllocationMetrics(size_data, ticker.End());
- return ZX_OK;
-}
-
-void* VnodeBlob::GetData() const {
- return fs::GetBlock(kBlobfsBlockSize, mapping_.start(), MerkleTreeBlocks(inode_));
-}
-
-void* VnodeBlob::GetMerkle() const {
- return mapping_.start();
-}
-
-zx_status_t VnodeBlob::WriteMetadata() {
- TRACE_DURATION("blobfs", "Blobfs::WriteMetadata");
- assert(GetState() == kBlobStateDataWrite);
-
- zx_status_t status;
- fbl::unique_ptr<WritebackWork> wb;
- if ((status = blobfs_->CreateWork(&wb, this)) != ZX_OK) {
- return status;
- }
-
- // Update the on-disk hash.
- memcpy(inode_.merkle_root_hash, &digest_[0], Digest::kLength);
-
- // All data has been written to the containing VMO.
- SetState(kBlobStateReadable);
- if (readable_event_.is_valid()) {
- status = readable_event_.signal(0u, ZX_USER_SIGNAL_0);
- if (status != ZX_OK) {
- SetState(kBlobStateError);
- return status;
- }
- }
-
- atomic_store(&syncing_, true);
-
- if (inode_.block_count) {
- // We utilize the NodePopulator class to take our reserved blocks and nodes and fill the
- // persistent map with an allocated inode / container.
-
- // If |on_node| is invoked on a node, it means that node was necessary to represent this
- // blob. Persist the node back to durable storge.
- auto on_node = [this, &wb](const ReservedNode& node) {
- blobfs_->PersistNode(wb.get(), node.index());
- };
-
- // If |on_extent| is invoked on an extent, it was necessary to represent this blob. Persist
- // the allocation of these blocks back to durable storage.
- //
- // Additionally, because of the compression feature of blobfs, it is possible we reserved
- // more extents than this blob ended up using. Decrement |remaining_blocks| to track if we
- // should exit early.
- size_t remaining_blocks = inode_.block_count;
- auto on_extent = [this, &wb, &remaining_blocks](ReservedExtent& extent) {
- ZX_DEBUG_ASSERT(remaining_blocks > 0);
- if (remaining_blocks >= extent.extent().Length()) {
- // Consume the entire extent.
- remaining_blocks -= extent.extent().Length();
- } else {
- // Consume only part of the extent; we're done iterating.
- extent.SplitAt(static_cast<BlockCountType>(remaining_blocks));
- remaining_blocks = 0;
- }
- blobfs_->PersistBlocks(wb.get(), extent);
- if (remaining_blocks == 0) {
- return NodePopulator::IterationCommand::Stop;
- }
- return NodePopulator::IterationCommand::Continue;
- };
-
- Inode* mapped_inode = blobfs_->GetNode(map_index_);
- *mapped_inode = inode_;
- NodePopulator populator(blobfs_->allocator_.get(), std::move(write_info_->extents),
- std::move(write_info_->node_indices));
- ZX_ASSERT(populator.Walk(on_node, on_extent) == ZX_OK);
-
- // Ensure all non-allocation flags are propagated to the inode.
- mapped_inode->header.flags |= (inode_.header.flags & kBlobFlagLZ4Compressed);
- } else {
- // Special case: Empty node.
- ZX_DEBUG_ASSERT(write_info_->node_indices.size() == 1);
- const ReservedNode& node = write_info_->node_indices[0];
- blobfs_->allocator_->MarkInodeAllocated(node);
- blobfs_->PersistNode(wb.get(), node.index());
- }
-
- wb->SetSyncComplete();
- if ((status = blobfs_->EnqueueWork(std::move(wb), EnqueueType::kJournal)) != ZX_OK) {
- return status;
- }
-
- // Drop the write info, since we no longer need it.
- write_info_.reset();
- return status;
-}
-
-zx_status_t VnodeBlob::WriteInternal(const void* data, size_t len, size_t* actual) {
- TRACE_DURATION("blobfs", "Blobfs::WriteInternal", "data", data, "len", len);
-
- *actual = 0;
- if (len == 0) {
- return ZX_OK;
- }
-
- const uint32_t merkle_blocks = MerkleTreeBlocks(inode_);
- const size_t merkle_bytes = merkle_blocks * kBlobfsBlockSize;
- if (GetState() == kBlobStateDataWrite) {
- size_t to_write = fbl::min(len, inode_.blob_size - write_info_->bytes_written);
- size_t offset = write_info_->bytes_written + merkle_bytes;
- zx_status_t status = mapping_.vmo().write(data, offset, to_write);
- if (status != ZX_OK) {
- return status;
- }
-
- *actual = to_write;
- write_info_->bytes_written += to_write;
-
- if (write_info_->compressor.Compressing()) {
- if ((status = write_info_->compressor.Update(data, to_write)) != ZX_OK) {
- return status;
- }
- ConsiderCompressionAbort();
- }
-
- // More data to write.
- if (write_info_->bytes_written < inode_.blob_size) {
- return ZX_OK;
- }
-
- // Only write data to disk once we've buffered the file into memory.
- // This gives us a chance to try compressing the blob before we write it back.
- fbl::unique_ptr<WritebackWork> wb;
- if ((status = blobfs_->CreateWork(&wb, this)) != ZX_OK) {
- return status;
- }
-
- // In case the operation fails, forcibly reset the WritebackWork
- // to avoid asserting that no write requests exist on destruction.
- auto set_error = fbl::MakeAutoCall([&]() {
- if (wb != nullptr) {
- wb->Reset(ZX_ERR_BAD_STATE);
- }
-
- SetState(kBlobStateError);
- });
-
- if (write_info_->compressor.Compressing()) {
- if ((status = write_info_->compressor.End()) != ZX_OK) {
- return status;
- }
- ConsiderCompressionAbort();
- }
-
- // Since the merkle tree and data are co-allocated, use a block iterator
- // to parse their data in order.
- VectorExtentIterator extent_iter(write_info_->extents);
- BlockIterator block_iter(&extent_iter);
-
- // TODO(smklein): As an optimization, use the CreateInit/Update/Final
- // methods to create the merkle tree as we write data, rather than
- // waiting until the data is fully downloaded to create the tree.
- size_t merkle_size = MerkleTree::GetTreeLength(inode_.blob_size);
- fs::Duration generation_time;
- if (merkle_size > 0) {
- Digest digest;
- void* merkle_data = GetMerkle();
- const void* blob_data = GetData();
- fs::Ticker ticker(blobfs_->CollectingMetrics()); // Tracking generation time.
-
- if ((status = MerkleTree::Create(blob_data, inode_.blob_size, merkle_data,
- merkle_size, &digest)) != ZX_OK) {
- return status;
- } else if (digest != digest_) {
- // Downloaded blob did not match provided digest.
- return ZX_ERR_IO_DATA_INTEGRITY;
- }
-
- status = StreamBlocks(&block_iter, merkle_blocks,
- [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
- return EnqueuePaginated(&wb, blobfs_, this, mapping_.vmo(), vmo_offset,
- dev_offset + blobfs_->DataStart(), length);
- });
-
- if (status != ZX_OK) {
- return status;
- }
- generation_time = ticker.End();
- } else if ((status = Verify()) != ZX_OK) {
- // Small blobs may not have associated Merkle Trees, and will
- // require validation, since we are not regenerating and checking
- // the digest.
- return status;
- }
-
- if (write_info_->compressor.Compressing()) {
- uint64_t blocks64 = fbl::round_up(write_info_->compressor.Size(),
- kBlobfsBlockSize) / kBlobfsBlockSize;
- ZX_DEBUG_ASSERT(blocks64 <= std::numeric_limits<uint32_t>::max());
- uint32_t blocks = static_cast<uint32_t>(blocks64);
- int64_t vmo_bias = -static_cast<int64_t>(merkle_blocks);
- ZX_DEBUG_ASSERT(block_iter.BlockIndex() + vmo_bias == 0);
- status = StreamBlocks(&block_iter, blocks,
- [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
- return EnqueuePaginated(&wb, blobfs_, this, write_info_->compressed_blob.vmo(),
- vmo_offset - merkle_blocks,
- dev_offset + blobfs_->DataStart(), length);
- });
-
- if (status != ZX_OK) {
- return status;
- }
- blocks += MerkleTreeBlocks(inode_);
- // By compressing, we used less blocks than we originally reserved.
- ZX_DEBUG_ASSERT(inode_.block_count > blocks);
-
- inode_.block_count = blocks;
- inode_.header.flags |= kBlobFlagLZ4Compressed;
- } else {
- uint64_t blocks64 =
- fbl::round_up(inode_.blob_size, kBlobfsBlockSize) / kBlobfsBlockSize;
- ZX_DEBUG_ASSERT(blocks64 <= std::numeric_limits<uint32_t>::max());
- uint32_t blocks = static_cast<uint32_t>(blocks64);
- status = StreamBlocks(&block_iter, blocks,
- [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
- return EnqueuePaginated(&wb, blobfs_, this, mapping_.vmo(), vmo_offset,
- dev_offset + blobfs_->DataStart(), length);
- });
- if (status != ZX_OK) {
- return status;
- }
- }
-
- // Enqueue the blob's final data work. Metadata must be enqueued separately.
- if ((status = blobfs_->EnqueueWork(std::move(wb), EnqueueType::kData)) != ZX_OK) {
- return status;
- }
-
- // No more data to write. Flush to disk.
- fs::Ticker ticker(blobfs_->CollectingMetrics()); // Tracking enqueue time.
- if ((status = WriteMetadata()) != ZX_OK) {
- return status;
- }
-
- blobfs_->UpdateClientWriteMetrics(to_write, merkle_size, ticker.End(),
- generation_time);
- set_error.cancel();
- return ZX_OK;
- }
-
- return ZX_ERR_BAD_STATE;
-}
-
-void VnodeBlob::ConsiderCompressionAbort() {
- ZX_DEBUG_ASSERT(write_info_->compressor.Compressing());
- if (inode_.blob_size - kCompressionMinBytesSaved < write_info_->compressor.Size()) {
- write_info_->compressor.Reset();
- write_info_->compressed_blob.Reset();
- }
-}
-
-zx_status_t VnodeBlob::GetReadableEvent(zx_handle_t* out) {
- TRACE_DURATION("blobfs", "Blobfs::GetReadableEvent");
- zx_status_t status;
- // This is the first 'wait until read event' request received.
- if (!readable_event_.is_valid()) {
- status = zx::event::create(0, &readable_event_);
- if (status != ZX_OK) {
- return status;
- } else if (GetState() == kBlobStateReadable) {
- readable_event_.signal(0u, ZX_USER_SIGNAL_0);
- }
- }
- status = zx_handle_duplicate(readable_event_.get(), ZX_RIGHTS_BASIC, out);
- if (status != ZX_OK) {
- return status;
- }
- return sizeof(zx_handle_t);
-}
-
-zx_status_t VnodeBlob::CloneVmo(zx_rights_t rights, zx_handle_t* out) {
- TRACE_DURATION("blobfs", "Blobfs::CloneVmo", "rights", rights, "out", out);
- if (GetState() != kBlobStateReadable) {
- return ZX_ERR_BAD_STATE;
- }
- if (inode_.blob_size == 0) {
- return ZX_ERR_BAD_STATE;
- }
- zx_status_t status = InitVmos();
- if (status != ZX_OK) {
- return status;
- }
-
- // TODO(smklein): Only clone / verify the part of the vmo that
- // was requested.
- const size_t merkle_bytes = MerkleTreeBlocks(inode_) * kBlobfsBlockSize;
- zx::vmo clone;
- if ((status = mapping_.vmo().clone(ZX_VMO_CLONE_COPY_ON_WRITE, merkle_bytes, inode_.blob_size,
- &clone)) != ZX_OK) {
- return status;
- }
-
- // TODO(mdempsky): Push elsewhere.
- if ((status = clone.replace_as_executable(zx::handle(), &clone)) != ZX_OK) {
- return status;
- }
-
- if ((status = clone.replace(rights, &clone)) != ZX_OK) {
- return status;
- }
- *out = clone.release();
-
- if (clone_watcher_.object() == ZX_HANDLE_INVALID) {
- clone_watcher_.set_object(mapping_.vmo().get());
- clone_watcher_.set_trigger(ZX_VMO_ZERO_CHILDREN);
-
- // Keep a reference to "this" alive, preventing the blob
- // from being closed while someone may still be using the
- // underlying memory.
- //
- // We'll release it when no client-held VMOs are in use.
- clone_ref_ = fbl::RefPtr<VnodeBlob>(this);
- clone_watcher_.Begin(blobfs_->dispatcher());
- }
-
- return ZX_OK;
-}
-
-void VnodeBlob::HandleNoClones(async_dispatcher_t* dispatcher, async::WaitBase* wait,
- zx_status_t status, const zx_packet_signal_t* signal) {
- ZX_DEBUG_ASSERT(status == ZX_OK);
- ZX_DEBUG_ASSERT((signal->observed & ZX_VMO_ZERO_CHILDREN) != 0);
- ZX_DEBUG_ASSERT(clone_watcher_.object() != ZX_HANDLE_INVALID);
- clone_watcher_.set_object(ZX_HANDLE_INVALID);
- clone_ref_ = nullptr;
-}
-
-zx_status_t VnodeBlob::ReadInternal(void* data, size_t len, size_t off, size_t* actual) {
- TRACE_DURATION("blobfs", "Blobfs::ReadInternal", "len", len, "off", off);
-
- if (GetState() != kBlobStateReadable) {
- return ZX_ERR_BAD_STATE;
- }
-
- if (inode_.blob_size == 0) {
- *actual = 0;
- return ZX_OK;
- }
-
- zx_status_t status = InitVmos();
- if (status != ZX_OK) {
- return status;
- }
-
- Digest d;
- d = reinterpret_cast<const uint8_t*>(&digest_[0]);
-
- if (off >= inode_.blob_size) {
- *actual = 0;
- return ZX_OK;
- }
- if (len > (inode_.blob_size - off)) {
- len = inode_.blob_size - off;
- }
-
- const size_t merkle_bytes = MerkleTreeBlocks(inode_) * kBlobfsBlockSize;
- status = mapping_.vmo().read(data, merkle_bytes + off, len);
- if (status == ZX_OK) {
- *actual = len;
- }
- return status;
-}
-
-zx_status_t VnodeBlob::QueueUnlink() {
- flags_ |= kBlobFlagDeletable;
- // Attempt to purge in case the blob has been unlinked with no open fds
- return TryPurge();
-}
-
-zx_status_t VnodeBlob::VerifyBlob(Blobfs* bs, uint32_t node_index) {
- Inode* inode = bs->GetNode(node_index);
- Digest digest(inode->merkle_root_hash);
- fbl::AllocChecker ac;
- fbl::RefPtr<VnodeBlob> vn =
- fbl::AdoptRef(new (&ac) VnodeBlob(bs, digest));
-
- if (!ac.check()) {
- return ZX_ERR_NO_MEMORY;
- }
-
- vn->PopulateInode(node_index);
-
- // Set blob state to "Purged" so we do not try to add it to the cached map on recycle.
- vn->SetState(kBlobStatePurged);
-
- // If we are unable to read in the blob from disk, this should also be a VerifyBlob error.
- // Since InitVmos calls Verify as its final step, we can just return its result here.
- return vn->InitVmos();
-}
-
zx_status_t Blobfs::VerifyBlob(uint32_t node_index) {
return VnodeBlob::VerifyBlob(this, node_index);
}
@@ -949,8 +194,8 @@
}
// Initialize the WritebackQueue.
- zx_status_t status = WritebackQueue::Create(this, WriteBufferSize() / kBlobfsBlockSize,
- &writeback_);
+ zx_status_t status =
+ WritebackQueue::Create(this, WriteBufferSize() / kBlobfsBlockSize, &writeback_);
if (status != ZX_OK) {
return status;
@@ -978,7 +223,9 @@
return ZX_OK;
}
-size_t Blobfs::WritebackCapacity() const { return writeback_->GetCapacity(); }
+size_t Blobfs::WritebackCapacity() const {
+ return writeback_->GetCapacity();
+}
void Blobfs::Shutdown(fs::Vfs::ShutdownCallback cb) {
TRACE_DURATION("blobfs", "Blobfs::Unmount");
@@ -1035,12 +282,10 @@
}
void Blobfs::WriteBitmap(WritebackWork* wb, uint64_t nblocks, uint64_t start_block) {
- TRACE_DURATION("blobfs", "Blobfs::WriteBitmap", "nblocks", nblocks, "start_block",
- start_block);
+ TRACE_DURATION("blobfs", "Blobfs::WriteBitmap", "nblocks", nblocks, "start_block", start_block);
uint64_t bbm_start_block = start_block / kBlobfsBlockBits;
- uint64_t bbm_end_block = fbl::round_up(start_block + nblocks,
- kBlobfsBlockBits) /
- kBlobfsBlockBits;
+ uint64_t bbm_end_block =
+ fbl::round_up(start_block + nblocks, kBlobfsBlockBits) / kBlobfsBlockBits;
// Write back the block allocation bitmap
wb->Enqueue(allocator_->GetBlockMapVmo(), bbm_start_block,
@@ -1170,9 +415,7 @@
zx_status_t Blobfs::LookupBlob(const Digest& digest, fbl::RefPtr<VnodeBlob>* out) {
TRACE_DURATION("blobfs", "Blobfs::LookupBlob");
const uint8_t* key = digest.AcquireBytes();
- auto release = fbl::MakeAutoCall([&digest]() {
- digest.ReleaseBytes();
- });
+ auto release = fbl::MakeAutoCall([&digest]() { digest.ReleaseBytes(); });
// Look up the blob in the maps.
fbl::RefPtr<VnodeBlob> vn;
@@ -1258,12 +501,14 @@
}
const uint32_t kInodesPerSlice = static_cast<uint32_t>(info_.slice_size / kBlobfsInodeSize);
- uint64_t inodes64 = (info_.ino_slices + static_cast<uint32_t>(request.length)) * kInodesPerSlice;
+ uint64_t inodes64 =
+ (info_.ino_slices + static_cast<uint32_t>(request.length)) * kInodesPerSlice;
ZX_DEBUG_ASSERT(inodes64 <= std::numeric_limits<uint32_t>::max());
uint32_t inodes = static_cast<uint32_t>(inodes64);
uint32_t inoblks = (inodes + kBlobfsInodesPerBlock - 1) / kBlobfsInodesPerBlock;
ZX_DEBUG_ASSERT(info_.inode_count <= std::numeric_limits<uint32_t>::max());
- uint32_t inoblks_old = (static_cast<uint32_t>(info_.inode_count) + kBlobfsInodesPerBlock - 1) / kBlobfsInodesPerBlock;
+ uint32_t inoblks_old = (static_cast<uint32_t>(info_.inode_count) + kBlobfsInodesPerBlock - 1) /
+ kBlobfsInodesPerBlock;
ZX_DEBUG_ASSERT(inoblks_old <= inoblks);
if (node_map->Grow(inoblks * kBlobfsBlockSize) != ZX_OK) {
@@ -1312,7 +557,7 @@
ZX_DEBUG_ASSERT(abmblks_old <= abmblks);
if (abmblks > kBlocksPerSlice) {
- //TODO(planders): Allocate more slices for the block bitmap.
+ // TODO(planders): Allocate more slices for the block bitmap.
FS_TRACE_ERROR("Blobfs::AddBlocks needs to increase block bitmap size\n");
return ZX_ERR_NO_SPACE;
}
@@ -1406,8 +651,7 @@
}
}
-void Blobfs::UpdateMerkleDecompressMetrics(uint64_t size_compressed,
- uint64_t size_uncompressed,
+void Blobfs::UpdateMerkleDecompressMetrics(uint64_t size_compressed, uint64_t size_uncompressed,
const fs::Duration& read_duration,
const fs::Duration& decompress_duration) {
if (CollectingMetrics()) {
@@ -1428,8 +672,7 @@
}
}
-Blobfs::Blobfs(fbl::unique_fd fd, const Superblock* info)
- : blockfd_(std::move(fd)) {
+Blobfs::Blobfs(fbl::unique_fd fd, const Superblock* info) : blockfd_(std::move(fd)) {
memcpy(&info_, info, sizeof(Superblock));
}
@@ -1447,8 +690,8 @@
}
}
-zx_status_t Blobfs::Create(fbl::unique_fd fd, const MountOptions& options,
- const Superblock* info, fbl::unique_ptr<Blobfs>* out) {
+zx_status_t Blobfs::Create(fbl::unique_fd fd, const MountOptions& options, const Superblock* info,
+ fbl::unique_ptr<Blobfs>* out) {
TRACE_DURATION("blobfs", "Blobfs::Create");
zx_status_t status = CheckSuperblock(info, TotalBlocks(*info));
if (status < 0) {
@@ -1496,19 +739,17 @@
if ((status = node_map.CreateAndMap(nodemap_size, "nodemap")) != ZX_OK) {
return status;
}
- fs->allocator_ = fbl::make_unique<Allocator>(fs.get(), std::move(block_map),
- std::move(node_map));
+ fs->allocator_ =
+ fbl::make_unique<Allocator>(fs.get(), std::move(block_map), std::move(node_map));
if ((status = fs->allocator_->ResetFromStorage(fs::ReadTxn(fs.get()))) != ZX_OK) {
FS_TRACE_ERROR("blobfs: Failed to load bitmaps: %d\n", status);
return status;
}
- if ((status = fs->info_mapping_.CreateAndMap(kBlobfsBlockSize,
- "blobfs-superblock")) != ZX_OK) {
+ if ((status = fs->info_mapping_.CreateAndMap(kBlobfsBlockSize, "blobfs-superblock")) != ZX_OK) {
FS_TRACE_ERROR("blobfs: Failed to create info vmo: %d\n", status);
return status;
- } else if ((status = fs->AttachVmo(fs->info_mapping_.vmo(),
- &fs->info_vmoid_)) != ZX_OK) {
+ } else if ((status = fs->AttachVmo(fs->info_mapping_.vmo(), &fs->info_vmoid_)) != ZX_OK) {
FS_TRACE_ERROR("blobfs: Failed to attach info vmo: %d\n", status);
return status;
} else if ((status = fs->CreateFsId()) != ZX_OK) {
@@ -1551,7 +792,8 @@
char name[digest::Digest::kLength * 2 + 1];
digest.ToString(name, sizeof(name));
FS_TRACE_ERROR("blobfs: CORRUPTED FILESYSTEM: Duplicate node: "
- "%s @ index %u\n", name, i);
+ "%s @ index %u\n",
+ name, i);
return status;
}
UpdateLookupMetrics(size);
@@ -1658,8 +900,7 @@
zx_status_t Blobfs::OpenRootNode(fbl::RefPtr<VnodeBlob>* out) {
fbl::AllocChecker ac;
- fbl::RefPtr<VnodeBlob> vn =
- fbl::AdoptRef(new (&ac) VnodeBlob(this));
+ fbl::RefPtr<VnodeBlob> vn = fbl::AdoptRef(new (&ac) VnodeBlob(this));
if (!ac.check()) {
return ZX_ERR_NO_MEMORY;
@@ -1705,8 +946,7 @@
}
zx_status_t Mount(async_dispatcher_t* dispatcher, fbl::unique_fd blockfd,
- const MountOptions& options, zx::channel root,
- fbl::Closure on_unmount) {
+ const MountOptions& options, zx::channel root, fbl::Closure on_unmount) {
zx_status_t status;
fbl::unique_ptr<Blobfs> fs;
diff --git a/system/ulib/blobfs/vnode.cpp b/system/ulib/blobfs/vnode.cpp
index 5cc298a..634c356 100644
--- a/system/ulib/blobfs/vnode.cpp
+++ b/system/ulib/blobfs/vnode.cpp
@@ -15,23 +15,783 @@
#include <zircon/device/device.h>
#include <zircon/device/vfs.h>
+#include <fbl/auto_call.h>
#include <fbl/ref_ptr.h>
#include <fbl/string_piece.h>
#include <lib/fdio/debug.h>
#include <lib/fdio/vfs.h>
#include <lib/sync/completion.h>
+#include <zircon/status.h>
#include <zircon/syscalls.h>
#define ZXDEBUG 0
#include <blobfs/blobfs.h>
+#include <blobfs/iterator/allocated-extent-iterator.h>
+#include <blobfs/iterator/block-iterator.h>
+#include <blobfs/iterator/extent-iterator.h>
+#include <blobfs/iterator/node-populator.h>
+#include <blobfs/iterator/vector-extent-iterator.h>
#include <utility>
-using digest::Digest;
-
namespace blobfs {
+namespace {
+
+using digest::Digest;
+using digest::MerkleTree;
+
+// A wrapper around "Enqueue" for content which risks being larger
+// than the writeback buffer.
+//
+// For content which is smaller than 3/4 the size of the writeback buffer: the
+// content is enqueued to |work| without flushing.
+//
+// For content which is larger than 3/4 the size of the writeback buffer: flush
+// the data by enqueueing it to the writeback thread in chunks until the
+// remainder is small enough to comfortably fit within the writeback buffer.
+zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work, Blobfs* blobfs, VnodeBlob* vn,
+ const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
+ uint64_t nblocks) {
+ const size_t kMaxChunkBlocks = (3 * blobfs->WritebackCapacity()) / 4;
+ uint64_t delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
+ while (nblocks > 0) {
+ (*work)->Enqueue(vmo, relative_block, absolute_block, delta_blocks);
+ relative_block += delta_blocks;
+ absolute_block += delta_blocks;
+ nblocks -= delta_blocks;
+ delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
+ if (nblocks) {
+ fbl::unique_ptr<WritebackWork> tmp;
+ zx_status_t status = blobfs->CreateWork(&tmp, vn);
+ if (status != ZX_OK) {
+ return status;
+ }
+ if ((status = blobfs->EnqueueWork(std::move(*work), EnqueueType::kData)) != ZX_OK) {
+ return status;
+ }
+ *work = std::move(tmp);
+ }
+ }
+ return ZX_OK;
+}
+
+} // namespace
+
+zx_status_t VnodeBlob::Verify() const {
+ TRACE_DURATION("blobfs", "Blobfs::Verify");
+ fs::Ticker ticker(blobfs_->CollectingMetrics());
+
+ const void* data = inode_.blob_size ? GetData() : nullptr;
+ const void* tree = inode_.blob_size ? GetMerkle() : nullptr;
+ const uint64_t data_size = inode_.blob_size;
+ const uint64_t merkle_size = MerkleTree::GetTreeLength(data_size);
+ // TODO(smklein): We could lazily verify more of the VMO if
+ // we could fault in pages on-demand.
+ //
+ // For now, we aggressively verify the entire VMO up front.
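+ // Load the blob's stored Merkle root from |digest_|; MerkleTree::Verify checks the data and tree against it.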
+ Digest digest;
+ digest = reinterpret_cast<const uint8_t*>(&digest_[0]);
+ zx_status_t status =
+ MerkleTree::Verify(data, data_size, tree, merkle_size, 0, data_size, digest);
+ blobfs_->UpdateMerkleVerifyMetrics(data_size, merkle_size, ticker.End());
+
+ if (status != ZX_OK) {
+ char name[Digest::kLength * 2 + 1];
+ ZX_ASSERT(digest.ToString(name, sizeof(name)) == ZX_OK);
+ FS_TRACE_ERROR("blobfs verify(%s) Failure: %s\n", name, zx_status_get_string(status));
+ }
+
+ return status;
+}
+
+zx_status_t VnodeBlob::InitVmos() {
+ TRACE_DURATION("blobfs", "Blobfs::InitVmos");
+
+ if (mapping_.vmo()) {
+ return ZX_OK;
+ }
+
+ uint64_t data_blocks = BlobDataBlocks(inode_);
+ uint64_t merkle_blocks = MerkleTreeBlocks(inode_);
+ uint64_t num_blocks = data_blocks + merkle_blocks;
+
+ if (num_blocks == 0) {
+ // No need to initialize VMO for null blob.
+ return ZX_OK;
+ }
+
+ // Reverts blob back to uninitialized state on error.
+ auto cleanup = fbl::MakeAutoCall([this]() { BlobCloseHandles(); });
+
+ size_t vmo_size;
+ if (mul_overflow(num_blocks, kBlobfsBlockSize, &vmo_size)) {
+ FS_TRACE_ERROR("Multiplication overflow");
+ return ZX_ERR_OUT_OF_RANGE;
+ }
+
+ zx_status_t status = mapping_.CreateAndMap(vmo_size, "blob");
+ if (status != ZX_OK) {
+ FS_TRACE_ERROR("Failed to initialize vmo; error: %d\n", status);
+ return status;
+ }
+ if ((status = blobfs_->AttachVmo(mapping_.vmo(), &vmoid_)) != ZX_OK) {
+ FS_TRACE_ERROR("Failed to attach VMO to block device; error: %d\n", status);
+ return status;
+ }
+
+ if ((inode_.header.flags & kBlobFlagLZ4Compressed) != 0) {
+ if ((status = InitCompressed()) != ZX_OK) {
+ return status;
+ }
+ } else {
+ if ((status = InitUncompressed()) != ZX_OK) {
+ return status;
+ }
+ }
+ if ((status = Verify()) != ZX_OK) {
+ return status;
+ }
+
+ cleanup.cancel();
+ return ZX_OK;
+}
+
+zx_status_t VnodeBlob::InitCompressed() {
+ TRACE_DURATION("blobfs", "Blobfs::InitCompressed", "size", inode_.blob_size, "blocks",
+ inode_.block_count);
+ fs::Ticker ticker(blobfs_->CollectingMetrics());
+ fs::ReadTxn txn(blobfs_);
+ uint32_t merkle_blocks = MerkleTreeBlocks(inode_);
+
+ fzl::OwnedVmoMapper compressed_mapper;
+ uint32_t compressed_blocks = (inode_.block_count - merkle_blocks);
+ size_t compressed_size;
+ if (mul_overflow(compressed_blocks, kBlobfsBlockSize, &compressed_size)) {
+ FS_TRACE_ERROR("Multiplication overflow\n");
+ return ZX_ERR_OUT_OF_RANGE;
+ }
+ zx_status_t status = compressed_mapper.CreateAndMap(compressed_size, "compressed-blob");
+ if (status != ZX_OK) {
+ FS_TRACE_ERROR("Failed to initialized compressed vmo; error: %d\n", status);
+ return status;
+ }
+ vmoid_t compressed_vmoid;
+ status = blobfs_->AttachVmo(compressed_mapper.vmo(), &compressed_vmoid);
+ if (status != ZX_OK) {
+ FS_TRACE_ERROR("Failed to attach commpressed VMO to blkdev: %d\n", status);
+ return status;
+ }
+
+ auto detach =
+ fbl::MakeAutoCall([this, &compressed_vmoid]() { blobfs_->DetachVmo(compressed_vmoid); });
+
+ const uint64_t kDataStart = DataStartBlock(blobfs_->info_);
+ AllocatedExtentIterator extent_iter(blobfs_->allocator_.get(), GetMapIndex());
+ BlockIterator block_iter(&extent_iter);
+
+ // Read the uncompressed merkle tree into the start of the blob's VMO.
+ status = StreamBlocks(&block_iter, merkle_blocks,
+ [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
+ txn.Enqueue(vmoid_, vmo_offset, dev_offset + kDataStart, length);
+ return ZX_OK;
+ });
+ if (status != ZX_OK) {
+ return status;
+ }
+
+ // Read the compressed blocks into the compressed VMO, accounting for the merkle blocks
+ // which have already been seen.
+ ZX_DEBUG_ASSERT(block_iter.BlockIndex() == merkle_blocks);
+
+ status = StreamBlocks(&block_iter, compressed_blocks,
+ [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
+ txn.Enqueue(compressed_vmoid, vmo_offset - merkle_blocks,
+ dev_offset + kDataStart, length);
+ return ZX_OK;
+ });
+
+ if (status != ZX_OK) {
+ return status;
+ }
+
+ if ((status = txn.Transact()) != ZX_OK) {
+ FS_TRACE_ERROR("Failed to flush read transaction: %d\n", status);
+ return status;
+ }
+
+ fs::Duration read_time = ticker.End();
+ ticker.Reset();
+
+ // Decompress the compressed data into the target buffer.
+ size_t target_size = inode_.blob_size;
+ status = Decompressor::Decompress(GetData(), &target_size, compressed_mapper.start(),
+ &compressed_size);
+ if (status != ZX_OK) {
+ FS_TRACE_ERROR("Failed to decompress data: %d\n", status);
+ return status;
+ } else if (target_size != inode_.blob_size) {
+ FS_TRACE_ERROR("Failed to fully decompress blob (%zu of %zu expected)\n", target_size,
+ inode_.blob_size);
+ return ZX_ERR_IO_DATA_INTEGRITY;
+ }
+
+ blobfs_->UpdateMerkleDecompressMetrics((compressed_blocks)*kBlobfsBlockSize, inode_.blob_size,
+ read_time, ticker.End());
+ return ZX_OK;
+}
+
+zx_status_t VnodeBlob::InitUncompressed() {
+ TRACE_DURATION("blobfs", "Blobfs::InitUncompressed", "size", inode_.blob_size, "blocks",
+ inode_.block_count);
+ fs::Ticker ticker(blobfs_->CollectingMetrics());
+ fs::ReadTxn txn(blobfs_);
+ AllocatedExtentIterator extent_iter(blobfs_->allocator_.get(), GetMapIndex());
+ BlockIterator block_iter(&extent_iter);
+ // Read both the uncompressed merkle tree and data.
+ const uint64_t blob_data_blocks = BlobDataBlocks(inode_);
+ const uint64_t merkle_blocks = MerkleTreeBlocks(inode_);
+ if (blob_data_blocks + merkle_blocks > std::numeric_limits<uint32_t>::max()) {
+ return ZX_ERR_IO_DATA_INTEGRITY;
+ }
+ const uint32_t length = static_cast<uint32_t>(blob_data_blocks + merkle_blocks);
+ const uint64_t data_start = DataStartBlock(blobfs_->info_);
+ zx_status_t status = StreamBlocks(
+ &block_iter, length, [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
+ txn.Enqueue(vmoid_, vmo_offset, dev_offset + data_start, length);
+ return ZX_OK;
+ });
+
+ if (status != ZX_OK) {
+ return status;
+ }
+
+ status = txn.Transact();
+ if (status != ZX_OK) {
+ return status;
+ }
+ blobfs_->UpdateMerkleDiskReadMetrics(length * kBlobfsBlockSize, ticker.End());
+ return status;
+}
+
+void VnodeBlob::PopulateInode(uint32_t node_index) {
+ ZX_DEBUG_ASSERT(map_index_ == 0);
+ SetState(kBlobStateReadable);
+ map_index_ = node_index;
+ Inode* inode = blobfs_->GetNode(node_index);
+ inode_ = *inode;
+}
+
+uint64_t VnodeBlob::SizeData() const {
+ if (GetState() == kBlobStateReadable) {
+ return inode_.blob_size;
+ }
+ return 0;
+}
+
+VnodeBlob::VnodeBlob(Blobfs* bs, const Digest& digest)
+ : blobfs_(bs), flags_(kBlobStateEmpty), syncing_(false), clone_watcher_(this) {
+ digest.CopyTo(digest_, sizeof(digest_));
+}
+
+VnodeBlob::VnodeBlob(Blobfs* bs)
+ : blobfs_(bs), flags_(kBlobStateEmpty | kBlobFlagDirectory), syncing_(false),
+ clone_watcher_(this) {}
+
+void VnodeBlob::BlobCloseHandles() {
+ mapping_.Reset();
+ readable_event_.reset();
+}
+
+zx_status_t VnodeBlob::SpaceAllocate(uint64_t size_data) {
+ TRACE_DURATION("blobfs", "Blobfs::SpaceAllocate", "size_data", size_data);
+ fs::Ticker ticker(blobfs_->CollectingMetrics());
+
+ if (GetState() != kBlobStateEmpty) {
+ return ZX_ERR_BAD_STATE;
+ }
+
+ auto write_info = fbl::make_unique<WritebackInfo>();
+
+ // Initialize the inode with known fields.
+ memset(inode_.merkle_root_hash, 0, Digest::kLength);
+ inode_.blob_size = size_data;
+ inode_.block_count = MerkleTreeBlocks(inode_) + static_cast<uint32_t>(BlobDataBlocks(inode_));
+
+ // Special case for the null blob: We skip the write phase.
+ if (inode_.blob_size == 0) {
+ zx_status_t status = blobfs_->ReserveNodes(1, &write_info->node_indices);
+ if (status != ZX_OK) {
+ return status;
+ }
+ map_index_ = write_info->node_indices[0].index();
+ write_info_ = std::move(write_info);
+
+ if ((status = Verify()) != ZX_OK) {
+ return status;
+ }
+ SetState(kBlobStateDataWrite);
+ if ((status = WriteMetadata()) != ZX_OK) {
+ FS_TRACE_ERROR("Null blob metadata fail: %d\n", status);
+ return status;
+ }
+ return ZX_OK;
+ }
+
+ fbl::Vector<ReservedExtent> extents;
+ fbl::Vector<ReservedNode> nodes;
+
+ // Reserve space for the blob.
+ zx_status_t status = blobfs_->ReserveBlocks(inode_.block_count, &extents);
+ if (status != ZX_OK) {
+ return status;
+ }
+ if (extents.size() > kMaxBlobExtents) {
+ FS_TRACE_ERROR("Error: Block reservation requires too many extents (%zu vs %zu max)\n",
+ extents.size(), kMaxBlobExtents);
+ return ZX_ERR_BAD_STATE;
+ }
+ const ExtentCountType extent_count = static_cast<ExtentCountType>(extents.size());
+
+ // Reserve space for all the nodes necessary to contain this blob.
+ size_t node_count = NodePopulator::NodeCountForExtents(extent_count);
+ status = blobfs_->ReserveNodes(node_count, &nodes);
+ if (status != ZX_OK) {
+ return status;
+ }
+
+ if (inode_.blob_size >= kCompressionMinBytesSaved) {
+ size_t max = write_info->compressor.BufferMax(inode_.blob_size);
+ status = write_info->compressed_blob.CreateAndMap(max, "compressed-blob");
+ if (status != ZX_OK) {
+ return status;
+ }
+ status = write_info->compressor.Initialize(write_info->compressed_blob.start(),
+ write_info->compressed_blob.size());
+ if (status != ZX_OK) {
+ FS_TRACE_ERROR("blobfs: Failed to initialize compressor: %d\n", status);
+ return status;
+ }
+ }
+
+ // Open VMOs, so we can begin writing after allocate succeeds.
+ fzl::OwnedVmoMapper mapping;
+ if ((status = mapping.CreateAndMap(inode_.block_count * kBlobfsBlockSize, "blob")) != ZX_OK) {
+ return status;
+ }
+ if ((status = blobfs_->AttachVmo(mapping.vmo(), &vmoid_)) != ZX_OK) {
+ return status;
+ }
+
+ map_index_ = nodes[0].index();
+ mapping_ = std::move(mapping);
+ write_info->extents = std::move(extents);
+ write_info->node_indices = std::move(nodes);
+ write_info_ = std::move(write_info);
+
+ SetState(kBlobStateDataWrite);
+ blobfs_->UpdateAllocationMetrics(size_data, ticker.End());
+ return ZX_OK;
+}
+
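+// Within the blob's VMO, the Merkle tree occupies the first MerkleTreeBlocks(inode_) blocks
+// and the blob data follows it.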
+void* VnodeBlob::GetData() const {
+ return fs::GetBlock(kBlobfsBlockSize, mapping_.start(), MerkleTreeBlocks(inode_));
+}
+
+void* VnodeBlob::GetMerkle() const {
+ return mapping_.start();
+}
+
+zx_status_t VnodeBlob::WriteMetadata() {
+ TRACE_DURATION("blobfs", "Blobfs::WriteMetadata");
+ assert(GetState() == kBlobStateDataWrite);
+
+ zx_status_t status;
+ fbl::unique_ptr<WritebackWork> wb;
+ if ((status = blobfs_->CreateWork(&wb, this)) != ZX_OK) {
+ return status;
+ }
+
+ // Update the on-disk hash.
+ memcpy(inode_.merkle_root_hash, &digest_[0], Digest::kLength);
+
+ // All data has been written to the containing VMO.
+ SetState(kBlobStateReadable);
+ if (readable_event_.is_valid()) {
+ status = readable_event_.signal(0u, ZX_USER_SIGNAL_0);
+ if (status != ZX_OK) {
+ SetState(kBlobStateError);
+ return status;
+ }
+ }
+
+ atomic_store(&syncing_, true);
+
+ if (inode_.block_count) {
+ // We utilize the NodePopulator class to take our reserved blocks and nodes and fill the
+ // persistent map with an allocated inode / container.
+
+ // If |on_node| is invoked on a node, it means that node was necessary to represent this
+ // blob. Persist the node back to durable storage.
+ auto on_node = [this, &wb](const ReservedNode& node) {
+ blobfs_->PersistNode(wb.get(), node.index());
+ };
+
+ // If |on_extent| is invoked on an extent, it was necessary to represent this blob. Persist
+ // the allocation of these blocks back to durable storage.
+ //
+ // Additionally, because of the compression feature of blobfs, it is possible we reserved
+ // more extents than this blob ended up using. Decrement |remaining_blocks| to track if we
+ // should exit early.
+ size_t remaining_blocks = inode_.block_count;
+ auto on_extent = [this, &wb, &remaining_blocks](ReservedExtent& extent) {
+ ZX_DEBUG_ASSERT(remaining_blocks > 0);
+ if (remaining_blocks >= extent.extent().Length()) {
+ // Consume the entire extent.
+ remaining_blocks -= extent.extent().Length();
+ } else {
+ // Consume only part of the extent; we're done iterating.
+ extent.SplitAt(static_cast<BlockCountType>(remaining_blocks));
+ remaining_blocks = 0;
+ }
+ blobfs_->PersistBlocks(wb.get(), extent);
+ if (remaining_blocks == 0) {
+ return NodePopulator::IterationCommand::Stop;
+ }
+ return NodePopulator::IterationCommand::Continue;
+ };
+
+ Inode* mapped_inode = blobfs_->GetNode(map_index_);
+ *mapped_inode = inode_;
+ NodePopulator populator(blobfs_->allocator_.get(), std::move(write_info_->extents),
+ std::move(write_info_->node_indices));
+ ZX_ASSERT(populator.Walk(on_node, on_extent) == ZX_OK);
+
+ // Ensure all non-allocation flags are propagated to the inode.
+ mapped_inode->header.flags |= (inode_.header.flags & kBlobFlagLZ4Compressed);
+ } else {
+ // Special case: Empty node.
+ ZX_DEBUG_ASSERT(write_info_->node_indices.size() == 1);
+ const ReservedNode& node = write_info_->node_indices[0];
+ blobfs_->allocator_->MarkInodeAllocated(node);
+ blobfs_->PersistNode(wb.get(), node.index());
+ }
+
+ wb->SetSyncComplete();
+ if ((status = blobfs_->EnqueueWork(std::move(wb), EnqueueType::kJournal)) != ZX_OK) {
+ return status;
+ }
+
+ // Drop the write info, since we no longer need it.
+ write_info_.reset();
+ return status;
+}
+
+zx_status_t VnodeBlob::WriteInternal(const void* data, size_t len, size_t* actual) {
+ TRACE_DURATION("blobfs", "Blobfs::WriteInternal", "data", data, "len", len);
+
+ *actual = 0;
+ if (len == 0) {
+ return ZX_OK;
+ }
+
+ const uint32_t merkle_blocks = MerkleTreeBlocks(inode_);
+ const size_t merkle_bytes = merkle_blocks * kBlobfsBlockSize;
+ if (GetState() == kBlobStateDataWrite) {
+ size_t to_write = fbl::min(len, inode_.blob_size - write_info_->bytes_written);
+ size_t offset = write_info_->bytes_written + merkle_bytes;
+ zx_status_t status = mapping_.vmo().write(data, offset, to_write);
+ if (status != ZX_OK) {
+ return status;
+ }
+
+ *actual = to_write;
+ write_info_->bytes_written += to_write;
+
+ if (write_info_->compressor.Compressing()) {
+ if ((status = write_info_->compressor.Update(data, to_write)) != ZX_OK) {
+ return status;
+ }
+ ConsiderCompressionAbort();
+ }
+
+ // More data to write.
+ if (write_info_->bytes_written < inode_.blob_size) {
+ return ZX_OK;
+ }
+
+ // Only write data to disk once we've buffered the file into memory.
+ // This gives us a chance to try compressing the blob before we write it back.
+ fbl::unique_ptr<WritebackWork> wb;
+ if ((status = blobfs_->CreateWork(&wb, this)) != ZX_OK) {
+ return status;
+ }
+
+ // In case the operation fails, forcibly reset the WritebackWork
+ // to avoid asserting that no write requests exist on destruction.
+ auto set_error = fbl::MakeAutoCall([&]() {
+ if (wb != nullptr) {
+ wb->Reset(ZX_ERR_BAD_STATE);
+ }
+
+ SetState(kBlobStateError);
+ });
+
+ if (write_info_->compressor.Compressing()) {
+ if ((status = write_info_->compressor.End()) != ZX_OK) {
+ return status;
+ }
+ ConsiderCompressionAbort();
+ }
+
+ // Since the merkle tree and data are co-allocated, use a block iterator
+ // to parse their data in order.
+ VectorExtentIterator extent_iter(write_info_->extents);
+ BlockIterator block_iter(&extent_iter);
+
+ // TODO(smklein): As an optimization, use the CreateInit/Update/Final
+ // methods to create the merkle tree as we write data, rather than
+ // waiting until the data is fully downloaded to create the tree.
+ size_t merkle_size = MerkleTree::GetTreeLength(inode_.blob_size);
+ fs::Duration generation_time;
+ if (merkle_size > 0) {
+ Digest digest;
+ void* merkle_data = GetMerkle();
+ const void* blob_data = GetData();
+ fs::Ticker ticker(blobfs_->CollectingMetrics()); // Tracking generation time.
+
+ if ((status = MerkleTree::Create(blob_data, inode_.blob_size, merkle_data, merkle_size,
+ &digest)) != ZX_OK) {
+ return status;
+ } else if (digest != digest_) {
+ // Downloaded blob did not match provided digest.
+ return ZX_ERR_IO_DATA_INTEGRITY;
+ }
+
+ status = StreamBlocks(&block_iter, merkle_blocks,
+ [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
+ return EnqueuePaginated(
+ &wb, blobfs_, this, mapping_.vmo(), vmo_offset,
+ dev_offset + blobfs_->DataStart(), length);
+ });
+
+ if (status != ZX_OK) {
+ return status;
+ }
+ generation_time = ticker.End();
+ } else if ((status = Verify()) != ZX_OK) {
+ // Small blobs may not have associated Merkle Trees, and will
+ // require validation, since we are not regenerating and checking
+ // the digest.
+ return status;
+ }
+
+ if (write_info_->compressor.Compressing()) {
+ uint64_t blocks64 =
+ fbl::round_up(write_info_->compressor.Size(), kBlobfsBlockSize) / kBlobfsBlockSize;
+ ZX_DEBUG_ASSERT(blocks64 <= std::numeric_limits<uint32_t>::max());
+ uint32_t blocks = static_cast<uint32_t>(blocks64);
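+ // The compressed VMO contains only data blocks, so offsets into it are biased back by
+ // |merkle_blocks|; the block iterator should be positioned at the first data block here.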
+ int64_t vmo_bias = -static_cast<int64_t>(merkle_blocks);
+ ZX_DEBUG_ASSERT(block_iter.BlockIndex() + vmo_bias == 0);
+ status = StreamBlocks(&block_iter, blocks,
+ [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
+ return EnqueuePaginated(
+ &wb, blobfs_, this, write_info_->compressed_blob.vmo(),
+ vmo_offset - merkle_blocks,
+ dev_offset + blobfs_->DataStart(), length);
+ });
+
+ if (status != ZX_OK) {
+ return status;
+ }
+ blocks += MerkleTreeBlocks(inode_);
+ // By compressing, we used fewer blocks than we originally reserved.
+ ZX_DEBUG_ASSERT(inode_.block_count > blocks);
+
+ inode_.block_count = blocks;
+ inode_.header.flags |= kBlobFlagLZ4Compressed;
+ } else {
+ uint64_t blocks64 =
+ fbl::round_up(inode_.blob_size, kBlobfsBlockSize) / kBlobfsBlockSize;
+ ZX_DEBUG_ASSERT(blocks64 <= std::numeric_limits<uint32_t>::max());
+ uint32_t blocks = static_cast<uint32_t>(blocks64);
+ status = StreamBlocks(&block_iter, blocks,
+ [&](uint64_t vmo_offset, uint64_t dev_offset, uint32_t length) {
+ return EnqueuePaginated(
+ &wb, blobfs_, this, mapping_.vmo(), vmo_offset,
+ dev_offset + blobfs_->DataStart(), length);
+ });
+ if (status != ZX_OK) {
+ return status;
+ }
+ }
+
+ // Enqueue the blob's final data work. Metadata must be enqueued separately.
+ if ((status = blobfs_->EnqueueWork(std::move(wb), EnqueueType::kData)) != ZX_OK) {
+ return status;
+ }
+
+ // No more data to write. Flush to disk.
+ fs::Ticker ticker(blobfs_->CollectingMetrics()); // Tracking enqueue time.
+ if ((status = WriteMetadata()) != ZX_OK) {
+ return status;
+ }
+
+ blobfs_->UpdateClientWriteMetrics(to_write, merkle_size, ticker.End(), generation_time);
+ set_error.cancel();
+ return ZX_OK;
+ }
+
+ return ZX_ERR_BAD_STATE;
+}
+
+void VnodeBlob::ConsiderCompressionAbort() {
+ ZX_DEBUG_ASSERT(write_info_->compressor.Compressing());
+ if (inode_.blob_size - kCompressionMinBytesSaved < write_info_->compressor.Size()) {
+ write_info_->compressor.Reset();
+ write_info_->compressed_blob.Reset();
+ }
+}
+
+zx_status_t VnodeBlob::GetReadableEvent(zx_handle_t* out) {
+ TRACE_DURATION("blobfs", "Blobfs::GetReadableEvent");
+ zx_status_t status;
+ // This is the first 'wait until read event' request received.
+ if (!readable_event_.is_valid()) {
+ status = zx::event::create(0, &readable_event_);
+ if (status != ZX_OK) {
+ return status;
+ } else if (GetState() == kBlobStateReadable) {
+ readable_event_.signal(0u, ZX_USER_SIGNAL_0);
+ }
+ }
+ status = zx_handle_duplicate(readable_event_.get(), ZX_RIGHTS_BASIC, out);
+ if (status != ZX_OK) {
+ return status;
+ }
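+ // On success, return the byte size of the handle written to |out| rather than ZX_OK.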
+ return sizeof(zx_handle_t);
+}
+
+zx_status_t VnodeBlob::CloneVmo(zx_rights_t rights, zx_handle_t* out) {
+ TRACE_DURATION("blobfs", "Blobfs::CloneVmo", "rights", rights, "out", out);
+ if (GetState() != kBlobStateReadable) {
+ return ZX_ERR_BAD_STATE;
+ }
+ if (inode_.blob_size == 0) {
+ return ZX_ERR_BAD_STATE;
+ }
+ zx_status_t status = InitVmos();
+ if (status != ZX_OK) {
+ return status;
+ }
+
+ // TODO(smklein): Only clone / verify the part of the vmo that
+ // was requested.
+ const size_t merkle_bytes = MerkleTreeBlocks(inode_) * kBlobfsBlockSize;
+ zx::vmo clone;
+ if ((status = mapping_.vmo().clone(ZX_VMO_CLONE_COPY_ON_WRITE, merkle_bytes, inode_.blob_size,
+ &clone)) != ZX_OK) {
+ return status;
+ }
+
+ // TODO(mdempsky): Push elsewhere.
+ if ((status = clone.replace_as_executable(zx::handle(), &clone)) != ZX_OK) {
+ return status;
+ }
+
+ if ((status = clone.replace(rights, &clone)) != ZX_OK) {
+ return status;
+ }
+ *out = clone.release();
+
+ if (clone_watcher_.object() == ZX_HANDLE_INVALID) {
+ clone_watcher_.set_object(mapping_.vmo().get());
+ clone_watcher_.set_trigger(ZX_VMO_ZERO_CHILDREN);
+
+ // Keep a reference to "this" alive, preventing the blob
+ // from being closed while someone may still be using the
+ // underlying memory.
+ //
+ // We'll release it when no client-held VMOs are in use.
+ clone_ref_ = fbl::RefPtr<VnodeBlob>(this);
+ clone_watcher_.Begin(blobfs_->dispatcher());
+ }
+
+ return ZX_OK;
+}
+
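+// Called by |clone_watcher_| once all client-held clones of the blob's VMO have been closed;
+// drops the self-reference taken in CloneVmo so the blob can be released.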
+void VnodeBlob::HandleNoClones(async_dispatcher_t* dispatcher, async::WaitBase* wait,
+ zx_status_t status, const zx_packet_signal_t* signal) {
+ ZX_DEBUG_ASSERT(status == ZX_OK);
+ ZX_DEBUG_ASSERT((signal->observed & ZX_VMO_ZERO_CHILDREN) != 0);
+ ZX_DEBUG_ASSERT(clone_watcher_.object() != ZX_HANDLE_INVALID);
+ clone_watcher_.set_object(ZX_HANDLE_INVALID);
+ clone_ref_ = nullptr;
+}
+
+zx_status_t VnodeBlob::ReadInternal(void* data, size_t len, size_t off, size_t* actual) {
+ TRACE_DURATION("blobfs", "Blobfs::ReadInternal", "len", len, "off", off);
+
+ if (GetState() != kBlobStateReadable) {
+ return ZX_ERR_BAD_STATE;
+ }
+
+ if (inode_.blob_size == 0) {
+ *actual = 0;
+ return ZX_OK;
+ }
+
+ zx_status_t status = InitVmos();
+ if (status != ZX_OK) {
+ return status;
+ }
+
+ Digest d;
+ d = reinterpret_cast<const uint8_t*>(&digest_[0]);
+
+ if (off >= inode_.blob_size) {
+ *actual = 0;
+ return ZX_OK;
+ }
+ if (len > (inode_.blob_size - off)) {
+ len = inode_.blob_size - off;
+ }
+
+ const size_t merkle_bytes = MerkleTreeBlocks(inode_) * kBlobfsBlockSize;
+ status = mapping_.vmo().read(data, merkle_bytes + off, len);
+ if (status == ZX_OK) {
+ *actual = len;
+ }
+ return status;
+}
+
+zx_status_t VnodeBlob::QueueUnlink() {
+ flags_ |= kBlobFlagDeletable;
+ // Attempt to purge in case the blob has been unlinked with no open fds
+ return TryPurge();
+}
+
+zx_status_t VnodeBlob::VerifyBlob(Blobfs* bs, uint32_t node_index) {
+ Inode* inode = bs->GetNode(node_index);
+ Digest digest(inode->merkle_root_hash);
+ fbl::AllocChecker ac;
+ fbl::RefPtr<VnodeBlob> vn = fbl::AdoptRef(new (&ac) VnodeBlob(bs, digest));
+
+ if (!ac.check()) {
+ return ZX_ERR_NO_MEMORY;
+ }
+
+ vn->PopulateInode(node_index);
+
+ // Set blob state to "Purged" so we do not try to add it to the cached map on recycle.
+ vn->SetState(kBlobStatePurged);
+
+ // If we are unable to read in the blob from disk, this should also be a VerifyBlob error.
+ // Since InitVmos calls Verify as its final step, we can just return its result here.
+ return vn->InitVmos();
+}
+
void VnodeBlob::fbl_recycle() {
if (GetState() != kBlobStatePurged && !IsDirectory()) {
// Relocate blobs which haven't been deleted to the closed cache.
@@ -88,8 +848,7 @@
return ReadInternal(data, len, off, out_actual);
}
-zx_status_t VnodeBlob::Write(const void* data, size_t len, size_t offset,
- size_t* out_actual) {
+zx_status_t VnodeBlob::Write(const void* data, size_t len, size_t offset, size_t* out_actual) {
TRACE_DURATION("blobfs", "VnodeBlob::Write", "len", len, "off", offset);
if (IsDirectory()) {
return ZX_ERR_NOT_FILE;
@@ -97,8 +856,7 @@
return WriteInternal(data, len, out_actual);
}
-zx_status_t VnodeBlob::Append(const void* data, size_t len, size_t* out_end,
- size_t* out_actual) {
+zx_status_t VnodeBlob::Append(const void* data, size_t len, size_t* out_end, size_t* out_actual) {
zx_status_t status = WriteInternal(data, len, out_actual);
if (GetState() == kBlobStateDataWrite) {
ZX_DEBUG_ASSERT(write_info_ != nullptr);