测试前准备

声明

- 本文测试所用设备系统为MacOS- 模拟MQTT client行为的客户端为MQTTBOX

基本步骤流程

通过OpenEdge将数据写入TSDB具体依OpenEdge的函数计算服务实现,具体包括datapoint构造、身份信息签名及数据写入3个部分。

datapoint构造

  1. def build_datapoints(event):
  2. """
  3. function to build datapoints by event
  4. datapoint for example: {
  5. "datetime": "2018-08-10 18:15:05",
  6. "temperature": 32,
  7. "unit": ""
  8. }
  9. """
  10. datapoints = dict()
  11. datapoint = dict()
  12. datapoint['metric'] = 'temperature'
  13. datapoint['tags'] = {'unit': event['unit']}
  14. datapoint['value'] = event['temperature']
  15. timestamp = time.mktime(time.strptime(event['datetime'],
  16. '%Y-%m-%d %H:%M:%S'))
  17. datapoint['timestamp'] = str(timestamp).split('.')[0]
  18. datapoints['datapoints'] = [datapoint]
  19. return datapoints

通过上述代码即可成功构造datapoint数据(dict字典类型),其中metric、tags字段为必选字段,即构造的datapoint数据中必须包含metric和tags(TSDB要求,具体可查看TSDB API细节)。

身份信息签名

  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import calendar
  4. import datetime
  5. import json
  6. import requests
  7. import time
  8. from sign import sign
  9. # set the transport protocol
  10. TRANS_PROTOCOL = 'http://'
  11. # set http method
  12. HTTP_METHOD = 'POST'
  13. # set base url and path
  14. base_url = 'your_db.tsdb.iot.xx.baidubce.com'
  15. path = '/v1/datapoint' # write your_data to datapoint of your_db on TSDB
  16. # save the information of Access Key ID and Secret Access Key
  17. ak = 'your_ak_info'
  18. sk = 'your_sk_info'
  19. credentials = sign.BceCredentials(ak, sk)
  20. # set a http header except field 'Authorization'
  21. headers = {'Host': base_url, 'Content-Type': 'application/json;charset=utf-8'}
  22. # we don't have params in our url,so set it to None
  23. # set header fields should be signed headers_to_sign = {"host"}
  24. # invoke sign method to get a signed string
  25. sign_str = sign.sign(credentials, HTTP_METHOD, path, headers, params, headers_to_sign=headers_to_sign)

不难看出,通过上述代码即可完成身份信息的签名,需要注意的是,对身份信息签名时需要用户在百度云注册账户,并创建属于自己的AK/SK信息,具体可参考如何获取AK/SK

数据写入

  1. def access_db(http_method, url, data=None):
  2. """
  3. function to access TSDB by RESTful API(only have GET,POST,PUT now)
  4. """
  5. # invoke sign method to get a signed string
  6. sign_str = sign.sign(credentials, HTTP_METHOD, path, headers, params,
  7. headers_to_sign=headers_to_sign)
  8. # add field 'Authorization' to complete the whole http header
  9. final_headers = dict(headers.items() + {'Authorization': sign_str}.items())
  10. try:
  11. if (http_method == 'POST') and (data is not None):
  12. rsp = requests.post(url, headers=final_headers, data=json.dumps(data))
  13. elif http_method == 'GET':
  14. rsp = requests.get(url, headers=final_headers)
  15. elif (http_method == 'PUT') and (data is not None):
  16. rsp = requests.put(url, headers=final_headers, data=json.dumps(data))
  17. else:
  18. rsp = 'Bad http method or data is empty'
  19. except StandardError:
  20. raise
  21. return rsp
  22. def handler(event, context):
  23. """
  24. function handler
  25. """
  26. datapoints = build_datapoints(event)
  27. try:
  28. rsp = access_db(HTTP_METHOD, TRANS_PROTOCOL + base_url + path,
  29. datapoints)
  30. except StandardError:
  31. raise
  32. # check http response status code to confirm if we write data successfully
  33. if str(rsp.status_code) == '204':
  34. pass
  35. else:
  36. if isinstance(rsp, str):
  37. raise TypeError('Response must be a string')
  38. else:
  39. raise BaseException('Get error: ' + str(rsp.status_code))

上述函数方法中,access_db()方法将身份签名信息联合构造的datapoint数据信息一起通过POST方法写入TSDB(GET、PUT方法具体用法请参考TSDB API说明);handler()方法是整体程序的入口,用于调用access_db()方法将数据写入TSDB,并通过写入返回的状态信息判断数据是否写入成功。

测试与验证

OpenEdge Hub模块配置

  1. name: openedge-hub
  2. listen:
  3. - tcp://:1883
  4. principals:
  5. - username: 'test'
  6. password: 'be178c0543eb17f5f3043021c9e5fcf30285e557a4fc309cce97ff9ca6182912'
  7. permissions:
  8. - action: 'pub'
  9. permit: ['#']
  10. - action: 'sub'
  11. permit: ['#']

OpenEdge Function模块配置:

  1. name: openedge-function
  2. hub:
  3. address: tcp://openedge-hub:1883
  4. username: test
  5. password: hahaha
  6. rules:
  7. - id: rule-e1iluuac1
  8. subscribe:
  9. topic: data
  10. qos: 1
  11. compute:
  12. function: write
  13. publish:
  14. topic: data/tsdb
  15. qos: 1
  16. functions:
  17. - name: 'write'
  18. runtime: 'python2.7'
  19. handler: 'write.handler'
  20. codedir: 'var/db/openedge/module/func-nyeosbbch'
  21. entry: "openedge-function-runtime-python27:build"
  22. env:
  23. USER_ID: acuiot
  24. instance:
  25. min: 1
  26. max: 10
  27. timeout: 30s

通过上述配置不难发现,借助MQTTBOX向主题“data”发布消息,并通过“write”函数将该数据写入云端TSDB。

需要说明的是:为实际生产考虑,避免写入消息量过大时导致OpenEdge处理的消息过多,此处仅对写入失败进行错误信息提示,写入成功则不提示任何信息。

通过MQTTBOX查看数据写入TSDB是否成功

此外,也可以通过账户登录云端TSDB进行查看,具体如下:

通过云端TSDB查看数据是否写入成功

从上图不难看出,数据已经成功写入TSDB。

最后更新于 2018-12-28 10:23:09

原文: https://openedge.tech/docs/practice/Write-data-to-TSDB-with-OpenEdge