/*
* Copyright (C) 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "snapuserd_core.h"
namespace android {
namespace snapshot {
using namespace android;
using namespace android::dm;
using android::base::unique_fd;
int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
std::vector<const CowOperation*>* replace_zero_vec) {
int num_ops = *pending_ops;
int nr_consecutive = 0;
bool checkOrderedOp = (replace_zero_vec == nullptr);
do {
if (!cowop_iter_->Done() && num_ops) {
const CowOperation* cow_op = &cowop_iter_->Get();
if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
break;
}
*source_offset = cow_op->new_block * BLOCK_SZ;
if (!checkOrderedOp) {
replace_zero_vec->push_back(cow_op);
}
cowop_iter_->Next();
num_ops -= 1;
nr_consecutive = 1;
while (!cowop_iter_->Done() && num_ops) {
const CowOperation* op = &cowop_iter_->Get();
if (checkOrderedOp && !IsOrderedOp(*op)) {
break;
}
uint64_t next_offset = op->new_block * BLOCK_SZ;
if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
break;
}
if (!checkOrderedOp) {
replace_zero_vec->push_back(op);
}
nr_consecutive += 1;
num_ops -= 1;
cowop_iter_->Next();
}
}
} while (0);
return nr_consecutive;
}
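// Illustrative trace of PrepareMerge() (hypothetical block numbers, not
// taken from a real COW): given a COW op stream whose new_block values
// are 10, 11, 12, 40 and *pending_ops = 4, a first call returns
// nr_consecutive = 3 with *source_offset = 10 * BLOCK_SZ and leaves the
// iterator at block 40; a second call returns nr_consecutive = 1. Each
// returned run can then be serviced with a single pwrite()/SQE of
// (nr_consecutive * BLOCK_SZ) bytes.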
bool Worker::MergeReplaceZeroOps() {
// Flush after merging 2MB. Since replace and zero ops are independent and
// there is no dependency between COW ops, we flush the merged data and
// commit the number of ops merged to the COW block device in batches. If
// there is a crash, we will end up replaying some of the COW ops which
// were already merged; that is ok.
//
// Increasing this beyond 2MB may improve merge times; however, on devices
// with low memory, a larger batch can be problematic when multiple merge
// threads run in parallel.
int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 2;
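// A minimal worked example, assuming the usual snapuserd constants
// (PAYLOAD_BUFFER_SZ = 1 MiB, BLOCK_SZ = 4 KiB; defined in
// snapuserd_core.h): (1 MiB / 4 KiB) * 2 = 512 ops per commit, and
// 512 ops * 4 KiB per op = 2 MiB of merged data between commit points,
// matching the comment above.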
int num_ops_merged = 0;
SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";
while (!cowop_iter_->Done()) {
int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
std::vector<const CowOperation*> replace_zero_vec;
uint64_t source_offset;
int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
if (linear_blocks == 0) {
// Merge complete
CHECK(cowop_iter_->Done());
break;
}
for (size_t i = 0; i < replace_zero_vec.size(); i++) {
const CowOperation* cow_op = replace_zero_vec[i];
if (cow_op->type == kCowReplaceOp) {
if (!ProcessReplaceOp(cow_op)) {
SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block;
return false;
}
} else {
CHECK(cow_op->type == kCowZeroOp);
if (!ProcessZeroOp()) {
SNAP_LOG(ERROR) << "Merge ZeroOp failed.";
return false;
}
}
bufsink_.UpdateBufferOffset(BLOCK_SZ);
}
size_t io_size = linear_blocks * BLOCK_SZ;
// Merge - write the contents back to the base device
int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
io_size, source_offset));
if (ret < 0 || static_cast<size_t>(ret) != io_size) {
SNAP_LOG(ERROR)
<< "Merge: ReplaceZeroOps: Failed to write to backing device while merging"
<< " at offset: " << source_offset << " io_size: " << io_size;
return false;
}
num_ops_merged += linear_blocks;
if (num_ops_merged >= total_ops_merged_per_commit) {
// Flush the data
if (fsync(base_path_merge_fd_.get()) < 0) {
SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
return false;
}
// Track the merge completion
if (!snapuserd_->CommitMerge(num_ops_merged)) {
SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
return false;
}
num_ops_merged = 0;
}
bufsink_.ResetBufferOffset();
if (snapuserd_->IsIOTerminated()) {
SNAP_LOG(ERROR)
<< "MergeReplaceZeroOps: Worker threads terminated - shutting down merge";
return false;
}
}
// Commit any leftover ops that have not been flushed yet.
if (num_ops_merged) {
// Flush the data
if (fsync(base_path_merge_fd_.get()) < 0) {
SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
return false;
}
if (!snapuserd_->CommitMerge(num_ops_merged)) {
SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
return false;
}
num_ops_merged = 0;
}
return true;
}
bool Worker::MergeOrderedOpsAsync() {
void* mapped_addr = snapuserd_->GetMappedAddr();
void* read_ahead_buffer =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";
while (!cowop_iter_->Done()) {
const CowOperation* cow_op = &cowop_iter_->Get();
if (!IsOrderedOp(*cow_op)) {
break;
}
SNAP_LOG(DEBUG) << "Waiting for merge begin...";
// Wait for RA thread to notify that the merge window
// is ready for merging.
if (!snapuserd_->WaitForMergeBegin()) {
return false;
}
snapuserd_->SetMergeInProgress(ra_block_index_);
loff_t offset = 0;
int num_ops = snapuserd_->GetTotalBlocksToMerge();
int pending_sqe = queue_depth_;
int pending_ios_to_submit = 0;
bool flush_required = false;
blocks_merged_in_group_ = 0;
SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
while (num_ops) {
uint64_t source_offset;
int linear_blocks = PrepareMerge(&source_offset, &num_ops);
if (linear_blocks != 0) {
size_t io_size = (linear_blocks * BLOCK_SZ);
// Get an SQE entry from the ring and populate the I/O variables
struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
if (!sqe) {
SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
return false;
}
io_uring_prep_write(sqe, base_path_merge_fd_.get(),
(char*)read_ahead_buffer + offset, io_size, source_offset);
offset += io_size;
num_ops -= linear_blocks;
blocks_merged_in_group_ += linear_blocks;
pending_sqe -= 1;
pending_ios_to_submit += 1;
// These flags are important - we need to make sure that the
// blocks are linked and are written in the same order as
// populated. This is because of overlapping block writes.
//
// If there are no dependencies, we can optimize this further by
// allowing parallel writes; but for now, just link all the SQ
// entries.
sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
}
// Ring is full or no more COW ops to be merged in this batch
if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
// If this is the last set of COW ops to be merged in this batch, we need
// to sync the merged data. We will try to grab an SQE entry
// and set the FSYNC command; additionally, make sure that
// the fsync is done after all the I/O operations queued
// in the ring are completed by setting IOSQE_IO_DRAIN.
//
// If there is no space in the ring, we will flush it later
// by explicitly calling the fsync() system call.
if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
if (pending_sqe != 0) {
struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
if (!sqe) {
// Very unlikely, but carry on and don't fail the
// merge - we will flush it later with an explicit fsync()
SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
flush_required = true;
} else {
io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
// Drain the queue before fsync
io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
pending_sqe -= 1;
flush_required = false;
pending_ios_to_submit += 1;
sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
}
} else {
flush_required = true;
}
}
// Submit the IO for all the COW ops in a single syscall
int ret = io_uring_submit(ring_.get());
if (ret != pending_ios_to_submit) {
SNAP_PLOG(ERROR)
<< "io_uring_submit failed during merge-ordered ops:"
<< " io submit: " << ret << " expected: " << pending_ios_to_submit;
return false;
}
int pending_ios_to_complete = pending_ios_to_submit;
pending_ios_to_submit = 0;
bool status = true;
// Reap I/O completions
while (pending_ios_to_complete) {
struct io_uring_cqe* cqe;
// io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
// these error codes are not truly I/O errors; we can retry them
// by re-populating the SQE entries and re-submitting the
// I/O request. However, we don't do that now; instead we
// will fall back to synchronous I/O.
ret = io_uring_wait_cqe(ring_.get(), &cqe);
if (ret) {
SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << ret;
status = false;
break;
}
if (cqe->res < 0) {
SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
status = false;
break;
}
io_uring_cqe_seen(ring_.get(), cqe);
pending_ios_to_complete -= 1;
}
if (!status) {
return false;
}
pending_sqe = queue_depth_;
}
if (linear_blocks == 0) {
break;
}
}
// Verify all ops are merged
CHECK(num_ops == 0);
// Flush the data
if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
SNAP_LOG(ERROR) << " Failed to fsync merged data";
return false;
}
// Merge is done and data is on disk. Update the COW header to
// record the merge completion
if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
return false;
}
SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
// Mark the block as merge complete
snapuserd_->SetMergeCompleted(ra_block_index_);
// Notify RA thread that the merge thread is ready to merge the next
// window
snapuserd_->NotifyRAForMergeReady();
// Get the next block
ra_block_index_ += 1;
}
return true;
}
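// Minimal standalone sketch of the io_uring pattern used above - linked,
// ordered writes followed by a drained fsync. This is an illustration
// assuming liburing; fd, buf, len and off are hypothetical placeholders,
// not names from this file:
//
//   struct io_uring ring;
//   io_uring_queue_init(8, &ring, 0);              // queue depth of 8
//
//   struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
//   io_uring_prep_write(sqe, fd, buf, len, off);   // ordered block write
//   sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);   // preserve write order
//
//   sqe = io_uring_get_sqe(&ring);
//   io_uring_prep_fsync(sqe, fd, 0);
//   io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);   // fsync after all prior I/O
//
//   int submitted = io_uring_submit(&ring);        // one syscall for all SQEs
//   for (int i = 0; i < submitted; i++) {
//       struct io_uring_cqe* cqe;
//       io_uring_wait_cqe(&ring, &cqe);            // reap one completion
//       if (cqe->res < 0) { /* per-I/O error; see handling above */ }
//       io_uring_cqe_seen(&ring, cqe);
//   }
//   io_uring_queue_exit(&ring);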
bool Worker::MergeOrderedOps() {
void* mapped_addr = snapuserd_->GetMappedAddr();
void* read_ahead_buffer =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
SNAP_LOG(INFO) << "MergeOrderedOps started....";
while (!cowop_iter_->Done()) {
const CowOperation* cow_op = &cowop_iter_->Get();
if (!IsOrderedOp(*cow_op)) {
break;
}
SNAP_LOG(DEBUG) << "Waiting for merge begin...";
// Wait for RA thread to notify that the merge window
// is ready for merging.
if (!snapuserd_->WaitForMergeBegin()) {
snapuserd_->SetMergeFailed(ra_block_index_);
return false;
}
snapuserd_->SetMergeInProgress(ra_block_index_);
loff_t offset = 0;
int num_ops = snapuserd_->GetTotalBlocksToMerge();
SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
while (num_ops) {
uint64_t source_offset;
int linear_blocks = PrepareMerge(&source_offset, &num_ops);
if (linear_blocks == 0) {
break;
}
size_t io_size = (linear_blocks * BLOCK_SZ);
// Write to the base device. Data is already in the RA buffer. Note
// that XOR ops are already handled by the RA thread; we just write
// the contents out.
int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
(char*)read_ahead_buffer + offset, io_size,
source_offset));
if (ret < 0 || static_cast<size_t>(ret) != io_size) {
SNAP_LOG(ERROR) << "Failed to write to backing device while merging"
<< " at offset: " << source_offset << " io_size: " << io_size;
snapuserd_->SetMergeFailed(ra_block_index_);
return false;
}
offset += io_size;
num_ops -= linear_blocks;
}
// Verify all ops are merged
CHECK(num_ops == 0);
// Flush the data
if (fsync(base_path_merge_fd_.get()) < 0) {
SNAP_LOG(ERROR) << " Failed to fsync merged data";
snapuserd_->SetMergeFailed(ra_block_index_);
return false;
}
// Merge is done and data is on disk. Update the COW header to
// record the merge completion
if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
snapuserd_->SetMergeFailed(ra_block_index_);
return false;
}
SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
// Mark the block as merge complete
snapuserd_->SetMergeCompleted(ra_block_index_);
// Notify RA thread that the merge thread is ready to merge the next
// window
snapuserd_->NotifyRAForMergeReady();
// Get the next block
ra_block_index_ += 1;
}
return true;
}
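// Summary of the per-window handshake with the read-ahead (RA) thread
// shared by both merge paths above (a restatement of this file's calls,
// not a new API):
//
//   snapuserd_->WaitForMergeBegin();         // RA thread filled the window
//   snapuserd_->SetMergeInProgress(index);   // claim the merge window
//   ... write the RA buffer to the base device (pwrite or io_uring) ...
//   fsync(fd);                               // persist merged data first
//   snapuserd_->CommitMerge(num_blocks);     // then record the progress
//   snapuserd_->SetMergeCompleted(index);    // release the window
//   snapuserd_->NotifyRAForMergeReady();     // RA thread may reuse it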
bool Worker::AsyncMerge() {
if (!MergeOrderedOpsAsync()) {
SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
// Rewind the iterator past the partially merged window so that
// the sync fallback retries the merge from a consistent point
while (blocks_merged_in_group_ && !cowop_iter_->RDone()) {
cowop_iter_->Prev();
blocks_merged_in_group_ -= 1;
}
return false;
}
SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
return true;
}
bool Worker::SyncMerge() {
if (!MergeOrderedOps()) {
SNAP_LOG(ERROR) << "Merge failed for ordered ops";
return false;
}
SNAP_LOG(INFO) << "MergeOrderedOps completed";
return true;
}
bool Worker::Merge() {
cowop_iter_ = reader_->GetOpIter(true);
bool retry = false;
bool ordered_ops_merge_status;
// Start Async Merge
if (merge_async_) {
ordered_ops_merge_status = AsyncMerge();
if (!ordered_ops_merge_status) {
FinalizeIouring();
retry = true;
merge_async_ = false;
}
}
// Check if we need to fall back and retry the merge
//
// If the device doesn't support async merge, we
// enter here directly (e.g., devices with 4.x kernels)
const bool sync_merge_required = (retry || !merge_async_);
if (sync_merge_required) {
ordered_ops_merge_status = SyncMerge();
if (!ordered_ops_merge_status) {
// Merge failed. The device will continue to be mounted
// off snapshots; the merge will be retried during
// the next reboot
SNAP_LOG(ERROR) << "Merge failed for ordered ops";
snapuserd_->MergeFailed();
return false;
}
}
// Replace and Zero ops
if (!MergeReplaceZeroOps()) {
SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
snapuserd_->MergeFailed();
return false;
}
snapuserd_->MergeCompleted();
return true;
}
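// Control-flow sketch of Merge() above (a restatement of the code, not a
// new implementation):
//
//   if (merge_async_ && !AsyncMerge()) {   // io_uring path failed
//       FinalizeIouring();                 // tear down the ring
//       retry = true;                      // fall back ...
//       merge_async_ = false;
//   }
//   if (retry || !merge_async_) {
//       SyncMerge();                       // ... to the pwrite()-based path
//   }
//   MergeReplaceZeroOps();                 // then replace and zero ops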
bool Worker::InitializeIouring() {
if (!snapuserd_->IsIouringSupported()) {
return false;
}
ring_ = std::make_unique<struct io_uring>();
int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
if (ret) {
LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
return false;
}
merge_async_ = true;
LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
return true;
}
void Worker::FinalizeIouring() {
if (merge_async_) {
io_uring_queue_exit(ring_.get());
}
}
bool Worker::RunMergeThread() {
SNAP_LOG(DEBUG) << "Waiting for merge begin...";
if (!snapuserd_->WaitForMergeBegin()) {
SNAP_LOG(ERROR) << "Merge terminated early...";
return true;
}
if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
}
SNAP_LOG(INFO) << "Merge starting..";
if (!Init()) {
SNAP_LOG(ERROR) << "Merge thread initialization failed...";
snapuserd_->MergeFailed();
return false;
}
InitializeIouring();
if (!Merge()) {
return false;
}
FinalizeIouring();
CloseFds();
reader_->CloseCowFd();
SNAP_LOG(INFO) << "Snapshot-Merge completed";
return true;
}
} // namespace snapshot
} // namespace android