I've implemented a "Ticket" class which is shared as a shared_ptr between multiple threads.
The program flow is like this:
- parallelQuery() is called to start a new query job. A shared instance of Ticket is created.
- The query is split into multiple tasks, and each task is enqueued on a worker thread (this part is important; otherwise I'd just join the threads and be done). Each task gets the shared ticket.
- ticket->waitUntilDone() is called to wait for all tasks of the job to complete.
- When a task is done, it calls the done() method on the ticket.
- When all tasks are done, the ticket is unlocked, and the result data from the tasks is aggregated and returned from parallelQuery().
In pseudo code:
std::vector<T> parallelQuery(std::string str) {
    auto ticket = std::make_shared<Ticket>(2);
    auto task1 = std::make_unique<Query>(ticket, str+"a");
    addTaskToWorker(task1);
    auto task2 = std::make_unique<Query>(ticket, str+"b");
    addTaskToWorker(task2);
    ticket->waitUntilDone();
    auto result = aggregateData(task1, task2);
    return result;
}
My code works, but it is a critical part of my application and I want a second opinion, especially with regard to locking and unlocking when the tasks are complete.
Here is the Ticket class:
#include <mutex>
#include <atomic>
class Ticket {
public:
    Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) {
        _mutex.lock();
    }
    void waitUntilDone() {
        _doneLock.lock();
        if (_done != _numTasks) {
            _doneLock.unlock();
            _mutex.lock();
        }
        else {
            _doneLock.unlock();
        }
    }
    void done() {
        _doneLock.lock();
        _done++;
        if (_done == _numTasks) {
            _mutex.unlock();
        }
        _doneLock.unlock();
    }
    void cancel() {
        _canceled = true;
        _mutex.unlock();
    }
    bool wasCancelled() {
        return _canceled;
    }
    bool isDone() {
        return _done >= _numTasks;
    }
    int getNumTasks() {
        return _numTasks;
    }
private:
    std::atomic<int> _numTasks;
    std::atomic<int> _done;
    std::atomic<bool> _canceled;
    // mutex used for caller wait state
    std::mutex _mutex;
    // mutex used to safeguard done counter with lock condition in waitUntilDone
    std::mutex _doneLock;
};
Just for completeness, here are the test cases:
#include <thread>
#include <atomic>
#include <unistd.h>
#include "catch.hpp"
#include "Ticket.hpp"
Ticket ticket1;
std::atomic<bool> stopTicketRunner(false);
TEST_CASE("Testing Master Ticket wait") {
    std::thread thread = std::thread([] {
        while (!stopTicketRunner) {
            usleep(100000);
            if (!ticket1.isDone())
                ticket1.done();
        }
    });
    ticket1.waitUntilDone();
    REQUIRE(ticket1.isDone());
    REQUIRE(!ticket1.wasCancelled());
    stopTicketRunner = true;
    thread.join();
}
Ticket ticket2;
std::atomic<bool> stopTicketCancelRunner(false);
TEST_CASE("Testing Master Ticket cancel") {
    std::thread thread = std::thread([] {
        while (!stopTicketCancelRunner) {
            usleep(100000);
            if (!ticket2.wasCancelled())
                ticket2.cancel();
        }
    });
    ticket2.waitUntilDone();
    REQUIRE(!ticket2.isDone());
    REQUIRE(ticket2.wasCancelled());
    stopTicketCancelRunner = true;
    thread.join();
}
Ticket ticket3;
std::atomic<bool> stopTicketFinishRunner(false);
TEST_CASE("Testing Master Ticket fast finish (finished before wait called)") {
    std::thread thread = std::thread([] {
        while (!stopTicketFinishRunner) {
            if (!ticket3.isDone())
                ticket3.done();
            usleep(100000);
        }
    });
    usleep(1000000);
    REQUIRE(ticket3.isDone());
    ticket3.waitUntilDone();
    REQUIRE(ticket3.isDone());
    REQUIRE(!ticket3.wasCancelled());
    stopTicketFinishRunner = true;
    thread.join();
}
Ticket ticket4(3);
std::atomic<bool> stopTicketMultiRunner(false);
TEST_CASE("Testing Master Ticket multiple tasks") {
    std::thread thread1 = std::thread([] {
        while (!stopTicketMultiRunner) {
            usleep(500000);
            if (!ticket4.isDone())
                ticket4.done();
        }
    });
    std::thread thread2 = std::thread([] {
        while (!stopTicketMultiRunner) {
            usleep(300000);
            if (!ticket4.isDone())
                ticket4.done();
        }
    });
    std::thread thread3 = std::thread([] {
        while (!stopTicketMultiRunner) {
            usleep(100000);
            if (!ticket4.isDone())
                ticket4.done();
        }
    });
    REQUIRE(!ticket4.isDone());
    REQUIRE(ticket4.getNumTasks() == 3);
    ticket4.waitUntilDone();
    REQUIRE(ticket4.isDone());
    REQUIRE(!ticket4.wasCancelled());
    stopTicketMultiRunner = true;
    thread1.join();
    thread2.join();
    thread3.join();
}
Ticket ticket5(3);
std::atomic<bool> stopTicketMultiCancelRunner(false);
TEST_CASE("Testing canceling Ticket with multiple tasks") {
    std::thread thread1 = std::thread([] {
        while (!stopTicketMultiCancelRunner) {
            usleep(5000000);
            if (!ticket5.isDone())
                ticket5.done();
        }
    });
    std::thread thread2 = std::thread([] {
        while (!stopTicketMultiCancelRunner) {
            usleep(300000);
            if (!ticket5.isDone() && !ticket5.wasCancelled())
                ticket5.cancel();
        }
    });
    std::thread thread3 = std::thread([] {
        while (!stopTicketMultiCancelRunner) {
            usleep(1000000);
            if (!ticket5.isDone())
                ticket5.done();
        }
    });
    REQUIRE(!ticket5.isDone());
    REQUIRE(ticket5.getNumTasks() == 3);
    ticket5.waitUntilDone();
    REQUIRE(!ticket5.isDone());
    REQUIRE(ticket5.wasCancelled());
    stopTicketMultiCancelRunner = true;
    thread1.join();
    thread2.join();
    thread3.join();
}
- On Stack Overflow (stackoverflow.com/questions/57441479/…), you said you have a problem, and here you say that your code works! (Phil1970, Aug 10, 2019 at 17:28)
- My code does work, and all tests pass. The problem you mentioned, that done must be called on the same thread, did not occur in practice. My apologies, though, for putting the question up here for a general code review; I should have mentioned this on SO. I'll delete the message here, since I got valuable feedback on SO. Thanks again. (benjist, Aug 10, 2019 at 17:36)
- Added test cases just for completeness. I'll delete this post here nevertheless in a moment. (benjist, Aug 10, 2019 at 17:41)
- It is undefined behavior to unlock a mutex from a thread other than the one that locked it: en.cppreference.com/w/cpp/thread/mutex/unlock. It might happen to work with your compiler, but your code won't necessarily work elsewhere, and it might fail with a future version, different compiler options, or debug checks activated. (Phil1970, Aug 10, 2019 at 17:51)
1 Answer
Most of this has already been discussed here: https://stackoverflow.com/questions/57441479/avoiding-deadlock-in-concurrent-waiting-object
Issue 1
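A mutex is owned by the thread that locked it. The constructor locks _mutex on the thread that creates the Ticket, so it is undefined behavior to call done() or cancel() (both of which unlock _mutex) from a thread other than the one that created the Ticket.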
Issue 2
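If a mutex is locked by a thread that already owns it, the behavior is undefined: https://en.cppreference.com/w/cpp/thread/mutex/lock. That is what happens when waitUntilDone() calls _mutex.lock() on the same thread that constructed the Ticket.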
Issue 3
Depending on the actual implementation, on which thread calls which function, and on the order of those calls, there is potential for deadlock, unprotected sections, and other undefined behavior.
Essentially, because these misuses of std::mutex are undefined behavior, it is left to the implementation whether ownership is actually tracked and what a second lock() by the owning thread does.
A mutex should not be used as a binary semaphore: doing so relies on undefined behavior, so it will not always work.
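One common way to get the same waiting behavior without depending on who owns the mutex is to pair a std::mutex with a std::condition_variable. The following is only a minimal sketch under that assumption, not the code from the linked Stack Overflow discussion. It keeps the method names used in the question's tests; runTicketDemo is a hypothetical helper added purely for illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Sketch: the waiter blocks on a condition variable, and the mutex is only
// ever locked and unlocked by the same thread through RAII guards.
class Ticket {
public:
    explicit Ticket(int numTasks = 1) : _numTasks(numTasks) {}

    // Blocks until every task has called done() or the ticket was canceled.
    void waitUntilDone() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return _canceled || _done >= _numTasks; });
    }

    // Intended to be called once per task, from any thread.
    void done() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            ++_done;
        }
        _cv.notify_all();
    }

    // May be called from any thread; wakes the waiter early.
    void cancel() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _canceled = true;
        }
        _cv.notify_all();
    }

    bool wasCancelled() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _canceled;
    }

    bool isDone() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _done >= _numTasks;
    }

    int getNumTasks() const { return _numTasks; }

private:
    const int _numTasks;
    int _done = 0;
    bool _canceled = false;
    mutable std::mutex _mutex;
    std::condition_variable _cv;
};

// Hypothetical helper: starts numTasks workers that each call done() once,
// waits on the ticket, joins the workers, and reports whether it is done.
bool runTicketDemo(int numTasks) {
    Ticket ticket(numTasks);
    std::vector<std::thread> workers;
    for (int i = 0; i < numTasks; ++i)
        workers.emplace_back([&ticket] { ticket.done(); });
    ticket.waitUntilDone();
    for (auto& w : workers) w.join();
    return ticket.isDone();
}
```

Because the predicate is checked under the mutex before blocking, this version should behave the same whether the tasks finish before or after waitUntilDone() is called.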
Issue 4
_doneLock is not a recommendable variable name. Calling a mutex a "lock" is confusing, given that std::mutex is normally used together with std::unique_lock or std::lock_guard.
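To illustrate the naming point, here is a small sketch in which the mutex is named as a mutex and the word "lock" is reserved for the RAII guard that holds it. DoneCounter and _doneMutex are hypothetical names chosen for this example; they are not part of the original code.

```cpp
#include <mutex>

// Hypothetical counter used only to show the naming convention.
class DoneCounter {
public:
    // Increments the counter and returns the new value.
    int increment() {
        std::lock_guard<std::mutex> lock(_doneMutex); // released at scope exit
        return ++_done;
    }

    // Returns the current value.
    int value() const {
        std::lock_guard<std::mutex> lock(_doneMutex);
        return _done;
    }

private:
    mutable std::mutex _doneMutex; // the mutex itself
    int _done = 0;
};
```

Using a guard object also means the mutex cannot be left locked by an early return or an exception.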
Issue 5
Using two locks that can be acquired in different orders can lead to deadlock (see my answer on Stack Overflow for details).
Issue 6
Your tests are just as problematic. The main problem is that you call done in a loop. So it is possible that thread1 in the ticket5 case would increment _done twice while thread2 never increments it. It is also possible that _done will be greater than expected because of that, or that you use the result while one thread is still actively running.
Also, nearly all of your tests start by sleeping in the worker thread, so they do not properly test the case where a task finishes very early.
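A test shaped like the following avoids both problems: each worker calls done() exactly once, without a loop or an initial sleep, and every worker is joined before the result is inspected. This is a rough sketch only; CountingTicket is a hypothetical stand-in for the done counter so that the example is self-contained, and runOneShotWorkers is likewise an invented name.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical stand-in for the Ticket's done counter.
struct CountingTicket {
    explicit CountingTicket(int n) : numTasks(n) {}
    void done() { ++doneCount; }
    bool isDone() const { return doneCount.load() >= numTasks; }

    const int numTasks;
    std::atomic<int> doneCount{0};
};

// Starts numTasks workers that each report completion exactly once,
// joins them all, and returns the final counter value.
int runOneShotWorkers(int numTasks) {
    CountingTicket ticket(numTasks);
    std::vector<std::thread> workers;
    for (int i = 0; i < numTasks; ++i)
        workers.emplace_back([&ticket] { ticket.done(); }); // one call per task
    for (auto& w : workers) w.join(); // no worker is running past this point
    return ticket.doneCount.load();
}
```

With one call per worker, the counter can be compared against the exact number of tasks instead of only checking that it reached a threshold.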
Suggestion
For proper testing, I think you should also try adding a sleep to the original code, for example just before the call to waitUntilDone, to validate that your code also works if some or all threads finish before you wait on them.
It might also be useful to do the same inside the Ticket class, in particular just before or after a lock/unlock, to simulate what would happen if a thread switch occurred at that point.
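As a rough sketch of that idea, the helper below delays the worker and the waiter independently, so a test can force either ordering. MiniTicket is a hypothetical single-task ticket based on a condition variable (an assumption, not the original class), kept small so the example stands alone; waitSurvivesOrdering is also an invented name.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical minimal one-task ticket.
class MiniTicket {
public:
    void done() {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _done = true;
        }
        _cv.notify_all();
    }

    void waitUntilDone() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return _done; });
    }

    bool isDone() {
        std::lock_guard<std::mutex> lock(_mutex);
        return _done;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _done = false;
};

// Sleeps for workerDelay before done() and for waiterDelay before
// waitUntilDone(), so the caller decides which side gets there first.
bool waitSurvivesOrdering(std::chrono::milliseconds workerDelay,
                          std::chrono::milliseconds waiterDelay) {
    MiniTicket ticket;
    std::thread worker([&ticket, workerDelay] {
        std::this_thread::sleep_for(workerDelay);
        ticket.done();
    });
    std::this_thread::sleep_for(waiterDelay);
    ticket.waitUntilDone();
    worker.join();
    return ticket.isDone();
}
```

Calling it once with the worker delay larger and once with the waiter delay larger exercises both the "wait first" and the "finished before wait" paths.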