server_client_thread.cpp

00001
00002 /***************************************************************************
00003  *  server_client_thread.cpp - Thread handling Fawkes network client
00004  *
00005  *  Created: Fri Nov 17 17:23:24 2006
00006  *  Copyright  2006-2007  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023
00024 #include <core/exceptions/system.h>
00025
00026 #include <netcomm/fawkes/server_client_thread.h>
00027 #include <netcomm/fawkes/server_thread.h>
00028 #include <netcomm/fawkes/message_queue.h>
00029 #include <netcomm/fawkes/transceiver.h>
00030 #include <netcomm/socket/stream.h>
00031 #include <netcomm/utils/exceptions.h>
00032 #include <core/threading/mutex.h>
00033 #include <core/threading/wait_condition.h>
00034
00035 #include <unistd.h>
00036
00037 namespace fawkes {
00038 
00039 /** @class FawkesNetworkServerClientSendThread <netcomm/fawkes/server_client_thread.h>
00040  * Sending thread for a Fawkes client connected to the server.
00041  * This thread is spawned for each client connected to the server to handle the
00042  * server-side sending
00043  * @ingroup NetComm
00044  * @author Tim Niemueller
00045  */
00046
00047 class FawkesNetworkServerClientSendThread
00048   : public Thread
00049 {
00050  public:
00051   /** Constructor.
00052    * @param s client stream socket
00053    * @param parent parent FawkesNetworkServerClientThread instance
00054    */
00055   FawkesNetworkServerClientSendThread(StreamSocket *s,
00056                                       FawkesNetworkServerClientThread *parent)
00057     : Thread("FawkesNetworkServerClientSendThread", Thread::OPMODE_WAITFORWAKEUP)
00058   {
00059     __s = s;
00060     __parent = parent;
00061     __outbound_mutex    = new Mutex();
00062     __outbound_msgqs[0] = new FawkesNetworkMessageQueue();
00063     __outbound_msgqs[1] = new FawkesNetworkMessageQueue();
00064     __outbound_active   = 0;
00065     __outbound_msgq     = __outbound_msgqs[0];
00066   }
00067 
00068   /** Destructor. */
00069   ~FawkesNetworkServerClientSendThread()
00070   {
00071     for (unsigned int i = 0; i < 2; ++i) {
00072       while ( ! __outbound_msgqs[i]->empty() ) {
00073         FawkesNetworkMessage *m = __outbound_msgqs[i]->front();
00074         m->unref();
00075         __outbound_msgqs[i]->pop();
00076       }
00077     }
00078     delete __outbound_msgqs[0];
00079     delete __outbound_msgqs[1];
00080     delete __outbound_mutex;
00081   }
00082
00083   virtual void loop()
00084   {
00085     if ( ! __parent->alive() )  return;
00086
00087     while ( __outbound_havemore ) {
00088       __outbound_mutex->lock();
00089       __outbound_havemore = false;
00090       FawkesNetworkMessageQueue *q = __outbound_msgq;
00091       __outbound_active = 1 - __outbound_active;
00092       __outbound_msgq = __outbound_msgqs[__outbound_active];
00093       __outbound_mutex->unlock();
00094
00095       if ( ! q->empty() ) {
00096         try {
00097           FawkesNetworkTransceiver::send(__s, q);
00098         } catch (ConnectionDiedException &e) {
00099           __parent->connection_died();
00100           exit();
00101         }
00102       }
00103     }
00104   }
00105
00106 
00107   /** Enqueue message to outbound queue.
00108    * This enqueues the given message to the outbound queue. The message will be send
00109    * in the next loop iteration.
00110    * @param msg message to enqueue
00111    */
00112   void enqueue(FawkesNetworkMessage *msg)
00113   {
00114     msg->ref();
00115     __outbound_mutex->lock();
00116     __outbound_msgq->push(msg);
00117     __outbound_havemore = true;
00118     __outbound_mutex->unlock();
00119     wakeup();
00120   }
00121
00122 
00123   /** Wait until all data has been sent. */
00124   void wait_for_all_sent()
00125   {
00126     loop_mutex->lock();
00127     loop_mutex->unlock();
00128   }
00129 
00130  /** Stub to see name in backtrace for easier debugging. @see Thread::run() */
00131  protected: virtual void run() { Thread::run(); }
00132
00133  private:
00134   StreamSocket                    *__s;
00135   FawkesNetworkServerClientThread *__parent;
00136
00137   Mutex                     *__outbound_mutex;
00138   unsigned int               __outbound_active;
00139   bool                       __outbound_havemore;
00140   FawkesNetworkMessageQueue *__outbound_msgq;
00141   FawkesNetworkMessageQueue *__outbound_msgqs[2];
00142
00143 };
00144
00145 
00146 /** @class FawkesNetworkServerClientThread netcomm/fawkes/server_client_thread.h
00147  * Fawkes Network Client Thread for server.
00148  * The FawkesNetworkServerThread spawns an instance of this class for every incoming
00149  * connection. It is then used to handle the client.
00150  * The thread will start another thread, an instance of
00151  * FawkesNetworkServerClientSendThread. This will be used to handle all outgoing
00152  * traffic.
00153  *
00154  * @ingroup NetComm
00155  * @author Tim Niemueller
00156  */
00157 
00158 /** Constructor.
00159  * @param s socket to client
00160  * @param parent parent network thread
00161  */
00162 FawkesNetworkServerClientThread::FawkesNetworkServerClientThread(StreamSocket *s,
00163                                                                  FawkesNetworkServerThread *parent)
00164   : Thread("FawkesNetworkServerClientThread")
00165 {
00166   _s = s;
00167   _parent = parent;
00168   _alive = true;
00169   _clid = 0;
00170   _inbound_queue = new FawkesNetworkMessageQueue();
00171
00172   _send_slave = new FawkesNetworkServerClientSendThread(_s, this);
00173
00174   set_prepfin_conc_loop(true);
00175 }
00176
00177 
00178 /** Destructor. */
00179 FawkesNetworkServerClientThread::~FawkesNetworkServerClientThread()
00180 {
00181   _send_slave->cancel();
00182   _send_slave->join();
00183   delete _send_slave;
00184   delete _s;
00185   delete _inbound_queue;
00186 }
00187
00188 
00189 /** Get client ID.
00190  * The client ID can be used to send replies.
00191  * @return client ID
00192  */
00193 unsigned int
00194 FawkesNetworkServerClientThread::clid() const
00195 {
00196   return _clid;
00197 }
00198
00199 
00200 /** Set client ID.
00201  * @param client_id new client ID
00202  */
00203 void
00204 FawkesNetworkServerClientThread::set_clid(unsigned int client_id)
00205 {
00206   _clid = client_id;
00207 }
00208
00209 
00210 /** Receive data.
00211  * Receives data from the network if there is any and then dispatches all
00212  * inbound messages via the parent FawkesNetworkThread::dispatch()
00213  */
00214 void
00215 FawkesNetworkServerClientThread::recv()
00216 {
00217   try {
00218     FawkesNetworkTransceiver::recv(_s, _inbound_queue);
00219
00220     _inbound_queue->lock();
00221     while ( ! _inbound_queue->empty() ) {
00222       FawkesNetworkMessage *m = _inbound_queue->front();
00223       m->set_client_id(_clid);
00224       _parent->dispatch(m);
00225       m->unref();
00226       _inbound_queue->pop();
00227     }
00228     _parent->wakeup();
00229     _inbound_queue->unlock();
00230
00231   } catch (ConnectionDiedException &e) {
00232     _alive = false;
00233     _s->close();
00234     _parent->wakeup();
00235   }
00236 }
00237
00238
00239 void
00240 FawkesNetworkServerClientThread::once()
00241 {
00242   _send_slave->start();
00243 }
00244
00245 
00246 /** Thread loop.
00247  * The client thread loop polls on the socket for 10 ms (wait for events
00248  * on the socket like closed connection or data that can be read). If any
00249  * event occurs it is processed. If the connection died or any other
00250  * error occured the thread is cancelled and the parent FawkesNetworkThread
00251  * is woken up to carry out any action that is needed when a client dies.
00252  * If data is available for reading thedata is received and dispatched
00253  * via recv().
00254  * Afterwards the outbound message queue is processed and alle messages are
00255  * sent. This is also done if the operation could block (POLL_OUT is not
00256  * honored).
00257  */
00258 void
00259 FawkesNetworkServerClientThread::loop()
00260 {
00261   if ( ! _alive) {
00262     usleep(1000000);
00263     return;
00264   }
00265
00266   short p = 0;
00267   try {
00268     p = _s->poll(); // block until we got a message
00269   } catch (InterruptedException &e) {
00270     // we just ignore this and try it again
00271     return;
00272   }
00273
00274   if ( (p & Socket::POLL_ERR) ||
00275        (p & Socket::POLL_HUP) ||
00276        (p & Socket::POLL_RDHUP)) {
00277     _alive = false;
00278     _parent->wakeup();
00279   } else if ( p & Socket::POLL_IN ) {
00280     // Data can be read
00281     recv();
00282   }
00283 }
00284 
00285 /** Enqueue message to outbound queue.
00286  * This enqueues the given message to the outbound queue. The message will be send
00287  * in the next loop iteration.
00288  * @param msg message to enqueue
00289  */
00290 void
00291 FawkesNetworkServerClientThread::enqueue(FawkesNetworkMessage *msg)
00292 {
00293   _send_slave->enqueue(msg);
00294 }
00295
00296 
00297 /** Check aliveness of connection.
00298  * @return true if connection is still alive, false otherwise.
00299  */
00300 bool
00301 FawkesNetworkServerClientThread::alive() const
00302 {
00303   return _alive;
00304 }
00305
00306 
00307 /** Force sending of all pending outbound messages.
00308  * This is a blocking operation. The current poll will be interrupted by sending
00309  * a signal to this thread (and ignoring it) and then wait for the sending to
00310  * finish.
00311  */
00312 void
00313 FawkesNetworkServerClientThread::force_send()
00314 {
00315   _send_slave->wait_for_all_sent();
00316 }
00317
00318 
00319 /** Connection died notification.
00320  * To be called only be the send slave thread.
00321  */
00322 void
00323 FawkesNetworkServerClientThread::connection_died()
00324 {
00325   _alive = false;
00326   _parent->wakeup();
00327 }
00328
00329 } // end namespace fawkes