OLAP Quickstart
OLAP (OnLine Analysis Processing) is a key technology in the field of data analysis, it is generally used to perform complex queries on large data sets with latencies in seconds. Now Flink can not only support streaming and batch computing, but also supports users to deploy it as an OLAP computing service. This page will show you how to quickly set up a local Flink OLAP service, and will also introduce some best practices helping you deploy Flink OLAP service in production.
Architecture Introduction
This chapter will introduce you to the overall architecture of Flink OLAP service and the advantages of using it.
Architecture
Flink OLAP service consists of three parts: Client, Flink SQL Gateway and Flink Session Cluster.
- Client: Could be any client that can interact with Flink SQL Gateway, such as SQL Client, Flink JDBC Driver and so on.
- Flink SQL Gateway: The SQL Gateway provides an easy way to parse the sql query, look up the metadata, analyze table stats, optimize the plan and submit JobGraphs to cluster.
- Flink Session Cluster: OLAP queries run on session cluster, mainly to avoid the overhead of cluster startup.
Advantage
- Massively Parallel Processing
- Flink OLAP runs naturally as a massively parallel processing system, which enables planners to easily adjust the job parallelism to fulfill queries’ latency requirement under different data sizes.
- Elastic Resource Management
- Flink’s resource management supports min/max scaling, which means the session cluster can allocate the resource according to workload dynamically.
- Reuse Connectors
- Flink OLAP can reuse the rich Connectors in Flink ecosystem.
- Unified Engine
- Unified computing engine for Streaming/Batch/OLAP.
Deploying in Local Mode
In this chapter, you will learn how to build Flink OLAP services locally.
Downloading Flink
The same as Local Installation. Flink runs on all UNIX-like environments, i.e. Linux, Mac OS X, and Cygwin (for Windows). User need to have at Java 11 installed. To check the Java version installed, user can type in the terminal:
java -version
Next, Download the latest binary release of Flink, then extract the archive:
tar -xzf flink-*.tgz
Starting a local cluster
To start a local cluster, run the bash script that comes with Flink:
./bin/start-cluster.sh
You should be able to navigate to the web UI at http://localhost:8081 to view the Flink dashboard and see that the cluster is up and running.
Start a SQL Client CLI
You can start the CLI with an embedded gateway by calling:
./bin/sql-client.sh
Running Queries
You could simply execute queries in CLI and retrieve the results.
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'number-of-rows' = '100000'
);
SELECT buyer, SUM(price) AS total_cost
FROM Orders
GROUP BY buyer
ORDER BY total_cost LIMIT 3;
And then you could find job detail information in web UI at http://localhost:8081.
Deploying in Production
This section guides you through setting up a production ready Flink OLAP service.
Client
Flink JDBC Driver
You should use Flink JDBC Driver when submitting queries to SQL Gateway since it provides low-level connection management. When used in production, you should pay attention to reuse the JDBC connection to avoid frequently creating/closing sessions in the Gateway and then reduce the E2E query latency. For detailed information, please refer to the [Flink JDBC Driver]({{ <ref “docs/dev/table/jdbcDriver”> }}).
Cluster Deployment
In production, you should use Flink Session Cluster, Flink SQL Gateway to build an OLAP service.
Session Cluster
For Flink Session Cluster, you can deploy it on Native Kubernetes using session mode. Kubernetes is a popular container-orchestration system for automating computer application deployment, scaling, and management. By deploying on Native Kubernetes, Flink Session Cluster is able to dynamically allocate and de-allocate TaskManagers. For more information, please refer to [Native Kubernetes]({{ < ref “docs/deployment/resource-providers/native_kubernetes”> }}). Furthermore, you can config the option slotmanager.number-of-slots.min in session cluster. This will help you significantly reduce the cold start time of your query. For detailed information, please refer to FLIP-362.
SQL Gateway
For Flink SQL Gateway, you should deploy it as a stateless microservice and register the instance on service discovery component. Through this way, client can balance the query between instances easily. For more information, please refer to SQL Gateway Overview.
Datasource Configurations
Catalogs
In OLAP scenario, you should configure FileCatalogStore
provided by Catalogs as the catalog used by cluster. As a long-running service, Flink OLAP cluster’s catalog information will not change frequently and should be re-used cross sessions to reduce the cold-start cost. For more information, please refer to the Catalog Store.
Connectors
Both Session Cluster and SQL Gateway rely on connectors to analyze table stats and read data from the configured data source. To add connectors, please refer to the Connectors.
Recommended Cluster Configurations
In OLAP scenario, appropriate configurations that can greatly help users improve the overall usability and query performance. Here are some recommended production configurations:
SQL&Table Options
Parameters | Default | Recommended |
---|---|---|
table.optimizer.join-reorder-enabled | false | true |
pipeline.object-reuse | false | true |
sql-gateway.session.plan-cache.enabled | false | true |
Runtime Options
Parameters | Default | Recommended |
---|---|---|
execution.runtime-mode | STREAMING | BATCH |
execution.batch-shuffle-mode | ALL_EXCHANGES_BLOCKING | ALL_EXCHANGES_PIPELINED |
env.java.opts.all | {default value} | {default value} -XX:PerMethodRecompilationCutoff=10000 -XX:PerBytecodeRecompilationCutoff=10000-XX:ReservedCodeCacheSize=512M -XX:+UseZGC |
JDK Version | 11 | 17 |
Using JDK17 within ZGC can greatly help optimize the metaspace garbage collection issue, detailed information can be found in FLINK-32746. Meanwhile, ZGC can provide close to zero application pause time when collecting garbage objects in memory. Additionally, OLAP queries need to be executed in BATCH
mode because both Pipelined
and Blocking
edges may appear in the execution plan of an OLAP query. Batch scheduler allows queries to be scheduled in stages, which could avoid scheduling deadlocks in this scenario.
Scheduling Options
Parameters | Default | Recommended |
---|---|---|
jobmanager.scheduler | Default | Default |
jobmanager.execution.failover-strategy | region | full |
restart-strategy.type | (none) | disable |
jobstore.type | File | Memory |
jobstore.max-capacity | Integer.MAX_VALUE | 500 |
Network Options
Parameters | Default | Recommended |
---|---|---|
rest.server.numThreads | 4 | 32 |
web.refresh-interval | 3000 | 300000 |
pekko.framesize | 10485760b | 104857600b |
ResourceManager Options
Parameters | Default | Recommended |
---|---|---|
kubernetes.jobmanager.replicas | 1 | 2 |
kubernetes.jobmanager.cpu.amount | 1.0 | 16.0 |
jobmanager.memory.process.size | (none) | 32g |
jobmanager.memory.jvm-overhead.max | 1g | 3g |
kubernetes.taskmanager.cpu.amount | (none) | 16 |
taskmanager.numberOfTaskSlots | 1 | 32 |
taskmanager.memory.process.size | (none) | 65536m |
taskmanager.memory.managed.size | (none) | 16384m |
slotmanager.number-of-slots.min | 0 | {taskManagerNumber * numberOfTaskSlots} |
You can configure slotmanager.number-of-slots.min
to a proper value as the reserved resource pool serving OLAP queries. TaskManager should configure with a large resource specification in OLAP scenario since this can put more computations in local and reduce network/deserialization/serialization overhead. Meanwhile, as a single point of calculation in OLAP, JobManager also prefer large resource specification.
Future Work
Flink OLAP is now part of Apache Flink Roadmap, which means the community will keep putting efforts to improve Flink OLAP, both in usability and query performance. Relevant work are traced in underlying tickets: