Compartir vía


Tutorial: Delta Lake

En este tutorial se presentan las operaciones comunes de Delta Lake en Azure Databricks, incluidas las siguientes:

Puede ejecutar el código Python, Scala y SQL de ejemplo de este artículo desde un cuaderno adjuntado a un recurso de proceso de Azure Databricks, como un clúster . También puede ejecutar el código de SQL de este artículo desde una consulta asociada a un almacén de SQL en Databricks SQL.

Preparación de los datos de origen

Este tutorial se basa en un conjunto de datos llamado Personas 10 M. Contiene 10 millones de registros ficticios que contienen hechos sobre personas, como nombres y apellidos, fecha de nacimiento y salario. En este tutorial se supone que este conjunto de datos está en un catálogo de Unity volumen que está asociado al área de trabajo de Azure Databricks de destino.

Para obtener el conjunto de datos people 10 M de este tutorial, haga lo siguiente:

  1. Vaya a la página People 10 M en Kaggle.
  2. Haga clic en Descargar para descargar un archivo denominado archive.zip en el equipo local.
  3. Extraiga el nombre de archivo export.csv del archivo archive.zip. El archivo export.csv contiene los datos de este tutorial.

Para cargar el archivo export.csv en el volumen, haga lo siguiente:

  1. En la barra lateral, haga clic en Catálogo.
  2. En Explorador de catálogos, navegue y abra el volumen donde desea cargar el archivo export.csv.
  3. Haga clic en el botón Cargar en este volumen.
  4. Arrastre y coloque, o busque y seleccione el archivo export.csv en el equipo local.
  5. Haga clic en Cargar.

En los ejemplos de código siguientes, reemplace /Volumes/main/default/my-volume/export.csv por la ruta de acceso al archivo export.csv del volumen de destino.

Creación de una tabla

Todas las tablas creadas en Azure Databricks usan Delta Lake de manera predeterminada. Databricks recomienda el uso de Unity Catalog para tablas administradas.

En el ejemplo de código anterior y los ejemplos de código siguientes, reemplace el nombre de la tabla main.default.people_10m por el catálogo, el esquema y el nombre de tabla de destino en el catálogo de Unity.

Nota:

Delta Lake es el valor predeterminado para todos los comandos de lectura, escritura y creación de tablas de Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

Las operaciones anteriores crean una nueva tabla administrada. Para obtener información sobre las opciones disponibles al crear una tabla Delta, consulte CREATE TABLE.

En Databricks Runtime 13.3 LTS y versiones posteriores, puede usar CREATE TABLE LIKE para crear una nueva tabla Delta vacía que duplica las propiedades de esquema y tabla de una tabla Delta de origen. Esto puede ser especialmente útil al promover tablas de un entorno de desarrollo en producción, como se muestra en el ejemplo de código siguiente:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Para crear una tabla vacía, también puede usar la DeltaTableBuilderAPI en Delta Lake para Python y Scala. En comparación con las API de DataFrameWriter equivalentes, estas API facilitan la especificación de información adicional, como comentarios de columnas, propiedades de tabla y columnas generadas.

Importante

Esta característica está en versión preliminar pública.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Actualización/inserción (upsert) en una tabla

Para combinar un conjunto de actualizaciones e inserciones en una tabla Delta existente, use el método DeltaTable.merge para python y Scalay la instrucción MERGE INTO para SQL. Por ejemplo, en el ejemplo siguiente se toman datos de la tabla de origen y se combinan en la tabla Delta de destino. Cuando hay una fila coincidente en ambas tablas, Delta Lake actualiza la columna de datos mediante la expresión especificada. Cuando no hay ninguna fila que coincida, Delta Lake agrega una nueva fila. Esta operación se conoce como upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

En SQL, si especifica *, actualiza o inserta todas las columnas de la tabla de destino, suponiendo que la tabla de origen tenga las mismas columnas que la tabla de destino. Si la tabla de destino no tiene las mismas columnas, la consulta produce un error de análisis.

Debe especificar un valor para cada columna de la tabla al realizar una operación de inserción (por ejemplo, cuando no hay ninguna fila coincidente en el conjunto de datos existente). Sin embargo, no es necesario actualizar todos los valores.

Para ver los resultados, consulte la tabla.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Lectura de una tabla

Puede acceder a los datos de las tablas Delta por el nombre o la ruta de acceso de la tabla, como se muestra en los ejemplos siguientes:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Escritura en una tabla

Delta Lake usa una sintaxis estándar para escribir datos en tablas.

Para agregar datos nuevos de forma atómica a una tabla Delta existente, use el modo anexar como se muestra en los ejemplos siguientes:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Para reemplazar todos los datos de una tabla, use el modo de sobrescritura como en los ejemplos siguientes:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Actualización de una tabla

Puede actualizar aquellos datos que coincidan con un predicado en una tabla Delta. Por ejemplo, en la tabla de people_10m ejemplo, para cambiar una abreviatura en la columna gender de M o F a Male o Female, puede ejecutar lo siguiente:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Eliminación en una tabla

Puede quitar aquellos datos que coincidan con un predicado de una tabla Delta. Por instancia, en el ejemplo people_10m tabla, para eliminar todas las filas correspondientes a las personas con un valor en la columna birthDate de antes de 1955, puede ejecutar lo siguiente:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Importante

La eliminación quita los datos de la versión más reciente de la tabla Delta, pero no lo quita del almacenamiento físico hasta que las versiones anteriores se vacían explícitamente. Para más información, consulte la información sobre vacuum.

Visualización del historial de tablas

Para ver el historial de una tabla, use el método DeltaTable.history para python y Scalay la instrucción DESCRIBE HISTORY en SQL, que proporciona información de procedencia, incluida la versión de tabla, la operación, el usuario, etc., para cada escritura en una tabla.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Consulta de una versión anterior de la tabla (viaje en el tiempo)

El viaje en el tiempo de Delta Lake permite consultar una instantánea anterior de una tabla de Delta.

Para consultar una versión anterior de una tabla, especifique la versión de la tabla’o la marca de tiempo. Por ejemplo, para consultar la versión 0 o 2024-05-15T22:43:15.000+00:00Z de marca de tiempo del historial anterior, use lo siguiente:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

En el caso de las marcas de tiempo, solo se aceptan cadenas de fecha o marca de tiempo, por ejemplo, "2024-05-15T22:43:15.000+00:00" o "2024-05-15 22:43:15".

Las opciones DataFrameReader permiten crear un DataFrame a partir de una tabla Delta que se corrigió en una versión específica o marca de tiempo de la tabla, por ejemplo:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Para más información, consulte Trabajo con el historial de tablas de Delta Lake.

Optimización de una tabla

Después de realizar varios cambios en una tabla, es posible que tenga muchos archivos pequeños. Para mejorar la velocidad de las consultas de lectura, puede usar la operación de optimización para contraer archivos pequeños en archivos más grandes:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Orden Z por columnas

Para mejorar aún más el rendimiento de lectura, puede colocar información relacionada en el mismo conjunto de archivos mediante la ordenación en z. Los algoritmos de omisión de datos de Delta Lake usan esta intercalación para reducir drásticamente la cantidad de datos que se deben leer. Para los datos de orden z, especifique las columnas que se van a ordenar en la operación Ordenar en z por. Por ejemplo, para intercalar mediante gender, ejecute:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Para obtener el conjunto completo de opciones disponibles al ejecutar la operación de optimización, consulte Optimización del diseño del archivo de datos.

Limpieza de instantáneas con VACUUM

Delta Lake proporciona aislamiento de instantáneas para lecturas, lo que significa que es seguro ejecutar una operación de optimización incluso mientras otros usuarios o trabajos consultan la tabla. Sin embargo, al final debe limpiar las instantáneas antiguas. Para ello, ejecute la operación de vacío:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Para obtener más información sobre el uso eficaz de la operación de vacío, consulte Quitar archivos de datos sin usar conde vacío.