1 module sockjs.connection;
2 
3 import vibe.d;
4 import std.stdio;
5 import std.string;
6 import std.regex;
7 import std.array;
8 
9 import sockjs.sockjs;
10 import sockjs.sockjsSyntax;
11 
12 //version = simulatePollErrors;
13 //version = simulateSendErrors;
14 
15 ///
16 public class Connection
17 {
18 public:
19 
20 	///
21 	this(Server _server, string _remotePeer, string _userId)
22 	{
23 		m_userId = _userId;
24 		m_remotePeer = _remotePeer;
25 		m_server = _server;
26 		m_queueMutex = new TaskMutex;
27 		m_timeoutMutex = new TaskMutex;
28 		m_pollCondition = new TaskCondition(m_timeoutMutex);
29 
30 		m_timeoutTimer = createTimer(&timeout);
31 		m_pollTimeout = createTimer(&pollTimeout);
32 		m_closeTimer = createTimer(&closeTimeout);
33 
34 		resetTimeout();
35 	}
36 
37 	///
38 	public void destroy()
39 	{
40 		m_timeoutTimer.stop();
41 		m_pollTimeout.stop();
42 		m_closeTimer.stop();
43 		
44 		m_userId = null;
45 		
46 		m_server = null;
47 		m_remotePeer = null;
48 		
49 		m_queueMutex = null;
50 		m_timeoutMutex = null;
51 		m_pollCondition = null;
52 	}
53 
54 	///
55 	void write(string _msg)
56 	{
57 		if(isOpen)
58 		{
59 			synchronized(m_queueMutex)
60 				m_outQueue ~= _msg;
61 
62 			//debug writefln("emit");
63 
64 			m_pollCondition.notifyAll();
65 		}
66 	}
67 
68 	///
69 	void close(int _code=0, string _msg="")
70 	{
71 		if(m_state == State.Open)
72 		{
73 			m_closeMsg.code = _code;
74 			m_closeMsg.msg = _msg;
75 
76 			doClose();
77 		}
78 	}
79 
80 	///
81 	alias void delegate() EventOnClose;
82 	///
83 	alias void delegate(string _msg) EventOnMsg;
84 
85 	///
86 	@property void onClose(EventOnClose _callback) { m_onClose = _callback; }
87 	///
88 	@property void onData(EventOnMsg _callback) { m_onData = _callback; }
89 
90 	///
91 	@property const string userId() { return m_userId; }
92 	///
93 	@property const string remoteAddress() { return m_remotePeer; }
94 	///
95 	@property const bool isOpen() { return m_state == State.Open; }
96 
97 	///
98 	@property string protocol() { return "xhr-polling"; }
99 
100 	///
101 	package @property CloseMsg closeMsg() const { return m_closeMsg; }
102 
103 private:
104 
105 	///
106 	void flushQueue(HTTPServerResponse res)
107 	{
108 		auto buffer = appender("a[");
109 
110 		string[] outQueue;
111 
112 		synchronized(m_queueMutex)
113 		{
114 			outQueue = m_outQueue.dup;
115 			m_outQueue.length = 0;
116 		}
117 
118 		foreach(s; outQueue)
119 		{
120 			buffer ~= '"';
121 			buffer ~= s;
122 			buffer ~= "\",";
123 		}
124 
125 		string outbody = buffer.data;
126 		outbody = outbody[0..$-1];
127 		outbody ~= "]\n";
128 
129 		//debug writefln("flush: '%s'",outbody);
130 
131 		res.writeBody(outbody);
132 	}
133 
134 	///
135 	void timeout()
136 	{
137 		m_timeoutTimer.stop();
138 
139 		doClose();
140 	}
141 
142 	///
143 	void doClose()
144 	{
145 		if(m_onClose && m_state==State.Open)
146 			m_onClose();
147 
148 		m_state = State.Closed;
149 
150 		if(m_server !is null)
151 			m_closeTimer.rearm(m_server.options.connection_blocking.msecs);
152 
153 		if(m_pollCondition !is null)
154 			m_pollCondition.notifyAll();
155 	}
156 
157 	///
158 	void longPoll(HTTPServerResponse res)
159 	{
160 		version(simulatePollErrors)
161 		{
162 			static int pollCount=0;
163 
164 			pollCount++;
165 
166 			debug writefln("pollcount: %s",pollCount);
167 
168 			if(pollCount % 3 == 0)
169 			{
170 				debug writefln("SIMULATE POLL ERROR");
171 
172 				throw new SockJsException("simulate packet loss");
173 			}
174 		}
175 
176 		scope(exit) resetTimeout();
177 
178 		m_timeoutTimer.stop();
179 
180 		if(isDataPending)
181 		{
182 			flushQueue(res);
183 		}
184 		else
185 		{
186 			m_pollTimeout.rearm(m_server.options.heartbeat_delay.msecs);
187 
188 			synchronized(m_timeoutMutex) m_pollCondition.wait();
189 
190 			m_pollTimeout.stop();
191 
192 			if(m_state == State.Closed)
193 			{
194 				try SockJsSyntax.writeClose(res, m_closeMsg);
195 				catch(Throwable e)
196 				{
197 					logError("closing error: %s",e);
198 				}
199 			}
200 			else
201 			{
202 				if(isDataPending)
203 				{
204 					//debug writefln("long poll signaled");
205 
206 					flushQueue(res);
207 				}
208 				else
209 				{
210 					SockJsSyntax.writeHeartbeat(res);
211 				}
212 			}
213 		}
214 	}
215 
216 	///
217 	void resetTimeout()
218 	{
219 		if(m_timeoutTimer && m_server !is null)
220 			m_timeoutTimer.rearm(m_server.options.disconnect_delay.msecs);
221 	}
222 
223 	///
224 	void pollTimeout()
225 	{
226 		if(m_pollCondition !is null)
227 			m_pollCondition.notifyAll();
228 	}
229 
230 	///
231 	void closeTimeout()
232 	{
233 		if(m_server !is null)
234 			m_server.connectionClosed(this);
235 	}
236 
237 	///
238 	@property const bool isDataPending() {synchronized(m_queueMutex){return m_outQueue.length > 0;}}
239 
240 	///
241 	package void handleRequest(bool _send, string _body, HTTPServerResponse res)
242 	{
243 		if(!_send)
244 		{
245 			longPoll(res);
246 		}
247 		else
248 		{
249 			version(simulateSendErrors)
250 			{
251 				static int sendCount=0;
252 
253 				sendCount++;
254 
255 				debug writefln("sendcount: %s",sendCount);
256 
257 				if(sendCount % 3 == 0)
258 				{
259 					debug writefln("SIMULATE SEND ERROR");
260 
261 					throw new Exception("simulate packet loss");
262 				}
263 			}
264 
265 			res.statusCode = 404;
266 
267 			scope(exit) res.writeBody("");
268 
269 			if(isOpen)
270 			{
271 				scope(success) res.statusCode = 204;
272 
273 				if(_body.length > 4)
274 				{
275 					auto arr = _body[2..$-2];
276 
277 					foreach(e; splitter(arr, regex(q"{","}")))
278 					{
279 						if(m_onData)
280 							m_onData(e);
281 					}
282 				}
283 			}
284 		}
285 	}
286 
287 private:
288 
289 	///
290 	public struct CloseMsg
291 	{
292 		int		code;
293 		string	msg;
294 	}
295 
296 	///
297 	enum State
298 	{
299 		Open,
300 		Closed,
301 	}
302 
303 	EventOnClose	m_onClose;
304 	EventOnMsg		m_onData;
305 
306 	string			m_userId;
307 	string			m_remotePeer;
308 	TaskMutex		m_timeoutMutex;
309 	TaskMutex		m_queueMutex;
310 	TaskCondition	m_pollCondition;
311 	string[]		m_outQueue;
312 	Server			m_server;
313 	State			m_state = State.Open;
314 	CloseMsg		m_closeMsg;
315 	Timer			m_timeoutTimer;
316 	Timer			m_pollTimeout;
317 	Timer			m_closeTimer;
318 }