fuse_server_client_thread.cpp

00001
00002 /***************************************************************************
00003  *  fuse_server_client_thread.cpp - client thread for FuseServer
00004  *
00005  *  Created: Tue Nov 13 20:00:55 2007
00006  *  Copyright  2005-2007  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023
00024 #include <fvutils/net/fuse_server_client_thread.h>
00025
00026 #include <fvutils/net/fuse_server.h>
00027 #include <fvutils/net/fuse_server.h>
00028 #include <fvutils/net/fuse_transceiver.h>
00029 #include <fvutils/net/fuse_message_queue.h>
00030 #include <fvutils/net/fuse_image_content.h>
00031 #include <fvutils/net/fuse_lut_content.h>
00032 #include <fvutils/net/fuse_imagelist_content.h>
00033 #include <fvutils/net/fuse_lutlist_content.h>
00034 #include <fvutils/ipc/shm_image.h>
00035 #include <fvutils/ipc/shm_lut.h>
00036 #include <fvutils/compression/jpeg_compressor.h>
00037
00038 #include <core/exceptions/system.h>
00039 #include <netcomm/socket/stream.h>
00040 #include <netcomm/utils/exceptions.h>
00041 #include <utils/logging/liblogger.h>
00042
00043 #include <netinet/in.h>
00044 #include <cstring>
00045 #include <cstdlib>
00046
00047 using namespace fawkes;
00048 
00049 /** @class FuseServerClientThread <fvutils/net/fuse_server_client_thread.h>
00050  * FUSE Server Client Thread.
00051  * This thread is instantiated and started for each client that connects to a
00052  * FuseServer.
00053  * @ingroup FUSE
00054  * @ingroup FireVision
00055  * @author Tim Niemueller
00056  */
00057 
00058 /** Constructor.
00059  * @param fuse_server parent FUSE server
00060  * @param s socket to client
00061  */
00062 FuseServerClientThread::FuseServerClientThread(FuseServer *fuse_server, StreamSocket *s)
00063   : Thread("FuseServerClientThread")
00064 {
00065   __fuse_server = fuse_server;
00066   __socket = s;
00067   __jpeg_compressor = NULL;
00068
00069   __inbound_queue  = new FuseNetworkMessageQueue();
00070   __outbound_queue  = new FuseNetworkMessageQueue();
00071
00072   FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t));
00073   greetmsg->version = htonl(FUSE_CURRENT_VERSION);
00074   __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_GREETING,
00075                                                 greetmsg, sizeof(FUSE_greeting_message_t)));
00076
00077   __alive = true;
00078 }
00079
00080 
00081 /** Destructor. */
00082 FuseServerClientThread::~FuseServerClientThread()
00083 {
00084   delete __socket;
00085   delete __jpeg_compressor;
00086
00087   for (__bit = __buffers.begin(); __bit != __buffers.end(); ++__bit) {
00088     delete __bit->second;
00089   }
00090   __buffers.clear();
00091
00092   for (__lit = __luts.begin(); __lit != __luts.end(); ++__lit ) {
00093     delete __lit->second;
00094   }
00095   __luts.clear();
00096
00097   while ( ! __inbound_queue->empty() ) {
00098     FuseNetworkMessage *m = __inbound_queue->front();
00099     m->unref();
00100     __inbound_queue->pop();
00101   }
00102
00103   while ( ! __outbound_queue->empty() ) {
00104     FuseNetworkMessage *m = __outbound_queue->front();
00105     m->unref();
00106     __outbound_queue->pop();
00107   }
00108
00109   delete __inbound_queue;
00110   delete __outbound_queue;
00111 }
00112
00113 
00114 /** Send all messages in outbound queue. */
00115 void
00116 FuseServerClientThread::send()
00117 {
00118   if ( ! __outbound_queue->empty() ) {
00119     try {
00120       FuseNetworkTransceiver::send(__socket, __outbound_queue);
00121     } catch (Exception &e) {
00122       __fuse_server->connection_died(this);
00123       __alive = false;
00124     }
00125   }
00126 }
00127
00128 
00129 /** Receive data.
00130  * Receives data from the network if there is any and then processes all
00131  * inbound messages.
00132  */
00133 void
00134 FuseServerClientThread::recv()
00135 {
00136   try {
00137     FuseNetworkTransceiver::recv(__socket, __inbound_queue);
00138   } catch (ConnectionDiedException &e) {
00139     __socket->close();
00140     __fuse_server->connection_died(this);
00141     __alive = false;
00142   }
00143 }
00144
00145 
00146 /** Process greeting message.
00147  * @param m received message
00148  */
00149 void
00150 FuseServerClientThread::process_greeting_message(FuseNetworkMessage *m)
00151 {
00152   FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>();
00153   if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
00154     throw Exception("Invalid version on other side");
00155   }
00156 }
00157
00158
00159 SharedMemoryImageBuffer *
00160 FuseServerClientThread::get_shmimgbuf(const char *id)
00161 {
00162   char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1];
00163   tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0;
00164   strncpy(tmp_image_id, id, IMAGE_ID_MAX_LENGTH);
00165
00166   if ( (__bit = __buffers.find( tmp_image_id )) == __buffers.end() ) {
00167     // the buffer has not yet been opened
00168     try {
00169       SharedMemoryImageBuffer *b = new SharedMemoryImageBuffer(tmp_image_id);
00170       __buffers[tmp_image_id] = b;
00171       return b;
00172     } catch (Exception &e) {
00173       throw;
00174     }
00175   } else {
00176     return __bit->second;
00177   }
00178 }
00179
00180 
00181 /** Process image request message.
00182  * @param m received message
00183  */
00184 void
00185 FuseServerClientThread::process_getimage_message(FuseNetworkMessage *m)
00186 {
00187   FUSE_imagereq_message_t *irm = m->msg<FUSE_imagereq_message_t>();
00188
00189   SharedMemoryImageBuffer *b;
00190   try {
00191     b = get_shmimgbuf(irm->image_id);
00192   } catch (Exception &e) {
00193     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00194                                                     m->payload(), m->payload_size(),
00195                                                     /* copy payload */ true);
00196     __outbound_queue->push(nm);
00197     return;
00198   }
00199
00200   if ( irm->format == FUSE_IF_RAW ) {
00201     FuseImageContent *im = new FuseImageContent(b);
00202     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
00203   } else if ( irm->format == FUSE_IF_JPEG ) {
00204     if ( ! __jpeg_compressor) {
00205       __jpeg_compressor = new JpegImageCompressor();
00206       __jpeg_compressor->set_compression_destination(ImageCompressor::COMP_DEST_MEM);
00207     }
00208     b->lock_for_read();
00209     __jpeg_compressor->set_image_dimensions(b->width(), b->height());
00210     __jpeg_compressor->set_image_buffer(b->colorspace(), b->buffer());
00211     unsigned char *compressed_buffer = (unsigned char *)malloc(__jpeg_compressor->recommended_compressed_buffer_size());
00212     __jpeg_compressor->set_destination_buffer(compressed_buffer, __jpeg_compressor->recommended_compressed_buffer_size());
00213     __jpeg_compressor->compress();
00214     b->unlock();
00215     size_t compressed_buffer_size = __jpeg_compressor->compressed_size();
00216     long int sec = 0, usec = 0;
00217     b->capture_time(&sec, &usec);
00218     FuseImageContent *im = new FuseImageContent(FUSE_IF_JPEG, b->image_id(),
00219                                                 compressed_buffer, compressed_buffer_size,
00220                                                 CS_UNKNOWN, b->width(), b->height(),
00221                                                 sec, usec);
00222     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
00223     free(compressed_buffer);
00224   } else {
00225     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00226                                                     m->payload(), m->payload_size(),
00227                                                     /* copy payload */ true);
00228     __outbound_queue->push(nm);
00229   }
00230 }
00231 
00232 /** Process image info request message.
00233  * @param m received message
00234  */
00235 void
00236 FuseServerClientThread::process_getimageinfo_message(FuseNetworkMessage *m)
00237 {
00238   FUSE_imagedesc_message_t *idm = m->msg<FUSE_imagedesc_message_t>();
00239
00240   SharedMemoryImageBuffer *b;
00241   try {
00242     b = get_shmimgbuf(idm->image_id);
00243
00244     FUSE_imageinfo_t *ii = (FUSE_imageinfo_t *)calloc(1, sizeof(FUSE_imageinfo_t));
00245
00246     strncpy(ii->image_id, b->image_id(), IMAGE_ID_MAX_LENGTH);
00247     ii->colorspace = htons(b->colorspace());
00248     ii->width = htonl(b->width());
00249     ii->height = htonl(b->height());
00250     ii->buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height());
00251
00252     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_IMAGE_INFO,
00253                                                     ii, sizeof(FUSE_imageinfo_t));
00254     __outbound_queue->push(nm);
00255   } catch (Exception &e) {
00256     FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00257                                                     m->payload(), m->payload_size(),
00258                                                     /* copy payload */ true);
00259     __outbound_queue->push(nm);
00260   }
00261 }
00262
00263 
00264 /** Process LUT request message.
00265  * @param m received message
00266  */
00267 void
00268 FuseServerClientThread::process_getlut_message(FuseNetworkMessage *m)
00269 {
00270   FUSE_lutdesc_message_t *idm = m->msg<FUSE_lutdesc_message_t>();
00271
00272   char tmp_lut_id[LUT_ID_MAX_LENGTH + 1];
00273   tmp_lut_id[LUT_ID_MAX_LENGTH] = 0;
00274   strncpy(tmp_lut_id, idm->lut_id, LUT_ID_MAX_LENGTH);
00275
00276   if ( (__lit = __luts.find( tmp_lut_id )) != __luts.end() ) {
00277     // the buffer had already be opened
00278     FuseLutContent *lm = new FuseLutContent(__lit->second);
00279     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
00280   } else {
00281     try {
00282       SharedMemoryLookupTable *b = new SharedMemoryLookupTable(tmp_lut_id);
00283       __luts[tmp_lut_id] = b;
00284       FuseLutContent *lm = new FuseLutContent(b);
00285       __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
00286     } catch (Exception &e) {
00287       // could not open the shared memory segment for some reason, send failure
00288       FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_LUT_FAILED,
00289                                                       m->payload(), m->payload_size(),
00290                                                       /* copy payload */ true);
00291       __outbound_queue->push(nm);
00292     }
00293   }
00294 }
00295
00296 
00297 /** Process LUT setting.
00298  * @param m received message
00299  */
00300 void
00301 FuseServerClientThread::process_setlut_message(FuseNetworkMessage *m)
00302 {
00303   FuseLutContent *lc = m->msgc<FuseLutContent>();
00304   FUSE_lutdesc_message_t *reply = (FUSE_lutdesc_message_t *)malloc(sizeof(FUSE_lutdesc_message_t));
00305   strncpy(reply->lut_id, lc->lut_id(), LUT_ID_MAX_LENGTH);
00306   // Currently we expect colormaps, so make sure we get sensible dimensions
00307
00308   SharedMemoryLookupTable *b;
00309   if ( (__lit = __luts.find( lc->lut_id() )) != __luts.end() ) {
00310     // the buffer had already been opened
00311     b = __lit->second;
00312   } else {
00313     try {
00314       b = new SharedMemoryLookupTable(lc->lut_id(), /* read only */ false);
00315       __luts[lc->lut_id()] = b;
00316     } catch (Exception &e) {
00317       __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
00318                                                     reply, sizeof(FUSE_lutdesc_message_t)));
00319       e.append("Cannot open shared memory lookup table %s", lc->lut_id());
00320       LibLogger::log_warn("FuseServerClientThread", e);
00321       delete lc;
00322       return;
00323     }
00324   }
00325
00326   if ( (b->width() != lc->width())   ||
00327        (b->height() != lc->height()) ||
00328        (b->depth() != lc->depth())   ||
00329        (b->bytes_per_cell() != lc->bytes_per_cell()) ) {
00330     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
00331                                                   reply, sizeof(FUSE_lutdesc_message_t)));
00332     LibLogger::log_warn("FuseServerClientThread", "LUT upload: dimensions do not match. "
00333                         "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)",
00334                         b->width(), b->height(), b->depth(), b->bytes_per_cell(),
00335                         lc->width(), lc->height(), lc->depth(), lc->bytes_per_cell());
00336   } else {
00337     b->set(lc->buffer());
00338     __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_SUCCEEDED,
00339                                                   reply, sizeof(FUSE_lutdesc_message_t)));
00340   }
00341
00342   delete lc;
00343 }
00344
00345 
00346 /** Process image list request message.
00347  * @param m received message
00348  */
00349 void
00350 FuseServerClientThread::process_getimagelist_message(FuseNetworkMessage *m)
00351 {
00352   FuseImageListContent *ilm = new FuseImageListContent();
00353
00354   SharedMemoryImageBufferHeader *h      = new SharedMemoryImageBufferHeader();
00355   SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
00356   SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
00357
00358   while ( i != endi ) {
00359     const SharedMemoryImageBufferHeader *ih = dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
00360     if ( ih ) {
00361       ilm->add_imageinfo(ih->image_id(), ih->colorspace(), ih->width(), ih->height());
00362     }
00363
00364     ++i;
00365   }
00366
00367   delete h;
00368
00369   __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE_LIST, ilm));
00370 }
00371
00372 
00373 /** Process LUT list request message.
00374  * @param m received message
00375  */
00376 void
00377 FuseServerClientThread::process_getlutlist_message(FuseNetworkMessage *m)
00378 {
00379   FuseLutListContent *llm = new FuseLutListContent();
00380
00381   SharedMemoryLookupTableHeader *h = new SharedMemoryLookupTableHeader();
00382   SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_LUT_MAGIC_TOKEN, h);
00383   SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
00384
00385   while ( i != endi ) {
00386     const SharedMemoryLookupTableHeader *lh = dynamic_cast<const SharedMemoryLookupTableHeader *>(*i);
00387     if ( lh ) {
00388       llm->add_lutinfo(lh->lut_id(), lh->width(), lh->height(), lh->depth(), lh->bytes_per_cell());
00389     }
00390
00391     ++i;
00392   }
00393
00394   delete h;
00395
00396   __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT_LIST, llm));
00397 }
00398
00399 
00400 /** Process inbound messages. */
00401 void
00402 FuseServerClientThread::process_inbound()
00403 {
00404   __inbound_queue->lock();
00405   while ( ! __inbound_queue->empty() ) {
00406     FuseNetworkMessage *m = __inbound_queue->front();
00407
00408     try {
00409       switch (m->type()) {
00410       case FUSE_MT_GREETING:
00411         process_greeting_message(m);
00412         break;
00413       case FUSE_MT_GET_IMAGE:
00414         process_getimage_message(m);
00415         break;
00416       case FUSE_MT_GET_IMAGE_INFO:
00417         process_getimageinfo_message(m);
00418         break;
00419       case FUSE_MT_GET_IMAGE_LIST:
00420         process_getimagelist_message(m);
00421         break;
00422       case FUSE_MT_GET_LUT_LIST:
00423         process_getlutlist_message(m);
00424         break;
00425       case FUSE_MT_GET_LUT:
00426         process_getlut_message(m);
00427         break;
00428       case FUSE_MT_SET_LUT:
00429         process_setlut_message(m);
00430         break;
00431       default:
00432         throw Exception("Unknown message type received\n");
00433       }
00434     } catch (Exception &e) {
00435       e.append("FUSE protocol error");
00436       LibLogger::log_warn("FuseServerClientThread", e);
00437       __fuse_server->connection_died(this);
00438       __alive = false;
00439     }
00440
00441     m->unref();
00442     __inbound_queue->pop();
00443   }
00444   __inbound_queue->unlock();
00445 }
00446
00447
00448 void
00449 FuseServerClientThread::loop()
00450 {
00451   if ( ! __alive ) {
00452     usleep(10000);
00453     return;
00454   }
00455
00456   short p = 0;
00457   try {
00458     p = __socket->poll(10); // block for up to 10 ms
00459   } catch (InterruptedException &e) {
00460     // we just ignore this and try it again
00461     return;
00462   }
00463
00464   if ( (p & Socket::POLL_ERR) ||
00465        (p & Socket::POLL_HUP) ||
00466        (p & Socket::POLL_RDHUP)) {
00467     __fuse_server->connection_died(this);
00468     __alive = false;
00469   } else if ( p & Socket::POLL_IN ) {
00470     try {
00471       // Data can be read
00472       recv();
00473       process_inbound();
00474     }
00475     catch (...) {
00476       __fuse_server->connection_died(this);
00477       __alive = false;
00478     }
00479   }
00480
00481   if ( __alive ) {
00482     send();
00483   }
00484 }