I need a task scheduler driven by a directed graph. The tasks are held in a std::vector<task_type>, while the dependency graph is held in an adjacency_list<vecS, vecS, bidirectionalS> (bidirectionalS so that I have access to the in_degree() function). A single dispatch starts the tasks. Are there obvious improvements here?
The driver below sets up a sample dependency graph: task 0 precedes tasks 1 to 3, task 1 precedes tasks 4 to 6, and task 2 precedes tasks 7 to 9.
scheduler_driver.hpp:
#ifndef __SCHEDULER_DRIVER_HPP__
#define __SCHEDULER_DRIVER_HPP__
#include <iostream>
#include <ostream>
#include <iterator>
#include <vector>
#include <chrono>
#include "scheduler.h"
#endif
scheduler_driver.cpp:
#include "scheduler_driver.hpp"
enum task_nodes
{
task_0,
task_1,
task_2,
task_3,
task_4,
task_5,
task_6,
task_7,
task_8,
task_9,
N
};
int basic_task(int a, int d)
{
std::chrono::milliseconds sleepDuration(d);
std::this_thread::sleep_for(sleepDuration);
std::cout << "Result: " << a << "\n";
return a;
}
using namespace SCHEDULER;
int main(int argc, char **argv)
{
typedef int R;
typedef std::function<R()> F;
Graph deps(N);
boost::add_edge(task_0, task_1, deps);
boost::add_edge(task_0, task_2, deps);
boost::add_edge(task_0, task_3, deps);
boost::add_edge(task_1, task_4, deps);
boost::add_edge(task_1, task_5, deps);
boost::add_edge(task_1, task_6, deps);
boost::add_edge(task_2, task_7, deps);
boost::add_edge(task_2, task_8, deps);
boost::add_edge(task_2, task_9, deps);
std::vector<F> tasks =
{
std::bind(basic_task, 0, 1000),
std::bind(basic_task, 1, 1000),
std::bind(basic_task, 2, 1000),
std::bind(basic_task, 3, 1000),
std::bind(basic_task, 4, 1000),
std::bind(basic_task, 5, 1000),
std::bind(basic_task, 6, 1000),
std::bind(basic_task, 7, 1000),
std::bind(basic_task, 8, 1000),
std::bind(basic_task, 9, 1000)
};
scheduler<R> *s = new scheduler<R>(std::move(deps), std::move(tasks));
s->doit();
return 0;
}
scheduler.h:
#ifndef __SCHEDULER2_H__
#define __SCHEDULER2_H__
#include <iostream>
#include <vector>
#include <iterator>
#include <functional>
#include <algorithm>
#include <list>
#include <mutex>
#include <thread>
#include <future>
#include <boost/graph/graph_traits.hpp>
#include <boost/graph/adjacency_list.hpp>
#include <boost/graph/depth_first_search.hpp>
#include <boost/graph/visitors.hpp>
using namespace boost;
namespace SCHEDULER
{
using Graph = adjacency_list<vecS, vecS, bidirectionalS>;
using Edge = graph_traits<Graph>::edge_descriptor;
using Vertex = graph_traits<Graph>::vertex_descriptor;
using VectexCont = std::vector<Vertex>;
using outIt = graph_traits<Graph>::out_edge_iterator;
using inIt = graph_traits<Graph>::in_edge_iterator;
template<typename R>
class scheduler
{
public:
using ret_type = R;
using fun_type = std::function<R()>;
using prom_type = std::promise<ret_type>;
using fut_type = std::shared_future<ret_type>;
scheduler() = default;
scheduler(const Graph &deps_, const std::vector<fun_type> &tasks_) :
g(deps_),
tasks(tasks_) { init_();}
scheduler(Graph&& deps_, std::vector<fun_type>&& tasks_) :
g(std::move(deps_)),
tasks(std::move(tasks_)) { init_(); }
scheduler(const scheduler&) = delete;
scheduler& operator=(const scheduler&) = delete;
void doit();
private:
void init_();
std::list<Vertex> get_sources(const Vertex& v);
auto task_thread(fun_type&& f, int i);
Graph g;
std::vector<fun_type> tasks;
std::vector<prom_type> prom;
std::vector<fut_type> fut;
std::vector<std::thread> th;
std::vector<std::list<Vertex>> sources;
};
template<typename R>
void
scheduler<R>::init_()
{
int num_tasks = tasks.size();
prom.resize(num_tasks);
fut.resize(num_tasks);
// Get the futures
for(size_t i=0;
i<num_tasks;
++i)
{
fut[i] = prom[i].get_future();
}
// Predetermine in_edges for faster traversal
sources.resize(num_tasks);
for(size_t i=0;
i<num_tasks;
++i)
{
sources[i] = get_sources(i);
}
}
template<typename R>
std::list<Vertex>
scheduler<R>::get_sources(const Vertex& v)
{
std::list<Vertex> r;
Vertex v1;
inIt j, j_end;
boost::tie(j,j_end) = in_edges(v, g);
for(;j != j_end;++j)
{
v1 = source(*j, g);
r.push_back(v1);
}
return r;
}
template<typename R>
auto
scheduler<R>::task_thread(fun_type&& f, int i)
{
auto j_beg = sources[i].begin(),
j_end = sources[i].end();
for(;
j_beg != j_end;
++j_beg)
{
R val = fut[*j_beg].get();
}
return std::thread([this](fun_type f, int i)
{
prom[i].set_value(f());
},f,i);
}
template<typename R>
void
scheduler<R>::doit()
{
size_t num_tasks = tasks.size();
th.resize(num_tasks);
for(int i=0;
i<num_tasks;
++i)
{
th[i] = task_thread(std::move(tasks[i]), i);
}
for_each(th.begin(), th.end(), mem_fn(&std::thread::join));
}
} // namespace SCHEDULER
#endif
Here are some things that may help you improve your code.
Be careful with signed and unsigned
In the doit routine and various others, the code compares an int i to a size_t num_tasks, but size_t is unsigned and int is signed. Instead, declare both variables as size_t types. Also, see the next suggestion.
Be careful with namespace assumptions
The C++ standard says that size_t is defined in the std namespace. It might also be defined as ::size_t (and often is), but that's not guaranteed. Better would be to use std::size_t explicitly.
Avoid using namespace in headers
The scheduler.h header includes this line:
using namespace boost;
This may seem convenient, but it means that any code that includes this header has now brought everything from the boost namespace into the global namespace, which is a recipe for annoying name clashes later. If, for example, we used both your header and code like this:
#include "scheduler_driver.hpp"
#include <array>
#include <boost/array.hpp>
void clash() {
using namespace std;
array<array<int, 5>, 5> m;
}
it would fail to compile, because the unqualified name array is ambiguous between std::array and boost::array. In this artificial example it's easy to spot, but in real code it's a headache that will have a future programmer cursing your name. It's easy to fix: just remove that line and add the boost:: namespace prefix explicitly where needed.
Avoid data races
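The basic_task function writes to std::cout, but that's a single resource that might be used by several threads at once. Since C++11, concurrent output to std::cout is not formally a data race as long as the stream is synchronized with stdio (the default), but characters from different threads can still be interleaved. One fix for this is to use a std::mutex like this: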
#include <iostream>
#include <mutex>

static std::mutex cout_mutex;

// wherever cout is used:
void some_function() {
    std::lock_guard<std::mutex> lock(cout_mutex);
    std::cout << "Now we can do this safely!\n";
}
Note that std::lock_guard is intended to be used via RAII, so the lock is obtained when the object is created and released when it's destroyed, making it easy to avoid the bug of forgetting to unlock the mutex.
I know this is just test code, but if the actual function called within your code uses shared resources, you should apply the same technique.
Consider exceptions
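If valid() is false when std::future::get() is called, the behavior is undefined. The more realistic concern here is a task that throws: the exception escapes the thread's lambda, std::terminate is called, and the matching promise is never satisfied, so nothing ever reaches the tasks waiting on it. One remedy is to catch the exception inside the thread and store it with std::promise::set_exception, so that get() rethrows it in whichever thread is waiting. That's not going to happen with this toy code, but if you're going to apply this scheduler to real tasks, it should be taken into account.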
Consider an alternative representation
The graph-based mechanism you currently have seems to work, but I wonder if it might be made simpler. In particular, one thing that comes to mind is the use of a std::priority_queue in which the priority is simply the depth of that node in the dependency tree. The drawback is that none of the leaf tasks would be executed until all of the higher-priority tasks had been executed, but the priority queue has the benefit of constant-time access to the next task via top() (push and pop are logarithmic). That might not make a big difference with this code (yours is linear), but it might be worth considering if you have a very large number of tasks.
"Don't throw away your future"
OK, that sounds like it should be fatherly advice on life, but it's actually a lot more minor than that. Within the task_thread() routine, the value returned by the future's get() is assigned to val but never used.