Closing the Producer Goroutine from the Consumer

The producer-and-consumer pattern is well used in Go concurrent programming. When the consumer stops, we want to gracefully stop the producer as well.

Problem

When a gRPC server receives a streaming request, it usually calls a function that returns a channel, reads the result from that channel and send the result to the client one by one.

Take the following code for instance: upon receiving a request, the main goroutine Service calls launchJob. launchJob starts a separate goroutine as an anonymous function call and returns a channel. In the anonymous function, items will be sent to channel. And Service on the otherside of the channel will reads from it.

  1. func Service(req *Request, stream *StreamResponse) error {
  2. result := launchJob(req.Content)
  3. for r := range result {
  4. if e := stream.Send(result); e != nil {
  5. // should we signal the running goroutine so it will stop sending?
  6. return e
  7. }
  8. }
  9. }
  10. func launchJob(content string) chan Item {
  11. c := make(chan Item)
  12. go func() {
  13. defer close(c)
  14. acquireScarceResources()
  15. defer releaseScarceResources()
  16. ...
  17. // if stream.Send(result) returns an error and the Service returns, this will be blocked
  18. c <- Item{}
  19. ...
  20. }()
  21. return c
  22. }

There is a major problem in this implementation. As pointed out by the comment, if the Send in Service returns an error, the Service function will return, leaving the anonymous function being blocked on c <- Item{} forever.

This problem is important because the leaking goroutine usually owns scarce system resources such as network connection and memory.

Solution: Pipeline Explicit Cancellation

Inspired by this blog post section Explicit cancellation, we can signal the cancellation via closing on a separate channel. And we can follow the terminology as io.Pipe.

  1. package sql
  2. import (
  3. "errors"
  4. )
  5. var ErrClosedPipe = errors.New("pipe: write on closed pipe")
  6. // pipe follows the design at https://blog.golang.org/pipelines
  7. // - wrCh: chan for piping data
  8. // - done: chan for signaling Close from Reader to Writer
  9. type pipe struct {
  10. wrCh chan interface{}
  11. done chan struct{}
  12. }
  13. // PipeReader reads real data
  14. type PipeReader struct {
  15. p *pipe
  16. }
  17. // PipeWriter writes real data
  18. type PipeWriter struct {
  19. p *pipe
  20. }
  21. // Pipe creates a synchronous in-memory pipe.
  22. //
  23. // It is safe to call Read and Write in parallel with each other or with Close.
  24. // Parallel calls to Read and parallel calls to Write are also safe:
  25. // the individual calls will be gated sequentially.
  26. func Pipe() (*PipeReader, *PipeWriter) {
  27. p := &pipe{
  28. wrCh: make(chan interface{}),
  29. done: make(chan struct{})}
  30. return &PipeReader{p}, &PipeWriter{p}
  31. }
  32. // Close closes the reader; subsequent writes to the
  33. func (r *PipeReader) Close() {
  34. close(r.p.done)
  35. }
  36. // ReadAll returns the data chan. The caller should
  37. // use it as `for r := range pr.ReadAll()`
  38. func (r *PipeReader) ReadAll() chan interface{} {
  39. return r.p.wrCh
  40. }
  41. // Close closes the writer; subsequent ReadAll from the
  42. // read half of the pipe will return a closed channel.
  43. func (w *PipeWriter) Close() {
  44. close(w.p.wrCh)
  45. }
  46. // Write writes the item to the underlying data stream.
  47. // It returns ErrClosedPipe when the data stream is closed.
  48. func (w *PipeWriter) Write(item interface{}) error {
  49. select {
  50. case w.p.wrCh <- item:
  51. return nil
  52. case <-w.p.done:
  53. return ErrClosedPipe
  54. }
  55. }

And the consumer and producer be can implemented as

  1. func Service(req *Request, stream *StreamResponse) error {
  2. pr := launchJob(req.Content)
  3. defer pr.Close()
  4. for r := range pr.ReadAll() {
  5. if e := stream.Send(r); e != nil {
  6. return e
  7. }
  8. }
  9. }
  10. func launchJob(content string) PipeReader {
  11. pr, pw := Pipe()
  12. go func() {
  13. defer pw.Close()
  14. if err := pw.Write(Item{}); err != nil {
  15. return
  16. }
  17. }
  18. return pr
  19. }

Further Reading

  1. Google Form: Channel send timeout
  2. Go by Example: Timeouts
  3. Google I/O 2013 - Advanced Go Concurrency Patterns
  4. Go Concurrency Patterns Talk
  5. Go Concurrency Patterns: Pipelines and cancellation