Limpieza y transformación de datos con grupos de Apache Spark (en desuso)
SE APLICA A: Azure ML del SDK de Python v1
Advertencia
La integración de Azure Synapse Analytics con Azure Machine Learning, disponible en el SDK de Python v1, está en desuso. Los usuarios todavía pueden usar el área de trabajo de Synapse, registrada con Azure Machine Learning como servicio vinculado. Sin embargo, ya no se puede registrar una nueva área de trabajo de Synapse con Azure Machine Learning como servicio vinculado. Se recomienda usar el proceso de Spark sin servidor y los grupos de Synapse Spark conectados, disponibles en la CLI v2 y el SDK de Python v2. Para más información, visite https://aka.ms/aml-spark.
En este artículo, aprenderá a realizar de manera interactiva tareas de limpieza y transformación de datos dentro de una sesión de Synapse dedicada, con tecnología de Azure Synapse Analytics, en un cuaderno de Jupyter. Estas tareas se basan en el SDK de Azure Machine Learning para Python. Para obtener más información sobre Azure Machine Learning, visite Uso de Apache Spark (con tecnología de Azure Synapse Analytics) en la canalización de aprendizaje automático (versión preliminar). Para obtener más información sobre cómo usar Azure Synapse Analytics con un área de trabajo de Synapse, visite la documentación de introducción Azure Synapse Analytics.
Integración de Azure Machine Learning y Azure Synapse Analytics
Con la integración de Azure Synapse Analytics en Azure Machine Learning (versión preliminar) se puede conectar un grupo de Apache Spark respaldado por Azure Synapse para la exploración y preparación interactivas de datos. Con esta integración, puede tener un recurso de proceso dedicado para la limpieza a escala de datos, todo ello dentro del mismo cuaderno de Python que se usa para entrenar los modelos de aprendizaje automático.
Requisitos previos
Configure su entorno de desarrollo para instalar el SDK de Azure Machine Learning o use una instancia de proceso de Azure Machine Learning con el SDK ya instalado.
Instale el SDK de Azure Machine Learning para Python.
Creación de un grupo de Apache Spark mediante Azure Portal, herramientas web o Synapse Studio
Instale el paquete
azureml-synapse
(versión preliminar) con este código:pip install azureml-synapse
Vincule el área de trabajo de Azure Machine Learning y el de Azure Synapse Analytics mediante el SDK de Python de Azure Machine Learning o con el Estudio de Azure Machine Learning
Asocie un grupo de Spark de Synapse como destino de proceso
Inicio del grupo de Spark de Synapse para tareas de limpieza y transformación de datos
Para comenzar la preparación de datos con el grupo de Apache Spark, especifique el nombre del proceso de Apache Spark asociado. Este nombre se puede encontrar a través de Estudio de Azure Machine Learning en la pestaña Procesos asociados.
Importante
Para seguir usando el grupo de Apache Spark, debe indicar qué recurso de proceso se va a utilizar en las tareas de limpieza y transformación de datos. Use %synapse
para líneas de código únicas y %%synapse
para varias líneas:
%synapse start -c SynapseSparkPoolAlias
Una vez iniciada la sesión, puede comprobar sus metadatos:
%synapse meta
Puede especificar un entorno de Azure Machine Learning para usarlo durante la sesión de Apache Spark. Solo se aplicarán las dependencias de Conda especificadas en el entorno. No se admiten las imágenes acopladas.
Advertencia
Los grupos de Apache Spark no admiten las dependencias de Python especificadas en las dependencias de Conda del entorno. Actualmente, solo se admiten versiones fijas de Python: incluya sys.version_info
en el script para comprobar la versión de Python
Este código crea la myenv
variable de entorno para instalar la versión 1.20.0 de azureml-core
y la versión 1.17.0 de numpy
antes de que se inicie la sesión. Después puede incluir este entorno en la instrucción start
de la sesión de Apache Spark.
from azureml.core import Workspace, Environment
# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)
Para comenzar la preparación de datos con el grupo de Apache Spark en su entorno personalizado, especifique tanto el nombre del grupo de Apache Spark como qué entorno utilizar durante la sesión de Apache Spark. Puede proporcionar el identificador de la suscripción, el grupo de recursos del área de trabajo de aprendizaje automático y el nombre del área de trabajo de aprendizaje automático.
Importante
Asegúrese de habilitar la opción Permitir paquetes de nivel de sesión en el área de trabajo de Synapse vinculada.
%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName
Carga de datos desde el almacenamiento
Después de iniciar la sesión de Apache Spark, lea los datos que quiera preparar. La carga de datos es compatible con Azure Blob Storage y Azure Data Lake Storage Generation 1 y Generation 2.
Tiene dos opciones para cargar los datos de estos servicios de almacenamiento:
Carga directa de los datos desde el almacenamiento mediante su ruta de acceso del sistema de archivos distribuido de Hadoop (HDFS)
Lectura de los datos de un conjunto de datos de Azure Machine Learning existente
Para acceder a estos servicios de almacenamiento, necesita permisos de lector de datos de Storage Blob. Para escribir datos de nuevo en estos servicios de almacenamiento, necesitará permisos de colaborador de datos de Storage Blob. Obtenga más información sobre los permisos y roles de almacenamiento.
Carga de datos con la ruta de acceso del sistema de archivos distribuido de Hadoop (HDFS)
Para cargar y leer datos de almacenamiento de la ruta de acceso de HDFS correspondiente, necesita las credenciales de autenticación de acceso a datos. Estas credenciales varían según el tipo de almacenamiento. Este ejemplo de código muestra cómo se leen datos de Azure Blob Storage en una trama de datos de Spark con la clave de acceso o el token de firma de acceso compartido (SAS):
%%synapse
# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")
# read from blob
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")
Este ejemplo de código muestra cómo se leen datos de Azure Data Lake Storage Generation 1 (ADLS Gen1) con las credenciales de la entidad de servicio:
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")
Este ejemplo de código muestra cómo se leen datos de Azure Data Lake Storage Generation 2 (ADLS Gen2) con las credenciales de la entidad de servicio:
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")
Lectura de datos de conjuntos de datos registrados
También puede colocar un conjunto de datos registrado existente en el área de trabajo y realizar la preparación de datos convirtiéndolo en una trama de datos de Spark. Este ejemplo autentifica el área de trabajo, obtiene un objeto TabularDataset registrado, blob_dset
, que hace referencia a los archivos del almacenamiento de blobs y convierte TabularDataset en una trama de datos de Spark. Al convertir los conjuntos de datos en tramas de datos de Spark, puede usar las bibliotecas de preparación y exploración de datos pyspark
.
%%synapse
from azureml.core import Workspace, Dataset
subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"
ws = Workspace(workspace_name = workspace_name,
subscription_id = subscription_id,
resource_group = resource_group)
dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()
Realización de tareas de limpieza y transformación de datos
Después de recuperar y explorar los datos, puede llevar a cabo las tareas de limpieza y transformación de datos. Este ejemplo de código amplía el ejemplo de HDFS de la sección anterior. En función de la columna Sobreviviente, filtra los datos de la trama de datos de Spark df
y agrupa los grupos por Año:
%%synapse
from pyspark.sql.functions import col, desc
df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)
df.show()
Almacenamiento de datos y detención de la sesión de Spark
Una vez completada la exploración y la preparación de los datos, almacene los datos preparados para poder usarlos más adelante en su cuenta de almacenamiento de Azure. En este ejemplo de código, los datos preparados se escriben en Azure Blob Storage sobrescribiendo el archivo Titanic.csv
original del directorio training_data
. Para volver a escribir en el almacenamiento, necesita permisos de colaborador de datos de Blob Storage. Para más información, visite Asignación de un rol de Azure para el acceso a datos de blob.
%% synapse
df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")
Después de completar la preparación de los datos y guardar los datos preparados en el almacenamiento, finalice el uso del grupo de Apache Spark con este comando:
%synapse stop
Crear un conjunto de datos para representar los datos preparados
Cuando esté listo para usar los datos preparados para el entrenamiento de modelos, conéctese al almacenamiento con un almacén de datos de Azure Machine Learning y especifique el archivo o los archivos que desea usar con un conjunto de datos de Azure Machine Learning.
Este ejemplo de código
- En él se supone que ya ha creado un almacén de datos que se conecta al servicio de almacenamiento en el que guardó los datos preparados
- Recupera el almacén de datos existente,
mydatastore
, del área de trabajo,ws
, con el método get(). - Crea un objeto FileDataset,
train_ds
, que hace referencia a los archivos de datos preparados ubicados en el directoriomydatastore
training_data
- Crea una variable
input1
. Después, esta variable se puede usar para que los archivos de datos del conjunto de datostrain_ds
estén disponibles para un destino de proceso correspondiente a las tareas de entrenamiento.
from azureml.core import Datastore, Dataset
datastore = Datastore.get(ws, datastore_name='mydatastore')
datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()
Uso de ScriptRunConfig
para enviar una ejecución de experimento a un grupo de Synapse Spark
Si está listo para automatizar y enviar a producción las tareas de limpieza y transformación de datos, puede enviar una ejecución de experimento a un grupo de Spark de Synapse asociado con el objeto ScriptRunConfig. De forma similar, si tiene una canalización de Azure Machine Learning, puede usar SynapseSparkStep para especificar el grupo de Spark de Synapse como destino de proceso para el paso de preparación de datos de la canalización. La disponibilidad de los datos para el grupo de Spark de Synapse depende del tipo del conjunto de datos.
- En el caso de FileDataset, puede usar el método
as_hdfs()
. Cuando se envía la ejecución, el conjunto de datos está disponible para el grupo de Spark de Synapse como un sistema de archivos distribuido de Hadoop (HFDS) - Para TabularDataset, puede usar el método
as_named_input()
El siguiente código de ejemplo
- Crea la variable
input2
a partir del mismo objeto FileDatasettrain_ds
que se creó en el ejemplo de código anterior - Cree
output
variable con la claseHDFSOutputDatasetConfiguration
. Una vez finalizada la ejecución, esta clase nos permite guardar la salida de la ejecución como el conjunto de datostest
en el almacén de datosmydatastore
. En el área de trabajo Azure Machine Learning, el conjunto de datostest
está registrado con el nombreregistered_dataset
- Configura los valores que debe usar la ejecución para llevarse a cabo en el grupo de Spark de Synapse
- Define los parámetros ScriptRunConfig para hacer lo siguiente
- Usar el script
dataprep.py
para la ejecución - Especificar los datos debe usar como entrada y cómo hacer que estén disponibles para el grupo de Spark de Synapse
- Especificar dónde se almacenarán los datos de salida
output
- Usar el script
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig
from azureml.core import Experiment
input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")
run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name
run_config.spark.configuration["spark.driver.memory"] = "1g"
run_config.spark.configuration["spark.driver.cores"] = 2
run_config.spark.configuration["spark.executor.memory"] = "1g"
run_config.spark.configuration["spark.executor.cores"] = 1
run_config.spark.configuration["spark.executor.instances"] = 1
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")
run_config.environment.python.conda_dependencies = conda_dep
script_run_config = ScriptRunConfig(source_directory = './code',
script= 'dataprep.py',
arguments = ["--file_input", input2,
"--output_dir", output],
run_config = run_config)
Para obtener más información sobre run_config.spark.configuration
y la configuración general de Spark, visite Clase SparkConfiguration y la documentación de configuración de Apache Spark.
Una vez haya configurado el objeto ScriptRunConfig
, puede enviar la ejecución.
from azureml.core import Experiment
exp = Experiment(workspace=ws, name="synapse-spark")
run = exp.submit(config=script_run_config)
run
Para obtener más información, incluida la información sobre el script de dataprep.py
usado en este ejemplo, consulte el cuaderno de ejemplo.
Después de que haya preparado los datos, puede usarlos como entrada para los trabajos de entrenamiento. En el ejemplo de código anterior, registered_dataset
es lo que se especificaría como datos de entrada para los trabajos de entrenamiento.
Cuadernos de ejemplo
Revise estos cuadernos de ejemplo para ver más conceptos y demostraciones de las funcionalidades de integración de Azure Synapse Analytics y Azure Machine Learning:
- Ejecute una sesión interactiva de Spark desde un cuaderno en el área de trabajo de Azure Machine Learning.
- Envíe una ejecución de experimento de Azure Machine Learning con un grupo de Spark de Synapse como destino de proceso.