从 Apache Spark 获取数据

Apache Spark 是用于大规模数据处理的统一分析引擎。

适用于 Spark 的 Kusto 连接器是可在任何 Spark 群集上运行的开放源代码项目。 它实现了用于跨 Azure 数据资源管理器和 Spark 群集移动数据的数据源和数据接收器。 使用 Eventhouse 和 Apache Spark,可以构建面向数据驱动的应用场景的可缩放快速应用程序。 例如,机器学习 (ML)、提取-转换-加载 (ETL) 和 Log Analytics。 有了此连接器,Eventhouse 变成了标准 Spark 源和接收器操作(例如 write、read 和 writeStream)的有效数据存储。

可以通过排队引入或流式引入写入 Eventhouse。 从 Eventhouse 中读取支持列删除和谓词下推,这可在 Eventhouse 中筛选数据,从而减少传输的数据量。

本文介绍了如何安装和配置 Spark 连接器,以及如何在 Eventhouse 和 Apache Spark 群集之间移动数据。

注意

尽管下面的某些示例提到了 Azure Databricks Spark 群集,但 Spark 连接器并不直接依赖于 Databricks 或任何其他 Spark 分发版。

先决条件

提示

Spark 2.3.x 版本也是受支持的,但可能需要在 pom.xml 依赖项中进行一些更改。

如何生成 Spark 连接器

从版本 2.3.0 开始,我们引入了新的项目 ID,替换 spark-kusto-connector:针对 Spark 3.x 和 Scala 2.12 的 kusto-spark_3.0_2.12

注意

2\.5.1 以前的版本无法再用于引入到某个现有表,请更新到更高的版本。 此步骤是可选的。 如果使用的是预生成库(例如 Maven),请参阅 Spark 群集设置

生成先决条件

  1. 参考此源来生成 Spark 连接器。

  2. 对于使用 Maven 项目定义的 Scala 和 Java 应用程序,将应用程序与最新项目链接。 在 Maven Central 上查找最新项目。

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. 如果使用的不是预生成库,则需要安装依赖项中列出的库,包括以下 Kusto Java SDK 库。 若要查找要安装的正确版本,请查看相关版本的 pom

    1. 生成 jar 并运行所有测试:

      mvn clean package -DskipTests
      
    2. 生成 jar,运行所有测试,并将 jar 安装到本地 Maven 存储库:

      mvn clean install -DskipTests
      

有关详细信息,请参阅连接器用法

Spark 群集设置

注意

建议使用最新的 Kusto Spark 连接器版本执行以下步骤。

  1. 根据 Azure Databricks 群集 Spark 3.0.1 和 Scala 2.12,配置以下 Spark 群集设置:

    Databricks 群集设置。

  2. 从 Maven 安装最新 spark-kusto-connector 库:

    导入库。选择 Spark-Kusto-Connector。

  3. 验证是否已安装所有必需的库:

    验证是否已安装库。

  4. 使用 JAR 文件进行安装时,请验证是否已安装其他依赖项:

    添加依赖项。

身份验证

使用 Kusto Spark 连接器,可以通过下列其中一个方法使用 Microsoft Entra ID 进行身份验证:

Microsoft Entra 应用程序身份验证

Microsoft Entra 应用程序身份验证是最简单且最常用的身份验证方法,建议将其用于 Kusto Spark 连接器。

  1. 通过 Azure CLI 登录到你的 Azure 订阅。 然后在浏览器中进行身份验证。

    az login
    
  2. 选择要托管主体的订阅。 当你有多个订阅时,此步骤是必需的。

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. 创建服务主体。 在此示例中,服务主体名为 my-service-principal

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 从返回的 JSON 数据中复制 appIdpasswordtenant 供将来使用。

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

现已创建了 Microsoft Entra 应用程序和服务主体。

Spark 连接器使用以下 Entra 应用属性进行身份验证:

属性 选项字符串 说明
KUSTO_AAD_APP_ID kustoAadAppId Microsoft Entra 应用程序(客户端)标识符。
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Microsoft Entra 身份验证机构。 Microsoft Entra Directory (tenant) ID。 可选 - 默认为 microsoft.com。 有关详细信息,请参阅 Microsoft Entra 颁发机构
KUSTO_AAD_APP_SECRET kustoAadAppSecret 客户端的 Microsoft Entra 应用程序密钥。
KUSTO_ACCESS_TOKEN kustoAccessToken 如果已创建 accessToken 且有权访问 Kusto,则可以将其用于连接器以及身份验证。

注意

较旧的 API 版本(低于 2.0.0)有以下命名:“kustoAADClientID”、“kustoClientAADClientPassword”、“kustoAADAuthorityID”

Kusto 权限

根据要执行的 Spark 操作,在 Kusto 端授予以下权限。

Spark 操作 权限
读取 - 单一模式 读者
读取 – 强制分布式模式 读者
写入 - 包含 CreateTableIfNotExist 表创建选项的排队模式 管理员
写入 - 包含 FailIfNotExist 表创建选项的排队模式 引入者
写入 – TransactionalMode 管理员

有关主体角色的详细信息,请参阅“基于角色的访问控制”。 有关如何管理安全角色,请参阅安全角色管理

Spark 接收器:写入 Kusto

  1. 设置接收器参数:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. 将 Spark 数据帧分批写入 Kusto 群集:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    或者使用简化的语法:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. 写入流数据:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Spark 源:从 Kusto 读取

  1. 读取少量数据时,可以定义数据查询:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. 可选:如果提供暂时性 Blob 存储(而不是 Kusto),则由调用方负责创建 Blob。 这包括预配存储、轮换访问密钥以及删除暂时性项目。 KustoBlobStorageUtils 模块包含用于以下用途的帮助程序函数:基于帐户和容器坐标以及帐户凭据(或基于具有写入、读取和列出权限的完整 SAS URL)删除 Blob。 当不再需要相应的 RDD 时,每个事务会将暂时性 blob 项目存储在一个单独的目录中。 此目录是作为 Spark 驱动程序节点上报告的读取事务信息日志的一部分捕获的。

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    在上面的示例中,无法使用连接器接口访问 Key Vault;使用了一种更简单的方法,即使用 Databricks 机密。

  3. 从 Kusto 读取。

    • 如果提供暂时性 blob 存储,请如下所示从 Kusto 读取:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • 如果 Kusto 提供暂时性 blob 存储,请如下所示从 Kusto 读取:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)