# Effortless Monad Transformers for Pipes, Conduit and more

One of the great features provided by Pipes is the ability to utilize monad transformers in isolated sections of a pipeline. First consider an element counter pipe that sends the number of elements it has recieved downstream for every element recieved form upstream. Using the state monad transformer is a natural way to approach this problem, like the following:

```
counter = go
where
go = do
x <- await
lift $ S.modify (+1)
r <- lift S.get
yield r
go
> (`S.evalStateT` 0) . runEffect $ each "hi there" >-> counter >-> P.print
1
2
3
4
5
6
7
8
```

Combining several independant counters or several pipes unilizing diverse monad transformers into a single pipeline is problematic however.

- Every section of pipeline can get access to the state. The state is global to the pipeline
- Combining several necessitates the need for nesting
`hoist lift`

in ever more complexity.

To solve these issues `Pipes.Lift`

from the main pipes package provides facilities to isolate the most common monad transformers to sections of a pipeline with `runStateP`

, `runMaybeP`

, ...

```
> import qualified Pipes.Lift as PL
> let localCounter = PL.evalStateP 0 counter
> runEffect $ each "hi there" >-> localCounter >-> P.print
1
2
3
4
5
6
7
8
```

The `Pipes.Lift`

facilities solve the above problems in isolation then composed together without inducing any overhead.

The current implementation of the facilities in `Pipes.Lift`

requires touch the internal representation of both the `Pipe`

s type and the monad transformer in question. Below I will demonstrate how to simply implement most monad transformers for pipes with out touch either the internal representation of the `Pipe`

type or that of the monad transformer. This make it easy to add existing monad transform functionality to any existing pipeline.

Example monad transformers that this applies to are:

- Transfomers
- ErrorT
- MaybeT
- WriterT
- ReaderT
- StateT
- RWST
- EitherT

- MonadRandom
- And more.

Any monad that has a MFunctor form the mmorph package instance works flawlessly and composing seemlessly with other such monads.

Other monads that can not have a MFunctor instance, or just do not, can be used as well but they do not automatically compose and a little extra care is needed when writing streaming facilities using them.

- ContT - From Transfomers. An indexed/paramaterized ContT has a MFunctor instnace however.
- ST Monad Transformer - A type guarantee that the STRefs are not escaping down the pipeline is required, this can be as simple as specifing a concrete type like
`Int`

. - Monad-logger - I do not know if a MFunctor can exist for this monad.
- I am sure many more.

I will be focusing on monad transformers that have a MFunctor instance for this article. I will be posting examples of both types soon, you can get in contact with me at Patrick dot John dot Wheeler at gmail dot com if you would like a few rough exmaples sooner.

Here are some example implementations explanations will follow below:

```
runStateP s = (`evalStateT` s) . runSubPipeT
runErrorP = E.runErrorT . runSubPipeT
runMaybeP = M.runMaybeT . runSubPipeT
runRandP = Rand.runRandT . runSubPipeT
-- Monads with out MFunctor instances
runContP mf = (`runContT` mf) . runSubPipeT'
-- log debug information to stdout.
runStdoutLoggerP = runStdoutLoggingT . runSubPipeT'
-- Conduit example
> CL.sourceList [1..5] $= evalStateC 0 counterC $$ sinkPrint
1
2
3
4
5
```

Similar implementations exist for bidirectional `Proxy`

s and at the end of the day the unidirectional `runStateP`

and friends above are just specialization's from the bidirectional compatible forms as will be shown later.

The outline of the technique is to have a section of pipeline `await`

from two monad layers lower then itself, `catFromLifted`

, and `yield`

back to a monad level two lower then itself, `catToLifted`

, when the section needing the monad is done.

```
{-
catFromLifted = go
where
go = do
x <- lift . lift $ await
yield x
go
-}
catFromLifted = (lift . lift . request) >\\ cat
{-
catToLifted = go
where
go = do
x <- await
lift . lift $ yield x
go
-}
catToLifted = cat //> (lift . lift . respond)
```

Lets look at the type signature for catFromLifted a little closer.

```
catFromLifted
:: (Monad (t (Proxy () y y2 y1 m)), Monad m, MonadTrans t) =>
Proxy a1 a () y (t (Proxy () y y2 y1 m)) c
```
(note: I am not clear why this section ends up in a code block, let me know, if you do)
The `t` monad transformer layer is where the monad transformer will end up being run.
Lets look at an example using the same `counter` pipe used above:
``` haskell
counter = go
where
go = do
await -- discard upstream values
s <- lift get
lift $ modify (+1)
yield s
go
localState = (`evalStateT` 0) . runEffect $ catFromLifted >-> hoist (hoist lift) counter >-> catToLifted
mainPipeLIne = runEffect $ each "hi" >-> localState >-> P.print
```

generalizing this some we have:

```
runSubPipeT p = runEffect $ catFromLifted >-> hoist (hoist lift) p >-> catToLifted
-- runStateP s p = (`evalStateT` s) (runSubPipeT p)
runStateP s = (`evalStateT` s) . runSubPipeT
```

Compare the above `runStateP`

to the this is the current implementation in `Pipes.Lift`

.

```
runStateP
:: (Monad m)
=> s -> Proxy a' a b' b (S.StateT s m) r -> Proxy a' a b' b m (r, s)
runStateP = go
where
go s p = case p of
Request a' fa -> Request a' (\a -> go s (fa a ))
Respond b fb' -> Respond b (\b' -> go s (fb' b'))
Pure r -> Pure (r, s)
M m -> M (do
(p', s') <- S.runStateT m s
return (go s' p') )
```

As shown above this approach generalizes to all of the core transformers monads and more besides.

## Bidirectionality

This approach easily generalizes to bidirectional proxies. `await`

becomes `request`

, `yield`

becomes `respond`

, `cat`

becomes `pull`

. You will noticed that there is some extra function composition with `.`

this is to work the functions around the the initial argument to `pull :: Monad m => a' -> Proxy a' a a' a m r`

.

```
pullFromLifted = ((lift . lift . request) >\\) . pull
pullToLifted = (//> (lift . lift . respond)) . pull
-- Wraps a proxy so it requests and resonds to a proxy two levels lower then itself
fromToLiftedB p = (//> (lift . lift . respond))
. ((lift . lift .request) >\\)
. hoist (hoist lift)
. p
runSubPipeTB = (runEffect' .) . fromToLiftedB
runErrorPB = (E.runErrorT .) . runSubPipeTB
runMaybePB = (M.runMaybeT .) . runSubPipeTB
runReaderPB r = ((`R.runReaderT` r) .) . runSubPipeTB
runWriterPB = (W.runWriterT .) . runSubPipeTB
runStatePB s = ((`S.runStateT` s) .) . runSubPipeTB
```

The approach in Pipes 4.0.0 focuses on unidirectional code but the base data type works with bidirectional data streams. So it is convenient to be able to take a bidirectional compatible function and turn it into a function compatible with the unidirectional `Pipe`

. We can do this with only a few additional functions.

```
specialize p = p ()
generalize f = (\_ -> f)
-- | This takes a one arrity function on a Proxies and turns it into a function
-- on Pipes.
directionalize f = specialize . f . generalize
```

With directionalize it is straight forward to produce the unidirectional variants form the bidirectional functions.

```
runErrorP = directionalize runErrorPB
runMaybeP = directionalize runMaybePB
runWriterP = directionalize runWriterPB
runStateP = directionalize . runStatePB
runRWSP = (directionalize .) . runRWSPB
```

## Compare Core

Lets investigate that the new internal unaware implementation does not adversely effect the end result by looking at the core representation of a simple program.

```
{-# language NoMonomorphismRestriction #-}
module Main where
import Pipes
import qualified Pipes.Prelude as P
import Lift
import qualified Pipes.Lift as PL
standardState = PL.evalStateP 0 counter
newState = evalStateP 0 counter
testStdFunc = runEffect $ each "hi" >-> standardState >-> P.print
testnewFunc = runEffect $ each "hi" >-> newState >-> P.print
main = print "done"
```

core:

```
Result size of Tidy Core = {terms: 88, types: 182, coercions: 0}
main
main = print ($fShow[] $fShowChar) (unpackCString# "done")
newState
newState =
\ @ a_a36u @ a1_a36v @ r_a36w @ m_a36x $dMonad_a36y $dNum_a36z ->
evalStateP'
$dMonad_a36y
(fromInteger $dNum_a36z (__integer 0))
(counter ($fMonadProxy $dMonad_a36y) $dNum_a36z)
testnewFunc
testnewFunc =
\ @ m_a3c1 $dMonadIO_a3c2 ->
let {
$dMonad_a36I
$dMonad_a36I = $p1MonadIO $dMonadIO_a3c2 } in
$ (runEffect $dMonad_a36I)
(>->
$dMonad_a36I
(>->
$dMonad_a36I
(each $dMonad_a36I $fFoldable[] (unpackCString# "hi"))
(newState $dMonad_a36I $fNumInteger))
(print $dMonadIO_a3c2 $fShowInteger))
standardState
standardState =
\ @ s_a3cw @ a_a3cx @ m_a3cy @ r_a3cz $dMonad_a3cA $dNum_a3cB ->
evalStateP
$dMonad_a3cA
(fromInteger $dNum_a3cB (__integer 0))
(counter $dMonad_a3cA $dNum_a3cB)
testStdFunc
testStdFunc =
\ @ m_a3d1 $dMonadIO_a3d2 ->
let {
$dMonad_a3cK
$dMonad_a3cK = $p1MonadIO $dMonadIO_a3d2 } in
$ (runEffect $dMonad_a3cK)
(>->
$dMonad_a3cK
(>->
$dMonad_a3cK
(each $dMonad_a3cK $fFoldable[] (unpackCString# "hi"))
(standardState $dMonad_a3cK $fNumInteger))
(print $dMonadIO_a3d2 $fShowInteger))
main
main = runMainIO main
```

As can be seen from the core, `standardState`

using `evalState`

from `Pipes.Lift`

is identical in all important respects to `newState`

using the new `evalStateP`

. At least for the simple cases both the method presented here and those currently in `Pipes.Lift`

produces the same core representation.

## Conduit and Oother Streaming Libraries

It seems likely that conduit and other streaming libaries can benefit of from the same method. Below I include a rough implmentation for conduit and the state monad transformer.

```
{-# language NoMonomorphismRestriction #-}
import Data.Conduit
import qualified Data.Conduit.List as CL
import Control.Monad.Morph
import Control.Monad.Trans
import qualified Control.Monad.Trans.State as S
catFromLifted = go
where
go = do
x <- lift . lift $ await
case x of
Nothing -> return ()
Just x -> do
yield x
go
catToLifted = go
where
go = do
x <- lift . lift $ await
case x of
Nothing -> return ()
Just x -> do
lift . lift $ yield x
go
counterC = go
where
go = do
x0 <- await
case x0 of
Nothing -> return ()
Just x -> do
lift $ S.modify (+1)
r <- lift S.get
yield r
go
sinkPrint = CL.mapM_ print
runSubConduit p = catFromLifted $= hoist (hoist lift) p $$ catToLifted
evalStateC s = (`S.evalStateT` s) . runSubConduit
main = CL.sourceList [1..5] $= evalStateC 0 counterC $$ sinkPrint
{-
> CL.sourceList [1..5] $= evalStateC 0 counterC $$ sinkPrint
1
2
3
4
5
-}
```

## Conclusion

Hopefully the above makes it easier for everyone to incorporate a streaming library into your existing monad transformer stack and/or vs versa. Shuffling monad layers around like above technique seems incredibly useful so if you know of similar techniques let me know.