Freigeben über


Streamingdaten mit strukturiertem Spark-Streaming in Lakehouse einspeisen

Strukturiertes Streaming ist eine auf Spark basierende skalierbare und fehlertolerante Streamverarbeitungs-Engine. Spark sorgt für die inkrementelle und kontinuierliche Ausführung des Streamingvorgangs, während weiterhin Daten eingehen.

Strukturiertes Streaming ist seit Spark 2.2 verfügbar. Seitdem ist dies der empfohlene Ansatz für das Datenstreaming. Das grundlegende Prinzip hinter einem strukturierten Stream besteht darin, einen Livedatenstrom als Tabelle zu behandeln, in der neue Daten immer fortlaufend angefügt werden wie neue Zeilen in einer Tabelle. Es existieren einige definierte integrierte Streamingdateiquellen wie CSV, JSON, ORC, Parquet sowie eine integrierte Unterstützung für Messagingdienste wie Kafka und Event Hubs.

In diesem Artikel erhalten Sie Einblicke in die Optimierung der Verarbeitung und Erfassung von Ereignissen durch strukturiertes Spark-Streaming in Produktionsumgebungen mit hohem Durchsatz. Zu den empfohlenen Ansätzen zählen:

  • Optimierung des Datenstreamingdurchsatzes
  • Optimieren von Schreibvorgängen in der Deltatabelle und
  • Ereignisbatchverarbeitung.

Spark-Auftragsdefinitionen und Spark-Notebooks

Spark-Notebooks sind ein hervorragendes Tool, um Ideen zu überprüfen und Experimente durchzuführen, um Erkenntnisse aus Ihren Daten oder Ihrem Code zu erhalten. Notebooks werden auch häufig für Datenvorbereitung, Datenvisualisierung, maschinelles Lernen und andere Big Data-Szenarien verwendet. Spark-Auftragsdefinitionen sind nicht interaktive codeorientierte Aufgaben, die über einen langen Zeitraum in einem Spark-Cluster ausgeführt werden. Spark-Auftragsdefinitionen bieten hohe Stabilität und verlässliche Verfügbarkeit.

Spark-Notebooks sind eine hervorragende Quelle, um die Logik Ihres Codes zu testen und allen Geschäftsanforderungen gerecht zu werden. Um jedoch die Ausführung in einem Produktionsszenario aufrechtzuerhalten, sind Spark-Auftragsdefinitionen mit aktivierter Wiederholungsrichtlinie die beste Lösung.

Wiederholungsrichtlinien für die Spark-Auftragsdefinition

In Microsoft Fabric kann der Benutzer eine Wiederholungsrichtlinie für Spark-Auftragsdefinitionsaufträge festlegen. Obwohl das Skript im Auftrag unendlich sein kann, kann für die Infrastruktur, auf der das Skript ausgeführt wird, ein Problem auftreten, das das Beenden des Auftrags erfordert. Der Auftrag könnte auch aufgrund notwendiger Patches für die zugrunde liegenden Infrastruktur entfernt werden. Die Wiederholungsrichtlinie ermöglicht es dem Benutzer, Regeln für den automatischen Neustart des Auftrags festzulegen, wenn er aufgrund von Problemen mit der Infrastruktur beendet wird. Die Parameter geben an, wie oft der Auftrag neu gestartet werden soll (bis hin zu unendliche Wiederholungen). Außerdem legen sie den Zeitraum zwischen Wiederholungen fest. Auf diese Weise können die Benutzer sicherstellen, dass ihre Spark-Auftragsdefinitionsaufträge unendlich weiter ausgeführt werden, bis sie sich entscheiden, sie zu beenden.

Streamingquellen

Das Einrichten des Streamings mit Event Hubs erfordert eine grundlegende Konfiguration, die den Namen des Event Hubs-Namespace, den Hubnamen, den Namen des freigegebenen Zugriffsschlüssels und die Consumergruppe umfasst. Eine Consumergruppe ist eine Ansicht eines vollständigen Event Hubs. Mithilfe dieser Lösung erhalten mehrere verarbeitende Anwendungen jeweils eine separate Ansicht des Eventstreams und können den Datenstrom unabhängig, im individuellen Tempo und mit eigenen Offsets lesen.

Partitionen sind ein wesentlicher Bestandteil, um eine große Datenmenge verarbeiten zu können. Ein einzelner Prozessor verfügt über eine begrenzte Kapazität für die Behandlung von Ereignissen pro Sekunde, während mehrere Prozessoren bei paralleler Ausführung diese Aufgabe besser erledigen können. Partitionen ermöglichen die parallele Verarbeitung umfangreicher Ereignismengen.

Wenn zu viele Partitionen mit einer niedrigen Erfassungsrate verwendet werden, verarbeiten die Partitionsleser nur einen winzigen Teil dieser Daten, was zu einer suboptimalen Verarbeitung führt. Die ideale Anzahl von Partitionen hängt direkt von der gewünschten Verarbeitungsrate ab. Wenn Sie Ihre Ereignisverarbeitung skalieren wollen, sollten sie über zusätzliche Partitionen nachdenken. Es gibt keine bestimmte Durchsatzbegrenzung für eine Partition. Der aggregierte Durchsatz in Ihrem Namespace ist allerdings durch die Anzahl der Durchsatzeinheiten beschränkt. Wenn Sie die Anzahl der Durchsatzeinheiten in Ihrem Namespace erhöhen, wünschen Sie vielleicht zusätzliche Partitionen, um es gleichzeitigen Lesern zu ermöglichen, ihren maximalen Durchsatz zu erzielen.

Es wird empfohlen, die beste Anzahl von Partitionen für Ihr Durchsatzszenario zu untersuchen und zu testen. Es ist jedoch üblich, bei Szenarien mit hohem Durchsatz 32 oder mehr Partitionen einzusetzen.

Der Azure Event Hubs-Connector für Apache Spark (azure-event-hubs-spark) wird empfohlen, um die Spark-Anwendung mit Azure Event Hubs zu verbinden.

Lakehouse als Streamingsenke

Delta Lake ist eine Open-Source-Speicherebene, die zusätzlich zu Data Lake-Speicherlösungen ACID-Transaktionen (Atomicity, Consistency, Isolation and Durability, Atomarität, Konsistenz, Isolation und Dauerhaftigkeit) bereitstellt. Delta Lake unterstützt auch skalierbare Metadatenverarbeitung, Schemaentwicklung, Zeitreisen (Versionsverwaltung von Daten), offene Formate und andere Features.

In der Fabric Datentechnik wird Delta Lake für Folgendes verwendet:

  • Einfaches Ausführen von Upserts (Einfügen/Aktualisieren) und Löschen von Daten mithilfe von Spark SQL
  • Komprimieren von Daten, um den Zeitaufwand für das Abfragen von Daten zu minimieren
  • Anzeigen des Status von Tabellen vor und nach der Ausführung von Vorgängen
  • Abrufen eines Verlauf der in Tabellen ausgeführten Vorgänge.

Delta wird als eines der möglichen Formate für Ausgabesenken hinzugefügt, die in writeStream verwendet werden. Weitere Informationen zu den vorhandenen Ausgabesenken finden Sie im Spark-Programmierhandbuch für strukturiertes Streaming.

Im folgenden Beispiel wird veranschaulicht, wie Daten in Delta Lake gestreamt werden können.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 

df = spark \ 
  .readStream \ 
  .format("eventhubs") \ 
  .options(**ehConf) \ 
  .load()  

Schema = StructType([StructField("<column_name_01>", StringType(), False), 
                     StructField("<column_name_02>", StringType(), False), 
                     StructField("<column_name_03>", DoubleType(), True), 
                     StructField("<column_name_04>", LongType(), True), 
                     StructField("<column_name_05>", LongType(), True)]) 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .toTable("deltaeventstable") 

Hier einige Informationen zum Codeschnipsel in diesem Beispiel:

  • format() ist die Anweisung, die das Ausgabeformat der Daten definiert.
  • outputMode() definiert, auf welche Weise die neuen Zeilen in das Streaming geschrieben werden (also anfügen oder überschreiben).
  • toTable() speichert die gestreamten Daten in einer Delta-Tabelle, die mit dem als Parameter übergebenen Wert erstellt wurde.

Optimieren von Delta-Schreibvorgängen

Die Datenpartitionierung ist ein wichtiger Bestandteil beim Erstellen einer stabilen Streaminglösung: Die Partitionierung verbessert die Organisation der Daten und den Durchsatz. Nach Delta-Vorgängen kann es leicht zur Fragmentierung von Daten kommen, was zu einer Vielzahl kleiner Dateien führt. Auch zu große Dateien sind ein Problem, da das Schreiben auf den Datenträger lange dauert. Die Herausforderung bei der Datenpartitionierung besteht darin, das richtige Gleichgewicht zu finden, das zu optimalen Dateigrößen führt. Spark unterstützt die Partitionierung im Arbeitsspeicher und auf dem Datenträger. Optimal partitionierte Daten können die beste Leistung bieten, wenn Daten in Delta Lake beibehalten und daraus abgefragt werden.

  • Beim Partitionieren von Daten auf dem Datenträger können Sie mithilfe der Funktion partitionBy() auswählen, wie die Daten basierend auf Spalten partitioniert werden sollen. partitionBy() ist eine Funktion zum Partitionieren eines großen semantischen Modells in kleinere Dateien basierend auf einer oder mehreren Spalten, die beim Schreiben auf den Datenträger bereitgestellt werden. Die Partitionierung ist eine Möglichkeit, die Leistung von Abfragen bei der Arbeit mit einem großen semantischen Modell zu verbessern. Vermeiden Sie es, eine Spalte auszuwählen, die zu kleine oder zu große Partitionen generiert. Definieren Sie eine Partition basierend auf einer Reihe von Spalten mit einer guten Kardinalität, und teilen Sie die Daten in Dateien optimaler Größe auf.
  • Die Partitionierung von Daten im Arbeitsspeicher kann mithilfe von repartition()- oder coalesce()-Transformationen erfolgen. Dadurch werden Daten auf mehreren Workerknoten verteilt und mehrere Aufgaben erstellt, die Daten parallel lesen und verarbeiten können, indem die Grundlagen eines resilienten verteilten Datasets (Resilient Distributed Dataset, RDD) verwendet werden. Es ermöglicht die Aufteilung des semantischen Modells in logische Partitionen, die auf verschiedenen Knoten des Clusters berechnet werden können.
    • repartition() wird verwendet, um die Anzahl der Partitionen im Arbeitsspeicher zu erhöhen oder zu verringern. Die Neupartitionierung verteilt ganze Daten über das Netzwerk neu und gleicht sie über alle Partitionen hinweg aus.
    • coalesce() wird nur verwendet, um die Anzahl der Partitionen effizient zu verringern. Dies ist eine optimierte Version von repartition(), bei der die Verschiebung von Daten über alle Partitionen hinweg mithilfe von „coalesce()“ geringer ausfällt.

Die Kombination beider Partitionierungsansätze ist eine gute Lösung in Szenarien mit hohem Durchsatz. repartition() erstellt eine bestimmte Anzahl von Partitionen im Arbeitsspeicher, während partitionBy() Dateien für jede Speicherpartition und Partitionierungsspalte auf den Datenträger schreibt. Das folgende Beispiel veranschaulicht die Verwendung beider Partitionierungsstrategien im selben Spark-Auftrag: Die Daten werden zunächst in 48 Partitionen im Arbeitsspeicher aufgeteilt (angenommen, wir verfügen über insgesamt 48 CPU-Kerne) und dann auf dem Datenträger basierend auf zwei vorhandenen Spalten in der Nutzlast partitioniert.

import pyspark.sql.functions as f 
from pyspark.sql.types import * 
import json 

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Optimierter Schreibvorgang

Eine weitere Option zum Optimieren von Schreibvorgängen in Delta Lake besteht in der Verwendung von Optimized Write (optimierten Schreibvorgängen). Optimized Write ist ein optionales Feature, das die Art und Weise verbessert, wie Daten in die Delta-Tabelle geschrieben werden. Spark führt die Partitionen vor dem Schreiben der Daten zusammen oder teilt sie auf, wodurch der Durchsatz der auf den Datenträger geschriebenen Daten maximiert wird. Es kommt jedoch zu einem vollständigen Shuffle, was bei einigen Workloads zu einer Leistungsbeeinträchtigung führen kann. Aufträge, die coalesce() und/oder repartition() zum Partitionieren von Daten auf dem Datenträger verwenden, können so umgestaltet werden, dass sie stattdessen mit Optimized Write beginnen.

Der folgende Code ist ein Beispiel für die Verwendung von Optimized Write. Beachten Sie, dass partitionBy() weiterhin verwendet wird.

spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", true) 
 
rawData = df \ 
 .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .toTable("deltaeventstable") 

Batchverarbeitungsereignisse

Um die Anzahl der Vorgänge zu minimieren, was die erforderliche Zeit für die Erfassung von Daten in Delta Lake verkürzt, ist die Batchverarbeitung von Ereignissen eine praktische Alternative.

Trigger definieren, wie oft eine Streamingabfrage ausgeführt (ausgelöst) werden soll, um neue Daten auszugeben. Durch das Einrichten wird ein regelmäßiges Zeitintervall für die Verarbeitung von Microbatches definiert, wodurch Daten und Batchereignisse in wenigen persistenten Vorgängen erfasst werden, anstatt ständig auf den Datenträger geschrieben zu werden.

Das folgende Beispiel zeigt eine Streamingabfrage, bei der Ereignisse in regelmäßigen Abständen von einer Minute verarbeitet werden.

rawData = df \ 
  .withColumn("bodyAsString", f.col("body").cast("string")) \  
  .select(f.from_json("bodyAsString", Schema).alias("events")) \ 
  .select("events.*") \ 
  .repartition(48) \ 
  .writeStream \ 
  .format("delta") \ 
  .option("checkpointLocation", "Files/checkpoint") \ 
  .outputMode("append") \ 
  .partitionBy("<column_name_01>", "<column_name_02>") \ 
  .trigger(processingTime="1 minute") \ 
  .toTable("deltaeventstable") 

Der Vorteil der Kombination der Batchverarbeitung von Ereignissen in Delta-Tabellenschreibvorgängen besteht darin, dass dadurch größere Delta-Dateien mit mehr Daten erstellt und kleine Dateien vermieden werden. Sie sollten die Menge der erfassten Daten analysieren und die optimale Verarbeitungszeit finden, um die Größe der von der Delta-Bibliothek erstellten Parquet-Dateien zu optimieren.

Überwachung

Spark 3.1 und höhere Versionen verfügen über eine integrierte Benutzeroberfläche für strukturiertes Streaming, die die folgenden Streamingmetriken enthält:

  • Eingaberate
  • Verarbeitungsrate
  • Eingabezeilen
  • Batchdauer
  • Vorgangsdauer