@@ -119,6 +119,8 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
119119 }
120120 switch kv .Kind {
121121 case pb .KV_DATA_KEY :
122+ sw .writeLock .Lock ()
123+ defer sw .writeLock .Unlock ()
122124 y .AssertTrue (len (sw .db .opt .EncryptionKey ) > 0 )
123125 var dk pb.DataKey
124126 if err := proto .Unmarshal (kv .Value , & dk ); err != nil {
@@ -136,6 +138,8 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
136138 }
137139 return nil
138140 case pb .KV_FILE :
141+ sw .writeLock .Lock ()
142+ defer sw .writeLock .Unlock ()
139143 // All tables should be recieved before any of the keys.
140144 if sw .processingKeys {
141145 return errors .New ("Received pb.KV_FILE after pb.KV_KEY" )
@@ -169,22 +173,27 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
169173 // Pass. The following code will handle the keys.
170174 }
171175
176+ sw .writeLock .Lock ()
172177 sw .processingKeys = true
178+ if sw .maxVersion < kv .Version {
179+ sw .maxVersion = kv .Version
180+ }
173181 if sw .prevLevel == 0 {
174- // If prevLevel is 0, that means that we have not written anything yet. Equivalently,
175- // we were virtually writing to the maxLevel+1.
182+ // If prevLevel is 0, that means that we have not written anything yet.
183+ // So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
184+ // so we can set prevLevel to len(levels).
176185 sw .prevLevel = len (sw .db .lc .levels )
177186 }
187+ sw .writeLock .Unlock ()
188+ 178189 var meta , userMeta byte
179190 if len (kv .Meta ) > 0 {
180191 meta = kv .Meta [0 ]
181192 }
182193 if len (kv .UserMeta ) > 0 {
183194 userMeta = kv .UserMeta [0 ]
184195 }
185- if sw .maxVersion < kv .Version {
186- sw .maxVersion = kv .Version
187- }
196+ 188197 e := & Entry {
189198 Key : y .KeyWithTs (kv .Key , kv .Version ),
190199 Value : y .Copy (kv .Value ),
0 commit comments