Perform Stream Processing by SQL

This part demonstrates real-time stream processing with SQL. It introduces basic concepts such as streams, queries, and materialized views, and uses examples to show how easy our processing engine is to use and how it handles complex queries.

Overview

One of the most important applications of stream processing is real-time business information analysis. Imagine that we are managing a supermarket and would like to analyze the sales information to adjust our marketing strategies.

Suppose we have two streams of data:

  info(product, category)          // the category a product belongs to
  visit(product, user, length)     // how long a customer spent looking at a product

Unlike a table in a traditional relational database, a stream is an unbounded sequence of records that keep arriving over time. In the following steps, we will analyze the two streams to extract useful information.
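To make the difference from a table concrete, the Python sketch below models the two streams as append-only logs: records are only ever added at the end, and a reader can start at an offset and consume whatever has arrived. This is an illustration only, not HStreamDB's storage engine or client API; the `Stream` class and its `append` and `read_from` methods are hypothetical names.

```python
from dataclasses import dataclass
from typing import Generic, Iterator, List, TypeVar

T = TypeVar("T")

@dataclass(frozen=True)
class Info:
    product: str
    category: str

@dataclass(frozen=True)
class Visit:
    product: str
    user: str
    length: float

class Stream(Generic[T]):
    """Hypothetical append-only log: existing records are never modified."""

    def __init__(self) -> None:
        self._log: List[T] = []

    def append(self, record: T) -> int:
        """Append a record at the end and return its offset."""
        self._log.append(record)
        return len(self._log) - 1

    def read_from(self, offset: int) -> Iterator[T]:
        """Yield the records from `offset` up to the current end of the log."""
        for i in range(offset, len(self._log)):
            yield self._log[i]

info: Stream[Info] = Stream()
visit: Stream[Visit] = Stream()
info.append(Info("Apple", "Fruit"))
visit.append(Visit("Apple", "Alice", 10))
```

A real stream never has a "last" record; `read_from` stops at the current end only because this sketch has no way to wait for new data.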

Requirements

Ensure you have deployed HStreamDB successfully. The easiest way is to follow the quickstart to start a local cluster; you can also use any of the other methods described in the Deployment part.

Step 1: Create streams

As described in the overview, we need two streams, info and visit. Let's create them now. Start an HStream SQL shell and run the following statements:

  > CREATE STREAM info;
  info
  > CREATE STREAM visit;
  visit

We have successfully created two streams.

Step 2: Create streaming queries

We can now create streaming queries on the streams. A query is a long-running task that fetches data from one or more streams and produces results continuously. Let's create a trivial query that fetches data from the stream info and outputs it:

  > SELECT * FROM info EMIT CHANGES;

The query will keep running until you interrupt it. We can leave it running and start another query, which fetches data from the stream visit and outputs the maximum visit length for each product. Start a new SQL shell and run:

  > SELECT product, MAX(length) AS max_len FROM visit GROUP BY product EMIT CHANGES;

Neither of the queries will print any results since we have not inserted any data yet. So let’s do that.

Step 3: Insert data into streams

There are multiple ways to insert data into streams, such as client libraries and HStream IO, and data inserted in any of these ways is treated the same during processing. See the client guides or the HStream IO overview for details.

For consistency and ease of demonstration, we will use SQL statements here.

Start a new SQL shell and run:

  > INSERT INTO info (product, category) VALUES ("Apple", "Fruit");
  Done.
  > INSERT INTO visit (product, user, length) VALUES ("Apple", "Alice", 10);
  Done.
  > INSERT INTO visit (product, user, length) VALUES ("Apple", "Bob", 20);
  Done.
  > INSERT INTO visit (product, user, length) VALUES ("Apple", "Caleb", 10);
  Done.

Switch to the shells with the running queries. You should see the following outputs:

  > SELECT * FROM info EMIT CHANGES;
  {"product":"Apple","category":"Fruit"}

  > SELECT product, MAX(length) AS max_len FROM visit GROUP BY product EMIT CHANGES;
  {"product":"Apple","max_len":10.0}
  {"product":"Apple","max_len":20.0}
  {"product":"Apple","max_len":20.0}

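Note that max_len changes from 10 to 20 when Bob's longer visit arrives and stays at 20 after Caleb's shorter visit, which is expected: the query emits the current maximum for the product each time a new record arrives.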
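The sequence 10.0, 20.0, 20.0 follows from maintaining a grouped aggregate incrementally: keep the current maximum per product and emit the updated row after every input record. Here is a minimal Python sketch of that idea, assuming insert-only input. `max_by_product` is a hypothetical name, and the sketch is not taken from HStreamDB's implementation.

```python
from typing import Dict, Iterable, Iterator, Tuple

def max_by_product(visits: Iterable[Tuple[str, str, float]]) -> Iterator[Dict[str, object]]:
    """Maintain MAX(length) per product and emit the current value after each record,
    mimicking GROUP BY product ... EMIT CHANGES."""
    state: Dict[str, float] = {}  # product -> largest length seen so far
    for product, _user, length in visits:
        current = state.get(product)
        state[product] = length if current is None else max(current, length)
        yield {"product": product, "max_len": float(state[product])}

visits = [("Apple", "Alice", 10), ("Apple", "Bob", 20), ("Apple", "Caleb", 10)]
for row in max_by_product(visits):
    print(row)
```

Only one number per product is stored, which is why such a query can run indefinitely over an unbounded stream.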

Step 4: Create materialized views

Now let's do some more complex analysis. If we want to know the longest visit time for each category whenever we need it, the best approach is to create a materialized view.

A materialized view is an object that contains the result of a query. In HStreamDB, a view is kept in memory and updated continuously as new data arrives, so we can read the current results directly from the view without any extra computation at read time. This makes reading from a view very fast.

Here we can create a view as follows:

  > CREATE VIEW result AS SELECT info.category, MAX(visit.length) as max_length FROM info, visit WHERE info.product = visit.product GROUP BY info.category EMIT CHANGES;
  Done. Query ID: 1362152824401458

Note that your query ID will differ from the one shown above. Now let's try to read from the view:

  > SELECT * FROM result;
  Done.

It outputs no data because nothing has been inserted into the streams since the view was created. Let's insert some data now:

  > INSERT INTO info (product, category) VALUES ("Apple", "Fruit");
  Done.
  > INSERT INTO info (product, category) VALUES ("Banana", "Fruit");
  Done.
  > INSERT INTO info (product, category) VALUES ("Carrot", "Vegetable");
  Done.
  > INSERT INTO info (product, category) VALUES ("Potato", "Vegetable");
  Done.
  > INSERT INTO visit (product, user, length) VALUES ("Apple", "Alice", 10);
  Done.
  > INSERT INTO visit (product, user, length) VALUES ("Apple", "Bob", 20);
  Done.
  > INSERT INTO visit (product, user, length) VALUES ("Carrot", "Bob", 50);
  Done.

Step 5: Get results from views

Now let’s find out what is in our view:

  > SELECT * FROM result;
  {"max_length":20.0,"info.category":"Fruit"}
  {"max_length":50.0,"info.category":"Vegetable"}
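As a cross-check of what the view should contain, the same SELECT (without EMIT CHANGES) can be run as an ordinary batch query over just the records inserted after the view was created. The sketch below uses Python's built-in sqlite3 module purely as a stand-in for HStreamDB, an assumption made for illustration; for the data in this walkthrough it returns the same category/maximum pairs as the view.

```python
import sqlite3

# In-memory SQLite database standing in for the two streams (illustration only).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE info (product TEXT, category TEXT);
CREATE TABLE visit (product TEXT, "user" TEXT, length REAL);
""")
conn.executemany("INSERT INTO info VALUES (?, ?)", [
    ("Apple", "Fruit"), ("Banana", "Fruit"),
    ("Carrot", "Vegetable"), ("Potato", "Vegetable"),
])
conn.executemany("INSERT INTO visit VALUES (?, ?, ?)", [
    ("Apple", "Alice", 10), ("Apple", "Bob", 20), ("Carrot", "Bob", 50),
])

# The view's query, minus EMIT CHANGES, evaluated once over the current data.
QUERY = """
SELECT info.category, MAX(visit.length) AS max_length
FROM info, visit
WHERE info.product = visit.product
GROUP BY info.category
ORDER BY info.category
"""
rows = conn.execute(QUERY).fetchall()
print(rows)  # [('Fruit', 20.0), ('Vegetable', 50.0)]
```

The difference is that the batch query must be re-run to see new data, while the view is updated as each record arrives.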

It works. Now insert more data and query the view again:

  > INSERT INTO visit (product, user, length) VALUES ("Banana", "Alice", 40);
  Done.
  > INSERT INTO visit (product, user, length) VALUES ("Potato", "Eve", 60);
  Done.
  > SELECT * FROM result;
  {"max_length":40.0,"info.category":"Fruit"}
  {"max_length":60.0,"info.category":"Vegetable"}

The result is updated right away.
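The fast reads come from doing the work at insert time. Below is a minimal Python sketch of incrementally maintaining the result view, assuming a symmetric hash join (each side remembers its records so that a late arrival on either stream still finds its matches) and insert-only input. The class and method names are hypothetical, and this illustrates incremental view maintenance in general rather than HStreamDB's actual implementation.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List

class MaxLengthByCategoryView:
    """Hypothetical in-memory view for:
    SELECT info.category, MAX(visit.length) FROM info, visit
    WHERE info.product = visit.product GROUP BY info.category"""

    def __init__(self) -> None:
        self._categories: DefaultDict[str, List[str]] = defaultdict(list)  # product -> categories seen
        self._lengths: DefaultDict[str, List[float]] = defaultdict(list)   # product -> visit lengths seen
        self._max: Dict[str, float] = {}                                     # category -> current MAX

    def _update(self, category: str, length: float) -> None:
        # With inserts only, MAX can only grow, so one comparison per joined row suffices.
        old = self._max.get(category)
        self._max[category] = length if old is None else max(old, length)

    def insert_info(self, product: str, category: str) -> None:
        self._categories[product].append(category)
        for length in self._lengths[product]:  # join with visits that arrived earlier
            self._update(category, length)

    def insert_visit(self, product: str, user: str, length: float) -> None:
        # `user` is not needed by this view; it is accepted only to mirror the stream schema.
        self._lengths[product].append(float(length))
        for category in self._categories[product]:  # join with info records that arrived earlier
            self._update(category, float(length))

    def read(self) -> Dict[str, float]:
        # Reading is just a copy of the maintained state; no query is re-run.
        return dict(self._max)

view = MaxLengthByCategoryView()
for product, category in [("Apple", "Fruit"), ("Banana", "Fruit"),
                          ("Carrot", "Vegetable"), ("Potato", "Vegetable")]:
    view.insert_info(product, category)
view.insert_visit("Apple", "Alice", 10)
view.insert_visit("Apple", "Bob", 20)
view.insert_visit("Carrot", "Bob", 50)
print(view.read())  # {'Fruit': 20.0, 'Vegetable': 50.0}
view.insert_visit("Banana", "Alice", 40)
view.insert_visit("Potato", "Eve", 60)
print(view.read())  # {'Fruit': 40.0, 'Vegetable': 60.0}
```

Supporting deletions or updates would need more state per category than a single running maximum, because removing the current maximum requires knowing the next-largest value.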

For a detailed introduction to the SQL dialect, see HStream SQL.