SQL Program Dependency Analysis

Consider the SQL program shown below, a common case of building a full AI pipeline in pure SQL (with SQLFlow). The program does the following:

  1. Preprocess the original data and put the result into a temporary table for training.
  2. Train one DNN model and one XGBoost model.
  3. Run model explanation on both models for later analysis.
  4. Clean up the training table.
```sql
-- randomly sample 1,000,000 rows from one month's data,
-- fill in empty values, and put the result into "training_data"
CREATE TABLE training_data AS
SELECT COALESCE(field1, 0), COALESCE(NULLIF(field2, ""), 0) label
FROM user_visits
WHERE dt<'20200313' AND dt>'20200213'
ORDER BY RAND() LIMIT 1000000;

-- do model training
SELECT * FROM training_data
TO TRAIN DNNClassifier
WITH model.n_classes=2, model.hidden_units=[4096,1024,256]
LABEL 'label'
INTO my_dnn_model;

-- train another model using XGBoost
SELECT * FROM training_data
TO TRAIN xgboost.gbtree
WITH objective="binary:logistic"
LABEL 'label'
INTO my_xgb_model;

-- generate explain results for both models for later analysis
SELECT * FROM training_data ORDER BY RAND() LIMIT 10000
TO EXPLAIN my_dnn_model;
SELECT * FROM training_data ORDER BY RAND() LIMIT 10000
TO EXPLAIN my_xgb_model;

-- clean up the training table
DROP TABLE training_data;
```

It is easy to see that the two training statements can run concurrently, and that each explain statement can start as soon as its corresponding training statement finishes. The execution flow should look like:

Figure 1: the expected execution flow of the SQL program.

Analyze the SQL Program Execution Dependency

By analyzing the SQL program, we can figure out which statements can be executed concurrently. Since SQL statements are usually submitted to a distributed cluster such as Hive or MaxCompute, running them concurrently makes full use of the cluster's computing resources and shortens the overall execution time.

To achieve this, we need to analyze the whole SQL program and find out which tables each statement manipulates. A statement may read some tables, write some tables, or do both, and other statements may read or write the same tables. In the example above, statement 1 reads table user_visits and writes table training_data, while statements 2, 3, 4, and 5 read table training_data, so they can only start after statement 1 has finished.

We can do this with the following steps:

  1. Parse every SQL statement and return a map recording the read/write type of each table the statement manipulates. We get one such map per statement, in program order. This is done in the third-party parsers.
  2. Parse the SQLFlow extended SQL, if any, and return a similar map recording the read/write type of the models the statement manipulates (in our Go parser).
  3. Construct a graph using the following rules (see the sketch after this list):
    1. The statements are the nodes of the graph; they should run in the original order.
    2. If a statement reads a table/model, add an edge from every statement that writes that table to the current statement.
  4. Execute the graph by submitting it as an Argo/Tekton workflow, where each step is one single statement.
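The sketch below illustrates rule 3 on the example program, assuming we already have each statement's input and output table names. The names stmtIO and buildEdges are illustrative, not the actual SQLFlow API, and models are treated the same way as tables here:

```go
package main

import "fmt"

// stmtIO is an illustrative record of the tables a statement reads and writes.
type stmtIO struct {
	inputs  []string
	outputs []string
}

// buildEdges returns, for each statement i, the indices of the statements that
// must finish before statement i can start (rule 3.2: an edge from every writer
// of a table to each later reader of that table).
func buildEdges(stmts []stmtIO) map[int][]int {
	edges := map[int][]int{}
	lastWriter := map[string]int{} // table name -> index of its last writer so far
	for i, s := range stmts {
		for _, t := range s.inputs {
			if w, ok := lastWriter[t]; ok {
				edges[i] = append(edges[i], w)
			}
		}
		for _, t := range s.outputs {
			lastWriter[t] = i
		}
	}
	return edges
}

func main() {
	// The six statements of the example program, in order.
	stmts := []stmtIO{
		{inputs: []string{"user_visits"}, outputs: []string{"training_data"}},  // CREATE TABLE ... AS
		{inputs: []string{"training_data"}, outputs: []string{"my_dnn_model"}}, // TO TRAIN DNNClassifier
		{inputs: []string{"training_data"}, outputs: []string{"my_xgb_model"}}, // TO TRAIN xgboost.gbtree
		{inputs: []string{"training_data", "my_dnn_model"}},                    // TO EXPLAIN my_dnn_model
		{inputs: []string{"training_data", "my_xgb_model"}},                    // TO EXPLAIN my_xgb_model
		{outputs: []string{"training_data"}},                                   // DROP TABLE (a write; see the Hazards section)
	}
	// Prints map[1:[0] 2:[0] 3:[0 1] 4:[0 2]].
	fmt.Println(buildEdges(stmts))
}
```

The two TO TRAIN statements only depend on the CREATE TABLE statement, so they can run concurrently; the DROP TABLE statement gets no incoming edge from these rules alone, which is exactly the hazard discussed below.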

Analyze SQL Statement Input and Output Tables

To analyze the input and output tables of each statement, we call the external parsers (Hive, Calcite, MySQL) to get the AST (https://en.wikipedia.org/wiki/Abstract_syntax_tree). For statements that have SQLFlow extended syntax like TO TRAIN, we analyze the standard SQL part in the external parsers and then analyze the model reads and writes with the SQLFlow extended parser.

When walking through a SQL statement, we identify table reads and writes by the following rules:

  • table reads:
    1. SELECT <hints> FROM <table_name>: reads table <table_name>.
    2. SELECT <hints> FROM <table_name> <AS> <table_alias>: reads table <table_name>.
    3. SELECT ... LEFT JOIN <table_name>: reads <table_name>. The same applies to INNER JOIN, RIGHT JOIN, FULL JOIN, SELF JOIN, and UNION.
  • table writes:
    1. CREATE TABLE <table_name> AS <SELECT Query>: writes table <table_name>.
    2. INSERT INTO <table_name> ...: writes table <table_name>.
    3. UPDATE <table_name> ...: writes table <table_name>.
    4. DELETE FROM <table_name> ..., TRUNCATE ...: writes <table_name>.
    5. ALTER TABLE <table_name>: writes <table_name>.

Analyzing Database Context

Some SQL programs have USE statements, which define the database that the following SQL statements use:

```sql
USE db;
CREATE TABLE my_table AS ...
SELECT * FROM my_table TO TRAIN ...
USE db2;
SELECT * FROM my_table_in_db2 TO TRAIN ...
```

We need to go over the SQL program and record the last USE before each SQL statement; then, after we get the input/output tables of each statement in the step above, we rewrite every table name that has no db. prefix into the full name db.table, so that the table read/write information is returned with full table names.

Note that the external parsers do not support the USE statement. We can handle it simply by splitting the SQL program by ; and using a regular expression to check whether a statement looks like USE db_identifier.
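A minimal sketch of this preprocessing, assuming statements are separated by ';'; the function names and the regular expression are illustrative:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// useRE matches statements like "USE db_name".
var useRE = regexp.MustCompile(`(?i)^\s*USE\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*$`)

// currentDBForStatements splits a SQL program by ';' and returns, for each
// non-USE statement, the database set by the last preceding USE statement.
func currentDBForStatements(program, defaultDB string) []string {
	db := defaultDB
	var dbs []string
	for _, stmt := range strings.Split(program, ";") {
		if strings.TrimSpace(stmt) == "" {
			continue
		}
		if m := useRE.FindStringSubmatch(stmt); m != nil {
			db = m[1]
			continue
		}
		dbs = append(dbs, db)
	}
	return dbs
}

// qualify turns an unprefixed table name into the full name "db.table".
func qualify(db, table string) string {
	if strings.Contains(table, ".") {
		return table // already a full name
	}
	return db + "." + table
}

func main() {
	program := `USE db; CREATE TABLE my_table AS SELECT 1; USE db2; SELECT * FROM my_table_in_db2`
	fmt.Println(currentDBForStatements(program, "default")) // [db db2]
	fmt.Println(qualify("db2", "my_table_in_db2"))          // db2.my_table_in_db2
}
```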

Hazards

A SQL program is just like an ordinary computer program; in our case, tables play the role of variables. Analyzing such programs always involves hazards (https://en.wikipedia.org/wiki/Hazard_(computer_architecture)). For the example above, we can construct a graph like the one below:

Figure 2: the dependency graph constructed from the example SQL program.

Note that the last “DROP TABLE” statement must execute after the two explain statements, because the table is used by the explain statements before it may be changed (in this case, deleted). This case is called “Write After Read” (WAR). To solve this kind of data hazard, we can add one dependency node, “table WAR”, between the explain statements and the drop statement:

Figure 3: the dependency graph with the “table WAR” node added between the explain statements and the drop statement.

We also need to handle the “Write After Write” (WAW) hazard in the same way as “Write After Read” if the SQL program contains that situation.
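Extending the buildEdges sketch from earlier (and reusing its illustrative stmtIO type), WAR and WAW edges can be added as follows. Applied to the example program, the DROP TABLE statement then waits for both explain statements, as in Figure 3:

```go
// buildEdgesWithHazards extends buildEdges: besides the read-after-write (RAW)
// edges, a statement that writes a table also waits for every statement that
// read the table since its last write (WAR) and for the last writer (WAW).
func buildEdgesWithHazards(stmts []stmtIO) map[int][]int {
	edges := map[int][]int{}
	lastWriter := map[string]int{}          // table -> index of its last writer
	readersSinceWrite := map[string][]int{} // table -> readers since the last write
	for i, s := range stmts {
		for _, t := range s.inputs {
			if w, ok := lastWriter[t]; ok {
				edges[i] = append(edges[i], w) // RAW
			}
			readersSinceWrite[t] = append(readersSinceWrite[t], i)
		}
		for _, t := range s.outputs {
			for _, r := range readersSinceWrite[t] {
				if r != i {
					edges[i] = append(edges[i], r) // WAR
				}
			}
			if w, ok := lastWriter[t]; ok && w != i {
				edges[i] = append(edges[i], w) // WAW (may duplicate a RAW edge; harmless)
			}
			lastWriter[t] = i
			readersSinceWrite[t] = nil
		}
	}
	return edges
}
```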

Implementation

To construct the dependency graph of the SQL program, we need to analyze the parsed SQL statements in every parser, including the MySQL parser, the Hive parser, the Calcite parser, and the SQLFlow extended parser. Since the Hive and Calcite parsers are written in Java, we need to pass the table read/write information from Java to Go:

```protobuf
// InputOutputTables represents the input and output tables a statement manipulates.
message InputOutputTables {
    repeated string input_tables = 1;
    repeated string output_tables = 2;
}

message ParserResponse {
    repeated string sql_statements = 1;
    int32 index = 2;
    string error = 3;
    // Tables that each statement manipulates.
    repeated InputOutputTables input_output_tables = 4;
}
```

When parsing the SQL program in Go, we can get the table read/write information of each standard SQL statement, parse the extended SQL to get the model read/write information, and then construct the graph:

```go
// pkg/ir/deps/sql_program_deps_graph.go
package deps

import "sqlflow.org/sqlflow/pkg/ir"

// SQLProgram is the constructed graph of the SQL program.
type SQLProgram struct {
	Statements []*Statement
}

type Statement struct {
	Statement ir.SQLFlowStmt
	// A statement's inputs/outputs must be tables.
	Inputs  []*Table
	Outputs []*Table
}

// TableType marks whether a node is a table or a model.
type TableType string

const (
	TableTypeTable TableType = "table"
	TableTypeModel TableType = "model"
)

type Table struct {
	// Type can be "table" or "model".
	Type TableType
	Name string
	// A table's inputs/outputs must be statements.
	Inputs  []*Statement
	Outputs []*Statement
}

func (t *Table) FullName() string {
	return string(t.Type) + "." + t.Name
}

// Analyze constructs a dependency graph for the SQL program and
// returns the first statement (the root node).
func Analyze(program []ir.SQLFlowStmt) (*Statement, error)
```

NOTE: we treat tables and models as the same kind of node when constructing the graph. The actual node name in the graph is “Type.Name”, because we may save the model in OSS storage rather than in a table; with the type prefix, a model whose name happens to equal some table's name does not create a dependency between the two.
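Building on the type definitions and the naming convention above, Analyze could wire the graph roughly as in the sketch below. The tableRef type and the statementIO helper are illustrative placeholders for the per-statement results that the external parsers and the extended-syntax parser produce; this is not the actual SQLFlow implementation:

```go
// tableRef is an illustrative (type, name) pair produced by the parsers.
type tableRef struct {
	typ  TableType
	name string
}

// statementIO is a placeholder: in the real system the inputs/outputs come
// from InputOutputTables (standard SQL) plus the model reads/writes found by
// the extended parser.
func statementIO(stmt ir.SQLFlowStmt) (ins, outs []tableRef) {
	return nil, nil
}

// Analyze constructs the bipartite statement/table graph and returns the
// first statement as the root node.
func Analyze(program []ir.SQLFlowStmt) (*Statement, error) {
	tables := map[string]*Table{} // "Type.Name" -> shared table node
	getTable := func(r tableRef) *Table {
		key := string(r.typ) + "." + r.name
		if t, ok := tables[key]; ok {
			return t
		}
		t := &Table{Type: r.typ, Name: r.name}
		tables[key] = t
		return t
	}
	var first *Statement
	for _, stmt := range program {
		node := &Statement{Statement: stmt}
		ins, outs := statementIO(stmt)
		for _, r := range ins {
			t := getTable(r)
			node.Inputs = append(node.Inputs, t)
			t.Outputs = append(t.Outputs, node) // the table feeds this statement
		}
		for _, r := range outs {
			t := getTable(r)
			node.Outputs = append(node.Outputs, t)
			t.Inputs = append(t.Inputs, node) // this statement produces or updates the table
		}
		if first == nil {
			first = node
		}
	}
	return first, nil
}
```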

Then the workflow package can use the constructed graph to generate Argo/Tekton YAML and submit it to a Kubernetes cluster for execution:

```go
// In the workflow package:
func GenCode(root *deps.Statement) string // generate couler/fluid code for the graph
func GenYAML(coulerCode string) string    // execute the couler/fluid Python code and generate Argo/Tekton YAML
```

Submit the YAML to Kubernetes, and the workflow will execute the statements following their dependencies.
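Putting the pieces together, the end-to-end submission path could look roughly like the sketch below; the workflow package qualifier and submitToKubernetes are assumed names, not the actual SQLFlow API:

```go
// submitProgram is an illustrative wrapper tying the steps together.
func submitProgram(program []ir.SQLFlowStmt) error {
	root, err := deps.Analyze(program) // build the dependency graph
	if err != nil {
		return err
	}
	code := workflow.GenCode(root)  // couler/fluid code for the graph
	yaml := workflow.GenYAML(code)  // Argo/Tekton YAML
	return submitToKubernetes(yaml) // hypothetical: kubectl apply or the Argo/Tekton API
}
```

Each step of the generated workflow corresponds to one SQL statement, and Argo/Tekton executes the steps following the edges of the graph.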