| /* |
| * Copyright (C) 2021 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "snapuserd_core.h" |
| |
| namespace android { |
| namespace snapshot { |
| |
| using namespace android; |
| using namespace android::dm; |
| using android::base::unique_fd; |
| |
| int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, |
| std::vector<const CowOperation*>* replace_zero_vec) { |
| int num_ops = *pending_ops; |
| int nr_consecutive = 0; |
| bool checkOrderedOp = (replace_zero_vec == nullptr); |
| |
| do { |
| if (!cowop_iter_->Done() && num_ops) { |
| const CowOperation* cow_op = &cowop_iter_->Get(); |
| if (checkOrderedOp && !IsOrderedOp(*cow_op)) { |
| break; |
| } |
| |
| *source_offset = cow_op->new_block * BLOCK_SZ; |
| if (!checkOrderedOp) { |
| replace_zero_vec->push_back(cow_op); |
| } |
| |
| cowop_iter_->Next(); |
| num_ops -= 1; |
| nr_consecutive = 1; |
| |
| while (!cowop_iter_->Done() && num_ops) { |
| const CowOperation* op = &cowop_iter_->Get(); |
| if (checkOrderedOp && !IsOrderedOp(*op)) { |
| break; |
| } |
| |
| uint64_t next_offset = op->new_block * BLOCK_SZ; |
| if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) { |
| break; |
| } |
| |
| if (!checkOrderedOp) { |
| replace_zero_vec->push_back(op); |
| } |
| |
| nr_consecutive += 1; |
| num_ops -= 1; |
| cowop_iter_->Next(); |
| } |
| } |
| } while (0); |
| |
| return nr_consecutive; |
| } |
| |
| bool Worker::MergeReplaceZeroOps() { |
| // Flush after merging 2MB. Since all ops are independent and there is no |
| // dependency between COW ops, we will flush the data and the number |
| // of ops merged in COW block device. If there is a crash, we will |
| // end up replaying some of the COW ops which were already merged. That is |
| // ok. |
| // |
| // Although increasing this greater than 2MB may help in improving merge |
| // times; however, on devices with low memory, this can be problematic |
| // when there are multiple merge threads in parallel. |
| int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 2; |
| int num_ops_merged = 0; |
| |
| SNAP_LOG(INFO) << "MergeReplaceZeroOps started...."; |
| |
| while (!cowop_iter_->Done()) { |
| int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ; |
| std::vector<const CowOperation*> replace_zero_vec; |
| uint64_t source_offset; |
| |
| int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec); |
| if (linear_blocks == 0) { |
| // Merge complete |
| CHECK(cowop_iter_->Done()); |
| break; |
| } |
| |
| for (size_t i = 0; i < replace_zero_vec.size(); i++) { |
| const CowOperation* cow_op = replace_zero_vec[i]; |
| if (cow_op->type == kCowReplaceOp) { |
| if (!ProcessReplaceOp(cow_op)) { |
| SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block; |
| return false; |
| } |
| } else { |
| CHECK(cow_op->type == kCowZeroOp); |
| if (!ProcessZeroOp()) { |
| SNAP_LOG(ERROR) << "Merge ZeroOp failed."; |
| return false; |
| } |
| } |
| |
| bufsink_.UpdateBufferOffset(BLOCK_SZ); |
| } |
| |
| size_t io_size = linear_blocks * BLOCK_SZ; |
| |
| // Merge - Write the contents back to base device |
| int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(), |
| io_size, source_offset)); |
| if (ret < 0 || ret != io_size) { |
| SNAP_LOG(ERROR) |
| << "Merge: ReplaceZeroOps: Failed to write to backing device while merging " |
| << " at offset: " << source_offset << " io_size: " << io_size; |
| return false; |
| } |
| |
| num_ops_merged += linear_blocks; |
| |
| if (num_ops_merged >= total_ops_merged_per_commit) { |
| // Flush the data |
| if (fsync(base_path_merge_fd_.get()) < 0) { |
| SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data"; |
| return false; |
| } |
| |
| // Track the merge completion |
| if (!snapuserd_->CommitMerge(num_ops_merged)) { |
| SNAP_LOG(ERROR) << " Failed to commit the merged block in the header"; |
| return false; |
| } |
| |
| num_ops_merged = 0; |
| } |
| |
| bufsink_.ResetBufferOffset(); |
| |
| if (snapuserd_->IsIOTerminated()) { |
| SNAP_LOG(ERROR) |
| << "MergeReplaceZeroOps: Worker threads terminated - shutting down merge"; |
| return false; |
| } |
| } |
| |
| // Any left over ops not flushed yet. |
| if (num_ops_merged) { |
| // Flush the data |
| if (fsync(base_path_merge_fd_.get()) < 0) { |
| SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data"; |
| return false; |
| } |
| |
| if (!snapuserd_->CommitMerge(num_ops_merged)) { |
| SNAP_LOG(ERROR) << " Failed to commit the merged block in the header"; |
| return false; |
| } |
| |
| num_ops_merged = 0; |
| } |
| |
| return true; |
| } |
| |
| bool Worker::MergeOrderedOpsAsync() { |
| void* mapped_addr = snapuserd_->GetMappedAddr(); |
| void* read_ahead_buffer = |
| static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset()); |
| |
| SNAP_LOG(INFO) << "MergeOrderedOpsAsync started...."; |
| |
| while (!cowop_iter_->Done()) { |
| const CowOperation* cow_op = &cowop_iter_->Get(); |
| if (!IsOrderedOp(*cow_op)) { |
| break; |
| } |
| |
| SNAP_LOG(DEBUG) << "Waiting for merge begin..."; |
| // Wait for RA thread to notify that the merge window |
| // is ready for merging. |
| if (!snapuserd_->WaitForMergeBegin()) { |
| return false; |
| } |
| |
| snapuserd_->SetMergeInProgress(ra_block_index_); |
| |
| loff_t offset = 0; |
| int num_ops = snapuserd_->GetTotalBlocksToMerge(); |
| |
| int pending_sqe = queue_depth_; |
| int pending_ios_to_submit = 0; |
| bool flush_required = false; |
| blocks_merged_in_group_ = 0; |
| |
| SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops; |
| while (num_ops) { |
| uint64_t source_offset; |
| |
| int linear_blocks = PrepareMerge(&source_offset, &num_ops); |
| |
| if (linear_blocks != 0) { |
| size_t io_size = (linear_blocks * BLOCK_SZ); |
| |
| // Get an SQE entry from the ring and populate the I/O variables |
| struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get()); |
| if (!sqe) { |
| SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops"; |
| return false; |
| } |
| |
| io_uring_prep_write(sqe, base_path_merge_fd_.get(), |
| (char*)read_ahead_buffer + offset, io_size, source_offset); |
| |
| offset += io_size; |
| num_ops -= linear_blocks; |
| blocks_merged_in_group_ += linear_blocks; |
| |
| pending_sqe -= 1; |
| pending_ios_to_submit += 1; |
| // These flags are important - We need to make sure that the |
| // blocks are linked and are written in the same order as |
| // populated. This is because of overlapping block writes. |
| // |
| // If there are no dependency, we can optimize this further by |
| // allowing parallel writes; but for now, just link all the SQ |
| // entries. |
| sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC); |
| } |
| |
| // Ring is full or no more COW ops to be merged in this batch |
| if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) { |
| // If this is a last set of COW ops to be merged in this batch, we need |
| // to sync the merged data. We will try to grab an SQE entry |
| // and set the FSYNC command; additionally, make sure that |
| // the fsync is done after all the I/O operations queued |
| // in the ring is completed by setting IOSQE_IO_DRAIN. |
| // |
| // If there is no space in the ring, we will flush it later |
| // by explicitly calling fsync() system call. |
| if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) { |
| if (pending_sqe != 0) { |
| struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get()); |
| if (!sqe) { |
| // very unlikely but let's continue and not fail the |
| // merge - we will flush it later |
| SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops"; |
| flush_required = true; |
| } else { |
| io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0); |
| // Drain the queue before fsync |
| io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN); |
| pending_sqe -= 1; |
| flush_required = false; |
| pending_ios_to_submit += 1; |
| sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC); |
| } |
| } else { |
| flush_required = true; |
| } |
| } |
| |
| // Submit the IO for all the COW ops in a single syscall |
| int ret = io_uring_submit(ring_.get()); |
| if (ret != pending_ios_to_submit) { |
| SNAP_PLOG(ERROR) |
| << "io_uring_submit failed for read-ahead: " |
| << " io submit: " << ret << " expected: " << pending_ios_to_submit; |
| return false; |
| } |
| |
| int pending_ios_to_complete = pending_ios_to_submit; |
| pending_ios_to_submit = 0; |
| |
| bool status = true; |
| |
| // Reap I/O completions |
| while (pending_ios_to_complete) { |
| struct io_uring_cqe* cqe; |
| |
| // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR; |
| // these error codes are not truly I/O errors; we can retry them |
| // by re-populating the SQE entries and submitting the I/O |
| // request back. However, we don't do that now; instead we |
| // will fallback to synchronous I/O. |
| ret = io_uring_wait_cqe(ring_.get(), &cqe); |
| if (ret) { |
| SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << ret; |
| status = false; |
| break; |
| } |
| |
| if (cqe->res < 0) { |
| SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res; |
| status = false; |
| break; |
| } |
| |
| io_uring_cqe_seen(ring_.get(), cqe); |
| pending_ios_to_complete -= 1; |
| } |
| |
| if (!status) { |
| return false; |
| } |
| |
| pending_sqe = queue_depth_; |
| } |
| |
| if (linear_blocks == 0) { |
| break; |
| } |
| } |
| |
| // Verify all ops are merged |
| CHECK(num_ops == 0); |
| |
| // Flush the data |
| if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) { |
| SNAP_LOG(ERROR) << " Failed to fsync merged data"; |
| return false; |
| } |
| |
| // Merge is done and data is on disk. Update the COW Header about |
| // the merge completion |
| if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) { |
| SNAP_LOG(ERROR) << " Failed to commit the merged block in the header"; |
| return false; |
| } |
| |
| SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge(); |
| |
| // Mark the block as merge complete |
| snapuserd_->SetMergeCompleted(ra_block_index_); |
| |
| // Notify RA thread that the merge thread is ready to merge the next |
| // window |
| snapuserd_->NotifyRAForMergeReady(); |
| |
| // Get the next block |
| ra_block_index_ += 1; |
| } |
| |
| return true; |
| } |
| |
| bool Worker::MergeOrderedOps() { |
| void* mapped_addr = snapuserd_->GetMappedAddr(); |
| void* read_ahead_buffer = |
| static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset()); |
| |
| SNAP_LOG(INFO) << "MergeOrderedOps started...."; |
| |
| while (!cowop_iter_->Done()) { |
| const CowOperation* cow_op = &cowop_iter_->Get(); |
| if (!IsOrderedOp(*cow_op)) { |
| break; |
| } |
| |
| SNAP_LOG(DEBUG) << "Waiting for merge begin..."; |
| // Wait for RA thread to notify that the merge window |
| // is ready for merging. |
| if (!snapuserd_->WaitForMergeBegin()) { |
| snapuserd_->SetMergeFailed(ra_block_index_); |
| return false; |
| } |
| |
| snapuserd_->SetMergeInProgress(ra_block_index_); |
| |
| loff_t offset = 0; |
| int num_ops = snapuserd_->GetTotalBlocksToMerge(); |
| SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops; |
| while (num_ops) { |
| uint64_t source_offset; |
| |
| int linear_blocks = PrepareMerge(&source_offset, &num_ops); |
| if (linear_blocks == 0) { |
| break; |
| } |
| |
| size_t io_size = (linear_blocks * BLOCK_SZ); |
| // Write to the base device. Data is already in the RA buffer. Note |
| // that XOR ops is already handled by the RA thread. We just write |
| // the contents out. |
| int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), |
| (char*)read_ahead_buffer + offset, io_size, |
| source_offset)); |
| if (ret < 0 || ret != io_size) { |
| SNAP_LOG(ERROR) << "Failed to write to backing device while merging " |
| << " at offset: " << source_offset << " io_size: " << io_size; |
| snapuserd_->SetMergeFailed(ra_block_index_); |
| return false; |
| } |
| |
| offset += io_size; |
| num_ops -= linear_blocks; |
| } |
| |
| // Verify all ops are merged |
| CHECK(num_ops == 0); |
| |
| // Flush the data |
| if (fsync(base_path_merge_fd_.get()) < 0) { |
| SNAP_LOG(ERROR) << " Failed to fsync merged data"; |
| snapuserd_->SetMergeFailed(ra_block_index_); |
| return false; |
| } |
| |
| // Merge is done and data is on disk. Update the COW Header about |
| // the merge completion |
| if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) { |
| SNAP_LOG(ERROR) << " Failed to commit the merged block in the header"; |
| snapuserd_->SetMergeFailed(ra_block_index_); |
| return false; |
| } |
| |
| SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge(); |
| // Mark the block as merge complete |
| snapuserd_->SetMergeCompleted(ra_block_index_); |
| |
| // Notify RA thread that the merge thread is ready to merge the next |
| // window |
| snapuserd_->NotifyRAForMergeReady(); |
| |
| // Get the next block |
| ra_block_index_ += 1; |
| } |
| |
| return true; |
| } |
| |
| bool Worker::AsyncMerge() { |
| if (!MergeOrderedOpsAsync()) { |
| SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O"; |
| // Reset the iter so that we retry the merge |
| while (blocks_merged_in_group_ && !cowop_iter_->RDone()) { |
| cowop_iter_->Prev(); |
| blocks_merged_in_group_ -= 1; |
| } |
| |
| return false; |
| } |
| |
| SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed"; |
| return true; |
| } |
| |
| bool Worker::SyncMerge() { |
| if (!MergeOrderedOps()) { |
| SNAP_LOG(ERROR) << "Merge failed for ordered ops"; |
| return false; |
| } |
| |
| SNAP_LOG(INFO) << "MergeOrderedOps completed"; |
| return true; |
| } |
| |
| bool Worker::Merge() { |
| cowop_iter_ = reader_->GetOpIter(true); |
| |
| bool retry = false; |
| bool ordered_ops_merge_status; |
| |
| // Start Async Merge |
| if (merge_async_) { |
| ordered_ops_merge_status = AsyncMerge(); |
| if (!ordered_ops_merge_status) { |
| FinalizeIouring(); |
| retry = true; |
| merge_async_ = false; |
| } |
| } |
| |
| // Check if we need to fallback and retry the merge |
| // |
| // If the device doesn't support async merge, we |
| // will directly enter here (aka devices with 4.x kernels) |
| const bool sync_merge_required = (retry || !merge_async_); |
| |
| if (sync_merge_required) { |
| ordered_ops_merge_status = SyncMerge(); |
| if (!ordered_ops_merge_status) { |
| // Merge failed. Device will continue to be mounted |
| // off snapshots; merge will be retried during |
| // next reboot |
| SNAP_LOG(ERROR) << "Merge failed for ordered ops"; |
| snapuserd_->MergeFailed(); |
| return false; |
| } |
| } |
| |
| // Replace and Zero ops |
| if (!MergeReplaceZeroOps()) { |
| SNAP_LOG(ERROR) << "Merge failed for replace/zero ops"; |
| snapuserd_->MergeFailed(); |
| return false; |
| } |
| |
| snapuserd_->MergeCompleted(); |
| |
| return true; |
| } |
| |
| bool Worker::InitializeIouring() { |
| if (!snapuserd_->IsIouringSupported()) { |
| return false; |
| } |
| |
| ring_ = std::make_unique<struct io_uring>(); |
| |
| int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0); |
| if (ret) { |
| LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret; |
| return false; |
| } |
| |
| merge_async_ = true; |
| |
| LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_; |
| return true; |
| } |
| |
| void Worker::FinalizeIouring() { |
| if (merge_async_) { |
| io_uring_queue_exit(ring_.get()); |
| } |
| } |
| |
// Merge thread entry point: waits for the merge-begin signal, lowers thread
// priority, initializes worker state and (optionally) io_uring, runs the
// merge, then releases resources. Returns false only on initialization or
// merge failure; an early termination signal is treated as a clean exit.
bool Worker::RunMergeThread() {
    SNAP_LOG(DEBUG) << "Waiting for merge begin...";
    // Early termination before the merge starts is not an error - report
    // success so the thread exits cleanly.
    if (!snapuserd_->WaitForMergeBegin()) {
        SNAP_LOG(ERROR) << "Merge terminated early...";
        return true;
    }

    // Merging is background work; lower the priority so foreground I/O is
    // not starved. Failure to renice is logged but not fatal.
    if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
        SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
    }

    SNAP_LOG(INFO) << "Merge starting..";

    if (!Init()) {
        SNAP_LOG(ERROR) << "Merge thread initialization failed...";
        snapuserd_->MergeFailed();
        return false;
    }

    // Best-effort: enables the async merge path when io_uring is supported;
    // the return value is intentionally ignored (sync path is the fallback).
    InitializeIouring();

    // Merge() reports failure to snapuserd_ itself; nothing more to do here.
    if (!Merge()) {
        return false;
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << "Snapshot-Merge completed";

    return true;
}
| |
| } // namespace snapshot |
| } // namespace android |