00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <blackboard/remote.h>
00025 #include <blackboard/exceptions.h>
00026 #include <blackboard/net/messages.h>
00027 #include <blackboard/net/ilist_content.h>
00028 #include <blackboard/net/interface_proxy.h>
00029 #include <blackboard/internal/notifier.h>
00030 #include <blackboard/internal/instance_factory.h>
00031
00032 #include <interface/interface_info.h>
00033
00034 #include <core/threading/mutex.h>
00035 #include <core/threading/mutex_locker.h>
00036 #include <core/threading/wait_condition.h>
00037 #include <netcomm/fawkes/client.h>
00038
00039 #include <string>
00040 #include <cstring>
00041 #include <fnmatch.h>
00042 #include <arpa/inet.h>
00043
00044 namespace fawkes {
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057 RemoteBlackBoard::RemoteBlackBoard(FawkesNetworkClient *client)
00058 {
00059 __fnc = client;
00060 __fnc_owner = false;
00061
00062 if ( ! __fnc->connected() ) {
00063 throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
00064 }
00065
00066 __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
00067
00068 __mutex = new Mutex();
00069 __notifier = new BlackBoardNotifier();
00070 __instance_factory = new BlackBoardInstanceFactory();
00071
00072 __wait_mutex = new Mutex();
00073 __wait_cond = new WaitCondition(__wait_mutex);
00074
00075 __m = NULL;
00076 }
00077
00078
00079
00080
00081
00082
00083
00084
00085 RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port)
00086 {
00087 __fnc = new FawkesNetworkClient(hostname, port);
00088 try {
00089 __fnc->connect();
00090 } catch (Exception &e) {
00091 delete __fnc;
00092 throw;
00093 }
00094
00095 __fnc_owner = true;
00096
00097 if ( ! __fnc->connected() ) {
00098 throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
00099 }
00100
00101 __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
00102
00103 __mutex = new Mutex();
00104 __notifier = new BlackBoardNotifier();
00105 __instance_factory = new BlackBoardInstanceFactory();
00106
00107 __wait_mutex = new Mutex();
00108 __wait_cond = new WaitCondition(__wait_mutex);
00109
00110 __m = NULL;
00111 }
00112
00113
00114
00115 RemoteBlackBoard::~RemoteBlackBoard()
00116 {
00117 __fnc->deregister_handler(FAWKES_CID_BLACKBOARD);
00118 delete __mutex;
00119 delete __notifier;
00120 delete __instance_factory;
00121
00122 for ( __pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
00123 delete __pit->second;
00124 }
00125
00126 if (__fnc_owner) {
00127 __fnc->disconnect();
00128 delete __fnc;
00129 }
00130
00131 delete __wait_cond;
00132 delete __wait_mutex;
00133 }
00134
00135
00136 bool
00137 RemoteBlackBoard::is_alive() const throw()
00138 {
00139 return __fnc->connected();
00140 }
00141
00142
00143 void
00144 RemoteBlackBoard::reopen_interfaces()
00145 {
00146 __proxies.lock();
00147 __ipit = __invalid_proxies.begin();
00148 while ( __ipit != __invalid_proxies.end() ) {
00149 try {
00150 Interface *iface = (*__ipit)->interface();
00151 open_interface(iface->type(), iface->id(), iface->is_writer(), iface);
00152 iface->set_validity(true);
00153 __ipit = __invalid_proxies.erase(__ipit);
00154 } catch (Exception &e) {
00155
00156 ++__ipit;
00157 }
00158 }
00159 __proxies.unlock();
00160 }
00161
00162 bool
00163 RemoteBlackBoard::try_aliveness_restore() throw()
00164 {
00165 bool rv = true;
00166 try {
00167 if ( ! __fnc->connected() ) {
00168 __fnc->connect();
00169
00170 reopen_interfaces();
00171 }
00172 } catch (...) {
00173 rv = false;
00174 }
00175 return rv;
00176 }
00177
00178
00179 void
00180 RemoteBlackBoard::open_interface(const char *type, const char *identifier,
00181 bool writer, Interface *iface)
00182 {
00183 if ( ! __fnc->connected() ) {
00184 throw Exception("Cannot instantiate remote interface, connection is dead");
00185 }
00186
00187 bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t));
00188 strncpy(om->type, type, __INTERFACE_TYPE_SIZE);
00189 strncpy(om->id, identifier, __INTERFACE_ID_SIZE);
00190 memcpy(om->hash, iface->hash(), __INTERFACE_HASH_SIZE);
00191
00192 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00193 writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING,
00194 om, sizeof(bb_iopen_msg_t));
00195
00196 __wait_mutex->lock();
00197 __fnc->enqueue(omsg);
00198 omsg->unref();
00199 while (! __m ||
00200 ((__m->msgid() != MSG_BB_OPEN_SUCCESS) &&
00201 (__m->msgid() != MSG_BB_OPEN_FAILURE))) {
00202 if ( __m ) {
00203 __m->unref();
00204 __m = NULL;
00205 }
00206 __wait_cond->wait();
00207 }
00208 __wait_mutex->unlock();
00209
00210 if ( __m->msgid() == MSG_BB_OPEN_SUCCESS ) {
00211
00212 BlackBoardInterfaceProxy *proxy = new BlackBoardInterfaceProxy(__fnc, __m, __notifier,
00213 iface, writer);
00214 __proxies[proxy->serial()] = proxy;
00215 } else if ( __m->msgid() == MSG_BB_OPEN_FAILURE ) {
00216 bb_iopenfail_msg_t *fm = __m->msg<bb_iopenfail_msg_t>();
00217 unsigned int error = ntohl(fm->errno);
00218 __m->unref();
00219 __m = NULL;
00220 if ( error == BB_ERR_WRITER_EXISTS ) {
00221 throw BlackBoardWriterActiveException(identifier, type);
00222 } else if ( error == BB_ERR_HASH_MISMATCH ) {
00223 throw Exception("Hash mismatch for interface %s:%s", type, identifier);
00224 } else if ( error == BB_ERR_UNKNOWN_TYPE ) {
00225 throw Exception("Type %s unknoen (%s:%s)", type, type, identifier);
00226 } else if ( error == BB_ERR_WRITER_EXISTS ) {
00227 throw BlackBoardWriterActiveException(identifier, type);
00228 } else {
00229 throw Exception("Could not open interface");
00230 }
00231 }
00232
00233 __m->unref();
00234 __m = NULL;
00235 }
00236
00237 Interface *
00238 RemoteBlackBoard::open_interface(const char *type, const char *identifier, bool writer)
00239 {
00240 if ( ! __fnc->connected() ) {
00241 throw Exception("Cannot instantiate remote interface, connection is dead");
00242 }
00243
00244 Interface *iface = __instance_factory->new_interface_instance(type, identifier);
00245 try {
00246 open_interface(type, identifier, writer, iface);
00247 } catch (...) {
00248 __instance_factory->delete_interface_instance(iface);
00249 throw;
00250 }
00251
00252 return iface;
00253 }
00254
00255
00256 Interface *
00257 RemoteBlackBoard::open_for_reading(const char *type, const char *identifier)
00258 {
00259 return open_interface(type, identifier, false);
00260 }
00261
00262
00263 Interface *
00264 RemoteBlackBoard::open_for_writing(const char *type, const char *identifier)
00265 {
00266 return open_interface(type, identifier, true);
00267 }
00268
00269
00270 std::list<Interface *>
00271 RemoteBlackBoard::open_multiple_for_reading(const char *type, const char *id_pattern)
00272 {
00273 std::list<Interface *> rv;
00274
00275 InterfaceInfoList *infl = list_all();
00276 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
00277 if ((strncmp(type, i->type(), __INTERFACE_TYPE_SIZE) != 0) ||
00278 (fnmatch(id_pattern, i->id(), 0) == FNM_NOMATCH) ) {
00279
00280 continue;
00281 }
00282
00283 try {
00284 Interface *iface = open_for_reading((*i).type(), (*i).id());
00285 rv.push_back(iface);
00286 } catch (Exception &e) {
00287 for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) {
00288 close(*j);
00289 }
00290 throw;
00291 }
00292 }
00293
00294 return rv;
00295 }
00296
00297
00298
00299
00300
00301 void
00302 RemoteBlackBoard::close(Interface *interface)
00303 {
00304 if ( interface == NULL ) return;
00305
00306 unsigned int serial = interface->serial();
00307
00308 if ( __proxies.find(serial) != __proxies.end() ) {
00309 delete __proxies[serial];
00310 __proxies.erase(serial);
00311 }
00312
00313 if ( __fnc->connected() ) {
00314
00315 bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t));
00316 sm->serial = htonl(interface->serial());
00317
00318 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00319 MSG_BB_CLOSE,
00320 sm, sizeof(bb_iserial_msg_t));
00321 __fnc->enqueue(omsg);
00322 omsg->unref();
00323 }
00324
00325 __instance_factory->delete_interface_instance(interface);
00326 }
00327
00328
00329 void
00330 RemoteBlackBoard::register_listener(BlackBoardInterfaceListener *listener, unsigned int flags)
00331 {
00332 __notifier->register_listener(listener, flags);
00333 }
00334
00335
00336 void
00337 RemoteBlackBoard::unregister_listener(BlackBoardInterfaceListener *listener)
00338 {
00339 __notifier->unregister_listener(listener);
00340 }
00341
00342
00343 void
00344 RemoteBlackBoard::register_observer(BlackBoardInterfaceObserver *observer, unsigned int flags)
00345 {
00346 __notifier->register_observer(observer, flags);
00347 }
00348
00349
00350 void
00351 RemoteBlackBoard::unregister_observer(BlackBoardInterfaceObserver *observer)
00352 {
00353 __notifier->unregister_observer(observer);
00354 }
00355
00356
00357 InterfaceInfoList *
00358 RemoteBlackBoard::list_all()
00359 {
00360 MutexLocker lock(__mutex);
00361 InterfaceInfoList *infl = new InterfaceInfoList();
00362
00363 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00364 MSG_BB_LIST_ALL);
00365 __wait_mutex->lock();
00366 __fnc->enqueue(omsg);
00367 omsg->unref();
00368 while (! __m ||
00369 (__m->msgid() != MSG_BB_INTERFACE_LIST)) {
00370 if ( __m ) {
00371 __m->unref();
00372 __m = NULL;
00373 }
00374 __wait_cond->wait();
00375 }
00376 __wait_mutex->unlock();
00377
00378 BlackBoardInterfaceListContent *bbilc = __m->msgc<BlackBoardInterfaceListContent>();
00379 while ( bbilc->has_next() ) {
00380 size_t iisize;
00381 bb_iinfo_msg_t *ii = bbilc->next(&iisize);
00382 infl->append(ii->type, ii->id, ii->hash, ii->serial,
00383 ii->has_writer, ii->num_readers);
00384 }
00385
00386 __m->unref();
00387 __m = NULL;
00388
00389 return infl;
00390 }
00391
00392
00393
00394
00395
00396
00397 void
00398 RemoteBlackBoard::deregistered(unsigned int id) throw()
00399 {
00400 }
00401
00402
00403 void
00404 RemoteBlackBoard::inbound_received(FawkesNetworkMessage *m,
00405 unsigned int id) throw()
00406 {
00407 if ( m->cid() == FAWKES_CID_BLACKBOARD ) {
00408 unsigned int msgid = m->msgid();
00409 try {
00410 if ( msgid == MSG_BB_DATA_CHANGED ) {
00411 unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
00412 if ( __proxies.find(serial) != __proxies.end() ) {
00413 __proxies[serial]->process_data_changed(m);
00414 }
00415 } else if (msgid == MSG_BB_INTERFACE_MESSAGE) {
00416 unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
00417 if ( __proxies.find(serial) != __proxies.end() ) {
00418 __proxies[serial]->process_interface_message(m);
00419 }
00420 } else if (msgid == MSG_BB_READER_ADDED) {
00421 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00422 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00423 __proxies[ntohl(esm->serial)]->reader_added(ntohl(esm->event_serial));
00424 }
00425 } else if (msgid == MSG_BB_READER_REMOVED) {
00426 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00427 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00428 __proxies[ntohl(esm->serial)]->reader_removed(ntohl(esm->event_serial));
00429 }
00430 } else if (msgid == MSG_BB_WRITER_ADDED) {
00431 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00432 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00433 __proxies[ntohl(esm->serial)]->writer_added(ntohl(esm->event_serial));
00434 }
00435 } else if (msgid == MSG_BB_WRITER_REMOVED) {
00436 bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00437 if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00438 __proxies[ntohl(esm->serial)]->writer_removed(ntohl(esm->event_serial));
00439 }
00440 } else {
00441 __wait_mutex->stopby();
00442 __m = m;
00443 __m->ref();
00444 __wait_cond->wake_all();
00445 }
00446 } catch (Exception &e) {
00447
00448 }
00449 }
00450 }
00451
00452
00453 void
00454 RemoteBlackBoard::connection_died(unsigned int id) throw()
00455 {
00456
00457 __proxies.lock();
00458 for (__pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
00459 __pit->second->interface()->set_validity(false);
00460 __invalid_proxies.push_back(__pit->second);
00461 }
00462 __proxies.clear();
00463 __proxies.unlock();
00464 }
00465
00466
00467 void
00468 RemoteBlackBoard::connection_established(unsigned int id) throw()
00469 {
00470 }
00471
00472 }