Apache Beam Spark Pipeline Engine


The Apache Spark Runner can be used to execute Beam pipelines using Apache Spark version 3.1.

The Spark Runner executes Beam pipelines on top of Apache Spark, providing:

  • Batch and streaming (and combined) pipelines.

  • The same fault-tolerance guarantees as provided by RDDs and DStreams.

  • The same security features Spark provides.

  • Built-in metrics reporting using Spark’s metrics system, which reports Beam Aggregators as well.

  • Native support for Beam side-inputs via Spark's Broadcast variables.

Check the Apache Beam Spark runner docs for more information.

Options

| Option | Description | Default |
|---|---|---|
| The Spark master | The URL of the Spark master. This is the equivalent of setting SparkConf#setMaster(String) and can either be local[x] to run locally with x cores, spark://host:port to connect to a Spark standalone cluster, mesos://host:port to connect to a Mesos cluster, or yarn to connect to a YARN cluster. | local[4] |
| Streaming: batch interval (ms) | The StreamingContext's batchDuration, setting Spark's batch interval. | 1000 |
| Streaming: checkpoint directory | A checkpoint directory for streaming resilience, ignored in batch. For durability, a reliable filesystem such as HDFS/S3/GS is necessary. | local dir in /tmp |
| Streaming: checkpoint duration (ms) |  |  |
| Enable Metrics sink | A servlet within the existing Spark UI to serve metrics data as JSON data. |  |
| Streaming: maximum records per batch | The maximum number of records per batch interval. |  |
| Streaming: minimum read time (ms) | The minimum elapsed read time. |  |
| Bundle size | The maximum number of elements in a bundle. |  |
| User agent | A user agent string as per RFC 2616, describing the pipeline to external services. |  |
| Temp location | Path for temporary files. |  |
| Plugins to stage (, delimited) | Comma-separated list of plugins. |  |
| Transform plugin classes | List of transform plugin classes. |  |
| XP plugin classes | List of extension point plugins. |  |
| Streaming Hop transforms flush interval (ms) | The amount of time after which the internal buffer is sent completely over the network and emptied. |  |
| Hop streaming transforms buffer size | The internal buffer size to use. |  |
| Fat jar file location | The location of the fat jar file. |  |
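As a quick illustration of the Spark master option, the same master URL formats can be passed to spark-submit (host names and ports below are placeholders, and the trailing dots stand for the remaining arguments):

  # Embedded Spark with 4 local threads
  spark-submit --master 'local[4]' ...

  # Spark standalone cluster (7077 is the default standalone port)
  spark-submit --master spark://master-host:7077 ...

  # YARN (requires a Hadoop/YARN client configuration on the submitting machine)
  spark-submit --master yarn ...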

Running from GUI or Hop Server

Due to library conflicts, which we hope to resolve in a future version, it is currently not possible to start Spark jobs directly from the Hop GUI or Hop Server. The spark-submit command will, however, work as expected and is described further in this document.

If you want to run Spark jobs directly from Hop, you can follow the steps in the following guide:
Apache Spark from GUI/Hop Server

Running remotely

Since execution of a pipeline on Spark is only possible from the Spark master, you can start a Hop server on the master and then execute remotely from anywhere against the Spark master of your choice. Make sure that any referenced artifacts, such as the fat jar you want to use, are available to the Hop server, and that you have configured your installation as described in the previous section.
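A minimal sketch of that setup, assuming a standard Hop installation on the master node (the host name and port are examples; check the Hop server documentation for the exact invocation in your version):

  # On the Spark master node: start a Hop server that remote clients can target
  sh hop-server.sh master-host 8080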

Running with Spark Submit

You can also execute pipelines using the spark-submit tool. The main class to use is:

  org.apache.hop.beam.run.MainBeam

It accepts 3 arguments:

| Argument | Description |
|---|---|
| 1 | The filename of the pipeline to execute. |
| 2 | The filename of the metadata to load (JSON). You can export metadata in the Hop GUI under the Tools menu (part of the Beam plugin in plugins/engines/beam). |
| 3 | The name of the pipeline run configuration to use. |

spark-submit also needs a fat jar. This can be generated in the Hop GUI under the Tools menu or with the following command:

  sh hop-config.sh -fj /path/to/fat.jar
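If you want to verify the generated jar, listing its contents should show the main class mentioned above (the path is an example):

  # Optional sanity check: confirm MainBeam is packaged in the fat jar
  jar tf /path/to/fat.jar | grep org/apache/hop/beam/run/MainBeam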

Important: project configurations, environments and similar constructs are not available in the context of the Spark runtime. How to best support this is an open question for the Hop community, and your input is welcome. In the meantime, pass variables to the JVM with the option:

  --driver-java-options '-DPROJECT_HOME=/path/to/project-home'

In general, it is better not to use relative paths like ${Internal.Entry.Current.Folder} when specifying filenames for pipelines executed remotely. It is usually better to pick a few root folders and reference them as variables; PROJECT_HOME is as good a variable as any.
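Multiple variables can be passed in a single option; a sketch, where DATA_HOME is a hypothetical second variable:

  # PROJECT_HOME comes from this document; DATA_HOME is an illustrative example
  --driver-java-options '-DPROJECT_HOME=/my/project/home -DDATA_HOME=/my/data'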

An example spark-submit command might look like this:

  spark-submit \
    --master spark://master-host:7077 \
    --class org.apache.hop.beam.run.MainBeam \
    --driver-java-options '-DPROJECT_HOME=/my/project/home' \
    hop-fat.jar \
    /my/project/home/pipeline.hpl \
    metadata-export.json \
    SparkRunConfig
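The same pattern should apply to a YARN cluster; a sketch, assuming a working Hadoop/YARN client configuration and that all referenced files are reachable from the node where the driver runs:

  spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class org.apache.hop.beam.run.MainBeam \
    --driver-java-options '-DPROJECT_HOME=/my/project/home' \
    hop-fat.jar \
    /my/project/home/pipeline.hpl \
    metadata-export.json \
    SparkRunConfig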

Spark embedded

You can specify a master of local[4] to run using an embedded Spark engine, which is primarily useful for local testing. The number 4 in this example is the number of threads to use when executing. You can also specify local[*] to let Spark determine this automatically for your system.
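For example, the earlier spark-submit command can be pointed at an embedded engine by quoting the master URL (illustrative):

  # Use as many threads as there are cores on this machine
  spark-submit --master 'local[*]' --class org.apache.hop.beam.run.MainBeam \
    hop-fat.jar /my/project/home/pipeline.hpl metadata-export.json SparkRunConfig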

Please note that you can get an error like the following:

  Cannot assign requested address: Service 'sparkDriver' failed after 16 retries

In this case you can set the system environment variable SPARK_LOCAL_IP to 127.0.0.1:

  export SPARK_LOCAL_IP="127.0.0.1"

Possible errors

If you see the following error, or errors related to Scala, it means your installation was not adapted to support Apache Spark:

  java.lang.ClassNotFoundException: org.apache.log4j.spi.Filter

In that case, follow the instructions here; if this does not help, raise a JIRA ticket with as much information as possible.

When you receive a stack trace like this, it usually means the Spark master cannot be found:

  Caused by: java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:640)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:101)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:67)
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:215)
    at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:243)