Version 4 (modified by blamario, 5 years ago) |
---|

# Architecture

The SCC framework is implemented in multiple layers. Lower layers are useful by themselves.

## Infrastructure

The framework relies on the monad-parallel package to enable parallel execution of multiple coroutines' steps.

## The lowest layer: trampoline-style nestable coroutines

This layer, implemented by the Control.Concurrent.Coroutine module, provides a limited coroutine functionality in Haskell. The centerpiece of the approach is the monad transformer Coroutine, that transforms an arbitrary monadic computation into a suspendable and resumable one. The basic definition is simple:

newtype Coroutine s m r = Coroutine {resume :: m (CoroutineState s m r)} data CoroutineState s m r = Done r | Suspend! (s (Coroutine s m r)) instance (Functor s, Monad m) => Monad (Coroutine s m) where return x = Coroutine (return (Done x)) t >>= f = Coroutine (resume t >>= apply f) where apply f (Done x) = resume (f x) apply f (Suspend s) = return (Suspend (fmap (>>= f) s))

The Coroutine transformer type is parameterized by a functor. Here is an example of one functor particularly useful for a Coroutine computation:

data Yield x y = Yield x y instance Functor (Yield x) where fmap f (Yield x y) = Yield x (f y)

## Streams

The next layer builds on the coroutine foundation to provide streaming computations. The main idea here is to introduce sinks and sources:

data Sink (m :: * -> *) a x = Sink { put :: forall d. (AncestorFunctor a d) => x -> Coroutine d m Bool, canPut :: forall d. (AncestorFunctor a d) => Coroutine d m Bool } newtype Source (m :: * -> *) a x = Source { get :: forall d. (AncestorFunctor a d) => Coroutine d m (Maybe x) }

The only way to obtain a new source to read from, or a sink to write to, is by launching a new nested coroutine using the function pipe:

pipe :: forall m a a1 a2 x r1 r2. (Monad m, Functor a, a1 ~ SinkFunctor a x, a2 ~ SourceFunctor a x) => (Sink m a1 x -> Coroutine a1 m r1) -> (Source m a2 x -> Coroutine a2 m r2) -> Coroutine a m (r1, r2)

This function takes two coroutines as arguments, *producer* and *consumer*. The producer gets a sink argument, and the consumer a source argument. All data that the producer writes to the sink can be read from the source by the consumer. The arrangement couldn't be simpler. Here is a contrived example of a producer/consumer pair:

import Control.Concurrent.Coroutine import Control.Concurrent.SCC.Streams main = runCoroutine (pipe (rangeProducer 1 10) (sumConsumer 0)) >>= print rangeProducer min max sink | min > max = return () | otherwise = put sink min >> rangeProducer (succ min) max sink sumConsumer sum source = get source >>= maybe (return sum) (\n-> sumConsumer (sum + n) source)

The `rangeProducer` coroutine produces a range of integers while the `sumConsumer` consumes all the numbers and sums them up. The two coroutines are bound together by `pipe`. Note that either of them could be bound to a different coroutine and still perform its job. Furthermore, though in this example both coroutines run in the IO monad, they are completely generic and could run in any monad.

## Components and combinators

What can one do with a number of sources and sinks? The next layer tries to organize the answers. First, it defines the types of various actors on sources and sinks:

type OpenConsumer m a d x r = AncestorFunctor a d => Source m a x -> Coroutine d m r type OpenProducer m a d x r = AncestorFunctor a d => Sink m a x -> Coroutine d m r type OpenTransducer m a1 a2 d x y = (AncestorFunctor a1 d, AncestorFunctor a2 d) => Source m a1 x -> Sink m a2 y -> Coroutine d m [x] type OpenSplitter m a1 a2 a3 a4 d x b = (AncestorFunctor a1 d, AncestorFunctor a2 d, AncestorFunctor a3 d, AncestorFunctor a4 d) => Source m a1 x -> Sink m a2 x -> Sink m a3 x -> Sink m a4 b -> Coroutine d m [x] newtype Consumer m x r = Consumer {consume :: forall a d. OpenConsumer m a d x r} newtype Producer m x r = Producer {produce :: forall a d. OpenProducer m a d x r} newtype Transducer m x y = Transducer {transduce :: forall a1 a2 d. OpenTransducer m a1 a2 d x y} newtype Splitter m x b = Splitter {split :: forall a1 a2 a3 a4 d. OpenSplitter m a1 a2 a3 a4 d x b}

## Dynamic component configuration

This is not so much a layer as an overlay, because it's quite generic. Any value can be made into a configurable component, provided that we supply the following information about it:

- a name,
- the maximum number of threads it can use, and
- the cost of using the component.