Konektor Azure Data Exploreru pro Apache Spark
Apache Spark je jednotný analytický modul pro zpracování velkých objemů dat. Azure Data Explorer je rychlá, plně spravovaná služba analýzy dat pro analýzy velkých objemů dat v reálném čase.
Konektor Kusto pro Spark je opensourcový projekt , který se dá spustit v jakémkoli clusteru Spark. Implementuje zdroj dat a jímku dat pro přesun dat mezi Azure Data Explorerem a clustery Spark. Pomocí Azure Data Exploreru a Apache Sparku můžete vytvářet rychlé a škálovatelné aplikace zaměřené na scénáře řízené daty. Například strojové učení (ML), extrakce transformace a načítání (ETL) a Log Analytics. S konektorem se Azure Data Explorer stane platným úložištěm dat pro standardní operace zdroje Sparku a jímky, jako je zápis, čtení a zápisStream.
Do Azure Data Exploreru můžete zapisovat prostřednictvím příjmu dat ve frontě nebo příjmu dat streamování. Čtení z Azure Data Exploreru podporuje vyřazení sloupců a predikát, které filtruje data v Azure Data Exploreru a snižuje objem přenášených dat.
Poznámka:
Informace o práci s konektorem Synapse Spark pro Azure Data Explorer najdete v tématu Připojení k Azure Data Exploreru pomocí Apache Sparku pro Azure Synapse Analytics.
Toto téma popisuje, jak nainstalovat a nakonfigurovat konektor Spark Azure Data Exploreru a přesouvat data mezi Azure Data Explorerem a clustery Apache Spark.
Poznámka:
I když některé z níže uvedených příkladů odkazují na cluster Azure Databricks Spark, konektor Spark v Azure Data Exploreru nepřebírají přímé závislosti na Databricks ani v žádné jiné distribuci Sparku.
Požadavky
- Předplatné Azure. Vytvořte bezplatný účet Azure.
- Cluster a databáze Azure Data Exploreru. Vytvořte cluster a databázi.
- Cluster Spark
- Instalace knihovny konektorů:
- Předem vytvořené knihovny pro Spark 2.4+Scala 2.11 nebo Spark 3+scala 2.12
- Úložiště Maven
- Nainstalovaný Maven 3.x
Tip
Podporují se také verze Sparku 2.3.x, ale můžou vyžadovat určité změny v pom.xml závislostech.
Postup sestavení konektoru Spark
Počínaje verzí 2.3.0 zavádíme nové ID artefaktů, které nahrazují spark-kusto-connector: kusto-spark_3.0_2.12 , které cílí na Spark 3.x a Scala 2.12.
Poznámka:
Verze starší než 2.5.1 už nefungují pro příjem do existující tabulky, aktualizujte prosím na novější verzi. Tento krok je nepovinný. Pokud například používáte předem vytvořené knihovny, podívejte se na nastavení clusteru Spark.
Požadavky na sestavení
Informace o vytvoření konektoru Spark najdete v tomto zdroji .
V případě aplikací Scala/Java využívajících definice projektu Maven propojte aplikaci s nejnovějším artefaktem. Najděte nejnovější artefakt v Centru Mavenu.
For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
Pokud nepoužíváte předem připravené knihovny, musíte nainstalovat knihovny uvedené v závislostech , včetně následujících knihoven sady Kusto Java SDK . Správnou verzi, která se má nainstalovat, najdete v pom příslušné verze:
Sestavení jar a spuštění všech testů:
mvn clean package -DskipTests
Pokud chcete sestavit soubor JAR, spusťte všechny testy a nainstalujte soubor JAR do místního úložiště Maven:
mvn clean install -DskipTests
Další informace najdete v tématu Využití konektoru.
Nastavení clusteru Spark
Poznámka:
Při provádění následujících kroků doporučujeme použít nejnovější verzi konektoru Kusto Spark.
Nakonfigurujte následující nastavení clusteru Spark na základě clusteru Azure Databricks Spark 3.0.1 a Scala 2.12:
Nainstalujte nejnovější knihovnu spark-kusto-connector z Mavenu:
Ověřte, že jsou nainstalované všechny požadované knihovny:
V případě instalace pomocí souboru JAR ověřte, že byly nainstalovány další závislosti:
Ověřování
Konektor Kusto Spark umožňuje ověřování pomocí ID Microsoft Entra pomocí jedné z následujících metod:
- Aplikace Microsoft Entra
- Přístupový token Microsoft Entra
- Ověřování zařízení (pro neprodukční scénáře)
- Azure Key Vault pro přístup k prostředku služby Key Vault, nainstalujte balíček azure-keyvault a zadejte přihlašovací údaje aplikace.
Ověřování aplikací Microsoft Entra
Ověřování aplikací Microsoft Entra je nejjednodušší a nejběžnější metodou ověřování a doporučuje se pro konektor Kusto Spark.
Přihlaste se ke svému předplatnému Azure prostřednictvím Azure CLI. Pak se ověřte v prohlížeči.
az login
Zvolte předplatné, které má být hostitelem objektu zabezpečení. Tento krok je potřeba v případě, že máte více předplatných.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Vytvořte instanční objekt. V tomto příkladu se instanční objekt nazývá
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Z vrácených dat JSON zkopírujte
appId
password
tenant
a pro budoucí použití.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Vytvořili jste aplikaci Microsoft Entra a instanční objekt.
Konektor Spark používá k ověřování následující vlastnosti aplikace Entra:
Vlastnosti | Řetězec možnosti | Popis |
---|---|---|
KUSTO_AAD_APP_ID | KustoAadAppId | Identifikátor aplikace Microsoft Entra (klient). |
KUSTO_AAD_AUTHORITY_ID | KustoAadAuthorityID | Ověřovací autorita Microsoft Entra. ID adresáře Microsoft Entra (tenanta). Volitelné – výchozí hodnota je microsoft.com. Další informace naleznete v tématu Microsoft Entra authority. |
KUSTO_AAD_APP_SECRET | KustoAadAppSecret | Klíč aplikace Microsoft Entra pro klienta |
KUSTO_ACCESS_TOKEN | KustoAccessToken | Pokud už máte accessToken, který je vytvořený s přístupem k Kusto, můžete ho použít i pro ověřování konektoru. |
Poznámka:
Starší verze rozhraní API (méně než 2.0.0) mají následující názvy: kustoAADClientID, kustoClientAADClientPassword, KustoAADAuthorityID.
Oprávnění Kusto
Na straně Kusto udělte následující oprávnění na základě operace Sparku, kterou chcete provést.
Operace Sparku | Oprávnění |
---|---|
Čtení – jeden režim | Čtenář |
Čtení – vynucení distribuovaného režimu | Čtenář |
Zápis – režim ve frontě s možností CreateTableIfNotExist table create | Správce |
Zápis – režim ve frontě s možností vytvoření tabulky FailIfNotExist | Ingestor |
Zápis – TransactionalMode | Správce |
Další informace o hlavních rolích najdete v tématu Řízení přístupu na základě role. Informace o správě rolí zabezpečení najdete v tématu Správa rolí zabezpečení.
Jímka Sparku: zápis do Kusto
Nastavení parametrů jímky:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
Zapište datový rámec Sparku do clusteru Kusto jako dávku:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
Nebo použijte zjednodušenou syntaxi:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
Zápis streamovaných dat:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Zdroj Sparku: čtení z Kusto
Při čtení malých objemů dat definujte datový dotaz:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
Volitelné: Pokud poskytnete přechodné úložiště objektů blob (a ne Kusto), objekty blob se vytvoří v rámci odpovědnosti volajícího. To zahrnuje zřízení úložiště, obměnu přístupových klíčů a odstranění přechodných artefaktů. Modul KustoBlobStorageUtils obsahuje pomocné funkce pro odstraňování objektů blob na základě souřadnic účtu a kontejneru a přihlašovacích údajů účtu nebo úplné adresy URL SAS s oprávněními k zápisu, čtení a seznamu. Pokud už odpovídající sada RDD není nutná, každá transakce ukládá přechodné artefakty objektů blob do samostatného adresáře. Tento adresář se zaznamenává jako součást protokolů informací o transakcích pro čtení hlášených v uzlu ovladače Sparku.
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
V předchozím příkladu není služba Key Vault přístupná pomocí rozhraní konektoru; Používá se jednodušší metoda použití tajných kódů Databricks.
Přečtěte si z Kusto.
Pokud poskytnete přechodné úložiště objektů blob, načtěte z Kusto následujícím způsobem:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
Pokud Kusto poskytuje přechodné úložiště objektů blob, načtěte z Kusto následujícím způsobem:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)