Partilhar via


Tutorial: Executar o seu primeiro pipeline Delta Live Tables

Este tutorial leva você pelas etapas para configurar seu primeiro pipeline Delta Live Tables, escrever código ETL básico e executar uma atualização de pipeline.

Todas as etapas neste tutorial são projetadas para espaços de trabalho com o Unity Catalog habilitado. Você também pode configurar pipelines Delta Live Tables para trabalhar com o metastore herdado do Hive. Consulte Usar pipelines Delta Live Tables com metastore herdado do Hive.

Nota

Este tutorial tem instruções para desenvolver e validar novo código de pipeline usando notebooks Databricks. Você também pode configurar pipelines usando código-fonte em arquivos Python ou SQL.

Você pode configurar um pipeline para executar seu código se já tiver o código-fonte escrito usando a sintaxe Delta Live Tables. Consulte Configurar um pipeline Delta Live Tables.

Você pode usar a sintaxe SQL totalmente declarativa no Databricks SQL para registrar e definir agendas de atualização para exibições materializadas e tabelas de streaming como objetos gerenciados pelo Unity Catalog. Consulte Usar exibições materializadas no Databricks SQL e Carregar dados usando tabelas de streaming no Databricks SQL.

Exemplo: Ingerir e processar dados de nomes de bebés de Nova Iorque

O exemplo neste artigo usa um conjunto de dados disponível publicamente que contém registros de nomes de bebês do Estado de Nova York. Este exemplo demonstra o uso de um pipeline Delta Live Tables para:

  • Leia dados CSV brutos de um volume em uma tabela.
  • Leia os registros da tabela de ingestão e use as expectativas do Delta Live Tables para criar uma nova tabela que contenha dados limpos.
  • Use os registros limpos como entrada para consultas Delta Live Tables que criam conjuntos de dados derivados.

Este código demonstra um exemplo simplificado da arquitetura medalhão. Veja O que é a arquitetura do medalhão lakehouse?.

Implementações deste exemplo são fornecidas para Python e SQL. Siga as etapas para criar um novo pipeline e bloco de anotações e, em seguida, copie e cole o código fornecido.

Exemplos de blocos de anotações com código completo também são fornecidos.

Requisitos

  • Para iniciar um pipeline, você deve ter permissão de criação de cluster ou acesso a uma política de cluster que defina um cluster Delta Live Tables. O runtime de Tabelas Dinâmicas Delta cria um cluster antes de executar o pipeline e falha se não tiver a permissão correta.

  • Todos os usuários podem disparar atualizações usando pipelines sem servidor por padrão. O Serverless deve estar habilitado no nível da conta e pode não estar disponível na região do espaço de trabalho. Consulte Ativar computação sem servidor.

  • Os exemplos neste tutorial usam o Unity Catalog. O Databricks recomenda a criação de um novo esquema para executar este tutorial, pois vários objetos de banco de dados são criados no esquema de destino.

    • Para criar um novo esquema em um catálogo, você deve ter ALL PRIVILEGES privilégios ou USE CATALOG e CREATE SCHEMA .
    • Se não for possível criar um novo esquema, execute este tutorial em relação a um esquema existente. Você deve ter os seguintes privilégios:
      • USE CATALOG para o catálogo pai.
      • ALL PRIVILEGES ou USE SCHEMA, CREATE MATERIALIZED VIEWe CREATE TABLE privilégios no esquema de destino.
    • Este tutorial usa um volume para armazenar dados de exemplo. A Databricks recomenda a criação de um novo volume para este tutorial. Se você criar um novo esquema para este tutorial, poderá criar um novo volume nesse esquema.
      • Para criar um novo volume em um esquema existente, você deve ter os seguintes privilégios:
        • USE CATALOG para o catálogo pai.
        • ALL PRIVILEGES ou USE SCHEMA e CREATE VOLUME privilégios no esquema de destino.
      • Opcionalmente, você pode usar um volume existente. Você deve ter os seguintes privilégios:
        • USE CATALOG para o catálogo pai.
        • USE SCHEMA para o esquema pai.
        • ALL PRIVILEGES ou READ VOLUME e WRITE VOLUME no volume-alvo.

    Para definir essas permissões, entre em contato com o administrador do Databricks. Para obter mais informações sobre os privilégios do Catálogo Unity, consulte Privilégios do Catálogo Unity e objetos protegíveis.

Passo 0: Transferir dados

Este exemplo carrega dados de um volume do Catálogo Unity. O código a seguir baixa um arquivo CSV e o armazena no volume especificado. Abra um novo bloco de anotações e execute o seguinte código para baixar esses dados para o volume especificado:

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

Substitua <catalog-name>, <schema-name>e <volume-name> pelos nomes de catálogo, esquema e volume de um volume do Catálogo Unity. O código fornecido tenta criar o esquema e o volume especificados se esses objetos não existirem. Você deve ter os privilégios apropriados para criar e gravar em objetos no Unity Catalog. Consulte Requisitos.

Nota

Certifique-se de que este bloco de notas foi executado com êxito antes de continuar com o tutorial. Não configure este bloco de anotações como parte do seu pipeline.

Etapa 1: Criar um pipeline

O Delta Live Tables cria pipelines resolvendo dependências definidas em blocos de anotações ou arquivos (chamados de código-fonte) usando a sintaxe Delta Live Tables. Cada arquivo de código-fonte pode conter apenas um idioma, mas você pode adicionar vários blocos de anotações ou arquivos específicos do idioma no pipeline.

Importante

Não configure nenhum ativo no campo Código-fonte . Deixar esse campo preto cria e configura um bloco de anotações para a criação do código-fonte.

As instruções neste tutorial usam computação sem servidor e Unity Catalog. Use as configurações padrão para todas as opções de configuração não mencionadas nestas instruções.

Nota

Se serverless não estiver habilitado ou suportado em seu espaço de trabalho, você poderá concluir o tutorial conforme escrito usando as configurações de computação padrão. Você deve selecionar manualmente Unity Catalog em Opções de armazenamento na seção Destino da interface do usuário Criar pipeline .

Para configurar um novo pipeline, faça o seguinte:

  1. Clique em Delta Live Tables na barra lateral.
  2. Clique em Criar pipeline.
  3. Forneça um nome de pipeline exclusivo.
  4. Marque a caixa ao lado de Serverless.
  5. Selecione um Catálogo para publicar dados.
  6. Selecione um esquema no catálogo.
    • Especifique um novo nome de esquema para criar um esquema.
  7. Defina três parâmetros de pipeline usando o botão Adicionar configuração em Avançado para adicionar três configurações. Especifique o catálogo, o esquema e o volume para os quais você baixou dados usando os seguintes nomes de parâmetro:
    • my_catalog
    • my_schema
    • my_volume
  8. Clique em Criar.

A interface do usuário dos pipelines aparece para o pipeline recém-criado. Um bloco de anotações de código-fonte é criado e configurado automaticamente para o pipeline.

O bloco de anotações é criado em um novo diretório no diretório do usuário. O nome do novo diretório e arquivo correspondem ao nome do seu pipeline. Por exemplo, /Users/your.username@databricks.com/my_pipeline/my_pipeline.

Um link para acessar este bloco de anotações está no campo Código-fonte no painel Detalhes do pipeline. Clique no link para abrir o bloco de anotações antes de prosseguir para a próxima etapa.

Etapa 2: Declarar visualizações materializadas e tabelas de streaming em um bloco de anotações com Python ou SQL

Você pode usar notebooks Datbricks para desenvolver e validar interativamente o código-fonte para pipelines Delta Live Tables. Você deve anexar seu bloco de anotações ao pipeline para usar essa funcionalidade. Para anexar seu bloco de anotações recém-criado ao pipeline que você acabou de criar:

  1. Clique em Conectar no canto superior direito para abrir o menu de configuração de computação.
  2. Passe o cursor sobre o nome do pipeline criado na Etapa 1.
  3. Clique em Ligar.

A interface do usuário muda para incluir os botões Validar e Iniciar no canto superior direito. Para saber mais sobre o suporte de notebook para desenvolvimento de código de pipeline, consulte Desenvolver e depurar pipelines Delta Live Tables em blocos de anotações.

Importante

  • Os pipelines Delta Live Tables avaliam todas as células de um bloco de anotações durante o planejamento. Ao contrário dos blocos de anotações que são executados em computação multiuso ou agendados como trabalhos, os pipelines não garantem que as células sejam executadas na ordem especificada.
  • Os blocos de notas só podem conter uma única linguagem de programação. Não misture Python e código SQL em blocos de anotações de código-fonte de pipeline.

Para obter detalhes sobre como desenvolver código com Python ou SQL, consulte Desenvolver código de pipeline com Python ou Desenvolver código de pipeline com SQL.

Exemplo de código de pipeline

Para implementar o exemplo neste tutorial, copie e cole o código a seguir em uma célula do bloco de anotações configurada como código-fonte para seu pipeline.

O código fornecido faz o seguinte:

  • Importa módulos necessários (somente Python).
  • Referencia parâmetros definidos durante a configuração do pipeline.
  • Define uma tabela de streaming chamada baby_names_raw que ingere a partir de um volume.
  • Define uma exibição materializada chamada baby_names_prepared que valida os dados ingeridos.
  • Define uma exibição materializada nomeada top_baby_names_2021 que tem uma exibição altamente refinada dos dados.

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM LIVE.baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Etapa 3: Iniciar uma atualização de pipeline

Para iniciar uma atualização de pipeline, clique no botão Iniciar no canto superior direito da interface do usuário do bloco de anotações.

Exemplos de blocos de notas

Os blocos de anotações a seguir contêm os mesmos exemplos de código fornecidos neste artigo. Esses blocos de anotações têm os mesmos requisitos das etapas deste artigo. Consulte Requisitos.

Para importar um bloco de anotações, conclua as seguintes etapas:

  1. Abra a interface do usuário do bloco de anotações.
    • Clique em + Novo>Bloco de Notas.
    • Um bloco de anotações vazio é aberto.
  2. Clique em File (Ficheiro)>Import (Importar). A caixa de diálogo Importar é exibida.
  3. Selecione a opção URL para Importar de.
  4. Cole o URL do bloco de notas.
  5. Clique em Importar.

Este tutorial requer que você execute um bloco de anotações de configuração de dados antes de configurar e executar seu pipeline Delta Live Tables. Importe o bloco de anotações a seguir, anexe-o a um recurso de computação, preencha a variável necessária para my_catalog, my_schemae my_volume, e clique em Executar tudo.

Tutorial de download de dados para pipelines

Obter o bloco de notas

Os blocos de anotações a seguir fornecem exemplos em Python ou SQL. Quando você importa um bloco de anotações, ele é salvo no diretório inicial do usuário.

Depois de importar um dos blocos de anotações abaixo, conclua as etapas para criar um pipeline, mas use o seletor de arquivos de código-fonte para selecionar o bloco de anotações baixado. Depois de criar o pipeline com um bloco de anotações configurado como código-fonte, clique em Iniciar na interface do usuário do pipeline para disparar uma atualização.

Introdução ao bloco de anotações Python Delta Live Tables

Obter o bloco de notas

Introdução ao bloco de anotações SQL Delta Live Tables

Obter o bloco de notas