client.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <netcomm/fawkes/client.h>
00025 #include <netcomm/fawkes/client_handler.h>
00026 #include <netcomm/fawkes/message_queue.h>
00027 #include <netcomm/fawkes/transceiver.h>
00028 #include <netcomm/socket/stream.h>
00029 #include <netcomm/utils/exceptions.h>
00030
00031 #include <core/threading/thread.h>
00032 #include <core/threading/mutex.h>
00033 #include <core/threading/mutex_locker.h>
00034 #include <core/threading/wait_condition.h>
00035 #include <core/exceptions/system.h>
00036
00037 #include <list>
00038 #include <cstring>
00039 #include <cstdlib>
00040 #include <unistd.h>
00041
00042 namespace fawkes {
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053 HandlerAlreadyRegisteredException::HandlerAlreadyRegisteredException()
00054 : Exception("A handler for this component has already been registered")
00055 {
00056 }
00057
00058
00059
00060
00061
00062
00063
00064
00065 class FawkesNetworkClientSendThread : public Thread
00066 {
00067 public:
00068
00069
00070
00071
00072
00073 FawkesNetworkClientSendThread(StreamSocket *s, FawkesNetworkClient *parent)
00074 : Thread("FawkesNetworkClientSendThread", Thread::OPMODE_WAITFORWAKEUP)
00075 {
00076 __s = s;
00077 __parent = parent;
00078 __outbound_mutex = new Mutex();
00079 __outbound_msgqs[0] = new FawkesNetworkMessageQueue();
00080 __outbound_msgqs[1] = new FawkesNetworkMessageQueue();
00081 __outbound_active = 0;
00082 __outbound_msgq = __outbound_msgqs[0];
00083 }
00084
00085
00086 ~FawkesNetworkClientSendThread()
00087 {
00088 for (unsigned int i = 0; i < 2; ++i) {
00089 while ( ! __outbound_msgqs[i]->empty() ) {
00090 FawkesNetworkMessage *m = __outbound_msgqs[i]->front();
00091 m->unref();
00092 __outbound_msgqs[i]->pop();
00093 }
00094 }
00095 delete __outbound_msgqs[0];
00096 delete __outbound_msgqs[1];
00097 delete __outbound_mutex;
00098 }
00099
00100 virtual void once()
00101 {
00102 __parent->set_send_slave_alive();
00103 }
00104
00105 virtual void loop()
00106 {
00107 if ( ! __parent->connected() ) return;
00108
00109 while ( __outbound_havemore ) {
00110 __outbound_mutex->lock();
00111 __outbound_havemore = false;
00112 FawkesNetworkMessageQueue *q = __outbound_msgq;
00113 __outbound_active = 1 - __outbound_active;
00114 __outbound_msgq = __outbound_msgqs[__outbound_active];
00115 __outbound_mutex->unlock();
00116
00117 if ( ! q->empty() ) {
00118 try {
00119 FawkesNetworkTransceiver::send(__s, q);
00120 } catch (ConnectionDiedException &e) {
00121 __parent->connection_died();
00122 exit();
00123 }
00124 }
00125 }
00126 }
00127
00128
00129
00130
00131 void force_send()
00132 {
00133 if ( loop_mutex->try_lock() ) {
00134 loop();
00135 loop_mutex->unlock();
00136 }
00137 }
00138
00139
00140
00141
00142 void enqueue(FawkesNetworkMessage *message)
00143 {
00144 message->ref();
00145 __outbound_mutex->lock();
00146 __outbound_msgq->push(message);
00147 __outbound_havemore = true;
00148 __outbound_mutex->unlock();
00149 wakeup();
00150 }
00151
00152
00153 protected: virtual void run() { Thread::run(); }
00154
00155 private:
00156 StreamSocket *__s;
00157 FawkesNetworkClient *__parent;
00158 Mutex *__outbound_mutex;
00159 unsigned int __outbound_active;
00160 bool __outbound_havemore;
00161 FawkesNetworkMessageQueue *__outbound_msgq;
00162 FawkesNetworkMessageQueue *__outbound_msgqs[2];
00163
00164 };
00165
00166
00167
00168
00169
00170
00171
00172
00173 class FawkesNetworkClientRecvThread : public Thread
00174 {
00175 public:
00176
00177
00178
00179
00180
00181 FawkesNetworkClientRecvThread(StreamSocket *s, FawkesNetworkClient *parent,
00182 Mutex *recv_mutex)
00183 : Thread("FawkesNetworkClientRecvThread")
00184 {
00185 __s = s;
00186 __parent = parent;
00187 __inbound_msgq = new FawkesNetworkMessageQueue();
00188 __recv_mutex = recv_mutex;
00189 }
00190
00191
00192 ~FawkesNetworkClientRecvThread()
00193 {
00194 while ( ! __inbound_msgq->empty() ) {
00195 FawkesNetworkMessage *m = __inbound_msgq->front();
00196 m->unref();
00197 __inbound_msgq->pop();
00198 }
00199 delete __inbound_msgq;
00200 }
00201
00202
00203 void recv()
00204 {
00205 std::list<unsigned int> wakeup_list;
00206
00207 try {
00208 FawkesNetworkTransceiver::recv(__s, __inbound_msgq);
00209
00210 MutexLocker lock(__recv_mutex);
00211
00212 __inbound_msgq->lock();
00213 while ( ! __inbound_msgq->empty() ) {
00214 FawkesNetworkMessage *m = __inbound_msgq->front();
00215 wakeup_list.push_back(m->cid());
00216 __parent->dispatch_message(m);
00217 m->unref();
00218 __inbound_msgq->pop();
00219 }
00220 __inbound_msgq->unlock();
00221
00222 lock.unlock();
00223
00224 wakeup_list.sort();
00225 wakeup_list.unique();
00226 for (std::list<unsigned int>::iterator i = wakeup_list.begin(); i != wakeup_list.end(); ++i) {
00227 __parent->wake_handlers(*i);
00228 }
00229 } catch (ConnectionDiedException &e) {
00230 throw;
00231 }
00232 }
00233
00234 virtual void once()
00235 {
00236 __parent->set_recv_slave_alive();
00237 }
00238
00239 virtual void loop()
00240 {
00241
00242 if (! __s ) return;
00243
00244 short p = 0;
00245 try {
00246 p = __s->poll();
00247 } catch (InterruptedException &e) {
00248 return;
00249 }
00250
00251 if ( (p & Socket::POLL_ERR) ||
00252 (p & Socket::POLL_HUP) ||
00253 (p & Socket::POLL_RDHUP)) {
00254 __parent->connection_died();
00255 exit();
00256 } else if ( p & Socket::POLL_IN ) {
00257
00258 try {
00259 recv();
00260 } catch (ConnectionDiedException &e) {
00261 __parent->connection_died();
00262 exit();
00263 }
00264 }
00265 }
00266
00267
00268 protected: virtual void run() { Thread::run(); }
00269
00270 private:
00271 StreamSocket *__s;
00272 FawkesNetworkClient *__parent;
00273 FawkesNetworkMessageQueue * __inbound_msgq;
00274 Mutex *__recv_mutex;
00275 };
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291 FawkesNetworkClient::FawkesNetworkClient(const char *hostname, unsigned short int port, const char *ip)
00292 {
00293 __hostname = strdup(hostname);
00294 __ip = ip ? strdup(ip) : NULL;
00295 __port = port;
00296
00297 s = NULL;
00298 __send_slave = NULL;
00299 __recv_slave = NULL;
00300
00301 connection_died_recently = false;
00302 __send_slave_alive = false;
00303 __recv_slave_alive = false;
00304
00305 slave_status_mutex = new Mutex();
00306
00307 _id = 0;
00308 _has_id = false;
00309
00310 __recv_mutex = new Mutex();
00311 __recv_waitcond = new WaitCondition(__recv_mutex);
00312 __connest_mutex = new Mutex();
00313 __connest_waitcond = new WaitCondition(__connest_mutex);
00314 __connest = false;
00315 __connest_interrupted = false;
00316 }
00317
00318
00319
00320
00321
00322
00323 FawkesNetworkClient::FawkesNetworkClient()
00324 {
00325 __hostname = NULL;
00326 __ip = NULL;
00327 __port = 0;
00328
00329 s = NULL;
00330 __send_slave = NULL;
00331 __recv_slave = NULL;
00332
00333 connection_died_recently = false;
00334 __send_slave_alive = false;
00335 __recv_slave_alive = false;
00336
00337 slave_status_mutex = new Mutex();
00338
00339 _id = 0;
00340 _has_id = false;
00341
00342 __recv_mutex = new Mutex();
00343 __recv_waitcond = new WaitCondition(__recv_mutex);
00344 __connest_mutex = new Mutex();
00345 __connest_waitcond = new WaitCondition(__connest_mutex);
00346 __connest = false;
00347 __connest_interrupted = false;
00348 }
00349
00350
00351
00352
00353
00354
00355
00356
00357 FawkesNetworkClient::FawkesNetworkClient(unsigned int id, const char *hostname,
00358 unsigned short int port, const char *ip)
00359 {
00360 __hostname = strdup(hostname);
00361 __ip = ip ? strdup(ip) : NULL;
00362 __port = port;
00363
00364 s = NULL;
00365 __send_slave = NULL;
00366 __recv_slave = NULL;
00367
00368 connection_died_recently = false;
00369 __send_slave_alive = false;
00370 __recv_slave_alive = false;
00371
00372 slave_status_mutex = new Mutex();
00373
00374 _id = id;
00375 _has_id = true;
00376
00377 __recv_mutex = new Mutex();
00378 __recv_waitcond = new WaitCondition(__recv_mutex);
00379 __connest_mutex = new Mutex();
00380 __connest_waitcond = new WaitCondition(__connest_mutex);
00381 __connest = false;
00382 __connest_interrupted = false;
00383 }
00384
00385
00386
00387 FawkesNetworkClient::~FawkesNetworkClient()
00388 {
00389 disconnect();
00390
00391 delete s;
00392 if (__hostname) free(__hostname);
00393 if (__ip) free(__ip);
00394 delete slave_status_mutex;
00395
00396 delete __connest_waitcond;
00397 delete __connest_mutex;
00398 delete __recv_waitcond;
00399 delete __recv_mutex;
00400 }
00401
00402
00403
00404
00405
00406
00407 void
00408 FawkesNetworkClient::connect()
00409 {
00410 if ( __hostname == NULL && __ip == NULL) {
00411 throw NullPointerException("Hostname not set. Cannot connect.");
00412 }
00413
00414 if ( s != NULL ) {
00415 disconnect();
00416 }
00417
00418
00419 connection_died_recently = false;
00420
00421 try {
00422 s = new StreamSocket();
00423 s->connect(__ip ? __ip : __hostname, __port);
00424 __send_slave = new FawkesNetworkClientSendThread(s, this);
00425 __send_slave->start();
00426 __recv_slave = new FawkesNetworkClientRecvThread(s, this, __recv_mutex);
00427 __recv_slave->start();
00428 } catch (SocketException &e) {
00429 connection_died_recently = true;
00430 if ( __send_slave ) {
00431 __send_slave->cancel();
00432 __send_slave->join();
00433 delete __send_slave;
00434 __send_slave = NULL;
00435 }
00436 if ( __recv_slave ) {
00437 __recv_slave->cancel();
00438 __recv_slave->join();
00439 delete __recv_slave;
00440 __recv_slave = NULL;
00441 }
00442 __send_slave_alive = false;
00443 __recv_slave_alive = false;
00444 delete s;
00445 s = NULL;
00446 throw;
00447 }
00448
00449 __connest_mutex->lock();
00450 while ( ! __connest && ! __connest_interrupted ) {
00451 __connest_waitcond->wait();
00452 }
00453 bool interrupted = __connest_interrupted;
00454 __connest_interrupted = false;
00455 __connest_mutex->unlock();
00456 if ( interrupted ) {
00457 throw InterruptedException("FawkesNetworkClient::connect()");
00458 }
00459
00460 notify_of_connection_established();
00461 }
00462
00463
00464
00465
00466
00467
00468
00469
00470 void
00471 FawkesNetworkClient::connect(const char *hostname, unsigned short int port)
00472 {
00473 connect(hostname, NULL, port);
00474 }
00475
00476
00477
00478
00479
00480
00481
00482
00483 void
00484 FawkesNetworkClient::connect(const char *hostname, const char *ip, unsigned short int port)
00485 {
00486 if (__hostname) free(__hostname);
00487 if (__ip) free(__ip);
00488 __hostname = strdup(hostname);
00489 __ip = ip ? strdup(ip) : NULL;
00490 __port = port;
00491 connect();
00492 }
00493
00494
00495 void
00496 FawkesNetworkClient::disconnect()
00497 {
00498 if ( s == NULL ) return;
00499
00500 if ( __send_slave_alive ) {
00501 if ( ! connection_died_recently ) {
00502 __send_slave->force_send();
00503
00504 usleep(100000);
00505 }
00506 __send_slave->cancel();
00507 __send_slave->join();
00508 delete __send_slave;
00509 __send_slave = NULL;
00510 }
00511 if ( __recv_slave_alive ) {
00512 __recv_slave->cancel();
00513 __recv_slave->join();
00514 delete __recv_slave;
00515 __recv_slave = NULL;
00516 }
00517 __send_slave_alive = false;
00518 __recv_slave_alive = false;
00519 delete s;
00520 s = NULL;
00521
00522 if (! connection_died_recently) {
00523 connection_died();
00524 }
00525 }
00526
00527
00528
00529
00530
00531
00532 void
00533 FawkesNetworkClient::interrupt_connect()
00534 {
00535 __connest_mutex->lock();
00536 __connest_interrupted = true;
00537 __connest_waitcond->wake_all();
00538 __connest_mutex->unlock();
00539 }
00540
00541
00542
00543
00544
00545 void
00546 FawkesNetworkClient::enqueue(FawkesNetworkMessage *message)
00547 {
00548 if (__send_slave) __send_slave->enqueue(message);
00549 }
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563 void
00564 FawkesNetworkClient::enqueue_and_wait(FawkesNetworkMessage *message,
00565 unsigned int timeout_sec)
00566 {
00567 if (__send_slave && __recv_slave) {
00568 __recv_mutex->lock();
00569 if ( __recv_received.find(message->cid()) != __recv_received.end()) {
00570 __recv_mutex->unlock();
00571 unsigned int cid = message->cid();
00572 throw Exception("There is already a thread waiting for messages of "
00573 "component id %u", cid);
00574 }
00575 __send_slave->enqueue(message);
00576 unsigned int cid = message->cid();
00577 __recv_received[cid] = false;
00578 while (!__recv_received[cid] && ! connection_died_recently) {
00579 if (!__recv_waitcond->reltimed_wait(timeout_sec, 0)) {
00580 __recv_received.erase(cid);
00581 __recv_mutex->unlock();
00582 throw TimeoutException("Timeout reached while waiting for incoming message "
00583 "(outgoing was %u:%u)", message->cid(), message->msgid());
00584 }
00585 }
00586 __recv_received.erase(cid);
00587 __recv_mutex->unlock();
00588 message->unref();
00589 } else {
00590 unsigned int cid = message->cid();
00591 unsigned int msgid = message->msgid();
00592 throw Exception("Cannot enqueue given message %u:%u, sender or "
00593 "receiver missing", cid, msgid);
00594 }
00595 }
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605 void
00606 FawkesNetworkClient::register_handler(FawkesNetworkClientHandler *handler,
00607 unsigned int component_id)
00608 {
00609 handlers.lock();
00610 if ( handlers.find(component_id) != handlers.end() ) {
00611 handlers.unlock();
00612 throw HandlerAlreadyRegisteredException();
00613 } else {
00614 handlers[component_id] = handler;
00615 }
00616 handlers.unlock();
00617 }
00618
00619
00620
00621
00622
00623
00624 void
00625 FawkesNetworkClient::deregister_handler(unsigned int component_id)
00626 {
00627 handlers.lock();
00628 if ( handlers.find(component_id) != handlers.end() ) {
00629 handlers[component_id]->deregistered(_id);
00630 handlers.erase(component_id);
00631 }
00632 handlers.unlock();
00633 __recv_mutex->lock();
00634 if (__recv_received.find(component_id) != __recv_received.end()) {
00635 __recv_received[component_id] = true;
00636 __recv_waitcond->wake_all();
00637 }
00638 __recv_mutex->unlock();
00639 }
00640
00641
00642 void
00643 FawkesNetworkClient::dispatch_message(FawkesNetworkMessage *m)
00644 {
00645 unsigned int cid = m->cid();
00646 handlers.lock();
00647 if (handlers.find(cid) != handlers.end()) {
00648 handlers[cid]->inbound_received(m, _id);
00649 }
00650 handlers.unlock();
00651 }
00652
00653
00654 void
00655 FawkesNetworkClient::wake_handlers(unsigned int cid)
00656 {
00657 __recv_mutex->lock();
00658 if (__recv_received.find(cid) != __recv_received.end()) {
00659 __recv_received[cid] = true;
00660 }
00661 __recv_waitcond->wake_all();
00662 __recv_mutex->unlock();
00663 }
00664
00665 void
00666 FawkesNetworkClient::notify_of_connection_dead()
00667 {
00668 __connest_mutex->lock();
00669 __connest = false;
00670 __connest_mutex->unlock();
00671
00672 handlers.lock();
00673 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
00674 i->second->connection_died(_id);
00675 }
00676 handlers.unlock();
00677
00678 __recv_mutex->lock();
00679 __recv_waitcond->wake_all();
00680 __recv_mutex->unlock();
00681 }
00682
00683 void
00684 FawkesNetworkClient::notify_of_connection_established()
00685 {
00686 handlers.lock();
00687 for ( HandlerMap::iterator i = handlers.begin(); i != handlers.end(); ++i ) {
00688 i->second->connection_established(_id);
00689 }
00690 handlers.unlock();
00691 }
00692
00693
00694 void
00695 FawkesNetworkClient::connection_died()
00696 {
00697 connection_died_recently = true;
00698 notify_of_connection_dead();
00699 }
00700
00701
00702 void
00703 FawkesNetworkClient::set_send_slave_alive()
00704 {
00705 slave_status_mutex->lock();
00706 __send_slave_alive = true;
00707 if ( __send_slave_alive && __recv_slave_alive ) {
00708 __connest_mutex->lock();
00709 __connest = true;
00710 __connest_waitcond->wake_all();
00711 __connest_mutex->unlock();
00712 }
00713 slave_status_mutex->unlock();
00714 }
00715
00716
00717 void
00718 FawkesNetworkClient::set_recv_slave_alive()
00719 {
00720 slave_status_mutex->lock();
00721 __recv_slave_alive = true;
00722 if ( __send_slave_alive && __recv_slave_alive ) {
00723 __connest_mutex->lock();
00724 __connest = true;
00725 __connest_waitcond->wake_all();
00726 __connest_mutex->unlock();
00727 }
00728 slave_status_mutex->unlock();
00729 }
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739 void
00740 FawkesNetworkClient::wait(unsigned int component_id, unsigned int timeout_sec)
00741 {
00742 __recv_mutex->lock();
00743 if ( __recv_received.find(component_id) != __recv_received.end()) {
00744 __recv_mutex->unlock();
00745 throw Exception("There is already a thread waiting for messages of "
00746 "component id %u", component_id);
00747 }
00748 __recv_received[component_id] = false;
00749 while (! __recv_received[component_id] && ! connection_died_recently) {
00750 if (!__recv_waitcond->reltimed_wait(timeout_sec, 0)) {
00751 __recv_received.erase(component_id);
00752 __recv_mutex->unlock();
00753 throw TimeoutException("Timeout reached while waiting for incoming message "
00754 "(component %u)", component_id);
00755 }
00756 }
00757 __recv_received.erase(component_id);
00758 __recv_mutex->unlock();
00759 }
00760
00761
00762
00763
00764
00765
00766
00767 void
00768 FawkesNetworkClient::wake(unsigned int component_id)
00769 {
00770 __recv_mutex->lock();
00771 if ( __recv_received.find(component_id) != __recv_received.end()) {
00772 __recv_received[component_id] = true;
00773 }
00774 __recv_waitcond->wake_all();
00775 __recv_mutex->unlock();
00776 }
00777
00778
00779
00780
00781
00782 bool
00783 FawkesNetworkClient::connected() const throw()
00784 {
00785 return (! connection_died_recently && (s != NULL));
00786 }
00787
00788
00789
00790
00791
00792 bool
00793 FawkesNetworkClient::has_id() const
00794 {
00795 return _has_id;
00796 }
00797
00798
00799
00800
00801
00802 unsigned int
00803 FawkesNetworkClient::id() const
00804 {
00805 if ( !_has_id ) {
00806 throw Exception("Trying to get the ID of a client that has no ID");
00807 }
00808
00809 return _id;
00810 }
00811
00812
00813
00814
00815 const char *
00816 FawkesNetworkClient::get_hostname() const
00817 {
00818 return __hostname;
00819 }
00820
00821
00822
00823
00824 const char *
00825 FawkesNetworkClient::get_ip() const
00826 {
00827 return __ip;
00828 }
00829
00830 }