fuse_client.cpp

00001
00002 /***************************************************************************
00003  *  fuse_client.cpp - FUSE network transport client
00004  *
00005  *  Created: Thu Mar 29 00:47:24 2007
00006  *  Copyright  2005-2007  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023
00024 #include <fvutils/net/fuse_client.h>
00025
00026 #include <fvutils/net/fuse_transceiver.h>
00027 #include <fvutils/net/fuse_message_queue.h>
00028 #include <fvutils/net/fuse_message.h>
00029 #include <fvutils/net/fuse_client_handler.h>
00030
00031 #include <core/threading/mutex.h>
00032 #include <core/threading/wait_condition.h>
00033 #include <core/exceptions/software.h>
00034 #include <netcomm/socket/stream.h>
00035 #include <netcomm/utils/exceptions.h>
00036
00037 #include <cstring>
00038 #include <netinet/in.h>
00039 #include <cstdlib>
00040 #include <unistd.h>
00041
00042 using namespace fawkes;
00043 
00044 /** @class FuseClient <fvutils/net/fuse_client.h>
00045  * FUSE client.
00046  * FUSE is the FireVision protocol to retrieve information, images and lookup
00047  * tables from vision processes and to send control commands to these systems.
00048  * The client is used in the retrieving or controlling process.
00049  * @ingroup FUSE
00050  * @ingroup FireVision
00051  * @author Tim Niemueller
00052  */
00053 
00054 /** Constructor.
00055  * @param hostname host to connect to
00056  * @param port port to connect to
00057  * @param handler client handler to handle incoming data
00058  */
00059 FuseClient::FuseClient(const char *hostname, unsigned short int port,
00060                        FuseClientHandler *handler)
00061   : Thread("FuseClient")
00062 {
00063   __hostname = strdup(hostname);
00064   __port = port;
00065   __handler = handler;
00066
00067   __wait_timeout = 10;
00068
00069   __inbound_msgq = new FuseNetworkMessageQueue();
00070   __outbound_msgq = new FuseNetworkMessageQueue();
00071
00072   __mutex = new Mutex();
00073   __recv_mutex    = new Mutex();
00074   __recv_waitcond = new WaitCondition(__recv_mutex);
00075   __socket = new StreamSocket();
00076   __greeting_mutex    = new Mutex();
00077   __greeting_waitcond = new WaitCondition(__greeting_mutex);
00078
00079   __alive = true;
00080   __greeting_received = false;
00081 }
00082
00083 
00084 /** Destructor. */
00085 FuseClient::~FuseClient()
00086 {
00087   free(__hostname);
00088
00089   while ( ! __inbound_msgq->empty() ) {
00090     FuseNetworkMessage *m = __inbound_msgq->front();
00091     m->unref();
00092     __inbound_msgq->pop();
00093   }
00094   delete __inbound_msgq;
00095
00096   while ( ! __outbound_msgq->empty() ) {
00097     FuseNetworkMessage *m = __outbound_msgq->front();
00098     m->unref();
00099     __outbound_msgq->pop();
00100   }
00101   delete __outbound_msgq;
00102
00103   delete __mutex;
00104   delete __recv_mutex;
00105   delete __recv_waitcond;
00106   delete __socket;
00107   delete __greeting_mutex;
00108   delete __greeting_waitcond;
00109 }
00110
00111 
00112 /** Connect. */
00113 void
00114 FuseClient::connect()
00115 {
00116   __socket->connect(__hostname, __port);
00117
00118   FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t));
00119   greetmsg->version = htonl(FUSE_CURRENT_VERSION);
00120   __outbound_msgq->push(new FuseNetworkMessage(FUSE_MT_GREETING,
00121                                                greetmsg, sizeof(FUSE_greeting_message_t)));
00122 }
00123
00124 
00125 /** Disconnect. */
00126 void
00127 FuseClient::disconnect()
00128 {
00129   __mutex->lock();
00130   delete __socket;
00131   __socket = new StreamSocket();
00132   __alive = false;
00133   __mutex->unlock();
00134 }
00135
00136 
00137 /** Send queued messages. */
00138 void
00139 FuseClient::send()
00140 {
00141   try {
00142     FuseNetworkTransceiver::send(__socket, __outbound_msgq);
00143   } catch (ConnectionDiedException &e) {
00144     e.print_trace();
00145     __socket->close();
00146     __alive = false;
00147     __handler->fuse_connection_died();
00148     __recv_waitcond->wake_all();
00149   }
00150 }
00151
00152 
00153 /** Receive messages. */
00154 void
00155 FuseClient::recv()
00156 {
00157   __recv_mutex->lock();
00158   try {
00159     while ( __socket->available() ) {
00160       FuseNetworkTransceiver::recv(__socket, __inbound_msgq);
00161     }
00162   } catch (ConnectionDiedException &e) {
00163     e.print_trace();
00164     __socket->close();
00165     __alive = false;
00166     __handler->fuse_connection_died();
00167     __recv_waitcond->wake_all();
00168   }
00169   __recv_mutex->unlock();
00170 }
00171
00172 
00173 /** Enqueue message.
00174  * @param m message to enqueue
00175  */
00176 void
00177 FuseClient::enqueue(FuseNetworkMessage *m)
00178 {
00179   m->ref();
00180   __outbound_msgq->push_locked(m);
00181 }
00182
00183 
00184 /** Enqueue message.
00185  * @param type type of message
00186  * @param payload payload of message
00187  * @param payload_size size of payload
00188  */
00189 void
00190 FuseClient::enqueue(FUSE_message_type_t type, void *payload, size_t payload_size)
00191 {
00192   FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
00193   __outbound_msgq->push_locked(m);
00194 }
00195
00196 
00197 /** Enqueue message without payload.
00198  * @param type type of message
00199  */
00200 void
00201 FuseClient::enqueue(FUSE_message_type_t type)
00202 {
00203   FuseNetworkMessage *m = new FuseNetworkMessage(type);
00204   __outbound_msgq->push_locked(m);
00205 }
00206
00207 
00208 /** Enqueue message and wait for reply.
00209  * The wait happens atomically, use this to avoid race conditions.
00210  * @param m message to enqueue
00211  */
00212 void
00213 FuseClient::enqueue_and_wait(FuseNetworkMessage *m)
00214 {
00215   __recv_mutex->lock();
00216   m->ref();
00217   __outbound_msgq->push_locked(m);
00218   __recv_waitcond->wait();
00219   __recv_mutex->unlock();
00220 }
00221
00222 
00223 /** Enqueue message and wait for reply.
00224  * The wait happens atomically, use this to avoid race conditions.
00225  * @param type type of message
00226  * @param payload payload of message
00227  * @param payload_size size of payload
00228  */
00229 void
00230 FuseClient::enqueue_and_wait(FUSE_message_type_t type, void *payload, size_t payload_size)
00231 {
00232   FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
00233   __recv_mutex->lock();
00234   __outbound_msgq->push_locked(m);
00235   __recv_waitcond->wait();
00236   __recv_mutex->unlock();
00237 }
00238
00239 
00240 /** Enqueue message without payload and wait for reply.
00241  * The wait happens atomically, use this to avoid race conditions.
00242  * @param type type of message
00243  */
00244 void
00245 FuseClient::enqueue_and_wait(FUSE_message_type_t type)
00246 {
00247   FuseNetworkMessage *m = new FuseNetworkMessage(type);
00248   __recv_mutex->lock();
00249   __outbound_msgq->push_locked(m);
00250   __recv_waitcond->wait();
00251   __recv_mutex->unlock();
00252 }
00253
00254
00255 
00256 /** Sleep for some time.
00257  * Wait until inbound messages have been receive, the connection dies or the
00258  * timeout has been reached, whatever comes first. So you sleep at most timeout ms,
00259  * but short under some circumstances (incoming data or lost connection).
00260  */
00261 void
00262 FuseClient::sleep()
00263 {
00264   try {
00265     __socket->poll(__wait_timeout /* ms timeout */, Socket::POLL_IN);
00266   } catch (Exception &e) {
00267   }
00268 }
00269
00270 
00271 /** Thread loop.
00272  * Sends enqueued messages and reads incoming messages off the network.
00273  */
00274 void
00275 FuseClient::loop()
00276 {
00277   __mutex->lock();
00278
00279   if ( ! __alive ) {
00280     __mutex->unlock();
00281     usleep(10000);
00282     return;
00283   }
00284
00285   bool wake = false;
00286
00287   send();
00288   sleep();
00289   recv();
00290
00291   //process_inbound();
00292
00293   __inbound_msgq->lock();
00294   while ( ! __inbound_msgq->empty() ) {
00295     FuseNetworkMessage *m = __inbound_msgq->front();
00296
00297     wake = true;
00298
00299     if ( m->type() == FUSE_MT_GREETING ) {
00300       FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>();
00301       if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
00302         __handler->fuse_invalid_server_version(FUSE_CURRENT_VERSION, ntohl(gm->version));
00303         __alive = false;
00304       } else {
00305         __greeting_mutex->lock();
00306         __greeting_received = true;
00307         __greeting_waitcond->wake_all();
00308         __greeting_mutex->unlock();
00309         __handler->fuse_connection_established();
00310       }
00311     } else {
00312       __handler->fuse_inbound_received(m);
00313     }
00314
00315     m->unref();
00316     __inbound_msgq->pop();
00317   }
00318   __inbound_msgq->unlock();
00319
00320   if ( wake ) {
00321     __recv_waitcond->wake_all();
00322   }
00323   __mutex->unlock();
00324 }
00325
00326 
00327 /** Wait for messages.
00328  * This will wait for messages to arrive. The calling
00329  * thread is blocked until messages are available.
00330  */
00331 void
00332 FuseClient::wait()
00333 {
00334   __recv_mutex->lock();
00335   __recv_waitcond->wait();
00336   __recv_mutex->unlock();
00337 }
00338
00339 
00340 /** Wait for greeting message.
00341  * This method will wait for the greeting message to arrive. Make sure that you called
00342  * connect() before waiting or call it concurrently in another thread. The calling thread
00343  * will be blocked until the message has been received. If the message has already been
00344  * received this method will return immediately. Thus it is safe to call this at any time
00345  * without risking a race condition.
00346  */
00347 void
00348 FuseClient::wait_greeting()
00349 {
00350   __greeting_mutex->lock();
00351   while (! __greeting_received) {
00352     __greeting_waitcond->wait();
00353   }
00354   __greeting_mutex->unlock();
00355 }