00001 #include "EventRouter.h"
00002 #include "Shared/Profiler.h"
00003 #include "Behaviors/BehaviorBase.h"
00004 #include "Shared/ProjectInterface.h"
00005 #include <algorithm>
00006 #include "Events/TimerEvent.h"
00007 #include "EventTranslator.h"
00008 #include "Events/RemoteRouter.h"
00009 #include "Events/EventProxy.h"
00010
00011 #include <sstream>
00012
00013 #ifndef PLATFORM_APERIOS
00014 # include "IPC/Thread.h"
00015 # include "Shared/MarkScope.h"
00016 #endif
00017
00018 EventRouter * erouter=NULL;
00019
00020 EventRouter::EventRouter()
00021 : proxies(), rrouters(), sck(NULL), nextProxyPort(defaultPort+1),
00022 timers(), trappers(), listeners(), postings()
00023 {
00024 for(unsigned int i=0; i<ProcessID::NumProcesses; ++i) {
00025 forwards[i]=NULL;
00026 }
00027
00028 }
00029
00030 EventRouter::~EventRouter() {
00031 reset();
00032 removeAllTimers();
00033 for(unsigned int i=0; i<ProcessID::NumProcesses; ++i) {
00034 delete forwards[i];
00035 forwards[i]=NULL;
00036 }
00037
00038
00039 printf("Deleting %zu EventProxies and %zu RemoteRouters\n", proxies.size(), rrouters.size());
00040 for (std::list<EventProxy *>::iterator pi = proxies.begin(); pi != proxies.end(); pi++)
00041 delete *pi;
00042
00043
00044 for (std::map<int, RemoteRouter *>::iterator mi = rrouters.begin(); mi != rrouters.end(); mi++)
00045 delete (*mi).second;
00046
00047 }
00048
00049 void EventRouter::postEvent(EventBase* e) { processEvent(*e); delete e; }
00050
00051
00052 void EventRouter::processTimers() {
00053
00054 unsigned int curtime=get_time();
00055 TimerEntry curTimer(curtime);
00056 timer_it_t last_it=upper_bound(timers.begin(),timers.end(),&curTimer,TimerEntryPtrCmp());
00057 std::vector<TimerEntry*> process(timers.begin(),last_it);
00058 for(timer_it_t it=process.begin(); it!=process.end(); it++)
00059 if(!(*it)->repeat)
00060 (*it)->next=(unsigned int)-1;
00061 else if((*it)->delay==0)
00062 (*it)->next=curtime+1;
00063 else while((*it)->next<=curtime)
00064 (*it)->next+=(*it)->delay;
00065 sort(timers.begin(),last_it,TimerEntryPtrCmp());
00066 inplace_merge(timers.begin(),last_it,timers.end(),TimerEntryPtrCmp());
00067
00068 for(timer_it_t it=process.begin(); it!=process.end(); it++) {
00069 TimerEvent e((*it)->el,EventBase::timerEGID,(*it)->sid,EventBase::statusETID,(*it)->next-(*it)->delay);
00070 try {
00071 (*it)->el->processEvent(e);
00072 } catch(const std::exception& ex) {
00073 std::string msg="Occurred while processing event "+e.getName()+" by ";
00074 if(BehaviorBase * beh=dynamic_cast<BehaviorBase*>((*it)->el))
00075 msg+="listener "+beh->getName();
00076 else
00077 msg+="unnamed EventListener";
00078 if(!ProjectInterface::uncaughtException(__FILE__,__LINE__,msg.c_str(),&ex))
00079 throw;
00080 } catch(...) {
00081 std::string msg="Occurred while processing event "+e.getName()+" by ";
00082 if(BehaviorBase * beh=dynamic_cast<BehaviorBase*>((*it)->el))
00083 msg+="listener "+beh->getName();
00084 else
00085 msg+="unnamed EventListener";
00086 if(!ProjectInterface::uncaughtException(__FILE__,__LINE__,msg.c_str(),NULL))
00087 throw;
00088 }
00089 postEvent(e);
00090 }
00091
00092 static const TimerEntry deadTimer((unsigned int)-1);
00093 last_it=lower_bound(timers.begin(),timers.end(),&deadTimer,TimerEntryPtrCmp());
00094 for(timer_it_t it=last_it; it!=timers.end(); it++)
00095 delete *it;
00096 timers.erase(last_it,timers.end());
00097
00098
00099 }
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109 void EventRouter::addTimer(EventListener* el, size_t sid, unsigned int delay, bool repeat) {
00110 if(delay==-1U) {
00111 removeTimer(el,sid);
00112 return;
00113 }
00114 for(timer_it_t it=timers.begin(); it!=timers.end(); it++)
00115 if((*it)->el==el && (*it)->sid==sid) {
00116 (*it)->Set(delay,repeat);
00117
00118 if(it!=timers.begin() && (*it)->next<(*(it-1))->next)
00119 rotate(upper_bound(timers.begin(),it,*it,TimerEntryPtrCmp()),it,it+1);
00120 else if(it+1!=timers.end() && (*it)->next>(*(it+1))->next)
00121 rotate(it,it+1,lower_bound(it+1,timers.end(),*it,TimerEntryPtrCmp()));
00122 return;
00123 }
00124
00125 TimerEntry * add=new TimerEntry(el,sid,delay,repeat);
00126 timers.insert(lower_bound(timers.begin(),timers.end(),add,TimerEntryPtrCmp()),add);
00127
00128 }
00129
00130 void EventRouter::removeTimer(EventListener* el) {
00131 for(timer_it_t it=timers.begin(); it!=timers.end(); it++)
00132 if((*it)->el==el) {
00133 delete *it;
00134 *it=NULL;
00135 }
00136 timers.erase(std::remove(timers.begin(),timers.end(),(const TimerEntry*)NULL),timers.end());
00137 }
00138
00139 void EventRouter::removeTimer(EventListener* el, size_t sid) {
00140 for(timer_it_t it=timers.begin(); it!=timers.end(); it++)
00141 if((*it)->el==el && (*it)->sid==sid) {
00142 delete *it;
00143 timers.erase(it);
00144 return;
00145 }
00146 }
00147
00148 void EventRouter::removeAllTimers() {
00149 for(timer_it_t it=timers.begin(); it!=timers.end(); it++)
00150 delete *it;
00151 timers.erase(timers.begin(),timers.end());
00152 }
00153
00154 void EventRouter::addListener(EventListener* el, EventBase::EventGeneratorID_t egid) {
00155 bool hadListener=hasListeners(egid);
00156 listeners.addMapping(el,egid);
00157 if(!hadListener)
00158 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::activateETID,0,EventBase::EventGeneratorNames[egid],1));
00159 else
00160 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00161 }
00162 void EventRouter::addListener(EventListener* el, EventBase::EventGeneratorID_t egid, size_t sid) {
00163 bool hadListener=hasListeners(egid);
00164 for(unsigned int et=0; et<EventBase::numETIDs; et++)
00165 listeners.addMapping(el,egid,sid,(EventBase::EventTypeID_t)et);
00166 if(!hadListener)
00167 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::activateETID,0,EventBase::EventGeneratorNames[egid],1));
00168 else
00169 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00170 }
00171 void EventRouter::addListener(EventListener* el, EventBase::EventGeneratorID_t egid, size_t sid, EventBase::EventTypeID_t etid) {
00172 bool hadListener=hasListeners(egid);
00173 listeners.addMapping(el,egid,sid,etid);
00174 if(!hadListener)
00175 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::activateETID,0,EventBase::EventGeneratorNames[egid],1));
00176 else
00177 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00178 }
00179 void EventRouter::addListener(EventListener* el, const EventBase& e) {
00180 bool hadListener=hasListeners(e.getGeneratorID());
00181 listeners.addMapping(el,e.getGeneratorID(),e.getSourceID(),e.getTypeID());
00182 if(!hadListener)
00183 postEvent(EventBase(EventBase::erouterEGID,e.getGeneratorID(),EventBase::activateETID,0,EventBase::EventGeneratorNames[e.getGeneratorID()],1));
00184 else
00185 postEvent(EventBase(EventBase::erouterEGID,e.getGeneratorID(),EventBase::statusETID,0,EventBase::EventGeneratorNames[e.getGeneratorID()],1));
00186 }
00187
00188
00189 void EventRouter::addRemoteListener(EventListener* el, int host,
00190 EventBase::EventGeneratorID_t egid) {
00191 RemoteRouter &rr = remoteRouterForHost(host);
00192 addListener(el, egid);
00193 rr.addListener(egid);
00194 }
00195
00196 void EventRouter::addRemoteListener(EventListener* el, int host,
00197 EventBase::EventGeneratorID_t egid, size_t sid) {
00198 RemoteRouter &rr = remoteRouterForHost(host);
00199 addListener(el, egid, sid);
00200 rr.addListener(egid, sid);
00201 }
00202
00203 void EventRouter::addRemoteListener(EventListener* el, int host, const EventBase& e){
00204 addRemoteListener(el, host, e.getGeneratorID(), e.getSourceID(), e.getTypeID());
00205 }
00206
00207 void EventRouter::addRemoteListener(EventListener* el, int host,
00208 EventBase::EventGeneratorID_t egid, size_t sid,
00209 EventBase::EventTypeID_t etid) {
00210 RemoteRouter &rr = remoteRouterForHost(host);
00211 addListener(el, egid, sid, etid);
00212 rr.addListener(egid, sid, etid);
00213 }
00214
00215
00216 void EventRouter::removeRemoteListener(EventListener* el, int host,
00217 EventBase::EventGeneratorID_t egid) {
00218 RemoteRouter &rr = remoteRouterForHost(host);
00219 removeListener(el, egid);
00220 rr.removeListener(egid);
00221 }
00222
00223 void EventRouter::removeRemoteListener(EventListener* el, int host,
00224 EventBase::EventGeneratorID_t egid, size_t sid) {
00225 RemoteRouter &rr = remoteRouterForHost(host);
00226 removeListener(el, egid, sid);
00227 rr.removeListener(egid, sid);
00228 }
00229
00230 void EventRouter::removeRemoteListener(EventListener* el, int host,
00231 const EventBase& e) {
00232 removeRemoteListener(el, host, e.getGeneratorID(), e.getSourceID(), e.getTypeID());
00233 }
00234
00235 void EventRouter::removeRemoteListener(EventListener* el, int host,
00236 EventBase::EventGeneratorID_t egid, size_t sid,
00237 EventBase::EventTypeID_t etid) {
00238 RemoteRouter &rr = remoteRouterForHost(host);
00239 removeListener(el, egid, sid, etid);
00240 rr.removeListener(egid, sid, etid);
00241 }
00242
00243
00244 void EventRouter::requestRemoteStateUpdates(int host, RemoteState::StateType type,
00245 unsigned int interval) {
00246 RemoteRouter &rr = remoteRouterForHost(host);
00247 rr.requestStateUpdates(type, interval);
00248 }
00249
00250 void EventRouter::stopRemoteStateUpdates(int host, RemoteState::StateType type) {
00251 RemoteRouter &rr = remoteRouterForHost(host);
00252 rr.stopStateUpdates(type);
00253 }
00254
00255 RemoteRouter &EventRouter::remoteRouterForHost(int host) {
00256 RemoteRouter *rr = rrouters[host];
00257 if (rr) {
00258 printf("Returning existing remote router for host %s", intToStringIP(host).c_str());
00259 return *rr;
00260 } else {
00261 rrouters[host] = rr = new RemoteRouter(host);
00262 printf("Returning new remote router for host %s", intToStringIP(host).c_str());
00263 return *rr;
00264 }
00265 }
00266
00267 std::string EventRouter::intToStringIP(int ip) {
00268 std::stringstream ret;
00269 ret << (ip >> 24 & 0xff) << '.' << (ip >> 16 & 0xff) << '.'
00270 << (ip >> 8 & 0xff) << '.' << (ip >> 0 & 0xff);
00271 return ret.str();
00272 }
00273
00274 int EventRouter::stringToIntIP(std::string ip_str) {
00275 std::istringstream sstr;
00276 sstr.str(ip_str);
00277 int ip = 0, b;
00278 char c;
00279
00280 sstr >> b >> c;
00281 ip |= b << 24;
00282
00283 sstr >> b >> c;
00284 ip |= b << 16;
00285
00286 sstr >> b >> c;
00287 ip |= b << 8;
00288
00289 sstr >> b;
00290 ip |= b << 0;
00291
00292 return ip;
00293 }
00294
00295
00296 bool EventRouter::serveRemoteEventRequests() {
00297 if (sck)
00298 return false;
00299 sck = wireless->socket(Socket::SOCK_STREAM);
00300 wireless->setReceiver(sck, this);
00301 wireless->setDaemon(sck, true);
00302 wireless->listen(sck, EventRouter::defaultPort);
00303 return true;
00304 }
00305
00306 int EventRouter::processData(char* , int bytes) {
00307 if (bytes != sizeof(int)) {
00308 std::cerr << "Unknown data received" << std::endl;
00309 return -1;
00310 }
00311
00312 int nextPort = nextProxyPort++;
00313 std::cout << "Starting EventProxy on port " << nextPort
00314 << " for host " << intToStringIP(sck->getPeerAddress()) << std::endl;
00315 proxies.push_back(new EventProxy(nextPort));
00316
00317
00318 sck->write((byte *)&nextPort, sizeof(int));
00319
00320
00321 wireless->close(sck);
00322
00323 return 0;
00324 }
00325
00326
00327
00328 void EventRouter::removeListener(EventListener* el) {
00329 for(unsigned int eg=0; eg<EventBase::numEGIDs; eg++) {
00330 EventBase::EventGeneratorID_t egid=(EventBase::EventGeneratorID_t)eg;
00331 if(!listeners.removeMapping(el,egid))
00332 continue;
00333 listeners.clean(egid);
00334 if(!hasListeners(egid))
00335 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::deactivateETID,0,EventBase::EventGeneratorNames[egid],0));
00336 else
00337 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00338 }
00339 }
00340 void EventRouter::removeListener(EventListener* el, EventBase::EventGeneratorID_t egid) {
00341 if(!listeners.removeMapping(el,egid))
00342 return;
00343 listeners.clean(egid);
00344 if(!hasListeners(egid))
00345 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::deactivateETID,0,EventBase::EventGeneratorNames[egid],0));
00346 else
00347 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00348 }
00349 void EventRouter::removeListener(EventListener* el, EventBase::EventGeneratorID_t egid, size_t sid) {
00350 unsigned int removed=0;
00351 for(unsigned int et=0; et<EventBase::numETIDs; et++)
00352 removed+=listeners.removeMapping(el,egid,sid,(EventBase::EventTypeID_t)et);
00353 if(!removed)
00354 return;
00355 listeners.clean(egid);
00356 if(!hasListeners(egid))
00357 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::deactivateETID,0,EventBase::EventGeneratorNames[egid],0));
00358 else
00359 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00360 }
00361 void EventRouter::removeListener(EventListener* el, EventBase::EventGeneratorID_t egid, size_t sid, EventBase::EventTypeID_t etid) {
00362 if(!listeners.removeMapping(el,egid,sid,etid))
00363 return;
00364 listeners.clean(egid);
00365 if(!hasListeners(egid))
00366 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::deactivateETID,0,EventBase::EventGeneratorNames[egid],0));
00367 else
00368 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00369 }
00370 void EventRouter::removeListener(EventListener* el, const EventBase& e) {
00371 if(!listeners.removeMapping(el,e.getGeneratorID(),e.getSourceID(),e.getTypeID()))
00372 return;
00373 listeners.clean(e.getGeneratorID());
00374 if(!hasListeners(e.getGeneratorID()))
00375 postEvent(EventBase(EventBase::erouterEGID,e.getGeneratorID(),EventBase::deactivateETID,0,EventBase::EventGeneratorNames[e.getGeneratorID()],0));
00376 else
00377 postEvent(EventBase(EventBase::erouterEGID,e.getGeneratorID(),EventBase::statusETID,0,EventBase::EventGeneratorNames[e.getGeneratorID()],1));
00378 }
00379
00380 void EventRouter::addTrapper(EventTrapper* el, const EventBase& e) {
00381 bool hadListener=hasListeners(e.getGeneratorID());
00382 trappers.addMapping(el,e.getGeneratorID(),e.getSourceID(),e.getTypeID());
00383 if(!hadListener)
00384 postEvent(EventBase(EventBase::erouterEGID,e.getGeneratorID(),EventBase::activateETID,0,EventBase::EventGeneratorNames[e.getGeneratorID()],1));
00385 else
00386 postEvent(EventBase(EventBase::erouterEGID,e.getGeneratorID(),EventBase::statusETID,0,EventBase::EventGeneratorNames[e.getGeneratorID()],1));
00387 }
00388
00389 void EventRouter::addTrapper(EventTrapper* el, EventBase::EventGeneratorID_t egid) {
00390 bool hadListener=hasListeners(egid);
00391 trappers.addMapping(el,egid);
00392 if(!hadListener)
00393 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::activateETID,0,EventBase::EventGeneratorNames[egid],1));
00394 else
00395 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00396 }
00397
00398 void EventRouter::addTrapper(EventTrapper* el, EventBase::EventGeneratorID_t egid, size_t sid) {
00399 bool hadListener=hasListeners(egid);
00400 for(unsigned int et=0; et<EventBase::numETIDs; et++)
00401 trappers.addMapping(el,egid,sid,(EventBase::EventTypeID_t)et);
00402 if(!hadListener)
00403 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::activateETID,0,EventBase::EventGeneratorNames[egid],1));
00404 else
00405 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00406 }
00407
00408 void EventRouter::addTrapper(EventTrapper* el, EventBase::EventGeneratorID_t egid, size_t sid, EventBase::EventTypeID_t etid) {
00409 bool hadListener=hasListeners(egid);
00410 trappers.addMapping(el,egid,sid,etid);
00411 if(!hadListener)
00412 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::activateETID,0,EventBase::EventGeneratorNames[egid],1));
00413 else
00414 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00415 }
00416
00417
00418 void EventRouter::addTrapper(EventTrapper* el) {
00419 for(unsigned int eg=0; eg<EventBase::numEGIDs; eg++)
00420 addTrapper(el,(EventBase::EventGeneratorID_t)eg);
00421 }
00422
00423
00424 void EventRouter::removeTrapper(EventTrapper* el, const EventBase& e) {
00425 if(!trappers.removeMapping(el,e.getGeneratorID(),e.getSourceID(),e.getTypeID()))
00426 return;
00427 trappers.clean(e.getGeneratorID());
00428 if(!hasListeners(e.getGeneratorID()))
00429 postEvent(EventBase(EventBase::erouterEGID,e.getGeneratorID(),EventBase::deactivateETID,0,EventBase::EventGeneratorNames[e.getGeneratorID()],0));
00430 else
00431 postEvent(EventBase(EventBase::erouterEGID,e.getGeneratorID(),EventBase::statusETID,0,EventBase::EventGeneratorNames[e.getGeneratorID()],1));
00432 }
00433 void EventRouter::removeTrapper(EventTrapper* el, EventBase::EventGeneratorID_t egid) {
00434 if(!trappers.removeMapping(el,egid))
00435 return;
00436 trappers.clean(egid);
00437 if(!hasListeners(egid))
00438 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::deactivateETID,0,EventBase::EventGeneratorNames[egid],0));
00439 else
00440 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,EventBase::EventGeneratorNames[egid],1));
00441 }
00442 void EventRouter::removeTrapper(EventTrapper* el, EventBase::EventGeneratorID_t egid, size_t sid) {
00443 int removed=0;
00444 for(unsigned int et=0; et<EventBase::numETIDs; et++)
00445 removed+=trappers.removeMapping(el,egid,sid,(EventBase::EventTypeID_t)et);
00446 if(!removed)
00447 return;
00448 trappers.clean(egid);
00449 if(!hasListeners(egid))
00450 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::deactivateETID,0,EventBase::EventGeneratorNames[egid],0));
00451 else
00452 postEvent(EventBase(EventBase::erouterEGID,egid,EventBase::statusETID,0,