Future values via multi-threading

Future values

A previous post described future values (or simply “futures”), which are values that depend on information from the future, e.g., from the real world. There I gave a simple denotational semantics for future values as time/value pairs. This post describes the multi-threaded implementation of futures in Reactive’s Data.Reactive module.

A simple representation

Futures are represented as actions that deliver a value. These actions are required to yield the same answer every time. Having a special representation for the future that never arrives (mempty) will allow for some very important optimizations later.

data Future a = Future (IO a) | Never

A value can be “forced” from a future. If the value isn’t yet available, forcing will block.

force :: Future a -> IO a
force (Future io) = io
force Never       = hang  -- block forever
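The post doesn't define hang. Here is a minimal sketch, assuming hang only has to block the calling thread forever: a loop of long sleeps. Blocking on a fresh empty MVar looks tempting, but GHC's runtime may notice that nothing can ever fill that MVar and throw BlockedIndefinitelyOnMVar instead. forceWithin is a hypothetical helper, added only so that forcing can be observed with a time limit.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import System.Timeout (timeout)

data Future a = Future (IO a) | Never

-- Assumed definition (not shown in the post): sleep in long chunks, forever.
hang :: IO a
hang = forever (threadDelay 1000000000)

force :: Future a -> IO a
force (Future io) = io
force Never       = hang

-- Hypothetical helper: force, but give up after the given number of microseconds.
forceWithin :: Int -> Future a -> IO (Maybe a)
forceWithin micros fut = timeout micros (force fut)
```

For example, forceWithin 100000 Never gives Nothing after a tenth of a second, while forcing a trivial future returns its value at once.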

Threads and synchronization

The current implementation of futures uses Concurrent Haskell’s forkIO (fork a thread) and MVars (synchronized communication variables).

Except for trivial futures (pure/return and mempty), the action in a future will simply be reading an MVar. Internal to the implementation:

newFuture :: IO (Future a, a -> IO ())
newFuture = do v <- newEmptyMVar
               return (Future (readMVar v), putMVar v)

The MVar is written to as the final step of a forked thread. Importantly, it is to be written only once. (It’s really an IVar.)

future :: IO a -> Future a
future mka = unsafePerformIO $
             do (fut,sink) <- newFuture
                forkIO $ mka >>= sink
                return fut

Note that the actual value is computed just once, and is accessed cheaply.
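The claim that the value is computed just once can be checked with a sketch that reuses the post's newFuture and future, plus a hypothetical tick action that bumps a counter. Two pieces are assumptions of mine, not the post's: the NOINLINE pragma (a common precaution around unsafePerformIO, so the compiler doesn't duplicate the call) and the sleep-forever hang.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
import Control.Monad (forever)
import Data.IORef
import System.IO.Unsafe (unsafePerformIO)

data Future a = Future (IO a) | Never

-- Assumed definition (not shown in the post): sleep forever.
hang :: IO a
hang = forever (threadDelay 1000000000)

force :: Future a -> IO a
force (Future io) = io
force Never       = hang

newFuture :: IO (Future a, a -> IO ())
newFuture = do v <- newEmptyMVar
               return (Future (readMVar v), putMVar v)

-- As in the post, plus a NOINLINE pragma (an assumption) to keep the
-- compiler from duplicating the unsafePerformIO.
future :: IO a -> Future a
future mka = unsafePerformIO $
             do (fut, sink) <- newFuture
                _ <- forkIO (mka >>= sink)  -- the worker writes the MVar once
                return fut
{-# NOINLINE future #-}

-- Hypothetical demo action: increments a counter and returns its new value.
tick :: IORef Int -> IO Int
tick ref = atomicModifyIORef' ref (\n -> (n + 1, n + 1))
```

Forcing the same let-bound future twice runs tick only once: the first force evaluates the unsafePerformIO, which forks the worker thread, and every later force just reads the already-filled MVar.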

Functor, Applicative, and Monad

The Functor, Applicative, and Monad instances are defined easily in terms of future and the corresponding instances for IO:

instance Functor Future where
  fmap f (Future get) = future (fmap f get)
  fmap _ Never        = Never

instance Applicative Future where
  pure a                      = Future (pure a)
  Future getf <*> Future getx = future (getf <*> getx)
  _           <*> _           = Never

instance Monad Future where
  return            = pure
  Future geta >>= h = future (geta >>= force . h)
  Never       >>= _ = Never
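With these instances, futures compose without the caller touching threads directly. The sketch below collects the post's definitions into one file and adds a hypothetical isNever helper so the Never cases can be checked without blocking. It assumes a modern GHC, where return defaults to pure and can be left out, and the same assumed sleep-forever hang.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
import Control.Monad (forever)
import System.IO.Unsafe (unsafePerformIO)

data Future a = Future (IO a) | Never

-- Assumed definition (not shown in the post): sleep forever.
hang :: IO a
hang = forever (threadDelay 1000000000)

force :: Future a -> IO a
force (Future io) = io
force Never       = hang

newFuture :: IO (Future a, a -> IO ())
newFuture = do v <- newEmptyMVar
               return (Future (readMVar v), putMVar v)

future :: IO a -> Future a
future mka = unsafePerformIO $
             do (fut, sink) <- newFuture
                _ <- forkIO (mka >>= sink)
                return fut
{-# NOINLINE future #-}

instance Functor Future where
  fmap f (Future get) = future (fmap f get)
  fmap _ Never        = Never

instance Applicative Future where
  pure a                      = Future (pure a)
  Future getf <*> Future getx = future (getf <*> getx)
  _           <*> _           = Never

instance Monad Future where
  -- return defaults to pure in modern GHC
  Future geta >>= h = future (geta >>= force . h)
  Never       >>= _ = Never

-- Hypothetical helper: is this syntactically the Never constructor?
isNever :: Future a -> Bool
isNever Never = True
isNever _     = False
```

One consequence worth noticing: pure 1 >>= const Never is not Never here. The IO-based bind always wraps its result with future, so forcing that result would block forever rather than being recognized as Never up front.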

Monoid — racing

The remaining class to implement is Monoid. Its mempty method is Never. Its mappend method selects the earlier of two futures, which is implemented by having the two futures race to extract a value:

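-- Since GHC 8.4, Monoid requires a Semigroup instance; mappend defaults to (<>).
instance Semigroup (Future a) where
  (<>) = race

instance Monoid (Future a) where
  mempty = Never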

race :: Future a -> Future a -> Future a

Racing is easy when either future is Never:

Never `race` b     = b
a     `race` Never = a

Otherwise, fork a thread for each future. The winner kills the loser and then writes the result.

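a `race` b = unsafePerformIO $
             do (c,sink) <- newFuture
                -- run fut tid: wait for fut, kill the rival thread tid, then publish
                let run fut tid = forkIO $ do x <- force fut
                                              killThread tid
                                              sink x
                -- mdo (RecursiveDo extension) lets each thread refer to the other's ThreadId
                mdo ta <- run a tb
                    tb <- run b ta
                    return ()
                return c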
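Here is a runnable sketch of the race, reusing the post's definitions. The helper after, which makes a future that delivers a value after a given delay in microseconds, is hypothetical, as are the NOINLINE pragmas and the sleep-forever hang. The 0.3-second gap makes the expected winner overwhelmingly likely, but, as the next section explains, racing threads give no guarantee about nearly simultaneous futures.

```haskell
{-# LANGUAGE RecursiveDo #-}
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
import Control.Monad (forever)
import System.IO.Unsafe (unsafePerformIO)

data Future a = Future (IO a) | Never

-- Assumed definition (not shown in the post): sleep forever.
hang :: IO a
hang = forever (threadDelay 1000000000)

force :: Future a -> IO a
force (Future io) = io
force Never       = hang

newFuture :: IO (Future a, a -> IO ())
newFuture = do v <- newEmptyMVar
               return (Future (readMVar v), putMVar v)

-- Hypothetical: a future that delivers x after the given number of microseconds.
after :: Int -> a -> Future a
after micros x = unsafePerformIO $
  do (fut, sink) <- newFuture
     _ <- forkIO (threadDelay micros >> sink x)
     return fut
{-# NOINLINE after #-}

race :: Future a -> Future a -> Future a
Never `race` b     = b
a     `race` Never = a
a     `race` b     = unsafePerformIO $
  do (c, sink) <- newFuture
     let run fut tid = forkIO $ do x <- force fut
                                   killThread tid  -- the winner kills the loser
                                   sink x
     mdo ta <- run a tb
         tb <- run b ta
         return ()
     return c
{-# NOINLINE race #-}
```

Whichever argument order is used, the future that arrives first should win; a Never argument is simply dropped without forking anything.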

The problem

This last piece of the implementation can fall short of the semantics, which resolves simultaneity in favor of the left argument. For a `mappend` b, we might get b instead of a even if they’re available simultaneously. It’s even possible to get the later of the two if they’re nearly simultaneous.

Edit (2008-02-02): although simultaneous physical events are extremely unlikely, futures are compositional, so it’s easy to construct two distinct but simultaneous futures in terms of a common physical future.
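To make the intended semantics concrete, here is a small pure model (not an implementation). A future is a time/value pair, with times assumed to be Doubles here; fmap keeps the time, and combining two futures keeps the earlier one, preferring the left argument on a tie. FutureS, NeverS, and valueOf are hypothetical names. Because b and c below are both derived from the same a, they are simultaneous, and the model answers deterministically which one each combination yields; the racing implementation cannot promise the same.

```haskell
-- A pure model of the time/value semantics (not an implementation).
type Time = Double  -- assumption: any totally ordered type would do

data FutureS a = FutureS Time a | NeverS

instance Functor FutureS where
  fmap f (FutureS t a) = FutureS t (f a)  -- same occurrence time
  fmap _ NeverS        = NeverS

-- Keep the earlier occurrence; on a tie, keep the left argument.
instance Semigroup (FutureS a) where
  NeverS <> b = b
  a <> NeverS = a
  a@(FutureS ta _) <> b@(FutureS tb _)
    | ta <= tb  = a
    | otherwise = b

instance Monoid (FutureS a) where
  mempty = NeverS

valueOf :: FutureS a -> Maybe a
valueOf (FutureS _ a) = Just a
valueOf NeverS        = Nothing
```

With a = FutureS 1.0 0, b = fmap (+1) a, and c = fmap (+2) a, the model gives 1 for b <> c and 2 for c <> b, every time.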

What will it take to get deterministic semantics for a `mappend` b? Here’s an idea: include an explicit time in a future. When one future happens with a time t, query whether the other one occurs by the same time. What does it take to support this query operation?

A simpler implementation

In the implementation above, futures (other than Never) hold actions that typically read an MVar that is written only once. They could instead simply hold lazy values.

data Future a = Future a | Never

Forcing a future evaluates its value to weak head normal form (WHNF).

force :: Future a -> IO a
force (Future a) = a `seq` return a
force Never      = hang

Making a new future works almost identically; it just adds an unsafePerformIO.

newFuture :: IO (Future a, a -> IO ())
newFuture = 
  do v <- newEmptyMVar
     return (Future (unsafePerformIO $ readMVar v), putMVar v)
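Here is a sketch of the simpler representation in one file, assuming hang simply sleeps forever (the post doesn't show its definition). The checks below fill the future from another thread and confirm that forcing reduces only to WHNF, so a Just whose contents are undefined can still be forced safely.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
import Control.Monad (forever)
import System.IO.Unsafe (unsafePerformIO)

-- Simpler representation: a (possibly not-yet-available) lazy value.
data Future a = Future a | Never

-- Assumed definition (not shown in the post): sleep forever.
hang :: IO a
hang = forever (threadDelay 1000000000)

-- Reduces the value to WHNF; blocks if it comes from an unfilled MVar.
force :: Future a -> IO a
force (Future a) = a `seq` return a
force Never      = hang

-- The value is a thunk that reads the MVar the first time it is demanded;
-- afterwards the evaluated result is shared, so the MVar is read only once.
newFuture :: IO (Future a, a -> IO ())
newFuture =
  do v <- newEmptyMVar
     return (Future (unsafePerformIO $ readMVar v), putMVar v)
```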

The Functor, Applicative, and Monad instances simplify considerably, using function application directly instead of the corresponding methods of IO’s Functor, Applicative, and Monad instances.

instance Functor Future where
  fmap f (Future a) = Future (f a)
  fmap _ Never      = Never

instance Applicative Future where
  pure a                = Future a
  Future f <*> Future x = Future (f x)
  _        <*> _        = Never

instance Monad Future where
  return         = pure
  Future a >>= h = h a
  Never    >>= _ = Never
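Because these instances are ordinary pure functions, they can be exercised without any threads. The sketch below repeats the post's simpler instances and adds a hypothetical fromFuture helper so results can be inspected purely.

```haskell
-- The simpler, value-based representation and its instances, as in the post.
data Future a = Future a | Never

instance Functor Future where
  fmap f (Future a) = Future (f a)
  fmap _ Never      = Never

instance Applicative Future where
  pure                  = Future
  Future f <*> Future x = Future (f x)
  _        <*> _        = Never

instance Monad Future where
  Future a >>= h = h a   -- plain application: no threads, no MVars
  Never    >>= _ = Never

-- Hypothetical helper so results can be inspected purely.
fromFuture :: Future a -> Maybe a
fromFuture (Future a) = Just a
fromFuture Never      = Nothing
```

One visible difference from the IO-based version follows directly from the definitions: here Future a >>= const Never reduces to Never, whereas the IO-based bind always produces a Future built by future.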

The Monoid instance is completely unchanged.

How does this implementation compare with the one above?

  • It’s simpler, as stated.
  • It’s a bit more efficient. Each MVar is only read once. Also, the Functor, Applicative, and Monad instances no longer create threads and MVars.
  • The type statically enforces that forcing a future always gives the same result.
  • Forcing a future reduces the value to WHNF and so is probably a little less lazy than the implementation above.

I’ve tried out this implementation and found that it intermittently crashes some examples that work fine in the IO version. I have no idea why, and I’d be very interested in ideas about it or anything else in this post.

10 Comments

  1. Ivan:

    A very interesting article :)

    In the race function, what happens in the unlikely case that the two threads both perform the ‘killThread’ action before either gets to ‘sink x’?

    Also, why is it a problem that in a mappend b we may get b when a and b are available simultaneously?

    Ivan

  2. conal:

    In the race function, what happens in the unlikely case that the two threads both perform the ‘killThread’ action before either gets to ‘sink x’?

    I think that could happen, and the result wouldn’t be pleasant, as there would be no winner instead of one or two. Perhaps a safer alternative would be to swap the killThread and sink lines. Then there’d be a possibility of two sinks getting through. Both sink calls would putMVar to the same MVar, which means the second call would block until its thread is killed later. But there’s still a gotcha: the readMVar operation is not atomic. It does a takeMVar followed by a putMVar. Between those two calls, the blocked putMVar thread could easily be scheduled. The result is that the same future could report two different values. I think the “simple implementation” avoids that problem by hiding the action that contains the readMVar, allowing it to get executed at most once.

    I don’t expect any of these remarks to be reassuring. At least, they’re not to me.

    Also, why is it a problem that in a mappend b we may get b when a and b are available simultaneously?

    Because I want a clear & deterministic (denotational) semantics. Why? For tractability of reasoning. The semantics has to specify one bias or the other in the case of simultaneity.

  3. Russell:

    Instead of

    a `seq` return a

    I recommend the slightly less evil

    Control.Exception.evaluate a
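    Per the documentation of Control.Exception.evaluate, evaluate x is not the same as return $! x: evaluate x `seq` y is just y, because x is evaluated only when the action runs, not when the action value itself is evaluated. Here is a small sketch of the difference as it affects force. The names viaSeq, viaEvaluate, and throwsWhenEvaluated are hypothetical.

```haskell
import Control.Exception (SomeException, evaluate, try)

-- The post's style: a is evaluated when the IO action itself is evaluated.
viaSeq :: a -> IO a
viaSeq a = a `seq` return a

-- Russell's suggestion: a is evaluated only when the action is executed.
viaEvaluate :: a -> IO a
viaEvaluate = evaluate

-- Hypothetical helper: does evaluating (not running) this action throw?
throwsWhenEvaluated :: IO a -> IO Bool
throwsWhenEvaluated act = do
  r <- try (evaluate (act `seq` ())) :: IO (Either SomeException ())
  return (either (const True) (const False) r)
```

    In both versions, running the action still throws for an undefined argument; the difference is only in when that evaluation happens.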

  4. Conal Elliott » Blog Archive » Invasion of the composable Mutant-Bots:

    […] One thing I like a lot about the implementations in this post, compared with Reactive, is that they do not need any concurrency, and so it’s easy to achieve deterministic semantics. I don’t quite know how to do that for Reactive, as mentioned in “Future values via multi-threading.” […]

  5. sjanssen:

    It seems to me that this is not referentially transparent:

    let x = future getChar in liftA2 (,) x x

    vs.

    liftA2 (,) (future getChar) (future getChar)
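    sjanssen's point can be reproduced without keyboard input. In this sketch, a hypothetical tick action stands in for getChar: each run returns a fresh number. The OPTIONS_GHC line turns off common-subexpression elimination and full laziness, which could otherwise merge the two future (tick ref) calls. That the answer depends on such compiler flags is itself a symptom of the problem.

```haskell
{-# OPTIONS_GHC -fno-cse -fno-full-laziness #-}
import Control.Applicative (liftA2)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)
import Control.Monad (forever)
import Data.IORef
import System.IO.Unsafe (unsafePerformIO)

data Future a = Future (IO a) | Never

-- Assumed definition (not shown in the post): sleep forever.
hang :: IO a
hang = forever (threadDelay 1000000000)

force :: Future a -> IO a
force (Future io) = io
force Never       = hang

newFuture :: IO (Future a, a -> IO ())
newFuture = do v <- newEmptyMVar
               return (Future (readMVar v), putMVar v)

future :: IO a -> Future a
future mka = unsafePerformIO $
             do (fut, sink) <- newFuture
                _ <- forkIO (mka >>= sink)
                return fut
{-# NOINLINE future #-}

instance Functor Future where
  fmap f (Future get) = future (fmap f get)
  fmap _ Never        = Never

instance Applicative Future where
  pure a                      = Future (pure a)
  Future getf <*> Future getx = future (getf <*> getx)
  _           <*> _           = Never

-- Hypothetical stand-in for getChar: each run returns a fresh number.
tick :: IORef Int -> IO Int
tick ref = atomicModifyIORef' ref (\n -> (n + 1, n + 1))

-- One shared future: tick runs once, so both components agree.
shared :: IORef Int -> Future (Int, Int)
shared ref = let x = future (tick ref) in liftA2 (,) x x

-- Two textually identical futures: tick runs twice, so the components differ.
unshared :: IORef Int -> Future (Int, Int)
unshared ref = liftA2 (,) (future (tick ref)) (future (tick ref))
```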

  6. conal:

    sjanssen wrote:

    It seems to me that this is not referentially transparent: ...

    To me also. I think the uses of future in Future.hs are referentially transparent. I could simply not export future or make a clear explanation of how I intend it to be used.

  7. Jeffrey Yasskin:

    I don’t think you should worry so much about left-biasing mappend. Assuming that you intend this to be useful on multi-core machines, you need to understand that there is no such thing as simultaneity. Take the following example (known as Independent Reads of Independent Writes, IRIW), with a==b==0 initially:

    Thread 1    Thread 2    Thread 3    Thread 4
    r1 = a      r3 = b      a = 1       b = 1
    r2 = b      r4 = a

    On many 4-processor systems, it is quite possible that r1 == r3 == 1 and r2 == r4 == 0, proving that a was written before b to thread 1, and b was written before a to thread 2. Not only didn’t they happen at the same time, they happened in inconsistent orders in different places. (It’s very relativistic.) When processors do allow you to eliminate this possibility, they instead guarantee that memory actions happen in a global total order. Again, no simultaneity.

    So you have two choices. Either your Time parameter models the behavior of your Futures, in which case you have to arrange to use the fairly expensive sequentially-consistent instructions for reads and writes to shared variables (which guarantees that no two events will be simultaneous). Or a totally-ordered Time is inappropriate to model the semantics of Futures, and you instead have to fall back on something like the happens-before partial order that defines the Java and C++0x memory models. There may be another good semantics for this besides happens-before, but it’s unlikely to present you with the problem of events that are provably simultaneous, so even if mappend picks “wrong”, your users will never be able to tell.

    P.S. I’m working on a C++ Futures library and had realized that futures were monadic, but these articles are a great summary of the realization. Thanks!

  8. conal:

    Thanks for the comment, Jeffrey.

    I wonder if you understood what I’m after in an implementation, which is to realize (implement) the simple, determinate, denotational semantics from the earlier Future values post. That semantics includes and addresses simultaneity, and implementing it faithfully doesn’t require hardware simultaneity.

    My original demand-driven FRP implementations implemented this semantics (including determinacy and simultaneity) correctly (whether on uniprocessor or multiprocessor), as does the new data-driven implementation described in Simply efficient functional reactivity.

  9. Jeffrey Yasskin:

    I didn’t completely understand, and I was also too close to C++. It looks like MVars do guarantee sequential consistency, so you don’t have to worry about only having a partial order on time. Then I missed that fmap and mappend exactly propagate their input times, despite taking a finite amount of real time to execute. So it’s possible for:

    do let a = future (return 0)
           b = (+1) `fmap` a
           c = (+2) `fmap` a
       val_bc <- force (b `mappend` c)
       val_cb <- force (c `mappend` b)
       guard (val_bc == 1 && val_cb == 2)
    

    to fail even though the denotational semantics say it should succeed. Even if fmap introduced a new time, it would take some tricky programming to completely avoid the possibility that (val_bc == 2 && val_cb==1), which would prove that they happened in an inconsistent order. But Improving already implements most of that tricky programming, so great!

    After skimming your paper, I didn’t see a way to introduce Futures or (Improving Time)s, just the compositions. To get time to work, I think you need a global monotonic MVar counter. Then when the future gets its value, you do something like:

    do c_val <- takeMVar counter
       -- Important that this happens while counter has no value:
       putMVar an_mvar_inside_the_improving c_val
       putMVar counter (c_val + 1)
    

    and to write exact and compare_I for a particular Future, you use:

    exact = unsafePerformIO $ readMVar an_mvar_inside_the_improving  -- bound from the creation of the future
    compare_I other = unsafePerformIO $ do
        -- make sure that the global counter has passed other's value:
        evaluate other
        my_val <- tryTakeMVar an_mvar_inside_the_improving
        case my_val of
            -- If an_mvar... is not set any time after other was set, it's guaranteed to get a higher value.
            Nothing -> return GT
            Just val -> do putMVar an_mvar_inside_the_improving val
                           return $ compare val other
    

    I haven’t run this, so it’s very possible it doesn’t actually work. It’s also fairly expensive to maintain that global counter on a machine with lots of processors because the cache line has to move between processors a lot, but that may be masked under other Haskell overhead.

    Or have you already implemented all of that somewhere that I missed?

    Also, according to http://www.haskell.org/ghc/docs/latest/html/libraries/base/Control-Concurrent.html#v%3AkillThread, “if there are two threads that can kill each other, it is guaranteed that only one of the threads will get to kill the other.”, so you don’t actually need the lock in race.

  10. Conal Elliott » Blog Archive » Another angle on functional future values:

    […] follow-up post gave an implementation of Future values via multi threading. Unfortunately, that implementation did not necessarily satisfy the semantics, because it allowed […]
