Oefening: een notebook integreren in Azure Synapse-pijplijnen
In deze les maakt u een Azure Synapse Spark-notebook voor het analyseren en transformeren van gegevens die zijn geladen door een toewijzingsgegevensstroom en slaat u de gegevens op in een data lake. U maakt een parametercel die een tekenreeksparameter accepteert die de mapnaam definieert voor de gegevens die het notebook naar de data lake schrijft.
Vervolgens voegt u dit notebook toe aan een Synapse-pijplijn en geeft u de unieke pijplijnuitvoerings-id door aan de notebookparameter, zodat u de pijplijnuitvoering later kunt correleren met de gegevens die zijn opgeslagen door de notebookactiviteit.
Ten slotte gebruikt u de Monitor-hub in Synapse Studio om de pijplijnuitvoering te bewaken, de uitvoerings-id op te halen en vervolgens de bijbehorende bestanden te zoeken die zijn opgeslagen in de data lake.
Over Apache Spark en notebooks
Apache Spark is een framework voor parallelle verwerking dat ondersteuning biedt voor in-memory verwerking om de prestaties van toepassingen voor de analyse van big data te verbeteren. Apache Spark in Azure Synapse Analytics is een van de implementaties van Apache Spark van Microsoft in de cloud.
Een Apache Spark-notebook in Synapse Studio is een webinterface waarmee u bestanden kunt maken die livecode, visualisaties en verhaaltekst bevatten. Notebooks zijn een goede plek om ideeën te valideren en snelle experimenten te gebruiken om inzichten uit uw gegevens te verkrijgen. Notebooks worden ook veel gebruikt in gegevensvoorbereiding, gegevensvisualisatie, machine learning en andere big data-scenario's.
Een Synapse Spark-notebook maken
Stel dat u een toewijzingsgegevensstroom hebt gemaakt in Synapse Analytics voor het verwerken, samenvoegen en importeren van gebruikersprofielgegevens. Nu wilt u de vijf belangrijkste producten voor elke gebruiker vinden, op basis van welke producten zowel de voorkeur hebben als de beste keuze, en de meeste aankopen hebben in de afgelopen 12 maanden. Vervolgens wilt u de vijf belangrijkste producten in het algemeen berekenen.
In deze oefening maakt u een Synapse Spark-notebook om deze berekeningen te maken.
Open Synapse Analytics Studio (https://web.azuresynapse.net/) en ga naar de Data Hub.
Selecteer het tabblad Gekoppeld (1) en vouw het primaire Data Lake Storage-account (2) onder Azure Data Lake Storage Gen2 uit. Selecteer de wwi-02-container (3) en open de map met topproducten (4). Klik met de rechtermuisknop op een Parquet-bestand (5), selecteer het menu-item Nieuw notitieblok (6) en selecteer vervolgens Laden naar DataFrame (7). Als u de map niet ziet, selecteert u
Refresh
.Zorg ervoor dat het notebook is gekoppeld aan uw Spark-pool.
Vervang de Parquet-bestandsnaam door
*.parquet
(1) om alle Parquet-bestanden in detop-products
map te selecteren. Het pad moet bijvoorbeeld vergelijkbaar zijn met:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Selecteer Alles uitvoeren op de werkbalk van het notitieblok om het notebook uit te voeren.
Notitie
De eerste keer dat u een notebook uitvoert in een Spark-pool, maakt Synapse een nieuwe sessie. Dit kan ongeveer 3 - 5 minuten duren.
Notitie
Als u alleen de cel wilt uitvoeren, beweegt u de muisaanwijzer over de cel en selecteert u het pictogram Cel uitvoeren links van de cel of selecteert u de cel en voert u Ctrl+Enter in.
Maak een nieuwe cel eronder door de + knop te selecteren en het celitem Code te selecteren. De knop + bevindt zich onder de notebookcel aan de linkerkant. U kunt ook het menu + Cel in de werkbalk Notitieblok uitvouwen en het item Codecel selecteren.
Voer de volgende opdracht uit in de nieuwe cel om een nieuw dataframe met de naam
topPurchases
in te vullen, maak een nieuwe tijdelijke weergave met de naamtop_purchases
en geef de eerste 100 rijen weer:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
De uitvoer moet er als volgt uitzien:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Voer de volgende opdracht uit in een nieuwe cel om een nieuwe tijdelijke weergave te maken met behulp van SQL:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Notitie
Er is geen uitvoer voor deze query.
De query maakt gebruik van de
top_purchases
tijdelijke weergave als bron en past eenrow_number() over
methode toe om een rijnummer toe te passen voor de records voor elke gebruiker die hetItemsPurchasedLast12Months
grootst is. Dewhere
component filtert de resultaten zodat we maximaal vijf producten ophalen waarbij beideIsTopProduct
enIsPreferredProduct
zijn ingesteld op waar. Dit geeft ons de vijf meest gekochte producten voor elke gebruiker waar deze producten ook worden geïdentificeerd als hun favoriete producten, volgens hun gebruikersprofiel dat is opgeslagen in Azure Cosmos DB.Voer de volgende opdracht uit in een nieuwe cel om een nieuw DataFrame te maken en weer te geven waarin de resultaten worden opgeslagen van de
top_5_products
tijdelijke weergave die u in de vorige cel hebt gemaakt:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
Als het goed is, ziet u een uitvoer die vergelijkbaar is met de volgende, waarin de vijf beste producten per gebruiker worden weergegeven:
Bereken de vijf belangrijkste producten in het algemeen, op basis van de producten die zowel door klanten als het meest worden gekocht. Voer hiervoor de volgende opdracht uit in een nieuwe cel:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
In deze cel hebben we de vijf beste producten gegroepeerd op product-id, de totale artikelen opgeteld die in de afgelopen 12 maanden zijn gekocht, die waarde in aflopende volgorde gesorteerd en de top vijf resultaten geretourneerd. Uw uitvoer moet er als volgt uitzien:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Een parametercel maken
Azure Synapse-pijplijnen zoeken naar de parametercel en behandelen deze cel als standaardwaarden voor de parameters die tijdens de uitvoering zijn doorgegeven. De uitvoeringsengine voegt een nieuwe cel toe onder de parametercel met invoerparameters om de standaardwaarden te overschrijven. Wanneer een parameterscel niet is aangewezen, wordt de geïnjecteerde cel boven aan het notebook ingevoegd.
We gaan dit notebook uitvoeren vanuit een pijplijn. We willen een parameter doorgeven waarmee een
runId
variabelewaarde wordt ingesteld die wordt gebruikt om het Parquet-bestand een naam te geven. Voer de volgende opdracht uit in een nieuwe cel:import uuid # Generate random GUID runId = uuid.uuid4()
We gebruiken de
uuid
bibliotheek die bij Spark wordt geleverd om een willekeurige GUID te genereren. We willen derunId
variabele overschrijven met een parameter die door de pijplijn wordt doorgegeven. Hiervoor moeten we dit in-/uitschakelen als een parametercel.Selecteer het beletselteken (...) in de rechterbovenhoek van de cel (1) en selecteer vervolgens De parametercel in-/uitschakelen (2).
Nadat u deze optie hebt ingeschakeld, ziet u de tag Parameters in de cel.
Plak de volgende code in een nieuwe cel om de
runId
variabele te gebruiken als de Parquet-bestandsnaam in het/top5-products/
pad in het primaire Data Lake-account. VervangYOUR_DATALAKE_NAME
in het pad door de naam van uw primaire Data Lake-account. Schuif naar cel 1 boven aan de pagina (1) om dit te vinden. Kopieer het Data Lake Storage-account van het pad (2). Plak deze waarde als vervanging voorYOUR_DATALAKE_NAME
in het pad (3) in de nieuwe cel en voer vervolgens de opdracht uit in de cel.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Controleer of het bestand naar de data lake is geschreven. Ga naar de datahub en selecteer het tabblad Gekoppeld (1). Vouw het primaire Data Lake Storage-account uit en selecteer vervolgens de wwi-02-container (2). Ga naar de map top5-producten (3). Als het goed is, ziet u een map voor het Parquet-bestand in de map met een GUID als bestandsnaam (4).
De Parquet-schrijfmethode op het dataframe in de notebook-cel heeft deze map gemaakt omdat deze nog niet eerder bestond.
Notebook toevoegen aan een Synapse-pijplijn
Als u terugwijst naar de toewijzing Gegevensstroom we aan het begin van de oefening hebben beschreven, moet u dit notebook uitvoeren nadat de Gegevensstroom wordt uitgevoerd als onderdeel van het indelingsproces. Hiervoor voegt u dit notebook toe aan een pijplijn als een nieuwe notebookactiviteit.
Ga terug naar het notitieblok. Selecteer Eigenschappen (1) in de rechterbovenhoek van het notitieblok en voer
Calculate Top 5 Products
de naam (2) in.Selecteer Toevoegen aan pijplijn (1) in de rechterbovenhoek van het notebook en selecteer vervolgens Bestaande pijplijn (2).
Selecteer de gebruikersprofielgegevens schrijven naar ASA-pijplijn (1) en selecteer vervolgens *(2).
Synapse Studio voegt de notebookactiviteit toe aan de pijplijn. Rangschik de notebookactiviteit zo dat deze zich rechts van de gegevensstroomactiviteit bevindt. Selecteer de gegevensstroomactiviteit en sleep een groen vak voor de verbindingslijn voor een geslaagde activiteit naar de notebookactiviteit.
Met de pijl Geslaagde activiteit wordt de pijplijn geïnstrueerd om de Notebook-activiteit uit te voeren nadat de gegevensstroomactiviteit is uitgevoerd.
Selecteer de notebookactiviteit (1) en selecteer vervolgens het tabblad Instellingen (2), vouw basisparameters (3) uit en selecteer vervolgens + Nieuw (4). Voer
runId
in het veld Naam (5) in. Selecteer Tekenreeks voor het type (6). Selecteer Dynamische inhoud toevoegen (7) voor de waarde.Selecteer de pijplijnuitvoerings-id onder Systeemvariabelen (1). Hiermee wordt
@pipeline().RunId
het dynamische inhoudsvak (2) toegevoegd. Selecteer Voltooien (3) om het dialoogvenster te sluiten.De waarde van de pijplijnuitvoerings-id is een unieke GUID die aan elke pijplijnuitvoering is toegewezen. We gebruiken deze waarde voor de naam van het Parquet-bestand door deze waarde door te geven als de
runId
notebookparameter. Vervolgens kunnen we de geschiedenis van de pijplijnuitvoering bekijken en het specifieke Parquet-bestand vinden dat voor elke pijplijnuitvoering is gemaakt.Selecteer Alles publiceren en publiceren om uw wijzigingen op te slaan.
Nadat het publiceren is voltooid, selecteert u Trigger toevoegen (1) en activeert u nu (2) om de bijgewerkte pijplijn uit te voeren.
Selecteer OK om de trigger uit te voeren.
De pijplijnuitvoering controleren.
Met de Monitor-hub kunt u huidige en historische activiteiten voor SQL, Apache Spark en Pijplijnen bewaken.
Ga naar de Monitor-hub .
Selecteer Pijplijnuitvoeringen (1) en wacht tot de pijplijnuitvoering is voltooid (2). Mogelijk moet u de weergave vernieuwen (3 ).
Selecteer de naam van de pijplijn om de uitvoeringen van de activiteit van de pijplijn weer te geven.
Let op zowel de gegevensstroomactiviteit als de nieuwe notebookactiviteit (1). Noteer de waarde van de pijplijnuitvoerings-id (2). We vergelijken dit met de Parquet-bestandsnaam die door het notebook is gegenereerd. Selecteer de naam van het notitieblok top 5 producten berekenen om de details ervan weer te geven (3).
Hier ziet u de details van de uitvoering van notebooks. U kunt Afspelen (1) selecteren om een weergave van de voortgang door de taken (2) te bekijken. Onderaan kunt u de diagnostische gegevens en logboeken met verschillende filteropties (3) bekijken. Aan de rechterkant kunnen we de uitvoeringsdetails bekijken, zoals de duur, Livy-id, Spark-pooldetails, enzovoort. Selecteer de koppeling Details weergeven voor een taak om de details ervan weer te geven (5).
De gebruikersinterface van de Spark-toepassing wordt geopend op een nieuw tabblad waar we de fasedetails kunnen zien. Vouw de DAG-visualisatie uit om de fasedetails weer te geven.
Ga terug naar de Data Hub.
Selecteer het tabblad Gekoppeld (1) en selecteer vervolgens de wwi-02-container (2) in het primaire Data Lake Storage-account, ga naar de map top5-products (3) en controleer of er een map bestaat voor het Parquet-bestand waarvan de naam overeenkomt met de pijplijnuitvoerings-id.
Zoals u kunt zien, hebben we een bestand waarvan de naam overeenkomt met de pijplijnuitvoerings-id die we eerder hebben genoteerd:
Deze waarden komen overeen omdat we de pijplijnuitvoerings-id hebben doorgegeven aan de
runId
parameter in de Notebook-activiteit.