Modellrückschluss mit Hugging Face Transformers für die linguistische Datenverarbeitung (NLP)
Wichtig
- Diese Dokumentation wurde eingestellt und wird unter Umständen nicht aktualisiert. Die in diesem Inhalt erwähnten Produkte, Dienste oder Technologien werden nicht mehr unterstützt.
- Databricks empfiehlt stattdessen die Verwendung
ai_query
für die Batchableitung. Weitere Informationen finden Sie unter "Ausführen der Batch-Ableitung mithilfe von ai_query".
In diesem Artikel erfahren Sie, wie Sie Hugging Face Transformers für NLP-Modellrückschlüsse (Natural Language Processing; linguistische Datenverarbeitung) verwenden.
Hugging Face Transformers stellt die Pipelines-Klasse bereit, um das vortrainierte Modell für Rückschlüsse zu verwenden. Hugging Face Transformers-Pipelines unterstützen verschiedenste NLP-Aufgaben, die Sie problemlos in Azure Databricks verwenden können.
Anforderungen
- MLflow 2.3
- Jeder Cluster, in dem die Hugging Face-Bibliothek
transformers
installiert ist, kann für Batchrückschlüsse verwendet werden. Die Bibliothektransformers
ist ab Databricks Runtime 10.4 LTS ML vorinstalliert. Viele der beliebten NLP-Modelle funktionieren am besten auf GPU-Hardware, sodass Sie mit aktueller GPU-Hardware wahrscheinlich die beste Leistung erzielen – es sei denn, Sie verwenden ein Modell, das speziell für die Verwendung auf CPUs optimiert ist.
Verwenden von Pandas-UDFs zum Verteilen der Modellberechnung in einem Spark-Cluster
Wenn Sie mit vortrainierten Modellen experimentieren, können Sie das Modell in Pandas-UDFs einschließen und die Berechnung auf Worker-CPUs oder -GPUs durchführen. Pandas-UDFs verteilen das Modell an die einzelnen Worker.
Sie können auch eine Hugging Face Transformers-Pipeline für maschinelle Übersetzungen erstellen und eine Pandas-UDF verwenden, um die Pipeline auf den Workern eines Spark-Clusters auszuführen:
import pandas as pd
from transformers import pipeline
import torch
from pyspark.sql.functions import pandas_udf
device = 0 if torch.cuda.is_available() else -1
translation_pipeline = pipeline(task="translation_en_to_fr", model="t5-base", device=device)
@pandas_udf('string')
def translation_udf(texts: pd.Series) -> pd.Series:
translations = [result['translation_text'] for result in translation_pipeline(texts.to_list(), batch_size=1)]
return pd.Series(translations)
Wenn Sie device
auf diese Weise festlegen, wird sichergestellt, dass GPUs verwendet werden, wenn sie im Cluster verfügbar sind.
Die Hugging Face-Pipelines für die Übersetzung geben eine Liste von Python-Objekten vom Typ dict
mit jeweils einem einzelnen Schlüssel (translation_text
) und einem Wert zurück, der den übersetzten Text enthält. Diese UDF extrahiert die Übersetzung aus den Ergebnissen, um eine Pandas-Serie mit nur dem übersetzten Text zurückzugeben. Wenn Ihre Pipeline durch Festlegen von device=0
für die Verwendung von GPUs konzipiert wurde, weist Spark automatisch GPUs auf den Workerknoten neu zu, wenn Ihr Cluster über Instanzen mit mehreren GPUs verfügt.
Um die UDF zum Übersetzen einer Textspalte zu verwenden, können Sie die UDF in einer select
-Anweisung aufrufen:
texts = ["Hugging Face is a French company based in New York City.", "Databricks is based in San Francisco."]
df = spark.createDataFrame(pd.DataFrame(texts, columns=["texts"]))
display(df.select(df.texts, translation_udf(df.texts).alias('translation')))
Zurückgeben komplexer Ergebnistypen
Mithilfe von Pandas-UDFs können Sie auch eine strukturiertere Ausgabe zurückgeben. Bei der Erkennung benannter Entitäten geben Pipelines beispielsweise eine Liste von dict
-Objekten zurück, die die Entität, ihre Spanne, den Typ und einen zugeordneten Score enthalten. Das Beispiel ähnelt zwar dem Beispiel für die Übersetzung, der Rückgabetyp für die @pandas_udf
-Anmerkung ist bei der Erkennung benannter Entitäten allerdings komplexer.
Sie können sich ein Bild von den zu verwendenden Rückgabetypen machen, indem Sie Pipelineergebnisse überprüfen (beispielsweise durch Ausführen der Pipeline im Treiber).
Verwenden Sie in diesem Beispiel den folgenden Code:
from transformers import pipeline
import torch
device = 0 if torch.cuda.is_available() else -1
ner_pipeline = pipeline(task="ner", model="Davlan/bert-base-multilingual-cased-ner-hrl", aggregation_strategy="simple", device=device)
ner_pipeline(texts)
Resultierende Anmerkungen:
[[{'entity_group': 'ORG',
'score': 0.99933606,
'word': 'Hugging Face',
'start': 0,
'end': 12},
{'entity_group': 'LOC',
'score': 0.99967843,
'word': 'New York City',
'start': 42,
'end': 55}],
[{'entity_group': 'ORG',
'score': 0.9996372,
'word': 'Databricks',
'start': 0,
'end': 10},
{'entity_group': 'LOC',
'score': 0.999588,
'word': 'San Francisco',
'start': 23,
'end': 36}]]
Wenn Sie dies als Rückgabetyp darstellen möchten, können Sie ein Array (array
) von struct
-Feldern verwenden und dabei die dict
-Einträge als Felder von struct
auflisten:
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('array<struct<word string, entity_group string, score float, start integer, end integer>>')
def ner_udf(texts: pd.Series) -> pd.Series:
return pd.Series(ner_pipeline(texts.to_list(), batch_size=1))
display(df.select(df.texts, ner_udf(df.texts).alias('entities')))
Optimieren der Leistung
Bei der Optimierung der Leistung der UDF gibt es mehrere wichtige Aspekte. Erstens: Jede GPU muss effektiv verwenden werden. Dies können Sie anpassen, indem Sie die Größe der Batches ändern, die von der Transformers-Pipeline an die GPU gesendet werden. Zweitens: Stellen Sie sicher, dass der Datenrahmen (DataFrame) korrekt partitioniert ist, um den gesamten Cluster zu nutzen.
Darüber hinaus empfiehlt es sich gegebenenfalls, das Hugging Face-Modell zwischenzuspeichern, um die Ladezeit des Modells oder Kosten für eingehende Daten zu sparen.
Auswählen einer Batchgröße
Die oben beschriebenen UDFs sollten zwar direkt mit der Batchgröße 1 (batch_size
) funktionieren, dabei werden jedoch möglicherweise die für die Worker verfügbaren Ressourcen nicht effizient genutzt. Stimmen Sie die Batchgröße auf das Modell und die Hardware im Cluster ab, um die Leistung zu verbessern. Databricks empfiehlt, verschiedene Batchgrößen für die Pipeline in Ihrem Cluster auszuprobieren, um die beste Leistung zu ermitteln. Weitere Informationen zur Pipelinebatchverarbeitung und zu anderen Leistungsoptionen finden Sie in der Hugging Face-Dokumentation unter Pipeline batching bzw. unter Performance and Scalability.
Versuchen Sie, eine Batchgröße zu finden, die groß genug ist, um die GPU vollständig zu nutzen, aber keine CUDA out of memory
-Fehler verursacht. Falls bei der Optimierung CUDA out of memory
-Fehler auftreten, müssen Sie das Notebook trennen und erneut anfügen, um den vom Modell verwendeten Arbeitsspeicher und die Daten in der GPU freizugeben.
Überwachen Sie die GPU-Leistung, indem Sie sich die aktuellen Clustermetriken für einen Cluster ansehen und eine Metrik auswählen – beispielsweise gpu0-util
für die GPU-Prozessorauslastung oder gpu0_mem_util
für die GPU-Speicherauslastung.
Optimieren der Parallelität mit Zeitplanung auf Phasenebene
Standardmäßig plant Spark eine einzelne Aufgabe pro GPU auf jedem Computer. Um die Parallelität zu erhöhen, können Sie die Planung auf Phasenebene verwenden, um Spark mitzuteilen, wie viele Aufgaben pro GPU ausgeführt werden sollen. Wenn Spark also beispielsweise zwei Aufgaben pro GPU ausführen soll, können Sie dies wie folgt angeben:
from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder
task_requests = TaskResourceRequests().resource("gpu", 0.5)
builder = ResourceProfileBuilder()
resource_profile = builder.require(task_requests).build
rdd = df.withColumn('predictions', loaded_model(struct(*map(col, df.columns)))).rdd.withResources(resource_profile)
Neupartitionieren von Daten, um die gesamte verfügbare Hardware zu verwenden
Der zweite Leistungsaspekt ist die vollständige Nutzung der Hardware in Ihrem Cluster. Im Allgemeinen empfiehlt sich ein kleines Vielfaches der Anzahl von GPUs Ihrer Worker (bei GPU-Clustern) oder der Anzahl von Kernen in den Workern Ihres Clusters (bei CPU-Clustern). Ihr Eingabedatenrahmen verfügt möglicherweise bereits über genügend Partitionen, um von der Parallelität des Clusters zu profitieren. Verwenden Sie df.rdd.getNumPartitions()
, um zu sehen, wie viele Partitionen der Datenrahmen enthält. Ein Datenrahmen kann mithilfe von repartitioned_df = df.repartition(desired_partition_count)
neu partitioniert werden.
Zwischenspeichern des Modells in DBFS oder auf Bereitstellungspunkten
Wenn Sie häufig ein Modell aus verschiedenen oder neu gestarteten Clustern laden, können Sie das Hugging Face-Modell ggf. auch im DBFS-Stammvolume oder auf einem Bereitstellungspunkt zwischenspeichern. Dies kann die Kosten für eingehenden Datenverkehr verringern und die Ladezeit des Modells in einem neuen oder neu gestarteten Cluster verkürzen. Legen Sie dazu die Umgebungsvariable TRANSFORMERS_CACHE
in Ihrem Code fest, bevor Sie die Pipeline laden.
Beispiel:
import os
os.environ['TRANSFORMERS_CACHE'] = '/dbfs/hugging_face_transformers_cache/'
Alternativ können Sie ähnliche Ergebnisse erzielen, indem Sie das Modell in MLflow mit der MLflow-Variante transformers
protokollieren.
Notebook: Hugging Face Transformers-Rückschlüsse und MLflow-Protokollierung
Dieses Notebook ist ein End-to-End-Beispiel für die Textzusammenfassung mit Hugging Face Transformers-Pipelinerückschlüssen und MLflow-Protokollierung für einen schnellen Einstieg mit Beispielcode.
Notebook für Hugging Face Transformers-Pipelinerückschlüsse
Zusätzliche Ressourcen
Sie können Ihr Hugging Face-Modell mit den folgenden Leitfäden optimieren:
- Vorbereiten von Daten für die Optimierung von Hugging Face-Modellen
- Optimieren von Hugging Face-Modellen für eine einzelne GPU
Weitere Informationen finden Sie unter What are Hugging Face Transformers?.