1
\$\begingroup\$

A while ago I wrote for my own needs a (non-reentrant) read-write lock for kotlin's coroutines. I got no reactions on the relevant github thread. I think it's a nice code, so I thought I'd share it here for some feedback.

This post contains the full code already, but here's a gist including a small test file: https://gist.github.com/code-hunger/f7bc1682cf74f3dc58807546a9775999.

Intro

Java's read-write locks do not work with kotlin coroutines, because Java's locks lock on threads, and:

  1. a coroutine may start in one thread and continue in another (so Java's locks will stop execution here; unwanted)
  2. on the same thread, (pieces of) multiple coroutines might be executed one after another, so Java's locks won't prevent the next coroutine running on the same thread (unwanted, again).

Requirements

We need an object providing methods

suspend fun <R> write(f: suspend () -> R): R
suspend fun <R> read(f: suspend () -> R): R

so that a read suspend function is executed only if (and as soon as) no writers are currently running (even a suspended writer is considered 'running' here), and a writing suspend function is executed only if (and as soon as) no other (nor read, nor write) function is currently being executed. (so this is a non-reentrant lock. I don't even know if a reentrant lock is possible with coroutines, but I'm not that interested in this now)

I've aimed to enable the user to control what types of players exists (e.g. Readers and Writers), how they are related (e.g. "infinitely Readers allowed, single Writer only", or e.g. "a limited number of readers only"), and how it is decided whom to wake up (the user knows priority better).

Impl

The skeleton is provided by MultiLock, which does all the actual locking and waiting. It keeps track of the mutex, the State and the current waiters. The choices on whether a lock is successful or not are provided by the user who need not concern with locking (I cheat a bit here, because I let the user do the waking up).

The waiters use the Queue interface so that the user can substitute any priority behaviour they want.

A Waiter is constructed from the current coroutine and only knows its type (W) and how to wake up. It caries no other information.

A LockStrategy is a user-supplied pair (acquire, release) behaviour of pure (side-effect-free) functions that only say if the acquire is successful (Allow vs Block) and the next lock state after a lock release (release can also operate on the mutable Queue of waiters, but is again allowed to do so in a non-threadsafe way).

Before the full MultiLock impl, let me show how the lock function is implemented first (here acquire is user-supplied, for the particular waiter type):

when (val acquireDecision = acquire(state)) {
 is Allow ->
 state = acquireDecision.newState
 is Block -> suspendCoroutine { continuation ->
 waiters.add(Waiter(acquireDecision.waiterType, continuation))
 // we suspend to let others finish
 // so we have to release the lock now
 mutex.unlock(owner)
 wasUnlocked = true
 }
}

so if the acquire is successfull, the lock switches to the new state, and control flow continues normally. Otherwise, the coroutine is suspended: control flow stops here, the wakeup hook is stored in the waiters queue, and the lock implementation does not terminate until woken up.

class MultiLock<S, W>(
 private var state: S,
 private val waiters: Queue<Waiter<W>> // we take ownership here, it's best if constructed in-place
) {
 class Waiter<W>(
 val waiterType: W,
 val awake: () -> Unit
 ) {
 constructor(waiterType: W, continuation: Continuation<Unit>) :
 this(waiterType, awake = { continuation.resume(Unit) })
 }
 interface LockStrategy<S, W> {
 fun acquire(state: S): AcquireDecision<S, W>
 fun release(state: S, waiters: Queue<Waiter<W>>): S
 }
 sealed interface AcquireDecision<S, W> {
 data class Allow<S, W>(val newState: S) : AcquireDecision<S, W>
 data class Block<S, W>(val waiterType: W) : AcquireDecision<S, W>
 }
 private val mutex = Mutex()
 suspend fun <R> withLock(
 lockStrategy: LockStrategy<S, W>,
 f: suspend () -> R
 ): R {
 lockStrategy.lock()
 try {
 return f()
 } finally {
 lockStrategy.unlock()
 }
 }
 private suspend fun LockStrategy<S, W>.lock() {
 val owner = Object()
 var wasUnlocked = false
 mutex.lock(owner)
 try {
 when (val acquireDecision = acquire(state)) {
 is Allow ->
 state = acquireDecision.newState
 is Block -> suspendCoroutine { continuation ->
 waiters.add(Waiter(acquireDecision.waiterType, continuation))
 // we suspend to let others finish
 // so we have to release the lock now
 mutex.unlock(owner)
 wasUnlocked = true
 }
 }
 } finally {
 if (!wasUnlocked) {
 mutex.unlock(owner)
 }
 }
 }
 private suspend fun LockStrategy<S, W>.unlock() = mutex.withLock { state = release(state, waiters) }
}

Now a particular read-write lock logic can be realized as follows. We use a linked list for the waiters, a Read/Write/Free state.

class ReadWriteLock {
 private val multiLock = MultiLock<State, WaiterType>(State.Free, LinkedList())
 sealed interface State {
 data class Read(var size: Int) : State
 data object Write : State
 data object Free : State
 }
 enum class WaiterType { Reader, Writer }
 suspend fun <R> write(f: suspend () -> R): R = multiLock.withLock(WriteLockStrategy, f)
 suspend fun <R> read(f: suspend () -> R): R = multiLock.withLock(ReaderLockStrategy, f)
}

To stress how independent the State-transition and waiter-manipulation logic is, I laid out the Read/Write lock strategies as standalone objects:

private object WriteLockStrategy : LockStrategy<State, WaiterType> {
 override fun acquire(state: State): AcquireDecision<State, WaiterType> =
 when (state) {
 State.Free -> Allow(State.Write)
 else -> Block(WaiterType.Writer)
 }
 override fun release(state: State, waiters: Queue<MultiLock.Waiter<WaiterType>>): State {
 if (waiters.peek()?.waiterType == WaiterType.Writer) {
 waiters
 .poll()
 .awake()
 return State.Write
 }
 var readCount = 0
 while (waiters.peek()?.waiterType == WaiterType.Reader) {
 waiters
 .poll()
 .awake()
 readCount++
 }
 return if (readCount > 0)
 State.Read(readCount)
 else
 State.Free
 }
}
private object ReaderLockStrategy : LockStrategy<State, WaiterType> {
 override fun acquire(state: State): AcquireDecision<State, WaiterType> =
 when (state) {
 State.Free -> Allow(State.Read(1))
 is State.Read -> Allow(State.Read(state.size + 1))
 else -> Block(WaiterType.Reader)
 }
 override fun release(state: State, waiters: Queue<MultiLock.Waiter<WaiterType>>): State {
 check(state is State.Read)
 if (state.size > 1) {
 return State.Read(state.size - 1)
 }
 if (waiters.isNotEmpty()) {
 waiters.poll()
 .apply { check(waiterType == WaiterType.Writer) }
 .awake()
 return State.Write
 }
 return State.Free
 }
}

Intended extensibility

By supplying different such acquire/release pairs (and different queue implementations), we can achieve any locking behaviour we want. And this is (should be) very easy now, as the actual locking is all hidden in the MultiLock.

E.g. we can track the number of the readers in the State, and return Block when a threshold is hit so that only a fixed number of readers is allowed at a time.

Or we can make Writers have higher precedence and start blocking all new reads as soon as a Writer starts waiting, to avoid readers never giving the lock up in case of a constant reader inflow. And all of this implemented as pure single-thread data type manipulation.

Or we can make other types of waiters to achieve any custom logic we might need. If an application has different areas A,B,C and some operations hit 1 or 2 of these, we could design a state/waiter type which allows a writer for {A,B} to run in parallel with readers of C. Or whatever we can think of.

Here's the code again, with a small naive test file: https://gist.github.com/code-hunger/f7bc1682cf74f3dc58807546a9775999

Questions

I'm humbly asking for feedback on:

  1. (most importantly) correctness of ReadWriteLock. Unless the lock works correctly, it's trash.
  2. design. I haven't seen such a parametric lock design that allows such level of user control. I've never needed anything more than the standard read-write lock, but it was provoking to realize that the 1-writer/many-readers situation is only a particularity of the common use case, and need not be tied to the locking logic.
asked Aug 27 at 13:31
\$\endgroup\$
3
  • \$\begingroup\$ I asked ChatGpt for a review, and it pointed out I have completely overlooked cancelation safety (didn't know about this thing): chatgpt.com/share/68af12cb-fcf8-800d-a1e8-e3af158da6cc. Fixing this seems to require changes in multiple parts of the code though. \$\endgroup\$ Commented Aug 27 at 14:18
  • \$\begingroup\$ ... and also the release should not be able to resume waiters like it does now. For example, on a writer release, the resumed read continuations might run before the new (read) state returned from release is set ("undispatched" resumption), so we'd have readers running while the state is still 'Write'. I couldn't come up with a scenario in which two real writer+reader are running simultaneously (the example above only has a broken 'state' value but no actual reader+writer run together), despite chatgpt's trying to convince me such one exists. But still I have to fix that. \$\endgroup\$ Commented Aug 27 at 16:05
  • \$\begingroup\$ ugh and that raises one more problem: exception-safety when resuming multiple readers. Seems we have to catch Throwable from the continuation.resume() call and pass it to the continuation's exception handler. \$\endgroup\$ Commented Aug 27 at 16:23

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.