00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <fvutils/net/fuse_server_client_thread.h>
00025
00026 #include <fvutils/net/fuse_server.h>
00027 #include <fvutils/net/fuse_server.h>
00028 #include <fvutils/net/fuse_transceiver.h>
00029 #include <fvutils/net/fuse_message_queue.h>
00030 #include <fvutils/net/fuse_image_content.h>
00031 #include <fvutils/net/fuse_lut_content.h>
00032 #include <fvutils/net/fuse_imagelist_content.h>
00033 #include <fvutils/net/fuse_lutlist_content.h>
00034 #include <fvutils/ipc/shm_image.h>
00035 #include <fvutils/ipc/shm_lut.h>
00036 #include <fvutils/compression/jpeg_compressor.h>
00037
00038 #include <core/exceptions/system.h>
00039 #include <netcomm/socket/stream.h>
00040 #include <netcomm/utils/exceptions.h>
00041 #include <utils/logging/liblogger.h>
00042
00043 #include <netinet/in.h>
00044 #include <cstring>
00045 #include <cstdlib>
00046
00047 using namespace fawkes;
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062 FuseServerClientThread::FuseServerClientThread(FuseServer *fuse_server, StreamSocket *s)
00063 : Thread("FuseServerClientThread")
00064 {
00065 __fuse_server = fuse_server;
00066 __socket = s;
00067 __jpeg_compressor = NULL;
00068
00069 __inbound_queue = new FuseNetworkMessageQueue();
00070 __outbound_queue = new FuseNetworkMessageQueue();
00071
00072 FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t));
00073 greetmsg->version = htonl(FUSE_CURRENT_VERSION);
00074 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_GREETING,
00075 greetmsg, sizeof(FUSE_greeting_message_t)));
00076
00077 __alive = true;
00078 }
00079
00080
00081
00082 FuseServerClientThread::~FuseServerClientThread()
00083 {
00084 delete __socket;
00085 delete __jpeg_compressor;
00086
00087 for (__bit = __buffers.begin(); __bit != __buffers.end(); ++__bit) {
00088 delete __bit->second;
00089 }
00090 __buffers.clear();
00091
00092 for (__lit = __luts.begin(); __lit != __luts.end(); ++__lit ) {
00093 delete __lit->second;
00094 }
00095 __luts.clear();
00096
00097 while ( ! __inbound_queue->empty() ) {
00098 FuseNetworkMessage *m = __inbound_queue->front();
00099 m->unref();
00100 __inbound_queue->pop();
00101 }
00102
00103 while ( ! __outbound_queue->empty() ) {
00104 FuseNetworkMessage *m = __outbound_queue->front();
00105 m->unref();
00106 __outbound_queue->pop();
00107 }
00108
00109 delete __inbound_queue;
00110 delete __outbound_queue;
00111 }
00112
00113
00114
00115 void
00116 FuseServerClientThread::send()
00117 {
00118 if ( ! __outbound_queue->empty() ) {
00119 try {
00120 FuseNetworkTransceiver::send(__socket, __outbound_queue);
00121 } catch (Exception &e) {
00122 __fuse_server->connection_died(this);
00123 __alive = false;
00124 }
00125 }
00126 }
00127
00128
00129
00130
00131
00132
00133 void
00134 FuseServerClientThread::recv()
00135 {
00136 try {
00137 FuseNetworkTransceiver::recv(__socket, __inbound_queue);
00138 } catch (ConnectionDiedException &e) {
00139 __socket->close();
00140 __fuse_server->connection_died(this);
00141 __alive = false;
00142 }
00143 }
00144
00145
00146
00147
00148
00149 void
00150 FuseServerClientThread::process_greeting_message(FuseNetworkMessage *m)
00151 {
00152 FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>();
00153 if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
00154 throw Exception("Invalid version on other side");
00155 }
00156 }
00157
00158
00159 SharedMemoryImageBuffer *
00160 FuseServerClientThread::get_shmimgbuf(const char *id)
00161 {
00162 char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1];
00163 tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0;
00164 strncpy(tmp_image_id, id, IMAGE_ID_MAX_LENGTH);
00165
00166 if ( (__bit = __buffers.find( tmp_image_id )) == __buffers.end() ) {
00167
00168 try {
00169 SharedMemoryImageBuffer *b = new SharedMemoryImageBuffer(tmp_image_id);
00170 __buffers[tmp_image_id] = b;
00171 return b;
00172 } catch (Exception &e) {
00173 throw;
00174 }
00175 } else {
00176 return __bit->second;
00177 }
00178 }
00179
00180
00181
00182
00183
00184 void
00185 FuseServerClientThread::process_getimage_message(FuseNetworkMessage *m)
00186 {
00187 FUSE_imagereq_message_t *irm = m->msg<FUSE_imagereq_message_t>();
00188
00189 SharedMemoryImageBuffer *b;
00190 try {
00191 b = get_shmimgbuf(irm->image_id);
00192 } catch (Exception &e) {
00193 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00194 m->payload(), m->payload_size(),
00195 true);
00196 __outbound_queue->push(nm);
00197 return;
00198 }
00199
00200 if ( irm->format == FUSE_IF_RAW ) {
00201 FuseImageContent *im = new FuseImageContent(b);
00202 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
00203 } else if ( irm->format == FUSE_IF_JPEG ) {
00204 if ( ! __jpeg_compressor) {
00205 __jpeg_compressor = new JpegImageCompressor();
00206 __jpeg_compressor->set_compression_destination(ImageCompressor::COMP_DEST_MEM);
00207 }
00208 b->lock_for_read();
00209 __jpeg_compressor->set_image_dimensions(b->width(), b->height());
00210 __jpeg_compressor->set_image_buffer(b->colorspace(), b->buffer());
00211 unsigned char *compressed_buffer = (unsigned char *)malloc(__jpeg_compressor->recommended_compressed_buffer_size());
00212 __jpeg_compressor->set_destination_buffer(compressed_buffer, __jpeg_compressor->recommended_compressed_buffer_size());
00213 __jpeg_compressor->compress();
00214 b->unlock();
00215 size_t compressed_buffer_size = __jpeg_compressor->compressed_size();
00216 long int sec = 0, usec = 0;
00217 b->capture_time(&sec, &usec);
00218 FuseImageContent *im = new FuseImageContent(FUSE_IF_JPEG, b->image_id(),
00219 compressed_buffer, compressed_buffer_size,
00220 CS_UNKNOWN, b->width(), b->height(),
00221 sec, usec);
00222 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
00223 free(compressed_buffer);
00224 } else {
00225 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00226 m->payload(), m->payload_size(),
00227 true);
00228 __outbound_queue->push(nm);
00229 }
00230 }
00231
00232
00233
00234
00235 void
00236 FuseServerClientThread::process_getimageinfo_message(FuseNetworkMessage *m)
00237 {
00238 FUSE_imagedesc_message_t *idm = m->msg<FUSE_imagedesc_message_t>();
00239
00240 SharedMemoryImageBuffer *b;
00241 try {
00242 b = get_shmimgbuf(idm->image_id);
00243
00244 FUSE_imageinfo_t *ii = (FUSE_imageinfo_t *)calloc(1, sizeof(FUSE_imageinfo_t));
00245
00246 strncpy(ii->image_id, b->image_id(), IMAGE_ID_MAX_LENGTH);
00247 ii->colorspace = htons(b->colorspace());
00248 ii->width = htonl(b->width());
00249 ii->height = htonl(b->height());
00250 ii->buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height());
00251
00252 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_IMAGE_INFO,
00253 ii, sizeof(FUSE_imageinfo_t));
00254 __outbound_queue->push(nm);
00255 } catch (Exception &e) {
00256 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
00257 m->payload(), m->payload_size(),
00258 true);
00259 __outbound_queue->push(nm);
00260 }
00261 }
00262
00263
00264
00265
00266
00267 void
00268 FuseServerClientThread::process_getlut_message(FuseNetworkMessage *m)
00269 {
00270 FUSE_lutdesc_message_t *idm = m->msg<FUSE_lutdesc_message_t>();
00271
00272 char tmp_lut_id[LUT_ID_MAX_LENGTH + 1];
00273 tmp_lut_id[LUT_ID_MAX_LENGTH] = 0;
00274 strncpy(tmp_lut_id, idm->lut_id, LUT_ID_MAX_LENGTH);
00275
00276 if ( (__lit = __luts.find( tmp_lut_id )) != __luts.end() ) {
00277
00278 FuseLutContent *lm = new FuseLutContent(__lit->second);
00279 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
00280 } else {
00281 try {
00282 SharedMemoryLookupTable *b = new SharedMemoryLookupTable(tmp_lut_id);
00283 __luts[tmp_lut_id] = b;
00284 FuseLutContent *lm = new FuseLutContent(b);
00285 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
00286 } catch (Exception &e) {
00287
00288 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_LUT_FAILED,
00289 m->payload(), m->payload_size(),
00290 true);
00291 __outbound_queue->push(nm);
00292 }
00293 }
00294 }
00295
00296
00297
00298
00299
00300 void
00301 FuseServerClientThread::process_setlut_message(FuseNetworkMessage *m)
00302 {
00303 FuseLutContent *lc = m->msgc<FuseLutContent>();
00304 FUSE_lutdesc_message_t *reply = (FUSE_lutdesc_message_t *)malloc(sizeof(FUSE_lutdesc_message_t));
00305 strncpy(reply->lut_id, lc->lut_id(), LUT_ID_MAX_LENGTH);
00306
00307
00308 SharedMemoryLookupTable *b;
00309 if ( (__lit = __luts.find( lc->lut_id() )) != __luts.end() ) {
00310
00311 b = __lit->second;
00312 } else {
00313 try {
00314 b = new SharedMemoryLookupTable(lc->lut_id(), false);
00315 __luts[lc->lut_id()] = b;
00316 } catch (Exception &e) {
00317 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
00318 reply, sizeof(FUSE_lutdesc_message_t)));
00319 e.append("Cannot open shared memory lookup table %s", lc->lut_id());
00320 LibLogger::log_warn("FuseServerClientThread", e);
00321 delete lc;
00322 return;
00323 }
00324 }
00325
00326 if ( (b->width() != lc->width()) ||
00327 (b->height() != lc->height()) ||
00328 (b->depth() != lc->depth()) ||
00329 (b->bytes_per_cell() != lc->bytes_per_cell()) ) {
00330 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED,
00331 reply, sizeof(FUSE_lutdesc_message_t)));
00332 LibLogger::log_warn("FuseServerClientThread", "LUT upload: dimensions do not match. "
00333 "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)",
00334 b->width(), b->height(), b->depth(), b->bytes_per_cell(),
00335 lc->width(), lc->height(), lc->depth(), lc->bytes_per_cell());
00336 } else {
00337 b->set(lc->buffer());
00338 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_SUCCEEDED,
00339 reply, sizeof(FUSE_lutdesc_message_t)));
00340 }
00341
00342 delete lc;
00343 }
00344
00345
00346
00347
00348
00349 void
00350 FuseServerClientThread::process_getimagelist_message(FuseNetworkMessage *m)
00351 {
00352 FuseImageListContent *ilm = new FuseImageListContent();
00353
00354 SharedMemoryImageBufferHeader *h = new SharedMemoryImageBufferHeader();
00355 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
00356 SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
00357
00358 while ( i != endi ) {
00359 const SharedMemoryImageBufferHeader *ih = dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
00360 if ( ih ) {
00361 ilm->add_imageinfo(ih->image_id(), ih->colorspace(), ih->width(), ih->height());
00362 }
00363
00364 ++i;
00365 }
00366
00367 delete h;
00368
00369 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE_LIST, ilm));
00370 }
00371
00372
00373
00374
00375
00376 void
00377 FuseServerClientThread::process_getlutlist_message(FuseNetworkMessage *m)
00378 {
00379 FuseLutListContent *llm = new FuseLutListContent();
00380
00381 SharedMemoryLookupTableHeader *h = new SharedMemoryLookupTableHeader();
00382 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_LUT_MAGIC_TOKEN, h);
00383 SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
00384
00385 while ( i != endi ) {
00386 const SharedMemoryLookupTableHeader *lh = dynamic_cast<const SharedMemoryLookupTableHeader *>(*i);
00387 if ( lh ) {
00388 llm->add_lutinfo(lh->lut_id(), lh->width(), lh->height(), lh->depth(), lh->bytes_per_cell());
00389 }
00390
00391 ++i;
00392 }
00393
00394 delete h;
00395
00396 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT_LIST, llm));
00397 }
00398
00399
00400
00401 void
00402 FuseServerClientThread::process_inbound()
00403 {
00404 __inbound_queue->lock();
00405 while ( ! __inbound_queue->empty() ) {
00406 FuseNetworkMessage *m = __inbound_queue->front();
00407
00408 try {
00409 switch (m->type()) {
00410 case FUSE_MT_GREETING:
00411 process_greeting_message(m);
00412 break;
00413 case FUSE_MT_GET_IMAGE:
00414 process_getimage_message(m);
00415 break;
00416 case FUSE_MT_GET_IMAGE_INFO:
00417 process_getimageinfo_message(m);
00418 break;
00419 case FUSE_MT_GET_IMAGE_LIST:
00420 process_getimagelist_message(m);
00421 break;
00422 case FUSE_MT_GET_LUT_LIST:
00423 process_getlutlist_message(m);
00424 break;
00425 case FUSE_MT_GET_LUT:
00426 process_getlut_message(m);
00427 break;
00428 case FUSE_MT_SET_LUT:
00429 process_setlut_message(m);
00430 break;
00431 default:
00432 throw Exception("Unknown message type received\n");
00433 }
00434 } catch (Exception &e) {
00435 e.append("FUSE protocol error");
00436 LibLogger::log_warn("FuseServerClientThread", e);
00437 __fuse_server->connection_died(this);
00438 __alive = false;
00439 }
00440
00441 m->unref();
00442 __inbound_queue->pop();
00443 }
00444 __inbound_queue->unlock();
00445 }
00446
00447
00448 void
00449 FuseServerClientThread::loop()
00450 {
00451 if ( ! __alive ) {
00452 usleep(10000);
00453 return;
00454 }
00455
00456 short p = 0;
00457 try {
00458 p = __socket->poll(10);
00459 } catch (InterruptedException &e) {
00460
00461 return;
00462 }
00463
00464 if ( (p & Socket::POLL_ERR) ||
00465 (p & Socket::POLL_HUP) ||
00466 (p & Socket::POLL_RDHUP)) {
00467 __fuse_server->connection_died(this);
00468 __alive = false;
00469 } else if ( p & Socket::POLL_IN ) {
00470 try {
00471
00472 recv();
00473 process_inbound();
00474 }
00475 catch (...) {
00476 __fuse_server->connection_died(this);
00477 __alive = false;
00478 }
00479 }
00480
00481 if ( __alive ) {
00482 send();
00483 }
00484 }