This page shows how to insert data into QuestDB using different programming languages and tools. There are three main methods for ingesting data into a running instance:
- InfluxDB line protocol, which provides flexibility, ease of use, and high ingestion rates
- Postgres wire protocol, for compatibility with a range of existing clients
- REST API, which can be used to import datasets from CSV
## Prerequisites
This page assumes that QuestDB is running and accessible. QuestDB can be run using Docker, the binaries, or Homebrew for macOS users.
## InfluxDB line protocol
QuestDB implements the InfluxDB line protocol, which is accessible by default on TCP port `9009`. This allows QuestDB to be used as a drop-in replacement for InfluxDB and other systems that implement the protocol. Configuration settings for ingestion using this protocol can be set for Influx line over TCP and Influx line over UDP.

More information on the InfluxDB line protocol implementation, with details on message format, ports, authentication and examples, can be found on the InfluxDB API reference page. Additionally, a guide on the Telegraf agent for collecting and sending metrics to QuestDB via this protocol can be found in the Telegraf guide.
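These listeners are configured in `server.conf`. As a rough sketch only (the key names below are illustrative and should be verified against the configuration reference for your QuestDB version), enabling the TCP listener while keeping UDP off might look like:

```ini
# Illustrative server.conf snippet -- verify key names against the
# configuration reference for your QuestDB version
line.tcp.enabled=true
line.tcp.net.bind.to=0.0.0.0:9009
line.udp.enabled=false
```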
<Tabs defaultValue="nodejs" values={[
  { label: "NodeJS", value: "nodejs" },
  { label: "Go", value: "go" },
  { label: "Java", value: "java" },
]}>
<TabItem value="nodejs">

```javascript
const net = require("net")

const client = new net.Socket()

const HOST = "localhost"
const PORT = 9009

function run() {
  client.connect(PORT, HOST, () => {
    const rows = [
      `trades,name=test_ilp1 value=12.4 ${Date.now() * 1e6}`,
      `trades,name=test_ilp2 value=11.4 ${Date.now() * 1e6}`,
    ]

    rows.forEach((row) => {
      client.write(`${row}\n`)
    })

    client.destroy()
  })

  client.on("data", function (data) {
    console.log("Received: " + data)
  })

  client.on("close", function () {
    console.log("Connection closed")
  })
}

run()
```

</TabItem>
<TabItem value="go">

```go
package main

import (
    "fmt"
    "io/ioutil"
    "net"
    "time"
)

func main() {
    host := "127.0.0.1:9009"
    tcpAddr, err := net.ResolveTCPAddr("tcp4", host)
    checkErr(err)
    rows := [2]string{
        fmt.Sprintf("trades,name=test_ilp1 value=12.4 %d", time.Now().UnixNano()),
        fmt.Sprintf("trades,name=test_ilp2 value=11.4 %d", time.Now().UnixNano()),
    }

    conn, err := net.DialTCP("tcp", nil, tcpAddr)
    checkErr(err)
    defer conn.Close()

    for _, s := range rows {
        _, err = conn.Write([]byte(fmt.Sprintf("%s\n", s)))
        checkErr(err)
    }

    result, err := ioutil.ReadAll(conn)
    checkErr(err)

    fmt.Println(string(result))
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}
```

</TabItem>
<TabItem value="java">

```java
import io.questdb.cutlass.line.LineProtoSender;
import io.questdb.cutlass.line.tcp.LineTCPProtoSender;
import io.questdb.network.Net;
import io.questdb.std.Os;

public class LineTCPSenderMain {
    /*
        Maven:

        <dependency>
            <groupId>org.questdb</groupId>
            <artifactId>questdb</artifactId>
            <version>{@version@}</version>
        </dependency>

        Gradle:

        compile group: 'org.questdb', name: 'questdb', version: '{@version@}'
     */
    public static void main(String[] args) {
        String hostIPv4 = "127.0.0.1";
        int port = 9009;
        int bufferCapacity = 256 * 1024;
        try (LineProtoSender sender = new LineTCPProtoSender(Net.parseIPv4(hostIPv4), port, bufferCapacity)) {
            sender
                .metric("trades")
                .tag("name", "test_ilp1")
                .field("value", 12.4)
                .$(Os.currentTimeNanos());
            sender
                .metric("trades")
                .tag("name", "test_ilp2")
                .field("value", 11.4)
                .$(Os.currentTimeNanos());
            sender.flush();
        }
    }
}
```

</TabItem>

</Tabs>
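Since the protocol is plain text over a socket, any language with TCP support can produce it. The following is a minimal illustrative sketch in Python, not an official client; it assumes the same localhost `9009` listener as the examples above:

```python
import socket
import time

# Each ILP message: table,tag=value field=value timestamp-in-nanoseconds
rows = [
    "trades,name=test_ilp1 value=12.4 %d" % time.time_ns(),
    "trades,name=test_ilp2 value=11.4 %d" % time.time_ns(),
]

# Messages are newline-terminated; the server does not send a response
with socket.create_connection(("localhost", 9009)) as sock:
    for row in rows:
        sock.sendall((row + "\n").encode())
```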
## Postgres compatibility
You can insert data using the Postgres endpoint that QuestDB exposes. This is accessible via port `8812` by default. More information on the Postgres wire protocol implementation, with details on supported features, can be found on the Postgres API reference page.
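Because this is the standard Postgres wire protocol, ordinary Postgres clients should also be able to connect; for example, with `psql` and the default credentials used in the examples below:

```shell
psql -h localhost -p 8812 -U admin -d qdb
```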
<Tabs defaultValue="nodejs" values={[
  { label: "NodeJS", value: "nodejs" },
  { label: "Go", value: "go" },
  { label: "Rust", value: "rust" },
  { label: "Java", value: "java" },
  { label: "Python", value: "python" },
]}>
<TabItem value="nodejs">

This example uses the `pg` package, which allows for quickly building queries over the PostgreSQL wire protocol. Details on the use of this package can be found in the node-postgres documentation.

This example uses naive `Date.now() * 1000` inserts for TIMESTAMP types in microsecond resolution. For accurate microsecond timestamps, the node-microtime package can be used, which makes system calls to `tv_usec` from C++.
```javascript
const { Client } = require("pg")

const start = async () => {
  try {
    const client = new Client({
      database: "qdb",
      host: "127.0.0.1",
      password: "quest",
      port: 8812,
      user: "admin",
    })
    await client.connect()

    const createTable = await client.query(
      "CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);",
    )
    console.log(createTable)

    const insertData = await client.query(
      "INSERT INTO trades VALUES($1, $2, $3, $4);",
      [Date.now() * 1000, Date.now(), "node pg example", 123],
    )
    await client.query("COMMIT")
    console.log(insertData)

    for (let rows = 0; rows < 10; rows++) {
      // Providing a 'name' field allows for prepared statements / bind variables
      const query = {
        name: "insert-values",
        text: "INSERT INTO trades VALUES($1, $2, $3, $4);",
        values: [Date.now() * 1000, Date.now(), "node pg prep statement", rows],
      }
      const preparedStatement = await client.query(query)
    }
    await client.query("COMMIT")

    const readAll = await client.query("SELECT * FROM trades")
    console.log(readAll.rows)

    await client.end()
  } catch (e) {
    console.log(e)
  }
}

start()
```

</TabItem>
<TabItem value="go">

This example uses the pgx driver and toolkit for PostgreSQL in Go. More details on the use of this toolkit can be found in the GitHub repository for pgx.
```go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/jackc/pgx/v4"
)

func main() {
    ctx := context.Background()
    conn, err := pgx.Connect(ctx, "postgresql://admin:quest@localhost:8812/qdb")
    if err != nil {
        log.Fatalln(err)
    }
    defer conn.Close(ctx)

    // text-based query
    _, err = conn.Exec(ctx, "CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);")
    if err != nil {
        log.Fatalln(err)
    }

    // Prepared statement given the name 'ps1'
    _, err = conn.Prepare(ctx, "ps1", "INSERT INTO trades VALUES($1,$2,$3,$4)")
    if err != nil {
        log.Fatalln(err)
    }

    for i := 0; i < 10; i++ {
        // Execute 'ps1' statement with a string and the loop iterator value
        _, err = conn.Exec(ctx, "ps1", time.Now(), time.Now().Round(time.Millisecond), "go prepared statement", i+1)
        if err != nil {
            log.Fatalln(err)
        }
    }

    // Read all rows from table
    rows, err := conn.Query(ctx, "SELECT * FROM trades")
    if err != nil {
        log.Fatalln(err)
    }

    fmt.Println("Reading from trades table:")
    for rows.Next() {
        var name string
        var value int64
        var ts time.Time
        var date time.Time
        err = rows.Scan(&ts, &date, &name, &value)
        if err != nil {
            log.Fatalln(err)
        }
        fmt.Println(ts, date, name, value)
    }
}
```

</TabItem>

<TabItem value="rust">
The following example shows how to use parameterized queries and prepared statements using the rust-postgres client.
```rust
use postgres::{Client, NoTls, Error};
use chrono::Utc;
use std::time::SystemTime;

fn main() -> Result<(), Error> {
    let mut client = Client::connect("postgresql://admin:quest@localhost:8812/qdb", NoTls)?;

    // Basic query
    client.batch_execute("CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);")?;

    // Parameterized query
    let name: &str = "rust example";
    let val: i32 = 123;
    let utc = Utc::now();
    let sys_time = SystemTime::now();
    client.execute(
        "INSERT INTO trades VALUES($1,$2,$3,$4)",
        &[&utc.naive_local(), &sys_time, &name, &val],
    )?;

    // Prepared statement
    let mut txn = client.transaction()?;
    let statement = txn.prepare("insert into trades values ($1,$2,$3,$4)")?;
    for value in 0..10 {
        let utc = Utc::now();
        let sys_time = SystemTime::now();
        txn.execute(&statement, &[&utc.naive_local(), &sys_time, &name, &value])?;
    }
    txn.commit()?;

    println!("import finished");
    Ok(())
}
```

</TabItem>
<TabItem value="java">

```java
package com.myco;

import java.sql.*;
import java.util.Properties;

class App {
    public static void main(String[] args) throws SQLException {
        Properties properties = new Properties();
        properties.setProperty("user", "admin");
        properties.setProperty("password", "quest");
        properties.setProperty("sslmode", "disable");

        final Connection connection = DriverManager.getConnection("jdbc:postgresql://localhost:8812/qdb", properties);
        connection.setAutoCommit(false);

        final PreparedStatement statement = connection.prepareStatement("CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);");
        statement.execute();

        try (PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO TRADES VALUES (?, ?, ?, ?)")) {
            preparedStatement.setTimestamp(1, new Timestamp(io.questdb.std.Os.currentTimeMicros()));
            preparedStatement.setDate(2, new Date(System.currentTimeMillis()));
            preparedStatement.setString(3, "abc");
            preparedStatement.setInt(4, 123);
            preparedStatement.execute();
            // auto-commit is disabled above, so commit the insert explicitly
            connection.commit();
        }
        System.out.println("Done");
        connection.close();
    }
}
```

</TabItem>
<TabItem value="python">

This example uses the psycopg2 database adapter, which does not support prepared statements (bind variables). This functionality is on the roadmap for the successor, psycopg3.
```python
import psycopg2 as pg
import datetime as dt

connection = None
try:
    connection = pg.connect(user="admin",
                            password="quest",
                            host="127.0.0.1",
                            port="8812",
                            database="qdb")
    cursor = connection.cursor()

    # text-only query
    cursor.execute("CREATE TABLE IF NOT EXISTS trades (ts TIMESTAMP, date DATE, name STRING, value INT) timestamp(ts);")

    # insert 10 records
    for x in range(10):
        now = dt.datetime.utcnow()
        date = dt.datetime.now().date()
        cursor.execute("""
            INSERT INTO trades
            VALUES (%s, %s, %s, %s);
            """, (now, date, "python example", x))

    # commit records
    connection.commit()

    cursor.execute("SELECT * FROM trades;")
    records = cursor.fetchall()
    for row in records:
        print(row)
finally:
    if connection:
        cursor.close()
        connection.close()
        print("PostgreSQL connection is closed")
```

</TabItem>

</Tabs>
## REST API
QuestDB exposes a REST API for compatibility with a wide range of libraries and tools. The REST API is accessible on port `9000` and has the following endpoints:

- `/imp` - import data
- `/exec` - execute an SQL statement

More details on the use of these endpoints can be found on the REST API reference page.
### `/imp` endpoint

The `/imp` endpoint allows importing a CSV file directly.
import Tabs from "@theme/Tabs"
import TabItem from "@theme/TabItem"
<Tabs defaultValue="curl" values={[
  { label: "cURL", value: "curl" },
  { label: "NodeJS", value: "nodejs" },
  { label: "Go", value: "go" },
]}>

<TabItem value="curl">
This example imports a CSV file with automatic schema detection.
```shell
curl -F data=@data.csv http://localhost:9000/imp
```
This example overwrites an existing table, specifies a timestamp format and a designated timestamp column. For more information on the optional parameters for specifying timestamp formats, partitioning and renaming tables, see the REST API documentation.
```shell
curl \
  -F schema='[{"name":"ts", "type": "TIMESTAMP", "pattern": "yyyy-MM-dd - HH:mm:ss"}]' \
  -F data=@weather.csv 'http://localhost:9000/imp?overwrite=true&timestamp=ts'
```

</TabItem>

<TabItem value="nodejs">
```javascript
const fetch = require("node-fetch")
const FormData = require("form-data")
const fs = require("fs")

const HOST = "http://localhost:9000"

async function run() {
  const form = new FormData()

  // Attach the CSV file as the "data" field of the multipart form
  form.append("data", fs.readFileSync(__dirname + "/data.csv"), {
    filename: "data.csv",
    contentType: "application/octet-stream",
  })

  try {
    const r = await fetch(`${HOST}/imp`, {
      method: "POST",
      body: form,
      headers: form.getHeaders(),
    })
    console.log(r)
  } catch (e) {
    console.error(e)
  }
}

run()
```

</TabItem>
<TabItem value="go">

```go
package main

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "mime/multipart"
    "net/http"
    "net/url"
    "os"
)

func main() {
    u, err := url.Parse("http://localhost:9000")
    checkErr(err)
    u.Path += "imp"
    url := fmt.Sprintf("%v", u)
    fileName := "/path/to/data.csv"
    file, err := os.Open(fileName)
    checkErr(err)
    defer file.Close()

    buf := new(bytes.Buffer)
    writer := multipart.NewWriter(buf)
    uploadFile, err := writer.CreateFormFile("data", "data.csv")
    checkErr(err)
    _, err = io.Copy(uploadFile, file)
    checkErr(err)
    writer.Close()

    // The CSV upload is a multipart POST, matching the cURL -F example above
    req, err := http.NewRequest(http.MethodPost, url, buf)
    checkErr(err)
    req.Header.Add("Content-Type", writer.FormDataContentType())

    client := &http.Client{}
    res, err := client.Do(req)
    checkErr(err)
    defer res.Body.Close()

    body, err := ioutil.ReadAll(res.Body)
    checkErr(err)
    log.Println(string(body))
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}
```

</TabItem>

</Tabs>
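Any HTTP client that can send a multipart form works the same way. As an illustrative sketch in Python, using the third-party `requests` package (an assumption, not part of the official examples):

```python
import requests

# POST the CSV file as the "data" field of a multipart form
with open("data.csv", "rb") as csv_file:
    response = requests.post("http://localhost:9000/imp",
                             files={"data": csv_file})

print(response.text)
```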
### `/exec` endpoint

Alternatively, the `/exec` endpoint can be used to create a table, and the `INSERT` statement can be used to populate it with values:
<Tabs defaultValue="curl" values={[
  { label: "cURL", value: "curl" },
  { label: "NodeJS", value: "nodejs" },
  { label: "Go", value: "go" },
]}>

<TabItem value="curl">
```shell
# Create Table
curl -G \
  --data-urlencode "query=CREATE TABLE IF NOT EXISTS trades(name STRING, value INT)" \
  http://localhost:9000/exec

# Insert a row
curl -G \
  --data-urlencode "query=INSERT INTO trades VALUES('abc', 123456);" \
  http://localhost:9000/exec
```
Note that these two queries can be combined into a single curl request:
```shell
curl -G \
  --data-urlencode "query=CREATE TABLE IF NOT EXISTS trades(name STRING, value INT);\
  INSERT INTO trades VALUES('abc', 123456);" \
  http://localhost:9000/exec
```

</TabItem>

<TabItem value="nodejs">
The `node-fetch` package can be installed using `npm i node-fetch`.
```javascript
const fetch = require("node-fetch")
const qs = require("querystring")

const HOST = "http://localhost:9000"

async function createTable() {
  try {
    const queryData = {
      query: "CREATE TABLE IF NOT EXISTS trades (name STRING, value INT);",
    }
    const response = await fetch(`${HOST}/exec?${qs.encode(queryData)}`)
    const json = await response.json()
    console.log(json)
  } catch (error) {
    console.log(error)
  }
}

async function insertData() {
  try {
    const queryData = {
      query: "INSERT INTO trades VALUES('abc', 123456);",
    }
    const response = await fetch(`${HOST}/exec?${qs.encode(queryData)}`)
    const json = await response.json()
    console.log(json)
  } catch (error) {
    console.log(error)
  }
}

createTable()
insertData()
```

</TabItem>
<TabItem value="go">

```go
package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
)

func main() {
    u, err := url.Parse("http://localhost:9000")
    checkErr(err)

    u.Path += "exec"
    params := url.Values{}
    // SQL string literals use single quotes
    params.Add("query", `
        CREATE TABLE IF NOT EXISTS
            trades (name STRING, value INT);
        INSERT INTO
            trades
        VALUES(
            'abc',
            123456
        );
    `)
    u.RawQuery = params.Encode()
    url := fmt.Sprintf("%v", u)

    res, err := http.Get(url)
    checkErr(err)
    defer res.Body.Close()

    body, err := ioutil.ReadAll(res.Body)
    checkErr(err)
    log.Println(string(body))
}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}
```

</TabItem>

</Tabs>
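The same pattern translates to any HTTP client. A minimal sketch in Python, again assuming the third-party `requests` package:

```python
import requests

HOST = "http://localhost:9000"

def run_query(sql):
    # /exec takes the SQL statement in the "query" URL parameter
    response = requests.get(HOST + "/exec", params={"query": sql})
    return response.json()

print(run_query("CREATE TABLE IF NOT EXISTS trades (name STRING, value INT);"))
print(run_query("INSERT INTO trades VALUES('abc', 123456);"))
```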
## Web Console
By default, QuestDB has an embedded Web Console running at http://[server-address]:9000. When running locally, this is accessible at http://localhost:9000. The Web Console can be used to explore table schemas, visualize query results as tables or graphs, and import datasets from CSV files. For details on these components, refer to the Web Console reference page.