適用於 Apache Spark 的 Azure 數據總管連接器
Apache Spark 是用於進行大規模資料處理的整合分析引擎。 Azure 資料總管是快速、完全受控的資料分析服務,可即時分析大量資料流。
適用於 Spark 的 Kusto 連接器是可在任何 Spark 叢集上執行的開放原始碼專案。 它會實作資料來源和資料接收器,以在 Azure 資料總管和 Spark 叢集之間移動資料。 使用 Azure 資料總管和 Apache Spark,您可以建置以數據驅動案例為目標的快速且可調整的應用程式。 例如,機器學習服務 (ML)、擷取-轉換-載入 (ETL) 和 Log Analytics。 透過連接器,Azure 資料總管會成為標準 Spark 來源和接收作業的有效資料存放區,例如寫入、讀取和 writeStream。
您可以透過佇列擷取或串流擷取寫入 Azure 資料總管。 從 Azure 數據總管讀取支援數據行剪除和述詞下推,以篩選 Azure 數據總管中的數據,減少已傳輸的數據量。
注意
如需使用適用於 Azure 數據總管的 Synapse Spark 連接器的詳細資訊,請參閱 使用適用於 Azure Synapse Analytics 的 Apache Spark 連線到 Azure 數據總管。
本主題描述如何安裝和設定 Azure 數據總管 Spark 連接器,以及在 Azure 數據總管和 Apache Spark 叢集之間行動數據。
注意
雖然下列一些範例參考 Azure Databricks Spark 叢集,但 Azure 數據總管 Spark 連接器不會直接相依於 Databricks 或任何其他 Spark 散發。
必要條件
- Azure 訂用帳戶。 建立免費的 Azure 帳戶。
- Azure 資料總管叢集和資料庫。 建立叢集和資料庫。
- Spark 叢集
- 安裝連接器程式庫:
- Maven 3.x 已安裝
提示
Spark 2.3.x 版本也受到支援,但可能需要變更pom.xml相依性。
如何建置 Spark 連接器
從 2.3.0 版開始,我們引進了新成品識別碼以取代 spark-kusto-connector:kusto-spark_3.0_2.12,以 Spark 3.x 和 Scala 2.12 為目標。
注意
2.5.1 之前的版本無法再用於擷取至現有資料表,請更新為較新的版本。 此為選用步驟。 如果您使用預先建置的程式庫,例如 Maven,請參閱 Spark 叢集設定。
建置必要條件
請參閱此來源以建置 Spark 連接器。
針對使用 Maven 專案定義的 Scala/Java 應用程式,請使用最新的成品連結您的應用程式。 在 Maven Central 上尋找最新的成品。
For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
如果您未使用預先建置的程式庫,您必須安裝相依性中列出的程式庫,包括下列 Kusto Java SDK 程式庫。 若要尋找要安裝的正確版本,請查看相關版本的 pom:
若要建置 jar 並執行所有測試:
mvn clean package -DskipTests
若要建置 jar、執行所有測試,並將 jar 安裝到本機 Maven 存放庫:
mvn clean install -DskipTests
如需詳細資訊,請參閱連接器使用方式。
Spark 叢集設定
注意
建議在執行下列步驟時使用最新的 Kusto Spark 連接器版本。
根據 Azure Databricks 叢集 Spark 3.0.1 和 Scala 2.12 設定下列 Spark 叢集設定:
從 Maven 安裝最新的 spark-kusto-connector 程式庫:
驗證已安裝所有必要的程式庫:
若要使用 JAR 檔案進行安裝,請驗證已安裝其他相依性:
驗證
Kusto Spark 連接器可讓您使用下列其中一種方法,向 Microsoft Entra ID 進行驗證:
- Microsoft Entra 應用程式
- Microsoft Entra 存取權杖
- 裝置驗證 (適用於非生產案例)
- Azure 金鑰保存庫 若要存取金鑰保存庫資源,請安裝 azure-keyvault 套件並提供應用程式認證。
Microsoft Entra 應用程式驗證
Microsoft Entra 應用程式驗證是最簡單且最常見的驗證方法,建議用於 Kusto Spark 連接器。
透過 Azure CLI 登入您的 Azure 訂用帳戶。 然後在瀏覽器中進行驗證。
az login
選擇用來託管主體的訂用帳戶。 當您有多個訂用帳戶時,需要此步驟。
az account set --subscription YOUR_SUBSCRIPTION_GUID
建立服務主體。 在此範例中,服務主體稱為
my-service-principal
。az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
從傳回的 JSON 資料中,複製
appId
、password
和tenant
供日後使用。{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
您已建立您的 Microsoft Entra 應用程式和服務主體。
Spark 連接器會使用下列 Entra 應用程式屬性進行驗證:
屬性 | 選項字串 | 描述 |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra 應用程式 (用戶端) 識別碼。 |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra 驗證授權單位。 Microsoft Entra 租用戶 (目錄) 識別碼。 選用 - 預設為 microsoft.com。 如需詳細資訊,請參閱 Microsoft Entra 授權單位。 |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra 應用程式用戶端金鑰。 |
KUSTO_ACCESS_TOKEN | kustoAccessToken | 如果您已經有使用 Kusto 的存取權建立的 accessToken,則可以用來傳遞至連接器以及進行驗證。 |
注意
舊版 API 版本 (小於 2.0.0) 具有下列命名:"kustoAADClientID"、"kustoClientAADClientPassword"、"kustoAADAuthorityID"
Kusto 權限
根據您想要執行的 Spark 作業,授與 Kusto 端下列權限。
Spark 作業 | 權限 |
---|---|
讀取 - 單一模式 | 讀取者 |
讀取 - 強制分散式模式 | 讀取者 |
寫入 - 具有 CreateTableIfNotExist 資料表建立選項的佇列模式 | 管理員 |
寫入 - 具有 FailIfNotExist 資料表建立選項的佇列模式 | 擷取器 |
寫入 - TransactionalMode | 管理員 |
如需主體角色的詳細資訊,請參閱角色型存取控制。 如需管理安全性角色,請參閱安全性角色管理。
Spark 接收器:寫入 Kusto
設定接收器參數:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
以批次形式將 Spark DataFrame 寫入 Kusto 叢集:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
或使用簡化的語法:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
寫入串流資料:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark 來源:從 Kusto 讀取
讀取少量資料時,定義資料查詢:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
選用:如果您提供暫時性 Blob 儲存體 (而非 Kusto),則會在呼叫者的責任下建立 Blob。 這包括佈建儲存體、輪替存取金鑰,以及刪除暫時性成品。 KustoBlobStorageUtils 模組包含協助程式函式,用於根據帳戶和容器座標和帳戶認證,或具有寫入、讀取和列出權限的完整 SAS URL 來刪除 Blob。 不再需要對應的 RDD 時,每個交易都會將暫時性 Blob 成品儲存在個別的目錄中。 此目錄會擷取為 Spark 驅動程式節點上所報告讀取交易資訊記錄的一部分。
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
在上述範例中,不會使用連接器介面存取金鑰保存庫;會使用運用 Databricks 秘密這種更為簡單的方法。
從 Kusto 讀取。
如果您提供暫時性 Blob 儲存體,請從 Kusto 讀取,如下所示:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
如果 Kusto 提供暫時性 Blob 儲存體,請從 Kusto 讀取,如下所示:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)