module dlangui.core.asyncsocket;

import std.socket;
import core.thread;
import dlangui.core.queue;
import dlangui.core.logger;

/// Socket state
enum SocketState {
    Disconnected,
    Connecting,
    Connected
}

/// Asynchronous socket interface
interface AsyncSocket {
    /// Current connection state
    @property SocketState state();
    /// Request a connection to host:port
    void connect(string host, ushort port);
    /// Request closing of the current connection
    void disconnect();
    /// Request sending of data over the current connection
    void send(ubyte[] data);
}

/// Socket error code
enum SocketError {
    ConnectError,
    WriteError,
    NotConnected,
    AlreadyConnected,
}

/// Callback interface for using by AsyncSocket implementations
interface AsyncSocketCallback {
    /// Called when data has been received from the socket
    void onDataReceived(AsyncSocket socket, ubyte[] data);
    /// Called when the connection has been established
    void onConnect(AsyncSocket socket);
    /// Called when the connection has been closed
    void onDisconnect(AsyncSocket socket);
    /// Called when an operation on the socket failed
    void onError(AsyncSocket socket, SocketError error, string msg);
}

/// Proxy for AsyncSocketCallback: forwards every callback to the wrapped handler
/// through the supplied executor delegate (e.g. to run it in the GUI thread)
class AsyncSocketCallbackProxy : AsyncSocketCallback {
private:
    AsyncSocketCallback _handler;
    void delegate(void delegate() runnable) _executor;
public:
    /// Create a proxy which passes each call to `handler`, wrapped in a delegate handed to `executor`
    this(AsyncSocketCallback handler, void delegate(void delegate() runnable) executor) {
        _executor = executor;
        _handler = handler;
    }
    void onDataReceived(AsyncSocket socket, ubyte[] data) {
        _executor(delegate() {
            _handler.onDataReceived(socket, data);
        });
    }
    void onConnect(AsyncSocket socket) {
        _executor(delegate() {
            _handler.onConnect(socket);
        });
    }
    void onDisconnect(AsyncSocket socket) {
        _executor(delegate() {
            _handler.onDisconnect(socket);
        });
    }
    void onError(AsyncSocket socket, SocketError error, string msg) {
        _executor(delegate() {
            _handler.onError(socket, error, msg);
        });
    }
}

/// Asynchronous socket which uses separate thread for operation.
///
/// connect(), disconnect() and send() only put a task into the internal queue;
/// the tasks are executed by the thread started in the constructor, and all
/// callbacks are invoked from that thread.
class AsyncClientConnection : Thread, AsyncSocket {
protected:
    Socket _sock;
    SocketSet _readSet;
    SocketSet _writeSet;
    SocketSet _errorSet;
    RunnableQueue _queue;
    AsyncSocketCallback _callback;
    SocketState _state = SocketState.Disconnected;

    /// Thread body: alternates between running queued tasks and polling the socket
    /// until the queue is closed.
    void threadProc() {
        ubyte[] readBuf = new ubyte[65536];
        Log.d("entering ClientConnection thread proc");
        for (;;) {
            if (_queue.closed)
                break;
            Runnable task;
            // short wait while a socket exists so that it is polled frequently
            if (_queue.get(task, _sock ? 10 : 1000)) {
                if (_queue.closed)
                    break;
                task();
            }
            if (_sock)
                pollSocket(readBuf);
        }
        doDisconnect();
        Log.d("exiting ClientConnection thread proc");
    }

    /// Runs one select() pass on the current socket and dispatches connect
    /// completion, received data and error conditions. Requires _sock to be non-null.
    void pollSocket(ubyte[] readBuf) {
        _readSet.reset();
        _writeSet.reset();
        _errorSet.reset();
        _readSet.add(_sock);
        _writeSet.add(_sock);
        _errorSet.add(_sock);
        if (Socket.select(_readSet, _writeSet, _errorSet, dur!"msecs"(10)) <= 0)
            return;
        if (_state == SocketState.Connecting && _writeSet.isSet(_sock))
            finishConnect();
        // each handler below may close the socket, so _sock is re-checked before every use
        if (_sock && _state == SocketState.Connected && _readSet.isSet(_sock)) {
            long bytesRead = _sock.receive(readBuf);
            if (bytesRead > 0) {
                _callback.onDataReceived(this, readBuf[0 .. cast(size_t)bytesRead].dup);
            } else if (bytesRead == 0 || !wouldHaveBlocked()) {
                // 0 means the peer closed the connection; any error other than
                // "would block" means the connection is broken
                doDisconnect();
            }
        }
        if (_sock && _errorSet.isSet(_sock)) {
            if (_state == SocketState.Connecting)
                failConnect("cannot connect to host");
            else
                doDisconnect();
        }
    }

    /// Completes a non-blocking connect after the socket became writable:
    /// a pending socket error means the attempt failed, otherwise the socket is connected.
    void finishConnect() {
        int pendingError = 0;
        try {
            _sock.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, pendingError);
        } catch (Exception e) {
            failConnect(e.msg);
            return;
        }
        if (pendingError != 0) {
            failConnect("cannot connect to host");
            return;
        }
        _state = SocketState.Connected;
        _callback.onConnect(this);
    }

    /// Reports a failed connection attempt via onError(ConnectError) and releases the socket.
    void failConnect(string msg) {
        _callback.onError(this, SocketError.ConnectError, msg);
        doDisconnect();
    }

    /// Closes and destroys the current socket, if any, and emits onDisconnect
    /// when the state was not already Disconnected.
    void doDisconnect() {
        if (_sock) {
            _sock.shutdown(SocketShutdown.BOTH);
            _sock.close();
            destroy(_sock);
            _sock = null;
            if (_state != SocketState.Disconnected) {
                _state = SocketState.Disconnected;
                _callback.onDisconnect(this);
            }
        }
    }
public:
    /// Create the connection object and start its worker thread; `cb` receives all events
    this(AsyncSocketCallback cb) {
        super(&threadProc);
        _callback = cb;
        _queue = new RunnableQueue();
        start();
    }
    /// Close the task queue and wait for the worker thread to exit
    ~this() {
        _queue.close();
        join();
    }
    /// Current connection state
    @property SocketState state() {
        return _state;
    }
    /// Queue a connection attempt to host:port.
    /// Reports AlreadyConnected via onError if a connection is established or in progress,
    /// and ConnectError if the attempt cannot be started.
    void connect(string host, ushort port) {
        _queue.put(delegate() {
            if (_state == SocketState.Connecting) {
                _callback.onError(this, SocketError.AlreadyConnected, "socket is already connecting");
                return;
            }
            if (_state == SocketState.Connected) {
                _callback.onError(this, SocketError.AlreadyConnected, "socket is already connected");
                return;
            }
            doDisconnect();
            try {
                _sock = new TcpSocket();
                _sock.blocking = false;
                _readSet = new SocketSet();
                _writeSet = new SocketSet();
                _errorSet = new SocketSet();
                _state = SocketState.Connecting;
                _sock.connect(new InternetAddress(host, port));
            } catch (Exception e) {
                // address resolution or connect failures must not escape the task:
                // an uncaught exception would terminate the worker thread
                failConnect(e.msg);
            }
        });
    }
    /// Queue closing of the current connection (no-op if there is no socket)
    void disconnect() {
        _queue.put(delegate() {
            if (!_sock)
                return;
            doDisconnect();
        });
    }
    /// Queue sending of `data`. The slice is not copied before the queued task runs.
    /// Reports NotConnected via onError if there is no socket, WriteError if sending fails.
    void send(ubyte[] data) {
        _queue.put(delegate() {
            if (!_sock) {
                _callback.onError(this, SocketError.NotConnected, "socket is not connected");
                return;
            }
            for (;;) {
                long bytesSent = _sock.send(data);
                if (bytesSent == Socket.ERROR) {
                    // wouldHaveBlocked() must be evaluated first, right after the failed send
                    if (wouldHaveBlocked() && !_queue.closed) {
                        // non-blocking socket cannot take more data right now:
                        // wait briefly until it is writable again, then retry
                        _writeSet.reset();
                        _writeSet.add(_sock);
                        Socket.select(null, _writeSet, null, dur!"msecs"(10));
                        continue;
                    }
                    _callback.onError(this, SocketError.WriteError, "error while writing to connection");
                    return;
                }
                //Log.d("Bytes sent:" ~ to!string(bytesSent));
                if (bytesSent >= data.length)
                    return;
                // partial write: continue with the unsent remainder
                data = data[cast(size_t)bytesSent .. $];
            }
        });
    }
}