Query data with Pulsar SQL

Before you query data in Pulsar, you need to install Pulsar and the built-in connectors.

Requirements

  1. Install Pulsar
  2. Install the Pulsar built-in connectors

Query data in Pulsar

To query data in Pulsar with Pulsar SQL, complete the following steps.

  1. Start a Pulsar standalone cluster.

        ./bin/pulsar standalone

  2. Start a Pulsar SQL worker.

        ./bin/pulsar sql-worker run

  3. After the standalone cluster and the SQL worker are initialized, run the SQL CLI.

        ./bin/pulsar sql

  4. Test with SQL commands.

        presto> show catalogs;
         Catalog
        ---------
         pulsar
         system
        (2 rows)

        Query 20180829_211752_00004_7qpwh, FINISHED, 1 node
        Splits: 19 total, 19 done (100.00%)
        0:00 [0 rows, 0B] [0 rows/s, 0B/s]

        presto> show schemas in pulsar;
                Schema
        -----------------------
         information_schema
         public/default
         public/functions
         sample/standalone/ns1
        (4 rows)

        Query 20180829_211818_00005_7qpwh, FINISHED, 1 node
        Splits: 19 total, 19 done (100.00%)
        0:00 [4 rows, 89B] [21 rows/s, 471B/s]

        presto> show tables in pulsar."public/default";
         Table
        -------
        (0 rows)

        Query 20180829_211839_00006_7qpwh, FINISHED, 1 node
        Splits: 19 total, 19 done (100.00%)
        0:00 [0 rows, 0B] [0 rows/s, 0B/s]

Because there is no data in Pulsar yet, no records are returned.

  5. Start the built-in connector DataGeneratorSource and ingest some mock data.

        ./bin/pulsar-admin sources create --name generator --destinationTopicName generator_test --source-type data-generator

You can then query a topic in the namespace "public/default".

    presto> show tables in pulsar."public/default";
         Table
    ----------------
     generator_test
    (1 row)

    Query 20180829_213202_00000_csyeu, FINISHED, 1 node
    Splits: 19 total, 19 done (100.00%)
    0:02 [1 rows, 38B] [0 rows/s, 17B/s]

Now you can query the data in the topic "generator_test":

    presto> select * from pulsar."public/default".generator_test;
     firstname | middlename | lastname  | email                        | username  | password | telephonenumber | age | companyemail                        | nationalidentitycardnumber |
    -----------+------------+-----------+------------------------------+-----------+----------+-----------------+-----+-------------------------------------+----------------------------+
     Genesis   | Katherine  | Wiley     | genesis.wiley@gmail.com      | genesisw  | y9D2dtU3 | 959-197-1860    |  71 | genesis.wiley@interdemconsulting.eu | 880-58-9247                |
     Brayden   |            | Stanton   | brayden.stanton@yahoo.com    | braydens  | ZnjmhXik | 220-027-867     |  81 | brayden.stanton@supermemo.eu        | 604-60-7069                |
     Benjamin  | Julian     | Velasquez | benjamin.velasquez@yahoo.com | benjaminv | 8Bc7m3eb | 298-377-0062    |  21 | benjamin.velasquez@hostesltd.biz    | 213-32-5882                |
     Michael   | Thomas     | Donovan   | donovan@mail.com             | michaeld  | OqBm9MLs | 078-134-4685    |  55 | michael.donovan@memortech.eu        | 443-30-3442                |
     Brooklyn  | Avery      | Roach     | brooklynroach@yahoo.com      | broach    | IxtBLafO | 387-786-2998    |  68 | brooklyn.roach@warst.biz            | 085-88-3973                |
     Skylar    |            | Bradshaw  | skylarbradshaw@yahoo.com     | skylarb   | p6eC6cKy | 210-872-608     |  96 | skylar.bradshaw@flyhigh.eu          | 453-46-0334                |
     .
     .
     .

You can now query the mock data.
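The table names in the queries above follow the pattern pulsar."<namespace>".<topic>, where the namespace has to be double-quoted because it contains a slash. As a minimal sketch of that naming rule, the following builds such a name from its parts. The class PulsarSqlNames and its method tableName are hypothetical helpers written for illustration; they are not part of Pulsar or its client libraries.

```java
// Hypothetical helper (not part of Pulsar): builds the table name used in Pulsar SQL queries.
final class PulsarSqlNames {
    private PulsarSqlNames() {}

    /** Returns pulsar."<namespace>".<topic>, for example pulsar."public/default".generator_test. */
    static String tableName(String namespace, String topic) {
        if (namespace == null || namespace.isEmpty() || topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("namespace and topic must be non-empty");
        }
        // The namespace contains '/', so it is written as a double-quoted identifier.
        // Embedded double quotes are escaped by doubling them, as in standard SQL.
        String quotedNamespace = "\"" + namespace.replace("\"", "\"\"") + "\"";
        return "pulsar." + quotedNamespace + "." + topic;
    }

    public static void main(String[] args) {
        // Prints the query used earlier in this section (without the trailing semicolon).
        System.out.println("select * from " + tableName("public/default", "generator_test"));
    }
}
```

The sketch leaves the topic unquoted because the topic names in this section are plain identifiers; a topic name containing other characters would presumably need quoting as well.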

Query your own data

To query your own data, you first need to ingest it into Pulsar. You can write a simple producer that publishes custom-defined data to a Pulsar topic. The following is an example.

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.impl.schema.AvroSchema;

    public class TestProducer {

        // POJO from which the Avro schema of the topic is derived.
        public static class Foo {
            private int field1 = 1;
            private String field2;
            private long field3;

            public Foo() {
            }

            public int getField1() {
                return field1;
            }

            public void setField1(int field1) {
                this.field1 = field1;
            }

            public String getField2() {
                return field2;
            }

            public void setField2(String field2) {
                this.field2 = field2;
            }

            public long getField3() {
                return field3;
            }

            public void setField3(long field3) {
                this.field3 = field3;
            }
        }

        public static void main(String[] args) throws Exception {
            // Connect to the standalone cluster started earlier.
            PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
            // Create a producer that serializes Foo objects with an Avro schema.
            Producer<Foo> producer = pulsarClient.newProducer(AvroSchema.of(Foo.class)).topic("test_topic").create();

            // Publish 1000 messages to "test_topic".
            for (int i = 0; i < 1000; i++) {
                Foo foo = new Foo();
                foo.setField1(i);
                foo.setField2("foo" + i);
                foo.setField3(System.currentTimeMillis());
                producer.newMessage().value(foo).send();
            }

            producer.close();
            pulsarClient.close();
        }
    }
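
The producer above uses the short topic name "test_topic". Pulsar resolves a short topic name into the "public/default" namespace, so the data should be queryable the same way as generator_test. The following sketch shows that mapping from a topic name to a query. TopicToSql and selectAll are hypothetical names introduced here, not Pulsar APIs, and the sketch assumes that only short names and full names of the form persistent://tenant/namespace/topic are passed in.

```java
// Hypothetical helper (not part of the Pulsar client API).
final class TopicToSql {
    private TopicToSql() {}

    /** Builds a SELECT for a topic given as "topic" or "persistent://tenant/namespace/topic". */
    static String selectAll(String topic) {
        if (topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("topic must be non-empty");
        }
        // Drop the "persistent://" style prefix if present.
        int scheme = topic.indexOf("://");
        String path = scheme >= 0 ? topic.substring(scheme + 3) : topic;

        String[] parts = path.split("/");
        String schema;
        String table;
        if (parts.length == 1) {
            // Short name: Pulsar places the topic in the public/default namespace.
            schema = "public/default";
            table = parts[0];
        } else if (parts.length == 3) {
            // Full name: tenant/namespace becomes the schema, the last part the table.
            schema = parts[0] + "/" + parts[1];
            table = parts[2];
        } else {
            throw new IllegalArgumentException("unsupported topic name: " + topic);
        }
        return "select * from pulsar.\"" + schema + "\"." + table;
    }

    public static void main(String[] args) {
        System.out.println(selectAll("test_topic"));
    }
}
```

Running the resulting statement in the SQL CLI, with a trailing semicolon added, is expected to list the messages written by TestProducer, assuming the SQL worker is still running.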