插件开发
插件的定义
所谓的插件,没有什么固定的规则,只需要完成安装
操作即可。插件可以实现任意的功能扩展,最常见的是实现某种传输协议用来推流或者拉流
插件的安装(注册)
下面是内置插件jessica的源码,代表了典型的插件安装
package jessica
import (
"io/ioutil"
"log"
"mime"
"net/http"
"path"
"path/filepath"
"strings"
. "github.com/Monibuca/engine/v2"
. "github.com/logrusorgru/aurora"
)
var config = new(ListenerConfig)
var publicPath string
func init() {
plugin := &PluginConfig{
Name: "Jessica",
Type: PLUGIN_SUBSCRIBER,
Config: config,
Run: run,
}
InstallPlugin(plugin)
publicPath = filepath.Join(plugin.Dir, "ui", "public")
}
func run() {
Print(Green("server Jessica start at"), BrightBlue(config.ListenAddr))
http.HandleFunc("/jessibuca/", jessibuca)
log.Fatal(http.ListenAndServe(config.ListenAddr, http.HandlerFunc(WsHandler)))
}
func jessibuca(w http.ResponseWriter, r *http.Request) {
filePath := strings.TrimPrefix(r.URL.Path, "/jessibuca")
if mime := mime.TypeByExtension(path.Ext(filePath)); mime != "" {
w.Header().Set("Content-Type", mime)
}
if f, err := ioutil.ReadFile(filepath.Join(publicPath, filePath)); err == nil {
if _, err = w.Write(f); err != nil {
w.WriteHeader(500)
}
} else {
w.WriteHeader(404)
}
}
源码说明
- init会在go项目启动最开始的时候执行,我们需要在引擎Run之前注册我们的插件。
- 注册插件,是调用引擎提供的InstallPlugin函数,传入插件的关键信息。
- 插件的名称Name必须是唯一的,只需要保证在项目中唯一即可。
- 插件的Config属性是一个自定义的结构体,只需要保证配置文件的解析和这个结构体定义一致即可。
- 当主程序读取配置文件完成解析后,会调用各个插件的Run函数,上面代码中执行了一个http的端口监听
- jessica插件的界面需要读取一些静态资源,所以利用了Gateway的http服务,我们注册了一个路由。所有插件都可以共用Gateway插件的http服务,但要注意的是路由不可以有冲突。当然插件也可以自己创建http服务,启用不同的端口号。
开发插件的UI界面
步骤:
- 创建ui目录用于存放ui的源码和编译出的文件
- 创建ui/src/App.vue作为UI界面的入口
- 编译为vue lib 名称必须为plugin-[组件名小写] 供gateway调用
例如:下面的npm命令将ui/src/App.vue导出名为plugin-jessica 的Vue lib,导出的文件在项目下的ui/dist目录。
"scripts": {
"build": "vue-cli-service build --target lib --name plugin-jessica"
}
组件的Props中可以配置插件配置项用于接收插件的配置信息 例如:
props: {
ListenAddr: String
}
Gateway插件中提供的公共组件均可以使用,例如stream-table,可供展示所有的流信息
<stream-table>
<template v-slot="scope">
<m-button @click="preview(scope)">预览</m-button>
<template>
</stream-table>
该组件有一个默认的作用域槽,可以用来扩展对每一个流的操作
Gateway插件中提供的Vuex对象,所有插件均可访问
export default new Vuex.Store({
state: {
plugins: [],
Address: location.hostname,
NetWork: [],
Streams: [],
Memory: {
Used: 0,
Usage: 0
},
CPUUsage: 0,
HardDisk: {
Used: 0,
Usage: 0
},
Children: {},
engineInfo: {},
},
mutations: {
update(state, payload) {
Object.assign(state, payload)
},
},
actions: {
fetchEngineInfo({ commit }) {
return window.ajax.getJSON(apiHost + "/api/sysInfo").then(engineInfo => commit("update", { engineInfo }))
},
fetchPlugins({ commit }) {
return window.ajax.getJSON(apiHost + "/api/plugins").then(plugins => {
plugins.sort((a, b) => a.Name > b.Name ? 1 : -1)
commit("update", { plugins })
return plugins
})
},
fetchSummary({ commit }) {
summaryES = new EventSource(apiHost + "/api/summary");
summaryES.onmessage = evt => {
if (!evt.data) return;
let summary = JSON.parse(evt.data);
summary.Address = location.hostname;
if (!summary.Streams) summary.Streams = [];
summary.Streams.sort((a, b) =>
a.StreamPath > b.StreamPath ? 1 : -1
);
commit("update", summary)
};
},
},
})
开发无UI的插件
默认就是无UI的插件
开发订阅者插件
所谓订阅者就是用来从流媒体服务器接收音视频流的程序,例如RTMP协议执行play命令后、http-flv请求响应程序、websocket响应程序。内置插件中录制flv程序也是一个特殊的订阅者。 下面是http-flv插件的源码,供参考
package hdl
import (
"log"
"net/http"
"strings"
. "github.com/Monibuca/engine/v2"
"github.com/Monibuca/engine/v2/avformat"
. "github.com/logrusorgru/aurora"
)
var config = new(ListenerConfig)
func init() {
InstallPlugin(&PluginConfig{
Name: "HDL",
Type: PLUGIN_SUBSCRIBER,
Config: config,
Run: run,
})
}
func run() {
Print(Green("HDL start at "), BrightBlue(config.ListenAddr))
log.Fatal(http.ListenAndServe(config.ListenAddr, http.HandlerFunc(HDLHandler)))
}
func HDLHandler(w http.ResponseWriter, r *http.Request) {
sign := r.URL.Query().Get("sign")
if err := AuthHooks.Trigger(sign); err != nil {
w.WriteHeader(403)
return
}
stringPath := strings.TrimLeft(r.RequestURI, "/")
if strings.HasSuffix(stringPath, ".flv") {
stringPath = strings.TrimRight(stringPath, ".flv")
}
if s := FindStream(stringPath); s != nil {
//atomic.AddInt32(&hdlId, 1)
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "video/x-flv")
w.Write(avformat.FLVHeader)
p := Subscriber{
Sign: sign,
OnData: func(packet *avformat.SendPacket) error {
return avformat.WriteFLVTag(w, packet)
},
SubscriberInfo: SubscriberInfo{
ID: r.RemoteAddr, Type: "FLV",
},
}
p.Subscribe(stringPath)
} else {
w.WriteHeader(404)
}
}
其中,核心逻辑就是创建Subscriber对象,每一个订阅者需要提供OnData函数,用来接收来自发布者广播出来的音视频数据。 最后调用该对象的Subscribe函数进行播放。请注意:Subscribe函数会阻塞当前goroutine。
开发发布者插件
所谓发布者,就是提供音视频数据的程序,例如接收来自OBS、ffmpeg的推流的程序。内置插件中,集群功能里面有一个特殊的发布者,它接收来自源服务器的音视频数据,然后在本服务器中广播音视频。 以此为例,我们需要提供一个结构体定义来表示特定的发布者:
type Receiver struct {
Publisher
io.Reader
*bufio.Writer
}
其中Publisher 是固定的,必须包含,且必须以组合继承的方式定义。其余的成员则是任意的。 发布者的发布动作需要特定条件的触发,例如在集群插件中,当本服务器有订阅者订阅了某个流,而该流并没有发布者的时候就会触发向源服务器拉流的函数:
func PullUpStream(streamPath string) {
addr, err := net.ResolveTCPAddr("tcp", config.OriginServer)
if MayBeError(err) {
return
}
conn, err := net.DialTCP("tcp", nil, addr)
if MayBeError(err) {
return
}
brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
p := &Receiver{
Reader: brw.Reader,
Writer: brw.Writer,
}
if p.Publish(streamPath) {
p.Type = "Cluster"
p.WriteByte(MSG_SUBSCRIBE)
p.WriteString(streamPath)
p.WriteByte(0)
p.Flush()
for _, v := range p.Subscribers {
p.Auth(v)
}
} else {
return
}
defer p.Cancel()
for cmd, err := brw.ReadByte(); !MayBeError(err); cmd, err = brw.ReadByte() {
switch cmd {
case MSG_AUDIO:
if t, payload, err := p.readAVPacket(avformat.FLV_TAG_TYPE_AUDIO); err == nil {
p.PushAudio(t, payload)
}
case MSG_VIDEO:
if t, payload, err := p.readAVPacket(avformat.FLV_TAG_TYPE_VIDEO); err == nil && len(payload) > 2 {
p.PushVideo(t, payload)
}
case MSG_AUTH:
cmd, err = brw.ReadByte()
if MayBeError(err) {
return
}
bytes, err := brw.ReadBytes(0)
if MayBeError(err) {
return
}
subId := strings.Split(string(bytes[0:len(bytes)-1]), ",")[0]
if v, ok := p.Subscribers[subId]; ok {
if cmd != 1 {
v.Cancel()
}
}
default:
log.Printf("unknown cmd:%v", cmd)
}
}
}
正在该函数中会向源服务器建立tcp连接,然后发送特定命令表示需要拉流,当我们接收到源服务器的数据的时候,就调用PushVideo和PushAudio函数来广播音视频。
核心逻辑是调用Publisher的Publish以及PushVideo、PushAudio函数
开发钩子插件
钩子插件就是在服务器的关键逻辑处插入的函数调用,方便扩展服务器的功能,比如对连接进行验证,或者触发一些特殊的发布者。 目前提供的钩子包括
- 当发布者开始发布时
OnPublishHooks.AddHook(onPublish)
例如:
func onPublish(r *Stream) {
for _, v := range r.Subscribers {
if err := CheckSign(v.Sign); err != nil {
v.Cancel()
}
}
}
此时可以访问房间里面的订阅者,对其进行验证。
- 当有订阅者订阅了某个流时,
OnSubscribeHooks.AddHook(onSubscribe)
例如:
func onSubscribe(s *Subscriber) {
if s.Publisher == nil {
go PullUpStream(s.StreamPath)
}
}
拉取源服务器的流