base_thread.cpp

00001
00002 /***************************************************************************
00003  *  base_thread.cpp - FireVision Base Thread
00004  *
00005  *  Created: Tue May 29 16:41:50 2007
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU Library General Public License for more details.
00019  *
00020  *  Read the full text in the LICENSE.GPL file in the doc directory.
00021  */
00022
00023 #include "base_thread.h"
00024 #include "acquisition_thread.h"
00025 #include "aqt_vision_threads.h"
00026
00027 #include <core/threading/thread.h>
00028 #include <core/threading/mutex.h>
00029 #include <core/threading/mutex_locker.h>
00030 #include <core/threading/barrier.h>
00031 #include <utils/logging/logger.h>
00032
00033 #include <fvutils/system/camargp.h>
00034 #include <fvutils/ipc/shm_image.h>
00035 #include <fvutils/ipc/shm_lut.h>
00036 #include <cams/factory.h>
00037 #include <cams/cam_exceptions.h>
00038 #include <cams/control/factory.h>
00039 #include <core/exceptions/software.h>
00040
00041 #include <aspect/vision.h>
00042
00043 #include <algorithm>
00044 #include <unistd.h>
00045
00046 using namespace fawkes;
00047 
00048 /** @class FvBaseThread "base_thread.h"
00049  * FireVision base thread.
00050  * This implements the functionality of the FvBasePlugin.
00051  * @author Tim Niemueller
00052  */
00053 
00054 /** Constructor. */
00055 FvBaseThread::FvBaseThread()
00056   : Thread("FvBaseThread", Thread::OPMODE_WAITFORWAKEUP),
00057     BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR),
00058     VisionMasterAspect(this)
00059 {
00060   // default to 30 seconds
00061   __aqt_timeout = 30;
00062   __aqt_barrier = new Barrier(1);
00063 }
00064
00065 
00066 /** Destructor. */
00067 FvBaseThread::~FvBaseThread()
00068 {
00069   delete __aqt_barrier;
00070 }
00071
00072
00073 void
00074 FvBaseThread::init()
00075 {
00076   // wipe all previously existing FireVision shared memory segments
00077   // that are orphaned
00078   SharedMemoryImageBuffer::cleanup(/* use lister */ false);
00079   SharedMemoryLookupTable::cleanup(/* use lister */ false);
00080 }
00081
00082
00083 void
00084 FvBaseThread::finalize()
00085 {
00086   __aqts.lock();
00087   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00088     thread_collector->remove(__ait->second);
00089     delete __ait->second;
00090   }
00091   __aqts.clear();
00092   __aqts.unlock();
00093   __owned_controls.lock();
00094   LockList<CameraControl *>::iterator i;
00095   for (i = __owned_controls.begin(); i != __owned_controls.end(); ++i) {
00096     delete *i;
00097   }
00098   __owned_controls.clear();
00099   __owned_controls.unlock();
00100 }
00101
00102 
00103 /** Thread loop. */
00104 void
00105 FvBaseThread::loop()
00106 {
00107   __aqts.lock();
00108
00109   try {
00110     for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00111       __ait->second->set_vt_prepfin_hold(true);
00112     }
00113   } catch (Exception &e) {
00114     logger->log_warn(name(), "Cannot get prepfin hold status, skipping this loop");
00115     for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00116       __ait->second->set_vt_prepfin_hold(false);
00117     }
00118     __aqts.unlock();
00119     return;
00120   }
00121
00122   // Wakeup all cyclic acquisition threads and wait for them
00123   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00124     if ( __ait->second->aqtmode() == FvAcquisitionThread::AqtCyclic ) {
00125       //logger->log_debug(name(), "Waking Thread %s", __ait->second->name());
00126       __ait->second->wakeup(__aqt_barrier);
00127     }
00128   }
00129
00130   __aqt_barrier->wait();
00131
00132   // Check for aqt timeouts
00133   for (__ait = __aqts.begin(); __ait != __aqts.end();) {
00134     if ( __ait->second->vision_threads->empty() &&
00135          (__ait->second->vision_threads->empty_time() > __aqt_timeout) ) {
00136
00137       logger->log_info(name(), "Acquisition thread %s timed out, destroying",
00138                        __ait->second->name());
00139
00140
00141       thread_collector->remove(__ait->second);
00142       delete __ait->second;
00143       __aqts.erase(__ait++);
00144     } else {
00145       ++__ait;
00146     }
00147   }
00148
00149   __started_threads.lock();
00150   fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stit = __started_threads.begin();
00151   while (stit != __started_threads.end()) {
00152
00153     logger->log_info(name(), "Thread %s has been started, %zu",
00154                      stit->second->name(), __started_threads.size());
00155
00156     // if the thread is registered in that aqt mark it running
00157     stit->second->vision_threads->set_thread_running(stit->first);
00158
00159     if ( stit->second->vision_threads->has_cyclic_thread() ) {
00160       if (stit->second->aqtmode() != FvAcquisitionThread::AqtCyclic ) {
00161         logger->log_info(name(), "Switching acquisition thread %s to cyclic mode",
00162                          stit->second->name());
00163
00164         stit->second->prepare_finalize();
00165         stit->second->cancel();
00166         stit->second->join();
00167         stit->second->set_aqtmode(FvAcquisitionThread::AqtCyclic);
00168         stit->second->start();
00169         stit->second->cancel_finalize();
00170       }
00171     } else if (stit->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) {
00172       logger->log_info(name(), "Switching acquisition thread %s to continuous mode",
00173                        stit->second->name());
00174       stit->second->prepare_finalize();
00175       stit->second->cancel();
00176       stit->second->join();
00177       stit->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
00178       stit->second->start();
00179       stit->second->cancel_finalize();
00180     }
00181
00182     // Make thread actually capture data
00183     stit->second->set_enabled(true);
00184
00185     fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stittmp = stit;
00186     ++stit;
00187     __started_threads.erase( stittmp );
00188   }
00189   __started_threads.unlock();
00190
00191   // Re-create barrier as necessary after _adding_ threads
00192   unsigned int num_cyclic_threads = 0;
00193   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00194     if ( __ait->second->vision_threads->has_cyclic_thread() ) {
00195       ++num_cyclic_threads;
00196     }
00197   }
00198   cond_recreate_barrier(num_cyclic_threads);
00199
00200   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00201     __ait->second->set_vt_prepfin_hold(false);
00202   }
00203
00204   __aqts.unlock();
00205 }
00206
00207 
00208 /** Get vision master.
00209  * @return vision master
00210  */
00211 VisionMaster *
00212 FvBaseThread::vision_master()
00213 {
00214   return this;
00215 }
00216
00217
00218 Camera *
00219 FvBaseThread::register_for_camera(const char *camera_string, Thread *thread,
00220                                   colorspace_t cspace)
00221 {
00222   Camera *c = NULL;
00223   __aqts.lock();
00224
00225   logger->log_info(name(), "Thread '%s' registers for camera '%s'", thread->name(), camera_string);
00226
00227   VisionAspect *vision_thread = dynamic_cast<VisionAspect *>(thread);
00228   if ( vision_thread == NULL ) {
00229     throw TypeMismatchException("Thread is not a vision thread");
00230   }
00231
00232   CameraArgumentParser *cap = new CameraArgumentParser(camera_string);
00233   try {
00234     std::string id = cap->cam_type() + "." + cap->cam_id();
00235     if ( __aqts.find(id) != __aqts.end() ) {
00236       // this camera has already been loaded
00237       c = __aqts[id]->camera_instance(cspace,
00238                                     (vision_thread->vision_thread_mode() ==
00239                                      VisionAspect::CONTINUOUS));
00240
00241       __aqts[id]->vision_threads->add_waiting_thread(thread);
00242
00243     } else {
00244       Camera *cam = NULL;
00245       try {
00246         cam = CameraFactory::instance(cap);
00247         cam->open();
00248         cam->start();
00249       } catch (Exception &e) {
00250         delete cam;
00251         e.append("Could not open or start camera");
00252         throw;
00253       }
00254
00255       FvAcquisitionThread *aqt = new FvAcquisitionThread(id.c_str(), cam, logger, clock);
00256
00257       c = aqt->camera_instance(cspace, (vision_thread->vision_thread_mode() ==
00258                                         VisionAspect::CONTINUOUS));
00259
00260       aqt->vision_threads->add_waiting_thread(thread);
00261
00262       __aqts[id] = aqt;
00263       thread_collector->add(aqt);
00264
00265       // no need to recreate barrier, by default aqts operate in continuous mode
00266
00267       logger->log_info(name(), "Acquisition thread '%s' started for thread '%s' and camera '%s'",
00268                        aqt->name(), thread->name(), id.c_str());
00269
00270     }
00271
00272     thread->add_notification_listener(this);
00273
00274   } catch (UnknownCameraTypeException &e) {
00275     delete cap;
00276     e.append("FvBaseVisionMaster: could not instantiate camera");
00277     __aqts.unlock();
00278     throw;
00279   } catch (Exception &e) {
00280     delete cap;
00281     e.append("FvBaseVisionMaster: could not open or start camera");
00282     __aqts.unlock();
00283     throw;
00284   }
00285
00286   delete cap;
00287
00288   __aqts.unlock();
00289   return c;
00290 }
00291
00292
00293 Camera *
00294 FvBaseThread::register_for_raw_camera(const char *camera_string, Thread *thread)
00295 {
00296   Camera *camera = register_for_camera(camera_string, thread, CS_UNKNOWN);
00297   CameraArgumentParser cap(camera_string);
00298   try {
00299     std::string id = cap.cam_type() + "." + cap.cam_id();
00300     __aqts.lock();
00301     if ( __aqts.find(id) != __aqts.end() ) {
00302       __aqts[id]->raw_subscriber_thread = thread;
00303     }
00304     __aqts.unlock();
00305   } catch (Exception &e) {
00306     __aqts.unlock();
00307     throw;
00308   }
00309   return camera;
00310 }
00311
00312 CameraControl *
00313 FvBaseThread::create_camctrl(const char *camera_string)
00314 {
00315   CameraControl *cc = CameraControlFactory::instance(camera_string);
00316   if (cc) {
00317     __owned_controls.lock();
00318     __owned_controls.push_back(cc);
00319     __owned_controls.sort();
00320     __owned_controls.unique();
00321     __owned_controls.unlock();
00322     return cc;
00323   } else {
00324     throw Exception("Cannot create camera control of desired type");
00325   }
00326 }
00327
00328 CameraControl *
00329 FvBaseThread::acquire_camctrl(const char *cam_string)
00330 {
00331   CameraArgumentParser cap(cam_string);
00332   std::string id = cap.cam_type() + "." + cap.cam_id();
00333
00334   // Has this camera been loaded?
00335   MutexLocker lock(__aqts.mutex());
00336   if (__aqts.find(id) != __aqts.end()) {
00337     return CameraControlFactory::instance(__aqts[id]->get_camera());
00338   } else {
00339     return create_camctrl(cam_string);
00340   }
00341 }
00342
00343
00344 CameraControl *
00345 FvBaseThread::acquire_camctrl(const char *cam_string,
00346                               const std::type_info &typeinf)
00347 {
00348   CameraArgumentParser cap(cam_string);
00349   std::string id = cap.cam_type() + "." + cap.cam_id();
00350
00351   // Has this camera been loaded?
00352   MutexLocker lock(__aqts.mutex());
00353   if (__aqts.find(id) != __aqts.end()) {
00354     return CameraControlFactory::instance(typeinf, __aqts[id]->get_camera());
00355   } else {
00356     return create_camctrl(cam_string);
00357   }
00358 }
00359
00360
00361 void
00362 FvBaseThread::release_camctrl(CameraControl *cc)
00363 {
00364   __owned_controls.lock();
00365   LockList<CameraControl *>::iterator f;
00366   if ((f = std::find(__owned_controls.begin(), __owned_controls.end(), cc)) != __owned_controls.end()) {
00367     delete *f;
00368     __owned_controls.erase(f);
00369   }
00370   __owned_controls.unlock();
00371 }
00372
00373 
00374 /** Conditionally re-create barriers.
00375  * Re-create barriers if the number of cyclic threads has changed.
00376  * @param num_cyclic_threads new number of cyclic threads
00377  */
00378 void
00379 FvBaseThread::cond_recreate_barrier(unsigned int num_cyclic_threads)
00380 {
00381   if ( (num_cyclic_threads + 1) != __aqt_barrier->count() ) {
00382     delete __aqt_barrier;
00383     __aqt_barrier = new Barrier( num_cyclic_threads + 1 ); // +1 for base thread
00384   }
00385 }
00386
00387
00388 void
00389 FvBaseThread::unregister_thread(Thread *thread)
00390 {
00391   __aqts.lock();
00392   unsigned int num_cyclic_threads = 0;
00393
00394   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00395
00396     // Remove thread from all aqts
00397     __ait->second->vision_threads->remove_thread(thread);
00398
00399     if (__ait->second->raw_subscriber_thread == thread) {
00400       __ait->second->raw_subscriber_thread = NULL;
00401     }
00402
00403     if ( __ait->second->vision_threads->has_cyclic_thread() ) {
00404       ++num_cyclic_threads;
00405
00406     } else if (__ait->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) {
00407       logger->log_info(name(), "Switching acquisition thread %s to continuous mode "
00408                                "on unregister", __ait->second->name());
00409
00410       __ait->second->prepare_finalize();
00411       __ait->second->cancel();
00412       __ait->second->join();
00413       __ait->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
00414       __ait->second->start();
00415       __ait->second->cancel_finalize();
00416     }
00417   }
00418   // Recreate as necessary after _removing_ threads
00419   cond_recreate_barrier(num_cyclic_threads);
00420
00421   __aqts.unlock();
00422 }
00423
00424
00425 bool
00426 FvBaseThread::thread_started(Thread *thread) throw()
00427 {
00428   __aqts.lock();
00429   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00430     if (__ait->second->vision_threads->has_waiting_thread(thread)) {
00431       __started_threads.lock();
00432       __started_threads[thread] = __ait->second;
00433       __started_threads.unlock();
00434     }
00435   }
00436   __aqts.unlock();
00437
00438   return false;
00439 }
00440
00441
00442 bool
00443 FvBaseThread::thread_init_failed(Thread *thread) throw()
00444 {
00445   __aqts.lock();
00446   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00447     __ait->second->vision_threads->remove_waiting_thread(thread);
00448   }
00449   __aqts.unlock();
00450
00451   return false;
00452 }