Conceptos básicos de PySpark
En este artículo se describen ejemplos sencillos para ilustrar el uso de PySpark. Se supone que comprende los conceptos de Apache Spark fundamentales y ejecuta comandos en un cuaderno de Azure Databricks conectado al proceso. Puede crear DataFrames mediante datos de ejemplo, realizar transformaciones básicas, incluidas las operaciones de fila y columna en estos datos, combinar varios DataFrames y agregar estos datos, visualizar estos datos y, a continuación, guardarlos en una tabla o archivo.
Carga de datos
Algunos ejemplos de este artículo usan datos de ejemplo proporcionados por Databricks para demostrar el uso de DataFrames para cargar, transformar y guardar datos. Si desea usar sus propios datos que aún no están en Databricks, puede cargarlos primero y crear un DataFrame a partir de él. Consulte Crear o modificar una tabla mediante la carga de archivos y Carga de archivos en un volumen de catálogo de Unity.
Acerca de los datos de ejemplo de Databricks
Databricks proporciona datos de ejemplo en el catálogo samples
y en el directorio /databricks-datasets
.
- Para acceder a los datos de ejemplo del catálogo
samples
, use el formatosamples.<schema-name>.<table-name>
. En este artículo se usan tablas del esquemasamples.tpch
, que contiene datos de una empresa ficticia. La tablacustomer
contiene información sobre los clientes yorders
contiene información sobre los pedidos realizados por esos clientes. - Use
dbutils.fs.ls
para explorar los datos en/databricks-datasets
. Use Spark SQL o DataFrames para consultar datos en esta ubicación mediante rutas de acceso de archivo. Para obtener más información sobre los datos de ejemplo proporcionados por Databricks, consulte Conjuntos de datos de ejemplo.
Importar tipos de datos
Muchas operaciones de PySpark requieren que use funciones SQL o interactúe con tipos nativos de Spark. Solo puede importar directamente esas funciones y tipos que necesite, o bien puede importar todo el módulo.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
Dado que algunas funciones importadas podrían invalidar las funciones integradas de Python, algunos usuarios eligen importar estos módulos mediante un alias. En los ejemplos siguientes se muestra un alias común que se usa en ejemplos de código de Apache Spark:
import pyspark.sql.types as T
import pyspark.sql.functions as F
Para obtener una lista completa de los tipos de datos, consulte Tipos de datos de Spark.
Para obtener una lista completa de las funciones SQL de PySpark, consulte Funciones de Spark.
Crear un DataFrame
Hay varias maneras de crear un DataFrame. Normalmente, se define un DataFrame en un origen de datos, como una tabla o una colección de archivos. A continuación, como se describe en la sección Conceptos básicos de Apache Spark, use una acción, como display
, para desencadenar las transformaciones que se van a ejecutar. El método display
genera DataFrames.
Crear un DataFrame con valores especificados
Para crear un DataFrame con valores especificados, use el método createDataFrame
, donde las filas se expresan como una lista de tuplas:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Observe en la salida que los tipos de datos de columnas de df_children
se deducen automáticamente. También puede especificar los tipos agregando un esquema. Los esquemas se definen mediante el StructType
que se compone de StructFields
que especifican el nombre, el tipo de datos y una marca booleana que indica si contienen un valor null o no. Debe importar tipos de datos de pyspark.sql.types
.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Crear un DataFrame a partir de una tabla en Unity Catalog
Para crear un DataFrame a partir de una tabla en Unity Catalog, use el método table
que identifica la tabla con el formato <catalog-name>.<schema-name>.<table-name>
. Haga clic en Catálogo en la barra de navegación izquierda para usar el Explorador de catálogos para ir a la tabla. Haga clic en ella y seleccione Copiar ruta de acceso de tabla para insertar la ruta de acceso de la tabla en el cuaderno.
En el ejemplo siguiente se carga la tabla samples.tpch.customer
, pero también puede proporcionar la ruta de acceso a su propia tabla.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Crear un DataFrame a partir de un archivo cargado
Para crear un DataFrame a partir de un archivo que cargó en volúmenes de Unity Catalog, use la propiedad read
. Este método devuelve un DataFrameReader
, que puede usar para leer el formato adecuado. Haga clic en la opción de catálogo de la barra lateral pequeña de la izquierda y use el explorador de catálogos para localizar el archivo. Selecciónelo y, a continuación, haga clic en Copiar ruta de acceso del archivo de volumen.
El ejemplo siguiente lee de un archivo *.csv
, pero DataFrameReader
admite la carga de archivos en muchos otros formatos. Consulte Métodos de DataFrameReader.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Para obtener más información sobre los volúmenes de Unity Catalog, vea ¿Qué son los volúmenes del Unity Catalog?.
Crear un DataFrame a partir de una respuesta JSON
Para crear un DataFrame a partir de una carga de respuesta JSON devuelta por una API de REST, use el paquete de Python requests
para consultar y analizar la respuesta. Debe importar el paquete para usarlo. En este ejemplo se usan datos de la base de datos de la aplicación de medicamentos de la Food and Drug Administration de los Estados Unidos.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Para obtener información sobre cómo trabajar con JSON y otros datos semiestructurados en Databricks, consulte Modelado de datos semiestructurados.
Seleccionar un campo o objeto JSON
Para seleccionar un campo u objeto específico del JSON convertido, use la notación []
. Por ejemplo, para seleccionar el campo products
que es una matriz de productos:
display(df_drugs.select(df_drugs["products"]))
También puede encadenar llamadas de método para recorrer varios campos. Por ejemplo, para generar el nombre de la organización del primer producto en una aplicación de medicamento:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Crear un DataFrame a partir de un archivo
Para demostrar cómo crear un DataFrame a partir de un archivo, en este ejemplo se cargan datos CSV en el directorio /databricks-datasets
.
Para navegar a los conjuntos de datos de ejemplo, puede usar los comandos del sistema de archivos de Databricks Utilties. En el ejemplo siguiente se usa dbutils
para enumerar los conjuntos de datos disponibles en /databricks-datasets
:
display(dbutils.fs.ls('/databricks-datasets'))
Como alternativa, puede usar %fs
para acceder a los comandos del sistema de archivos de la CLI de Databricks, como se muestra en el ejemplo siguiente:
%fs ls '/databricks-datasets'
Para crear un DataFrame a partir de un archivo o directorio de archivos, especifique la ruta de acceso en el método load
:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Transformar datos con DataFrames
Los DataFrames facilitan la transformación de datos mediante métodos integrados para ordenar, filtrar y agregar datos. Muchas transformaciones no se especifican como métodos en DataFrames, sino que se proporcionan en el paquete spark.sql.functions
. Consulte Funciones de SQL de Databricks Spark.
Operaciones de columna
Spark proporciona muchas operaciones básicas de columna:
- Seleccionar columnas
- Crear columnas
- Cambiar el nombre de las columnas
- Tipos de columna de conversión
- Quitar columnas
Sugerencia
Para generar todas las columnas de un DataFrame, use columns
, por ejemplo df_customer.columns
.
Seleccionar columnas
Puede seleccionar columnas específicas mediante select
y col
. La función col
está en el submódulo pyspark.sql.functions
.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
También puede hacer referencia a una columna mediante expr
, que toma una expresión definida como cadena:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
También puede usar selectExpr
, que acepta expresiones SQL:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Para seleccionar columnas mediante un literal de cadena, haga lo siguiente:
df_customer.select(
"c_custkey",
"c_acctbal"
)
Para seleccionar explícitamente una columna de un DataFrame específico, puede usar el operador []
o el operador .
. (El operador .
no se puede usar para seleccionar columnas que empiecen por un entero, o que contengan un espacio o un carácter especial). Esto puede resultar especialmente útil cuando se combinan DataFrames en los que algunas columnas tienen el mismo nombre.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Crear columnas
Para crear una columna, use el método withColumn
. En el ejemplo siguiente se crea una nueva columna que contiene un valor booleano en función de si el saldo de la cuenta de cliente c_acctbal
supera 1000
:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Cambiar el nombre de las columnas
Para cambiar el nombre de una columna, use el método withColumnRenamed
, que acepta los nombres de columna existentes y nuevos:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
El método alias
resulta especialmente útil cuando desea cambiar el nombre de las columnas como parte de las agregaciones:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Tipos de columna de conversión
En algunos casos, puede que desee cambiar el tipo de datos de una o varias de las columnas del DataFrame. Para ello, use el método cast
para convertir entre tipos de datos de columna. En el ejemplo siguiente se muestra cómo convertir una columna de un entero a un tipo de cadena mediante el método col
para hacer referencia a una columna:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Quitar columnas
Para quitar columnas, puede omitir columnas durante una selección o select(*) except
o puede usar el método drop
:
df_customer_flag_renamed.drop("balance_flag_renamed")
También puede quitar varias columnas a la vez:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Operaciones de fila
Spark proporciona muchas operaciones básicas de fila:
- Filtrar filas
- Quitar filas duplicadas
- Controlar valores null
- Anexión de filas
- Ordenar filas
- Filtrar filas
Filtrar filas
Para filtrar filas, use el método filter
o where
en un DataFrame para devolver solo determinadas filas. Para identificar una columna en la que se va a filtrar, use el método col
o una expresión que se evalúe como una columna.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Para filtrar por varias condiciones, use operadores lógicos. Por ejemplo, &
y |
le permiten usar condiciones AND
y OR
, respectivamente. En el ejemplo siguiente se filtran las filas en las que c_nationkey
es igual a 20
y c_acctbal
es mayor que 1000
.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Quitar filas duplicadas
Para desduplicar filas, use distinct
, que devuelve solo las filas únicas.
df_unique = df_customer.distinct()
Controlar valores null
Para controlar valores null, quite filas que contengan valores null mediante el método na.drop
. Este método le permite especificar si desea quitar filas que contengan valores null any
o valores null all
.
Para quitar los valores null, use cualquiera de los ejemplos siguientes.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Si solo desea filtrar las filas que contienen todos los valores null, use lo siguiente:
df_customer_no_nulls = df_customer.na.drop("all")
Puede aplicar esto para un subconjunto de columnas especificando esto, como se muestra a continuación:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Para rellenar los valores que faltan, use el método fill
. Puede optar por aplicar esto a todas las columnas o a un subconjunto de columnas. En el ejemplo siguiente, los saldos de la cuenta que tienen un valor null para su saldo de cuenta c_acctbal
se rellenan con 0
.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Para reemplazar cadenas por otros valores, use el método replace
. En el ejemplo siguiente, las cadenas de dirección vacías se reemplazan por la palabra UNKNOWN
:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Anexión de filas
Para anexar filas, debe usar el método union
para crear un nuevo DataFrame. En el ejemplo siguiente, el DataFrame df_that_one_customer
creado anteriormente y df_filtered_customer
se combinan, lo que devuelve un DataFrame con tres clientes:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Nota:
También puede combinar DataFrames escribiéndolos en una tabla y anexando nuevas filas. En el caso de las cargas de trabajo de producción, el procesamiento incremental de orígenes de datos en una tabla de destino puede reducir drásticamente la latencia y los costes de proceso a medida que los datos crecen en tamaño. Consulte Ingesta de datos en una instancia de Databricks Lakehouse.
Ordenar filas
Importante
La ordenación puede ser costosa a gran escala y, si almacena datos ordenados y vuelve a cargar los datos con Spark, no se garantiza el orden. Asegúrate de usar la ordenación intencionadamente.
Para ordenar las filas por una o varias columnas, use el método sort
o orderBy
. De manera predeterminada, estos métodos se ordenan de la A a la Z:
df_customer.orderBy(col("c_acctbal"))
Para filtrar en orden de la Z a la A, use desc
:
df_customer.sort(col("c_custkey").desc())
En el ejemplo siguiente se muestra cómo ordenar en dos columnas:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Para limitar el número de filas que se van a devolver una vez ordenado el DataFrame, use el método limit
. En el ejemplo siguiente solo se muestran los 10
resultados principales:
display(df_sorted.limit(10))
Combinar DataFrames
Para combinar dos o más DataFrames, use el método join
. Puede especificar cómo desea que los DataFrames se unan en los parámetros how
(tipo de combinación) y on
(en qué columnas basar la combinación). Entre los tipos de combinación comunes se incluyen:
inner
: este es el valor predeterminado del tipo de combinación, que devuelve un DataFrame que mantiene solo las filas donde hay una coincidencia para el parámetroon
en los DataFrames.left
: mantiene todas las filas del primer DataFrame especificado y solo las filas del segundo DataFrame especificado que tienen una coincidencia con el primero.outer
: una combinación externa mantiene todas las filas de ambos DataFrames independientemente de la coincidencia.
Para obtener información detallada sobre las combinaciones, consulte Trabajar con combinaciones en Azure Databricks. Para obtener una lista de las combinaciones admitidas en PySpark, consulte Combinaciones de DataFrame.
En el ejemplo siguiente se devuelve un solo DataFrame donde cada fila del DataFrame orders
se combina con la fila correspondiente del DataFrame customers
. Se usa una combinación interna, ya que se espera que cada pedido corresponda exactamente a un cliente.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Para combinar en varias condiciones, use operadores booleanos como &
y |
para especificar AND
y OR
, respectivamente. El ejemplo siguiente agrega una condición adicional, filtrando solo las filas que tienen o_totalprice
mayor que 500,000
:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Adición de datos
Para agregar datos en un DataFrame, similar a GROUP BY
en SQL, use el método groupBy
para especificar columnas por las que agrupar y el método agg
para especificar agregaciones. Importe agregaciones comunes, como avg
, sum
, max
y min
desde pyspark.sql.functions
. En el ejemplo siguiente se muestra el saldo medio de los clientes por segmento de mercado:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Algunas agregaciones son acciones, lo que significa que desencadenan cálculos. En este caso, no es necesario usar otras acciones para generar resultados.
Para contar filas en un DataFrame, use el método count
:
df_customer.count()
Encadenar llamadas
Los métodos que transforman DataFrames devuelven DataFrames y Spark no actúa en las transformaciones hasta que se llama a las acciones. Esta evaluación diferida significa que puede encadenar varios métodos para mayor comodidad y legibilidad. En el ejemplo siguiente se muestra cómo encadenar el filtrado, la agregación y el orden:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Visualizar el DataFrame
Para visualizar un DataFrame en un cuaderno, haga clic en el signo + situado junto a la tabla situada en la parte superior izquierda del DataFrame y, a continuación, seleccione Visualización para agregar uno o varios gráficos basados en el DataFrame. Para obtener más información sobre las visualizaciones, consulte Visualizaciones en cuadernos de Databricks.
display(df_order)
Para realizar visualizaciones adicionales, Databricks recomienda usar API de Pandas para Spark. .pandas_api()
permite convertir a la API de Pandas correspondiente para un DataFrame de Spark. Para obtener más información, consulte API de Pandas en Spark.
Guardar sus datos
Una vez que haya transformado los datos, puede guardarlos mediante los métodos DataFrameWriter
. Puede encontrar una lista completa de estos métodos en DataFrameWriter. Las siguientes secciones muestran cómo guardar su DataFrame como una tabla y como una colección de archivos de datos.
Guardar el DataFrame como una tabla
Para guardar el DataFrame como una tabla en Unity Catalog, use el método write.saveAsTable
y especifique la ruta de acceso con el formato <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Escribir el DataFrame como CSV
Para escribir el DataFrame en el formato *.csv
, use el método write.csv
, especificando el formato y las opciones. De forma predeterminada, si los datos existen en la ruta de acceso especificada, se produce un error en la operación de escritura. Puede especificar uno de los siguientes modos para realizar una acción diferente:
overwrite
sobrescribe todos los datos existentes en la ruta de acceso de destino con el contenido de DataFrame.append
anexa el contenido del DataFrame a los datos de la ruta de acceso de destino.ignore
produce un error silencioso en la escritura si existen datos en la ruta de acceso de destino.
En el ejemplo siguiente se muestra la sobrescritura de datos con contenido de DataFrame como archivos CSV:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Pasos siguientes
Para aprovechar más funcionalidades de Spark en Databricks, consulte: