Writing from Kafka

Introduction to Kafka

Apache Kafka is an open-source distributed event streaming platform widely used for high-performance data pipelines, streaming analytics, data integration, and event-driven applications. Kafka involves Producers, Consumers, and Topics: a Producer is a process that sends messages to Kafka, and a Consumer is a process that consumes messages from Kafka. For Kafka-related concepts, refer to the official documentation.
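
As a quick illustration of the two roles, here is a minimal sketch using the kafka-python client; it assumes a broker at localhost:9092 and an existing kafka-events topic. The producer publishes one message and the consumer reads it back.

```
from kafka import KafkaProducer, KafkaConsumer

# Producer: publish one message to the topic
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('kafka-events', b'hello tdengine')
producer.flush()

# Consumer: read messages from the beginning of the topic
consumer = KafkaConsumer('kafka-events',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)
    break
```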

Kafka Topic

Kafka messages are organized by topic, and each topic has one or more partitions. Topics can be managed with Kafka's kafka-topics tool.

Create a topic named kafka-events:

```
bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092
```

Change the number of partitions of kafka-events to 3:

```
bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092
```

List all topics and their partitions:

```
bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe
```

Writing to TDengine

TDengine supports both SQL-based and schemaless data ingestion. For SQL-based writes, refer to the TDengine SQL writing and TDengine efficient writing documents; for schemaless writes, refer to the TDengine schemaless writing document.
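
For reference, the following is a minimal sketch of SQL-based ingestion with the taospy connector. It assumes a local TDengine instance and that the `power` database with the `meters` super table (as in the complete example below) already exists.

```
import taos

# connect to a local TDengine instance and write one row via SQL,
# auto-creating sub-table d0 from the meters super table
conn = taos.connect(host='localhost', port=6030, user='root', password='taosdata', db='power')
conn.execute("INSERT INTO d0 USING meters TAGS ('California.SanFrancisco', 1) "
             "VALUES ('2022-12-06 15:13:38.643', 3.41, 105, 0.02027)")
conn.close()
```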

Sample Code

  • Python

Python Kafka Client

For Python Kafka clients, see the kafka client documentation. Both confluent-kafka-python and kafka-python are recommended; the examples below use kafka-python.

Consuming Data from Kafka

Kafka clients pull data from Kafka, and messages can be read either one at a time or in batches. An example of consuming messages one at a time with the kafka-python client:

```
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
    print(msg)
```

Single-message consumption often becomes a performance bottleneck under heavy traffic and causes messages to back up in Kafka, so batch consumption is recommended. An example of batch consumption with the kafka-python client:

```
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_favorite_topic')
while True:
    msgs = consumer.poll(timeout_ms=500, max_records=1000)
    if msgs:
        print(msgs)
```

Python Multi-threading

To improve write throughput, data is usually written with multiple threads. Python's ThreadPoolExecutor thread pool can be used to implement this. Sample code:

```
from concurrent.futures import ThreadPoolExecutor, Future

pool = ThreadPoolExecutor(max_workers=10)
pool.submit(...)
```
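
A slightly fuller sketch of the same idea combines batch consumption with a thread pool so that writes to TDengine overlap with the next poll. The `write_to_tdengine` helper is hypothetical, and a broker at localhost:9092 with a kafka-events topic is assumed.

```
from concurrent.futures import ThreadPoolExecutor
from kafka import KafkaConsumer

def write_to_tdengine(records):
    # hypothetical helper: build one INSERT statement from the batch and execute it with taospy
    pass

consumer = KafkaConsumer('kafka-events', bootstrap_servers='localhost:9092')
pool = ThreadPoolExecutor(max_workers=10)
while True:
    batches = consumer.poll(timeout_ms=500, max_records=1000)
    for records in batches.values():  # one record list per topic-partition
        pool.submit(write_to_tdengine, records)
```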

Python Multi-processing

A single Python process cannot fully utilize a multi-core CPU, so multiple processes are sometimes preferred. When using multiple processes, note that the number of Kafka consumers should be less than or equal to the number of partitions of the Kafka topic. Sample multi-process code:

```
from multiprocessing import Process

# Consumer is the consumer class defined in the complete example below
ps = []
for i in range(5):
    p = Process(target=Consumer().consume)  # pass the method itself; do not call it here
    p.start()
    ps.append(p)
for p in ps:
    p.join()
```
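
One way to respect the consumer-to-partition constraint is to look up the partition count first and never start more consumer processes than partitions, as in the following sketch. `run_consumer` is a hypothetical worker; a broker at localhost:9092 and a kafka-events topic are assumed.

```
from multiprocessing import Process
from kafka import KafkaConsumer

def run_consumer(worker_id):
    # each process owns one consumer in the same group; Kafka assigns it a subset of partitions
    consumer = KafkaConsumer('kafka-events',
                             bootstrap_servers='localhost:9092',
                             group_id='tdengine-writers')
    for msg in consumer:
        pass  # hypothetical: parse msg.value and write it to TDengine

if __name__ == '__main__':
    # query the topic's partition count so no consumer sits idle
    probe = KafkaConsumer(bootstrap_servers='localhost:9092')
    partition_count = len(probe.partitions_for_topic('kafka-events') or set())
    probe.close()

    processes = [Process(target=run_consumer, args=(i,)) for i in range(min(5, partition_count))]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```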

Besides Python's built-in threading and multiprocessing, concurrency can also be achieved with the third-party library gunicorn.

Complete Example

kafka_example_perform

kafka_example_perform is the entry point of the example program.

```
#! encoding=utf-8
import argparse
import logging
import multiprocessing
import time
from multiprocessing import pool

import kafka_example_common as common
import kafka_example_consumer as consumer
import kafka_example_producer as producer

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-kafka-broker', type=str, default='localhost:9092',
                        help='kafka broker host. default is `localhost:9092`')
    parser.add_argument('-kafka-topic', type=str, default='tdengine-kafka-practices',
                        help='kafka topic. default is `tdengine-kafka-practices`')
    parser.add_argument('-kafka-group', type=str, default='kafka_practices',
                        help='kafka consumer group. default is `kafka_practices`')
    parser.add_argument('-taos-host', type=str, default='localhost',
                        help='TDengine host. default is `localhost`')
    parser.add_argument('-taos-port', type=int, default=6030, help='TDengine port. default is 6030')
    parser.add_argument('-taos-user', type=str, default='root', help='TDengine username, default is `root`')
    parser.add_argument('-taos-password', type=str, default='taosdata', help='TDengine password, default is `taosdata`')
    parser.add_argument('-taos-db', type=str, default='tdengine_kafka_practices',
                        help='TDengine db name, default is `tdengine_kafka_practices`')
    parser.add_argument('-table-count', type=int, default=100, help='TDengine sub-table count, default is 100')
    parser.add_argument('-table-items', type=int, default=1000, help='items in per sub-tables, default is 1000')
    parser.add_argument('-message-type', type=str, default='line',
                        help='kafka message type. `line` or `json`. default is `line`')
    parser.add_argument('-max-poll', type=int, default=1000, help='max poll for kafka consumer')
    parser.add_argument('-threads', type=int, default=10, help='thread count for deal message')
    parser.add_argument('-processes', type=int, default=1, help='process count')

    args = parser.parse_args()
    total = args.table_count * args.table_items

    logging.warning("## start to prepare testing data...")
    prepare_data_start = time.time()
    producer.produce_total(100, args.kafka_broker, args.kafka_topic, args.message_type, total, args.table_count)
    prepare_data_end = time.time()
    logging.warning("## prepare testing data finished! spend-[%s]", prepare_data_end - prepare_data_start)

    logging.warning("## start to create database and tables ...")
    create_db_start = time.time()
    # create database and table
    common.create_database_and_tables(host=args.taos_host, port=args.taos_port, user=args.taos_user,
                                      password=args.taos_password, db=args.taos_db, table_count=args.table_count)
    create_db_end = time.time()
    logging.warning("## create database and tables finished! spend [%s]", create_db_end - create_db_start)

    processes = args.processes
    logging.warning("## start to consume data and insert into TDengine...")
    consume_start = time.time()
    if processes > 1:  # multiprocess
        multiprocessing.set_start_method("spawn")
        pool = pool.Pool(processes)
        consume_start = time.time()
        for _ in range(processes):
            pool.apply_async(func=consumer.consume, args=(
                args.kafka_broker, args.kafka_topic, args.kafka_group, args.taos_host, args.taos_port, args.taos_user,
                args.taos_password, args.taos_db, args.message_type, args.max_poll, args.threads))
        pool.close()
        pool.join()
    else:
        consume_start = time.time()
        consumer.consume(kafka_brokers=args.kafka_broker, kafka_topic=args.kafka_topic, kafka_group_id=args.kafka_group,
                         taos_host=args.taos_host, taos_port=args.taos_port, taos_user=args.taos_user,
                         taos_password=args.taos_password, taos_database=args.taos_db, message_type=args.message_type,
                         max_poll=args.max_poll, workers=args.threads)
    consume_end = time.time()
    logging.warning("## consume data and insert into TDengine over! spend-[%s]", consume_end - consume_start)

    # print report
    logging.warning(
        "\n#######################\n"
        " Prepare data \n"
        "#######################\n"
        "# data_type # %s \n"
        "# total # %s \n"
        "# spend # %s s\n"
        "#######################\n"
        " Create database \n"
        "#######################\n"
        "# stable # 1 \n"
        "# sub-table # 100 \n"
        "# spend # %s s \n"
        "#######################\n"
        " Consume \n"
        "#######################\n"
        "# data_type # %s \n"
        "# threads # %s \n"
        "# processes # %s \n"
        "# total_count # %s \n"
        "# spend # %s s\n"
        "# per_second # %s \n"
        "#######################\n",
        args.message_type, total, prepare_data_end - prepare_data_start, create_db_end - create_db_start,
        args.message_type, args.threads, processes, total, consume_end - consume_start,
        total / (consume_end - consume_start))
```


kafka_example_common

kafka_example_common contains the common code shared by the example program.

```
#! encoding = utf-8
import taos

LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
             'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
             'California.SantaClara', 'California.Cupertino']

CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1'
USE_DATABASE_SQL = 'use {}'
DROP_TABLE_SQL = 'drop table if exists meters'
DROP_DATABASE_SQL = 'drop database if exists {}'
CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) tags ' \
                    '(location binary(64), groupId int)'
CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})'


def create_database_and_tables(host, port, user, password, db, table_count):
    tags_tables = _init_tags_table_names(table_count=table_count)
    conn = taos.connect(host=host, port=port, user=user, password=password)

    conn.execute(DROP_DATABASE_SQL.format(db))
    conn.execute(CREATE_DATABASE_SQL.format(db))
    conn.execute(USE_DATABASE_SQL.format(db))
    conn.execute(DROP_TABLE_SQL)
    conn.execute(CREATE_STABLE_SQL)
    for tags in tags_tables:
        location, group_id = _get_location_and_group(tags)
        tables = tags_tables[tags]
        for table_name in tables:
            conn.execute(CREATE_TABLE_SQL.format(table_name, location, group_id))
    conn.close()


def clean(host, port, user, password, db):
    conn = taos.connect(host=host, port=port, user=user, password=password)
    conn.execute(DROP_DATABASE_SQL.format(db))
    conn.close()


def _init_tags_table_names(table_count):
    tags_table_names = {}
    group_id = 0
    for i in range(table_count):
        table_name = 'd{}'.format(i)
        location_idx = i % len(LOCATIONS)
        location = LOCATIONS[location_idx]
        if location_idx == 0:
            group_id += 1
            if group_id > 10:
                group_id -= 10
        key = _tag_table_mapping_key(location=location, group_id=group_id)
        if key not in tags_table_names:
            tags_table_names[key] = []
        tags_table_names[key].append(table_name)

    return tags_table_names


def _tag_table_mapping_key(location, group_id):
    return '{}_{}'.format(location, group_id)


def _get_location_and_group(key):
    fields = key.split('_')
    return fields[0], fields[1]
```


kafka_example_producer

kafka_example_producer is the producer code of the example program. It generates test data and sends it to Kafka.

```
#! encoding = utf-8
import json
import random
import threading
from concurrent.futures import ThreadPoolExecutor, Future
from datetime import datetime

from kafka import KafkaProducer

locations = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
             'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
             'California.SantaClara', 'California.Cupertino']

producers: list[KafkaProducer] = []

lock = threading.Lock()
start = 1640966400


def produce_total(workers, broker, topic, message_type, total, table_count):
    if len(producers) == 0:
        lock.acquire()
        if len(producers) == 0:
            _init_kafka_producers(broker=broker, count=10)
        lock.release()
    pool = ThreadPoolExecutor(max_workers=workers)
    futures = []
    for _ in range(0, workers):
        futures.append(pool.submit(_produce_total, topic, message_type, int(total / workers), table_count))
    pool.shutdown()
    for f in futures:
        f.result()
    _close_kafka_producers()


def _produce_total(topic, message_type, total, table_count):
    producer = _get_kafka_producer()
    for _ in range(total):
        message = _get_fake_date(message_type=message_type, table_count=table_count)
        producer.send(topic=topic, value=message.encode(encoding='utf-8'))


def _init_kafka_producers(broker, count):
    for _ in range(count):
        p = KafkaProducer(bootstrap_servers=broker, batch_size=64 * 1024, linger_ms=300, acks=0)
        producers.append(p)


def _close_kafka_producers():
    for p in producers:
        p.close()


def _get_kafka_producer():
    return producers[random.randint(0, len(producers) - 1)]


def _get_fake_date(table_count, message_type='json'):
    if message_type == 'json':
        return _get_json_message(table_count=table_count)
    if message_type == 'line':
        return _get_line_message(table_count=table_count)
    return ''


def _get_json_message(table_count):
    return json.dumps({
        'ts': _get_timestamp(),
        'current': random.randint(0, 1000) / 100,
        'voltage': random.randint(105, 115),
        'phase': random.randint(0, 32000) / 100000,
        'location': random.choice(locations),
        'groupId': random.randint(1, 10),
        'table_name': _random_table_name(table_count)
    })


def _get_line_message(table_count):
    return "{} values('{}', {}, {}, {})".format(
        _random_table_name(table_count),  # table
        _get_timestamp(),  # ts
        random.randint(0, 1000) / 100,  # current
        random.randint(105, 115),  # voltage
        random.randint(0, 32000) / 100000,  # phase
    )


def _random_table_name(table_count):
    return 'd{}'.format(random.randint(0, table_count - 1))


def _get_timestamp():
    global start
    lock.acquire(blocking=True)
    start += 0.001
    lock.release()
    return datetime.fromtimestamp(start).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
```


kafka_example_consumer

kafka_example_consumer is the consumer code of the example program. It consumes data from Kafka and writes it into TDengine.

```
#! encoding = utf-8
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, Future
from json import JSONDecodeError
from typing import Callable

import taos
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord

import kafka_example_common as common


class Consumer(object):
    DEFAULT_CONFIGS = {
        'kafka_brokers': 'localhost:9092',  # kafka broker
        'kafka_topic': 'tdengine_kafka_practices',
        'kafka_group_id': 'taos',
        'taos_host': 'localhost',  # TDengine host
        'taos_port': 6030,  # TDengine port
        'taos_user': 'root',  # TDengine user name
        'taos_password': 'taosdata',  # TDengine password
        'taos_database': 'power',  # TDengine database
        'message_type': 'json',  # message format, 'json' or 'line'
        'clean_after_testing': False,  # if drop database after testing
        'max_poll': 1000,  # poll size for batch mode
        'workers': 10,  # thread count for multi-threading
        'testing': False
    }

    INSERT_SQL_HEADER = "insert into "
    INSERT_PART_SQL = '{} values (\'{}\', {}, {}, {})'

    def __init__(self, **configs):
        self.config = self.DEFAULT_CONFIGS
        self.config.update(configs)

        self.consumer = None
        if not self.config.get('testing'):
            self.consumer = KafkaConsumer(
                self.config.get('kafka_topic'),
                bootstrap_servers=self.config.get('kafka_brokers'),
                group_id=self.config.get('kafka_group_id'),
            )

        self.conns = taos.connect(
            host=self.config.get('taos_host'),
            port=self.config.get('taos_port'),
            user=self.config.get('taos_user'),
            password=self.config.get('taos_password'),
            db=self.config.get('taos_database'),
        )
        if self.config.get('workers') > 1:
            self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
            self.tasks = []
        # tags and table mapping # key: {location}_{groupId} value:

    def consume(self):
        """
        consume data from kafka and deal. Based on `message_type`, `bath_consume`, `insert_by_table`,
        there are several deal function.
        :return:
        """
        self.conns.execute(common.USE_DATABASE_SQL.format(self.config.get('taos_database')))
        try:
            if self.config.get('message_type') == 'line':  # line
                self._run(self._line_to_taos)
            if self.config.get('message_type') == 'json':  # json
                self._run(self._json_to_taos)
        except KeyboardInterrupt:
            logging.warning("## caught keyboard interrupt, stopping")
        finally:
            self.stop()

    def stop(self):
        """
        stop consuming
        :return:
        """
        # close consumer
        if self.consumer is not None:
            self.consumer.commit()
            self.consumer.close()

        # multi thread
        if self.config.get('workers') > 1:
            if self.pool is not None:
                self.pool.shutdown()
            for task in self.tasks:
                while not task.done():
                    time.sleep(0.01)

        # clean data
        if self.config.get('clean_after_testing'):
            self.conns.execute(common.DROP_TABLE_SQL)
            self.conns.execute(common.DROP_DATABASE_SQL.format(self.config.get('taos_database')))

        # close taos
        if self.conns is not None:
            self.conns.close()

    def _run(self, f):
        """
        run in batch consuming mode
        :param f:
        :return:
        """
        i = 0  # just for test.
        while True:
            messages = self.consumer.poll(timeout_ms=100, max_records=self.config.get('max_poll'))
            if messages:
                if self.config.get('workers') > 1:
                    self.pool.submit(f, messages.values())
                else:
                    f(list(messages.values()))
            if not messages:
                i += 1  # just for test.
                time.sleep(0.1)
                if i > 3:  # just for test.
                    logging.warning('## test over.')  # just for test.
                    return  # just for test.

    def _json_to_taos(self, messages):
        """
        convert a batch of json data to sql, and insert into TDengine
        :param messages:
        :return:
        """
        sql = self._build_sql_from_json(messages=messages)
        self.conns.execute(sql=sql)

    def _line_to_taos(self, messages):
        """
        convert a batch of lines data to sql, and insert into TDengine
        :param messages:
        :return:
        """
        lines = []
        for partition_messages in messages:
            for message in partition_messages:
                lines.append(message.value.decode())
        sql = self.INSERT_SQL_HEADER + ' '.join(lines)
        self.conns.execute(sql=sql)

    def _build_single_sql_from_json(self, msg_value):
        try:
            data = json.loads(msg_value)
        except JSONDecodeError as e:
            logging.error('## decode message [%s] error: %s', msg_value, e)
            return ''
        # location = data.get('location')
        # group_id = data.get('groupId')
        ts = data.get('ts')
        current = data.get('current')
        voltage = data.get('voltage')
        phase = data.get('phase')
        table_name = data.get('table_name')

        return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)

    def _build_sql_from_json(self, messages):
        sql_list = []
        for partition_messages in messages:
            for message in partition_messages:
                sql_list.append(self._build_single_sql_from_json(message.value))
        return self.INSERT_SQL_HEADER + ' '.join(sql_list)


def test_json_to_taos(consumer: Consumer):
    records = [
        [
            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                           value=json.dumps({'table_name': 'd0',
                                             'ts': '2022-12-06 15:13:38.643',
                                             'current': 3.41,
                                             'voltage': 105,
                                             'phase': 0.02027, }),
                           partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
                           serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                           value=json.dumps({'table_name': 'd1',
                                             'ts': '2022-12-06 15:13:39.643',
                                             'current': 3.41,
                                             'voltage': 102,
                                             'phase': 0.02027, }),
                           partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
                           serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
        ]
    ]

    consumer._json_to_taos(messages=records)


def test_line_to_taos(consumer: Consumer):
    records = [
        [
            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                           value="d0 values('2023-01-01 00:00:00.001', 3.49, 109, 0.02737)".encode('utf-8'),
                           partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
                           serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                           value="d1 values('2023-01-01 00:00:00.002', 6.19, 112, 0.09171)".encode('utf-8'),
                           partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
                           serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
        ]
    ]

    consumer._line_to_taos(messages=records)


def consume(kafka_brokers, kafka_topic, kafka_group_id, taos_host, taos_port, taos_user,
            taos_password, taos_database, message_type, max_poll, workers):
    c = Consumer(kafka_brokers=kafka_brokers, kafka_topic=kafka_topic, kafka_group_id=kafka_group_id,
                 taos_host=taos_host, taos_port=taos_port, taos_user=taos_user, taos_password=taos_password,
                 taos_database=taos_database, message_type=message_type, max_poll=max_poll, workers=workers)
    c.consume()


if __name__ == '__main__':
    consumer = Consumer(testing=True)
    common.create_database_and_tables(host='localhost', port=6030, user='root', password='taosdata',
                                      db='py_kafka_test', table_count=10)
    consumer.conns.execute(common.USE_DATABASE_SQL.format('py_kafka_test'))
    test_json_to_taos(consumer)
    test_line_to_taos(consumer)
    common.clean(host='localhost', port=6030, user='root', password='taosdata', db='py_kafka_test')
```


Execution Steps

Running the Python Example Program

  1. Install and start Kafka.
  2. Prepare the Python environment:
     - Install python3
     - Install taospy
     - Install kafka-python
  3. Run the example program.

     The entry point of the program is `kafka_example_perform.py`. To see the full list of command-line options, run the help command:

     ```
     python3 kafka_example_perform.py --help
     ```

     The following command creates 100 sub-tables with 20,000 rows per sub-table, sets the Kafka max poll to 100, and uses one process with one worker thread per process:

     ```
     python3 kafka_example_perform.py -table-count=100 -table-items=20000 -max-poll=100 -threads=1 -processes=1
     ```