Oefening: een notebook integreren in Azure Synapse-pijplijnen

Voltooid

In deze les maakt u een Azure Synapse Spark-notebook voor het analyseren en transformeren van gegevens die zijn geladen door een toewijzingsgegevensstroom en slaat u de gegevens op in een data lake. U maakt een parametercel die een tekenreeksparameter accepteert die de mapnaam definieert voor de gegevens die het notebook naar de data lake schrijft.

Vervolgens voegt u dit notebook toe aan een Synapse-pijplijn en geeft u de unieke pijplijnuitvoerings-id door aan de notebookparameter, zodat u de pijplijnuitvoering later kunt correleren met de gegevens die zijn opgeslagen door de notebookactiviteit.

Ten slotte gebruikt u de Monitor-hub in Synapse Studio om de pijplijnuitvoering te bewaken, de uitvoerings-id op te halen en vervolgens de bijbehorende bestanden te zoeken die zijn opgeslagen in de data lake.

Over Apache Spark en notebooks

Apache Spark is een framework voor parallelle verwerking dat ondersteuning biedt voor in-memory verwerking om de prestaties van toepassingen voor de analyse van big data te verbeteren. Apache Spark in Azure Synapse Analytics is een van de implementaties van Apache Spark van Microsoft in de cloud.

Een Apache Spark-notebook in Synapse Studio is een webinterface waarmee u bestanden kunt maken die livecode, visualisaties en verhaaltekst bevatten. Notebooks zijn een goede plek om ideeën te valideren en snelle experimenten te gebruiken om inzichten uit uw gegevens te verkrijgen. Notebooks worden ook veel gebruikt in gegevensvoorbereiding, gegevensvisualisatie, machine learning en andere big data-scenario's.

Een Synapse Spark-notebook maken

Stel dat u een toewijzingsgegevensstroom hebt gemaakt in Synapse Analytics voor het verwerken, samenvoegen en importeren van gebruikersprofielgegevens. Nu wilt u de vijf belangrijkste producten voor elke gebruiker vinden, op basis van welke producten zowel de voorkeur hebben als de beste keuze, en de meeste aankopen hebben in de afgelopen 12 maanden. Vervolgens wilt u de vijf belangrijkste producten in het algemeen berekenen.

In deze oefening maakt u een Synapse Spark-notebook om deze berekeningen te maken.

  1. Open Synapse Analytics Studio (https://web.azuresynapse.net/) en ga naar de Data Hub.

    Het menu-item Gegevens is gemarkeerd.

  2. Selecteer het tabblad Gekoppeld (1) en vouw het primaire Data Lake Storage-account (2) onder Azure Data Lake Storage Gen2 uit. Selecteer de wwi-02-container (3) en open de map met topproducten (4). Klik met de rechtermuisknop op een Parquet-bestand (5), selecteer het menu-item Nieuw notitieblok (6) en selecteer vervolgens Laden naar DataFrame (7). Als u de map niet ziet, selecteert u Refresh.

    De optie Parquet-bestand en nieuwe notitieblok zijn gemarkeerd.

  3. Zorg ervoor dat het notebook is gekoppeld aan uw Spark-pool.

    Het menu-item Koppelen aan Spark-pool is gemarkeerd.

  4. Vervang de Parquet-bestandsnaam door *.parquet (1) om alle Parquet-bestanden in de top-products map te selecteren. Het pad moet bijvoorbeeld vergelijkbaar zijn met: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    De bestandsnaam is gemarkeerd.

  5. Selecteer Alles uitvoeren op de werkbalk van het notitieblok om het notebook uit te voeren.

    De celresultaten worden weergegeven.

    Notitie

    De eerste keer dat u een notebook uitvoert in een Spark-pool, maakt Synapse een nieuwe sessie. Dit kan ongeveer 3 - 5 minuten duren.

    Notitie

    Als u alleen de cel wilt uitvoeren, beweegt u de muisaanwijzer over de cel en selecteert u het pictogram Cel uitvoeren links van de cel of selecteert u de cel en voert u Ctrl+Enter in.

  6. Maak een nieuwe cel eronder door de + knop te selecteren en het celitem Code te selecteren. De knop + bevindt zich onder de notebookcel aan de linkerkant. U kunt ook het menu + Cel in de werkbalk Notitieblok uitvouwen en het item Codecel selecteren.

    De menuoptie Code toevoegen is gemarkeerd.

  7. Voer de volgende opdracht uit in de nieuwe cel om een nieuw dataframe met de naam topPurchasesin te vullen, maak een nieuwe tijdelijke weergave met de naam top_purchasesen geef de eerste 100 rijen weer:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    De uitvoer moet er als volgt uitzien:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Voer de volgende opdracht uit in een nieuwe cel om een nieuwe tijdelijke weergave te maken met behulp van SQL:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Notitie

    Er is geen uitvoer voor deze query.

    De query maakt gebruik van de top_purchases tijdelijke weergave als bron en past een row_number() over methode toe om een rijnummer toe te passen voor de records voor elke gebruiker die het ItemsPurchasedLast12Months grootst is. De where component filtert de resultaten zodat we maximaal vijf producten ophalen waarbij beide IsTopProduct en IsPreferredProduct zijn ingesteld op waar. Dit geeft ons de vijf meest gekochte producten voor elke gebruiker waar deze producten ook worden geïdentificeerd als hun favoriete producten, volgens hun gebruikersprofiel dat is opgeslagen in Azure Cosmos DB.

  9. Voer de volgende opdracht uit in een nieuwe cel om een nieuw DataFrame te maken en weer te geven waarin de resultaten worden opgeslagen van de top_5_products tijdelijke weergave die u in de vorige cel hebt gemaakt:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Als het goed is, ziet u een uitvoer die vergelijkbaar is met de volgende, waarin de vijf beste producten per gebruiker worden weergegeven:

    De vijf belangrijkste voorkeursproducten worden per gebruiker weergegeven.

  10. Bereken de vijf belangrijkste producten in het algemeen, op basis van de producten die zowel door klanten als het meest worden gekocht. Voer hiervoor de volgende opdracht uit in een nieuwe cel:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    In deze cel hebben we de vijf beste producten gegroepeerd op product-id, de totale artikelen opgeteld die in de afgelopen 12 maanden zijn gekocht, die waarde in aflopende volgorde gesorteerd en de top vijf resultaten geretourneerd. Uw uitvoer moet er als volgt uitzien:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Een parametercel maken

Azure Synapse-pijplijnen zoeken naar de parametercel en behandelen deze cel als standaardwaarden voor de parameters die tijdens de uitvoering zijn doorgegeven. De uitvoeringsengine voegt een nieuwe cel toe onder de parametercel met invoerparameters om de standaardwaarden te overschrijven. Wanneer een parameterscel niet is aangewezen, wordt de geïnjecteerde cel boven aan het notebook ingevoegd.

  1. We gaan dit notebook uitvoeren vanuit een pijplijn. We willen een parameter doorgeven waarmee een runId variabelewaarde wordt ingesteld die wordt gebruikt om het Parquet-bestand een naam te geven. Voer de volgende opdracht uit in een nieuwe cel:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    We gebruiken de uuid bibliotheek die bij Spark wordt geleverd om een willekeurige GUID te genereren. We willen de runId variabele overschrijven met een parameter die door de pijplijn wordt doorgegeven. Hiervoor moeten we dit in-/uitschakelen als een parametercel.

  2. Selecteer het beletselteken (...) in de rechterbovenhoek van de cel (1) en selecteer vervolgens De parametercel in-/uitschakelen (2).

    Het menu-item is gemarkeerd.

    Nadat u deze optie hebt ingeschakeld, ziet u de tag Parameters in de cel.

    De cel is geconfigureerd om parameters te accepteren.

  3. Plak de volgende code in een nieuwe cel om de runId variabele te gebruiken als de Parquet-bestandsnaam in het /top5-products/ pad in het primaire Data Lake-account. Vervang YOUR_DATALAKE_NAME in het pad door de naam van uw primaire Data Lake-account. Schuif naar cel 1 boven aan de pagina (1) om dit te vinden. Kopieer het Data Lake Storage-account van het pad (2). Plak deze waarde als vervanging voor YOUR_DATALAKE_NAME in het pad (3) in de nieuwe cel en voer vervolgens de opdracht uit in de cel.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    Het pad wordt bijgewerkt met de naam van het primaire Data Lake-account.

  4. Controleer of het bestand naar de data lake is geschreven. Ga naar de datahub en selecteer het tabblad Gekoppeld (1). Vouw het primaire Data Lake Storage-account uit en selecteer vervolgens de wwi-02-container (2). Ga naar de map top5-producten (3). Als het goed is, ziet u een map voor het Parquet-bestand in de map met een GUID als bestandsnaam (4).

    Het Parquet-bestand is gemarkeerd.

    De Parquet-schrijfmethode op het dataframe in de notebook-cel heeft deze map gemaakt omdat deze nog niet eerder bestond.

Notebook toevoegen aan een Synapse-pijplijn

Als u terugwijst naar de toewijzing Gegevensstroom we aan het begin van de oefening hebben beschreven, moet u dit notebook uitvoeren nadat de Gegevensstroom wordt uitgevoerd als onderdeel van het indelingsproces. Hiervoor voegt u dit notebook toe aan een pijplijn als een nieuwe notebookactiviteit.

  1. Ga terug naar het notitieblok. Selecteer Eigenschappen (1) in de rechterbovenhoek van het notitieblok en voer Calculate Top 5 Products de naam (2) in.

    De blade Eigenschappen wordt weergegeven.

  2. Selecteer Toevoegen aan pijplijn (1) in de rechterbovenhoek van het notebook en selecteer vervolgens Bestaande pijplijn (2).

    De knop Toevoegen aan pijplijn is gemarkeerd.

  3. Selecteer de gebruikersprofielgegevens schrijven naar ASA-pijplijn (1) en selecteer vervolgens *(2).

    De pijplijn is geselecteerd.

  4. Synapse Studio voegt de notebookactiviteit toe aan de pijplijn. Rangschik de notebookactiviteit zo dat deze zich rechts van de gegevensstroomactiviteit bevindt. Selecteer de gegevensstroomactiviteit en sleep een groen vak voor de verbindingslijn voor een geslaagde activiteit naar de notebookactiviteit.

    De groene pijl is gemarkeerd.

    Met de pijl Geslaagde activiteit wordt de pijplijn geïnstrueerd om de Notebook-activiteit uit te voeren nadat de gegevensstroomactiviteit is uitgevoerd.

  5. Selecteer de notebookactiviteit (1) en selecteer vervolgens het tabblad Instellingen (2), vouw basisparameters (3) uit en selecteer vervolgens + Nieuw (4). Voer runId in het veld Naam (5) in. Selecteer Tekenreeks voor het type (6). Selecteer Dynamische inhoud toevoegen (7) voor de waarde.

    De instellingen worden weergegeven.

  6. Selecteer de pijplijnuitvoerings-id onder Systeemvariabelen (1). Hiermee wordt @pipeline().RunId het dynamische inhoudsvak (2) toegevoegd. Selecteer Voltooien (3) om het dialoogvenster te sluiten.

    Het formulier voor dynamische inhoud wordt weergegeven.

    De waarde van de pijplijnuitvoerings-id is een unieke GUID die aan elke pijplijnuitvoering is toegewezen. We gebruiken deze waarde voor de naam van het Parquet-bestand door deze waarde door te geven als de runId notebookparameter. Vervolgens kunnen we de geschiedenis van de pijplijnuitvoering bekijken en het specifieke Parquet-bestand vinden dat voor elke pijplijnuitvoering is gemaakt.

  7. Selecteer Alles publiceren en publiceren om uw wijzigingen op te slaan.

    Alles publiceren is gemarkeerd.

  8. Nadat het publiceren is voltooid, selecteert u Trigger toevoegen (1) en activeert u nu (2) om de bijgewerkte pijplijn uit te voeren.

    Het menu-item trigger is gemarkeerd.

  9. Selecteer OK om de trigger uit te voeren.

    De knop OK is gemarkeerd.

De pijplijnuitvoering controleren.

Met de Monitor-hub kunt u huidige en historische activiteiten voor SQL, Apache Spark en Pijplijnen bewaken.

  1. Ga naar de Monitor-hub .

    De menuopdracht Hub controleren is geselecteerd.

  2. Selecteer Pijplijnuitvoeringen (1) en wacht tot de pijplijnuitvoering is voltooid (2). Mogelijk moet u de weergave vernieuwen (3 ).

    De pijplijnuitvoering is voltooid.

  3. Selecteer de naam van de pijplijn om de uitvoeringen van de activiteit van de pijplijn weer te geven.

    De naam van de pijplijn is geselecteerd.

  4. Let op zowel de gegevensstroomactiviteit als de nieuwe notebookactiviteit (1). Noteer de waarde van de pijplijnuitvoerings-id (2). We vergelijken dit met de Parquet-bestandsnaam die door het notebook is gegenereerd. Selecteer de naam van het notitieblok top 5 producten berekenen om de details ervan weer te geven (3).

    De details van de pijplijnuitvoering worden weergegeven.

  5. Hier ziet u de details van de uitvoering van notebooks. U kunt Afspelen (1) selecteren om een weergave van de voortgang door de taken (2) te bekijken. Onderaan kunt u de diagnostische gegevens en logboeken met verschillende filteropties (3) bekijken. Aan de rechterkant kunnen we de uitvoeringsdetails bekijken, zoals de duur, Livy-id, Spark-pooldetails, enzovoort. Selecteer de koppeling Details weergeven voor een taak om de details ervan weer te geven (5).

    De uitvoeringsdetails worden weergegeven.

  6. De gebruikersinterface van de Spark-toepassing wordt geopend op een nieuw tabblad waar we de fasedetails kunnen zien. Vouw de DAG-visualisatie uit om de fasedetails weer te geven.

    De details van de Spark-fase worden weergegeven.

  7. Ga terug naar de Data Hub.

    Data hub.

  8. Selecteer het tabblad Gekoppeld (1) en selecteer vervolgens de wwi-02-container (2) in het primaire Data Lake Storage-account, ga naar de map top5-products (3) en controleer of er een map bestaat voor het Parquet-bestand waarvan de naam overeenkomt met de pijplijnuitvoerings-id.

    Het bestand is gemarkeerd.

    Zoals u kunt zien, hebben we een bestand waarvan de naam overeenkomt met de pijplijnuitvoerings-id die we eerder hebben genoteerd:

    De id van de pijplijnuitvoering is gemarkeerd.

    Deze waarden komen overeen omdat we de pijplijnuitvoerings-id hebben doorgegeven aan de runId parameter in de Notebook-activiteit.