BlockingConcurrentQueue.cs 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. using System.Collections.Concurrent;
  2. namespace AipGateway.API.Domain.Common.Utilities
  3. {
  4. public class BlockingConcurrentQueue<T> : IDisposable
  5. {
  6. private readonly ConcurrentQueue<T> _internalQueue;
  7. private AutoResetEvent _autoResetEvent;
  8. private long _consumed;
  9. private long _isAddingCompleted = 0;
  10. private long _produced;
  11. private long _sleeping;
  12. public BlockingConcurrentQueue()
  13. {
  14. _internalQueue = new ConcurrentQueue<T>();
  15. _produced = 0;
  16. _consumed = 0;
  17. _sleeping = 0;
  18. _autoResetEvent = new AutoResetEvent(false);
  19. }
  20. public bool IsAddingCompleted
  21. {
  22. get { return Interlocked.Read(ref _isAddingCompleted) == 1; }
  23. }
  24. public bool IsCompleted
  25. {
  26. get
  27. {
  28. if (Interlocked.Read(ref _isAddingCompleted) == 1 && _internalQueue.IsEmpty)
  29. return true;
  30. else
  31. return false;
  32. }
  33. }
  34. public void CompleteAdding()
  35. {
  36. Interlocked.Exchange(ref _isAddingCompleted, 1);
  37. }
  38. public void Dispose()
  39. {
  40. _autoResetEvent.Dispose();
  41. }
  42. public void Enqueue(T item)
  43. {
  44. _internalQueue.Enqueue(item);
  45. if (Interlocked.Read(ref _isAddingCompleted) == 1)
  46. throw new InvalidOperationException("Adding Completed.");
  47. Interlocked.Increment(ref _produced);
  48. if (Interlocked.Read(ref _sleeping) == 1)
  49. {
  50. Interlocked.Exchange(ref _sleeping, 0);
  51. _autoResetEvent.Set();
  52. }
  53. }
  54. public bool TryDequeue(out T result)
  55. {
  56. while (true)
  57. {
  58. if (Interlocked.Read(ref _consumed) == Interlocked.Read(ref _produced))
  59. {
  60. Interlocked.Exchange(ref _sleeping, 1);
  61. _autoResetEvent.WaitOne();
  62. }
  63. if (_internalQueue.TryDequeue(out result))
  64. {
  65. Interlocked.Increment(ref _consumed);
  66. return true;
  67. }
  68. }
  69. }
  70. }
  71. }