Tekkotsu Homepage
Demos
Overview
Downloads
Dev. Resources
Reference
Credits

ThreadedMessageQueue.h

Go to the documentation of this file.
00001 #ifndef INCLUDED_ThreadedMessageQueue_h_
00002 #define INCLUDED_ThreadedMessageQueue_h_
00003 
00004 #include "Thread.h"
00005 #include "Shared/MarkScope.h"
00006 #include <list>
00007 #include <stdexcept>
00008 #include <algorithm>
00009 
00010 //! Provides a mechanism for exchanging messages between threads
00011 /*! Receivers are responsible for message cleanup/deletion. */
00012 template<class T>
00013 class ThreadedMessageQueue {
00014 public:
00015   //! Constructor
00016   ThreadedMessageQueue() : lock(), signal(), msgs(), receiver(NULL) {}
00017   
00018   //! Destructor, calls finishCallback()
00019   ~ThreadedMessageQueue() { finishCallback(); }
00020   
00021   //! Adds a message to the queue
00022   void send(const T& msg) { MarkScope l(lock); msgs.push_back(msg); signal.broadcast(); }
00023   
00024   //! Applies the specified predicate to each of the messages (you can modify the queue value itself via pass-by-reference)
00025   template<class F> void for_each(const F& f) { std::for_each(msgs.begin(),msgs.end(),f); }
00026   
00027   //! Applies the specified predicate to each of the message, removing those for which the predicate returns true
00028   template<class F> void remove(const F& f) { msgs.remove_if(f); }
00029   
00030   //! Returns number of messages in #msgs
00031   size_t size() const { return msgs.size(); }
00032   
00033   //! Clears any backlog
00034   void clear() { MarkScope l(lock); msgs.clear(); }
00035   
00036   //! Returns the next message (does not remove from the queue), blocking until available
00037   const T& front() const {
00038     MarkScope l(lock); 
00039     while(msgs.size()==0)
00040       signal.wait(lock);
00041     return msgs.front();
00042   }
00043   
00044   //! Removes the front message, if any
00045   void pop() { MarkScope l(lock); if(msgs.size()>0) msgs.pop_front(); }
00046   
00047   //! Spawns a thread to trigger a class member callback with each message - only a single receiver is supported, so this replaces any previous receiver
00048   template<typename F, typename C>
00049   void spawnCallback(F fn, C& cl) {
00050     stopCallback();
00051     receiver = new ReceiverThread<F,C>(*this, fn, cl);
00052     receiver->start();
00053   }
00054   
00055   //! Sends a thread cancellation to the receiver thread to halt processing
00056   void stopCallback() {
00057     if(receiver!=NULL) {
00058       {
00059 #ifdef USE_SIGNAL_TO_CANCEL_THREAD
00060         // lock is necessary on stop to ensure cancellation does
00061         // not arrive between the testCancel and the system wait call
00062         MarkScope l(lock);
00063 #endif
00064         receiver->stop();
00065         receiver->keepRunning=false;
00066       } // must release lock for wait cancellation to go through
00067       receiver->join();
00068       delete receiver;
00069       receiver=NULL;
00070     }
00071   }
00072   
00073   //! Sets a flag to exit the receiver thread at the completion of the current callback
00074   /*! If no callback is active, cancels the receiver immediately -- doesn't wait
00075    *  for another message first */
00076   void finishCallback() {
00077     if(receiver==NULL)
00078       return;
00079     {
00080       MarkScope l(lock);
00081       if(msgs.size()==0)
00082         receiver->stop();
00083       receiver->keepRunning=false;
00084     } // must release lock for wait cancellation to go through
00085     receiver->join();
00086     delete receiver;
00087     receiver=NULL;
00088   }
00089   
00090   //! Sets a flag that when the receiver gets to the end of the queue, it will exit
00091   /*! If the receiver is already blocking at the end of the queue, stops the thread now */
00092   void finishQueue() {
00093     if(receiver==NULL)
00094       return;
00095     {
00096       MarkScope l(lock);
00097       if(msgs.size()==0)
00098         receiver->stop();
00099       receiver->block=false;
00100     } // must release lock for wait cancellation to go through
00101     receiver->join();
00102     delete receiver;
00103     receiver=NULL;
00104   }
00105   
00106 protected:
00107   mutable Thread::Lock lock; //!< provides mutual exclusion on #msgs operations and #signal reception
00108   Thread::Condition signal; //!< connects notification of send() in #receiver
00109   std::list<T> msgs; //!< unprocessed messages
00110   
00111   //! Holds controller flags for the receiver thread to indicate exit conditions
00112   class ReceiverThreadBase : public Thread {
00113   public:
00114     ReceiverThreadBase() : Thread(), keepRunning(true), block(true) {} //!< constructor
00115     bool keepRunning; //!< if cleared, indicates receiver should stop processing messages and exit its thread
00116     bool block; //!< if cleared, indiciates receiver should exit its thread if/when it runs out of messages to process
00117   };
00118   
00119   //! Pulls messages out of the queue and gives them to the specified callback.
00120   template<typename F, class C>
00121   class ReceiverThread : public ReceiverThreadBase {
00122   public:
00123     //! constructor
00124     ReceiverThread(ThreadedMessageQueue<T>& tmq, F f, C& c) : ReceiverThreadBase(), q(tmq), fn(f), cl(c) {}
00125     
00126   protected:
00127     ThreadedMessageQueue& q; //!< the queue being monitored
00128     F fn; //!< function pointer
00129     C& cl; //!< class pointer
00130     
00131     virtual void* run() {
00132       while(ReceiverThreadBase::keepRunning && (ReceiverThreadBase::block || q.size()>0)) {
00133         (cl.*fn)(q.front());
00134         q.pop();
00135       }
00136       return NULL;
00137     }
00138   };
00139   ReceiverThreadBase* receiver; //!< currently only a single receiver is supported
00140   
00141 private:
00142   ThreadedMessageQueue(const ThreadedMessageQueue& o); //!< Do not call
00143   ThreadedMessageQueue& operator=(const ThreadedMessageQueue& o); //!< Do not call
00144 };
00145 
00146 /*! @file
00147  * @brief Describes ThreadedMessageQueue, which provides a mechanism for exchanging messages between threads
00148  * @author ejt (Creator)
00149  */
00150 
00151 #endif

Tekkotsu v5.1CVS
Generated Sat May 4 06:33:03 2013 by Doxygen 1.6.3