1 module dlangui.core.asyncsocket;
2 
3 import std.socket;
4 import core.thread;
5 import dlangui.core.queue;
6 import dlangui.core.logger;
7 
/// Connection life-cycle state reported by an AsyncSocket.
enum SocketState
{
    /// No connection is established and none is in progress.
    Disconnected,

    /// A connection attempt has been started but has not completed yet.
    Connecting,

    /// The connection is established.
    Connected
}
14 
/// Contract implemented by asynchronous sockets.
///
/// NOTE(review): the interface itself does not say how results are delivered;
/// the implementation in this module reports them through an AsyncSocketCallback.
interface AsyncSocket
{
    /// Current connection state of the socket.
    @property SocketState state();

    /// Requests a connection to `host` on `port`.
    void connect(string host, ushort port);

    /// Requests that the current connection be closed.
    void disconnect();

    /// Requests that the bytes in `data` be written to the connection.
    void send(ubyte[] data);
}
22 
/// Error categories passed to AsyncSocketCallback.onError.
enum SocketError
{
    /// A connection attempt failed.
    ConnectError,

    /// Writing data to the connection failed.
    WriteError,

    /// The operation requires a connection, but there is none.
    NotConnected,

    /// The socket is already connected (or connecting).
    AlreadyConnected,
}
30 
/// Event sink used by AsyncSocket implementations to report what happened on a socket.
///
/// Every notification receives the socket that raised it as its first argument.
interface AsyncSocketCallback
{
    /// Called with a chunk of bytes received from the connection.
    void onDataReceived(AsyncSocket socket, ubyte[] data);

    /// Called when the connection has been established.
    void onConnect(AsyncSocket socket);

    /// Called when the connection has been closed.
    void onDisconnect(AsyncSocket socket);

    /// Called when an operation failed; `error` is the category and `msg` a description.
    void onError(AsyncSocket socket, SocketError error, string msg);
}
38 
/// Proxy for an AsyncSocketCallback: each notification is wrapped into a
/// parameterless delegate and handed to an executor instead of being invoked
/// directly, so the wrapped handler can be called in the GUI thread.
class AsyncSocketCallbackProxy : AsyncSocketCallback
{
private:
    /// Receiver of the forwarded notifications.
    AsyncSocketCallback _handler;
    /// Decides where and when each wrapped notification is run.
    void delegate(void delegate() runnable) _executor;

public:
    /// Creates a proxy which forwards notifications to `handler` through `executor`.
    this(AsyncSocketCallback handler, void delegate(void delegate() runnable) executor)
    {
        _handler = handler;
        _executor = executor;
    }

    /// Forwards a "data received" notification through the executor.
    void onDataReceived(AsyncSocket socket, ubyte[] data)
    {
        void delegate() job = () { _handler.onDataReceived(socket, data); };
        _executor(job);
    }

    /// Forwards a "connected" notification through the executor.
    void onConnect(AsyncSocket socket)
    {
        void delegate() job = () { _handler.onConnect(socket); };
        _executor(job);
    }

    /// Forwards a "disconnected" notification through the executor.
    void onDisconnect(AsyncSocket socket)
    {
        void delegate() job = () { _handler.onDisconnect(socket); };
        _executor(job);
    }

    /// Forwards an error notification through the executor.
    void onError(AsyncSocket socket, SocketError error, string msg)
    {
        void delegate() job = () { _handler.onError(socket, error, msg); };
        _executor(job);
    }
}
70 
/// Asynchronous socket which uses a separate thread for all socket operations.
///
/// The public methods (connect, disconnect, send) only enqueue a task; the task
/// itself is executed by the worker thread started in the constructor. Results
/// are reported through the AsyncSocketCallback passed to the constructor, and
/// those callbacks are invoked from the worker thread.
class AsyncClientConnection : Thread, AsyncSocket {
protected:
    Socket _sock;
    SocketSet _readSet;
    SocketSet _writeSet;
    SocketSet _errorSet;
    RunnableQueue _queue;
    AsyncSocketCallback _callback;
    SocketState _state = SocketState.Disconnected;

    /// Worker thread body: alternates between running queued tasks and polling
    /// the socket until the task queue is closed.
    void threadProc() {
        ubyte[] readBuf = new ubyte[65536];
        Log.d("entering ClientConnection thread proc");
        for(;;) {
            if (_queue.closed)
                break;
            Runnable task;
            // Use a short queue timeout while a socket exists so that it is polled often.
            if (_queue.get(task, _sock ? 10 : 1000)) {
                if (_queue.closed)
                    break;
                task();
            }
            if (_sock)
                pollSocket(readBuf);
        }
        doDisconnect();
        Log.d("exiting ClientConnection thread proc");
    }

    /// Polls the socket once (10 ms timeout) and handles connect completion,
    /// incoming data, remote close and error conditions.
    ///
    /// Params:
    ///     readBuf = scratch buffer used for receive(); received bytes are copied
    ///               before being passed to the callback.
    void pollSocket(ubyte[] readBuf) {
        _readSet.reset();
        _writeSet.reset();
        _errorSet.reset();
        _readSet.add(_sock);
        _writeSet.add(_sock);
        _errorSet.add(_sock);
        if (Socket.select(_readSet, _writeSet, _errorSet, dur!"msecs"(10)) <= 0)
            return;

        if (_state == SocketState.Connecting) {
            // A failed non-blocking connect may be signalled either through the
            // error set or through writability with a pending SO_ERROR.
            if (_errorSet.isSet(_sock) || (_writeSet.isSet(_sock) && pendingSocketError() != 0)) {
                _callback.onError(this, SocketError.ConnectError, "cannot connect to remote host");
                doDisconnect();
                return;
            }
            if (_writeSet.isSet(_sock)) {
                _state = SocketState.Connected;
                _callback.onConnect(this);
            }
        }

        if (_state == SocketState.Connected && _readSet.isSet(_sock)) {
            auto bytesRead = _sock.receive(readBuf);
            if (bytesRead > 0) {
                _callback.onDataReceived(this, readBuf[0 .. cast(size_t)bytesRead].dup);
            } else if (bytesRead == 0 || !wouldHaveBlocked()) {
                // 0 means the peer closed the connection; a negative result which
                // is not "would block" is a real read error.
                doDisconnect();
                return; // _sock is null now, it must not be touched below
            }
        }

        if (_errorSet.isSet(_sock)) {
            doDisconnect();
        }
    }

    /// Returns the pending SO_ERROR value of the socket (0 when there is none),
    /// or -1 when the option cannot be queried.
    int pendingSocketError() {
        int code;
        try {
            _sock.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, code);
        } catch (SocketException e) {
            return -1;
        }
        return code;
    }

    /// Closes and releases the socket (if any), then switches to Disconnected,
    /// notifying the callback if the state actually changed.
    void doDisconnect() {
        if (_sock) {
            _sock.shutdown(SocketShutdown.BOTH);
            _sock.close();
            destroy(_sock);
            _sock = null;
        }
        // Kept outside the socket check so that the state is reset even when a
        // connect attempt failed before a socket object could be created.
        if (_state != SocketState.Disconnected) {
            _state = SocketState.Disconnected;
            _callback.onDisconnect(this);
        }
    }
public:
    /// Creates the connection object and immediately starts its worker thread.
    ///
    /// Params:
    ///     cb = receiver of all notifications; called from the worker thread.
    this(AsyncSocketCallback cb) {
        super(&threadProc);
        _callback = cb;
        _queue = new RunnableQueue();
        start();
    }

    /// Closes the task queue and waits for the worker thread to finish.
    ///
    /// NOTE(review): if this runs during GC finalization, `_queue` may already
    /// have been finalized - confirm that owners destroy the object explicitly.
    ~this() {
        _queue.close();
        join();
    }

    /// Current connection state, as last written by the worker thread.
    @property SocketState state() {
        return _state;
    }

    /// Enqueues a connection attempt to `host`:`port`.
    ///
    /// Reports SocketError.AlreadyConnected if a connection exists or is in
    /// progress. If the attempt fails, SocketError.ConnectError is reported,
    /// followed by onDisconnect.
    void connect(string host, ushort port) {
        _queue.put(delegate() {
            if (_state == SocketState.Connecting) {
                _callback.onError(this, SocketError.AlreadyConnected, "socket is already connecting");
                return;
            }
            if (_state == SocketState.Connected) {
                _callback.onError(this, SocketError.AlreadyConnected, "socket is already connected");
                return;
            }
            doDisconnect();
            _readSet = new SocketSet();
            _writeSet = new SocketSet();
            _errorSet = new SocketSet();
            _state = SocketState.Connecting;
            try {
                _sock = new TcpSocket();
                _sock.blocking = false;
                _sock.connect(new InternetAddress(host, port));
            } catch (Exception e) {
                // Address resolution or the connect call failed; an exception
                // escaping from here would terminate the worker thread.
                _callback.onError(this, SocketError.ConnectError, e.msg);
                doDisconnect();
            }
        });
    }

    /// Enqueues closing of the current connection; does nothing if there is no socket.
    void disconnect() {
        _queue.put(delegate() {
            if (!_sock)
                return;
            doDisconnect();
        });
    }

    /// Enqueues writing of `data` to the connection.
    ///
    /// Reports SocketError.NotConnected when there is no established connection
    /// and SocketError.WriteError when writing fails. The array is not copied,
    /// so the caller should not modify it until it has been sent.
    void send(ubyte[] data) {
        _queue.put(delegate() {
            if (!_sock || _state != SocketState.Connected) {
                _callback.onError(this, SocketError.NotConnected, "socket is not connected");
                return;
            }
            while (data.length > 0) {
                auto bytesSent = _sock.send(data);
                if (bytesSent == Socket.ERROR) {
                    // The socket is non-blocking: a full send buffer is not an
                    // error, so wait until it is writable again and retry.
                    if (wouldHaveBlocked() && !_queue.closed) {
                        _writeSet.reset();
                        _writeSet.add(_sock);
                        Socket.select(null, _writeSet, null, dur!"msecs"(100));
                        continue;
                    }
                    _callback.onError(this, SocketError.WriteError, "error while writing to connection");
                    return;
                }
                //Log.d("Bytes sent:" ~ to!string(bytesSent));
                data = data[cast(size_t)bytesSent .. $];
            }
        });
    }
}