server_client_thread.cpp
00001 00002 /*************************************************************************** 00003 * server_client_thread.cpp - Thread handling Fawkes network client 00004 * 00005 * Created: Fri Nov 17 17:23:24 2006 00006 * Copyright 2006-2007 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <core/exceptions/system.h> 00025 00026 #include <netcomm/fawkes/server_client_thread.h> 00027 #include <netcomm/fawkes/server_thread.h> 00028 #include <netcomm/fawkes/message_queue.h> 00029 #include <netcomm/fawkes/transceiver.h> 00030 #include <netcomm/socket/stream.h> 00031 #include <netcomm/utils/exceptions.h> 00032 #include <core/threading/mutex.h> 00033 #include <core/threading/wait_condition.h> 00034 00035 #include <unistd.h> 00036 00037 namespace fawkes { 00038 00039 /** @class FawkesNetworkServerClientSendThread <netcomm/fawkes/server_client_thread.h> 00040 * Sending thread for a Fawkes client connected to the server. 00041 * This thread is spawned for each client connected to the server to handle the 00042 * server-side sending 00043 * @ingroup NetComm 00044 * @author Tim Niemueller 00045 */ 00046 00047 class FawkesNetworkServerClientSendThread 00048 : public Thread 00049 { 00050 public: 00051 /** Constructor. 00052 * @param s client stream socket 00053 * @param parent parent FawkesNetworkServerClientThread instance 00054 */ 00055 FawkesNetworkServerClientSendThread(StreamSocket *s, 00056 FawkesNetworkServerClientThread *parent) 00057 : Thread("FawkesNetworkServerClientSendThread", Thread::OPMODE_WAITFORWAKEUP) 00058 { 00059 __s = s; 00060 __parent = parent; 00061 __outbound_mutex = new Mutex(); 00062 __outbound_msgqs[0] = new FawkesNetworkMessageQueue(); 00063 __outbound_msgqs[1] = new FawkesNetworkMessageQueue(); 00064 __outbound_active = 0; 00065 __outbound_msgq = __outbound_msgqs[0]; 00066 } 00067 00068 /** Destructor. */ 00069 ~FawkesNetworkServerClientSendThread() 00070 { 00071 for (unsigned int i = 0; i < 2; ++i) { 00072 while ( ! __outbound_msgqs[i]->empty() ) { 00073 FawkesNetworkMessage *m = __outbound_msgqs[i]->front(); 00074 m->unref(); 00075 __outbound_msgqs[i]->pop(); 00076 } 00077 } 00078 delete __outbound_msgqs[0]; 00079 delete __outbound_msgqs[1]; 00080 delete __outbound_mutex; 00081 } 00082 00083 virtual void loop() 00084 { 00085 if ( ! __parent->alive() ) return; 00086 00087 while ( __outbound_havemore ) { 00088 __outbound_mutex->lock(); 00089 __outbound_havemore = false; 00090 FawkesNetworkMessageQueue *q = __outbound_msgq; 00091 __outbound_active = 1 - __outbound_active; 00092 __outbound_msgq = __outbound_msgqs[__outbound_active]; 00093 __outbound_mutex->unlock(); 00094 00095 if ( ! q->empty() ) { 00096 try { 00097 FawkesNetworkTransceiver::send(__s, q); 00098 } catch (ConnectionDiedException &e) { 00099 __parent->connection_died(); 00100 exit(); 00101 } 00102 } 00103 } 00104 } 00105 00106 00107 /** Enqueue message to outbound queue. 00108 * This enqueues the given message to the outbound queue. The message will be send 00109 * in the next loop iteration. 00110 * @param msg message to enqueue 00111 */ 00112 void enqueue(FawkesNetworkMessage *msg) 00113 { 00114 msg->ref(); 00115 __outbound_mutex->lock(); 00116 __outbound_msgq->push(msg); 00117 __outbound_havemore = true; 00118 __outbound_mutex->unlock(); 00119 wakeup(); 00120 } 00121 00122 00123 /** Wait until all data has been sent. */ 00124 void wait_for_all_sent() 00125 { 00126 loop_mutex->lock(); 00127 loop_mutex->unlock(); 00128 } 00129 00130 /** Stub to see name in backtrace for easier debugging. @see Thread::run() */ 00131 protected: virtual void run() { Thread::run(); } 00132 00133 private: 00134 StreamSocket *__s; 00135 FawkesNetworkServerClientThread *__parent; 00136 00137 Mutex *__outbound_mutex; 00138 unsigned int __outbound_active; 00139 bool __outbound_havemore; 00140 FawkesNetworkMessageQueue *__outbound_msgq; 00141 FawkesNetworkMessageQueue *__outbound_msgqs[2]; 00142 00143 }; 00144 00145 00146 /** @class FawkesNetworkServerClientThread netcomm/fawkes/server_client_thread.h 00147 * Fawkes Network Client Thread for server. 00148 * The FawkesNetworkServerThread spawns an instance of this class for every incoming 00149 * connection. It is then used to handle the client. 00150 * The thread will start another thread, an instance of 00151 * FawkesNetworkServerClientSendThread. This will be used to handle all outgoing 00152 * traffic. 00153 * 00154 * @ingroup NetComm 00155 * @author Tim Niemueller 00156 */ 00157 00158 /** Constructor. 00159 * @param s socket to client 00160 * @param parent parent network thread 00161 */ 00162 FawkesNetworkServerClientThread::FawkesNetworkServerClientThread(StreamSocket *s, 00163 FawkesNetworkServerThread *parent) 00164 : Thread("FawkesNetworkServerClientThread") 00165 { 00166 _s = s; 00167 _parent = parent; 00168 _alive = true; 00169 _clid = 0; 00170 _inbound_queue = new FawkesNetworkMessageQueue(); 00171 00172 _send_slave = new FawkesNetworkServerClientSendThread(_s, this); 00173 00174 set_prepfin_conc_loop(true); 00175 } 00176 00177 00178 /** Destructor. */ 00179 FawkesNetworkServerClientThread::~FawkesNetworkServerClientThread() 00180 { 00181 _send_slave->cancel(); 00182 _send_slave->join(); 00183 delete _send_slave; 00184 delete _s; 00185 delete _inbound_queue; 00186 } 00187 00188 00189 /** Get client ID. 00190 * The client ID can be used to send replies. 00191 * @return client ID 00192 */ 00193 unsigned int 00194 FawkesNetworkServerClientThread::clid() const 00195 { 00196 return _clid; 00197 } 00198 00199 00200 /** Set client ID. 00201 * @param client_id new client ID 00202 */ 00203 void 00204 FawkesNetworkServerClientThread::set_clid(unsigned int client_id) 00205 { 00206 _clid = client_id; 00207 } 00208 00209 00210 /** Receive data. 00211 * Receives data from the network if there is any and then dispatches all 00212 * inbound messages via the parent FawkesNetworkThread::dispatch() 00213 */ 00214 void 00215 FawkesNetworkServerClientThread::recv() 00216 { 00217 try { 00218 FawkesNetworkTransceiver::recv(_s, _inbound_queue); 00219 00220 _inbound_queue->lock(); 00221 while ( ! _inbound_queue->empty() ) { 00222 FawkesNetworkMessage *m = _inbound_queue->front(); 00223 m->set_client_id(_clid); 00224 _parent->dispatch(m); 00225 m->unref(); 00226 _inbound_queue->pop(); 00227 } 00228 _parent->wakeup(); 00229 _inbound_queue->unlock(); 00230 00231 } catch (ConnectionDiedException &e) { 00232 _alive = false; 00233 _s->close(); 00234 _parent->wakeup(); 00235 } 00236 } 00237 00238 00239 void 00240 FawkesNetworkServerClientThread::once() 00241 { 00242 _send_slave->start(); 00243 } 00244 00245 00246 /** Thread loop. 00247 * The client thread loop polls on the socket for 10 ms (wait for events 00248 * on the socket like closed connection or data that can be read). If any 00249 * event occurs it is processed. If the connection died or any other 00250 * error occured the thread is cancelled and the parent FawkesNetworkThread 00251 * is woken up to carry out any action that is needed when a client dies. 00252 * If data is available for reading thedata is received and dispatched 00253 * via recv(). 00254 * Afterwards the outbound message queue is processed and alle messages are 00255 * sent. This is also done if the operation could block (POLL_OUT is not 00256 * honored). 00257 */ 00258 void 00259 FawkesNetworkServerClientThread::loop() 00260 { 00261 if ( ! _alive) { 00262 usleep(1000000); 00263 return; 00264 } 00265 00266 short p = 0; 00267 try { 00268 p = _s->poll(); // block until we got a message 00269 } catch (InterruptedException &e) { 00270 // we just ignore this and try it again 00271 return; 00272 } 00273 00274 if ( (p & Socket::POLL_ERR) || 00275 (p & Socket::POLL_HUP) || 00276 (p & Socket::POLL_RDHUP)) { 00277 _alive = false; 00278 _parent->wakeup(); 00279 } else if ( p & Socket::POLL_IN ) { 00280 // Data can be read 00281 recv(); 00282 } 00283 } 00284 00285 /** Enqueue message to outbound queue. 00286 * This enqueues the given message to the outbound queue. The message will be send 00287 * in the next loop iteration. 00288 * @param msg message to enqueue 00289 */ 00290 void 00291 FawkesNetworkServerClientThread::enqueue(FawkesNetworkMessage *msg) 00292 { 00293 _send_slave->enqueue(msg); 00294 } 00295 00296 00297 /** Check aliveness of connection. 00298 * @return true if connection is still alive, false otherwise. 00299 */ 00300 bool 00301 FawkesNetworkServerClientThread::alive() const 00302 { 00303 return _alive; 00304 } 00305 00306 00307 /** Force sending of all pending outbound messages. 00308 * This is a blocking operation. The current poll will be interrupted by sending 00309 * a signal to this thread (and ignoring it) and then wait for the sending to 00310 * finish. 00311 */ 00312 void 00313 FawkesNetworkServerClientThread::force_send() 00314 { 00315 _send_slave->wait_for_all_sent(); 00316 } 00317 00318 00319 /** Connection died notification. 00320 * To be called only be the send slave thread. 00321 */ 00322 void 00323 FawkesNetworkServerClientThread::connection_died() 00324 { 00325 _alive = false; 00326 _parent->wakeup(); 00327 } 00328 00329 } // end namespace fawkes

