The sample Jupyter Scala notebook described in this blog can be downloaded from https://github.com/hdinsight/spark-jupyter-notebooks/blob/master/Scala/ScalaExtensionMethod.ipynb
Extension methods are programming language constructs that let you add methods to an existing type after that type has already been compiled. They are useful when a developer wants to add capabilities to an existing type but only the compiled code is available, the source code is not accessible, or the developer does not want to modify it. Scala supports extension methods through implicits. In this post we use them to extend the Spark DataFrame with a method that saves it to an Azure SQL table. The example is a complete Jupyter notebook that runs on HDInsight clusters, but its main purpose is to show Spark developers one way to ship JARs that extend Spark with new functionality.
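To make the mechanism concrete before Spark is involved, here is a minimal, Spark-free sketch of the same pattern the notebook uses for DataFrame. A wrapper class carries the new method, and an implicit conversion rule tells the compiler how to reach that wrapper when the method is not found on the original type. The names ExtendedString, StringExtension and wordCount are illustrative only.

```scala
import scala.language.implicitConversions

// Wrapper that carries the new method; java.lang.String itself is untouched.
class ExtendedString(s: String) {
  // Counts whitespace-separated words.
  def wordCount: Int = s.trim.split("\\s+").count(_.nonEmpty)
}

object StringExtension {
  // Implicit conversion rule: because String has no wordCount member,
  // the compiler rewrites s.wordCount to extendedString(s).wordCount.
  implicit def extendedString(s: String): ExtendedString = new ExtendedString(s)
}

object ExtensionDemo {
  def main(args: Array[String]): Unit = {
    import StringExtension._
    println("Spark on HDInsight".wordCount) // prints 3
  }
}
```

String is never recompiled. The caller only needs import StringExtension._ in scope. An implicit class expresses the wrapper and the conversion in one declaration and expands to the same two pieces.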
To start with this example, download the Microsoft JDBC Driver JAR from https://www.microsoft.com/en-us/download/details.aspx?id=11774 and place it in a known folder in the default container of the HDInsight cluster's default storage account. Then configure Jupyter to use that JAR. To learn how to deploy an Azure HDInsight Linux Spark cluster and launch a Jupyter notebook, refer to the Azure article at https://azure.microsoft.com/en-us/documentation/articles/hdinsight-apache-spark-jupyter-spark-sql/
Define a case class Rectangle to hold the data used to create the DataFrame for this example. In practice, the DataFrame can come from any data source that Spark supports.
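For illustration, here is a hypothetical Rectangle with a name and integer dimensions, together with a small in-memory list of instances. The fields are assumptions because the notebook's actual definition is not reproduced here. Any Spark-supported source could replace the list.

```scala
// Hypothetical fields; the notebook's actual Rectangle may differ.
case class Rectangle(name: String, width: Int, height: Int)

object RectangleData {
  // Sample rows that will later become a DataFrame.
  val rectangles: Seq[Rectangle] = Seq(
    Rectangle("square", 2, 2),
    Rectangle("wide", 8, 3),
    Rectangle("tall", 3, 9)
  )
}
```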
Define a simple utility class that builds the Azure SQL database connection string from the Azure SQL server's fully qualified domain name, the database name, the database username and the database password.
Replace the placeholder database parameters in the notebook with your own values, then create the Azure SQL database connection string.
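A minimal sketch of such a utility follows. It assumes the JDBC URL format that the Azure portal shows for Azure SQL Database. The object and method names are hypothetical. The trailing options (encryption, certificate host name, login timeout) are common defaults and are not taken from the notebook.

```scala
// Hypothetical helper; names are illustrative, not from the notebook.
object AzureSqlConnection {
  // Builds a Microsoft JDBC driver URL for an Azure SQL database.
  // Values containing ';' are not escaped in this sketch.
  def connectionString(serverFqdn: String,
                       database: String,
                       user: String,
                       password: String): String =
    s"jdbc:sqlserver://$serverFqdn:1433;database=$database;user=$user;password=$password;" +
      "encrypt=true;trustServerCertificate=false;" +
      "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
}
```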
Create the Azure SQL table that corresponds to the case class Rectangle defined earlier. Azure SQL requires a clustered index on a table that has no primary key. This part of the code runs once, on the driver.
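Table creation can be sketched as two DDL statements that the driver issues once over a single JDBC connection. The table name Rectangles, its columns and the index name are assumptions. They match a hypothetical Rectangle(name: String, width: Int, height: Int), and the notebook's actual schema may differ.

```scala
import java.sql.DriverManager

// Hypothetical table definition matching an assumed Rectangle case class.
object CreateRectangleTable {
  val tableName = "Rectangles"

  // No primary key, so a clustered index is created explicitly.
  val ddl: Seq[String] = Seq(
    s"CREATE TABLE $tableName (Name NVARCHAR(100) NOT NULL, Width INT NOT NULL, Height INT NOT NULL)",
    s"CREATE CLUSTERED INDEX IX_${tableName}_Name ON $tableName (Name)"
  )

  // Runs on the driver only; needs the Microsoft JDBC driver on the classpath.
  def run(connectionString: String): Unit = {
    val connection = DriverManager.getConnection(connectionString)
    try {
      val statement = connection.createStatement()
      ddl.foreach(sql => statement.executeUpdate(sql))
    } finally {
      connection.close()
    }
  }
}
```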
The DataFrameExtension object is the crux of this post. It defines a class ExtendedDataFrame that takes a DataFrame in its constructor and defines a method saveToAzureSql, which takes the Azure SQL database connection string and the Azure SQL table name as parameters. The method first builds a comma-separated string of the DataFrame's column names, which becomes the column list of the Azure SQL INSERT statement. It then builds a format string from the data types of the DataFrame's columns. Each DataFrame row is formatted with this string to produce one entry in the VALUES list of the same INSERT statement. The rows of each DataFrame partition are grouped into batches of 1000 records, and each batch is formatted into a single multiple-row INSERT statement. That statement is executed through the executeUpdate method of a JDBC statement created from the Azure SQL connection, which lands the data in the Azure SQL table. The object also declares an implicit conversion rule named extendedDataFrame. When application code calls saveToAzureSql on a DataFrame, the compiler uses this rule to wrap the DataFrame in an ExtendedDataFrame and call the method on the wrapper. For the basics of Scala implicit conversions, refer to http://docs.scala-lang.org/tutorials/tour/implicit-conversions.html
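The statement-building logic can be separated from Spark and JDBC so that it can be checked locally. The sketch below derives the column list and the per-row format string, groups rows into batches, and emits one multiple-row INSERT per batch. InsertStatementBuilder is hypothetical, and it uses a simplified two-way ColumnKind in place of Spark's DataType. It doubles single quotes inside quoted values but does not handle nulls or booleans. A batch size of 1000 is also the largest number of rows SQL Server accepts in a single VALUES list.

```scala
// Hypothetical helper isolating SQL text generation from Spark and JDBC.
object InsertStatementBuilder {
  sealed trait ColumnKind
  case object Quoted extends ColumnKind   // strings, dates, timestamps
  case object Unquoted extends ColumnKind // numeric types

  // Yields one multi-row INSERT per batch of at most batchSize rows.
  def build(table: String,
            columns: Seq[(String, ColumnKind)],
            rows: Iterator[Seq[Any]],
            batchSize: Int = 1000): Iterator[String] = {
    // Column identifiers, e.g. "Name, Width".
    val columnList = columns.map(_._1).mkString(", ")
    // Per-row format string, e.g. "('%s', %s)".
    val rowFormat = columns.map {
      case (_, Quoted)   => "'%s'"
      case (_, Unquoted) => "%s"
    }.mkString("(", ", ", ")")

    rows.grouped(batchSize).map { batch =>
      val values = batch.map { row =>
        // Double single quotes so a value cannot terminate its literal.
        val cells = row.map(v => s"$v".replace("'", "''"))
        rowFormat.format(cells: _*)
      }.mkString(", ")
      s"INSERT INTO $table ($columnList) VALUES $values"
    }
  }
}
```

For example, 2,500 rows produce three statements: two with 1,000 rows and one with 500.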
To test the implicit conversion rule declared above, create a list of Rectangle objects and use the hiveContext that the Jupyter Scala notebook has already instantiated to create a DataFrame from that list. Then import the DataFrameExtension object declared above and call the saveToAzureSql method directly on the DataFrame, as if the method were declared inside the DataFrame class. The inserts themselves run on the executors.
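Here is a minimal end-to-end sketch that combines the extension and the call site described above. It assumes a Spark 1.x-style SQLContext (HiveContext is a subclass of it), the Microsoft JDBC driver on the executors' classpath, and the hypothetical Rectangle schema and Rectangles table. SaveRectangles is an illustrative name. The write goes through df.rdd.foreachPartition, which opens one JDBC connection per partition, so partitions are inserted in parallel on the executors. Quoting is simplified: nulls and booleans are not handled.

```scala
import java.sql.DriverManager
import scala.language.implicitConversions
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.{DateType, StringType, TimestampType}

// Hypothetical schema; the notebook's actual Rectangle fields may differ.
case class Rectangle(name: String, width: Int, height: Int)

object DataFrameExtension {

  // Wrapper that carries the new method; DataFrame itself is not modified.
  class ExtendedDataFrame(df: DataFrame) {

    def saveToAzureSql(connectionString: String, tableName: String): Unit = {
      // Column identifiers for the INSERT statement, e.g. "name, width, height".
      val columns = df.schema.fieldNames.mkString(", ")
      // Per-row format string: quote string-like columns, leave others bare.
      val rowFormat = df.schema.fields.map { field =>
        field.dataType match {
          case StringType | DateType | TimestampType => "'%s'"
          case _                                     => "%s"
        }
      }.mkString("(", ", ", ")")

      // Runs on the executors: one JDBC connection per partition.
      df.rdd.foreachPartition { rows =>
        val connection = DriverManager.getConnection(connectionString)
        try {
          val statement = connection.createStatement()
          rows.grouped(1000).foreach { batch =>
            val values = batch.map { row =>
              // Double single quotes so a value cannot terminate its literal.
              val cells = row.toSeq.map(v => s"$v".replace("'", "''"))
              rowFormat.format(cells: _*)
            }.mkString(", ")
            statement.executeUpdate(s"INSERT INTO $tableName ($columns) VALUES $values")
          }
        } finally {
          connection.close()
        }
      }
    }
  }

  // Implicit conversion rule: df.saveToAzureSql(...) compiles to
  // extendedDataFrame(df).saveToAzureSql(...).
  implicit def extendedDataFrame(df: DataFrame): ExtendedDataFrame =
    new ExtendedDataFrame(df)
}

// Hypothetical call site mirroring the notebook's test cell.
object SaveRectangles {
  import DataFrameExtension._

  def run(sqlContext: SQLContext, connectionString: String): Unit = {
    val rectangles = Seq(Rectangle("square", 2, 2), Rectangle("wide", 8, 3))
    val df = sqlContext.createDataFrame(rectangles)
    // Resolves through the implicit conversion rule above.
    df.saveToAzureSql(connectionString, "Rectangles")
  }
}
```

In the notebook, the equivalent call would pass the existing hiveContext and the connection string built earlier.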
Verify that the data is actually saved in the Azure SQL table.
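One way to check the result is to count the table's rows over JDBC from the driver and compare that number with the DataFrame's count(). The VerifyRectangles object and the table name are hypothetical.

```scala
import java.sql.DriverManager

// Hypothetical verification helper; runs on the driver.
object VerifyRectangles {
  def countQuery(table: String): String = s"SELECT COUNT(*) FROM $table"

  // Returns the number of rows currently stored in the table.
  def count(connectionString: String, table: String): Long = {
    val connection = DriverManager.getConnection(connectionString)
    try {
      val resultSet = connection.createStatement().executeQuery(countQuery(table))
      resultSet.next()
      resultSet.getLong(1)
    } finally {
      connection.close()
    }
  }
}
```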
[Contributed by Arijit Tarafdar]