You need to design and implement a generic pool for storing objects (implementing poolable). The pool should support the following:
A support for creational pattern which would be used by the pool to create objects automatically when needed
Maximum and Minimum number of objects which can be created and made available within the pool. Minimum number would be used to make sure that the number of objects are available in the pool, Maximum number would make sure that the pool will create a max of these objects
Automatic clearing of objects not used in the pool, this clearing agent can run at a fixed time interval
Once the object borrowed from the pool is used should support returning back
package com.tripplepoint.codingassignment.objectpool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ObjectPool<T extends Poolable> implements ObjectPoolable<T> {
private static final Logger logger = Logger.getLogger(ObjectPool.class
.getCanonicalName());
private final BlockingQueue<T> pool;
/** Maximum number of connections that the pool can have */
private final int maxPoolSize;
/** Number of connections that should be created initially */
private final int minPoolSize;
/** Indicates weather shutdown is called or not */
volatile private boolean shutdownCalled;
// fine grained locking
private final Lock takeLock = new ReentrantLock();
private final Lock releseLock = new ReentrantLock();
/**
* after allocationTrigeerInterval time thread will check the pool and
* depends on necessity it will create or discard the object from the pool.
*/
private final long allocationTrigeerInterval;
private final TimeUnit timeUnit1;
private ScheduledExecutorService executorService;
private long nextAllocationTrigeerInterval;
/** Count the number get Request for an interval */
private AtomicInteger takeRate = new AtomicInteger(0);
/** Count the number Released object for an interval */
private AtomicInteger releaseRate = new AtomicInteger(0);
/**
* Object Factory to create the object of poolable type
*/
private ObjectFactory<T> objectFactory;
public ObjectPool(int maxPoolSize, int minPoolSize,
ObjectFactory<T> objectFactory, long allocationTrigeerInterval,
TimeUnit timeUnit1) {
if ((minPoolSize > maxPoolSize) || minPoolSize < 1 || maxPoolSize < 1) {
throw new IllegalArgumentException("Invalid pool size parameters");
}
this.maxPoolSize = maxPoolSize;
this.minPoolSize = minPoolSize;
this.objectFactory = objectFactory;
this.pool = new LinkedBlockingQueue<T>(maxPoolSize);
this.shutdownCalled = false;
this.allocationTrigeerInterval = allocationTrigeerInterval;
this.nextAllocationTrigeerInterval = allocationTrigeerInterval;
this.timeUnit1 = timeUnit1;
initialize();
if (pool.size() != minPoolSize) {
logger.log(
Level.WARNING,
"Initial sized pool creation failed. InitializedPoolSize={0}, minPoolSize={1}",
new Object[] { pool.size(), minPoolSize });
}
}
protected void initialize() {
for (int i = 0; i < minPoolSize; i++) {
createandPoolObject();
}
/**
* creating separate thread to monitor the pool size 1. If there are
* less object inside the pool than minPoolSize then it will create new
* objects inside the pool 2. If there are more objects than minPoolSize
* then it will remove the objects from the pool
*/
executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleWithFixedDelay(()-> {
try {
int poolSize = pool.size();
int takerate = takeRate.get();
//TODO nextAllocationTrigeerInterval will not updated for the next scheduled run :Need to change the logic probably
nextAllocationTrigeerInterval = (((poolSize - takerate) / (takerate - releaseRate.get())) + 1) * nextAllocationTrigeerInterval;
// Reset the rate
takeRate.set(0);
releaseRate.set(0);
if (poolSize < minPoolSize) {
int sizeToBeAdded = minPoolSize - poolSize;
for (int i = 0; i < sizeToBeAdded; i++) {
createandPoolObject();
}
} else if (poolSize > minPoolSize) {
int sizeTobeRemoved = poolSize - minPoolSize;
for (int i = 0; i < sizeTobeRemoved; i++) {
removeObjectfromPool();
}
}
} catch (IllegalStateException excep) {
logger.log(
Level.FINEST,
"Shutdown is called , so no need of create/discard the object from the pool",
new Object[] { excep.toString() });
}
}, allocationTrigeerInterval, nextAllocationTrigeerInterval, timeUnit1);
}
/**
* This method create the poolable object and and into the pool.
*/
private void createandPoolObject() {
if (!shutdownCalled) {
T poolObject = null;
try {
poolObject = objectFactory.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
logger.log(Level.WARNING,
"Object Creation failed with exception ={0}",
new Object[] { e.getMessage() });
return;
}
pool.offer(poolObject);
logger.log(
Level.FINE,
"Created Poolable {0}, currentPoolSize={1}, maxPoolSize={2}",
new Object[] { poolObject.toString(), pool.size(),
maxPoolSize });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
}
private void removeObjectfromPool() {
if (!shutdownCalled) {
pool.remove();
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
}
/**
* Borrowed thread will be blocked till the poolable object will be
* available in the pool.
*
* @return
* @throws InterruptedException
*/
public T getObject() throws InterruptedException {
T obj = null;
if (!shutdownCalled) {
takeLock.lock();
try {
obj = pool.take();
takeRate.incrementAndGet();
} finally {
takeLock.unlock();
}
logger.log(Level.FINEST, "get Object from Pool Object{0}",
new Object[] { obj.toString() });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
return obj;
}
/**
* This method will wait for defined time if there is no poolable object in
* the pool
*
* @param timeOut
* @param timeUnit
* @return
* @throws InterruptedException
*/
public T getObject(long timeOut, TimeUnit timeUnit)
throws InterruptedException {
T obj = null;
if (!shutdownCalled) {
takeLock.lock();
try {
obj = pool.poll(timeOut, timeUnit);
takeRate.incrementAndGet();
} finally {
takeLock.unlock();
}
logger.log(Level.FINEST, "get Object from Pool Object{0}",
new Object[] { obj.toString() });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
return obj;
}
/**
* Surrender thread will block if the pool is exceeding the max size of the
* pool
*
* @param t
* @throws InterruptedException
*/
public void release(T t) throws InterruptedException {
if (!shutdownCalled) {
releseLock.lock();
try {
pool.put(t);
releaseRate.incrementAndGet();
} finally {
releseLock.unlock();
}
logger.log(Level.FINEST, "Releases Pool Object{0}",
new Object[] { t.toString() });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
}
/**
* Surrender thread will wait for defined time if Object pool is full
*
* @param poolable
* object
* @throws InterruptedException
*/
public void release(T t, long timeOut, TimeUnit timeUnit)
throws InterruptedException {
if (!shutdownCalled) {
releseLock.lock();
try {
pool.offer(t, timeOut, timeUnit);
releaseRate.incrementAndGet();
} finally {
releseLock.unlock();
}
logger.log(Level.FINEST, "Release Pool Object{0}",
new Object[] { t.toString() });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
}
/**
* After calling this method it will not accept any more request It will
* process queued request and clear the pool
*/
public synchronized void shutdown() {
logger.log(Level.INFO, "Object pool shutdown started...");
shutdownCalled = true;
takeLock.lock(); // waiting for get and release operation to finish
releseLock.lock();
try {
pool.stream().forEach((t) -> t.close());
pool.clear();
executorService.shutdown();
} finally {
releseLock.unlock();
takeLock.unlock();
}
logger.log(Level.INFO, "Object pool shutdown completed...");
}
/**
* return the number of Objects in the pool
*/
public int getPoolSize() {
// waiting for get and release operation to finish
takeLock.lock();
releseLock.lock();
try {
return pool.size();
} finally {
releseLock.unlock();
takeLock.unlock();
}
}
}
2 Answers 2
You use lock locks in the get and release methods. This somewhat defeats the purpose of using the threadsafe pool. The getPoolSize doesn't need to wait on finishing the get or release as pool.size()
is thread-safe and the pool's size is fluid anyway when multiple threads are contesting over it.
For shutdown you can use pool.drainTo
to clear the pool to another collection when shutting down:
executorService.shutdown(); //first to stop the creation thread
List<T> list = new ArrayList<>(pool.size()+10);
pool.drainTo(list);
list.stream().forEach((t) -> t.close());
That means you can remove the locks as the pool will take care of the thread-safety.
shutdown
is declared synchronized but that it the only method that is. But there is no protection against calling it twice in a row, nor do any of the operations need to be guarded like that.
I would also add a third getObject
variant that will create an object on the calling thread when the pool is empty.
public T getOrCreateObject()
throws InterruptedException {
T obj = null;
if (!shutdownCalled) {
obj = pool.poll();
if(obj == null){
obj = objectFactory.newInstance();
}
takeRate.incrementAndGet();
logger.log(Level.FINEST, "get Object from Pool Object{0}",
new Object[] { obj.toString() });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
return obj;
}
The creation task checks if (poolSize > minPoolSize)
I think you mean maxPoolSize
there.
Release shouldn't be blocking. Instead if the pool is full just discard the object.
public void release(T t) throws InterruptedException {
if (!shutdownCalled) {
if(!pool.offer(t))t.close();
releaseRate.incrementAndGet();
logger.log(Level.FINEST, "Releases Pool Object{0}",
new Object[] { t.toString() });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
}
removeObjectfromPool
doesn't call close
on the discarded object.
-
\$\begingroup\$ Shutdown Method : drainTo 1. Is it thread safe as bulk operation on collection are not thread safe . 2. Does it permits the take, release other operations after calling the drainTo ? Is it atomic operation ? This information will help to decide 1. shutDownCalled variable to be maintain or not. 2. Does it require to acquire the locks before calling pool.clear. \$\endgroup\$Damaji kalunge– Damaji kalunge2016年11月19日 04:30:10 +00:00Commented Nov 19, 2016 at 4:30
-
\$\begingroup\$ Java Specification : Removes all available elements from this queue and adds them to the given collection. This operation may be more efficient than repeatedly polling this queue. A failure encountered while attempting to add elements to collection c may result in elements being in neither, either or both collections when the associated exception is thrown. Attempts to drain a queue to itself result in IllegalArgumentException. Further, the behavior of this operation is undefined if the specified collection is modified while the operation is in progress. \$\endgroup\$Damaji kalunge– Damaji kalunge2016年11月19日 04:31:57 +00:00Commented Nov 19, 2016 at 4:31
-
\$\begingroup\$ get and relese method with lock defeats the purpose of using thread safe linkedblocking queue But to make the following operation atomic need to use the lock obj = pool.take(); takeRate.incrementAndGet(); otherwise takerate will not consistent \$\endgroup\$Damaji kalunge– Damaji kalunge2016年11月19日 04:59:05 +00:00Commented Nov 19, 2016 at 4:59
-
\$\begingroup\$ This API have two overloaded method One is getObject blocking and another one is non-blocking wait for certain time. These two methods using the same lock, so there is possibility of non-blocking method will behave like blocking What would be the appropriate solution for this ? having another lock, these leads too many locks as in future if need to support other versions of getObject and shutdown becomes costly as need to acquire too many locks. \$\endgroup\$Damaji kalunge– Damaji kalunge2016年11月19日 05:15:30 +00:00Commented Nov 19, 2016 at 5:15
-
\$\begingroup\$ if (poolSize > minPoolSize) this is correct as per the problem statement. \$\endgroup\$Damaji kalunge– Damaji kalunge2016年11月19日 06:00:22 +00:00Commented Nov 19, 2016 at 6:00
- Return the
softReference
from the object factory - Store the
SoftReference
in the Pool - Made the code changes by considering the comments posted earlier
package com.tripplepoint.codingassignment.objectpool;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ObjectPool<T extends Poolable> implements ObjectPoolable<T> {
private static final Logger logger = Logger.getLogger(ObjectPool.class
.getCanonicalName());
private final BlockingQueue<SoftReference<T>> pool;
/** Maximum number of connections that the pool can have */
private final int maxPoolSize;
/** Number of connections that should be created initially */
private final int minPoolSize;
/** Indicates weather shutdown is called or not */
volatile private boolean shutdownCalled;
// fine grained locking
private final Lock takeLock = new ReentrantLock();
private final Lock releseLock = new ReentrantLock();
/**
* after allocationTrigeerInterval time thread will check the pool and
* depends on necessity it will create or discard the object from the pool.
*/
private final long allocationTrigeerInterval;
private final TimeUnit timeUnit1;
private ScheduledExecutorService executorService;
private long nextAllocationTrigeerInterval;
/** Count the number get Request for an interval */
private AtomicInteger takeRate = new AtomicInteger(0);
/** Count the number Released object for an interval */
private AtomicInteger releaseRate = new AtomicInteger(0);
/**
* Object Factory to create the object of poolable type
*/
private ObjectFactory<T> objectFactory;
public ObjectPool(int maxPoolSize, int minPoolSize,
ObjectFactory<T> objectFactory, long allocationTrigeerInterval,
TimeUnit timeUnit1) {
if ((minPoolSize > maxPoolSize) || minPoolSize < 1 || maxPoolSize < 1) {
throw new IllegalArgumentException("Invalid pool size parameters");
}
this.maxPoolSize = maxPoolSize;
this.minPoolSize = minPoolSize;
this.objectFactory = objectFactory;
this.pool = new LinkedBlockingQueue<>(maxPoolSize);
this.shutdownCalled = false;
this.allocationTrigeerInterval = allocationTrigeerInterval;
this.nextAllocationTrigeerInterval = allocationTrigeerInterval;
this.timeUnit1 = timeUnit1;
initialize();
if (pool.size() != minPoolSize) {
logger.log(
Level.WARNING,
"Initial sized pool creation failed. InitializedPoolSize={0}, minPoolSize={1}",
new Object[] { pool.size(), minPoolSize });
}
}
protected void initialize() {
for (int i = 0; i < minPoolSize; i++) {
createandPoolObject();
}
/**
* creating separate thread to monitor the pool size 1. If there are
* less object inside the pool than minPoolSize then it will create new
* objects inside the pool 2. If there are more objects than minPoolSize
* then it will remove the objects from the pool
*/
executorService = Executors.newSingleThreadScheduledExecutor();
executorService
.scheduleWithFixedDelay(
() -> {
try {
int poolSize = pool.size();
int takerate = takeRate.getAndSet(0);
// TODO nextAllocationTrigeerInterval will not
// updated for the next scheduled run :Need to
// change the logic
nextAllocationTrigeerInterval = (((poolSize - takerate) / (takerate - releaseRate
.getAndSet(0))) + 1)
* nextAllocationTrigeerInterval;
if (poolSize < minPoolSize) {
int sizeToBeAdded = minPoolSize - poolSize;
for (int i = 0; i <= sizeToBeAdded; i++) {
createandPoolObject();
}
} else if (poolSize > minPoolSize) {
int sizeTobeRemoved = poolSize
- minPoolSize;
for (int i = 0; i <= sizeTobeRemoved; i++) {
removeObjectfromPool();
}
}
} catch (IllegalStateException excep) {
logger.log(
Level.FINEST,
"Shutdown is called , so no need of create/discard the object from the pool",
new Object[] { excep.toString() });
}
}, allocationTrigeerInterval,
nextAllocationTrigeerInterval,
timeUnit1);
}
/**
* This method create the poolable object and and into the pool.
*/
private void createandPoolObject() {
if (!shutdownCalled) {
SoftReference<T> poolObject = null;
try {
poolObject = objectFactory.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
logger.log(Level.WARNING,
"Object Creation failed with exception ={0}",
new Object[] { e.getMessage() });
return;
}
pool.offer(poolObject);
logger.log(
Level.FINE,
"Created Poolable {0}, currentPoolSize={1}, maxPoolSize={2}",
new Object[] { poolObject.toString(), pool.size(),
maxPoolSize });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
}
private void removeObjectfromPool() {
if (!shutdownCalled) {
pool.remove();
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
}
/**
* Borrowed thread will be blocked till the poolable object will be
* available in the pool.
*
* @return
* @throws InterruptedException
*/
public T getObject() throws InterruptedException {
T obj = null;
if (!shutdownCalled) {
takeLock.lock();
try {
obj = pool.take().get();
takeRate.incrementAndGet();
} finally {
takeLock.unlock();
}
logger.log(Level.FINEST, "get Object from Pool Object{0}",
new Object[] { obj.toString() });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
return obj;
}
/**
* TO-DO this method will act as blocked in case where no element in queue
* take is called and after that this method get called ? This method will
* wait for defined time if there is no poolable object in the pool
*
* @param timeOut
* @param timeUnit
* @return
* @throws InterruptedException
*/
public T getObject(long timeOut, TimeUnit timeUnit)
throws InterruptedException {
T obj = null;
if (!shutdownCalled) {
takeLock.lock();
try {
obj = pool.poll(timeOut, timeUnit).get();
takeRate.incrementAndGet();
} finally {
takeLock.unlock();
}
logger.log(Level.FINEST, "get Object from Pool Object{0}",
new Object[] { obj.toString() });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
return obj;
}
/**
* This method will add the object in to the pool if size is not full If
* size is full then it will discards object by performing clean up
* activity.
*
* @param t
* @throws InterruptedException
*/
public void release(T t) throws InterruptedException {
if (!shutdownCalled) {
releseLock.lock();
try {
if (!pool.offer(new SoftReference<T>(t))) {
t.close();
}
releaseRate.incrementAndGet();
} finally {
releseLock.unlock();
}
logger.log(Level.FINEST, "Releases Pool Object{0}",
new Object[] { t.toString() });
} else {
throw new IllegalStateException("Object Pool is already shoudown");
}
}
/**
* After calling this method it will not accept any more request It will
* process queued request and clear the pool
*/
public void shutdown() {
logger.log(Level.INFO, "Object pool shutdown started...");
if (!shutdownCalled) {
shutdownCalled = true;
List<SoftReference<T>> list = new ArrayList<>(maxPoolSize);
takeLock.lock(); // waiting for get and release operation to finish
releseLock.lock();
try {
executorService.shutdown();// First stop the creation of thread
pool.drainTo(list); // TODO Not sure of thread safety
list.stream().forEach((t) -> t.get().close());
list.clear();
pool.clear();
} finally {
releseLock.unlock();
takeLock.unlock();
}
logger.log(Level.INFO, "Object pool shutdown completed...");
} else {
throw new IllegalStateException(
"Other thread as alreday called shutdown...");
}
}
/**
* return the number of Objects in the pool
*/
public int getPoolSize() {
return pool.size();
}
}
Poolable
andObjectPoolable
? \$\endgroup\$