Interaktives Data Wrangling mit Apache Spark in Azure Machine Learning
Data Wrangling wird zu einem der wichtigsten Aspekte von Projekten für maschinelles Lernen. Die Azure Machine Learning-Integration mit Azure Synapse Analytics bietet mithilfe von Azure Synapse Zugriff auf einen Apache Spark-Pool, sodass interaktives Data Wrangling mithilfe von Azure Machine Learning-Notebooks durchgeführt werden kann.
In diesem Artikel erfahren Sie, wie Sie Data Wrangling mithilfe der folgenden Elemente durchführen können.
- Serverloses Spark Compute
- Angefügter Synapse Spark-Pool
Voraussetzungen
- Ein Azure-Abonnement: Sollten Sie über kein Azure-Abonnement verfügen, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen.
- Ein Azure Machine Learning-Arbeitsbereich. Weitere Informationen finden Sie unter Erstellen von Arbeitsbereichsressourcen.
- Ein ADLS Gen2-Speicherkonto (Azure Data Lake Storage). Weitere Informationen finden Sie unter Erstellen eines Azure Data Lake Storage (ADLS) Gen2-Speicherkontos.
- (Optional): Eine Azure Key Vault-Instanz. Weitere Informationen finden Sie unter Erstellen einer Azure Key Vault-Instanz.
- (Optional): Ein Dienstprinzipal. Weitere Informationen finden Sie unter Erstellen eines Dienstprinzipals.
- (Optional:) Ein angefügter Synapse Spark-Pool im Azure Machine Learning-Arbeitsbereich
Bevor Sie Ihre Data Wrangling-Aufgaben starten, sollten Sie sich über den Prozess der Speicherung von Geheimnissen informieren.
- Zugriffsschlüssel für das Azure Blob Storage-Konto
- SAS-Token (Shared Access Signature)
- Dienstprinzipalinformationen für ADLS Gen2 (Azure Data Lake Storage)
in der Azure Key Vault-Instanz vertraut machen. Außerdem müssen Sie wissen, wie Rollenzuweisungen in Azure-Speicherkonten funktionieren. In den folgenden Abschnitten in diesem Dokument werden diese Konzepte beschrieben. Anschließend wird auf die Einzelheiten des interaktiven Data Wrangling mithilfe des Spark-Pools in Azure Machine Learning-Notebooks eingegangen.
Tipp
Weitere Informationen zur Konfiguration der Rollenzuweisung für Azure Storage-Konten und zum Zugreifen auf Daten in Ihren Speicherkonten per Passthrough der Benutzeridentität finden Sie unter Hinzufügen von Rollenzuweisungen in Azure Storage-Konten.
Interaktives Data Wrangling mit Apache Spark
Azure Machine Learning bietet serverloses Spark-Compute und einen angefügten Synapse Spark-Pool für interaktives Data Wrangling mit Apache Spark in Azure Machine Learning-Notebooks. Für serverloses Spark Compute müssen keine Ressourcen im Azure Synapse-Arbeitsbereich erstellt werden. Stattdessen wird eine vollständig verwaltete serverlose Spark-Computeressource direkt in den Azure Machine Learning-Notebooks verfügbar. Die Verwendung des serverlosen Spark-Compute ist die einfachste Möglichkeit, auf einen Spark-Cluster in Azure Machine Learning zuzugreifen.
Serverloses Spark Compute in Azure Machine Learning Notebooks
Serverloses Spark Compute ist standardmäßig in Azure Machine Learning Notebooks verfügbar. Um in einem Notebook darauf zuzugreifen, wählen Sie unter Azure Machine Learning Serverless Spark die Option Serverloses Spark-Compute aus dem Auswahlmenü Compute aus.
Die Benutzeroberfläche von Notebooks bietet auch Optionen für die Konfiguration von Spark-Sitzungen für serverloses Spark-Compute. So konfigurieren Sie eine Spark-Sitzung:
- Wählen Sie am oberen Rand des Bildschirms Sitzung konfigurieren.
- Wählen Sie die Apache Spark-Version aus der Dropdownliste aus.
Wichtig
Azure Synapse-Runtime für Apache Spark: Ankündigungen
- Azure Synapse Runtime for Apache Spark 3.2:
- EOLA-Datum: 8. Juli 2023
- Datum für Supportende: 8. Juli 2024. Nach diesem Datum wird die Runtime deaktiviert.
- Apache Spark 3.3:
- EOLA-Ankündigungsdatum: 12. Juli 2024
- Datum für Supportende: 31. März 2025. Nach diesem Datum wird die Runtime deaktiviert.
- Wenn Sie weiterhin Support erhalten und von einer optimalen Leistung profitieren möchten, sollten Sie zu Apache Spark 3.4 migrieren.
- Azure Synapse Runtime for Apache Spark 3.2:
- Wählen Sie im Dropdownmenü Instanztyp aus. Diese Typen werden derzeit unterstützt:
Standard_E4s_v3
Standard_E8s_v3
Standard_E16s_v3
Standard_E32s_v3
Standard_E64s_v3
- Geben Sie einen Wert für das Spark-Sitzungstimeout in Minuten ein.
- Wählen Sie aus, ob Sie Executoren dynamisch zuordnen möchten.
- Wählen Sie die Anzahl der Executors für die Spark-Sitzung aus.
- Wählen Sie aus dem Dropdownmenü die Executor size (Executorgröße) aus.
- Wählen Sie aus dem Dropdownmenü die Driver size (Treibergröße) aus.
- Aktivieren Sie das Kontrollkästchen Conda-Datei hochladen, um eine Conda-Datei zum Konfigurieren einer Spark-Sitzung zu verwenden. Wählen Sie anschließend Durchsuchen aus, und wählen Sie die Conda-Datei mit der gewünschten Spark-Sitzungskonfiguration aus.
- Fügen Sie Eigenschaften zu den Konfigurationseinstellungen hinzu, geben Sie Werte in die Textfelder Eigenschaft und Wert ein, und wählen Sie Hinzufügen aus.
- Wählen Sie Übernehmen.
- Wählen Sie im Popupfenster Neue Sitzung konfigurieren? die Option Sitzung beenden aus.
Die Änderungen an der Sitzungskonfiguration bleiben bestehen und werden für eine andere Notebook-Sitzung verfügbar, die mit dem serverlosen Spark Compute gestartet wird.
Tipp
Wenn Sie Conda-Pakete auf Sitzungsebene verwenden, können Sie die Kaltstartzeit der Spark-Sitzung verbessern, wenn Sie die Konfigurationsvariable spark.hadoop.aml.enable_cache
auf true festlegen. Ein Kaltstart der Sitzung mit Conda-Paketen auf Sitzungsebene dauert in der Regel 10 bis 15 Minuten, wenn die Sitzung zum ersten Mal gestartet wird. Nachfolgende Kaltstarts von Sitzungen, bei denen die Konfigurationsvariable auf „true“ gesetzt ist, dauern jedoch normalerweise drei bis fünf Minuten.
Importieren und Aufbereiten von Daten aus ADLS Gen2 (Azure Data Lake Storage)
Sie können auf in Azure Data Lake Storage (ADLS) Gen2-Speicherkonten gespeicherte Daten mit abfss://
-Daten-URIs zugreifen und diese verarbeiten. Dazu befolgen Sie einem der beiden Datenzugriffsmechanismen:
- Passthrough der Benutzeridentität
- Dienstprinzipalbasierter Datenzugriff
Tipp
Data Wrangling mit serverlosem Spark-Computing und Passthrough der Benutzeridentität für den Zugriff auf Daten in einem ADLS Gen 2-Speicherkonto (Azure Data Lake Storage) erfordert die geringste Anzahl von Konfigurationsschritten.
So starten Sie das interaktive Data Wrangling mit dem Passthrough der Benutzeridentität:
Überprüfen Sie, ob die Benutzeridentität über die Rollenzuweisungen Mitwirkender und Mitwirkender an Storage-Blobdaten im ADLS Gen2-Speicherkonto (Azure Data Lake Storage) verfügt.
Um das serverlose Spark Compute zu verwenden, wählen Sie unter Azure Machine Learning Serverless Spark die Option Serverloses Spark Compute im Compute-Auswahlmenü aus.
Wählen Sie im Auswahlmenü Compute unter Synapse Spark-Pool einen angefügten Synapse Spark-Pool aus, um einen angefügten Synapse Spark-Pool zu verwenden.
Bei diesem Codebeispiel für Titanic-Data Wrangling wird die Verwendung eines Daten-URI im Format
abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>
mitpyspark.pandas
undpyspark.ml.feature.Imputer
veranschaulicht.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled", index_col="PassengerId", )
Hinweis
Dieses Python-Codebeispiel verwendet
pyspark.pandas
. Dies wird nur von der Spark-Laufzeitversion 3.2 oder höher unterstützt.
So bereiten Sie Daten durch den Zugriff über einen Dienstprinzipal auf:
Überprüfen Sie, ob der Dienstprinzipal über die Rollenzuweisungen Mitwirkender und Mitwirkender an Storage-Blobdaten im ADLS Gen2-Speicherkonto (Azure Data Lake Storage) verfügt.
Erstellen Sie Azure Key Vault-Geheimnisse für die Mandanten-ID, die Client-ID und die Werte des geheimen Clientschlüssels des Dienstprinzipals.
Wählen Sie im Menü Compute die Option Serverloses Spark-Compute unter Serverloses Spark-Compute von Azure Machine Learning aus. Sie können auch einen angefügten Synapse Spark-Pool unter Synapse Spark-Pools im Menü Compute auswählen.
Legen Sie die Mandanten-ID, die Client-ID und den geheimen Clientschlüssel des Dienstprinzipals in der Konfiguration fest, und führen Sie das folgende Codebeispiel aus.
Der
get_secret()
-Aufruf im Code hängt vom Namen der Azure Key Vault-Instanz und den Namen der Azure Key Vault-Geheimnisse ab, die für die Mandanten-ID, die Client-ID und den geheimen Clientschlüssel des Dienstprinzipals erstellt wurden. Legen Sie diese entsprechenden Eigenschaftsnamen/Werte in der Konfiguration fest:- Eigenschaft der Client-ID:
fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Eigenschaft des geheimen Clientschlüssels:
fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Eigenschaft der Mandanten-ID:
fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Wert der Mandanten-ID:
https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary # Set up service principal tenant ID, client ID and secret from Azure Key Vault client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>") tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>") client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>") # Set up service principal which has access of the data sc._jsc.hadoopConfiguration().set( "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth" ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_id, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_secret, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token", )
- Eigenschaft der Client-ID:
Importieren Sie die Daten wie im Codebeispiel gezeigt mithilfe des Daten-URI im Format
abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>
, und bereiten Sie sie auf. Verwenden Sie dabei die Titanic-Daten.
Importieren und Aufbereiten von Daten aus Azure Blob Storage
Sie können auf Azure Blob Storage-Daten entweder mit dem Zugriffsschlüssel für das Speicherkonto oder einem SAS-Token (Shared Access Signature) zugreifen. Sie sollten diese Anmeldeinformationen in der Azure Key Vault-Instanz als Geheimnis speichern und als Eigenschaften in der Sitzungskonfiguration festlegen.
So beginnen Sie das interaktive Data Wrangling:
Wählen Sie im linken Bereich von Azure Machine Learning Studio Notebooks aus.
Wählen Sie im Menü Compute die Option Serverloses Spark-Compute unter Serverloses Spark-Compute von Azure Machine Learning aus. Sie können auch einen angefügten Synapse Spark-Pool unter Synapse Spark-Pools im Menü Compute auswählen.
So konfigurieren Sie den Zugriffsschlüssel für das Speicherkonto oder ein SAS-Token (Shared Access Signature) für den Datenzugriff in Azure Machine Learning-Notebooks:
Legen Sie die
fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net
-Eigenschaft für den Zugriffsschlüssel wie in diesem Codeschnipsel gezeigt fest:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key )
Legen Sie die
fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net
-Eigenschaft für das SAS-Token wie in diesem Codeschnipsel gezeigt fest:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", sas_token, )
Hinweis
Bei den
get_secret()
-Aufrufen in den obigen Codeschnipseln sind der Name der Azure Key Vault-Instanz und die Namen der Geheimnisse erforderlich, die für den Zugriffsschlüssel oder das SAS-Token des Azure Blob Storage-Kontos erstellt wurden.
Führen Sie den Data Wrangling-Code im selben Notebook aus. Formatieren Sie den Daten-URI ähnlich wie in diesem Codeschnipsel als
wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>
:import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled", index_col="PassengerId", )
Hinweis
Dieses Python-Codebeispiel verwendet
pyspark.pandas
. Dies wird nur von der Spark-Laufzeitversion 3.2 oder höher unterstützt.
Importieren und Aufbereiten von Daten aus dem Azure Machine Learning-Datenspeicher
Definieren Sie einen Pfad zu Daten im Datenspeicher mit dem URI-Formatazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>
, um auf Daten aus dem Azure Machine Learning-Datenspeicher zuzugreifen. So bereiten Sie Daten aus einem Azure Machine Learning-Datenspeicher interaktiv in einer Notebooks-Sitzung auf:
Wählen Sie Serverloses Spark Compute unter Azure Machine Learning Serverless Spark aus dem Compute-Auswahlmenü aus, oder wählen Sie einen angefügten Synapse Spark-Pool unter Synapse Spark-Pools aus dem Compute-Auswahlmenü aus.
In diesem Codebeispiel wird gezeigt, wie Titanic-Daten mithilfe von
azureml://
-Datenspeicher-URI,pyspark.pandas
undpyspark.ml.feature.Imputer
aus einem Azure Machine Learning-Datenspeicher gelesen und aufbereitet werden.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "azureml://datastores/workspaceblobstore/paths/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "azureml://datastores/workspaceblobstore/paths/data/wrangled", index_col="PassengerId", )
Hinweis
Dieses Python-Codebeispiel verwendet
pyspark.pandas
. Dies wird nur von der Spark-Laufzeitversion 3.2 oder höher unterstützt.
Azure Machine Learning-Datenspeicher können mithilfe der Anmeldeinformationen für das Azure-Speicherkonto auf Daten zugreifen
- Zugriffsschlüssel
- SAS-Token
- Dienstprinzipal
oder den Datenzugriff ohne Anmeldeinformationen verwenden. Je nach Datenspeichertyp und zugrunde liegendem Azure-Speicherkontotyp können Sie einen geeigneten Authentifizierungsmechanismus auswählen, um den Datenzugriff sicherzustellen. In dieser Tabelle sind die Authentifizierungsmechanismen für den Zugriff auf Daten in den Azure Machine Learning-Datenspeichern zusammengefasst:
Speicherkontotyp | Datenzugriff ohne Anmeldeinformationen | Datenzugriffsmechanismus | Rollenzuweisungen |
---|---|---|---|
Azure Blob | Nein | Zugriffsschlüssel oder SAS-Token | Keine Rollenzuweisungen erforderlich |
Azure Blob | Ja | Passthrough der Benutzeridentität* | Die Benutzeridentität sollte über geeignete Rollenzuweisungen im Azure Blob Storage-Konto verfügen. |
ADLS Gen2 (Azure Data Lake Storage) | Nein | Dienstprinzipal | Der Dienstprinzipal sollte über geeignete Rollenzuweisungen im ADLS Gen2-Speicherkonto (Azure Data Lake Storage) verfügen. |
ADLS Gen2 (Azure Data Lake Storage) | Ja | Passthrough der Benutzeridentität | Die Benutzeridentität sollte über geeignete Rollenzuweisungen im ADLS Gen2-Speicherkonto (Azure Data Lake Storage) verfügen. |
* Der Passthrough der Benutzeridentitäten funktioniert nur für Datenspeicher ohne Anmeldeinformationen, die auf Azure Blob Storage-Konten verweisen, wenn vorläufiges Löschen nicht aktiviert ist.
Zugreifen auf Daten in der Standarddateifreigabe
Die Standard-Dateifreigabe wird sowohl in serverlose Spark Compute- als auch in angeschlossene Synapse Spark-Pools eingebunden.
In Azure Machine Learning Studio werden Dateien in der Standarddateifreigabe in der Verzeichnisstruktur auf der Registerkarte Dateien angezeigt. Notebookcode kann mit dem file://
-Protokoll zusammen mit dem absoluten Pfad der Datei und ohne weitere Konfigurationen direkt auf Dateien zugreifen, die in dieser Dateifreigabe gespeichert sind. Dieser Codeausschnitt zeigt, wie Sie auf eine Datei zugreifen, die in der Standarddateifreigabe gespeichert ist:
import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer
abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
inputCols=["Age"],
outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")
Hinweis
Dieses Python-Codebeispiel verwendet pyspark.pandas
. Dies wird nur von der Spark-Laufzeitversion 3.2 oder höher unterstützt.