你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

改编 R 脚本以便在生产环境中运行

本文介绍如何对现有的 R 脚本进行适当的更改,以便在 Azure 机器学习中将其作为作业运行。

必须做出本文中详述的大部分甚至所有更改。

删除用户交互

R 脚本必须设计为以无人值守的方式运行,并在容器中通过 Rscript 命令执行。 确保从脚本中删除任何交互式输入或输出。

添加分析

如果你的脚本需要任何类型的输入参数(大多数脚本都需要),请通过 Rscript 调用将输入传递到脚本中。

Rscript <name-of-r-script>.R
--data_file ${{inputs.<name-of-yaml-input-1>}} 
--brand ${{inputs.<name-of-yaml-input-2>}}

在 R 脚本中,分析输入并执行正确的类型转换。 我们建议使用 optparse 包。

以下代码片段演示如何执行以下操作:

  • 启动分析程序
  • 将所有输入添加为选项
  • 使用适当的数据类型分析输入

还可以添加默认值以方便测试。 建议添加默认值为 ./outputs--output 参数,以便存储脚本的任何输出。

library(optparse)

parser <- OptionParser()

parser <- add_option(
  parser,
  "--output",
  type = "character",
  action = "store",
  default = "./outputs"
)

parser <- add_option(
  parser,
  "--data_file",
  type = "character",
  action = "store",
  default = "data/myfile.csv"
)

parser <- add_option(
  parser,
  "--brand",
  type = "double",
  action = "store",
  default = 1
)
args <- parse_args(parser)

args 是命名列表。 稍后可以在脚本中使用这些参数中的任何一个。

找到 azureml_utils.R 帮助器脚本

必须在要运行的 R 脚本所在的同一工作目录中找到一个名为 azureml_utils.R 的帮助器脚本。 要运行的 R 脚本需要该帮助器脚本才能与 MLflow 服务器通信。 该帮助器脚本提供一个方法用于连续检索身份验证令牌,因为该令牌在运行的作业中快速变化。 该帮助器脚本还允许使用 R MLflow API 中提供的日志记录函数来记录模型、参数、标记和一般项目。

  1. 使用以下代码创建 azureml_utils.R 文件:

    # Azure ML utility to enable usage of the MLFlow R API for tracking with Azure Machine Learning (Azure ML). This utility does the following::
    # 1. Understands Azure ML MLflow tracking url by extending OSS MLflow R client.
    # 2. Manages Azure ML Token refresh for remote runs (runs that execute in Azure Machine Learning). It uses tcktk2 R libraray to schedule token refresh.
    #    Token refresh interval can be controlled by setting the environment variable MLFLOW_AML_TOKEN_REFRESH_INTERVAL and defaults to 30 seconds.
    
    library(mlflow)
    library(httr)
    library(later)
    library(tcltk2)
    
    new_mlflow_client.mlflow_azureml <- function(tracking_uri) {
      host <- paste("https", tracking_uri$path, sep = "://")
      get_host_creds <- function () {
        mlflow:::new_mlflow_host_creds(
          host = host,
          token = Sys.getenv("MLFLOW_TRACKING_TOKEN"),
          username = Sys.getenv("MLFLOW_TRACKING_USERNAME", NA),
          password = Sys.getenv("MLFLOW_TRACKING_PASSWORD", NA),
          insecure = Sys.getenv("MLFLOW_TRACKING_INSECURE", NA)
        )
      }
      cli_env <- function() {
        creds <- get_host_creds()
        res <- list(
          MLFLOW_TRACKING_USERNAME = creds$username,
          MLFLOW_TRACKING_PASSWORD = creds$password,
          MLFLOW_TRACKING_TOKEN = creds$token,
          MLFLOW_TRACKING_INSECURE = creds$insecure
        )
        res[!is.na(res)]
      }
      mlflow:::new_mlflow_client_impl(get_host_creds, cli_env, class = "mlflow_azureml_client")
    }
    
    get_auth_header <- function() {
        headers <- list()
        auth_token <- Sys.getenv("MLFLOW_TRACKING_TOKEN")
        auth_header <- paste("Bearer", auth_token, sep = " ")
        headers$Authorization <- auth_header
        headers
    }
    
    get_token <- function(host, exp_id, run_id) {
        req_headers <- do.call(httr::add_headers, get_auth_header())
        token_host <- gsub("mlflow/v1.0","history/v1.0", host)
        token_host <- gsub("azureml://","https://", token_host)
        api_url <- paste0(token_host, "/experimentids/", exp_id, "/runs/", run_id, "/token")
        GET( api_url, timeout(getOption("mlflow.rest.timeout", 30)), req_headers)
    }
    
    
    fetch_token_from_aml <- function() {
        message("Refreshing token")
        tracking_uri <- Sys.getenv("MLFLOW_TRACKING_URI")
        exp_id <- Sys.getenv("MLFLOW_EXPERIMENT_ID")
        run_id <- Sys.getenv("MLFLOW_RUN_ID")
        sleep_for <- 1
        time_left <- 30
        response <- get_token(tracking_uri, exp_id, run_id)
        while (response$status_code == 429 && time_left > 0) {
            time_left <- time_left - sleep_for
            warning(paste("Request returned with status code 429 (Rate limit exceeded). Retrying after ",
                        sleep_for, " seconds. Will continue to retry 429s for up to ", time_left,
                        " second.", sep = ""))
            Sys.sleep(sleep_for)
            sleep_for <- min(time_left, sleep_for * 2)
            response <- get_token(tracking_uri, exp_id)
        }
    
        if (response$status_code != 200){
            error_response = paste("Error fetching token will try again after sometime: ", str(response), sep = " ")
            warning(error_response)
        }
    
        if (response$status_code == 200){
            text <- content(response, "text", encoding = "UTF-8")
            json_resp <-jsonlite::fromJSON(text, simplifyVector = FALSE)
            json_resp$token
            Sys.setenv(MLFLOW_TRACKING_TOKEN = json_resp$token)
            message("Refreshing token done")
        }
    }
    
    clean_tracking_uri <- function() {
        tracking_uri <- httr::parse_url(Sys.getenv("MLFLOW_TRACKING_URI"))
        tracking_uri$query = ""
        tracking_uri <-httr::build_url(tracking_uri)
        Sys.setenv(MLFLOW_TRACKING_URI = tracking_uri)
    }
    
    clean_tracking_uri()
    tcltk2::tclTaskSchedule(as.integer(Sys.getenv("MLFLOW_TOKEN_REFRESH_INTERVAL_SECONDS", 30))*1000, fetch_token_from_aml(), id = "fetch_token_from_aml", redo = TRUE)
    
    # Set MLFlow related env vars
    Sys.setenv(MLFLOW_BIN = system("which mlflow", intern = TRUE))
    Sys.setenv(MLFLOW_PYTHON_BIN = system("which python", intern = TRUE))
    
  2. 使用以下代码行启动 R 脚本:

source("azureml_utils.R")

将数据文件作为本地文件读取

当你将 R 脚本作为作业运行时,Azure 机器学习会获取你在作业提交内容中指定的数据,并将其装载到正在运行的容器。 因此,你可以像读取正在运行的容器上的本地文件一样读取数据文件。

  • 确保源数据已注册为数据资产
  • 在作业提交参数中按名称传递数据资产
  • 像平时读取本地文件一样读取这些文件

参数部分中所示定义输入参数。 使用参数 data-file 指定完整路径,以便可以使用 read_csv(args$data_file) 读取数据资产。

保存作业项目(图像、数据等)

重要

本部分不适用于模型。 有关特定于模型的保存和日志记录操作说明,请参阅以下两部分。

可以存储由 Azure 机器学习中的 R 脚本生成的任意脚本输出,例如数据文件、图像、序列化 R 对象等。 创建一个 ./outputs 目录来存储任何生成的项目(图像、模型、数据等)。保存到 ./outputs 的任何文件将自动包含在运行中,并在运行结束时上传到试验。 由于在输入参数部分为 --output 参数添加了默认值,因此请在 R 脚本中包含以下代码片段以创建 output 目录。

if (!dir.exists(args$output)) {
  dir.create(args$output)
}

创建该目录后,将项目保存到该目录。 例如:

# create and save a plot
library(ggplot2)

myplot <- ggplot(...)

ggsave(myplot, 
       filename = file.path(args$output,"forecast-plot.png"))


# save an rds serialized object
saveRDS(myobject, file = file.path(args$output,"myobject.rds"))

使用 carrier 包运行模型 crate 操作

R MLflow API 文档 指定 R 模型需要采用 crate 模型风格

  • 如果你的 R 脚本训练了一个模型,并且你生成了一个模型对象,则需要对该模型执行 crate,只有这样,以后才能使用 Azure 机器学习部署它。
  • 如果使用 crate 函数,请在调用所需的任何打包函数时使用显式命名空间。

假设使用 fable 包创建了一个名为 my_ts_model 的时序模型对象。 为了使此模型在部署时可调用,请创建一个 crate(将在其中传递模型对象),并在周期数中创建一个预测边际:

library(carrier)
crated_model <- crate(function(x)
{
  fabletools::forecast(!!my_ts_model, h = x)
})

crated_model 对象是要记录的对象。

使用 R MLflow API 记录模型、参数、标记或其他项目

除了保存任何生成的项目之外,还可以记录每个运行的模型、标记和参数。 请使用 R MLflow API 执行此操作。

记录模型时,将记录按上一部分所述创建的模型。

注意

记录模型时,该模型也将被保存并添加到运行项目中。 除非未记录模型,否则无需显式保存该模型。

若要记录模型和/或参数,请执行以下操作:

  1. 使用 mlflow_start_run() 启动运行
  2. 使用 mlflow_log_modelmlflow_log_parammlflow_log_batch 记录项目
  3. 不要使用 mlflow_end_run() 结束运行。 请跳过此调用,因为它当前会导致错误。

例如,若要记录在上一部分创建的 crated_model 对象,请在 R 脚本中包含以下代码:

提示

在记录模型时使用 models 作为 artifact_path 的值,这是最佳做法(即使可以指定其他名称。)

mlflow_start_run()

mlflow_log_model(
  model = crated_model, # the crate model object
  artifact_path = "models" # a path to save the model object to
  )

mlflow_log_param(<key-name>, <value>)

# mlflow_end_run() - causes an error, do not include mlflow_end_run()

脚本结构和示例

完成本文中所述的所有更改后,使用这些代码片段作为构造 R 脚本的指南。

# BEGIN R SCRIPT

# source the azureml_utils.R script which is needed to use the MLflow back end
# with R
source("azureml_utils.R")

# load your packages here. Make sure that they are installed in the container.
library(...)

# parse the command line arguments.
library(optparse)

parser <- OptionParser()

parser <- add_option(
  parser,
  "--output",
  type = "character",
  action = "store",
  default = "./outputs"
)

parser <- add_option(
  parser,
  "--data_file",
  type = "character",
  action = "store",
  default = "data/myfile.csv"
)

parser <- add_option(
  parser,
  "--brand",
  type = "double",
  action = "store",
  default = 1
)
args <- parse_args(parser)

# your own R code goes here
# - model building/training
# - visualizations
# - etc.

# create the ./outputs directory
if (!dir.exists(args$output)) {
  dir.create(args$output)
}

# log models and parameters to MLflow
mlflow_start_run()

mlflow_log_model(
  model = crated_model, # the crate model object
  artifact_path = "models" # a path to save the model object to
  )

mlflow_log_param(<key-name>, <value>)

# mlflow_end_run() - causes an error, do not include mlflow_end_run()
## END OF R SCRIPT

创建环境

若要运行 R 脚本,请使用 Azure CLI 的 ml 扩展(也称为 CLI v2)。 ml 命令使用 YAML 作业定义文件。 有关使用 az ml 提交作业的详细信息,请参阅使用 Azure 机器学习 CLI 训练模型

YAML 作业文件指定一个环境。 需要先在工作区中创建此环境才能运行作业。

可以在 Azure 机器学习工作室中或使用 Azure CLI 创建环境。

无论使用哪种方法,都要使用 Dockerfile。 R 环境的所有 Docker 上下文文件必须包含以下规范才能在 Azure 机器学习中正常运行:

FROM rocker/tidyverse:latest

# Install python
RUN apt-get update -qq && \
 apt-get install -y python3-pip tcl tk libz-dev libpng-dev

RUN ln -f /usr/bin/python3 /usr/bin/python
RUN ln -f /usr/bin/pip3 /usr/bin/pip
RUN pip install -U pip

# Install azureml-MLflow
RUN pip install azureml-MLflow
RUN pip install MLflow

# Create link for python
RUN ln -f /usr/bin/python3 /usr/bin/python

# Install R packages required for logging with MLflow (these are necessary)
RUN R -e "install.packages('mlflow', dependencies = TRUE, repos = 'https://cloud.r-project.org/')"
RUN R -e "install.packages('carrier', dependencies = TRUE, repos = 'https://cloud.r-project.org/')"
RUN R -e "install.packages('optparse', dependencies = TRUE, repos = 'https://cloud.r-project.org/')"
RUN R -e "install.packages('tcltk2', dependencies = TRUE, repos = 'https://cloud.r-project.org/')"

基础映像是 rocker/tidyverse:latest,其中已安装许多 R 包及其依赖项。

重要

必须提前安装脚本需要运行的任何 R 包。 根据需要向 Docker 上下文文件添加更多行。

RUN R -e "install.packages('<package-to-install>', dependencies = TRUE, repos = 'https://cloud.r-project.org/')"

其他建议

可能需要考虑的其他一些建议:

  • 使用 R 的 tryCatch 函数进行异常和错误处理
  • 添加显式日志记录进行故障排除和调试

后续步骤