Single process pub-sub
The previous example was admittedly quite simple. Let’s build on that foundation (pun intended) to do something a bit more interesting. Suppose we have a workflow on our site like the following:
Enter some information on page X, and submit.
Submission starts a background job, and the user is redirected to a page to view status of that job.
That second page will subscribe to updates from the background job and display them to the user.
The core principle here is the ability to let one thread publish updates, and have another thread subscribe to receive those updates. This is known generally as pub/sub, and fortunately is very easy to achieve in Haskell via STM.
Like the previous chapter, let me start off with the caveat: this technique only works properly if you have a single web application process. If you have two different servers and a load balancer, you’d either need sticky sessions or some other solution to make sure that the requests from a single user are going to the same machine. In those situations, you may want to consider using an external pubsub solution, such as Redis.
With that caveat out of the way, let’s get started.
Foundation datatype
We’ll need two different mutable references in our foundation. The first will keep track of the next “job id” we’ll hand out. Each of these background jobs we’ll be represented by a unique identifier, which will be used in our URLs. The second piece of data will be a map from the job ID to the broadcast channel used for publishing updates. In code:
data App = App
{ jobs :: TVar (IntMap (TChan (Maybe Text)))
, nextJob :: TVar Int
}
Notice that our TChan
contains Maybe Text
values. The reason for the Maybe
wrapper is so that we can indicate that the channel is complete, by providing a Nothing
value.
Allocating a job
In order to allocate a job, we need to:
Get a job ID.
Create a new broadcast channel.
Add the channel to the channel map.
Due to the beauty of STM, this is pretty easy.
(jobId, chan) <- liftIO $ atomically $ do
jobId <- readTVar nextJob
writeTVar nextJob $! jobId + 1
chan <- newBroadcastTChan
m <- readTVar jobs
writeTVar jobs $ IntMap.insert jobId chan m
return (jobId, chan)
Fork our background job
There are many different ways we could go about this, and they depend entirely on what the background job is going to be. Here’s a minimal example of a background job that prints out a few messages, with a 1 second delay between each message. Note how after our final message, we broadcast a Nothing
value and remove our channel from the map of channels.
liftIO $ forkIO $ do
threadDelay 1000000
atomically $ writeTChan chan $ Just "Did something\n"
threadDelay 1000000
atomically $ writeTChan chan $ Just "Did something else\n"
threadDelay 1000000
atomically $ do
writeTChan chan $ Just "All done\n"
writeTChan chan Nothing
m <- readTVar jobs
writeTVar jobs $ IntMap.delete jobId m
View progress
For this demonstration, I’ve elected for a very simple progress viewing: a plain text page with stream response. There are a few other possibilities here: an HTML page that auto-refreshes every X seconds or using eventsource or websockets. I encourage you to give those a shot also, but here’s the simplest implementation I can think of:
getViewProgressR jobId = do
App {..} <- getYesod
mchan <- liftIO $ atomically $ do
m <- readTVar jobs
case IntMap.lookup jobId m of
Nothing -> return Nothing
Just chan -> fmap Just $ dupTChan chan
case mchan of
Nothing -> notFound
Just chan -> respondSource typePlain $ do
let loop = do
mtext <- liftIO $ atomically $ readTChan chan
case mtext of
Nothing -> return ()
Just text -> do
sendChunkText text
sendFlush
loop
loop
We start off by looking up the channel in the map. If we can’t find it, it means the job either never existed, or has already been completed. In either event, we return a 404. (Another possible enhancement would be to store some information on all previously completed jobs and let the user know if they’re done.)
Assuming the channel exists, we use respondSource
to start a streaming response. We then repeatedly call readTChan
until we get a Nothing
value, at which point we exit (via return ()
). Notice that on each iteration, we call both sendChunkText
and sendFlush
. Without that second call, the user won’t receive any updates until the output buffer completely fills up, which is not what we want for a real-time update system.
Complete application
For completeness, here’s the full source code for this application:
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import Data.Text (Text)
import Yesod
data App = App
{ jobs :: TVar (IntMap (TChan (Maybe Text)))
, nextJob :: TVar Int
}
mkYesod "App" [parseRoutes|
/ HomeR GET POST
/view-progress/#Int ViewProgressR GET
|]
instance Yesod App
getHomeR :: Handler Html
getHomeR = defaultLayout $ do
setTitle "PubSub example"
[whamlet|
<form method=post>
<button>Start new background job
|]
postHomeR :: Handler ()
postHomeR = do
App {..} <- getYesod
(jobId, chan) <- liftIO $ atomically $ do
jobId <- readTVar nextJob
writeTVar nextJob $! jobId + 1
chan <- newBroadcastTChan
m <- readTVar jobs
writeTVar jobs $ IntMap.insert jobId chan m
return (jobId, chan)
liftIO $ forkIO $ do
threadDelay 1000000
atomically $ writeTChan chan $ Just "Did something\n"
threadDelay 1000000
atomically $ writeTChan chan $ Just "Did something else\n"
threadDelay 1000000
atomically $ do
writeTChan chan $ Just "All done\n"
writeTChan chan Nothing
m <- readTVar jobs
writeTVar jobs $ IntMap.delete jobId m
redirect $ ViewProgressR jobId
getViewProgressR :: Int -> Handler TypedContent
getViewProgressR jobId = do
App {..} <- getYesod
mchan <- liftIO $ atomically $ do
m <- readTVar jobs
case IntMap.lookup jobId m of
Nothing -> return Nothing
Just chan -> fmap Just $ dupTChan chan
case mchan of
Nothing -> notFound
Just chan -> respondSource typePlain $ do
let loop = do
mtext <- liftIO $ atomically $ readTChan chan
case mtext of
Nothing -> return ()
Just text -> do
sendChunkText text
sendFlush
loop
loop
main :: IO ()
main = do
jobs <- newTVarIO IntMap.empty
nextJob <- newTVarIO 1
warp 3000 App {..}