Tekkotsu Homepage
Demos
Overview
Downloads
Dev. Resources
Reference
Credits

ThreadedMessageQueue.h

Go to the documentation of this file.
00001 #ifndef INCLUDED_ThreadedMessageQueue_h_
00002 #define INCLUDED_ThreadedMessageQueue_h_
00003 
00004 #include "Thread.h"
00005 #include "Shared/MarkScope.h"
00006 #include <list>
00007 #include <stdexcept>
00008 #include <algorithm>
00009 
00010 //! Provides a mechanism for exchanging messages between threads
00011 /*! Receivers are responsible for message cleanup/deletion. */
00012 template<class T>
00013 class ThreadedMessageQueue {
00014 public:
00015   //! Constructor
00016   ThreadedMessageQueue() : lock(), signal(), msgs(), receiver(NULL) {}
00017   
00018   //! Destructor, calls finishCallback()
00019   ~ThreadedMessageQueue() { finishCallback(); }
00020   
00021   //! Adds a message to the queue
00022   void send(const T& msg) { MarkScope l(lock); msgs.push_back(msg); signal.broadcast(); }
00023   
00024   //! Applies the specified predicate to each of the messages (you can modify the queue value itself via pass-by-reference)
00025   template<class F> void for_each(const F& f) { std::for_each(msgs.begin(),msgs.end(),f); }
00026   
00027   //! Applies the specified predicate to each of the message, removing those for which the predicate returns true
00028   template<class F> void remove(const F& f) { MarkScope l(lock); msgs.remove_if(f); }
00029 
00030   //! Removes an element from the queue
00031   void remove(const T e) { MarkScope l(lock); msgs.remove(e); }
00032   
00033   //! Returns number of messages in #msgs
00034   size_t size() const { return msgs.size(); }
00035   
00036   //! Clears any backlog
00037   void clear() { MarkScope l(lock); msgs.clear(); }
00038   
00039   //! Returns the next message (does not remove from the queue), blocking until available
00040   const T& front() const {
00041     MarkScope l(lock); 
00042     while(msgs.size()==0)
00043       signal.wait(lock);
00044     return msgs.front();
00045   }
00046   
00047   //! Removes the front message, if any
00048   void pop() { MarkScope l(lock); if(msgs.size()>0) msgs.pop_front(); }
00049   
00050   //! Spawns a thread to trigger a class member callback with each message - only a single receiver is supported, so this replaces any previous receiver
00051   template<typename F, typename C>
00052   void spawnCallback(F fn, C& cl) {
00053     stopCallback();
00054     receiver = new ReceiverThread<F,C>(*this, fn, cl);
00055     receiver->start();
00056   }
00057   
00058   //! Sends a thread cancellation to the receiver thread to halt processing
00059   void stopCallback() {
00060     if(receiver!=NULL) {
00061       {
00062 #ifdef USE_SIGNAL_TO_CANCEL_THREAD
00063         // lock is necessary on stop to ensure cancellation does
00064         // not arrive between the testCancel and the system wait call
00065         MarkScope l(lock);
00066 #endif
00067         receiver->stop();
00068         receiver->keepRunning=false;
00069       } // must release lock for wait cancellation to go through
00070       receiver->join();
00071       delete receiver;
00072       receiver=NULL;
00073     }
00074   }
00075   
00076   //! Sets a flag to exit the receiver thread at the completion of the current callback
00077   /*! If no callback is active, cancels the receiver immediately -- doesn't wait
00078    *  for another message first */
00079   void finishCallback() {
00080     if(receiver==NULL)
00081       return;
00082     {
00083       MarkScope l(lock);
00084       if(msgs.size()==0)
00085         receiver->stop();
00086       receiver->keepRunning=false;
00087     } // must release lock for wait cancellation to go through
00088     receiver->join();
00089     delete receiver;
00090     receiver=NULL;
00091   }
00092   
00093   //! Sets a flag that when the receiver gets to the end of the queue, it will exit
00094   /*! If the receiver is already blocking at the end of the queue, stops the thread now */
00095   void finishQueue() {
00096     if(receiver==NULL)
00097       return;
00098     {
00099       MarkScope l(lock);
00100       if(msgs.size()==0)
00101         receiver->stop();
00102       receiver->block=false;
00103     } // must release lock for wait cancellation to go through
00104     receiver->join();
00105     delete receiver;
00106     receiver=NULL;
00107   }
00108   
00109 protected:
00110  public: // *** debug
00111   mutable Thread::Lock lock; //!< provides mutual exclusion on #msgs operations and #signal reception
00112   Thread::Condition signal; //!< connects notification of send() in #receiver
00113   std::list<T> msgs; //!< unprocessed messages
00114   
00115   //! Holds controller flags for the receiver thread to indicate exit conditions
00116   class ReceiverThreadBase : public Thread {
00117   public:
00118     ReceiverThreadBase() : Thread(), keepRunning(true), block(true) {} //!< constructor
00119     bool keepRunning; //!< if cleared, indicates receiver should stop processing messages and exit its thread
00120     bool block; //!< if cleared, indiciates receiver should exit its thread if/when it runs out of messages to process
00121   };
00122   
00123   //! Pulls messages out of the queue and gives them to the specified callback.
00124   template<typename F, class C>
00125   class ReceiverThread : public ReceiverThreadBase {
00126   public:
00127     //! constructor
00128     ReceiverThread(ThreadedMessageQueue<T>& tmq, F f, C& c) : ReceiverThreadBase(), q(tmq), fn(f), cl(c) {}
00129     
00130   protected:
00131     ThreadedMessageQueue& q; //!< the queue being monitored
00132     F fn; //!< function pointer
00133     C& cl; //!< class pointer
00134     
00135     virtual void* run() {
00136       while(ReceiverThreadBase::keepRunning && (ReceiverThreadBase::block || q.size()>0)) {
00137         // Note: we must remove the item from from the queue before
00138         // invoking the callback, because the callback will delete the
00139         // message.  We never want a queue to contain a pointer to a
00140         // deleted message, since another thread could find it.  This
00141         // is what was causing Tekkotsu to crash when running with
00142         // Mirage.
00143         T item = q.front();
00144         q.pop();
00145         (cl.*fn)(item);
00146       }
00147       return NULL;
00148     }
00149   };
00150   ReceiverThreadBase* receiver; //!< currently only a single receiver is supported
00151   
00152 private:
00153   ThreadedMessageQueue(const ThreadedMessageQueue& o); //!< Do not call
00154   ThreadedMessageQueue& operator=(const ThreadedMessageQueue& o); //!< Do not call
00155 };
00156 
00157 /*! @file
00158  * @brief Describes ThreadedMessageQueue, which provides a mechanism for exchanging messages between threads
00159  * @author ejt (Creator)
00160  */
00161 
00162 #endif

Tekkotsu v5.1CVS
Generated Mon May 9 04:58:52 2016 by Doxygen 1.6.3