12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
using System;
using System.Collections.Concurrent;
using System.Threading;
namespace AipGateway.API.Domain.Common.Utilities
{
    /// <summary>
    /// A FIFO queue whose <see cref="TryDequeue"/> blocks the caller until an item
    /// is available or the queue has been marked as complete for adding.
    /// </summary>
    /// <remarks>
    /// Items are stored in a <see cref="ConcurrentQueue{T}"/>. Every state transition
    /// that a blocked consumer cares about (a new item, completion) happens while
    /// holding a private lock and is announced through <see cref="Monitor"/>, so a
    /// consumer cannot miss a notification between its emptiness check and its wait.
    /// </remarks>
    /// <typeparam name="T">Type of the queued items.</typeparam>
    public class BlockingConcurrentQueue<T> : IDisposable
    {
        private readonly ConcurrentQueue<T> _internalQueue;

        // Guards the "check state, then wait / change state, then signal" sequences.
        private readonly object _gate = new object();

        // Written only under _gate; volatile so the lock-free property getters
        // observe the latest value.
        private volatile bool _isAddingCompleted;

        /// <summary>
        /// Creates an empty queue that accepts new items.
        /// </summary>
        public BlockingConcurrentQueue()
        {
            _internalQueue = new ConcurrentQueue<T>();
        }

        /// <summary>
        /// Gets a value indicating whether <see cref="CompleteAdding"/> (or
        /// <see cref="Dispose"/>) has been called.
        /// </summary>
        public bool IsAddingCompleted
        {
            get { return _isAddingCompleted; }
        }

        /// <summary>
        /// Gets a value indicating whether the queue is marked as complete for
        /// adding and no items remain in it.
        /// </summary>
        public bool IsCompleted
        {
            get { return _isAddingCompleted && _internalQueue.IsEmpty; }
        }

        /// <summary>
        /// Marks the queue as not accepting any more items and releases every
        /// consumer currently blocked in <see cref="TryDequeue"/>.
        /// </summary>
        public void CompleteAdding()
        {
            lock (_gate)
            {
                _isAddingCompleted = true;

                // Wake all waiters so each can re-check the queue and, if it is
                // empty, return false instead of sleeping forever.
                Monitor.PulseAll(_gate);
            }
        }

        /// <summary>
        /// Marks the queue as complete for adding so that no consumer stays
        /// blocked. Items already queued can still be dequeued. Safe to call
        /// more than once.
        /// </summary>
        public void Dispose()
        {
            CompleteAdding();
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Appends <paramref name="item"/> to the queue and wakes one blocked consumer.
        /// </summary>
        /// <param name="item">The item to append.</param>
        /// <exception cref="InvalidOperationException">
        /// The queue has been marked as complete for adding. The item is not stored.
        /// </exception>
        public void Enqueue(T item)
        {
            lock (_gate)
            {
                // Reject BEFORE touching the queue so a refused item is never stored.
                if (_isAddingCompleted)
                {
                    throw new InvalidOperationException("Adding Completed.");
                }

                _internalQueue.Enqueue(item);

                // One new item can satisfy exactly one waiter.
                Monitor.Pulse(_gate);
            }
        }

        /// <summary>
        /// Removes the item at the head of the queue, blocking while the queue is
        /// empty and still open for adding.
        /// </summary>
        /// <param name="result">
        /// The dequeued item when the method returns <c>true</c>; otherwise the
        /// default value of <typeparamref name="T"/>.
        /// </param>
        /// <returns>
        /// <c>true</c> if an item was dequeued; <c>false</c> if the queue is empty
        /// and has been marked as complete for adding.
        /// </returns>
        public bool TryDequeue(out T result)
        {
            lock (_gate)
            {
                while (true)
                {
                    if (_internalQueue.TryDequeue(out result))
                    {
                        return true;
                    }

                    // Empty and closed: nothing will ever arrive. The failed
                    // TryDequeue above has already set result to default(T).
                    if (_isAddingCompleted)
                    {
                        return false;
                    }

                    // Releases _gate while waiting and re-acquires it before
                    // returning; the loop re-checks state after every wake-up.
                    Monitor.Wait(_gate);
                }
            }
        }
    }
}
|