Поделиться через


NotebookUtils (бывший MSSparkUtils) для Fabric

Служебные программы записной книжки (NotebookUtils) — это встроенный пакет, помогающий легко выполнять распространенные задачи в Fabric Notebook. Записные книжки NotebookUtils можно использовать для работы с файловыми системами, получения переменных среды, объединения записных книжек и работы с секретами. Пакет NotebookUtils доступен в конвейерах PySpark (Python), Scala, SparkR и конвейерах Fabric.

Примечание.

  • MsSparkUtils официально переименована в NotebookUtils. Существующий код остается обратно совместимым и не приведет к критическим изменениям. Настоятельно рекомендуется обновить записные книжки до записных книжек, чтобы обеспечить постоянную поддержку и доступ к новым функциям. Пространство имен mssparkutils будет прекращено в будущем.
  • NotebookUtils предназначен для работы с Spark 3.4(Runtime версии 1.2) и более поздних версий. Все новые функции и обновления будут поддерживаться исключительно с пространством имен notebookutils в дальнейшем.

Служебные программы файловой системы

Notebookutils.fs предоставляет служебные программы для работы с различными файловыми системами, включая Azure Data Lake Storage (ADLS) 2-го поколения и Хранилище BLOB-объектов Azure. Убедитесь, что доступ к Azure Data Lake Storage 2-го поколения и Хранилищу BLOB-объектов Azure настроен правильно.

Чтобы получить общие сведения о доступных методах, выполните следующие команды:

notebookutils.fs.help()

Выходные данные

notebookutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> [Preview] Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use notebookutils.fs.help("methodName") for more info about a method.

NotebookUtils работает с файловой системой так же, как и API Spark. Возьмите notebookutils.fs.mkdirs() и Fabric lakehouse, например:

Использование Относительный путь из корневого каталога HDFS Абсолютный путь для файловой системы ABFS Абсолютный путь к локальной файловой системе на узле драйвера
Не по умолчанию lakehouse Не поддерживается notebookutils.fs.mkdirs("abfss:// container_name<@storage_account_name.dfs.core.windows.net/><> new_dir")<> notebookutils.fs.mkdirs("file:/<new_dir>")
Озеро по умолчанию Каталог в разделе "Файлы" или "Таблицы": notebookutils.fs.mkdirs("Files/<new_dir>") notebookutils.fs.mkdirs("abfss:// container_name<@storage_account_name.dfs.core.windows.net/><> new_dir")<> notebookutils.fs.mkdirs("file:/<new_dir>")

Перечень файлов

Чтобы получить список содержимого каталога, используйте notebookutils.fs.ls("Путь к каталогу"). Например:

notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp")  # The full path of the local file system of driver node

API notebookutils.fs.ls() работает по-разному при использовании относительного пути в зависимости от типа блокнота.

  • В записной книжке Spark: относительный путь задаётся относительно пути ABFSS по умолчанию в Lakehouse. Например, notebookutils.fs.ls("Files") указывает на каталог Files в Lakehouse по умолчанию.

    Например:

    notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
    
  • В записной книжке Python: относительный путь относится к рабочему каталогу локальной файловой системы, который по умолчанию — /home/trusted-service-user/work. Поэтому вместо относительного пути следует использовать полный путь, notebookutils.fs.ls("/lakehouse/default/Files") для доступа к каталогу Files в Lakehouse по умолчанию.

    Например:

    notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
    

Просмотр свойств файла.

Этот метод возвращает свойства файла, включая имя файла, путь к файлу, размер файла и то, является ли он каталогом и файлом.

files = notebookutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Создать новый каталог

Этот метод создает указанный каталог, если он не существует, и создает все необходимые родительские каталоги.

notebookutils.fs.mkdirs('new directory name')  
notebookutils.fs.mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
notebookutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Копировать файл

Этот метод копирует файл или каталог и поддерживает действие копирования в файловых системах.

notebookutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Примечание.

Из-за ограничений ярлыков OneLake , когда необходимо использовать notebookutils.fs.cp() для копирования данных из ярлыка типа S3/GCS, рекомендуется использовать смонтированный путь вместо пути abfss.

Файл выполнения копирования

Этот метод обеспечивает более эффективный подход к копированию или перемещению файлов, особенно при работе с большими томами данных. Для повышения производительности в Fabric рекомендуется использовать fastcp в качестве замены традиционного cp метода.

Примечание.

  • notebookutils.fs.fastcp() не поддерживает копирование файлов в OneLake в разных регионах. В этом случае вместо этого можно использовать notebookutils.fs.cp() .
  • Из-за ограничений сочетания клавиш OneLake, когда необходимо использовать notebookutils.fs.fastcp() для копирования данных из ярлыков типа S3/GCS, рекомендуется использовать монтированный путь вместо пути abfss.
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Предварительный просмотр содержимого файла

Этот метод возвращает до первых байтов maxBytes заданного файла в виде строки, закодированной в UTF-8.

notebookutils.fs.head('file path', maxBytes to read)

Переместить файл

Этот метод перемещает файл или каталог и поддерживает перемещение между файловыми системами.

notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

Записать в файл

Этот метод записывает указанную строку в файл, закодированный в UTF-8.

notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Добавить содержимое в файл

Этот метод добавляет заданную строку в файл, закодированный в UTF-8.

notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Примечание.

  • notebookutils.fs.append() и notebookutils.fs.put() не поддерживают одновременную запись в тот же файл из-за отсутствия гарантий атомарности.
  • При использовании API notebookutils.fs.append в цикле for для записи в тот же файл рекомендуется добавить оператор sleep, делая паузу на 0,5–1 секунды между повторяющимися записями. Эта рекомендация обусловлена тем, что внутренняя операция flush API notebookutils.fs.append асинхронна, поэтому короткая задержка помогает обеспечить целостность данных.

Удалить файл или каталог

Этот метод удаляет файл или каталог.

notebookutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Каталог mount/unmount

Дополнительные сведения об использовании в подключении файлов и отключении.

Служебные программы записных книжек

Используйте служебные программы записной книжки для запуска записной книжки или выхода из записной книжки со значением. Чтобы получить общие сведения о доступных методах, используйте следующую команду:

notebookutils.notebook.help()

Выходные данные:


The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> [Preview] Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> [Preview] This method check if the DAG is correctly defined.

[Preview] Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.

Use notebookutils.notebook.help("methodName") for more info about a method.

Примечание.

Служебные программы записной книжки не применимы для определений заданий Apache Spark (SJD).

Ссылка на записную книжку

Этот метод ссылается на записную книжку и возвращает значение выхода. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере. На записную книжку, на которую ссылается ссылка, выполняется в пуле Spark записной книжки, которая вызывает эту функцию.

notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

Например:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Записная книжка Fabric также поддерживает ссылки на записные книжки в нескольких рабочих областях, указав идентификатор рабочей области.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Ссылку моментального снимка ссылки можно открыть в выходных данных ячейки. Моментальный снимок записывает результаты выполнения кода и позволяет легко отлаживать эталонный запуск.

Снимок экрана: результат выполнения ссылки.

Снимок экрана: пример моментального снимка.

Примечание.

  • Справочная записная книжка между рабочими областями поддерживается средой выполнения версии 1.2 и выше.
  • Если вы используете файлы в разделе "Ресурс записной книжки", используйте notebookutils.nbResPath в записной книжке, на которой ссылается ссылка, чтобы убедиться, что он указывает на ту же папку, что и в интерактивном запуске.

Ссылка на параллельное выполнение нескольких записных книжек

Внимание

Эта функция доступна в предварительной версии.

Этот метод notebookutils.notebook.runMultiple() позволяет выполнять несколько записных книжек параллельно или с предопределенной топологической структурой. API использует многопоточную реализацию в сеансе Spark, что означает, что референсная тетрадь разделяет вычислительные ресурсы.

С помощью notebookutils.notebook.runMultiple():

  • Одновременно выполняйте несколько записных книжек, не ожидая завершения каждой из них.

  • Укажите зависимости и порядок выполнения записных книжек с помощью простого формата JSON.

  • Оптимизируйте использование вычислительных ресурсов Spark и уменьшите затраты на проекты Fabric.

  • Просмотр моментальных снимков каждой записи запуска записной книжки в выходных данных и отладка и мониторинг задач записной книжки удобно.

  • Получите значение выхода для каждого исполнительного действия и используйте их в подчиненных задачах.

Вы также можете попробовать запустить notebookutils.notebook.help(runMultiple), чтобы найти пример и подробное использование.

Ниже приведен простой пример запуска списка записных книжек параллельно с помощью этого метода:


notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Результат выполнения корневой записной книжки выглядит следующим образом:

Снимок экрана: ссылка на список записных книжек.

Вот пример запуска записных книжек с топологической структурой с помощью notebookutils.notebook.runMultiple(). Используйте этот метод для легкой оркестрации записных книжек с помощью интерфейса кода.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Результат выполнения корневой записной книжки выглядит следующим образом:

Снимок экрана: ссылка на список записных книжек с параметрами.

Мы также предоставляем метод для проверки правильности определения DAG.

notebookutils.notebook.validateDAG(DAG)

Примечание.

  • Степень параллелизма выполнения нескольких записных книжек ограничена общим доступным вычислительным ресурсом сеанса Spark.
  • Верхний предел для действий записной книжки или параллельных записных книжек составляет 50. Превышение этого предела может привести к проблемам стабильности и производительности из-за высокого использования вычислительных ресурсов. Если возникают проблемы, рассмотрите возможность разделения записных книжек на несколько runMultiple вызовов или уменьшения параллелизма, изменив поле параллелизма в параметре DAG.
  • Время ожидания по умолчанию для всего DAG составляет 12 часов, а время ожидания по умолчанию для каждой ячейки в дочерней записной книжке составляет 90 секунд. Вы можете изменить время ожидания, задав поля timeoutInSeconds и timeoutPerCellInSeconds в параметре DAG.

Выход из записной книжки

Этот метод завершает записную книжку со значением. Вызовы вложенных функций можно запускать в записной книжке в интерактивном режиме или в конвейере.

  • При вызове функции exit() из записной книжки в интерактивном режиме записная книжка Fabric создает исключение, пропускает последующие ячейки и сохраняет сеанс Spark в живых.

  • Когда проводится оркестрация записной книжки внутри конвейера, который вызывает функцию exit(), задача записной книжки возвращается со значением выхода. Это завершает выполнение конвейера и останавливает сеанс Spark.

  • При вызове функции exit() в записной книжке, на которую ссылается ссылка, Fabric Spark остановит дальнейшее выполнение записной книжки, на которую ссылается ссылка, и продолжит выполнять следующие ячейки в главной записной книжке, которая вызывает функцию run(). Например: Notebook1 имеет три ячейки и вызывает функцию exit() во второй ячейке. Notebook2 содержит пять ячеек и вызывает run(notebook1) в третьей ячейке. При запуске Notebook2 Записная книжка1 останавливается во второй ячейке при нажатии функции exit(). Notebook2 продолжает выполнять свою четвертую и пятую ячейку.

notebookutils.notebook.exit("value string")

Примечание.

Функция exit() перезаписывает текущие выходные данные ячейки. Чтобы избежать потери выходных данных других строк кода, вызовите notebookutils.notebook.exit() в отдельной ячейке.

Например:

Пример1 записной книжки со следующими двумя ячейками:

  • Ячейка 1 определяет входной параметр со значением по умолчанию, равным 10.

  • Ячейка 2 выходит из записной книжки с входным значением.

Снимок экрана: пример записной книжки выхода.

Вы можете запустить Sample1 в другой записной книжке со значениями по умолчанию:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Выходные данные:

Notebook is executed successfully with exit value 10

Вы можете запустить Sample1 в другой записной книжке и указать входное значение 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Выходные данные:

Notebook is executed successfully with exit value 20

Управление артефактами записной книжки

notebookutils.notebook предоставляет специализированные служебные программы для программного управления элементами записной книжки. Эти API позволяют легко создавать, получать, обновлять и удалять элементы записной книжки.

Чтобы эффективно использовать эти методы, рассмотрим следующие примеры использования:

Создание записной книжки

with open("/path/to/notebook.ipynb", "r") as f:
    content = f.read()

artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")

Получение содержимого записной книжки

artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")

Обновление записной книжки

updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name",  "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")

Удаление записной книжки

is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")

Перечисление записных книжек в рабочей области

artifacts_list = notebookutils.notebook.list("optional_workspace_id")

Служебные программы для учетных данных

Служебные программы учетных данных можно использовать для получения маркеров доступа и управления секретами в Azure Key Vault.

Чтобы получить общие сведения о доступных методах, используйте следующую команду:

notebookutils.credentials.help()

Выходные данные:

Help on module notebookutils.credentials in notebookutils:

NAME
    notebookutils.credentials - Utility for credentials operations in Fabric

FUNCTIONS
    getSecret(akvName, secret) -> str
        Gets a secret from the given Azure Key Vault.
        :param akvName: The name of the Azure Key Vault.
        :param secret: The name of the secret.
        :return: The secret value.
    
    getToken(audience) -> str
        Gets a token for the given audience.
        :param audience: The audience for the token.
        :return: The token.
    
    help(method_name=None)
        Provides help for the notebookutils.credentials module or the specified method.
        
        Examples:
        notebookutils.credentials.help()
        notebookutils.credentials.help("getToken")
        :param method_name: The name of the method to get help with.

DATA
    creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...

FILE
    /home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py

Получение токена

getToken возвращает маркер Microsoft Entra для заданной аудитории и имени (необязательно). В следующем списке показаны доступные в настоящее время ключи аудитории:

  • Ресурс аудитории хранилища: "хранилище"
  • Ресурс Power BI: "pbi"
  • Ресурс Azure Key Vault: keyvault
  • Ресурс RTA KQL DB Synapse: kusto

Выполните следующую команду, чтобы получить маркер:

notebookutils.credentials.getToken('audience Key')

Получить секрет с помощью учетных данных пользователя

getSecret возвращает секрет Azure Key Vault для заданной конечной точки Azure Key Vault и имени секрета с использованием учетных данных пользователя.

notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Подключение файлов и отключение

Fabric поддерживает следующие сценарии подключения в пакете служебных программ Microsoft Spark. Вы можете использовать интерфейсы API подключения, отключения, getMountPath()и mounts() для подключения удаленного хранилища (ADLS 2-го поколения) ко всем рабочим узлам (узлам драйвера и рабочим узлам). После установки точки подключения хранилища используйте локальный API файлов для доступа к данным, как будто он хранится в локальной файловой системе.

Подключение учетной записи ADLS 2-го поколения

В следующем примере показано, как подключить Azure Data Lake Storage 2-го поколения. Подключение Хранилища BLOB-объектов работает аналогично.

В этом примере предполагается, что у вас есть одна учетная запись Data Lake Storage 2-го поколения с именем storegen2, а у учетной записи есть один контейнер с именем mycontainer, который требуется подключить к /test в сеанс Spark записной книжки.

Снимок экрана, показывающий, где выбрать контейнер для подключения.

Чтобы подключить контейнер с именем mycontainer, notebookutils сначала необходимо проверить, есть ли у вас разрешение на доступ к контейнеру. В настоящее время Fabric поддерживает два метода проверки подлинности для операции подключения триггера: accountKey и sastoken.

Подключение с помощью маркера подписанного URL-адреса или ключа учетной записи

NotebookUtils поддерживает явное передача ключа учетной записи или маркера подписанного URL-адреса (SAS) в качестве параметра для подключения целевого объекта.

По соображениям безопасности рекомендуется хранить ключи учетной записи или маркеры SAS в Azure Key Vault (как показано на следующем снимке экрана). Затем их можно получить с помощью API notebookutils.credentials.getSecret . Дополнительные сведения об Azure Key Vault см. в статье "Сведения о ключах управляемой учетной записи хранения Azure Key Vault".

Снимок экрана, показывающий, где хранятся секреты в Azure Key Vault.

Пример кода для метода accountKey :

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Пример кода для sastoken:

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Параметры подключения:

  • fileCacheTimeout: Blob-объекты по умолчанию кэшируются в локальной временной папке на 120 секунд. В течение этого времени blobfuse не проверяет, обновлен ли файл. Параметр может быть задан для изменения времени ожидания по умолчанию. Если одновременно несколько клиентов изменяют файлы, чтобы избежать несоответствий между локальными и удаленными файлами, рекомендуется сократить время кэша или даже изменить его на 0 и всегда получать последние файлы с сервера.
  • время ожидания: время ожидания операции подключения — 120 секунд по умолчанию. Параметр может быть задан для изменения времени ожидания по умолчанию. Если количество исполнителей или время ожидания подключения слишком много, рекомендуется увеличить значение.

Эти параметры можно использовать следующим образом:

notebookutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Примечание.

В целях безопасности рекомендуется избежать внедрения учетных данных непосредственно в код. Для дальнейшей защиты ваших учетных данных, все секреты, отображаемые в выходных данных записной книжки, редактируются. Дополнительные сведения см. в статье Скрытие секретов.

Как подключить озеро

Пример кода для подключения lakehouse к /<mount_name>:

notebookutils.fs.mount( 
 "abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse", 
 "/<mount_name>"
)

Доступ к файлам в точке подключения с помощью API notebookutils fs

Основной целью операции подключения является предоставление клиентам доступа к данным, хранящимся в удаленной учетной записи хранения, с помощью API локальной файловой системы. Вы также можете получить доступ к данным с помощью API notebookutils fs с подключенным путем в качестве параметра. Этот формат пути немного отличается.

Предположим, что вы подключили контейнер Data Lake Storage 2-го поколения mycontainer к /test с помощью API подключения. При доступе к данным с помощью локального API файловой системы формат пути выглядит следующим образом:

/synfs/notebook/{sessionId}/test/{filename}

Если вы хотите получить доступ к данным с помощью API notebookutils fs, рекомендуется использовать getMountPath(), чтобы получить точный путь:

path = notebookutils.fs.getMountPath("/test")
  • Список каталогов:

    notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
    
  • Чтение содержимого файла:

    notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Создание каталога:

    notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
    

Доступ к файлам под точкой подключения через локальный путь

Вы можете легко считывать и записывать файлы в точке подключения с помощью стандартной файловой системы. Ниже приведен пример Python:

#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Как проверить существующие точки подключения

Api notebookutils.fs.mounts() можно использовать для проверки всех существующих сведений о точке подключения:

notebookutils.fs.mounts()

Отключение точки подключения

Используйте следующий код, чтобы отключить точку подключения (/test в этом примере):

notebookutils.fs.unmount("/test")

Известные ограничения

  • Текущее подключение — это конфигурация уровня задания; Рекомендуется использовать API подключений, чтобы проверить, существует ли точка подключения или недоступна.

  • Механизм отключения не применяется автоматически. Когда приложение завершит работу, чтобы отключить точку подключения и освободить место на диске, необходимо явно вызвать API отключения в коде. В противном случае точка подключения по-прежнему будет существовать в узле после завершения работы приложения.

  • Подключение учетной записи хранения ADLS 1-го поколения не поддерживается.

Служебные программы Lakehouse

notebookutils.lakehouse предоставляет служебные программы, адаптированные для управления элементами Lakehouse. Эти служебные программы позволяют создавать, получать, обновлять и удалять артефакты Lakehouse без усилий.

Обзор методов

Ниже приведен обзор доступных методов, предоставляемых notebookutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean 

# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]

# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table] 

# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table] 

Примеры использования

Чтобы эффективно использовать эти методы, рассмотрим следующие примеры использования:

Создание Lakehouse

artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Получение Lakehouse

artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")

Обновление Lakehouse

updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Удаление Lakehouse

is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Перечисление Lakehouses в рабочей области

artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")

Перечисление всех таблиц в Lakehouse

artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")

Запуск операции таблицы загрузки в Lakehouse

notebookutils.lakehouse.loadTable(
    {
        "relativePath": "Files/myFile.csv",
        "pathType": "File",
        "mode": "Overwrite",
        "recursive": False,
        "formatOptions": {
            "format": "Csv",
            "header": True,
            "delimiter": ","
        }
    }, "table_name", "artifact_name", "optional_workspace_id")

Дополнительная информация:

Для получения дополнительных сведений о каждом методе и его параметрах используйте функцию notebookutils.lakehouse.help("methodName") .

Служебные программы среды выполнения

Отображение сведений о контексте сеанса

С notebookutils.runtime.context помощью контекста текущего динамического сеанса можно получить сведения о контексте, включая имя записной книжки, озеро по умолчанию, сведения о рабочей области, если это запуск конвейера и т. д.

notebookutils.runtime.context

Управление сеансами

Остановка интерактивного сеанса

Вместо того чтобы вручную нажать кнопку остановки, иногда удобнее остановить интерактивный сеанс, вызвав API в коде. В таких случаях мы предоставляем API notebookutils.session.stop() для поддержки остановки интерактивного сеанса через код, он доступен для Scala и PySpark.

notebookutils.session.stop()

notebookutils.session.stop() API останавливает текущий интерактивный сеанс асинхронно в фоновом режиме. Он также останавливает сеанс Spark и освобождает ресурсы, занятые сеансом, поэтому они доступны другим сеансам в том же пуле.

Перезапустите интерпретатор Python

Служебная программа notebookutils.session позволяет перезапустить интерпретатор Python.

notebookutils.session.restartPython()

Примечание.

  • В режиме запуска с указанием на записную книжку, команда restartPython() перезапускает интерпретатор Python текущей записной книжки, на которую идет ссылка.
  • В редких случаях команда может завершиться ошибкой из-за механизмов отражения в Spark, но добавление повторной попытки может помочь устранить эту проблему.

Известная проблема

  • При использовании версии среды выполнения выше 1.2 и запуска notebookutils.help()перечисленные интерфейсы fabricClient, API PBIClient пока не поддерживаются, будут доступны в дальнейшем. Кроме того, API учетных данных пока не поддерживается в записных книжках Scala.

  • Записная книжка Python не поддерживает API остановки, restartPython при использовании утилиты notebookutils.session для управления сеансами.