Apache Spark is one of the most popular big data processing tools today. It’s used extensively for data sizes small to large. The availability of Spark in more than one programming language makes it a favourite tool for data engineers and data scientists coming from various backgrounds. However, PySpark appears to be the most popular choice.
Even though Spark comes with a bunch of auto-optimisation techniques out of the box, there’s a lot we can do at the code or the infrastructure level that can significantly improve the performance of our Spark jobs. In this article, we are going to look at a few such techniques. Maybe, if I get the time, I’ll follow this up with more such techniques in the future.
More often than not, we use one data frame in our code more than once. This can be data read from the disk, or an intermediate computation that’s required again and again in future computations. When we are repeatedly using a data frame like this, it makes sense to keep it aside as is and not read the data from disk again or re-run the computations again. If it’s cooked and stored in the state we need, we can just keep using it as many times as we need. This is where caching comes in handy.
Spark offers three options for both RDDs and data frames to cache data for as long as we need it. The cached data is cleared when the job is killed, fails, or completes. The three options are:
- Retain data in memory
- Write data to disk and read it when required
- Retain part of the data in memory and write the other part to disk
As you might imagine, the first option is the fastest and the most used. Keeping data in memory is the best option because the read and write overhead of memory is negligible compared to reading and writing data from disk. That I/O overhead makes the disk option a poor choice in most cases. But you might wonder why the option is even provided if it’s bad. Well, the amount of data we work with might not always fit into the memory we have available for our jobs. Having enough memory is a luxury that many Spark jobs don’t possess. In such cases, even though the I/O overhead is significant, it would still be less than the time taken to redo the computation for a data frame, so we can simply write the data frame to disk and read it again when required.
There are two data frame methods on offer for this: cache() and persist(). As you can imagine, cache() will cache the data. Actually, it is just a shorthand for the persist() method. The persist() method takes an argument that specifies whether we want to use only memory, only disk, or both to save our data. The cache() method does not take any arguments; it in turn calls persist() with a default storage level, which makes the code easier to write. Note that the default differs by API: for RDDs, cache() keeps data in memory only, while for data frames it uses memory first and falls back to disk.
Caching data can save a significant amount of time and resources in Spark jobs. Go through your code and see if there are data frames that are being used more than once. If so, try caching them and re-running the jobs to see if there’s any difference in the performance.
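To make the three options concrete, here is a minimal PySpark sketch. The storage level names (MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY) are PySpark’s own; the function names, the size-based heuristic, and the input path are hypothetical and only there for illustration.

```python
def choose_storage_level(estimated_bytes: int, cache_budget_bytes: int) -> str:
    """Pick a storage level name. Hypothetical heuristic, for illustration only."""
    if estimated_bytes <= cache_budget_bytes:
        return "MEMORY_ONLY"       # everything fits: retain data in memory
    if cache_budget_bytes > 0:
        return "MEMORY_AND_DISK"   # keep what fits in memory, write the rest to disk
    return "DISK_ONLY"             # no memory to spare: write everything to disk


def persist_for_reuse(df, level_name: str):
    """Persist a DataFrame at the named storage level (needs PySpark installed)."""
    # Imported lazily so that the helper above can be used without Spark.
    from pyspark import StorageLevel

    return df.persist(getattr(StorageLevel, level_name))


def reuse_example(spark, path: str):
    """Read once, persist, and reuse the same DataFrame in two computations."""
    df = persist_for_reuse(spark.read.parquet(path), "MEMORY_AND_DISK")
    total = df.count()                # first action materialises the cache
    distinct = df.distinct().count()  # second action reads the cached data
    df.unpersist()                    # release the cache once we are done
    return total, distinct
```

With an active SparkSession, calling reuse_example(spark, "some/path.parquet") would read the file once and serve the second count from the cache. Whether that is actually faster depends on the data and the cluster, so it is worth measuring both ways.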
Data serialisation is often overlooked, but it can provide performance improvements when tuned properly. Spark serialises data when it needs to be shuffled across the network or written to disk, which is pretty common for distributed applications. Spark provides two serialisers for this: the Java serialiser and the Kryo serialiser. As Spark is built to run in a JVM, it uses the Java serialiser by default. The Java serialiser is good, but Kryo provides much better performance. Kryo produces a much more compact binary format than the Java serialiser, and there have been tests where Kryo has performed as much as 10x better than the Java serialiser.
The performance benefits could vary based on the data we are working with. But before dismissing the Kryo serialiser, it would be wise to try it with your datasets in a test environment and compare the results with the Java serialiser. Similarly, there are instances where the Java serialiser is good enough, for example, when working with comparatively smaller datasets. Switching between the two is as simple as setting or unsetting the spark.serializer configuration parameter.
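As a rough sketch of that switch, the snippet below builds the setting as a plain dictionary and turns it into spark-submit arguments. The spark.serializer key and the two serialiser class names are Spark’s; the helper names and the app name are hypothetical.

```python
KRYO = "org.apache.spark.serializer.KryoSerializer"
JAVA = "org.apache.spark.serializer.JavaSerializer"


def serializer_conf(use_kryo: bool) -> dict:
    """Return the Spark setting that selects the serialiser."""
    return {"spark.serializer": KRYO if use_kryo else JAVA}


def to_submit_args(conf: dict) -> list:
    """Turn a settings dictionary into `--conf key=value` arguments for spark-submit."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


def build_session(conf: dict, app_name: str = "serializer-test"):
    """Create a SparkSession with the given settings (needs PySpark installed)."""
    # Imported lazily so the helpers above work without Spark.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Running the same job once with serializer_conf(True) and once with serializer_conf(False) in a test environment gives the comparison suggested above.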
Garbage collection is a huge part of optimising the JVM, and Spark runs on the JVM. So Spark jobs can see significant performance gains when garbage collection is tuned properly. The recommendation is to use the G1GC garbage collector.
The best way to tune garbage collection is to first look at the logs of our jobs (run in verbose mode), and see how garbage collection is happening now. Based on this, we’ll have to optimise both our code and the garbage collector settings.
G1GC improves the performance of our Spark jobs by reducing the garbage collection pauses that interrupt the tasks in our Spark jobs. By continuously monitoring the heap, the garbage collector makes sure there’s enough free memory and that unused objects are no longer taking up space.
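One way to switch the executors to G1GC and turn on verbose GC logging is through spark.executor.extraJavaOptions. The sketch below only assembles the option string; the helper names are hypothetical, and the logging flags assume a HotSpot JVM (the older Print* flags apply to Java 8, while newer JVMs use -Xlog instead).

```python
def gc_java_options(java_major: int, log_gc: bool = True) -> str:
    """Build JVM options that enable G1GC and, optionally, GC logging."""
    flags = ["-XX:+UseG1GC"]
    if log_gc and java_major <= 8:
        # Java 8 style GC logging flags
        flags += ["-verbose:gc", "-XX:+PrintGCDetails", "-XX:+PrintGCTimeStamps"]
    elif log_gc:
        # Unified JVM logging, available from Java 9 onwards
        flags.append("-Xlog:gc*")
    return " ".join(flags)


def g1gc_conf(java_major: int, log_gc: bool = True) -> dict:
    """Return the Spark setting that passes the options to every executor JVM."""
    return {"spark.executor.extraJavaOptions": gc_java_options(java_major, log_gc)}
```

The resulting dictionary can be passed as --conf arguments to spark-submit or set on the session builder. The GC messages then show up in the executors’ logs.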
Ideally, the garbage collection overhead should not be more than about 10% of the time our tasks spend running, and this can be monitored to make sure everything is healthy. This optimisation should be done at both the code level and the platform level. In our code, we need to be careful about the objects we create and avoid creating any objects that we don’t need.
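A simple way to keep an eye on the 10% guideline is to compare the time a task spends in garbage collection with its total run time. That reading of the guideline is an assumption of this sketch; the function names and the sample numbers are hypothetical, and the inputs would come from figures such as the GC time and duration reported per task in the Spark UI.

```python
def gc_overhead(gc_time_ms: float, run_time_ms: float) -> float:
    """Fraction of a task's run time that was spent in garbage collection."""
    if run_time_ms <= 0:
        raise ValueError("run_time_ms must be positive")
    return gc_time_ms / run_time_ms


def tasks_over_budget(tasks: dict, budget: float = 0.10) -> list:
    """Return the ids of tasks whose GC overhead exceeds the budget.

    `tasks` maps a task id to a (gc_time_ms, run_time_ms) pair.
    """
    return sorted(
        task_id
        for task_id, (gc_ms, run_ms) in tasks.items()
        if gc_overhead(gc_ms, run_ms) > budget
    )


# Made-up numbers: the second task spends 30% of its time in GC.
sample = {"task-1": (50, 1000), "task-2": (300, 1000)}
flagged = tasks_over_budget(sample)
```

Tasks that keep landing in the flagged list are the ones worth a closer look, either for the objects they create or for the memory they are given.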
All programming languages offer certain performance tuning methods, and we can use all of them in our Spark code as well to gain performance benefits at the programming language level. For this discussion, we are going to suppose our Spark code is in Java and go ahead with the performance tuning techniques.
Even outside of Spark, Java provides some performance tuning methods that can all be utilised in Spark, as Spark jobs still run inside JVMs. For example, simple data structures without a lot of nesting are cheaper for Java to store and process. Similarly, we can use enums or enumerated data types as the keys of maps or key-value pairs instead of strings, which lets Java store and compare the keys more efficiently. There are many more such techniques that can be employed not just in Java, but in Python and Scala too.
Early versions of Spark came with separate, fixed-size storage and execution memory regions. This meant that if one region was full, data had to be spilled to disk to make room for new data, even when the other region had space to spare. This is both slow and not ideal when we’re dealing with huge amounts of data. Spark has two configuration parameters that help in making this easier:
spark.executor.memory is used to set the maximum amount of memory one executor can use. This by default is 1 GB.
spark.memory.fraction is used to set the fraction of the JVM heap memory, after a small reserved amount is set aside, that Spark uses for execution and storage. By default the value is 0.6, which means 60% of that memory is shared between execution and storage, and the remaining 40% is left for user data structures and Spark’s internal metadata.
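To see how these two parameters interact, here is a small back-of-the-envelope calculation. It assumes the 300 MiB reserve that Spark’s tuning documentation describes as being subtracted before spark.memory.fraction is applied. The function name is hypothetical, and the real figures on a cluster will differ slightly because the JVM reports a bit less heap than the configured maximum.

```python
RESERVED_MIB = 300  # set aside before spark.memory.fraction is applied (per Spark's tuning docs)


def unified_memory_mib(heap_mib: float, memory_fraction: float = 0.6) -> float:
    """Approximate memory shared by execution and storage, in MiB."""
    usable = max(heap_mib - RESERVED_MIB, 0)
    return usable * memory_fraction


# With the default 1 GB executor and the default fraction of 0.6:
default_unified = unified_memory_mib(1024)   # roughly 434 MiB
# A hypothetical 8 GB executor leaves far more room for execution and caching:
larger_unified = unified_memory_mib(8 * 1024)
```

The takeaway is that on small executors the fixed reserve eats a noticeable share of the heap, so raising spark.executor.memory often helps more than adjusting the fraction.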
When our Spark application runs out of storage space, cached data is spilled over and the execution memory is also used for storing. When this happens, the Least Recently Used (LRU) algorithm decides what to spill: the cached data that was accessed longest ago goes first.
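The LRU idea itself is easy to demonstrate. The class below is a toy model written for this post, not Spark’s actual block manager: it tracks block sizes only, and all names in it are hypothetical.

```python
from collections import OrderedDict


class LruBlockCache:
    """Toy cache that evicts the least recently used block when it runs out of space."""

    def __init__(self, capacity_mib: float):
        self.capacity_mib = capacity_mib
        self.blocks = OrderedDict()  # block id -> size in MiB, least recently used first

    def used_mib(self) -> float:
        return sum(self.blocks.values())

    def get(self, block_id):
        """Return the block size and mark the block as recently used."""
        if block_id not in self.blocks:
            return None
        self.blocks.move_to_end(block_id)
        return self.blocks[block_id]

    def put(self, block_id, size_mib: float) -> list:
        """Store a block and return the ids of the blocks evicted to make room."""
        if size_mib > self.capacity_mib:
            return []  # too big to cache at all, so nothing is evicted
        self.blocks.pop(block_id, None)  # replacing an existing block frees its space
        evicted = []
        while self.used_mib() + size_mib > self.capacity_mib:
            old_id, _ = self.blocks.popitem(last=False)  # drop the least recently used
            evicted.append(old_id)
        self.blocks[block_id] = size_mib
        return evicted


cache = LruBlockCache(capacity_mib=100)
cache.put("a", 40)
cache.put("b", 40)
cache.get("a")                  # "a" is now the most recently used block
evicted = cache.put("c", 40)    # no room left, so the least recently used block goes
```

Here "b" is evicted rather than "a", because "a" was read more recently even though it was stored first.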
To avoid this issue, Spark released unified memory in version 1.6. According to this, there’s no fixed limit for execution or storage memory. If an application wants, it can use all of the memory for execution. Unified memory makes two assumptions:
- We can unload cached data from memory to allow the application to utilise all of it for execution. This is because we can soon load the data back into the cache anyway.
- There should be a minimum amount of cache memory or data that can’t be deleted from the memory for applications that use the cache memory critically.
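These two assumptions can be put into numbers. In Spark’s documentation the protected part of the cache is controlled by spark.memory.storageFraction, which defaults to 0.5 of the unified region. The sketch below assumes that setting and uses hypothetical function names. It estimates how much cached data execution could push out before it reaches the protected floor.

```python
def protected_storage_mib(unified_mib: float, storage_fraction: float = 0.5) -> float:
    """Cached data below this size is not evicted to make room for execution."""
    return unified_mib * storage_fraction


def reclaimable_by_execution_mib(
    cached_mib: float, unified_mib: float, storage_fraction: float = 0.5
) -> float:
    """How much cached data execution could evict before hitting the protected floor."""
    floor = protected_storage_mib(unified_mib, storage_fraction)
    return max(cached_mib - floor, 0.0)


# Hypothetical executor with 400 MiB of unified memory:
heavy_cache = reclaimable_by_execution_mib(cached_mib=300, unified_mib=400)
light_cache = reclaimable_by_execution_mib(cached_mib=150, unified_mib=400)
```

In the first case 100 MiB of cached data can be unloaded for execution; in the second the cache is already below the floor, so none of it is touched.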
Using unified memory and controlling the amount of memory allocated to the executors, we can easily tune the performance of our Spark applications. This has to be one of the easiest performance tuning techniques available.
In this blog post, we saw a few techniques with which we can optimise the performance of our Spark jobs. This list is just a few of the various configuration parameters, options, or techniques available to optimise Spark jobs. As I mentioned earlier, if time permits, I’ll write another blog post with more such techniques. For now, you can read more about Apache Spark or big data in general.