Esercizio: Integrare un notebook nelle pipeline di Azure Synapse

Completato

In questa unità si crea un notebook di Spark per Azure Synapse per analizzare e trasformare i dati caricati da un flusso di dati per mapping e archiviare i dati in un data lake. Si crea una cella di parametri che accetta un parametro di tipo stringa che definisce il nome della cartella per i dati che il notebook scrive nel data lake.

Si aggiunge quindi il notebook a una pipeline di Synapse e si passa l'ID univoco dell'esecuzione della pipeline al parametro del notebook, in modo che in un secondo momento sarà possibile correlare l'esecuzione della pipeline ai dati salvati dall'attività del notebook.

Si usa infine l'hub di monitoraggio in Synapse Studio per monitorare l'esecuzione della pipeline, ottenere l'ID esecuzione e quindi individuare i file corrispondenti archiviati nel data lake.

Informazioni su Apache Spark e i notebook

Apache Spark è un framework di elaborazione parallela che supporta l'elaborazione in memoria per migliorare le prestazioni di applicazioni analitiche di Big Data. Apache Spark in Azure Synapse Analytics è una delle implementazioni Microsoft di Apache Spark nel cloud.

Un notebook di Apache Spark in Synapse Studio è un'interfaccia Web che consente di creare file che contengono codice in tempo reale, visualizzazioni e testo descrittivo. I notebook possono essere usati per convalidare idee ed eseguire esperimenti rapidi per ottenere informazioni cognitive dettagliate dai dati. I notebook sono anche ampiamente usati per la preparazione e la visualizzazione dei dati, l'apprendimento automatico e altri scenari di Big Data.

Creare un notebook di Spark per Synapse

Si supponga di aver creato un flusso di dati per mapping in Synapse Analytics per elaborare, creare un join e importare i dati dei profili utente. Si vogliono ora trovare i primi cinque prodotti per ogni utente, considerando quelli preferiti e di prima scelta e quelli più acquistati negli ultimi 12 mesi. Si vogliono quindi calcolare i primi cinque prodotti in generale.

In questo esercizio si crea un notebook di Spark per Synapse per eseguire questi calcoli.

  1. Aprire Synapse Analytics Studio (https://web.azuresynapse.net/) e passare all'hub Dati.

    La voce di menu Dati è evidenziata.

  2. Selezionare la scheda Collegato (1) ed espandere l'account di archiviazione del data lake primario (2) sotto Azure Data Lake Storage Gen2. Selezionare il contenitore wwi-02 (3) e aprire la cartella top-products (4). Fare clic con il pulsante destro del mouse su un file Parquet (5), scegliere la voce di menu Nuovo notebook (6) e quindi selezionare Carica nel dataframe (7). Se la cartella non è visualizzata, selezionare Refresh.

    Il file Parquet e l'opzione Nuovo notebook sono evidenziati.

  3. Verificare che il notebook sia collegato al pool di Spark.

    La voce di menu Collega a pool di Spark è evidenziata.

  4. Sostituire il nome del file Parquet con *.parquet (1) per selezionare tutti i file Parquet nella cartella top-products. Ad esempio, il percorso dovrebbe essere simile a: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    Il nome file è evidenziato.

  5. Selezionare Esegui tutti sulla barra degli strumenti del notebook per eseguire il notebook.

    Vengono visualizzati i risultati della cella.

    Nota

    La prima volta che si esegue un notebook in un pool di Spark, Synapse crea una nuova sessione. Questa operazione può richiedere all'incirca 3-5 minuti.

    Nota

    Per eseguire solo la cella, passare il puntatore sulla cella e selezionare l'icona Esegui cella a sinistra della cella oppure selezionare la cella e quindi premere CTRL+INVIO.

  6. Creare una nuova cella più sotto selezionando il pulsante + e selezionando l'elemento Cella di codice. Il pulsante + si trova sotto la cella del notebook a sinistra. In alternativa, è anche possibile espandere il menu + Cella sulla barra degli strumenti del notebook e selezionare l'elemento Cella di codice.

    L'opzione di menu Aggiungi codice è evidenziata.

  7. Eseguire il comando seguente nella nuova cella per popolare un nuovo dataframe denominato topPurchases, creare una nuova vista temporanea denominata top_purchases e visualizzare le prime 100 righe:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    L'output dovrebbe essere simile al seguente:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Eseguire il comando seguente in una nuova cella per creare una nuova vista temporanea usando SQL:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Nota

    Non è disponibile alcun output per questa query.

    La query usa la vista temporanea top_purchases come origine e applica un metodo row_number() over per applicare un numero di riga per i record per ogni utente, dove ItemsPurchasedLast12Months è il maggiore. La clausola where filtra i risultati in modo da recuperare un massimo di cinque prodotti dove sia IsTopProduct che IsPreferredProduct sono impostati su true. In questo modo si ottengono i primi cinque prodotti più acquistati per ogni utente, dove tali prodotti sono identificati anche come prodotti preferiti, in base al profilo utente archiviato in Azure Cosmos DB.

  9. Eseguire il comando seguente in una nuova cella per creare e visualizzare un nuovo dataframe che archivia i risultati della vista temporanea top_5_products creata nella cella precedente:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Verrà visualizzato un output simile al seguente, con i primi cinque prodotti preferiti per ogni utente:

    Vengono visualizzati i primi cinque prodotti preferiti per ogni utente.

  10. Calcolare i primi cinque prodotti in generale, in base a quelli preferiti dai clienti e più acquistati. A tale scopo, eseguire il comando seguente in una nuova cella:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    In questa cella sono stati raggruppati i primi cinque prodotti preferiti per ID prodotto, sono stati sommati gli articoli totali acquistati negli ultimi 12 mesi, è stato ordinato tale valore in ordine decrescente e sono stati restituiti i primi cinque risultati. L'output dovrebbe essere simile al seguente:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Creare una cella di parametri

Le pipeline di Azure Synapse cercano la cella di parametri e considerano il contenuto di questa cella come impostazioni predefinite per i parametri passati in fase di esecuzione. Il motore di esecuzione aggiungerà una nuova cella sotto la cella di parametri con i parametri di input per sovrascrivere i valori predefiniti. Quando non viene designata una cella di parametri, la cella inserita viene posizionata nella parte superiore del notebook.

  1. Questo notebook verrà eseguito da una pipeline. Si vuole passare un parametro che imposta un valore di variabile runId che verrà usato per assegnare un nome al file Parquet. Eseguire il comando seguente in una nuova cella:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    Viene usata la libreria uuid fornita con Spark per generare un GUID casuale. Si vuole eseguire l'override della variabile runId con un parametro passato dalla pipeline. A questo scopo, è necessario attivarla come cella di parametri.

  2. Selezionare i puntini di sospensione delle azioni (...) nell'angolo in alto a destra della cella (1) e quindi selezionare Attiva/Disattiva la cella di parametri (2).

    La voce di menu è evidenziata.

    Dopo aver attivato questa opzione, nella cella verrà visualizzato il tag Parametri.

    La cella è configurata in modo da accettare i parametri.

  3. Incollare il codice seguente in una nuova cella per usare la variabile runId come nome del file Parquet nel percorso /top5-products/ nell'account del data lake primario. Nel percorso sostituire YOUR_DATALAKE_NAME con il nome del proprio account del data lake primario. Per trovarlo, scorrere verso l'alto fino alla cella 1 nella parte superiore della pagina (1). Copiare l'account di archiviazione del data lake dal percorso (2). Incollare questo valore sostituendo YOUR_DATALAKE_NAME nel percorso (3) all'interno della nuova cella, quindi eseguire il comando nella cella.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    Il percorso viene aggiornato con il nome dell'account del data lake primario.

  4. Verificare che il file sia stato scritto nel data lake. Passare all'hub Dati e selezionare la scheda Collegato (1). Espandere l'account di archiviazione del data lake primario e quindi selezionare il contenitore wwi-02 (2). Passare alla cartella top5-products (3). Verrà visualizzata una cartella per il file Parquet nella directory con un GUID come nome file (4).

    Il file parquet è evidenziato.

    Il metodo di scrittura di Parquet sul dataframe della cella del notebook ha creato questa directory perché in precedenza non esisteva.

Aggiungere il notebook a una pipeline di Synapse

Tornando al flusso di dati per mapping descritto all'inizio dell'esercizio, si supponga di voler eseguire questo notebook dopo le esecuzioni del flusso di dati durante il processo di orchestrazione. A questo scopo, si aggiunge il notebook a una pipeline come nuova attività del notebook.

  1. Tornare al notebook. Selezionare Proprietà (1) nell'angolo in alto a destra del notebook e quindi immettere Calculate Top 5 Products in Nome (2).

    È visualizzato il pannello Proprietà.

  2. Selezionare Aggiungi alla pipeline (1) nell'angolo in alto a destra del notebook e quindi selezionare Pipeline esistente (2).

    Il pulsante Aggiungi alla pipeline è evidenziato.

  3. Selezionare la pipeline Write User Profile Data to ASA (Scrivi i dati del profilo utente in ASA) (1) e quindi selezionare Aggiungi *2).

    La pipeline è selezionata.

  4. Synapse Studio aggiunge l'attività del notebook alla pipeline. Posizionare l'attività Notebook a destra dell'attività Flusso di dati. Selezionare l'attività Flusso di dati e trascinare fino all'attività Notebook una casella verde per la connessione della pipeline di attività con stato Riuscita.

    La freccia verde è evidenziata.

    La freccia dell'attività riuscita indica alla pipeline di eseguire l'attività Notebook dopo che l'attività Flusso di dati è stata eseguita correttamente.

  5. Selezionare l'attività Notebook (1), selezionare la scheda Impostazioni (2), espandere Parametri di base (3) e quindi selezionare + Nuovo (4). Immettere runId nel campo Nome (5). In Tipo (6) selezionare Stringa. In Valore selezionare Aggiungi contenuto dinamico (7).

    Vengono visualizzate le impostazioni.

  6. Selezionare ID esecuzione della pipeline in Variabili di sistema (1). @pipeline().RunId viene aggiunto alla casella del contenuto dinamico (2). Selezionare Fine (3) per chiudere la finestra di dialogo.

    Viene visualizzato il modulo del contenuto dinamico.

    Il valore ID esecuzione della pipeline è un GUID univoco assegnato a ogni esecuzione della pipeline. Questo valore verrà usato come nome del file Parquet passando questo valore come parametro del notebook runId. È quindi possibile esaminare la cronologia di esecuzione della pipeline e trovare il file Parquet specifico creato per ogni esecuzione della pipeline.

  7. Selezionare Pubblica tutti, quindi Pubblica per salvare le modifiche.

    Pubblica tutti è evidenziato.

  8. Al termine della pubblicazione, selezionare Aggiungi trigger (1), quindi Trigger now (2) (Attiva ora) per eseguire la pipeline aggiornata.

    La voce di menu del trigger è evidenziata.

  9. Selezionare OK per eseguire il trigger.

    Il pulsante OK è evidenziato.

Monitorare l'esecuzione della pipeline

L'hub di monitoraggio consente di monitorare le attività correnti e cronologiche per SQL, Apache Spark e le pipeline.

  1. Passare all'hub Monitoraggio.

    La voce di menu dell'hub Monitoraggio è selezionata.

  2. Selezionare Esecuzioni della pipeline (1) e attendere il completamento dell'esecuzione della pipeline (2). Può essere necessario aggiornare (3) la vista.

    L'esecuzione della pipeline è stata completata.

  3. Selezionare il nome della pipeline per visualizzare le esecuzioni attività della pipeline.

    Il nome della pipeline è selezionato.

  4. Si notino sia l'attività Flusso di dati che la nuova attività Notebook (1). Prendere nota del valore ID esecuzione della pipeline (2). Questo valore verrà confrontato con il nome del file Parquet generato dal notebook. Selezionare il nome del notebook Calculate Top 5 Products (Calcola i primi 5 prodotti) per visualizzarne i dettagli (3).

    I dettagli dell'esecuzione della pipeline sono visualizzati.

  5. Qui vengono visualizzati i dettagli dell'esecuzione del notebook. È possibile selezionare Riproduzione (1) per guardare una riproduzione dell'avanzamento da un processo all'altro (2). Nella parte inferiore è possibile visualizzare la diagnostica e i log con opzioni di filtro diverse (3). A destra è possibile visualizzare i dettagli dell'esecuzione, ad esempio la durata, l'ID di Livy, i dettagli del pool di Spark e così via. Selezionare il collegamento Visualizza dettagli in un processo per visualizzarne i dettagli (5).

    I dettagli dell'esecuzione sono visualizzati.

  6. L'interfaccia utente dell'applicazione Spark viene aperta in una nuova scheda dove è possibile visualizzare i dettagli della fase. Espandere la visualizzazione DAG per visualizzare i dettagli della fase.

    I dettagli della fase di Spark sono visualizzati.

  7. Tornare all'hub Dati.

    Hub dati.

  8. Selezionare la scheda Collegato (1), selezionare il contenitore wwi-02 (2) nell'account di archiviazione del data lake primario, passare alla cartella top5-products (3) e verificare che esista una cartella per il file Parquet il cui nome corrisponde all'ID esecuzione della pipeline.

    Il file è evidenziato.

    Come si può notare, è presente un file il cui nome corrisponde all'ID esecuzione della pipeline annotato in precedenza:

    L'ID esecuzione della pipeline è evidenziato.

    Questi valori corrispondono perché l'ID esecuzione della pipeline è stato passato al parametro runId nell'attività Notebook.