如果把生产消费想像成自动流水生产线的话,生产就是流水线的物料,消费就是某种设备对物料进行加工的行为,流水线就是队列。 现在,要写一个体现生产消费模式的泛型帮助类,比
如果把生产消费想像成自动流水生产线的话,生产就是流水线的物料,消费就是某种设备对物料进行加工的行为,流水线就是队列。
现在,要写一个体现生产消费模式的泛型帮助类,比如叫ProducerConsumer<T>。
该类肯定会维护一个有关生产、物料的Queue<T>类型的字段,还存在一个有关消费、Action<T>类型的字段。
在ProducerConsumer类的构造函数中,为Action<T>类型的字段赋值,并开启后台有关消费的线程。
ProducerConsumer类肯定存在一个进队列的方法,并且要保证在多线程情况下,同一时间只有一个生产或物料进入队列。
ProducerConsumer类还存在一个有关消费的方法,并且保证在多线程情况下,同一时间只有一个生产或物料出列,并消费它。
另外,在生产或物料在出队列的时候,可能会出现队列中暂时没有生产或物料的情况,这时候我们希望线程阻塞一下,这需要通过AutoResetEvent实现。AutoResetEvent的大致原理是:当生产或物料进入队列的时候需要告诉AutoResetEvent一下,当队列中暂时没有生产或物料的时候,也需要告诉AutoResetEvent,让它来阻塞线程。
//有关生产消费的泛型类 public class ProducerConsumer<T> { //用来存储生产者的队列 private readonly Queue<T> queue = new Queue<T>(); //锁 private readonly object queueLocker = new object(); //消费行为 private readonly Action<T> consumerAction; //出列的时候需要检查队列中是否有元素,如果没有,需要阻塞 private readonly AutoResetEvent queueWaitHandle = new AutoResetEvent(false); public ProducerConsumer(Action<T> consumerAction) { if (consumerAction == null) { throw new ArgumentNullException("consumerAction"); } this.consumerAction = consumerAction; //后台开启一个线程开始消费生产者 new Thread(this.ConsumeItems){IsBackground = true}.Start(); } //进列 public void Enqueue(T item) { //确保同一时间只有一个生产者进列 lock (queueLocker) { queue.Enqueue(item); //每次进列都要设置AutoResetEvent事件 this.queueWaitHandle.Set(); } } //消费动作 private void ConsumeItems() { while (true) { T nextItem = default(T); //标志,确认队列中的生产者是否存在 bool doesItemExist; //确保同一时间只有一个生产者出列 lock (this.queueLocker) { //先确认队列中的生产者是否存在 doesItemExist = this.queue.Count > 0; if (doesItemExist) { nextItem = this.queue.Dequeue(); } } //如果生产者存在,才消费生产者 if (doesItemExist) { this.consumerAction(nextItem); } else//否则的话,再等等下一个队列中的生产者 { this.queueWaitHandle.WaitOne(); } } } }
客户端,针对多线程情形。
class Program { static void Main(string[] args) { //实例化一个int类型的生产消费实例 var producerConsumer = new ProducerConsumer<int>(i => Console.WriteLine("正在消费" + i)); Random random = new Random(); //开启进队列线程 var t1 = new Thread(() => { for (int i = 0; i < 100; i++) { producerConsumer.Enqueue(i); Thread.Sleep(random.Next(0,5)); } }); var t2 = new Thread(() => { for (int i = 0; i > -100; i--) { producerConsumer.Enqueue(i); Thread.Sleep(random.Next(0, 5)); } }); t1.Start(); t2.Start(); t1.Join(); t2.Join(); Thread.Sleep(50); Console.ReadKey(); } }
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对自由互联的支持。如果你想了解更多相关内容请查看下面相关链接