核心定义
Publisher
type Publisher struct {
*Stream
}
func (p *Publisher) Close() {
if p.Running() {
p.Cancel()
}
}
// Running 发布者是否正在发布
func (p *Publisher) Running() bool {
return p.Stream != nil && p.Err() == nil
}
// Publish 发布者进行发布操作
func (p *Publisher) Publish(streamPath string) bool
发布者定义必须包含Publisher,并且以组合继承的方式引入:
type MyPublisher struct{
Publisher
}
由于Publisher也组合继承了Stream结构,所以也将可以直接调用Stream的所有方法。
- Close函数,显式关闭房间,实际上是调用了Stream的Cancel函数
- Running函数,用来检查发布者是否正在发布。
- Publish函数,用来启动发布操作,传入流路径,和发布者本身
// HLS 发布者
type HLS struct {
TS
HLSInfo
TsHead http.Header //用于提供cookie等特殊身份的http头
SaveContext context.Context //用来保存ts文件到服务器
}
在HLS的定义中,组合继承了TS,在发布HLS的时候,也需要调用TS的Publish函数,以启动相应的逻辑。
func (p *HLS) Publish(streamName string) (result bool) {
if result = p.TS.Publish(streamName); result {
p.Type = "HLS"
p.HLSInfo.TSInfo = &p.TS.TSInfo
collection.Store(streamName, p)
go func(){
p.run(&p.HLSInfo.Video)
collection.Delete(streamName)
}()
if p.HLSInfo.Audio.Req != nil {
go p.run(&p.HLSInfo.Audio)
}
}
return
}
Stream
// Stream 流定义
type Stream struct {
context.Context
*Publisher
StreamInfo //可序列化,供后台查看的数据
Control chan interface{}
Cancel context.CancelFunc
Subscribers map[string]*Subscriber // 订阅者
VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据
AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据
FirstScreen *Ring //最近的关键帧位置,首屏渲染
AVRing *Ring //数据环
WaitingMutex *sync.RWMutex //用于订阅和等待发布者
UseTimestamp bool //是否采用数据包中的时间戳
}
func (r *Stream) PushAudio(timestamp uint32, payload []byte)
func (r *Stream) PushVideo(timestamp uint32, payload []byte)
- Stream结构体可以用来调用的函数包括:PushAudio、PushVideo,用来把发布者的数据转发到订阅者。
- 调用Stream的Cancel函数可以强制关闭房间。
- Stream的Publisher属性如果nil,表示房间没有发布者,处于等待状态
- 不能直接遍历Subscribers,可能会引起并发冲突。操作Subscribers必须给Stream发送指令。
目前有三种指令,可以传递给Control 通道
// UnSubscribeCmd 取消订阅命令
type UnSubscribeCmd struct {
*Subscriber
}
// SubscribeCmd 订阅房间命令
type SubscribeCmd struct {
*Subscriber
}
// ChangeRoomCmd 切换房间命令
type ChangeRoomCmd struct {
*Subscriber
NewStream *Stream
}
Subscriber
// Subscriber 订阅者实体定义
type Subscriber struct {
context.Context
*Stream
SubscriberInfo
OnData func(*avformat.SendPacket) error
Cancel context.CancelFunc
Sign string
OffsetTime uint32
}
func (s *Subscriber) IsClosed() bool {
return s.Context != nil && s.Err() != nil
}
// Close 关闭订阅者
func (s *Subscriber) Close() {
if s.Cancel != nil {
s.Cancel()
}
}
//Subscribe 开始订阅
func (s *Subscriber) Subscribe(streamPath string) (err error)
订阅者结构体,订阅者不同于发布者,不需要额外定义订阅者结构体去组合继承Subscriber。只需要直接使用Subscriber对象即可。 如何实现自定义输出?就是给Subscriber设置OnData函数。
- IsClosed 用来判断订阅者是否已关闭
- Close 用来关闭订阅者
- Subscribe 用来启动订阅行为,这个函数会阻塞当前协程。