How does Spark determine partitions for an RDD?

The most fundamental data structure in Spark is the RDD (Resilient Distributed Dataset). An RDD has one or more partitions, and each partition is a subset of the data that resides on one node in your Spark cluster. The number of partitions is one of the factors that determine the parallelism of Spark task execution. Most of the time you do not need to worry about partitions. However, if you are tuning performance or investigating the resulting data files (e.g. each RDD partition is written out as one part file in HDFS), you need to understand how Spark determines the number of partitions for an RDD.

The rule of thumb: an RDD usually has the same number of partitions as its parent RDD. The exceptions are operations such as coalesce(), or reduceByKey() where a shuffle is involved. For RDDs with no parents (e.g. when reading a file from HDFS), the number of partitions is determined by an algorithm very similar to Hadoop's FileInputFormat.getSplits().

1. How Hadoop calculates splits

The source code in Hadoop to calculate splits from input data in HDFS (FileInputFormat.java): https://github.com/apache/hadoop/blob/e30710aea4e6e55e69372929106cf119af06fd0e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java#L319

This is the pseudo code for the split size that getSplits(conf, numSplits) uses:

 splitSize = max(minSize, min(goalSize, blockSize))
   minSize = conf: mapreduce.input.fileinputformat.split.minsize, defaults to 0
   goalSize = totalSize / numSplits

The blockSize is specific to your distributed file system configuration. For WASB, the default is 512MB, but you can change it through the "fs.azure.block.size" setting, as you can see here:

https://github.com/apache/hadoop/blob/e30710aea4e6e55e69372929106cf119af06fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java#L687

In plain English, the split size is the smaller of the goal size and the file system block size, unless minSize is set to something larger. The goal size is the total size of the input files divided by the intended number of splits.
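As a rough illustration of this rule, here is a minimal Scala sketch that can be pasted into spark-shell or any Scala REPL. The function name hadoopSplitSize and the sample sizes are made up for this example; the code only mirrors the pseudo code above and is not Hadoop's actual implementation.

```scala
// Sketch of the split-size rule from the pseudo code above; all sizes are in bytes.
// hadoopSplitSize is a hypothetical helper name, not a Hadoop API.
def hadoopSplitSize(totalSize: Long, numSplits: Int, blockSize: Long, minSize: Long = 0L): Long = {
  // Guard against numSplits == 0 so the division is always defined.
  val goalSize = totalSize / math.max(numSplits, 1)
  math.max(minSize, math.min(goalSize, blockSize))
}

val MB = 1024L * 1024

// 2048 MB of input, 2 requested splits, 512 MB blocks: the block size wins.
val blockBound = hadoopSplitSize(2048 * MB, 2, 512 * MB)                 // 512 MB
// 100 MB of input, 2 requested splits: the goal size (50 MB) wins.
val goalBound = hadoopSplitSize(100 * MB, 2, 512 * MB)                   // 50 MB
// A minSize larger than both overrides them and forces bigger splits.
val minBound = hadoopSplitSize(100 * MB, 2, 512 * MB, minSize = 64 * MB) // 64 MB
```

The three sample calls cover the three ways the formula can resolve: bounded by the block size, by the goal size, or by minSize.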

2. How Spark calculates splits

2.1 Unstructured APIs

SparkContext exposes a few APIs to read a Hadoop file from HDFS directly into an RDD. This is done in HadoopRDD.scala, which is basically a wrapper around Hadoop's FileInputFormat class. The source code:

https://github.com/apache/spark/blob/992447fb30ee9ebb3cf794f2d06f4d63a2d792db/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L196

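The numSplits parameter passed to FileInputFormat.getSplits() is the minPartitions argument of the read call. If you do not pass one, Spark uses defaultMinPartitions, which is min(defaultParallelism, 2), so in practice it is almost always 2, as can be seen here: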

https://github.com/apache/spark/blob/e9f983df275c138626af35fd263a7abedf69297f/core/src/main/scala/org/apache/spark/SparkContext.scala#L2329

Let's look at an example in an HDInsight cluster (Version 3.6 with Spark 2.2). Let's load a CSV file which is around 200KB. In spark-shell, just type:

 val text = spark.sparkContext.textFile("/HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
 text.getNumPartitions

The result is 2. The blockSize (512MB) is much bigger than the goal size (totalSize/2, about 100KB), so splitSize is the goal size and the file is cut into 2 splits.
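The arithmetic behind this result can be sketched in plain Scala. The helper names countSplits and textFilePartitions are hypothetical, the file size of exactly 200KB is an assumption, and the sketch only models a single non-empty splittable file. It also includes the 1.1 "slop" factor (SPLIT_SLOP) that Hadoop's FileInputFormat applies, which lets the last split be up to 10% larger than splitSize. The minPartitions parameter stands for the optional second argument of textFile(), which is passed on as numSplits.

```scala
// Mirrors the SPLIT_SLOP constant in Hadoop's FileInputFormat.
val SplitSlop = 1.1

// Count the splits cut from one file: keep carving off splitSize bytes while
// the remainder is more than 1.1 * splitSize, then emit one final split.
def countSplits(fileSize: Long, splitSize: Long): Int = {
  var remaining = fileSize
  var splits = 0
  while (remaining.toDouble / splitSize > SplitSlop) {
    remaining -= splitSize
    splits += 1
  }
  if (remaining > 0) splits += 1
  splits
}

// Hypothetical model of textFile(path, minPartitions) for a single file.
// A floor of 1 byte on the split size keeps the division well defined.
def textFilePartitions(fileSize: Long, minPartitions: Int, blockSize: Long): Int = {
  val goalSize = fileSize / math.max(minPartitions, 1)
  val splitSize = math.max(1L, math.min(goalSize, blockSize))
  countSplits(fileSize, splitSize)
}

val fileSize = 200L * 1024          // assumed size of the sample file
val wasbBlock = 512L * 1024 * 1024  // WASB default block size

val withDefault = textFilePartitions(fileSize, 2, wasbBlock) // 2
val withEight = textFilePartitions(fileSize, 8, wasbBlock)   // 8
```

Under these assumptions the default of 2 reproduces the result above, and asking for 8 partitions shrinks the goal size enough to produce 8 splits.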

2.2 Structured APIs

For DataFrame-based APIs, one can use DataFrameReader to load a file. Eventually this is handled in DataSourceScanExec.createNonBucketedReadRDD():

https://github.com/apache/spark/blob/992447fb30ee9ebb3cf794f2d06f4d63a2d792db/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L396

This is the pseudo code:

 splitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
   defaultMaxSplitBytes = conf: spark.sql.files.maxPartitionBytes, defaults to 128MB
   openCostInBytes = conf: spark.sql.files.openCostInBytes, defaults to 4MB
   bytesPerCore = totalBytes / defaultParallelism
   defaultParallelism = conf: spark.default.parallelism, defaults to (conf: spark.executor.cores) * (conf: spark.executor.instances)

The openCostInBytes is the estimated cost of opening a file, expressed as the number of bytes that could be read in the same amount of time. It exists to keep the split size from becoming too small. The split size is usually the goal size, which is the total size of all input files divided by the total number of cores across all executors, capped at defaultMaxSplitBytes. When the goal size is smaller than the estimated cost of opening a file, that estimate is used as splitSize instead. We can see that the Spark structured API is not much concerned with the HDFS block size, and tries to distribute the input bytes evenly among all the executors.
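The following Scala sketch restates the pseudo code as a function so the three cases can be tried with concrete numbers. The function name maxSplitBytes and the sample inputs are illustrative. One detail is taken from my reading of the linked Spark 2.x source rather than from the pseudo code: each file is padded with openCostInBytes before the sizes are summed into totalBytes. Treat it as an approximation, not as Spark's implementation.

```scala
// Sketch of the split-size rule for non-bucketed file scans; sizes are in bytes.
def maxSplitBytes(
    fileSizes: Seq[Long],
    defaultParallelism: Int,
    defaultMaxSplitBytes: Long = 128L * 1024 * 1024, // spark.sql.files.maxPartitionBytes
    openCostInBytes: Long = 4L * 1024 * 1024         // spark.sql.files.openCostInBytes
): Long = {
  // Assumption from the Spark 2.x source: every file counts as its length
  // plus the cost of opening it.
  val totalBytes = fileSizes.map(_ + openCostInBytes).sum
  val bytesPerCore = totalBytes / defaultParallelism
  math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
}

val KB = 1024L
val GB = 1024L * 1024 * 1024

// Tiny input: bytesPerCore is below the open cost, so the open cost (4 MB) wins.
val tiny = maxSplitBytes(Seq(200 * KB), 8)   // 4194304
// Medium input: bytesPerCore lies between the two bounds and is used as is.
val medium = maxSplitBytes(Seq(1 * GB), 16)  // 67371008
// Large input: bytesPerCore exceeds the cap, so maxPartitionBytes (128 MB) wins.
val large = maxSplitBytes(Seq(10 * GB), 8)   // 134217728
```

The parallelism values 8 and 16 are arbitrary; on a real cluster the value comes from spark.default.parallelism.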

For example, in an HDInsight cluster (Version 3.6 with Spark 2.2):

 val hvac = spark.read.format("csv").load("/HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
 hvac.rdd.getNumPartitions

The result is 1. Why is this different from the result of the Unstructured API? Because bytesPerCore is much smaller than openCostInBytes, splitSize is set to 4MB. The roughly 200KB file fits in a single split, so the number of partitions is 1.

3. How to manipulate number of splits for input files in HDInsight

For the Unstructured API, to increase the number of splits you can lower the blockSize of the file system or lower minSize; to decrease it, raise blockSize or minSize. You can also pass a larger minPartitions argument to textFile(), which raises numSplits and therefore lowers the goal size. The minSize is controlled by "mapreduce.input.fileinputformat.split.minsize"; note that you have to add the "spark.hadoop." prefix when you set it through the Spark configuration. For HDFS, the blockSize is per-file metadata, so you cannot change it unless you regenerate the file. For WASB the block size is simulated by the "fs.azure.block.size" setting, so you can change it at any time. For ADLS the block size is hard-coded to 256MB. Note that with the default settings (numSplits of 2 and minSize left unset), the above formula yields at least 2 partitions for any file larger than a few bytes.

For the Structured API, to increase the number of splits, lower defaultMaxSplitBytes; to decrease it, raise defaultMaxSplitBytes. The defaultMaxSplitBytes is controlled by the "spark.sql.files.maxPartitionBytes" setting. Note that there is a lower bound on the number of splits: it is roughly defaultParallelism, provided the totalSize is not too small (that is, at least openCostInBytes * defaultParallelism).
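To see the effect of "spark.sql.files.maxPartitionBytes" and the lower bound in numbers, here is a small Scala sketch. The function name estimatePartitions is hypothetical, and the model is deliberately narrow: it assumes a single splittable file that is cut into chunks of splitSize, with each chunk ending up in its own partition, and it pads the file with openCostInBytes when computing bytesPerCore (an assumption based on the Spark 2.x source). Inputs made of many small files are packed differently and are not covered.

```scala
// Rough estimate of the partition count for ONE splittable file; sizes in bytes.
def estimatePartitions(
    fileSize: Long,
    defaultParallelism: Int,
    maxPartitionBytes: Long = 128L * 1024 * 1024,
    openCostInBytes: Long = 4L * 1024 * 1024
): Long = {
  val bytesPerCore = (fileSize + openCostInBytes) / defaultParallelism
  val splitSize = math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  (fileSize + splitSize - 1) / splitSize // ceiling division
}

val MiB = 1024L * 1024
val GiB = 1024L * MiB

// 1 GB file, parallelism 8, default 128 MB cap.
val byDefault = estimatePartitions(1 * GiB, 8)                              // 8
// Lowering the cap to 32 MB increases the number of partitions.
val lowerCap = estimatePartitions(1 * GiB, 8, maxPartitionBytes = 32 * MiB) // 32
// Raising the cap to 1 GB does not go below defaultParallelism (the lower bound).
val higherCap = estimatePartitions(1 * GiB, 8, maxPartitionBytes = 1 * GiB) // 8
// A 200 KB file is below openCostInBytes * defaultParallelism: one partition.
val tinyFile = estimatePartitions(200L * 1024, 8)                           // 1
```

In this model, raising the cap alone cannot push the count below defaultParallelism for a large file, which matches the lower bound described above.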