Tekkotsu Homepage
Dev. Resources


Go to the documentation of this file.
00001 //-*-c++-*-
00002 #ifndef INCLUDED_MessageReceiver_h_
00003 #define INCLUDED_MessageReceiver_h_
00006 #  warning MessageReceiver is not Aperios compatable
00007 #else
00009 #include "MessageQueue.h"
00010 #include "Thread.h"
00012 //! Spawns a thread for monitoring a MessageQueue, calls a specified function when new messages are available
00013 /*! Uses a semaphore which is raised by the MessageQueue itself when a new message is posted.
00014  *  This should have almost no overhead, and fairly low latency (at least, much lower latency
00015  *  than you would get by running multiple busy loops polling for new messages)
00016  *
00017  *  Keep in mind that the monitor runs in a separate thread, so you will need to consider mutex issues
00018  *  when the callback is executing. */
00019 class MessageReceiver : public Thread {
00020 public:
00021   //! constructor, indicate the message queue, and optional callback function and whether to start the monitor right away
00022   /*! @param mq is the message queue that the receiver will register with
00023    *  @param callback is the function to call when messages are received
00024    *  @param startThread controls whether the thread will be launched by the constructor
00025    *  @param subscribe only applies if @a startThread is false, indicates whether the receiver should register as a listener even though the thread isn't checking (yet)
00026    *  This last parameter allows you to avoid missing messages that come in before you're ready to process them */
00027   explicit MessageReceiver(MessageQueueBase& mq, bool (*callback) (RCRegion*)=NULL, bool startThread=true, bool subscribe=true);
00028   //! destructor, stops and joins thread
00029   virtual ~MessageReceiver();
00031   //! returns the next unread message without marking it read, or NULL if there are currently no more messages.  MessageReceiver retains reference.
00032   virtual RCRegion * peekNextMessage();
00033   //! returns the next unread message, marking it as read.  Caller inherits reference, and should call removeReference when done.
00034   virtual RCRegion * getNextMessage();
00035   //! marks the current message as read, and allows MessageQueue to process next unread message
00036   void markRead() { markRead(true); }
00038   //! thread control -- stop monitoring (can call start() later to resume)
00039   virtual Thread& stop();
00040   //! thread control -- stop(), join(), and process any final messages in the queue; unsubscribes as a listener of the MessageQueue
00041   virtual void finish();
00043   //! allows you to change the callback function -- should be set before the thread is started (otherwise, why bother starting it?)
00044   virtual void setCallback(bool (*callback) (RCRegion*)) { process=callback; }
00046 protected:
00047   typedef MessageQueueBase::index_t index_t; //!< shorthand for the message id type
00049   virtual void findCurrentMessage(); //!< sets #curit to the oldest message which hasn't been marked read
00050   virtual bool launched(); //!< register as a listener with the queue, if we haven't already (retains listener status between stop/start)
00051   virtual unsigned int runloop(); //!< wait for a new message, and then process it
00052   virtual bool waitNextMessage(); //!< wait for #semid to be raised to indicate a new message is in the queue (or at least, that it needs to be checked); returns false if interrupted
00053   virtual bool processNextMessage(); //!< gets the next message and processes it
00054   virtual void markRead(bool checkNext); //!< if @a checksNext is set, raises #semid so that if additional messages came in while we were processing the current one, they will be picked up
00056   MessageQueueBase& queue; //!< the MessageQueue being monitored
00057   SemaphoreManager::semid_t semid; //!< the semaphore raised when the queue should be checked for new messages
00058   unsigned int nextMessage; //!< the expected serial number of the next message to be sent
00059   unsigned int lastProcessedMessage; //!< the serial number of the last received message
00060   bool (*process) (RCRegion*); //!< the client callback function
00061   index_t curit; //!< the message id of the last received message (currently being processed)
00063 private:
00064   MessageReceiver(const MessageReceiver& r); //!< don't call
00065   MessageReceiver& operator=(const MessageReceiver& r); //!< don't call
00066 };
00068 /*! @file
00069  * @brief 
00070  * @author ejt (Creator)
00071  */
00073 #endif //Aperios check
00075 #endif //INCLUDED

Tekkotsu v5.1CVS
Generated Mon May 9 04:58:45 2016 by Doxygen 1.6.3