Sdílet prostřednictvím


Konektor Azure Data Exploreru pro Apache Spark

Apache Spark je jednotný analytický modul pro zpracování velkých objemů dat. Azure Data Explorer je rychlá, plně spravovaná služba analýzy dat pro analýzy velkých objemů dat v reálném čase.

Konektor Kusto pro Spark je opensourcový projekt , který se dá spustit v jakémkoli clusteru Spark. Implementuje zdroj dat a jímku dat pro přesun dat mezi Azure Data Explorerem a clustery Spark. Pomocí Azure Data Exploreru a Apache Sparku můžete vytvářet rychlé a škálovatelné aplikace zaměřené na scénáře řízené daty. Například strojové učení (ML), extrakce transformace a načítání (ETL) a Log Analytics. S konektorem se Azure Data Explorer stane platným úložištěm dat pro standardní operace zdroje Sparku a jímky, jako je zápis, čtení a zápisStream.

Do Azure Data Exploreru můžete zapisovat prostřednictvím příjmu dat ve frontě nebo příjmu dat streamování. Čtení z Azure Data Exploreru podporuje vyřazení sloupců a predikát, které filtruje data v Azure Data Exploreru a snižuje objem přenášených dat.

Poznámka:

Informace o práci s konektorem Synapse Spark pro Azure Data Explorer najdete v tématu Připojení k Azure Data Exploreru pomocí Apache Sparku pro Azure Synapse Analytics.

Toto téma popisuje, jak nainstalovat a nakonfigurovat konektor Spark Azure Data Exploreru a přesouvat data mezi Azure Data Explorerem a clustery Apache Spark.

Poznámka:

I když některé z níže uvedených příkladů odkazují na cluster Azure Databricks Spark, konektor Spark v Azure Data Exploreru nepřebírají přímé závislosti na Databricks ani v žádné jiné distribuci Sparku.

Požadavky

Tip

Podporují se také verze Sparku 2.3.x, ale můžou vyžadovat určité změny v pom.xml závislostech.

Postup sestavení konektoru Spark

Počínaje verzí 2.3.0 zavádíme nové ID artefaktů, které nahrazují spark-kusto-connector: kusto-spark_3.0_2.12 , které cílí na Spark 3.x a Scala 2.12.

Poznámka:

Verze starší než 2.5.1 už nefungují pro příjem do existující tabulky, aktualizujte prosím na novější verzi. Tento krok je nepovinný. Pokud například používáte předem vytvořené knihovny, podívejte se na nastavení clusteru Spark.

Požadavky na sestavení

  1. Informace o vytvoření konektoru Spark najdete v tomto zdroji .

  2. V případě aplikací Scala/Java využívajících definice projektu Maven propojte aplikaci s nejnovějším artefaktem. Najděte nejnovější artefakt v Centru Mavenu.

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. Pokud nepoužíváte předem připravené knihovny, musíte nainstalovat knihovny uvedené v závislostech , včetně následujících knihoven sady Kusto Java SDK . Správnou verzi, která se má nainstalovat, najdete v pom příslušné verze:

    1. Sestavení jar a spuštění všech testů:

      mvn clean package -DskipTests
      
    2. Pokud chcete sestavit soubor JAR, spusťte všechny testy a nainstalujte soubor JAR do místního úložiště Maven:

      mvn clean install -DskipTests
      

Další informace najdete v tématu Využití konektoru.

Nastavení clusteru Spark

Poznámka:

Při provádění následujících kroků doporučujeme použít nejnovější verzi konektoru Kusto Spark.

  1. Nakonfigurujte následující nastavení clusteru Spark na základě clusteru Azure Databricks Spark 3.0.1 a Scala 2.12:

    Nastavení clusteru Databricks

  2. Nainstalujte nejnovější knihovnu spark-kusto-connector z Mavenu:

    Import knihovenVyberte Spark-Kusto-Connector.

  3. Ověřte, že jsou nainstalované všechny požadované knihovny:

    Ověřte nainstalované knihovny.

  4. V případě instalace pomocí souboru JAR ověřte, že byly nainstalovány další závislosti:

    Přidejte závislosti.

Ověřování

Konektor Kusto Spark umožňuje ověřování pomocí ID Microsoft Entra pomocí jedné z následujících metod:

Ověřování aplikací Microsoft Entra

Ověřování aplikací Microsoft Entra je nejjednodušší a nejběžnější metodou ověřování a doporučuje se pro konektor Kusto Spark.

  1. Přihlaste se ke svému předplatnému Azure prostřednictvím Azure CLI. Pak se ověřte v prohlížeči.

    az login
    
  2. Zvolte předplatné, které má být hostitelem objektu zabezpečení. Tento krok je potřeba v případě, že máte více předplatných.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Vytvořte instanční objekt. V tomto příkladu se instanční objekt nazývá my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Z vrácených dat JSON zkopírujte appIdpasswordtenant a pro budoucí použití.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Vytvořili jste aplikaci Microsoft Entra a instanční objekt.

Konektor Spark používá k ověřování následující vlastnosti aplikace Entra:

Vlastnosti Řetězec možnosti Popis
KUSTO_AAD_APP_ID KustoAadAppId Identifikátor aplikace Microsoft Entra (klient).
KUSTO_AAD_AUTHORITY_ID KustoAadAuthorityID Ověřovací autorita Microsoft Entra. ID adresáře Microsoft Entra (tenanta). Volitelné – výchozí hodnota je microsoft.com. Další informace naleznete v tématu Microsoft Entra authority.
KUSTO_AAD_APP_SECRET KustoAadAppSecret Klíč aplikace Microsoft Entra pro klienta
KUSTO_ACCESS_TOKEN KustoAccessToken Pokud už máte accessToken, který je vytvořený s přístupem k Kusto, můžete ho použít i pro ověřování konektoru.

Poznámka:

Starší verze rozhraní API (méně než 2.0.0) mají následující názvy: kustoAADClientID, kustoClientAADClientPassword, KustoAADAuthorityID.

Oprávnění Kusto

Na straně Kusto udělte následující oprávnění na základě operace Sparku, kterou chcete provést.

Operace Sparku Oprávnění
Čtení – jeden režim Čtenář
Čtení – vynucení distribuovaného režimu Čtenář
Zápis – režim ve frontě s možností CreateTableIfNotExist table create Správce
Zápis – režim ve frontě s možností vytvoření tabulky FailIfNotExist Ingestor
Zápis – TransactionalMode Správce

Další informace o hlavních rolích najdete v tématu Řízení přístupu na základě role. Informace o správě rolí zabezpečení najdete v tématu Správa rolí zabezpečení.

Jímka Sparku: zápis do Kusto

  1. Nastavení parametrů jímky:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Zapište datový rámec Sparku do clusteru Kusto jako dávku:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Nebo použijte zjednodušenou syntaxi:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Zápis streamovaných dat:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Zdroj Sparku: čtení z Kusto

  1. Při čtení malých objemů dat definujte datový dotaz:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Volitelné: Pokud poskytnete přechodné úložiště objektů blob (a ne Kusto), objekty blob se vytvoří v rámci odpovědnosti volajícího. To zahrnuje zřízení úložiště, obměnu přístupových klíčů a odstranění přechodných artefaktů. Modul KustoBlobStorageUtils obsahuje pomocné funkce pro odstraňování objektů blob na základě souřadnic účtu a kontejneru a přihlašovacích údajů účtu nebo úplné adresy URL SAS s oprávněními k zápisu, čtení a seznamu. Pokud už odpovídající sada RDD není nutná, každá transakce ukládá přechodné artefakty objektů blob do samostatného adresáře. Tento adresář se zaznamenává jako součást protokolů informací o transakcích pro čtení hlášených v uzlu ovladače Sparku.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    V předchozím příkladu není služba Key Vault přístupná pomocí rozhraní konektoru; Používá se jednodušší metoda použití tajných kódů Databricks.

  3. Přečtěte si z Kusto.

    • Pokud poskytnete přechodné úložiště objektů blob, načtěte z Kusto následujícím způsobem:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Pokud Kusto poskytuje přechodné úložiště objektů blob, načtěte z Kusto následujícím způsobem:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)