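Fix corruption bug found and analyzed by dhruba@gmail.com

When a MANIFEST write or sync reports an error, re-read the MANIFEST and,
if the log record is present anyway, treat the update as applied so that
the in-memory state does not diverge from the logged state.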

https://groups.google.com/d/msg/leveldb/Kc9JxuIUu5A/9P0N9RL4ar8J
diff --git a/Makefile b/Makefile
index fef085b..42c4952 100644
--- a/Makefile
+++ b/Makefile
@@ -69,7 +69,7 @@
 else
 # Update db.h if you change these.
 SHARED_MAJOR = 1
-SHARED_MINOR = 8
+SHARED_MINOR = 9
 SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
 SHARED2 = $(SHARED1).$(SHARED_MAJOR)
 SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
diff --git a/db/db_test.cc b/db/db_test.cc
index 74abd13..684ea3b 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -59,6 +59,12 @@
   // Simulate non-writable file system while this pointer is non-NULL
   port::AtomicPointer non_writable_;
 
+  // Force sync of manifest files to fail while this pointer is non-NULL
+  port::AtomicPointer manifest_sync_error_;
+
+  // Force write to manifest files to fail while this pointer is non-NULL
+  port::AtomicPointer manifest_write_error_;
+
   bool count_random_reads_;
   AtomicCounter random_read_counter_;
 
@@ -69,6 +75,8 @@
     no_space_.Release_Store(NULL);
     non_writable_.Release_Store(NULL);
     count_random_reads_ = false;
+    manifest_sync_error_.Release_Store(NULL);
+    manifest_write_error_.Release_Store(NULL);
   }
 
   Status NewWritableFile(const std::string& f, WritableFile** r) {
@@ -100,6 +108,30 @@
         return base_->Sync();
       }
     };
+    class ManifestFile : public WritableFile {  // Injects Append/Sync errors
+     private:
+      SpecialEnv* env_;       // Holds the manifest_*_error_ flags checked below
+      WritableFile* base_;    // Wrapped file; deleted in ~ManifestFile()
+     public:
+      ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
+      ~ManifestFile() { delete base_; }
+      Status Append(const Slice& data) {
+        if (env_->manifest_write_error_.Acquire_Load() != NULL) {
+          return Status::IOError("simulated writer error");  // base_ untouched
+        } else {
+          return base_->Append(data);
+        }
+      }
+      Status Close() { return base_->Close(); }  // Forwarded to base_
+      Status Flush() { return base_->Flush(); }  // Forwarded to base_
+      Status Sync() {
+        if (env_->manifest_sync_error_.Acquire_Load() != NULL) {
+          return Status::IOError("simulated sync error");  // base_ not synced
+        } else {
+          return base_->Sync();
+        }
+      }
+    };
 
     if (non_writable_.Acquire_Load() != NULL) {
       return Status::IOError("simulated write error");
@@ -109,6 +141,8 @@
     if (s.ok()) {
       if (strstr(f.c_str(), ".sst") != NULL) {
         *r = new SSTableFile(this, *r);
+      } else if (strstr(f.c_str(), "MANIFEST") != NULL) {
+        *r = new ManifestFile(this, *r);
       }
     }
     return s;
@@ -1492,6 +1526,47 @@
   env_->non_writable_.Release_Store(NULL);
 }
 
+TEST(DBTest, ManifestWriteError) {
+  // Test for the following problem:
+  // (a) Compaction produces file F
+  // (b) Log record containing F is written to MANIFEST file, but Sync() fails
+  // (c) GC deletes F
+  // (d) After reopening DB, reads fail since deleted F is named in log record
+
+  // Iteration 0 injects a MANIFEST Sync() failure (record was appended);
+  // iteration 1 injects an Append() failure, so the record is never written.
+  for (int iter = 0; iter < 2; iter++) {
+    port::AtomicPointer* error_type = (iter == 0)
+        ? &env_->manifest_sync_error_    // iter 0: Sync() fails
+        : &env_->manifest_write_error_;  // iter 1: Append() fails
+
+    // Insert foo=>bar mapping
+    Options options = CurrentOptions();
+    options.env = env_;
+    options.create_if_missing = true;
+    options.error_if_exists = false;
+    DestroyAndReopen(&options);
+    ASSERT_OK(Put("foo", "bar"));
+    ASSERT_EQ("bar", Get("foo"));
+
+    // Memtable compaction (will succeed)
+    dbfull()->TEST_CompactMemTable();
+    ASSERT_EQ("bar", Get("foo"));
+    const int last = config::kMaxMemCompactLevel;
+    ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo=>bar is now in last level
+
+    // Merging compaction (will fail)
+    error_type->Release_Store(env_);  // Any non-NULL value enables the error
+    dbfull()->TEST_CompactRange(last, NULL, NULL);  // Should fail
+    ASSERT_EQ("bar", Get("foo"));  // Value must still be readable
+
+    // Recovery: should not lose data
+    error_type->Release_Store(NULL);  // Stop injecting errors before reopen
+    Reopen(&options);
+    ASSERT_EQ("bar", Get("foo"));
+  }
+}
+
 TEST(DBTest, FilesDeletedAfterCompaction) {
   ASSERT_OK(Put("foo", "v2"));
   Compact("a", "z");
diff --git a/db/version_set.cc b/db/version_set.cc
index bdd4a57..7d0a5de 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -786,12 +786,23 @@
       if (s.ok()) {
         s = descriptor_file_->Sync();
       }
+      if (!s.ok()) {
+        Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
+        if (ManifestContains(record)) {
+          Log(options_->info_log,
+              "MANIFEST contains log record despite error; advancing to new "
+              "version to prevent mismatch between in-memory and logged state");
+          s = Status::OK();
+        }
+      }
     }
 
     // If we just created a new descriptor file, install it by writing a
     // new CURRENT file that points to it.
     if (s.ok() && !new_manifest_file.empty()) {
       s = SetCurrentFile(env_, dbname_, manifest_file_number_);
+      // No need to double-check MANIFEST in case of error since it
+      // will be discarded below.
     }
 
     mu->Lock();
@@ -1025,6 +1036,31 @@
   return scratch->buffer;
 }
 
+// Return true iff the MANIFEST holds "record"; false if it cannot be opened.
+bool VersionSet::ManifestContains(const std::string& record) const {
+  std::string fname = DescriptorFileName(dbname_, manifest_file_number_);
+  Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str());
+  SequentialFile* file = NULL;
+  Status s = env_->NewSequentialFile(fname, &file);
+  if (!s.ok()) {
+    Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str());
+    return false;  // Open failure is reported as "not contained"
+  }
+  log::Reader reader(file, NULL, true/*checksum*/, 0);
+  Slice r;
+  std::string scratch;
+  bool result = false;
+  while (reader.ReadRecord(&r, &scratch)) {  // Ends when ReadRecord() is false
+    if (r == Slice(record)) {
+      result = true;
+      break;  // First match is enough
+    }
+  }
+  delete file;  // Release the file opened above
+  Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0);
+  return result;
+}
+
 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
   uint64_t result = 0;
   for (int level = 0; level < config::kNumLevels; level++) {
diff --git a/db/version_set.h b/db/version_set.h
index 792899b..9d084fd 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -277,6 +277,8 @@
 
   void AppendVersion(Version* v);
 
+  bool ManifestContains(const std::string& record) const;
+
   Env* const env_;
   const std::string dbname_;
   const Options* const options_;
diff --git a/include/leveldb/db.h b/include/leveldb/db.h
index f1e70a0..29d3674 100644
--- a/include/leveldb/db.h
+++ b/include/leveldb/db.h
@@ -14,7 +14,7 @@
 
 // Update Makefile if you change these
 static const int kMajorVersion = 1;
-static const int kMinorVersion = 8;
+static const int kMinorVersion = 9;
 
 struct Options;
 struct ReadOptions;