Utiliser des données dans un dataframe Spark
En mode natif, Spark utilise une structure de données appelée jeu de données distribué résilient (RDD, resilient distributed dataset). Toutefois, même si vous pouvez écrire du code qui fonctionne directement avec des jeux RDD, la structure de données la plus couramment utilisée pour utiliser des données structurées dans Spark est le dataframe, qui est fourni dans le cadre de la bibliothèque Spark SQL. Les dataframes dans Spark sont similaires à ceux de la bibliothèque Pandas Python omniprésente, mais sont optimisés pour fonctionner dans l’environnement de traitement distribué de Spark.
Notes
En plus de l’API Dataframe, Spark SQL fournit une API Dataset fortement typée qui est prise en charge dans Java et Scala. Dans ce module, nous allons nous concentrer sur l’API Dataframe.
Chargement des données dans un dataframe
Explorons un exemple hypothétique afin de voir comment utiliser un dataframe pour travailler avec des données. Supposez que vous disposez des données suivantes dans un fichier texte délimité par des virgules appelé products.csv dans le dossier Files/data de votre lakehouse :
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Inférence d’un schéma
Dans un notebook Spark, vous pouvez utiliser le code PySpark suivant pour charger les données du fichier dans un dataframe et afficher les 10 premières lignes :
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
La ligne %%pyspark
au début est appelée magic et indique à Spark que le langage utilisé dans cette cellule est PySpark. Vous pouvez sélectionner le langage que vous souhaitez utiliser comme valeur par défaut dans la barre d’outils de l’interface Notebook, puis utiliser une commande magic pour remplacer ce choix pour une cellule spécifique. Par exemple, voici le code Scala équivalent pour l’exemple de données des produits :
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
La commande magic %%spark
est utilisée pour spécifier Scala.
Ces deux exemples de code produisent une sortie comme suit :
ProductID | ProductName | Catégorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | VTT | 3399.9900 |
772 | Mountain-100 Silver, 42 | VTT | 3399.9900 |
773 | Mountain-100 Silver, 44 | VTT | 3399.9900 |
... | ... | ... | ... |
Spécification d’un schéma explicite
Dans l’exemple précédent, la première ligne du fichier CSV contenait les noms de colonne, et Spark pouvait déduire le type de données de chaque colonne en se basant sur les données qu’elle contenait. Vous pouvez également spécifier un schéma explicite pour les données, ce qui est utile lorsque les noms de colonne ne sont pas inclus dans le fichier de données, comme cet exemple CSV :
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
L’exemple PySpark suivant montre comment spécifier un schéma pour que le dataframe soit chargé à partir d’un fichier appelé product-data.csv dans ce format :
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Les résultats seraient une fois de plus similaires à :
ProductID | ProductName | Catégorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | VTT | 3399.9900 |
772 | Mountain-100 Silver, 42 | VTT | 3399.9900 |
773 | Mountain-100 Silver, 44 | VTT | 3399.9900 |
... | ... | ... | ... |
Conseil
La spécification d’un schéma explicite améliore également les performances !
Filtrage et regroupement des dataframes
Vous pouvez utiliser les méthodes de la classe Dataframe pour filtrer, trier, regrouper et manipuler les données qu’elle contient. Par exemple, l’exemple de code suivant utilise la méthode select pour récupérer les colonnes ProductID et ListPrice à partir du dataframe df contenant les données de produit de l’exemple précédent :
pricelist_df = df.select("ProductID", "ListPrice")
Les résultats de cet exemple de code devraient ressembler à ceci :
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Comme la plupart des méthodes de manipulation de données, select retourne un nouvel objet de dataframe.
Conseil
La sélection d’une partie des colonnes d’un dataframe est une opération courante, qui peut également être réalisée à l’aide de la syntaxe plus courte suivante :
pricelist_df = df["ProductID", "ListPrice"]
Vous pouvez « chaîner » les méthodes ensemble pour effectuer une série de manipulations qui entraînent un dataframe transformé. Par exemple, cet exemple de code chaîne les méthodes select et where pour créer un dataframe contenant les colonnes ProductName et ListPrice des produits avec la catégorie Vélos VTT ou Vélos de route :
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Les résultats de cet exemple de code devraient ressembler à ceci :
ProductName | Catégorie | ListPrice |
---|---|---|
Mountain-100 Silver, 38 | VTT | 3399.9900 |
Road-750 Noir, 52 | Vélos de route | 539.9900 |
... | ... | ... |
Pour regrouper et agréger des données, vous pouvez utiliser la méthode groupBy et les fonctions d’agrégation. Par exemple, le code PySpark suivant compte le nombre de produits de chaque catégorie :
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Les résultats de cet exemple de code devraient ressembler à ceci :
Catégorie | count |
---|---|
Oreillettes | 3 |
Roues | 14 |
VTT | 32 |
... | ... |
Enregistrement d’un dataframe
Vous souhaiterez souvent utiliser Spark pour transformer des données brutes et enregistrer les résultats en vue de procéder à une analyse plus approfondie ou un traitement en aval. L’exemple de code suivant enregistre le dataframe dans un fichier Parquet dans le lac de données, en remplaçant l’éventuel fichier qui porte le même nom.
bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')
Notes
Le format Parquet est généralement préféré pour les fichiers de données que vous allez utiliser pour une analyse plus approfondie ou une ingestion dans un magasin analytique. Parquet est un format très efficace qui est pris en charge par la plupart des systèmes d’analytique de données à grande échelle. Parfois, votre besoin de transformation de données peut en fait simplement consister à convertir des données d’un autre format (comme CSV) vers Parquet !
Partitionnement du fichier de sortie
Le partitionnement est une technique d’optimisation qui permet à Spark d’optimiser les performances sur les nœuds Worker. Des gains de performances supplémentaires peuvent être obtenus lors du filtrage des données dans les requêtes en éliminant les E/S disque non nécessaires.
Pour enregistrer un dataframe en tant que jeu de fichiers partitionné, utilisez la méthode partitionBy lors de l’écriture des données. L’exemple suivant enregistre le dataframe bikes_df (qui contient les données de produit pour les catégories mountain bikes et road bikes) et partitionne les données par catégorie :
bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")
Les noms de dossiers générés lors du partitionnement d’un dataframe incluent le nom et la valeur de colonne de partitionnement au format column=value, de sorte que l’exemple de code crée un dossier nommé bike_data qui contient les sous-dossiers suivants :
- Category=Mountain Bikes
- Category=Road Bikes
Chaque sous-dossier contient un ou plusieurs fichiers Parquet avec les données de produit pour la catégorie appropriée.
Notes
Vous pouvez partitionner les données selon plusieurs colonnes, ce qui aboutit à une hiérarchie de dossiers pour chaque clé de partitionnement. Par exemple, vous pourriez partitionner des données de commande par année et par mois, afin que la hiérarchie de dossiers inclue un dossier pour chaque valeur des années, qui à son tour contient un sous-dossier pour chaque valeur des mois.
Charger des données partitionnées
Lors de la lecture de données partitionnées dans un dataframe, vous pouvez charger des données à partir de n’importe quel dossier de la hiérarchie en spécifiant des valeurs explicites ou des caractères génériques pour les champs partitionnés. L’exemple suivant charge des données pour des produits de la catégorie Road Bikes :
road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))
Notes
Les colonnes de partitionnement spécifiées dans le chemin de fichier sont omises dans le dataframe résultant. Les résultats produits par l’exemple de requête ne vont pas inclure la colonne Category ; la catégorie de toutes les lignes sera Road Bikes.