remote.cpp

00001
00002 /***************************************************************************
00003  *  remote.h - Remote BlackBoard access via Fawkes network protocol
00004  *
00005  *  Created: Mon Mar 03 10:53:00 2008
00006  *  Copyright  2006-2008  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023
00024 #include <blackboard/remote.h>
00025 #include <blackboard/exceptions.h>
00026 #include <blackboard/net/messages.h>
00027 #include <blackboard/net/ilist_content.h>
00028 #include <blackboard/net/interface_proxy.h>
00029 #include <blackboard/internal/notifier.h>
00030 #include <blackboard/internal/instance_factory.h>
00031
00032 #include <interface/interface_info.h>
00033
00034 #include <core/threading/mutex.h>
00035 #include <core/threading/mutex_locker.h>
00036 #include <core/threading/wait_condition.h>
00037 #include <netcomm/fawkes/client.h>
00038
00039 #include <string>
00040 #include <cstring>
00041 #include <fnmatch.h>
00042 #include <arpa/inet.h>
00043
00044 namespace fawkes {
00045 
00046 /** @class RemoteBlackBoard <blackboard/remote.h>
00047  * Remote BlackBoard.
00048  * This class implements the access to a remote BlackBoard using the Fawkes
00049  * network protocol.
00050  *
00051  * @author Tim Niemueller
00052  */
00053 
00054 /** Constructor.
00055  * @param client Fawkes network client to use.
00056  */
00057 RemoteBlackBoard::RemoteBlackBoard(FawkesNetworkClient *client)
00058 {
00059   __fnc = client;
00060   __fnc_owner = false;
00061
00062   if ( ! __fnc->connected() ) {
00063     throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
00064   }
00065
00066   __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
00067
00068   __mutex = new Mutex();
00069   __notifier = new BlackBoardNotifier();
00070   __instance_factory = new BlackBoardInstanceFactory();
00071
00072   __wait_mutex = new Mutex();
00073   __wait_cond  = new WaitCondition(__wait_mutex);
00074
00075   __m = NULL;
00076 }
00077
00078 
00079 /** Constructor.
00080  * This will internall create a fawkes network client that is used to communicate
00081  * with the remote BlackBoard.
00082  * @param hostname hostname to connect to
00083  * @param port port to connect to
00084  */
00085 RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port)
00086 {
00087   __fnc = new FawkesNetworkClient(hostname, port);
00088   try {
00089     __fnc->connect();
00090   } catch (Exception &e) {
00091     delete __fnc;
00092     throw;
00093   }
00094
00095   __fnc_owner = true;
00096
00097   if ( ! __fnc->connected() ) {
00098     throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
00099   }
00100
00101   __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
00102
00103   __mutex = new Mutex();
00104   __notifier = new BlackBoardNotifier();
00105   __instance_factory = new BlackBoardInstanceFactory();
00106
00107   __wait_mutex = new Mutex();
00108   __wait_cond  = new WaitCondition(__wait_mutex);
00109
00110   __m = NULL;
00111 }
00112
00113 
00114 /** Destructor. */
00115 RemoteBlackBoard::~RemoteBlackBoard()
00116 {
00117   __fnc->deregister_handler(FAWKES_CID_BLACKBOARD);
00118   delete __mutex;
00119   delete __notifier;
00120   delete __instance_factory;
00121
00122   for ( __pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
00123     delete __pit->second;
00124   }
00125
00126   if (__fnc_owner) {
00127     __fnc->disconnect();
00128     delete __fnc;
00129   }
00130
00131   delete __wait_cond;
00132   delete __wait_mutex;
00133 }
00134
00135
00136 bool
00137 RemoteBlackBoard::is_alive() const throw()
00138 {
00139   return __fnc->connected();
00140 }
00141
00142
00143 void
00144 RemoteBlackBoard::reopen_interfaces()
00145 {
00146   __proxies.lock();
00147   __ipit = __invalid_proxies.begin();
00148   while ( __ipit != __invalid_proxies.end() ) {
00149     try {
00150       Interface *iface = (*__ipit)->interface();
00151       open_interface(iface->type(), iface->id(), iface->is_writer(), iface);
00152       iface->set_validity(true);
00153       __ipit = __invalid_proxies.erase(__ipit);
00154     } catch (Exception &e) {
00155           // we failed to re-establish validity for the given interface, bad luck
00156       ++__ipit;
00157     }
00158   }
00159   __proxies.unlock();
00160 }
00161
00162 bool
00163 RemoteBlackBoard::try_aliveness_restore() throw()
00164 {
00165   bool rv = true;
00166   try {
00167     if ( ! __fnc->connected() ) {
00168       __fnc->connect();
00169
00170       reopen_interfaces();
00171     }
00172   } catch (...) {
00173     rv = false;
00174   }
00175   return rv;
00176 }
00177
00178
00179 void
00180 RemoteBlackBoard::open_interface(const char *type, const char *identifier,
00181                                  bool writer, Interface *iface)
00182 {
00183   if ( ! __fnc->connected() ) {
00184     throw Exception("Cannot instantiate remote interface, connection is dead");
00185   }
00186
00187   bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t));
00188   strncpy(om->type, type, __INTERFACE_TYPE_SIZE);
00189   strncpy(om->id, identifier, __INTERFACE_ID_SIZE);
00190   memcpy(om->hash, iface->hash(), __INTERFACE_HASH_SIZE);
00191
00192   FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00193                                                         writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING,
00194                                                         om, sizeof(bb_iopen_msg_t));
00195
00196   __wait_mutex->lock();
00197   __fnc->enqueue(omsg);
00198   omsg->unref();
00199   while (! __m ||
00200          ((__m->msgid() != MSG_BB_OPEN_SUCCESS) &&
00201           (__m->msgid() != MSG_BB_OPEN_FAILURE))) {
00202     if ( __m ) {
00203       __m->unref();
00204       __m = NULL;
00205     }
00206     __wait_cond->wait();
00207   }
00208   __wait_mutex->unlock();
00209
00210   if ( __m->msgid() == MSG_BB_OPEN_SUCCESS ) {
00211     // We got the interface, create internal storage and prepare instance for return
00212     BlackBoardInterfaceProxy *proxy = new BlackBoardInterfaceProxy(__fnc, __m, __notifier,
00213                                                                    iface, writer);
00214     __proxies[proxy->serial()] = proxy;
00215   } else if ( __m->msgid() == MSG_BB_OPEN_FAILURE ) {
00216     bb_iopenfail_msg_t *fm = __m->msg<bb_iopenfail_msg_t>();
00217     unsigned int error = ntohl(fm->errno);
00218     __m->unref();
00219     __m = NULL;
00220     if ( error == BB_ERR_WRITER_EXISTS ) {
00221       throw BlackBoardWriterActiveException(identifier, type);
00222     } else if ( error == BB_ERR_HASH_MISMATCH ) {
00223       throw Exception("Hash mismatch for interface %s:%s", type, identifier);
00224     } else if ( error == BB_ERR_UNKNOWN_TYPE ) {
00225       throw Exception("Type %s unknoen (%s:%s)", type, type, identifier);
00226     } else if ( error == BB_ERR_WRITER_EXISTS ) {
00227       throw BlackBoardWriterActiveException(identifier, type);
00228     } else {
00229       throw Exception("Could not open interface");
00230     }
00231   }
00232
00233   __m->unref();
00234   __m = NULL;
00235 }
00236
00237 Interface *
00238 RemoteBlackBoard::open_interface(const char *type, const char *identifier, bool writer)
00239 {
00240   if ( ! __fnc->connected() ) {
00241     throw Exception("Cannot instantiate remote interface, connection is dead");
00242   }
00243
00244   Interface *iface = __instance_factory->new_interface_instance(type, identifier);
00245   try {
00246     open_interface(type, identifier, writer, iface);
00247   } catch (...) {
00248     __instance_factory->delete_interface_instance(iface);
00249     throw;
00250   }
00251
00252   return iface;
00253 }
00254
00255
00256 Interface *
00257 RemoteBlackBoard::open_for_reading(const char *type, const char *identifier)
00258 {
00259   return open_interface(type, identifier, /* writer? */ false);
00260 }
00261
00262
00263 Interface *
00264 RemoteBlackBoard::open_for_writing(const char *type, const char *identifier)
00265 {
00266   return open_interface(type, identifier, /* writer? */ true);
00267 }
00268
00269
00270 std::list<Interface *>
00271 RemoteBlackBoard::open_multiple_for_reading(const char *type, const char *id_pattern)
00272 {
00273   std::list<Interface *> rv;
00274
00275   InterfaceInfoList *infl = list_all();
00276   for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
00277     if ((strncmp(type, i->type(), __INTERFACE_TYPE_SIZE) != 0) ||
00278         (fnmatch(id_pattern, i->id(), 0) == FNM_NOMATCH) ) {
00279       // type or ID prefix does not match, go on
00280       continue;
00281     }
00282
00283     try {
00284       Interface *iface = open_for_reading((*i).type(), (*i).id());
00285       rv.push_back(iface);
00286     } catch (Exception &e) {
00287       for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) {
00288         close(*j);
00289       }
00290       throw;
00291     }
00292   }
00293
00294   return rv;
00295 }
00296
00297 
00298 /** Close interface.
00299  * @param interface interface to close
00300  */
00301 void
00302 RemoteBlackBoard::close(Interface *interface)
00303 {
00304   if ( interface == NULL )  return;
00305
00306   unsigned int serial = interface->serial();
00307
00308   if ( __proxies.find(serial) != __proxies.end() ) {
00309     delete __proxies[serial];
00310     __proxies.erase(serial);
00311   }
00312
00313   if ( __fnc->connected() ) {
00314     // We cannot "officially" close it, if we are disconnected it cannot be used anyway
00315     bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t));
00316     sm->serial = htonl(interface->serial());
00317
00318     FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00319                                                           MSG_BB_CLOSE,
00320                                                           sm, sizeof(bb_iserial_msg_t));
00321     __fnc->enqueue(omsg);
00322     omsg->unref();
00323   }
00324
00325   __instance_factory->delete_interface_instance(interface);
00326 }
00327
00328
00329 void
00330 RemoteBlackBoard::register_listener(BlackBoardInterfaceListener *listener, unsigned int flags)
00331 {
00332   __notifier->register_listener(listener, flags);
00333 }
00334
00335
00336 void
00337 RemoteBlackBoard::unregister_listener(BlackBoardInterfaceListener *listener)
00338 {
00339   __notifier->unregister_listener(listener);
00340 }
00341
00342
00343 void
00344 RemoteBlackBoard::register_observer(BlackBoardInterfaceObserver *observer, unsigned int flags)
00345 {
00346   __notifier->register_observer(observer, flags);
00347 }
00348
00349
00350 void
00351 RemoteBlackBoard::unregister_observer(BlackBoardInterfaceObserver *observer)
00352 {
00353   __notifier->unregister_observer(observer);
00354 }
00355
00356
00357 InterfaceInfoList *
00358 RemoteBlackBoard::list_all()
00359 {
00360   MutexLocker lock(__mutex);
00361   InterfaceInfoList *infl = new InterfaceInfoList();
00362
00363   FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00364                                                         MSG_BB_LIST_ALL);
00365   __wait_mutex->lock();
00366   __fnc->enqueue(omsg);
00367   omsg->unref();
00368   while (! __m ||
00369          (__m->msgid() != MSG_BB_INTERFACE_LIST)) {
00370     if ( __m ) {
00371       __m->unref();
00372       __m = NULL;
00373     }
00374     __wait_cond->wait();
00375   }
00376   __wait_mutex->unlock();
00377
00378   BlackBoardInterfaceListContent *bbilc = __m->msgc<BlackBoardInterfaceListContent>();
00379   while ( bbilc->has_next() ) {
00380     size_t iisize;
00381     bb_iinfo_msg_t *ii = bbilc->next(&iisize);
00382     infl->append(ii->type, ii->id, ii->hash,  ii->serial,
00383                  ii->has_writer, ii->num_readers);
00384   }
00385
00386   __m->unref();
00387   __m = NULL;
00388
00389   return infl;
00390 }
00391
00392 
00393 /** We are no longer registered in Fawkes network client.
00394  * Ignored.
00395  * @param id the id of the calling client
00396  */
00397 void
00398 RemoteBlackBoard::deregistered(unsigned int id) throw()
00399 {
00400 }
00401
00402
00403 void
00404 RemoteBlackBoard::inbound_received(FawkesNetworkMessage *m,
00405                                    unsigned int id) throw()
00406 {
00407   if ( m->cid() == FAWKES_CID_BLACKBOARD ) {
00408     unsigned int msgid = m->msgid();
00409     try {
00410       if ( msgid == MSG_BB_DATA_CHANGED ) {
00411         unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
00412         if ( __proxies.find(serial) != __proxies.end() ) {
00413           __proxies[serial]->process_data_changed(m);
00414         }
00415       } else if (msgid == MSG_BB_INTERFACE_MESSAGE) {
00416         unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
00417         if ( __proxies.find(serial) != __proxies.end() ) {
00418           __proxies[serial]->process_interface_message(m);
00419         }
00420       } else if (msgid == MSG_BB_READER_ADDED) {
00421         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00422         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00423           __proxies[ntohl(esm->serial)]->reader_added(ntohl(esm->event_serial));
00424         }
00425       } else if (msgid == MSG_BB_READER_REMOVED) {
00426         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00427         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00428           __proxies[ntohl(esm->serial)]->reader_removed(ntohl(esm->event_serial));
00429         }
00430       } else if (msgid == MSG_BB_WRITER_ADDED) {
00431         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00432         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00433           __proxies[ntohl(esm->serial)]->writer_added(ntohl(esm->event_serial));
00434         }
00435       } else if (msgid == MSG_BB_WRITER_REMOVED) {
00436         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00437         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00438           __proxies[ntohl(esm->serial)]->writer_removed(ntohl(esm->event_serial));
00439         }
00440       } else {
00441         __wait_mutex->stopby();
00442         __m = m;
00443         __m->ref();
00444         __wait_cond->wake_all();
00445       }
00446     } catch (Exception &e) {
00447       // Bam, you're dead. Ok, not now, we just ignore that this shit happened...
00448     }
00449   }
00450 }
00451
00452
00453 void
00454 RemoteBlackBoard::connection_died(unsigned int id) throw()
00455 {
00456   // mark all assigned interfaces as invalid
00457   __proxies.lock();
00458   for (__pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
00459     __pit->second->interface()->set_validity(false);
00460     __invalid_proxies.push_back(__pit->second);
00461   }
00462   __proxies.clear();
00463   __proxies.unlock();
00464 }
00465
00466
00467 void
00468 RemoteBlackBoard::connection_established(unsigned int id) throw()
00469 {
00470 }
00471
00472 } // end namespace fawkes