import java.io.*;
import java.util.*;
import java.lang.*;
import java.util.concurrent.PriorityBlockingQueue;

/** This class represents the physical processors involved in the time warp
 * simulation engine.
 * A PE is a Runnable: its run() method is the communication loop that
 * receives messages, sends queued messages and processes the queues, while
 * event scheduling is delegated to a SchedulingLoop started on its own thread.
 * @author Yin Xiong
 * @version 1.7 (07/2006)
 */
public class PE implements Runnable{
    /**The id of this PE.*/
    int id;

    /**Flag for end of simulation; run() stops looping once its value is negative.*/
    Flag eosFlag;

    /**The most recent GVT (presumably the global virtual time - confirm).*/
    int GVT;

    /**Sleep time between two iterations of the communication loop, passed to
     * Thread.sleep and therefore in milliseconds.
     */
    int sleepTime;

    /**The message pool where all messages are stored.*/
    MessagePool msgPool;

    /**The LocalNameServer in charge of routing messages.*/
    LocalNameServer localNameServer;

    /**The GlobalNameServer.*/
    GlobalNameServer gns;

    /**The queue that holds messages that have been cancelled. When a processor
     * wishes to cancel a message, it enqueues the message being cancelled into
     * this queue. Logically, each message enqueued in this queue can be viewed
     * as an anti-message; however, it is the message itself rather than an
     * explicit anti-message that is enqueued. Access to this queue is
     * synchronized with locks.
     */
    PriorityBlockingQueue cancelQ;

    /**The queue that holds the outgoing messages. Messages are placed into this
     * queue by the Scheduler, and the sender (main thread) comes here to get the
     * outgoing message and send it out. A copy is kept in this queue, but its
     * sign is changed from positive to negative.
     */
    PriorityBlockingQueue sendQ;

    /**The queue that holds all unprocessed events for all LPs mapped to this PE.
     * A single queue is used here in order to eliminate the need for a
     * separate "scheduling queue" to enumerate the executable LPs.
     */
    //Queue calendarQ;

    /**The queue that holds the processed and unprocessed events for LPs mapped
     * to this PE. Each processed event has pointers to messages scheduled by
     * the computation associated with this event and a pointer to state vector
     * information to allow the event computation to roll back.
*/
    PriorityBlockingQueue eventQ;

    /**The queue that holds the events whose memory is free to reclaim.*/
    PriorityBlockingQueue freeQ;

    /**The scheduler that schedules events for this PE.*/
    SchedulingLoop schedulingLoop;

    /**Sets the sleep time used between iterations of the loop in run().
     *@param st int the sleep time; it is handed to Thread.sleep, so it is in
     *       milliseconds.
     */
    public void setSleepTime(int st)
    {
        sleepTime=st;
    }

    /**Constructs a new PE instance. Sets the id of the PE to peid, resets GVT
     * to 0, creates the local name server, the cancel, event and send queues,
     * and the scheduling loop.
     *@param peid int the id of the PE, i.e. the id of this process in MPI.comm.
     *@param msgp MessagePool the message pool this PE reads its messages from.
     *@param ef Flag the end-of-simulation flag; kept in eosFlag and also handed
     *       to the scheduling loop.
     *@param gf Flag a second flag handed to the scheduling loop together with
     *       ef (its meaning is not visible here - presumably GVT related,
     *       confirm against SchedulingLoop.setFlags).
     *@param gc int the value passed to SchedulingLoop.setGVTClock.
     */
    // NOTE(review): freeQ and gns are declared as fields but are not assigned
    // in this constructor; confirm they are set elsewhere before being used.
    // The queues are created with the no-argument constructor, so their
    // elements are ordered by natural ordering.
    public PE(int peid, MessagePool msgp, Flag ef, Flag gf, int gc)
    {
        id=peid;
        GVT=0;
        msgPool=msgp;
        eosFlag=ef;
        localNameServer=new LocalNameServer(id);
        cancelQ=new PriorityBlockingQueue();
        eventQ=new PriorityBlockingQueue();
        sendQ=new PriorityBlockingQueue();
        schedulingLoop=new SchedulingLoop(id);
        schedulingLoop.setFlags(ef,gf);
        schedulingLoop.setGVTClock(gc);
    }//end constructor

    /**Runs the PE (processing events).
     * This is the communication thread that sends and receives messages.
     * It initializes the LPs, starts the scheduling loop on its own thread and
     * then repeats receive / send / process-queues until the end-of-simulation
     * flag turns negative, sleeping sleepTime between iterations. Afterwards the
     * LPs and the PE are finalized.
     */
    public void run()
    {
        System.out.println("PE " + id + " is running ...\n");
        initLPs();
        Thread scheduler=new Thread(schedulingLoop);
        scheduler.start();
        boolean endOfSim=false;
        while(!endOfSim){
            try{
                //receive messages
                receiveMsg();
                //send messages stored in sendQ
                sendMessages();
                //process the queues
                processQs();
                // A negative flag value ends the loop; the check is done after
                // the queues were processed, and the sleep below still runs once.
                if(eosFlag.get()<0){
                    //System.out.println("PE " + id + " gets flag -1");
                    endOfSim=true;
                }
                // NOTE(review): sleep is a static method of Thread; calling it
                // through an instance still sleeps the current thread.
                Thread thread=Thread.currentThread();
                thread.sleep(sleepTime);
            }
            // Any exception of one iteration is only printed and the loop goes on.
            // NOTE(review): this also catches InterruptedException from sleep
            // without restoring the interrupt status - confirm this is intended.
            catch(Exception e){
                e.printStackTrace();
            }
        }//end while
        finalizeLPs();
        finalizePE();
    }//end run

    /**Finalizes this PE; currently it only prints a completion message.*/
    void finalizePE()
    {
        System.out.println("PE " + id + " finished running ");
    }//end finalizePE

    /**Receives messages sent to this PE.
     * The messages addressed to this PE are fetched from the message pool with
     * msgPool.getMsgsForMe(id).
     */
    public void receiveMsg()
    {
        Vector msgsForMe=msgPool.getMsgsForMe(id);
        // NOTE(review): the text from the for-loop header below up to the start
        // of processQs looks truncated in this copy of the file (loop condition
        // and body, the end of receiveMsg and the processQs header are missing).
        // Restore it from the original source before changing this region.
        for(int i=0;i0)
            processEvents();
        // Only work on the cancel queue when it holds at least one entry.
        cqsize=cancelQ.size();
        if(cqsize>0)
            processCanQ();
    }//end processQs

    /**Processes events.
It takes out each message in SchedulingLoop's outgoing *queue, finds out the routing information by application-specified id of the *destination or the LP id of the destination, consults the local name server *or the global name server (in case local name server does not have the info) *for the routing information. It then puts the message into the sendQ for *sending out. */ void processEvents() { int size=schedulingLoop.outgoing.size(); // System.out.println("PE " + id + "has " + size + " events to process "); for(int i=0;i