生产者-消费者算是并发编程中常见的问题。依靠缓冲区我们可以实现生产者与消费者之间的解耦。生产者只管往缓冲区里面放东西,消费者只管往缓冲区里面拿东西。这样我们避免生产者想要交付数据给消费者,但消费者此时还无法接受数据这样的情况发生。
wait notify
这个问题其实就是线程间的通讯,所以要注意的是不能同时读写。生产者在缓冲区满的时候不生产,等待;消费者在缓冲区为空的时候不消费,等待。比较经典的做法是wait和notify。
生产者线程执行15次set操作
public class Producer implements Runnable{ private Channel channel; public Producer(Channel channel) { this.channel = channel; } @Override public void run() { for(int i=0;i<15;i++){ channel.set(Thread.currentThread().getName()+" "+i); } } }
消费者线程执行10次get操作
public class Consumer implements Runnable { private Channel channel; public Consumer(Channel channel) { this.channel = channel; } @Override public void run() { for(int i=0;i<10;i++){ System.out.println("Consumer "+Thread.currentThread().getName()+" get "+channel.get()); } } }
现在定义Channel类,并创建两个生产者线程和三个消费者线程
public class Channel { private List<String> buffer=new ArrayList<>(); private final int MAX_SIZE=10; public synchronized String get(){ while (buffer.size()==0){//不要用if,醒来了也要再次判断 try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } String str=buffer.remove(0); notifyAll(); return str; } public synchronized void set(String str){ while (buffer.size()==MAX_SIZE){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } buffer.add(str); notifyAll(); } public static void main(String[] args) { Channel channel=new Channel(); Producer producer=new Producer(channel); Consumer consumer=new Consumer(channel); for(int i=0;i<2;i++){ new Thread(producer).start(); } for (int i=0;i<3;i++){ new Thread(consumer).start(); } } }
使用notifyAll而不是notify的原因是,notify有可能出现多次唤醒同类的情况,造成“假死”。我们可以使用Condition来实现更精确的唤醒。
Condition
将上面代码中的Channel类修改一下即可
public class Channel { private List<String> buffer=new ArrayList<>(); private final int MAX_SIZE=10; private Lock lock=new ReentrantLock(); private Condition producer=lock.newCondition(); private Condition consumer=lock.newCondition(); public String get(){ String str=null; try { lock.lock(); while (buffer.size()==0){ consumer.await(); } str=buffer.remove(0); producer.signalAll(); }catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } return str; } public void set(String str){ try { lock.lock(); while (buffer.size()==MAX_SIZE){ producer.await(); } buffer.add(str); consumer.signalAll(); }catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } } }
双缓冲与Exchanger
当同步的花销非常大时,我们可以采用双缓冲区的办法。双缓冲的一个好处就在于:因为生产者和消费者各自拥有一个缓冲区,所以他们不会同时对同一个缓冲区进行操作,那么我们就不需要为读写操作加锁,用空间换了时间。在Java中可以通过Exchanger来交换两个线程之间的数据结构。
public class Producer implements Runnable{ private List<String> buffer; private Exchanger<List<String>> exchanger; public Producer(List<String> buffer, Exchanger<List<String>> exchanger){ this.buffer=buffer; this.exchanger=exchanger; } @Override public void run() { for(int i=0;i<10;i++){ for (int j=0;j<10;j++) buffer.add("Thrad "+Thread.currentThread().getName()+" : "+i+" "+j); try { buffer=exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class Consumer implements Runnable { private Exchanger<List<String>> exchanger; private List<String> buffer; public Consumer(List<String> buffer,Exchanger<List<String>> exchanger) { this.exchanger = exchanger; this.buffer = buffer; } @Override public void run() { for(int i=0;i<10;i++){ try { buffer=exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } for(int j=0;j<10;j++){ String message=buffer.get(0); System.out.println(message); buffer.remove(0); } } } } public class Main { public static void main(String[] args) { List<String> buffer1=new ArrayList<>(); List<String> buffer2=new ArrayList<>(); Exchanger<List<String>> exchanger=new Exchanger<>(); Producer producer=new Producer(buffer1,exchanger); Consumer consumer=new Consumer(buffer2,exchanger); Thread t1=new Thread(producer); Thread t2=new Thread(consumer); t1.start(); t2.start(); } }
BlockingQueue
我们可以使用更为方便安全的阻塞式集合来实现生产消费者模型。
这类集合具有的特点是:当集合已满或者是为空的时候,被调用的方法不会立即执行,该方法将被阻塞,直到可以成功执行为止。
public class Channel { private BlockingQueue<String> blockingQueue=new ArrayBlockingQueue<>(10); public String get(){ String str=null; try { str=blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return str; } public void set(String str){ try { blockingQueue.put(str); } catch (InterruptedException e) { e.printStackTrace(); } } }
这次的Channel类是不是比之前的简洁了许多,有了BlockingQueue我们就不用再去写wait和notify了。
到此这篇关于Java中生产者消费者问题总结的文章就介绍到这了,更多相关Java生产者消费者内容请搜索易盾网络以前的文章或继续浏览下面的相关文章希望大家以后多多支持易盾网络!