module sockjs.connection;

import vibe.d;
import std.stdio;
import std.string;
import std.regex;
import std.array;

import sockjs.sockjs;
import sockjs.sockjsSyntax;

//version = simulatePollErrors;
//version = simulateSendErrors;

/**
 * One client session served over the "xhr-polling" transport.
 *
 * Outgoing messages are queued by `write` and delivered the next time the
 * client issues a poll request (`handleRequest` with `_send == false`).
 * Incoming messages arrive through `handleRequest` with `_send == true` and
 * are forwarded to the `onData` callback.
 */
public class Connection
{
public:

    /**
     * Creates an open connection and arms the disconnect timeout.
     *
     * Params:
     *   _server     = owning server; its `options` supply all timer delays
     *   _remotePeer = value reported by `remoteAddress`
     *   _userId     = value reported by `userId`
     */
    this(Server _server, string _remotePeer, string _userId)
    {
        m_userId = _userId;
        m_remotePeer = _remotePeer;
        m_server = _server;
        m_queueMutex = new TaskMutex;
        m_timeoutMutex = new TaskMutex;
        m_pollCondition = new TaskCondition(m_timeoutMutex);

        m_timeoutTimer = createTimer(&timeout);
        m_pollTimeout = createTimer(&pollTimeout);
        m_closeTimer = createTimer(&closeTimeout);

        resetTimeout();
    }

    /**
     * Stops all timers and drops every reference held by the connection.
     *
     * The connection is marked closed and any poll still blocked in
     * `longPoll` is woken first: once the poll timer is stopped and the
     * condition reference is gone nothing else could wake that poll, and a
     * connection without its mutexes must not report itself as open.
     */
    public void destroy()
    {
        m_timeoutTimer.stop();
        m_pollTimeout.stop();
        m_closeTimer.stop();

        m_state = State.Closed;

        if(m_pollCondition !is null)
            m_pollCondition.notifyAll();

        m_userId = null;

        m_server = null;
        m_remotePeer = null;

        m_queueMutex = null;
        m_timeoutMutex = null;
        m_pollCondition = null;
    }

    /**
     * Queues a message for delivery and wakes a waiting poll request.
     * Does nothing when the connection is not open.
     *
     * NOTE(review): the message is later wrapped in double quotes without
     * any escaping (see flushQueue), so callers presumably have to pass
     * text that is already safe inside a JSON string - confirm.
     */
    void write(string _msg)
    {
        if(isOpen)
        {
            synchronized(m_queueMutex)
                m_outQueue ~= _msg;

            //debug writefln("emit");

            m_pollCondition.notifyAll();
        }
    }

    /**
     * Closes an open connection, recording the code and message that are
     * sent to the client in the close frame. No effect if already closed.
     */
    void close(int _code=0, string _msg="")
    {
        if(m_state == State.Open)
        {
            m_closeMsg.code = _code;
            m_closeMsg.msg = _msg;

            doClose();
        }
    }

    /// Callback type invoked when the connection closes.
    alias void delegate() EventOnClose;
    /// Callback type invoked once per received message.
    alias void delegate(string _msg) EventOnMsg;

    /// Sets the callback invoked when the connection closes.
    @property void onClose(EventOnClose _callback) { m_onClose = _callback; }
    /// Sets the callback invoked for every received message.
    @property void onData(EventOnMsg _callback) { m_onData = _callback; }

    /// User id passed to the constructor.
    @property const string userId() { return m_userId; }
    /// Remote peer string passed to the constructor.
    @property const string remoteAddress() { return m_remotePeer; }
    /// True while the connection has not been closed.
    @property const bool isOpen() { return m_state == State.Open; }

    /// Name of the transport; always "xhr-polling".
    @property string protocol() { return "xhr-polling"; }

    /// Close code and message recorded by `close`.
    package @property CloseMsg closeMsg() const { return m_closeMsg; }

private:

    /**
     * Takes all queued messages and writes them to `res` as a single
     * `a["m1","m2",...]` frame followed by a newline.
     *
     * The frame is assembled with separators between items, so an empty
     * queue yields the well-formed `a[]` instead of a truncated frame.
     */
    void flushQueue(HTTPServerResponse res)
    {
        string[] pending;

        // Take ownership of the queued messages; the member starts over
        // with a fresh array so no copy is required.
        synchronized(m_queueMutex)
        {
            pending = m_outQueue;
            m_outQueue = null;
        }

        auto frame = appender("a[");

        foreach(i, msg; pending)
        {
            if(i > 0)
                frame ~= ',';

            // NOTE(review): msg is emitted verbatim, without JSON escaping.
            frame ~= '"';
            frame ~= msg;
            frame ~= '"';
        }

        frame ~= "]\n";

        //debug writefln("flush: '%s'",frame.data);

        res.writeBody(frame.data);
    }

    /// Disconnect timer callback: no poll arrived in time, close the connection.
    void timeout()
    {
        m_timeoutTimer.stop();

        doClose();
    }

    /**
     * Marks the connection closed, fires `onClose` (only on the transition
     * from open), arms the close timer and wakes any waiting poll.
     *
     * NOTE(review): the callback runs while the state is still Open, so a
     * handler that calls `close()` would re-enter this method.
     */
    void doClose()
    {
        if(m_onClose && m_state==State.Open)
            m_onClose();

        m_state = State.Closed;

        if(m_server !is null)
            m_closeTimer.rearm(m_server.options.connection_blocking.msecs);

        if(m_pollCondition !is null)
            m_pollCondition.notifyAll();
    }

    /**
     * Answers one poll request.
     *
     * Pending data is flushed immediately. Otherwise, while the connection
     * is open, the request blocks until data is written, the connection is
     * closed or the heartbeat delay elapses. The reply is then a close
     * frame, the queued data or a heartbeat. The disconnect timer is paused
     * for the duration of the request and re-armed on exit.
     */
    void longPoll(HTTPServerResponse res)
    {
        version(simulatePollErrors)
        {
            static int pollCount=0;

            pollCount++;

            debug writefln("pollcount: %s",pollCount);

            if(pollCount % 3 == 0)
            {
                debug writefln("SIMULATE POLL ERROR");

                throw new SockJsException("simulate packet loss");
            }
        }

        scope(exit) resetTimeout();

        m_timeoutTimer.stop();

        if(isDataPending)
        {
            flushQueue(res);
            return;
        }

        // Only wait while the connection is open. On a closed connection
        // nothing will be queued any more, so waiting would merely delay
        // the close frame by a full heartbeat period.
        if(m_state == State.Open)
        {
            m_pollTimeout.rearm(m_server.options.heartbeat_delay.msecs);

            synchronized(m_timeoutMutex) m_pollCondition.wait();

            m_pollTimeout.stop();
        }

        if(m_state == State.Closed)
        {
            // Best effort: a failure to deliver the close frame is logged only.
            try SockJsSyntax.writeClose(res, m_closeMsg);
            catch(Throwable e)
            {
                logError("closing error: %s",e);
            }
        }
        else if(isDataPending)
        {
            //debug writefln("long poll signaled");

            flushQueue(res);
        }
        else
        {
            SockJsSyntax.writeHeartbeat(res);
        }
    }

    /// Re-arms the disconnect timer with the server's `disconnect_delay`.
    void resetTimeout()
    {
        if(m_timeoutTimer && m_server !is null)
            m_timeoutTimer.rearm(m_server.options.disconnect_delay.msecs);
    }

    /// Poll timer callback: wakes the waiting poll so it can send a heartbeat.
    void pollTimeout()
    {
        if(m_pollCondition !is null)
            m_pollCondition.notifyAll();
    }

    /// Close timer callback: reports the closed connection to the server.
    void closeTimeout()
    {
        if(m_server !is null)
            m_server.connectionClosed(this);
    }

    /// True when at least one outgoing message is queued.
    @property const bool isDataPending()
    {
        synchronized(m_queueMutex)
        {
            return m_outQueue.length > 0;
        }
    }

    /**
     * Entry point for a client request on this connection.
     *
     * Params:
     *   _send = false for a poll request (handled by `longPoll`), true for
     *           a request carrying messages from the client
     *   _body = request body of a send request; the code strips two
     *           characters from each end and splits on `","`, i.e. it
     *           presumably expects the form `["m1","m2"]`
     *   res   = response to write to
     *
     * A send request is answered with 204 when the connection is open and
     * every `onData` callback returned normally, otherwise with 404. The
     * body of the reply is always empty.
     */
    package void handleRequest(bool _send, string _body, HTTPServerResponse res)
    {
        if(!_send)
        {
            longPoll(res);
        }
        else
        {
            version(simulateSendErrors)
            {
                static int sendCount=0;

                sendCount++;

                debug writefln("sendcount: %s",sendCount);

                if(sendCount % 3 == 0)
                {
                    debug writefln("SIMULATE SEND ERROR");

                    throw new Exception("simulate packet loss");
                }
            }

            res.statusCode = 404;

            scope(exit) res.writeBody("");

            if(isOpen)
            {
                // Upgraded to 204 only if no callback throws.
                scope(success) res.statusCode = 204;

                // NOTE(review): the framing is not validated and the
                // messages are not unescaped before being dispatched.
                if(_body.length > 4)
                {
                    auto arr = _body[2..$-2];

                    foreach(e; splitter(arr, regex(q"{","}")))
                    {
                        if(m_onData)
                            m_onData(e);
                    }
                }
            }
        }
    }

private:

    /// Code and message sent to the client in the close frame.
    public struct CloseMsg
    {
        int code;
        string msg;
    }

    /// Lifecycle state of the connection.
    enum State
    {
        Open,
        Closed,
    }

    EventOnClose m_onClose;
    EventOnMsg m_onData;

    string m_userId;
    string m_remotePeer;
    TaskMutex m_timeoutMutex;
    TaskMutex m_queueMutex;
    TaskCondition m_pollCondition;
    string[] m_outQueue;
    Server m_server;
    State m_state = State.Open;
    CloseMsg m_closeMsg;
    Timer m_timeoutTimer;
    Timer m_pollTimeout;
    Timer m_closeTimer;
}