A while ago I wrote, for my own needs, a (non-reentrant) read-write lock for Kotlin's coroutines. I got no reactions on the relevant GitHub thread. I think it's nice code, so I thought I'd share it here for some feedback.
This post contains the full code already, but here's a gist including a small test file: https://gist.github.com/code-hunger/f7bc1682cf74f3dc58807546a9775999.
Intro
Java's read-write locks do not work with Kotlin coroutines, because Java's locks track ownership per thread, and:
- a coroutine may start on one thread and continue on another, so a lock acquired before a suspension point ends up owned by the wrong thread afterwards (see the sketch below; unwanted)
- on the same thread, (pieces of) multiple coroutines might be executed one after another, so Java's locks won't stop the next coroutine that happens to run on the same thread (unwanted, again).
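To make the first bullet concrete, here is a small sketch of my own (not from the gist) of the failure mode, assuming a dispatcher that may move the coroutine between threads:

```kotlin
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val rw = ReentrantReadWriteLock()
    launch(Dispatchers.Default) {
        rw.writeLock().lock()
        delay(10) // suspension point: the coroutine may resume on another thread
        rw.writeLock().unlock() // if it resumed elsewhere, this throws
                                // IllegalMonitorStateException: wrong owning thread
    }.join()
}
```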
Requirements
We need an object providing the methods

```kotlin
suspend fun <R> write(f: suspend () -> R): R
suspend fun <R> read(f: suspend () -> R): R
```

so that a reading suspend function is executed only if (and as soon as) no writer is currently running (even a suspended writer counts as 'running' here), and a writing suspend function is executed only if (and as soon as) no other function, neither read nor write, is currently being executed. (So this is a non-reentrant lock. I don't even know whether a reentrant lock is possible with coroutines, but I'm not that interested in that right now.)
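In other words, the goal is an API along these lines (a usage sketch, using the ReadWriteLock defined later in this post; cache is just an invented stand-in for some shared resource):

```kotlin
import kotlinx.coroutines.runBlocking

val lock = ReadWriteLock() // defined later in this post
val cache = mutableMapOf<String, Int>()

// many lookups may run concurrently
suspend fun lookup(key: String): Int? = lock.read { cache[key] }

// a store runs alone: no lookups, no other stores
suspend fun store(key: String, value: Int) { lock.write { cache[key] = value } }

fun main() = runBlocking {
    store("answer", 42)
    println(lookup("answer")) // 42
}
```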
I've aimed to let the user control what types of players exist (e.g. Readers and Writers), how they are related (e.g. "infinitely many Readers allowed, a single Writer only", or "only a limited number of Readers"), and how it is decided whom to wake up (the user knows the priorities better).
Impl
The skeleton is provided by MultiLock, which does all the actual locking and waiting. It keeps track of the mutex, the State and the current waiters.
The choices on whether a lock attempt succeeds are provided by the user, who need not concern themselves with locking (I cheat a bit here, because I let the user do the waking up).
The waiters use the Queue interface so that the user can substitute any priority behaviour they want.
A Waiter is constructed from the current coroutine and only knows its type (W) and how to wake up. It carries no other information.
A LockStrategy is a user-supplied pair of (acquire, release) behaviours: pure (side-effect-free) functions that only say whether an acquire succeeds (Allow vs Block) and what the lock state becomes after a release (release can also operate on the mutable Queue of waiters, but again is allowed to do so in a non-threadsafe way).
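For intuition, here is a sketch of my own (not from the gist; PlainMutexStrategy is an invented name) of the simplest possible strategy: a plain mutex, with a Boolean 'held' state and a single waiter type, Unit.

```kotlin
import java.util.Queue

// Hypothetical minimal strategy: plain mutual exclusion.
// State = "is the lock held?"; there is only one kind of waiter.
object PlainMutexStrategy : MultiLock.LockStrategy<Boolean, Unit> {
    override fun acquire(state: Boolean): MultiLock.AcquireDecision<Boolean, Unit> =
        if (state) MultiLock.AcquireDecision.Block(Unit) // already held: go wait
        else MultiLock.AcquireDecision.Allow(true)       // free: take it

    override fun release(state: Boolean, waiters: Queue<MultiLock.Waiter<Unit>>): Boolean {
        val next = waiters.poll() ?: return false // nobody waiting: lock becomes free
        next.awake()                              // hand the lock straight to the next waiter
        return true                               // so it stays held
    }
}
```

Plugging it in as `multiLock.withLock(PlainMutexStrategy) { ... }` on a `MultiLock<Boolean, Unit>(false, LinkedList())` should then give plain mutual exclusion.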
Before the full MultiLock implementation, let me first show how the lock function is implemented (here acquire is user-supplied, for the particular waiter type):
```kotlin
when (val acquireDecision = acquire(state)) {
    is AcquireDecision.Allow ->
        state = acquireDecision.newState
    is AcquireDecision.Block -> suspendCoroutine { continuation ->
        waiters.add(Waiter(acquireDecision.waiterType, continuation))
        // we suspend to let others finish
        // so we have to release the lock now
        mutex.unlock(owner)
        wasUnlocked = true
    }
}
```
So if the acquire is successful, the lock switches to the new state and control flow continues normally. Otherwise, the coroutine is suspended: control flow stops here, the wakeup hook is stored in the waiters queue, and the lock implementation does not return until woken up.
```kotlin
import java.util.Queue
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class MultiLock<S, W>(
    private var state: S,
    private val waiters: Queue<Waiter<W>> // we take ownership here, it's best if constructed in-place
) {
    class Waiter<W>(
        val waiterType: W,
        val awake: () -> Unit
    ) {
        constructor(waiterType: W, continuation: Continuation<Unit>) :
                this(waiterType, awake = { continuation.resume(Unit) })
    }

    interface LockStrategy<S, W> {
        fun acquire(state: S): AcquireDecision<S, W>
        fun release(state: S, waiters: Queue<Waiter<W>>): S
    }

    sealed interface AcquireDecision<S, W> {
        data class Allow<S, W>(val newState: S) : AcquireDecision<S, W>
        data class Block<S, W>(val waiterType: W) : AcquireDecision<S, W>
    }

    private val mutex = Mutex()

    suspend fun <R> withLock(
        lockStrategy: LockStrategy<S, W>,
        f: suspend () -> R
    ): R {
        lockStrategy.lock()
        try {
            return f()
        } finally {
            lockStrategy.unlock()
        }
    }

    private suspend fun LockStrategy<S, W>.lock() {
        val owner = Object() // a fresh owner token for this particular lock() call
        var wasUnlocked = false
        mutex.lock(owner)
        try {
            when (val acquireDecision = acquire(state)) {
                is AcquireDecision.Allow ->
                    state = acquireDecision.newState
                is AcquireDecision.Block -> suspendCoroutine { continuation ->
                    waiters.add(Waiter(acquireDecision.waiterType, continuation))
                    // we suspend to let others finish
                    // so we have to release the lock now
                    mutex.unlock(owner)
                    wasUnlocked = true
                }
            }
        } finally {
            if (!wasUnlocked) {
                mutex.unlock(owner)
            }
        }
    }

    private suspend fun LockStrategy<S, W>.unlock() =
        mutex.withLock { state = release(state, waiters) }
}
```
Now a particular read-write lock logic can be realized as follows. We use a linked list for the waiters and a Read/Write/Free state.
```kotlin
import java.util.LinkedList

class ReadWriteLock {
    private val multiLock = MultiLock<State, WaiterType>(State.Free, LinkedList())

    sealed interface State {
        data class Read(var size: Int) : State
        data object Write : State
        data object Free : State
    }

    enum class WaiterType { Reader, Writer }

    suspend fun <R> write(f: suspend () -> R): R = multiLock.withLock(WriteLockStrategy, f)
    suspend fun <R> read(f: suspend () -> R): R = multiLock.withLock(ReaderLockStrategy, f)
}
```
To stress how independent the State-transition and waiter-manipulation logic is, I laid out the Read/Write lock strategies as standalone objects:
```kotlin
// Assumes imports of the nested types (MultiLock.LockStrategy,
// MultiLock.AcquireDecision.Allow/Block, ReadWriteLock.State,
// ReadWriteLock.WaiterType), as in the linked gist.
private object WriteLockStrategy : LockStrategy<State, WaiterType> {
    override fun acquire(state: State): AcquireDecision<State, WaiterType> =
        when (state) {
            State.Free -> Allow(State.Write)
            else -> Block(WaiterType.Writer)
        }

    override fun release(state: State, waiters: Queue<MultiLock.Waiter<WaiterType>>): State {
        // prefer a waiting writer: it gets the lock exclusively
        if (waiters.peek()?.waiterType == WaiterType.Writer) {
            waiters.poll().awake()
            return State.Write
        }
        // otherwise wake the whole batch of readers at the front of the queue
        var readCount = 0
        while (waiters.peek()?.waiterType == WaiterType.Reader) {
            waiters.poll().awake()
            readCount++
        }
        return if (readCount > 0)
            State.Read(readCount)
        else
            State.Free
    }
}
```
```kotlin
private object ReaderLockStrategy : LockStrategy<State, WaiterType> {
    override fun acquire(state: State): AcquireDecision<State, WaiterType> =
        when (state) {
            State.Free -> Allow(State.Read(1))
            is State.Read -> Allow(State.Read(state.size + 1))
            else -> Block(WaiterType.Reader)
        }

    override fun release(state: State, waiters: Queue<MultiLock.Waiter<WaiterType>>): State {
        check(state is State.Read)
        // not the last reader: just decrement the count
        if (state.size > 1) {
            return State.Read(state.size - 1)
        }
        // last reader out: anyone still queued must be a writer,
        // since a reader arriving during a Read state is always allowed in
        if (waiters.isNotEmpty()) {
            waiters.poll()
                .apply { check(waiterType == WaiterType.Writer) }
                .awake()
            return State.Write
        }
        return State.Free
    }
}
```
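And for a quick sanity check, a tiny smoke test of my own (separate from the gist's test file): the two readers should overlap, and the writer should only enter after both have left.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val lock = ReadWriteLock()
    repeat(2) { i ->
        launch {
            lock.read {
                println("reader $i in")
                delay(100) // both readers hold the lock at the same time here
                println("reader $i out")
            }
        }
    }
    launch {
        lock.write { println("writer: runs only after both readers are out") }
    }
}
```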
Intended extensibility
By supplying different such acquire/release pairs (and different queue implementations), we can achieve any locking behaviour we want. And this is (or should be) very easy now, as the actual locking is all hidden inside MultiLock.
E.g. we can track the number of readers in the State and return Block when a threshold is hit, so that only a fixed number of readers is allowed at a time (a sketch follows below).
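A sketch of that bounded-readers idea (mine, not from the gist; BoundedReaderStrategy and maxReaders are invented names, and the same nested-type imports as above are assumed):

```kotlin
// Hypothetical: at most maxReaders readers at a time; otherwise queue up.
private class BoundedReaderStrategy(private val maxReaders: Int) : LockStrategy<State, WaiterType> {
    override fun acquire(state: State): AcquireDecision<State, WaiterType> =
        when (state) {
            State.Free -> Allow(State.Read(1))
            is State.Read ->
                if (state.size < maxReaders) Allow(State.Read(state.size + 1))
                else Block(WaiterType.Reader) // threshold hit: wait like everyone else
            else -> Block(WaiterType.Reader)
        }

    override fun release(state: State, waiters: Queue<MultiLock.Waiter<WaiterType>>): State {
        check(state is State.Read)
        // a reader slot just freed up: pass it on, if a reader is next in line
        if (waiters.peek()?.waiterType == WaiterType.Reader) {
            waiters.poll().awake()
            return State.Read(state.size) // one left, one entered: count unchanged
        }
        if (state.size > 1) return State.Read(state.size - 1)
        // last reader out: the head of a non-empty queue is a writer here
        waiters.poll()?.let { it.awake(); return State.Write }
        return State.Free
    }
}
```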
Or we can give Writers higher precedence and start blocking all new reads as soon as a Writer starts waiting, so that a constant inflow of readers cannot starve the writers. (With the current acquire(state) signature, that presumably means the State itself has to record that a writer is waiting, e.g. by letting Block also produce a new state.) And all of this is implemented as pure, single-thread data-type manipulation.
Or we can add other types of waiters to achieve any custom logic we might need. If an application has different areas A, B, C and some operations hit one or two of these, we could design a state/waiter type which allows a writer for {A,B} to run in parallel with readers of C. Or whatever else we can think of.
Here's the code again, with a small naive test file: https://gist.github.com/code-hunger/f7bc1682cf74f3dc58807546a9775999
Questions
I'm humbly asking for feedback on:
- (most importantly) correctness of ReadWriteLock. Unless the lock works correctly, it's trash.
- design. I haven't seen a parametric lock design that allows this level of user control. I've never needed anything more than the standard read-write lock, but it was thought-provoking to realize that the 1-writer/many-readers setup is only one particular policy of the common use case, and need not be tied to the locking logic.
- a known issue: release should not be able to resume waiters the way it does now. For example, on a writer release, the resumed read continuations might run before the new (read) state returned from release is set ("undispatched" resumption), so we'd have readers running while the state is still 'Write'. I couldn't come up with a scenario in which a real writer and a real reader run simultaneously (the example above only produces a broken 'state' value, not an actual reader and writer running together), despite ChatGPT's trying to convince me one exists. But I still have to fix that.