This blog was originally published in Razorpay Engineering Blog: https://engineering.razorpay.com/how-trino-and-alluxio-power-analytics-at-razorpay-803d3386daaf
Razorpay is a large fintech company in India. Razorpay provides a payment solution that offers a fast, affordable, and secure way to accept and disburse payments online. On the engineering side, the availability and scalability of analytics infrastructure are crucial to providing seamless experiences to internal and external users. In this blog, Tanmay Krishna and Utkarsh Saxena share insights on how Razorpay’s data platform team built a query layer using Trino and Alluxio.
1. Overview
We make data-driven decisions at Razorpay to provide seamless experiences to our customers. It was crucial to build a scalable infrastructure to support our rapid growth.
A third-party vendor managed our data engineering and analytics infrastructure until mid-2021. During these times, we built data pipelines using Spark and Presto to meet our increasing business needs for data analytics. But as we continued to scale up, we encountered severe reliability issues with the platform, which led to frequent SLA breaches. We migrated our Apache Spark workloads to Databricks and have built a self-managed Analytics platform based on Trino and Alluxio on Kubernetes.
This article is about our journey from building the platform to tackling availability and scalability issues.
2. Trino@Razorpay
We chose Trino because it is fast, scalable, and integrates well within our ecosystem. We use Apache Spark and Hudi to replicate data from OLTP databases and ingest events emitted by microservices in our AWS S3 data lake while using Hive Metastore to store table definitions and metadata. Trino made it easy to query these raw tables and derive insights from them as our users were already comfortable with SQL.
During our evaluations of PrestoDB and Trino, we found Trino to have much better Container + Kubernetes support and an active community.
Overview of Data Lake Architecture
2.1 Use Cases
Based on query characteristics and required SLOs, we categorized Trino workload into three groups:
- BI Dashboards and Reports: These queries are triggered from the user-facing dashboards, scheduled reports, and alerts on Looker.
- ELT Schedules: As Trino offers a SQL interface, it is also used for creating and updating fact tables containing aggregated and/or denormalized data. These queries are scheduled to run via Airflow.
- Ad Hoc Analysis: Ad-hoc workload consists of queries used for exploring datasets, serving urgent data requests, testing ELT queries, etc.
We set up separate clusters for each use case as this helped us provide better SLAs for BI and scheduled queries while improving our resource utilization because of predictable workloads.
2.2 Setup
Trino supports Disaggregated Storage architecture, which allows both Coordinator and Worker components to be stateless and well suited for container-based deployments. Hence, it was a natural choice for us to deploy Trino on Kubernetes, allowing us to use our existing CI/CD and infrastructure services without any overhead.
We adopted the community helm chart, which defines both Coordinator and Worker components as Deployments. We made changes specific to our setup, including exporting metrics to Prometheus and additional config maps for resource groups and catalogs. It also makes it easier to manage/deploy multiple clusters. We also configured node affinity/anti-affinity with cluster autoscaler-enabled, so every worker is scheduled on a separate machine, eliminating resource contention issues among Trino workers.
2.3 Monitoring
All metrics emitted from both coordinator and worker are scraped and pushed to Prometheus and visualized via Grafana dashboards. These help us to monitor cluster resources such as CPU, Heap usages proactively, and also KPIs like Running/Queued Queries, P95 Query Latency, Query Failures, etc.
Apart from these server metrics we also capture query level info such as Query ID, failure reason, and username along with relevant query statistics such as time spent in each state, input & output number of rows/bytes, memory & CPU used by a query to help us figure out costly queries which users can optimize. We use a fork of Varada’s Query Analyzer to collect these stats from all trino clusters along with relevant metadata such as Airflow DAG ID or Looker History ID.
3. Alluxio as a Cache Layer
Our first major challenge was with the Storage layer. Fetching file metadata from S3 is slow when a table has many files, causing additional overhead on Trino Coordinator. Most of our workload is read-oriented, which results in the same set of files being accessed multiple times across different clusters. This repeated file access was contributing significantly to S3 access cost.
To alleviate these, we introduced Alluxio as a caching layer between S3 and Trino satisfying the following major requirements:
- Shared Cache across multiple Trino clusters
- Cache both file data as well as file metadata
- Support for block-level access APIs
- It plays well with the containerized environment and supports deployment on Kubernetes
3.1 Setup
The recommended way of deploying Alluxio with Trino is to co-locate their Worker processes on the same node to achieve optimal read performance. Instead, we decided to set up a remote Alluxio cluster as we wanted to share the cache across different Trino clusters.
- By not co-locating Alluxio and Trino processes, it became easier to configure different scaling policies for different Trino clusters without worrying about the cache hit rate.
- Another critical point is that we typically use AWS spot instances for Trino clusters, so node losses are expected. However, this would be detrimental to cache hit rates if we co-locate both the processes on the same node.
In our setup, we deploy Alluxio processes as a StatefulSet in Kubernetes. For better availability, we use a multi-master deployment with an embedded journal. For simple integration with Trino, we use Alluxio’s Catalog service as it makes it easier to access Tables in our Data lake and configure caching strategies at the metadata level instead of the storage.
3.2 Metadata Sync
Our ingestion pipelines write data directly to S3, outside of Alluxio. To maintain data freshness in Alluxio, we synchronize metadata at both schema and file levels periodically. Using the sync command, we have set up a Kubernetes CronJob to synchronize metadata between Hive Metastore and the Alluxio Catalog service. File-level metadata is synchronized based on access patterns and cache expiry policies. This varies across tables based on their ingestion lag.
With Alluxio as a cache layer, we achieved significant improvements in our query runtimes. The following graph shows the improvement in our p75, p90, and p95 query runtime performance. Beyond the time of this writing, we have made further improvements on alluxio based on Kubernetes service accounts that further give us improvements. We will document our findings in another blog.
4. Improving Operability
4.1 Availability
As we were ramping up the production workload on our platform, we found that we were prone to single-point failures and performance degradation at scale as we were running a single Trino cluster for each use case.
Disruption in Trino cluster availability causes ETL schedule failures, leading to stale data. It also disrupts monitoring and trend analysis dashboards on BI tools. Overall, it impacts the productivity of business users.
Another problem was increasing operational costs. A typical cost optimization technique for AWS cloud deployments is to use Spot instances and spread them across multiple Availability Zones (AZ) or regions to improve their availability. But this was an expensive strategy due to huge costs incurred due to inter Availability Zone network transfer. The primary reason was data transfer among Trino workers.
A solution was to isolate Trino clusters among Availability zones and have multiple of them across different AZs for the same workload.
- We deployed various Trino clusters in different AZs for critical use-cases and distributed traffic across them using a load balancer.
- We segregated Alluxio clusters for each AZ zone to avoid high network transfer costs and provide isolation and better availability than a centralized Alluxio cluster.
For distributing traffic among Trino clusters, we explored Trino/Presto Gateway by Lyft and Falarica. With Falarica’s Presto Gateway, the challenge was setting it up and scope for future extensions, as it is a cluster manager. Its setup is tightly coupled with PrestoDB using plugins, so setting it up required extending its implementation to use Trino SPIs.
Lyft’s Presto Gateway was limited regarding available routing strategies. Its design was around creating routing policies on resource groups.
Our setup required a different set of routing Policies based on Kubernetes ingress and custom parameters passed from clients. Additionally, we required time-based Trino cluster traffic routing and more thorough health checks. We decided to build our own Trino-Gateway.
4.2 Introducing Razorpay’s Trino Gateway
Trino-Gateway is a Golang-based microservice that accepts Trino queries and routes them to eligible backend clusters. It comprises 3 components:
- Gateway Server: Admin interface for CRUD of backend trino clusters, groups, and policies. Also exposes a GUI to check query history.
- Monitor: Periodically monitors cluster health and load and modifies routing policies if necessary.
- Router: Serves client traffic.
Currently, the gateway supports the following routing strategies out-of-the-box:
- Least Load: Select the least loaded cluster with the load being a function of cluster metrics.
- Round robin distribution: Round-robin strategy with the goal being uniform traffic distribution.
It also supports configuring “cron expressions” to provide based traffic enable/disable for Trino clusters. A SQL query can also be configured to run as health checks, which can monitor whether critical connectors of the cluster are in a healthy state or not.
We plan to develop more routing strategies and User Identity Management.
More information about implementation, design and features are available in the project ReadMe and its source code. The project is available publicly, and we welcome contributions.
4.3 Autoscaling
We could achieve consistent query performance with high reliability and deploy schedule-based scaling policies. However, we still had concerns.
- The cluster ran at full capacity even during less load, which resulted in wasted resources and high infrastructure costs.
- The platform team had to manually scale up resources during high traffic scenarios as the cluster could not adjust itself.
Implementing an auto-scaling mechanism for Trino Workers had a few challenges:
- Query failures during worker shutdown: When a worker is terminated/lost for any reason, all the queries with at least one task scheduled on that worker will fail with INTERNAL_ERROR.
- Workload characteristics: Each workload has its characteristics (resource usage, traffic, read vs write, SLAs), making it challenging to define a single autoscaling policy for all clusters.
- Computing ideal cluster size: Trino is designed to utilize all available resources. Even a few queries can consume all the cluster resources for a brief period. In theory, we could not define whether we need X workers for N queries. We can only try to ensure our SLAs are intact for any N.
To overcome these challenges, we designed a mechanism that can be used for varying workload characteristics(defined by metrics), iteratively adjusting the cluster size while maintaining SLAs, and minimizing query failures while downscaling.
We tried to leverage HPA/KEDA to implement the autoscaling mechanism. Both HPA/KEDA have out-of-the-box support for scaling Kubernetes resources based on resource utilization or custom metrics(Prometheus).
While HPA scales from 1 to N, KEDA supports scaling from 0 to N, by scaling 0 to 1 by itself. While the scale-up worked using either of them, it was inefficient. We had to disable the scale-down because of the reactive nature of HPA. We decided not to use them for the following reasons:
- Too low target values would result in faster scaling up, leading to increased cost/inefficiency. Too high target values would result in faster downscaling leading to a bad user experience due to query failures/degradation. Because of its reactive nature, this leads to frequent thrashing.
- HPA lacks in-built support for configuring cool-down periods to scale events. Cool-down periods are hardcoded within HPA. We wanted to scale up/down only when the conditions were satisfied for a specific period and with a configurable time gap between successive events.
- KEDA only supports cooldown_period while scaling up/down from/to zero and creates an HPA resource to take care of scaling from 1 to N replicas.
We decided to build a custom mechanism by leveraging Alertmanager, which had built-in support for decision intervals, and cool-down periods and was already used internally at Razorpay.
The Autoscaler service consumes the alert webhook from Alertmanager and scales corresponding Trino worker deployment as specified in the alert labels.
Configuring an autoscaling rule involves defining the alert rule and scaling config for the trino cluster
namespaces:
trino-adhoc:
deployments:
trino-adhoc-worker:
minReplicas: 5
maxReplicas: 40
scaleUpBatchSize: 5
scaleDownBatchSize: 3
Autoscaler Config
- name: Trino Adhoc Scale Up
rules:
- alert: "[Trino][Autoscale] Scale up Trino Adhoc Cluster"
expr: avg(java_lang_OperatingSystem_ProcessCpuLoad{app="trino-adhoc", component="worker"})*100 > 70 or avg(sum(java_lang_Memory_HeapMemoryUsage_used{app="trino-adhoc", component="worker"}) by (kubernetes_pod_name)/sum(java_lang_Memory_HeapMemoryUsage_max{app="trino-adhoc", component="worker"}) by (kubernetes_pod_name)) > 0.75
for: 2m
labels:
action: scale-up
namespace: trino-adhoc
deployment: trino-adhoc-worker
service: trino-autoscaler-adhoc
annotations:
description: Scaling up workers in trino-adhoc
- name: Trino Adhoc Scale Down
rules:
- alert: "[Trino][Autoscale] Scale down Trino Adhoc Cluster"
expr: avg(java_lang_OperatingSystem_ProcessCpuLoad{app="trino-adhoc", component="worker"})*100 < 20
for: 10m
labels:
action: scale-down
namespace: trino-adhoc
deployment: trino-adhoc-worker
service: trino-autoscaler-adhoc
annotations:
description: Scaling down workers in trino-adhoc
Alert Rules
Scaling velocity is controlled by scaleUpBatchSize and scaleDownBatchSize in autoscaler_config along with for period in the alert-manager rules, which allows us to have more/less aggressive policies for different workloads/clusters. We can have more sophisticated rules in the future based on running/queued queries and tasks or other relevant metrics.
Today, we use a combination of the above-mentioned mechanism and scheduled scaling(to scale from 0 to 1 or vice-versa) to manage our clusters. This has worked well and helped us improve resource efficiency and user experience by eliminating manual intervention for scaling clusters.
5. Conclusion
A coherent view of our Platform around Trino
The platform serves 650 daily active users and ~100k queries/day at P90 & P95 query latencies of 60 secs and 130 secs respectively with a ~97% Success Rate.
We would like to thank the Trino and Alluxio communities for their active and transparent communications and for helping us with all our queries.
For more information about how to run Trino with Alluxio, please visit the documentation: https://docs.alluxio.io/os/user/stable/en/compute/Trino.html
About the Authors
Tanmay Krishna is a Software Engineer at Razorpay.
Utkarsh Saxena is a Senior Data Engineer at Razorpay.