1 module dlangui.core.queue; 2 3 import core.sync.condition; 4 import core.sync.mutex; 5 6 class BlockingQueue(T) { 7 8 private Mutex _mutex; 9 private Condition _condition; 10 private T[] _buffer; 11 private int _readPos; 12 private int _writePos; 13 private shared bool _closed; 14 15 this() { 16 _mutex = new Mutex(); 17 _condition = new Condition(_mutex); 18 _readPos = 0; 19 _writePos = 0; 20 } 21 22 ~this() { 23 close(); 24 if (_condition) { 25 destroy(_condition); 26 _condition = null; 27 } 28 if (_mutex) { 29 destroy(_mutex); 30 _mutex = null; 31 } 32 } 33 34 void close() { 35 if (_mutex && !_closed) { 36 synchronized(_mutex) { 37 _closed = true; 38 if (_condition !is null) 39 _condition.notifyAll(); 40 } 41 } else { 42 _closed = true; 43 } 44 } 45 46 /// returns true if queue is closed 47 @property bool closed() { 48 return _closed; 49 } 50 51 private void move() { 52 if (_readPos > 1024 && _readPos > _buffer.length * 3 / 4) { 53 // move buffer data 54 for (int i = 0; _readPos + i < _writePos; i++) 55 _buffer[i] = _buffer[_readPos + i]; 56 _writePos -= _readPos; 57 _readPos = 0; 58 } 59 } 60 61 private void append(ref T item) { 62 if (_writePos >= _buffer.length) { 63 move(); 64 _buffer.length = _buffer.length == 0 ? 64 : _buffer.length * 2; 65 } 66 _buffer[_writePos++] = item; 67 } 68 69 void put(T item) { 70 if (_closed) 71 return; 72 synchronized(_mutex) { 73 if (_closed) 74 return; 75 append(item); 76 _condition.notifyAll(); 77 } 78 } 79 80 void put(T[] items) { 81 if (_closed) 82 return; 83 synchronized(_mutex) { 84 if (_closed) 85 return; 86 foreach(ref item; items) { 87 append(item); 88 } 89 _condition.notifyAll(); 90 } 91 } 92 93 bool get(ref T value, int timeoutMillis = 0) { 94 if (_closed) 95 return false; 96 synchronized(_mutex) { 97 if (_closed) 98 return false; 99 if (_readPos < _writePos) { 100 value = _buffer[_readPos++]; 101 return true; 102 } 103 try { 104 if (timeoutMillis <= 0) 105 _condition.wait(); // no timeout 106 else if (!_condition.wait(dur!"msecs"(timeoutMillis))) 107 return false; // timeout 108 } catch (Exception e) { 109 // ignore 110 } 111 if (_readPos < _writePos) { 112 value = _buffer[_readPos++]; 113 return true; 114 } 115 } 116 return false; 117 } 118 119 bool getAll(ref T[] values, int timeoutMillis) { 120 if (_closed) 121 return false; 122 synchronized(_mutex) { 123 if (_closed) 124 return false; 125 values.length = 0; 126 while (_readPos < _writePos) 127 values ~= _buffer[_readPos++]; 128 if (values.length > 0) 129 return true; 130 if (timeoutMillis <= 0) 131 _condition.wait(); // no timeout 132 else if (!_condition.wait(dur!"msecs"(timeoutMillis))) 133 return false; // timeout 134 while (_readPos < _writePos) 135 values ~= _buffer[_readPos++]; 136 if (values.length > 0) 137 return true; 138 } 139 return false; 140 } 141 } 142 143 alias Runnable = void delegate(); 144 alias RunnableQueue = BlockingQueue!Runnable;