import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.Iterator;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.ArrayList;
import java.util.SortedSet;
import java.util.TreeSet;
import java.io.*;
import java.util.*;
/**This class provides the simulation engine logical process scheduling loop. This
* loop runs in its own thread in parallel with the communication thread
* of the PE. After instantiation, this class should have one or more
* LogicalProcesses added (by {@link #createLP createLP}) before the thread is
* started.
*
Requires Java 5.0 due to use of the
* PriorityBlockingQueue
class and others.
* @author Yin Xiong
* @author Glenn Matthews
* @version 1.9 (09/2006)
*/
public class SchedulingLoop implements Runnable{
/**The id of the parent PE of this scheduler.*/
int id;
/**The flag indicating the end of sim*/
Flag eosFlag;
/**Flag for computing GVT*/
Flag gvtFlag;
/**The most recent GVT*/
int GVT;
/**The frequency that GVT should be computed*/
int gvtClock;
/**The queue that store the saved LP state*/
Vector lpStates;
/** Very primitive way of stopping the run loop: set this to true.
* Run loop will terminate after current iteration and clean up after
* itself.
*/
public boolean done = false;
/** Shared priority queue containing incoming {@link Message}s from the
* communication thread.
*/
public PriorityBlockingQueue incoming;
/** Shared priority queue containing outgoing {@link Message}s to the
* communication thread. Note that the addressing of these Messages are
* not likely to be fully specified, since this class does not have access
* to a GlobalNameServer. In general, the communication thread should
* expect to have to use {@link Message#senderAPPid} and
* {@link Message#receiverAPPid} to look up the LPid and PE of the sender
* and receiver. The communication thread is also expected to recognize
* and handle the case where receiverAPPid ==
* {@link LogicalProcess#ALL_LPS}.
*/
public PriorityBlockingQueue outgoing;
/** Local list storing all LogicalProcesses managed by this SchedulingLoop. */
List LPs;
/** Local priority queue storing simulation messages yet to be handled. */
public PriorityQueue simMessages;
/** @since 1.7.
* The queue that contains anti-messages. A anti-message is a copy of the
* regular message with a negative sign.
* When processing an event on LP i that schedules an event on
* LP j, a message and a copy of the same message are created.
* The copy is the 'anti-message' sometimes called negative, or copy of the
* positive message. The original (positive) message is transmitted to the
* destination LP j and the anti-message stays at the source LP i as a record
* that the LP i transmitted a positive message to LP j
*/
Vector antiMsgsOut=new Vector();
/**@since 1.7
*The queue that contains unmatched anti-messages. Their positive counter-
*parts haven't been received yet.
*/
Vector antiMsgsIn=new Vector();
/**@since 1.7
*The queue that stores the processed messages; used for rollback;
*When an anti-message comes in, the scheduler first check if the
*positive message has already been processed; if yes, rollback; if
*not, cancel it; if the positive message is yet to receive, then store
*the anti message in antiMsgsIn.
*/
Vector eventQueue=new Vector();
/** Local list storing sorted sets (one per LP) of current and future
* monitoring commands.
*/
private List> monitoring;
/** Local list storing sorted sets (one per LP) of current and future
* steering commands.
*/
private List> steering;
/**Constructs a new PE instance. Sets the id of the PE to peid.
*@param pid int the id of the PE, i.e. the id of this process.
*/
public SchedulingLoop(int pid)
{
id=pid;
gvtClock=5;
LPs=new Vector();
incoming=new PriorityBlockingQueue();
outgoing=new PriorityBlockingQueue();
LPs = new ArrayList();
simMessages = new PriorityQueue();
monitoring = new ArrayList>();
steering = new ArrayList>();
lpStates=new Vector();
}//end constructor
/**Sets the GVT clock.
*@param gc int the frequency that GVT will be computed.
*@since 1.9
*/
void setGVTClock(int gc)
{
gvtClock=gc;
}//end setGVTClock
/**Sets the flags.
*@param ef Flag end-of-program flag
*@param gf Flag flag indicating it's time to compute GVT
*@since 1.9
*/
void setFlags(Flag ef, Flag gf)
{
eosFlag=ef;
gvtFlag=gf;
}//end setFlags
/**Computes the local minimum timestamp using Samadi's algorithm which dependes on
*acknowledgement for each and every message sent
*local minimum timestamp = the minumum timestamp among
*
*- all unprocessed events
*
- all unacknowledged messages and anti-messages it has sent
*
- all marked acknowledgement messages it has received since it last received
*a new GVT value
*
* the PE sets a flag indicating it's in find mode
*/
public int computeLocalMin()
{
int localMin=-1;
int temp;
//check unprocessed event queue
if(simMessages.size()>0){
Message msg=(Message)simMessages.peek();
temp=msg.timeStamp;
if(temp>GVT){
if(localMin<0 || (localMin>0 && tempGVT){
if(localMin<0 || (localMin>0 && tempGVT){
if(localMin<0 || (localMin>0 && temp0 && lp.currentTime0){
for(int i=0;irbtime && m.senderLP==lp){
if(index>i)
index=i;
System.out.println("scheduler " + id + " sending anti msg for LP " + lp + " with timestamp " + m.timeStamp);
//System.out.println("check msg type: " + m.msgType);
sendMessage(m);
m.tag=1; //change tag to 1 to indicate this amti msg has already been sent
antiMsgsOut.setElementAt(m,i);
}//end if
}//end for
System.out.println("scheduler " + id + " finished sending anti msgs");
//message at index is the new beginning, return this message for the LP to send out
m=(Message)antiMsgsOut.get(index);
m.setMsgType(Message.MsgType.SIM_REGULAR);
System.out.println("after rollback, LP " + lp + " begins with resending " + m.timeStamp);
return m;
}//end sendAntiMsgs
/** Method required by Runnable interface; called when the thread is first
* created. Starts the scheduling loop and continues looping until {@link #done}
* is set to true
.
*/
public void run() {
int mostRecentLP = -1;
done = false;
//begin scheduling loop
while (!done) {
//deal with all incoming messages
processIncoming();
//apply the most recent simulation message to the LP it's aimed at
mostRecentLP = runOneLP();
//apply scheduled monitoring/steering to that LP
if (mostRecentLP != -1) {
applyMonitoring(mostRecentLP);
applySteering(mostRecentLP);
}//end apply monitoring/steering
/*******************added by yin for stopping sim************/
try{
Thread thisThread=Thread.currentThread();
thisThread.yield();
if(eosFlag.get()<0){
done=true;
//System.out.println("scheduler " + id + " gets flag " + eosFlag.get() + " and is shutting down.");
}
/******************end added by yin for stopping sim********/
}
catch(Exception e){
e.printStackTrace();
}//end try-catch
} //end while (!done)
//TODO - cleanup and finish running
} // end run()
/** Calls {@link LogicalProcess#runLP} for the first {@link Message} in the
* {@link #simMessages} queue. Messages directed to a LogicalProcess not
* owned by this SchedulingLoop are assumed to have been misdelivered, and
* are placed in outgoing
.
* @return The index of the given LogicalProcess in the local lists.
*/
public int runOneLP()
{
if (simMessages.size() == 0)
return -1; //no messages to handle!
Message m = simMessages.poll();
eventQueue.add(m);
//System.out.println("scheduler " + id + " processed " + eventQueue.size() + " events");
String target=m.getReceiverAPPid();
int index=-1;
if(target!=null)
index = getLPIndexByAPPid(target);
else
index = getLPIndexByLPid(m.getReceiverLP());
if (index <0) {
System.out.println("Destination LP not found ");
//put the message back into outgoing queue
sendMessage(m);
return -1; //Unknown LP!
}
//Pass the message into the LP in question
LogicalProcess lp = LPs.get(index);
//before run LP(and change LP state),save LP state
LogicalProcess loglp=lp.appSavingLPState();
loglp.savingLPState(lp);
lpStates.add(loglp);
//System.out.println("PE " + id + " saving lp state: " + loglp.toString());
//System.out.println("lastTime,currentTime: " + loglp.lastTime + " " + loglp.currentTime);
//if buffer is "full" (as spesified by gvtClock), set flag for computing GVT
if(lpStates.size()>gvtClock){
gvtFlag.set(id);
System.out.println("buffer used: " + lpStates.size() + " set gvt flag");
}
//check local virtual time and receive time; if LVT > receive time, roll back
if(lp.currentTime>m.timeStamp){
System.out.println("\n" + lp.myname+ ": local virtual time: "
+lp.currentTime+", receive time: "+m.timeStamp);
System.out.println("***" + lp.myname +": got a message from the past, rollback to " + m.timeStamp + "\n");
Message mm=rollBack(lp,m.timeStamp);
//lp.runLP(mm);
}
else
lp.runLP(m);
return index;
} // end runOneLP()
/**Rolls back to the time specified by rbtime and returns the first message
*immediately after the rollback. e.g. if roll back from 4 to 2, then the
*returned message should be 3.
*@param theLP a LogicalProcess that needs to rollback
*@param rbtime int the time the LP will rollback to
*/
Message rollBack(LogicalProcess theLP, int rbtime)
{
//System.out.println("in rollBack, saved LP states: " + lpStates.size());
for(int i=0;i iterator = simMessages.iterator();
try {
while (iterator.hasNext()) {
Message pm = iterator.next();
if(pm.timeStamp==m.timeStamp && pm.receiverLP==m.receiverLP){
System.out.println("found matching positive msg, annihilating " + pm.timeStamp);
simMessages.remove(pm);
done=true;
break;
}//end if
}//end while
}//end try
catch(Exception e){
e.printStackTrace();
}
}//end if
//if the the positive msg hasn't received yet, store this anti message in the antiMsgsIn queue
if(!done){
System.out.println("scheduler " + id + " hasn't received the positive msg yet, store anti msg");
antiMsgsIn.add(m);
}
}//end annihilateMsg
/**@since 1.7
*Returns true if the message has been canceled with an anti-message.
*@param m Message to be checked
*/
boolean isMsgCanceled(Message m)
{
boolean canceled=false;
for(int i=0;i
* {@link Message.MsgType#SIM_REGULAR} - placed in {@link #simMessages}.
* {@link Message.MsgType#SYSTEM_MONITOR} - placed in {@link #monitoring}.
* {@link Message.MsgType#SYSTEM_STEER} - placed in {@link #steering}.
*
* Messages of unrecognized MsgType are discarded. Messages directed to
* LogicalProcesses not owned by this SchedulingLoop are also discarded.
*/
public void processIncoming() {
if (incoming.size() == 0)
return;
//System.out.println("Incoming message!");
//NOTE - According to the Java 5.0 documentation, this Iterator
//is *NOT* guaranteed to traverse the PriorityBlockingQueue in any
//particular order. In this particular case, we want to get everything
//out of it, so that's not a problem.
Iterator iterator = incoming.iterator();
try {
while (iterator.hasNext()) {
Message m = iterator.next();
switch (m.getMsgType()) {
case SIM_REGULAR:
boolean canceled=isMsgCanceled(m);
if(!canceled)
simMessages.offer(m);
break;
case SIM_ANTI:
System.out.println("PE " + id + " msg type ANTI");
annihilateMsg(m);
break;
case SYSTEM_MONITOR:
MonitorNode mn;
try {
mn = (MonitorNode)m.content;
int i = getLPIndexByAPPid(m.receiverAPPid);
if (i >= 0)
monitoring.get(i).add(mn);
else
System.out.println("Monitoring message was"+
"directed to LP \""+m.receiverAPPid+
"\" - not present here!");
} catch (ClassCastException cce) {
System.out.println("Monitoring message does not"+
"contain a MonitorNode object!");
}
break;
case SYSTEM_STEER:
SteerNode sn;
try {
sn = (SteerNode)m.content;
int i = getLPIndexByAPPid(m.receiverAPPid);
if (i >= 0)
steering.get(i).add(sn);
else
System.out.println("Steering message was"+
"directed to LP \""+m.receiverAPPid+
"\" - not present here!");
} catch (ClassCastException cce) {
System.out.println("Steering message does not"+
"contain a SteerNode object!");
}
break;
//TODO - process other message types?
default:
System.out.print("Message type "+m.getMsgType()+
" is not recognized by this SchedulingLoop! ");
System.out.println("\""+m+"\" will be discarded.");
break;
}
iterator.remove();
}
} catch (ConcurrentModificationException e) {
//System.out.println("Error! Concurrent modification of \"incoming\""+
// " PriorityBlockingQueue!");
//e.printStackTrace();
}
} // end processIncoming()
/** Applies any scheduled monitoring of a single LP.
* @param index The storage index of the LP to monitor. */
public void applyMonitoring(int index) {
//System.out.println("There's some monitoring to do!");
LogicalProcess lp = LPs.get(index);
SortedSet mon = monitoring.get(index);
int lptime = lp.currentTime;
if (mon.isEmpty()) {
//System.out.println("No assigned monitoring for LP "+lp.APPid);
return;
}
Iterator it = mon.iterator();
while (it.hasNext()) {
MonitorNode m = it.next();
if (m.startTime > lptime)
break; //set is sorted, so all remaining elements aren't valid
//Monitoring is applied BEFORE old commands are deleted
//This way we can set startTime = stopTime for a one-shot command
String s = lp.getState(m.targetVar);
ReportNode rep = new ReportNode(s,lptime,m.targetVar,m.clientID);
Message msg = new Message(lptime, Message.MsgType.SYSTEM_REPORT, rep,
lp.APPid, null);
msg.setSenderLP(lp.LPid);
msg.setReceiverLP(m.clientID);
sendMessage(msg);
//NOW we can delete this command if it's obsolete.
if (m.stopTime < lptime)
it.remove();
}
} //end applyMonitoring()
/** Applies any scheduled steering to the specified LP.
* @param index The storage index of the LP to steer. */
public void applySteering(int index) {
//System.out.println("There's some steering to do!");
LogicalProcess lp = LPs.get(index);
SortedSet steer = steering.get(index);
int lptime = lp.currentTime;
if (steer.isEmpty()) {
//System.out.println("No assigned monitoring for LP "+lp.APPid);
return;
}
Iterator it = steer.iterator();
while (it.hasNext()) {
SteerNode s = it.next();
if (s.startTime > lptime)
break; //set is sorted, so all remaining elements aren't valid
//Steering is applied one last time BEFORE old commands are deleted
//This way we can set startTime = stopTime for a one-shot command
String str = lp.setState(s.targetVar, s.setPoint);
if (str != null)
System.out.println(str);
AcknowledgeNode ack = new AcknowledgeNode(s.setPoint, lptime,
str, s.targetVar, s.clientID);
Message msg = new Message(lptime, Message.MsgType.SYSTEM_ACKNOWLEDGE,
ack, lp.APPid, null);
msg.setSenderLP(lp.LPid);
msg.setReceiverLP(s.clientID);
sendMessage(msg);
//NOW we can delete obsolete commands
if (s.stopTime < lptime)
it.remove();
}
} //end applySteering()
/** Finds the list index of a given LP (position associated with it in
* {@link #LPs}, {@link #monitoring}, and {@link #steering}) based on its
* LP.
* @param lpid The LPid to look for
* @return The index of the first occurrence of this LP, or -1 if none found.
*/
public int getLPIndexByLPid(int lpid) {
if (lpid<0)
return -1;
for (int i=0; i());
steering.add(new TreeSet());
} catch(Exception e) {
e.printStackTrace();
}
//NOTE - Unlike PE, SchedulingLoop doesn't keep a LocalNameServer. That is
//the responsibility of the communication thread. All messages generated
//by LPs go into the outgoing list, even if the communication thread just
//turns around and drops them right back into the incoming list.
}
/**Locates one of the LPs managed by this scheduling loop, based on its APPid.
* @param appid The APPid to look for (case insensitive)
* @return The first LP found with the given APPid, or null if none found.
*/
public LogicalProcess getLPByAPPid(String appid) {
for (LogicalProcess lp : LPs) {
if (appid.equalsIgnoreCase(lp.APPid))
return lp;
}
return null;
} //end getLPByAPPid()
}//end class PE