######################################## 线程 ######################################## 线程标示 **************************************** 与进程 ID 不同的是,线程 ID 只在其所属的进程上下文中才有意义。另外线程 ID 的类型是 pthread_t,一般实现为结构体,而不是类似于 pid_t 那样的数字。线程 ID 的比较需要使用指定的函数: +---------------------------+---------------+ | 函数 | 用途 | +===========================+===============+ | pthread_self() | 获取线程的 ID | +---------------------------+---------------+ | pthread_euqal(tid1, tid2) | 比较线程 ID | +---------------------------+---------------+ 创建线程 **************************************** 创建线程需要的函数为: .. code-block:: c int pthread_create(pthread_t* tid, const pthread_attr_t* attr, void(*fun)(void*), void* arg) 四个参数为: +------+------+-----------------------------------+ | 参数 | 方向 | 作用 | +======+======+===================================+ | tid | 输出 | 用来获取新建线程的 tid,可为 null | +------+------+-----------------------------------+ | attr | 输入 | 用来设置线程的属性 | +------+------+-----------------------------------+ | fun | 输入 | 线程将要运行的函数 | +------+------+-----------------------------------+ | arg | 输入 | fun 的参数 | +------+------+-----------------------------------+ 从第四个参数可以看出,fun 的参数只能有一个。在 C++ 中,更多的参数可以使用 bind 来绑定 - 新线程会继承进程的进程空间 - 新线程继承父线程的浮点环境和信号屏蔽字 - 新线程不会继承父线程的挂起信号集 - 进程中线程之间的关系是平等的,相互可以发信号和通知关闭 - 进程中的信号处理函数是公用的 .. code-block:: cpp int main(int argc, char* argv[]) { pthread_t parId = pthread_self(); pthread_t chID; pthread_create( &chID, nullptr, [](void* arg) -> void* { pthread_t id = pthread_self(); cout << pthread_equal(id, *(pthread_t*)arg); return nullptr; }, &parId ); pthread_join(chID, nullptr); return 0; } 从上面可以看到 C++ 中的 Lambda 与 pthread 配合的例子。 另外,和 pthread_join 相对应的函数是 pthread_detach .. important:: 在上述例子中,尽管我们可以将 chID 传递给子线程,但是子线程不能使用它。原因是子线程的运行可能早于 chID 初始化的时机 线程在执行以下语句后会被终止: - 调用 return 语句 - 调用 pthread_exit - 被其它线程取消 +------------------------------------------------------+----------+ | 函数 | 作用 | +======================================================+==========+ | int pthread_exit(pthread_t tid, void\*\* return_val) | 退出线程 | +------------------------------------------------------+----------+ | int pthread_cancel(pthread_t\* tid) | 取消线程 | +------------------------------------------------------+----------+ 取消线程只是一个请求,线程可以通过以下函数自由安排取消线程时执行的函数: +------------------------------------------------------------+--------------+ | 函数 | 作用 | +============================================================+==============+ | void pthread_cleanup_push(void(\*fun)(void\*), void\* arg) | 添加清理程序 | +------------------------------------------------------------+--------------+ | void pthread_cleanup_pop(int execute) | 删除处理程序 | +------------------------------------------------------------+--------------+ 某些平台上,这两个函数的实现为宏。另外,这两个函数 **必须** 配套使用 互斥锁 **************************************** 唯一需要注意的是互斥锁请求锁的动作是进入阻塞状态而不是忙等。也就是说互斥锁不会无意义地浪费 CPU 互斥锁有两种:普通锁和超时锁,两者都使用 pthread_mutex_t,但是加锁的函数不同。但无论是哪一种,都需要对互斥锁进行初始化 初始化的放手有两种: - 调用 pthread_mutex_init - 使用常量 PTHREAD_MUTEX_INITIALIZER 初始化 .. code-block:: cpp int pthread_mutex_init(pthread_mutex_t* mtx, const pthread_mutexattr_t* attr); int pthread_mutex_lock(pthread_mutex_t* mtx); int pthread_mutex_timedlock(pthread_mutex_t* mtx, const timespec* time); int pthread_mutex_try_lock(pthread_mutex_t* mtx); int pthread_mutex_unlock(pthread_mutex_t* mtx); - pthread_try_lock 失败的返回值为 EBUSY,成功的返回值为 0 - pthread_mutex_timedlock 超时的返回值为 ETIMEOUT,其参数 time 是 **绝对时间** 而不是相对时间 互斥锁的属性使用 pthread_mutexattr_t 表示,使用时需要进行初始化,结束时需要进行析构 .. code-block:: c int pthread_mutexattr_init(pthread_mutexattr_t* attr); int pthread_mutexattr_destory(pthread_mutexattr_t* attr); pthread_mutexattr_init 会用默认的属性对 attr 进行初始化。要设置其它属性,需要使用额外的函数,这些属性中比较重要的有三个:进程共享属性、健壮属性、类型属性 进程共享属性用来设置锁是否能被其它进程更改,其有两个值: +-------------------------+----------------------------+ | 值 | 作用 | +=========================+============================+ | PTHREAD_PROCESS_PRIVATE | 锁只能被此进程内的线程更改 | +-------------------------+----------------------------+ | PTHREAD_PROCESS_SHARED | 锁可用于进程间同步 | +-------------------------+----------------------------+ 这个属性由以下函数设置: .. code-block:: c int pthread_mutexattr_getpshared(const pthread_mutexattr_t* attr, int* pshared); int pthread_mutexattr_setpshared(pthread_mutexattr_t* attr, int pshared); 健壮属性用于进程间互斥锁,其控制了持有锁的进程结束时锁恢复的问题 简单来说,当持有锁的进程终止而其有没释放锁时,系统有两种行为: +-----------------------+-----------------------------------------------------------------+ | 值 | 作用 | +=======================+=================================================================+ | PTHREAD_MUTEX_STALLED | 系统不采取任何动作,行为未定义 | +-----------------------+-----------------------------------------------------------------+ | PTHREAD_MUTEX_ROBUST | 其它进程会成功调用 pthread_mutex_lock 并得到的返回值 EOWNERDEAD | +-----------------------+-----------------------------------------------------------------+ .. code-block:: c int pthread_mutexattr_getrobust(const pthread_mutexattr_t* attr, int* pshared); int pthread_mutexattr_setrobust(pthread_mutexattr_t* attr, int pshared); 也就是说,当我们使用了 PTHREAD_MUTEX_SHARED 后调用 pthread_mutex_lock 会陷入三种状态之一: - 失败 - 成功且不需要恢复的锁 - 成功但需要恢复的锁 对于第三种情况,需要调用函数 pthread_mutex_consistent 来恢复锁的状态 .. code-block:: c int pthread_mutex_consistent(pthread_mutex_t* mtx); 如果锁在需要恢复的时候没有恢复而是直接解锁,那么其它尝试获取锁的进程就会得到错误码 ENOTRECOVERABKE,这种情况一旦发生,互斥锁就不再可用 锁的类型属性控制了锁的锁定特性,有四种类型: +--------------------------+--------------------------------------+ | 值 | 作用 | +==========================+======================================+ | PTHREAD_MUTEX_NORMAL | 标准互斥锁,不提供错误检查和死锁检测 | +--------------------------+--------------------------------------+ | PTHREAD_MUTEX_ERRORCHECK | 提供错误检查的互斥锁 | +--------------------------+--------------------------------------+ | PTHREAD_MUTEX_RECURSIVE | 可被有锁的线程加锁 | +--------------------------+--------------------------------------+ | PTHREAD_MUTEX_DEFAULT | 被映射为上述三种锁之一 | +--------------------------+--------------------------------------+ .. note:: 对于 Linux 而言,PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL,而对于 FreeBSD 而言,PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_ERRORCHECK 设置此属性需要以下函数: .. code-block:: c int pthread_mutexattr_gettype(const pthread_mutexattr_t* attr, int* type); int pthread_mutexattr_settype(pthread_mutexattr_t* attr, int* type); 读写锁 **************************************** 读写锁在其它场景下又被称为 :abbr:`共享锁 (Shared Lock)` 和 :abbr:`排它锁 (eXclusive Lock)` ,是一种读友好的锁。其具有以下特性: - 加读锁时不允许其它线程写,但是允许其它线程读 - 加写锁是不允许其它线程读写 读写锁的构造函数和析构函数为: .. code-block:: c int pthread_rwlock_init(pthread_rwlock_t* rwlock, const pthread_rwlockattr_t* attr); int pthread_rwlock_destory(pthread_rwlock_t* rwlock); 读写锁也可以通过静态变量 PTHREAD_RWLOCK_INITIALIZER 初始化 .. important:: 读写锁必须要执行析构函数 对读写锁加读锁和写锁并解锁的接口为: .. code-block:: c int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock); // 加读锁 int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock); // 加写锁 int pthread_rwlock_unlock(pthread_rwlock_t* rwlock); // 解锁 读写锁也提供了 try 版本: .. code-block:: c int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock); // 加读锁 int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock); // 加写锁 加锁成功时返回 0,失败时返回 EBUSY 也提供了具有超时功能的加锁功能: .. code-block:: c int pthread_rwlock_timedrdlock(pthread_rwlock_t* rwlock); // 加读锁 int pthread_rwlock_timedwrlock(pthread_rwlock_t* rwlock); // 加写锁 读写锁唯一支持的属性就是 *进程共享* 属性,此功能与互斥锁的进程属性是一样的 条件变量 **************************************** 与互斥锁不同的是,条件变量将所有需要资源的线程放到一个队列中,并使其陷入阻塞中,当有资源可用时,可以通过唤醒一个或所有线程来消费资源。由此可见,条件变量是无锁的 尽管条件变量是无锁的,但是条件本身不是原子的,因此需要锁来对其进行保护。 条件变量的使用必须进行构造和析构: .. code-block:: c int pthread_cond_init(pthread_cond_t* cond, pthread_condattr_t attr); int pthread_cond_destory(pthread_cond_t* cond); 使用条件变量获取资源: .. code-block:: c int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mtx); int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mtx, const timespec* time); 如果超时还没得到条件变量,则返回 ETIMEOUT 使用条件变量释放资源: .. code-block:: c int pthread_cond_signal(pthread_cond_t* cond); // 至少唤醒一个线程 int pthread_cond_broadcast(pthread_cond_t* cond); // 唤醒所有进程 条件变量较关键的属性为:进程共享和时钟属性 时钟属性控制了 pthread_cond_timedwait 采用哪个时钟 .. code-block:: c int pthread_condattr_getclock(const pthread_condattr_t* attr, clockid_t clock_id); 允许的时钟 ID 为: +--------------------------+--------------------------+ | 标识符 | 功能 | +==========================+==========================+ | CLOCK_REALTIME | 实时系统时间 | +--------------------------+--------------------------+ | CLOCK_MONOTONIC | 不带负跳数的实时系统时间 | +--------------------------+--------------------------+ | CLOCK_PROCESS_CPUTIME_ID | 调用进程的 CPU 时间 | +--------------------------+--------------------------+ | CLOCK_THREAD_CPUTIME_ID | 调用线程的 CPU 时间 | +--------------------------+--------------------------+ 自旋锁 **************************************** 和互斥锁在无法获取锁时进入阻塞状态不同,自旋锁使用忙等的形式不断尝试获取锁。因此自旋锁只适用于锁被持有的时间很短的情况下 .. note:: 实际上现在互斥锁的实现非常高效,一些互斥锁在尝试获取互斥量之前或自旋一段时间。 自旋锁在使用前和使用后需要进行初始化和销毁: .. code-block:: c int pthread_spin_init(pthread_spinlock_t *lock, int pshared); int pthread_spin_destroy(pthread_spinlock_t *lock); 自旋锁唯一的属性就是进程间共享属性,当 pshared == PTHREAD_PROCESS_PRIVATE 时,表明锁是进程间私有的,而当 pshared == PTHREAD_PROCESS_SHARED 时,表明锁可以被不属于当前进程的线程获取 其和互斥锁的接口类似: .. code-block:: c int pthread_spin_lock(pthread_spinlock_t *lock); int pthread_spin_trylock(pthread_spinlock_t *lock); int pthread_spin_unlock(pthread_spinlock_t *lock); .. important:: 不要在持有自旋锁的情况下调用可能使进程陷入休眠状态的函数 在未初始化的自旋锁上调用 pthread_spin_lock 的行为是未定义的,在未加锁的自旋锁上调用 ptherad_spin_unlock 也是未定义的。 实际上我们可以使用互斥锁自己实现一个自旋锁: .. code-block:: c void spin_lock(pthread_mutex_t* mtx){ while(pthread_mutex_try_lock(mtx) == EBUSY){ yield(); } } 其中 yield 的作用是让出 CPU 。这样,spin_lock 会在无法获取锁的时候不断循环,直至得到锁 屏障 **************************************** 屏障用于将多个进程阻塞至某个点。pthread_join 就是一种屏障,它允许一个线程等待另一个线程退出 屏障必须进行初始化和销毁: .. code-block:: c int pthread_barrier_init(pthread_barrier_t *restrict barrier, const pthread_barrierattr_t *restrict attr, unsigned count); int pthread_barrier_destroy(pthread_barrier_t *barrier); int pthread_barrier_wait(pthread_barrier_t *barrier); count 参数表明了多少线程调用 pthread_barrier_wait 时线程才会被唤醒。 对于任意线程,只有一个线程 pthread_barrier_wait 的返回值为 PTHREAD_BARRIER_SERIAL_THREAD,其余线程均返回为 0,因此一个线程可以被视为主线程,其余线程被视为辅助线程 和锁相同,屏障也可以被重用 事务内存 **************************************** GCC 对事务内存的支持被存放在库 libitm 中,请参阅 `GNU libitm `_ 线程池 **************************************** 线程池是池化技术的一种,所谓池化技术,就是: - 以空间换取时间 - 提高资源的利用率 - 减少每次获取资源的消耗 - 实现对象的复用 当线程的创建销毁时间明显影响影响了系统的效率时(每个线程计算量很小),应当考虑使用线程池。另一方面,线程池会自动管理线程的数量,不会因为大量线程的创建导致系统效率低下 以下整理自 `Linux C/C++ 后台开发,150行代码,手写线程池(完整版)`_ .. _`Linux C/C++ 后台开发,150行代码,手写线程池(完整版)`: https://www.bilibili.com/video/BV1AT4y13791 线程池包含三部分组件: - 执行队列(线程) - 任务队列(task) - 管理组件 使用 C++ 可以方便地创建一个线程池: .. code-block:: cpp #include #include #include #include #include class ThreadPool { std::condition_variable cond_; std::mutex mutex_; bool terminate_ = false; std::queue works_; std::queue> jobs_; ThreadPool() { unsigned numWorksers = std::thread::hardware_concurrency(); for(size_t i = 0; i < numWorksers; ++i) { auto t = std::thread(&ThreadPool::callback, this); // thread 只有移动语义 works_.push(std::move(t)); } } protected: void callback() { while(true) { // unique_lock 会自动加锁 std::unique_lock lock(mutex_); cond_.wait(lock, [this]() { return !jobs_.empty() || this->terminate_; // 当 jobs_ 不为空或者 terminate_ == true 时跳出等待 }); if(this->terminate_) break; auto job = jobs_.front(); jobs_.pop(); lock.unlock(); job(); } } public: static ThreadPool& getInstance() { static ThreadPool pool; return pool; } ~ThreadPool() { this->terminate_ = true; mutex_.lock(); cond_.notify_all(); mutex_.unlock(); while(!this->works_.empty()) { std::thread t = std::move(works_.front()); works_.pop(); t.join(); } } void push(std::function func) { mutex_.lock(); jobs_.push(func); cond_.notify_one(); mutex_.unlock(); } }; 对于线程池中线程数量的选择,应遵循以下原则: `skyformat99/thread_pool`_ - 计算密集型任务:线程个数 = CPU个数 - I/O密集型任务:线程个数 > CPU个数 .. _`skyformat99/thread_pool`: https://github.com/skyformat99/thread_pool .. seealso:: - `How can I detect that a thread pool work item is taking too long? `_