Beautiful parallel non-determinism (Transient effects III)

As of March 2020, School of Haskell has been switched to read-only mode.

What?

I will demonstrate how the Transient monad can process many elements and filter them at the same time using guard, like the list monad, but in this case, running parallel threads. The programmer can decide how many threads will execute.

I will also use the same primitives for a parallel search in a filesystem, following an example from the book "Parallel and Concurrent Programming in Haskell" by Simon Marlow, in the chapter "Parallel programming using threads". We will see that the high-level approach used for non-determinism solves the problem of parallel search with more generality and with fewer modifications to the straightforward single-threaded code.

Finally, I will show the relation between logic programming in a language like Prolog or Curry and the non-deterministic examples.

Naively parallelizing non-deterministic computations can spawn a very large number of threads, and the granularity may be finer than optimal if each element needs only light processing that does not justify a thread of its own. To control this, the Transient monad now incorporates primitives for that purpose. These primitives can be used in any other scenario where the granularity of parallelization is a problem.

About Transient

In the hardworking programmer series, I develop a "supercharged" EDSL based on the Transient monad, with many high-level effects out of the box. Some are classical effects with a new twist, like early termination or state. Others are high-level effects that languages and operating systems either do not provide or provide only at a low level, like implicit threading, event handling or backtracking.

The purpose is to empower the programmer. I want to ease people's introduction to Haskell and let them be productive from the beginning. Transient has some uncommon effects that make it look like a very high-level language. In the first article I argued that these effects make it possible to write composable software even in the presence of multithreading and events.

Unlike other EDSLs, the Transient effects are not created by stacking monad transformers, nor by using an effect system. They are created by manipulating continuations in a new way, a kind of metaprogramming.

The repository with the latest version of Transient and the examples is at

https://github.com/agocorona/transient

About the Transient Monad

In the Transient monad each statement can have access to its own continuation. The monad also carries programmer-defined state. Therefore, it is possible to edit the continuation at run time. It is also possible to notify other statements about some condition by adding information to the state. The best example of this manipulation is the backtracking effect.

Effect primitives in Transient are like the nodes of a living flowchart that can rewire itself at run time to do more than the original flowchart was supposed to do. This kind of metaprogramming is how Transient statements create new effects. In category-theoretical terms, the effects edit the arrows of the continuations.

This is unexplored territory. I'm currently researching new possibilities. It is impossible for me to explain every step in detail, but I hope that you can grasp the power of the concept and forgive my bad writing habits. I'm rewriting this from time to time, since this is unfinished work.

Since the previous version, I have added better and simpler internals and the thread-control primitives threads, oneThread and freeThreads, which were introduced to solve parallel non-deterministic problems, as well as choose and collect, which serve the same purpose.

I will describe them here.

But first, a long digression. Skip it if you are not interested in the details and want to go straight to the "meat".

About Backtracking and Indeterminism

There is a legitimate confusion between backtracking and indeterminism. Backtracking in Transient means to resume execution at a previous statement, but not only that. The execution can go further back until a cut of backtracking is found. Alternatively it can continue forward depending on some primitives used for flow control.

On the other hand, indeterminism, exemplified by the List monad, is the effect that processes and returns more than one value. However, some people understand the List monad as performing a kind of backtracking. The details follow:

After publishing the Backtracking example Jello_Raptor asked me:

Err, Why not just use List as a monad to get the same effect? I mean that's just what using List for recursive descent parsers does, no? If you need state and things, there's always ListT (or one of many implementations thereof).

And I answered this:

I do not know how to use the list monad for backtracking. I know how to use it for indeterminism

This problem (undoing IO actions) is not similar to the one of the example here that I think is what you have in your mind:

referring to this article

In the above article each combination is filtered against the guard condition. In that example there is really no backtracking, only indeterminism. It is a list monad that computes all the responses "at the same time". Only the ones that pass the guard condition are shown. As always, to remove the magic and understand what is going on, the best thing is to desugar everything. In this case, if you replace the bind operation with concatMap, you will see that it is ordinary list processing going on there.

Lazy evaluation in fact makes the backtracking simulated by the list monad a bit more real, since the list is created on demand, and the guard forces the evaluation of each element of the list by executing the code within 'choose'. But this is no different from how any other monad does it when executing lazy statements. It is not exclusive to the List monad. Laziness ensures that every pure computation implements "backtracking in some way", since it forces the execution of unevaluated expressions coming from the statements above them. But it is the ability of the List monad to carry many values (indeterminism) that permits simulating the execution of different alternatives.

To summarize, non-determinism can simulate backtracking in some problems like this one, where a tree of combinations is explored. A deterministic program would need either to execute explicit loops or to do true backtracking to solve the problem in the above link. But in the case of my article about backtracking, there is no tree of alternatives to explore. The problem is to undo actions which have undo "annotations" attached, and the list monad can do nothing about that.

The backtracking effect demonstrated in my article is different. In the case of the list, it explores alternatives, simulating being restarted from the same point again and again and choosing another alternative each time (it is not that, but it simulates this behavior). In my case the backtracking does its job while it is going back (by undoing or compensating actions). This is beyond what any indeterministic monad can do. However, my monad can reverse the execution flow and go forward (with retry), then backward (with undo), then forward, etc., and it can simulate an indeterministic effect if onUndo feeds the flow with different values each time. In this case it would be backtracking simulating indeterminism, while the list monad executes indeterminism simulating backtracking. That is closely related to, but not the same as, exploring a tree depth first versus breadth first. Laziness complicates the analogy in the case of the list monad.

--- end of response ----
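
To make "undoing actions that have undo annotations attached" concrete, here is a minimal sketch in plain IO. It is not how Transient implements backtracking (Transient stores continuations, as the Backtrack module in the full source shows). The names Step, runSteps and logged are hypothetical and exist only for this illustration. Each step carries a compensation, and when a step fails the compensations of the steps already completed run in reverse order.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Hypothetical model: a step is a forward action (True on success)
-- paired with the compensation that undoes it.
data Step = Step
  { forward    :: IO Bool
  , compensate :: IO ()
  }

-- Run the steps in order. On the first failure, run the compensations of
-- the steps already completed, most recent first, and report failure.
runSteps :: [Step] -> IO Bool
runSteps = go []
  where
    go _ [] = return True
    go done (s : rest) = do
      ok <- forward s
      if ok
        then go (compensate s : done) rest
        else sequence_ done >> return False

-- A step that appends messages to a log instead of printing them.
logged :: IORef [String] -> String -> String -> Bool -> Step
logged ref doMsg undoMsg ok = Step
  { forward    = modifyIORef ref (doMsg :) >> return ok
  , compensate = modifyIORef ref (undoMsg :)
  }
```

With a reservation step, a database update step and a failing payment step, the log ends with the update undone first and the reservation undone last. The list monad has no place to hang such compensations, which is the point made in the response above.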

I will not simulate indeterminism by backtracking, because it is easier to perform indeterminism as such, by mimicking the behaviour of the List monad. The list monad does not parallelize, however, whereas Transient does.

Non-determinism in the List monad

This is the code of the indeterministic example above (which its author calls backtracking) using the list monad:


import Control.Monad

solveConstraint :: [(Int,Int)]
solveConstraint = do
  x <- choose [1,2,3]
  y <- choose [4,5,6]
  guard (x*y == 8)
  return (x,y)
  where
  choose = id

main= print solveConstraint

If we execute and print the result of solveConstraint we get

>[(2,4)]

The magic is performed by the List monad and the guard operation. I will not go into the details. The best way to understand it, as I said above, is to desugar everything until the monad is expressed in terms of pure function application and see that solveConstraint is normal list processing.
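
As a sketch of that desugaring, here is the same computation with each bind replaced by concatMap and with the guard followed by return collapsed into an if expression. The name solveDesugared is only for this illustration.

```haskell
-- The do block from solveConstraint with bind replaced by concatMap.
-- `guard cond >> return v` collapses to: if cond then [v] else [].
solveDesugared :: [(Int, Int)]
solveDesugared =
  concatMap
    (\x -> concatMap
      (\y -> if x * y == 8 then [(x, y)] else [])
      [4, 5, 6])
    [1, 2, 3]
```

Evaluating solveDesugared gives [(2,4)], the same result as the monadic version, using nothing but ordinary list functions.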

MonadPlus and guard

guard needs the MonadPlus class. It is the indeterminism class, in charge of adding results to a computation. The methods of MonadPlus are mzero, when no result is added, and mplus, when a result is added to the previous ones.

In the list monad, MonadPlus is defined as:

instance MonadPlus [] where
  mzero = []
  mplus = (++)

guard is defined polymorphically for all the monads that have a MonadPlus instance as:

guard :: MonadPlus m => Bool -> m ()
guard True  = return ()
guard _ = mzero

Look at the desugaring of guard in the MonadPlus document to understand how it filters results in the list monad.

MonadPlus and guard in Transient

Let's do it for the Transient monad. The MonadPlus instance would be:

instance MonadPlus TransientIO where
    mzero= stop
    mplus (Transient x) (Transient y)=  Transient $ do
         mx <- x
         case mx of
             Nothing -> y
             justx -> return justx

stop is the call that interrupts the execution of the monad. It is defined as a Transient statement that returns Nothing:

stop= Transient $ return Nothing

mplus executes the first expression. If the first gives Nothing, it then executes the second.
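
The behaviour of mplus and guard can be tried in isolation with a cut-down stand-in for the monad. In this sketch, T is a hypothetical type that keeps only the IO (Maybe a) part of TransientIO, with no state and no continuations. It defines Alternative rather than MonadPlus because recent versions of GHC require Alternative as a superclass of MonadPlus, and guard only needs Alternative.

```haskell
import Control.Applicative (Alternative (..))
import Control.Monad (guard)

-- Hypothetical stand-in for TransientIO: an IO action that may yield nothing.
newtype T a = T { runT :: IO (Maybe a) }

instance Functor T where
  fmap f (T m) = T (fmap (fmap f) m)

instance Applicative T where
  pure = T . return . Just
  T mf <*> T mx = T $ do
    f <- mf
    case f of
      Nothing -> return Nothing       -- the left side stopped: skip the rest
      Just g  -> fmap (fmap g) mx

instance Alternative T where
  empty = T (return Nothing)          -- plays the role of stop / mzero
  T x <|> T y = T $ do                -- same logic as mplus above
    mx <- x
    case mx of
      Nothing -> y
      justx   -> return justx

-- A failed guard stops the first branch, so the second one runs.
fallback :: T Char
fallback = (guard False *> pure 'a') <|> pure 'b'
```

Running runT fallback yields Just 'b': the guard produces Nothing, so the alternative after it is executed, exactly as described for mplus.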

Input a list of values and process them in parallel

We need to introduce a list of values in the monad.

-- | slurp a list of values and process them in parallel. To limit the number of processing
-- threads, use `threads`
choose  ::  [a] -> TransientIO a
choose []  = empty
choose xs  = do
    evs <- liftIO $ newMVar xs
    parallel $ do
           es <- takeMVar evs
           putMVar evs $ tail es
           case es of
            [x]   -> return $ Left x     -- last element: deliver it and finish
            (x:_) -> return $ Right x    -- more elements remain: deliver it and repeat
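choose stores the list of values in an MVar, then parallel extracts one value from that MVar each time it runs. While more elements remain, each one is returned as a Right value. The last element is returned as a Left value, which ends the computation. (The complete source at the end of this article uses an IORef instead of the MVar.)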

choose uses parallel, the basic primitive of Transient, which executes its continuation every time the IO computation returns a value. It has changed since the previous version. Now the signature is simpler:

parallel  ::   IO (Either a a) -> TransientIO a

When the IO computation returns a Left value, parallel executes the continuation and finishes. When it returns a Right value, it does the same but re-executes the IO computation, until a Left value arrives.
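
The Left/Right protocol can be modelled without any threads. The following is a sequential sketch under that assumption, not the real parallel: feed and chooseSeq are hypothetical names, and the continuation is passed explicitly as a function instead of being captured by the monad.

```haskell
import Data.IORef (atomicModifyIORef', newIORef)

-- Sequential stand-in for `parallel`: no threads are spawned here.
-- Call the producer repeatedly and hand each value to the continuation k.
feed :: IO (Either a a) -> (a -> IO ()) -> IO ()
feed producer k = do
  r <- producer
  case r of
    Left x  -> k x                      -- last value: run k and finish
    Right x -> k x >> feed producer k   -- run k, then ask for more

-- Hand out the list elements one at a time, as choose does.
-- An empty list never runs the continuation.
chooseSeq :: [a] -> (a -> IO ()) -> IO ()
chooseSeq [] _ = return ()
chooseSeq xs k = do
  ref <- newIORef xs
  feed (next ref) k
  where
    next ref = atomicModifyIORef' ref $ \es -> case es of
      [x]        -> ([], Left x)
      (x : rest) -> (rest, Right x)
      []         -> error "chooseSeq: producer called after the last element"
```

For example, chooseSeq [1,2,3] print prints the three numbers in order. In Transient each of those continuation calls may run in its own thread, which is where the parallelism comes from.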

Well, there is a much more elegant way to do choose:

choose' :: [a] -> TransientIO a
choose'  xs = foldl (<|>) empty $ map (parallel . return . Left) xs

But it is a bit slower, since it uses a number of parallel statements equal to the number of elements, instead of one.
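
The fold in choose' is not specific to Transient. As a small sketch, here is the same expression specialised to lists, where pure builds a one-element list and (<|>) is list concatenation. The name chooseList is only for this illustration.

```haskell
import Control.Applicative (empty, (<|>))

-- The fold used by choose', specialised to lists: each element becomes a
-- one-element computation and (<|>) joins them, left to right.
chooseList :: [a] -> [a]
chooseList xs = foldl (<|>) empty (map pure xs)
```

For lists this is just the identity, so chooseList [1,2,3] gives [1,2,3]. In Transient each pure is replaced by a parallel statement that delivers a single Left value, which explains the cost of one parallel per element.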

Control of parallelization

parallel launches a new thread. This thread waits for the outputs of the IO procedure passed as the parameter, then executes the continuation. Everything that happens after parallel runs in this new thread.

Another new thing is freeThreads. It disables thread control: all the threads created after it manage their own lifecycle. A non-free thread can be killed by its parent with killChildren.

It is possible, however, to restrict the number of threads. The example above would produce nine threads. One for each element of the first choose. Each one of them launches one thread for each element of the second choose. If there are more elements (N) and more chooses (NC), the number of threads grows exponentially, as N^NC. That is not optimal, also taking into account that the processing done by each thread may be very light. In this case, the only processing is the guard operation. To be effective, one thread must process more than one guard.

To avoid this, threads sets the maximum number of threads that may run simultaneously. If the maximum is reached, each new parallel statement does not spawn a thread but executes its loop within the current thread, until some other thread dies. Then the next parallel will use the free slot.
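
The spawn-or-run-inline decision can be sketched with a counter of free slots, in the spirit of the waitQSemB and signalQSemB helpers in the Base module. This is an illustration in plain IO, not the code of threads itself. The names tryAcquire, release and spawnOrInline are hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Exception (finally)
import Data.IORef

-- Try to take one free slot; True when a slot was available.
tryAcquire :: IORef Int -> IO Bool
tryAcquire sem = atomicModifyIORef' sem $ \n ->
  if n > 0 then (n - 1, True) else (n, False)

-- Give a slot back.
release :: IORef Int -> IO ()
release sem = atomicModifyIORef' sem $ \n -> (n + 1, ())

-- Run the job in a new thread if a slot is free, otherwise run it in the
-- current thread. Returns True when a thread was spawned.
spawnOrInline :: IORef Int -> IO () -> IO Bool
spawnOrInline sem job = do
  ok <- tryAcquire sem
  if ok
    then do
      -- the slot is returned when the thread ends, even on an exception
      _ <- forkIO (job `finally` release sem)
      return True
    else job >> return False
```

With a counter created by newIORef 4, at most four jobs run in their own threads at any time. Any further job runs in the caller's thread, so the work is never dropped, only serialized.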

Finally, oneThread ensures that all the threads generated after this combinator are killed before continuing the execution. In previous versions, this was done automatically by parallel. This decoupling allows for more flexibility.

oneThread :: TransientIO a -> TransientIO a

List-like processing using the Transient monad

Finally, with the definitions of choose and guard, the code in the Transient monad is almost the same as for the list monad. Remember that choose is essentially the primitive parallel fed with different values:

import Base
import Indeterminism

solveConstraint=  do
      x <- choose  [1,2,3]
      y <- choose  [4,5,6]
      guard $ x * y == 8
      return (x,y)

main= keep $ do
      s <- threads 4 solveConstraint
      liftIO $ print s

keep is a new call that runs the Transient monad and keeps the application running. It also waits for text input.

Using the parallel non-determinist effects in Transient to perform parallel search

The true power of parallel non-determinism can only be extracted when working with large datasets and combined with other effects. Then, non-determinism can magically convert a single-threaded program into a multithreaded program almost without changes.

Let's look at Simon Marlow's example in the book "Parallel and Concurrent Programming in Haskell". The program, using a single thread, crawls the filesystem from a given directory through all its subdirectories, looking for a file. The program returns when a file is found.

find :: String -> FilePath -> IO (Maybe FilePath)
find s d = do
  fs <- getDirectoryContents d                         -- 1
  let fs' = sort $ filter (`notElem` [".",".."]) fs    -- 2
  if any (== s) fs'                                    -- 3
     then return (Just (d </> s))
     else loop fs'                                     -- 4
 where
  loop [] = return Nothing                             -- 5
  loop (f:fs)  = do
    let d' = d </> f                                   -- 6
    isdir <- doesDirectoryExist d'                     -- 7
    if isdir
       then do r <- find s d'                          -- 8
               case r of
                 Just _  -> return r                   -- 9
                 Nothing -> loop fs                    -- 10
       else loop fs                                    -- 11

Parallelizing this program is not entirely straightforward because doing it naively could waste a lot of work; if we search multiple subdirectories concurrently and we find the file in one subdirectory, then we would like to stop searching the others as soon as possible. Moreover, if an error is encountered at any point, then we need to propagate the exception correctly. We must be careful to keep the deterministic behavior of the sequential version, too. If we encounter an error while searching a subtree, then the error should not prevent the return of a correct result if the sequential program would have done so.

To use the async library, it is necessary to transform the single-threaded program in some contorted ways. In contrast, the non-deterministic effect with parallelization in the Transient monad, as explained above, essentially allows you to parallelize the single-threaded program with few changes.

The transformed program is:

find' :: String -> FilePath -> TransientIO FilePath
find' s d = do
  fs <- liftIO $ getDirectoryContents d                -- 1
  let fs' = sort $ filter (`notElem` [".",".."]) fs    -- 2
  if any (== s) fs'                                    -- 3
     then  return $ d</> s

     else do
       f <- choose fs'                                 -- 4
       let d' = d </> f                                -- 6
       isdir <- liftIO $ doesDirectoryExist d'         -- 7
       if isdir then find' s d'                        -- 8
                else stop        -- not a dir, not the file 
                                 -- looked for, end

Instead of the loop, I use choose, defined above, to carry out the recursive search in the tree of directories.

That's all. This is the whole of it. Well, find' is executed in parallel many times and calls itself. It can return many matches and can execute up to one thread for each choose if not limited. If we run find' "as is", it will return files and will not stop until the whole directory tree has been explored. In a big filesystem it will probably allocate too much RAM, since the number of simultaneous threads is unlimited.
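
The shape of find' is that of ordinary list-monad code. As a sketch, here is the same search over an in-memory tree, with the list monad in place of Transient, so it is sequential and touches no real filesystem. The type Entry and the functions entryName and findAll are hypothetical and exist only for this illustration.

```haskell
import Control.Monad (mzero)

-- Hypothetical in-memory directory tree.
data Entry = File String | Dir String [Entry]

entryName :: Entry -> String
entryName (File n)  = n
entryName (Dir n _) = n

-- Same structure as find', with the list monad instead of Transient.
findAll :: String -> FilePath -> [Entry] -> [FilePath]
findAll s d entries
  | any ((== s) . entryName) entries = return (d ++ "/" ++ s)
  | otherwise = do
      e <- entries                             -- plays the role of choose
      case e of
        Dir n sub -> findAll s (d ++ "/" ++ n) sub
        File _    -> mzero                     -- plays the role of stop
```

Like find' without a limit, findAll returns every match and explores the whole tree. Swapping the list for choose is what turns each branch into a candidate for its own thread.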

To control the execution, we use collect:

main= keep $  do
    r<- threads 10 . collect 1 $ find' "Main.hs"  ".."
    liftIO $ putStrLn $ "Found= "++ show  r

collect monitors the threads spawned by find' and the number of responses. When the number of responses reaches the number requested, it stops all the running find' processes. If no thread is active because the process has finished and there are not enough responses, it returns the ones found. In this example the search uses ten threads and we look for one file.
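
The idea of "take the first n answers, then kill the rest" can be sketched in plain IO. Note that this is a different technique from the one collect uses: the real collect (see the Indeterminism module in the full source) watches a TVar of results and the list of child threads, while this sketch uses a Chan on which every worker reports once. The names collectFirst, report, gather and ignore are hypothetical.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (SomeException, try)

-- Run every worker in its own thread and return the first n results.
-- A worker reports Nothing when it finishes without a result or fails.
collectFirst :: Int -> [IO (Maybe a)] -> IO [a]
collectFirst n workers = do
  chan  <- newChan
  tids  <- mapM (\w -> forkIO (report chan w)) workers
  found <- gather chan (length workers) []
  mapM_ killThread tids                 -- stop whatever is still running
  return (reverse found)
  where
    report chan w = do
      r <- try w
      writeChan chan (either ignore id r)

    ignore :: SomeException -> Maybe b
    ignore _ = Nothing

    -- Stop when enough results arrived or every worker has reported.
    gather chan pending acc
      | length acc >= n || pending == 0 = return acc
      | otherwise = do
          r <- readChan chan
          gather chan (pending - 1) (maybe acc (: acc) r)
```

As with collect, if the workers finish before n results are found, the results found so far are returned.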

You can see that there are separate combinators for each functionality: thread control, monitoring and thread spawning. That allows for more use cases and more flexible programs with just a few of these combinators.

Running examples

All the examples above and some more can be run below. You must choose the option "nondet". Then you see a submenu with all the examples.

To see the effect of restricting the number of threads, one of the samples, the thread sample, displays each result together with the identifier of the thread that generated it. You can input the maximum number of simultaneous threads and see how the order of the responses and the thread identifiers change.

I also included all the examples from the previous articles, which demonstrate backtracking for undoing IO actions, parallelism, composability, event handling, and async-like/futures-like/map-reduce-like processing:


{-# START_FILE Main.hs #-}

{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE DeriveDataTypeable #-}

module Main where

import           Data.Typeable
import           Base
import           Backtrack
import           Indeterminism
import           Control.Applicative
import           Control.Concurrent
import           Control.Exception
import           Control.Monad.State
import           Data.Monoid
import           System.IO.Unsafe
import           System.Directory
import           System.FilePath
import           Network.HTTP
import           qualified Data.Map as M
import           Network
import           System.IO
import           Data.IORef
import Data.List hiding (find,map, group)

-- show

solveConstraint=  do
      x <- choose  [1,2,3]
      y <- choose  [4,5,6]

      guard $ x * y == 8

      return (x,y)

pythags = do
  x <- choose [1..50]
  y <- choose ([1..x] :: [Int])
  z <- choose [1..round $ sqrt(fromIntegral $ 2*x*x)]

  guard (x*x+y*y==z*z)

  return (x, y,z)

example1= do
    option "ex1" "example 1"
    r <- threads 4 solveConstraint
    liftIO $ print r

example2= do
    option "pyt" "pythagoras"
    r<- threads 1 pythags
    liftIO $ print r

groupSample= threads 4 $ do
    option "coll" "group sample: return results in a list"
    r <- group 9 $ do
      x <- choose  [1,2,3]
      y <- choose  [4,5,6]
      return (x,y)

    liftIO $ print r

threadSample= freeThreads $ do
     option "th" "threads sample"
     liftIO $ print "number of threads? (< 10)"

     n <- input  ( < 10)

     threads n $ do
        x <- choose  [1,2,3]
        y <- choose  [4,5,6]

        -- added some delay to show thread interleaving better in Linux
        th <- liftIO $ threadDelay 100000 >> myThreadId

        liftIO $ print (x,y,th)

nonDeterminsm= do
      option "nondet" "Non determinism examples"
      example1 <|> example2
               <|> groupSample
               <|> threadSample
               <|> fileSearch



find' :: String -> FilePath -> TransientIO FilePath
find' s d = do
  fs <- liftIO $ getDirectoryContents d
       `catch` \(e:: SomeException) -> return []       -- 1
  let fs' = sort $ filter (`notElem` [".",".."]) fs    -- 2
  if any (== s) fs'                                    -- 3
     then return $ d</> s
     else do
       f <- choose fs'                                 -- 4
       let d' = d </> f                                -- 6
       isdir <- liftIO $ doesDirectoryExist d'         -- 7
       if isdir then find' s d'                        -- 8
                else stop


------------------

fileSearch=   do
    option "file" "example of file search"
    r<- threads 3 $ collect 1 $ find' "Main.hs"  ".."
    liftIO $ putStrLn $ "Found= "++ show  r



main= keep $ do
      oneThread $ option "main" "to return to the main menu"   <|> return ""
      liftIO $ putStrLn "MAIN MENU"

      nonDeterminsm <|> trans <|>
             colors <|> app   <|>
            futures <|> server

-- / show

trans= do
       option "trans" "transaction examples with backtracking for undoing actions"
       transaction <|> transaction2

transaction=   do
       option "back" "backtracking test"
       productNavigation
       reserve
       payment

transaction2= do
       option "back2" "backtracking test 2"
       productNavigation
       reserveAndSendMsg
       payment


       liftIO $ print "done!"


productNavigation = liftIO $ putStrLn "product navigation"

reserve= liftIO (putStrLn "product reserved,added to cart")
                 `onUndo` liftIO (putStrLn "product un-reserved")

payment = do
           liftIO $ putStrLn "Payment failed"
           undo

reserveAndSendMsg= do
            reserve
            liftIO  (putStrLn "update other database necesary for the reservation")
                 `onUndo` liftIO (putStrLn "database update undone")



colors :: TransientIO ()
colors= do
       option "colors" "choose between three colors"
       r <-  color 1  "red"  <|> color 2 "green" <|> color 3 "blue"
       liftIO $ print r
       where
       color :: Int -> String -> TransientIO String
       color n str= do
         option (show n) str
         liftIO . print $ str ++ " color"
         return  str

app :: TransientIO ()
app= do
       option "app" "applicative expression that return a counter in 2-tuples every second"
       liftIO $ putStrLn "to stop the sequence, write main(enter)"
       counter <- liftIO $ newMVar 0

       r <-  (,) <$>  number  counter 1 <*> number counter 1


       liftIO $ putStrLn $ "result=" ++ show r
       where
       number counter n= waitEvents $ do
          threadDelay $ n * 1000000
          n <- takeMVar counter
          putMVar counter (n+1)
          return n

futures= do
       option "async" "for parallelization of IO actions with applicative and monioidal combinators"
       sum1 <|> sum2

sum1 :: TransientIO ()
sum1= do
       option "sum1" "access to two web pages concurrently and sum the number of words using Applicative"
       liftIO $ print " downloading data..."
       (r,r') <- (,) <$> async  (worker "http://www.haskell.org/")
                     <*> async  (worker "http://www.google.com/")

       liftIO $ putStrLn $ "result="  ++ show  (r + r')

getURL= simpleHTTP . getRequest

worker :: String -> IO Int
worker url=do
      r <- getURL url
      body <- getResponseBody r
      putStrLn $ "number of words in " ++ url ++" is: " ++ show(length (words body))
      return . length . words $ body

sum2 :: TransientIO ()
sum2= do
      option "sum2" "access to N web pages concurrenty and sum the number of words using map-fold"
      liftIO $ print " downloading data..."
      rs <- foldl (<>) (return 0) $ map (async . worker)
                  [ "http://www.haskell.org/"
                  , "http://www.google.com/"]

      liftIO $ putStrLn $ "result="  ++ show rs

instance Monoid Int where
      mappend= (+)
      mempty= 0

server :: TransientIO ()
server=  do
       option "server" "A web server in the port 8080"
       liftIO $ print "Server Stated"
       sock <-  liftIO $  listenOn $ PortNumber 8080
       (h,_,_) <- spawn $ accept sock
       liftIO $ do
           hPutStr h msg
           putStrLn "new request"
           hFlush h
           hClose h
         `catch` (\(e::SomeException) -> sClose sock)

msg = "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"



{-# START_FILE Backtrack.hs #-}
-- show
-- /show
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ExistentialQuantification #-}

module Backtrack (registerUndo, onUndo, undo, retry, undoCut) where

import Base
import Data.Typeable
import Control.Applicative
import Control.Monad.State
import Unsafe.Coerce
import System.Mem.StableName

data Backtrack= forall a b.Backtrack{backtracking :: Bool
                                    ,backStack :: [EventF]}
                                    deriving Typeable

-- | assures that backtracking will not go further
undoCut :: TransientIO ()
undoCut= Transient $ do
     delSessionData $ Backtrack False []
     return $ Just ()

-- | the second parameter will be executed when backtracking
{-# NOINLINE onUndo #-}
onUndo :: TransientIO a -> TransientIO a -> TransientIO a
onUndo ac bac= do
   r<-registerUndo $ Transient $ do 
     Backtrack back _ <- getSessionData `onNothing` return (Backtrack False [])
     runTrans $ if back then bac  else ac 
   return r
   
-- | register an action that will be executed when backtracking
{-# NOINLINE registerUndo #-}
registerUndo :: TransientIO a -> TransientIO a
registerUndo f  = Transient $ do
   cont@(EventF x _ _ _ _ _ _ _ _)  <- get   !> "backregister"
   md  <- getSessionData 
   ss <- case md of
        Just (bss@(Backtrack b (bs@((EventF x'  _ _ _ _ _ _ _ _):_)))) -> do
            addrx  <- addr x
            addrx' <- addr x'         -- to avoid duplicate backtracking points
            return $ if addrx == addrx' then bss else  Backtrack b $ cont:bs
        Nothing ->  return $ Backtrack False [cont]
   setSessionData ss
   runTrans f
   where
   addr x = liftIO $ return . hashStableName =<< (makeStableName $! x)

-- | restart the flow forward from this point on
retry :: TransientIO ()
retry= do
    Backtrack _ stack <- getSessionData `onNothing` return (Backtrack False [])
    setSData $ Backtrack False stack

-- | execute backtracking. It executes the registered actions in reverse order.
--
-- If the backtracking flag is changed, the flow proceeds forward from that point on.
--
-- If the backtrack stack is exhausted or undoCut has been executed, `undo` will stop.
undo :: TransientIO a
undo= Transient $ do
  bs <- getSessionData  `onNothing` return nullBack            !>"GOBACK"
  goBackt  bs

  where
  nullBack= Backtrack False []
  goBackt (Backtrack _ [])= return Nothing                     !> "END"
  goBackt (Backtrack b (stack@(first@(EventF x fs _ _  _ _ _ _ _): bs)))= do
--        put first{replay=True} 
        setSData $ Backtrack True stack
        mr <-  runClosure first                                !> "RUNCLOSURE"
        Backtrack back _ <- getSessionData `onNothing` return nullBack 
                                                               !>"END RUNCLOSURE"
        case back of
           True ->  goBackt $ Backtrack True bs                !> "BACK AGAIN"
           False -> case mr of
                   Nothing -> return empty                     !> "FORWARD END"
                   Just x ->  runContinuation first x          !> "FORWARD EXEC"



{-# START_FILE Indeterminism.hs #-}
-- show
-- /show
-----------------------------------------------------------------------------
--
-- Module      :  Transient.Indeterminism
-- Copyright   :
-- License     :  GPL (Just (Version {versionBranch = [3], versionTags = []}))
--
-- Maintainer  :  [email protected]
-- Stability   :
-- Portability :
--
-- |
--
-----------------------------------------------------------------------------
{-# LANGUAGE BangPatterns, DeriveDataTypeable #-}
module Indeterminism (
choose, choose', collect, group --, found
) where

import Base
import Control.Monad.IO.Class
import Data.IORef
import Control.Applicative
import Data.Monoid
import Control.Concurrent
import Data.Typeable
import Control.Monad.State
import Control.Concurrent.STM as STM
import GHC.Conc


-- | slurp a list of values and process them in parallel . To limit the number of processing
-- threads, use `threads`
choose  ::  [a] -> TransientIO a
choose []= empty
choose   xs = do
    evs <- liftIO  $ newIORef xs
    parallel   $ do
           es <- atomicModifyIORef' evs $ \es -> let !tes= tail es in (tes,es)
           case es of
            [x]  -> return $ Left $ head es
            x:_  -> return $ Right x

-- | group the output of a possibly multithreaded process in groups of n elements.
group :: Int -> TransientIO a -> TransientIO [a]
group num proc =  do
    v <- liftIO $ newIORef (0,[])
    x <- proc
    n <- liftIO $ atomicModifyIORef' v $ \(n,xs) -> let !n'=n +1 in ((n', x:xs),n')
    if n < num
      then stop
      else do
       liftIO $ atomicModifyIORef v $ \(n,xs) ->  ((0,[]),xs)

choose' :: [a] -> TransientIO a
choose'  xs = foldl (<|>) empty $ map (parallel . return . Left) xs


--newtype Collect a= Collect (MVar (Int, [a])) deriving Typeable

-- collect the results of a search done in parallel, usually initiated by
-- `choose` . The results are added to the collection with `found`
--
--

(**>) x y=   Transient $do
       runTrans x
       runTrans y

(<**) x y= Transient $ do
       r <- runTrans x
       runTrans y
       return r

-- execute a process and get the first n solutions.
-- if the process ends without finding the number of solutions requested, it returns the ones found
-- if it finds the number of solutions requested, it kills the threads of the process and returns
-- It works by monitoring the solutions found and the number of active threads.
collect :: Typeable a => Int -> TransientIO a -> TransientIO [a]
collect n search=  do
  rv <- liftIO $ atomically $ newTVar (0,[]) !> "NEWMVAR"
  endflag <- liftIO $ newTVarIO False
  st <- get
  let any1 = do
        r <- search   !> "ANY"
        liftIO $ atomically $ do
            (n1,rs) <- readTVar rv
            writeTVar  rv (n1+1,r:rs) !> "MODIFY"
        stop

      detect= do
        stnow <- get
        freeThreads $ async $ do
             threadDelay 1000 -- to allow the spawning of worker threads
             xs <- atomically $ do
                (n',xs) <- (readTVar rv ) !> "read"
                ns <- readTVar $ children st
--                unsafeIOToSTM $ putStrLn $ "LEN="++show (length ns)++ " "++ show n'++ " "++ show n

                if (n' >= n) || (length ns == 1)
                  then return xs
                  else retry

             th <- myThreadId !> "KILL"
             free th stnow
             killChildren st
             addThread st stnow
             return  xs

  any1  **> detect




{-# START_FILE Base.hs #-}
-- show
-- /show
{-# LANGUAGE ScopedTypeVariables #-}
-----------------------------------------------------------------------------
--
-- Module      :  Base
-- Copyright   :
-- License     :  GPL (Just (Version {versionBranch = [3], versionTags = []}))
--
-- Maintainer  :  [email protected]
-- Stability   :
-- Portability :
--
-- |
--
-----------------------------------------------------------------------------
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE DeriveDataTypeable        #-}
-- show
module Base  where
-- /show

import           Control.Applicative
import           Control.Monad.State
import           Data.Dynamic
import qualified Data.Map               as M
import           Data.Monoid
import           Debug.Trace
import           System.IO.Unsafe
import           Unsafe.Coerce
import           Control.Exception
import           Control.Concurrent
import           Control.Concurrent.STM
import           System.Mem.StableName
import           Data.Maybe
import           GHC.Conc
import           Data.List
import           Data.IORef

(!>) =const . id -- flip trace
infixr 0 !>

data Transient m x= Transient  {runTrans :: m (Maybe x)}
type SData= ()

type EventId= Int



data EventF  = forall a b . EventF{xcomp       :: TransientIO a
                                  ,fcomp       :: [b -> TransientIO b]
                                  ,mfData      :: M.Map TypeRep SData
                                  ,mfSequence  :: Int
                                  ,threadId    :: ThreadId
                                  ,freeTh      :: Bool
                                  ,parent      :: Maybe EventF
                                  ,children    :: TVar[EventF]
                                  ,maxThread     :: Maybe (P Int)
                                  }
                                  deriving Typeable

type P= IORef
newp= newIORef


--(=:) :: P a  -> (a -> a) -> IO()
(=:) n f= liftIO $ atomicModifyIORef' n $  \v ->  ((f v),())

addr x= show $ unsafePerformIO $ do
       st <- makeStableName $! x
       return $ hashStableName st

waitQSemB sem= atomicModifyIORef' sem $ \n -> if n > 0 then(n-1,True) else (n,False)
signalQSemB sem= atomicModifyIORef' sem  $ \n ->  (n + 1,())

-- | Set the maximum number of threads for a procedure. It is useful for the
threads :: Int -> TransientIO a -> TransientIO a
threads n proc= Transient $do
   msem <- gets maxThread
   sem <- liftIO $ newIORef n
   modify $ \s -> s{maxThread= Just sem}
   r <- runTrans proc
   modify $ \s -> s{maxThread = msem} -- restore it
   return r


instance MonadState EventF  TransientIO where
  get=  Transient $ get >>= return . Just
  put x= Transient $ put x >> return (Just ())

type StateIO= StateT EventF  IO

type TransientIO= Transient StateIO

--runTrans ::  TransientIO x -> StateT EventF  IO (Maybe x)
--runTrans (Transient mx) = mx


runTransient :: TransientIO x -> IO (Maybe x, EventF)
runTransient t= do

  th <- myThreadId
  let eventf0=  EventF  empty [] M.empty 0
          th False  Nothing  (unsafePerformIO $ newTVarIO []) Nothing
   

  runStateT (runTrans t) eventf0{threadId=th} !> "MAIN="++show th


setEventCont ::   TransientIO a -> (a -> TransientIO b) -> StateIO ()
setEventCont x f  = do
   st@(EventF   _ fs d n  r applic  ch rc bs)  <- get
--   if applic
--    then return () else
   put $ EventF x ( unsafeCoerce f : fs) d n  r applic  ch rc bs



resetEventCont :: Maybe a -> StateIO ()
resetEventCont mx =do
   st@(EventF _ fs d n  r nr  ch rc bs)  <- get
--   if nr
--    then return ()
--    else do
   let f= \mx ->  case mx of
                       Nothing -> empty
                       Just x  -> (unsafeCoerce $ head fs)  x
   put $ EventF  (f mx) ( tailsafe fs)d n  r nr  ch rc bs
   where
   tailsafe []=[]
   tailsafe (x:xs)= xs



getCont ::(MonadState EventF  m) => m EventF
getCont = get


runCont :: EventF -> StateIO ()
runCont (EventF  x fs _ _  _ _  _ _ _)= runTrans ((unsafeCoerce x') >>= compose ( fs)) >> return ()
    where
    x'=  do
--           modify $ \s -> s{replay=True}
           r<- x
--           modify $ \s -> s{replay=False}
           return r

{-
runCont cont= do
     mr <- runClosure cont
     case mr of
         Nothing -> return Nothing
         Just r -> runContinuation cont r
-}

compose []= const empty
compose (f: fs)= \x -> f x >>= compose fs



runClosure :: EventF -> StateIO (Maybe a)
runClosure (EventF x _ _ _ _ _ _ _ _) =  unsafeCoerce $ runTrans x

runContinuation ::  EventF -> a -> StateIO (Maybe b)
runContinuation (EventF _ fs _ _ _ _  _ _ _) x= runTrans $  (unsafeCoerce $ compose $  fs) x

instance   Functor TransientIO where
  fmap f mx=   -- Transient $ fmap (fmap f) $ runTrans mx
    do
     x <- mx
     return $ f x

instance Applicative TransientIO where
  pure a  = Transient . return $ Just a


  f <*> g = Transient $ do

         rf <- liftIO $ newIORef Nothing
         rg <- liftIO $ newIORef Nothing   -- !> "NEWIOREF"

         cont@(EventF _ fs a b c d peers children g1) <- get   -- !> "APLICATIVE DOIT"

         let
             appg x = Transient $  do
                   liftIO $ writeIORef rg $ Just x :: StateIO ()
                   k <- liftIO $ readIORef rf

                   return $ k <*> Just x  -- !> "RETURNED: " ++ show(isJust k)++ show(isJust x)


             appf k = Transient $  do
                   liftIO $ writeIORef rf  $ Just k :: StateIO ()
                   x<- liftIO $ readIORef rg

                   return $ Just k <*> x  --  !> "RETURNED: " ++ show(isJust k)++ show(isJust x)



         put $ EventF f (unsafeCoerce appf:  fs)
                                          a b c d peers children g1
         k <- runTrans f
         liftIO $ writeIORef rf  k -- :: StateIO ()

         put $ EventF g (unsafeCoerce appg :  fs)
                                          a b c d peers  children g1
         x <- runTrans g
         liftIO $ writeIORef rg  x

         return $ k <*> x


instance  Alternative TransientIO where
  empty= Transient $ return  Nothing
  Transient f <|> Transient g= Transient $ do
         k <-   f
         x <-   g
         return $ k <|> x

-- | Kill all the child threads generated by the expression and continue execution
-- of the current thread.
oneThread :: TransientIO a -> TransientIO a
oneThread comp=  do
   chs <- liftIO $ newTVarIO []
   r <- comp
   modify $ \ s -> s{children= chs}
   killchilds
   return r

-- | internal. use `oneThread`
killchilds :: TransientIO()
killchilds= Transient $  do
   cont <- get
   liftIO $  killChildren cont
   return $ Just ()

-- | The threads generated in the process passed as parameter will not be killed.
freeThreads :: TransientIO a -> TransientIO a
freeThreads proc= Transient $ do
     st <- get
     put st{freeTh= True}
     r <- runTrans proc
     modify $ \st -> st{freeTh= freeTh st}
     return r

-- | The threads will be killed when the parent thread dies
hookedThreads proc= Transient $ do
     st <- get
     put st{freeTh= False}
     r <- runTrans proc
     modify $ \st -> st{freeTh= freeTh st}
     return r

instance MonadPlus TransientIO where
    mzero= stop
    mplus (Transient x) (Transient y)=  Transient $ do
         mx <- x
         case mx of
             Nothing -> y
             justx -> return justx

-- | A synonym of empty that can be used in a monadic expression. It stops the
-- computation.
stop :: TransientIO a
stop= Control.Applicative.empty

instance Monoid a => Monoid (TransientIO a) where
  mappend x y = mappend <$> x <*> y
  mempty= return mempty

instance Monad TransientIO where
      return x = Transient $ return $ Just x
      x >>= f  = Transient $ do
        cont <- setEventCont x  f

        mk <- runTrans x
        resetEventCont mk
        case mk of
           Just k  -> do
               runTrans $ f k

           Nothing -> return Nothing



instance MonadTrans (Transient ) where
  lift mx = Transient $ mx >>= return . Just

instance MonadIO TransientIO where
  liftIO = lift . liftIO --     let x= liftIO io in x `seq` lift x



-- | Get the session data of the desired type if there is any.
getSessionData ::  (MonadState EventF m,Typeable a) =>  m (Maybe a)
getSessionData =  resp where
 resp= gets mfData >>= \list  ->
    case M.lookup ( typeOf $ typeResp resp ) list of
      Just x  -> return . Just $ unsafeCoerce x
      Nothing -> return $ Nothing
 typeResp :: m (Maybe x) -> x
 typeResp= undefined

-- | getSessionData specialized for the View monad. if Nothing, the
-- monadic computation does not continue. getSData is a widget that does
-- not validate when there is no data of that type in the session.
getSData :: MonadState EventF m => Typeable a =>Transient m  a
getSData= Transient getSessionData


-- | setSessionData ::  (StateType m ~ MFlowState, Typeable a) => a -> m ()
setSessionData  x=
  modify $ \st -> st{mfData= M.insert  (typeOf x ) (unsafeCoerce x) (mfData st)}

-- | a shorter name for setSessionData
setSData ::  ( MonadState EventF m,Typeable a) => a -> m ()
setSData= setSessionData

delSessionData x=
  modify $ \st -> st{mfData= M.delete (typeOf x ) (mfData st)}

delSData :: ( MonadState EventF m,Typeable a) => a -> m ()
delSData= delSessionData

withSData ::  ( MonadState EventF m,Typeable a) => (Maybe a -> a) -> m ()
withSData f= modify $ \st -> st{mfData=
    let dat = mfData st
        mx= M.lookup typeofx dat
        mx'= case mx of Nothing -> Nothing; Just x -> unsafeCoerce x
        fx=  f mx'
        typeofx= typeOf $ typeoff f
    in  M.insert typeofx  (unsafeCoerce fx) dat}
    where
    typeoff :: (Maybe a -> a) -> a
    typeoff = undefined
----

genNewId :: MonadIO m => MonadState EventF m =>  m Int
genNewId=  do
          st <- get

          let n= mfSequence st
          put $ st{mfSequence= n+1}
          return n

refSequence :: IORef Int
refSequence= unsafePerformIO $ newp 0



data Loop= Once | Loop | Multithread deriving Eq

waitEvents ::   IO b -> TransientIO b
waitEvents io= do
   r <- parallel (Right <$> io)
   killchilds
   return r


async  ::  IO b -> TransientIO b
async io= do
   r <- parallel  (Left <$>io)
   killchilds
   return r

spawn ::  IO b -> TransientIO b
spawn io= freeThreads $ do
   r <- parallel (Right <$>io)
   return r

data EventValue= EventValue SData deriving Typeable

parallel  ::    IO (Either b b) -> TransientIO b
parallel  ioaction= Transient$   do
        cont <- getCont                    -- !> "PARALLEL"
        mv <- getSessionData
        case mv  of
         Just (EventValue v)  -> do
            delSessionData $ EventValue () -- !> "ISJUST "
            return  $ Just $ unsafeCoerce v
         Nothing -> do
            liftIO  $ loop cont    ioaction
            return Nothing




loop (cont'@(EventF x fs a b c d peers childs g))  rec  =  do
  chs <- liftIO $ newTVarIO []
  let cont = EventF x fs a b c d (Just cont') chs g
      iocont dat=
          runStateT ( do
             setSessionData . EventValue $ unsafeCoerce dat
             runCont cont
             ) cont
             >> return ()
      loop'= do
        mdat <- rec
        case mdat of
         Left dat -> iocont dat

         Right dat -> do
              forkMaybe  $ iocont dat
              loop'

      forkMaybe  proc = do
         dofork <- case maxThread cont of
                  Nothing -> return True
                  Just sem -> do
                    dofork <- waitQSemB sem
                    if dofork then  return True else return False

         if dofork
            then  do
                 th <- forkFinally proc $ \me -> do
                         case me of -- !> "THREAD ENDED" of
                          Left  e -> do
                           when (fromException e /= Just ThreadKilled)$ liftIO $ print e
                           killChildren  cont  !> "KILL RECEIVED" ++ (show $ unsafePerformIO myThreadId)

                          Right _ -> do
                             --  if parent is alive
                             --  then remove himself from the list (with free)
                             --  and pass his active children to his parent
                             return ()
                             th <- myThreadId
                             mparent <- free th cont
                             case mparent of
                              Nothing  ->  return()
                              Just parent -> atomically $ do
                                     chs' <- readTVar $ children cont
                                     chs  <- (readTVar $ children parent) 
                                     writeTVar (children parent)$ chs ++ chs'
                                     return ()

                         case maxThread cont of
                           Just sem -> signalQSemB sem
                           Nothing -> return ()


                 addThread cont' cont{threadId=th}  -- !>  "thread created: "++ show th

            else proc  -- !> "NO THREAD"

  forkMaybe  loop'


free th env= do
  if isNothing $ parent env
   then  return Nothing  !>  show th ++ " orphan"
   else do
    let msibling= fmap children $ parent env
    
    case msibling of
     Nothing -> return Nothing
     Just sibling  -> do
       found <- atomically $ do
                sbs <- readTVar sibling  
                let (sbs', found) = drop [] th  sbs !> "search "++show th ++ " in " ++ show (map threadId sbs)
                when found $ writeTVar sibling sbs'
                return found
       if (not found && isJust (parent env)) 
         then free th $ fromJust $ parent env !> "toparent"
         else return $ Just env

   where
   drop processed th []= (processed,False)
   drop processed th (ev:evts)| th == threadId ev= (processed ++ evts, True)
                    | otherwise= drop (ev:processed) th evts


addThread parent child = when(not $ freeTh parent) $ do
   let headpths= children parent
   atomically $ do
       ths <- readTVar headpths
       writeTVar headpths $  child:ths

killChildren  cont = do
     forkIO $ do
        let childs= children cont --  !> "killChildren list= "++ addr (children cont)
        ths <- atomically $ do
           ths <- readTVar childs
           writeTVar childs []
           return ths
        mapM_ (killThread . threadId) ths  !> "KILLEVENT " ++ show (map threadId ths)
     return ()


type EventSetter eventdata response= (eventdata ->  IO response) -> IO ()
type ToReturn  response=  IO response
react
  :: Typeable eventdata
  => EventSetter eventdata response
  -> ToReturn  response
  -> TransientIO eventdata

react setHandler iob= Transient $ do
        cont    <- getCont
        mEvData <- getSessionData
        case mEvData of
          Nothing -> do
            liftIO $ setHandler $ \dat ->do
--              let cont'= cont{mfData = M.insert (typeOf dat)(unsafeCoerce dat) (mfData cont)}
              runStateT (setSData dat >> runCont cont) cont
              iob
            return Nothing
          Just dat -> delSessionData dat >> return (Just  dat)



getLineRef= unsafePerformIO $ newTVarIO Nothing


option1 x  message= do

 waitEvents  $ do
     liftIO $ putStrLn $ message++"("++show x++")"
     th <-  myThreadId
     atomically $ do
       mr <- readTVar getLineRef

       case mr of
         Nothing -> retry
         Just r ->
            case reads1 r of  -- !> ("received " ++  show r ++  show th) of
            (s,_):_ -> if  s == x  -- !> ("waiting" ++ show x)
                     then do
                       writeTVar  getLineRef Nothing -- !>"match"
                       return s

                     else retry
            _ -> retry
     where
     reads1 s=x where
      x= if typeOf(typeOfr x) == typeOf "" then unsafeCoerce[(s,"")] else readsPrec 0 s
      typeOfr :: [(a,String)] ->  a
      typeOfr  = undefined


roption= unsafePerformIO $ newMVar []

-- | Install an event receiver that waits for a string and triggers the continuation when this string arrives.
option :: (Typeable b, Show b, Read b, Eq b) =>
     b -> [Char] -> TransientIO b
option ret message= do
    let sret= show ret

    liftIO $ putStrLn $ "Enter "++sret++"\tto: " ++ message
    liftIO $ modifyMVar_ roption $ \msgs-> return $ sret:msgs
    waitEvents  $ getLine' (==ret)
    liftIO $ putStrLn $ show ret ++ " chosen"
    return ret


-- | Validates an input entered at the keyboard in non-blocking mode. Non-blocking means that
-- the user can also enter anything else to activate another option.
-- Unlike `option`, input waits for only one valid response.
input :: (Typeable a, Read a) => (a -> Bool) -> TransientIO a
input cond= Transient . liftIO . atomically $ do
       mr <- readTVar getLineRef
       case mr of
         Nothing -> retry
         Just r ->
            case reads1 r  of
            (s,_):_ -> if cond s  --  !> show (cond s)
                     then do
                       writeTVar  getLineRef Nothing -- !>"match"
                       return $ Just s

                     else return Nothing
            _ -> return Nothing


getLine' cond=    do
     atomically $ do
       mr <- readTVar getLineRef
       case mr of
         Nothing -> retry
         Just r ->
            case reads1 r of --  !> ("received " ++  show r ++ show (unsafePerformIO myThreadId)) of
            (s,_):_ -> if cond s -- !> show (cond s)
                     then do
                       writeTVar  getLineRef Nothing -- !>"match"
                       return s

                     else retry
            _ -> retry

reads1 s=x where
      x= if typeOf(typeOfr x) == typeOf "" then unsafeCoerce[(s,"")] else readsPrec 0 s
      typeOfr :: [(a,String)] ->  a
      typeOfr  = undefined

inputLoop=  do
    putStrLn "Press end to exit"
    inputLoop'  -- !> "started inputLoop"
    where
        inputLoop'= do
           r<- getLine
           if r=="end" then putMVar rexit () else do
              atomically . writeTVar  getLineRef $ Just r
              inputLoop'


rexit= unsafePerformIO newEmptyMVar

stay=  takeMVar rexit

keep mx = do
   forkIO $ inputLoop
   forkIO $ runTransient mx  >> return ()
   stay

exit :: TransientIO a
exit= do
  liftIO $ putStrLn "Tempus fugit: exit"
  liftIO $ putMVar rexit ()  -- rexit holds (), as in inputLoop
  return undefined

onNothing iox iox'= do
       mx <- iox
       case mx of
           Just x -> return x
           Nothing -> iox'


collect can also be used in the context of map-reduce.

To see how the Transient EDSL composes programs and how its internals work, see the first article.

Constraint Logic programming

In the paper "Transforming Functional Logic Programs into Monadic Functional Programs", the authors propose a generic solution that translates a logic expression to any monad with a MonadPlus instance, provided that the logic variables (free variables whose value is unknown) are replaced by generators. Each value generated in each iteration must be shared across the expression being evaluated.
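The idea can be sketched without Transient. The following is only an illustration under my own assumptions: choices is a hypothetical generator, not a Transient primitive, and the list monad stands in for any MonadPlus instance. Each logic variable is bound once with `<-`, so every later use of the name shares the same generated value.

```haskell
import Control.Monad (MonadPlus, guard, msum)

-- Hypothetical generator: offers every element of the list as an alternative.
choices :: MonadPlus m => [a] -> m a
choices = msum . map return

-- The logic variables a, b and c are replaced by generators.
-- The guard plays the role of the constraint.
triples :: MonadPlus m => Int -> m (Int, Int, Int)
triples n = do
    a <- choices [1 .. n]
    b <- choices [a .. n]
    c <- choices [b .. n]
    guard (a * a + b * b == c * c)
    return (a, b, c)

main :: IO ()
main = print (triples 20 :: [(Int, Int, Int)])
```

Since triples only asks for MonadPlus, the same definition should run in any monad that provides that instance; the list monad is used here only because it needs no extra library.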

So once we define a generator class, Transient can perform logic programming for all the instances of this class. This is best shown with an example:

To create a generator, it is enough to apply choose, defined above, to a list of the possible values of the data type.

Using choose we can create the class Free:

class  Free m a where
    free :: m a


instance Free TransientIO Bool where
    free= choose[True,False]

instance Free TransientIO Int where
    free= choose[0..]  -- exclude negatives
...

So this Transient routine


pairs :: TransientIO (Int,Int)
pairs= do
   x <- free
   y <- free
   guard (y /= 0 && x == 4 * y)  -- x/y == 4; (/) is not defined for Int
   return (x,y)

or expressed with a monad comprehension (MonadComprehensions extension):

pairs=[(x,y)| x <- free, y <- free, y /= 0, x == 4 * y]

is a constraint logic program, quite similar in syntax to a program in a functional logic language such as Curry with constraints:

pairs= (x,y) where x /y == 4, x free, y free

All the other programs above are constraint logic programs too, since choose is a way to express constraints on the possible values of the logic variables.
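The generator class can be tried in isolation. This is a minimal sketch, assuming the list monad as a stand-in for TransientIO and bounded ranges instead of choose[0..], so that the sequential search terminates. The instances for the list monad are my own illustration and are not part of Transient.

```haskell
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
import Control.Monad (guard)

-- A generator class: free produces every candidate value of a logic variable.
class Free m a where
    free :: m a

-- Stand-in instances for the list monad (assumption: bounded domains).
instance Free [] Bool where
    free = [True, False]

instance Free [] Int where
    free = [0 .. 20]

-- All pairs whose quotient is exactly 4.
pairs :: [(Int, Int)]
pairs = do
    x <- free
    y <- free
    guard (y /= 0 && x == 4 * y)
    return (x, y)

main :: IO ()
main = print pairs
-- prints [(4,1),(8,2),(12,3),(16,4),(20,5)]
```

With an unbounded generator the list monad would never get past the first value of x, because it explores the alternatives one after another. That is a limitation of this stand-in, not a statement about how Transient schedules the search.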

I guess that there is also a way to program classical examples of logic programming with logic clauses, not only with constraints. I suppose that this example, designed for the list monad, is straightforward to translate to Transient.

Conclusions and future work

I used Transient here for some typical toy problems of non-determinism, but I have also shown that, for some realistic IT problems, the non-determinism built into the Transient monad can generate elegant multithreaded solutions with the same syntax as the list monad, whereas doing the same without this monad requires complex code transformations.

The advantage of using Transient for problems with multiple solutions is that its evaluation model is multithreaded, so it can use all the cores of the machine to explore the possible answers.

And, perhaps more important, a single Transient monad covers all the effects: it is not necessary to switch from one monad to another with complicated monadic stacks in order to use multithreading, events, non-determinism, transactions or (soon) distributed computing, among others. All the effects can be mixed freely. The code is as simple and clean as it could be.

parallel, the basic primitive of Transient, is inherently non-deterministic, since it implicitly uses multithreading and executes the continuation for each event in a new thread. But non-determinism is typically represented by the list monad. Here I demonstrated how Transient can process and filter many results in parallel using the semantics of the list monad, with simple but effective high-level threading control.
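To make the "one thread per event" idea concrete, here is a rough sketch that uses only base. It is not how parallel is implemented: eachInThread is a hypothetical helper, the continuation is passed explicitly, and a Nothing result plays the role of a failed guard.

```haskell
import Control.Concurrent (forkFinally)
import Control.Concurrent.MVar
import Control.Monad (forM_, replicateM_)
import Data.List (sort)

-- Run the continuation k for every event in its own thread and gather
-- the results that pass the filter (the Just values).
eachInThread :: [a] -> (a -> IO (Maybe b)) -> IO [b]
eachInThread events k = do
    results <- newMVar []
    done    <- newEmptyMVar
    forM_ events $ \e ->
        forkFinally
            (k e >>= maybe (return ()) (\v -> modifyMVar_ results (return . (v :))))
            (\_ -> putMVar done ())   -- signal completion even after an exception
    replicateM_ (length events) (takeMVar done)   -- wait for every worker
    readMVar results

main :: IO ()
main = do
    squares <- eachInThread [1 .. 10 :: Int] $ \x ->
        return (if even x then Just (x * x) else Nothing)
    -- The arrival order depends on scheduling, so sort before printing.
    print (sort squares)
```

The results arrive in whatever order the threads finish, which is exactly the non-determinism mentioned above; sorting is only there to make the printed output stable.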

I have used the parallel non-determinism of Transient to easily parallelize a search program with very few changes and at a higher level of programming, closer to the programmer's intuitions. In fact, it needs not more but less code! This shows how non-determinism can be used in Transient to solve practical problems while keeping the solution high-level and sound, and thus increase the maintainability of the software.

Since all of this machinery can be mixed freely with other effects, I know of nothing with the generality and seamlessness of Transient for parallel processing.

guard and collect can also be used in map-reduce scenarios.

For greater flexibility, thread control has been decoupled from parallel. Now parallel does not perform thread control; there are explicit primitives for it.
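The thread budget behind such a primitive can be sketched with a plain counter, in the spirit of waitQSemB and signalQSemB in Base.hs. This is a simplified stand-alone illustration: tryAcquire, release and this version of forkMaybe are names chosen for the sketch, and the Bool result is added only to make the behaviour observable.

```haskell
import Control.Concurrent (forkFinally)
import Control.Concurrent.MVar
import Data.IORef

-- Try to take one thread slot; False means the budget is exhausted.
tryAcquire :: IORef Int -> IO Bool
tryAcquire sem = atomicModifyIORef' sem $ \n ->
    if n > 0 then (n - 1, True) else (n, False)

-- Give a slot back.
release :: IORef Int -> IO ()
release sem = atomicModifyIORef' sem $ \n -> (n + 1, ())

-- Fork the action if a slot is free, otherwise run it in the calling thread.
-- Returns True when a new thread was created.
forkMaybe :: IORef Int -> IO () -> IO Bool
forkMaybe sem act = do
    ok <- tryAcquire sem
    if ok
        then do
            _ <- forkFinally act (\_ -> release sem)  -- free the slot at the end
            return True
        else act >> return False

main :: IO ()
main = do
    sem  <- newIORef 2                   -- at most two extra threads
    gate <- newEmptyMVar :: IO (MVar ())
    let blocked = readMVar gate          -- forked workers hold their slot here
    a <- forkMaybe sem blocked
    b <- forkMaybe sem blocked
    c <- forkMaybe sem (return ())       -- no slot left: runs inline
    print [a, b, c]                      -- prints [True,True,False]
    putMVar gate ()                      -- let the workers finish
```

Running the work inline when no slot is free keeps the program making progress and coarsens the granularity automatically, instead of blocking the producer.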

Continuations are stored in the state with type erasure. This makes editing them difficult, since the type system does not help. Using Dynamic would restrict the intermediate results to Typeable ones and would slow down execution. Moreover, it would only produce errors at runtime, not at compile time. Perhaps some trick can alleviate these problems. Something better than the hacky editing of the state is definitely necessary.
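The trade-off with Dynamic can be seen in a few lines. This is only an illustration with made-up names (next, stored, applyInt); Transient itself uses unsafeCoerce, which performs no check at all.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)

-- A continuation whose type will be hidden behind Dynamic.
next :: Int -> String
next n = show (n + 1)

-- toDyn requires a Typeable value, which is the restriction mentioned above.
stored :: Dynamic
stored = toDyn next

-- Recover the continuation with a runtime type check and apply it.
-- A mismatch yields Nothing at runtime; the compiler cannot detect it.
applyInt :: Dynamic -> Int -> Maybe String
applyInt d n = fmap ($ n) (fromDynamic d :: Maybe (Int -> String))

main :: IO ()
main = do
    print (applyInt stored 41)                    -- prints Just "42"
    print (applyInt (toDyn "not a function") 41)  -- prints Nothing
```

The second call compiles without complaint and fails only when it runs, which is the kind of late error the paragraph above refers to.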

Transient will include more effects: thread state persistence in files and source emit are necessary for the integration of MFlow and hplayground into a single "isomorphic" web framework.

Thread state persistence is the effect now provided by the Workflow package. Source emit is the solution for serializing code and executing it in another node, including a web browser. It is not yet clear whether it is feasible.

A cloud effect for the distribution of computing resources, where the Transient monad does the distribution automatically depending on the load of each node and its capabilities, including browsers, may be possible in the medium term.

AMDG