Azure Data Explorer Connector for Apache Spark

Apache Spark is a unified analytics engine for large-scale data processing. Azure Data Explorer is a fast, fully managed data analytics service for real-time analysis on large volumes of data.

The Kusto connector for Spark is an open source project that can run on any Spark cluster. It implements a data source and a data sink for moving data between Azure Data Explorer and Spark clusters. Using Azure Data Explorer and Apache Spark, you can build fast and scalable applications targeting data-driven scenarios such as machine learning (ML), Extract-Transform-Load (ETL), and Log Analytics. With the connector, Azure Data Explorer becomes a valid data store for standard Spark source and sink operations, such as write, read, and writeStream.

You can write to Azure Data Explorer via queued ingestion or streaming ingestion. Reading from Azure Data Explorer supports column pruning and predicate pushdown, which filters the data in Azure Data Explorer, reducing the volume of transferred data.

Note

For information about working with the Synapse Spark connector for Azure Data Explorer, see Connect to Azure Data Explorer using Apache Spark for Azure Synapse Analytics.

This topic describes how to install and configure the Azure Data Explorer Spark connector and move data between Azure Data Explorer and Apache Spark clusters.

Note

Although some of the examples below refer to an Azure Databricks Spark cluster, the Azure Data Explorer Spark connector doesn't take direct dependencies on Databricks or any other Spark distribution.

Prerequisites

Tip

Spark 2.3.x versions are also supported, but may require some changes in pom.xml dependencies.

How to build the Spark connector

Starting with version 2.3.0, new artifact IDs replace spark-kusto-connector: kusto-spark_3.0_2.12 targets Spark 3.x and Scala 2.12.

Note

Versions prior to 2.5.1 no longer work for ingestion into an existing table; update to a later version. Building the connector yourself is optional. If you're using prebuilt libraries, for example, from Maven, see Spark cluster setup.

Build prerequisites

  1. Refer to this source for building the Spark Connector.

  2. For Scala/Java applications using Maven project definitions, link your application with the latest artifact. Find the latest artifact on Maven Central.

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. If you aren't using prebuilt libraries, you need to install the libraries listed in dependencies including the following Kusto Java SDK libraries. To find the right version to install, look in the relevant release's pom:

    1. To build the jar without running tests (omit -DskipTests to run all tests):

      mvn clean package -DskipTests
      
    2. To build the jar and install it to your local Maven repository without running tests (omit -DskipTests to run all tests):

      mvn clean install -DskipTests
      

For more information, see connector usage.
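For projects built with sbt rather than Maven, the coordinates from the Maven repository link in step 2 could be declared roughly as follows. This is a sketch under assumptions: sbt isn't covered by this article, and the version string is a placeholder that you need to replace with the latest version listed on Maven Central.

```scala
// build.sbt (assumption: an sbt project; this article itself describes Maven projects)

// Placeholder only: replace with the latest version listed on Maven Central.
val kustoSparkVersion = "<latest-version>"

// A single % is used because the artifact ID already carries the Spark and Scala suffix.
libraryDependencies += "com.microsoft.azure.kusto" % "kusto-spark_3.0_2.12" % kustoSparkVersion
```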

Spark cluster setup

Note

It's recommended to use the latest Kusto Spark connector release when performing the following steps.

  1. Configure the following Spark cluster settings, based on Azure Databricks cluster Spark 3.0.1 and Scala 2.12:

    (Screenshot: Databricks cluster settings.)

  2. Install the latest spark-kusto-connector library from Maven:

    (Screenshot: Import libraries. Select Spark-Kusto-Connector.)

  3. Verify that all required libraries are installed:

    (Screenshot: Verify libraries installed.)

  4. For installation using a JAR file, verify that other dependencies were installed:

    (Screenshot: Add dependencies.)

Authentication

Kusto Spark connector enables you to authenticate with Microsoft Entra ID using one of the following methods:

Microsoft Entra application authentication

Microsoft Entra application authentication is the simplest and most common authentication method and is recommended for the Kusto Spark connector.

  1. Sign in to your Azure subscription via Azure CLI. Then authenticate in the browser.

    az login
    
  2. Choose the subscription to host the principal. This step is needed when you have multiple subscriptions.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Create the service principal. In this example, the service principal is called my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. From the returned JSON data, copy the appId, password, and tenant for future use.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

You've created your Microsoft Entra application and service principal.

The Spark connector uses the following Entra app properties for authentication:

| Properties | Option String | Description |
|---|---|---|
| KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra application (client) identifier. |
| KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra authentication authority: the Microsoft Entra Directory (tenant) ID. Optional; defaults to microsoft.com. For more information, see Microsoft Entra authority. |
| KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra application key for the client. |
| KUSTO_ACCESS_TOKEN | kustoAccessToken | If you already have an access token that was created with access to Kusto, you can pass it to the connector for authentication. |

Note

Older API versions (less than 2.0.0) have the following naming: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID"
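As a rough illustration, the option strings listed above can be assembled into a plain Scala map and passed to the connector with `.options(...)`. The sketch below is not part of the connector: `KustoAuthOptions`, `fromServicePrincipal`, and `migrateLegacyKeys` are hypothetical helper names, and the one-to-one correspondence between the pre-2.0.0 names and the current names is an assumption based on the order in which they're listed.

```scala
// Hypothetical helpers; only the option strings themselves come from this article.
object KustoAuthOptions {
  // Maps the appId, password, and tenant values returned by
  // `az ad sp create-for-rbac` to the connector's option strings.
  def fromServicePrincipal(appId: String, password: String, tenant: String): Map[String, String] =
    Map(
      "kustoAadAppId" -> appId,
      "kustoAadAppSecret" -> password,
      "kustoAadAuthorityID" -> tenant)

  // Assumed correspondence between pre-2.0.0 option names and current names.
  private val legacyToCurrent: Map[String, String] = Map(
    "kustoAADClientID" -> "kustoAadAppId",
    "kustoClientAADClientPassword" -> "kustoAadAppSecret",
    "kustoAADAuthorityID" -> "kustoAadAuthorityID")

  // Renames legacy keys and leaves all other keys untouched.
  def migrateLegacyKeys(options: Map[String, String]): Map[String, String] =
    options.map { case (key, value) => legacyToCurrent.getOrElse(key, key) -> value }
}

// Usage with placeholder values
val authConf = KustoAuthOptions.fromServicePrincipal("<appId>", "<password>", "<tenant>")
val migrated = KustoAuthOptions.migrateLegacyKeys(
  Map("kustoAADClientID" -> "id", "kustoClientAADClientPassword" -> "secret"))
```

Keeping the secret out of source code, for example by reading it from a secret store as the later examples do with Databricks secrets, is usually preferable to hard-coding it.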

Kusto privileges

Grant the following privileges on the Kusto side, based on the Spark operation you want to perform.

| Spark operation | Privileges |
|---|---|
| Read - Single Mode | Reader |
| Read - Force Distributed Mode | Reader |
| Write - Queued Mode with CreateTableIfNotExist table create option | Admin |
| Write - Queued Mode with FailIfNotExist table create option | Ingestor |
| Write - TransactionalMode | Admin |

For more information on principal roles, see role-based access control. For managing security roles, see security roles management.
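If you want to document or check the required privilege in code before starting a job, the table above can be encoded as a simple lookup. This is only a sketch: `SparkOperation`, its case objects, and `requiredPrivilege` are hypothetical names, not part of the connector, and the function doesn't query Kusto for the principal's actual roles.

```scala
// Hypothetical encoding of the privileges table; not a connector API.
sealed trait SparkOperation
case object ReadSingleMode extends SparkOperation
case object ReadForceDistributedMode extends SparkOperation
case object WriteQueuedCreateTableIfNotExist extends SparkOperation
case object WriteQueuedFailIfNotExist extends SparkOperation
case object WriteTransactionalMode extends SparkOperation

object KustoPrivileges {
  // Returns the privilege listed in the table for the given operation.
  def requiredPrivilege(op: SparkOperation): String = op match {
    case ReadSingleMode | ReadForceDistributedMode => "Reader"
    case WriteQueuedCreateTableIfNotExist | WriteTransactionalMode => "Admin"
    case WriteQueuedFailIfNotExist => "Ingestor"
  }
}

// Usage
val needed = KustoPrivileges.requiredPrivilege(WriteQueuedFailIfNotExist)
println(s"Grant the service principal the $needed privilege before writing.")
```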

Spark sink: writing to Kusto

  1. Set up sink parameters:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Write Spark DataFrame to Kusto cluster as batch:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Or use the simplified syntax:

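    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    // Authentication options passed to the simplified writer
    val conf: Map[String, String] = Map(
      KustoSinkOptions.KUSTO_AAD_APP_ID -> appId,
      KustoSinkOptions.KUSTO_AAD_APP_SECRET -> appKey)
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)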
    
  3. Write streaming data:

    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint location for the streaming query
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source (df must be a streaming DataFrame)
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) // conf is expected to include the cluster, database, table, and authentication options
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    
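The write examples above assume that a DataFrame named `df` already exists. As a minimal sketch for experimenting, the following creates a small batch DataFrame and a streaming DataFrame. The column names ColA and ColB are borrowed from the read examples later in this article, and the local Spark session, the row count, and the name `streamingDf` are assumptions for illustration; on Azure Databricks a `spark` session is already provided.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: running outside a notebook, so a local session is created.
// getOrCreate() returns the existing session if one is already active.
val spark: SparkSession = SparkSession.builder()
  .appName("KustoSinkSketch")
  .master("local[*]") // local mode, for experimentation only
  .getOrCreate()
import spark.implicits._

// Batch input: 100 rows of (String, Int) for the batch write in step 2.
val df: DataFrame = (1 to 100).map(i => (s"row-$i", i)).toDF("ColA", "ColB")

// Streaming input: Spark's built-in "rate" source emits (timestamp, value) rows,
// reshaped here into the same two columns.
val streamingDf: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .selectExpr("concat('row-', cast(value as string)) as ColA", "cast(value as int) as ColB")
```

For the streaming write in step 3, use `streamingDf` in place of `df`, because `writeStream` is only available on a streaming DataFrame.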

Spark source: reading from Kusto

  1. When reading small amounts of data, define the data query:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Optional: If you provide the transient blob storage (rather than Kusto), you're responsible for the blobs. This includes provisioning the storage, rotating access keys, and deleting transient artifacts. The KustoBlobStorageUtils module contains helper functions for deleting blobs based on either account and container coordinates and account credentials, or a full SAS URL with write, read, and list permissions. Each transaction stores transient blob artifacts in a separate directory, which can be deleted when the corresponding RDD is no longer needed. This directory is captured as part of the read-transaction information logs reported on the Spark Driver node.

    // Use either container/account-key/account name, or container SAS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    In the example above, the Key Vault isn't accessed using the connector interface; a simpler method of using the Databricks secrets is used.

  3. Read from Kusto.

    • If you provide the transient blob storage, read from Kusto as follows:

      // storageSas is the container SAS URL; uncomment its definition in the previous step
      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey,
        KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • If Kusto provides the transient blob storage, read from Kusto as follows:

      // Current option names; KUSTO_AAD_CLIENT_ID and KUSTO_AAD_CLIENT_PASSWORD are the pre-2.0.0 names
      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
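To make column pruning and predicate pushdown more concrete, the following sketch builds a KQL query that is conceptually equivalent to the filter chain in the read examples above: the predicates become a `where` clause and the selected column becomes a `project` clause, so only matching rows and the needed column leave Azure Data Explorer. This is an illustration only. `PushdownSketch` and `toKql` are hypothetical names, and the query that the connector actually generates may differ.

```scala
// Hypothetical illustration; not the connector's internal query generation.
object PushdownSketch {
  // Builds a KQL query that filters rows and keeps only the selected columns.
  def toKql(table: String, predicates: Seq[String], columns: Seq[String]): String = {
    val where = if (predicates.isEmpty) "" else s" | where ${predicates.mkString(" and ")}"
    val project = if (columns.isEmpty) "" else s" | project ${columns.mkString(", ")}"
    s"$table$where$project"
  }
}

// The three filters and the single selected column from the example above.
// startswith_cs is the case-sensitive KQL operator, matching Spark's startsWith.
val kql = PushdownSketch.toKql(
  "ReallyBigTable",
  Seq("ColA startswith_cs \"row-2\"", "ColB > 12", "ColB <= 21"),
  Seq("ColA"))
println(kql)
```

Without pushdown, the whole table would be transferred to Spark and filtered there; with it, the filtering and column selection happen on the Azure Data Explorer side.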