Databricks SDK for R
注意
本文介绍 Databricks Labs 提供的 Databricks SDK for R,它处于试验状态。 若要提供反馈、提出问题和报告问题,请使用 GitHub 上 Databricks SDK for R 存储库中的“问题”选项卡。
本文介绍如何使用 Databricks SDK for R 在 Azure Databricks 工作区中自动执行 Azure Databricks 操作。本文补充了 Databricks SDK for R 文档。
注意
Databricks SDK for R 不支持 Azure Databricks 帐户中的操作自动化。 若要调用帐户级操作,请使用其他 Databricks SDK,例如:
开始之前
在开始使用 Databricks SDK for R 之前,开发计算机必须满足以下要求:
你要自动化的目标 Azure Databricks 工作区的 Azure Databricks 个人访问令牌。
注意
Databricks SDK for R 仅支持 Azure Databricks 个人访问令牌身份验证。
R 或者 R 兼容的集成开发环境 (IDE)。 Databricks 建议使用 RStudio Desktop,并在本文的说明中使用了它。
开始使用 Databricks SDK for R
使 Azure Databricks 工作区 URL 和个人访问令牌可用于 R 项目的脚本。 例如,可以将以下内容添加到 R 项目的
.Renviron
文件中。 将<your-workspace-url>
替换为每工作区 URL,例如https://adb-1234567890123456.7.azuredatabricks.net
。 将<your-personal-access-token>
替换为你的 Azure Databricks 个人访问令牌,例如dapi12345678901234567890123456789012
。DATABRICKS_HOST=<your-workspace-url> DATABRICKS_TOKEN=<your-personal-access-token>
若要创建 Azure Databricks 个人访问令牌,请遵循工作区用户的 Azure Databricks 个人访问令牌中的步骤。
有关提供 Azure Databricks 工作区 URL 和个人访问令牌的其他方法,请参阅 GitHub 中 Databricks SDK for R 存储库中的身份验证。
重要
请勿将
.Renviron
文件添加到版本控制系统,因为这有公开敏感信息的风险,例如 Azure Databricks 个人访问令牌。安装 Databricks SDK for R 包。 例如,在 RStudio Desktop 中,在“控制台”视图(“视图 > 将焦点移动到控制台”)中,运行以下命令,一次一个:
install.packages("devtools") library(devtools) install_github("databrickslabs/databricks-sdk-r")
注意
Databricks SDK for R 包在 CRAN 上不可用。
添加代码以引用 Databricks SDK for R 并列出 Azure Databricks 工作区中的所有群集。 例如,在项目的
main.r
文件中,代码可能如下所示:require(databricks) client <- DatabricksClient() list_clusters(client)[, "cluster_name"]
运行脚本。 例如,使用 RStuidio Desktop 时,在项目的
main.r
文件处于活动状态的脚本编辑器中,单击“源”>“源”或“带回显的源”。此时会显示群集列表。 例如,在 RStudio Desktop 中,它位于控制台视图中。
代码示例
以下代码示例演示如何使用 Databricks SDK for R 创建和删除群集以及创建作业。
创建群集
此代码示例使用指定的 Databricks Runtime 版本和群集节点类型创建群集。 此群集有一个工作器,群集在空闲 15 分钟后自动终止。
require(databricks)
client <- DatabricksClient()
response <- create_cluster(
client = client,
cluster_name = "my-cluster",
spark_version = "12.2.x-scala2.12",
node_type_id = "Standard_DS3_v2",
autotermination_minutes = 15,
num_workers = 1
)
# Get the workspace URL to be used in the following results message.
get_client_debug <- strsplit(client$debug_string(), split = "host=")
get_host <- strsplit(get_client_debug[[1]][2], split = ",")
host <- get_host[[1]][1]
# Make sure the workspace URL ends with a forward slash.
if (endsWith(host, "/")) {
} else {
host <- paste(host, "/", sep = "")
}
print(paste(
"View the cluster at ",
host,
"#setting/clusters/",
response$cluster_id,
"/configuration",
sep = "")
)
永久删除群集
此代码示例从工作区中永久删除具有指定群集 ID 的群集。
require(databricks)
client <- DatabricksClient()
cluster_id <- readline("ID of the cluster to delete (for example, 1234-567890-ab123cd4):")
delete_cluster(client, cluster_id)
创建作业
此代码示例会创建一个可用于在指定群集上运行指定笔记本的 Azure Databricks 作业。 此代码运行时,它会从控制台的用户处获取现有笔记本的路径、现有群集 ID 和相关作业设置。
require(databricks)
client <- DatabricksClient()
job_name <- readline("Some short name for the job (for example, my-job):")
description <- readline("Some short description for the job (for example, My job):")
existing_cluster_id <- readline("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4):")
notebook_path <- readline("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook):")
task_key <- readline("Some key to apply to the job's tasks (for example, my-key):")
print("Attempting to create the job. Please wait...")
notebook_task <- list(
notebook_path = notebook_path,
source = "WORKSPACE"
)
job_task <- list(
task_key = task_key,
description = description,
existing_cluster_id = existing_cluster_id,
notebook_task = notebook_task
)
response <- create_job(
client,
name = job_name,
tasks = list(job_task)
)
# Get the workspace URL to be used in the following results message.
get_client_debug <- strsplit(client$debug_string(), split = "host=")
get_host <- strsplit(get_client_debug[[1]][2], split = ",")
host <- get_host[[1]][1]
# Make sure the workspace URL ends with a forward slash.
if (endsWith(host, "/")) {
} else {
host <- paste(host, "/", sep = "")
}
print(paste(
"View the job at ",
host,
"#job/",
response$job_id,
sep = "")
)
日志记录
可以使用常用 logging
包来记录消息。 此包支持多个日志记录级别和自定义日志格式。 可以使用此包将消息记录到控制台或文件。 若要记录消息,请执行以下操作:
安装
logging
包。 例如,在 RStudio Desktop 中的“控制台”视图中(“视图”>“将焦点移动到控制台”),运行以下命令:install.packages("logging") library(logging)
启动日志记录包,设置记录消息的位置,并设置日志记录级别。 例如,以下代码将所有
ERROR
消息记录到results.log
文件中。basicConfig() addHandler(writeToFile, file="results.log") setLevel("ERROR")
根据需要记录消息。 例如,如果代码无法进行身份验证或列出可用群集的名称,则以下代码会记录任何错误。
require(databricks) require(logging) basicConfig() addHandler(writeToFile, file="results.log") setLevel("ERROR") tryCatch({ client <- DatabricksClient() }, error = function(e) { logerror(paste("Error initializing DatabricksClient(): ", e$message)) return(NA) }) tryCatch({ list_clusters(client)[, "cluster_name"] }, error = function(e) { logerror(paste("Error in list_clusters(client): ", e$message)) return(NA) })
测试
若要测试代码,可以使用 R 测试框架(如 testthat)。 若要在不调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态的情况下在模拟条件下测试代码,可以使用 R 模拟库(如 mockery)。
例如,给定以下名为 helpers.r
的文件,其中包含返回有关新群集的信息的 createCluster
函数:
library(databricks)
createCluster <- function(
databricks_client,
cluster_name,
spark_version,
node_type_id,
autotermination_minutes,
num_workers
) {
response <- create_cluster(
client = databricks_client,
cluster_name = cluster_name,
spark_version = spark_version,
node_type_id = node_type_id,
autotermination_minutes = autotermination_minutes,
num_workers = num_workers
)
return(response)
}
给定以下名为 main.R
的文件,用于调用 createCluster
函数:
library(databricks)
source("helpers.R")
client <- DatabricksClient()
# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = createCluster(
databricks_client = client,
cluster_name = "my-cluster",
spark_version = "<spark-version>",
node_type_id = "<node-type-id>",
autotermination_minutes = 15,
num_workers = 1
)
print(response$cluster_id)
以下名为 test-helpers.py
的文件测试 createCluster
函数是否返回预期的响应。 此测试模拟 DatabricksClient
对象,定义模拟对象的设置,然后将模拟对象传递给 createCluster
函数,而不是在目标工作区中创建群集。 然后,测试将检查函数是否返回新模拟群集的预期 ID。
# install.packages("testthat")
# install.pacakges("mockery")
# testthat::test_file("test-helpers.R")
lapply(c("databricks", "testthat", "mockery"), library, character.only = TRUE)
source("helpers.R")
test_that("createCluster mock returns expected results", {
# Create a mock response.
mock_response <- list(cluster_id = "abc123")
# Create a mock function for create_cluster().
mock_create_cluster <- mock(return_value = mock_response)
# Run the test with the mock function.
with_mock(
create_cluster = mock_create_cluster,
{
# Create a mock Databricks client.
mock_client <- mock()
# Call the function with the mock client.
# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response <- createCluster(
databricks_client = mock_client,
cluster_name = "my-cluster",
spark_version = "<spark-version>",
node_type_id = "<node-type-id>",
autotermination_minutes = 15,
num_workers = 1
)
# Check that the function returned the correct mock response.
expect_equal(response$cluster_id, "abc123")
}
)
})
其他资源
有关详细信息,请参阅: