NotebookUtils (бывший MSSparkUtils) для Fabric
Служебные программы записной книжки (NotebookUtils) — это встроенный пакет, помогающий легко выполнять распространенные задачи в Fabric Notebook. Записные книжки NotebookUtils можно использовать для работы с файловыми системами, получения переменных среды, объединения записных книжек и работы с секретами. Пакет NotebookUtils доступен в конвейерах PySpark (Python), Scala, SparkR и конвейерах Fabric.
Примечание.
- MsSparkUtils официально переименована в NotebookUtils. Существующий код остается обратно совместимым и не приведет к критическим изменениям. Настоятельно рекомендуется обновить записные книжки до записных книжек, чтобы обеспечить постоянную поддержку и доступ к новым функциям. Пространство имен mssparkutils будет прекращено в будущем.
- NotebookUtils предназначен для работы с Spark 3.4(Runtime версии 1.2) и более поздних версий. Все новые функции и обновления будут поддерживаться исключительно с пространством имен notebookutils в дальнейшем.
Служебные программы файловой системы
Notebookutils.fs предоставляет служебные программы для работы с различными файловыми системами, включая Azure Data Lake Storage (ADLS) 2-го поколения и Хранилище BLOB-объектов Azure. Убедитесь, что доступ к Azure Data Lake Storage 2-го поколения и Хранилищу BLOB-объектов Azure настроен правильно.
Чтобы получить общие сведения о доступных методах, выполните следующие команды:
notebookutils.fs.help()
Выходные данные
notebookutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> [Preview] Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use notebookutils.fs.help("methodName") for more info about a method.
NotebookUtils работает с файловой системой так же, как и API Spark. Возьмите notebookutils.fs.mkdirs() и Fabric lakehouse, например:
Использование | Относительный путь из корневого каталога HDFS | Абсолютный путь для файловой системы ABFS | Абсолютный путь к локальной файловой системе на узле драйвера |
---|---|---|---|
Не по умолчанию lakehouse | Не поддерживается | notebookutils.fs.mkdirs("abfss:// container_name<@storage_account_name.dfs.core.windows.net/><> new_dir")<> | notebookutils.fs.mkdirs("file:/<new_dir>") |
Озеро по умолчанию | Каталог в разделе "Файлы" или "Таблицы": notebookutils.fs.mkdirs("Files/<new_dir>") | notebookutils.fs.mkdirs("abfss:// container_name<@storage_account_name.dfs.core.windows.net/><> new_dir")<> | notebookutils.fs.mkdirs("file:/<new_dir>") |
Перечень файлов
Чтобы получить список содержимого каталога, используйте notebookutils.fs.ls("Путь к каталогу"). Например:
notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp") # The full path of the local file system of driver node
API notebookutils.fs.ls()
работает по-разному при использовании относительного пути в зависимости от типа блокнота.
В записной книжке Spark: относительный путь задаётся относительно пути ABFSS по умолчанию в Lakehouse. Например,
notebookutils.fs.ls("Files")
указывает на каталогFiles
в Lakehouse по умолчанию.Например:
notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
В записной книжке Python: относительный путь относится к рабочему каталогу локальной файловой системы, который по умолчанию — /home/trusted-service-user/work. Поэтому вместо относительного пути следует использовать полный путь,
notebookutils.fs.ls("/lakehouse/default/Files")
для доступа к каталогуFiles
в Lakehouse по умолчанию.Например:
notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
Просмотр свойств файла.
Этот метод возвращает свойства файла, включая имя файла, путь к файлу, размер файла и то, является ли он каталогом и файлом.
files = notebookutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
Создать новый каталог
Этот метод создает указанный каталог, если он не существует, и создает все необходимые родительские каталоги.
notebookutils.fs.mkdirs('new directory name')
notebookutils.fs.mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
notebookutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
Копировать файл
Этот метод копирует файл или каталог и поддерживает действие копирования в файловых системах.
notebookutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Примечание.
Из-за ограничений ярлыков OneLake , когда необходимо использовать notebookutils.fs.cp()
для копирования данных из ярлыка типа S3/GCS, рекомендуется использовать смонтированный путь вместо пути abfss.
Файл выполнения копирования
Этот метод обеспечивает более эффективный подход к копированию или перемещению файлов, особенно при работе с большими томами данных. Для повышения производительности в Fabric рекомендуется использовать fastcp
в качестве замены традиционного cp
метода.
Примечание.
-
notebookutils.fs.fastcp()
не поддерживает копирование файлов в OneLake в разных регионах. В этом случае вместо этого можно использоватьnotebookutils.fs.cp()
. - Из-за ограничений сочетания клавиш OneLake, когда необходимо использовать
notebookutils.fs.fastcp()
для копирования данных из ярлыков типа S3/GCS, рекомендуется использовать монтированный путь вместо пути abfss.
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Предварительный просмотр содержимого файла
Этот метод возвращает до первых байтов maxBytes заданного файла в виде строки, закодированной в UTF-8.
notebookutils.fs.head('file path', maxBytes to read)
Переместить файл
Этот метод перемещает файл или каталог и поддерживает перемещение между файловыми системами.
notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.
Записать в файл
Этот метод записывает указанную строку в файл, закодированный в UTF-8.
notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Добавить содержимое в файл
Этот метод добавляет заданную строку в файл, закодированный в UTF-8.
notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Примечание.
-
notebookutils.fs.append()
иnotebookutils.fs.put()
не поддерживают одновременную запись в тот же файл из-за отсутствия гарантий атомарности. - При использовании API
notebookutils.fs.append
в циклеfor
для записи в тот же файл рекомендуется добавить операторsleep
, делая паузу на 0,5–1 секунды между повторяющимися записями. Эта рекомендация обусловлена тем, что внутренняя операцияflush
APInotebookutils.fs.append
асинхронна, поэтому короткая задержка помогает обеспечить целостность данных.
Удалить файл или каталог
Этот метод удаляет файл или каталог.
notebookutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Каталог mount/unmount
Дополнительные сведения об использовании в подключении файлов и отключении.
Служебные программы записных книжек
Используйте служебные программы записной книжки для запуска записной книжки или выхода из записной книжки со значением. Чтобы получить общие сведения о доступных методах, используйте следующую команду:
notebookutils.notebook.help()
Выходные данные:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> [Preview] Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> [Preview] This method check if the DAG is correctly defined.
[Preview] Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.
Use notebookutils.notebook.help("methodName") for more info about a method.
Примечание.
Служебные программы записной книжки не применимы для определений заданий Apache Spark (SJD).
Ссылка на записную книжку
Этот метод ссылается на записную книжку и возвращает значение выхода. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере. На записную книжку, на которую ссылается ссылка, выполняется в пуле Spark записной книжки, которая вызывает эту функцию.
notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)
Например:
notebookutils.notebook.run("Sample1", 90, {"input": 20 })
Записная книжка Fabric также поддерживает ссылки на записные книжки в нескольких рабочих областях, указав идентификатор рабочей области.
notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
Ссылку моментального снимка ссылки можно открыть в выходных данных ячейки. Моментальный снимок записывает результаты выполнения кода и позволяет легко отлаживать эталонный запуск.
Примечание.
- Справочная записная книжка между рабочими областями поддерживается средой выполнения версии 1.2 и выше.
- Если вы используете файлы в разделе "Ресурс записной книжки", используйте
notebookutils.nbResPath
в записной книжке, на которой ссылается ссылка, чтобы убедиться, что он указывает на ту же папку, что и в интерактивном запуске.
Ссылка на параллельное выполнение нескольких записных книжек
Внимание
Эта функция доступна в предварительной версии.
Этот метод notebookutils.notebook.runMultiple()
позволяет выполнять несколько записных книжек параллельно или с предопределенной топологической структурой. API использует многопоточную реализацию в сеансе Spark, что означает, что референсная тетрадь разделяет вычислительные ресурсы.
С помощью notebookutils.notebook.runMultiple()
:
Одновременно выполняйте несколько записных книжек, не ожидая завершения каждой из них.
Укажите зависимости и порядок выполнения записных книжек с помощью простого формата JSON.
Оптимизируйте использование вычислительных ресурсов Spark и уменьшите затраты на проекты Fabric.
Просмотр моментальных снимков каждой записи запуска записной книжки в выходных данных и отладка и мониторинг задач записной книжки удобно.
Получите значение выхода для каждого исполнительного действия и используйте их в подчиненных задачах.
Вы также можете попробовать запустить notebookutils.notebook.help(runMultiple), чтобы найти пример и подробное использование.
Ниже приведен простой пример запуска списка записных книжек параллельно с помощью этого метода:
notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
Результат выполнения корневой записной книжки выглядит следующим образом:
Вот пример запуска записных книжек с топологической структурой с помощью notebookutils.notebook.runMultiple()
. Используйте этот метод для легкой оркестрации записных книжек с помощью интерфейса кода.
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
Результат выполнения корневой записной книжки выглядит следующим образом:
Мы также предоставляем метод для проверки правильности определения DAG.
notebookutils.notebook.validateDAG(DAG)
Примечание.
- Степень параллелизма выполнения нескольких записных книжек ограничена общим доступным вычислительным ресурсом сеанса Spark.
- Верхний предел для действий записной книжки или параллельных записных книжек составляет 50. Превышение этого предела может привести к проблемам стабильности и производительности из-за высокого использования вычислительных ресурсов. Если возникают проблемы, рассмотрите возможность разделения записных книжек на несколько
runMultiple
вызовов или уменьшения параллелизма, изменив поле параллелизма в параметре DAG. - Время ожидания по умолчанию для всего DAG составляет 12 часов, а время ожидания по умолчанию для каждой ячейки в дочерней записной книжке составляет 90 секунд. Вы можете изменить время ожидания, задав поля timeoutInSeconds и timeoutPerCellInSeconds в параметре DAG.
Выход из записной книжки
Этот метод завершает записную книжку со значением. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере.
При вызове функции exit() из записной книжки в интерактивном режиме записная книжка Fabric создает исключение, пропускает последующие ячейки и сохраняет сеанс Spark в живых.
Когда проводится оркестрация записной книжки внутри конвейера, который вызывает функцию exit(), задача записной книжки возвращается со значением выхода. Это завершает выполнение конвейера и останавливает сеанс Spark.
При вызове функции exit() в записной книжке, на которую ссылается ссылка, Fabric Spark остановит дальнейшее выполнение записной книжки, на которую ссылается ссылка, и продолжит выполнять следующие ячейки в главной записной книжке, которая вызывает функцию run(). Например: Notebook1 имеет три ячейки и вызывает функцию exit() во второй ячейке. Notebook2 содержит пять ячеек и вызывает run(notebook1) в третьей ячейке. При запуске Notebook2 Записная книжка1 останавливается во второй ячейке при нажатии функции exit(). Notebook2 продолжает выполнять свою четвертую и пятую ячейку.
notebookutils.notebook.exit("value string")
Примечание.
Функция exit() перезаписывает текущие выходные данные ячейки. Чтобы избежать потери выходных данных других строк кода, вызовите notebookutils.notebook.exit()
в отдельной ячейке.
Например:
Пример1 записной книжки со следующими двумя ячейками:
Ячейка 1 определяет входной параметр со значением по умолчанию, равным 10.
Ячейка 2 выходит из записной книжки с входным значением.
Вы можете запустить Sample1 в другой записной книжке со значениями по умолчанию:
exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)
Выходные данные:
Notebook is executed successfully with exit value 10
Вы можете запустить Sample1 в другой записной книжке и указать входное значение 20:
exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Выходные данные:
Notebook is executed successfully with exit value 20
Управление артефактами записной книжки
notebookutils.notebook
предоставляет специализированные служебные программы для программного управления элементами записной книжки. Эти API позволяют легко создавать, получать, обновлять и удалять элементы записной книжки.
Чтобы эффективно использовать эти методы, рассмотрим следующие примеры использования:
Создание записной книжки
with open("/path/to/notebook.ipynb", "r") as f:
content = f.read()
artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")
Получение содержимого записной книжки
artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")
Обновление записной книжки
updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name", "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")
Удаление записной книжки
is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")
Перечисление записных книжек в рабочей области
artifacts_list = notebookutils.notebook.list("optional_workspace_id")
Служебные программы для учетных данных
Служебные программы учетных данных можно использовать для получения маркеров доступа и управления секретами в Azure Key Vault.
Чтобы получить общие сведения о доступных методах, используйте следующую команду:
notebookutils.credentials.help()
Выходные данные:
Help on module notebookutils.credentials in notebookutils:
NAME
notebookutils.credentials - Utility for credentials operations in Fabric
FUNCTIONS
getSecret(akvName, secret) -> str
Gets a secret from the given Azure Key Vault.
:param akvName: The name of the Azure Key Vault.
:param secret: The name of the secret.
:return: The secret value.
getToken(audience) -> str
Gets a token for the given audience.
:param audience: The audience for the token.
:return: The token.
help(method_name=None)
Provides help for the notebookutils.credentials module or the specified method.
Examples:
notebookutils.credentials.help()
notebookutils.credentials.help("getToken")
:param method_name: The name of the method to get help with.
DATA
creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...
FILE
/home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py
Получение токена
getToken возвращает маркер Microsoft Entra для заданной аудитории и имени (необязательно). В следующем списке показаны доступные в настоящее время ключи аудитории:
- Ресурс аудитории хранилища: "хранилище"
- Ресурс Power BI: "pbi"
- Ресурс Azure Key Vault: keyvault
- Ресурс RTA KQL DB Synapse: kusto
Выполните следующую команду, чтобы получить маркер:
notebookutils.credentials.getToken('audience Key')
Получить секрет с помощью учетных данных пользователя
getSecret возвращает секрет Azure Key Vault для заданной конечной точки Azure Key Vault и имени секрета с использованием учетных данных пользователя.
notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
Подключение файлов и отключение
Fabric поддерживает следующие сценарии подключения в пакете служебных программ Microsoft Spark. Вы можете использовать интерфейсы API подключения, отключения, getMountPath()и mounts() для подключения удаленного хранилища (ADLS 2-го поколения) ко всем рабочим узлам (узлам драйвера и рабочим узлам). После установки точки подключения хранилища используйте локальный API файлов для доступа к данным, как будто он хранится в локальной файловой системе.
Подключение учетной записи ADLS 2-го поколения
В следующем примере показано, как подключить Azure Data Lake Storage 2-го поколения. Подключение Хранилища BLOB-объектов работает аналогично.
В этом примере предполагается, что у вас есть одна учетная запись Data Lake Storage 2-го поколения с именем storegen2, а у учетной записи есть один контейнер с именем mycontainer, который требуется подключить к /test в сеанс Spark записной книжки.
Чтобы подключить контейнер с именем mycontainer, notebookutils сначала необходимо проверить, есть ли у вас разрешение на доступ к контейнеру. В настоящее время Fabric поддерживает два метода проверки подлинности для операции подключения триггера: accountKey и sastoken.
Подключение с помощью маркера подписанного URL-адреса или ключа учетной записи
NotebookUtils поддерживает явное передача ключа учетной записи или маркера подписанного URL-адреса (SAS) в качестве параметра для подключения целевого объекта.
По соображениям безопасности рекомендуется хранить ключи учетной записи или маркеры SAS в Azure Key Vault (как показано на следующем снимке экрана). Затем их можно получить с помощью API notebookutils.credentials.getSecret . Дополнительные сведения об Azure Key Vault см. в статье "Сведения о ключах управляемой учетной записи хранения Azure Key Vault".
Пример кода для метода accountKey :
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
Пример кода для sastoken:
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
Параметры подключения:
- fileCacheTimeout: Blob-объекты по умолчанию кэшируются в локальной временной папке на 120 секунд. В течение этого времени blobfuse не проверяет, обновлен ли файл. Параметр может быть задан для изменения времени ожидания по умолчанию. Если одновременно несколько клиентов изменяют файлы, чтобы избежать несоответствий между локальными и удаленными файлами, рекомендуется сократить время кэша или даже изменить его на 0 и всегда получать последние файлы с сервера.
- время ожидания: время ожидания операции подключения — 120 секунд по умолчанию. Параметр может быть задан для изменения времени ожидания по умолчанию. Если количество исполнителей или время ожидания подключения слишком много, рекомендуется увеличить значение.
Эти параметры можно использовать следующим образом:
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
Примечание.
В целях безопасности рекомендуется избежать внедрения учетных данных непосредственно в код. Для дальнейшей защиты ваших учетных данных, все секреты, отображаемые в выходных данных записной книжки, редактируются. Дополнительные сведения см. в статье Скрытие секретов.
Как подключить озеро
Пример кода для подключения lakehouse к /<mount_name>:
notebookutils.fs.mount(
"abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse",
"/<mount_name>"
)
Доступ к файлам в точке подключения с помощью API notebookutils fs
Основной целью операции подключения является предоставление клиентам доступа к данным, хранящимся в удаленной учетной записи хранения, с помощью API локальной файловой системы. Вы также можете получить доступ к данным с помощью API notebookutils fs с подключенным путем в качестве параметра. Этот формат пути немного отличается.
Предположим, что вы подключили контейнер Data Lake Storage 2-го поколения mycontainer к /test с помощью API подключения. При доступе к данным с помощью локального API файловой системы формат пути выглядит следующим образом:
/synfs/notebook/{sessionId}/test/{filename}
Если вы хотите получить доступ к данным с помощью API notebookutils fs, рекомендуется использовать getMountPath(), чтобы получить точный путь:
path = notebookutils.fs.getMountPath("/test")
Список каталогов:
notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
Чтение содержимого файла:
notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
Создание каталога:
notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
Доступ к файлам под точкой подключения через локальный путь
Вы можете легко считывать и записывать файлы в точке подключения с помощью стандартной файловой системы. Ниже приведен пример Python:
#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
Как проверить существующие точки подключения
Api notebookutils.fs.mounts() можно использовать для проверки всех существующих сведений о точке подключения:
notebookutils.fs.mounts()
Отключение точки подключения
Используйте следующий код, чтобы отключить точку подключения (/test в этом примере):
notebookutils.fs.unmount("/test")
Известные ограничения
Текущее подключение — это конфигурация уровня задания; Рекомендуется использовать API подключений, чтобы проверить, существует ли точка подключения или недоступна.
Механизм отключения не применяется автоматически. Когда приложение завершит работу, чтобы отключить точку подключения и освободить место на диске, необходимо явно вызвать API отключения в коде. В противном случае точка подключения по-прежнему будет существовать в узле после завершения работы приложения.
Подключение учетной записи хранения ADLS 1-го поколения не поддерживается.
Служебные программы Lakehouse
notebookutils.lakehouse
предоставляет служебные программы, адаптированные для управления элементами Lakehouse. Эти служебные программы позволяют создавать, получать, обновлять и удалять артефакты Lakehouse без усилий.
Обзор методов
Ниже приведен обзор доступных методов, предоставляемых notebookutils.lakehouse
:
# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]
# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table]
# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table]
Примеры использования
Чтобы эффективно использовать эти методы, рассмотрим следующие примеры использования:
Создание Lakehouse
artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
Получение Lakehouse
artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")
Обновление Lakehouse
updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
Удаление Lakehouse
is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")
Перечисление Lakehouses в рабочей области
artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")
Перечисление всех таблиц в Lakehouse
artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")
Запуск операции таблицы загрузки в Lakehouse
notebookutils.lakehouse.loadTable(
{
"relativePath": "Files/myFile.csv",
"pathType": "File",
"mode": "Overwrite",
"recursive": False,
"formatOptions": {
"format": "Csv",
"header": True,
"delimiter": ","
}
}, "table_name", "artifact_name", "optional_workspace_id")
Дополнительная информация:
Для получения дополнительных сведений о каждом методе и его параметрах используйте функцию notebookutils.lakehouse.help("methodName")
.
Служебные программы среды выполнения
Отображение сведений о контексте сеанса
С notebookutils.runtime.context
помощью контекста текущего динамического сеанса можно получить сведения о контексте, включая имя записной книжки, озеро по умолчанию, сведения о рабочей области, если это запуск конвейера и т. д.
notebookutils.runtime.context
Управление сеансами
Остановка интерактивного сеанса
Вместо того чтобы вручную нажать кнопку остановки, иногда удобнее остановить интерактивный сеанс, вызвав API в коде. В таких случаях мы предоставляем API notebookutils.session.stop()
для поддержки остановки интерактивного сеанса через код, он доступен для Scala и PySpark.
notebookutils.session.stop()
notebookutils.session.stop()
API останавливает текущий интерактивный сеанс асинхронно в фоновом режиме. Он также останавливает сеанс Spark и освобождает ресурсы, занятые сеансом, поэтому они доступны другим сеансам в том же пуле.
Перезапустите интерпретатор Python
Служебная программа notebookutils.session позволяет перезапустить интерпретатор Python.
notebookutils.session.restartPython()
Примечание.
- В режиме запуска с указанием на записную книжку, команда
restartPython()
перезапускает интерпретатор Python текущей записной книжки, на которую идет ссылка. - В редких случаях команда может завершиться ошибкой из-за механизмов отражения в Spark, но добавление повторной попытки может помочь устранить эту проблему.
Известная проблема
При использовании версии среды выполнения выше 1.2 и запуска
notebookutils.help()
перечисленные интерфейсы fabricClient, API PBIClient пока не поддерживаются, будут доступны в дальнейшем. Кроме того, API учетных данных пока не поддерживается в записных книжках Scala.Записная книжка Python не поддерживает API остановки, restartPython при использовании утилиты notebookutils.session для управления сеансами.