//:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** @author Michael E. Cotterell * @version 1.0 * @date Wed Aug 21 20:08:06 EDT 2013 * @see LICENSE (MIT style license file). */ package scalation.coroutine import scala.util.continuations._ import scalation.util.PQItem //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The `Coroutine` trait provides a more restricted form of interleaved * execution than actors or threads. In particular, only one coroutine can be * in the running state at any one time. This reduces concurrency, but also * simplifies synchronization problems. For certain domains such as process- * oriented simulation, this restricted concurrency is all that is needed. * This coroutine package schedules couroutines to run in time order. *

* The current implementation is based on delimited continuations. To use * continuations, provide the option `-P:continuations:enable` to the Scala * compiler or REPL to activate the compiler plugin. */ trait Coroutine extends PQItem with Ordered [Coroutine] { /** If this coroutine has been suspended, then this contains the remainder * of the current coroutine execution. That is, when a coroutine is * suspended, the remainder of the coroutine's `run` function is * transformed turned into a closure by the continuations plugin and then * stored here so that the coroutine can be resumed. */ private var _session: Unit => Unit = null /** Current state of the coroutine. */ private var _state: Coroutine.State = Coroutine.Created //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** Current state of the coroutine. */ def state (): Coroutine.State = _state //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** Compare the activation times of the two coroutines. */ def compare (other: Coroutine): Int = other.actTime.compare (actTime) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** When the coroutine is started, this function is executed before the `run` function. */ def preRun (): Unit @suspendable = () //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** When the coroutine is finished executing its `run` function, this * function is executed. */ def postRun (): Unit @suspendable = () //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** When an an instance of a class that mixes in `Couroutine`, starting the * coroutine causes the `run` function to be called. The general contract * among all coroutines is that only the `run` function one coroutine can * be executing at any given time. */ def run (): Unit @suspendable //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** Causes this coroutine to begin execution. If another coroutine is * currently being executed, then that coroutine is suspended before this * one is executed. If this coroutine has already been started, then this * has the same effect as the `restart` function. */ final def start () { reset { _state = Coroutine.Running preRun () run () postRun () end () } // reset } // start //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** Causes this coroutine to restart execution. If another coroutine is * currently being executed, then that coroutine is suspended before this * one is executed. If this coroutine has not already been started, then * this has the same effect as the `run` function. It is important to note * that the state from any previous execution of this coroutine is not saved. */ final def restart (): Unit @suspendable = start () //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** If the coroutine is running, it is suspended and makes no further * progress unless and until it is resumed. *

* If the coroutine is in some other state, the behavior of this function * is to be treated as undefined. */ final def suspend (): Unit @suspendable = { // suspend execution and save the remaining execution as the session shift { remaining: (Unit => Unit) => { _session = remaining; _state = Coroutine.Suspended } } // shift } // suspend //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** If the coroutine is suspended, it is resumed and continues its * execution. *

* If the coroutine is in some other state, the behavior of this function * is to be treated as undefined. */ final def resume () { _state = Coroutine.Running // update the state _session () // execute the remaining session } // resume //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** Terminates the coroutine, regardless of state. */ final def end (): Unit @suspendable = { _session = null // get rid of the session _state = Coroutine.Ended // update the state } // end //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** If the coroutine is running, suspends the coroutine for a certain amount * of time. If the coroutine is suspended, then begin execution again after * a certain amount of time. Time units depend on the coroutine scheduler. * If no coroutine scheduler is provided then the default, * `Coroutine.defaultScheduler`, is used. * @param delay time delay before the coroutine is resumed * @param scheduler the coroutine scheduler */ final def pause (delay: Int, scheduler: Scheduler = Coroutine.defaultScheduler): Unit @suspendable = { // suspend execution and save the remaining execution as the session shift { remaining: (Unit => Unit) => { _session = remaining _state = Coroutine.Paused actTime = scheduler.clock + delay // set the time for future execution scheduler.reschedule (this) // reschedule the coroutine scheduler.resume () // resume the scheduler } } // shift } // wait //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** If the coroutine is running, suspend execution indefinitely and yield * to another coroutine. If the other coroutine has never been started, * then it is started. *

* If the coroutine is in some other state, the behavior of this function * is to be treated as undefined. * @param coroutine the coroutine to yield to */ final def yieldToCoroutine (coroutine: Coroutine): Unit @suspendable = { // suspend execution and save the remaining execution as the session shift { remaining: (Unit => Unit) => { _session = remaining _state = Coroutine.Paused if (coroutine.state == Coroutine.Created) { coroutine.start() } else { coroutine.resume() } // if } } // shift } // yieldToScheduler } // Coroutine //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The `Coroutine` object provides case objects for the various coroutine * states as well as a default coroutine scheduler. */ object Coroutine { //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** Describes the state of a coroutine. */ sealed trait State //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The coroutine is newly created. */ final case object Created extends State //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The coroutine is running. */ final case object Running extends State //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The coroutine is paused for a definite amount of time. After that amount * of time, the coroutine will automatically be resumed. This differs from * `Suspended` where the coroutine must explicitly be resumed. */ final case object Paused extends State //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The coroutine is suspended indefinitely. In order for the coroutine to * resume execution, it must explicitly be resumed. */ final case object Suspended extends State //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The coroutine has finished execution. */ final case object Ended extends State //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The default coroutine scheduler. This is a scheduler backed by a * priority queue. */ final val defaultScheduler = new PriorityScheduler () } // Coroutine //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The `CoroutineDSL` object provides a lightweight DSL for creating * coroutines. */ object CoroutineDSL { //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** Creates a coroutine. * Here is an example: * * {{{ * val cr = coroutine ("coolCoroutine") { self: Coroutine => * println ("hello") * self.pause (1) * println ("there") * self.yieldToCoroutine(Coroutine.defaultScheduler) * } // cr * }}} */ def coroutine (cname: String) (f: (Coroutine) => Unit @suspendable): Coroutine = new Coroutine { override def toString () = "coroutine(%s)".format(cname) def run (): Unit @suspendable = f (this) } // coroutine //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** Creates a coroutine that yields to the default scheduler once it's done * executing. */ def simActor (name: String) (f: (Coroutine) => Unit @suspendable): Coroutine = new Coroutine { override def toString () = "simActor(%s)".format (name) override def postRun (): Unit @suspendable = yieldToCoroutine (Coroutine.defaultScheduler) def run (): Unit @suspendable = f (this) } // simActor } // CoroutineDSL //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: /** The `CoroutineTest` object is used to test coroutines. */ object CoroutineTest extends App { // creating a coroutine this way requires that the run function have the // return type `Unit @suspendable` and also requires that the continuations // package be imported case class CoolCoroutineA () extends Coroutine { def run (): Unit @suspendable = { println ("m") pause (1) println ("c") pause (1) println("a") pause (1) println ("l") } // run } // CoolCoroutineA val a = CoolCoroutineA () // creating a coroutine using the coroutine DSL looks a little nicer and // avoids the need to import the continuations package import CoroutineDSL._ val b = coroutine ("CoolCoroutineB") { self: Coroutine => println ("i") self.pause (1) println ("h") self.pause (1) println ("e") self.yieldToCoroutine (Coroutine.defaultScheduler) } // b val sched = Coroutine.defaultScheduler sched.reschedule (a) sched.reschedule (b) sched.start () } // CoroutineTest