Utiliser des données dans un dataframe Spark

Effectué

En mode natif, Spark utilise une structure de données appelée jeu de données distribué résilient (RDD, resilient distributed dataset). Toutefois, même si vous pouvez écrire du code qui fonctionne directement avec des jeux RDD, la structure de données la plus couramment utilisée pour utiliser des données structurées dans Spark est le dataframe, qui est fourni dans le cadre de la bibliothèque Spark SQL. Les dataframes dans Spark sont similaires à ceux de la bibliothèque Pandas Python omniprésente, mais sont optimisés pour fonctionner dans l’environnement de traitement distribué de Spark.

Notes

En plus de l’API Dataframe, Spark SQL fournit une API Dataset fortement typée qui est prise en charge dans Java et Scala. Dans ce module, nous allons nous concentrer sur l’API Dataframe.

Chargement des données dans un dataframe

Explorons un exemple hypothétique afin de voir comment utiliser un dataframe pour travailler avec des données. Supposez que vous disposez des données suivantes dans un fichier texte délimité par des virgules appelé products.csv dans le dossier Files/data de votre lakehouse :

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Inférence d’un schéma

Dans un notebook Spark, vous pouvez utiliser le code PySpark suivant pour charger les données du fichier dans un dataframe et afficher les 10 premières lignes :

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

La ligne %%pyspark au début est appelée magic et indique à Spark que le langage utilisé dans cette cellule est PySpark. Vous pouvez sélectionner le langage que vous souhaitez utiliser comme valeur par défaut dans la barre d’outils de l’interface Notebook, puis utiliser une commande magic pour remplacer ce choix pour une cellule spécifique. Par exemple, voici le code Scala équivalent pour l’exemple de données des produits :

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

La commande magic %%spark est utilisée pour spécifier Scala.

Ces deux exemples de code produisent une sortie comme suit :

ProductID ProductName Catégorie ListPrice
771 Mountain-100 Silver, 38 VTT 3399.9900
772 Mountain-100 Silver, 42 VTT 3399.9900
773 Mountain-100 Silver, 44 VTT 3399.9900
... ... ... ...

Spécification d’un schéma explicite

Dans l’exemple précédent, la première ligne du fichier CSV contenait les noms de colonne, et Spark pouvait déduire le type de données de chaque colonne en se basant sur les données qu’elle contenait. Vous pouvez également spécifier un schéma explicite pour les données, ce qui est utile lorsque les noms de colonne ne sont pas inclus dans le fichier de données, comme cet exemple CSV :

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

L’exemple PySpark suivant montre comment spécifier un schéma pour que le dataframe soit chargé à partir d’un fichier appelé product-data.csv dans ce format :

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Les résultats seraient une fois de plus similaires à :

ProductID ProductName Catégorie ListPrice
771 Mountain-100 Silver, 38 VTT 3399.9900
772 Mountain-100 Silver, 42 VTT 3399.9900
773 Mountain-100 Silver, 44 VTT 3399.9900
... ... ... ...

Conseil

La spécification d’un schéma explicite améliore également les performances !

Filtrage et regroupement des dataframes

Vous pouvez utiliser les méthodes de la classe Dataframe pour filtrer, trier, regrouper et manipuler les données qu’elle contient. Par exemple, l’exemple de code suivant utilise la méthode select pour récupérer les colonnes ProductID et ListPrice à partir du dataframe df contenant les données de produit de l’exemple précédent :

pricelist_df = df.select("ProductID", "ListPrice")

Les résultats de cet exemple de code devraient ressembler à ceci :

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Comme la plupart des méthodes de manipulation de données, select retourne un nouvel objet de dataframe.

Conseil

La sélection d’une partie des colonnes d’un dataframe est une opération courante, qui peut également être réalisée à l’aide de la syntaxe plus courte suivante :

pricelist_df = df["ProductID", "ListPrice"]

Vous pouvez « chaîner » les méthodes ensemble pour effectuer une série de manipulations qui entraînent un dataframe transformé. Par exemple, cet exemple de code chaîne les méthodes select et where pour créer un dataframe contenant les colonnes ProductName et ListPrice des produits avec la catégorie Vélos VTT ou Vélos de route :

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Les résultats de cet exemple de code devraient ressembler à ceci :

ProductName Catégorie ListPrice
Mountain-100 Silver, 38 VTT 3399.9900
Road-750 Noir, 52 Vélos de route 539.9900
... ... ...

Pour regrouper et agréger des données, vous pouvez utiliser la méthode groupBy et les fonctions d’agrégation. Par exemple, le code PySpark suivant compte le nombre de produits de chaque catégorie :

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Les résultats de cet exemple de code devraient ressembler à ceci :

Catégorie count
Oreillettes 3
Roues 14
VTT 32
... ...

Enregistrement d’un dataframe

Vous souhaiterez souvent utiliser Spark pour transformer des données brutes et enregistrer les résultats en vue de procéder à une analyse plus approfondie ou un traitement en aval. L’exemple de code suivant enregistre le dataframe dans un fichier Parquet dans le lac de données, en remplaçant l’éventuel fichier qui porte le même nom.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Notes

Le format Parquet est généralement préféré pour les fichiers de données que vous allez utiliser pour une analyse plus approfondie ou une ingestion dans un magasin analytique. Parquet est un format très efficace qui est pris en charge par la plupart des systèmes d’analytique de données à grande échelle. Parfois, votre besoin de transformation de données peut en fait simplement consister à convertir des données d’un autre format (comme CSV) vers Parquet !

Partitionnement du fichier de sortie

Le partitionnement est une technique d’optimisation qui permet à Spark d’optimiser les performances sur les nœuds Worker. Des gains de performances supplémentaires peuvent être obtenus lors du filtrage des données dans les requêtes en éliminant les E/S disque non nécessaires.

Pour enregistrer un dataframe en tant que jeu de fichiers partitionné, utilisez la méthode partitionBy lors de l’écriture des données. L’exemple suivant enregistre le dataframe bikes_df (qui contient les données de produit pour les catégories mountain bikes et road bikes) et partitionne les données par catégorie :

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

Les noms de dossiers générés lors du partitionnement d’un dataframe incluent le nom et la valeur de colonne de partitionnement au format column=value, de sorte que l’exemple de code crée un dossier nommé bike_data qui contient les sous-dossiers suivants :

  • Category=Mountain Bikes
  • Category=Road Bikes

Chaque sous-dossier contient un ou plusieurs fichiers Parquet avec les données de produit pour la catégorie appropriée.

Notes

Vous pouvez partitionner les données selon plusieurs colonnes, ce qui aboutit à une hiérarchie de dossiers pour chaque clé de partitionnement. Par exemple, vous pourriez partitionner des données de commande par année et par mois, afin que la hiérarchie de dossiers inclue un dossier pour chaque valeur des années, qui à son tour contient un sous-dossier pour chaque valeur des mois.

Charger des données partitionnées

Lors de la lecture de données partitionnées dans un dataframe, vous pouvez charger des données à partir de n’importe quel dossier de la hiérarchie en spécifiant des valeurs explicites ou des caractères génériques pour les champs partitionnés. L’exemple suivant charge des données pour des produits de la catégorie Road Bikes :

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Notes

Les colonnes de partitionnement spécifiées dans le chemin de fichier sont omises dans le dataframe résultant. Les résultats produits par l’exemple de requête ne vont pas inclure la colonne Category ; la catégorie de toutes les lignes sera Road Bikes.