OS并发

线程threads

pthread_create() 创立线程

pthread_join() 等待某线程结束

  • 线程是操作系统能够进行运算调度的最小单位
  • 进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位

顺序、原子性、可见性的丧失

  • 原子性: 一个操作是不可中断的,要么全部执行成功要么全部执行失败. 即使单处理器, 也会因为操作系统中断而丧失.
  • 顺序性: 如果在本线程内观察,所有的操作都是有序的;如果在一个线程观察另一个线程,所有的操作都是无序的
  • 可见性: 当一个线程修改了共享变量后,其他线程能够立即得知这个修改. 因为处理器的cache而丧失.

因此需要锁.

锁和条件变量的使用

互斥锁

1
2
3
4
pthread_mutex_lock(&lock);
x = x + 1;
// or whatever your critical section is
pthread_mutex_unlock(&lock);
  • 如果没有别的线程拥有这把锁, 那么当前线程得到锁, 继续运行;
  • 如果别的线程持有该锁, 那么当前线程将一直等待直到得到锁.
  • 锁初始化:

    1
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    或者

    1
    2
    int rc = pthread_mutex_init(&lock, NULL);
    assert(rc == 0); // always check success!

条件变量

1
2
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);
  • pthread_cond_wait让线程睡眠, 等待其他线程发出信号
  • pthread_cond_signal唤醒正在沉睡中的对应线程
  • 初始化:
    • pthread_cond_init()
    • PTHREAD COND INITIALIZER

锁的实现

硬件层

LOCK指令

  • 保证顺序, 原子性和可见性

    LL/SC

  • 把一对读-写实现成一对“可能失败”的操作

    • Load Linked (LL):先执行一次读操作,并且在地址上做一个“标记”
    • Store Conditional (SC):试探性地写入LL相同的地址(通过对内存地址获得独占访问)。如果LL-SC之间没有其他到该地址的写操作,则写入内存,返回SUCC;否则不执行写入(进而不会污染任何内存),并返回FAIL。

      自旋锁

    • 开关中断

      1
      2
      3
      4
      5
      6
      7
      8
      void lock(lock_t *lk) {
      pushcli();
      while (atomic_xchg(&lk->locked, 1)) ;
      }
      void unlock(lock_t *lk) {
      atomic_xchg(&lk->locked, 0);
      popsti();
      }
      • 注意ABBA类型, 不能过早地开中断. 可以将lock和unlock想成入栈和出栈的过程(处理器维护).

        能睡眠的互斥锁

  • 取消处理器空转, 在自旋失败的时候切换到别的线程执行

  • 失败时将自己加入等待队列

  • 有人释放锁时唤醒

  • 例子: 游泳池的进入

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    typedef struct __lock_t {
    int flag;
    int guard; // 持有guard锁才能更改flag
    queue_t *q;
    } lock_t;

    void lock(lock_t *m) {
    while (TestAndSet(&m->guard, 1) == 1)
    ; // acquire guard lock by spinning
    if (m->flag == 0) {
    m->flag = 1; // lock is acqured
    m->guard = 0;
    } else {
    queue_add(m->q, gettid());
    m->guard = 0; // 解锁
    park(); // 睡眠
    }
    }

    void unlock(lock_t *m) {
    while (TestAndSet(&m->guard, 1) == 1)
    ; // acquire guard lock by spinning
    if (queue_empty(m->q))
    m->flag = 0;
    else
    unpark(queue_remove(m->q)); // 唤醒
    m->guard = 0;
    }

并发数据结构

  • 最简单的做法: 每一个操作都上一把锁.
  • 问题: 性能太差
  • 计数器: 可用近似计数器, 每个CPU分别计数, 到达一个阈值就更新给全局计数器.
  • 链表: 每次list一把大锁; 或者每个结点一把锁
  • 队列: 头和尾各一把锁

条件变量

  • 非常直观的同步方法:一个条件变量代表“某个条件满足”,支持:

    • 等待某个条件满足后发生
    • 某个条件满足,唤醒一个正在等待的线程
    • 某个条件满足,唤醒所有正在等待的线程
  • pthread_cond_wait(pthread_cond_t c, pthread_mutex_t m);
    pthread_cond_signal(pthread_cond_t *c);唤醒一个

    pthread_cond_broadcast(pthread_cond_t *c);唤醒所有

  • wait()函数还需要一个互斥锁作为参数。wait()会解开锁并且让调用者进入睡眠状态(这是一个原子操作)。当调用者被唤醒时,wait()必须重新上锁,然后才能返回到调用者。

  • 实现join

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    void worker(int id) {
    ...
    mutex_lock(&mutex);
    done[id] = 1; // 可能和read并发执行
    signal(&joins);
    mutex_unlock(&mutex);
    }
    int main() {
    for (int i = 0; i < nworkers; i++)
    create(worker, i);
    for (int i = 0; i < nworkers; i++) {
    lock(&mutex);
    if (!done[i]) {
    wait(&joins, &mutex);
    }
    unlock(&mutex);
    }
  • 如果删除了done:子线程先运行,父线程进入睡眠状态,永远无法唤醒。

  • 如果删除了互斥锁:父线程获取数据的同时子线程修改了数据,产生竞争,父线程随即进入睡眠状态,永远无法唤醒。

  • 注意: 不要先signal再wait

  • 注意: wait前要上锁!!!

  • signal建议在解锁之前

生产者消费者问题

  • 如果用一个信号变量, 会出现2个问题:

    • 问题1: 消费者C1因无商品休眠后, 被唤醒时, C2抢先清空了缓冲区, 但C1仍然认为有商品.
      • 问题关键: 没有确保唤醒了的线程立即执行.
      • 解决方法: 将count的判断由if改为while.
      • 解决的原因: wake up后马上check条件变量是否为1.
      • 启示: 总是用while循环来处理条件变量, 多检查一次总是安全的!!!
    • 问题2: 消费者可能会唤醒消费者
      • 问题关键: 我们需要信号去唤醒线程,但信号的对象必须明确:消费者不能唤醒消费者,只能唤醒生产者,反之亦然。
      • 解决方法: 用两个条件变量
  • 生产者和消费者仍然需要支持并发性:需要多个商品缓冲区,可以一次生产/消费多个商品。

  • 虚假唤醒: 反正用while就行.

    img

    img

信号量Semaphores

  • 将生产者消费者问题中的互斥锁, 条件变量, 计数器合三为一.

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #include <semaphore.h>
    sem_t s;
    sem_init(&s, 0, 1); // 第三个参数是初始值
    int sem_wait(sem_t &s) {
    // decrement the value of semaphore s by one
    // wait if value of semaphore s is negative
    }

    int sem_post(sem_t &s) {
    // increment the value of semaphore s by one
    // if there are one or more threads waiting, wake one
    }
  • 信号量也是一个结构体sem_t,内部包含一个计数器count,它的行为是:

    • P(原子)操作时,count减1,如果计数器数值小于零则线程睡眠等待

      1
      2
      3
      4
      5
      6
      7
      void P(sem_t &sem) {
      sem->count--;
      if (sem->count < 0) {
      push(sem->queue, current);
      suspend();
      }
      }
    • V(原子)操作时,count加一,同时如果有正在睡眠的线程,则把睡眠的线程唤醒

      1
      2
      3
      4
      5
      6
      void V(sem_t &sem) {
      sem->count++;
      if (!empty(sem->queue)) {
      wakeup(pop(sem->queue));
      }
      }
  • 信号量 = 互斥锁 + 条件变量

    • P/V是原子操作
    • 仅有一个手环 = 互斥锁
    • P = wait; V = signal
    • 因为计数器的存在,不会发生signal“丢失”(先signal再wait)
    • 例子: 游泳池和手环
  • 实际上,信号量可以理解成一个锁(手环)的“池子”,计数器就是锁的数量。然后把P/V操作直观地理解:

    • P操作就是从池子里取走一把锁。如果取成功,线程继续执行,如果取失败,就必须等待。
    • V操作就是把一把锁放进池子里。这时候如果有线程在等锁,那个线程可以取走这把锁执行。
  • 如果count的初始值为1,我们可以直接把P/V当作互斥锁来使用——池子里只有一把锁,进入临界区必须取得锁,临界区结束后归还。

  • 一个简单的二元信号量锁的实现方式如下:

    1
    2
    3
    4
    5
    6
    sem_t m;
    sem_init(&m, 0, 1); // 初始化信号量为1(书本原代码此处为问题)

    sem_wait(&m);
    // critical section here
    sem_post(&m);
  • 信号量可用于等待其他进程

  • 信号量解生产者消费者问题:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    sem_t empty = SEM_INIT(n);
    sem_t fill = SEM_INIT(0);
    void producer() {
    while (1) {
    P(&empty);
    mutex_lock(&mutex);
    printf("(");
    mutex_unlock(&mutex);
    V(&fill);
    }
    }

    void consumer() {
    while (1) {
    P(&fill);
    mutex_lock(&mutex);
    printf(")");
    mutex_unlock(&mutex);
    V(&empty);
    }
    }
    • 注意生产者消费者问题中的线程安全性. (printf是安全的)
    • 解决方案:先等信号,再上锁。
  • 注意: P和V都不能被锁!!! 否则会死锁!!!

  • 读写锁: 当一个线程想要获得第一把读锁时,它同时会获取写锁,直到它结束后才会释放写锁。这样,如果有线程想要获得写锁,就必须等待所有的读线程退出临界区。(性能垃圾)

哲学家吃饭问题

  • 问题所在: 如何防止死锁
  • 解决方案:
    1. 增加一个管理者, 管理所有叉子
    2. 所有哲学家都先拿编号大的叉子(破坏环路)
    3. 上锁, 每次只有一个哲学家wait
    4. 只有拿起两支筷子的哲学家才可以进餐,否则,一支筷子也不拿。

并发bugs

死锁 (ABBA)

  • 出现线程“互相等待”的情况

  • 可能: spinlock不小心打开中断

  • 死锁最基本的形式 (ABBA)

    1
    2
    3
    4
    5
    6
    7
    thread1      tread2
    lock(A);
    lock(B);

    lock(B); // 阻塞
    lock(A); // 阻塞
    // 任何线程都不能进入临界区
    • 在一个线程连续的两次lock之间发生另一个lock事件,死锁才会能触发。

    • 例子:

      1
      2
      3
      4
      5
      6
      7
      8
      void move_obj(int i, int j) {
      spin_lock(&lock[i]);
      spin_lock(&lock[j]);
      arr[i] = NULL;
      arr[j] = arr[i];
      spin_unlock(&lock[j]);
      spin_unlock(&lock[i]);
      }

      同时move_obj(1, 2);move_obj(2, 1).

  • 死锁产生的四个条件 :

    • 互斥:一个资源每次只能被一个进程使用
    • 请求与保持:一个进程请求资源阻塞时,不释放已获得的资源
    • 不剥夺:进程已获得的资源不能强行剥夺
    • 循环等待:若干进程之间形成头尾相接的循环等待资源关系
  • AA型死锁非常容易检测

  • 为了避免ABBA型的死锁

    • 任意时刻系统中的锁都是有限的
    • 严格按照固定的顺序获得所有锁
    • 给上锁过程上个大锁

原子性违反 (ABA)

  • 互斥锁(lock/unlock) - 原子性
  • 忘记上锁——原子性违反 (Atomicity Violation, AV)

顺序违反 (BA)

  • 条件变量(wait/signal) - 同步
  • 忘记同步——顺序违反 (Order Violation, OV)
  • 常见的情况:use after free

数据竞争

  • 数据竞争是两个共享内存访问,它们满足以下条件:
    • 访问同一个共享内存变量
    • 发生在不同的线程
    • 至少有一个是写