Fix issue 474: a race between the f*_unlocked() STDIO calls in
env_posix.cc and concurrent application calls to fflush(NULL).

The fix is to avoid using stdio in env_posix.cc but add our own
buffering where we need it.

Added a test to reproduce the bug.

Added a test for Env reads/writes.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=170738066
diff --git a/db/db_test.cc b/db/db_test.cc
index a0b08bc..edc3916 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -25,6 +25,13 @@
   return r;
 }
 
+static std::string RandomKey(Random* rnd) {
+  int len = (rnd->OneIn(3)
+             ? 1                // Short sometimes to encourage collisions
+             : (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
+  return test::RandomKey(rnd, len);
+}
+
 namespace {
 class AtomicCounter {
  private:
@@ -1394,6 +1401,15 @@
   ASSERT_EQ("(->)(c->cv)", Contents());
 }
 
+TEST(DBTest, Fflush_Issue474) {
+  static const int kNum = 100000;
+  Random rnd(test::RandomSeed());
+  for (int i = 0; i < kNum; i++) {
+    fflush(NULL);
+    ASSERT_OK(Put(RandomKey(&rnd), RandomString(&rnd, 100)));
+  }
+}
+
 TEST(DBTest, ComparatorCheck) {
   class NewComparator : public Comparator {
    public:
@@ -1959,13 +1975,6 @@
   KVMap map_;
 };
 
-static std::string RandomKey(Random* rnd) {
-  int len = (rnd->OneIn(3)
-             ? 1                // Short sometimes to encourage collisions
-             : (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
-  return test::RandomKey(rnd, len);
-}
-
 static bool CompareIterators(int step,
                              DB* model,
                              DB* db,
diff --git a/util/env_posix.cc b/util/env_posix.cc
index b21f515..1a88802 100644
--- a/util/env_posix.cc
+++ b/util/env_posix.cc
@@ -34,6 +34,8 @@
 static int open_read_only_file_limit = -1;
 static int mmap_limit = -1;
 
+static const size_t kBufSize = 65536;
+
 static Status PosixError(const std::string& context, int err_number) {
   if (err_number == ENOENT) {
     return Status::NotFound(context, strerror(err_number));
@@ -96,30 +98,32 @@
 class PosixSequentialFile: public SequentialFile {
  private:
   std::string filename_;
-  FILE* file_;
+  int fd_;
 
  public:
-  PosixSequentialFile(const std::string& fname, FILE* f)
-      : filename_(fname), file_(f) { }
-  virtual ~PosixSequentialFile() { fclose(file_); }
+  PosixSequentialFile(const std::string& fname, int fd)
+      : filename_(fname), fd_(fd) {}
+  virtual ~PosixSequentialFile() { close(fd_); }
 
   virtual Status Read(size_t n, Slice* result, char* scratch) {
     Status s;
-    size_t r = fread_unlocked(scratch, 1, n, file_);
-    *result = Slice(scratch, r);
-    if (r < n) {
-      if (feof(file_)) {
-        // We leave status as ok if we hit the end of the file
-      } else {
-        // A partial read with an error: return a non-ok status
+    while (true) {
+      ssize_t r = read(fd_, scratch, n);
+      if (r < 0) {
+        if (errno == EINTR) {
+          continue;  // Retry
+        }
         s = PosixError(filename_, errno);
+        break;
       }
+      *result = Slice(scratch, r);
+      break;
     }
     return s;
   }
 
   virtual Status Skip(uint64_t n) {
-    if (fseek(file_, n, SEEK_CUR)) {
+    if (lseek(fd_, n, SEEK_CUR) == static_cast<off_t>(-1)) {
       return PosixError(filename_, errno);
     }
     return Status::OK();
@@ -213,42 +217,64 @@
 
 class PosixWritableFile : public WritableFile {
  private:
+  // buf_[0, pos_-1] contains data to be written to fd_.
   std::string filename_;
-  FILE* file_;
+  int fd_;
+  char buf_[kBufSize];
+  size_t pos_;
 
  public:
-  PosixWritableFile(const std::string& fname, FILE* f)
-      : filename_(fname), file_(f) { }
+  PosixWritableFile(const std::string& fname, int fd)
+      : filename_(fname), fd_(fd), pos_(0) { }
 
   ~PosixWritableFile() {
-    if (file_ != NULL) {
+    if (fd_ >= 0) {
       // Ignoring any potential errors
-      fclose(file_);
+      FlushBuffered();
     }
   }
 
   virtual Status Append(const Slice& data) {
-    size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
-    if (r != data.size()) {
-      return PosixError(filename_, errno);
+    size_t n = data.size();
+    const char* p = data.data();
+
+    // Fit as much as possible into buffer.
+    size_t copy = std::min(n, kBufSize - pos_);
+    memcpy(buf_ + pos_, p, copy);
+    p += copy;
+    n -= copy;
+    pos_ += copy;
+    if (n == 0) {
+      return Status::OK();
     }
-    return Status::OK();
+
+    // Can't fit in buffer, so need to do at least one write.
+    Status s = FlushBuffered();
+    if (!s.ok()) {
+      return s;
+    }
+
+    // Small writes go to buffer, large writes are written directly.
+    if (n < kBufSize) {
+      memcpy(buf_, p, n);
+      pos_ = n;
+      return Status::OK();
+    }
+    return WriteRaw(p, n);
   }
 
   virtual Status Close() {
-    Status result;
-    if (fclose(file_) != 0) {
+    Status result = FlushBuffered();
+    const int r = close(fd_);
+    if (r < 0 && result.ok()) {
       result = PosixError(filename_, errno);
     }
-    file_ = NULL;
+    fd_ = -1;
     return result;
   }
 
   virtual Status Flush() {
-    if (fflush_unlocked(file_) != 0) {
-      return PosixError(filename_, errno);
-    }
-    return Status::OK();
+    return FlushBuffered();
   }
 
   Status SyncDirIfManifest() {
@@ -284,12 +310,36 @@
     if (!s.ok()) {
       return s;
     }
-    if (fflush_unlocked(file_) != 0 ||
-        fdatasync(fileno(file_)) != 0) {
-      s = Status::IOError(filename_, strerror(errno));
+    s = FlushBuffered();
+    if (s.ok()) {
+      if (fdatasync(fd_) != 0) {
+        s = PosixError(filename_, errno);
+      }
     }
     return s;
   }
+
+ private:
+  Status FlushBuffered() {
+    Status s = WriteRaw(buf_, pos_);
+    pos_ = 0;
+    return s;
+  }
+
+  Status WriteRaw(const char* p, size_t n) {
+    while (n > 0) {
+      ssize_t r = write(fd_, p, n);
+      if (r < 0) {
+        if (errno == EINTR) {
+          continue;  // Retry
+        }
+        return PosixError(filename_, errno);
+      }
+      p += r;
+      n -= r;
+    }
+    return Status::OK();
+  }
 };
 
 static int LockOrUnlock(int fd, bool lock) {
@@ -338,12 +388,12 @@
 
   virtual Status NewSequentialFile(const std::string& fname,
                                    SequentialFile** result) {
-    FILE* f = fopen(fname.c_str(), "r");
-    if (f == NULL) {
+    int fd = open(fname.c_str(), O_RDONLY);
+    if (fd < 0) {
       *result = NULL;
       return PosixError(fname, errno);
     } else {
-      *result = new PosixSequentialFile(fname, f);
+      *result = new PosixSequentialFile(fname, fd);
       return Status::OK();
     }
   }
@@ -379,12 +429,12 @@
   virtual Status NewWritableFile(const std::string& fname,
                                  WritableFile** result) {
     Status s;
-    FILE* f = fopen(fname.c_str(), "w");
-    if (f == NULL) {
+    int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
+    if (fd < 0) {
       *result = NULL;
       s = PosixError(fname, errno);
     } else {
-      *result = new PosixWritableFile(fname, f);
+      *result = new PosixWritableFile(fname, fd);
     }
     return s;
   }
@@ -392,12 +442,12 @@
   virtual Status NewAppendableFile(const std::string& fname,
                                    WritableFile** result) {
     Status s;
-    FILE* f = fopen(fname.c_str(), "a");
-    if (f == NULL) {
+    int fd = open(fname.c_str(), O_APPEND | O_RDWR | O_CREAT, 0644);
+    if (fd < 0) {
       *result = NULL;
       s = PosixError(fname, errno);
     } else {
-      *result = new PosixWritableFile(fname, f);
+      *result = new PosixWritableFile(fname, fd);
     }
     return s;
   }
diff --git a/util/env_test.cc b/util/env_test.cc
index 4d8ecc4..53ec923 100644
--- a/util/env_test.cc
+++ b/util/env_test.cc
@@ -6,6 +6,7 @@
 
 #include "port/port.h"
 #include "util/testharness.h"
+#include "util/testutil.h"
 
 namespace leveldb {
 
@@ -27,6 +28,55 @@
   reinterpret_cast<port::AtomicPointer*>(ptr)->NoBarrier_Store(ptr);
 }
 
+
+TEST(EnvTest, ReadWrite) {
+  Random rnd(test::RandomSeed());
+
+  // Get file to use for testing.
+  std::string test_dir;
+  ASSERT_OK(env_->GetTestDirectory(&test_dir));
+  std::string test_file_name = test_dir + "/open_on_read.txt";
+  WritableFile* wfile_tmp;
+  ASSERT_OK(env_->NewWritableFile(test_file_name, &wfile_tmp));
+  std::unique_ptr<WritableFile> wfile(wfile_tmp);
+
+  // Fill a file with data generated via a sequence of randomly sized writes.
+  static const size_t kDataSize = 10 * 1048576;
+  std::string data;
+  while (data.size() < kDataSize) {
+    int len = rnd.Skewed(18);  // Up to 2^18 - 1, but typically much smaller
+    std::string r;
+    test::RandomString(&rnd, len, &r);
+    ASSERT_OK(wfile->Append(r));
+    data += r;
+    if (rnd.OneIn(10)) {
+      ASSERT_OK(wfile->Flush());
+    }
+  }
+  ASSERT_OK(wfile->Sync());
+  ASSERT_OK(wfile->Close());
+  wfile.reset();
+
+  // Read all data using a sequence of randomly sized reads.
+  SequentialFile* rfile_tmp;
+  ASSERT_OK(env_->NewSequentialFile(test_file_name, &rfile_tmp));
+  std::unique_ptr<SequentialFile> rfile(rfile_tmp);
+  std::string read_result;
+  std::string scratch;
+  while (read_result.size() < data.size()) {
+    int len = std::min<int>(rnd.Skewed(18), data.size() - read_result.size());
+    scratch.resize(std::max(len, 1));  // at least 1 so &scratch[0] is legal
+    Slice read;
+    ASSERT_OK(rfile->Read(len, &read, &scratch[0]));
+    if (len > 0) {
+      ASSERT_GT(read.size(), 0);
+    }
+    ASSERT_LE(read.size(), len);
+    read_result.append(read.data(), read.size());
+  }
+  ASSERT_EQ(read_result, data);
+}
+
 TEST(EnvTest, RunImmediately) {
   port::AtomicPointer called (NULL);
   env_->Schedule(&SetBool, &called);