
An EDSL for hard working IT programmers

Introduction

I have a problem: how do I present the few applicative and monadic combinators that I have just developed? I could present them as:

  • something for multithreaded event handling without inversion of control
  • something for parallelization of processes: async without the wait
  • for automatic thread control
  • for alternative and applicative composition of parallel IO actions
  • for indeterminism and asynchronicity effects
  • for high level programming at the specification level
  • for creating and composing applications by means of a single expression
  • for going beyond the futures and promises of Scala and JavaScript, making them unnecessary

Too much stuff for a single article. Maybe I should split it into pieces and take more time to write something more extensive and less dense. But I'm lazy, and moreover there are only a few primitives, four or five, six with the two state combinators: async, waitEvents, spawn and react, plus getSData and setSData. No new operators. Breaking the article up would hide the big picture and would not display the beautiful unity of the common solution.

  • If you are interested in how the idea came about read the next paragraph
  • If you are interested in the internals, read the section "Enter the monad"
  • If you are interested in how the monad controls multiple threads, see "Implicit thread control"
  • If you are interested in examples read from "Example" on
  • If you are interested in Async, promises and futures, read "Beyond futures and promises"
  • If you are interested in running the examples read "Composition of Programs"
  • If you are interested in de-inverting the control of callbacks see "deinverting callbacks"

The problem: parallelization, concurrency and inversion of control

Suppose that I have a blocking computation that returns data when something happens. It may also be a long-running computation that blocks the thread for a time:

     receive :: IO a

I can use it as such, but it blocks. I cannot use it in a context where other events are firing, so I must create a thread for each blocking IO call. All of these threads probably modify a central state; otherwise there would be no communication of data among threads. Alternatively, someone may have created a framework for this particular problem, where these blocking calls are managed. It may be a GUI toolkit, a Web application framework, a browser environment, a library for managing an ATM, etc. In any case, what the programmer sees is a set of blocking synchronous calls and/or a set of callbacks or handlers that he has to program and configure in the framework.

In the first case, blocking IO creates the need to resort to manual thread management and concurrency. That means that the code is split into parallel and concurrent chunks which are hard to code and debug. In the second case I have non-blocking IO, since the thread management is done implicitly by the framework, but, on the other hand, I have to split the program logic into disconnected chunks. As a result, the program logic is very hard to grasp. This is known as callback hell, a consequence of the inversion of control.

The OOP non-solution half solution

The second scenario appears when threading is managed by a framework. Essentially it is the same case: in both cases we end up with disconnected chunks of code and a mutable state. The standard way to manage this messy central state has been to divide it into smaller states and encapsulate them together with the methods that modify and serve the state. This is the Object Oriented Programming solution. The first OOP languages were created for managing events (SIMULA) and mouse events in an interactive GUI (Smalltalk).

Object-oriented programming is an exceptionally bad idea which could only have originated in California. - Dijkstra

Object Oriented Programming naturally fits with this inversion of control that pervades IT problems. Whenever there is more than one asynchronous input, there is multitasking or inversion of control with state. The solution is a state machine. What OOP does is to split this state machine into smaller state machines called objects that interact with each other. But that implies the need to "deconstruct" the specifications.

Deconstruct the specification recipe considered harmful

Usually the specification of something that must be done is naturally expressed as if this "something" is a process, in the third person active perspective. People connect mentally the relevant elements directly, without concern for other secondary problems.

And there may be many of these intermediate elements. In a recipe people say: "you must fry the eggs". You don't say "there is fire and there are eggs, you start the fire and the eggs will be fried by the fire". A complete description in terms of active and passive elements gives prominence to low-level elements that you are not interested in when writing a specification.

But in OOP developments, these elements must appear in the form of objects. Creating an OOP solution implies the deconstruction of the specification recipe into multiple third person passive perspectives, one for each class or object that alternately acts as a passive and an active element.

In OOP, you cannot create a function or method called fryTheEggs that stops the fire when the eggs are fried without blocking the execution of everything else. So you need to explicitly manage at least two threads that sooner or later will have to communicate asynchronously. Alternatively, you may define start-fire and stop-fire in the fire class, and a callback called egg-fried, and wire up the application with these elements.

There is no way to express fry-the-eggs in a single self contained expression that you can reuse. As a consequence, OOP can not produce composable programs. In any case, in OOP, the programs are made of disconnected pieces, the software does not follow the natural flow as it would have been naturally extracted from the specifications and low level details emerge at top level. The resulting code is hard to maintain.

The application or service created with this deconstruction is not composable. There is no way to insert your service within something bigger with a single invocation, even with objects that you fully control, since an object, by definition, is a box with many connectors, so it cannot be assembled in a pipeline. This results in scarce reusability of the software and the need for profuse documentation. It is inelegant, buggy and hard to maintain, and it permits a huge number of arbitrary alternatives in the design space, which aggravates the problems mentioned. It may be thought that this is good for the IT business, because it justifies big IT department budgets, but it is not good in the medium to long term. There are better ways to do it.

In contrast, a true functional solution closely follows the user specification, because the very elements of the problem that the user manages should be first class in the program. There is an algebra in which each individual top-level element of the user specification is a term in an equation. Therefore, reusability and composability are the natural consequence. That algebra is instantiated in an embedded domain-specific language (EDSL).

But there are reasons why functional programs are not composable either. The main obstacle to composability in functional languages is asynchronous input. In the past a good deal of effort went into using continuations to deal with it, but lately they have been abandoned due to the influx of OOP programmers into the functional arena.

What we need

Simplicity is prerequisite for reliability. - Dijkstra

The application must be programmed following the natural flow defined in the specification. The code must neither split the specification into explicit parallel tasks nor invert the control and deconstruct the specification into objects. The design space must be limited, so that everyone programs the same specification in the same way and other people's code can be grasped immediately without the aid of external documentation. The application must transport user-defined state that can be inspected and updated, added and deleted, but this state must be instrumental. It should not be the center, because the center is the process described in the specification.

We need an EDSL for hardworking IT programmers who use Java, JavaScript, Scala, C#, PHP, Ruby or Python and don't know Haskell. They need to experience an immediate and huge advantage from using Haskell. Not a monad stack but a simple monad, no more complex to use than IO, that may liberate them from the Oppressive Object Paradigm, or OOP inversion of control, without forcing them to sacrifice time and effort to the gods of Category Theory. It should come with applicative and alternative combinators and a few primitives for implicit parallelization and thread control and for de-inversion of callbacks in the IO monad, plus user-defined state management and early termination.

What we need is a software connector that works like a hardware serial bus. The hardware designers invented the serial bus for the same problem. Their chips had many more pins than software objects have methods, so connecting them directly was impossible. For that purpose they invented the serial bus, which receives injected signals at different points. What would a connector look like for different elements that inject events from GUI widgets, asynchronous responses, callbacks from frameworks and hardware interrupts?

Simplicity is a great virtue but it requires hard work to achieve it and education to appreciate it. And to make matters worse: complexity sells. - Dijkstra

Enter the monad

A monad with the asynchronicity effect can rescue the industry from the inversion of control trap for which OOP was originally designed, while allowing implicit parallelization and thread control. An entire application can be coded in a single monadic expression with little or no plumbing. That allows the creation of composable applications and services of the A -> A -> A kind.

In A monad for reactive programming I defined a monad that de-inverts the control when there are different events. The Transient monad can listen for events at different points in the monadic expression. Current solutions have a single listen point for events. This single watching point has different names. At the OS level there are calls like select. GUI and client-side Web frameworks have an event loop at the lower level, but at the top level they send events to different UI elements. That kind of interface inverts the control, since the programmer has to define callbacks. The reactive solutions bubble the events up to a single listen point again, and attach an event preprocessor to it together with a single expression that acts like a big event handler.

The solution in the above-mentioned article keeps the events in the UI elements that produced them without inverting the control. Moreover, these event listeners do not block, so every event watching point is active in the monad at the same time.

The events in the above-mentioned article are injected by a simulated event loop in the state monad. This time I will show how to listen for IO computations without the help of a framework that brings events. These events may be hardware buttons, device driver inputs, requests from users, responses from databases, requests from other systems in the cloud, etc.

What we intend here is to formulate a general solution that permits coding close to the user requirement document, and that is expressive enough to code an entire application as a single monadic expression even if it involves multiple inputs and parallel executions. This expression will spawn, communicate with and kill tasks automatically whenever necessary. We will see that we can improve the readability and reduce the complexity, so we can increase the maintainability, enabling composability of entire services or applications.

Since I have to deal with dirty things like blocking, threads and IO, don't expect what follows to be a walk in the Platonic realm. I start with a monad like the Transient monad, which can be stopped with empty and continued with runCont cont, where cont is the continuation context obtained with getCont (explanation below).

    data Transient m x= Transient  {runTrans :: m (Maybe x)}

    data EventF  = forall a b . EventF
             {xcomp :: (TransientIO a)
             ,fcomp :: [a -> TransientIO b]
             , ... other ....}

    type StateIO= StateT EventF  IO

    type TransientIO= Transient StateIO
    
    instance Monad TransientIO where
        return x = Transient $ return $ Just x
        x >>= f = Transient $ do
            cont <- setEventCont x  f
            mk <- runTrans x
            resetEventCont cont
            case mk of
                Just k  -> runTrans $ f k
                Nothing -> return Nothing

    instance Applicative TransientIO where
        pure a  = Transient  .  return $ Just a
        Transient f <*> Transient g= Transient $ do
            k <- f
            x <- g
            return $  k <*> x

    instance  Alternative TransientIO where
        empty= Transient $ return  Nothing
        Transient f <|> Transient g= Transient $ do
            k <- f
            x <- g
            return $  k <|> x

    getCont ::(MonadState EventF  m) => m EventF
    getCont = get

    runCont :: EventF -> StateIO ()
    runCont (EventF  x fs ...)= do runIt x (unsafeCoerce fs); return ()
      where
      runIt x fs= runTrans $  x >>= compose fs

      compose []= const empty
      compose (f: fs)= \x -> f x >>= compose fs

For a view of how this monad has evolved look at the first article A monad for reactive programming part 1 where I present a simpler version of this monad that has some shortcomings. In the second part I solved these shortcomings. I think that this is the best way to understand it.

What this monad does is to store the closure x and the continuations f in the state. getCont captures the execution state at that point and runCont executes it.

As the term "continuation" is used here, there may be more than one of them.

For example, in this expression:

 x0 >>=((x >>= f1) >>= f2) >>= f3

for the closure generated at the execution point x, the continuations are

 f1 >>= f2 >>= f3

And the closure is the result of the execution of x0 >>= x

What setEventCont and resetEventCont do is to compose the list of continuations (one for each nested expression) in a 'flattened' representation, as a list in fcomp. Since the list does not "know" that the types of the continuations match, I have to erase the types using unsafeCoerce.
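The type erasure can be tried in isolation. The following is a minimal sketch, not the library code: it works over Maybe instead of TransientIO, and the names Erased, erase, composeErased and runChain are hypothetical. It also ends the chain with Just rather than empty so that the final value is visible.

```haskell
import GHC.Exts (Any)
import Unsafe.Coerce (unsafeCoerce)

-- A continuation whose argument and result types have been erased.
type Erased = Any -> Maybe Any

-- Forget the types of one continuation (hypothetical helper).
erase :: (a -> Maybe b) -> Erased
erase = unsafeCoerce

-- Chain the erased continuations left to right, in the way compose
-- walks the fcomp list. The empty list ends with Just here.
composeErased :: [Erased] -> Erased
composeErased []       = Just
composeErased (f : fs) = \x -> f x >>= composeErased fs

-- Run the chain; the caller vouches that the types line up.
runChain :: a -> [Erased] -> Maybe c
runChain x fs = unsafeCoerce (composeErased fs (unsafeCoerce x))

demo :: Maybe String
demo = runChain (3 :: Int)
  [ erase (\n -> Just (n + 1 :: Int))
  , erase (\n -> Just (show (n :: Int)))
  ]
```

Nothing checks that the result type of one element matches the argument type of the next. That invariant has to be maintained by whoever builds the list, which in the article is the bind operator itself.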

Each level is recursive. That means that if I have:

do
  (a >> b) <|> (c >> d)
  e

When the expression is executed, the closure is (a >> b) <|> (c >> d) and the continuation is e.

When a >> b is executed, setEventCont puts a as the closure and b >> e as the continuation. If a has a statement that uses the continuation mechanism, for example async (see below), it will execute a >> b >> e.

But look at how the Alternative operator is defined:

      f <|>  g = Transient $ do
                  k <- runTrans f
                  x <- runTrans g
                  return $  k <|> x

where f is a >> b and g is c >> d. Both operands are executed. When c is executed, d >> e is the continuation. So if a and c receive different events (see below), they execute their respective continuations, which have e in common.
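To see that both operands of <|> really run, here is a small self-contained sketch. It is an illustration under simplifying assumptions: T is a stand-in for Transient over plain IO, without the continuation state, and step is a hypothetical helper that logs its name.

```haskell
import Control.Applicative (Alternative (..))
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- A stripped-down stand-in for Transient: IO that may stop with Nothing.
newtype T a = T { runT :: IO (Maybe a) }

instance Functor T where
  fmap f (T m) = T (fmap (fmap f) m)

instance Applicative T where
  pure = T . pure . Just
  T f <*> T g = T $ do
    k <- f
    x <- g
    pure (k <*> x)

instance Alternative T where
  empty = T (pure Nothing)
  T f <|> T g = T $ do
    k <- f          -- the left operand always runs
    x <- g          -- and so does the right one
    pure (k <|> x)  -- only the choice of the result is left-biased

-- Record a step in a log and succeed (hypothetical helper).
step :: IORef [String] -> String -> T ()
step ref name = T $ do
  modifyIORef ref (++ [name])
  pure (Just ())

demo :: IO (Maybe (), [String])
demo = do
  ref <- newIORef []
  r <- runT ((step ref "a" *> step ref "b") <|> (step ref "c" *> step ref "d"))
  logged <- readIORef ref
  pure (r, logged)
```

The log ends up containing all four steps, even though the left branch already succeeded. In the real monad this is what lets two event sources on either side of <|> be installed at the same time.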

Parallelization

With these three primitives, getCont, runCont and empty, I will define an async primitive that runs a blocking IO action in a new thread and executes the continuation in that thread when something is received:

    buffer :: MVar Dynamic
    buffer= unsafePerformIO newEmptyMVar

    async :: Typeable a => IO a -> TransientIO a
    async receive =  do
      cont <- getCont
      r <- liftIO $ tryTakeMVar buffer

      case r of

        Nothing ->do
           liftIO . forkIO $  do
              r <- receive
              putMVar buffer $ toDyn r
              -- runCont lives in StateIO, so run it down to IO in the new thread
              runStateT (runCont cont) cont
              return ()
           empty

        Just r -> return $ fromDyn r (error "async: unexpected type in buffer")

Essentially, async gets the continuation and then inspects the buffer. If there is Nothing, it spawns receive in a new thread and the current thread finishes (empty). When something arrives, it is put in the buffer, and runCont then continues at the beginning of async in the new thread. It does so because getCont got the Transient continuation there. This time there is something in the buffer and async returns it, so the procedure continues after the event arrives, but in a new thread.

Note that receive only fills the buffer. When runCont executes the closure, it inspects the buffer again. This time there is something, so the closure succeeds and the continuation fires.

getCont and runCont are similar to setjmp and longjmp in C. Moreover, the mechanism is not very different from how the IO scheduler works in GHC or in any operating system. But this time it runs at the application level rather than at the GHC level.
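The two-pass behaviour (first pass: buffer empty, fork and stop; second pass: buffer full, continue) can be modelled without the monad. This is a minimal sketch under stated assumptions: asyncModel is a hypothetical name, the continuation is passed explicitly instead of being captured with getCont, and the recursive call plays the role of runCont.

```haskell
import Control.Concurrent (forkIO, myThreadId)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar, tryTakeMVar)
import Control.Monad (void)

-- buffer:  where the received value is parked
-- receive: the blocking IO action
-- k:       everything that comes after the async statement
asyncModel :: MVar a -> IO a -> (a -> IO ()) -> IO ()
asyncModel buffer receive k = do
  r <- tryTakeMVar buffer
  case r of
    Just x  -> k x                    -- second pass: value buffered, continue
    Nothing -> void . forkIO $ do     -- first pass: spawn, then stop this thread's work
      x <- receive
      putMVar buffer x
      asyncModel buffer receive k     -- re-execute from the capture point

-- Returns whether the continuation ran in a different thread, and its result.
demo :: IO (Bool, Int)
demo = do
  buffer <- newEmptyMVar
  done <- newEmptyMVar
  caller <- myThreadId
  asyncModel buffer (return 21) $ \n -> do
    me <- myThreadId
    putMVar done (me /= caller, n * 2)
  takeMVar done
```

The call to asyncModel returns to the caller at once. The continuation runs later, in the forked thread, which is the behaviour that makes a stay-like wait necessary at the end of main.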

Wait for events

If we want to trigger the continuation repeatedly whenever something is received by receive, it is a matter of adding a loop to the Nothing branch. Then the continuation will be called for every received event.

Let's call this variant waitEvents:

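    waitEvents :: Typeable a => IO a -> TransientIO a
    waitEvents receive =  do
      cont <- getCont
      r <- liftIO $ tryTakeMVar buffer

      case r of

        Nothing ->do
           liftIO . forkIO $ loop $ do
              r <- receive
              putMVar buffer $ toDyn r
              -- runCont lives in StateIO, so run it down to IO in the new thread
              runStateT (runCont cont) cont
              return ()
           empty

        Just r -> return $ fromDyn r (error "waitEvents: unexpected type in buffer")

      where
      loop x= x >> loop x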

Example

This program will say hello to every name entered.

    runTransient :: TransientIO x -> IO (Maybe x, EventF)
    runTransient t= runStateT (runTrans t) eventf0
    
    main= do
        runTransient $ do
          name <- waitEvents getLine
          liftIO $ putStrLn $ "hello "++ name
        stay

Note that there is no loop. waitEvents installs getLine at the start of a process that executes the continuation, that is, whatever comes after getLine, for each entry. The loop is internal to waitEvents.

Here runTransient executes a transient computation down to the IO monad.

stay is whatever keeps the console application from exiting. It is needed because the transient branch that waits for events is non-blocking, so it would finish immediately. After async or waitEvents, the current thread dies and the rest of the monadic computation runs in a different thread.

Implicit thread control

Since every event-watching point in the monadic computation is active and triggers the continuation of the monad at that point, the monadic expression is multithreaded and non-deterministic.

How to control the threads? It is natural to think that, since waitEvents and async execute continuations within the monadic expression, once something happens in a statement its continuations must be invalidated.

That means that whenever async or waitEvents receives something, the threads that are running below must be killed. Then this statement, with the new buffered input, executes its closure and rebuilds the continuation again.

This is the natural thread management that I implemented. I do not detail the modifications necessary for waitEvents to permit this behaviour. It is a matter of keeping in the state the list of spawned threads, so that each waitEvents has the information about all the threads that are triggered after it. Additionally, this list also contains a buffer for each of these threads.

In this example:

    main= do
        runTransient $ do
          waitEvents watchReset <|> return ()
          name <- waitEvents getLine
          liftIO $ putStrLn $ "hello "++ name
        stay

The return () composed with the alternative operator <|> bypasses the wait for the reset event immediately, but as soon as reset is pressed, all the event handlers spawned after it are killed. They are then spawned again immediately.

This is a slightly different version:

    main= do
        runTransient $ do
          r <- (waitEvents watchStop >> return True) <|> return False
          if r then liftIO $ putStrLn "STOP" else do
             name <- waitEvents getLine
             liftIO $ putStrLn $ "hello "++ name
        stay

In this case the program is stopped and not re-spawned when watchStop is activated, since now the branch of the monad that is executed is different. It prints the stop message and finishes.
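The bookkeeping behind this thread control, a list of spawned threads that is killed when an earlier statement receives a new event, might look like the sketch below. It only illustrates the idea and is not the code in Base.hs: Children, newChildren, spawnChild and killChildren are hypothetical names, and the per-thread buffers mentioned above are left out.

```haskell
import Control.Concurrent (ThreadId, forkFinally, killThread, threadDelay)
import Control.Concurrent.MVar
import Control.Monad (forever, replicateM_)

-- Threads spawned "below" one event-watching statement.
newtype Children = Children (MVar [ThreadId])

newChildren :: IO Children
newChildren = Children <$> newMVar []

-- Fork a handler and remember it; `ended` is signalled when it terminates.
spawnChild :: Children -> MVar () -> IO () -> IO ()
spawnChild (Children ref) ended act = do
  tid <- forkFinally act (\_ -> putMVar ended ())
  modifyMVar_ ref (return . (tid :))

-- A new event arrived upstream: invalidate everything spawned after it.
killChildren :: Children -> IO Int
killChildren (Children ref) = do
  tids <- swapMVar ref []
  mapM_ killThread tids
  return (length tids)

demo :: IO Int
demo = do
  children <- newChildren
  ended <- newEmptyMVar
  let idle = forever (threadDelay 1000000)
  replicateM_ 2 (spawnChild children ended idle)
  n <- killChildren children
  replicateM_ n (takeMVar ended)   -- wait until both handlers are really gone
  return n
```

After killChildren the list is empty again, so the statement that received the event can re-run its continuation and register the freshly spawned handlers.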

Non blocking IO

Let's create a non-blocking keyboard input primitive called option. At the same time this is a good example of inter-thread communication within the Transient monad:

    option :: (Typeable a, Show a, Read a, Eq a) =>
               a -> [Char] -> TransientIO a
    option ret message= do
      liftIO $ putStrLn $ message++"("++show ret++")"
      waitEvents getLine'
      where
      getLine'=  do
       atomically $ do
         mr <- readTVar getLineRef
         case mr of
           Nothing -> retry
           Just r ->
              case readsPrec 0 r of
                [] -> retry
                (s,_):_ -> if ret== s
                     then do
                       writeTVar  getLineRef Nothing
                       return ret
                     else retry

    getLineRef :: TVar (Maybe String)
    getLineRef= unsafePerformIO $ newTVarIO Nothing

    -- returns True when the user enters "end"
    inputLoop :: IO Bool
    inputLoop=  do
           r<- getLine
           if r=="end" then return True else do
              atomically . writeTVar  getLineRef $ Just r
              inputLoop

Applicative and Alternative combinators

option reads the standard input in non-blocking mode, so many options can be combined using applicative or alternative operators. option shows a message and waits for inputLoop to supply an input line. If the line matches the option, it returns the value. If it does not match, it fails with empty, but the loop in waitEvents re-executes getLine' for this option. In this way, the options are continuously watching the input. Note that more than one option can be triggered simultaneously, each in a different thread.

inputLoop is started by async. It waits for input and exposes it in a TVar to all the running getLine' processes (one per option). If the user enters "end", inputLoop returns and async kills all the watching threads below.

    main= do
       runTransient choose
       stay

    choose :: TransientIO()
    choose= do
       r <- async inputLoop <|> return False
       case r of
         True -> return ()
         False-> do
           r <-  option (1 :: Int) "red"  <|> option 2 "green" <|> option 3 "blue"
           liftIO $ print r

The above program will print repeatedly the option chosen. We see that option is composable using the alternative operator.
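The TVar handshake between inputLoop and the options can be tried on its own with the stm package. This is a simplified sketch: watch is a hypothetical stand-in for getLine' that compares plain strings instead of using Read, and demo plays the part of inputLoop by writing a single line.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM

-- Block until the shared line equals `wanted`, then consume it.
watch :: TVar (Maybe String) -> String -> IO String
watch ref wanted = atomically $ do
  mr <- readTVar ref
  case mr of
    Just r | r == wanted -> do
      writeTVar ref Nothing   -- consume the line so that it fires only once
      return r
    _ -> retry                -- sleep until the TVar changes, then re-check

demo :: IO String
demo = do
  ref <- newTVarIO Nothing
  out <- newEmptyMVar
  -- one watcher thread per option
  mapM_ (\c -> forkIO (watch ref c >>= putMVar out)) ["red", "green", "blue"]
  atomically (writeTVar ref (Just "green"))   -- what inputLoop does per line
  takeMVar out
```

Only the watcher whose value matches wakes up for good. The other two re-check the TVar, call retry again and go back to sleep, so no polling loop is needed.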

Now let's create another event generator: a number is sent every second, while two options are waiting for keyboard input:

    data Option= Option String  String | Number Int deriving Show

    choose= do
       r <-  ( Option <$> option "1" "red"  <*> option "2" "green")
         <|> ( Number <$> waitEvents  waitnumber )
       liftIO $ putStrLn $ "result=" ++ show r

      where
      waitnumber= do
        threadDelay 1000000
        return 42

Applicative and alternative combinators can be used fully. The Applicative waits for both events to be triggered so that there is data in their respective buffers. waitnumber produces an event each second.

Each option runs a different waitEvents in a different thread, but each one of them executes the same closure (xcomp), which is the whole applicative expression. The three have a TVar waiting for new input. They fill their respective buffers when they validate. The thread that fills the last buffer succeeds and executes the continuation. The other two fail, but stay ready for the next input, since option uses waitEvents, which has a loop.

Beyond futures and promises

Scala Futures and the Haskell library async use placeholders that receive the result. These placeholders can be used instead of the result of the computation, but some waiting operation must be put somewhere, for example at the end of the chain of statements that operate on the future.

Scala Futures use them in nice chains of multithreaded lists that can be transformed in the style of map-reduce.

In this sense they are similar to JavaScript promises, which chain code with then, but the latter do not perform multiple tasks as Scala futures do.

For some needs, Scala and JavaScript must use callbacks, since the constraints of their frameworks do not allow enough flexibility. Futures and promises force the programmer into a computation model different from that of the native language. In the case of Scala it is mostly monoidal. In the case of JavaScript it is a restricted form of the bind operation.

This library puts the continuation code at the end of the receiving pipeline and parallelizes the execution, but the continuation is the plain code that comes after the receive call in the monadic expression, so there is no restriction on what can be done.

async can be used for any process that we want to parallelize using applicative notation. This program sums the number of words in the Google and Haskell home pages in parallel, using Network.HTTP:

    sum= do
       (r,r') <- (,) <$> async  (worker "http://www.haskell.org/")
                     <*> async  (worker "http://www.google.com/")

       liftIO $ putStrLn $ "result="  ++ show (r + r')

      where
      getURL= simpleHTTP . getRequest

      worker :: String -> IO Int
      worker url=do
        r <- getURL url
        body <- getResponseBody r
        return . length . words $ body

That is a complete working example. Note that, unlike in the async library, there is no wait primitive and no explicit construction for parallelization. The applicative instance does the parallelization and async also does the wait. All the processing is done by the worker in its own thread. More on that below.
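The idea that the thread finishing last carries on with the rest of the computation, with no waiting thread in between, can be sketched with plain MVars. This is an illustration only: bothThen and countWords are hypothetical names, the continuation is passed explicitly, and word counts of in-memory strings replace the HTTP downloads.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (modifyMVar, newEmptyMVar, newMVar, putMVar, takeMVar)
import Control.Monad (void)

-- Run both actions in their own threads. Whichever finishes last finds
-- both slots filled and runs the continuation k in its own thread.
bothThen :: IO a -> IO b -> ((a, b) -> IO ()) -> IO ()
bothThen ioa iob k = do
  slots <- newMVar (Nothing, Nothing)
  let finish update = do
        s <- modifyMVar slots (\old -> let new = update old in return (new, new))
        case s of
          (Just a, Just b) -> k (a, b)    -- last finisher continues
          _                -> return ()   -- earlier finisher stops here
  void . forkIO $ ioa >>= \a -> finish (\(_, b) -> (Just a, b))
  void . forkIO $ iob >>= \b -> finish (\(a, _) -> (a, Just b))

countWords :: String -> Int
countWords = length . words

demo :: IO Int
demo = do
  out <- newEmptyMVar
  bothThen (return (countWords "a b c")) (return (countWords "d e"))
           (\(r, r') -> putMVar out (r + r'))
  takeMVar out
```

The takeMVar in demo is only there so that the result can be inspected from outside. Inside the continuation nothing waits: the second worker to finish simply keeps going.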

We can also do parallel IO processing in the style of Scala futures using the Monoid instance of TransientIO. But this time, since we use continuations, futures are no longer necessary: the thread whose download finishes last is the one that continues the execution. A future, instead, would yield control to a third, main thread that coordinates the rest. In transient there is no main thread. The faster thread, the one that finishes its download first, fails (as I explained above).

    instance Monoid a => Monoid (TransientIO a) where
      mappend x y = mappend <$> x <*> y  
      mempty= return mempty

    sum= do
       rs <- foldl (<>) (return 0) $ map (async . worker)
                  [ "http://www.haskell.org/", "http://www.google.com/"]

       liftIO $ putStrLn $ "result="  ++ show rs

Since the worker returns an Int, to sum the results we need a Monoid instance for Int:

    instance Monoid Int where
      mappend= (+)
      mempty= 0
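On GHC 8.4 and later, Monoid has Semigroup as a superclass, so both instances above would also need a Semigroup instance defining <>. A way to get the same effect without declaring instances is to use the Sum and Ap wrappers from Data.Monoid (Ap requires base 4.12 or later). The sketch below is an alternative formulation, not the article's code; total and totalA are hypothetical names, and Maybe stands in for TransientIO.

```haskell
import Data.Monoid (Ap (..), Sum (..))

-- Sum gives Int an additive Monoid without declaring an orphan instance.
total :: [Int] -> Int
total = getSum . foldMap Sum

-- Ap lifts any Monoid through an Applicative, which is what the
-- hand-written Monoid (TransientIO a) instance does with <$> and <*>.
totalA :: Applicative f => [f Int] -> f Int
totalA = fmap getSum . getAp . foldMap (Ap . fmap Sum)
```

For example, totalA [Just 1, Just 2] combines the two effects and adds the results, while a Nothing anywhere in the list makes the whole result Nothing.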

Note that there is a full de-inversion of control, since the result returns to the monad. The futures and promises of Scala (or JavaScript) cannot return the execution flow to the calling procedure without some form of explicit asynchronous rendezvous with the main thread that executes the main flow, where it continues single-threaded, in imperative mode.

Since the parallel effects of the Transient monad continue in the do block, the processing of the results can continue in the monad. That permits more complex and yet clearer computations. It is not reduced to list-like processing, and the main body of the computation need not be single-threaded.

I said that in an applicative expression with async statements, the thread that finishes last computes the result and continues the rest of the computation, while the other threads stop. But in an alternative expression with async, all the threads may succeed and return a result to the do block, so all of them execute the next statements in parallel. This is the non-deterministic effect of transient that is explored in another article.

A Web Server

Here is a toy Web server:


    server=  do
       sock <-  liftIO $  listenOn $ PortNumber 80
       (h,_,_) <- spawn $ accept sock
       liftIO $ do
           hPutStr h msg
           hFlush h
           hClose h

    msg = "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"

In the current code, the primitives async, waitEvents and spawn are defined in terms of parallel, which is a generalization of the async and waitEvents explained above:

    data Loop= Once | Loop | Multithread

    waitEvents ::  IO b -> TransientIO b
    waitEvents= parallel Loop

    async  :: IO b -> TransientIO b
    async = parallel Once

    spawn= parallel Multithread

    parallel  ::  Loop ->  IO b -> TransientIO b

When parallel is called with Multithread, it spawns the continuation in a new thread for each event received, immediately, without waiting for the termination of the previous one. waitEvents executes the continuation within the same thread, so the receive method is not called again until the previous event is processed.
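The difference between the three modes can be sketched with the standard ContT transformer from the transformers package. This swaps in a different technique from the one the article uses (ContT instead of the Transient state with getCont and runCont), and parallelC is a hypothetical name, but the threading behaviour of each mode follows the description above.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forever, replicateM, void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Cont (ContT (..), evalContT)
import Data.List (sort)

data Loop = Once | Loop | Multithread

-- k is the rest of the do block after the parallelC statement.
parallelC :: Loop -> IO a -> ContT () IO a
parallelC mode receive = ContT $ \k -> void . forkIO $
  case mode of
    Once        -> receive >>= k                            -- one event, one run
    Loop        -> forever (receive >>= k)                  -- next receive waits for k
    Multithread -> forever (receive >>= void . forkIO . k)  -- k gets its own thread

-- Feeds three events and collects three results, so call it only
-- with Loop or Multithread (Once would produce a single result).
demo :: Loop -> IO [Int]
demo mode = do
  events <- newChan
  out <- newChan
  evalContT $ do
    n <- parallelC mode (readChan events)
    liftIO $ writeChan out (n * 10)
  mapM_ (writeChan events) [1, 2, 3]
  sort <$> replicateM 3 (readChan out)
```

With Loop the results arrive in event order. With Multithread the order is not guaranteed, which is why demo sorts them before returning.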

Composition of programs (Runnable example)

We can compose any of these programs together, since none of them blocks and the automatic thread control applies gracefully to all of the elements. This program combines the above programs and some others.

The combination in this case is using the alternative operator:

colors <|> app  <|> sum1 <|> sum2 <|> server <|> menu

Since the fpcomplete environment uses ghci and it shares threads among snippets of code, I can run only one example in this article, and the composability of Transient is nice to show them all together.

To verify the multitasking, enter app and then colors. app starts an iterative counter with an applicative expression, while colors asks for an option among three of them. Both run in parallel until you enter "main", which stops both, since main is above them in the monad.



{-# START_FILE main.hs #-}


{-# LANGUAGE ScopedTypeVariables #-}

module Main where

import           Base
import           Control.Applicative
import           Control.Concurrent
import           Control.Exception
import           Control.Monad.State
import           Data.Monoid
import           System.IO.Unsafe

import           Network.HTTP

import           Network
import           System.IO

-- show

data Option= Option Int  Int | Number Int deriving Show

instance Monoid Int where
      mappend= (+)
      mempty= 0

main= do
    runTransient $ do
       async inputLoop  <|> return ()
       option "main" "to return to the main menu" <|> return ""
       liftIO $ putStrLn "MAIN MENU"
       colors <|> app  <|> sum1 <|> sum2 <|> server <|> menu

    stay

colors :: TransientIO ()
colors= do
       option "colors" "choose between three colors"
       r <-  color 1  "red"  <|> color 2 "green" <|> color 3 "blue"
       liftIO $ print r
       where
       color :: Int -> String -> TransientIO String
       color n str= option (show n) str >> return  str

app :: TransientIO ()
app= do
       option "app" "applicative expression that return a counter in 2-tuples every second"
       r <-  (,) <$>  number  <*> number
       liftIO $ putStrLn $ "result=" ++ show r

       where
       number= waitEvents $ do
          threadDelay 1000000
          n <- takeMVar counter
          putMVar counter (n+1)
          return  n

       counter=unsafePerformIO $ newMVar (0 :: Int)

sum1 :: TransientIO ()
sum1= do
       option "sum1" "access to two web pages concurrently and sum the number of words using Applicative"
       (r,r') <- (,) <$> async  (worker "http://www.haskell.org/")
                     <*> async  (worker "http://www.google.com/")

       liftIO $ putStrLn $ "result="  ++ show (r + r')

getURL= simpleHTTP . getRequest

worker :: String -> IO Int
worker url=do
      r <- getURL url
      body <- getResponseBody r
      putStrLn $ "number of words in " ++ url ++" is: " ++ show(length (words body))
      return . length . words $ body

sum2 :: TransientIO ()
sum2= do
       option "sum2" "access to N web pages concurrently and sum the number of words using map-fold"
       rs <- foldl (<>) (return 0) $ map (async . worker)
                  [ "http://www.haskell.org/"
                  , "http://www.google.com/"]

       liftIO $ putStrLn $ "result="  ++ show rs

server :: TransientIO ()
server=  do
       option "server" "A web server in the port 8080"
       liftIO $ print "Server Started"
       sock <-  liftIO $  listenOn $ PortNumber 8080
       (h,_,_) <- spawn $ accept sock
       liftIO $ do
           hPutStr h msg
           putStrLn "new request"
           hFlush h
           hClose h
         `catch` (\(e::SomeException) -> sClose sock)

msg = "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"


menu :: TransientIO ()
menu=  do
     option "menu"  "a submenu with two options"
     colors  <|> sum2

-- / show





{-# START_FILE Base.hs #-}

-----------------------------------------------------------------------------
--
-- Module      :  Base
-- Copyright   :
-- License     :  GPL-3
--
-- Maintainer  :  agocorona@gmail.com
-- Stability   :
-- Portability :
--
-- |
--
-----------------------------------------------------------------------------
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE MultiParamTypeClasses     #-}

-- show
module Base (
module Control.Applicative,
TransientIO,
async,waitEvents, spawn,react,
runTransient,
inputLoop, option, stay,
getSData,setSData,delSData
) where
-- /show

import           Control.Applicative
import           Control.Monad.State
import           Data.Dynamic
import qualified Data.Map               as M
import           Data.Monoid
import           Debug.Trace
import           System.IO.Unsafe
import           Unsafe.Coerce
import           Control.Concurrent
import           Control.Concurrent.STM
import           Data.List
import           Data.Maybe
import           GHC.Conc
import           System.Mem.StableName

(!>) = const . id --  flip trace
infixr 0 !>

data Transient m x= Transient  {runTrans :: m (Maybe x)}
type SData= ()

type EventId= Int

data EventF  = forall a b . EventF{xcomp      :: (EventId,TransientIO a)
                                  ,fcomp      :: [a -> TransientIO b]
                                  ,mfData     :: M.Map TypeRep SData
                                  ,mfSequence :: Int
                                  ,nodeInfo   :: Maybe (P RowElem)
                                  ,row        :: P RowElem
                                  ,replay     :: Bool
                                  }

type P= MVar

(=:) :: P a  -> a -> IO()
(=:) n v= modifyMVar_ n $ const $ return v

type Buffer= Maybe ()
type NodeTuple= (EventId, ThreadId, Buffer)

type Children=  Maybe (P RowElem)

data RowElem=   Node NodeTuple |  RowList Row Children

instance Show RowElem where
  show (Node (e,_,_))= show e
  show (RowList r ch)= show ( reverse r)  ++ "->" ++ show ch

type Row = [P RowElem]

instance Eq NodeTuple where
     (i,_,_) ==  (i',_,_)= i == i'


instance Show x => Show (MVar x) where
  show  x = show (unsafePerformIO $ readMVar x)

eventf0= EventF  (-1,empty) [const $ empty] M.empty 0
         Nothing rootRef False


topNode= (-1 :: Int,unsafePerformIO $ myThreadId,False,Nothing)

rootRef :: MVar RowElem
rootRef=  unsafePerformIO $ newMVar $ RowList []  Nothing

instance MonadState EventF  TransientIO where
  get=  Transient $ get >>= return . Just
  put x= Transient $ put x >> return (Just ())


type TransientIO= Transient StateIO

type StateIO= StateT EventF  IO



runTransient :: TransientIO x -> IO (Maybe x, EventF)
runTransient t= runStateT (runTrans t) eventf0


newRow :: MonadIO m => m (P RowElem)
newRow= liftIO $ newMVar $ RowList [] Nothing

setEventCont ::   TransientIO a -> (a -> TransientIO b) -> StateIO EventF
setEventCont x f  = do
   st@(EventF   _ fs d _ es ro r)  <- get
   n <- if replay st then return $ mfSequence st
     else  liftIO $ readMVar refSequence
   ro' <- newRow
   ro `eat` ro'
   put $ EventF   (n,x) ( f: unsafeCoerce fs) d n es ro' r !> ("stored " ++ show n)
   return st

eat ro ro'= liftIO $
 modifyMVar_  ro $ \(RowList es t) -> return $ RowList (ro':es) t

resetEventCont (EventF x fs _ _ _ _ _)=do
   st@(EventF   _ _ d  n es ro r )  <- get
   put $ EventF  x fs d n es ro r


getCont ::(MonadState EventF  m) => m EventF
getCont = get

runCont :: EventF -> StateIO ()
runCont (EventF  (i,x) fs _ _ _ _ _)= do runIt i x (unsafeCoerce fs); return ()
   where
   runIt i x fs= runTrans $ do
         st <- get
         put st{mfSequence=i}
         r <- x
         put st
         compose fs r

compose []= const empty
compose (f: fs)= \x -> f x >>= compose fs


instance   Functor TransientIO where
  fmap f x=   Transient $ fmap (fmap f) $ runTrans x --


instance Applicative TransientIO where
  pure a  = Transient  .  return $ Just a
  Transient f <*> Transient g= Transient $ do
       k <- f
       x <- g
       return $  k <*> x

instance  Alternative TransientIO where
  empty= Transient $ return  Nothing
  Transient f <|> Transient g= Transient $ do
       k <- f
       x <- g
       return $  k <|> x


-- | a synonym of empty that can be used in a monadic expression. It stops the
-- computation
stop :: TransientIO a
stop= Control.Applicative.empty

instance Monoid a => Monoid (TransientIO a) where
  mappend x y = mappend <$> x <*> y
  mempty= return mempty

instance Monad TransientIO where
      return x = Transient $ return $ Just x
      x >>= f  = Transient $ do
        cont <- setEventCont x  f
        mk <- runTrans x
        resetEventCont cont
        case mk of
           Just k  -> do addRow' !> "ADDROW" ; runTrans $ f k

           Nothing -> return Nothing

        where
        addRow'= do
            r <- gets row
            n <- addRow r
            modify $ \s -> s{row= n}
addRow r=
            liftIO $ do
              n <- newMVar $ RowList [] Nothing
              modifyMVar_ r $ \(RowList ns ch) -> do
                case ch of
                  Just x -> error $ "children not empty: "++ show x
                  Nothing ->  return $ RowList  ns $ Just n
              return n



instance MonadTrans (Transient ) where
  lift mx = Transient $ mx >>= return . Just

instance MonadIO TransientIO where
  liftIO = lift . liftIO --     let x= liftIO io in x `seq` lift x



-- | Get the session data of the desired type if there is any.
getSessionData ::  (MonadState EventF m,Typeable a) =>  m (Maybe a)
getSessionData =  resp where
 resp= gets mfData >>= \list  ->
    case M.lookup ( typeOf $ typeResp resp ) list of
      Just x  -> return . Just $ unsafeCoerce x
      Nothing -> return $ Nothing
 typeResp :: m (Maybe x) -> x
 typeResp= undefined

-- | getSessionData specialized for the Transient monad. If there is no data of the
-- requested type in the session, the monadic computation does not continue:
-- getSData then behaves like empty.
getSData :: MonadState EventF m => Typeable a =>Transient m  a
getSData= Transient getSessionData


-- | setSessionData ::  (StateType m ~ MFlowState, Typeable a) => a -> m ()
setSessionData  x=
  modify $ \st -> st{mfData= M.insert  (typeOf x ) (unsafeCoerce x) (mfData st)}

-- | a shorter name for setSessionData
setSData ::  ( MonadState EventF m,Typeable a) => a -> m ()
setSData= setSessionData

delSessionData x=
  modify $ \st -> st{mfData= M.delete (typeOf x ) (mfData st)}

delSData :: ( MonadState EventF m,Typeable a) => a -> m ()
delSData= delSessionData


----

genNewId :: MonadIO m => MonadState EventF m =>  m Int
genNewId=  do
      st <- get
      case replay st of
        True -> do
          let n= mfSequence st
          put $ st{mfSequence= n+1}
          return n
        False -> liftIO $
          modifyMVar refSequence $ \n -> return (n+1,n)

refSequence :: MVar Int
refSequence= unsafePerformIO $ newMVar 0


--- IO events

--buffers :: IORef [(EventId,Dynamic)]
--buffers= unsafePerformIO $ newIORef []

data Loop= Once | Loop | Multithread deriving Eq

waitEvents ::  IO b -> TransientIO b
waitEvents= parallel Loop
spawn= parallel Multithread


async  :: IO b -> TransientIO b
async = parallel Once

parallel  ::  Loop ->  IO b -> TransientIO b
parallel hasloop receive =  Transient $ do

      cont <- getCont
      id <- genNewId
      let currentRow= row cont
--          mnode=  nodeInfo cont
      mnode  <-   liftIO $ lookTree id currentRow !> ("idToLook="++ show id++ " in: "++ show currentRow)

      case mnode of
        Nothing ->do
                 return () !> "NOT FOUND"
                 liftIO $ do
                   ref <- newMVar $ Node (id,undefined,Nothing)

                   modifyMVar_ (row cont) $ \(RowList ns t) -> return $  RowList (ref : ns) t
                   forkIO $ do
                     th <- myThreadId
                     modifyMVar_ ref $ \(Node(id,_,n)) -> return $ Node (id,th,Nothing)


                     loop hasloop  receive $ \r -> do

                      th <-  myThreadId
                      modifyMVar_  ref $ \(Node(i,_,_)) -> return
                                       $ Node(i,th,Just $ unsafeCoerce r)
                      case cont of
                        EventF  (i,x) f _ _ _ _ _-> do
                          mr <- runStateT  (runTrans x)
                                cont{replay= True,mfSequence=i,nodeInfo=Just ref}
                             !> "runx" !> ("mfSequence="++ show i)
                          case mr  of
                            (Nothing,_) ->return()


                            (Just r,cont') ->do

                               let row1= row cont'
                               delEvents  row1        !> ("delEvents, activated    "++ show row1)
                               id <- readMVar refSequence
                               n <-  if hasloop== Multithread then return row1 else  addRow  row1
                               runStateT (runTrans $ ( compose $ unsafeCoerce f) r)
                                       cont'{row=n,replay= False,mfSequence=id } !> ("SEQ=" ++ show(mfSequence cont'))
                               return ()
--                      delEvents children []


                   modifyMVar_ (row cont) $ \(RowList ns ch) -> return $  RowList (ref : ns) ch

                 return Nothing


        Just (node@(id',th', mrec)) -> do
          modify $ \cont -> cont{nodeInfo=Nothing}
          return $ if isJust mrec then Just $ unsafeCoerce $ fromJust mrec else Nothing

        where


        loop Once rec x  = rec >>= x
        loop Loop rec f = do
            r <- rec
            f r
            loop Loop rec f

        loop Multithread rec f = do
            r <- rec
            forkIO $ f r
            loop Multithread rec f

        lookTree :: EventId -> P RowElem -> IO (Maybe NodeTuple)
        lookTree id ref=  do
            RowList ns _<- readMVar ref
            lookList id ns



        lookList id mn= case mn of
              [] -> return Nothing
              (p:nodes) -> do
                  me <- readMVar p
                  case me of
                    Node(node@((id',_,_))) ->
                      if id== id'
                         then return $ Just node
                         else lookList id nodes
                    RowList row _ -> do
                         mx <- lookList id nodes
                         case mx of
                           Nothing -> lookList id row
                           Just x -> return $ Just x
        delEvents :: P RowElem  -> IO()
        delEvents ref = do
            RowList mevs mch <- takeMVar ref
            maybeDel mch
            putMVar ref $ RowList mevs Nothing

        maybeDel mch=  case mch of
              Nothing -> return ()
              Just p -> do
                  RowList es mch' <- readMVar p
                  delList es !> ("toDelete="++ show es)
                  maybeDel mch'


        delList es=  mapM_ del es where
          del p = readMVar p >>= del'
          del' (Node(node@(_,th,_)))= killThread th !> ("DELETING " ++ show node)
          del' (RowList l mch)= delList l >> maybeDel mch


type EventSetter eventdata response= (eventdata ->  IO response) -> IO ()
type ToReturn  response=  IO response

-- | De-invert an event handler setter. The second parameter computes the response to return each time the event handler is called.
-- It is useful whenever a framework or OS service needs to set interrupt handlers, event handlers, request handlers,
-- callbacks etc.
--
-- For example, suppose we have this onResponse callback setter for an asynchronous query response that sends data to display:
--
-- >    data Control= SendMore | NoMore
-- >
-- >    onResponse :: (Response -> IO Control) -> IO()
--
--  We can iterate over the responses and interrupt them this way:
--
-- >    rcontrol <- liftIO newEmptyMVar
-- >
-- >    resp <- react onResponse (takeMVar rcontrol)
-- >    liftIO $ display resp
-- >    r <- (option "more" "more" >> return SendMore) <|> (option "stop" "stop" >> return NoMore)
-- >    liftIO $ putMVar rcontrol r

react
  :: Typeable eventdata
  => EventSetter eventdata response
  -> ToReturn  response
  -> TransientIO eventdata

react setHandler iob= Transient $ do
        cont    <- getCont
        mEvData <- getSessionData
        case mEvData of
          Nothing -> do
            liftIO $ setHandler $ \dat ->do
--              let cont'= cont{mfData = M.insert (typeOf dat)(unsafeCoerce dat) (mfData cont)}
              runStateT (setSData dat >> runCont cont) cont
              iob
            return Nothing
          Just dat -> delSessionData dat >> return (Just  dat)



getLineRef= unsafePerformIO $ newTVarIO Nothing

-- for testing purposes
option1 x  message=  inputLoop `seq` (waitEvents  $ do
     liftIO $ putStrLn $ message++"("++show x++")"
     atomically $ do
       mr <- readTVar getLineRef
       th <- unsafeIOToSTM myThreadId
       case mr of
         Nothing -> retry
         Just r ->
            case reads1 r !> ("received " ++  show r ++  show th) of
            (s,_):_ -> if  s == x  !> ("waiting" ++ show x)
                     then do
                       writeTVar  getLineRef Nothing !>"match"
                       return s

                     else retry
            _ -> retry)
     where
     reads1 s=x where
      x= if typeOf(typeOfr x) == typeOf "" then unsafeCoerce[(s,"")] else readsPrec 0 s
      typeOfr :: [(a,String)] ->  a
      typeOfr  = undefined

option ::  String -> String -> TransientIO String
option ret message= do
    liftIO $ putStrLn $"Enter "++show ret++"\tto: " ++ message
    waitEvents  $ getLine' (==ret)
    liftIO $do putStrLn $ show ret ++ " chosen"
    return ret

getLine' :: (String->  Bool) -> IO String
getLine' cond=   inputLoop `seq` do
     atomically $ do
       mr <- readTVar getLineRef
       th <- unsafeIOToSTM myThreadId
       case mr of
         Nothing -> retry
         Just r ->
            if cond r  !> show (cond r)
                     then do
                       writeTVar  getLineRef Nothing !>"match"
                       return r

                     else retry
     where
     reads1 s=x where
      x= if typeOf(typeOfr x) == typeOf "" then unsafeCoerce[(s,"")] else readsPrec 0 s
      typeOfr :: [(a,String)] ->  a
      typeOfr  = undefined

inputLoop :: IO ()
inputLoop=  do
    putStrLn "Press end to exit"
    inputLoop'
    where
        inputLoop'= do
           r<- getLine                      !> "started inputLoop"
           if r=="end" then putMVar rexit () else do
              atomically . writeTVar  getLineRef $ Just r
              inputLoop'


rexit= unsafePerformIO newEmptyMVar

stay=  takeMVar rexit >> print "bye"

Session data

I added a type-indexed map to the state so that users can store their own session data with these primitives:

    setSData :: Typeable a => a -> TransientIO ()

    getSData :: Typeable a => TransientIO a

Session data can be used instead of a state monad transformer for each new kind of user data.
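
The idea of a type-indexed map can be sketched in isolation. This is only an illustration and not the code in Base.hs: it stores values as Dynamic rather than relying on unsafeCoerce, and the names Session, setData and getData are hypothetical.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
module Main where

import           Data.Dynamic  (Dynamic, fromDynamic, toDyn)
import qualified Data.Map      as M
import           Data.Proxy    (Proxy (..))
import           Data.Typeable (TypeRep, Typeable, typeOf, typeRep)

-- Hypothetical stand-in for the mfData field: one slot per type.
type Session = M.Map TypeRep Dynamic

-- Store a value, replacing any previous value of the same type.
setData :: Typeable a => a -> Session -> Session
setData x = M.insert (typeOf x) (toDyn x)

-- Fetch the value whose type matches the type the caller asks for.
getData :: forall a. Typeable a => Session -> Maybe a
getData s = M.lookup (typeRep (Proxy :: Proxy a)) s >>= fromDynamic

newtype UserName = UserName String deriving (Eq, Show)

main :: IO ()
main = do
  let s = setData (UserName "alice") (setData (42 :: Int) M.empty)
  print (getData s :: Maybe Int)       -- Just 42
  print (getData s :: Maybe UserName)  -- Just (UserName "alice")
  print (getData s :: Maybe Bool)      -- Nothing
```

Because the key is the type itself, each new kind of user data only needs a new type (a newtype is enough), not a new layer in a transformer stack.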

My purpose is to create a monad for general IT purposes, for lay programmers with no knowledge of monad transformers.

Since empty stops the computation but does not carry any error condition, session data can be used for that purpose:

    data Status= Error String | NoError

    fail :: String -> TransientIO a
    fail msg= setSData (Error msg) >> empty

After the execution, I can inspect the status:

    status <- getSData <|> return NoError

The alternative is necessary because, if no Status has been set, getSData fails and the computation would stop. The fallback guarantees that it continues, with NoError as the status.
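
The interplay between empty, the stored status and the fallback can be tried out with a toy model. The sketch below is an assumption, not Transient itself: it uses MaybeT over State from the transformers package as a stand-in for TransientIO (both put the Maybe inside the state, so data stored before a failure survives it), and the names Toy, setSData', getSData', failWith and statusAfter are hypothetical.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
module Main where

import           Control.Applicative       (empty, (<|>))
import           Control.Monad.Trans.Class (lift)
import           Control.Monad.Trans.Maybe (MaybeT (..))
import           Control.Monad.Trans.State (State, gets, modify, runState)
import           Data.Dynamic              (Dynamic, fromDynamic, toDyn)
import qualified Data.Map                  as M
import           Data.Maybe                (fromMaybe)
import           Data.Proxy                (Proxy (..))
import           Data.Typeable             (TypeRep, Typeable, typeOf, typeRep)

-- Toy stand-in for TransientIO: failure (Nothing) layered over state,
-- so whatever was stored before a failure is still there afterwards.
type Toy = MaybeT (State (M.Map TypeRep Dynamic))

setSData' :: Typeable a => a -> Toy ()
setSData' x = lift (modify (M.insert (typeOf x) (toDyn x)))

-- Fails (like empty) when no value of the requested type is stored.
getSData' :: forall a. Typeable a => Toy a
getSData' = MaybeT (gets (\s -> M.lookup (typeRep (Proxy :: Proxy a)) s >>= fromDynamic))

data Status = Error String | NoError deriving (Eq, Show)

-- Record the reason, then stop the computation.
failWith :: String -> Toy a
failWith msg = setSData' (Error msg) >> empty

-- Run an action, recover if it stopped, then read the status with a fallback.
statusAfter :: Toy () -> Status
statusAfter action = fromMaybe NoError (fst (runState (runMaybeT program) M.empty))
  where
    program = (action <|> return ()) >> (getSData' <|> return NoError)

main :: IO ()
main = do
  print (statusAfter (return ()))             -- NoError
  print (statusAfter (failWith "disk full"))  -- Error "disk full"
```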

De-inverting callbacks

So far so good. But what happens when, besides dealing with raw blocking IO, there is a framework that deals with some particular events, so that it starts the threads itself and expects you just to set the callbacks?

Suppose that we have this event handling setter:

     setHandlerForWhatever :: (a -> IO ()) -> IO ()

What is needed is a de-inverted call, whateverHappened, at some point of the computation (maybe at the beginning), so that the callback continues the monadic execution:

     do
        somethingToDo
        r <- whateverHappened
        doSomethingWith r
        ....

To define the de-inverted call whateverHappened we use the same trick as in async, but this time there is neither forkIO nor thread control, since the framework does that for you:

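     whateverHappened= do
        cont <- getCont
        mEvData <- Just <$> getSData <|> return Nothing
        case mEvData of
          Nothing -> do
              -- first pass: install the continuation as the handler, then stop
              liftIO $ setHandlerForWhatever $ \dat -> do
                  runStateT (setSData dat >> runCont cont) cont
                  return ()
              empty
          Just dat -> return dat   -- re-entered from the callback: deliver the event data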

Whether the framework is single-threaded or multithreaded is not important: we give it the event handlers that it needs by means of continuations.
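
The same de-inversion can be reproduced in a few lines with the standard continuation monad. This sketch swaps in ContT from the transformers package for Transient's stored continuation, so it shows the principle rather than the Base.hs mechanism; newFramework, fire and demo are hypothetical names for a pretend framework that keeps one handler and calls it on every event.

```haskell
module Main where

import Control.Monad.IO.Class   (liftIO)
import Control.Monad.Trans.Cont (ContT (..), evalContT)
import Data.IORef               (modifyIORef, newIORef, readIORef, writeIORef)

-- Hypothetical framework: it keeps one handler and calls it on each event.
-- Returns the handler setter and a function that simulates an event.
newFramework :: IO ((Int -> IO ()) -> IO (), Int -> IO ())
newFramework = do
  ref <- newIORef (\_ -> return ())
  let setHandlerForWhatever h = writeIORef ref h
      fire x = readIORef ref >>= \h -> h x
  return (setHandlerForWhatever, fire)

-- De-inverted call: everything after it in the do block becomes the callback.
whateverHappened :: ((a -> IO ()) -> IO ()) -> ContT () IO a
whateverHappened = ContT

demo :: IO [String]
demo = do
  (setHandlerForWhatever, fire) <- newFramework
  out <- newIORef []
  let say s = liftIO (modifyIORef out (s :))
  evalContT $ do
    say "setup"                                   -- runs once
    r <- whateverHappened setHandlerForWhatever   -- registers the rest as handler
    say ("got " ++ show r)                        -- runs once per event
  fire 1
  fire 2
  reverse <$> readIORef out

main :: IO ()
main = demo >>= mapM_ putStrLn
```

The code before the de-inverted call runs once, while the code after it runs every time the framework fires an event, which is the behaviour described for whateverHappened.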

To have something more general, I defined:

   type EventSetter eventdata response= (eventdata ->  IO response) -> IO ()
   type ToReturn  response=  IO response
   
   react
      :: Typeable eventdata
      => EventSetter eventdata response
      -> ToReturn  response
      -> TransientIO eventdata

The second parameter computes the value returned by the callback. So if you have a callback setter called onResponse

    data Control= SendMore | NoMore

    onResponse :: (Response -> IO Control) -> IO()

I can display all the data received while controlling the reception this way:

    rcontrol <- liftIO newEmptyMVar

    resp <- react onResponse (takeMVar rcontrol)
    liftIO $ display resp
    r <- (option "more" "more" >> return SendMore) <|> (option "stop" "stop" >> return NoMore)
    liftIO $ putMVar rcontrol r
    
    

Since react installs the whole rest of the computation as the callback, and the ToReturn expression is evaluated last, the continuation runs and sets rcontrol before the ToReturn expression is evaluated.
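
This ordering can be checked with a small single-threaded model. As before, the sketch uses ContT in place of Transient's own continuation handling, so react here is only an analogue of the real primitive. The onResponse setter is hypothetical: it feeds a fixed list of chunks to the handler and stops as soon as the handler answers NoMore.

```haskell
module Main where

import Control.Concurrent.MVar  (newEmptyMVar, putMVar, takeMVar)
import Control.Monad.IO.Class   (liftIO)
import Control.Monad.Trans.Cont (ContT (..), evalContT)
import Data.IORef               (modifyIORef, newIORef, readIORef)

data Control = SendMore | NoMore deriving (Eq, Show)

-- Hypothetical callback setter: feeds chunks to the handler one by one
-- and stops as soon as the handler answers NoMore.
onResponse :: [String] -> (String -> IO Control) -> IO ()
onResponse []       _       = return ()
onResponse (c : cs) handler = do
  ctl <- handler c
  if ctl == SendMore then onResponse cs handler else return ()

-- ContT analogue of react: run the continuation first, then compute
-- the value that the callback hands back to the framework.
react :: ((ev -> IO resp) -> IO ()) -> IO resp -> ContT () IO ev
react setHandler toReturn = ContT $ \k -> setHandler (\ev -> k ev >> toReturn)

demo :: IO [String]
demo = do
  rcontrol <- newEmptyMVar
  shown    <- newIORef []
  evalContT $ do
    resp <- react (onResponse ["a", "b", "c", "d"]) (takeMVar rcontrol)
    liftIO $ modifyIORef shown (resp :)
    -- The continuation decides; the decision is read only after it finishes.
    liftIO $ putMVar rcontrol (if resp == "c" then NoMore else SendMore)
  reverse <$> readIORef shown

main :: IO ()
main = demo >>= print   -- ["a","b","c"]
```

The chunk "d" is never delivered: the continuation stored NoMore while handling "c", and the callback read that decision afterwards.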

Note that you can reassign the callback at any moment, since react sets as callback whatever continuation follows it.

Conclusions and future work

The code is at

https://github.com/agocorona/transient

My aim is to create a family of combinators for programming in industry. As I said before, that implies no monad transformers: the simplest monad that can produce the simplest error messages.

The Haskell applicative, alternative, monoidal and monadic combinators, when applied to a monad that manages asynchronous IO, permit multithreaded programming with little plumbing, close to the specification level and with great composability. No inversion of control means no need to deconstruct the specifications and no state machines.

This, together with the uniform and composable thread management, narrows the design space and makes the application easier to understand from the requirements, which in turn reduces the cost of technical documentation and maintenance.

Note that the bulk of the programming is done in the IO monad. That is on purpose. The idea is a simple IT EDSL with the right effects, one that permits rapid and intuitive development. Additional monads can be used by running them within the IO procedures defined by the programmer, if they wish. I will add some additional effects, like backtracking to undo transactions and to produce execution traces. That will be the base of a new version of MFlow, my server-side framework and integration platform. The ability to perform rollbacks and respond to asynchronous events at the same time is important for cloud applications. Reader and writer effects for any need the programmer may have are almost trivial to implement using getSData and setSData.

Resource allocation and deallocation for file handles etc. can be done using the same strategy used for thread control, but it is more orthogonal to delegate it to the IO threads themselves. The programmer can use exceptions or monads that guarantee proper release of resources before the thread is killed.

In Multithread mode the single-entry buffer can be overrun. It is necessary either to handle this case or to assume that the receive procedure has its own buffer and its own event contention mechanism. The latter is the most orthogonal option.
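
As a minimal sketch of the second option, the receive procedure can read from a queue that it owns. The example assumes a Chan from Control.Concurrent.Chan as that buffer; events, receive and demo are hypothetical names, and no Transient code is involved. A Chan is unbounded, so it buffers events but provides no contention mechanism by itself.

```haskell
module Main where

import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad           (forM_, replicateM)

demo :: IO [Int]
demo = do
  events <- newChan
  -- The producer runs ahead of the consumer; the Chan queues every event.
  forM_ [1 .. 5] (writeChan events)
  -- An IO action like this is what one would pass to spawn or waitEvents.
  let receive = readChan events
  replicateM 5 receive

main :: IO ()
main = demo >>= print   -- [1,2,3,4,5]
```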

This is huge. I plan to create interfaces for some GUI toolkit. The GUI objects will be fully composable for the first time.

Spawning threads on other machines is the next big step. MFlow and hplayground will converge with this platform. If you want to collaborate, don't hesitate to send me a message!

With the react primitive it is possible to de-invert any framework, including the callbacks of a GUI toolkit, so the widgets can be managed with applicative and monadic combinators. hplayground does that for JavaScript callbacks and HTML forms. Since hplayground, which runs in the client, and MFlow, which runs server-side, share the same widget EDSL, and that EDSL can be ported to a GUI, an application can run in any environment, including the console.

Part II: The hardworking programmer II: practical backtracking to undo actions