In this blog, I will describe how to enable the S3 magic committer on a Spark cluster running on AWS Elastic Kubernetes Service (EKS).
Prerequisites:
EKS Cluster on AWS
Spark cluster set up on EKS
First, I will describe the need for the S3 magic committer, using my own use case as an example: reading data from two Postgres tables (meta and data), joining them, and writing the result to S3. Here is the code:
Meta table:
val metaTable = s"(select * from analysis_meta where id >= 1220572171319017472 and id < 1620572171319017473) as meta"
val metaDf =
  spark
    .read
    .format("jdbc")
    .option("numPartitions", "2000")
    .option("lowerBound", "1220572171319017472")
    .option("upperBound", "1620572171319017473")
    .option("partitionColumn", "id")
    .option("dbtable", metaTable)
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://AWS-RDS-URL.amazonaws.com/analysis_db?user=****&password=****")
    .load()
Spark requires these parameters to partition the data it reads from the table:
numPartitions
lowerBound
upperBound
partitionColumn
These parameters are mandatory; otherwise, Spark will read all the data through a single connection into a single partition. The sketch below shows roughly how Spark turns them into per-partition queries.
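To make that concrete, here is a simplified sketch (my own illustration, not Spark's exact internal code) of how lowerBound, upperBound, and numPartitions are turned into one WHERE clause per partition on the partitionColumn; each partition then runs its own query against Postgres:

// Simplified illustration of how the id range is split across partitions.
val lower = 1220572171319017472L
val upper = 1620572171319017473L
val numPartitions = 2000
val stride = (upper - lower) / numPartitions

val predicates = (0 until numPartitions).map { i =>
  val start = lower + i * stride
  if (i == numPartitions - 1) s"id >= $start" // last partition takes the rest of the range
  else s"id >= $start AND id < ${start + stride}"
}
// e.g. the first partition reads roughly "id >= 1220572171319017472 AND id < 1420572171319017472"

Each predicate becomes one task, so the read is spread across 2000 smaller queries instead of one giant scan.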
Data table:
val textTable = s"(select id , text from analysis_data where id >= 1220572171319017472 and id < 1620572171319017473) as data"
val textDf =
  spark
    .read
    .format("jdbc")
    .option("numPartitions", "2000")
    .option("lowerBound", "1220572171319017472")
    .option("upperBound", "1620572171319017473")
    .option("partitionColumn", "id")
    .option("dbtable", textTable)
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://AWS-RDS-URL.amazonaws.com/analysis_db?user=****&password=****")
    .load()
Join the meta and data tables, then write to S3:
val df = metaDf.join(textDf, "id")
df
  .write
  .partitionBy("lang", "month") // partition by language and month
  .mode(SaveMode.Append)
  .parquet(s"s3a://bucket-name/analysis-data")
Let's run this job. Both tables have 80M rows (~400 GB), and it takes about 1 hour to complete.
Spark UI screenshots: the DAG, stage, and job views.
My Spark cluster has 8 m5.2xlarge nodes.
If you look at the job details screenshot, the job took 1 hour to complete, but the data is not committed to the output path yet; it is still sitting in the _temporary folder.
Committing all that data took Spark more than 12 hours.
That was surprising, but it is not a Spark problem. The default FileOutputCommitter commits work by renaming files out of _temporary, and on S3 a rename is really a copy plus a delete of every object, which is why the commit phase takes so long. Spark even warns about this in the log:
23/06/30 09:26:40 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
“Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.”
Let’s switch to the magic committer. It requires just a configuration change:
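Here is a minimal sketch of that configuration, based on the standard Hadoop S3A committer and spark-hadoop-cloud documentation. I am setting it on the SparkSession here, but the same properties can go into spark-defaults.conf or --conf flags on spark-submit. It assumes the spark-hadoop-cloud and hadoop-aws modules matching your Spark/Hadoop versions are on the classpath, and the app name is just a placeholder:

import org.apache.spark.sql.SparkSession

// Sketch: enable the S3A magic committer for Parquet output on s3a:// paths.
val spark = SparkSession.builder()
  .appName("analysis-join") // placeholder name
  // Choose the "magic" S3A committer instead of the default file committer.
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
  // Bind Spark's commit protocol to the cloud committers from spark-hadoop-cloud.
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()

With the magic committer, each task writes its data as an uncompleted S3 multipart upload, and the job commit simply completes those uploads, so there is no rename of a _temporary directory at the end.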
If you noticed, I used taints for resource isolation so that no other pods can run on the Druid cluster node group.
Click on Next and add the node details (I am using 5 m5.large nodes with 256 GB of disk):
Click on Next and select the subnets:
Click on Next and review the details:
Click on Create.
It will take a few seconds to create a node group:
Refresh the page:
The node group is ready.
Let’s validate the nodes:
$ kubectl get nodes -l TYPE=DRUID
NAME STATUS ROLES AGE VERSION
ip-172-31-16-51.ec2.internal Ready <none> 22m v1.26.4-eks-0a21954
ip-172-31-3-242.ec2.internal Ready <none> 22m v1.26.4-eks-0a21954
ip-172-31-35-102.ec2.internal Ready <none> 22m v1.26.4-eks-0a21954
ip-172-31-67-191.ec2.internal Ready <none> 22m v1.26.4-eks-0a21954
ip-172-31-90-74.ec2.internal Ready <none> 22m v1.26.4-eks-0a21954
Create a namespace for the druid cluster:
$ kubectl create namespace druid
namespace/druid created
Druid requires a ZooKeeper cluster, so first we need to install one. I am using the Bitnami Helm chart to create the ZooKeeper cluster:
$ helm install zookeeper -n druid -f zookeeper/values.yaml oci://registry-1.docker.io/bitnamicharts/zookeeper
NAME: zookeeper
LAST DEPLOYED: Sun Jun 18 13:39:20 2023
NAMESPACE: druid
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: zookeeper
CHART VERSION: 11.4.2
APP VERSION: 3.8.1
** Please be patient while the chart is being deployed **
ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:
zookeeper.druid.svc.cluster.local
To connect to your ZooKeeper server run the following commands:
export POD_NAME=$(kubectl get pods --namespace druid -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- zkCli.sh
To connect to your ZooKeeper server from outside the cluster execute the following commands:
kubectl port-forward --namespace druid svc/zookeeper 2181:2181 &
zkCli.sh 127.0.0.1:2181
Let's quickly check the ZooKeeper pods:
$ kubectl get pods -n druid
NAME READY STATUS RESTARTS AGE
zookeeper-0 1/1 Running 0 4m32s
zookeeper-1 1/1 Running 0 4m32s
zookeeper-2 1/1 Running 0 4m32s
The ZooKeeper cluster is up and running.
Let's move on to Druid. I am using the Druid operator to install the Druid cluster.
First, install the Druid operator using Helm (clone the code from here to get the chart):
$ kubectl create namespace druid-operator
namespace/druid-operator created
$ helm -n druid-operator install cluster-druid-operator -f druid-operator/chart/values.yaml ./druid-operator/chart
NAME: cluster-druid-operator
LAST DEPLOYED: Sun Jun 18 13:52:03 2023
NAMESPACE: druid-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Refer to https://github.com/druid-io/druid-operator/blob/master/docs/README.md to get started.
Let's verify the operator pod:
$ kubectl get pods -n druid-operator
NAME READY STATUS RESTARTS AGE
cluster-druid-operator-d68649447-qjmlv 1/1 Running 0 90s
Druid services:
Druid has several types of services:
Coordinator service manages data availability on the cluster.
Overlord service controls the assignment of data ingestion workloads.
Broker handles queries from external clients.
Router services are optional; they route requests to Brokers, Coordinators, and Overlords.
Historical services store queryable data.
MiddleManager services ingest data.
For this deployment, I will run the Coordinator and Overlord as a single service. Here is the config for that: