4.6 实现聊天室:广播器

上一节介绍了聊天室的核心流程,其中多次提到了 Broadcaster,但没有过多涉及到其中的细节。本节我们详细介绍它的实现:广播器,这是聊天室的一个核心模块。

4.6.1 单例模式

Go 不是完全面向对象的语言,只支持部分面向对象的特性。面向对象中的单例模式是一个常见、简单的模式。前文提到,广播器中我们应用了单例模式,这里进行必要的讲解。

4.6.1.1 简介

英文名称:Singleton Pattern,该模式规定一个类只允许有一个实例,而且自行实例化并向整个系统提供这个实例。因此单例模式的要点有:1)只有一个实例;2)必须自行创建;3)必须自行向整个系统提供这个实例。

单例模式主要避免一个全局使用的类频繁地创建与销毁。当你想控制实例的数量,或有时候不允许存在多实例时,单例模式就派上用场了。

为了更好的讲解单例模式,我们先使用 Java 来描述它,之后回到 Go 中来。

image

通过该类图我们可以看出,实现一个单例模式有如下要求:

  • 私有、静态的类实例变量;
  • 构造函数私有化;
  • 静态工厂方法,返回此类的唯一实例;

根据实例化的时机,单例模式一般分成饿汉式和懒汉式。

  • 饿汉式:在定义 instance 时直接实例化,private static Singleton instance = new Singleton();
  • 懒汉式:在 getInstance 方法中进行实例化;

那两者有什么区别或优缺点?饿汉式单例类在自己被加载时就将自己实例化。即便加载器是静态的,饿汉式单例类被加载时仍会将自己实例化。单从资源利用率角度讲,这个比懒汉式单例类稍差些。从速度和反应时间角度讲,则比懒汉式单例类稍好些。然而,懒汉式单例类在实例化时,必须处理好在多个线程同时首次引用此类时的访问限制问题,特别是当单例类作为资源控制器在实例化时必须涉及资源初始化,而资源初始化很有可能耗费时间。这意味着出现多线程同时首次引用此类的几率变得较大。

4.6.1.2 单例模式的 Java 实现

结合上面的讲解,以一个计数器为例,我们看看 Java 中饿汉式的实现:

  1. public class Singleton {
  2. private static final Singleton instance = new Singleton();
  3. private int count = 0;
  4. private Singleton() {}
  5. public static Singleton getInstance() {
  6. return instance;
  7. }
  8. public int Add() int {
  9. this.count++;
  10. return this.count;
  11. }
  12. }

代码很简单,不过多解释。直接看懒汉式的实现:

  1. public class Singleton {
  2. private static Singleton instance = null;
  3. private int count = 0;
  4. private Singleton() {}
  5. public static synchronized Singleton getInstance() {
  6. if (instance == null) {
  7. instance = new Singleton();
  8. }
  9. return instance;
  10. }
  11. public int Add() int {
  12. this.count++;
  13. return this.count;
  14. }
  15. }

主要区别在于 getInstance 的实现,要注意 synchronized ,避免多线程时出现问题。

4.6.1.3 单例模式的 Go 实现

回到 Go 语言,看看 Go 语言如何实现单例。

  1. // 饿汉式单例模式
  2. package singleton
  3. type singleton struct {
  4. count int
  5. }
  6. var Instance = new(singleton)
  7. func (s *singleton) Add() int {
  8. s.count++
  9. return s.count
  10. }

前面说了,Go 只支持部分面向对象的特性,因此看起来有点不太一样:

  • 类(结构体 singleton)本身非公开(小写字母开头,非导出);
  • 没有提供导出的 GetInstance 工厂方法(Go 没有静态方法),而是直接提供包级导出变量 Instance;

这样使用:

  1. c := singleton.Instance.Add()

看看懒汉式单例模式在 Go 中如何实现:

  1. // 懒汉式单例模式
  2. package singleton
  3. import (
  4. "sync"
  5. )
  6. type singleton struct {
  7. count int
  8. }
  9. var (
  10. instance *singleton
  11. mutex sync.Mutex
  12. )
  13. func New() *singleton {
  14. mutex.Lock()
  15. if instance == nil {
  16. instance = new(singleton)
  17. }
  18. mutex.Unlock()
  19. return instance
  20. }
  21. func (s *singleton) Add() int {
  22. s.count++
  23. return s.count
  24. }

代码多了不少:

  • 包级变量变成非导出(instance),注意这里类型应该用指针,因为结构体的默认值不是 nil;
  • 提供了工厂方法,按照 Go 的惯例,我们命名为 New();
  • 多 goroutine 保护,对应 Java 的 synchronized,Go 使用 sync.Mutex;

关于懒汉式有一个“双重检查”,这是 C 语言的一种代码模式。

在上面 New() 函数中,同步化(锁保护)实际上只在 instance 变量第一次被赋值之前才有用。在 instance 变量有了值之后,同步化实际上变成了一个不必要的瓶颈。如果能够有一个方法去掉这个小小的额外开销,不是更加完美吗?因此出现了“双重检查”。看看 Go 如何实现“双重检查”,只看 New() 代码:

  1. func New() *singleton {
  2. if instance == nil { // 第一次检查(①)
  3. // 这里可能有多于一个 goroutine 同时达到(②)
  4. mutex.Lock()
  5. // 这里每个时刻只会有一个 goroutine(③)
  6. if instance == nil { // 第二次检查(④)
  7. instance = new(singleton)
  8. }
  9. mutex.Unlock()
  10. }
  11. return instance
  12. }

有读者可能看不懂上面代码的意思,这里详细解释下。假设 goroutine X 和 Y 作为第一批调用者同时或几乎同时调用 New 函数。

  1. 因为 goroutine X 和 Y 是第一批调用者,因此,当它们进入此函数时,instance 变量是 nil。因此 goroutine X 和 Y 会同时或几乎同时到达位置 ①;
  2. 假设 goroutine X 会先达到位置 ②,并进入 mutex.Lock() 达到位置 ③。这时,由于 mutex.Lock 的同步限制,goroutine Y 无法到达位置 ③,而只能在位置 ② 等候;
  3. goroutine X 执行 instance = new(singleton) 语句,使得 instance 变量得到一个值,即对 singleton 实例的引用。此时,goroutine Y 只能继续在位置 ② 等候;
  4. goroutine X 释放锁,返回 instance,退出 New 函数;
  5. goroutine Y 进入 mutex.Lock(),到达位置 ③,进而到达位置 ④。由于 instance 变量已经不是 nil,因此 goroutine Y 释放锁,返回 instance 所引用的 singleton 实例(也就是 goroutine X 锁创建的 singleton 实例),退出 New 函数;

到这里,goroutine X 和 Y 得到了同一个 singleton 实例。可见上面的 New 函数中,锁仅用来避免多个 goroutine 同时实例化 singleton。

相比前面的版本,双重检查版本,只要 instance 实例化后,锁永远不会执行了,而前面版本每次调用 New 获取实例都需要执行锁。性能很显然,我们可以基准测试来验证:(双重检查版本 New 重命名为 New2)

  1. package singleton_test
  2. import (
  3. "testing"
  4. "github.com/go-programming-tour-book/go-demo/singleton"
  5. )
  6. func BenchmarkNew(b *testing.B) {
  7. for i := 0; i < b.N; i++ {
  8. singleton.New()
  9. }
  10. }
  11. func BenchmarkNew2(b *testing.B) {
  12. for i := 0; i < b.N; i++ {
  13. singleton.New2()
  14. }
  15. }

因为是单例,所以两个基准测试需要分别执行。

New1 的结果:

  1. $ go test -benchmem -bench ^BenchmarkNew$ github.com/go-programming-tour-book/go-demo/singleton
  2. goos: darwin
  3. goarch: amd64
  4. pkg: github.com/go-programming-tour-book/go-demo/singleton
  5. BenchmarkNew-8 80470467 14.0 ns/op 0 B/op 0 allocs/op
  6. PASS
  7. ok github.com/go-programming-tour-book/go-demo/singleton 1.151s

New2 的结果:

  1. $ go test -benchmem -bench ^BenchmarkNew2$ github.com/go-programming-tour-book/go-demo/singleton
  2. goos: darwin
  3. goarch: amd64
  4. pkg: github.com/go-programming-tour-book/go-demo/singleton
  5. BenchmarkNew2-8 658810392 1.80 ns/op 0 B/op 0 allocs/op
  6. PASS
  7. ok github.com/go-programming-tour-book/go-demo/singleton 1.380s

New2 快十几倍。

Go 语言单例模式,推荐一般优先考虑使用饿汉式。

4.6.2 广播器的实现

本章第 6 节我们看过广播器结构的定义:

  1. // broadcaster 广播器
  2. type broadcaster struct {
  3. // 所有聊天室用户
  4. users map[string]*User
  5. // 所有 channel 统一管理,可以避免外部乱用
  6. enteringChannel chan *User
  7. leavingChannel chan *User
  8. messageChannel chan *Message
  9. // 判断该昵称用户是否可进入聊天室(重复与否):true 能,false 不能
  10. checkUserChannel chan string
  11. checkUserCanInChannel chan bool
  12. }

很显然,广播器全局应该只有一个,所以是典型的单例。我们使用饿汉式实现。

  1. var Broadcaster = &broadcaster{
  2. users: make(map[string]*User),
  3. enteringChannel: make(chan *User),
  4. leavingChannel: make(chan *User),
  5. messageChannel: make(chan *Message, MessageQueueLen),
  6. checkUserChannel: make(chan string),
  7. checkUserCanInChannel: make(chan bool),
  8. }

导出的 Broadcaster 代表广播器的唯一实例,通过 logic.Broadcaster 来使用这个单例。

在本章第 4 节时提到了通过如下语句启动广播器:

  1. go logic.Broadcaster.Start()

现在看看 Start 的具体实现:

  1. // logic/broadcast.go
  2. // Start 启动广播器
  3. // 需要在一个新 goroutine 中运行,因为它不会返回
  4. func (b *broadcaster) Start() {
  5. for {
  6. select {
  7. case user := <-b.enteringChannel:
  8. // 新用户进入
  9. b.users[user.NickName] = user
  10. b.sendUserList()
  11. case user := <-b.leavingChannel:
  12. // 用户离开
  13. delete(b.users, user.NickName)
  14. // 避免 goroutine 泄露
  15. user.CloseMessageChannel()
  16. b.sendUserList()
  17. case msg := <-b.messageChannel:
  18. // 给所有在线用户发送消息
  19. for _, user := range b.users {
  20. if user.UID == msg.User.UID {
  21. continue
  22. }
  23. user.MessageChannel <- msg
  24. }
  25. case nickname := <-b.checkUserChannel:
  26. if _, ok := b.users[nickname]; ok {
  27. b.checkUserCanInChannel <- false
  28. } else {
  29. b.checkUserCanInChannel <- true
  30. }
  31. }
  32. }
  33. }

核心关注的知识点:

  • 需要在一个新 goroutine 中进行,因为它不会返回。注意这里并非说,只要不会返回的函数/方法就应该在新的 goroutine 中运行,虽然大部分情况是这样;
  • Go 有一个最佳实践:应该让调用者决定并发(启动新 goroutine),这样它清楚自己在干什么。Start 的设计遵循了这一实践,没有自己内部开启新的 goroutine;
  • for + select 形式,是 Go 中一种较常用的编程模式,可以不断监听各种 channel 的状态,有点类似 Unix 系统的 select 系统调用;
  • 每新开一个 goroutine,你必须知道它什么时候会停止。这一句 user.CloseMessageChannel() 就涉及到 goroutine 的停止,避免泄露;

4.6.2.1 select-case 结构

Go 中有一个专门为 channel 设计的 select-case 分支流程控制语法。 此语法和 switch-case 分支流程控制语法很相似。 比如,select-case 流程控制代码块中也可以有若干 case 分支和最多一个 default 分支。 但是,这两种流程控制也有很多不同点。在一个 select-case 流程控制中:

  • select 关键字和 { 之间不允许存在任何表达式和语句;
  • fallthrough 语句不能使用;
  • 每个 case 关键字后必须跟随一个 channel 接收数据操作或者一个 channel 发送数据操作,所以叫做专门为 channel 设计的;
  • 所有的非阻塞 case 操作中将有一个被随机选择执行(而不是按照从上到下的顺序),然后执行此操作对应的 case 分支代码块;
  • 在所有的 case 操作均阻塞的情况下,如果 default 分支存在,则 default 分支代码块将得到执行; 否则,当前 goroutine 进入阻塞状态;

所以,广播器的 Start 方法中,当所有 case 操作都阻塞时,Start 方法所在的 goroutine 进入阻塞状态。

另外,根据以上规则,一个不含任何分支的 select-case 代码块 select{} 将使当前 goroutine 处于永久阻塞状态,这可以用于一些服务开发中,如果你见到了 select{} 这样的写法不要惊讶了。比如:

  1. func main() {
  2. go func() {
  3. // 该函数不会退出
  4. for {
  5. // 省略代码
  6. }
  7. } ()
  8. select {}
  9. }

这样保证 main goroutine 永远阻塞,让其他 goroutine 运行。但如果除了当前因为 select{} 阻塞的 goroutine 外,没有其他可运行的 goroutine,会导致死锁。因此下面的代码会死锁:

  1. func main() {
  2. select {}
  3. }

运行报错:

fatal error: all goroutines are asleep - deadlock!

4.6.2.2 goroutine 泄露

在 Go 中,goroutine 的创建成本低廉且调度效率高。Go 运行时能很好的支持具有成千上万个 goroutine 的程序运行,数十万个也并不意外。但是,goroutine 在内存占用方面却需要谨慎,内存资源是有限的,因此你不能创建无限的 goroutine。

每当你在程序中使用 go 关键字启动 goroutine 时,你必须知道该 goroutine 将在何时何地退出。如果你不知道答案,那可能会内存泄漏。

我们回过头梳理下聊天室项目有哪些新启动的 goroutine。

1)启动广播器

  1. // 广播消息处理
  2. go logic.Broadcaster.Start()

我们很清楚,该广播器的生命周期是和程序生命周期一致的,因此它不应该结束。

2)负责给用户发送消息的 goroutine

在 WebSocketHandleFunc 函数中:

  1. // 2. 开启给用户发送消息的 goroutine
  2. go user.SendMessage(req.Context())

user.SendMessage 的具体实现是:

  1. func (u *User) SendMessage(ctx context.Context) {
  2. for msg := range u.MessageChannel {
  3. wsjson.Write(ctx, u.conn, msg)
  4. }
  5. }

根据 for-range 用于 channel 的语法,默认情况下,for-range 不会退出。很显然,如果我们不做特殊处理,这里的 goroutine 会一直存在。而实际上,当用户离开聊天室时,它对应连接的写 goroutine 应该终止。这也就是上面 Start 方法中,在用户离开聊天室的 channel 收到消息时,要将用户的 MessageChannel 关闭的原因。MessageChannel 关闭了,for msg := range u.MessageChannel 就会退出循环,goroutine 结束,避免了内存泄露。

3)库开启的 goroutine

在本章第 1 节,我们用 TCP 实现简单聊天室时,每一个用户到来,都会新开启一个 goroutine 服务该用户。在我们的 WebSocket 聊天室中,这个新开启 goroutine 的动作,由库给我们做了(具体是 net/http 库)。也许你不明白为什么是 http 库开启的,这里教大家一个思考思路。

一个程序能够长时间运行而不停止,肯定是程序里有死循环。在本章第 1 节中,我们自己写了一个死循环:

  1. func main() {
  2. listener, err := net.Listen("tcp", ":2020")
  3. if err != nil {
  4. panic(err)
  5. }
  6. go broadcaster()
  7. for {
  8. conn, err := listener.Accept()
  9. if err != nil {
  10. log.Println(err)
  11. continue
  12. }
  13. go handleConn(conn)
  14. }
  15. }

那 WebSocket 版本的聊天室的死循环在哪里呢?回到 cmd/chatroom/main.go :

  1. func main() {
  2. fmt.Printf(banner, addr)
  3. server.RegisterHandle()
  4. log.Fatal(http.ListenAndServe(addr, nil))
  5. }

很显然在不出错时,http.ListenAndServe(addr, nil) 函数调用不会返回。因为 HTTP 协议基于 TCP 协议,因此 http 库中肯定存在类似我们上面实现 tcp 聊天室时的死循环代码。

通过跟踪 http.ListenAndServe -> Server.ListenAndServe,我们找到了如下代码:

  1. func (srv *Server) ListenAndServe() error {
  2. if srv.shuttingDown() {
  3. return ErrServerClosed
  4. }
  5. addr := srv.Addr
  6. if addr == "" {
  7. addr = ":http"
  8. }
  9. ln, err := net.Listen("tcp", addr)
  10. if err != nil {
  11. return err
  12. }
  13. return srv.Serve(ln)
  14. }

这一句 ln, err := net.Listen(“tcp”, addr) 和我们自己实现时一样。接着看 Server.Serve 方法,看到 for 死循环了(只保留关键代码):

  1. func (srv *Server) Serve(l net.Listener) error {
  2. ...
  3. origListener := l
  4. l = &onceCloseListener{Listener: l}
  5. defer l.Close()
  6. ...
  7. for {
  8. rw, e := l.Accept()
  9. if e != nil {
  10. select {
  11. case <-srv.getDoneChan():
  12. return ErrServerClosed
  13. default:
  14. }
  15. ...
  16. tempDelay = 0
  17. c := srv.newConn(rw)
  18. c.setState(c.rwc, StateNew) // before Serve can return
  19. go c.serve(ctx)
  20. }
  21. }

在这个死循环的最后一句:go c.serve(ctx) ,即当有新客户端连接时,开启一个新 goroutine 为其服务。最终,根据我们定义的路由,进入相应的函数进行处理。

那由 net/http 开启的 goroutine 什么时候结束呢?根据上面的分析,该 goroutine 最终会执行到我们定义的路由处理器中。所以,当我们的处理函数返回后,该 goroutine 也就结束了。因此,我们要确保 WebSocketHandleFunc 函数是有可能返回的。通过上一节的分析知道,当用户退出聊天室或其他原因导致连接断开时,User.ReceiveMessage 中的循环都会结束,函数退出。

总结一下容易导致 goroutine 或内存泄露的场景

1)time.After

这是很多人实际遇到过的内存泄露场景。如下代码:

  1. func ProcessMessage(ctx context.Context, in <-chan string) {
  2. for {
  3. select {
  4. case s, ok := <-in:
  5. if !ok {
  6. return
  7. }
  8. // handle `s`
  9. case <-time.After(5 * time.Minute):
  10. // do something
  11. case <-ctx.Done():
  12. return
  13. }
  14. }
  15. }

在标准库 time.After 的文档中有一段说明:

等待持续时间过去,然后在返回的 channel 上发送当前时间。它等效于 NewTimer().C。在计时器触发之前,计时器不会被垃圾收集器回收。

所以,如果还没有到 5 分钟,该函数返回了,计时器就不会被 GC 回收,因此出现了内存泄露。因此大家使用 time.After 时一定要仔细,一般建议不用它,而是使用 time.NewTimer:

  1. func ProcessMessage(ctx context.Context, in <-chan string) {
  2. idleDuration := 5 * time.Minute
  3. idleDelay := time.NewTimer(idleDuration)
  4. // 这句必须的
  5. defer idleDelay.Stop()
  6. for {
  7. idleDelay.Reset(idleDuration)
  8. select {
  9. case s, ok := <-in:
  10. if !ok {
  11. return
  12. }
  13. // handle `s`
  14. case <-idleDelay.C:
  15. // do something
  16. case <-ctx.Done():
  17. return
  18. }
  19. }
  20. }

2)发送到 channel 阻塞导致 goroutine 泄露

假如存在如下的程序:

  1. func process(term string) error {
  2. // 创建一个在 100 ms 内取消的 context
  3. ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
  4. defer cancel()
  5. // 为 goroutine 创建一个传递结果的 channel
  6. ch := make(chan string)
  7. // 启动一个 goroutine 来寻找记录,然后得到结果
  8. // 并将返回值从 channel 中传回
  9. go func() {
  10. ch <- search(term)
  11. }()
  12. select {
  13. case <-ctx.Done():
  14. return errors.New("search canceled")
  15. case result := <-ch:
  16. fmt.Println("Received:", result)
  17. return nil
  18. }
  19. }
  20. // search 模拟成一个查找记录的函数
  21. // 在查找记录时。执行此工作需要 200 ms。
  22. func search(term string) string {
  23. time.Sleep(200 * time.Millisecond)
  24. return "some value"
  25. }

这是一个挺常见的场景:要进行一些耗时操作,因此开启一个 goroutine 进行处理,它的处理结果,通过 channel 回传给原来的 goroutine;同时,这个耗时操作不能太长,因此有了 WithTimeout Context。最后通过 select-case 来监控 ctx.Done 和传递数据的 channel 是否就绪。

如果超时没处理完,ctx.Done 会执行,函数返回,新开启的 goroutine 会因为 channel 中的另一端没有就绪的接收 goroutine 而一直阻塞,导致 goroutine 泄露。

解决这种因为发送到 channel 阻塞导致 goroutine 泄露的简单办法是将 channel 改为有缓冲的 channel,并保证容量充足。比如上面例子,将 ch 改为:ch := make(chan string, 1) 即可。

3)从 channel 接收阻塞导致 goroutine 泄露

我们聊天室可能导致 goroutine 泄露就属于这种情况。

  1. func (u *User) SendMessage(ctx context.Context) {
  2. for msg := range u.MessageChannel {
  3. wsjson.Write(ctx, u.conn, msg)
  4. }
  5. }

for-range 循环直到 MessageChannel 这个 channel 关闭才会结束,因此需要有地方调用 close(u.MessageChannel)。

这种情况的另一种情形是:虽然没有 for-range,但给 channel 发送数据的一方已经不再发送数据了,接收的一方还在等待,这个等待会无限持续下去。唯一能取消它等待的就是 close 这个 channel。

4.6.2.3 广播器和外界的通信

从广播器的结构定义知道,它和其他 goroutine 的通信通过 channel 进行。判断用户是否存在的方式前面讲解了,这里看用户进入、离开和消息的通信。

  1. func(b *broadcaster) UserEntering(u *User) {
  2. b.enteringChannel <- u
  3. }
  4. func(b *broadcaster) UserLeaving(u *User) {
  5. b.leavingChannel <- u
  6. }
  7. func(b *broadcaster) Broadcast(msg *Message) {
  8. b.messageChannel <- msg
  9. }

通过 channel 和其他 goroutine 通信,可以有几种方式,以用户进入聊天室为例。

方式一:

在 broadcast.go 中定义导出的 channel:var EnteringChannel = make(chan *User) 或者还是作为 broadcaster 的字段,但是导出的,各个 goroutine 都可以直接对 EnteringChannel 进行读写。这种方式显然不好,用面向对象说法,封装性不好,容易被乱用。

方式二:

broadcaster 结构和现在不变,通过方法将 enteringChannel 暴露出去:

  1. func (b *broadcaster) EnteringChannel() chan<- *User {
  2. return b.enteringChannel
  3. }

前面讲过单向 channel,该方法的返回值类型:chan<- *User 就是一个单向 channel,它是只写的(only send channel),这限制了外部 goroutine 使用它的方式:只能往 channel 写数据,读取由我自己负责。

使用方式:logic.Broadcaster.EnteringChannel() <- user 。

整体上这种方式没有大问题,只是使用方式有点别扭。

方式三:

这种方式就是我们目前采用的方式,对外完全隐藏 channel,调用方不需要知道有 channel 的存在,只是感觉在做普通的方法调用。channel 的处理由内部自己处理,保证了安全性。这种方式比较优雅。

User 中的 MessageChannel 我们没有采用这种方式,而是使用了方式一,让大家感受一下两种方式的不同。读者可以试着改为方式三的形式。

回到 Start 的循环中,这是在 broadcaster goroutine 中运行的,负责循环接收各个 channel 发送的数据,根据不同的 channel 处理不同的业务逻辑。

4.6.3 小结

本节讲解了单例模式,以及在 Go 中如何实现单例模式。

在讲解广播器的具体实现时引出了一个很重要的知识点:goroutine 泄漏,详细讲解了各种可能泄露的场景,读者在实际项目中一定要注意。

至此,一个 WebSocket 聊天室就实现了,但功能相对比较简单。下节我们会实现聊天室的一些非核心功能。

本图书由 煎鱼©2020 版权所有,所有文章采用知识署名-非商业性使用-禁止演绎 4.0 国际进行许可。

4.6 实现聊天室:广播器 - 图2