leveldb: fix graceful close (fixes #148)
diff --git a/leveldb/cache/cache.go b/leveldb/cache/cache.go
index a287d0e..f28e1bd 100644
--- a/leveldb/cache/cache.go
+++ b/leveldb/cache/cache.go
@@ -511,18 +511,12 @@
}
}
-// Close closes the 'cache map' and releases all 'cache node'.
+// Close closes the 'cache map' and forcefully releases all 'cache node'.
func (r *Cache) Close() error {
r.mu.Lock()
if !r.closed {
r.closed = true
- if r.cacher != nil {
- if err := r.cacher.Close(); err != nil {
- return err
- }
- }
-
h := (*mNode)(r.mHead)
h.initBuckets()
@@ -541,10 +535,37 @@
for _, f := range n.onDel {
f()
}
+ n.onDel = nil
}
}
}
r.mu.Unlock()
+
+ // Avoid deadlock.
+ if r.cacher != nil {
+ if err := r.cacher.Close(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but
+// unlike Close it doesn't forcefully releases 'cache node'.
+func (r *Cache) CloseWeak() error {
+ r.mu.Lock()
+ if !r.closed {
+ r.closed = true
+ }
+ r.mu.Unlock()
+
+ // Avoid deadlock.
+ if r.cacher != nil {
+ r.cacher.EvictAll()
+ if err := r.cacher.Close(); err != nil {
+ return err
+ }
+ }
return nil
}
diff --git a/leveldb/db_test.go b/leveldb/db_test.go
index 37d08eb..16c0c14 100644
--- a/leveldb/db_test.go
+++ b/leveldb/db_test.go
@@ -2827,3 +2827,82 @@
t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel())
}
}
+
+func TestDB_GracefulClose(t *testing.T) {
+ runtime.GOMAXPROCS(4)
+ h := newDbHarnessWopt(t, &opt.Options{
+ DisableLargeBatchTransaction: true,
+ Compression: opt.NoCompression,
+ CompactionTableSize: 1 * opt.MiB,
+ WriteBuffer: 1 * opt.MiB,
+ })
+ defer h.close()
+
+ var closeWait sync.WaitGroup
+
+ // During write.
+ n := 0
+ closing := false
+ for i := 0; i < 1000000; i++ {
+ if !closing && h.totalTables() > 3 {
+ t.Logf("close db during write, index=%d", i)
+ closeWait.Add(1)
+ go func() {
+ h.closeDB()
+ closeWait.Done()
+ }()
+ closing = true
+ }
+ if err := h.db.Put([]byte(fmt.Sprintf("%09d", i)), []byte(fmt.Sprintf("VAL-%09d", i)), h.wo); err != nil {
+ t.Logf("Put error: %s (expected)", err)
+ n = i
+ break
+ }
+ }
+ closeWait.Wait()
+
+ // During read.
+ h.openDB()
+ closing = false
+ for i := 0; i < n; i++ {
+ if !closing && i > n/2 {
+ t.Logf("close db during read, index=%d", i)
+ closeWait.Add(1)
+ go func() {
+ h.closeDB()
+ closeWait.Done()
+ }()
+ closing = true
+ }
+ if _, err := h.db.Get([]byte(fmt.Sprintf("%09d", i)), h.ro); err != nil {
+ t.Logf("Get error: %s (expected)", err)
+ break
+ }
+ }
+ closeWait.Wait()
+
+ // During iterate.
+ h.openDB()
+ closing = false
+ iter := h.db.NewIterator(nil, h.ro)
+ for i := 0; iter.Next(); i++ {
+ if len(iter.Key()) == 0 || len(iter.Value()) == 0 {
+ t.Error("Key or value has zero length")
+ }
+ if !closing {
+ t.Logf("close db during iter, index=%d", i)
+ closeWait.Add(1)
+ go func() {
+ h.closeDB()
+ closeWait.Done()
+ }()
+ closing = true
+ }
+ time.Sleep(time.Millisecond)
+ }
+ if err := iter.Error(); err != nil {
+ t.Logf("Iter error: %s (expected)", err)
+ }
+ iter.Release()
+ closeWait.Wait()
+}
diff --git a/leveldb/table.go b/leveldb/table.go
index 310ba6c..81d18a5 100644
--- a/leveldb/table.go
+++ b/leveldb/table.go
@@ -434,7 +434,7 @@
t.bpool.Close()
t.cache.Close()
if t.bcache != nil {
- t.bcache.Close()
+ t.bcache.CloseWeak()
}
}