Compartir a través de


Introducción: Ingesta e inserción de datos adicionales

Este artículo de introducción le guía a través del uso de un cuaderno de Azure Databricks para ingerir un archivo CSV que contiene datos de nombres de bebé adicionales en su volumen de Unity Catalog y, después, importar los nuevos datos de nombres de bebé en una tabla existente mediante Python, Scala y R.

Importante

Este artículo de introducción es una continuación de Introducción: Importación y visualización de datos CSV desde un cuaderno. Debe completar los pasos descritos en ese artículo para poder completar el artículo actual. Para obtener el cuaderno completo para ese artículo de introducción, consulte Importación y visualización de cuadernos de datos.

Requisitos

Para completar las tareas de este artículo, debe cumplir los siguientes requisitos:

Sugerencia

Para ver un cuaderno completado para este artículo, consulte Importación y visualización de cuadernos de datos.

Paso 1: Crear un nuevo cuaderno

Para crear un cuaderno en el área de trabajo, haga clic en Nuevo iconoNuevo en la barra lateral y a continuación, haga clic en Cuaderno. Se abre un cuaderno en blanco en el área de trabajo.

Para obtener más información sobre cómo crear y administrar cuadernos, consulte Administración de cuadernos.

Paso 2: Definir variables

En este paso, definirá variables para su uso en el cuaderno de ejemplo que cree en este artículo.

  1. Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Reemplace <catalog-name>, <schema-name> y <volume-name> por los nombres de catálogo, esquema y volumen de un volumen de Unity Catalog. De manera opcional, reemplace el valor table_name por un nombre de la tabla de su elección. Guardará los datos del nombre del bebé en esta tabla más adelante en este artículo.

  2. Presione Shift+Enter para ejecutar la celda y crear una nueva celda en blanco.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    file_name = "new_baby_names.csv"
    table_name = "baby_names"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val fileName = "new_baby_names.csv"
    val tableName = "baby_names"
    val pathVolume = s"/Volumes/${catalog}/${schema}/${volume}"
    val pathTable = s"${catalog}.${schema}"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    file_name <- "new_baby_names.csv"
    table_name <- "baby_names"
    path_volume <- paste0("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste0(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    

Paso 3: Agregar nuevo archivo CSV de datos al volumen de Unity Catalog

Este paso crea un DataFrame denominado df con un nuevo nombre de bebé para 2022 y, luego, guarda esos datos en un nuevo archivo CSV en el volumen de Unity Catalog.

Nota:

Este paso simula la adición de nuevos datos anuales a los datos existentes cargados durante años anteriores. En el entorno de producción, estos datos incrementales se almacenarían en el almacenamiento en la nube.

  1. Copie y pegue el código siguiente en la nueva celda del cuaderno vacío. Este código crea el DataFrame con los datos adicionales del nuevo nombre de bebé y, después, escribe esos datos en un archivo CSV en el volumen de Unity Catalog.

    Python

    data = [[2022, "CARL", "Albany", "M", 42]]
    
    df = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    # display(df)
    (df.coalesce(1)
        .write
        .option("header", "true")
        .mode("overwrite")
        .csv(f"{path_volume}/{file_name}"))
    

    Scala

    val data = Seq((2022, "CARL", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df = data.toDF(columns: _*)
    
    // display(df)
    df.coalesce(1)
        .write
        .option("header", "true")
        .mode("overwrite")
        .csv(f"{pathVolume}/{fileName}")
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(Year = 2022,
        First_Name = "CARL",
        County = "Albany",
        Sex = "M",
        Count = 42)
    
    df <- createDataFrame(data)
    # display(df)
    write.df(df, path = paste0(path_volume, "/", file_name),
        source = "csv",
        mode = "overwrite",
        header = "true")
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Paso 4: Cargar datos en un DataFrame desde un archivo CSV

Nota:

Este paso simula la carga de datos desde el almacenamiento en la nube.

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código carga los nuevos datos de nombres de bebé en un nuevo DataFrame desde el archivo CSV.

    Python

    df1 = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df1)
    

    Scala

    val df1 = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    display(df1)
    

    R

    df1 <- read.df(paste0(path_volume, "/", file_name),
        source = "csv",
        header = TRUE,
        inferSchema = TRUE)
    display(df1)
    
  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

Paso 5: Insertar datos en una tabla existente

  1. Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código anexa los nuevos datos de nombres de bebé del DataFrame a la tabla existente.

    Python

    df.write.mode("append").insertInto(f"{path_table}.{table_name}")
    display(spark.sql(f"SELECT * FROM {path_table}.{table_name} WHERE Year = 2022"))
    

    Scala

    df1.write.mode("append").insertInto(s"${pathTable}.${tableName}")
    display(spark.sql(s"SELECT * FROM ${pathTable}.${tableName} WHERE Year = 2022"))
    

    R

    # The write.df function in R, as provided by the SparkR package, does not directly support writing to Unity Catalog.
    # In this example, you write the DataFrame into a temporary view and then use the SQL command to insert data from the temporary view to the Unity Catalog table
    createOrReplaceTempView(df1, "temp_view")
    sql(paste0("INSERT INTO ", path_table, ".", table_name, " SELECT * FROM temp_view"))
    display(sql(paste0("SELECT * FROM ", path_table, ".", table_name, " WHERE Year = 2022")))
    
  2. Presione Ctrl+Enter para ejecutar la celda.

Ingesta de cuadernos de datos adicionales

Use uno de los siguientes cuadernos para realizar los pasos descritos en este artículo. Reemplace <catalog-name>, <schema-name> y <volume-name> por los nombres de catálogo, esquema y volumen de un volumen de Unity Catalog. De manera opcional, reemplace el valor table_name por un nombre de la tabla de su elección.

Python

Ingesta e inserción de datos adicionales mediante Python

Obtener el cuaderno

Scala

Ingesta e inserción de datos adicionales mediante Scala

Obtener el cuaderno

R

Ingesta e inserción de datos adicionales mediante R

Obtener el cuaderno

Pasos siguientes

Para obtener información sobre la limpieza y mejora de los datos, consulte Comenzar: Mejora y limpieza de datos.

Recursos adicionales