using System.Collections.Concurrent;

namespace AipGateway.API.Domain.Common.Utilities
{
    /// <summary>
    /// A FIFO queue whose <see cref="TryDequeue"/> blocks the calling thread until an item
    /// is available or the queue has been marked complete via <see cref="CompleteAdding"/>.
    /// </summary>
    /// <remarks>
    /// Storage is a <see cref="ConcurrentQueue{T}"/>; an <see cref="AutoResetEvent"/> is used
    /// to park consumers while the queue is empty. Producers are expected to have finished
    /// their <see cref="Enqueue"/> calls before <see cref="CompleteAdding"/> is invoked; an
    /// <see cref="Enqueue"/> that races with <see cref="CompleteAdding"/> may either be
    /// accepted or rejected.
    /// </remarks>
    /// <typeparam name="T">Type of the queued items.</typeparam>
    public class BlockingConcurrentQueue<T> : IDisposable
    {
        private readonly ConcurrentQueue<T> _internalQueue;
        private readonly AutoResetEvent _autoResetEvent;

        // 0 = still accepting items, 1 = adding completed. Kept as long for Interlocked access.
        private long _isAddingCompleted;

        /// <summary>Creates an empty queue that accepts items.</summary>
        public BlockingConcurrentQueue()
        {
            _internalQueue = new ConcurrentQueue<T>();
            _autoResetEvent = new AutoResetEvent(false);
        }

        /// <summary>Gets whether <see cref="CompleteAdding"/> has been called.</summary>
        public bool IsAddingCompleted => Interlocked.Read(ref _isAddingCompleted) == 1;

        /// <summary>Gets whether adding is completed and no items remain.</summary>
        public bool IsCompleted => IsAddingCompleted && _internalQueue.IsEmpty;

        /// <summary>
        /// Marks the queue as not accepting further items and wakes a waiting consumer so it
        /// can observe completion instead of blocking forever.
        /// </summary>
        public void CompleteAdding()
        {
            Interlocked.Exchange(ref _isAddingCompleted, 1);
            SignalConsumer();
        }

        /// <summary>Releases the wait handle used to park consumers.</summary>
        public void Dispose()
        {
            _autoResetEvent.Dispose();
        }

        /// <summary>Appends <paramref name="item"/> to the queue and wakes a waiting consumer.</summary>
        /// <param name="item">The item to add.</param>
        /// <exception cref="InvalidOperationException">
        /// <see cref="CompleteAdding"/> has already been called. The item is not stored.
        /// </exception>
        public void Enqueue(T item)
        {
            // Reject BEFORE storing, so a refused item never becomes visible to consumers.
            if (IsAddingCompleted)
            {
                throw new InvalidOperationException("Adding Completed.");
            }

            _internalQueue.Enqueue(item);

            // Always signal: an AutoResetEvent stays signaled until a waiter consumes it, so
            // a consumer that is just about to wait cannot miss this wake-up.
            SignalConsumer();
        }

        /// <summary>
        /// Removes the item at the head of the queue, blocking while the queue is empty and
        /// adding has not been completed.
        /// </summary>
        /// <param name="result">
        /// The dequeued item when the method returns <c>true</c>; otherwise the default value of
        /// <typeparamref name="T"/>.
        /// </param>
        /// <returns>
        /// <c>true</c> if an item was dequeued; <c>false</c> if adding is completed and the
        /// queue is empty.
        /// </returns>
        public bool TryDequeue(out T result)
        {
            while (true)
            {
                // Read the completion flag BEFORE probing the queue: if it was already set and
                // the probe still finds nothing, the queue is definitively drained.
                bool addingCompleted = IsAddingCompleted;

                if (_internalQueue.TryDequeue(out result))
                {
                    // Several Set() calls can collapse into one signal; pass the wake-up on
                    // so another parked consumer picks up the remaining items.
                    if (!_internalQueue.IsEmpty)
                    {
                        SignalConsumer();
                    }

                    return true;
                }

                if (addingCompleted)
                {
                    // Cascade the completion wake-up to any other parked consumer.
                    SignalConsumer();
                    return false;
                }

                _autoResetEvent.WaitOne();
            }
        }

        // Signals the event; tolerates a disposed handle because no consumer can be parked on it.
        private void SignalConsumer()
        {
            try
            {
                _autoResetEvent.Set();
            }
            catch (ObjectDisposedException)
            {
                // Nothing to wake once the handle has been disposed.
            }
        }
    }
}