import java.util.PriorityQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.Iterator; import java.util.ConcurrentModificationException; import java.util.List; import java.util.ArrayList; import java.util.SortedSet; import java.util.TreeSet; import java.io.*; import java.util.*; /**This class provides the simulation engine logical process scheduling loop. This * loop runs in its own thread in parallel with the communication thread * of the PE. After instantiation, this class should have one or more * LogicalProcesses added (by {@link #createLP createLP}) before the thread is * started. *

Requires Java 5.0 due to use of the * PriorityBlockingQueue class and others. * @author Yin Xiong * @author Glenn Matthews * @version 1.9 (09/2006) */ public class SchedulingLoop implements Runnable{ /**The id of the parent PE of this scheduler.*/ int id; /**The flag indicating the end of sim*/ Flag eosFlag; /**Flag for computing GVT*/ Flag gvtFlag; /**The most recent GVT*/ int GVT; /**The frequency that GVT should be computed*/ int gvtClock; /**The queue that store the saved LP state*/ Vector lpStates; /** Very primitive way of stopping the run loop: set this to true. * Run loop will terminate after current iteration and clean up after * itself. */ public boolean done = false; /** Shared priority queue containing incoming {@link Message}s from the * communication thread. */ public PriorityBlockingQueue incoming; /** Shared priority queue containing outgoing {@link Message}s to the * communication thread. Note that the addressing of these Messages are * not likely to be fully specified, since this class does not have access * to a GlobalNameServer. In general, the communication thread should * expect to have to use {@link Message#senderAPPid} and * {@link Message#receiverAPPid} to look up the LPid and PE of the sender * and receiver. The communication thread is also expected to recognize * and handle the case where receiverAPPid == * {@link LogicalProcess#ALL_LPS}. */ public PriorityBlockingQueue outgoing; /** Local list storing all LogicalProcesses managed by this SchedulingLoop. */ List LPs; /** Local priority queue storing simulation messages yet to be handled. */ public PriorityQueue simMessages; /** @since 1.7. * The queue that contains anti-messages. A anti-message is a copy of the * regular message with a negative sign. * When processing an event on LP i that schedules an event on * LP j, a message and a copy of the same message are created. * The copy is the 'anti-message' sometimes called negative, or copy of the * positive message. 
The original (positive) message is transmitted to the * destination LP j and the anti-message stays at the source LP i as a record * that the LP i transmitted a positive message to LP j */ Vector antiMsgsOut=new Vector(); /**@since 1.7 *The queue that contains unmatched anti-messages. Their positive counter- *parts haven't been received yet. */ Vector antiMsgsIn=new Vector(); /**@since 1.7 *The queue that stores the processed messages; used for rollback; *When an anti-message comes in, the scheduler first check if the *positive message has already been processed; if yes, rollback; if *not, cancel it; if the positive message is yet to receive, then store *the anti message in antiMsgsIn. */ Vector eventQueue=new Vector(); /** Local list storing sorted sets (one per LP) of current and future * monitoring commands. */ private List> monitoring; /** Local list storing sorted sets (one per LP) of current and future * steering commands. */ private List> steering; /**Constructs a new PE instance. Sets the id of the PE to peid. *@param pid int the id of the PE, i.e. the id of this process. */ public SchedulingLoop(int pid) { id=pid; gvtClock=5; LPs=new Vector(); incoming=new PriorityBlockingQueue(); outgoing=new PriorityBlockingQueue(); LPs = new ArrayList(); simMessages = new PriorityQueue(); monitoring = new ArrayList>(); steering = new ArrayList>(); lpStates=new Vector(); }//end constructor /**Sets the GVT clock. *@param gc int the frequency that GVT will be computed. *@since 1.9 */ void setGVTClock(int gc) { gvtClock=gc; }//end setGVTClock /**Sets the flags. *@param ef Flag end-of-program flag *@param gf Flag flag indicating it's time to compute GVT *@since 1.9 */ void setFlags(Flag ef, Flag gf) { eosFlag=ef; gvtFlag=gf; }//end setFlags /**Computes the local minimum timestamp using Samadi's algorithm which dependes on *acknowledgement for each and every message sent *

local minimum timestamp = the minimum timestamp among *

* the PE sets a flag indicating it's in find mode */ public int computeLocalMin() { int localMin=-1; int temp; //check unprocessed event queue if(simMessages.size()>0){ Message msg=(Message)simMessages.peek(); temp=msg.timeStamp; if(temp>GVT){ if(localMin<0 || (localMin>0 && tempGVT){ if(localMin<0 || (localMin>0 && tempGVT){ if(localMin<0 || (localMin>0 && temp0 && lp.currentTime0){ for(int i=0;irbtime && m.senderLP==lp){ if(index>i) index=i; System.out.println("scheduler " + id + " sending anti msg for LP " + lp + " with timestamp " + m.timeStamp); //System.out.println("check msg type: " + m.msgType); sendMessage(m); m.tag=1; //change tag to 1 to indicate this amti msg has already been sent antiMsgsOut.setElementAt(m,i); }//end if }//end for System.out.println("scheduler " + id + " finished sending anti msgs"); //message at index is the new beginning, return this message for the LP to send out m=(Message)antiMsgsOut.get(index); m.setMsgType(Message.MsgType.SIM_REGULAR); System.out.println("after rollback, LP " + lp + " begins with resending " + m.timeStamp); return m; }//end sendAntiMsgs /** Method required by Runnable interface; called when the thread is first * created. Starts the scheduling loop and continues looping until {@link #done} * is set to true. 
*/ public void run() { int mostRecentLP = -1; done = false; //begin scheduling loop while (!done) { //deal with all incoming messages processIncoming(); //apply the most recent simulation message to the LP it's aimed at mostRecentLP = runOneLP(); //apply scheduled monitoring/steering to that LP if (mostRecentLP != -1) { applyMonitoring(mostRecentLP); applySteering(mostRecentLP); }//end apply monitoring/steering /*******************added by yin for stopping sim************/ try{ Thread thisThread=Thread.currentThread(); thisThread.yield(); if(eosFlag.get()<0){ done=true; //System.out.println("scheduler " + id + " gets flag " + eosFlag.get() + " and is shutting down."); } /******************end added by yin for stopping sim********/ } catch(Exception e){ e.printStackTrace(); }//end try-catch } //end while (!done) //TODO - cleanup and finish running } // end run() /** Calls {@link LogicalProcess#runLP} for the first {@link Message} in the * {@link #simMessages} queue. Messages directed to a LogicalProcess not * owned by this SchedulingLoop are assumed to have been misdelivered, and * are placed in outgoing. * @return The index of the given LogicalProcess in the local lists. */ public int runOneLP() { if (simMessages.size() == 0) return -1; //no messages to handle! Message m = simMessages.poll(); eventQueue.add(m); //System.out.println("scheduler " + id + " processed " + eventQueue.size() + " events"); String target=m.getReceiverAPPid(); int index=-1; if(target!=null) index = getLPIndexByAPPid(target); else index = getLPIndexByLPid(m.getReceiverLP()); if (index <0) { System.out.println("Destination LP not found "); //put the message back into outgoing queue sendMessage(m); return -1; //Unknown LP! 
} //Pass the message into the LP in question LogicalProcess lp = LPs.get(index); //before run LP(and change LP state),save LP state LogicalProcess loglp=lp.appSavingLPState(); loglp.savingLPState(lp); lpStates.add(loglp); //System.out.println("PE " + id + " saving lp state: " + loglp.toString()); //System.out.println("lastTime,currentTime: " + loglp.lastTime + " " + loglp.currentTime); //if buffer is "full" (as spesified by gvtClock), set flag for computing GVT if(lpStates.size()>gvtClock){ gvtFlag.set(id); System.out.println("buffer used: " + lpStates.size() + " set gvt flag"); } //check local virtual time and receive time; if LVT > receive time, roll back if(lp.currentTime>m.timeStamp){ System.out.println("\n" + lp.myname+ ": local virtual time: " +lp.currentTime+", receive time: "+m.timeStamp); System.out.println("***" + lp.myname +": got a message from the past, rollback to " + m.timeStamp + "\n"); Message mm=rollBack(lp,m.timeStamp); //lp.runLP(mm); } else lp.runLP(m); return index; } // end runOneLP() /**Rolls back to the time specified by rbtime and returns the first message *immediately after the rollback. e.g. if roll back from 4 to 2, then the *returned message should be 3. 
*@param theLP a LogicalProcess that needs to rollback *@param rbtime int the time the LP will rollback to */ Message rollBack(LogicalProcess theLP, int rbtime) { //System.out.println("in rollBack, saved LP states: " + lpStates.size()); for(int i=0;i iterator = simMessages.iterator(); try { while (iterator.hasNext()) { Message pm = iterator.next(); if(pm.timeStamp==m.timeStamp && pm.receiverLP==m.receiverLP){ System.out.println("found matching positive msg, annihilating " + pm.timeStamp); simMessages.remove(pm); done=true; break; }//end if }//end while }//end try catch(Exception e){ e.printStackTrace(); } }//end if //if the the positive msg hasn't received yet, store this anti message in the antiMsgsIn queue if(!done){ System.out.println("scheduler " + id + " hasn't received the positive msg yet, store anti msg"); antiMsgsIn.add(m); } }//end annihilateMsg /**@since 1.7 *Returns true if the message has been canceled with an anti-message. *@param m Message to be checked */ boolean isMsgCanceled(Message m) { boolean canceled=false; for(int i=0;i *
  • {@link Message.MsgType#SIM_REGULAR} - placed in {@link #simMessages}. *
  • {@link Message.MsgType#SYSTEM_MONITOR} - placed in {@link #monitoring}. *
  • {@link Message.MsgType#SYSTEM_STEER} - placed in {@link #steering}. * * Messages of unrecognized MsgType are discarded. Messages directed to * LogicalProcesses not owned by this SchedulingLoop are also discarded. */ public void processIncoming() { if (incoming.size() == 0) return; //System.out.println("Incoming message!"); //NOTE - According to the Java 5.0 documentation, this Iterator //is *NOT* guaranteed to traverse the PriorityBlockingQueue in any //particular order. In this particular case, we want to get everything //out of it, so that's not a problem. Iterator iterator = incoming.iterator(); try { while (iterator.hasNext()) { Message m = iterator.next(); switch (m.getMsgType()) { case SIM_REGULAR: boolean canceled=isMsgCanceled(m); if(!canceled) simMessages.offer(m); break; case SIM_ANTI: System.out.println("PE " + id + " msg type ANTI"); annihilateMsg(m); break; case SYSTEM_MONITOR: MonitorNode mn; try { mn = (MonitorNode)m.content; int i = getLPIndexByAPPid(m.receiverAPPid); if (i >= 0) monitoring.get(i).add(mn); else System.out.println("Monitoring message was"+ "directed to LP \""+m.receiverAPPid+ "\" - not present here!"); } catch (ClassCastException cce) { System.out.println("Monitoring message does not"+ "contain a MonitorNode object!"); } break; case SYSTEM_STEER: SteerNode sn; try { sn = (SteerNode)m.content; int i = getLPIndexByAPPid(m.receiverAPPid); if (i >= 0) steering.get(i).add(sn); else System.out.println("Steering message was"+ "directed to LP \""+m.receiverAPPid+ "\" - not present here!"); } catch (ClassCastException cce) { System.out.println("Steering message does not"+ "contain a SteerNode object!"); } break; //TODO - process other message types? default: System.out.print("Message type "+m.getMsgType()+ " is not recognized by this SchedulingLoop! "); System.out.println("\""+m+"\" will be discarded."); break; } iterator.remove(); } } catch (ConcurrentModificationException e) { //System.out.println("Error! 
Concurrent modification of \"incoming\""+ // " PriorityBlockingQueue!"); //e.printStackTrace(); } } // end processIncoming() /** Applies any scheduled monitoring of a single LP. * @param index The storage index of the LP to monitor. */ public void applyMonitoring(int index) { //System.out.println("There's some monitoring to do!"); LogicalProcess lp = LPs.get(index); SortedSet mon = monitoring.get(index); int lptime = lp.currentTime; if (mon.isEmpty()) { //System.out.println("No assigned monitoring for LP "+lp.APPid); return; } Iterator it = mon.iterator(); while (it.hasNext()) { MonitorNode m = it.next(); if (m.startTime > lptime) break; //set is sorted, so all remaining elements aren't valid //Monitoring is applied BEFORE old commands are deleted //This way we can set startTime = stopTime for a one-shot command String s = lp.getState(m.targetVar); ReportNode rep = new ReportNode(s,lptime,m.targetVar,m.clientID); Message msg = new Message(lptime, Message.MsgType.SYSTEM_REPORT, rep, lp.APPid, null); msg.setSenderLP(lp.LPid); msg.setReceiverLP(m.clientID); sendMessage(msg); //NOW we can delete this command if it's obsolete. if (m.stopTime < lptime) it.remove(); } } //end applyMonitoring() /** Applies any scheduled steering to the specified LP. * @param index The storage index of the LP to steer. 
*/ public void applySteering(int index) { //System.out.println("There's some steering to do!"); LogicalProcess lp = LPs.get(index); SortedSet steer = steering.get(index); int lptime = lp.currentTime; if (steer.isEmpty()) { //System.out.println("No assigned monitoring for LP "+lp.APPid); return; } Iterator it = steer.iterator(); while (it.hasNext()) { SteerNode s = it.next(); if (s.startTime > lptime) break; //set is sorted, so all remaining elements aren't valid //Steering is applied one last time BEFORE old commands are deleted //This way we can set startTime = stopTime for a one-shot command String str = lp.setState(s.targetVar, s.setPoint); if (str != null) System.out.println(str); AcknowledgeNode ack = new AcknowledgeNode(s.setPoint, lptime, str, s.targetVar, s.clientID); Message msg = new Message(lptime, Message.MsgType.SYSTEM_ACKNOWLEDGE, ack, lp.APPid, null); msg.setSenderLP(lp.LPid); msg.setReceiverLP(s.clientID); sendMessage(msg); //NOW we can delete obsolete commands if (s.stopTime < lptime) it.remove(); } } //end applySteering() /** Finds the list index of a given LP (position associated with it in * {@link #LPs}, {@link #monitoring}, and {@link #steering}) based on its * LP. * @param lpid The LPid to look for * @return The index of the first occurrence of this LP, or -1 if none found. */ public int getLPIndexByLPid(int lpid) { if (lpid<0) return -1; for (int i=0; i()); steering.add(new TreeSet()); } catch(Exception e) { e.printStackTrace(); } //NOTE - Unlike PE, SchedulingLoop doesn't keep a LocalNameServer. That is //the responsibility of the communication thread. All messages generated //by LPs go into the outgoing list, even if the communication thread just //turns around and drops them right back into the incoming list. } /**Locates one of the LPs managed by this scheduling loop, based on its APPid. * @param appid The APPid to look for (case insensitive) * @return The first LP found with the given APPid, or null if none found. 
*/ public LogicalProcess getLPByAPPid(String appid) { for (LogicalProcess lp : LPs) { if (appid.equalsIgnoreCase(lp.APPid)) return lp; } return null; } //end getLPByAPPid() }//end class PE