线程threads
pthread_create() 创立线程
pthread_join() 等待某线程结束
- 线程是操作系统能够进行运算调度的最小单位
- 进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位
顺序、原子性、可见性的丧失
- 原子性: 一个操作是不可中断的,要么全部执行成功要么全部执行失败. 即使单处理器, 也会因为操作系统中断而丧失.
- 顺序性: 如果在本线程内观察,所有的操作都是有序的;如果在一个线程观察另一个线程,所有的操作都是无序的
- 可见性: 当一个线程修改了共享变量后,其他线程能够立即得知这个修改. 因为处理器的cache而丧失.
因此需要锁.
锁和条件变量的使用
互斥锁
1 | pthread_mutex_lock(&lock); |
- 如果没有别的线程拥有这把锁, 那么当前线程得到锁, 继续运行;
- 如果别的线程持有该锁, 那么当前线程将一直等待直到得到锁.
锁初始化:
1
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
或者
1
2int rc = pthread_mutex_init(&lock, NULL);
assert(rc == 0); // always check success!
条件变量
1 | int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); |
- pthread_cond_wait让线程睡眠, 等待其他线程发出信号
- pthread_cond_signal唤醒正在沉睡中的对应线程
- 初始化:
- pthread_cond_init()
- PTHREAD COND INITIALIZER
锁的实现
硬件层
LOCK指令
- 保证顺序, 原子性和可见性
LL/SC
把一对读-写实现成一对“可能失败”的操作
- Load Linked (LL):先执行一次读操作,并且在地址上做一个“标记”
- Store Conditional (SC):试探性地写入LL相同的地址(通过对内存地址获得独占访问)。如果LL-SC之间没有其他到该地址的写操作,则写入内存,返回SUCC;否则不执行写入(进而不会污染任何内存),并返回FAIL。
自旋锁
开关中断
1
2
3
4
5
6
7
8void lock(lock_t *lk) {
pushcli();
while (atomic_xchg(&lk->locked, 1)) ;
}
void unlock(lock_t *lk) {
atomic_xchg(&lk->locked, 0);
popsti();
}
取消处理器空转, 在自旋失败的时候切换到别的线程执行
失败时将自己加入等待队列
有人释放锁时唤醒
例子: 游泳池的进入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28typedef struct __lock_t {
int flag;
int guard; // 持有guard锁才能更改flag
queue_t *q;
} lock_t;
void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; // acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acqured
m->guard = 0;
} else {
queue_add(m->q, gettid());
m->guard = 0; // 解锁
park(); // 睡眠
}
}
void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; // acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0;
else
unpark(queue_remove(m->q)); // 唤醒
m->guard = 0;
}
并发数据结构
- 最简单的做法: 每一个操作都上一把锁.
- 问题: 性能太差
- 计数器: 可用近似计数器, 每个CPU分别计数, 到达一个阈值就更新给全局计数器.
- 链表: 每次list一把大锁; 或者每个结点一把锁
- 队列: 头和尾各一把锁
条件变量
非常直观的同步方法:一个条件变量代表“某个条件满足”,支持:
- 等待某个条件满足后发生
- 某个条件满足,唤醒一个正在等待的线程
- 某个条件满足,唤醒所有正在等待的线程
pthread_cond_wait(pthread_cond_t c, pthread_mutex_t m);
pthread_cond_signal(pthread_cond_t *c);唤醒一个pthread_cond_broadcast(pthread_cond_t *c);唤醒所有
wait()
函数还需要一个互斥锁作为参数。wait()
会解开锁并且让调用者进入睡眠状态(这是一个原子操作)。当调用者被唤醒时,wait()
必须重新上锁,然后才能返回到调用者。实现join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17void worker(int id) {
...
mutex_lock(&mutex);
done[id] = 1; // 可能和read并发执行
signal(&joins);
mutex_unlock(&mutex);
}
int main() {
for (int i = 0; i < nworkers; i++)
create(worker, i);
for (int i = 0; i < nworkers; i++) {
lock(&mutex);
if (!done[i]) {
wait(&joins, &mutex);
}
unlock(&mutex);
}
如果删除了
done
:子线程先运行,父线程进入睡眠状态,永远无法唤醒。如果删除了互斥锁:父线程获取数据的同时子线程修改了数据,产生竞争,父线程随即进入睡眠状态,永远无法唤醒。
注意: 不要先signal再wait
注意: wait前要上锁!!!
signal建议在解锁之前
生产者消费者问题
如果用一个信号变量, 会出现2个问题:
- 问题1: 消费者C1因无商品休眠后, 被唤醒时, C2抢先清空了缓冲区, 但C1仍然认为有商品.
- 问题关键: 没有确保唤醒了的线程立即执行.
- 解决方法: 将count的判断由if改为while.
- 解决的原因: wake up后马上check条件变量是否为1.
- 启示: 总是用while循环来处理条件变量, 多检查一次总是安全的!!!
- 问题2: 消费者可能会唤醒消费者
- 问题关键: 我们需要信号去唤醒线程,但信号的对象必须明确:消费者不能唤醒消费者,只能唤醒生产者,反之亦然。
- 解决方法: 用两个条件变量
- 问题1: 消费者C1因无商品休眠后, 被唤醒时, C2抢先清空了缓冲区, 但C1仍然认为有商品.
生产者和消费者仍然需要支持并发性:需要多个商品缓冲区,可以一次生产/消费多个商品。
虚假唤醒: 反正用while就行.
信号量Semaphores
将生产者消费者问题中的互斥锁, 条件变量, 计数器合三为一.
1
2
3
4
5
6
7
8
9
10
11
12
sem_t s;
sem_init(&s, 0, 1); // 第三个参数是初始值
int sem_wait(sem_t &s) {
// decrement the value of semaphore s by one
// wait if value of semaphore s is negative
}
int sem_post(sem_t &s) {
// increment the value of semaphore s by one
// if there are one or more threads waiting, wake one
}
信号量也是一个结构体
sem_t
,内部包含一个计数器count
,它的行为是:P(原子)操作时,
count
减1,如果计数器数值小于零则线程睡眠等待1
2
3
4
5
6
7void P(sem_t &sem) {
sem->count--;
if (sem->count < 0) {
push(sem->queue, current);
suspend();
}
}V(原子)操作时,
count
加一,同时如果有正在睡眠的线程,则把睡眠的线程唤醒1
2
3
4
5
6void V(sem_t &sem) {
sem->count++;
if (!empty(sem->queue)) {
wakeup(pop(sem->queue));
}
}
信号量 = 互斥锁 + 条件变量
- P/V是原子操作
- 仅有一个手环 = 互斥锁
- P = wait; V = signal
- 因为计数器的存在,不会发生signal“丢失”(先signal再wait)
- 例子: 游泳池和手环
实际上,信号量可以理解成一个锁(手环)的“池子”,计数器就是锁的数量。然后把P/V操作直观地理解:
- P操作就是从池子里取走一把锁。如果取成功,线程继续执行,如果取失败,就必须等待。
- V操作就是把一把锁放进池子里。这时候如果有线程在等锁,那个线程可以取走这把锁执行。
如果
count
的初始值为1,我们可以直接把P/V当作互斥锁来使用——池子里只有一把锁,进入临界区必须取得锁,临界区结束后归还。一个简单的二元信号量锁的实现方式如下:
1
2
3
4
5
6sem_t m;
sem_init(&m, 0, 1); // 初始化信号量为1(书本原代码此处为问题)
sem_wait(&m);
// critical section here
sem_post(&m);信号量可用于等待其他进程
信号量解生产者消费者问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21sem_t empty = SEM_INIT(n);
sem_t fill = SEM_INIT(0);
void producer() {
while (1) {
P(&empty);
mutex_lock(&mutex);
printf("(");
mutex_unlock(&mutex);
V(&fill);
}
}
void consumer() {
while (1) {
P(&fill);
mutex_lock(&mutex);
printf(")");
mutex_unlock(&mutex);
V(&empty);
}
}- 注意生产者消费者问题中的线程安全性. (printf是安全的)
- 解决方案:先等信号,再上锁。
注意: P和V都不能被锁!!! 否则会死锁!!!
读写锁: 当一个线程想要获得第一把读锁时,它同时会获取写锁,直到它结束后才会释放写锁。这样,如果有线程想要获得写锁,就必须等待所有的读线程退出临界区。(性能垃圾)
哲学家吃饭问题
- 问题所在: 如何防止死锁
- 解决方案:
- 增加一个管理者, 管理所有叉子
- 所有哲学家都先拿编号大的叉子(破坏环路)
- 上锁, 每次只有一个哲学家wait
- 只有拿起两支筷子的哲学家才可以进餐,否则,一支筷子也不拿。
并发bugs
死锁 (ABBA)
出现线程“互相等待”的情况
可能: spinlock不小心打开中断
死锁最基本的形式 (ABBA)
1
2
3
4
5
6
7thread1 tread2
lock(A);
lock(B);
lock(B); // 阻塞
lock(A); // 阻塞
// 任何线程都不能进入临界区在一个线程连续的两次lock之间发生另一个lock事件,死锁才会能触发。
例子:
1
2
3
4
5
6
7
8void move_obj(int i, int j) {
spin_lock(&lock[i]);
spin_lock(&lock[j]);
arr[i] = NULL;
arr[j] = arr[i];
spin_unlock(&lock[j]);
spin_unlock(&lock[i]);
}同时move_obj(1, 2)
;
move_obj(2, 1).
死锁产生的四个条件 :
- 互斥:一个资源每次只能被一个进程使用
- 请求与保持:一个进程请求资源阻塞时,不释放已获得的资源
- 不剥夺:进程已获得的资源不能强行剥夺
- 循环等待:若干进程之间形成头尾相接的循环等待资源关系
AA型死锁非常容易检测
为了避免ABBA型的死锁
- 任意时刻系统中的锁都是有限的
- 严格按照固定的顺序获得所有锁
- 给上锁过程上个大锁
原子性违反 (ABA)
- 互斥锁(lock/unlock) - 原子性
- 忘记上锁——原子性违反 (Atomicity Violation, AV)
顺序违反 (BA)
- 条件变量(wait/signal) - 同步
- 忘记同步——顺序违反 (Order Violation, OV)
- 常见的情况:use after free
数据竞争
- 数据竞争是两个共享内存访问,它们满足以下条件:
- 访问同一个共享内存变量
- 发生在不同的线程
- 至少有一个是写