

Tutorial: Run your first Delta Live Tables pipeline

This tutorial takes you through the steps to configure your first Delta Live Tables pipeline, write basic ETL code, and run a pipeline update.

All steps in this tutorial are designed for workspaces with Unity Catalog enabled. You can also configure Delta Live Tables pipelines to work with the legacy Hive metastore. See Use Delta Live Tables pipelines with legacy Hive metastore.

Note

This tutorial has instructions for developing and validating new pipeline code using Databricks notebooks. You can also configure pipelines using source code in Python or SQL files.

You can configure a pipeline to run your code if you already have source code written using Delta Live Tables syntax. See Configure a Delta Live Tables pipeline.

You can use fully declarative SQL syntax in Databricks SQL to register and set refresh schedules for materialized views and streaming tables as Unity Catalog-managed objects. See Use materialized views in Databricks SQL and Load data using streaming tables in Databricks SQL.

Example: Ingest and process New York baby names data

The example in this article uses a publicly available dataset that contains records of New York State baby names. This example demonstrates using a Delta Live Tables pipeline to:

  • Read raw CSV data from a volume into a table.
  • Read the records from the ingestion table and use Delta Live Tables expectations to create a new table that contains cleansed data.
  • Use the cleansed records as input to Delta Live Tables queries that create derived datasets.

This code demonstrates a simplified example of the medallion architecture. See What is the medallion lakehouse architecture?.

Implementations of this example are provided for Python and SQL. Follow the steps to create a new pipeline and notebook, and then copy-paste the provided code.

Example notebooks with complete code are also provided.

Requirements

  • To start a pipeline, you must have cluster creation permission or access to a cluster policy defining a Delta Live Tables cluster. The Delta Live Tables runtime creates a cluster before it runs your pipeline and fails if you don’t have the correct permission.

  • All users can trigger updates using serverless pipelines by default. Serverless must be enabled at the account level and might not be available in your workspace region. See Enable serverless compute.

  • The examples in this tutorial use Unity Catalog. Databricks recommends creating a new schema to run this tutorial, as multiple database objects are created in the target schema.

    • To create a new schema in a catalog, you must have ALL PRIVILEGES or USE CATALOG and CREATE SCHEMA privileges.
    • If you cannot create a new schema, run this tutorial against an existing schema. You must have the following privileges:
      • USE CATALOG for the parent catalog.
      • ALL PRIVILEGES or USE SCHEMA, CREATE MATERIALIZED VIEW, and CREATE TABLE privileges on the target schema.
    • This tutorial uses a volume to store sample data. Databricks recommends creating a new volume for this tutorial. If you create a new schema for this tutorial, you can create a new volume in that schema.
      • To create a new volume in an existing schema, you must have the following privileges:
        • USE CATALOG for the parent catalog.
        • ALL PRIVILEGES or USE SCHEMA and CREATE VOLUME privileges on the target schema.
      • You can optionally use an existing volume. You must have the following privileges:
        • USE CATALOG for the parent catalog.
        • USE SCHEMA for the parent schema.
        • ALL PRIVILEGES or READ VOLUME and WRITE VOLUME on the target volume.

    To set these permissions, contact your Databricks administrator. For more on Unity Catalog privileges, see Unity Catalog privileges and securable objects.
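For the case where you reuse an existing schema and volume, an administrator could grant the listed privileges with statements along the following lines. This is a minimal sketch: the helper name `tutorial_grants` and the example catalog, schema, volume, and principal names are hypothetical, and the function only builds SQL text without running it.

```python
# Hypothetical helper: builds Unity Catalog GRANT statements for the
# privileges listed above. It returns SQL text only and runs nothing.

def tutorial_grants(catalog, schema, volume, principal):
    """Return GRANT statements for an existing schema and an existing volume."""
    target = f"`{principal}`"  # backticks allow principals such as email addresses
    return [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO {target}",
        f"GRANT USE SCHEMA, CREATE MATERIALIZED VIEW, CREATE TABLE "
        f"ON SCHEMA {catalog}.{schema} TO {target}",
        f"GRANT READ VOLUME, WRITE VOLUME "
        f"ON VOLUME {catalog}.{schema}.{volume} TO {target}",
    ]


# Placeholder names (assumptions); replace them with your own before use.
for statement in tutorial_grants("main", "dlt_tutorial", "raw_data", "someone@example.com"):
    print(statement)
```

An administrator would review the printed statements and run them in a SQL editor or notebook. Granting ALL PRIVILEGES instead is the broader alternative named in the list above.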

Step 0: Download data

This example loads data from a Unity Catalog volume. Open a new notebook and run the following code, which downloads a CSV file and stores it in the specified volume:

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

Replace <catalog-name>, <schema-name>, and <volume-name> with the catalog, schema, and volume names for a Unity Catalog volume. The provided code attempts to create the specified schema and volume if these objects do not exist. You must have the appropriate privileges to create and write to objects in Unity Catalog. See Requirements.

Note

Make sure this notebook has successfully run before continuing with the tutorial. Do not configure this notebook as part of your pipeline.
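One way to confirm that the download succeeded is to read the first lines of the file. The following is a minimal sketch in plain Python. It assumes the volume is reachable as a local file path (/Volumes/...) from the notebook's compute, and it takes the expected column names from the SQL example later in this tutorial. The helper name `check_download` is hypothetical.

```python
import csv
import os

# Column names taken from the SQL example in this tutorial.
EXPECTED_COLUMNS = ["Year", "First Name", "County", "Sex", "Count"]


def check_download(csv_path, expected_columns=EXPECTED_COLUMNS):
    """Return (header, first_row); raise if the file is missing, empty,
    or does not contain the expected columns."""
    if not os.path.isfile(csv_path):
        raise FileNotFoundError(f"No file found at {csv_path}")
    # utf-8-sig strips a byte order mark if the file starts with one.
    with open(csv_path, newline="", encoding="utf-8-sig") as handle:
        reader = csv.reader(handle)
        header = next(reader, None)
        first_row = next(reader, None)
    if header is None:
        raise ValueError(f"{csv_path} is empty")
    missing = [name for name in expected_columns if name not in header]
    if missing:
        raise ValueError(f"Missing expected columns: {missing}")
    return header, first_row


# Example call in the download notebook (uses the variables defined there):
# header, first_row = check_download(volume_path + filename)
# print(header, first_row)
```

If the call raises an error, rerun the download cell and check your privileges on the volume before you create the pipeline.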

Step 1: Create a pipeline

Delta Live Tables creates pipelines by resolving dependencies defined in notebooks or files (called source code) using Delta Live Tables syntax. Each source code file can contain only one language, but you can add multiple language-specific notebooks or files in the pipeline.

Important

Do not configure any assets in the Source code field. Leaving this field blank creates and configures a notebook for source code authoring.

The instructions in this tutorial use serverless compute and Unity Catalog. Use the default settings for all configuration options not specified in these instructions.

Note

If serverless is not enabled or supported in your workspace, you can complete the tutorial as written using default compute settings. You must manually select Unity Catalog under Storage options in the Destination section of the Create pipeline UI.

To configure a new pipeline, do the following:

  1. In the sidebar, click Delta Live Tables.
  2. Click Create pipeline.
  3. In Pipeline name, type a unique pipeline name.
  4. Select the Serverless checkbox.
  5. In Destination, to configure a Unity Catalog location where tables are published, select a Catalog and a Schema.
  6. In Advanced, click Add configuration and then define pipeline parameters for the catalog, schema, and volume to which you downloaded data using the following parameter names:
    • my_catalog
    • my_schema
    • my_volume
  7. Click Create.

The pipelines UI appears for the new pipeline. A source code notebook is automatically created and configured for the pipeline.

The notebook is created in a new directory in your user directory. The name of the new directory and file match the name of your pipeline. For example, /Users/your.username@databricks.com/my_pipeline/my_pipeline.

A link to access this notebook is under the Source code field in the Pipeline details panel. Click the link to open the notebook before proceeding to the next step.
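For reference, the choices made in the Create pipeline UI can be pictured as a settings document. The sketch below assembles one as a Python dictionary. The field names (`name`, `serverless`, `catalog`, `target`, `configuration`, `libraries`) are an assumption based on the pipeline settings JSON format, so confirm them against the JSON view of your own pipeline settings. The helper name and all example values are hypothetical.

```python
import json


def tutorial_pipeline_settings(name, dest_catalog, dest_schema,
                               data_catalog, data_schema, data_volume,
                               notebook_path):
    """Assemble a settings dictionary mirroring the UI steps in this tutorial."""
    return {
        "name": name,                # Pipeline name
        "serverless": True,          # Serverless checkbox
        "catalog": dest_catalog,     # Destination catalog
        "target": dest_schema,       # Destination schema (assumed field name)
        "configuration": {           # Parameters added under Advanced
            "my_catalog": data_catalog,
            "my_schema": data_schema,
            "my_volume": data_volume,
        },
        "libraries": [{"notebook": {"path": notebook_path}}],
    }


# Placeholder values (assumptions); replace them with your own.
settings = tutorial_pipeline_settings(
    name="my_pipeline",
    dest_catalog="<catalog-name>",
    dest_schema="<schema-name>",
    data_catalog="<catalog-name>",
    data_schema="<schema-name>",
    data_volume="<volume-name>",
    notebook_path="/Users/your.username@databricks.com/my_pipeline/my_pipeline",
)
print(json.dumps(settings, indent=2))
```

The three `configuration` keys are the values that the pipeline source code later reads with `spark.conf.get` in Python or `${...}` substitution in SQL.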

Step 2: Declare materialized views and streaming tables in a notebook with Python or SQL

You can use Databricks notebooks to interactively develop and validate source code for Delta Live Tables pipelines. You must attach your notebook to the pipeline to use this functionality. To attach your newly created notebook to the pipeline you just created:

  1. Click Connect in the upper-right to open the compute configuration menu.
  2. Hover over the name of the pipeline you created in Step 1.
  3. Click Connect.

The UI changes to include Validate and Start buttons in the upper-right. To learn more about notebook support for pipeline code development, see Develop and debug Delta Live Tables pipelines in notebooks.

Important

  • Delta Live Tables pipelines evaluate all cells in a notebook during planning. Unlike notebooks that are run against all-purpose compute or scheduled as jobs, pipelines do not guarantee that cells run in the specified order.
  • Notebooks can only contain a single programming language. Do not mix Python and SQL code in pipeline source code notebooks.
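The first point can be pictured as ordering by dependencies rather than by cell position. The following is an analogy in plain Python using the standard library, not the Delta Live Tables planner itself. The dataset names match the three datasets defined later in this tutorial, and the mapping is deliberately listed in reverse to show that declaration order does not matter.

```python
from graphlib import TopologicalSorter

# Each dataset maps to the set of datasets it reads from.
DEPENDENCIES = {
    "top_baby_names_2021": {"baby_names_prepared"},
    "baby_names_prepared": {"baby_names_raw"},
    "baby_names_raw": set(),
}


def resolve_order(dependencies):
    """Return the datasets in an order where every input comes before its consumer."""
    return list(TopologicalSorter(dependencies).static_order())


print(resolve_order(DEPENDENCIES))
# ['baby_names_raw', 'baby_names_prepared', 'top_baby_names_2021']
```

Because the order follows from the references between datasets, avoid code that relies on one cell running before another for side effects.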

For details on developing code with Python or SQL, see Develop pipeline code with Python or Develop pipeline code with SQL.

Example pipeline code

To implement the example in this tutorial, copy and paste the following code into a cell in the notebook configured as source code for your pipeline.

The provided code does the following:

  • Imports necessary modules (Python only).
  • References parameters defined during pipeline configuration.
  • Defines a streaming table named baby_names_raw that ingests from a volume.
  • Defines a materialized view named baby_names_prepared that validates ingested data.
  • Defines a materialized view named top_baby_names_2021 that has a highly refined view of the data.

Python

# Import modules

import dlt
from pyspark.sql.functions import desc, expr, sum  # explicit imports; this sum is the Spark aggregate and shadows the Python built-in

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM LIVE.baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
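Both implementations attach two expectations to baby_names_prepared with different violation policies: valid_first_name uses the default policy, which keeps invalid records and counts them, while valid_count fails the update on the first violation. The sketch below simulates these policies in plain Python to make the difference concrete. It is not the Delta Live Tables API. The names `apply_expectation` and `ExpectationFailed` are hypothetical, the "drop" policy is included only for comparison and is not used in this tutorial, and the sample rows are made up for illustration.

```python
class ExpectationFailed(Exception):
    """Raised when a record violates an expectation under the 'fail' policy."""


def apply_expectation(rows, name, predicate, on_violation="warn"):
    """Return (kept_rows, violation_count) under the chosen policy.

    warn: keep invalid rows and count them (like a plain expectation)
    drop: discard invalid rows and count them
    fail: raise on the first invalid row (like failing the update)
    """
    if on_violation not in {"warn", "drop", "fail"}:
        raise ValueError(f"Unknown policy: {on_violation}")
    kept, violations = [], 0
    for row in rows:
        if predicate(row):
            kept.append(row)
            continue
        violations += 1
        if on_violation == "fail":
            raise ExpectationFailed(f"{name} violated by {row}")
        if on_violation == "warn":
            kept.append(row)  # invalid row is retained, only counted
    return kept, violations


# Made-up sample rows, not taken from the dataset.
sample = [
    {"First_Name": "OLIVIA", "Count": 16},
    {"First_Name": None, "Count": 5},
]

kept, violations = apply_expectation(
    sample, "valid_first_name", lambda r: r["First_Name"] is not None
)
print(len(kept), violations)  # 2 1

kept, violations = apply_expectation(
    sample, "valid_count", lambda r: r["Count"] > 0, on_violation="fail"
)
print(len(kept), violations)  # 2 0
```

In the pipeline itself, a record with a null First_Name still reaches baby_names_prepared and is reported as a violation, whereas a record with a non-positive Count stops the update.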

Step 3: Start a pipeline update

To start a pipeline update, click the Start button in the top right of the notebook UI.
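After the update completes, you can inspect the result from a separate notebook. The following is a minimal sketch that builds a query for the top_baby_names_2021 dataset. It assumes the dataset was published to the catalog and schema selected under Destination in Step 1 and that your compute can read it. The helper name `preview_query` is hypothetical.

```python
def preview_query(catalog, schema, table="top_baby_names_2021"):
    """Build a query that lists the published results, highest count first."""
    return (
        f"SELECT * FROM `{catalog}`.`{schema}`.`{table}` "
        f"ORDER BY Total_Count DESC"
    )


print(preview_query("<catalog-name>", "<schema-name>"))

# In a Databricks notebook, where `spark` and `display` are predefined:
# display(spark.sql(preview_query("<catalog-name>", "<schema-name>")))
```

Run this in a notebook that is not configured as pipeline source code, so the query is not evaluated as part of the pipeline.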

Example notebooks

The following notebooks contain the same code examples provided in this article. These notebooks have the same requirements as the steps in this article. See Requirements.

To import a notebook, complete the following steps:

  1. Open the notebook UI.
    • Click + New > Notebook.
    • An empty notebook opens.
  2. Click File > Import…. The Import dialog appears.
  3. Select the URL option for Import from.
  4. Paste the URL of the notebook.
  5. Click Import.

This tutorial requires that you run a data setup notebook before configuring and running your Delta Live Tables pipeline. Import the following notebook, attach the notebook to a compute resource, fill in the required variables my_catalog, my_schema, and my_volume, and click Run all.

Data download for pipelines tutorial

Get notebook

The following notebooks provide examples in Python or SQL. When you import a notebook, it is saved to your user home directory.

After importing one of the following notebooks, complete the steps to create a pipeline, but use the Source code file picker to select the imported notebook. After creating the pipeline with a notebook configured as source code, click Start in the pipeline UI to trigger an update.

Get started with Delta Live Tables Python notebook

Get notebook

Get started with Delta Live Tables SQL notebook

Get notebook