开发发布者插件
所谓发布者,就是提供音视频数据的程序,例如接收来自OBS、ffmpeg的推流的程序。内置插件中,集群功能里面有一个特殊的发布者,它接收来自源服务器的音视频数据,然后在本服务器中广播音视频。以此为例,我们需要提供一个结构体定义来表示特定的发布者:
type Receiver struct {
InputStream
io.Reader
*bufio.Writer
}
其中InputStream 是固定的,必须包含,且必须以组合继承的方式定义。其余的成员则是任意的。发布者的发布动作需要特定条件的触发,例如在集群插件中,当本服务器有订阅者订阅了某个流,而该流并没有发布者的时候就会触发向源服务器拉流的函数:
func PullUpStream(streamPath string) {
addr, err := net.ResolveTCPAddr("tcp", config.Master)
if MayBeError(err) {
return
}
conn, err := net.DialTCP("tcp", nil, addr)
if MayBeError(err) {
return
}
brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
p := &Receiver{
Reader: conn,
Writer: brw.Writer,
}
if p.Publish(streamPath, p) {
p.WriteByte(MSG_SUBSCRIBE)
p.WriteString(streamPath)
p.WriteByte(0)
p.Flush()
for _, v := range p.Subscribers {
p.Auth(v)
}
} else {
return
}
defer p.Cancel()
for {
cmd, err := brw.ReadByte()
if MayBeError(err) {
return
}
switch cmd {
case MSG_AUDIO:
if audio, err := p.readAVPacket(avformat.FLV_TAG_TYPE_AUDIO); err == nil {
p.PushAudio(audio)
}
case MSG_VIDEO:
if video, err := p.readAVPacket(avformat.FLV_TAG_TYPE_VIDEO); err == nil && len(video.Payload) > 2 {
tmp := video.Payload[0] // 第一个字节保存着视频的相关信息.
video.VideoFrameType = tmp >> 4 // 帧类型 4Bit, H264一般为1或者2
p.PushVideo(video)
}
case MSG_AUTH:
cmd, err = brw.ReadByte()
if MayBeError(err) {
return
}
bytes, err := brw.ReadBytes(0)
if MayBeError(err) {
return
}
subId := strings.Split(string(bytes[0:len(bytes)-1]), ",")[0]
if v, ok := p.Subscribers[subId]; ok {
if cmd != 1 {
v.Cancel()
}
}
}
}
}
正在该函数中会向源服务器建立tcp连接,然后发送特定命令表示需要拉流,当我们接收到源服务器的数据的时候,就调用PushVideo和PushAudio函数来广播音视频。
核心逻辑是调用InputStream的Publish以及PushVideo、PushAudio函数