Running Spark on Alluxio

Slack Docker Pulls GitHub edit source

This guide describes how to configure Apache Spark to access Alluxio.

Overview

Applications of Spark 1.1 or later can access an Alluxio cluster through its HDFS-compatible interface out-of-the-box. Using Alluxio as the data access layer, Spark applications can transparently access data in many different types and instances of persistent storage services (e.g., AWS S3 buckets, Azure Object Store buckets, remote HDFS deployments and etc). Data can be actively fetched or transparently cached into Alluxio to speed up the I/O performance especially when Spark deployment is remote to data. In addition, Alluxio can help simplify the architecture by decoupling compute and physical storage. When the real data path in persistent under storage is hidden from Spark, a change to under storages can be independent from application logic; meanwhile as a near-compute cache Alluxio can still provide compute frameworks like Spark data-locality.

Prerequisites

  • Setup Java for Java 8 Update 60 or higher (8u60+), 64-bit.
  • Alluxio has been set up and is running. This guide assumes the persistent under storage is a local HDFS deployment. E.g., a line of alluxio.master.mount.table.root.ufs=hdfs://localhost:9000/alluxio/ is included in ${ALLUXIO_HOME}/conf/alluxio-site.properties. Note that Alluxio supports many other under storage systems in addition to HDFS. To access data from any number of those systems is orthogonal to the focus of this guide but covered by Unified and Transparent Namespace.
  • Make sure that the Alluxio client jar is available. This Alluxio client jar file can be found at /<PATH_TO_ALLUXIO>/client/alluxio-2.0.1-client.jar in the tarball downloaded from Alluxio download page. Alternatively, advanced users can compile this client jar from the source code by following the instructions.

Basic Setup

Distribute the Alluxio client jar across the nodes where Spark drivers or executors are running. Specifically, put the client jar on the same local path (e.g. /<PATH_TO_ALLUXIO>/client/alluxio-2.0.1-client.jar) on each node.

Add the Alluxio client jar to the classpath of Spark drivers and executors in order for Spark applications to use the client jar to read and write files in Alluxio. Specifically, add the following line to spark/conf/spark-defaults.conf on every node running Spark. Also, make sure the client jar is copied to every node running Spark.

  1. spark.driver.extraClassPath /<PATH_TO_ALLUXIO>/client/alluxio-2.0.1-client.jar
  2. spark.executor.extraClassPath /<PATH_TO_ALLUXIO>/client/alluxio-2.0.1-client.jar

Examples: Use Alluxio as Input and Output

This section shows how to use Alluxio as input and output sources for your Spark applications.

Access Data Only in Alluxio

Copy local data to the Alluxio file system. Put the file LICENSE into Alluxio, assuming you are in the Alluxio project directory:

  1. $ ./bin/alluxio fs copyFromLocal LICENSE /Input

Run the following commands from spark-shell, assuming Alluxio Master is running on localhost:

  1. > val s = sc.textFile("alluxio://localhost:19998/Input")
  2. > val double = s.map(line => line + line)
  3. > double.saveAsTextFile("alluxio://localhost:19998/Output")

Open your browser and check http://localhost:19999/browse. There should be an output directory /Output which contains the doubled content of the input file Input.

Access Data in Under Storage

Alluxio supports transparently fetching the data from the under storage system, given the exact path. For this section, HDFS is used as an example of a distributed under storage system.

Put a file Input_HDFS into HDFS:

  1. $ hdfs dfs -put -f ${ALLUXIO_HOME}/LICENSE hdfs://localhost:9000/alluxio/Input_HDFS

Note that Alluxio has no notion of the file. You can verify this by going to the web UI. Run the following commands from spark-shell, assuming Alluxio Master is running on localhost:

  1. > val s = sc.textFile("alluxio://localhost:19998/Input_HDFS")
  2. > val double = s.map(line => line + line)
  3. > double.saveAsTextFile("alluxio://localhost:19998/Output_HDFS")

Open your browser and check http://localhost:19999/browse. There should be an output directory Output_HDFS which contains the doubled content of the input file Input_HDFS. Also, the input file Input_HDFS now will be 100% loaded in the Alluxio file system space.

Advanced Setup

Configure Spark to find Alluxio cluster in HA mode

When connecting to the Alluxio HA cluster using internal leader election, add the following lines to ${SPARK_HOME}/conf/spark-defaults.conf so Spark applications can know the Alluxio masters to connect to and find out the leader master.

  1. spark.driver.extraJavaOptions -Dalluxio.master.rpc.addresses=master_hostname_1:19998,master_hostname_2:19998,master_hostname_3:19998
  2. spark.executor.extraJavaOptions -Dalluxio.master.rpc.addresses=master_hostname_1:19998,master_hostname_2:19998,master_hostname_3:19998

Alternatively you can add the property to the Hadoop configuration file ${SPARK_HOME}/conf/core-site.xml:

  1. <configuration>
  2. <property>
  3. <name>alluxio.master.rpc.addresses</name>
  4. <value>master_hostname_1:19998,master_hostname_2:19998,master_hostname_3:19998</value>
  5. </property>
  6. </configuration>

Similarly, users can also configure Spark to find Alluxio HA cluster using Zookeeper-based leader election, please refer to HA mode client configuration parameters.

Customize Alluxio User Properties for Individual Spark Jobs

Spark users can use pass JVM system properties to Spark jobs by adding "-Dproperty=value" to spark.executor.extraJavaOptions for Spark executors and spark.driver.extraJavaOptions for Spark drivers. For example, to submit a Spark job with the write CACHE_THROUGH when writing to Alluxio:

  1. $ spark-submit \
  2. --conf 'spark.driver.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH' \
  3. --conf 'spark.executor.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH' \
  4. ...

To customize Alluxio client-side properties in a Spark job, see how to configure Spark Jobs.

Note that, in client mode you need set --driver-java-options "-Dalluxio.user.file.writetype.default=CACHE_THROUGH" instead of --conf spark.driver.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH (see explanation).

Advanced Usage

Access Data from Alluxio in HA Mode

If Spark is set up by the instructions in Configure Spark to find Alluxio cluster in HA mode, you can write URIs using the “alluxio://” scheme without specifying cluster information in the authority. This is because in HA mode, the address of leader Alluxio master will be served by the internal leader election or by the configured Zookeeper service.

  1. > val s = sc.textFile("alluxio:///Input")
  2. > val double = s.map(line => line + line)
  3. > double.saveAsTextFile("alluxio:///Output")

Alternatively, one can use the HA authority in URI directly without any configuration setup. For example, specify the master rpc addresses in the URI to connect to Alluxio HA cluster using internal leader election:

  1. > val s = sc.textFile("alluxio://master_hostname_1:19998;master_hostname_2:19998;master_hostname_3:19998/Input")
  2. > val double = s.map(line => line + line)
  3. > double.saveAsTextFile("alluxio://master_hostname_1:19998;master_hostname_2:19998;master_hostname_3:19998/Output")

Note that you must use semicolons rather than commas to separate different addresses to refer a URI of Alluxio in HA mode in Spark. Otherwise, the URI will be considered invalid by Spark. Please refer to the instructions in HA authority.

Cache RDD into Alluxio

Storing RDDs in Alluxio memory is simply saving the RDD as a file to Alluxio. Two common ways to save RDDs as files in Alluxio are

  1. saveAsTextFile: writes the RDD as a text file, where each element is a line in the file,
  2. saveAsObjectFile: writes the RDD out to a file, by using Java serialization on each element.

The saved RDDs in Alluxio can be read again (from memory) by using sc.textFile or sc.objectFile respectively.

  1. // as text file
  2. > rdd.saveAsTextFile("alluxio://localhost:19998/rdd1")
  3. > rdd = sc.textFile("alluxio://localhost:19998/rdd1")
  4. // as object file
  5. > rdd.saveAsObjectFile("alluxio://localhost:19998/rdd2")
  6. > rdd = sc.objectFile("alluxio://localhost:19998/rdd2")

See the blog article “Effective Spark RDDs with Alluxio”.

Cache Dataframe into Alluxio

Storing Spark DataFrames in Alluxio memory is simply saving the DataFrame as a file to Alluxio. DataFrames are commonly written as parquet files, with df.write.parquet(). After the parquet is written to Alluxio, it can be read from memory by using sqlContext.read.parquet().

  1. > df.write.parquet("alluxio://localhost:19998/data.parquet")
  2. > df = sqlContext.read.parquet("alluxio://localhost:19998/data.parquet")

See the blog article “Effective Spark DataFrames with Alluxio”.

TroubleShooting

Logging Configuration

You may configure Spark’s application logging for debugging purposes. Spark’s documentation explains how to configure logging for a Spark application.

If you are using YARN then there is a separate section which explains how to configure logging with YARN for a Spark application.

Check Spark is Correctly Set Up

To ensure that your Spark can correctly work with Alluxio before running Spark, a tool that comes with Alluxio can help check the configuration.

When you have a running Spark cluster (or Spark standalone) of version 2.x, you can run the following command in the Alluxio project directory:

  1. $ integration/checker/bin/alluxio-checker.sh spark <spark master uri>

For example,

  1. $ integration/checker/bin/alluxio-checker.sh spark spark://sparkMaster:7077

This command will report potential problems that might prevent you from running Spark on Alluxio.

You can use -h to display helpful information about the command.

Incorrect Data Locality Level of Spark Tasks

If Spark task locality is ANY while it should be NODE_LOCAL, it is probably because Alluxio and Spark use different network address representations, maybe one of them uses hostname while another uses IP address. Please refer to JIRA ticket SPARK-10149 for more details, where you can find solutions from the Spark community.

Note: Alluxio workers use hostnames to represent network addresses to be consistent with HDFS. There is a workaround when launching Spark to achieve data locality. Users can explicitly specify hostnames by using the following script offered in Spark. Start Spark Worker in each slave node with slave-hostname:

  1. $ ${SPARK_HOME}/sbin/start-slave.sh -h <slave-hostname> <spark master uri>

For example:

  1. $ ${SPARK_HOME}/sbin/start-slave.sh -h simple30 spark://simple27:7077

You can also set the SPARK_LOCAL_HOSTNAME in $SPARK_HOME/conf/spark-env.sh to achieve this. For example:

  1. SPARK_LOCAL_HOSTNAME=simple30

In either way, the Spark Worker addresses become hostnames and Locality Level becomes NODE_LOCAL as shown in Spark WebUI below.

hostname

locality

Data Locality of Spark Jobs on YARN

To maximize the amount of locality your Spark jobs attain, you should use as many executors as possible, hopefully at least one executor per node. As with all methods of Alluxio deployment, there should also be an Alluxio worker on all computation odes.

When a Spark job is run on YARN, Spark launches its executors without taking data locality into account. Spark will then correctly take data locality into account when deciding how to distribute tasks to its executors. For example, if host1 contains blockA and a job using blockA is launched on the YARN cluster with --num-executors=1, Spark might place the only executor on host2 and have poor locality. However, if --num-executors=2 and executors are started on host1 and host2, Spark will be smart enough to prioritize placing the job on host1.

Class alluxio.hadoop.FileSystem not found Issues with SparkSQL and Hive MetaStore

To run the spark-shell with the Alluxio client, the Alluxio client jar will have to be added to the classpath of the Spark driver and Spark executors, as described earlier. However, sometimes SparkSQL may fail to save tables to the Hive MetaStore (location in Alluxio), with an error message similar to the following:

  1. org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class alluxio.hadoop.FileSystem not found)

The recommended solution is to configure spark.sql.hive.metastore.sharedPrefixes. In Spark 1.4.0 and later, Spark uses an isolated classloader to load java classes for accessing the Hive MetaStore. However, the isolated classloader ignores certain packages and allows the main classloader to load “shared” classes (the Hadoop HDFS client is one of these “shared” classes). The Alluxio client should also be loaded by the main classloader, and you can append the alluxio package to the configuration parameter spark.sql.hive.metastore.sharedPrefixes to inform Spark to load Alluxio with the main classloader. For example, the parameter may be set in spark/conf/spark-defaults.conf:

  1. spark.sql.hive.metastore.sharedPrefixes=com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,alluxio

java.io.IOException: No FileSystem for scheme: alluxio Issue with Spark on YARN

If you use Spark on YARN with Alluxio and run into the exception java.io.IOException: No FileSystem for scheme: alluxio, please add the following content to ${SPARK_HOME}/conf/core-site.xml:

  1. <configuration>
  2. <property>
  3. <name>fs.alluxio.impl</name>
  4. <value>alluxio.hadoop.FileSystem</value>
  5. </property>
  6. </configuration>