thread_list.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <core/threading/thread_list.h>
00025 #include <core/threading/thread.h>
00026 #include <core/threading/mutex.h>
00027 #include <core/threading/barrier.h>
00028 #include <core/threading/interruptible_barrier.h>
00029 #include <core/exceptions/software.h>
00030 #include <core/exceptions/system.h>
00031
00032 #include <string>
00033 #include <cstring>
00034 #include <cstdlib>
00035 #include <cstdio>
00036 #include <unistd.h>
00037
00038 namespace fawkes {
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052 ThreadListSealedException::ThreadListSealedException(const char *operation)
00053 : Exception("ThreadList is sealed")
00054 {
00055 append("Operation '%s' is not allowed on a sealed thread list", operation);
00056 }
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071 ThreadListNotSealedException::ThreadListNotSealedException(const char *format, ...)
00072 : Exception()
00073 {
00074 va_list va;
00075 va_start(va, format);
00076 append_va(format, va);
00077 va_end(va);
00078 }
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093 ThreadList::ThreadList(const char *tlname)
00094 {
00095 __name = strdup(tlname);
00096 __sealed = false;
00097 __finalize_mutex = new Mutex();
00098 __wnw_barrier = NULL;
00099 clear();
00100 }
00101
00102
00103
00104
00105
00106
00107
00108
00109 ThreadList::ThreadList(bool maintain_barrier, const char *tlname)
00110 {
00111 __name = strdup(tlname);
00112 __sealed = false;
00113 __finalize_mutex = new Mutex();
00114 __wnw_barrier = NULL;
00115 clear();
00116 if ( maintain_barrier) update_barrier();
00117 }
00118
00119
00120
00121
00122
00123 ThreadList::ThreadList(const ThreadList &tl)
00124 : LockList<Thread *>(tl)
00125 {
00126 __name = strdup(tl.__name);
00127 __sealed = tl.__sealed;
00128 __finalize_mutex = new Mutex();
00129 __wnw_barrier = NULL;
00130 if ( tl.__wnw_barrier != NULL ) update_barrier();
00131 }
00132
00133
00134
00135 ThreadList::~ThreadList()
00136 {
00137 free(__name);
00138 delete __finalize_mutex;
00139 delete __wnw_barrier;
00140 }
00141
00142
00143
00144 void
00145 ThreadList::wakeup()
00146 {
00147 lock();
00148 for (iterator i = begin(); i != end(); ++i) {
00149 (*i)->wakeup();
00150 }
00151 unlock();
00152 }
00153
00154
00155
00156
00157
00158
00159 void
00160 ThreadList::wakeup_unlocked()
00161 {
00162 for (iterator i = begin(); i != end(); ++i) {
00163 (*i)->wakeup();
00164 }
00165 }
00166
00167
00168
00169
00170
00171 void
00172 ThreadList::wakeup(Barrier *barrier)
00173 {
00174 lock();
00175 for (iterator i = begin(); i != end(); ++i) {
00176 (*i)->wakeup(barrier);
00177 }
00178 unlock();
00179 }
00180
00181
00182
00183
00184
00185
00186
00187 void
00188 ThreadList::wakeup_unlocked(Barrier *barrier)
00189 {
00190 unsigned int count = 1;
00191 for (iterator i = begin(); i != end(); ++i) {
00192 if ( ! (*i)->flagged_bad() ) {
00193 (*i)->wakeup(barrier);
00194 ++count;
00195 }
00196 }
00197 if (count != barrier->count()) {
00198 throw Exception("ThreadList(%s)::wakeup(): barrier has count (%u) different "
00199 "from number of unflagged threads (%u)", __name, barrier->count(), count);
00200 }
00201 }
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213 void
00214 ThreadList::wakeup_and_wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
00215 {
00216 if ( ! __wnw_barrier ) {
00217 throw NullPointerException("ThreadList::wakeup_and_wait() can only be called if "
00218 "barrier is maintained");
00219 }
00220 lock();
00221 try {
00222 wakeup_unlocked(__wnw_barrier);
00223 } catch (Exception &e) {
00224 unlock();
00225 throw;
00226 }
00227 if ( ! __wnw_barrier->wait(timeout_sec, timeout_nanosec) ) {
00228
00229 RefPtr<ThreadList> passed_threads = __wnw_barrier->passed_threads();
00230 ThreadList bad_threads;
00231 for (iterator i = begin(); i != end(); ++i) {
00232 bool ok = false;
00233 for (iterator j = passed_threads->begin(); j != passed_threads->end(); ++j) {
00234 if (*j == *i) {
00235 ok = true;
00236 break;
00237 }
00238 }
00239 if (! ok) {
00240 bad_threads.push_back(*i);
00241 (*i)->set_flag(Thread::FLAG_BAD);
00242 }
00243 }
00244
00245 __wnw_bad_barriers.push_back(make_pair(__wnw_barrier, bad_threads));
00246
00247 __wnw_barrier = NULL;
00248 update_barrier();
00249
00250
00251 std::string s;
00252 if ( bad_threads.size() > 1 ) {
00253 s = "Multiple threads did not finish in time, flagging as bad: ";
00254 for (iterator i = bad_threads.begin(); i != bad_threads.end(); ++i) {
00255 s += std::string((*i)->name()) + " ";
00256 }
00257 } else if (bad_threads.size() == 0) {
00258 s = "Timeout happened, but no bad threads recorded.";
00259 } else {
00260 throw Exception("Thread %s did not finish in time (max %f), flagging as bad",
00261 bad_threads.front()->name(),
00262 (float)timeout_sec + (float)timeout_nanosec / 1000000000.);
00263 }
00264 unlock();
00265 throw Exception("%s", s.c_str());
00266 }
00267 unlock();
00268 }
00269
00270
00271
00272
00273
00274
00275 void
00276 ThreadList::set_maintain_barrier(bool maintain_barrier)
00277 {
00278 lock();
00279 delete __wnw_barrier;
00280 __wnw_barrier = NULL;
00281 if ( maintain_barrier ) update_barrier();
00282 unlock();
00283 }
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293 void
00294 ThreadList::try_recover(std::list<std::string> &recovered_threads)
00295 {
00296 lock();
00297 bool changed = false;
00298 __wnw_bbit = __wnw_bad_barriers.begin();
00299 while (__wnw_bbit != __wnw_bad_barriers.end()) {
00300 iterator i = __wnw_bbit->second.begin();
00301 while (i != __wnw_bbit->second.end()) {
00302 if ( (*i)->waiting() ) {
00303
00304 recovered_threads.push_back((*i)->name());
00305
00306 (*i)->unset_flag(Thread::FLAG_BAD);
00307 i = __wnw_bbit->second.erase(i);
00308 changed = true;
00309 } else {
00310 ++i;
00311 }
00312 }
00313 if ( __wnw_bbit->second.empty() ) {
00314 delete __wnw_bbit->first;
00315 __wnw_bbit = __wnw_bad_barriers.erase(__wnw_bbit);
00316 } else {
00317 ++__wnw_bbit;
00318 }
00319 }
00320 if ( changed ) update_barrier();
00321 unlock();
00322 }
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335 void
00336 ThreadList::init(ThreadInitializer *initializer, ThreadFinalizer *finalizer)
00337 {
00338 CannotInitializeThreadException cite;
00339 ThreadList initialized_threads;
00340 bool success = true;
00341 for (ThreadList::iterator i = begin(); i != end(); ++i) {
00342 try {
00343 initializer->init(*i);
00344 (*i)->init();
00345 initialized_threads.push_back(*i);
00346 } catch (CannotInitializeThreadException &e) {
00347 notify_of_failed_init();
00348 cite.append("Initializing thread '%s' in list '%s' failed", (*i)->name(), __name);
00349 cite.append(e);
00350 success = false;
00351 break;
00352 } catch (Exception &e) {
00353 notify_of_failed_init();
00354 cite.append("Could not initialize thread '%s'", (*i)->name());
00355 cite.append(e);
00356 success = false;
00357 break;
00358 } catch (...) {
00359 notify_of_failed_init();
00360 cite.append("Could not initialize thread '%s'", (*i)->name());
00361 cite.append("Unknown exception caught");
00362 success = false;
00363 break;
00364 }
00365 }
00366
00367 if ( ! success ) {
00368 initialized_threads.finalize(finalizer);
00369 throw cite;
00370 }
00371 }
00372
00373
00374
00375
00376
00377
00378
00379
00380 void
00381 ThreadList::start()
00382 {
00383 for (iterator i = begin(); i != end(); ++i) {
00384 (*i)->start();
00385 }
00386 }
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406 void
00407 ThreadList::cancel()
00408 {
00409 for (iterator i = begin(); i != end(); ++i) {
00410 (*i)->cancel();
00411 }
00412 }
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432 void
00433 ThreadList::join()
00434 {
00435 for (iterator i = begin(); i != end(); ++i) {
00436 (*i)->join();
00437 }
00438 }
00439
00440
00441
00442
00443
00444
00445
00446
00447 void
00448 ThreadList::stop()
00449 {
00450 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
00451 (*i)->cancel();
00452 (*i)->join();
00453
00454 usleep(5000);
00455 }
00456 }
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469 bool
00470 ThreadList::prepare_finalize(ThreadFinalizer *finalizer)
00471 {
00472 __finalize_mutex->lock();
00473 bool can_finalize = true;
00474 CannotFinalizeThreadException cfte("Cannot finalize one or more threads");
00475 bool threw_exception = false;
00476 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
00477
00478
00479
00480 try {
00481 if ( ! finalizer->prepare_finalize(*i) ) {
00482 can_finalize = false;
00483 }
00484 if ( ! (*i)->prepare_finalize() ) {
00485 can_finalize = false;
00486 }
00487 } catch (CannotFinalizeThreadException &e) {
00488 cfte.append("Thread '%s' throw an exception while preparing finalization of "
00489 "ThreadList '%s'", (*i)->name(), __name);
00490 threw_exception = true;
00491 }
00492 }
00493 __finalize_mutex->unlock();
00494 if ( threw_exception ) {
00495 throw cfte;
00496 }
00497 return can_finalize;
00498 }
00499
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509
00510 void
00511 ThreadList::finalize(ThreadFinalizer *finalizer)
00512 {
00513 bool error = false;
00514 Exception me("One or more threads failed to finalize");
00515 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
00516 try {
00517 finalizer->finalize(*i);
00518 } catch (CannotFinalizeThreadException &e) {
00519 error = true;
00520 me.append("Could not finalize thread '%s' in list '%s'", (*i)->name(), __name);
00521 me.append(e);
00522 }
00523 try {
00524 (*i)->finalize();
00525 } catch (CannotFinalizeThreadException &e) {
00526 error = true;
00527 me.append("AspectIniFin called Thread[%s]::finalize() which failed", (*i)->name());
00528 me.append(e);
00529 } catch (Exception &e) {
00530 me.append("AspectIniFin called Thread[%s]::finalize() which failed", (*i)->name());
00531 me.append(e);
00532 } catch (...) {
00533 me.append("Thread[%s]::finalize() threw unsupported exception", (*i)->name());
00534 }
00535 }
00536 if ( error ) {
00537 throw me;
00538 }
00539 }
00540
00541
00542
00543
00544 void
00545 ThreadList::cancel_finalize()
00546 {
00547 __finalize_mutex->lock();
00548 for (reverse_iterator i = rbegin(); i != rend(); ++i) {
00549 (*i)->cancel_finalize();
00550 }
00551 __finalize_mutex->unlock();
00552 }
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562 void
00563 ThreadList::set_prepfin_hold(bool hold)
00564 {
00565 iterator i;
00566 try {
00567 for (i = begin(); i != end(); ++i) {
00568 (*i)->set_prepfin_hold(hold);
00569 }
00570 } catch (Exception &e) {
00571
00572
00573 for (iterator j = begin(); j != i; ++j) {
00574 (*j)->set_prepfin_hold(false);
00575 }
00576 throw;
00577 }
00578 }
00579
00580
00581
00582
00583
00584
00585
00586 void
00587 ThreadList::force_stop(ThreadFinalizer *finalizer)
00588 {
00589 try {
00590 prepare_finalize(finalizer);
00591 stop();
00592 finalize(finalizer);
00593 } catch (Exception &e) {
00594
00595 }
00596 }
00597
00598
00599
00600
00601
00602
00603
00604 const char *
00605 ThreadList::name()
00606 {
00607 return __name;
00608 }
00609
00610
00611
00612
00613
00614
00615 void
00616 ThreadList::set_name(const char *format, ...)
00617 {
00618 va_list va;
00619 va_start(va, format);
00620
00621 char *tmpname;
00622 if (vasprintf(&tmpname, format, va) != -1) {
00623 free(__name);
00624 __name = tmpname;
00625 } else {
00626 throw OutOfMemoryException("ThreadList::set_name(): vasprintf() failed");
00627 }
00628 va_end(va);
00629 }
00630
00631
00632
00633
00634
00635
00636
00637 bool
00638 ThreadList::sealed()
00639 {
00640 return __sealed;
00641 }
00642
00643
00644
00645 void
00646 ThreadList::seal()
00647 {
00648 __sealed = true;
00649 }
00650
00651
00652
00653
00654
00655
00656 void
00657 ThreadList::push_front(Thread *thread)
00658 {
00659 if ( __sealed ) throw ThreadListSealedException("push_front");
00660
00661 LockList<Thread *>::push_front(thread);
00662 if ( __wnw_barrier) update_barrier();
00663 }
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674 void
00675 ThreadList::push_front_locked(Thread *thread)
00676 {
00677 if ( __sealed ) throw ThreadListSealedException("push_front_locked");
00678
00679 lock();
00680 LockList<Thread *>::push_front(thread);
00681 if ( __wnw_barrier) update_barrier();
00682 unlock();
00683 }
00684
00685
00686
00687
00688
00689
00690 void
00691 ThreadList::push_back(Thread *thread)
00692 {
00693 if ( __sealed ) throw ThreadListSealedException("push_back");
00694
00695 LockList<Thread *>::push_back(thread);
00696 if ( __wnw_barrier) update_barrier();
00697 }
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708 void
00709 ThreadList::push_back_locked(Thread *thread)
00710 {
00711 if ( __sealed ) throw ThreadListSealedException("push_back_locked");
00712
00713 lock();
00714 LockList<Thread *>::push_back(thread);
00715 if ( __wnw_barrier) update_barrier();
00716 unlock();
00717 }
00718
00719
00720
00721
00722
00723 void
00724 ThreadList::clear()
00725 {
00726 if ( __sealed ) throw ThreadListSealedException("clear");
00727
00728 LockList<Thread *>::clear();
00729 if ( __wnw_barrier) update_barrier();
00730 }
00731
00732
00733
00734
00735
00736 void
00737 ThreadList::remove(Thread *thread)
00738 {
00739 if ( __sealed ) throw ThreadListSealedException("remove_locked");
00740
00741 LockList<Thread *>::remove(thread);
00742 if ( __wnw_barrier) update_barrier();
00743 }
00744
00745
00746
00747
00748
00749 void
00750 ThreadList::remove_locked(Thread *thread)
00751 {
00752 if ( __sealed ) throw ThreadListSealedException("remove_locked");
00753
00754 lock();
00755 LockList<Thread *>::remove(thread);
00756 if ( __wnw_barrier) update_barrier();
00757 unlock();
00758 }
00759
00760
00761
00762 void
00763 ThreadList::pop_front()
00764 {
00765 if ( __sealed ) throw ThreadListSealedException("pop_front");
00766
00767 LockList<Thread *>::pop_front();
00768 if ( __wnw_barrier) update_barrier();
00769 }
00770
00771
00772
00773 void
00774 ThreadList::pop_back()
00775 {
00776 if ( __sealed ) throw ThreadListSealedException("pop_back");
00777
00778 LockList<Thread *>::pop_back();
00779 if ( __wnw_barrier) update_barrier();
00780 }
00781
00782
00783
00784
00785
00786
00787 ThreadList::iterator
00788 ThreadList::erase(iterator pos)
00789 {
00790 if ( __sealed ) throw ThreadListSealedException("erase");
00791
00792 ThreadList::iterator rv = LockList<Thread *>::erase(pos);
00793 if ( __wnw_barrier) update_barrier();
00794 return rv;
00795 }
00796
00797
00798
00799 void
00800 ThreadList::update_barrier()
00801 {
00802 unsigned int num = 1;
00803 for (iterator i = begin(); i != end(); ++i) {
00804 if (! (*i)->flagged_bad() ) ++num;
00805 }
00806 delete __wnw_barrier;
00807 __wnw_barrier = new InterruptibleBarrier(num);
00808 }
00809
00810
00811
00812 void
00813 ThreadList::notify_of_failed_init()
00814 {
00815 for (ThreadList::iterator i = begin(); i != end(); ++i) {
00816 (*i)->notify_of_failed_init();
00817 }
00818 }
00819
00820
00821 }