Verwenden von Spark zum Arbeiten mit Datendateien
Einer der Vorteile von Spark ist, dass Sie Code in verschiedenen Programmiersprachen schreiben und ausführen können. So können Sie Ihre bereits vorhandenen Skills nutzen und die am besten geeignete Sprache für eine bestimmte Aufgabe verwenden. Die Standardsprache im neuen Azure Databricks Spark Notebook ist PySpark - eine für Spark optimierte Version von Python, die aufgrund ihrer umfassenden Unterstützung für Datenbearbeitung und -visualisierung häufig von Datenspezialisten und Analysten verwendet wird. Zusätzlich können Sie Sprachen wie Scala (eine von Java abgeleitete Sprache, die interaktiv verwendet werden kann) und SQL (eine Variante der häufig verwendeten SQL-Sprache, die in der Spark SQL-Bibliothek enthalten ist, um mit relationalen Datenstrukturen zu arbeiten) verwenden. Softwareentwickler können auch kompilierte Lösungen erstellen, die unter Spark ausgeführt werden, indem sie Frameworks wie Java verwenden.
Untersuchen von Daten mit Dataframes
Spark verwendet nativ eine Datenstruktur, die als resilientes verteiltes Dataset (Resilient Distributed Dataset, RDD) bezeichnet wird. Sie können zwar Code schreiben, der direkt mit RDDs arbeitet, aber die am häufigsten verwendete Datenstruktur für die Arbeit mit strukturierten Daten in Spark ist der Dataframe, der als Teil der Spark SQL-Bibliothek bereitgestellt wird. Dataframes in Spark ähneln denen der allgegenwärtigen Python-Bibliothek Pandas, sind aber für die verteilte Verarbeitungsumgebung von Spark optimiert.
Hinweis
Zusätzlich zur Dataframe-API bietet Spark SQL eine stark typisierte Dataset-API, die in Java und Scala unterstützt wird. In diesem Modul werden wir uns auf die Dataframe-API konzentrieren.
Laden von Daten in einen Dataframe
Lassen Sie uns an einem hypothetischen Beispiel untersuchen, wie Sie einen Dataframe für die Arbeit mit Daten verwenden können. Angenommen, Sie haben die folgenden Daten in einer durch Komma getrennten Textdatei namens products.csv im Datenordner in Ihrem Databricks File System (DBFS)-Speicher:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
In einem Spark-Notebook könnten Sie den folgenden PySpark-Code verwenden, um die Daten in einen Dataframe zu laden und die ersten zehn Zeilen anzuzeigen:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Die Zeile %pyspark
am Anfang wird als Magic-Befehl bezeichnet und teilt Spark mit, dass die in dieser Zelle verwendete Sprache PySpark ist. Hier ist der entsprechende Scala-Code für das Beispiel mit den Produktdaten:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
Der Magic-Befehl %spark
wird verwendet, um Scala anzugeben.
Tipp
Sie können auch die Sprache auswählen, die Sie für jede Zelle in der Notebookschnittstelle verwenden möchten.
Beide zuvor gezeigten Beispiele würden die Ausgabe wie folgt erzeugen:
ProductID | ProductName | Kategorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountainbikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountainbikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountainbikes | 3399.9900 |
... | ... | ... | ... |
Angeben eines Dataframeschemas
Im vorherigen Beispiel enthielt die erste Zeile der CSV-Datei die Spaltennamen, und Spark war in der Lage, den Datentyp jeder Spalte aus den darin enthaltenen Daten abzuleiten. Sie können auch ein explizites Schema für die Daten angeben, was nützlich ist, wenn die Spaltennamen nicht in der Datendatei enthalten sind, wie in diesem CSV-Beispiel:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Das folgende PySpark-Beispiel zeigt, wie Sie ein Schema für den Dataframe angeben, der aus einer Datei namens product-data.csv in diesem Format geladen werden soll:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Die Ergebnisse wären wieder ähnlich zu:
ProductID | ProductName | Kategorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountainbikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountainbikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountainbikes | 3399.9900 |
... | ... | ... | ... |
Filtern und Gruppieren von Dataframes
Sie können die Methoden der Dataframe-Klasse verwenden, um die darin enthaltenen Daten zu filtern, zu sortieren, zu gruppieren und anderweitig zu bearbeiten. Das folgende Codebeispiel verwendet z. B. die select-Methode, um die Spalten ProductName und ListPrice aus dem df-Dataframe mit den Produktdaten des vorherigen Beispiels abzurufen:
pricelist_df = df.select("ProductID", "ListPrice")
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Wie die meisten Methoden zur Datenbearbeitung gibt select ein neues Dataframe-Objekt zurück.
Tipp
Die Auswahl einer Teilmenge von Spalten aus einem Dataframe ist ein gängiger Vorgang, der auch mithilfe der folgenden kürzeren Syntax erreicht werden kann:
pricelist_df = df["ProductID", "ListPrice"]
Sie können Methoden miteinander „verketten“, um eine Reihe von Bearbeitungen durchzuführen, die zu einem transformierten Dataframe führen. In diesem Beispielcode werden z. B. die Methoden select und where miteinander verknüpft, um einen neuen Dataframe zu erstellen, der die Spalten ProductName und ListPrice für Produkte mit der Kategorie Mountain Bikes oder Road Bikes enthält:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539,9900 |
... | ... |
Um Daten zu gruppieren und zu aggregieren, können Sie die groupBy-Methode und Aggregatfunktionen verwenden. Der folgende PySpark-Code zählt z. B. die Anzahl der Produkte für jede Kategorie:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
Category | count |
---|---|
Lenkköpfe | 3 |
Räder | 14 |
Mountainbikes | 32 |
... | ... |
Verwenden von SQL-Ausdrücken in Spark
Die Dataframe-API ist Teil einer Spark-Bibliothek namens Spark SQL, mit der Datenanalysten SQL-Ausdrücke verwenden können, um Daten abzufragen und zu bearbeiten.
Erstellen von Datenbankobjekten im Spark-Katalog
Der Spark-Katalog ist ein Metastore für relationale Datenobjekte wie Sichten und Tabellen. Die Spark-Runtime kann den Katalog verwenden, um Code, der in einer von Spark unterstützten Sprache geschrieben wurde, problemlos mit SQL-Ausdrücken zu integrieren, die für einige Datenanalysten oder Entwickler natürlicher sind.
Eine der einfachsten Möglichkeiten, Daten in einem Dataframe für die Abfrage im Spark-Katalog verfügbar zu machen, ist das Erstellen einer temporären Sicht, wie im folgenden Codebeispiel gezeigt:
df.createOrReplaceTempView("products")
Eine Sicht ist temporär, das heißt, sie wird am Ende der aktuellen Sitzung automatisch gelöscht. Sie können auch Tabellen erstellen, die im Katalog persistiert werden, um eine Datenbank zu definieren, die mithilfe von Spark SQL abgefragt werden kann.
Hinweis
Wir werden uns in diesem Modul nicht eingehend mit Spark-Katalogtabellen befassen, aber es lohnt sich, ein paar wichtige Punkte zu erwähnen:
- Sie können eine leere Tabelle mithilfe der
spark.catalog.createTable
-Methode erstellen. Tabellen sind Metadatenstrukturen, die ihre zugrunde liegenden Daten an dem mit dem Katalog verbundenen Speicherort speichern. Wenn Sie eine Tabelle löschen, werden auch die zugrunde liegenden Daten gelöscht. - Sie können einen Dataframe als Tabelle speichern, indem Sie seine
saveAsTable
-Methode verwenden. - Sie können eine externe Tabelle mithilfe der
spark.catalog.createExternalTable
-Methode erstellen. Externe Tabellen definieren Metadaten im Katalog, beziehen ihre zugrunde liegenden Daten jedoch von einem externen Speicherort, in der Regel einem Ordner in einem Data Lake. Wenn Sie eine externe Tabelle löschen, werden die zugrunde liegenden Daten nicht gelöscht.
Verwenden der Spark SQL-API zur Abfrage von Daten
Sie können die Spark SQL-API in einem in einer beliebigen Sprache geschriebenen Code verwenden, um Daten im Katalog abzufragen. Der folgende PySpark-Code verwendet z. B. eine SQL-Abfrage, um Daten aus der Sicht Produkte als Dataframe zurückzugeben.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Die Ergebnisse des Codebeispiels würden ähnlich wie in der folgenden Tabelle aussehen:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539,9900 |
... | ... |
Verwenden von SQL-Code
Das vorherige Beispiel hat gezeigt, wie Sie die Spark SQL-API verwenden können, um SQL-Ausdrücke in Spark-Code einzubetten. In einem Notebook können Sie den Magic-Befehl %sql
auch dazu verwenden, SQL-Code auszuführen, der Objekte wie folgt im Katalog abfragt:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
Das SQL-Codebeispiel gibt ein Resultset zurück, das automatisch im Notebook als Tabelle angezeigt wird, wie die folgende:
Kategorie | ProductCount |
---|---|
Trägershorts | 3 |
Fahrradträger | 1 |
Fahrradständer | 1 |
... | ... |