Fabric 用 NotebookUtils (旧 MSSparkUtils)
Notebook のユーティリティ (NotebookUtils) は、Fabric Notebook で一般的なタスクを簡単に実行するのに役立つビルトイン パッケージです。 MSSparkUtils では、ファイル システムの操作、環境変数の取得、ノートブックの連結、シークレットの操作を実行できます。 NotebookUtils パッケージは、PySpark (Python)、Scala、SparkR ノートブック、Fabric パイプラインで利用できます。
Note
- MsSparkUtils の名前が正式に NotebookUtils に変更されました。 既存のコードは 後方互換対応 のままとなるので、重大な変更は発生しません。 継続的なサポートと新機能へのアクセスを確保するために notebookutils にアップグレードすることを 強くお勧めします。 mssparkutils 名前空間は今後廃止される予定です。
- NotebookUtils は、Spark 3.4 (Runtime v1.2) 以降 で動作するように設計されています。 今後、notebookutils 名前空間では、すべての新機能と更新プログラムが排他的にサポートされます。
ファイルシステム ユーティリティ
notebookutils.fs には、Azure Data Lake Storage (ADLS) Gen2 や Azure Blob Storage などのさまざまなファイルシステムを操作するためのユーティリティが揃っています。 Azure Data Lake Storage Gen2 および Azure Blob Storage へのアクセスを適切に構成するようにしてください。
次のコマンドを実行して、使用可能なメソッドの概要を取得します。
notebookutils.fs.help()
出力
notebookutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> [Preview] Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use notebookutils.fs.help("methodName") for more info about a method.
NotebookUtils でファイル システムを操作する方法は、Spark API と同じです。 たとえば、notebookutils.fs.mkdirs() と Fabric レイクハウスを使用する場合は以下のようになります。
使用方法 | HDFS ルートからの相対パス | ABFS ファイル システムの絶対パス | ドライバー ノード内のローカル ファイル システムの絶対パス |
---|---|---|---|
デフォルト以外のレイクハウス | サポートされていません | notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") | notebookutils.fs.mkdirs("file:/<new_dir>") |
デフォルトのLakehouse | "Files" または "Tables" の下のディレクトリ: notebookutils.fs.mkdirs("Files/<new_dir>") | notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") | notebookutils.fs.mkdirs("file:/<new_dir>") |
ファイルの一覧表示
ディレクトリの内容を一覧表示するには、notebookutils.fs.ls('ディレクトリ パス') を使用します。 次に例を示します。
notebookutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
notebookutils.fs.ls("file:/tmp") # based on local file system of driver node
ファイルのプロパティを表示します
このメソッドでは、ファイルのプロパティ (ファイル名、ファイル パス、ファイル サイズ、ディレクトリとファイルの区別) を返します。
files = notebookutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
新しいディレクトリの作成
このメソッドでは、指定されたディレクトリが存在しない場合に、そのディレクトリを作成し、必要に応じて親ディレクトリを作成します。
notebookutils.fs.mkdirs('new directory name')
notebookutils.fs.mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
notebookutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
ファイルのコピー
このメソッドでは、ファイルまたはディレクトリをコピーし、ファイル システム間のコピー アクティビティをサポートします。
notebookutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
パフォーマンスの高いコピー ファイル
この方法では、特に大量のデータを扱う場合に、ファイルのコピーまたは移動をより効率的に行うことができます。 Fabric のパフォーマンスを向上させるには、従来の fastcp
メソッドの代わりに cp
を使用することをお勧めします。
Note
notebookutils.fs.fastcp()
では、OneLake 内のファイルのリージョン間のコピーはサポートされていません。 この場合は、代わりに notebookutils.fs.cp()
を使用できます。
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
ファイル コンテンツのプレビュー
このメソッドでは、指定したファイルの最初の 'maxBytes' バイトまでを、UTF-8 でエンコードされた文字列として返します。
notebookutils.fs.head('file path', maxBytes to read)
ファイルの移動
このメソッドでは、ファイルまたはディレクトリを移動し、ファイル システム間の移動をサポートします。
notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.
ファイルの書き込み
このメソッドでは、指定した文字列を UTF-8 でエンコードしてファイルに書き込みます。
notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
ファイルへのコンテンツの追加
このメソッドでは、指定した文字列を UTF-8 でエンコードしてファイルに追加します。
notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Note
notebookutils.fs.append()
とnotebookutils.fs.put()
は、原子性の保証がないため、同じファイルへの同時書き込みはサポートされていません。notebookutils.fs.append
API をfor
ループで使用して同じファイルに書き込む場合は、繰り返し書き込む間にsleep
ステートメントを 0.5 秒から 1 秒程度追加することをお勧めします。 これは、notebookutils.fs.append
API の内部flush
操作が非同期であるため、短い遅延によりデータ整合性を確保するのに役立つからです。
ファイルまたはディレクトリの削除
このメソッドでは、ファイルまたはディレクトリを削除します。
notebookutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
ディレクトリのマウント、マウント解除
詳細な使用方法については、「ファイルのマウントとマウント解除」を参照してください。
Notebook のユーティリティ
Notebook のユーティリティを使用して、ノートブックを実行したり、値を持つノートブックを終了したりします。 次のコマンドを実行して、使用可能なメソッドの概要を取得します。
notebookutils.notebook.help()
出力:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> [Preview] Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> [Preview] This method check if the DAG is correctly defined.
[Preview] Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.
Use notebookutils.notebook.help("methodName") for more info about a method.
Note
Notebook ユーティリティは、Apache Spark ジョブ定義 (SJD) には適用されません。
Notebookの参照
このメソッドでは、Notebookを参照し、その終了値を返します。 関数呼び出しの入れ子は、対話形式またはパイプラインで、Notebookで実行できます。 参照されたNotebookは、この関数を呼び出すNotebookの Spark プールで実行されます。
notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)
次に例を示します。
notebookutils.notebook.run("Sample1", 90, {"input": 20 })
Fabric Notebookでは、ワークスペース ID を指定することで、複数のワークスペース間でのNotebookの参照もサポートしています。
notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
セルの出力で参照の実行のスナップショット リンクを開くことができます。 このスナップショットでは、コードの実行結果をキャプチャし、参照の実行を簡単にデバッグできるようにします。
Note
- ワークスペース間参照Notebookは、ランタイム バージョン 1.2 以降 でサポートされています。
- [Notebook リソース]の下のファイルを使用する場合は、参照先のNotebookで
notebookutils.nbResPath
を使用して、対話型実行と同じフォルダーを指していることを確認します。
複数のNotebook参照を並列で実行する
重要
この機能はプレビュー中です。
notebookutils.notebook.runMultiple()
メソッドを使用すると、複数のNotebookを並列で、または定義済みのトポロジ構造で実行できます。 この API は、Spark セッション内でマルチスレッド実装メカニズムを使用しています。つまり、参照ノートブックの実行でコンピューティング リソースが共有されます。
notebookutils.notebook.runMultiple()
を使用すると、以下のことができます。
各Notebookが完了するのを待たずに、複数のNotebookを同時に実行します。
単純な JSON 形式を使用して、Notebookの依存関係と実行順序を指定します。
Spark コンピューティング リソースの使用を最適化し、Fabric プロジェクトのコストを削減します。
出力内の各Notebook実行レコードのスナップショットを表示し、Notebook タスクを容易にデバッグ/監視します。
各エグゼクティブ アクティビティの終了値を取得し、ダウンストリーム タスクで使用します。
notebookutils.notebook.help("runMultiple") を実行して、例や詳細な使用方法を見つけることもできます。
このメソッドを使用して一連のNotebookを並列で実行する簡単な例を次に示します。
notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
ルート Notebookからの実行結果は次のとおりです。
次に、notebookutils.notebook.runMultiple()
を使用してトポロジ構造を持つ複数のNotebookを実行する例を示します。 コード エクスペリエンスを使用してNotebookを簡単にオーケストレーションするには、このメソッドを使用します。
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
ルート Notebookからの実行結果は次のとおりです。
DAG が正しく定義されているかどうかを確認するメソッドも提供しています。
notebookutils.notebook.validateDAG(DAG)
Note
- 複数のNotebook実行での並列処理の次数は、Spark セッションの使用可能なコンピューティング リソースの合計に制限されます。
- Notebookアクティビティまたは同時実行Notebookの上限は 50です。 この制限を超えると、コンピューティング リソースの使用率が高いため、安定性とパフォーマンスの問題が発生する可能性があります。 問題が発生した場合は、DAG パラメーターの
runMultiple
フィールドを調整して、Notebookを複数の呼び出しに分離するか、コンカレンシーを減らすことを検討してください。 - DAG 全体のデフォルトのタイムアウトは 12 時間で、子Notebookの各セルのデフォルトのタイムアウトは 90 秒です。 タイムアウトを変更するには、DAG パラメーターの timeoutInSeconds フィールドと timeoutPerCellInSeconds フィールドを設定します。
Notebookを終了する
このメソッドでは、値を指定してNotebookを終了します。 関数呼び出しの入れ子は、対話形式またはパイプラインで、Notebookで実行できます。
exit() 関数をNotebookから対話形式で呼び出すと、Fabric Notebookによって例外がスローされて、以後に続くセルの実行がスキップされ、Spark セッションは維持されます。
exit() 関数を呼び出すパイプラインでNotebookのオーケストレーションを行うと、そのNotebook アクティビティは終了して終了値を返し、パイプラインの実行が完了し、Spark セッションが停止します。
参照されたNotebook内で exit() 関数を呼び出すと、Fabric Spark では、参照されたNotebookの実行をその時点で停止し、run() 関数を呼び出すメイン Notebook内の次のセルから実行を継続します。 たとえば、Notebook1 には 3 つのセルがあり、2 番目のセルで exit() 関数が呼び出されるとします。 Notebook2 には 5 つのセルがあり、3 番目のセルで run(notebook1) が呼び出されるとします。 Notebook2 を実行すると、Notebook1 は、2 番目のセルで exit() 関数に到達して停止します。 その後、Notebook2 の 4 番目のセルと 5 番目のセルは引き続き実行されます。
notebookutils.notebook.exit("value string")
Note
exit() 関数は、現在のセルの出力を上書きします。他のコード ステートメントの出力が失われるのを防ぐため、別のセルで notebookutils.notebook.exit()
を呼び出してください。
次に例を示します。
Sample1 Notebookには、以下の 2 つのセルがあります。
セル 1 では、input パラメーターがデフォルト値 10 で定義されます。
セル 2 では、Notebookが終了し、終了値として input が返されます。
デフォルト値を使用して、別のNotebookで Sample1 を実行できます。
exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)
出力:
Notebook is executed successfully with exit value 10
別のNotebookで Sample1 を実行して、入力値を 20 に設定できます。
exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
出力:
Notebook is executed successfully with exit value 20
ノートブックの成果物を管理する
notebookutils.notebook
には、プログラムによってノートブック項目を管理するための専用のユーティリティが用意されています。 これらの API は、ノートブック項目の作成、取得、更新、削除を簡単に行うのに役立ちます。
これらのメソッドを効果的に利用するには、次の使用例を検討してください。
ノートブックの作成
with open("/path/to/notebook.ipynb", "r") as f:
content = f.read()
artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")
ノートブックの内容の取得
artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")
ノートブックの更新
updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name", "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")
ノートブックの削除
is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")
ワークスペース内のノートブックの一覧表示
artifacts_list = notebookutils.notebook.list("optional_workspace_id")
資格情報ユーティリティ
認証情報ユーティリティを使用すると、アクセス トークンを取得し、Azure Key Vault のシークレットを管理できます。
次のコマンドを実行して、使用可能なメソッドの概要を取得します。
notebookutils.credentials.help()
出力:
Help on module notebookutils.credentials in notebookutils:
NAME
notebookutils.credentials - Utility for credentials operations in Fabric
FUNCTIONS
getSecret(akvName, secret) -> str
Gets a secret from the given Azure Key Vault.
:param akvName: The name of the Azure Key Vault.
:param secret: The name of the secret.
:return: The secret value.
getToken(audience) -> str
Gets a token for the given audience.
:param audience: The audience for the token.
:return: The token.
help(method_name=None)
Provides help for the notebookutils.credentials module or the specified method.
Examples:
notebookutils.credentials.help()
notebookutils.credentials.help("getToken")
:param method_name: The name of the method to get help with.
DATA
creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...
FILE
/home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py
トークンを取得する
getToken は、特定の対象ユーザーと名前 (省略可能) の Microsoft Entra トークンを返します。 次の一覧に、現在利用可能な対象ユーザー キーを示します。
- ストレージ オーディエンス リソース: "storage"
- Power BI リソース: "pbi"
- Azure Key Vault リソース: "keyvault"
- Synapse RTA KQL DB リソース: "kusto"
トークンを取得するには、以下のコマンドを実行します。
notebookutils.credentials.getToken('audience Key')
ユーザー資格情報を使用したシークレットの取得
getSecret は、ユーザー資格情報を使用して、指定した Azure Key Vault のエンドポイントの Azure Key Vault シークレットを返します。
notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
ファイルのマウントとマウント解除
Fabric では、Microsoft Spark Utilities パッケージの次のマウント シナリオをサポートしています。 mount、unmount、getMountPath()、mounts() の API を使用して、リモート ストレージ (ADLS Gen2) を、すべての作業ノード (ドライバー ノードとワーカー ノード) にアタッチできます。 ストレージ マウント ポイントを確立した後は、データがローカル ファイル システムに格納されているかのように、ローカル ファイル API を使用してアクセスできます。
ADLS Gen2 アカウントをマウントする方法
次の例では、Azure Data Lake Storage Gen2 をマウントする方法を示します。 Blob Storage のマウントも同様に機能します。
この例では、storegen2 という名前の 1 つの Data Lake Storage Gen2 アカウントがあり、そのアカウントに mycontainer という名前のコンテナーが 1 つあり、それをNotebook Spark セッション内で /test にマウントしようとしていることを前提としています。
mycontainer という名前のコンテナーをマウントするには、まず、このコンテナーにアクセスする権限が作業者に付与されているかどうかを notebookutils がチェックする必要があります。 現在、Fabric では、トリガー マウント操作の認証方法として、accountKey と sastoken の 2 つをサポートしています。
Shared Access Signature トークンまたはアカウント キーを使ってマウントする
NotebookUtils では、ターゲットをマウントする際に、アカウント キーまたは Shared Access Signature (SAS) トークンをパラメーターとして明示的に渡すことができます。
セキュリティ上の理由から、アカウント キーまたは SAS トークンを Azure Key Vault に保存することをお勧めします (次のスクリーンショットを参照)。 以後は、notebookutils.credentials.getSecret API でそれらを取得できます。 Azure Key Vault の詳細については、「Azure Key Vault のマネージド ストレージ アカウント キーについて」をご覧ください。
accountKey メソッドのサンプル コード:
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
sastoken のサンプル コード:
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
マウント パラメーター:
- fileCacheTimeout: BLOB は、既定で 120 秒間ローカル一時フォルダーにキャッシュされます。 この間、blobfuse はファイルが最新かどうかを確認しません。 必要に応じて、パラメーターの指定によりデフォルトのタイムアウト時間を変更できます。 複数のクライアントが同時にファイルを変更する場合、ローカル ファイルとリモート ファイルの間の不整合を回避するために、キャッシュ時間を短縮するか、0 に変更し、常に最新のファイルをサーバーから取得することをお勧めします。
- timeout: マウント操作のタイムアウトは、デフォルトで 120 秒です。 必要に応じて、パラメーターの指定によりデフォルトのタイムアウト時間を変更できます。 実行者が多すぎる場合や、マウントのタイムアウトが発生する場合は、値を増やすことをお勧めします。
これらのパラメーターの使用例を示します。
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
Note
セキュリティ上の理由から、コードに認証情報を直接埋め込まないことをお勧めします。 認証情報の保護を強化するため、ノートブックの出力に表示されるシークレットはすべて編集されます。 詳細については、「シークレットの編集」を参照してください。
Lakehouseをマウントする方法
レイクハウスを /<mount_name> にマウントするサンプル コード:
notebookutils.fs.mount(
"abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse",
"/<mount_name>"
)
notebookutils fs API を使ってマウント ポイント内のファイルにアクセスする
マウント操作の主な目的は、ユーザーがローカル ファイル システム API を使って、リモート ストレージ アカウントに格納されているデータにアクセスできるようすることです。 また、notebookutils fs API を使用し、マウントされたパスをパラメーターとして指定して、データにアクセスすることもできます。 このパス形式は少し異なります。
たとえば、Data Lake Storage Gen2 コンテナー mycontainer が、mount API で /test にマウントされているとします。 ローカル ファイル システム API を使ってそのデータにアクセスするときのパスの形式は、次のようになります。
/synfs/notebook/{sessionId}/test/{filename}
notebookutils fs API を使用してデータにアクセスする場合は、getMountPath() を使用して正確なパスを取得することをお勧めします。
path = notebookutils.fs.getMountPath("/test")
ディレクトリのリストを取得します。
notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
ファイルの内容を読み取ります。
notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
ディレクトリを作成します。
notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
ローカル パスを使用してマウント ポイント内のファイルにアクセスする
マウント ポイント内のファイルは、標準のファイル システムを使って簡単に読み書きできます。 Python の例を次に示します。
#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
既存のマウント ポイントをチェックする方法
notebookutils.fs.mounts() API を使用すると、既にあるすべてのマウント ポイントの情報を確認できます。
notebookutils.fs.mounts()
マウント ポイントをマウント解除する方法
マウント ポイント (この例では /test) をマウント解除するには、次のようなコードを実行します。
notebookutils.fs.unmount("/test")
既知の制限事項
現在、マウントはジョブ レベルで構成されています。マウント ポイントが存在するか、利用不可であるかを mounts API で確認することをお勧めします。
マウント解除メカニズムは自動で適用されません。 アプリケーションの実行が完了したときに、マウント ポイントをマウント解除してディスク領域を解放するには、コードでマウント解除 API を明示的に呼び出す必要があります。 そうしないと、マウント ポイントは、アプリケーションの実行が完了した後もノードに存在します。
ADLS Gen1 ストレージ アカウントのマウントはサポートされていません。
Lakehouse ユーティリティ
notebookutils.lakehouse
は、Lakehouse アイテムの管理用に調整されたユーティリティを提供します。 これらのユーティリティを使用すると、ユーザーはレイクハウス アーティファクトを簡単に作成、取得、更新、削除できます。
メソッドの概要
以下は、notebookutils.lakehouse
によって提供される使用可能なメソッドの概要です。
# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]
# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table]
# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table]
使用例
これらのメソッドを効果的に利用するには、次の使用例を検討してください。
レイクハウスの作成
artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
レイクハウスの取得
artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")
レイクハウスの更新
updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
レイクハウスの削除
is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")
ワークスペース内のレイクハウスの一覧表示
artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")
レイクハウス内のすべてのテーブルの一覧表示
artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")
レイクハウスでのテーブル読み込み操作の開始
notebookutils.lakehouse.loadTable(
{
"relativePath": "Files/myFile.csv",
"pathType": "File",
"mode": "Overwrite",
"recursive": False,
"formatOptions": {
"format": "Csv",
"header": True,
"delimiter": ","
}
}, "table_name", "artifact_name", "optional_workspace_id")
追加情報
各メソッドとそのパラメーターの詳細については、notebookutils.lakehouse.help("methodName")
関数を利用します。
ランタイム ユーティリティ
セッション コンテキスト情報を表示する
notebookutils.runtime.context
を使用して、現在のライブ セッションのコンテキスト情報 (Notebook名、デフォルトのLakehouse、ワークスペース情報、パイプライン実行であるかどうかなど) を取得できます。
notebookutils.runtime.context
セッション管理
対話型セッションを停止する
手動で停止ボタンをクリックする代わりに、コードで API を呼び出して対話型セッションを停止する方が便利な場合があります。 このような場合は、Scala と PySpark で使用できる、コードによる対話型セッションの停止をサポートする API notebookutils.session.stop()
を提供します。
notebookutils.session.stop()
notebookutils.session.stop()
API はバックグラウンドで現在の対話型セッションを非同期的に停止し、Spark セッションを停止し、セッションによって占有されているリソースを解放して、同じプール内の他のセッションで使用できるようにします。
Python インタープリターを再起動する
notebookutils.session ユーティリティは、Python インタープリターを再起動する方法を提供します。
notebookutils.session.restartPython()
Note
- ノートブック参照の実行ケースでは、
restartPython()
は、参照されている現在のノートブックの Python インタープリターのみを再起動します。 - まれに、Spark リフレクション メカニズムが原因でコマンドが失敗することがあります。再試行を追加すると、問題が軽減される可能性があります。
既知の問題
ランタイム バージョン 1.2 以降を使用して
notebookutils.help()
を実行する場合、一覧表示されている fabricClient、PBIClient API は現在サポートされていませんが、今後使用できるようになります。 また、現時点では Credentials API は Scala ノートブックではサポートされていません。Python ノートブックでは、セッション管理に notebookutils.session ユーティリティを使用する場合、停止、restartPython API がサポートされていません。