// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <blobfs/journal.h>
#include <fbl/auto_lock.h>
#include <fbl/unique_ptr.h>
#include <lib/cksum.h>
#include <lib/sync/completion.h>
#include <zircon/types.h>

#include <utility>

namespace blobfs {

// TODO(ZX-2415): Add tracing/metrics collection to journal related operations.

// Thread which asynchronously processes journal entries.
static int JournalThread(void* arg) {
    Journal* journal = reinterpret_cast<Journal*>(arg);
    journal->ProcessLoop();
    return 0;
}

zx_status_t Journal::Create(TransactionManager* transaction_manager, uint64_t journal_blocks,
                            uint64_t start_block, fbl::unique_ptr<Journal>* out) {
    // Create the buffer with 1 less than total journal blocks.
    // (1 block must be reserved for journal info).
    zx_status_t status;
    fbl::unique_ptr<Buffer> buffer;
    if ((status = Buffer::Create(transaction_manager, journal_blocks - 1, "blobfs-journal",
                                 &buffer)) != ZX_OK) {
        return status;
    }

    // Create another buffer for the journal info block.
    fbl::unique_ptr<Buffer> info;
    if ((status = Buffer::Create(transaction_manager, 1, "blobfs-journal-info", &info)) != ZX_OK) {
        return status;
    }

    // Reserve the only block in the info buffer so its impossible to copy transactions to it.
    info->ReserveIndex();

    // Create the Journal with the newly created vmos.
    fbl::unique_ptr<Journal> journal(new Journal(transaction_manager, std::move(info),
                                                 std::move(buffer), start_block));

    // Load contents of journal from disk.
    if ((status = journal->Load()) != ZX_OK) {
        FS_TRACE_ERROR("Journal: Failed to load from disk: %d\n", status);
        return status;
    }

    *out = std::move(journal);
    return ZX_OK;
}

Journal::~Journal() {
    // Ensure that thread teardown has completed, or that it was never brought up to begin with.
    ZX_DEBUG_ASSERT(!IsRunning());

    // Ensure that work and producer queues are currently empty.
    ZX_DEBUG_ASSERT(work_queue_.is_empty());
    ZX_DEBUG_ASSERT(producer_queue_.is_empty());
}

zx_status_t Journal::Teardown() {
    WritebackState state;

    {
        fbl::AutoLock lock(&lock_);
        state = state_;

        // Signal the background thread.
        unmounting_ = true;
        cnd_signal(&consumer_cvar_);
    }

    zx_status_t status = ZX_OK;
    if (state != WritebackState::kInit && state != WritebackState::kReady) {
        // Block until the thread completes itself.
        int result = -1;
        int success = thrd_join(thread_, &result);
        if (result != 0 || success != thrd_success) {
            status = ZX_ERR_INTERNAL;
        }
    }

    return status;
}

zx_status_t Journal::Load() {
    fbl::AutoLock lock(&lock_);
    ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);

    // Load info block and journal entries into their respective buffers.
    fs::ReadTxn txn(transaction_manager_);
    info_->Load(&txn, start_block_);
    entries_->Load(&txn, start_block_ + 1);
    zx_status_t status = txn.Transact();

    if (status != ZX_OK) {
        return status;
    }

    JournalInfo* info = GetInfo();

    // Verify the journal magic matches.
    if (info->magic != kJournalMagic) {
        FS_TRACE_ERROR("Journal info bad magic\n");
        return ZX_ERR_BAD_STATE;
    }

    if (info->start_block > 0 || info->num_blocks > 0 || info->timestamp > 0) {
        // Who checks the checksum? (It's us. We are doing it.)
        uint32_t old_checksum = info->checksum;
        info->checksum = 0;

        uint8_t* info_ptr = reinterpret_cast<uint8_t*>(info);
        info->checksum = crc32(0, info_ptr, sizeof(JournalInfo));

        if (old_checksum != info->checksum) {
            FS_TRACE_ERROR("Journal info checksum corrupt\n");
            return ZX_ERR_BAD_STATE;
        }
    }

    return status;
}

zx_status_t Journal::Replay() {
    fbl::AutoLock lock(&lock_);
    ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);

    uint64_t timestamp = 0;
    size_t start = GetInfo()->start_block;
    size_t length = GetInfo()->num_blocks;
    size_t total_entries = 0;
    size_t total_blocks = 0;

    // Replay entries until we find one that isn't valid.
    while (true) {
        uint64_t entry_blocks;
        // |start| is the header index of the next entry.
        zx_status_t status = ReplayEntry(start, length, &entry_blocks, &timestamp);
        if (status == ZX_ERR_OUT_OF_RANGE) {
            break;
        } else if (status != ZX_OK) {
            return status;
        }

        total_entries++;
        total_blocks += entry_blocks;

        start += entry_blocks;
        start %= entries_->capacity();

        if (length) {
            length -= entry_blocks;
        }
    }

    // TODO(planders): Sync to ensure that all entries have been written out before resetting the
    //                 on-disk state of the journal.
    if (total_entries > 0) {
        printf("Found and replayed %zu total blobfs journal entries starting from index %zu, "
               "including %zu total blocks.\n",
               total_entries, GetInfo()->start_block, total_blocks);
    } else if (start == 0 && length == 0) {
        // If no entries were found and journal is already in its default state,
        // return without writing out any changes.
        state_ = WritebackState::kReady;
        return ZX_OK;
    }

    // We expect length to be 0 at this point, assuming the journal was not corrupted and replay
    // completed successfully. However, in the case of corruption of the journal this may not be the
    // case. Since we cannot currently recover from this situation we should proceed as normal.
    zx_status_t status = CommitReplay();
    if (status != ZX_OK) {
        return status;
    }

    // Now that we've resolved any remaining entries, we are ready to start journal writeback.
    state_ = WritebackState::kReady;
    return ZX_OK;
}

zx_status_t Journal::InitWriteback() {
    fbl::AutoLock lock(&lock_);
    ZX_DEBUG_ASSERT(state_ == WritebackState::kReady);

    if (entries_->start() > 0 || entries_->length() > 0) {
        FS_TRACE_ERROR("Cannot initialize journal writeback - entries may still exist.\n");
        return ZX_ERR_BAD_STATE;
    }

    if (thrd_create_with_name(&thread_, JournalThread, this, "blobfs-journal") !=
        thrd_success) {
        FS_TRACE_ERROR("Failed to create journal thread.\n");
        return ZX_ERR_NO_RESOURCES;
    }

    state_ = WritebackState::kRunning;
    return ZX_OK;
}

zx_status_t Journal::Enqueue(fbl::unique_ptr<WritebackWork> work) {
    // Verify that the work exists and has not already been prepared for writeback.
    ZX_DEBUG_ASSERT(work != nullptr);
    ZX_DEBUG_ASSERT(!work->IsBuffered());

    // Block count will be the number of blocks in the transaction + header + commit.
    size_t blocks = work->BlkCount();
    // By default set the header/commit indices to the buffer capacity,
    // since this will be an invalid index value.
    size_t header_index = entries_->capacity();
    size_t commit_index = entries_->capacity();

    fbl::AutoLock lock(&lock_);

    zx_status_t status = ZX_OK;
    if (IsReadOnly()) {
        // If we are in "read only" mode, set an error status.
        status = ZX_ERR_BAD_STATE;
    } else if (blocks) {
        ZX_DEBUG_ASSERT(state_ == WritebackState::kRunning);

        // If the work contains no blocks (i.e. it is a sync work), proceed to create an entry
        // without enqueueing any data to the buffer.

        // Add 2 blocks to the block count for the journal entry's header/commit blocks.
        blocks += 2;

        // Ensure we have enough space to write the current entry to the buffer.
        // If not, wait until space becomes available.
        EnsureSpaceLocked(blocks);

        if (IsReadOnly()) {
            // The Journal is in a bad state and is no longer accepting new entries.
            status = ZX_ERR_BAD_STATE;
        } else {
            // Assign header index of journal entry to the next available value before we attempt to
            // copy the meat of the entry to the buffer.
            header_index = entries_->ReserveIndex();

            // Copy the data from WritebackWork to the journal buffer. We can wait to write out the
            // header and commit blocks asynchronously, since this will involve calculating the
            // checksum.
            // TODO(planders): Release the lock while transaction is being copied.
            entries_->CopyTransaction(work.get());

            // Assign commit_index immediately after copying to the buffer.
            // Increase length_ accordingly.
            commit_index = entries_->ReserveIndex();

            // Make sure that commit index matches what we expect
            // based on header index, block count, and buffer size.
            ZX_DEBUG_ASSERT(commit_index == (header_index + blocks - 1) % entries_->capacity());
        }
    }

    // Create the journal entry and push it onto the work queue.
    fbl::unique_ptr<JournalEntry> entry = CreateEntry(header_index, commit_index, std::move(work));

    if (entry->GetStatus() == EntryStatus::kInit) {
        // If we have a non-sync work, there is some extra preparation we need to do.
        if (status == ZX_OK) {
            // Prepare a WritebackWork to write out the entry to disk. Note that this does not
            // fully prepare the buffer for writeback, so a ready callback is added to the work as
            // part of this step.
            PrepareWork(entry.get(), &work);
            ZX_DEBUG_ASSERT(work != nullptr);
            status = EnqueueEntryWork(std::move(work));
        } else {
            // If the status is not okay (i.e. we are in a readonly state), do no additional
            // processing but set the entry state to error.
            entry->SetStatus(EntryStatus::kError);
        }
    }

    // Queue the entry to be processed asynchronously.
    work_queue_.push(std::move(entry));

    // Signal the JournalThread that there is at least one entry ready to be processed.
    SendSignalLocked(status);
    return status;
}

void Journal::SendSignalLocked(zx_status_t status) {
    if (status == ZX_OK) {
        // Once writeback has entered a read only state, no further transactions should succeed.
        ZX_ASSERT(state_ != WritebackState::kReadOnly);
    } else {
        state_ = WritebackState::kReadOnly;
    }
    consumer_signalled_ = true;
    cnd_signal(&consumer_cvar_);
}

bool Journal::IsRunning() const {
    switch (state_) {
    case WritebackState::kRunning:
    case WritebackState::kReadOnly:
        return true;
    default:
        return false;
    }
}

fbl::unique_ptr<JournalEntry> Journal::CreateEntry(uint64_t header_index, uint64_t commit_index,
                                                   fbl::unique_ptr<WritebackWork> work) {
    EntryStatus status = EntryStatus::kInit;

    if (work->BlkCount() == 0) {
        // If the work has no transactions, this is a sync work - we can return early.
        // Right now we make the assumption that if a WritebackWork has any transactions, it cannot
        // have a corresponding sync callback. We may need to revisit this later.
        status = EntryStatus::kSync;
    } else if (IsReadOnly()) {
        // If the journal is in a read only state, set the entry status to error.
        status = EntryStatus::kError;
    }

    return std::make_unique<JournalEntry>(this, status, header_index, commit_index,
                                          std::move(work));
}

void Journal::PrepareWork(JournalEntry* entry, fbl::unique_ptr<WritebackWork>* out) {
    size_t header_index = entry->GetHeaderIndex();
    size_t commit_index = entry->GetCommitIndex();
    size_t block_count = entry->BlockCount();

    if (block_count == 0) {
        // If journal entry has size 0, it is an empty sync entry, and we don't need to write
        // anything to the journal.
        ZX_DEBUG_ASSERT(header_index == entries_->capacity());
        ZX_DEBUG_ASSERT(commit_index == entries_->capacity());
        return;
    }

    fbl::unique_ptr<WritebackWork> work = CreateWork();

    // Update work with transactions for the current entry.
    AddEntryTransaction(header_index, block_count, work.get());

    // Make sure the work is prepared for the writeback queue.
    work->SetReadyCallback(entry->CreateReadyCallback());
    work->SetSyncCallback(entry->CreateSyncCallback());
    *out = std::move(work);
}

void Journal::ProcessEntryResult(zx_status_t result, JournalEntry* entry) {
    fbl::AutoLock lock(&lock_);
    // Since it is possible for the entry to be deleted immediately after updating its status
    // (if the journal is being processed at the time), it is safer to update the entry status
    // under the journal lock.
    entry->SetStatusFromResult(result);
    SendSignalLocked(result);
}

void Journal::PrepareBuffer(JournalEntry* entry) {
    size_t header_index = entry->GetHeaderIndex();
    size_t commit_index = entry->GetCommitIndex();
    size_t block_count = entry->BlockCount();

    if (block_count == 0) {
        // If journal entry has size 0, it is an empty sync entry, and we don't need to write
        // anything to the journal.
        ZX_DEBUG_ASSERT(header_index == entries_->capacity());
        ZX_DEBUG_ASSERT(commit_index == entries_->capacity());
        return;
    }

    // Copy header block of the journal entry into the journal buffer. We must write the header
    // block into the buffer before the commit block so we can generate the checksum.
    void* data = entries_->MutableData(header_index);
    memset(data, 0, kBlobfsBlockSize);
    memcpy(data, &entry->GetHeaderBlock(), sizeof(HeaderBlock));

    // Now that the header block has been written to the buffer, we can calculate a checksum for
    // the header + all journaled metadata blocks and set it in the entry's commit block.
    entry->SetChecksum(GenerateChecksum(header_index, commit_index));

    // Write the commit block (now with checksum) to the journal buffer.
    data = entries_->MutableData(commit_index);
    memset(data, 0, kBlobfsBlockSize);
    memcpy(data, &entry->GetCommitBlock(), sizeof(CommitBlock));
}

void Journal::PrepareDelete(JournalEntry* entry, WritebackWork* work) {
    ZX_DEBUG_ASSERT(work != nullptr);
    size_t header_index = entry->GetHeaderIndex();
    size_t commit_index = entry->GetCommitIndex();
    size_t block_count = entry->BlockCount();

    if (block_count == 0) {
        // If journal entry has size 0, it is an empty sync entry, and we don't need to write
        // anything to the journal.
        ZX_DEBUG_ASSERT(header_index == entries_->capacity());
        ZX_DEBUG_ASSERT(commit_index == entries_->capacity());
        return;
    }

    // Overwrite the header & commit block in the buffer with empty data.
    memset(entries_->MutableData(header_index), 0, kBlobfsBlockSize);
    memset(entries_->MutableData(commit_index), 0, kBlobfsBlockSize);

    // Enqueue transactions for the header/commit blocks.
    entries_->AddTransaction(header_index, start_block_ + 1 + header_index, 1, work);
    entries_->AddTransaction(commit_index, start_block_ + 1 + commit_index, 1, work);
}

fbl::unique_ptr<WritebackWork> Journal::CreateWork() {
    fbl::unique_ptr<WritebackWork> work;
    transaction_manager_->CreateWork(&work, nullptr);
    ZX_DEBUG_ASSERT(work != nullptr);
    return work;
}

zx_status_t Journal::EnqueueEntryWork(fbl::unique_ptr<WritebackWork> work) {
    entries_->ValidateTransaction(work.get());
    return transaction_manager_->EnqueueWork(std::move(work), EnqueueType::kData);
}

bool Journal::VerifyEntryMetadata(size_t header_index, uint64_t last_timestamp, bool expect_valid) {
    HeaderBlock* header = GetHeaderBlock(header_index);
    // If length_ > 0, the next entry should be guaranteed.
    if (header->magic != kEntryHeaderMagic || header->timestamp <= last_timestamp) {
        // If the next calculated header block is either 1) not a header block, or 2) does not
        // have a timestamp strictly later than the previous entry, it is not a valid entry and
        // should not be replayed. This is only a journal replay "error" if, according to the
        // journal super block, we still have some entries left to process (i.e. length_ > 0).
        if (expect_valid) {
            FS_TRACE_ERROR("Journal Replay Error: invalid header found.\n");
        }

        return false;
    }

    size_t commit_index = (header_index + header->num_blocks + 1) % entries_->capacity();
    CommitBlock* commit = GetCommitBlock(commit_index);

    if (commit->magic != kEntryCommitMagic) {
        FS_TRACE_ERROR("Journal Replay Error: commit magic does not match expected\n");
        return false;
    }

    if (commit->timestamp != header->timestamp) {
        FS_TRACE_ERROR("Journal Replay Error: commit timestamp does not match expected\n");
        return false;
    }

    // Calculate the checksum of the entry data to verify the commit block's checksum.
    uint32_t checksum = GenerateChecksum(header_index, commit_index);

    // Since we already found a valid header, we expect this to be a valid entry. If something
    // in the commit block does not match what we expect, this is an error.
    if (commit->checksum != checksum) {
        FS_TRACE_ERROR("Journal Replay Error: commit checksum does not match expected\n");
        return false;
    }

    return true;
}

zx_status_t Journal::ReplayEntry(size_t header_index, size_t remaining_length,
                                 uint64_t* entry_blocks, uint64_t* timestamp) {
    ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);

    bool expect_valid = remaining_length > 0;
    if (!VerifyEntryMetadata(header_index, *timestamp, expect_valid)) {
        return ZX_ERR_OUT_OF_RANGE;
    }

    HeaderBlock* header = GetHeaderBlock(header_index);
    *timestamp = header->timestamp;
    *entry_blocks = header->num_blocks + 2;
    // We have found a valid entry - ensure that remaining_length is valid
    // (either 0 remaining, or enough to fit this entry).
    ZX_DEBUG_ASSERT(remaining_length == 0 || remaining_length >= *entry_blocks);

    fbl::unique_ptr<WritebackWork> work = CreateWork();

    // Enqueue one block at a time, since they may not end up being contiguous on disk.
    for (unsigned i = 0; i < header->num_blocks; i++) {
        size_t vmo_block = (header_index + i + 1) % entries_->capacity();
        entries_->AddTransaction(vmo_block, header->target_blocks[i], 1, work.get());
    }

    // Replay (and therefore mount) will fail if we cannot enqueue the replay work. Since the
    // journal itself is not corrupt (at least up to this point), we would expect replay to
    // succeed on a subsequent attempt, so we should keep any existing entries intact. (i.e.,
    // do not reset the journal metadata in this failure case).
    zx_status_t status = EnqueueEntryWork(std::move(work));
    if (status != ZX_OK) {
        FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
    }

    return status;
}

zx_status_t Journal::CommitReplay() {
    ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);

    // Overwrite the first journal entry block to 0. Since we are resetting the info block to point
    // to 0 as the first entry, we expect that block 0 will not contain a valid entry. Overwriting
    // it will ensure that this is not the case.
    memset(entries_->MutableData(0), 0, kBlobfsBlockSize);
    fbl::unique_ptr<WritebackWork> work = CreateWork();

    entries_->AddTransaction(0, start_block_ + 1, 1, work.get());

    zx_status_t status;
    if ((status = EnqueueEntryWork(std::move(work))) != ZX_OK) {
        FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
        return status;
    }

    // Write out the updated info block to disk.
    if ((status = WriteInfo(entries_->start(), entries_->length())) != ZX_OK) {
        FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
        return status;
    }

    // Wait for any replayed entries to complete before completing replay.
    work = CreateWork();
    sync_completion_t completion;
    sync_completion_reset(&completion);

    work->SetSyncCallback([&completion, &status](zx_status_t new_status) {
        status = new_status;
        sync_completion_signal(&completion);
    });

    if ((status = EnqueueEntryWork(std::move(work))) != ZX_OK) {
        FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
        return status;
    }

    sync_completion_wait(&completion, ZX_TIME_INFINITE);

    // Return a successful status, even if we detected corrupt metadata or entries.
    // Our metadata should still be in a consistent state so it will be safe to mount regardless.
    return ZX_OK;
}

zx_status_t Journal::WriteInfo(uint64_t start, uint64_t length) {
    JournalInfo* info = GetInfo();

    if (start == info->start_block && length == info->num_blocks) {
        // If the current buffer start/len match the info block, skip the writing step.
        return ZX_OK;
    }

    fbl::unique_ptr<WritebackWork> work;
    transaction_manager_->CreateWork(&work, nullptr);

    info->start_block = start;
    info->num_blocks = length;
    info->timestamp = zx_ticks_get();

    // Set the checksum to 0 so we can calculate the checksum of the rest of the info block.
    info->checksum = 0;
    uint8_t* info_ptr = reinterpret_cast<uint8_t*>(info);
    info->checksum = crc32(0, info_ptr, sizeof(JournalInfo));

    info_->AddTransaction(0, start_block_, 1, work.get());
    info_->ValidateTransaction(work.get());
    return transaction_manager_->EnqueueWork(std::move(work), EnqueueType::kData);
}

void Journal::EnsureSpaceLocked(size_t blocks) {
    while (!entries_->IsSpaceAvailable(blocks)) {
        // Not enough room to write back work, yet. Wait until room is available.
        Waiter w;
        producer_queue_.push(&w);

        do {
            cnd_wait(&producer_cvar_, lock_.GetInternal());
        } while ((&producer_queue_.front() != &w) && // We are first in line to enqueue...
                (!entries_->IsSpaceAvailable(blocks))); // ... and there is enough space for us.

        producer_queue_.pop();
    }
}

void Journal::AddEntryTransaction(size_t start, size_t length, WritebackWork* work) {
    // Ensure the request fits within the buffer.
    ZX_DEBUG_ASSERT(start < entries_->capacity());
    ZX_DEBUG_ASSERT(length > 0);
    ZX_DEBUG_ASSERT(length < entries_->capacity());
    ZX_DEBUG_ASSERT(work != nullptr);

    // Adjust the length of the first transaction in
    // case it wraps around to the front of the buffer.
    size_t first_length = length;

    if (start + length > entries_->capacity()) {
        first_length = entries_->capacity() - start;
    }

    // Ensure we do not have an empty transaction.
    ZX_DEBUG_ASSERT(first_length > 0);

    // Enqueue the first part of the transaction.
    size_t disk_start = start_block_ + 1;
    entries_->AddTransaction(start, disk_start + start, first_length, work);

    // If we wrapped around to the front of the journal,
    // enqueue a second transaction with the remaining data + commit block.
    if (first_length < length) {
        entries_->AddTransaction(0, disk_start, length - first_length, work);
    }
}

uint32_t Journal::GenerateChecksum(size_t header_index, size_t commit_index) {
    ZX_DEBUG_ASSERT(commit_index != header_index);

    size_t first_length = 0;

    // Determine how long the first part of the transaction is.
    if (commit_index < header_index) {
        first_length = entries_->capacity() - header_index;
    } else {
        first_length = commit_index - header_index;
    }

    ZX_DEBUG_ASSERT(first_length > 0);

    // Calculate checksum.
    uint8_t* data_ptr = static_cast<uint8_t*>(entries_->MutableData(header_index));
    uint32_t checksum = crc32(0, data_ptr, first_length * kBlobfsBlockSize);

    // If the transaction wraps around the buffer, update checksum for the second half.
    if (commit_index < header_index) {
        data_ptr = static_cast<uint8_t*>(entries_->MutableData(0));
        checksum = crc32(checksum, data_ptr, commit_index * kBlobfsBlockSize);
    }

    return checksum;
}

fbl::unique_ptr<JournalEntry> Journal::GetNextEntry() {
    fbl::AutoLock lock(&lock_);
    return work_queue_.pop();
}

void Journal::ProcessQueues(JournalProcessor* processor) {
    // Process all entries in the work queue.
    fbl::unique_ptr<JournalEntry> entry;
    while ((entry = GetNextEntry()) != nullptr) {
        // TODO(planders): For each entry that we process, we can potentially verify that the
        //                 indices fit within the expected start/len of the journal buffer, and do
        //                 not collide with other entries.
        processor->ProcessWorkEntry(std::move(entry));
    }

    // Since the processor queues are accessed exclusively by the async thread,
    // we do not need to hold the lock while we access them.

    // If we processed any entries during the work step,
    // enqueue the dummy work to kick off the writeback queue.
    processor->EnqueueWork();

    // TODO(planders): Instead of immediately processing all wait items, wait until some
    //                 condition is fulfilled (e.g. journal is x% full, y total entries are
    //                 waiting, z time has passed, etc.) and write all entries out to disk at
    //                 once.
    // Process all entries in the "wait" queue. These are all transactions with entries that
    // have been enqueued to disk, and are waiting to verify that the write has completed.
    processor->ProcessWaitQueue();

    // TODO(planders): Similarly to the wait queue, instead of immediately processing all delete
    //                 items, wait until some condition is fulfilled and process all journal
    //                 deletions at once.

    // Track which entries have been fully persisted to their final on disk-location. Once we
    // have received verification that they have successfully completed, we can remove them
    // from the journal buffer to make space for new entries.
    processor->ProcessDeleteQueue();

    if (processor->HasError()) {
        {
            fbl::AutoLock lock(&lock_);

            // The thread signalling us should already be setting the Journal to read_only_, but in
            // case we managed to grab the lock first, set it again here.
            state_ = WritebackState::kReadOnly;

            // Reset the journal length to unblock transactions awaiting space,
            // No more writes to the buffer will be allowed.
            entries_->FreeAllSpace();
        }

        // Reset any pending delete requests (if any exist).
        processor->ResetWork();
    } else if (processor->GetBlocksProcessed() > 0) {
        uint64_t start, length;

        {
            fbl::AutoLock lock(&lock_);

            // Update the journal start/len to reflect the number of blocks that have been fully
            // processed.
            entries_->FreeSpace(processor->GetBlocksProcessed());

            start = entries_->start();
            length = entries_->length();
        }

        // The journal start/len have changed, so write out the info block.
        WriteInfo(start, length);

        // After the super block update has been queued for writeback, we can now "delete"
        // the entries that were previously pointed to by the info block. This must be done
        // after the info block write so that the info block does not point to invalid
        // entries.
        processor->EnqueueWork();
    }

    // If we are not in an error state and did not process any blocks, then the
    // JournalProcessor's work should be not have been initialized. This condition will be
    // checked at the beginning of the next call to ProcessQueue.

    // Since none of the methods in the kSync profile indicate that an entry should be added to
    // the next queue, it should be fine to pass a null output queue here.
    processor->ProcessSyncQueue();
}

void Journal::ProcessLoop() {
    JournalProcessor processor(this);
    while (true) {
        ProcessQueues(&processor);

        fbl::AutoLock lock(&lock_);

        // Signal the producer queue that space in the journal has (possibly) been freed up.
        cnd_signal(&producer_cvar_);

        // Before waiting, we should check if we're unmounting.
        if (unmounting_ && work_queue_.is_empty() && processor.IsEmpty() &&
            producer_queue_.is_empty()) {
            // Only return if we are unmounting AND all entries in all queues have been
            // processed. This includes producers which are currently waiting to be enqueued.
            state_ = WritebackState::kComplete;
            break;
        }

        // If we received a signal while we were processing other queues,
        // immediately start processing again.
        if (!consumer_signalled_) {
            cnd_wait(&consumer_cvar_, lock_.GetInternal());
        }

        consumer_signalled_ = false;
    }
}

void JournalProcessor::ProcessWorkEntry(fbl::unique_ptr<JournalEntry> entry) {
    SetContext(ProcessorContext::kWork);
    ProcessResult result = ProcessEntry(entry.get());
    ZX_DEBUG_ASSERT(result == ProcessResult::kContinue);

    // Enqueue the entry into the wait_queue, even in the case of error. This is so that
    // all works contained by journal entries will be processed in the second step, even if
    // we do not plan to send them along to the writeback queue.
    wait_queue_.push(std::move(entry));
}

void JournalProcessor::ProcessWaitQueue() {
    SetContext(ProcessorContext::kWait);
    ProcessQueue(&wait_queue_, &delete_queue_);
}

void JournalProcessor::ProcessDeleteQueue() {
    SetContext(ProcessorContext::kDelete);
    ProcessQueue(&delete_queue_, &sync_queue_);
}
void JournalProcessor::ProcessSyncQueue() {
    SetContext(ProcessorContext::kSync);
    ProcessQueue(&sync_queue_, nullptr);
}

void JournalProcessor::SetContext(ProcessorContext context) {
    if (context_ != context) {
        // If we are switching from the sync profile, sync queue must be empty.
        ZX_DEBUG_ASSERT(context_ != ProcessorContext::kSync || sync_queue_.is_empty());

        switch (context) {
        case ProcessorContext::kDefault:
            ZX_DEBUG_ASSERT(context_ == ProcessorContext::kSync);
            break;
        case ProcessorContext::kWork:
            ZX_DEBUG_ASSERT(context_ == ProcessorContext::kDefault ||
                            context_ == ProcessorContext::kSync);
            break;
        case ProcessorContext::kWait:
            ZX_DEBUG_ASSERT(context_ != ProcessorContext::kDelete);
            break;
        case ProcessorContext::kDelete:
            ZX_DEBUG_ASSERT(context_ == ProcessorContext::kWait);
            break;
        case ProcessorContext::kSync:
            ZX_DEBUG_ASSERT(context_ == ProcessorContext::kDelete);
            break;
        default:
            ZX_DEBUG_ASSERT(false);
        }

        // Make sure that if a WritebackWork was established,
        // it was removed before we attempt to switch profiles.
        ZX_DEBUG_ASSERT(work_ == nullptr);
        blocks_processed_ = 0;
        context_ = context;
    }
}

void JournalProcessor::ProcessQueue(EntryQueue* in_queue, EntryQueue* out_queue) {
    // Process queue entries until there are none left, or we are told to wait.
    while (!in_queue->is_empty()) {
        // Process the entry before removing it from the queue.
        // If its status is kWaiting, we don't want to remove it.
        ProcessResult result = ProcessEntry(&in_queue->front());

        if (result == ProcessResult::kWait) {
            break;
        }

        auto entry = in_queue->pop();

        if (result == ProcessResult::kContinue) {
            ZX_DEBUG_ASSERT(out_queue != nullptr);
            out_queue->push(std::move(entry));
        } else {
            ZX_DEBUG_ASSERT(result == ProcessResult::kRemove);
        }
    }
}

ProcessResult JournalProcessor::ProcessEntry(JournalEntry* entry) {
    ZX_DEBUG_ASSERT(entry != nullptr);

    // Retrieve the entry status once up front so we don't have to keep atomically loading it.
    EntryStatus entry_status = entry->GetStatus();

    if (entry_status == EntryStatus::kWaiting) {
        // If the entry at the front of the queue is still waiting, we are done processing this
        // queue for the time being.
        return ProcessResult::kWait;
    }

    if (error_ && entry_status != EntryStatus::kSync) {
        // If we are in an error state and the entry is not a "sync" entry,
        // set the state to error so we do not do any unnecessary work.
        //
        // Since the error state takes precedence over the entry state,
        // we do not also have to set the entry state to error.
        entry_status = EntryStatus::kError;
    }

    if (entry_status == EntryStatus::kInit && context_ == ProcessorContext::kWork) {
        return ProcessWorkDefault(entry);
    }

    if (entry_status == EntryStatus::kPersisted) {
        if (context_ == ProcessorContext::kWait) {
            return ProcessWaitDefault(entry);
        }

        if (context_ == ProcessorContext::kDelete) {
            return ProcessDeleteDefault(entry);
        }
    }

    if (entry_status == EntryStatus::kSync) {
        if (context_ == ProcessorContext::kSync) {
            return ProcessSyncComplete(entry);
        }

        if (context_ != ProcessorContext::kDefault) {
            return ProcessSyncDefault(entry);
        }
    }

    if (entry_status == EntryStatus::kError) {
        if (context_ == ProcessorContext::kWork) {
            return ProcessErrorDefault();
        }

        if (context_ == ProcessorContext::kWait || context_ == ProcessorContext::kDelete) {
            return ProcessErrorComplete(entry);
        }
    }

    return ProcessUnsupported();
}

ProcessResult JournalProcessor::ProcessWorkDefault(JournalEntry* entry) {
    // If the entry is in the "init" state, we can now prepare its header/commit blocks
    // in the journal buffer.
    journal_->PrepareBuffer(entry);
    EntryStatus last_status = entry->SetStatus(EntryStatus::kWaiting);

    if (last_status == EntryStatus::kError) {
        // If the WritebackThread has failed and set our journal entry to an error
        // state in the time it's taken to prepare the buffer, set error state to
        // true. If we do not check this and continue having set the status to
        // kWaiting, we will never get another callback for this journal entry and
        // we will be stuck forever waiting for it to complete.
        error_ = true;
        entry->SetStatus(EntryStatus::kError);
    } else {
        ZX_DEBUG_ASSERT(last_status == EntryStatus::kInit);
        if (work_ == nullptr) {
            // Prepare a "dummy" work to kick off the writeback queue now that our entry is ready.
            // This is unnecessary in the case of an error, since the writeback queue will already
            // be failing all incoming transactions.
            work_ = journal_->CreateWork();
        }
    }

    return ProcessResult::kContinue;
}

ProcessResult JournalProcessor::ProcessWaitDefault(JournalEntry* entry) {
    EntryStatus last_status = entry->SetStatus(EntryStatus::kWaiting);
    ZX_DEBUG_ASSERT(last_status == EntryStatus::kPersisted);
    fbl::unique_ptr<WritebackWork> work = entry->TakeWork();
    journal_->EnqueueEntryWork(std::move(work));
    return ProcessResult::kContinue;
}

ProcessResult JournalProcessor::ProcessDeleteDefault(JournalEntry* entry) {
    if (work_ == nullptr) {
        // Use this work to enqueue any "delete" transactions we may encounter,
        // to be written after the info block is updated.
        work_ = journal_->CreateWork();
    }

    // The entry has now been fully persisted to disk, so we can remove the entry from
    // the journal. To ensure that it does not later get replayed unnecessarily, clear
    // out the header and commit blocks.
    journal_->PrepareDelete(entry, work_.get());

    // Track the number of blocks that have been fully processed so we can update the buffer.
    blocks_processed_ += entry->BlockCount();

    // We have fully processed this entry - do not add it to the next queue.
    return ProcessResult::kRemove;
}

ProcessResult JournalProcessor::ProcessSyncDefault(JournalEntry* entry) {
    // This is a sync request. Since there is no actual data to update,
    // we can just verify it and send it along to the next queue.
    ZX_DEBUG_ASSERT(entry->BlockCount() == 0);
    ZX_DEBUG_ASSERT(entry->GetHeaderIndex() == journal_->GetCapacity());
    ZX_DEBUG_ASSERT(entry->GetCommitIndex() == journal_->GetCapacity());

    // Always push the sync entry into the output queue.
    return ProcessResult::kContinue;
}

ProcessResult JournalProcessor::ProcessSyncComplete(JournalEntry* entry) {
    // Call the default sync method to ensure the entry matches what we expect.
    ProcessSyncDefault(entry);

    // Remove and enqueue the sync work.
    fbl::unique_ptr<WritebackWork> work = entry->TakeWork();
    journal_->EnqueueEntryWork(std::move(work));

    // The sync entry is complete; do not re-enqueue it.
    return ProcessResult::kRemove;
}

ProcessResult JournalProcessor::ProcessErrorDefault() {
    error_ = true;
    return ProcessResult::kContinue;
}

ProcessResult JournalProcessor::ProcessErrorComplete(JournalEntry* entry) {
    // If we are in an error state, force reset the entry's work. This will remove all
    // requests and call the sync closure (if it exists), thus completing this entry.
    entry->ForceReset();
    error_ = true;

    // Since all work is completed for this entry, we no longer need to send it along
    // to the next queue. Instead proceed to process the next entry.
    return ProcessResult::kRemove;
}

ProcessResult JournalProcessor::ProcessUnsupported() {
    ZX_ASSERT(false);
    return ProcessResult::kRemove;
}

} // blobfs
