使用 Hugging Face Transformers 进行 NLP 的模型推理

重要

  • 本文档已过时,将来可能不会更新。 本内容中提及的产品、服务或技术不再受支持。
  • Databricks 建议改用 ai_query 进行批处理推理。 请参阅使用 ai_query 执行批处理推理

本文介绍如何使用 Hugging Face Transformers 进行自然语言处理 (NLP) 模型推理。

Hugging Face Transformers 提供管道类来使用预训练模型进行推理。 🤗 Transformers 管道支持广泛的 NLP 任务,你可以在 Azure Databricks 上轻松使用这些任务。

要求

  • MLflow 2.3
  • 任何安装了 Hugging Face transformers 库的群集都可用于批量推理。 transformers 库已预装在 Databricks Runtime 10.4 LTS ML 及更高版本上。 许多流行的 NLP 模型在 GPU 硬件上可以保持最佳工作状态,因此你可以使用最新的 GPU 硬件来获得最佳性能,除非使用专门针对 CPU 优化的模型。

使用 Pandas UDF 在 Spark 群集上分配模型计算

在体验预先训练的模型时,可以使用 Pandas UDF 来包装模型并在工作器 CPU 或 GPU 上执行计算。 Pandas UDF 会将模型分配到每个工作器。

还可以创建用于机器翻译的 Hugging Face Transformers 管道,并使用 Pandas UDF 在 Spark 群集的辅助角色上运行该管道:

import pandas as pd
from transformers import pipeline
import torch
from pyspark.sql.functions import pandas_udf

device = 0 if torch.cuda.is_available() else -1
translation_pipeline = pipeline(task="translation_en_to_fr", model="t5-base", device=device)

@pandas_udf('string')
def translation_udf(texts: pd.Series) -> pd.Series:
  translations = [result['translation_text'] for result in translation_pipeline(texts.to_list(), batch_size=1)]
  return pd.Series(translations)

以这种方式设置 device 可以确保使用群集上可用的 GPU。

用于翻译的 Hugging Face 管道返回一系列 Python dict 对象,其中每个对象都有一个 translation_text 键和一个包含翻译文本的值。 此 UDF 从结果中提取翻译,以返回仅包含翻译文本的 Pandas 系列。 如果通过设置 device=0 将管道构造为使用 GPU,在群集中的实例包含多个 GPU 的情况下,Spark 会自动在工作器节点上重新分配 GPU。

若要使用 UDF 翻译文本列,可以在 select 语句中调用 UDF:

texts = ["Hugging Face is a French company based in New York City.", "Databricks is based in San Francisco."]
df = spark.createDataFrame(pd.DataFrame(texts, columns=["texts"]))
display(df.select(df.texts, translation_udf(df.texts).alias('translation')))

返回复杂结果类型

使用 Pandas UDF 还可以返回结构化程度更高的输出。 例如,在命名实体识别中,管道将返回包含实体、其范围、类型和关联分数的 dict 对象列表。 虽然与翻译示例类似,但在命名实体识别案例中,@pandas_udf 注释的返回类型更复杂。

可以通过检查管道结果来了解要使用的返回类型,例如通过在驱动器上运行管道。

此示例使用了以下代码:

from transformers import pipeline
import torch
device = 0 if torch.cuda.is_available() else -1
ner_pipeline = pipeline(task="ner", model="Davlan/bert-base-multilingual-cased-ner-hrl", aggregation_strategy="simple", device=device)
ner_pipeline(texts)

生成注释:

[[{'entity_group': 'ORG',
   'score': 0.99933606,
   'word': 'Hugging Face',
   'start': 0,
   'end': 12},
  {'entity_group': 'LOC',
   'score': 0.99967843,
   'word': 'New York City',
   'start': 42,
   'end': 55}],
 [{'entity_group': 'ORG',
   'score': 0.9996372,
   'word': 'Databricks',
   'start': 0,
   'end': 10},
  {'entity_group': 'LOC',
   'score': 0.999588,
   'word': 'San Francisco',
   'start': 23,
   'end': 36}]]

若要将此数据表示为返回类型,可以使用 struct 字段的 array,将 dict 条目列为 struct 的字段:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('array<struct<word string, entity_group string, score float, start integer, end integer>>')
def ner_udf(texts: pd.Series) -> pd.Series:
  return pd.Series(ner_pipeline(texts.to_list(), batch_size=1))

display(df.select(df.texts, ner_udf(df.texts).alias('entities')))

调整性能

可以在多个重要方面优化 UDF 的性能。 第一个方面是有效使用每个 GPU,对此可以通过更改转换器管道发送到 GPU 的批大小来进行调整。 第二个方面是确保数据帧已合理分区以利用整个群集。

最后,建议缓存 Hugging Face 模型以节省模型加载时间或流入成本。

选择批大小

虽然使用值为 1 的 batch_size 就能现成地使用上述 UDF,但这样可能无法有效利用可供工作器使用的资源。 若要提高性能,请根据群集中的模型和硬件优化批大小。 Databricks 建议为群集上的管道尝试各种批大小,以获得最佳性能。 在 Hugging Face 文档中详细了解管道批处理和其他性能选项

尝试找到一个足够大的批大小,以全面促进 GPU 的利用,但不会导致 CUDA out of memory 错误。 如果在优化过程中收到 CUDA out of memory 错误,则需要分离再重新附加笔记本,以释放模型及 GPU 中的数据占用的内存。

通过查看群集的实时群集指标并选择某个指标(例如,表示 GPU 处理器利用率的 gpu0-util,或表示 GPU 内存利用率的 gpu0_mem_util)来监视 GPU 性能。

使用阶段级计划优化并行度

默认情况下,Spark 在每台计算机上为每个 GPU 计划一个任务。 若要提高并行度,可以使用阶段级计划来告知 Spark 每个 GPU 要运行的任务数。 例如,如果希望 Spark 为每个 GPU 运行两个任务,可按以下方式进行指定:

from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

task_requests = TaskResourceRequests().resource("gpu", 0.5)

builder = ResourceProfileBuilder()
resource_profile = builder.require(task_requests).build

rdd = df.withColumn('predictions', loaded_model(struct(*map(col, df.columns)))).rdd.withResources(resource_profile)

将数据重新分区以使用所有可用的硬件

在性能方面,第二个考虑因素是充分利用群集中的硬件。 一般情况下,使用工作器上 GPU 数量(对于 GPU 群集)或群集中工作器上的核心数量(对于 CPU 群集)的较小倍数就能获得不错的效果。 输入数据帧可能已有足够的分区来利用群集的并行度。 若要查看数据帧包含多少个分区,请使用 df.rdd.getNumPartitions()。 可以使用 repartitioned_df = df.repartition(desired_partition_count) 将数据帧重新分区。

将模型缓存在 DBFS 或装载点上

如果你经常从不同群集或重启的群集加载模型,则还建议将 Hugging Face 模型缓存在 DBFS 根卷或某个装载点上。 这可以降低流入成本,并减少在新群集或重启的群集上加载模型所需的时间。 为此,请在加载管道之前,先在代码中设置 TRANSFORMERS_CACHE 环境变量。

例如:

import os
os.environ['TRANSFORMERS_CACHE'] = '/dbfs/hugging_face_transformers_cache/'

也可通过使用 MLflow transformers 风格将模型记录到 MLflow 来获得类似的结果。

笔记本:Hugging Face 转换器推理和 MLflow 日志记录

为了让你快速开始使用示例代码,此笔记本提供了一个端到端示例,它使用 Hugging Face 转换器管道推理和 MLflow 日志记录进行文本汇总。

Hugging Face 转换器管道推理笔记本

获取笔记本

其他资源

可按照以下指南微调 Hugging Face 模型:

详细了解什么是 Hugging Face Transformers?