Análisis de datos con Spark
Una de las ventajas de usar Spark es que puede escribir y ejecutar código en varios lenguajes de programación, lo que le permite usar las aptitudes de programación que ya tiene y el lenguaje más adecuado para una tarea determinada. El lenguaje predeterminado en un cuaderno nuevo de Spark de Azure Synapse Analytics es PySpark, una versión de Python optimizada para Spark, que suelen usar los científicos y analistas de datos debido a su sólida compatibilidad con la manipulación y visualización de datos. Además, puede usar lenguajes como Scala (un lenguaje derivado de Java que se puede utilizar de forma interactiva) y SQL (una variante del lenguaje SQL usado habitualmente incluido en la biblioteca Spark SQL para trabajar con estructuras de datos relacionales). Los ingenieros de software también pueden crear soluciones compiladas que se ejecutan en Spark mediante marcos como Java y Microsoft .NET.
Exploración de datos con elementos dataframe
De forma nativa, Spark usa una estructura de datos denominada conjunto de datos distribuido resistente (RDD); pero aunque puede escribir código que funcione directamente con RDD, la estructura de datos más usada para trabajar con datos estructurados en Spark es el elemento dataframe, que se proporciona como parte de la biblioteca Spark SQL. Los elementos dataframe de Spark son similares a los de la biblioteca Pandas de Python, pero están optimizados para funcionar en el entorno de procesamiento distribuido de Spark.
Nota
Además de Dataframe API, Spark SQL proporciona una API Dataset fuertemente tipada que se admite en Java y Scala. En este módulo se centrará en Dataframe API.
Carga de datos en un elemento dataframe
Ahora se explorará un ejemplo hipotético para ver cómo puede usar un elemento dataframe para trabajar con datos. Imagine que tiene los siguientes datos en un archivo de texto delimitado por comas denominado products.csv en la cuenta de almacenamiento principal para un área de trabajo de Azure Synapse Analytics:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
En un cuaderno de Spark, puede usar el siguiente código de PySpark para cargar los datos en un elemento dataframe y mostrar las primeras 10 filas:
%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
format='csv',
header=True
)
display(df.limit(10))
La línea %%pyspark
al principio se denomina comando magic e indica a Spark que el lenguaje usado en esta celda es PySpark. Puede seleccionar el lenguaje que quiera usar como predeterminado en la barra de herramientas de la interfaz de Notebook y, después, usar un comando magic a fin de invalidar esa opción para una celda específica. Por ejemplo, este es el código de Scala equivalente para el ejemplo de datos de productos:
%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))
El comando magic %%spark
se usa para especificar Scala.
Los dos ejemplos de código generarían una salida similar a la siguiente:
ProductID | ProductName | Category | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Bicicletas de montaña | 3399.9900 |
772 | Mountain-100 Silver, 42 | Bicicletas de montaña | 3399.9900 |
773 | Mountain-100 Silver, 44 | Bicicletas de montaña | 3399.9900 |
... | ... | ... | ... |
Especificación de un esquema de elementos dataframe
En el ejemplo anterior, la primera fila del archivo CSV contenía los nombres de columna y Spark ha podido deducir el tipo de datos de cada columna a partir de los datos que contiene. También puede especificar un esquema explícito para los datos, lo que resulta útil cuando los nombres de columna no se incluyen en el archivo de datos, como en este archivo CSV de ejemplo:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
En el siguiente ejemplo de PySpark se muestra cómo especificar un esquema para el elemento dataframe que se va a cargar desde un archivo denominado product-data.csv en este formato:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
De nuevo, los resultados serán similares a los siguientes:
ProductID | ProductName | Category | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Bicicletas de montaña | 3399.9900 |
772 | Mountain-100 Silver, 42 | Bicicletas de montaña | 3399.9900 |
773 | Mountain-100 Silver, 44 | Bicicletas de montaña | 3399.9900 |
... | ... | ... | ... |
Filtrado y agrupación de elementos dataframe
Puede usar los métodos de la clase Dataframe para filtrar, ordenar, agrupar y manipular los datos que contiene. Por ejemplo, en el ejemplo de código siguiente se usa el método select para recuperar las columnas ProductName y ListPrice del elemento dataframe df que contiene datos de producto del ejemplo anterior:
pricelist_df = df.select("ProductID", "ListPrice")
Los resultados de este ejemplo de código tendrían un aspecto similar al siguiente:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Como sucede con la mayoría de los métodos de manipulación de datos, select devuelve un nuevo objeto dataframe.
Sugerencia
La selección de un subconjunto de columnas de un elemento dataframe es una operación común, que también se puede lograr mediante la siguiente sintaxis más corta:
pricelist_df = df["ProductID", "ListPrice"]
Puede "encadenar" métodos para realizar una serie de manipulaciones que generen como resultado un elemento dataframe transformado. Por ejemplo, en este código de ejemplo se encadenan los métodos select y where para crear un elemento dataframe que contenga las columnas ProductName y ListPrice para productos con una categoría de Mountain Bikes o Road Bikes:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Los resultados de este ejemplo de código tendrían un aspecto similar al siguiente:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
Para agrupar y agregar datos, puede usar el método groupBy y las funciones de agregado. Por ejemplo, en el código de PySpark siguiente se cuenta el número de productos para cada categoría:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Los resultados de este ejemplo de código tendrían un aspecto similar al siguiente:
Category | count |
---|---|
Tubos de dirección | 3 |
Ruedas | 14 |
Bicicletas de montaña | 32 |
... | ... |
Uso de expresiones SQL en Spark
Dataframe API forma parte de una biblioteca de Spark denominada Spark SQL, que permite a los analistas de datos usar expresiones SQL para consultar y manipular datos.
Creación de objetos de base de datos en el catálogo de Spark
El catálogo de Spark es un metastore para objetos de datos relacionales, como vistas y tablas. El entorno de ejecución de Spark puede usar el catálogo para integrar sin problemas el código escrito en cualquier lenguaje compatible con Spark con expresiones SQL que pueden ser más naturales para algunos analistas de datos o desarrolladores.
Una de las formas más sencillas de hacer que los datos de un elemento dataframe estén disponibles para realizar consultas en el catálogo de Spark consiste en crear una vista temporal, como se muestra en el ejemplo de código siguiente:
df.createOrReplaceTempView("products")
Una vista es temporal, lo que significa que se elimina automáticamente al final de la sesión actual. También puede crear tablas que se conserven en el catálogo para definir una base de datos que se puede consultar mediante Spark SQL.
Nota
En este módulo no se explorarán las tablas de catálogo de Spark en profundidad, pero merece la pena dedicar tiempo a resaltar algunos puntos clave:
- Puede crear una tabla vacía mediante el método
spark.catalog.createTable
. Las tablas son estructuras de metadatos que almacenan sus datos subyacentes en la ubicación de almacenamiento asociada al catálogo. Al eliminar una tabla también se eliminan sus datos subyacentes. - Puede guardar un elemento dataframe como una tabla mediante su método
saveAsTable
. - Puede crear una tabla externa mediante el método
spark.catalog.createExternalTable
. Las tablas externas definen metadatos en el catálogo, pero obtienen sus datos subyacentes de una ubicación de almacenamiento externa; normalmente, una carpeta de un lago de datos. La eliminación de una tabla externa no elimina los datos subyacentes.
Uso de Spark SQL API para consultar datos
Puede usar Spark SQL API en código escrito en cualquier lenguaje para consultar datos en el catálogo. Por ejemplo, en el código de PySpark siguiente se usa una consulta de SQL para devolver datos de la vista products como un elemento dataframe.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Los resultados del ejemplo de código serían similares a la tabla siguiente:
ProductID | ProductName | ListPrice |
---|---|---|
38 | Mountain-100 Silver, 38 | 3399.9900 |
52 | Road-750 Black, 52 | 539.9900 |
... | ... | ... |
Uso de código SQL
En el ejemplo anterior se ha mostrado cómo usar Spark SQL API para insertar expresiones SQL en código de Spark. En un cuaderno, también puede usar el comando magic %%sql
para ejecutar código de SQL que consulte objetos en el catálogo, de la siguiente manera:
%%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
El ejemplo de código SQL devuelve un conjunto de resultados que se muestra automáticamente en el cuaderno como una tabla, como la siguiente:
Category | ProductCount |
---|---|
Pantalones de ciclismo | 3 |
Bastidores de bicicletas | 1 |
Soportes de bicicletas | 1 |
... | ... |