00001
00002 #ifndef INCLUDED_MessageQueue_h_
00003 #define INCLUDED_MessageQueue_h_
00004
00005 #ifdef PLATFORM_APERIOS
00006 # warning MessageQueue is not Aperios compatable
00007 #else
00008
00009 #include "ListMemBuf.h"
00010 #include "RCRegion.h"
00011 #include "SemaphoreManager.h"
00012 #include "MutexLock.h"
00013 #include "Shared/MarkScope.h"
00014 #include "Shared/attributes.h"
00015 #include <exception>
00016 #include <stdlib.h>
00017
00018 #include "Shared/TimeET.h"
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 class MessageQueueBase {
00032 public:
00033
00034
00035 class MessageFilter {
00036 public:
00037
00038 virtual bool filterSendRequest(RCRegion* rcr)=0;
00039
00040 virtual ~MessageFilter() {}
00041 };
00042
00043
00044 MessageQueueBase()
00045 : lock(), overflowPolicy(THROW_BAD_ALLOC), isClosed(false), reportDroppings(false), numMessages(0),
00046 numReceivers(0), messagesRead(0)
00047 {
00048 for(unsigned int i=0; i<ProcessID::NumProcesses; ++i)
00049 filters[i]=NULL;
00050 }
00051
00052 virtual ~MessageQueueBase() {}
00053
00054
00055
00056
00057
00058
00059 typedef unsigned short index_t;
00060
00061
00062
00063 virtual SemaphoreManager::semid_t addReceiver() ATTR_must_check =0;
00064
00065 virtual void removeReceiver(SemaphoreManager::semid_t rcvr)=0;
00066
00067 virtual unsigned int getNumReceivers() const { return numReceivers; }
00068
00069
00070
00071
00072
00073
00074 virtual SemaphoreManager::semid_t addReadStatusListener() ATTR_must_check =0;
00075
00076 virtual void removeReadStatusListener(SemaphoreManager::semid_t sem)=0;
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092 virtual void sendMessage(RCRegion * rcr, bool autoDereference=false)=0;
00093
00094
00095
00096
00097
00098 virtual RCRegion * readMessage(index_t msg, SemaphoreManager::semid_t rcvr)=0;
00099
00100
00101
00102 virtual RCRegion * peekMessage(index_t msg)=0;
00103
00104 virtual void markRead(index_t msg, SemaphoreManager::semid_t rcvr)=0;
00105
00106 virtual void close() { AutoLock autolock(lock); isClosed=true; }
00107
00108
00109 virtual void setReportDroppings(bool report) { reportDroppings=report; }
00110
00111 virtual bool getReportDroppings() const { return reportDroppings; }
00112
00113
00114
00115 virtual unsigned int getMessageSN(index_t msg)=0;
00116
00117
00118 virtual unsigned int getMessagesRead() { return messagesRead; }
00119
00120
00121 virtual unsigned int getMessagesSent() { return numMessages; }
00122
00123
00124 virtual unsigned int getMessagesUnread() { return getMessagesSent() - getMessagesRead(); }
00125
00126
00127 typedef MarkScope AutoLock;
00128
00129 MutexLock<ProcessID::NumProcesses>& getLock() const { return lock; }
00130
00131
00132 virtual index_t oldest() const=0;
00133 virtual index_t newer(index_t it) const=0;
00134 virtual index_t older(index_t it) const=0;
00135 virtual index_t newest() const=0;
00136 virtual bool isEnd(index_t it) const=0;
00137
00138
00139 enum OverflowPolicy_t {
00140 DROP_OLDEST,
00141 DROP_NEWEST,
00142 WAIT,
00143 THROW_BAD_ALLOC
00144 };
00145
00146 void setOverflowPolicy(OverflowPolicy_t op) { overflowPolicy=op; }
00147
00148 OverflowPolicy_t getOverflowPolicy() const { return overflowPolicy; }
00149
00150
00151 static void setSemaphoreManager(SemaphoreManager* mgr) { semgr=mgr; }
00152
00153 static SemaphoreManager* getSemaphoreManager() { return semgr; }
00154
00155
00156
00157 void addMessageFilter(MessageFilter& filter) {
00158 filters[ProcessID::getID()]=&filter;
00159 }
00160
00161 void removeMessageFilter() {
00162 filters[ProcessID::getID()]=NULL;
00163 }
00164 protected:
00165
00166 static SemaphoreManager* semgr;
00167
00168 mutable MutexLock<ProcessID::NumProcesses> lock;
00169 volatile OverflowPolicy_t overflowPolicy;
00170 bool isClosed;
00171 bool reportDroppings;
00172 unsigned int numMessages;
00173 unsigned int numReceivers;
00174 unsigned int messagesRead;
00175 MessageFilter* filters[ProcessID::NumProcesses];
00176 private:
00177 MessageQueueBase(const MessageQueueBase&);
00178 MessageQueueBase& operator=(const MessageQueueBase&);
00179 };
00180
00181
00182
00183
00184 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS=10, unsigned int MAX_SENDERS=10>
00185 class MessageQueue : public MessageQueueBase {
00186 public:
00187
00188 static const unsigned int CAPACITY=MAX_UNREAD;
00189
00190 static const unsigned int RECEIVER_CAPACITY=MAX_RECEIVERS;
00191
00192
00193
00194
00195 static const unsigned int SENDER_CAPACITY=MAX_SENDERS;
00196
00197
00198 MessageQueue() : MessageQueueBase(), mq(), rcvrs(), sndrs() {}
00199
00200
00201 virtual ~MessageQueue();
00202
00203 virtual SemaphoreManager::semid_t addReadStatusListener() ATTR_must_check;
00204 virtual void removeReadStatusListener(SemaphoreManager::semid_t sem);
00205
00206 virtual SemaphoreManager::semid_t addReceiver() ATTR_must_check;
00207 virtual void removeReceiver(SemaphoreManager::semid_t rcvr);
00208
00209 virtual void sendMessage(RCRegion * rcr, bool autoDereference=false);
00210 virtual RCRegion * readMessage(index_t msg, SemaphoreManager::semid_t rcvr);
00211 virtual RCRegion * peekMessage(index_t msg);
00212 virtual void markRead(index_t msg, SemaphoreManager::semid_t rcvr);
00213
00214 virtual unsigned int getMessageSN(index_t msg) { return mq[msg].sn; }
00215
00216 virtual index_t oldest() const { AutoLock autolock(lock); return mq.begin(); }
00217 virtual index_t newer(index_t it) const { AutoLock autolock(lock); return mq.next(it); }
00218 virtual index_t older(index_t it) const { AutoLock autolock(lock); return mq.prev(it); }
00219 virtual index_t newest() const { AutoLock autolock(lock); return mq.prev(mq.end()); }
00220 virtual bool isEnd(index_t it) const { AutoLock autolock(lock); return it==mq.end() || it>=mq_t::MAX_ENTRIES; }
00221
00222 protected:
00223
00224 struct entry {
00225 entry() : id(), sn(), numRead(0) { memset(readFlags,0,sizeof(readFlags)); }
00226 entry(unsigned int serialNumber, RCRegion* r)
00227 : id(r->ID()), sn(serialNumber), numRead(0) { memset(readFlags,0,sizeof(readFlags)); }
00228 RCRegion::Identifier id;
00229 unsigned int sn;
00230 bool readFlags[MAX_RECEIVERS];
00231 unsigned int numRead;
00232 };
00233
00234
00235 typedef ListMemBuf<entry,MAX_UNREAD,index_t> mq_t;
00236
00237 mq_t mq;
00238
00239
00240 typedef ListMemBuf<SemaphoreManager::semid_t,MAX_RECEIVERS,index_t> rcvrs_t;
00241
00242 rcvrs_t rcvrs;
00243
00244
00245 typename rcvrs_t::index_t lookupReceiver(SemaphoreManager::semid_t rcvr) const;
00246
00247
00248 typedef ListMemBuf<SemaphoreManager::semid_t,MAX_SENDERS,index_t> sndrs_t;
00249
00250 sndrs_t sndrs;
00251 };
00252
00253 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00254 MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::~MessageQueue() {
00255
00256
00257
00258 while(!mq.empty()) {
00259 RCRegion * rcr = RCRegion::attach(mq.front().id);
00260 rcr->RemoveSharedReference();
00261 rcr->RemoveReference();
00262 mq.pop_front();
00263 }
00264 }
00265
00266 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00267 SemaphoreManager::semid_t MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::addReadStatusListener() {
00268 AutoLock autolock(lock);
00269 SemaphoreManager::semid_t sem=semgr->getSemaphore();
00270 if(sem==semgr->invalid()) {
00271 std::cerr << "ERROR: unable to add read status listener to message queue because semaphore manager is out of semaphores" << std::endl;
00272 return semgr->invalid();
00273 }
00274 if(sndrs.push_back(sem)==sndrs.end()) {
00275 std::cerr << "ERROR: unable to add read status listener to message queue because message queue can't register any more senders (MAX_SENDERS)" << std::endl;
00276 semgr->releaseSemaphore(sem);
00277 return semgr->invalid();
00278 }
00279 return sem;
00280 }
00281
00282 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00283 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::removeReadStatusListener(SemaphoreManager::semid_t sem) {
00284 AutoLock autolock(lock);
00285 for(index_t it=sndrs.begin(); it!=sndrs.end(); it=sndrs.next(it))
00286 if(sndrs[it]==sem) {
00287 sndrs.erase(it);
00288 semgr->releaseSemaphore(sem);
00289 break;
00290 }
00291 }
00292
00293 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00294 SemaphoreManager::semid_t MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::addReceiver() {
00295 AutoLock autolock(lock);
00296 SemaphoreManager::semid_t sem=semgr->getSemaphore();
00297 if(sem==semgr->invalid()) {
00298 std::cerr << "ERROR: unable to add receiver to message queue because semaphore manager is out of semaphores" << std::endl;
00299 return semgr->invalid();
00300 }
00301 if(rcvrs.push_back(sem)==rcvrs.end()) {
00302 std::cerr << "ERROR: unable to add receiver to message queue because message queue can't register any more receivers (MAX_RECEIVERS)" << std::endl;
00303 semgr->releaseSemaphore(sem);
00304 return semgr->invalid();
00305 }
00306 numReceivers++;
00307 return sem;
00308 }
00309
00310 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00311 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::removeReceiver(SemaphoreManager::semid_t rcvr) {
00312 AutoLock autolock(lock);
00313 index_t rcvr_id=rcvrs.begin();
00314 for(; rcvr_id!=rcvrs.end(); rcvr_id=rcvrs.next(rcvr_id))
00315 if(rcvrs[rcvr_id]==rcvr)
00316 break;
00317 if(rcvr_id==rcvrs.end()) {
00318 std::cerr << "WARNING: tried to remove message queue receiver " << rcvr << ", which is not registered as a receiver for this queue" << std::endl;
00319 return;
00320 }
00321 rcvrs.erase(rcvr_id);
00322 semgr->releaseSemaphore(rcvr);
00323 numReceivers--;
00324 for(index_t it=mq.begin(); it!=mq.end(); it=mq.next(it)) {
00325 if(mq[it].readFlags[rcvr_id]) {
00326
00327 mq[it].readFlags[rcvr_id]=false;
00328 mq[it].numRead--;
00329 } else if(mq[it].numRead==numReceivers) {
00330
00331 RCRegion * rcr = RCRegion::attach(mq[it].id);
00332 rcr->RemoveSharedReference();
00333 rcr->RemoveReference();
00334 it=mq.prev(it);
00335 mq.erase(mq.next(it));
00336 messagesRead++;
00337 for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00338 semgr->raise(sndrs[sit],1);
00339 }
00340 }
00341 }
00342
00343 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00344 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::sendMessage(RCRegion * rcr, bool autoDereference) {
00345 AutoLock autolock(lock);
00346 if(rcr==NULL) {
00347 rcr=new RCRegion(0);
00348 autoDereference=true;
00349 }
00350 if(filters[ProcessID::getID()]!=NULL && !filters[ProcessID::getID()]->filterSendRequest(rcr)) {
00351 if(autoDereference)
00352 rcr->RemoveReference();
00353 return;
00354 }
00355 if(numReceivers==0) {
00356
00357
00358 messagesRead++;
00359 for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00360 semgr->raise(sndrs[sit],1);
00361 if(autoDereference)
00362 rcr->RemoveReference();
00363 return;
00364 }
00365 if(isClosed) {
00366 if(reportDroppings)
00367 std::cerr << "Warning: MessageQueue dropping " << rcr->ID().key << " because queue is closed" << std::endl;
00368 if(autoDereference)
00369 rcr->RemoveReference();
00370 return;
00371 }
00372 if(mq.size()==mq.getMaxCapacity()) {
00373 switch(overflowPolicy) {
00374 case DROP_OLDEST: {
00375 if(reportDroppings)
00376 std::cerr << "WARNING: MessageQueue full, dropping oldest unread message (" << mq.front().id.key << ")" << std::endl;
00377 RCRegion * eldest = RCRegion::attach(mq.front().id);
00378 eldest->RemoveSharedReference();
00379 mq.pop_front();
00380 eldest->RemoveReference();
00381 } break;
00382 case DROP_NEWEST:
00383 if(reportDroppings)
00384 std::cerr << "WARNING: MessageQueue full, dropping newest unread message (" << rcr->ID().key << ")" << std::endl;
00385 if(autoDereference)
00386 rcr->RemoveReference();
00387 return;
00388 case WAIT:
00389 if(reportDroppings)
00390 std::cerr << "WARNING: MessageQueue full, waiting for readers to catch up" << std::endl;
00391 while(mq.size()==mq.getMaxCapacity()) {
00392
00393 unsigned int ll=lock.get_lock_level();
00394 lock.releaseAll();
00395 usleep(MutexLockBase::usleep_granularity*15);
00396 for(unsigned int i=0; i<ll; i++)
00397 lock.lock(ProcessID::getID());
00398 if(overflowPolicy!=WAIT) {
00399 sendMessage(rcr,autoDereference);
00400 return;
00401 }
00402 }
00403 break;
00404 case THROW_BAD_ALLOC:
00405 if(reportDroppings)
00406 std::cerr << "WARNING: MessageQueue full, throwing bad_alloc exception" << std::endl;
00407 throw std::bad_alloc();
00408 break;
00409 }
00410 }
00411 rcr->AddSharedReference();
00412 if(mq.push_back(entry(numMessages++,rcr))==mq.end()) {
00413
00414 std::cerr << "ERROR: MessageQueue unable to add message; buggy overflow policy?" << std::endl;
00415 exit(EXIT_FAILURE);
00416 }
00417
00418
00419
00420 for(index_t it=rcvrs.begin(); it!=rcvrs.end(); it=rcvrs.next(it))
00421 semgr->raise(rcvrs[it],1);
00422
00423 if(autoDereference)
00424 rcr->RemoveReference();
00425 }
00426
00427 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00428 RCRegion * MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::readMessage(index_t msg, SemaphoreManager::semid_t rcvr) {
00429 AutoLock autolock(lock);
00430 RCRegion * rcr = RCRegion::attach(mq[msg].id);
00431 index_t rcvr_id=lookupReceiver(rcvr);
00432 if(rcvr_id==rcvrs.end())
00433 return rcr;
00434 if(mq[msg].readFlags[rcvr_id]) {
00435 std::cerr << "WARNING: MessageQueue::readMessage(): Receiver re-reading message, could be recycled/invalidated any time" << std::endl;
00436 return rcr;
00437 }
00438 mq[msg].readFlags[rcvr_id]=true;
00439 mq[msg].numRead++;
00440 if(mq[msg].numRead==numReceivers) {
00441
00442 rcr->RemoveSharedReference();
00443 mq.erase(msg);
00444 messagesRead++;
00445 for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00446 semgr->raise(sndrs[sit],1);
00447 }
00448 return rcr;
00449 }
00450
00451 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00452 RCRegion * MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::peekMessage(index_t msg) {
00453
00454 return RCRegion::attach(mq[msg].id);
00455 }
00456
00457 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00458 void MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::markRead(index_t msg, SemaphoreManager::semid_t rcvr) {
00459 AutoLock autolock(lock);
00460 index_t rcvr_id=lookupReceiver(rcvr);
00461 if(rcvr_id==rcvrs.end())
00462 return;
00463 if(mq[msg].readFlags[rcvr_id]) {
00464 std::cerr << "WARNING: MessageQueue::markRead(): Receiver re-reading message, could be recycled/invalidated any time" << std::endl;
00465 return;
00466 }
00467 mq[msg].readFlags[rcvr_id]=true;
00468 mq[msg].numRead++;
00469 if(mq[msg].numRead==numReceivers) {
00470
00471 RCRegion * rcr = RCRegion::attach(mq[msg].id);
00472 rcr->RemoveSharedReference();
00473 rcr->RemoveReference();
00474 mq.erase(msg);
00475 messagesRead++;
00476 for(index_t sit=sndrs.begin(); sit!=sndrs.end(); sit=sndrs.next(sit))
00477 semgr->raise(sndrs[sit],1);
00478 }
00479 }
00480
00481 template<unsigned int MAX_UNREAD, unsigned int MAX_RECEIVERS, unsigned int MAX_SENDERS>
00482 typename MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::rcvrs_t::index_t
00483 MessageQueue<MAX_UNREAD,MAX_RECEIVERS,MAX_SENDERS>::lookupReceiver(SemaphoreManager::semid_t rcvr) const {
00484 for(index_t rcvr_id=rcvrs.begin(); rcvr_id!=rcvrs.end(); rcvr_id=rcvrs.next(rcvr_id))
00485 if(rcvrs[rcvr_id]==rcvr)
00486 return rcvr_id;
00487 std::cerr << "WARNING: tried to look up queue receiver " << rcvr << ", which is not registered as a receiver for this queue" << std::endl;
00488 return rcvrs.end();
00489 }
00490
00491
00492
00493
00494
00495
00496 #endif //APERIOS check
00497
00498 #endif //INCLUDED