Verwenden von Spark zum Arbeiten mit Datendateien

Abgeschlossen

Einer der Vorteile von Spark ist, dass Sie Code in verschiedenen Programmiersprachen schreiben und ausführen können. So können Sie Ihre bereits vorhandenen Skills nutzen und die am besten geeignete Sprache für eine bestimmte Aufgabe verwenden. Die Standardsprache im neuen Azure Databricks Spark Notebook ist PySpark - eine für Spark optimierte Version von Python, die aufgrund ihrer umfassenden Unterstützung für Datenbearbeitung und -visualisierung häufig von Datenspezialisten und Analysten verwendet wird. Zusätzlich können Sie Sprachen wie Scala (eine von Java abgeleitete Sprache, die interaktiv verwendet werden kann) und SQL (eine Variante der häufig verwendeten SQL-Sprache, die in der Spark SQL-Bibliothek enthalten ist, um mit relationalen Datenstrukturen zu arbeiten) verwenden. Softwareentwickler können auch kompilierte Lösungen erstellen, die unter Spark ausgeführt werden, indem sie Frameworks wie Java verwenden.

Untersuchen von Daten mit Dataframes

Spark verwendet nativ eine Datenstruktur, die als resilientes verteiltes Dataset (Resilient Distributed Dataset, RDD) bezeichnet wird. Sie können zwar Code schreiben, der direkt mit RDDs arbeitet, aber die am häufigsten verwendete Datenstruktur für die Arbeit mit strukturierten Daten in Spark ist der Dataframe, der als Teil der Spark SQL-Bibliothek bereitgestellt wird. Dataframes in Spark ähneln denen der allgegenwärtigen Python-Bibliothek Pandas, sind aber für die verteilte Verarbeitungsumgebung von Spark optimiert.

Hinweis

Zusätzlich zur Dataframe-API bietet Spark SQL eine stark typisierte Dataset-API, die in Java und Scala unterstützt wird. In diesem Modul werden wir uns auf die Dataframe-API konzentrieren.

Laden von Daten in einen Dataframe

Lassen Sie uns an einem hypothetischen Beispiel untersuchen, wie Sie einen Dataframe für die Arbeit mit Daten verwenden können. Angenommen, Sie haben die folgenden Daten in einer durch Komma getrennten Textdatei namens products.csv im Datenordner in Ihrem Databricks File System (DBFS)-Speicher:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

In einem Spark-Notebook könnten Sie den folgenden PySpark-Code verwenden, um die Daten in einen Dataframe zu laden und die ersten zehn Zeilen anzuzeigen:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Die Zeile %pyspark am Anfang wird als Magic-Befehl bezeichnet und teilt Spark mit, dass die in dieser Zelle verwendete Sprache PySpark ist. Hier ist der entsprechende Scala-Code für das Beispiel mit den Produktdaten:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

Der Magic-Befehl %spark wird verwendet, um Scala anzugeben.

Tipp

Sie können auch die Sprache auswählen, die Sie für jede Zelle in der Notebookschnittstelle verwenden möchten.

Beide zuvor gezeigten Beispiele würden die Ausgabe wie folgt erzeugen:

ProductID ProductName Kategorie ListPrice
771 Mountain-100 Silver, 38 Mountainbikes 3399.9900
772 Mountain-100 Silver, 42 Mountainbikes 3399.9900
773 Mountain-100 Silver, 44 Mountainbikes 3399.9900
... ... ... ...

Angeben eines Dataframeschemas

Im vorherigen Beispiel enthielt die erste Zeile der CSV-Datei die Spaltennamen, und Spark war in der Lage, den Datentyp jeder Spalte aus den darin enthaltenen Daten abzuleiten. Sie können auch ein explizites Schema für die Daten angeben, was nützlich ist, wenn die Spaltennamen nicht in der Datendatei enthalten sind, wie in diesem CSV-Beispiel:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Das folgende PySpark-Beispiel zeigt, wie Sie ein Schema für den Dataframe angeben, der aus einer Datei namens product-data.csv in diesem Format geladen werden soll:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Die Ergebnisse wären wieder ähnlich zu:

ProductID ProductName Kategorie ListPrice
771 Mountain-100 Silver, 38 Mountainbikes 3399.9900
772 Mountain-100 Silver, 42 Mountainbikes 3399.9900
773 Mountain-100 Silver, 44 Mountainbikes 3399.9900
... ... ... ...

Filtern und Gruppieren von Dataframes

Sie können die Methoden der Dataframe-Klasse verwenden, um die darin enthaltenen Daten zu filtern, zu sortieren, zu gruppieren und anderweitig zu bearbeiten. Das folgende Codebeispiel verwendet z. B. die select-Methode, um die Spalten ProductName und ListPrice aus dem df-Dataframe mit den Produktdaten des vorherigen Beispiels abzurufen:

pricelist_df = df.select("ProductID", "ListPrice")

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Wie die meisten Methoden zur Datenbearbeitung gibt select ein neues Dataframe-Objekt zurück.

Tipp

Die Auswahl einer Teilmenge von Spalten aus einem Dataframe ist ein gängiger Vorgang, der auch mithilfe der folgenden kürzeren Syntax erreicht werden kann:

pricelist_df = df["ProductID", "ListPrice"]

Sie können Methoden miteinander „verketten“, um eine Reihe von Bearbeitungen durchzuführen, die zu einem transformierten Dataframe führen. In diesem Beispielcode werden z. B. die Methoden select und where miteinander verknüpft, um einen neuen Dataframe zu erstellen, der die Spalten ProductName und ListPrice für Produkte mit der Kategorie Mountain Bikes oder Road Bikes enthält:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539,9900
... ...

Um Daten zu gruppieren und zu aggregieren, können Sie die groupBy-Methode und Aggregatfunktionen verwenden. Der folgende PySpark-Code zählt z. B. die Anzahl der Produkte für jede Kategorie:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

Category count
Lenkköpfe 3
Räder 14
Mountainbikes 32
... ...

Verwenden von SQL-Ausdrücken in Spark

Die Dataframe-API ist Teil einer Spark-Bibliothek namens Spark SQL, mit der Datenanalysten SQL-Ausdrücke verwenden können, um Daten abzufragen und zu bearbeiten.

Erstellen von Datenbankobjekten im Spark-Katalog

Der Spark-Katalog ist ein Metastore für relationale Datenobjekte wie Sichten und Tabellen. Die Spark-Runtime kann den Katalog verwenden, um Code, der in einer von Spark unterstützten Sprache geschrieben wurde, problemlos mit SQL-Ausdrücken zu integrieren, die für einige Datenanalysten oder Entwickler natürlicher sind.

Eine der einfachsten Möglichkeiten, Daten in einem Dataframe für die Abfrage im Spark-Katalog verfügbar zu machen, ist das Erstellen einer temporären Sicht, wie im folgenden Codebeispiel gezeigt:

df.createOrReplaceTempView("products")

Eine Sicht ist temporär, das heißt, sie wird am Ende der aktuellen Sitzung automatisch gelöscht. Sie können auch Tabellen erstellen, die im Katalog persistiert werden, um eine Datenbank zu definieren, die mithilfe von Spark SQL abgefragt werden kann.

Hinweis

Wir werden uns in diesem Modul nicht eingehend mit Spark-Katalogtabellen befassen, aber es lohnt sich, ein paar wichtige Punkte zu erwähnen:

  • Sie können eine leere Tabelle mithilfe der spark.catalog.createTable-Methode erstellen. Tabellen sind Metadatenstrukturen, die ihre zugrunde liegenden Daten an dem mit dem Katalog verbundenen Speicherort speichern. Wenn Sie eine Tabelle löschen, werden auch die zugrunde liegenden Daten gelöscht.
  • Sie können einen Dataframe als Tabelle speichern, indem Sie seine saveAsTable-Methode verwenden.
  • Sie können eine externe Tabelle mithilfe der spark.catalog.createExternalTable-Methode erstellen. Externe Tabellen definieren Metadaten im Katalog, beziehen ihre zugrunde liegenden Daten jedoch von einem externen Speicherort, in der Regel einem Ordner in einem Data Lake. Wenn Sie eine externe Tabelle löschen, werden die zugrunde liegenden Daten nicht gelöscht.

Verwenden der Spark SQL-API zur Abfrage von Daten

Sie können die Spark SQL-API in einem in einer beliebigen Sprache geschriebenen Code verwenden, um Daten im Katalog abzufragen. Der folgende PySpark-Code verwendet z. B. eine SQL-Abfrage, um Daten aus der Sicht Produkte als Dataframe zurückzugeben.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Die Ergebnisse des Codebeispiels würden ähnlich wie in der folgenden Tabelle aussehen:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539,9900
... ...

Verwenden von SQL-Code

Das vorherige Beispiel hat gezeigt, wie Sie die Spark SQL-API verwenden können, um SQL-Ausdrücke in Spark-Code einzubetten. In einem Notebook können Sie den Magic-Befehl %sql auch dazu verwenden, SQL-Code auszuführen, der Objekte wie folgt im Katalog abfragt:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Das SQL-Codebeispiel gibt ein Resultset zurück, das automatisch im Notebook als Tabelle angezeigt wird, wie die folgende:

Kategorie ProductCount
Trägershorts 3
Fahrradträger 1
Fahrradständer 1
... ...