blob: bcaedf1dece3190eda4ab5e0dbbe8c337c11ea1c [file] [log] [blame]
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Copyright 2016 The Fuchsia Authors. All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "util/env_fuchsia.h"
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <deque>
#include <set>
#include <vector>
#include <fuchsia/io/cpp/fidl.h>
#include <lib/async/cpp/task.h>
#include <lib/fdio/vfs.h>
#include <lib/fit/defer.h>
#include <lib/fsl/io/fd.h>
#include <lib/syslog/cpp/logger.h>
#include <zircon/status.h>
#include <zircon/syscalls.h>
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "util/logging.h"
#include "util/mutexlock.h"
namespace leveldb {
namespace {
// Translates a Zircon status code into a LevelDB Status tagged with |context|.
// ZX_ERR_NOT_FOUND becomes Status::NotFound so callers can tell a missing file
// apart from other failures; every other code becomes an IOError carrying the
// zx_status_get_string() description.
static Status FuchsiaZxError(const std::string& context, zx_status_t status) {
  return status == ZX_ERR_NOT_FOUND
             ? Status::NotFound(context, "File not found")
             : Status::IOError(context, zx_status_get_string(status));
}
// SyncWaiter implements a basic async dispatcher that only supports one wait at
// a time. Users of a SyncWaiter can (synchronously) let events be dispatched
// until a certain condition happens. In case of error, clients can use SetError
// to force an early exit.
// A SyncWaiter can only be used by one thread at a time. Callbacks will be
// called on the thread that currently runs the SyncWaiter. SyncWaiter is safe
// to use with fidl::InterfacePtr if the calling thread owns the pointers and
// all associated resources.
// A SyncWaiter can be neither copied nor assigned: its embedded dispatcher
// stores a pointer back to the SyncWaiter itself.
class SyncWaiter {
 public:
  // |error_context| is used as a prefix for error messages. It must outlive
  // the SyncWaiter.
  explicit SyncWaiter(const std::string* error_context)
      : dispatcher_({{&kOps}, this}),
        wait_(nullptr),
        error_context_(error_context) {}
  SyncWaiter(const SyncWaiter&) = delete;
  SyncWaiter& operator=(const SyncWaiter&) = delete;
  // A wait still registered at destruction time is notified with
  // ZX_ERR_CANCELED.
  ~SyncWaiter() {
    if (wait_ != nullptr) {
      wait_->handler(async_dispatcher(), wait_, ZX_ERR_CANCELED, nullptr);
    }
  }

  // Returns the async dispatcher associated to this object.
  async_dispatcher_t* async_dispatcher() { return &dispatcher_.dispatcher; }

  // Quit early with an error. Once a non-OK status is stored, every later
  // Run*/RunUntil* call returns it immediately.
  void SetError(Status s) { pending_error_ = s; }
  Status status() { return pending_error_; }

  // Handle events until |can_proceed()| returns true. Returns the first error
  // encountered, including one stored through SetError().
  Status RunUntil(fit::function<bool()> can_proceed) {
    // Poll once without blocking so that a pending error (or an already
    // signaled event) is handled even if |can_proceed()| is already true.
    Status s = Run(zx::time(0));
    if (!s.ok()) {
      return s;
    }
    while (!can_proceed()) {
      s = Run(zx::time::infinite());
      if (!s.ok()) {
        return s;
      }
    }
    return Status::OK();
  }

  // Handle events until |Done()| is called.
  Status RunUntilDone() {
    is_done_ = false;
    return RunUntil([this] { return is_done_; });
  }

  // Interrupts the |RunUntilDone()| loop.
  void Done() { is_done_ = true; }

 private:
  // Wait for events until |deadline| and dispatch at most one of them.
  Status Run(zx::time deadline) {
    if (!pending_error_.ok()) {
      return pending_error_;
    }
    if (!wait_) {
      // Nothing registered: a poll is trivially fine, but blocking would
      // never return.
      if (deadline == zx::time(0)) {
        return Status::OK();
      } else {
        return Status::IOError(*error_context_, "No work to do");
      }
    }
    zx_signals_t observed;
    zx_status_t wait_status = zx_object_wait_one(wait_->object, wait_->trigger,
                                                 deadline.get(), &observed);
    if (wait_status == ZX_ERR_TIMED_OUT) {
      return Status::OK();
    }
    if (wait_status != ZX_OK) {
      return FuchsiaZxError(*error_context_, wait_status);
    }
    zx_packet_signal_t signals = {wait_->trigger, observed, 1};
    // Clear the slot before dispatching: the handler may register a new wait.
    async_wait_t* wait = wait_;
    wait_ = nullptr;
    wait->handler(async_dispatcher(), wait, ZX_OK, &signals);
    return pending_error_;
  }

  zx_status_t BeginWait(async_wait_t* wait) {
    if (wait_ != nullptr) {
      return ZX_ERR_NOT_SUPPORTED;
    }
    wait_ = wait;
    return ZX_OK;
  }

  zx_status_t CancelWait(async_wait_t* wait) {
    if (wait != wait_ || wait == nullptr) {
      return ZX_ERR_NOT_FOUND;
    }
    wait_ = nullptr;
    return ZX_OK;
  }

  // Layout handed to the async library: the async_dispatcher_t comes first so
  // that a dispatcher pointer can be cast back to this struct.
  using dispatcher = struct {
    async_dispatcher_t dispatcher;
    SyncWaiter* self;
  };

  static zx_time_t now(async_dispatcher_t* dispatcher) {
    return zx_clock_get_monotonic();
  }
  static zx_status_t begin_wait(async_dispatcher_t* dispatcher,
                                async_wait_t* wait) {
    return reinterpret_cast<leveldb::SyncWaiter::dispatcher*>(dispatcher)
        ->self->BeginWait(wait);
  }
  static zx_status_t cancel_wait(async_dispatcher_t* dispatcher,
                                 async_wait_t* wait) {
    return reinterpret_cast<leveldb::SyncWaiter::dispatcher*>(dispatcher)
        ->self->CancelWait(wait);
  }
  static zx_status_t post_task(async_dispatcher_t* dispatcher,
                               async_task_t* task) {
    return ZX_ERR_NOT_SUPPORTED;
  }
  static zx_status_t cancel_task(async_dispatcher_t* dispatcher,
                                 async_task_t* task) {
    return ZX_ERR_NOT_SUPPORTED;
  }
  static zx_status_t queue_packet(async_dispatcher_t* dispatcher,
                                  async_receiver_t* receiver,
                                  const zx_packet_user_t* data) {
    return ZX_ERR_NOT_SUPPORTED;
  }
  static zx_status_t set_guest_bell_trap(async_dispatcher_t* dispatcher,
                                         async_guest_bell_trap_t* trap,
                                         zx_handle_t guest, zx_vaddr_t addr,
                                         size_t length) {
    return ZX_ERR_NOT_SUPPORTED;
  }

  constexpr static async_ops_t kOps = {
      ASYNC_OPS_V1,
      0,
      {now, begin_wait, cancel_wait, post_task, cancel_task, queue_packet,
       set_guest_bell_trap}};

  // The async dispatcher associated to this object.
  dispatcher dispatcher_;
  // A waiter set through the async dispatcher.
  async_wait_t* wait_ = nullptr;
  // Set to an error if the client wants to exit early.
  Status pending_error_ = Status::OK();
  // A context provided for errors. It must outlive the object.
  const std::string* error_context_;
  // Set by Done() to indicate that the loop should terminate.
  bool is_done_ = false;
};
// Open a filesystem node at path |name| relative to a directory |root_dir|.
// The parameters |flags| and |mode| correspond to the parameters of
// fuchsia::io::Directory::Open. The resulting node handle is returned in
// |out_handle|. This function adds the OPEN_FLAG_DESCRIBE flag to be able to
// get errors back on the channel (via the OnOpen event) and a description of
// the type of node that was opened. If opening is successful, Status::OK() is
// returned and the resulting NodeInfo structure is written to
// |out_node_info|. Otherwise, an error is returned and |out_handle| is left
// untouched.
Status OpenAndDescribe(const fuchsia::io::DirectorySyncPtr& root_dir,
const std::string& name, uint32_t flags, uint32_t mode,
fidl::InterfaceHandle<fuchsia::io::Node>* out_handle,
fuchsia::io::NodeInfo* out_node_info) {
// We want to receive the description of the object we open
flags |= fuchsia::io::OPEN_FLAG_DESCRIBE;
// NodeSyncPtr has no support for receiving OnOpen
// We use the SyncWaiter to take care of this
// |sync_waiter| is declared before |node| on purpose. |node| is bound to the
// waiter's dispatcher below, and its handlers capture |sync_waiter| by
// reference. Locals are destroyed in reverse declaration order, so |node| is
// torn down while |sync_waiter| is still alive.
SyncWaiter sync_waiter(&name);
fuchsia::io::NodePtr node;
// A channel error (e.g. peer closed) aborts the wait with that status.
node.set_error_handler([&sync_waiter, &name](zx_status_t node_status) {
sync_waiter.SetError(FuchsiaZxError(name, node_status));
});
// OnOpen reports the open result; on success it stores the NodeInfo and ends
// the RunUntilDone() loop below.
node.events().OnOpen = [&sync_waiter, &name, out_node_info](
zx_status_t open_status,
std::unique_ptr<fuchsia::io::NodeInfo> node_info) {
if (open_status != ZX_OK) {
sync_waiter.SetError(FuchsiaZxError(name, open_status));
return;
}
if (!node_info) {
sync_waiter.SetError(Status::IOError(name, "Expected a NodeInfo"));
return;
}
*out_node_info = std::move(*node_info);
sync_waiter.Done();
};
// Only reports FIDL transport errors; open errors arrive later via OnOpen.
zx_status_t fidl_status = root_dir->Open(
flags, mode, name, node.NewRequest(sync_waiter.async_dispatcher()));
if (fidl_status != ZX_OK) {
return FuchsiaZxError(name, fidl_status);
}
Status s = sync_waiter.RunUntilDone();
if (!s.ok()) {
return s;
}
// Take the channel out of |node| so the caller can keep using it after the
// local |sync_waiter| is gone.
*out_handle = node.Unbind();
return Status::OK();
}
// Opens the node at path |fname| relative to |root_dir| and checks that it is
// a file. On success the channel is rebound as a fuchsia::io::File into
// |*file|. If |file| is null, the node is only checked and the handle is
// dropped.
Status OpenFileAt(const fuchsia::io::DirectorySyncPtr& root_dir,
                  const std::string& fname, uint32_t flags,
                  fuchsia::io::FileSyncPtr* file) {
  fidl::InterfaceHandle<fuchsia::io::Node> handle;
  fuchsia::io::NodeInfo info;
  Status open_status = OpenAndDescribe(
      root_dir, fname, flags, fuchsia::io::MODE_TYPE_FILE, &handle, &info);
  if (!open_status.ok()) {
    return open_status;
  }
  if (!info.is_file()) {
    return FuchsiaZxError(fname, ZX_ERR_NOT_FILE);
  }
  if (file == nullptr) {
    return Status::OK();
  }
  // The node speaks fuchsia.io.File: move its channel into the typed pointer.
  file->Bind(handle.TakeChannel());
  return Status::OK();
}
// Opens the node at path |dname| relative to |root_dir| as a directory.
// OPEN_FLAG_DIRECTORY is always added to |flags|, so a successful open yields
// a directory. On success the channel is bound to |*dir|, unless |dir| is
// null.
Status OpenDirAt(const fuchsia::io::DirectorySyncPtr& root_dir,
                 const std::string& dname, uint32_t flags,
                 fuchsia::io::DirectorySyncPtr* dir) {
  const uint32_t open_flags = flags | fuchsia::io::OPEN_FLAG_DIRECTORY;
  fuchsia::io::NodeInfo info;
  fidl::InterfaceHandle<fuchsia::io::Node> handle;
  Status open_status =
      OpenAndDescribe(root_dir, dname, open_flags,
                      fuchsia::io::MODE_TYPE_DIRECTORY, &handle, &info);
  if (open_status.ok() && dir != nullptr) {
    // Reinterpret the Node channel as a Directory channel.
    dir->Bind(handle.TakeChannel());
  }
  return open_status;
}
// Logger that appends each message as one line to a WritableFile. Each line is
// prefixed with the microsecond field of the current time and the calling
// thread's id. The file is flushed after every message.
class FuchsiaFileLogger : public Logger {
 private:
  std::unique_ptr<WritableFile> file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

 public:
  FuchsiaFileLogger(std::unique_ptr<WritableFile> f, uint64_t (*gettid)())
      : file_(std::move(f)), gettid_(gettid) {}

  void Logv(const char* format, va_list ap) override {
    const uint64_t thread_id = (*gettid_)();
    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger heap buffer. The heap buffer is
    // owned by a vector, so it is released on every path.
    char stack_buffer[500];
    std::vector<char> heap_buffer;
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      char* limit;
      if (iter == 0) {
        base = stack_buffer;
        limit = base + sizeof(stack_buffer);
      } else {
        heap_buffer.resize(30000);
        base = heap_buffer.data();
        limit = base + heap_buffer.size();
      }
      char* p = base;
      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      p = Advance(p, limit,
                  snprintf(p, limit - p, "%06d %llx ",
                           static_cast<int>(now_tv.tv_usec),
                           static_cast<long long unsigned int>(thread_id)));
      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p = Advance(p, limit, vsnprintf(p, limit - p, format, backup_ap));
        va_end(backup_ap);
      }
      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with larger buffer
        }
        p = limit - 1;
      }
      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }
      assert(p <= limit);
      file_->Append(Slice(base, p - base));
      file_->Flush();
      break;
    }
  }

 private:
  // Advances |p| by |written|, the return value of an snprintf-family call
  // that was given |limit - p| bytes. A truncated write (written >= room)
  // moves |p| to |limit| so callers can detect it. A negative (error) result
  // leaves |p| unchanged instead of moving it backwards.
  static char* Advance(char* p, char* limit, int written) {
    if (written <= 0) {
      return p;
    }
    const auto room = limit - p;
    return written < room ? p + written : limit;
  }
};
// RandomAccessFile that also reports the length of the file it reads. This
// lets SequentialFileWrapper know where the file ends.
class RandomAccessFileWithSize : public RandomAccessFile {
 public:
  // Length of the underlying file, in bytes.
  virtual size_t size() = 0;
};
// Presents a RandomAccessFileWithSize through the SequentialFile interface by
// keeping a read cursor. The file length is captured once, at construction.
class SequentialFileWrapper : public SequentialFile {
 public:
  SequentialFileWrapper(std::unique_ptr<RandomAccessFileWithSize> file)
      : file_size_(file->size()), position_(0), file_(std::move(file)) {}
  ~SequentialFileWrapper() override {}

  // Moves the cursor forward by |n| bytes. It is clamped to the end of the
  // file, including when position_ + n would wrap around. Never fails.
  Status Skip(uint64_t n) override {
    const bool wraps = position_ + n < position_;
    const bool past_end = position_ + n > file_size_;
    position_ = (wraps || past_end) ? file_size_ : position_ + n;
    return Status::OK();
  }

  // Reads up to |n| bytes at the cursor. The cursor advances by the number of
  // bytes actually returned in |result|.
  Status Read(size_t n, Slice* result, char* scratch) override {
    Status s = file_->Read(position_, n, result, scratch);
    if (s.ok()) {
      position_ += result->size();
    }
    return s;
  }

 private:
  // Declaration order matters: file_size_ is initialized from |file| before
  // file_ takes ownership of it.
  // The total size of the file.
  size_t file_size_;
  // The current position in the file.
  size_t position_;
  // The underlying random-access file.
  std::unique_ptr<RandomAccessFileWithSize> file_;
};
// A RandomAccessFile based on FIDL IO
class FuchsiaRandomAccessFile : public RandomAccessFileWithSize {
private:
std::string filename_;
fuchsia::io::FileSyncPtr file_;
size_t size_;
public:
FuchsiaRandomAccessFile(const std::string& fname,
fuchsia::io::FileSyncPtr file, size_t size)
: filename_(fname), file_(std::move(file)), size_(size) {}
~FuchsiaRandomAccessFile() override {
zx_status_t out_s;
file_->Close(&out_s);
}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
// We can't receive more than MAX_BUF bytes over the FIDL channel, so we
// have to do a manual pagination.
size_t bytes_read = 0;
while (bytes_read < n) {
size_t step = std::min(n - bytes_read, fuchsia::io::MAX_BUF);
zx_status_t read_status;
std::vector<uint8_t> read_data;
zx_status_t fidl_status =
file_->ReadAt(step, offset + bytes_read, &read_status, &read_data);
if (fidl_status != ZX_OK) {
return FuchsiaZxError(filename_, fidl_status);
}
if (read_status != ZX_OK) {
return FuchsiaZxError(filename_, read_status);
}
if (read_data.size() > step) {
return Status::IOError(filename_, "Received more bytes than read");
}
if (read_data.size() == 0) {
break;
}
memcpy(scratch + bytes_read, reinterpret_cast<char*>(read_data.data()),
read_data.size());
bytes_read += read_data.size();
}
*result = Slice(scratch, bytes_read);
return Status::OK();
}
size_t size() override { return size_; }
};
// RandomAccessFile that reads directly from a VMO holding the file contents.
class FuchsiaVmoFile : public RandomAccessFileWithSize {
 public:
  // Wraps |vmo|, which holds the contents of the file at path |fname|. The
  // file is |size| bytes long.
  FuchsiaVmoFile(const std::string& fname, zx::vmo vmo, size_t size)
      : filename_(fname), vmo_(std::move(vmo)), size_(size) {}

  size_t size() override { return size_; }

  // Copies up to |n| bytes at |offset| into |scratch|. The read is clipped to
  // the recorded file size, so an offset at or past the end yields an empty
  // slice.
  Status Read(uint64_t offset, size_t n, Slice* result,
              char* scratch) const override {
    size_t available = 0;
    if (offset <= size_) {
      available = size_ - offset;
    }
    const size_t to_read = n < available ? n : available;
    const zx_status_t status = vmo_.read(scratch, offset, to_read);
    if (status != ZX_OK) {
      return FuchsiaZxError(filename_, status);
    }
    *result = Slice(scratch, to_read);
    return Status::OK();
  }

 private:
  std::string filename_;
  zx::vmo vmo_;
  size_t size_;
};
// Wraps |file| in a RandomAccessFileWithSize. If the server hands out a
// readable VMO, a FuchsiaVmoFile is used; otherwise the function falls back to
// a ReadAt-based FuchsiaRandomAccessFile. Both report the content_size
// returned by GetAttr. On success the result is stored in |out| and
// Status::OK() is returned.
Status BuildRandomAccessFile(const std::string& filename,
                             fuchsia::io::FileSyncPtr file,
                             std::unique_ptr<RandomAccessFileWithSize>* out) {
  fuchsia::io::NodeAttributes attrs;
  zx_status_t attr_status;
  zx_status_t call_status = file->GetAttr(&attr_status, &attrs);
  if (call_status != ZX_OK) {
    return FuchsiaZxError(filename, call_status);
  }
  if (attr_status != ZX_OK) {
    return FuchsiaZxError(filename, attr_status);
  }

  zx::vmo vmo;
  zx_status_t vmo_status;
  call_status =
      file->GetVmo(fuchsia::io::OPEN_RIGHT_READABLE, &vmo_status, &vmo);
  if (call_status != ZX_OK) {
    return FuchsiaZxError(filename, call_status);
  }

  const bool have_vmo = vmo_status == ZX_OK && vmo;
  if (!have_vmo) {
    *out = std::make_unique<FuchsiaRandomAccessFile>(filename, std::move(file),
                                                     attrs.content_size);
    return Status::OK();
  }
  *out = std::make_unique<FuchsiaVmoFile>(filename, std::move(vmo),
                                          attrs.content_size);
  return Status::OK();
}
// WritableFile that handles asynchronous writes. Writes are sent as soon as the
// buffer is full, and the number of pending writes is tracked. Flush ensures
// that all writes have received an answer. The implementation assumes that
// there will not be partial writes.
class FuchsiaWritableFile : public WritableFile {
 public:
  FuchsiaWritableFile(const fuchsia::io::DirectorySyncPtr& root,
                      const std::string& fname,
                      fidl::InterfaceHandle<fuchsia::io::File> f)
      : filename_(fname), sync_waiter_(&filename_) {
    file_.Bind(std::move(f), sync_waiter_.async_dispatcher());
    file_.set_error_handler([this](zx_status_t status) {
      sync_waiter_.SetError(FuchsiaZxError(filename_, status));
      file_ = nullptr;
    });
    buf_.reserve(kBufSize);
    // Keep a handle to the parent directory if the basename starts with
    // MANIFEST. This is necessary to call Sync() on it in Sync().
    const char* fbuf = filename_.c_str();
    const char* sep = strrchr(fbuf, '/');
    Slice basename;
    std::string dir;
    if (sep == nullptr) {
      dir = ".";
      basename = fbuf;
    } else {
      dir = std::string(fbuf, sep - fbuf);
      basename = sep + 1;
    }
    if (basename.starts_with("MANIFEST")) {
      Status s = OpenDirAt(root, dir, 0, &parent_dir_);
      if (!s.ok()) {
        // Store the error in the SyncWaiter so the next operation reports it.
        sync_waiter_.SetError(s);
      }
    }
  }

  ~FuchsiaWritableFile() override {
    if (file_) {
      Status s = Close();
      if (!s.ok()) {
        FXL_LOG(WARNING) << "Error when closing " << filename_
                         << " in ~FuchsiaWritableFile: " << s.ToString();
      }
    }
  }

  Status Append(const Slice& d) override {
    if (!sync_waiter_.status().ok()) {
      return sync_waiter_.status();
    }
    Slice data = d;
    // Copy pieces of data to the buffer and send the buffer. There is no point
    // in trying to avoid going through the buffer, since we need to either move
    // or copy the storage into the VectorPtr when writing.
    while (data.size() > 0) {
      const size_t write_size = std::min(kBufSize - buf_.size(), data.size());
      // Grow first, then copy into the new tail. This avoids dereferencing
      // buf_.end(), which is undefined behavior.
      const size_t old_size = buf_.size();
      buf_.resize(old_size + write_size);
      memcpy(buf_.data() + old_size, data.data(), write_size);
      data.remove_prefix(write_size);
      if (buf_.size() >= kBufSize) {
        Send();
      }
    }
    return Status::OK();
  }

  // Flushes pending data and closes the file and, for MANIFEST files, the
  // parent directory handle. Closing an already-closed file is a no-op.
  Status Close() override {
    if (!sync_waiter_.status().ok()) {
      return sync_waiter_.status();
    }
    if (!file_) {
      return Status::OK();
    }
    Status s = Flush();
    if (!s.ok()) {
      return s;
    }
    file_->Close([this](zx_status_t status) {
      if (status != ZX_OK) {
        sync_waiter_.SetError(FuchsiaZxError(filename_, status));
      } else {
        sync_waiter_.Done();
      }
    });
    s = sync_waiter_.RunUntilDone();
    if (!s.ok()) {
      return s;
    }
    // The file is now closed. Drop the channel before touching the parent
    // directory so that a failure below does not make the destructor close
    // the file a second time.
    file_ = nullptr;
    if (parent_dir_) {
      zx_status_t fidl_status, close_status;
      fidl_status = parent_dir_->Close(&close_status);
      if (fidl_status != ZX_OK) {
        return FuchsiaZxError(filename_, fidl_status);
      }
      if (close_status != ZX_OK) {
        return FuchsiaZxError(filename_, close_status);
      }
    }
    return Status::OK();
  }

  // Sends any buffered bytes and waits until every outstanding write has been
  // acknowledged.
  Status Flush() override {
    if (!sync_waiter_.status().ok()) {
      return sync_waiter_.status();
    }
    if (buf_.size() > 0) {
      Send();
    }
    return sync_waiter_.RunUntil([this] { return pending_writes_ == 0; });
  }

  Status Sync() override {
    if (!sync_waiter_.status().ok()) {
      return sync_waiter_.status();
    }
    // Ensure new files referred to by the manifest are in the filesystem.
    if (parent_dir_) {
      zx_status_t sync_status;
      zx_status_t fidl_status = parent_dir_->Sync(&sync_status);
      if (fidl_status != ZX_OK) {
        return FuchsiaZxError(filename_, fidl_status);
      }
      if (sync_status != ZX_OK) {
        return FuchsiaZxError(filename_, sync_status);
      }
    }
    Status s = Flush();
    if (!s.ok()) {
      return s;
    }
    file_->Sync([this](zx_status_t status) {
      if (status != ZX_OK) {
        sync_waiter_.SetError(FuchsiaZxError(filename_, status));
      } else {
        sync_waiter_.Done();
      }
    });
    return sync_waiter_.RunUntilDone();
  }

 private:
  // Sends the whole buffer as one asynchronous Write and starts a new buffer.
  void Send() {
    size_t write_size = buf_.size();
    pending_writes_++;
    file_->Write(fidl::VectorPtr<uint8_t>(std::move(buf_)),
                 [this, write_size](zx_status_t status, size_t actual) {
                   if (status != ZX_OK) {
                     sync_waiter_.SetError(FuchsiaZxError(filename_, status));
                   } else {
                     // Assume all writes are full.
                     FXL_DCHECK(write_size == actual);
                     pending_writes_--;
                   }
                 });
    buf_.clear();
    buf_.reserve(kBufSize);
  }

  // The size of the write buffer. This is the maximum size that can be sent in
  // a Write message.
  constexpr static size_t kBufSize = fuchsia::io::MAX_BUF;

  // Members are destroyed in reverse declaration order. file_ is bound to
  // sync_waiter_'s dispatcher, and sync_waiter_ keeps a pointer to filename_.
  // The order below therefore guarantees file_ is torn down while its
  // dispatcher still exists.

  // The name of the file, for reporting errors.
  std::string filename_;
  // A SyncWaiter to handle asynchronous responses.
  SyncWaiter sync_waiter_;
  // A handle on the file.
  fuchsia::io::FilePtr file_;
  // For MANIFESTs, keep a handle on the parent directory for syncing.
  fuchsia::io::DirectorySyncPtr parent_dir_;
  // The buffer of bytes to send. Its capacity is kept at |kBufSize|.
  std::vector<uint8_t> buf_;
  // The number of sent but unanswered writes.
  size_t pending_writes_ = 0;
};
// FileLock returned by FuchsiaEnv::LockFile. It only records the locked path;
// the owning FuchsiaEnv tracks the lock itself in its LockTable.
class FuchsiaFileLock : public FileLock {
 public:
  explicit FuchsiaFileLock(std::string fname) : name_(std::move(fname)) {}
  // Path that was passed to LockFile.
  const std::string& name() const { return name_; }

 private:
  const std::string name_;
};
// Mutex-protected set of file names currently locked through one FuchsiaEnv.
// It guards against the same process opening a database twice through that
// Env; it does not coordinate with other processes.
class LockTable {
 private:
  port::Mutex mu_;
  std::set<std::string> locked_files_ GUARDED_BY(mu_);

 public:
  // Marks |fname| as locked. Returns false if it was already locked.
  bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) {
    MutexLock guard(&mu_);
    const bool newly_added = locked_files_.insert(fname).second;
    return newly_added;
  }

  // Releases |fname|. Releasing a name that is not locked is a no-op.
  void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) {
    MutexLock guard(&mu_);
    locked_files_.erase(fname);
  }
};
class FuchsiaEnv : public Env {
public:
FuchsiaEnv(fidl::InterfaceHandle<fuchsia::io::Directory> root_channel);
FuchsiaEnv(int root_fd);
~FuchsiaEnv() override {
if (root_dir_) {
int32_t out_s;
root_dir_->Close(&out_s);
}
if (this == Env::Default()) {
char msg[] = "Destroying Env::Default()\n";
fwrite(msg, 1, sizeof(msg), stderr);
abort();
}
}
Status NewSequentialFile(const std::string& fname,
SequentialFile** result) override {
std::unique_ptr<SequentialFile> sequential_file;
Status s = NewSequentialFile(fname, &sequential_file);
*result = sequential_file.release();
return s;
}
Status NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result) override {
std::unique_ptr<RandomAccessFileWithSize> random_access_file;
Status s = NewRandomAccessFile(fname, &random_access_file);
*result = random_access_file.release();
return s;
}
Status NewWritableFile(const std::string& fname,
WritableFile** result) override {
std::unique_ptr<WritableFile> writable_file;
Status s =
NewWritableFile(fname, fuchsia::io::OPEN_FLAG_TRUNCATE, &writable_file);
*result = writable_file.release();
return s;
}
Status NewAppendableFile(const std::string& fname,
WritableFile** result) override {
std::unique_ptr<WritableFile> writable_file;
Status s =
NewWritableFile(fname, fuchsia::io::OPEN_FLAG_APPEND, &writable_file);
*result = writable_file.release();
return s;
}
bool FileExists(const std::string& fname) override {
if (!delayed_status_.ok()) {
return false;
}
fidl::InterfaceHandle<fuchsia::io::Node> node_handle;
fuchsia::io::NodeInfo node_info;
return OpenAndDescribe(root_dir_, fname,
fuchsia::io::OPEN_FLAG_NODE_REFERENCE, 0,
&node_handle, &node_info)
.ok();
}
Status GetChildren(const std::string& dir,
std::vector<std::string>* result) override {
if (!delayed_status_.ok()) {
return delayed_status_;
}
fuchsia::io::DirectorySyncPtr d;
result->clear();
Status s = OpenDirAt(root_dir_, dir, fuchsia::io::OPEN_RIGHT_READABLE, &d);
if (!s.ok()) {
return s;
}
std::vector<uint8_t> buf;
zx_status_t fidl_status;
zx_status_t read_status;
vdirent_t entry;
while ((fidl_status = d->ReadDirents(fuchsia::io::MAX_BUF, &read_status,
&buf)) == ZX_OK &&
read_status == ZX_OK && buf.size() > 0) {
size_t offset = 0;
while (offset + sizeof(vdirent_t) <= buf.size()) {
memcpy(&entry, buf.data() + offset, sizeof(vdirent_t));
if (sizeof(vdirent_t) + entry.size > buf.size() - offset) {
break;
}
result->push_back(
std::string((char*)buf.data() + offset + sizeof(vdirent_t),
(size_t)entry.size));
offset += sizeof(vdirent_t) + entry.size;
}
if (offset != buf.size()) {
return Status::IOError(dir, "Malformed dirent");
}
}
if (fidl_status != ZX_OK) {
return FuchsiaZxError(dir, fidl_status);
}
if (read_status != ZX_OK) {
return FuchsiaZxError(dir, read_status);
}
return Status::OK();
}
Status DeleteFile(const std::string& fname) override {
if (!delayed_status_.ok()) {
return delayed_status_;
}
fuchsia::io::DirectorySyncPtr d;
std::string basename;
Status s = FindFileInDirectory(fname, &d, &basename);
if (!s.ok()) {
return s;
}
zx_status_t delete_status;
zx_status_t fidl_status = d->Unlink(basename, &delete_status);
if (fidl_status != ZX_OK) {
return FuchsiaZxError(fname, fidl_status);
}
if (delete_status != ZX_OK) {
return FuchsiaZxError(fname, delete_status);
}
return Status::OK();
}
Status CreateDir(const std::string& name) override {
if (!delayed_status_.ok()) {
return delayed_status_;
}
return OpenDirAt(
root_dir_, name,
fuchsia::io::OPEN_FLAG_CREATE | fuchsia::io::OPEN_FLAG_CREATE_IF_ABSENT,
nullptr);
}
Status DeleteDir(const std::string& name) override {
return DeleteFile(name);
}
Status GetFileSize(const std::string& fname, uint64_t* size) override {
if (!delayed_status_.ok()) {
return delayed_status_;
}
fidl::InterfaceHandle<fuchsia::io::Node> node_handle;
fuchsia::io::NodeInfo node_info;
Status s =
OpenAndDescribe(root_dir_, fname, fuchsia::io::OPEN_FLAG_NODE_REFERENCE,
0, &node_handle, &node_info);
if (!s.ok()) {
return s;
}
fuchsia::io::NodeSyncPtr node;
node.Bind(std::move(node_handle));
zx_status_t attr_status;
fuchsia::io::NodeAttributes node_attributes;
zx_status_t fidl_status = node->GetAttr(&attr_status, &node_attributes);
if (fidl_status != ZX_OK) {
return FuchsiaZxError(fname, fidl_status);
}
if (attr_status != ZX_OK) {
return FuchsiaZxError(fname, attr_status);
}
*size = node_attributes.content_size;
return Status::OK();
}
// Assuming the target is a non-existing file (not a directory)
Status RenameFile(const std::string& src,
const std::string& target) override {
if (!delayed_status_.ok()) {
return delayed_status_;
}
fuchsia::io::DirectorySyncPtr src_dir;
std::string src_basename;
Status result = FindFileInDirectory(src, &src_dir, &src_basename);
if (!result.ok()) {
return result;
}
fuchsia::io::DirectorySyncPtr target_dir;
std::string target_basename;
result = FindFileInDirectory(target, &target_dir, &target_basename);
if (!result.ok()) {
return result;
}
zx_status_t token_status;
zx::handle target_dir_token;
zx_status_t fidl_status =
target_dir->GetToken(&token_status, &target_dir_token);
if (fidl_status != ZX_OK) {
return FuchsiaZxError(target, fidl_status);
}
if (token_status != ZX_OK) {
return FuchsiaZxError(target, token_status);
}
zx_status_t rename_status;
fidl_status = src_dir->Rename(src_basename, std::move(target_dir_token),
target_basename, &rename_status);
if (fidl_status != ZX_OK) {
return FuchsiaZxError(target, fidl_status);
}
if (rename_status != ZX_OK) {
return FuchsiaZxError(target, rename_status);
}
return Status::OK();
}
Status LockFile(const std::string& fname, FileLock** result) override {
std::unique_ptr<FileLock> lock;
Status s = LockFile(fname, &lock);
*result = lock.release();
return s;
}
Status UnlockFile(FileLock* lock) override {
return UnlockFile(std::unique_ptr<FuchsiaFileLock>(
reinterpret_cast<FuchsiaFileLock*>(lock)));
}
void Schedule(void (*function)(void*), void* arg) override;
void StartThread(void (*function)(void* arg), void* arg) override;
Status GetTestDirectory(std::string* result) override {
if (!delayed_status_.ok()) {
return delayed_status_;
}
const char* env = getenv("TEST_TMPDIR");
if (env && env[0] != '\0') {
*result = env;
} else {
char buf[100];
snprintf(buf, sizeof(buf), "leveldbtest-%d", int(geteuid()));
*result = buf;
}
// Directory may already exist
CreateDir(*result);
return Status::OK();
}
Status NewLogger(const std::string& fname, Logger** result) override {
std::unique_ptr<Logger> logger;
Status s = NewLogger(fname, &logger);
*result = logger.release();
return s;
}
uint64_t NowMicros() override {
struct timeval tv;
gettimeofday(&tv, nullptr);
return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
}
void SleepForMicroseconds(int micros) override { usleep(micros); }
private:
Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result) {
if (!delayed_status_.ok()) {
return delayed_status_;
}
std::unique_ptr<RandomAccessFileWithSize> file;
Status s = NewRandomAccessFile(fname, &file);
if (!s.ok()) {
return s;
}
*result = std::make_unique<SequentialFileWrapper>(std::move(file));
return s;
}
Status NewRandomAccessFile(
const std::string& fname,
std::unique_ptr<RandomAccessFileWithSize>* result) {
if (!delayed_status_.ok()) {
return delayed_status_;
}
fuchsia::io::FileSyncPtr file;
Status s =
OpenFileAt(root_dir_, fname, fuchsia::io::OPEN_RIGHT_READABLE, &file);
if (!s.ok()) {
return s;
}
return BuildRandomAccessFile(fname, std::move(file), result);
}
Status NewWritableFile(const std::string& fname, uint32_t extra_flags,
std::unique_ptr<WritableFile>* result) {
if (!delayed_status_.ok()) {
return delayed_status_;
}
fuchsia::io::FileSyncPtr file;
Status s = OpenFileAt(root_dir_, fname,
fuchsia::io::OPEN_RIGHT_WRITABLE |
fuchsia::io::OPEN_FLAG_CREATE | extra_flags,
&file);
if (s.ok()) {
*result = std::make_unique<FuchsiaWritableFile>(root_dir_, fname,
std::move(file));
}
return s;
}
// Our implementation uses the |locks_| table to guard against creating
// multiple databases backed by the same file within one process. This does
// not guard against multiple processes using the same file concurrently.
Status LockFile(const std::string& fname, std::unique_ptr<FileLock>* lock) {
if (!delayed_status_.ok()) {
return delayed_status_;
}
Status result;
if (!locks_.Insert(fname)) {
result = Status::IOError("lock " + fname, "already held by process");
} else {
*lock = std::make_unique<FuchsiaFileLock>(fname);
}
return result;
}
Status UnlockFile(std::unique_ptr<FuchsiaFileLock> lock) {
if (!delayed_status_.ok()) {
return delayed_status_;
}
locks_.Remove(lock->name());
return Status::OK();
}
Status NewLogger(const std::string& fname, std::unique_ptr<Logger>* result) {
if (!delayed_status_.ok()) {
return delayed_status_;
}
std::unique_ptr<WritableFile> file;
Status status =
NewWritableFile(fname, fuchsia::io::OPEN_FLAG_TRUNCATE, &file);
if (!status.ok()) {
return status;
}
*result = std::make_unique<FuchsiaFileLogger>(std::move(file),
&FuchsiaEnv::gettid);
return Status::OK();
}
void PthreadCall(const char* label, int result) {
if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
abort();
}
}
// BGThread() is the body of the background thread
void BGThread();
static void* BGThreadWrapper(void* arg) {
reinterpret_cast<FuchsiaEnv*>(arg)->BGThread();
return nullptr;
}
Status FindFileInDirectory(const std::string& fname,
fuchsia::io::DirectorySyncPtr* d,
std::string* basename) {
if (!delayed_status_.ok()) {
return delayed_status_;
}
const char* f = fname.c_str();
const char* sep = strrchr(f, '/');
std::string dir;
if (sep == nullptr) {
dir = ".";
*basename = f;
} else {
dir = std::string(f, sep - f);
*basename = sep + 1;
}
return OpenDirAt(root_dir_, dir, 0, d);
}
static uint64_t gettid() {
pthread_t tid = pthread_self();
uint64_t thread_id = 0;
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
return thread_id;
}
// Root directory used by this environment
fuchsia::io::DirectorySyncPtr root_dir_;
// Delayed construction errors
Status delayed_status_;
pthread_mutex_t mu_;
pthread_cond_t bgsignal_;
pthread_t bgthread_;
bool started_bgthread_;
// Entry per Schedule() call
struct BGItem {
void* arg;
void (*function)(void*);
};
typedef std::deque<BGItem> BGQueue;
BGQueue queue_;
LockTable locks_;
};
// Uses |root_channel| as the root directory for all paths. Aborts if the handle
// is invalid.
FuchsiaEnv::FuchsiaEnv(
    fidl::InterfaceHandle<fuchsia::io::Directory> root_channel)
    : started_bgthread_(false) {
  // started_bgthread_ must be initialized explicitly: Schedule() reads it to
  // decide whether the background thread still has to be created.
  if (!root_channel) {
    abort();
  }
  root_dir_.Bind(std::move(root_channel));
  PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
  PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
}
// Opens |name| with O_PATH, resolving relative names against the current
// working directory. Returns the new descriptor, or -1 on failure.
int OpenDirAtCWD(std::string name) {
  return openat(AT_FDCWD, name.c_str(), O_PATH);
}
// Uses the directory open at |root_fd| as the root for all paths. Passing
// AT_FDCWD selects /tmp instead. Aborts on any other negative descriptor.
// Failure to obtain a channel is not fatal: it is stored in |delayed_status_|
// and reported by every later operation.
FuchsiaEnv::FuchsiaEnv(int root_fd) : started_bgthread_(false) {
  if (root_fd != AT_FDCWD && root_fd < 0) {
    abort();
  }
  // The file descriptor obtained with openat(AT_FDCWD, ".", ...) cannot be
  // converted to a channel. When the client does not provide this, we choose
  // to create the database in /tmp.
  // Some components do not have access to /tmp, but we still need to be able
  // to create a default object because the default constructor for
  // leveldb::Option needs it. We store an error in delayed_status_ so that
  // this instance fails when asked to do any work.
  // If we opened a fd ourselves, |cleanup| closes it when the constructor
  // returns; fds created by other people are never closed here.
  fit::deferred_action<fit::closure> cleanup;
  if (root_fd == AT_FDCWD) {
    root_fd = OpenDirAtCWD("/tmp");
    // In this case we own the root_fd
    if (root_fd >= 0) {
      cleanup = [root_fd]() { close(root_fd); };
    }
  }
  // Only try to clone a valid descriptor; a failed open of /tmp leaves
  // |root| invalid and takes the delayed-error path below.
  zx::channel root;
  if (root_fd >= 0) {
    root = fsl::CloneChannelFromFileDescriptor(root_fd);
  }
  if (root) {
    root_dir_.Bind(std::move(root));
  } else {
    delayed_status_ =
        Status::IOError("FuchsiaEnv", "Could not get a root channel from fd");
  }
  PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
  PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
}
void FuchsiaEnv::Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
// Start background thread if necessary
if (!started_bgthread_) {
started_bgthread_ = true;
PthreadCall("create thread",
pthread_create(&bgthread_, nullptr,
&FuchsiaEnv::BGThreadWrapper, this));
}
// If the queue is currently empty, the background thread may currently be
// waiting.
if (queue_.empty()) {
PthreadCall("signal", pthread_cond_signal(&bgsignal_));
}
// Add to priority queue
queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
void FuchsiaEnv::BGThread() {
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty()) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
(*function)(arg);
}
}
// Heap-allocated bundle handed to a thread created by FuchsiaEnv::StartThread.
struct StartThreadState {
  void (*user_function)(void*);  // Entry point supplied by the caller.
  void* arg;                     // Argument forwarded to |user_function|.
};

// pthread entry point. It takes ownership of |arg| (a StartThreadState), runs
// the user function, and then frees the state.
static void* StartThreadWrapper(void* arg) {
  auto* const state = static_cast<StartThreadState*>(arg);
  (*state->user_function)(state->arg);
  delete state;
  return nullptr;
}
// Starts a new thread that runs |function(arg)|. The thread handle is not kept
// anywhere, so the thread can never be joined. It is therefore detached, which
// lets its resources be reclaimed when it exits instead of leaking.
void FuchsiaEnv::StartThread(void (*function)(void* arg), void* arg) {
  StartThreadState* state = new StartThreadState;
  state->user_function = function;
  state->arg = arg;
  pthread_t t;
  PthreadCall("start thread",
              pthread_create(&t, nullptr, &StartThreadWrapper, state));
  PthreadCall("detach thread", pthread_detach(t));
}
} // namespace
// Process-wide default Env. It is created exactly once, on first use, through
// pthread_once, and it is intentionally never freed. It is built with
// AT_FDCWD, so it is rooted at /tmp.
static Env* default_env;
static pthread_once_t once = PTHREAD_ONCE_INIT;

static void InitDefaultEnv() {
  default_env = new FuchsiaEnv(AT_FDCWD);
}

Env* Env::Default() {
  pthread_once(&once, &InitDefaultEnv);
  return default_env;
}
// Creates an Env rooted at the directory open at |root_fd|, or at /tmp when
// |root_fd| is AT_FDCWD. The Env never closes a descriptor supplied by the
// caller.
std::unique_ptr<Env> MakeFuchsiaEnv(int root_fd) {
  std::unique_ptr<Env> env = std::make_unique<FuchsiaEnv>(root_fd);
  return env;
}

// Creates an Env rooted at the directory channel |root_dir|.
std::unique_ptr<Env> MakeFuchsiaEnv(
    fidl::InterfaceHandle<fuchsia::io::Directory> root_dir) {
  std::unique_ptr<Env> env =
      std::make_unique<FuchsiaEnv>(std::move(root_dir));
  return env;
}
} // namespace leveldb