Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Gaaagaa/xthreadpool

Repository files navigation

C++11 —— 使用 thread 实现线程池

1. 引言

在新的 C++11 标准中,引入并发编程的一些基础组件:线程(thread)互斥锁(mutex)条件变量(condition_variable) 等,凭借这些,就足够我设计一个平台无关的 线程池 组件了。下面就详细介绍一下这个线程池组件。

2. 结构设计图

x_threadpool_t

需要特别说明的是,这个线程池组件,在增加了"存在关联性的任务对象顺序执行"的功能后,原本的任务队列就分成了两级任务队列,目的是为了降低 "任务提交""任务提取" 之间(属于一种生产/消费的关系)的锁竞争。

3. 源码说明

源码有点多,这里就不贴出来了,直接给下载地址:https://github.com/Gaaagaa/xthreadpool 。主要的线程池类 x_threadpool_txthreadpool.h 中已完整实现,在实际项目应用中,只需要 xthreadpool.h 这一个文件就足够了。

测试程序的编译命令:

  1. MSVC++2017:cl /EHsc main.cpp
  2. gcc :g++ -Wall -std=c++11 -lpthread -o main main.cpp

技术特点:

  1. 使用 C++11 的 thread 实现,可跨平台,亲测的编译器有 MSVC++2017、gcc 4.8.5、gcc 8.2.0;
  2. 支持传统的面向对象编程的任务对象类接口:继承抽象任务对象接口类,实现多态(可结合对象池的模式进行资源复用);
  3. 支持泛型接口的任务对象,如:C 函数接口、lambda 表达式、仿函数对象、类对象的成员函数调用;
  4. 支持动态调整工作线程的数量:通过 resize() 接口实现,该功能当前的实现方式还不够好,仍有优化的方式;
  5. 支持运行时的线程状态检测(判断当前工作线程是否需要退出);
  6. 存在关联性的任务对象可顺序执行(这一特点只针对一些特别的应用场景,后续示例中会展示)。

4. 使用说明与示例代码

4.1 启动与关闭

#include "xthreadpool.h"
#include <chrono>
#include <stdio.h>
int main(int argc, char * argv[])
{
 // 工作线程数量
 // 若为 0,将取 hardware_concurrency() 返回值的 2倍 + 1
 int nthreads = 4;
 // 线程池对象
 x_threadpool_t xht_pool;
 // 启动线程池
 if (!xht_pool.startup(nthreads))
 {
 printf("startup return false!\n");
 return -1;
 }
 //======================================
 // 提交任务对象
 // ......
 //======================================
 // 等待所有任务执行完成
 while (xht_pool.task_count() > 0)
 std::this_thread::sleep_for(std::chrono::milliseconds(1));
 // 关闭线程池
 xht_pool.shutdown();
 return 0;
}

4.2 提交"传统 C 回调函数"的任务对象

#include "xthreadpool.h"
#include <chrono>
#include <stdio.h>
void func_task(int task_id, int task_iter)
{
 int count = 1;
 do
 {
 printf("func_task[%d, %d] => count: %d\n", task_id, task_iter, count);
 std::this_thread::sleep_for(std::chrono::milliseconds(100));
 } while (count++ < 10);
}
int main(int argc, char * argv[])
{
 // 线程池对象
 x_threadpool_t xht_pool;
 // 启动线程池
 if (!xht_pool.startup(0))
 {
 printf("startup return false!\n");
 return -1;
 }
 //======================================
 // 提交任务对象 : C 函数接口的任务
 for (int iter = 0; iter < 100; iter += 10)
 {
 xht_pool.submit_task_ex(func_task, iter, iter * iter);
 }
 //======================================
 // 等待所有任务执行完成
 while (xht_pool.task_count() > 0)
 std::this_thread::sleep_for(std::chrono::milliseconds(1));
 // 关闭线程池
 xht_pool.shutdown();
 return 0;
}

4.3 提交"仿函数对象"的任务对象

#include "xthreadpool.h"
#include <chrono>
#include <stdio.h>
/**
 * @struct functor_task_A
 * @brief 仿函数模式的任务对象类。
 */
struct functor_task_A
{
 // constructor/destructor
public:
 functor_task_A(int xtask_id = 0) : m_xtask_id(xtask_id) { }
public:
 void operator()() const
 {
 int count = 1;
 do
 {
 printf("functor_task_A[%d] => count: %d\n", m_xtask_id, count);
 std::this_thread::sleep_for(std::chrono::milliseconds(100));
 } while (count++ < 10);
 }
 // data members
private:
 int m_xtask_id;
};
/**
 * @struct functor_task_B
 * @brief 仿函数模式的任务对象类。
 */
struct functor_task_B
{
 // constructor/destructor
public:
 functor_task_B(int xtask_id = 0) : m_xtask_id(xtask_id) { }
public:
 void operator()(int flag) const
 {
 int count = 1;
 do
 {
 printf("functor_task_B[%d, %d] => count: %d\n", m_xtask_id, flag, count);
 std::this_thread::sleep_for(std::chrono::milliseconds(100));
 } while (count++ < 10);
 }
 // data members
private:
 int m_xtask_id;
};
int main(int argc, char * argv[])
{
 // 线程池对象
 x_threadpool_t xht_pool;
 // 启动线程池
 if (!xht_pool.startup(0))
 {
 printf("startup return false!\n");
 return -1;
 }
 //======================================
 // 提交任务对象 : 仿函数对象
 for (int iter = 0; iter < 100; iter += 10)
 {
 xht_pool.submit_task_ex((functor_task_A(iter)));
 }
 for (int iter = 0; iter < 100; iter += 10)
 {
 xht_pool.submit_task_ex((functor_task_B(iter)), iter * iter / 2);
 }
 //======================================
 // 等待所有任务执行完成
 while (xht_pool.task_count() > 0)
 std::this_thread::sleep_for(std::chrono::milliseconds(1));
 // 关闭线程池
 xht_pool.shutdown();
 return 0;
}

4.4 提交"调用类对象的成员函数"的任务对象

#include "xthreadpool.h"
#include <chrono>
#include <stdio.h>
/**
 * @class memfunc_task
 * @brief 调用成员函数的任务对象类。
 */
class memfunc_task
{
 // constructor/destructor
public:
 memfunc_task(int xtask_id = 0) : m_xtask_id(xtask_id) { }
 // overrides
public:
 /**********************************************************/
 /**
 * @brief 任务对象执行流程的操作接口。
 */
 void memfunc(int task_iter)
 {
 int count = 1;
 do
 {
 printf("memfunc_task[%d, %d] => count: %d\n", m_xtask_id, task_iter, count);
 std::this_thread::sleep_for(std::chrono::milliseconds(100));
 } while (count++ < 10);
 }
 // data members
private:
 int m_xtask_id;
};
int main(int argc, char * argv[])
{
 // 线程池对象
 x_threadpool_t xht_pool;
 // 启动线程池
 if (!xht_pool.startup(0))
 {
 printf("startup return false!\n");
 return -1;
 }
 //======================================
 // 提交任务对象
 // 注意,这个栈区对象的生命期需要在线程池关闭前存活,
 // 至少要保证所提交的任务都执行完成时,才可结束该对象的生命期
 memfunc_task mftask(0);
 for (int iter = 0; iter < 100; iter += 10)
 {
 xht_pool.submit_task_ex(&memfunc_task::memfunc, &mftask, iter);
 }
 //======================================
 // 等待所有任务执行完成
 while (xht_pool.task_count() > 0)
 std::this_thread::sleep_for(std::chrono::milliseconds(1));
 // 关闭线程池
 xht_pool.shutdown();
 return 0;
}

4.5 提交"重载后的任务对象类"的任务对象

#include "xthreadpool.h"
#include <chrono>
#include <stdio.h>
/**
 * @class user_task
 * @brief 用户自定义的任务对象类。
 */
class user_task : public x_task_t
{
 // constructor/destructor
public:
 user_task(int xtask_id = 0) : m_xtask_id(xtask_id) { }
 // overrides
public:
 /**********************************************************/
 /**
 * @brief 任务对象执行流程的操作接口。
 */
 virtual void run(x_running_checker_t * xchecker_ptr) override
 {
 int count = 1;
 do
 {
 printf("[%d]user_task[%d] => count: %d\n", (int)xchecker_ptr->thread_index(), m_xtask_id, count);
 std::this_thread::sleep_for(std::chrono::milliseconds(100));
 } while ((count++ < 10) && xchecker_ptr->is_enable_running());
 }
 // data members
private:
 int m_xtask_id;
};
int main(int argc, char * argv[])
{
 // 线程池对象
 x_threadpool_t xht_pool;
 // 启动线程池
 if (!xht_pool.startup(0))
 {
 printf("startup return false!\n");
 return -1;
 }
 //======================================
 // 提交任务对象 : 重载的任务对象
 for (int iter = 0; iter < 100; iter += 10)
 {
 xht_pool.submit_task((x_task_ptr_t)(new user_task(iter)));
 }
 //======================================
 // 等待所有任务执行完成
 while (xht_pool.task_count() > 0)
 std::this_thread::sleep_for(std::chrono::milliseconds(1));
 // 关闭线程池
 xht_pool.shutdown();
 return 0;
}

4.6 提交"lambda表达式"的任务对象

#include "xthreadpool.h"
#include <chrono>
#include <stdio.h>
int main(int argc, char * argv[])
{
 // 线程池对象
 x_threadpool_t xht_pool;
 // 启动线程池
 if (!xht_pool.startup(0))
 {
 printf("startup return false!\n");
 return -1;
 }
 //======================================
 // 提交任务对象 : lambda 表达式
 for (int iter = 0; iter < 100; iter += 10)
 {
 xht_pool.submit_task_ex(
 [iter]() -> void
 {
 int task_id = iter;
 for (int jter = 0; jter < 10; ++jter)
 {
 printf("lambda A task id : %d -> %d\n", task_id, task_id + jter);
 std::this_thread::sleep_for(std::chrono::milliseconds(100));
 }
 });
 }
 for (int iter = 0; iter < 100; iter += 10)
 {
 xht_pool.submit_task_ex(
 [iter](const std::string & str_name) -> void
 {
 int task_id = iter;
 for (int jter = 0; jter < 10; ++jter)
 {
 printf("lambda B task id : %d -> %d name : %s\n", task_id, task_id + jter, str_name.c_str());
 std::this_thread::sleep_for(std::chrono::milliseconds(100));
 }
 }, std::string("Lambda B"));
 }
 //======================================
 // 等待所有任务执行完成
 while (xht_pool.task_count() > 0)
 std::this_thread::sleep_for(std::chrono::milliseconds(1));
 // 关闭线程池
 xht_pool.shutdown();
 return 0;
}

4.7 任务对象执行过程中检测工作线程是否可继续运行

x_threadpool_t 内部提供了一个类 x_running_checker_t 可检测当前工作线程是否可继续执行下去(或者说,是否要立即终止执行的任务流程)。提供此功能,主要目的是针对那些耗时长的任务对象(处于运行状态),在线程池关闭或者需要动态调整(减少)工作线程数量时,能够优雅地终止任务流程。参看如下代码是如何使用的:

#include "xthreadpool.h"
#include <chrono>
#include <stdio.h>
int main(int argc, char * argv[])
{
 // 线程池对象
 x_threadpool_t xht_pool;
 // 启动线程池
 if (!xht_pool.startup(0))
 {
 printf("startup return false!\n");
 return -1;
 }
 //======================================
 // 提交任务对象 : lambda 表达式(检测工作线程的退出标识)
 for (int iter = 0; iter < 100; iter += 10)
 {
 xht_pool.submit_task_ex(
 [iter](x_running_checker_t * xchecker_ptr) -> void
 {
 int task_id = iter;
 for (int jter = 0; (jter < 10) && xchecker_ptr->is_enable_running(); ++jter)
 {
 printf("rchecker[%d] lambda A task id : %d -> %d\n", (int)xchecker_ptr->thread_index(), task_id, task_id + jter);
 std::this_thread::sleep_for(std::chrono::milliseconds(100));
 }
 },
 x_running_checker_t::xholder());
 }
 //======================================
 // 等待所有任务执行完成
 while (xht_pool.task_count() > 0)
 std::this_thread::sleep_for(std::chrono::milliseconds(1));
 // 关闭线程池
 xht_pool.shutdown();
 return 0;
}

4.8 使某一批任务对象在线程池中可按提交次序而顺序执行

单看 "按提交次序而顺序执行" 这句话,可能不好理解,先看后面提到的这样应用场景。

A 对象产生(需要执行)的任务对象序列为[ A1, A2, ..., Am ],并按此顺序依次提交到线程池中;与此同时,B 对象产生(需要执行)的任务对象序列为[ B1, B2, ..., Bn ],也按此顺序依次提交到线程池中。而不管是 A 对象还是 B 对象,都期望按所提交顺序执行任务对象。这种情况下,在线程池中的任务队列可能有如下图所示的排列:

order

从本质上看,我们只需要保证 A(或 B) 对象产生的任务在同一时刻只能有一个工作线程执行,这就能保证其顺序性。 面对这种应用场景,我在设计任务对象的抽象基类 x_task_t 时,就为此做好了相关的扩展接口。先看下 x_task_t 代码中的两个接口:

struct x_task_t
{
 ......
 // extensible interfaces
public:
 ......
 /**********************************************************/
 /**
 * @brief 判断 任务对象 是否挂起。
 * @note 若任务对象处于挂起状态,工作线程提取任务时,则跳过该对象。
 */
 virtual bool is_suspend(void) const { return false; }
 /**********************************************************/
 /**
 * @brief 设置任务对象的运行标识。
 * 
 * @note
 * <pre>
 * 工作线程在提取到任务对象后,则立即调用 set_running_flag(true) 操作;
 * 执行 run() 操作返回后,又调用 set_running_flag(false) 操作。
 * </pre>
 */
 virtual void set_running_flag(bool xrunning_flag) { }
 ......
};

再看下 x_threadpool_t 工作线程 "提取任务对象" 以及 "执行任务对象" 的部分实现流程(注意任务对象 is_suspend() 和 set_running_flag() 两个接口被调用的地方):

class x_threadpool_t
{
 ......
 x_task_ptr_t get_task(void)
 {
 ......
 for (std::list< x_task_ptr_t >::iterator itlst = m_lst_run_tasks.begin();
 (itlst != m_lst_run_tasks.end()) && is_enable_get_task();
 ++itlst)
 {
 if ((nullptr == *itlst) || !(*itlst)->is_suspend())
 {
 xtask_ptr = *itlst;
 m_lst_run_tasks.erase(itlst);
 m_xst_lst_tasks.fetch_sub(1);
 break;
 }
 }
 if (nullptr != xtask_ptr)
 {
 xtask_ptr->set_running_flag(true);
 }
 return xtask_ptr;
 }
 ......
 void thread_run(size_t xthread_index)
 {
 ......
 xtask_ptr = get_task();
 if (nullptr == xtask_ptr)
 {
 if (get_lst_task_size() > 0)
 thread_yield(xcounter);
 continue;
 }
 if (xht_checker.is_enable_running())
 {
 xtask_ptr->run(&xht_checker);
 }
 // 执行完任务对象后,将任务对象转换为 非挂起状态,
 // 加锁进行操作,是为了与 get_task() 内的操作保持队列的同步
 {
 // 标识当前不可提取待执行的任务对象,迫使 get_task() 内部迅速解锁
 m_xst_get_task.fetch_add(1);
 m_lock_run_task.lock();
 xtask_ptr->set_running_flag(false);
 m_lock_run_task.unlock();
 m_xst_get_task.fetch_sub(1);
 }
 ......
 }
 ......
};

我们可通过重载 x_task_t 所提供的两个扩展接口:is_suspend() 和 set_running_flag() 进行状态切换(即在调用 set_running_flag() 时让相关任务切换 is_suspend() 挂起/非挂起 两种状态),就可实现顺序执行。

示例代码如下所示:

#include "xthreadpool.h"
#include <chrono>
#include <string>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
////////////////////////////////////////////////////////////////////////////////
class x_order_task_t;
/**
 * @class objectT
 * @brief 产生 x_order_task_t 任务对象序列的测试类。
 */
class objectT
{
 // constructor/destructor
public:
 objectT(const std::string & xstr_name)
 : m_xrunning_flag(false)
 , m_seqno_taskid(0)
 , m_xstr_name(xstr_name)
 {
 }
 // public interfaces
public:
 inline void set_running(bool xrunning_flag)
 {
 m_xrunning_flag = xrunning_flag;
 }
 inline bool is_running(void) const
 {
 return m_xrunning_flag;
 }
 inline const std::string & name(void) const
 {
 return m_xstr_name;
 }
 x_order_task_t * new_task(void);
 // data members
private:
 bool m_xrunning_flag;
 int m_seqno_taskid;
 std::string m_xstr_name;
};
/**
 * @class x_order_task_t
 * @brief 保证顺序执行的测试任务对象。
 */
class x_order_task_t : public x_threadpool_t::x_task_t
{
 // constructor/destructor
public:
 x_order_task_t(objectT * obj_owner, int taskid)
 : m_obj_owner(obj_owner)
 , m_taskid(taskid)
 {
 }
 virtual ~x_order_task_t(void) { }
 // overrides
public:
 virtual void run(x_running_checker_t * xchecker_ptr) override
 {
 printf("%s:%02d\n", m_obj_owner->name().c_str(), m_taskid);
 }
 virtual bool is_suspend(void) const override
 {
 return m_obj_owner->is_running();
 }
 virtual void set_running_flag(bool xrunning_flag) override
 {
 m_obj_owner->set_running(xrunning_flag);
 }
 // data members
private:
 objectT * m_obj_owner;
 int m_taskid;
};
x_order_task_t * objectT::new_task(void)
{
 return (new x_order_task_t(this, m_seqno_taskid++));
}
//====================================================================
int main(int argc, char * argv[])
{
 // 线程池对象
 x_threadpool_t xht_pool;
 // 启动线程池
 if (!xht_pool.startup(0, true))
 {
 printf("startup return false!\n");
 return -1;
 }
 //======================================
 // 便于测试而增加的代码
 // 提交任务对象 : 使线程池的所有工作线程处于暂停状态
 bool is_pause_pool = true;
 for (int iter = 0, nthds = (int)xht_pool.size(); iter < nthds + 1; ++iter)
 {
 xht_pool.submit_task_ex(
 [&is_pause_pool]() -> void
 {
 while (is_pause_pool)
 {
 std::this_thread::sleep_for(std::chrono::milliseconds(1));
 }
 });
 }
 //======================================
 // 提交任务对象 : 按提交顺序执行任务对象
 srand(time(NULL));
 objectT objA("objA");
 objectT objB("objB");
 for (int iter = 0; iter < 100; ++iter)
 {
 if (0 == (rand() % 2))
 {
 xht_pool.submit_task(objA.new_task());
 }
 else
 {
 xht_pool.submit_task(objB.new_task());
 }
 }
 // 使线程池开始工作(便于测试而增加的代码)
 is_pause_pool = false;
 //======================================
 // 等待所有任务执行完成
 while (xht_pool.task_count() > 0)
 std::this_thread::sleep_for(std::chrono::milliseconds(1));
 // 关闭线程池
 xht_pool.shutdown();
 return 0;
}

看一下测试的输出结果:

objA:00
objB:00
objB:01
objB:02
objB:03
objB:04
objA:01
objA:02
objA:03
objB:05
objA:04
objB:06
objB:07
objB:08
objB:09
objA:05
objA:06
objB:10
objB:11
objB:12
objA:07
objA:08
objA:09
objA:10
objB:13
objA:11
objA:12
objB:14
objB:15
objB:16
objB:17
objB:18
objB:19
objA:13
objA:14
objA:15
objA:16
objA:17
objA:18
objA:19
objB:20
objA:20
objA:21
objA:22
objB:21
objB:22
objB:23
objA:23
objB:24
objA:24
objB:25
objB:26
objB:27
objB:28
objB:29
objA:25
objB:30
objA:26
objA:27
objB:31
objB:32
objB:33
objA:28
objA:29
objA:30
objB:34
objA:31
objA:32
objB:35
objA:33
objA:34
objB:36
objA:35
objA:36
objB:37
objA:37
objA:38
objA:39
objA:40
objB:38
objB:39
objB:40
objA:41
objA:42
objA:43
objA:44
objA:45
objB:41
objA:46
objA:47
objA:48
objB:42
objA:49
objB:43
objA:50
objA:51
objA:52
objA:53
objA:54
objB:44

经检测,这已经达到我们所期望的结果。

About

使用 C++11 新标准 thread 线程对象实现的线程池类。

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

Contributors

Languages

AltStyle によって変換されたページ (->オリジナル) /