Tekkotsu Homepage
Demos
Overview
Downloads
Dev. Resources
Reference
Credits

MessageQueue.h

Go to the documentation of this file.
00001 //-*-c++-*-
00002 #ifndef INCLUDED_MessageQueue_h_
00003 #define INCLUDED_MessageQueue_h_
00004 
00005 #ifdef PLATFORM_APERIOS
00006 #  warning MessageQueue is not Aperios compatable
00007 #else
00008 
00009 #include "ListMemBuf.h"
00010 #include "RCRegion.h"
00011 #include "SemaphoreManager.h"
00012 #include "MutexLock.h"
00013 #include "Shared/MarkScope.h"
00014 #include "Shared/attributes.h"
00015 #include <exception>
00016 #include <stdlib.h>
00017 
00018 #include "Shared/TimeET.h"
00019 
00020 //! Defines the interface for sending new shared memory regions between processes
00021 /*! This base class holds all of the template-independent code to allow general
00022  *  operations on MessageQueues.  The templated version of MessageQueue provides
00023  *  concrete implementation, which is what you would instantiate.
00024  *  
00025  *  Each message entails its own shared memory region, as compared to
00026  *  SharedQueue, where a single large buffer is maintained, and all messages are
00027  *  copied into the common buffer.  This class is better for large regions since
00028  *  it can avoid copying data around.
00029  * 
00030  *  @see MessageQueue, MessageQueueStatusListener, MessageReceiver */
00031 class MessageQueueBase {
00032 public:
00033 
00034   //! an interface for filtering (or otherwise monitoring) messages being sent through a MessageQueue, see MessageQueueBase::addMessageFilter()
00035   class MessageFilter {
00036   public:
00037     //! called immediately prior to sending a message -- return true to pass the message into the queue, false to drop it
00038     virtual bool filterSendRequest(RCRegion* rcr)=0;
00039     //! to make compiler warning happy
00040     virtual ~MessageFilter() {}
00041   };
00042   
00043   //!constructor
00044   MessageQueueBase()
00045     : lock(), overflowPolicy(THROW_BAD_ALLOC), isClosed(false), reportDroppings(false), numMessages(0),
00046       numReceivers(0), messagesRead(0)
00047   {
00048     for(unsigned int i=0; i<ProcessID::NumProcesses; ++i)
00049       filters[i]=NULL;
00050   }
00051   //!destructor
00052   virtual ~MessageQueueBase() {}
00053   
00054   
00055   //! The storage type for message entry indicies
00056   /*! This index is to be used with accessor functions, but may be recycled for
00057    *  a new message after all receivers have read the previous message.  If you
00058    *  wish to have a unique message identifier, see getMessageSN() */
00059   typedef unsigned short index_t;
00060   
00061   
00062   //! add one to the receiver reference count
00063   virtual SemaphoreManager::semid_t addReceiver() ATTR_must_check =0;
00064   //! remove one from the receiver reference count
00065   virtual void removeReceiver(SemaphoreManager::semid_t rcvr)=0;
00066   //! return the receiver reference count
00067   virtual unsigned int getNumReceivers() const { return numReceivers; }
00068 
00069   //! registers a semaphore which should be raised whenever a message is marked read
00070   /*! The number of these are limited to the MAX_SENDERS template parameter of
00071     *  MessageQueue... returns false if too many are already registered
00072     *  
00073     *  You probably don't want to call this directly, use a MessageQueueStatusThread */
00074   virtual SemaphoreManager::semid_t addReadStatusListener() ATTR_must_check =0;
00075   //! removes a semaphore from the status listener list
00076   virtual void removeReadStatusListener(SemaphoreManager::semid_t sem)=0;
00077   
00078   
00079   //! post a message into the queue -- a shared reference is added, the caller retains control current reference
00080   /*! Thus, if you are sending a region and do not intend to use it again, either pass
00081    *  true for autoDereference or call RCRegion::removeReference() after sending
00082    *  to free the sender's memory.
00083    *  
00084    *  If no one dereferences the region, you can continue to access the region,
00085    *  even as the receiver accesses it as well.  Thus if both sides retain references,
00086    *  you can use the region as a shared memory area for future communication.
00087    *  (beware of race conditions!)
00088    *
00089    *  If @a rcr is NULL, an empty message will be sent (there's still some overhead
00090    *  to this -- may want to consider a semaphore instead of a MessageQueue if all
00091    *  you're going to do is 'ping' another process with empty messages) */
00092   virtual void sendMessage(RCRegion * rcr, bool autoDereference=false)=0;
00093   //! request access to a particular message, increments read counter -- do not call more than once per receiver!
00094   /*! The message is marked read and will be popped from the queue if all
00095    *  receivers have read the message as well.  The caller inherits a reference
00096    *  to the returned region -- call removeReference when you are done with
00097    *  it */
00098   virtual RCRegion * readMessage(index_t msg, SemaphoreManager::semid_t rcvr)=0;
00099   //! request access to a particular message, does not mark message -- call as often as you like
00100   /*! The caller inherits a reference to the returned region -- call
00101    *  removeReference when you are done with it */
00102   virtual RCRegion * peekMessage(index_t msg)=0;
00103   //! increments read counter -- do not call more than once per receiver per message!
00104   virtual void markRead(index_t msg, SemaphoreManager::semid_t rcvr)=0;
00105   //! do not allow any new messages to be posted
00106   virtual void close() { AutoLock autolock(lock); isClosed=true; }
00107 
00108   //! sets #reportDroppings
00109   virtual void setReportDroppings(bool report) { reportDroppings=report; }
00110   //! gets #reportDroppings
00111   virtual bool getReportDroppings() const { return reportDroppings; }
00112   
00113   
00114   //! Each message gets a unique, monotonically increasing serial number; this function returns that number (MessageQueue::serialNumber)
00115   virtual unsigned int getMessageSN(index_t msg)=0;
00116   
00117   //! Checks to see how many messages have been processed (read by all receivers and removed from queue)
00118   virtual unsigned int getMessagesRead() { return messagesRead; }
00119   
00120   //! Returns the number of messages which have been sent
00121   virtual unsigned int getMessagesSent() { return numMessages; }
00122   
00123   //! Returns the number of messages which have been sent but not yet read
00124   virtual unsigned int getMessagesUnread() { return getMessagesSent() - getMessagesRead(); }
00125   
00126   //! a typedef to make it easier to obtain a lock on the queue for the extent of a scope
00127   typedef MarkScope AutoLock;
00128   //! returns a reference to the queue's inter-process lock
00129   MutexLock<ProcessID::NumProcesses>& getLock() const { return lock; }
00130 
00131   
00132   virtual index_t oldest() const=0;          //!< return oldest message still in the queue (may or may not have been read by this process)
00133   virtual index_t newer(index_t it) const=0; //!< return the next message in the queue (may or may not have been read by this process)
00134   virtual index_t older(index_t it) const=0; //!< return the previous message in the queue (may or may not have been read by this process)
00135   virtual index_t newest() const=0;          //!< return most recent message added to the queue (may or may not have been read by this process)
00136   virtual bool isEnd(index_t it) const=0;    //!< returns true if @a it is the one-past-the-end of the queue
00137   
00138   //! an enumerations of policies for dealing with overflow, pass to setOverflowPolicy()
00139   enum OverflowPolicy_t {
00140     DROP_OLDEST,     //!< the oldest unread message is dropped
00141     DROP_NEWEST,     //!< the most recently added message is dropped (i.e. the overflowing message is ignored)
00142     WAIT,            //!< the adding process/thread polls until space is available
00143     THROW_BAD_ALLOC  //!< throw a std::bad_alloc exception (falls through to abort() if you don't catch it)
00144   };
00145   //! allows you to pick how to handle running out of space in the queue, see OverflowPolicy_t
00146   void setOverflowPolicy(OverflowPolicy_t op) { overflowPolicy=op; }
00147   //! returns the current overflow policy, see OverflowPolicy_t
00148   OverflowPolicy_t getOverflowPolicy() const { return overflowPolicy; }
00149   
00150   //! sets #semgr
00151   static void setSemaphoreManager(SemaphoreManager* mgr) { semgr=mgr; }
00152   //! gets #semgr
00153   static SemaphoreManager* getSemaphoreManager() { return semgr; }
00154   
00155   //! once called, any messages put into the queue must pass through @a filter first (note: there can only be one filter per process!)
00156   /*! if a filter was previously registered, it is replaced with the new @a filter */
00157   void addMessageFilter(MessageFilter& filter) {
00158     filters[ProcessID::getID()]=&filter;
00159   }
00160   //! removes the current filter in place, if any
00161   void removeMessageFilter() {
00162     filters[ProcessID::getID()]=NULL;
00163   }
00164 protected:
00165   //! the global semaphore manager, needs to be set (once, globally) via setSemaphoreManager() before any receivers are added
00166   static SemaphoreManager* semgr;
00167   
00168   mutable MutexLock<ProcessID::NumProcesses> lock; //!< a lock to grant serial access to the queue
00169   volatile OverflowPolicy_t overflowPolicy; //!< the choice of how to handle message overflow -- see OverflowPolicy_t
00170   bool isClosed; //!< if true, new messages will be rejected
00171   bool reportDroppings; //!< if true, output will be sent on cerr when overflow occurs
00172   unsigned int numMessages; //!< number of messages which have been sent (serial number of next message)
00173   unsigned int numReceivers; //!< how many receivers to expect
00174   unsigned int messagesRead; //!< number of messages which have been read and removed from queue
00175   MessageFilter* filters[ProcessID::NumProcesses]; //!< provides storage of one message filter per process
00176 private:
00177   MessageQueueBase(const MessageQueueBase&); //!< this shouldn't be called...
00178   MessageQueueBase& operator=(const MessageQueueBase&); //!< this shouldn't be called...
00179 };
00180 
00181 //! An implementation of MessageQueueBase, which provides mechanisms for sending shared memory regions between processes
00182 /*! MAX_UNREAD is assigned to #CAPACITY, MAX_RECEIVERS is assigned to #RECEIVER_CAPACITY, and MAX_SENDERS is assigned to #SENDER_CAPACITY
00183  *  @see MessageQueueBase, MessageQueueStatusListener, MessageReceiver */
00184 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS=10, unsigned int MAX_SENDERS=10>
00185 class MessageQueue : public MessageQueueBase {
00186 public:
00187   //! total number of messages which can be backed up in the queue
00188   static const unsigned int CAPACITY=MAX_UNREAD;
00189   //! total number of receivers which can be registered
00190   static const unsigned int RECEIVER_CAPACITY=MAX_RECEIVERS;
00191   //! total number of senders which can be registered
00192   /*! More specifically, this is the maximum number of StatusListeners -- anyone
00193    *  can call sendMessage(), but only this number can get direct notification when
00194    *  messages are received. */
00195   static const unsigned int SENDER_CAPACITY=MAX_SENDERS;
00196   
00197   //! constructor
00198   MessageQueue() : MessageQueueBase(), mq(), rcvrs(), sndrs() {}
00199   
00200   //! destructor
00201   virtual ~MessageQueue();
00202   
00203   virtual SemaphoreManager::semid_t addReadStatusListener() ATTR_must_check;
00204   virtual void removeReadStatusListener(SemaphoreManager::semid_t sem);
00205 
00206   virtual SemaphoreManager::semid_t addReceiver() ATTR_must_check;
00207   virtual void removeReceiver(SemaphoreManager::semid_t rcvr);
00208   
00209   virtual void sendMessage(RCRegion * rcr, bool autoDereference=false);
00210   virtual RCRegion * readMessage(index_t msg, SemaphoreManager::semid_t rcvr);
00211   virtual RCRegion * peekMessage(index_t msg);
00212   virtual void markRead(index_t msg, SemaphoreManager::semid_t rcvr);
00213 
00214   virtual unsigned int getMessageSN(index_t msg) { /*AutoLock autolock(lock);*/ return mq[msg].sn; }
00215   
00216   virtual index_t oldest() const { AutoLock autolock(lock); return mq.begin(); }
00217   virtual index_t newer(index_t it) const { AutoLock autolock(lock); return mq.next(it); }
00218   virtual index_t older(index_t it) const { AutoLock autolock(lock); return mq.prev(it); }
00219   virtual index_t newest() const { AutoLock autolock(lock); return mq.prev(mq.end()); }
00220   virtual bool isEnd(index_t it) const { AutoLock autolock(lock); return it==mq.end() || it>=mq_t::MAX_ENTRIES; }
00221   
00222 protected:
00223   //! data storage needed for each message
00224   struct entry {
00225     entry() : id(), sn(), numRead(0) { memset(readFlags,0,sizeof(readFlags)); } //!< constructor
00226     entry(unsigned int serialNumber, RCRegion* r)
00227     : id(r->ID()), sn(serialNumber), numRead(0) { memset(readFlags,0,sizeof(readFlags)); } //!< constructor, pass message info
00228     RCRegion::Identifier id; //!< the identifier for the shared memory region so that other regions can attach it
00229     unsigned int sn; //!< serial number for this message (not the same as its index in the queue -- indicies are reused, this id is unique to this message
00230     bool readFlags[MAX_RECEIVERS]; //!< a flag for each receiver to indicate if they have read it
00231     unsigned int numRead; //!< a count of the number of receivers which have read this message (should always equal sum(readFlags))
00232   };
00233   
00234   //! shorthand for the type of data storage of message entries
00235   typedef ListMemBuf<entry,MAX_UNREAD,index_t> mq_t;
00236   //! the data storage of message entries
00237   mq_t mq;
00238 
00239   //! shorthand for the type of data storage of message entries
00240   typedef ListMemBuf<SemaphoreManager::semid_t,MAX_RECEIVERS,index_t> rcvrs_t;
00241   //! the data storage of receiver semaphores
00242   rcvrs_t rcvrs;
00243 
00244   //! returns the index within #rcvrs of the receiver id @a rcvr
00245   typename rcvrs_t::index_t lookupReceiver(SemaphoreManager::semid_t rcvr) const;
00246   
00247   //! shorthand for the type of data storage of message entries
00248   typedef ListMemBuf<SemaphoreManager::semid_t,MAX_SENDERS,index_t> sndrs_t;
00249   //! the data storage of receiver semaphores
00250   sndrs_t sndrs;
00251 };
00252 
00253 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00254 MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::~MessageQueue() {
00255     //lock shouldn't be necessary -- refcount should ensure the containing
00256     //region isn't deleted until only one process has access anyway
00257     //AutoLock autolock(lock);
00258     while(!mq.empty()) {
00259       RCRegion * rcr = RCRegion::attach(mq.front().id);
00260       rcr->RemoveSharedReference();
00261       rcr->RemoveReference();
00262       mq.pop_front();
00263     }
00264 }
00265 
00266 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00267 SemaphoreManager::semid_t MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::addReadStatusListener() {
00268     AutoLock autolock(lock);
00269     SemaphoreManager::semid_t sem=semgr->getSemaphore();
00270     if(sem==semgr->invalid()) {
00271       std::cerr << "ERROR: unable to add read status listener to message queue because semaphore manager is out of semaphores" << std::endl;
00272       return semgr->invalid();
00273     }
00274     if(sndrs.push_back(sem)==sndrs.end()) {
00275       std::cerr << "ERROR: unable to add read status listener to message queue because message queue can't register any more senders (MAX_SENDERS)" << std::endl;
00276       semgr->releaseSemaphore(sem);
00277       return semgr->invalid();
00278     }
00279     return sem;
00280 }
00281 
00282 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00283 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::removeReadStatusListener(SemaphoreManager::semid_t sem) {
00284     AutoLock autolock(lock);
00285     for(index_t it=sndrs.begin(); it!=sndrs.end(); it=sndrs.next(it))
00286       if(sndrs[it]==sem) {
00287         sndrs.erase(it);
00288         semgr->releaseSemaphore(sem);
00289         break;
00290       }
00291 }
00292 
00293 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00294 SemaphoreManager::semid_t MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::addReceiver() {
00295     AutoLock autolock(lock);
00296     SemaphoreManager::semid_t sem=semgr->getSemaphore();
00297     if(sem==semgr->invalid()) {
00298       std::cerr << "ERROR: unable to add receiver to message queue because semaphore manager is out of semaphores" << std::endl;
00299       return semgr->invalid();
00300     }
00301     if(rcvrs.push_back(sem)==rcvrs.end()) {
00302       std::cerr << "ERROR: unable to add receiver to message queue because message queue can't register any more receivers (MAX_RECEIVERS)" << std::endl;
00303       semgr->releaseSemaphore(sem);
00304       return semgr->invalid();
00305     }
00306     numReceivers++;
00307     return sem;
00308 }
00309 
00310 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00311 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::removeReceiver(SemaphoreManager::semid_t rcvr) {
00312     AutoLock autolock(lock);
00313     index_t rcvr_id=rcvrs.begin();
00314     for(; rcvr_id!=rcvrs.end(); rcvr_id=rcvrs.next(rcvr_id))
00315       if(rcvrs[rcvr_id]==rcvr)
00316         break;
00317     if(rcvr_id==rcvrs.end()) {
00318       std::cerr << "WARNING: tried to remove message queue receiver " << rcvr << ", which is not registered as a receiver for this queue" << std::endl;
00319       return;
00320     }
00321     rcvrs.erase(rcvr_id);
00322     semgr->releaseSemaphore(rcvr);
00323     numReceivers--;
00324     for(index_t it=mq.begin(); it!=mq.end(); it=mq.next(it)) {
00325       if(mq[it].readFlags[rcvr_id]) {
00326         // the removed receiver had read this message, decrement the read count
00327         mq[it].readFlags[rcvr_id]=false;
00328         mq[it].numRead--;
00329       } else if(mq[it].numRead==numReceivers) {
00330         //all *remaining* processes have gotten a look, remove the neutral MessageQueue reference
00331         RCRegion * rcr = RCRegion::attach(mq[it].id);
00332         rcr->RemoveSharedReference();
00333         rcr->RemoveReference();
00334         it=mq.prev(it);
00335         mq.erase(mq.next(it));
00336         messagesRead++;
00337         for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00338           semgr->raise(sndrs[sit],1);
00339       }
00340     }
00341 }
00342 
00343 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00344 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::sendMessage(RCRegion * rcr, bool autoDereference/*=false*/) {
00345     AutoLock autolock(lock);
00346     if(rcr==NULL) {
00347       rcr=new RCRegion(0);
00348       autoDereference=true;
00349     }
00350     if(filters[ProcessID::getID()]!=NULL && !filters[ProcessID::getID()]->filterSendRequest(rcr)) {
00351       if(autoDereference)
00352         rcr->RemoveReference();
00353       return;
00354     }
00355     if(numReceivers==0) {
00356       //if(reportDroppings)
00357       //std::cerr << "Warning: MessageQueue dropping " << rcr->ID().key << " because there are no receivers" << std::endl;
00358       messagesRead++; // counts as a read message (read by all 0 readers is still read by all readers!)
00359       for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00360         semgr->raise(sndrs[sit],1);
00361       if(autoDereference)
00362         rcr->RemoveReference();
00363       return;
00364     }
00365     if(isClosed) {
00366       if(reportDroppings)
00367         std::cerr << "Warning: MessageQueue dropping " << rcr->ID().key << " because queue is closed" << std::endl;
00368       if(autoDereference)
00369         rcr->RemoveReference();
00370       return;
00371     }
00372     if(mq.size()==mq.getMaxCapacity()) {
00373       switch(overflowPolicy) {
00374         case DROP_OLDEST: {
00375           if(reportDroppings)
00376             std::cerr << "WARNING: MessageQueue full, dropping oldest unread message (" << mq.front().id.key << ")" << std::endl;
00377           RCRegion * eldest = RCRegion::attach(mq.front().id);
00378           eldest->RemoveSharedReference();
00379           mq.pop_front();
00380           eldest->RemoveReference();
00381         } break;
00382         case DROP_NEWEST:
00383           if(reportDroppings)
00384             std::cerr << "WARNING: MessageQueue full, dropping newest unread message (" << rcr->ID().key << ")" << std::endl;
00385           if(autoDereference)
00386             rcr->RemoveReference();
00387           return;
00388         case WAIT:
00389           if(reportDroppings)
00390             std::cerr << "WARNING: MessageQueue full, waiting for readers to catch up" << std::endl;
00391           while(mq.size()==mq.getMaxCapacity()) {
00392             //have to release locks so readers can get access
00393             unsigned int ll=lock.get_lock_level();
00394             lock.releaseAll();
00395             usleep(MutexLockBase::usleep_granularity*15);
00396             for(unsigned int i=0; i<ll; i++)
00397               lock.lock(ProcessID::getID());
00398             if(overflowPolicy!=WAIT) { //may have been changed by a different thread while we were waiting
00399               sendMessage(rcr,autoDereference); //retry with the new policy
00400               return;
00401             }
00402           }
00403           break;
00404         case THROW_BAD_ALLOC:
00405           if(reportDroppings)
00406             std::cerr << "WARNING: MessageQueue full, throwing bad_alloc exception" << std::endl;
00407           throw std::bad_alloc();
00408           break;
00409       }
00410     }
00411     rcr->AddSharedReference();
00412     if(mq.push_back(entry(numMessages++,rcr))==mq.end()) {
00413       //our overflow policy should've prevented this
00414       std::cerr << "ERROR: MessageQueue unable to add message; buggy overflow policy?" << std::endl;
00415       exit(EXIT_FAILURE);
00416     }
00417     
00418     //std::cout << Process::getName() << " sent " << (numMessages-1) << " at " << TimeET() << std::endl;
00419     //notify receivers
00420     for(index_t it=rcvrs.begin(); it!=rcvrs.end(); it=rcvrs.next(it))
00421       semgr->raise(rcvrs[it],1);
00422     
00423     if(autoDereference)
00424       rcr->RemoveReference();
00425 }
00426 
00427 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00428 RCRegion * MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::readMessage(index_t msg, SemaphoreManager::semid_t rcvr) {
00429     AutoLock autolock(lock);
00430     RCRegion * rcr = RCRegion::attach(mq[msg].id);
00431     index_t rcvr_id=lookupReceiver(rcvr);
00432     if(rcvr_id==rcvrs.end())
00433       return rcr;
00434     if(mq[msg].readFlags[rcvr_id]) {
00435       std::cerr << "WARNING: MessageQueue::readMessage(): Receiver re-reading message, could be recycled/invalidated any time" << std::endl;
00436       return rcr; // already read, just return it
00437     }
00438     mq[msg].readFlags[rcvr_id]=true;
00439     mq[msg].numRead++;
00440     if(mq[msg].numRead==numReceivers) {
00441       //all processes have gotten a look, remove the neutral MessageQueue reference
00442       rcr->RemoveSharedReference();
00443       mq.erase(msg);
00444       messagesRead++;
00445       for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00446         semgr->raise(sndrs[sit],1);
00447     }
00448     return rcr;
00449 }
00450 
00451 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00452 RCRegion * MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::peekMessage(index_t msg) {
00453     //AutoLock autolock(lock); //I don't think a lock is necessary here
00454     return RCRegion::attach(mq[msg].id);
00455 }
00456 
00457 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00458 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::markRead(index_t msg, SemaphoreManager::semid_t rcvr) {
00459     AutoLock autolock(lock);
00460     index_t rcvr_id=lookupReceiver(rcvr);
00461     if(rcvr_id==rcvrs.end())
00462       return;
00463     if(mq[msg].readFlags[rcvr_id]) {
00464       std::cerr << "WARNING: MessageQueue::markRead(): Receiver re-reading message, could be recycled/invalidated any time" << std::endl;
00465       return; // already read, just return it
00466     }
00467     mq[msg].readFlags[rcvr_id]=true;
00468     mq[msg].numRead++;
00469     if(mq[msg].numRead==numReceivers) {
00470       //all processes have gotten a look, remove the neutral MessageQueue reference
00471       RCRegion * rcr = RCRegion::attach(mq[msg].id);
00472       rcr->RemoveSharedReference();
00473       rcr->RemoveReference();
00474       mq.erase(msg);
00475       messagesRead++;
00476       for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00477         semgr->raise(sndrs[sit],1);
00478     }
00479 }
00480 
00481 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00482 typename MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::rcvrs_t::index_t
00483 MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::lookupReceiver(SemaphoreManager::semid_t rcvr) const {
00484   for(index_t rcvr_id=rcvrs.begin(); rcvr_id!=rcvrs.end(); rcvr_id=rcvrs.next(rcvr_id))
00485     if(rcvrs[rcvr_id]==rcvr)
00486       return rcvr_id;
00487   std::cerr << "WARNING: tried to look up queue receiver " << rcvr << ", which is not registered as a receiver for this queue" << std::endl;
00488   return rcvrs.end();
00489 }
00490 
00491 /*! @file
00492  * @brief Defines MessageQueue, which provides mechanisms for sending shared memory regions between processes
00493  * @author ejt (Creator)
00494  */
00495 
00496 #endif //APERIOS check
00497 
00498 #endif //INCLUDED

Tekkotsu v5.1CVS
Generated Fri Mar 16 05:26:44 2012 by Doxygen 1.6.3