共用方式為


適用於 Apache Spark 的 Azure 數據總管連接器

Apache Spark 是用於進行大規模資料處理的整合分析引擎。 Azure 資料總管是快速、完全受控的資料分析服務,可即時分析大量資料流。

適用於 Spark 的 Kusto 連接器是可在任何 Spark 叢集上執行的開放原始碼專案。 它會實作資料來源和資料接收器,以在 Azure 資料總管和 Spark 叢集之間移動資料。 使用 Azure 資料總管和 Apache Spark,您可以建置以數據驅動案例為目標的快速且可調整的應用程式。 例如,機器學習服務 (ML)、擷取-轉換-載入 (ETL) 和 Log Analytics。 透過連接器,Azure 資料總管會成為標準 Spark 來源和接收作業的有效資料存放區,例如寫入、讀取和 writeStream。

您可以透過佇列擷取或串流擷取寫入 Azure 資料總管。 從 Azure 數據總管讀取支援數據行剪除和述詞下推,以篩選 Azure 數據總管中的數據,減少已傳輸的數據量。

注意

如需使用適用於 Azure 數據總管的 Synapse Spark 連接器的詳細資訊,請參閱 使用適用於 Azure Synapse Analytics 的 Apache Spark 連線到 Azure 數據總管

本主題描述如何安裝和設定 Azure 數據總管 Spark 連接器,以及在 Azure 數據總管和 Apache Spark 叢集之間行動數據。

注意

雖然下列一些範例參考 Azure Databricks Spark 叢集,但 Azure 數據總管 Spark 連接器不會直接相依於 Databricks 或任何其他 Spark 散發。

必要條件

提示

Spark 2.3.x 版本也受到支援,但可能需要變更pom.xml相依性。

如何建置 Spark 連接器

從 2.3.0 版開始,我們引進了新成品識別碼以取代 spark-kusto-connector:kusto-spark_3.0_2.12,以 Spark 3.x 和 Scala 2.12 為目標。

注意

2.5.1 之前的版本無法再用於擷取至現有資料表,請更新為較新的版本。 此為選用步驟。 如果您使用預先建置的程式庫,例如 Maven,請參閱 Spark 叢集設定

建置必要條件

  1. 請參閱此來源以建置 Spark 連接器。

  2. 針對使用 Maven 專案定義的 Scala/Java 應用程式,請使用最新的成品連結您的應用程式。 在 Maven Central 上尋找最新的成品。

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. 如果您未使用預先建置的程式庫,您必須安裝相依性中列出的程式庫,包括下列 Kusto Java SDK 程式庫。 若要尋找要安裝的正確版本,請查看相關版本的 pom

    1. 若要建置 jar 並執行所有測試:

      mvn clean package -DskipTests
      
    2. 若要建置 jar、執行所有測試,並將 jar 安裝到本機 Maven 存放庫:

      mvn clean install -DskipTests
      

如需詳細資訊,請參閱連接器使用方式

Spark 叢集設定

注意

建議在執行下列步驟時使用最新的 Kusto Spark 連接器版本。

  1. 根據 Azure Databricks 叢集 Spark 3.0.1 和 Scala 2.12 設定下列 Spark 叢集設定:

    Databricks 叢集設定。

  2. 從 Maven 安裝最新的 spark-kusto-connector 程式庫:

    匯入程式庫。選取 Spark-Kusto-Connector。

  3. 驗證已安裝所有必要的程式庫:

    驗證已安裝程式庫。

  4. 若要使用 JAR 檔案進行安裝,請驗證已安裝其他相依性:

    新增相依性。

驗證

Kusto Spark 連接器可讓您使用下列其中一種方法,向 Microsoft Entra ID 進行驗證:

Microsoft Entra 應用程式驗證

Microsoft Entra 應用程式驗證是最簡單且最常見的驗證方法,建議用於 Kusto Spark 連接器。

  1. 透過 Azure CLI 登入您的 Azure 訂用帳戶。 然後在瀏覽器中進行驗證。

    az login
    
  2. 選擇用來託管主體的訂用帳戶。 當您有多個訂用帳戶時,需要此步驟。

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. 建立服務主體。 在此範例中,服務主體稱為 my-service-principal

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 從傳回的 JSON 資料中,複製 appIdpasswordtenant 供日後使用。

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

您已建立您的 Microsoft Entra 應用程式和服務主體。

Spark 連接器會使用下列 Entra 應用程式屬性進行驗證:

屬性 選項字串 描述
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra 應用程式 (用戶端) 識別碼。
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra 驗證授權單位。 Microsoft Entra 租用戶 (目錄) 識別碼。 選用 - 預設為 microsoft.com。 如需詳細資訊,請參閱 Microsoft Entra 授權單位
KUSTO_AAD_APP_SECRET kustoAadAppSecret Microsoft Entra 應用程式用戶端金鑰。
KUSTO_ACCESS_TOKEN kustoAccessToken 如果您已經有使用 Kusto 的存取權建立的 accessToken,則可以用來傳遞至連接器以及進行驗證。

注意

舊版 API 版本 (小於 2.0.0) 具有下列命名:"kustoAADClientID"、"kustoClientAADClientPassword"、"kustoAADAuthorityID"

Kusto 權限

根據您想要執行的 Spark 作業,授與 Kusto 端下列權限。

Spark 作業 權限
讀取 - 單一模式 讀取者
讀取 - 強制分散式模式 讀取者
寫入 - 具有 CreateTableIfNotExist 資料表建立選項的佇列模式 管理員
寫入 - 具有 FailIfNotExist 資料表建立選項的佇列模式 擷取器
寫入 - TransactionalMode 管理員

如需主體角色的詳細資訊,請參閱角色型存取控制。 如需管理安全性角色,請參閱安全性角色管理

Spark 接收器:寫入 Kusto

  1. 設定接收器參數:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. 以批次形式將 Spark DataFrame 寫入 Kusto 叢集:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    或使用簡化的語法:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. 寫入串流資料:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark 來源:從 Kusto 讀取

  1. 讀取少量資料時,定義資料查詢:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. 選用:如果提供暫時性 Blob 儲存體 (而非 Kusto),則會在呼叫者的責任下建立 Blob。 這包括佈建儲存體、輪替存取金鑰,以及刪除暫時性成品。 KustoBlobStorageUtils 模組包含協助程式函式,用於根據帳戶和容器座標和帳戶認證,或具有寫入、讀取和列出權限的完整 SAS URL 來刪除 Blob。 不再需要對應的 RDD 時,每個交易都會將暫時性 Blob 成品儲存在個別的目錄中。 此目錄會擷取為 Spark 驅動程式節點上所報告讀取交易資訊記錄的一部分。

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    在上述範例中,不會使用連接器介面存取金鑰保存庫;會使用運用 Databricks 秘密這種更為簡單的方法。

  3. 從 Kusto 讀取。

    • 如果提供暫時性 Blob 儲存體,請從 Kusto 讀取,如下所示:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • 如果 Kusto 提供暫時性 Blob 儲存體,請從 Kusto 讀取,如下所示:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)