blob: 1f8d363f90ba62c9ac3a8d8bdbe9aa19e9d0bc6b [file] [log] [blame]
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "src/lib/storage/vfs/cpp/journal/journal_writer.h"
#include <lib/fit/defer.h>
#include <lib/sync/completion.h>
#include <lib/syslog/cpp/macros.h>
#include <zircon/status.h>
#include <cstdio>
#include "src/lib/storage/vfs/cpp/journal/entry_view.h"
#include "src/lib/storage/vfs/cpp/metrics/events.h"
namespace fs {
namespace {
using storage::OperationType;
} // namespace
namespace internal {
// Constructs a writer from an existing journal superblock.
//
// The next sequence number and the next entry start block are seeded from the superblock's
// |sequence_number()| and |start()|. Those reads go through the |journal_superblock_| member (not
// the moved-from parameter).
// NOTE(review): this relies on |journal_superblock_| being declared before
// |next_sequence_number_| and |next_entry_start_block_| in the class; the header is not visible
// here -- confirm.
//
// |transaction_handler| is stored as a raw, non-owning pointer. |metrics| is a by-value sink
// parameter, so it is moved into the member rather than copied; this avoids an extra atomic
// reference-count increment/decrement pair.
JournalWriter::JournalWriter(TransactionHandler* transaction_handler,
                             JournalSuperblock journal_superblock, uint64_t journal_start_block,
                             uint64_t entries_length, std::shared_ptr<JournalMetrics> metrics)
    : transaction_handler_(transaction_handler),
      journal_superblock_(std::move(journal_superblock)),
      metrics_(std::move(metrics)),
      journal_start_block_(journal_start_block),
      next_sequence_number_(journal_superblock_.sequence_number()),
      next_entry_start_block_(journal_superblock_.start()),
      entries_length_(entries_length) {}
// Constructs a writer that is given only a transaction handler and a metrics object.
//
// No superblock or journal geometry is supplied; every other member keeps whatever default the
// class declaration gives it (not visible here).
//
// |transaction_handler| is stored as a raw, non-owning pointer. |metrics| is a by-value sink
// parameter, so it is moved into the member rather than copied.
JournalWriter::JournalWriter(TransactionHandler* transaction_handler,
                             std::shared_ptr<JournalMetrics> metrics)
    : transaction_handler_(transaction_handler), metrics_(std::move(metrics)) {}
// Writes the data operations in |work| through WriteOperations().
//
// Before issuing the writes, every operation's device range is checked against the set of
// in-flight metadata ranges. On an overlap the info block is written first, so that the
// overlapping journal entries are no longer eligible for replay.
//
// Returns fpromise::ok() on success. On failure, returns the status reported by WriteInfoBlock()
// or WriteOperations().
fpromise::result<void, zx_status_t> JournalWriter::WriteData(JournalWorkItem work) {
  auto event = metrics()->NewLatencyEvent(fs_metrics::Event::kJournalWriterWriteData);
  // Accumulate in 64 bits, matching the 64-bit block count WriteMetadata() reports to its latency
  // event. A 32-bit accumulator would silently truncate the sum of the operation lengths.
  uint64_t block_count = 0;
  // If any of the data operations we're about to write overlap with in-flight metadata operations,
  // then we risk those metadata operations "overwriting" our data blocks on replay.
  //
  // Before writing data, identify that those metadata blocks should not be replayed.
  for (const auto& operation : work.operations) {
    range::Range<uint64_t> range(operation.op.dev_offset,
                                 operation.op.dev_offset + operation.op.length);
    if (live_metadata_operations_.Overlaps(range)) {
      // TODO(smklein): Write "real" revocation records instead of merely updating the info block.
      //
      // Currently, writing the info block is sufficient to "avoid metadata replay", but this is
      // only the case because the JournalWriter is synchronous, single-threaded, and non-caching.
      // If we enable asynchronous writeback, emitting revocation records may be a more desirable
      // option than "blocking until all prior operations complete, then blocking on writing the
      // info block".
      zx_status_t status = WriteInfoBlock();
      if (status != ZX_OK) {
        FX_LOGST(WARNING, "journal") << "Failed to write data: " << zx_status_get_string(status);
        return fpromise::error(status);
      }
    }
    block_count += operation.op.length;
  }
  event.set_block_count(block_count);
  zx_status_t status = WriteOperations(work.operations);
  if (status != ZX_OK) {
    FX_LOGST(WARNING, "journal") << "Failed to write data: " << zx_status_get_string(status);
    return fpromise::error(status);
  }
  event.set_success(true);
  return fpromise::ok();
}
// Writes the metadata in |work| to the journal and queues it for a later write to its final
// on-disk location.
//
// Steps, in order:
//   1. Write the info block first if the new entry would not fit in the free part of the journal.
//   2. Record every operation's device range as "live" metadata.
//   3. Write the entry to the journal.
//   4. Queue |trim_work| (when present) followed by |work| on |pending_work_items_|.
//
// Returns fpromise::ok() on success, or the failing status from step 1 or step 3.
fpromise::result<void, zx_status_t> JournalWriter::WriteMetadata(
    JournalWorkItem work, std::optional<JournalWorkItem> trim_work) {
  const uint64_t reserved_blocks = work.reservation.length();
  FX_LOGST(DEBUG, "journal") << "WriteMetadata: Writing " << reserved_blocks
                             << " blocks (includes header, commit)";
  auto latency = metrics()->NewLatencyEvent(fs_metrics::Event::kJournalWriterWriteMetadata);
  latency.set_block_count(reserved_blocks);

  // The info block must not be left pointing into the middle of an entry we are about to
  // overwrite, so bring it up to date first when necessary.
  if (zx_status_t info_status = WriteInfoBlockIfIntersect(reserved_blocks);
      info_status != ZX_OK) {
    FX_LOGST(WARNING, "journal") << "WriteMetadata: Failed to write info block: "
                                 << zx_status_get_string(info_status);
    return fpromise::error(info_status);
  }

  // Track the device ranges touched by this in-flight metadata.
  for (const auto& buffered : work.operations) {
    live_metadata_operations_.Insert(range::Range<uint64_t>(
        buffered.op.dev_offset, buffered.op.dev_offset + buffered.op.length));
  }

  // Persist the entry in the journal.
  if (zx_status_t journal_status = WriteMetadataToJournal(&work); journal_status != ZX_OK) {
    FX_LOGST(WARNING, "journal") << "WriteMetadata: Failed to write metadata to journal: "
                                 << zx_status_get_string(journal_status);
    return fpromise::error(journal_status);
  }
  latency.set_success(true);

  // Trim work is queued ahead of the metadata work: the metadata callbacks must only run once the
  // trim operations have been submitted (e.g. new data operations are not safe to send before
  // that point).
  if (trim_work.has_value()) {
    pending_work_items_.push_back(std::move(*trim_work));
  }
  pending_work_items_.push_back(std::move(work));
  return fpromise::ok();
}
// Writes the blocks described by |view| into the journal entries region, starting at
// |next_entry_start_block_|.
//
// Both the in-memory reservation and the on-disk entries region are circular, so the view is cut
// into chunks that each stop at whichever wrap-around point comes first. |next_entry_start_block_|
// is advanced as the chunks are built, i.e. before the write is issued.
//
// Returns the status of WriteOperations() for the resulting chunks.
zx_status_t JournalWriter::WriteOperationToJournal(const storage::BlockBufferView& view) {
  const uint64_t blocks_to_write = view.length();
  const uint64_t reservation_capacity = EntriesLength();

  // Template for every chunk; only the offsets and the length change between chunks.
  storage::BufferedOperation chunk;
  chunk.vmoid = view.vmoid();
  chunk.op.type = storage::OperationType::kWrite;

  std::vector<storage::BufferedOperation> chunks;
  for (uint64_t blocks_done = 0; blocks_done != blocks_to_write;
       blocks_done += chunk.op.length) {
    chunk.op.vmo_offset = (view.start() + blocks_done) % reservation_capacity;
    chunk.op.dev_offset = EntriesStartBlock() + next_entry_start_block_;

    // Blocks left on disk before the entries region wraps around.
    const uint64_t until_journal_wrap = EntriesLength() - next_entry_start_block_;
    // Blocks left in memory before the reservation wraps around.
    const uint64_t until_reservation_wrap = reservation_capacity - chunk.op.vmo_offset;
    // Blocks of the view that have not been assigned to a chunk yet.
    const uint64_t blocks_left = blocks_to_write - blocks_done;

    chunk.op.length = std::min({blocks_left, until_journal_wrap, until_reservation_wrap});
    chunks.push_back(chunk);
    next_entry_start_block_ = (next_entry_start_block_ + chunk.op.length) % EntriesLength();
  }

  const zx_status_t status = WriteOperations(chunks);
  if (status != ZX_OK) {
    FX_LOGST(WARNING, "journal") << "JournalWriter::WriteOperationToJournal: Failed to write: "
                                 << zx_status_get_string(status);
  }
  return status;
}
// Brings the on-disk info block up to date with the writer's current sequence number.
//
// Returns:
//   - fpromise::error(ZX_ERR_IO_REFUSED) when writeback is disabled.
//   - fpromise::ok() without writing anything when the sequence number has not moved since the
//     info block was last updated.
//   - otherwise the outcome of WriteInfoBlock().
fpromise::result<void, zx_status_t> JournalWriter::Sync() {
  auto latency = metrics()->NewLatencyEvent(fs_metrics::Event::kJournalWriterSync);
  if (!IsWritebackEnabled()) {
    return fpromise::error(ZX_ERR_IO_REFUSED);
  }

  const bool info_block_stale = next_sequence_number_ != journal_superblock_.sequence_number();
  if (!info_block_stale) {
    FX_LOGST(DEBUG, "journal") << "Sync: Skipping write to info block (no sequence update)";
    return fpromise::ok();
  }

  if (zx_status_t status = WriteInfoBlock(); status != ZX_OK) {
    return fpromise::error(status);
  }
  latency.set_success(true);
  return fpromise::ok();
}
// Wraps the reservation of |work| in a journal entry stamped with the current sequence number,
// writes it to the journal, and consumes that sequence number.
//
// Returns the status of WriteOperationToJournal(). The payload is decoded again and the sequence
// number is advanced whether or not the write succeeded.
zx_status_t JournalWriter::WriteMetadataToJournal(JournalWorkItem* work) {
  auto& reservation = work->reservation;
  const auto sequence_number = next_sequence_number_;
  FX_LOGST(DEBUG, "journal") << "WriteMetadataToJournal: Writing " << reservation.length()
                             << " blocks with sequence_number " << sequence_number;

  // Constructing the entry view fills in the header and commit blocks inside the reservation.
  JournalEntryView entry(reservation.buffer_view(), work->operations, sequence_number);
  ++next_sequence_number_;

  const zx_status_t status = WriteOperationToJournal(reservation.buffer_view());

  // The payload may have been encoded for its trip through the journal. It has to be back in its
  // decoded form before it is later written to its final on-disk location.
  entry.DecodePayloadBlocks();
  return status;
}
// Writes the info block if appending |block_count| blocks at the journal tail would run into the
// entries that the on-disk info block still refers to.
//
// Returns ZX_OK when no write was needed or the write succeeded; otherwise the status returned by
// WriteInfoBlock().
zx_status_t JournalWriter::WriteInfoBlockIfIntersect(uint64_t block_count) {
  // The info block has to be written now if [tail, tail + block_count) intersects [head, tail).
  //
  // The journal is logically a circular buffer:
  //
  //   [ ____, ____, ____, ____, ____, ____ ]
  //
  // Some of its entries are still candidates for replay:
  //
  //       Info Block    Next Entry Start Block
  //           |                  |
  //   [ ____, head, data, tail, ____, ____ ]
  //
  // Here one, two, or three more blocks fit. A fourth block would make the journal "eat its own
  // head":
  //
  //       Info Block
  //           |
  //   [ blk3, blk4, data, tail, blk1, blk2 ]
  //           |
  //       Collision!
  //
  // After a power failure, replay could no longer parse the earlier entries because the start
  // block would point at an invalid entry. Rewriting the info block on every write would prevent
  // that, but at a significant write amplification cost. As a compromise, the info block is
  // written only ahead of writes that would cause this collision.
  const uint64_t head = journal_superblock_.start();
  const uint64_t tail = next_entry_start_block_;
  const uint64_t capacity = EntriesLength();

  // head == tail is ambiguous between "empty" and "full"; the sequence number tells them apart.
  // A full journal needs an info block update, an empty one does not.
  bool must_write_info =
      (head == tail) && (next_sequence_number_ != journal_superblock_.sequence_number());

  if (!must_write_info) {
    const uint64_t used_blocks = (tail >= head) ? (tail - head) : (tail + (capacity - head));
    const uint64_t free_blocks = capacity - used_blocks;
    must_write_info = free_blocks < block_count;
    if (must_write_info) {
      FX_LOGST(DEBUG, "journal") << "WriteInfoBlockIfIntersect: Writing info block (can't write "
                                 << block_count << " blocks)";
    } else {
      FX_LOGST(DEBUG, "journal") << "WriteInfoBlockIfIntersect: Not writing info (have "
                                 << free_blocks << ", need " << block_count << " blocks)";
    }
  }

  if (!must_write_info) {
    return ZX_OK;
  }

  const zx_status_t status = WriteInfoBlock();
  if (status != ZX_OK) {
    FX_LOGST(WARNING, "journal") << "WriteInfoBlockIfIntersect: Failed to write info block";
  }
  return status;
}
// Updates the journal superblock with the current tail and sequence number and writes it to the
// info block location on disk.
//
// All queued metadata work is flushed first. After a successful write the set of live metadata
// ranges is cleared.
//
// Returns ZX_OK on success; otherwise the error from Flush() or WriteOperations().
zx_status_t JournalWriter::WriteInfoBlock() {
  // Once the info block has been written, the journal can no longer replay earlier metadata
  // writes, so they must all be flushed before that happens.
  while (pending_flush_ || !pending_work_items_.empty()) {
    auto flushed = Flush();
    if (flushed.is_error()) {
      return flushed.take_error();
    }
  }

  auto latency = metrics()->NewLatencyEvent(fs_metrics::Event::kJournalWriterWriteInfoBlock);
  latency.set_block_count(InfoLength());

  ZX_DEBUG_ASSERT(next_sequence_number_ > journal_superblock_.sequence_number());
  FX_LOGST(DEBUG, "journal") << "WriteInfoBlock: Updating sequence_number from "
                             << journal_superblock_.sequence_number() << " to "
                             << next_sequence_number_;
  ZX_DEBUG_ASSERT(next_entry_start_block_ < EntriesLength());
  journal_superblock_.Update(next_entry_start_block_, next_sequence_number_);

  // A single write of the superblock buffer to the info block location.
  storage::BufferedOperation info_write;
  info_write.vmoid = journal_superblock_.buffer().vmoid();
  info_write.op.type = storage::OperationType::kWrite;
  info_write.op.vmo_offset = 0;
  info_write.op.dev_offset = InfoStartBlock();
  info_write.op.length = InfoLength();
  const std::vector<storage::BufferedOperation> info_operations(1, info_write);

  if (zx_status_t status = WriteOperations(info_operations); status != ZX_OK) {
    return status;
  }
  latency.set_success(true);

  // With the info block updated, no metadata operation should be replayed on reboot, so none of
  // them is "live" any more.
  live_metadata_operations_.Clear();
  return ZX_OK;
}
// Submits |operations| to the transaction handler.
//
// Returns ZX_ERR_IO_REFUSED without issuing anything when writeback is disabled. When the
// transaction handler reports a failure, writeback is disabled and that status is returned.
zx_status_t JournalWriter::WriteOperations(
    const std::vector<storage::BufferedOperation>& operations) {
  if (!IsWritebackEnabled()) {
    FX_LOGST(INFO, "journal")
        << "WriteOperations: Not issuing writeback because writeback is disabled";
    return ZX_ERR_IO_REFUSED;
  }

  const zx_status_t status = transaction_handler_->RunRequests(operations);
  if (status == ZX_OK) {
    return ZX_OK;
  }

  // A failed write turns writeback off for the rest of this writer's lifetime.
  FX_LOGST(ERROR, "journal") << "WriteOperations: Failed to write requests " << operations << ": "
                             << zx_status_get_string(status) << ". Filesystem now read-only.";
  DisableWriteback();
  return status;
}
// Flushes the transaction handler, then writes every queued work item to its final location.
//
// For each queued item, in order: run its commit callback (if set), write its operations, mark
// that another flush is pending, then run its complete callback (if set).
//
// Returns:
//   - fpromise::error(ZX_ERR_BAD_STATE) when writeback is disabled.
//   - the transaction handler's flush error (writeback is disabled in that case).
//   - the first WriteOperations() error; the remaining items are not processed.
//   - fpromise::ok() otherwise.
// |pending_work_items_| is cleared on every exit path.
fpromise::result<void, zx_status_t> JournalWriter::Flush() {
  // The queue is emptied on success and on failure alike, because the destructors of the queued
  // callbacks may carry cleanup that has to run.
  auto drop_queued_items = fit::defer([this] { pending_work_items_.clear(); });

  if (!IsWritebackEnabled()) {
    FX_LOGST(INFO, "journal")
        << "JournalWriter::Flush: Not issuing writeback because writeback is disabled";
    return fpromise::error(ZX_ERR_BAD_STATE);
  }

  zx_status_t flush_status = transaction_handler_->Flush();
  if (flush_status != ZX_OK) {
    FX_LOGST(WARNING, "journal") << "JournalWriter::Flush: " << zx_status_get_string(flush_status);
    DisableWriteback();
    return fpromise::error(flush_status);
  }
  pending_flush_ = false;

  for (JournalWorkItem& queued : pending_work_items_) {
    // Take ownership so the item's resources are released at the end of this iteration.
    JournalWorkItem item = std::move(queued);
    if (item.commit_callback) {
      item.commit_callback();
    }

    if (zx_status_t write_status = WriteOperations(item.operations); write_status != ZX_OK) {
      FX_LOGST(WARNING, "journal") << "Flush: Failed to write metadata to final location: "
                                   << zx_status_get_string(write_status);
      // WriteOperations has already disabled writeback, so all subsequent writes will fail.
      return fpromise::error(write_status);
    }

    // The writes to the final locations need a further flush to persist before the info block
    // may be written.
    pending_flush_ = true;

    // The metadata is now at its final location, so reads work and completion can be signalled.
    if (item.complete_callback) {
      item.complete_callback();
    }
  }
  return fpromise::ok();
}
} // namespace internal
} // namespace fs