线程
线程标示
与进程 ID 不同的是,线程 ID 只在其所属的进程上下文中才有意义。另外线程 ID 的类型是 pthread_t,一般实现为结构体,而不是类似于 pid_t 那样的数字。线程 ID 的比较需要使用指定的函数:
函数 |
用途 |
|---|---|
pthread_self() |
获取线程的 ID |
pthread_euqal(tid1, tid2) |
比较线程 ID |
创建线程
创建线程需要的函数为:
int pthread_create(pthread_t* tid, const pthread_attr_t* attr, void(*fun)(void*), void* arg)
四个参数为:
参数 |
方向 |
作用 |
|---|---|---|
tid |
输出 |
用来获取新建线程的 tid,可为 null |
attr |
输入 |
用来设置线程的属性 |
fun |
输入 |
线程将要运行的函数 |
arg |
输入 |
fun 的参数 |
从第四个参数可以看出,fun 的参数只能有一个。在 C++ 中,更多的参数可以使用 bind 来绑定
新线程会继承进程的进程空间
新线程继承父线程的浮点环境和信号屏蔽字
新线程不会继承父线程的挂起信号集
进程中线程之间的关系是平等的,相互可以发信号和通知关闭
进程中的信号处理函数是公用的
int main(int argc, char* argv[]) {
pthread_t parId = pthread_self();
pthread_t chID;
pthread_create(
&chID, nullptr,
[](void* arg) -> void* {
pthread_t id = pthread_self();
cout << pthread_equal(id, *(pthread_t*)arg);
return nullptr;
},
&parId
);
pthread_join(chID, nullptr);
return 0;
}
从上面可以看到 C++ 中的 Lambda 与 pthread 配合的例子。
另外,和 pthread_join 相对应的函数是 pthread_detach
重要
在上述例子中,尽管我们可以将 chID 传递给子线程,但是子线程不能使用它。原因是子线程的运行可能早于 chID 初始化的时机
线程在执行以下语句后会被终止:
调用 return 语句
调用 pthread_exit
被其它线程取消
函数 |
作用 |
|---|---|
int pthread_exit(pthread_t tid, void** return_val) |
退出线程 |
int pthread_cancel(pthread_t* tid) |
取消线程 |
取消线程只是一个请求,线程可以通过以下函数自由安排取消线程时执行的函数:
函数 |
作用 |
|---|---|
void pthread_cleanup_push(void(*fun)(void*), void* arg) |
添加清理程序 |
void pthread_cleanup_pop(int execute) |
删除处理程序 |
某些平台上,这两个函数的实现为宏。另外,这两个函数 必须 配套使用
互斥锁
唯一需要注意的是互斥锁请求锁的动作是进入阻塞状态而不是忙等。也就是说互斥锁不会无意义地浪费 CPU
互斥锁有两种:普通锁和超时锁,两者都使用 pthread_mutex_t,但是加锁的函数不同。但无论是哪一种,都需要对互斥锁进行初始化
初始化的放手有两种:
调用 pthread_mutex_init
使用常量 PTHREAD_MUTEX_INITIALIZER 初始化
int pthread_mutex_init(pthread_mutex_t* mtx, const pthread_mutexattr_t* attr);
int pthread_mutex_lock(pthread_mutex_t* mtx);
int pthread_mutex_timedlock(pthread_mutex_t* mtx, const timespec* time);
int pthread_mutex_try_lock(pthread_mutex_t* mtx);
int pthread_mutex_unlock(pthread_mutex_t* mtx);
pthread_try_lock 失败的返回值为 EBUSY,成功的返回值为 0
pthread_mutex_timedlock 超时的返回值为 ETIMEOUT,其参数 time 是 绝对时间 而不是相对时间
互斥锁的属性使用 pthread_mutexattr_t 表示,使用时需要进行初始化,结束时需要进行析构
int pthread_mutexattr_init(pthread_mutexattr_t* attr);
int pthread_mutexattr_destory(pthread_mutexattr_t* attr);
pthread_mutexattr_init 会用默认的属性对 attr 进行初始化。要设置其它属性,需要使用额外的函数,这些属性中比较重要的有三个:进程共享属性、健壮属性、类型属性
进程共享属性用来设置锁是否能被其它进程更改,其有两个值:
值 |
作用 |
|---|---|
PTHREAD_PROCESS_PRIVATE |
锁只能被此进程内的线程更改 |
PTHREAD_PROCESS_SHARED |
锁可用于进程间同步 |
这个属性由以下函数设置:
int pthread_mutexattr_getpshared(const pthread_mutexattr_t* attr, int* pshared);
int pthread_mutexattr_setpshared(pthread_mutexattr_t* attr, int pshared);
健壮属性用于进程间互斥锁,其控制了持有锁的进程结束时锁恢复的问题
简单来说,当持有锁的进程终止而其有没释放锁时,系统有两种行为:
值 |
作用 |
|---|---|
PTHREAD_MUTEX_STALLED |
系统不采取任何动作,行为未定义 |
PTHREAD_MUTEX_ROBUST |
其它进程会成功调用 pthread_mutex_lock 并得到的返回值 EOWNERDEAD |
int pthread_mutexattr_getrobust(const pthread_mutexattr_t* attr, int* pshared);
int pthread_mutexattr_setrobust(pthread_mutexattr_t* attr, int pshared);
也就是说,当我们使用了 PTHREAD_MUTEX_SHARED 后调用 pthread_mutex_lock 会陷入三种状态之一:
失败
成功且不需要恢复的锁
成功但需要恢复的锁
对于第三种情况,需要调用函数 pthread_mutex_consistent 来恢复锁的状态
int pthread_mutex_consistent(pthread_mutex_t* mtx);
如果锁在需要恢复的时候没有恢复而是直接解锁,那么其它尝试获取锁的进程就会得到错误码 ENOTRECOVERABKE,这种情况一旦发生,互斥锁就不再可用
锁的类型属性控制了锁的锁定特性,有四种类型:
值 |
作用 |
|---|---|
PTHREAD_MUTEX_NORMAL |
标准互斥锁,不提供错误检查和死锁检测 |
PTHREAD_MUTEX_ERRORCHECK |
提供错误检查的互斥锁 |
PTHREAD_MUTEX_RECURSIVE |
可被有锁的线程加锁 |
PTHREAD_MUTEX_DEFAULT |
被映射为上述三种锁之一 |
备注
对于 Linux 而言,PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL,而对于 FreeBSD 而言,PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_ERRORCHECK
设置此属性需要以下函数:
int pthread_mutexattr_gettype(const pthread_mutexattr_t* attr, int* type);
int pthread_mutexattr_settype(pthread_mutexattr_t* attr, int* type);
读写锁
读写锁在其它场景下又被称为 共享锁 和 排它锁 ,是一种读友好的锁。其具有以下特性:
加读锁时不允许其它线程写,但是允许其它线程读
加写锁是不允许其它线程读写
读写锁的构造函数和析构函数为:
int pthread_rwlock_init(pthread_rwlock_t* rwlock, const pthread_rwlockattr_t* attr);
int pthread_rwlock_destory(pthread_rwlock_t* rwlock);
读写锁也可以通过静态变量 PTHREAD_RWLOCK_INITIALIZER 初始化
重要
读写锁必须要执行析构函数
对读写锁加读锁和写锁并解锁的接口为:
int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock); // 加读锁
int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock); // 加写锁
int pthread_rwlock_unlock(pthread_rwlock_t* rwlock); // 解锁
读写锁也提供了 try 版本:
int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock); // 加读锁
int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock); // 加写锁
加锁成功时返回 0,失败时返回 EBUSY
也提供了具有超时功能的加锁功能:
int pthread_rwlock_timedrdlock(pthread_rwlock_t* rwlock); // 加读锁
int pthread_rwlock_timedwrlock(pthread_rwlock_t* rwlock); // 加写锁
读写锁唯一支持的属性就是 进程共享 属性,此功能与互斥锁的进程属性是一样的
条件变量
与互斥锁不同的是,条件变量将所有需要资源的线程放到一个队列中,并使其陷入阻塞中,当有资源可用时,可以通过唤醒一个或所有线程来消费资源。由此可见,条件变量是无锁的
尽管条件变量是无锁的,但是条件本身不是原子的,因此需要锁来对其进行保护。
条件变量的使用必须进行构造和析构:
int pthread_cond_init(pthread_cond_t* cond, pthread_condattr_t attr);
int pthread_cond_destory(pthread_cond_t* cond);
使用条件变量获取资源:
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mtx);
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mtx, const timespec* time);
如果超时还没得到条件变量,则返回 ETIMEOUT
使用条件变量释放资源:
int pthread_cond_signal(pthread_cond_t* cond); // 至少唤醒一个线程
int pthread_cond_broadcast(pthread_cond_t* cond); // 唤醒所有进程
条件变量较关键的属性为:进程共享和时钟属性
时钟属性控制了 pthread_cond_timedwait 采用哪个时钟
int pthread_condattr_getclock(const pthread_condattr_t* attr, clockid_t clock_id);
允许的时钟 ID 为:
标识符 |
功能 |
|---|---|
CLOCK_REALTIME |
实时系统时间 |
CLOCK_MONOTONIC |
不带负跳数的实时系统时间 |
CLOCK_PROCESS_CPUTIME_ID |
调用进程的 CPU 时间 |
CLOCK_THREAD_CPUTIME_ID |
调用线程的 CPU 时间 |
自旋锁
和互斥锁在无法获取锁时进入阻塞状态不同,自旋锁使用忙等的形式不断尝试获取锁。因此自旋锁只适用于锁被持有的时间很短的情况下
备注
实际上现在互斥锁的实现非常高效,一些互斥锁在尝试获取互斥量之前或自旋一段时间。
自旋锁在使用前和使用后需要进行初始化和销毁:
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
自旋锁唯一的属性就是进程间共享属性,当 pshared == PTHREAD_PROCESS_PRIVATE 时,表明锁是进程间私有的,而当 pshared == PTHREAD_PROCESS_SHARED 时,表明锁可以被不属于当前进程的线程获取
其和互斥锁的接口类似:
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);
重要
不要在持有自旋锁的情况下调用可能使进程陷入休眠状态的函数
在未初始化的自旋锁上调用 pthread_spin_lock 的行为是未定义的,在未加锁的自旋锁上调用 ptherad_spin_unlock 也是未定义的。
实际上我们可以使用互斥锁自己实现一个自旋锁:
void spin_lock(pthread_mutex_t* mtx){
while(pthread_mutex_try_lock(mtx) == EBUSY){
yield();
}
}
其中 yield 的作用是让出 CPU 。这样,spin_lock 会在无法获取锁的时候不断循环,直至得到锁
屏障
屏障用于将多个进程阻塞至某个点。pthread_join 就是一种屏障,它允许一个线程等待另一个线程退出
屏障必须进行初始化和销毁:
int pthread_barrier_init(pthread_barrier_t *restrict barrier, const pthread_barrierattr_t *restrict attr, unsigned count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
int pthread_barrier_wait(pthread_barrier_t *barrier);
count 参数表明了多少线程调用 pthread_barrier_wait 时线程才会被唤醒。
对于任意线程,只有一个线程 pthread_barrier_wait 的返回值为 PTHREAD_BARRIER_SERIAL_THREAD,其余线程均返回为 0,因此一个线程可以被视为主线程,其余线程被视为辅助线程
和锁相同,屏障也可以被重用
事务内存
GCC 对事务内存的支持被存放在库 libitm 中,请参阅 GNU libitm
线程池
线程池是池化技术的一种,所谓池化技术,就是:
以空间换取时间
提高资源的利用率
减少每次获取资源的消耗
实现对象的复用
当线程的创建销毁时间明显影响影响了系统的效率时(每个线程计算量很小),应当考虑使用线程池。另一方面,线程池会自动管理线程的数量,不会因为大量线程的创建导致系统效率低下
以下整理自 Linux C/C++ 后台开发,150行代码,手写线程池(完整版)
线程池包含三部分组件:
执行队列(线程)
任务队列(task)
管理组件
使用 C++ 可以方便地创建一个线程池:
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
class ThreadPool {
std::condition_variable cond_;
std::mutex mutex_;
bool terminate_ = false;
std::queue<std::thread> works_;
std::queue<std::function<void(void)>> jobs_;
ThreadPool() {
unsigned numWorksers = std::thread::hardware_concurrency();
for(size_t i = 0; i < numWorksers; ++i) {
auto t = std::thread(&ThreadPool::callback, this);
// thread 只有移动语义
works_.push(std::move(t));
}
}
protected:
void callback() {
while(true) {
// unique_lock 会自动加锁
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this]() {
return !jobs_.empty() || this->terminate_; // 当 jobs_ 不为空或者 terminate_ == true 时跳出等待
});
if(this->terminate_) break;
auto job = jobs_.front();
jobs_.pop();
lock.unlock();
job();
}
}
public:
static ThreadPool& getInstance() {
static ThreadPool pool;
return pool;
}
~ThreadPool() {
this->terminate_ = true;
mutex_.lock();
cond_.notify_all();
mutex_.unlock();
while(!this->works_.empty()) {
std::thread t = std::move(works_.front());
works_.pop();
t.join();
}
}
void push(std::function<void(void)> func) {
mutex_.lock();
jobs_.push(func);
cond_.notify_one();
mutex_.unlock();
}
};
对于线程池中线程数量的选择,应遵循以下原则: skyformat99/thread_pool
计算密集型任务:线程个数 = CPU个数
I/O密集型任务:线程个数 > CPU个数