| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| // Copyright 2016 The Fuchsia Authors. All rights reserved. |
| // Redistribution and use in source and binary forms, with or without |
| // modification, are permitted provided that the following conditions are |
| // met: |
| // |
| // * Redistributions of source code must retain the above copyright |
| // notice, this list of conditions and the following disclaimer. |
| // * Redistributions in binary form must reproduce the above |
| // copyright notice, this list of conditions and the following disclaimer |
| // in the documentation and/or other materials provided with the |
| // distribution. |
| // * Neither the name of Google Inc. nor the names of its |
| // contributors may be used to endorse or promote products derived from |
| // this software without specific prior written permission. |
| // |
| // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| #include "util/env_fuchsia.h" |
| |
| #include <fcntl.h> |
| #include <fuchsia/io/cpp/fidl.h> |
| #include <lib/async-loop/cpp/loop.h> |
| #include <lib/async-loop/default.h> |
| #include <lib/async/cpp/task.h> |
| #include <lib/fdio/fd.h> |
| #include <lib/fdio/vfs.h> |
| #include <lib/fit/defer.h> |
| #include <lib/syslog/global.h> |
| #include <pthread.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <sys/time.h> |
| #include <sys/types.h> |
| #include <time.h> |
| #include <unistd.h> |
| #include <zircon/assert.h> |
| #include <zircon/status.h> |
| #include <zircon/syscalls.h> |
| #include <zircon/types.h> |
| |
| #include <set> |
| #include <thread> |
| #include <vector> |
| |
| #include "leveldb/env.h" |
| #include "leveldb/slice.h" |
| #include "port/port.h" |
| #include "port/thread_annotations.h" |
| #include "util/logging.h" |
| #include "util/mutexlock.h" |
| |
| namespace leveldb { |
| |
| namespace { |
| |
| #define LOG_TAG "leveldb" |
| |
| // Encode a zx_status_t as a LevelDB Status |
| static Status FuchsiaZxError(const std::string& context, zx_status_t status) { |
| if (status == ZX_ERR_NOT_FOUND) { |
| return Status::NotFound(context, "File not found"); |
| } else { |
| return Status::IOError(context, zx_status_get_string(status)); |
| } |
| } |
| |
| // Implementation from fuchsia/src/lib/fsl/io/fd.cc |
| zx::channel CloneChannelFromFileDescriptor(int fd) { |
| zx::handle handle; |
| zx_status_t status = fdio_fd_clone(fd, handle.reset_and_get_address()); |
| if (status != ZX_OK) { |
| return zx::channel(); |
| } |
| |
| zx_info_handle_basic_t info = {}; |
| status = handle.get_info(ZX_INFO_HANDLE_BASIC, &info, sizeof(info), NULL, NULL); |
| |
| if (status != ZX_OK || info.type != ZX_OBJ_TYPE_CHANNEL) { |
| return zx::channel(); |
| } |
| |
| return zx::channel(std::move(handle)); |
| } |
| |
| // SyncWaiter implements a basic async dispatcher that only supports one wait at |
| // a time. Users of a SyncWaiter can (synchronously) let events be dispatched |
| // until a certain condition happens. In case of error, clients can use SetError |
| // to force an early exit. |
| |
| // A SyncWaiter can only be used by one thread at a time. Callbacks will be |
| // called on the thread that currently runs the SyncWaiter. SyncWaiter is safe |
| // to use with fidl::InterfacePtr if the calling thread owns the pointers and |
| // all associated resources. |
| class SyncWaiter { |
| public: |
| // |error_context| is used as a prefix for error messages |
| // It should oulive the SyncWaiter |
| SyncWaiter(const std::string* error_context) |
| : dispatcher_({{&kOps}, this}), |
| wait_(nullptr), |
| error_context_(error_context){}; |
| SyncWaiter(const SyncWaiter&) = delete; |
| |
| ~SyncWaiter() { |
| if (wait_ != nullptr) { |
| wait_->handler(async_dispatcher(), wait_, ZX_ERR_CANCELED, nullptr); |
| } |
| } |
| |
| // Returns the async dispatcher associated to this object |
| async_dispatcher_t* async_dispatcher() { return &dispatcher_.dispatcher; } |
| |
| // Quit early with an error |
| void SetError(Status s) { pending_error_ = s; } |
| |
| Status status() { return pending_error_; } |
| |
| // Handle events until |can_proceed()| returns true |
| Status RunUntil(fit::function<bool()> can_proceed) { |
| Status s = Run(zx::time(0)); |
| if (!s.ok()) { |
| return s; |
| } |
| while (!can_proceed()) { |
| s = Run(zx::time::infinite()); |
| if (!s.ok()) { |
| return s; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| // Handle events until |Done()| is called |
| Status RunUntilDone() { |
| is_done_ = false; |
| return RunUntil([this] { return is_done_; }); |
| } |
| |
| // Interrupts the |RunUntilDone()| loop |
| void Done() { is_done_ = true; } |
| |
| private: |
| // Wait for events until |deadline| |
| Status Run(zx::time deadline) { |
| if (!pending_error_.ok()) { |
| return pending_error_; |
| } |
| if (!wait_) { |
| if (deadline == zx::time(0)) { |
| return Status::OK(); |
| } else { |
| return Status::IOError(*error_context_, "No work to do"); |
| } |
| } |
| |
| zx_signals_t observed; |
| zx_status_t wait_status = zx_object_wait_one(wait_->object, wait_->trigger, |
| deadline.get(), &observed); |
| if (wait_status == ZX_ERR_TIMED_OUT) { |
| return Status::OK(); |
| } |
| if (wait_status != ZX_OK) { |
| return FuchsiaZxError(*error_context_, wait_status); |
| } |
| zx_packet_signal_t signals = {wait_->trigger, observed, 1}; |
| async_wait_t* wait = wait_; |
| wait_ = nullptr; |
| wait->handler(async_dispatcher(), wait, ZX_OK, &signals); |
| return pending_error_; |
| } |
| |
| zx_status_t BeginWait(async_wait_t* wait) { |
| if (wait_ != nullptr) { |
| return ZX_ERR_NOT_SUPPORTED; |
| } |
| wait_ = wait; |
| return ZX_OK; |
| } |
| |
| zx_status_t CancelWait(async_wait_t* wait) { |
| if (wait != wait_ || wait == nullptr) { |
| return ZX_ERR_NOT_FOUND; |
| } |
| wait_ = nullptr; |
| return ZX_OK; |
| } |
| |
| using dispatcher = struct { |
| async_dispatcher_t dispatcher; |
| SyncWaiter* self; |
| }; |
| |
| static zx_time_t now(async_dispatcher_t* dispatcher) { |
| return zx_clock_get_monotonic(); |
| } |
| static zx_status_t begin_wait(async_dispatcher_t* dispatcher, |
| async_wait_t* wait) { |
| return reinterpret_cast<leveldb::SyncWaiter::dispatcher*>(dispatcher) |
| ->self->BeginWait(wait); |
| } |
| static zx_status_t cancel_wait(async_dispatcher_t* dispatcher, |
| async_wait_t* wait) { |
| return reinterpret_cast<leveldb::SyncWaiter::dispatcher*>(dispatcher) |
| ->self->CancelWait(wait); |
| } |
| static zx_status_t post_task(async_dispatcher_t* dispatcher, |
| async_task_t* task) { |
| return ZX_ERR_NOT_SUPPORTED; |
| } |
| static zx_status_t cancel_task(async_dispatcher_t* dispatcher, |
| async_task_t* task) { |
| return ZX_ERR_NOT_SUPPORTED; |
| } |
| static zx_status_t queue_packet(async_dispatcher_t* dispatcher, |
| async_receiver_t* receiver, |
| const zx_packet_user_t* data) { |
| return ZX_ERR_NOT_SUPPORTED; |
| } |
| static zx_status_t set_guest_bell_trap(async_dispatcher_t* dispatcher, |
| async_guest_bell_trap_t* trap, |
| zx_handle_t guest, zx_vaddr_t addr, |
| size_t length) { |
| return ZX_ERR_NOT_SUPPORTED; |
| } |
| |
| constexpr static async_ops_t kOps = { |
| ASYNC_OPS_V1, |
| 0, |
| {now, begin_wait, cancel_wait, post_task, cancel_task, queue_packet, |
| set_guest_bell_trap}}; |
| // The async dispatcher associated to this object. |
| dispatcher dispatcher_; |
| // A waiter set through the async dispatcher. |
| async_wait_t* wait_ = nullptr; |
| // Set to an error if the client wants to exit early. |
| Status pending_error_ = Status::OK(); |
| // A context provided for errors. It must outlive the object. |
| const std::string* error_context_; |
| // Set by Done() to indicate that the loop should terminate. |
| bool is_done_; |
| }; |
| |
| // Open a filesystem node at path |name| relative to a directory |root_dir|. |
| // The parameters |flags| and |mode| correspond to the parameters of |
| // fuchsia::io::Directory::Open. The resulting node handle is returned in |
| // |out_handle|. This function adds the OPEN_FLAG_DESCRIBE flag to be able to |
| // get errors back on the channel (via the OnOpen event) and a description of |
| // the type of node that was open. If opening is successful, Status::OK() is |
| // returned the resulting NodeInfo structure is returned in |
| // |out_node_info|. Otherwise, an error is returned. |
| Status OpenAndDescribe(const fuchsia::io::DirectorySyncPtr& root_dir, |
| const std::string& name, uint32_t flags, uint32_t mode, |
| fidl::InterfaceHandle<fuchsia::io::Node>* out_handle, |
| fuchsia::io::NodeInfo* out_node_info) { |
| // We want to receive the description of the object we open |
| flags |= fuchsia::io::OPEN_FLAG_DESCRIBE; |
| |
| // NodeSyncPtr has no support for receiving OnOpen |
| // We use the SyncWaiter to take care of this |
| SyncWaiter sync_waiter(&name); |
| fuchsia::io::NodePtr node; |
| node.set_error_handler([&sync_waiter, &name](zx_status_t node_status) { |
| sync_waiter.SetError(FuchsiaZxError(name, node_status)); |
| }); |
| node.events().OnOpen = [&sync_waiter, &name, out_node_info]( |
| zx_status_t open_status, |
| std::unique_ptr<fuchsia::io::NodeInfo> node_info) { |
| if (open_status != ZX_OK) { |
| sync_waiter.SetError(FuchsiaZxError(name, open_status)); |
| return; |
| } |
| if (!node_info) { |
| sync_waiter.SetError(Status::IOError(name, "Expected a NodeInfo")); |
| return; |
| } |
| *out_node_info = std::move(*node_info); |
| sync_waiter.Done(); |
| }; |
| zx_status_t fidl_status = root_dir->Open( |
| flags, mode, name, node.NewRequest(sync_waiter.async_dispatcher())); |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(name, fidl_status); |
| } |
| Status s = sync_waiter.RunUntilDone(); |
| if (!s.ok()) { |
| return s; |
| } |
| *out_handle = node.Unbind(); |
| return Status::OK(); |
| } |
| |
| // Open a node at path |name| relative to |root_dir| and ensure it is a file. On |
| // success, the resulting channel is bound to |file| unless |file| is null. |
| Status OpenFileAt(const fuchsia::io::DirectorySyncPtr& root_dir, |
| const std::string& fname, uint32_t flags, |
| fuchsia::io::FileSyncPtr* file) { |
| fuchsia::io::NodeInfo node_info; |
| fidl::InterfaceHandle<fuchsia::io::Node> file_handle; |
| Status s = |
| OpenAndDescribe(root_dir, fname, flags, fuchsia::io::MODE_TYPE_FILE, |
| &file_handle, &node_info); |
| if (!s.ok()) { |
| return s; |
| } |
| if (!node_info.is_file()) { |
| return FuchsiaZxError(fname, ZX_ERR_NOT_FILE); |
| } |
| if (file != nullptr) { |
| // Cast the fuchsia::io::Node to fuchsia::io::File by moving the channel. |
| file->Bind(file_handle.TakeChannel()); |
| } |
| return Status::OK(); |
| } |
| |
| // Open a node at path |name| relative to |root_dir| and ensure it is a |
| // directory. On success, the resulting channel is bound to |file| unless |file| |
| // is null. |
| Status OpenDirAt(const fuchsia::io::DirectorySyncPtr& root_dir, |
| const std::string& dname, uint32_t flags, |
| fuchsia::io::DirectorySyncPtr* dir) { |
| fidl::InterfaceHandle<fuchsia::io::Node> dir_handle; |
| fuchsia::io::NodeInfo node_info; |
| Status s = OpenAndDescribe( |
| root_dir, dname, flags | fuchsia::io::OPEN_FLAG_DIRECTORY, |
| fuchsia::io::MODE_TYPE_DIRECTORY, &dir_handle, &node_info); |
| if (!s.ok()) { |
| return s; |
| } |
| // Given our Open mode, the channel must be a directory |
| if (dir != nullptr) { |
| // Case the fuchsia::io::Node to fuchsia::io::Directory by moving the |
| // channel. |
| dir->Bind(dir_handle.TakeChannel()); |
| } |
| return Status::OK(); |
| } |
| |
| class FuchsiaFileLogger : public Logger { |
| private: |
| std::unique_ptr<WritableFile> file_; |
| uint64_t (*gettid_)(); // Return the thread id for the current thread |
| public: |
| FuchsiaFileLogger(std::unique_ptr<WritableFile> f, uint64_t (*gettid)()) |
| : file_(std::move(f)), gettid_(gettid) {} |
| |
| void Logv(const char* format, va_list ap) override { |
| const uint64_t thread_id = (*gettid_)(); |
| |
| // We try twice: the first time with a fixed-size stack allocated buffer, |
| // and the second time with a much larger dynamically allocated buffer. |
| char buffer[500]; |
| for (int iter = 0; iter < 2; iter++) { |
| char* base; |
| int bufsize; |
| if (iter == 0) { |
| bufsize = sizeof(buffer); |
| base = buffer; |
| } else { |
| bufsize = 30000; |
| base = new char[bufsize]; |
| } |
| char* p = base; |
| char* limit = base + bufsize; |
| |
| struct timeval now_tv; |
| gettimeofday(&now_tv, nullptr); |
| p += |
| snprintf(p, limit - p, "%06d %llx ", static_cast<int>(now_tv.tv_usec), |
| static_cast<long long unsigned int>(thread_id)); |
| |
| // Print the message |
| if (p < limit) { |
| va_list backup_ap; |
| va_copy(backup_ap, ap); |
| p += vsnprintf(p, limit - p, format, backup_ap); |
| va_end(backup_ap); |
| } |
| |
| // Truncate to available space if necessary |
| if (p >= limit) { |
| if (iter == 0) { |
| continue; // Try again with larger buffer |
| } else { |
| p = limit - 1; |
| } |
| } |
| |
| // Add newline if necessary |
| if (p == base || p[-1] != '\n') { |
| *p++ = '\n'; |
| } |
| |
| assert(p <= limit); |
| file_->Append(Slice(base, p - base)); |
| file_->Flush(); |
| if (base != buffer) { |
| delete[] base; |
| } |
| break; |
| } |
| } |
| }; |
| |
| // An extended version of RandomAccessFile that allows accessing the size of the |
| // file |
| class RandomAccessFileWithSize : public RandomAccessFile { |
| public: |
| // Returns the size of the file in bytes |
| virtual size_t size() = 0; |
| }; |
| |
| // Wraps a RandomAccessFileWithSize to provide the SequentialFile interface |
| class SequentialFileWrapper : public SequentialFile { |
| private: |
| // The total size of the file |
| size_t file_size_; |
| // The current position in the file |
| size_t position_; |
| // The underlying RandomAccessFile |
| std::unique_ptr<RandomAccessFileWithSize> file_; |
| |
| public: |
| SequentialFileWrapper(std::unique_ptr<RandomAccessFileWithSize> file) |
| : file_size_(file->size()), position_(0), file_(std::move(file)) {} |
| |
| ~SequentialFileWrapper() override {} |
| |
| Status Skip(uint64_t n) override { |
| if (position_ + n < position_ || position_ + n > file_size_) { |
| position_ = file_size_; |
| } else { |
| position_ += n; |
| } |
| return Status::OK(); |
| } |
| |
| Status Read(size_t n, Slice* result, char* scratch) override { |
| Status s = file_->Read(position_, n, result, scratch); |
| if (!s.ok()) { |
| return s; |
| } |
| position_ += result->size(); |
| return Status::OK(); |
| } |
| }; |
| |
| // A RandomAccessFile based on FIDL IO |
| class FuchsiaRandomAccessFile : public RandomAccessFileWithSize { |
| private: |
| std::string filename_; |
| fuchsia::io::FileSyncPtr file_; |
| size_t size_; |
| |
| public: |
| FuchsiaRandomAccessFile(const std::string& fname, |
| fuchsia::io::FileSyncPtr file, size_t size) |
| : filename_(fname), file_(std::move(file)), size_(size) {} |
| |
| ~FuchsiaRandomAccessFile() override { |
| zx_status_t out_s; |
| file_->Close(&out_s); |
| } |
| |
| Status Read(uint64_t offset, size_t n, Slice* result, |
| char* scratch) const override { |
| // We can't receive more than MAX_BUF bytes over the FIDL channel, so we |
| // have to do a manual pagination. |
| size_t bytes_read = 0; |
| while (bytes_read < n) { |
| size_t step = std::min(n - bytes_read, fuchsia::io::MAX_BUF); |
| zx_status_t read_status; |
| std::vector<uint8_t> read_data; |
| zx_status_t fidl_status = |
| file_->ReadAt(step, offset + bytes_read, &read_status, &read_data); |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(filename_, fidl_status); |
| } |
| if (read_status != ZX_OK) { |
| return FuchsiaZxError(filename_, read_status); |
| } |
| if (read_data.size() > step) { |
| return Status::IOError(filename_, "Received more bytes than read"); |
| } |
| if (read_data.size() == 0) { |
| break; |
| } |
| memcpy(scratch + bytes_read, reinterpret_cast<char*>(read_data.data()), |
| read_data.size()); |
| bytes_read += read_data.size(); |
| } |
| *result = Slice(scratch, bytes_read); |
| return Status::OK(); |
| } |
| |
| size_t size() override { return size_; } |
| }; |
| |
| // A RandomAccessFile backed by a VMO |
| class FuchsiaVmoFile : public RandomAccessFileWithSize { |
| private: |
| std::string filename_; |
| zx::vmo vmo_; |
| size_t size_; |
| |
| public: |
| // Create a RandomAccessFile from |vmo|, at path |fname| for a file of |
| // size |size|. |
| FuchsiaVmoFile(const std::string& fname, zx::vmo vmo, size_t size) |
| : filename_(fname), vmo_(std::move(vmo)), size_(size) {} |
| |
| size_t size() override { return size_; } |
| |
| Status Read(uint64_t offset, size_t n, Slice* result, |
| char* scratch) const override { |
| size_t read_size = offset > size_ ? 0 : std::min(n, size_ - offset); |
| zx_status_t status = vmo_.read(scratch, offset, read_size); |
| if (status != ZX_OK) { |
| return FuchsiaZxError(filename_, status); |
| } |
| *result = Slice(scratch, read_size); |
| return Status::OK(); |
| } |
| }; |
| |
| // Tries to construct a FuchsiaVmoFile from |file|, or fallbacks to a |
| // FuchsiaRandomAccessFile. On success, returns Status::OK() and writes the |
| // result in |out|. |
| Status BuildRandomAccessFile(const std::string& filename, |
| fuchsia::io::FileSyncPtr file, |
| std::unique_ptr<RandomAccessFileWithSize>* out) { |
| zx_status_t buffer_status; |
| fuchsia::mem::BufferPtr buffer; |
| zx_status_t fidl_status = file->GetBuffer(fuchsia::io::OPEN_RIGHT_READABLE, |
| &buffer_status, &buffer); |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(filename, fidl_status); |
| } |
| if (buffer_status == ZX_OK && buffer) { |
| *out = std::make_unique<FuchsiaVmoFile>(filename, std::move(buffer->vmo), |
| buffer->size); |
| return Status::OK(); |
| } |
| zx_status_t getattr_status; |
| fuchsia::io::NodeAttributes attrs; |
| fidl_status = file->GetAttr(&getattr_status, &attrs); |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(filename, fidl_status); |
| } |
| if (getattr_status != ZX_OK) { |
| return FuchsiaZxError(filename, getattr_status); |
| } |
| *out = std::make_unique<FuchsiaRandomAccessFile>(filename, std::move(file), |
| attrs.content_size); |
| return Status::OK(); |
| } |
| |
| // WritableFile that handles asynchronous writes. Writes are sent as soon as the |
| // buffer is full, and the list of pending writes is saved. Flush ensures that |
| // all writes have received an answer. The implementation assumes that there |
| // will not be partial writes. |
| |
| class FuchsiaWritableFile : public WritableFile { |
| public: |
| FuchsiaWritableFile(const fuchsia::io::DirectorySyncPtr& root, |
| const std::string& fname, |
| fidl::InterfaceHandle<fuchsia::io::File> f) |
| : sync_waiter_(&filename_), filename_(fname) { |
| file_.Bind(std::move(f), sync_waiter_.async_dispatcher()); |
| file_.set_error_handler([this](zx_status_t status) { |
| sync_waiter_.SetError(FuchsiaZxError(filename_, status)); |
| file_ = nullptr; |
| }); |
| buf_.reserve(kBufSize); |
| |
| // Keep a handle to the parent directory if the basename starts with |
| // MANIFEST. This is necessary to call Sync() on it in Sync(). |
| const char* fbuf = filename_.c_str(); |
| const char* sep = strrchr(fbuf, '/'); |
| Slice basename; |
| std::string dir; |
| if (sep == nullptr) { |
| dir = "."; |
| basename = fbuf; |
| } else { |
| dir = std::string(fbuf, sep - fbuf); |
| basename = sep + 1; |
| } |
| if (basename.starts_with("MANIFEST")) { |
| Status s = OpenDirAt( |
| root, dir, |
| fuchsia::io::OPEN_RIGHT_READABLE | fuchsia::io::OPEN_FLAG_POSIX, |
| &parent_dir_); |
| if (!s.ok()) { |
| // Store the error in the event handler |
| sync_waiter_.SetError(s); |
| } |
| } |
| } |
| |
| ~FuchsiaWritableFile() override { |
| if (file_) { |
| Status s = Close(); |
| if (!s.ok()) { |
| FX_LOGF(WARNING, LOG_TAG, |
| "Error when closing %s in ~FuchsiaAsyncWritableFile: %s", |
| filename_.c_str(), s.ToString().c_str()); |
| } |
| } |
| } |
| |
| Status Append(const Slice& d) override { |
| if (!sync_waiter_.status().ok()) { |
| return sync_waiter_.status(); |
| } |
| |
| Slice data = d; |
| // Copy pieces of data to the buffer and send the buffer. There is no point |
| // in trying to avoid going through the buffer, since we need to either move |
| // or copy the storage into the VectorPtr when writing. |
| while (data.size() > 0) { |
| size_t write_size = std::min(kBufSize - buf_.size(), data.size()); |
| uint8_t* write_pos = &(*buf_.end()); |
| buf_.resize(write_size + buf_.size()); |
| memcpy(write_pos, data.data(), write_size); |
| data.remove_prefix(write_size); |
| if (buf_.size() >= kBufSize) { |
| Send(); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status Close() override { |
| if (!sync_waiter_.status().ok()) { |
| return sync_waiter_.status(); |
| } |
| Status s = Flush(); |
| if (!s.ok()) { |
| return s; |
| } |
| file_->Close([this](zx_status_t status) { |
| if (status != ZX_OK) { |
| sync_waiter_.SetError(FuchsiaZxError(filename_, status)); |
| } else { |
| sync_waiter_.Done(); |
| } |
| }); |
| s = sync_waiter_.RunUntilDone(); |
| if (!s.ok()) { |
| return s; |
| } |
| if (parent_dir_) { |
| zx_status_t fidl_status, close_status; |
| fidl_status = parent_dir_->Close(&close_status); |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(filename_, fidl_status); |
| } |
| if (close_status != ZX_OK) { |
| return FuchsiaZxError(filename_, close_status); |
| } |
| } |
| file_ = nullptr; |
| return Status::OK(); |
| } |
| |
| Status Flush() override { |
| if (!sync_waiter_.status().ok()) { |
| return sync_waiter_.status(); |
| } |
| if (buf_.size() > 0) { |
| Send(); |
| } |
| return sync_waiter_.RunUntil([this] { return pending_writes_ == 0; }); |
| } |
| |
| Status Sync() override { |
| if (!sync_waiter_.status().ok()) { |
| return sync_waiter_.status(); |
| } |
| |
| // Ensure new files referred to by the manifest are in the filesystem. |
| if (parent_dir_) { |
| zx_status_t sync_status; |
| zx_status_t fidl_status = parent_dir_->Sync(&sync_status); |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(filename_, fidl_status); |
| } |
| if (sync_status != ZX_OK) { |
| return FuchsiaZxError(filename_, sync_status); |
| } |
| } |
| Status s = Flush(); |
| if (!s.ok()) { |
| return s; |
| } |
| file_->Sync([this](zx_status_t status) { |
| if (status != ZX_OK) { |
| sync_waiter_.SetError(FuchsiaZxError(filename_, status)); |
| } else { |
| sync_waiter_.Done(); |
| } |
| }); |
| return sync_waiter_.RunUntilDone(); |
| } |
| |
| private: |
| void Send() { |
| size_t write_size = buf_.size(); |
| pending_writes_++; |
| file_->Write(std::move(buf_), |
| [this, write_size](zx_status_t status, size_t actual) { |
| if (status != ZX_OK) { |
| sync_waiter_.SetError(FuchsiaZxError(filename_, status)); |
| } else { |
| // Assume all writes are full. |
| ZX_DEBUG_ASSERT(write_size == actual); |
| pending_writes_--; |
| } |
| }); |
| buf_.clear(); |
| buf_.reserve(kBufSize); |
| } |
| |
| // The size of the write buffer. This is the maximum size that can be sent in |
| // a Write message. |
| constexpr static size_t kBufSize = fuchsia::io::MAX_BUF; |
| // A handle on the file. |
| fuchsia::io::FilePtr file_; |
| // For MANIFESTs, keep a handle on the parent directory for syncing. |
| fuchsia::io::DirectorySyncPtr parent_dir_; |
| // A SyncWaiter to handle asynchronous responses. |
| SyncWaiter sync_waiter_; |
| // The buffer of things to sent. Will always have capacity |kBufSize|. |
| std::vector<uint8_t> buf_; |
| // The number of sent but unanswered writes. |
| size_t pending_writes_ = 0; |
| // The name of the file, for reporting errors. |
| std::string filename_; |
| }; |
| |
| class FuchsiaFileLock : public FileLock { |
| public: |
| FuchsiaFileLock(std::string fname) : name_(fname) {} |
| |
| const std::string& name() { return name_; } |
| |
| private: |
| const std::string name_; |
| }; |
| |
| // Set of locked files. This is used to guard against multiple uses from one |
| // process. |
| class LockTable { |
| private: |
| port::Mutex mu_; |
| std::set<std::string> locked_files_ GUARDED_BY(mu_); |
| |
| public: |
| bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) { |
| MutexLock l(&mu_); |
| return locked_files_.insert(fname).second; |
| } |
| void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) { |
| MutexLock l(&mu_); |
| locked_files_.erase(fname); |
| } |
| }; |
| |
| class FuchsiaEnv : public Env { |
| public: |
| FuchsiaEnv(fidl::InterfaceHandle<fuchsia::io::Directory> root_channel); |
| FuchsiaEnv(int root_fd); |
| |
| ~FuchsiaEnv() override { |
| if (bgloop_) { |
| bgloop_->Shutdown(); |
| } |
| |
| if (root_dir_) { |
| int32_t out_s; |
| root_dir_->Close(&out_s); |
| } |
| |
| if (this == Env::Default()) { |
| char msg[] = "Destroying Env::Default()\n"; |
| fwrite(msg, 1, sizeof(msg), stderr); |
| abort(); |
| } |
| } |
| |
| Status NewSequentialFile(const std::string& fname, |
| SequentialFile** result) override { |
| std::unique_ptr<SequentialFile> sequential_file; |
| Status s = NewSequentialFile(fname, &sequential_file); |
| *result = sequential_file.release(); |
| return s; |
| } |
| |
| Status NewRandomAccessFile(const std::string& fname, |
| RandomAccessFile** result) override { |
| std::unique_ptr<RandomAccessFileWithSize> random_access_file; |
| Status s = NewRandomAccessFile(fname, &random_access_file); |
| *result = random_access_file.release(); |
| return s; |
| } |
| |
| Status NewWritableFile(const std::string& fname, |
| WritableFile** result) override { |
| std::unique_ptr<WritableFile> writable_file; |
| Status s = |
| NewWritableFile(fname, fuchsia::io::OPEN_FLAG_TRUNCATE, &writable_file); |
| *result = writable_file.release(); |
| return s; |
| } |
| |
| Status NewAppendableFile(const std::string& fname, |
| WritableFile** result) override { |
| std::unique_ptr<WritableFile> writable_file; |
| Status s = |
| NewWritableFile(fname, fuchsia::io::OPEN_FLAG_APPEND, &writable_file); |
| *result = writable_file.release(); |
| return s; |
| } |
| |
| bool FileExists(const std::string& fname) override { |
| if (!delayed_status_.ok()) { |
| return false; |
| } |
| |
| fidl::InterfaceHandle<fuchsia::io::Node> node_handle; |
| fuchsia::io::NodeInfo node_info; |
| return OpenAndDescribe(root_dir_, fname, |
| fuchsia::io::OPEN_FLAG_NODE_REFERENCE, 0, |
| &node_handle, &node_info) |
| .ok(); |
| } |
| |
| Status GetChildren(const std::string& dir, |
| std::vector<std::string>* result) override { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| fuchsia::io::DirectorySyncPtr d; |
| result->clear(); |
| Status s = OpenDirAt(root_dir_, dir, fuchsia::io::OPEN_RIGHT_READABLE, &d); |
| if (!s.ok()) { |
| return s; |
| } |
| std::vector<uint8_t> buf; |
| zx_status_t fidl_status; |
| zx_status_t read_status; |
| vdirent_t entry; |
| while ((fidl_status = d->ReadDirents(fuchsia::io::MAX_BUF, &read_status, |
| &buf)) == ZX_OK && |
| read_status == ZX_OK && buf.size() > 0) { |
| size_t offset = 0; |
| while (offset + sizeof(vdirent_t) <= buf.size()) { |
| memcpy(&entry, buf.data() + offset, sizeof(vdirent_t)); |
| if (sizeof(vdirent_t) + entry.size > buf.size() - offset) { |
| break; |
| } |
| result->push_back( |
| std::string((char*)buf.data() + offset + sizeof(vdirent_t), |
| (size_t)entry.size)); |
| offset += sizeof(vdirent_t) + entry.size; |
| } |
| if (offset != buf.size()) { |
| return Status::IOError(dir, "Malformed dirent"); |
| } |
| } |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(dir, fidl_status); |
| } |
| if (read_status != ZX_OK) { |
| return FuchsiaZxError(dir, read_status); |
| } |
| return Status::OK(); |
| } |
| |
| Status DeleteFile(const std::string& fname) override { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| fuchsia::io::DirectorySyncPtr d; |
| std::string basename; |
| Status s = FindFileInDirectory(fname, &d, &basename); |
| if (!s.ok()) { |
| return s; |
| } |
| zx_status_t delete_status; |
| zx_status_t fidl_status = d->Unlink(basename, &delete_status); |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(fname, fidl_status); |
| } |
| if (delete_status != ZX_OK) { |
| return FuchsiaZxError(fname, delete_status); |
| } |
| return Status::OK(); |
| } |
| |
| Status CreateDir(const std::string& name) override { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| return OpenDirAt(root_dir_, name, |
| fuchsia::io::OPEN_RIGHT_READABLE | |
| fuchsia::io::OPEN_FLAG_POSIX | |
| fuchsia::io::OPEN_FLAG_CREATE | |
| fuchsia::io::OPEN_FLAG_CREATE_IF_ABSENT, |
| nullptr); |
| } |
| |
| Status DeleteDir(const std::string& name) override { |
| return DeleteFile(name); |
| } |
| |
| Status GetFileSize(const std::string& fname, uint64_t* size) override { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| fidl::InterfaceHandle<fuchsia::io::Node> node_handle; |
| fuchsia::io::NodeInfo node_info; |
| Status s = |
| OpenAndDescribe(root_dir_, fname, fuchsia::io::OPEN_FLAG_NODE_REFERENCE, |
| 0, &node_handle, &node_info); |
| if (!s.ok()) { |
| return s; |
| } |
| fuchsia::io::NodeSyncPtr node; |
| node.Bind(std::move(node_handle)); |
| zx_status_t attr_status; |
| fuchsia::io::NodeAttributes node_attributes; |
| zx_status_t fidl_status = node->GetAttr(&attr_status, &node_attributes); |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(fname, fidl_status); |
| } |
| if (attr_status != ZX_OK) { |
| return FuchsiaZxError(fname, attr_status); |
| } |
| *size = node_attributes.content_size; |
| |
| return Status::OK(); |
| } |
| |
| // Assuming the target is a non-existing file (not a directory) |
| Status RenameFile(const std::string& src, |
| const std::string& target) override { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| fuchsia::io::DirectorySyncPtr src_dir; |
| std::string src_basename; |
| Status result = FindFileInDirectory(src, &src_dir, &src_basename); |
| if (!result.ok()) { |
| return result; |
| } |
| fuchsia::io::DirectorySyncPtr target_dir; |
| std::string target_basename; |
| result = FindFileInDirectory(target, &target_dir, &target_basename); |
| if (!result.ok()) { |
| return result; |
| } |
| zx_status_t token_status; |
| zx::handle target_dir_token; |
| zx_status_t fidl_status = |
| target_dir->GetToken(&token_status, &target_dir_token); |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(target, fidl_status); |
| } |
| if (token_status != ZX_OK) { |
| return FuchsiaZxError(target, token_status); |
| } |
| zx_status_t rename_status; |
| fidl_status = src_dir->Rename(src_basename, std::move(target_dir_token), |
| target_basename, &rename_status); |
| if (fidl_status != ZX_OK) { |
| return FuchsiaZxError(target, fidl_status); |
| } |
| if (rename_status != ZX_OK) { |
| return FuchsiaZxError(target, rename_status); |
| } |
| return Status::OK(); |
| } |
| |
| Status LockFile(const std::string& fname, FileLock** result) override { |
| std::unique_ptr<FileLock> lock; |
| Status s = LockFile(fname, &lock); |
| *result = lock.release(); |
| return s; |
| } |
| |
| Status UnlockFile(FileLock* lock) override { |
| return UnlockFile(std::unique_ptr<FuchsiaFileLock>( |
| reinterpret_cast<FuchsiaFileLock*>(lock))); |
| } |
| |
| void Schedule(void (*function)(void*), void* arg) override; |
| |
| void StartThread(void (*function)(void* arg), void* arg) override; |
| |
| Status GetTestDirectory(std::string* result) override { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| const char* env = getenv("TEST_TMPDIR"); |
| if (env && env[0] != '\0') { |
| *result = env; |
| } else { |
| char buf[100]; |
| snprintf(buf, sizeof(buf), "leveldbtest-%d", int(geteuid())); |
| *result = buf; |
| } |
| // Directory may already exist |
| CreateDir(*result); |
| return Status::OK(); |
| } |
| |
| Status NewLogger(const std::string& fname, Logger** result) override { |
| std::unique_ptr<Logger> logger; |
| Status s = NewLogger(fname, &logger); |
| *result = logger.release(); |
| return s; |
| } |
| |
| uint64_t NowMicros() override { |
| struct timeval tv; |
| gettimeofday(&tv, nullptr); |
| return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec; |
| } |
| |
| void SleepForMicroseconds(int micros) override { usleep(micros); } |
| |
| private: |
| Status NewSequentialFile(const std::string& fname, |
| std::unique_ptr<SequentialFile>* result) { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| std::unique_ptr<RandomAccessFileWithSize> file; |
| Status s = NewRandomAccessFile(fname, &file); |
| if (!s.ok()) { |
| return s; |
| } |
| *result = std::make_unique<SequentialFileWrapper>(std::move(file)); |
| return s; |
| } |
| |
| Status NewRandomAccessFile( |
| const std::string& fname, |
| std::unique_ptr<RandomAccessFileWithSize>* result) { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| fuchsia::io::FileSyncPtr file; |
| Status s = |
| OpenFileAt(root_dir_, fname, fuchsia::io::OPEN_RIGHT_READABLE, &file); |
| if (!s.ok()) { |
| return s; |
| } |
| return BuildRandomAccessFile(fname, std::move(file), result); |
| } |
| |
| Status NewWritableFile(const std::string& fname, uint32_t extra_flags, |
| std::unique_ptr<WritableFile>* result) { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| fuchsia::io::FileSyncPtr file; |
| Status s = OpenFileAt(root_dir_, fname, |
| fuchsia::io::OPEN_RIGHT_WRITABLE | |
| fuchsia::io::OPEN_FLAG_CREATE | extra_flags, |
| &file); |
| if (s.ok()) { |
| *result = std::make_unique<FuchsiaWritableFile>(root_dir_, fname, |
| std::move(file)); |
| } |
| return s; |
| } |
| |
| // Our implementation uses the |locks_| table to guard against creating |
| // multiple databases backed by the same file within one process. This does |
| // not guard against multiple processes using the same file concurrently. |
| Status LockFile(const std::string& fname, std::unique_ptr<FileLock>* lock) { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| Status result; |
| if (!locks_.Insert(fname)) { |
| result = Status::IOError("lock " + fname, "already held by process"); |
| } else { |
| *lock = std::make_unique<FuchsiaFileLock>(fname); |
| } |
| return result; |
| } |
| |
| Status UnlockFile(std::unique_ptr<FuchsiaFileLock> lock) { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| locks_.Remove(lock->name()); |
| return Status::OK(); |
| } |
| |
| Status NewLogger(const std::string& fname, std::unique_ptr<Logger>* result) { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| std::unique_ptr<WritableFile> file; |
| Status status = |
| NewWritableFile(fname, fuchsia::io::OPEN_FLAG_TRUNCATE, &file); |
| if (!status.ok()) { |
| return status; |
| } |
| *result = std::make_unique<FuchsiaFileLogger>(std::move(file), |
| &FuchsiaEnv::gettid); |
| return Status::OK(); |
| } |
| |
| Status FindFileInDirectory(const std::string& fname, |
| fuchsia::io::DirectorySyncPtr* d, |
| std::string* basename) { |
| if (!delayed_status_.ok()) { |
| return delayed_status_; |
| } |
| |
| const char* f = fname.c_str(); |
| const char* sep = strrchr(f, '/'); |
| std::string dir; |
| if (sep == nullptr) { |
| dir = "."; |
| *basename = f; |
| } else { |
| dir = std::string(f, sep - f); |
| *basename = sep + 1; |
| } |
| return OpenDirAt( |
| root_dir_, dir, |
| fuchsia::io::OPEN_RIGHT_READABLE | fuchsia::io::OPEN_FLAG_POSIX, d); |
| } |
| |
// Returns an identifier for the calling thread: the leading bytes of the
// pthread_self() value copied into a zero-initialized 64-bit integer.
static uint64_t gettid() {
  // Copy no more bytes than either type holds.
  constexpr size_t kCopyBytes = sizeof(uint64_t) < sizeof(pthread_t)
                                    ? sizeof(uint64_t)
                                    : sizeof(pthread_t);
  const pthread_t self = pthread_self();
  uint64_t id = 0;
  memcpy(&id, &self, kCopyBytes);
  return id;
}
| |
| // Root directory used by this environment |
| fuchsia::io::DirectorySyncPtr root_dir_; |
| // Delayed construction errors |
| Status delayed_status_; |
| |
| std::optional<async::Loop> bgloop_; |
| |
| LockTable locks_; |
| }; |
| |
| FuchsiaEnv::FuchsiaEnv( |
| fidl::InterfaceHandle<fuchsia::io::Directory> root_channel) { |
| if (!root_channel) { |
| abort(); |
| } |
| root_dir_.Bind(std::move(root_channel)); |
| } |
| |
// Opens |name| read-only, resolving relative paths against the current
// working directory. Returns the file descriptor from openat(), which is
// negative on failure. The caller owns the descriptor.
int OpenDirAtCWD(std::string name) {
  return openat(AT_FDCWD, name.c_str(), O_RDONLY);
}
| |
| FuchsiaEnv::FuchsiaEnv(int root_fd) { |
| if (root_fd != AT_FDCWD && root_fd < 0) { |
| abort(); |
| } |
| |
| // The file descriptor obtained with openat(AT_FDCWD, ".", ...) cannot be |
| // converted to a channel. When the client does not provide this, we choose to |
| // create the database in /tmp. |
| |
| // Some components do not have access to /tmp, but we still need to be able |
| // to create a default object because the default constructor for |
| // leveldb::Option needs it. We store an error in delayed_status_ so that |
| // this instance fails when asked to do any work. |
| |
| // If we opened a fd ourselves, we need to close it, but we should not close |
| // fds created by other people. |
| bool close_root_fd = false; |
| fit::deferred_action<fit::closure> cleanup; |
| if (root_fd == AT_FDCWD) { |
| root_fd = OpenDirAtCWD("/tmp"); |
| // In this case we own the root_fd |
| if (root_fd >= 0) { |
| cleanup = [root_fd]() { close(root_fd); }; |
| } |
| } |
| zx::channel root = CloneChannelFromFileDescriptor(root_fd); |
| if (root) { |
| root_dir_.Bind(std::move(root)); |
| } else { |
| delayed_status_ = |
| Status::IOError("FuchsiaEnv", "Could not get a root channel from fd"); |
| } |
| } |
| |
| void FuchsiaEnv::Schedule(void (*function)(void*), void* arg) { |
| // Start background thread if necessary |
| if (!bgloop_) { |
| bgloop_.emplace(&kAsyncLoopConfigNoAttachToCurrentThread); |
| zx_status_t status = bgloop_->StartThread("LevelDB_BackgroundThread"); |
| ZX_ASSERT(status == ZX_OK); |
| } |
| |
| async::PostTask(bgloop_->dispatcher(), |
| ([function, arg] { (*function)(arg); })); |
| } |
| |
| // Env::StartThread is only used in tests and benchmarks. |
| void FuchsiaEnv::StartThread(void (*function)(void* arg), void* arg) { |
| std::thread t([function, arg] { (*function)(arg); }); |
| t.detach(); |
| } |
| |
| } // namespace |
| |
| static std::once_flag once; |
| static Env* default_env; |
| |
| static void InitDefaultEnv() { default_env = new FuchsiaEnv(AT_FDCWD); } |
| |
| Env* Env::Default() { |
| std::call_once(once, InitDefaultEnv); |
| return default_env; |
| } |
| |
| std::unique_ptr<Env> MakeFuchsiaEnv(int root_fd) { |
| return std::make_unique<FuchsiaEnv>(root_fd); |
| } |
| |
| std::unique_ptr<Env> MakeFuchsiaEnv( |
| fidl::InterfaceHandle<fuchsia::io::Directory> root_dir) { |
| return std::make_unique<FuchsiaEnv>(std::move(root_dir)); |
| } |
| |
| } // namespace leveldb |