Exercise: Rate-limiting a producer to avoid memory consumption

Posted on May 8, 2025

Intro

N.B.: I’m still learning! This post is my current best understanding of the material presented. This post in particular is very quick-and-dirty. It does not follow best practice coding conventions and may contain errors.

I’m still working my way through ParConc and there’s a “left for the reader” exercise I wanted to try. Chapter 4 introduces parallel streams and asks the reader to fill out an implementation for rate limiting.

The original code for the exercise can be found here.

The aim is to modify the IList data type to include a third constructor, and then update the streamFromList, streamMap, and streamFold functions to accommodate.

Setup

Here’s the updated IList constructor:

data IList a
    = Nil
    | Cons a (IVar (IList a))
    | Fork (Par ()) (IList a)
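
For context, the Stream type itself is unchanged; in the book's code it's just an IVar holding the (eventually filled) head of an IList:

    type Stream a = IVar (IList a)
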

My understanding of what this does is as follows:

  • Nil and Cons are as before: Nil signifies the end of the stream, Cons provides one element of the stream and an IVar “box” that contains another stream that can be obtained in parallel via repeated calls to get – basically, a chain of promises.
  • Fork is new. It contains a computation p :: Par () and an IList il :: IList a.
    • il itself contains the “remainder” of the stream, but the key part of this is that the IVar is empty until p is run. This is in contrast to the given implementation, which tries to produce elements of the stream as quickly as possible.

Next, we need to update the NFData instance. We do this as:

instance NFData a => NFData (IList a) where
  rnf Nil = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  rnf (Fork _p l) = rnf l

This just evaluates the IList portion of the fork. We want the caller to decide when to invoke the computation represented by p, so we don’t want to call runPar p – we just leave p alone for now.

N.B. Although modifying this instance doesn’t appear as part of the exercise instructions, I believe it’s still necessary.

Updating streamFromList and streamFold

Next, we look at streamFromList.

In the original implementation, we wanted a list like [1..10] to turn into something morally like

Cons 1 (IVar (Cons 2 (IVar (Cons 3 (... (Cons 10 Nil))))))

where each IVar is filled (by a call to put) as fast as possible.

In the rate limiting implementation, we want something like

Cons 1 (IVar (Cons 2 (IVar (Fork comp (Cons 3 l)))))

where l represents the “rest of the list” (with subsequent Fork constructors added periodically), but such that l is an empty IVar until comp is called.

This prevents too many list elements from piling up if we have a fast producer but slow consumer.
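
To visualize the shape, here's a hypothetical pure mock of the chunked list (not part of the exercise code). It's only an analogy — ordinary laziness can't model an IVar that stays empty until a separate computation fills it — but it shows where the Fork constructors land and how a fold walks through them:

```haskell
-- Hypothetical pure mock of the chunked stream shape. MFork stands in
-- for Fork; the real rate limiting comes from the empty IVar, which
-- this pure version can't model.
data MockList a = MNil | MCons a (MockList a) | MFork (MockList a)
  deriving Show

-- Split the input into chunks of size k, separated by MFork markers.
mockFromList :: Int -> [a] -> MockList a
mockFromList _ [] = MNil
mockFromList k xs =
  let (chunk, rest) = splitAt k xs
      end = if null rest then MNil else MFork (mockFromList k rest)
  in foldr MCons end chunk

-- A fold that "runs" each MFork as it reaches it, like streamFold will.
mockFold :: (b -> a -> b) -> b -> MockList a -> b
mockFold _ acc MNil           = acc
mockFold f acc (MCons x rest) = mockFold f (f acc x) rest
mockFold f acc (MFork rest)   = mockFold f acc rest

main :: IO ()
main = print (mockFold (+) 0 (mockFromList 3 [1 .. 10 :: Int]))
```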

Here’s the modified implementation I came up with for streamFromList and streamFold. Note that we add some Show constraints to help with the debug traces.

streamFromList :: NFData a => Show a => Int -> Int -> [a] -> Par (Stream a)
streamFromList chunkSize forkDistance xs = do
  var <- new                            -- <1>
  fork $ loop xs var 1                  -- <2>
  return var                            -- <3>
 where
  -- We need a fork at forkDistance, forkDistance + chunkSize, forkDistance + 2 * chunkSize, etc
  needsFork :: Int -> Bool
  needsFork i = mod (i - forkDistance) chunkSize == 0 && i > 0

  loop :: Show a => NFData a => [a] -> IVar (IList a) -> Int -> Par ()
  -- No more elements of the list: just put in Nil
  loop [] var _ = do
    traceM "Producer: Nil"
    put var Nil

  loop (x:xs) var i =
    if not $ needsFork i
    then do
      traceM $ "Producer: Cons " <> show x
      -- We don't need rate limiting here: put the next element into the
      -- variable and loop
      tail <- new
      put var (Cons x tail)
      loop xs tail (i+1)
    else do
      traceM $ "Producer: Fork " <> show x
      -- We DO need rate limiting here. We return the list with an EMPTY IVar,
      -- and put a Fork into the var. The tail IVar is only filled once the
      -- consumer runs the Fork's computation
      tail <- new
      put var $ Fork (loop xs tail (i+1)) (Cons x tail)

-- | Reduce a stream to a single value.  This function will not return
--   until it reaches the end-of-stream.
-- <<streamFold
streamFold :: Show b => (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc instrm = do
  ilst <- get instrm
  case ilst of
    Nil      -> do
      traceM "    Consumer: Nil"
      return acc
    Cons h t -> do
      traceM $ "    Consumer: Cons " <> show h
      streamFold fn (fn acc h) t
    Fork comp (Cons x tail) -> do
      traceM $ "    Consumer: Fork " <> show x
      fork comp
      streamFold fn (fn acc x) tail
    _ -> error "should be impossible"
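
As a sanity check on the needsFork arithmetic, here's a standalone pure copy of the predicate (a hypothetical helper with chunkSize and forkDistance passed explicitly, so it can live outside the where clause):

```haskell
-- Standalone copy of the needsFork predicate from streamFromList,
-- with chunkSize and forkDistance as explicit arguments.
needsFork' :: Int -> Int -> Int -> Bool
needsFork' chunkSize forkDistance i =
  mod (i - forkDistance) chunkSize == 0 && i > 0

-- Which element indices (out of the first n) get a Fork?
forkPositions :: Int -> Int -> Int -> [Int]
forkPositions chunkSize forkDistance n =
  filter (needsFork' chunkSize forkDistance) [1 .. n]

main :: IO ()
main =
  -- With chunkSize = 5 and forkDistance = 0 (as in the test run below),
  -- the producer forks on every 5th element: [5,10,15,20]
  print (forkPositions 5 0 20)
```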

The Results

We’ll now test this with a simple slow Fibonacci function, calculating the first 40 terms in reverse (meaning the problem starts difficult, and gets easier as time goes on):

-- Testing a slow consumer, fast producer
main :: IO ()
main = do
  (chunkSize : forkDistance : []) <- getArgs
  let (cs, fd) = (read chunkSize, read forkDistance)
  print $ runPar $ do
    s <- streamFromList cs fd (reverse [1..40])
    streamFold (\y x -> show (slowFib x) <> " " <> y) "" s
    
-- Deliberately inefficient Fibonacci
slowFib :: Integer -> Integer
slowFib 0 = 0
slowFib 1 = 1
slowFib n = slowFib (n-1) + slowFib (n-2)
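
One small detail: the fold prepends each new term to the accumulator, so even though the stream runs from 40 down to 1, the printed result comes out in ascending order. A pure foldl over a plain list shows the same effect:

```haskell
-- Pure analogue of the streamFold call above: the "stream" delivers
-- 5,4,3,2,1, but each term is prepended to the accumulator, so the
-- final string reads in ascending order ("1 2 3 4 5 ").
main :: IO ()
main = putStrLn (foldl (\y x -> show x <> " " <> y) "" (reverse [1 .. 5 :: Int]))
```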

We compile with ghc -O2 Stream.hs -rtsopts -threaded (I believe you must remove the module header for this to work) and then run with something like ./Stream 5 0 +RTS -N2 -s. This should allow the producer (streamFromList) to introduce at most 5 terms before the consumer (streamFold) can eliminate them.

The output from the debug traces shows what we would expect: early in the process, production and consumption are interleaved, with the producer always running a full 5 terms ahead of the consumer. Later, when the slowFib calls take less time, the producer is sometimes fewer than 5 terms ahead; this starts at term number 5 in the logs below.

√ parconc-examples % ./Stream 5 0 +RTS -N2 -s                      
Producer: Cons 40
Producer: Cons 39
Producer: Cons 38
Producer: Cons 37
Producer: Fork 36
    Consumer: Cons 40
    Consumer: Cons 39
    Consumer: Cons 38
    Consumer: Cons 37
    Consumer: Fork 36
Producer: Cons 35
Producer: Cons 34
Producer: Cons 33
Producer: Cons 32
Producer: Fork 31
    Consumer: Cons 35
    Consumer: Cons 34
    Consumer: Cons 33
    Consumer: Cons 32
    Consumer: Fork 31
Producer: Cons 30
Producer: Cons 29
Producer: Cons 28
Producer: Cons 27
Producer: Fork 26
    Consumer: Cons 30
    Consumer: Cons 29
    Consumer: Cons 28
    Consumer: Cons 27
    Consumer: Fork 26
Producer: Cons 25
Producer: Cons 24
Producer: Cons 23
Producer: Cons 22
Producer: Fork 21
    Consumer: Cons 25
    Consumer: Cons 24
    Consumer: Cons 23
    Consumer: Cons 22
    Consumer: Fork 21
Producer: Cons 20
Producer: Cons 19
Producer: Cons 18
Producer: Cons 17
Producer: Fork 16
    Consumer: Cons 20
    Consumer: Cons 19
    Consumer: Cons 18
    Consumer: Cons 17
    Consumer: Fork 16
Producer: Cons 15
Producer: Cons 14
Producer: Cons 13
Producer: Cons 12
Producer: Fork 11
    Consumer: Cons 15
    Consumer: Cons 14
    Consumer: Cons 13
    Consumer: Cons 12
    Consumer: Fork 11
Producer: Cons 10
Producer: Cons 9
Producer: Cons 8
Producer: Cons 7
Producer: Fork 6
    Consumer: Cons 10
    Consumer: Cons 9
    Consumer: Cons 8
    Consumer: Cons 7
    Consumer: Fork 6
Producer: Cons 5
Producer: Cons 4
    Consumer: Cons 5
Producer: Cons 3
    Consumer: Cons 4
    Consumer: Cons 3
Producer: Cons 2
    Consumer: Cons 2
Producer: Fork 1
    Consumer: Fork 1
Producer: Nil
    Consumer: Nil

Contrast this with disabling chunking (by setting the chunk size greater than the number of elements, say 100). In this case, we see that the producer finishes producing all the elements (which are now stuck in memory!) before the consumer finishes consuming the very first one.

√ parconc-examples % ./Stream 100 0 +RTS -N2 -s
Producer: Cons 40
Producer: Cons 39
Producer: Cons 38
Producer: Cons 37
Producer: Cons 36
Producer: Cons 35
    Consumer: Cons 40
Producer: Cons 34
Producer: Cons 33
Producer: Cons 32
Producer: Cons 31
Producer: Cons 30
Producer: Cons 29
Producer: Cons 28
Producer: Cons 27
Producer: Cons 26
Producer: Cons 25
Producer: Cons 24
Producer: Cons 23
Producer: Cons 22
Producer: Cons 21
Producer: Cons 20
Producer: Cons 19
Producer: Cons 18
Producer: Cons 17
Producer: Cons 16
Producer: Cons 15
Producer: Cons 14
Producer: Cons 13
Producer: Cons 12
Producer: Cons 11
Producer: Cons 10
Producer: Cons 9
Producer: Cons 8
Producer: Cons 7
Producer: Cons 6
Producer: Cons 5
Producer: Cons 4
Producer: Cons 3
Producer: Cons 2
Producer: Cons 1
Producer: Nil
    Consumer: Cons 39
    Consumer: Cons 38
    Consumer: Cons 37
    Consumer: Cons 36
    Consumer: Cons 35
    Consumer: Cons 34
    Consumer: Cons 33
    Consumer: Cons 32
    Consumer: Cons 31
    Consumer: Cons 30
    Consumer: Cons 29
    Consumer: Cons 28
    Consumer: Cons 27
    Consumer: Cons 26
    Consumer: Cons 25
    Consumer: Cons 24
    Consumer: Cons 23
    Consumer: Cons 22
    Consumer: Cons 21
    Consumer: Cons 20
    Consumer: Cons 19
    Consumer: Cons 18
    Consumer: Cons 17
    Consumer: Cons 16
    Consumer: Cons 15
    Consumer: Cons 14
    Consumer: Cons 13
    Consumer: Cons 12
    Consumer: Cons 11
    Consumer: Cons 10
    Consumer: Cons 9
    Consumer: Cons 8
    Consumer: Cons 7
    Consumer: Cons 6
    Consumer: Cons 5
    Consumer: Cons 4
    Consumer: Cons 3
    Consumer: Cons 2
    Consumer: Cons 1
    Consumer: Nil

Updating streamMap

streamMap is both a producer and a consumer. For simplicity, we’ll make streamMap chunk its output in the same places as its input.

streamMap :: NFData b => Show b => Show a => (a -> b) -> Stream a -> Par (Stream b)
streamMap fn instrm = do
  outstrm <- new
  fork $ loop instrm outstrm
  return outstrm
 where
   loop instrm outstrm = do
    ilst <- get instrm
    case ilst of
      Nil -> put outstrm Nil
      Cons h t -> do
        traceM $ "        Transducer: Cons " <> show h <> " -> " <> show (fn h)
        newtl <- new
        put outstrm (Cons (fn h) newtl)
        loop t newtl
      Fork p (Cons x tail) -> do
        traceM $ "        Transducer: Fork " <> show x <> " -> " <> show (fn x)
        newtl <- new
        put outstrm (Fork (p >> loop tail newtl) (Cons (fn x) newtl))
      _ -> error "should be impossible"

This makes it so that the computation stored in the Fork constructor first drives production of streamMap’s input stream, and then uses that to produce the output stream.

We modify our main function slightly:

main :: IO ()
main = do
  (chunkSize : forkDistance : []) <- getArgs
  let (cs, fd) = (read chunkSize, read forkDistance)
  print $ runPar $ do
    -- We go from 20 to 1 instead of 40 to 1
    s <- streamFromList cs fd  (reverse [1..20])
    -- And multiply each term by 2
    s' <- streamMap (*2) s
    streamFold (\y x -> show (slowFib x) <> " " <> y) "" s'

and receive the output that we’d expect. The producer produces, the transducer (streamMap) transduces, and the consumer consumes. We see a similar pattern of interleaving, observing that the transducer and producer proceed basically in lockstep, with the consumer driving the rate limiting.

Producer: Cons 20
Producer: Cons 19
Producer: Cons 18
Producer: Cons 17
Producer: Fork 16
        Transducer: Cons 20 -> 40
        Transducer: Cons 19 -> 38
        Transducer: Cons 18 -> 36
    Consumer: Cons 40
        Transducer: Cons 17 -> 34
        Transducer: Fork 16 -> 32
    Consumer: Cons 38
    Consumer: Cons 36
    Consumer: Cons 34
    Consumer: Fork 32
Producer: Cons 15
Producer: Cons 14
Producer: Cons 13
Producer: Cons 12
Producer: Fork 11
        Transducer: Cons 15 -> 30
        Transducer: Cons 14 -> 28
        Transducer: Cons 13 -> 26
        Transducer: Cons 12 -> 24
        Transducer: Fork 11 -> 22
    Consumer: Cons 30
    Consumer: Cons 28
    Consumer: Cons 26
    Consumer: Cons 24
    Consumer: Fork 22
Producer: Cons 10
Producer: Cons 9
Producer: Cons 8
Producer: Cons 7
Producer: Fork 6
        Transducer: Cons 10 -> 20
        Transducer: Cons 9 -> 18
        Transducer: Cons 8 -> 16
        Transducer: Cons 7 -> 14
        Transducer: Fork 6 -> 12
    Consumer: Cons 20
    Consumer: Cons 18
    Consumer: Cons 16
    Consumer: Cons 14
    Consumer: Fork 12
Producer: Cons 5
Producer: Cons 4
Producer: Cons 3
Producer: Cons 2
Producer: Fork 1
        Transducer: Cons 5 -> 10
        Transducer: Cons 4 -> 8
        Transducer: Cons 3 -> 6
        Transducer: Cons 2 -> 4
        Transducer: Fork 1 -> 2
    Consumer: Cons 10
    Consumer: Cons 8
    Consumer: Cons 6
    Consumer: Cons 4
    Consumer: Fork 2
Producer: Nil
    Consumer: Nil

Conclusion

There are other rate-limiting strategies we could have used that might lead to better performance, but they’d be workload dependent. In particular, I was thinking that perhaps we’d want to start the producer producing n terms in advance, so that the consumer has more than a single term to consume when it encounters a Fork. This could give speedups in the case where the work done by the consumer is not always substantially slower than the work done by the producer.

Perhaps I’ll revisit this in the future.