讀取使用 Delta Sharing 開啟共享共享的數據(適用於收件者)
本文說明如何使用 Delta Sharing 開啟共用 通訊協議來讀取與您共用的數據。 其中包含使用 Databricks、Apache Spark、pandas、Power BI 和 Tableau 讀取共享數據的指示。
在開啟共用中,您會使用數據提供者與小組成員共用的認證檔案,以取得共享數據的安全讀取存取權。 只要認證有效,且提供者會繼續共享數據,存取就會保存。 提供者會管理認證到期和輪替。 數據的更新幾乎即時可供您使用。 您可以讀取和製作共享數據的複本,但無法修改源數據。
注意
如果使用 Databricks-to-Databricks Delta Sharing 與您共用數據,則不需要認證檔案來存取數據,而且本文不適用於您。 如需指示,請參閱使用 Databricks-to-Databricks Delta Sharing 讀取共用的數據(適用於收件者)。
下列各節說明如何使用 Azure Databricks、Apache Spark、pandas 和 Power BI,以使用認證檔案來存取和讀取共享數據。 如需差異共用連接器的完整清單,以及如何使用連接器的詳細資訊,請參閱差異共用 開放原始碼 檔。 如果您無法存取共享數據,請連絡數據提供者。
注意
除非另有說明,否則合作夥伴整合是由第三方提供,而且您必須具有適當提供者的帳戶,才能使用其產品和服務。 雖然 Databricks 盡最大努力讓此內容保持在最新狀態,但我們不會就合作夥伴整合頁面上的內容或內容的精確度做出任何表示。 連絡適當的提供者,瞭解整合。
開始之前
小組成員必須下載數據提供者共用的認證檔案。 請參閱 在開放式共用模型中取得存取權。
他們應該使用安全通道與您共用該檔案或檔案位置。
Azure Databricks:使用開放式共用連接器讀取共享數據
本節說明如何使用開放式共享連接器,在 Azure Databricks 工作區中使用筆記本存取共享數據。 您或小組的另一個成員會將認證檔案儲存在 DBFS 中,然後使用它向數據提供者的 Azure Databricks 帳戶進行驗證,並讀取數據提供者與您共享的數據。
注意
如果數據提供者使用 Databricks 對 Databricks 共用,但未與您共用認證檔案,您必須使用 Unity 目錄存取數據。 如需指示,請參閱使用 Databricks-to-Databricks Delta Sharing 讀取共用的數據(適用於收件者)。
在此範例中,您會建立具有多個數據格的筆記本,以便獨立執行。 您可以改為將筆記本命令新增至相同的數據格,並依序執行。
步驟 1:將認證檔案儲存在 DBFS 中(Python 指示)
在此步驟中,您會在 Azure Databricks 中使用 Python Notebook 來儲存認證檔案,讓小組上的使用者可以存取共享的數據。
如果您或小組中的某人已經將認證檔案儲存在 DBFS 中,請跳至下一個步驟。
在文本編輯器中,開啟認證檔案。
在您的 Azure Databricks 工作區中,按兩下 [ 新增 > 筆記本]。
- 輸入名稱。
- 將筆記本的默認語言設定為 Python。
- 選取要附加至筆記本的叢集。
- 按一下 [建立]。
筆記本會在筆記本編輯器中開啟。
若要使用 Python 或 pandas 來存取共享的數據,請安裝 差異共用 Python 連接器。 在筆記本編輯器中,貼上下列命令:
%sh pip install delta-sharing
執行儲存格。
如果尚未安裝 Python 連結
delta-sharing
庫,則會安裝在叢集中。在新的數據格中,貼上下列命令,將認證檔案的內容上傳至 DBFS 中的資料夾。 取代變數,如下所示:
<dbfs-path>
:您要儲存認證檔案的資料夾路徑<credential-file-contents>
:認證檔案的內容。 這不是檔案的路徑,而是檔案複製的內容。認證檔案包含 JSON,定義三個字段:
shareCredentialsVersion
、endpoint
和bearerToken
。%scala dbutils.fs.put("<dbfs-path>/config.share",""" <credential-file-contents> """)
執行儲存格。
上傳認證檔案之後,您可以刪除此儲存格。 所有工作區使用者可以從 DBFS 讀取認證檔案,而且認證檔案可在您工作區中的所有叢集和 SQL 倉儲上的 DBFS 中使用。 若要刪除儲存格,請按兩下最右邊單元格動作選單中的 x。
步驟 2:使用筆記本來列出和讀取共享數據表
在此步驟中,您會列出共用中的數據表,或一組共用數據表和分割區,然後查詢數據表。
使用 Python,列出共用中的數據表。
在新的儲存格中,貼上下列命令。 將 取代
<dbfs-path>
為步驟 1:將認證檔案儲存在 DBFS 中的路徑(Python 指示)。當程式代碼執行時,Python 會從叢集上的 DBFS 讀取認證檔案。 存取儲存在 DBFS 中的路徑
/dbfs/
數據。import delta_sharing client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share") client.list_all_tables()
執行儲存格。
結果是數據表的陣列,以及每個數據表的元數據。 下列輸出顯示兩個資料表:
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
如果輸出是空的,或不包含您預期的數據表,請連絡數據提供者。
查詢共享數據表。
使用 Scala:
在新的儲存格中,貼上下列命令。 當程式代碼執行時,認證檔案會透過 JVM 從 DBFS 讀取。
取代變數,如下所示:
<profile-path>
:認證檔的 DBFS 路徑。 例如:/<dbfs-path>/config.share
。<share-name>
:數據表的 值share=
。<schema-name>
:數據表的 值schema=
。<table-name>
:數據表的 值name=
。
%scala spark.read.format("deltaSharing") .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
執行儲存格。 每次載入共享數據表時,都會看到來源的全新數據。
使用 SQL:
若要使用 SQL 查詢數據,您可以從共享資料表在工作區中建立本機數據表,然後查詢本機數據表。 共用的數據不會儲存或快取在本機數據表中。 每次查詢本機數據表時,您會看到共享數據的目前狀態。
在新的儲存格中,貼上下列命令。
取代變數,如下所示:
<local-table-name>
:本機數據表的名稱。<profile-path>
:認證檔案的位置。<share-name>
:數據表的 值share=
。<schema-name>
:數據表的 值schema=
。<table-name>
:數據表的 值name=
。
%sql DROP TABLE IF EXISTS table_name; CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>"; SELECT * FROM <local-table-name> LIMIT 10;
當您執行 命令時,會直接查詢共享數據。 作為測試,會查詢數據表,並傳回前 10 個結果。
如果輸出是空的,或不包含您預期的數據,請連絡數據提供者。
Apache Spark:讀取共享數據
請遵循下列步驟,使用Spark 3.x或更新版本存取共享數據。
這些指示假設您可以存取數據提供者所共用的認證檔案。 請參閱 在開放式共用模型中取得存取權。
安裝 Delta 共用 Python 和 Spark 連接器
若要存取與共享資料相關的元數據,例如與您共用的數據表清單,請執行下列動作。 此範例使用 Python。
-
pip install delta-sharing
使用Spark列出共享數據表
列出共享中的數據表。 在下列範例中,將 取代 <profile-path>
為認證檔的位置。
import delta_sharing
client = delta_sharing.SharingClient(f"<profile-path>/config.share")
client.list_all_tables()
結果是數據表的陣列,以及每個數據表的元數據。 下列輸出顯示兩個資料表:
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
如果輸出是空的,或不包含您預期的數據表,請連絡數據提供者。
使用Spark存取共享數據
執行下列命令,並取代這些變數:
<profile-path>
:認證檔案的位置。<share-name>
:數據表的 值share=
。<schema-name>
:數據表的 值schema=
。<table-name>
:數據表的 值name=
。<version-as-of>
:自選。 要載入數據的數據表版本。 只有當數據提供者共享數據表的歷程記錄時,才能運作。delta-sharing-spark
需要 0.5.0 或更新版本。<timestamp-as-of>
:自選。 在指定時間戳之前或指定時間戳的版本載入數據。 只有當數據提供者共享數據表的歷程記錄時,才能運作。delta-sharing-spark
需要 0.6.0 或更新版本。
Python
delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)
spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))
delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)
spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))
Scala
執行下列命令,並取代這些變數:
<profile-path>
:認證檔案的位置。<share-name>
:數據表的 值share=
。<schema-name>
:數據表的 值schema=
。<table-name>
:數據表的 值name=
。<version-as-of>
:自選。 要載入數據的數據表版本。 只有當數據提供者共享數據表的歷程記錄時,才能運作。delta-sharing-spark
需要 0.5.0 或更新版本。<timestamp-as-of>
:自選。 在指定時間戳之前或指定時間戳的版本載入數據。 只有當數據提供者共享數據表的歷程記錄時,才能運作。delta-sharing-spark
需要 0.6.0 或更新版本。
spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)
spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)
使用Spark存取共用變更數據摘要
如果數據表歷程記錄已與您共用,而且源數據表上已啟用異動數據摘要(CDF),您可以執行下列命令來存取變更數據摘要,並取代這些變數。 delta-sharing-spark
需要 0.5.0 或更新版本。
必須提供一個和一個 start 參數。
<profile-path>
:認證檔案的位置。<share-name>
:數據表的 值share=
。<schema-name>
:數據表的 值schema=
。<table-name>
:數據表的 值name=
。<starting-version>
:自選。 包含的查詢起始版本。 指定為 Long。<ending-version>
:自選。 包含的查詢結束版本。 如果未提供結束版本,API 會使用最新的數據表版本。<starting-timestamp>
:自選。 查詢的起始時間戳,這會轉換成建立大於或等於此時間戳的版本。 以格式yyyy-mm-dd hh:mm:ss[.fffffffff]
指定為字串。<ending-timestamp>
:自選。 查詢的結束時間戳,這會轉換成稍早建立或等於此時間戳的版本。 以格式指定為字串yyyy-mm-dd hh:mm:ss[.fffffffff]
Python
delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<ending-version>)
delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)
spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Scala
spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
如果輸出是空的,或不包含您預期的數據,請連絡數據提供者。
使用Spark結構化串流存取共享數據表
如果數據表歷程記錄與您共用,您可以串流讀取共享數據。 delta-sharing-spark
需要 0.6.0 或更新版本。
支援的選項:
ignoreDeletes
:略過刪除資料的交易。ignoreChanges
:如果由於資料變更作業 [例如UPDATE
、MERGE INTO
、DELETE
(分割內) 或OVERWRITE
] 而在來源資料表中重寫了檔案,則重新處理會更新。 仍可以發出未變更的資料列。 因此,下游取用者應能夠處理重複項。 刪除項目不會傳播至下游。ignoreChanges
包含ignoreDeletes
。 因此,如果您使用ignoreChanges
,您的數據流將不會因刪除或更新源數據表而中斷。startingVersion
:要從其開始的共用資料表版本。 從此版本 (含) 開始的所有資料表變更都將由串流來源讀取。startingTimestamp
:要從其開始的時間戳記。 在該時間戳記 (含) 或之後提交的所有資料表變更都將由串流來源讀取。 範例:"2023-01-01 00:00:00.0"
。maxFilesPerTrigger
:要在每個微批次中考慮的新檔案數。maxBytesPerTrigger
:在每個微批次中處理的資料量。 此選項會設定「軟最大值」,這表示批次會處理大約這個數據量,而且可能會處理超過限制,以便在最小輸入單位大於此限制的情況下向前移動串流查詢。readChangeFeed
:串流讀取共用資料表的變更資料摘要。
不支援的選項:
Trigger.availableNow
範例結構化串流查詢
Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Python
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
另請參閱 Azure Databricks 上的串流。
讀取已啟用刪除向量或資料列對應的資料表
重要
這項功能處於公開預覽狀態。
刪除向量是提供者可以在共用差異資料表上啟用的儲存體最佳化功能。 請參閱什麼是刪除向量?。
Azure Databricks 也支援差異資料表的資料行對應。 請參閱重新命名和卸除與 Delta Lake 資料行對應的資料行。
如果您的提供者共享資料表並啟用刪除向量或數據行對應,您可以使用執行 delta-sharing-spark
3.1 或更新版本的計算來讀取資料表。 如果您使用 Databricks 叢集,您可以使用執行 Databricks Runtime 14.1 或更新版本之叢集來執行批次讀取。 CDF 和串流查詢需要 Databricks Runtime 14.2 或更新版本。
您可以依現況執行批次查詢,因為它們可以根據共用資料表的資料表功能自動解析 responseFormat
。
若要讀取變更資料摘要 (CDF) 或對已啟用刪除向量或資料行對應的共用資料表執行串流查詢,您必須設定其他選項 responseFormat=delta
。
下列範例顯示批次、CDF 和串流查詢:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
// Batch query
spark.read.format("deltaSharing").load(tablePath)
// CDF query
spark.read.format("deltaSharing")
.option("readChangeFeed", "true")
.option("responseFormat", "delta")
.option("startingVersion", 1)
.load(tablePath)
// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)
Pandas:讀取共享數據
請遵循下列步驟來存取 pandas 0.25.3 或更新版本中的共享數據。
這些指示假設您可以存取數據提供者所共用的認證檔案。 請參閱 在開放式共用模型中取得存取權。
安裝 Delta Sharing Python 連接器
若要存取與共享資料相關的元數據,例如與您共用的數據表清單,您必須安裝 差異共用 Python 連接器。
pip install delta-sharing
使用 pandas 列出共享數據表
若要列出共用中的數據表,請執行下列命令,並將 取代 <profile-path>/config.share
為認證檔案的位置。
import delta_sharing
client = delta_sharing.SharingClient(f"<profile-path>/config.share")
client.list_all_tables()
如果輸出是空的,或不包含您預期的數據表,請連絡數據提供者。
使用 pandas 存取共享數據
若要使用 Python 存取 pandas 中的共享數據,請執行下列命令,並取代變數,如下所示:
<profile-path>
:認證檔案的位置。<share-name>
:數據表的 值share=
。<schema-name>
:數據表的 值schema=
。<table-name>
:數據表的 值name=
。
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")
使用 pandas 存取共用變更數據摘要
若要使用 Python 存取 pandas 中共用數據表的變更數據摘要,請執行下列命令,並取代變數,如下所示。 變更數據摘要可能無法使用,視數據提供者是否共享數據表的變更數據摘要而定。
<starting-version>
:自選。 包含的查詢起始版本。<ending-version>
:自選。 包含的查詢結束版本。<starting-timestamp>
:自選。 查詢的起始時間戳。 這會轉換成建立大於或等於此時間戳的版本。<ending-timestamp>
:自選。 查詢的結束時間戳。 這會轉換成稍早建立或等於此時間戳的版本。
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<starting-version>)
delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)
如果輸出是空的,或不包含您預期的數據,請連絡數據提供者。
Power BI:讀取共享數據
Power BI Delta Sharing 連接器可讓您透過 Delta Sharing 開啟通訊協定探索、分析及可視化與您共用的數據集。
需求
- Power BI Desktop 2.99.621.0 或更新版本。
- 存取數據提供者所共享的認證檔案。 請參閱 在開放式共用模型中取得存取權。
線上到 Databricks
若要使用 Delta Sharing 連接器連線到 Azure Databricks,請執行下列動作:
- 使用文本編輯器開啟共用認證檔案,以擷取端點 URL 和令牌。
- 開啟 Power BI Desktop。
- 在 [ 取得數據] 功能表上,搜尋 差異共用。
- 選取連接器,然後按兩下 [ 連線]。
- 輸入您從認證檔案複製到 [ 差異共用伺服器 URL] 字段的端點 URL 。
- 或者,在 [ 進階選項] 索引標籤中,為您可以下載的數據列數目上限設定數據 列限制 。 根據預設,這會設定為1百萬個數據列。
- 按一下 [確定]。
- 針對 [驗證],將您從認證檔案 擷取的令牌複製到持有人令牌。
- 按一下 [連線]。
Power BI Delta 共用連接器的限制
Power BI Delta Sharing Connector 有下列限制:
- 連接器載入的數據必須符合您電腦的記憶體。 若要管理這項需求,連接器會將匯入的數據列數目限制為您 在 Power BI Desktop 的 [進階選項] 索引卷標底下所設定的數據列限制 。
Tableau:讀取共享數據
Tableau Delta Sharing 連接器可讓您透過 Delta Sharing 開啟通訊協定探索、分析及可視化與您共用的數據集。
需求
- Tableau Desktop 和 Tableau Server 2024.1 或更新版本
- 存取數據提供者所共享的認證檔案。 請參閱 在開放式共用模型中取得存取權。
聯機到 Azure Databricks
若要使用 Delta Sharing 連接器連線到 Azure Databricks,請執行下列動作:
- 移至 Tableau Exchange,依照指示下載 Delta Sharing Connector,並將它放在適當的桌面資料夾中。
- 開啟 Tableau Desktop。
- 在 [ 連接器] 頁面上,搜尋 「由 Databricks 共用的差異」。
- 選取 [ 上傳共用檔案],然後選擇提供者所共用的認證檔案。
- 按兩下 [ 取得數據]。
- 在 [數據總管] 中,選取數據表。
- 選擇性地新增 SQL 篩選條件或資料列限制。
- 按兩下 [ 取得數據表數據]。
Tableau Delta Sharing 連接器的限制
Tableau Delta Sharing Connector 有下列限制:
- 連接器載入的數據必須符合您電腦的記憶體。 若要管理這項需求,連接器會將匯入的數據列數目限制限制為您在Tableau中設定的數據列限制。
- 所有數據行都會以類型
String
傳回。 - 只有在差異共用伺服器支援 predicateHint 時,SQL 篩選才能運作。
要求新的認證
如果您的認證啟用 URL 或下載的認證遺失、損毀或遭入侵,或您的認證到期,而您的提供者不會傳送新的認證,請連絡您的提供者以要求新的認證。