Tekkotsu Homepage
Demos
Overview
Downloads
Dev. Resources
Reference
Credits

MessageReceiver.cc

Go to the documentation of this file.
00001 #ifndef PLATFORM_APERIOS
00002 
00003 #include "MessageReceiver.h"
00004 #include "Shared/debuget.h"
00005 //#include "local/sim/Process.h"
00006 //#include <iostream>
00007 //using namespace std;
00008 
00009 MessageReceiver::MessageReceiver(MessageQueueBase& mq, bool (*callback) (RCRegion*)/*=NULL*/, bool startThread/*=true*/, bool subscribe/*=true*/)
00010 : Thread(), queue(mq), semid(mq.getSemaphoreManager()->invalid()),
00011 nextMessage(0), lastProcessedMessage(-1U), process(callback), curit((index_t)-1)
00012 {
00013   if(startThread)
00014     start();
00015   else if(subscribe) {
00016     ASSERTRET(semid==queue.getSemaphoreManager()->invalid(),"semid is already set?");
00017     semid=queue.addReceiver();
00018     if(semid==queue.getSemaphoreManager()->invalid())
00019       std::cerr << "ERROR: could not start MessageReceiver -- out of semaphore IDs" << std::endl;
00020   }
00021 }
00022 
00023 MessageReceiver::~MessageReceiver() {
00024   if(!isStarted())
00025     return;
00026   stop();
00027   join();
00028   queue.removeReceiver(semid);
00029   semid=queue.getSemaphoreManager()->invalid();
00030 }
00031 
00032 RCRegion * MessageReceiver::peekNextMessage() {
00033   MessageQueueBase::AutoLock autolock(queue.getLock());
00034   findCurrentMessage();
00035   if(queue.isEnd(curit))
00036     return NULL;
00037   return queue.peekMessage(curit);
00038 }
00039 
00040 RCRegion * MessageReceiver::getNextMessage() {
00041   MessageQueueBase::AutoLock autolock(queue.getLock());
00042   findCurrentMessage();
00043   if(queue.isEnd(curit))
00044     return NULL;
00045   nextMessage=queue.getMessageSN(curit)+1;
00046   curit=queue.newer(curit); //next time, start on (or peek at) the one after this
00047   return queue.readMessage(curit,semid);
00048 }
00049 
00050 Thread& MessageReceiver::stop() {
00051   Thread::stop();
00052   queue.getSemaphoreManager()->raise(semid,1); //trigger a check so the thread will notice the stop
00053   return *this;
00054 }
00055 
00056 void MessageReceiver::findCurrentMessage() {
00057   if(queue.isEnd(curit)) {
00058     curit=queue.newest(); //start with the newest
00059     while(!queue.isEnd(curit) && queue.getMessageSN(curit)>=nextMessage)
00060       curit=queue.older(curit); //scan back to the first already read by this process
00061     curit=queue.newer(curit); //go to the following one (first unread)
00062   } else {
00063     while(!queue.isEnd(curit) && queue.getMessageSN(curit)<nextMessage)
00064       curit=queue.newer(curit); //scan forward to next message not read by this process
00065   }
00066 }
00067 
00068 void MessageReceiver::finish() {
00069   if(isStarted()) {
00070     stop();
00071     join();
00072   }
00073   if(semid!=queue.getSemaphoreManager()->invalid()) {
00074     //cout << Process::getName() << " finish" << endl;
00075     while(processNextMessage()) {}
00076     queue.removeReceiver(semid);
00077     semid=queue.getSemaphoreManager()->invalid();
00078   }
00079 }
00080 
00081 bool MessageReceiver::launched() {
00082   if(semid==queue.getSemaphoreManager()->invalid())
00083     semid=queue.addReceiver();
00084   if(semid==queue.getSemaphoreManager()->invalid()) {
00085     std::cerr << "ERROR: could not start MessageReceiver -- out of semaphore IDs" << std::endl;
00086     return false;
00087   }
00088   return Thread::launched();
00089 }
00090 
00091 unsigned int MessageReceiver::runloop() {
00092   //cout << Process::getName() << " runloop" << endl;
00093   pushNoCancel();
00094   waitNextMessage();
00095   while(processNextMessage()) { //get everything else in the queue
00096     queue.getSemaphoreManager()->lower(semid,1,false);
00097   }
00098   popNoCancel();
00099   return 0;
00100 }
00101 
00102 bool MessageReceiver::waitNextMessage() {
00103   return queue.getSemaphoreManager()->lower(semid,1,true);
00104 }
00105 
00106 bool MessageReceiver::processNextMessage() {
00107   RCRegion * msg=peekNextMessage();
00108   if(msg==NULL)
00109     return false;
00110   //cout << Process::getName() << " got " << msg->ID().key << ' ' << lastProcessedMessage << ' ' << queue.getMessageSN(curit) << ' ' << curit << endl;
00111   bool used=false;
00112   if(lastProcessedMessage!=queue.getMessageSN(curit)) {
00113     lastProcessedMessage=queue.getMessageSN(curit);
00114     //cout << Process::getName() << " process received " << lastProcessedMessage << " at " << TimeET() << endl;
00115     used=process(msg);
00116     if(used)
00117       markRead(false); // message was consumed, mark it read
00118     //cout << used << ' ' << curit;
00119     //if(!queue.isEnd(curit))
00120     //cout << lastProcessedMessage << ' ' << queue.getMessageSN(curit);
00121     //cout << endl;
00122   }
00123   msg->RemoveReference();
00124   return used;
00125 }
00126 
00127 void MessageReceiver::markRead(bool checkNext) {
00128   findCurrentMessage();
00129   if(queue.isEnd(curit))
00130     return;
00131   nextMessage=queue.getMessageSN(curit)+1;
00132   queue.markRead(curit,semid);
00133   curit=queue.newer(curit); //next time, start on (or peek at) the one after this
00134   if(checkNext && !queue.isEnd(curit))
00135     queue.getSemaphoreManager()->raise(semid,1); //trigger a check if there are more waiting in the queue
00136 }
00137 
00138 
00139 
00140 /*! @file
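00141  * @brief Implements MessageReceiver, which pulls messages from a message queue and passes them to a callback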
00142  * @author Ethan Tira-Thompson (ejt) (Creator)
00143  */
00144 
00145 #endif //PLATFORM_APERIOS check (aperios doesn't support pthreads...)

Tekkotsu v5.1CVS
Generated Mon May 9 04:58:45 2016 by Doxygen 1.6.3