什么是线程通信
线程通信指的是多个线程之间,某个线程修改了一个对象的值时,其他的线程也能够感知到该值的变化并进行相关的操作,使线程与线程之间间接的实现通信。实现线程通信的方法可以通过以下几种方式:
- 基于volatile修饰的共享对象。
- 通过wait/notify机制实现。
- 线程的join()方法。
- 使用synchronized同步关键字。
- 另外也可借助中间件或者数据库做数据共享实现通信等等。
本文主要就通过wait/notify机制进一步分析讲解。
wait/notify
相信各位对于wait/notify应该不会太陌生,他们的主要作用就是用于控制线程之间的等待与唤醒,具体方法作用如下:
- wait():使当前线程进入阻塞状态,并且释放当前线程持有的锁。与sleep()不同的是sleep()不会释放持有的锁。
- notify():唤醒处于阻塞状态下的一个线程。
- notifyAll():唤醒处于阻塞状态下的所有线程。
需要唤醒一个被Object.wait()方法阻塞的线程有两种方法:
- 其他线程调用了同一个对象的notify()/notifyAll()方法。
- 调用了被阻塞线程的interrupt()方法,被阻塞的线程会被唤醒并且抛出InterruptException异常。
wait/notify方法如何使用
wait()/notify()方法实际上是通过同一个共享对象的竞争来实现数据变更通知,可以简单的理解为某个共享对象的数据变化达到某个条件而触发阻塞或唤醒动作,从而实现线程之间的通信。下面我们就使用wait()/notify()来实现一个简易的消息通知场景:
- 首先定义一个消息生产者:
private int maxSize; // 支持最大消息数
private Queue<String> messageQueue; // 存放消息的队列
MessageProducer(Queue<String> messageQueue, int maxSize) {
this.messageQueue = messageQueue;
this.maxSize = maxSize;
}
@Override
public void run() {
int i = 0;
while(true) {
i++;
synchronized (messageQueue) {
if (messageQueue.size() == maxSize) {
System.out.println("消息已占满");
try {
// 消息队列满了,进入阻塞等待消费者消费部分消息后继续生产
messageQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发送消息:" + i);
messageQueue.add("message-" + i);
messageQueue.notify();
}
}
}
}
- 消息生产者通过一个while循环对共享消息队列messageQueue使用synchronized进行加锁操作。
- 消息达到最大数量messageQueue.size() == maxSize时调用wait()方法让当前线程进入等待。
- 每间隔1秒生产一个消息,打印日志并调用notify()通知唤醒阻塞的消息消费者线程。
- 定义一个消息消费者:
private int maxSize; // 支持最大消息数
private Queue<String> messageQueue; // 存放消息的队列
MessageConsumer(Queue<String> messageQueue, int maxSize) {
this.messageQueue = messageQueue;
this.maxSize = maxSize;
}
@Override
public void run() {
while(true) {
synchronized (messageQueue) {
if (messageQueue.isEmpty()) {
System.out.println("消息全部已读");
try {
// 消息全部已读,需等待消息发送者继续发送方可继续读取消息
messageQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("读取消息:" + messageQueue.remove());
messageQueue.notify();
}
}
}
}
- 消息消费者通过while循环首先对共享队列messageQuery进行加锁操作,在这里消息生产者与消费者的messageQuery必须是同一个共享对象。
- 根据messageQueue.isEmpty()判断消息队列是否为空来确认是否有未读的消息,如果为空则表示全部已读,则当前线程进入等待状态,待消息生产者继续生产消息后可再次被唤醒。
- 每次循环使用messageQueue.remove()读取并消费消息操作,并调用messageQueue.notify()方法唤醒阻塞状态的消息生产者继续生产消息。
- 定义main方法运行消息的生产与消费线程:
// 消息生产者与消费者共用一个共享对象
Queue<String> messageQueue = new LinkedBlockingQueue();
// 支持最大消息数
int maxSize = 2;
Thread messageProducer = new MessageProducer(messageQueue, maxSize);
Thread messageConsumer = new MessageConsumer(messageQueue, maxSize);
// 运行两个线程
messageProducer.start();
messageConsumer.start();
}
部分运行结果如下:
从上面的运行结果可以看出消息生产者与消息消费者之间通过共享对象进行wait()/notify()方法操作实现了通信功能,使两个线程在运行过程中不断的进入阻塞与唤醒从而达到消息不断的发送与读取。
wait/notify的为什么要加同步锁
上面的例子我们也看到了wait()/notify()方法的使用都是在同步锁synchronized当中使用的,之所以这么做是因为不在synchronized中就会抛出IllegalMonitorStateException异常,原因主要有两点:
- wait()/notify()方法都是基于一个共享对象来实现线程通信的,这就意味着存在多个线程对同一个共享对象竞争的可能,为了保证共享对象的原子性就需要加锁保护。
- wait()/notify()方法主要实现的是线程的阻塞与唤醒,而在调用notify()的时候要唤醒哪个线程并不确定,这时候通过synchronized实现了同步机制,正好为wait()/notify()提供很好的协同机制。
去掉synchronized同步锁后运行出现的异常如下: [图片上传失败...(image-eb73fb-1665193432014)]
总结
多数的生产者消费者场景通信都是通过wait()/notify()的方式去实现的,在多线程交互的环境当中合理的设计运用好wait()/notify()方法就能够良好的实现线程之间的通信功能,但以此同时也会带来更多的并发以及对象的共享问题,现实开发过程当中还需多多考虑更细节的东西,多进行一些测试以防止意外的出造成不必要的损失。