Smarter termination for thread racing

I realized in the shower this morning that there’s a serious flaw in my unamb implementation as described in Functional concurrency with unambiguous choice. Here’s the code for racing two computations:

race :: IO a -> IO a -> IO a
a `race` b = do v  <- newEmptyMVar
                ta <- forkPut a v
                tb <- forkPut b v
                x  <- takeMVar  v
                killThread ta
                killThread tb
                return x

forkPut :: IO a -> MVar a -> IO ThreadId
forkPut act v = forkIO ((act >>= putMVar v) `catch` uhandler `catch` bhandler)
 where
   uhandler (ErrorCall "Prelude.undefined") = return ()
   uhandler err                             = throw err
   bhandler BlockedOnDeadMVar               = return ()

The problem is that each of the threads ta and tb may have spawned other threads, directly or indirectly. When I kill them, they don’t get a chance to kill their sub-threads. If the parent thread does get killed, it will most likely happen during the takeMVar.
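
To make the leak concrete, here is a minimal sketch (not from the original code, and assuming a reasonably recent GHC and `base`). The name `childSurvivesParent` is hypothetical. It forks a parent that forks a child, kills the parent, and then checks whether the child still finished its work:

```haskell
import Control.Concurrent

-- | Fork a parent that forks a child, kill the parent, and report
-- whether the child still completed its work afterwards.
-- (Hypothetical helper, for illustration only.)
childSurvivesParent :: IO Bool
childSurvivesParent = do
  started  <- newEmptyMVar   -- filled once the child has been forked
  finished <- newEmptyMVar   -- filled by the child when it finishes
  parent <- forkIO $ do
    child <- forkIO (threadDelay 200000 >> putMVar finished ())
    putMVar started child
    threadDelay 10000000     -- parent idles; it is killed long before this ends
  _ <- takeMVar started      -- wait until the child exists
  killThread parent          -- delivers ThreadKilled to the parent only
  threadDelay 500000         -- give the child time to finish
  r <- tryTakeMVar finished
  return (r == Just ())
```

If the child outlives its parent, as the paragraph above describes, the result is `True`: nothing in `killThread` knows about threads the victim has spawned.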

My first thought was to use some form of garbage collection of threads, perhaps akin to Henry Baker’s paper The Incremental Garbage Collection of Processes. As with memory GC, dropping one consumer would sometimes result in cascading de-allocations. That cascade is missing from my implementation above.

Or maybe there’s a simple and dependable manual solution, enhancing the method above.

I posted a note asking for ideas, and got the following suggestion from Peter Verswyvelen:

I thought that killing a thread was basically done by throwing a ThreadKilled exception using throwTo. Can’t these exceptions be caught?

In C#/F# I usually use a similar technique: catch the exception that kills the thread, and perform cleanup.

Playing with Peter’s suggestion works out very nicely, as described in this post.

There is a function that takes a clean-up action to be executed even if the main computation is killed:

finally :: IO a -> IO b -> IO a
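
A small sketch of that behaviour, assuming a current `base` (the name `cleanupRunsOnKill` is hypothetical, and `timeout` is used only so the check cannot hang): a thread sleeping inside `finally` is killed, and we then look for evidence that its clean-up action ran.

```haskell
import Control.Concurrent
import Control.Exception (finally)
import System.Timeout (timeout)

-- | Kill a thread that is sleeping inside 'finally' and report whether
-- its clean-up action ran. (Hypothetical helper, for illustration only.)
cleanupRunsOnKill :: IO Bool
cleanupRunsOnKill = do
  ready   <- newEmptyMVar    -- signals that the thread is inside 'finally'
  cleaned <- newEmptyMVar    -- filled by the clean-up action
  tid <- forkIO $
    (putMVar ready () >> threadDelay 10000000)
      `finally` putMVar cleaned ()
  takeMVar ready
  killThread tid
  -- Wait up to one second for the clean-up to report in.
  r <- timeout 1000000 (takeMVar cleaned)
  return (r == Just ())
```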

Using this function, the race definition becomes a little shorter and more descriptive:

a `race` b = do v  <- newEmptyMVar
                ta <- forkPut a v
                tb <- forkPut b v
                takeMVar v `finally`
                  (killThread ta >> killThread tb)

This code is vulnerable to being killed after the first forkPut and before the second one, which would then leave the first thread running. The following variation is a bit safer:

a `race` b = do v  <- newEmptyMVar
                ta <- forkPut a v
                (do tb <- forkPut b v
                    takeMVar v `finally` killThread tb)
                 `finally` killThread ta

Though I guess it’s still possible for the thread to get killed after the first fork and before the next statement begins. Also, this code is difficult to write and read. The general pattern here is to fork a thread, do something else, and kill the thread. Give that pattern a name:

forking :: IO () -> IO b -> IO b
forking act k = do tid <- forkIO act
                   k `finally` killThread tid
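
The remaining window between the fork and the `finally` can be closed by masking asynchronous exceptions around both. The following is only a sketch under the assumption of a modern `base` that provides `mask` and `forkIOWithUnmask`; the names `forkingMasked` and `childKilledAfter` are hypothetical and not part of the original code.

```haskell
import Control.Concurrent
import Control.Exception (mask, finally)
import System.Timeout (timeout)

-- | Like 'forking', but the fork and the installation of the clean-up
-- handler happen while asynchronous exceptions are masked.
forkingMasked :: IO () -> IO b -> IO b
forkingMasked act k = mask $ \restore -> do
  -- The child inherits the masked state, so unmask it explicitly;
  -- otherwise killThread could not interrupt a busy child.
  tid <- forkIOWithUnmask (\unmask -> unmask act)
  -- Run the continuation in the caller's original masking state.
  restore k `finally` killThread tid

-- | Run a long-sleeping child alongside a short main action and report
-- whether the child was killed once the main action returned.
childKilledAfter :: IO Bool
childKilledAfter = do
  killed <- newEmptyMVar
  forkingMasked (threadDelay 10000000 `finally` putMVar killed ())
                (threadDelay 100000)
  r <- timeout 1000000 (takeMVar killed)
  return (r == Just ())
```

This is essentially what `Control.Exception.bracket` does with `forkIO` as the acquire step and `killThread` as the release step.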

The post-fork action in both cases is to execute another action (a or b) and put the result into the mvar v. Removing the forkIO from forkPut leaves putCatch:

putCatch :: IO a -> MVar a -> IO ()
putCatch act v = (act >>= putMVar v) `catch` uhandler `catch` bhandler
 where
   uhandler (ErrorCall "Prelude.undefined") = return ()
   uhandler err                             = throw err
   bhandler BlockedOnDeadMVar               = return ()

Combine forking and putCatch for convenience:

forkingPut :: IO a -> MVar a -> IO b -> IO b
forkingPut act v k = forking (putCatch act v) k

Or, in the style of Semantic editor combinators,

forkingPut = (result.result) forking putCatch
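
For readers who have not seen that style: `result` is just function composition, read as "edit the result of a function". The sketch below uses pure stand-ins, `wrap` and `describe` (hypothetical names with the same argument shape as `forking` and `putCatch`), so that the pointed and point-free definitions can be compared directly.

```haskell
-- | 'result' edits the result of a function; it is just composition.
result :: (b -> b') -> (a -> b) -> (a -> b')
result = (.)

-- Pure stand-in for 'forking': takes the forked action and a continuation.
wrap :: String -> String -> String
wrap inner k = "forking(" ++ inner ++ "; " ++ k ++ ")"

-- Pure stand-in for 'putCatch': takes an action and an mvar.
describe :: String -> String -> String
describe act v = "putCatch " ++ act ++ " " ++ v

-- The two styles: explicit arguments versus editing the result of the
-- result of 'describe' with 'wrap'.
pointed, pointFree :: String -> String -> String -> String
pointed act v k = wrap (describe act v) k
pointFree       = (result . result) wrap describe
```

Unfolding the definition, `(result . result) wrap describe act v` is `wrap (describe act v)`, which is why the two definitions agree.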

Now the code is tidy and safe:

a `race` b = do v <- newEmptyMVar
                forkingPut a v $
                  forkingPut b v $
                    takeMVar v

Recall that there’s a very slim chance of the parent thread getting killed after spinning a child and before getting ready to kill the sub-thread (i.e., the finally). If this case happens, we will not get an incorrect result. Instead, an unnecessary thread will continue to run and write its result into an mvar that no one is reading.
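
The definitions above were written against an older `Control.Exception`. As a rough sketch of how the same pieces might look with the current extensible-exceptions API: I am assuming here that `BlockedIndefinitelyOnMVar` is the modern counterpart of `BlockedOnDeadMVar`, and that `undefined` still raises an `ErrorCall` whose message is "Prelude.undefined" in your GHC version.

```haskell
import Control.Concurrent
import Control.Exception

-- | Fork @act@, run @k@, and kill the forked thread when @k@ finishes
-- or is itself killed.
forking :: IO () -> IO b -> IO b
forking act k = do
  tid <- forkIO act
  k `finally` killThread tid

-- | Run @act@ and store its result in @v@. Two failures are treated as
-- "no answer": evaluating 'undefined', and blocking forever on an MVar.
putCatch :: IO a -> MVar a -> IO ()
putCatch act v =
  (act >>= putMVar v) `catches`
    [ Handler $ \e -> case e of
        ErrorCall "Prelude.undefined" -> return ()  -- assumed message text
        _                             -> throwIO e  -- re-throw other errors
    , Handler $ \BlockedIndefinitelyOnMVar -> return ()
    ]

-- | Race two actions; the first result written to the MVar wins, and
-- both forked threads are killed on the way out.
race :: IO a -> IO a -> IO a
a `race` b = do
  v <- newEmptyMVar
  forking (putCatch a v) $
    forking (putCatch b v) $
      takeMVar v
```

This version has the same slim window described above, since `forking` is unchanged.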

12 Comments

  1. Luke Palmer:

    So the result is correct, but we still have a leaked thread. This could lead to unpredictable memory leaks: consider the case where the computation is nonterminating and allocating.

    You can use Control.Exception.block to ensure safety:

    race a b = do
        v <- newEmptyMVar
        block $ do
            ta <- forkPut a v
            tb <- forkPut b v
            unblock (takeMVar v) `finally` (killThread ta >> killThread tb)
    
  2. Spencer Janssen:

    Control.Exception.bracket will avoid the race condition, ensuring that child threads are always killed:

    import Control.Concurrent
    import Control.Exception
    
    withThread a b = bracket (forkIO a) killThread (const b)
    
    race a b = do
        v <- newEmptyMVar
        let t x = x >>= putMVar v
        withThread (t a) $ withThread (t b) $ takeMVar v
    
  3. Chung-chieh Shan:

    Very nice — “tidy and safe” indeed!

  4. Peter Verswyvelen:

    Hi Conal,

    Nice to see it works. Simon Marlow also replied to your question. He posted code for the timeout function, which does something similar. See http://darcs.haskell.org/packages/base/System/Timeout.hs

    At home I played a bit with the bracket function and tested the following code:

    import Prelude hiding (catch)
    
    import Control.Concurrent
    import Control.Concurrent.MVar
    import Control.Exception
    import System.IO
    import Data.Char
    
    a `race` b = do bracket entry exit (takeMVar.fst) 
        where
          entry = do
                   v  <- newEmptyMVar
                   ta <- forkPut a v
                   tb <- forkPut b v
                   return (v,(ta,tb))
    
          exit (_,(ta,tb)) = do
            putChar 'a'
            killThread ta
            putChar 'b'
            killThread tb
            putChar 'c'
    
    forkPut :: IO a -> MVar a -> IO ThreadId
    forkPut act v = forkIO ((act >>= putMVar v) `catch` uhandler `catch` bhandler)
     where
       uhandler (ErrorCall "Prelude.undefined") = return ()
       uhandler err                             = throw err
       bhandler BlockedOnDeadMVar               = return ()
    
    sleep n = putChar (intToDigit n) >> threadDelay (n*1000000) >> putChar (intToDigit n)
    
    f = sleep 2 `race` sleep 3
    
    g = f `race` sleep 1
    
    main = do
      hSetBuffering stdout NoBuffering
      g
    

    Now, under Windows, when I run this with GHCI, I get 1231abcabc, as expected.

    But when compiling with ghc, and running the EXE, I get 1231abacb.

    I’m missing a ‘c’, which might mean some tb thread is not killed…

    When compiling with GHC -threaded and running the EXE (with or without +RTS -N2), I get 1231abacbc, so correct again.

    Bob has similar problems under OSX, he just got abc…

    What did I do wrong? Might this indicate a bug in GHC’s runtime?

  5. Peter Verswyvelen:

    Mmm, I used Spencer’s version, and added some more detail, but I still get the problem (it now does work with -threaded and +RTS -N2…). So the interpreted version kills all threads, the compiled version only does it with +RTS -N2, or at least the last putStrLn does not seem to be called.

    Ugly code follows:

    import Prelude hiding (catch)
    
    import Control.Concurrent
    import Control.Concurrent.MVar
    import Control.Exception
    import System.IO
    import Data.Char
    
    withThread a b = bracket (forkIO a) kill (const b)
        where
          kill id = do
            putStrLn ("Killing "++show id++"\n")
            killThread id
            putStrLn ("Killed "++show id++"\n")
    
    race a b = do
        v <- newEmptyMVar
        let t x = x >>= putMVar v
        withThread (t a) $ withThread (t b) $ takeMVar v
    
    forkPut :: IO a -> MVar a -> IO ThreadId
    forkPut act v = forkIO ((act >>= putMVar v) `catch` uhandler `catch` bhandler)
     where
       uhandler (ErrorCall "Prelude.undefined") = return ()
       uhandler err                             = throw err
       bhandler BlockedOnDeadMVar               = return ()
    
    sleep n = do
      tid <- myThreadId
      putStrLn ("Sleeping "++show n++" sec on "++show tid++"\n")
      threadDelay (n*1000000)
      putStrLn ("Slept "++show n++" sec on "++show tid++"\n")
    
    f = sleep 2 `race` sleep 3
    
    g = f `race` sleep 1
    
    main = do
      hSetBuffering stdout LineBuffering
      g
    

    Output with GHCI:

    C:\temp>runghc racetest
    Sleeping 1 sec on ThreadId 26
    Sleeping 2 sec on ThreadId 27
    Sleeping 3 sec on ThreadId 28
    Slept 1 sec on ThreadId 26
    Killing ThreadId 26
    Killed ThreadId 26
    Killing ThreadId 25
    Killed ThreadId 25
    Killing ThreadId 28
    Killed ThreadId 28
    

    Output from compiled EXE:

    C:\temp> racetest
    Sleeping 1 sec on ThreadId 5
    Sleeping 3 sec on ThreadId 7
    Sleeping 2 sec on ThreadId 6
    Slept 1 sec on ThreadId 5
    Killing ThreadId 5
    Killed ThreadId 5
    Killing ThreadId 4
    Killed ThreadId 4
    Killing ThreadId 7
    

    So “Killed ThreadId 7” is not printed.

    Output with +RTS -N2:

    C:\temp> racetest +RTS -N2
    Sleeping 1 sec on ThreadId 5
    Sleeping 3 sec on ThreadId 7
    Sleeping 2 sec on ThreadId 6
    Slept 1 sec on ThreadId 5
    
    Killing ThreadId 5
    Killed ThreadId 5
    Killing ThreadId 4
    Killed ThreadId 4
    Killing ThreadId 7
    Killed ThreadId 7
    
  6. conal:

    Peter,

    I think the problem here is simply that the process exits before the threads all get a chance to terminate. In my experiments, adding even a yield to the end of main seems to ensure that all threads get killed.

  7. Peter Verswyvelen:

    Indeed. But I would like to see code that does not allow the process to exit before all of these thread finalizers have run their cleanup code (since finalizers may run critical code, like turning off a nuclear power station to prevent a meltdown ;). Okay, not really an issue for Reactive.

  8. conal:

    I’m with you, Peter. I’d like a simple way to guarantee running the cleanup code.

  9. conal:

    I’ve been experimenting with various race implementations, including the ones given above. All of these smarter versions break when given the following example:

    test :: Int
    test = f (f 1) where f v = (v `unamb` 1) `seq` v

    The error message (in ghci) is “*** Exception: thread killed”.

    Definitions that give this error include my three

    a `race` b = do v  <- newEmptyMVar
                    ta <- forkPut a v
                    tb <- forkPut b v
                    takeMVar v `finally` (killThread ta >> killThread tb)
     
    a `race` b = do v <- newEmptyMVar
                    ta <- forkPut a v
                    (do tb <- forkPut b v
                        takeMVar v `finally` killThread tb)
                     `finally` killThread ta
     
    a `race` b = do v <- newEmptyMVar
                    forking (putCatch a v) $
                      forking (putCatch b v) $
                        takeMVar v

    and Luke’s:

    race a b = do
        v <- newEmptyMVar
        block $ do
            ta <- forkPut a v
            tb <- forkPut b v
            unblock (takeMVar v) `finally` (killThread ta >> killThread tb)

    and Spencer’s:

    race a b = do
        v <- newEmptyMVar
        let t x = x >>= putMVar v
        withThread (t a) $ withThread (t b) $ takeMVar v
     where
       withThread u v = bracket (forkIO u) killThread (const v)

    My original version gives the answer I want, namely 1:

    a `race` b = do v  <- newEmptyMVar
                    ta <- forkPut a v
                    tb <- forkPut b v
                    x  <- takeMVar  v
                    killThread ta
                    killThread tb
                    return x

    Here’s a definition that comes from inlining finally in the first smart version above:

    a `race` b = do v  <- newEmptyMVar
                    ta <- forkPut a v
                    tb <- forkPut b v
                    block (do
                      x <- unblock (takeMVar v) `onException` (killThread ta >> killThread tb)
                      killThread ta >> killThread tb
                      return x)

    It also fails. However, if I remove the inner unblock, I get 1, as desired:

    a `race` b = do v  <- newEmptyMVar
                    ta <- forkPut a v
                    tb <- forkPut b v
                    block (do
                      x <- (takeMVar v) `onException` (killThread ta >> killThread tb)
                      killThread ta >> killThread tb
                      return x)

    What is going on here? What is causing “*** Exception: thread killed”?

    Here is a possibly-relevant excerpt from the documentation:

    Some operations are interruptible, which means that they can receive asynchronous exceptions even in the scope of a block. Any function which may itself block is defined as interruptible; this includes Control.Concurrent.MVar.takeMVar (but not Control.Concurrent.MVar.tryTakeMVar), and most operations which perform some I/O with the outside world.

  10. Peter Verswyvelen:

    Well, I tried the following version:

    race :: IO a -> IO a -> IO a
    race a b = bracket entry exit (return . fst)
        where
          entry = block $ do
                    v <- newEmptyMVar
                    ta <- forkIO (a >>= putMVar v)
                    tb <- forkIO (b >>= putMVar v)
                    x <- takeMVar v
                    return (x,((ta,tb)))
          exit (_,((ta,tb))) = do
            killThread ta
            killThread tb
    
    

    Using GHCi or GHC --make, I get the thread killed exception…

    When using GHC --make -O, I don’t get the thread killed exception, just get the correct answer 1

    Sometimes, using GHCi, it just blocks.

    I get the same with Spencer’s version… Haven’t tried the other versions, but I expect they will also behave correctly with GHC -O…

  11. Peter Verswyvelen:

    Not sure if this helps, but if I replace

    amb :: a -> a -> IO a
    amb = race `on` evaluate
    

    by

    amb :: a -> a -> IO a
    amb = race `on` return
    

    I get back 1.

    But of course that might not be what you want, since the semantics of evaluate are different (I don’t fully understand from the GHC documentation what evaluate really does…)

    If I replace amb by “race `on` (($!) return)” I get the exception again

    I also made a version that hangs under GHCi instead of throwing the exception:

    race :: IO a -> IO a -> IO a
    race a b = bracket entry exit (takeMVar . fst)
        where
          entry = block $ do
                    v <- newEmptyMVar
                    ta <- forkIO (a >>= putMVar v)
                    tb <- forkIO (b >>= putMVar v)
                    putStrLn ("Waiting for either "++show ta++" or "++show tb ++"\n")
                    return (v,((ta,tb)))
          exit (v,((ta,tb))) = do
            putStrLn ("Killing "++show ta++"\n")
            killThread ta
            putStrLn ("Killing "++show tb++"\n")
            killThread tb
            putStrLn ("Killed request send to "++show ta++" and "++show tb++"\n")
    

    The version above – when run via GHCi – blocks. If I press CTRL-C it interrupts. When running again in the same session, I get 1. When running a compiled version without optimization, I get the exception. When running a compiled version with optimization, I get 1.

    Weird behavior…

  12. Conal Elliott » Blog Archive » Another angle on functional future values:

    […] About « Smarter termination for thread racing […]
