Sdílet prostřednictvím


Interaktivní transformace dat pomocí Apache Sparku ve službě Azure Machine Learning

Transformace dat se stává jedním z nejdůležitějších aspektů projektů strojového učení. Integrace služby Azure Machine Learning se službou Azure Synapse Analytics poskytuje přístup k fondu Apache Spark založenému na Azure Synapse pro interaktivní transformace dat využívající poznámkové bloky Azure Machine Learning.

V tomto článku se dozvíte, jak zpracovávat transformace dat pomocí

  • Výpočetní prostředí Spark bez serveru
  • Připojený fond Synapse Spark

Požadavky

Než začnete s úkoly transformace dat, přečtěte si o procesu ukládání tajných kódů.

  • Přístupový klíč účtu služby Azure Blob Storage
  • Token sdíleného přístupového podpisu (SAS)
  • Informace o instančním objektu služby Azure Data Lake Storage (ADLS) Gen2

ve službě Azure Key Vault. Potřebujete také vědět, jak zpracovávat přiřazení rolí v účtech úložiště Azure. Následující části tohoto dokumentu popisují tyto koncepty. Pak prozkoumáme podrobnosti o interaktivní transformaci dat pomocí fondů Sparku v poznámkových blocích Azure Machine Learning.

Tip

Další informace o konfiguraci přiřazení role účtu úložiště Azure nebo pokud přistupujete k datům v účtech úložiště pomocí předávání identity uživatele, najdete další informace v tématu Přidání přiřazení rolí v účtech úložiště Azure.

Transformace interaktivních dat pomocí Apache Sparku

Pro interaktivní transformaci dat pomocí Apache Sparku v poznámkových blocích Azure Machine Learning nabízí Azure Machine Learning bezserverové výpočetní prostředí Spark a připojený fond Synapse Spark. Výpočetní prostředí Spark bez serveru nevyžaduje vytvoření prostředků v pracovním prostoru Azure Synapse. Místo toho se plně spravovaný bezserverový výpočetní výkon Sparku zpřístupní přímo v poznámkových blocích Azure Machine Learning. Použití bezserverového výpočetního prostředí Spark je nejjednodušší způsob přístupu ke clusteru Spark ve službě Azure Machine Learning.

Výpočetní prostředí Spark bez serveru v poznámkových blocích Azure Machine Learning

Výpočetní prostředí Spark bez serveru je ve výchozím nastavení dostupné v poznámkových blocích Azure Machine Learning. Pokud k němu chcete získat přístup v poznámkovém bloku, v nabídce pro výběr výpočetních prostředků vyberte bezserverové výpočetní prostředí Spark bezserverové služby Azure Machine Learning.

Uživatelské rozhraní Poznámkové bloky také poskytuje možnosti konfigurace relace Sparku pro výpočetní prostředí Spark bez serveru. Konfigurace relace Sparku:

  1. V horní části obrazovky vyberte Konfigurovat relaci .
  2. V rozevírací nabídce vyberte verzi Apache Sparku.

    Důležité

    Azure Synapse Runtime pro Apache Spark: Oznámení

    • Azure Synapse Runtime pro Apache Spark 3.2:
      • Datum oznámení EOLA: 8. července 2023
      • Datum ukončení podpory: 8. července 2024. Po tomto datu bude modul runtime zakázán.
    • Apache Spark 3.3:
      • Datum oznámení EOLA: 12. července 2024
      • Datum ukončení podpory: 31. března 2025. Po tomto datu bude modul runtime zakázán.
    • Pokud chcete pokračovat v podpoře a optimálním výkonu, doporučujeme migrovat na Apache Spark 3.4.
  3. V rozevírací nabídce vyberte Typ instance. V současné době se podporují tyto typy:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Zadejte hodnotu časového limitu relace Sparku v minutách.
  5. Vyberte, zda chcete dynamicky přidělovat exekutory.
  6. Vyberte počet exekutorů pro relaci Sparku.
  7. V rozevírací nabídce vyberte velikost exekutoru.
  8. V rozevírací nabídce vyberte velikost ovladače.
  9. Pokud chcete ke konfiguraci relace Sparku použít soubor Conda, zaškrtněte políčko Nahrát soubor conda. Pak vyberte Procházet a zvolte soubor Conda s požadovanou konfigurací relace Sparku.
  10. Přidejte vlastnosti nastavení konfigurace, vstupní hodnoty do textových polí Vlastnost a Hodnota a vyberte Přidat.
  11. Vyberte Použít.
  12. V místní nabídce Konfigurovat novou relaci? vyberte Zastavit relaci.

Změny konfigurace relace se zachovají a budou k dispozici pro jinou relaci poznámkového bloku, která se spouští s bezserverovým výpočetním prostředím Spark.

Tip

Pokud používáte balíčky Conda na úrovni relace, můžete zkrátit dobu spuštění relace Sparku, pokud nastavíte konfigurační proměnnou spark.hadoop.aml.enable_cache na hodnotu true. Při prvním spuštění relace s balíčky Conda na úrovni relace obvykle trvá 10 až 15 minut. Následující relace však začíná proměnnou konfigurace nastavenou na true obvykle trvá tři až pět minut.

Import a uspořádání dat z Azure Data Lake Storage (ADLS) Gen2

K datům uloženým ve službě Azure Data Lake Storage (ADLS) Gen2 můžete přistupovat a měnit jejich uspořádání pomocí abfss:// identifikátorů URI dat. Chcete-li to provést, musíte postupovat podle jednoho ze dvou mechanismů přístupu k datům:

  • Předávání identity uživatele
  • Přístup k datům založeným na instančním objektu

Tip

Transformace dat s výpočetním prostředím Spark bez serveru a předáním identity uživatele pro přístup k datům v účtu úložiště Azure Data Lake Storage (ADLS) Gen2 vyžaduje nejmenší počet kroků konfigurace.

Zahájení interaktivní transformace dat s předáváním identity uživatele:

  • Ověřte, že identita uživatele má v účtu úložiště Azure Data Lake Storage (ADLS) Gen2 přiřazení role Přispěvatel a Přispěvatel dat objektů blob služby Storage.

  • Pokud chcete použít bezserverové výpočetní prostředky Sparku, vyberte v nabídce pro výběr výpočetních prostředků bezserverový Spark bezserverový spark bez serveru Azure Machine Learning.

  • Pokud chcete použít připojený fond Synapse Spark, vyberte připojený fond Synapse Spark v části Fondy Synapse Sparku z nabídky Výběr výpočetních prostředků .

  • Tento vzorový kód transformace dat Titanic ukazuje použití identifikátoru URI dat ve formátu abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> s pyspark.pandas a pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Poznámka:

    Tento vzorový kód Pythonu používá pyspark.pandas. Tuto možnost podporuje pouze modul runtime Spark verze 3.2 nebo novější.

Uspořádání dat prostřednictvím instančního objektu:

  1. Ověřte, že instanční objekt má v účtu úložiště Azure Data Lake Storage (ADLS) Gen2 přiřazené role Přispěvatel dat přispěvatele a Přispěvatel dat objektů blob služby Storage.

  2. Vytvořte tajné kódy služby Azure Key Vault pro ID tenanta instančního objektu, ID klienta a hodnoty tajných kódů klienta.

  3. V nabídce Pro výběr výpočetních prostředků v části Bezserverová sparková služba Azure Machine Learning vyberte bezserverové výpočetní prostředí Spark bez serveru. Můžete také vybrat připojený fond Synapse Spark v části Fondy Synapse Spark z nabídky Výběr výpočetních prostředků .

  4. V konfiguraci nastavte ID tenanta instančního objektu, ID klienta a tajné klíče klienta a spusťte následující ukázku kódu.

    • Volání get_secret() v kódu závisí na názvu služby Azure Key Vault a na názvech tajných kódů služby Azure Key Vault vytvořených pro ID tenanta instančního objektu, ID klienta a tajný klíč klienta. Nastavte v konfiguraci tyto odpovídající názvy a hodnoty vlastností:

      • Vlastnost ID klienta: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Vlastnost tajného klíče klienta: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Vlastnost ID tenanta: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Hodnota ID tenanta: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Pomocí dat Titanic naimportujte a přeuspořádejte data pomocí identifikátoru URI dat ve abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> formátu, jak je znázorněno v ukázce kódu.

Import a uspořádání dat ze služby Azure Blob Storage

K datům azure Blob Storage můžete přistupovat buď pomocí přístupového klíče účtu úložiště, nebo pomocí tokenu sdíleného přístupového podpisu (SAS). Tyto přihlašovací údaje byste měli uložit ve službě Azure Key Vault jako tajný klíč a nastavit je jako vlastnosti v konfiguraci relace.

Zahájení interaktivní transformace dat:

  1. Na studio Azure Machine Learning levém panelu vyberte Poznámkové bloky.

  2. V nabídce Pro výběr výpočetních prostředků v části Bezserverová sparková služba Azure Machine Learning vyberte bezserverové výpočetní prostředí Spark bez serveru. Můžete také vybrat připojený fond Synapse Spark v části Fondy Synapse Spark z nabídky Výběr výpočetních prostředků .

  3. Konfigurace přístupového klíče účtu úložiště nebo tokenu sdíleného přístupového podpisu (SAS) pro přístup k datům v poznámkových blocích Azure Machine Learning:

    • Pro přístupový klíč nastavte fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net vlastnost, jak je znázorněno v tomto fragmentu kódu:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Pro token SAS nastavte fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net vlastnost, jak je znázorněno v tomto fragmentu kódu:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Poznámka:

      Volání get_secret() v předchozích fragmentech kódu vyžadují název služby Azure Key Vault a názvy tajných kódů vytvořených pro přístupový klíč nebo token SAS účtu služby Azure Blob Storage.

  4. Spusťte kód transformace dat ve stejném poznámkovém bloku. Naformátujte identifikátor URI dat jako , podobně jako wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>tento fragment kódu:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Poznámka:

    Tento vzorový kód Pythonu používá pyspark.pandas. Tuto možnost podporuje pouze modul runtime Spark verze 3.2 nebo novější.

Import a uspořádání dat z úložiště dat služby Azure Machine Learning

Pokud chcete získat přístup k datům z úložiště dat Azure Machine Learning, definujte cestu k datům v úložišti dat pomocí formátu azureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>identifikátoru URI. Interaktivní uspořádání dat z úložiště dat služby Azure Machine Learning v relaci Poznámkové bloky:

  1. V nabídce pro výběr výpočetních prostředků vyberte výpočetní prostředí Spark bez serveru Bezserverové služby Azure Machine Learning nebo v nabídce pro výběr výpočetních prostředků vyberte připojený fond Synapse Spark ve fondech Synapse Spark.

  2. Tento vzorový kód ukazuje, jak číst a měnit data Titanic z úložiště dat služby Azure Machine Learning pomocí azureml:// identifikátoru URI pyspark.pandasúložiště dat a pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Poznámka:

    Tento vzorový kód Pythonu používá pyspark.pandas. Tuto možnost podporuje pouze modul runtime Spark verze 3.2 nebo novější.

Úložiště dat služby Azure Machine Learning mají přístup k datům pomocí přihlašovacích údajů účtu úložiště Azure.

  • přístupový klíč
  • Token SAS
  • instanční objekt

nebo používají přístup k datům bez přihlašovacích údajů. V závislosti na typu úložiště dat a základním typu účtu úložiště Azure vyberte příslušný ověřovací mechanismus, který zajistí přístup k datům. Tato tabulka shrnuje mechanismy ověřování pro přístup k datům v úložištích dat Azure Machine Learning:

Storage account type Přístup k datům bez přihlašovacích údajů Mechanismus přístupu k datům Přiřazení rolí
Azure Blob No Přístupový klíč nebo token SAS Nejsou potřeba žádná přiřazení rolí.
Azure Blob Ano Předávání identity uživatele* Identita uživatele by měla mít v účtu služby Azure Blob Storage odpovídající přiřazení rolí.
Azure Data Lake Storage (ADLS) Gen 2 No Instanční objekt Instanční objekt by měl mít v účtu úložiště Azure Data Lake Storage (ADLS) Gen2 odpovídající přiřazení rolí.
Azure Data Lake Storage (ADLS) Gen 2 Ano Předávání identity uživatele Identita uživatele by měla mít v účtu úložiště Azure Data Lake Storage (ADLS) Gen2 odpovídající přiřazení rolí.

* Předávání identity uživatele funguje pro úložiště dat bez přihlašovacích údajů, které odkazují na účty úložiště objektů blob v Azure, pouze pokud není povolené obnovitelné odstranění .

Přístup k datům ve výchozí sdílené složce

Výchozí sdílená složka se připojí k výpočetním prostředkům Sparku bez serveru i k připojeným fondům Synapse Spark.

Snímek obrazovky znázorňující použití sdílené složky

V studio Azure Machine Learning se soubory ve výchozí sdílené složce zobrazují ve stromu adresářů na kartě Soubory. Kód poznámkového bloku může přímo přistupovat k souborům uloženým v této sdílené složce pomocí file:// protokolu spolu s absolutní cestou k souboru bez dalších konfigurací. Tento fragment kódu ukazuje, jak získat přístup k souboru uloženému ve výchozí sdílené složce:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Poznámka:

Tento vzorový kód Pythonu používá pyspark.pandas. Tuto možnost podporuje pouze modul runtime Spark verze 3.2 nebo novější.

Další kroky