使用 Delta Sharing 开放共享读取共享的数据(针对接收者)
本文介绍如何读取使用 Delta Sharing 开放共享协议与你共享的数据。 其中包括有关使用 Databricks、Apache Spark、pandas、Power BI 和 Tableau 读取共享数据的说明。
在开放共享中,可以使用团队成员通过数据提供程序共享的凭据文件来获得对共享数据的安全读取访问权限。 只要凭据有效且提供商继续共享数据,访问就会持续。 提供商管理凭据过期和轮换。 你可以准乎实时地获得对数据的更新。 可读取和复制共享数据,但不能修改源数据。
注意
如果数据是通过 Databricks-to-Databricks Delta Sharing 与你共享的,则你不需要凭据文件即可访问数据。此文章不适合你。 有关说明,请参阅使用 Databricks-to-Databricks Delta Sharing 读取共享数据(针对接收者)。
以下部分介绍如何使用 Azure Databricks、Apache Spark、pandas 和 Power BI 通过凭据文件访问和读取共享数据。 有关 Delta Sharing 连接器的完整列表及其用法的信息,请参阅 Delta Sharing 开放源代码文档。 如果在访问共享数据时遇到问题,请联系数据提供者。
注意
除非另有说明,否则合作伙伴集成由第三方提供。你必须拥有相应提供者的帐户才能使用他们的产品和服务。 尽管 Databricks 尽量使此内容保持最新状态,但我们不会就合作伙伴集成页上的集成或内容准确性做出任何陈述。 有关集成的问题,请咨询相应的提供商。
开始之前
团队成员必须下载数据提供程序共享的凭据文件。 请参阅在开放共享模型中获取访问权限。
他们应使用安全通道与你共享该文件或文件位置。
Azure Databricks:使用开放共享连接器读取共享的数据
本部分介绍如何使用开放共享连接器在 Azure Databricks 工作区中通过笔记本访问共享的数据。 你或者其他团队成员将凭据文件存储在 DBFS 中,然后使用它向数据提供程序的 Azure Databricks 帐户进行身份验证,并读取数据提供程序与你共享的数据。
注意
如果数据提供程序正在使用 Databricks-to-Databricks 共享,并且未与你共享凭据文件,则必须通过 Unity Catalog 访问数据。 有关说明,请参阅使用 Databricks-to-Databricks Delta Sharing 读取共享数据(针对接收者)。
在此示例中,创建了一个笔记本,其中包含多个可独立运行的单元格。 可改为将笔记本命令添加到同一单元格,并按顺序运行它们。
步骤 1:将凭据文件存储在 DBFS 中(Python 说明)
在此步骤中,你将使用 Azure Databricks 中的 Python 笔记本来存储凭据文件,以便团队中的用户可以访问共享的数据。
如果你或者团队中的某人已将凭据文件存储在 DBFS 中,请跳到下一步骤。
在文本编辑器中打开凭据文件。
在 Azure Databricks 工作区中,单击“新建”>“笔记本”。
- 输入名称。
- 将笔记本的默认语言设置为 Python。
- 选择要附加到笔记本的群集。
- 单击“创建”。
笔记本随即在笔记本编辑器中打开。
若要使用 Python 或 pandas 访问共享数据,请安装 delta-sharing Python 连接器。 在笔记本编辑器中,粘贴以下命令:
%sh pip install delta-sharing
运行单元。
delta-sharing
Python 库将安装在群集中(如果尚未安装)。在新的单元格中,粘贴以下命令,该命令将凭据文件的内容上传到 DBFS 的一个文件夹中。 替换变量,如下所示:
<dbfs-path>
:要保存凭据文件的文件夹的路径<credential-file-contents>
:凭据文件的内容。 这并非文件路径,而是复制的文件内容。凭据文件包含定义三个字段的 JSON:
shareCredentialsVersion
、endpoint
和bearerToken
。%scala dbutils.fs.put("<dbfs-path>/config.share",""" <credential-file-contents> """)
运行该单元。
上传凭据文件后,可删除此单元格。 所有工作区用户都可以从 DBFS 读取凭据文件,并且凭据文件在 DBFS 的工作区中的所有群集和 SQL 仓库上都可用。 若要删除单元格,请单击最右侧单元格操作菜单 中的“x”。
步骤 2:使用笔记本列出和读取共享的表
在此步骤中,你将列出共享中的表或者共享的表和分区集,然后查询某个表。
使用 Python 列出共享中的表。
在新的单元格中粘贴以下命令。 请将
<dbfs-path>
替换为在步骤 1:将凭据文件存储在 DBFS 中(Python 说明)中创建的路径。代码运行时,Python 从 DBFS 的群集上读取凭据文件。 访问存储在 DBFS 中路径
/dbfs/
处的数据。import delta_sharing client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share") client.list_all_tables()
运行该单元。
结果是一个表数组,以及每个表的元数据。 以下输出显示了两个表:
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
如果输出为空或不包含预期表,请与数据提供者联系。
查询某个共享表。
使用 Scala:
在新的单元格中粘贴以下命令。 代码运行时,通过 JVM 从 DBFS 读取凭据文件。
替换变量,如下所示:
<profile-path>
:凭据文件的 DBFS 路径。 例如/<dbfs-path>/config.share
。<share-name>
:表的share=
的值。<schema-name>
:表的schema=
的值。<table-name>
:表的name=
的值。
%scala spark.read.format("deltaSharing") .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
运行该单元。 每次加载共享表时,你都会看到来自源的新数据。
使用 SQL:
若要使用 SQL 查询数据,必须从共享表在工作区中创建本地表,然后查询本地表。 共享数据不存储或缓存在本地表中。 每次查询本地表时,你都会看到共享数据的当前状态。
在新的单元格中粘贴以下命令。
替换变量,如下所示:
<local-table-name>
:本地表的名称。<profile-path>
:凭据文件的位置。<share-name>
:表的share=
的值。<schema-name>
:表的schema=
的值。<table-name>
:表的name=
的值。
%sql DROP TABLE IF EXISTS table_name; CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>"; SELECT * FROM <local-table-name> LIMIT 10;
运行该命令时,将直接查询共享数据。 作为测试,查询该表并返回前 10 个结果。
如果输出为空或不包含预期数据,请与数据提供者联系。
Apache Spark:读取共享的数据
按照以下步骤使用 Spark 3.x 或更高版本访问共享的数据。
这些说明假设你有权访问数据提供程序共享的凭据文件。 请参阅在开放共享模型中获取访问权限。
安装 Delta Sharing Python 和 Spark 连接器
若要访问与共享数据相关的元数据(例如与你共享的表列表),请执行以下操作。 本示例使用 Python。
-
pip install delta-sharing
安装 Apache Spark 连接器。
使用 Spark 列出共享的表
列出共享中的表。 在下面的示例中,将 <profile-path>
替换为凭据文件的位置。
import delta_sharing
client = delta_sharing.SharingClient(f"<profile-path>/config.share")
client.list_all_tables()
结果是一个表数组,以及每个表的元数据。 以下输出显示了两个表:
Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
如果输出为空或不包含预期表,请与数据提供者联系。
使用 Spark 访问共享的数据
运行以下命令并替换以下变量:
<profile-path>
:凭据文件的位置。<share-name>
:表的share=
的值。<schema-name>
:表的schema=
的值。<table-name>
:表的name=
的值。<version-as-of>
:可选。 要加载数据的表的版本。 只有在数据提供者共享表的历史记录的情况下才有效。 需要delta-sharing-spark
0.5.0 或更高版本。<timestamp-as-of>
:可选。 在给定时间戳的版本或其之前的版本加载数据。 只有在数据提供者共享表的历史记录的情况下才有效。 需要delta-sharing-spark
0.6.0 或更高版本。
Python
delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)
spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))
delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)
spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))
Scala
运行以下命令并替换以下变量:
<profile-path>
:凭据文件的位置。<share-name>
:表的share=
的值。<schema-name>
:表的schema=
的值。<table-name>
:表的name=
的值。<version-as-of>
:可选。 要加载数据的表的版本。 只有在数据提供者共享表的历史记录的情况下才有效。 需要delta-sharing-spark
0.5.0 或更高版本。<timestamp-as-of>
:可选。 在给定时间戳的版本或其之前的版本加载数据。 只有在数据提供者共享表的历史记录的情况下才有效。 需要delta-sharing-spark
0.6.0 或更高版本。
spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)
spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)
使用 Spark 访问共享的变更数据馈送
如果数据提供者已与你共享表历史记录并在源表上启用了变更数据馈送 (CDF),你可以通过运行以下命令并替换相应的变量来访问变更数据馈送。 需要 delta-sharing-spark
0.5.0 或更高版本。
必须提供一个且只能提供一个启动参数。
<profile-path>
:凭据文件的位置。<share-name>
:表的share=
的值。<schema-name>
:表的schema=
的值。<table-name>
:表的name=
的值。<starting-version>
:可选。 查询的起始版本(含)。 指定为长整型。<ending-version>
:可选。 查询的结束版本(含)。 如果未提供结束版本,API 将使用最新的表版本。<starting-timestamp>
:可选。 查询的起始时间戳,将转换为在晚于或等于此时间戳创建的版本。 指定为采用yyyy-mm-dd hh:mm:ss[.fffffffff]
格式的字符串。<ending-timestamp>
:可选。 查询的结束时间戳,将转换为在早于或等于此时间戳创建的版本。 指定为采用yyyy-mm-dd hh:mm:ss[.fffffffff]
格式的字符串
Python
delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<ending-version>)
delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)
spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Scala
spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
如果输出为空或不包含预期数据,请与数据提供者联系。
使用 Spark 结构化流式处理访问共享表
如果表历史记录已与你共享,你可以流式读取共享数据。 需要 delta-sharing-spark
0.6.0 或更高版本。
支持的选项:
ignoreDeletes
:忽略删除数据的事务。ignoreChanges
:如果由于数据更改操作(例如UPDATE
、MERGE INTO
、DELETE
(分区内)或OVERWRITE
)而在源表中重写了文件,则重新处理更新。 仍可以发出未更改的行。 因此,下游使用者应该能够处理重复项。 删除不会传播到下游。ignoreChanges
包括ignoreDeletes
。 因此,如果使用ignoreChanges
,则流不会因源表的删除或更新而中断。startingVersion
:要从其开始的共享表版本。 从此版本(含)开始的所有表更改都将由流式处理源读取。startingTimestamp
:要从其开始的时间戳。 在该时间戳(含)或之后提交的所有表更改都将由流式处理源读取。 示例:"2023-01-01 00:00:00.0"
。maxFilesPerTrigger
:要在每个微批中考虑的新文件数。maxBytesPerTrigger
:在每个微批中处理的数据量。 此选项设置一个“柔性最大值”,这意味着批处理大约处理此数量的数据,并且可能会超过此限制,以便在最小输入单元大于此限制的情况下,继续处理流式查询。readChangeFeed
:流式读取共享表的变更数据馈送。
不支持的选项:
Trigger.availableNow
结构化流式处理查询示例
Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Python
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
另请参阅 Azure Databricks 上的流式处理。
读取启用了删除矢量或列映射的表
重要
此功能目前以公共预览版提供。
删除向量是提供商可以在共享 Delta 表上启用的一项存储优化功能。 请参阅什么是删除向量?。
Azure Databricks 还支持 Delta 表的列映射。 请参阅使用 Delta Lake 列映射重命名和删除列。
如果你的提供商共享了启用了删除矢量或列映射的表,则可以使用运行 delta-sharing-spark
3.1 或更高版本的计算来读取该表。 如果你使用 Databricks 群集,则可以使用运行 Databricks Runtime 14.1 或更高版本的群集执行批量读取。 CDF 和流式处理查询需要 Databricks Runtime 14.2 或更高版本。
可以按原样执行批处理查询,因为它们可以根据共享表的表功能自动解析 responseFormat
。
若要读取变更数据馈送 (CDF) 或对启用了删除向量或列映射的共享表执行流式处理查询,必须设置附加选项 responseFormat=delta
。
以下示例显示了批处理、CDF 和流式处理查询:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"
// Batch query
spark.read.format("deltaSharing").load(tablePath)
// CDF query
spark.read.format("deltaSharing")
.option("readChangeFeed", "true")
.option("responseFormat", "delta")
.option("startingVersion", 1)
.load(tablePath)
// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)
Pandas:读取共享的数据
按照以下步骤在 pandas 0.25.3 或更高版本中访问共享数据。
这些说明假设你有权访问数据提供程序共享的凭据文件。 请参阅在开放共享模型中获取访问权限。
安装 Delta Sharing Python 连接器
若要访问与共享数据相关的元数据(例如与你共享的表列表),必须安装 delta-sharing Python 连接器。
pip install delta-sharing
使用 pandas 列出共享的表
若要列出共享中的表,请运行以下命令并将 <profile-path>/config.share
替换为凭据文件的位置。
import delta_sharing
client = delta_sharing.SharingClient(f"<profile-path>/config.share")
client.list_all_tables()
如果输出为空或不包含预期表,请与数据提供者联系。
使用 pandas 访问共享的数据
若要使用 Python 访问 pandas 中的共享数据,请运行以下命令并按如下所述替换变量:
<profile-path>
:凭据文件的位置。<share-name>
:表的share=
的值。<schema-name>
:表的schema=
的值。<table-name>
:表的name=
的值。
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")
使用 pandas 访问共享的更改数据馈送
若要使用 Python 访问 pandas 中共享表的更改数据馈送,请运行以下命令并按如下所示替换变量。 可能未提供更改数据馈送,具体取决于数据提供程序是否共享了表的更改数据馈送。
<starting-version>
:可选。 查询的起始版本(含)。<ending-version>
:可选。 查询的结束版本(含)。<starting-timestamp>
:可选。 查询的起始时间戳。 此值将转换为在晚于或等于此时间戳创建的版本。<ending-timestamp>
:可选。 查询的结束时间戳。 此值将转换为在早于或等于此时间戳创建的版本。
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_version=<starting-version>,
ending_version=<starting-version>)
delta_sharing.load_table_changes_as_pandas(
f"<profile-path>#<share-name>.<schema-name>.<table-name>",
starting_timestamp=<starting-timestamp>,
ending_timestamp=<ending-timestamp>)
如果输出为空或不包含预期数据,请与数据提供者联系。
Power BI:读取共享的数据
使用 Power BI Delta Sharing 连接器可以通过 Delta Sharing 开放协议来发现、分析和可视化与你共享的数据集。
要求
- Power BI Desktop 2.99.621.0 或更高版本。
- 访问数据提供程序共享的凭据文件。 请参阅在开放共享模型中获取访问权限。
连接到 Databricks
若要使用 Delta Sharing 连接器连接到 Azure Databricks,请执行以下操作:
- 使用文本编辑器打开共享的凭据文件以检索终结点 URL 和令牌。
- 打开 Power BI Desktop。
- 在“获取数据”菜单中,搜索“Delta Sharing”。
- 选择连接器并单击“连接”。
- 在“Delta Sharing 服务器 URL”字段中,输入从凭据文件复制的终结点 URL。
- (可选)在“高级选项”选项卡中,为可下载的最大行数设置“行限制”。 此限制默认设置为 100 万行。
- 单击 “确定” 。
- 对于“身份验证”,请将从凭据文件中检索的令牌复制到“持有者令牌”中。
- 单击“连接” 。
Power BI Delta Sharing 连接器的限制
Power BI Delta Sharing 连接器具有以下限制:
- 该连接器加载的数据必须能够装入计算机的内存。 为了管理此要求,该连接器会将导入的行数限制为在 Power BI Desktop 的“高级选项”选项卡下设置的“行限制”。
Tableau:读取共享数据
使用 Tableau Delta Sharing 连接器可以通过 Delta Sharing 开放协议来发现、分析和可视化与你共享的数据集。
要求
- Tableau Desktop 和 Tableau Server 2024.1 或更高版本
- 访问数据提供程序共享的凭据文件。 请参阅在开放共享模型中获取访问权限。
连接到 Azure DataBricks
若要使用 Delta Sharing 连接器连接到 Azure Databricks,请执行以下操作:
- 转到Tableau Exchange,按照说明下载 Delta Sharing 连接器,并将其放入相应的桌面文件夹中。
- 打开 Tableau Desktop。
- 在“连接器”页上,搜索“Delta Sharing(按 Databricks)” 。
- 选择“上传共享文件”,然后选择提供程序共享的凭据文件。
- 单击“获取数据”。
- 在数据资源管理器中,选择该表。
- (可选)添加 SQL 筛选器或行限制。
- 点击“获取表数据”。
Tableau Delta Sharing 连接器的限制
Tableau Delta Sharing 连接器具有以下限制:
- 该连接器加载的数据必须能够装入计算机的内存。 为了管理此要求,连接器会将导入的行数限制为在 Tableau 中设置的行限制。
- 所有列都作为类型
String
返回。 - 只有当 Delta Sharing 服务器支持predicateHint时,SQL 筛选器才有效。
请求新凭据
如果你的凭据激活 URL 或下载的凭据丢失、损坏或泄露,或者凭据过期且提供商未发送新凭据,请联系提供商请求新凭据。