base_thread.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "base_thread.h"
00024 #include "acquisition_thread.h"
00025 #include "aqt_vision_threads.h"
00026
00027 #include <core/threading/thread.h>
00028 #include <core/threading/mutex.h>
00029 #include <core/threading/mutex_locker.h>
00030 #include <core/threading/barrier.h>
00031 #include <utils/logging/logger.h>
00032
00033 #include <fvutils/system/camargp.h>
00034 #include <fvutils/ipc/shm_image.h>
00035 #include <fvutils/ipc/shm_lut.h>
00036 #include <cams/factory.h>
00037 #include <cams/cam_exceptions.h>
00038 #include <cams/control/factory.h>
00039 #include <core/exceptions/software.h>
00040
00041 #include <aspect/vision.h>
00042
00043 #include <algorithm>
00044 #include <unistd.h>
00045
00046 using namespace fawkes;
00047
00048
00049
00050
00051
00052
00053
00054
00055 FvBaseThread::FvBaseThread()
00056 : Thread("FvBaseThread", Thread::OPMODE_WAITFORWAKEUP),
00057 BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR),
00058 VisionMasterAspect(this)
00059 {
00060
00061 __aqt_timeout = 30;
00062 __aqt_barrier = new Barrier(1);
00063 }
00064
00065
00066
00067 FvBaseThread::~FvBaseThread()
00068 {
00069 delete __aqt_barrier;
00070 }
00071
00072
00073 void
00074 FvBaseThread::init()
00075 {
00076
00077
00078 SharedMemoryImageBuffer::cleanup( false);
00079 SharedMemoryLookupTable::cleanup( false);
00080 }
00081
00082
00083 void
00084 FvBaseThread::finalize()
00085 {
00086 __aqts.lock();
00087 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00088 thread_collector->remove(__ait->second);
00089 delete __ait->second;
00090 }
00091 __aqts.clear();
00092 __aqts.unlock();
00093 __owned_controls.lock();
00094 LockList<CameraControl *>::iterator i;
00095 for (i = __owned_controls.begin(); i != __owned_controls.end(); ++i) {
00096 delete *i;
00097 }
00098 __owned_controls.clear();
00099 __owned_controls.unlock();
00100 }
00101
00102
00103
00104 void
00105 FvBaseThread::loop()
00106 {
00107 __aqts.lock();
00108
00109 try {
00110 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00111 __ait->second->set_vt_prepfin_hold(true);
00112 }
00113 } catch (Exception &e) {
00114 logger->log_warn(name(), "Cannot get prepfin hold status, skipping this loop");
00115 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00116 __ait->second->set_vt_prepfin_hold(false);
00117 }
00118 __aqts.unlock();
00119 return;
00120 }
00121
00122
00123 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00124 if ( __ait->second->aqtmode() == FvAcquisitionThread::AqtCyclic ) {
00125
00126 __ait->second->wakeup(__aqt_barrier);
00127 }
00128 }
00129
00130 __aqt_barrier->wait();
00131
00132
00133 for (__ait = __aqts.begin(); __ait != __aqts.end();) {
00134 if ( __ait->second->vision_threads->empty() &&
00135 (__ait->second->vision_threads->empty_time() > __aqt_timeout) ) {
00136
00137 logger->log_info(name(), "Acquisition thread %s timed out, destroying",
00138 __ait->second->name());
00139
00140
00141 thread_collector->remove(__ait->second);
00142 delete __ait->second;
00143 __aqts.erase(__ait++);
00144 } else {
00145 ++__ait;
00146 }
00147 }
00148
00149 __started_threads.lock();
00150 fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stit = __started_threads.begin();
00151 while (stit != __started_threads.end()) {
00152
00153 logger->log_info(name(), "Thread %s has been started, %zu",
00154 stit->second->name(), __started_threads.size());
00155
00156
00157 stit->second->vision_threads->set_thread_running(stit->first);
00158
00159 if ( stit->second->vision_threads->has_cyclic_thread() ) {
00160 if (stit->second->aqtmode() != FvAcquisitionThread::AqtCyclic ) {
00161 logger->log_info(name(), "Switching acquisition thread %s to cyclic mode",
00162 stit->second->name());
00163
00164 stit->second->prepare_finalize();
00165 stit->second->cancel();
00166 stit->second->join();
00167 stit->second->set_aqtmode(FvAcquisitionThread::AqtCyclic);
00168 stit->second->start();
00169 stit->second->cancel_finalize();
00170 }
00171 } else if (stit->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) {
00172 logger->log_info(name(), "Switching acquisition thread %s to continuous mode",
00173 stit->second->name());
00174 stit->second->prepare_finalize();
00175 stit->second->cancel();
00176 stit->second->join();
00177 stit->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
00178 stit->second->start();
00179 stit->second->cancel_finalize();
00180 }
00181
00182
00183 stit->second->set_enabled(true);
00184
00185 fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stittmp = stit;
00186 ++stit;
00187 __started_threads.erase( stittmp );
00188 }
00189 __started_threads.unlock();
00190
00191
00192 unsigned int num_cyclic_threads = 0;
00193 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00194 if ( __ait->second->vision_threads->has_cyclic_thread() ) {
00195 ++num_cyclic_threads;
00196 }
00197 }
00198 cond_recreate_barrier(num_cyclic_threads);
00199
00200 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00201 __ait->second->set_vt_prepfin_hold(false);
00202 }
00203
00204 __aqts.unlock();
00205 }
00206
00207
00208
00209
00210
00211 VisionMaster *
00212 FvBaseThread::vision_master()
00213 {
00214 return this;
00215 }
00216
00217
00218 Camera *
00219 FvBaseThread::register_for_camera(const char *camera_string, Thread *thread,
00220 colorspace_t cspace)
00221 {
00222 Camera *c = NULL;
00223 __aqts.lock();
00224
00225 logger->log_info(name(), "Thread '%s' registers for camera '%s'", thread->name(), camera_string);
00226
00227 VisionAspect *vision_thread = dynamic_cast<VisionAspect *>(thread);
00228 if ( vision_thread == NULL ) {
00229 throw TypeMismatchException("Thread is not a vision thread");
00230 }
00231
00232 CameraArgumentParser *cap = new CameraArgumentParser(camera_string);
00233 try {
00234 std::string id = cap->cam_type() + "." + cap->cam_id();
00235 if ( __aqts.find(id) != __aqts.end() ) {
00236
00237 c = __aqts[id]->camera_instance(cspace,
00238 (vision_thread->vision_thread_mode() ==
00239 VisionAspect::CONTINUOUS));
00240
00241 __aqts[id]->vision_threads->add_waiting_thread(thread);
00242
00243 } else {
00244 Camera *cam = NULL;
00245 try {
00246 cam = CameraFactory::instance(cap);
00247 cam->open();
00248 cam->start();
00249 } catch (Exception &e) {
00250 delete cam;
00251 e.append("Could not open or start camera");
00252 throw;
00253 }
00254
00255 FvAcquisitionThread *aqt = new FvAcquisitionThread(id.c_str(), cam, logger, clock);
00256
00257 c = aqt->camera_instance(cspace, (vision_thread->vision_thread_mode() ==
00258 VisionAspect::CONTINUOUS));
00259
00260 aqt->vision_threads->add_waiting_thread(thread);
00261
00262 __aqts[id] = aqt;
00263 thread_collector->add(aqt);
00264
00265
00266
00267 logger->log_info(name(), "Acquisition thread '%s' started for thread '%s' and camera '%s'",
00268 aqt->name(), thread->name(), id.c_str());
00269
00270 }
00271
00272 thread->add_notification_listener(this);
00273
00274 } catch (UnknownCameraTypeException &e) {
00275 delete cap;
00276 e.append("FvBaseVisionMaster: could not instantiate camera");
00277 __aqts.unlock();
00278 throw;
00279 } catch (Exception &e) {
00280 delete cap;
00281 e.append("FvBaseVisionMaster: could not open or start camera");
00282 __aqts.unlock();
00283 throw;
00284 }
00285
00286 delete cap;
00287
00288 __aqts.unlock();
00289 return c;
00290 }
00291
00292
00293 Camera *
00294 FvBaseThread::register_for_raw_camera(const char *camera_string, Thread *thread)
00295 {
00296 Camera *camera = register_for_camera(camera_string, thread, CS_UNKNOWN);
00297 CameraArgumentParser cap(camera_string);
00298 try {
00299 std::string id = cap.cam_type() + "." + cap.cam_id();
00300 __aqts.lock();
00301 if ( __aqts.find(id) != __aqts.end() ) {
00302 __aqts[id]->raw_subscriber_thread = thread;
00303 }
00304 __aqts.unlock();
00305 } catch (Exception &e) {
00306 __aqts.unlock();
00307 throw;
00308 }
00309 return camera;
00310 }
00311
00312 CameraControl *
00313 FvBaseThread::create_camctrl(const char *camera_string)
00314 {
00315 CameraControl *cc = CameraControlFactory::instance(camera_string);
00316 if (cc) {
00317 __owned_controls.lock();
00318 __owned_controls.push_back(cc);
00319 __owned_controls.sort();
00320 __owned_controls.unique();
00321 __owned_controls.unlock();
00322 return cc;
00323 } else {
00324 throw Exception("Cannot create camera control of desired type");
00325 }
00326 }
00327
00328 CameraControl *
00329 FvBaseThread::acquire_camctrl(const char *cam_string)
00330 {
00331 CameraArgumentParser cap(cam_string);
00332 std::string id = cap.cam_type() + "." + cap.cam_id();
00333
00334
00335 MutexLocker lock(__aqts.mutex());
00336 if (__aqts.find(id) != __aqts.end()) {
00337 return CameraControlFactory::instance(__aqts[id]->get_camera());
00338 } else {
00339 return create_camctrl(cam_string);
00340 }
00341 }
00342
00343
00344 CameraControl *
00345 FvBaseThread::acquire_camctrl(const char *cam_string,
00346 const std::type_info &typeinf)
00347 {
00348 CameraArgumentParser cap(cam_string);
00349 std::string id = cap.cam_type() + "." + cap.cam_id();
00350
00351
00352 MutexLocker lock(__aqts.mutex());
00353 if (__aqts.find(id) != __aqts.end()) {
00354 return CameraControlFactory::instance(typeinf, __aqts[id]->get_camera());
00355 } else {
00356 return create_camctrl(cam_string);
00357 }
00358 }
00359
00360
00361 void
00362 FvBaseThread::release_camctrl(CameraControl *cc)
00363 {
00364 __owned_controls.lock();
00365 LockList<CameraControl *>::iterator f;
00366 if ((f = std::find(__owned_controls.begin(), __owned_controls.end(), cc)) != __owned_controls.end()) {
00367 delete *f;
00368 __owned_controls.erase(f);
00369 }
00370 __owned_controls.unlock();
00371 }
00372
00373
00374
00375
00376
00377
00378 void
00379 FvBaseThread::cond_recreate_barrier(unsigned int num_cyclic_threads)
00380 {
00381 if ( (num_cyclic_threads + 1) != __aqt_barrier->count() ) {
00382 delete __aqt_barrier;
00383 __aqt_barrier = new Barrier( num_cyclic_threads + 1 );
00384 }
00385 }
00386
00387
00388 void
00389 FvBaseThread::unregister_thread(Thread *thread)
00390 {
00391 __aqts.lock();
00392 unsigned int num_cyclic_threads = 0;
00393
00394 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00395
00396
00397 __ait->second->vision_threads->remove_thread(thread);
00398
00399 if (__ait->second->raw_subscriber_thread == thread) {
00400 __ait->second->raw_subscriber_thread = NULL;
00401 }
00402
00403 if ( __ait->second->vision_threads->has_cyclic_thread() ) {
00404 ++num_cyclic_threads;
00405
00406 } else if (__ait->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) {
00407 logger->log_info(name(), "Switching acquisition thread %s to continuous mode "
00408 "on unregister", __ait->second->name());
00409
00410 __ait->second->prepare_finalize();
00411 __ait->second->cancel();
00412 __ait->second->join();
00413 __ait->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
00414 __ait->second->start();
00415 __ait->second->cancel_finalize();
00416 }
00417 }
00418
00419 cond_recreate_barrier(num_cyclic_threads);
00420
00421 __aqts.unlock();
00422 }
00423
00424
00425 bool
00426 FvBaseThread::thread_started(Thread *thread) throw()
00427 {
00428 __aqts.lock();
00429 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00430 if (__ait->second->vision_threads->has_waiting_thread(thread)) {
00431 __started_threads.lock();
00432 __started_threads[thread] = __ait->second;
00433 __started_threads.unlock();
00434 }
00435 }
00436 __aqts.unlock();
00437
00438 return false;
00439 }
00440
00441
00442 bool
00443 FvBaseThread::thread_init_failed(Thread *thread) throw()
00444 {
00445 __aqts.lock();
00446 for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00447 __ait->second->vision_threads->remove_waiting_thread(thread);
00448 }
00449 __aqts.unlock();
00450
00451 return false;
00452 }