Esercizio: Integrare un notebook nelle pipeline di Azure Synapse
In questa unità si crea un notebook di Spark per Azure Synapse per analizzare e trasformare i dati caricati da un flusso di dati per mapping e archiviare i dati in un data lake. Si crea una cella di parametri che accetta un parametro di tipo stringa che definisce il nome della cartella per i dati che il notebook scrive nel data lake.
Si aggiunge quindi il notebook a una pipeline di Synapse e si passa l'ID univoco dell'esecuzione della pipeline al parametro del notebook, in modo che in un secondo momento sarà possibile correlare l'esecuzione della pipeline ai dati salvati dall'attività del notebook.
Si usa infine l'hub di monitoraggio in Synapse Studio per monitorare l'esecuzione della pipeline, ottenere l'ID esecuzione e quindi individuare i file corrispondenti archiviati nel data lake.
Informazioni su Apache Spark e i notebook
Apache Spark è un framework di elaborazione parallela che supporta l'elaborazione in memoria per migliorare le prestazioni di applicazioni analitiche di Big Data. Apache Spark in Azure Synapse Analytics è una delle implementazioni Microsoft di Apache Spark nel cloud.
Un notebook di Apache Spark in Synapse Studio è un'interfaccia Web che consente di creare file che contengono codice in tempo reale, visualizzazioni e testo descrittivo. I notebook possono essere usati per convalidare idee ed eseguire esperimenti rapidi per ottenere informazioni cognitive dettagliate dai dati. I notebook sono anche ampiamente usati per la preparazione e la visualizzazione dei dati, l'apprendimento automatico e altri scenari di Big Data.
Creare un notebook di Spark per Synapse
Si supponga di aver creato un flusso di dati per mapping in Synapse Analytics per elaborare, creare un join e importare i dati dei profili utente. Si vogliono ora trovare i primi cinque prodotti per ogni utente, considerando quelli preferiti e di prima scelta e quelli più acquistati negli ultimi 12 mesi. Si vogliono quindi calcolare i primi cinque prodotti in generale.
In questo esercizio si crea un notebook di Spark per Synapse per eseguire questi calcoli.
Aprire Synapse Analytics Studio (https://web.azuresynapse.net/) e passare all'hub Dati.
Selezionare la scheda Collegato (1) ed espandere l'account di archiviazione del data lake primario (2) sotto Azure Data Lake Storage Gen2. Selezionare il contenitore wwi-02 (3) e aprire la cartella top-products (4). Fare clic con il pulsante destro del mouse su un file Parquet (5), scegliere la voce di menu Nuovo notebook (6) e quindi selezionare Carica nel dataframe (7). Se la cartella non è visualizzata, selezionare
Refresh
.Verificare che il notebook sia collegato al pool di Spark.
Sostituire il nome del file Parquet con
*.parquet
(1) per selezionare tutti i file Parquet nella cartellatop-products
. Ad esempio, il percorso dovrebbe essere simile a:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Selezionare Esegui tutti sulla barra degli strumenti del notebook per eseguire il notebook.
Nota
La prima volta che si esegue un notebook in un pool di Spark, Synapse crea una nuova sessione. Questa operazione può richiedere all'incirca 3-5 minuti.
Nota
Per eseguire solo la cella, passare il puntatore sulla cella e selezionare l'icona Esegui cella a sinistra della cella oppure selezionare la cella e quindi premere CTRL+INVIO.
Creare una nuova cella più sotto selezionando il pulsante + e selezionando l'elemento Cella di codice. Il pulsante + si trova sotto la cella del notebook a sinistra. In alternativa, è anche possibile espandere il menu + Cella sulla barra degli strumenti del notebook e selezionare l'elemento Cella di codice.
Eseguire il comando seguente nella nuova cella per popolare un nuovo dataframe denominato
topPurchases
, creare una nuova vista temporanea denominatatop_purchases
e visualizzare le prime 100 righe:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
L'output dovrebbe essere simile al seguente:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Eseguire il comando seguente in una nuova cella per creare una nuova vista temporanea usando SQL:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Nota
Non è disponibile alcun output per questa query.
La query usa la vista temporanea
top_purchases
come origine e applica un metodorow_number() over
per applicare un numero di riga per i record per ogni utente, doveItemsPurchasedLast12Months
è il maggiore. La clausolawhere
filtra i risultati in modo da recuperare un massimo di cinque prodotti dove siaIsTopProduct
cheIsPreferredProduct
sono impostati su true. In questo modo si ottengono i primi cinque prodotti più acquistati per ogni utente, dove tali prodotti sono identificati anche come prodotti preferiti, in base al profilo utente archiviato in Azure Cosmos DB.Eseguire il comando seguente in una nuova cella per creare e visualizzare un nuovo dataframe che archivia i risultati della vista temporanea
top_5_products
creata nella cella precedente:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
Verrà visualizzato un output simile al seguente, con i primi cinque prodotti preferiti per ogni utente:
Calcolare i primi cinque prodotti in generale, in base a quelli preferiti dai clienti e più acquistati. A tale scopo, eseguire il comando seguente in una nuova cella:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
In questa cella sono stati raggruppati i primi cinque prodotti preferiti per ID prodotto, sono stati sommati gli articoli totali acquistati negli ultimi 12 mesi, è stato ordinato tale valore in ordine decrescente e sono stati restituiti i primi cinque risultati. L'output dovrebbe essere simile al seguente:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Creare una cella di parametri
Le pipeline di Azure Synapse cercano la cella di parametri e considerano il contenuto di questa cella come impostazioni predefinite per i parametri passati in fase di esecuzione. Il motore di esecuzione aggiungerà una nuova cella sotto la cella di parametri con i parametri di input per sovrascrivere i valori predefiniti. Quando non viene designata una cella di parametri, la cella inserita viene posizionata nella parte superiore del notebook.
Questo notebook verrà eseguito da una pipeline. Si vuole passare un parametro che imposta un valore di variabile
runId
che verrà usato per assegnare un nome al file Parquet. Eseguire il comando seguente in una nuova cella:import uuid # Generate random GUID runId = uuid.uuid4()
Viene usata la libreria
uuid
fornita con Spark per generare un GUID casuale. Si vuole eseguire l'override della variabilerunId
con un parametro passato dalla pipeline. A questo scopo, è necessario attivarla come cella di parametri.Selezionare i puntini di sospensione delle azioni (...) nell'angolo in alto a destra della cella (1) e quindi selezionare Attiva/Disattiva la cella di parametri (2).
Dopo aver attivato questa opzione, nella cella verrà visualizzato il tag Parametri.
Incollare il codice seguente in una nuova cella per usare la variabile
runId
come nome del file Parquet nel percorso/top5-products/
nell'account del data lake primario. Nel percorso sostituireYOUR_DATALAKE_NAME
con il nome del proprio account del data lake primario. Per trovarlo, scorrere verso l'alto fino alla cella 1 nella parte superiore della pagina (1). Copiare l'account di archiviazione del data lake dal percorso (2). Incollare questo valore sostituendoYOUR_DATALAKE_NAME
nel percorso (3) all'interno della nuova cella, quindi eseguire il comando nella cella.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Verificare che il file sia stato scritto nel data lake. Passare all'hub Dati e selezionare la scheda Collegato (1). Espandere l'account di archiviazione del data lake primario e quindi selezionare il contenitore wwi-02 (2). Passare alla cartella top5-products (3). Verrà visualizzata una cartella per il file Parquet nella directory con un GUID come nome file (4).
Il metodo di scrittura di Parquet sul dataframe della cella del notebook ha creato questa directory perché in precedenza non esisteva.
Aggiungere il notebook a una pipeline di Synapse
Tornando al flusso di dati per mapping descritto all'inizio dell'esercizio, si supponga di voler eseguire questo notebook dopo le esecuzioni del flusso di dati durante il processo di orchestrazione. A questo scopo, si aggiunge il notebook a una pipeline come nuova attività del notebook.
Tornare al notebook. Selezionare Proprietà (1) nell'angolo in alto a destra del notebook e quindi immettere
Calculate Top 5 Products
in Nome (2).Selezionare Aggiungi alla pipeline (1) nell'angolo in alto a destra del notebook e quindi selezionare Pipeline esistente (2).
Selezionare la pipeline Write User Profile Data to ASA (Scrivi i dati del profilo utente in ASA) (1) e quindi selezionare Aggiungi *2).
Synapse Studio aggiunge l'attività del notebook alla pipeline. Posizionare l'attività Notebook a destra dell'attività Flusso di dati. Selezionare l'attività Flusso di dati e trascinare fino all'attività Notebook una casella verde per la connessione della pipeline di attività con stato Riuscita.
La freccia dell'attività riuscita indica alla pipeline di eseguire l'attività Notebook dopo che l'attività Flusso di dati è stata eseguita correttamente.
Selezionare l'attività Notebook (1), selezionare la scheda Impostazioni (2), espandere Parametri di base (3) e quindi selezionare + Nuovo (4). Immettere
runId
nel campo Nome (5). In Tipo (6) selezionare Stringa. In Valore selezionare Aggiungi contenuto dinamico (7).Selezionare ID esecuzione della pipeline in Variabili di sistema (1).
@pipeline().RunId
viene aggiunto alla casella del contenuto dinamico (2). Selezionare Fine (3) per chiudere la finestra di dialogo.Il valore ID esecuzione della pipeline è un GUID univoco assegnato a ogni esecuzione della pipeline. Questo valore verrà usato come nome del file Parquet passando questo valore come parametro del notebook
runId
. È quindi possibile esaminare la cronologia di esecuzione della pipeline e trovare il file Parquet specifico creato per ogni esecuzione della pipeline.Selezionare Pubblica tutti, quindi Pubblica per salvare le modifiche.
Al termine della pubblicazione, selezionare Aggiungi trigger (1), quindi Trigger now (2) (Attiva ora) per eseguire la pipeline aggiornata.
Selezionare OK per eseguire il trigger.
Monitorare l'esecuzione della pipeline
L'hub di monitoraggio consente di monitorare le attività correnti e cronologiche per SQL, Apache Spark e le pipeline.
Passare all'hub Monitoraggio.
Selezionare Esecuzioni della pipeline (1) e attendere il completamento dell'esecuzione della pipeline (2). Può essere necessario aggiornare (3) la vista.
Selezionare il nome della pipeline per visualizzare le esecuzioni attività della pipeline.
Si notino sia l'attività Flusso di dati che la nuova attività Notebook (1). Prendere nota del valore ID esecuzione della pipeline (2). Questo valore verrà confrontato con il nome del file Parquet generato dal notebook. Selezionare il nome del notebook Calculate Top 5 Products (Calcola i primi 5 prodotti) per visualizzarne i dettagli (3).
Qui vengono visualizzati i dettagli dell'esecuzione del notebook. È possibile selezionare Riproduzione (1) per guardare una riproduzione dell'avanzamento da un processo all'altro (2). Nella parte inferiore è possibile visualizzare la diagnostica e i log con opzioni di filtro diverse (3). A destra è possibile visualizzare i dettagli dell'esecuzione, ad esempio la durata, l'ID di Livy, i dettagli del pool di Spark e così via. Selezionare il collegamento Visualizza dettagli in un processo per visualizzarne i dettagli (5).
L'interfaccia utente dell'applicazione Spark viene aperta in una nuova scheda dove è possibile visualizzare i dettagli della fase. Espandere la visualizzazione DAG per visualizzare i dettagli della fase.
Tornare all'hub Dati.
Selezionare la scheda Collegato (1), selezionare il contenitore wwi-02 (2) nell'account di archiviazione del data lake primario, passare alla cartella top5-products (3) e verificare che esista una cartella per il file Parquet il cui nome corrisponde all'ID esecuzione della pipeline.
Come si può notare, è presente un file il cui nome corrisponde all'ID esecuzione della pipeline annotato in precedenza:
Questi valori corrispondono perché l'ID esecuzione della pipeline è stato passato al parametro
runId
nell'attività Notebook.