00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef queue_h
00023 #define queue_h
00024
00025 #ifndef _REENTRANT
00026 #define _REENTRANT
00027 #endif
00028
00029 #include "config.h"
00030 #include "base.h"
00031 #include "log.h"
00032 #include "util.h"
00033 #include <map>
00034 #include <list>
00035 #include <pthread.h>
00036 #include <setjmp.h>
00037 #include <signal.h>
00038 #include <time.h>
00039 #include <iostream>
00040
00041 using std::multimap;
00042 using std::list;
00043
00044 namespace karoo {
00045
00046 class pebble;
00047 class exepool;
00048
00049 extern void initQueues();
00050 extern void closeQueues();
00051
00052 typedef struct {
00053 double mean;
00054 double sd;
00055 double min;
00056 double max;
00057
00058 double mean_delay;
00059 double sd_delay;
00060 double min_delay;
00061 double max_delay;
00062
00063 int busy_percent;
00064
00065 int num_iterations;
00066
00067 int num_errors;
00068
00069 bool is_stopped;
00070
00071 double min_idle;
00072 double max_idle;
00073
00074 int queued;
00075 int waiting;
00076 int running;
00077 int timeouts;
00078 } qstats;
00079
00080 struct exeque_comp {
00081 bool operator() (const struct timespec& lhs, const struct timespec& rhs) const;
00082 };
00083
00084
00085
00092 class exeque : public referable
00093 {
00094 private:
00095 pthread_t thread;
00096 bool taken_over;
00097 mutable sem_t stopped;
00098 karoo_mutex stop_sem;
00099 karoo_mutex delete_sem;
00100 mutable karoo_mutex stats_sem;
00101
00102 bool refusing;
00103 bool stopping;
00104 long granularity;
00105 exepool* delete_after_stop;
00106 friend class exepool;
00107 void scheduleDelete(exepool* parent);
00108
00109 double* time_samples;
00110 double* idle_time_samples;
00111 double* delayed_time_samples;
00112 int time_sample_pos;
00113 mutable int time_sample_size;
00114 int time_sample_alloc_size;
00115 int timeouts;
00116 int iters;
00117 int errors;
00118 double idle_time;
00119 int run_count;
00120 int running;
00121 time_t stopped_time;
00122
00123 struct timespec run_time;
00124
00125 sigjmp_buf env;
00126
00127 protected:
00135 multimap<const struct timespec,pebble*,exeque_comp> inbox;
00139 mutable karoo_mutex inbox_sem;
00144 karoo_cond inbox_notifier;
00150
00151
00152 log logger;
00153
00154 public:
00155
00161 exeque(long granularity = 0, int time_sample_alloc_size = 100);
00166 virtual ~exeque();
00167
00172 void setLogger(const log& logger) { this->logger = logger; }
00173
00179 bool start(long granularity = 5, bool take_over = false);
00180
00185 void stop(bool block = false);
00186
00190 void join();
00191
00192
00199 bool isStopped(long wait_usec = 0) const;
00200
00202 void refuse();
00203
00204
00205
00210 void add(pebble* p);
00216 void add(const struct timespec& ts, pebble* p);
00224 void add(int after_seconds, pebble* p, long after_nano_seconds = 0);
00225
00229 void run();
00230
00241 void reset(long granularity = 0, int time_sample_alloc_size = 100);
00242
00289 const qstats& getStats(qstats& qs, bool reset = false) const;
00290
00295 int getBusyPercent(bool reset = false);
00300 int getRunCount(bool reset = false);
00305 int getErrorCount(bool reset = false);
00310 int getTimeoutCount(bool reset = false);
00314 int getWaitingCount() const;
00318 int getQueuedCount() const;
00323 int getImminentCount(unsigned usecs = 0) const;
00324
00330 bool hasBeenIdleSince(const struct timespec& comp) const;
00331
00335 pthread_t getThread() { return thread; }
00336
00341 sigjmp_buf& getSigLongJmpEnv() { return env; }
00342 };
00343
00344 class pebble: public referable
00345 {
00346 private:
00347 friend class exeque;
00348 karoo_mutex sem;
00349 bool dereference_scheduled;
00350 protected:
00351 exeque* q;
00352 text exception;
00353 bool error;
00354 public:
00355 pebble();
00356 virtual ~pebble();
00357 virtual void run() = 0;
00358 exeque* getQueue() { return q; }
00359 const text& getException() const { return exception; }
00360 bool getSuccess() const { return !error; }
00361 void dereferenceAfterRun() { dereference_scheduled = true; }
00362 };
00363
00364 class poolstats {
00365 private:
00366 int size;
00367 qstats* qs;
00368 friend class exepool;
00369 public:
00370 poolstats() { size = 0; qs = NULL; }
00371 poolstats(const poolstats& ref);
00372 virtual ~poolstats();
00373
00374 poolstats& operator=(const poolstats& ref);
00375 const qstats& operator[](int i) const { return qs[i]; }
00376 qstats& operator[](int i) { return qs[i]; }
00377 int getSize() const { return size; }
00378 };
00379
00380
00388 class exepool : public referable
00389 {
00390 private:
00391 mutable karoo_mutex sem;
00392 friend class poolpebble;
00393 void schedule(pebble* p);
00394 protected:
00395 long granularity;
00396 int time_sample_alloc_size;
00397 int alloc_size;
00398 int size;
00399 exeque* feed;
00400 exeque** qs;
00401 log logger;
00402 karoo_mutex auto_sem;
00403 list<exeque*> auto_scheduled;
00404 list<exeque*> auto_stopped;
00405 public:
00406 exepool(int alloc_size, long granularity = 5, int time_sample_alloc_size = 100);
00411 virtual ~exepool();
00412
00416 void cleanupLater(exeque* q);
00417
00422 void cleanupAutoDeletes(bool force);
00423
00428 void setLogger(const log& logger);
00429
00430 void stop(bool block = false);
00431 bool isStopped(long wait_usec = 0) const;
00432
00437 int getImminentCount(unsigned usecs = 0) const;
00438
00439 void add(pebble* p);
00440 void add(const struct timespec& ts, pebble* p);
00441 void add(int after_seconds, pebble* p);
00442
00443 const poolstats& getStats(poolstats& ps, bool reset = false) const;
00444 int getSize() const;
00445 };
00446
00447 class poolpebble: public pebble
00448 {
00449 private:
00450 protected:
00451 exepool* pool;
00452 pebble* p;
00453 public:
00454 poolpebble(exepool* pool, pebble* p);
00455 virtual ~poolpebble();
00456 void run();
00457 };
00458
00485 template <typename T>
00486 class synchronised
00487 {
00488 private:
00489 T val;
00490 mutable karoo_mutex sem;
00491 public:
00495 synchronised<T>() { MUTEX_INIT(sem); }
00496 synchronised<T>(const T& val) { MUTEX_INIT(sem); this->val = val; }
00497 synchronised<T>(const synchronised<T>& ref) { MUTEX_INIT(sem); this->val = (T)ref; }
00498 virtual ~synchronised<T>() { MUTEX_DESTROY(sem); }
00499
00509 T operator=(const T& val) {
00510 MUTEX_LOCK(sem);
00511 try {
00512 this->val = val;
00513 MUTEX_UNLOCK(sem);
00514 return val;
00515 }
00516 catch (...) {
00517 try { MUTEX_UNLOCK(sem); } catch(...) {}
00518 throw;
00519 }
00520 }
00521
00532 T operator=(const synchronised<T>& ref) {
00533 T v = (T)ref;
00534 MUTEX_LOCK(sem);
00535 try {
00536 val = v;
00537 MUTEX_UNLOCK(sem);
00538 return v;
00539 } catch (...) {
00540 try { MUTEX_UNLOCK(sem); } catch(...) {}
00541 throw;
00542 }
00543 }
00544
00554 operator T() const {
00555 MUTEX_LOCK(sem);
00556 try {
00557 T v = val;
00558 MUTEX_UNLOCK(sem);
00559 return v;
00560 }
00561 catch (...) {
00562 try { MUTEX_UNLOCK(sem); } catch(...) {}
00563 throw;
00564 }
00565 }
00566
00567 T operator++(int) {
00568 MUTEX_LOCK(sem);
00569 try {
00570 T v = val;
00571 val++;
00572 MUTEX_UNLOCK(sem);
00573 return v;
00574 } catch (...) {
00575 try { MUTEX_UNLOCK(sem); } catch(...) {}
00576 throw;
00577 }
00578 }
00579
00580 T operator++() {
00581 MUTEX_LOCK(sem);
00582 try {
00583 val++;
00584 T v = val;
00585 MUTEX_UNLOCK(sem);
00586 return v;
00587 } catch (...) {
00588 try { MUTEX_UNLOCK(sem); } catch(...) {}
00589 throw;
00590 }
00591 }
00592 };
00593
00602 extern std::ostream& operator<<(std::ostream& out, const qstats& qs);
00603
00604 }
00605
00606 #endif