\$\begingroup\$

I implemented an AsyncQueue, whose dequeue() operation returns a Promise. The AsyncLimitedQueue additionally enforces a limit on the number of entries, so that queue() returns a Promise as well.

Both classes use Semaphore, which I also built using Promises. For the sake of brevity, I would like to restrict this question to the queue implementations.
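
Since the Semaphore itself is not shown, the following is a minimal sketch of what a Promise-based semaphore with the three operations used below (take, tryTake, free) could look like. The class name SketchSemaphore and its internals are assumptions made for illustration only; the actual implementation in the repository may differ, in particular in the order in which waiting take() calls are served.

```typescript
// Hypothetical stand-in, NOT the Semaphore from ../semaphore/index.
class SketchSemaphore {
  // Resolvers of take() calls that are still waiting, oldest first.
  private waiters: Array<() => void> = [];

  constructor(private permits: number) {}

  // Returns a promise that is fulfilled once a permit has been acquired.
  take(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return Promise.resolve();
    }
    return new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  // Acquires a permit only if one is available at the time of the call.
  tryTake(): boolean {
    if (this.permits > 0) {
      this.permits--;
      return true;
    }
    return false;
  }

  // Returns a permit, handing it directly to the oldest waiter if there is one.
  free(): void {
    const next = this.waiters.shift();
    if (next !== undefined) {
      next();
    } else {
      this.permits++;
    }
  }
}
```

In this sketch, new SketchSemaphore(0) plays the role of the element semaphore of AsyncQueue (nothing to take until something is freed), while new SketchSemaphore(limit) plays the role of the limit semaphore of AsyncLimitedQueue.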

Usage example

Excerpt from https://github.com/ComFreek/async-playground/blob/master/examples/queue-stdio-lines.ts:

async function* readInput() {
  // null signals the end of input
  const queue: IAsyncQueue<string|null> = new AsyncQueue();
  // readline is a Node.js built-in module that simplifies reading lines from stdio
  const rl = readline.createInterface({ /* ... */ });
  rl.on('line', (line: string) => queue.queue(line));
  rl.on('close', () => queue.queue(null));
  yield* queue;
}
for await (const line of readInput()) {
  if (line === null) {
    break;
  }
  console.log(line);
}
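
To try the poison-pill pattern without the repository or stdio, here is a self-contained sketch. MiniQueue and collectLines are hypothetical names introduced only for this illustration; MiniQueue is a deliberately tiny stand-in that hands elements directly to waiting consumers and is not the AsyncQueue under review.

```typescript
// Hypothetical minimal stand-in for AsyncQueue, just enough for the demo.
class MiniQueue<T> implements AsyncIterable<T> {
  private buffer: T[] = [];
  // Resolvers of dequeue() calls that are still waiting, oldest first.
  private waiting: Array<(value: T) => void> = [];

  queue(data: T): void {
    const waiter = this.waiting.shift();
    if (waiter !== undefined) {
      waiter(data); // hand the element directly to the oldest waiting consumer
    } else {
      this.buffer.push(data);
    }
  }

  dequeue(): Promise<T> {
    if (this.buffer.length > 0) {
      return Promise.resolve(this.buffer.shift() as T);
    }
    return new Promise<T>((resolve) => this.waiting.push(resolve));
  }

  // Iterates the queue's (future) contents without ever finishing on its own.
  async *[Symbol.asyncIterator](): AsyncIterableIterator<T> {
    while (true) {
      yield this.dequeue();
    }
  }
}

// Collects lines until the poison pill (null) arrives.
async function collectLines(queue: MiniQueue<string | null>): Promise<string[]> {
  const lines: string[] = [];
  for await (const line of queue) {
    if (line === null) {
      break;
    }
    lines.push(line);
  }
  return lines;
}
```

A producer would call queue.queue(line) for every line and queue.queue(null) once at the end; collectLines(queue) then resolves to all lines queued before the null.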

Questions

  • Have I followed best practices?

  • Are the method contracts clean? Especially, have I employed asynchronous operations and promises where appropriate?

  • Having both interfaces IAsyncQueue and IAsyncLimitedQueue, I intentionally did not create a common superinterface. Was this the right decision?

    The reason is that IAsyncQueue#queue and similar operations are synchronous and therefore non-blocking, contrary to IAsyncLimitedQueue#queue, which is asynchronous.
    This difference also shows in their types, void vs Promise<void>. Making AsyncQueue#queue return a Promise as well would violate the principle of least astonishment: the method would still be synchronous even though its return type suggests otherwise.
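
To make the trade-off concrete, here is a sketch of one alternative I considered: sharing only the consuming half, which is identical in both interfaces, while keeping the two queue() signatures apart. All names below (IAsyncQueueSource, IUnlimitedSink, ILimitedSink, UnlimitedQueue, LimitedQueue, takeN) are hypothetical and do not exist in the repository.

```typescript
// Hypothetical split; none of these names exist in the repository.

// The consuming side has the same contract for both kinds of queue.
interface IAsyncQueueSource<T> {
  dequeue(): Promise<T>;
  poll(): T;
  size(): number;
}

// The producing side differs in its return type.
interface IUnlimitedSink<T> {
  queue(data: T): void; // synchronous: the element is stored on return
}

interface ILimitedSink<T> {
  queue(data: T): Promise<void>; // may wait until capacity is available
}

// The two existing interfaces would then correspond to these combinations.
type UnlimitedQueue<T> = IAsyncQueueSource<T> & IUnlimitedSink<T>;
type LimitedQueue<T> = IAsyncQueueSource<T> & ILimitedSink<T>;

// A consumer that only reads can accept either kind of queue.
async function takeN<T>(source: IAsyncQueueSource<T>, n: number): Promise<T[]> {
  const taken: T[] = [];
  for (let i = 0; i < n; i++) {
    taken.push(await source.dequeue());
  }
  return taken;
}
```

With such a split, code that only consumes (like takeN) would not need to care whether the producer side is limited, while the void vs Promise<void> distinction stays visible in the types.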

Code

Online: repo, docs

AsyncQueue.ts

import { ISemaphore, Semaphore } from '../semaphore/index';
/**
 * Asynchronous FIFO queue with a Promise-driven dequeue operation.
 *
 * All element values are allowed, especially falsy ones, e.g.
 * false, 0, undefined, null, [], {} are all valid elements which
 * can be queued and dequeued.
 *
 * The {@link AsyncIterable} interface iterates the queue's (future) contents
 * ad infinitum. Users are advised to signal the end by manual insertion of a
 * special value (a so-called poison pill):
 *
 * ```
 * const queue = new AsyncQueue<string|null>();
 * file.on('data', (data) => queue.queue(data));
 * file.on('close', () => queue.queue(null));
 *
 * for await (const data of queue) {
 * if (data === null) {
 * break;
 * }
 * // Otherwise, process data
 * }
 * ```
 */
export interface IAsyncQueue<T> extends AsyncIterable<T> {
 /**
 * Queue an element immediately.
 */
 queue(data: T): void;
 /**
 * Queue all elements of an iterable, e.g. an array or a generator function.
 *
 * @example `queue.queueAll(['myArray', 'of', 'strings'])`
 *
 * @example If one has a generator function f:
 * `function *f(): Iterable<string> { ... }`
 * then you can call `queue.queueAll(f())`.
 */
 queueAll(iterable: Iterable<T>): void;
 /**
 * Queue all elements of an asynchronous iterable, e.g. an asynchronous
 * generator function.
 *
 * @example Using an asynchronous generator function:
 * ```
 * async function *f(): AsyncIterable<string> {
 * yield* ['Array', 'of', 'strings'];
 * }
 *
 * const previousSize = queue.size();
 * queue.queueAllAsync(f());
 * // ^ We do not await the queueing!
 * // Therefore: queue.size() === previousSize here!
 * // This is indeed guaranteed by JS' execution model. There is
 * // no way queueAllAsync could have queried an element from f()
 * // asynchronously using a promise before this code gives up
 * // the "CPU power" by await or yield.
 *
 * await queue.dequeue(); // 'Array'
 * await queue.dequeue(); // 'of'
 * await queue.dequeue(); // 'strings'
 *
 * // queue.size() === 0 and queue.dequeue() would block
 * // ad infinitum
 *
 * await queue.queueAllAsync(f());
 * // We now await the queueing!
 * // Therefore: queue.size() === 3 here!
 * ```
 *
 * @example AsyncQueue instances are also asynchronous iterables,
 * meaning that you can stack multiple queues together:
 * ```
 * const backgroundQueue: IAsyncQueue<string> = new AsyncQueue();
 * const foregroundQueue: IAsyncQueue<string> = new AsyncQueue();
 *
 * setTimeout(() => backgroundQueue.queue('Hello World!'), 100);
 *
 * foregroundQueue.queueAllAsync(backgroundQueue);
 * const retrievedString = await foregroundQueue.dequeue();
 *
 * // retrievedString === 'Hello World!'
 * ```
 */
 queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;
 /**
 * Dequeue an element, waiting for data to be available if necessary.
 *
 * @returns A promise which is fulfilled when an element (as queued by
 * queue()) becomes available.
 * If multiple dequeue() calls are issued sequentially, it is
 * implementation-defined whether they are fulfilled in the same
 * order or not. However, the data is still retrieved in FIFO
 * fashion, meaning the first fulfilled promise gets the first
 * element, the second fulfilled the second one and so forth.
 */
 dequeue(): Promise<T>;
 /**
 * Dequeue an element if available or throw an exception otherwise.
 *
 * @returns The first element of the queue.
 * @throws An exception if the queue is empty at the time of the call.
 */
 poll(): T;
 /**
 * Return the current size at the moment of the call.
 *
 * Even though code like
 * ```
 * if (queue.size() >= 1) {
 * const element = queue.poll();
 * }
 * ```
 * is technically not wrong (due to JS' execution model), users are
 * advised to avoid this pattern. Instead, users are encouraged to
 *
 * - in cases where waiting for a promise is impossible, to use
 * {@link poll} and catch the exception,
 * - or to use {@link dequeue} with JS' `await` or
 * `queue.dequeue().then(...)`.
 */
 size(): number;
}
export class NoElementError extends Error {
}
export class AsyncQueue<T> implements IAsyncQueue<T> {
  private buffer: T[] = [];
  private elementSem: ISemaphore = new Semaphore(0);
  public queue(data: T): void {
    this.buffer.push(data);
    this.elementSem.free();
  }
  public queueAll(iterable: Iterable<T>): void {
    for (const element of iterable) {
      this.queue(element);
    }
  }
  public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void> {
    for await (const element of iterable) {
      this.queue(element);
    }
  }
  public async dequeue(): Promise<T> {
    await this.elementSem.take();
    try {
      return this.poll();
    }
    catch (err) {
      if (err instanceof NoElementError) {
        throw new Error('AsyncQueue dequeue: poll() threw an exception ' +
          'even though dequeue() waited for its element semaphore to be available via take().');
      }
      else {
        throw err;
      }
    }
  }
  public poll(): T {
    if (this.buffer.length >= 1) {
      const dequeuedElement = this.buffer.shift();
      // Force-cast the element since we know that the buffer contains
      // at least one element and JS' execution model prohibits other
      // interleaving fibers to modify the buffer (=> no race condition).
      //
      // Also, we cannot check for shift() returning undefined as the queue
      // might well contain "undefined" as such.
      return (dequeuedElement as T);
    }
    else {
      throw new NoElementError();
    }
  }
  public size(): number {
    return this.buffer.length;
  }
  public async *[Symbol.asyncIterator](): AsyncIterableIterator<T> {
    while (true) {
      yield this.dequeue();
    }
  }
}

AsyncLimitedQueue:

import { IAsyncQueue, AsyncQueue } from './AsyncQueue';
import { ISemaphore, Semaphore } from '../semaphore/index';
/**
 * Asynchronous entrance-limited FIFO queue with a Promise-driven dequeue operation.
 *
 * Contrary to {@link IAsyncQueue}, the queue operation is Promise-driven as well,
 * i.e. implementations might delay entrance into the queue, e.g. to enforce a
 * limit on the number of elements stored in the queue at the same time, cf.
 * {@link AsyncLimitedQueue}.
 * Other types of entrance limitations are conceivable as well, such as a
 * restriction on the sum of contained elements in case of a number queue.
 *
 * All element values are allowed, especially falsy ones, e.g.
 * false, 0, undefined, null, [], {} are all valid elements which
 * can be queued and dequeued.
 *
 * {@link queue IAsyncLimitedQueue#queue} operations are possibly delayed and
 * executed in implementation-dependent order.
 *
 * @example Issuing multiple {@link queue} operations without awaiting the
 * previous ones may result in implementation-defined insertion order.
 * ```
 * queue.queue(1);
 * queue.queue(2);
 *
 * await queue.dequeue(); // can be 1 or 2
 * await queue.dequeue(); // can be 1 or 2 as well (the remaining number)
 * ```
 *
 * @example If you would like to retain the order, await the {@link queue}
 * operations, use {@link queueAll IAsyncLimitedQueue#queueAll} or
 * {@link queueAllAsync IAsyncLimitedQueue#queueAllAsync}.
 * ```
 * await queue.queue(1);
 * await queue.queue(2);
 * ```
 * ```
 * queue.queueAll([1, 2]);
 * ```
 *
 * The {@link AsyncIterable} interface iterates the queue's (future) contents
 * ad infinitum. Users are advised to signal the end by manual insertion of a
 * special value (a so-called poison pill), see {@link IAsyncQueue}.
 */
export interface IAsyncLimitedQueue<T> extends AsyncIterable<T> {
 /**
 * Queue an element, waiting for entrance if necessary.
 *
 * @example
 * ```
 * queue.queue(42).then(() => {
 * // 42 is now stored within the queue
 * });
 * ```
 */
 queue(data: T): Promise<void>;
 /**
 * Queue all elements of an iterable, e.g. an array or a generator function.
 * @see IAsyncQueue#queueAll
 */
 queueAll(iterable: Iterable<T>): Promise<void>;
 /**
 * Queue all elements of an asynchronous iterable, e.g. an asynchronous
 * generator function.
 *
 * @see IAsyncQueue#queueAllAsync
 */
 queueAllAsync(iterable: AsyncIterable<T>): Promise<void>;
 /**
 * Offer an element, only queueing it if entrance is available at the time
 * of the call.
 *
 * @returns True if the element could be inserted right away. False
 * otherwise.
 */
 offer(data: T): boolean;
 /**
 * Offer all elements of an iterable for in-order insertion.
 *
 * @param iterable An iterable whose first (limit - queue.size()) elements
 * will be inserted. Iterables which iterate an infinite
 * number of elements can also be passed and will *not*
 * result in an endless loop.
 *
 * @returns The number of elements, which could be inserted right away.
 * Possibly 0 when the queue was full at the time of the call.
 */
 offerAll(iterable: Iterable<T>): number;
 /**
 * Offer all elements of an asynchronous iterable for in-order insertion.
 *
 * @param iterable An iterable whose elements will be {@link offer}ed
 * in-order for this queue.
 * The method will stop querying and offering further
 * elements upon the first {@link offer} call, which
 * returns `false`.
 * <br>
 * Contrary to {@link offerAll}, iterables iterating an
 * infinite number of elements might prevent the Promise,
 * which {@link offerAllAsync} returns, from ever resolving.
 * <br>
 * This depends on {@link dequeue} operations which could
 * get scheduled by the JS VM while elements from the passed
 * asynchronous iterator are accessed.
 *
 * @returns A promise resolving to the number of elements, which could be
 * inserted (offered successfully) consecutively without waiting.
 * Possibly 0 when the queue was full at the time of the call.
 * Fulfillment of this promise is not guaranteed in case of infinite
 * iterables.
 */
 offerAllAsync(iterable: AsyncIterable<T>): Promise<number>;
 /**
 * Dequeue an element if available or throw an exception otherwise.
 *
 * @returns The first element of the queue.
 * @throws An exception if the queue is empty at the time of the call.
 */
 poll(): T;
 /**
 * Dequeue an element, waiting for data to be available if necessary.
 *
 * @returns A promise which is fulfilled when an element (as queued by
 * queue()) becomes available.
 * If multiple dequeue() calls are issued sequentially, it is
 * implementation-defined whether they are fulfilled in the same
 * order or not. However, the data is still retrieved in FIFO
 * fashion, meaning the first fulfilled promise gets the first
 * element, the second fulfilled the second one and so forth.
 */
 dequeue(): Promise<T>;
 /**
 * Return the current size at the moment of the call.
 *
 * Even though code like
 * ```
 * if (queue.size() >= 1) {
 * const element = queue.poll();
 * }
 * ```
 * is technically not wrong (due to JS' execution model), users are
 * advised to avoid this pattern. Instead, users are encouraged to
 *
 * - in cases where waiting for a promise is impossible, to use
 * {@link poll} and catch the exception,
 * - or to use {@link dequeue} with JS' `await` or
 * `queue.dequeue().then(...)`.
 */
 size(): number;
}
/**
 * Asynchronous element-limited FIFO queue with a Promise-driven dequeue operation.
 *
 * {@link AsyncLimitedQueue#queue} operations are delayed (in unspecified order)
 * until space becomes available through dequeue operations.
 */
export class AsyncLimitedQueue<T> implements IAsyncLimitedQueue<T> {
  private limitSem: ISemaphore;
  /**
   * Initialize the queue.
   * @param limit An integer >= 1 specifying the number of elements after which
   *              queue() effectively blocks (i.e. the promise returned by it
   *              does not get "immediately" fulfilled for some informal value
   *              of immediately).
   * @param storageQueue An asynchronous (non-limiting) queue backing the data.
   *                     It defaults to an AsyncQueue.
   *
   * @throws An exception in case the limit is not an integer or is <= 0.
   */
  public constructor(limit: number, private storageQueue: IAsyncQueue<T> = new AsyncQueue()) {
    if (!Number.isInteger(limit) || limit <= 0) {
      throw new Error('AsyncLimitedQueue: Illegal limit (non-integer or ' +
        '<= 0) on queued elements. It must be an integer >= 1.');
    }
    this.limitSem = new Semaphore(limit);
  }
  public async queue(data: T): Promise<void> {
    await this.limitSem.take();
    this.storageQueue.queue(data);
  }
  public async queueAll(iterable: Iterable<T>): Promise<void> {
    for (const element of iterable) {
      await this.queue(element);
    }
  }
  public async queueAllAsync(iterable: AsyncIterable<T>): Promise<void> {
    for await (const element of iterable) {
      await this.queue(element);
    }
  }
  public offer(data: T): boolean {
    if (this.limitSem.tryTake()) {
      this.storageQueue.queue(data);
      return true;
    }
    else {
      return false;
    }
  }
  public offerAll(iterable: Iterable<T>): number {
    let insertedElements = 0;
    for (const element of iterable) {
      if (!this.offer(element)) {
        return insertedElements;
      }
      insertedElements++;
    }
    return insertedElements;
  }
  public async offerAllAsync(iterable: AsyncIterable<T>): Promise<number> {
    let insertedElements = 0;
    for await (const element of iterable) {
      if (!this.offer(element)) {
        return insertedElements;
      }
      insertedElements++;
    }
    return insertedElements;
  }
  public async dequeue(): Promise<T> {
    const element = await this.storageQueue.dequeue();
    this.limitSem.free();
    return element;
  }
  public poll(): T {
    // storageQueue.poll() throws if the queue is empty; in that case no slot
    // is released.
    const element = this.storageQueue.poll();
    // An element left the queue, so release its slot just like dequeue() does.
    this.limitSem.free();
    return element;
  }
  public async *[Symbol.asyncIterator](): AsyncIterableIterator<T> {
    while (true) {
      yield this.dequeue();
    }
  }
  public size(): number {
    return this.storageQueue.size();
  }
}
asked Mar 4, 2018 at 16:13
\$\endgroup\$
