Version 3 (modified by blamario, 8 years ago)



The SCC framework is implemented in multiple layers. Lower layers are useful by themselves.

The lowest layer: trampoline-style nestable coroutines

This layer, implemented by the Control.Concurrent.Coroutine module, provides limited coroutine functionality in Haskell. The centerpiece of the approach is the monad transformer Coroutine, which transforms an arbitrary monadic computation into a suspendable and resumable one. The basic definition is simple:

newtype Coroutine s m r = Coroutine {resume :: m (CoroutineState s m r)}

data CoroutineState s m r = Done r | Suspend !(s (Coroutine s m r))

instance (Functor s, Monad m) => Monad (Coroutine s m) where
   return x = Coroutine (return (Done x))
   t >>= f = Coroutine (resume t >>= apply f)
      where apply f (Done x) = resume (f x)
            apply f (Suspend s) = return (Suspend (fmap (>>= f) s))

The Coroutine transformer type is parameterized by a functor. Here is an example of one functor particularly useful for a Coroutine computation:

data Yield x y = Yield x y
instance Functor (Yield x) where
   fmap f (Yield x y) = Yield x (f y)
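
To see how the Yield functor turns Coroutine into a generator, here is a minimal self-contained sketch. It repeats the definitions above and adds the Functor and Applicative instances that recent GHC versions require of every Monad. The helpers yield and collect and the example countdown are names assumed for this illustration; they are not necessarily what the Control.Concurrent.Coroutine module exports.

```haskell
import Control.Monad (ap, liftM)

newtype Coroutine s m r = Coroutine {resume :: m (CoroutineState s m r)}

data CoroutineState s m r = Done r | Suspend !(s (Coroutine s m r))

instance (Functor s, Monad m) => Functor (Coroutine s m) where
   fmap = liftM

instance (Functor s, Monad m) => Applicative (Coroutine s m) where
   pure x = Coroutine (return (Done x))
   (<*>) = ap

instance (Functor s, Monad m) => Monad (Coroutine s m) where
   t >>= f = Coroutine (resume t >>= apply)
      where apply (Done x) = resume (f x)
            apply (Suspend s) = return (Suspend (fmap (>>= f) s))

data Yield x y = Yield x y
instance Functor (Yield x) where
   fmap f (Yield x y) = Yield x (f y)

-- Hypothetical helper: suspend, handing x to whoever resumes the coroutine.
yield :: Monad m => x -> Coroutine (Yield x) m ()
yield x = Coroutine (return (Suspend (Yield x (return ()))))

-- Hypothetical trampoline: resume the coroutine until it is Done,
-- collecting every yielded value along the way.
collect :: Monad m => Coroutine (Yield x) m r -> m ([x], r)
collect c = resume c >>= step
   where step (Done r) = return ([], r)
         step (Suspend (Yield x rest)) = do (xs, r) <- collect rest
                                            return (x : xs, r)

-- Example coroutine: yields n, n-1, ..., 1 and then returns a final result.
countdown :: Monad m => Int -> Coroutine (Yield Int) m String
countdown 0 = return "liftoff"
countdown n = yield n >> countdown (n - 1)

main :: IO ()
main = collect (countdown 3) >>= print   -- prints ([3,2,1],"liftoff")
```

The loop in collect is the trampoline: each Suspend hands control back to the caller, which decides when, or whether, to resume the rest of the computation.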


The next layer builds on the coroutine foundation to provide streaming computations. The main idea here is to introduce sinks and sources:

data Sink (m :: * -> *) a x = Sink {
   put :: forall d. (AncestorFunctor a d) => x -> Coroutine d m Bool,
   canPut :: forall d. (AncestorFunctor a d) => Coroutine d m Bool
   }

newtype Source (m :: * -> *) a x = Source {
   get :: forall d. (AncestorFunctor a d) => Coroutine d m (Maybe x)
   }

The only way to obtain a new source to read from, or a sink to write to, is by launching a new nested coroutine using the function pipe:

pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) =>
        (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

This function takes two coroutines as arguments, producer and consumer. The producer gets a sink argument, and the consumer a source argument. All data that the producer writes to the sink can be read from the source by the consumer. The arrangement couldn't be simpler. Here is a contrived example of a producer/consumer pair:

import Control.Concurrent.Coroutine
import Control.Concurrent.SCC.Streams

main = runCoroutine (pipe (rangeProducer 1 10) (sumConsumer 0)) >>= print

rangeProducer min max sink | min > max = return ()
                           | otherwise = put sink min
                                         >> rangeProducer (succ min) max sink
sumConsumer sum source = get source
                         >>= maybe (return sum)
                                   (\n-> sumConsumer (sum + n) source)

The rangeProducer coroutine produces a range of integers while the sumConsumer consumes all the numbers and sums them up. The two coroutines are bound together by pipe. Note that either of them could be bound to a different coroutine and still perform its job. Furthermore, though in this example both coroutines run in the IO monad, they are completely generic and could run in any monad.
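
The following is a deliberately simplified, self-contained model of what binding a producer to a consumer involves. It is not the library's pipe: it leaves out Sink, Source and the nested AncestorFunctor machinery, and instead gives the producer a Yield suspension and the consumer an Await suspension. The names Await, yieldItem, awaitItem, simplePipe, drain and starve are all assumptions made for this sketch.

```haskell
import Control.Monad (ap, liftM)

newtype Coroutine s m r = Coroutine {resume :: m (CoroutineState s m r)}
data CoroutineState s m r = Done r | Suspend !(s (Coroutine s m r))

instance (Functor s, Monad m) => Functor (Coroutine s m) where
   fmap = liftM
instance (Functor s, Monad m) => Applicative (Coroutine s m) where
   pure x = Coroutine (return (Done x))
   (<*>) = ap
instance (Functor s, Monad m) => Monad (Coroutine s m) where
   t >>= f = Coroutine (resume t >>= apply)
      where apply (Done x) = resume (f x)
            apply (Suspend s) = return (Suspend (fmap (>>= f) s))

-- Simplified stand-ins for the sink and source sides of a pipe.
data Yield x y = Yield x y
newtype Await x y = Await (Maybe x -> y)
instance Functor (Yield x) where
   fmap f (Yield x y) = Yield x (f y)
instance Functor (Await x) where
   fmap f (Await g) = Await (f . g)

yieldItem :: Monad m => x -> Coroutine (Yield x) m ()
yieldItem x = Coroutine (return (Suspend (Yield x (return ()))))

awaitItem :: Monad m => Coroutine (Await x) m (Maybe x)
awaitItem = Coroutine (return (Suspend (Await return)))

-- Run the consumer until it asks for input, then run the producer until it
-- has an item to offer, and repeat until both are done.
simplePipe :: Monad m
           => Coroutine (Yield x) m r1 -> Coroutine (Await x) m r2 -> m (r1, r2)
simplePipe producer consumer = resume consumer >>= onConsumer
   where
      onConsumer (Done r2) = do r1 <- drain producer
                                return (r1, r2)
      onConsumer (Suspend (Await k)) = resume producer >>= onProducer k
      onProducer k (Done r1) = do r2 <- starve (k Nothing)
                                  return (r1, r2)
      onProducer k (Suspend (Yield x rest)) = simplePipe rest (k (Just x))

-- The consumer finished early: run the producer on, discarding its output.
drain :: Monad m => Coroutine (Yield x) m r -> m r
drain p = resume p >>= step
   where step (Done r) = return r
         step (Suspend (Yield _ rest)) = drain rest

-- The producer finished: every further request is answered with Nothing.
starve :: Monad m => Coroutine (Await x) m r -> m r
starve c = resume c >>= step
   where step (Done r) = return r
         step (Suspend (Await k)) = starve (k Nothing)

rangeProducer :: Monad m => Int -> Int -> Coroutine (Yield Int) m ()
rangeProducer lo hi | lo > hi = return ()
                    | otherwise = yieldItem lo >> rangeProducer (succ lo) hi

sumConsumer :: Monad m => Int -> Coroutine (Await Int) m Int
sumConsumer acc = awaitItem >>= maybe (return acc) (\n -> sumConsumer (acc + n))

main :: IO ()
main = simplePipe (rangeProducer 1 10) (sumConsumer 0) >>= print   -- prints ((),55)
```

In this model the consumer drives the exchange and the producer runs only on demand. That is one possible scheduling choice for the sketch, not a statement about how the real pipe interleaves its two coroutines.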

Components and combinators

What can one do with a number of sources and sinks? The next layer tries to organize the answers. First, it defines the types of various actors on sources and sinks:

type OpenConsumer m a d x r = AncestorFunctor a d => Source m a x -> Coroutine d m r
type OpenProducer m a d x r = AncestorFunctor a d => Sink m a x -> Coroutine d m r
type OpenTransducer m a1 a2 d x y = 
   (AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 y -> Coroutine d m [x]
type OpenSplitter m a1 a2 a3 a4 d x b =
   (AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d, AncestorFunctor a4 d) =>
   Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Sink m a4 b -> Coroutine d m [x]

newtype Consumer m x r = Consumer {consume :: forall a d. OpenConsumer m a d x r}
newtype Producer m x r = Producer {produce :: forall a d. OpenProducer m a d x r}
newtype Transducer m x y = Transducer {transduce :: forall a1 a2 d. OpenTransducer m a1 a2 d x y}
newtype Splitter m x b = Splitter {split :: forall a1 a2 a3 a4 d. OpenSplitter m a1 a2 a3 a4 d x b}
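
As a rough intuition for the four roles, the sketch below replaces sources and sinks with plain lists. This is only an analogy: it ignores the underlying monad, the coroutine suspension, and the result values in the real types. It also assumes that a splitter routes each input item to one of its two item sinks, and it leaves out the third sink of type b altogether. All names in it are hypothetical.

```haskell
import Data.Char (isDigit, toUpper)
import Data.List (partition)

-- List-based analogues of the component types (illustration only).
type ListProducer x = [x]                   -- writes items
type ListConsumer x r = [x] -> r            -- reads items, returns a result
type ListTransducer x y = [x] -> [y]        -- reads items, writes other items
type ListSplitter x = [x] -> ([x], [x])     -- routes each item to one of two outputs

letters :: ListProducer Char
letters = "a1b22c"

count :: ListConsumer x Int
count = length

uppercase :: ListTransducer Char Char
uppercase = map toUpper

digits :: ListSplitter Char
digits = partition isDigit

main :: IO ()
main = do
   let (ds, rest) = digits letters
   print (count ds)            -- prints 3
   putStrLn (uppercase rest)   -- prints ABC
```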

Dynamic component configuration

This is not so much a layer as an overlay, because it's quite generic. Any value can be made into a configurable component, provided that we supply the following information about it:

  • a name,
  • the maximum number of threads it can use, and
  • the cost of using the component.
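
The three pieces of information listed above could be carried alongside the wrapped value in a record such as the following. This is a minimal sketch: the type Component, its field names, the use of Int for the cost, and the helpers atomic and describe are assumptions made for illustration, not the framework's actual definitions.

```haskell
import Data.Char (toUpper)

-- Hypothetical wrapper pairing any value with its configuration data.
data Component c = Component
   { name       :: String   -- a name
   , maxThreads :: Int      -- the maximum number of threads it can use
   , cost       :: Int      -- the cost of using the component (unit assumed)
   , payload    :: c        -- the wrapped value itself
   }

-- Wrap a value that can never use more than one thread.
atomic :: String -> Int -> c -> Component c
atomic n k x = Component {name = n, maxThreads = 1, cost = k, payload = x}

describe :: Component c -> String
describe c = name c ++ " (threads <= " ++ show (maxThreads c)
             ++ ", cost " ++ show (cost c) ++ ")"

main :: IO ()
main = do
   let upper = atomic "uppercase" 1 (map toUpper)
   putStrLn (describe upper)          -- prints uppercase (threads <= 1, cost 1)
   putStrLn (payload upper "hello")   -- prints HELLO
```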

The highest layer: Command-line shell language