import java.io.*; import java.util.*; import java.lang.*; import java.sql.Time; import java.util.concurrent.PriorityBlockingQueue; /**This class represents a logical processe that communicates with other logical * processes by exchanging timestamped messages. LPs cannot "spontaneously" * begin new computations without first receiving a message. * *
If an LP receives a "straggler" message with a timestamp smaller than * its LVT (local virtual time), it rolls back to its LVT and reprocesses messages (after the straggler) * in timestamp order. Each message sent by a rolled-back computation is cancelled by * sending an anti-message that annihilates the original message. A received anti-message * will also cause a rollback if the cancelled message has already been processed. * *
This is an abstract class. The simulation application programmer needs * to implement the abstract methods specified in this class. All such methods * are prefixed "app" for ease of recognition. * *
Version 1.5 changes monitoring/steering yet again. It also adapts * the class to the new multi-threaded design (in which the PE scheduling loop * is a separate thread from the PE communication loop). As a result, this * class (which is handled entirely by the scheduling loop) handles much less * of the communication work than earlier versions did. * *
Version 1.6 changes the format of monitoring/steering slightly (using * Strings instead of Objects). * *
Version 1.7 added a few abstract methods for application programmers to * implement. These methods are needed for state-saving and rollback by the kernel. * * @author Yin Xiong * @author Glenn Matthews * @version 1.7 (07/2006) */ public abstract class LogicalProcess implements Serializable { /**Reserved APPid indicating a message to be broadcast to every LP rather than * sent to a single specific LP. * @since 1.3 */ public static final String ALL_LPS = "All LPs"; /**Unique numeric identifier for the PE where this LP resides; for debugging purposes*/ int PEid; /**Unique numeric identifier assigned to this LP by the system. */ int LPid; /**Unique application-assigned identifier of this LP*/ String APPid; /**Unique application-assigned full name of this LP*/ String myname; /**Configuration data string. Cached in expectation that future versions may * support "suspending" the simulation and writing the simulation status to * disk to be resumed at a later time. */ public String configData; /** Local virtual time(LVT) for this LP. Automatically updated every time * {@link #runLP} is called to match the timestamp of the incoming * {@link Message}. Subclasses may make other changes to this, * but generally should not need to! */ //protected double currentTime; protected int currentTime; /**Value of {@link #currentTime} the previous time {@link #runLP} was called. * Again, automatically updated by runLP; subclasses may make other * changes to it, but only with very good reason. 
*/ //protected double lastTime; protected int lastTime; /**SchedulingLoop that this LP can access; it's declared transient so it *won't be serialized and copied each time this LP's state is saved*/ transient SchedulingLoop scheduler; /** Saves the state of this LP * @since 1.7 */ void savingLPState(LogicalProcess lp) { setLPState(lp); }//end savingLPState /**Sets the state of this LP to the state of the LP passed in as parameter *@since 1.7 */ void setLPState(LogicalProcess lp) { PEid=lp.PEid; LPid=lp.LPid; APPid=lp.APPid; configData=lp.configData; lastTime=lp.lastTime; currentTime=lp.currentTime; }//end setLPState /**Rolls back to the the specified time point and restore the state of this LP *@since 1.7 *@param lpState LogicalProcess that contains the saved state of the LP */ void rollback(LogicalProcess lpState,int rbtime) { setLPState(lpState); appSetLPState(lpState); }//end rollback /**Application LP sets LP state *@since 1.7 *@param lp LogicalProcess containing the state information; the application *program decides what state information need to be stored */ public abstract void appSetLPState(LogicalProcess lp); /**Save LP state, for monitoring-steering purpose*/ public abstract LogicalProcess appSavingLPState(); /**Returns a string representation of the application lp; for debugging purpose */ public abstract String appToString(); /**Sets the end-of-sim flag so the simulation will stop. */ public void setEndOfSimFlag() { scheduler.eosFlag.set(-1); }//end setEndOfSimFlag /**Informs the SchedulingLoop of a new outgoing simulation message. This method * is to be preferred by LogicalProcess implementations, because it requires * no knowledge of the structure of Message. * @param content The contents of the simulation message. * @param timeInc the time increment assigned by the sender(@since 1.7) * @param targetAPPid the {@link #APPid} of the LP the message should go to. 
*/ public final void sendSimMessage( Object content, int timeInc, String targetAPPid) { int sendtime=currentTime+timeInc; //increment time and this is the send time //System.out.println("lasttime,currenttime,sendtime: " + lastTime+ " " + currentTime + " " //+sendtime); Message m = new Message(sendtime, Message.MsgType.SIM_REGULAR, content,APPid, targetAPPid); m.setSenderLP(LPid); //System.out.println("LP " + LPid + " send msg to "+ targetAPPid); scheduler.sendMessage(m); } /**Informs the SchedulingLoop of a new outgoing simulation message. * @param content The contents of the simulation message. * @param timeInc the time increment assigned by the sender(@since 1.7) * @param targetLPid int the LPid of the receiver */ public final void sendSimMessage(Object content, int timeInc, int targetLPid) { int sendtime=currentTime+timeInc; //increment time and this is the send time //System.out.println("lasttime,currenttime,sendtime: " + lastTime+ " " + currentTime + " " //+sendtime); Message m = new Message(sendtime, Message.MsgType.SIM_REGULAR, content); m.setReceiverLP(targetLPid); m.setSenderLP(LPid); //System.out.println("LP " + LPid + " send msg to LP "+ targetLPid); scheduler.sendMessage(m); } /**Assigns the {@link SchedulingLoop} for this LP. * @param sl SchedulingLoop to control this LP. */ public void setScheduler(SchedulingLoop sl) { scheduler = sl; } /**Returns PE id where this LP resides. */ public int getPID() { return PEid; }//ends getPID /**Sets the PE id where the LP resides. For debugging purposes. *@param pid int representing the PE id. */ public void setPID(int pid) { PEid=pid; }//end setPID /**Returns the LP id. */ public int getLID() { return LPid; }//ends getLID /**Sets the LP id for this LP. LP id is system-assigned. *@param lid int representing the LP id. */ public void setLID(int lid) { LPid=lid; }//end setLID /**Returns the application-assigned id of this LP. 
*/ public String getAPPid() { return APPid; }//ends getAppid /**Sets the application-specific short id for this LP. *@param aid String representing the application-specific id for this LP. */ public void setAppID(String aid) { APPid=aid; }//end setAppID /**Returns the name of this LP. */ public String getName() { return myname; }//ends getName /**Sets the long name of this LP. *@param n String, the long name of this LP. */ public void setName(String n) { myname=n; }//end setName /** Returns the current simulation time of this LogicalProcess. * @since 1.5 */ //public double getCurrentTime() { return currentTime; } public int getCurrentTime() { return currentTime; } /**Stores the application-specific data used for configuration and passes it * on to the application to apply. *@param cdata String representing the data that are used for configuration. */ public final void setConfigData(String cdata) { configData=cdata; appSetConfigData(cdata); }//end setConfigData /**Application-specific configuration. * @param cdata String representing configuration data. */ public abstract void appSetConfigData(String cdata); /**Returns a List of Strings describing all monitorable parameters of this LP, * each in the form "paramName, paramType". * Handles all LogicalProcess parameters; calls {@link #appGetMonitorable} to * get application-specific parameters. * Monitorable parameters are: *