High Performance Writing

This chapter introduces how to write data into TDengine with high throughput.

How to Achieve High Performance Data Writing

To achieve high performance writing, several aspects must be considered. The following sections describe the most important factors.

Application Program

From the perspective of the application program, you need to consider:

  1. The data size of each single write, also known as the batch size. Generally speaking, a larger batch size yields better writing performance, but beyond a certain point increasing it brings no additional benefit. When using SQL to write into TDengine, put as much data as possible in a single SQL statement. The maximum SQL length supported by TDengine is 1,048,576 bytes (1 MB); it can be configured with the client-side parameter maxSQLLength, whose default value is 65,480.

  2. The number of concurrent connections. Normally, more connections yield better results. However, once the number of connections exceeds the processing capacity of the server side, performance may degrade.

  3. The distribution of the data to be written across tables or sub-tables. Writing to a single table in one batch is more efficient than writing to multiple tables in one batch.

  4. Data Writing Protocol.

    • Parameter binding mode is more efficient than SQL because it avoids the cost of parsing SQL.
    • Writing to known, pre-existing tables is more efficient than writing with automatic table creation, because the latter must check whether the table exists before actually writing the data.
    • Writing in SQL is more efficient than writing in schemaless mode, because schemaless writing creates tables automatically and may alter table schemas.

Application programs need to take the above factors into account. Each write batch should target a single table, the batch size needs to be tuned to a proper value for the specific system, and the number of concurrent connections also needs to be tuned to achieve the best writing throughput. A minimal sketch of such a batched write is shown below.
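
The following sketch shows what a batched single-table write can look like over JDBC. It is illustrative only: the database test, the sub-table d1001, and the TDENGINE_JDBC_URL environment variable are assumptions borrowed from the sample programs later in this chapter, and the sub-table is assumed to exist already.

// Illustrative batched write: many rows of one sub-table in a single SQL statement.
// Assumes database "test", an existing sub-table "d1001" with columns
// (ts, current, voltage, phase), and the TDENGINE_JDBC_URL environment variable.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class BatchInsertSketch {
    public static void main(String[] args) throws Exception {
        String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
        try (Connection conn = DriverManager.getConnection(jdbcURL);
             Statement stmt = conn.createStatement()) {
            StringBuilder sql = new StringBuilder("INSERT INTO test.d1001 VALUES ");
            long ts = System.currentTimeMillis();
            for (int i = 0; i < 3000; i++) { // batch size: tune for your system
                sql.append('(').append(ts + i).append(",10.3,219,0.31) ");
            }
            // One round trip and one parse for 3,000 rows of a single table.
            stmt.executeUpdate(sql.toString());
        }
    }
}

Compared with inserting the same rows one by one, this keeps the per-statement overhead (network round trip, SQL parsing) constant while the payload grows.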

Data Source

Application programs need to read data from a data source and then write it into TDengine. If one or more of the situations below applies, you need to set up message queues between the threads that read from the data source and the threads that write into TDengine.

  1. There are multiple data sources, and the data generation speed of each data source is much slower than the writing speed of a single writing thread. In this case, the purpose of the message queues is to consolidate data from multiple data sources so that the batch size of a single write can be increased.
  2. The data generation speed of a single data source is much higher than the writing speed of a single writing thread. In this case, the message queue acts as a buffer so that data is not lost and multiple writing threads can fetch data from the buffer (a minimal sketch of this reader/writer decoupling follows this list).
  3. The data for a single table comes from multiple data sources. In this case, the purpose of the message queues is to combine the data for a single table so that it can be written more efficiently.
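
Below is a minimal, hedged sketch of this decoupling in Java: one reader thread enqueues raw lines into a bounded queue, and one writer thread drains the queue and forms batches. The queue capacity, batch size, and line format are illustrative; the complete, TDengine-specific version of this pattern is the FastWriteExample program later in this chapter.

// Minimal reader/writer decoupling sketch. Queue capacity, batch size, and the
// line format are illustrative; no TDengine calls are made here.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueSketch {
    private static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1_000_000);

    public static void main(String[] args) {
        new Thread(QueueSketch::readLoop, "reader").start();
        new Thread(QueueSketch::writeLoop, "writer").start();
    }

    // Simulates reading from a data source and buffering the lines into the queue.
    static void readLoop() {
        try {
            for (int i = 0; ; i++) {
                queue.put("d" + (i % 100) + "," + System.currentTimeMillis() + ",10.3,219,0.31");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Drains the queue and "writes" whenever a full batch has accumulated
    // or the queue runs empty while data is still buffered.
    static void writeLoop() {
        List<String> batch = new ArrayList<>();
        try {
            while (true) {
                String line = queue.poll();
                if (line != null) {
                    batch.add(line);
                } else {
                    Thread.sleep(10); // queue is empty, avoid busy-waiting
                }
                if (batch.size() >= 3000 || (line == null && !batch.isEmpty())) {
                    // A real writer would compose one INSERT statement from the batch
                    // and execute it against TDengine here.
                    batch.clear();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}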

If the data source is Kafka, the application program is a Kafka consumer and can benefit from several Kafka features to achieve high performance writing:

  1. Put the data for a table in a single partition of a single topic so that the data for each table can easily be kept together and written in batches.
  2. Subscribe to multiple topics to accumulate data.
  3. Add more consumers to gain more concurrency and throughput.
  4. Increase the size of a single fetch to increase the size of a write batch; a hedged consumer-configuration sketch follows this list.
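
As an illustration of item 4, the sketch below builds a Kafka consumer whose fetch-related settings favor large batches. The property names are standard Kafka consumer configuration keys from the kafka-clients library; the broker address, group id, and the numeric values are placeholders to be tuned for your environment.

// Illustrative Kafka consumer configuration biased toward large fetches.
// Broker address, group id, and numeric values are placeholders.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTuningSketch {
    public static KafkaConsumer<String, String> buildConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tdengine-writers");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Return more records per poll() so each write batch can be larger.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "3000");
        // Let the broker accumulate about 1 MB, or wait up to 500 ms, before answering a fetch.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
        return new KafkaConsumer<>(props);
    }
}

Note that adding consumers increases concurrency only up to the number of partitions of the topic, so partitioning and consumer count should be planned together.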

Tune TDengine

TDengine is a distributed, high performance time-series database. There are also several ways to tune TDengine itself for better writing performance.

  1. Set a proper number of vgroups according to the number of available CPU cores. Normally, we recommend 2 * number_of_cores as a starting point. If verification shows that this is not enough to fully utilize the CPU resources, use a higher value.
  2. Set proper values for minTablesPerVnode, tableIncStepPerVnode, and maxVgroupsPerDb according to the number of tables so that tables are distributed evenly across vgroups. The purpose is to balance the workload among all vnodes so that system resources are utilized better and higher performance is achieved; an illustrative taos.cfg fragment follows this list.
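
For reference, the parameters named above are server-side settings in taos.cfg (TDengine 2.x). The fragment below only illustrates the format; the values are placeholders that must be derived from your own core count and table count, and changing them typically requires restarting taosd.

# Illustrative taos.cfg fragment; values are placeholders, not recommendations.
maxVgroupsPerDb        32      # e.g. about 2 * number of CPU cores
minTablesPerVnode      1000
tableIncStepPerVnode   1000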

For more performance tuning tips, please refer to Performance Optimization and Configuration Parameters.

Sample Programs

This section introduces sample programs that demonstrate how to write data into TDengine with high performance.

Scenario

Below is the scenario addressed by the high performance writing sample programs.

  • The application program reads data from a data source; the sample program simulates a data source by generating data.
  • The speed of a single writing thread is much slower than the speed of generating data, so the program starts multiple writing threads. Each writing thread establishes a connection to TDengine and has a message queue of fixed size.
  • The application program maps the received data to different writing threads based on the table name, so that all the data for each table is always processed by a specific writing thread.
  • Each writing thread writes the received data into TDengine once its message queue becomes empty or the amount of buffered data reaches a threshold.

Figure: Thread model of high performance writing into TDengine

Sample Programs

The sample programs listed in this section are based on the scenario described above. If your scenario is different, please adjust the code based on the principles described in this chapter.

The sample programs assume that the source data belongs to different sub-tables of the same super table (meters). The super table has been created before the sample program starts writing data, while sub-tables are created automatically according to the received data. If there are multiple super tables in your case, please adjust the part of the code that creates tables automatically.

The sample program is provided in two languages. The Java version is described first, followed by the Python version.

Program Inventory

Class            | Description
-----------------|----------------------------------------------------------------------
FastWriteExample | Main program
ReadTask         | Reads data from the simulated data source and puts it into a queue according to the hash value of the table name
WriteTask        | Reads data from a queue, composes a write batch, and writes it into TDengine
MockDataSource   | Generates data for some sub-tables of the super table meters
SQLWriter        | Used by WriteTask to compose SQL, create tables automatically, check the SQL length, and write data
StmtWriter       | Writes in parameter binding mode (not finished yet)
DataBaseMonitor  | Calculates the writing speed and outputs it to the console every 10 seconds

Below is the complete code of the classes in the above table, together with more detailed descriptions.

FastWriteExample

The main program is responsible for:

  1. Creating message queues
  2. Starting writing threads
  3. Starting reading threads
  4. Outputting the writing speed every 10 seconds

The main program provides 4 parameters for tuning:

  1. The number of reading threads, default value is 1
  2. The number of writing threads, default value is 3
  3. The total number of tables in the generated data, default value is 1,000. These tables are distributed evenly across all writing threads. If the number of tables is very large, creating them at the beginning will take a noticeable amount of time.
  4. The batch size of a single write, default value is 3,000

The capacity of the message queue also impacts performance and can be tuned by modifying the program. Normally a larger message queue is better, because it lowers the chance of blocking when enqueueing and increases throughput, but it also consumes more memory. The default value used in the sample program is already big enough.

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FastWriteExample {
    final static Logger logger = LoggerFactory.getLogger(FastWriteExample.class);

    final static int taskQueueCapacity = 1000000;
    final static List<BlockingQueue<String>> taskQueues = new ArrayList<>();
    final static List<ReadTask> readTasks = new ArrayList<>();
    final static List<WriteTask> writeTasks = new ArrayList<>();
    final static DataBaseMonitor databaseMonitor = new DataBaseMonitor();

    public static void stopAll() {
        logger.info("shutting down");
        readTasks.forEach(task -> task.stop());
        writeTasks.forEach(task -> task.stop());
        databaseMonitor.close();
    }

    public static void main(String[] args) throws InterruptedException, SQLException {
        int readTaskCount = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        int writeTaskCount = args.length > 1 ? Integer.parseInt(args[1]) : 3;
        int tableCount = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
        int maxBatchSize = args.length > 3 ? Integer.parseInt(args[3]) : 3000;

        logger.info("readTaskCount={}, writeTaskCount={} tableCount={} maxBatchSize={}",
                readTaskCount, writeTaskCount, tableCount, maxBatchSize);

        databaseMonitor.init().prepareDatabase();

        // Create task queues and writing tasks, and start the writing threads.
        for (int i = 0; i < writeTaskCount; ++i) {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(taskQueueCapacity);
            taskQueues.add(queue);
            WriteTask task = new WriteTask(queue, maxBatchSize);
            Thread t = new Thread(task);
            t.setName("WriteThread-" + i);
            t.start();
        }

        // Create reading tasks and start the reading threads.
        int tableCountPerTask = tableCount / readTaskCount;
        for (int i = 0; i < readTaskCount; ++i) {
            ReadTask task = new ReadTask(i, taskQueues, tableCountPerTask);
            Thread t = new Thread(task);
            t.setName("ReadThread-" + i);
            t.start();
        }

        Runtime.getRuntime().addShutdownHook(new Thread(FastWriteExample::stopAll));

        long lastCount = 0;
        while (true) {
            Thread.sleep(10000);
            long numberOfTable = databaseMonitor.getTableCount();
            long count = databaseMonitor.count();
            logger.info("numberOfTable={} count={} speed={}", numberOfTable, count, (count - lastCount) / 10);
            lastCount = count;
        }
    }
}


ReadTask

ReadTask reads data from the data source. Each ReadTask is associated with a simulated data source; each data source generates data for a group of specific tables, and the data of any table is generated by only one data source.

ReadTask puts data into the message queues in blocking mode, that is, the put operation blocks if the target message queue is full.

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;

class ReadTask implements Runnable {
    private final static Logger logger = LoggerFactory.getLogger(ReadTask.class);
    private final int taskId;
    private final List<BlockingQueue<String>> taskQueues;
    private final int queueCount;
    private final int tableCount;
    private boolean active = true;

    public ReadTask(int readTaskId, List<BlockingQueue<String>> queues, int tableCount) {
        this.taskId = readTaskId;
        this.taskQueues = queues;
        this.queueCount = queues.size();
        this.tableCount = tableCount;
    }

    /**
     * Assign data received to different queues.
     * Here we use the suffix number in the table name.
     * You are expected to define your own rule in practice.
     *
     * @param line record received
     * @return which queue to use
     */
    public int getQueueId(String line) {
        String tbName = line.substring(0, line.indexOf(',')); // For example: tb1_101
        String suffixNumber = tbName.split("_")[1];
        return Integer.parseInt(suffixNumber) % this.queueCount;
    }

    @Override
    public void run() {
        logger.info("started");
        Iterator<String> it = new MockDataSource("tb" + this.taskId, tableCount);
        try {
            while (it.hasNext() && active) {
                String line = it.next();
                int queueId = getQueueId(line);
                taskQueues.get(queueId).put(line);
            }
        } catch (Exception e) {
            logger.error("Read Task Error", e);
        }
    }

    public void stop() {
        logger.info("stop");
        this.active = false;
    }
}


WriteTask

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;

class WriteTask implements Runnable {
    private final static Logger logger = LoggerFactory.getLogger(WriteTask.class);
    private final int maxBatchSize;

    // The queue from which this writing task gets raw data.
    private final BlockingQueue<String> queue;

    // A flag indicating whether to continue.
    private boolean active = true;

    public WriteTask(BlockingQueue<String> taskQueue, int maxBatchSize) {
        this.queue = taskQueue;
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public void run() {
        logger.info("started");
        String line = null; // the line just taken from the queue.
        SQLWriter writer = new SQLWriter(maxBatchSize);
        try {
            writer.init();
            while (active) {
                line = queue.poll();
                if (line != null) {
                    // Parse the raw data and buffer it.
                    writer.processLine(line);
                } else if (writer.hasBufferedValues()) {
                    // Write the buffered data immediately if there is no more data in the queue.
                    writer.flush();
                } else {
                    // Sleep a while to avoid high CPU usage when the queue is empty and nothing is buffered.
                    Thread.sleep(100);
                }
            }
            if (writer.hasBufferedValues()) {
                writer.flush();
            }
        } catch (Exception e) {
            String msg = String.format("line=%s, bufferedCount=%s", line, writer.getBufferedCount());
            logger.error(msg, e);
        } finally {
            writer.close();
        }
    }

    public void stop() {
        logger.info("stop");
        this.active = false;
    }
}


MockDataSource

package com.taos.example.highvolume;

import java.util.Iterator;

/**
 * Generate test data
 */
class MockDataSource implements Iterator<String> {
    private String tbNamePrefix;
    private int tableCount;
    private long maxRowsPerTable = 1000000000L;

    // 100 milliseconds between two neighbouring rows.
    long startMs = System.currentTimeMillis() - maxRowsPerTable * 100;
    private int currentRow = 0;
    private int currentTbId = -1;

    // mock values
    String[] location = {"California.LosAngeles", "California.SanDiego", "California.SanJose", "California.Campbell", "California.SanFrancisco"};
    float[] current = {8.8f, 10.7f, 9.9f, 8.9f, 9.4f};
    int[] voltage = {119, 116, 111, 113, 118};
    float[] phase = {0.32f, 0.34f, 0.33f, 0.329f, 0.141f};

    public MockDataSource(String tbNamePrefix, int tableCount) {
        this.tbNamePrefix = tbNamePrefix;
        this.tableCount = tableCount;
    }

    @Override
    public boolean hasNext() {
        currentTbId += 1;
        if (currentTbId == tableCount) {
            currentTbId = 0;
            currentRow += 1;
        }
        return currentRow < maxRowsPerTable;
    }

    @Override
    public String next() {
        long ts = startMs + 100 * currentRow;
        int groupId = currentTbId % 5 == 0 ? currentTbId / 5 : currentTbId / 5 + 1;
        StringBuilder sb = new StringBuilder(tbNamePrefix + "_" + currentTbId + ","); // tbName
        sb.append(ts).append(','); // ts
        sb.append(current[currentRow % 5]).append(','); // current
        sb.append(voltage[currentRow % 5]).append(','); // voltage
        sb.append(phase[currentRow % 5]).append(','); // phase
        sb.append(location[currentRow % 5]).append(','); // location
        sb.append(groupId); // groupID
        return sb.toString();
    }
}


SQLWriter

The SQLWriter class encapsulates the logic of composing SQL and writing data. Note that none of the tables are created in advance; instead, they are created in batch when the "table does not exist" exception is caught. For other exceptions, the SQL statement that triggered the exception is logged for debugging.

package com.taos.example.highvolume;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.HashMap;
import java.util.Map;

/**
 * A helper class encapsulating the logic of writing using SQL.
 * <p>
 * The main interfaces are two methods:
 * <ol>
 * <li>{@link SQLWriter#processLine}, which receives raw lines from WriteTask and groups them by table name.</li>
 * <li>{@link SQLWriter#flush}, which assembles the INSERT statement and executes it.</li>
 * </ol>
 * <p>
 * A technique worth mentioning: tables are created on demand when a "table does not exist" error occurs,
 * instead of always creating them automatically with the "INSERT INTO tb USING stb" syntax.
 * This ensures that checking table existence is a one-time-only operation.
 * </p>
 */
public class SQLWriter {
    final static Logger logger = LoggerFactory.getLogger(SQLWriter.class);

    private Connection conn;
    private Statement stmt;

    /**
     * Current number of buffered records.
     */
    private int bufferedCount = 0;

    /**
     * Maximum number of buffered records.
     * A flush is triggered when bufferedCount reaches this value.
     */
    private int maxBatchSize;

    /**
     * Maximum SQL length.
     */
    private int maxSQLLength;

    /**
     * Map from table name to column values. For example:
     * "tb001" -> "(1648432611249,2.1,114,0.09) (1648432611250,2.2,135,0.2)"
     */
    private Map<String, String> tbValues = new HashMap<>();

    /**
     * Map from table name to tag values, in the same order as when the stable was created.
     * Used for creating tables.
     */
    private Map<String, String> tbTags = new HashMap<>();

    public SQLWriter(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Get a database connection.
     *
     * @return Connection
     * @throws SQLException
     */
    private static Connection getConnection() throws SQLException {
        String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
        return DriverManager.getConnection(jdbcURL);
    }

    /**
     * Create Connection and Statement.
     *
     * @throws SQLException
     */
    public void init() throws SQLException {
        conn = getConnection();
        stmt = conn.createStatement();
        stmt.execute("use test");
        ResultSet rs = stmt.executeQuery("show variables");
        while (rs.next()) {
            String configName = rs.getString(1);
            if ("maxSQLLength".equals(configName)) {
                maxSQLLength = Integer.parseInt(rs.getString(2));
                logger.info("maxSQLLength={}", maxSQLLength);
            }
        }
    }

    /**
     * Convert raw data to SQL fragments, group them by table name and cache them in a HashMap.
     * Trigger a write when the number of buffered records reaches maxBatchSize.
     *
     * @param line raw data from the task queue, in format: tbName,ts,current,voltage,phase,location,groupId
     */
    public void processLine(String line) throws SQLException {
        bufferedCount += 1;
        int firstComma = line.indexOf(',');
        String tbName = line.substring(0, firstComma);
        int lastComma = line.lastIndexOf(',');
        int secondLastComma = line.lastIndexOf(',', lastComma - 1);
        String value = "(" + line.substring(firstComma + 1, secondLastComma) + ") ";
        if (tbValues.containsKey(tbName)) {
            tbValues.put(tbName, tbValues.get(tbName) + value);
        } else {
            tbValues.put(tbName, value);
        }
        if (!tbTags.containsKey(tbName)) {
            String location = line.substring(secondLastComma + 1, lastComma);
            String groupId = line.substring(lastComma + 1);
            String tagValues = "('" + location + "'," + groupId + ')';
            tbTags.put(tbName, tagValues);
        }
        if (bufferedCount == maxBatchSize) {
            flush();
        }
    }

    /**
     * Assemble an INSERT statement from the buffered SQL fragments in {@link SQLWriter#tbValues} and execute it.
     * In case of a "table does not exist" exception, create all tables in the SQL and retry it.
     */
    public void flush() throws SQLException {
        StringBuilder sb = new StringBuilder("INSERT INTO ");
        for (Map.Entry<String, String> entry : tbValues.entrySet()) {
            String tableName = entry.getKey();
            String values = entry.getValue();
            String q = tableName + " values " + values + " ";
            if (sb.length() + q.length() > maxSQLLength) {
                executeSQL(sb.toString());
                logger.warn("increase maxSQLLength or decrease maxBatchSize to gain better performance");
                sb = new StringBuilder("INSERT INTO ");
            }
            sb.append(q);
        }
        executeSQL(sb.toString());
        tbValues.clear();
        bufferedCount = 0;
    }

    private void executeSQL(String sql) throws SQLException {
        try {
            stmt.executeUpdate(sql);
        } catch (SQLException e) {
            // Convert to the error code defined in taoserror.h.
            int errorCode = e.getErrorCode() & 0xffff;
            if (errorCode == 0x362 || errorCode == 0x218) {
                // Table does not exist: create the missing tables and retry.
                createTables();
                executeSQL(sql);
            } else {
                logger.error("Execute SQL: {}", sql);
                throw e;
            }
        } catch (Throwable throwable) {
            logger.error("Execute SQL: {}", sql);
            throw throwable;
        }
    }

    /**
     * Create tables in batch using the syntax:
     * <p>
     * CREATE TABLE [IF NOT EXISTS] tb_name1 USING stb_name TAGS (tag_value1, ...) [IF NOT EXISTS] tb_name2 USING stb_name TAGS (tag_value2, ...) ...;
     * </p>
     */
    private void createTables() throws SQLException {
        StringBuilder sb = new StringBuilder("CREATE TABLE ");
        for (String tbName : tbValues.keySet()) {
            String tagValues = tbTags.get(tbName);
            sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
        }
        String sql = sb.toString();
        try {
            stmt.executeUpdate(sql);
        } catch (Throwable throwable) {
            logger.error("Execute SQL: {}", sql);
            throw throwable;
        }
    }

    public boolean hasBufferedValues() {
        return bufferedCount > 0;
    }

    public int getBufferedCount() {
        return bufferedCount;
    }

    public void close() {
        try {
            stmt.close();
        } catch (SQLException e) {
        }
        try {
            conn.close();
        } catch (SQLException e) {
        }
    }
}


DataBaseMonitor

package com.taos.example.highvolume;

import java.sql.*;

/**
 * Prepare the target database.
 * Count total records in the database periodically so that the writing speed can be estimated.
 */
public class DataBaseMonitor {
    private Connection conn;
    private Statement stmt;

    public DataBaseMonitor init() throws SQLException {
        if (conn == null) {
            String jdbcURL = System.getenv("TDENGINE_JDBC_URL");
            conn = DriverManager.getConnection(jdbcURL);
            stmt = conn.createStatement();
        }
        return this;
    }

    public void close() {
        try {
            stmt.close();
        } catch (SQLException e) {
        }
        try {
            conn.close();
        } catch (SQLException e) {
        }
    }

    public void prepareDatabase() throws SQLException {
        stmt.execute("DROP DATABASE IF EXISTS test");
        stmt.execute("CREATE DATABASE test");
        stmt.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)");
    }

    public Long count() throws SQLException {
        if (!stmt.isClosed()) {
            ResultSet result = stmt.executeQuery("SELECT count(*) from test.meters");
            result.next();
            return result.getLong(1);
        }
        return null;
    }

    /**
     * Example output of "show test.stables":
     *
     * name   | created_time            | columns | tags | tables |
     * ============================================================
     * meters | 2022-07-20 08:39:30.902 |       4 |    2 | 620000 |
     */
    public Long getTableCount() throws SQLException {
        if (!stmt.isClosed()) {
            ResultSet result = stmt.executeQuery("show test.stables");
            result.next();
            return result.getLong(5);
        }
        return null;
    }
}


Steps to Launch

Launch Java Sample Program

You need to set the environment variable TDENGINE_JDBC_URL before launching the program. If the TDengine server is deployed on localhost with the default port, user name, and password, the value can be:

  TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"

Launch in IDE

  1. Clone the TDengine repository

       git clone git@github.com:taosdata/TDengine.git --depth 1
  2. Use IDE to open docs/examples/java directory

  3. Configure the environment variable TDENGINE_JDBC_URL. If you have already configured it before launching the IDE, you can skip this step.
  4. Run class com.taos.example.highvolume.FastWriteExample

Launch on server

If you want to launch the sample program on a remote server, follow the steps below:

  1. Package the sample programs. Execute the command below in the directory TDengine/docs/examples/java

       mvn package
  2. Create examples/java directory on the server

       mkdir -p examples/java
  3. Copy dependencies (the commands below assume you are working on a local Windows host and launching the program on a remote Linux host)

    • Copy dependent packages

         scp -r .\target\lib <user>@<host>:~/examples/java
    • Copy the jar of sample programs

         scp -r .\target\javaexample-1.0.jar <user>@<host>:~/examples/java
  4. Configure the environment variable. Edit ~/.bash_profile or ~/.bashrc and add the line below:

       export TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"

    If your TDengine server is not deployed on localhost or does not use the default port, change the URL above to the correct value for your environment.

  5. Launch the sample program

       java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample <read_thread_count> <write_thread_count> <total_table_count> <max_batch_size>
  6. The sample program does not exit until you press CTRL + C to terminate it. Below is the output of a run on a server with 16 cores, 64 GB memory, and an SSD disk.

     root@vm85$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 2 12
     18:56:35.896 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=2, writeTaskCount=12 tableCount=1000 maxBatchSize=3000
     18:56:36.011 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.015 [WriteThread-0] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.021 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.022 [WriteThread-1] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.031 [WriteThread-2] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.032 [WriteThread-2] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.041 [WriteThread-3] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.042 [WriteThread-3] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.093 [WriteThread-4] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.094 [WriteThread-4] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.099 [WriteThread-5] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.100 [WriteThread-5] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.100 [WriteThread-6] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.101 [WriteThread-6] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.103 [WriteThread-7] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.104 [WriteThread-7] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.105 [WriteThread-8] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.107 [WriteThread-8] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.108 [WriteThread-9] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.109 [WriteThread-9] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.156 [WriteThread-10] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.157 [WriteThread-11] INFO c.taos.example.highvolume.WriteTask - started
     18:56:36.158 [WriteThread-10] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:36.158 [ReadThread-0] INFO com.taos.example.highvolume.ReadTask - started
     18:56:36.158 [ReadThread-1] INFO com.taos.example.highvolume.ReadTask - started
     18:56:36.158 [WriteThread-11] INFO c.taos.example.highvolume.SQLWriter - maxSQLLength=1048576
     18:56:46.369 [main] INFO c.t.e.highvolume.FastWriteExample - count=18554448 speed=1855444
     18:56:56.946 [main] INFO c.t.e.highvolume.FastWriteExample - count=39059660 speed=2050521
     18:57:07.322 [main] INFO c.t.e.highvolume.FastWriteExample - count=59403604 speed=2034394
     18:57:18.032 [main] INFO c.t.e.highvolume.FastWriteExample - count=80262938 speed=2085933
     18:57:28.432 [main] INFO c.t.e.highvolume.FastWriteExample - count=101139906 speed=2087696
     18:57:38.921 [main] INFO c.t.e.highvolume.FastWriteExample - count=121807202 speed=2066729
     18:57:49.375 [main] INFO c.t.e.highvolume.FastWriteExample - count=142952417 speed=2114521
     18:58:00.689 [main] INFO c.t.e.highvolume.FastWriteExample - count=163650306 speed=2069788
     18:58:11.646 [main] INFO c.t.e.highvolume.FastWriteExample - count=185019808 speed=2136950

Program Inventory

The Python sample program uses multiple processes and cross-process message queues.

Function/Class               | Description
-----------------------------|----------------------------------------------------------------------
main function                | Program entry point; creates child processes and message queues
run_monitor_process function | Creates the database and super table; calculates the writing speed and outputs it to the console
run_read_task function       | Reads data and distributes it to the message queues
MockDataSource class         | Simulates a data source, returning the next 1,000 rows of each table
run_write_task function      | Reads as much data as possible from the message queue and writes it in batch
SQLWriter class              | Writes in SQL and creates tables automatically
StmtWriter class             | Writes in parameter binding mode (not finished yet)

main function

The main function is responsible for creating the message queues and forking child processes. There are 3 kinds of child processes:

  1. Monitoring process: initializes the database and calculates the writing speed
  2. Reading processes (n): read data from the data source
  3. Writing processes (m): write data into TDengine

The main function takes 5 parameters:

  1. The number of reading tasks, default value is 1
  2. The number of writing tasks, default value is 1
  3. The number of tables, default value is 1,000
  4. The capacity of each message queue, default value is 1,000,000 bytes
  5. The batch size of a single write, default value is 3,000

def main():
    set_global_config()
    logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
                 f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")

    monitor_process = Process(target=run_monitor_process)
    monitor_process.start()
    time.sleep(3)  # waiting for database ready.

    task_queues: List[Queue] = []
    # create task queues
    for i in range(WRITE_TASK_COUNT):
        queue = Queue(max_size_bytes=QUEUE_SIZE)
        task_queues.append(queue)

    # create write processes
    for i in range(WRITE_TASK_COUNT):
        p = Process(target=run_write_task, args=(i, task_queues[i]))
        p.start()
        logging.debug(f"WriteTask-{i} started with pid {p.pid}")
        write_processes.append(p)

    # create read processes
    for i in range(READ_TASK_COUNT):
        queues = assign_queues(i, task_queues)
        p = Process(target=run_read_task, args=(i, queues))
        p.start()
        logging.debug(f"ReadTask-{i} started with pid {p.pid}")
        read_processes.append(p)

    try:
        monitor_process.join()
    except KeyboardInterrupt:
        monitor_process.terminate()
        [p.terminate() for p in read_processes]
        [p.terminate() for p in write_processes]
        [q.close() for q in task_queues]


def assign_queues(read_task_id, task_queues):
    """
    Compute target queues for a specific read task.
    """
    ratio = WRITE_TASK_COUNT / READ_TASK_COUNT
    from_index = math.floor(read_task_id * ratio)
    end_index = math.ceil((read_task_id + 1) * ratio)
    return task_queues[from_index:end_index]


if __name__ == '__main__':
    main()


run_monitor_process

The monitoring process initializes the database and monitors the writing speed.

def run_monitor_process():
    log = logging.getLogger("DataBaseMonitor")
    conn = get_connection()
    conn.execute("DROP DATABASE IF EXISTS test")
    conn.execute("CREATE DATABASE test")
    conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
                 "TAGS (location BINARY(64), groupId INT)")

    def get_count():
        res = conn.query("SELECT count(*) FROM test.meters")
        rows = res.fetch_all()
        return rows[0][0] if rows else 0

    last_count = 0
    while True:
        time.sleep(10)
        count = get_count()
        log.info(f"count={count} speed={(count - last_count) / 10}")
        last_count = count


run_read_task function

A reading process reads data from another data system and distributes it to the message queues allocated to it.

def run_read_task(task_id: int, task_queues: List[Queue]):
    table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
    data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
    try:
        for batch in data_source:
            for table_id, rows in batch:
                # hash data to different queues
                i = table_id % len(task_queues)
                # block putting forever when the queue is full
                task_queues[i].put_many(rows, block=True, timeout=-1)
    except KeyboardInterrupt:
        pass


MockDataSource

Below is the simulated data source. We assume that each generated record already carries the target table name.

import time


class MockDataSource:
    samples = [
        "8.8,119,0.32,California.LosAngeles,0",
        "10.7,116,0.34,California.SanDiego,1",
        "9.9,111,0.33,California.SanJose,2",
        "8.9,113,0.329,California.Campbell,3",
        "9.4,118,0.141,California.SanFrancisco,4"
    ]

    def __init__(self, tb_name_prefix, table_count):
        self.table_name_prefix = tb_name_prefix + "_"
        self.table_count = table_count
        self.max_rows = 10000000
        self.current_ts = round(time.time() * 1000) - self.max_rows * 100
        # [(tableId, tableName, values),]
        self.data = self._init_data()

    def _init_data(self):
        lines = self.samples * (self.table_count // 5 + 1)
        data = []
        for i in range(self.table_count):
            table_name = self.table_name_prefix + str(i)
            data.append((i, table_name, lines[i]))  # tableId, tableName, values
        return data

    def __iter__(self):
        self.row = 0
        return self

    def __next__(self):
        """
        next 1000 rows for each table.
        return: {tableId:[row,...]}
        """
        # generate 1000 timestamps
        ts = []
        for _ in range(1000):
            self.current_ts += 100
            ts.append(str(self.current_ts))
        # add timestamp to each row
        # [(tableId, ["tableName,ts,current,voltage,phase,location,groupId"])]
        result = []
        for table_id, table_name, values in self.data:
            rows = [table_name + ',' + t + ',' + values for t in ts]
            result.append((table_id, rows))
        return result


run_write_task function

A writing process reads as much data as possible from its message queue and writes it in batch.

def run_write_task(task_id: int, queue: Queue):
    from sql_writer import SQLWriter
    log = logging.getLogger(f"WriteTask-{task_id}")
    writer = SQLWriter(get_connection)
    lines = None
    try:
        while True:
            try:
                # get as many rows as possible
                lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
                writer.process_lines(lines)
            except Empty:
                time.sleep(0.01)
    except KeyboardInterrupt:
        pass
    except BaseException as e:
        log.debug(f"lines={lines}")
        raise e


SQLWriter

The SQLWriter class encapsulates the logic of composing SQL and writing data. Note that none of the tables are created in advance; instead, they are created in batch when the "table does not exist" exception is caught. For other exceptions, the SQL statement that triggered the exception is logged for debugging. This class also checks the SQL length: if the composed SQL gets close to maxSQLLength, it is executed immediately and a new statement is started. To improve writing efficiency, it is better to increase maxSQLLength properly.

import logging

import taos


class SQLWriter:
    log = logging.getLogger("SQLWriter")

    def __init__(self, get_connection_func):
        self._tb_values = {}
        self._tb_tags = {}
        self._conn = get_connection_func()
        self._max_sql_length = self.get_max_sql_length()
        self._conn.execute("USE test")

    def get_max_sql_length(self):
        rows = self._conn.query("SHOW variables").fetch_all()
        for r in rows:
            name = r[0]
            if name == "maxSQLLength":
                return int(r[1])
        return 1024 * 1024

    def process_lines(self, lines: [str]):
        """
        :param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
        """
        for line in lines:
            ps = line.split(",")
            table_name = ps[0]
            value = '(' + ",".join(ps[1:-2]) + ') '
            if table_name in self._tb_values:
                self._tb_values[table_name] += value
            else:
                self._tb_values[table_name] = value
            if table_name not in self._tb_tags:
                location = ps[-2]
                group_id = ps[-1]
                tag_value = f"('{location}',{group_id})"
                self._tb_tags[table_name] = tag_value
        self.flush()

    def flush(self):
        """
        Assemble INSERT statement and execute it.
        When the sql length grows close to MAX_SQL_LENGTH, the sql will be executed immediately, and a new INSERT statement will be created.
        In case of "Table does not exist" exception, tables in the sql will be created and the sql will be re-executed.
        """
        sql = "INSERT INTO "
        sql_len = len(sql)
        buf = []
        for tb_name, values in self._tb_values.items():
            q = tb_name + " VALUES " + values
            if sql_len + len(q) >= self._max_sql_length:
                sql += " ".join(buf)
                self.execute_sql(sql)
                sql = "INSERT INTO "
                sql_len = len(sql)
                buf = []
            buf.append(q)
            sql_len += len(q)
        sql += " ".join(buf)
        self.execute_sql(sql)
        self._tb_values.clear()

    def execute_sql(self, sql):
        try:
            self._conn.execute(sql)
        except taos.Error as e:
            error_code = e.errno & 0xffff
            # Table does not exist
            if error_code == 0x362 or error_code == 0x218:
                self.create_tables()
                # retry after the missing tables have been created
                self.execute_sql(sql)
            else:
                self.log.error("Execute SQL: %s", sql)
                raise e
        except BaseException as baseException:
            self.log.error("Execute SQL: %s", sql)
            raise baseException

    def create_tables(self):
        sql = "CREATE TABLE "
        for tb in self._tb_values.keys():
            tag_values = self._tb_tags[tb]
            sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
        try:
            self._conn.execute(sql)
        except BaseException as e:
            self.log.error("Execute SQL: %s", sql)
            raise e


Steps to Launch

Launch Sample Program in Python

  1. Prerequisites

    • TDengine client driver has been installed
    • Python 3 has been installed (version 3.8 or above)
    • The TDengine Python connector taospy has been installed
  2. Install faster-fifo to replace the Python built-in multiprocessing.Queue

       pip3 install faster-fifo
  3. Copy the sample programs above into the files fast_write_example.py, sql_writer.py, and mockdatasource.py.

  4. Execute the program

       python3 fast_write_example.py <READ_TASK_COUNT> <WRITE_TASK_COUNT> <TABLE_COUNT> <QUEUE_SIZE> <MAX_BATCH_SIZE>

    Below is the output of a run on a server with 16 cores, 64 GB memory, and an SSD disk.

     root@vm85$ python3 fast_write_example.py 8 8
     2022-07-14 19:13:45,869 [root] - READ_TASK_COUNT=8, WRITE_TASK_COUNT=8, TABLE_COUNT=1000, QUEUE_SIZE=1000000, MAX_BATCH_SIZE=3000
     2022-07-14 19:13:48,882 [root] - WriteTask-0 started with pid 718347
     2022-07-14 19:13:48,883 [root] - WriteTask-1 started with pid 718348
     2022-07-14 19:13:48,884 [root] - WriteTask-2 started with pid 718349
     2022-07-14 19:13:48,884 [root] - WriteTask-3 started with pid 718350
     2022-07-14 19:13:48,885 [root] - WriteTask-4 started with pid 718351
     2022-07-14 19:13:48,885 [root] - WriteTask-5 started with pid 718352
     2022-07-14 19:13:48,886 [root] - WriteTask-6 started with pid 718353
     2022-07-14 19:13:48,886 [root] - WriteTask-7 started with pid 718354
     2022-07-14 19:13:48,887 [root] - ReadTask-0 started with pid 718355
     2022-07-14 19:13:48,888 [root] - ReadTask-1 started with pid 718356
     2022-07-14 19:13:48,889 [root] - ReadTask-2 started with pid 718357
     2022-07-14 19:13:48,889 [root] - ReadTask-3 started with pid 718358
     2022-07-14 19:13:48,890 [root] - ReadTask-4 started with pid 718359
     2022-07-14 19:13:48,891 [root] - ReadTask-5 started with pid 718361
     2022-07-14 19:13:48,892 [root] - ReadTask-6 started with pid 718364
     2022-07-14 19:13:48,893 [root] - ReadTask-7 started with pid 718365
     2022-07-14 19:13:56,042 [DataBaseMonitor] - count=6676310 speed=667631.0
     2022-07-14 19:14:06,196 [DataBaseMonitor] - count=20004310 speed=1332800.0
     2022-07-14 19:14:16,366 [DataBaseMonitor] - count=32290310 speed=1228600.0
     2022-07-14 19:14:26,527 [DataBaseMonitor] - count=44438310 speed=1214800.0
     2022-07-14 19:14:36,673 [DataBaseMonitor] - count=56608310 speed=1217000.0
     2022-07-14 19:14:46,834 [DataBaseMonitor] - count=68757310 speed=1214900.0
     2022-07-14 19:14:57,280 [DataBaseMonitor] - count=80992310 speed=1223500.0
     2022-07-14 19:15:07,689 [DataBaseMonitor] - count=93805310 speed=1281300.0
     2022-07-14 19:15:18,020 [DataBaseMonitor] - count=106111310 speed=1230600.0
     2022-07-14 19:15:28,356 [DataBaseMonitor] - count=118394310 speed=1228300.0
     2022-07-14 19:15:38,690 [DataBaseMonitor] - count=130742310 speed=1234800.0
     2022-07-14 19:15:49,000 [DataBaseMonitor] - count=143051310 speed=1230900.0
     2022-07-14 19:15:59,323 [DataBaseMonitor] - count=155276310 speed=1222500.0
     2022-07-14 19:16:09,649 [DataBaseMonitor] - count=167603310 speed=1232700.0
     2022-07-14 19:16:19,995 [DataBaseMonitor] - count=179976310 speed=1237300.0
Note

Do not establish a connection to TDengine in the parent process when using the Python connector in multi-process mode; otherwise all connections in the child processes will be blocked forever. This is a known issue.