Conduit

Conduits are a solution to the streaming data problem. Laziness often allows us to process large amounts of data without pulling all values into memory. However, doing so in the presence of I/O requires us to use lazy I/O. The main downside to lazy I/O is non-determinism: we have no guarantees of when our resource finalizers will be run. For a small application this may be acceptable, but for a high-load server, we could quickly run out of scarce resources, such as file handles.

Conduits allow us to process large streams of data while still retaining deterministic resource handling. They provide a unified interface for data streams, whether they come from files, sockets, or memory. And when combined with ResourceT, we can safely allocate resources, knowing that they will always be reclaimed, even in the presence of exceptions.

This appendix covers version 0.2 of the conduit package.

Conduits in Five Minutes

While a good understanding of the lower-level mechanics of conduits is advisable, you can get very far without it. Let’s start off with some high-level examples. Don’t worry if some of the details seem a bit magical right now. We’ll cover everything in the course of this appendix. Let’s start with the terminology, and then some sample code.

Source

A producer of data. The data could be in a file, coming from a socket, or in memory as a list. To access this data, we pull from the source.

Sink

A consumer of data. Basic examples would be a sum function (adding up a stream of numbers fed in), a file sink (which writes all incoming bytes to a file), or a socket. We push data into a sink. When the sink finishes processing (we’ll explain that later), it returns some value.

Conduit

A transformer of data. The simplest example is a map function, though there are many others. Like a sink, we push data into a conduit. But instead of returning a single value at the end, a conduit can return multiple outputs every time it is pushed to.

Fuse

(Thanks to David Mazieres for the term.) A conduit can be fused with a source to produce a new, modified source (the $= operator). For example, you could have a source that reads bytes from a file, and a conduit that decodes bytes into text. If you fuse them together, you would now have a source that reads text from a file. Likewise, a conduit and a sink can fuse into a new sink (=$), and two conduits can fuse into a new conduit (=$=).

Connect

You can connect a source to a sink using the $$ operator. Doing so will pull data from the source and push it to the sink, until either the source or sink signals that they are “done.”

Let’s see some examples of conduit code.

    {-# LANGUAGE OverloadedStrings #-}
    import Data.Conduit -- the core library
    import qualified Data.Conduit.List as CL -- some list-like functions
    import qualified Data.Conduit.Binary as CB -- bytes
    import qualified Data.Conduit.Text as CT
    import Data.ByteString (ByteString)
    import Data.Text (Text)
    import qualified Data.Text as T
    import Control.Monad.ST (runST)

    -- Let's start with the basics: connecting a source to a sink. We'll use the
    -- built-in file functions to implement efficient, constant-memory,
    -- resource-friendly file copying.
    --
    -- Two things to note: we use $$ to connect our source to our sink, and then
    -- use runResourceT.
    copyFile :: FilePath -> FilePath -> IO ()
    copyFile src dest = runResourceT $ CB.sourceFile src $$ CB.sinkFile dest

    -- The Data.Conduit.List module provides a number of helper functions for
    -- creating sources, sinks, and conduits. Let's look at a typical fold:
    -- summing numbers.
    sumSink :: Resource m => Sink Int m Int
    sumSink = CL.fold (+) 0

    -- If we want to go a little more low-level, we can code our sink with the
    -- sinkState function. This function takes three parameters: an initial
    -- state, a push function (receive some more data), and a close function.
    sumSink2 :: Resource m => Sink Int m Int
    sumSink2 = sinkState
        0 -- initial value
        -- update the state with the new input and
        -- indicate that we want more input
        (\accum i -> return $ StateProcessing (accum + i))
        (\accum -> return accum) -- return the current accum value on close

    -- Another common helper function is sourceList. Let's see how we can
    -- combine that function with our sumSink to reimplement the built-in sum
    -- function.
    sum' :: [Int] -> Int
    sum' input = runST $ runResourceT $ CL.sourceList input $$ sumSink

    -- Since this is Haskell, let's write a source to generate all of the
    -- Fibonacci numbers. We'll use sourceState. The state will contain the
    -- next two numbers in the sequence. We also need to provide a pull
    -- function, which will return the next number and update the state.
    fibs :: Resource m => Source m Int
    fibs = sourceState
        (0, 1) -- initial state
        (\(x, y) -> return $ StateOpen (y, x + y) x)

    -- Suppose we want to get the sum of the first 10 Fibonacci numbers. We can
    -- use the isolate conduit to make sure the sum sink only consumes 10
    -- values.
    sumTenFibs :: Int
    sumTenFibs =
        runST -- runs fine in pure code
        $ runResourceT
        $ fibs
        $= CL.isolate 10 -- fuse the source and conduit into a source
        $$ sumSink

    -- We can also fuse the conduit into the sink instead; we just swap a few
    -- operators.
    sumTenFibs2 :: Int
    sumTenFibs2 =
        runST
        $ runResourceT
        $ fibs
        $$ CL.isolate 10
        =$ sumSink

    -- Alright, let's make some conduits. Let's turn our numbers into text.
    -- Sounds like a job for a map...
    intToText :: Int -> Text -- just a helper function
    intToText = T.pack . show

    textify :: Resource m => Conduit Int m Text
    textify = CL.map intToText

    -- Like previously, we can use a conduitState helper function. But here, we
    -- don't even need state, so we provide a dummy state value.
    textify2 :: Resource m => Conduit Int m Text
    textify2 = conduitState
        ()
        (\() input -> return $ StateProducing () [intToText input])
        (\() -> return [])

    -- Let's make the unlines conduit, which puts a newline on the end of each
    -- piece of input. We'll just use CL.map; feel free to write it with
    -- conduitState as well for practice.
    unlines' :: Resource m => Conduit Text m Text
    unlines' = CL.map $ \t -> t `T.append` "\n"

    -- And let's write a function that prints the first N fibs to a file. We'll
    -- use UTF-8 encoding.
    writeFibs :: Int -> FilePath -> IO ()
    writeFibs count dest =
        runResourceT
        $ fibs
        $= CL.isolate count
        $= textify
        $= unlines'
        $= CT.encode CT.utf8
        $$ CB.sinkFile dest

    -- We used the $= operator to fuse the conduits into the sources, producing
    -- a single source. We can also do the opposite: fuse the conduits into the
    -- sink. We can even combine the two.
    writeFibs2 :: Int -> FilePath -> IO ()
    writeFibs2 count dest =
        runResourceT
        $ fibs
        $= CL.isolate count
        $= textify
        $$ unlines'
        =$ CT.encode CT.utf8
        =$ CB.sinkFile dest

    -- Or we could fuse all those inner conduits into a single conduit...
    someIntLines :: ResourceThrow m -- encoding can throw an exception
                 => Int
                 -> Conduit Int m ByteString
    someIntLines count =
        CL.isolate count
        =$= textify
        =$= unlines'
        =$= CT.encode CT.utf8

    -- ... and then use that conduit
    writeFibs3 :: Int -> FilePath -> IO ()
    writeFibs3 count dest =
        runResourceT
        $ fibs
        $= someIntLines count
        $$ CB.sinkFile dest

    main :: IO ()
    main = do
        putStrLn $ "Sum of first ten fibs: " ++ show sumTenFibs
        writeFibs 20 "fibs.txt"
        copyFile "fibs.txt" "fibs2.txt"

Structure of this Appendix

The remainder of this appendix covers five major topics in conduits:

  • ResourceT, the underlying technique that allows us to have guaranteed resource deallocation.

  • Sources, our data producers

  • Sinks, our data consumers

  • Conduits, our data transformers

  • Buffering, which allows us to avoid Inversion of Control

The Resource monad transformer

The Resource transformer (ResourceT) plays a vital role in proper resource management in the conduit project. It is included within the conduit package itself. We’ll explain ResourceT as its own entity. While some of its design decisions are clearly biased towards conduits, ResourceT should remain a usable tool in its own right.

Goals

What’s wrong with the following code?

    import System.IO

    main = do
        output <- openFile "output.txt" WriteMode
        input  <- openFile "input.txt" ReadMode
        hGetContents input >>= hPutStr output
        hClose input
        hClose output

If the file input.txt does not exist, then an exception will be thrown when trying to open it. As a result, hClose output will never be called, and we’ll have leaked a scarce resource (a file descriptor). In our tiny program, this isn’t a big deal, but clearly we can’t afford such waste in a long running, highly active server process.

Fortunately, solving the problem is easy:

    import System.IO

    main =
        withFile "output.txt" WriteMode $ \output ->
        withFile "input.txt" ReadMode $ \input ->
        hGetContents input >>= hPutStr output

withFile makes sure that the Handle is always closed, even in the presence of exceptions. It also handles asynchronous exceptions. Overall, it’s a great approach to use… when you can use it. While withFile is often easy to use, sometimes it requires restructuring our programs, and this restructuring can range from mildly tedious to wildly inefficient.

Let’s take enumerators for example. If you look in the documentation, there is an enumFile function (for reading contents from a file), but no iterFile (for writing contents to a file). That’s because the flow of control in an iteratee doesn’t allow proper allocation of the Handle. Instead, in order to write to a file, you need to allocate the Handle before entering the Iteratee, e.g.:

    import System.IO
    import Data.Enumerator
    import Data.Enumerator.Binary

    main =
        withFile "output.txt" WriteMode $ \output ->
        run_ $ enumFile "input.txt" $$ iterHandle output

This code works fine, but imagine that, instead of simply piping data directly to the file, there was a huge amount of computation that occurred before we needed to use the output handle. We would have allocated a file descriptor long before we needed it, and thereby locked up a scarce resource in our application. Besides this, there are times when we can’t allocate the file beforehand, such as when we won’t know which file to open until we’ve read from the input file.

One of the stated goals of conduits is to solve this problem, and it does so via ResourceT. As a result, the above program can be written in conduit as:

    {-# LANGUAGE OverloadedStrings #-}
    import Data.Conduit
    import Data.Conduit.Binary

    main = runResourceT $ sourceFile "input.txt" $$ sinkFile "output.txt"

How it Works

There are essentially three base functions on ResourceT, and then a bunch of conveniences thrown on top. The first function is:

    register :: IO () -> ResourceT IO ReleaseKey

This function, and the others below, are actually more polymorphic than implied here, allowing other monads besides IO. In fact, almost any transformer on top of IO, as well as any ST stacks, work. We’ll cover the details of that later.

This function registers a piece of code, asserting that it must be run at some point. It gives back a ReleaseKey, which is used by the next function:

    release :: ReleaseKey -> ResourceT IO ()

Calling release on a ReleaseKey immediately performs the action you previously registered. You may call release on the same ReleaseKey as many times as you like; the first time it is called, it unregisters the action. This means you can safely register an action like a memory free, and have no concerns that it will be called twice.

Eventually, we’ll want to exit our special ResourceT. To do so, we use:

    runResourceT :: ResourceT IO a -> IO a

This seemingly innocuous function is where all the magic happens. It runs through all of the registered cleanup actions and performs them. It is fully exception safe, meaning the cleanups will be performed in the presence of both synchronous and asynchronous exceptions. And as mentioned before, calling release will unregister an action, so there is no concern of double-freeing.
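
Here is a minimal sketch, specialized to IO and using only the three functions above, showing registration, early release, and the safety of a double release:

    import Control.Monad.Trans.Class (lift)
    import Control.Monad.Trans.Resource

    main :: IO ()
    main = runResourceT $ do
        -- Nothing runs at registration time; we just get back a key.
        key <- register $ putStrLn "cleanup!"
        lift $ putStrLn "doing work"
        release key -- "cleanup!" prints here, and the action is unregistered
        release key -- safe: already unregistered, so this is a no-op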

Finally, as a convenience, we provide one more function for the common case of allocating a resource and registering a release action:

    with :: IO a          -- ^ allocate
         -> (a -> IO ())  -- ^ free resource
         -> ResourceT IO (ReleaseKey, a)

So, to rework our first buggy example to use ResourceT, we would write:

    import System.IO
    import Control.Monad.Trans.Resource
    import Control.Monad.Trans.Class (lift)

    main = runResourceT $ do
        (releaseO, output) <- with (openFile "output.txt" WriteMode) hClose
        (releaseI, input)  <- with (openFile "input.txt" ReadMode) hClose
        lift $ hGetContents input >>= hPutStr output
        release releaseI
        release releaseO

Now there is no concern of any exceptions preventing the releasing of resources. We could skip the release calls if we want to, and in an example this small, it would not make any difference. But for larger applications, where we want processing to continue, this ensures that the Handles are freed as early as possible, keeping our scarce resource usage to a minimum.

Some Type Magic

As alluded to, there’s a bit more to ResourceT than simply running in IO. Let’s cover some of the things we need from this underlying Monad.

  • Mutable references to keep track of the registered release actions. You might think we could just use a StateT transformer, but then our state wouldn’t survive exceptions.

  • We only want to register actions in the base monad. For example, if we have a ResourceT (WriterT [Int] IO) stack, we only want to register IO actions. This makes it easy to lift our stacks around (i.e., add an extra transformer to the middle of an existing stack), and avoids confusing issues about the threading of other monadic side-effects.

  • Some way to guarantee an action is performed, even in the presence of exceptions. This boils down to needing a bracket-like function.

For the first point, we define a new typeclass to represent monads that have mutable references:

    class Monad m => HasRef m where
        type Ref m :: * -> *
        newRef' :: a -> m (Ref m a)
        readRef' :: Ref m a -> m a
        writeRef' :: Ref m a -> a -> m ()
        modifyRef' :: Ref m a -> (a -> (a, b)) -> m b
        mask :: ((forall a. m a -> m a) -> m b) -> m b
        mask_ :: m a -> m a
        try :: m a -> m (Either SomeException a)

We have an associated type to signify what the reference type should be. (For fans of fundeps, you’ll see in the next section that this has to be an associated type.) Then we provide a number of basic reference operations. Finally, there are some functions to help with exceptions, which are needed to safely implement the functions described in the last section. The instance for IO is very straightforward:

    instance HasRef IO where
        type Ref IO = I.IORef
        newRef' = I.newIORef
        modifyRef' = I.atomicModifyIORef
        readRef' = I.readIORef
        writeRef' = I.writeIORef
        mask = E.mask
        mask_ = E.mask_
        try = E.try

However, we have a problem when it comes to implementing the ST instance: there is no way to deal with exceptions in the ST monad. As a result, mask, mask_ and try are given default implementations that do no exception checking. This gives rise to the first word of warning: operations in the ST monad are not exception safe. You should not be allocating scarce resources in ST when using ResourceT. You might be wondering why we bother with ResourceT for ST at all. The answer is that there is a lot you can do with conduits without allocating scarce resources, and ST is a great way to do that in a pure way. But more on this later.

Now onto point 2: we need some way to deal with this base monad concept. Again, we use an associated type (again explained in the next section). Our solution looks something like:

    class (HasRef (Base m), Monad m) => Resource m where
        type Base m :: * -> *
        resourceLiftBase :: Base m a -> m a

But we forgot about point 3: some bracket-like function. So we need one more method in this typeclass:

    resourceBracket_ :: Base m a -> Base m b -> m c -> m c

The reason the first two arguments to resourceBracket_ (allocation and cleanup) live in Base m instead of m is that, in ResourceT, all allocation and cleanup lives in the base monad.

So on top of our HasRef instance for IO, we now need a Resource instance as well. This is similarly straightforward:

    instance Resource IO where
        type Base IO = IO
        resourceLiftBase = id
        resourceBracket_ = E.bracket_

We have similar ST instances, with resourceBracket_ having no exception safety. The final step is dealing with monad transformers. We don’t need to provide a HasRef instance, but we do need a Resource instance. The tricky part is providing a valid implementation of resourceBracket_. For this, we use some functions from monad-control:

    instance (MonadTransControl t, Resource m, Monad (t m))
            => Resource (t m) where
        type Base (t m) = Base m
        resourceLiftBase = lift . resourceLiftBase
        resourceBracket_ a b c =
            control' $ \run -> resourceBracket_ a b (run c)
          where
            control' f = liftWith f >>= restoreT . return

For any transformer, its base is the base of its inner monad. Similarly, we lift to the base by lifting to the inner monad and then lifting to the base from there. The tricky part is the implementation of resourceBracket_. I will not go into a detailed explanation, as I would simply make a fool of myself.

Definition of ResourceT

We now have enough information to understand the definition of ResourceT:

    newtype ReleaseKey = ReleaseKey Int

    type RefCount = Int
    type NextKey = Int

    data ReleaseMap base =
        ReleaseMap !NextKey !RefCount !(IntMap (base ()))

    newtype ResourceT m a =
        ResourceT (Ref (Base m) (ReleaseMap (Base m)) -> m a)

We see that ReleaseKey is simply an Int. If you skip a few lines down, this will make sense, since we’re using an IntMap to keep track of the registered actions. We also define two type synonyms: RefCount and NextKey. NextKey keeps track of the most recently assigned value for a key, and is incremented each time register is called. We’ll touch on RefCount later.

The ReleaseMap holds three pieces of information: the next key, the reference count, and the map of all registered actions. Notice that ReleaseMap takes a type parameter base, which specifies which monad the release actions must live in.

Finally, a ResourceT is essentially a ReaderT that keeps a mutable reference to a ReleaseMap. The reference type is determined by the base of the monad in question, as is the cleanup monad. This is why we need to use associated types.
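
As a rough sketch of how these pieces fit together (illustrative only; the real code must also worry about masking exceptions), registering an action amounts to atomically inserting it into the IntMap and handing back its key. The register' name here is hypothetical:

    import qualified Data.IntMap as IntMap

    -- A sketch of register, built only from the pieces defined above.
    register' :: Resource m => Base m () -> ResourceT m ReleaseKey
    register' action = ResourceT $ \istate ->
        resourceLiftBase $ modifyRef' istate $
            \(ReleaseMap nextKey refCount actions) ->
                ( ReleaseMap (nextKey + 1) refCount
                    (IntMap.insert nextKey action actions)
                , ReleaseKey nextKey
                )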

The majority of the rest of the code in the Control.Monad.Trans.Resource module is just providing instances for the ResourceT type.

Other Typeclasses

There are three other typeclasses provided by the module:

ResourceUnsafeIO

Any monad which can lift IO actions into it, but for which doing so may be considered unsafe. The prime candidate here is ST. Care should be taken to lift only actions which do not acquire scarce resources and which don’t “fire the missiles.” In other words, all the normal warnings of unsafeIOToST apply.

ResourceThrow

For actions that can throw exceptions. This automatically applies to all IO-based monads. For ST-based monads, you can use the supplied ExceptionT transformer to provide exception-throwing capabilities. Some functions in conduit, for example, will require this (e.g., text decoding).

ResourceIO

A convenience class tying together a bunch of other classes, including the two mentioned above. This is purely for convenience; you could achieve the same effect without this type class, you’d just have to do a lot more typing.

Forking

It would seem that forking a thread would be inherently unsafe with ResourceT, since the parent thread may call runResourceT while the child thread is still accessing some of the allocated resources. This is indeed true, if you use the normal forkIO function.

You can’t actually use the standard forkIO, since it only operates in the IO monad, but you could use the fork function from lifted-base. In fact, due to this issue, the regions package does not provide a MonadBaseControl instance for its transformer (which is very similar to ResourceT). However, our goal in ResourceT is not to make it impossible for programmers to mess up, only to make it easier to do the right thing. Therefore, we still provide the instance, even though it could be abused.

In order to solve this, ResourceT includes reference counting. When you fork a new thread via resourceForkIO, the RefCount value of the ReleaseMap is incremented. Every time runResourceT is called, the value is decremented. Only when the value hits 0 are all the release actions called.
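
As a hedged usage sketch (assuming resourceForkIO has the type ResourceT IO () -> ResourceT IO ThreadId here), the parent and the child each hold a reference, so shared resources survive until both blocks have finished:

    import Control.Monad.IO.Class (liftIO)
    import Control.Monad.Trans.Resource
    import System.IO

    main :: IO ()
    main = runResourceT $ do
        (_key, h) <- withIO (openFile "shared.txt" WriteMode) hClose
        -- The reference count is bumped before the child starts.
        _tid <- resourceForkIO $ liftIO $ hPutStrLn h "hello from the child"
        liftIO $ hPutStrLn h "hello from the parent"
        -- hClose runs only once both the parent and the child are done.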

Convenience Exports

In addition to what’s been listed so far, there are a few extra functions exported (mostly) for convenience.

  • newRef, writeRef, and readRef wrap up the HasRef versions of the functions and allow them to run in any ResourceT.

  • withIO is essentially a type-restricted version of with, but working around some of the nastiness with types you would otherwise run into. In general: you’ll want to use withIO when writing IO code.

  • transResourceT lets you modify which monad your ResourceT is running in, assuming it keeps the same base.

      transResourceT :: (Base m ~ Base n)
                     => (m a -> n a)
                     -> ResourceT m a
                     -> ResourceT n a
      transResourceT f (ResourceT mx) = ResourceT (\r -> f (mx r))

Source

I think it’s simplest to understand sources by looking at the types:

    data SourceResult m a = Open (Source m a) a | Closed

    data Source m a = Source
        { sourcePull :: ResourceT m (SourceResult m a)
        , sourceClose :: ResourceT m ()
        }

A source has just two operations on it: you can pull data from it, and you can close it (think of closing a file handle). When you pull, you either get some data and a new Source (the source is still open), or nothing (the source is closed). Let’s look at some of the simplest sources:

    import Prelude hiding (repeat)
    import Data.Conduit

    -- | Never give any data
    eof :: Monad m => Source m a
    eof = Source
        { sourcePull = return Closed
        , sourceClose = return ()
        }

    -- | Always give the same value
    repeat :: Monad m => a -> Source m a
    repeat a = Source
        { sourcePull = return $ Open (repeat a) a
        , sourceClose = return ()
        }

These sources are very straightforward, since they always return the same results. Additionally, their close records don’t do anything. You might think that this is a bug: shouldn’t a call to sourcePull return Closed after the source has been closed? This isn’t required, since one of the rules of sources is that they can never be reused. In other words:

  • If a Source returns Open, it has provided you with a new Source which you should use in place of the original one.

  • If it returns Closed, then you cannot perform any more operations on it.

Don’t worry too much about the invariant. In practice, you will almost never call sourcePull or sourceClose yourself. In fact, you will hardly ever write them yourself either (that’s what sourceState and sourceIO are for). The point is that we can make some assumptions when we implement our sources.
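
That said, to make the invariant concrete, here is a hedged sketch that manually pulls the first two values from a source, always continuing with the freshly returned Source:

    import Control.Monad.IO.Class (liftIO)
    import Data.Conduit

    -- Print up to the first two values of a source, closing it if we stop early.
    printFirstTwo :: Source IO Int -> ResourceT IO ()
    printFirstTwo src0 = do
        res0 <- sourcePull src0
        case res0 of
            Closed -> return ()
            Open src1 x -> do
                liftIO $ print x
                res1 <- sourcePull src1 -- pull from the new source, never src0
                case res1 of
                    Closed -> return ()
                    Open src2 y -> do
                        liftIO $ print y
                        sourceClose src2 -- stopping early, so close explicitly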

State

There is something similar about the two sources mentioned above: they never change. They always return the same value. In other words, they have no state. For almost all serious sources, we’ll need some kind of state.

The state might actually be defined outside of our program. For example, if we write a source that reads data from a Handle, we don’t need to specify any state manually, since the Handle already tracks it.

The way we store state in a source is by updating the returned Source value in the Open constructor. This is best seen with an example.

    import Data.Conduit
    import Control.Monad.Trans.Resource

    -- | Provide data from the list, one element at a time.
    sourceList :: Resource m => [a] -> Source m a
    sourceList list = Source
        { sourcePull =
            case list of
                [] -> return Closed -- no more data
                -- This is where we store our state: by returning a new
                -- Source with the rest of the list
                x:xs -> return $ Open (sourceList xs) x
        , sourceClose = return ()
        }

Each time we pull from the source, it checks the input list. If the list is empty, pulling returns Closed, which makes sense. If the list is not empty, pulling returns Open with both the next value in the list, and a new Source value containing the rest of the input list.

sourceState and sourceIO

In addition to being able to create Sources manually, we also have a few convenience functions that allow us to create most sources in a more high-level fashion. sourceState lets you write code similar to how you would use the State monad. You provide an initial state, your pull function is provided with the current state, and it returns a new state and a return value. Let’s use this to reimplement sourceList.

    import Data.Conduit
    import Control.Monad.Trans.Resource

    -- | Provide data from the list, one element at a time.
    sourceList :: Resource m => [a] -> Source m a
    sourceList state0 = sourceState
        state0
        pull
      where
        pull [] = return StateClosed
        pull (x:xs) = return $ StateOpen xs x

Notice the usage of the StateClosed and StateOpen constructors. These are very similar to Closed and Open, except that instead of specifying the next Source to be used, you provide the next state (here, the remainder of the list).

The other common activity is to perform some I/O allocation (like opening a file), registering some cleanup action (closing that file), and having a function for pulling data from that resource. conduit comes built-in with a sourceFile function that gives a stream of ByteStrings. Let’s write a wildly inefficient alternative that returns a stream of characters.

    import Data.Conduit
    import Control.Monad.Trans.Resource
    import System.IO
    import Control.Monad.IO.Class (liftIO)

    sourceFile :: ResourceIO m => FilePath -> Source m Char
    sourceFile fp = sourceIO
        (openFile fp ReadMode)
        hClose
        (\h -> liftIO $ do
            eof <- hIsEOF h
            if eof
                then return IOClosed
                else fmap IOOpen $ hGetChar h)

Like sourceState, it uses a variant on the Open and Closed constructors. sourceIO does a number of things for us:

  • It registers the cleanup function with the ResourceT transformer, ensuring it gets called even in the presence of exceptions.

  • It sets up the sourceClose record to release the resource immediately.

  • As soon as you return IOClosed, it will release the resource.

Sinks

A sink consumes a stream of data, and produces a result. A sink must always produce a result, and must always produce a single result. This is encoded in the types themselves.

There is a Monad instance for Sink, making it simple to compose multiple sinks together into a larger sink. You can also use the built-in sink functions to perform most of your work. As with sources, you’ll rarely need to dive into the inner workings. Let’s start off with an example: getting lines from a stream of Chars (we’ll assume Unix line endings for simplicity).

    import Data.Conduit
    import qualified Data.Conduit.List as CL

    -- Get a single line from the stream.
    sinkLine :: Resource m => Sink Char m String
    sinkLine = sinkState
        id -- initial state, nothing at the beginning of the line
        push
        close
      where
        -- On a new line, return the contents up until here
        push front '\n' =
            return $ StateDone Nothing $ front []
        -- Just another character, add it to the front and keep going
        push front char =
            return $ StateProcessing $ front . (char:)
        -- Got an EOF before hitting a newline, just give what we have so far
        close front = return $ front []

    -- Get all the lines from the stream, until we hit a blank line or EOF.
    sinkLines :: Resource m => Sink Char m [String]
    sinkLines = do
        line <- sinkLine
        if null line
            then return []
            else do
                lines <- sinkLines
                return $ line : lines

    content :: String
    content = unlines
        [ "This is the first line."
        , "Here's the second."
        , ""
        , "After the blank."
        ]

    main :: IO ()
    main = do
        lines <- runResourceT $ CL.sourceList content $$ sinkLines
        mapM_ putStrLn lines

Running this sample produces the expected output:

    This is the first line.
    Here's the second.

sinkLine demonstrates usage of the sinkState function, which is very similar to the sourceState function we just saw. It takes three arguments: an initial state, a push function (takes the current state and the next input, and returns a new state and result), and a close function (takes the current state and returns an output). As opposed to sourceState, which doesn’t need a close function, a sink is required to always return a result.

Our push function has two clauses. When it gets a newline character, it indicates that processing is complete via StateDone. The Nothing indicates that there is no leftover input (we’ll discuss that later). It also gives an output of all the characters it has received. The second clause simply appends the new character to the existing state and indicates that we are still working via StateProcessing. The close function returns all characters.

sinkLines shows how we can use the monadic interface to produce new sinks. If you replace sinkLine with getLine, this would look like standard code to pull lines from standard input. This familiar interface should make it easy to get up and running quickly.

Types

The types for sinks are just a bit more involved than sources. Let’s have a look:

    type SinkPush input m output = input -> ResourceT m (SinkResult input m output)
    type SinkClose m output = ResourceT m output

    data SinkResult input m output =
        Processing (SinkPush input m output) (SinkClose m output)
      | Done (Maybe input) output

    data Sink input m output =
        SinkNoData output
      | SinkData
            { sinkPush :: SinkPush input m output
            , sinkClose :: SinkClose m output
            }
      | SinkLift (ResourceT m (Sink input m output))

Whenever a sink is pushed to, it can either say it needs more data (Processing) or say it’s all done. When still processing, it must provide updated push and close functions; when done, it returns any leftover input and the output. Fairly straightforward.

The first real “gotcha” is the three constructors for Sink. Why do we need SinkNoData: aren’t sinks all about consuming data? The answer is that we need it to efficiently implement our Monad instance. When we use return, we’re giving back a value that requires no data in order to compute it. We could model this with the SinkData constructor, with something like:

    myReturn a = SinkData (\input -> return (Done (Just input) a)) (return a)

But doing so would force reading in an extra bit of input that we don’t need right now, and possibly will never need. (Have a look again at the sinkLines example.) So instead, we have an extra constructor to indicate that no input is required. Likewise, SinkLift is provided in order to implement an efficient MonadTrans instance.
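
To see how these constructors interact, here is a hedged sketch of the plumbing behind the Monad instance, written as standalone functions against the types above. It is a simplification of whatever the real instance does, but it shows why SinkNoData makes return cheap, and how the leftover value is threaded from the first sink to the second:

    import Data.Conduit

    -- Sketch only: return demands no input at all.
    sinkReturn :: output -> Sink input m output
    sinkReturn = SinkNoData

    sinkBind :: Resource m
             => Sink input m a
             -> (a -> Sink input m b)
             -> Sink input m b
    sinkBind (SinkNoData a) f = f a -- return x >>= f: nothing to consume
    sinkBind (SinkLift msink) f = SinkLift $ do
        sink <- msink
        return $ sinkBind sink f
    sinkBind (SinkData push0 close0) f =
        SinkData (push push0) (close close0)
      where
        push pushI input = do
            res <- pushI input
            case res of
                Processing pushI' closeI' ->
                    return $ Processing (push pushI') (close closeI')
                -- The first sink finished: start the second, feeding it
                -- the first sink's leftover input (if any).
                Done leftover a -> start (f a) leftover
        close closeI = closeI >>= closeSink . f
        start (SinkNoData b) leftover = return $ Done leftover b
        start (SinkLift msink) leftover = msink >>= \s -> start s leftover
        start (SinkData pushN _) (Just input) = pushN input
        start (SinkData pushN closeN) Nothing = return $ Processing pushN closeN
        closeSink (SinkNoData b) = return b
        closeSink (SinkData _ closeN) = closeN
        closeSink (SinkLift msink) = msink >>= closeSink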

Sinks: no helpers

Let’s try to implement some sinks on the “bare metal”, without any helper functions.

    import Data.Conduit
    import System.IO
    import Control.Monad.Trans.Resource
    import Control.Monad.IO.Class (liftIO)

    -- Consume all input and discard it.
    sinkNull :: Resource m => Sink a m ()
    sinkNull =
        SinkData push close
      where
        push _ignored = return $ Processing push close
        close = return ()

    -- Let's stream characters to a file. Here we do need some kind of
    -- initialization. We do this by initializing in a push function,
    -- and then returning a different push function for subsequent
    -- calls. By using withIO, we know that the handle will be closed even
    -- if there's an exception.
    sinkFile :: ResourceIO m => FilePath -> Sink Char m ()
    sinkFile fp =
        SinkData pushInit closeInit
      where
        pushInit char = do
            (releaseKey, handle) <- withIO (openFile fp WriteMode) hClose
            push releaseKey handle char
        closeInit =
            -- Never opened a file, so nothing to do here
            return ()
        push releaseKey handle char = do
            liftIO $ hPutChar handle char
            return $ Processing (push releaseKey handle) (close releaseKey handle)
        close releaseKey _handle =
            -- Close the file handle as soon as possible.
            release releaseKey

    -- And we'll count how many values were in the stream.
    count :: Resource m => Sink a m Int
    count =
        SinkData (push 0) (close 0)
      where
        push count _ignored =
            return $ Processing (push count') (close count')
          where
            count' = count + 1
        close count = return count

Nothing is particularly complicated to implement. You should notice a common pattern here: declaring your push and close functions in a where clause, and then using them twice: once for the initial SinkData, and once for the Processing constructor. This can become a bit tedious; that’s why we have helper functions.

Sinks: with helpers

Let’s rewrite sinkFile and count to take advantage of the helper functions sinkIO and sinkState, respectively.

    import Data.Conduit
    import System.IO
    import Control.Monad.IO.Class (liftIO)

    -- We never have to touch the release key directly; sinkIO automatically
    -- releases our resource as soon as we return IODone from our push
    -- function, or when sinkClose is called.
    sinkFile :: ResourceIO m => FilePath -> Sink Char m ()
    sinkFile fp = sinkIO
        (openFile fp WriteMode)
        hClose
        -- push: notice that we are given the handle and the input
        (\handle char -> do
            liftIO $ hPutChar handle char
            return IOProcessing)
        -- close: we're also given the handle, but we don't use it
        (\_handle -> return ())

    -- And we'll count how many values were in the stream.
    count :: Resource m => Sink a m Int
    count = sinkState
        0
        -- The push function gets both the current state and the next input...
        (\state _ignored ->
            -- ... and it returns the new state
            return $ StateProcessing $ state + 1)
        -- The close function gets the final state and returns the output.
        (\state -> return state)

Nothing dramatic, just slightly shorter, less error-prone code. Using these two helper functions is highly recommended, as it ensures proper resource management and state updating.

List functions

As easy as it is to write your own sinks, you’ll likely want to take advantage of the built-in sinks available in the Data.Conduit.List module. These provide analogues to common list functions, like folding. (The module also has some Conduits, like map.)

If you’re looking for some way to practice with conduits, reimplementing the functions in the List module (both with and without the helper functions) would be a good start.
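
For instance, here is one hedged way a drop-style sink (in the spirit of CL.drop) could be written with sinkState; the library’s own version may differ:

    import Prelude hiding (drop)
    import Data.Conduit

    -- Consume and discard the next n values from the stream.
    drop :: Resource m => Int -> Sink a m ()
    drop count = sinkState
        count -- state: how many values are left to drop
        push
        (\_ -> return ()) -- close: nothing to report
      where
        -- Nothing (left) to drop: hand the input back as leftover and stop.
        push n input | n <= 0 = return $ StateDone (Just input) ()
        -- Drop this value; finish once the counter runs out.
        push n _input
            | n == 1 = return $ StateDone Nothing ()
            | otherwise = return $ StateProcessing (n - 1)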

Let’s look at some simple things we can make out of the built-in sinks.

    import Data.Conduit
    import qualified Data.Conduit.List as CL
    import Control.Monad.IO.Class (liftIO)

    -- A sum function.
    sum' :: Resource m => Sink Int m Int
    sum' = CL.fold (+) 0

    -- Print every input value to standard output.
    printer :: (Show a, ResourceIO m) => Sink a m ()
    printer = CL.mapM_ (liftIO . print)

    -- Sum up all the values in a stream after the first five.
    sumSkipFive :: Resource m => Sink Int m Int
    sumSkipFive = do
        CL.drop 5
        CL.fold (+) 0

    -- Print each input number and sum the total.
    printSum :: ResourceIO m => Sink Int m Int
    printSum = do
        total <- CL.foldM go 0
        liftIO $ putStrLn $ "Sum: " ++ show total
        return total
      where
        go accum int = do
            liftIO $ putStrLn $ "New input: " ++ show int
            return $ accum + int

Connecting

At the end of the day, we’re actually going to want to use our sinks. While we could manually call sinkPush and sinkClose, it’s tedious. For example:

    main :: IO ()
    main = runResourceT $ do
        res <-
            case printSum of
                SinkData push close -> loop [1..10] push close
                SinkNoData res -> return res
                -- (we ignore the SinkLift case here for brevity)
        liftIO $ putStrLn $ "Got a result: " ++ show res
      where
        loop [] _push close = close
        loop (x:xs) push close = do
            mres <- push x
            case mres of
                Done _leftover res -> return res
                Processing push' close' -> loop xs push' close'

Instead, the recommended approach is to connect your sink to a source. Not only is this simpler, it’s less error prone, and means you have a lot of flexibility in where your data is coming from. To rewrite the example above:

    main :: IO ()
    main = runResourceT $ do
        res <- CL.sourceList [1..10] $$ printSum
        liftIO $ putStrLn $ "Got a result: " ++ show res

Connecting takes care of testing for the sink constructor (SinkData versus SinkNoData versus SinkLift), pulling from the source, and pushing to/closing the sink.

However, there is one thing worth pointing out from the long-winded example: in the Done branch of loop, we ignore the leftover value. This brings up the issue of data loss, an important topic that has had a lot of thought put into it. Unfortunately, we can’t fully cover it yet, as we haven’t discussed the main culprit in the drama: Conduits (the type, not the package).

But as a quick note here, the leftover value from the Done constructor is not always ignored. The Monad instance, for example, uses it to pass data from one sink to the next in a binding. And in fact, the real connect operator doesn’t always throw away the leftovers. When we cover resumable sources later, we’ll see that the leftover value is put back on the buffer, so that later sinks reusing the same source can pull it.
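
For example, here is a hedged sketch of a peek sink built directly on the constructors: it reports the next value without consuming it, by handing the value straight back as leftover. In a monadic binding, the next sink will then receive that same value:

    import Data.Conduit

    -- Look at the next value in the stream without consuming it.
    peek :: Resource m => Sink a m (Maybe a)
    peek = SinkData
        { sinkPush = \input -> return $ Done (Just input) (Just input)
        , sinkClose = return Nothing
        }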

Conduit

This section covers the final major datatype in our package, conduits. While sources produce a stream of data and sinks consume a stream, conduits transform a stream.

Types

As we did previously, let’s start off by looking at the types involved.

    data ConduitResult input m output =
        Producing (Conduit input m output) [output]
      | Finished (Maybe input) [output]

    data Conduit input m output = Conduit
        { conduitPush :: input -> ResourceT m (ConduitResult input m output)
        , conduitClose :: ResourceT m [output]
        }

This should look very similar to what we’ve seen with sinks. A conduit can be pushed to, in which case it returns a result. A result either indicates that it is still producing data, or that it is finished. When a conduit is closed, it returns some more output.

But let’s examine the idiosyncrasies a bit. Like sinks, we can only push one piece of input at a time, and leftover data may be 0 or 1 pieces. However, there are a few changes:

  • When producing (the equivalent of processing for a sink), we can return output. This is because a conduit produces a new stream of output instead of a single output value at the end of processing.

  • A sink always returns a single output value, while a conduit returns 0 or more outputs (a list). To understand why, consider conduits such as concatMap (produces multiple outputs for one input) and filter (returns 0 or 1 outputs for each input; see the sketch after this list).

  • We have no special constructor like SinkNoData. That’s because we provide no Monad instance for conduits. We’ll see later how you can still use a familiar Monadic approach to creating conduits.

Overall conduits should seem very similar to what we’ve covered so far.
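
As a concrete illustration of that last point, here is a hedged sketch of a filter conduit of our own, showing the zero-or-one outputs per input:

    import Prelude hiding (filter)
    import Data.Conduit

    -- Keep only the values satisfying the predicate: each input yields
    -- either zero outputs or one output.
    filter :: Monad m => (a -> Bool) -> Conduit a m a
    filter p = Conduit
        { conduitPush = \input -> return $
            Producing (filter p) (if p input then [input] else [])
        , conduitClose = return []
        }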

Simple conduits

We’ll start off by defining some simple conduits that don’t have any state.

    import Prelude hiding (map, concatMap)
    import Data.Conduit

    -- A simple conduit that just passes on the data as-is.
    passThrough :: Monad m => Conduit input m input
    passThrough = Conduit
        { conduitPush = \input -> return $ Producing passThrough [input]
        , conduitClose = return []
        }

    -- map values in a stream
    map :: Monad m => (input -> output) -> Conduit input m output
    map f = Conduit
        { conduitPush = \input -> return $ Producing (map f) [f input]
        , conduitClose = return []
        }

    -- map and concatenate
    concatMap :: Monad m => (input -> [output]) -> Conduit input m output
    concatMap f = Conduit
        { conduitPush = \input -> return $ Producing (concatMap f) $ f input
        , conduitClose = return []
        }

Stateful conduits

Of course, not all conduits can be declared without state. Doing so on the bare metal is not too difficult.

    import Prelude hiding (reverse)
    import qualified Data.List
    import Data.Conduit
    import Control.Monad.Trans.Resource

    -- Reverse the elements in the stream. Note that this has the same downside
    -- as the standard reverse function: you have to read the entire stream
    -- into memory before producing any output.
    reverse :: Resource m => Conduit input m input
    reverse =
        mkConduit []
      where
        mkConduit state = Conduit (push state) (close state)
        push state input = return $ Producing (mkConduit $ input : state) []
        close state = return state

    -- Same thing with sort: it will pull everything into memory
    sort :: (Ord input, Resource m) => Conduit input m input
    sort =
        mkConduit []
      where
        mkConduit state = Conduit (push state) (close state)
        push state input = return $ Producing (mkConduit $ input : state) []
        close state = return $ Data.List.sort state

But we can do better. Just like sourceState and sinkState, we have conduitState to simplify things.

    import Prelude hiding (reverse)
    import qualified Data.List
    import Data.Conduit

    -- Reverse the elements in the stream. Note that this has the same downside
    -- as the standard reverse function: you have to read the entire stream
    -- into memory before producing any output.
    reverse :: Resource m => Conduit input m input
    reverse =
        conduitState [] push close
      where
        push state input = return $ StateProducing (input : state) []
        close state = return state

    -- Same thing with sort: it will pull everything into memory
    sort :: (Ord input, Resource m) => Conduit input m input
    sort =
        conduitState [] push close
      where
        push state input = return $ StateProducing (input : state) []
        close state = return $ Data.List.sort state

Using conduits

The way Conduits interact with the rest of the package is via fusing. A conduit can be fused into a source, producing a new source, fused into a sink to produce a new sink, or fused with another conduit to produce a new conduit. It’s best to just look at the fusion operators.

    -- Left fusion: source + conduit = source
    ($=) :: (Resource m, IsSource src) => src m a -> Conduit a m b -> Source m b

    -- Right fusion: conduit + sink = sink
    (=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c

    -- Middle fusion: conduit + conduit = conduit
    (=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c

Using these operators is straightforward.

    useConduits = do
        runResourceT
            $ CL.sourceList [1..10]
            $= reverse
            $= CL.map show
            $$ CL.consume

        -- equivalent to
        runResourceT
            $ CL.sourceList [1..10]
            $$ reverse
            =$ CL.map show
            =$ CL.consume

        -- and equivalent to
        runResourceT
            $ CL.sourceList [1..10]
            $$ (reverse =$= CL.map show)
            =$ CL.consume

There is in fact one last way of expressing the same idea. I’ll leave it as an exercise to the reader to discover it.

It may seem like all these different approaches are redundant. While occasionally you can in fact choose whichever approach you feel like using, in many cases you will need a specific approach. For example:

  • If you have a stream of numbers, and you want to apply a conduit (e.g., map show) to only the portion of the stream that will be passed to a specific sink, you’ll want to use the right fusion operator.

  • If you’re reading a file, and want to parse the entire file as textual data, you’ll want to use left fusion to convert the entire stream.

  • If you want to create reusable conduits that combine together individual, smaller conduits, you’ll use middle fusion.

Data loss

Let’s forget about conduits for a moment. Instead, suppose we want to write a program, using plain old lists, that will take a list of numbers, apply some kind of transformation to them, take the first five transformed values and do something with them, and then do something else with the remaining non-transformed values. For example, we want something like:

    main = do
        let list = [1..10]
            transformed = map show list
            (begin, end) = splitAt 5 transformed
            untransformed = map read end
        mapM_ putStrLn begin
        print $ sum untransformed

But clearly this isn’t a good general solution, since we don’t want to have to transform and then untransform every element in the list. For one thing, we may not always have an inverse function. Another issue is efficiency. In this case, we can write something more efficient:

    main = do
        let list = [1..10]
            (begin, end) = splitAt 5 list
            transformed = map show begin
        mapM_ putStrLn transformed
        print $ sum end

Note the change: we perform our split before transforming any elements. This works because, with map, we have a 1-to-1 correspondence between the input and output elements. So splitting at 5 before or after mapping show is the same thing. But what happens if we replace map show with something more devious?

    deviousTransform =
        concatMap go
      where
        go 1 = [show 1]
        go 2 = [show 2, "two"]
        go 3 = replicate 5 "three"
        go x = [show x]

We no longer have the 1-to-1 correspondence. As a result, we can’t use the second method. But it’s even worse: we can’t use the first method either, since there’s no inverse of our deviousTransform.

There’s only one solution to the problem that I’m aware of: transform elements one at a time. The final program looks like this:

    deviousTransform 1 = [show 1]
    deviousTransform 2 = [show 2, "two"]
    deviousTransform 3 = replicate 5 "three"
    deviousTransform x = [show x]

    transform5 :: [Int] -> ([String], [Int])
    transform5 list =
        go [] list
      where
        go output (x:xs)
            | newLen >= 5 = (take 5 output', xs)
            | otherwise = go output' xs
          where
            output' = output ++ deviousTransform x
            newLen = length output'
        -- Degenerate case: not enough input to make 5 outputs
        go output [] = (output, [])

    main = do
        let list = [1..10]
            (begin, end) = transform5 list
        mapM_ putStrLn begin
        print $ sum end

The final output of this program is

    1
    2
    two
    three
    three
    49

What’s important to note is that the number 3 is converted into five copies of the word “three”, yet only two of them show up in the output. The rest are discarded in the take 5 call.

This whole exercise is just to demonstrate the issue of data loss in conduits. By forcing conduits to accept only one input at a time, we avoid the issue of transforming too many elements at once. That doesn’t mean we don’t lose any data: if a conduit produces too much output for the receiving sink to handle, some of it may be lost.

To put all this another way: conduits avoid chunking to get away from data loss. This is not an issue unique to conduits. If you look in the implementation of concatMapM for enumerator, you’ll see that it forces elements to be handled one at a time. In conduits, we opted to force the issue at the type level.

SequencedSink

Suppose we want to be able to combine existing conduits and sinks to produce a new, more powerful conduit. For example, we want to write a conduit that takes a stream of numbers and sums up every five. In other words, for the input [1..50], it should result in the sequence [15,40,65,90,115,140,165,190,215,240]. We can definitely do this with the low-level conduit interface.

    sum5Raw :: Resource m => Conduit Int m Int
    sum5Raw =
        conduitState (0, 0) push close
      where
        push (total, count) input
            | newCount == 5 = return $ StateProducing (0, 0) [newTotal]
            | otherwise = return $ StateProducing (newTotal, newCount) []
          where
            newTotal = total + input
            newCount = count + 1
        close (total, count)
            | count == 0 = return []
            | otherwise = return [total]

But this is frustrating, since we already have all the tools we need to do this at a high level! There’s the fold sink for adding up the numbers, and the isolate conduit which will only allow up to a certain number of elements to be passed to a sink. Can’t we combine these somehow?

The answer is a SequencedSink. The idea is to create a normal Sink, except it returns a special output called a SequencedSinkResponse. This value can emit new output, stop processing data, or transfer control to a new conduit. (See the Haddocks for more information.) Then we can turn this into a Conduit using the sequenceSink function. This function also takes some state value that gets passed through to the sink.

So we can rewrite sum5Raw in a much more high-level manner.

    sum5 :: Resource m => Conduit Int m Int
    sum5 = sequenceSink () $ \() -> do
        nextSum <- CL.isolate 5 =$ CL.fold (+) 0
        return $ Emit () [nextSum]

All of the () values in there are simply the unused state variable being passed around; they can be ignored. Otherwise, we’re doing exactly what we want: we fuse isolate to fold to get the sum of the next five elements from the stream, emit that value, and start all over again.

Let’s say we want to modify this slightly. We want to get the first 8 sums, and then pass through the remaining values, multiplied by 2. We can keep track of how many values we’ve returned in our state, and then use the StartConduit constructor to pass control to the multiply-by-2 conduit next.

    sum5Pass :: Resource m => Conduit Int m Int
    sum5Pass = sequenceSink 0 $ \count ->
        if count == 8
            then return $ StartConduit $ CL.map (* 2)
            else do
                nextSum <- CL.isolate 5 =$ CL.fold (+) 0
                return $ Emit (count + 1) [nextSum]

These are obviously very contrived examples, but I hope it makes clear the power and simplicity available from this approach.

Buffering

Buffering is one of the unique features of conduits. With buffering, conduits no longer need to control the flow of your application. In some cases, this can lead to simpler code.

Inversion of Control

Buffering was actually one of the main motivations in the creation of the conduit package. To see its importance, we need to consider the approach we’ve seen so far, which we’ll call inversion of control, or IoC.

Inversion of control can mean different things in different circles. If you object to its usage here, go ahead and replace it with some other phrase like “warm, fuzzy thing.” I won’t be offended.

Suppose you want to count how many newline characters there are in a file. In the standard imperative approach, you would do something like:

  1. Open the file

  2. Pull some data into a buffer

  3. Loop over the values in the buffer, incrementing a counter on each newline character

  4. Return to 2

  5. Close the file

Notice that your code is explicitly calling out to other code and that code is returning control back to your code. You have retained full control of the flow of execution of your program. The conduit approach we’ve seen so far does not work this way. Instead, you would:

  1. Write a sink that counts newlines and adds the result to an accumulator.

  2. Connect the sink to a source

There’s no doubt in my mind that this is an easier approach. You don’t have to worry about opening and closing files or pulling data from the file. Instead, the data you need to process is simply presented to you. This is the advantage of IoC: you can focus specifically on your piece of the code.

We use this IoC approach all over Haskell: for example, instead of readMVar and putMVar, you can use withMVar. Don’t bother with openFile and closeFile, just use withFile and pass in a function that uses the Handle. Even C has a version of this: why malloc and free when you could just alloca?

Actually, that last one is a huge red herring. Of course you can’t just use alloca for everything. alloca only allocates memory locally on the stack, not dynamically on the heap. There’s no way to return your allocated memory outside the current function.

But actually, the same restriction applies to the whole family of with functions: you can never return an allocated resource outside of the “block.” Usually this works out just fine, but we need to recognize that it changes how we structure our programs. Often, with simple examples, this is a minor change. However, in larger settings it can become very difficult to manage, bordering on impossible at times.

A web server

Let’s say we’re going to write a web server. We’re going to use the following low-level operations:

    data Socket
    recv :: Socket -> Int -> IO ByteString -- returns empty when the socket is closed
    sendAll :: Socket -> ByteString -> IO ()

We’re up to the part where we need to implement the function handleConn that handles an individual connection. It will look something like this:

    data Request -- request headers, HTTP version, etc.
    data Response -- status code, response headers, response body
    type Application = Request -> IO Response

    handleConn :: Application -> Socket -> IO ()

What does our handleConn need to do? In broad strokes:

  1. Parse the request line

  2. Parse the request headers

  3. Construct the Request value

  4. Pass Request to the Application and get back a Response

  5. Send the Response over the Socket

We start off by writing steps 1 and 2 manually, without using conduits. We’ll do this very simply and just assume three space-separated strings. We end up with something that looks like:

    data RequestLine = RequestLine ByteString ByteString ByteString

    parseRequestLine :: Socket -> IO RequestLine
    parseRequestLine socket = do
        bs <- recv socket 4096
        let (method:path:version:ignored) = S8.words bs
        return $ RequestLine method path version

There are two issues here: it doesn’t handle the case where there are fewer than three words in the chunk of data, and it throws away any extra data. We could definitely solve both of these issues manually, but it’s very tedious. It’s much easier to implement this in terms of conduits.

    import Data.ByteString (ByteString)
    import qualified Data.ByteString as S
    import Data.Conduit
    import qualified Data.Conduit.Binary as CB
    import qualified Data.Conduit.List as CL

    data RequestLine = RequestLine ByteString ByteString ByteString

    parseRequestLine :: Sink ByteString IO RequestLine
    parseRequestLine = do
        let space = toEnum $ fromEnum ' '
        let getWord = do
                CB.dropWhile (== space)
                bss <- CB.takeWhile (/= space) =$ CL.consume
                return $ S.concat bss
        method <- getWord
        path <- getWord
        version <- getWord
        return $ RequestLine method path version

This means that our code will automatically be supplied with more data as it comes in, and any extra data will automatically be buffered in the Source, ready for the next time it’s used. Now we can easily structure our program together, demonstrating the power of the conduits approach:

    import Data.ByteString (ByteString)
    import Data.Conduit
    import Data.Conduit.Network (sourceSocket)
    import Control.Monad.IO.Class (liftIO)
    import Network.Socket (Socket)

    data RequestLine = RequestLine ByteString ByteString ByteString
    type Headers = [(ByteString, ByteString)]
    data Request = Request RequestLine Headers
    data Response = Response
    type Application = Request -> IO Response

    parseRequestHeaders :: Sink ByteString IO Headers
    parseRequestHeaders = undefined

    parseRequestLine :: Sink ByteString IO RequestLine
    parseRequestLine = undefined

    sendResponse :: Socket -> Response -> IO ()
    sendResponse = undefined

    handleConn :: Application -> Socket -> IO ()
    handleConn app socket = do
        req <- runResourceT $ sourceSocket socket $$ do
            requestLine <- parseRequestLine
            headers <- parseRequestHeaders
            return $ Request requestLine headers
        res <- liftIO $ app req
        liftIO $ sendResponse socket res

Whither the request body?

This is all great, until we realize we can’t read the request body. The Application is simply given the Request, and lives in the IO monad. It has no access whatsoever to the incoming stream of data.

There’s an easy fix for this actually: have the Application live in the Sink monad. This is the very approach we took with enumerator-based WAI 0.4 (a sketch of what that type could look like follows the list below). However, there are two problems:

  • People find it confusing. What people expect is that the Request value would have a requestBody value of type Source.

  • This makes certain kinds of usage incredibly difficult. For example, trying to write an HTTP proxy combining WAI and http-enumerator proved to be almost impossible.
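To make that concrete, here is a rough sketch of a Sink-based application type, building on the Request and Response definitions from the example above. This is our own illustration, not the actual WAI 0.4 definition; SinkApplication and bodyToResponse are hypothetical names.

  import Data.ByteString (ByteString)
  import Data.Conduit
  import qualified Data.Conduit.List as CL

  -- Hypothetical: the application itself is a Sink, so it can consume the
  -- request body directly from the incoming byte stream.
  type SinkApplication = Request -> Sink ByteString IO Response

  -- Drain whatever remains of the stream (the request body), then build a
  -- response from it.
  echoBody :: SinkApplication
  echoBody _req = do
      body <- CL.consume  -- read the rest of the incoming bytes
      return $ bodyToResponse body

  bodyToResponse :: [ByteString] -> Response
  bodyToResponse = undefined  -- hypothetical helper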

This is the downside of inversion of control. Our code wants to be in control. It wants to be given something to pull from, something to push to, and run with it. We need some solution to the problem.

If you think that the situation I described with the proxy isn’t so bad, it’s because I’ve gone easy on the details. We also need to take into account streaming the response body, and the streaming needs to happen on both the client and server side.

The simplest solution would be to just create a new Source and pass that to the Application. Unfortunately, this will cause problems with our buffering. When we connected our source to the parseRequestLine and parseRequestHeaders sinks, those sinks called recv, issuing another call whenever the data received so far was not enough to cover all of the headers. Once they had enough data, they stopped. However, odds are that they didn’t stop exactly at the end of the headers: they likely consumed a bit of the request body as well.

If we just create a new source and pass that to the request, it will be missing the beginning of the request body. We need some way to pass that buffered data along.

BufferedSource

And so we finally get to introduce the last data type in conduits: BufferedSource. This is an abstract data type, but all it really does is keep a mutable reference to a buffer and an underlying Source. In order to create one of these, you use the bufferSource function.

  bufferSource :: Resource m => Source m a -> ResourceT m (BufferedSource m a)

This one little change is what allows us to easily solve our web server dilemma. Instead of connecting a Source to our parsing Sinks, we use a BufferedSource. At the end of each connect operation, any leftover data is put back on the buffer. For our web server case, we can now create a BufferedSource, use that to read the request line and headers, and then pass that same BufferedSource to the application for reading the request body.

Typeclass

We want to be able to connect a buffered source to a sink, just like we would a regular source. We would also like to be able to fuse it to a conduit. In order to make this convenient, conduit has a typeclass, IsSource. There are instances provided for both Source and BufferedSource. Both the connect ($$) and left-fuse ($=) operators use this typeclass.
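The sketch below shows the general shape such a typeclass could take; it is not the exact definition from conduit 0.2, and the method names connect and fuseLeft are assumptions for illustration only.

  -- A sketch (not the library's exact definition) of how a single typeclass
  -- can let both Source and BufferedSource appear to the left of $$ and $=.
  class IsSource src where
      connect  :: Resource m => src m a -> Sink a m b -> ResourceT m b
      fuseLeft :: Resource m => src m a -> Conduit a m b -> Source m b

  ($$) :: (IsSource src, Resource m) => src m a -> Sink a m b -> ResourceT m b
  ($$) = connect

  ($=) :: (IsSource src, Resource m) => src m a -> Conduit a m b -> Source m b
  ($=) = fuseLeft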

There’s one “gotcha” in the BufferedSource instance of this typeclass, so let’s explain it. Suppose we want to write a file copy function, without any buffering. This is a fairly standard usage of conduits:

  sourceFile input $$ sinkFile output

When this line is run, both the input and output files are opened, the data is copied, and then both files are closed. Let’s change this example slightly to use buffering:

  1. bsrc <- bufferSource $ sourceFile input
  2. bsrc $$ isolate 50 =$ sinkFile output1
  3. bsrc $$ sinkFile output2

When is the input file opened and closed? The opening occurs on the first line, when buffering the source. And if we follow the normal rules from sources, the file should be closed after the second line. However, if we did that, we couldn’t reuse bsrc for line 3!

So instead, $$ does not close the file. As a result, you can pass a buffered source to as many actions as you want, without concerns that the file handle has been closed out from under you.

If you remember from earlier, the invariant of a source is that it cannot be pulled from after it returns a Closed response. In order to allow you to work more easily with a BufferedSource, this invariant is relaxed. It is the responsibility of the BufferedSource implementation to ensure that after the underlying Source is closed, it is never used again.

This presents one caveat: when you’re finished with a buffered source, you should manually call bsourceClose on it. However, as usual, this is merely an optimization, as the source will automatically be closed when the enclosing runResourceT block completes.
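Continuing the buffered copy example above, eager cleanup would look something like this (a sketch, reusing the same placeholder names input, output1, and output2):

  bsrc <- bufferSource $ sourceFile input
  bsrc $$ isolate 50 =$ sinkFile output1
  bsrc $$ sinkFile output2
  bsourceClose bsrc  -- eager cleanup; runResourceT would do this anyway on exit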

Recapping the web server

So what exactly does our web server look like now?

  import Data.ByteString (ByteString)
  import Data.Conduit
  import Data.Conduit.Network (sourceSocket)
  import Control.Monad.IO.Class (liftIO)
  import Network.Socket (Socket)

  data RequestLine = RequestLine ByteString ByteString ByteString
  type Headers = [(ByteString, ByteString)]
  data Request = Request RequestLine Headers (BufferedSource IO ByteString)
  data Response = Response
  type Application = Request -> ResourceT IO Response

  parseRequestHeaders :: Sink ByteString IO Headers
  parseRequestHeaders = undefined

  parseRequestLine :: Sink ByteString IO RequestLine
  parseRequestLine = undefined

  sendResponse :: Socket -> Response -> IO ()
  sendResponse = undefined

  handleConn :: Application -> Socket -> IO ()
  handleConn app socket = runResourceT $ do
      bsrc <- bufferSource $ sourceSocket socket
      requestLine <- bsrc $$ parseRequestLine
      headers <- bsrc $$ parseRequestHeaders
      let req = Request requestLine headers bsrc
      res <- app req
      liftIO $ sendResponse socket res

We’ve made a few minor changes. Firstly, the Application now lives in the ResourceT IO monad. This isn’t strictly necessary, but it’s very convenient: the application can now register cleanup actions that will only take place after the response has been fully sent to the client.

But the major changes are in the handleConn function. We now start off by buffering our source. This buffered source is then used twice in our function, and then passed off to the application.
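To see the payoff, here is a sketch of an Application that reads its own request body from the BufferedSource it was handed. The name lengthApp and the choice of CL.consume are our own illustration, not part of the server above.

  import qualified Data.ByteString as S
  import qualified Data.Conduit.List as CL

  -- Because the source is buffered, connecting to it here picks up exactly
  -- where header parsing left off: no request-body bytes are lost.
  lengthApp :: Application
  lengthApp (Request _ _ body) = do
      chunks <- body $$ CL.consume
      liftIO $ putStrLn $ "Request body bytes: " ++ show (sum (map S.length chunks))
      return Response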