This is (supposedly) a multi-threaded scheduler for one-time and/or repeating tasks. The tasks are simple std::function<void()>
objects. I built it to be a crucial part of a larger project I'm working on, but I developed it stand-alone, so no context is missing for a review.
I'm making heavy use of C++11 language and library features (especially thread support and chrono stuff).
Tasks are supposed to be scheduled by specifying a start time_point, or a delay (converted to a time_point by adding it to now()). An optional duration specifies repeat intervals for the task (if it's non-zero).
It should be possible to de-schedule tasks, preventing them from being started for execution from then on. (Already running tasks won't be stopped, to keep things a bit simpler, and also because I couldn't figure out a clean way to do it anyway.)
I've never done anything with multithreading of this scale/complexity, and in case my brain never recovers from repeatedly being torn into 5 or more threads, I'd like to get some review/feedback from others. Specifically, race conditions/deadlocks/other threading-unpleasantness I didn't spot, lifetime issues, or really anything problematic.
Some simple code at the very bottom demonstrates how it's meant to be used. It seemed to work when compiled with clang 3.3 and libc++.
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
namespace scheduling {
template <class Clock>
class Scheduler {
typedef Clock clock_type;
typedef typename clock_type::time_point time_point;
typedef typename clock_type::duration duration;
typedef std::function<void()> task_type;
private:
struct Task {
public:
Task (task_type&& task, const time_point& start, const duration& repeat) : task(std::move(task)), start(start), repeat(repeat) { }
task_type task;
time_point start;
duration repeat;
bool operator<(const Task& other) const {
return start < other.start;
}
};
public:
typedef typename std::list<Task>::iterator task_handle;
private:
std::mutex mutex;
std::condition_variable tasks_updated;
std::deque<task_handle> todo;
std::condition_variable modified;
bool running;
std::list<Task> tasks;
std::list<task_handle> handles;
std::vector<std::thread> threads;
public:
Scheduler() : threads(4) {
}
~Scheduler() {
halt();
}
task_handle schedule(task_type&& task, const time_point& start, const duration& repeat=duration::zero()) {
task_handle h;
{
std::lock_guard<std::mutex> lk(mutex);
h = tasks.emplace(tasks.end(), std::move(task), start, repeat);
handles.push_back(h);
}
tasks_updated.notify_all();
return h;
}
task_handle schedule(task_type&& task, const duration& delay=duration::zero(), const duration& repeat=duration::zero()) {
return schedule(std::move(task), clock_type::now()+delay, repeat);
}
void unschedule(const task_handle& handle) {
{
std::lock_guard<std::mutex> lk(mutex);
auto handle_it = std::find(handles.begin(), handles.end(), handle);
if (handle_it != handles.end()) {
tasks.erase(handle);
todo.remove(handle);
handles.erase(handle_it);
}
}
tasks_updated.notify_all();
}
void clear() {
{
std::lock_guard<std::mutex> lk(mutex);
tasks.clear();
handles.clear();
}
tasks_updated.notify_all();
}
void run() {
{
std::lock_guard<std::mutex> lk(mutex);
if (running) return;
running = true;
for (auto& t : threads) {
t = std::thread([this]{this->loop();});
}
}
while (true) {
std::unique_lock<std::mutex> lk(mutex);
if (!running) break;
auto task_it = min_element(tasks.begin(), tasks.end());
time_point next_task = task_it == tasks.end() ? clock_type::time_point::max() : task_it->start;
if (tasks_updated.wait_until(lk, next_task) == std::cv_status::timeout) {
if (task_it->repeat != clock_type::duration::zero()) {
task_it->start += task_it->repeat;
}
else {
handles.remove(task_it);
tasks.erase(task_it);
}
todo.push_back(task_it);
modified.notify_all();
}
}
for (auto& t : threads) {
t.join();
}
}
void halt() {
{
std::lock_guard<std::mutex> lk(mutex);
if (!running) return;
running = false;
}
tasks_updated.notify_all();
modified.notify_all();
}
private:
void loop() {
while (true) {
std::function<void()> f;
{
std::unique_lock<std::mutex> lk(mutex);
while (todo.empty() && running) {
modified.wait(lk);
}
if (!running) {
return;
}
f = todo.front()->task;
todo.pop_front();
}
f();
}
}
};
}
#include <iostream>
void outp(const std::string& outp) {
static std::mutex m;
std::lock_guard<std::mutex> lk(m);
std::cout << std::this_thread::get_id() << ": " << outp << std::endl;
}
int main(int argc, char* argv[]) {
scheduling::Scheduler<std::chrono::steady_clock> sched;
sched.schedule([&sched]{outp("Task 1");}, std::chrono::steady_clock::now());
sched.schedule([&sched]{outp("Task 2");}, std::chrono::steady_clock::now()+std::chrono::seconds(2), std::chrono::seconds(2));
sched.schedule([&sched]{outp("Task 3");}, std::chrono::steady_clock::now()+std::chrono::seconds(2), std::chrono::seconds(2));
sched.schedule([&sched]{outp("Task 4");}, std::chrono::steady_clock::now()+std::chrono::seconds(2), std::chrono::seconds(2));
sched.schedule([&sched]{outp("Task 5");}, std::chrono::steady_clock::now()+std::chrono::seconds(2), std::chrono::seconds(2));
sched.schedule([&sched]{outp("Task 6");}, std::chrono::steady_clock::now()+std::chrono::seconds(3));
sched.schedule([&sched]{outp("Task 7");}, std::chrono::steady_clock::now()+std::chrono::seconds(3));
sched.schedule([&sched]{outp("Task 8");}, std::chrono::steady_clock::now()+std::chrono::seconds(3));
sched.schedule([&sched]{outp("Task 9");}, std::chrono::steady_clock::now()+std::chrono::seconds(3));
sched.schedule([&sched]{outp("Task 10"); sched.halt(); }, std::chrono::steady_clock::now()+std::chrono::seconds(5));
sched.run();
}
1 Answer
It looks/sounds like you're trying to build a sort of thread pool. If that's the case, take a look at this StackExchange link for some help on what it is (a thread design pattern). For your purposes you essentially have a worker queue (a single threaded thread pool), where the only difference is (as you are already doing) you use a mutex object to block the 'running' of the current scheduled task instead of a semaphore object.
Just some notes on your code that I could see:
Line 35, you have std::deque<task_handle> todo while a task_handle is defined as a std::list<Task>::iterator. If you're using a deque for performance reasons, consider switching all container types to a deque, as I didn't see any code that inherently 'needs' a list (efficiency of removing/adding in the middle of the list vs. at begin/end).
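One point that may be worth weighing before switching containers: a task_handle is an iterator that callers hold on to, and std::list keeps iterators to the remaining elements valid across insertions and erasures, while std::deque invalidates all iterators on push_back. A minimal sketch of that difference, using a hypothetical helper that is not part of the posted code:

```cpp
#include <iterator>
#include <list>
#include <string>

// Hypothetical helper (illustration only): keeps an iterator as a handle,
// modifies the container elsewhere, then reads through the handle again.
std::string read_handle_after_changes() {
    std::list<std::string> tasks{"a", "b", "c"};
    auto handle_b = std::next(tasks.begin());  // handle to "b"
    tasks.erase(tasks.begin());                // erase a different element
    tasks.push_back("d");                      // insert elsewhere
    // std::list guarantees handle_b is still valid here. With std::deque,
    // the push_back alone would have invalidated every iterator.
    return *handle_b;
}
```

If the handles were changed to something other than iterators (an integer id, say), this constraint would go away, but that would be a different design from the one posted.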
There's also a lot of extra 'scope/control' braces to help 'control' the flow of the mutexes (i.e. Java way of doing mutex locks) via an 'auto mutex locker' object. I get the need for the control braces, but what's happening under the hood is more complex:
void clear() {
{ // <- stack enters new location
std::lock_guard<std::mutex> lk(mutex); // stack consumes memory for 'auto' object, then mutex.lock is called and control returned
tasks.clear();
handles.clear();
} // stack destroys 'auto lock' object which then calls mutex.unlock
tasks_updated.notify_all();
}
Consider the following instead:
void clear() { // no extra stack (aside from normal operation)
mutex.lock();
tasks.clear();
handles.clear();
mutex.unlock();
tasks_updated.notify_all();
} // no extra stack calls and actually 1 less line of code
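One thing to weigh when choosing between the two versions is what happens if code between lock and unlock throws. In clear() as posted this is unlikely, but the general behaviour can be sketched as follows. CountingMutex, risky() and held_after() are hypothetical names made up for this illustration; only std::mutex and std::lock_guard come from the standard library.

```cpp
#include <mutex>
#include <stdexcept>

// Hypothetical wrapper that counts how often it is currently held.
struct CountingMutex {
    std::mutex inner;
    int held = 0;
    void lock()   { inner.lock(); ++held; }
    void unlock() { --held; inner.unlock(); }
};

// Stand-in for any operation that might throw while the lock is held.
void risky() { throw std::runtime_error("simulated failure"); }

void clear_manual(CountingMutex& m) {
    m.lock();
    risky();
    m.unlock();  // skipped when risky() throws
}

void clear_guarded(CountingMutex& m) {
    std::lock_guard<CountingMutex> lk(m);
    risky();
}                // the guard unlocks here, also during stack unwinding

// Runs fn, swallows the simulated exception, and reports whether the mutex
// was left locked afterwards (1) or released (0).
int held_after(void (*fn)(CountingMutex&)) {
    CountingMutex m;
    try { fn(m); } catch (const std::runtime_error&) {}
    int held = m.held;
    if (held > 0) m.unlock();  // clean up so the mutex is not destroyed locked
    return held;
}
```

Under these assumptions, held_after(clear_guarded) reports 0 and held_after(clear_manual) reports 1, which is the situation the scoped guard is meant to prevent.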
Line 103: time_point next_task = task_it == tasks.end() ? clock_type::time_point::max() : task_it->start;
While technically correct it could potentially lead to some confusion down the road if this ever produces a bug; it is your code so as long as you can read it it's all good.
Your private loop function is an infinite loop:
void loop() {
while (true) {
std::unique_lock<std::mutex> lk(mutex);
while (todo.empty() && running) {
modified.wait(lk);
}
if (!running) {
return;
}
f = todo.front()->task;
todo.pop_front();
}
}
Consider this instead (just replaced while (true) with while (running)):
void loop() {
while (running) {
std::unique_lock<std::mutex> lk(mutex);
while (todo.empty() && running) {
modified.wait(lk);
}
if (!running) {
return;
}
f = todo.front()->task;
todo.pop_front();
}
}
Also, why is Scheduler templated on the clock type instead of just being explicit, or deriving some classes from a scheduler that has a specific clock type?
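One possible argument for the template parameter, not one the author states, is that code written against a Clock type can be exercised with a controllable clock in tests. The sketch below assumes a hypothetical ManualClock and a hypothetical is_due() helper; neither appears in the posted code.

```cpp
#include <chrono>

// Hypothetical test clock: time only moves when advance() is called.
struct ManualClock {
    typedef std::chrono::milliseconds duration;
    typedef duration::rep rep;
    typedef duration::period period;
    typedef std::chrono::time_point<ManualClock, duration> time_point;
    static const bool is_steady = true;

    static time_point now() { return current(); }
    static void advance(duration d) { current() += d; }

private:
    static time_point& current() {
        static time_point t;  // starts at the clock's epoch
        return t;
    }
};

// Hypothetical helper written against any Clock, the way Scheduler<Clock> is.
template <class Clock>
bool is_due(typename Clock::time_point start) {
    return Clock::now() >= start;
}
```

The same is_due() compiles for std::chrono::steady_clock. Note that this only covers the time comparison: a condition variable waiting on a deadline would not wake up just because a manual clock was advanced.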
You also have task_type declared as a std::function<void()>, which means a function that returns void and takes no arguments, yet you're passing in functions that take a std::string as an argument (this is only in your demo code). While technically it might work, you're also possibly smashing the stack when you do this (a big no-no).
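For reference, the demo passes lambdas rather than outp itself. A lambda written as []{ ... } takes no parameters and returns void, so it matches std::function<void()>; the string is supplied inside the lambda body. A minimal sketch, where log_lines and make_task() are hypothetical stand-ins for the demo's std::cout output and scheduling call:

```cpp
#include <functional>
#include <string>
#include <vector>

std::vector<std::string> log_lines;  // hypothetical stand-in for std::cout

void outp(const std::string& s) { log_lines.push_back(s); }

// The lambda's call signature is void(); outp's std::string parameter never
// becomes part of the std::function's signature.
std::function<void()> make_task() {
    return [] { outp("Task 1"); };
}
```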
You also have a struct stuffed in a class (your Task structure) that looks like it's only used specifically for the Scheduler class. Instead, consider just making the member variables in your struct private member variables of your Scheduler class.
One last note: it's usually considered 'best' practice to group your member scope accessors (public/private/protected), instead of spreading the keywords where needed.
I can refactor the code and post what I see if you would like, otherwise I hope this all helps.
Thanks for your review, and merry christmas! I do use 4 threads (:44). Good tip with list/deque - I'll be changing that. The scopes were for exceptions. (possibly in the future; I'll leave it this way, but I acknowledge your concerns ;P) Infinite loop because running should be accessed under a lock. Template because subclasses are more complex than I thought I could ever need. I am not passing void(std::string)'s. Scheduler uses multiple instances of Task, it's used like a named tuple. I'll reorder public/private. Sorry this is so short and brutal, 600 character limit and all... - tobyp, Dec 26, 2013 at 22:51
std::function is not free and its type erasure mechanism comes at a run-time cost. Perhaps it's worth considering using templates with perfect forwarding to accept anything callable, similarly to how std::async does it (see how it takes Function): en.cppreference.com/w/cpp/thread/async
[…] boost::packaged_task (e.g., "task constructor") for a more complete example: boost.org/doc/libs/release/doc/html/thread/… Each lambda function is of a distinct "closure type" ("a unique unnamed non-union non-aggregate type", typically implemented as a function object): en.cppreference.com/w/cpp/language/lambda
[…] <string> and <algorithm> without including those headers.