Today, real-time computation platform is becoming increasingly important in many organizations. In this article, we will describe how ctrip.com applies Alluxio to accelerate the Spark SQL real-time jobs and maintain the jobs’ consistency during the downtime of our internal data lake (HDFS). In addition, we leverage Alluxio as a caching layer to dramatically reduce the workload pressure on our HDFS NameNode.
Background and Architecture
Ctrip.com (NASDAQ:CTRP) is the largest online travel booking website in China. It provides online travel services including hotel reservations, transportation ticketing, packaged tours, with hundreds of millions of online visits every day. Driven by the high demand, a massive amount of data is stored in big data platforms in different formats. Handling nearly 300,000 offline and real-time analytics jobs every day, our main Hadoop cluster is at the scale of a thousand servers, with more than 50PB of data stored and increasing by 400TB daily.
To efficiently enable data access across different analytics jobs and also data injection in real-time, we deployed HDFS as a single data lake as illustrated in the diagram above. There are different data sources including user logs and orders that insert data into HDFS either written directly or processed first by Spark Streaming before written to HDFS.
Some of our streaming jobs also write their results to HDFS for future computation (e.g., to be joint with historical data). Based on the business requirement, different analytics engines may access HDFS: Spark SQL and Kylin are typically for the OLAP queries, Hive and Spark SQL for both ETL jobs, and Presto is for Ad Hoc query business.
Challenges to Support Streaming Jobs in Main Cluster
This architecture met most of our business requirements but as the cluster size and types of business operations increased, we experienced several challenges. Particularly, many important and time-sensitive jobs (e.g., real-time recommendation systems) have a heavy dependency on a stable HDFS service. We want to address several issues related to HDFS service on streaming jobs:
- Too many small files on HDFS Namenode: Spark Streaming jobs are generating a large number of small files without merging small files. With a typical Streaming’s batch time of 10 seconds, the number of files inserted to HDFS by one streaming job using Append can be as many as 8,640 per day. Without merging the files using Repartition, the number of files will reach Partition*8640. Note that, we have nearly 400 Streaming jobs which create 5 million files on a daily basis. However, HDFS NameNode storing and serving all of our filesystem metadata (from files to blocks information) is not designed for a large number of small files. Even we run compaction jobs merging small files jobs every day, the number of files in our cluster still exceeding 640 million and put a lot of pressure on Namenode.
- Job Failures due to HDFS maintenance: To increase the service capacity of Namenode, we have applied a lot of source-code level optimizations in the Namenode. However, we still need to schedule regular downtime maintenance on the Namenode. Because Spark Streaming jobs are writing to HDFS, and a large number of Streaming jobs would fail when HDFS is down for maintenance.
- Resource contention between Streaming and ETL Jobs: Spark Streaming jobs consistently use thousands of VCores, leading to an impact on peak-hour ETL jobs. Meanwhile, if there is a Streaming error during peak times, job retry may cause a situation where resources are not allocated for a long time.
Dedicated Satellite Cluster for Streaming Jobs
To address these issues, we set up an independent Hadoop cluster for Spark Streaming, including independent HDFS, YARN, and other components as illustrated below.
Although the above issues can be addressed to some extent, this architecture brings some new problems: data access from the jobs in the primary cluster to the real-time cluster is slow and inconvenient. Users send the data by running DistCp to the primary cluster before the data analysis. Rather than keep using DistCp to transfer data across clusters, we started to investigate Alluxio to remove this ETL process.
Alluxio, as the open-source data orchestration system, provides efficient data read and write performance and unified memory-speed data access APIs across different storage systems at the same time. Alluxio can support mounting almost all of today’s mainstream distributed storage systems including HDFS, S3, etc into Alluxio’s namespace with simple command or configurations.
To solve the problem of data cross-cluster sharing, we introduced and deployed Alluxio. Particularly, we mounted two HDFS clusters, enabling to read from two clusters from a single entrance through Alluxio; whereas the data access, transfer and caching was fully implemented by Alluxio and transparent to the users. The new architecture is shown in the following diagram.
As shown from the above figure, Spark Streaming jobs are writing directly to Alluxio which is backed by two mounts points connecting to both main cluster (HDFS1) and satellite cluster (HDFS2) under two paths respectively. This can be simply achieved by the following commands:
$ alluxio fs mount /path/on/alluxio hdfs://namenode:port/path/on/hdfs
The HDFS-2 satellite cluster is now dedicated to storing data for streaming jobs. After data arrives from Kafka, it will be consumed and processed by Spark Streaming, and then written directly to an Alluxio path backed by HDFS-2 cluster. Depending on the importance of data, one can choose to write intermediate results to Alluxio only using Alluxio write type MUST-CACHE or write important data synchronously to HDFS using CACHE_THROUGH.
After using Alluxio, we found the RPC pressure on HDFS NameNode is also reduced. For some hotspot data and multiple-use data, we load that part of the data into Alluxio through a scheduled job, speeding up the computing engine to load the data on the one hand, and reducing the number of data access requests to NameNode on the other.
In addition, Alluxio provides commands to set TTL (Time To Live) on a path. After TTL expires since the creation, Alluxio will trigger a possible TTL option for this path to either “Free” or “Delete” the data. “Delete” mode removes the underlying files of the files together; whereas “Free” mode deletes only Alluxio and not the underlying file system. To reduce the memory pressure on Alluxio master, we require that all users must set TTL to the data in production so that Alluxio can automatically delete the expired data.
After moving streaming jobs in the satellite cluster and providing them data access through Alluxio in our production, we successfully isolated the resource contention and the impact of downtime due to primary cluster maintenance from the streaming jobs without running explicit ETL jobs. In addition, for Spark SQL jobs that load data from Alluxio memory, we see a 30% improvement in the execution time compared to the original online jobs that read data directly from HDFS.