6
\$\begingroup\$

There are many debouncer and throttle implementations created using Grand Central Dispatch, and even one built into Combine. I wanted to create one using the new Swift Concurrency feature in Swift 5.5+.

Below is what I put together with help from others:

actor Limiter {
 enum Policy {
 case throttle
 case debounce
 }
 private let policy: Policy
 private let duration: TimeInterval
 private var task: Task<Void, Never>?
 init(policy: Policy, duration: TimeInterval) {
 self.policy = policy
 self.duration = duration
 }
 nonisolated func callAsFunction(task: @escaping () async -> Void) {
 Task {
 switch policy {
 case .throttle:
 await throttle(task: task)
 case .debounce:
 await debounce(task: task)
 }
 }
 }
 private func throttle(task: @escaping () async -> Void) {
 guard self.task?.isCancelled ?? true else { return }
 Task {
 await task()
 }
 self.task = Task {
 try? await sleep()
 self.task?.cancel()
 self.task = nil
 }
 }
 private func debounce(task: @escaping () async -> Void) {
 self.task?.cancel()
 self.task = Task {
 do {
 try await sleep()
 guard !Task.isCancelled else { return }
 await task()
 } catch {
 return
 }
 }
 }
 private func sleep() async throws {
 try await Task.sleep(nanoseconds: UInt64(duration * 1_000_000_000))
 }
}

I created tests to go with it, but testThrottler and testDebouncer are failing randomly which means there's some race condition somewhere or my assumptions in the tests are incorrect:

final class LimiterTests: XCTestCase {
 func testThrottler() async throws {
 // Given
 let promise = expectation(description: "Ensure first task fired")
 let throttler = Limiter(policy: .throttle, duration: 1)
 var value = ""
 var fulfillmentCount = 0
 promise.expectedFulfillmentCount = 2
 func sendToServer(_ input: String) {
 throttler {
 value += input
 // Then
 switch fulfillmentCount {
 case 0:
 XCTAssertEqual(value, "h")
 case 1:
 XCTAssertEqual(value, "hwor")
 default:
 XCTFail()
 }
 promise.fulfill()
 fulfillmentCount += 1
 }
 }
 // When
 sendToServer("h")
 sendToServer("e")
 sendToServer("l")
 sendToServer("l")
 sendToServer("o")
 await sleep(2)
 sendToServer("wor")
 sendToServer("ld")
 wait(for: [promise], timeout: 10)
 }
 func testDebouncer() async throws {
 // Given
 let promise = expectation(description: "Ensure last task fired")
 let limiter = Limiter(policy: .debounce, duration: 1)
 var value = ""
 var fulfillmentCount = 0
 promise.expectedFulfillmentCount = 2
 func sendToServer(_ input: String) {
 limiter {
 value += input
 // Then
 switch fulfillmentCount {
 case 0:
 XCTAssertEqual(value, "o")
 case 1:
 XCTAssertEqual(value, "old")
 default:
 XCTFail()
 }
 promise.fulfill()
 fulfillmentCount += 1
 }
 }
 // When
 sendToServer("h")
 sendToServer("e")
 sendToServer("l")
 sendToServer("l")
 sendToServer("o")
 await sleep(2)
 sendToServer("wor")
 sendToServer("ld")
 wait(for: [promise], timeout: 10)
 }
 func testThrottler2() async throws {
 // Given
 let promise = expectation(description: "Ensure throttle before duration")
 let throttler = Limiter(policy: .throttle, duration: 1)
 var end = Date.now + 1
 promise.expectedFulfillmentCount = 2
 func test() {
 // Then
 XCTAssertLessThan(.now, end)
 promise.fulfill()
 }
 // When
 throttler(task: test)
 throttler(task: test)
 throttler(task: test)
 throttler(task: test)
 throttler(task: test)
 await sleep(2)
 end = .now + 1
 throttler(task: test)
 throttler(task: test)
 throttler(task: test)
 await sleep(2)
 wait(for: [promise], timeout: 10)
 }
 func testDebouncer2() async throws {
 // Given
 let promise = expectation(description: "Ensure debounce after duration")
 let debouncer = Limiter(policy: .debounce, duration: 1)
 var end = Date.now + 1
 promise.expectedFulfillmentCount = 2
 func test() {
 // Then
 XCTAssertGreaterThan(.now, end)
 promise.fulfill()
 }
 // When
 debouncer(task: test)
 debouncer(task: test)
 debouncer(task: test)
 debouncer(task: test)
 debouncer(task: test)
 await sleep(2)
 end = .now + 1
 debouncer(task: test)
 debouncer(task: test)
 debouncer(task: test)
 await sleep(2)
 wait(for: [promise], timeout: 10)
 }
 private func sleep(_ duration: TimeInterval) async {
 await Task.sleep(UInt64(duration * 1_000_000_000))
 }
}

I'm hoping for help in seeing anything I missed in the Limiter implementation, or maybe if there's a better way to do debounce and throttle with Swift Concurrency.

Martin R
24.2k2 gold badges37 silver badges95 bronze badges
asked Nov 2, 2021 at 15:22
\$\endgroup\$

1 Answer 1

8
\$\begingroup\$

The problem is the use of a nonisolated function to initiate an asynchronous update of an actor-isolated property. (I'm surprised the compiler even permits that.) Not only is it misleading, but actors also feature reentrancy, and you introduce all sorts of unintended races.

In the latter part of this answer, below, I offer my suggestions on what I would change in your implementation. But, nowadays, the right solution is to use the debounce and throttle from Apple’s Swift Async Algorithms library.

For example:

import AsyncAlgorithms
final class AsyncAlgorithmsTests: XCTestCase {
 // a stream of individual keystrokes with a pause after the first five characters
 func keystrokes() -> AsyncStream<String> {
 AsyncStream<String> { continuation in
 Task {
 continuation.yield("h")
 continuation.yield("e")
 continuation.yield("l")
 continuation.yield("l")
 continuation.yield("o")
 try await Task.sleep(seconds: 2)
 continuation.yield(",")
 continuation.yield(" ")
 continuation.yield("w")
 continuation.yield("o")
 continuation.yield("r")
 continuation.yield("l")
 continuation.yield("d")
 continuation.finish()
 }
 }
 }
 // A stream of the individual keystrokes aggregated together as strings (as we
 // want to search the whole string, not for individual characters)
 //
 // e.g.
 // h
 // he
 // hel
 // hell
 // hello
 // ...
 //
 // As the `keystrokes` sequence has a pause after the fifth character, this will
 // also pause after "hello" and before "hello,". We can use that pause to test
 // debouncing and throttling
 func strings() -> AsyncStream<String> {
 AsyncStream<String> { continuation in
 Task {
 var string = ""
 for await keystroke in keystrokes() {
 string += keystroke
 continuation.yield(string)
 }
 continuation.finish()
 }
 }
 }
 func testDebounce() async throws {
 let debouncedSequence = strings().debounce(for: .seconds(1))
 // usually you'd just loop through the sequence with something like
 //
 // for await string in debouncedSequence {
 // sendToServer(string)
 // }
 // but I'm just going to directly await the yielded values and test the resulting array
 let result: [String] = await debouncedSequence.reduce(into: []) { 0ドル.append(1ドル) }
 XCTAssertEqual(result, ["hello", "hello, world"])
 }
 func testThrottle() async throws {
 let throttledSequence = strings().throttle(for: .seconds(1))
 let result: [String] = await throttledSequence.reduce(into: []) { 0ドル.append(1ドル) }
 XCTAssertEqual(result, ["h", "hello,"])
 }
}
// MARK: - Task.sleep(seconds:)
extension Task where Success == Never, Failure == Never {
 /// Suspends the current task for at least the given duration
 /// in seconds.
 ///
 /// If the task is canceled before the time ends,
 /// this function throws `CancellationError`.
 ///
 /// This function doesn't block the underlying thread.
 public static func sleep(seconds duration: TimeInterval) async throws {
 try await Task.sleep(nanoseconds: UInt64(duration * .nanosecondsPerSecond))
 }
}
// MARK: - TimeInterval
extension TimeInterval {
 static let nanosecondsPerSecond = TimeInterval(NSEC_PER_SEC)
}

Given that you were soliciting a "code review", if you really wanted to write your own "debounce" and "throttle" and did not want to use Async Algorithms for some reason, my previous answer, below, addresses some observations on your implementation:


You can add an actor-isolated function to Limiter:

func submit(task: @escaping () async -> Void) {
 switch policy {
 case .throttle: throttle(task: task)
 case .debounce: debounce(task: task)
 }
}

Note, I am not using callAsFunction as an actor-isolated function as it looks like (in Xcode 13.2.1, for me, at least) that this causes a segmentation fault in the compiler.

Anyway, you can then modify your tests to use the submit actor-isolated function, e.g.:

// test throttling as user enters "hello, world" into a text field
func testThrottler() async throws {
 // Given
 let promise = expectation(description: "Ensure first task fired")
 let throttler = Limiter(policy: .throttle, duration: 1)
 var fulfillmentCount = 0
 promise.expectedFulfillmentCount = 2
 var value = ""
 func accumulateAndSendToServer(_ input: String) async {
 value += input
 await throttler.submit { [value] in
 // Then
 switch fulfillmentCount {
 case 0: XCTAssertEqual(value, "h")
 case 1: XCTAssertEqual(value, "hello,")
 default: XCTFail()
 }
 promise.fulfill()
 fulfillmentCount += 1
 }
 }
 // When
 await accumulateAndSendToServer("h")
 await accumulateAndSendToServer("e")
 await accumulateAndSendToServer("l")
 await accumulateAndSendToServer("l")
 await accumulateAndSendToServer("o")
 try await Task.sleep(seconds: 2)
 await accumulateAndSendToServer(",")
 await accumulateAndSendToServer(" ")
 await accumulateAndSendToServer("w")
 await accumulateAndSendToServer("o")
 await accumulateAndSendToServer("r")
 await accumulateAndSendToServer("l")
 await accumulateAndSendToServer("d")
 wait(for: [promise], timeout: 10)
}

As an aside:

  1. In debounce, the test for isCancelled is redundant. The Task.sleep will throw an error if the task was canceled.

  2. As a matter of convention, Apple uses operation for the name of the closure parameters, presumably to avoid confusion with Task instances.

  3. I would change the Task to be a Task<Void, Error>?. Then you can simplify debounce to:

    func debounce(operation: @escaping () async -> Void) {
     task?.cancel()
     task = Task {
     defer { task = nil }
     try await Task.sleep(seconds: duration)
     await operation()
     }
    }
    
  4. When throttling network requests for user input, you generally want to throttle the network requests, but not the accumulation of the user input. So I have pulled the value += input out of the throttler/debouncer. I also use a capture list of [value] to make sure that we avoid race conditions between the accumulation of user input and the network requests.


FWIW, this is my rendition of Limiter:

actor Limiter {
 private let policy: Policy
 private let duration: TimeInterval
 private var task: Task<Void, Error>?
 init(policy: Policy, duration: TimeInterval) {
 self.policy = policy
 self.duration = duration
 }
 func submit(operation: @escaping () async -> Void) {
 switch policy {
 case .throttle: throttle(operation: operation)
 case .debounce: debounce(operation: operation)
 }
 }
}
// MARK: - Limiter.Policy
extension Limiter {
 enum Policy {
 case throttle
 case debounce
 }
}
// MARK: - Private utility methods
private extension Limiter {
 func throttle(operation: @escaping () async -> Void) {
 guard task == nil else { return }
 task = Task {
 defer { task = nil }
 try await Task.sleep(seconds: duration)
 }
 Task {
 await operation()
 }
 }
 func debounce(operation: @escaping () async -> Void) {
 task?.cancel()
 task = Task {
 defer { task = nil }
 try await Task.sleep(seconds: duration)
 await operation()
 }
 }
}

Which uses these extensions

// MARK: - Task.sleep(seconds:)
extension Task where Success == Never, Failure == Never {
 /// Suspends the current task for at least the given duration
 /// in seconds.
 ///
 /// If the task is canceled before the time ends,
 /// this function throws `CancellationError`.
 ///
 /// This function doesn't block the underlying thread.
 public static func sleep(seconds duration: TimeInterval) async throws {
 try await Task.sleep(nanoseconds: UInt64(duration * .nanosecondsPerSecond))
 }
}
// MARK: - TimeInterval
extension TimeInterval {
 static let nanosecondsPerSecond = TimeInterval(NSEC_PER_SEC)
}

And the following tests:

final class LimiterTests: XCTestCase {
 // test throttling as user enters "hello, world" into a text field
 func testThrottler() async throws {
 // Given
 let promise = expectation(description: "Ensure first task fired")
 let throttler = Limiter(policy: .throttle, duration: 1)
 var fulfillmentCount = 0
 promise.expectedFulfillmentCount = 2
 var value = ""
 func accumulateAndSendToServer(_ input: String) async {
 value += input
 await throttler.submit { [value] in
 // Then
 switch fulfillmentCount {
 case 0: XCTAssertEqual(value, "h")
 case 1: XCTAssertEqual(value, "hello,")
 default: XCTFail()
 }
 promise.fulfill()
 fulfillmentCount += 1
 }
 }
 // When
 await accumulateAndSendToServer("h")
 await accumulateAndSendToServer("e")
 await accumulateAndSendToServer("l")
 await accumulateAndSendToServer("l")
 await accumulateAndSendToServer("o")
 try await Task.sleep(seconds: 2)
 await accumulateAndSendToServer(",")
 await accumulateAndSendToServer(" ")
 await accumulateAndSendToServer("w")
 await accumulateAndSendToServer("o")
 await accumulateAndSendToServer("r")
 await accumulateAndSendToServer("l")
 await accumulateAndSendToServer("d")
 wait(for: [promise], timeout: 10)
 }
 // test debouncing as user enters "hello, world" into a text field
 func testDebouncer() async throws {
 // Given
 let promise = expectation(description: "Ensure last task fired")
 let debouncer = Limiter(policy: .debounce, duration: 1)
 var value = ""
 var fulfillmentCount = 0
 promise.expectedFulfillmentCount = 2
 func accumulateAndSendToServer(_ input: String) async {
 value += input
 await debouncer.submit { [value] in
 // Then
 switch fulfillmentCount {
 case 0: XCTAssertEqual(value, "hello")
 case 1: XCTAssertEqual(value, "hello, world")
 default: XCTFail()
 }
 promise.fulfill()
 fulfillmentCount += 1
 }
 }
 // When
 await accumulateAndSendToServer("h")
 await accumulateAndSendToServer("e")
 await accumulateAndSendToServer("l")
 await accumulateAndSendToServer("l")
 await accumulateAndSendToServer("o")
 try await Task.sleep(seconds: 2)
 await accumulateAndSendToServer(",")
 await accumulateAndSendToServer(" ")
 await accumulateAndSendToServer("w")
 await accumulateAndSendToServer("o")
 await accumulateAndSendToServer("r")
 await accumulateAndSendToServer("l")
 await accumulateAndSendToServer("d")
 wait(for: [promise], timeout: 10)
 }
 func testThrottler2() async throws {
 // Given
 let promise = expectation(description: "Ensure throttle before duration")
 let throttler = Limiter(policy: .throttle, duration: 1)
 var end = Date.now + 1
 promise.expectedFulfillmentCount = 2
 func test() {
 // Then
 XCTAssertLessThanOrEqual(.now, end)
 promise.fulfill()
 }
 // When
 await throttler.submit(operation: test)
 await throttler.submit(operation: test)
 await throttler.submit(operation: test)
 await throttler.submit(operation: test)
 await throttler.submit(operation: test)
 try await Task.sleep(seconds: 2)
 end = .now + 1
 await throttler.submit(operation: test)
 await throttler.submit(operation: test)
 await throttler.submit(operation: test)
 try await Task.sleep(seconds: 2)
 wait(for: [promise], timeout: 10)
 }
 func testDebouncer2() async throws {
 // Given
 let promise = expectation(description: "Ensure debounce after duration")
 let debouncer = Limiter(policy: .debounce, duration: 1)
 var end = Date.now + 1
 promise.expectedFulfillmentCount = 2
 func test() {
 // Then
 XCTAssertGreaterThanOrEqual(.now, end)
 promise.fulfill()
 }
 // When
 await debouncer.submit(operation: test)
 await debouncer.submit(operation: test)
 await debouncer.submit(operation: test)
 await debouncer.submit(operation: test)
 await debouncer.submit(operation: test)
 try await Task.sleep(seconds: 2)
 end = .now + 1
 await debouncer.submit(operation: test)
 await debouncer.submit(operation: test)
 await debouncer.submit(operation: test)
 try await Task.sleep(seconds: 2)
 wait(for: [promise], timeout: 10)
 }
}
answered Jan 18, 2022 at 4:19
\$\endgroup\$
6
  • \$\begingroup\$ Thx for the review! Looks like bug reported for callAsFunction crash for actors: bugs.swift.org/browse/SR-15361, good catch. Regarding why nonisolated worked for my version is because I call Task inside so it becomes isolated again (I think?). But also doing it this way so the caller doesn't have to be in a concurrency/await state, anyone can call and forget it. Last thing, I still couldn't get the first 2 tests to pass in either of our versions (testThrottler and testDebouncer). I've struggled to understand if something is wrong with the limiter logic or the test itself. \$\endgroup\$ Commented Jan 18, 2022 at 14:13
  • 1
    \$\begingroup\$ I would advise against that that "use non-isolated function to initiate an asynchronous isolated asynchronous task" technique because not only is it very misleading, but actors also feature reentrancy, and you introduce all sorts of unintended races, which is why I excised the pattern from my answer above. \$\endgroup\$ Commented Jan 19, 2022 at 5:27
  • \$\begingroup\$ shouldn't that be case 1: XCTAssertEqual(value, "ld") in testDebouncer() ? \$\endgroup\$ Commented Dec 1, 2022 at 8:44
  • 1
    \$\begingroup\$ @Martin - Because sendToServer was appending to value, and he never resets value, the final result of "old" is the final "o" of "hello" and the "ld" of "world". This is, IMHO, a very counter-intuitive example (you generally want to debounce/throttle network requests, not the user input). I was attempting to mirror the test in the question as closely as possible. But I have revised my answer with a more intuitive handling of debouncing/throttling network requests but not user input. \$\endgroup\$ Commented Dec 1, 2022 at 19:50
  • \$\begingroup\$ Oh! Thanks so much for the detailed explanation @Rob! That makes sense to me now. \$\endgroup\$ Commented Dec 2, 2022 at 13:04

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.