
I've implemented a "Ticket" class which is shared via std::shared_ptr between multiple threads.

The program flow is like this:

  1. parallelQuery() is called to start a new query job. A shared instance of Ticket is created.
  2. The query is split into multiple tasks, and each task is enqueued on a worker thread (this part is important; otherwise I'd just join the threads and be done). Each task gets the shared ticket.
  3. ticket->waitUntilDone() is called to wait for all tasks of the job to complete.
  4. When a task is done, it calls the done() method on the ticket.
  5. When all tasks are done, the ticket is unlocked, and the result data from the tasks is aggregated and returned from parallelQuery().

In pseudo code:

    std::vector<T> parallelQuery(std::string str) {
        auto ticket = std::make_shared<Ticket>(2);
        auto task1 = std::make_unique<Query>(ticket, str + "a");
        addTaskToWorker(task1);
        auto task2 = std::make_unique<Query>(ticket, str + "b");
        addTaskToWorker(task2);
        ticket->waitUntilDone();
        auto result = aggregateData(task1, task2);
        return result;
    }

My code works, but it is a critical part of my application and I want a second opinion, especially with regard to locking and unlocking when a task completes.

Here is the Ticket class:

    #include <mutex>
    #include <atomic>

    class Ticket {
    public:
        Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) {
            _mutex.lock();
        }

        void waitUntilDone() {
            _doneLock.lock();
            if (_done != _numTasks) {
                _doneLock.unlock();
                _mutex.lock();
            }
            else {
                _doneLock.unlock();
            }
        }

        void done() {
            _doneLock.lock();
            _done++;
            if (_done == _numTasks) {
                _mutex.unlock();
            }
            _doneLock.unlock();
        }

        void cancel() {
            _canceled = true;
            _mutex.unlock();
        }

        bool wasCanceled() {
            return _canceled;
        }

        bool isDone() {
            return _done >= _numTasks;
        }

        int getNumTasks() {
            return _numTasks;
        }

    private:
        std::atomic<int> _numTasks;
        std::atomic<int> _done;
        std::atomic<bool> _canceled;
        // mutex used for caller wait state
        std::mutex _mutex;
        // mutex used to safeguard done counter with lock condition in waitUntilDone
        std::mutex _doneLock;
    };

Just for completeness, here are the test cases:

    #include <thread>
    #include <atomic>
    #include <unistd.h>
    #include "catch.hpp"
    #include "Ticket.hpp"

    Ticket ticket1;
    std::atomic<bool> stopTicketRunner(false);

    TEST_CASE("Testing Master Ticket wait") {
        std::thread thread = std::thread([] {
            while (!stopTicketRunner) {
                usleep(100000);
                if (!ticket1.isDone())
                    ticket1.done();
            }
        });
        ticket1.waitUntilDone();
        REQUIRE(ticket1.isDone());
        REQUIRE(!ticket1.wasCanceled());
        stopTicketRunner = true;
        thread.join();
    }
    Ticket ticket2;
    std::atomic<bool> stopTicketCancelRunner(false);

    TEST_CASE("Testing Master Ticket cancel") {
        std::thread thread = std::thread([] {
            while (!stopTicketCancelRunner) {
                usleep(100000);
                if (!ticket2.wasCanceled())
                    ticket2.cancel();
            }
        });
        ticket2.waitUntilDone();
        REQUIRE(!ticket2.isDone());
        REQUIRE(ticket2.wasCanceled());
        stopTicketCancelRunner = true;
        thread.join();
    }
    Ticket ticket3;
    std::atomic<bool> stopTicketFinishRunner(false);

    TEST_CASE("Testing Master Ticket fast finish (finished before wait called)") {
        std::thread thread = std::thread([] {
            while (!stopTicketFinishRunner) {
                if (!ticket3.isDone())
                    ticket3.done();
                usleep(100000);
            }
        });
        usleep(1000000);
        REQUIRE(ticket3.isDone());
        ticket3.waitUntilDone();
        REQUIRE(ticket3.isDone());
        REQUIRE(!ticket3.wasCanceled());
        stopTicketFinishRunner = true;
        thread.join();
    }
    Ticket ticket4(3);
    std::atomic<bool> stopTicketMultiRunner(false);

    TEST_CASE("Testing Master Ticket multiple tasks") {
        std::thread thread1 = std::thread([] {
            while (!stopTicketMultiRunner) {
                usleep(500000);
                if (!ticket4.isDone())
                    ticket4.done();
            }
        });
        std::thread thread2 = std::thread([] {
            while (!stopTicketMultiRunner) {
                usleep(300000);
                if (!ticket4.isDone())
                    ticket4.done();
            }
        });
        std::thread thread3 = std::thread([] {
            while (!stopTicketMultiRunner) {
                usleep(100000);
                if (!ticket4.isDone())
                    ticket4.done();
            }
        });
        REQUIRE(!ticket4.isDone());
        REQUIRE(ticket4.getNumTasks() == 3);
        ticket4.waitUntilDone();
        REQUIRE(ticket4.isDone());
        REQUIRE(!ticket4.wasCanceled());
        stopTicketMultiRunner = true;
        thread1.join();
        thread2.join();
        thread3.join();
    }
    Ticket ticket5(3);
    std::atomic<bool> stopTicketMultiCancelRunner(false);

    TEST_CASE("Testing canceling Ticket with multiple tasks") {
        std::thread thread1 = std::thread([] {
            while (!stopTicketMultiCancelRunner) {
                usleep(5000000);
                if (!ticket5.isDone())
                    ticket5.done();
            }
        });
        std::thread thread2 = std::thread([] {
            while (!stopTicketMultiCancelRunner) {
                usleep(300000);
                if (!ticket5.isDone() && !ticket5.wasCanceled())
                    ticket5.cancel();
            }
        });
        std::thread thread3 = std::thread([] {
            while (!stopTicketMultiCancelRunner) {
                usleep(1000000);
                if (!ticket5.isDone())
                    ticket5.done();
            }
        });
        REQUIRE(!ticket5.isDone());
        REQUIRE(ticket5.getNumTasks() == 3);
        ticket5.waitUntilDone();
        REQUIRE(!ticket5.isDone());
        REQUIRE(ticket5.wasCanceled());
        stopTicketMultiCancelRunner = true;
        thread1.join();
        thread2.join();
        thread3.join();
    }
asked Aug 9, 2019 at 22:35
  • On Stack Overflow (stackoverflow.com/questions/57441479/…), you said you have a problem, and here you say that your code works! Commented Aug 10, 2019 at 17:28
  • My code does work, and all tests pass. The problem you mentioned, that done must be called on the same thread, did not occur in practice. My apologies, though, for putting the question up here for a general code review; I should have mentioned this on SO. I'll delete the message here, since I got valuable feedback on SO. Thanks again. Commented Aug 10, 2019 at 17:36
  • Added test cases just for completeness. I'll nevertheless delete this post here in a moment. Commented Aug 10, 2019 at 17:41
  • It is undefined behavior to unlock a mutex from another thread than the one that locked it: en.cppreference.com/w/cpp/thread/mutex/unlock. It might happen to work with your compiler, but the code won't work elsewhere, or might fail with a future compiler version, different compiler options, or with debug checks activated. Commented Aug 10, 2019 at 17:51

1 Answer


Most of this has already been discussed here: https://stackoverflow.com/questions/57441479/avoiding-deadlock-in-concurrent-waiting-object

Issue 1

A mutex is owned by the thread that locked it, so it is undefined behavior for done() to unlock _mutex from a different thread than the one that locked it in the Ticket constructor.

Issue 2

If a mutex is locked by a thread that already owns the lock, the behavior is undefined: https://en.cppreference.com/w/cpp/thread/mutex/lock

Issue 3

Depending on the actual implementation, which thread calls which function, and in which order, there is potential for deadlocks, unprotected sections and other undefined behavior.

Essentially, std::mutex is non-recursive and owned by the thread that locked it; violating either rule is undefined behavior, so what actually happens is left to the implementation.

A mutex should not be used as a binary semaphore: it will not always work, because doing so depends on undefined behavior.

Issue 4

_doneLock is not a recommendable variable name. Calling a mutex a "lock" is confusing, given that std::mutex is normally used together with std::lock_guard or std::unique_lock.

Issue 5

Using two locks that can be acquired in different orders can lead to deadlock (see my answer on Stack Overflow for details).

Issue 6

Your tests are just as problematic. The main issue is that you call done() in a loop, so it is possible that thread1 in the ticket5 case increments _done twice while thread2 never increments it. For the same reason, _done can end up greater than expected, or you may use the result while one thread is still actively running.

Also, all your tests start by sleeping, so you never properly test the case where a task finishes very early.
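The overshoot problem disappears when each worker signals completion exactly once instead of looping. A standalone sketch of that pattern (plain threads and an atomic counter rather than catch.hpp and the Ticket class, so the helper name runTasksOnce is hypothetical):

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Each worker increments the counter exactly once (no retry loop),
// so the final count can never exceed numTasks.
int runTasksOnce(int numTasks) {
    std::atomic<int> done(0);
    std::vector<std::thread> workers;
    for (int i = 0; i < numTasks; ++i)
        workers.emplace_back([&done] { ++done; });  // one signal per task
    for (auto& w : workers)
        w.join();  // in the real tests, ticket.waitUntilDone() plays this role
    return done.load();
}
```

Applied to the original tests, this means each lambda body calls done() once and returns, rather than polling isDone() in a loop.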

Suggestion

For proper testing, I think you should also add some sleeps to the original code, for example before the call to waitUntilDone, to validate that your code also works if some or all threads finish before you wait on them.

It might also be useful to do the same inside the Ticket class, in particular just before or after a lock/unlock, to simulate what happens if a thread switch occurs at that point.
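One way to inject such pauses is a small helper called at the interesting points (before waitUntilDone, between a check and the matching lock, and so on). The name testPause is hypothetical, not part of the original code:

```cpp
#include <chrono>
#include <thread>

// Artificially widens a race window so that "thread switch happens here"
// scenarios become reproducible instead of rare. Intended for test builds
// only; calls would be removed (or compiled out) in production code.
inline void testPause(int ms = 50) {
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
```

Sprinkling testPause() just before _mutex.lock() in waitUntilDone, for instance, makes the "all tasks finished before the caller waits" ordering happen reliably on every run.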

answered Aug 10, 2019 at 18:38
