线程

线程标示

与进程 ID 不同的是,线程 ID 只在其所属的进程上下文中才有意义。另外线程 ID 的类型是 pthread_t,一般实现为结构体,而不是类似于 pid_t 那样的数字。线程 ID 的比较需要使用指定的函数:

函数

用途

pthread_self()

获取线程的 ID

pthread_euqal(tid1, tid2)

比较线程 ID

创建线程

创建线程需要的函数为:

int pthread_create(pthread_t* tid, const pthread_attr_t* attr, void(*fun)(void*), void* arg)

四个参数为:

参数

方向

作用

tid

输出

用来获取新建线程的 tid,可为 null

attr

输入

用来设置线程的属性

fun

输入

线程将要运行的函数

arg

输入

fun 的参数

从第四个参数可以看出,fun 的参数只能有一个。在 C++ 中,更多的参数可以使用 bind 来绑定

  • 新线程会继承进程的进程空间

  • 新线程继承父线程的浮点环境和信号屏蔽字

  • 新线程不会继承父线程的挂起信号集

  • 进程中线程之间的关系是平等的,相互可以发信号和通知关闭

  • 进程中的信号处理函数是公用的

int main(int argc, char* argv[]) {
   pthread_t parId = pthread_self();
   pthread_t chID;
   pthread_create(
      &chID, nullptr,
      [](void* arg) -> void* {
         pthread_t id = pthread_self();
         cout << pthread_equal(id, *(pthread_t*)arg);
         return nullptr;
      },
      &parId
   );
   pthread_join(chID, nullptr);
   return 0;
}

从上面可以看到 C++ 中的 Lambda 与 pthread 配合的例子。

另外,和 pthread_join 相对应的函数是 pthread_detach

重要

在上述例子中,尽管我们可以将 chID 传递给子线程,但是子线程不能使用它。原因是子线程的运行可能早于 chID 初始化的时机

线程在执行以下语句后会被终止:

  • 调用 return 语句

  • 调用 pthread_exit

  • 被其它线程取消

函数

作用

int pthread_exit(pthread_t tid, void** return_val)

退出线程

int pthread_cancel(pthread_t* tid)

取消线程

取消线程只是一个请求,线程可以通过以下函数自由安排取消线程时执行的函数:

函数

作用

void pthread_cleanup_push(void(*fun)(void*), void* arg)

添加清理程序

void pthread_cleanup_pop(int execute)

删除处理程序

某些平台上,这两个函数的实现为宏。另外,这两个函数 必须 配套使用

互斥锁

唯一需要注意的是互斥锁请求锁的动作是进入阻塞状态而不是忙等。也就是说互斥锁不会无意义地浪费 CPU

互斥锁有两种:普通锁和超时锁,两者都使用 pthread_mutex_t,但是加锁的函数不同。但无论是哪一种,都需要对互斥锁进行初始化

初始化的放手有两种:

  • 调用 pthread_mutex_init

  • 使用常量 PTHREAD_MUTEX_INITIALIZER 初始化

int pthread_mutex_init(pthread_mutex_t* mtx, const pthread_mutexattr_t* attr);
int pthread_mutex_lock(pthread_mutex_t* mtx);
int pthread_mutex_timedlock(pthread_mutex_t* mtx, const timespec* time);
int pthread_mutex_try_lock(pthread_mutex_t* mtx);
int pthread_mutex_unlock(pthread_mutex_t* mtx);
  • pthread_try_lock 失败的返回值为 EBUSY,成功的返回值为 0

  • pthread_mutex_timedlock 超时的返回值为 ETIMEOUT,其参数 time 是 绝对时间 而不是相对时间

互斥锁的属性使用 pthread_mutexattr_t 表示,使用时需要进行初始化,结束时需要进行析构

int pthread_mutexattr_init(pthread_mutexattr_t* attr);
int pthread_mutexattr_destory(pthread_mutexattr_t* attr);

pthread_mutexattr_init 会用默认的属性对 attr 进行初始化。要设置其它属性,需要使用额外的函数,这些属性中比较重要的有三个:进程共享属性、健壮属性、类型属性

进程共享属性用来设置锁是否能被其它进程更改,其有两个值:

作用

PTHREAD_PROCESS_PRIVATE

锁只能被此进程内的线程更改

PTHREAD_PROCESS_SHARED

锁可用于进程间同步

这个属性由以下函数设置:

int pthread_mutexattr_getpshared(const pthread_mutexattr_t* attr, int* pshared);
int pthread_mutexattr_setpshared(pthread_mutexattr_t* attr, int pshared);

健壮属性用于进程间互斥锁,其控制了持有锁的进程结束时锁恢复的问题

简单来说,当持有锁的进程终止而其有没释放锁时,系统有两种行为:

作用

PTHREAD_MUTEX_STALLED

系统不采取任何动作,行为未定义

PTHREAD_MUTEX_ROBUST

其它进程会成功调用 pthread_mutex_lock 并得到的返回值 EOWNERDEAD

int pthread_mutexattr_getrobust(const pthread_mutexattr_t* attr, int* pshared);
int pthread_mutexattr_setrobust(pthread_mutexattr_t* attr, int pshared);

也就是说,当我们使用了 PTHREAD_MUTEX_SHARED 后调用 pthread_mutex_lock 会陷入三种状态之一:

  • 失败

  • 成功且不需要恢复的锁

  • 成功但需要恢复的锁

对于第三种情况,需要调用函数 pthread_mutex_consistent 来恢复锁的状态

int pthread_mutex_consistent(pthread_mutex_t* mtx);

如果锁在需要恢复的时候没有恢复而是直接解锁,那么其它尝试获取锁的进程就会得到错误码 ENOTRECOVERABKE,这种情况一旦发生,互斥锁就不再可用

锁的类型属性控制了锁的锁定特性,有四种类型:

作用

PTHREAD_MUTEX_NORMAL

标准互斥锁,不提供错误检查和死锁检测

PTHREAD_MUTEX_ERRORCHECK

提供错误检查的互斥锁

PTHREAD_MUTEX_RECURSIVE

可被有锁的线程加锁

PTHREAD_MUTEX_DEFAULT

被映射为上述三种锁之一

备注

对于 Linux 而言,PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL,而对于 FreeBSD 而言,PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_ERRORCHECK

设置此属性需要以下函数:

int pthread_mutexattr_gettype(const pthread_mutexattr_t* attr, int* type);
int pthread_mutexattr_settype(pthread_mutexattr_t* attr, int* type);

读写锁

读写锁在其它场景下又被称为 共享锁排它锁 ,是一种读友好的锁。其具有以下特性:

  • 加读锁时不允许其它线程写,但是允许其它线程读

  • 加写锁是不允许其它线程读写

读写锁的构造函数和析构函数为:

int pthread_rwlock_init(pthread_rwlock_t* rwlock, const pthread_rwlockattr_t* attr);
int pthread_rwlock_destory(pthread_rwlock_t* rwlock);

读写锁也可以通过静态变量 PTHREAD_RWLOCK_INITIALIZER 初始化

重要

读写锁必须要执行析构函数

对读写锁加读锁和写锁并解锁的接口为:

int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock); // 加读锁
int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock); // 加写锁
int pthread_rwlock_unlock(pthread_rwlock_t* rwlock); // 解锁

读写锁也提供了 try 版本:

int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock); // 加读锁
int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock); // 加写锁

加锁成功时返回 0,失败时返回 EBUSY

也提供了具有超时功能的加锁功能:

int pthread_rwlock_timedrdlock(pthread_rwlock_t* rwlock); // 加读锁
int pthread_rwlock_timedwrlock(pthread_rwlock_t* rwlock); // 加写锁

读写锁唯一支持的属性就是 进程共享 属性,此功能与互斥锁的进程属性是一样的

条件变量

与互斥锁不同的是,条件变量将所有需要资源的线程放到一个队列中,并使其陷入阻塞中,当有资源可用时,可以通过唤醒一个或所有线程来消费资源。由此可见,条件变量是无锁的

尽管条件变量是无锁的,但是条件本身不是原子的,因此需要锁来对其进行保护。

条件变量的使用必须进行构造和析构:

int pthread_cond_init(pthread_cond_t* cond, pthread_condattr_t attr);
int pthread_cond_destory(pthread_cond_t* cond);

使用条件变量获取资源:

int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mtx);
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mtx, const timespec* time);

如果超时还没得到条件变量,则返回 ETIMEOUT

使用条件变量释放资源:

int pthread_cond_signal(pthread_cond_t* cond); // 至少唤醒一个线程
int pthread_cond_broadcast(pthread_cond_t* cond); // 唤醒所有进程

条件变量较关键的属性为:进程共享和时钟属性

时钟属性控制了 pthread_cond_timedwait 采用哪个时钟

int pthread_condattr_getclock(const pthread_condattr_t* attr, clockid_t clock_id);

允许的时钟 ID 为:

标识符

功能

CLOCK_REALTIME

实时系统时间

CLOCK_MONOTONIC

不带负跳数的实时系统时间

CLOCK_PROCESS_CPUTIME_ID

调用进程的 CPU 时间

CLOCK_THREAD_CPUTIME_ID

调用线程的 CPU 时间

自旋锁

和互斥锁在无法获取锁时进入阻塞状态不同,自旋锁使用忙等的形式不断尝试获取锁。因此自旋锁只适用于锁被持有的时间很短的情况下

备注

实际上现在互斥锁的实现非常高效,一些互斥锁在尝试获取互斥量之前或自旋一段时间。

自旋锁在使用前和使用后需要进行初始化和销毁:

int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);

自旋锁唯一的属性就是进程间共享属性,当 pshared == PTHREAD_PROCESS_PRIVATE 时,表明锁是进程间私有的,而当 pshared == PTHREAD_PROCESS_SHARED 时,表明锁可以被不属于当前进程的线程获取

其和互斥锁的接口类似:

int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);

重要

不要在持有自旋锁的情况下调用可能使进程陷入休眠状态的函数

在未初始化的自旋锁上调用 pthread_spin_lock 的行为是未定义的,在未加锁的自旋锁上调用 ptherad_spin_unlock 也是未定义的。

实际上我们可以使用互斥锁自己实现一个自旋锁:

void spin_lock(pthread_mutex_t* mtx){
   while(pthread_mutex_try_lock(mtx) == EBUSY){
      yield();
   }
}

其中 yield 的作用是让出 CPU 。这样,spin_lock 会在无法获取锁的时候不断循环,直至得到锁

屏障

屏障用于将多个进程阻塞至某个点。pthread_join 就是一种屏障,它允许一个线程等待另一个线程退出

屏障必须进行初始化和销毁:

int pthread_barrier_init(pthread_barrier_t *restrict barrier, const pthread_barrierattr_t *restrict attr, unsigned count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
int pthread_barrier_wait(pthread_barrier_t *barrier);

count 参数表明了多少线程调用 pthread_barrier_wait 时线程才会被唤醒。

对于任意线程,只有一个线程 pthread_barrier_wait 的返回值为 PTHREAD_BARRIER_SERIAL_THREAD,其余线程均返回为 0,因此一个线程可以被视为主线程,其余线程被视为辅助线程

和锁相同,屏障也可以被重用

事务内存

GCC 对事务内存的支持被存放在库 libitm 中,请参阅 GNU libitm

线程池

线程池是池化技术的一种,所谓池化技术,就是:

  • 以空间换取时间

  • 提高资源的利用率

  • 减少每次获取资源的消耗

  • 实现对象的复用

当线程的创建销毁时间明显影响影响了系统的效率时(每个线程计算量很小),应当考虑使用线程池。另一方面,线程池会自动管理线程的数量,不会因为大量线程的创建导致系统效率低下

以下整理自 Linux C/C++ 后台开发,150行代码,手写线程池(完整版)

线程池包含三部分组件:

  • 执行队列(线程)

  • 任务队列(task)

  • 管理组件

使用 C++ 可以方便地创建一个线程池:

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

class ThreadPool {
    std::condition_variable cond_;
    std::mutex              mutex_;
    bool                    terminate_ = false;

    std::queue<std::thread>               works_;
    std::queue<std::function<void(void)>> jobs_;

    ThreadPool() {
        unsigned numWorksers = std::thread::hardware_concurrency();
        for(size_t i = 0; i < numWorksers; ++i) {
            auto t = std::thread(&ThreadPool::callback, this);
            // thread 只有移动语义
            works_.push(std::move(t));
        }
    }

protected:
    void callback() {

        while(true) {
            // unique_lock 会自动加锁
            std::unique_lock<std::mutex> lock(mutex_);
            cond_.wait(lock, [this]() {
                return !jobs_.empty() || this->terminate_;    // 当 jobs_ 不为空或者 terminate_  == true 时跳出等待
            });

            if(this->terminate_) break;

            auto job = jobs_.front();
            jobs_.pop();
            lock.unlock();
            job();
        }
    }

public:
    static ThreadPool& getInstance() {
        static ThreadPool pool;
        return pool;
    }
    ~ThreadPool() {
        this->terminate_ = true;
        mutex_.lock();
        cond_.notify_all();
        mutex_.unlock();
        while(!this->works_.empty()) {
            std::thread t = std::move(works_.front());
            works_.pop();
            t.join();
        }
    }

    void push(std::function<void(void)> func) {
        mutex_.lock();
        jobs_.push(func);
        cond_.notify_one();
        mutex_.unlock();
    }
};

对于线程池中线程数量的选择,应遵循以下原则: skyformat99/thread_pool

  • 计算密集型任务:线程个数 = CPU个数

  • I/O密集型任务:线程个数 > CPU个数