I would like to make a portable thread pool using boost::thread. Someone might tell me to look at boost::thread_group and boost::asio. I know about them, but I am not really familiar with them, so I don't know whether they are suitable for my purpose.
Please give me comments on what I am proposing to do here.
JobItem and JobQueue:
class JobItem
{
private:
uint32_t job_type_; //!< Job type
char* job_ptr_; //!< Pointer to job content
// ...
};
class JobQueue
{
private:
queue<JobItem*> job_queue_;
public:
// Definitely I need to use mutex and condition variable here
JobItem* pick();
void push(JobItem* job_item);
};
JobWorker (worker thread):
class JobWorker
{
private:
JobQueue* shared_job_queue_;
boost::thread the_thread_;
public:
JobWorker(JobQueue* shared_queue) : shared_job_queue_(shared_queue);
// exec_job will be overridden by concrete subclasses
virtual void exec_job(JobItem* job_item) = 0;
void run()
{
while(true)
{
JobItem* job = shared_job_queue_->pick();
exec_job(job);
delete job;
}
}
void start()
{
the_thread_ = boost::thread(boost::bind(&JobWorker::run, this));
}
void stop()
{
the_thread_.join();
}
};
Concrete JobWorker subclasses:
class Task_1_Worker : public JobWorker
{
// override exec_job
void exec_job(JobItem* job)
{
// Process job item here
}
}
class Task_2_Worker : public JobWorker
{
// override exec_job
void exec_job(JobItem* job)
{
// Process job item here
}
}
Now, WorkerPool with template:
template <class JobWorkerT>
class WorkerPool
{
private:
JobQueue shared_job_queue_;
list<JobWorkerT> job_workers_;
public:
WorkerPool(const int num_threads);
~WorkerPool(void) {};
void push(JobItem* job)
{
shared_job_queue_.push(job);
}
void start();
void stop();
};
template <class JobWorkerT>
WorkerPool<JobWorkerT>::WorkerPool(const int num_threads)
{
for(int i=0; i<num_threads; i++)
{
JobWorkerT job_worker(&shared_job_queue_);
job_workers_.push_back(job_worker);
}
}
template <class JobWorkerT>
void WorkerPool<JobWorkerT>::start()
{
for(std::list<JobWorkerT>::iterator iter = job_workers_.begin(); iter != job_workers_.end(); iter++)
{
((JobWorkerT)(*iter)).start();
}
}
template <class JobWorkerT>
void WorkerPool<JobWorkerT>::stop()
{
for(std::list<JobWorkerT>::iterator iter = job_workers_.begin(); iter != job_workers_.end(); iter++)
{
((JobWorkerT)(*iter)).stop();
}
}
main()
int main()
{
WorkerPool<Task_1_Worker>* task1_grp = new WorkerPool<Task_1_Worker>();
WorkerPool<Task_2_Worker>* task2_grp = new WorkerPool<Task_2_Worker>();
task1_grp->start();
task2_grp->start();
// Producers should call task1_grp.push(JobItem*) or task2_grp.push(JobItem*) to dispatch new jobs to thread pools
}
1 Answer
Design
I'm not sure why you have a virtual function on the worker:
virtual void exec_job(JobItem* job_item) = 0;
The worker is simple: it picks up jobs and then does them. What the job is defines what work should be done, so I would have put the virtual function that defines the work on the JobItem.
The run() function of the worker is then simply:
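void run()
{
    // You want some way for the thread to eventually exit.
    // You can make that happen by letting the queue return
    // a null pointer when the queue is being shut down.
    JobItem* job;
    while ((job = shared_job_queue_->pick()) != nullptr)
    {
        // `do` is a reserved word in C++, so the virtual function
        // on JobItem needs another name, such as execute().
        job->execute();
        delete job;
    }
}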
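To make the shutdown idea concrete, here is a minimal sketch of a queue whose pick() returns a null pointer once it has been shut down and drained, together with a job type that carries its own virtual function. It uses the C++11 standard library; with Boost, boost::mutex and boost::condition_variable would play the same roles. The names execute(), shutdown(), CountingJob and run_demo() are assumptions made for illustration and are not part of the original code.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical job interface: each job knows how to run itself.
class JobItem
{
public:
    virtual ~JobItem() {}
    virtual void execute() = 0;
};

// Hypothetical blocking queue. After shutdown(), pick() hands out the
// remaining jobs and then returns a null pointer.
class JobQueue
{
public:
    ~JobQueue()
    {
        // Delete jobs that were pushed but never picked up.
        while (!job_queue_.empty())
        {
            delete job_queue_.front();
            job_queue_.pop();
        }
    }

    void push(JobItem* job_item)
    {
        assert(job_item != nullptr);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            job_queue_.push(job_item);
        }
        job_ready_.notify_one();
    }

    // Blocks until a job is available or the queue has been shut down.
    JobItem* pick()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (job_queue_.empty() && !shutting_down_)
        {
            job_ready_.wait(lock);
        }
        if (job_queue_.empty())
        {
            return nullptr; // shut down and nothing left to do
        }
        JobItem* job = job_queue_.front();
        job_queue_.pop();
        return job;
    }

    void shutdown()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutting_down_ = true;
        }
        job_ready_.notify_all(); // wake every waiting worker
    }

private:
    std::mutex mutex_;
    std::condition_variable job_ready_;
    std::queue<JobItem*> job_queue_;
    bool shutting_down_ = false;
};

// Hypothetical job that counts how often it was executed.
class CountingJob : public JobItem
{
public:
    explicit CountingJob(int& counter) : counter_(counter) {}
    void execute() override { ++counter_; }
private:
    int& counter_;
};

// Single-threaded demonstration of the worker loop: queue three jobs,
// request shutdown, then drain the queue until pick() returns null.
int run_demo()
{
    JobQueue queue;
    int executed = 0;
    for (int i = 0; i < 3; ++i)
    {
        queue.push(new CountingJob(executed));
    }
    queue.shutdown();
    while (JobItem* job = queue.pick())
    {
        job->execute();
        delete job;
    }
    return executed;
}
```

In a real pool each worker thread would run the loop at the end of run_demo(), and the pool's stop() would call shutdown() before joining the threads.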
Code Review
Prefer not to use pointers.
Raw pointers carry no ownership semantics, so we don't know who is responsible for deleting them. Use a type that conveys ownership so that the person reading the code understands who owns what.
JobQueue* shared_job_queue_;
You always need a job queue, so this can never be nullptr. Rather than a pointer, a better choice would have been a reference. A reference also implies that there is no ownership of the object, so you are not supposed to delete it.
class JobWorker
{
private:
JobQueue& shared_job_queue_;
public:
JobWorker(JobQueue& shared_queue)
: shared_job_queue_(shared_queue)
{}
The same applies to JobQueue. You push and pop raw pointers, so there is no concept of ownership.
class JobQueue
{
private:
queue<JobItem*> job_queue_;
public:
// Definitely I need to use mutex and condition variable here
JobItem* pick();
void push(JobItem* job_item);
};
As a user of this class I could easily pass, by accident, the address of a job object that is actually an automatic variable. Unless I read the code it is not obvious that this will blow up. Even worse, the code may not blow up for a while, since the pointer is not deleted until after the work has been done.
In this case I would use a smart pointer. The obvious one is std::unique_ptr. When you use it as a parameter type, you are indicating that you accept ownership of the object.
class JobQueue
{
private:
// You are maintaining a queue of owned pointers.
// You will take responsibility for deleting them when this
// object is destroyed.
queue<std::unique_ptr<JobItem>> job_queue_;
public:
// This method returns a pointer.
// More importantly it is returning ownership of the pointer to
// you uniquely so you are the sole owner of the pointer and the
// JobQueue is no longer retaining any pointers to this object.
std::unique_ptr<JobItem> pick();
// This method accepts a unique_ptr.
// Because it accepts the unique_ptr by value, you are losing
// ownership. Once this method is called you no longer own
// the object; the unique_ptr you passed will be empty.
void push(std::unique_ptr<JobItem> job_item);
};
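As a rough sketch of how that interface might be implemented and called, the following leaves out the mutex and condition variable to keep the focus on ownership. FlagJob, ownership_demo() and the execute() member are hypothetical names introduced only for this illustration.

```cpp
#include <cassert>
#include <memory>
#include <queue>
#include <utility>

class JobItem
{
public:
    virtual ~JobItem() {}
    virtual void execute() = 0; // hypothetical name for the job's work
};

// Ownership-only sketch: locking is deliberately omitted here.
class JobQueue
{
public:
    // Takes ownership of the job.
    void push(std::unique_ptr<JobItem> job_item)
    {
        assert(job_item != nullptr);
        job_queue_.push(std::move(job_item));
    }

    // Hands ownership to the caller; returns an empty pointer if the
    // queue has no jobs.
    std::unique_ptr<JobItem> pick()
    {
        if (job_queue_.empty())
        {
            return nullptr;
        }
        std::unique_ptr<JobItem> job = std::move(job_queue_.front());
        job_queue_.pop();
        return job;
    }

private:
    std::queue<std::unique_ptr<JobItem>> job_queue_;
};

// Hypothetical job that records that it ran.
class FlagJob : public JobItem
{
public:
    explicit FlagJob(bool& done) : done_(done) {}
    void execute() override { done_ = true; }
private:
    bool& done_;
};

// Returns true if the caller's pointer was emptied by push() and the
// job picked from the queue ran.
bool ownership_demo()
{
    JobQueue queue;
    bool done = false;
    std::unique_ptr<JobItem> job(new FlagJob(done));

    queue.push(std::move(job)); // ownership moves into the queue
    bool caller_gave_up_ownership = (job == nullptr);

    std::unique_ptr<JobItem> picked = queue.pick();
    if (picked)
    {
        picked->execute();
    } // picked deletes the job when it goes out of scope
    return caller_gave_up_ownership && done;
}
```

With this signature, passing the address of an automatic variable, as in queue.push(&local_job), no longer compiles, because a raw pointer does not convert implicitly to a std::unique_ptr.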
Use modern C++
It's 2016. C++11 has been out for five years and C++14 for two. Both have std::thread and the associated concurrency facilities.
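For comparison with the boost::thread and boost::bind combination in the question, here is a small sketch of starting and joining workers with std::thread and a lambda. The function run_on_threads() and its parameters are made up for illustration; a real pool would have each thread run the worker loop instead of incrementing a counter.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Starts num_threads workers, lets each one bump a shared counter
// increments_per_thread times, joins them all, and returns the total.
int run_on_threads(int num_threads, int increments_per_thread)
{
    assert(num_threads >= 0 && increments_per_thread >= 0);

    std::atomic<int> counter(0);
    std::vector<std::thread> workers;
    for (int i = 0; i < num_threads; ++i)
    {
        // The lambda replaces boost::bind(&JobWorker::run, this).
        workers.emplace_back([&counter, increments_per_thread]
        {
            for (int n = 0; n < increments_per_thread; ++n)
            {
                ++counter; // atomic increment, no mutex needed
            }
        });
    }
    for (std::thread& worker : workers)
    {
        worker.join(); // every thread must be joined before destruction
    }
    return counter.load();
}
```

On some toolchains this needs a threading flag at link time (for example -pthread with GCC).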
- Actually, in this design, JobItem is just a piece of data. Based on the job type, exec_job manipulates the data appropriately. Also, due to licensing, I am using VC++ 2008, so there is no way for me to target C++11 or later. I think the Boost libraries might help. – duong_dajgja, Mar 12, 2016 at 5:12
- I am saying that JobItem is a better place to put your virtual function. The job should know how to execute itself. This is the open/closed principle. As it stands, adding new functionality requires a different worker type, which means recompiling the application. By making JobItem have the virtual function, a new library can add a different kind of work without rebuilding the worker type. – Loki Astari, Mar 12, 2016 at 12:48
- After thinking a lot about virtual JobItem::execute(), I decided to go the way you pointed out. Thank you very much! – duong_dajgja, Mar 17, 2016 at 6:29