使用 Hugging Face Transformers 进行 NLP 的模型推理
重要
- 本文档已过时,将来可能不会更新。 本内容中提及的产品、服务或技术不再受支持。
- Databricks 建议改用
ai_query
进行批处理推理。 请参阅使用 ai_query 执行批处理推理。
本文介绍如何使用 Hugging Face Transformers 进行自然语言处理 (NLP) 模型推理。
Hugging Face Transformers 提供管道类来使用预训练模型进行推理。 🤗 Transformers 管道支持广泛的 NLP 任务,你可以在 Azure Databricks 上轻松使用这些任务。
要求
- MLflow 2.3
- 任何安装了 Hugging Face
transformers
库的群集都可用于批量推理。transformers
库已预装在 Databricks Runtime 10.4 LTS ML 及更高版本上。 许多流行的 NLP 模型在 GPU 硬件上可以保持最佳工作状态,因此你可以使用最新的 GPU 硬件来获得最佳性能,除非使用专门针对 CPU 优化的模型。
使用 Pandas UDF 在 Spark 群集上分配模型计算
在体验预先训练的模型时,可以使用 Pandas UDF 来包装模型并在工作器 CPU 或 GPU 上执行计算。 Pandas UDF 会将模型分配到每个工作器。
还可以创建用于机器翻译的 Hugging Face Transformers 管道,并使用 Pandas UDF 在 Spark 群集的辅助角色上运行该管道:
import pandas as pd
from transformers import pipeline
import torch
from pyspark.sql.functions import pandas_udf
device = 0 if torch.cuda.is_available() else -1
translation_pipeline = pipeline(task="translation_en_to_fr", model="t5-base", device=device)
@pandas_udf('string')
def translation_udf(texts: pd.Series) -> pd.Series:
translations = [result['translation_text'] for result in translation_pipeline(texts.to_list(), batch_size=1)]
return pd.Series(translations)
以这种方式设置 device
可以确保使用群集上可用的 GPU。
用于翻译的 Hugging Face 管道返回一系列 Python dict
对象,其中每个对象都有一个 translation_text
键和一个包含翻译文本的值。 此 UDF 从结果中提取翻译,以返回仅包含翻译文本的 Pandas 系列。 如果通过设置 device=0
将管道构造为使用 GPU,在群集中的实例包含多个 GPU 的情况下,Spark 会自动在工作器节点上重新分配 GPU。
若要使用 UDF 翻译文本列,可以在 select
语句中调用 UDF:
texts = ["Hugging Face is a French company based in New York City.", "Databricks is based in San Francisco."]
df = spark.createDataFrame(pd.DataFrame(texts, columns=["texts"]))
display(df.select(df.texts, translation_udf(df.texts).alias('translation')))
返回复杂结果类型
使用 Pandas UDF 还可以返回结构化程度更高的输出。 例如,在命名实体识别中,管道将返回包含实体、其范围、类型和关联分数的 dict
对象列表。 虽然与翻译示例类似,但在命名实体识别案例中,@pandas_udf
注释的返回类型更复杂。
可以通过检查管道结果来了解要使用的返回类型,例如通过在驱动器上运行管道。
此示例使用了以下代码:
from transformers import pipeline
import torch
device = 0 if torch.cuda.is_available() else -1
ner_pipeline = pipeline(task="ner", model="Davlan/bert-base-multilingual-cased-ner-hrl", aggregation_strategy="simple", device=device)
ner_pipeline(texts)
生成注释:
[[{'entity_group': 'ORG',
'score': 0.99933606,
'word': 'Hugging Face',
'start': 0,
'end': 12},
{'entity_group': 'LOC',
'score': 0.99967843,
'word': 'New York City',
'start': 42,
'end': 55}],
[{'entity_group': 'ORG',
'score': 0.9996372,
'word': 'Databricks',
'start': 0,
'end': 10},
{'entity_group': 'LOC',
'score': 0.999588,
'word': 'San Francisco',
'start': 23,
'end': 36}]]
若要将此数据表示为返回类型,可以使用 struct
字段的 array
,将 dict
条目列为 struct
的字段:
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('array<struct<word string, entity_group string, score float, start integer, end integer>>')
def ner_udf(texts: pd.Series) -> pd.Series:
return pd.Series(ner_pipeline(texts.to_list(), batch_size=1))
display(df.select(df.texts, ner_udf(df.texts).alias('entities')))
调整性能
可以在多个重要方面优化 UDF 的性能。 第一个方面是有效使用每个 GPU,对此可以通过更改转换器管道发送到 GPU 的批大小来进行调整。 第二个方面是确保数据帧已合理分区以利用整个群集。
最后,建议缓存 Hugging Face 模型以节省模型加载时间或流入成本。
选择批大小
虽然使用值为 1 的 batch_size
就能现成地使用上述 UDF,但这样可能无法有效利用可供工作器使用的资源。 若要提高性能,请根据群集中的模型和硬件优化批大小。 Databricks 建议为群集上的管道尝试各种批大小,以获得最佳性能。 在 Hugging Face 文档中详细了解管道批处理和其他性能选项。
尝试找到一个足够大的批大小,以全面促进 GPU 的利用,但不会导致 CUDA out of memory
错误。 如果在优化过程中收到 CUDA out of memory
错误,则需要分离再重新附加笔记本,以释放模型及 GPU 中的数据占用的内存。
通过查看群集的实时群集指标并选择某个指标(例如,表示 GPU 处理器利用率的 gpu0-util
,或表示 GPU 内存利用率的 gpu0_mem_util
)来监视 GPU 性能。
使用阶段级计划优化并行度
默认情况下,Spark 在每台计算机上为每个 GPU 计划一个任务。 若要提高并行度,可以使用阶段级计划来告知 Spark 每个 GPU 要运行的任务数。 例如,如果希望 Spark 为每个 GPU 运行两个任务,可按以下方式进行指定:
from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder
task_requests = TaskResourceRequests().resource("gpu", 0.5)
builder = ResourceProfileBuilder()
resource_profile = builder.require(task_requests).build
rdd = df.withColumn('predictions', loaded_model(struct(*map(col, df.columns)))).rdd.withResources(resource_profile)
将数据重新分区以使用所有可用的硬件
在性能方面,第二个考虑因素是充分利用群集中的硬件。 一般情况下,使用工作器上 GPU 数量(对于 GPU 群集)或群集中工作器上的核心数量(对于 CPU 群集)的较小倍数就能获得不错的效果。 输入数据帧可能已有足够的分区来利用群集的并行度。 若要查看数据帧包含多少个分区,请使用 df.rdd.getNumPartitions()
。 可以使用 repartitioned_df = df.repartition(desired_partition_count)
将数据帧重新分区。
将模型缓存在 DBFS 或装载点上
如果你经常从不同群集或重启的群集加载模型,则还建议将 Hugging Face 模型缓存在 DBFS 根卷或某个装载点上。 这可以降低流入成本,并减少在新群集或重启的群集上加载模型所需的时间。 为此,请在加载管道之前,先在代码中设置 TRANSFORMERS_CACHE
环境变量。
例如:
import os
os.environ['TRANSFORMERS_CACHE'] = '/dbfs/hugging_face_transformers_cache/'
也可通过使用 MLflow transformers
风格将模型记录到 MLflow 来获得类似的结果。
笔记本:Hugging Face 转换器推理和 MLflow 日志记录
为了让你快速开始使用示例代码,此笔记本提供了一个端到端示例,它使用 Hugging Face 转换器管道推理和 MLflow 日志记录进行文本汇总。
Hugging Face 转换器管道推理笔记本
其他资源
可按照以下指南微调 Hugging Face 模型: