Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 77973c6

Browse files
setup oracle correctly in stream writer
1 parent fe5b808 commit 77973c6

File tree

3 files changed

+54
-2
lines changed

3 files changed

+54
-2
lines changed

‎levels.go‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,8 +1573,8 @@ func (s *levelsController) close() error {
15731573
}
15741574

15751575
// get searches for a given key in all the levels of the LSM tree. It returns
1576-
// key version <= the expected version (maxVs). If not found, it returns an empty
1577-
// y.ValueStruct.
1576+
// key version <= the expected version (version in key). If not found,
1577+
// it returns an empty y.ValueStruct.
15781578
func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) (
15791579
y.ValueStruct, error) {
15801580
if s.kv.IsClosed() {

‎stream_writer.go‎

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,11 @@ func (sw *StreamWriter) Flush() error {
282282
if sw.db.orc != nil {
283283
sw.db.orc.Stop()
284284
}
285+
286+
if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion {
287+
sw.maxVersion = curMax
288+
}
289+
285290
sw.db.orc = newOracle(sw.db.opt)
286291
sw.db.orc.nextTxnTs = sw.maxVersion
287292
sw.db.orc.txnMark.Done(sw.maxVersion)

‎stream_writer_test.go‎

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,4 +750,51 @@ func TestStreamWriterIncremental(t *testing.T) {
750750
require.NoError(t, err)
751751
})
752752
})
753+
754+
t.Run("multiple incremental with older data first", func(t *testing.T) {
755+
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
756+
buf := z.NewBuffer(10<<20, "test")
757+
defer func() { require.NoError(t, buf.Release()) }()
758+
KVToBuffer(&pb.KV{
759+
Key: []byte("a1"),
760+
Value: []byte("val1"),
761+
Version: 11,
762+
}, buf)
763+
sw := db.NewStreamWriter()
764+
require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
765+
require.NoError(t, sw.Write(buf), "sw.Write() failed")
766+
require.NoError(t, sw.Flush(), "sw.Flush() failed")
767+
768+
buf = z.NewBuffer(10<<20, "test")
769+
defer func() { require.NoError(t, buf.Release()) }()
770+
KVToBuffer(&pb.KV{
771+
Key: []byte("a2"),
772+
Value: []byte("val2"),
773+
Version: 9,
774+
}, buf)
775+
sw = db.NewStreamWriter()
776+
require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
777+
require.NoError(t, sw.Write(buf), "sw.Write() failed")
778+
require.NoError(t, sw.Flush(), "sw.Flush() failed")
779+
780+
// This will move the maxTs to 10 (earlier, without the fix)
781+
require.NoError(t, db.Update(func(txn *Txn) error {
782+
return txn.Set([]byte("a1"), []byte("val3"))
783+
}))
784+
// This will move the maxTs to 11 (earliler, without the fix)
785+
require.NoError(t, db.Update(func(txn *Txn) error {
786+
return txn.Set([]byte("a3"), []byte("val4"))
787+
}))
788+
789+
// And now, the first write with val1 will resurface (without the fix)
790+
require.NoError(t, db.View(func(txn *Txn) error {
791+
item, err := txn.Get([]byte("a1"))
792+
require.NoError(t, err)
793+
val, err := item.ValueCopy(nil)
794+
require.NoError(t, err)
795+
require.Equal(t, "val3", string(val))
796+
return nil
797+
}))
798+
})
799+
})
753800
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /