Spark on EKS: Enable the S3 Magic Committer

In this blog, I will describe how to enable the S3 magic committer on a Spark cluster running on AWS Elastic Kubernetes Service (EKS).

Prerequisites:

  • EKS Cluster on AWS
  • Spark cluster setup on EKS

First, I will describe why the S3 magic committer is needed, using my own use case as an example: reading data from two Postgres tables (meta and data), joining them, and writing the result to S3. Here is the code:

Meta table:

val metaTable = s"(select * from analysis_meta where id >= 1220572171319017472 and id < 1620572171319017473) as meta"

val metaDf =
        spark
          .read
          .format("jdbc")
          .option("numPartitions" , "2000")
          .option("lowerBound",  "1220572171319017472")
          .option("upperBound", "1620572171319017473")
          .option("partitionColumn", "id")
          .option("dbtable", metaTable)
          .option("driver" , "org.postgresql.Driver")
          .option("url", "jdbc:postgresql://AWS-RDS-URL.amazonaws.com/analysis_db?user=****&password=****")
          .load()

Spark needs these options to partition the table read:

  • numPartitions
  • lowerBound
  • upperBound
  • partitionColumn

If they are not set, Spark reads the whole table into a single partition. Note that lowerBound and upperBound only control how the partition strides are computed; they do not filter rows, which is why the subquery itself restricts the id range.
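To make these options concrete: Spark splits the [lowerBound, upperBound) range of the partitionColumn into numPartitions strides and runs one JDBC query per stride. A minimal sketch with the values used above (the predicates are approximate; Spark's generated SQL differs slightly):

// Sketch: how the JDBC options above translate into partitioned reads.
val lower         = 1220572171319017472L
val upper         = 1620572171319017473L
val numPartitions = 2000L
val stride        = (upper - lower) / numPartitions   // ~2.0e14 ids per partition

// Spark issues one query per partition, roughly:
//   partition 0    : WHERE id <  lower + stride
//   partition 1    : WHERE id >= lower + stride AND id < lower + 2 * stride
//   ...
//   partition 1999 : WHERE id >= lower + 1999 * stride
// lowerBound/upperBound only shape these strides; they never filter rows out.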

Data table:

val textTable = s"(select id , text from analysis_data where id >= 1220572171319017472 and id < 1620572171319017473) as data"

val textDf =
      spark
        .read
        .format("jdbc")
        .option("numPartitions", "2000")
        .option("lowerBound", "1220572171319017472")
        .option("upperBound", "1620572171319017473")
        .option("partitionColumn", "id")
        .option("dbtable", textTable)
        .option("driver", "org.postgresql.Driver")
        .option("url", "jdbc:postgresql://AWS-RDS-URL.amazonaws.com/analysis_db?user=****&password=****")
        .load()

Join the meta and data tables, then write to S3:

val df = metaDf.join(textDf, "id")

      df
        .write
        .partitionBy("lang", "month")  // partition by language and month
        .mode(SaveMode.Append)
        .parquet(s"s3a://bucket-name/analysis-data")

Let’s run this job. Both tables have 80M rows (~400 GB), and the job takes about 1 hour to complete.

Spark UI screenshots of the DAG, stages, and job details:

My Spark cluster size is 8 * m5.2xlarge nodes.

If you look at the job details screenshot, the job took 1 hour to complete, but the data was not yet committed to the destination folder; it was still sitting in the _temporary folder.

Committing all the data took Spark more than 12 hours.

That was surprising, but it is not a Spark problem. The default FileOutputCommitter commits work by renaming files and directories, and S3 has no real rename: each "rename" is a copy followed by a delete for every object, which is why the commit phase dragged on for hours. Spark even warns about this in the log:

23/06/30 09:26:40 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.

Let’s switch to the magic committer. It requires just a configuration change:

spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled=true
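If you prefer to set this in application code rather than at submit time, the same property can be applied when the SparkSession is built. A minimal sketch (the app name is illustrative; only the committer property matters here):

import org.apache.spark.sql.SparkSession

// Minimal sketch: the magic-committer switch applied on the session itself.
val spark = SparkSession.builder()
  .appName("Postgres2S3App")
  .config("spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled", "true")
  .getOrCreate()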

Besides the configuration, you also need the spark-hadoop-cloud dependency. I uploaded this jar to S3 and included it in the job submission.
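If you build the application jar with sbt, the same module can instead be declared as a dependency rather than shipping the jar separately; a sketch, assuming the same Spark 3.3.2 / Scala 2.12 versions as the jar used in the submit command below:

// build.sbt (sketch): spark-hadoop-cloud provides the S3A committer bindings
libraryDependencies += "org.apache.spark" %% "spark-hadoop-cloud" % "3.3.2"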

I am running the Spark cluster on EKS, so here is the full job submission:

kubectl exec -ti --namespace spark spark-cluster-worker-0 -- spark-submit --master spark://spark-cluster-master-svc:7077 \
    --conf 'spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem' \
    --conf 'spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled=true' \
    --conf 'spark.hadoop.fs.s3a.connection.maximum=1000' \
    --conf 'spark.driver.memory=8g' \
    --conf 'spark.executor.memory=20g' \
    --conf 'spark.executor.cores=6' \
    --conf 'spark.sql.shuffle.partitions=400' \
    --supervise \
    --deploy-mode cluster \
    --jars s3a://bucket-name/dependencies/spark-hadoop-cloud_2.12-3.3.2.jar \
    --class com.techmonad.main.Postgres2S3App \
    s3a://bucket-name/app/Postgres2S3App-30-06-2023_06_13.jar

The job itself took the same time, but this time the data was committed to the destination folder within 2 hours instead of 12+. Wow!!!!
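A quick way to confirm which committer actually ran: the S3A committers write a small JSON manifest into the _SUCCESS file, while the classic FileOutputCommitter leaves a zero-byte marker. A minimal sketch, run with the same spark session in scope and pointing at the output path used above:

import org.apache.hadoop.fs.Path
import scala.io.Source

// Sketch: inspect the _SUCCESS marker written by the job above.
val successPath = new Path("s3a://bucket-name/analysis-data/_SUCCESS")
val fs = successPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
if (fs.getFileStatus(successPath).getLen > 0) {
  val in = fs.open(successPath)
  try println(Source.fromInputStream(in).mkString)   // JSON manifest, includes the committer name
  finally in.close()
} else {
  println("Zero-byte _SUCCESS: the classic FileOutputCommitter wrote this output")
}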

Here is the code.


Druid cluster setup on AWS EKS

In this blog, I will describe how to set up a Druid cluster on AWS EKS.

Prerequisites:

  • EKS Cluster on AWS
  • S3 bucket for deep storage
  • Postgres RDS instance for meta storage

Let’s set up a separate node group on the EKS cluster.

Log in to the AWS console, go to Elastic Kubernetes Service, and select the EKS cluster.

Click on compute and then Add Node Group:

Fill out the node group details:

If you notice, I used taints for resource isolation, so no other pods can run on the Druid node group; the matching tolerations and nodeSelector appear in the ZooKeeper and Druid configs below.

Click on Next and add the node details (I am using 5 m5.large nodes and a 256 GB disk):

Click on Next and select the subnets:

Click on Next and review the details:

Click on Create.

It will take a few seconds to create a node group:

Refresh the page:

The node group is ready.

Let’s validate the nodes:

$ kubectl get nodes -l TYPE=DRUID
NAME                            STATUS   ROLES    AGE   VERSION
ip-172-31-16-51.ec2.internal    Ready    <none>   22m   v1.26.4-eks-0a21954
ip-172-31-3-242.ec2.internal    Ready    <none>   22m   v1.26.4-eks-0a21954
ip-172-31-35-102.ec2.internal   Ready    <none>   22m   v1.26.4-eks-0a21954
ip-172-31-67-191.ec2.internal   Ready    <none>   22m   v1.26.4-eks-0a21954
ip-172-31-90-74.ec2.internal    Ready    <none>   22m   v1.26.4-eks-0a21954

Create a namespace for the druid cluster:

$ kubectl create namespace druid
namespace/druid created

Druid requires a ZooKeeper cluster, so we need to install that first. I am using the Bitnami Helm chart to create the ZooKeeper cluster.

Let’s tweak the ZooKeeper configuration (zookeeper/values.yaml):

global:
  storageClass: "gp2"
fullnameOverride: "zookeeper"
replicaCount: 3
tolerations:
  - key: "DRUID"
    operator: "Exists"
    effect: "NoExecute"
nodeSelector:
  TYPE: "DRUID"
persistence:
  size: 8Gi
Install the chart:

$ helm install zookeeper -n druid -f zookeeper/values.yaml oci://registry-1.docker.io/bitnamicharts/zookeeper
NAME: zookeeper
LAST DEPLOYED: Sun Jun 18 13:39:20 2023
NAMESPACE: druid
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: zookeeper
CHART VERSION: 11.4.2
APP VERSION: 3.8.1

** Please be patient while the chart is being deployed **

ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:

    zookeeper.druid.svc.cluster.local

To connect to your ZooKeeper server run the following commands:

    export POD_NAME=$(kubectl get pods --namespace druid -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
    kubectl exec -it $POD_NAME -- zkCli.sh

To connect to your ZooKeeper server from outside the cluster execute the following commands:

    kubectl port-forward --namespace druid svc/zookeeper 2181:2181 &
    zkCli.sh 127.0.0.1:2181

Let’s quickly check the zookeeper cluster:

$ kubectl get pods -n druid
NAME          READY   STATUS    RESTARTS   AGE
zookeeper-0   1/1     Running   0          4m32s
zookeeper-1   1/1     Running   0          4m32s
zookeeper-2   1/1     Running   0          4m32s

The zookeeper cluster is up and running.

Let’s move on to Druid. I am using the Druid operator to install the Druid cluster.

First, install the Druid operator using Helm (clone the code from here to get the chart):

$ kubectl create namespace druid-operator
namespace/druid-operator created

$ helm -n druid-operator install cluster-druid-operator -f druid-operator/chart/values.yaml  ./druid-operator/chart
NAME: cluster-druid-operator
LAST DEPLOYED: Sun Jun 18 13:52:03 2023
NAMESPACE: druid-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Refer to https://github.com/druid-io/druid-operator/blob/master/docs/README.md to get started.

Let’s verify the operator pod:

$ kubectl get pods -n druid-operator
NAME                                     READY   STATUS    RESTARTS   AGE
cluster-druid-operator-d68649447-qjmlv   1/1     Running   0          90s

Druid services:

Druid has several types of services:

  • Coordinator service manages data availability on the cluster.
  • Overlord service controls the assignment of data ingestion workloads.
  • Broker handles queries from external clients.
  • Router services are optional; they route requests to Brokers, Coordinators, and Overlords.
  • Historical services store queryable data.
  • MiddleManager services ingest data.

For this deployment, I will run the Coordinator and Overlord as a single service. Here is the config that enables it:

druid.coordinator.asOverlord.enabled=true

Cluster size: 2 brokers, 2 coordinators/overlords, 3 historicals, 3 middlemanagers and 2 routers.

All of these services are configured in a single file, dev-cluster.yaml:

apiVersion: "druid.apache.org/v1alpha1"
kind: "Druid"
metadata:
  name: cluster
spec:
  image: apache/druid:25.0.0
  startScript: /druid.sh
  podLabels:
    environment: DEV
    NAME: DRUID
  podAnnotations:
    NAME: DRUID
  readinessProbe:
    httpGet:
      path: /status/health
      port: 8088
  securityContext:
    fsGroup: 1000
    runAsUser: 1000
    runAsGroup: 1000
  services:
    - spec:
        type: ClusterIP
        clusterIP: None
  tolerations:
    - key: "DRUID"
      operator: "Exists"
      effect: "NoExecute"
  nodeSelector:
    TYPE: "DRUID"
  commonConfigMountPath: "/opt/druid/conf/druid/cluster/_common"
  jvm.options: |-
    -server
    -Duser.timezone=UTC
    -Dfile.encoding=UTF-8
    -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
    -Djava.io.tmpdir=/druid/data
  log4j.config: |-
    <?xml version="1.0" encoding="UTF-8" ?>
    <Configuration status="INFO">
      <Properties>
        <Property name="druid.log.path" value="log" />
      </Properties>
      <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
          <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
        </Console>
        <!-- Rolling Files-->
        <RollingRandomAccessFile name="FileAppender" fileName="${env:DRUID_LOG_DIR}/${env:DRUID_NODE_TYPE}.log" filePattern="${env:DRUID_LOG_DIR}/${env:DRUID_NODE_TYPE}.%d{yyyyMMdd}.log">
          <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
          <Policies>
          <TimeBasedTriggeringPolicy interval="1" modulate="true"/>
          </Policies>
          <DefaultRolloverStrategy>
          <Delete basePath="${env:DRUID_LOG_DIR}/" maxDepth="1">
          <IfFileName glob="*.log" />
          <IfLastModified age="30d" />
          </Delete>
          </DefaultRolloverStrategy>
        </RollingRandomAccessFile>
      </Appenders>
      <Loggers>
        <Root level="info">
          <AppenderRef ref="FileAppender"/>
        </Root>
        <!-- Set level="debug" to see stack traces for query errors -->
        <Logger name="org.apache.druid.server.QueryResource" level="info" additivity="false">
          <Appender-ref ref="FileAppender"/>
        </Logger>
        <Logger name="org.apache.druid.server.QueryLifecycle" level="info" additivity="false">
          <Appender-ref ref="FileAppender"/>
        </Logger>
        <Logger name="org.apache.druid.server.coordinator" level="info" additivity="false">
         <Appender-ref ref="FileAppender"/>
        </Logger>
        <Logger name="org.apache.druid.segment" level="info" additivity="false">
          <Appender-ref ref="FileAppender"/>
        </Logger>
        <Logger name="org.apache.druid.initialization" level="info" additivity="false">
          <Appender-ref ref="FileAppender"/>
        </Logger>
        <Logger name="org.skife.config" level="info" additivity="false">
          <Appender-ref ref="FileAppender"/>
        </Logger>
        <Logger name="com.sun.jersey.guice" level="info" additivity="false">
          <Appender-ref ref="FileAppender"/>
        </Logger>
      </Loggers>
    </Configuration>
  common.runtime.properties: |
    # Zookeeper
    druid.zk.service.host=zookeeper.druid
    druid.zk.paths.base=/druid
    druid.zk.service.compress=false
    # Metadata Store
    druid.metadata.storage.type=postgresql
    druid.metadata.storage.connector.connectURI=jdbc:postgresql://xxxxxxxxus-east-1.rds.amazonaws.com:5432/druid_meta_db
    druid.metadata.storage.connector.host=xxxxxxxxus-east-1.rds.amazonaws.com
    druid.metadata.storage.connector.port=5432
    druid.metadata.storage.connector.createTables=true
    druid.metadata.storage.connector.user=postgres
    druid.metadata.storage.connector.password=******************************
    # Deep Storage
    druid.storage.type=s3
    druid.storage.bucket=s3_bucket_name
    druid.storage.baseKey=data
    druid.storage.archiveBaseKey=archive
    druid.storage.disableAcl=true
    druid.s3.accessKey=*********************
    druid.s3.secretKey=************************
    #
    # Extensions
    #
    druid.extensions.loadList=["druid-kafka-indexing-service", "druid-s3-extensions", "postgresql-metadata-storage"]
    #
    # Service discovery
    #
    druid.selectors.indexing.serviceName=druid/overlord
    druid.selectors.coordinator.serviceName=druid/coordinator
    druid.indexer.logs.type=file
    druid.indexer.logs.directory=/druid/data/indexing-logs
    druid.lookup.enableLookupSyncOnStartup=false
  volumeMounts:
    - mountPath: /druid/data
      name: data-volume
    - mountPath: /druid/deepstorage
      name: deepstorage-volume
  volumes:
    - name: data-volume
      emptyDir:
        sizeLimit: 126Gi
    - name: deepstorage-volume
      hostPath:
        path: /tmp/druid/deepstorage
        type: DirectoryOrCreate
  env:
    - name: POD_NAME
      valueFrom:
        fieldRef:
          fieldPath: metadata.name
    - name: POD_NAMESPACE
      valueFrom:
        fieldRef:
          fieldPath: metadata.namespace

  nodes:
    brokers:
      nodeType: "broker"
      druid.port: 8088
      replicas: 2
      tolerations:
        - key: "DRUID"
          operator: "Exists"
          effect: "NoExecute"
      nodeSelector:
        TYPE: "DRUID"
      env:
        - name: DRUID_LOG_DIR
          value: /druid/data/log
        - name:  DRUID_NODE_TYPE
          value: broker
        - name: DRUID_XMS
          value: 2g
        - name: DRUID_XMX
          value: 2g
        - name: DRUID_MAXDIRECTMEMORYSIZE
          value: 2g
      resources:
        limits:
          cpu: 500m
          memory: 2Gi
        requests:
          cpu: 500m
          memory: 2Gi
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/broker"
      runtime.properties: |
        druid.service=druid/broker
        druid.broker.http.numConnections=10
        druid.server.http.numThreads=20
        druid.processing.buffer.sizeBytes=56843545
        druid.processing.numMergeBuffers=2
        druid.processing.numThreads=10
        druid.sql.enable=true

    coordinators:
      nodeType: "coordinator"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord"
      replicas: 2
      tolerations:
        - key: "DRUID"
          operator: "Exists"
          effect: "NoExecute"
      nodeSelector:
        TYPE: "DRUID"
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 500m
          memory: 1Gi
      env:
        - name: DRUID_LOG_DIR
          value: /druid/data/log
        - name:  DRUID_NODE_TYPE
          value: coordinator
        - name: DRUID_XMS
          value: 1g
        - name: DRUID_XMX
          value: 1g
        - name: DRUID_MAXDIRECTMEMORYSIZE
          value: 1g

      runtime.properties: |
        druid.service=druid/coordinator
        druid.coordinator.asOverlord.enabled=true
        druid.coordinator.asOverlord.overlordService=druid/overlord
        druid.indexer.runner.type=httpRemote
        druid.indexer.queue.startDelay=PT5S
        druid.coordinator.balancer.strategy=cachingCost
        druid.serverview.type=http
        druid.indexer.storage.type=metadata
        druid.coordinator.startDelay=PT10S
        druid.coordinator.period=PT5S
        druid.server.http.numThreads=50

    historicals:
      nodeType: historical
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
      replicas: 3
      tolerations:
        - key: "DRUID"
          operator: "Exists"
          effect: "NoExecute"
      nodeSelector:
        TYPE: "DRUID"
      resources:
        limits:
          cpu: 1
          memory: 2Gi
        requests:
          cpu: 1
          memory: 2Gi
      env:
        - name: DRUID_LOG_DIR
          value: /druid/data/log
        - name: DRUID_NODE_TYPE
          value: historical
        - name: DRUID_XMS
          value: 2g
        - name: DRUID_XMX
          value: 2g
        - name: DRUID_MAXDIRECTMEMORYSIZE
          value: 2g
      runtime.properties: |
        druid.service=druid/historical
        druid.server.http.numThreads=10
        druid.processing.buffer.sizeBytes=50MiB
        druid.processing.numMergeBuffers=2
        druid.processing.numThreads=15
        druid.cache.sizeInBytes=25MiB
        # Segment storage
        druid.segmentCache.locations=[{\"path\":\"/druid/data/segments\",\"maxSize\":157374182400}]
        druid.server.maxSize=157374182400

    middlemanagers:
      druid.port: 8088
      nodeType: middleManager
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/middleManager"
      replicas: 3
      tolerations:
        - key: "DRUID"
          operator: "Exists"
          effect: "NoExecute"
      nodeSelector:
        TYPE: "DRUID"
      resources:
        limits:
          cpu: 500m
          memory: 2Gi
        requests:
          cpu: 500m
          memory: 2Gi
      env:
        - name: DRUID_LOG_DIR
          value: /druid/data/log
        - name: DRUID_NODE_TYPE
          value: middleManager
        - name: DRUID_XMX
          value: 2g
        - name: DRUID_XMS
          value: 2g
      runtime.properties: |
        druid.service=druid/middleManager
        druid.worker.capacity=2
        druid.indexer.runner.javaOpts=-server -Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
        druid.indexer.task.baseTaskDir=var/druid/task
        # HTTP server threads
        druid.server.http.numThreads=50
        # Processing threads and buffers on Peons
        druid.indexer.fork.property.druid.processing.numMergeBuffers=1
        druid.indexer.fork.property.druid.processing.buffer.sizeBytes=32000000
        druid.indexer.fork.property.druid.processing.numThreads=2

    routers:
      nodeType: "router"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/router"
      replicas: 2
      tolerations:
        - key: "DRUID"
          operator: "Exists"
          effect: "NoExecute"
      nodeSelector:
        TYPE: "DRUID"
      env:
        - name: DRUID_LOG_DIR
          value: /druid/data/log
        - name: DRUID_NODE_TYPE
          value: router
        - name: DRUID_XMX
          value: 1g
        - name: DRUID_XMS
          value: 1g
      resources:
        limits:
          cpu: 250m
          memory: 1Gi
        requests:
          cpu: 250m
          memory: 1Gi
      runtime.properties: |
        druid.service=druid/router
        druid.router.http.numConnections=500
        druid.router.http.readTimeout=PT5M
        druid.router.http.numMaxThreads=1000
        druid.server.http.numThreads=500
        druid.processing.buffer.sizeBytes=500MiB
        druid.processing.numMergeBuffers=6
        # Service discovery
        druid.router.defaultBrokerServiceName=druid/broker
        druid.router.coordinatorServiceName=druid/coordinator
        # Management proxy to coordinator / overlord: required for unified web console.
        druid.router.managementProxy.enabled=true
      services:
        -  spec:
            ports:
              - name: router-port
                port: 8088
            type: ClusterIP

$ kubectl apply -f druid-cluster/dev-cluster.yaml -n druid
druid.druid.apache.org/cluster created

Let’s check all the pods:

$ kubectl get pods -n druid
NAME                             READY   STATUS    RESTARTS   AGE
druid-cluster-brokers-0          1/1     Running   0          114s
druid-cluster-brokers-1          1/1     Running   0          114s
druid-cluster-brokers-2          0/1     Pending   0          114s
druid-cluster-coordinators-0     1/1     Running   0          114s
druid-cluster-coordinators-1     0/1     Pending   0          114s
druid-cluster-historicals-0      1/1     Running   0          114s
druid-cluster-historicals-1      1/1     Running   0          114s
druid-cluster-historicals-2      0/1     Pending   0          114s
druid-cluster-middlemanagers-0   1/1     Running   0          114s
druid-cluster-middlemanagers-1   1/1     Running   0          114s
druid-cluster-middlemanagers-2   1/1     Running   0          114s
druid-cluster-routers-0          1/1     Running   0          114s
druid-cluster-routers-1          1/1     Running   0          114s
druid-cluster-routers-2          1/1     Running   0          114s
zookeeper-0                      1/1     Running   0          48m
zookeeper-1                      1/1     Running   0          48m
zookeeper-2                      1/1     Running   0          48m

Some of the pods are still pending. Wait for a few seconds and try again:

$ kubectl get pods -n druid
NAME                             READY   STATUS    RESTARTS   AGE
druid-cluster-brokers-0          1/1     Running   0          10m
druid-cluster-brokers-1          0/1     Running   0          94s
druid-cluster-coordinators-0     0/1     Running   0          32s
druid-cluster-coordinators-1     1/1     Running   0          74s
druid-cluster-historicals-0      1/1     Running   0          72s
druid-cluster-historicals-1      1/1     Running   0          2m16s
druid-cluster-historicals-2      1/1     Running   0          3m19s
druid-cluster-middlemanagers-0   1/1     Running   0          10m
druid-cluster-middlemanagers-1   1/1     Running   0          10m
druid-cluster-middlemanagers-2   1/1     Running   0          10m
druid-cluster-routers-0          1/1     Running   0          10m
druid-cluster-routers-1          1/1     Running   0          10m
zookeeper-0                      1/1     Running   0          56m
zookeeper-1                      1/1     Running   0          56m
zookeeper-2                      1/1     Running   0          56m

Now all processes are up and running.

Let’s connect to the druid console using port forwarding:

$ kubectl port-forward service/druid-cluster-routers -n druid 8088:8088

Open the browser and connect to http://localhost:8088

Wow! Druid cluster is running on EKS!!!!

Let’s try to load data into Druid:

Click on the Load data tab and select the example data:

Keep all the default settings and you will reach the Submit page:

Click on Submit and wait for a few seconds:

Refresh it:

Data is loaded.

Let’s do some queries:

Click on the Query tab:

Select the count query and click on Run:

You can see the query results!!!

References:

  1. Druid
  2. Druid-operator
  3. Zookeeper