External Integrations

Ingesting Data from Kafka

Citus can leverage existing Postgres data ingestion tools. For instance, we can use a tool called kafka-sink-pg-json to copy JSON messages from a Kafka topic into a database table. As a demonstration, we’ll create a kafka_test table and ingest data from the test topic with a custom mapping of JSON keys to table columns.

The easiest way to experiment with Kafka is using the Confluent platform, which includes Kafka, Zookeeper, and associated tools whose versions are verified to work together.

    # we're using Confluent 2.0 for kafka-sink-pg-json support
    curl -L http://packages.confluent.io/archive/2.0/confluent-2.0.0-2.11.7.tar.gz \
      | tar zx
    # Now get the jar and conf files for kafka-sink-pg-json
    mkdir sink
    curl -L https://github.com/justonedb/kafka-sink-pg-json/releases/download/v1.0.2/justone-jafka-sink-pg-json-1.0.zip -o sink.zip
    unzip -d sink sink.zip && rm sink.zip

The download of kafka-sink-pg-json contains some configuration files. We want to connect to the coordinator Citus node, so we must edit the configuration file sink/justone-kafka-sink-pg-json-connector.properties:

    # add to sink/justone-kafka-sink-pg-json-connector.properties
    # the kafka topic we will use
    topics=test
    # db connection info
    # use your own settings here
    db.host=localhost:5432
    db.database=postgres
    db.username=postgres
    db.password=bar
    # the schema and table we will use
    db.schema=public
    db.table=kafka_test
    # the JSON keys, and columns to store them
    db.json.parse=/@a,/@b
    db.columns=a,b

Notice db.columns and db.json.parse. The two lists correspond position by position. Each path in db.json.parse says where to find a value inside an incoming JSON object, and that value is stored in the column at the same position in db.columns.
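To make the pairing concrete, here is a small Scala sketch that reads the two properties with `java.util.Properties` and zips them by position. `SinkMapping.pairs` is a hypothetical helper for sanity-checking a config before starting the connector. It is not part of kafka-sink-pg-json, and treating both values as comma-separated lists is an assumption based on the example above.

```scala
import java.io.StringReader
import java.util.Properties

object SinkMapping {
  // Read db.json.parse and db.columns from connector-style properties text
  // and pair them by position: the n-th path feeds the n-th column.
  def pairs(config: String): Seq[(String, String)] = {
    val props = new Properties()
    props.load(new StringReader(config))
    // Assumption: both settings are comma-separated lists, as in the example.
    def list(key: String): Seq[String] =
      props.getProperty(key, "").split(",").map(_.trim).filter(_.nonEmpty).toSeq
    val paths = list("db.json.parse")
    val cols = list("db.columns")
    require(paths.length == cols.length,
      s"db.json.parse has ${paths.length} entries but db.columns has ${cols.length}")
    paths.zip(cols)
  }
}
```

With the settings above, `SinkMapping.pairs("db.json.parse=/@a,/@b\ndb.columns=a,b")` pairs `/@a` with `a` and `/@b` with `b`. A mismatched list length fails fast instead of silently shifting values into the wrong columns.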

Note

The paths in db.json.parse are written in a language that allows some flexibility in getting values out of JSON. Given the following JSON,

    {
      "identity":71293145,
      "location": {
        "latitude":51.5009449,
        "longitude":-2.4773414
      },
      "acceleration":[0.01,0.0,0.0]
    }

here are some example paths and what they match:

  • /@identity - the path to element 71293145.

  • /@location/@longitude - the path to element -2.4773414.

  • /@acceleration/#0 - the path to element 0.01

  • /@location - the path to element {"latitude":51.5009449, "longitude":-2.4773414}
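The examples suggest two kinds of path step: `@name` selects a field of an object, and `#n` selects element n of an array, counting from 0. The following sketch resolves such paths over a small hand-rolled JSON model. The `Json` types and `JsonPath.resolve` are hypothetical illustrations of the path semantics as read from these examples. They are not kafka-sink-pg-json's actual parser.

```scala
import scala.util.Try

// Hypothetical minimal JSON model, just enough for the example document.
sealed trait Json
final case class JNum(value: Double) extends Json
final case class JObj(fields: Map[String, Json]) extends Json
final case class JArr(items: Vector[Json]) extends Json

object JsonPath {
  // Follow a path such as "/@location/@longitude" or "/@acceleration/#0".
  // "@name" selects an object field; "#n" selects array element n (0-based).
  // Any step that does not fit the current value yields None.
  def resolve(root: Json, path: String): Option[Json] =
    path.split("/").filter(_.nonEmpty).foldLeft(Option(root)) { (current, step) =>
      current.flatMap {
        case JObj(fields) if step.startsWith("@") => fields.get(step.drop(1))
        case JArr(items) if step.startsWith("#") =>
          Try(step.drop(1).toInt).toOption.flatMap(items.lift)
        case _ => None
      }
    }

  // The example document from the note above.
  val sample: Json = JObj(Map(
    "identity" -> JNum(71293145),
    "location" -> JObj(Map(
      "latitude" -> JNum(51.5009449),
      "longitude" -> JNum(-2.4773414))),
    "acceleration" -> JArr(Vector(JNum(0.01), JNum(0.0), JNum(0.0)))
  ))
}
```

Resolving `/@location` returns the whole nested object, which matches the last example. A path that names a missing field or an out-of-range index simply yields `None`.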

Our own scenario is simpler. Our events will be objects like {"a":1, "b":2}, and the paths /@a and /@b pull those values into the identically named columns a and b.

Now that the configuration file is set up, it’s time to prepare the database. Connect to the coordinator node with psql and run this:

    -- create metadata tables for kafka-sink-pg-json
    \i sink/install-justone-kafka-sink-pg-1.0.sql
    -- create and distribute target ingestion table
    create table kafka_test ( a int, b int );
    select create_distributed_table('kafka_test', 'a');

Start the Kafka machinery:

    # save some typing
    export C=confluent-2.0.0
    # zookeeper and kafka each run in the foreground, so start each
    # one in its own terminal (with C exported there as well)
    # start zookeeper
    $C/bin/zookeeper-server-start \
      $C/etc/kafka/zookeeper.properties
    # start kafka server
    $C/bin/kafka-server-start \
      $C/etc/kafka/server.properties
    # create the topic we'll be reading/writing
    $C/bin/kafka-topics --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 1 \
      --topic test

Run the ingestion program:

    # the jar files for this are in "sink"
    export CLASSPATH=$PWD/sink/*
    # Watch for new events in topic and insert them
    # (this also stays in the foreground; use another terminal for the next step)
    $C/bin/connect-standalone \
      sink/justone-kafka-sink-pg-json-standalone.properties \
      sink/justone-kafka-sink-pg-json-connector.properties

At this point Kafka Connect is watching the test topic. It will parse events there and insert them into kafka_test. Let’s send an event from the command line.

    echo '{"a":42,"b":12}' | \
      $C/bin/kafka-console-producer --broker-list localhost:9092 --topic test

After a small delay the new row will show up in the database.

    select * from kafka_test;
    ┌────┬────┐
    │ a  │ b  │
    ├────┼────┤
    │ 42 │ 12 │
    └────┴────┘

Caveats

  • At the time of this writing, kafka-sink-pg-json requires Kafka version 0.9 or earlier.

  • The kafka-sink-pg-json connector config file does not provide a way to connect with SSL, so this tool will not work with Citus Cloud, which requires secure connections.

  • A malformed JSON string in the Kafka topic will cause the tool to become stuck. Manual intervention in the topic is required to process more events.

Ingesting Data from Spark

People sometimes use Spark to transform Kafka data, such as by adding computed values. In this section we’ll see how to ingest Spark dataframes into a distributed Citus table.

First let’s start a local Spark cluster. It has several moving parts, so the easiest way is to run the pieces with docker-compose.

    wget https://raw.githubusercontent.com/gettyimages/docker-spark/master/docker-compose.yml
    # this may require "sudo" depending on the docker daemon configuration
    docker-compose up

To do the ingestion into PostgreSQL, we’ll be writing custom Scala code. We’ll use the Scala Build Tool (SBT) to load dependencies and run our code, so download SBT and install it on your machine.

Next create a new directory for our project.

    mkdir sparkcitus

Create a file called sparkcitus/build.sbt to tell SBT our project configuration, and add this:

    // add this to build.sbt
    name := "sparkcitus"
    version := "1.0"
    scalaVersion := "2.10.4"
    // Maven Central no longer serves plain HTTP (and the old
    // central.maven.org host is gone), so use the HTTPS address
    resolvers ++= Seq(
      "Maven Central" at "https://repo1.maven.org/maven2/"
    )
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.2.1",
      "org.apache.spark" %% "spark-sql" % "2.2.1",
      "org.postgresql" % "postgresql" % "42.2.2"
    )

Next, create a helper Scala object that ingests data through JDBC using PostgreSQL’s COPY command. Add the following to sparkcitus/copy.scala:

    import java.io.InputStream
    import java.sql.DriverManager
    import org.apache.spark.sql.{DataFrame, Row}
    import org.postgresql.copy.CopyManager
    import org.postgresql.core.BaseConnection

    object CopyHelper {
      // Serialize rows as tab-delimited CSV: every value is quoted (embedded
      // quotes doubled) and nulls become an unquoted \N marker
      def rowsToInputStream(rows: Iterator[Row]): InputStream = {
        val bytes: Iterator[Byte] = rows.map { row =>
          (row.toSeq
            .map { v =>
              if (v == null) {
                """\N"""
              } else {
                "\"" + v.toString.replaceAll("\"", "\"\"") + "\""
              }
            }
            .mkString("\t") + "\n").getBytes("UTF-8") // pgjdbc always talks UTF-8
        }.flatten

        new InputStream {
          override def read(): Int =
            if (bytes.hasNext) {
              bytes.next & 0xff // make the signed byte an unsigned int
            } else {
              -1
            }
        }
      }

      // COPY each Spark partition into `table` over its own JDBC connection
      def copyIn(url: String, df: DataFrame, table: String): Unit = {
        val cols = df.columns.mkString(",")
        df.foreachPartition { rows =>
          val conn = DriverManager.getConnection(url)
          try {
            val cm = new CopyManager(conn.asInstanceOf[BaseConnection])
            cm.copyIn(
              s"COPY $table ($cols) " + """FROM STDIN WITH (NULL '\N', FORMAT CSV, DELIMITER E'\t')""",
              rowsToInputStream(rows))
            ()
          } finally {
            conn.close()
          }
        }
      }
    }
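The escaping in rowsToInputStream is what makes the COPY options line up. The sketch below applies the same per-field rule to a plain Scala `Seq`, so it runs without Spark. `CopyFormatDemo.encodeRow` is a hypothetical name used only for illustration. Quoting every value lets tabs, newlines, and quote characters inside a value survive CSV parsing. PostgreSQL treats only the unquoted `\N` as NULL; a quoted `"\N"` would be loaded as a literal string.

```scala
object CopyFormatDemo {
  // Same per-field rule as CopyHelper.rowsToInputStream, on a plain Seq:
  //   null  -> \N, unquoted, so it matches NULL '\N' in the COPY options
  //   other -> wrapped in double quotes, with embedded quotes doubled (CSV escaping)
  // Fields are joined with tabs to match DELIMITER E'\t', one row per line.
  def encodeRow(values: Seq[Any]): String =
    values.map {
      case null => """\N"""
      case v    => "\"" + v.toString.replaceAll("\"", "\"\"") + "\""
    }.mkString("\t") + "\n"
}
```

For example, a row for Tanya Rosenau, age 24, becomes `"Tanya Rosenau"<TAB>"24"` followed by a newline. The quoted `"24"` still loads into an integer column because PostgreSQL parses the unquoted text.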

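Continuing the setup, save some sample data into people.json in the sparkcitus directory. Note the intentional lack of surrounding square brackets: by default, Spark’s JSON reader expects one JSON object per line (the JSON Lines format). Later we’ll create a Spark dataframe from the data.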

    {"name":"Tanya Rosenau" , "age": 24}
    {"name":"Rocky Slay" , "age": 85}
    {"name":"Tama Erdmann" , "age": 48}
    {"name":"Jared Olivero" , "age": 42}
    {"name":"Gudrun Shannon" , "age": 53}
    {"name":"Quentin Yoon" , "age": 32}
    {"name":"Yanira Huckstep" , "age": 53}
    {"name":"Brendon Wesley" , "age": 19}
    {"name":"Minda Nordeen" , "age": 79}
    {"name":"Katina Woodell" , "age": 83}
    {"name":"Nevada Mckinnon" , "age": 65}
    {"name":"Georgine Mcbee" , "age": 56}
    {"name":"Mittie Vanetten" , "age": 17}
    {"name":"Lecia Boyett" , "age": 37}
    {"name":"Tobias Mickel" , "age": 69}
    {"name":"Jina Mccook" , "age": 82}
    {"name":"Cassidy Turrell" , "age": 37}
    {"name":"Cherly Skalski" , "age": 29}
    {"name":"Reita Bey" , "age": 69}
    {"name":"Keely Symes" , "age": 34}
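To generate a larger sample, you can write name/age pairs in the same one-object-per-line layout with a sketch like the one below. `JsonLines.write` is a hypothetical helper. It escapes only backslashes and double quotes in names. That is enough for plain names like the ones above, but arbitrary input would also need control characters escaped.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object JsonLines {
  // Write one JSON object per line, with no surrounding brackets and no commas
  // between objects: the JSON Lines layout Spark's JSON reader expects by default.
  def write(path: Path, people: Seq[(String, Int)]): Unit = {
    val text = people.map { case (name, age) =>
      // Escape backslashes first, then double quotes, so the name stays a valid JSON string
      val escaped = name.replace("\\", "\\\\").replace("\"", "\\\"")
      s"""{"name":"$escaped", "age": $age}"""
    }.mkString("", "\n", "\n")
    Files.write(path, text.getBytes(StandardCharsets.UTF_8))
  }
}
```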

Finally, create and distribute a table in Citus:

    create table spark_test ( name text, age integer );
    select create_distributed_table('spark_test', 'name');

Now we’re ready to hook everything together. Start up sbt:

    # run this in the sparkcitus directory
    sbt

Once inside sbt, compile the project and then enter the “console,” a Scala REPL that loads our code and dependencies:

    sbt:sparkcitus> compile
    [success] Total time: 3 s
    sbt:sparkcitus> console
    [info] Starting scala interpreter...
    scala>

Type these Scala commands into the console:

    // inside the sbt scala interpreter
    import org.apache.spark.sql.SparkSession
    // open a session to the Spark cluster
    val spark = SparkSession.builder().appName("sparkcitus").config("spark.master", "local").getOrCreate()
    // load our sample data into Spark
    val df = spark.read.json("people.json")
    // this is a simple connection url (it assumes Citus
    // is running on localhost:5432), but more complicated
    // JDBC urls differ subtly from Postgres urls, see:
    // https://jdbc.postgresql.org/documentation/head/connect.html
    val url = "jdbc:postgresql://localhost/postgres"
    // ingest the data frame using our CopyHelper class
    CopyHelper.copyIn(url, df, "spark_test")

CopyHelper.copyIn opens one JDBC connection per Spark partition and streams that partition’s rows into the table with COPY. Once it returns, the data will appear in the distributed table.

Note

Our method of ingesting the dataframe is straightforward but doesn’t protect against Spark errors. Spark provides only “at least once” semantics here. If a task fails after its partition has already been copied and Spark retries it, the same rows are copied again and appear twice in the table.
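The following toy simulation, in plain Scala with no Spark involved, shows how a retried task can duplicate rows. The simulated COPY commits, the task then fails before its success is recorded, and the retry copies the same partition again. `RetryDemo` and its methods are hypothetical and only model this sequence of events.

```scala
import scala.collection.mutable.ArrayBuffer

object RetryDemo {
  // Hypothetical stand-in for the distributed table: rows from committed COPYs.
  val table = ArrayBuffer.empty[(String, Int)]

  // One simulated task: its COPY commits, then the task may still fail
  // (for example, the executor is lost before reporting success).
  def runTask(partition: Seq[(String, Int)], failAfterCommit: Boolean): Unit = {
    table ++= partition
    if (failAfterCommit) throw new RuntimeException("task lost after COPY committed")
  }

  // Simulated scheduler retry: rerun the failed task on the same input.
  def runWithOneRetry(partition: Seq[(String, Int)]): Unit =
    try runTask(partition, failAfterCommit = true)
    catch { case _: RuntimeException => runTask(partition, failAfterCommit = false) }
}
```

After `RetryDemo.runWithOneRetry` on a two-row partition, the simulated table holds four rows, with each input row present twice.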

A more complicated but more robust approach is to use the custom Spark partitioner spark-citus so that partitions match up exactly with Citus shards. This allows running transactions directly on worker nodes, which can roll back on read failure. See the presentation linked in that repository for more information.

Business Intelligence with Tableau

Tableau is a popular business intelligence and analytics tool for databases. Citus and Tableau provide a seamless experience for performing ad-hoc reporting or analysis.

To connect Tableau to Citus, follow these steps.

  • Choose PostgreSQL from the “Add a Connection” menu.

    [Screenshot: postgres option selected in menu]

  • Enter the connection details for the coordinator node of your Citus cluster. (Note: if you’re connecting to Citus Cloud, you must select “Require SSL.”)

    [Screenshot: postgres connection details form]

  • Once Tableau connects to your database, you will see its tables. You can define your data source by dragging and dropping tables from the “Table” pane, or you can run a custom query through “New Custom SQL”.

  • You can create your own sheets by dragging and dropping dimensions, measures, and filters. You can also build an interactive user interface with Tableau. For this, Tableau automatically chooses a date range over the data, and Citus can compute aggregations over that range in human real-time.

[Screenshot: united states map with bar chart]