1.生产者-消费者模型

生产者-消费者模型是一个常见的多线程编程模型,如图所示。

一个内存队列,多个生产者线程往内存队列中放数据;多个消费者线程从内存中取数据。要实现一个这样的编程模型,需要做以下几件事。

Java并发原理-线程的生产者-消费者模型

1.内存队列本身要加锁,才能实现线程安全。

2.阻塞。当内存队列满了,生产者放不进去时,会被阻塞;当内存队列是空的时候,消费者无事可做,会被阻塞。

3.双向通知。消费者被阻塞之后,生产者放入新数据,要notify()消费者;反之,生产者被阻塞之后,消费者消费了数据,要notity()生产者。

第1件事必须做,第2和第3件事不一定要做。eg,可以采取一个简单的办法,生产者放不进去时,睡眠几百毫秒再重试,消费者取不到数据时,睡眠几百毫秒再重试。但这个办法效率低下,也不实时。所以,我们只讨论如何阻塞、如何通知的问题。

1-1.如何阻塞?

方法1:线程自己阻塞自己,也就是生产者、消费者各自调用wait()和notify()。

方法2:用一个阻塞队列,当取不到或者放不进去数据的时候,入队/出队函数本身就是阻塞的。这也就是BlockingQueue的实现,后面详细讲述。

1-2.如何双向通知?

方法1:wait()与notify()机制。

方法2:Condition机制。

此处先讲wait()与notify()机制,后面会专门讲Condition机制与BlockingQueue机制。

2.为什么必须和synchronized一起使用?

在Java里面,wait()和notify()是Object的成员函数,是基础中的基础。为什么Java要把wait()和notify()放在如此基础的类里面呢?而不是作为向Thread一类的成员函数,或者其他的成员函数呢?

回答这个问题之前,先回答为什么wait()和notify()必须和synchronized一起使用?先看以下代码:

class A{
  
  private Object obj1 = new Object();
  
  public void f1(){
    synchronized(obj1){
      ...
      obj1.wait();
      ...
    }
  }
  
  public void f2(){
    synchronized(obj1){
      ...
      obj1.notify();
      ...
    }
  }
  
}

或者下面的代码:

class A{
  public void synchronized f1(){
    ...
    this.wait();
    ...
  }
  
  public void synchronized f2(){
    ...
    this.notify();
    ...
  }
}

然后,开两个线程。线程A调用f1(),线程B调用f2()。答案已经很明显:两个线程之间要通讯,对于同一个对象来说,一个线程调用该对象的wait(),另一个线程调用该对象的notify(),该对象本身就需要同步!所以,在调用wait()、notify()之前,要先通过synchronized关键字同步给对象,也就是给该对象加锁。

前面讲了,synchronized关键字可以加在任何对象的成员函数上面,任何对象都可以成为锁。那么,wait()和notify()要同样如此普及,就放在Object里面了。

3.为什么wait()的时候必须释放锁?

当线程A进入synchronized(obj1)中之后,也就是对obj1上了锁。此时,调用wait()进入阻塞状态,一直不能退出synchronized代码块;那么,线程B永远无法进入synchronized(obj1)同步块里,永远没有机会调用notify(),岂不是死锁了?

这就涉及一个关键问题:在wait()的内部,会先释放obj1,然后进入阻塞状态,之后,它被另外一个线程用notify()唤醒,去重新拿锁!其次,wait()调用完成后,执行后面的业务逻辑代码,然后退出synchornized同步块,再次释放锁。
wait()内部的伪代码如下:

wait(){
  // 释放锁
  // 阻塞,等待其他线程被notify
  // 重新拿锁
}

只有如此才能避免上面所说的死锁问题。后面将Condition实现的时候,会再详细讨论这个问题。

4.wait()与notify()问题

以上述的生产者-消费者模型来看,其伪代码大致如下:

public void enqueue(){
  synchronized(queue){
    while(queue.full()){
      queue.wait();
      // 入队列
      ...
      // 通知消费者,队列中有数据了
      queue.notify(); 
    }
  }
}


public void dequeue(){
  synchronized(queue){
    while(queue,empty()){
      queue.wait();
      // 出队列
      ...
      // 通知生产者,队列中有空位了,可以继续放数据
      queue.notify();
    }
  }
}

生产者本来只想通知消费者,但它把其他的生产者也通知了;消费者本来只想通知生产者,但它被其他的消费者通知了。原因就是wait()和notify()所作用的对象和synchronized所作用的对象是同一个,只能有一个对象,无法区分队列空和队列满两个条件。这正是Condition要解决的问题。