Extending Spark with Extension Methods in Scala: Fun with Implicits

The sample Jupyter Scala notebook described in this blog can be downloaded from https://github.com/hdinsight/spark-jupyter-notebooks/blob/master/Scala/ScalaExtensionMethod.ipynb


Extension methods are a programming language construct that lets a developer add methods to an object after the original object has already been compiled. They are useful when only the compiled object is available, or when the developer cannot or does not want to modify its source code. Scala supports extension methods through implicits, which we use in this example to extend the Spark DataFrame with a method that saves it to an Azure SQL table. Although the example is presented as a complete Jupyter notebook that can be run on HDInsight clusters, the purpose of this blog is to show Spark developers a way to ship JARs that extend Spark with new functionality.

To start with this example, download the Microsoft JDBC Driver JAR available at https://www.microsoft.com/en-us/download/details.aspx?id=11774 and place it in a known folder in the default container of the default storage account of the HDInsight cluster. Then configure Jupyter to use it as an additional JAR. To learn how to deploy an Azure HDInsight Linux Spark cluster and launch a Jupyter notebook, refer to the Azure article at https://azure.microsoft.com/en-us/documentation/articles/hdinsight-apache-spark-jupyter-spark-sql/

[Image: ExtendingSpark-1]

Define a case class Rectangle to hold the data used to create the DataFrame for this example. In practice, the DataFrame can come from any data source that Spark supports.

[Image: ExtendingSpark-2]
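As a rough idea of what such a case class might look like, here is a minimal sketch. The field names and types are assumptions made for illustration; the original notebook's definition is not reproduced in this text.

```scala
// Hypothetical shape of the Rectangle record; field names and types are assumed.
case class Rectangle(name: String, length: Double, width: Double)
```

When a DataFrame is created from a sequence of case class instances, Spark derives the column names and types from the fields, so the table columns later in the walkthrough would mirror whatever fields are chosen here.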

Define a simple utility class that generates the Azure SQL database connection string from the fully qualified domain name of the Azure SQL server, the database name, the database username, and the database password.

[Image: ExtendingSpark-3]
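A utility of this kind could look roughly like the sketch below. The object name AzureSqlConnectionString and the trailing options (encrypt, trustServerCertificate, loginTimeout) are assumptions; only the four inputs come from the description above.

```scala
// Hypothetical utility; the name and the option list are assumptions.
object AzureSqlConnectionString {
  // Builds a JDBC URL for the Microsoft SQL Server driver from the
  // server FQDN, database name, username, and password.
  def apply(server: String, database: String, user: String, password: String): String =
    s"jdbc:sqlserver://$server:1433;database=$database;user=$user;password=$password;" +
      "encrypt=true;trustServerCertificate=false;loginTimeout=30;"
}
```

Calling AzureSqlConnectionString with a server name of the form myserver.database.windows.net (a placeholder) and the database credentials would then return a string that can be passed to java.sql.DriverManager.getConnection.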

Replace the entries marked below with the correct database parameters, then create the Azure SQL database connection string.

[Image: ExtendingSpark-4]

Create the Azure SQL table that corresponds to the case class Rectangle defined earlier. Azure SQL requires a clustered index on a table that has no primary key. This part of the code runs once on the driver.

[Image: ExtendingSpark-5]
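The table creation step might be sketched as follows, using plain JDBC. The column names and SQL types assume a hypothetical Rectangle(name, length, width), and the object name, index name, and helper names are all illustrative.

```scala
import java.sql.DriverManager

// Hypothetical helper; column names and types assume Rectangle(name, length, width).
object CreateRectangleTable {
  // DDL for a table whose columns mirror the assumed case class fields.
  def createTableSql(table: String): String =
    s"CREATE TABLE $table (name NVARCHAR(100), length FLOAT, width FLOAT)"

  // Clustered index, needed because the table has no primary key.
  def createIndexSql(table: String): String =
    s"CREATE CLUSTERED INDEX idx_${table}_name ON $table (name)"

  // Runs both statements once over a single JDBC connection (driver side).
  def run(connectionString: String, table: String): Unit = {
    val connection = DriverManager.getConnection(connectionString)
    try {
      val statement = connection.createStatement()
      try {
        statement.executeUpdate(createTableSql(table))
        statement.executeUpdate(createIndexSql(table))
      } finally statement.close()
    } finally connection.close()
  }
}
```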

The DataFrameExtension object is the crux of this blog. It defines a class ExtendedDataFrame that takes the DataFrame object in its constructor and defines a method saveToAzureSql that takes the Azure SQL database connection string and the Azure SQL table name as parameters.

This method first builds a comma-separated string from the column names of the DataFrame, which becomes the column list of the Azure SQL insert statement. It then builds a format string based on the data types of the DataFrame columns, which is used to render each row of the DataFrame as one entry in the values clause of the same insert statement. The rows in each partition of the DataFrame are then grouped into batches of 1000 records, and each batch is formatted into a single multiple-row insert statement. The insert statement is executed through the JDBC executeUpdate method to actually land the data in the Azure SQL table.

An implicit conversion named extendedDataFrame wraps a DataFrame in an ExtendedDataFrame, which lets the compiler resolve the saveToAzureSql method when it is called on a DataFrame object in the application code. For basic details of Scala implicit conversions refer to https://docs.scala-lang.org/tutorials/tour/implicit-conversions.html

[Image: ExtendingSpark-6]
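The structure described above could be sketched roughly as follows. This is not the notebook's code: instead of a type-driven format string it renders each value with a small hypothetical helper, sqlLiteral, which handles only nulls, strings, booleans, and numbers. The helper insertStatement is likewise an illustrative name.

```scala
import java.sql.DriverManager
import scala.language.implicitConversions
import org.apache.spark.sql.DataFrame

object DataFrameExtension {

  // Hypothetical helper: renders one value as a SQL literal.
  // Strings are quoted, with embedded single quotes doubled.
  def sqlLiteral(value: Any): String = value match {
    case null       => "NULL"
    case s: String  => "N'" + s.replace("'", "''") + "'"
    case b: Boolean => if (b) "1" else "0"
    case other      => other.toString
  }

  // Hypothetical helper: builds one multiple-row INSERT for a batch of rows.
  def insertStatement(table: String, columns: Seq[String], rows: Seq[Seq[Any]]): String = {
    val columnList = columns.mkString(", ")
    val values = rows.map(_.map(sqlLiteral).mkString("(", ", ", ")")).mkString(", ")
    s"INSERT INTO $table ($columnList) VALUES $values"
  }

  class ExtendedDataFrame(dataFrame: DataFrame) {
    def saveToAzureSql(connectionString: String, table: String): Unit = {
      // Copy the column names into a local value so the closure below
      // does not need to capture the DataFrame itself.
      val columns = dataFrame.columns.toSeq
      dataFrame.rdd.foreachPartition { partition =>
        // One connection per partition, opened on the executor.
        val connection = DriverManager.getConnection(connectionString)
        try {
          val statement = connection.createStatement()
          partition.grouped(1000).foreach { batch =>
            statement.executeUpdate(insertStatement(table, columns, batch.map(_.toSeq)))
          }
        } finally connection.close()
      }
    }
  }

  // The implicit conversion that makes saveToAzureSql callable on a DataFrame.
  implicit def extendedDataFrame(dataFrame: DataFrame): ExtendedDataFrame =
    new ExtendedDataFrame(dataFrame)
}
```

The batch size of 1000 is consistent with SQL Server's limit of 1000 rows in a single VALUES list. Building statements by string concatenation keeps the sketch close to the description, but a java.sql.PreparedStatement with batched parameters would be a safer choice for untrusted data.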

To test the implicit conversion rule declared above, create a list of Rectangle objects.

[Image: ExtendingSpark-7]

Use the hiveContext already instantiated by the Jupyter Scala notebook to create a DataFrame out of the list of Rectangle objects.

[Image: ExtendingSpark-8]
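The two preceding steps, building the list and turning it into a DataFrame, might look like the sketch below. The Rectangle fields, the sample values, and the names RectangleData and toDataFrame are assumptions. The function takes a SQLContext so that the notebook's hiveContext, which is a SQLContext, can be passed in.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical fields; the original definition is not reproduced here.
case class Rectangle(name: String, length: Double, width: Double)

object RectangleData {
  // Sample data invented for illustration.
  val rectangles: Seq[Rectangle] = Seq(
    Rectangle("small", 1.0, 2.0),
    Rectangle("medium", 3.0, 4.0),
    Rectangle("large", 5.0, 6.0)
  )

  // Spark derives the schema (name, length, width) from the case class.
  def toDataFrame(sqlContext: SQLContext): DataFrame =
    sqlContext.createDataFrame(rectangles)
}
```

In the notebook this would be invoked as RectangleData.toDataFrame(hiveContext).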

Import the DataFrameExtension object declared above and call the saveToAzureSql method directly on the DataFrame, as if the method were declared in the DataFrame class itself. This part of the code runs on the executors.

[Image: ExtendingSpark-9]
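The mechanism behind this call can be seen without Spark. In the minimal sketch below, Report is a stand-in for a compiled class that cannot be modified (it plays the role of DataFrame), and all names are hypothetical.

```scala
import scala.language.implicitConversions

// Stand-in for a compiled class we cannot modify.
final class Report(val lines: Seq[String])

object ReportExtension {
  // Wrapper that carries the extra method.
  class ExtendedReport(report: Report) {
    def lineCount: Int = report.lines.size
  }

  // Implicit conversion from the original type to the wrapper.
  implicit def extendedReport(report: Report): ExtendedReport =
    new ExtendedReport(report)
}

object ReportDemo {
  // Bringing the conversion into scope is what enables the call below.
  import ReportExtension._

  // Report has no lineCount member, so the compiler rewrites this
  // to extendedReport(report).lineCount.
  def count(report: Report): Int = report.lineCount
}
```

Without the import, report.lineCount does not compile, which is why the DataFrameExtension object has to be imported before saveToAzureSql can be called on a DataFrame.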

Verify that the data is actually saved in the Azure SQL table.

[Image: ExtendingSpark-10]
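One simple way to check the result, sketched here with plain JDBC, is to count the rows in the table. The object and method names are hypothetical, and the notebook may verify the data differently.

```scala
import java.sql.DriverManager

// Hypothetical verification helper.
object VerifySave {
  def countSql(table: String): String = s"SELECT COUNT(*) FROM $table"

  // Returns the number of rows currently stored in the table.
  def rowCount(connectionString: String, table: String): Long = {
    val connection = DriverManager.getConnection(connectionString)
    try {
      val resultSet = connection.createStatement().executeQuery(countSql(table))
      resultSet.next() // COUNT(*) always yields exactly one row
      resultSet.getLong(1)
    } finally connection.close()
  }
}
```

If the table was empty before the save, the count should equal the number of Rectangle objects in the list.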

 

[Contributed by Arijit Tarafdar]