- Conduit
- Conduits in Five Minutes
- Structure of this Chapter
- The Resource monad transformer
- Goals
- How it Works
- Some Type Magic
- Definition of ResourceT
- Other Typeclasses
- Forking
- Convenience Exports
- Source
- State
- sourceState and sourceIO
- Sinks
- Types
- Sinks: no helpers
- Sinks: with helpers
- List functions
- Connecting
- Conduit
- Types
- Simple conduits
- Stateful conduits
- Using conduits
- Data loss
- SequencedSink
- Buffering
- Inversion of Control
- A web server
- Whither the request body?
- BufferedSource
- Typeclass
- Recapping the web server
Conduit
Conduits are a solution to the streaming data problem. Often, laziness allows us to process large amounts of data without pulling all values into memory. However, doing so in the presence of I/O requires us to use lazy I/O. The main downside to lazy I/O is non-determinism: we have no guarantees of when our resource finalizers will be run. For a small application, this may be acceptable, but for a high-load server, we could quickly run out of scarce resources, such as file handles.
Conduits allow us to process large streams of data while still retaining deterministic resource handling. They provide a unified interface for data streams, whether they come from files, sockets, or memory. And when combined with ResourceT, we can safely allocate resources, knowing that they will always be reclaimed, even in the presence of exceptions.
This appendix covers version 0.2 of the conduit package.
Conduits in Five Minutes
While a good understanding of the lower-level mechanics of conduits is advisable, you can get very far without it. Let’s start off with some high-level examples. Don’t worry if some of the details seem a bit magical right now. We’ll cover everything in the course of this appendix. Let’s start with the terminology, and then some sample code.
Source
A producer of data. The data could be in a file, coming from a socket, or in memory as a list. To access this data, we pull from the source.
Sink
A consumer of data. Basic examples would be a sum function (adding up a stream of numbers fed in), a file sink (which writes all incoming bytes to a file), or a socket. We push data into a sink. When the sink finishes processing (we’ll explain that later), it returns some value.
Conduit
A transformer of data. The simplest example is a map function, though there are many others. Like a sink, we push data into a conduit. But instead of returning a single value at the end, a conduit can return multiple outputs every time it is pushed to.
Fuse
(Thanks to David Mazieres for the term.) A conduit can be fused with a source to produce a new, modified source (the $= operator). For example, you could have a source that reads bytes from a file, and a conduit that decodes bytes into text. If you fuse them together, you would now have a source that reads text from a file. Likewise, a conduit and a sink can fuse into a new sink (=$), and two conduits can fuse into a new conduit (=$=).
Connect
You can connect a source to a sink using the $$ operator. Doing so will pull data from the source and push it to the sink, until either the source or sink signals that they are “done.”
Let’s see some examples of conduit code.
{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit -- the core library
import qualified Data.Conduit.List as CL -- some list-like functions
import qualified Data.Conduit.Binary as CB -- bytes
import qualified Data.Conduit.Text as CT
import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified Data.Text as T
import Control.Monad.ST (runST)
-- Let's start with the basics: connecting a source to a sink. We'll use the
-- built-in file functions to implement efficient, constant-memory,
-- resource-friendly file copying.
--
-- Two things to note: we use $$ to connect our source to our sink, and then
-- use runResourceT.
copyFile :: FilePath -> FilePath -> IO ()
copyFile src dest = runResourceT $ CB.sourceFile src $$ CB.sinkFile dest
-- The Data.Conduit.List module provides a number of helper functions for
-- creating sources, sinks, and conduits. Let's look at a typical fold: summing
-- numbers.
sumSink :: Resource m => Sink Int m Int
sumSink = CL.fold (+) 0
-- If we want to go a little more low-level, we can code our sink with the
-- sinkState function. This function takes three parameters: an initial state,
-- a push function (receive some more data), and a close function.
sumSink2 :: Resource m => Sink Int m Int
sumSink2 = sinkState
    0 -- initial value
    -- update the state with the new input and
    -- indicate that we want more input
    (\accum i -> return $ StateProcessing (accum + i))
    (\accum -> return accum) -- return the current accum value on close
-- Another common helper function is sourceList. Let's see how we can combine
-- that function with our sumSink to reimplement the built-in sum function.
sum' :: [Int] -> Int
sum' input = runST $ runResourceT $ CL.sourceList input $$ sumSink
-- Since this is Haskell, let's write a source to generate all of the
-- Fibonacci numbers. We'll use sourceState. The state will contain the next
-- two numbers in the sequence. We also need to provide a pull function, which
-- will return the next number and update the state.
fibs :: Resource m => Source m Int
fibs = sourceState
    (0, 1) -- initial state
    (\(x, y) -> return $ StateOpen (y, x + y) x)
-- Suppose we want to get the sum of the first 10 Fibonacci numbers. We can use
-- the isolate conduit to make sure the sum sink only consumes 10 values.
sumTenFibs :: Int
sumTenFibs =
       runST -- runs fine in pure code
     $ runResourceT
     $ fibs
    $= CL.isolate 10 -- fuse the source and conduit into a source
    $$ sumSink
-- We can also fuse the conduit into the sink instead, we just swap a few
-- operators.
sumTenFibs2 :: Int
sumTenFibs2 =
       runST
     $ runResourceT
     $ fibs
    $$ CL.isolate 10
    =$ sumSink
-- Alright, let's make some conduits. Let's turn our numbers into text. Sounds
-- like a job for a map...
intToText :: Int -> Text -- just a helper function
intToText = T.pack . show
textify :: Resource m => Conduit Int m Text
textify = CL.map intToText
-- Like previously, we can use a conduitState helper function. But here, we
-- don't even need state, so we provide a dummy state value.
textify2 :: Resource m => Conduit Int m Text
textify2 = conduitState
    ()
    (\() input -> return $ StateProducing () [intToText input])
    (\() -> return [])
-- Let's make the unlines conduit, that puts a newline on the end of each piece
-- of input. We'll just use CL.map; feel free to write it with conduitState as
-- well for practice.
unlines' :: Resource m => Conduit Text m Text
unlines' = CL.map $ \t -> t `T.append` "\n"
-- And let's write a function that prints the first N fibs to a file. We'll
-- use UTF8 encoding.
writeFibs :: Int -> FilePath -> IO ()
writeFibs count dest =
       runResourceT
     $ fibs
    $= CL.isolate count
    $= textify
    $= unlines'
    $= CT.encode CT.utf8
    $$ CB.sinkFile dest
-- We used the $= operator to fuse the conduits into the sources, producing a
-- single source. We can also do the opposite: fuse the conduits into the
-- sink. We can even combine the two.
writeFibs2 :: Int -> FilePath -> IO ()
writeFibs2 count dest =
       runResourceT
     $ fibs
    $= CL.isolate count
    $= textify
    $$ unlines'
    =$ CT.encode CT.utf8
    =$ CB.sinkFile dest
-- Or we could fuse all those inner conduits into a single conduit...
someIntLines :: ResourceThrow m -- encoding can throw an exception
             => Int
             -> Conduit Int m ByteString
someIntLines count =
      CL.isolate count
  =$= textify
  =$= unlines'
  =$= CT.encode CT.utf8
-- and then use that conduit
writeFibs3 :: Int -> FilePath -> IO ()
writeFibs3 count dest =
       runResourceT
     $ fibs
    $= someIntLines count
    $$ CB.sinkFile dest
main :: IO ()
main = do
    putStrLn $ "First ten fibs: " ++ show sumTenFibs
    writeFibs 20 "fibs.txt"
    copyFile "fibs.txt" "fibs2.txt"
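As a sanity check on the terminology, here is a rough lazy-list analogue of sumTenFibs that uses no conduit code at all. It is only a sketch: the names fibsList and sumTenFibsList are made up for this illustration, the list plays the role of the source, take stands in for isolate, and sum stands in for the sink. Unlike conduits, it says nothing about resource handling.

```haskell
-- All Fibonacci numbers as a lazy list (plays the role of the source).
fibsList :: [Int]
fibsList = go 0 1
  where
    go x y = x : go y (x + y)

-- take stands in for CL.isolate, sum stands in for the summing sink.
sumTenFibsList :: Int
sumTenFibsList = sum (take 10 fibsList)
```

The first ten values are 0, 1, 1, 2, 3, 5, 8, 13, 21 and 34, so sumTenFibsList evaluates to 88.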
Structure of this Chapter
The remainder of this chapter covers five major topics in conduits:
- ResourceT, the underlying technique that allows us to have guaranteed resource deallocation.
- Sources, our data producers
- Sinks, our data consumers
- Conduits, our data transformers
- Buffering, which allows us to avoid Inversion of Control
The Resource monad transformer
The Resource transformer (ResourceT) plays a vital role in proper resource management in the conduit project. It is included within the conduit package itself. We’ll explain ResourceT as its own entity. While some of the design decisions are clearly biased towards conduits, ResourceT should remain a usable tool in its own right.
Goals
What’s wrong with the following code?
import System.IO
main = do
    output <- openFile "output.txt" WriteMode
    input <- openFile "input.txt" ReadMode
    hGetContents input >>= hPutStr output
    hClose input
    hClose output
If the file input.txt does not exist, then an exception will be thrown when trying to open it. As a result, hClose output will never be called, and we’ll have leaked a scarce resource (a file descriptor). In our tiny program, this isn’t a big deal, but clearly we can’t afford such waste in a long running, highly active server process.
Fortunately, solving the problem is easy:
import System.IO
main =
    withFile "output.txt" WriteMode $ \output ->
    withFile "input.txt" ReadMode $ \input ->
    hGetContents input >>= hPutStr output
withFile makes sure that the Handle is always closed, even in the presence of exceptions. It also handles asynchronous exceptions. Overall, it’s a great approach to use… when you can use it. While often withFile is easy to use, sometimes it can require restructuring our programs. And this restructuring can range from mildly tedious to wildly inefficient.
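The guarantee described here is the bracket pattern from Control.Exception. The following is a minimal sketch, using only base, that checks the cleanup action still runs when the body throws. The function name cleanupRunsOnException is made up for this illustration, and the “resource” is just a flag standing in for a real Handle.

```haskell
import Control.Exception (ErrorCall (..), bracket, throwIO, try)
import Data.IORef

-- Runs a body that always throws inside 'bracket' and reports whether the
-- cleanup action ran anyway.
cleanupRunsOnException :: IO Bool
cleanupRunsOnException = do
    closed <- newIORef False
    result <- try $ bracket
        (return ())                         -- "allocate"
        (\_ -> writeIORef closed True)      -- "free": record that cleanup ran
        (\_ -> throwIO (ErrorCall "boom"))  -- the body fails
    case (result :: Either ErrorCall ()) of
        Left _   -> readIORef closed        -- exception escaped; was cleanup run?
        Right () -> return False            -- not reachable: the body always throws
```

The limitation discussed next is not about safety but about shape: the resource must be acquired before the body starts and released when it ends.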
Let’s take enumerators for example. If you look in the documentation, there is an enumFile function (for reading contents from a file), but no iterFile (for writing contents to a file). That’s because the flow of control in an iteratee doesn’t allow proper allocation of the Handle. Instead, in order to write to a file, you need to allocate the Handle before entering the Iteratee, e.g.:
import System.IO
import Data.Enumerator
import Data.Enumerator.Binary
main =
    withFile "output.txt" WriteMode $ \output ->
    run_ $ enumFile "input.txt" $$ iterHandle output
This code works fine, but imagine that, instead of simply piping data directly to the file, there was a huge amount of computation that occurred before we need to use the output handle. We will have allocated a file descriptor long before we needed it, and thereby locked up a scarce resource in our application. Besides this, there are times when we can’t allocate the file beforehand, such as when we won’t know which file to open until we’ve read from the input file.
One of the stated goals of conduits is to solve this problem, and it does so via ResourceT. As a result, the above program can be written in conduit as:
{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit
import Data.Conduit.Binary
main = runResourceT $ sourceFile "input.txt" $$ sinkFile "output.txt"
How it Works
There are essentially three base functions on ResourceT, and then a bunch of conveniences thrown on top. The first function is:
register :: IO () -> ResourceT IO ReleaseKey
This function, and the others below, are actually more polymorphic than implied here, allowing other monads besides IO. In fact, almost any transformer on top of IO, as well as any ST stack, will work. We’ll cover the details of that later.
This function registers a piece of code that it asserts must be run. It gives back a ReleaseKey, which is used by the next function:
release :: ReleaseKey -> ResourceT IO ()
Calling release on a ReleaseKey immediately performs the action you previously registered. You may call release on the same ReleaseKey as many times as you like; the first time it is called, it unregisters the action. This means you can safely register an action like a memory free, and have no concerns that it will be called twice.
Eventually, we’ll want to exit our special ResourceT. To do so, we use:
runResourceT :: ResourceT IO a -> IO a
This seemingly innocuous function is where all the magic happens. It runs through all of the registered cleanup actions and performs them. It is fully exception safe, meaning the cleanups will be performed in the presence of both synchronous and asynchronous exceptions. And as mentioned before, calling release will unregister an action, so there is no concern of double-freeing.
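To make the register/release/run cycle concrete, here is a toy model built on a plain IORef. It is a sketch of the idea only, not the library’s implementation: the names ToyKey, ToyRegistry, toyRegister, toyRelease and toyRun are invented for this illustration, it works only in IO, and it uses finally without masking, so it does not have the asynchronous-exception safety that runResourceT is described as providing.

```haskell
import Control.Exception (finally)
import Data.IORef

newtype ToyKey = ToyKey Int

-- The next key to hand out, plus the actions that are still registered.
type ToyRegistry = IORef (Int, [(Int, IO ())])

-- Store an action and hand back the key that identifies it.
toyRegister :: ToyRegistry -> IO () -> IO ToyKey
toyRegister ref action = atomicModifyIORef ref $ \(next, actions) ->
    ((next + 1, (next, action) : actions), ToyKey next)

-- Unregister first, then run. A second call finds nothing and does nothing.
toyRelease :: ToyRegistry -> ToyKey -> IO ()
toyRelease ref (ToyKey key) = do
    found <- atomicModifyIORef ref $ \(next, actions) ->
        ((next, filter ((/= key) . fst) actions), lookup key actions)
    maybe (return ()) id found

-- Run the body, then perform whatever is still registered, even if the body
-- threw a synchronous exception.
toyRun :: (ToyRegistry -> IO a) -> IO a
toyRun body = do
    ref <- newIORef (0, [])
    body ref `finally` releaseAll ref
  where
    releaseAll ref = do
        remaining <- atomicModifyIORef ref $ \(next, actions) ->
            ((next, []), actions)
        mapM_ snd remaining
```

Because toyRelease removes the action from the registry before running it, an action released early is not run again when toyRun exits.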
Finally, as a convenience, we provide one more function for the common case of allocating a resource and registering a release action:
with :: IO a -- ^ allocate
     -> (a -> IO ()) -- ^ free resource
     -> ResourceT IO (ReleaseKey, a)
So, to rework our first buggy example to use ResourceT, we would write:
import System.IO
import Control.Monad.Trans.Resource
import Control.Monad.Trans.Class (lift)
main = runResourceT $ do
    (releaseO, output) <- with (openFile "output.txt" WriteMode) hClose
    (releaseI, input) <- with (openFile "input.txt" ReadMode) hClose
    lift $ hGetContents input >>= hPutStr output
    release releaseI
    release releaseO
Now there is no concern of any exceptions preventing the releasing of resources. We could skip the release calls if we want to, and in an example this small, it would not make any difference. But for larger applications, where we want processing to continue, this ensures that the Handles are freed as early as possible, keeping our scarce resource usage to a minimum.
Some Type Magic
As alluded to, there’s a bit more to ResourceT than simply running in IO. Let’s cover some of the things we need from this underlying Monad.
1. Mutable references to keep track of the registered release actions. You might think we could just use a StateT transformer, but then our state wouldn’t survive exceptions.
2. We only want to register actions in the base monad. For example, if we have a ResourceT (WriterT [Int] IO) stack, we only want to register IO actions. This makes it easy to lift our stacks around (i.e., add an extra transformer to the middle of an existing stack), and avoids confusing issues about the threading of other monadic side-effects.
3. Some way to guarantee an action is performed, even in the presence of exceptions. This boils down to needing a bracket-like function.
For the first point, we define a new typeclass to represent monads that have mutable references:
class Monad m => HasRef m where
    type Ref m :: * -> *
    newRef' :: a -> m (Ref m a)
    readRef' :: Ref m a -> m a
    writeRef' :: Ref m a -> a -> m ()
    modifyRef' :: Ref m a -> (a -> (a, b)) -> m b
    mask :: ((forall a. m a -> m a) -> m b) -> m b
    mask_ :: m a -> m a
    try :: m a -> m (Either SomeException a)
We have an associated type to signify what the reference type should be. (For fans of fundeps, you’ll see in the next section that this has to be an associated type.) Then we provide a number of basic reference operations. Finally, there are some functions to help with exceptions, which are needed to safely implement the functions described in the last section. The instance for IO is very straight-forward:
instance HasRef IO where
    type Ref IO = I.IORef
    newRef' = I.newIORef
    modifyRef' = I.atomicModifyIORef
    readRef' = I.readIORef
    writeRef' = I.writeIORef
    mask = E.mask
    mask_ = E.mask_
    try = E.try
However, we have a problem when it comes to implementing the ST instance: there is no way to deal with exceptions in the ST monad. As a result, mask, mask_ and try are given default implementations that do no exception checking. This gives rise to the first word of warning: operations in the ST monad are not exception safe. You should not be allocating scarce resources in ST when using ResourceT. You might be wondering why bother with ResourceT at all then for ST. The answer is that there is a lot you can do with conduits without allocating scarce resources, and ST is a great way to do this in a pure way. But more on this later.
Now onto point 2: we need some way to deal with this base monad concept. Again, we use an associated type (again explained in the next section). Our solution looks something like:
class (HasRef (Base m), Monad m) => Resource m where
    type Base m :: * -> *
    resourceLiftBase :: Base m a -> m a
But we forgot about point 3: some bracket-like function. So we need one more method in this typeclass:
resourceBracket_ :: Base m a -> Base m b -> m c -> m c
The reason the first two arguments to resourceBracket_ (allocation and cleanup) live in Base m instead of m is that, in ResourceT, all allocation and cleanup lives in the base monad.
So on top of our HasRef instance for IO, we now need a Resource instance as well. This is similarly straight-forward:
instance Resource IO where
    type Base IO = IO
    resourceLiftBase = id
    resourceBracket_ = E.bracket_
We have similar ST instances, with resourceBracket_ having no exception safety. The final step is dealing with monad transformers. We don’t need to provide a HasRef instance, but we do need a Resource instance. The tricky part is providing a valid implementation of resourceBracket_. For this, we use some functions from monad-control:
instance (MonadTransControl t, Resource m, Monad (t m))
        => Resource (t m) where
    type Base (t m) = Base m
    resourceLiftBase = lift . resourceLiftBase
    resourceBracket_ a b c =
        control' $ \run -> resourceBracket_ a b (run c)
      where
        control' f = liftWith f >>= restoreT . return
For any transformer, its base is the base of its inner monad. Similarly, we lift to the base by lifting to the inner monad and then lifting to the base from there. The tricky part is the implementation of resourceBracket_. I will not go into a detailed explanation, as I would simply make a fool of myself.
Definition of ResourceT
We now have enough information to understand the definition of ResourceT:
newtype ReleaseKey = ReleaseKey Int
type RefCount = Int
type NextKey = Int
data ReleaseMap base =
    ReleaseMap !NextKey !RefCount !(IntMap (base ()))
newtype ResourceT m a =
    ResourceT (Ref (Base m) (ReleaseMap (Base m)) -> m a)
We see that ReleaseKey is simply an Int. If you skip a few lines down, this will make sense, since we’re using an IntMap to keep track of the registered actions. We also define two type synonyms: RefCount and NextKey. NextKey keeps track of the most recently assigned value for a key, and is incremented each time register is called. We’ll touch on RefCount later.
The ReleaseMap is three pieces of information: the next key and the reference count, and then the map of all registered actions. Notice that ReleaseMap takes a type parameter base, which states which monad release actions must live in.
Finally, a ResourceT is essentially a ReaderT that keeps a mutable reference to a ReleaseMap. The reference type is determined by the base of the monad in question, as is the cleanup monad. This is why we need to use associated types.
The majority of the rest of the code in the Control.Monad.Trans.Resource module is just providing instances for the ResourceT type.
Other Typeclasses
There are three other typeclasses provided by the module:
ResourceUnsafeIO
Any monad which can lift IO actions into it, though doing so may be considered unsafe. The prime candidate here is ST. Care should be taken to only lift actions which do not acquire scarce resources and which don’t “fire the missiles.” In other words, all the normal warnings of unsafeIOToST apply.
ResourceThrow
For actions that can throw exceptions. This automatically applies to all IO-based monads. For ST-based monads, you can use the supplied ExceptionT transformer to provide exception-throwing capabilities. Some functions in conduit, for example, will require this (e.g., text decoding).
ResourceIO
A convenience class tying together a bunch of other classes, including the two mentioned above. This is purely for convenience; you could achieve the same effect without this type class, you’d just have to do a lot more typing.
Forking
It would seem that forking a thread would be inherently unsafe with ResourceT, since the parent thread may call runResourceT while the child thread is still accessing some of the allocated resources. This is indeed true, if you use the normal forkIO function.
You can’t actually use the standard forkIO, since it only operates in the IO monad, but you could use the fork function from lifted-base. In fact, due to this issue, the regions package does not provide a MonadBaseControl instance for its transformer (which is very similar to ResourceT). However, our goal in ResourceT is not to make it impossible for programmers to mess up, only to make it easier to do the right thing. Therefore, we still provide the instance, even though it could be abused.
In order to solve this, ResourceT includes reference counting. When you fork a new thread via resourceForkIO, the RefCount value of the ReleaseMap is incremented. Every time runResourceT is called, the value is decremented. Only when the value hits 0 are all the release actions called.
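The reference-counting rule can be sketched with two IORefs. This is a toy model under stated assumptions, not the code behind resourceForkIO: the names ToyShared, newToyShared, toyRetain and toyFinish are hypothetical, the count starts at 1 for the thread that created the state, and no actual threads are forked.

```haskell
import Data.IORef

-- A reference count plus the cleanup actions that are still pending.
data ToyShared = ToyShared (IORef Int) (IORef [IO ()])

-- The creating thread holds the first reference.
newToyShared :: [IO ()] -> IO ToyShared
newToyShared cleanups = do
    count <- newIORef 1
    pending <- newIORef cleanups
    return (ToyShared count pending)

-- Called when another thread starts sharing the resources.
toyRetain :: ToyShared -> IO ()
toyRetain (ToyShared count _) = atomicModifyIORef count $ \n -> (n + 1, ())

-- Called when a thread is done. Cleanups run only once the count hits zero.
toyFinish :: ToyShared -> IO ()
toyFinish (ToyShared count pending) = do
    remaining <- atomicModifyIORef count $ \n -> (n - 1, n - 1)
    if remaining == 0
        then do
            cleanups <- atomicModifyIORef pending $ \cs -> ([], cs)
            sequence_ cleanups
        else return ()
```

With one toyRetain, the first toyFinish leaves the cleanups pending and the second one runs them, which mirrors the parent and child each finishing in either order.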
Convenience Exports
In addition to what’s been listed so far, there are a few extra functions exported (mostly) for convenience.
- newRef, writeRef, and readRef wrap up the HasRef versions of the functions and allow them to run in any ResourceT.
- withIO is essentially a type-restricted version of with, but working around some of the nastiness with types you would otherwise run into. In general: you’ll want to use withIO when writing IO code.
- transResourceT lets you modify which monad your ResourceT is running in, assuming it keeps the same base.
transResourceT :: (Base m ~ Base n)
               => (m a -> n a)
               -> ResourceT m a
               -> ResourceT n a
transResourceT f (ResourceT mx) = ResourceT (\r -> f (mx r))
Source
I think it’s simplest to understand sources by looking at the types:
data SourceResult m a = Open (Source m a) a | Closed
data Source m a = Source
    { sourcePull :: ResourceT m (SourceResult m a)
    , sourceClose :: ResourceT m ()
    }
A source has just two operations on it: you can pull data from it, and you can close it (think of closing a file handle). When you pull, you either get some data and a new Source (the source is still open), or nothing (the source is closed). Let’s look at some of the simplest sources:
import Prelude hiding (repeat)
import Data.Conduit
-- | Never give any data
eof :: Monad m => Source m a
eof = Source
    { sourcePull = return Closed
    , sourceClose = return ()
    }
-- | Always give the same value
repeat :: Monad m => a -> Source m a
repeat a = Source
    { sourcePull = return $ Open (repeat a) a
    , sourceClose = return ()
    }
These sources are very straight-forward, since they always return the same results. Additionally, their close records don’t do anything. You might think that this is a bug: shouldn’t a call to sourcePull return Closed after it’s been closed? This isn’t required, since one of the rules of sources is that they can never be reused. In other words:
- If a Source returns Open, it has provided you with a new Source which you should use in place of the original one.
- If it returns Closed, then you cannot perform any more operations on it.
Don’t worry too much about the invariant. In practice, you will almost never call sourcePull or sourceClose yourself. In fact, you will rarely even write them yourself (that’s what sourceState and sourceIO are for). The point is that we can make some assumptions when we implement our sources.
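To see what following the invariant looks like from the consuming side, here is a toy model that runs in plain IO and has no close operation. It is a sketch only: ToySource, ToyResult, countTo and toyConsume are names invented for this illustration and are not part of conduit.

```haskell
-- A pull either yields a value together with the source to use next,
-- or reports that the source is closed.
data ToyResult a = ToyOpen (ToySource a) a | ToyClosed

newtype ToySource a = ToySource { toyPull :: IO (ToyResult a) }

-- Counts from lo up to hi, then closes.
countTo :: Int -> Int -> ToySource Int
countTo lo hi = ToySource $ return $
    if lo > hi
        then ToyClosed
        else ToyOpen (countTo (lo + 1) hi) lo

-- Pull until closed, always continuing with the source handed back by the
-- previous pull and never touching the old one again.
toyConsume :: ToySource a -> IO [a]
toyConsume src = do
    result <- toyPull src
    case result of
        ToyClosed      -> return []
        ToyOpen next x -> fmap (x :) (toyConsume next)
```

Note that toyConsume never pulls from a source twice and never pulls after seeing ToyClosed, which is exactly the pair of rules listed above.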
State
There is something similar about the two sources mentioned above: they never change. They always return the same value. In other words, they have no state. For almost all serious sources, we’ll need some kind of state.
The state might actually be defined outside of our program. For example, if we write a source that reads data from a Handle, we don’t need to manually specify any state, since the Handle itself already carries that state.
The way we store state in a source is by updating the returned Source value in the Open constructor. This is best seen with an example.
import Data.Conduit
import Control.Monad.Trans.Resource
-- | Provide data from the list, one element at a time.
sourceList :: Resource m => [a] -> Source m a
sourceList list = Source
    { sourcePull =
        case list of
            [] -> return Closed -- no more data
            -- This is where we store our state: by returning a new
            -- Source with the rest of the list
            x:xs -> return $ Open (sourceList xs) x
    , sourceClose = return ()
    }
Each time we pull from the source, it checks the input list. If the list is empty, pulling returns Closed, which makes sense. If the list is not empty, pulling returns Open with both the next value in the list, and a new Source value containing the rest of the input list.
sourceState and sourceIO
In addition to being able to manually create Sources, we also have a few convenience functions that allow us to create most sources in a more high-level fashion. sourceState lets you write code similar to how you would use the State monad. You provide an initial state, your pull function is provided with the current state, and it returns a new state and a return value. Let’s use this to reimplement sourceList.
import Data.Conduit
import Control.Monad.Trans.Resource
-- | Provide data from the list, one element at a time.
sourceList :: Resource m => [a] -> Source m a
sourceList state0 = sourceState
    state0
    pull
  where
    pull [] = return StateClosed
    pull (x:xs) = return $ StateOpen xs x
Notice the usage of the StateClosed and StateOpen constructors. These are very similar to Closed and Open, except that instead of specifying the next Source to be used, you provide the next state (here, the remainder of the list).
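One way to picture the relationship between the state-based constructors and the plain ones is to write a state-threading helper for a toy source type. This is a sketch under assumptions, not how sourceState is actually defined: every name below (ToySource, ToyStateResult, toySourceState, toyFibs, toyTake) is hypothetical, and the toy type runs in plain IO with no close operation.

```haskell
data ToyResult a = ToyOpen (ToySource a) a | ToyClosed

newtype ToySource a = ToySource { toyPull :: IO (ToyResult a) }

-- What the user's pull function returns: the next state and a value, or closed.
data ToyStateResult s a = ToyStateOpen s a | ToyStateClosed

-- Turn "state in, state out" into "source in, next source out" by rebuilding
-- the source around the new state after every pull.
toySourceState :: s -> (s -> IO (ToyStateResult s a)) -> ToySource a
toySourceState state step = ToySource $ do
    result <- step state
    return $ case result of
        ToyStateClosed      -> ToyClosed
        ToyStateOpen next x -> ToyOpen (toySourceState next step) x

-- The Fibonacci source from earlier in the chapter, on the toy types.
toyFibs :: ToySource Integer
toyFibs = toySourceState (0, 1) $ \(x, y) ->
    return (ToyStateOpen (y, x + y) x)

-- Pull at most n values.
toyTake :: Int -> ToySource a -> IO [a]
toyTake n src
    | n <= 0 = return []
    | otherwise = do
        result <- toyPull src
        case result of
            ToyClosed      -> return []
            ToyOpen next x -> fmap (x :) (toyTake (n - 1) next)
```

Here toyTake 5 toyFibs yields [0,1,1,2,3]; the user-supplied function never mentions a source at all, only states.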
The other common activity is to perform some I/O allocation (like opening a file), registering some cleanup action (closing that file), and having a function for pulling data from that resource. conduit comes built-in with a sourceFile function that gives a stream of ByteStrings. Let’s write a wildly inefficient alternative that returns a stream of characters.
import Data.Conduit
import Control.Monad.Trans.Resource
import System.IO
import Control.Monad.IO.Class (liftIO)
sourceFile :: ResourceIO m => FilePath -> Source m Char
sourceFile fp = sourceIO
    (openFile fp ReadMode)
    hClose
    (\h -> liftIO $ do
        eof <- hIsEOF h
        if eof
            then return IOClosed
            else fmap IOOpen $ hGetChar h)
Like sourceState, it uses a variant on the Open and Closed constructors. sourceIO does a number of things for us:
- It registers the cleanup function with the ResourceT transformer, ensuring it gets called even in the presence of exceptions.
- It sets up the sourceClose record to release the resource immediately.
- As soon as you return IOClosed, it will release the resource.
Sinks
A sink consumes a stream of data, and produces a result. A sink must always produce a result, and must always produce a single result. This is encoded in the types themselves.
There is a Monad instance for sink, making it simple to compose multiple sinks together into a larger sink. You can also use the built-in sink functions to perform most of your work. Like sources, you’ll rarely need to dive into the inner workings. Let’s start off with an example: getting lines from a stream of Chars (we’ll assume Unix line endings for simplicity).
import Data.Conduit
import qualified Data.Conduit.List as CL
-- Get a single line from the stream.
sinkLine :: Resource m => Sink Char m String
sinkLine = sinkState
    id -- initial state, nothing at the beginning of the line
    push
    close
  where
    -- On a new line, return the contents up until here
    push front '\n' =
        return $ StateDone Nothing $ front []
    -- Just another character, add it to the front and keep going
    push front char =
        return $ StateProcessing $ front . (char:)
    -- Got an EOF before hitting a newline, just give what we have so far
    close front = return $ front []
-- Get all the lines from the stream, until we hit a blank line or EOF.
sinkLines :: Resource m => Sink Char m [String]
sinkLines = do
    line <- sinkLine
    if null line
        then return []
        else do
            lines <- sinkLines
            return $ line : lines
content :: String
content = unlines
    [ "This is the first line."
    , "Here's the second."
    , ""
    , "After the blank."
    ]
main :: IO ()
main = do
    lines <- runResourceT $ CL.sourceList content $$ sinkLines
    mapM_ putStrLn lines
Running this sample produces the expected output:
This is the first line.
Here's the second.
sinkLine demonstrates usage of the sinkState function, which is very similar to the sourceState function we just saw. It takes three arguments: an initial state, a push function (takes the current state and next input, and returns a new state and result) and a close function (takes the current state and returns an output). As opposed to sourceState, which doesn’t need a close function, a sink is required to always return a result.
Our push function has two clauses. When it gets a newline character, it indicates that processing is complete via StateDone. The Nothing indicates that there is no leftover input (we’ll discuss that later). It also gives an output of all the characters it has received. The second clause simply appends the new character to the existing state and indicates that we are still working via StateProcessing. The close function returns all characters.
sinkLines shows how we can use the monadic interface to produce new sinks. If you replace sinkLine with getLine, this would look like standard code to pull lines from standard input. This familiar interface should make it easy to get up and running quickly.
Types
The types for sinks are just a bit more involved than sources. Let’s have a look:
type SinkPush input m output = input -> ResourceT m (SinkResult input m output)
type SinkClose m output = ResourceT m output
data SinkResult input m output =
    Processing (SinkPush input m output) (SinkClose m output)
  | Done (Maybe input) output
data Sink input m output =
    SinkNoData output
  | SinkData
        { sinkPush :: SinkPush input m output
        , sinkClose :: SinkClose m output
        }
  | SinkLift (ResourceT m (Sink input m output))
Whenever a sink is pushed to, it can either say it needs more data (Processing) or say it’s all done. When still processing, it must provide updated push and close functions; when done, it returns any leftover input and the output. Fairly straight-forward.
The first real “gotcha” is the three constructors for Sink. Why do we need SinkNoData: aren’t sinks all about consuming data? The answer is that we need it to efficiently implement our Monad instance. When we use return, we’re giving back a value that requires no data in order to compute it. We could model this with the SinkData constructor, with something like:
myReturn a = SinkData (\input -> return (Done (Just input) a)) (return a)
But doing so would force reading in an extra bit of input that we don’t need right now, and possibly will never need. (Have a look again at the sinkLines example.) So instead, we have an extra constructor to indicate that no input is required. Likewise, SinkLift is provided in order to implement an efficient MonadTrans instance.
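The point that return should consume no input is easier to see in a much simpler, pure model of a sink: a function from the remaining input to a result plus whatever input is left over. This is only an analogy under stated assumptions. ToySink, toyLine and toyLines are hypothetical names, the whole input is a list already in memory, and none of this reflects how the real Sink type is implemented.

```haskell
-- A toy sink consumes a prefix of the input and returns a result together
-- with the unconsumed remainder.
newtype ToySink i o = ToySink { runToySink :: [i] -> (o, [i]) }

instance Functor (ToySink i) where
    fmap f (ToySink g) = ToySink $ \input ->
        let (o, rest) = g input in (f o, rest)

instance Applicative (ToySink i) where
    -- Returning a value touches no input at all.
    pure o = ToySink $ \input -> (o, input)
    ToySink f <*> ToySink g = ToySink $ \input ->
        let (h, rest1) = f input
            (o, rest2) = g rest1
        in (h o, rest2)

instance Monad (ToySink i) where
    -- The second sink starts where the first one stopped.
    ToySink g >>= k = ToySink $ \input ->
        let (o, rest) = g input in runToySink (k o) rest

-- Read up to (and drop) the next newline.
toyLine :: ToySink Char String
toyLine = ToySink $ \input ->
    let (line, rest) = break (== '\n') input
    in (line, drop 1 rest)

-- Read lines until a blank line or the end of input.
toyLines :: ToySink Char [String]
toyLines = do
    line <- toyLine
    if null line
        then return []
        else fmap (line :) toyLines
```

Running runToySink toyLines "one\ntwo\n\nthree\n" gives (["one","two"],"three\n"): when toyLines hits the blank line, its return leaves the rest of the input untouched for whoever runs next.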
Sinks: no helpers
Let’s try to implement some sinks on the “bare metal”, without any helper functions.
import Data.Conduit
import System.IO
import Control.Monad.Trans.Resource
import Control.Monad.IO.Class (liftIO)
-- Consume all input and discard it.
sinkNull :: Resource m => Sink a m ()
sinkNull =
SinkData push close
where
push _ignored = return $ Processing push close
close = return ()
-- Let's stream characters to a file. Here we do need some kind of
-- initialization. We do this by initializing in a push function,
-- and then returning a different push function for subsequent
-- calls. By using withIO, we know that the handle will be closed even
-- if there's an exception.
sinkFile :: ResourceIO m => FilePath -> Sink Char m ()
sinkFile fp =
    SinkData pushInit closeInit
  where
    pushInit char = do
        (releaseKey, handle) <- withIO (openFile fp WriteMode) hClose
        push releaseKey handle char
    closeInit = do
        -- Never opened a file, so nothing to do here
        return ()
    push releaseKey handle char = do
        liftIO $ hPutChar handle char
        return $ Processing (push releaseKey handle) (close releaseKey handle)
    close releaseKey _ = do
        -- Close the file handle as soon as possible.
        release releaseKey
-- And we'll count how many values were in the stream.
count :: Resource m => Sink a m Int
count =
    SinkData (push 0) (close 0)
  where
    push count _ignored =
        return $ Processing (push count') (close count')
      where
        count' = count + 1
    close count = return count
Nothing is particularly complicated to implement. You should notice a common pattern here: declaring your push and close functions in a where clause, and then using them twice: once for the initial SinkData, and once for the Processing constructor. This can become a bit tedious; that’s why we have helper functions.
Sinks: with helpers
Let’s rewrite sinkFile and count to take advantage of the helper functions sinkIO and sinkState, respectively.
import Data.Conduit
import System.IO
import Control.Monad.IO.Class (liftIO)
-- We never have to touch the release key directly, sinkIO automatically
-- releases our resource as soon as we return IODone from our push function,
-- or sinkClose is called.
sinkFile :: ResourceIO m => FilePath -> Sink Char m ()
sinkFile fp = sinkIO
    (openFile fp WriteMode)
    hClose
    -- push: notice that we are given the handle and the input
    (\handle char -> do
        liftIO $ hPutChar handle char
        return IOProcessing)
    -- close: we're also given the handle, but we don't use it
    (\_handle -> return ())
-- And we'll count how many values were in the stream.
count :: Resource m => Sink a m Int
count = sinkState
    0
    -- The push function gets both the current state and the next input...
    (\state _ignored ->
        -- and it returns the new state
        return $ StateProcessing $ state + 1)
    -- The close function gets the final state and returns the output.
    (\state -> return state)
Nothing dramatic, just slightly shorter, less error-prone code. Using these two helper functions is highly recommended, as it ensures proper resource management and state updating.
List functions
As easy as it is to write your own sinks, you’ll likely want to take advantage of the built-in sinks available in the Data.Conduit.List module. These provide analogues to common list functions, like folding. (The module also has some Conduits, like map.)
If you’re looking for some way to practice with conduits, reimplementing the functions in the List module, both with and without the helper functions, would be a good start.
Let’s look at some simple things we can make out of the built-in sinks.
import Data.Conduit
import qualified Data.Conduit.List as CL
import Control.Monad.IO.Class (liftIO)
-- A sum function.
sum' :: Resource m => Sink Int m Int
sum' = CL.fold (+) 0
-- Print every input value to standard output.
printer :: (Show a, ResourceIO m) => Sink a m ()
printer = CL.mapM_ (liftIO . print)
-- Sum up all the values in a stream after the first five.
sumSkipFive :: Resource m => Sink Int m Int
sumSkipFive = do
    CL.drop 5
    CL.fold (+) 0
-- Print each input number and sum the total
printSum :: ResourceIO m => Sink Int m Int
printSum = do
    total <- CL.foldM go 0
    liftIO $ putStrLn $ "Sum: " ++ show total
    return total
  where
    go accum int = do
        liftIO $ putStrLn $ "New input: " ++ show int
        return $ accum + int
Connecting
At the end of the day, we’re actually going to want to use our sinks. While we could manually call sinkPush and sinkClose, it’s tedious. For example:
main :: IO ()
main = runResourceT $ do
    res <-
        case printSum of
            SinkData push close -> loop [1..10] push close
            SinkNoData res -> return res
    liftIO $ putStrLn $ "Got a result: " ++ show res
  where
    loop [] _push close = close
    loop (x:xs) push close = do
        mres <- push x
        case mres of
            Done _leftover res -> return res
            Processing push' close' -> loop xs push' close'
Instead, the recommended approach is to connect your sink to a source. Not only is this simpler, it’s less error prone, and means you have a lot of flexibility in where your data is coming from. To rewrite the example above:
main :: IO ()
main = runResourceT $ do
    res <- CL.sourceList [1..10] $$ printSum
    liftIO $ putStrLn $ "Got a result: " ++ show res
Connecting takes care of testing for the sink constructor (SinkData versus SinkNoData versus SinkLift), pulling from the source, and pushing to/closing the sink.
However, there is one thing I wanted to point out from the long-winded example. On the second to last line, we ignore the leftover value of Done. This brings up the issue of data loss. This is an important topic that has had a lot of thought put into it. Unfortunately, we can’t fully cover it yet, as we haven’t discussed the main culprit in the drama: Conduits (the type, not the package).
But as a quick note here, the leftover value from the Done constructor is not always ignored. The Monad instance, for example, uses it to pass data from one sink to the next in a binding. And in fact, the real connect operator doesn’t always throw away the leftovers. When we cover resumable sources later, we’ll see that the leftover value is put back on the buffer to allow later sinks reusing an existing source to pull the value.
Conduit
This section covers the final major datatype in our package, conduits. While sources produce a stream of data and sinks consume a stream, conduits transform a stream.
Types
As we did previously, let’s start off by looking at the types involved.
data ConduitResult input m output =
    Producing (Conduit input m output) [output]
  | Finished (Maybe input) [output]
data Conduit input m output = Conduit
    { conduitPush :: input -> ResourceT m (ConduitResult input m output)
    , conduitClose :: ResourceT m [output]
    }
This should look very similar to what we’ve seen with sinks. A conduit can be pushed to, in which case it returns a result. A result either indicates that it is still producing data, or that it is finished. When a conduit is closed, it returns some more output.
But let’s examine the idiosyncrasies a bit. Like sinks, we can only push one piece of input at a time, and leftover data may be 0 or 1 pieces. However, there are a few changes:
When producing (the equivalent of processing for a sink), we can return output. This is because a conduit will produce a new stream of output instead of producing a single output value at the end of processing.
A sink always returns a single output value, while a conduit returns 0 or more outputs (a list). To understand why, consider conduits such as concatMap (produces multiple outputs for one input) and filter (returns 0 or 1 output for each input).
We have no special constructor like SinkNoData. That’s because we provide no Monad instance for conduits. We’ll see later how you can still use a familiar Monadic approach to creating conduits.
Overall conduits should seem very similar to what we’ve covered so far.
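As a rough illustration of these differences, here is a pure model of the two types with the monad m and ResourceT stripped out (an assumption made so the example runs on its own). filterC, takeC and runConduit are hypothetical names, not part of the package. filterC shows a push yielding 0 or 1 outputs, and takeC shows Finished handing back a leftover input.

```haskell
-- Pure stand-ins for the conduit types (assumption: no monad, no ResourceT).
data ConduitResult i o
    = Producing (Conduit i o) [o]  -- next conduit state plus some output
    | Finished (Maybe i) [o]       -- optional leftover input plus output

data Conduit i o = Conduit
    { conduitPush  :: i -> ConduitResult i o
    , conduitClose :: [o]
    }

-- Each input yields either zero outputs or one output.
filterC :: (i -> Bool) -> Conduit i i
filterC p = Conduit
    { conduitPush  = \x -> Producing (filterC p) [x | p x]
    , conduitClose = []
    }

-- Pass n inputs through. The next input is not wanted, so it is handed
-- back as a leftover inside Finished.
takeC :: Int -> Conduit i i
takeC n
    | n <= 0    = Conduit (\x -> Finished (Just x) []) []
    | otherwise = Conduit (\x -> Producing (takeC (n - 1)) [x]) []

-- Push a list through a conduit one element at a time. Returns all of the
-- output and the input that was never consumed (including any leftover).
runConduit :: Conduit i o -> [i] -> ([o], [i])
runConduit c [] = (conduitClose c, [])
runConduit c (x:xs) =
    case conduitPush c x of
        Producing c' out ->
            let (rest, unused) = runConduit c' xs
            in  (out ++ rest, unused)
        Finished leftover out -> (out, maybe xs (: xs) leftover)
```

In this model, runConduit (filterC even) [1..6] produces [2,4,6] with no unused input, and runConduit (takeC 2) "abcd" produces "ab" with "cd" left over: the 'c' was pushed, declined, and returned.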
Simple conduits
We’ll start off by defining some simple conduits that don’t have any state.
import Prelude hiding (map, concatMap)
import Data.Conduit
-- A simple conduit that just passes on the data as-is.
passThrough :: Monad m => Conduit input m input
passThrough = Conduit
    { conduitPush = \input -> return $ Producing passThrough [input]
    , conduitClose = return []
    }
-- map values in a stream
map :: Monad m => (input -> output) -> Conduit input m output
map f = Conduit
    { conduitPush = \input -> return $ Producing (map f) [f input]
    , conduitClose = return []
    }
-- map and concatenate
concatMap :: Monad m => (input -> [output]) -> Conduit input m output
concatMap f = Conduit
    { conduitPush = \input -> return $ Producing (concatMap f) $ f input
    , conduitClose = return []
    }
Stateful conduits
Of course, not all conduits can be declared without state. Doing so on the bare metal is not too difficult.
import Prelude hiding (reverse)
import qualified Data.List
import Data.Conduit
import Control.Monad.Trans.Resource
-- Reverse the elements in the stream. Note that this has the same downside as
-- the standard reverse function: you have to read the entire stream into
-- memory before producing any output.
reverse :: Resource m => Conduit input m input
reverse =
    mkConduit []
  where
    mkConduit state = Conduit (push state) (close state)
    push state input = return $ Producing (mkConduit $ input : state) []
    close state = return state
-- Same thing with sort: it will pull everything into memory
sort :: (Ord input, Resource m) => Conduit input m input
sort =
    mkConduit []
  where
    mkConduit state = Conduit (push state) (close state)
    push state input = return $ Producing (mkConduit $ input : state) []
    close state = return $ Data.List.sort state
But we can do better. Just like sourceState and sinkState, we have conduitState to simplify things.
import Prelude hiding (reverse)
import qualified Data.List
import Data.Conduit
-- Reverse the elements in the stream. Note that this has the same downside as
-- the standard reverse function: you have to read the entire stream into
-- memory before producing any output.
reverse :: Resource m => Conduit input m input
reverse =
    conduitState [] push close
  where
    push state input = return $ StateProducing (input : state) []
    close state = return state
-- Same thing with sort: it will pull everything into memory
sort :: (Ord input, Resource m) => Conduit input m input
sort =
    conduitState [] push close
  where
    push state input = return $ StateProducing (input : state) []
    close state = return $ Data.List.sort state
Using conduits
The way Conduits interact with the rest of the package is via fusing. A conduit can be fused into a source, producing a new source, fused into a sink to produce a new sink, or fused with another conduit to produce a new conduit. It’s best to just look at the fusion operators.
-- Left fusion: source + conduit = source
($=) :: (Resource m, IsSource src) => src m a -> Conduit a m b -> Source m b
-- Right fusion: conduit + sink = sink
(=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c
-- Middle fusion: conduit + conduit = conduit
(=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c
Using these operators is straightforward.
useConduits = do
    runResourceT
        $ CL.sourceList [1..10]
        $= reverse
        $= CL.map show
        $$ CL.consume
    -- equivalent to
    runResourceT
        $ CL.sourceList [1..10]
        $$ reverse
        =$ CL.map show
        =$ CL.consume
    -- and equivalent to
    runResourceT
        $ CL.sourceList [1..10]
        $$ (reverse =$= CL.map show)
        =$ CL.consume
There is in fact one last way of expressing the same idea. I’ll leave it as an exercise to the reader to discover it.
It may seem like all these different approaches are redundant. While occasionally you can in fact choose whichever approach you feel like using, in many cases you will need a specific approach. For example:
If you have a stream of numbers, and you want to apply a conduit (e.g., map show) to only some of the stream that will be passed to a specific sink, you’ll want to use the right fusion operator.
If you’re reading a file, and want to parse the entire file as textual data, you’ll want to use left fusion to convert the entire stream.
If you want to create reusable conduits that combine together individual, smaller conduits, you’ll use middle fusion.
Data loss
Let’s forget about conduits for a moment. Instead, suppose we want to write a program- using plain old lists- that will take a list of numbers, apply some kind of transformation to them, take the first five transformed values and do something with them, and then do something else with the remaining non-transformed values. For example, we want something like:
main = do
    let list = [1..10]
        transformed = map show list
        (begin, end) = splitAt 5 transformed
        untransformed = map read end
    mapM_ putStrLn begin
    print $ sum untransformed
But clearly this isn’t a good general solution, since we don’t want to have to transform and then untransform every element in the list. For one thing, we may not always have an inverse function. Another issue is efficiency. In this case, we can write something more efficient:
main = do
    let list = [1..10]
        (begin, end) = splitAt 5 list
        transformed = map show begin
    mapM_ putStrLn transformed
    print $ sum end
Note the change: we perform our split before transforming any elements. This works because, with map, we have a 1-to-1 correspondence between the input and output elements. So splitting at 5 before or after mapping show is the same thing. But what happens if we replace map show with something more devious?
deviousTransform =
    concatMap go
  where
    go 1 = [show 1]
    go 2 = [show 2, "two"]
    go 3 = replicate 5 "three"
    go x = [show x]
We no longer have the 1-to-1 correspondence. As a result, we can’t use the second method. But it’s even worse: we can’t use the first method either, since there’s no inverse of our deviousTransform.
There’s only one solution to the problem that I’m aware of: transform elements one at a time. The final program looks like this:
deviousTransform 1 = [show 1]
deviousTransform 2 = [show 2, "two"]
deviousTransform 3 = replicate 5 "three"
deviousTransform x = [show x]
transform5 :: [Int] -> ([String], [Int])
transform5 list =
    go [] list
  where
    go output (x:xs)
        | newLen >= 5 = (take 5 output', xs)
        | otherwise = go output' xs
      where
        output' = output ++ deviousTransform x
        newLen = length output'
    -- Degenerate case: not enough input to make 5 outputs
    go output [] = (output, [])
main = do
    let list = [1..10]
        (begin, end) = transform5 list
    mapM_ putStrLn begin
    print $ sum end
The final output of this program is
1
2
two
three
three
49
What’s important to note is that the number 3 is converted into five copies of the word “three”, yet only two of them show up in the output. The rest are discarded in the take 5 call.
This whole exercise is just to demonstrate the issue of data loss in conduits. By forcing conduits to accept only one input at a time, we avoid the issue of transforming too many elements at once. That doesn’t mean we don’t lose any data: if a conduit produces too much output for the receiving sink to handle, some of it may be lost.
To put all this another way: conduits avoid chunking to get away from data loss. This is not an issue unique to conduits. If you look in the implementation of concatMapM for enumerator, you’ll see that it forces elements to be handled one at a time. In conduits, we opted to force the issue at the type level.
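The effect of chunking can be seen with plain lists. The sketch below is a hypothetical variant of transform5 (transform5Chunked and its chunk-size parameter are invented here, and the chunk size is assumed to be at least 1) that hands the transformer several inputs at once instead of one.

```haskell
deviousTransform :: Int -> [String]
deviousTransform 2 = [show (2 :: Int), "two"]
deviousTransform 3 = replicate 5 "three"
deviousTransform x = [show x]

-- Take the first five transformed values, feeding the transformer
-- chunkSize inputs at a time (chunkSize must be at least 1).
-- Returns those five values and the input that was never touched.
transform5Chunked :: Int -> [Int] -> ([String], [Int])
transform5Chunked chunkSize = go []
  where
    -- Degenerate case: not enough input to make 5 outputs
    go output [] = (output, [])
    go output xs
        | length output' >= 5 = (take 5 output', rest)
        | otherwise           = go output' rest
      where
        (chunk, rest) = splitAt chunkSize xs
        output'       = output ++ concatMap deviousTransform chunk
```

With a chunk size of 1 this behaves like transform5: for [1..10] the untouched input is [4..10]. With a chunk size of 4, the number 4 is transformed together with 1, 2 and 3, its output is cut off by take 5, and the untouched input is only [5..10]. An input element has vanished, which is the kind of loss that one-at-a-time pushing rules out on the input side.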
SequencedSink
Suppose we want to be able to combine existing conduits and sinks to produce a new, more powerful conduit. For example, we want to write a conduit that takes a stream of numbers and sums up every five. In other words, for the input [1..50], it should result in the sequence [15,40,65,90,115,140,165,190,215,240]. We can definitely do this with the low-level conduit interface.
sum5Raw :: Resource m => Conduit Int m Int
sum5Raw =
    conduitState (0, 0) push close
  where
    push (total, count) input
        | newCount == 5 = return $ StateProducing (0, 0) [newTotal]
        | otherwise = return $ StateProducing (newTotal, newCount) []
      where
        newTotal = total + input
        newCount = count + 1
    close (total, count)
        | count == 0 = return []
        | otherwise = return [total]
But this is frustrating, since we already have all the tools we need to do this at a high level! There’s the fold sink for adding up the numbers, and the isolate conduit which will only allow up to a certain number of elements to be passed to a sink. Can’t we combine these somehow?
The answer is a SequencedSink. The idea is to create a normal Sink, except it returns a special output called a SequencedSinkResponse. This value can emit new output, stop processing data, or transfer control to a new conduit. (See the Haddocks for more information.) Then we can turn this into a Conduit using the sequenceSink function. This function also takes some state value that gets passed through to the sink.
So we can rewrite sum5Raw in a much more high-level manner.
sum5 :: Resource m => Conduit Int m Int
sum5 = sequenceSink () $ \() -> do
    nextSum <- CL.isolate 5 =$ CL.fold (+) 0
    return $ Emit () [nextSum]
All of the () in there are simply the unused state variable being passed around; they can be ignored. Otherwise, we’re doing exactly what we want. We fuse isolate to fold to get the sum of the next five elements from the stream. We then emit that value, and start all over again.
Let’s say we want to modify this slightly. We want to get the first 8 sums, and then pass through the remaining values, multiplied by 2. We can keep track of how many values we’ve returned in our state, and then use the StartConduit constructor to pass control to the multiply-by-2 conduit next.
sum5Pass :: Resource m => Conduit Int m Int
sum5Pass = sequenceSink 0 $ \count -> do
    if count == 8
        then return $ StartConduit $ CL.map (* 2)
        else do
            nextSum <- CL.isolate 5 =$ CL.fold (+) 0
            return $ Emit (count + 1) [nextSum]
These are obviously very contrived examples, but I hope they make clear the power and simplicity available from this approach.
Buffering
Buffering is one of the unique features of conduits. With buffering, conduits no longer need to control the flow of your application. In some cases, this can lead to simpler code.
Inversion of Control
Buffering was actually one of the main motivations in the creation of the conduit package. To see its importance, we need to consider the approach we’ve seen so far, which we’ll call inversion of control, or IoC.
Inversion of control can mean different things in different circles. If you object to its usage here, go ahead and replace it with some other phrase like “warm, fuzzy thing.” I won’t be offended.
Suppose you want to count how many newline characters there are in a file. In the standard imperative approach, you would do something like:
1. Open the file
2. Pull some data into a buffer
3. Loop over the values in the buffer, incrementing a counter on each newline character
4. Return to 2
5. Close the file
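The five steps above can be sketched with nothing but base. This is an illustration rather than recommended code: countNewlines, loop and bufSize are names invented here, and the explicit open, malloc, free and close calls are deliberately not exception-safe, to mirror the imperative description.

```haskell
import Data.Word (Word8)
import Foreign.Marshal.Alloc (free, mallocBytes)
import Foreign.Marshal.Array (peekArray)
import Foreign.Ptr (Ptr)
import System.IO

-- Size of the read buffer in bytes (arbitrary choice for this sketch).
bufSize :: Int
bufSize = 4096

-- Count the newline bytes in a file, keeping control of every step.
countNewlines :: FilePath -> IO Int
countNewlines fp = do
    h     <- openBinaryFile fp ReadMode       -- step 1: open the file
    buf   <- mallocBytes bufSize
    total <- loop h buf 0
    free buf
    hClose h                                  -- step 5: close the file
    return total

loop :: Handle -> Ptr Word8 -> Int -> IO Int
loop h buf total = do
    n <- hGetBuf h buf bufSize                -- step 2: pull data into a buffer
    if n == 0
        then return total                     -- nothing read: end of file
        else do
            bytes <- peekArray n buf
            -- step 3: count the newline bytes (10 is '\n')
            let newlines = length (filter (== 10) bytes)
            loop h buf $! total + newlines    -- step 4: return to step 2
```

Every call here is made by our code, and every call returns control to it. That is also why an exception thrown between the open and the close would leak both the handle and the buffer.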
Notice that your code is explicitly calling out to other code and that code is returning control back to your code. You have retained full control of the flow of execution of your program. The conduit approach we’ve seen so far does not work this way. Instead, you would:
Write a sink that counts newlines and adds the result to an accumulator.
Connect the sink to a source
There’s no doubt in my mind that this is an easier approach. You don’t have to worry about opening and closing files or pulling data from the file. Instead, the data you need to process is simply presented to you. This is the advantage of IoC: you can focus on specifically your piece of the code.
We use this IoC approach all over Haskell: for example, instead of takeMVar and putMVar, you can use withMVar. Don’t bother with openFile and hClose, just use withFile and pass in a function that uses the Handle. Even C has a version of this: why malloc and free when you could just alloca?
Actually, that last one is a huge red herring. Of course you can’t just use alloca for everything. alloca only allocates memory locally on the stack, not dynamically on the heap. There’s no way to return your allocated memory outside the current function.
But actually, the same restriction applies to the whole family of with functions: you can never return an allocated resource outside of the “block”. Usually this works out just fine, but we need to recognize that this is a change in how we structure our programs. Often times, with simple examples, this is a minor change. However, in larger settings this can become very difficult to manage, bordering on impossible at times.
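A short experiment shows the restriction in action. The sketch below uses only base; leakHandle and readLeaked are hypothetical names. It tries to smuggle a Handle out of withFile and then read from it.

```haskell
import Control.Exception (IOException, try)
import System.IO

-- Return the Handle from inside the withFile block. The value escapes,
-- but withFile closes the handle before the caller ever sees it.
leakHandle :: FilePath -> IO Handle
leakHandle fp = withFile fp ReadMode return

-- Reading from the escaped handle throws an IOException because the
-- handle is already closed, so the result is a Left.
readLeaked :: FilePath -> IO (Either IOException String)
readLeaked fp = do
    h <- leakHandle fp
    try (hGetLine h)
```

The types are perfectly happy with leakHandle; nothing stops the handle from escaping. The failure only shows up at run time, which is part of what makes this style hard to manage once resources need to outlive a single block.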
A web server
Let’s say we’re going to write a web server. We’re going to use the following low-level operations:
data Socket
recv :: Socket -> Int -> IO ByteString -- returns empty when the socket is closed
sendAll :: Socket -> ByteString -> IO ()
We’re up to the part where we need to implement the function handleConn that handles an individual connection. It will look something like this:
data Request -- request headers, HTTP version, etc
data Response -- status code, response headers, response body
type Application = Request -> IO Response
handleConn :: Application -> Socket -> IO ()
What does our handleConn need to do? In broad strokes:
1. Parse the request line
2. Parse the request headers
3. Construct the Request value
4. Pass Request to the Application and get back a Response
5. Send the Response over the Socket
We start off by writing steps 1 and 2 manually, without using conduits. We’ll do this very simply and just assume three space-separated strings. We end up with something that looks like:
import qualified Data.ByteString.Char8 as S8
data RequestLine = RequestLine ByteString ByteString ByteString
parseRequestLine :: Socket -> IO RequestLine
parseRequestLine socket = do
    bs <- recv socket 4096
    -- Fails at run time if the chunk holds fewer than three words
    let (method:path:version:ignored) = S8.words bs
    return $ RequestLine method path version
There are two issues here: it doesn’t handle the case where there are fewer than three words in the chunk of data, and it throws away any extra data. We can definitely solve both of these issues manually, but it’s very tedious. It’s much easier to implement this in terms of conduits.
import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
data RequestLine = RequestLine ByteString ByteString ByteString
parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = do
    let space = toEnum $ fromEnum ' '
    let getWord = do
            CB.dropWhile (== space)
            bss <- CB.takeWhile (/= space) =$ CL.consume
            return $ S.concat bss
    method <- getWord
    path <- getWord
    version <- getWord
    return $ RequestLine method path version
This means that our code will automatically be supplied with more data as it comes in, and any extra data will automatically be buffered in the Source, ready for the next time it’s used. Now we can easily put our program together, demonstrating the power of the conduits approach:
import Data.ByteString (ByteString)
import Data.Conduit
import Data.Conduit.Network (sourceSocket)
import Control.Monad.IO.Class (liftIO)
import Network.Socket (Socket)
data RequestLine = RequestLine ByteString ByteString ByteString
type Headers = [(ByteString, ByteString)]
data Request = Request RequestLine Headers
data Response = Response
type Application = Request -> IO Response
parseRequestHeaders :: Sink ByteString IO Headers
parseRequestHeaders = undefined
parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = undefined
sendResponse :: Socket -> Response -> IO ()
sendResponse = undefined
handleConn :: Application -> Socket -> IO ()
handleConn app socket = do
    req <- runResourceT $ sourceSocket socket $$ do
        requestLine <- parseRequestLine
        headers <- parseRequestHeaders
        return $ Request requestLine headers
    res <- liftIO $ app req
    liftIO $ sendResponse socket res
Whither the request body?
This is all great, until we realize we can’t read the request body. The Application is simply given the Request, and lives in the IO monad. It has no access whatsoever to the incoming stream of data.
There’s an easy fix for this actually: have the Application live in the Sink monad. This is the very approach we took with enumerator-based WAI 0.4. However, there are two problems:
People find it confusing. What people expect is that the Request value would have a requestBody value of type Source.
This makes certain kinds of usage incredibly difficult. For example, trying to write an HTTP proxy combining WAI and http-enumerator proved to be almost impossible.
This is the downside of inversion of control. Our code wants to be in control. It wants to be given something to pull from, something to push to, and run with it. We need some solution to the problem.
If you think that the situation I described with the proxy isn’t so bad, it’s because I’ve gone easy on the details. We also need to take into account streaming the response body, and the streaming needs to happen on both the client and server side.
The simplest solution would be to just create a new Source and pass that to the Application. Unfortunately, this will cause problems with our buffering. You see, when we connected our source to the parseRequestLine and parseRequestHeaders sinks, it made a call to recv. If the data it received was not enough to cover all of the headers, it would issue another call. When it had enough data, it would stop. However, odds are that it didn’t stop exactly at the end of the headers. It likely consumed a bit of the request body as well.
If we just create a new source and pass that to the request, it will be missing the beginning of the request body. We need some way to pass that buffered data along.
BufferedSource
And so we finally get to introduce the last data type in conduits: BufferedSource. This is an abstract data type, but all it really does is keep a mutable reference to a buffer and an underlying Source. In order to create one of these, you use the bufferSource function.
bufferSource :: Resource m => Source m a -> ResourceT m (BufferedSource m a)
This one little change is what allows us to easily solve our web server dilemma. Instead of connecting a Source to our parsing Sinks, we use a BufferedSource. At the end of each connection, any leftover data is put back on the buffer. For our web server case, we can now create a BufferedSource, use that to read the request line and headers, and then pass that same BufferedSource to the application for reading the request body.
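The idea of a mutable buffer sitting in front of a source can be modelled in a few lines of base Haskell. This is a toy stand-in, not the package's BufferedSource: Buffered, bufferList, next, takeWhileB and demo are all hypothetical names, and the "socket" is just a list of characters.

```haskell
import Data.IORef

data Buffered a = Buffered
    { leftovers :: IORef [a]     -- values handed back by earlier consumers
    , pullMore  :: IO (Maybe a)  -- the underlying source; Nothing when closed
    }

-- Wrap a list as a one-shot source and put an empty buffer in front of it.
bufferList :: [a] -> IO (Buffered a)
bufferList xs = do
    remaining <- newIORef xs
    buffer    <- newIORef []
    let pull = atomicModifyIORef' remaining $ \ys ->
            case ys of
                []       -> ([], Nothing)
                (z : zs) -> (zs, Just z)
    return (Buffered buffer pull)

-- Next value: buffered leftovers first, then the underlying source.
next :: Buffered a -> IO (Maybe a)
next b = do
    buffered <- readIORef (leftovers b)
    case buffered of
        (x : xs) -> writeIORef (leftovers b) xs >> return (Just x)
        []       -> pullMore b

-- Consume values while the predicate holds. The first value that fails the
-- test has already been pulled, so it is put back for the next consumer.
takeWhileB :: (a -> Bool) -> Buffered a -> IO [a]
takeWhileB p b = go []
  where
    go acc = do
        mx <- next b
        case mx of
            Nothing -> return (reverse acc)
            Just x
                | p x       -> go (x : acc)
                | otherwise -> do
                    modifyIORef (leftovers b) (x :)
                    return (reverse acc)

-- Two consumers share one buffered source, as in the web server:
-- the first reads the request line, the second reads everything else.
demo :: IO (String, String)
demo = do
    b           <- bufferList "GET / HTTP/1.1\nbody bytes"
    requestLine <- takeWhileB (/= '\n') b
    rest        <- takeWhileB (const True) b
    return (requestLine, rest)
```

Here the first consumer has to pull the newline to learn that the request line is over. Because that character goes back onto the buffer, the second consumer still sees it; without the put-back step it would be lost, which is the web server problem in miniature.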
Typeclass
We want to be able to connect a buffered source to a sink, just like we would a regular source. We would also like to be able to fuse it to a conduit. In order to make this convenient, conduit has a typeclass, IsSource. There are instances provided for both Source and BufferedSource. Both the connect ($$) and left-fuse ($=) operators use this typeclass.
There’s one “gotcha” in the BufferedSource instance of this typeclass, so let’s explain it. Suppose we want to write a file copy function, without any buffering. This is a fairly standard usage of conduits:
sourceFile input $$ sinkFile output
When this line is run, both the input and output files are opened, the data is copied, and then both files are closed. Let’s change this example slightly to use buffering:
bsrc <- bufferSource $ sourceFile input
bsrc $$ isolate 50 =$ sinkFile output1
bsrc $$ sinkFile output2
When is the input file opened and closed? The opening occurs on the first line, when buffering the source. And if we follow the normal rules from sources, the file should be closed after the second line. However, if we did that, we couldn’t reuse bsrc for line 3!
So instead, $$ does not close the file. As a result, you can pass a buffered source to as many actions as you want, without concerns that the file handle has been closed out from under you.
If you remember from earlier, the invariant of a source is that it cannot be pulled from after it returns a Closed response. In order to allow you to work more easily with a BufferedSource, this invariant is relaxed. It is the responsibility of the BufferedSource implementation to ensure that after the underlying Source is closed, it is never used again.
This presents one caveat: when you’re finished with a buffered source, you should manually call bsourceClose on it. However, as usual, this is merely an optimization, as the source will automatically be closed when runResourceT is called.
Recapping the web server
So what exactly does our web server look like now?
import Data.ByteString (ByteString)
import Data.Conduit
import Data.Conduit.Network (sourceSocket)
import Control.Monad.IO.Class (liftIO)
import Network.Socket (Socket)
data RequestLine = RequestLine ByteString ByteString ByteString
type Headers = [(ByteString, ByteString)]
data Request = Request RequestLine Headers (BufferedSource IO ByteString)
data Response = Response
type Application = Request -> ResourceT IO Response
parseRequestHeaders :: Sink ByteString IO Headers
parseRequestHeaders = undefined
parseRequestLine :: Sink ByteString IO RequestLine
parseRequestLine = undefined
sendResponse :: Socket -> Response -> IO ()
sendResponse = undefined
handleConn :: Application -> Socket -> IO ()
handleConn app socket = runResourceT $ do
    bsrc <- bufferSource $ sourceSocket socket
    requestLine <- bsrc $$ parseRequestLine
    headers <- bsrc $$ parseRequestHeaders
    let req = Request requestLine headers bsrc
    res <- app req
    liftIO $ sendResponse socket res
We’ve made a few minor changes. Firstly, the Application now lives in the ResourceT IO monad. This isn’t strictly necessary, but it’s very convenient: the application can now register cleanup actions that will only take place after the response has been fully sent to the client.
But the major changes are in the handleConn function. We now start off by buffering our source. This buffered source is then used twice in our function, and then passed off to the application.