当前位置 : 主页 > 编程语言 > java >

Java多线程 Producer and Consumer设计模式

来源:互联网 收集:自由互联 发布时间:2021-11-19
目录 producer 是生产者的意思: 指生产数据的线程, consumer 是消费者的意思: 指的是使用数据的线程 public class ProducerThread extends Thread { private final static Random random = new Random(System.curren
目录
    • producer 是生产者的意思:指生产数据的线程;
    • consumer 是消费者的意思:指使用数据的线程。
    /**
     * Thread that keeps creating {@code Message} objects and putting them on a
     * shared {@code MessageQueue} until it is interrupted.
     *
     * <p>Every message is labelled {@code "Message-<n>"}, where {@code n} comes
     * from a counter shared by all producer threads.
     */
    public class ProducerThread extends Thread {

        private final static Random random = new Random(System.currentTimeMillis());
        // Shared by every producer so that message numbers are not repeated.
        private final static AtomicInteger counter = new AtomicInteger(0);
        private final MessageQueue messageQueue;

        /**
         * Creates a producer named {@code "Producer-<seq>"}.
         *
         * @param messageQueue queue that receives the produced messages
         * @param seq          sequence number used in the thread name
         */
        public ProducerThread(MessageQueue messageQueue, int seq) {
            super("Producer-" + seq);
            this.messageQueue = messageQueue;
        }

        /**
         * Produces messages in an endless loop, pausing for a random time of
         * less than one second after each one. The loop ends when the thread is
         * interrupted while blocked in {@code put} or {@code sleep}.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    Message message = new Message("Message-" + counter.getAndIncrement());
                    messageQueue.put(message);
                    System.out.println(Thread.currentThread().getName() + " put message " + message.getData());
                    Thread.sleep(random.nextInt(1000));
                }
            } catch (InterruptedException e) {
                // Catching the exception clears the interrupt flag; set it again
                // so code that invoked run() can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
    
    
    /**
     * Thread that keeps taking {@code Message} objects from a shared
     * {@code MessageQueue} and printing them until it is interrupted.
     */
    public class ConsumerThread extends Thread {

        private final static Random random = new Random(System.currentTimeMillis());
        private final MessageQueue messageQueue;

        /**
         * Creates a consumer named {@code "Consumer-<seq>"}.
         *
         * @param messageQueue queue the messages are taken from
         * @param seq          sequence number used in the thread name
         */
        public ConsumerThread(MessageQueue messageQueue, int seq) {
            super("Consumer-" + seq);
            this.messageQueue = messageQueue;
        }

        /**
         * Consumes messages in an endless loop, pausing for a random time of
         * less than one second after each one. The loop ends when the thread is
         * interrupted while blocked in {@code take} or {@code sleep}.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    Message message = messageQueue.take();
                    System.out.println(Thread.currentThread().getName() + " take a message " + message.getData());
                    Thread.sleep(random.nextInt(1000));
                }
            } catch (InterruptedException e) {
                // Catching the exception clears the interrupt flag; set it again
                // so code that invoked run() can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }

    }
    
    
    /**
     * Simple mutable holder for the text payload passed between threads.
     */
    public class Message {

        /** Payload text; may be replaced through {@link #setData(String)}. */
        private String data;

        /**
         * Creates a message carrying the given payload.
         *
         * @param data payload text
         */
        public Message(String data) {
            this.data = data;
        }

        /**
         * Replaces the payload.
         *
         * @param data new payload text
         */
        public void setData(String data) {
            this.data = data;
        }

        /**
         * Returns the current payload.
         *
         * @return payload text
         */
        public String getData() {
            return this.data;
        }
    }
    
    
    /**
     * Bounded FIFO queue of {@code Message} objects shared between producer and
     * consumer threads.
     *
     * <p>All access to the backing list is guarded by its own monitor. Producers
     * wait while the queue is full and consumers wait while it is empty; each
     * successful {@code put} or {@code take} wakes every waiting thread.
     */
    public class MessageQueue {
        private final static int DEFAULT_MAX_LIMIT = 100;
        private final LinkedList<Message> queue;
        private final int limit;


        /**
         * Creates a queue that holds at most {@code DEFAULT_MAX_LIMIT} messages.
         */
        public MessageQueue() {
            this(DEFAULT_MAX_LIMIT);
        }

        /**
         * Creates a queue that holds at most {@code limit} messages.
         *
         * @param limit maximum number of queued messages; must be at least 1
         * @throws IllegalArgumentException if {@code limit} is less than 1
         */
        public MessageQueue(final int limit) {
            if (limit < 1) {
                // A non-positive bound would make every put() wait forever.
                throw new IllegalArgumentException("limit must be at least 1: " + limit);
            }
            this.limit = limit;
            this.queue = new LinkedList<>();
        }

        /**
         * Appends a message to the tail, waiting while the queue is full.
         *
         * @param message message to enqueue
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public void put(final Message message) throws InterruptedException {
            synchronized (queue) {
                // ">=" rather than ">": the queue must never exceed limit entries.
                // Re-checked in a loop because notifyAll() also wakes other producers.
                while (queue.size() >= limit) {
                    queue.wait();
                }
                queue.addLast(message);
                queue.notifyAll();
            }
        }

        /**
         * Removes and returns the head message, waiting while the queue is empty.
         *
         * @return the oldest queued message
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public Message take() throws InterruptedException {
            synchronized (queue) {
                // Re-checked in a loop because notifyAll() also wakes other consumers.
                while (queue.isEmpty()) {
                    queue.wait();
                }
                Message message = queue.removeFirst();
                queue.notifyAll();
                return message;
            }
        }

        /**
         * Returns the maximum number of messages the queue may hold.
         *
         * @return the configured limit
         */
        public int getMaxLimit() {
            return this.limit;
        }

        /**
         * Returns the number of messages currently queued.
         *
         * @return current queue size
         */
        public int getMessageSize() {
            synchronized (queue) {
                return queue.size();
            }
        }
    }
    
    
    /**
     * Demo entry point: starts three producers and two consumers that share a
     * single {@code MessageQueue}.
     */
    public class ProducerAndConsumerClient {

        /**
         * Creates the shared queue and starts the worker threads.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            final MessageQueue sharedQueue = new MessageQueue();

            // Producers 1..3 are started first, in order.
            for (int seq = 1; seq <= 3; seq++) {
                new ProducerThread(sharedQueue, seq).start();
            }
            // Then consumers 1..2.
            for (int seq = 1; seq <= 2; seq++) {
                new ConsumerThread(sharedQueue, seq).start();
            }
        }
    }
    
    

    Producer-1 put message Message-0
    Producer-3 put message Message-2
    Producer-2 put message Message-1
    Consumer-1 take a message Message-0
    Consumer-2 take a message Message-1
    Producer-2 put message Message-3
    Consumer-1 take a message Message-2
    Producer-2 put message Message-4
    Consumer-2 take a message Message-3
    Producer-3 put message Message-5
    Producer-3 put message Message-6
    Producer-3 put message Message-7
    Consumer-1 take a message Message-4
    Producer-2 put message Message-8
    Consumer-2 take a message Message-5
    Producer-3 put message Message-9
    Producer-1 put message Message-10
    Producer-1 put message Message-11
    Producer-2 put message Message-12
    省略...

    到此这篇关于 Java 多线程 Producer and Consumer 设计模式的文章就介绍到这了。更多相关 Java 多线程 Producer、Consumer 的内容,请搜索自由互联以前的文章或继续浏览下面的相关文章,希望大家以后多多支持自由互联!

    网友评论