1 module dlangui.core.queue;
2
3 import core.sync.condition;
4 import core.sync.mutex;
5
6 class BlockingQueue(T) {
7
8 private Mutex _mutex;
9 private Condition _condition;
10 private T[] _buffer;
11 private int _readPos;
12 private int _writePos;
13 private shared bool _closed;
14
15 this() {
16 _mutex = new Mutex();
17 _condition = new Condition(_mutex);
18 _readPos = 0;
19 _writePos = 0;
20 }
21
22 ~this() {
23 close();
24 if (_condition) {
25 destroy(_condition);
26 _condition = null;
27 }
28 if (_mutex) {
29 destroy(_mutex);
30 _mutex = null;
31 }
32 }
33
34 void close() {
35 if (_mutex && !_closed) {
36 synchronized(_mutex) {
37 _closed = true;
38 if (_condition !is null)
39 _condition.notifyAll();
40 }
41 } else {
42 _closed = true;
43 }
44 }
45
46 /// returns true if queue is closed
47 @property bool closed() {
48 return _closed;
49 }
50
51 private void move() {
52 if (_readPos > 1024 && _readPos > _buffer.length * 3 / 4) {
53 // move buffer data
54 for (int i = 0; _readPos + i < _writePos; i++)
55 _buffer[i] = _buffer[_readPos + i];
56 _writePos -= _readPos;
57 _readPos = 0;
58 }
59 }
60
61 private void append(ref T item) {
62 if (_writePos >= _buffer.length) {
63 move();
64 _buffer.length = _buffer.length == 0 ? 64 : _buffer.length * 2;
65 }
66 _buffer[_writePos++] = item;
67 }
68
69 void put(T item) {
70 if (_closed)
71 return;
72 synchronized(_mutex) {
73 if (_closed)
74 return;
75 append(item);
76 _condition.notifyAll();
77 }
78 }
79
80 void put(T[] items) {
81 if (_closed)
82 return;
83 synchronized(_mutex) {
84 if (_closed)
85 return;
86 foreach(ref item; items) {
87 append(item);
88 }
89 _condition.notifyAll();
90 }
91 }
92
93 bool get(ref T value, int timeoutMillis = 0) {
94 if (_closed)
95 return false;
96 synchronized(_mutex) {
97 if (_closed)
98 return false;
99 if (_readPos < _writePos) {
100 value = _buffer[_readPos++];
101 return true;
102 }
103 try {
104 if (timeoutMillis <= 0)
105 _condition.wait(); // no timeout
106 else if (!_condition.wait(dur!"msecs"(timeoutMillis)))
107 return false; // timeout
108 } catch (Exception e) {
109 // ignore
110 }
111 if (_readPos < _writePos) {
112 value = _buffer[_readPos++];
113 return true;
114 }
115 }
116 return false;
117 }
118
119 bool getAll(ref T[] values, int timeoutMillis) {
120 if (_closed)
121 return false;
122 synchronized(_mutex) {
123 if (_closed)
124 return false;
125 values.length = 0;
126 while (_readPos < _writePos)
127 values ~= _buffer[_readPos++];
128 if (values.length > 0)
129 return true;
130 if (timeoutMillis <= 0)
131 _condition.wait(); // no timeout
132 else if (!_condition.wait(dur!"msecs"(timeoutMillis)))
133 return false; // timeout
134 while (_readPos < _writePos)
135 values ~= _buffer[_readPos++];
136 if (values.length > 0)
137 return true;
138 }
139 return false;
140 }
141 }
142
143 alias Runnable = void delegate();
144 alias RunnableQueue = BlockingQueue!Runnable;