qa_bb_remote.cpp

00001
00002 /***************************************************************************
00003  *  qa_bb_remote.cpp - BlackBoard remote access QA
00004  *
00005  *  Created: Mon Mar 03 17:31:18 2008
00006  *  Copyright  2006-2008  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023
00024 
00025 /// @cond QA
00026 
00027 #include <blackboard/local.h>
00028 #include <blackboard/remote.h>
00029 #include <blackboard/exceptions.h>
00030 #include <blackboard/bbconfig.h>
00031 #include <blackboard/interface_listener.h>
00032
00033 #include <interfaces/TestInterface.h>
00034
00035 #include <interface/interface_info.h>
00036 #include <core/exceptions/system.h>
00037 #include <netcomm/fawkes/client.h>
00038 #include <netcomm/fawkes/server_thread.h>
00039
00040 #include <signal.h>
00041 #include <cstdlib>
00042 #include <cstring>
00043
00044 #include <iostream>
00045 #include <vector>
00046
00047 using namespace std;
00048 using namespace fawkes;
00049
00050
/** Termination request flag polled by the test loops.
 * It is written from within a signal handler, therefore it is declared as
 * volatile sig_atomic_t, the only object type a handler may portably assign.
 * A non-zero value means "stop the current test".
 */
volatile sig_atomic_t quit = 0;

/** Signal handler that requests termination of the running test loop.
 * It only raises the quit flag; no other work is done in signal context.
 * @param signum number of the delivered signal (unused)
 */
void
signal_handler(int /* signum */)
{
  quit = 1;
}
00058
00059
00060 #define NUM_CHUNKS 5
00061 
00062 void
00063 test_messaging(TestInterface *ti_reader, TestInterface *ti_writer)
00064 {
00065   while (! quit) {
00066     int expval = ti_reader->test_int() + 1;
00067     TestInterface::SetTestIntMessage *m = new TestInterface::SetTestIntMessage(expval);
00068     unsigned int msgid = ti_reader->msgq_enqueue(m);
00069     printf("Sent with message ID %u\n", msgid);
00070
00071     if ( ti_writer->msgq_size() > 1 ) {
00072       cout << "Error, more than one message! flushing." << endl;
00073       ti_writer->msgq_flush();
00074     }
00075
00076     usleep(100000);
00077
00078     if ( ti_writer->msgq_first() != NULL ) {
00079       if ( ti_writer->msgq_first_is<TestInterface::SetTestStringMessage>() ) {
00080         TestInterface::SetTestStringMessage *msg = ti_writer->msgq_first(msg);
00081         printf("Received message of ID %u, Message improperly detected to be a SetTestStringMessage\n", msg->id());
00082       }
00083       if ( ti_writer->msgq_first_is<TestInterface::SetTestIntMessage>() ) {
00084         TestInterface::SetTestIntMessage *m2 = ti_writer->msgq_first<TestInterface::SetTestIntMessage>();
00085         printf("Received message with ID %u\n", m2->id());
00086         ti_writer->set_test_int( m2->test_int() );
00087         try {
00088           ti_writer->write();
00089         } catch (InterfaceWriteDeniedException &e) {
00090           cout << "BUG: caught write denied exception" << endl;
00091           e.print_trace();
00092         }
00093         ti_writer->msgq_pop();
00094       } else {
00095         cout << "Illegal message '" << ti_writer->msgq_first()->type() << "' type received" << endl;
00096       }
00097
00098       usleep(100000);
00099
00100       //cout << "Reading value from reader interface.. " << flush;
00101       ti_reader->read();
00102       int val = ti_reader->test_int();
00103       if ( val == expval ) {
00104         //cout << " success, value is " << ti_reader->test_int() << " as expected" << endl;
00105       } else {
00106         cout << " failure, value is " << ti_reader->test_int() << ", expected "
00107              << expval << endl;
00108       }
00109     } else {
00110       printf("No message in queue, if network test this means the message was dropped\n");
00111     }
00112
00113     usleep(10);
00114   }
00115 }
00116
00117 class SyncInterfaceListener : public fawkes::BlackBoardInterfaceListener
00118 {
00119 public:
00120   SyncInterfaceListener(fawkes::Interface *reader,
00121                         fawkes::Interface *writer,
00122                         fawkes::BlackBoard *reader_bb,
00123                         fawkes::BlackBoard *writer_bb)
00124     : BlackBoardInterfaceListener("SyncInterfaceListener(%s-%s)", writer->uid(), reader->id())
00125   {
00126     __reader    = reader;
00127     __writer    = writer;
00128     __reader_bb = reader_bb;
00129     __writer_bb = writer_bb;
00130
00131     bbil_add_data_interface(__reader);
00132     bbil_add_message_interface(__writer);
00133
00134     __reader_bb->register_listener(this, BlackBoard::BBIL_FLAG_DATA);
00135     __writer_bb->register_listener(this, BlackBoard::BBIL_FLAG_MESSAGES);
00136   }
00137
00138 
00139   /** Destructor. */
00140   ~SyncInterfaceListener()
00141   {
00142     __reader_bb->unregister_listener(this);
00143     __writer_bb->unregister_listener(this);
00144   }
00145
00146
00147   bool
00148   bb_interface_message_received(Interface *interface,
00149                                 Message *message) throw()
00150   {
00151     try {
00152       if ( interface == __writer ) {
00153         printf("%s: Forwarding message\n", bbil_name());
00154         Message *m = message->clone();
00155         m->set_hops(message->hops());
00156         m->ref();
00157         __reader->msgq_enqueue(m);
00158         message->set_id(m->id());
00159         m->unref();
00160         return false;
00161       } else {
00162         // Don't know why we were called, let 'em enqueue
00163         printf("%s: Message received for unknown interface\n", bbil_name());
00164         return true;
00165       }
00166     } catch (Exception &e) {
00167       printf("%s: Exception when message received\n", bbil_name());
00168       e.print_trace();
00169       return false;
00170     }
00171   }
00172
00173
00174   void
00175   bb_interface_data_changed(Interface *interface) throw()
00176   {
00177     try {
00178       if ( interface == __reader ) {
00179         //__logger->log_debug(bbil_name(), "Copying data");
00180         __reader->read();
00181         __writer->copy_values(__reader);
00182         __writer->write();
00183       } else {
00184         // Don't know why we were called, let 'em enqueue
00185         printf("%s: Data changed for unknown interface", bbil_name());
00186       }
00187     } catch (Exception &e) {
00188       printf("%s: Exception when data changed\n", bbil_name());
00189       e.print_trace();
00190     }
00191   }
00192
00193  private:
00194   fawkes::Interface  *__writer;
00195   fawkes::Interface  *__reader;
00196
00197   fawkes::BlackBoard *__writer_bb;
00198   fawkes::BlackBoard *__reader_bb;
00199
00200 };
00201
00202
00203 int
00204 main(int argc, char **argv)
00205 {
00206   signal(SIGINT, signal_handler);
00207
00208   LocalBlackBoard *llbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
00209   BlackBoard *lbb = llbb;
00210
00211   FawkesNetworkServerThread  *fns = new FawkesNetworkServerThread(1910);
00212   fns->start();
00213
00214   llbb->start_nethandler(fns);
00215
00216   BlackBoard *rbb = new RemoteBlackBoard("localhost", 1910);
00217
00218   InterfaceInfoList *infl = rbb->list_all();
00219   for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
00220     const unsigned char *hash = (*i).hash();
00221     char phash[__INTERFACE_HASH_SIZE * 2 + 1];
00222     memset(phash, 0, sizeof(phash));
00223     for (unsigned int j = 0; j < __INTERFACE_HASH_SIZE; ++j) {
00224       sprintf(&phash[j * 2], "%02x", hash[j]);
00225     }
00226     printf("%s::%s (%s), w:%i  r:%u  s:%u\n",
00227            (*i).type(), (*i).id(), phash, (*i).has_writer(),
00228            (*i).num_readers(), (*i).serial());
00229   }
00230   delete infl;
00231
00232   //TestInterface *ti_writer;
00233   TestInterface *ti_reader;
00234   TestInterface *ti_writer;
00235   try {
00236     cout << "Opening interfaces.. " << flush;
00237     ti_writer = rbb->open_for_writing<TestInterface>("SomeID");
00238     ti_reader = rbb->open_for_reading<TestInterface>("SomeID");
00239     cout << "success, "
00240          << "writer hash=" << ti_writer->hash_printable()
00241          << "  reader hash=" << ti_reader->hash_printable()
00242          << endl;
00243   } catch (Exception &e) {
00244     cout << "failed! Aborting" << endl;
00245     e.print_trace();
00246     exit(1);
00247   }
00248
00249   try {
00250     cout << "Trying to open second writer.. " << flush;
00251     TestInterface *ti_writer_two;
00252     ti_writer_two = rbb->open_for_writing<TestInterface>("SomeID");
00253     cout << "BUG: Detection of second writer did NOT work!" << endl;
00254     exit(2);
00255   } catch (BlackBoardWriterActiveException &e) {
00256     cout << "exception caught as expected, detected and prevented second writer!" << endl;
00257   }
00258
00259   try {
00260     cout << "Trying to open third writer.. " << flush;
00261     TestInterface *ti_writer_three;
00262     ti_writer_three = rbb->open_for_writing<TestInterface>("AnotherID");
00263     cout << "No exception as expected, different ID ok!" << endl;
00264     rbb->close(ti_writer_three);
00265   } catch (BlackBoardWriterActiveException &e) {
00266     cout << "BUG: Third writer with different ID detected as another writer!" << endl;
00267     exit(3);
00268   }
00269
00270   cout << endl << endl
00271        << "Running data tests ==================================================" << endl;
00272
00273   cout << "Writing initial value ("
00274        << TestInterface::TEST_CONSTANT << ") into interface as TestInt" << endl;
00275   ti_writer->set_test_int( TestInterface::TEST_CONSTANT );
00276   try {
00277     ti_writer->write();
00278   } catch (InterfaceWriteDeniedException &e) {
00279     cout << "BUG: caught write denied exception" << endl;
00280     e.print_trace();
00281   }
00282
00283   cout << "Giving some time to have value processed" << endl;
00284   usleep(100000);
00285
00286   cout << "Reading value from reader interface.. " << flush;
00287   ti_reader->read();
00288   int val = ti_reader->test_int();
00289   if ( val == TestInterface::TEST_CONSTANT ) {
00290     cout << " success, value is " << ti_reader->test_int() << " as expected" << endl;
00291   } else {
00292     cout << " failure, value is " << ti_reader->test_int() << ", expected "
00293          << TestInterface::TEST_CONSTANT << endl;
00294   }
00295
00296   cout << "Closing interfaces.. " << flush;
00297   try {
00298     rbb->close(ti_reader);
00299     rbb->close(ti_writer);
00300     cout << "done" << endl;
00301   } catch (Exception &e) {
00302     cout << "failed" << endl;
00303     e.print_trace();
00304   }
00305
00306   cout << endl << endl << "Starting MESSAGING tests" << endl
00307        << "Press Ctrl-C to continue with next test" << endl << endl;
00308
00309   ti_writer = lbb->open_for_writing<TestInterface>("Messaging");
00310   ti_reader = rbb->open_for_reading<TestInterface>("Messaging");
00311
00312   printf("Writer serial: %u  shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
00313   printf("Reader serial: %u  shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
00314
00315   test_messaging(ti_reader, ti_writer);
00316
00317   rbb->close(ti_reader);
00318   lbb->close(ti_writer);
00319
00320   cout << endl << endl << "Starting MESSAGING tests, doing repeater scenario" << endl
00321        << "Press Ctrl-C to continue with next test" << endl << endl;
00322   quit = false;
00323
00324   delete rbb;
00325
00326   LocalBlackBoard *repllbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
00327
00328   FawkesNetworkServerThread  *repfns = new FawkesNetworkServerThread(1911);
00329   repfns->start();
00330
00331   repllbb->start_nethandler(repfns);
00332
00333   BlackBoard *rep_rbb = new RemoteBlackBoard("localhost", 1911);
00334   rbb = new RemoteBlackBoard("localhost", 1911);
00335
00336   TestInterface *rep_reader;
00337   TestInterface *rep_writer;
00338
00339   ti_writer = rbb->open_for_writing<TestInterface>("Messaging");
00340   ti_reader = lbb->open_for_reading<TestInterface>("Messaging");
00341
00342   rep_reader = rep_rbb->open_for_reading<TestInterface>("Messaging");
00343   rep_writer = lbb->open_for_writing<TestInterface>("Messaging");
00344
00345   printf("Writer serial: %u  shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
00346   printf("Reader serial: %u  shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
00347
00348   SyncInterfaceListener *sil = new SyncInterfaceListener(rep_reader, rep_writer, rep_rbb, lbb);
00349
00350   test_messaging(ti_reader, ti_writer);
00351
00352   delete sil;
00353   lbb->close(ti_reader);
00354   rbb->close(ti_writer);
00355   rep_rbb->close(rep_reader);
00356   lbb->close(rep_writer);
00357   delete repllbb;
00358   delete rep_rbb;
00359
00360   cout << "Tests done" << endl;
00361
00362   delete rbb;
00363   delete llbb;
00364   delete fns;
00365 }
00366
00367 
00368 /// @endcond