在.NET中创建阻塞队列<T>?

在.NET中创建阻塞队列<T>?

我有一个场景,其中有多个线程添加到队列中,多个线程从同一个队列中读取。如果队列达到特定大小全螺纹正在填充的队列将在Add上被阻塞,直到从队列中删除某个项为止。

下面的解决方案是我现在使用的方法,我的问题是:如何改进这个问题?是否有一个已经在我应该使用的bcl中启用此行为的对象?

internal class BlockingCollection<T> : CollectionBase, IEnumerable{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }}


慕田峪7331174
浏览 615回答 3
3回答

牛魔王的故事

这看起来非常不安全(几乎没有同步);类似于:class&nbsp;SizeQueue<T>{ &nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;readonly&nbsp;Queue<T>&nbsp;queue&nbsp;=&nbsp;new&nbsp;Queue<T>(); &nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;readonly&nbsp;int&nbsp;maxSize; &nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;SizeQueue(int&nbsp;maxSize)&nbsp;{&nbsp;this.maxSize&nbsp;=&nbsp;maxSize;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;void&nbsp;Enqueue(T&nbsp;item) &nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;lock&nbsp;(queue) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;while&nbsp;(queue.Count&nbsp;>=&nbsp;maxSize) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Monitor.Wait(queue); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;queue.Enqueue(item); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(queue.Count&nbsp;==&nbsp;1) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;wake&nbsp;up&nbsp;any&nbsp;blocked&nbsp;dequeue &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Monitor.PulseAll(queue); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;T&nbsp;Dequeue() &nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;lock&nbsp;(queue) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;while&nbsp;(queue.Count&nbsp;==&nbsp;0) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Monitor.Wait(queue); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;T&nbsp;item&nbsp;=&nbsp;queue.Dequeue(); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(queue.Count&nbsp;==&nbsp;maxSize&nbsp;-&nbsp;1) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;wake&nbsp;up&nbsp;any&nbsp;blocked&nbsp;enqueue &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Monitor.PulseAll(queue); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;item; &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;}}(编辑)实际上,您可能需要一种关闭队列的方法,以便读者开始干净地退出-也许类似于bool标志-如果设置了,空队列就会返回(而不是阻塞):bool&nbsp;closing;public&nbsp;void&nbsp;Close(){ &nbsp;&nbsp;&nbsp;&nbsp;lock(queue) &nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;closing&nbsp;=&nbsp;true; &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Monitor.PulseAll(queue); &nbsp;&nbsp;&nbsp;&nbsp;}}public&nbsp;bool&nbsp;TryDequeue(out&nbsp;T&nbsp;value){ &nbsp;&nbsp;&nbsp;&nbsp;lock&nbsp;(queue) &nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;while&nbsp;(queue.Count&nbsp;==&nbsp;0) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(closing) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;value&nbsp;=&nbsp;default(T); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;false; &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Monitor.Wait(queue); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;value&nbsp;=&nbsp;queue.Dequeue(); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(queue.Count&nbsp;==&nbsp;maxSize&nbsp;-&nbsp;1) &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;{ &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;wake&nbsp;up&nbsp;any&nbsp;blocked&nbsp;enqueue &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Monitor.PulseAll(queue); &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;true; &nbsp;&nbsp;&nbsp;&nbsp;}}

拉风的咖菲猫

使用.NET 4 BlockingCollection,以登记队列使用add(),去队列使用get()。它内部使用非阻塞的ConcurrentQueue。这里有更多的信息快速最佳生产者/消费者队列技术BlockingCollection与并发队列

萧十郎

“怎样才能改善这种情况呢?”那么,您需要查看类中的每个方法,并考虑如果另一个线程同时调用该方法或任何其他方法会发生什么。例如,您在Remove方法中放置了一个锁,而在Add方法中没有。如果一个线程在另一个线程删除的同时添加,会发生什么情况?坏事。还要考虑的是,方法可以返回第二个对象,该对象提供对第一个对象的内部数据的访问-例如GetEnDigator。假设一个线程正在通过该枚举器,另一个线程同时修改列表。不太好。一个好的经验法则是通过将类中的方法数量减少到绝对最小来简化这一点。特别是,不要继承另一个容器类,因为您将公开该类的所有方法,为调用者破坏内部数据提供了一种方法,或者看到对数据的部分完成的更改(同样糟糕,因为数据在此时似乎已损坏)。隐藏所有的细节,并对你如何允许他们的访问是完全无情的。我强烈建议你使用现成的解决方案-得到一本关于线程或使用第三方图书馆的书。否则,考虑到您正在尝试的内容,您将在很长一段时间内调试您的代码。而且,返回一个项(例如,首先添加的项,因为它是一个队列),而不是调用方选择特定项,是否更有意义呢?当队列为空时,也许删除也应该阻塞。更新:Marc的答案实际上实现了所有这些建议!:)但是我将把这个留在这里,因为理解他的版本为什么是这样的改进可能是有帮助的。
打开App,查看更多内容
随时随地看视频慕课网APP