Übung: Integrieren eines Notebooks mit Azure Synapse-Pipelines
In dieser Lerneinheit erstellen Sie ein Azure Synapse Spark-Notebook, um von einem Zuordnungsdatenfluss geladene Daten zu analysieren und zu transformieren. Anschließend speichern Sie die Daten in einem Data Lake. Sie erstellen eine Parameterzelle, die einen Zeichenfolgenparameter akzeptiert, der den Ordnernamen für die Daten definiert, die das Notebook in den Data Lake schreibt.
Dann fügen Sie dieses Notebook einer Synapse-Pipeline hinzu und übergeben die eindeutige Ausführungs-ID der Pipeline an den Notebookparameter, damit Sie die Pipelineausführung später mit den von der Aktivität „Notebook“ gespeicherten Daten korrelieren können.
Schließlich verwenden Sie den Hub Überwachung in Synapse Studio, um die Pipelineausführung zu überwachen, die Ausführungs-ID abzurufen und dann die entsprechenden Dateien zu suchen, die im Data Lake gespeichert sind.
Informationen zu Apache Spark und Notebooks
Apache Spark ist ein Framework für die Parallelverarbeitung, das In-Memory-Verarbeitung unterstützt, um die Leistung von Big Data-Analyseanwendungen zu steigern. Apache Spark in Azure Synapse Analytics ist eine der cloudbasierten Apache Spark-Implementierungen von Microsoft.
Ein Apache Spark-Notebook in Synapse Studio ist eine Webschnittstelle, mit der Sie Dateien erstellen können, die Livecode, Visualisierungen und narrativen Text enthalten. Notebooks sind ein guter Ausgangspunkt, um Ideen zu überprüfen und schnelle Experimente zu verwenden, um Erkenntnisse aus Ihren Daten zu gewinnen. Notebooks werden auch häufig bei der Datenvorbereitung, Datenvisualisierung, Machine Learning und andere Big Data-Szenarien verwendet.
Erstellen eines Synapse Spark-Notebooks
Angenommen, Sie haben einen Zuordnungsdatenfluss in Synapse Analytics erstellt, um Benutzerprofildaten zu verarbeiten, zu verknüpfen und zu importieren. Nun möchten Sie für die letzten zwölf Monate die fünf bevorzugten, beliebtesten und meistgekauften Produkte aller Benutzer*innen finden. Anschließend möchten Sie die fünf beliebtesten Produkte insgesamt berechnen.
In dieser Übung erstellen Sie ein Synapse Spark-Notebook, um diese Berechnungen anzustellen.
Öffnen Synapse Analytics Studio (https://web.azuresynapse.net/), und navigieren Sie zum Hub https://web.azuresynapse.net/.
Wählen Sie die Registerkarte Verknüpft(1) aus, und erweitern Sie das primäre Data Lake-Speicherkonto (2) unterhalb von Azure Data Lake Storage Gen2. Wählen Sie den Container wwi-02(3) aus, und öffnen Sie den Ordner top-products(4). Klicken Sie mit der rechten Maustaste auf eine beliebige Parquet-Datei (5), und wählen Sie das Menüelement Neues Notebook(6) und dann In Datenframe laden (7) aus. Wenn der Ordner nicht angezeigt wird, klicken Sie auf
Refresh
.Vergewissern Sie sich, dass das Notebook an Ihren Spark-Pool angefügt ist.
Ersetzen Sie den Parquet-Dateinamen durch
*.parquet
(1), um alle Parquet-Dateien im Ordnertop-products
auszuwählen. Der Pfad sollte z. B. in etwa wie der folgende lauten:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Wählen Sie auf der Symbolleiste des Notebooks Alle ausführen aus, um das Notebook auszuführen.
Hinweis
Wenn Sie ein Notebook zum ersten Mal in einem Spark-Pool ausführen, erstellt Synapse eine neue Sitzung. Dieser Vorgang kann etwa drei bis fünf Minuten dauern.
Hinweis
Wenn Sie nur die Zelle ausführen möchten, zeigen Sie auf diese, und klicken Sie links neben der Zelle auf das Symbol Zelle ausführen, oder wählen Sie die Zelle aus, und drücken Sie STRG+EINGABETASTE.
Erstellen Sie darunter eine neue Zelle, indem Sie auf die Schaltfläche + und dann auf das Element + (Codezelle) klicken. Die Schaltfläche + befindet sich unter der Notebookzelle auf der linken Seite. Alternativ können Sie auch das Menü + Zelle in der Notebooksymbolleiste erweitern und das Element Codezelle auswählen.
Führen Sie den folgenden Befehl in der neuen Zelle aus, um einen neuen Dataframe namens
topPurchases
aufzufüllen, eine neue temporäre Ansicht namenstop_purchases
zu erstellen, und die ersten 100 Zeilen anzuzeigen:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
Die Ausgabe sollte in etwa wie folgt aussehen:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Führen Sie den folgenden Befehl in einer neuen Zelle aus, um mithilfe von SQL eine neue temporäre Ansicht zu erstellen:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Hinweis
Für diese Abfrage gibt es keine Ausgabe.
Die Abfrage verwendet die temporäre Ansicht
top_purchases
als Quelle und wendet einerow_number() over
-Methode an, um eine Zeilennummer für die Datensätze der einzelnen Benutzer anzuwenden, wobeiItemsPurchasedLast12Months
am größten ist. Diewhere
-Klausel filtert die Ergebnisse, sodass wir nur bis zu fünf Produkte abrufen, bei denen sowohlIsTopProduct
als auchIsPreferredProduct
auf „true“ festgelegt sind. Dadurch erhalten wir die fünf meistgekauften Produkte für jeden Benutzer, bei denen diese Produkte auch entsprechend ihrem Benutzerprofil, das in Azure Cosmos DB gespeichert ist, als ihre bevorzugten Produkte identifiziert werden.Führen Sie den folgenden Befehl in einer neuen Zelle aus, um einen neuen Dataframe zu erstellen und anzuzeigen, in dem die Ergebnisse der temporären Ansicht
top_5_products
gespeichert werden, die Sie in der vorherigen Zelle erstellt haben:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
Es sollte eine Ausgabe ähnlich der folgenden angezeigt werden, in der die fünf bevorzugten Produkte pro Benutzer angezeigt werden:
Berechnen Sie die fünf wichtigsten Produkte insgesamt, basierend auf den Produkten, die sowohl von Kunden bevorzugt, als auch am meisten gekauft wurden. Führen Sie hierzu den folgenden Befehl in einer neuen Zelle aus:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
In dieser Zelle haben wir die fünf bevorzugten Produkte nach Produkt-ID geordnet, die Gesamtanzahl der in den letzten 12 Monaten gekauften Artikel addiert, diesen Wert in absteigender Reihenfolge sortiert, und die ersten fünf Ergebnisse zurückgegeben. Die Ausgabe sollte in etwa wie folgt aussehen:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Erstellen einer Parameterzelle
Azure Synapse-Pipelines suchen nach der Parameterzelle und behandeln diese Zelle als Standard für die Parameter, die zur Ausführungszeit übergeben werden. Die Ausführungs-Engine fügt eine neue Zelle mit Eingabeparametern unter der Parameterzelle hinzu, um die Standardwerte zu überschreiben. Wenn keine Parameterzelle angegeben ist, wird die Zelle ganz oben im Notebook eingefügt.
Wir führen dieses Notebook aus einer Pipeline aus. Wir möchten einen Parameter übergeben, der einen
runId
-Variablenwert festlegt, der zum Benennen der Parquet-Datei verwendet wird. Führen Sie den folgenden Befehl in einer neuen Zelle aus:import uuid # Generate random GUID runId = uuid.uuid4()
Wir verwenden die in Spark integrierte
uuid
-Bibliothek, um eine zufällige GUID zu generieren. Wir möchten die VariablerunId
mit einem Parameter überschreiben, der von der Pipeline übergeben wird. Dazu müssen wir diese Zelle als Parameterzelle umschalten.Wählen Sie oben rechts in der Zelle (1) auf die Auslassungspunkte (...) und dann auf Toggle parameter cell (2) (Parameterzelle umschalten).
Nachdem Sie diese Option umgeschaltet haben, ist das Tag Parameters an die Zelle angefügt.
Fügen Sie den folgenden Code in eine neue Zelle ein, um die Variable
runId
als Parquet-Dateinamen im Pfad/top5-products/
im primären Data Lake-Konto zu verwenden. Ersetzen SieYOUR_DATALAKE_NAME
im Pfad mit dem Namen Ihres primären Data Lake-Kontos. Diesen finden Sie oben auf der Seite in Zelle 1(1). Kopieren Sie das Data Lake-Speicherkonto aus dem Pfad (2). Fügen Sie diesen Wert als Ersatz fürYOUR_DATALAKE_NAME
in den PfadYOUR_DATALAKE_NAME
in die neue Zelle ein, und führen Sie den Befehl in der Zelle aus.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Überprüfen Sie, ob die Datei in den Data Lake geschrieben wurde. Navigieren Sie zum Hub Daten, und wählen Sie die Registerkarte Verknüpft(1) aus. Erweitern Sie das primäre Data-Lake-Speicherkonto, und wählen Sie den Container wwi-02(2) aus. Navigieren Sie zum Ordner top5-products(3). Im Verzeichnis sollte ein Ordner für die Parquet-Datei mit einer GUID als Dateiname (4) angezeigt werden.
Die Parquet-Schreibmethode für den Dataframe in der Notebookzelle hat dieses Verzeichnis erstellt, da es noch nicht vorhanden war.
Hinzufügen des Notebooks zu einer Synapse-Pipeline
Wir kommen noch einmal zurück auf den Zuordnungsdatenfluss vom Beginn der Übung: Nehmen wir an, dass Sie dieses Notebook ausführen möchten, nachdem der Datenfluss im Rahmen Ihres Orchestrierungsprozesses ausgeführt wurde. Dazu fügen Sie dieses Notebook einer Pipeline als neue Notebook-Aktivität hinzu.
Kehren Sie zum Notebook zurück. Klicken Sie oben rechts im Notebook auf Eigenschaften (1), und geben Sie dann
Calculate Top 5 Products
bei Name (2) ein.Klicken Sie oben rechts im Notebook auf Add to pipeline (Zu Pipeline hinzufügen) (1) und dann auf Existing pipeline (2) (Vorhandene Pipeline).
Wählen Sie die Pipeline Write User Profile Data to ASA (Benutzerprofildaten in ASA schreiben) (1) aus, und klicken Sie auf Hinzufügen *(2).
Synapse Studio fügt der Pipeline die Notebook-Aktivität hinzu. Ordnen Sie die Notebook-Aktivität neu an, sodass sie sich rechts neben der Datenflussaktivität befindet. Wählen Sie die Aktivität „Datenfluss“ aus, und ziehen Sie für die Pipelineverbindung ein grünes Feld der Aktivität Erfolg zur Aktivität „Notebook“.
Der Pfeil der Aktivität „Erfolg“ weist die Pipeline an, die Aktivität „Notebook“ auszuführen, nachdem die Aktivität „Datenfluss“ erfolgreich ausgeführt wurde.
Wählen Sie die Aktivität „Notebook“ (1) und dann die Registerkarte Einstellungen(2) aus, erweitern Sie die Basisparameter (3), und klicken Sie auf + Neu (4). Geben
runId
Sie in das FeldrunId
(5) ein. Wählen Sie für Typ (6) die Option Zeichenfolge aus. Wählen Sie für den Wert die Option Dynamischen Inhalt hinzufügen (7) aus.Wählen Sie unter Systemvariablen (1) die Option Pipelineausführungs-ID aus. Dadurch wird
@pipeline().RunId
dem dynamischen Inhaltsfeld@pipeline().RunId
hinzugefügt. Klicken Sie auf Fertig stellen (3), um das Dialogfeld zu schließen.Der Wert „Pipelineausführungs-ID“ ist eine eindeutige GUID, die jeder Pipelineausführung zugewiesen wird. Wir verwenden diesen Wert für den Namen der Parquet-Datei, indem wir diesen Wert als
runId
Notebook-Parameter übergeben. Wir können dann den Ausführungsverlauf der Pipeline durchsuchen, und die spezifische Parquet-Datei finden, die für die jeweilige Pipelineausführung erstellt wird.Wählen Sie Alle veröffentlichen und dann Veröffentlichen aus, um Ihre Änderungen zu speichern.
Wählen Sie nach Abschluss der Veröffentlichung Trigger hinzufügen (1) und dann Jetzt auslösen (2) aus, um die aktualisierte Pipeline auszuführen.
Wählen Sie OK aus, um den Trigger auszuführen.
Überwachen der Pipelineausführung
Mit dem Hub Überwachung können Sie aktuelle und vergangene Aktivitäten für SQL, Apache Spark und Pipelines überwachen.
Wechseln Sie zum Hub Überwachen.
Klicken Sie auf Pipelineausführungen (1), und warten Sie, bis die Pipeline erfolgreich ausgeführt wurde (2). Möglicherweise müssen Sie die Ansicht aktualisieren (3).
Wählen Sie den Namen der Pipeline aus, um die Aktivitätsausführungen der Pipeline anzuzeigen.
Beachten Sie sowohl die Aktivität Datenfluss als auch die neue Aktivität Notebook(1). Notieren Sie sich den Wert der Pipelineausführungs-ID(2). Diesen vergleichen wir mit dem Vom Notebook generierten Parquet-Dateinamen. Wählen Sie den Notebooknamen Calculate Top 5 Products (Die 5 wichtigsten Produkte berechnen) aus, um dessen Details anzuzeigen (3).
Hier sehen wir die Details zur Notebookausführung. Sie können auf Wiedergabe (1) klicken, um den Fortschritts für Aufträge (2) wiederzugeben. Unten können Sie die Diagnose und Protokolle mit unterschiedlichen Filteroptionen (3) anzeigen. Auf der rechten Seite können wir die Ausführungsdetails wie Dauer, Livy-ID, Details zum Spark-Pool usw. anzeigen. Klicken Sie für einen Auftrag auf den Link Details anzeigen, um die zugehörigen Details (5) anzuzeigen.
Die Benutzeroberfläche der Spark-Anwendung wird in einer neuen Registerkarte geöffnet, auf der die Phasendetails angezeigt werden. Erweitern Sie die DAG-Visualisierung, um die Phasendetails anzuzeigen.
Kehren Sie zum Hub Daten zurück.
Wählen Sie die Registerkarte Verknüpft(1) und dann den Container wwi-02(2) im primären Data-Lake-Speicherkonto aus. Navigieren Sie zum Ordner top5-products(3), und überprüfen Sie, ob ein Ordner für die Parquet-Datei vorhanden ist, deren Name der Pipelineausführungs-ID entspricht.
Wie Sie sehen können, haben wir eine Datei, deren Name der zuvor notierten Pipelineausführungs-ID entspricht:
Diese Werte stimmen überein, da wir die Pipelineausführungs-ID an den
runId
Parameter für die Notebook-Aktivität übergeben haben.