信号量与管程

信号量

为什么需要信号量

生产者-消费者问题

  • 一个或多个生产者再生成数据后放进了一个缓冲区里
  • 单个消费者从缓冲区取出数据处理
  • 任何时刻只能有一个生产者或消费者可访问缓冲区

锁方案

  • 临界区:读写缓冲区
    • 保证始终只有一个线程访问缓冲区
  • 问题:生产者线程释放锁后,可能仍然是生产者线程获得锁
    • 锁能够保护共享资源互斥访问
    • 但锁不能够提供线程条件同步

所需的机制特征

  • 表示资源状态:缓冲区空 VS 缓冲区满
  • 条件同步:使得多进程/线程根据资源的状态执行

信号量

信号量是操作系统提供的一种协调共享资源访问的方法

信号量的组成

  • 一个整形变量:用于表示系统资源的数量
  • 两个原子操作(P操作和V操作)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Class Semaphore{
int sem;
WaitQueue q;
}
Semaphore::P(){
sem--; //请求资源
if (sem<0){ //当资源不足的时候将线程阻塞
Add this thread t to q;
block(t);
}
}
Semaphore::V(){
sem++; //释放资源
if (sem<=0){ //若仍存在阻塞的线程,则释放一个线程
Remove a thread t from q;
waitupt(t);
}
}
  • 信号量的使用
    • 互斥访问:保护临界区互斥访问
    • 条件同步:多线程之间同步

用信号量实现生产者-消费者问题

  • 有界缓冲区的生产者-消费者问题
    • 一个或多个生产者再生成数据后放进了一个缓冲区里
    • 单个消费者从缓冲区取出数据处理
    • 任何时刻只能有一个生产者或消费者可访问缓冲区
  • 要求
    • 任何时刻都只能有一个线程操作(互斥访问)
    • 缓冲区空时:消费者必需等待生产者(条件同步)
    • 缓冲区满时:生产者必需等待消费者(条件同步)

设计

  • 缓冲区空:信号量emptyBuffers
  • 缓冲区满:信号量fullBuffers
  • 互斥访问:互斥锁mutex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Class BoundedBuffer{
mutex = new Semaphore(1);
fullBuffers = new Semaphore(0);
emptyBuffers = new Semaphore(n);
}
BoundedBuffer::Deposit(c){
emptyBuffers->P();
mutex->P();
Add c to the buffer;
mutex->V();
fullBuffers->V();
}
BoundedBuffer::Remove(c){
fullBuffers->P();
mutex->P();
Remove c to the buffer;
mutex->V();
emptyBuffers->V();
}

哲学家就餐问题

  • 5个哲学家围绕一张圆桌而坐
    • 桌子上放着5个叉子
    • 每两个哲学家之间放一个叉子
  • 哲学家的动作包括思考和进餐
    • 进餐时需同时拿到左右两边的叉子
    • 思考时将两支叉子放回原处
  • 问题:如何保证哲学家们的动作有序进行
    • 即保证不存在有人永远拿不到叉子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#define N 5
semaphore fork[5];
void philosopher(int i)
{
while (1)
{
think();
if (i%2==0){
P(fork[i]);
P(fork[(i+1)%N]);
}
else{
P(fork[(i+1)%N]);
P(fork[i]);
} // 使用不同的顺序拿叉子以防止死锁
eat();
V(fork[i]);
V(fork[(i+1)%N]);
}
}

管程

管程是一种用于多线程互斥访问共享资源的程序结构

  • 采用面向对象方法,简化了线程间的同步控制
  • 任意时刻最多只能有一个线程执行管程代码
  • 正在管程中的线程可临时放弃管程的互斥访问,等待事件出现时恢复

组成

  • 一个锁
    • 用于控制管程代码的互斥访问
  • 条件变量
    • 用于管理共享数据的条件同步
  • 等待原因
    • 用于判断是否需要等待的条件
1
2
3
4
5
6
7
8
9
10
11
12
// 可能需要等待的线程
Acquire(mutex);
while (等待条件满足) //当前没有资源可供使用
cond.wait(mutex);
使用资源
Release(mutex);

// 改变等待条件的线程
Acquire(mutex);
使资源可用且使等待条件不满足
Signal(cond); // 唤醒一个在等待的线程
Release(mutex);

面向对象封装

  • 成员变量
    • 等待原因(比如某个变量值)
    • 条件变量
      • 提供原语操作(实现等待和通知等)
      • 包含等待队列
  • 成员函数
    • 资源操作代码

条件变量原语操作

  • 条件变量是管程内的等待机制
    • 每个条件变量表示一种等待的原因,对应一个等待队列
  • Wait原语操作
    • 将自己阻塞在等待队列中
    • 等待被唤醒或执行线程释放管程的互斥访问
  • Signal原语操作
    • 将等待队列中的一个线程唤醒
  • Broadcast原语操作
    • 唤醒等待队列中所有的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Class Condition{
int numWaiting = 0;
WaitQueue q;
}
Condition::Wait(lock){
numWaiting++;
Add this thread t to q;
release(lock); // 解锁后再调用scheduler防止死锁
do_scheduler();
acquire(lock); // 重新申请锁,恢复原状
}
Condition::Signal(){
if (numWaiting > 0){
Remove a thread t from q;
wakeup(t);
numWaiting--;
}
}

用管程来实现生产者-消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Class BoundedBuffer{
Lock lock;
int count=0;
Condition full,empty;
}
BoundedBuffer::Deposit(c){
lock->Acquire();
while (count==n)
full.Wait(&lock); // 如果满了就等待消费者消费
Add c to the buffer;
count++;
empty.Signal();
lock->Release(); // 新的生产者信号,通知消费者可以消费
}
BoundedBuffer::Remove(c){
lock->Acquire();
while (count==0)
empty.wait(&lock); // 如果空了就等待生产者生产
Remove c from buffer;
count--;
full.Signal();
lock->Release(); // 新的消费者信号,通知生产者可以生产
}

Signal后的三种选择

Hansen管程

发送方退出管程

  • 规定Signal必须是管程中的过程的最后一个语句

Hoare管程

让被唤醒的线程立即执行,发送方进入Signal队列

  • 如果发送方有其他工作要做,会很麻烦
    • 很难确定没有其他工作要做,因为Signal的实现并不知道其是如何被使用的

Mesa管程

发送方继续执行

  • 易于实现
  • 被唤醒的进程实际执行的时候,条件不一定为真
    • 需要让被唤醒的进程重新回到wait()执行,需要重新判断等待原因条件

信号量与管程对比

  • 信号量
    • 控制对多个共享资源的访问,用于进程/线程间的条件同步
    • 可以并发,取决于sem的初始值
      • sem表示资源的数量
    • P操作可能导致阻塞,也可能不阻塞
    • V操作唤醒其他进程/线程后,当前进程/线程与被唤醒者可以并发执行
  • 管程
    • 一种程序结构,限制同一时刻只有一个线程可以访问临界区
    • 管程内部同一时刻只有一个线程在执行
    • 需要自行判断资源是否可用(等待原因判断)
    • wait操作一定会造成阻塞
    • signal操作后,被唤醒的线程是否执行取决于管程的风格

屏障

线程A和线程B希望在某一个特定的点交会并继续执行

屏障原语(Barrier)

功能

  • 协调多个进程并行共同完成某项任务
  • 设定一个屏障变量b以及初始值n
    • 如果屏障变量值小于n,则线程等待
    • 如果屏障变量的值达到n,则唤醒所有线程,所有的线程继续工作

屏障实现

  • 使用两个信号量(两个线程时)
    • 初始化两个信号量均为0

image-20221020103351441

  • 使用管程实现(超过两个线程时)
    • 等待原因:抵达屏障的线程数量没有达到n

同步机制

操作系统提供的同步机制

image-20221020103532408

读者-写者问题

  • 共享数据的两类使用者
    • 读者:只读取数据,不进行修改
    • 写者:读取并修改数据
  • 对共享数据的读写
    • “读者-读者”允许
      • 同一时刻,允许有多个读者同时读
    • “读者-写者”互斥
      • 没有写者的时候读者才可以读
      • 没有读者的时候写者才可以写
    • “写者-写者”互斥
      • 没有其他写者的时候写者才可以写

用信号量解决读者-写者问题

用信号量描述每一个约束

  • 信号量WriteMutex
    • 控制对于读写操作的互斥
    • 初始化为1
  • 读者数量Rcount
    • 描述正在进行读操作的读者数目
    • 初始化为0
  • 信号量CountMutex
    • 控制对读者计数的互斥修改
    • 初始化为1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Writer Process
P(WriteMutex); //写写互斥
Write;
V(WriteMutex);

// Reader Process
P(CountMutex);
if (Rcount==0) P(WriteMutex); //读写互斥
++Rcount;
V(CountMutex);

read;

P(CountMutex);
--Rcount;
if (Rcount==0) V(WriteMutex); //读写互斥
V(CountMutex);

用管程解决读者-写者问题

管程封装

  • 两个基本方法
1
2
3
4
5
6
7
8
9
10
Database::Read() {
Wait until no writers;
read database;
check out – wake up waiting writers;
}
Database::Write() {
Wait until no readers/writers;
write database;
check out – wake up waiting readers/writers;
}
  • 管程的状态变量
1
2
3
4
5
6
7
AR = 0; // of active readers
AW = 0; // of active writers
WR = 0; // of waiting readers
WW = 0; // of waiting writers
Lock lock;
Condition okToRead;
Condition okToWrite;

读者实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Private Database::StartRead() 
{
lock.Acquire();
while ((AW+WW) > 0) //读写互斥
{
WR++;
okToRead.wait(&lock);
WR--;
}
AR++;
lock.Release();
}

Private Database::DoneRead()
{
lock.Acquire();
AR--;
if (AR ==0 && WW > 0) //唤醒写者
{
okToWrite.signal();
}
lock.Release();
}

写者实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Private Database::StartWrite() 
{
lock.Acquire();
while ((AW+AR) > 0) //读写/写写互斥
{
WW++;
okToWrite.wait(&lock);
WW--;
}
AW++;
lock.Release();
}

Private Database::DoneWrite()
{
lock.Acquire();
AW--;
if (WW > 0) //唤醒写者
{
okToWrite.signal();
}
else if (WR > 0) //唤醒读者
{
okToRead.broadcast();
}
lock.Release();
}