1 module dlangui.core.asyncsocket;
2
3 import std.socket;
4 import core.thread;
5 import dlangui.core.queue;
6 import dlangui.core.logger;
7
8 /// Socket state
9 enum SocketState {
10 Disconnected,
11 Connecting,
12 Connected
13 }
14
15 /// Asynchronous socket interface
16 interface AsyncSocket {
17 @property SocketState state();
18 void connect(string host, ushort port);
19 void disconnect();
20 void send(ubyte[] data);
21 }
22
23 /// Socket error code
24 enum SocketError {
25 ConnectError,
26 WriteError,
27 NotConnected,
28 AlreadyConnected,
29 }
30
31 /// Callback interface for using by AsyncSocket implementations
32 interface AsyncSocketCallback {
33 void onDataReceived(AsyncSocket socket, ubyte[] data);
34 void onConnect(AsyncSocket socket);
35 void onDisconnect(AsyncSocket socket);
36 void onError(AsyncSocket socket, SocketError error, string msg);
37 }
38
39 /// proxy for AsyncConnectionHandler - to call in GUI thread
40 class AsyncSocketCallbackProxy : AsyncSocketCallback {
41 private:
42 AsyncSocketCallback _handler;
43 void delegate(void delegate() runnable) _executor;
44 public:
45 this(AsyncSocketCallback handler, void delegate(void delegate() runnable) executor) {
46 _executor = executor;
47 _handler = handler;
48 }
49 void onDataReceived(AsyncSocket socket, ubyte[] data) {
50 _executor(delegate() {
51 _handler.onDataReceived(socket, data);
52 });
53 }
54 void onConnect(AsyncSocket socket) {
55 _executor(delegate() {
56 _handler.onConnect(socket);
57 });
58 }
59 void onDisconnect(AsyncSocket socket) {
60 _executor(delegate() {
61 _handler.onDisconnect(socket);
62 });
63 }
64 void onError(AsyncSocket socket, SocketError error, string msg) {
65 _executor(delegate() {
66 _handler.onError(socket, error, msg);
67 });
68 }
69 }
70
71 /// Asynchrous socket which uses separate thread for operation
72 class AsyncClientConnection : Thread, AsyncSocket {
73 protected:
74 Socket _sock;
75 SocketSet _readSet;
76 SocketSet _writeSet;
77 SocketSet _errorSet;
78 RunnableQueue _queue;
79 AsyncSocketCallback _callback;
80 SocketState _state = SocketState.Disconnected;
81 void threadProc() {
82 ubyte[] readBuf = new ubyte[65536];
83 Log.d("entering ClientConnection thread proc");
84 for(;;) {
85 if (_queue.closed)
86 break;
87 Runnable task;
88 if (_queue.get(task, _sock ? 10 : 1000)) {
89 if (_queue.closed)
90 break;
91 task();
92 }
93 if (_sock) {
94 _readSet.reset();
95 _writeSet.reset();
96 _errorSet.reset();
97 _readSet.add(_sock);
98 _writeSet.add(_sock);
99 _errorSet.add(_sock);
100 if (Socket.select(_readSet, _writeSet, _errorSet, dur!"msecs"(10)) > 0) {
101 if (_writeSet.isSet(_sock)) {
102 if (_state == SocketState.Connecting) {
103 _state = SocketState.Connected;
104 _callback.onConnect(this);
105 }
106 }
107 if (_readSet.isSet(_sock)) {
108 long bytesRead = _sock.receive(readBuf);
109 if (bytesRead > 0) {
110 _callback.onDataReceived(this, readBuf[0 .. cast(int)bytesRead].dup);
111 }
112 }
113 if (_errorSet.isSet(_sock)) {
114 doDisconnect();
115 }
116 }
117 }
118 }
119 doDisconnect();
120 Log.d("exiting ClientConnection thread proc");
121 }
122 void doDisconnect() {
123 if (_sock) {
124 _sock.shutdown(SocketShutdown.BOTH);
125 _sock.close();
126 destroy(_sock);
127 _sock = null;
128 if (_state != SocketState.Disconnected) {
129 _state = SocketState.Disconnected;
130 _callback.onDisconnect(this);
131 }
132 }
133 }
134 public:
135 this(AsyncSocketCallback cb) {
136 super(&threadProc);
137 _callback = cb;
138 _queue = new RunnableQueue();
139 start();
140 }
141 ~this() {
142 _queue.close();
143 join();
144 }
145 @property SocketState state() {
146 return _state;
147 }
148 void connect(string host, ushort port) {
149 _queue.put(delegate() {
150 if (_state == SocketState.Connecting) {
151 _callback.onError(this, SocketError.NotConnected, "socket is already connecting");
152 return;
153 }
154 if (_state == SocketState.Connected) {
155 _callback.onError(this, SocketError.NotConnected, "socket is already connected");
156 return;
157 }
158 doDisconnect();
159 _sock = new TcpSocket();
160 _sock.blocking = false;
161 _readSet = new SocketSet();
162 _writeSet = new SocketSet();
163 _errorSet = new SocketSet();
164 _state = SocketState.Connecting;
165 _sock.connect(new InternetAddress(host, port));
166 });
167 }
168 void disconnect() {
169 _queue.put(delegate() {
170 if (!_sock)
171 return;
172 doDisconnect();
173 });
174 }
175 void send(ubyte[] data) {
176 _queue.put(delegate() {
177 if (!_sock) {
178 _callback.onError(this, SocketError.NotConnected, "socket is not connected");
179 return;
180 }
181 for (;;) {
182 long bytesSent = _sock.send(data);
183 if (bytesSent == Socket.ERROR) {
184 _callback.onError(this, SocketError.WriteError, "error while writing to connection");
185 return;
186 } else {
187 //Log.d("Bytes sent:" ~ to!string(bytesSent));
188 if (bytesSent >= data.length)
189 return;
190 data = data[cast(int)bytesSent .. $];
191 }
192 }
193 });
194 }
195 }