
Drinkery: the boozy streaming library

The Haskell ecosystem already has a number of stream processing libraries. Their purpose is to process a sequence of values with effects in a composable manner. Still, I was not satisfied with the feature sets of the existing packages, so I decided to make a new one.

It's called drinkery. The package provides the following features:

  • Sequential producer: You can build a producer using a monadic action like yield :: s -> Producer s ().
  • ListT done right: Correct implementation of a list monad transformer.
  • Monadic consumer: A consumer monad processes a stream involving effects.
  • Transducer: A mechanism that transforms streams.
  • Full duplex: Downstream can send a value to the upstream.
  • Decent performance

Hello, world

import Control.Monad.IO.Class (liftIO)
import Data.Char (isDigit)
import Data.Drinkery
import qualified Data.Drinkery.Finite as D

main :: IO ()
main = tapListT' (taste "0H1e2l3l4o5,6 w7o8r9ld!\n")
  +& D.filter (not . isDigit)
  $& traverseFrom_ consume (liftIO . putChar)

Sequential producer

Most libraries offer a monad in which you can emit values during a computation: Producer in pipes, ConduitM in conduit, PlanT in machines. drinkery employs a Producer which builds a Tap, a non-terminating source. The type parameter r is a request type that provides full-duplexity; assume () for now.

newtype Producer r s m a = Producer { unProducer :: (a -> Tap r s m) -> Tap r s m }

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }

As in other implementations, yield emits a single element.

yield :: (Monoid r, Applicative m) => s -> Producer r (Maybe s) m ()

Producer can be converted to a Tap:

runProducer :: (Monoid r, Applicative m) => Producer r (Maybe s) m a -> Tap r (Maybe s) m

One big difference from other libraries is that the end of a stream is marked explicitly with Maybe in the element type, rather than being built into the Tap itself. This also means you can drop Maybe and use a specific value as the terminator instead (e.g. an empty ByteString).
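To make the CPS encoding of Producer concrete, here is a minimal re-implementation of Tap, Producer, yield and runProducer following the definitions above. It is a sketch, not drinkery's source: the helpers drainToList and countdown are hypothetical, and this yield simply ignores the request it receives.

```haskell
-- Minimal sketch of the types above (not the library's actual code).
newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }

newtype Producer r s m a = Producer
  { unProducer :: (a -> Tap r s m) -> Tap r s m }

-- Continuation-passing style makes Producer a monad with no constraint on m.
instance Functor (Producer r s m) where
  fmap f (Producer p) = Producer $ \k -> p (k . f)

instance Applicative (Producer r s m) where
  pure a = Producer ($ a)
  Producer pf <*> Producer pa = Producer $ \k -> pf (\f -> pa (k . f))

instance Monad (Producer r s m) where
  Producer p >>= f = Producer $ \k -> p (\a -> unProducer (f a) k)

-- Emit one element, then continue with the rest of the producer.
-- (This sketch ignores the incoming request.)
yield :: (Monoid r, Applicative m) => s -> Producer r (Maybe s) m ()
yield s = Producer $ \k -> Tap $ \_ -> pure (Just s, k ())

-- When the producer returns, the tap signals the end with Nothing forever.
runProducer :: (Monoid r, Applicative m) => Producer r (Maybe s) m a -> Tap r (Maybe s) m
runProducer (Producer p) = p (const end)
  where
    end = Tap $ \_ -> pure (Nothing, end)

-- Hypothetical helper: pull with mempty until the tap yields Nothing.
drainToList :: (Monoid r, Monad m) => Tap r (Maybe s) m -> m [s]
drainToList t = do
  (ms, t') <- unTap t mempty
  case ms of
    Nothing -> pure []
    Just s  -> (s :) <$> drainToList t'

-- A sequential producer written with ordinary monadic code.
countdown :: (Monoid r, Applicative m) => Int -> Producer r (Maybe Int) m ()
countdown 0 = pure ()
countdown n = yield n >> countdown (n - 1)
```

Load it in GHCi and run `drainToList (runProducer (countdown 3) :: Tap () (Maybe Int) IO)` to get `[3,2,1]`.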

ListT done right

It's a well-known fact that the ListT in transformers is not a monad transformer: ListT m violates the monad laws unless m is commutative. A proper implementation is convenient for writing nested loops, and today several libraries implement one. The essentials are:

  • (a) A monad transformer T: T m is a monad for any monad m.
  • (b) An Alternative instance: You can convert a list into an action by asum . map pure.
  • (c) A way to draw the values of type a out of T m a.

drinkery implements it as a Boehm-Berarducci encoded list. (a) is satisfied because ListT r m is a monad regardless of m, and there is an Alternative instance for (b). For convenience, a specialised function converts any Foldable container directly:

newtype ListT r m s = ListT
  { unListT :: forall x. (s -> Tap r x m -> Tap r x m) -> Tap r x m -> Tap r x m }

sample :: Foldable f => f s -> ListT r m s

For (c), you can turn it into a Tap:

runListT :: (Monoid r, Applicative m) => ListT r m s -> Tap r (Maybe s) m
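The encoding may be easier to follow with the instances spelled out. The following is a sketch under the definitions above, not drinkery's code: a ListT is its own right fold over taps, so bind and (<|>) are plain function composition, and runListT folds with a "cons" that emits Just and a "nil" that emits Nothing. The names pairs and drainToList are hypothetical.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Applicative (Alternative (..))

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }

-- Boehm-Berarducci encoding: a list represented by its right fold.
newtype ListT r m s = ListT
  { unListT :: forall x. (s -> Tap r x m -> Tap r x m) -> Tap r x m -> Tap r x m }

instance Functor (ListT r m) where
  fmap f (ListT g) = ListT (\cons nil -> g (cons . f) nil)

instance Applicative (ListT r m) where
  pure s = ListT (\cons nil -> cons s nil)
  ListT fs <*> ListT xs = ListT (\cons nil -> fs (\f rest -> xs (cons . f) rest) nil)

-- No constraint on m: (a) holds for any base monad.
instance Monad (ListT r m) where
  ListT xs >>= k = ListT (\cons nil -> xs (\s rest -> unListT (k s) cons rest) nil)

-- (b): append is just "fold the second list into the tail of the first".
instance Alternative (ListT r m) where
  empty = ListT (\_ nil -> nil)
  ListT xs <|> ListT ys = ListT (\cons nil -> xs cons (ys cons nil))

sample :: Foldable f => f s -> ListT r m s
sample xs = ListT (\cons nil -> foldr cons nil xs)

-- (c): each element becomes one pull; the end is Nothing forever.
runListT :: (Monoid r, Applicative m) => ListT r m s -> Tap r (Maybe s) m
runListT (ListT f) = f (\s rest -> Tap $ \_ -> pure (Just s, rest)) end
  where
    end = Tap $ \_ -> pure (Nothing, end)

-- Hypothetical helper: pull until Nothing.
drainToList :: (Monoid r, Monad m) => Tap r (Maybe s) m -> m [s]
drainToList t = do
  (ms, t') <- unTap t mempty
  maybe (pure []) (\s -> (s :) <$> drainToList t') ms

-- A nested loop whose inner range depends on the outer value.
pairs :: ListT r m (Int, Int)
pairs = do
  a <- sample [1 .. 3]
  b <- sample [a .. 3]
  pure (a, b)
```

Draining `runListT pairs` yields `(1,1),(1,2),(1,3),(2,2),(2,3),(3,3)` in that order.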

Benchmark time. This benchmark runs three nested loops, where the range of each inner loop depends on the value chosen by the outer one.

sourceAlt :: Monad m => ([Int] -> m Int) -> m Int
sourceAlt k = do
  a <- k [1..50]
  b <- k [1..a]
  c <- k [1..b]
  return $! a + b + c
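The parameter k is where each ListT implementation plugs in. Using essential (b), any Alternative turns a list into an action with asum . map pure; the sketch below names that function choose (a hypothetical name) and runs sourceAlt in two ordinary Alternative monads from base as a sanity check.

```haskell
import Control.Applicative (Alternative)
import Data.Foldable (asum)

-- Essential (b): convert a list into an action of any Alternative.
choose :: Alternative f => [a] -> f a
choose = asum . map pure

sourceAlt :: Monad m => ([Int] -> m Int) -> m Int
sourceAlt k = do
  a <- k [1 .. 50]
  b <- k [1 .. a]
  c <- k [1 .. b]
  return $! a + b + c

-- In the list monad every triple 50 >= a >= b >= c >= 1 is visited.
allSums :: [Int]
allSums = sourceAlt choose
```

`length allSums` is 22100 (the number of such triples, the same element count used in the scan benchmark later in this post), while `sourceAlt choose :: Maybe Int` only takes the first choice at each level and returns `Just 3`.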

Thanks to the encoding, ListT imposes no slowdown: it matches drinkery's own Producer and is the fastest ListT in this benchmark, more than twice as fast as pipes' ListT.

drain/drinkery/Producer                  mean 304.7 μs  ( +- 11.48 μs  )
drain/drinkery/ListT                     mean 304.7 μs  ( +- 24.92 μs  )
drain/pipes/Producer                     mean 372.9 μs  ( +- 17.91 μs  )
drain/pipes/ListT                        mean 770.3 μs  ( +- 75.69 μs  )
drain/list-t                             mean 5.332 ms  ( +- 393.9 μs  )
drain/ListT                              mean 23.69 ms  ( +- 1.331 ms  )

Monadic consumer

Monadic consumption is one of the important abilities of a stream processing library (streaming is a notable exception, though). The most common implementation is called an iteratee; machines, pipes, and conduit build on it with some additions.

newtype Iteratee s m a = Iteratee
  { runIteratee :: m (Either (s -> Iteratee s m a) a) }

drinkery's consumer, on the other hand, takes a different approach.

newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

consume :: (Monoid r, Monad m) => Sink (Tap r s) m s

The first type parameter represents the source (usually Tap r s). A Sink action is a function that takes a tap, consumes from it, and returns its result together with the remaining tap. To feed a tap to a Sink, just apply runSink.
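Since a Sink is just a state-passing function over the tap, its Monad instance threads the remaining tap from one action to the next. A minimal sketch under the definitions above (not drinkery's source; countFrom and firstTwo are hypothetical names):

```haskell
newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }

newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

instance Functor m => Functor (Sink t m) where
  fmap f (Sink g) = Sink $ \t -> (\(a, t') -> (f a, t')) <$> g t

instance Monad m => Applicative (Sink t m) where
  pure a = Sink $ \t -> pure (a, t)
  sf <*> sa = sf >>= \f -> fmap f sa

-- Each action receives whatever tap the previous action left behind.
instance Monad m => Monad (Sink t m) where
  Sink g >>= k = Sink $ \t -> g t >>= \(a, t') -> runSink (k a) t'

-- Pull one element, sending the neutral request mempty.
consume :: (Monoid r, Monad m) => Sink (Tap r s) m s
consume = Sink $ \t -> unTap t mempty

-- Hypothetical infinite source: n, n+1, n+2, ...
countFrom :: Applicative m => Int -> Tap r Int m
countFrom n = Tap $ \_ -> pure (n, countFrom (n + 1))

firstTwo :: Monad m => Sink (Tap () Int) m (Int, Int)
firstTwo = (,) <$> consume <*> consume
```

`runSink firstTwo (countFrom 0)` returns `(0, 1)` along with the remaining tap, which continues at 2.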

Several combinators are defined to work with finite streams. Their first argument is usually consume.

foldlFrom' :: (Monad m) => m (Maybe a) -> (b -> a -> b) -> b -> m b
foldMFrom :: (Monad m) => m (Maybe a) -> (b -> a -> m b) -> b -> m b
traverseFrom_ :: (Monad m) => m (Maybe a) -> (a -> m b) -> m ()
drainFrom :: (Foldable t, Monad m) => m (t a) -> m ()
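These combinators only need some action that returns Maybe a, so they can be sketched independently of Sink. The versions below are illustrative implementations consistent with the signatures above, not drinkery's code; drainFrom is written over any Foldable container (Maybe included), and popFrom is a hypothetical stand-in for consume that pops from a list in an IORef.

```haskell
import Data.IORef

-- Strict left fold: stop at the first Nothing.
foldlFrom' :: Monad m => m (Maybe a) -> (b -> a -> b) -> b -> m b
foldlFrom' pull f = go
  where
    go acc = acc `seq` (pull >>= maybe (pure acc) (go . f acc))

-- Monadic fold: the step function may perform effects.
foldMFrom :: Monad m => m (Maybe a) -> (b -> a -> m b) -> b -> m b
foldMFrom pull f = go
  where
    go acc = pull >>= maybe (pure acc) (\a -> f acc a >>= go)

-- Run an action on every element until the end of the stream.
traverseFrom_ :: Monad m => m (Maybe a) -> (a -> m b) -> m ()
traverseFrom_ pull f = go
  where
    go = pull >>= maybe (pure ()) (\a -> f a >> go)

-- Discard elements until an empty container (e.g. Nothing) arrives.
drainFrom :: (Foldable t, Monad m) => m (t a) -> m ()
drainFrom pull = go
  where
    go = pull >>= \x -> if null x then pure () else go

-- Hypothetical stand-in for consume: pop the head of a list in an IORef.
popFrom :: IORef [a] -> IO (Maybe a)
popFrom ref = atomicModifyIORef' ref $ \xs -> case xs of
  []       -> ([], Nothing)
  (y : ys) -> (ys, Just y)
```

For example, with `ref` holding `[1 .. 10]`, `foldlFrom' (popFrom ref) (+) 0` returns 55.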

Since a Sink operates on a Tap, you can push an element back:

leftover :: (Monoid r, Monad m) => s -> Sink (Tap r s) m ()
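Pushing back just wraps the remaining tap in a new one that emits the saved element first. The sketch below assumes that the request received while serving the pushed-back element is kept and merged (with the Monoid) into the next pull from the original tap; that is one plausible reason for the Monoid r constraint, not a confirmed detail of drinkery. peek and countFrom are hypothetical.

```haskell
newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }

newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

instance Functor m => Functor (Sink t m) where
  fmap f (Sink g) = Sink $ \t -> (\(a, t') -> (f a, t')) <$> g t

instance Monad m => Applicative (Sink t m) where
  pure a = Sink $ \t -> pure (a, t)
  sf <*> sa = sf >>= \f -> fmap f sa

instance Monad m => Monad (Sink t m) where
  Sink g >>= k = Sink $ \t -> g t >>= \(a, t') -> runSink (k a) t'

consume :: (Monoid r, Monad m) => Sink (Tap r s) m s
consume = Sink $ \t -> unTap t mempty

-- Emit s on the next pull; keep that pull's request (assumption) and
-- combine it with the following request to the original tap.
leftover :: (Monoid r, Monad m) => s -> Sink (Tap r s) m ()
leftover s = Sink $ \t ->
  pure ((), Tap $ \r -> pure (s, Tap $ \r' -> unTap t (r <> r')))

-- Look at the next element without consuming it.
peek :: (Monoid r, Monad m) => Sink (Tap r s) m s
peek = do
  s <- consume
  leftover s
  pure s

countFrom :: Applicative m => Int -> Tap r Int m
countFrom n = Tap $ \_ -> pure (n, countFrom (n + 1))
```

Running `(,,) <$> peek <*> consume <*> consume` on `countFrom 0` gives `(0, 0, 1)`.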

Fanout

Sometimes we want to distribute an input stream to multiple consumers. This is not possible with Sink alone, and cloning a tap is not trivial either. For this purpose, drinkery offers a classic iteratee:

newtype Awaiter s m a = Awaiter { runAwaiter :: m (Either (s -> Awaiter s m a) a) }

await :: Monad m => Awaiter s m s

serving_ combines a list of Awaiters into one.

serving_ :: Monad m => [Awaiter s m a] -> Awaiter s m ()

It can be converted into a Sink:

iterAwaiterT consume :: Awaiter s m a -> Sink (Tap r s) m a
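The fanout idea can be sketched with the Awaiter type above: run every consumer until it asks for input, pull one element, and hand that same element to all of them. This is an illustration, not drinkery's serving_; here finished consumers are simply dropped. The driver iterAwaiter pulls from an action in the same monad as the Awaiter, which is a simplification and not necessarily how iterAwaiterT is typed. liftAw and sumN are hypothetical.

```haskell
import Data.IORef

newtype Awaiter s m a = Awaiter { runAwaiter :: m (Either (s -> Awaiter s m a) a) }

instance Functor m => Functor (Awaiter s m) where
  fmap f (Awaiter m) = Awaiter $ fmap (either (\k -> Left (fmap f . k)) (Right . f)) m

instance Monad m => Applicative (Awaiter s m) where
  pure = Awaiter . pure . Right
  mf <*> mx = mf >>= \f -> fmap f mx

instance Monad m => Monad (Awaiter s m) where
  Awaiter m >>= k = Awaiter $ m >>= \e -> case e of
    Left cont -> pure (Left ((>>= k) . cont))
    Right a   -> runAwaiter (k a)

await :: Monad m => Awaiter s m s
await = Awaiter (pure (Left pure))

liftAw :: Functor m => m a -> Awaiter s m a
liftAw m = Awaiter (Right <$> m)

-- Feed every input to all consumers that are still waiting.
serving_ :: Monad m => [Awaiter s m a] -> Awaiter s m ()
serving_ ws = Awaiter $ do
  steps <- mapM runAwaiter ws
  let waiting = [k | Left k <- steps]
  pure $ if null waiting
    then Right ()
    else Left (\s -> serving_ (map ($ s) waiting))

-- Simplified driver: answer each await with one pull.
iterAwaiter :: Monad m => m s -> Awaiter s m a -> m a
iterAwaiter pull (Awaiter m) =
  m >>= either (\k -> pull >>= iterAwaiter pull . k) pure

-- Sum the next n inputs and store the total.
sumN :: IORef Int -> Int -> Awaiter Int IO ()
sumN out = go 0
  where
    go acc 0 = liftAw (writeIORef out acc)
    go acc k = await >>= \x -> go (acc + x) (k - 1)
```

Serving `[sumN a 2, sumN b 3]` from a source 0, 1, 2, ... stores 1 in `a` and 3 in `b`, pulling only three elements in total because both consumers see the same inputs.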

Taste & compare

No combinator for it is defined in the package yet, but a Sink can consume two streams simultaneously through the Product type. It should also be possible to manipulate an extensible record of taps.

drinkL :: (Monoid r, Monad m) => Sink (Product (Tap r s) tap) m s
drinkL = drinking $ \(Pair p q) -> fmap (`Pair`q) <$> unTap p mempty

Consuming multiple streams is a rare feature. As far as I know, only machines supports it, but drinkery will probably gain it, in a more flexible form, before long.
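To show that the drinkL snippet is all it takes, here is a self-contained sketch with a mirror-image drinkR, zipping two taps inside one Sink. Product comes from Data.Functor.Product in base; drinking is assumed here to be simply the Sink constructor, and drinkR, countFrom and zipSums are hypothetical.

```haskell
import Control.Monad (replicateM)
import Data.Functor.Product (Product (..))

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }

newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

instance Functor m => Functor (Sink t m) where
  fmap f (Sink g) = Sink $ \t -> (\(a, t') -> (f a, t')) <$> g t

instance Monad m => Applicative (Sink t m) where
  pure a = Sink $ \t -> pure (a, t)
  sf <*> sa = sf >>= \f -> fmap f sa

instance Monad m => Monad (Sink t m) where
  Sink g >>= k = Sink $ \t -> g t >>= \(a, t') -> runSink (k a) t'

-- Assumption: drinking wraps a raw state-passing function.
drinking :: (t m -> m (a, t m)) -> Sink t m a
drinking = Sink

-- Pull from the left tap, leave the right one untouched.
drinkL :: (Monoid r, Monad m) => Sink (Product (Tap r s) tap) m s
drinkL = drinking $ \(Pair p q) -> fmap (`Pair` q) <$> unTap p mempty

-- Pull from the right tap, leave the left one untouched.
drinkR :: (Monoid r, Monad m) => Sink (Product tap (Tap r s)) m s
drinkR = drinking $ \(Pair p q) -> fmap (p `Pair`) <$> unTap q mempty

countFrom :: Applicative m => Int -> Tap r Int m
countFrom n = Tap $ \_ -> pure (n, countFrom (n + 1))

-- Add the streams element-wise, three times.
zipSums :: Monad m => Sink (Product (Tap () Int) (Tap () Int)) m [Int]
zipSums = replicateM 3 ((+) <$> drinkL <*> drinkR)
```

Feeding `Pair (countFrom 0) (countFrom 10)` to `zipSums` gives `[10, 12, 14]`.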

Transducer

A stream transducer receives an input and produces zero or more outputs. There are three ways to represent a stream transducer.

  • Concrete structure which can consume and produce values (e.g. machines, pipes, conduit)
  • A stream producer where the base monad is a consumer (iteratee)
  • A function from a stream producer to another stream producer (streaming)

drinkery takes the second approach: Distiller tap r s m is a tap whose base monad is a Sink that drinks from tap.

type Distiller tap r s m = Tap r s (Sink tap m)

Surprisingly, the only combinator introduced is (++$), the composition of a tap and a distiller.

(++$) :: (Functor m) => tap m -> Distiller tap r s m -> Tap r s m

Since a Distiller is a special case of a tap, you can feed a distiller to a drinker (a Sink), and you can also connect two distillers using (++$). Note that the drinker also has access to the input of the distiller, which allows it to send requests upstream.

runSink :: Sink (Tap r s) (Sink tap m) a -> Distiller tap r s m -> Sink tap m (a, Distiller tap r s m)
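(++$) only needs Functor m because connecting a tap to a distiller is just "run the distiller's Sink step on the tap, and reconnect whatever is left". The sketch below implements it from the type above; mapD (a mapping distiller that forwards the downstream request upstream), countFrom and takeTap are hypothetical helpers, not drinkery functions.

```haskell
newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }

newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

instance Functor m => Functor (Sink t m) where
  fmap f (Sink g) = Sink $ \t -> (\(a, t') -> (f a, t')) <$> g t

instance Monad m => Applicative (Sink t m) where
  pure a = Sink $ \t -> pure (a, t)
  sf <*> sa = sf >>= \f -> fmap f sa

instance Monad m => Monad (Sink t m) where
  Sink g >>= k = Sink $ \t -> g t >>= \(a, t') -> runSink (k a) t'

type Distiller tap r s m = Tap r s (Sink tap m)

-- One pull: run the distiller's step against the tap, then reconnect
-- the remaining tap to the remaining distiller.
(++$) :: Functor m => tap m -> Distiller tap r s m -> Tap r s m
tap ++$ Tap d = Tap $ \r ->
  (\((s, d'), tap') -> (s, tap' ++$ d')) <$> runSink (d r) tap

-- Map each element, passing the downstream request to the upstream tap.
mapD :: Monad m => (a -> b) -> Distiller (Tap r a) r b m
mapD f = Tap $ \r -> do
  a <- Sink $ \t -> unTap t r
  pure (f a, mapD f)

countFrom :: Applicative m => Int -> Tap r Int m
countFrom n = Tap $ \_ -> pure (n, countFrom (n + 1))

takeTap :: Monad m => Int -> Tap () s m -> m [s]
takeTap n t
  | n <= 0 = pure []
  | otherwise = do
      (s, t') <- unTap t ()
      (s :) <$> takeTap (n - 1) t'
```

Both `(countFrom 1 ++$ mapD (* 10)) ++$ mapD (+ 1)` and `countFrom 1 ++$ (mapD (* 10) ++$ mapD (+ 1))` produce 11, 21, 31, ...; in the second form, two distillers are composed into a new distiller before being attached to the tap.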

Full-duplexity

One distinctive feature of pipes is its base type, Proxy, which has four stream parameters (a', a, b', b):

data Proxy a' a b' b m r
    = Request a' (a  -> Proxy a' a b' b m r )
    | Respond b  (b' -> Proxy a' a b' b m r )
    | M          (m    (Proxy a' a b' b m r))
    | Pure    r

This allows a producer to receive a value of type b', and a consumer to send one of type a'. This interactivity is useful for implementing seeking. However, pipes' composition operator (>->) fixes the request type between the two composed proxies to ():

(>->) :: Monad m
  => Proxy a' a () b m r
  -> Proxy () b c' c m r
  -> Proxy a' a c' c m r

To pass requests through a composition, you need to resort to a special combinator, (+>>), shown below. Unfortunately, Proxy cannot accumulate several requests for a single input; you would have to write a custom function for that.

(+>>)
    :: Monad m
    => (b' -> Proxy a' a b' b m r)
    ->        Proxy b' b c' c m r
    ->        Proxy a' a c' c m r

In the good old iteratee package, composition does propagate requests, but in a rather disappointing fashion: the request type is fixed to SomeException. Honestly, iteratee's combinators and their semantics are quite puzzling.

newtype Iteratee s m a = Iteratee{ runIter :: forall r.
          (a -> Stream s -> m r) ->
          ((Stream s -> Iteratee s m a) -> Maybe SomeException -> m r) ->
          m r}

As far as I know, the other libraries don't support bidirectionality at all.

As we've seen in the first section, drinkery's Tap (Producer and ListT likewise) has an extra parameter for reception. You can send requests by calling request.

request :: (Monoid r, Monad m) => r -> Sink (Tap r s) m ()

A Producer or ListT can receive these requests from the drinker: accept and inquire return the pending requests and flush them.

accept :: Monoid r => Producer r s m r
inquire :: Monoid r => ListT r m r

Of course, composition doesn't take this ability away.
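Here is a small seeking example on a plain Tap (rather than Producer and accept), where the request type Sum Int means "skip this many elements before emitting". The request function below is a hypothetical sketch, assuming requests stay pending and are combined with (<>) into the next pull; that matches the Monoid constraint in its signature but is not confirmed to be drinkery's implementation. seekable and seekDemo are also hypothetical.

```haskell
import Data.Monoid (Sum (..))

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }

newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

instance Functor m => Functor (Sink t m) where
  fmap f (Sink g) = Sink $ \t -> (\(a, t') -> (f a, t')) <$> g t

instance Monad m => Applicative (Sink t m) where
  pure a = Sink $ \t -> pure (a, t)
  sf <*> sa = sf >>= \f -> fmap f sa

instance Monad m => Monad (Sink t m) where
  Sink g >>= k = Sink $ \t -> g t >>= \(a, t') -> runSink (k a) t'

consume :: (Monoid r, Monad m) => Sink (Tap r s) m s
consume = Sink $ \t -> unTap t mempty

-- Assumption: a request is merged into the request of the next pull.
request :: (Monoid r, Monad m) => r -> Sink (Tap r s) m ()
request r = Sink $ \t -> pure ((), Tap $ \r' -> unTap t (r <> r'))

-- A seekable source: a request of Sum n skips n elements first.
seekable :: Applicative m => [a] -> Tap (Sum Int) (Maybe a) m
seekable xs = Tap $ \(Sum n) -> case drop n xs of
  []       -> pure (Nothing, seekable [])
  (y : ys) -> pure (Just y, seekable ys)

seekDemo :: Monad m => Sink (Tap (Sum Int) (Maybe Char)) m [Maybe Char]
seekDemo = do
  a <- consume       -- no pending request: 'a'
  request (Sum 2)
  request (Sum 1)    -- accumulated: skip 3 in total
  b <- consume       -- skips 'b', 'c', 'd': 'e'
  c <- consume       -- 'f'
  pure [a, b, c]
```

On `seekable "abcdefg"`, `seekDemo` returns `[Just 'a', Just 'e', Just 'f']`: the two requests are accumulated by the Sum monoid and delivered with a single pull.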

Resource management

conduit has a unique mechanism for resource management which makes it a genuinely practical library: you can attach a finaliser to a stream producer.

addCleanup :: Monad m => (Bool -> m ()) -> ConduitM i o m r -> ConduitM i o m r

In drinkery, you give the request type a CloseRequest instance so that a tap can be finalised. (+&), a specialised version of runSink, then closes the tap as soon as the drinker finishes.

class CloseRequest a where
  -- | A value representing a close request
  closeRequest :: a

instance CloseRequest r => Closable (Tap r s)

(+&) :: (Closable tap, Monad m) => tap m -> Sink tap m a -> m a
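The mechanism can be sketched without the Closable class by specialising (+&) to Tap: run the drinker, then send closeRequest to whatever remains of the tap. The Any instance (Any True meaning "please close") and the finalising source below are hypothetical choices for illustration, not instances shipped with drinkery.

```haskell
import Control.Monad (replicateM)
import Data.IORef
import Data.Monoid (Any (..))

class CloseRequest a where
  -- | A value representing a close request
  closeRequest :: a

-- Hypothetical instance: Any True means "close".
instance CloseRequest Any where
  closeRequest = Any True

newtype Tap r s m = Tap { unTap :: r -> m (s, Tap r s m) }

newtype Sink t m a = Sink { runSink :: t m -> m (a, t m) }

instance Functor m => Functor (Sink t m) where
  fmap f (Sink g) = Sink $ \t -> (\(a, t') -> (f a, t')) <$> g t

instance Monad m => Applicative (Sink t m) where
  pure a = Sink $ \t -> pure (a, t)
  sf <*> sa = sf >>= \f -> fmap f sa

instance Monad m => Monad (Sink t m) where
  Sink g >>= k = Sink $ \t -> g t >>= \(a, t') -> runSink (k a) t'

consume :: (Monoid r, Monad m) => Sink (Tap r s) m s
consume = Sink $ \t -> unTap t mempty

-- Simplified (+&) for Tap only: run the drinker, then close the rest.
(+&) :: (CloseRequest r, Monad m) => Tap r s m -> Sink (Tap r s) m a -> m a
tap +& sink = do
  (a, rest) <- runSink sink tap
  _ <- unTap rest closeRequest
  pure a

-- A counting source that runs its finaliser when asked to close.
countWithFinaliser :: IORef Bool -> Int -> Tap Any (Maybe Int) IO
countWithFinaliser closed n = Tap $ \(Any close) ->
  if close
    then writeIORef closed True >> pure (Nothing, countWithFinaliser closed n)
    else pure (Just n, countWithFinaliser closed (n + 1))
```

`countWithFinaliser closed 0 +& replicateM 3 consume` returns `[Just 0, Just 1, Just 2]`, and afterwards `closed` holds True because the finaliser ran right after the drinker finished.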

Performance

I benchmarked a composition of two scanning operations, scan (+) 0 D.++$ scan (+) 0, over 22100 elements.

scan-chain/drinkery/++$                  mean 1.717 ms  ( +- 104.5 μs  )
scan-chain/drinkery/$&                   mean 1.239 ms  ( +- 110.7 μs  )
scan-chain/pipes                         mean 1.210 ms  ( +- 78.40 μs  )
scan-chain/conduit                       mean 1.911 ms  ( +- 97.84 μs  )
scan-chain/machines                      mean 2.731 ms  ( +- 176.9 μs  )

The result is quite good. Note that there are two ways to compose distillers: ++$ attaches a distiller to a tap or another distiller, while $& attaches it to a drinker. The latter is faster here (1.239 ms versus 1.717 ms), close to pipes. Notably, a single scan is significantly faster than the rivals:

scan/drinkery                            mean 534.6 μs  ( +- 58.07 μs  )
scan/pipes                               mean 736.7 μs  ( +- 54.84 μs  )
scan/conduit                             mean 862.3 μs  ( +- 68.02 μs  )
scan/machines                            mean 1.352 ms  ( +- 84.82 μs  )

Conclusion

drinkery offers significantly greater flexibility without sacrificing speed. The API is not complete yet; I plan to add many more combinators in the near future.

I'm looking forward to your patronage.