//::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
/** @author  John Miller
 *  @version 1.1
 *  @date    Sat Mar 21 20:34:23 EDT 2015
 *  @see     LICENSE (MIT style license file).
 *
 *  Coroutine implementation options:
 *  (1) Java Threads, (2) Scala Actors, (3) Akka Actors, (4) Scala Continuations
 *  This one uses Java Threads.
 */

package scalation.process

import java.util.concurrent.Semaphore

//::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
/** The `Coroutine` class supports (one-at-a-time) quasi-concurrent programming.
 *  A coroutine runs/acts until it yields control from 'this' to 'that' coroutine.
 *  When resumed, a coroutine continues its execution where it left off.
 *  When a coroutine finishes, it may be terminated or zombified, in which case
 *  it may be restarted.
 *
 *  Each coroutine owns one Java thread and one semaphore created with zero
 *  permits.  A coroutine that yields without quitting blocks on its own
 *  semaphore until another coroutine releases a permit on it.
 *  @param label  the prefix of this coroutine's id (the id is 'label.n', where
 *                n is the value of the creation counter after it is incremented)
 */
abstract class Coroutine (label: String = "cor") extends Runnable
{
    import Coroutine._

    private val DEBUG     = true                 // debug flag
    private val LIMIT     = 100                  // maximum executions of act method
    private val _sema     = new Semaphore (0)    // waiting semaphore (starts with no permits)
    private var alive     = true                 // whether this coroutine has not terminated
    private var started   = false                // whether this coroutine has started
    private var _isZombie = false                // whether this coroutine is a zombie
    private val thr       = new Thread (this)    // thread to drive runnable coroutine (not yet started)

    nCreated += 1
    private val id = label + "." + nCreated      // identifier used in the trace messages

    if (DEBUG) println ("constructor: " + id + " waits to be STARTed")

    //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /** Return the Coroutine counts as (number created, number started,
     *  number terminated).  The counters are shared by all coroutines.
     */
    def counts: (Int, Int, Int) = (nCreated, nStarted, nTerminated)

    //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /** Return whether this coroutine is a zombie.
     */
    def isZombie: Boolean = _isZombie

    //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /** Zombify this coroutine, i.e., mark it so that a later 'yyield' to it
     *  restarts it (via 'interrupt') rather than resuming it.
     */
    def zombify (): Unit = { _isZombie = true }

    //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /** Thread's 'run' method delegates to the 'act' method.  Upon interruption
     *  the 'act' method is run again from the beginning.  The 'act' method is
     *  re-executed as long as this coroutine has not been terminated and it has
     *  been executed fewer than 'LIMIT' times.
     */
    def run (): Unit =
    {
        nStarted += 1
        var i     = 0                            // number of executions of act so far
        var again = true                         // act is always executed at least once
        while (again) {
            i += 1
            try {
                act ()
            } catch {
                // an interrupt (see 'interrupt') abandons the current execution of act
                case _: InterruptedException =>
                    println ("run: RESTART coroutine " + id + " for the " + i + "th time")
            } // try
            again = alive && i < LIMIT
        } // while
        nTerminated += 1
        if (DEBUG) println ("run: TERMINATE coroutine " + id)
    } // run

    //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /** Actor model features the 'act' method, even though threads are used.
     *  This abstract method must be implemented in application models.
     */
    def act (): Unit

    //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /** Yield control from 'this' to 'that' coroutine.  Depending on its state,
     *  'that' coroutine is restarted (zombie), resumed (already started) or
     *  started (new).  Afterwards 'this' coroutine either terminates or blocks
     *  on its semaphore until it is resumed.
     *  @param that  the other coroutine to yield control to (if null, no
     *               coroutine is given control)
     *  @param quit  whether 'this' coroutine is to terminate (true)
     *               or wait to be resumed (false)
     */
    def yyield (that: Coroutine, quit: Boolean = false): Unit =
    {
        if (that != null) {
            if (that._isZombie) {
                if (DEBUG) println ("yyield: " + id + " RESTARTs that zombie coroutine " + that.id)
                that.interrupt ()
            } else if (that.started) {
                if (DEBUG) println ("yyield: " + id + " RESUMEs that coroutine " + that.id)
                that.resume ()
            } else {
                if (DEBUG) println ("yyield: " + id + " STARTs that new coroutine " + that.id)
                that.start ()
            } // if
        } // if

        if (quit) {
            if (DEBUG) println ("yyield: " + id + " TERMINATEs")
            terminate ()                         // terminate this coroutine
        } else {
            _sema.acquire ()                     // wait until resumed; an interrupt surfaces here
        } // if
    } // yyield

    //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /** Start this coroutine, i.e., invoke its 'run' -> 'act' method on its
     *  own thread.
     */
    def start (): Unit =
    {
        started = true
        thr.start ()
    } // start

    //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /** Interrupt this waiting zombie coroutine.  The zombie flag is cleared and
     *  the coroutine's thread is interrupted.  For a coroutine that is not a
     *  zombie, only a message is printed.
     */
    def interrupt (): Unit =
    {
        if (_isZombie) {
            _isZombie = false
            thr.interrupt ()
        } else {
            println ("interrupt: only zombie coroutine may be interrupted")
        } // if
    } // interrupt

    //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /** Resume this coroutine by releasing one permit on its semaphore.
     */
    private def resume (): Unit = { _sema.release () }

    //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
    /** Terminate this coroutine, i.e., stop 'run' from executing 'act' again.
     */
    private def terminate (): Unit = { alive = false }

} // Coroutine class


//::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
/** The `Coroutine` companion object provides counters for the `Coroutine` class.
 */
object Coroutine
{
    var nCreated    = 0                          // number of Coroutines created
    var nStarted    = 0                          // number of Coroutines started
    var nTerminated = 0                          // number of Coroutines terminated

} // Coroutine companion object


//::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
/** The `CoroutineTest` object is used to test the `Coroutine` class.
 *  Should print:
 *      Cor1: phase 1
 *      Cor2: phase 1
 *      Cor1: phase 2
 *      Cor2: phase 2
 */
object CoroutineTest extends App
{
    /** First coroutine: starts 'cor2', then on being resumed hands control
     *  back to 'cor2' and terminates.
     */
    class Cor1 extends Coroutine
    {
        override def act (): Unit =
        {
            println ("Cor1: phase 1")
            yyield (cor2)
            println ("Cor1: phase 2")
            yyield (cor2, true)
        } // act

    } // Cor1

    /** Second coroutine: resumes 'cor1', then on being resumed terminates
     *  without giving control to any other coroutine.
     */
    class Cor2 extends Coroutine
    {
        override def act (): Unit =
        {
            println ("Cor2: phase 1")
            yyield (cor1)
            println ("Cor2: phase 2")
            yyield (null, true)
        } // act

    } // Cor2

    val cor1 = new Cor1 ()
    val cor2 = new Cor2 ()

    println ("start coroutines")
    cor1.start ()

} // CoroutineTest