thread_manager.cpp

00001
00002 /***************************************************************************
00003  *  thread_manager.cpp - Thread manager
00004  *
00005  *  Created: Thu Nov  3 19:11:31 2006 (on train to Cologne)
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023
00024 #include <mainapp/thread_manager.h>
00025 #include <core/threading/thread.h>
00026 #include <core/threading/mutex_locker.h>
00027 #include <core/threading/wait_condition.h>
00028 #include <core/threading/thread_initializer.h>
00029 #include <core/threading/thread_finalizer.h>
00030 #include <core/exceptions/software.h>
00031 #include <core/exceptions/system.h>
00032
00033 #include <aspect/blocked_timing.h>
00034
00035 using namespace fawkes;
00036 
00037 /** @class FawkesThreadManager mainapp/thread_manager.h
00038  * Thread Manager.
00039  * This class provides a manager for the threads. Threads are memorized by
00040  * their wakeup hook. When the thread manager is deleted, all threads are
00041  * appropriately cancelled, joined and deleted. Thus the thread manager
00042  * can be used for "garbage collection" of threads.
00043  *
00044  * The thread manager allows easy wakeup of threads of a given wakeup hook.
00045  *
00046  * The thread manager needs a thread initializer. Each thread that is added
00047  * to the thread manager is initialized with this. The runtime type information
00048  * (RTTI) supplied by C++ can be used to initialize threads if appropriate
00049  * (if the thread has certain aspects that need special treatment).
00050  *
00051  * @author Tim Niemueller
00052  */
00053
00054 FawkesThreadManager::FawkesThreadManagerAspectCollector::FawkesThreadManagerAspectCollector(FawkesThreadManager *parent_manager)
00055 {
00056   __parent_manager = parent_manager;
00057 }
00058
00059
00060 void
00061 FawkesThreadManager::FawkesThreadManagerAspectCollector::add(ThreadList &tl)
00062 {
00063   BlockedTimingAspect *timed_thread;
00064
00065   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00066     if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
00067       throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
00068     }
00069   }
00070
00071   __parent_manager->add_maybelocked(tl, /* lock */ false);
00072 }
00073
00074
00075 void
00076 FawkesThreadManager::FawkesThreadManagerAspectCollector::add(Thread *t)
00077 {
00078   BlockedTimingAspect *timed_thread;
00079
00080   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00081     throw IllegalArgumentException("ThreadProducerAspect may not add threads with BlockedTimingAspect");
00082   }
00083
00084   __parent_manager->add_maybelocked(t, /* lock */ false);
00085 }
00086
00087
00088 void
00089 FawkesThreadManager::FawkesThreadManagerAspectCollector::remove(ThreadList &tl)
00090 {
00091   BlockedTimingAspect *timed_thread;
00092
00093   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00094     if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(*i)) != NULL ) {
00095       throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
00096     }
00097   }
00098
00099   __parent_manager->remove_maybelocked(tl, /* lock */ false);
00100 }
00101
00102
00103 void
00104 FawkesThreadManager::FawkesThreadManagerAspectCollector::remove(Thread *t)
00105 {
00106   BlockedTimingAspect *timed_thread;
00107
00108   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00109     throw IllegalArgumentException("ThreadProducerAspect may not remove threads with BlockedTimingAspect");
00110   }
00111
00112   __parent_manager->remove_maybelocked(t, /* lock */ false);
00113 }
00114
00115
00116 void
00117 FawkesThreadManager::FawkesThreadManagerAspectCollector::force_remove(fawkes::ThreadList &tl)
00118 {
00119   throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
00120 }
00121
00122 void
00123 FawkesThreadManager::FawkesThreadManagerAspectCollector::force_remove(fawkes::Thread *t)
00124 {
00125   throw AccessViolationException("ThreadManagerAspect threads may not force removal of threads");
00126 }
00127
00128 
00129 /** Constructor.
00130  */
00131 FawkesThreadManager::FawkesThreadManager()
00132 {
00133   initializer = NULL;
00134   finalizer   = NULL;
00135   threads.clear();
00136   waitcond_timedthreads = new WaitCondition();
00137   __interrupt_timed_thread_wait = false;
00138   __aspect_collector = new FawkesThreadManagerAspectCollector(this);
00139 }
00140
00141 
00142 /** Destructor. */
00143 FawkesThreadManager::~FawkesThreadManager()
00144 {
00145   // stop all threads, we call finalize, and we run through it as long as there are
00146   // still running threads, after that, we force the thread's death.
00147   for (tit = threads.begin(); tit != threads.end(); ++tit) {
00148     (*tit).second.force_stop(finalizer);
00149   }
00150   untimed_threads.force_stop(finalizer);
00151   threads.clear();
00152
00153   delete waitcond_timedthreads;
00154   delete __aspect_collector;
00155 }
00156
00157 
00158 /** Set initializer/finalizer.
00159  * This method has to be called before any thread is added/removed.
00160  * @param initializer thread initializer
00161  * @param finalizer thread finalizer
00162  */
00163 void
00164 FawkesThreadManager::set_inifin(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
00165 {
00166   this->initializer = initializer;
00167   this->finalizer   = finalizer;
00168 }
00169
00170 
00171 /** Remove the given thread from internal structures.
00172  * Thread is removed from the internal structures. If the thread has the
00173  * BlockedTimingAspect then the hook is added to the changed list.
00174  *
00175  * @param t thread to remove
00176  * @param changed list of changed hooks, appropriate hook is added if necessary
00177  */
00178 void
00179 FawkesThreadManager::internal_remove_thread(Thread *t)
00180 {
00181   BlockedTimingAspect *timed_thread;
00182
00183   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00184     // find thread and remove
00185     BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook();
00186     if ( threads.find(hook) != threads.end() ) {
00187       threads[hook].remove_locked(t);
00188     }
00189   } else {
00190     untimed_threads.remove_locked(t);
00191   }
00192 }
00193
00194 
00195 /** Add the given thread to internal structures.
00196  * Thread is added to the internal structures. If the thread has the
00197  * BlockedTimingAspect then the hook is added to the changed list.
00198  *
00199  * @param t thread to add
00200  * @param changed list of changed hooks, appropriate hook is added if necessary
00201  */
00202 void
00203 FawkesThreadManager::internal_add_thread(Thread *t)
00204 {
00205   BlockedTimingAspect *timed_thread;
00206   if ( (timed_thread = dynamic_cast<BlockedTimingAspect *>(t)) != NULL ) {
00207     BlockedTimingAspect::WakeupHook hook = timed_thread->blockedTimingAspectHook();
00208
00209     if ( threads.find(hook) == threads.end() ) {
00210       threads[hook].set_name("FawkesThreadManagerList Hook %i", hook);
00211       threads[hook].set_maintain_barrier(true);
00212     }
00213     threads[hook].push_back_locked(t);
00214
00215     waitcond_timedthreads->wake_all();
00216   } else {
00217     untimed_threads.push_back_locked(t);
00218   }
00219 }
00220
00221 
00222 /** Add threads.
00223  * Add the given threads to the thread manager. The threads are initialised
00224  * as appropriate and started. See the class documentation for supported
00225  * specialisations of threads and the performed initialisation steps.
00226  * If the thread initializer cannot initalize one or more threads no thread
00227  * is added. In this regard the operation is atomic, either all threads are
00228  * added or none.
00229  * @param tl thread list with threads to add
00230  * @exception CannotInitializeThreadException thrown if at least one of the
00231  * threads could not be initialised
00232  */
00233 void
00234 FawkesThreadManager::add_maybelocked(ThreadList &tl, bool lock)
00235 {
00236   if ( ! (initializer && finalizer) ) {
00237     throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00238   }
00239
00240   if ( tl.sealed() ) {
00241     throw Exception("Not accepting new threads from list that is not fresh, "
00242                     "list '%s' already sealed", tl.name());
00243   }
00244
00245   tl.lock();
00246
00247   // Try to initialise all threads
00248   try {
00249     tl.init(initializer, finalizer);
00250   } catch (Exception &e) {
00251     tl.unlock();
00252     throw;
00253   }
00254
00255   tl.seal();
00256   tl.start();
00257
00258   // All thread initialized, now add threads to internal structure
00259   MutexLocker locker(threads.mutex(), lock);
00260   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00261     internal_add_thread(*i);
00262   }
00263
00264   tl.unlock();
00265 }
00266
00267 
00268 /** Add one thread.
00269  * Add the given thread to the thread manager. The thread is initialized
00270  * as appropriate and started. See the class documentation for supported
00271  * specialisations of threads and the performed initialisation steps.
00272  * If the thread initializer cannot initalize the thread it is not added.
00273  * @param thread thread to add
00274  * @param lock if true the environment is locked before adding the thread
00275  * @exception CannotInitializeThreadException thrown if at least the
00276  * thread could not be initialised
00277  */
00278 void
00279 FawkesThreadManager::add_maybelocked(Thread *thread, bool lock)
00280 {
00281   if ( thread == NULL ) {
00282     throw NullPointerException("FawkesThreadMananger: cannot add NULL as thread");
00283   }
00284
00285   if ( ! (initializer && finalizer) ) {
00286     throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00287   }
00288
00289   try {
00290     initializer->init(thread);
00291   } catch (CannotInitializeThreadException &e) {
00292     e.append("Adding thread in FawkesThreadManager failed");
00293     throw;
00294   }
00295
00296   thread->start();
00297   MutexLocker locker(threads.mutex(), lock);
00298   internal_add_thread(thread);
00299 }
00300
00301 
00302 /** Remove the given threads.
00303  * The thread manager tries to finalize and stop the threads and then removes the
00304  * threads from the internal structures.
00305  *
00306  * This may fail if at least one thread of the given list cannot be finalized, for
00307  * example if prepare_finalize() returns false or if the thread finalizer cannot
00308  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
00309  *
00310  * @param tl threads to remove.
00311  * @exception CannotFinalizeThreadException At least one thread cannot be safely
00312  * finalized
00313  * @exception ThreadListNotSealedException if the given thread lits tl is not
00314  * sealed the thread manager will refuse to remove it
00315  */
00316 void
00317 FawkesThreadManager::remove_maybelocked(ThreadList &tl, bool lock)
00318 {
00319   if ( ! (initializer && finalizer) ) {
00320     throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00321   }
00322
00323
00324   if ( ! tl.sealed() ) {
00325     throw ThreadListNotSealedException("(FawkesThreadManager) Cannot remove unsealed thread "
00326                                        "list. Not accepting unsealed list '%s' for removal",
00327                                        tl.name());
00328   }
00329
00330   tl.lock();
00331   MutexLocker locker(threads.mutex(), lock);
00332
00333   try {
00334     if ( ! tl.prepare_finalize(finalizer) ) {
00335       tl.cancel_finalize();
00336       tl.unlock();
00337       throw CannotFinalizeThreadException("One or more threads in list '%s' cannot be "
00338                                           "finalized", tl.name());
00339     }
00340   } catch (CannotFinalizeThreadException &e) {
00341     tl.unlock();
00342     throw;
00343   } catch (Exception &e) {
00344     tl.unlock();
00345     e.append("One or more threads in list '%s' cannot be finalized", tl.name());
00346     throw CannotFinalizeThreadException(e);
00347   }
00348
00349   tl.stop();
00350   tl.finalize(finalizer);
00351
00352   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00353     internal_remove_thread(*i);
00354   }
00355
00356   tl.unlock();
00357 }
00358
00359 
00360 /** Remove the given thread.
00361  * The thread manager tries to finalize and stop the thread and then removes the
00362  * thread from the internal structures.
00363  *
00364  * This may fail if the thread cannot be finalized, for
00365  * example if prepare_finalize() returns false or if the thread finalizer cannot
00366  * finalize the thread. In this case a CannotFinalizeThreadException is thrown.
00367  *
00368  * @param thread thread to remove.
00369  * @exception CannotFinalizeThreadException At least one thread cannot be safely
00370  * finalized
00371  */
00372 void
00373 FawkesThreadManager::remove_maybelocked(Thread *thread, bool lock)
00374 {
00375   if ( thread == NULL ) return;
00376
00377   if ( ! (initializer && finalizer) ) {
00378     throw NullPointerException("FawkesThreadManager: initializer/finalizer not set");
00379   }
00380
00381   MutexLocker locker(threads.mutex(), lock);
00382   try {
00383     if ( ! thread->prepare_finalize() ) {
00384       thread->cancel_finalize();
00385       throw CannotFinalizeThreadException("Thread '%s'cannot be finalized", thread->name());
00386     }
00387   } catch (CannotFinalizeThreadException &e) {
00388     e.append("FawkesThreadManager cannot stop thread '%s'", thread->name());
00389     thread->cancel_finalize();
00390     throw;
00391   }
00392
00393   thread->cancel();
00394   thread->join();
00395   finalizer->finalize(thread);
00396   thread->finalize();
00397
00398   internal_remove_thread(thread);
00399 }
00400
00401
00402
00403 
00404 /** Force removal of the given threads.
00405  * The thread manager tries to finalize and stop the threads and then removes the
00406  * threads from the internal structures.
00407  *
00408  * This will succeed even if a thread of the given list cannot be finalized, for
00409  * example if prepare_finalize() returns false or if the thread finalizer cannot
00410  * finalize the thread.
00411  *
00412  * <b>Caution, using this function may damage your robot.</b>
00413  *
00414  * @param tl threads to remove.
00415  * @exception ThreadListNotSealedException if the given thread lits tl is not
00416  * sealed the thread manager will refuse to remove it
00417  * The threads are removed from thread manager control. The threads will be stopped
00418  * before they are removed (may cause unpredictable results otherwise).
00419  */
00420 void
00421 FawkesThreadManager::force_remove(ThreadList &tl)
00422 {
00423   if ( ! tl.sealed() ) {
00424     throw ThreadListNotSealedException("Not accepting unsealed list '%s' for removal",
00425                                        tl.name());
00426   }
00427
00428   tl.lock();
00429   threads.mutex()->stopby();
00430   tl.force_stop(finalizer);
00431
00432   for (ThreadList::iterator i = tl.begin(); i != tl.end(); ++i) {
00433     internal_remove_thread(*i);
00434   }
00435
00436   tl.unlock();
00437 }
00438
00439 
00440 /** Force removal of the given thread.
00441  * The thread manager tries to finalize and stop the thread and then removes the
00442  * thread from the internal structures.
00443  *
00444  * This will succeed even if the thread cannot be finalized, for
00445  * example if prepare_finalize() returns false or if the thread finalizer cannot
00446  * finalize the thread.
00447  *
00448  * <b>Caution, using this function may damage your robot.</b>
00449  *
00450  * @param thread thread to remove.
00451  * @exception ThreadListNotSealedException if the given thread lits tl is not
00452  * sealed the thread manager will refuse to remove it
00453  * The threads are removed from thread manager control. The threads will be stopped
00454  * before they are removed (may cause unpredictable results otherwise).
00455  */
00456 void
00457 FawkesThreadManager::force_remove(fawkes::Thread *thread)
00458 {
00459   MutexLocker lock(threads.mutex());
00460   try {
00461     thread->prepare_finalize();
00462   } catch (Exception &e) {
00463     // ignore
00464   }
00465
00466   thread->cancel();
00467   thread->join();
00468   if (finalizer) finalizer->finalize(thread);
00469   thread->finalize();
00470
00471   internal_remove_thread(thread);
00472 }
00473
00474
00475 void
00476 FawkesThreadManager::wakeup_and_wait(BlockedTimingAspect::WakeupHook hook,
00477                                      unsigned int timeout_usec)
00478 {
00479   MutexLocker lock(threads.mutex());
00480
00481   unsigned int timeout_sec = 0;
00482   if (timeout_usec >= 1000000) {
00483     timeout_sec   = timeout_usec / 1000000;
00484     timeout_usec -= timeout_sec  * 1000000;
00485   }
00486
00487   // Note that the following lines might throw an exception, we just pass it on
00488   if ( threads.find(hook) != threads.end() ) {
00489     threads[hook].wakeup_and_wait(timeout_sec, timeout_usec * 1000);
00490   }
00491 }
00492
00493
00494 void
00495 FawkesThreadManager::wakeup(BlockedTimingAspect::WakeupHook hook, Barrier *barrier)
00496 {
00497   MutexLocker lock(threads.mutex());
00498
00499   if ( threads.find(hook) != threads.end() ) {
00500     if ( barrier ) {
00501       threads[hook].wakeup(barrier);
00502     } else {
00503       threads[hook].wakeup();
00504     }
00505     if ( threads[hook].size() == 0 ) {
00506       threads.erase(hook);
00507     }
00508   }
00509 }
00510
00511
00512 void
00513 FawkesThreadManager::try_recover(std::list<std::string> &recovered_threads)
00514 {
00515   threads.lock();
00516   for (tit = threads.begin(); tit != threads.end(); ++tit) {
00517     tit->second.try_recover(recovered_threads);
00518   }
00519   threads.unlock();
00520 }
00521
00522
00523 bool
00524 FawkesThreadManager::timed_threads_exist()
00525 {
00526   return (threads.size() > 0);
00527 }
00528
00529
00530 void
00531 FawkesThreadManager::wait_for_timed_threads()
00532 {
00533   __interrupt_timed_thread_wait = false;
00534   waitcond_timedthreads->wait();
00535   if ( __interrupt_timed_thread_wait ) {
00536     __interrupt_timed_thread_wait = false;
00537     throw InterruptedException("Waiting for timed threads was interrupted");
00538   }
00539 }
00540
00541 void
00542 FawkesThreadManager::interrupt_timed_thread_wait()
00543 {
00544   __interrupt_timed_thread_wait = true;
00545   waitcond_timedthreads->wake_all();
00546 }
00547
00548
00549 
00550 /** Get a thread collector to be used for an aspect initializer.
00551  * @return thread collector instance to use for ThreadProducerAspect.
00552  */
00553 ThreadCollector *
00554 FawkesThreadManager::aspect_collector() const
00555 {
00556   return __aspect_collector;
00557 }