Compartir a través de


Limpieza y transformación de datos con grupos de Apache Spark (en desuso)

SE APLICA A: Azure ML del SDK de Python v1

Advertencia

La integración de Azure Synapse Analytics con Azure Machine Learning, disponible en el SDK de Python v1, está en desuso. Los usuarios todavía pueden usar el área de trabajo de Synapse, registrada con Azure Machine Learning como servicio vinculado. Sin embargo, ya no se puede registrar una nueva área de trabajo de Synapse con Azure Machine Learning como servicio vinculado. Se recomienda usar el proceso de Spark sin servidor y los grupos de Synapse Spark conectados, disponibles en la CLI v2 y el SDK de Python v2. Para más información, visite https://aka.ms/aml-spark.

En este artículo, aprenderá a realizar de manera interactiva tareas de limpieza y transformación de datos dentro de una sesión de Synapse dedicada, con tecnología de Azure Synapse Analytics, en un cuaderno de Jupyter. Estas tareas se basan en el SDK de Azure Machine Learning para Python. Para obtener más información sobre Azure Machine Learning, visite Uso de Apache Spark (con tecnología de Azure Synapse Analytics) en la canalización de aprendizaje automático (versión preliminar). Para obtener más información sobre cómo usar Azure Synapse Analytics con un área de trabajo de Synapse, visite la documentación de introducción Azure Synapse Analytics.

Integración de Azure Machine Learning y Azure Synapse Analytics

Con la integración de Azure Synapse Analytics en Azure Machine Learning (versión preliminar) se puede conectar un grupo de Apache Spark respaldado por Azure Synapse para la exploración y preparación interactivas de datos. Con esta integración, puede tener un recurso de proceso dedicado para la limpieza a escala de datos, todo ello dentro del mismo cuaderno de Python que se usa para entrenar los modelos de aprendizaje automático.

Requisitos previos

Inicio del grupo de Spark de Synapse para tareas de limpieza y transformación de datos

Para comenzar la preparación de datos con el grupo de Apache Spark, especifique el nombre del proceso de Apache Spark asociado. Este nombre se puede encontrar a través de Estudio de Azure Machine Learning en la pestaña Procesos asociados.

obtener el nombre de proceso asociado

Importante

Para seguir usando el grupo de Apache Spark, debe indicar qué recurso de proceso se va a utilizar en las tareas de limpieza y transformación de datos. Use %synapse para líneas de código únicas y %%synapse para varias líneas:

%synapse start -c SynapseSparkPoolAlias

Una vez iniciada la sesión, puede comprobar sus metadatos:

%synapse meta

Puede especificar un entorno de Azure Machine Learning para usarlo durante la sesión de Apache Spark. Solo se aplicarán las dependencias de Conda especificadas en el entorno. No se admiten las imágenes acopladas.

Advertencia

Los grupos de Apache Spark no admiten las dependencias de Python especificadas en las dependencias de Conda del entorno. Actualmente, solo se admiten versiones fijas de Python: incluya sys.version_info en el script para comprobar la versión de Python

Este código crea la myenvvariable de entorno para instalar la versión 1.20.0 de azureml-core y la versión 1.17.0 de numpy antes de que se inicie la sesión. Después puede incluir este entorno en la instrucción start de la sesión de Apache Spark.


from azureml.core import Workspace, Environment

# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)

Para comenzar la preparación de datos con el grupo de Apache Spark en su entorno personalizado, especifique tanto el nombre del grupo de Apache Spark como qué entorno utilizar durante la sesión de Apache Spark. Puede proporcionar el identificador de la suscripción, el grupo de recursos del área de trabajo de aprendizaje automático y el nombre del área de trabajo de aprendizaje automático.

Importante

Asegúrese de habilitar la opción Permitir paquetes de nivel de sesión en el área de trabajo de Synapse vinculada.

Habilitación de paquetes de nivel de sesión

%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName

Carga de datos desde el almacenamiento

Después de iniciar la sesión de Apache Spark, lea los datos que quiera preparar. La carga de datos es compatible con Azure Blob Storage y Azure Data Lake Storage Generation 1 y Generation 2.

Tiene dos opciones para cargar los datos de estos servicios de almacenamiento:

Para acceder a estos servicios de almacenamiento, necesita permisos de lector de datos de Storage Blob. Para escribir datos de nuevo en estos servicios de almacenamiento, necesitará permisos de colaborador de datos de Storage Blob. Obtenga más información sobre los permisos y roles de almacenamiento.

Carga de datos con la ruta de acceso del sistema de archivos distribuido de Hadoop (HDFS)

Para cargar y leer datos de almacenamiento de la ruta de acceso de HDFS correspondiente, necesita las credenciales de autenticación de acceso a datos. Estas credenciales varían según el tipo de almacenamiento. Este ejemplo de código muestra cómo se leen datos de Azure Blob Storage en una trama de datos de Spark con la clave de acceso o el token de firma de acceso compartido (SAS):

%%synapse

# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")

# read from blob 
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")

Este ejemplo de código muestra cómo se leen datos de Azure Data Lake Storage Generation 1 (ADLS Gen1) con las credenciales de la entidad de servicio:

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")

Este ejemplo de código muestra cómo se leen datos de Azure Data Lake Storage Generation 2 (ADLS Gen2) con las credenciales de la entidad de servicio:

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")

Lectura de datos de conjuntos de datos registrados

También puede colocar un conjunto de datos registrado existente en el área de trabajo y realizar la preparación de datos convirtiéndolo en una trama de datos de Spark. Este ejemplo autentifica el área de trabajo, obtiene un objeto TabularDataset registrado, blob_dset, que hace referencia a los archivos del almacenamiento de blobs y convierte TabularDataset en una trama de datos de Spark. Al convertir los conjuntos de datos en tramas de datos de Spark, puede usar las bibliotecas de preparación y exploración de datos pyspark.

%%synapse

from azureml.core import Workspace, Dataset

subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group)

dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()

Realización de tareas de limpieza y transformación de datos

Después de recuperar y explorar los datos, puede llevar a cabo las tareas de limpieza y transformación de datos. Este ejemplo de código amplía el ejemplo de HDFS de la sección anterior. En función de la columna Sobreviviente, filtra los datos de la trama de datos de Spark df y agrupa los grupos por Año:

%%synapse

from pyspark.sql.functions import col, desc

df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)

df.show()

Almacenamiento de datos y detención de la sesión de Spark

Una vez completada la exploración y la preparación de los datos, almacene los datos preparados para poder usarlos más adelante en su cuenta de almacenamiento de Azure. En este ejemplo de código, los datos preparados se escriben en Azure Blob Storage sobrescribiendo el archivo Titanic.csv original del directorio training_data. Para volver a escribir en el almacenamiento, necesita permisos de colaborador de datos de Blob Storage. Para más información, visite Asignación de un rol de Azure para el acceso a datos de blob.

%% synapse

df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")

Después de completar la preparación de los datos y guardar los datos preparados en el almacenamiento, finalice el uso del grupo de Apache Spark con este comando:

%synapse stop

Crear un conjunto de datos para representar los datos preparados

Cuando esté listo para usar los datos preparados para el entrenamiento de modelos, conéctese al almacenamiento con un almacén de datos de Azure Machine Learning y especifique el archivo o los archivos que desea usar con un conjunto de datos de Azure Machine Learning.

Este ejemplo de código

  • En él se supone que ya ha creado un almacén de datos que se conecta al servicio de almacenamiento en el que guardó los datos preparados
  • Recupera el almacén de datos existente, mydatastore, del área de trabajo, ws, con el método get().
  • Crea un objeto FileDataset, train_ds, que hace referencia a los archivos de datos preparados ubicados en el directorio mydatastore training_data
  • Crea una variable input1. Después, esta variable se puede usar para que los archivos de datos del conjunto de datos train_ds estén disponibles para un destino de proceso correspondiente a las tareas de entrenamiento.
from azureml.core import Datastore, Dataset

datastore = Datastore.get(ws, datastore_name='mydatastore')

datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()

Uso de ScriptRunConfig para enviar una ejecución de experimento a un grupo de Synapse Spark

Si está listo para automatizar y enviar a producción las tareas de limpieza y transformación de datos, puede enviar una ejecución de experimento a un grupo de Spark de Synapse asociado con el objeto ScriptRunConfig. De forma similar, si tiene una canalización de Azure Machine Learning, puede usar SynapseSparkStep para especificar el grupo de Spark de Synapse como destino de proceso para el paso de preparación de datos de la canalización. La disponibilidad de los datos para el grupo de Spark de Synapse depende del tipo del conjunto de datos.

  • En el caso de FileDataset, puede usar el método as_hdfs(). Cuando se envía la ejecución, el conjunto de datos está disponible para el grupo de Spark de Synapse como un sistema de archivos distribuido de Hadoop (HFDS)
  • Para TabularDataset, puede usar el método as_named_input()

El siguiente código de ejemplo

  • Crea la variable input2 a partir del mismo objeto FileDataset train_ds que se creó en el ejemplo de código anterior
  • Cree output variable con la clase HDFSOutputDatasetConfiguration. Una vez finalizada la ejecución, esta clase nos permite guardar la salida de la ejecución como el conjunto de datos test en el almacén de datos mydatastore. En el área de trabajo Azure Machine Learning, el conjunto de datos test está registrado con el nombre registered_dataset
  • Configura los valores que debe usar la ejecución para llevarse a cabo en el grupo de Spark de Synapse
  • Define los parámetros ScriptRunConfig para hacer lo siguiente
    • Usar el script dataprep.py para la ejecución
    • Especificar los datos debe usar como entrada y cómo hacer que estén disponibles para el grupo de Spark de Synapse
    • Especificar dónde se almacenarán los datos de salida output
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig 
from azureml.core import Experiment

input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")

run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

run_config.spark.configuration["spark.driver.memory"] = "1g" 
run_config.spark.configuration["spark.driver.cores"] = 2 
run_config.spark.configuration["spark.executor.memory"] = "1g" 
run_config.spark.configuration["spark.executor.cores"] = 1 
run_config.spark.configuration["spark.executor.instances"] = 1 

conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")

run_config.environment.python.conda_dependencies = conda_dep

script_run_config = ScriptRunConfig(source_directory = './code',
                                    script= 'dataprep.py',
                                    arguments = ["--file_input", input2,
                                                 "--output_dir", output],
                                    run_config = run_config)

Para obtener más información sobre run_config.spark.configuration y la configuración general de Spark, visite Clase SparkConfiguration y la documentación de configuración de Apache Spark.

Una vez haya configurado el objeto ScriptRunConfig, puede enviar la ejecución.

from azureml.core import Experiment 

exp = Experiment(workspace=ws, name="synapse-spark") 
run = exp.submit(config=script_run_config) 
run

Para obtener más información, incluida la información sobre el script de dataprep.py usado en este ejemplo, consulte el cuaderno de ejemplo.

Después de que haya preparado los datos, puede usarlos como entrada para los trabajos de entrenamiento. En el ejemplo de código anterior, registered_dataset es lo que se especificaría como datos de entrada para los trabajos de entrenamiento.

Cuadernos de ejemplo

Revise estos cuadernos de ejemplo para ver más conceptos y demostraciones de las funcionalidades de integración de Azure Synapse Analytics y Azure Machine Learning:

Pasos siguientes