Ejercicio: Integración de un cuaderno en canalizaciones de Azure Synapse

Completado

En esta unidad, creará un cuaderno de Spark de Azure Synapse para analizar y transformar los datos cargados por un flujo de datos de asignación y los almacenará en un lago de datos. Creará una celda de parámetro que acepte un parámetro de cadena que defina el nombre de la carpeta para los datos que el cuaderno escribe en el lago de datos.

Después, agregará este cuaderno a una canalización de Synapse y pasará el identificador de ejecución de canalización único al parámetro de cuaderno para que posteriormente pueda poner en correlación la ejecución de la canalización con los datos guardados por la actividad del cuaderno.

Por último, usará el centro de supervisión de Synapse Studio para supervisar la ejecución de la canalización, obtener el identificador de ejecución y, después, buscar los archivos correspondientes almacenados en el lago de datos.

Acerca de Apache Spark y los cuadernos

Apache Spark es una plataforma de procesamiento paralelo que admite el procesamiento en memoria para mejorar el rendimiento de aplicaciones de análisis de macrodatos. Apache Spark en Azure Synapse Analytics es una de las implementaciones de Microsoft de Apache Spark en la nube.

Un cuaderno de Apache Spark en Synapse Studio es una interfaz web para crear archivos que contienen código dinámico, visualizaciones y texto narrativo. Los cuadernos son un buen lugar para validar ideas y aplicar experimentos rápidos para sacar conclusiones a partir de los datos. Los cuadernos también se usan ampliamente en la preparación de datos, la visualización de datos, el aprendizaje automático y otros escenarios de macrodatos.

Creación de un cuaderno de Synapse Spark

Imagine que ha creado un flujo de datos de asignación en Synapse Analytics para procesar, combinar e importar datos de perfil de usuario. Ahora, quiere encontrar los cinco productos principales para cada usuario, en función de cuáles son los preferidos y los principales, y las compras principales en los últimos 12 meses. Después, quiere calcular los cinco productos principales en general.

En este paso, creará un cuaderno Spark de Synapse para realizar estos cálculos.

  1. Abra Synapse Analytics Studio (https://web.azuresynapse.net/) y vaya al centro Data (Datos).

    El elemento del menú Datos esta resaltado.

  2. Seleccione la pestaña Linked (Vinculado) (1) y expanda la cuenta de almacenamiento de lago de datos principal (2) debajo de Azure Data Lake Storage Gen2. Seleccione el contenedor wwi-02(3) y abra la carpeta top-products(4). Haga clic con el botón derecho en cualquier archivo de Parquet (5), seleccione el elemento de menú New notebook (Nuevo cuaderno) (6) y después Load to DataFrame (7) (Cargar en DataFrame). Si no ve la carpeta, seleccione Refresh.

    El archivo Parquet y la opción de nuevo cuaderno están resaltadas.

  3. Asegúrese de que el cuaderno está adjunto al grupo de Spark.

    El elemento de menú Asociar a grupo de Spark está resaltado.

  4. Reemplace el nombre del archivo de Parquet por *.parquet (1) para seleccionar todos los archivos de Parquet de la carpeta top-products. Por ejemplo, la ruta de acceso debe ser similar a: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    El nombre de archivo está resaltado.

  5. Seleccione Ejecutar todo en la barra de herramientas del cuaderno para ejecutarlo.

    Se muestran los resultados de la celda.

    Nota:

    La primera vez que se ejecuta un cuaderno en un grupo de Spark, Synapse crea una nueva sesión. Este proceso puede tardar aproximadamente entre 3 y 5 minutos.

    Nota:

    Para ejecutar solo la celda, mantenga el mouse sobre ella y seleccione el icono Run cell (Ejecutar celda) situado a la izquierda de la celda, o bien seleccione la celda y presione Ctrl+Entrar.

  6. Cree una celda debajo; para ello, seleccione el botón + y después el elemento Code cell (Celda de código). El botón + se encuentra debajo de la celda del cuaderno de la izquierda. Como alternativa, también puede expandir el menú + Cell (+ Celda) en la barra de herramientas del cuaderno y seleccionar el elemento Code cell.

    La opción de menú Agregar código está resaltada.

  7. Ejecute el comando siguiente en la nueva celda para rellenar una nueva trama de datos denominada topPurchases, crear una vista temporal denominada top_purchases y mostrar las primeras 100 filas:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    La salida debe tener una apariencia similar a la siguiente:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Ejecute el comando siguiente en una nueva celda para crear una vista temporal mediante SQL:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Nota:

    No hay ninguna salida para esta consulta.

    La consulta usa la vista temporal top_purchases como origen y aplica un método row_number() over para aplicar un número de fila para los registros de cada usuario donde ItemsPurchasedLast12Months sea mayor. La cláusula where filtra los resultados para que solo se recuperen hasta cinco productos en los que tanto IsTopProduct como IsPreferredProduct estén establecidos en true. Esto proporciona los cinco productos más comprados para cada usuario en los que esos productos también se han identificado como sus favoritos, según su perfil de usuario almacenado en Azure Cosmos DB.

  9. Ejecute el comando siguiente en una nueva celda para crear y mostrar un nuevo elemento DataFrame que almacena los resultados de la vista temporal top_5_products que ha creado en la celda anterior:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Debería ver una salida similar a la siguiente, en la que se muestran los cinco productos principales preferidos por cada usuario:

    Los cinco productos preferidos se muestran por usuario.

  10. Calcule los cinco productos principales generales, en función de los que prefieran los clientes y que más hayan comprado. Para ello, ejecute el comando siguiente en una nueva celda:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    En esta celda, se agrupan los cinco productos principales preferidos por identificador de producto, se suma el total de artículos comprados en los últimos 12 meses, se ordena ese valor en orden descendente y se devuelven los cinco primeros resultados. La salida debe ser similar a la siguiente:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Creación de una celda de parámetros

Las canalizaciones de Azure Synapse buscan la celda de parámetros y la tratan como el valor predeterminado para los parámetros que se pasan en tiempo de ejecución. El motor de ejecución agregará una nueva celda debajo de la celda de parámetros con los parámetros de entrada que van a sobrescribir los valores predeterminados. Cuando no se designa ninguna celda de parámetros, la celda insertada se insertará en la parte superior del cuaderno.

  1. Ahora se ejecutará este cuaderno desde una canalización. El objetivo es pasar un parámetro que establece un valor variable runId que se usará para dar nombre al archivo de Parquet. Ejecute el comando siguiente en una nueva celda:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    Se usa la biblioteca uuid incluida en Spark para generar un GUID aleatorio. La intención es reemplazar la variable runId con un parámetro pasado por la canalización. Para ello, es necesario alternar esto como una celda de parámetro.

  2. Seleccione los puntos suspensivos de acciones (...) en la esquina superior derecha de la celda (1) y, después, seleccione Toggle parameter cell (2) (Alternar celda de parámetros).

    El elemento de menú está resaltado.

    Después de alternar esta opción, verá la etiqueta Parameters (Parámetros) en la celda.

    La celda está configurada para aceptar parámetros.

  3. Pegue el código siguiente en una nueva celda para usar la variable runId como nombre de archivo de Parquet en la ruta de acceso /top5-products/ de la cuenta del lago de datos principal. Reemplace YOUR_DATALAKE_NAME en la ruta de acceso con el nombre de la cuenta del lago de datos principal. Para encontrarla, desplácese hasta la Celda 1 en la parte superior de la página (1). Copie la cuenta de almacenamiento de lago de datos de la ruta de acceso (2). Pegue este valor como reemplazo de YOUR_DATALAKE_NAME en la ruta (3) dentro de la nueva celda y, después, ejecute la celda.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    La ruta de acceso se actualiza con el nombre de la cuenta del lago de datos principal.

  4. Compruebe que el archivo se ha escrito en el lago de datos. Vaya al centro Data (Datos) y seleccione la pestaña Linked (Vinculado) (1). Expanda la cuenta de almacenamiento de lago de datos principal y seleccione el contenedor wwi-02(2). Vaya a la carpeta top5-products(3). Debería ver una carpeta para el archivo de Parquet en el directorio con un GUID como nombre de archivo (4).

    El archivo de Parquet está resaltado.

    El método de escritura de Parquet en la trama de datos de la celda del cuaderno ha creado este directorio porque no existía antes.

Adición del cuaderno a una canalización de Synapse

Volviendo al flujo de datos de asignación que se ha descrito al principio del ejercicio, imagine que quiere ejecutar este cuaderno después de que el flujo de datos se ejecute como parte del proceso de orquestación. Para ello, agregue este cuaderno a una canalización como una nueva actividad de cuaderno.

  1. Vuelva al cuaderno. Seleccione Propiedades (1) en la esquina superior derecha del cuaderno y, después, escriba Calculate Top 5 Products en Nombre (2).

    Se muestra la hoja de propiedades.

  2. Seleccione Add to pipeline (Agregar a la canalización) (1) en la esquina superior derecha del cuaderno y, después, seleccione Existing pipeline (2) (Canalización existente).

    El botón Agregar a canalización está resaltado.

  3. Seleccione la canalización Write User Profile Data to ASA (Escribir datos de perfil de usuario en ASA) (1) y después Agregar *(2).

    La canalización está seleccionada.

  4. Synapse Studio agrega la actividad Cuaderno a la canalización. Reorganice la actividad Cuaderno para ubicarla a la derecha de la actividad Flujo de datos. Seleccione la actividad Flujo de datos y arrastre un cuadro verde de conexión de canalización de actividad Success (Correcta) a la actividad Cuaderno.

    La flecha verde está resaltada.

    La flecha de actividad Success (Correcta) indica a la canalización que ejecute la actividad Cuaderno después de que la actividad Flujo de datos se ejecute correctamente.

  5. Seleccione la actividad Cuaderno (1), después la pestaña Settings (Configuración) (2), expanda Base parameters (3) (Parámetros base) y seleccione + New (4) (+ Nuevo). Escriba runId en el campo Nombre(5). Seleccione String (Cadena) para Type (6) (Tipo). En Value (Valor), seleccione Add dynamic content (7) (Agregar contenido).

    Se muestra la configuración.

  6. Seleccione Pipeline run ID (Identificador de ejecución de canalización) en System variables (1) (Variables del sistema). Esto agrega @pipeline().RunId al cuadro de contenido dinámico (2). Seleccione Finish (3) (Finalizar) para cerrar el cuadro de diálogo.

    Se muestra el formulario de contenido dinámico.

    El valor de identificador de ejecución de canalización es un GUID único asignado a cada ejecución de canalización. Se usará este valor para el nombre del archivo de Parquet, y se pasará como parámetro del cuaderno runId. Después, se puede examinar el historial de ejecución de la canalización y buscar el archivo de Parquet específico creado para cada ejecución de canalización.

  7. Seleccione Publish all (Publicar todo) y después Publish (Publicar) para guardar los cambios.

    Publicar todo está resaltado.

  8. Una vez que se ha completado la publicación, seleccione Add trigger (1) (Agregar desencadenador) y después Trigger now (2) (Desencadenar ahora) para ejecutar la canalización actualizada.

    El elemento de menú Desencadenar está resaltado.

  9. Seleccione OK (Aceptar) para ejecutar el desencadenador.

    El botón Aceptar está resaltado.

Supervisión de la ejecución de la canalización

El centro de supervisión permite supervisar las actividades actuales e históricas de SQL, Apache Spark y Pipelines.

  1. Vaya al centro de conectividad Supervisión.

    El elemento de menú Centro de supervisión está seleccionado.

  2. Seleccione Pipeline runs (1) (Ejecuciones de canalización) y espere a que la ejecución de la canalización se complete correctamente (2). Es posible que tenga que actualizar (3) la vista.

    La canalización se ha ejecutado correctamente.

  3. Seleccione el nombre de la canalización para ver sus ejecuciones de actividad.

    El nombre de la canalización está seleccionado.

  4. Observe tanto la actividad Flujo de datos como la nueva actividad Cuaderno(1). Anote el valor de Pipeline run ID (Identificador de ejecución de canalización) (2). Se comparará con el nombre de archivo de Parquet generado por el cuaderno. Seleccione el nombre del cuaderno Calculate Top 5 Products (Calcular los cinco productos principales) para ver sus detalles (3).

    Se muestran los detalles de ejecución de la canalización.

  5. Aquí se ven los detalles de ejecución del cuaderno. Puede seleccionar Reproducir (1) para ver una reproducción del progreso mediante los trabajos (2). En la parte inferior, puede ver Diagnósticos y Registros con diferentes opciones de filtro (3). A la derecha, se pueden ver los detalles de la ejecución, como la duración, el identificador de Livy, los detalles del grupo de Spark, entre otros. Seleccione el vínculo Ver detalles de un trabajo para ver sus detalles (5).

    Se muestran los detalles de ejecución.

  6. La interfaz de usuario de la aplicación Spark se abre en una nueva pestaña donde se pueden ver los detalles de la fase. Expanda la visualización de DAG para ver los detalles de la fase.

    Se muestran los detalles de la fase de Spark.

  7. Vuelva al centro Data (Datos).

    Centro de datos.

  8. Seleccione la pestaña Linked (Vinculado) (1), después seleccione el contenedor wwi-02(2) en la cuenta de almacenamiento de lago de datos principal, vaya a la carpeta top5-products(3) y compruebe que existe una carpeta para el archivo de Parquet cuyo nombre coincide con el identificador de ejecución de canalización.

    El archivo está resaltado.

    Como puede ver, hay un archivo cuyo nombre coincide con el identificador de ejecución de canalización que se ha anotado antes:

    El identificador de ejecución de canalización está resaltado.

    Estos valores coinciden porque se ha pasado el identificador de ejecución de canalización al parámetro runId en la actividad Cuaderno.