2
\$\begingroup\$

As an exercise, and in an effort to avoid wrapping callback APIs in every node library it seems, I threw together a resource pool class. The intended usage is the following:

const pool = new Pool({
 createConnection() { return ... Promise<MyConnection> }
 destroyConnection(c: MyConnection) { return ... any }
})
const connection = await pool.resource()
//later...
pool.release(connection)
//maybe if error occurs in handlers setup in `createConnection`
pool.destroy(connection)
//and when finished
pool.drain().then(() => pool.close())

My goal is for the behavior to be as unsurprising as possible. And I'd like feedback!

Here is a link to the gist.

import Queue from 'double-ended-queue'
const POOL_ID = Symbol('pool-id')
const IS_DIRTY = Symbol('dirty')
function isDirty (resource) {
 return resource[IS_DIRTY]
}
function poolId (resource) {
 return resource[POOL_ID]
}
function range (num) {
 return Array.from(Array(num))
}
class Deferred<T> {
 public promise: Promise<T>
 public resolve: (answer: T) => void
 public reject: (error: Error) => void
 constructor () {
 this.promise = new Promise((resolve, reject) => {
 this.resolve = resolve
 this.reject = reject
 })
 }
}
function isDefined (thing: any) {
 return typeof thing !== 'undefined'
}
class Pool<T> {
 private _closing = false
 private _min: number = 0
 private _max: number = 10
 private _idle: number = 30000
 private _resourceIds: Queue<number>
 private _resources: Queue<{ expiration: number, resource: T}>
 private _issued: { [key: string]: T } = {}
 private _pending: Queue<Deferred<T>> = new Queue<Deferred<T>>()
 private _timeoutToken: any
 private _size: number = 0
 private _initializing = false
 private _createResource: () => Promise<T>
 private _destroyResource: (resource: T) => Promise<any>
 constructor ({ createConnection, destroyConnection, idle, min, max }) {
 this._max = max || this._max
 this._min = min || this._min
 this._idle = idle || this._idle
 this._resources = new Queue<{ expiration: number, resource: T}>(max)
 this._createResource = () => {
 this._size++
 return Promise.resolve(createConnection())
 } 
 this._destroyResource = (x) => {
 this._size--
 return Promise.resolve(destroyConnection(x))
 }
 this._resourceIds = new Queue(range(this.max).map((_, i) => 1 + i))
 if (this.min > 0) {
 this._initializing = true
 Promise.all(range(this.min).map(this._createResource))
 .then(resources => {
 this._resources.enqueue(
 ...resources.map(resource => ({ expiration: Date.now(), resource }))
 )
 this._pending.toArray().forEach(x => x.resolve(this.resource()))
 this._initializing = false
 this._pending.clear()
 })
 }
 }
 /**
 * Current number of resources in pool
 */
 public get size () { return this._size }
 /**
 * Current number of resources in use
 */
 public get issued () { return this.max - this._resourceIds.length }
 /**
 * Maximum size of pool
 */
 public get max () { return this._max }
 /**
 * Minimum size of pool
 */
 public get min () { return this._min }
 /**
 * Approximate lifetime (ms) of an unused resource
 */
 public get idle () { return this._idle }
 /**
 * Forcibly destroy all created resources
 * To close the pool gracefully, call #drain first
 * A subsequent call to #resource is a programming error 
 */
 public close () {
 this._closing = true
 return Promise.all(Object.keys(this._issued)
 .map(x => this._issued[x])
 .concat(this._resources.toArray().map(x => x.resource))
 .map(x => this.destroy(x)))
 }
 /**
 * Wait until all resources have been released
 * This will not prevent resources from being issued. (calls to #resource)
 * Usually followed by a call to #close.
 * Calling close while draining, will short circuit this process
 */
 public drain () {
 return new Promise ((resolve) => {
 !function wait () {
 if (this.issued === 0) {
 resolve()
 } else {
 !this._closing && setTimeout(wait, 100) || resolve()
 } 
 }()
 })
 }
 /**
 * Destroy a resource. This is useful if you deterime the resource is in an error state.
 * It can be called at any time. 
 * If a destroyed resource is currently issued it will also be released
 */
 public destroy (resource) {
 if (isDirty(resource)) {
 return Promise.resolve()
 }
 resource[IS_DIRTY] = true
 if (poolId(resource)) {
 this.release(resource)
 }
 return this._destroyResource(resource)
 }
 /**
 * Immediately release a resource back into the pool. 
 * If the resource has not also been destroyed it may be recycled immediately.
 * Released resources that remain unused for #idle milliseconds will be destroyed.
 */
 public release (resource) {
 const resourceId = poolId(resource)
 if (!isDirty(resource) && this._pending.length) {
 return this._pending.dequeue().resolve(resource)
 }
 delete this._issued[resourceId]
 delete resource[POOL_ID]
 this._resourceIds.enqueue(resourceId)
 !isDirty(resource) && this._queuePossibleDestruction(resource)
 }
 /**
 * Request a resource, if none exist, request will be queued or one will be created. 
 * Otherwise, previously released resources are issued.
 */
 public resource () {
 if (this._closing) {
 return Promise.reject(new Error('Cannot issue resource while pool is closing...'))
 }
 if (!this._resourceIds.length || this._initializing) {
 const futureAvailableResource = new Deferred<T>()
 this._pending.enqueue(futureAvailableResource)
 return futureAvailableResource.promise
 }
 const resourceId = this._resourceIds.dequeue()
 if (this._resources.length) {
 const { resource } = this._resources.dequeue()
 if (isDirty(resource)) {
 this._resourceIds.enqueue(resourceId)
 return this.resource()
 }
 return Promise.resolve(this._dispatchResource(resource, resourceId))
 }
 return this._createResource().then(resource => {
 return this._dispatchResource(resource, resourceId)
 })
 }
 private _dispatchResource (resource: any, resourceId: number) { 
 resource[POOL_ID] = resourceId
 this._issued[resourceId] = resource
 return resource
 }
 private _queuePossibleDestruction (resource :T) {
 this._resources.enqueue({ expiration: Date.now() + this._idle, resource })
 if (!this._timeoutToken) {
 this._scheduleNextCleanup(this._idle)
 }
 }
 private _cleanup () {
 if (this.size === this._min || !this._resources.length) {
 return this._timeoutToken = null
 }
 const { expiration } = this._resources.peekFront(),
 expiredMsAgo = expiration - Date.now()
 if (expiredMsAgo <= 0) {
 const { resource } = this._resources.dequeue()
 this.destroy(resource)
 this._scheduleNextCleanup(100)
 } else {
 this._scheduleNextCleanup(expiredMsAgo + 1)
 }
 }
 private _scheduleNextCleanup(ms: number) {
 if (!this._closing) {
 this._timeoutToken = setTimeout(() => {
 this._cleanup() 
 }, ms)
 }
 }
}
200_success
145k22 gold badges190 silver badges478 bronze badges
asked Nov 7, 2016 at 2:52
\$\endgroup\$

1 Answer 1

2
\$\begingroup\$

Interesting use case! The core functionality reminds me a bit of my own AsyncQueue implementation.

I admittedly did not have too much of a look at your business logic, though. I hope the following notes are still of use for you.

  • Use default values in the object destructuring

    constructor ({ createConnection, destroyConnection, idle, min, max }) {
     this._max = max || this._max
     this._min = min || this._min
     this._idle = idle || this._idle
    constructor ({
     createConnection, destroyConnection,
     idle = 30000,
     min = 0,
     max = 10
    }) {
    
  • I would declare size to be a function than a getter property because it can and will return a varying value over the lifetime of your pool. A function call better conveys reading the current value whereas a property read might be mistaken for a onetime operation for reading a constant. Therefore, I find the getter properties for the other values like max and min to be a perfect fit!
    (Side note: why Array#length is not a function might also be disputable by the same argumentation.)

  • Add parentheses where operator precedence is not totally clear

    // In drain
    !this._closing && setTimeout(wait, 100) || resolve()
    

    I would even go so far to prefer if-else-blocks instead of double-nested short-circuit logic here.

  • Annotate functions returning a Promise with async.
    Currently, some functions have no return type annotation and return Promises on multiple branches. It would be quite easy to introduce another branch and return a raw value (e.g. return this.size) directly instead of wrapped inside a Promise. Using the type system effectively guards against these errors.

answered Mar 10, 2018 at 16:09
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.