Impala Tutorials
This section includes tutorial scenarios that demonstrate how to begin using Impala once the software is installed. It focuses on techniques for loading data, because once you have some data in tables and can query that data, you can quickly progress to more advanced Impala features.
Note:
Where practical, the tutorials take you from “ground zero” to having the desired Impala tables and data. In some cases, you might need to download additional files from outside sources, set up additional software components, modify commands or scripts to fit your own configuration, or substitute your own sample data.
Before trying these tutorial lessons, install Impala using one of these procedures:
- If you already have some Apache Hadoop environment set up and just need to add Impala to it, follow the installation process described in Installing Impala. Make sure to also install the Hive metastore service if you do not already have Hive configured.
Tutorials for Getting Started
These tutorials demonstrate the basics of using Impala. They are intended for first-time users, and for trying out Impala on any new cluster to make sure the major components are working correctly.
Explore a New Impala Instance
This tutorial demonstrates techniques for finding your way around the tables and databases of an unfamiliar (possibly empty) Impala instance.
When you connect to an Impala instance for the first time, you use the SHOW DATABASES
and SHOW TABLES
statements to view the most common types of objects. Also, call the version()
function to confirm which version of Impala you are running; the version number is important when consulting documentation and dealing with support issues.
A completely empty Impala instance contains no tables, but still has two databases:
default
, where new tables are created when you do not specify any other database._impala_builtins
, a system database used to hold all the built-in functions.
The following example shows how to see the available databases, and the tables in each. If the list of databases or tables is long, you can use wildcard notation to locate specific databases or tables based on their names.
$ impala-shell -i localhost --quiet
Starting Impala Shell without Kerberos authentication
Welcome to the Impala shell. Press TAB twice to see a list of available commands.
...
(Shell
build version: Impala Shell v3.4.x (hash) built on
date)
[localhost:21000] > select version();
+-------------------------------------------
| version()
+-------------------------------------------
| impalad version ...
| Built on ...
+-------------------------------------------
[localhost:21000] > show databases;
+--------------------------+
| name |
+--------------------------+
| _impala_builtins |
| ctas |
| d1 |
| d2 |
| d3 |
| default |
| explain_plans |
| external_table |
| file_formats |
| tpc |
+--------------------------+
[localhost:21000] > select current_database();
+--------------------+
| current_database() |
+--------------------+
| default |
+--------------------+
[localhost:21000] > show tables;
+-------+
| name |
+-------+
| ex_t |
| t1 |
+-------+
[localhost:21000] > show tables in d3;
[localhost:21000] > show tables in tpc;
+------------------------+
| name |
+------------------------+
| city |
| customer |
| customer_address |
| customer_demographics |
| household_demographics |
| item |
| promotion |
| store |
| store2 |
| store_sales |
| ticket_view |
| time_dim |
| tpc_tables |
+------------------------+
[localhost:21000] > show tables in tpc like 'customer*';
+-----------------------+
| name |
+-----------------------+
| customer |
| customer_address |
| customer_demographics |
+-----------------------+
Once you know what tables and databases are available, you descend into a database with the USE
statement. To understand the structure of each table, you use the DESCRIBE
command. Once inside a database, you can issue statements such as INSERT
and SELECT
that operate on particular tables.
The following example explores a database named TPC
whose name we learned in the previous example. It shows how to filter the table names within a database based on a search string, examine the columns of a table, and run queries to examine the characteristics of the table data. For example, for an unfamiliar table you might want to know the number of rows, the number of different values for a column, and other properties such as whether the column contains any NULL
values. When sampling the actual data values from a table, use a LIMIT
clause to avoid excessive output if the table contains more rows or distinct values than you expect.
[localhost:21000] > use tpc;
[localhost:21000] > show tables like '*view*';
+-------------+
| name |
+-------------+
| ticket_view |
+-------------+
[localhost:21000] > describe city;
+-------------+--------+---------+
| name | type | comment |
+-------------+--------+---------+
| id | int | |
| name | string | |
| countrycode | string | |
| district | string | |
| population | int | |
+-------------+--------+---------+
[localhost:21000] > select count(*) from city;
+----------+
| count(*) |
+----------+
| 0 |
+----------+
[localhost:21000] > desc customer;
+------------------------+--------+---------+
| name | type | comment |
+------------------------+--------+---------+
| c_customer_sk | int | |
| c_customer_id | string | |
| c_current_cdemo_sk | int | |
| c_current_hdemo_sk | int | |
| c_current_addr_sk | int | |
| c_first_shipto_date_sk | int | |
| c_first_sales_date_sk | int | |
| c_salutation | string | |
| c_first_name | string | |
| c_last_name | string | |
| c_preferred_cust_flag | string | |
| c_birth_day | int | |
| c_birth_month | int | |
| c_birth_year | int | |
| c_birth_country | string | |
| c_login | string | |
| c_email_address | string | |
| c_last_review_date | string | |
+------------------------+--------+---------+
[localhost:21000] > select count(*) from customer;
+----------+
| count(*) |
+----------+
| 100000 |
+----------+
[localhost:21000] > select count(distinct c_birth_month) from customer;
+-------------------------------+
| count(distinct c_birth_month) |
+-------------------------------+
| 12 |
+-------------------------------+
[localhost:21000] > select count(*) from customer where c_email_address is null;
+----------+
| count(*) |
+----------+
| 0 |
+----------+
[localhost:21000] > select distinct c_salutation from customer limit 10;
+--------------+
| c_salutation |
+--------------+
| Mr. |
| Ms. |
| Dr. |
| |
| Miss |
| Sir |
| Mrs. |
+--------------+
When you graduate from read-only exploration, you use statements such as CREATE DATABASE
and CREATE TABLE
to set up your own database objects.
The following example demonstrates creating a new database holding a new table. Although the last example ended inside the TPC
database, the new EXPERIMENTS
database is not nested inside TPC
; all databases are arranged in a single top-level list.
[localhost:21000] > create database experiments;
[localhost:21000] > show databases;
+--------------------------+
| name |
+--------------------------+
| _impala_builtins |
| ctas |
| d1 |
| d2 |
| d3 |
| default |
| experiments |
| explain_plans |
| external_table |
| file_formats |
| tpc |
+--------------------------+
[localhost:21000] > show databases like 'exp*';
+---------------+
| name |
+---------------+
| experiments |
| explain_plans |
+---------------+
The following example creates a new table, T1
. To illustrate a common mistake, it creates this table inside the wrong database, the TPC
database where the previous example ended. The ALTER TABLE
statement lets you move the table to the intended database, EXPERIMENTS
, as part of a rename operation. The USE
statement is always needed to switch to a new database, and the current_database()
function confirms which database the session is in, to avoid these kinds of mistakes.
[localhost:21000] > create table t1 (x int);
[localhost:21000] > show tables;
+------------------------+
| name |
+------------------------+
| city |
| customer |
| customer_address |
| customer_demographics |
| household_demographics |
| item |
| promotion |
| store |
| store2 |
| store_sales |
| t1 |
| ticket_view |
| time_dim |
| tpc_tables |
+------------------------+
[localhost:21000] > select current_database();
+--------------------+
| current_database() |
+--------------------+
| tpc |
+--------------------+
[localhost:21000] > alter table t1 rename to experiments.t1;
[localhost:21000] > use experiments;
[localhost:21000] > show tables;
+------+
| name |
+------+
| t1 |
+------+
[localhost:21000] > select current_database();
+--------------------+
| current_database() |
+--------------------+
| experiments |
+--------------------+
For your initial experiments with tables, you can use ones with just a few columns and a few rows, and text-format data files.
Note: As you graduate to more realistic scenarios, you will use more elaborate tables with many columns, features such as partitioning, and file formats such as Parquet. When dealing with realistic data volumes, you will bring in data using LOAD DATA
or INSERT ... SELECT
statements to operate on millions or billions of rows at once.
The following example sets up a couple of simple tables with a few rows, and performs queries involving sorting, aggregate functions and joins.
[localhost:21000] > insert into t1 values (1), (3), (2), (4);
[localhost:21000] > select x from t1 order by x desc;
+---+
| x |
+---+
| 4 |
| 3 |
| 2 |
| 1 |
+---+
[localhost:21000] > select min(x), max(x), sum(x), avg(x) from t1;
+--------+--------+--------+--------+
| min(x) | max(x) | sum(x) | avg(x) |
+--------+--------+--------+--------+
| 1 | 4 | 10 | 2.5 |
+--------+--------+--------+--------+
[localhost:21000] > create table t2 (id int, word string);
[localhost:21000] > insert into t2 values (1, "one"), (3, "three"), (5, 'five');
[localhost:21000] > select word from t1 join t2 on (t1.x = t2.id);
+-------+
| word |
+-------+
| one |
| three |
+-------+
After completing this tutorial, you should now know:
- How to tell which version of Impala is running on your system.
- How to find the names of databases in an Impala instance, either displaying the full list or searching for specific names.
- How to find the names of tables in an Impala database, either displaying the full list or searching for specific names.
- How to switch between databases and check which database you are currently in.
- How to learn the column names and types of a table.
- How to create databases and tables, insert small amounts of test data, and run simple queries.
Load CSV Data from Local Files
This scenario illustrates how to create some very small tables, suitable for first-time users to experiment with Impala SQL features. TAB1
and TAB2
are loaded with data from files in HDFS. A subset of data is copied from TAB1
into TAB3
.
Populate HDFS with the data you want to query. To begin this process, create one or more new subdirectories underneath your user directory in HDFS. The data for each table resides in a separate subdirectory. Substitute your own username for username
where appropriate. This example uses the -p
option with the mkdir
operation to create any necessary parent directories if they do not already exist.
$ whoami
username
$ hdfs dfs -ls /user
Found 3 items
drwxr-xr-x - username username 0 2013-04-22 18:54 /user/username
drwxrwx--- - mapred mapred 0 2013-03-15 20:11 /user/history
drwxr-xr-x - hue supergroup 0 2013-03-15 20:10 /user/hive
$ hdfs dfs -mkdir -p /user/username/sample_data/tab1 /user/username/sample_data/tab2
Here is some sample data, for two tables named TAB1
and TAB2
.
Copy the following content to .csv
files in your local filesystem:
tab1.csv:
1,true,123.123,2012-10-24 08:55:00
2,false,1243.5,2012-10-25 13:40:00
3,false,24453.325,2008-08-22 09:33:21.123
4,false,243423.325,2007-05-12 22:32:21.33454
5,true,243.325,1953-04-22 09:11:33
tab2.csv:
1,true,12789.123
2,false,1243.5
3,false,24453.325
4,false,2423.3254
5,true,243.325
60,false,243565423.325
70,true,243.325
80,false,243423.325
90,true,243.325
Put each .csv
file into a separate HDFS directory using commands like the following, which use paths available in the Impala Demo VM:
$ hdfs dfs -put tab1.csv /user/username/sample_data/tab1
$ hdfs dfs -ls /user/username/sample_data/tab1
Found 1 items
-rw-r--r-- 1 username username 192 2013-04-02 20:08 /user/username/sample_data/tab1/tab1.csv
$ hdfs dfs -put tab2.csv /user/username/sample_data/tab2
$ hdfs dfs -ls /user/username/sample_data/tab2
Found 1 items
-rw-r--r-- 1 username username 158 2013-04-02 20:09 /user/username/sample_data/tab2/tab2.csv
The name of each data file is not significant. In fact, when Impala examines the contents of the data directory for the first time, it considers all files in the directory to make up the data of the table, regardless of how many files there are or what the files are named.
To understand what paths are available within your own HDFS filesystem and what the permissions are for the various directories and files, issue hdfs dfs -ls /
and work your way down the tree doing -ls
operations for the various directories.
Use the impala-shell
command to create tables, either interactively or through a SQL script.
The following example shows creating three tables. For each table, the example shows creating columns with various attributes such as Boolean or integer types. The example also includes commands that provide information about how the data is formatted, such as rows terminating with commas, which makes sense in the case of importing data from a .csv
file. Where we already have .csv
files containing data in the HDFS directory tree, we specify the location of the directory containing the appropriate .csv
file. Impala considers all the data from all the files in that directory to represent the data for the table.
DROP TABLE IF EXISTS tab1;
-- The EXTERNAL clause means the data is located outside the central location
-- for Impala data files and is preserved when the associated Impala table is dropped.
-- We expect the data to already exist in the directory specified by the LOCATION clause.
CREATE EXTERNAL TABLE tab1
(
id INT,
col_1 BOOLEAN,
col_2 DOUBLE,
col_3 TIMESTAMP
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/username/sample_data/tab1';
DROP TABLE IF EXISTS tab2;
-- TAB2 is an external table, similar to TAB1.
CREATE EXTERNAL TABLE tab2
(
id INT,
col_1 BOOLEAN,
col_2 DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/username/sample_data/tab2';
DROP TABLE IF EXISTS tab3;
-- Leaving out the EXTERNAL clause means the data will be managed
-- in the central Impala data directory tree. Rather than reading
-- existing data files when the table is created, we load the
-- data after creating the table.
CREATE TABLE tab3
(
id INT,
col_1 BOOLEAN,
col_2 DOUBLE,
month INT,
day INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Note: Getting through these CREATE TABLE
statements successfully is an important validation step to confirm everything is configured correctly with the Hive metastore and HDFS permissions. If you receive any errors during the CREATE TABLE
statements:
- Make sure you followed the installation instructions closely, in Installing Impala.
- Make sure the
hive.metastore.warehouse.dir
property points to a directory that Impala can write to. The ownership should behive:hive
, and theimpala
user should also be a member of thehive
group.
Point an Impala Table at Existing Data Files
A convenient way to set up data for Impala to access is to use an external table, where the data already exists in a set of HDFS files and you just point the Impala table at the directory containing those files. For example, you might run in impala-shell
a *.sql
file with contents similar to the following, to create an Impala table that accesses an existing data file used by Hive.
The following examples set up 2 tables, referencing the paths and sample data from the sample TPC-DS kit for Impala. For historical reasons, the data physically resides in an HDFS directory tree under /user/hive, although this particular data is entirely managed by Impala rather than Hive. When we create an external table, we specify the directory containing one or more data files, and Impala queries the combined content of all the files inside that directory. Here is how we examine the directories and files within the HDFS filesystem:
$ cd ~/username/datasets
$ ./tpcds-setup.sh
... Downloads and unzips the kit, builds the data and loads it into HDFS ...
$ hdfs dfs -ls /user/hive/tpcds/customer
Found 1 items
-rw-r--r-- 1 username supergroup 13209372 2013-03-22 18:09 /user/hive/tpcds/customer/customer.dat
$ hdfs dfs -cat /user/hive/tpcds/customer/customer.dat | more
1|AAAAAAAABAAAAAAA|980124|7135|32946|2452238|2452208|Mr.|Javier|Lewis|Y|9|12|1936|CHILE||Javie
r.Lewis@VFAxlnZEvOx.org|2452508|
2|AAAAAAAACAAAAAAA|819667|1461|31655|2452318|2452288|Dr.|Amy|Moses|Y|9|4|1966|TOGO||Amy.Moses@
Ovk9KjHH.com|2452318|
3|AAAAAAAADAAAAAAA|1473522|6247|48572|2449130|2449100|Miss|Latisha|Hamilton|N|18|9|1979|NIUE||
Latisha.Hamilton@V.com|2452313|
4|AAAAAAAAEAAAAAAA|1703214|3986|39558|2450030|2450000|Dr.|Michael|White|N|7|6|1983|MEXICO||Mic
hael.White@i.org|2452361|
5|AAAAAAAAFAAAAAAA|953372|4470|36368|2449438|2449408|Sir|Robert|Moran|N|8|5|1956|FIJI||Robert.
Moran@Hh.edu|2452469|
...
Here is a SQL script to set up Impala tables pointing to some of these data files in HDFS. (The script in the VM sets up tables like this through Hive; ignore those tables for purposes of this demonstration.) Save the following as customer_setup.sql:
--
-- store_sales fact table and surrounding dimension tables only
--
create database tpcds;
use tpcds;
drop table if exists customer;
create external table customer
(
c_customer_sk int,
c_customer_id string,
c_current_cdemo_sk int,
c_current_hdemo_sk int,
c_current_addr_sk int,
c_first_shipto_date_sk int,
c_first_sales_date_sk int,
c_salutation string,
c_first_name string,
c_last_name string,
c_preferred_cust_flag string,
c_birth_day int,
c_birth_month int,
c_birth_year int,
c_birth_country string,
c_login string,
c_email_address string,
c_last_review_date string
)
row format delimited fields terminated by '|'
location '/user/hive/tpcds/customer';
drop table if exists customer_address;
create external table customer_address
(
ca_address_sk int,
ca_address_id string,
ca_street_number string,
ca_street_name string,
ca_street_type string,
ca_suite_number string,
ca_city string,
ca_county string,
ca_state string,
ca_zip string,
ca_country string,
ca_gmt_offset float,
ca_location_type string
)
row format delimited fields terminated by '|'
location '/user/hive/tpcds/customer_address';
We would run this script with a command such as:
impala-shell -i localhost -f customer_setup.sql
Describe the Impala Table
Now that you have updated the database metadata that Impala caches, you can confirm that the expected tables are accessible by Impala and examine the attributes of one of the tables. We created these tables in the database named default
. If the tables were in a database other than the default, we would issue a command use db_name
to switch to that database before examining or querying its tables. We could also qualify the name of a table by prepending the database name, for example default.customer
and default.customer_name
.
[impala-host:21000] > show databases
Query finished, fetching results ...
default
Returned 1 row(s) in 0.00s
[impala-host:21000] > show tables
Query finished, fetching results ...
customer
customer_address
Returned 2 row(s) in 0.00s
[impala-host:21000] > describe customer_address
+------------------+--------+---------+
| name | type | comment |
+------------------+--------+---------+
| ca_address_sk | int | |
| ca_address_id | string | |
| ca_street_number | string | |
| ca_street_name | string | |
| ca_street_type | string | |
| ca_suite_number | string | |
| ca_city | string | |
| ca_county | string | |
| ca_state | string | |
| ca_zip | string | |
| ca_country | string | |
| ca_gmt_offset | float | |
| ca_location_type | string | |
+------------------+--------+---------+
Returned 13 row(s) in 0.01
Query the Impala Table
You can query data contained in the tables. Impala coordinates the query execution across a single node or multiple nodes depending on your configuration, without the overhead of running MapReduce jobs to perform the intermediate processing.
There are a variety of ways to execute queries on Impala:
Using the
impala-shell
command in interactive mode:$ impala-shell -i impala-host
Connected to localhost:21000
[impala-host:21000] > select count(*) from customer_address;
50000
Returned 1 row(s) in 0.37s
Passing a set of commands contained in a file:
$ impala-shell -i impala-host -f myquery.sql
Connected to localhost:21000
50000
Returned 1 row(s) in 0.19s
Passing a single command to the
impala-shell
command. The query is executed, the results are returned, and the shell exits. Make sure to quote the command, preferably with single quotation marks to avoid shell expansion of characters such as*
.$ impala-shell -i impala-host -q 'select count(*) from customer_address'
Connected to localhost:21000
50000
Returned 1 row(s) in 0.29s
Data Loading and Querying Examples
This section describes how to create some sample tables and load data into them. These tables can then be queried using the Impala shell.
Loading Data
Loading data involves:
- Establishing a data set. The example below uses
.csv
files. - Creating tables to which to load data.
- Loading the data into the tables you created.
Sample Queries
To run these sample queries, create a SQL query file query.sql
, copy and paste each query into the query file, and then run the query file using the shell. For example, to run query.sql
on impala-host
, you might use the command:
impala-shell.sh -i impala-host -f query.sql
The examples and results below assume you have loaded the sample data into the tables as described above.
Example: Examining Contents of Tables
Let’s start by verifying that the tables do contain the data we expect. Because Impala often deals with tables containing millions or billions of rows, when examining tables of unknown size, include the LIMIT
clause to avoid huge amounts of unnecessary output, as in the final query. (If your interactive query starts displaying an unexpected volume of data, press Ctrl-C
in impala-shell
to cancel the query.)
SELECT * FROM tab1;
SELECT * FROM tab2;
SELECT * FROM tab2 LIMIT 5;
Results:
+----+-------+------------+-------------------------------+
| id | col_1 | col_2 | col_3 |
+----+-------+------------+-------------------------------+
| 1 | true | 123.123 | 2012-10-24 08:55:00 |
| 2 | false | 1243.5 | 2012-10-25 13:40:00 |
| 3 | false | 24453.325 | 2008-08-22 09:33:21.123000000 |
| 4 | false | 243423.325 | 2007-05-12 22:32:21.334540000 |
| 5 | true | 243.325 | 1953-04-22 09:11:33 |
+----+-------+------------+-------------------------------+
+----+-------+---------------+
| id | col_1 | col_2 |
+----+-------+---------------+
| 1 | true | 12789.123 |
| 2 | false | 1243.5 |
| 3 | false | 24453.325 |
| 4 | false | 2423.3254 |
| 5 | true | 243.325 |
| 60 | false | 243565423.325 |
| 70 | true | 243.325 |
| 80 | false | 243423.325 |
| 90 | true | 243.325 |
+----+-------+---------------+
+----+-------+-----------+
| id | col_1 | col_2 |
+----+-------+-----------+
| 1 | true | 12789.123 |
| 2 | false | 1243.5 |
| 3 | false | 24453.325 |
| 4 | false | 2423.3254 |
| 5 | true | 243.325 |
+----+-------+-----------+
Example: Aggregate and Join
SELECT tab1.col_1, MAX(tab2.col_2), MIN(tab2.col_2)
FROM tab2 JOIN tab1 USING (id)
GROUP BY col_1 ORDER BY 1 LIMIT 5;
Results:
+-------+-----------------+-----------------+
| col_1 | max(tab2.col_2) | min(tab2.col_2) |
+-------+-----------------+-----------------+
| false | 24453.325 | 1243.5 |
| true | 12789.123 | 243.325 |
+-------+-----------------+-----------------+
Example: Subquery, Aggregate and Joins
SELECT tab2.*
FROM tab2,
(SELECT tab1.col_1, MAX(tab2.col_2) AS max_col2
FROM tab2, tab1
WHERE tab1.id = tab2.id
GROUP BY col_1) subquery1
WHERE subquery1.max_col2 = tab2.col_2;
Results:
+----+-------+-----------+
| id | col_1 | col_2 |
+----+-------+-----------+
| 1 | true | 12789.123 |
| 3 | false | 24453.325 |
+----+-------+-----------+
Example: INSERT Query
INSERT OVERWRITE TABLE tab3
SELECT id, col_1, col_2, MONTH(col_3), DAYOFMONTH(col_3)
FROM tab1 WHERE YEAR(col_3) = 2012;
Query TAB3
to check the result:
SELECT * FROM tab3;
Results:
+----+-------+---------+-------+-----+
| id | col_1 | col_2 | month | day |
+----+-------+---------+-------+-----+
| 1 | true | 123.123 | 10 | 24 |
| 2 | false | 1243.5 | 10 | 25 |
+----+-------+---------+-------+-----+
Advanced Tutorials
These tutorials walk you through advanced scenarios or specialized features.
Attaching an External Partitioned Table to an HDFS Directory Structure
This tutorial shows how you might set up a directory tree in HDFS, put data files into the lowest-level subdirectories, and then use an Impala external table to query the data files from their original locations.
The tutorial uses a table with web log data, with separate subdirectories for the year, month, day, and host. For simplicity, we use a tiny amount of CSV data, loading the same data into each partition.
First, we make an Impala partitioned table for CSV data, and look at the underlying HDFS directory structure to understand the directory structure to re-create elsewhere in HDFS. The columns field1
, field2
, and field3
correspond to the contents of the CSV data files. The year
, month
, day
, and host
columns are all represented as subdirectories within the table structure, and are not part of the CSV files. We use STRING
for each of these columns so that we can produce consistent subdirectory names, with leading zeros for a consistent length.
create database external_partitions;
use external_partitions;
create table logs (field1 string, field2 string, field3 string)
partitioned by (year string, month string , day string, host string)
row format delimited fields terminated by ',';
insert into logs partition (year="2013", month="07", day="28", host="host1") values ("foo","foo","foo");
insert into logs partition (year="2013", month="07", day="28", host="host2") values ("foo","foo","foo");
insert into logs partition (year="2013", month="07", day="29", host="host1") values ("foo","foo","foo");
insert into logs partition (year="2013", month="07", day="29", host="host2") values ("foo","foo","foo");
insert into logs partition (year="2013", month="08", day="01", host="host1") values ("foo","foo","foo");
Back in the Linux shell, we examine the HDFS directory structure. (Your Impala data directory might be in a different location; for historical reasons, it is sometimes under the HDFS path /user/hive/warehouse.) We use the hdfs dfs -ls
command to examine the nested subdirectories corresponding to each partitioning column, with separate subdirectories at each level (with =
in their names) representing the different values for each partitioning column. When we get to the lowest level of subdirectory, we use the hdfs dfs -cat
command to examine the data file and see CSV-formatted data produced by the INSERT
statement in Impala.
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db
Found 1 items
drwxrwxrwt - impala hive 0 2013-08-07 12:24 /user/impala/warehouse/external_partitions.db/logs
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db/logs
Found 1 items
drwxr-xr-x - impala hive 0 2013-08-07 12:24 /user/impala/warehouse/external_partitions.db/logs/year=2013
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db/logs/year=2013
Found 2 items
drwxr-xr-x - impala hive 0 2013-08-07 12:23 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07
drwxr-xr-x - impala hive 0 2013-08-07 12:24 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=08
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07
Found 2 items
drwxr-xr-x - impala hive 0 2013-08-07 12:22 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28
drwxr-xr-x - impala hive 0 2013-08-07 12:23 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=29
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28
Found 2 items
drwxr-xr-x - impala hive 0 2013-08-07 12:21 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28/host=host1
drwxr-xr-x - impala hive 0 2013-08-07 12:22 /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28/host=host2
$ hdfs dfs -ls /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28/host=host1
Found 1 items
-rw-r--r-- 3 impala hive 12 2013-08-07 12:21 /user/impala/warehouse/external_partiti
ons.db/logs/year=2013/month=07/day=28/host=host1/3981726974111751120--8907184999369517436_822630111_data.0
$ hdfs dfs -cat /user/impala/warehouse/external_partitions.db/logs/year=2013/month=07/day=28/\
host=host1/3981726974111751120--8 907184999369517436_822630111_data.0
foo,foo,foo
Still in the Linux shell, we use hdfs dfs -mkdir
to create several data directories outside the HDFS directory tree that Impala controls (/user/impala/warehouse in this example, maybe different in your case). Depending on your configuration, you might need to log in as a user with permission to write into this HDFS directory tree; for example, the commands shown here were run while logged in as the hdfs
user.
$ hdfs dfs -mkdir -p /user/impala/data/logs/year=2013/month=07/day=28/host=host1
$ hdfs dfs -mkdir -p /user/impala/data/logs/year=2013/month=07/day=28/host=host2
$ hdfs dfs -mkdir -p /user/impala/data/logs/year=2013/month=07/day=28/host=host1
$ hdfs dfs -mkdir -p /user/impala/data/logs/year=2013/month=07/day=29/host=host1
$ hdfs dfs -mkdir -p /user/impala/data/logs/year=2013/month=08/day=01/host=host1
We make a tiny CSV file, with values different than in the INSERT
statements used earlier, and put a copy within each subdirectory that we will use as an Impala partition.
$ cat >dummy_log_data
bar,baz,bletch
$ hdfs dfs -mkdir -p /user/impala/data/external_partitions/year=2013/month=08/day=01/host=host1
$ hdfs dfs -mkdir -p /user/impala/data/external_partitions/year=2013/month=07/day=28/host=host1
$ hdfs dfs -mkdir -p /user/impala/data/external_partitions/year=2013/month=07/day=28/host=host2
$ hdfs dfs -mkdir -p /user/impala/data/external_partitions/year=2013/month=07/day=29/host=host1
$ hdfs dfs -put dummy_log_data /user/impala/data/logs/year=2013/month=07/day=28/host=host1
$ hdfs dfs -put dummy_log_data /user/impala/data/logs/year=2013/month=07/day=28/host=host2
$ hdfs dfs -put dummy_log_data /user/impala/data/logs/year=2013/month=07/day=29/host=host1
$ hdfs dfs -put dummy_log_data /user/impala/data/logs/year=2013/month=08/day=01/host=host1
Back in the impala-shell interpreter, we move the original Impala-managed table aside, and create a new external table with a LOCATION
clause pointing to the directory under which we have set up all the partition subdirectories and data files.
use external_partitions;
alter table logs rename to logs_original;
create external table logs (field1 string, field2 string, field3 string)
partitioned by (year string, month string, day string, host string)
row format delimited fields terminated by ','
location '/user/impala/data/logs';
Because partition subdirectories and data files come and go during the data lifecycle, you must identify each of the partitions through an ALTER TABLE
statement before Impala recognizes the data files they contain.
alter table logs add partition (year="2013",month="07",day="28",host="host1")
alter table log_type add partition (year="2013",month="07",day="28",host="host2");
alter table log_type add partition (year="2013",month="07",day="29",host="host1");
alter table log_type add partition (year="2013",month="08",day="01",host="host1");
We issue a REFRESH
statement for the table, always a safe practice when data files have been manually added, removed, or changed. Then the data is ready to be queried. The SELECT *
statement illustrates that the data from our trivial CSV file was recognized in each of the partitions where we copied it. Although in this case there are only a few rows, we include a LIMIT
clause on this test query just in case there is more data than we expect.
refresh log_type;
select * from log_type limit 100;
+--------+--------+--------+------+-------+-----+-------+
| field1 | field2 | field3 | year | month | day | host |
+--------+--------+--------+------+-------+-----+-------+
| bar | baz | bletch | 2013 | 07 | 28 | host1 |
| bar | baz | bletch | 2013 | 08 | 01 | host1 |
| bar | baz | bletch | 2013 | 07 | 29 | host1 |
| bar | baz | bletch | 2013 | 07 | 28 | host2 |
+--------+--------+--------+------+-------+-----+-------+
Switching Back and Forth Between Impala and Hive
Sometimes, you might find it convenient to switch to the Hive shell to perform some data loading or transformation operation, particularly on file formats such as RCFile, SequenceFile, and Avro that Impala currently can query but not write to.
Whenever you create, drop, or alter a table or other kind of object through Hive, the next time you switch back to the impala-shell interpreter, issue a one-time INVALIDATE METADATA
statement so that Impala recognizes the new or changed object.
Whenever you load, insert, or change data in an existing table through Hive (or even through manual HDFS operations such as the hdfs command), the next time you switch back to the impala-shell interpreter, issue a one-time REFRESH table_name
statement so that Impala recognizes the new or changed data.
For examples showing how this process works for the REFRESH
statement, look at the examples of creating RCFile and SequenceFile tables in Impala, loading data through Hive, and then querying the data through Impala. See Using the RCFile File Format with Impala Tables and Using the SequenceFile File Format with Impala Tables for those examples.
For examples showing how this process works for the INVALIDATE METADATA
statement, look at the example of creating and loading an Avro table in Hive, and then querying the data through Impala. See Using the Avro File Format with Impala Tables for that example.
Note:
Originally, Impala did not support UDFs, but this feature is available in Impala starting in Impala 1.2. Some INSERT ... SELECT
transformations that you originally did through Hive can now be done through Impala. See User-Defined Functions (UDFs) for details.
Prior to Impala 1.2, the REFRESH
and INVALIDATE METADATA
statements needed to be issued on each Impala node to which you connected and issued queries. In Impala 1.2 and higher, when you issue either of those statements on any Impala node, the results are broadcast to all the Impala nodes in the cluster, making it truly a one-step operation after each round of DDL or ETL operations in Hive.
Cross Joins and Cartesian Products with the CROSS JOIN Operator
Originally, Impala restricted join queries so that they had to include at least one equality comparison between the columns of the tables on each side of the join operator. With the huge tables typically processed by Impala, any miscoded query that produced a full Cartesian product as a result set could consume a huge amount of cluster resources.
In Impala 1.2.2 and higher, this restriction is lifted when you use the CROSS JOIN
operator in the query. You still cannot remove all WHERE
clauses from a query like SELECT * FROM t1 JOIN t2
to produce all combinations of rows from both tables. But you can use the CROSS JOIN
operator to explicitly request such a Cartesian product. Typically, this operation is applicable for smaller tables, where the result set still fits within the memory of a single Impala node.
The following example sets up data for use in a series of comic books where characters battle each other. At first, we use an equijoin query, which only allows characters from the same time period and the same planet to meet.
[localhost:21000] > create table heroes (name string, era string, planet string);
[localhost:21000] > create table villains (name string, era string, planet string);
[localhost:21000] > insert into heroes values
> ('Tesla','20th century','Earth'),
> ('Pythagoras','Antiquity','Earth'),
> ('Zopzar','Far Future','Mars');
Inserted 3 rows in 2.28s
[localhost:21000] > insert into villains values
> ('Caligula','Antiquity','Earth'),
> ('John Dillinger','20th century','Earth'),
> ('Xibulor','Far Future','Venus');
Inserted 3 rows in 1.93s
[localhost:21000] > select concat(heroes.name,' vs. ',villains.name) as battle
> from heroes join villains
> where heroes.era = villains.era and heroes.planet = villains.planet;
+--------------------------+
| battle |
+--------------------------+
| Tesla vs. John Dillinger |
| Pythagoras vs. Caligula |
+--------------------------+
Returned 2 row(s) in 0.47s
Readers demanded more action, so we added elements of time travel and space travel so that any hero could face any villain. Prior to Impala 1.2.2, this type of query was impossible because all joins had to reference matching values between the two tables:
[localhost:21000] > -- Cartesian product not possible in Impala 1.1.
> select concat(heroes.name,' vs. ',villains.name) as battle from heroes join villains;
ERROR: NotImplementedException: Join between 'heroes' and 'villains' requires at least one conjunctive equality predicate between the two tables
With Impala 1.2.2, we rewrite the query slightly to use CROSS JOIN
rather than JOIN
, and now the result set includes all combinations:
[localhost:21000] > -- Cartesian product available in Impala 1.2.2 with the CROSS JOIN syntax.
> select concat(heroes.name,' vs. ',villains.name) as battle from heroes cross join villains;
+-------------------------------+
| battle |
+-------------------------------+
| Tesla vs. Caligula |
| Tesla vs. John Dillinger |
| Tesla vs. Xibulor |
| Pythagoras vs. Caligula |
| Pythagoras vs. John Dillinger |
| Pythagoras vs. Xibulor |
| Zopzar vs. Caligula |
| Zopzar vs. John Dillinger |
| Zopzar vs. Xibulor |
+-------------------------------+
Returned 9 row(s) in 0.33s
The full combination of rows from both tables is known as the Cartesian product. This type of result set is often used for creating grid data structures. You can also filter the result set by including WHERE
clauses that do not explicitly compare columns between the two tables. The following example shows how you might produce a list of combinations of year and quarter for use in a chart, and then a shorter list with only selected quarters.
[localhost:21000] > create table x_axis (x int);
[localhost:21000] > create table y_axis (y int);
[localhost:21000] > insert into x_axis values (1),(2),(3),(4);
Inserted 4 rows in 2.14s
[localhost:21000] > insert into y_axis values (2010),(2011),(2012),(2013),(2014);
Inserted 5 rows in 1.32s
[localhost:21000] > select y as year, x as quarter from x_axis cross join y_axis;
+------+---------+
| year | quarter |
+------+---------+
| 2010 | 1 |
| 2011 | 1 |
| 2012 | 1 |
| 2013 | 1 |
| 2014 | 1 |
| 2010 | 2 |
| 2011 | 2 |
| 2012 | 2 |
| 2013 | 2 |
| 2014 | 2 |
| 2010 | 3 |
| 2011 | 3 |
| 2012 | 3 |
| 2013 | 3 |
| 2014 | 3 |
| 2010 | 4 |
| 2011 | 4 |
| 2012 | 4 |
| 2013 | 4 |
| 2014 | 4 |
+------+---------+
Returned 20 row(s) in 0.38s
[localhost:21000] > select y as year, x as quarter from x_axis cross join y_axis where x in (1,3);
+------+---------+
| year | quarter |
+------+---------+
| 2010 | 1 |
| 2011 | 1 |
| 2012 | 1 |
| 2013 | 1 |
| 2014 | 1 |
| 2010 | 3 |
| 2011 | 3 |
| 2012 | 3 |
| 2013 | 3 |
| 2014 | 3 |
+------+---------+
Returned 10 row(s) in 0.39s
Dealing with Parquet Files with Unknown Schema
As data pipelines start to include more aspects such as NoSQL or loosely specified schemas, you might encounter situations where you have data files (particularly in Parquet format) where you do not know the precise table definition. This tutorial shows how you can build an Impala table around data that comes from non-Impala or even non-SQL sources, where you do not have control of the table layout and might not be familiar with the characteristics of the data.
The data used in this tutorial represents airline on-time arrival statistics, from October 1987 through April 2008. See the details on the 2009 ASA Data Expo web site. You can also see the explanations of the columns; for purposes of this exercise, wait until after following the tutorial before examining the schema, to better simulate a real-life situation where you cannot rely on assumptions and assertions about the ranges and representations of data values.
Download the Data Files into HDFS
First, we download and unpack the data files. There are 8 files totalling 1.4 GB.
$ wget -O airlines_parquet.tar.gz https://home.apache.org/~arodoni/airlines_parquet.tar.gz
$ wget https://home.apache.org/~arodoni/airlines_parquet.tar.gz.sha512
$ shasum -a 512 -c airlines_parquet.tar.gz.sha512
airlines_parquet.tar.gz: OK
$ tar xvzf airlines_parquet.tar.gz
$ cd airlines_parquet/
$ du -kch *.parq
253M 4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq
14M 4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.1.parq
253M 4345e5eef217aa1b-c8f16177f35fd984_501176748_data.0.parq
64M 4345e5eef217aa1b-c8f16177f35fd984_501176748_data.1.parq
184M 4345e5eef217aa1b-c8f16177f35fd985_1199995767_data.0.parq
241M 4345e5eef217aa1b-c8f16177f35fd986_2086627597_data.0.parq
212M 4345e5eef217aa1b-c8f16177f35fd987_1048668565_data.0.parq
152M 4345e5eef217aa1b-c8f16177f35fd988_1432111844_data.0.parq
1.4G total
Next, we put the Parquet data files in HDFS, all together in a single directory, with permissions on the directory and the files so that the impala
user will be able to read them.
After unpacking, we saw the largest Parquet file was 253 MB. When copying Parquet files into HDFS for Impala to use, for maximum query performance, make sure that each file resides in a single HDFS data block. Therefore, we pick a size larger than any single file and specify that as the block size, using the argument -Ddfs.block.size=253m
on the hdfs dfs -put
command.
$ sudo -u hdfs hdfs dfs -mkdir -p /user/impala/staging/airlines
$ sudo -u hdfs hdfs dfs -Ddfs.block.size=253m -put *.parq /user/impala/staging/airlines
$ sudo -u hdfs hdfs dfs -ls /user/impala/staging
Found 1 items
$ sudo -u hdfs hdfs dfs -ls /user/impala/staging/airlines
Found 8 items
Create Database and Tables
With the files in an accessible location in HDFS, you create a database table that uses the data in those files:
- The
CREATE EXTERNAL
syntax and theLOCATION
attribute point Impala at the appropriate HDFS directory. - The
LIKE PARQUET 'path_to_any_parquet_file'
clause means we skip the list of column names and types; Impala automatically gets the column names and data types straight from the data files. (Currently, this technique only works for Parquet files.) - Ignore the warning about lack of
READ_WRITE
access to the files in HDFS; theimpala
user can read the files, which will be sufficient for us to experiment with queries and perform some copy and transform operations into other tables.
$ impala-shell
> CREATE DATABASE airlines_data;
USE airlines_data;
CREATE EXTERNAL TABLE airlines_external
LIKE PARQUET 'hdfs:staging/airlines/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq'
STORED AS PARQUET LOCATION 'hdfs:staging/airlines';
WARNINGS: Impala does not have READ_WRITE access to path 'hdfs://myhost.com:8020/user/impala/staging'
Examine Physical and Logical Schema
With the table created, we examine its physical and logical characteristics to confirm that the data is really there and in a format and shape that we can work with.
- The
SHOW TABLE STATS
statement gives a very high-level summary of the table, showing how many files and how much total data it contains. Also, it confirms that the table is expecting all the associated data files to be in Parquet format. (The ability to work with all kinds of HDFS data files in different formats means that it is possible to have a mismatch between the format of the data files, and the format that the table expects the data files to be in.) - The
SHOW FILES
statement confirms that the data in the table has the expected number, names, and sizes of the original Parquet files. - The
DESCRIBE
statement (or its abbreviationDESC
) confirms the names and types of the columns that Impala automatically created after reading that metadata from the Parquet file. - The
DESCRIBE FORMATTED
statement prints out some extra detail along with the column definitions. The pieces we care about for this exercise are:- The containing database for the table.
- The location of the associated data files in HDFS.
- The table is an external table so Impala will not delete the HDFS files when we finish the experiments and drop the table.
- The table is set up to work exclusively with files in the Parquet format.
> SHOW TABLE STATS airlines_external;
+-------+--------+--------+--------------+-------------------+---------+-------------------+
| #Rows | #Files | Size | Bytes Cached | Cache Replication | Format | Incremental stats |
+-------+--------+--------+--------------+-------------------+---------+-------------------+
| -1 | 8 | 1.34GB | NOT CACHED | NOT CACHED | PARQUET | false |
+-------+--------+--------+--------------+-------------------+---------+-------------------+
> SHOW FILES IN airlines_external;
+----------------------------------------------------------------------------------------+----------+-----------+
| path | size | partition |
+----------------------------------------------------------------------------------------+----------+-----------+
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq | 252.99MB | |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.1.parq | 13.43MB | |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd984_501176748_data.0.parq | 252.84MB | |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd984_501176748_data.1.parq | 63.92MB | |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd985_1199995767_data.0.parq | 183.64MB | |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd986_2086627597_data.0.parq | 240.04MB | |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd987_1048668565_data.0.parq | 211.35MB | |
| /user/impala/staging/airlines/4345e5eef217aa1b-c8f16177f35fd988_1432111844_data.0.parq | 151.46MB | |
+----------------------------------------------------------------------------------------+----------+-----------+
> DESCRIBE airlines_external;
+---------------------+--------+---------------------------------------------------+
| name | type | comment |
+---------------------+--------+---------------------------------------------------+
| year | int | Inferred from Parquet file. |
| month | int | Inferred from Parquet file. |
| day | int | Inferred from Parquet file. |
| dayofweek | int | Inferred from Parquet file. |
| dep_time | int | Inferred from Parquet file. |
| crs_dep_time | int | Inferred from Parquet file. |
| arr_time | int | Inferred from Parquet file. |
| crs_arr_time | int | Inferred from Parquet file. |
| carrier | string | Inferred from Parquet file. |
| flight_num | int | Inferred from Parquet file. |
| tail_num | int | Inferred from Parquet file. |
| actual_elapsed_time | int | Inferred from Parquet file. |
| crs_elapsed_time | int | Inferred from Parquet file. |
| airtime | int | Inferred from Parquet file. |
| arrdelay | int | Inferred from Parquet file. |
| depdelay | int | Inferred from Parquet file. |
| origin | string | Inferred from Parquet file. |
| dest | string | Inferred from Parquet file. |
| distance | int | Inferred from Parquet file. |
| taxi_in | int | Inferred from Parquet file. |
| taxi_out | int | Inferred from Parquet file. |
| cancelled | int | Inferred from Parquet file. |
| cancellation_code | string | Inferred from Parquet file. |
| diverted | int | Inferred from Parquet file. |
| carrier_delay | int | Inferred from Parquet file. |
| weather_delay | int | Inferred from Parquet file. |
| nas_delay | int | Inferred from Parquet file. |
| security_delay | int | Inferred from Parquet file. |
| late_aircraft_delay | int | Inferred from Parquet file. |
+---------------------+--------+---------------------------------------------------+
> DESCRIBE FORMATTED airlines_external;
+------------------------------+-------------------------------
| name | type
+------------------------------+-------------------------------
...
| # Detailed Table Information | NULL
| Database: | airlines_data
| Owner: | impala
...
| Location: | /user/impala/staging/airlines
| Table Type: | EXTERNAL_TABLE
...
| # Storage Information | NULL
| SerDe Library: | org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
| InputFormat: | org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputForma
| OutputFormat: | org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
...
Analyze Data
Now that we are confident that the connections are solid between the Impala table and the underlying Parquet files, we run some initial queries to understand the characteristics of the data: the overall number of rows, and the ranges and how many different values are in certain columns.
> SELECT COUNT(*) FROM airlines_external;
+-----------+
| count(*) |
+-----------+
| 123534969 |
+-----------+
The NDV()
function returns a number of distinct values, which, for performance reasons, is an estimate when there are lots of different values in the column, but is precise when the cardinality is less than 16 K. Use NDV()
function for this kind of exploration rather than COUNT(DISTINCT colname)
, because Impala can evaluate multiple NDV()
functions in a single query, but only a single instance of COUNT DISTINCT
.
> SElECT NDV(carrier), NDV(flight_num), NDV(tail_num),
NDV(origin), NDV(dest) FROM airlines_external;
+--------------+-----------------+---------------+-------------+-----------+
| ndv(carrier) | ndv(flight_num) | ndv(tail_num) | ndv(origin) | ndv(dest) |
+--------------+-----------------+---------------+-------------+-----------+
| 29 | 8463 | 3 | 342 | 349 |
+--------------+-----------------+---------------+-------------+-----------+
> SELECT tail_num, COUNT(*) AS howmany FROM airlines_external
GROUP BY tail_num;
+----------+-----------+
| tail_num | howmany |
+----------+-----------+
| NULL | 123122001 |
| 715 | 1 |
| 0 | 406405 |
| 112 | 6562 |
+----------+-----------+
> SELECT DISTINCT dest FROM airlines_external
WHERE dest NOT IN (SELECT origin FROM airlines_external);
+------+
| dest |
+------+
| CBM |
| SKA |
| LAR |
| RCA |
| LBF |
+------+
> SELECT DISTINCT dest FROM airlines_external
WHERE dest NOT IN (SELECT DISTINCT origin FROM airlines_external);
+------+
| dest |
+------+
| CBM |
| SKA |
| LAR |
| RCA |
| LBF |
+------+
> SELECT DISTINCT origin FROM airlines_external
WHERE origin NOT IN (SELECT DISTINCT dest FROM airlines_external);
Fetched 0 row(s) in 2.63
With the above queries, we see that there are modest numbers of different airlines, flight numbers, and origin and destination airports. Two things jump out from this query: the number of tail_num
values is much smaller than we might have expected, and there are more destination airports than origin airports. Let’s dig further. What we find is that most tail_num
values are NULL
. It looks like this was an experimental column that wasn’t filled in accurately. We make a mental note that if we use this data as a starting point, we’ll ignore this column. We also find that certain airports are represented in the ORIGIN
column but not the DEST
column; now we know that we cannot rely on the assumption that those sets of airport codes are identical.
Note: The first SELECT DISTINCT DEST
query takes almost 40 seconds. We expect all queries on such a small data set, less than 2 GB, to take a few seconds at most. The reason is because the expression NOT IN (SELECT origin FROM airlines_external)
produces an intermediate result set of 123 million rows, then runs 123 million comparisons on each data node against the tiny set of destination airports. The way the NOT IN
operator works internally means that this intermediate result set with 123 million rows might be transmitted across the network to each data node in the cluster. Applying another DISTINCT
inside the NOT IN
subquery means that the intermediate result set is only 340 items, resulting in much less network traffic and fewer comparison operations. The more efficient query with the added DISTINCT
is approximately 7 times as fast.
Next, we try doing a simple calculation, with results broken down by year. This reveals that some years have no data in the airtime
column. That means we might be able to use that column in queries involving certain date ranges, but we cannot count on it to always be reliable. The question of whether a column contains any NULL
values, and if so what is their number, proportion, and distribution, comes up again and again when doing initial exploration of a data set.
> SELECT year, SUM(airtime) FROM airlines_external
GROUP BY year ORDER BY year DESC;
+------+--------------+
| year | sum(airtime) |
+------+--------------+
| 2008 | 713050445 |
| 2007 | 748015545 |
| 2006 | 720372850 |
| 2005 | 708204026 |
| 2004 | 714276973 |
| 2003 | 665706940 |
| 2002 | 549761849 |
| 2001 | 590867745 |
| 2000 | 583537683 |
| 1999 | 561219227 |
| 1998 | 538050663 |
| 1997 | 536991229 |
| 1996 | 519440044 |
| 1995 | 513364265 |
| 1994 | NULL |
| 1993 | NULL |
| 1992 | NULL |
| 1991 | NULL |
| 1990 | NULL |
| 1989 | NULL |
| 1988 | NULL |
| 1987 | NULL |
+------+--------------+
With the notion of NULL
values in mind, let’s come back to the tail_num
column that we discovered had a lot of NULL
s. Let’s quantify the NULL
and non-NULL
values in that column for better understanding. First, we just count the overall number of rows versus the non-NULL
values in that column. That initial result gives the appearance of relatively few non-NULL
values, but we can break it down more clearly in a single query. Once we have the COUNT(*)
and the COUNT(colname)
numbers, we can encode that initial query in a WITH
clause, then run a follow-on query that performs multiple arithmetic operations on those values. Seeing that only one-third of one percent of all rows have non-NULL
values for the tail_num
column clearly illustrates that column is not of much use.
> SELECT COUNT(*) AS 'rows', COUNT(tail_num) AS 'non-null tail numbers'
FROM airlines_external;
+-----------+-----------------------+
| rows | non-null tail numbers |
+-----------+-----------------------+
| 123534969 | 412968 |
+-----------+-----------------------+
> WITH t1 AS
(SELECT COUNT(*) AS 'rows', COUNT(tail_num) AS 'nonnull'
FROM airlines_external)
SELECT `rows`, `nonnull`, `rows` - `nonnull` AS 'nulls',
(`nonnull` / `rows`) * 100 AS 'percentage non-null'
FROM t1;
+-----------+---------+-----------+---------------------+
| rows | nonnull | nulls | percentage non-null |
+-----------+---------+-----------+---------------------+
| 123534969 | 412968 | 123122001 | 0.3342923897119365 |
+-----------+---------+-----------+---------------------+
By examining other columns using these techniques, we can form a mental picture of the way data is distributed throughout the table, and which columns are most significant for query purposes. For this tutorial, we focus mostly on the fields likely to hold discrete values, rather than columns such as actual_elapsed_time
whose names suggest they hold measurements. We would dig deeper into those columns once we had a clear picture of which questions were worthwhile to ask, and what kinds of trends we might look for. For the final piece of initial exploration, let’s look at the year
column. A simple GROUP BY
query shows that it has a well-defined range, a manageable number of distinct values, and relatively even distribution of rows across the different years.
> SELECT MIN(year), MAX(year), NDV(year) FROM airlines_external;
+-----------+-----------+-----------+
| min(year) | max(year) | ndv(year) |
+-----------+-----------+-----------+
| 1987 | 2008 | 22 |
+-----------+-----------+-----------+
> SELECT year, COUNT(*) howmany FROM airlines_external
GROUP BY year ORDER BY year DESC;
+------+---------+
| year | howmany |
+------+---------+
| 2008 | 7009728 |
| 2007 | 7453215 |
| 2006 | 7141922 |
| 2005 | 7140596 |
| 2004 | 7129270 |
| 2003 | 6488540 |
| 2002 | 5271359 |
| 2001 | 5967780 |
| 2000 | 5683047 |
| 1999 | 5527884 |
| 1998 | 5384721 |
| 1997 | 5411843 |
| 1996 | 5351983 |
| 1995 | 5327435 |
| 1994 | 5180048 |
| 1993 | 5070501 |
| 1992 | 5092157 |
| 1991 | 5076925 |
| 1990 | 5270893 |
| 1989 | 5041200 |
| 1988 | 5202096 |
| 1987 | 1311826 |
+------+---------+
We could go quite far with the data in this initial raw format, just as we downloaded it from the web. If the data set proved to be useful and worth persisting in Impala for extensive queries, we might want to copy it to an internal table, letting Impala manage the data files and perhaps reorganizing a little for higher efficiency. In this next stage of the tutorial, we copy the original data into a partitioned table, still in Parquet format. Partitioning based on the year
column lets us run queries with clauses such as WHERE year = 2001
or WHERE year BETWEEN 1989 AND 1999
, which can dramatically cut down on I/O by ignoring all the data from years outside the desired range. Rather than reading all the data and then deciding which rows are in the matching years, Impala can zero in on only the data files from specific year
partitions. To do this, Impala physically reorganizes the data files, putting the rows from each year into data files in a separate HDFS directory for each year
value. Along the way, we’ll also get rid of the tail_num
column that proved to be almost entirely NULL
.
The first step is to create a new table with a layout very similar to the original airlines_external
table. We’ll do that by reverse-engineering a CREATE TABLE
statement for the first table, then tweaking it slightly to include a PARTITION BY
clause for year
, and excluding the tail_num
column. The SHOW CREATE TABLE
statement gives us the starting point.
Although we could edit that output into a new SQL statement, all the ASCII box characters make such editing inconvenient. To get a more stripped-down CREATE TABLE
to start with, we restart the impala-shell command with the -B
option, which turns off the box-drawing behavior.
$ impala-shell -i localhost -B -d airlines_data;
> SHOW CREATE TABLE airlines_external;
"CREATE EXTERNAL TABLE airlines_data.airlines_external (
year INT COMMENT 'inferred from: optional int32 year',
month INT COMMENT 'inferred from: optional int32 month',
day INT COMMENT 'inferred from: optional int32 day',
dayofweek INT COMMENT 'inferred from: optional int32 dayofweek',
dep_time INT COMMENT 'inferred from: optional int32 dep_time',
crs_dep_time INT COMMENT 'inferred from: optional int32 crs_dep_time',
arr_time INT COMMENT 'inferred from: optional int32 arr_time',
crs_arr_time INT COMMENT 'inferred from: optional int32 crs_arr_time',
carrier STRING COMMENT 'inferred from: optional binary carrier',
flight_num INT COMMENT 'inferred from: optional int32 flight_num',
tail_num INT COMMENT 'inferred from: optional int32 tail_num',
actual_elapsed_time INT COMMENT 'inferred from: optional int32 actual_elapsed_time',
crs_elapsed_time INT COMMENT 'inferred from: optional int32 crs_elapsed_time',
airtime INT COMMENT 'inferred from: optional int32 airtime',
arrdelay INT COMMENT 'inferred from: optional int32 arrdelay',
depdelay INT COMMENT 'inferred from: optional int32 depdelay',
origin STRING COMMENT 'inferred from: optional binary origin',
dest STRING COMMENT 'inferred from: optional binary dest',
distance INT COMMENT 'inferred from: optional int32 distance',
taxi_in INT COMMENT 'inferred from: optional int32 taxi_in',
taxi_out INT COMMENT 'inferred from: optional int32 taxi_out',
cancelled INT COMMENT 'inferred from: optional int32 cancelled',
cancellation_code STRING COMMENT 'inferred from: optional binary cancellation_code',
diverted INT COMMENT 'inferred from: optional int32 diverted',
carrier_delay INT COMMENT 'inferred from: optional int32 carrier_delay',
weather_delay INT COMMENT 'inferred from: optional int32 weather_delay',
nas_delay INT COMMENT 'inferred from: optional int32 nas_delay',
security_delay INT COMMENT 'inferred from: optional int32 security_delay',
late_aircraft_delay INT COMMENT 'inferred from: optional int32 late_aircraft_delay'
)
STORED AS PARQUET
LOCATION 'hdfs://a1730.example.com:8020/user/impala/staging/airlines'
TBLPROPERTIES ('numFiles'='0', 'COLUMN_STATS_ACCURATE'='false',
'transient_lastDdlTime'='1439425228', 'numRows'='-1', 'totalSize'='0',
'rawDataSize'='-1')"
After copying and pasting the CREATE TABLE
statement into a text editor for fine-tuning, we quit and restart impala-shell without the -B
option, to switch back to regular output.
Next we run the CREATE TABLE
statement that we adapted from the SHOW CREATE TABLE
output. We kept the STORED AS PARQUET
clause because we want to rearrange the data somewhat but still keep it in the high-performance Parquet format. The LOCATION
and TBLPROPERTIES
clauses are not relevant for this new table, so we edit those out. Because we are going to partition the new table based on the year
column, we move that column name (and its type) into a new PARTITIONED BY
clause.
> CREATE TABLE airlines_data.airlines
(month INT,
day INT,
dayofweek INT,
dep_time INT,
crs_dep_time INT,
arr_time INT,
crs_arr_time INT,
carrier STRING,
flight_num INT,
actual_elapsed_time INT,
crs_elapsed_time INT,
airtime INT,
arrdelay INT,
depdelay INT,
origin STRING,
dest STRING,
distance INT,
taxi_in INT,
taxi_out INT,
cancelled INT,
cancellation_code STRING,
diverted INT,
carrier_delay INT,
weather_delay INT,
nas_delay INT,
security_delay INT,
late_aircraft_delay INT)
PARTITIONED BY (year INT)
STORED AS PARQUET
;
Next, we copy all the rows from the original table into this new one with an INSERT
statement. (We edited the CREATE TABLE
statement to make an INSERT
statement with the column names in the same order.) The only change is to add a PARTITION(year)
clause, and move the year
column to the very end of the SELECT
list of the INSERT
statement. Specifying PARTITION(year)
, rather than a fixed value such as PARTITION(year=2000)
, means that Impala figures out the partition value for each row based on the value of the very last column in the SELECT
list. This is the first SQL statement that legitimately takes any substantial time, because the rows from different years are shuffled around the cluster; the rows that go into each partition are collected on one node, before being written to one or more new data files.
> INSERT INTO airlines_data.airlines
PARTITION (year)
SELECT
month,
day,
dayofweek,
dep_time,
crs_dep_time,
arr_time,
crs_arr_time,
carrier,
flight_num,
actual_elapsed_time,
crs_elapsed_time,
airtime,
arrdelay,
depdelay,
origin,
dest,
distance,
taxi_in,
taxi_out,
cancelled,
cancellation_code,
diverted,
carrier_delay,
weather_delay,
nas_delay,
security_delay,
late_aircraft_delay,
year
FROM airlines_data.airlines_external;
Once partitioning or join queries come into play, it’s important to have statistics that Impala can use to optimize queries on the corresponding tables. The COMPUTE INCREMENTAL STATS
statement is the way to collect statistics for partitioned tables. Then the SHOW TABLE STATS
statement confirms that the statistics are in place for each partition, and also illustrates how many files and how much raw data is in each partition.
> COMPUTE INCREMENTAL STATS airlines;
+-------------------------------------------+
| summary |
+-------------------------------------------+
| Updated 22 partition(s) and 27 column(s). |
+-------------------------------------------+
> SHOW TABLE STATS airlines;
+-------+-----------+--------+----------+--------------+-------------------+---------+-------------------+----------------------------------------------------------------------------------------------------------+
| year | #Rows | #Files | Size | Bytes Cached | Cache Replication | Format | Incremental stats | Location |
+-------+-----------+--------+----------+--------------+-------------------+---------+-------------------+----------------------------------------------------------------------------------------------------------+
| 1987 | 1311826 | 1 | 11.75MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1987 |
| 1988 | 5202096 | 1 | 44.04MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1988 |
| 1989 | 5041200 | 1 | 46.07MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1989 |
| 1990 | 5270893 | 1 | 46.25MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1990 |
| 1991 | 5076925 | 1 | 46.77MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1991 |
| 1992 | 5092157 | 1 | 48.21MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1992 |
| 1993 | 5070501 | 1 | 47.46MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1993 |
| 1994 | 5180048 | 1 | 47.47MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1994 |
| 1995 | 5327435 | 1 | 62.40MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1995 |
| 1996 | 5351983 | 1 | 62.93MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1996 |
| 1997 | 5411843 | 1 | 65.05MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1997 |
| 1998 | 5384721 | 1 | 62.21MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1998 |
| 1999 | 5527884 | 1 | 65.10MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=1999 |
| 2000 | 5683047 | 1 | 67.68MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2000 |
| 2001 | 5967780 | 1 | 74.03MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2001 |
| 2002 | 5271359 | 1 | 74.00MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2002 |
| 2003 | 6488540 | 1 | 99.35MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2003 |
| 2004 | 7129270 | 1 | 123.29MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2004 |
| 2005 | 7140596 | 1 | 120.72MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2005 |
| 2006 | 7141922 | 1 | 121.88MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2006 |
| 2007 | 7453215 | 1 | 130.87MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2007 |
| 2008 | 7009728 | 1 | 123.14MB | NOT CACHED | NOT CACHED | PARQUET | true | hdfs://myhost.com:8020/user/hive/warehouse/airline_data.db/airlines/year=2008 |
| Total | 123534969 | 22 | 1.55GB | 0B | | | | |
+-------+-----------+--------+----------+--------------+-------------------+---------+-------------------+----------------------------------------------------------------------------------------------------------+
At this point, we sanity check the partitioning we did. All the partitions have exactly one file, which is on the low side. A query that includes a clause WHERE year=2004
will only read a single data block; that data block will be read and processed by a single data node; therefore, for a query targeting a single year, all the other nodes in the cluster will sit idle while all the work happens on a single machine. It’s even possible that by chance (depending on HDFS replication factor and the way data blocks are distributed across the cluster), that multiple year partitions selected by a filter such as WHERE year BETWEEN 1999 AND 2001
could all be read and processed by the same data node. The more data files each partition has, the more parallelism you can get and the less probability of “hotspots” occurring on particular nodes, therefore a bigger performance boost by having a big cluster.
However, the more data files, the less data goes in each one. The overhead of dividing the work in a parallel query might not be worth it if each node is only reading a few megabytes. 50 or 100 megabytes is a decent size for a Parquet data block; 9 or 37 megabytes is on the small side. Which is to say, the data distribution we ended up with based on this partitioning scheme is on the borderline between sensible (reasonably large files) and suboptimal (few files in each partition). The way to see how well it works in practice is to run the same queries against the original flat table and the new partitioned table, and compare times.
Spoiler: in this case, with my particular 4-node cluster with its specific distribution of data blocks and my particular exploratory queries, queries against the partitioned table do consistently run faster than the same queries against the unpartitioned table. But I could not be sure that would be the case without some real measurements. Here are some queries I ran to draw that conclusion, first against airlines_external
(no partitioning), then against AIRLINES
(partitioned by year). The AIRLINES
queries are consistently faster. Changing the volume of data, changing the size of the cluster, running queries that did or didn’t refer to the partition key columns, or other factors could change the results to favor one table layout or the other.
Note: If you find the volume of each partition is only in the low tens of megabytes, consider lowering the granularity of partitioning. For example, instead of partitioning by year, month, and day, partition by year and month or even just by year. The ideal layout to distribute work efficiently in a parallel query is many tens or even hundreds of megabytes per Parquet file, and the number of Parquet files in each partition somewhat higher than the number of data nodes.
> SELECT SUM(airtime) FROM airlines_external;
+--------------+
| 8662859484 |
+--------------+
> SELECT SUM(airtime) FROM airlines;
+--------------+
| 8662859484 |
+--------------+
> SELECT SUM(airtime) FROM airlines_external WHERE year = 2005;
+--------------+
| 708204026 |
+--------------+
> SELECT SUM(airtime) FROM airlines WHERE year = 2005;
+--------------+
| 708204026 |
+--------------+
Now we can finally analyze this data set that from the raw data files and we didn’t know what columns they contained. Let’s see whether the airtime
of a flight tends to be different depending on the day of the week. We can see that the average is a little higher on day number 6; perhaps Saturday is a busy flying day and planes have to circle for longer at the destination airport before landing.
> SELECT dayofweek, AVG(airtime) FROM airlines
GROUP BY dayofweek ORDER BY dayofweek;
+-----------+-------------------+
| dayofweek | avg(airtime) |
+-----------+-------------------+
| 1 | 102.1560425016671 |
| 2 | 102.1582931538807 |
| 3 | 102.2170009256653 |
| 4 | 102.37477661846 |
| 5 | 102.2697358763511 |
| 6 | 105.3627448363705 |
| 7 | 103.4144351202054 |
+-----------+-------------------+
To see if the apparent trend holds up over time, let’s do the same breakdown by day of week, but also split up by year. Now we can see that day number 6 consistently has a higher average air time in each year. We can also see that the average air time increased over time across the board. And the presence of NULL
for this column in years 1987 to 1994 shows that queries involving this column need to be restricted to a date range of 1995 and higher.
> SELECT year, dayofweek, AVG(airtime) FROM airlines
GROUP BY year, dayofweek ORDER BY year DESC, dayofweek;
+------+-----------+-------------------+
| year | dayofweek | avg(airtime) |
+------+-----------+-------------------+
| 2008 | 1 | 103.1821651651355 |
| 2008 | 2 | 103.2149301386094 |
| 2008 | 3 | 103.0585076622796 |
| 2008 | 4 | 103.4671383539038 |
| 2008 | 5 | 103.5575385182659 |
| 2008 | 6 | 107.4006306562128 |
| 2008 | 7 | 104.8648851041755 |
| 2007 | 1 | 102.2196114337825 |
| 2007 | 2 | 101.9317791906348 |
| 2007 | 3 | 102.0964767689043 |
| 2007 | 4 | 102.6215927201686 |
| 2007 | 5 | 102.4289399000661 |
| 2007 | 6 | 105.1477448215756 |
| 2007 | 7 | 103.6305945644095 |
...
| 1996 | 1 | 99.33860750862108 |
| 1996 | 2 | 99.54225446396656 |
| 1996 | 3 | 99.41129336113134 |
| 1996 | 4 | 99.5110373340348 |
| 1996 | 5 | 99.22120745027595 |
| 1996 | 6 | 101.1717447111921 |
| 1996 | 7 | 99.95410136133704 |
| 1995 | 1 | 96.93779698300494 |
| 1995 | 2 | 96.93458674589712 |
| 1995 | 3 | 97.00972311337051 |
| 1995 | 4 | 96.90843832024412 |
| 1995 | 5 | 96.78382115425562 |
| 1995 | 6 | 98.70872826057003 |
| 1995 | 7 | 97.85570478374616 |
| 1994 | 1 | NULL |
| 1994 | 2 | NULL |
| 1994 | 3 | NULL |
...
| 1987 | 5 | NULL |
| 1987 | 6 | NULL |
| 1987 | 7 | NULL |
+------+-----------+-------------------+