Compartir vía


Tutorial: Carga y transformación de datos mediante DataFrames de Apache Spark

En este tutorial se muestra cómo cargar y transformar datos mediante la API DataFrame de Apache Spark Python (PySpark), la API DataFrame de Apache Spark Scala y la API SparkDataFrame de SparkR en Azure Databricks.

Al final de este tutorial, comprenderá lo que es un DataFrame y estará familiarizado con las siguientes tareas:

Python

Consulte también la referencia de la API de PySpark de Apache Spark.

Scala

Consulte también la referencia de la API de Scala de Apache Spark.

R

Consulte también la referencia a la API de Apache SparkR.

¿Qué es un DataFrame?

Un DataFrame es una estructura de datos etiquetada bidimensional con columnas de tipos potencialmente diferentes. Puedes pensar en un DataFrame como una hoja de cálculo, una tabla SQL o un diccionario de objetos de serie. DataFrame de Apache Spark proporciona un amplio conjunto de funciones (selección de columnas, filtro, unión, incorporación) que permiten resolver problemas comunes de análisis de datos de forma eficaz.

Los DataFrames de Apache Spark son una compilación de abstracción basada en conjuntos de datos distribuidos resistentes (RDD). Spark DataFrame y Spark SQL usan un motor unificado de planificación y optimización, lo que le permite obtener un rendimiento casi idéntico en todos los lenguajes admitidos en Azure Databricks (Python, SQL, Scala y R).

Requisitos

Para completar el siguiente tutorial, debe cumplir los siguientes requisitos:

  • Para usar los ejemplos de este tutorial, el área de trabajo debe tener habilitado Unity Catalog.

  • En los ejemplos de este tutorial se usa un volumen de Unity Catalog para almacenar datos de ejemplo. Para usar estos ejemplos, cree un volumen y use los nombres de catálogo, esquema y volumen de ese volumen para establecer la ruta de acceso del volumen usada por los ejemplos.

  • Debe tener estos permisos en Unity Catalog:

    • READ VOLUME y WRITE VOLUME, o ALL PRIVILEGES para el volumen usado para este tutorial.
    • USE SCHEMA o ALL PRIVILEGES para el esquema usado para este tutorial.
    • USE CATALOG o ALL PRIVILEGES para el catálogo usado para este tutorial.

    Para establecer estos permisos, consulte los privilegios de administrador de Databricks o Unity Catalog y objetos protegibles.

Sugerencia

Para ver un cuaderno completado para este artículo, vea cuadernos del tutorial DataFrame.

Paso 1: Definir variables y cargar el archivo CSV

En este paso se definen las variables para su uso en este tutorial y, a continuación, se carga un archivo CSV que contiene los datos de nombres de bebé de health.data.ny.gov en el volumen de Unity Catalog.

  1. Para abrir un nuevo cuaderno, haga clic en el icono Icono Nuevo. Para obtener información sobre cómo navegar por cuadernos de Azure Databricks, consulte Interfaz y controles del cuaderno de Databricks.

  2. Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Reemplace <catalog-name>, <schema-name> y <volume-name> por los nombres de catálogo, esquema y volumen de un volumen de Unity Catalog. Reemplace <table_name> por un nombre de la tabla de su elección. Cargará los datos de nombres de bebé en esta tabla más adelante en este tutorial.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Presione Shift+Enter para ejecutar la celda y crear una nueva celda en blanco.

  4. Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código copia el archivo rows.csv de health.data.ny.gov en el volumen de Unity Catalog mediante el comando dbutuils de Databricks.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Paso 2: Crear un DataFrame

En este paso se crea un DataFrame denominado df1 con datos de prueba y, a continuación, se muestra su contenido.

  1. Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código crea el DataFrame con datos de prueba y, a continuación, muestra el contenido y el esquema del DataFrame.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Paso 3: Cargar datos en un DataFrame desde un archivo CSV

Este paso crea un DataFrame denominado df_csv desde el archivo CSV que cargó anteriormente en el volumen de Unity Catalog. Consulte spark.read.csv.

  1. Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código carga los datos de nombres de bebé en el DataFrame df_csv desde el archivo CSV y, a continuación, muestra el contenido del DataFrame.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Puede cargar datos de muchos formatos de archivo admitidos.

Paso 4: Ver e interactuar con el DataFrame

Vea e interactúe con los DataFrames de nombres de bebé mediante los métodos siguientes.

Aprenda a mostrar el esquema de un DataFrame de Apache Spark. Spark usa el término esquema para hacer referencia a los nombres y los tipos de datos de las columnas del DataFrame.

Nota:

Azure Databricks también usa el esquema de términos para describir una colección de tablas registradas en un catálogo.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código muestra el esquema de los DataFrames con el método .printSchema() para ver los esquemas de los dos DataFrames y preparar la unión de ambos.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Cambiar un nombre de columna en el DataFrame

Obtenga información sobre cómo cambiar el nombre de una columna en un DataFrame.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código cambia el nombre de una columna del DataFrame df1_csv para que coincida con la columna correspondiente en el DataFrame df1. Este código usa el método withColumnRenamed() de Apache Spark.

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Combinar DataFrames

Obtenga información sobre cómo crear un DataFrame que agregue las filas de un DataFrame a otro.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark union() para combinar el contenido del primer DataFrame df con el DataFrame df_csv que contiene los datos de nombres de bebé cargados desde el archivo CSV.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Filtrado de filas en un DataFrame

Descubra los nombres de bebé más populares en el conjunto de datos mediante el filtrado de filas mediante los métodos .filter() o .where() de Apache Spark. Use el filtrado para seleccionar un subconjunto de filas para devolver o modificar en un DataFrame. No hay ninguna diferencia en el rendimiento o la sintaxis, como se muestra en los siguientes ejemplos.

Uso del método .filter()

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark .filter() para mostrar esas filas en el DataFrame con un recuento de más de 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Uso del método .where()

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark .where() para mostrar esas filas en el DataFrame con un recuento de más de 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Seleccionar columnas de un DataFrame y ordenar por frecuencia

Obtenga información sobre la frecuencia de un nombre de bebé con el método select() para especificar las columnas de DataFrame que se van a devolver. Use las funciones orderby y desc de Apache Spark para ordenar los resultados.

El módulo pyspark.sql para Apache Spark proporciona compatibilidad con funciones SQL. Entre estas funciones que se usan en este tutorial se encuentran las funciones orderBy(), desc() y expr() de Apache Spark. Para habilitar el uso de estas funciones, debe importarlas en la sesión según sea necesario.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código importa la función desc() y, a continuación, usa el método select() de Apache Spark y las funciones orderBy() y desc() de Apache Spark para mostrar los nombres más comunes y sus recuentos en orden descendente.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Creación de un DataFrame de subconjunto

Obtenga información sobre cómo crear un DataFrame de subconjunto a partir de un DataFrame existente.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método filter de Apache Spark para crear un nuevo DataFrame que restringe los datos por año, recuento y sexo. Usa el método select() de Apache Spark para limitar las columnas. También usa las funciones orderBy() y desc() de Apache Spark para ordenar el nuevo DataFrame por recuento.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Paso 5: Guardar el DataFrame

Obtenga información sobre cómo guardar un DataFrame. Puede guardar el DataFrame en una tabla o escribir el DataFrame en un archivo o en varios archivos.

Guardar el DataFrame en una tabla

Azure Databricks usa el formato Delta Lake para todas las tablas de forma predeterminada. Para guardar el Dataframe, debe tener privilegios de tabla CREATE en el catálogo y el esquema.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código guarda el contenido de DataFrame en una tabla mediante la variable que definió al principio de este tutorial.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

La mayoría de las aplicaciones Spark funcionan en grandes conjuntos de datos y de forma distribuida. Spark escribe un directorio de archivos en lugar de un solo archivo. Delta Lake divide las carpetas y los archivos de Parquet. Muchos sistemas de datos pueden leer estos directorios de archivos. Azure Databricks recomienda usar tablas a través de rutas de acceso de archivo para la mayoría de las aplicaciones.

Guardar el DataFrame en archivos JSON

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código guarda el DataFrame en un directorio de archivos JSON.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Lea el DataFrame de un archivo JSON

Aprenda a usar el método spark.read.format() de Apache Spark para leer datos JSON de un directorio en un DataFrame.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código muestra los archivos JSON que guardó en el ejemplo anterior.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Tareas adicionales: Ejecución de consultas SQL en PySpark, Scala y R

Los DataFrames de Apache Spark proporcionan las siguientes opciones para combinar SQL con PySpark, Scala y R. Puede ejecutar el código siguiente en el mismo cuaderno que creó para este tutorial.

Especificar una columna como una consulta SQL

Aprenda a usar el método selectExpr() de Apache Spark. Se trata de una variante del método select() que acepta expresiones SQL y devuelve un DataFrame actualizado. Este método permite usar una expresión SQL, como upper.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark selectExpr() y la expresión SQL upper para convertir una columna de cadena en mayúsculas (y cambiar el nombre de la columna).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Uso de expr() para usar la sintaxis SQL para una columna

Obtenga información sobre cómo importar y usar la función de Apache Spark expr() para usar la sintaxis SQL en cualquier lugar en el que se especifique una columna.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código importa la función expr() y, a continuación, usa la función expr() de Apache Spark y la expresión SQL lower para convertir una columna de cadena en minúsculas (y cambiar el nombre de la columna).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Ejecución de una consulta SQL arbitraria mediante la función spark.sql()

Aprenda a usar la función spark.sql() de Apache Spark para ejecutar consultas SQL arbitrarias.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa la función spark.sql() de Apache Spark para consultar una tabla SQL mediante la sintaxis SQL.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Cuaderno del tutorial de DataFrame

Los cuadernos siguientes incluyen las consultas de ejemplos de este tutorial.

Python

Tutorial de DataFrames con Python

Obtener el cuaderno

Scala

Tutorial de DataFrames con Scala

Obtener el cuaderno

R

Tutorial de DataFrames con R

Obtener el cuaderno

Recursos adicionales