适用于 Microsoft Fabric 数据仓库的 Spark 连接器
适用于 Fabric 数据仓库的 Spark 连接器使 Spark 开发人员和数据科学家能够访问和处理来自仓库和湖屋的 SQL 分析终结点的数据。 该连接器提供以下功能:
- 可以从同一工作区或跨多个工作区处理仓库或 SQL 分析端点中的数据。
- Lakehouse 的 SQL 分析端点根据工作区上下文自动发现。
- 该连接器提供简化的 Spark API,抽象化底层复杂性,并且只需一行代码即可操作。
- 访问表或视图时,该连接器支持在 SQL 引擎级别定义的安全模型。 这些模型包括对象级安全性 (OLS)、行级别安全性 (RLS) 和列级别安全性 (CLS)。
- 该连接器预安装在 Fabric 运行时中,无需单独安装。
注意
该连接器目前提供预览版。 有关详细信息,请参阅本文后面的当前限制。
身份验证
Microsoft Entra 身份验证是一种集成身份验证方法。 用户登录到 Microsoft Fabric 工作区,其凭证会自动传递到 SQL 引擎进行身份验证和授权。 凭证是自动映射的,用户不需要提供特定的配置选项。
权限
要连接到 SQL 引擎,用户至少需要仓库或 SQL 分析端点(项目级别)上的读取权限(类似于 SQL Server 中的 CONNECT 权限)。 用户还需要精细的对象级权限才能从特定表或视图读取数据。 若要了解详细信息,请参阅 Microsoft Fabric 中数据仓库的安全性。
代码模板和示例
使用方法签名
以下命令显示读取请求的 synapsesql
方法签名。 从仓库和湖屋的 SQL 分析端点访问表或视图需要三部分 tableName
参数。 根据应用场景使用以下名称更新参数:
- 第 1 部分:仓库或湖屋的名称。
- 第 2 部分:架构的名称。
- 第 3 部分:表或视图的名称。
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame
除了直接从表或视图中读取,此连接器还允许你指定自定义或传递查询,然后,该查询将传递给 SQL 引擎,结果返回到 Spark。
spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame
虽然此连接器会自动发现指定仓库/湖屋的终结点,但如果要显式指定它,则可以执行此操作。
//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>")
读取同一工作区内的数据
重要
请在笔记本开头或在开始使用连接器之前运行这些导入语句。
适用于 Scala
import com.microsoft.spark.fabric.tds.implicits.read.FabricSparkTDSImplicits._
import com.microsoft.spark.fabric.Constants
适用于 PySpark (Python)
import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants
以下代码是从 Spark 数据帧中的表或视图读取数据的示例:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
以下代码是从行数限制为 10 的 Spark 数据帧中的表或视图读取数据的示例:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)
以下代码是应用筛选条件后从 Spark 数据帧中的表或视图读取数据的示例:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")
以下代码是从 Spark 数据帧中的表或视图仅针对选定列读取数据的示例:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")
跨工作区读取数据
要跨工作区访问和读取仓库或湖屋中的数据,可以指定仓库或湖屋所在的工作区 ID,然后指定湖屋或仓库项 ID。 以下代码行举例说明如何在具有指定工作区 ID 和湖屋/仓库 ID 的仓库或湖屋中,从 Spark DataFrame 的表或视图中读取数据:
# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
注意
运行笔记本时,默认情况下,连接器在湖屋连接到笔记本的工作区中查找指定的仓库或湖屋。 要从另一个工作区引用仓库或湖屋,请指定工作区 ID 和湖屋或仓库项 ID,如上所示。
基于仓库中的数据创建湖屋表
这些代码行举例说明如何从 Spark DataFrame 中的表或视图读取数据,并使用它来创建湖屋表:
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")
疑难解答
完成后,读取响应片段显示在单元格的输出中。 当前单元格失败也会取消笔记本的后续单元格执行操作。 Spark 应用程序日志中提供了详细的错误信息。
当前限制
目前,连接器:
- 支持从湖屋项的 Fabric 仓库和 SQL 分析端点进行数据检索。
- Fabric DW 现在支持
Time Travel
,但此连接器不适用于具有时间旅行语法的查询。 - 保留使用签名(如 Apache Spark for Azure Synapse Analytics 随附的签名)以保持一致性。 但是,连接和使用 Azure Synapse Analytics 专用 SQL 池并非后向兼容。
- 包含特殊字符的列名将通过在提交查询前添加转义字符来处理,该查询基于第 3 部分表/视图名。 对于基于自定义或传递查询的读取,用户需要对包含特殊字符的列名进行转义。