Interactive code snippets not yet available for SoH 2.0, see our Status of of School of Haskell 2.0 blog post

Moving Haskell processes between nodes (Transient effects IV)

This article has been rewritten the 19/08/2015

Motivation (yours not mine)

The distributed framework that I´m developing is a completely different way to consider distributed computing in the same sense that Transient is a different way to program in Haskell or in any other language. It is so straightforward that is difficult to understand after decades of inversion of control, programming state machines, objects frameworks, routes, configurations and callbacks. If you dare to think different you will benefit from this higher level way of programming that I'm convinced that is the future.

It is the future because it is functionality oriented, not object oriented. A software functionality is a business process, that naturally compose with other business processes. A process is like physiology, while OOP is like anatomy. Functional programming is about composability, whether efectful or not. Transient is deeply functional and can express business functionalities in compact category theoretical expressions that compose. Have you seen two distributed programs that can be composed to create a wider one?

The main obstacle for such composability is events and blocking. events are asynchronous and need callbacks. Alternatively the process may avoid callback spagettization if the thread blocks and wait for events. That is what the synchronous IO call does.

And this is what Erlang does. but that impairs composability, since to handle other events it is necessary to spawn another thread. Both threads would need to communicate and so on. That is why an Erlang program is made of many small independent distributed elements. But then synchronizations and extra communications and monitorizations are necessary. That accidental complexity complicate the development and the code is again difficult to understand.

A third solution is to use continuations. But continuations are either hard to master or have a akward syntax or are hard to impossible to serialize in order to save state or to transport them. In particular, haskell has no support for the serialization of continuations. That is the reason why continuation frameworks have been not accepted neither in haskell neither in other languages, except perhaps in the lisp family where these problems are less critical. A remaining problem not solved by continuations is the composition of parallel programs.

Transient does compose arbitrary pieces of software in concurrent parallel and sequential arangements not by communicating them trough channels neither by means of special constructions or configurations, but by means of standard Haskell combinators. Therefore these compositions are checked at compilation time, even if the elements will run in different machines.

A sad story

Two years ago I created MFlow, the only web framework that uses Haskell category theoretical abstractions for the creation of the high level elements of web applications such are routes and navigations among other things. For the first time navigation elements are composable. For example:

   login >> navigateCatalog >>= shoppingCartManagement >>= payment

This is an application, assembled with four navigations. Back button and arbitrary URLs entered works fine. It has not the problems associated with continuations because it does not use continuations as such.

Besides having more features than the widely used frameworks and more innovative ones, apart from the above mentioned, MFlow has not been sucessful. What was the cause? My careless and clumsy English? My lack of attention to details? Lack of social skills? For sure.

But there are other reasons why this framework or other early continuation frameworks like WASH has not gained support; For example, the haskellers that program web apps comes from other languages and find that strange.

But there is also a fear of novelty, a reluctance to think different and a fear to do things in a different way than what is academically or industrially established.

There is also a tendency to wait and see. "mmm is interesting... let's look what happens... this guy does interesting things but nobody knows him... I will not shoot myself in the foot. I will not risk my investments; The more traditional path is more sure".

And finally, there's tradition of inversion of control that is a second nature in the programming community since the invention of OOP and the mouse. The invention of the mouse, and then a second source of input, ended the golden age of imperfect but effective composability with the console apps and the unix pipes. Simply, the unix pipes model, that composed single streams was not valid because a second stream of events was added. The solution was the state machine, divided in small pieces each one called classes and objects. That was OOP. But objects are not composable.

Later on, I created hplayground, a client side framework with the same characteristics of composability. For this purpose I created the Transient monad. Transient is a new way of using continuations and a new way of looking at the Monad instance, as a manipulation of continuations. Again, a different way to think about both concepts. The whole history is narrated in the series of articles in FPComplete.

The history repeated with hplayground sadly, since again, it is the only fully composable FRP, 100% category theoretical standard haskell for programming the shortest and cleanest code ever seen in a Web Browser.

But I don't care because this is THE future. I did not invented it. It was there and there is no better way. Perhaps someone will reinvent it in Java 14 or in Scala 10 and composable de-inverted programming gain acceptance. I will see it from Heaven.

Composable distributed computing

Having the possibility to express multithreaded programs that deal with IO in a composable way, I was persuaded that a composable form of distributed computing was possible using the non-blocking calls of transient. Composable in this context means that a distributed computing functionality can be invoked within a monadic, monoidal, applicative or alternative expression to create a wider application that are composable too.

In mathematical terms, composability means that there is a form of algebra for combining the elements of the domain problem. In this case, it is the abstract algebra that include monads, applicatives, alternatives and monoids. Composable programming is like teaching the computer to do maths without greek letters. In the same way that we combine numbers in complicated algebraic expressions -every programming language know how to do that algebra- we can teach computers to create expressions that include concurrent, parallel, sequential and distributed computations. And we can combine these expressions to create even more complex expressions.

For example, a map-reduce operation that may involve many nodes can be inserted in a monadic expression to access data that later will feed a browser application. All this programmed in a single expression of beautiful haskell code with standard combinators.

And it is possible as you may verify below. Not only because map-reduce is now doable that way, but because in the future I will integrate MFlow and hplayground, the web frameworks, with Transient.

The plan

I plan to develop various levels of distributed computing, all of them composable:

  • Level 1: basic calls for node-to-node communication: beamTo, forkTo, callTo, beamInit, listen

  • Level 2: clusted oriented computing: process execution replicated in al connected nodes: clustered, asynchronize, connect

  • level 3: location independent communications: processes are connected without mention of the hosts and ports. Process location static and stablished at configuration time.

  • level 4: location independent, processes movable from node to node depending on run-time conditions according with programmer defined strategies. (A cloud monad)

At this moment I have developed the two first levels

The repository is at:

https://github.com/agocorona/transient

Level 1: basic calls for node-node commmunication

I defined three primitives, the level 1 of the distributed framework, but at the level in which they are defined are quite powerful:

beamTo :: HostName -> PortID -> TransientIO ()
forkTo :: HostName -> PortID -> TransientIO ()
callTo :: HostName -> PortID -> TransientIO a -> TransientIO a

The second action execute a copy of the running process in the other node with the same execution state.

The first continue the execution of the monadic sequence in other node, but "destroy the original". That means that the running process continues in the other node.

The first two send the process and forget about it. The third call a procedure in the other node, and receive the result.

There are other auxiliary primitives that I will detail.

beamTo can be used for that kind of sequential process that may need to do different things in different nodes. and perhaps may return with the results. Web applications can start in the data tier, continue in the server and migrate to the browser for a long interaction, then can return to the server and so on.

forkTo may be used for example, to configure a node interactively and then clone that configuration and execution state in many nodes.

Map-reduce

callTo may be used for fast roundtrip calls and also to start subtasks in other nodes. Combined with the monoid instance of Transient, it can be used to create mapreduce calls which spawn worker task to other nodes:

rs <- foldl (<>) mempty $ map (\(h,p) -> callTo h p worker) nodes

The three primitives can be combined for complex scenarios.

NOTE: the reduce operation in this case is a fold in the receiving node. For largue volumes of data this is not realistic. The reduction should be also distributed. That is something that [I did here] (https://www.fpcomplete.com/user/agocorona/estimation-of-using-distributed-computing-streaming-transient-effects-vi-1#distributed-datasets)

forwarding events across the network

callTo is not a simple remote call by various reasons. Not only it works in de-inverted mode, it means that it does not set any callback. Instead, the result return to the routine that called it. That permit much more understandable code.

But that is not done sacrificing composability since it is also non blocking when used with applicative, alternative and monoidal expressions, since the rest of the expression does not depend on the result of callTo to continue. The map-reduce snippet would not work in parallel if not for this property.

But there is more. Since callTo uses the Transient monad, that is inherently non deterministic and multithreaded. the local node uses parallel to receive successive responses, each one in a different thread. This avoid buffer overruns and contentions. An example, running below, is as follows:

networkEvents rh rp= do
     r <- callTo rh rp $ do
                         option "fire"  "fire event"
                         return "event fired"
     putStrLnhp  rp $ r ++ " in remote node"

putStrLnhp p msg= liftIO $ putStr (show p) >> putStr " ->" >> putStrLn msg

optionis executed in the remote node, when the user write "fire" the remote node return the message to the caller node and another message appears in the local node.

since option produces an event for each "fire" entered, callTo works here as an event pipeline between the two nodes.

As an example, a single callTo can be used to interact with a client side widget or it can retrieve chunks from a query to a large database.

Level 2

The higher level cloud primitives that I am developping now are:

connect will connect a new node to a network of nodes in a Wide Area Network. It will connect to one of the nodes and all the rest will receive notifications about it.

connect :: Node -> TransientIO ()

clustered will execute a computation in all the connected nodes, like for example, to update/query a distributed database. This will not return until all computations succeed. It gets the return values and mconcat them.

clustered :: Monoid a => TransientIO a -> TransientIO a

it is the map-reduce snippet of the second paragraph, but applied to all the connected nodes. Later on, a system of capabilities will filter the nodes, so that the nodes will specialize in some calls. For example, some nodes may store data and other can process the data and respond to user requests.

asynchronize Will do the same that clustered, but asynchronously. The primitive will schedule the remote execution, but it will return immediately.

asynchronize :: TransientIO () -> TransientIO ()

connect can be expressed in terms of clustered:

connect (host,port) = do
     nodes <- callTo host port $ clustered $ do
                           updateNodeList 
                           myNode <- getMyNode
                           return [myNode]
     setNodes nodes

Since one of the nodes in the list is the local one, reentrant invocation of callTo is necessary

That's all for the moment.

distributed Chat application in four lines

Wow. Perhaps this may be too mind blowing. So much that may be anticlimactic, but this snippet is a distributed chat application. If nodes contain all the connected chat programs:

chat :: [(HostName, PortID)] -> Transient StateIO ()
chat nodes = do 
    name  <- step $ do liftIO $ putStrLn "Name?" ; input (const True)
    text  <- step $ waitEvents  $ putStr ">" >>hFlush stdout >> getLine' (const True)
    let line= name ++": "++ text 
    foldl (<>) mempty $ map (\(h,n) -> callTo h  n  . liftIO $ putStrLn line) nodes 

This is part of the executable examples below.

input is a non-blocking execute-once version of getLine. It has a parameter that is the validator. getLine' is a non-blocking version of getLine .
waitEvents is another Transient primitive that executes the continuation for each input. So it will execute the last line for each line entered with the keyboard.

The last line uses the Monoid instance of Transient to execute callTo in parallel for all the nodes.

This would also do it:

mapM_ (\(h,n) -> callTo h  n  . liftIO $ putStrLn line) nodes

But this last one does not do it in parallel, but sequentially.

How it works

This may be a case of the egg of Columbus . The migration of the program is done through logging and replaying. The first who used replaying to transport thread execution trough the network - that I may know - is Jeff Epstein using Cloud Haskell and my Workflow library. The workflow library log the intermediate results of a monadic sequence. Using the log it is possible to recover the execution state. After replaying the log, the program will be executing a closure identical to the one that was when it was interrupted.

Migration of continuations or strong mobility of the kind of beamTo was done at an early time in Haskell by Dubois et al. It is a shame that this project was not continued. Probably one of the reasons of the abandonment is related with something that is mentioned in the paper in the point 3; Moving the execution state includes everything, upto registers, stack and memory. This happens also for every language that compiles to machine code. The binary serialization of state may be huge and this cause many problems. Moreover, the source and destination architecture, and the binary program must be identical. Static closures or to be more mundane, pointers to static functions, defined at compilation time can be transmitted to invoke foreign code provided that both source and destination are two identical binary executables. That is the mechanism used in Cloud Haskell.

logging and replaying is lighter than execution state serialization and deserialization. It is also architecture independent and the effect is the same. The replaying of the log reconstruct the execution state. That includes the values of all the local variables. If the log is transferred, the state can be recovered in another node, even if it has different architecture.

It is important to understand the freedom that this mechanism gives. For example callTo is not a RPC call, as is the case in cloud haskell. The remotely executed routine has access to all the local variables (that have been logged) of the caller, so I can do this:

do 
    name <- step input
    callTo host  node  $ writeIORef refname name
    callTo host' node' $ writeIORef refname name
    ...

This example writes the value of name in the variables of each remote node and resume the execution of the calling action.

The above code is executed sequentially. To do it in parallel for all the nodes:

do 
    name <- step input
    (,) <$> callTo host node (writeIORef refname name) <*> callTo host' node' (writeIORef refname name) 

or more generally, using the monoid instance of Transient:

do 
    name <- step input
    fold (<>) mempty $ map (\(h,n) -> callTo h  n  . writeIORef $ refname name) nodes

step is the primitive that update the log and replay it. It must appear explicitly in all the previous monadic statements before the moving primitives. In this toy implementation step is not enforced, so if a monadic statement has no step, it will be executed in the receiving node. That may be intended, in order to access remote variables for example.

Logging

Logging as an effect in the Transient monad. It is implemented now in the Transient.Logged module. Currently it is only used for remote execution, but it will be used for other purposes. It has sophisticated optimizations: for example when a long action return a value, it is possible to erase his log and replace it with the value returned, so the log/replay mechanism must not store and execute each and every intermediate result but only the top level ones. But if a failure happens deeper inside this action, then the detail log of this particular action will be saved.

Implementation details

I did it using the Transient monad. It permits a much cleaner code, since it does manage state, events and threading implicitly, without breaking the specification in separate actions.

The user program executes in two ways: directly at the main procedure and when a request arrive. This is in the first line of this Transient expression.

beamInit port program=   do
    listen port  <|> return ()
    (program >> empty)  <|> close
    where
    close= do
       (h,sock) <- getSData
       liftIO $ hClose h  `catch` (\(e::SomeException) -> sClose sock)

beamInit initialize the user program in these two modes.

in the first line, listen generates a thread and continues the execution when a request arrives. the second operand, return() just executes the continuation in "normal" mode, not when a request arrive. This means that in this mode program will be executed without a log, so the statements of the programs will be executed and logged. In the other side, when listen receive a log, the program is replayed from the log.

The next sentence is the user program. At the end, if the thread was initiated by listen, the handle is closed. If there is no such data, getSData will stop the execution.

To see the execution model of Transient, see the previous articles of transient in my FPcomplete space. A Transient process can be stopped in a thread and be continued in another thread when an event arrives.

listen is defined as:

listen :: PortID -> TrasientIO ()
listen  port = do
       sock <- liftIO $ listenOn  port
       (h,_,_) <- spawn $ accept sock
       liftIO $ hSetBuffering h LineBuffering
       slog <- liftIO $ hGetLine  h
       setSData (h,sock)
       setSData $ Log True $ read slog

spawn is the Transient primitive that launch a thread for each accepted message. the process continues within this new thread. The message has an execution log, that is stored in the session state. The (handler,socket) is also stored.

More info about the mechanism of logging and recovery below.

This is beamTo:

beamTo :: HostName -> PortID -> TransientIO ()
beamTo node port= do
      Log rec log <- getSData <|> return (Log False [])
      if rec then return () else do
          h <- liftIO $ connectTo node port
          liftIO $ do
             hSetBuffering h LineBuffering
             hPutStrLn h (show $ reverse log)  >> hFlush h
             hClose h
          delSData h
          stop

The other two moving primitives are similar.

beamTo get the log, if it is running in normal mode (that means not in recovery mode) it just send the log to the receiving node. Then it closes the handler and remove the handler from the session state. Then it finish.

Supervision

all the network processes are initiated by listen responding to received messages. listen and all the running processes will be killed if the parent thread or any of their ancestors call a kill childs primitive or if they die.

Typically as the name indicates, the Transient processes, including the distributed ones are short lived. Since there is no inversion of control, the processes and the communications are created and destroyed for the needs of the main monadic process while it advance in the execution.

For example, the map-reduce snippet does not need some special receiving dispatcher processes waiting permanently in the remote node. The only watching process is the one of listen which can schedule many different calls downstream. Once finished the work, the process dies.

There is no need of monitoring for long living processes although it is necessary the detection of failed communications in order to reschedule them.

The example program, how to run it

The program has two options: one uses callTo to read an IORef in the other program. The other uses beamTo to input a string locally and then moves to the other node where it print a message and update the IORef. The other two are the ones mentioned above.

In this example the other node is not only in the same machine, but in the same executable. In the example listen is invoked and later some distribute primitives are invoked against the listen port. A single machine is not the rigth environment for testing distributed computing but this is all I can do with here in the School of Haskell. I´m very grateful to you, FP guys. You are doing a great job and a big service to the Haskell community.

By executing the options in this order call -> move -> call, you can see how the interaction among nodes takes place. This demo was intended for two different processes instead of one, but the FPcomplete environment does not permit it.

As always, the distributed examples have been mixed with the previous ones that demonstrate other effects of the transient monad. This show the wonderful composability of this way of programming.

simply press "distr" and choose one of the examples.

You can also intentionally provoke some errors to see how the absence of monad transformers simplify the task of solving bugs.


{-# START_FILE Main.hs #-}

{-# LANGUAGE ScopedTypeVariables #-}

{-# LANGUAGE DeriveDataTypeable  #-}

module Main where

import           Data.Typeable
import           Base
import           Backtrack
import           Indeterminism
import           Logged
import           Move
import           Vars
import           Control.Applicative
import           Control.Concurrent
import           Control.Exception
import           Control.Monad.State
import           Data.Monoid
import           System.IO.Unsafe
import           System.Directory
import           System.FilePath
import           Network.HTTP
import           Network
import           System.IO
import           Data.IORef

import Data.List hiding (find,map, group)

-- show
main= keep $ do
      oneThread $ option "main" "to kill previous spawned processes and return to the main menu"   <|> return ""
      liftIO $ putStrLn "MAIN MENU"

      nonDeterminsm <|> trans <|>
             colors <|> app   <|>
            futures <|> server <|> 
            distributed <|> pubSub

-- /show

solveConstraint=  do
      x <- choose  [1,2,3]
      y <- choose  [4,5,6]

      guard $ x * y == 8

      return (x,y)

pythags = freeThreads $ do
  x <- choose [1..50]
  y <- choose ([1..x] :: [Int])
  z <- choose [1..round $ sqrt(fromIntegral $ 2*x*x)]

  guard (x*x+y*y==z*z)
  th <- liftIO $  myThreadId
  return (x, y, z, th)

example1= do
    option "ex1" "example 1"
    r <- threads 4 solveConstraint
    liftIO $ print r

example2= do
    option "pyt" "pythagoras"
    r<- threads 2 pythags
    liftIO $ print r

collectSample= threads 4 $ do
    option "coll" "group sample: return results in a list"
    r <- collect 0 $ do
      x <- choose  [1,2,3]
      y <- choose  [4,5,6]
      th <- liftIO $ threadDelay 1000 >> myThreadId

      return (x,y,th)

    liftIO $ print r

threadSample= do
     option "th" "threads sample"
     liftIO $ print "number of threads? (< 10)"

     n <- input  ( < 10)

     threads n $ do
        x <- choose  [1,2,3]
        y <- choose  [4,5,6]
        th <- liftIO $ myThreadId
        liftIO $ print (x,y,th)

nonDeterminsm= do
      option "nondet" "Non determinism examples"
      example1 <|> example2
               <|> collectSample
               <|> threadSample
               <|> fileSearch



find' :: String -> FilePath -> TransientIO FilePath
find' s d = do
  fs <- liftIO $ getDirectoryContents d
       `catch` \(e:: SomeException) -> return []       -- 1
  let fs' = sort $ filter (`notElem` [".",".."]) fs    -- 2
  if any (== s) fs'                                    -- 3
     then do
       liftIO $ print $ d </> s
       return $ d</> s
     else do
       f <- choose fs'                                 -- 4
       let d' = d </> f                                -- 6
       isdir <- liftIO $ doesDirectoryExist d'         -- 7
       if isdir then find' s d'                        -- 8
                else stop


------------------

fileSearch=   do
    option "file" "example of file search"
    r<- threads 3 $ collect 10 $ find' "Main.hs"  "."
    liftIO $ putStrLn $ "SOLUTION= "++ show  r
--    exit







trans= do
       option "trans" "transaction examples with backtracking for undoing actions"
       transaction <|> transaction2

transaction=   do
       option "back" "backtracking test"
       productNavigation
       reserve
       payment

transaction2= do
       option "back2" "backtracking test 2"
       productNavigation
       reserveAndSendMsg
       payment


       liftIO $ print "done!"


productNavigation = liftIO $ putStrLn "product navigation"

reserve= liftIO (putStrLn "product reserved,added to cart")
                 `onUndo` liftIO (putStrLn "product un-reserved")

payment = do
           liftIO $ putStrLn "Payment failed"
           undo

reserveAndSendMsg= do
            reserve
            liftIO  (putStrLn "update other database necesary for the reservation")
                 `onUndo` liftIO (putStrLn "database update undone")



colors :: TransientIO ()
colors= do
       option "colors" "choose between three colors"
       r <-  color 1  "red"  <|> color 2 "green" <|> color 3 "blue"
       liftIO $ print r
       where
       color :: Int -> String -> TransientIO String
       color n str= do
         option (show n) str
         liftIO . print $ str ++ " color"
         return  str

app :: TransientIO ()
app= do
       option "app" "applicative expression that return a counter in 2-tuples every second"
       liftIO $ putStrLn "to stop the sequence, write main(enter)"
       counter <- liftIO $ newMVar 0

       r <-  (,) <$>  number  counter 1 <*> number counter 1


       liftIO $ putStrLn $ "result=" ++ show r
       where
       number counter n= waitEvents $ do
          threadDelay $ n * 1000000
          n <- takeMVar counter
          putMVar counter (n+1)
          return n

futures= do
       option "async" "for parallelization of IO actions with applicative and monoidal combinators"
       sum1 <|> sum2

sum1 :: TransientIO ()
sum1= do
       option "sum1" "access to two web pages concurrently and sum the number of words using Applicative"
       liftIO $ print " downloading data..."
       (r,r') <- (,) <$> async  (worker "http://www.haskell.org/")
                     <*> async  (worker "http://www.google.com/")

       liftIO $ putStrLn $ "result="  ++ show  (r + r')

getURL= simpleHTTP . getRequest

worker :: String -> IO Int
worker url=do
      r <- getURL url
      body <- getResponseBody r
      putStrLn $ "number of words in " ++ url ++" is: " ++ show(length (words body))
      return . length . words $ body

sum2 :: TransientIO ()
sum2= do
      option "sum2" "access to N web pages concurrenty and sum the number of words using map-fold"
      liftIO $ print " downloading data..."
      rs <- foldl (<>) (return 0) $ map (async . worker)
                  [ "http://www.haskell.org/"
                  , "http://www.google.com/"]

      liftIO $ putStrLn $ "result="  ++ show rs

instance Monoid Int where
      mappend= (+)
      mempty= 0

server :: TransientIO ()
server=  do
       option "server" "A web server in the port 8080"
       liftIO $ print "Server Stated"
       sock <-  liftIO $  listenOn $ PortNumber 8080

       (h,_,_) <- spawn $ accept sock `catch` (\(e::SomeException) -> sClose sock >> throw e)
       liftIO $ do
           hPutStr h msg
           putStrLn "new request"
           hFlush h
           hClose h
         `catch` (\(e::SomeException) -> sClose sock)

msg = "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"


-- show

-- distributed computing

distributed= do
      option "distr" "examples of distributed computing"
      let port1 = PortNumber 2000


      addNodes [(host,port1)]
      listen port1 <|> return ()-- conn port1 port1 <|> conn port2 port1

      examples' host port1
      where
      host= "localhost"
      conn p p'=  connect host p host p'

examples' remoteHost remotePort= do
   logged $ option "maind"  "to see this menu" <|> return ""
   r <-logged    $ option "move" "move to another node"
               <|> option "call" "call a function in another node"
               <|> option "chat" "chat"
               <|> option "netev" "events propagating trough the network"
   case r of
       "call"  -> callExample remoteHost remotePort
       "move"  -> moveExample remoteHost remotePort
       "chat"  -> chat
       "netev" -> networkEvents remoteHost remotePort


callExample host port= do
   logged $ putStrLnhp  port "asking for the remote data"
   s <- callTo host port $ liftIO $ do
                       putStrLnhp  port "remote callTo request"
                       readIORef environ


   liftIO $ putStrLn $ "resp=" ++ show s


environ= unsafePerformIO $ newIORef "Not Changed"

moveExample host port= do
   logged $ putStrLnhp  port "enter a string. It will be inserted in the other node by a migrating program"
   name <- logged $ input (const True)
   beamTo host port
   putStrLnhp  port "moved!"
   putStrLnhp  port $ "inserting "++ name ++" as new data in this node"
   liftIO $ writeIORef environ name
   return()


chat ::  TransIO ()
chat  = do
    name  <- logged $ do liftIO $ putStrLn "Name?" ; input (const True)
    text <- logged $  waitEvents  $ putStr ">" >> hFlush stdout >> getLine' (const True)
    let line= name ++": "++ text
    clustered $   liftIO $ putStrLn line


networkEvents rh rp= do
     logged $  do
       putStrLnhp  rp "callTo is not  a simole remote call. it stablish a connection"
       putStrLnhp  rp "between transient processes in different nodes"
       putStrLnhp  rp "in this example, events are piped back from a remote node to the local node"

     r <- callTo rh rp $ do
                         option "fire"  "fire event"
                         return "event fired"
     putStrLnhp  rp $ r ++ " in remote node"

putStrLnhp p msg= liftIO $ putStr (show p) >> putStr " ->" >> putStrLn msg

-- /show

pubSub=  do
  option "pubs" "an example of publish-suscribe using Event Vars (EVars)"
  v <- newEVar  :: TransIO (EVar String)
  suscribe v <|> publish v
  where  
  publish v= do
    liftIO $ putStrLn "Enter a message to publish"
    msg <- input(const True)
    writeEVar v msg
    liftIO $ print "after writing the EVar"

  suscribe :: EVar String -> TransIO ()
  suscribe v= proc1 v  <|>  (proc2 v)
   

  proc1 v=  do
    msg <- readEVar v 
    liftIO $ putStrLn $  "proc1 readed var: " ++ show msg
    
  proc2 v= do
    msg <- readEVar v 
    liftIO $ putStrLn $ "proc2 readed var: " ++ show msg

{-# START_FILE Base.hs #-}
-- show
-- /show
{-# LANGUAGE ScopedTypeVariables #-}
-----------------------------------------------------------------------------
--
-- Module      :  Base
-- Copyright   :
-- License     :  GPL (Just (Version {versionBranch = [3], versionTags = []}))
--
-- Maintainer  :  agocorona@gmail.com
-- Stability   :
-- Portability :
--
-- |
--
-----------------------------------------------------------------------------
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE DeriveDataTypeable        #-}

module Base  where


import           Control.Applicative
import           Control.Monad.State
import           Data.Dynamic
import qualified Data.Map               as M
import           Data.Monoid
import           Debug.Trace
import           System.IO.Unsafe
import           Unsafe.Coerce
import           Control.Exception
import           Control.Concurrent
import           Control.Concurrent.STM
import           System.Mem.StableName
import           Data.Maybe
import           GHC.Conc
import           Data.List
import           Data.IORef


{-# INLINE (!>) #-}
(!>) = const. id --  flip trace
infixr 0 !>

data TransIO  x = Transient  {runTrans :: StateT EventF IO (Maybe x)}
type SData= ()

type EventId= Int

type TransientIO= TransIO

data EventF  = forall a b . EventF{xcomp       :: TransientIO a
                                  ,fcomp       :: [b -> TransientIO b]
                                  ,mfData      :: M.Map TypeRep SData
                                  ,mfSequence  :: Int
                                  ,threadId    :: ThreadId
                                  ,freeTh      :: Bool
                                  ,parent      :: Maybe EventF
                                  ,children    :: TVar[EventF]
                                  ,maxThread   :: Maybe (P Int)
                                  }
                                  deriving Typeable

type P= IORef
newp= newIORef


--(=:) :: P a  -> (a -> a) -> IO()
(=:) n f= liftIO $ atomicModifyIORef' n $  \v ->  ((f v),())

addr x= show $ unsafePerformIO $ do
       st <- makeStableName $! x
       return $ hashStableName st




instance MonadState EventF  TransientIO where
  get=  Transient $ get >>= return . Just
  put x= Transient $ put x >> return (Just ())

type StateIO= StateT EventF  IO

--type TransientIO= Transient StateIO

--runTrans ::  TransientIO x -> StateT EventF  IO (Maybe x)
--runTrans (Transient mx) = mx


runTransient :: TransientIO x -> IO (Maybe x, EventF)
runTransient t= do

  th <- myThreadId
  let eventf0=  EventF  empty [] M.empty 0
          th False  Nothing  (unsafePerformIO $ newTVarIO []) Nothing


  runStateT (runTrans t) eventf0{threadId=th} !> "MAIN="++show th





getCont ::(MonadState EventF  m) => m EventF
getCont = get


runCont :: EventF -> StateIO ()
runCont (EventF  x fs _ _  _ _  _ _ _)= runTrans ((unsafeCoerce x') >>= compose ( fs)) >> return ()
    where
    x'=  do
--           modify $ \s -> s{replay=True}
           r<- x
--           modify $ \s -> s{replay=False}
           return r

{-
runCont cont= do
     mr <- runClosure cont
     case mr of
         Nothing -> return Nothing
         Just r -> runContinuation cont r
-}

compose []= const empty
compose (f: fs)= \x -> f x >>= compose fs



runClosure :: EventF -> StateIO (Maybe a)
runClosure (EventF x _ _ _ _ _ _ _ _) =  unsafeCoerce $ runTrans x

runContinuation ::  EventF -> a -> StateIO (Maybe b)
runContinuation (EventF _ fs _ _ _ _  _ _ _) x= runTrans $  (unsafeCoerce $ compose $  fs) x

instance   Functor TransientIO where
  fmap f mx=   -- Transient $ fmap (fmap f) $ runTrans mx
    do
     x <- mx
     return $ f x

instance Applicative TransientIO where
  pure a  = Transient . return $ Just a


  f <*> g = Transient $ do

         rf <- liftIO $ newIORef Nothing
         rg <- liftIO $ newIORef Nothing   -- !> "NEWIOREF"

         cont@(EventF _ fs a b c d peers children g1) <- get   -- !> "APLICATIVE DOIT"

         let
             appg x = Transient $  do
                   liftIO $ writeIORef rg $ Just x :: StateIO ()
                   k <- liftIO $ readIORef rf
                   return $ k <*> Just x  -- !> "RETURNED: " ++ show(isJust k)++ show(isJust x)


             appf k = Transient $  do
                   liftIO $ writeIORef rf  $ Just k :: StateIO ()
                   x<- liftIO $ readIORef rg
                   return $ Just k <*> x  --  !> "RETURNED: " ++ show(isJust k)++ show(isJust x)



         put $ EventF f (unsafeCoerce appf:  fs)
                                          a b c d peers children g1
         k <- runTrans f
         was <- getSessionData `onNothing` return NoRemote
         if was== WasRemote
           then  return Nothing
           else do
             liftIO $ writeIORef rf  k -- :: StateIO ()

             mfdata <- gets mfData
             put $ EventF g (unsafeCoerce appg :  fs) mfdata b c d peers  children g1


             x <- runTrans g              !> "RUN g"
             liftIO $ writeIORef rg  x
             return $ k <*> x


data IDynamic= IDyns String | forall a.(Read a, Show a,Typeable a) => IDynamic a

instance Show IDynamic where
  show (IDynamic x)= show $ show x
  show (IDyns s)= show s

instance Read IDynamic where
  readsPrec n str= map (\(x,s) -> (IDyns x,s)) $ readsPrec n str


type Recover= Bool
type CurrentPointer= [LogElem]
type LogEntries= [LogElem]
data LogElem=  WaitRemote | Exec | Step IDynamic deriving (Read,Show)
data Log= Log Recover  CurrentPointer LogEntries deriving Typeable


instance  Alternative TransientIO where
  empty = Transient $ return  Nothing
  (<|>) = mplus

--  Transient f <|> Transient g= Transient $ do
--         k <-   f
--         x <-   g
--         return $ k <|> x

data RemoteStatus=  WasRemote | NoRemote deriving (Typeable, Eq)

instance MonadPlus TransientIO where
    mzero= empty
    mplus  x y=  Transient $ do
         mx <- runTrans x    -- !> "RUNTRANS11111"
         was <- getSessionData `onNothing` return NoRemote
         if was== WasRemote
           then return Nothing
           else case mx of
             Nothing -> runTrans y   --  !> "RUNTRANS22222"
             justx -> return justx

-- | a sinonym of empty that can be used in a monadic expression. it stop the
-- computation
stop :: TransientIO a
stop= Control.Applicative.empty

instance Monoid a => Monoid (TransientIO a) where
  mappend x y = mappend <$> x <*> y
  mempty= return mempty

-- | set the current closure and continuation for the current statement
setEventCont ::   TransientIO a -> (a -> TransientIO b) -> StateIO ()
setEventCont x f  = do
   st@(EventF   _ fs d n  r applic  ch rc bs)  <- get
   put $ EventF x ( unsafeCoerce f : fs) d n  r applic  ch rc bs


-- | reset the closure and continuation. remove inner binds than the prevous computations may have stacked
-- in the list of continuations.
resetEventCont :: Maybe a -> StateIO ()
resetEventCont mx =do
   st@(EventF _ fs d n  r nr  ch rc bs)  <- get

   let f= \mx ->  case mx of
                       Nothing -> empty
                       Just x  -> (unsafeCoerce $ head fs)  x
   put $ EventF  (f mx) ( tailsafe fs)d n  r nr  ch rc bs
   where
   tailsafe []=[]
   tailsafe (x:xs)= xs


instance Monad TransientIO where
      return x = Transient $ return $ Just x
      x >>= f  = Transient $ do
        setEventCont x  f

        mk <- runTrans x
        resetEventCont mk
        case mk of
           Just k  -> do
               runTrans $ f k

           Nothing -> return Nothing




--instance MonadTrans (Transient ) where
--  lift mx = Transient $ mx >>= return . Just

instance MonadIO TransientIO where
  liftIO x = Transient $ liftIO x >>= return . Just --     let x= liftIO io in x `seq` lift x




-- Threads

waitQSemB sem= atomicModifyIORef' sem $ \n -> if n > 0 then(n-1,True) else (n,False)
signalQSemB sem= atomicModifyIORef' sem  $ \n ->  (n + 1,())

-- | set the maximun number of threads for a procedure. It is useful for the
threads :: Int -> TransientIO a -> TransientIO a
threads n proc= Transient $ do
   msem <- gets maxThread
   sem <- liftIO $ newIORef n
   modify $ \s -> s{maxThread= Just sem}
   r <- runTrans proc
   modify $ \s -> s{maxThread = msem} -- restore it
   return r
-- | delete all the previous childs generated by the expressions and continue execution
-- of the current thread.
oneThread :: TransientIO a -> TransientIO a
oneThread comp=  do
   chs <- liftIO $ newTVarIO []
   r <- comp
   modify $ \ s -> s{children= chs}
   killChilds
   return r



-- | The threads generated in the process passed as parameter will not be killed.
freeThreads :: TransientIO a -> TransientIO a
freeThreads proc= Transient $ do
     st <- get
     put st{freeTh= True}
     r <- runTrans proc
     modify $ \st -> st{freeTh= freeTh st}
     return r

-- | The threads will be killed when the parent thread dies. That is the default
-- This can be invoked to revert the effect of `freeThreads`
hookedThreads :: TransientIO a -> TransientIO a
hookedThreads proc= Transient $ do
     st <- get
     put st{freeTh= False}
     r <- runTrans proc
     modify $ \st -> st{freeTh= freeTh st}
     return r


-- | kill all the child processes
killChilds :: TransientIO()
killChilds= Transient $  do
   cont <- get
   liftIO $  killChildren cont
   return $ Just ()


-- | Get the session data of the desired type if there is any.
getSessionData ::  (MonadState EventF m,Typeable a) =>  m (Maybe a)
getSessionData =  resp where
 resp= gets mfData >>= \list  ->
    case M.lookup ( typeOf $ typeResp resp ) list  of
      Just x  -> return . Just $ unsafeCoerce x
      Nothing -> return $ Nothing
 typeResp :: m (Maybe x) -> x
 typeResp= undefined

-- | getSessionData specialized for the View monad. if Nothing, the
-- monadic computation does not continue. getSData is a widget that does
-- not validate when there is no data of that type in the session.
--
-- If there is no such data, `getSData`  silently stop the computation.
-- That may or may not be the desired behaviour.
-- To make sure that this does not get unnoticed, use this construction:
--
-- >  getSData <|> error "no data"
getSData ::  Typeable a => TransIO  a
getSData= Transient getSessionData


-- | set session data for this type. retrieved with getSessionData orr getSData
setSessionData ::  (MonadState EventF m, Typeable a) => a -> m ()
setSessionData  x= do
  let t= typeOf x in  modify $ \st -> st{mfData= M.insert  t (unsafeCoerce x) (mfData st)}

-- | a shorter name for setSessionData
setSData ::  ( MonadState EventF m,Typeable a) => a -> m ()
setSData= setSessionData

delSessionData x=
  modify $ \st -> st{mfData= M.delete (typeOf x ) (mfData st)}

delSData :: ( MonadState EventF m,Typeable a) => a -> m ()
delSData= delSessionData

withSData ::  ( MonadState EventF m,Typeable a) => (Maybe a -> a) -> m ()
withSData f= modify $ \st -> st{mfData=
    let dat = mfData st
        mx= M.lookup typeofx dat
        mx'= case mx of Nothing -> Nothing; Just x -> unsafeCoerce x
        fx=  f mx'
        typeofx= typeOf $ typeoff f
    in  M.insert typeofx  (unsafeCoerce fx) dat}
    where
    typeoff :: (Maybe a -> a) -> a
    typeoff = undefined
----

genNewId :: MonadIO m => MonadState EventF m =>  m Int
genNewId=  do
          st <- get

          let n= mfSequence st
          put $ st{mfSequence= n+1}
          return n

refSequence :: IORef Int
refSequence= unsafePerformIO $ newp 0



data Loop= Once | Loop | Multithread deriving Eq

waitEvents ::   IO b -> TransientIO b
waitEvents io= do
   r <- parallel (Right <$> io)
   killChilds
   return r


async  ::  IO b -> TransientIO b
async io= do
   r <- parallel  (Left <$>io)
   killChilds
   return r

spawn ::  IO b -> TransientIO b
spawn io= freeThreads $ do
   r <- parallel (Right <$>io)
   return r

data EventValue= EventValue SData deriving Typeable

parallel  ::    IO (Either b b) -> TransientIO b
parallel  ioaction= Transient$   do
    cont <- getCont                    -- !> "PARALLEL"
    mv <- getSessionData
    case mv  of
     Just (EventValue v)  -> do
        delSessionData $ EventValue () -- !> "ISJUST "
        return  $ Just $ unsafeCoerce v
     Nothing -> do
        liftIO  $ loop cont    ioaction
        return Nothing




loop (cont'@(EventF x fs a b c d peers childs g))  rec  =  do
  chs <- liftIO $ newTVarIO []
  let cont = EventF x fs a b c d (Just cont') chs g
      iocont dat=
          runStateT ( do
             setSessionData . EventValue $ unsafeCoerce dat
             runCont cont
             ) cont
             >> return ()
      loop'= do
        mdat <- rec
        case mdat of
         Left dat -> iocont dat

         Right dat -> do
              forkMaybe cont $ iocont dat
              loop'

  forkMaybe  cont loop'
  where
      forkMaybe cont proc = do
         dofork <- case maxThread cont of
                  Nothing -> return True
                  Just sem -> do
                    dofork <- waitQSemB sem
                    if dofork then  return True else return False

         if dofork
            then  do
                 th <- forkFinally proc $ \me -> do
                         case me of -- !> "THREAD ENDED" of
                          Left  e -> do
                           when (fromException e /= Just ThreadKilled)$ liftIO $ print e
                           killChildren  cont  !> "KILL RECEIVED" ++ (show $ unsafePerformIO myThreadId)

                          Right _ -> do
                             --  if parent is alive
                             --  then remove himself from the list (with free)
                             --  and pass his active children to his parent
                             return ()
                             th <- myThreadId
                             mparent <- free th cont
                             case mparent of
                              Nothing  ->  return()
                              Just parent -> atomically $ do
                                     chs' <- readTVar $ children cont
                                     chs  <- (readTVar $ children parent)
                                     writeTVar (children parent)$ chs ++ chs'
                                     return ()

                         case maxThread cont of
                           Just sem -> signalQSemB sem
                           Nothing -> return ()


                 addThread cont' cont{threadId=th}  -- !>  "thread created: "++ show th

            else proc  -- !> "NO THREAD"





free th env= do
  if isNothing $ parent env
   then  return Nothing  !>  show th ++ " orphan"
   else do
    let msibling= fmap children $ parent env

    case msibling of
     Nothing -> return Nothing
     Just sibling  -> do
       found <- atomically $ do
                sbs <- readTVar sibling
                let (sbs', found) = drop [] th  sbs -- !> "search "++show th ++ " in " ++ show (map threadId sbs)
                when found $ writeTVar sibling sbs'
                return found
       if (not found && isJust (parent env))
         then free th $ fromJust $ parent env -- !> "toparent"
         else return $ Just env

   where
   drop processed th []= (processed,False)
   drop processed th (ev:evts)| th == threadId ev= (processed ++ evts, True)
                    | otherwise= drop (ev:processed) th evts


addThread parent child = when(not $ freeTh parent) $ do
   let headpths= children parent
   atomically $ do
       ths <- readTVar headpths
       writeTVar headpths $  child:ths

killChildren  cont = do
     forkIO $ do
        let childs= children cont   !> "killChildren list= "++ addr (children cont)
        ths <- atomically $ do
           ths <- readTVar childs
           writeTVar childs []
           return ths
        mapM_ (killThread . threadId) ths  !> "KILLEVENT " ++ show (map threadId ths)
     return ()


type EventSetter eventdata response= (eventdata ->  IO response) -> IO ()
type ToReturn  response=  IO response
react
  :: Typeable eventdata
  => EventSetter eventdata response
  -> ToReturn  response
  -> TransientIO eventdata

react setHandler iob= Transient $ do
        cont    <- getCont
        mEvData <- getSessionData
        case mEvData of
          Nothing -> do
            liftIO $ setHandler $ \dat ->do
--              let cont'= cont{mfData = M.insert (typeOf dat)(unsafeCoerce dat) (mfData cont)}
              runStateT (setSData dat >> runCont cont) cont
              iob
            return Nothing
          Just dat -> delSessionData dat >> return (Just  dat)


-- * non-blocking keyboard input

getLineRef= unsafePerformIO $ newTVarIO Nothing


roption= unsafePerformIO $ newMVar []

-- | install a event receiver that wait for a string and trigger the continuation when this string arrives.
option :: (Typeable b, Show b, Read b, Eq b) =>
     b -> [Char] -> TransientIO b
option ret message= do
    let sret= show ret

    liftIO $ putStrLn $ "Enter  "++sret++"\tto: " ++ message
    liftIO $ modifyMVar_ roption $ \msgs-> return $ sret:msgs
    waitEvents  $ getLine' (==ret)
    liftIO $ putStrLn $ show ret ++ " chosen"
    return ret


-- | validates an input entered in the keyboard in non blocking mode. non blocking means that
-- the user can enter also anything else to activate other option
-- unlike `option`, input only wait for one valid response
input :: (Typeable a, Read a) => (a -> Bool) -> TransientIO a
input cond= Transient . liftIO . atomically $ do
       mr <- readTVar getLineRef
       case mr of
         Nothing -> retry
         Just r ->
            case reads1 r  of
            (s,_):_ -> if cond s  --  !> show (cond s)
                     then do
                       writeTVar  getLineRef Nothing -- !>"match"
                       return $ Just s

                     else return Nothing
            _ -> return Nothing


getLine' cond=    do
     atomically $ do
       mr <- readTVar getLineRef
       case mr of
         Nothing -> retry
         Just r ->
            case reads1 r of --  !> ("received " ++  show r ++ show (unsafePerformIO myThreadId)) of
            (s,_):_ -> if cond s -- !> show (cond s)
                     then do
                       writeTVar  getLineRef Nothing -- !>"match"
                       return s

                     else retry
            _ -> retry

reads1 s=x where
      x= if typeOf(typeOfr x) == typeOf "" then unsafeCoerce[(s,"")] else readsPrec 0 s
      typeOfr :: [(a,String)] ->  a
      typeOfr  = undefined

inputLoop=  do
    putStrLn "Press end to exit"
    inputLoop'  -- !> "started inputLoop"
    where

    inputLoop'= do
           r<- getLine
           if r=="end" then putMVar rexit () else do
              let rs = breakSlash [] r
              mapM_ (\ r -> do threadDelay 1000
                               atomically . writeTVar  getLineRef $ Just r) rs
              inputLoop'

    breakSlash :: [String] -> String -> [String]
    breakSlash s ""= s
    breakSlash res s=
      let (r,rest) = span(/= '/') s
      in breakSlash (res++[r]) $ tail1 rest
      where
      tail1 []=[]
      tail1 x= tail x

rexit= unsafePerformIO newEmptyMVar

stay=  takeMVar rexit

keep mx = do
   forkIO $ inputLoop
   forkIO $ runTransient mx  >> return ()
   stay

exit :: TransientIO a
exit= do
  liftIO $ putStrLn "Tempus fugit: exit"
  liftIO $ putMVar rexit   True
  return undefined

onNothing iox iox'= do
       mx <- iox
       case mx of
           Just x -> return x
           Nothing -> iox'

{-# START_FILE Backtrack.hs #-}
-- show
-- /show

{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ExistentialQuantification #-}
module Backtrack (registerUndo, onUndo, undo, retry, undoCut) where

import Base
import Data.Typeable
import Control.Applicative
import Control.Monad.State
import Unsafe.Coerce
import System.Mem.StableName

data Backtrack= forall a b.Backtrack{backtracking :: Bool
                                    ,backStack :: [EventF]}
                                    deriving Typeable

-- | assures that backtracking will not go further
undoCut :: TransientIO ()
undoCut= Transient $ do
     delSessionData $ Backtrack False []
     return $ Just ()

-- | the secod parameter will be executed when backtracking 
{-# NOINLINE onUndo #-}
onUndo :: TransientIO a -> TransientIO a -> TransientIO a
onUndo ac bac= do
   r<-registerUndo $ Transient $ do 
     Backtrack back _ <- getSessionData `onNothing` return (Backtrack False [])
     runTrans $ if back then bac  else ac 
   return r
   
-- | register an actions that will be executed when backtracking
{-# NOINLINE registerUndo #-}
registerUndo :: TransientIO a -> TransientIO a
registerUndo f  = Transient $ do
   cont@(EventF x _ _ _ _ _ _ _ _)  <- get   !> "backregister"
   md  <- getSessionData 
   ss <- case md of
        Just (bss@(Backtrack b (bs@((EventF x'  _ _ _ _ _ _ _ _):_)))) -> do
            addrx  <- addr x
            addrx' <- addr x'         -- to avoid duplicate backtracking points
            return $ if addrx == addrx' then bss else  Backtrack b $ cont:bs
        Nothing ->  return $ Backtrack False [cont]
   setSessionData ss
   runTrans f
   where
   addr x = liftIO $ return . hashStableName =<< (makeStableName $! x)

-- | restart the flow forward from this point on
retry :: TransientIO ()
retry= do
    Backtrack _ stack <- getSessionData `onNothing` return (Backtrack False [])
    setSData $ Backtrack False stack

-- | execute backtracking. It execute the registered actions in reverse order. 
--
-- If the backtracking flag is changed the flow proceed  forward from that point on. 
--
--If the backtrack stack is finished or undoCut executed, `undo` will stop.
undo :: TransientIO a
undo= Transient $ do
  bs <- getSessionData  `onNothing` return nullBack            !>"GOBACK"
  goBackt  bs

  where
  nullBack= Backtrack False []
  goBackt (Backtrack _ [])= return Nothing                     !> "END"
  goBackt (Backtrack b (stack@(first@(EventF x fs _ _  _ _ _ _ _): bs)))= do
--        put first{replay=True} 
        setSData $ Backtrack True stack
        mr <-  runClosure first                                !> "RUNCLOSURE"
        Backtrack back _ <- getSessionData `onNothing` return nullBack 
                                                               !>"END RUNCLOSURE"
        case back of
           True ->  goBackt $ Backtrack True bs                !> "BACK AGAIN"
           False -> case mr of
                   Nothing -> return empty                     !> "FORWARD END"
                   Just x ->  runContinuation first x          !> "FORWARD EXEC"


{-# START_FILE Indeterminism.hs #-}

-- show
-- /show

-----------------------------------------------------------------------------
--
-- Module      :  Transient.Indeterminism
-- Copyright   :
-- License     :  GPL (Just (Version {versionBranch = [3], versionTags = []}))
--
-- Maintainer  :  agocorona@gmail.com
-- Stability   :
-- Portability :
--
-- |
--
-----------------------------------------------------------------------------
{-# LANGUAGE BangPatterns #-}


module Indeterminism (
choose, choose', collect, group --, found
) where

import Base
import Control.Monad.IO.Class
import Data.IORef
import Control.Applicative
import Data.Monoid
import Control.Concurrent
import Data.Typeable
import Control.Monad.State
import Control.Concurrent.STM as STM
import GHC.Conc


-- | slurp a list of values and process them in parallel . To limit the number of processing
-- threads, use `threads`
choose  ::  [a] -> TransientIO a
choose []= empty
choose   xs = do
    evs <- liftIO  $ newIORef xs
    parallel   $ do
           es <- atomicModifyIORef' evs $ \es -> let !tes= tail es in (tes,es)
           case es of
            [x]  -> return $ Left $ head es
            x:_  -> return $ Right x

-- | group the output of a possible mmultithreaded process in groups of n elements.
group :: Int -> TransientIO a -> TransientIO [a]
group num proc =  do
    v <- liftIO $ newIORef (0,[])
    x <- proc
    n <- liftIO $ atomicModifyIORef' v $ \(n,xs) -> let !n'=n +1 in ((n', x:xs),n')
    if n < num
      then stop
      else liftIO $ atomicModifyIORef v $ \(n,xs) ->  ((0,[]),xs)

choose' :: [a] -> TransientIO a
choose'  xs = foldl (<|>) empty $ map (parallel . return . Left) xs


--newtype Collect a= Collect (MVar (Int, [a])) deriving Typeable

-- collect the results of a search done in parallel, usually initiated by
-- `choose` . The results are added to the collection with `found`
--
--


-- execute a process and get the first n solutions.
-- if the process end without finding the number of solutions requested, it return the fond ones
-- if he find the number of solutions requested, it kill the threads of the process and return
-- It works monitoring the solutions found and the number of active threads.
-- If the first parameter is 0, collect will return all the results
collect ::  Int -> TransientIO a -> TransientIO [a]
collect n search=  do
  rv <- liftIO $ atomically $ newTVar (0,[]) !> "NEWMVAR"
  endflag <- liftIO $ newTVarIO False
  st <- get
  let any1 = do
        r <- search   !> "ANY"
        liftIO $ atomically $ do
            (n1,rs) <- readTVar rv
            writeTVar  rv (n1+1,r:rs) !> "MODIFY"
        stop

      detect= freeThreads $ do
          xs <- async $ do
             threadDelay 1000 -- to allow some activity before monitoring it
             atomically $ do
                (n',xs) <- readTVar rv
                ns <- readTVar $ children st

                if (n > 0 && n' >= n) ||  null ns  !> show (n,n') !> (show $ length ns)
                  then return xs
                  else retry

          th <- liftIO $ myThreadId !> "KILL"
          stnow <- get
          liftIO $ killChildren st
          liftIO $ addThread st stnow
          return  xs

  (any1 >> stop)  <|> detect

{-# START_FILE Logged.hs #-}

-- show
-- /show

-----------------------------------------------------------------------------
--
-- Module      :  Transient.Logged
-- Copyright   :
-- License     :  GPL-3
--
-- Maintainer  :  agocorona@gmail.com
-- Stability   :
-- Portability :
--
-- |
--
-----------------------------------------------------------------------------
{-# LANGUAGE  ExistentialQuantification #-}
module Logged  where

import Data.Typeable
import Unsafe.Coerce
import Base
import Control.Applicative
import Control.Monad.IO.Class



fromIDyn :: (Read a, Show a, Typeable a) => IDynamic -> a
fromIDyn (IDynamic x)= unsafeCoerce x

fromIDyn (IDyns s)=r where r= read s !> "read " ++ s ++ "to type "++ show (typeOf r)

toIDyn x= IDynamic x

-- | synonymous of `step`
logged :: (Show a, Read a, Typeable a) => TransientIO a -> TransientIO a
logged= step



-- | write the result of the computation in the log and return it.
-- but if there is data in the internal log, it read the data from the log and
-- do not execute the computation.
--
-- It accept nested step's. The effect is that if the outer step is executed completely
-- the log of the inner steps are erased. If it is not the case, the inner steps are logged
-- this reduce the log of large computations to the minimum. That is a feature not present
-- in the package Workflow.
--
-- >  r <- step $ do
-- >          step this :: TransIO ()
-- >          step that :: TransIO ()
-- >          step thatOther
-- >  liftIO $ print r
--
--  when `print` is executed, the log is just the value of r.
--
--  but when `thatOther` is executed the log is: [Exec,(), ()]
--
step :: (Show a, Read a, Typeable a) => TransientIO a -> TransientIO a
step mx=  do
    Log recover rs full <- getSData <|> return ( Log False  [][])

    case (recover,rs) of
      (True, Step x: rs') -> do
            setSData $ Log recover rs' full
            return $ fromIDyn x  !>  "read in step:" ++ show x

      (True,Exec:rs') -> do
            setSData $ Log recover rs' full
            mx

      (True, WaitRemote:rs') -> do
            setSData (Log recover rs' full) !> "waitRemote2"
            empty

      _ -> do
            let add= Exec:  full
            setSData $ Log False add add
            r <-  mx
            let add= Step (toIDyn r): full
            setSData $ Log False add add
            return  r

{-# START_FILE Move.hs #-}

--show
-- /show

-----------------------------------------------------------------------------
--
-- Module      :  Transient.Move
-- Copyright   :
-- License     :  GPL-3
--
-- Maintainer  :  agocorona@gmail.com
-- Stability   :
-- Portability :
--
-- |
--
-----------------------------------------------------------------------------
{-# LANGUAGE DeriveDataTypeable , ExistentialQuantification
    ,ScopedTypeVariables, StandaloneDeriving #-}
module Move where
import Base
import Logged
import Data.Typeable
import Control.Applicative
import Network
import Network.HTTP
import Control.Monad.IO.Class
import System.IO
import Control.Exception
import Data.Maybe
import Unsafe.Coerce
import System.Process
import System.Directory
import Control.Monad
import Network.Info
import System.IO.Unsafe
import Control.Concurrent.STM as STM
import Data.Monoid
import qualified Data.Map as M
import Data.List (nub)



-- | install in a remote node a haskell package with an executable transient service initialized with `listen`
-- the package, the git repository and the main exectable must have the same name
installService node port servport package= do
  beamTo node port
  liftIO $ do
     let packagename= name package
     exist <- doesDirectoryExist  packagename
     when (not exist) $ do
         runCommand $ "git clone "++ package
         runCommand $ "cd "++ packagename
         runCommand "cabal install"
         createProcess $ shell $ "./dist/build/"++ packagename++"/"++packagename
                                       ++ " " ++ show port
         return()
  where
  name path=
     let x= dropWhile (/= '/') path
     in if x== "" then tail path else name $ tail    x


beamTo :: HostName -> PortID -> TransientIO ()
beamTo host port= do
  Log rec log _ <- getSData <|> return (Log False [][])
  if rec then return () else do
      h <- liftIO $ connectTo host port
      liftIO $ hSetBuffering h LineBuffering
      liftIO $ hPutStrLn h (show $ reverse log) >> hFlush h
      liftIO $ hClose h
      delSData h
      stop

forkTo  :: HostName -> PortID -> TransientIO ()
forkTo host port= do
  Log rec log _<- getSData <|> return (Log False [][])
  if rec then return () else do
      h <- liftIO $ connectTo host port
      liftIO $ hSetBuffering h LineBuffering
      liftIO $ hPutStrLn h (show $ reverse log)  >> hFlush h
      liftIO $ hClose h
      delSData h


callTo :: (Show a, Read a,Typeable a) => HostName -> PortID -> TransIO a -> TransIO a
callTo host port remoteProc= logged $ Transient $ do
--      liftIO $ print "callto"
      Log rec log fulLog <- getSessionData `onNothing` return (Log False [][])
      if rec
         then
          runTrans $ do

               Connection port  h sock <- getSData <|> error "callto: no hander"
               r <- remoteProc !> "executing remoteProc" !> "CALLTO REMOTE" -- LOg="++ show fulLog
               liftIO $ hPutStrLn h (show r) --  `catch` (\(e::SomeException) -> sClose sock)
                -- !> "sent response, HANDLE="++ show h
               setSData WasRemote
               stop


         else do

            h <- liftIO $ connectTo host port
            liftIO $ hPutStrLn h (show $ reverse fulLog) >> hFlush h !> "CALLTO LOCAL" -- send "++ show  log
            let log'= WaitRemote:tail log
            setSessionData $ Log rec log' log'
            runTrans $ waitEvents $ do -- local side
                   liftIO $ hSetBuffering h LineBuffering
                   s <- hGetLine h
--                   hClose h

                   let r = read s

                   return r   !> "read: " ++ s ++" response type= "++show( typeOf r)




data Connection= Connection PortID Handle Socket deriving Typeable

-- | Wait for messages and replay the rest of the monadic sequence with the log received.
listen :: PortID ->  TransIO ()
listen  port = do
       setSData $ Log False [] []
       sock <- liftIO $ withSocketsDo $ listenOn  port

       (h,host,port1) <- parallel $ Right <$> accept sock
                          `catch` (\(e::SomeException) -> sClose sock >> throw e)

       liftIO $  hSetBuffering h LineBuffering  -- !> "LISTEN in "++ show (h,host,port1)

       slog <- Transient $ liftIO $ (Just <$> hGetLine  h)
                          `catch` (\(e::SomeException) -> print "ERR" >>  return Nothing)

       setSData $ Connection port h sock  -- !> "setdata port=" ++ show port

       let log= read slog   -- !> "read1 " ++ slog
       setSData $ Log True log (reverse log)


-- | init a Transient process in a interactive as well as in a replay mode.
-- It is intended for twin processes that interact among them in different nodes.
beamInit :: PortID -> TransIO a -> IO b
beamInit port program=  keep $ do
    listen port   <|> return ()
    program
--    (program >> stop)   <|> close
--    where
--    close= do
--       Connection _ h sock <- getSData
--       liftIO $ hClose h  `catch` (\(e::SomeException) -> sClose sock)




instance Read PortNumber where
  readsPrec n str= let [(n,s)]=   readsPrec n str in [(fromIntegral n,s)]


deriving instance Read PortID
deriving instance Typeable PortID

-- * Level 2: connections node lists and operations with the node list

type Node= (HostName,PortID)

nodeList :: TVar  [Node]
nodeList = unsafePerformIO $ newTVarIO []

deriving instance Ord PortID

getNodes :: TransIO [Node]
getNodes  = Transient $ Just <$> (liftIO $ atomically $ readTVar  nodeList)




addNodes   nodes= Transient . liftIO . atomically $ do
  prevnodes <- readTVar nodeList

  writeTVar nodeList $ nub $ prevnodes ++  nodes
  return $ Just ()



-- | execute a Transient action in each of the nodes connected. The results are mappend'ed
clustered :: (Typeable a, Show a, Read a) => Monoid a => TransIO a -> TransIO a
clustered proc= logged $ do
     nodes <- logged getNodes
     logged $ foldr (<>) mempty $ map (\(h,p) -> callTo h p proc) nodes !> "fold"

-- | Connect to a new node to another. The other node will notify about this connection to
-- all the nodes connected to him. the new connected node will receive the list of connected nodes
-- the nodes will be updated with this list. it can be retrieved with `getNodes`
connect ::   HostName ->  PortID ->  HostName ->  PortID -> TransientIO ()
connect host  port   remotehost remoteport=  do
    listen port <|> return ()
    logged $ do
        logged $ addNodes [(host,port)]
        logged $ liftIO $ putStrLn $ "connecting to: "++ show (remotehost,remoteport)
        host <- logged $ return host
        port <- logged $ return port
        nodes <- callTo remotehost remoteport   $ do
                   clustered $  addNodes [(host, port)]
                   getNodes

        logged $ addNodes nodes
        logged $ liftIO $ putStrLn $ "Connected to modes: " ++ show nodes


{-# START_FILE Vars.hs #-}
-- show
-- /show

{-# LANGUAGE DeriveDataTypeable #-}
module Vars where

import Base
import qualified Data.Map as M
import Data.Typeable

import Control.Concurrent
import Control.Applicative
import Data.IORef
import Control.Monad.IO.Class
import Control.Monad.State

newtype EVars= EVars  (IORef (M.Map Int [EventF])) deriving Typeable

data EVar a= EVar Int (IORef (Maybe a))

-- * concurrency effect 
-- Evars are event vars. `writeEVar` trigger the execution of all the continuations associated to the  `readEVar` of this variable
-- (the code that is after them) as  stack: the most recent reads are executed first.
--
-- It is like the publish-suscribe pattern but without inversion of control, since a readEVar can be inserted at any place in the
-- Transient flow.
--
-- EVars are created upstream and can be used to communicate two subbranches of the monad. Following the Transient philosophy they 
-- do not block his own thread if used with alternative operators, unlike the IO and STM vars. 
-- 

newEVar ::  TransientIO (EVar a)
newEVar  = Transient $ do
   EVars ref <- getSessionData `onNothing`  do
                            ref <- liftIO $ newIORef M.empty
                            setSData $ EVars ref
                            return  (EVars ref) 
   id <- genNewId
   ref <- liftIO $ newIORef Nothing
   return . Just $ EVar id ref

readEVar :: EVar a -> TransIO a
readEVar (EVar id ref1)= Transient $ do 
         cont <- getCont
         EVars ref <- getSessionData `onNothing` error "No Events context"
         map <- liftIO $ readIORef ref
         let Just conts=  M.lookup  id map <|> Just []
         liftIO $ writeIORef ref $  M.insert id (cont:conts) map
         return Nothing
         liftIO $ readIORef ref1
         
writeEVar (EVar id ref1) x= Transient $ do
   EVars ref <- getSessionData `onNothing`  error "No Events context" 
   liftIO $ writeIORef ref1 $ Just x
   map <- liftIO $ readIORef ref
   let Just conts= M.lookup id map <|> Just []
   mapM runCont conts 
   return $ Just ()
 

step (logged is a synonym) is the primitive that perform the logging and recovery. it add entries to the log. But when a log has been received, it return the corresponding entry in the log instead of executing his action. Until there is no more entries in the log. In this case it resumes normal execution and logging.

step :: (Show a, Read a, Typeable a) => TransientIO a -> TransientIO a
step mx= do

    Log recover rs <- getSData <|> return (Log False [])

    if recover && not (null rs)
      then do
        setSData $ Log recover $ tail rs
        return $ fromIDyn $ head rs
      else do
        r <- mx
        setSData . Log False $ toIDyn r:rs
        return r

Note that the user program is a single monadic expression. there is no callbacks neither handlers, there is no routing, no registering of remote procedures, no dispatching.

The dispatching is done by the replaying of the log. As always, I use the Transient philosophy of a complete de-inversion of control.

To execute the example clone the transient package:

git clone https://github.com/agocorona/transient

the user program is in the file main.hs at the end. There is another test program called move.hs that contains some tests of distributed computing for N nodes. The moving services are defined in the module "Transient.Move" in the "src" folder.

The move.hs program can be executed in two or more nodes. The program need three parameters:

program  localPort  remoteHost remotePort

I use it locally in two instances. If I name the program "move":

   runghc -isrc move 8080 localhost 8081
   runghc -isrc move 8081 localhost 8080

Each node can act as master and operate with the other node.

Current limitations and future plans

At this moment, the program must be running in all the nodes, but the first remote service that I will develop with this basic infrastructure will be for moving code. It is not necessary to have all the code in the other node, just the code that will be called remotely. For example, for a web application, the code that interact with the user interface can be in a haskell file. it can be compiled with Haste or GHCJS before sending it to the browser. In cloud applications, if the nodes have different architectures, the remote node will receive the source code and will compile it locally.

Hot swapping of code is also a goal. Once a program is substituted by a new version, replaying from the log would recover the node state.

In the medium-long term my goal is to develop the level 3 and 4 services that can move executions among nodes at runtime, depending on runtime conditions, capabilities of the nodes, failure events, changes in the topology of the network etc. the monitor would execute continuations in any location. That is why the code need to be seamless. The code must not reflect the topology of the network, it must be abstracted from it. That is why all the program is a single monadic expression executed in different nodes, in the same way that currently Transient execute a single monadic expression in different threads. The distributed mechanism just extend it to run the thread in a different node.

Moving code

Installing a new service that respond requests in a remote node is easy in the era of Internet: it is a matter of impersonating in the other node with beamTo and install the corresponding package:

-- | install in a remote node a haskell package with an executable transient service initialized with `listen`
-- the package, the git repository and the main exectable must have the same name

installService node port servport gitrepo= do
  beamTo node port
  liftIO $ do
     let packagename= name gitrepo
     exist <- doesDirectoryExist  packagename
     when (not exist) $ do
         runCommand $ "git clone "++ gitrepo
         runCommand $ "cd "++ packagename
         runCommand "cabal install"
         createProcess $ shell $ "./dist/build/"++ packagename++"/"++packagename
                                       ++ " " ++ show port

Comparison with Cloud Haskell and Erlang

I don't know erlang neither cloud haskell very well. This development is at the beginning and the erlang model is mature. The Erlang model is intentionally low level, it tries to make the network and the communication costs visible. It focuses on reliability

Mine tries to make the network transparent and emphasizes composability and high level programming. An example of composability is in the mapReduce snippet. My goal is to make the network transparent, in the way of Akka .

But akka and Erlang OTP are asynchronous and blocking. Well, akka is event-based. The green threads of Erlang is a higher level way of using callbacks, where the callback is managed by the threads library. That permits a code more readable, since the flow does continue after the blocking call. But that does not change the fact that both Erlang OTP and Akka use a blocking semantics for synchronous calls. That precludes parallel composition of the kind of my mapReduce snippet: simply, there is no way to combine two Erlang or akka elements that work in parallel within the same expression. All the seaming has to be done by hand. Usually by means of auxiliary monitor processes that execute what in haskell would be an applicative expression where the elements are embedded.

For this reason, the Actor model is asynchronous, in order to avoid blocking. Transient does not enforce asynchronous communications since it is non blocking, this permits more integrated and seamless code, more readable and robust since the compiler can verify more invariants.

In fact a distributed transient program is a single monadic, seamless expression or a subexpression of the whole Transient program, replicated in all the nodes totally or partially. This expression is not monolithic as is the case of other reactive frameworks, but made of pieces, each one with a functional meaning.

For example, a sender has no meaning without the receiver. However, they are separate expressions in all the other frameworks. Even if both computations are part of the same object. To use them, the programmer has to do some manual wiring to glue them together. In Transient both sender and receiver are codified in a single expression, that is type checked and composable. That splitting of sender and receiver in Erlang and other cloud solutions is not a feature as such, but the result of a necessity, due to the lack of composability associated with inversion of control.

These are two examples of the ping-pong app in Transient, using the latest version in gitHub:

pingPong :: TransIO ()
pingPong= do
     (node1, node2) <- logged $ return (node1,node2) -- will work with the node1 context
     beamTo node2
     step $ liftIO $ putStrLn "PING"

     beamTo node1
     step $ liftIO $ putStrLn "PONG"
pingPong :: TransIO ()
pingPong= callTo node2  liftIO (putStrLn "PING") >> liftIO (putStrLn "PONG")

It is also the opposite of what Erlang and other similar distributed architectures propose but maintaining their advantages. Transient implement a shared nothing semantics, and the transported data is explicitly labeled with logging calls, but it behaves as if sharing actually happens. Once eliminated the surprise, there is very little cognitive impedance in doing a distributed call using Transient.

The final objective is a cloud monad, or a cloud effect where distribution of resources happens automatically depending on an strategy for the distribution of resources that will be defined by the programmer. The software will be the one that will evaluate the costs. The platform will be agnostic about the concrete architecture; This will depend on the application and will be dynamic.

For example, if a database program has too much load, it can split into two shards in two different nodes, the requests will be redirected to the appropriate node, but this is upto the application logic. The framework will provide facilities for request forwarding and moving code and data.

Concerning inversion of control, the approach is more close to this: http://lampwww.epfl.ch/~odersky/papers/jmlc06.pdf. This paper seems to be a precursor of more advanced works in scala, like the Scala futures and Akka. Although Scala can do half of the inversion of control since the handling code can never return to the main flow, since this main flow is imperative (And the paper makes this clear). They use typically monoidal (List like) processing using futures. flatMap, is concatMap in Haskell, the bind (>>=) of the list monad.

However this kind of processing can not return the result to the main imperative procedure that called him, since it is not monadic. So it need to use callbacks.

Concerning mobility of the software and functionality, It is more closer to Objectspace Voyager: http://www.inf.fu-berlin.de/lehre/WS99/VS/Misc/Voyager/API/doc/orb.pdf

I dedicate this software to Graham Glass, the genius behind Objectspace Voyager that developed a cloud platform 15 years ago that is still years ahead of anything else.

https://github.com/agocorona/transient

AMDG