Alluxio on EMR: Fast Storage Access and Sharing for Spark Jobs

This is a guest blog by Chengzhi Zhao; it was originally published on his own blog.

Traditionally, if you want to run a single Spark job on EMR, you might follow these steps: launch a cluster, run the job, which reads data from a storage layer like S3, perform transformations within an RDD/DataFrame/Dataset, and finally send the result back to S3. You end up with something like this:

Spark Job on EMR

If you add more Spark jobs across multiple clusters, you could end up with something like this:

Multiple Spark jobs

There are more use cases. For example, sometimes we need to store the intermediate result of Spark job A and use it as the input for Spark job B; sometimes multiple Spark jobs read the same dataset multiple times. As it stands, each Spark job has to read its input data from disk and then process it. What if we could read input from memory? Alluxio is a solution for that.

Alluxio is a storage layer that usually co-locates with computation frameworks, so it can provide fast storage and facilitate data sharing and locality between jobs, regardless of whether they run on the same computation engine.

I will show you how to set up Alluxio 1.8.1 on EMR 5.17 with a bootstrap script and compare the data loading performance.

More details are described in the following repository: https://github.com/ChengzhiZhao/Alluxio-EMR-bootstrap

STEP 1: Set up a bootstrap script

Use the bootstrap.sh script, which downloads Alluxio 1.8.1 and sets up the required permissions on EMR.
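For orientation, here is a minimal sketch of what such a bootstrap script might do; the actual bootstrap.sh in the repository above is the authoritative version, and the download URL here is an assumption:

#!/bin/bash
# Hypothetical sketch of bootstrap.sh -- see the linked repository for
# the real script. The download URL is an assumption.
set -e

# download Alluxio 1.8.1 built against Hadoop 2.8
wget -q http://downloads.alluxio.org/downloads/files/1.8.1/alluxio-1.8.1-hadoop-2.8-bin.tar.gz -P /tmp

# extract under /opt, matching the paths used in the configuration below
sudo tar -xzf /tmp/alluxio-1.8.1-hadoop-2.8-bin.tar.gz -C /opt

# give the hadoop user ownership so EMR processes can read the client jar
sudo chown -R hadoop:hadoop /opt/alluxio-1.8.1-hadoop-2.8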

STEP 2: Add the required configuration

[  {    "Classification": "core-site",    "Properties": {      "fs.alluxio.impl": "alluxio.hadoop.FileSystem",      "fs.AbstractFileSystem.alluxio.impl": "alluxio.hadoop.AlluxioFileSystem"    }  },  {    "Classification": "spark-defaults",    "Properties": {          "spark.driver.extraClassPath": ":/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/opt/alluxio-1.8.1-hadoop-2.8/client/alluxio-1.8.1-client.jar",          "spark.executor.extraClassPath": ":/opt/alluxio-1.8.1-hadoop-2.8/client/alluxio-1.8.1-client.jar"     }  }]

STEP 3: Launch the cluster, take a break, and relax ☕️
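If you launch from the AWS CLI, the bootstrap script and the configuration from STEP 2 can be passed together. A hedged sketch, where the bucket, key pair, and instance sizing are placeholders:

# Hypothetical launch command -- bucket, key pair, and sizing are placeholders.
aws emr create-cluster \
  --name "spark-alluxio" \
  --release-label emr-5.17.0 \
  --applications Name=Spark \
  --instance-type m4.xlarge \
  --instance-count 3 \
  --use-default-roles \
  --ec2-attributes KeyName=my-key-pair \
  --bootstrap-actions Path=s3://my-bucket/bootstrap.sh \
  --configurations file://./configurations.json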

STEP 4: SSH to the EMR cluster and copy test data to make sure Alluxio is running

cd /opt/alluxio-1.8.1-hadoop-2.8
# copy a local file to Alluxio
bin/alluxio fs copyFromLocal LICENSE /Input
# Copied file:///opt/alluxio-1.8.1-hadoop-2.8/LICENSE to /Input
# verify the file by listing the Alluxio root
bin/alluxio fs ls /
# 26847   NOT_PERSISTED 02-18-2019 19:22:28:025 100% /Input
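Note the NOT_PERSISTED status in the listing: the file currently lives only in Alluxio's own storage, not in any under store. If an under store (e.g. S3 or HDFS) is configured, you can push the file through explicitly:

# write the in-Alluxio file through to the configured under store
bin/alluxio fs persist /Input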

You can also check the Alluxio web UI by going to {Master public DNS}:19999

Click “Browse” and you should see the Input file there, with the default block size of 128 MB.
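If you prefer the command line over the UI, the same metadata can be inspected with:

# show file metadata, including block size and persistence state
bin/alluxio fs stat /Input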

STEP 5: Use Spark shell to read the “Input” file from Alluxio

# launch spark-shell
spark-shell

// read data from Alluxio, using the master's DNS -- in this case, ip-10-192-4-226.ec2.internal
val s = sc.textFile("alluxio://ip-10-192-4-226.ec2.internal:19998/Input")
// s: org.apache.spark.rdd.RDD[String] = alluxio://ip-10-192-4-226.ec2.internal:19998/Input MapPartitionsRDD[1] at textFile at <console>:24
s.count
// res0: Long = 476

STEP 6: Read and compare with a real example

Note: you can use any test dataset with Spark; I just picked a test Avro file online.

// read test_data_set from S3
val df = spark.read.format("com.databricks.spark.avro").load("s3://test_bucket/test_data_set/date=2019-04-18")
spark.time(df.count)
// Time taken: 29609 ms
// res0: Long = 86731200

// write df to Alluxio as Parquet
spark.time(df.write.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set"))
// Time taken: 45775 ms

// first read from Alluxio
spark.time(spark.read.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set").count)
// Time taken: 13477 ms
// res3: Long = 86731200

// second read from Alluxio
spark.time(spark.read.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set").count)
// Time taken: 372 ms
// res4: Long = 86731200

// restart with another spark-shell, then read again
spark.time(spark.read.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set").count)
// Time taken: 9606 ms
// res0: Long = 86731200

As you can see, the second execution is much faster, similar to what we would get by caching the data in Spark. But even if we close the shell and open another spark-shell, reads remain fast, since the data is read from Alluxio and kept in its memory. If you are interested, please refer to the great talk "Best Practices for Using Alluxio with Apache Spark", which covers benchmarks of Alluxio compared with Spark.
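To make the comparison concrete, here is a hedged sketch of the equivalent Spark-cache approach, reusing the same placeholder S3 path from above; unlike Alluxio, this cache disappears when the shell exits:

// For comparison: Spark's own caching gives a similar speedup on the
// second read, but the cache is lost when spark-shell (the application) exits.
val cached = spark.read.format("com.databricks.spark.avro").load("s3://test_bucket/test_data_set/date=2019-04-18").cache()
spark.time(cached.count)  // first count scans S3 and materializes the cache
spark.time(cached.count)  // second count is served from Spark's block manager
// after restarting spark-shell, counts go back to S3 speed, whereas the
// copy written to Alluxio stays in Alluxio's memory across applications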

Performance comparison (count job reading from disk vs. Alluxio)

Pros: 1) faster reads of the data as a DataFrame than from S3.
Cons: 1) Alluxio persists data in memory first to achieve that speed, so Spark jobs could have less memory to run with; 2) there is overhead in writing the DataFrame to Alluxio first (see the configuration sketch below).
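On the memory point, the trade-off is tunable: the Alluxio worker's memory can be capped, and writes can be pushed through to the under store instead of staying memory-only. A minimal alluxio-site.properties sketch, with illustrative values only:

# conf/alluxio-site.properties -- illustrative values, tune for your instances
alluxio.worker.memory.size=8GB
# CACHE_THROUGH writes synchronously to Alluxio memory and the under store
alluxio.user.file.writetype.default=CACHE_THROUGH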

What hasn't been covered is that Alluxio is also a great storage sharing layer for multiple jobs reading and writing data. You could have a Flink job write to Alluxio and the output later be used by Spark; there are more interesting topics around that. This post focused on how to set up Alluxio 1.8.1 on EMR and run simple test data on it.