适用于 Fabric 的 Microsoft Spark 实用程序 (MSSparkUtils)

Microsoft Spark 实用工具 (MSSparkUtils) 是内置的包,可帮助你轻松执行常见任务。 可以使用 MSSparkUtils 来处理文件系统、获取环境变量、将笔记本链在一起以及处理机密。 MSSparkUtils 包在 PySpark (Python) Scala、SparkR 笔记本和 Fabric 管道中可用。

注意

  • MsSparkUtils 已正式重命名为 NotebookUtils。 现有代码将保持向后兼容,不会造成任何中断性变更。 强烈建议升级到 Notebookutils,以确保对新功能的持续支持和访问。 mssparkutils 命名空间将来会停用。
  • NotebookUtils 旨在与 Spark 3.4 (Runtime v1.2) 及更高版本配合使用。 今后,所有新功能和更新都将由 Notebookutils 命名空间独家支持。

文件系统实用工具

mssparkutils.fs 提供用于处理各种文件系统的实用工具,包括 Azure Data Lake Storage (ADLS) Gen2 和 Azure Blob 存储。 请确保正确配置对 Azure Data Lake Storage Gen2Azure Blob 存储的访问。

运行以下命令以概要了解可用的方法:

from notebookutils import mssparkutils
mssparkutils.fs.help()

输出

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

MSSparkUtils 以与 Spark API 相同的方式处理文件系统。 以 mssparkuitls.fs.mkdirs() 和 Fabric 湖屋用法为例:

使用情况 HDFS 根目录的相对路径 ABFS 文件系统的绝对路径 驱动程序节点中本地文件系统的绝对路径
非默认湖屋 不支持 mssparkutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")
默认湖屋 “Files”或“Tables”下的目录:mssparkutils.fs.mkdirs("Files/<new_dir>") mssparkutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")

列出文件

若要列出目录的内容,使用 mssparkutils.fs.ls (“你的目录路径”) 。 例如:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

查看文件属性

此方法会返回文件属性,其中包括文件名、文件路径、文件大小,以及它是目录还是文件。

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

创建新目录

此方法会创建给定目录(如果不存在),并创建任何必要的父目录。

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

复制文件

此方法会复制文件或目录,并支持跨文件系统复制活动。

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

高性能复制文件

此方法会提供更快的方式来复制或移动文件,尤其是面对大量数据时。

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

预览文件内容

此方法以 UTF-8 编码的字符串形式返回给定文件的第一个“maxBytes”之前的字节。

mssparkutils.fs.head('file path', maxBytes to read)

移动文件

此方法会移动文件或目录,并支持跨文件系统移动。

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

写入文件

此方法将以 UTF-8 编码的给定字符串写入文件。

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

将内容追加到文件

此方法将以 UTF-8 编码的给定字符串追加到文件中。

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

注意

for 循环中使用 mssparkutils.fs.append API 将内容写入到同一文件中时,建议每隔大约 0.5~1 秒在重复写入之间添加 sleep 语句。 这是因为 mssparkutils.fs.append API 的内部 flush 操作是异步的,因此短暂的延迟有助于确保数据完整性。

删除文件或目录

此方法会删除文件或目录。

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

装载/卸载目录

文件装载和卸载中查找有关详细使用情况的更多信息。

笔记本实用工具

使用 MSSparkUtils 笔记本实用工具运行笔记本或使用值退出笔记本。 运行以下命令以概要了解可用的方法:

mssparkutils.notebook.help()

输出:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

注意

笔记本实用工具不适用于 Apache Spark 作业定义 (SJD)。

引用笔记本

此方法会引用笔记本并返回其退出值。 可以在笔记本中以交互方式或在管道中运行嵌套函数调用。 所引用的笔记本将在其调用此函数的笔记本的 Spark 池上运行。

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

例如:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

Fabric 笔记本还支持通过指定工作区 ID 跨多个工作区引用笔记本。

mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

可以在单元格输出中打开引用运行的快照链接。 快照会捕获代码运行结果,并允许你轻松调试引用运行。

显示引用运行结果的屏幕截图。

包含代码运行结果的快照的屏幕截图。

注意

  • 运行时版本 1.2 及更高版本支持跨工作区引用笔记本。
  • 如果使用笔记本资源下的文件,请在引用的笔记本中使用 mssparkutils.nbResPath,以确保它指向与交互式运行相同的文件夹。

并行引用运行多个笔记本

重要

此功能目前为预览版

mssparkutils.notebook.runMultiple() 方法让你可以并行运行多个笔记本,或使用预定义的拓扑结构。 该 API 在 Spark 会话中使用多线程实现机制,这意味着计算资源由引用笔记本运行共享。

通过 mssparkutils.notebook.runMultiple(),您可以:

  • 同时执行多个笔记本,而无需等待每个笔记本完成。

  • 使用简单的 JSON 格式为笔记本指定依赖项和执行顺序。

  • 优化 Spark 计算资源的使用,并降低 Fabric 项目的成本。

  • 在输出中查看每个笔记本运行记录的快照,并方便地调试/监视笔记本任务。

  • 获取每个执行活动的退出值,并在下游任务中使用它们。

还可以尝试运行 mssparkutils.notebook.help("runMultiple") 来查找示例和详细用法。

下面是使用此方法并行运行一组笔记本的简单示例:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

根笔记本中的执行结果如下所示:

引用笔记本列表的屏幕截图。

下面是使用 mssparkutils.notebook.runMultiple() 运行具有拓扑结构的笔记本的示例。 使用此方法通过代码体验轻松编排笔记本。

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

根笔记本中的执行结果如下所示:

带参数引用笔记本列表的屏幕截图。

注意

  • 多个笔记本运行的并行度受限于 Spark 会话的总可用计算资源。
  • 笔记本活动或并发笔记本的上限为 50。 超出此限制可能会因计算资源使用过度而导致稳定性和性能问题。 如果出现相关问题,请考虑将笔记本拆分为多个 runMultiple 调用,或者通过调整 DAG 参数中的“并发”字段来减少并发。
  • 整个 DAG 的默认超时为 12 小时,子笔记本中每个单元的默认超时为 90 秒。 可通过在 DAG 参数中设置“timeoutInSeconds”和“timeoutPerCellInSeconds”字段更改超时。

退出笔记本

此方法会退出具有值的笔记本。 可以在笔记本中以交互方式或在管道中运行嵌套函数调用。

  • 在笔记本中以交互方式调用 exit() 函数时,Fabric 笔记将引发异常、跳过后续运行单元格,并使 Spark 会话保持活动状态。

  • 在管道中协调调用 exit() 函数的笔记本时,笔记本活动将返回退出值,完成管道运行,并停止 Spark 会话。

  • 在所引用的笔记本中调用 exit() 函数时,Fabric Spark 将在其中停止进一步的执行,并继续运行调用 run() 函数的主笔记本中的下一个单元格。 例如:Notebook1 有三个单元格,调用第二个单元格中的 exit() 函数。 Notebook2 有五个单元格,调用第三个单元格中的 run(notebook1) 函数。 运行 Notebook2 时,如果命中 exit() 函数,Notebook1 将在第二个单元格停止。 Notebook2 会继续运行其第四和第五个单元格。

mssparkutils.notebook.exit("value string")

例如:

示例1 笔记本有以下两个单元格:

  • 单元格 1 定义 input 参数,默认值设为 10。

  • 单元格 2 退出笔记本,input 作为退出值。

显示退出函数示例的屏幕截图。

可以使用默认值在另一笔记本中运行 Sample1:

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

输出:

Notebook executed successfully with exit value 10

可以在另一笔记本中运行 Sample1,并将 input 值设为 20 :

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

输出:

Notebook executed successfully with exit value 20

凭据实用工具

可以使用 MSSparkUtils 凭据实用工具获取访问令牌,并管理 Azure Key Vault 中的机密。

运行以下命令以概要了解可用的方法:

mssparkutils.credentials.help()

输出:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name

获取令牌

getToken 为给定受众和名称返回 Microsoft Entra 令牌(可选)。 以下列表显示了当前可用的受众密钥:

  • 存储受众资源: "storage"
  • Power BI 资源: "pbi"
  • 创建 Azure 密钥保管库资源: "keyvault"
  • Synapse RTA KQL DB 资源: "kusto"

运行以下命令以获取这些令牌:

mssparkutils.credentials.getToken('audience Key')

使用用户凭据获取机密

getSecret 将会使用用户凭据返回给定 Azure Key Vault 端点和机密名称的 Azure Key Vault 机密。

mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

文件装载和卸载

Fabric 支持以下 Microsoft Spark 实用工具包中的装载方案。 可以使用 mount、unmount、getMountPath() 和 mounts() API 将远程存储 (ADLS Gen2) 附加到所有工作节点(驱动程序节点和工作器节点)。 存储装入点就位后,使用本地文件 API 访问数据,如同数据存储在本地文件系统中一样。

如何装载 ADLS Gen2 帐户

以下示例演示如何装载 Azure Data Lake Storage Gen2。 装载 Blob 存储的方式是类似的。

本示例假定你有一个名为 storegen2 的 Data Lake Storage Gen2 帐户,并且该帐户有一个名为 mycontainer 的容器,你希望将其装载到笔记本 Spark 会话中的 /test

屏幕截图显示从何处选择容器以装载。

若要装载名为 mycontainer 的容器,mssparkutils 首先需要检查你是否具有访问该容器的权限。 目前,Fabric 支持两种用于触发器装载操作的身份验证方法:accountKeysastoken

通过共享访问签名令牌或帐户密钥进行装载

MSSparkUtils 支持将账户密钥或 共享访问签名 (SAS) 令牌作为参数显式传递以挂载目标。

出于安全原因,建议将帐户密钥或 SAS 令牌存储在 Azure Key Vault 中(如以下屏幕截图所示)。 可以使用 mssparkutils.credentials.getSecret API 来检索它们。 有关 Azure Key Vault 的详细信息,请参阅关于 Azure Key Vault 托管存储帐户密钥

显示机密在 Azure Key Vault 中的存储位置的屏幕截图。

accountKey 方法的示例代码:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

sastoken 的示例代码:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

注意

如果 mssparkutils 不可用,可能需要将它导入:

from notebookutils import mssparkutils

装载参数:

  • fileCacheTimeout:默认情况下,Blob 将在本地临时文件夹中缓存 120 秒。 在此期间,blobfuse 不会检查文件是否是最新的。 可以将参数设置为更改默认超时时间。 当多个客户端同时修改文件时,为了避免本地文件与远程文件之间的不一致,我们建议缩短缓存时间,甚至将其更改为 0,并始终从服务器获取最新文件。
  • timeout:默认情况下,装载操作超时为 120 秒。 可以将参数设置为更改默认超时时间。 当执行程序过多或装载超时时,建议增加值。

可以使用如下所示的这些参数:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

注意

出于安全原因,建议不要在代码中存储凭据。 为了进一步保护凭据,我们将在笔记本输出中修订机密。 有关详细信息,请查看机密编修

如何装载湖屋

将湖屋装载到 /test 的示例代码。

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

注意

不支持装载区域端点。 Fabric 仅支持装载全局端点 onelake.dfs.fabric.microsoft.com

使用 mssparktuils fs API 访问装入点下的文件

装载操作的主要目的是让客户能够使用本地文件系统 API 访问远程存储帐户中存储的数据。 你也可以使用 mssparkutils fs API 以装载路径作为参数来访问数据。 此路径格式稍有不同。

假设已使用装载 API 将 Data Lake Storage Gen2 容器 mycontainer 装载到 /test。 当你使用本地文件系统 API 访问数据时,路径格式如下所示:

/synfs/notebook/{sessionId}/test/{filename}

如果要使用 mssparkutils fs API 访问数据,建议使用 getMountPath() 获取准确路径:

path = mssparkutils.fs.getMountPath("/test")
  • 列出目录:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • 读取文件内容:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • 创建目录:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

通过本地路径访问装载点下的文件

可以使用标准文件系统在装载点中轻松读取和写入文件。 下面是 Python 示例:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

如何检查现有装入点

可以使用 mssparkutils.fs.mounts() API 来检查所有现有装入点信息:

mssparkutils.fs.mounts()

如何卸载装入点

使用以下代码可卸载装入点(在此示例中为 /test):

mssparkutils.fs.unmount("/test")

已知的限制

  • 当前装载是作业级别的配置,我们建议使用 mounts API 来检查装入点是否存在或是否可用。

  • 卸载机制不是自动进行的。 应用程序运行完成后,若要卸载装入点并释放磁盘空间,需要在代码中显式调用卸载 API。 否则,应用程序运行完成后,装入点仍会存在于节点中。

  • 不支持装载 ADLS Gen1 存储帐户。

Lakehouse 实用工具

mssparkutils.lakehouse 提供专门为管理 Lakehouse 项目而定制的实用工具。 这些实用工具让用户可以毫不费力地创建、检索、更新和删除 Lakehouse 项目。

注意

Lakehouse API 仅在运行时版本 1.2+ 上受支持。

方法概述

下面是以下 mssparkutils.lakehouse 所提供的可用方法的概述:

# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean

# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]

用法示例

若要有效利用这些方法,请考虑以下使用情况示例:

创建 Lakehouse 项目

artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

检索 Lakehouse 项目

artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")

更新 Lakehouse 项目

updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

删除 Lakehouse 项目

is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")

列出 Lakehouse 项目

artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")

其他信息

有关每个方法及其参数的更详细信息,请使用 mssparkutils.lakehouse.help("methodName") 函数。

借助 MSSparkUtils 的 Lakehouse 实用工具,让管理 Lakehouse 项目变得更高效,还可以集成到 Fabric 管道中,从而提升整体数据管理体验。

请随意浏览这些实用工具,并将其合并到 Fabric 工作流中,以便无缝管理 Lakehouse 项目。

运行时实用工具

显示会话上下文信息

借助 mssparkutils.runtime.context,可以获取当前实时会话的上下文信息,包括笔记本名称、默认 Lakehouse、工作区信息、是否是管道运行等。

mssparkutils.runtime.context

已知问题

使用 1.2 以上的运行时版本并运行 mssparkutils.help() 时,列出的 fabricClientwarehouseworkspace API 目前不支持,将在后续版本中提供。