I'm not sure if this site has the right audience for anyone to respond to this, but I wrote a header-only scheduler which schedules a task to be run in the future on another thread:
#pragma once
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <chrono>
#include <vector>
#include <memory>
#include <algorithm>
#include <type_traits>
class Scheduler
{
public:
Scheduler();
~Scheduler();
template<class F, class... Args>
std::future<typename std::result_of<typename std::decay<F>::type(typename std::decay<Args>::type...)>::type>
schedule(F&& f, Args&&... args);
template<class F, class... Args>
std::future<typename std::result_of<typename std::decay<F>::type(typename std::decay<Args>::type...)>::type>
schedule_after(const std::chrono::steady_clock::duration& d, F&& f, Args&&... args);
template<class F, class... Args>
std::future<typename std::result_of<typename std::decay<F>::type(typename std::decay<Args>::type...)>::type>
schedule_at(const std::chrono::steady_clock::time_point& t, F&& f, Args&&... args);
void clear();
private:
struct Task
{
std::chrono::steady_clock::time_point time;
std::function<void()> func;
};
struct TaskComparer
{
bool operator()(const Task& left, const Task& right) const { return right.time < left.time; }
};
std::vector<Task> mTasks;
std::mutex mMutex;
std::condition_variable mCv;
bool mExit;
std::thread mThread;
};
inline Scheduler::Scheduler()
: mExit{false}
, mThread{[&]
{
std::unique_lock<std::mutex> lock{mMutex};
for(;;)
{
auto time = mTasks.empty() ? std::chrono::steady_clock::time_point::max() : mTasks.front().time;
if(mCv.wait_until(lock, time, [&]{ return mExit || (!mTasks.empty() && mTasks.front().time != time); }))
{
if(mExit)
break;
}
else if(!mTasks.empty())
{
std::pop_heap(mTasks.begin(), mTasks.end(), TaskComparer{});
auto task = std::move(mTasks.back());
mTasks.pop_back();
lock.unlock();
task.func();
lock.lock();
}
}
}}
{}
inline Scheduler::~Scheduler()
{
{
std::lock_guard<std::mutex> lock{mMutex};
mExit = true;
}
mCv.notify_one();
mThread.join();
}
template<class F, class... Args>
std::future<typename std::result_of<typename std::decay<F>::type(typename std::decay<Args>::type...)>::type>
Scheduler::schedule(F&& f, Args&&... args)
{
return schedule_at(std::chrono::steady_clock::now(), std::forward<F>(f), std::forward<Args>(args)...);
}
template<class F, class... Args>
std::future<typename std::result_of<typename std::decay<F>::type(typename std::decay<Args>::type...)>::type>
Scheduler::schedule_after(const std::chrono::steady_clock::duration& d, F&& f, Args&&... args)
{
return schedule_at(std::chrono::steady_clock::now() + d, std::forward<F>(f), std::forward<Args>(args)...);
}
template<class F, class... Args>
std::future<typename std::result_of<typename std::decay<F>::type(typename std::decay<Args>::type...)>::type>
Scheduler::schedule_at(const std::chrono::steady_clock::time_point& t, F&& f, Args&&... args)
{
auto func = std::make_shared<std::packaged_task<typename std::result_of<typename std::decay<F>::type(typename std::decay<Args>::type...)>::type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
auto future = func->get_future();
{
std::lock_guard<std::mutex> lock{mMutex};
mTasks.push_back(Task{t, [=, func = std::move(func)]{ (*func)(); }});
std::push_heap(mTasks.begin(), mTasks.end(), TaskComparer{});
}
mCv.notify_one();
return future;
}
inline void Scheduler::clear()
{
std::lock_guard<std::mutex> lock{mMutex};
mTasks.clear();
}
Questions:
- Is there any way to implement `finish` (block until pending tasks are done) without adding members/overhead? The user can do this now by storing all the returned `future`s and `wait`ing for them, but is that too burdensome?
- Is there a way to fix or improve the un-optimal/funky `bind`/`packaged_task`/`shared_ptr`/`lambda`/`function` setup for storing tasks without adding a whole lot of code? (Really, I want a `std::unique_function`, but it doesn't exist. :( ) (Test before recommending anything, because I wound up with this solution after clang crashed on me with some other setups.)
- Should I take a `duration<Rep, Period>` or a `steady_clock::duration` (ditto for `time_point`)? Even if I hide the actual `duration` type behind type erasure, I still need access in order to do the compares for the heap. And how would you compare `time_point`s from different clocks anyway? If there is a way to take any kind of duration I would prefer it, as that's what `condition_variable` does.
- Should you be able to specify how many threads it uses? Is it legitimate to want multiple threads? If so, should there be a `run` function the user can run on their own threads instead of this owning a thread (and/or `run_some`, blah!)? I'm not into adding things because someone might want them someday; I'm really unsure if it's legitimate to want this to use multiple threads, at least at this level. For example, you could add tasks which just call `std::async` with another task to get the same effect.
- All the usual code review questions (general improvements, bug fixes, etc.)
1 Answer
In the constructor:
, mThread{[&]
{
std::unique_lock<std::mutex> lock{mMutex};
// LOTS MORE CODE.
}}
{}
It took me a moment to realize that this is a lambda running on a separate thread, not on the main thread. I think you can make this clearer by moving the code into a method or function, so we can see that it is being run by a different thread.
You are not providing a guarantee that your `mThread` is going to enter the loop before you start scheduling jobs on the thread; is this going to be a problem? If not, I would add an explicit comment to that effect.
Could use RAII here:
lock.unlock();
task.func();
lock.lock();
-
Agreed about the lambda. I can change it to `[&]{ run(); }` and move the code to a private `run` function. Concerning RAII, it's all RAII-safe there (and there is no given construct to help anyway). `lock` is a `unique_lock`. Note I am `unlock`ing and re-`lock`ing, not the other way around, which is the `lock_guard` case. – David, Jan 28, 2015 at 0:29
Concerning the guarantee: if you add a task before the loop gets started, it won't matter. Personally, I don't think it's important to call out all the random things that work unconditionally. If it did depend on some random timing (whether the thread entered the loop or you pushed a task in first), it would just be broken, IMO. – David, Jan 28, 2015 at 0:35
-
The `unlock_guard` is a commonly requested feature, already in Boost: svn.boost.org/trac/boost/ticket/1850. I think it is worth wrapping anything that takes this pattern. – Loki Astari, Jan 28, 2015 at 0:58
-
Well, there's probably a reason it didn't get into the standard. While you need `lock_guard` to manage exceptions safely, there is no potential unsafety in not locking when you leave the scope from a thrown exception. Not that I'm particularly against it, but I'm not bringing in Boost as a dependency for `unlock_guard`, and I'm not writing boilerplate for it in this class either. I suppose it would save one line of code, heh. – David, Jan 28, 2015 at 1:25