| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <blobfs/journal.h> |
| #include <fbl/auto_lock.h> |
| #include <fbl/unique_ptr.h> |
| #include <lib/cksum.h> |
| #include <lib/sync/completion.h> |
| #include <zircon/types.h> |
| |
| #include <utility> |
| |
| namespace blobfs { |
| |
| namespace { |
| void DumpJournalInfo(const JournalInfo& info, FILE* out) { |
| if (out == nullptr) { |
| return; |
| } |
| |
| fprintf(out, |
| "JournalInfo.magic: %" PRIu64 "\n" |
| "JournalInfo.start_block: %" PRIu64 "\n" |
| "JournalInfo.num_blocks: %" PRIu64 "\n" |
| "JournalInfo.timestamp: %" PRIu64 "\n" |
| "JournalInfo.checksum: %" PRIu32 "\n", |
| info.magic, info.start_block, info.num_blocks, info.timestamp, info.checksum); |
| } |
| |
| void DumpHeaderBlock(const HeaderBlock& header, FILE* out, bool print_target_blocks = false) { |
| fprintf(out, |
| "HeaderBlock.magic: %" PRIu64 "\n" |
| "HeaderBlock.timestamp: %" PRIu64 "\n" |
| "HeaderBlock.reserved: %" PRIu64 "\n" |
| "HeaderBlock.num_blocks: %" PRIu64 "\n", |
| header.magic, header.timestamp, header.reserved, header.num_blocks); |
| |
| // Valid or not, we print all the target_block. This may be noisy so |
| // printing target_blocks is disabled by default. |
| if (!print_target_blocks) { |
| return; |
| } |
| |
| for (uint32_t i = 0; i < kMaxEntryDataBlocks; i++) { |
| fprintf(out, "HeaderBlock.target_block[%" PRIu32 "]: %" PRIu64 "\n", i, |
| header.target_blocks[i]); |
| } |
| } |
| |
| } // namespace |
| |
| // TODO(ZX-2415): Add tracing/metrics collection to journal related operations. |
| |
| // Thread which asynchronously processes journal entries. |
| static int JournalThread(void* arg) { |
| Journal* journal = reinterpret_cast<Journal*>(arg); |
| journal->ProcessLoop(); |
| return 0; |
| } |
| |
| zx_status_t Journal::Create(TransactionManager* transaction_manager, uint64_t journal_blocks, |
| uint64_t start_block, fbl::unique_ptr<Journal>* out) { |
| // Create the buffer with 1 less than total journal blocks. |
| // (1 block must be reserved for journal info). |
| zx_status_t status; |
| fbl::unique_ptr<Buffer> buffer; |
| if ((status = Buffer::Create(transaction_manager, journal_blocks - 1, "blobfs-journal", |
| &buffer)) != ZX_OK) { |
| return status; |
| } |
| |
| // Create another buffer for the journal info block. |
| VmoBuffer info; |
| status = info.Initialize(transaction_manager, 1, "blobfs-journal-info"); |
| if (status != ZX_OK) { |
| return status; |
| } |
| |
| // Create the Journal with the newly created vmos. |
| fbl::unique_ptr<Journal> journal(new Journal(transaction_manager, std::move(info), |
| std::move(buffer), start_block)); |
| |
| // Load contents of journal from disk. |
| if ((status = journal->Load()) != ZX_OK) { |
| FS_TRACE_ERROR("Journal: Failed to load from disk: %d\n", status); |
| return status; |
| } |
| |
| *out = std::move(journal); |
| return ZX_OK; |
| } |
| |
| Journal::~Journal() { |
| if (IsRunning()) { |
| Teardown(); |
| } |
| |
| // Ensure that thread teardown has completed, or that it was never brought up to begin with. |
| ZX_DEBUG_ASSERT(!IsRunning()); |
| |
| // Ensure that work and producer queues are currently empty. |
| ZX_DEBUG_ASSERT(work_queue_.is_empty()); |
| ZX_DEBUG_ASSERT(producer_queue_.is_empty()); |
| } |
| |
// Shuts down the background journal thread. Returns ZX_ERR_BAD_STATE if a
// teardown is already in progress, ZX_ERR_INTERNAL if the thread failed to
// join cleanly or exited with a nonzero result, and ZX_OK otherwise.
zx_status_t Journal::Teardown() {
    WritebackState state;

    {
        fbl::AutoLock lock(&lock_);
        // Reject a second, concurrent teardown outright.
        if (unmounting_) {
            return ZX_ERR_BAD_STATE;
        }

        // Snapshot the state under the lock; it determines below whether the
        // background thread was ever started (and therefore must be joined).
        state = state_;

        // Signal the background thread.
        unmounting_ = true;
        cnd_signal(&consumer_cvar_);
    }

    zx_status_t status = ZX_OK;
    if (state != WritebackState::kInit && state != WritebackState::kReady) {
        // The thread only exists once writeback has started (i.e. past
        // kInit/kReady), so only join in that case.
        // Block until the thread completes itself.
        int result = -1;
        int success = thrd_join(thread_, &result);
        if (result != 0 || success != thrd_success) {
            status = ZX_ERR_INTERNAL;
        }
    }

    return status;
}
| |
// Reads the journal info block and all entry blocks from disk into their
// in-memory buffers, then validates the info block's magic and — when the
// info block is non-default — its checksum. Must be called while the journal
// is still in the kInit state. Returns ZX_ERR_BAD_STATE on validation failure.
zx_status_t Journal::Load() {
    fbl::AutoLock lock(&lock_);
    ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);

    // Load info block and journal entries into their respective buffers.
    // The info block lives at |start_block_|; entry blocks follow immediately after.
    fs::ReadTxn txn(transaction_manager_);
    txn.Enqueue(info_.vmoid(), 0, start_block_, info_.capacity());
    entries_->Load(&txn, start_block_ + 1);
    zx_status_t status = txn.Transact();

    if (status != ZX_OK) {
        return status;
    }

    JournalInfo* info = GetInfo();

    // Verify the journal magic matches.
    if (info->magic != kJournalMagic) {
        FS_TRACE_ERROR("Journal info bad magic\n");
        DumpJournalInfo(*info, stderr);
        return ZX_ERR_BAD_STATE;
    }

    // An all-zero start/num/timestamp indicates a freshly-initialized info
    // block with no checksum worth verifying (see WriteInfo for the writer side).
    if (info->start_block > 0 || info->num_blocks > 0 || info->timestamp > 0) {
        // Who checks the checksum? (It's us. We are doing it.)
        uint32_t old_checksum = info->checksum;
        // Zero the checksum field before recomputing: the stored checksum was
        // computed over the struct with this field set to zero.
        info->checksum = 0;

        uint8_t* info_ptr = reinterpret_cast<uint8_t*>(info);
        info->checksum = crc32(0, info_ptr, sizeof(JournalInfo));

        if (old_checksum != info->checksum) {
            FS_TRACE_ERROR("Journal info checksum corrupt\n");
            DumpJournalInfo(*info, stderr);
            return ZX_ERR_BAD_STATE;
        }
    }

    return status;
}
| |
// Replays all valid journal entries found on disk, starting from the info
// block's recorded start/length and following the circular entry buffer, then
// resets the on-disk journal state. Leaves the journal in the kReady state on
// success.
zx_status_t Journal::Replay() {
    fbl::AutoLock lock(&lock_);
    ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);

    uint64_t timestamp = 0;
    size_t start = GetInfo()->start_block;
    size_t length = GetInfo()->num_blocks;
    size_t total_entries = 0;
    size_t total_blocks = 0;

    // Replay entries until we find one that isn't valid.
    while (true) {
        uint64_t entry_blocks;
        // |start| is the header index of the next entry.
        zx_status_t status = ReplayEntry(start, length, &entry_blocks, &timestamp);
        if (status == ZX_ERR_OUT_OF_RANGE) {
            // No further valid entry: normal loop termination.
            break;
        } else if (status != ZX_OK) {
            return status;
        }

        total_entries++;
        total_blocks += entry_blocks;

        // Advance to the next entry, wrapping around the circular buffer.
        start += entry_blocks;
        start %= entries_->capacity();

        // Only decrement when the info block supplied a length; with length 0,
        // entries are discovered purely by validation and there is no count to
        // consume (this also avoids underflow).
        if (length) {
            length -= entry_blocks;
        }
    }

    // TODO(planders): Sync to ensure that all entries have been written out before resetting the
    // on-disk state of the journal.
    if (total_entries > 0) {
        printf("Found and replayed %zu total blobfs journal entries starting from index %zu, "
               "including %zu total blocks.\n",
               total_entries, GetInfo()->start_block, total_blocks);
    } else if (start == 0 && length == 0) {
        // If no entries were found and journal is already in its default state,
        // return without writing out any changes.
        state_ = WritebackState::kReady;
        return ZX_OK;
    }

    // We expect length to be 0 at this point, assuming the journal was not corrupted and replay
    // completed successfully. However, in the case of corruption of the journal this may not be the
    // case. Since we cannot currently recover from this situation we should proceed as normal.
    zx_status_t status = CommitReplay();
    if (status != ZX_OK) {
        return status;
    }

    // Now that we've resolved any remaining entries, we are ready to start journal writeback.
    state_ = WritebackState::kReady;
    return ZX_OK;
}
| |
| zx_status_t Journal::InitWriteback() { |
| fbl::AutoLock lock(&lock_); |
| ZX_DEBUG_ASSERT(state_ == WritebackState::kReady); |
| |
| if (entries_->start() > 0 || entries_->length() > 0) { |
| FS_TRACE_ERROR("Cannot initialize journal writeback - entries may still exist.\n"); |
| return ZX_ERR_BAD_STATE; |
| } |
| |
| if (thrd_create_with_name(&thread_, JournalThread, this, "blobfs-journal") != |
| thrd_success) { |
| FS_TRACE_ERROR("Failed to create journal thread.\n"); |
| return ZX_ERR_NO_RESOURCES; |
| } |
| |
| state_ = WritebackState::kRunning; |
| return ZX_OK; |
| } |
| |
// Queues |work| for journaled writeback. For non-sync works the transaction's
// data is copied into the journal buffer (blocking until buffer space is
// available); the work is then wrapped in a JournalEntry and handed to the
// background thread. Returns ZX_ERR_BAD_STATE once the journal has entered a
// read-only state.
zx_status_t Journal::Enqueue(fbl::unique_ptr<WritebackWork> work) {
    // Verify that the work exists and has not already been prepared for writeback.
    ZX_DEBUG_ASSERT(work != nullptr);
    ZX_DEBUG_ASSERT(!work->Transaction().IsBuffered());

    // Block count will be the number of blocks in the transaction + header + commit.
    size_t blocks = work->Transaction().BlkCount();
    // By default set the header/commit indices to the buffer capacity,
    // since this will be an invalid index value.
    size_t header_index = entries_->capacity();
    size_t commit_index = entries_->capacity();

    fbl::AutoLock lock(&lock_);

    zx_status_t status = ZX_OK;
    if (IsReadOnly()) {
        // If we are in "read only" mode, set an error status.
        status = ZX_ERR_BAD_STATE;
    } else if (blocks) {
        ZX_DEBUG_ASSERT(state_ == WritebackState::kRunning);

        // If the work contains no blocks (i.e. it is a sync work), proceed to create an entry
        // without enqueueing any data to the buffer.

        // Add 2 blocks to the block count for the journal entry's header/commit blocks.
        blocks += 2;

        // Ensure we have enough space to write the current entry to the buffer.
        // If not, wait until space becomes available.
        EnsureSpaceLocked(blocks);

        // EnsureSpaceLocked may have released |lock_| while waiting, so the
        // read-only state must be re-checked here.
        if (IsReadOnly()) {
            // The Journal is in a bad state and is no longer accepting new entries.
            status = ZX_ERR_BAD_STATE;
        } else {
            // Assign header index of journal entry to the next available value before we attempt to
            // copy the meat of the entry to the buffer.
            header_index = entries_->ReserveIndex();

            // Copy the data from WritebackWork to the journal buffer. We can wait to write out the
            // header and commit blocks asynchronously, since this will involve calculating the
            // checksum.
            // TODO(planders): Release the lock while transaction is being copied.
            entries_->CopyTransaction(&work->Transaction());

            // Assign commit_index immediately after copying to the buffer.
            // Increase length_ accordingly.
            commit_index = entries_->ReserveIndex();

            // Make sure that commit index matches what we expect
            // based on header index, block count, and buffer size.
            ZX_DEBUG_ASSERT(commit_index == (header_index + blocks - 1) % entries_->capacity());
        }
    }

    // Create the journal entry and push it onto the work queue.
    fbl::unique_ptr<JournalEntry> entry = CreateEntry(header_index, commit_index, std::move(work));

    if (entry->GetStatus() == EntryStatus::kInit) {
        // If we have a non-sync work, there is some extra preparation we need to do.
        if (status == ZX_OK) {
            // Prepare a WritebackWork to write out the entry to disk. Note that this does not
            // fully prepare the buffer for writeback, so a ready callback is added to the work as
            // part of this step.
            PrepareWork(entry.get(), &work);
            ZX_DEBUG_ASSERT(work != nullptr);
            status = EnqueueEntryWork(std::move(work));
        } else {
            // If the status is not okay (i.e. we are in a readonly state), do no additional
            // processing but set the entry state to error.
            entry->SetStatus(EntryStatus::kError);
        }
    }

    // Queue the entry to be processed asynchronously.
    work_queue_.push(std::move(entry));

    // Signal the JournalThread that there is at least one entry ready to be processed.
    SendSignalLocked(status);
    return status;
}
| |
| void Journal::SendSignalLocked(zx_status_t status) { |
| if (status == ZX_OK) { |
| // Once writeback has entered a read only state, no further transactions should succeed. |
| ZX_ASSERT(state_ != WritebackState::kReadOnly); |
| } else { |
| state_ = WritebackState::kReadOnly; |
| } |
| consumer_signalled_ = true; |
| cnd_signal(&consumer_cvar_); |
| } |
| |
| bool Journal::IsRunning() const { |
| switch (state_) { |
| case WritebackState::kRunning: |
| case WritebackState::kReadOnly: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| fbl::unique_ptr<JournalEntry> Journal::CreateEntry(uint64_t header_index, uint64_t commit_index, |
| fbl::unique_ptr<WritebackWork> work) { |
| EntryStatus status = EntryStatus::kInit; |
| |
| if (work->Transaction().BlkCount() == 0) { |
| // If the work has no transactions, this is a sync work - we can return early. |
| // Right now we make the assumption that if a WritebackWork has any transactions, it cannot |
| // have a corresponding sync callback. We may need to revisit this later. |
| status = EntryStatus::kSync; |
| } else if (IsReadOnly()) { |
| // If the journal is in a read only state, set the entry status to error. |
| status = EntryStatus::kError; |
| } |
| |
| return std::make_unique<JournalEntry>(this, status, header_index, commit_index, |
| std::move(work)); |
| } |
| |
| void Journal::PrepareWork(JournalEntry* entry, fbl::unique_ptr<WritebackWork>* out) { |
| size_t header_index = entry->GetHeaderIndex(); |
| size_t commit_index = entry->GetCommitIndex(); |
| size_t block_count = entry->BlockCount(); |
| |
| if (block_count == 0) { |
| // If journal entry has size 0, it is an empty sync entry, and we don't need to write |
| // anything to the journal. |
| ZX_DEBUG_ASSERT(header_index == entries_->capacity()); |
| ZX_DEBUG_ASSERT(commit_index == entries_->capacity()); |
| return; |
| } |
| |
| fbl::unique_ptr<WritebackWork> work = CreateWork(); |
| |
| // Update work with transactions for the current entry. |
| AddEntryTransaction(header_index, block_count, work.get()); |
| |
| // Make sure the work is prepared for the writeback queue. |
| work->SetReadyCallback(entry->CreateReadyCallback()); |
| work->SetSyncCallback(entry->CreateSyncCallback()); |
| *out = std::move(work); |
| } |
| |
// Callback invoked when |entry|'s journal write completes with |result|.
// Records the result on the entry and wakes the background thread.
void Journal::ProcessEntryResult(zx_status_t result, JournalEntry* entry) {
    fbl::AutoLock lock(&lock_);
    // Since it is possible for the entry to be deleted immediately after updating its status
    // (if the journal is being processed at the time), it is safer to update the entry status
    // under the journal lock.
    entry->SetStatusFromResult(result);
    SendSignalLocked(result);
}
| |
| void Journal::PrepareBuffer(JournalEntry* entry) { |
| size_t header_index = entry->GetHeaderIndex(); |
| size_t commit_index = entry->GetCommitIndex(); |
| size_t block_count = entry->BlockCount(); |
| |
| if (block_count == 0) { |
| // If journal entry has size 0, it is an empty sync entry, and we don't need to write |
| // anything to the journal. |
| ZX_DEBUG_ASSERT(header_index == entries_->capacity()); |
| ZX_DEBUG_ASSERT(commit_index == entries_->capacity()); |
| return; |
| } |
| |
| // Copy header block of the journal entry into the journal buffer. We must write the header |
| // block into the buffer before the commit block so we can generate the checksum. |
| void* data = entries_->Data(header_index); |
| memset(data, 0, kBlobfsBlockSize); |
| memcpy(data, &entry->GetHeaderBlock(), sizeof(HeaderBlock)); |
| |
| // Now that the header block has been written to the buffer, we can calculate a checksum for |
| // the header + all journaled metadata blocks and set it in the entry's commit block. |
| entry->SetChecksum(GenerateChecksum(header_index, commit_index)); |
| |
| // Write the commit block (now with checksum) to the journal buffer. |
| data = entries_->Data(commit_index); |
| memset(data, 0, kBlobfsBlockSize); |
| memcpy(data, &entry->GetCommitBlock(), sizeof(CommitBlock)); |
| } |
| |
| void Journal::PrepareDelete(JournalEntry* entry, WritebackWork* work) { |
| ZX_DEBUG_ASSERT(work != nullptr); |
| size_t header_index = entry->GetHeaderIndex(); |
| size_t commit_index = entry->GetCommitIndex(); |
| size_t block_count = entry->BlockCount(); |
| |
| if (block_count == 0) { |
| // If journal entry has size 0, it is an empty sync entry, and we don't need to write |
| // anything to the journal. |
| ZX_DEBUG_ASSERT(header_index == entries_->capacity()); |
| ZX_DEBUG_ASSERT(commit_index == entries_->capacity()); |
| return; |
| } |
| |
| // Overwrite the header & commit block in the buffer with empty data. |
| memset(entries_->Data(header_index), 0, kBlobfsBlockSize); |
| memset(entries_->Data(commit_index), 0, kBlobfsBlockSize); |
| |
| // Enqueue transactions for the header/commit blocks. |
| entries_->AddTransaction(header_index, start_block_ + 1 + header_index, 1, |
| &work->Transaction()); |
| entries_->AddTransaction(commit_index, start_block_ + 1 + commit_index, 1, |
| &work->Transaction()); |
| } |
| |
| fbl::unique_ptr<WritebackWork> Journal::CreateWork() { |
| fbl::unique_ptr<WritebackWork> work; |
| transaction_manager_->CreateWork(&work, nullptr); |
| ZX_DEBUG_ASSERT(work != nullptr); |
| return work; |
| } |
| |
| zx_status_t Journal::EnqueueEntryWork(fbl::unique_ptr<WritebackWork> work) { |
| entries_->ValidateTransaction(&work->Transaction()); |
| return transaction_manager_->EnqueueWork(std::move(work), EnqueueType::kData); |
| } |
| |
// Validates the entry whose header lives at |header_index|: checks the header
// magic, a timestamp strictly later than |last_timestamp|, the commit block's
// magic and matching timestamp, and the entry checksum. |expect_valid|
// controls whether a bad header is logged as an error (it is an error only
// when the info block says entries should remain). Returns true iff the entry
// is valid.
bool Journal::VerifyEntryMetadata(size_t header_index, uint64_t last_timestamp, bool expect_valid) {
    HeaderBlock* header = GetHeaderBlock(header_index);
    // If length_ > 0, the next entry should be guaranteed.
    if (header->magic != kEntryHeaderMagic || header->timestamp <= last_timestamp) {
        // If the next calculated header block is either 1) not a header block, or 2) does not
        // have a timestamp strictly later than the previous entry, it is not a valid entry and
        // should not be replayed. This is only a journal replay "error" if, according to the
        // journal super block, we still have some entries left to process (i.e. length_ > 0).
        if (expect_valid) {
            FS_TRACE_ERROR("Journal Replay Error: invalid header found.\n");
        }

        return false;
    }

    // The commit block sits immediately after the entry's data blocks,
    // wrapping around the circular buffer if necessary.
    size_t commit_index = (header_index + header->num_blocks + 1) % entries_->capacity();
    CommitBlock* commit = GetCommitBlock(commit_index);

    if (commit->magic != kEntryCommitMagic) {
        FS_TRACE_ERROR("Journal Replay Error: commit magic does not match expected\n");
        return false;
    }

    if (commit->timestamp != header->timestamp) {
        FS_TRACE_ERROR("Journal Replay Error: commit timestamp does not match expected\n");
        DumpHeaderBlock(*header, stderr);
        return false;
    }

    // Calculate the checksum of the entry data to verify the commit block's checksum.
    uint32_t checksum = GenerateChecksum(header_index, commit_index);

    // Since we already found a valid header, we expect this to be a valid entry. If something
    // in the commit block does not match what we expect, this is an error.
    if (commit->checksum != checksum) {
        FS_TRACE_ERROR("Journal Replay Error: commit checksum does not match expected\n");
        DumpHeaderBlock(*header, stderr);
        return false;
    }

    return true;
}
| |
// Replays a single journal entry with its header at |header_index|,
// enqueueing writes of its data blocks to their final on-disk targets. On
// success, sets |*entry_blocks| to the entry's total size (data + header +
// commit) and |*timestamp| to the entry's timestamp. Returns
// ZX_ERR_OUT_OF_RANGE when no valid entry is found at |header_index| — the
// normal termination condition for Replay's loop.
zx_status_t Journal::ReplayEntry(size_t header_index, size_t remaining_length,
                                 uint64_t* entry_blocks, uint64_t* timestamp) {
    ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);

    // A bad entry is only a replay error if the info block says entries remain.
    bool expect_valid = remaining_length > 0;
    if (!VerifyEntryMetadata(header_index, *timestamp, expect_valid)) {
        return ZX_ERR_OUT_OF_RANGE;
    }

    HeaderBlock* header = GetHeaderBlock(header_index);
    *timestamp = header->timestamp;
    // +2 accounts for the entry's header and commit blocks.
    *entry_blocks = header->num_blocks + 2;
    // We have found a valid entry - ensure that remaining_length is valid
    // (either 0 remaining, or enough to fit this entry).
    ZX_DEBUG_ASSERT(remaining_length == 0 || remaining_length >= *entry_blocks);

    fbl::unique_ptr<WritebackWork> work = CreateWork();

    // Enqueue one block at a time, since they may not end up being contiguous on disk.
    for (unsigned i = 0; i < header->num_blocks; i++) {
        size_t vmo_block = (header_index + i + 1) % entries_->capacity();
        entries_->AddTransaction(vmo_block, header->target_blocks[i], 1, &work->Transaction());
    }

    // Replay (and therefore mount) will fail if we cannot enqueue the replay work. Since the
    // journal itself is not corrupt (at least up to this point), we would expect replay to
    // succeed on a subsequent attempt, so we should keep any existing entries intact. (i.e.,
    // do not reset the journal metadata in this failure case).
    zx_status_t status = EnqueueEntryWork(std::move(work));
    if (status != ZX_OK) {
        FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
    }

    return status;
}
| |
// Finalizes replay: clears the first entry block, resets the on-disk info
// block, and performs a synchronous round-trip through the writeback queue so
// that all replayed entries are persisted before returning.
zx_status_t Journal::CommitReplay() {
    ZX_DEBUG_ASSERT(state_ == WritebackState::kInit);

    // Overwrite the first journal entry block to 0. Since we are resetting the info block to point
    // to 0 as the first entry, we expect that block 0 will not contain a valid entry. Overwriting
    // it will ensure that this is not the case.
    memset(entries_->Data(0), 0, kBlobfsBlockSize);
    fbl::unique_ptr<WritebackWork> work = CreateWork();

    // Queue the zeroed block for its on-disk location (one past the info block).
    entries_->AddTransaction(0, start_block_ + 1, 1, &work->Transaction());

    zx_status_t status;
    if ((status = EnqueueEntryWork(std::move(work))) != ZX_OK) {
        FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
        return status;
    }

    // Write out the updated info block to disk.
    if ((status = WriteInfo(entries_->start(), entries_->length())) != ZX_OK) {
        FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
        return status;
    }

    // Wait for any replayed entries to complete before completing replay.
    // An empty work with only a sync callback acts as a barrier: its callback
    // fires once everything queued before it has been processed.
    work = CreateWork();
    sync_completion_t completion;
    sync_completion_reset(&completion);

    // |completion| and |status| are stack locals captured by reference; the
    // sync_completion_wait below keeps them alive until the callback runs.
    work->SetSyncCallback([&completion, &status](zx_status_t new_status) {
        status = new_status;
        sync_completion_signal(&completion);
    });

    if ((status = EnqueueEntryWork(std::move(work))) != ZX_OK) {
        FS_TRACE_ERROR("Journal replay failed with status %d\n", status);
        return status;
    }

    sync_completion_wait(&completion, ZX_TIME_INFINITE);

    // Return a successful status, even if we detected corrupt metadata or entries.
    // Our metadata should still be in a consistent state so it will be safe to mount regardless.
    return ZX_OK;
}
| |
| zx_status_t Journal::WriteInfo(uint64_t start, uint64_t length) { |
| JournalInfo* info = GetInfo(); |
| |
| if (start == info->start_block && length == info->num_blocks) { |
| // If the current buffer start/len match the info block, skip the writing step. |
| return ZX_OK; |
| } |
| |
| fbl::unique_ptr<WritebackWork> work; |
| transaction_manager_->CreateWork(&work, nullptr); |
| |
| info->start_block = start; |
| info->num_blocks = length; |
| info->timestamp = zx_ticks_get(); |
| |
| // Set the checksum to 0 so we can calculate the checksum of the rest of the info block. |
| info->checksum = 0; |
| uint8_t* info_ptr = reinterpret_cast<uint8_t*>(info); |
| info->checksum = crc32(0, info_ptr, sizeof(JournalInfo)); |
| |
| work->Transaction().Enqueue(info_.vmo(), 0, start_block_, 1); |
| return transaction_manager_->EnqueueWork(std::move(work), EnqueueType::kData); |
| } |
| |
// Blocks (releasing and re-acquiring |lock_| via the condition variable)
// until the journal buffer can hold |blocks| more blocks. Must be called with
// |lock_| held.
void Journal::EnsureSpaceLocked(size_t blocks) {
    while (!entries_->IsSpaceAvailable(blocks)) {
        // Not enough room to write back work, yet. Wait until room is available.
        Waiter w;
        producer_queue_.push(&w);

        do {
            cnd_wait(&producer_cvar_, lock_.GetInternal());
        } while ((&producer_queue_.front() != &w) && // We are first in line to enqueue...
                 (!entries_->IsSpaceAvailable(blocks))); // ... and there is enough space for us.
        // NOTE(review): with &&, the wait ends as soon as we are first in line
        // OR space is available, while the trailing comments read as if both
        // were required (which would be ||). The outer while re-checks space,
        // but the pop() below removes the queue's front, which may not be |w|
        // if we exited on the space condition alone — confirm the intended
        // semantics before changing anything here.

        producer_queue_.pop();
    }
}
| |
| void Journal::AddEntryTransaction(size_t start, size_t length, WritebackWork* work) { |
| // Ensure the request fits within the buffer. |
| ZX_DEBUG_ASSERT(start < entries_->capacity()); |
| ZX_DEBUG_ASSERT(length > 0); |
| ZX_DEBUG_ASSERT(length < entries_->capacity()); |
| ZX_DEBUG_ASSERT(work != nullptr); |
| |
| // Adjust the length of the first transaction in |
| // case it wraps around to the front of the buffer. |
| size_t first_length = length; |
| |
| if (start + length > entries_->capacity()) { |
| first_length = entries_->capacity() - start; |
| } |
| |
| // Ensure we do not have an empty transaction. |
| ZX_DEBUG_ASSERT(first_length > 0); |
| |
| // Enqueue the first part of the transaction. |
| size_t disk_start = start_block_ + 1; |
| entries_->AddTransaction(start, disk_start + start, first_length, &work->Transaction()); |
| |
| // If we wrapped around to the front of the journal, |
| // enqueue a second transaction with the remaining data + commit block. |
| if (first_length < length) { |
| entries_->AddTransaction(0, disk_start, length - first_length, &work->Transaction()); |
| } |
| } |
| |
| uint32_t Journal::GenerateChecksum(size_t header_index, size_t commit_index) { |
| ZX_DEBUG_ASSERT(commit_index != header_index); |
| |
| size_t first_length = 0; |
| |
| // Determine how long the first part of the transaction is. |
| if (commit_index < header_index) { |
| first_length = entries_->capacity() - header_index; |
| } else { |
| first_length = commit_index - header_index; |
| } |
| |
| ZX_DEBUG_ASSERT(first_length > 0); |
| |
| // Calculate checksum. |
| uint8_t* data_ptr = static_cast<uint8_t*>(entries_->Data(header_index)); |
| uint32_t checksum = crc32(0, data_ptr, first_length * kBlobfsBlockSize); |
| |
| // If the transaction wraps around the buffer, update checksum for the second half. |
| if (commit_index < header_index) { |
| data_ptr = static_cast<uint8_t*>(entries_->Data(0)); |
| checksum = crc32(checksum, data_ptr, commit_index * kBlobfsBlockSize); |
| } |
| |
| return checksum; |
| } |
| |
// Pops and returns the next entry from the work queue (nullptr when the queue
// is empty). Acquires |lock_| internally.
fbl::unique_ptr<JournalEntry> Journal::GetNextEntry() {
    fbl::AutoLock lock(&lock_);
    return work_queue_.pop();
}
| |
// Runs one pass of the journal pipeline on the background thread: drains the
// work queue into the processor, then processes the wait, delete, and sync
// queues in order. When entries have been fully persisted, frees their buffer
// space and rewrites the info block; on error, transitions the journal to
// read-only and unblocks any waiting producers.
void Journal::ProcessQueues(JournalProcessor* processor) {
    // Process all entries in the work queue.
    fbl::unique_ptr<JournalEntry> entry;
    while ((entry = GetNextEntry()) != nullptr) {
        // TODO(planders): For each entry that we process, we can potentially verify that the
        //                 indices fit within the expected start/len of the journal buffer, and do
        //                 not collide with other entries.
        processor->ProcessWorkEntry(std::move(entry));
    }

    // Since the processor queues are accessed exclusively by the async thread,
    // we do not need to hold the lock while we access them.

    // If we processed any entries during the work step,
    // enqueue the dummy work to kick off the writeback queue.
    processor->EnqueueWork();

    // TODO(planders): Instead of immediately processing all wait items, wait until some
    //                 condition is fulfilled (e.g. journal is x% full, y total entries are
    //                 waiting, z time has passed, etc.) and write all entries out to disk at
    //                 once.
    // Process all entries in the "wait" queue. These are all transactions with entries that
    // have been enqueued to disk, and are waiting to verify that the write has completed.
    processor->ProcessWaitQueue();

    // TODO(planders): Similarly to the wait queue, instead of immediately processing all delete
    //                 items, wait until some condition is fulfilled and process all journal
    //                 deletions at once.

    // Track which entries have been fully persisted to their final on disk-location. Once we
    // have received verification that they have successfully completed, we can remove them
    // from the journal buffer to make space for new entries.
    processor->ProcessDeleteQueue();

    if (processor->HasError()) {
        {
            fbl::AutoLock lock(&lock_);

            // The thread signalling us should already be setting the Journal to read_only_, but in
            // case we managed to grab the lock first, set it again here.
            state_ = WritebackState::kReadOnly;

            // Reset the journal length to unblock transactions awaiting space,
            // No more writes to the buffer will be allowed.
            entries_->FreeAllSpace();
        }

        // Reset any pending delete requests (if any exist).
        processor->ResetWork();
    } else if (processor->GetBlocksProcessed() > 0) {
        uint64_t start, length;

        {
            fbl::AutoLock lock(&lock_);

            // Update the journal start/len to reflect the number of blocks that have been fully
            // processed.
            entries_->FreeSpace(processor->GetBlocksProcessed());

            start = entries_->start();
            length = entries_->length();
        }

        // The journal start/len have changed, so write out the info block.
        // (WriteInfo is called outside the lock with the snapshotted values.)
        WriteInfo(start, length);

        // After the super block update has been queued for writeback, we can now "delete"
        // the entries that were previously pointed to by the info block. This must be done
        // after the info block write so that the info block does not point to invalid
        // entries.
        processor->EnqueueWork();
    }

    // If we are not in an error state and did not process any blocks, then the
    // JournalProcessor's work should be not have been initialized. This condition will be
    // checked at the beginning of the next call to ProcessQueue.

    // Since none of the methods in the kSync profile indicate that an entry should be added to
    // the next queue, it should be fine to pass a null output queue here.
    processor->ProcessSyncQueue();
}
| |
// Main loop of the background journal thread. Repeatedly processes all queues,
// then sleeps on the consumer condvar until signalled; exits (setting the
// state to kComplete) once Teardown() has requested unmount and every queue —
// including blocked producers — has drained.
void Journal::ProcessLoop() {
    JournalProcessor processor(this);
    while (true) {
        // ProcessQueues runs without |lock_| held; it acquires it internally
        // where needed.
        ProcessQueues(&processor);

        fbl::AutoLock lock(&lock_);

        // Signal the producer queue that space in the journal has (possibly) been freed up.
        cnd_signal(&producer_cvar_);

        // Before waiting, we should check if we're unmounting.
        if (unmounting_ && work_queue_.is_empty() && processor.IsEmpty() &&
            producer_queue_.is_empty()) {
            // Only return if we are unmounting AND all entries in all queues have been
            // processed. This includes producers which are currently waiting to be enqueued.
            state_ = WritebackState::kComplete;
            break;
        }

        // If we received a signal while we were processing other queues,
        // immediately start processing again.
        if (!consumer_signalled_) {
            cnd_wait(&consumer_cvar_, lock_.GetInternal());
        }

        consumer_signalled_ = false;
    }
}
| |
| void JournalProcessor::ProcessWorkEntry(fbl::unique_ptr<JournalEntry> entry) { |
| SetContext(ProcessorContext::kWork); |
| ProcessResult result = ProcessEntry(entry.get()); |
| ZX_DEBUG_ASSERT(result == ProcessResult::kContinue); |
| |
| // Enqueue the entry into the wait_queue, even in the case of error. This is so that |
| // all works contained by journal entries will be processed in the second step, even if |
| // we do not plan to send them along to the writeback queue. |
| wait_queue_.push(std::move(entry)); |
| } |
| |
// Processes entries whose journal writes are in flight; entries that have
// completed are moved onto the delete queue.
void JournalProcessor::ProcessWaitQueue() {
    SetContext(ProcessorContext::kWait);
    ProcessQueue(&wait_queue_, &delete_queue_);
}
| |
// Processes entries ready to be removed from the journal buffer; processed
// entries move onto the sync queue.
void JournalProcessor::ProcessDeleteQueue() {
    SetContext(ProcessorContext::kDelete);
    ProcessQueue(&delete_queue_, &sync_queue_);
}
// Drains the sync queue. Sync entries are terminal, so no output queue is
// provided (see the note at the end of Journal::ProcessQueues).
void JournalProcessor::ProcessSyncQueue() {
    SetContext(ProcessorContext::kSync);
    ProcessQueue(&sync_queue_, nullptr);
}
| |
// Switches the processor to |context|, asserting that the transition follows
// the expected kWork -> kWait -> kDelete -> kSync cycle, and resets the
// per-context bookkeeping (block counter; the shared work must already have
// been released).
void JournalProcessor::SetContext(ProcessorContext context) {
    if (context_ != context) {
        // If we are switching from the sync profile, sync queue must be empty.
        ZX_DEBUG_ASSERT(context_ != ProcessorContext::kSync || sync_queue_.is_empty());

        // Enforce the legal predecessor(s) for each target context.
        switch (context) {
        case ProcessorContext::kDefault:
            ZX_DEBUG_ASSERT(context_ == ProcessorContext::kSync);
            break;
        case ProcessorContext::kWork:
            ZX_DEBUG_ASSERT(context_ == ProcessorContext::kDefault ||
                            context_ == ProcessorContext::kSync);
            break;
        case ProcessorContext::kWait:
            ZX_DEBUG_ASSERT(context_ != ProcessorContext::kDelete);
            break;
        case ProcessorContext::kDelete:
            ZX_DEBUG_ASSERT(context_ == ProcessorContext::kWait);
            break;
        case ProcessorContext::kSync:
            ZX_DEBUG_ASSERT(context_ == ProcessorContext::kDelete);
            break;
        default:
            ZX_DEBUG_ASSERT(false);
        }

        // Make sure that if a WritebackWork was established,
        // it was removed before we attempt to switch profiles.
        ZX_DEBUG_ASSERT(work_ == nullptr);
        blocks_processed_ = 0;
        context_ = context;
    }
}
| |
| void JournalProcessor::ProcessQueue(EntryQueue* in_queue, EntryQueue* out_queue) { |
| // Process queue entries until there are none left, or we are told to wait. |
| while (!in_queue->is_empty()) { |
| // Process the entry before removing it from the queue. |
| // If its status is kWaiting, we don't want to remove it. |
| ProcessResult result = ProcessEntry(&in_queue->front()); |
| |
| if (result == ProcessResult::kWait) { |
| break; |
| } |
| |
| auto entry = in_queue->pop(); |
| |
| if (result == ProcessResult::kContinue) { |
| ZX_DEBUG_ASSERT(out_queue != nullptr); |
| out_queue->push(std::move(entry)); |
| } else { |
| ZX_DEBUG_ASSERT(result == ProcessResult::kRemove); |
| } |
| } |
| } |
| |
| ProcessResult JournalProcessor::ProcessEntry(JournalEntry* entry) { |
| ZX_DEBUG_ASSERT(entry != nullptr); |
| |
| // Retrieve the entry status once up front so we don't have to keep atomically loading it. |
| EntryStatus entry_status = entry->GetStatus(); |
| |
| if (entry_status == EntryStatus::kWaiting) { |
| // If the entry at the front of the queue is still waiting, we are done processing this |
| // queue for the time being. |
| return ProcessResult::kWait; |
| } |
| |
| if (error_ && entry_status != EntryStatus::kSync) { |
| // If we are in an error state and the entry is not a "sync" entry, |
| // set the state to error so we do not do any unnecessary work. |
| // |
| // Since the error state takes precedence over the entry state, |
| // we do not also have to set the entry state to error. |
| entry_status = EntryStatus::kError; |
| } |
| |
| if (entry_status == EntryStatus::kInit && context_ == ProcessorContext::kWork) { |
| return ProcessWorkDefault(entry); |
| } |
| |
| if (entry_status == EntryStatus::kPersisted) { |
| if (context_ == ProcessorContext::kWait) { |
| return ProcessWaitDefault(entry); |
| } |
| |
| if (context_ == ProcessorContext::kDelete) { |
| return ProcessDeleteDefault(entry); |
| } |
| } |
| |
| if (entry_status == EntryStatus::kSync) { |
| if (context_ == ProcessorContext::kSync) { |
| return ProcessSyncComplete(entry); |
| } |
| |
| if (context_ != ProcessorContext::kDefault) { |
| return ProcessSyncDefault(entry); |
| } |
| } |
| |
| if (entry_status == EntryStatus::kError) { |
| if (context_ == ProcessorContext::kWork) { |
| return ProcessErrorDefault(); |
| } |
| |
| if (context_ == ProcessorContext::kWait || context_ == ProcessorContext::kDelete) { |
| return ProcessErrorComplete(entry); |
| } |
| } |
| |
| return ProcessUnsupported(); |
| } |
| |
| ProcessResult JournalProcessor::ProcessWorkDefault(JournalEntry* entry) { |
| // If the entry is in the "init" state, we can now prepare its header/commit blocks |
| // in the journal buffer. |
| journal_->PrepareBuffer(entry); |
| EntryStatus last_status = entry->SetStatus(EntryStatus::kWaiting); |
| |
| if (last_status == EntryStatus::kError) { |
| // If the WritebackThread has failed and set our journal entry to an error |
| // state in the time it's taken to prepare the buffer, set error state to |
| // true. If we do not check this and continue having set the status to |
| // kWaiting, we will never get another callback for this journal entry and |
| // we will be stuck forever waiting for it to complete. |
| error_ = true; |
| entry->SetStatus(EntryStatus::kError); |
| } else { |
| ZX_DEBUG_ASSERT(last_status == EntryStatus::kInit); |
| if (work_ == nullptr) { |
| // Prepare a "dummy" work to kick off the writeback queue now that our entry is ready. |
| // This is unnecessary in the case of an error, since the writeback queue will already |
| // be failing all incoming transactions. |
| work_ = journal_->CreateWork(); |
| } |
| } |
| |
| return ProcessResult::kContinue; |
| } |
| |
| ProcessResult JournalProcessor::ProcessWaitDefault(JournalEntry* entry) { |
| EntryStatus last_status = entry->SetStatus(EntryStatus::kWaiting); |
| ZX_DEBUG_ASSERT(last_status == EntryStatus::kPersisted); |
| fbl::unique_ptr<WritebackWork> work = entry->TakeWork(); |
| journal_->EnqueueEntryWork(std::move(work)); |
| return ProcessResult::kContinue; |
| } |
| |
| ProcessResult JournalProcessor::ProcessDeleteDefault(JournalEntry* entry) { |
| if (work_ == nullptr) { |
| // Use this work to enqueue any "delete" transactions we may encounter, |
| // to be written after the info block is updated. |
| work_ = journal_->CreateWork(); |
| } |
| |
| // The entry has now been fully persisted to disk, so we can remove the entry from |
| // the journal. To ensure that it does not later get replayed unnecessarily, clear |
| // out the header and commit blocks. |
| journal_->PrepareDelete(entry, work_.get()); |
| |
| // Track the number of blocks that have been fully processed so we can update the buffer. |
| blocks_processed_ += entry->BlockCount(); |
| |
| // We have fully processed this entry - do not add it to the next queue. |
| return ProcessResult::kRemove; |
| } |
| |
| ProcessResult JournalProcessor::ProcessSyncDefault(JournalEntry* entry) { |
| // This is a sync request. Since there is no actual data to update, |
| // we can just verify it and send it along to the next queue. |
| ZX_DEBUG_ASSERT(entry->BlockCount() == 0); |
| ZX_DEBUG_ASSERT(entry->GetHeaderIndex() == journal_->GetCapacity()); |
| ZX_DEBUG_ASSERT(entry->GetCommitIndex() == journal_->GetCapacity()); |
| |
| // Always push the sync entry into the output queue. |
| return ProcessResult::kContinue; |
| } |
| |
| ProcessResult JournalProcessor::ProcessSyncComplete(JournalEntry* entry) { |
| // Call the default sync method to ensure the entry matches what we expect. |
| ProcessSyncDefault(entry); |
| |
| // Remove and enqueue the sync work. |
| fbl::unique_ptr<WritebackWork> work = entry->TakeWork(); |
| journal_->EnqueueEntryWork(std::move(work)); |
| |
| // The sync entry is complete; do not re-enqueue it. |
| return ProcessResult::kRemove; |
| } |
| |
// Handles an errored entry during the work stage: record the error but still
// forward the entry, so its work is resolved during the wait stage.
ProcessResult JournalProcessor::ProcessErrorDefault() {
    error_ = true;
    return ProcessResult::kContinue;
}
| |
| ProcessResult JournalProcessor::ProcessErrorComplete(JournalEntry* entry) { |
| // If we are in an error state, force reset the entry's work. This will remove all |
| // requests and call the sync closure (if it exists), thus completing this entry. |
| entry->ForceReset(); |
| error_ = true; |
| |
| // Since all work is completed for this entry, we no longer need to send it along |
| // to the next queue. Instead proceed to process the next entry. |
| return ProcessResult::kRemove; |
| } |
| |
// Reached only for an entry status / processor stage combination that has no
// handler — a programming error, so assert unconditionally.
ProcessResult JournalProcessor::ProcessUnsupported() {
    ZX_ASSERT(false);
    return ProcessResult::kRemove;
}
| |
} // namespace blobfs