开发发布者插件

所谓发布者,就是提供音视频数据的程序,例如接收来自OBS、ffmpeg的推流的程序。内置插件中,集群功能里面有一个特殊的发布者,它接收来自源服务器的音视频数据,然后在本服务器中广播音视频。以此为例,我们需要提供一个结构体定义来表示特定的发布者:

  1. type Receiver struct {
  2. InputStream
  3. io.Reader
  4. *bufio.Writer
  5. }

其中InputStream 是固定的,必须包含,且必须以组合继承的方式定义。其余的成员则是任意的。发布者的发布动作需要特定条件的触发,例如在集群插件中,当本服务器有订阅者订阅了某个流,而该流并没有发布者的时候就会触发向源服务器拉流的函数:

  1. func PullUpStream(streamPath string) {
  2. addr, err := net.ResolveTCPAddr("tcp", config.Master)
  3. if MayBeError(err) {
  4. return
  5. }
  6. conn, err := net.DialTCP("tcp", nil, addr)
  7. if MayBeError(err) {
  8. return
  9. }
  10. brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
  11. p := &Receiver{
  12. Reader: conn,
  13. Writer: brw.Writer,
  14. }
  15. if p.Publish(streamPath, p) {
  16. p.WriteByte(MSG_SUBSCRIBE)
  17. p.WriteString(streamPath)
  18. p.WriteByte(0)
  19. p.Flush()
  20. for _, v := range p.Subscribers {
  21. p.Auth(v)
  22. }
  23. } else {
  24. return
  25. }
  26. defer p.Cancel()
  27. for {
  28. cmd, err := brw.ReadByte()
  29. if MayBeError(err) {
  30. return
  31. }
  32. switch cmd {
  33. case MSG_AUDIO:
  34. if audio, err := p.readAVPacket(avformat.FLV_TAG_TYPE_AUDIO); err == nil {
  35. p.PushAudio(audio)
  36. }
  37. case MSG_VIDEO:
  38. if video, err := p.readAVPacket(avformat.FLV_TAG_TYPE_VIDEO); err == nil && len(video.Payload) > 2 {
  39. tmp := video.Payload[0] // 第一个字节保存着视频的相关信息.
  40. video.VideoFrameType = tmp >> 4 // 帧类型 4Bit, H264一般为1或者2
  41. p.PushVideo(video)
  42. }
  43. case MSG_AUTH:
  44. cmd, err = brw.ReadByte()
  45. if MayBeError(err) {
  46. return
  47. }
  48. bytes, err := brw.ReadBytes(0)
  49. if MayBeError(err) {
  50. return
  51. }
  52. subId := strings.Split(string(bytes[0:len(bytes)-1]), ",")[0]
  53. if v, ok := p.Subscribers[subId]; ok {
  54. if cmd != 1 {
  55. v.Cancel()
  56. }
  57. }
  58. }
  59. }
  60. }

正在该函数中会向源服务器建立tcp连接,然后发送特定命令表示需要拉流,当我们接收到源服务器的数据的时候,就调用PushVideo和PushAudio函数来广播音视频。

核心逻辑是调用InputStream的Publish以及PushVideo、PushAudio函数