次の方法で共有


Apache Spark 用の Azure Data Explorer コネクタ

Apache Spark は、"大規模なデータ処理のための統合された分析エンジン" です。 Azure Data Explorer は、大量のデータのリアルタイム分析を実現するフル マネージドの高速データ分析サービスです。

Spark 用の Kusto コネクタは、任意の Spark クラスターで実行できるオープンソース プロジェクトです。 Azure Data Explorer と Spark クラスター間でデータを移動するためのデータ ソースとデータ シンクを実装します。 Azure Data Explorer と Apache Spark を使用して、データ ドリブン シナリオをターゲットとする、高速でスケーラブルなアプリケーションを作成することができます。 たとえば、機械学習 (ML)、ETL (抽出 - 読み込み - 変換)、および Log Analytics などです。 このコネクタにより、Azure Data Explorer は、書き込み、読み取り、writeStream などの標準的な Spark のソースおよびシンク操作に有効なデータ ストアになります。

キューに登録されたインジェストまたはストリーミング インジェストを使用して、Azure Data Explorer に書き込むことができます。 Azure Data Explorer からの読み取りでは、列の排除と述語のプッシュダウンがサポートされています。これにより、Azure Data Explorer でデータがフィルター処理されるため、転送されるデータの量が減ります。

Note

Azure Data Explorer 用の Synapse Spark コネクタを使用する方法については、「Apache Spark for Azure Synapse Analytics を使用して Azure Data Explorer に接続する」を参照してください。

このトピックでは、Azure Data Explorer Spark コネクタをインストールして構成し、Azure Data Explorer と Apache Spark クラスター間でデータを移動する方法について説明します。

Note

下の例のいくつかでは Azure Databricks Spark クラスターが登場しますが、Azure Data Explorer Spark コネクタは、Databricks やその他の Spark ディストリビューションに直接依存しません。

前提条件

ヒント

Spark 2.3.x バージョンもサポートされていますが、pom.xml の依存関係の変更が必要になる場合があります。

Spark コネクタのビルド方法

バージョン 2.3.0 以降では、spark-kusto-connector に代わる新しい成果物 ID として、Spark 3.x と Scala 2.12 を対象とする kusto-spark_3.0_2.12 が導入されています。

Note

2.5.1 より前のバージョンでは、既存のテーブルへの取り込みは機能しなくなったため、新しいバージョンに更新してください。 このステップはオプションです。 Maven などの事前構築されたライブラリを使用する場合は、「Spark クラスターの設定」を参照してください。

構築の前提条件

  1. Spark コネクタの構築については、こちらのソースを参照してください。

  2. Maven プロジェクトの定義を使う Scala と Java のアプリケーションの場合は、アプリケーションを最新の成果物にリンクしてください。 最新の成果物は Maven Central にあります。

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. ビルド済みのライブラリを使っていない場合は、次の Kusto Java SDK ライブラリなど、「依存関係」に記載されているライブラリをインストールする必要があります。 インストールする適切なバージョンを確認するには、関連するリリースの POM を確認します。

    1. Jar をビルドして、すべてのテストの実行するには:

      mvn clean package -DskipTests
      
    2. Jar をビルドし、すべてのテストを実行し、ローカルの Maven リポジトリに jar をインストールするには:

      mvn clean install -DskipTests
      

詳細については、コネクタの使用に関するページを参照してください。

Spark クラスターの設定

Note

次の手順を実行するときは、最新の Kusto Spark コネクタ リリースを使うことをお勧めします。

  1. Azure Databricks クラスター Spark 3.0.1 と Scala 2.12 に基づいて、Spark クラスターの次の設定を構成します。

    Databricks クラスターの設定。

  2. Maven から最新の spark-kusto-connector ライブラリをインストールします。

    ライブラリをインポートする。Spark-Kusto-Connector を選択する。

  3. 必要なライブラリがすべてインストールされていることを確認します。

    ライブラリがインストールされていることを確認する。

  4. JAR ファイルを使うインストールの場合は、他の依存関係がインストールされたことを確認します。

    依存関係を追加する。

認証

Kusto Spark コネクタを使うと、次のいずれかの方法を使って、Microsoft Entra ID での認証を行うことができます。

Microsoft Entra アプリケーション認証

Microsoft Entra アプリケーション認証は、最も単純で最も一般的な認証方法であり、Kusto Spark コネクタにはこの方法をお勧めします。

  1. Azure CLI 経由で Azure サブスクリプションにサインインします。 次に、ブラウザーで認証します。

    az login
    
  2. プリンシパルをホストするサブスクリプションを選択します。 この手順は、複数のサブスクリプションがある場合に必要です。

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. サービス プリンシパルを作成します。 この例では、サービス プリンシパルを my-service-principal と呼びます。

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 返された JSON データから、appIdpassword、および tenant を後で使用のためにコピーします。

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Microsoft Entra アプリケーションとサービス プリンシパルが作成されました。

Spark コネクタは、認証に次の Entra アプリ プロパティを使います。

プロパティ オプション文字列 説明
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra アプリケーション (クライアント) 識別子。
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra 認証機関。 Microsoft Entra ディレクトリ (テナント) ID。 (省略可能) 既定値は microsoft.com です。 詳細については、Microsoft Entra 機関に関する記事を参照してください。
KUSTO_AAD_APP_SECRET kustoAadAppSecret クライアントの Microsoft Entra アプリケーション キー。
KUSTO_ACCESS_TOKEN kustoAccessToken Kusto にアクセスできる accessToken を既に作成してある場合は、それをコネクタに渡して認証に使うこともできます。

Note

以前の API バージョン (2.0.0 未満) には、"kustoAADClientID"、"kustoClientAADClientPassword"、"kustoAADAuthorityID" という名前が付けられています。

Kusto の特権

実行する Spark 操作に基づいて、kusto 側で次の特権を許可します。

Spark の操作 特権
読み取り - 単一モード Reader
読み取り – 強制分散モード Reader
書き込み - CreateTableIfNotExist テーブル作成オプションを使用したキュー モード [Admin]
書き込み - FailIfNotExist テーブル作成オプションを使用したキュー モード 取り込み者
書き込み – TransactionalMode [Admin]

プリンシパルのロールについて詳しくは、ロールベースのアクセス制御に関する記事をご覧ください。 セキュリティ ロールの管理については、「security roles management」(セキュリティ ロールの管理) を参照してください。

Spark シンク: Kusto に書き込む

  1. シンクのパラメーターの設定:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Spark DataFrame を Kusto クラスターにバッチとして書き込みます。

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    または、簡略化された構文を使用します。

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. ストリーミング データを書き込む:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark ソース: Kusto から読み取る

  1. 少量のデータを読み込む場合は、データ クエリを定義します。

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. 省略可能: (Kusto ではなく) ユーザーが一時的な BLOB ストレージを提供する場合は、作成される BLOB は呼び出し元の責任になります。 これには、ストレージのプロビジョニング、アクセス キーのローテーション、および一時的な成果物の削除が含まれます。 KustoBlobStorageUtils モジュールには、アカウントとコンテナーの座標およびアカウントの資格情報、または書き込み、読み取り、リストの権限を持つ完全な SAS URL のいずれかに基づいて、BLOB を削除するためのヘルパー関数が含まれています。 対応する RDD が不要になると、トランザクションごとに一時的な BLOB 成果物が別のディレクトリに格納されます。 このディレクトリは、Spark ドライバー ノードで報告される読み取りトランザクション情報ログの一部としてキャプチャされます。

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    上記の例では、コネクタ インターフェイスを使用して Key Vault にアクセスすることはできません。より簡単な Databricks シークレットを使用する方法が使用されます。

  3. Kusto から読み取ります。

    • ユーザーが一時的な BLOB ストレージを提供する場合は、次のように Kusto から読み取ります。

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Kusto が一時的な BLOB ストレージを提供する場合は、次のように Kusto から読み取ります。

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)