server_thread.cpp

00001
00002 /***************************************************************************
00003  *  server_thread.cpp - Fawkes Network Protocol (server part)
00004  *
00005  *  Created: Sun Nov 19 15:08:30 2006
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023
00024 #include <netcomm/fawkes/server_thread.h>
00025 #include <netcomm/fawkes/server_client_thread.h>
00026 #include <netcomm/utils/acceptor_thread.h>
00027 #include <netcomm/fawkes/message.h>
00028 #include <netcomm/fawkes/handler.h>
00029 #include <netcomm/fawkes/message_queue.h>
00030 #include <netcomm/fawkes/message_content.h>
00031 #include <core/threading/thread_collector.h>
00032 #include <core/threading/mutex.h>
00033 #include <core/exception.h>
00034
00035 #include <unistd.h>
00036
00037 namespace fawkes {
00038 
00039 /** @class FawkesNetworkServerThread <netcomm/fawkes/server_thread.h>
00040  * Fawkes Network Thread.
00041  * Maintains a list of clients and reacts on events triggered by the clients.
00042  * Also runs the acceptor thread.
00043  *
00044  * @ingroup NetComm
00045  * @author Tim Niemueller
00046  */
00047 
00048 /** Constructor.
00049  * @param thread_collector thread collector to register new threads with
00050  * @param fawkes_port port for Fawkes network protocol
00051  */
00052 FawkesNetworkServerThread::FawkesNetworkServerThread(unsigned int fawkes_port,
00053                                                      ThreadCollector *thread_collector)
00054   : Thread("FawkesNetworkServerThread", Thread::OPMODE_WAITFORWAKEUP)
00055 {
00056   this->thread_collector = thread_collector;
00057   clients.clear();
00058   next_client_id = 1;
00059   inbound_messages = new FawkesNetworkMessageQueue();
00060
00061   acceptor_thread = new NetworkAcceptorThread(this, fawkes_port,
00062                                               "FawkesNetworkAcceptorThread");
00063   if ( thread_collector ) {
00064     thread_collector->add(acceptor_thread);
00065   } else {
00066     acceptor_thread->start();
00067   }
00068 }
00069
00070 
00071 /** Destructor. */
00072 FawkesNetworkServerThread::~FawkesNetworkServerThread()
00073 {
00074   for (cit = clients.begin(); cit != clients.end(); ++cit) {
00075     if ( thread_collector ) {
00076       thread_collector->remove((*cit).second);
00077     } else {
00078       (*cit).second->cancel();
00079       (*cit).second->join();
00080     }
00081     delete (*cit).second;
00082   }
00083   if ( thread_collector ) {
00084     thread_collector->remove(acceptor_thread);
00085   } else {
00086     acceptor_thread->cancel();
00087     acceptor_thread->join();
00088   }
00089   delete acceptor_thread;
00090
00091   delete inbound_messages;
00092 }
00093
00094 
00095 /** Add a new connection.
00096  * Called by the NetworkAcceptorThread if a new client connected.
00097  * @param s socket for new client
00098  */
00099 void
00100 FawkesNetworkServerThread::add_connection(StreamSocket *s) throw()
00101 {
00102   FawkesNetworkServerClientThread *client = new FawkesNetworkServerClientThread(s, this);
00103
00104   clients.lock();
00105   client->set_clid(next_client_id);
00106   if ( thread_collector ) {
00107     thread_collector->add(client);
00108   } else {
00109     client->start();
00110   }
00111   clients[next_client_id] = client;
00112   for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
00113     (*hit).second->client_connected(next_client_id);
00114   }
00115   ++next_client_id;
00116   clients.unlock();
00117
00118   wakeup();
00119 }
00120
00121 
00122 /** Add a handler.
00123  * @param handler to add.
00124  */
00125 void
00126 FawkesNetworkServerThread::add_handler(FawkesNetworkHandler *handler)
00127 {
00128   handlers.lock();
00129   if ( handlers.find(handler->id()) != handlers.end()) {
00130     handlers.unlock();
00131     throw Exception("Handler already registered");
00132   }
00133   handlers[handler->id()] = handler;
00134   handlers.unlock();
00135 }
00136
00137 
00138 /** Remove handler.
00139  * @param handler handler to remove
00140  */
00141 void
00142 FawkesNetworkServerThread::remove_handler(FawkesNetworkHandler *handler)
00143 {
00144   handlers.lock();
00145   if( handlers.find(handler->id()) != handlers.end() ) {
00146     handlers.erase(handler->id());
00147   }
00148   handlers.unlock();
00149 }
00150
00151 
00152 /** Fawkes network thread loop.
00153  * The thread loop will check all clients for their alivness and dead
00154  * clients are removed. Then inbound messages are processed and dispatched
00155  * properly to registered handlers. Then the thread waits for a new event
00156  * to happen (event emitting threads need to wakeup this thread!).
00157  */
00158 void
00159 FawkesNetworkServerThread::loop()
00160 {
00161   clients.lock();
00162
00163   // check for dead clients
00164   cit = clients.begin();
00165   while (cit != clients.end()) {
00166     if ( ! cit->second->alive() ) {
00167       if ( thread_collector ) {
00168         thread_collector->remove((*cit).second);
00169       } else {
00170         cit->second->cancel();
00171         cit->second->join();
00172       }
00173       usleep(5000);
00174       delete cit->second;
00175       unsigned int clid = (*cit).first;
00176       ++cit;
00177       clients.erase(clid);
00178       for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
00179         (*hit).second->client_disconnected(clid);
00180       }
00181     } else {
00182       ++cit;
00183     }
00184   }
00185
00186   // dispatch messages
00187   inbound_messages->lock();
00188   while ( ! inbound_messages->empty() ) {
00189     FawkesNetworkMessage *m = inbound_messages->front();
00190     if ( handlers.find(m->cid()) != handlers.end()) {
00191       handlers[m->cid()]->handle_network_message(m);
00192     }
00193     m->unref();
00194     inbound_messages->pop();
00195   }
00196   inbound_messages->unlock();
00197
00198   clients.unlock();
00199 }
00200
00201 
00202 /** Force sending of all pending messages. */
00203 void
00204 FawkesNetworkServerThread::force_send()
00205 {
00206   clients.lock();
00207   for (cit = clients.begin(); cit != clients.end(); ++cit) {
00208     (*cit).second->force_send();
00209   }
00210   clients.unlock();
00211 }
00212
00213 
00214 /** Broadcast a message.
00215  * Method to broadcast a message to all connected clients.
00216  * @param msg Message to broadcast
00217  */
00218 void
00219 FawkesNetworkServerThread::broadcast(FawkesNetworkMessage *msg)
00220 {
00221   for (cit = clients.begin(); cit != clients.end(); ++cit) {
00222     if ( (*cit).second->alive() ) {
00223       (*cit).second->enqueue(msg);
00224     }
00225   }
00226 }
00227
00228 
00229 /** Broadcast a message.
00230  * A FawkesNetworkMessage is created and broacasted via the emitter.
00231  * @param component_id component ID
00232  * @param msg_id message type id
00233  * @param payload payload buffer
00234  * @param payload_size size of payload buffer
00235  * @see FawkesNetworkEmitter::broadcast()
00236  */
00237 void
00238 FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id,
00239                                void *payload, unsigned int payload_size)
00240 {
00241   FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id,
00242                                                      payload, payload_size);
00243   broadcast(m);
00244   m->unref();
00245 }
00246
00247 
00248 /** Broadcast message without payload.
00249  * @param component_id component ID
00250  * @param msg_id message type ID
00251  */
00252 void
00253 FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id)
00254 {
00255   FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id);
00256   broadcast(m);
00257   m->unref();
00258 }
00259
00260 
00261 /** Send a message.
00262  * Method to send a message to a specific client.
00263  * The client ID provided in the message is used to determine the correct
00264  * recipient. If no client is connected for the given client ID the message
00265  * shall be silently ignored.
00266  * Implemented Emitter interface message.
00267  * @param msg Message to send
00268  */
00269 void
00270 FawkesNetworkServerThread::send(FawkesNetworkMessage *msg)
00271 {
00272   unsigned int clid = msg->clid();
00273   if ( clients.find(clid) != clients.end() ) {
00274     if ( clients[clid]->alive() ) {
00275       clients[clid]->enqueue(msg);
00276     } else {
00277       throw Exception("Client %u not alive", clid);
00278     }
00279   } else {
00280     throw Exception("Client %u not found", clid);
00281   }
00282 }
00283
00284 
00285 /** Send a message.
00286  * A FawkesNetworkMessage is created and sent via the emitter.
00287  * @param to_clid client ID of recipient
00288  * @param component_id component ID
00289  * @param msg_id message type id
00290  * @param payload payload buffer
00291  * @param payload_size size of payload buffer
00292  * @see FawkesNetworkEmitter::broadcast()
00293  */
00294 void
00295 FawkesNetworkServerThread::send(unsigned int to_clid,
00296                           unsigned short int component_id, unsigned short int msg_id,
00297                            void *payload, unsigned int payload_size)
00298 {
00299   FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
00300                                                      payload, payload_size);
00301   send(m);
00302   m->unref();
00303 }
00304
00305 
00306 /** Send a message.
00307  * A FawkesNetworkMessage is created and sent via the emitter.
00308  * @param to_clid client ID of recipient
00309  * @param component_id component ID
00310  * @param msg_id message type id
00311  * @param content Fawkes complex network message content
00312  * @see FawkesNetworkEmitter::broadcast()
00313  */
00314 void
00315 FawkesNetworkServerThread::send(unsigned int to_clid,
00316                           unsigned short int component_id, unsigned short int msg_id,
00317                           FawkesNetworkMessageContent *content)
00318 {
00319   FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
00320                                                      content);
00321   send(m);
00322   m->unref();
00323 }
00324
00325 
00326 /** Send a message without payload.
00327  * A FawkesNetworkMessage with empty payload is created and sent via the emitter.
00328  * This is particularly useful for simple status messages that you want to send.
00329  * @param to_clid client ID of recipient
00330  * @param component_id component ID
00331  * @param msg_id message type id
00332  * @see FawkesNetworkEmitter::broadcast()
00333  */
00334 void
00335 FawkesNetworkServerThread::send(unsigned int to_clid,
00336                           unsigned short int component_id, unsigned short int msg_id)
00337 {
00338   FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id);
00339   send(m);
00340   m->unref();
00341 }
00342
00343 
00344 /** Dispatch messages.
00345  * Actually messages are just put into the inbound message queue and dispatched
00346  * during the next loop iteration. So after adding all the messages you have
00347  * to wakeup the thread to get them actually dispatched.
00348  * @param msg message to dispatch
00349  */
00350 void
00351 FawkesNetworkServerThread::dispatch(FawkesNetworkMessage *msg)
00352 {
00353   msg->ref();
00354   inbound_messages->push_locked(msg);
00355 }
00356
00357 } // end namespace fawkes