

Spark on Docker on Amazon EC2: Only the Code Tells You Everything

Posted by Dr. Stefan Shadwinkel on November 13, 2015

Our global real-time advertising platform processes vast amounts of data per second. Managing, supporting, and enhancing all its tools and processes with data-driven solutions is therefore crucial to our success.

Developing these solutions requires a flexible setup that can also be scaled easily to allow testing on reasonable data sizes. One part of our current setup is running Apache Spark on Docker on Amazon EC2 instances.

Using plain EC2 instances instead of EMR has the benefits of lower costs and the ability to directly run the latest version or even development builds of Spark.

In this blog post, we will look into the peculiarities of configuring Spark on Docker on EC2 and dive into some Spark code excerpts to understand Spark's behavior.

Docker Networking Options

Docker introduced better multi-host networking in version 1.7.0, yet this remained an experimental feature until the very recent final 1.9.0 release.

So far, we have mainly been concerned with running very flexible, performant Spark clusters, but in the future we will focus on integrating our Spark clusters with more general cluster managers like Mesos and on running Spark on Amazon ECS.

For us, the simplest and most performant way to run Spark on Docker was to use host networking, as we encountered significant performance issues with bridged networking, and the newer options were not yet available when we started out.
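
The difference is a single flag on docker run (a minimal sketch; the image name is a placeholder):

# Bridged networking (the default): Spark binds to a container-internal
# address that other hosts cannot reach without port mapping and NAT.
docker run my-spark-image spark-shell

# Host networking: the container shares the host's network stack, so
# Spark binds directly to the EC2 instance's own interfaces and ports.
docker run --net=host my-spark-image spark-shell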

General Setup

In general, we follow the setup outlined in the documentation of this Docker image: https://hub.docker.com/r/epahomov/docker-spark/

The process outlined there uses the following steps:

  • Set fixed custom ports for each Akka actor created by Spark.
  • Expose them directly using Docker host networking.
  • Use org.apache.spark.broadcast.HttpBroadcastFactory to avoid the need for random ports.
  • Adjust /etc/hosts so that Spark finds the correct local IP address.

The Devil in the Details

The main issues encountered when running Spark with this approach on Amazon EC2 are:

  • Using /etc/hosts is not as easy as it sounds: it depends on the Linux distribution in use (we run a stripped-down Debian instead of Ubuntu) and gets more complicated due to Amazon's internal vs. external IP addresses and our need to consistently use appropriate hostnames that we manage ourselves (see the sketch after this list).
  • When setting SPARK_MASTER_OPTS and SPARK_WORKER_OPTS to 

"-Dspark.driver.port=7001 -Dspark.fileserver.port=7002 -Dspark.broadcast.port=7003 -Dspark.replClassServer.port=7004 -Dspark.blockManager.port=7005 -Dspark.executor.port=7006 -Dspark.ui.port=4040 -Dspark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory"

the Spark worker nodes connect and successfully register with the Spark master, but launching Spark jobs still fails due to connectivity issues involving the Spark executors on the worker nodes.
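
A rough sketch of how the hostname wrinkle can be handled (the metadata endpoint is standard on EC2; the hosts-file edit and the DOCKER_HOSTNAME variable follow our own convention, so treat this as an illustration rather than a drop-in script):

#!/bin/sh
# The EC2 instance metadata service returns the internal (private) IPv4.
LOCAL_IP=$(curl -s http://169.254.169.254/latest/meta-data/local-ipv4)

# DOCKER_HOSTNAME is the DNS name we manage ourselves and want Spark to use.
# Map it to the internal IP so lookups inside the container resolve correctly.
echo "$LOCAL_IP $DOCKER_HOSTNAME" >> /etc/hosts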

Setting the Correct Hostname

Point one above is addressed by "just" setting the correct hostname. You would imagine this to be easy. But hang on.

It is possible to set the IP or hostname used by all the Spark daemons via command line parameters. For master and worker, the parameters are --ip or --host, while for the backend daemons like the executor, the --hostname parameter must be used.
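
For example, the daemons can be launched with an explicit hostname like this (a sketch; the master URL is a placeholder for your own setup):

# Master: bind to and advertise our managed hostname.
spark-class org.apache.spark.deploy.master.Master --host $DOCKER_HOSTNAME

# Worker: same flag, plus the master URL to register with.
spark-class org.apache.spark.deploy.worker.Worker --host $DOCKER_HOSTNAME \
 spark://spark-master.example.com:7077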

There is a caveat, however: no such parameter exists for spark-submit. But we want to be able to run spark-submit through Docker, without needing a local Spark installation with a concrete cluster configuration.

To do so, the Docker container running spark-submit needs to run and advertise an org.apache.spark.HttpFileServer in order to provide the actual job jar to the main cluster.

To make a long story short, HttpFileServer derives from HttpServer, which in turn uses Utils.localHostNameForURI() to determine the address to bind to and advertise. Utils.localHostNameForURI() then utilizes its own localIpAddress value to provide an answer.

Starting at line 737, we see the following code in org.apache.spark.util.Utils:

private lazy val localIpAddress: InetAddress = findLocalInetAddress()

private def findLocalInetAddress(): InetAddress = {
  val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
  if (defaultIpOverride != null) {
    InetAddress.getByName(defaultIpOverride)
  } else {
    val address = InetAddress.getLocalHost
    if (address.isLoopbackAddress) {
      ...


Thus, the only way, and really the only way, to set the hostname for the HttpFileServer used by spark-submit is to set the SPARK_LOCAL_IP environment variable. Luckily, this can be done when invoking spark-submit through Docker:

docker run ... \
--env SPARK_LOCAL_IP=$DOCKER_HOSTNAME \
...


Yet even after doing so, this environment variable is not available within the JVM launched by the spark-submit script, which only uses the Spark environment configured in $SPARK_HOME/conf/spark-env.sh.

As we don't want to change the Docker image all the time, we use a wrapper script that forwards the environment set by Docker into the $SPARK_HOME/conf/spark-env.sh script and then launches the respective Spark script:

docker-run-spark-env.sh

#!/bin/sh

# Forward all SPARK_* variables from the container environment into the
# Spark configuration, then hand control to the requested Spark script.
env | grep SPARK | awk '{print "export \"" $0 "\""}' > /opt/spark/conf/spark-env.sh

exec "$@"
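
Given, for example, SPARK_LOCAL_IP=10.0.0.5 in the container environment (a made-up address for illustration), the generated /opt/spark/conf/spark-env.sh would simply contain:

export "SPARK_LOCAL_IP=10.0.0.5"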



With that script included in the Docker image, we can now launch arbitrary jobs through Docker:

docker run \
 -P \
 --net=host \
 --env SPARK_LOCAL_IP=$DOCKER_HOSTNAME \
 ... # more parameters \
 $DOCKER_IMAGE \
 docker-run-spark-env.sh spark-submit --class com.smaato.MySparkApp ... # application jar and arguments

JVM Options for All the Spark Parts

Having the above setup in place is a big step forward, as we can now ship our jars to the Spark cluster using only Docker containers.

Yet the job will still fail with connectivity problems involving the spawned executors.

Having a look at org.apache.spark.launcher.SparkClassCommandBuilder reveals why:

if (className.equals("org.apache.spark.deploy.master.Master")) {
  javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
  javaOptsKeys.add("SPARK_MASTER_OPTS");
  memKey = "SPARK_DAEMON_MEMORY";
} else if (className.equals("org.apache.spark.deploy.worker.Worker")) {
  javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
  javaOptsKeys.add("SPARK_WORKER_OPTS");
  memKey = "SPARK_DAEMON_MEMORY";
} else if (className.equals("org.apache.spark.deploy.history.HistoryServer")) {
  javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
  javaOptsKeys.add("SPARK_HISTORY_OPTS");
  memKey = "SPARK_DAEMON_MEMORY";
} else if (className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) {
  javaOptsKeys.add("SPARK_JAVA_OPTS");
  javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
  memKey = "SPARK_EXECUTOR_MEMORY";
} else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
  javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
  memKey = "SPARK_EXECUTOR_MEMORY";
} else if (className.equals("org.apache.spark.deploy.ExternalShuffleService") ||
    className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) {
  javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
  javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
  memKey = "SPARK_DAEMON_MEMORY";
} else if (className.startsWith("org.apache.spark.tools.")) {
  ...


As initially described, the mentioned Docker image sets all the ports used by Spark through the SPARK_MASTER_OPTS and SPARK_WORKER_OPTS environment variables.

But those are not used by the launched executors, which only look at the SPARK_JAVA_OPTS and SPARK_EXECUTOR_OPTS variables.

Therefore, the workers launch, register with the master, and show up in the web UI just fine. But once you launch jobs, the started executors don't find their way back and just exit with connectivity problems.

Summing It All Up

Putting it all together, the way to successfully run Spark on Docker on EC2 is to use a command like the one below for all the parts (master, worker, spark-submit, spark-shell, spark-sql, etc.):

export SPECIAL_SPARK_OPTS="-Dspark.driver.port=7001 -Dspark.fileserver.port=7002 -Dspark.broadcast.port=7003 -Dspark.replClassServer.port=7004 -Dspark.blockManager.port=7005 -Dspark.executor.port=7006 -Dspark.ui.port=4040 -Dspark.broadcast.factory=org.apache.spark.broadcast.HttpBroadcastFactory"

docker run \
 --name $SPARK_CONTAINER_NAME \
 -d \
 -P \
 --net=host \
 --env SPARK_MASTER_PORT=7077 \
 --env SPARK_MASTER_WEBUI_PORT=8080 \
 --env SPARK_WORKER_PORT=8888 \
 --env SPARK_WORKER_WEBUI_PORT=8081 \
 --env SPARK_MASTER_OPTS="$SPECIAL_SPARK_OPTS" \
 --env SPARK_WORKER_OPTS="$SPECIAL_SPARK_OPTS" \
 --env SPARK_JAVA_OPTS="$SPECIAL_SPARK_OPTS" \
 --env SPARK_MASTER_IP=$DOCKER_HOSTNAME \
 --env SPARK_LOCAL_IP=$DOCKER_HOSTNAME \
 --env SPARK_LOCAL_HOSTNAME=$DOCKER_HOSTNAME \
 $DOCKER_IMAGE docker-run-spark-env.sh $SPARK_COMMAND
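
The role of each container is then selected purely through $SPARK_COMMAND, for example (the master URL is a placeholder for your own setup):

# Master daemon
SPARK_COMMAND="spark-class org.apache.spark.deploy.master.Master --host $DOCKER_HOSTNAME"

# Worker daemon, registering with the master
SPARK_COMMAND="spark-class org.apache.spark.deploy.worker.Worker --host $DOCKER_HOSTNAME spark://spark-master.example.com:7077"

# Interactive shell against the running cluster
SPARK_COMMAND="spark-shell --master spark://spark-master.example.com:7077"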

Conclusion

On one hand, the described method works great and provides a lot of flexibility: just create a Docker image based on an arbitrary Spark build, add the docker-run-spark-env.sh script, launch a bunch of EC2 instances, add DNS entries for them, and run all the Spark parts using the described command.

On the other hand, the levels of indirection introduced by Docker and EC2 add complexity and keep the setup a bit cumbersome. Furthermore, it took quite a while to dig through all the logs and Spark code to figure out which parts couldn't connect to each other and why.

To gain even more flexibility and increase resource utilization, we will further develop our Docker Spark setup. We are evaluating Docker Swarm, Kubernetes, Mesos with Marathon, and Amazon ECS to see which tools fit best with our environment, processes, and workloads. All the lessons learned from digging through the Spark code are very valuable in setting up those environments as well.

We will report on our experiences with these technologies in future blog posts and hope you have gained more insight into and understanding of the Spark components from reading this post. As with many technologies in the big data space, reading source code is still essential to utilize these tools to their full potential.

Please leave a comment if you have any questions or insights to share.

Written by Dr. Stefan Shadwinkel

Stefan works for Smaato's development team as a Senior Big Data Developer. He has extensive experience in big data analytics as well as in the field of fraud prevention.
