Trabajar con datos en un objeto DataFrame de Spark

Completado

En la unidad anterior, ha aprendido a conectarse a un origen de datos, cargar datos en un dataframe y, opcionalmente, a guardar este en un almacén de lago de datos como un archivo o tabla. Ahora vamos a explorar el dataframe con un poco más de detalle.

De forma nativa, Spark usa una estructura de datos denominada conjunto de datos distribuido resistente (RDD); pero, aunque puede escribir código que funcione directamente con RDD, la estructura de datos más usada para trabajar con datos estructurados en Spark es el objeto dataframe, que se proporciona como parte de la biblioteca Spark SQL. Los elementos dataframe de Spark son similares a los de la biblioteca Pandas de Python, pero están optimizados para funcionar en el entorno de procesamiento distribuido de Spark.

Nota

Además de Dataframe API, Spark SQL proporciona una API Dataset fuertemente tipada que se admite en Java y Scala. En este módulo se centrará en Dataframe API.

Carga de datos en un elemento dataframe

Ahora se explorará un ejemplo hipotético para ver cómo puede usar un elemento dataframe para trabajar con datos. Supongamos que tiene los siguientes datos en un archivo de texto delimitado por comas denominado products.csv en la carpeta Files/data del almacén de lago:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Inferencia de un esquema

En un cuaderno de Spark, puede usar el código PySpark siguiente para cargar los datos del archivo en un objeto DataFrame y mostrar las primeras 10 filas:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Como ya aprendió anteriormente, la línea %%pyspark al principio se denomina comando magic e indica a Spark que el lenguaje usado en esta celda es PySpark. En la mayoría de los casos, PySpark es el idioma predeterminado y, generalmente, será el que utilicemos en los ejemplos de este módulo. No obstante, por cuestiones de integridad, este es el código de Scala equivalente para el ejemplo de datos de productos:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

El %%spark mágico se usa para especificar Spark dentro de una celda de código Scala. Observe que la implementación de Scala del dataframe se comporta de forma similar a la versión de PySpark.

Los dos ejemplos de código generarían una salida similar a la siguiente:

ProductID ProductName Category ListPrice
771 Mountain-100 Silver, 38 Bicicletas de montaña 3399.9900
772 Mountain-100 Silver, 42 Bicicletas de montaña 3399.9900
773 Mountain-100 Silver, 44 Bicicletas de montaña 3399.9900
... ... ... ...

Especificación de un esquema explícito

En el ejemplo anterior, la primera fila del archivo CSV contenía los nombres de columna y Spark ha podido deducir el tipo de datos de cada columna a partir de los datos que contiene. También puede especificar un esquema explícito para los datos, lo que resulta útil cuando los nombres de columna no se incluyen en el archivo de datos, como en este archivo CSV de ejemplo:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

En el siguiente ejemplo de PySpark se muestra cómo especificar un esquema para el elemento dataframe que se va a cargar desde un archivo denominado product-data.csv en este formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

De nuevo, los resultados serán similares a los siguientes:

ProductID ProductName Category ListPrice
771 Mountain-100 Silver, 38 Bicicletas de montaña 3399.9900
772 Mountain-100 Silver, 42 Bicicletas de montaña 3399.9900
773 Mountain-100 Silver, 44 Bicicletas de montaña 3399.9900
... ... ... ...

Sugerencia

La especificación de un esquema explícito también mejora el rendimiento.

Filtrado y agrupación de elementos dataframe

Puede usar los métodos de la clase Dataframe para filtrar, ordenar, agrupar y manipular los datos que contiene. Por ejemplo, en el ejemplo de código siguiente se usa el método select para recuperar las columnas ProductID y ListPrice del elemento dataframe df que contiene datos de producto del ejemplo anterior:

pricelist_df = df.select("ProductID", "ListPrice")

Los resultados de este ejemplo de código tendrían un aspecto similar al siguiente:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Como sucede con la mayoría de los métodos de manipulación de datos, select devuelve un nuevo objeto dataframe.

Sugerencia

La selección de un subconjunto de columnas de un elemento dataframe es una operación común, que también se puede lograr mediante la siguiente sintaxis más corta:

pricelist_df = df["ProductID", "ListPrice"]

Puede "encadenar" métodos para realizar una serie de manipulaciones que generen como resultado un elemento dataframe transformado. Por ejemplo, en este código de ejemplo se encadenan los métodos select y where para crear un elemento dataframe que contenga las columnas ProductName y ListPrice para productos con una categoría de Mountain Bikes o Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Los resultados de este ejemplo de código tendrían un aspecto similar al siguiente:

ProductName Category ListPrice
Mountain-100 Silver, 38 Bicicletas de montaña 3399.9900
Road-750 Black, 52 Bicicletas de carretera 539.9900
... ... ...

Para agrupar y agregar datos, puede usar el método groupBy y las funciones de agregado. Por ejemplo, en el código de PySpark siguiente se cuenta el número de productos para cada categoría:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Los resultados de este ejemplo de código tendrían un aspecto similar al siguiente:

Category count
Tubos de dirección 3
Ruedas 14
Bicicletas de montaña 32
... ...