适用于 Fabric 的 Microsoft Spark 实用程序 (MSSparkUtils)
Microsoft Spark 实用工具 (MSSparkUtils) 是内置的包,可帮助你轻松执行常见任务。 可以使用 MSSparkUtils 来处理文件系统、获取环境变量、将笔记本链在一起以及处理机密。 MSSparkUtils 包在 PySpark (Python) Scala、SparkR 笔记本和 Fabric 管道中可用。
注意
- MsSparkUtils 已正式重命名为 NotebookUtils。 现有代码将保持向后兼容,不会造成任何中断性变更。 强烈建议升级到 Notebookutils,以确保对新功能的持续支持和访问。 mssparkutils 命名空间将来会停用。
- NotebookUtils 旨在与 Spark 3.4 (Runtime v1.2) 及更高版本配合使用。 今后,所有新功能和更新都将由 Notebookutils 命名空间独家支持。
文件系统实用工具
mssparkutils.fs 提供用于处理各种文件系统的实用工具,包括 Azure Data Lake Storage (ADLS) Gen2 和 Azure Blob 存储。 请确保正确配置对 Azure Data Lake Storage Gen2 和 Azure Blob 存储的访问。
运行以下命令以概要了解可用的方法:
from notebookutils import mssparkutils
mssparkutils.fs.help()
输出
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use mssparkutils.fs.help("methodName") for more info about a method.
MSSparkUtils 以与 Spark API 相同的方式处理文件系统。 以 mssparkuitls.fs.mkdirs() 和 Fabric 湖屋用法为例:
使用情况 | HDFS 根目录的相对路径 | ABFS 文件系统的绝对路径 | 驱动程序节点中本地文件系统的绝对路径 |
---|---|---|---|
非默认湖屋 | 不支持 | mssparkutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
默认湖屋 | “Files”或“Tables”下的目录:mssparkutils.fs.mkdirs("Files/<new_dir>") | mssparkutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
列出文件
若要列出目录的内容,使用 mssparkutils.fs.ls (“你的目录路径”) 。 例如:
mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
mssparkutils.fs.ls("file:/tmp") # based on local file system of driver node
查看文件属性
此方法会返回文件属性,其中包括文件名、文件路径、文件大小,以及它是目录还是文件。
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
创建新目录
此方法会创建给定目录(如果不存在),并创建任何必要的父目录。
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs. mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
mssparkutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
复制文件
此方法会复制文件或目录,并支持跨文件系统复制活动。
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
高性能复制文件
此方法会提供更快的方式来复制或移动文件,尤其是面对大量数据时。
mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
预览文件内容
此方法以 UTF-8 编码的字符串形式返回给定文件的第一个“maxBytes”之前的字节。
mssparkutils.fs.head('file path', maxBytes to read)
移动文件
此方法会移动文件或目录,并支持跨文件系统移动。
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.
写入文件
此方法将以 UTF-8 编码的给定字符串写入文件。
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
将内容追加到文件
此方法将以 UTF-8 编码的给定字符串追加到文件中。
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
注意
在 for
循环中使用 mssparkutils.fs.append
API 将内容写入到同一文件中时,建议每隔大约 0.5~1 秒在重复写入之间添加 sleep
语句。 这是因为 mssparkutils.fs.append
API 的内部 flush
操作是异步的,因此短暂的延迟有助于确保数据完整性。
删除文件或目录
此方法会删除文件或目录。
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
装载/卸载目录
在文件装载和卸载中查找有关详细使用情况的更多信息。
笔记本实用工具
使用 MSSparkUtils 笔记本实用工具运行笔记本或使用值退出笔记本。 运行以下命令以概要了解可用的方法:
mssparkutils.notebook.help()
输出:
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
注意
笔记本实用工具不适用于 Apache Spark 作业定义 (SJD)。
引用笔记本
此方法会引用笔记本并返回其退出值。 可以在笔记本中以交互方式或在管道中运行嵌套函数调用。 所引用的笔记本将在其调用此函数的笔记本的 Spark 池上运行。
mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)
例如:
mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
Fabric 笔记本还支持通过指定工作区 ID 跨多个工作区引用笔记本。
mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
可以在单元格输出中打开引用运行的快照链接。 快照会捕获代码运行结果,并允许你轻松调试引用运行。
注意
- 运行时版本 1.2 及更高版本支持跨工作区引用笔记本。
- 如果使用笔记本资源下的文件,请在引用的笔记本中使用
mssparkutils.nbResPath
,以确保它指向与交互式运行相同的文件夹。
并行引用运行多个笔记本
重要
此功能目前为预览版。
mssparkutils.notebook.runMultiple()
方法让你可以并行运行多个笔记本,或使用预定义的拓扑结构。 该 API 在 Spark 会话中使用多线程实现机制,这意味着计算资源由引用笔记本运行共享。
通过 mssparkutils.notebook.runMultiple()
,您可以:
同时执行多个笔记本,而无需等待每个笔记本完成。
使用简单的 JSON 格式为笔记本指定依赖项和执行顺序。
优化 Spark 计算资源的使用,并降低 Fabric 项目的成本。
在输出中查看每个笔记本运行记录的快照,并方便地调试/监视笔记本任务。
获取每个执行活动的退出值,并在下游任务中使用它们。
还可以尝试运行 mssparkutils.notebook.help("runMultiple") 来查找示例和详细用法。
下面是使用此方法并行运行一组笔记本的简单示例:
mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
根笔记本中的执行结果如下所示:
下面是使用 mssparkutils.notebook.runMultiple()
运行具有拓扑结构的笔记本的示例。 使用此方法通过代码体验轻松编排笔记本。
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
根笔记本中的执行结果如下所示:
注意
- 多个笔记本运行的并行度受限于 Spark 会话的总可用计算资源。
- 笔记本活动或并发笔记本的上限为 50。 超出此限制可能会因计算资源使用过度而导致稳定性和性能问题。 如果出现相关问题,请考虑将笔记本拆分为多个
runMultiple
调用,或者通过调整 DAG 参数中的“并发”字段来减少并发。 - 整个 DAG 的默认超时为 12 小时,子笔记本中每个单元的默认超时为 90 秒。 可通过在 DAG 参数中设置“timeoutInSeconds”和“timeoutPerCellInSeconds”字段更改超时。
退出笔记本
此方法会退出具有值的笔记本。 可以在笔记本中以交互方式或在管道中运行嵌套函数调用。
在笔记本中以交互方式调用 exit() 函数时,Fabric 笔记将引发异常、跳过后续运行单元格,并使 Spark 会话保持活动状态。
在管道中协调调用 exit() 函数的笔记本时,笔记本活动将返回退出值,完成管道运行,并停止 Spark 会话。
在所引用的笔记本中调用 exit() 函数时,Fabric Spark 将在其中停止进一步的执行,并继续运行调用 run() 函数的主笔记本中的下一个单元格。 例如:Notebook1 有三个单元格,调用第二个单元格中的 exit() 函数。 Notebook2 有五个单元格,调用第三个单元格中的 run(notebook1) 函数。 运行 Notebook2 时,如果命中 exit() 函数,Notebook1 将在第二个单元格停止。 Notebook2 会继续运行其第四和第五个单元格。
mssparkutils.notebook.exit("value string")
例如:
示例1 笔记本有以下两个单元格:
单元格 1 定义 input 参数,默认值设为 10。
单元格 2 退出笔记本,input 作为退出值。
可以使用默认值在另一笔记本中运行 Sample1:
exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)
输出:
Notebook executed successfully with exit value 10
可以在另一笔记本中运行 Sample1,并将 input 值设为 20 :
exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
输出:
Notebook executed successfully with exit value 20
凭据实用工具
可以使用 MSSparkUtils 凭据实用工具获取访问令牌,并管理 Azure Key Vault 中的机密。
运行以下命令以概要了解可用的方法:
mssparkutils.credentials.help()
输出:
getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name
获取令牌
getToken 为给定受众和名称返回 Microsoft Entra 令牌(可选)。 以下列表显示了当前可用的受众密钥:
- 存储受众资源: "storage"
- Power BI 资源: "pbi"
- 创建 Azure 密钥保管库资源: "keyvault"
- Synapse RTA KQL DB 资源: "kusto"
运行以下命令以获取这些令牌:
mssparkutils.credentials.getToken('audience Key')
使用用户凭据获取机密
getSecret 将会使用用户凭据返回给定 Azure Key Vault 端点和机密名称的 Azure Key Vault 机密。
mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
文件装载和卸载
Fabric 支持以下 Microsoft Spark 实用工具包中的装载方案。 可以使用 mount、unmount、getMountPath() 和 mounts() API 将远程存储 (ADLS Gen2) 附加到所有工作节点(驱动程序节点和工作器节点)。 存储装入点就位后,使用本地文件 API 访问数据,如同数据存储在本地文件系统中一样。
如何装载 ADLS Gen2 帐户
以下示例演示如何装载 Azure Data Lake Storage Gen2。 装载 Blob 存储的方式是类似的。
本示例假定你有一个名为 storegen2 的 Data Lake Storage Gen2 帐户,并且该帐户有一个名为 mycontainer 的容器,你希望将其装载到笔记本 Spark 会话中的 /test。
若要装载名为 mycontainer 的容器,mssparkutils 首先需要检查你是否具有访问该容器的权限。 目前,Fabric 支持两种用于触发器装载操作的身份验证方法:accountKey 和 sastoken。
通过共享访问签名令牌或帐户密钥进行装载
MSSparkUtils 支持将账户密钥或 共享访问签名 (SAS) 令牌作为参数显式传递以挂载目标。
出于安全原因,建议将帐户密钥或 SAS 令牌存储在 Azure Key Vault 中(如以下屏幕截图所示)。 可以使用 mssparkutils.credentials.getSecret API 来检索它们。 有关 Azure Key Vault 的详细信息,请参阅关于 Azure Key Vault 托管存储帐户密钥。
accountKey 方法的示例代码:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
sastoken 的示例代码:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
注意
如果 mssparkutils
不可用,可能需要将它导入:
from notebookutils import mssparkutils
装载参数:
- fileCacheTimeout:默认情况下,Blob 将在本地临时文件夹中缓存 120 秒。 在此期间,blobfuse 不会检查文件是否是最新的。 可以将参数设置为更改默认超时时间。 当多个客户端同时修改文件时,为了避免本地文件与远程文件之间的不一致,我们建议缩短缓存时间,甚至将其更改为 0,并始终从服务器获取最新文件。
- timeout:默认情况下,装载操作超时为 120 秒。 可以将参数设置为更改默认超时时间。 当执行程序过多或装载超时时,建议增加值。
可以使用如下所示的这些参数:
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
注意
出于安全原因,建议不要在代码中存储凭据。 为了进一步保护凭据,我们将在笔记本输出中修订机密。 有关详细信息,请查看机密编修。
如何装载湖屋
将湖屋装载到 /test 的示例代码。
from notebookutils import mssparkutils
mssparkutils.fs.mount(
"abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>",
"/test"
)
注意
不支持装载区域端点。 Fabric 仅支持装载全局端点 onelake.dfs.fabric.microsoft.com
。
使用 mssparktuils fs API 访问装入点下的文件
装载操作的主要目的是让客户能够使用本地文件系统 API 访问远程存储帐户中存储的数据。 你也可以使用 mssparkutils fs API 以装载路径作为参数来访问数据。 此路径格式稍有不同。
假设已使用装载 API 将 Data Lake Storage Gen2 容器 mycontainer 装载到 /test。 当你使用本地文件系统 API 访问数据时,路径格式如下所示:
/synfs/notebook/{sessionId}/test/{filename}
如果要使用 mssparkutils fs API 访问数据,建议使用 getMountPath() 获取准确路径:
path = mssparkutils.fs.getMountPath("/test")
列出目录:
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
读取文件内容:
mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
创建目录:
mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
通过本地路径访问装载点下的文件
可以使用标准文件系统在装载点中轻松读取和写入文件。 下面是 Python 示例:
#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
如何检查现有装入点
可以使用 mssparkutils.fs.mounts() API 来检查所有现有装入点信息:
mssparkutils.fs.mounts()
如何卸载装入点
使用以下代码可卸载装入点(在此示例中为 /test):
mssparkutils.fs.unmount("/test")
已知的限制
当前装载是作业级别的配置,我们建议使用 mounts API 来检查装入点是否存在或是否可用。
卸载机制不是自动进行的。 应用程序运行完成后,若要卸载装入点并释放磁盘空间,需要在代码中显式调用卸载 API。 否则,应用程序运行完成后,装入点仍会存在于节点中。
不支持装载 ADLS Gen1 存储帐户。
Lakehouse 实用工具
mssparkutils.lakehouse
提供专门为管理 Lakehouse 项目而定制的实用工具。 这些实用工具让用户可以毫不费力地创建、检索、更新和删除 Lakehouse 项目。
注意
Lakehouse API 仅在运行时版本 1.2+ 上受支持。
方法概述
下面是以下 mssparkutils.lakehouse
所提供的可用方法的概述:
# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]
用法示例
若要有效利用这些方法,请考虑以下使用情况示例:
创建 Lakehouse 项目
artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
检索 Lakehouse 项目
artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")
更新 Lakehouse 项目
updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
删除 Lakehouse 项目
is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")
列出 Lakehouse 项目
artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")
其他信息
有关每个方法及其参数的更详细信息,请使用 mssparkutils.lakehouse.help("methodName")
函数。
借助 MSSparkUtils 的 Lakehouse 实用工具,让管理 Lakehouse 项目变得更高效,还可以集成到 Fabric 管道中,从而提升整体数据管理体验。
请随意浏览这些实用工具,并将其合并到 Fabric 工作流中,以便无缝管理 Lakehouse 项目。
运行时实用工具
显示会话上下文信息
借助 mssparkutils.runtime.context
,可以获取当前实时会话的上下文信息,包括笔记本名称、默认 Lakehouse、工作区信息、是否是管道运行等。
mssparkutils.runtime.context
已知问题
使用 1.2 以上的运行时版本并运行 mssparkutils.help()
时,列出的 fabricClient、warehouse 和 workspace API 目前不支持,将在后续版本中提供。