共用方式為


適用於 Microsoft Fabric 數據倉儲的 Spark 連接器

適用於 Fabric 數據倉儲的 Spark 連接器可讓 Spark 開發人員和數據科學家存取和處理來自倉儲和 Lakehouse SQL 分析端點的數據。 連接器提供下列功能:

  • 您可以在相同工作區或跨多個工作區使用來自倉儲或 SQL 分析端點的資料。
  • Lakehouse 的 SQL 分析端點會根據工作區內容自動探索。
  • 連接器具有簡化的Spark API、擷取基礎複雜性,並只使用一行程式碼運作。
  • 當您存取資料表或檢視時,連接器會維護在 SQL 引擎層級定義的安全性模型。 這些模型包括物件層級安全性(OLS)、資料列層級安全性(RLS),以及資料行層級安全性(CLS)。
  • 連接器會在 Fabric 執行時間內預安裝,可免除個別安裝的需求。

注意

此連接器目前為預覽版。 有關詳細資訊,請參閱本文後面的目前限制

驗證

Microsoft Entra 驗證是整合式驗證方法。 使用者登入 Microsoft Fabric 工作區,且其認證會自動傳遞至 SQL 引擎以進行驗證和授權。 認證會自動對應,而且使用者不需要提供特定的組態選項。

權限

若要連線到 SQL 引擎,使用者至少需要倉儲或 SQL 分析端點(項目層級)的讀取許可權(類似於 SQL Server 中的 CONNECT 許可權)。 使用者也需要細微的物件層級許可權,才能從特定資料表或檢視表讀取資料。 若要深入瞭解,請參閱 Microsoft Fabric 中的資料倉儲安全性。

程式碼範本和範例

使用方法簽名

下列命令顯示讀取要求的 synapsesql 方法簽名。 需要三部分 tableName 自變數,才能從倉儲和 Lakehouse 的 SQL 分析端點存取資料表或檢視。 根據您的案例,以下列名稱更新引數:

  • 第 1 部分:倉儲或 lakehouse 的名稱。
  • 第 2 部分:結構描述的名稱。
  • 第 3 部分:資料表或檢視表的名稱。
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame

除了直接從資料表或檢視讀取之外,此連接器也可讓您指定自定義或傳遞查詢,此查詢會傳遞至 SQL 引擎,並將結果傳回 Spark。

spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame

雖然此連接器會自動探索指定之倉儲/Lakehouse 的端點,但如果您想要明確指定,您可以執行此動作。

//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>") 

讀取相同工作區中的資料

重要

在筆記本開頭或在開始使用連接器之前,執行這些匯入語句:

針對 Scala

import com.microsoft.spark.fabric.tds.implicits.read.FabricSparkTDSImplicits._

import com.microsoft.spark.fabric.Constants

適用於 PySpark (Python)

import com.microsoft.spark.fabric

from com.microsoft.spark.fabric.Constants import Constants

下列程式碼是從 Spark DataFrame 中的資料表或檢視讀取資料的範例:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")

下列程式碼是從 Spark DataFrame 中資料列計數限制為 10 的資料表或檢視讀取資料的範例:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)

下列程式碼是套用篩選條件之後,從 Spark DataFrame 中的資料表或檢視讀取資料的範例:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")

下列程式碼是僅針對所選資料行從 Spark DataFrame 中的資料表或檢視讀取資料的範例:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")

跨工作區讀取資料

若要跨工作區存取和讀取倉儲或 Lakehouse 的數據,您可以指定倉儲或 Lakehouse 所在的工作區標識符,然後指定 Lakehouse 或倉儲專案標識符。 下列這一行提供從 Spark 數據框架中從倉儲或 Lakehouse 讀取數據表或檢視數據的範例,其中包含指定的工作區標識碼和 lakehouse/warehouse 標識符:

# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")

# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")

注意

當您執行筆記本時,連接器預設會在連結至筆記本的 Lakehouse 工作區中尋找指定的倉儲或 Lakehouse。 若要從另一個工作區參考倉儲或 Lakehouse,請指定工作區標識碼和 Lakehouse 或倉儲專案識別碼,如上所示。

根據倉儲的資料建立 Lakehouse 資料表

這些程式代碼行提供從 Spark DataFrame 中的數據表或檢視讀取數據的範例,並用它來建立 Lakehouse 數據表:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")

疑難排解

完成時,讀取回應代碼段會出現在儲存格的輸出中。 目前儲存格中的失敗也會取消筆記本的後續儲存格執行。 Spark 應用程式記錄中提供詳細的錯誤資訊。

目前的限制

目前,連接器:

  • 支援從 Lakehouse 項目的網狀架構倉儲和 SQL 分析端點擷取資料。
  • Fabric DW 現在支援 Time Travel 此連接器不適用於具有時間行動語法的查詢。
  • 保留使用簽章,就像 Apache Spark for Azure Synapse Analytics 隨附的簽章一樣,以保持一致性。 不過,聯機和使用 Azure Synapse Analytics 中的專用 SQL 集區並不相容。
  • 系統會根據 3 部分資料表/檢視名稱,在查詢之前新增逸出字元來處理具有特殊字元的資料行名稱。 如果是以自定義或傳遞查詢為基礎的讀取,使用者必須逸出包含特殊字元的資料行名稱。