核心定义

Publisher

  1. type Publisher struct {
  2. *Stream
  3. }
  4. func (p *Publisher) Close() {
  5. if p.Running() {
  6. p.Cancel()
  7. }
  8. }
  9. // Running 发布者是否正在发布
  10. func (p *Publisher) Running() bool {
  11. return p.Stream != nil && p.Err() == nil
  12. }
  13. // Publish 发布者进行发布操作
  14. func (p *Publisher) Publish(streamPath string) bool

发布者定义必须包含Publisher,并且以组合继承的方式引入:

  1. type MyPublisher struct{
  2. Publisher
  3. }

由于Publisher也组合继承了Stream结构,所以也将可以直接调用Stream的所有方法。

  • Close函数,显式关闭房间,实际上是调用了Stream的Cancel函数
  • Running函数,用来检查发布者是否正在发布。
  • Publish函数,用来启动发布操作,传入流路径,和发布者本身
  1. // HLS 发布者
  2. type HLS struct {
  3. TS
  4. HLSInfo
  5. TsHead http.Header //用于提供cookie等特殊身份的http头
  6. SaveContext context.Context //用来保存ts文件到服务器
  7. }

在HLS的定义中,组合继承了TS,在发布HLS的时候,也需要调用TS的Publish函数,以启动相应的逻辑。

  1. func (p *HLS) Publish(streamName string) (result bool) {
  2. if result = p.TS.Publish(streamName); result {
  3. p.Type = "HLS"
  4. p.HLSInfo.TSInfo = &p.TS.TSInfo
  5. collection.Store(streamName, p)
  6. go func(){
  7. p.run(&p.HLSInfo.Video)
  8. collection.Delete(streamName)
  9. }()
  10. if p.HLSInfo.Audio.Req != nil {
  11. go p.run(&p.HLSInfo.Audio)
  12. }
  13. }
  14. return
  15. }

Stream

  1. // Stream 流定义
  2. type Stream struct {
  3. context.Context
  4. *Publisher
  5. StreamInfo //可序列化,供后台查看的数据
  6. Control chan interface{}
  7. Cancel context.CancelFunc
  8. Subscribers map[string]*Subscriber // 订阅者
  9. VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据
  10. AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据
  11. FirstScreen *Ring //最近的关键帧位置,首屏渲染
  12. AVRing *Ring //数据环
  13. WaitingMutex *sync.RWMutex //用于订阅和等待发布者
  14. UseTimestamp bool //是否采用数据包中的时间戳
  15. }
  16. func (r *Stream) PushAudio(timestamp uint32, payload []byte)
  17. func (r *Stream) PushVideo(timestamp uint32, payload []byte)
  • Stream结构体可以用来调用的函数包括:PushAudio、PushVideo,用来把发布者的数据转发到订阅者。
  • 调用Stream的Cancel函数可以强制关闭房间。
  • Stream的Publisher属性如果nil,表示房间没有发布者,处于等待状态
  • 不能直接遍历Subscribers,可能会引起并发冲突。操作Subscribers必须给Stream发送指令。

目前有三种指令,可以传递给Control 通道

  1. // UnSubscribeCmd 取消订阅命令
  2. type UnSubscribeCmd struct {
  3. *Subscriber
  4. }
  5. // SubscribeCmd 订阅房间命令
  6. type SubscribeCmd struct {
  7. *Subscriber
  8. }
  9. // ChangeRoomCmd 切换房间命令
  10. type ChangeRoomCmd struct {
  11. *Subscriber
  12. NewStream *Stream
  13. }

Subscriber

  1. // Subscriber 订阅者实体定义
  2. type Subscriber struct {
  3. context.Context
  4. *Stream
  5. SubscriberInfo
  6. OnData func(*avformat.SendPacket) error
  7. Cancel context.CancelFunc
  8. Sign string
  9. OffsetTime uint32
  10. }
  11. func (s *Subscriber) IsClosed() bool {
  12. return s.Context != nil && s.Err() != nil
  13. }
  14. // Close 关闭订阅者
  15. func (s *Subscriber) Close() {
  16. if s.Cancel != nil {
  17. s.Cancel()
  18. }
  19. }
  20. //Subscribe 开始订阅
  21. func (s *Subscriber) Subscribe(streamPath string) (err error)

订阅者结构体,订阅者不同于发布者,不需要额外定义订阅者结构体去组合继承Subscriber。只需要直接使用Subscriber对象即可。 如何实现自定义输出?就是给Subscriber设置OnData函数。

  • IsClosed 用来判断订阅者是否已关闭
  • Close 用来关闭订阅者
  • Subscribe 用来启动订阅行为,这个函数会阻塞当前协程。