server_thread.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <netcomm/fawkes/server_thread.h>
00025 #include <netcomm/fawkes/server_client_thread.h>
00026 #include <netcomm/utils/acceptor_thread.h>
00027 #include <netcomm/fawkes/message.h>
00028 #include <netcomm/fawkes/handler.h>
00029 #include <netcomm/fawkes/message_queue.h>
00030 #include <netcomm/fawkes/message_content.h>
00031 #include <core/threading/thread_collector.h>
00032 #include <core/threading/mutex.h>
00033 #include <core/exception.h>
00034
00035 #include <unistd.h>
00036
00037 namespace fawkes {
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052 FawkesNetworkServerThread::FawkesNetworkServerThread(unsigned int fawkes_port,
00053 ThreadCollector *thread_collector)
00054 : Thread("FawkesNetworkServerThread", Thread::OPMODE_WAITFORWAKEUP)
00055 {
00056 this->thread_collector = thread_collector;
00057 clients.clear();
00058 next_client_id = 1;
00059 inbound_messages = new FawkesNetworkMessageQueue();
00060
00061 acceptor_thread = new NetworkAcceptorThread(this, fawkes_port,
00062 "FawkesNetworkAcceptorThread");
00063 if ( thread_collector ) {
00064 thread_collector->add(acceptor_thread);
00065 } else {
00066 acceptor_thread->start();
00067 }
00068 }
00069
00070
00071
00072 FawkesNetworkServerThread::~FawkesNetworkServerThread()
00073 {
00074 for (cit = clients.begin(); cit != clients.end(); ++cit) {
00075 if ( thread_collector ) {
00076 thread_collector->remove((*cit).second);
00077 } else {
00078 (*cit).second->cancel();
00079 (*cit).second->join();
00080 }
00081 delete (*cit).second;
00082 }
00083 if ( thread_collector ) {
00084 thread_collector->remove(acceptor_thread);
00085 } else {
00086 acceptor_thread->cancel();
00087 acceptor_thread->join();
00088 }
00089 delete acceptor_thread;
00090
00091 delete inbound_messages;
00092 }
00093
00094
00095
00096
00097
00098
00099 void
00100 FawkesNetworkServerThread::add_connection(StreamSocket *s) throw()
00101 {
00102 FawkesNetworkServerClientThread *client = new FawkesNetworkServerClientThread(s, this);
00103
00104 clients.lock();
00105 client->set_clid(next_client_id);
00106 if ( thread_collector ) {
00107 thread_collector->add(client);
00108 } else {
00109 client->start();
00110 }
00111 clients[next_client_id] = client;
00112 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
00113 (*hit).second->client_connected(next_client_id);
00114 }
00115 ++next_client_id;
00116 clients.unlock();
00117
00118 wakeup();
00119 }
00120
00121
00122
00123
00124
00125 void
00126 FawkesNetworkServerThread::add_handler(FawkesNetworkHandler *handler)
00127 {
00128 handlers.lock();
00129 if ( handlers.find(handler->id()) != handlers.end()) {
00130 handlers.unlock();
00131 throw Exception("Handler already registered");
00132 }
00133 handlers[handler->id()] = handler;
00134 handlers.unlock();
00135 }
00136
00137
00138
00139
00140
00141 void
00142 FawkesNetworkServerThread::remove_handler(FawkesNetworkHandler *handler)
00143 {
00144 handlers.lock();
00145 if( handlers.find(handler->id()) != handlers.end() ) {
00146 handlers.erase(handler->id());
00147 }
00148 handlers.unlock();
00149 }
00150
00151
00152
00153
00154
00155
00156
00157
00158 void
00159 FawkesNetworkServerThread::loop()
00160 {
00161 clients.lock();
00162
00163
00164 cit = clients.begin();
00165 while (cit != clients.end()) {
00166 if ( ! cit->second->alive() ) {
00167 if ( thread_collector ) {
00168 thread_collector->remove((*cit).second);
00169 } else {
00170 cit->second->cancel();
00171 cit->second->join();
00172 }
00173 usleep(5000);
00174 delete cit->second;
00175 unsigned int clid = (*cit).first;
00176 ++cit;
00177 clients.erase(clid);
00178 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
00179 (*hit).second->client_disconnected(clid);
00180 }
00181 } else {
00182 ++cit;
00183 }
00184 }
00185
00186
00187 inbound_messages->lock();
00188 while ( ! inbound_messages->empty() ) {
00189 FawkesNetworkMessage *m = inbound_messages->front();
00190 if ( handlers.find(m->cid()) != handlers.end()) {
00191 handlers[m->cid()]->handle_network_message(m);
00192 }
00193 m->unref();
00194 inbound_messages->pop();
00195 }
00196 inbound_messages->unlock();
00197
00198 clients.unlock();
00199 }
00200
00201
00202
00203 void
00204 FawkesNetworkServerThread::force_send()
00205 {
00206 clients.lock();
00207 for (cit = clients.begin(); cit != clients.end(); ++cit) {
00208 (*cit).second->force_send();
00209 }
00210 clients.unlock();
00211 }
00212
00213
00214
00215
00216
00217
00218 void
00219 FawkesNetworkServerThread::broadcast(FawkesNetworkMessage *msg)
00220 {
00221 for (cit = clients.begin(); cit != clients.end(); ++cit) {
00222 if ( (*cit).second->alive() ) {
00223 (*cit).second->enqueue(msg);
00224 }
00225 }
00226 }
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237 void
00238 FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id,
00239 void *payload, unsigned int payload_size)
00240 {
00241 FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id,
00242 payload, payload_size);
00243 broadcast(m);
00244 m->unref();
00245 }
00246
00247
00248
00249
00250
00251
00252 void
00253 FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id)
00254 {
00255 FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id);
00256 broadcast(m);
00257 m->unref();
00258 }
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269 void
00270 FawkesNetworkServerThread::send(FawkesNetworkMessage *msg)
00271 {
00272 unsigned int clid = msg->clid();
00273 if ( clients.find(clid) != clients.end() ) {
00274 if ( clients[clid]->alive() ) {
00275 clients[clid]->enqueue(msg);
00276 } else {
00277 throw Exception("Client %u not alive", clid);
00278 }
00279 } else {
00280 throw Exception("Client %u not found", clid);
00281 }
00282 }
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294 void
00295 FawkesNetworkServerThread::send(unsigned int to_clid,
00296 unsigned short int component_id, unsigned short int msg_id,
00297 void *payload, unsigned int payload_size)
00298 {
00299 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
00300 payload, payload_size);
00301 send(m);
00302 m->unref();
00303 }
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314 void
00315 FawkesNetworkServerThread::send(unsigned int to_clid,
00316 unsigned short int component_id, unsigned short int msg_id,
00317 FawkesNetworkMessageContent *content)
00318 {
00319 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
00320 content);
00321 send(m);
00322 m->unref();
00323 }
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334 void
00335 FawkesNetworkServerThread::send(unsigned int to_clid,
00336 unsigned short int component_id, unsigned short int msg_id)
00337 {
00338 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id);
00339 send(m);
00340 m->unref();
00341 }
00342
00343
00344
00345
00346
00347
00348
00349
00350 void
00351 FawkesNetworkServerThread::dispatch(FawkesNetworkMessage *msg)
00352 {
00353 msg->ref();
00354 inbound_messages->push_locked(msg);
00355 }
00356
00357 }