Microsoft Fabric Synapse 数据仓库的 Spark 连接器

Synapse 数据仓库的 Spark 连接器让 Spark 开发人员和数据科学家能够访问和处理仓库和湖屋 SQL 分析端点中的数据。 该连接器提供以下功能:

  • 可以从同一工作区或跨多个工作区处理仓库或 SQL 分析端点中的数据。
  • Lakehouse 的 SQL 分析端点根据工作区上下文自动发现。
  • 该连接器提供简化的 Spark API,抽象化底层复杂性,并且只需一行代码即可操作。
  • 访问表或视图时,该连接器支持在 SQL 引擎级别定义的安全模型。 这些模型包括对象级安全性 (OLS)、行级别安全性 (RLS) 和列级别安全性 (CLS)。
  • 该连接器预安装在 Fabric 运行时中,无需单独安装。

注意

该连接器目前提供预览版。 有关详细信息,请参阅本文后面的当前限制

身份验证

Microsoft Entra 身份验证是一种集成身份验证方法。 用户登录到 Microsoft Fabric 工作区,其凭证会自动传递到 SQL 引擎进行身份验证和授权。 凭证是自动映射的,用户不需要提供特定的配置选项。

权限

要连接到 SQL 引擎,用户至少需要仓库或 SQL 分析端点(项目级别)上的读取权限(类似于 SQL Server 中的 CONNECT 权限)。 用户还需要精细的对象级权限才能从特定表或视图读取数据。 若要了解详细信息,请参阅 Microsoft Fabric 中数据仓库的安全性

代码模板和示例

使用方法签名

以下命令显示读取请求的 synapsesql 方法签名。 从仓库和湖屋的 SQL 分析端点访问表或视图需要三部分 tableName 参数。 根据应用场景使用以下名称更新参数:

  • 第 1 部分:仓库或湖屋的名称。
  • 第 2 部分:架构的名称。
  • 第 3 部分:表或视图的名称。
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame

除了直接从表或视图中读取,此连接器还允许你指定自定义或传递查询,然后,该查询将传递给 SQL 引擎,结果返回到 Spark。

spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame

虽然此连接器会自动发现指定仓库/湖屋的终结点,但如果要显式指定它,则可以执行此操作。

//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>") => org.apache.spark.sql.DataFrame

读取同一工作区内的数据

重要

请在笔记本开头或在开始使用连接器之前运行这些导入语句。

import com.microsoft.spark.fabric.tds.implicits.read.FabricSparkTDSImplicits._

import com.microsoft.spark.fabric.Constants

以下代码是从 Spark 数据帧中的表或视图读取数据的示例:

val df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")

以下代码是从行数限制为 10 的 Spark 数据帧中的表或视图读取数据的示例:

val df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)

以下代码是应用筛选条件后从 Spark 数据帧中的表或视图读取数据的示例:

val df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")

以下代码是从 Spark 数据帧中的表或视图仅针对选定列读取数据的示例:

val df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")

跨工作区读取数据

若要跨工作区访问和读取数据仓库或湖屋中的数据,可以指定数据仓库或湖屋所在的工作区 ID,然后指定湖屋或数据仓库项 ID。此行提供了一个从数据仓库或湖屋中的表格或视图中读取数据的示例,该数据仓库或湖屋具有指定的工作区 ID 和湖屋/数据仓库 ID:

import com.microsoft.spark.fabric.Constants
//For lakehouse
val df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
//For data warehouse
val df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<data warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")

注意

运行笔记本时,默认情况下,连接器在湖屋连接到笔记本的工作区中查找指定的数据仓库或湖屋。 要从另一个工作区引用数据仓库或湖屋,请如上所述指定工作区 ID 和湖屋或数据仓库项目 ID。

跨单元格和语言使用具体化数据

可以使用 Spark 数据帧的 createOrReplaceTempView API,通过 Spark SQL 或 PySpark 中的其他单元格访问在一个单元格或 Scala(将其注册为临时视图后)中提取的数据。 这些代码行提供了一个示例,用于从 Scala 中 Spark 数据帧的表或视图读取数据,并跨 Spark SQL 和 PySpark 使用此数据:

%%spark
spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").createOrReplaceTempView("<Temporary View Name>")

现在,将笔记本中或单元格级别的语言首选项更改为 Spark SQL,并从已注册的临时视图中提取数据:

%%sql
SELECT * FROM <Temporary View Name> LIMIT 100

接下来,将笔记本中或单元格级别的语言首选项更改为 PySpark (Python),并从已注册的临时视图中提取数据:

%%pyspark
df = spark.read.table("<Temporary View Name>")

基于仓库中的数据创建湖屋表

这些代码行提供了一个示例,用于从 Scala 中 Spark 数据帧的表或视图读取数据,并使用该数据创建湖屋表:

val df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")

疑难解答

完成后,读取响应片段显示在单元格的输出中。 当前单元格失败也会取消笔记本的后续单元格执行操作。 Spark 应用程序日志中提供了详细的错误信息。

当前限制

目前,连接器:

  • 支持从湖屋项的 Fabric 仓库和 SQL 分析端点进行数据检索。
  • 仅支持 Scala。
  • Fabric DW 现在支持 Time Travel,但此连接器不适用于具有时间旅行语法的查询。
  • 不实现下推优化。
  • 保留使用签名(如 Apache Spark for Azure Synapse Analytics 随附的签名)以保持一致性。 但是,连接和使用 Azure Synapse Analytics 专用 SQL 池并非后向兼容。
  • 包含特殊字符的列名将通过在提交查询前添加转义字符来处理,该查询基于第 3 部分表/视图名。 对于基于自定义或传递查询的读取,用户需要对包含特殊字符的列名进行转义。