Data ingestion into Druid

We are using Apache Spark for the data processing. Surprisingly, there is no official Spark connector for Druid. Druid's primary ingestion methods are all pull-based: Druid reads the data from sources like S3 and Postgres and stores it in datasources, which are similar to tables in a traditional RDBMS.

To load data into Druid, we write all the data to S3 buckets in JSON format and send API calls to the Druid indexer to load the data from S3.

Druid supports JSON, CSV, Parquet, ORC, and other formats.
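
Since the processing happens in Spark, writing the output to S3 in this layout is straightforward. Here is a minimal sketch (the bucket path, the month column, and the processedDf DataFrame are assumptions that mirror the ingestion spec below):

// Write the processed DataFrame to S3 as JSON, partitioned by month,
// so each ingestion subtask can read a single month's prefix.
processedDf
  .write
  .partitionBy("month")
  .mode("append")
  .json("s3a://druid-data-bucket/analysis/scores")
// Resulting prefixes look like:
//   s3://druid-data-bucket/analysis/scores/month=2023-01/part-....json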

Druid requires these details to load the data:

  • Datasource name
  • Date/Time field
  • Data location
  • Data format
  • Granularity
  • Dimensions (field names)

Here is an example of a request spec:

request.json

{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "analysis_datasource",
      "timestampSpec": {
        "column": "dateTime",
        "format": "auto"
      },
      "dimensionsSpec": {
        "includeAllDimensions": true
      },
      "granularitySpec": {
        "segmentGranularity": "month",
        "queryGranularity": "none",
        "rollup": false
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "prefixes": ["s3://druid-data-bucket/analysis/scores"]
      },
      "inputFormat": {
        "type": "json"
      },
      "appendToExisting": true,
      "dropExisting": false
    },
    "tuningConfig": {
      "type": "index_parallel"
    }
  }
}

These are the minimal parameters; you can adjust them according to your requirements. More details are available in the Druid ingestion documentation (see References).

Make an API call to the Druid indexer using curl:

curl -X 'POST' -H 'Content-Type:application/json' -d @request.json http://localhost:8081/druid/indexer/v1/task

Response:

{"task":"index_parallel_analysis_datasource_oebimnie_2023-07-08T14:15:35.643Z"}

Ingestion has started. You can check the Ingestion tab in the Druid UI.
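
You can also poll the task status through the indexer API. A minimal sketch using Java's built-in HTTP client (the task id is the one returned above; the JSON response contains a statusCode such as RUNNING, SUCCESS, or FAILED):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Check the status of the ingestion task submitted above.
val taskId  = "index_parallel_analysis_datasource_oebimnie_2023-07-08T14:15:35.643Z"
val client  = HttpClient.newHttpClient()
val request = HttpRequest.newBuilder()
  .uri(URI.create(s"http://localhost:8081/druid/indexer/v1/task/$taskId/status"))
  .GET()
  .build()

val response = client.send(request, HttpResponse.BodyHandlers.ofString())
println(response.body()) // JSON with the task status, e.g. "statusCode": "RUNNING"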

So far so good. But you will notice that only one task is running. A single task can take a long time if the data is large, even though your Druid cluster has more slots to run parallel tasks.

In my case, the Druid cluster has 12 slots. That means Druid can run 12 parallel tasks.

So let's increase the parallelism from the default of 1 to 12.

Update the tuningConfig:

"tuningConfig": {
  "type": "index_parallel",
  "maxNumConcurrentSubTasks": 12
}

Before starting the ingestion, make sure your data on the S3 bucket is partitioned or sorted by, for example, month, week, or day. In my case, the data is partitioned by month.

Let's run the ingestion again and check the Ingestion tab:

Now 12 subtasks are running in parallel: Druid splits the data loading into multiple subtasks and runs up to 12 of them at a time.

12 slots are fully utilized!!!

21M records are loaded into Druid in 28 minutes.

The data is loaded!!!!

Let’s try the query:
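
A simple Druid SQL query against the new datasource can be sent to the Druid SQL endpoint. A sketch (the router URL/port and the example query are assumptions; adjust them to your deployment and schema):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Count the ingested rows through the Druid SQL API.
val sql = """{"query": "SELECT COUNT(*) AS row_count FROM analysis_datasource"}"""
val query = HttpRequest.newBuilder()
  .uri(URI.create("http://localhost:8888/druid/v2/sql"))   // router port is an assumption
  .header("Content-Type", "application/json")
  .POST(HttpRequest.BodyPublishers.ofString(sql))
  .build()

println(HttpClient.newHttpClient().send(query, HttpResponse.BodyHandlers.ofString()).body())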

References:

https://druid.apache.org/docs/latest/ingestion/index.html

Spark On EKS: Enable S3 Magic committer

In this blog, I will describe how to enable the S3 magic committer on a Spark cluster running on AWS Elastic Kubernetes Service (EKS).

Prerequisites:

  • EKS Cluster on AWS
  • Spark cluster setup on EKS

First, I will describe the need for the S3 magic committer, taking my use case as an example: reading data from Postgres tables (meta and data), joining them, and writing the result to S3. Here is the code:

Meta table:

val metaTable = s"(select * from analysis_meta where id >= 1220572171319017472 and id < 1620572171319017473) as meta"

val metaDf =
        spark
          .read
          .format("jdbc")
          .option("numPartitions" , "2000")
          .option("lowerBound",  "1220572171319017472")
          .option("upperBound", "1620572171319017473")
          .option("partitionColumn", "id")
          .option("dbtable", metaTable)
          .option("driver" , "org.postgresql.Driver")
          .option("url", "jdbc:postgresql://AWS-RDS-URL.amazonaws.com/analysis_db?user=****&password=****")
          .load()

Spark requires these parameters to partition the table data across tasks:

  • numPartitions
  • lowerBound
  • upperBound
  • partitionColumn

These parameters are mandatory; otherwise, Spark will read all the data into a single partition.
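
To see why they matter, here is roughly how Spark turns these options into per-partition JDBC queries (an illustrative sketch of the stride computation, not Spark's exact implementation):

// Roughly how Spark splits the id range into numPartitions JDBC reads.
val lowerBound    = 1220572171319017472L
val upperBound    = 1620572171319017473L
val numPartitions = 2000
val stride        = (upperBound - lowerBound) / numPartitions

// Each partition becomes its own query, for example:
//   partition 0:    ... WHERE id <  lowerBound + stride
//   partition 1:    ... WHERE id >= lowerBound + stride AND id < lowerBound + 2 * stride
//   partition 1999: ... WHERE id >= lowerBound + 1999 * stride
// Without these options there is nothing to split on, so Spark issues a single query
// and reads everything into one partition.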

Data table:

val textTable = s"(select id , text from analysis_data where id >= 1220572171319017472 and id < 1620572171319017473) as data"

val textDf =
      spark
        .read
        .format("jdbc")
        .option("numPartitions", "2000")
        .option("lowerBound", "1220572171319017472")
        .option("upperBound", "1620572171319017473")
        .option("partitionColumn", "id")
        .option("dbtable", textTable)
        .option("driver", "org.postgresql.Driver")
        .option("url", "jdbc:postgresql://AWS-RDS-URL.amazonaws.com/analysis_db?user=****&password=****")
        .load()

Join the meta and data tables, then write to S3:

val df = metaDf.join(textDf, "id")

df
  .write
  .partitionBy("lang", "month")  // partition by language and month
  .mode(SaveMode.Append)
  .parquet(s"s3a://bucket-name/analysis-data")

Let's run this job. Both tables have 80M rows (~400GB). The job takes about 1 hour to complete.

DAG:

Stage:

Job:

My Spark cluster size is 8 * m5.2xlarge nodes.

As you can see in the job details screenshot, the job took 1 hour to complete, but the data is not committed to the output folder; it is still in the _temporary folder.

Committing all the data took Spark more than 12 hours.

That was surprising, but it is not a Spark problem. The default FileOutputCommitter commits work by renaming files out of the _temporary directory, and on S3 a rename is a copy plus delete, which is painfully slow for large outputs. Spark even warns about this in the log:

23/06/30 09:26:40 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.

Let’s switch to the magic committer. It requires just a configuration change:

spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled=true

and the spark-hadoop-cloud dependency. I uploaded this jar to S3 and included it in the job submission.
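
If you prefer to bundle the dependency with the application jar instead of passing it with --jars, the equivalent build.sbt entry would look like this (the version is an assumption matching the jar used below):

// build.sbt: pull in Spark's cloud committer support; keep the version in sync with your Spark build.
libraryDependencies += "org.apache.spark" %% "spark-hadoop-cloud" % "3.3.2"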

I am running the Spark cluster on EKS, so here is the full job submission:

kubectl exec -ti --namespace spark spark-cluster-worker-0 -- spark-submit --master spark://spark-cluster-master-svc:7077 \
    --conf 'spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem' \
    --conf 'spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled=true' \
    --conf 'spark.hadoop.fs.s3a.connection.maximum=1000' \
    --conf 'spark.driver.memory=8g' \
    --conf 'spark.executor.memory=20g' \
    --conf 'spark.executor.cores=6' \
    --conf 'spark.sql.shuffle.partitions=400' \
    --supervise \
    --deploy-mode cluster \
    --jars s3a://bucket-name/dependencies/spark-hadoop-cloud_2.12-3.3.2.jar \
    --class com.techmonad.main.Postgres2S3App \
    s3a://bucket-name/app/Postgres2S3App-30-06-2023_06_13.jar

The job itself took about the same time, but now the data is committed to the output folder within 2 hours instead of more than 12. Wow!!!!
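
One way to double-check that the new committer actually did the commit, assuming the documented S3A committer behavior: the S3A committers replace the empty _SUCCESS marker with a small JSON manifest that names the committer, so you can simply read it back.

// If the magic committer was used, _SUCCESS is a small JSON manifest naming the committer;
// the classic FileOutputCommitter leaves an empty file instead.
spark.read
  .textFile("s3a://bucket-name/analysis-data/_SUCCESS")
  .show(truncate = false)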

Here is the code.

References: