ThreadedMessageQueue.h
Go to the documentation of this file.00001 #ifndef INCLUDED_ThreadedMessageQueue_h_
00002 #define INCLUDED_ThreadedMessageQueue_h_
00003
00004 #include "Thread.h"
00005 #include "Shared/MarkScope.h"
00006 #include <list>
00007 #include <stdexcept>
00008 #include <algorithm>
00009
00010
00011
00012 template<class T>
00013 class ThreadedMessageQueue {
00014 public:
00015
00016 ThreadedMessageQueue() : lock(), signal(), msgs(), receiver(NULL) {}
00017
00018
00019 ~ThreadedMessageQueue() { finishCallback(); }
00020
00021
00022 void send(const T& msg) { MarkScope l(lock); msgs.push_back(msg); signal.broadcast(); }
00023
00024
00025 template<class F> void for_each(const F& f) { std::for_each(msgs.begin(),msgs.end(),f); }
00026
00027
00028 template<class F> void remove(const F& f) { msgs.remove_if(f); }
00029
00030
00031 size_t size() const { return msgs.size(); }
00032
00033
00034 void clear() { MarkScope l(lock); msgs.clear(); }
00035
00036
00037 const T& front() const {
00038 MarkScope l(lock);
00039 while(msgs.size()==0)
00040 signal.wait(lock);
00041 return msgs.front();
00042 }
00043
00044
00045 void pop() { MarkScope l(lock); if(msgs.size()>0) msgs.pop_front(); }
00046
00047
00048 template<typename F, typename C>
00049 void spawnCallback(F fn, C& cl) {
00050 stopCallback();
00051 receiver = new ReceiverThread<F,C>(*this, fn, cl);
00052 receiver->start();
00053 }
00054
00055
00056 void stopCallback() {
00057 if(receiver!=NULL) {
00058 {
00059 #ifdef USE_SIGNAL_TO_CANCEL_THREAD
00060
00061
00062 MarkScope l(lock);
00063 #endif
00064 receiver->stop();
00065 receiver->keepRunning=false;
00066 }
00067 receiver->join();
00068 delete receiver;
00069 receiver=NULL;
00070 }
00071 }
00072
00073
00074
00075
00076 void finishCallback() {
00077 if(receiver==NULL)
00078 return;
00079 {
00080 MarkScope l(lock);
00081 if(msgs.size()==0)
00082 receiver->stop();
00083 receiver->keepRunning=false;
00084 }
00085 receiver->join();
00086 delete receiver;
00087 receiver=NULL;
00088 }
00089
00090
00091
00092 void finishQueue() {
00093 if(receiver==NULL)
00094 return;
00095 {
00096 MarkScope l(lock);
00097 if(msgs.size()==0)
00098 receiver->stop();
00099 receiver->block=false;
00100 }
00101 receiver->join();
00102 delete receiver;
00103 receiver=NULL;
00104 }
00105
00106 protected:
00107 mutable Thread::Lock lock;
00108 Thread::Condition signal;
00109 std::list<T> msgs;
00110
00111
00112 class ReceiverThreadBase : public Thread {
00113 public:
00114 ReceiverThreadBase() : Thread(), keepRunning(true), block(true) {}
00115 bool keepRunning;
00116 bool block;
00117 };
00118
00119
00120 template<typename F, class C>
00121 class ReceiverThread : public ReceiverThreadBase {
00122 public:
00123
00124 ReceiverThread(ThreadedMessageQueue<T>& tmq, F f, C& c) : ReceiverThreadBase(), q(tmq), fn(f), cl(c) {}
00125
00126 protected:
00127 ThreadedMessageQueue& q;
00128 F fn;
00129 C& cl;
00130
00131 virtual void* run() {
00132 while(ReceiverThreadBase::keepRunning && (ReceiverThreadBase::block || q.size()>0)) {
00133 (cl.*fn)(q.front());
00134 q.pop();
00135 }
00136 return NULL;
00137 }
00138 };
00139 ReceiverThreadBase* receiver;
00140
00141 private:
00142 ThreadedMessageQueue(const ThreadedMessageQueue& o);
00143 ThreadedMessageQueue& operator=(const ThreadedMessageQueue& o);
00144 };
00145
00146
00147
00148
00149
00150
00151 #endif