Compartir a través de


Conceptos básicos de PySpark

En este artículo se describen ejemplos sencillos para ilustrar el uso de PySpark. Se supone que comprende los conceptos de Apache Spark fundamentales y ejecuta comandos en un cuaderno de Azure Databricks conectado al proceso. Puede crear DataFrames mediante datos de ejemplo, realizar transformaciones básicas, incluidas las operaciones de fila y columna en estos datos, combinar varios DataFrames y agregar estos datos, visualizar estos datos y, a continuación, guardarlos en una tabla o archivo.

Carga de datos

Algunos ejemplos de este artículo usan datos de ejemplo proporcionados por Databricks para demostrar el uso de DataFrames para cargar, transformar y guardar datos. Si desea usar sus propios datos que aún no están en Databricks, puede cargarlos primero y crear un DataFrame a partir de él. Consulte Crear o modificar una tabla mediante la carga de archivos y Carga de archivos en un volumen de catálogo de Unity.

Acerca de los datos de ejemplo de Databricks

Databricks proporciona datos de ejemplo en el catálogo samples y en el directorio /databricks-datasets.

  • Para acceder a los datos de ejemplo del catálogo samples, use el formato samples.<schema-name>.<table-name>. En este artículo se usan tablas del esquema samples.tpch, que contiene datos de una empresa ficticia. La tabla customer contiene información sobre los clientes y orders contiene información sobre los pedidos realizados por esos clientes.
  • Use dbutils.fs.ls para explorar los datos en /databricks-datasets. Use Spark SQL o DataFrames para consultar datos en esta ubicación mediante rutas de acceso de archivo. Para obtener más información sobre los datos de ejemplo proporcionados por Databricks, consulte Conjuntos de datos de ejemplo.

Importar tipos de datos

Muchas operaciones de PySpark requieren que use funciones SQL o interactúe con tipos nativos de Spark. Solo puede importar directamente esas funciones y tipos que necesite, o bien puede importar todo el módulo.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Dado que algunas funciones importadas podrían invalidar las funciones integradas de Python, algunos usuarios eligen importar estos módulos mediante un alias. En los ejemplos siguientes se muestra un alias común que se usa en ejemplos de código de Apache Spark:

import pyspark.sql.types as T
import pyspark.sql.functions as F

Para obtener una lista completa de los tipos de datos, consulte Tipos de datos de Spark.

Para obtener una lista completa de las funciones SQL de PySpark, consulte Funciones de Spark.

Crear un DataFrame

Hay varias maneras de crear un DataFrame. Normalmente, se define un DataFrame en un origen de datos, como una tabla o una colección de archivos. A continuación, como se describe en la sección Conceptos básicos de Apache Spark, use una acción, como display, para desencadenar las transformaciones que se van a ejecutar. El método display genera DataFrames.

Crear un DataFrame con valores especificados

Para crear un DataFrame con valores especificados, use el método createDataFrame, donde las filas se expresan como una lista de tuplas:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Observe en la salida que los tipos de datos de columnas de df_children se deducen automáticamente. También puede especificar los tipos agregando un esquema. Los esquemas se definen mediante el StructType que se compone de StructFields que especifican el nombre, el tipo de datos y una marca booleana que indica si contienen un valor null o no. Debe importar tipos de datos de pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Crear un DataFrame a partir de una tabla en Unity Catalog

Para crear un DataFrame a partir de una tabla en Unity Catalog, use el método table que identifica la tabla con el formato <catalog-name>.<schema-name>.<table-name>. Haga clic en Catálogo en la barra de navegación izquierda para usar el Explorador de catálogos para ir a la tabla. Haga clic en ella y seleccione Copiar ruta de acceso de tabla para insertar la ruta de acceso de la tabla en el cuaderno.

En el ejemplo siguiente se carga la tabla samples.tpch.customer, pero también puede proporcionar la ruta de acceso a su propia tabla.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Crear un DataFrame a partir de un archivo cargado

Para crear un DataFrame a partir de un archivo que cargó en volúmenes de Unity Catalog, use la propiedad read. Este método devuelve un DataFrameReader, que puede usar para leer el formato adecuado. Haga clic en la opción de catálogo de la barra lateral pequeña de la izquierda y use el explorador de catálogos para localizar el archivo. Selecciónelo y, a continuación, haga clic en Copiar ruta de acceso del archivo de volumen.

El ejemplo siguiente lee de un archivo *.csv, pero DataFrameReader admite la carga de archivos en muchos otros formatos. Consulte Métodos de DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Para obtener más información sobre los volúmenes de Unity Catalog, vea ¿Qué son los volúmenes del Unity Catalog?.

Crear un DataFrame a partir de una respuesta JSON

Para crear un DataFrame a partir de una carga de respuesta JSON devuelta por una API de REST, use el paquete de Python requests para consultar y analizar la respuesta. Debe importar el paquete para usarlo. En este ejemplo se usan datos de la base de datos de la aplicación de medicamentos de la Food and Drug Administration de los Estados Unidos.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Para obtener información sobre cómo trabajar con JSON y otros datos semiestructurados en Databricks, consulte Modelado de datos semiestructurados.

Seleccionar un campo o objeto JSON

Para seleccionar un campo u objeto específico del JSON convertido, use la notación []. Por ejemplo, para seleccionar el campo products que es una matriz de productos:

display(df_drugs.select(df_drugs["products"]))

También puede encadenar llamadas de método para recorrer varios campos. Por ejemplo, para generar el nombre de la organización del primer producto en una aplicación de medicamento:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Crear un DataFrame a partir de un archivo

Para demostrar cómo crear un DataFrame a partir de un archivo, en este ejemplo se cargan datos CSV en el directorio /databricks-datasets.

Para navegar a los conjuntos de datos de ejemplo, puede usar los comandos del sistema de archivos de Databricks Utilties. En el ejemplo siguiente se usa dbutils para enumerar los conjuntos de datos disponibles en /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

Como alternativa, puede usar %fs para acceder a los comandos del sistema de archivos de la CLI de Databricks, como se muestra en el ejemplo siguiente:

%fs ls '/databricks-datasets'

Para crear un DataFrame a partir de un archivo o directorio de archivos, especifique la ruta de acceso en el método load:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Transformar datos con DataFrames

Los DataFrames facilitan la transformación de datos mediante métodos integrados para ordenar, filtrar y agregar datos. Muchas transformaciones no se especifican como métodos en DataFrames, sino que se proporcionan en el paquete spark.sql.functions. Consulte Funciones de SQL de Databricks Spark.

Operaciones de columna

Spark proporciona muchas operaciones básicas de columna:

Sugerencia

Para generar todas las columnas de un DataFrame, use columns, por ejemplo df_customer.columns.

Seleccionar columnas

Puede seleccionar columnas específicas mediante select y col. La función col está en el submódulo pyspark.sql.functions.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

También puede hacer referencia a una columna mediante expr, que toma una expresión definida como cadena:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

También puede usar selectExpr, que acepta expresiones SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Para seleccionar columnas mediante un literal de cadena, haga lo siguiente:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Para seleccionar explícitamente una columna de un DataFrame específico, puede usar el operador [] o el operador .. (El operador . no se puede usar para seleccionar columnas que empiecen por un entero, o que contengan un espacio o un carácter especial). Esto puede resultar especialmente útil cuando se combinan DataFrames en los que algunas columnas tienen el mismo nombre.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Crear columnas

Para crear una columna, use el método withColumn. En el ejemplo siguiente se crea una nueva columna que contiene un valor booleano en función de si el saldo de la cuenta de cliente c_acctbal supera 1000:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Cambiar el nombre de las columnas

Para cambiar el nombre de una columna, use el método withColumnRenamed, que acepta los nombres de columna existentes y nuevos:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

El método alias resulta especialmente útil cuando desea cambiar el nombre de las columnas como parte de las agregaciones:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Tipos de columna de conversión

En algunos casos, puede que desee cambiar el tipo de datos de una o varias de las columnas del DataFrame. Para ello, use el método cast para convertir entre tipos de datos de columna. En el ejemplo siguiente se muestra cómo convertir una columna de un entero a un tipo de cadena mediante el método col para hacer referencia a una columna:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Quitar columnas

Para quitar columnas, puede omitir columnas durante una selección o select(*) except o puede usar el método drop:

df_customer_flag_renamed.drop("balance_flag_renamed")

También puede quitar varias columnas a la vez:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Operaciones de fila

Spark proporciona muchas operaciones básicas de fila:

Filtrar filas

Para filtrar filas, use el método filter o where en un DataFrame para devolver solo determinadas filas. Para identificar una columna en la que se va a filtrar, use el método col o una expresión que se evalúe como una columna.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Para filtrar por varias condiciones, use operadores lógicos. Por ejemplo, & y | le permiten usar condiciones AND y OR, respectivamente. En el ejemplo siguiente se filtran las filas en las que c_nationkey es igual a 20 y c_acctbal es mayor que 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Quitar filas duplicadas

Para desduplicar filas, use distinct, que devuelve solo las filas únicas.

df_unique = df_customer.distinct()

Controlar valores null

Para controlar valores null, quite filas que contengan valores null mediante el método na.drop. Este método le permite especificar si desea quitar filas que contengan valores null any o valores null all.

Para quitar los valores null, use cualquiera de los ejemplos siguientes.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Si solo desea filtrar las filas que contienen todos los valores null, use lo siguiente:

df_customer_no_nulls = df_customer.na.drop("all")

Puede aplicar esto para un subconjunto de columnas especificando esto, como se muestra a continuación:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Para rellenar los valores que faltan, use el método fill. Puede optar por aplicar esto a todas las columnas o a un subconjunto de columnas. En el ejemplo siguiente, los saldos de la cuenta que tienen un valor null para su saldo de cuenta c_acctbal se rellenan con 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Para reemplazar cadenas por otros valores, use el método replace. En el ejemplo siguiente, las cadenas de dirección vacías se reemplazan por la palabra UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Anexión de filas

Para anexar filas, debe usar el método union para crear un nuevo DataFrame. En el ejemplo siguiente, el DataFrame df_that_one_customer creado anteriormente y df_filtered_customer se combinan, lo que devuelve un DataFrame con tres clientes:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Nota:

También puede combinar DataFrames escribiéndolos en una tabla y anexando nuevas filas. En el caso de las cargas de trabajo de producción, el procesamiento incremental de orígenes de datos en una tabla de destino puede reducir drásticamente la latencia y los costes de proceso a medida que los datos crecen en tamaño. Consulte Ingesta de datos en una instancia de Databricks Lakehouse.

Ordenar filas

Importante

La ordenación puede ser costosa a gran escala y, si almacena datos ordenados y vuelve a cargar los datos con Spark, no se garantiza el orden. Asegúrate de usar la ordenación intencionadamente.

Para ordenar las filas por una o varias columnas, use el método sort o orderBy. De manera predeterminada, estos métodos se ordenan de la A a la Z:

df_customer.orderBy(col("c_acctbal"))

Para filtrar en orden de la Z a la A, use desc:

df_customer.sort(col("c_custkey").desc())

En el ejemplo siguiente se muestra cómo ordenar en dos columnas:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Para limitar el número de filas que se van a devolver una vez ordenado el DataFrame, use el método limit. En el ejemplo siguiente solo se muestran los 10 resultados principales:

display(df_sorted.limit(10))

Combinar DataFrames

Para combinar dos o más DataFrames, use el método join. Puede especificar cómo desea que los DataFrames se unan en los parámetros how (tipo de combinación) y on (en qué columnas basar la combinación). Entre los tipos de combinación comunes se incluyen:

  • inner: este es el valor predeterminado del tipo de combinación, que devuelve un DataFrame que mantiene solo las filas donde hay una coincidencia para el parámetro on en los DataFrames.
  • left: mantiene todas las filas del primer DataFrame especificado y solo las filas del segundo DataFrame especificado que tienen una coincidencia con el primero.
  • outer: una combinación externa mantiene todas las filas de ambos DataFrames independientemente de la coincidencia.

Para obtener información detallada sobre las combinaciones, consulte Trabajar con combinaciones en Azure Databricks. Para obtener una lista de las combinaciones admitidas en PySpark, consulte Combinaciones de DataFrame.

En el ejemplo siguiente se devuelve un solo DataFrame donde cada fila del DataFrame orders se combina con la fila correspondiente del DataFrame customers. Se usa una combinación interna, ya que se espera que cada pedido corresponda exactamente a un cliente.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Para combinar en varias condiciones, use operadores booleanos como & y | para especificar AND y OR, respectivamente. El ejemplo siguiente agrega una condición adicional, filtrando solo las filas que tienen o_totalprice mayor que 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Adición de datos

Para agregar datos en un DataFrame, similar a GROUP BY en SQL, use el método groupBy para especificar columnas por las que agrupar y el método agg para especificar agregaciones. Importe agregaciones comunes, como avg, sum, max y min desde pyspark.sql.functions. En el ejemplo siguiente se muestra el saldo medio de los clientes por segmento de mercado:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Algunas agregaciones son acciones, lo que significa que desencadenan cálculos. En este caso, no es necesario usar otras acciones para generar resultados.

Para contar filas en un DataFrame, use el método count:

df_customer.count()

Encadenar llamadas

Los métodos que transforman DataFrames devuelven DataFrames y Spark no actúa en las transformaciones hasta que se llama a las acciones. Esta evaluación diferida significa que puede encadenar varios métodos para mayor comodidad y legibilidad. En el ejemplo siguiente se muestra cómo encadenar el filtrado, la agregación y el orden:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Visualizar el DataFrame

Para visualizar un DataFrame en un cuaderno, haga clic en el signo + situado junto a la tabla situada en la parte superior izquierda del DataFrame y, a continuación, seleccione Visualización para agregar uno o varios gráficos basados en el DataFrame. Para obtener más información sobre las visualizaciones, consulte Visualizaciones en cuadernos de Databricks.

display(df_order)

Para realizar visualizaciones adicionales, Databricks recomienda usar API de Pandas para Spark. .pandas_api() permite convertir a la API de Pandas correspondiente para un DataFrame de Spark. Para obtener más información, consulte API de Pandas en Spark.

Guardar sus datos

Una vez que haya transformado los datos, puede guardarlos mediante los métodos DataFrameWriter. Puede encontrar una lista completa de estos métodos en DataFrameWriter. Las siguientes secciones muestran cómo guardar su DataFrame como una tabla y como una colección de archivos de datos.

Guardar el DataFrame como una tabla

Para guardar el DataFrame como una tabla en Unity Catalog, use el método write.saveAsTable y especifique la ruta de acceso con el formato <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Escribir el DataFrame como CSV

Para escribir el DataFrame en el formato *.csv, use el método write.csv, especificando el formato y las opciones. De forma predeterminada, si los datos existen en la ruta de acceso especificada, se produce un error en la operación de escritura. Puede especificar uno de los siguientes modos para realizar una acción diferente:

  • overwrite sobrescribe todos los datos existentes en la ruta de acceso de destino con el contenido de DataFrame.
  • append anexa el contenido del DataFrame a los datos de la ruta de acceso de destino.
  • ignore produce un error silencioso en la escritura si existen datos en la ruta de acceso de destino.

En el ejemplo siguiente se muestra la sobrescritura de datos con contenido de DataFrame como archivos CSV:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Pasos siguientes

Para aprovechar más funcionalidades de Spark en Databricks, consulte: