00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include <blackboard/local.h>
00028 #include <blackboard/remote.h>
00029 #include <blackboard/exceptions.h>
00030 #include <blackboard/bbconfig.h>
00031 #include <blackboard/interface_listener.h>
00032
00033 #include <interfaces/TestInterface.h>
00034
00035 #include <interface/interface_info.h>
00036 #include <core/exceptions/system.h>
00037 #include <netcomm/fawkes/client.h>
00038 #include <netcomm/fawkes/server_thread.h>
00039
00040 #include <signal.h>
00041 #include <cstdlib>
00042 #include <cstring>
00043
00044 #include <iostream>
00045 #include <vector>
00046
00047 using namespace std;
00048 using namespace fawkes;
00049
00050
/** Termination request flag.
 * Set from signal_handler() and polled by the test loops. It is declared
 * volatile sig_atomic_t because that is the only object type the C and C++
 * standards guarantee may be safely written from an asynchronous signal
 * handler. Assigning true/false and testing with `!quit` keeps working.
 */
volatile sig_atomic_t quit = 0;

/** Signal handler that requests termination of the currently running test.
 * @param signum number of the delivered signal; not evaluated
 */
void
signal_handler(int signum)
{
  (void)signum;  // unused, silences compiler warnings
  quit = 1;
}
00058
00059
00060 #define NUM_CHUNKS 5
00061
00062 void
00063 test_messaging(TestInterface *ti_reader, TestInterface *ti_writer)
00064 {
00065 while (! quit) {
00066 int expval = ti_reader->test_int() + 1;
00067 TestInterface::SetTestIntMessage *m = new TestInterface::SetTestIntMessage(expval);
00068 unsigned int msgid = ti_reader->msgq_enqueue(m);
00069 printf("Sent with message ID %u\n", msgid);
00070
00071 if ( ti_writer->msgq_size() > 1 ) {
00072 cout << "Error, more than one message! flushing." << endl;
00073 ti_writer->msgq_flush();
00074 }
00075
00076 usleep(100000);
00077
00078 if ( ti_writer->msgq_first() != NULL ) {
00079 if ( ti_writer->msgq_first_is<TestInterface::SetTestStringMessage>() ) {
00080 TestInterface::SetTestStringMessage *msg = ti_writer->msgq_first(msg);
00081 printf("Received message of ID %u, Message improperly detected to be a SetTestStringMessage\n", msg->id());
00082 }
00083 if ( ti_writer->msgq_first_is<TestInterface::SetTestIntMessage>() ) {
00084 TestInterface::SetTestIntMessage *m2 = ti_writer->msgq_first<TestInterface::SetTestIntMessage>();
00085 printf("Received message with ID %u\n", m2->id());
00086 ti_writer->set_test_int( m2->test_int() );
00087 try {
00088 ti_writer->write();
00089 } catch (InterfaceWriteDeniedException &e) {
00090 cout << "BUG: caught write denied exception" << endl;
00091 e.print_trace();
00092 }
00093 ti_writer->msgq_pop();
00094 } else {
00095 cout << "Illegal message '" << ti_writer->msgq_first()->type() << "' type received" << endl;
00096 }
00097
00098 usleep(100000);
00099
00100
00101 ti_reader->read();
00102 int val = ti_reader->test_int();
00103 if ( val == expval ) {
00104
00105 } else {
00106 cout << " failure, value is " << ti_reader->test_int() << ", expected "
00107 << expval << endl;
00108 }
00109 } else {
00110 printf("No message in queue, if network test this means the message was dropped\n");
00111 }
00112
00113 usleep(10);
00114 }
00115 }
00116
00117 class SyncInterfaceListener : public fawkes::BlackBoardInterfaceListener
00118 {
00119 public:
00120 SyncInterfaceListener(fawkes::Interface *reader,
00121 fawkes::Interface *writer,
00122 fawkes::BlackBoard *reader_bb,
00123 fawkes::BlackBoard *writer_bb)
00124 : BlackBoardInterfaceListener("SyncInterfaceListener(%s-%s)", writer->uid(), reader->id())
00125 {
00126 __reader = reader;
00127 __writer = writer;
00128 __reader_bb = reader_bb;
00129 __writer_bb = writer_bb;
00130
00131 bbil_add_data_interface(__reader);
00132 bbil_add_message_interface(__writer);
00133
00134 __reader_bb->register_listener(this, BlackBoard::BBIL_FLAG_DATA);
00135 __writer_bb->register_listener(this, BlackBoard::BBIL_FLAG_MESSAGES);
00136 }
00137
00138
00139
00140 ~SyncInterfaceListener()
00141 {
00142 __reader_bb->unregister_listener(this);
00143 __writer_bb->unregister_listener(this);
00144 }
00145
00146
00147 bool
00148 bb_interface_message_received(Interface *interface,
00149 Message *message) throw()
00150 {
00151 try {
00152 if ( interface == __writer ) {
00153 printf("%s: Forwarding message\n", bbil_name());
00154 Message *m = message->clone();
00155 m->set_hops(message->hops());
00156 m->ref();
00157 __reader->msgq_enqueue(m);
00158 message->set_id(m->id());
00159 m->unref();
00160 return false;
00161 } else {
00162
00163 printf("%s: Message received for unknown interface\n", bbil_name());
00164 return true;
00165 }
00166 } catch (Exception &e) {
00167 printf("%s: Exception when message received\n", bbil_name());
00168 e.print_trace();
00169 return false;
00170 }
00171 }
00172
00173
00174 void
00175 bb_interface_data_changed(Interface *interface) throw()
00176 {
00177 try {
00178 if ( interface == __reader ) {
00179
00180 __reader->read();
00181 __writer->copy_values(__reader);
00182 __writer->write();
00183 } else {
00184
00185 printf("%s: Data changed for unknown interface", bbil_name());
00186 }
00187 } catch (Exception &e) {
00188 printf("%s: Exception when data changed\n", bbil_name());
00189 e.print_trace();
00190 }
00191 }
00192
00193 private:
00194 fawkes::Interface *__writer;
00195 fawkes::Interface *__reader;
00196
00197 fawkes::BlackBoard *__writer_bb;
00198 fawkes::BlackBoard *__reader_bb;
00199
00200 };
00201
00202
00203 int
00204 main(int argc, char **argv)
00205 {
00206 signal(SIGINT, signal_handler);
00207
00208 LocalBlackBoard *llbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
00209 BlackBoard *lbb = llbb;
00210
00211 FawkesNetworkServerThread *fns = new FawkesNetworkServerThread(1910);
00212 fns->start();
00213
00214 llbb->start_nethandler(fns);
00215
00216 BlackBoard *rbb = new RemoteBlackBoard("localhost", 1910);
00217
00218 InterfaceInfoList *infl = rbb->list_all();
00219 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
00220 const unsigned char *hash = (*i).hash();
00221 char phash[__INTERFACE_HASH_SIZE * 2 + 1];
00222 memset(phash, 0, sizeof(phash));
00223 for (unsigned int j = 0; j < __INTERFACE_HASH_SIZE; ++j) {
00224 sprintf(&phash[j * 2], "%02x", hash[j]);
00225 }
00226 printf("%s::%s (%s), w:%i r:%u s:%u\n",
00227 (*i).type(), (*i).id(), phash, (*i).has_writer(),
00228 (*i).num_readers(), (*i).serial());
00229 }
00230 delete infl;
00231
00232
00233 TestInterface *ti_reader;
00234 TestInterface *ti_writer;
00235 try {
00236 cout << "Opening interfaces.. " << flush;
00237 ti_writer = rbb->open_for_writing<TestInterface>("SomeID");
00238 ti_reader = rbb->open_for_reading<TestInterface>("SomeID");
00239 cout << "success, "
00240 << "writer hash=" << ti_writer->hash_printable()
00241 << " reader hash=" << ti_reader->hash_printable()
00242 << endl;
00243 } catch (Exception &e) {
00244 cout << "failed! Aborting" << endl;
00245 e.print_trace();
00246 exit(1);
00247 }
00248
00249 try {
00250 cout << "Trying to open second writer.. " << flush;
00251 TestInterface *ti_writer_two;
00252 ti_writer_two = rbb->open_for_writing<TestInterface>("SomeID");
00253 cout << "BUG: Detection of second writer did NOT work!" << endl;
00254 exit(2);
00255 } catch (BlackBoardWriterActiveException &e) {
00256 cout << "exception caught as expected, detected and prevented second writer!" << endl;
00257 }
00258
00259 try {
00260 cout << "Trying to open third writer.. " << flush;
00261 TestInterface *ti_writer_three;
00262 ti_writer_three = rbb->open_for_writing<TestInterface>("AnotherID");
00263 cout << "No exception as expected, different ID ok!" << endl;
00264 rbb->close(ti_writer_three);
00265 } catch (BlackBoardWriterActiveException &e) {
00266 cout << "BUG: Third writer with different ID detected as another writer!" << endl;
00267 exit(3);
00268 }
00269
00270 cout << endl << endl
00271 << "Running data tests ==================================================" << endl;
00272
00273 cout << "Writing initial value ("
00274 << TestInterface::TEST_CONSTANT << ") into interface as TestInt" << endl;
00275 ti_writer->set_test_int( TestInterface::TEST_CONSTANT );
00276 try {
00277 ti_writer->write();
00278 } catch (InterfaceWriteDeniedException &e) {
00279 cout << "BUG: caught write denied exception" << endl;
00280 e.print_trace();
00281 }
00282
00283 cout << "Giving some time to have value processed" << endl;
00284 usleep(100000);
00285
00286 cout << "Reading value from reader interface.. " << flush;
00287 ti_reader->read();
00288 int val = ti_reader->test_int();
00289 if ( val == TestInterface::TEST_CONSTANT ) {
00290 cout << " success, value is " << ti_reader->test_int() << " as expected" << endl;
00291 } else {
00292 cout << " failure, value is " << ti_reader->test_int() << ", expected "
00293 << TestInterface::TEST_CONSTANT << endl;
00294 }
00295
00296 cout << "Closing interfaces.. " << flush;
00297 try {
00298 rbb->close(ti_reader);
00299 rbb->close(ti_writer);
00300 cout << "done" << endl;
00301 } catch (Exception &e) {
00302 cout << "failed" << endl;
00303 e.print_trace();
00304 }
00305
00306 cout << endl << endl << "Starting MESSAGING tests" << endl
00307 << "Press Ctrl-C to continue with next test" << endl << endl;
00308
00309 ti_writer = lbb->open_for_writing<TestInterface>("Messaging");
00310 ti_reader = rbb->open_for_reading<TestInterface>("Messaging");
00311
00312 printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
00313 printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
00314
00315 test_messaging(ti_reader, ti_writer);
00316
00317 rbb->close(ti_reader);
00318 lbb->close(ti_writer);
00319
00320 cout << endl << endl << "Starting MESSAGING tests, doing repeater scenario" << endl
00321 << "Press Ctrl-C to continue with next test" << endl << endl;
00322 quit = false;
00323
00324 delete rbb;
00325
00326 LocalBlackBoard *repllbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
00327
00328 FawkesNetworkServerThread *repfns = new FawkesNetworkServerThread(1911);
00329 repfns->start();
00330
00331 repllbb->start_nethandler(repfns);
00332
00333 BlackBoard *rep_rbb = new RemoteBlackBoard("localhost", 1911);
00334 rbb = new RemoteBlackBoard("localhost", 1911);
00335
00336 TestInterface *rep_reader;
00337 TestInterface *rep_writer;
00338
00339 ti_writer = rbb->open_for_writing<TestInterface>("Messaging");
00340 ti_reader = lbb->open_for_reading<TestInterface>("Messaging");
00341
00342 rep_reader = rep_rbb->open_for_reading<TestInterface>("Messaging");
00343 rep_writer = lbb->open_for_writing<TestInterface>("Messaging");
00344
00345 printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
00346 printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
00347
00348 SyncInterfaceListener *sil = new SyncInterfaceListener(rep_reader, rep_writer, rep_rbb, lbb);
00349
00350 test_messaging(ti_reader, ti_writer);
00351
00352 delete sil;
00353 lbb->close(ti_reader);
00354 rbb->close(ti_writer);
00355 rep_rbb->close(rep_reader);
00356 lbb->close(rep_writer);
00357 delete repllbb;
00358 delete rep_rbb;
00359
00360 cout << "Tests done" << endl;
00361
00362 delete rbb;
00363 delete llbb;
00364 delete fns;
00365 }
00366
00367
00368