I have a class DatabaseQueue which asynchronously runs SQLite updates that it takes from a LinkedBlockingQueue.
I implemented a pausing mechanism for this class which should ideally stop the execution of the queue as soon as possible and then wait for it to be unpaused before executing more queries.
I'm very new to concurrent programming but I managed to get this working using multiple ReentrantLocks and Conditions. This seems very verbose and clumsy though, especially with all the lock() and unlock() calls.
In addition to this, I also don't like how I have to execute an additional dummy query just to get the executor thread to continue its loop. What would be a better way to implement this?
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class DatabaseQueue {

    private static final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private static final QueueExecutor executor = new QueueExecutor();
    private static boolean locked = false;
    private static ReentrantLock lockAttempted = new ReentrantLock();
    private static ReentrantLock lockAcquired = new ReentrantLock();
    private static Condition attempted = lockAttempted.newCondition();
    private static Condition acquired = lockAcquired.newCondition();

    public static void start() {
        if (locked) {
            locked = false;
            lockAttempted.lock();
            attempted.signal(); //Signal to resume queue execution
            lockAttempted.unlock();
        } else {
            executor.start();
        }
    }

    public static void pause() {
        locked = true;
        if (queue.isEmpty()) {
            queue.add("SELECT 1");
        }
        lockAcquired.lock();
        acquired.awaitUninterruptibly(); //Wait for the lock to be acquired
        lockAcquired.unlock();
    }

    public static void queue(String update) {
        queue.add(update);
    }

    public static class QueueExecutor extends Thread {

        public QueueExecutor() {
            setName("Database Queue Executor");
            setDaemon(true);
        }

        @Override
        public void run() {
            Connection connection = Database.getConnection();
            while (true) {
                try {
                    if (locked) {
                        lockAcquired.lock();
                        acquired.signal(); //Signal that the lock has been acquired
                        lockAcquired.unlock();
                        lockAttempted.lock();
                        attempted.awaitUninterruptibly(); //Wait for the lock to be released
                        lockAttempted.unlock();
                    }
                    String update = queue.take();
                    connection.createStatement().executeUpdate(update);
                    Database.update(update);
                } catch (InterruptedException | SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2 Answers
- Why are all your variables static? They shouldn't be.
- You don't need that many locks. Actually, you need only 1 and an AtomicBoolean (locked).
- I can't understand why you add "SELECT 1" if the queue is empty. That'll waste some precious time.
- I think as long as there's only 1 QueueExecutor and every access to the queue is guarded by that one lock, you don't need a thread-safe queue class.
- You could execute your statements in batches but I'm not sure of the pros and cons.
- Unless you 100% trust the user you should use PreparedStatements. Even if you do trust the user you should still use them, as they can give you a query execution speed boost.
- Wouldn't it be better to pass Statements to queue instead of Strings? It would make it easy for the user to use PreparedStatements.
Example implementation (not tested):
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

public class DatabaseQueue {
    // Not thread-safe by itself: every access happens while holding "object"
    private final Deque<PreparedStatement> queue = new ArrayDeque<>();
    private final QueueExecutor executor = new QueueExecutor();
    private final AtomicBoolean paused = new AtomicBoolean(false);
    // Monitor that guards the queue and is used for wait()/notifyAll()
    private final Object object = new Object();

    public DatabaseQueue() {
        executor.start();
    }

    public void start() {
        if (!paused.compareAndSet(true, false)) {
            throw new IllegalStateException("DatabaseQueue wasn't paused");
        }
        wakeExecutor();
    }

    public void pause() {
        if (!paused.compareAndSet(false, true)) {
            throw new IllegalStateException("DatabaseQueue is already paused");
        }
    }

    public void close() {
        executor.shutdown();
        paused.set(false); // we want to execute all the statements before closing
        wakeExecutor();
    }

    public void queue(PreparedStatement statement) {
        synchronized (object) {
            queue.addLast(statement);
            if (!paused.get())
                object.notifyAll();
        }
    }

    public PreparedStatement createStatement(String sql) throws SQLException {
        return Database.getConnection().prepareStatement(sql);
    }

    // wait() and notifyAll() may only be called while holding the monitor
    private void wakeExecutor() {
        synchronized (object) {
            object.notifyAll();
        }
    }

    class QueueExecutor extends Thread {
        private volatile boolean shutdown = false;

        QueueExecutor() {
            setName("Database Queue Executor");
            setDaemon(true);
        }

        void shutdown() {
            shutdown = true;
        }

        @Override
        public void run() {
            // even if we should shutdown we first should execute all the statements
            while (true) {
                PreparedStatement statement;
                synchronized (object) {
                    // Re-check in a loop: wait() can wake up spuriously
                    while (paused.get() || queue.isEmpty()) {
                        if (shutdown && queue.isEmpty()) {
                            return;
                        }
                        try {
                            object.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    statement = queue.removeFirst();
                }
                // Execute outside the monitor so queue() is never blocked by a slow statement
                try {
                    statement.execute();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
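The "only one lock" point from the list above can also be sketched with a single ReentrantLock and Condition, which stays close to the tools used in the question. This is only a minimal sketch: the names PauseGate, awaitIfPaused and passesWithin are made up for illustration. The worker is assumed to call awaitIfPaused() right after queue.take() returns, so no dummy query is needed to reach the pause check.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: one lock, one condition, one flag.
class PauseGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resumed = lock.newCondition();
    private boolean paused = false; // guarded by lock

    void pause() {
        lock.lock();
        try {
            paused = true;
        } finally {
            lock.unlock();
        }
    }

    void resume() {
        lock.lock();
        try {
            paused = false;
            resumed.signalAll(); // wake every thread waiting at the gate
        } finally {
            lock.unlock();
        }
    }

    // Returns immediately when not paused, otherwise blocks until resume()
    void awaitIfPaused() throws InterruptedException {
        lock.lock();
        try {
            while (paused) { // loop guards against spurious wakeups
                resumed.await();
            }
        } finally {
            lock.unlock();
        }
    }

    // Demo helper: reports whether a fresh thread gets through the gate in time
    static boolean passesWithin(PauseGate gate, long millis) {
        CountDownLatch passed = new CountDownLatch(1);
        Thread t = new Thread(() -> {
            try {
                gate.awaitIfPaused();
                passed.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
        try {
            return passed.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            t.interrupt(); // stop the helper thread if it is still waiting
        }
    }
}
```

With a gate like this, the worker loop would be roughly: take an element, call awaitIfPaused(), then execute the statement. Note that pause() here does not wait for a statement that is already executing.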
- queue.take() will pause execution of the thread until a query is available. So, if the queue is first empty, then paused, then a statement is queued, it will execute regardless of being paused. So I'm executing "SELECT 1" to force it to reach the locking code. Also could you please explain how I could utilize just one AtomicBoolean to achieve the same effect? – kmecpp, Aug 14, 2017 at 19:28
- @kmecpp I added example code. I did not test it but it looks reasonable to me. – Mibac, Aug 14, 2017 at 20:34
All the points by @Mibac are valid.
Naming is also very important. You have the locked variable, but the function that controls it is pause(), which made your code confusing. It would have been better if the variable was also named paused. @Mibac did this renaming in his code, but did not mention it explicitly.
locked was not declared as volatile, so it's not thread-safe. Or as @Mibac suggested, it would be even clearer to use an AtomicBoolean.
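As a rough illustration of the AtomicBoolean suggestion, the flag could be wrapped in a small class like the one below. The name PauseFlag and its methods are invented for this sketch and do not appear in the original code. compareAndSet makes each state change atomic and visible to other threads, and its return value tells the caller whether the call actually changed anything.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper around the pause flag.
class PauseFlag {
    private final AtomicBoolean paused = new AtomicBoolean(false);

    // Returns true only for the call that switches from running to paused
    boolean pause() {
        return paused.compareAndSet(false, true);
    }

    // Returns true only for the call that switches from paused to running
    boolean resume() {
        return paused.compareAndSet(true, false);
    }

    boolean isPaused() {
        return paused.get();
    }
}
```

A flag like this only answers "are we paused?". It does not make the worker thread wait, which is why a blocking construct is still needed on top of it.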
But instead of using a boolean, I think it's better to use something which blocks for the pause condition. That way the continuous loop in the writer thread can simply block-wait on 1) a new element in the queue and 2) the pause condition, in that order. Then it writes the element to the DB and goes back to the start of the while loop, waiting again on 1) and 2).
You could use for example Semaphore with one permit as the blocking condition for 2). For example:
while (...) {
    element = queue.take(); // blocking
    semaphore.acquire(); // blocking
    semaphore.release();
    writeToDB(element);
}
The semaphore permit is released right after it's acquired since it's just used as a blocking mechanism to wait if another thread requests a pause. Maybe there is a cleaner way to do this with a CyclicBarrier, CountDownLatch or Phaser. I think it's cleaner to use one of those constructs which block (including Semaphore) rather than just a boolean flag. A Lock might not work because implementations such as ReentrantLock require that .lock() and .unlock() be called from the same thread.
And for pausing and resuming from other threads:
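private boolean paused = false; // guarded by "this"

public synchronized void pause() throws InterruptedException {
    if (!paused) {
        semaphore.acquire(); // blocking, but only while the writer briefly holds the permit
        paused = true;
    }
}

public synchronized void resume() {
    if (paused) {
        semaphore.release();
        paused = false;
    }
}
Note here that I only acquire a permit if no other thread has already requested a pause. This basically means that there can only be one pause even if there are requests from many threads. The paused flag also keeps resume() from releasing a permit that was never acquired: a Semaphore does not cap its permit count, so an extra release() would add a second permit and let the writer run straight through a later pause. Checking availablePermits() instead of a flag would be racy, because the count is also 0 while the writer briefly holds the permit. semaphore.acquire() can be a blocking call, but in this setup it should only block for a very short time in the rare case where the writer thread holds the permit at the same moment, since the writer releases it immediately.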
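Putting the writer loop and the pause()/resume() pair together, a self-contained version might look like the sketch below. Several things here are assumptions made for illustration: the class name PausableWriter, the in-memory list standing in for the database, and the awaitWritten helper used to observe progress. Unlike the loop above, this variant keeps the permit while it writes, so pause() returns only once no write is in flight, and it uses acquireUninterruptibly() so that pause() does not need to declare InterruptedException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// Hypothetical class: an in-memory list stands in for the database.
class PausableWriter {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Semaphore gate = new Semaphore(1);
    private final List<String> written = new CopyOnWriteArrayList<>();
    private boolean paused = false; // guarded by "this"

    PausableWriter() {
        Thread worker = new Thread(this::drain, "Database Queue Executor");
        worker.setDaemon(true);
        worker.start();
    }

    private void drain() {
        try {
            while (true) {
                String element = queue.take(); // 1) wait for work
                gate.acquire();                // 2) wait while paused
                try {
                    written.add(element);      // stand-in for writeToDB(element)
                } finally {
                    gate.release();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // leave the loop when interrupted
        }
    }

    void queue(String update) {
        queue.add(update);
    }

    // Takes the only permit; a second pause() while paused does nothing
    synchronized void pause() {
        if (!paused) {
            gate.acquireUninterruptibly();
            paused = true;
        }
    }

    // Gives the permit back; a resume() while running does nothing
    synchronized void resume() {
        if (paused) {
            gate.release();
            paused = false;
        }
    }

    List<String> written() {
        return new ArrayList<>(written);
    }

    // Demo helper: polls until "count" elements were written or the timeout expires
    boolean awaitWritten(int count, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (written.size() < count) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Elements queued while the writer is paused stay pending: the worker may already have taken one from the queue, but it blocks at the gate until resume() is called.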