In the Haskell ecosystem, a number of stream processing libraries have been written. Their purpose is to process a sequence of values with effects, in a composable manner. Still, I was not satisfied with the feature sets of the existing packages, so I decided to make a new one. It's called [drinkery](http://hackage.haskell.org/package/drinkery).

This package offers the following features:

* __Sequential producer__: You can build a producer using a monadic action like `yield :: s -> Producer s ()`.
* __[ListT done right](https://wiki.haskell.org/ListT_done_right)__: A correct implementation of a list monad transformer.
* __Monadic consumer__: A consumer monad processes a stream with effects.
* __Transducer__: A mechanism that transforms streams.
* __Full duplex__: Downstream can send a value upstream.
* __Decent performance__

## Hello, world

```haskell
import Control.Monad.IO.Class (liftIO)
import Data.Char (isDigit)
import Data.Drinkery
import qualified Data.Drinkery.Finite as D

main :: IO ()
main = tapListT' (taste "0H1e2l3l4o5,6 w7o8r9ld!\n")
  +& D.filter (not . isDigit)
  $& traverseFrom_ consume (liftIO . putChar)
```

## Sequential producer

Most libraries offer a monad in which you can emit a value in the middle of a computation: `Producer` for `pipes`, `ConduitM` for `conduit`, `PlanT` for `machines`. drinkery employs a `Producer` which manipulates a `Tap`, a non-terminating source. The type parameter `r` is a request type that enables full-duplex communication; assume `()` for now.

```haskell
newtype Producer r s m a = Producer { unProducer :: (a -> Tap r s m) -> Tap r s m }

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }
```

As in other implementations, `yield` emits one element.

```haskell
yield :: (Monoid r, Applicative m) => s -> Producer r (Maybe s) m ()
```

A `Producer` can be converted to a `Tap`:

```haskell
runProducer :: (Monoid r, Applicative m) => Producer r (Maybe s) m a -> Tap r (Maybe s) m
```

One big difference from other libraries is the explicit use of `Maybe` to mark the end of a stream.
Because the end marker is not built into `Tap`, you can omit `Maybe` and signal the end with a specific value instead (e.g. an empty bytestring).

## ListT done right

It's a well-known fact that the `ListT` in [transformers](https://hackage.haskell.org/package/transformers-0.5.5.0/docs/Control-Monad-Trans-List.html) is _not_ a proper monad transformer: it only yields a monad when the base monad is commutative. A proper implementation is convenient for writing nested loops. Today, several libraries implement ListT. The essentials are:

* (a) A monad transformer `T`: `T m` is a monad for any monad `m`.
* (b) An `Alternative` instance: You can convert a list into an action by `asum . map pure`.
* (c) A way to draw `a` from `T m a`.

drinkery implements it as a Boehm-Berarducci encoded list. (a) is satisfied because `ListT r m` is a monad regardless of `m`. There is also an `Alternative` instance (b). A specialised function is defined for convenience:

```haskell
newtype ListT r m s = ListT
  { unListT :: forall x. (s -> Tap r x m -> Tap r x m) -> Tap r x m -> Tap r x m }

sample :: Foldable f => f s -> ListT r m s
```

You can turn it into a `Tap`, which provides (c).

```haskell
runListT :: (Monoid r, Applicative m) => ListT r m s -> Tap r (Maybe s) m
```

Benchmark time. [This benchmark](https://github.com/fumieval/drinkery/blob/master/benchmarks/benchmark.hs) runs three nested loops, where each inner loop depends on the outer one.

```haskell
sourceAlt :: Monad m => ([Int] -> m Int) -> m Int
sourceAlt k = do
  a <- k [1..50]
  b <- k [1..a]
  c <- k [1..b]
  return $! a + b + c
```

Thanks to the encoding, ListT doesn't impose a slowdown. In fact, it ties with drinkery's own `Producer` as the fastest of the measured implementations!

```
drain/drinkery/Producer                  mean 304.7 μs  ( +- 11.48 μs  )
drain/drinkery/ListT                     mean 304.7 μs  ( +- 24.92 μs  )
drain/pipes/Producer                     mean 372.9 μs  ( +- 17.91 μs  )
drain/pipes/ListT                        mean 770.3 μs  ( +- 75.69 μs  )
drain/list-t                             mean 5.332 ms  ( +- 393.9 μs  )
drain/ListT                              mean 23.69 ms  ( +- 1.331 ms  )
```

## Monadic consumer

Monadic consumption is one of the most important abilities of a stream processing library (though there are exceptions such as [streaming](https://hackage.haskell.org/package/streaming)). The most common implementation is called an iteratee; `machines`, `pipes`, and `conduit` each build some additions on top of it.

```haskell
newtype Iteratee s m a = Iteratee
  { runIteratee :: m (Either (s -> Iteratee s m a) a) }
```

drinkery's consumer, on the other hand, looks quite different.

```haskell
newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

consume :: (Monoid r, Monad m) => Sink (Tap r s) m s
```

The first type parameter represents the source (usually `Tap r s`). A `Sink` action is a function that consumes a tap and returns a result together with the remainder of the tap. You can simply apply `runSink` to feed it a tap.

Several combinators are defined to work with finite streams. Their first argument is usually `consume`.

```haskell
foldlFrom' :: (Monad m) => m (Maybe a) -> (b -> a -> b) -> b -> m b
foldMFrom :: (Monad m) => m (Maybe a) -> (b -> a -> m b) -> b -> m b
traverseFrom_ :: (Monad m) => m (Maybe a) -> (a -> m b) -> m ()
drainFrom :: (Monad m) => m (Maybe a) -> m ()
```

Since a `Sink` operates on a `Tap`, you can push an element back:

```haskell
leftover :: (Monoid r, Monad m) => s -> Sink (Tap r s) m ()
```

### Fanout

Sometimes we want to distribute an input stream to multiple consumers. This is not possible with `Sink` itself, and cloning a tap is not trivial either. For this purpose, drinkery offers a classic iteratee:

```haskell
newtype Awaiter s m a = Awaiter
  { runAwaiter :: m (Either (s -> Awaiter s m a) a) }

await :: Monad m => Awaiter s m s
```

`serving_` combines a list of `Awaiter`s into one.

```haskell
serving_ :: Monad m => [Awaiter s m a] -> Awaiter s m ()
```

It can be converted into a `Sink`.

```haskell
iterAwaiterT consume :: Awaiter s m a -> Sink s m a
```

### Taste & compare

Although it is not defined in the package yet, a `Sink` can consume two streams simultaneously through the [Product](https://hackage.haskell.org/package/base-4.10.1.0/docs/Data-Functor-Product.html) type. It should also be possible to manipulate an [extensible](https://hackage.haskell.org/package/extensible) record of taps.

```haskell
drinkL :: (Monoid r, Monad m) => Sink (Product (Tap r s) tap) m s
drinkL = drinking $ \(Pair p q) -> fmap (`Pair` q) <$> unTap p mempty
```

Multi-stream support is a rare feature. As far as I know, only machines supports it, but it probably won't be long until drinkery gains it in a more flexible way.

## Transducer

A stream transducer receives an input and produces zero or more outputs. There are three ways to represent a stream transducer:

* A concrete structure which can consume and produce values (e.g. machines, pipes, conduit)
* A stream producer where the base monad is a consumer (iteratee)
* A function from a stream producer to another stream producer (streaming)

drinkery takes the second approach. `Distiller tap r s m` is a tap which consumes `tap`.

```haskell
type Distiller tap r s m = Tap r s (Sink tap m)
```

Surprisingly, the only combinator this requires is `(++$)`, the composition of a tap and a distiller.

```haskell
(++$) :: (Functor m) => tap m -> Distiller tap r s m -> Tap r s m
```

Since a `Distiller` is a special case of a tap, you can feed a distiller to a sink, and you can also connect two distillers using `(++$)`. Note that the sink also has access to the input of the distiller, allowing it to send a request.

```haskell
runSink :: Sink (Tap r s) (Sink tap m) a -> Distiller tap r s m -> Sink tap m (a, Distiller tap r s m)
```

Full-duplexity
----

One distinctive feature of [pipes](https://hackage.haskell.org/package/pipes) is that `Proxy`, its base type, has four parameters describing communication in both directions (`a'`, `a`, `b'` and `b`):

```haskell
data Proxy a' a b' b m r
    = Request a' (a  -> Proxy a' a b' b m r)
    | Respond b  (b' -> Proxy a' a b' b m r)
    | M          (m    (Proxy a' a b' b m r))
    | Pure    r
```

This allows a producer to receive a value of type `b'`, and a consumer to send a value of type `a'`. This interactivity is useful for handling seeking. However, pipes' ordinary composition operator fixes the request type to `()`:

```haskell
(>->) :: Monad m => Proxy a' a () b m r -> Proxy () b c' c m r -> Proxy a' a c' c m r
```

To pass requests along, you need to resort to one of the special combinators, `(+>>)`. Sadly, `Proxy` cannot accumulate several requests for one input; you would have to define a custom function for that.

```haskell
(+>>) :: Monad m => (b' -> Proxy a' a b' b m r) -> Proxy b' b c' c m r -> Proxy a' a c' c m r
```

The good old [iteratee](https://hackage.haskell.org/package/iteratee)'s composition does propagate requests, but in a rather disappointing fashion: the type of requests is `SomeException`. Honestly, iteratee's combinators and their semantics are quite puzzling.

```haskell
newtype Iteratee s m a = Iteratee
  { runIter :: forall r.
         (a -> Stream s -> m r)
      -> ((Stream s -> Iteratee s m a) -> Maybe SomeException -> m r)
      -> m r
  }
```

Other libraries don't support bidirectionality at all.

As we've seen in the first section, drinkery's `Tap` (and likewise `Producer` and `ListT`) has an extra parameter for receiving requests. You can send requests by calling `request`.

```haskell
request :: (Monoid r, Monad m) => r -> Sink (Tap r s) m ()
```

A `Producer` or a `ListT` can receive requests from the sink, flushing the pending ones.

```haskell
accept :: Monoid r => Producer r s m r
inquire :: Monoid r => ListT r m r
```

Of course, composition doesn't take this ability away.
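
To make the request flow concrete, here is a minimal, self-contained sketch. It does not import drinkery: it re-declares `Tap` and `Sink` exactly as shown earlier, and the bodies of `consume` and `request` are an assumption about how such functions could be written, not a copy of the library's source. `counter` is a hypothetical seekable source invented for the illustration, where a request of `Sum n` means "skip ahead by `n`".

```haskell
import Data.Monoid (Sum (..))

-- Re-declared from the definitions shown earlier in this post.
newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }
newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

-- Sink is a state monad whose state is the tap being consumed.
instance Functor m => Functor (Sink t m) where
  fmap f (Sink g) = Sink $ \t -> fmap (\(a, t') -> (f a, t')) (g t)

instance Monad m => Applicative (Sink t m) where
  pure a = Sink $ \t -> return (a, t)
  Sink f <*> Sink g = Sink $ \t -> do
    (h, t') <- f t
    (a, t'') <- g t'
    return (h a, t'')

instance Monad m => Monad (Sink t m) where
  Sink g >>= k = Sink $ \t -> do
    (a, t') <- g t
    runSink (k a) t'

-- Assumed implementation: pull one element, sending an empty request.
consume :: (Monoid r, Monad m) => Sink (Tap r s) m s
consume = Sink $ \t -> unTap t mempty

-- Assumed implementation: nothing is sent yet; the request is prepended
-- to whatever request arrives with the next pull.
request :: (Monoid r, Monad m) => r -> Sink (Tap r s) m ()
request r = Sink $ \t -> return ((), Tap $ \r' -> unTap t (r `mappend` r'))

-- Hypothetical source: yields successive integers, skipping ahead by the
-- requested amount before each element.
counter :: Applicative m => Int -> Tap (Sum Int) Int m
counter n = Tap $ \(Sum k) -> let i = n + k in pure (i, counter (i + 1))

example :: Monad m => Sink (Tap (Sum Int) Int) m [Int]
example = do
  a <- consume
  request (Sum 10)
  request (Sum 5) -- pending requests are combined into Sum 15
  b <- consume
  c <- consume
  return [a, b, c]

main :: IO ()
main = do
  (xs, _) <- runSink example (counter 0)
  print xs -- prints [0,16,17]
```

The two requests reach the source as a single `Sum 15` on the next pull, which is why `r` has to be a `Monoid`: pending requests are combined with `mappend`, and a plain `consume` sends `mempty`.
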
### Resource management

Conduit has a unique mechanism for resource management, which makes it a respectably practical library: you can attach a finaliser to a stream producer.

```haskell
addCleanup :: Monad m => (Bool -> m ()) -> ConduitM i o m r -> ConduitM i o m r
```

In drinkery, you can create an instance of `CloseRequest` to finalise a tap. `(+&)`, a specialised version of `runSink`, closes the tap as soon as the sink finishes.

```haskell
class CloseRequest a where
  -- | A value representing a close request
  closeRequest :: a

instance CloseRequest r => Closable (Tap r s)

(+&) :: (Closable tap, Monad m) => tap m -> Sink tap m a -> m a
```

## Performance

I [benchmark](https://github.com/fumieval/drinkery/blob/master/benchmarks/benchmark.hs)ed a composition of two scanning operations, `scan (+) 0 D.++$ scan (+) 0`, processing 22100 elements.

```
scan-chain/drinkery/++$                  mean 1.717 ms  ( +- 104.5 μs  )
scan-chain/drinkery/$&                   mean 1.239 ms  ( +- 110.7 μs  )
scan-chain/pipes                         mean 1.210 ms  ( +- 78.40 μs  )
scan-chain/conduit                       mean 1.911 ms  ( +- 97.84 μs  )
scan-chain/machines                      mean 2.731 ms  ( +- 176.9 μs  )
```

That's quite good: with `$&`, drinkery is on par with pipes and ahead of conduit and machines. Note that there are two ways to compose distillers: `++$` attaches one to a tap or another distiller, and `$&` attaches one to a sink. The latter seems to be faster.

Notably, a single scan is significantly faster than the rivals:

```
scan/drinkery                            mean 534.6 μs  ( +- 58.07 μs  )
scan/pipes                               mean 736.7 μs  ( +- 54.84 μs  )
scan/conduit                             mean 862.3 μs  ( +- 68.02 μs  )
scan/machines                            mean 1.352 ms  ( +- 84.82 μs  )
```

Conclusion
----

drinkery offers significantly greater flexibility without losing speed. The API is not complete; I plan to add a lot more combinators in the near future. I'm looking forward to your patronage.