fuse_client.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <fvutils/net/fuse_client.h>
00025
00026 #include <fvutils/net/fuse_transceiver.h>
00027 #include <fvutils/net/fuse_message_queue.h>
00028 #include <fvutils/net/fuse_message.h>
00029 #include <fvutils/net/fuse_client_handler.h>
00030
00031 #include <core/threading/mutex.h>
00032 #include <core/threading/wait_condition.h>
00033 #include <core/exceptions/software.h>
00034 #include <netcomm/socket/stream.h>
00035 #include <netcomm/utils/exceptions.h>
00036
00037 #include <cstring>
00038 #include <netinet/in.h>
00039 #include <cstdlib>
00040 #include <unistd.h>
00041
00042 using namespace fawkes;
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059 FuseClient::FuseClient(const char *hostname, unsigned short int port,
00060 FuseClientHandler *handler)
00061 : Thread("FuseClient")
00062 {
00063 __hostname = strdup(hostname);
00064 __port = port;
00065 __handler = handler;
00066
00067 __wait_timeout = 10;
00068
00069 __inbound_msgq = new FuseNetworkMessageQueue();
00070 __outbound_msgq = new FuseNetworkMessageQueue();
00071
00072 __mutex = new Mutex();
00073 __recv_mutex = new Mutex();
00074 __recv_waitcond = new WaitCondition(__recv_mutex);
00075 __socket = new StreamSocket();
00076 __greeting_mutex = new Mutex();
00077 __greeting_waitcond = new WaitCondition(__greeting_mutex);
00078
00079 __alive = true;
00080 __greeting_received = false;
00081 }
00082
00083
00084
00085 FuseClient::~FuseClient()
00086 {
00087 free(__hostname);
00088
00089 while ( ! __inbound_msgq->empty() ) {
00090 FuseNetworkMessage *m = __inbound_msgq->front();
00091 m->unref();
00092 __inbound_msgq->pop();
00093 }
00094 delete __inbound_msgq;
00095
00096 while ( ! __outbound_msgq->empty() ) {
00097 FuseNetworkMessage *m = __outbound_msgq->front();
00098 m->unref();
00099 __outbound_msgq->pop();
00100 }
00101 delete __outbound_msgq;
00102
00103 delete __mutex;
00104 delete __recv_mutex;
00105 delete __recv_waitcond;
00106 delete __socket;
00107 delete __greeting_mutex;
00108 delete __greeting_waitcond;
00109 }
00110
00111
00112
00113 void
00114 FuseClient::connect()
00115 {
00116 __socket->connect(__hostname, __port);
00117
00118 FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t));
00119 greetmsg->version = htonl(FUSE_CURRENT_VERSION);
00120 __outbound_msgq->push(new FuseNetworkMessage(FUSE_MT_GREETING,
00121 greetmsg, sizeof(FUSE_greeting_message_t)));
00122 }
00123
00124
00125
00126 void
00127 FuseClient::disconnect()
00128 {
00129 __mutex->lock();
00130 delete __socket;
00131 __socket = new StreamSocket();
00132 __alive = false;
00133 __mutex->unlock();
00134 }
00135
00136
00137
00138 void
00139 FuseClient::send()
00140 {
00141 try {
00142 FuseNetworkTransceiver::send(__socket, __outbound_msgq);
00143 } catch (ConnectionDiedException &e) {
00144 e.print_trace();
00145 __socket->close();
00146 __alive = false;
00147 __handler->fuse_connection_died();
00148 __recv_waitcond->wake_all();
00149 }
00150 }
00151
00152
00153
00154 void
00155 FuseClient::recv()
00156 {
00157 __recv_mutex->lock();
00158 try {
00159 while ( __socket->available() ) {
00160 FuseNetworkTransceiver::recv(__socket, __inbound_msgq);
00161 }
00162 } catch (ConnectionDiedException &e) {
00163 e.print_trace();
00164 __socket->close();
00165 __alive = false;
00166 __handler->fuse_connection_died();
00167 __recv_waitcond->wake_all();
00168 }
00169 __recv_mutex->unlock();
00170 }
00171
00172
00173
00174
00175
00176 void
00177 FuseClient::enqueue(FuseNetworkMessage *m)
00178 {
00179 m->ref();
00180 __outbound_msgq->push_locked(m);
00181 }
00182
00183
00184
00185
00186
00187
00188
00189 void
00190 FuseClient::enqueue(FUSE_message_type_t type, void *payload, size_t payload_size)
00191 {
00192 FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
00193 __outbound_msgq->push_locked(m);
00194 }
00195
00196
00197
00198
00199
00200 void
00201 FuseClient::enqueue(FUSE_message_type_t type)
00202 {
00203 FuseNetworkMessage *m = new FuseNetworkMessage(type);
00204 __outbound_msgq->push_locked(m);
00205 }
00206
00207
00208
00209
00210
00211
00212 void
00213 FuseClient::enqueue_and_wait(FuseNetworkMessage *m)
00214 {
00215 __recv_mutex->lock();
00216 m->ref();
00217 __outbound_msgq->push_locked(m);
00218 __recv_waitcond->wait();
00219 __recv_mutex->unlock();
00220 }
00221
00222
00223
00224
00225
00226
00227
00228
00229 void
00230 FuseClient::enqueue_and_wait(FUSE_message_type_t type, void *payload, size_t payload_size)
00231 {
00232 FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
00233 __recv_mutex->lock();
00234 __outbound_msgq->push_locked(m);
00235 __recv_waitcond->wait();
00236 __recv_mutex->unlock();
00237 }
00238
00239
00240
00241
00242
00243
00244 void
00245 FuseClient::enqueue_and_wait(FUSE_message_type_t type)
00246 {
00247 FuseNetworkMessage *m = new FuseNetworkMessage(type);
00248 __recv_mutex->lock();
00249 __outbound_msgq->push_locked(m);
00250 __recv_waitcond->wait();
00251 __recv_mutex->unlock();
00252 }
00253
00254
00255
00256
00257
00258
00259
00260
00261 void
00262 FuseClient::sleep()
00263 {
00264 try {
00265 __socket->poll(__wait_timeout , Socket::POLL_IN);
00266 } catch (Exception &e) {
00267 }
00268 }
00269
00270
00271
00272
00273
00274 void
00275 FuseClient::loop()
00276 {
00277 __mutex->lock();
00278
00279 if ( ! __alive ) {
00280 __mutex->unlock();
00281 usleep(10000);
00282 return;
00283 }
00284
00285 bool wake = false;
00286
00287 send();
00288 sleep();
00289 recv();
00290
00291
00292
00293 __inbound_msgq->lock();
00294 while ( ! __inbound_msgq->empty() ) {
00295 FuseNetworkMessage *m = __inbound_msgq->front();
00296
00297 wake = true;
00298
00299 if ( m->type() == FUSE_MT_GREETING ) {
00300 FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>();
00301 if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
00302 __handler->fuse_invalid_server_version(FUSE_CURRENT_VERSION, ntohl(gm->version));
00303 __alive = false;
00304 } else {
00305 __greeting_mutex->lock();
00306 __greeting_received = true;
00307 __greeting_waitcond->wake_all();
00308 __greeting_mutex->unlock();
00309 __handler->fuse_connection_established();
00310 }
00311 } else {
00312 __handler->fuse_inbound_received(m);
00313 }
00314
00315 m->unref();
00316 __inbound_msgq->pop();
00317 }
00318 __inbound_msgq->unlock();
00319
00320 if ( wake ) {
00321 __recv_waitcond->wake_all();
00322 }
00323 __mutex->unlock();
00324 }
00325
00326
00327
00328
00329
00330
00331 void
00332 FuseClient::wait()
00333 {
00334 __recv_mutex->lock();
00335 __recv_waitcond->wait();
00336 __recv_mutex->unlock();
00337 }
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347 void
00348 FuseClient::wait_greeting()
00349 {
00350 __greeting_mutex->lock();
00351 while (! __greeting_received) {
00352 __greeting_waitcond->wait();
00353 }
00354 __greeting_mutex->unlock();
00355 }