The Haskell ecosystem already has a number of stream processing libraries. Their purpose is to process a sequence of values with effects in a composable manner. Still, I was not satisfied with the feature sets of the existing packages, so I decided to make a new one.
It's called [drinkery](http://hackage.haskell.org/package/drinkery). The package offers the following features:
* __Sequential producer__: You can build a producer using a monadic action like `yield :: s -> Producer s ()`.
* __[ListT done right](https://wiki.haskell.org/ListT_done_right)__: Correct implementation of a list monad transformer.
* __Monadic consumer__: A consumer monad processes a stream involving effects.
* __Transducer__: A mechanism that transforms streams.
* __Full duplex__: Downstream can send a value to the upstream.
* __Decent performance__
## Hello, world
```haskell
import Data.Char (isDigit)
import Data.Drinkery
import qualified Data.Drinkery.Finite as D

main :: IO ()
main = tapListT' (taste "0H1e2l3l4o5,6 w7o8r9ld!\n")
  +& D.filter (not . isDigit)
  $& traverseFrom_ consume (liftIO . putChar)
```
## Sequential producer
Most libraries offer a monad in which you can emit a value in the middle of a computation: `Producer` for `pipes`, `ConduitM` for `conduit`, `PlanT` for `machines`. drinkery employs a `Producer` which manipulates a `Tap`, a non-terminating source. The type parameter `r` is a request type that enables full-duplex communication; assume `()` for now.
```haskell
newtype Producer r s m a = Producer { unProducer :: (a -> Tap r s m) -> Tap r s m }
newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }
```
As in other implementations, `yield` emits one element.
```haskell
yield :: (Monoid r, Applicative m) => s -> Producer r (Maybe s) m ()
```
`Producer` can be converted to a `Tap`:
```haskell
runProducer :: (Monoid r, Applicative m) => Producer r (Maybe s) m a -> Tap r (Maybe s) m
```
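To make the relationship between `Producer` and `Tap` concrete, here is a minimal self-contained sketch. It re-declares the two types from above and gives simplified stand-ins for `yield` and `runProducer`; these are my own reconstructions from the type signatures, not the library's source (for instance, this `yield` ignores the request, whereas the real one has a `Monoid r` constraint). `tapToList` is a hypothetical helper that exists only for this example.

```haskell
import Data.Functor.Identity (runIdentity)

-- Types copied from the article.
newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }
newtype Producer r s m a = Producer { unProducer :: (a -> Tap r s m) -> Tap r s m }

-- Producer is a continuation monad whose final answer is a Tap.
instance Functor (Producer r s m) where
  fmap f (Producer k) = Producer $ \c -> k (c . f)

instance Applicative (Producer r s m) where
  pure a = Producer ($ a)
  Producer f <*> Producer a = Producer $ \c -> f (\g -> a (c . g))

instance Monad (Producer r s m) where
  Producer m >>= f = Producer $ \c -> m (\a -> unProducer (f a) c)

-- Simplified stand-in (assumption): emit one element, then continue.
-- The incoming request is ignored in this sketch.
yield :: Applicative m => s -> Producer r (Maybe s) m ()
yield s = Producer $ \cont -> Tap $ \_ -> pure (Just s, cont ())

-- Simplified stand-in (assumption): once the producer returns,
-- the tap answers Nothing forever.
runProducer :: Applicative m => Producer r (Maybe s) m a -> Tap r (Maybe s) m
runProducer p = unProducer p (const end)
  where
    end = Tap $ \_ -> pure (Nothing, end)

-- Hypothetical helper: pull elements until the first Nothing.
tapToList :: Monad m => Tap () (Maybe s) m -> m [s]
tapToList t = do
  (x, t') <- unTap t ()
  case x of
    Nothing -> pure []
    Just s -> (s :) <$> tapToList t'

example :: [Int]
example = runIdentity (tapToList (runProducer (mapM_ yield [1, 2, 3])))

main :: IO ()
main = print example
```

The point of the continuation-passing shape is that `>>=` never inspects a data structure; sequencing two producers is just function composition over taps.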
One big difference from other libraries is the explicit use of `Maybe` to mark the end of a stream. Because the end marker is not built into `Tap`, you can omit `Maybe` and use a specific value instead (e.g. an empty bytestring).
## ListT done right
It's a well-known fact that the ListT of [transformers](https://hackage.haskell.org/package/transformers-0.5.5.0/docs/Control-Monad-Trans-List.html) is _not_ a monad transformer. A proper implementation is convenient for writing nested loops. Today, several libraries implement ListT. The essentials are:
* (a) A monad transformer `T`: `T m` is a monad for any monad `m`.
* (b) An `Alternative` instance: You can convert a list into an action by `asum . map pure`.
* (c) A way to draw `a` from `T m a`.
drinkery implements it as a Boehm-Berarducci encoded list. (a) is satisfied because `ListT r m` is a monad regardless of `m`. There is also an `Alternative` instance (b). A specialised function, `sample`, is defined for convenience:
```haskell
newtype ListT r m s = ListT
  { unListT :: forall x. (s -> Tap r x m -> Tap r x m) -> Tap r x m -> Tap r x m }

sample :: Foldable f => f s -> ListT r m s
```
You can turn it into a `Tap`.
```haskell
runListT :: (Monoid r, Applicative m) => ListT r m s -> Tap r (Maybe s) m
```
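The encoding is easier to follow with the instances written out. The following is a sketch under my own assumptions: the `ListT` and `Tap` types are copied from above, but the instances, `sample`, and `runListT` are reconstructed from the signatures rather than taken from the library, and they ignore requests. `tapToList` and `pairs` are hypothetical names used only here.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Applicative (Alternative (..))
import Data.Functor.Identity (runIdentity)

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }

-- A list represented by its own fold: a "cons" handler and a "nil" tap.
newtype ListT r m s = ListT
  { unListT :: forall x. (s -> Tap r x m -> Tap r x m) -> Tap r x m -> Tap r x m }

instance Functor (ListT r m) where
  fmap f (ListT l) = ListT (\cons nil -> l (cons . f) nil)

instance Applicative (ListT r m) where
  pure s = ListT (\cons nil -> cons s nil)
  ListT lf <*> ListT ls = ListT (\cons nil -> lf (\f rest -> ls (cons . f) rest) nil)

-- Bind splices the inner list in front of the rest of the outer list.
instance Monad (ListT r m) where
  ListT l >>= k = ListT (\cons nil -> l (\s rest -> unListT (k s) cons rest) nil)

instance Alternative (ListT r m) where
  empty = ListT (\_ nil -> nil)
  ListT a <|> ListT b = ListT (\cons nil -> a cons (b cons nil))

-- Reconstruction (assumption) of sample: fold the container directly.
sample :: Foldable f => f s -> ListT r m s
sample xs = ListT (\cons nil -> foldr cons nil xs)

-- Reconstruction (assumption) of runListT; requests are ignored here.
runListT :: Applicative m => ListT r m s -> Tap r (Maybe s) m
runListT (ListT l) = l (\s rest -> Tap $ \_ -> pure (Just s, rest)) end
  where
    end = Tap $ \_ -> pure (Nothing, end)

-- Hypothetical helper: pull elements until the first Nothing.
tapToList :: Monad m => Tap () (Maybe s) m -> m [s]
tapToList t = do
  (x, t') <- unTap t ()
  case x of
    Nothing -> pure []
    Just s -> (s :) <$> tapToList t'

-- A nested loop whose inner range depends on the outer variable.
pairs :: ListT r m (Int, Int)
pairs = do
  a <- sample [1 .. 3]
  b <- sample [1 .. a]
  pure (a, b)

main :: IO ()
main = print (runIdentity (tapToList (runListT pairs)))
```

Note that none of the instances need a constraint on `m`, which is what requirement (a) asks for.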
Benchmark time. [This benchmark](https://github.com/fumieval/drinkery/blob/master/benchmarks/benchmark.hs) runs three nested loops where each inner loop depends on the outer one.
```haskell
sourceAlt :: Monad m => ([Int] -> m Int) -> m Int
sourceAlt k = do
  a <- k [1..50]
  b <- k [1..a]
  c <- k [1..b]
  return $! a + b + c
```
Thanks to the encoding, ListT doesn't impose a slowdown. In fact, it's the fastest implementation!
```
drain/drinkery/Producer mean 304.7 μs ( +- 11.48 μs )
drain/drinkery/ListT mean 304.7 μs ( +- 24.92 μs )
drain/pipes/Producer mean 372.9 μs ( +- 17.91 μs )
drain/pipes/ListT mean 770.3 μs ( +- 75.69 μs )
drain/list-t mean 5.332 ms ( +- 393.9 μs )
drain/ListT mean 23.69 ms ( +- 1.331 ms )
```
## Monadic consumer
Monadic consumption is one of the important abilities of a stream processing library (though there are exceptions, such as [streaming](https://hackage.haskell.org/package/streaming)). The most common implementation is called an iteratee. `machines`, `pipes`, and `conduit` build some additions on top of it.
```haskell
newtype Iteratee s m a = Iteratee
  { runIteratee :: m (Either (s -> Iteratee s m a) a) }
```
drinkery's consumer, on the other hand, has a different shape.
```haskell
newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }
consume :: (Monoid r, Monad m) => Sink (Tap r s) m s
```
The first type parameter represents the source (usually `Tap r s`). A `Sink` action is a function that consumes a tap and returns the remainder. You can just apply `runSink` to feed a tap.
Several combinators are defined to work with finite streams. Their first argument is usually `consume`.
```haskell
foldlFrom' :: (Monad m) => m (Maybe a) -> (b -> a -> b) -> b -> m b
foldMFrom :: (Monad m) => m (Maybe a) -> (b -> a -> m b) -> b -> m b
traverseFrom_ :: (Monad m) => m (Maybe a) -> (a -> m b) -> m ()
drainFrom :: (Monad m) => m (Maybe a) -> m ()
```
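Because these combinators only need an action of type `m (Maybe a)`, they are not tied to `Sink` at all. The sketch below re-implements two of them from their signatures (my own reconstruction, not the library's code) and drives them with a hypothetical `popFrom` action backed by an `STRef`, standing in for `consume`.

```haskell
import Control.Monad.ST (ST, runST)
import Data.STRef (STRef, modifySTRef, newSTRef, readSTRef, writeSTRef)

-- Strict left fold over everything the action yields before Nothing.
foldlFrom' :: Monad m => m (Maybe a) -> (b -> a -> b) -> b -> m b
foldlFrom' next f = go
  where
    go acc = acc `seq` (next >>= maybe (pure acc) (go . f acc))

-- Run an effect for each element until the action yields Nothing.
traverseFrom_ :: Monad m => m (Maybe a) -> (a -> m b) -> m ()
traverseFrom_ next f = go
  where
    go = next >>= maybe (pure ()) (\a -> f a >> go)

-- Hypothetical source: pop the head of a list stored in an STRef.
popFrom :: STRef s [a] -> ST s (Maybe a)
popFrom ref = do
  xs <- readSTRef ref
  case xs of
    [] -> pure Nothing
    y : ys -> writeSTRef ref ys >> pure (Just y)

total :: Int
total = runST $ do
  ref <- newSTRef [1 .. 10]
  foldlFrom' (popFrom ref) (+) 0

-- Elements are prepended, so they come out in reverse order.
collected :: [Int]
collected = runST $ do
  src <- newSTRef [1, 2, 3]
  out <- newSTRef []
  traverseFrom_ (popFrom src) (\x -> modifySTRef out (x * 10 :))
  readSTRef out

main :: IO ()
main = print (total, collected)
```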
Since it operates on a `Tap` you can push an element back:
```haskell
leftover :: (Monoid r, Monad m) => s -> Sink (Tap r s) m ()
```
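A small sketch may help show why pushing back is cheap in this design: a `Sink` is essentially a state monad whose state is the tap, so `leftover` just wraps the current tap in a new one. The instances and the bodies of `consume` and `leftover` below are my own simplified reconstructions (this `leftover` drops the request it receives), and `fromList` and `peekTwice` are hypothetical names for the example.

```haskell
import Data.Functor.Identity (runIdentity)

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }
newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

instance Functor m => Functor (Sink t m) where
  fmap f (Sink g) = Sink $ \t -> fmap (\(a, t') -> (f a, t')) (g t)

instance Monad m => Applicative (Sink t m) where
  pure a = Sink $ \t -> pure (a, t)
  Sink f <*> Sink g = Sink $ \t -> do
    (h, t') <- f t
    (a, t'') <- g t'
    pure (h a, t'')

-- Thread the remaining tap from one action to the next.
instance Monad m => Monad (Sink t m) where
  Sink g >>= k = Sink $ \t -> do
    (a, t') <- g t
    runSink (k a) t'

-- Reconstruction (assumption): take one element, sending an empty request.
consume :: (Monoid r, Monad m) => Sink (Tap r s) m s
consume = Sink $ \t -> unTap t mempty

-- Reconstruction (assumption): the next pull returns s, then the old tap.
leftover :: Monad m => s -> Sink (Tap r s) m ()
leftover s = Sink $ \t -> pure ((), Tap $ \_ -> pure (s, t))

-- Hypothetical source built from a list.
fromList :: Applicative m => [s] -> Tap r (Maybe s) m
fromList xs = Tap $ \_ -> case xs of
  [] -> pure (Nothing, fromList [])
  y : ys -> pure (Just y, fromList ys)

-- Consume, push back, consume again: the same element is seen twice.
peekTwice :: Monad m => Sink (Tap () (Maybe Int)) m (Maybe Int, Maybe Int)
peekTwice = do
  x <- consume
  leftover x
  y <- consume
  pure (x, y)

main :: IO ()
main = print (fst (runIdentity (runSink peekTwice (fromList [10, 20]))))
```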
### Fanout
Sometimes we want to distribute an input stream to multiple consumers. This is not possible with `Sink` itself and cloning a tap is not trivial either. For this purpose, drinkery offers a classic iteratee:
```haskell
newtype Awaiter s m a = Awaiter { runAwaiter :: m (Either (s -> Awaiter s m a) a) }
await :: Monad m => Awaiter s m s
```
`serving_` combines a list of `Awaiter`s into one.
```haskell
serving_ :: Monad m => [Awaiter s m a] -> Awaiter s m ()
```
It can be converted into a `Sink`.
```haskell
iterAwaiterT consume :: Awaiter s m a -> Sink s m a
```
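To illustrate the fanout idea, here is a sketch of an iteratee-style `Awaiter` together with a combinator that hands every input to all awaiters that are still waiting. `serveAll` is a hypothetical function written for this example; it mimics the type of `serving_` but I have not checked that it matches the library's exact semantics. `liftAwaiter`, `feed`, and `sumOf` are likewise assumptions, and the writer-like `((,) [String])` monad from base is used only to observe the effects.

```haskell
import Control.Monad (ap, replicateM)

newtype Awaiter s m a = Awaiter { runAwaiter :: m (Either (s -> Awaiter s m a) a) }

instance Functor m => Functor (Awaiter s m) where
  fmap f (Awaiter m) = Awaiter (fmap (either (Left . (fmap f .)) (Right . f)) m)

instance Monad m => Applicative (Awaiter s m) where
  pure = Awaiter . pure . Right
  (<*>) = ap

instance Monad m => Monad (Awaiter s m) where
  Awaiter m >>= k = Awaiter $ do
    r <- m
    case r of
      Left cont -> pure (Left ((>>= k) . cont))
      Right a -> runAwaiter (k a)

await :: Monad m => Awaiter s m s
await = Awaiter (pure (Left pure))

-- Hypothetical helper: run an effect of the base monad.
liftAwaiter :: Functor m => m a -> Awaiter s m a
liftAwaiter = Awaiter . fmap Right

-- Hypothetical fanout: step every awaiter, keep the ones still waiting,
-- and pass the next input to all of them.
serveAll :: Monad m => [Awaiter s m a] -> Awaiter s m ()
serveAll ws = Awaiter $ do
  steps <- mapM runAwaiter ws
  let pending = [k | Left k <- steps]
  pure $ if null pending
    then Right ()
    else Left (\s -> serveAll (map ($ s) pending))

-- Hypothetical driver: feed a list; Nothing means the input ran out first.
feed :: Monad m => [s] -> Awaiter s m a -> m (Maybe a)
feed xs w = do
  r <- runAwaiter w
  case (r, xs) of
    (Right a, _) -> pure (Just a)
    (Left _, []) -> pure Nothing
    (Left k, y : ys) -> feed ys (k y)

-- A consumer that awaits n inputs and logs their sum.
sumOf :: Int -> Awaiter Int ((,) [String]) ()
sumOf n = do
  xs <- replicateM n await
  liftAwaiter (["sum of " ++ show n ++ " = " ++ show (sum xs)], ())

example :: ([String], Maybe ())
example = feed [1, 2, 3] (serveAll [sumOf 1, sumOf 2, sumOf 3])

main :: IO ()
main = print example
```

Each consumer sees the same inputs from the beginning, and a consumer that finishes early simply drops out of the pending list.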
### Taste & compare
Not defined in the package yet, but `Sink` can simultaneously consume two streams through the [Product](https://hackage.haskell.org/package/base-4.10.1.0/docs/Data-Functor-Product.html) type. It should also be possible to manipulate an [extensible](https://hackage.haskell.org/package/extensible) record of taps.
```haskell
drinkL :: (Monoid r, Monad m) => Sink (Product (Tap r s) tap) m s
drinkL = drinking $ \(Pair p q) -> fmap (`Pair`q) <$> unTap p mempty
```
Multi-stream is a rare feature. Only machines supports it as far as I know, but it probably won't be long till drinkery gains it in a more flexible way.
## Transducer
A stream transducer receives an input and produces zero or more outputs. There are three ways to represent a stream transducer.
* Concrete structure which can consume and produce values (e.g. machines, pipes, conduit)
* A stream producer where the base monad is a consumer (iteratee)
* A function from a stream producer to another stream producer (streaming)
drinkery took the second approach. `Distiller tap r s m` is a tap which consumes `tap`.
```haskell
type Distiller tap r s m = Tap r s (Sink tap m)
```
Surprisingly, the only combinator introduced is `(++$)`, the composition of a tap and a distiller.
```haskell
(++$) :: (Functor m) => tap m -> Distiller tap r s m -> Tap r s m
```
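The type of `(++$)` almost dictates its implementation, which the following sketch spells out. It re-declares `Tap`, `Sink`, and `Distiller` as in the article; the body of `(++$)` is my reconstruction from the types and may differ from the library in detail. `countFrom`, `mapD`, and `takeTap` are hypothetical helpers, and `mapD` forwards the downstream request to the upstream tap unchanged.

```haskell
import Data.Functor.Identity (runIdentity)

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }
newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

-- A distiller is a tap whose base monad consumes another tap.
type Distiller tap r s m = Tap r s (Sink tap m)

-- Reconstruction (assumption): run one step of the distiller against the
-- upstream tap, then pair up the two remainders for the next step.
(++$) :: Functor m => tap m -> Distiller tap r s m -> Tap r s m
t ++$ d = Tap $ \r ->
  (\((s, d'), t') -> (s, t' ++$ d')) <$> runSink (unTap d r) t

-- Hypothetical infinite source.
countFrom :: Applicative m => Int -> Tap r Int m
countFrom n = Tap $ \_ -> pure (n, countFrom (n + 1))

-- Hypothetical mapping distiller: pull one input, emit one output.
mapD :: Functor m => (a -> b) -> Distiller (Tap r a) r b m
mapD f = Tap $ \r -> Sink $ \t ->
  fmap (\(a, t') -> ((f a, mapD f), t')) (unTap t r)

-- Hypothetical helper: pull a fixed number of elements.
takeTap :: Monad m => Int -> Tap () s m -> m [s]
takeTap n t
  | n <= 0 = pure []
  | otherwise = do
      (s, t') <- unTap t ()
      (s :) <$> takeTap (n - 1) t'

example :: [Int]
example = runIdentity (takeTap 3 ((countFrom 1 ++$ mapD (* 2)) ++$ mapD (+ 1)))

main :: IO ()
main = print example
```

The explicit parentheses sidestep the question of the operator's fixity, which this sketch does not declare.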
Since a `Distiller` is a special case of a tap, you can feed a distiller to a drinker, and you can also connect two distillers using `(++$)`. Note that the drinker also has access to the input of the distiller, allowing it to send a request.
```haskell
runSink :: Sink (Tap r s) (Sink tap m) a -> Distiller tap r s m -> Sink tap m (a, Distiller tap r s m)
```
## Full-duplexity
One distinctive feature of [pipes](https://hackage.haskell.org/package/pipes) is `Proxy`, the base type, having four parameters:
```haskell
data Proxy a' a b' b m r
  = Request a' (a  -> Proxy a' a b' b m r)
  | Respond b  (b' -> Proxy a' a b' b m r)
  | M          (m    (Proxy a' a b' b m r))
  | Pure r
```
This allows a producer to receive a value of type `b'`, and a consumer to send a value of type `a'`. This interactivity is useful for handling seeking. However, pipes' composition operator fixes the request type to `()`:
```haskell
(>->) :: Monad m
  => Proxy a' a () b m r
  -> Proxy () b c' c m r
  -> Proxy a' a c' c m r
```
To propagate requests, you need to resort to one of the special combinators, `(+>>)`. A sad fact is that `Proxy` cannot accumulate requests for one input; you would have to define some custom function.
```haskell
(+>>)
  :: Monad m
  => (b' -> Proxy a' a b' b m r)
  -> Proxy b' b c' c m r
  -> Proxy a' a c' c m r
```
The good old [iteratee](https://hackage.haskell.org/package/iteratee)'s composition does propagate requests, but in a rather disappointing fashion: the type of requests is `SomeException`. Honestly, iteratee's combinators and their semantics are quite puzzling.
```haskell
newtype Iteratee s m a = Iteratee
  { runIter :: forall r.
      (a -> Stream s -> m r) ->
      ((Stream s -> Iteratee s m a) -> Maybe SomeException -> m r) ->
      m r
  }
```
Other libraries don't support bidirectionality at all.
As we've seen in the first section, drinkery's Tap (`Producer` and `ListT` likewise) has an extra parameter for reception. You can send requests by calling `request`.
```haskell
request :: (Monoid r, Monad m) => r -> Sink (Tap r s) m ()
```
`Producer` and `ListT` can receive orders from the drinker, flushing the pending requests.
```haskell
accept :: Monoid r => Producer r s m r
inquire :: Monoid r => ListT r m r
```
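Here is a minimal sketch of how requests can accumulate on the way upstream, which is why the request type is a `Monoid`. The types are those from the article; the bodies of `consume` and `request` are my own reconstruction, and `echo` and `session` are hypothetical names. `echo` is an artificial tap that emits whatever request it receives, so the accumulated value becomes visible.

```haskell
import Data.Functor.Identity (runIdentity)

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }
newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

instance Functor m => Functor (Sink t m) where
  fmap f (Sink g) = Sink $ \t -> fmap (\(a, t') -> (f a, t')) (g t)

instance Monad m => Applicative (Sink t m) where
  pure a = Sink $ \t -> pure (a, t)
  Sink f <*> Sink g = Sink $ \t -> do
    (h, t') <- f t
    (a, t'') <- g t'
    pure (h a, t'')

instance Monad m => Monad (Sink t m) where
  Sink g >>= k = Sink $ \t -> do
    (a, t') <- g t
    runSink (k a) t'

-- Reconstruction (assumption): pull with an empty request.
consume :: (Monoid r, Monad m) => Sink (Tap r s) m s
consume = Sink $ \t -> unTap t mempty

-- Reconstruction (assumption): stash the request in the tap; it is
-- combined with whatever request arrives with the next pull.
request :: (Monoid r, Monad m) => r -> Sink (Tap r s) m ()
request r = Sink $ \t -> pure ((), Tap $ \r' -> unTap t (mappend r r'))

-- Hypothetical tap that emits the request it was given.
echo :: Applicative m => Tap r r m
echo = Tap $ \r -> pure (r, echo)

-- Two requests are merged into one; the second pull sees none pending.
session :: Monad m => Sink (Tap [String] [String]) m ([String], [String])
session = do
  request ["seek 10"]
  request ["seek 20"]
  first <- consume
  second <- consume
  pure (first, second)

main :: IO ()
main = print (fst (runIdentity (runSink session echo)))
```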
Of course composition doesn't take this ability away.
### Resource management
Conduit has a unique mechanism for resource management which makes it a respectably practical library; you can attach a finaliser to a stream producer.
```haskell
addCleanup :: Monad m => (Bool -> m ()) -> ConduitM i o m r -> ConduitM i o m r
```
In drinkery, you can create an instance of `CloseRequest` to finalise a tap. `(+&)`, a specialised version of `runSink`, closes the tap as soon as the drinker finishes.
```haskell
class CloseRequest a where
  -- | A value representing a close request
  closeRequest :: a

instance CloseRequest r => Closable (Tap r s)

(+&) :: (Closable tap, Monad m) => tap m -> Sink tap m a -> m a
```
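The following sketch shows one way these pieces could fit together. The `close` method of `Closable` and the body of `(+&)` are assumptions of mine, since only the signatures are shown above. `Req`, `fileTap`, and `readTwo` are hypothetical, and the writer-like `((,) [String])` monad from base records the order of effects.

```haskell
import Data.Functor (void)

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }
newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

class CloseRequest a where
  -- | A value representing a close request
  closeRequest :: a

-- Assumed shape of the class: something that can be finalised.
class Closable t where
  close :: Monad m => t m -> m ()

-- A tap is closed by sending it the close request.
instance CloseRequest r => Closable (Tap r s) where
  close t = void (unTap t closeRequest)

-- Assumed implementation: run the sink, then close what is left.
(+&) :: (Closable tap, Monad m) => tap m -> Sink tap m a -> m a
t +& s = do
  (a, t') <- runSink s t
  close t'
  pure a

-- Hypothetical request type with a distinguished close value.
data Req = Continue | Close

instance CloseRequest Req where
  closeRequest = Close

-- Hypothetical tap that logs reads and its own finalisation.
fileTap :: Int -> Tap Req Int ((,) [String])
fileTap n = Tap $ \req -> case req of
  Continue -> (["read " ++ show n], (n, fileTap (n + 1)))
  Close -> (["closed"], (n, fileTap n))

-- Hypothetical sink that reads two elements and adds them.
readTwo :: Sink (Tap Req Int) ((,) [String]) Int
readTwo = Sink $ \t -> do
  (a, t1) <- unTap t Continue
  (b, t2) <- unTap t1 Continue
  pure (a + b, t2)

example :: ([String], Int)
example = fileTap 1 +& readTwo

main :: IO ()
main = print example
```

In this sketch the finaliser runs exactly once, right after the sink returns, because closing is just one more request sent to the remaining tap.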
## Performance
I [benchmark](https://github.com/fumieval/drinkery/blob/master/benchmarks/benchmark.hs)ed a composition of two scanning operations `scan (+) 0 D.++$ scan (+) 0` processing 22100 elements.
```
scan-chain/drinkery/++$ mean 1.717 ms ( +- 104.5 μs )
scan-chain/drinkery/$& mean 1.239 ms ( +- 110.7 μs )
scan-chain/pipes mean 1.210 ms ( +- 78.40 μs )
scan-chain/conduit mean 1.911 ms ( +- 97.84 μs )
scan-chain/machines mean 2.731 ms ( +- 176.9 μs )
```
It's quite good. Note that there are two ways to compose a distiller: `++$` attaches it to a tap or another distiller, and `$&` attaches it to a drinker. The latter seems to be faster. Notably, a single scan is significantly faster than the rivals:
```
scan/drinkery mean 534.6 μs ( +- 58.07 μs )
scan/pipes mean 736.7 μs ( +- 54.84 μs )
scan/conduit mean 862.3 μs ( +- 68.02 μs )
scan/machines mean 1.352 ms ( +- 84.82 μs )
```
## Conclusion
drinkery offers significantly greater flexibility without losing speed. The API is not complete; I plan to add a lot more combinators in the near future.
I'm looking forward to your patronage.