插件开发教程

Bifrost v1.1.x 版本开始提供接口,在 linux 下支持通过 go1.12+ 版本的 plugin 包生成的 so 进行动态的加载到正在运行的 Bifrost 进程中去

Bifrost 本身提供了相关的接口进行插件注入,也同样提供了相关的测试接口,方便插件开过程中进行调试

创建插件目录

  • 在 github.com/brokercap/Bifrost/plugin 目录下创建和插件名称一致的目录名称(当然这个你也可以在自己的新建一个项目开发插件,但这个目录名必须和插件名保持一致)

拿redis插件作为案例来讲,创建redis 目录

  1. github.com/brokercap/Bifrost/plugin/redis
  • 在插件目录下子目录及文件
  1. -rwxr-xr-x 1 root root 90 Mar 18 18:38 redis.go
  2. -rwxr-xr-x 1 root root 14732 Mar 27 10:30 redisTest.go
  3. drwxr-xr-x 0 root root 0 Mar 26 18:41 src
  4. drwxr-xr-x 0 root root 0 Mar 26 08:58 www

src 目录

  1. -rwxr-xr-x 1 root root 5783 Mar 26 18:41 redis.go

www 目录

  1. -rwxr-xr-x 1 root root 1325 Mar 26 08:58 doc.html
  2. -rwxr-xr-x 1 root root 3864 Mar 18 10:57 redis.html
  3. -rwxr-xr-x 1 root root 1839 Mar 20 08:50 redis.js
  1. **src** 是插件go源码处理逻辑所放目录,当然这个你可以根据自己的意愿改,只要在编译的时候,指定这个目录就行了
  2. **www** 是插件存放htmljs等静态文件的目录,这个目录名必须为www, 当编译完运行的时候,需要前 当前插件目录及www拷贝到 Bifrost 运行目录下的 plugin
  3. redis.html redis.js 两个文件名必须和插件名保持一致,在Bifrsot 管理界面进行加载插件界面的时候,是通过插件名 + .html ,+ .js 进行加载的,
  4. 先加载html 再加载 js
  5. doc.html 为插件使用手册界面
  6. #### 插件接口

github.com/brokercap/Bifrost/plugin/driver

  1. - **往bifrost注册插件**

func init(){ driver.Register(“redis”,&MyConn{},VERSION,BIFROST_VERION)}

  1. redis : 插件名称,这个名称必须和插件目录名保持一致,以及www目录下的htmljs文件名
  2. &MyConn{} : 插件提供bifrost调用的 对象
  3. VERSION : 当前自己开发的插件版本
  4. BIFROST_VERION : 当前开发时候bifrost的版本
  5. - **MyConn 实现接口**

type MyConn struct {}

func (MyConn *MyConn) Open(uri string) driver.ConnFun{ return newConn(uri)}

func (MyConn *MyConn) GetUriExample() string{ return “pwd@tcp(127.0.0.1:6379)/0 or 127.0.0.1:6379”}

func (MyConn *MyConn) CheckUri(uri string) error{ c:= newConn(uri) if c.err != nil{ return c.err } c.Close() return nil}

  1. Open : Bifrost调用插件接口
  2. GetUriExample :提供插件连接uri,比如连接 redis ip+port
  3. CheckUri :用于bifrost调用,确认用户输入的连接地址是否有效
  4. - **实现传输数据的接口**

type Conn struct { Uri string}

func (This *Conn) SetParam(p interface{}) (interface{},error){}

func (This Conn) Connect() bool {}func (This Conn) Insert(data driver.PluginDataType) (driver.PluginBinlog,error) {}

func (This Conn) Update(data driver.PluginDataType) (driver.PluginBinlog,error) {}func (This Conn) Del(data driver.PluginDataType) (driver.PluginBinlog,error) {}func (This Conn) Query(data driver.PluginDataType) (driver.PluginBinlog,error) {}func (This Conn) Commit() (*driver.PluginBinlog,error){}

  1. Connect : 用户在调用 MyConn 对象Open方法的时候,会进行进行Connet的方法,这个方法里进行连接Redis等第三方服务
  2. SetParam :
  3. 每一次执行Insrt,Update,Delete,QueryCommit 接口之行,都会执行一次SetParam. 每个表第一次执行SetParam的时时候,提交的是
  4. *用户在插件界面配置的参数信息。传进来的参数是由插件js ,方法doGetPluginParam() 里返回的result.data 的数据*.. 数据结构为 map[string]interface{}
  5. 但是在第二次以后执行的时候,传入的是 由上一次SetParam 接口返回的 第一个结果值.
  6. 例如A同步数据.伪代码如下:

var tableParamOriginal = map[sting]interface{}var tableParam interface{}var err errorfor{ slect{ case data := <- ch pluginObj := GetPlugin(ToServerKey) if tableParam == nil{ tableParam,err = pluginObj.SetParam(tableParamOriginal) } switch data.EventType{ case “insert”: pluginObj.Insert(data) break case “update”: pluginObj.Update(data) break case “delete”: pluginObj.Delete(data) break case “sql”: pluginObj.Query(data) break } break

  1. case <-time.After(5 * time.Second):
  2. pluginObj := GetPlugin(ToServerKey)
  3. if tableParam == nil{
  4. tableParam,err = pluginObj.SetParam(tableParamOriginal)
  5. }
  6. pluginObj.Commit()
  7. }
  8. }

}

````

编写插件的时候,应该判断一下SetParam传进来的是什么数据结构,进行优化

因为可以自行在SetParam 接口里接传进来的map数据结构转面struct之后再以指针的时候再返回回去,这样下一次数据数据同步的时候,就只要传递指标了.

以 Redis 插件为例:

  1. func (This *Conn) GetParam(p interface{}) (*PluginParam,error){
  2. s,err := json.Marshal(p)
  3. if err != nil{
  4. return nil,err
  5. }
  6. var param PluginParam
  7. err2 := json.Unmarshal(s,&param)
  8. if err2 != nil{
  9. return nil,err2
  10. }
  11. This.p = &param
  12. return &param,nil
  13. }
  14. func (This *Conn) SetParam(p interface{}) (interface{},error){
  15. if p == nil{
  16. return nil,fmt.Errorf("param is nil")
  17. }
  18. switch p.(type) {
  19. case *PluginParam:
  20. This.p = p.(*PluginParam)
  21. return p,nil
  22. default:
  23. return This.GetParam(p)
  24. }
  25. }

Insert : 是数据库中新增数据的情况下会进行回调的方法,数据结构为github.com/brokercap/Bifrost/plugin/driver 下面的 PluginBinlog

Update :在数据进行修改的时候,回调的方法

Delete : 数据被删除的时候回调的方法

Query : Binlog 中记录的日志为sql 格式的时候,比如alter等语句,进行回调的方法

Commit :假如连续5秒都没有数据更新,回调一次这个方法

插件接口数据结构

  1. type PluginDataType struct {
  2. Timestamp uint32
  3. EventType string
  4. Rows []map[string]interface{}
  5. Query string
  6. SchemaName string
  7. TableName string
  8. BinlogFileNum int
  9. BinlogPosition uint32
  10. }

TimeStamp : 事件发生的时间戳

EventType :事件类型,insert,uupdate,delete,sql

Rows :在事件类型为 insert ,update,delete 的时候,这个数组里会有数据,insert , delete ,只有有Rows[0],但是在update类型的时候,Rows[0]为更新之前的数据,Rows[1] 为更新之后的数据。

Query : 当EventType == sql 的时候,这个字段不为空,存储的是执行的sql语句

SchemaName: 数据名名称

TableName : 表名

BinlogFileNum : 二进制日志文件后缀名,去掉了多余了000,比如二进制文件为 mysql-bin.000070,而这个字段存的是70

BinlogPosition : 二进制位点

  1. type PluginBinlog struct {
  2. BinlogFileNum int
  3. BinlogPosition uint32
  4. }

BinlogFileNum : 对标PluginDataType.BinlogFileNumBinlogPosition : 对标PluginDataType.BinlogPosition

其他函数

  1. func TransfeResult(val string, data *PluginDataType,rowIndex int) string {}

这个函数是由插件调用,将用户input输入的参数里的{$SchemaName} 等标签进行替换的方法

val : 拥有标签的字符串

data : 事件数据

rowIndex : 这个为Rows[] 里的下标值,代表用哪一个Rows[rowIndex]里的数据进行替换指定的标签

{$TableName} : 会被替换成 data.TableName的值

{$SchemaName} : 会被替换成 data.SchemaName

{$EventType} : 会被替换成 data.EventType

{$字段名} : 会被替换成 个Rows[rowIndex][字段名] 的值

  • 插件开发测试
  1. github.com/brokercap/Bifrost/sdk/pluginTestData

pluginTestData 包提供了 新增,更新,删除,及 SQL 事件的模拟数据

在src 下新增 *_test.go 文件 进行单元测试,案例如下:

  1. package src_test
  2. import (
  3. "testing"
  4. "log"
  5. "github.com/brokercap/Bifrost/sdk/pluginTestData"
  6. MyPlugin "github.com/brokercap/Bifrost/plugin/redis/src"
  7. "github.com/brokercap/Bifrost/test/pluginTest"
  8. //"github.com/garyburd/redigo/redis"
  9. "github.com/go-redis/redis"
  10. "fmt"
  11. "strings"
  12. )
  13. var url string = "10.40.2.41:6379"
  14. var redisConn *redis.Client
  15. func getParam() map[string]interface{}{
  16. p := make(map[string]interface{},0)
  17. p["KeyConfig"] = "{$SchemaName}-{$TableName}-{$id}"
  18. p["DataType"] = "json"
  19. p["Type"] = "set"
  20. p["DataType"] = "json"
  21. return p
  22. }
  23. func initRedisConn() error{
  24. redisConn = redis.NewClient(&redis.Options{
  25. Addr: url,
  26. Password: "", // no password set
  27. DB: 0,
  28. })
  29. if redisConn == nil{
  30. return fmt.Errorf("connect error")
  31. }
  32. return nil
  33. }
  34. func TestChechUri(t *testing.T){
  35. var url string = "127.0.0.1:6379"
  36. myConn := MyPlugin.MyConn{}
  37. if err := myConn.CheckUri(url);err!= nil{
  38. log.Println("TestChechUri err:",err)
  39. }else{
  40. log.Println("TestChechUri success")
  41. }
  42. }
  43. func TestSetParam(t *testing.T){
  44. myConn := MyPlugin.MyConn{}
  45. conn := myConn.Open(url)
  46. conn.SetParam(getParam())
  47. }
  48. func TestInsert(t *testing.T){
  49. myConn := MyPlugin.MyConn{}
  50. conn := myConn.Open(url)
  51. conn.SetParam(getParam())
  52. conn.Insert(pluginTest.GetTestInsertData())
  53. log.Println("test over")
  54. }
  55. func TestUpate(t *testing.T){
  56. myConn := MyPlugin.MyConn{}
  57. conn := myConn.Open(url)
  58. conn.SetParam(getParam())
  59. conn.Insert(pluginTest.GetTestUpdateData())
  60. log.Println("test over")
  61. }
  62. func TestDelete(t *testing.T){
  63. myConn := MyPlugin.MyConn{}
  64. conn := myConn.Open(url)
  65. conn.SetParam(getParam())
  66. conn.Insert(pluginTest.GetTestDeleteData())
  67. log.Println("test over")
  68. }
  69. func TestQuery(t *testing.T){
  70. myConn := MyPlugin.MyConn{}
  71. conn := myConn.Open(url)
  72. conn.SetParam(getParam())
  73. conn.Insert(pluginTest.GetTestQueryData())
  74. log.Println("test over")
  75. }
  76. func TestCommit(t *testing.T){
  77. myConn := MyPlugin.MyConn{}
  78. conn := myConn.Open(url)
  79. conn.Commit()
  80. log.Println("test over")
  81. }
  82. func TestCheckData(t *testing.T){
  83. var err error
  84. err = initRedisConn()
  85. if err!=nil{
  86. t.Fatal(err)
  87. }
  88. myConn := MyPlugin.MyConn{}
  89. conn := myConn.Open(url)
  90. conn.SetParam(getParam())
  91. e := pluginTestData.NewEvent()
  92. var checkResult map[string][]string
  93. t.Log("")
  94. t.Log("insert test start")
  95. insertData := e.GetTestInsertData()
  96. //log.Println(insertData)
  97. _, err = conn.Insert(insertData)
  98. if err != nil{
  99. t.Fatal(err)
  100. }
  101. var key string
  102. key = insertData.SchemaName+"-"+insertData.TableName+"-"+fmt.Sprint(insertData.Rows[0]["id"])
  103. var c string
  104. c,err = redisConn.Get( key).Result()
  105. if err!=nil{
  106. t.Fatal(err)
  107. }
  108. checkResult,err = e.CheckData(insertData.Rows[0],c)
  109. if err != nil{
  110. log.Fatal(err)
  111. t.Fatal(err)
  112. }
  113. for _,v := range checkResult["ok"]{
  114. t.Log(v)
  115. }
  116. for _,v := range checkResult["error"]{
  117. t.Error(v)
  118. }
  119. t.Log("")
  120. t.Log("update test start")
  121. updateData := e.GetTestUpdateData()
  122. _, err = conn.Update(updateData)
  123. if err != nil{
  124. t.Fatal(err)
  125. }
  126. key = updateData.SchemaName+"-"+updateData.TableName+"-"+fmt.Sprint(updateData.Rows[1]["id"])
  127. c,err = redisConn.Get( key).Result()
  128. if err!=nil{
  129. t.Fatal(err)
  130. }
  131. checkResult,err = e.CheckData(updateData.Rows[1],c)
  132. if err != nil{
  133. t.Fatal(err)
  134. }
  135. for _,v := range checkResult["ok"]{
  136. t.Log(v)
  137. }
  138. for _,v := range checkResult["error"]{
  139. t.Error(v)
  140. }
  141. t.Log("")
  142. t.Log("delete test start")
  143. deleteData := e.GetTestDeleteData()
  144. _, err = conn.Del(deleteData)
  145. if err != nil{
  146. t.Fatal(err)
  147. }
  148. key = deleteData.SchemaName+"-"+deleteData.TableName+"-"+fmt.Sprint(deleteData.Rows[0]["id"])
  149. c,err = redisConn.Get( key).Result()
  150. if strings.Contains(fmt.Sprint(err),"redis: nil") {
  151. t.Log("key:",key, " delete success")
  152. }else{
  153. t.Error("key:",key, " delete error,",err)
  154. }
  155. log.Println("test over")
  156. }
  157. //模拟正式环境刷数据
  158. func TestSyncLikeProduct(t *testing.T) {
  159. p := pluginTestData.NewPlugin("redis",url)
  160. err0 := p.SetParam(getParam())
  161. if err0 != nil{
  162. t.Fatal(err0)
  163. }
  164. var n uint = 10
  165. err := p.DoTestStart(n)
  166. if err != nil{
  167. t.Fatal(err)
  168. }else{
  169. t.Log("test success")
  170. }
  171. }
  172. //模拟正式环境性能测试(只随机生成一条数据。循环提交)
  173. func TestSyncLikeProductForSpeed(t *testing.T) {
  174. p := pluginTestData.NewPlugin("redis",url)
  175. err0 := p.SetParam(getParam())
  176. p.SetEventType(pluginTestData.INSERT)
  177. if err0 != nil{
  178. t.Fatal(err0)
  179. }
  180. var n uint = 100
  181. err := p.DoTestStartForSpeed(n)
  182. if err != nil{
  183. t.Fatal(err)
  184. }else{
  185. t.Log("test success")
  186. }
  187. }
  • 插件编译

和 Bifrost 源码一起静态编译

  • 修改Bifrost import_toserver.go文件
  1. github.com/brokercap/Bifrost/plugin/import_toserver.go

在import_toserver.go 文件里 import 插件源码

  • go build ./Bifrost.go

  • 将插插目录下的 www 目录拷贝到 github.com/brokercap/Bifrost/plugin/插件名/www

独立编译成so

在插件根目录新建 插件名.go 文件名,案例如下:

  1. # ll plugin/redis/
  2. total 17
  3. -rwxr-xr-x 1 root root 90 Mar 18 18:38 redis.go
  4. # cat plugin/redis/redis.go
  5. package main
  6. import (
  7. _ "github.com/brokercap/Bifrost/plugin/redis/src"
  8. )
  9. func main() {
  10. }
  1. # cd plugin/redis/
  2. # go build --buildmode=plugin -o redis.so redis.go
  3. # ll
  4. total 7885
  5. -rwxr-xr-x 1 root root 90 Mar 18 18:38 redis.go
  6. -rwxr-xr-x 1 root root 8055309 Mar 29 17:32 redis.so
  7. -rwxr-xr-x 1 root root 14732 Mar 27 10:30 redisTest.go
  8. drwxr-xr-x 0 root root 0 Mar 26 18:41 src
  9. drwxr-xr-x 0 root root 0 Mar 26 08:58 www

将生成 redis.so 及 www 拷贝到 Bifrost运行目录下的redis目录下即可

但这个只限linux 平台下,并且Bifrost 配置文件中配置 dynamic_plugin=true才有效,这个配置默认是false