Golang SDK

新建实时同步任务

在 Dashboard 或者通过命令行工具创建任务,数据源类型使用 Auto Push (自主推送)。

引入 Golang SDK

需要在项目中导入 SDK 的包,进行 SDK 的使用。导入方式如下:

  1. import "github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy"

数据上报流程

导入 SDK 后,通过实例化一个 Client 接口对象后,调用相关的同步( Send() )或 异步( SendAsync() )接口来完成单条数据的上报任务,SDK 内部会根据 StreamID 将相同 StreamID 的数据批量上报。发送 Demo 可参考 example_test.go。 整体流程包括以下三个步骤:

初始化 SDK

  1. client, err := dataproxy.NewClient(
  2. dataproxy.WithGroupID("test"),
  3. dataproxy.WithURL("http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList"),
  4. dataproxy.WithMetricsName("test"),
  5. )
  6. if err != nil {
  7. fmt.Println(err)
  8. return
  9. }

参数说明:

  • dataproxy.WithGroupID("test") 设置了 GroupID ;
  • dataproxy.WithURL("http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList") 设置了 Manager 的 URL ;
  • dataproxy.WithMetricsName("test") 设置了这个 Client 的指标中标签 “name” 的值;

调用发送接口进行数据上报

SDK 的数据发送接口是协程安全的,支持以同步或者异步模式发送单条数据。Demo 里有同步的方式,也有异步的方式。

同步方式:

  1. for i := 0; i < 1000; i++ {
  2. err := client.Send(context.Background(), dataproxy.Message{
  3. GroupID: "test",
  4. StreamID: "test",
  5. Payload: []byte("test|a|b|c"),
  6. })
  7. if err !=nil {
  8. fmt.Println(err)
  9. }
  10. }

异步方式:

  1. var success atomic.Uint64
  2. var failed atomic.Uint64
  3. for i := 0; i < 1000; i++ {
  4. client.SendAsync(context.Background(),
  5. dataproxy.Message{
  6. GroupID: "test",
  7. StreamID: "test",
  8. Payload: []byte("test|a|b|c"),
  9. },
  10. func(msg dataproxy.Message, err error) {
  11. if err != nil {
  12. success.Add(1)
  13. } else {
  14. failed.Add(1)
  15. }
  16. })
  17. }
  18. // wait async send finish
  19. time.Sleep(3 * time.Second)
  20. fmt.Println("success:", success.Load())
  21. fmt.Println("failed:", failed.Load())

我们推荐使用异步的方式发送,因为同步方式是无并发的,调用发送请求后需要等到响应或者超时,才能发下一条,在需要高吞吐量的场景是无法满足需求的。

关闭 SDK

关闭 SDK 只需要简单的调用 Close() 方法:

  1. client.Close()

注意事项

  • GroupIDURL 是 SDK 初始化必选的配置,你可以使用 dataproxy.WithGroupID()dataproxy.WithURL() 来设置这些配置;
  • 当你在一个进程里初始化多个 Client 实例时,MetricsName 必须配置不同的名称,否则指标查询将会失败,你可以使用 dataproxy.WithMetricsName() 来设置它;
  • SDK 的默认配置已经能够满足运营需求,除非你需要调大或者调小配置,一般不需要更改其他配置,配置的默认值请参考后面的章节;
  • SDK 的同步发送方法 Send() 是无并发的,在发送一个消息后,需要等待响应或者等待到超时才会返回,非必要的情况下不建议使用同步方法;
  • SDK 默认会对消息进行2次重传,如果依然失败,需要调用者来决定如何做下一步处理。

错误

常见错误如下:

错误描述
errors.New(“URL is not given”)Manager URL 未设置。
errors.New(“group ID is not given”)GroupID未设置。
errors.New(“invalid URL”)Manager URL 非法,可能为空。
errors.New(“invalid group ID”)GroupID 非法,可能为空。
errors.New(“service has no endpoints”)服务无端点,服务发现失败
errors.New(“no available worker”)没有可用工作者,工作者忙。
errNo{code: 10001, strCode: “10001”, message: “message send timeout”}发送超时。
errNo{code: 10002, strCode: “10002”, message: “message send failed”}发送失败。
errNo{code: 10003, strCode: “10003”, message: “producer already been closed”}生产者已关闭。
errNo{code: 10004, strCode: “10004”, message: “producer send queue is full”}发送队列满。
errNo{code: 10005, strCode: “10005”, message: “message context expired”}发送队列满,超时都未等到空闲位置。
errNo{code: 10010, strCode: “10010”, message: “input log is invalid”}输入的数据非法,可能为空。

配置项

配置项默认值描述可选
WithGroupID()“”设置GroupID
WithURL()“”设置 Manager 的URL
WithUpdateInterval()5m设置服务发现的更新时间
WithConnTimeout()3000ms设置连接超时
WriteBufferSize8M设置写缓冲区大小
WithReadBufferSize1M设置读缓冲区大小
WithSocketSendBufferSize8M设置网络发送缓冲区大小
WithSocketRecvBufferSize1M设置网络接收缓冲区大小
WithBufferPoolnil设置缓冲池是,如果应用有,建议共用
WithBytePoolnil设置内存池是,如果应用有,建议共用
WithBufferPoolSize409600设置缓冲池大小
WithBytePoolSize409600设置内存池大小
WithBytePoolWidth同BatchingMaxSize设置内存池宽度
WithLoggerstd.out设置调试日志是,不建议,默认的日志没有日志级别控制
WithMetricsName“dataproxy-go”设置指标名是,如果一个应用实例化了多个 client ,必须配置不一样的指标名,否则指标获取会失败
WithMetricsRegistryprometheus.DefaultRegisterer设置指标存储器
WithWorkerNum8设置工作者数量
WithSendTimeout30000ms设置发送超时
WithMaxRetries2设置量大重试次数
WithBatchingMaxPublishDelay20ms设置消息发送延迟,超过该时间,不能构成一个批次也会发送
WithBatchingMaxMessages50设置批次消息条数,达到条数即批量发送
WithBatchingMaxSize40K设置批次大小,达到该大小即批量发送
WithMaxPendingMessages204800设置每个工作者队列大小
WithBlockIfQueueIsFullfalse设置队列满是否阻塞
WithAddColumnsnil设置附加字段,DataProxy 支持在消息指定的位置增加字段,如 addcol1worldid=xxx 表示所有的消息的第一列都是 worldid,值为 xxx 的

配置项请参考 options.go

指标

指标名类型标签描述
data_proxy_error_countcountername:名称
code:错误码
统计发生的错误次数及错误码
data_proxy_retry_countcountername:名称
worker:工作者 ID
统计发生的重试次数及工作者 ID
data_proxy_timeout_countcountername:名称
worker:工作者 ID
统计发生的超时次数及工作者ID
data_proxy_msg_countcountername:名称
code:错误码
统计处理的消息数量和处理结果
data_proxy_update_conn_countcountername:名称
code:错误码
统计发生的连接更新次数和错误码
data_proxy_pending_msg_gaugegaugename:名称
worker:工作者 ID
统计排队中的消息数量及工作者 ID
data_proxy_batch_sizehistogramname:名称
code:错误码
统计每个批次的大小和错误码
data_proxy_batch_timehistogramname:名称
code:错误码
统计每个批次的延迟和错误码

指标请参考 metrics.go

错误码请参考 worker.go