Apache Spark SQL User Defined Function (UDF) POC in Java

by Sunny Srinidhi - May 14, 2019

If you've worked with Spark SQL, you might have come across the concept of User Defined Functions (UDFs). As the name suggests, it's a feature where you define a function, pretty straightforward. But how is this different from any other custom function that you write? Well, when you're working with Spark in a distributed environment, your code is distributed across the cluster. For this to happen, your code entities have to be serializable, including the various functions you call.

When you want to manipulate columns in your Dataset, Spark provides a variety of built-in functions. But there are cases when you want a custom implementation to work with your columns. For this, Spark provides UDFs. But be warned: UDFs should be used as sparingly as possible, because a UDF is a black box and Spark cannot, and doesn't try to, optimize it. So you have to take care that your UDF is optimized to the best possible level.

I wanted to write this post because you'll find a lot of examples of registering and using UDFs in Scala, but very few examples in Java. When I had to implement UDFs in my project, I had to spend the better part of a day to churn out working code. So I thought I'd better document it for future reference. Let's get our hands dirty with code snippets.

The Dataset

Before we look at the code, let's take a look at the sample dataset that we're going to use. For this example, I used a .csv file, as it is easy to create, understand, and manipulate. The dataset I used is as follows:

name,number
name1,1
name2,2
name3,3
name4,4

As you can see, there is a header row (which means we need to ignore it during processing), one string column, and one integer column. In this POC, we'll be adding one more integer column and one more string column:

- The new integer column will double the value of the existing integer column (number).
- The new string column will convert the existing string column (name) to uppercase.

The Example

First, let's look at the boilerplate code, where we create a JavaSparkContext, a SQLContext, and a SparkSession.

private String sparkMaster;
private JavaSparkContext javaSparkContext;
private SQLContext sqlContext;
private SparkSession sparkSession;

public static void main(String[] args) {
    sparkMaster = properties.getProperty("spark.master");
    javaSparkContext = createJavaSparkContext();
    sqlContext = new SQLContext(javaSparkContext);
    sparkSession = sqlContext.sparkSession();
}

sparkMaster tells the system whether this program has to be run on a cluster (spark.master=yarn) or as a standalone program (spark.master=local).
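The body of createJavaSparkContext() isn't shown in the post. A minimal sketch of what it might look like, assuming sparkMaster has already been read from the properties file (the application name here is made up, not taken from the original POC):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only: builds a JavaSparkContext from the configured master.
// The app name is an assumption; the real POC may set more options.
private JavaSparkContext createJavaSparkContext() {
    SparkConf sparkConf = new SparkConf()
            .setAppName("spark-udf-poc")   // assumed name, not from the post
            .setMaster(sparkMaster);       // e.g. "local[*]" or "yarn"
    return new JavaSparkContext(sparkConf);
}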
Registering the UDFs

Once we have this boilerplate code in place, we'll move on to registering the UDFs. For this example, we have two UDFs: one for doubling the integer value, and the other for converting the string value to uppercase. We can register them through the SQLContext instance we created earlier:

public static final String COLUMN_DOUBLE_UDF_NAME = "columnDoubleUdf";
public static final String COLUMN_UPPERCASE_UDF_NAME = "columnUppercase";

this.sqlContext.udf().register(COLUMN_DOUBLE_UDF_NAME, (UDF1<String, Integer>) (columnValue) -> {
    return Integer.parseInt(columnValue) * 2;
}, DataTypes.IntegerType);

this.sqlContext.udf().register(COLUMN_UPPERCASE_UDF_NAME, (UDF1<String, String>) (columnValue) -> {
    return columnValue.toUpperCase();
}, DataTypes.StringType);

Both UDFs take only one input parameter, columnValue. But if you observe the UDF declaration, you can see two type parameters, as in UDF1<String, Integer>. The first is the type of the input column value, and the second is the return type; in this case, an integer. Similarly, the second UDF returns a string.

Now we'll read the input data into a Dataset. In this example, we create a POJO class which matches the structure of the data in the .csv file. This way, we get a Dataset of that class instead of the generic Spark SQL Row class. This can be avoided, and is shown here just as an example. To read the file content as a Dataset, we'll use the following code:

public static void main(String[] args) {
    …
    Dataset<FileInputLine> inputFileDataset = getDatasetFromFile(inputFilePath);
    …
}

public Dataset<FileInputLine> getDatasetFromFile(String filePath) {
    Dataset<FileInputLine> fileDataSet = this.sparkSession.read().option("header", "true").csv(filePath)
            .as(Encoders.bean(FileInputLine.class));
    return fileDataSet;
}

Now that we have the dataset in memory, let's print it and see how it looks:

+-----+------+
| name|number|
+-----+------+
|name1|     1|
|name2|     2|
|name3|     3|
|name4|     4|
+-----+------+

Right, this is what we want. Let's call our first UDF to double the value of the number column, and add the result as a new column, doubledNumber. The code for this is pretty simple. On the inputFileDataset object, we call the withColumn() method, which takes two parameters. The first parameter is the name of the new column, and the second is a call to the UDF, which returns a Column. In our case, the code looks like this:

public static final String DOUBLED_COLUMN_NAME = "doubledNumber";
public static final String NUMBER_COLUMN_NAME = "number";

Dataset<Row> doubledColumnDataset = inputFileDataset.withColumn(DOUBLED_COLUMN_NAME,
        callUDF(COLUMN_DOUBLE_UDF_NAME, col(NUMBER_COLUMN_NAME)));

You can see that the argument we pass to the UDF is a col() value. col() returns a Column object referring to the named column; because the CSV is read without an explicit schema, the values that reach the UDF are strings, which is why the UDF parses them with Integer.parseInt().

After executing this, we should have an extra column in the new dataset, doubledColumnDataset. We can confirm this by printing the dataset to the console:

doubledColumnDataset.show();

This should print the following table:

+-----+------+-------------+
| name|number|doubledNumber|
+-----+------+-------------+
|name1|     1|            2|
|name2|     2|            4|
|name3|     3|            6|
|name4|     4|            8|
+-----+------+-------------+

As expected, we have the original dataset along with the new column, which is simply double the integer values in the original dataset. If you observe, this dataset is of type Row, that is, Dataset<Row>. This is the default type we get back from withColumn(). If you fancy, you can create another POJO class for this, as we did for the first Dataset; a sketch of what that first POJO might look like follows.
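The FileInputLine bean passed to Encoders.bean() above isn't listed in the post. As a rough sketch (the field types are an assumption): both fields are kept as Strings because the CSV is read without an explicit schema, which is also why the doubling UDF calls Integer.parseInt().

import java.io.Serializable;

// Hypothetical sketch of the FileInputLine bean; the actual class
// isn't shown in the post. Encoders.bean() needs a public class with
// a no-arg constructor and getters/setters matching the column names.
public class FileInputLine implements Serializable {

    private String name;
    private String number;

    public FileInputLine() { }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getNumber() { return number; }
    public void setNumber(String number) { this.number = number; }
}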
But I wanted to show examples of both cases. Anyway, now that we have the expected dataset with a new column after calling the first UDF, let's call the second UDF on this new dataset:

public static final String UPPERCASE_NAME_COLUMN_NAME = "nameUppercase";
public static final String NAME_COLUMN_NAME = "name";

Dataset<Row> upperCaseColumnDataset = doubledColumnDataset.withColumn(UPPERCASE_NAME_COLUMN_NAME,
        callUDF(COLUMN_UPPERCASE_UDF_NAME, col(NAME_COLUMN_NAME)));

As usual, we'll print this dataset and verify that we have the new column:

upperCaseColumnDataset.show();

We'll see the following table:

+-----+------+-------------+-------------+
| name|number|doubledNumber|nameUppercase|
+-----+------+-------------+-------------+
|name1|     1|            2|        NAME1|
|name2|     2|            4|        NAME2|
|name3|     3|            6|        NAME3|
|name4|     4|            8|        NAME4|
+-----+------+-------------+-------------+

That's pretty much it. We now know how to register a UDF, how to call a UDF, and how to pass parameters to a UDF. In the next post, we'll see how to pass literals to a UDF, why we need to pass literals, what could cause exceptions (such as the "Task not serializable" exception), and much more. By the way, if you try to run this particular program on a cluster, it might fail. We'll discuss that as well in the next post.

If you're interested in a complete working project with these code snippets, have a look at my POC project on GitHub.
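One closing note, not from the original post: since UDFs are a black box to Spark's optimizer (as mentioned at the top), it's worth knowing that these two particular transformations don't strictly need UDFs at all. A rough sketch of the built-in-function equivalent, assuming the same inputFileDataset and the static imports org.apache.spark.sql.functions.col and org.apache.spark.sql.functions.upper:

// Sketch only: the same two derived columns built with Spark's
// built-in column expressions instead of UDFs, so Catalyst can
// optimize them.
Dataset<Row> builtInDataset = inputFileDataset
        .withColumn("doubledNumber", col("number").cast("int").multiply(2))
        .withColumn("nameUppercase", upper(col("name")));

builtInDataset.show();   // should print the same four-column table as above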
Comments

Hi Sunny, nice post. Can you please provide me with the link to your next post where you have provided the approach to run this program in cluster mode? I am trying to run this program in a cluster and am getting a ClassCastException similar to the one reported in https://issues.apache.org/jira/browse/SPARK-19938. The application works fine if I don't use any UDFs in the code. Thanks!!
Hi Shrikant! I haven't written about how you can run it in a cluster. It should work without changes, as our production code is still running to this day. But if you can tell me what the issue is, maybe I can try and help. You can connect with me on Twitter (link at the top right corner of this page), so that we don't have to discuss that stuff out here in public.