[blobfs] Split headers, classes into multiple files.
Primarily focused on writeback and journaling. This is the first step
for distentangling dependencies (or at minimum, making them explicit).
Change-Id: I83cf26cbf03ab8061b8019b2eedfd338bf9a624f
diff --git a/zircon/system/ulib/blobfs/BUILD.gn b/zircon/system/ulib/blobfs/BUILD.gn
index 67ee748..6f0d941 100644
--- a/zircon/system/ulib/blobfs/BUILD.gn
+++ b/zircon/system/ulib/blobfs/BUILD.gn
@@ -52,13 +52,18 @@
"blob-cache.cpp",
"blob.cpp",
"blobfs.cpp",
+ "buffer.cpp",
"cache-node.cpp",
"compression/blob-compressor.cpp",
"directory.cpp",
"iterator/node-populator.cpp",
"journal.cpp",
+ "journal-entry.cpp",
"metrics.cpp",
+ "write-txn.cpp",
"writeback.cpp",
+ "writeback-queue.cpp",
+ "writeback-work.cpp",
]
public_deps += [
"$zx/system/ulib/block-client:headers",
diff --git a/zircon/system/ulib/blobfs/buffer.cpp b/zircon/system/ulib/blobfs/buffer.cpp
index 7b21695..dcb4ec9 100644
--- a/zircon/system/ulib/blobfs/buffer.cpp
+++ b/zircon/system/ulib/blobfs/buffer.cpp
@@ -2,156 +2,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include <blobfs/writeback.h>
+#include <blobfs/buffer.h>
+#include <blobfs/writeback-work.h>
#include <utility>
namespace blobfs {
-WriteTxn::~WriteTxn() {
- ZX_DEBUG_ASSERT_MSG(requests_.is_empty(), "WriteTxn still has pending requests");
-}
-
-void WriteTxn::Enqueue(const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks) {
- ZX_DEBUG_ASSERT(vmo.is_valid());
- ZX_DEBUG_ASSERT(!IsBuffered());
-
- for (auto& request : requests_) {
- if (request.vmo != vmo.get()) {
- continue;
- }
-
- if (request.vmo_offset == relative_block) {
- // Take the longer of the operations (if operating on the same blocks).
- if (nblocks > request.length) {
- block_count_ += (nblocks - request.length);
- request.length = nblocks;
- }
- return;
- } else if ((request.vmo_offset + request.length == relative_block) &&
- (request.dev_offset + request.length == absolute_block)) {
- // Combine with the previous request, if immediately following.
- request.length += nblocks;
- block_count_ += nblocks;
- return;
- }
- }
-
- WriteRequest request;
- request.vmo = vmo.get();
- request.vmo_offset = relative_block;
- request.dev_offset = absolute_block;
- request.length = nblocks;
- requests_.push_back(std::move(request));
- block_count_ += request.length;
-}
-
-size_t WriteTxn::BlkStart() const {
- ZX_DEBUG_ASSERT(IsBuffered());
- ZX_DEBUG_ASSERT(requests_.size() > 0);
- return requests_[0].vmo_offset;
-}
-
-size_t WriteTxn::BlkCount() const {
- return block_count_;
-}
-
-void WriteTxn::SetBuffer(vmoid_t vmoid) {
- ZX_DEBUG_ASSERT(vmoid_ == VMOID_INVALID || vmoid_ == vmoid);
- ZX_DEBUG_ASSERT(vmoid != VMOID_INVALID);
- vmoid_ = vmoid;
-}
-
-zx_status_t WriteTxn::Flush() {
- ZX_ASSERT(IsBuffered());
- fs::Ticker ticker(transaction_manager_->LocalMetrics().Collecting());
-
- // Update all the outgoing transactions to be in disk blocks
- block_fifo_request_t blk_reqs[requests_.size()];
- const uint32_t kDiskBlocksPerBlobfsBlock =
- kBlobfsBlockSize / transaction_manager_->DeviceBlockSize();
- for (size_t i = 0; i < requests_.size(); i++) {
- blk_reqs[i].group = transaction_manager_->BlockGroupID();
- blk_reqs[i].vmoid = vmoid_;
- blk_reqs[i].opcode = BLOCKIO_WRITE;
- blk_reqs[i].vmo_offset = requests_[i].vmo_offset * kDiskBlocksPerBlobfsBlock;
- blk_reqs[i].dev_offset = requests_[i].dev_offset * kDiskBlocksPerBlobfsBlock;
- uint64_t length = requests_[i].length * kDiskBlocksPerBlobfsBlock;
- // TODO(ZX-2253): Requests this long, although unlikely, should be
- // handled more gracefully.
- ZX_ASSERT_MSG(length < UINT32_MAX, "Request size too large");
- blk_reqs[i].length = static_cast<uint32_t>(length);
- }
-
- // Actually send the operations to the underlying block device.
- zx_status_t status = transaction_manager_->Transaction(blk_reqs, requests_.size());
-
- if (transaction_manager_->LocalMetrics().Collecting()) {
- uint64_t sum = 0;
- for (const auto& blk_req : blk_reqs) {
- sum += blk_req.length * kBlobfsBlockSize;
- }
- transaction_manager_->LocalMetrics().UpdateWriteback(sum, ticker.End());
- }
-
- requests_.reset();
- vmoid_ = VMOID_INVALID;
- block_count_ = 0;
- return status;
-}
-
-void WritebackWork::MarkCompleted(zx_status_t status) {
- WriteTxn::Reset();
- if (sync_cb_) {
- sync_cb_(status);
- }
- sync_cb_ = nullptr;
- ready_cb_ = nullptr;
-}
-
-bool WritebackWork::IsReady() {
- if (ready_cb_) {
- if (ready_cb_()) {
- ready_cb_ = nullptr;
- return true;
- }
-
- return false;
- }
-
- return true;
-}
-
-void WritebackWork::SetReadyCallback(ReadyCallback callback) {
- ZX_DEBUG_ASSERT(!ready_cb_);
- ready_cb_ = std::move(callback);
-}
-
-void WritebackWork::SetSyncCallback(SyncCallback callback) {
- if (sync_cb_) {
- // This "callback chain" allows multiple clients to observe the completion
- // of the WritebackWork. This is akin to a promise "and-then" relationship.
- sync_cb_ = [previous_callback = std::move(sync_cb_),
- next_callback = std::move(callback)] (zx_status_t status) {
- next_callback(status);
- previous_callback(status);
- };
- } else {
- sync_cb_ = std::move(callback);
- }
-}
-
-// Returns the number of blocks of the writeback buffer that have been consumed
-zx_status_t WritebackWork::Complete() {
- zx_status_t status = Flush();
- MarkCompleted(status);
- return status;
-}
-
-WritebackWork::WritebackWork(TransactionManager* transaction_manager)
- : WriteTxn(transaction_manager), ready_cb_(nullptr), sync_cb_(nullptr) {}
-
Buffer::~Buffer() {
if (vmoid_ != VMOID_INVALID) {
// Close the buffer vmo.
@@ -324,224 +181,4 @@
length_ -= blocks;
}
-WritebackQueue::~WritebackQueue() {
- // Ensure that thread teardown has completed, or that it was never brought up to begin with.
- ZX_DEBUG_ASSERT(!IsRunning());
- ZX_DEBUG_ASSERT(work_queue_.is_empty());
-}
-
-zx_status_t WritebackQueue::Teardown() {
- WritebackState state;
-
- {
- fbl::AutoLock lock(&lock_);
- state = state_;
-
- // Signal the background thread.
- unmounting_ = true;
- cnd_signal(&work_added_);
- }
-
- zx_status_t status = ZX_OK;
- if (state != WritebackState::kInit) {
- // Block until the thread completes itself.
- int result = -1;
- int success = thrd_join(worker_, &result);
- if (result != 0 || success != thrd_success) {
- status = ZX_ERR_INTERNAL;
- }
- }
-
- return status;
-}
-
-zx_status_t WritebackQueue::Create(TransactionManager* transaction_manager,
- const size_t buffer_blocks,
- fbl::unique_ptr<WritebackQueue>* out) {
- zx_status_t status;
- fbl::unique_ptr<Buffer> buffer;
- if ((status = Buffer::Create(transaction_manager, buffer_blocks, "blobfs-writeback", &buffer))
- != ZX_OK) {
- return status;
- }
-
- fbl::unique_ptr<WritebackQueue> wb(new WritebackQueue(std::move(buffer)));
-
- if (cnd_init(&wb->work_completed_) != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
- if (cnd_init(&wb->work_added_) != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
-
- if (thrd_create_with_name(&wb->worker_,
- WritebackQueue::WritebackThread, wb.get(),
- "blobfs-writeback") != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
-
- fbl::AutoLock lock(&wb->lock_);
- wb->state_ = WritebackState::kRunning;
- *out = std::move(wb);
- return ZX_OK;
-}
-
-zx_status_t WritebackQueue::Enqueue(fbl::unique_ptr<WritebackWork> work) {
- TRACE_DURATION("blobfs", "WritebackQueue::Enqueue", "work ptr", work.get());
- fbl::AutoLock lock(&lock_);
- zx_status_t status = ZX_OK;
-
- if (IsReadOnly()) {
- // If we are in a readonly state, return an error. However, the work should still be
- // enqueued and ultimately processed by the WritebackThread. This will help us avoid
- // potential race conditions if the work callback must acquire a lock.
- status = ZX_ERR_BAD_STATE;
- } else if (!work->IsBuffered()) {
- ZX_DEBUG_ASSERT(state_ == WritebackState::kRunning);
-
- // Only copy blocks to the buffer if they have not already been copied to another buffer.
- EnsureSpaceLocked(work->BlkCount());
-
- // It is possible that the queue entered a read only state
- // while we were waiting to ensure space, so check again now.
- if (IsReadOnly()) {
- status = ZX_ERR_BAD_STATE;
- } else {
- buffer_->CopyTransaction(work.get());
- }
- }
-
- work_queue_.push(std::move(work));
- cnd_signal(&work_added_);
- return status;
-}
-
-bool WritebackQueue::IsRunning() const {
- switch (state_) {
- case WritebackState::kRunning:
- case WritebackState::kReadOnly:
- return true;
- default:
- return false;
- }
-}
-
-void WritebackQueue::EnsureSpaceLocked(size_t blocks) {
- while (!buffer_->IsSpaceAvailable(blocks)) {
- // Not enough room to write back work, yet. Wait until room is available.
- Waiter w;
- producer_queue_.push(&w);
-
- do {
- cnd_wait(&work_completed_, lock_.GetInternal());
- } while ((&producer_queue_.front() != &w) && // We are first in line to enqueue...
- (!buffer_->IsSpaceAvailable(blocks))); // ... and there is enough space for us.
-
- producer_queue_.pop();
- }
-}
-
-int WritebackQueue::WritebackThread(void* arg) {
- WritebackQueue* b = reinterpret_cast<WritebackQueue*>(arg);
-
- b->lock_.Acquire();
- while (true) {
- bool error = b->IsReadOnly();
- while (!b->work_queue_.is_empty()) {
- if (!error && !b->work_queue_.front().IsReady()) {
- // If the work is not yet ready, break and wait until we receive another signal.
- break;
- }
-
- auto work = b->work_queue_.pop();
- TRACE_DURATION("blobfs", "WritebackQueue::WritebackThread", "work ptr", work.get());
-
- bool our_buffer = b->buffer_->VerifyTransaction(work.get());
- size_t blk_count = work->BlkCount();
-
- // Stay unlocked while processing a unit of work.
- b->lock_.Release();
-
- if (error) {
- // If we are in a read only state, mark the work complete with an error status.
- work->MarkCompleted(ZX_ERR_BAD_STATE);
- } else {
- // If we should complete the work, make sure it has been buffered.
- // (This is not necessary if we are currently in an error state).
- ZX_DEBUG_ASSERT(work->IsBuffered());
- zx_status_t status;
- if ((status = work->Complete()) != ZX_OK) {
- FS_TRACE_ERROR("Work failed with status %d - "
- "converting writeback to read only state.\n", status);
- // If work completion failed, set the buffer to an error state.
- error = true;
- }
- }
-
- work = nullptr;
- b->lock_.Acquire();
-
- if (error) {
- // If we encountered an error, set the queue to readonly.
- b->state_ = WritebackState::kReadOnly;
- }
-
- if (our_buffer) {
- // If the last work we processed belonged to our buffer,
- // update the buffer's start/len accordingly.
- b->buffer_->FreeSpace(blk_count);
- }
-
- // We may have opened up space (or entered a read only state),
- // so signal the producer queue.
- cnd_signal(&b->work_completed_);
- }
-
- // Before waiting, we should check if we're unmounting.
- // If work still remains in the work or producer queues,
- // continue the loop until they are empty.
- if (b->unmounting_ && b->work_queue_.is_empty() && b->producer_queue_.is_empty()) {
- ZX_DEBUG_ASSERT(b->work_queue_.is_empty());
- ZX_DEBUG_ASSERT(b->producer_queue_.is_empty());
- b->state_ = WritebackState::kComplete;
- b->lock_.Release();
- return 0;
- }
-
- cnd_wait(&b->work_added_, b->lock_.GetInternal());
- }
-}
-
-zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work,
- TransactionManager* transaction_manager, Blob* vn,
- const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks) {
- const size_t kMaxChunkBlocks = (3 * transaction_manager->WritebackCapacity()) / 4;
- uint64_t delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
- while (nblocks > 0) {
- if ((*work)->BlkCount() + delta_blocks > kMaxChunkBlocks) {
- // If enqueueing these blocks could push us past the writeback buffer capacity
- // when combined with all previous writes, break this transaction into a smaller
- // chunk first.
- fbl::unique_ptr<WritebackWork> tmp;
- zx_status_t status = transaction_manager->CreateWork(&tmp, vn);
- if (status != ZX_OK) {
- return status;
- }
- if ((status = transaction_manager->EnqueueWork(std::move(*work),
- EnqueueType::kData)) != ZX_OK) {
- return status;
- }
- *work = std::move(tmp);
- }
-
- (*work)->Enqueue(vmo, relative_block, absolute_block, delta_blocks);
- relative_block += delta_blocks;
- absolute_block += delta_blocks;
- nblocks -= delta_blocks;
- delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
- }
- return ZX_OK;
-}
-
} // namespace blobfs
diff --git a/zircon/system/ulib/blobfs/include/blobfs/buffer.h b/zircon/system/ulib/blobfs/include/blobfs/buffer.h
index a51e8d6..41818c5 100644
--- a/zircon/system/ulib/blobfs/include/blobfs/buffer.h
+++ b/zircon/system/ulib/blobfs/include/blobfs/buffer.h
@@ -10,157 +10,12 @@
#include <utility>
-#include <blobfs/allocator.h>
-#include <blobfs/format.h>
-#include <blobfs/metrics.h>
#include <blobfs/transaction-manager.h>
-#include <fbl/algorithm.h>
-#include <fbl/auto_lock.h>
-#include <fbl/intrusive_hash_table.h>
-#include <fbl/intrusive_single_list.h>
-#include <fbl/macros.h>
-#include <fbl/mutex.h>
-#include <fbl/ref_ptr.h>
-#include <fbl/unique_ptr.h>
-#include <fbl/vector.h>
-#include <fs/block-txn.h>
-#include <fs/queue.h>
-#include <fs/vfs.h>
-#include <fs/vnode.h>
-#include <lib/sync/completion.h>
+#include <blobfs/write-txn.h>
#include <lib/fzl/owned-vmo-mapper.h>
-#include <lib/zx/vmo.h>
namespace blobfs {
-struct WriteRequest {
- zx_handle_t vmo;
- size_t vmo_offset;
- size_t dev_offset;
- size_t length;
-};
-
-enum class WritebackState {
- kInit, // Initial state of a writeback queue.
- kReady, // Indicates the queue is ready to start running.
- kRunning, // Indicates that the queue's async processor is currently running.
- kReadOnly, // State of a writeback queue which no longer allows writes.
- kComplete, // Indicates that the async processor has been torn down.
-};
-
-// A transaction consisting of enqueued VMOs to be written
-// out to disk at specified locations.
-class WriteTxn {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(WriteTxn);
-
- explicit WriteTxn(TransactionManager* transaction_manager)
- : transaction_manager_(transaction_manager), vmoid_(VMOID_INVALID), block_count_(0) {}
-
- virtual ~WriteTxn();
-
- // Identifies that |nblocks| blocks of data starting at |relative_block| within the |vmo|
- // should be written out to |absolute_block| on disk at a later point in time.
- void Enqueue(const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks);
-
- fbl::Vector<WriteRequest>& Requests() { return requests_; }
-
- // Returns the first block at which this WriteTxn exists within its VMO buffer.
- // Requires all requests within the transaction to have been copied to a single buffer.
- size_t BlkStart() const;
-
- // Returns the total number of blocks in all requests within the WriteTxn. This number is
- // calculated at call time, unless the WriteTxn has already been fully buffered, at which point
- // the final |block_count_| is set. This is then returned for all subsequent calls to BlkCount.
- size_t BlkCount() const;
-
- bool IsBuffered() const {
- return vmoid_ != VMOID_INVALID;
- }
-
- // Sets the source buffer for the WriteTxn to |vmoid|.
- void SetBuffer(vmoid_t vmoid);
-
- // Checks if the WriteTxn vmoid_ matches |vmoid|.
- bool CheckBuffer(vmoid_t vmoid) const {
- return vmoid_ == vmoid;
- }
-
- // Resets the transaction's state.
- void Reset() {
- requests_.reset();
- vmoid_ = VMOID_INVALID;
- }
-
-protected:
- // Activates the transaction.
- zx_status_t Flush();
-
-private:
- TransactionManager* transaction_manager_;
- vmoid_t vmoid_;
- fbl::Vector<WriteRequest> requests_;
- size_t block_count_;
-};
-
-// A wrapper around a WriteTxn with added support for callback invocation on completion.
-class WritebackWork : public WriteTxn,
- public fbl::SinglyLinkedListable<fbl::unique_ptr<WritebackWork>> {
-public:
- using ReadyCallback = fbl::Function<bool()>;
- using SyncCallback = fs::Vnode::SyncCallback;
-
- WritebackWork(TransactionManager* transaction_manager);
- virtual ~WritebackWork() = default;
-
- // Sets the WritebackWork to a completed state. |status| should indicate whether the work was
- // completed successfully.
- void MarkCompleted(zx_status_t status);
-
- // Returns true if the WritebackWork is "ready" to be processed. This is always true unless a
- // "ready callback" exists, in which case that callback determines the state of readiness. Once
- // a positive response is received, the ready callback is destroyed - the WritebackWork will
- // always be ready from this point forward.
- bool IsReady();
-
- // Adds a callback to the WritebackWork to be called before the WritebackWork is completed,
- // to ensure that it's ready for writeback.
- //
- // Only one ready callback may be set for each WritebackWork unit.
- void SetReadyCallback(ReadyCallback callback);
-
- // Adds a callback to the WritebackWork, such that it will be signalled when the WritebackWork
- // is flushed to disk. If no callback is set, nothing will get signalled.
- //
- // Multiple callbacks may be set. They are invoked in "first-in, last-out" order (i.e.,
- // enqueueing A, B, C will invoke C, B, A).
- void SetSyncCallback(SyncCallback callback);
-
- // Persists the enqueued work to disk,
- // and resets the WritebackWork to its initial state.
- zx_status_t Complete();
-
-private:
- // Optional callbacks.
- ReadyCallback ready_cb_; // Call to check whether work is ready to be processed.
- SyncCallback sync_cb_; // Call after work has been completely flushed.
-};
-
-// An object compatible with the WritebackWork interface, which contains a single blob reference.
-// When the writeback is completed, this reference will go out of scope.
-//
-// This class helps WritebackWork avoid concurrent writes and reads to blobs: if a BlobWork
-// is alive, the impacted Blob is still alive.
-class BlobWork : public WritebackWork {
-public:
- BlobWork(TransactionManager* transaction_manager, fbl::RefPtr<Blob> vnode)
- : WritebackWork(transaction_manager), vnode_(std::move(vnode)) {}
-
-private:
- fbl::RefPtr<Blob> vnode_;
-};
-
// In-memory data buffer.
// This class is thread-compatible.
class Buffer {
@@ -171,7 +26,7 @@
// Initializes the buffer VMO with |blocks| blocks of size kBlobfsBlockSize.
static zx_status_t Create(TransactionManager* transaction_manager, const size_t blocks,
- const char* label, fbl::unique_ptr<Buffer>* out);
+ const char* label, std::unique_ptr<Buffer>* out);
// Adds a transaction to |txn| which reads all data into buffer
// starting from |disk_start| on disk.
@@ -239,91 +94,4 @@
const size_t capacity_;
};
-// Manages an in-memory writeback buffer (and background thread,
-// which flushes this buffer out to disk).
-class WritebackQueue {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(WritebackQueue);
-
- ~WritebackQueue();
-
- // Initializes the WritebackBuffer at |out|
- // with a buffer of |buffer_blocks| blocks of size kBlobfsBlockSize.
- static zx_status_t Create(TransactionManager* transaction_manager, const size_t buffer_blocks,
- fbl::unique_ptr<WritebackQueue>* out);
-
- // Copies all transaction data referenced from |work| into the writeback buffer.
- zx_status_t Enqueue(fbl::unique_ptr<WritebackWork> work);
-
- bool IsReadOnly() const __TA_REQUIRES(lock_) { return state_ == WritebackState::kReadOnly; }
-
- size_t GetCapacity() const { return buffer_->capacity(); }
-
- // Stops the asynchronous queue processor.
- zx_status_t Teardown();
-private:
- // The waiter struct may be used as a stack-allocated queue for producers.
- // It allows them to take turns putting data into the buffer when it is
- // mostly full.
- struct Waiter : public fbl::SinglyLinkedListable<Waiter*> {};
- using ProducerQueue = fs::Queue<Waiter*>;
- using WorkQueue = fs::Queue<fbl::unique_ptr<WritebackWork>>;
-
- WritebackQueue(fbl::unique_ptr<Buffer> buffer) : buffer_(std::move(buffer)) {}
-
- bool IsRunning() const __TA_REQUIRES(lock_);
-
- // Blocks until |blocks| blocks of data are free for the caller.
- // Doesn't actually allocate any space.
- void EnsureSpaceLocked(size_t blocks) __TA_REQUIRES(lock_);
-
- // Thread which asynchronously processes transactions.
- static int WritebackThread(void* arg);
-
- // Signalled when the writeback buffer has space to add txns.
- cnd_t work_completed_;
- // Signalled when the writeback buffer can be consumed by the background thread.
- cnd_t work_added_;
-
- // Work associated with the "writeback" thread, which manages work items,
- // and flushes them to disk. This thread acts as a consumer of the
- // writeback buffer.
- thrd_t worker_;
-
- // Use to lock resources that may be accessed asynchronously.
- fbl::Mutex lock_;
-
- // Buffer which stores transactions to be written out to disk.
- fbl::unique_ptr<Buffer> buffer_;
-
- bool unmounting_ __TA_GUARDED(lock_) = false;
-
- // The WritebackQueue will start off in a kInit state, and will change to kRunning when the
- // background thread is brought up. Once it is running, if an error is detected during
- // writeback, the queue is converted to kReadOnly, and no further writes are permitted.
- WritebackState state_ __TA_GUARDED(lock_) = WritebackState::kInit;
-
- // Tracks all the pending Writeback Work operations which exist in the
- // writeback buffer and are ready to be sent to disk.
- WorkQueue work_queue_ __TA_GUARDED(lock_){};
-
- // Ensures that if multiple producers are waiting for space to write their
- // transactions into the writeback buffer, they can each write in-order.
- ProducerQueue producer_queue_ __TA_GUARDED(lock_);
-};
-
-// A wrapper around "Enqueue" for content which risks being larger
-// than the writeback buffer.
-//
-// For content which is smaller than 3/4 the size of the writeback buffer: the
-// content is enqueued to |work| without flushing.
-//
-// For content which is larger than 3/4 the size of the writeback buffer: flush
-// the data by enqueueing it to the writeback thread in chunks until the
-// remainder is small enough to comfortably fit within the writeback buffer.
-zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work,
- TransactionManager* transaction_manager, Blob* vn,
- const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks);
-
} // namespace blobfs
diff --git a/zircon/system/ulib/blobfs/include/blobfs/journal.h b/zircon/system/ulib/blobfs/include/blobfs/journal.h
index 021fe6e..9a68613 100644
--- a/zircon/system/ulib/blobfs/include/blobfs/journal.h
+++ b/zircon/system/ulib/blobfs/include/blobfs/journal.h
@@ -13,10 +13,10 @@
#include <stdint.h>
-#include <blobfs/blob.h>
#include <blobfs/format.h>
#include <blobfs/transaction-manager.h>
#include <blobfs/writeback.h>
+#include <blobfs/writeback-queue.h>
#include <fbl/intrusive_single_list.h>
#include <fbl/mutex.h>
#include <fbl/unique_ptr.h>
diff --git a/zircon/system/ulib/blobfs/include/blobfs/write-txn.h b/zircon/system/ulib/blobfs/include/blobfs/write-txn.h
index a51e8d6..de59a10 100644
--- a/zircon/system/ulib/blobfs/include/blobfs/write-txn.h
+++ b/zircon/system/ulib/blobfs/include/blobfs/write-txn.h
@@ -10,25 +10,8 @@
#include <utility>
-#include <blobfs/allocator.h>
-#include <blobfs/format.h>
-#include <blobfs/metrics.h>
#include <blobfs/transaction-manager.h>
-#include <fbl/algorithm.h>
-#include <fbl/auto_lock.h>
-#include <fbl/intrusive_hash_table.h>
-#include <fbl/intrusive_single_list.h>
-#include <fbl/macros.h>
-#include <fbl/mutex.h>
-#include <fbl/ref_ptr.h>
-#include <fbl/unique_ptr.h>
#include <fbl/vector.h>
-#include <fs/block-txn.h>
-#include <fs/queue.h>
-#include <fs/vfs.h>
-#include <fs/vnode.h>
-#include <lib/sync/completion.h>
-#include <lib/fzl/owned-vmo-mapper.h>
#include <lib/zx/vmo.h>
namespace blobfs {
@@ -40,14 +23,6 @@
size_t length;
};
-enum class WritebackState {
- kInit, // Initial state of a writeback queue.
- kReady, // Indicates the queue is ready to start running.
- kRunning, // Indicates that the queue's async processor is currently running.
- kReadOnly, // State of a writeback queue which no longer allows writes.
- kComplete, // Indicates that the async processor has been torn down.
-};
-
// A transaction consisting of enqueued VMOs to be written
// out to disk at specified locations.
class WriteTxn {
@@ -104,226 +79,4 @@
size_t block_count_;
};
-// A wrapper around a WriteTxn with added support for callback invocation on completion.
-class WritebackWork : public WriteTxn,
- public fbl::SinglyLinkedListable<fbl::unique_ptr<WritebackWork>> {
-public:
- using ReadyCallback = fbl::Function<bool()>;
- using SyncCallback = fs::Vnode::SyncCallback;
-
- WritebackWork(TransactionManager* transaction_manager);
- virtual ~WritebackWork() = default;
-
- // Sets the WritebackWork to a completed state. |status| should indicate whether the work was
- // completed successfully.
- void MarkCompleted(zx_status_t status);
-
- // Returns true if the WritebackWork is "ready" to be processed. This is always true unless a
- // "ready callback" exists, in which case that callback determines the state of readiness. Once
- // a positive response is received, the ready callback is destroyed - the WritebackWork will
- // always be ready from this point forward.
- bool IsReady();
-
- // Adds a callback to the WritebackWork to be called before the WritebackWork is completed,
- // to ensure that it's ready for writeback.
- //
- // Only one ready callback may be set for each WritebackWork unit.
- void SetReadyCallback(ReadyCallback callback);
-
- // Adds a callback to the WritebackWork, such that it will be signalled when the WritebackWork
- // is flushed to disk. If no callback is set, nothing will get signalled.
- //
- // Multiple callbacks may be set. They are invoked in "first-in, last-out" order (i.e.,
- // enqueueing A, B, C will invoke C, B, A).
- void SetSyncCallback(SyncCallback callback);
-
- // Persists the enqueued work to disk,
- // and resets the WritebackWork to its initial state.
- zx_status_t Complete();
-
-private:
- // Optional callbacks.
- ReadyCallback ready_cb_; // Call to check whether work is ready to be processed.
- SyncCallback sync_cb_; // Call after work has been completely flushed.
-};
-
-// An object compatible with the WritebackWork interface, which contains a single blob reference.
-// When the writeback is completed, this reference will go out of scope.
-//
-// This class helps WritebackWork avoid concurrent writes and reads to blobs: if a BlobWork
-// is alive, the impacted Blob is still alive.
-class BlobWork : public WritebackWork {
-public:
- BlobWork(TransactionManager* transaction_manager, fbl::RefPtr<Blob> vnode)
- : WritebackWork(transaction_manager), vnode_(std::move(vnode)) {}
-
-private:
- fbl::RefPtr<Blob> vnode_;
-};
-
-// In-memory data buffer.
-// This class is thread-compatible.
-class Buffer {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(Buffer);
-
- ~Buffer();
-
- // Initializes the buffer VMO with |blocks| blocks of size kBlobfsBlockSize.
- static zx_status_t Create(TransactionManager* transaction_manager, const size_t blocks,
- const char* label, fbl::unique_ptr<Buffer>* out);
-
- // Adds a transaction to |txn| which reads all data into buffer
- // starting from |disk_start| on disk.
- void Load(fs::ReadTxn* txn, size_t disk_start) {
- txn->Enqueue(vmoid_, 0, disk_start, capacity_);
- }
-
- // Returns true if there is space available for |blocks| blocks within the buffer.
- bool IsSpaceAvailable(size_t blocks) const;
-
- // Copies a write transaction to the buffer.
- // Also updates the in-memory offsets of the WriteTxn's requests so they point
- // to the correct offsets in the in-memory buffer instead of their original VMOs.
- //
- // |IsSpaceAvailable| should be called before invoking this function to
- // safely guarantee that space exists within the buffer.
- void CopyTransaction(WriteTxn* txn);
-
- // Adds a transaction to |work| with buffer offset |start| and length |length|,
- // starting at block |disk_start| on disk.
- void AddTransaction(size_t start, size_t disk_start, size_t length, WritebackWork* work);
-
- // Returns true if |txn| belongs to this buffer, and if so verifies
- // that it owns the next valid set of blocks within the buffer.
- bool VerifyTransaction(WriteTxn* txn) const;
-
- // Given a transaction |txn|, verifies that all requests belong to this buffer
- // and then sets the transaction's buffer accordingly (if it is not already set).
- void ValidateTransaction(WriteTxn* txn);
-
- // Frees the first |blocks| blocks in the buffer.
- void FreeSpace(size_t blocks);
-
- // Frees all space within the buffer.
- void FreeAllSpace() {
- FreeSpace(length_);
- }
-
- size_t start() const { return start_; }
- size_t length() const { return length_; }
- size_t capacity() const { return capacity_; }
-
- // Reserves the next index in the buffer.
- size_t ReserveIndex() {
- return (start_ + length_++) % capacity_;
- }
-
- // Returns data starting at block |index| in the buffer.
- void* MutableData(size_t index) {
- ZX_DEBUG_ASSERT(index < capacity_);
- return reinterpret_cast<char*>(mapper_.start()) + (index * kBlobfsBlockSize);
- }
-private:
- Buffer(TransactionManager* transaction_manager, fzl::OwnedVmoMapper mapper)
- : transaction_manager_(transaction_manager), mapper_(std::move(mapper)), start_(0),
- length_(0), capacity_(mapper_.size() / kBlobfsBlockSize) {}
-
- TransactionManager* transaction_manager_;
- fzl::OwnedVmoMapper mapper_;
- vmoid_t vmoid_ = VMOID_INVALID;
-
- // The units of all the following are "Blobfs blocks".
- size_t start_ = 0;
- size_t length_ = 0;
- const size_t capacity_;
-};
-
-// Manages an in-memory writeback buffer (and background thread,
-// which flushes this buffer out to disk).
-class WritebackQueue {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(WritebackQueue);
-
- ~WritebackQueue();
-
- // Initializes the WritebackBuffer at |out|
- // with a buffer of |buffer_blocks| blocks of size kBlobfsBlockSize.
- static zx_status_t Create(TransactionManager* transaction_manager, const size_t buffer_blocks,
- fbl::unique_ptr<WritebackQueue>* out);
-
- // Copies all transaction data referenced from |work| into the writeback buffer.
- zx_status_t Enqueue(fbl::unique_ptr<WritebackWork> work);
-
- bool IsReadOnly() const __TA_REQUIRES(lock_) { return state_ == WritebackState::kReadOnly; }
-
- size_t GetCapacity() const { return buffer_->capacity(); }
-
- // Stops the asynchronous queue processor.
- zx_status_t Teardown();
-private:
- // The waiter struct may be used as a stack-allocated queue for producers.
- // It allows them to take turns putting data into the buffer when it is
- // mostly full.
- struct Waiter : public fbl::SinglyLinkedListable<Waiter*> {};
- using ProducerQueue = fs::Queue<Waiter*>;
- using WorkQueue = fs::Queue<fbl::unique_ptr<WritebackWork>>;
-
- WritebackQueue(fbl::unique_ptr<Buffer> buffer) : buffer_(std::move(buffer)) {}
-
- bool IsRunning() const __TA_REQUIRES(lock_);
-
- // Blocks until |blocks| blocks of data are free for the caller.
- // Doesn't actually allocate any space.
- void EnsureSpaceLocked(size_t blocks) __TA_REQUIRES(lock_);
-
- // Thread which asynchronously processes transactions.
- static int WritebackThread(void* arg);
-
- // Signalled when the writeback buffer has space to add txns.
- cnd_t work_completed_;
- // Signalled when the writeback buffer can be consumed by the background thread.
- cnd_t work_added_;
-
- // Work associated with the "writeback" thread, which manages work items,
- // and flushes them to disk. This thread acts as a consumer of the
- // writeback buffer.
- thrd_t worker_;
-
- // Use to lock resources that may be accessed asynchronously.
- fbl::Mutex lock_;
-
- // Buffer which stores transactions to be written out to disk.
- fbl::unique_ptr<Buffer> buffer_;
-
- bool unmounting_ __TA_GUARDED(lock_) = false;
-
- // The WritebackQueue will start off in a kInit state, and will change to kRunning when the
- // background thread is brought up. Once it is running, if an error is detected during
- // writeback, the queue is converted to kReadOnly, and no further writes are permitted.
- WritebackState state_ __TA_GUARDED(lock_) = WritebackState::kInit;
-
- // Tracks all the pending Writeback Work operations which exist in the
- // writeback buffer and are ready to be sent to disk.
- WorkQueue work_queue_ __TA_GUARDED(lock_){};
-
- // Ensures that if multiple producers are waiting for space to write their
- // transactions into the writeback buffer, they can each write in-order.
- ProducerQueue producer_queue_ __TA_GUARDED(lock_);
-};
-
-// A wrapper around "Enqueue" for content which risks being larger
-// than the writeback buffer.
-//
-// For content which is smaller than 3/4 the size of the writeback buffer: the
-// content is enqueued to |work| without flushing.
-//
-// For content which is larger than 3/4 the size of the writeback buffer: flush
-// the data by enqueueing it to the writeback thread in chunks until the
-// remainder is small enough to comfortably fit within the writeback buffer.
-zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work,
- TransactionManager* transaction_manager, Blob* vn,
- const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks);
-
} // namespace blobfs
diff --git a/zircon/system/ulib/blobfs/include/blobfs/writeback-queue.h b/zircon/system/ulib/blobfs/include/blobfs/writeback-queue.h
index a51e8d6..f022993 100644
--- a/zircon/system/ulib/blobfs/include/blobfs/writeback-queue.h
+++ b/zircon/system/ulib/blobfs/include/blobfs/writeback-queue.h
@@ -10,36 +10,15 @@
#include <utility>
-#include <blobfs/allocator.h>
-#include <blobfs/format.h>
-#include <blobfs/metrics.h>
+#include <blobfs/buffer.h>
#include <blobfs/transaction-manager.h>
-#include <fbl/algorithm.h>
-#include <fbl/auto_lock.h>
-#include <fbl/intrusive_hash_table.h>
#include <fbl/intrusive_single_list.h>
-#include <fbl/macros.h>
#include <fbl/mutex.h>
-#include <fbl/ref_ptr.h>
-#include <fbl/unique_ptr.h>
-#include <fbl/vector.h>
#include <fs/block-txn.h>
#include <fs/queue.h>
-#include <fs/vfs.h>
-#include <fs/vnode.h>
-#include <lib/sync/completion.h>
-#include <lib/fzl/owned-vmo-mapper.h>
-#include <lib/zx/vmo.h>
namespace blobfs {
-struct WriteRequest {
- zx_handle_t vmo;
- size_t vmo_offset;
- size_t dev_offset;
- size_t length;
-};
-
enum class WritebackState {
kInit, // Initial state of a writeback queue.
kReady, // Indicates the queue is ready to start running.
@@ -48,197 +27,6 @@
kComplete, // Indicates that the async processor has been torn down.
};
-// A transaction consisting of enqueued VMOs to be written
-// out to disk at specified locations.
-class WriteTxn {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(WriteTxn);
-
- explicit WriteTxn(TransactionManager* transaction_manager)
- : transaction_manager_(transaction_manager), vmoid_(VMOID_INVALID), block_count_(0) {}
-
- virtual ~WriteTxn();
-
- // Identifies that |nblocks| blocks of data starting at |relative_block| within the |vmo|
- // should be written out to |absolute_block| on disk at a later point in time.
- void Enqueue(const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks);
-
- fbl::Vector<WriteRequest>& Requests() { return requests_; }
-
- // Returns the first block at which this WriteTxn exists within its VMO buffer.
- // Requires all requests within the transaction to have been copied to a single buffer.
- size_t BlkStart() const;
-
- // Returns the total number of blocks in all requests within the WriteTxn. This number is
- // calculated at call time, unless the WriteTxn has already been fully buffered, at which point
- // the final |block_count_| is set. This is then returned for all subsequent calls to BlkCount.
- size_t BlkCount() const;
-
- bool IsBuffered() const {
- return vmoid_ != VMOID_INVALID;
- }
-
- // Sets the source buffer for the WriteTxn to |vmoid|.
- void SetBuffer(vmoid_t vmoid);
-
- // Checks if the WriteTxn vmoid_ matches |vmoid|.
- bool CheckBuffer(vmoid_t vmoid) const {
- return vmoid_ == vmoid;
- }
-
- // Resets the transaction's state.
- void Reset() {
- requests_.reset();
- vmoid_ = VMOID_INVALID;
- }
-
-protected:
- // Activates the transaction.
- zx_status_t Flush();
-
-private:
- TransactionManager* transaction_manager_;
- vmoid_t vmoid_;
- fbl::Vector<WriteRequest> requests_;
- size_t block_count_;
-};
-
-// A wrapper around a WriteTxn with added support for callback invocation on completion.
-class WritebackWork : public WriteTxn,
- public fbl::SinglyLinkedListable<fbl::unique_ptr<WritebackWork>> {
-public:
- using ReadyCallback = fbl::Function<bool()>;
- using SyncCallback = fs::Vnode::SyncCallback;
-
- WritebackWork(TransactionManager* transaction_manager);
- virtual ~WritebackWork() = default;
-
- // Sets the WritebackWork to a completed state. |status| should indicate whether the work was
- // completed successfully.
- void MarkCompleted(zx_status_t status);
-
- // Returns true if the WritebackWork is "ready" to be processed. This is always true unless a
- // "ready callback" exists, in which case that callback determines the state of readiness. Once
- // a positive response is received, the ready callback is destroyed - the WritebackWork will
- // always be ready from this point forward.
- bool IsReady();
-
- // Adds a callback to the WritebackWork to be called before the WritebackWork is completed,
- // to ensure that it's ready for writeback.
- //
- // Only one ready callback may be set for each WritebackWork unit.
- void SetReadyCallback(ReadyCallback callback);
-
- // Adds a callback to the WritebackWork, such that it will be signalled when the WritebackWork
- // is flushed to disk. If no callback is set, nothing will get signalled.
- //
- // Multiple callbacks may be set. They are invoked in "first-in, last-out" order (i.e.,
- // enqueueing A, B, C will invoke C, B, A).
- void SetSyncCallback(SyncCallback callback);
-
- // Persists the enqueued work to disk,
- // and resets the WritebackWork to its initial state.
- zx_status_t Complete();
-
-private:
- // Optional callbacks.
- ReadyCallback ready_cb_; // Call to check whether work is ready to be processed.
- SyncCallback sync_cb_; // Call after work has been completely flushed.
-};
-
-// An object compatible with the WritebackWork interface, which contains a single blob reference.
-// When the writeback is completed, this reference will go out of scope.
-//
-// This class helps WritebackWork avoid concurrent writes and reads to blobs: if a BlobWork
-// is alive, the impacted Blob is still alive.
-class BlobWork : public WritebackWork {
-public:
- BlobWork(TransactionManager* transaction_manager, fbl::RefPtr<Blob> vnode)
- : WritebackWork(transaction_manager), vnode_(std::move(vnode)) {}
-
-private:
- fbl::RefPtr<Blob> vnode_;
-};
-
-// In-memory data buffer.
-// This class is thread-compatible.
-class Buffer {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(Buffer);
-
- ~Buffer();
-
- // Initializes the buffer VMO with |blocks| blocks of size kBlobfsBlockSize.
- static zx_status_t Create(TransactionManager* transaction_manager, const size_t blocks,
- const char* label, fbl::unique_ptr<Buffer>* out);
-
- // Adds a transaction to |txn| which reads all data into buffer
- // starting from |disk_start| on disk.
- void Load(fs::ReadTxn* txn, size_t disk_start) {
- txn->Enqueue(vmoid_, 0, disk_start, capacity_);
- }
-
- // Returns true if there is space available for |blocks| blocks within the buffer.
- bool IsSpaceAvailable(size_t blocks) const;
-
- // Copies a write transaction to the buffer.
- // Also updates the in-memory offsets of the WriteTxn's requests so they point
- // to the correct offsets in the in-memory buffer instead of their original VMOs.
- //
- // |IsSpaceAvailable| should be called before invoking this function to
- // safely guarantee that space exists within the buffer.
- void CopyTransaction(WriteTxn* txn);
-
- // Adds a transaction to |work| with buffer offset |start| and length |length|,
- // starting at block |disk_start| on disk.
- void AddTransaction(size_t start, size_t disk_start, size_t length, WritebackWork* work);
-
- // Returns true if |txn| belongs to this buffer, and if so verifies
- // that it owns the next valid set of blocks within the buffer.
- bool VerifyTransaction(WriteTxn* txn) const;
-
- // Given a transaction |txn|, verifies that all requests belong to this buffer
- // and then sets the transaction's buffer accordingly (if it is not already set).
- void ValidateTransaction(WriteTxn* txn);
-
- // Frees the first |blocks| blocks in the buffer.
- void FreeSpace(size_t blocks);
-
- // Frees all space within the buffer.
- void FreeAllSpace() {
- FreeSpace(length_);
- }
-
- size_t start() const { return start_; }
- size_t length() const { return length_; }
- size_t capacity() const { return capacity_; }
-
- // Reserves the next index in the buffer.
- size_t ReserveIndex() {
- return (start_ + length_++) % capacity_;
- }
-
- // Returns data starting at block |index| in the buffer.
- void* MutableData(size_t index) {
- ZX_DEBUG_ASSERT(index < capacity_);
- return reinterpret_cast<char*>(mapper_.start()) + (index * kBlobfsBlockSize);
- }
-private:
- Buffer(TransactionManager* transaction_manager, fzl::OwnedVmoMapper mapper)
- : transaction_manager_(transaction_manager), mapper_(std::move(mapper)), start_(0),
- length_(0), capacity_(mapper_.size() / kBlobfsBlockSize) {}
-
- TransactionManager* transaction_manager_;
- fzl::OwnedVmoMapper mapper_;
- vmoid_t vmoid_ = VMOID_INVALID;
-
- // The units of all the following are "Blobfs blocks".
- size_t start_ = 0;
- size_t length_ = 0;
- const size_t capacity_;
-};
-
// Manages an in-memory writeback buffer (and background thread,
// which flushes this buffer out to disk).
class WritebackQueue {
@@ -250,10 +38,10 @@
// Initializes the WritebackBuffer at |out|
// with a buffer of |buffer_blocks| blocks of size kBlobfsBlockSize.
static zx_status_t Create(TransactionManager* transaction_manager, const size_t buffer_blocks,
- fbl::unique_ptr<WritebackQueue>* out);
+ std::unique_ptr<WritebackQueue>* out);
// Copies all transaction data referenced from |work| into the writeback buffer.
- zx_status_t Enqueue(fbl::unique_ptr<WritebackWork> work);
+ zx_status_t Enqueue(std::unique_ptr<WritebackWork> work);
bool IsReadOnly() const __TA_REQUIRES(lock_) { return state_ == WritebackState::kReadOnly; }
@@ -267,9 +55,9 @@
// mostly full.
struct Waiter : public fbl::SinglyLinkedListable<Waiter*> {};
using ProducerQueue = fs::Queue<Waiter*>;
- using WorkQueue = fs::Queue<fbl::unique_ptr<WritebackWork>>;
+ using WorkQueue = fs::Queue<std::unique_ptr<WritebackWork>>;
- WritebackQueue(fbl::unique_ptr<Buffer> buffer) : buffer_(std::move(buffer)) {}
+ WritebackQueue(std::unique_ptr<Buffer> buffer) : buffer_(std::move(buffer)) {}
bool IsRunning() const __TA_REQUIRES(lock_);
@@ -294,7 +82,7 @@
fbl::Mutex lock_;
// Buffer which stores transactions to be written out to disk.
- fbl::unique_ptr<Buffer> buffer_;
+ std::unique_ptr<Buffer> buffer_;
bool unmounting_ __TA_GUARDED(lock_) = false;
@@ -312,18 +100,4 @@
ProducerQueue producer_queue_ __TA_GUARDED(lock_);
};
-// A wrapper around "Enqueue" for content which risks being larger
-// than the writeback buffer.
-//
-// For content which is smaller than 3/4 the size of the writeback buffer: the
-// content is enqueued to |work| without flushing.
-//
-// For content which is larger than 3/4 the size of the writeback buffer: flush
-// the data by enqueueing it to the writeback thread in chunks until the
-// remainder is small enough to comfortably fit within the writeback buffer.
-zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work,
- TransactionManager* transaction_manager, Blob* vn,
- const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks);
-
} // namespace blobfs
diff --git a/zircon/system/ulib/blobfs/include/blobfs/writeback-work.h b/zircon/system/ulib/blobfs/include/blobfs/writeback-work.h
index a51e8d6..84dfba4 100644
--- a/zircon/system/ulib/blobfs/include/blobfs/writeback-work.h
+++ b/zircon/system/ulib/blobfs/include/blobfs/writeback-work.h
@@ -10,103 +10,17 @@
#include <utility>
-#include <blobfs/allocator.h>
-#include <blobfs/format.h>
-#include <blobfs/metrics.h>
#include <blobfs/transaction-manager.h>
-#include <fbl/algorithm.h>
-#include <fbl/auto_lock.h>
-#include <fbl/intrusive_hash_table.h>
+#include <blobfs/write-txn.h>
+#include <fbl/function.h>
#include <fbl/intrusive_single_list.h>
-#include <fbl/macros.h>
-#include <fbl/mutex.h>
-#include <fbl/ref_ptr.h>
-#include <fbl/unique_ptr.h>
-#include <fbl/vector.h>
-#include <fs/block-txn.h>
-#include <fs/queue.h>
#include <fs/vfs.h>
-#include <fs/vnode.h>
-#include <lib/sync/completion.h>
-#include <lib/fzl/owned-vmo-mapper.h>
-#include <lib/zx/vmo.h>
namespace blobfs {
-struct WriteRequest {
- zx_handle_t vmo;
- size_t vmo_offset;
- size_t dev_offset;
- size_t length;
-};
-
-enum class WritebackState {
- kInit, // Initial state of a writeback queue.
- kReady, // Indicates the queue is ready to start running.
- kRunning, // Indicates that the queue's async processor is currently running.
- kReadOnly, // State of a writeback queue which no longer allows writes.
- kComplete, // Indicates that the async processor has been torn down.
-};
-
-// A transaction consisting of enqueued VMOs to be written
-// out to disk at specified locations.
-class WriteTxn {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(WriteTxn);
-
- explicit WriteTxn(TransactionManager* transaction_manager)
- : transaction_manager_(transaction_manager), vmoid_(VMOID_INVALID), block_count_(0) {}
-
- virtual ~WriteTxn();
-
- // Identifies that |nblocks| blocks of data starting at |relative_block| within the |vmo|
- // should be written out to |absolute_block| on disk at a later point in time.
- void Enqueue(const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks);
-
- fbl::Vector<WriteRequest>& Requests() { return requests_; }
-
- // Returns the first block at which this WriteTxn exists within its VMO buffer.
- // Requires all requests within the transaction to have been copied to a single buffer.
- size_t BlkStart() const;
-
- // Returns the total number of blocks in all requests within the WriteTxn. This number is
- // calculated at call time, unless the WriteTxn has already been fully buffered, at which point
- // the final |block_count_| is set. This is then returned for all subsequent calls to BlkCount.
- size_t BlkCount() const;
-
- bool IsBuffered() const {
- return vmoid_ != VMOID_INVALID;
- }
-
- // Sets the source buffer for the WriteTxn to |vmoid|.
- void SetBuffer(vmoid_t vmoid);
-
- // Checks if the WriteTxn vmoid_ matches |vmoid|.
- bool CheckBuffer(vmoid_t vmoid) const {
- return vmoid_ == vmoid;
- }
-
- // Resets the transaction's state.
- void Reset() {
- requests_.reset();
- vmoid_ = VMOID_INVALID;
- }
-
-protected:
- // Activates the transaction.
- zx_status_t Flush();
-
-private:
- TransactionManager* transaction_manager_;
- vmoid_t vmoid_;
- fbl::Vector<WriteRequest> requests_;
- size_t block_count_;
-};
-
// A wrapper around a WriteTxn with added support for callback invocation on completion.
class WritebackWork : public WriteTxn,
- public fbl::SinglyLinkedListable<fbl::unique_ptr<WritebackWork>> {
+ public fbl::SinglyLinkedListable<std::unique_ptr<WritebackWork>> {
public:
using ReadyCallback = fbl::Function<bool()>;
using SyncCallback = fs::Vnode::SyncCallback;
@@ -147,183 +61,4 @@
SyncCallback sync_cb_; // Call after work has been completely flushed.
};
-// An object compatible with the WritebackWork interface, which contains a single blob reference.
-// When the writeback is completed, this reference will go out of scope.
-//
-// This class helps WritebackWork avoid concurrent writes and reads to blobs: if a BlobWork
-// is alive, the impacted Blob is still alive.
-class BlobWork : public WritebackWork {
-public:
- BlobWork(TransactionManager* transaction_manager, fbl::RefPtr<Blob> vnode)
- : WritebackWork(transaction_manager), vnode_(std::move(vnode)) {}
-
-private:
- fbl::RefPtr<Blob> vnode_;
-};
-
-// In-memory data buffer.
-// This class is thread-compatible.
-class Buffer {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(Buffer);
-
- ~Buffer();
-
- // Initializes the buffer VMO with |blocks| blocks of size kBlobfsBlockSize.
- static zx_status_t Create(TransactionManager* transaction_manager, const size_t blocks,
- const char* label, fbl::unique_ptr<Buffer>* out);
-
- // Adds a transaction to |txn| which reads all data into buffer
- // starting from |disk_start| on disk.
- void Load(fs::ReadTxn* txn, size_t disk_start) {
- txn->Enqueue(vmoid_, 0, disk_start, capacity_);
- }
-
- // Returns true if there is space available for |blocks| blocks within the buffer.
- bool IsSpaceAvailable(size_t blocks) const;
-
- // Copies a write transaction to the buffer.
- // Also updates the in-memory offsets of the WriteTxn's requests so they point
- // to the correct offsets in the in-memory buffer instead of their original VMOs.
- //
- // |IsSpaceAvailable| should be called before invoking this function to
- // safely guarantee that space exists within the buffer.
- void CopyTransaction(WriteTxn* txn);
-
- // Adds a transaction to |work| with buffer offset |start| and length |length|,
- // starting at block |disk_start| on disk.
- void AddTransaction(size_t start, size_t disk_start, size_t length, WritebackWork* work);
-
- // Returns true if |txn| belongs to this buffer, and if so verifies
- // that it owns the next valid set of blocks within the buffer.
- bool VerifyTransaction(WriteTxn* txn) const;
-
- // Given a transaction |txn|, verifies that all requests belong to this buffer
- // and then sets the transaction's buffer accordingly (if it is not already set).
- void ValidateTransaction(WriteTxn* txn);
-
- // Frees the first |blocks| blocks in the buffer.
- void FreeSpace(size_t blocks);
-
- // Frees all space within the buffer.
- void FreeAllSpace() {
- FreeSpace(length_);
- }
-
- size_t start() const { return start_; }
- size_t length() const { return length_; }
- size_t capacity() const { return capacity_; }
-
- // Reserves the next index in the buffer.
- size_t ReserveIndex() {
- return (start_ + length_++) % capacity_;
- }
-
- // Returns data starting at block |index| in the buffer.
- void* MutableData(size_t index) {
- ZX_DEBUG_ASSERT(index < capacity_);
- return reinterpret_cast<char*>(mapper_.start()) + (index * kBlobfsBlockSize);
- }
-private:
- Buffer(TransactionManager* transaction_manager, fzl::OwnedVmoMapper mapper)
- : transaction_manager_(transaction_manager), mapper_(std::move(mapper)), start_(0),
- length_(0), capacity_(mapper_.size() / kBlobfsBlockSize) {}
-
- TransactionManager* transaction_manager_;
- fzl::OwnedVmoMapper mapper_;
- vmoid_t vmoid_ = VMOID_INVALID;
-
- // The units of all the following are "Blobfs blocks".
- size_t start_ = 0;
- size_t length_ = 0;
- const size_t capacity_;
-};
-
-// Manages an in-memory writeback buffer (and background thread,
-// which flushes this buffer out to disk).
-class WritebackQueue {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(WritebackQueue);
-
- ~WritebackQueue();
-
- // Initializes the WritebackBuffer at |out|
- // with a buffer of |buffer_blocks| blocks of size kBlobfsBlockSize.
- static zx_status_t Create(TransactionManager* transaction_manager, const size_t buffer_blocks,
- fbl::unique_ptr<WritebackQueue>* out);
-
- // Copies all transaction data referenced from |work| into the writeback buffer.
- zx_status_t Enqueue(fbl::unique_ptr<WritebackWork> work);
-
- bool IsReadOnly() const __TA_REQUIRES(lock_) { return state_ == WritebackState::kReadOnly; }
-
- size_t GetCapacity() const { return buffer_->capacity(); }
-
- // Stops the asynchronous queue processor.
- zx_status_t Teardown();
-private:
- // The waiter struct may be used as a stack-allocated queue for producers.
- // It allows them to take turns putting data into the buffer when it is
- // mostly full.
- struct Waiter : public fbl::SinglyLinkedListable<Waiter*> {};
- using ProducerQueue = fs::Queue<Waiter*>;
- using WorkQueue = fs::Queue<fbl::unique_ptr<WritebackWork>>;
-
- WritebackQueue(fbl::unique_ptr<Buffer> buffer) : buffer_(std::move(buffer)) {}
-
- bool IsRunning() const __TA_REQUIRES(lock_);
-
- // Blocks until |blocks| blocks of data are free for the caller.
- // Doesn't actually allocate any space.
- void EnsureSpaceLocked(size_t blocks) __TA_REQUIRES(lock_);
-
- // Thread which asynchronously processes transactions.
- static int WritebackThread(void* arg);
-
- // Signalled when the writeback buffer has space to add txns.
- cnd_t work_completed_;
- // Signalled when the writeback buffer can be consumed by the background thread.
- cnd_t work_added_;
-
- // Work associated with the "writeback" thread, which manages work items,
- // and flushes them to disk. This thread acts as a consumer of the
- // writeback buffer.
- thrd_t worker_;
-
- // Use to lock resources that may be accessed asynchronously.
- fbl::Mutex lock_;
-
- // Buffer which stores transactions to be written out to disk.
- fbl::unique_ptr<Buffer> buffer_;
-
- bool unmounting_ __TA_GUARDED(lock_) = false;
-
- // The WritebackQueue will start off in a kInit state, and will change to kRunning when the
- // background thread is brought up. Once it is running, if an error is detected during
- // writeback, the queue is converted to kReadOnly, and no further writes are permitted.
- WritebackState state_ __TA_GUARDED(lock_) = WritebackState::kInit;
-
- // Tracks all the pending Writeback Work operations which exist in the
- // writeback buffer and are ready to be sent to disk.
- WorkQueue work_queue_ __TA_GUARDED(lock_){};
-
- // Ensures that if multiple producers are waiting for space to write their
- // transactions into the writeback buffer, they can each write in-order.
- ProducerQueue producer_queue_ __TA_GUARDED(lock_);
-};
-
-// A wrapper around "Enqueue" for content which risks being larger
-// than the writeback buffer.
-//
-// For content which is smaller than 3/4 the size of the writeback buffer: the
-// content is enqueued to |work| without flushing.
-//
-// For content which is larger than 3/4 the size of the writeback buffer: flush
-// the data by enqueueing it to the writeback thread in chunks until the
-// remainder is small enough to comfortably fit within the writeback buffer.
-zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work,
- TransactionManager* transaction_manager, Blob* vn,
- const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks);
-
} // namespace blobfs
diff --git a/zircon/system/ulib/blobfs/include/blobfs/writeback.h b/zircon/system/ulib/blobfs/include/blobfs/writeback.h
index a51e8d6..90fbe59 100644
--- a/zircon/system/ulib/blobfs/include/blobfs/writeback.h
+++ b/zircon/system/ulib/blobfs/include/blobfs/writeback.h
@@ -10,143 +10,13 @@
#include <utility>
-#include <blobfs/allocator.h>
-#include <blobfs/format.h>
-#include <blobfs/metrics.h>
#include <blobfs/transaction-manager.h>
-#include <fbl/algorithm.h>
-#include <fbl/auto_lock.h>
-#include <fbl/intrusive_hash_table.h>
-#include <fbl/intrusive_single_list.h>
-#include <fbl/macros.h>
-#include <fbl/mutex.h>
+#include <blobfs/writeback-work.h>
#include <fbl/ref_ptr.h>
-#include <fbl/unique_ptr.h>
-#include <fbl/vector.h>
-#include <fs/block-txn.h>
-#include <fs/queue.h>
-#include <fs/vfs.h>
-#include <fs/vnode.h>
-#include <lib/sync/completion.h>
-#include <lib/fzl/owned-vmo-mapper.h>
#include <lib/zx/vmo.h>
namespace blobfs {
-struct WriteRequest {
- zx_handle_t vmo;
- size_t vmo_offset;
- size_t dev_offset;
- size_t length;
-};
-
-enum class WritebackState {
- kInit, // Initial state of a writeback queue.
- kReady, // Indicates the queue is ready to start running.
- kRunning, // Indicates that the queue's async processor is currently running.
- kReadOnly, // State of a writeback queue which no longer allows writes.
- kComplete, // Indicates that the async processor has been torn down.
-};
-
-// A transaction consisting of enqueued VMOs to be written
-// out to disk at specified locations.
-class WriteTxn {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(WriteTxn);
-
- explicit WriteTxn(TransactionManager* transaction_manager)
- : transaction_manager_(transaction_manager), vmoid_(VMOID_INVALID), block_count_(0) {}
-
- virtual ~WriteTxn();
-
- // Identifies that |nblocks| blocks of data starting at |relative_block| within the |vmo|
- // should be written out to |absolute_block| on disk at a later point in time.
- void Enqueue(const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks);
-
- fbl::Vector<WriteRequest>& Requests() { return requests_; }
-
- // Returns the first block at which this WriteTxn exists within its VMO buffer.
- // Requires all requests within the transaction to have been copied to a single buffer.
- size_t BlkStart() const;
-
- // Returns the total number of blocks in all requests within the WriteTxn. This number is
- // calculated at call time, unless the WriteTxn has already been fully buffered, at which point
- // the final |block_count_| is set. This is then returned for all subsequent calls to BlkCount.
- size_t BlkCount() const;
-
- bool IsBuffered() const {
- return vmoid_ != VMOID_INVALID;
- }
-
- // Sets the source buffer for the WriteTxn to |vmoid|.
- void SetBuffer(vmoid_t vmoid);
-
- // Checks if the WriteTxn vmoid_ matches |vmoid|.
- bool CheckBuffer(vmoid_t vmoid) const {
- return vmoid_ == vmoid;
- }
-
- // Resets the transaction's state.
- void Reset() {
- requests_.reset();
- vmoid_ = VMOID_INVALID;
- }
-
-protected:
- // Activates the transaction.
- zx_status_t Flush();
-
-private:
- TransactionManager* transaction_manager_;
- vmoid_t vmoid_;
- fbl::Vector<WriteRequest> requests_;
- size_t block_count_;
-};
-
-// A wrapper around a WriteTxn with added support for callback invocation on completion.
-class WritebackWork : public WriteTxn,
- public fbl::SinglyLinkedListable<fbl::unique_ptr<WritebackWork>> {
-public:
- using ReadyCallback = fbl::Function<bool()>;
- using SyncCallback = fs::Vnode::SyncCallback;
-
- WritebackWork(TransactionManager* transaction_manager);
- virtual ~WritebackWork() = default;
-
- // Sets the WritebackWork to a completed state. |status| should indicate whether the work was
- // completed successfully.
- void MarkCompleted(zx_status_t status);
-
- // Returns true if the WritebackWork is "ready" to be processed. This is always true unless a
- // "ready callback" exists, in which case that callback determines the state of readiness. Once
- // a positive response is received, the ready callback is destroyed - the WritebackWork will
- // always be ready from this point forward.
- bool IsReady();
-
- // Adds a callback to the WritebackWork to be called before the WritebackWork is completed,
- // to ensure that it's ready for writeback.
- //
- // Only one ready callback may be set for each WritebackWork unit.
- void SetReadyCallback(ReadyCallback callback);
-
- // Adds a callback to the WritebackWork, such that it will be signalled when the WritebackWork
- // is flushed to disk. If no callback is set, nothing will get signalled.
- //
- // Multiple callbacks may be set. They are invoked in "first-in, last-out" order (i.e.,
- // enqueueing A, B, C will invoke C, B, A).
- void SetSyncCallback(SyncCallback callback);
-
- // Persists the enqueued work to disk,
- // and resets the WritebackWork to its initial state.
- zx_status_t Complete();
-
-private:
- // Optional callbacks.
- ReadyCallback ready_cb_; // Call to check whether work is ready to be processed.
- SyncCallback sync_cb_; // Call after work has been completely flushed.
-};
-
// An object compatible with the WritebackWork interface, which contains a single blob reference.
// When the writeback is completed, this reference will go out of scope.
//
@@ -161,157 +31,6 @@
fbl::RefPtr<Blob> vnode_;
};
-// In-memory data buffer.
-// This class is thread-compatible.
-class Buffer {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(Buffer);
-
- ~Buffer();
-
- // Initializes the buffer VMO with |blocks| blocks of size kBlobfsBlockSize.
- static zx_status_t Create(TransactionManager* transaction_manager, const size_t blocks,
- const char* label, fbl::unique_ptr<Buffer>* out);
-
- // Adds a transaction to |txn| which reads all data into buffer
- // starting from |disk_start| on disk.
- void Load(fs::ReadTxn* txn, size_t disk_start) {
- txn->Enqueue(vmoid_, 0, disk_start, capacity_);
- }
-
- // Returns true if there is space available for |blocks| blocks within the buffer.
- bool IsSpaceAvailable(size_t blocks) const;
-
- // Copies a write transaction to the buffer.
- // Also updates the in-memory offsets of the WriteTxn's requests so they point
- // to the correct offsets in the in-memory buffer instead of their original VMOs.
- //
- // |IsSpaceAvailable| should be called before invoking this function to
- // safely guarantee that space exists within the buffer.
- void CopyTransaction(WriteTxn* txn);
-
- // Adds a transaction to |work| with buffer offset |start| and length |length|,
- // starting at block |disk_start| on disk.
- void AddTransaction(size_t start, size_t disk_start, size_t length, WritebackWork* work);
-
- // Returns true if |txn| belongs to this buffer, and if so verifies
- // that it owns the next valid set of blocks within the buffer.
- bool VerifyTransaction(WriteTxn* txn) const;
-
- // Given a transaction |txn|, verifies that all requests belong to this buffer
- // and then sets the transaction's buffer accordingly (if it is not already set).
- void ValidateTransaction(WriteTxn* txn);
-
- // Frees the first |blocks| blocks in the buffer.
- void FreeSpace(size_t blocks);
-
- // Frees all space within the buffer.
- void FreeAllSpace() {
- FreeSpace(length_);
- }
-
- size_t start() const { return start_; }
- size_t length() const { return length_; }
- size_t capacity() const { return capacity_; }
-
- // Reserves the next index in the buffer.
- size_t ReserveIndex() {
- return (start_ + length_++) % capacity_;
- }
-
- // Returns data starting at block |index| in the buffer.
- void* MutableData(size_t index) {
- ZX_DEBUG_ASSERT(index < capacity_);
- return reinterpret_cast<char*>(mapper_.start()) + (index * kBlobfsBlockSize);
- }
-private:
- Buffer(TransactionManager* transaction_manager, fzl::OwnedVmoMapper mapper)
- : transaction_manager_(transaction_manager), mapper_(std::move(mapper)), start_(0),
- length_(0), capacity_(mapper_.size() / kBlobfsBlockSize) {}
-
- TransactionManager* transaction_manager_;
- fzl::OwnedVmoMapper mapper_;
- vmoid_t vmoid_ = VMOID_INVALID;
-
- // The units of all the following are "Blobfs blocks".
- size_t start_ = 0;
- size_t length_ = 0;
- const size_t capacity_;
-};
-
-// Manages an in-memory writeback buffer (and background thread,
-// which flushes this buffer out to disk).
-class WritebackQueue {
-public:
- DISALLOW_COPY_ASSIGN_AND_MOVE(WritebackQueue);
-
- ~WritebackQueue();
-
- // Initializes the WritebackBuffer at |out|
- // with a buffer of |buffer_blocks| blocks of size kBlobfsBlockSize.
- static zx_status_t Create(TransactionManager* transaction_manager, const size_t buffer_blocks,
- fbl::unique_ptr<WritebackQueue>* out);
-
- // Copies all transaction data referenced from |work| into the writeback buffer.
- zx_status_t Enqueue(fbl::unique_ptr<WritebackWork> work);
-
- bool IsReadOnly() const __TA_REQUIRES(lock_) { return state_ == WritebackState::kReadOnly; }
-
- size_t GetCapacity() const { return buffer_->capacity(); }
-
- // Stops the asynchronous queue processor.
- zx_status_t Teardown();
-private:
- // The waiter struct may be used as a stack-allocated queue for producers.
- // It allows them to take turns putting data into the buffer when it is
- // mostly full.
- struct Waiter : public fbl::SinglyLinkedListable<Waiter*> {};
- using ProducerQueue = fs::Queue<Waiter*>;
- using WorkQueue = fs::Queue<fbl::unique_ptr<WritebackWork>>;
-
- WritebackQueue(fbl::unique_ptr<Buffer> buffer) : buffer_(std::move(buffer)) {}
-
- bool IsRunning() const __TA_REQUIRES(lock_);
-
- // Blocks until |blocks| blocks of data are free for the caller.
- // Doesn't actually allocate any space.
- void EnsureSpaceLocked(size_t blocks) __TA_REQUIRES(lock_);
-
- // Thread which asynchronously processes transactions.
- static int WritebackThread(void* arg);
-
- // Signalled when the writeback buffer has space to add txns.
- cnd_t work_completed_;
- // Signalled when the writeback buffer can be consumed by the background thread.
- cnd_t work_added_;
-
- // Work associated with the "writeback" thread, which manages work items,
- // and flushes them to disk. This thread acts as a consumer of the
- // writeback buffer.
- thrd_t worker_;
-
- // Use to lock resources that may be accessed asynchronously.
- fbl::Mutex lock_;
-
- // Buffer which stores transactions to be written out to disk.
- fbl::unique_ptr<Buffer> buffer_;
-
- bool unmounting_ __TA_GUARDED(lock_) = false;
-
- // The WritebackQueue will start off in a kInit state, and will change to kRunning when the
- // background thread is brought up. Once it is running, if an error is detected during
- // writeback, the queue is converted to kReadOnly, and no further writes are permitted.
- WritebackState state_ __TA_GUARDED(lock_) = WritebackState::kInit;
-
- // Tracks all the pending Writeback Work operations which exist in the
- // writeback buffer and are ready to be sent to disk.
- WorkQueue work_queue_ __TA_GUARDED(lock_){};
-
- // Ensures that if multiple producers are waiting for space to write their
- // transactions into the writeback buffer, they can each write in-order.
- ProducerQueue producer_queue_ __TA_GUARDED(lock_);
-};
-
// A wrapper around "Enqueue" for content which risks being larger
// than the writeback buffer.
//
@@ -321,7 +40,7 @@
// For content which is larger than 3/4 the size of the writeback buffer: flush
// the data by enqueueing it to the writeback thread in chunks until the
// remainder is small enough to comfortably fit within the writeback buffer.
-zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work,
+zx_status_t EnqueuePaginated(std::unique_ptr<WritebackWork>* work,
TransactionManager* transaction_manager, Blob* vn,
const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
uint64_t nblocks);
diff --git a/zircon/system/ulib/blobfs/journal-entry.cpp b/zircon/system/ulib/blobfs/journal-entry.cpp
index 3aebd9d..fecb25d 100644
--- a/zircon/system/ulib/blobfs/journal-entry.cpp
+++ b/zircon/system/ulib/blobfs/journal-entry.cpp
@@ -11,15 +11,6 @@
namespace blobfs {
-// TODO(ZX-2415): Add tracing/metrics collection to journal related operations.
-
-// Thread which asynchronously processes journal entries.
-static int JournalThread(void* arg) {
- Journal* journal = reinterpret_cast<Journal*>(arg);
- journal->ProcessLoop();
- return 0;
-}
-
JournalEntry::JournalEntry(JournalBase* journal, EntryStatus status, size_t header_index,
size_t commit_index, fbl::unique_ptr<WritebackWork> work)
: journal_(journal), status_(static_cast<uint32_t>(status)), block_count_(0),
@@ -98,977 +89,4 @@
commit_block_.checksum = checksum;
}
-zx_status_t Journal::Create(TransactionManager* transaction_manager, uint64_t journal_blocks,
- uint64_t start_block, fbl::unique_ptr<Journal>* out) {
- // Create the buffer with 1 less than total journal blocks.
- // (1 block must be reserved for journal info).
- zx_status_t status;
- fbl::unique_ptr<Buffer> buffer;
- if ((status = Buffer::Create(transaction_manager, journal_blocks - 1, "blobfs-journal",
- &buffer)) != ZX_OK) {
- return status;
- }
-
- // Create another buffer for the journal info block.
- fbl::unique_ptr<Buffer> info;
- if ((status = Buffer::Create(transaction_manager, 1, "blobfs-journal-info", &info)) != ZX_OK) {
- return status;
- }
-
- // Reserve the only block in the info buffer so its impossible to copy transactions to it.
- info->ReserveIndex();
-
- // Create the Journal with the newly created vmos.
- fbl::unique_ptr<Journal> journal(new Journal(transaction_manager, std::move(info),
- std::move(buffer), start_block));
-
- // Load contents of journal from disk.
- if ((status = journal->Load()) != ZX_OK) {
- FS_TRACE_ERROR("Journal: Failed to load from disk: %d\n", status);
- return status;
- }
-
- *out = std::move(journal);
- return ZX_OK;
-}
-
-Journal::~Journal() {
- // Ensure that thread teardown has completed, or that it was never brought up to begin with.
- ZX_DEBUG_ASSERT(!IsRunning());
-
- // Ensure that work and producer queues are currently empty.
- ZX_DEBUG_ASSERT(work_queue_.is_empty());
- ZX_DEBUG_ASSERT(producer_queue_.is_empty());
-}
-
-zx_status_t Journal::Teardown() {
- WritebackState state;
-
- {
- fbl::AutoLock lock(&lock_);
- state = state_;
-
- // Signal the background thread.
- unmounting_ = true;
- cnd_signal(&consumer_cvar_);
- }
-
- zx_status_t status = ZX_OK;
- if (state != WritebackState::kInit && state != WritebackState::kReady) {
- // Block until the thread completes itself.
- int result = -1;
- int success = thrd_join(thread_, &result);
- if (result != 0 || success != thrd_success) {
- status = ZX_ERR_INTERNAL;
- }
- }
-
- return status;
-}
-
-zx_status_t Journal::Load() {
- fbl::AutoLock lock(&lock_);
- ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);
-
- // Load info block and journal entries into their respective buffers.
- fs::ReadTxn txn(transaction_manager_);
- info_->Load(&txn, start_block_);
- entries_->Load(&txn, start_block_ + 1);
- zx_status_t status = txn.Transact();
-
- if (status != ZX_OK) {
- return status;
- }
-
- JournalInfo* info = GetInfo();
-
- // Verify the journal magic matches.
- if (info->magic != kJournalMagic) {
- FS_TRACE_ERROR("Journal info bad magic\n");
- return ZX_ERR_BAD_STATE;
- }
-
- if (info->start_block > 0 || info->num_blocks > 0 || info->timestamp > 0) {
- // Who checks the checksum? (It's us. We are doing it.)
- uint32_t old_checksum = info->checksum;
- info->checksum = 0;
-
- uint8_t* info_ptr = reinterpret_cast<uint8_t*>(info);
- info->checksum = crc32(0, info_ptr, sizeof(JournalInfo));
-
- if (old_checksum != info->checksum) {
- FS_TRACE_ERROR("Journal info checksum corrupt\n");
- return ZX_ERR_BAD_STATE;
- }
- }
-
- return status;
-}
-
-zx_status_t Journal::Replay() {
- fbl::AutoLock lock(&lock_);
- ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);
-
- uint64_t timestamp = 0;
- size_t start = GetInfo()->start_block;
- size_t length = GetInfo()->num_blocks;
- size_t total_entries = 0;
- size_t total_blocks = 0;
-
- // Replay entries until we find one that isn't valid.
- while (true) {
- uint64_t entry_blocks;
- // |start| is the header index of the next entry.
- zx_status_t status = ReplayEntry(start, length, &entry_blocks, ×tamp);
- if (status == ZX_ERR_OUT_OF_RANGE) {
- break;
- } else if (status != ZX_OK) {
- return status;
- }
-
- total_entries++;
- total_blocks += entry_blocks;
-
- start += entry_blocks;
- start %= entries_->capacity();
-
- if (length) {
- length -= entry_blocks;
- }
- }
-
- // TODO(planders): Sync to ensure that all entries have been written out before resetting the
- // on-disk state of the journal.
- if (total_entries > 0) {
- printf("Found and replayed %zu total blobfs journal entries starting from index %zu, "
- "including %zu total blocks.\n",
- total_entries, GetInfo()->start_block, total_blocks);
- } else if (start == 0 && length == 0) {
- // If no entries were found and journal is already in its default state,
- // return without writing out any changes.
- state_ = WritebackState::kReady;
- return ZX_OK;
- }
-
- // We expect length to be 0 at this point, assuming the journal was not corrupted and replay
- // completed successfully. However, in the case of corruption of the journal this may not be the
- // case. Since we cannot currently recover from this situation we should proceed as normal.
- zx_status_t status = CommitReplay();
- if (status != ZX_OK) {
- return status;
- }
-
- // Now that we've resolved any remaining entries, we are ready to start journal writeback.
- state_ = WritebackState::kReady;
- return ZX_OK;
-}
-
-zx_status_t Journal::InitWriteback() {
- fbl::AutoLock lock(&lock_);
- ZX_DEBUG_ASSERT(state_ == WritebackState::kReady);
-
- if (entries_->start() > 0 || entries_->length() > 0) {
- FS_TRACE_ERROR("Cannot initialize journal writeback - entries may still exist.\n");
- return ZX_ERR_BAD_STATE;
- }
-
- if (thrd_create_with_name(&thread_, JournalThread, this, "blobfs-journal") !=
- thrd_success) {
- FS_TRACE_ERROR("Failed to create journal thread.\n");
- return ZX_ERR_NO_RESOURCES;
- }
-
- state_ = WritebackState::kRunning;
- return ZX_OK;
-}
-
-zx_status_t Journal::Enqueue(fbl::unique_ptr<WritebackWork> work) {
- // Verify that the work exists and has not already been prepared for writeback.
- ZX_DEBUG_ASSERT(work != nullptr);
- ZX_DEBUG_ASSERT(!work->IsBuffered());
-
- // Block count will be the number of blocks in the transaction + header + commit.
- size_t blocks = work->BlkCount();
- // By default set the header/commit indices to the buffer capacity,
- // since this will be an invalid index value.
- size_t header_index = entries_->capacity();
- size_t commit_index = entries_->capacity();
-
- fbl::AutoLock lock(&lock_);
-
- zx_status_t status = ZX_OK;
- if (IsReadOnly()) {
- // If we are in "read only" mode, set an error status.
- status = ZX_ERR_BAD_STATE;
- } else if (blocks) {
- ZX_DEBUG_ASSERT(state_ == WritebackState::kRunning);
-
- // If the work contains no blocks (i.e. it is a sync work), proceed to create an entry
- // without enqueueing any data to the buffer.
-
- // Add 2 blocks to the block count for the journal entry's header/commit blocks.
- blocks += 2;
-
- // Ensure we have enough space to write the current entry to the buffer.
- // If not, wait until space becomes available.
- EnsureSpaceLocked(blocks);
-
- if (IsReadOnly()) {
- // The Journal is in a bad state and is no longer accepting new entries.
- status = ZX_ERR_BAD_STATE;
- } else {
- // Assign header index of journal entry to the next available value before we attempt to
- // copy the meat of the entry to the buffer.
- header_index = entries_->ReserveIndex();
-
- // Copy the data from WritebackWork to the journal buffer. We can wait to write out the
- // header and commit blocks asynchronously, since this will involve calculating the
- // checksum.
- // TODO(planders): Release the lock while transaction is being copied.
- entries_->CopyTransaction(work.get());
-
- // Assign commit_index immediately after copying to the buffer.
- // Increase length_ accordingly.
- commit_index = entries_->ReserveIndex();
-
- // Make sure that commit index matches what we expect
- // based on header index, block count, and buffer size.
- ZX_DEBUG_ASSERT(commit_index == (header_index + blocks - 1) % entries_->capacity());
- }
- }
-
- // Create the journal entry and push it onto the work queue.
- fbl::unique_ptr<JournalEntry> entry = CreateEntry(header_index, commit_index, std::move(work));
-
- if (entry->GetStatus() == EntryStatus::kInit) {
- // If we have a non-sync work, there is some extra preparation we need to do.
- if (status == ZX_OK) {
- // Prepare a WritebackWork to write out the entry to disk. Note that this does not
- // fully prepare the buffer for writeback, so a ready callback is added to the work as
- // part of this step.
- PrepareWork(entry.get(), &work);
- ZX_DEBUG_ASSERT(work != nullptr);
- status = EnqueueEntryWork(std::move(work));
- } else {
- // If the status is not okay (i.e. we are in a readonly state), do no additional
- // processing but set the entry state to error.
- entry->SetStatus(EntryStatus::kError);
- }
- }
-
- // Queue the entry to be processed asynchronously.
- work_queue_.push(std::move(entry));
-
- // Signal the JournalThread that there is at least one entry ready to be processed.
- SendSignalLocked(status);
- return status;
-}
-
-void Journal::SendSignalLocked(zx_status_t status) {
- if (status == ZX_OK) {
- // Once writeback has entered a read only state, no further transactions should succeed.
- ZX_ASSERT(state_ != WritebackState::kReadOnly);
- } else {
- state_ = WritebackState::kReadOnly;
- }
- consumer_signalled_ = true;
- cnd_signal(&consumer_cvar_);
-}
-
-bool Journal::IsRunning() const {
- switch (state_) {
- case WritebackState::kRunning:
- case WritebackState::kReadOnly:
- return true;
- default:
- return false;
- }
-}
-
-fbl::unique_ptr<JournalEntry> Journal::CreateEntry(uint64_t header_index, uint64_t commit_index,
- fbl::unique_ptr<WritebackWork> work) {
- EntryStatus status = EntryStatus::kInit;
-
- if (work->BlkCount() == 0) {
- // If the work has no transactions, this is a sync work - we can return early.
- // Right now we make the assumption that if a WritebackWork has any transactions, it cannot
- // have a corresponding sync callback. We may need to revisit this later.
- status = EntryStatus::kSync;
- } else if (IsReadOnly()) {
- // If the journal is in a read only state, set the entry status to error.
- status = EntryStatus::kError;
- }
-
- return std::make_unique<JournalEntry>(this, status, header_index, commit_index,
- std::move(work));
-}
-
-void Journal::PrepareWork(JournalEntry* entry, fbl::unique_ptr<WritebackWork>* out) {
- size_t header_index = entry->GetHeaderIndex();
- size_t commit_index = entry->GetCommitIndex();
- size_t block_count = entry->BlockCount();
-
- if (block_count == 0) {
- // If journal entry has size 0, it is an empty sync entry, and we don't need to write
- // anything to the journal.
- ZX_DEBUG_ASSERT(header_index == entries_->capacity());
- ZX_DEBUG_ASSERT(commit_index == entries_->capacity());
- return;
- }
-
- fbl::unique_ptr<WritebackWork> work = CreateWork();
-
- // Update work with transactions for the current entry.
- AddEntryTransaction(header_index, block_count, work.get());
-
- // Make sure the work is prepared for the writeback queue.
- work->SetReadyCallback(entry->CreateReadyCallback());
- work->SetSyncCallback(entry->CreateSyncCallback());
- *out = std::move(work);
-}
-
-void Journal::ProcessEntryResult(zx_status_t result, JournalEntry* entry) {
- fbl::AutoLock lock(&lock_);
- // Since it is possible for the entry to be deleted immediately after updating its status
- // (if the journal is being processed at the time), it is safer to update the entry status
- // under the journal lock.
- entry->SetStatusFromResult(result);
- SendSignalLocked(result);
-}
-
-void Journal::PrepareBuffer(JournalEntry* entry) {
- size_t header_index = entry->GetHeaderIndex();
- size_t commit_index = entry->GetCommitIndex();
- size_t block_count = entry->BlockCount();
-
- if (block_count == 0) {
- // If journal entry has size 0, it is an empty sync entry, and we don't need to write
- // anything to the journal.
- ZX_DEBUG_ASSERT(header_index == entries_->capacity());
- ZX_DEBUG_ASSERT(commit_index == entries_->capacity());
- return;
- }
-
- // Copy header block of the journal entry into the journal buffer. We must write the header
- // block into the buffer before the commit block so we can generate the checksum.
- void* data = entries_->MutableData(header_index);
- memset(data, 0, kBlobfsBlockSize);
- memcpy(data, &entry->GetHeaderBlock(), sizeof(HeaderBlock));
-
- // Now that the header block has been written to the buffer, we can calculate a checksum for
- // the header + all journaled metadata blocks and set it in the entry's commit block.
- entry->SetChecksum(GenerateChecksum(header_index, commit_index));
-
- // Write the commit block (now with checksum) to the journal buffer.
- data = entries_->MutableData(commit_index);
- memset(data, 0, kBlobfsBlockSize);
- memcpy(data, &entry->GetCommitBlock(), sizeof(CommitBlock));
-}
-
-void Journal::PrepareDelete(JournalEntry* entry, WritebackWork* work) {
- ZX_DEBUG_ASSERT(work != nullptr);
- size_t header_index = entry->GetHeaderIndex();
- size_t commit_index = entry->GetCommitIndex();
- size_t block_count = entry->BlockCount();
-
- if (block_count == 0) {
- // If journal entry has size 0, it is an empty sync entry, and we don't need to write
- // anything to the journal.
- ZX_DEBUG_ASSERT(header_index == entries_->capacity());
- ZX_DEBUG_ASSERT(commit_index == entries_->capacity());
- return;
- }
-
- // Overwrite the header & commit block in the buffer with empty data.
- memset(entries_->MutableData(header_index), 0, kBlobfsBlockSize);
- memset(entries_->MutableData(commit_index), 0, kBlobfsBlockSize);
-
- // Enqueue transactions for the header/commit blocks.
- entries_->AddTransaction(header_index, start_block_ + 1 + header_index, 1, work);
- entries_->AddTransaction(commit_index, start_block_ + 1 + commit_index, 1, work);
-}
-
-fbl::unique_ptr<WritebackWork> Journal::CreateWork() {
- fbl::unique_ptr<WritebackWork> work;
- transaction_manager_->CreateWork(&work, nullptr);
- ZX_DEBUG_ASSERT(work != nullptr);
- return work;
-}
-
-zx_status_t Journal::EnqueueEntryWork(fbl::unique_ptr<WritebackWork> work) {
- entries_->ValidateTransaction(work.get());
- return transaction_manager_->EnqueueWork(std::move(work), EnqueueType::kData);
-}
-
-bool Journal::VerifyEntryMetadata(size_t header_index, uint64_t last_timestamp, bool expect_valid) {
- HeaderBlock* header = GetHeaderBlock(header_index);
- // If length_ > 0, the next entry should be guaranteed.
- if (header->magic != kEntryHeaderMagic || header->timestamp <= last_timestamp) {
- // If the next calculated header block is either 1) not a header block, or 2) does not
- // have a timestamp strictly later than the previous entry, it is not a valid entry and
- // should not be replayed. This is only a journal replay "error" if, according to the
- // journal super block, we still have some entries left to process (i.e. length_ > 0).
- if (expect_valid) {
- FS_TRACE_ERROR("Journal Replay Error: invalid header found.\n");
- }
-
- return false;
- }
-
- size_t commit_index = (header_index + header->num_blocks + 1) % entries_->capacity();
- CommitBlock* commit = GetCommitBlock(commit_index);
-
- if (commit->magic != kEntryCommitMagic) {
- FS_TRACE_ERROR("Journal Replay Error: commit magic does not match expected\n");
- return false;
- }
-
- if (commit->timestamp != header->timestamp) {
- FS_TRACE_ERROR("Journal Replay Error: commit timestamp does not match expected\n");
- return false;
- }
-
- // Calculate the checksum of the entry data to verify the commit block's checksum.
- uint32_t checksum = GenerateChecksum(header_index, commit_index);
-
- // Since we already found a valid header, we expect this to be a valid entry. If something
- // in the commit block does not match what we expect, this is an error.
- if (commit->checksum != checksum) {
- FS_TRACE_ERROR("Journal Replay Error: commit checksum does not match expected\n");
- return false;
- }
-
- return true;
-}
-
-zx_status_t Journal::ReplayEntry(size_t header_index, size_t remaining_length,
- uint64_t* entry_blocks, uint64_t* timestamp) {
- ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);
-
- bool expect_valid = remaining_length > 0;
- if (!VerifyEntryMetadata(header_index, *timestamp, expect_valid)) {
- return ZX_ERR_OUT_OF_RANGE;
- }
-
- HeaderBlock* header = GetHeaderBlock(header_index);
- *timestamp = header->timestamp;
- *entry_blocks = header->num_blocks + 2;
- // We have found a valid entry - ensure that remaining_length is valid
- // (either 0 remaining, or enough to fit this entry).
- ZX_DEBUG_ASSERT(remaining_length == 0 || remaining_length >= *entry_blocks);
-
- fbl::unique_ptr<WritebackWork> work = CreateWork();
-
- // Enqueue one block at a time, since they may not end up being contiguous on disk.
- for (unsigned i = 0; i < header->num_blocks; i++) {
- size_t vmo_block = (header_index + i + 1) % entries_->capacity();
- entries_->AddTransaction(vmo_block, header->target_blocks[i], 1, work.get());
- }
-
- // Replay (and therefore mount) will fail if we cannot enqueue the replay work. Since the
- // journal itself is not corrupt (at least up to this point), we would expect replay to
- // succeed on a subsequent attempt, so we should keep any existing entries intact. (i.e.,
- // do not reset the journal metadata in this failure case).
- zx_status_t status = EnqueueEntryWork(std::move(work));
- if (status != ZX_OK) {
- FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
- }
-
- return status;
-}
-
-zx_status_t Journal::CommitReplay() {
- ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);
-
- // Overwrite the first journal entry block to 0. Since we are resetting the info block to point
- // to 0 as the first entry, we expect that block 0 will not contain a valid entry. Overwriting
- // it will ensure that this is not the case.
- memset(entries_->MutableData(0), 0, kBlobfsBlockSize);
- fbl::unique_ptr<WritebackWork> work = CreateWork();
-
- entries_->AddTransaction(0, start_block_ + 1, 1, work.get());
-
- zx_status_t status;
- if ((status = EnqueueEntryWork(std::move(work))) != ZX_OK) {
- FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
- return status;
- }
-
- // Write out the updated info block to disk.
- if ((status = WriteInfo(entries_->start(), entries_->length())) != ZX_OK) {
- FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
- return status;
- }
-
- // Wait for any replayed entries to complete before completing replay.
- work = CreateWork();
- sync_completion_t completion;
- sync_completion_reset(&completion);
-
- work->SetSyncCallback([&completion, &status](zx_status_t new_status) {
- status = new_status;
- sync_completion_signal(&completion);
- });
-
- if ((status = EnqueueEntryWork(std::move(work))) != ZX_OK) {
- FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
- return status;
- }
-
- sync_completion_wait(&completion, ZX_TIME_INFINITE);
-
- // Return a successful status, even if we detected corrupt metadata or entries.
- // Our metadata should still be in a consistent state so it will be safe to mount regardless.
- return ZX_OK;
-}
-
-zx_status_t Journal::WriteInfo(uint64_t start, uint64_t length) {
- JournalInfo* info = GetInfo();
-
- if (start == info->start_block && length == info->num_blocks) {
- // If the current buffer start/len match the info block, skip the writing step.
- return ZX_OK;
- }
-
- fbl::unique_ptr<WritebackWork> work;
- transaction_manager_->CreateWork(&work, nullptr);
-
- info->start_block = start;
- info->num_blocks = length;
- info->timestamp = zx_ticks_get();
-
- // Set the checksum to 0 so we can calculate the checksum of the rest of the info block.
- info->checksum = 0;
- uint8_t* info_ptr = reinterpret_cast<uint8_t*>(info);
- info->checksum = crc32(0, info_ptr, sizeof(JournalInfo));
-
- info_->AddTransaction(0, start_block_, 1, work.get());
- info_->ValidateTransaction(work.get());
- return transaction_manager_->EnqueueWork(std::move(work), EnqueueType::kData);
-}
-
-void Journal::EnsureSpaceLocked(size_t blocks) {
- while (!entries_->IsSpaceAvailable(blocks)) {
- // Not enough room to write back work, yet. Wait until room is available.
- Waiter w;
- producer_queue_.push(&w);
-
- do {
- cnd_wait(&producer_cvar_, lock_.GetInternal());
- } while ((&producer_queue_.front() != &w) && // We are first in line to enqueue...
- (!entries_->IsSpaceAvailable(blocks))); // ... and there is enough space for us.
-
- producer_queue_.pop();
- }
-}
-
-void Journal::AddEntryTransaction(size_t start, size_t length, WritebackWork* work) {
- // Ensure the request fits within the buffer.
- ZX_DEBUG_ASSERT(start < entries_->capacity());
- ZX_DEBUG_ASSERT(length > 0);
- ZX_DEBUG_ASSERT(length < entries_->capacity());
- ZX_DEBUG_ASSERT(work != nullptr);
-
- // Adjust the length of the first transaction in
- // case it wraps around to the front of the buffer.
- size_t first_length = length;
-
- if (start + length > entries_->capacity()) {
- first_length = entries_->capacity() - start;
- }
-
- // Ensure we do not have an empty transaction.
- ZX_DEBUG_ASSERT(first_length > 0);
-
- // Enqueue the first part of the transaction.
- size_t disk_start = start_block_ + 1;
- entries_->AddTransaction(start, disk_start + start, first_length, work);
-
- // If we wrapped around to the front of the journal,
- // enqueue a second transaction with the remaining data + commit block.
- if (first_length < length) {
- entries_->AddTransaction(0, disk_start, length - first_length, work);
- }
-}
-
-uint32_t Journal::GenerateChecksum(size_t header_index, size_t commit_index) {
- ZX_DEBUG_ASSERT(commit_index != header_index);
-
- size_t first_length = 0;
-
- // Determine how long the first part of the transaction is.
- if (commit_index < header_index) {
- first_length = entries_->capacity() - header_index;
- } else {
- first_length = commit_index - header_index;
- }
-
- ZX_DEBUG_ASSERT(first_length > 0);
-
- // Calculate checksum.
- uint8_t* data_ptr = static_cast<uint8_t*>(entries_->MutableData(header_index));
- uint32_t checksum = crc32(0, data_ptr, first_length * kBlobfsBlockSize);
-
- // If the transaction wraps around the buffer, update checksum for the second half.
- if (commit_index < header_index) {
- data_ptr = static_cast<uint8_t*>(entries_->MutableData(0));
- checksum = crc32(checksum, data_ptr, commit_index * kBlobfsBlockSize);
- }
-
- return checksum;
-}
-
-fbl::unique_ptr<JournalEntry> Journal::GetNextEntry() {
- fbl::AutoLock lock(&lock_);
- return work_queue_.pop();
-}
-
-void Journal::ProcessQueues(JournalProcessor* processor) {
- // Process all entries in the work queue.
- fbl::unique_ptr<JournalEntry> entry;
- while ((entry = GetNextEntry()) != nullptr) {
- // TODO(planders): For each entry that we process, we can potentially verify that the
- // indices fit within the expected start/len of the journal buffer, and do
- // not collide with other entries.
- processor->ProcessWorkEntry(std::move(entry));
- }
-
- // Since the processor queues are accessed exclusively by the async thread,
- // we do not need to hold the lock while we access them.
-
- // If we processed any entries during the work step,
- // enqueue the dummy work to kick off the writeback queue.
- processor->EnqueueWork();
-
- // TODO(planders): Instead of immediately processing all wait items, wait until some
- // condition is fulfilled (e.g. journal is x% full, y total entries are
- // waiting, z time has passed, etc.) and write all entries out to disk at
- // once.
- // Process all entries in the "wait" queue. These are all transactions with entries that
- // have been enqueued to disk, and are waiting to verify that the write has completed.
- processor->ProcessWaitQueue();
-
- // TODO(planders): Similarly to the wait queue, instead of immediately processing all delete
- // items, wait until some condition is fulfilled and process all journal
- // deletions at once.
-
- // Track which entries have been fully persisted to their final on disk-location. Once we
- // have received verification that they have successfully completed, we can remove them
- // from the journal buffer to make space for new entries.
- processor->ProcessDeleteQueue();
-
- if (processor->HasError()) {
- {
- fbl::AutoLock lock(&lock_);
-
- // The thread signalling us should already be setting the Journal to read_only_, but in
- // case we managed to grab the lock first, set it again here.
- state_ = WritebackState::kReadOnly;
-
- // Reset the journal length to unblock transactions awaiting space,
- // No more writes to the buffer will be allowed.
- entries_->FreeAllSpace();
- }
-
- // Reset any pending delete requests (if any exist).
- processor->ResetWork();
- } else if (processor->GetBlocksProcessed() > 0) {
- uint64_t start, length;
-
- {
- fbl::AutoLock lock(&lock_);
-
- // Update the journal start/len to reflect the number of blocks that have been fully
- // processed.
- entries_->FreeSpace(processor->GetBlocksProcessed());
-
- start = entries_->start();
- length = entries_->length();
- }
-
- // The journal start/len have changed, so write out the info block.
- WriteInfo(start, length);
-
- // After the super block update has been queued for writeback, we can now "delete"
- // the entries that were previously pointed to by the info block. This must be done
- // after the info block write so that the info block does not point to invalid
- // entries.
- processor->EnqueueWork();
- }
-
- // If we are not in an error state and did not process any blocks, then the
- // JournalProcessor's work should be not have been initialized. This condition will be
- // checked at the beginning of the next call to ProcessQueue.
-
- // Since none of the methods in the kSync profile indicate that an entry should be added to
- // the next queue, it should be fine to pass a null output queue here.
- processor->ProcessSyncQueue();
-}
-
-void Journal::ProcessLoop() {
- JournalProcessor processor(this);
- while (true) {
- ProcessQueues(&processor);
-
- fbl::AutoLock lock(&lock_);
-
- // Signal the producer queue that space in the journal has (possibly) been freed up.
- cnd_signal(&producer_cvar_);
-
- // Before waiting, we should check if we're unmounting.
- if (unmounting_ && work_queue_.is_empty() && processor.IsEmpty() &&
- producer_queue_.is_empty()) {
- // Only return if we are unmounting AND all entries in all queues have been
- // processed. This includes producers which are currently waiting to be enqueued.
- state_ = WritebackState::kComplete;
- break;
- }
-
- // If we received a signal while we were processing other queues,
- // immediately start processing again.
- if (!consumer_signalled_) {
- cnd_wait(&consumer_cvar_, lock_.GetInternal());
- }
-
- consumer_signalled_ = false;
- }
-}
-
-void JournalProcessor::ProcessWorkEntry(fbl::unique_ptr<JournalEntry> entry) {
- SetContext(ProcessorContext::kWork);
- ProcessResult result = ProcessEntry(entry.get());
- ZX_DEBUG_ASSERT(result == ProcessResult::kContinue);
-
- // Enqueue the entry into the wait_queue, even in the case of error. This is so that
- // all works contained by journal entries will be processed in the second step, even if
- // we do not plan to send them along to the writeback queue.
- wait_queue_.push(std::move(entry));
-}
-
-void JournalProcessor::ProcessWaitQueue() {
- SetContext(ProcessorContext::kWait);
- ProcessQueue(&wait_queue_, &delete_queue_);
-}
-
-void JournalProcessor::ProcessDeleteQueue() {
- SetContext(ProcessorContext::kDelete);
- ProcessQueue(&delete_queue_, &sync_queue_);
-}
-void JournalProcessor::ProcessSyncQueue() {
- SetContext(ProcessorContext::kSync);
- ProcessQueue(&sync_queue_, nullptr);
-}
-
-void JournalProcessor::SetContext(ProcessorContext context) {
- if (context_ != context) {
- // If we are switching from the sync profile, sync queue must be empty.
- ZX_DEBUG_ASSERT(context_ != ProcessorContext::kSync || sync_queue_.is_empty());
-
- switch (context) {
- case ProcessorContext::kDefault:
- ZX_DEBUG_ASSERT(context_ == ProcessorContext::kSync);
- break;
- case ProcessorContext::kWork:
- ZX_DEBUG_ASSERT(context_ == ProcessorContext::kDefault ||
- context_ == ProcessorContext::kSync);
- break;
- case ProcessorContext::kWait:
- ZX_DEBUG_ASSERT(context_ != ProcessorContext::kDelete);
- break;
- case ProcessorContext::kDelete:
- ZX_DEBUG_ASSERT(context_ == ProcessorContext::kWait);
- break;
- case ProcessorContext::kSync:
- ZX_DEBUG_ASSERT(context_ == ProcessorContext::kDelete);
- break;
- default:
- ZX_DEBUG_ASSERT(false);
- }
-
- // Make sure that if a WritebackWork was established,
- // it was removed before we attempt to switch profiles.
- ZX_DEBUG_ASSERT(work_ == nullptr);
- blocks_processed_ = 0;
- context_ = context;
- }
-}
-
-void JournalProcessor::ProcessQueue(EntryQueue* in_queue, EntryQueue* out_queue) {
- // Process queue entries until there are none left, or we are told to wait.
- while (!in_queue->is_empty()) {
- // Process the entry before removing it from the queue.
- // If its status is kWaiting, we don't want to remove it.
- ProcessResult result = ProcessEntry(&in_queue->front());
-
- if (result == ProcessResult::kWait) {
- break;
- }
-
- auto entry = in_queue->pop();
-
- if (result == ProcessResult::kContinue) {
- ZX_DEBUG_ASSERT(out_queue != nullptr);
- out_queue->push(std::move(entry));
- } else {
- ZX_DEBUG_ASSERT(result == ProcessResult::kRemove);
- }
- }
-}
-
-ProcessResult JournalProcessor::ProcessEntry(JournalEntry* entry) {
- ZX_DEBUG_ASSERT(entry != nullptr);
-
- // Retrieve the entry status once up front so we don't have to keep atomically loading it.
- EntryStatus entry_status = entry->GetStatus();
-
- if (entry_status == EntryStatus::kWaiting) {
- // If the entry at the front of the queue is still waiting, we are done processing this
- // queue for the time being.
- return ProcessResult::kWait;
- }
-
- if (error_ && entry_status != EntryStatus::kSync) {
- // If we are in an error state and the entry is not a "sync" entry,
- // set the state to error so we do not do any unnecessary work.
- //
- // Since the error state takes precedence over the entry state,
- // we do not also have to set the entry state to error.
- entry_status = EntryStatus::kError;
- }
-
- if (entry_status == EntryStatus::kInit && context_ == ProcessorContext::kWork) {
- return ProcessWorkDefault(entry);
- }
-
- if (entry_status == EntryStatus::kPersisted) {
- if (context_ == ProcessorContext::kWait) {
- return ProcessWaitDefault(entry);
- }
-
- if (context_ == ProcessorContext::kDelete) {
- return ProcessDeleteDefault(entry);
- }
- }
-
- if (entry_status == EntryStatus::kSync) {
- if (context_ == ProcessorContext::kSync) {
- return ProcessSyncComplete(entry);
- }
-
- if (context_ != ProcessorContext::kDefault) {
- return ProcessSyncDefault(entry);
- }
- }
-
- if (entry_status == EntryStatus::kError) {
- if (context_ == ProcessorContext::kWork) {
- return ProcessErrorDefault();
- }
-
- if (context_ == ProcessorContext::kWait || context_ == ProcessorContext::kDelete) {
- return ProcessErrorComplete(entry);
- }
- }
-
- return ProcessUnsupported();
-}
-
-ProcessResult JournalProcessor::ProcessWorkDefault(JournalEntry* entry) {
- // If the entry is in the "init" state, we can now prepare its header/commit blocks
- // in the journal buffer.
- journal_->PrepareBuffer(entry);
- EntryStatus last_status = entry->SetStatus(EntryStatus::kWaiting);
-
- if (last_status == EntryStatus::kError) {
- // If the WritebackThread has failed and set our journal entry to an error
- // state in the time it's taken to prepare the buffer, set error state to
- // true. If we do not check this and continue having set the status to
- // kWaiting, we will never get another callback for this journal entry and
- // we will be stuck forever waiting for it to complete.
- error_ = true;
- entry->SetStatus(EntryStatus::kError);
- } else {
- ZX_DEBUG_ASSERT(last_status == EntryStatus::kInit);
- if (work_ == nullptr) {
- // Prepare a "dummy" work to kick off the writeback queue now that our entry is ready.
- // This is unnecessary in the case of an error, since the writeback queue will already
- // be failing all incoming transactions.
- work_ = journal_->CreateWork();
- }
- }
-
- return ProcessResult::kContinue;
-}
-
-ProcessResult JournalProcessor::ProcessWaitDefault(JournalEntry* entry) {
- EntryStatus last_status = entry->SetStatus(EntryStatus::kWaiting);
- ZX_DEBUG_ASSERT(last_status == EntryStatus::kPersisted);
- fbl::unique_ptr<WritebackWork> work = entry->TakeWork();
- journal_->EnqueueEntryWork(std::move(work));
- return ProcessResult::kContinue;
-}
-
-ProcessResult JournalProcessor::ProcessDeleteDefault(JournalEntry* entry) {
- if (work_ == nullptr) {
- // Use this work to enqueue any "delete" transactions we may encounter,
- // to be written after the info block is updated.
- work_ = journal_->CreateWork();
- }
-
- // The entry has now been fully persisted to disk, so we can remove the entry from
- // the journal. To ensure that it does not later get replayed unnecessarily, clear
- // out the header and commit blocks.
- journal_->PrepareDelete(entry, work_.get());
-
- // Track the number of blocks that have been fully processed so we can update the buffer.
- blocks_processed_ += entry->BlockCount();
-
- // We have fully processed this entry - do not add it to the next queue.
- return ProcessResult::kRemove;
-}
-
-ProcessResult JournalProcessor::ProcessSyncDefault(JournalEntry* entry) {
- // This is a sync request. Since there is no actual data to update,
- // we can just verify it and send it along to the next queue.
- ZX_DEBUG_ASSERT(entry->BlockCount() == 0);
- ZX_DEBUG_ASSERT(entry->GetHeaderIndex() == journal_->GetCapacity());
- ZX_DEBUG_ASSERT(entry->GetCommitIndex() == journal_->GetCapacity());
-
- // Always push the sync entry into the output queue.
- return ProcessResult::kContinue;
-}
-
-ProcessResult JournalProcessor::ProcessSyncComplete(JournalEntry* entry) {
- // Call the default sync method to ensure the entry matches what we expect.
- ProcessSyncDefault(entry);
-
- // Remove and enqueue the sync work.
- fbl::unique_ptr<WritebackWork> work = entry->TakeWork();
- journal_->EnqueueEntryWork(std::move(work));
-
- // The sync entry is complete; do not re-enqueue it.
- return ProcessResult::kRemove;
-}
-
-ProcessResult JournalProcessor::ProcessErrorDefault() {
- error_ = true;
- return ProcessResult::kContinue;
-}
-
-ProcessResult JournalProcessor::ProcessErrorComplete(JournalEntry* entry) {
- // If we are in an error state, force reset the entry's work. This will remove all
- // requests and call the sync closure (if it exists), thus completing this entry.
- entry->ForceReset();
- error_ = true;
-
- // Since all work is completed for this entry, we no longer need to send it along
- // to the next queue. Instead proceed to process the next entry.
- return ProcessResult::kRemove;
-}
-
-ProcessResult JournalProcessor::ProcessUnsupported() {
- ZX_ASSERT(false);
- return ProcessResult::kRemove;
-}
-
} // blobfs
diff --git a/zircon/system/ulib/blobfs/journal.cpp b/zircon/system/ulib/blobfs/journal.cpp
index 3aebd9d..764ff5c 100644
--- a/zircon/system/ulib/blobfs/journal.cpp
+++ b/zircon/system/ulib/blobfs/journal.cpp
@@ -3,8 +3,10 @@
// found in the LICENSE file.
#include <blobfs/journal.h>
+#include <fbl/auto_lock.h>
#include <fbl/unique_ptr.h>
#include <lib/cksum.h>
+#include <lib/sync/completion.h>
#include <zircon/types.h>
#include <utility>
@@ -20,84 +22,6 @@
return 0;
}
-JournalEntry::JournalEntry(JournalBase* journal, EntryStatus status, size_t header_index,
- size_t commit_index, fbl::unique_ptr<WritebackWork> work)
- : journal_(journal), status_(static_cast<uint32_t>(status)), block_count_(0),
- header_index_(header_index), commit_index_(commit_index), work_(std::move(work)) {
- if (status != EntryStatus::kInit) {
- // In the case of a sync request or error, return early.
- ZX_DEBUG_ASSERT(status == EntryStatus::kSync || status == EntryStatus::kError);
- return;
- }
-
- size_t work_blocks = work_->BlkCount();
- // Ensure the work is valid.
- ZX_DEBUG_ASSERT(work_blocks > 0);
- ZX_DEBUG_ASSERT(work_->IsBuffered());
- ZX_DEBUG_ASSERT(work_blocks <= kMaxEntryDataBlocks);
-
- // Copy all target blocks from the WritebackWork to the entry's header block.
- for (size_t i = 0; i < work_->Requests().size(); i++) {
- WriteRequest& request = work_->Requests()[i];
- for (size_t j = request.dev_offset; j < request.dev_offset + request.length; j++) {
- header_block_.target_blocks[block_count_++] = j;
- }
- }
-
- ZX_DEBUG_ASSERT(work_blocks == block_count_);
-
- // Set other information in the header/commit blocks.
- header_block_.magic = kEntryHeaderMagic;
- header_block_.num_blocks = block_count_;
- header_block_.timestamp = zx_ticks_get();
- commit_block_.magic = kEntryCommitMagic;
- commit_block_.timestamp = header_block_.timestamp;
- commit_block_.checksum = 0;
-}
-
-fbl::unique_ptr<WritebackWork> JournalEntry::TakeWork() {
- ZX_DEBUG_ASSERT(work_ != nullptr);
-
- if (header_index_ != commit_index_) {
- // If the journal entry contains any transactions, set the work closure to update the entry
- // status on write completion. This currently assumes that a WritebackWork with associated
- // transactions will NOT already have a closure attached. If we ever want to include
- // transactions on a syncing WritebackWork, we will need to revisit this.
- work_->SetSyncCallback(CreateSyncCallback());
- }
-
- return std::move(work_);
-}
-
-ReadyCallback JournalEntry::CreateReadyCallback() {
- return [this] () {
- // If the entry is in a waiting state, it is ready to be written to disk.
- return GetStatus() == EntryStatus::kWaiting;
- };
-}
-
-SyncCallback JournalEntry::CreateSyncCallback() {
- return [this] (zx_status_t result) {
- // Signal the journal that an entry is ready for processing.
- journal_->ProcessEntryResult(result, this);
- };
-}
-
-void JournalEntry::SetStatusFromResult(zx_status_t result) {
- // Set the state of the JournalEntry based on the last received result.
- if (result != ZX_OK) {
- SetStatus(EntryStatus::kError);
- return;
- }
-
- EntryStatus last_status = SetStatus(EntryStatus::kPersisted);
- ZX_DEBUG_ASSERT(last_status == EntryStatus::kWaiting);
-}
-
-void JournalEntry::SetChecksum(uint32_t checksum) {
- commit_block_.checksum = checksum;
-}
-
zx_status_t Journal::Create(TransactionManager* transaction_manager, uint64_t journal_blocks,
uint64_t start_block, fbl::unique_ptr<Journal>* out) {
// Create the buffer with 1 less than total journal blocks.
diff --git a/zircon/system/ulib/blobfs/test/writeback-test.cpp b/zircon/system/ulib/blobfs/test/writeback-test.cpp
index 482f86f..bcb368f 100644
--- a/zircon/system/ulib/blobfs/test/writeback-test.cpp
+++ b/zircon/system/ulib/blobfs/test/writeback-test.cpp
@@ -3,6 +3,7 @@
// found in the LICENSE file.
#include <blobfs/writeback.h>
+#include <blobfs/writeback-queue.h>
#include <unittest/unittest.h>
#include "utils.h"
diff --git a/zircon/system/ulib/blobfs/write-txn.cpp b/zircon/system/ulib/blobfs/write-txn.cpp
index 7b21695..2d29e88 100644
--- a/zircon/system/ulib/blobfs/write-txn.cpp
+++ b/zircon/system/ulib/blobfs/write-txn.cpp
@@ -2,9 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include <blobfs/writeback.h>
-
-#include <utility>
+#include <blobfs/metrics.h>
+#include <blobfs/write-txn.h>
namespace blobfs {
@@ -101,447 +100,4 @@
return status;
}
-void WritebackWork::MarkCompleted(zx_status_t status) {
- WriteTxn::Reset();
- if (sync_cb_) {
- sync_cb_(status);
- }
- sync_cb_ = nullptr;
- ready_cb_ = nullptr;
-}
-
-bool WritebackWork::IsReady() {
- if (ready_cb_) {
- if (ready_cb_()) {
- ready_cb_ = nullptr;
- return true;
- }
-
- return false;
- }
-
- return true;
-}
-
-void WritebackWork::SetReadyCallback(ReadyCallback callback) {
- ZX_DEBUG_ASSERT(!ready_cb_);
- ready_cb_ = std::move(callback);
-}
-
-void WritebackWork::SetSyncCallback(SyncCallback callback) {
- if (sync_cb_) {
- // This "callback chain" allows multiple clients to observe the completion
- // of the WritebackWork. This is akin to a promise "and-then" relationship.
- sync_cb_ = [previous_callback = std::move(sync_cb_),
- next_callback = std::move(callback)] (zx_status_t status) {
- next_callback(status);
- previous_callback(status);
- };
- } else {
- sync_cb_ = std::move(callback);
- }
-}
-
-// Returns the number of blocks of the writeback buffer that have been consumed
-zx_status_t WritebackWork::Complete() {
- zx_status_t status = Flush();
- MarkCompleted(status);
- return status;
-}
-
-WritebackWork::WritebackWork(TransactionManager* transaction_manager)
- : WriteTxn(transaction_manager), ready_cb_(nullptr), sync_cb_(nullptr) {}
-
-Buffer::~Buffer() {
- if (vmoid_ != VMOID_INVALID) {
- // Close the buffer vmo.
- block_fifo_request_t request;
- request.group = transaction_manager_->BlockGroupID();
- request.vmoid = vmoid_;
- request.opcode = BLOCKIO_CLOSE_VMO;
- transaction_manager_->Transaction(&request, 1);
- }
-}
-
-zx_status_t Buffer::Create(TransactionManager* blobfs, size_t blocks, const char* label,
- fbl::unique_ptr<Buffer>* out) {
-
- fzl::OwnedVmoMapper mapper;
- zx_status_t status = mapper.CreateAndMap(blocks * kBlobfsBlockSize, "blob-writeback");
- if (status != ZX_OK) {
- FS_TRACE_ERROR("Buffer: Failed to create vmo\n");
- return status;
- }
-
- fbl::unique_ptr<Buffer> buffer(new Buffer(blobfs, std::move(mapper)));
- if ((status = buffer->transaction_manager_->AttachVmo(buffer->mapper_.vmo(), &buffer->vmoid_))
- != ZX_OK) {
- FS_TRACE_ERROR("Buffer: Failed to attach vmo\n");
- return status;
- }
-
- *out = std::move(buffer);
- return ZX_OK;
-}
-
-bool Buffer::IsSpaceAvailable(size_t blocks) const {
- // TODO(planders): Similar to minfs, make sure that we either have a fallback mechanism for
- // operations which are too large to be fully contained by the buffer, or that the
- // worst-case operation will always fit within the buffer.
- ZX_ASSERT_MSG(blocks <= capacity_, "Requested txn (%zu blocks) larger than buffer", blocks);
- return length_ + blocks <= capacity_;
-}
-
-void Buffer::CopyTransaction(WriteTxn* txn) {
- ZX_DEBUG_ASSERT(!txn->IsBuffered());
- auto& reqs = txn->Requests();
-
- for (size_t i = 0; i < reqs.size(); i++) {
- ZX_DEBUG_ASSERT(reqs[i].vmo != ZX_HANDLE_INVALID);
-
- // Read parameters of the current request.
- size_t vmo_offset = reqs[i].vmo_offset;
- size_t dev_offset = reqs[i].dev_offset;
- const size_t vmo_len = reqs[i].length;
- ZX_DEBUG_ASSERT(vmo_len > 0);
-
- // Calculate the offset/length we will need to write into the buffer.
- size_t buf_offset = (start_ + length_) % capacity_;
- size_t buf_len = (buf_offset + vmo_len > capacity_) ? capacity_ - buf_offset : vmo_len;
- size_t init_len = vmo_len;
- size_t total_len = buf_len;
-
- // Verify that the length is valid.
- ZX_DEBUG_ASSERT(buf_len > 0);
- ZX_DEBUG_ASSERT(buf_len <= vmo_len);
- ZX_DEBUG_ASSERT(buf_len < capacity_);
- zx_handle_t vmo = reqs[i].vmo;
- ZX_DEBUG_ASSERT(vmo != mapper_.vmo().get());
-
- // Write data from the vmo into the buffer.
- void* ptr = MutableData(buf_offset);
-
- zx_status_t status;
- ZX_DEBUG_ASSERT((start_ <= buf_offset) ?
- (start_ < buf_offset + buf_len) :
- (buf_offset + buf_len <= start_)); // Wraparound
- ZX_ASSERT_MSG((status = zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
- buf_len * kBlobfsBlockSize)) == ZX_OK, "VMO Read Fail: %d", status);
-
- // Update the buffer len to include newly written data.
- length_ += buf_len;
-
- // Update the write_request to transfer from the writeback buffer out to disk,
- // rather than the supplied VMO.
- // Set the vmo handle to invalid, since we will be using the same vmoid for all requests.
- reqs[i].vmo = ZX_HANDLE_INVALID;
- reqs[i].vmo_offset = buf_offset;
- reqs[i].length = buf_len;
-
- if (buf_len != vmo_len) {
- // We wrapped around; write what remains from this request.
- vmo_offset += buf_len;
- dev_offset += buf_len;
- buf_len = vmo_len - buf_len;
- ZX_DEBUG_ASSERT(buf_len > 0);
-
- ptr = MutableData(0);
- ZX_DEBUG_ASSERT((start_ == 0) ? (start_ < buf_len) : (buf_len <= start_)); // Wraparound
- ZX_ASSERT(zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
- buf_len * kBlobfsBlockSize) == ZX_OK);
-
- length_ += buf_len;
- total_len += buf_len;
-
- // Shift down all following write requests.
- static_assert(std::is_pod<WriteRequest>::value, "Can't memmove non-POD");
-
- // Shift down all following write requests
- static_assert(std::is_pod<WriteRequest>::value, "Can't memmove non-POD");
- // Insert the "new" request, which is the latter half of the last request
- WriteRequest request;
- request.vmo = vmo;
- request.vmo_offset = 0;
- request.dev_offset = dev_offset;
- request.length = buf_len;
- i++;
- reqs.insert(i, request);
- }
-
- // Verify that the length of all vmo writes we did match the total length we were meant to
- // write from the initial vmo.
- ZX_DEBUG_ASSERT(init_len == total_len);
- }
-
- txn->SetBuffer(vmoid_);
-}
-
-void Buffer::AddTransaction(size_t start, size_t disk_start, size_t length, WritebackWork* work) {
- // Ensure the request fits within the buffer.
- ZX_DEBUG_ASSERT(length > 0);
- ZX_DEBUG_ASSERT(start + length <= capacity_);
- ZX_DEBUG_ASSERT(work != nullptr);
- work->Enqueue(mapper_.vmo(), start, disk_start, length);
-}
-
-bool Buffer::VerifyTransaction(WriteTxn* txn) const {
- if (txn->CheckBuffer(vmoid_)) {
- if (txn->BlkCount() > 0) {
- // If the work belongs to the WritebackQueue, verify that it matches up with the
- // buffer's start/len.
- ZX_ASSERT(txn->BlkStart() == start_);
- ZX_ASSERT(txn->BlkCount() <= length_);
- }
-
- return true;
- }
-
- return false;
-}
-
-void Buffer::ValidateTransaction(WriteTxn* txn) {
- if (txn->IsBuffered()) {
- // If transaction is already buffered, make sure it belongs to this buffer.
- ZX_DEBUG_ASSERT(txn->CheckBuffer(vmoid_));
- } else {
- fbl::Vector<WriteRequest>& reqs = txn->Requests();
-
- for (size_t i = 0; i < reqs.size(); i++) {
- // Verify that each request references this buffer VMO,
- // and that the transaction fits within the buffer.
- ZX_DEBUG_ASSERT(reqs[i].vmo == mapper_.vmo().get());
- reqs[i].vmo = ZX_HANDLE_INVALID;
- }
-
- // Once each request has been verified, set the buffer.
- txn->SetBuffer(vmoid_);
- }
-}
-
-void Buffer::FreeSpace(size_t blocks) {
- ZX_DEBUG_ASSERT(blocks <= length_);
- start_ = (start_ + blocks) % capacity_;
- length_ -= blocks;
-}
-
-WritebackQueue::~WritebackQueue() {
- // Ensure that thread teardown has completed, or that it was never brought up to begin with.
- ZX_DEBUG_ASSERT(!IsRunning());
- ZX_DEBUG_ASSERT(work_queue_.is_empty());
-}
-
-zx_status_t WritebackQueue::Teardown() {
- WritebackState state;
-
- {
- fbl::AutoLock lock(&lock_);
- state = state_;
-
- // Signal the background thread.
- unmounting_ = true;
- cnd_signal(&work_added_);
- }
-
- zx_status_t status = ZX_OK;
- if (state != WritebackState::kInit) {
- // Block until the thread completes itself.
- int result = -1;
- int success = thrd_join(worker_, &result);
- if (result != 0 || success != thrd_success) {
- status = ZX_ERR_INTERNAL;
- }
- }
-
- return status;
-}
-
-zx_status_t WritebackQueue::Create(TransactionManager* transaction_manager,
- const size_t buffer_blocks,
- fbl::unique_ptr<WritebackQueue>* out) {
- zx_status_t status;
- fbl::unique_ptr<Buffer> buffer;
- if ((status = Buffer::Create(transaction_manager, buffer_blocks, "blobfs-writeback", &buffer))
- != ZX_OK) {
- return status;
- }
-
- fbl::unique_ptr<WritebackQueue> wb(new WritebackQueue(std::move(buffer)));
-
- if (cnd_init(&wb->work_completed_) != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
- if (cnd_init(&wb->work_added_) != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
-
- if (thrd_create_with_name(&wb->worker_,
- WritebackQueue::WritebackThread, wb.get(),
- "blobfs-writeback") != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
-
- fbl::AutoLock lock(&wb->lock_);
- wb->state_ = WritebackState::kRunning;
- *out = std::move(wb);
- return ZX_OK;
-}
-
-zx_status_t WritebackQueue::Enqueue(fbl::unique_ptr<WritebackWork> work) {
- TRACE_DURATION("blobfs", "WritebackQueue::Enqueue", "work ptr", work.get());
- fbl::AutoLock lock(&lock_);
- zx_status_t status = ZX_OK;
-
- if (IsReadOnly()) {
- // If we are in a readonly state, return an error. However, the work should still be
- // enqueued and ultimately processed by the WritebackThread. This will help us avoid
- // potential race conditions if the work callback must acquire a lock.
- status = ZX_ERR_BAD_STATE;
- } else if (!work->IsBuffered()) {
- ZX_DEBUG_ASSERT(state_ == WritebackState::kRunning);
-
- // Only copy blocks to the buffer if they have not already been copied to another buffer.
- EnsureSpaceLocked(work->BlkCount());
-
- // It is possible that the queue entered a read only state
- // while we were waiting to ensure space, so check again now.
- if (IsReadOnly()) {
- status = ZX_ERR_BAD_STATE;
- } else {
- buffer_->CopyTransaction(work.get());
- }
- }
-
- work_queue_.push(std::move(work));
- cnd_signal(&work_added_);
- return status;
-}
-
-bool WritebackQueue::IsRunning() const {
- switch (state_) {
- case WritebackState::kRunning:
- case WritebackState::kReadOnly:
- return true;
- default:
- return false;
- }
-}
-
-void WritebackQueue::EnsureSpaceLocked(size_t blocks) {
- while (!buffer_->IsSpaceAvailable(blocks)) {
- // Not enough room to write back work, yet. Wait until room is available.
- Waiter w;
- producer_queue_.push(&w);
-
- do {
- cnd_wait(&work_completed_, lock_.GetInternal());
- } while ((&producer_queue_.front() != &w) && // We are first in line to enqueue...
- (!buffer_->IsSpaceAvailable(blocks))); // ... and there is enough space for us.
-
- producer_queue_.pop();
- }
-}
-
-int WritebackQueue::WritebackThread(void* arg) {
- WritebackQueue* b = reinterpret_cast<WritebackQueue*>(arg);
-
- b->lock_.Acquire();
- while (true) {
- bool error = b->IsReadOnly();
- while (!b->work_queue_.is_empty()) {
- if (!error && !b->work_queue_.front().IsReady()) {
- // If the work is not yet ready, break and wait until we receive another signal.
- break;
- }
-
- auto work = b->work_queue_.pop();
- TRACE_DURATION("blobfs", "WritebackQueue::WritebackThread", "work ptr", work.get());
-
- bool our_buffer = b->buffer_->VerifyTransaction(work.get());
- size_t blk_count = work->BlkCount();
-
- // Stay unlocked while processing a unit of work.
- b->lock_.Release();
-
- if (error) {
- // If we are in a read only state, mark the work complete with an error status.
- work->MarkCompleted(ZX_ERR_BAD_STATE);
- } else {
- // If we should complete the work, make sure it has been buffered.
- // (This is not necessary if we are currently in an error state).
- ZX_DEBUG_ASSERT(work->IsBuffered());
- zx_status_t status;
- if ((status = work->Complete()) != ZX_OK) {
- FS_TRACE_ERROR("Work failed with status %d - "
- "converting writeback to read only state.\n", status);
- // If work completion failed, set the buffer to an error state.
- error = true;
- }
- }
-
- work = nullptr;
- b->lock_.Acquire();
-
- if (error) {
- // If we encountered an error, set the queue to readonly.
- b->state_ = WritebackState::kReadOnly;
- }
-
- if (our_buffer) {
- // If the last work we processed belonged to our buffer,
- // update the buffer's start/len accordingly.
- b->buffer_->FreeSpace(blk_count);
- }
-
- // We may have opened up space (or entered a read only state),
- // so signal the producer queue.
- cnd_signal(&b->work_completed_);
- }
-
- // Before waiting, we should check if we're unmounting.
- // If work still remains in the work or producer queues,
- // continue the loop until they are empty.
- if (b->unmounting_ && b->work_queue_.is_empty() && b->producer_queue_.is_empty()) {
- ZX_DEBUG_ASSERT(b->work_queue_.is_empty());
- ZX_DEBUG_ASSERT(b->producer_queue_.is_empty());
- b->state_ = WritebackState::kComplete;
- b->lock_.Release();
- return 0;
- }
-
- cnd_wait(&b->work_added_, b->lock_.GetInternal());
- }
-}
-
-zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work,
- TransactionManager* transaction_manager, Blob* vn,
- const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks) {
- const size_t kMaxChunkBlocks = (3 * transaction_manager->WritebackCapacity()) / 4;
- uint64_t delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
- while (nblocks > 0) {
- if ((*work)->BlkCount() + delta_blocks > kMaxChunkBlocks) {
- // If enqueueing these blocks could push us past the writeback buffer capacity
- // when combined with all previous writes, break this transaction into a smaller
- // chunk first.
- fbl::unique_ptr<WritebackWork> tmp;
- zx_status_t status = transaction_manager->CreateWork(&tmp, vn);
- if (status != ZX_OK) {
- return status;
- }
- if ((status = transaction_manager->EnqueueWork(std::move(*work),
- EnqueueType::kData)) != ZX_OK) {
- return status;
- }
- *work = std::move(tmp);
- }
-
- (*work)->Enqueue(vmo, relative_block, absolute_block, delta_blocks);
- relative_block += delta_blocks;
- absolute_block += delta_blocks;
- nblocks -= delta_blocks;
- delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
- }
- return ZX_OK;
-}
-
} // namespace blobfs
diff --git a/zircon/system/ulib/blobfs/writeback-queue.cpp b/zircon/system/ulib/blobfs/writeback-queue.cpp
index 7b21695..574d0e9 100644
--- a/zircon/system/ulib/blobfs/writeback-queue.cpp
+++ b/zircon/system/ulib/blobfs/writeback-queue.cpp
@@ -2,328 +2,15 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include <blobfs/writeback.h>
+#include <blobfs/buffer.h>
+#include <blobfs/writeback-queue.h>
+#include <blobfs/writeback-work.h>
+#include <fbl/auto_lock.h>
#include <utility>
namespace blobfs {
-WriteTxn::~WriteTxn() {
- ZX_DEBUG_ASSERT_MSG(requests_.is_empty(), "WriteTxn still has pending requests");
-}
-
-void WriteTxn::Enqueue(const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks) {
- ZX_DEBUG_ASSERT(vmo.is_valid());
- ZX_DEBUG_ASSERT(!IsBuffered());
-
- for (auto& request : requests_) {
- if (request.vmo != vmo.get()) {
- continue;
- }
-
- if (request.vmo_offset == relative_block) {
- // Take the longer of the operations (if operating on the same blocks).
- if (nblocks > request.length) {
- block_count_ += (nblocks - request.length);
- request.length = nblocks;
- }
- return;
- } else if ((request.vmo_offset + request.length == relative_block) &&
- (request.dev_offset + request.length == absolute_block)) {
- // Combine with the previous request, if immediately following.
- request.length += nblocks;
- block_count_ += nblocks;
- return;
- }
- }
-
- WriteRequest request;
- request.vmo = vmo.get();
- request.vmo_offset = relative_block;
- request.dev_offset = absolute_block;
- request.length = nblocks;
- requests_.push_back(std::move(request));
- block_count_ += request.length;
-}
-
-size_t WriteTxn::BlkStart() const {
- ZX_DEBUG_ASSERT(IsBuffered());
- ZX_DEBUG_ASSERT(requests_.size() > 0);
- return requests_[0].vmo_offset;
-}
-
-size_t WriteTxn::BlkCount() const {
- return block_count_;
-}
-
-void WriteTxn::SetBuffer(vmoid_t vmoid) {
- ZX_DEBUG_ASSERT(vmoid_ == VMOID_INVALID || vmoid_ == vmoid);
- ZX_DEBUG_ASSERT(vmoid != VMOID_INVALID);
- vmoid_ = vmoid;
-}
-
-zx_status_t WriteTxn::Flush() {
- ZX_ASSERT(IsBuffered());
- fs::Ticker ticker(transaction_manager_->LocalMetrics().Collecting());
-
- // Update all the outgoing transactions to be in disk blocks
- block_fifo_request_t blk_reqs[requests_.size()];
- const uint32_t kDiskBlocksPerBlobfsBlock =
- kBlobfsBlockSize / transaction_manager_->DeviceBlockSize();
- for (size_t i = 0; i < requests_.size(); i++) {
- blk_reqs[i].group = transaction_manager_->BlockGroupID();
- blk_reqs[i].vmoid = vmoid_;
- blk_reqs[i].opcode = BLOCKIO_WRITE;
- blk_reqs[i].vmo_offset = requests_[i].vmo_offset * kDiskBlocksPerBlobfsBlock;
- blk_reqs[i].dev_offset = requests_[i].dev_offset * kDiskBlocksPerBlobfsBlock;
- uint64_t length = requests_[i].length * kDiskBlocksPerBlobfsBlock;
- // TODO(ZX-2253): Requests this long, although unlikely, should be
- // handled more gracefully.
- ZX_ASSERT_MSG(length < UINT32_MAX, "Request size too large");
- blk_reqs[i].length = static_cast<uint32_t>(length);
- }
-
- // Actually send the operations to the underlying block device.
- zx_status_t status = transaction_manager_->Transaction(blk_reqs, requests_.size());
-
- if (transaction_manager_->LocalMetrics().Collecting()) {
- uint64_t sum = 0;
- for (const auto& blk_req : blk_reqs) {
- sum += blk_req.length * kBlobfsBlockSize;
- }
- transaction_manager_->LocalMetrics().UpdateWriteback(sum, ticker.End());
- }
-
- requests_.reset();
- vmoid_ = VMOID_INVALID;
- block_count_ = 0;
- return status;
-}
-
-void WritebackWork::MarkCompleted(zx_status_t status) {
- WriteTxn::Reset();
- if (sync_cb_) {
- sync_cb_(status);
- }
- sync_cb_ = nullptr;
- ready_cb_ = nullptr;
-}
-
-bool WritebackWork::IsReady() {
- if (ready_cb_) {
- if (ready_cb_()) {
- ready_cb_ = nullptr;
- return true;
- }
-
- return false;
- }
-
- return true;
-}
-
-void WritebackWork::SetReadyCallback(ReadyCallback callback) {
- ZX_DEBUG_ASSERT(!ready_cb_);
- ready_cb_ = std::move(callback);
-}
-
-void WritebackWork::SetSyncCallback(SyncCallback callback) {
- if (sync_cb_) {
- // This "callback chain" allows multiple clients to observe the completion
- // of the WritebackWork. This is akin to a promise "and-then" relationship.
- sync_cb_ = [previous_callback = std::move(sync_cb_),
- next_callback = std::move(callback)] (zx_status_t status) {
- next_callback(status);
- previous_callback(status);
- };
- } else {
- sync_cb_ = std::move(callback);
- }
-}
-
-// Returns the number of blocks of the writeback buffer that have been consumed
-zx_status_t WritebackWork::Complete() {
- zx_status_t status = Flush();
- MarkCompleted(status);
- return status;
-}
-
-WritebackWork::WritebackWork(TransactionManager* transaction_manager)
- : WriteTxn(transaction_manager), ready_cb_(nullptr), sync_cb_(nullptr) {}
-
-Buffer::~Buffer() {
- if (vmoid_ != VMOID_INVALID) {
- // Close the buffer vmo.
- block_fifo_request_t request;
- request.group = transaction_manager_->BlockGroupID();
- request.vmoid = vmoid_;
- request.opcode = BLOCKIO_CLOSE_VMO;
- transaction_manager_->Transaction(&request, 1);
- }
-}
-
-zx_status_t Buffer::Create(TransactionManager* blobfs, size_t blocks, const char* label,
- fbl::unique_ptr<Buffer>* out) {
-
- fzl::OwnedVmoMapper mapper;
- zx_status_t status = mapper.CreateAndMap(blocks * kBlobfsBlockSize, "blob-writeback");
- if (status != ZX_OK) {
- FS_TRACE_ERROR("Buffer: Failed to create vmo\n");
- return status;
- }
-
- fbl::unique_ptr<Buffer> buffer(new Buffer(blobfs, std::move(mapper)));
- if ((status = buffer->transaction_manager_->AttachVmo(buffer->mapper_.vmo(), &buffer->vmoid_))
- != ZX_OK) {
- FS_TRACE_ERROR("Buffer: Failed to attach vmo\n");
- return status;
- }
-
- *out = std::move(buffer);
- return ZX_OK;
-}
-
-bool Buffer::IsSpaceAvailable(size_t blocks) const {
- // TODO(planders): Similar to minfs, make sure that we either have a fallback mechanism for
- // operations which are too large to be fully contained by the buffer, or that the
- // worst-case operation will always fit within the buffer.
- ZX_ASSERT_MSG(blocks <= capacity_, "Requested txn (%zu blocks) larger than buffer", blocks);
- return length_ + blocks <= capacity_;
-}
-
-void Buffer::CopyTransaction(WriteTxn* txn) {
- ZX_DEBUG_ASSERT(!txn->IsBuffered());
- auto& reqs = txn->Requests();
-
- for (size_t i = 0; i < reqs.size(); i++) {
- ZX_DEBUG_ASSERT(reqs[i].vmo != ZX_HANDLE_INVALID);
-
- // Read parameters of the current request.
- size_t vmo_offset = reqs[i].vmo_offset;
- size_t dev_offset = reqs[i].dev_offset;
- const size_t vmo_len = reqs[i].length;
- ZX_DEBUG_ASSERT(vmo_len > 0);
-
- // Calculate the offset/length we will need to write into the buffer.
- size_t buf_offset = (start_ + length_) % capacity_;
- size_t buf_len = (buf_offset + vmo_len > capacity_) ? capacity_ - buf_offset : vmo_len;
- size_t init_len = vmo_len;
- size_t total_len = buf_len;
-
- // Verify that the length is valid.
- ZX_DEBUG_ASSERT(buf_len > 0);
- ZX_DEBUG_ASSERT(buf_len <= vmo_len);
- ZX_DEBUG_ASSERT(buf_len < capacity_);
- zx_handle_t vmo = reqs[i].vmo;
- ZX_DEBUG_ASSERT(vmo != mapper_.vmo().get());
-
- // Write data from the vmo into the buffer.
- void* ptr = MutableData(buf_offset);
-
- zx_status_t status;
- ZX_DEBUG_ASSERT((start_ <= buf_offset) ?
- (start_ < buf_offset + buf_len) :
- (buf_offset + buf_len <= start_)); // Wraparound
- ZX_ASSERT_MSG((status = zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
- buf_len * kBlobfsBlockSize)) == ZX_OK, "VMO Read Fail: %d", status);
-
- // Update the buffer len to include newly written data.
- length_ += buf_len;
-
- // Update the write_request to transfer from the writeback buffer out to disk,
- // rather than the supplied VMO.
- // Set the vmo handle to invalid, since we will be using the same vmoid for all requests.
- reqs[i].vmo = ZX_HANDLE_INVALID;
- reqs[i].vmo_offset = buf_offset;
- reqs[i].length = buf_len;
-
- if (buf_len != vmo_len) {
- // We wrapped around; write what remains from this request.
- vmo_offset += buf_len;
- dev_offset += buf_len;
- buf_len = vmo_len - buf_len;
- ZX_DEBUG_ASSERT(buf_len > 0);
-
- ptr = MutableData(0);
- ZX_DEBUG_ASSERT((start_ == 0) ? (start_ < buf_len) : (buf_len <= start_)); // Wraparound
- ZX_ASSERT(zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
- buf_len * kBlobfsBlockSize) == ZX_OK);
-
- length_ += buf_len;
- total_len += buf_len;
-
- // Shift down all following write requests.
- static_assert(std::is_pod<WriteRequest>::value, "Can't memmove non-POD");
-
- // Shift down all following write requests
- static_assert(std::is_pod<WriteRequest>::value, "Can't memmove non-POD");
- // Insert the "new" request, which is the latter half of the last request
- WriteRequest request;
- request.vmo = vmo;
- request.vmo_offset = 0;
- request.dev_offset = dev_offset;
- request.length = buf_len;
- i++;
- reqs.insert(i, request);
- }
-
- // Verify that the length of all vmo writes we did match the total length we were meant to
- // write from the initial vmo.
- ZX_DEBUG_ASSERT(init_len == total_len);
- }
-
- txn->SetBuffer(vmoid_);
-}
-
-void Buffer::AddTransaction(size_t start, size_t disk_start, size_t length, WritebackWork* work) {
- // Ensure the request fits within the buffer.
- ZX_DEBUG_ASSERT(length > 0);
- ZX_DEBUG_ASSERT(start + length <= capacity_);
- ZX_DEBUG_ASSERT(work != nullptr);
- work->Enqueue(mapper_.vmo(), start, disk_start, length);
-}
-
-bool Buffer::VerifyTransaction(WriteTxn* txn) const {
- if (txn->CheckBuffer(vmoid_)) {
- if (txn->BlkCount() > 0) {
- // If the work belongs to the WritebackQueue, verify that it matches up with the
- // buffer's start/len.
- ZX_ASSERT(txn->BlkStart() == start_);
- ZX_ASSERT(txn->BlkCount() <= length_);
- }
-
- return true;
- }
-
- return false;
-}
-
-void Buffer::ValidateTransaction(WriteTxn* txn) {
- if (txn->IsBuffered()) {
- // If transaction is already buffered, make sure it belongs to this buffer.
- ZX_DEBUG_ASSERT(txn->CheckBuffer(vmoid_));
- } else {
- fbl::Vector<WriteRequest>& reqs = txn->Requests();
-
- for (size_t i = 0; i < reqs.size(); i++) {
- // Verify that each request references this buffer VMO,
- // and that the transaction fits within the buffer.
- ZX_DEBUG_ASSERT(reqs[i].vmo == mapper_.vmo().get());
- reqs[i].vmo = ZX_HANDLE_INVALID;
- }
-
- // Once each request has been verified, set the buffer.
- txn->SetBuffer(vmoid_);
- }
-}
-
-void Buffer::FreeSpace(size_t blocks) {
- ZX_DEBUG_ASSERT(blocks <= length_);
- start_ = (start_ + blocks) % capacity_;
- length_ -= blocks;
-}
-
WritebackQueue::~WritebackQueue() {
// Ensure that thread teardown has completed, or that it was never brought up to begin with.
ZX_DEBUG_ASSERT(!IsRunning());
@@ -512,36 +199,4 @@
}
}
-zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work,
- TransactionManager* transaction_manager, Blob* vn,
- const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks) {
- const size_t kMaxChunkBlocks = (3 * transaction_manager->WritebackCapacity()) / 4;
- uint64_t delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
- while (nblocks > 0) {
- if ((*work)->BlkCount() + delta_blocks > kMaxChunkBlocks) {
- // If enqueueing these blocks could push us past the writeback buffer capacity
- // when combined with all previous writes, break this transaction into a smaller
- // chunk first.
- fbl::unique_ptr<WritebackWork> tmp;
- zx_status_t status = transaction_manager->CreateWork(&tmp, vn);
- if (status != ZX_OK) {
- return status;
- }
- if ((status = transaction_manager->EnqueueWork(std::move(*work),
- EnqueueType::kData)) != ZX_OK) {
- return status;
- }
- *work = std::move(tmp);
- }
-
- (*work)->Enqueue(vmo, relative_block, absolute_block, delta_blocks);
- relative_block += delta_blocks;
- absolute_block += delta_blocks;
- nblocks -= delta_blocks;
- delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
- }
- return ZX_OK;
-}
-
} // namespace blobfs
diff --git a/zircon/system/ulib/blobfs/writeback-work.cpp b/zircon/system/ulib/blobfs/writeback-work.cpp
index 7b21695..f2365d2 100644
--- a/zircon/system/ulib/blobfs/writeback-work.cpp
+++ b/zircon/system/ulib/blobfs/writeback-work.cpp
@@ -2,105 +2,12 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include <blobfs/writeback.h>
+#include <blobfs/writeback-work.h>
#include <utility>
namespace blobfs {
-WriteTxn::~WriteTxn() {
- ZX_DEBUG_ASSERT_MSG(requests_.is_empty(), "WriteTxn still has pending requests");
-}
-
-void WriteTxn::Enqueue(const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks) {
- ZX_DEBUG_ASSERT(vmo.is_valid());
- ZX_DEBUG_ASSERT(!IsBuffered());
-
- for (auto& request : requests_) {
- if (request.vmo != vmo.get()) {
- continue;
- }
-
- if (request.vmo_offset == relative_block) {
- // Take the longer of the operations (if operating on the same blocks).
- if (nblocks > request.length) {
- block_count_ += (nblocks - request.length);
- request.length = nblocks;
- }
- return;
- } else if ((request.vmo_offset + request.length == relative_block) &&
- (request.dev_offset + request.length == absolute_block)) {
- // Combine with the previous request, if immediately following.
- request.length += nblocks;
- block_count_ += nblocks;
- return;
- }
- }
-
- WriteRequest request;
- request.vmo = vmo.get();
- request.vmo_offset = relative_block;
- request.dev_offset = absolute_block;
- request.length = nblocks;
- requests_.push_back(std::move(request));
- block_count_ += request.length;
-}
-
-size_t WriteTxn::BlkStart() const {
- ZX_DEBUG_ASSERT(IsBuffered());
- ZX_DEBUG_ASSERT(requests_.size() > 0);
- return requests_[0].vmo_offset;
-}
-
-size_t WriteTxn::BlkCount() const {
- return block_count_;
-}
-
-void WriteTxn::SetBuffer(vmoid_t vmoid) {
- ZX_DEBUG_ASSERT(vmoid_ == VMOID_INVALID || vmoid_ == vmoid);
- ZX_DEBUG_ASSERT(vmoid != VMOID_INVALID);
- vmoid_ = vmoid;
-}
-
-zx_status_t WriteTxn::Flush() {
- ZX_ASSERT(IsBuffered());
- fs::Ticker ticker(transaction_manager_->LocalMetrics().Collecting());
-
- // Update all the outgoing transactions to be in disk blocks
- block_fifo_request_t blk_reqs[requests_.size()];
- const uint32_t kDiskBlocksPerBlobfsBlock =
- kBlobfsBlockSize / transaction_manager_->DeviceBlockSize();
- for (size_t i = 0; i < requests_.size(); i++) {
- blk_reqs[i].group = transaction_manager_->BlockGroupID();
- blk_reqs[i].vmoid = vmoid_;
- blk_reqs[i].opcode = BLOCKIO_WRITE;
- blk_reqs[i].vmo_offset = requests_[i].vmo_offset * kDiskBlocksPerBlobfsBlock;
- blk_reqs[i].dev_offset = requests_[i].dev_offset * kDiskBlocksPerBlobfsBlock;
- uint64_t length = requests_[i].length * kDiskBlocksPerBlobfsBlock;
- // TODO(ZX-2253): Requests this long, although unlikely, should be
- // handled more gracefully.
- ZX_ASSERT_MSG(length < UINT32_MAX, "Request size too large");
- blk_reqs[i].length = static_cast<uint32_t>(length);
- }
-
- // Actually send the operations to the underlying block device.
- zx_status_t status = transaction_manager_->Transaction(blk_reqs, requests_.size());
-
- if (transaction_manager_->LocalMetrics().Collecting()) {
- uint64_t sum = 0;
- for (const auto& blk_req : blk_reqs) {
- sum += blk_req.length * kBlobfsBlockSize;
- }
- transaction_manager_->LocalMetrics().UpdateWriteback(sum, ticker.End());
- }
-
- requests_.reset();
- vmoid_ = VMOID_INVALID;
- block_count_ = 0;
- return status;
-}
-
void WritebackWork::MarkCompleted(zx_status_t status) {
WriteTxn::Reset();
if (sync_cb_) {
@@ -152,396 +59,4 @@
WritebackWork::WritebackWork(TransactionManager* transaction_manager)
: WriteTxn(transaction_manager), ready_cb_(nullptr), sync_cb_(nullptr) {}
-Buffer::~Buffer() {
- if (vmoid_ != VMOID_INVALID) {
- // Close the buffer vmo.
- block_fifo_request_t request;
- request.group = transaction_manager_->BlockGroupID();
- request.vmoid = vmoid_;
- request.opcode = BLOCKIO_CLOSE_VMO;
- transaction_manager_->Transaction(&request, 1);
- }
-}
-
-zx_status_t Buffer::Create(TransactionManager* blobfs, size_t blocks, const char* label,
- fbl::unique_ptr<Buffer>* out) {
-
- fzl::OwnedVmoMapper mapper;
- zx_status_t status = mapper.CreateAndMap(blocks * kBlobfsBlockSize, "blob-writeback");
- if (status != ZX_OK) {
- FS_TRACE_ERROR("Buffer: Failed to create vmo\n");
- return status;
- }
-
- fbl::unique_ptr<Buffer> buffer(new Buffer(blobfs, std::move(mapper)));
- if ((status = buffer->transaction_manager_->AttachVmo(buffer->mapper_.vmo(), &buffer->vmoid_))
- != ZX_OK) {
- FS_TRACE_ERROR("Buffer: Failed to attach vmo\n");
- return status;
- }
-
- *out = std::move(buffer);
- return ZX_OK;
-}
-
-bool Buffer::IsSpaceAvailable(size_t blocks) const {
- // TODO(planders): Similar to minfs, make sure that we either have a fallback mechanism for
- // operations which are too large to be fully contained by the buffer, or that the
- // worst-case operation will always fit within the buffer.
- ZX_ASSERT_MSG(blocks <= capacity_, "Requested txn (%zu blocks) larger than buffer", blocks);
- return length_ + blocks <= capacity_;
-}
-
-void Buffer::CopyTransaction(WriteTxn* txn) {
- ZX_DEBUG_ASSERT(!txn->IsBuffered());
- auto& reqs = txn->Requests();
-
- for (size_t i = 0; i < reqs.size(); i++) {
- ZX_DEBUG_ASSERT(reqs[i].vmo != ZX_HANDLE_INVALID);
-
- // Read parameters of the current request.
- size_t vmo_offset = reqs[i].vmo_offset;
- size_t dev_offset = reqs[i].dev_offset;
- const size_t vmo_len = reqs[i].length;
- ZX_DEBUG_ASSERT(vmo_len > 0);
-
- // Calculate the offset/length we will need to write into the buffer.
- size_t buf_offset = (start_ + length_) % capacity_;
- size_t buf_len = (buf_offset + vmo_len > capacity_) ? capacity_ - buf_offset : vmo_len;
- size_t init_len = vmo_len;
- size_t total_len = buf_len;
-
- // Verify that the length is valid.
- ZX_DEBUG_ASSERT(buf_len > 0);
- ZX_DEBUG_ASSERT(buf_len <= vmo_len);
- ZX_DEBUG_ASSERT(buf_len < capacity_);
- zx_handle_t vmo = reqs[i].vmo;
- ZX_DEBUG_ASSERT(vmo != mapper_.vmo().get());
-
- // Write data from the vmo into the buffer.
- void* ptr = MutableData(buf_offset);
-
- zx_status_t status;
- ZX_DEBUG_ASSERT((start_ <= buf_offset) ?
- (start_ < buf_offset + buf_len) :
- (buf_offset + buf_len <= start_)); // Wraparound
- ZX_ASSERT_MSG((status = zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
- buf_len * kBlobfsBlockSize)) == ZX_OK, "VMO Read Fail: %d", status);
-
- // Update the buffer len to include newly written data.
- length_ += buf_len;
-
- // Update the write_request to transfer from the writeback buffer out to disk,
- // rather than the supplied VMO.
- // Set the vmo handle to invalid, since we will be using the same vmoid for all requests.
- reqs[i].vmo = ZX_HANDLE_INVALID;
- reqs[i].vmo_offset = buf_offset;
- reqs[i].length = buf_len;
-
- if (buf_len != vmo_len) {
- // We wrapped around; write what remains from this request.
- vmo_offset += buf_len;
- dev_offset += buf_len;
- buf_len = vmo_len - buf_len;
- ZX_DEBUG_ASSERT(buf_len > 0);
-
- ptr = MutableData(0);
- ZX_DEBUG_ASSERT((start_ == 0) ? (start_ < buf_len) : (buf_len <= start_)); // Wraparound
- ZX_ASSERT(zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
- buf_len * kBlobfsBlockSize) == ZX_OK);
-
- length_ += buf_len;
- total_len += buf_len;
-
- // Shift down all following write requests.
- static_assert(std::is_pod<WriteRequest>::value, "Can't memmove non-POD");
-
- // Shift down all following write requests
- static_assert(std::is_pod<WriteRequest>::value, "Can't memmove non-POD");
- // Insert the "new" request, which is the latter half of the last request
- WriteRequest request;
- request.vmo = vmo;
- request.vmo_offset = 0;
- request.dev_offset = dev_offset;
- request.length = buf_len;
- i++;
- reqs.insert(i, request);
- }
-
- // Verify that the length of all vmo writes we did match the total length we were meant to
- // write from the initial vmo.
- ZX_DEBUG_ASSERT(init_len == total_len);
- }
-
- txn->SetBuffer(vmoid_);
-}
-
-void Buffer::AddTransaction(size_t start, size_t disk_start, size_t length, WritebackWork* work) {
- // Ensure the request fits within the buffer.
- ZX_DEBUG_ASSERT(length > 0);
- ZX_DEBUG_ASSERT(start + length <= capacity_);
- ZX_DEBUG_ASSERT(work != nullptr);
- work->Enqueue(mapper_.vmo(), start, disk_start, length);
-}
-
-bool Buffer::VerifyTransaction(WriteTxn* txn) const {
- if (txn->CheckBuffer(vmoid_)) {
- if (txn->BlkCount() > 0) {
- // If the work belongs to the WritebackQueue, verify that it matches up with the
- // buffer's start/len.
- ZX_ASSERT(txn->BlkStart() == start_);
- ZX_ASSERT(txn->BlkCount() <= length_);
- }
-
- return true;
- }
-
- return false;
-}
-
-void Buffer::ValidateTransaction(WriteTxn* txn) {
- if (txn->IsBuffered()) {
- // If transaction is already buffered, make sure it belongs to this buffer.
- ZX_DEBUG_ASSERT(txn->CheckBuffer(vmoid_));
- } else {
- fbl::Vector<WriteRequest>& reqs = txn->Requests();
-
- for (size_t i = 0; i < reqs.size(); i++) {
- // Verify that each request references this buffer VMO,
- // and that the transaction fits within the buffer.
- ZX_DEBUG_ASSERT(reqs[i].vmo == mapper_.vmo().get());
- reqs[i].vmo = ZX_HANDLE_INVALID;
- }
-
- // Once each request has been verified, set the buffer.
- txn->SetBuffer(vmoid_);
- }
-}
-
-void Buffer::FreeSpace(size_t blocks) {
- ZX_DEBUG_ASSERT(blocks <= length_);
- start_ = (start_ + blocks) % capacity_;
- length_ -= blocks;
-}
-
-WritebackQueue::~WritebackQueue() {
- // Ensure that thread teardown has completed, or that it was never brought up to begin with.
- ZX_DEBUG_ASSERT(!IsRunning());
- ZX_DEBUG_ASSERT(work_queue_.is_empty());
-}
-
-zx_status_t WritebackQueue::Teardown() {
- WritebackState state;
-
- {
- fbl::AutoLock lock(&lock_);
- state = state_;
-
- // Signal the background thread.
- unmounting_ = true;
- cnd_signal(&work_added_);
- }
-
- zx_status_t status = ZX_OK;
- if (state != WritebackState::kInit) {
- // Block until the thread completes itself.
- int result = -1;
- int success = thrd_join(worker_, &result);
- if (result != 0 || success != thrd_success) {
- status = ZX_ERR_INTERNAL;
- }
- }
-
- return status;
-}
-
-zx_status_t WritebackQueue::Create(TransactionManager* transaction_manager,
- const size_t buffer_blocks,
- fbl::unique_ptr<WritebackQueue>* out) {
- zx_status_t status;
- fbl::unique_ptr<Buffer> buffer;
- if ((status = Buffer::Create(transaction_manager, buffer_blocks, "blobfs-writeback", &buffer))
- != ZX_OK) {
- return status;
- }
-
- fbl::unique_ptr<WritebackQueue> wb(new WritebackQueue(std::move(buffer)));
-
- if (cnd_init(&wb->work_completed_) != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
- if (cnd_init(&wb->work_added_) != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
-
- if (thrd_create_with_name(&wb->worker_,
- WritebackQueue::WritebackThread, wb.get(),
- "blobfs-writeback") != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
-
- fbl::AutoLock lock(&wb->lock_);
- wb->state_ = WritebackState::kRunning;
- *out = std::move(wb);
- return ZX_OK;
-}
-
-zx_status_t WritebackQueue::Enqueue(fbl::unique_ptr<WritebackWork> work) {
- TRACE_DURATION("blobfs", "WritebackQueue::Enqueue", "work ptr", work.get());
- fbl::AutoLock lock(&lock_);
- zx_status_t status = ZX_OK;
-
- if (IsReadOnly()) {
- // If we are in a readonly state, return an error. However, the work should still be
- // enqueued and ultimately processed by the WritebackThread. This will help us avoid
- // potential race conditions if the work callback must acquire a lock.
- status = ZX_ERR_BAD_STATE;
- } else if (!work->IsBuffered()) {
- ZX_DEBUG_ASSERT(state_ == WritebackState::kRunning);
-
- // Only copy blocks to the buffer if they have not already been copied to another buffer.
- EnsureSpaceLocked(work->BlkCount());
-
- // It is possible that the queue entered a read only state
- // while we were waiting to ensure space, so check again now.
- if (IsReadOnly()) {
- status = ZX_ERR_BAD_STATE;
- } else {
- buffer_->CopyTransaction(work.get());
- }
- }
-
- work_queue_.push(std::move(work));
- cnd_signal(&work_added_);
- return status;
-}
-
-bool WritebackQueue::IsRunning() const {
- switch (state_) {
- case WritebackState::kRunning:
- case WritebackState::kReadOnly:
- return true;
- default:
- return false;
- }
-}
-
-void WritebackQueue::EnsureSpaceLocked(size_t blocks) {
- while (!buffer_->IsSpaceAvailable(blocks)) {
- // Not enough room to write back work, yet. Wait until room is available.
- Waiter w;
- producer_queue_.push(&w);
-
- do {
- cnd_wait(&work_completed_, lock_.GetInternal());
- } while ((&producer_queue_.front() != &w) && // We are first in line to enqueue...
- (!buffer_->IsSpaceAvailable(blocks))); // ... and there is enough space for us.
-
- producer_queue_.pop();
- }
-}
-
-int WritebackQueue::WritebackThread(void* arg) {
- WritebackQueue* b = reinterpret_cast<WritebackQueue*>(arg);
-
- b->lock_.Acquire();
- while (true) {
- bool error = b->IsReadOnly();
- while (!b->work_queue_.is_empty()) {
- if (!error && !b->work_queue_.front().IsReady()) {
- // If the work is not yet ready, break and wait until we receive another signal.
- break;
- }
-
- auto work = b->work_queue_.pop();
- TRACE_DURATION("blobfs", "WritebackQueue::WritebackThread", "work ptr", work.get());
-
- bool our_buffer = b->buffer_->VerifyTransaction(work.get());
- size_t blk_count = work->BlkCount();
-
- // Stay unlocked while processing a unit of work.
- b->lock_.Release();
-
- if (error) {
- // If we are in a read only state, mark the work complete with an error status.
- work->MarkCompleted(ZX_ERR_BAD_STATE);
- } else {
- // If we should complete the work, make sure it has been buffered.
- // (This is not necessary if we are currently in an error state).
- ZX_DEBUG_ASSERT(work->IsBuffered());
- zx_status_t status;
- if ((status = work->Complete()) != ZX_OK) {
- FS_TRACE_ERROR("Work failed with status %d - "
- "converting writeback to read only state.\n", status);
- // If work completion failed, set the buffer to an error state.
- error = true;
- }
- }
-
- work = nullptr;
- b->lock_.Acquire();
-
- if (error) {
- // If we encountered an error, set the queue to readonly.
- b->state_ = WritebackState::kReadOnly;
- }
-
- if (our_buffer) {
- // If the last work we processed belonged to our buffer,
- // update the buffer's start/len accordingly.
- b->buffer_->FreeSpace(blk_count);
- }
-
- // We may have opened up space (or entered a read only state),
- // so signal the producer queue.
- cnd_signal(&b->work_completed_);
- }
-
- // Before waiting, we should check if we're unmounting.
- // If work still remains in the work or producer queues,
- // continue the loop until they are empty.
- if (b->unmounting_ && b->work_queue_.is_empty() && b->producer_queue_.is_empty()) {
- ZX_DEBUG_ASSERT(b->work_queue_.is_empty());
- ZX_DEBUG_ASSERT(b->producer_queue_.is_empty());
- b->state_ = WritebackState::kComplete;
- b->lock_.Release();
- return 0;
- }
-
- cnd_wait(&b->work_added_, b->lock_.GetInternal());
- }
-}
-
-zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work,
- TransactionManager* transaction_manager, Blob* vn,
- const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks) {
- const size_t kMaxChunkBlocks = (3 * transaction_manager->WritebackCapacity()) / 4;
- uint64_t delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
- while (nblocks > 0) {
- if ((*work)->BlkCount() + delta_blocks > kMaxChunkBlocks) {
- // If enqueueing these blocks could push us past the writeback buffer capacity
- // when combined with all previous writes, break this transaction into a smaller
- // chunk first.
- fbl::unique_ptr<WritebackWork> tmp;
- zx_status_t status = transaction_manager->CreateWork(&tmp, vn);
- if (status != ZX_OK) {
- return status;
- }
- if ((status = transaction_manager->EnqueueWork(std::move(*work),
- EnqueueType::kData)) != ZX_OK) {
- return status;
- }
- *work = std::move(tmp);
- }
-
- (*work)->Enqueue(vmo, relative_block, absolute_block, delta_blocks);
- relative_block += delta_blocks;
- absolute_block += delta_blocks;
- nblocks -= delta_blocks;
- delta_blocks = fbl::min(nblocks, kMaxChunkBlocks);
- }
- return ZX_OK;
-}
-
} // namespace blobfs
diff --git a/zircon/system/ulib/blobfs/writeback.cpp b/zircon/system/ulib/blobfs/writeback.cpp
index 7b21695..8ef56c7 100644
--- a/zircon/system/ulib/blobfs/writeback.cpp
+++ b/zircon/system/ulib/blobfs/writeback.cpp
@@ -8,510 +8,6 @@
namespace blobfs {
-WriteTxn::~WriteTxn() {
- ZX_DEBUG_ASSERT_MSG(requests_.is_empty(), "WriteTxn still has pending requests");
-}
-
-void WriteTxn::Enqueue(const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,
- uint64_t nblocks) {
- ZX_DEBUG_ASSERT(vmo.is_valid());
- ZX_DEBUG_ASSERT(!IsBuffered());
-
- for (auto& request : requests_) {
- if (request.vmo != vmo.get()) {
- continue;
- }
-
- if (request.vmo_offset == relative_block) {
- // Take the longer of the operations (if operating on the same blocks).
- if (nblocks > request.length) {
- block_count_ += (nblocks - request.length);
- request.length = nblocks;
- }
- return;
- } else if ((request.vmo_offset + request.length == relative_block) &&
- (request.dev_offset + request.length == absolute_block)) {
- // Combine with the previous request, if immediately following.
- request.length += nblocks;
- block_count_ += nblocks;
- return;
- }
- }
-
- WriteRequest request;
- request.vmo = vmo.get();
- request.vmo_offset = relative_block;
- request.dev_offset = absolute_block;
- request.length = nblocks;
- requests_.push_back(std::move(request));
- block_count_ += request.length;
-}
-
-size_t WriteTxn::BlkStart() const {
- ZX_DEBUG_ASSERT(IsBuffered());
- ZX_DEBUG_ASSERT(requests_.size() > 0);
- return requests_[0].vmo_offset;
-}
-
-size_t WriteTxn::BlkCount() const {
- return block_count_;
-}
-
-void WriteTxn::SetBuffer(vmoid_t vmoid) {
- ZX_DEBUG_ASSERT(vmoid_ == VMOID_INVALID || vmoid_ == vmoid);
- ZX_DEBUG_ASSERT(vmoid != VMOID_INVALID);
- vmoid_ = vmoid;
-}
-
-zx_status_t WriteTxn::Flush() {
- ZX_ASSERT(IsBuffered());
- fs::Ticker ticker(transaction_manager_->LocalMetrics().Collecting());
-
- // Update all the outgoing transactions to be in disk blocks
- block_fifo_request_t blk_reqs[requests_.size()];
- const uint32_t kDiskBlocksPerBlobfsBlock =
- kBlobfsBlockSize / transaction_manager_->DeviceBlockSize();
- for (size_t i = 0; i < requests_.size(); i++) {
- blk_reqs[i].group = transaction_manager_->BlockGroupID();
- blk_reqs[i].vmoid = vmoid_;
- blk_reqs[i].opcode = BLOCKIO_WRITE;
- blk_reqs[i].vmo_offset = requests_[i].vmo_offset * kDiskBlocksPerBlobfsBlock;
- blk_reqs[i].dev_offset = requests_[i].dev_offset * kDiskBlocksPerBlobfsBlock;
- uint64_t length = requests_[i].length * kDiskBlocksPerBlobfsBlock;
- // TODO(ZX-2253): Requests this long, although unlikely, should be
- // handled more gracefully.
- ZX_ASSERT_MSG(length < UINT32_MAX, "Request size too large");
- blk_reqs[i].length = static_cast<uint32_t>(length);
- }
-
- // Actually send the operations to the underlying block device.
- zx_status_t status = transaction_manager_->Transaction(blk_reqs, requests_.size());
-
- if (transaction_manager_->LocalMetrics().Collecting()) {
- uint64_t sum = 0;
- for (const auto& blk_req : blk_reqs) {
- sum += blk_req.length * kBlobfsBlockSize;
- }
- transaction_manager_->LocalMetrics().UpdateWriteback(sum, ticker.End());
- }
-
- requests_.reset();
- vmoid_ = VMOID_INVALID;
- block_count_ = 0;
- return status;
-}
-
-void WritebackWork::MarkCompleted(zx_status_t status) {
- WriteTxn::Reset();
- if (sync_cb_) {
- sync_cb_(status);
- }
- sync_cb_ = nullptr;
- ready_cb_ = nullptr;
-}
-
-bool WritebackWork::IsReady() {
- if (ready_cb_) {
- if (ready_cb_()) {
- ready_cb_ = nullptr;
- return true;
- }
-
- return false;
- }
-
- return true;
-}
-
-void WritebackWork::SetReadyCallback(ReadyCallback callback) {
- ZX_DEBUG_ASSERT(!ready_cb_);
- ready_cb_ = std::move(callback);
-}
-
-void WritebackWork::SetSyncCallback(SyncCallback callback) {
- if (sync_cb_) {
- // This "callback chain" allows multiple clients to observe the completion
- // of the WritebackWork. This is akin to a promise "and-then" relationship.
- sync_cb_ = [previous_callback = std::move(sync_cb_),
- next_callback = std::move(callback)] (zx_status_t status) {
- next_callback(status);
- previous_callback(status);
- };
- } else {
- sync_cb_ = std::move(callback);
- }
-}
-
-// Returns the number of blocks of the writeback buffer that have been consumed
-zx_status_t WritebackWork::Complete() {
- zx_status_t status = Flush();
- MarkCompleted(status);
- return status;
-}
-
-WritebackWork::WritebackWork(TransactionManager* transaction_manager)
- : WriteTxn(transaction_manager), ready_cb_(nullptr), sync_cb_(nullptr) {}
-
-Buffer::~Buffer() {
- if (vmoid_ != VMOID_INVALID) {
- // Close the buffer vmo.
- block_fifo_request_t request;
- request.group = transaction_manager_->BlockGroupID();
- request.vmoid = vmoid_;
- request.opcode = BLOCKIO_CLOSE_VMO;
- transaction_manager_->Transaction(&request, 1);
- }
-}
-
-zx_status_t Buffer::Create(TransactionManager* blobfs, size_t blocks, const char* label,
- fbl::unique_ptr<Buffer>* out) {
-
- fzl::OwnedVmoMapper mapper;
- zx_status_t status = mapper.CreateAndMap(blocks * kBlobfsBlockSize, "blob-writeback");
- if (status != ZX_OK) {
- FS_TRACE_ERROR("Buffer: Failed to create vmo\n");
- return status;
- }
-
- fbl::unique_ptr<Buffer> buffer(new Buffer(blobfs, std::move(mapper)));
- if ((status = buffer->transaction_manager_->AttachVmo(buffer->mapper_.vmo(), &buffer->vmoid_))
- != ZX_OK) {
- FS_TRACE_ERROR("Buffer: Failed to attach vmo\n");
- return status;
- }
-
- *out = std::move(buffer);
- return ZX_OK;
-}
-
-bool Buffer::IsSpaceAvailable(size_t blocks) const {
- // TODO(planders): Similar to minfs, make sure that we either have a fallback mechanism for
- // operations which are too large to be fully contained by the buffer, or that the
- // worst-case operation will always fit within the buffer.
- ZX_ASSERT_MSG(blocks <= capacity_, "Requested txn (%zu blocks) larger than buffer", blocks);
- return length_ + blocks <= capacity_;
-}
-
-void Buffer::CopyTransaction(WriteTxn* txn) {
- ZX_DEBUG_ASSERT(!txn->IsBuffered());
- auto& reqs = txn->Requests();
-
- for (size_t i = 0; i < reqs.size(); i++) {
- ZX_DEBUG_ASSERT(reqs[i].vmo != ZX_HANDLE_INVALID);
-
- // Read parameters of the current request.
- size_t vmo_offset = reqs[i].vmo_offset;
- size_t dev_offset = reqs[i].dev_offset;
- const size_t vmo_len = reqs[i].length;
- ZX_DEBUG_ASSERT(vmo_len > 0);
-
- // Calculate the offset/length we will need to write into the buffer.
- size_t buf_offset = (start_ + length_) % capacity_;
- size_t buf_len = (buf_offset + vmo_len > capacity_) ? capacity_ - buf_offset : vmo_len;
- size_t init_len = vmo_len;
- size_t total_len = buf_len;
-
- // Verify that the length is valid.
- ZX_DEBUG_ASSERT(buf_len > 0);
- ZX_DEBUG_ASSERT(buf_len <= vmo_len);
- ZX_DEBUG_ASSERT(buf_len < capacity_);
- zx_handle_t vmo = reqs[i].vmo;
- ZX_DEBUG_ASSERT(vmo != mapper_.vmo().get());
-
- // Write data from the vmo into the buffer.
- void* ptr = MutableData(buf_offset);
-
- zx_status_t status;
- ZX_DEBUG_ASSERT((start_ <= buf_offset) ?
- (start_ < buf_offset + buf_len) :
- (buf_offset + buf_len <= start_)); // Wraparound
- ZX_ASSERT_MSG((status = zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
- buf_len * kBlobfsBlockSize)) == ZX_OK, "VMO Read Fail: %d", status);
-
- // Update the buffer len to include newly written data.
- length_ += buf_len;
-
- // Update the write_request to transfer from the writeback buffer out to disk,
- // rather than the supplied VMO.
- // Set the vmo handle to invalid, since we will be using the same vmoid for all requests.
- reqs[i].vmo = ZX_HANDLE_INVALID;
- reqs[i].vmo_offset = buf_offset;
- reqs[i].length = buf_len;
-
- if (buf_len != vmo_len) {
- // We wrapped around; write what remains from this request.
- vmo_offset += buf_len;
- dev_offset += buf_len;
- buf_len = vmo_len - buf_len;
- ZX_DEBUG_ASSERT(buf_len > 0);
-
- ptr = MutableData(0);
- ZX_DEBUG_ASSERT((start_ == 0) ? (start_ < buf_len) : (buf_len <= start_)); // Wraparound
- ZX_ASSERT(zx_vmo_read(vmo, ptr, vmo_offset * kBlobfsBlockSize,
- buf_len * kBlobfsBlockSize) == ZX_OK);
-
- length_ += buf_len;
- total_len += buf_len;
-
- // Shift down all following write requests.
- static_assert(std::is_pod<WriteRequest>::value, "Can't memmove non-POD");
-
- // Shift down all following write requests
- static_assert(std::is_pod<WriteRequest>::value, "Can't memmove non-POD");
- // Insert the "new" request, which is the latter half of the last request
- WriteRequest request;
- request.vmo = vmo;
- request.vmo_offset = 0;
- request.dev_offset = dev_offset;
- request.length = buf_len;
- i++;
- reqs.insert(i, request);
- }
-
- // Verify that the length of all vmo writes we did match the total length we were meant to
- // write from the initial vmo.
- ZX_DEBUG_ASSERT(init_len == total_len);
- }
-
- txn->SetBuffer(vmoid_);
-}
-
-void Buffer::AddTransaction(size_t start, size_t disk_start, size_t length, WritebackWork* work) {
- // Ensure the request fits within the buffer.
- ZX_DEBUG_ASSERT(length > 0);
- ZX_DEBUG_ASSERT(start + length <= capacity_);
- ZX_DEBUG_ASSERT(work != nullptr);
- work->Enqueue(mapper_.vmo(), start, disk_start, length);
-}
-
-bool Buffer::VerifyTransaction(WriteTxn* txn) const {
- if (txn->CheckBuffer(vmoid_)) {
- if (txn->BlkCount() > 0) {
- // If the work belongs to the WritebackQueue, verify that it matches up with the
- // buffer's start/len.
- ZX_ASSERT(txn->BlkStart() == start_);
- ZX_ASSERT(txn->BlkCount() <= length_);
- }
-
- return true;
- }
-
- return false;
-}
-
-void Buffer::ValidateTransaction(WriteTxn* txn) {
- if (txn->IsBuffered()) {
- // If transaction is already buffered, make sure it belongs to this buffer.
- ZX_DEBUG_ASSERT(txn->CheckBuffer(vmoid_));
- } else {
- fbl::Vector<WriteRequest>& reqs = txn->Requests();
-
- for (size_t i = 0; i < reqs.size(); i++) {
- // Verify that each request references this buffer VMO,
- // and that the transaction fits within the buffer.
- ZX_DEBUG_ASSERT(reqs[i].vmo == mapper_.vmo().get());
- reqs[i].vmo = ZX_HANDLE_INVALID;
- }
-
- // Once each request has been verified, set the buffer.
- txn->SetBuffer(vmoid_);
- }
-}
-
-void Buffer::FreeSpace(size_t blocks) {
- ZX_DEBUG_ASSERT(blocks <= length_);
- start_ = (start_ + blocks) % capacity_;
- length_ -= blocks;
-}
-
-WritebackQueue::~WritebackQueue() {
- // Ensure that thread teardown has completed, or that it was never brought up to begin with.
- ZX_DEBUG_ASSERT(!IsRunning());
- ZX_DEBUG_ASSERT(work_queue_.is_empty());
-}
-
-zx_status_t WritebackQueue::Teardown() {
- WritebackState state;
-
- {
- fbl::AutoLock lock(&lock_);
- state = state_;
-
- // Signal the background thread.
- unmounting_ = true;
- cnd_signal(&work_added_);
- }
-
- zx_status_t status = ZX_OK;
- if (state != WritebackState::kInit) {
- // Block until the thread completes itself.
- int result = -1;
- int success = thrd_join(worker_, &result);
- if (result != 0 || success != thrd_success) {
- status = ZX_ERR_INTERNAL;
- }
- }
-
- return status;
-}
-
-zx_status_t WritebackQueue::Create(TransactionManager* transaction_manager,
- const size_t buffer_blocks,
- fbl::unique_ptr<WritebackQueue>* out) {
- zx_status_t status;
- fbl::unique_ptr<Buffer> buffer;
- if ((status = Buffer::Create(transaction_manager, buffer_blocks, "blobfs-writeback", &buffer))
- != ZX_OK) {
- return status;
- }
-
- fbl::unique_ptr<WritebackQueue> wb(new WritebackQueue(std::move(buffer)));
-
- if (cnd_init(&wb->work_completed_) != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
- if (cnd_init(&wb->work_added_) != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
-
- if (thrd_create_with_name(&wb->worker_,
- WritebackQueue::WritebackThread, wb.get(),
- "blobfs-writeback") != thrd_success) {
- return ZX_ERR_NO_RESOURCES;
- }
-
- fbl::AutoLock lock(&wb->lock_);
- wb->state_ = WritebackState::kRunning;
- *out = std::move(wb);
- return ZX_OK;
-}
-
-zx_status_t WritebackQueue::Enqueue(fbl::unique_ptr<WritebackWork> work) {
- TRACE_DURATION("blobfs", "WritebackQueue::Enqueue", "work ptr", work.get());
- fbl::AutoLock lock(&lock_);
- zx_status_t status = ZX_OK;
-
- if (IsReadOnly()) {
- // If we are in a readonly state, return an error. However, the work should still be
- // enqueued and ultimately processed by the WritebackThread. This will help us avoid
- // potential race conditions if the work callback must acquire a lock.
- status = ZX_ERR_BAD_STATE;
- } else if (!work->IsBuffered()) {
- ZX_DEBUG_ASSERT(state_ == WritebackState::kRunning);
-
- // Only copy blocks to the buffer if they have not already been copied to another buffer.
- EnsureSpaceLocked(work->BlkCount());
-
- // It is possible that the queue entered a read only state
- // while we were waiting to ensure space, so check again now.
- if (IsReadOnly()) {
- status = ZX_ERR_BAD_STATE;
- } else {
- buffer_->CopyTransaction(work.get());
- }
- }
-
- work_queue_.push(std::move(work));
- cnd_signal(&work_added_);
- return status;
-}
-
-bool WritebackQueue::IsRunning() const {
- switch (state_) {
- case WritebackState::kRunning:
- case WritebackState::kReadOnly:
- return true;
- default:
- return false;
- }
-}
-
-void WritebackQueue::EnsureSpaceLocked(size_t blocks) {
- while (!buffer_->IsSpaceAvailable(blocks)) {
- // Not enough room to write back work, yet. Wait until room is available.
- Waiter w;
- producer_queue_.push(&w);
-
- do {
- cnd_wait(&work_completed_, lock_.GetInternal());
- } while ((&producer_queue_.front() != &w) && // We are first in line to enqueue...
- (!buffer_->IsSpaceAvailable(blocks))); // ... and there is enough space for us.
-
- producer_queue_.pop();
- }
-}
-
-int WritebackQueue::WritebackThread(void* arg) {
- WritebackQueue* b = reinterpret_cast<WritebackQueue*>(arg);
-
- b->lock_.Acquire();
- while (true) {
- bool error = b->IsReadOnly();
- while (!b->work_queue_.is_empty()) {
- if (!error && !b->work_queue_.front().IsReady()) {
- // If the work is not yet ready, break and wait until we receive another signal.
- break;
- }
-
- auto work = b->work_queue_.pop();
- TRACE_DURATION("blobfs", "WritebackQueue::WritebackThread", "work ptr", work.get());
-
- bool our_buffer = b->buffer_->VerifyTransaction(work.get());
- size_t blk_count = work->BlkCount();
-
- // Stay unlocked while processing a unit of work.
- b->lock_.Release();
-
- if (error) {
- // If we are in a read only state, mark the work complete with an error status.
- work->MarkCompleted(ZX_ERR_BAD_STATE);
- } else {
- // If we should complete the work, make sure it has been buffered.
- // (This is not necessary if we are currently in an error state).
- ZX_DEBUG_ASSERT(work->IsBuffered());
- zx_status_t status;
- if ((status = work->Complete()) != ZX_OK) {
- FS_TRACE_ERROR("Work failed with status %d - "
- "converting writeback to read only state.\n", status);
- // If work completion failed, set the buffer to an error state.
- error = true;
- }
- }
-
- work = nullptr;
- b->lock_.Acquire();
-
- if (error) {
- // If we encountered an error, set the queue to readonly.
- b->state_ = WritebackState::kReadOnly;
- }
-
- if (our_buffer) {
- // If the last work we processed belonged to our buffer,
- // update the buffer's start/len accordingly.
- b->buffer_->FreeSpace(blk_count);
- }
-
- // We may have opened up space (or entered a read only state),
- // so signal the producer queue.
- cnd_signal(&b->work_completed_);
- }
-
- // Before waiting, we should check if we're unmounting.
- // If work still remains in the work or producer queues,
- // continue the loop until they are empty.
- if (b->unmounting_ && b->work_queue_.is_empty() && b->producer_queue_.is_empty()) {
- ZX_DEBUG_ASSERT(b->work_queue_.is_empty());
- ZX_DEBUG_ASSERT(b->producer_queue_.is_empty());
- b->state_ = WritebackState::kComplete;
- b->lock_.Release();
- return 0;
- }
-
- cnd_wait(&b->work_added_, b->lock_.GetInternal());
- }
-}
-
zx_status_t EnqueuePaginated(fbl::unique_ptr<WritebackWork>* work,
TransactionManager* transaction_manager, Blob* vn,
const zx::vmo& vmo, uint64_t relative_block, uint64_t absolute_block,