Intro
N.B.: I’m still learning! This post is my current best understanding of the material presented. This post in particular is very quick-and-dirty. It does not follow best practice coding conventions and may contain errors.
I’m still working my way through ParConc (Parallel and Concurrent Programming in Haskell) and there’s a “left for the reader” exercise I wanted to try. Chapter 4 introduces parallel streams and asks the reader to fill out an implementation for rate limiting.
The original code for the exercise can be found here.
The aim is to modify the `IList` data type to include a third constructor, and then update the `streamFromList`, `streamMap`, and `streamFold` functions to accommodate it.
Setup
Here’s the updated `IList` data type:
```haskell
data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)
```
My understanding of what this does is as follows:

- `Nil` and `Cons` are as before: `Nil` signifies the end of the stream, while `Cons` provides one element of the stream and an `IVar` “box” that contains the rest of the stream, which can be obtained in parallel via repeated calls to `get`. Basically, a chain of promises.
- `Fork` is new. It contains a computation `p :: Par ()` and an `IList`, `il :: IList a`. `il` itself contains the “remainder” of the stream, but the key part is that its `IVar` stays empty until `p` is run. This is in contrast to the given implementation, which tries to produce elements of the stream as quickly as possible.
Next, we need to update the `NFData` instance. We do this as:
```haskell
instance NFData a => NFData (IList a) where
  rnf Nil         = ()
  rnf (Cons a b)  = rnf a `seq` rnf b
  rnf (Fork _p l) = rnf l
```
This just evaluates the `IList` portion of the fork. We want the caller to decide when to invoke the computation represented by `p`, so we don’t want to call `runPar p` here; we just leave `p` alone for now.
N.B. Although modifying this instance doesn’t appear as part of the exercise instructions, I believe it’s still necessary.
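To see why leaving the computation alone matters, here’s a standalone sketch using only the `deepseq` package that ships with GHC. The `Pending` type is hypothetical, a stand-in for `Fork` with a suspended computation in its first field:

```haskell
import Control.DeepSeq (NFData (..), force)

-- Hypothetical stand-in for Fork: a suspended computation plus a payload.
data Pending a = Pending (() -> a) a

instance NFData a => NFData (Pending a) where
  -- Evaluate only the payload; leave the suspended computation untouched,
  -- just as rnf (Fork _p l) = rnf l leaves p alone.
  rnf (Pending _p x) = rnf x

-- force fully evaluates via rnf, but the error thunk is never touched.
demo :: Int
demo = case force (Pending (\_ -> error "must stay unevaluated") 42) of
  Pending _ x -> x
```

Because `rnf` skips the first field, `force` returns normally; had the instance evaluated that field too, `demo` would crash.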
Updating streamFromList and streamFold

Next, we look at `streamFromList`.
In the original implementation, we wanted a list like `[1..10]` to turn into something morally like

```haskell
Cons 1 (IVar (Cons 2 (IVar (Cons 3 (... (Cons 10 Nil))))))
```

where each `IVar` is filled (by a call to `put`) as fast as possible.
In the rate-limiting implementation, we want something like

```haskell
Cons 1 (IVar (Cons 2 (IVar (Fork comp (Cons 3 l)))))
```

where `l` represents the “rest of the list” (with subsequent `Fork` constructors added periodically), but such that `l` is an empty `IVar` until `comp` is called. This prevents too many list elements from piling up when we have a fast producer but a slow consumer.
Here’s the modified implementation I came up with for `streamFromList` and `streamFold`. Note that we add some `Show` constraints to help with the debug traces.
```haskell
streamFromList :: NFData a => Show a => Int -> Int -> [a] -> Par (Stream a)
streamFromList chunkSize forkDistance xs = do
  var <- new            -- create the IVar holding the head of the stream
  fork $ loop xs var 1  -- produce the stream in a separate Par thread
  return var
  where
    -- We need a fork at forkDistance, forkDistance + chunkSize,
    -- forkDistance + 2 * chunkSize, etc.
    needsFork :: Int -> Bool
    needsFork i = mod (i - forkDistance) chunkSize == 0 && i > 0

    loop :: Show a => NFData a => [a] -> IVar (IList a) -> Int -> Par ()
    -- No more elements of the list: just put in Nil
    loop [] var _ = do
      traceM "Producer: Nil"
      put var Nil
    loop (x:xs) var i =
      if not $ needsFork i
        then do
          traceM $ "Producer: Cons " <> show x
          -- We don't need rate limiting here. Put the next element into the
          -- variable and loop.
          tail <- new
          put var (Cons x tail)
          loop xs tail (i+1)
        else do
          traceM $ "Producer: Fork " <> show x
          -- We DO need rate limiting here. We leave the tail as an EMPTY
          -- IVar and put a Fork into the var. Only when the Fork's
          -- computation is run does the tail get filled.
          tail <- new
          put var $ Fork (loop xs tail (i+1)) (Cons x tail)
```
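As a sanity check of the fork-placement arithmetic, the `needsFork` predicate can be tested in isolation (here lifted out of the `where` clause, with `chunkSize` and `forkDistance` as explicit parameters):

```haskell
-- needsFork, lifted out for standalone testing.
needsFork :: Int -> Int -> Int -> Bool
needsFork chunkSize forkDistance i =
  mod (i - forkDistance) chunkSize == 0 && i > 0

-- With chunkSize = 5 and forkDistance = 0 (the settings used below),
-- forks land at elements 5, 10, 15, ...
forkPositions :: [Int]
forkPositions = filter (needsFork 5 0) [1..20]
```

With 40 elements supplied in reverse order, element 5 is the value 36, which matches the `Producer: Fork 36` line in the trace below.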
```haskell
-- | Reduce a stream to a single value. This function will not return
-- until it reaches the end-of-stream.
streamFold :: Show b => (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc instrm = do
  ilst <- get instrm
  case ilst of
    Nil -> do
      traceM "  Consumer: Nil"
      return acc
    Cons h t -> do
      traceM $ "  Consumer: Cons " <> show h
      streamFold fn (fn acc h) t
    Fork comp (Cons x tail) -> do
      traceM $ "  Consumer: Fork " <> show x
      fork comp
      streamFold fn (fn acc x) tail
    _ -> error "should be impossible"
```
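Note how the accumulator threads through: `streamFold` applies `fn acc h` left-to-right, so it behaves like `foldl` over the stream’s elements. A quick pure check with the prepending lambda used in `main` below shows the result comes out in reverse order of consumption:

```haskell
-- streamFold's accumulation order, modeled with foldl on a plain list.
-- Consuming the elements 3, 2, 1 with a prepending step...
result :: String
result = foldl (\y x -> show x <> " " <> y) "" [3 :: Int, 2, 1]
-- ...prepends each new element, reversing the order of consumption.
```

This is why folding the reversed list `[40, 39, ..., 1]` later prints the Fibonacci values in ascending order of input.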
The Results
We’ll now test this with a simple slow Fibonacci sequence, calculating the first 40 terms in reverse (meaning the problem starts out difficult and gets easier as time goes on):
```haskell
-- Testing a slow consumer, fast producer
main :: IO ()
main = do
  (chunkSize : forkDistance : []) <- getArgs
  let (cs, fd) = (read chunkSize, read forkDistance)
  print $ runPar $ do
    s <- streamFromList cs fd (reverse [1..40])
    streamFold (\y x -> show (slowFib x) <> " " <> y) "" s

-- Deliberately inefficient Fibonacci
slowFib :: Integer -> Integer
slowFib 0 = 0
slowFib 1 = 1
slowFib n = slowFib (n-1) + slowFib (n-2)
```
We compile with `ghc -O2 Stream.hs -rtsopts -threaded` (you must remove the module header for this to work, I think) and then run with something like `./Stream 5 0 +RTS -N2 -s`. This should allow the producer (`streamFromList`) to introduce at most 5 terms before the consumer (`streamFold`) can eliminate them.
The output from the debug traces shows what we would expect: early in the process, production and consumption are interleaved, with the producer always 5 terms ahead of the consumer. Later, when the `slowFib` calls take less time, the producer is sometimes less than 5 terms ahead. This starts at term number 5 in the logs below.
√ parconc-examples % ./Stream 5 0 +RTS -N2 -s
Producer: Cons 40
Producer: Cons 39
Producer: Cons 38
Producer: Cons 37
Producer: Fork 36
Consumer: Cons 40
Consumer: Cons 39
Consumer: Cons 38
Consumer: Cons 37
Consumer: Fork 36
Producer: Cons 35
Producer: Cons 34
Producer: Cons 33
Producer: Cons 32
Producer: Fork 31
Consumer: Cons 35
Consumer: Cons 34
Consumer: Cons 33
Consumer: Cons 32
Consumer: Fork 31
Producer: Cons 30
Producer: Cons 29
Producer: Cons 28
Producer: Cons 27
Producer: Fork 26
Consumer: Cons 30
Consumer: Cons 29
Consumer: Cons 28
Consumer: Cons 27
Consumer: Fork 26
Producer: Cons 25
Producer: Cons 24
Producer: Cons 23
Producer: Cons 22
Producer: Fork 21
Consumer: Cons 25
Consumer: Cons 24
Consumer: Cons 23
Consumer: Cons 22
Consumer: Fork 21
Producer: Cons 20
Producer: Cons 19
Producer: Cons 18
Producer: Cons 17
Producer: Fork 16
Consumer: Cons 20
Consumer: Cons 19
Consumer: Cons 18
Consumer: Cons 17
Consumer: Fork 16
Producer: Cons 15
Producer: Cons 14
Producer: Cons 13
Producer: Cons 12
Producer: Fork 11
Consumer: Cons 15
Consumer: Cons 14
Consumer: Cons 13
Consumer: Cons 12
Consumer: Fork 11
Producer: Cons 10
Producer: Cons 9
Producer: Cons 8
Producer: Cons 7
Producer: Fork 6
Consumer: Cons 10
Consumer: Cons 9
Consumer: Cons 8
Consumer: Cons 7
Consumer: Fork 6
Producer: Cons 5
Producer: Cons 4
Consumer: Cons 5
Producer: Cons 3
Consumer: Cons 4
Consumer: Cons 3
Producer: Cons 2
Consumer: Cons 2
Producer: Fork 1
Consumer: Fork 1
Producer: Nil
Consumer: Nil
Contrast this with disabling chunking (by setting the chunk size greater than the number of elements, say 100). In this case, we see that the producer finishes producing all the elements (which are now stuck in memory!) before the consumer gets past the very first one.
√ parconc-examples % ./Stream 100 0 +RTS -N2 -s
Producer: Cons 40
Producer: Cons 39
Producer: Cons 38
Producer: Cons 37
Producer: Cons 36
Producer: Cons 35
Consumer: Cons 40
Producer: Cons 34
Producer: Cons 33
Producer: Cons 32
Producer: Cons 31
Producer: Cons 30
Producer: Cons 29
Producer: Cons 28
Producer: Cons 27
Producer: Cons 26
Producer: Cons 25
Producer: Cons 24
Producer: Cons 23
Producer: Cons 22
Producer: Cons 21
Producer: Cons 20
Producer: Cons 19
Producer: Cons 18
Producer: Cons 17
Producer: Cons 16
Producer: Cons 15
Producer: Cons 14
Producer: Cons 13
Producer: Cons 12
Producer: Cons 11
Producer: Cons 10
Producer: Cons 9
Producer: Cons 8
Producer: Cons 7
Producer: Cons 6
Producer: Cons 5
Producer: Cons 4
Producer: Cons 3
Producer: Cons 2
Producer: Cons 1
Producer: Nil
Consumer: Cons 39
Consumer: Cons 38
Consumer: Cons 37
Consumer: Cons 36
Consumer: Cons 35
Consumer: Cons 34
Consumer: Cons 33
Consumer: Cons 32
Consumer: Cons 31
Consumer: Cons 30
Consumer: Cons 29
Consumer: Cons 28
Consumer: Cons 27
Consumer: Cons 26
Consumer: Cons 25
Consumer: Cons 24
Consumer: Cons 23
Consumer: Cons 22
Consumer: Cons 21
Consumer: Cons 20
Consumer: Cons 19
Consumer: Cons 18
Consumer: Cons 17
Consumer: Cons 16
Consumer: Cons 15
Consumer: Cons 14
Consumer: Cons 13
Consumer: Cons 12
Consumer: Cons 11
Consumer: Cons 10
Consumer: Cons 9
Consumer: Cons 8
Consumer: Cons 7
Consumer: Cons 6
Consumer: Cons 5
Consumer: Cons 4
Consumer: Cons 3
Consumer: Cons 2
Consumer: Cons 1
Consumer: Nil
Updating streamMap
`streamMap` is both a producer and a consumer. For simplicity, we’ll make `streamMap` chunk its output in the same places as its input.
```haskell
streamMap :: NFData b => Show b => Show a => (a -> b) -> Stream a -> Par (Stream b)
streamMap fn instrm = do
  outstrm <- new
  fork $ loop instrm outstrm
  return outstrm
  where
    loop instrm outstrm = do
      ilst <- get instrm
      case ilst of
        Nil -> put outstrm Nil
        Cons h t -> do
          traceM $ "  Transducer: Cons " <> show h <> " -> " <> show (fn h)
          newtl <- new
          put outstrm (Cons (fn h) newtl)
          loop t newtl
        Fork p (Cons x tail) -> do
          traceM $ "  Transducer: Fork " <> show x <> " -> " <> show (fn x)
          newtl <- new
          put outstrm (Fork (p >> loop tail newtl) (Cons (fn x) newtl))
        _ -> error "should be impossible"
```
This makes it so that the computation in the `Fork` constructor first produces the input stream to `streamMap`, and then uses that to produce the output stream.
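The essential move is `p >> loop tail newtl`: sequence the upstream fill before the downstream one, so a single forked computation drives both stages. Here’s a pure sketch of that composition, where the hypothetical `Action` encoding stands in for `Par ()` and records which cells get filled:

```haskell
-- An action appends the names of the cells it fills to a trace.
type Action = [String] -> [String]

fillInput, fillOutput :: Action
fillInput  trace = trace ++ ["input cell"]
fillOutput trace = trace ++ ["output cell"]

-- andThen plays the role of (>>): run the first action, then the second,
-- mirroring Fork (p >> loop tail newtl) in streamMap.
andThen :: Action -> Action -> Action
andThen a b = b . a

forkComputation :: Action
forkComputation = fillInput `andThen` fillOutput
```

Running `forkComputation []` fills the input cell first and then the output cell, which is why the consumer’s single `fork` is enough to advance both the producer and the transducer.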
We modify our main function slightly:
```haskell
main :: IO ()
main = do
  (chunkSize : forkDistance : []) <- getArgs
  let (cs, fd) = (read chunkSize, read forkDistance)
  print $ runPar $ do
    -- We go from 20 to 1 instead of 40 to 1
    s <- streamFromList cs fd (reverse [1..20])
    -- And multiply each term by 2
    s' <- streamMap (*2) s
    streamFold (\y x -> show (slowFib x) <> " " <> y) "" s'
```
and receive the output we’d expect. The producer produces, the transducer (`streamMap`) transduces, and the consumer consumes. We see a similar pattern of interleaving, with the transducer and producer proceeding basically in lockstep and the consumer handling the rate limiting.
Producer: Cons 20
Producer: Cons 19
Producer: Cons 18
Producer: Cons 17
Producer: Fork 16
Transducer: Cons 20 -> 40
Transducer: Cons 19 -> 38
Transducer: Cons 18 -> 36
Consumer: Cons 40
Transducer: Cons 17 -> 34
Transducer: Fork 16 -> 32
Consumer: Cons 38
Consumer: Cons 36
Consumer: Cons 34
Consumer: Fork 32
Producer: Cons 15
Producer: Cons 14
Producer: Cons 13
Producer: Cons 12
Producer: Fork 11
Transducer: Cons 15 -> 30
Transducer: Cons 14 -> 28
Transducer: Cons 13 -> 26
Transducer: Cons 12 -> 24
Transducer: Fork 11 -> 22
Consumer: Cons 30
Consumer: Cons 28
Consumer: Cons 26
Consumer: Cons 24
Consumer: Fork 22
Producer: Cons 10
Producer: Cons 9
Producer: Cons 8
Producer: Cons 7
Producer: Fork 6
Transducer: Cons 10 -> 20
Transducer: Cons 9 -> 18
Transducer: Cons 8 -> 16
Transducer: Cons 7 -> 14
Transducer: Fork 6 -> 12
Consumer: Cons 20
Consumer: Cons 18
Consumer: Cons 16
Consumer: Cons 14
Consumer: Fork 12
Producer: Cons 5
Producer: Cons 4
Producer: Cons 3
Producer: Cons 2
Producer: Fork 1
Transducer: Cons 5 -> 10
Transducer: Cons 4 -> 8
Transducer: Cons 3 -> 6
Transducer: Cons 2 -> 4
Transducer: Fork 1 -> 2
Consumer: Cons 10
Consumer: Cons 8
Consumer: Cons 6
Consumer: Cons 4
Consumer: Fork 2
Producer: Nil
Consumer: Nil
Conclusion
There are other rate-limiting strategies we could have used that might lead to better performance, but they’d be workload dependent. In particular, I was thinking that perhaps we’d want the producer to start n terms in advance, so that the consumer has more than a single term available when it encounters a `Fork`. This could give speedups in cases where the work done by the consumer is not always substantially slower than the work done by the producer.
Perhaps I’ll revisit this in the future.